1use std::collections::{HashMap, HashSet};
2
3use crate::{
4 governance::model::Quorum,
5 model::common::{CeilingMap, Interval, IntervalSet, emit_fail},
6};
7use async_trait::async_trait;
8use ave_actors::{
9 Actor, ActorContext, ActorError, ActorPath, Event, Handler,
10 LightPersistence, Message, PersistentActor, Response,
11};
12
13use ave_common::{Namespace, SchemaType, identity::PublicKey};
14use borsh::{BorshDeserialize, BorshSerialize};
15use serde::{Deserialize, Serialize};
16use tracing::{Span, debug, error, info_span};
17
18use crate::db::Storable;
19
/// Search key used by the role queries: the schema whose role holders are
/// requested and the namespace they must cover.
#[derive(Debug, Clone, Serialize, Deserialize, Hash, Eq, PartialEq)]
pub struct SearchRole {
    /// Schema used to look up the registered role holders and quorum.
    pub schema_id: SchemaType,
    /// Namespace being queried; a role holder matches when its own namespace
    /// is an ancestor of, or equal to, this one.
    pub namespace: Namespace,
}
25
/// A public key paired with a namespace.
///
/// Used as the element type of [`UpdateRole::role`].
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Hash,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    BorshDeserialize,
    BorshSerialize,
)]
pub struct RoleData {
    /// Public key of the role holder.
    pub key: PublicKey,
    /// Namespace associated with the key.
    pub namespace: Namespace,
}
43
/// Persistent actor state holding the approvers, evaluators and validators
/// together with their quorums.
///
/// Approvers and evaluators are stored only for the current version, while
/// validators and validation quorums keep history so they can be queried for
/// past versions.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    BorshDeserialize,
    BorshSerialize,
    Default,
)]
pub struct RoleRegister {
    /// Version carried by the last applied event.
    version: u64,

    /// Quorum returned alongside the approvers.
    appr_quorum: Quorum,
    /// Current set of approver keys.
    approvers: HashSet<PublicKey>,

    /// Current evaluation quorum per schema.
    eval_quorum: HashMap<SchemaType, Quorum>,
    /// Current evaluators per schema, each with the namespace it covers.
    evaluators: HashMap<SchemaType, HashSet<(PublicKey, Namespace)>>,

