Skip to main content

ave_core/governance/
role_register.rs

1use std::collections::{HashMap, HashSet};
2
3use crate::{
4    governance::model::Quorum,
5    model::common::{CeilingMap, Interval, IntervalSet, emit_fail},
6};
7use async_trait::async_trait;
8use ave_actors::{
9    Actor, ActorContext, ActorError, ActorPath, Event, Handler,
10    LightPersistence, Message, PersistentActor, Response,
11};
12
13use ave_common::{Namespace, SchemaType, identity::PublicKey};
14use borsh::{BorshDeserialize, BorshSerialize};
15use serde::{Deserialize, Serialize};
16use tracing::{Span, debug, error, info_span};
17
18use crate::db::Storable;
19
/// Lookup key for role searches: the schema and namespace for which workers
/// are being requested.
#[derive(Debug, Clone, Serialize, Deserialize, Hash, Eq, PartialEq)]
pub struct SearchRole {
    /// Schema whose evaluators or validators are requested.
    pub schema_id: SchemaType,
    /// Namespace being searched. A registered worker matches when its own
    /// namespace is an ancestor of, or equal to, this one.
    pub namespace: Namespace,
}
25
/// A public key paired with the namespace it is associated with.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Hash,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    BorshDeserialize,
    BorshSerialize,
)]
pub struct RoleData {
    /// Public key of the role holder.
    pub key: PublicKey,
    /// Namespace attached to the key.
    pub namespace: Namespace,
}
43
/// Persistent actor state recording who may approve, evaluate and validate,
/// together with the quorum that applies to each role.
///
/// Approvers and evaluators are kept for the current version only, while
/// validators and their quorums keep history so they can be queried for past
/// versions.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    BorshDeserialize,
    BorshSerialize,
    Default,
)]
pub struct RoleRegister {
    /// Version of the last applied event.
    version: u64,

    /// Quorum required among `approvers`.
    appr_quorum: Quorum,
    /// Current set of approver keys.
    approvers: HashSet<PublicKey>,

    /// Current evaluation quorum per schema.
    eval_quorum: HashMap<SchemaType, Quorum>,
    /// Current evaluators per schema, as `(key, namespace)` pairs.
    evaluators: HashMap<SchemaType, HashSet<(PublicKey, Namespace)>>,