    /// Validation quorum history per schema, keyed by the version at which
    /// each quorum was set.
    vali_quorum: HashMap<SchemaType, CeilingMap<Quorum>>,
    /// Validators per schema; each `(key, namespace)` pair maps to the
    /// versions during which it held the role (see [`IntervalData`]).
    validators:
        HashMap<SchemaType, HashMap<(PublicKey, Namespace), IntervalData>>,
}
66
/// Validity history of one validator.
///
/// The `IntervalSet` holds the closed version ranges of past, already ended
/// role periods. The `Option<u64>` is the version at which the currently open
/// period started, or `None` when the validator does not hold the role now.
type IntervalData = (IntervalSet, Option<u64>);
68
69impl RoleRegister {
70 pub fn new() -> Self {
71 Self {
72 version: 0,
73 appr_quorum: Quorum::Majority,
74 eval_quorum: HashMap::new(),
75 vali_quorum: HashMap::new(),
76 evaluators: HashMap::new(),
77 validators: HashMap::new(),
78 approvers: HashSet::new(),
79 }
80 }
81}
82
/// A set of role holders attached to one schema.
///
/// NOTE(review): not referenced by the code visible in this file; presumably
/// consumed elsewhere in the crate — confirm against callers.
#[derive(
    Debug, Clone, Deserialize, Serialize, BorshDeserialize, BorshSerialize,
)]
pub struct UpdateRole {
    /// Schema the role holders belong to.
    pub schema_id: SchemaType,
    /// Role holders, each a key with its namespace.
    pub role: HashSet<RoleData>,
}
90
/// A quorum attached to one schema.
///
/// NOTE(review): not referenced by the code visible in this file; presumably
/// consumed elsewhere in the crate — confirm against callers.
#[derive(
    Debug, Clone, Deserialize, Serialize, BorshDeserialize, BorshSerialize,
)]
pub struct UpdateQuorum {
    /// Schema the quorum applies to.
    pub schema_id: SchemaType,
    /// The quorum itself.
    pub quorum: Quorum,
}
98
/// Result of a role search: the keys holding the role and the quorum that
/// applies to them.
#[derive(Debug, Clone)]
pub struct RoleDataRegister {
    /// Deduplicated keys of the matching role holders.
    pub workers: HashSet<PublicKey>,
    /// Quorum registered for the searched role.
    pub quorum: Quorum,
}
104
/// Messages accepted by the [`RoleRegister`] actor.
#[derive(Debug, Clone)]
pub enum RoleRegisterMessage {
    /// Asks for the current evaluators and, optionally, the current approvers.
    /// Answered only when `version` equals the register's version.
    SearchActualRoles {
        /// Version the caller expects the register to be at.
        version: u64,
        /// Schema and namespace used to select the evaluators.
        evaluation: SearchRole,
        /// Whether the approvers must be included in the answer.
        approval: bool,
    },
    /// Asks for the validators that held the role at `version`, which must
    /// not be greater than the register's version.
    SearchValidators {
        /// Schema and namespace used to select the validators.
        search: SearchRole,
        /// Version for which the validators are requested.
        version: u64,
    },
    /// Advances the register's version without changing any role.
    UpdateVersion {
        /// New version of the register.
        version: u64,
    },
    /// Applies a batch of quorum and role changes at `version`.
    UpdateFact {
        /// New version of the register.
        version: u64,

        /// New approval quorum, if it changes.
        appr_quorum: Option<Quorum>,
        /// Evaluation quorums to set, per schema.
        eval_quorum: HashMap<SchemaType, Quorum>,
        /// Validation quorums to record from `version` on, per schema.
        vali_quorum: HashMap<SchemaType, Quorum>,

        /// Approvers to add.
        new_approvers: Vec<PublicKey>,
        /// Approvers to remove.
        remove_approvers: Vec<PublicKey>,

        /// Evaluators to add: namespaces per `(schema, key)`.
        new_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
        /// Evaluators to remove: namespaces per `(schema, key)`.
        remove_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,

        /// Validators to add: namespaces per `(schema, key)`.
        new_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
        /// Validators to remove: namespaces per `(schema, key)`.
        remove_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
    },
    /// Applies a role hand-over at `version`: one key may be registered per
    /// role while `remove_approver` and the listed evaluators and validators
    /// are dropped.
    UpdateConfirm {
        /// New version of the register.
        version: u64,

        /// Key to add as approver, if any.
        new_approver: Option<PublicKey>,
        /// Key removed from the approvers.
        remove_approver: PublicKey,

        /// Key to add as governance evaluator with an empty namespace, if any.
        new_evaluator: Option<PublicKey>,
        /// Evaluators to remove: namespaces per `(schema, key)`.
        remove_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,

        /// Key to add as governance validator with an empty namespace, if any.
        new_validator: Option<PublicKey>,
        /// Validators to remove: namespaces per `(schema, key)`.
        remove_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
    },
}
148impl Message for RoleRegisterMessage {
149 fn is_critical(&self) -> bool {
150 matches!(
151 self,
152 Self::UpdateVersion { .. }
153 | Self::UpdateFact { .. }
154 | Self::UpdateConfirm { .. }
155 )
156 }
157}
158
/// Answers produced by the [`RoleRegister`] actor.
#[derive(Debug, Clone)]
pub enum RoleRegisterResponse {
    /// Answer to `SearchActualRoles`.
    ActualRoles {
        /// Matching evaluators and the evaluation quorum of the schema.
        evaluation: RoleDataRegister,
        /// Approvers and approval quorum; `None` when they were not requested.
        approval: Option<RoleDataRegister>,
    },
    /// Answer to `SearchValidators`: matching validators and their quorum.
    Validation(RoleDataRegister),
    /// The register holds no quorum or no role holder for the request.
    MissingData,
    /// The requested version cannot be served by the register's version.
    OutOfVersion,
    /// Acknowledgement returned for every update message, applied or skipped.
    Ok,
}

// Marker impl: lets the enum be used as the actor's response type.
impl Response for RoleRegisterResponse {}
172
/// Events persisted by the [`RoleRegister`] actor; each variant carries the
/// same fields as the update message of the same name.
#[derive(
    Debug, Clone, Deserialize, Serialize, BorshDeserialize, BorshSerialize,
)]
pub enum RoleRegisterEvent {
    /// Sets the register's version without touching any role.
    UpdateVersion {
        /// New version of the register.
        version: u64,
    },
    /// Batch of quorum and role changes applied at `version`.
    UpdateFact {
        /// New version of the register.
        version: u64,

        /// New approval quorum, if it changes.
        appr_quorum: Option<Quorum>,
        /// Evaluation quorums to set, per schema.
        eval_quorum: HashMap<SchemaType, Quorum>,
        /// Validation quorums recorded from `version` on, per schema.
        vali_quorum: HashMap<SchemaType, Quorum>,

        /// Approvers to add.
        new_approvers: Vec<PublicKey>,
        /// Approvers to remove.
        remove_approvers: Vec<PublicKey>,

        /// Evaluators to add: namespaces per `(schema, key)`.
        new_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
        /// Evaluators to remove: namespaces per `(schema, key)`.
        remove_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,

        /// Validators to add: namespaces per `(schema, key)`.
        new_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
        /// Validators to remove: namespaces per `(schema, key)`.
        remove_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
    },
    /// Role hand-over applied at `version`.
    UpdateConfirm {
        /// New version of the register.
        version: u64,

        /// Key to add as approver, if any.
        new_approver: Option<PublicKey>,
        /// Key removed from the approvers.
        remove_approver: PublicKey,

        /// Key to add as governance evaluator with an empty namespace, if any.
        new_evaluator: Option<PublicKey>,
        /// Evaluators to remove: namespaces per `(schema, key)`.
        remove_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,

        /// Key to add as governance validator with an empty namespace, if any.
        new_validator: Option<PublicKey>,
        /// Validators to remove: namespaces per `(schema, key)`.
        remove_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
    },
}