    /// Validation quorum history per schema, keyed by the version at which
    /// each quorum was set.
    vali_quorum: HashMap<SchemaType, CeilingMap<Quorum>>,
    /// Validator history per schema: for every `(key, namespace)` pair, the
    /// closed activity ranges plus the start version of the currently open
    /// one (see [`IntervalData`]).
    validators:
        HashMap<SchemaType, HashMap<(PublicKey, Namespace), IntervalData>>,
}
66
/// Activity record of one validator: the set of closed version intervals in
/// which it was a validator, and `Some(version)` holding the version at which
/// it was last added while it has not been removed since.
type IntervalData = (IntervalSet, Option<u64>);
68
69impl RoleRegister {
70    pub fn new() -> Self {
71        Self {
72            version: 0,
73            appr_quorum: Quorum::Majority,
74            eval_quorum: HashMap::new(),
75            vali_quorum: HashMap::new(),
76            evaluators: HashMap::new(),
77            validators: HashMap::new(),
78            approvers: HashSet::new(),
79        }
80    }
81}
82
/// A set of role holders associated with one schema.
///
/// NOTE(review): not referenced elsewhere in this file; presumably consumed
/// by other modules — confirm before changing.
#[derive(
    Debug, Clone, Deserialize, Serialize, BorshDeserialize, BorshSerialize,
)]
pub struct UpdateRole {
    /// Schema the roles belong to.
    pub schema_id: SchemaType,
    /// Role holders for that schema.
    pub role: HashSet<RoleData>,
}
90
/// A quorum associated with one schema.
///
/// NOTE(review): not referenced elsewhere in this file; presumably consumed
/// by other modules — confirm before changing.
#[derive(
    Debug, Clone, Deserialize, Serialize, BorshDeserialize, BorshSerialize,
)]
pub struct UpdateQuorum {
    /// Schema the quorum applies to.
    pub schema_id: SchemaType,
    /// Quorum for that schema.
    pub quorum: Quorum,
}
98
/// Result of a role lookup: the keys that hold the role and the quorum that
/// applies to them.
#[derive(Debug, Clone)]
pub struct RoleDataRegister {
    /// Keys of the matching role holders (deduplicated).
    pub workers: HashSet<PublicKey>,
    /// Quorum to apply over `workers`.
    pub quorum: Quorum,
}
104
/// Messages accepted by the [`RoleRegister`] actor.
#[derive(Debug, Clone)]
pub enum RoleRegisterMessage {
    /// Looks up the current evaluators (and optionally approvers). The
    /// request is answered only when `version` equals the register version.
    SearchActualRoles {
        /// Version the caller expects the register to be at.
        version: u64,
        /// Schema and namespace whose evaluators are requested.
        evaluation: SearchRole,
        /// Whether approvers must be returned as well.
        approval: bool,
    },
    /// Looks up the validators that were active at `version`, which must not
    /// be greater than the register version.
    SearchValidators {
        /// Schema and namespace whose validators are requested.
        search: SearchRole,
        /// Version for which validators are requested.
        version: u64,
    },
    /// Advances the register version without touching any role.
    UpdateVersion {
        version: u64,
    },
    /// Applies quorum changes and role additions/removals at `version`.
    UpdateFact {
        version: u64,

        // Quorum changes; `None` / absent schemas are left untouched.
        appr_quorum: Option<Quorum>,
        eval_quorum: HashMap<SchemaType, Quorum>,
        vali_quorum: HashMap<SchemaType, Quorum>,

        new_approvers: Vec<PublicKey>,
        remove_approvers: Vec<PublicKey>,

        // Keyed by (schema, key); the value lists the affected namespaces.
        new_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
        remove_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,

        new_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
        remove_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
    },
    /// Optionally adds one approver/evaluator/validator (the latter two under
    /// the governance schema with an empty namespace) and removes the listed
    /// roles at `version`.
    UpdateConfirm {
        version: u64,

        new_approver: Option<PublicKey>,
        remove_approver: PublicKey,

        new_evaluator: Option<PublicKey>,
        remove_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,

        new_validator: Option<PublicKey>,
        remove_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
    },
}
148impl Message for RoleRegisterMessage {
149    fn is_critical(&self) -> bool {
150        matches!(
151            self,
152            Self::UpdateVersion { .. }
153                | Self::UpdateFact { .. }
154                | Self::UpdateConfirm { .. }
155        )
156    }
157}
158
/// Replies produced by the [`RoleRegister`] actor.
#[derive(Debug, Clone)]
pub enum RoleRegisterResponse {
    /// Answer to `SearchActualRoles`.
    ActualRoles {
        /// Matching evaluators and their quorum.
        evaluation: RoleDataRegister,
        /// Approvers and their quorum; `None` when they were not requested.
        approval: Option<RoleDataRegister>,
    },
    /// Answer to `SearchValidators`: matching validators and their quorum.
    Validation(RoleDataRegister),
    /// No quorum or no matching worker is registered for the request.
    MissingData,
    /// The requested version is not served by the register's current version.
    OutOfVersion,
    /// An update message was processed (applied or skipped as stale).
    Ok,
}

// Marker impl: allows the enum to be used as an actor response.
impl Response for RoleRegisterResponse {}
172
/// Persisted events of the [`RoleRegister`] actor. Each variant carries the
/// same fields as the update message of the same name.
#[derive(
    Debug, Clone, Deserialize, Serialize, BorshDeserialize, BorshSerialize,
)]
pub enum RoleRegisterEvent {
    /// Sets the register version without changing any role.
    UpdateVersion {
        version: u64,
    },
    /// Quorum changes and role additions/removals taking effect at `version`.
    UpdateFact {
        version: u64,

        // Quorum changes; `None` / absent schemas are left untouched.
        appr_quorum: Option<Quorum>,
        eval_quorum: HashMap<SchemaType, Quorum>,
        vali_quorum: HashMap<SchemaType, Quorum>,

        new_approvers: Vec<PublicKey>,
        remove_approvers: Vec<PublicKey>,

        // Keyed by (schema, key); the value lists the affected namespaces.
        new_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
        remove_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,

        new_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
        remove_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
    },
    /// Optional single additions (registered under the governance schema for
    /// evaluator/validator) plus removals taking effect at `version`.
    UpdateConfirm {
        version: u64,

        new_approver: Option<PublicKey>,
        remove_approver: PublicKey,

        new_evaluator: Option<PublicKey>,
        remove_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,

        new_validator: Option<PublicKey>,
        remove_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
    },
}