// Marker impl: lets the enum be used as the actor's event type.
impl Event for RoleRegisterEvent {}
211
212#[async_trait]
213impl Actor for RoleRegister {
214 type Event = RoleRegisterEvent;
215 type Message = RoleRegisterMessage;
216 type Response = RoleRegisterResponse;
217
218 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
219 parent_span.map_or_else(
220 || info_span!("RoleRegister"),
221 |parent_span| info_span!(parent: parent_span, "RoleRegister"),
222 )
223 }
224
225 async fn pre_start(
226 &mut self,
227 ctx: &mut ActorContext<Self>,
228 ) -> Result<(), ActorError> {
229 let prefix = ctx.path().parent().key();
230 if let Err(e) = self
231 .init_store("role_register", Some(prefix), true, ctx)
232 .await
233 {
234 error!(
235 error = %e,
236 "Failed to initialize role_register store"
237 );
238 return Err(e);
239 }
240 Ok(())
241 }
242}
243
244#[async_trait]
245impl Handler<Self> for RoleRegister {
246 async fn handle_message(
247 &mut self,
248 _sender: ActorPath,
249 msg: RoleRegisterMessage,
250 ctx: &mut ActorContext<Self>,
251 ) -> Result<RoleRegisterResponse, ActorError> {
252 match msg {
253 RoleRegisterMessage::SearchActualRoles {
254 version,
255 evaluation,
256 approval,
257 } => {
258 if version != self.version {
259 debug!(
260 msg_type = "SearchActualRoles",
261 version = version,
262 current_version = self.version,
263 schema_id = %evaluation.schema_id,
264 namespace = %evaluation.namespace,
265 "Request version exceeds current version"
266 );
267 return Ok(RoleRegisterResponse::OutOfVersion);
268 }
269
270 'data: {
271 let approvers = if approval {
272 if self.approvers.is_empty() {
273 break 'data;
274 } else {
275 Some(RoleDataRegister {
276 workers: self.approvers.clone(),
277 quorum: self.appr_quorum.clone(),
278 })
279 }
280 } else {
281 None
282 };
283
284 let mut all_eval = if !evaluation.schema_id.is_gov()
285 && let Some(evaluators) =
286 self.evaluators.get(&SchemaType::TrackerSchemas)
287 {
288 let mut schema_eval = vec![];
289 for (key, namespace) in evaluators {
290 if namespace
291 .is_ancestor_or_equal_of(&evaluation.namespace)
292 {
293 schema_eval.push(key.clone());
294 }
295 }
296
297 schema_eval
298 } else {
299 vec![]
300 };
301
302 let mut schema_eval = if let Some(evaluators) =
303 self.evaluators.get(&evaluation.schema_id)
304 {
305 let mut schema_eval = vec![];
306 for (key, namespace) in evaluators {
307 if namespace
308 .is_ancestor_or_equal_of(&evaluation.namespace)
309 {
310 schema_eval.push(key.clone());
311 }
312 }
313
314 schema_eval
315 } else {
316 vec![]
317 };
318
319 let quorum = if let Some(quorum_schema) =
320 self.eval_quorum.get(&evaluation.schema_id)
321 {
322 quorum_schema.clone()
323 } else {
324 break 'data;
325 };
326
327 if schema_eval.is_empty() && all_eval.is_empty() {
328 break 'data;
329 }
330
331 let mut evaluators = vec![];
332 evaluators.append(&mut schema_eval);
333 evaluators.append(&mut all_eval);
334
335 debug!(
336 msg_type = "SearchActualRoles",
337 version = version,
338 schema_id = %evaluation.schema_id,
339 namespace = %evaluation.namespace,
340 evaluators_count = evaluators.len(),
341 has_approvers = approvers.is_some(),
342 "Found actual roles successfully"
343 );
344
345 return Ok(RoleRegisterResponse::ActualRoles {
346 evaluation: RoleDataRegister {
347 workers: evaluators.iter().cloned().collect(),
348 quorum,
349 },
350 approval: approvers,
351 });
352 }
353
354 debug!(
355 msg_type = "SearchActualRoles",
356 version = version,
357 schema_id = %evaluation.schema_id,
358 namespace = %evaluation.namespace,
359 "Missing role data for version"
360 );
361 Ok(RoleRegisterResponse::MissingData)
362 }
363 RoleRegisterMessage::SearchValidators { search, version } => {
364 if version > self.version {
365 debug!(
366 msg_type = "SearchValidators",
367 version = version,
368 current_version = self.version,
369 schema_id = %search.schema_id,
370 namespace = %search.namespace,
371 "Request version exceeds current version"
372 );
373 return Ok(RoleRegisterResponse::OutOfVersion);
374 }
375
376 let mut all_val = if !search.schema_id.is_gov()
377 && let Some(validators) =
378 self.validators.get(&SchemaType::TrackerSchemas)
379 {
380 let mut schema_val = vec![];
382 for ((key, namespace), (interval, last)) in validators {
383 if namespace.is_ancestor_or_equal_of(&search.namespace)
384 {
385 if let Some(last) = last
386 && last <= &version
387 {
388 schema_val.push(key.clone());
389 } else if interval.contains(version) {
390 schema_val.push(key.clone());
391 }
392 }
393 }
394
395 schema_val
396 } else {
397 vec![]
398 };
399
400 let mut schema_val = if let Some(validators) =
401 self.validators.get(&search.schema_id)
402 {
403 let mut schema_val = vec![];
404 for ((key, namespace), (interval, last)) in validators {
405 if namespace.is_ancestor_or_equal_of(&search.namespace)
406 {
407 if let Some(last) = last
408 && last <= &version
409 {
410 schema_val.push(key.clone());
411 } else if interval.contains(version) {
412 schema_val.push(key.clone());
413 }
414 }
415 }
416
417 schema_val
418 } else {
419 vec![]
420 };
421
422 'data: {
423 let quorum = if let Some(quorum_schema) =
424 self.vali_quorum.get(&search.schema_id)
425 {
426 let Some(quorum) =
427 quorum_schema.get_prev_or_equal(version)
428 else {
429 break 'data;
430 };
431
432 quorum
433 } else {
434 break 'data;
435 };
436
437 if schema_val.is_empty() && all_val.is_empty() {
438 break 'data;
439 }
440
441 let mut validators = vec![];
442 validators.append(&mut schema_val);
443 validators.append(&mut all_val);
444
445 debug!(
446 msg_type = "SearchValidators",
447 version = version,
448 schema_id = %search.schema_id,
449 namespace = %search.namespace,
450 validators_count = validators.len(),
451 "Found validators successfully"
452 );
453
454 return Ok(RoleRegisterResponse::Validation(
455 RoleDataRegister {
456 workers: validators.iter().cloned().collect(),
457 quorum,
458 },
459 ));
460 }
461
462 debug!(
463 msg_type = "SearchValidators",
464 version = version,
465 schema_id = %search.schema_id,
466 namespace = %search.namespace,
467 "Missing validator data for version"
468 );
469 Ok(RoleRegisterResponse::MissingData)
470 }
471 RoleRegisterMessage::UpdateVersion { version } => {
472 if version > self.version || self.version == 0 {
473 self.on_event(
474 RoleRegisterEvent::UpdateVersion { version },
475 ctx,
476 )
477 .await;
478
479 debug!(
480 msg_type = "UpdateVersion",
481 version = version,
482 "Roles register updated successfully"
483 );
484 } else {
485 debug!(
486 msg_type = "UpdateVersion",
487 version = version,
488 current_version = self.version,
489 "Update skipped, version not greater than current"
490 );
491 }
492
493 Ok(RoleRegisterResponse::Ok)
494 }
495 RoleRegisterMessage::UpdateConfirm {
496 version,
497 new_approver,
498 remove_approver,
499 new_evaluator,
500 remove_evaluators,
501 new_validator,
502 remove_validators,
503 } => {
504 if version > self.version || self.version == 0 {
505 self.on_event(
506 RoleRegisterEvent::UpdateConfirm {
507 version,
508 new_approver,
509 remove_approver,
510 new_evaluator,
511 remove_evaluators,
512 new_validator,
513 remove_validators,
514 },
515 ctx,
516 )
517 .await;
518
519 debug!(
520 msg_type = "UpdateConfirm",
521 version = version,
522 "Roles register updated successfully"
523 );
524 } else {
525 debug!(
526 msg_type = "UpdateConfirm",
527 version = version,
528 current_version = self.version,
529 "Update skipped, version not greater than current"
530 );
531 }
532
533 Ok(RoleRegisterResponse::Ok)
534 }
535 RoleRegisterMessage::UpdateFact {
536 version,
537 appr_quorum,
538 eval_quorum,
539 vali_quorum,
540 new_approvers,
541 remove_approvers,
542 new_evaluators,
543 remove_evaluators,
544 new_validators,
545 remove_validators,
546 } => {
547 if version > self.version || self.version == 0 {
548 self.on_event(
549 RoleRegisterEvent::UpdateFact {
550 version,
551 appr_quorum,
552 eval_quorum,
553 vali_quorum,
554 new_approvers,
555 remove_approvers,
556 new_evaluators,
557 remove_evaluators,
558 new_validators,
559 remove_validators,
560 },
561 ctx,
562 )
563 .await;
564
565 debug!(
566 msg_type = "UpdateFact",
567 version = version,
568 "Roles register updated successfully"
569 );
570 } else {
571 debug!(
572 msg_type = "UpdateFact",
573 version = version,
574 current_version = self.version,
575 "Update skipped, version not greater than current"
576 );
577 }
578
579 Ok(RoleRegisterResponse::Ok)
580 }
581 }
582 }
583
584 async fn on_event(
585 &mut self,
586 event: RoleRegisterEvent,
587 ctx: &mut ActorContext<Self>,
588 ) {
589 if let Err(e) = self.persist(&event, ctx).await {
590 let version = match &event {
591 RoleRegisterEvent::UpdateFact { version, .. } => *version,
592 RoleRegisterEvent::UpdateVersion { version } => *version,
593 RoleRegisterEvent::UpdateConfirm { version, .. } => *version,
594 };
595 error!(
596 version = version,
597 error = %e,
598 "Failed to persist role register event"
599 );
600 emit_fail(ctx, e).await;
601 }
602 }
603}
604
605#[async_trait]
606impl PersistentActor for RoleRegister {
607 type Persistence = LightPersistence;
608 type InitParams = ();
609
610 fn create_initial(_params: Self::InitParams) -> Self {
611 Self::default()
612 }
613
614 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
615 match event {
616 RoleRegisterEvent::UpdateVersion { version } => {
617 self.version = *version;
618 }
619 RoleRegisterEvent::UpdateConfirm {
620 version,
621 new_approver,
622 remove_approver,
623 new_evaluator,
624 remove_evaluators,
625 new_validator,
626 remove_validators,
627 } => {
628 self.version = *version;
629 if let Some(approver) = new_approver {
630 self.approvers.insert(approver.clone());
631 }
632
633 if let Some(evaluator) = new_evaluator {
634 self.evaluators
635 .entry(SchemaType::Governance)
636 .or_default()
637 .insert((evaluator.clone(), Namespace::new()));
638 }
639
640 if let Some(validator) = new_validator {
641 self.validators
642 .entry(SchemaType::Governance)
643 .or_default()
644 .entry((validator.clone(), Namespace::new()))
645 .or_default()
646 .1 = Some(*version);
647 }
648
649 self.approvers.remove(remove_approver);
650
651 for ((schema_id, evaluator), namespaces) in
652 remove_evaluators.iter()
653 {
654 for ns in namespaces.iter() {
655 self.evaluators
656 .entry(schema_id.clone())
657 .or_default()
658 .remove(&(evaluator.clone(), ns.clone()));
659 }
660 }
661
662 for ((schema_id, validator), namespaces) in
663 remove_validators.iter()
664 {
665 for ns in namespaces.iter() {
666 let (interval, last) = self
667 .validators
668 .entry(schema_id.clone())
669 .or_default()
670 .entry((validator.clone(), ns.clone()))
671 .or_default();
672 if let Some(last) = last.take() {
673 interval.insert(Interval {
674 lo: last,
675 hi: *version - 1,
676 });
677 }
678 }
679 }
680
681 debug!(
682 event_type = "UpdateFact",
683 version = version,
684 new_approver = new_approver.is_some(),
685 remove_approvers_count = 1,
686 new_evaluator = new_evaluator.is_some(),
687 remove_evaluators_count = remove_evaluators.len(),
688 new_validator = new_validator.is_some(),
689 remove_validators_count = remove_validators.len(),
690 "Role register state updated"
691 );
692 }
693 RoleRegisterEvent::UpdateFact {
694 version,
695 appr_quorum,
696 eval_quorum,
697 vali_quorum,
698 new_approvers,
699 remove_approvers,
700 new_evaluators,
701 remove_evaluators,
702 new_validators,
703 remove_validators,
704 } => {
705 self.version = *version;
706
707 if let Some(appr_quorum) = appr_quorum {
708 self.appr_quorum = appr_quorum.clone();
709 }
710
711 for (schema_id, quorum) in vali_quorum.iter() {
712 self.vali_quorum
713 .entry(schema_id.clone())
714 .or_default()
715 .insert(*version, quorum.clone());
716 }
717
718 for (schema_id, quorum) in eval_quorum.iter() {
719 self.eval_quorum.insert(schema_id.clone(), quorum.clone());
720 }
721
722 for approver in new_approvers.iter() {
723 self.approvers.insert(approver.clone());
724 }
725
726 for approver in remove_approvers.iter() {
727 self.approvers.remove(approver);
728 }
729
730 for ((schema_id, evaluator), namespaces) in
731 new_evaluators.iter()
732 {
733 for ns in namespaces.iter() {
734 self.evaluators
735 .entry(schema_id.clone())
736 .or_default()
737 .insert((evaluator.clone(), ns.clone()));
738 }
739 }
740
741 for ((schema_id, evaluator), namespaces) in
742 remove_evaluators.iter()
743 {
744 for ns in namespaces.iter() {
745 self.evaluators
746 .entry(schema_id.clone())
747 .or_default()
748 .remove(&(evaluator.clone(), ns.clone()));
749 }
750 }
751
752 for ((schema_id, validator), namespaces) in
753 new_validators.iter()
754 {
755 for ns in namespaces.iter() {
756 self.validators
757 .entry(schema_id.clone())
758 .or_default()
759 .entry((validator.clone(), ns.clone()))
760 .or_default()
761 .1 = Some(*version);
762 }
763 }
764
765 for ((schema_id, validator), namespaces) in
766 remove_validators.iter()
767 {
768 for ns in namespaces.iter() {
769 let (interval, last) = self
770 .validators
771 .entry(schema_id.clone())
772 .or_default()
773 .entry((validator.clone(), ns.clone()))
774 .or_default();
775 if let Some(last) = last.take() {
776 interval.insert(Interval {
777 lo: last,
778 hi: *version - 1,
779 });
780 }
781 }
782 }
783
784 debug!(
785 event_type = "UpdateFact",
786 version = version,
787 new_approvers_count = new_approvers.len(),
788 remove_approvers_count = remove_approvers.len(),
789 new_evaluators_count = new_evaluators.len(),
790 remove_evaluators_count = remove_evaluators.len(),
791 new_validators_count = new_validators.len(),
792 remove_validators_count = remove_validators.len(),
793 "Role register state updated"
794 );
795 }
796 }
797 Ok(())
798 }
799}
800
// Marker impl with no overrides: opts `RoleRegister` into the `Storable` trait
// from `crate::db`.
impl Storable for RoleRegister {}