// Marker impl: allows the enum to be used as an actor event.
impl Event for RoleRegisterEvent {}
211
212#[async_trait]
213impl Actor for RoleRegister {
214    type Event = RoleRegisterEvent;
215    type Message = RoleRegisterMessage;
216    type Response = RoleRegisterResponse;
217
218    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
219        parent_span.map_or_else(
220            || info_span!("RoleRegister"),
221            |parent_span| info_span!(parent: parent_span, "RoleRegister"),
222        )
223    }
224
225    async fn pre_start(
226        &mut self,
227        ctx: &mut ActorContext<Self>,
228    ) -> Result<(), ActorError> {
229        let prefix = ctx.path().parent().key();
230        if let Err(e) = self
231            .init_store("role_register", Some(prefix), true, ctx)
232            .await
233        {
234            error!(
235                error = %e,
236                "Failed to initialize role_register store"
237            );
238            return Err(e);
239        }
240        Ok(())
241    }
242}
243
244#[async_trait]
245impl Handler<Self> for RoleRegister {
246    async fn handle_message(
247        &mut self,
248        _sender: ActorPath,
249        msg: RoleRegisterMessage,
250        ctx: &mut ActorContext<Self>,
251    ) -> Result<RoleRegisterResponse, ActorError> {
252        match msg {
253            RoleRegisterMessage::SearchActualRoles {
254                version,
255                evaluation,
256                approval,
257            } => {
258                if version != self.version {
259                    debug!(
260                        msg_type = "SearchActualRoles",
261                        version = version,
262                        current_version = self.version,
263                        schema_id = %evaluation.schema_id,
264                        namespace = %evaluation.namespace,
265                        "Request version exceeds current version"
266                    );
267                    return Ok(RoleRegisterResponse::OutOfVersion);
268                }
269
270                'data: {
271                    let approvers = if approval {
272                        if self.approvers.is_empty() {
273                            break 'data;
274                        } else {
275                            Some(RoleDataRegister {
276                                workers: self.approvers.clone(),
277                                quorum: self.appr_quorum.clone(),
278                            })
279                        }
280                    } else {
281                        None
282                    };
283
284                    let mut all_eval = if !evaluation.schema_id.is_gov()
285                        && let Some(evaluators) =
286                            self.evaluators.get(&SchemaType::TrackerSchemas)
287                    {
288                        let mut schema_eval = vec![];
289                        for (key, namespace) in evaluators {
290                            if namespace
291                                .is_ancestor_or_equal_of(&evaluation.namespace)
292                            {
293                                schema_eval.push(key.clone());
294                            }
295                        }
296
297                        schema_eval
298                    } else {
299                        vec![]
300                    };
301
302                    let mut schema_eval = if let Some(evaluators) =
303                        self.evaluators.get(&evaluation.schema_id)
304                    {
305                        let mut schema_eval = vec![];
306                        for (key, namespace) in evaluators {
307                            if namespace
308                                .is_ancestor_or_equal_of(&evaluation.namespace)
309                            {
310                                schema_eval.push(key.clone());
311                            }
312                        }
313
314                        schema_eval
315                    } else {
316                        vec![]
317                    };
318
319                    let quorum = if let Some(quorum_schema) =
320                        self.eval_quorum.get(&evaluation.schema_id)
321                    {
322                        quorum_schema.clone()
323                    } else {
324                        break 'data;
325                    };
326
327                    if schema_eval.is_empty() && all_eval.is_empty() {
328                        break 'data;
329                    }
330
331                    let mut evaluators = vec![];
332                    evaluators.append(&mut schema_eval);
333                    evaluators.append(&mut all_eval);
334
335                    debug!(
336                        msg_type = "SearchActualRoles",
337                        version = version,
338                        schema_id = %evaluation.schema_id,
339                        namespace = %evaluation.namespace,
340                        evaluators_count = evaluators.len(),
341                        has_approvers = approvers.is_some(),
342                        "Found actual roles successfully"
343                    );
344
345                    return Ok(RoleRegisterResponse::ActualRoles {
346                        evaluation: RoleDataRegister {
347                            workers: evaluators.iter().cloned().collect(),
348                            quorum,
349                        },
350                        approval: approvers,
351                    });
352                }
353
354                debug!(
355                    msg_type = "SearchActualRoles",
356                    version = version,
357                    schema_id = %evaluation.schema_id,
358                    namespace = %evaluation.namespace,
359                    "Missing role data for version"
360                );
361                Ok(RoleRegisterResponse::MissingData)
362            }
363            RoleRegisterMessage::SearchValidators { search, version } => {
364                if version > self.version {
365                    debug!(
366                        msg_type = "SearchValidators",
367                        version = version,
368                        current_version = self.version,
369                        schema_id = %search.schema_id,
370                        namespace = %search.namespace,
371                        "Request version exceeds current version"
372                    );
373                    return Ok(RoleRegisterResponse::OutOfVersion);
374                }
375
376                let mut all_val = if !search.schema_id.is_gov()
377                    && let Some(validators) =
378                        self.validators.get(&SchemaType::TrackerSchemas)
379                {
380                    // PublicKey, Namespace), (IntervalSet, Option<u64>
381                    let mut schema_val = vec![];
382                    for ((key, namespace), (interval, last)) in validators {
383                        if namespace.is_ancestor_or_equal_of(&search.namespace)
384                        {
385                            if let Some(last) = last
386                                && last <= &version
387                            {
388                                schema_val.push(key.clone());
389                            } else if interval.contains(version) {
390                                schema_val.push(key.clone());
391                            }
392                        }
393                    }
394
395                    schema_val
396                } else {
397                    vec![]
398                };
399
400                let mut schema_val = if let Some(validators) =
401                    self.validators.get(&search.schema_id)
402                {
403                    let mut schema_val = vec![];
404                    for ((key, namespace), (interval, last)) in validators {
405                        if namespace.is_ancestor_or_equal_of(&search.namespace)
406                        {
407                            if let Some(last) = last
408                                && last <= &version
409                            {
410                                schema_val.push(key.clone());
411                            } else if interval.contains(version) {
412                                schema_val.push(key.clone());
413                            }
414                        }
415                    }
416
417                    schema_val
418                } else {
419                    vec![]
420                };
421
422                'data: {
423                    let quorum = if let Some(quorum_schema) =
424                        self.vali_quorum.get(&search.schema_id)
425                    {
426                        let Some(quorum) =
427                            quorum_schema.get_prev_or_equal(version)
428                        else {
429                            break 'data;
430                        };
431
432                        quorum
433                    } else {
434                        break 'data;
435                    };
436
437                    if schema_val.is_empty() && all_val.is_empty() {
438                        break 'data;
439                    }
440
441                    let mut validators = vec![];
442                    validators.append(&mut schema_val);
443                    validators.append(&mut all_val);
444
445                    debug!(
446                        msg_type = "SearchValidators",
447                        version = version,
448                        schema_id = %search.schema_id,
449                        namespace = %search.namespace,
450                        validators_count = validators.len(),
451                        "Found validators successfully"
452                    );
453
454                    return Ok(RoleRegisterResponse::Validation(
455                        RoleDataRegister {
456                            workers: validators.iter().cloned().collect(),
457                            quorum,
458                        },
459                    ));
460                }
461
462                debug!(
463                    msg_type = "SearchValidators",
464                    version = version,
465                    schema_id = %search.schema_id,
466                    namespace = %search.namespace,
467                    "Missing validator data for version"
468                );
469                Ok(RoleRegisterResponse::MissingData)
470            }
471            RoleRegisterMessage::UpdateVersion { version } => {
472                if version > self.version || self.version == 0 {
473                    self.on_event(
474                        RoleRegisterEvent::UpdateVersion { version },
475                        ctx,
476                    )
477                    .await;
478
479                    debug!(
480                        msg_type = "UpdateVersion",
481                        version = version,
482                        "Roles register updated successfully"
483                    );
484                } else {
485                    debug!(
486                        msg_type = "UpdateVersion",
487                        version = version,
488                        current_version = self.version,
489                        "Update skipped, version not greater than current"
490                    );
491                }
492
493                Ok(RoleRegisterResponse::Ok)
494            }
495            RoleRegisterMessage::UpdateConfirm {
496                version,
497                new_approver,
498                remove_approver,
499                new_evaluator,
500                remove_evaluators,
501                new_validator,
502                remove_validators,
503            } => {
504                if version > self.version || self.version == 0 {
505                    self.on_event(
506                        RoleRegisterEvent::UpdateConfirm {
507                            version,
508                            new_approver,
509                            remove_approver,
510                            new_evaluator,
511                            remove_evaluators,
512                            new_validator,
513                            remove_validators,
514                        },
515                        ctx,
516                    )
517                    .await;
518
519                    debug!(
520                        msg_type = "UpdateConfirm",
521                        version = version,
522                        "Roles register updated successfully"
523                    );
524                } else {
525                    debug!(
526                        msg_type = "UpdateConfirm",
527                        version = version,
528                        current_version = self.version,
529                        "Update skipped, version not greater than current"
530                    );
531                }
532
533                Ok(RoleRegisterResponse::Ok)
534            }
535            RoleRegisterMessage::UpdateFact {
536                version,
537                appr_quorum,
538                eval_quorum,
539                vali_quorum,
540                new_approvers,
541                remove_approvers,
542                new_evaluators,
543                remove_evaluators,
544                new_validators,
545                remove_validators,
546            } => {
547                if version > self.version || self.version == 0 {
548                    self.on_event(
549                        RoleRegisterEvent::UpdateFact {
550                            version,
551                            appr_quorum,
552                            eval_quorum,
553                            vali_quorum,
554                            new_approvers,
555                            remove_approvers,
556                            new_evaluators,
557                            remove_evaluators,
558                            new_validators,
559                            remove_validators,
560                        },
561                        ctx,
562                    )
563                    .await;
564
565                    debug!(
566                        msg_type = "UpdateFact",
567                        version = version,
568                        "Roles register updated successfully"
569                    );
570                } else {
571                    debug!(
572                        msg_type = "UpdateFact",
573                        version = version,
574                        current_version = self.version,
575                        "Update skipped, version not greater than current"
576                    );
577                }
578
579                Ok(RoleRegisterResponse::Ok)
580            }
581        }
582    }
583
584    async fn on_event(
585        &mut self,
586        event: RoleRegisterEvent,
587        ctx: &mut ActorContext<Self>,
588    ) {
589        if let Err(e) = self.persist(&event, ctx).await {
590            let version = match &event {
591                RoleRegisterEvent::UpdateFact { version, .. } => *version,
592                RoleRegisterEvent::UpdateVersion { version } => *version,
593                RoleRegisterEvent::UpdateConfirm { version, .. } => *version,
594            };
595            error!(
596                version = version,
597                error = %e,
598                "Failed to persist role register event"
599            );
600            emit_fail(ctx, e).await;
601        }
602    }
603}
604
605#[async_trait]
606impl PersistentActor for RoleRegister {
607    type Persistence = LightPersistence;
608    type InitParams = ();
609
610    fn create_initial(_params: Self::InitParams) -> Self {
611        Self::default()
612    }
613
614    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
615        match event {
616            RoleRegisterEvent::UpdateVersion { version } => {
617                self.version = *version;
618            }
619            RoleRegisterEvent::UpdateConfirm {
620                version,
621                new_approver,
622                remove_approver,
623                new_evaluator,
624                remove_evaluators,
625                new_validator,
626                remove_validators,
627            } => {
628                self.version = *version;
629                if let Some(approver) = new_approver {
630                    self.approvers.insert(approver.clone());
631                }
632
633                if let Some(evaluator) = new_evaluator {
634                    self.evaluators
635                        .entry(SchemaType::Governance)
636                        .or_default()
637                        .insert((evaluator.clone(), Namespace::new()));
638                }
639
640                if let Some(validator) = new_validator {
641                    self.validators
642                        .entry(SchemaType::Governance)
643                        .or_default()
644                        .entry((validator.clone(), Namespace::new()))
645                        .or_default()
646                        .1 = Some(*version);
647                }
648
649                self.approvers.remove(remove_approver);
650
651                for ((schema_id, evaluator), namespaces) in
652                    remove_evaluators.iter()
653                {
654                    for ns in namespaces.iter() {
655                        self.evaluators
656                            .entry(schema_id.clone())
657                            .or_default()
658                            .remove(&(evaluator.clone(), ns.clone()));
659                    }
660                }
661
662                for ((schema_id, validator), namespaces) in
663                    remove_validators.iter()
664                {
665                    for ns in namespaces.iter() {
666                        let (interval, last) = self
667                            .validators
668                            .entry(schema_id.clone())
669                            .or_default()
670                            .entry((validator.clone(), ns.clone()))
671                            .or_default();
672                        if let Some(last) = last.take() {
673                            interval.insert(Interval {
674                                lo: last,
675                                hi: *version - 1,
676                            });
677                        }
678                    }
679                }
680
681                debug!(
682                    event_type = "UpdateFact",
683                    version = version,
684                    new_approver = new_approver.is_some(),
685                    remove_approvers_count = 1,
686                    new_evaluator = new_evaluator.is_some(),
687                    remove_evaluators_count = remove_evaluators.len(),
688                    new_validator = new_validator.is_some(),
689                    remove_validators_count = remove_validators.len(),
690                    "Role register state updated"
691                );
692            }
693            RoleRegisterEvent::UpdateFact {
694                version,
695                appr_quorum,
696                eval_quorum,
697                vali_quorum,
698                new_approvers,
699                remove_approvers,
700                new_evaluators,
701                remove_evaluators,
702                new_validators,
703                remove_validators,
704            } => {
705                self.version = *version;
706
707                if let Some(appr_quorum) = appr_quorum {
708                    self.appr_quorum = appr_quorum.clone();
709                }
710
711                for (schema_id, quorum) in vali_quorum.iter() {
712                    self.vali_quorum
713                        .entry(schema_id.clone())
714                        .or_default()
715                        .insert(*version, quorum.clone());
716                }
717
718                for (schema_id, quorum) in eval_quorum.iter() {
719                    self.eval_quorum.insert(schema_id.clone(), quorum.clone());
720                }
721
722                for approver in new_approvers.iter() {
723                    self.approvers.insert(approver.clone());
724                }
725
726                for approver in remove_approvers.iter() {
727                    self.approvers.remove(approver);
728                }
729
730                for ((schema_id, evaluator), namespaces) in
731                    new_evaluators.iter()
732                {
733                    for ns in namespaces.iter() {
734                        self.evaluators
735                            .entry(schema_id.clone())
736                            .or_default()
737                            .insert((evaluator.clone(), ns.clone()));
738                    }
739                }
740
741                for ((schema_id, evaluator), namespaces) in
742                    remove_evaluators.iter()
743                {
744                    for ns in namespaces.iter() {
745                        self.evaluators
746                            .entry(schema_id.clone())
747                            .or_default()
748                            .remove(&(evaluator.clone(), ns.clone()));
749                    }
750                }
751
752                for ((schema_id, validator), namespaces) in
753                    new_validators.iter()
754                {
755                    for ns in namespaces.iter() {
756                        self.validators
757                            .entry(schema_id.clone())
758                            .or_default()
759                            .entry((validator.clone(), ns.clone()))
760                            .or_default()
761                            .1 = Some(*version);
762                    }
763                }
764
765                for ((schema_id, validator), namespaces) in
766                    remove_validators.iter()
767                {
768                    for ns in namespaces.iter() {
769                        let (interval, last) = self
770                            .validators
771                            .entry(schema_id.clone())
772                            .or_default()
773                            .entry((validator.clone(), ns.clone()))
774                            .or_default();
775                        if let Some(last) = last.take() {
776                            interval.insert(Interval {
777                                lo: last,
778                                hi: *version - 1,
779                            });
780                        }
781                    }
782                }
783
784                debug!(
785                    event_type = "UpdateFact",
786                    version = version,
787                    new_approvers_count = new_approvers.len(),
788                    remove_approvers_count = remove_approvers.len(),
789                    new_evaluators_count = new_evaluators.len(),
790                    remove_evaluators_count = remove_evaluators.len(),
791                    new_validators_count = new_validators.len(),
792                    remove_validators_count = remove_validators.len(),
793                    "Role register state updated"
794                );
795            }
796        }
797        Ok(())
798    }
799}
800
// Marker impl: opts the register into the crate's `Storable` trait with its
// default behaviour (no methods overridden here).
impl Storable for RoleRegister {}