1use crate::{
5 approval::{
6 persist::{ApprPersist, InitApprPersist},
7 types::VotationType,
8 },
9 db::Storable,
10 evaluation::{
11 compiler::{
12 CompilerResponse, ContractCompiler, ContractCompilerMessage,
13 },
14 request::EvalWorkerContext,
15 schema::{EvaluationSchema, EvaluationSchemaMessage},
16 worker::{EvalWorker, EvalWorkerMessage},
17 },
18 governance::{
19 contract_register::{
20 ContractRegister, ContractRegisterMessage, ContractRegisterResponse,
21 },
22 data::GovernanceData,
23 events::{
24 GovernanceEvent, governance_event_roles_update_fact,
25 governance_event_update_creator_change,
26 },
27 model::{
28 CreatorQuantity, CreatorWitness, HashThisRole, ProtocolTypes,
29 Quorum, RoleCreator, RoleTypes, Schema, WitnessesData,
30 },
31 role_register::{
32 CurrentValidationRoles, RoleRegister, RoleRegisterMessage,
33 RoleRegisterResponse,
34 },
35 sn_register::{SnRegister, SnRegisterMessage, SnRegisterResponse},
36 subject_register::{
37 SubjectRegister, SubjectRegisterMessage, SubjectRegisterResponse,
38 },
39 tracker_sync::{TrackerSync, TrackerSyncConfig},
40 version_sync::{GovernanceVersionSync, GovernanceVersionSyncMessage},
41 witnesses_register::{
42 CreatorWitnessGrant, CreatorWitnessRegistration, WitnessesRegister,
43 WitnessesRegisterMessage, WitnessesRegisterResponse, WitnessesType,
44 },
45 },
46 helpers::{db::ExternalDB, network::service::NetworkSender, sink::AveSink},
47 model::{
48 common::{
49 emit_fail, get_last_event, purge_storage, subject::make_obsolete,
50 },
51 event::{Ledger, Protocols, ValidationMetadata},
52 },
53 node::{Node, NodeMessage, TransferSubject, register::RegisterMessage},
54 subject::{
55 DataForSink, EventLedgerDataForSink, Metadata, Subject,
56 SubjectMetadata,
57 error::SubjectError,
58 sinkdata::{SinkData, SinkDataMessage},
59 },
60 system::ConfigHelper,
61 validation::{
62 request::LastData,
63 schema::{ValidationSchema, ValidationSchemaMessage},
64 worker::{ValiWorker, ValiWorkerMessage},
65 },
66};
67
68use ave_actors::{
69 Actor, ActorContext, ActorError, ActorPath, ActorRef, ChildAction, Handler,
70 Message, Response, Sink,
71};
72use ave_common::{
73 Namespace, SchemaType, ValueWrapper,
74 identity::{DigestIdentifier, HashAlgorithm, PublicKey},
75 request::EventRequest,
76 response::SubjectDB,
77 schematype::ReservedWords,
78};
79
80use async_trait::async_trait;
81use ave_actors::{FullPersistence, PersistentActor};
82use borsh::{BorshDeserialize, BorshSerialize};
83use json_patch::{Patch, patch};
84use serde::{Deserialize, Serialize};
85use tracing::{Span, debug, error, info_span, warn};
86
87use std::{
88 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
89 sync::Arc,
90 time::Duration,
91};
92use tokio::{fs, sync::RwLock};
93use wasmtime::Module;
94
95pub mod contract_register;
96pub mod data;
97pub mod error;
98pub mod events;
99pub mod model;
100pub mod role_register;
101pub mod sn_register;
102pub mod subject_register;
103pub mod tracker_sync;
104pub mod version_sync;
105pub mod witnesses_register;
106
107type SchemaCreatorsByNamespace =
108 BTreeMap<SchemaType, BTreeMap<PublicKey, BTreeSet<Namespace>>>;
109type SchemaIssuersByNamespace =
110 BTreeMap<SchemaType, (BTreeMap<PublicKey, BTreeSet<Namespace>>, bool)>;
111
112struct SchemaRuntimeRoles<'a> {
113 creators_eval: &'a SchemaCreatorsByNamespace,
114 issuers_eval: &'a SchemaIssuersByNamespace,
115 creators_vali: &'a SchemaCreatorsByNamespace,
116}
117
118pub struct RolesUpdate {
119 pub appr_quorum: Option<Quorum>,
120 pub new_approvers: Vec<PublicKey>,
121 pub remove_approvers: Vec<PublicKey>,
122
123 pub eval_quorum: HashMap<SchemaType, Quorum>,
124 pub new_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
125 pub remove_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
126
127 pub vali_quorum: HashMap<SchemaType, Quorum>,
128 pub new_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
129 pub remove_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
130
131 pub new_creator: HashMap<
132 (SchemaType, String, PublicKey),
133 (CreatorQuantity, Vec<WitnessesType>),
134 >,
135 pub remove_creator: HashSet<(SchemaType, String, PublicKey)>,
136
137 pub new_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
138 pub remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
139}
140
141pub struct RolesUpdateConfirm {
142 pub new_approver: Option<PublicKey>,
143 pub remove_approver: PublicKey,
144
145 pub new_evaluator: Option<PublicKey>,
146 pub remove_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
147
148 pub new_validator: Option<PublicKey>,
149 pub remove_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
150
151 pub remove_creator: HashSet<(SchemaType, String, PublicKey)>,
152 pub remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
153}
154
155pub struct RolesUpdateRemove {
156 pub witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
157 pub creator: HashSet<(SchemaType, String, PublicKey)>,
158 pub approvers: Vec<PublicKey>,
159 pub evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
160 pub validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
161}
162
163pub struct CreatorRoleUpdate {
164 pub new_creator: HashMap<
165 (SchemaType, String, PublicKey),
166 (CreatorQuantity, BTreeSet<CreatorWitness>),
167 >,
168
169 pub update_creator_quantity:
170 HashSet<(SchemaType, String, PublicKey, CreatorQuantity)>,
171
172 pub update_creator_witnesses:
173 HashSet<(SchemaType, String, PublicKey, BTreeSet<CreatorWitness>)>,
174
175 pub remove_creator: HashSet<(SchemaType, String, PublicKey)>,
176}
177
178#[derive(Default, Debug, Serialize, Deserialize, Clone)]
179pub struct Governance {
180 #[serde(skip)]
181 pub our_key: Arc<PublicKey>,
182 #[serde(skip)]
183 pub service: bool,
184 #[serde(skip)]
185 pub hash: Option<HashAlgorithm>,
186 pub subject_metadata: SubjectMetadata,
187 pub properties: GovernanceData,
188}
189
190impl BorshSerialize for Governance {
191 fn serialize<W: std::io::Write>(
192 &self,
193 writer: &mut W,
194 ) -> std::io::Result<()> {
195 BorshSerialize::serialize(&self.subject_metadata, writer)?;
197 BorshSerialize::serialize(&self.properties, writer)?;
198
199 Ok(())
200 }
201}
202
203impl BorshDeserialize for Governance {
204 fn deserialize_reader<R: std::io::Read>(
205 reader: &mut R,
206 ) -> std::io::Result<Self> {
207 let subject_metadata = SubjectMetadata::deserialize_reader(reader)?;
209 let properties = GovernanceData::deserialize_reader(reader)?;
210
211 let our_key = Arc::new(PublicKey::default());
214 let hash = None;
215
216 Ok(Self {
217 hash,
218 our_key,
219 service: false,
220 subject_metadata,
221 properties,
222 })
223 }
224}
225
226#[async_trait]
227impl Subject for Governance {
228 async fn update_sn(
229 &self,
230 ctx: &mut ActorContext<Self>,
231 ) -> Result<(), ActorError> {
232 let witnesses_register = ctx
233 .get_child::<WitnessesRegister>("witnesses_register")
234 .await?;
235
236 witnesses_register
237 .tell(WitnessesRegisterMessage::UpdateSnGov {
238 sn: self.subject_metadata.sn,
239 })
240 .await
241 }
242
243 async fn eol(
244 &self,
245 ctx: &mut ActorContext<Self>,
246 ) -> Result<(), ActorError> {
247 let node_path = ActorPath::from("/user/node");
248 let node = ctx.system().get_actor::<Node>(&node_path).await?;
249 node.tell(NodeMessage::EOLSubject {
250 subject_id: self.subject_metadata.subject_id.clone(),
251 i_owner: *self.our_key == self.subject_metadata.owner,
252 })
253 .await
254 }
255
256 async fn reject(
257 &self,
258 ctx: &mut ActorContext<Self>,
259 _gov_version: u64,
260 ) -> Result<(), ActorError> {
261 let node_path = ActorPath::from("/user/node");
262 let node = ctx.system().get_actor::<Node>(&node_path).await?;
263 node.tell(NodeMessage::RejectTransfer(
264 self.subject_metadata.subject_id.clone(),
265 ))
266 .await
267 }
268
269 async fn confirm(
270 &self,
271 ctx: &mut ActorContext<Self>,
272 _new_owner: PublicKey,
273 _gov_version: u64,
274 ) -> Result<(), ActorError> {
275 let node_path = ActorPath::from("/user/node");
276 let node = ctx.system().get_actor::<Node>(&node_path).await?;
277 node.tell(NodeMessage::ConfirmTransfer(
278 self.subject_metadata.subject_id.clone(),
279 ))
280 .await
281 }
282
283 async fn transfer(
284 &self,
285 ctx: &mut ActorContext<Self>,
286 new_owner: PublicKey,
287 _gov_version: u64,
288 ) -> Result<(), ActorError> {
289 let node_path = ActorPath::from("/user/node");
290 let node = ctx.system().get_actor::<Node>(&node_path).await?;
291 node.tell(NodeMessage::TransferSubject(TransferSubject {
292 name: self.subject_metadata.name.clone(),
293 subject_id: self.subject_metadata.subject_id.clone(),
294 new_owner: new_owner.clone(),
295 actual_owner: self.subject_metadata.owner.clone(),
296 }))
297 .await
298 }
299
300 async fn get_last_ledger(
301 &self,
302 ctx: &mut ActorContext<Self>,
303 ) -> Result<Option<Ledger>, ActorError> {
304 get_last_event(ctx).await
305 }
306
307 fn apply_patch(
308 &mut self,
309 json_patch: ValueWrapper,
310 ) -> Result<(), ActorError> {
311 let patch_json = serde_json::from_value::<Patch>(json_patch.0)
312 .map_err(|e| {
313 let error = SubjectError::PatchConversionFailed {
314 details: e.to_string(),
315 };
316 error!(
317 error = %e,
318 subject_id = %self.subject_metadata.subject_id,
319 "Failed to convert patch from JSON"
320 );
321 ActorError::Functional {
322 description: error.to_string(),
323 }
324 })?;
325
326 let mut properties = self.properties.to_value_wrapper();
327
328 patch(&mut properties.0, &patch_json).map_err(|e| {
329 let error = SubjectError::PatchApplicationFailed {
330 details: e.to_string(),
331 };
332 error!(
333 error = %e,
334 subject_id = %self.subject_metadata.subject_id,
335 "Failed to apply patch to properties"
336 );
337 ActorError::Functional {
338 description: error.to_string(),
339 }
340 })?;
341
342 self.properties = serde_json::from_value::<GovernanceData>(
343 properties.0,
344 )
345 .map_err(|e| {
346 let error = SubjectError::GovernanceDataConversionFailed {
347 details: e.to_string(),
348 };
349 error!(
350 error = %e,
351 subject_id = %self.subject_metadata.subject_id,
352 "Failed to convert properties to GovernanceData"
353 );
354 ActorError::Functional {
355 description: error.to_string(),
356 }
357 })?;
358
359 debug!(
360 subject_id = %self.subject_metadata.subject_id,
361 "Patch applied successfully"
362 );
363
364 Ok(())
365 }
366
367 async fn manager_new_ledger_events(
368 &mut self,
369 ctx: &mut ActorContext<Self>,
370 events: Vec<Ledger>,
371 ) -> Result<(), ActorError> {
372 let Some(network) = ctx
373 .system()
374 .get_helper::<Arc<NetworkSender>>("network")
375 .await
376 else {
377 return Err(ActorError::Helper {
378 name: "network".to_owned(),
379 reason: "Not found".to_owned(),
380 });
381 };
382
383 let Some(hash) = self.hash else {
384 return Err(ActorError::FunctionalCritical {
385 description: "Hash algorithm is None".to_string(),
386 });
387 };
388
389 let current_sn = self.subject_metadata.sn;
390 let current_new_owner_some = self.subject_metadata.new_owner.is_some();
391 let i_current_new_owner = self.subject_metadata.new_owner.clone()
392 == Some((*self.our_key).clone());
393 let current_owner = self.subject_metadata.owner.clone();
394
395 let current_properties = self.properties.clone();
396
397 if let Err(e) = self.verify_new_ledger_events(ctx, events, &hash).await
398 {
399 if let ActorError::Functional { description } = e.clone() {
400 warn!(
401 error = %description,
402 subject_id = %self.subject_metadata.subject_id,
403 sn = self.subject_metadata.sn,
404 "Error verifying new ledger events"
405 );
406
407 if self.subject_metadata.sn == 0 {
409 return Err(e);
410 }
411 } else {
412 error!(
413 error = %e,
414 subject_id = %self.subject_metadata.subject_id,
415 sn = self.subject_metadata.sn,
416 "Critical error verifying new ledger events"
417 );
418 return Err(e);
419 }
420 };
421
422 if current_sn < self.subject_metadata.sn {
423 let old_gov = current_properties;
424 if !self.subject_metadata.active {
425 if current_owner == *self.our_key {
426 Self::down_owner(ctx).await?;
427 } else {
428 Self::down_not_owner(ctx, &old_gov, self.our_key.clone())
429 .await?;
430 }
431
432 let old_schemas_eval = old_gov
433 .schemas_name(ProtocolTypes::Evaluation, &self.our_key);
434
435 Self::down_compilers_schemas(
436 ctx,
437 &old_schemas_eval,
438 &self.subject_metadata.subject_id,
439 )
440 .await?;
441
442 let old_schemas_val = old_gov
443 .schemas_name(ProtocolTypes::Validation, &self.our_key);
444
445 Self::down_schemas(ctx, &old_schemas_eval, &old_schemas_val)
446 .await?;
447 } else {
448 let new_owner_some = self.subject_metadata.new_owner.is_some();
449 let i_new_owner = self.subject_metadata.new_owner.clone()
450 == Some((*self.our_key).clone());
451 let mut up_not_owner: bool = false;
452 let mut up_owner: bool = false;
453
454 if current_owner == *self.our_key {
455 if current_owner != self.subject_metadata.owner {
457 if !current_new_owner_some && !i_new_owner {
459 up_not_owner = true;
461 } else if current_new_owner_some && i_new_owner {
462 up_owner = true;
463 }
464 } else {
465 if current_new_owner_some && !new_owner_some {
467 up_owner = true;
468 } else if !current_new_owner_some && new_owner_some {
469 up_not_owner = true;
470 }
471 }
472 } else {
473 if current_owner != self.subject_metadata.owner
475 && self.subject_metadata.owner == *self.our_key
476 {
477 if !new_owner_some && !i_current_new_owner {
479 up_owner = true;
481 } else if new_owner_some && i_current_new_owner {
482 up_not_owner = true;
483 }
484 } else if i_current_new_owner && !i_new_owner {
485 up_not_owner = true;
486 } else if !i_current_new_owner && i_new_owner {
487 up_owner = true;
488 }
489 }
490
491 if up_not_owner {
492 Self::down_owner(ctx).await?;
493 self.up_not_owner(ctx, &hash, &network).await?;
494 } else if up_owner {
495 Self::down_not_owner(ctx, &old_gov, self.our_key.clone())
496 .await?;
497 self.up_owner(ctx, &hash, &network).await?;
498 }
499
500 if !up_not_owner
503 && !up_owner
504 && *self.our_key != self.subject_metadata.owner
505 {
506 self.up_down_not_owner(ctx, &old_gov, &hash, &network)
507 .await?;
508 }
509
510 self.manager_schemas_compilers(ctx, &old_gov).await?;
511 self.update_childs(ctx).await?;
512 }
513
514 let _ = make_obsolete(ctx, &self.subject_metadata.subject_id).await;
515 }
516
517 if current_sn < self.subject_metadata.sn || current_sn == 0 {
518 Self::publish_sink(
519 ctx,
520 SinkDataMessage::UpdateState(Box::new(SubjectDB::from(
521 self.clone(),
522 ))),
523 )
524 .await?;
525
526 self.update_sn(ctx).await?;
527 self.refresh_version_sync(ctx).await?;
528 }
529
530 Ok(())
531 }
532}
533
534impl Governance {
535 async fn ledger_batch_size(
536 ctx: &ActorContext<Self>,
537 ) -> Result<usize, ActorError> {
538 let Some(config) =
539 ctx.system().get_helper::<ConfigHelper>("config").await
540 else {
541 return Err(ActorError::Helper {
542 name: "config".to_string(),
543 reason: "Not Found".to_string(),
544 });
545 };
546
547 Ok(config.ledger_batch_size)
548 }
549
550 async fn up_approver_only(
551 &self,
552 ctx: &mut ActorContext<Self>,
553 hash: &HashAlgorithm,
554 network: &Arc<NetworkSender>,
555 ) -> Result<(), ActorError> {
556 if !self.properties.has_this_role(HashThisRole::Gov {
557 who: (*self.our_key).clone(),
558 role: RoleTypes::Approver,
559 }) {
560 return Ok(());
561 }
562
563 let always_accept = if let Some(config) =
564 ctx.system().get_helper::<ConfigHelper>("config").await
565 {
566 config.always_accept
567 } else {
568 return Err(ActorError::Helper {
569 name: "config".to_string(),
570 reason: "Not Found".to_string(),
571 });
572 };
573
574 let pass_votation = if always_accept {
575 VotationType::AlwaysAccept
576 } else {
577 VotationType::Manual
578 };
579
580 let owner = *self.our_key == self.subject_metadata.owner;
581 let i_new_owner =
582 self.subject_metadata.new_owner == Some((*self.our_key).clone());
583
584 let node_key = if (owner && self.subject_metadata.new_owner.is_none())
585 || i_new_owner
586 {
587 (*self.our_key).clone()
588 } else {
589 self.subject_metadata
590 .new_owner
591 .clone()
592 .unwrap_or_else(|| self.subject_metadata.owner.clone())
593 };
594
595 let init_approver = InitApprPersist {
596 our_key: self.our_key.clone(),
597 node_key,
598 subject_id: self.subject_metadata.subject_id.clone(),
599 pass_votation,
600 helpers: (*hash, network.clone()),
601 };
602
603 ctx.create_child("approver", ApprPersist::initial(init_approver))
604 .await?;
605
606 Ok(())
607 }
608
609 async fn current_validation_roles(
610 &self,
611 ctx: &ActorContext<Self>,
612 schema_id: SchemaType,
613 ) -> Result<CurrentValidationRoles, ActorError> {
614 let actor = ctx.get_child::<RoleRegister>("role_register").await?;
615 let response = actor
616 .ask(RoleRegisterMessage::GetCurrentValidationRoles { schema_id })
617 .await?;
618
619 match response {
620 RoleRegisterResponse::CurrentValidationRoles(roles) => Ok(roles),
621 _ => Err(ActorError::UnexpectedResponse {
622 path: ActorPath::from(format!(
623 "/user/node/subject_manager/{}/role_register",
624 self.subject_metadata.subject_id
625 )),
626 expected: "RoleRegisterResponse::CurrentValidationRoles"
627 .to_owned(),
628 }),
629 }
630 }
631
632 async fn refresh_version_sync(
633 &self,
634 ctx: &ActorContext<Self>,
635 ) -> Result<(), ActorError> {
636 if !self.service {
637 return Ok(());
638 }
639
640 let version_sync = ctx
641 .get_child::<GovernanceVersionSync>("version_sync")
642 .await?;
643 let governance_peers = self
644 .properties
645 .get_witnesses(WitnessesData::Gov)
646 .map_err(|e| ActorError::Functional {
647 description: e.to_string(),
648 })?;
649
650 let _ = version_sync
651 .ask(GovernanceVersionSyncMessage::RefreshGovernance {
652 version: self.properties.version,
653 governance_peers,
654 })
655 .await?;
656
657 Ok(())
658 }
659
660 async fn update_schemas(
661 &self,
662 ctx: &ActorContext<Self>,
663 schema_creators_eval: &BTreeMap<
664 SchemaType,
665 BTreeMap<PublicKey, BTreeSet<Namespace>>,
666 >,
667 schema_issuers_eval: &BTreeMap<
668 SchemaType,
669 (BTreeMap<PublicKey, BTreeSet<Namespace>>, bool),
670 >,
671 schema_creators_vali: &BTreeMap<
672 SchemaType,
673 BTreeMap<PublicKey, BTreeSet<Namespace>>,
674 >,
675 update_eval: &BTreeMap<SchemaType, ValueWrapper>,
676 update_vali: &BTreeMap<SchemaType, ValueWrapper>,
677 ) -> Result<(), ActorError> {
678 for (schema_id, init_state) in update_eval.iter() {
679 let actor = ctx
680 .get_child::<EvaluationSchema>(&format!(
681 "{}_evaluation",
682 schema_id
683 ))
684 .await?;
685
686 actor
687 .tell(EvaluationSchemaMessage::Update {
688 members: self
689 .properties
690 .members
691 .values()
692 .cloned()
693 .collect(),
694 creators: schema_creators_eval
695 .get(schema_id)
696 .cloned()
697 .unwrap_or_default(),
698 issuers: schema_issuers_eval
699 .get(schema_id)
700 .map(|(issuers, _)| issuers.clone())
701 .unwrap_or_default(),
702 issuer_any: schema_issuers_eval
703 .get(schema_id)
704 .map(|(_, issuer_any)| *issuer_any)
705 .unwrap_or(false),
706 schema_viewpoints: self
707 .properties
708 .schemas
709 .get(schema_id)
710 .map(|schema| schema.viewpoints.clone())
711 .unwrap_or_default(),
712 sn: self.subject_metadata.sn,
713 gov_version: self.properties.version,
714 init_state: init_state.clone(),
715 })
716 .await?;
717 }
718
719 for (schema_id, init_state) in update_vali.iter() {
720 let current_roles = self
721 .current_validation_roles(ctx, schema_id.clone())
722 .await?;
723 let actor = ctx
724 .get_child::<ValidationSchema>(&format!(
725 "{}_validation",
726 schema_id
727 ))
728 .await?;
729
730 actor
731 .tell(ValidationSchemaMessage::Update {
732 creators: schema_creators_vali
733 .get(schema_id)
734 .cloned()
735 .unwrap_or_default(),
736 sn: self.subject_metadata.sn,
737 gov_version: self.properties.version,
738 init_state: init_state.clone(),
739 current_roles: current_roles.schema,
740 })
741 .await?;
742 }
743
744 Ok(())
745 }
746
747 async fn down_schemas(
748 ctx: &ActorContext<Self>,
749 old_schemas_eval: &BTreeSet<SchemaType>,
750 old_schemas_val: &BTreeSet<SchemaType>,
751 ) -> Result<(), ActorError> {
752 for schema_id in old_schemas_eval {
753 let actor = ctx
754 .get_child::<EvaluationSchema>(&format!(
755 "{}_evaluation",
756 schema_id
757 ))
758 .await?;
759 actor.ask_stop().await?;
760 }
761
762 for schema_id in old_schemas_val {
763 let actor = ctx
764 .get_child::<ValidationSchema>(&format!(
765 "{}_validation",
766 schema_id
767 ))
768 .await?;
769 actor.ask_stop().await?;
770 }
771
772 Ok(())
773 }
774
775 async fn up_schemas(
776 &self,
777 ctx: &mut ActorContext<Self>,
778 schema_roles: SchemaRuntimeRoles<'_>,
779 up_eval: &BTreeMap<SchemaType, ValueWrapper>,
780 up_vali: &BTreeMap<SchemaType, ValueWrapper>,
781 hash_network: (&HashAlgorithm, &Arc<NetworkSender>),
782 ) -> Result<(), ActorError> {
783 for (schema_id, init_state) in up_eval.iter() {
784 let eval_actor = EvaluationSchema {
785 our_key: self.our_key.clone(),
786 governance_id: self.subject_metadata.subject_id.clone(),
787 gov_version: self.properties.version,
788 sn: self.subject_metadata.sn,
789 members: self.properties.members.values().cloned().collect(),
790 creators: schema_roles
791 .creators_eval
792 .get(schema_id)
793 .cloned()
794 .unwrap_or_default(),
795 issuers: schema_roles
796 .issuers_eval
797 .get(schema_id)
798 .map(|(issuers, _)| issuers.clone())
799 .unwrap_or_default(),
800 issuer_any: schema_roles
801 .issuers_eval
802 .get(schema_id)
803 .map(|(_, issuer_any)| *issuer_any)
804 .unwrap_or(false),
805 schema_viewpoints: self
806 .properties
807 .schemas
808 .get(schema_id)
809 .map(|schema| schema.viewpoints.clone())
810 .unwrap_or_default(),
811 schema_id: schema_id.clone(),
812 init_state: init_state.clone(),
813 hash: *hash_network.0,
814 network: hash_network.1.clone(),
815 };
816
817 ctx.create_child(&format!("{}_evaluation", schema_id), eval_actor)
818 .await?;
819 }
820
821 for (schema_id, init_state) in up_vali.iter() {
822 let current_roles = self
823 .current_validation_roles(ctx, schema_id.clone())
824 .await?;
825 let vali_actor = ValidationSchema {
826 our_key: self.our_key.clone(),
827 governance_id: self.subject_metadata.subject_id.clone(),
828 gov_version: self.properties.version,
829 sn: self.subject_metadata.sn,
830 creators: schema_roles
831 .creators_vali
832 .get(schema_id)
833 .cloned()
834 .unwrap_or_default(),
835 schema_id: schema_id.clone(),
836 init_state: init_state.clone(),
837 current_roles: current_roles.schema,
838 hash: *hash_network.0,
839 network: hash_network.1.clone(),
840 };
841
842 ctx.create_child(&format!("{}_validation", schema_id), vali_actor)
843 .await?;
844 }
845
846 Ok(())
847 }
848
849 async fn manager_schemas_compilers(
850 &self,
851 ctx: &mut ActorContext<Self>,
852 old_gov: &GovernanceData,
853 ) -> Result<(), ActorError> {
854 let Some(network) = ctx
855 .system()
856 .get_helper::<Arc<NetworkSender>>("network")
857 .await
858 else {
859 return Err(ActorError::Helper {
860 name: "network".to_owned(),
861 reason: "Not found".to_owned(),
862 });
863 };
864
865 let Some(hash) = self.hash else {
866 return Err(ActorError::FunctionalCritical {
867 description: "Hash algorithm is None".to_string(),
868 });
869 };
870
871 let (old_schemas_eval, new_schemas_eval) = {
872 let old_schemas_eval =
873 old_gov.schemas_name(ProtocolTypes::Evaluation, &self.our_key);
874
875 let new_schemas_eval = self
876 .properties
877 .schemas(ProtocolTypes::Evaluation, &self.our_key);
878
879 let down = old_schemas_eval
882 .clone()
883 .iter()
884 .filter(|x| !new_schemas_eval.contains_key(x))
885 .cloned()
886 .collect();
887 Self::down_compilers_schemas(
888 ctx,
889 &down,
890 &self.subject_metadata.subject_id,
891 )
892 .await?;
893
894 let up = new_schemas_eval
896 .clone()
897 .iter()
898 .filter(|x| !old_schemas_eval.contains(x.0))
899 .map(|x| (x.0.clone(), x.1.clone()))
900 .collect();
901
902 Self::up_compilers_schemas(
903 ctx,
904 &up,
905 self.subject_metadata.subject_id.clone(),
906 &hash,
907 )
908 .await?;
909
910 let current = new_schemas_eval
912 .clone()
913 .iter()
914 .filter(|x| old_schemas_eval.contains(x.0))
915 .map(|x| (x.0.clone(), x.1.clone()))
916 .collect();
917
918 Self::compile_schemas(
919 ctx,
920 current,
921 self.subject_metadata.subject_id.clone(),
922 )
923 .await?;
924
925 (
926 old_schemas_eval,
927 new_schemas_eval
928 .iter()
929 .map(|x| (x.0.clone(), x.1.initial_value.clone()))
930 .collect::<BTreeMap<SchemaType, ValueWrapper>>(),
931 )
932 };
933 let old_schemas_vali =
934 old_gov.schemas_name(ProtocolTypes::Validation, &self.our_key);
935
936 let new_schemas_vali = self
937 .properties
938 .schemas_init_value(ProtocolTypes::Validation, &self.our_key);
939
940 let down_eval = old_schemas_eval
942 .clone()
943 .iter()
944 .filter(|x| !new_schemas_eval.contains_key(x))
945 .cloned()
946 .collect();
947
948 let down_vali = old_schemas_vali
949 .clone()
950 .iter()
951 .filter(|x| !new_schemas_vali.contains_key(x))
952 .cloned()
953 .collect();
954
955 Self::down_schemas(ctx, &down_eval, &down_vali).await?;
956
957 let schemas_namespace_eval = self
959 .properties
960 .schemas_namespace(ProtocolTypes::Evaluation, &self.our_key);
961
962 let schema_creators_eval = self
963 .properties
964 .schema_creators_namespace(schemas_namespace_eval.clone());
965 let schema_issuers_eval = self
966 .properties
967 .schema_issuers_namespace(schemas_namespace_eval);
968
969 let up_eval = new_schemas_eval
970 .clone()
971 .iter()
972 .filter(|x| !old_schemas_eval.contains(x.0))
973 .map(|x| (x.0.clone(), x.1.clone()))
974 .collect::<BTreeMap<SchemaType, ValueWrapper>>();
975
976 let schemas_namespace_vali = self
977 .properties
978 .schemas_namespace(ProtocolTypes::Validation, &self.our_key);
979
980 let schema_creators_vali = self
981 .properties
982 .schema_creators_namespace(schemas_namespace_vali);
983
984 let up_vali = new_schemas_vali
985 .clone()
986 .iter()
987 .filter(|x| !old_schemas_vali.contains(x.0))
988 .map(|x| (x.0.clone(), x.1.clone()))
989 .collect::<BTreeMap<SchemaType, ValueWrapper>>();
990 self.up_schemas(
992 ctx,
993 SchemaRuntimeRoles {
994 creators_eval: &schema_creators_eval,
995 issuers_eval: &schema_issuers_eval,
996 creators_vali: &schema_creators_vali,
997 },
998 &up_eval,
999 &up_vali,
1000 (&hash, &network),
1001 )
1002 .await?;
1003
1004 let update_eval = new_schemas_eval
1006 .clone()
1007 .iter()
1008 .filter(|x| old_schemas_eval.contains(x.0))
1009 .map(|x| (x.0.clone(), x.1.clone()))
1010 .collect::<BTreeMap<SchemaType, ValueWrapper>>();
1011
1012 let update_vali = new_schemas_vali
1013 .clone()
1014 .iter()
1015 .filter(|x| old_schemas_vali.contains(x.0))
1016 .map(|x| (x.0.clone(), x.1.clone()))
1017 .collect::<BTreeMap<SchemaType, ValueWrapper>>();
1018
1019 self.update_schemas(
1020 ctx,
1021 &schema_creators_eval,
1022 &schema_issuers_eval,
1023 &schema_creators_vali,
1024 &update_eval,
1025 &update_vali,
1026 )
1027 .await
1028 }
1029
1030 async fn update_childs(
1031 &self,
1032 ctx: &ActorContext<Self>,
1033 ) -> Result<(), ActorError> {
1034 if let Ok(evaluator) = ctx.get_child::<EvalWorker>("evaluator").await {
1035 let (issuers, issuer_any) = self.properties.governance_issuers();
1036 evaluator
1037 .tell(EvalWorkerMessage::Update {
1038 gov_version: self.properties.version,
1039 issuers,
1040 issuer_any,
1041 })
1042 .await?;
1043 }
1044
1045 if let Ok(validator) = ctx.get_child::<ValiWorker>("validator").await {
1046 let current_roles = self
1047 .current_validation_roles(ctx, SchemaType::Governance)
1048 .await?;
1049 validator
1050 .tell(ValiWorkerMessage::UpdateCurrentRoles {
1051 gov_version: self.properties.version,
1052 current_roles: crate::validation::worker::CurrentWorkerRoles {
1053 approval: current_roles.approval,
1054 evaluation: crate::governance::role_register::RoleDataRegister {
1055 workers: current_roles
1056 .schema
1057 .evaluation
1058 .iter()
1059 .map(|role| role.key.clone())
1060 .collect(),
1061 quorum: current_roles.schema.evaluation_quorum,
1062 },
1063 },
1064 })
1065 .await?;
1066 }
1067
1068 Ok(())
1069 }
1070
1071 async fn sweep_contract_artifacts(
1072 &self,
1073 ctx: &ActorContext<Self>,
1074 schemas: &BTreeMap<SchemaType, Schema>,
1075 ) -> Result<(), ActorError> {
1076 let Some(config) =
1077 ctx.system().get_helper::<ConfigHelper>("config").await
1078 else {
1079 return Err(ActorError::Helper {
1080 name: "config".to_string(),
1081 reason: "Not Found".to_string(),
1082 });
1083 };
1084
1085 let Some(contracts) = ctx
1086 .system()
1087 .get_helper::<Arc<RwLock<HashMap<String, Arc<Module>>>>>(
1088 "contracts",
1089 )
1090 .await
1091 else {
1092 return Err(ActorError::Helper {
1093 name: "contracts".to_string(),
1094 reason: "Not Found".to_string(),
1095 });
1096 };
1097
1098 let contract_register = ctx
1099 .get_child::<ContractRegister>("contract_register")
1100 .await?;
1101
1102 let prefix = format!("{}_", self.subject_metadata.subject_id);
1103 let mut allowed: HashSet<String> = schemas
1104 .keys()
1105 .map(|schema_id| {
1106 format!("{}_{}", self.subject_metadata.subject_id, schema_id)
1107 })
1108 .collect();
1109
1110 let registered: Vec<String> = match contract_register
1111 .ask(ContractRegisterMessage::ListContracts)
1112 .await?
1113 {
1114 ContractRegisterResponse::Contracts(contracts) => contracts,
1115 _ => Vec::new(),
1116 };
1117
1118 for contract_name in registered {
1119 if contract_name.starts_with(&prefix)
1120 && !allowed.contains(&contract_name)
1121 {
1122 contract_register
1123 .tell(ContractRegisterMessage::DeleteMetadata {
1124 contract_name: contract_name.clone(),
1125 })
1126 .await?;
1127 let mut contracts = contracts.write().await;
1128 contracts.remove(&contract_name);
1129 }
1130 }
1131
1132 let contracts_dir = config.contracts_path;
1133 if !contracts_dir.exists() {
1134 return Ok(());
1135 }
1136
1137 let mut entries = fs::read_dir(&contracts_dir).await.map_err(|e| {
1138 ActorError::Functional {
1139 description: format!(
1140 "Can not read contracts directory {}: {}",
1141 contracts_dir.display(),
1142 e
1143 ),
1144 }
1145 })?;
1146
1147 while let Some(entry) =
1148 entries
1149 .next_entry()
1150 .await
1151 .map_err(|e| ActorError::Functional {
1152 description: format!(
1153 "Can not iterate contracts directory {}: {}",
1154 contracts_dir.display(),
1155 e
1156 ),
1157 })?
1158 {
1159 let file_name = entry.file_name().to_string_lossy().to_string();
1160 if !file_name.starts_with(&prefix) {
1161 continue;
1162 }
1163
1164 let is_temp = file_name.starts_with(&format!(
1165 "{}_temp_",
1166 self.subject_metadata.subject_id
1167 ));
1168 if is_temp || !allowed.contains(&file_name) {
1169 let path = entry.path();
1170 let _ = fs::remove_dir_all(path).await;
1171 if !is_temp {
1172 allowed.remove(&file_name);
1173 }
1174 }
1175 }
1176
1177 Ok(())
1178 }
1179
1180 async fn delete_all_contract_artifacts(
1181 &self,
1182 ctx: &ActorContext<Self>,
1183 contract_register: &ActorRef<ContractRegister>,
1184 ) -> Result<(), ActorError> {
1185 let Some(config) =
1186 ctx.system().get_helper::<ConfigHelper>("config").await
1187 else {
1188 return Err(ActorError::Helper {
1189 name: "config".to_string(),
1190 reason: "Not Found".to_string(),
1191 });
1192 };
1193
1194 let Some(contracts) = ctx
1195 .system()
1196 .get_helper::<Arc<RwLock<HashMap<String, Arc<Module>>>>>(
1197 "contracts",
1198 )
1199 .await
1200 else {
1201 return Err(ActorError::Helper {
1202 name: "contracts".to_string(),
1203 reason: "Not Found".to_string(),
1204 });
1205 };
1206
1207 let prefix = format!("{}_", self.subject_metadata.subject_id);
1208
1209 let registered: Vec<String> = match contract_register
1210 .ask(ContractRegisterMessage::ListContracts)
1211 .await?
1212 {
1213 ContractRegisterResponse::Contracts(contracts) => contracts,
1214 _ => Vec::new(),
1215 };
1216
1217 for contract_name in registered {
1218 if contract_name.starts_with(&prefix) {
1219 contract_register
1220 .ask(ContractRegisterMessage::DeleteMetadata {
1221 contract_name: contract_name.clone(),
1222 })
1223 .await?;
1224 let mut contracts = contracts.write().await;
1225 contracts.remove(&contract_name);
1226 }
1227 }
1228
1229 let contracts_dir = config.contracts_path;
1230 if !contracts_dir.exists() {
1231 return Ok(());
1232 }
1233
1234 let mut entries = fs::read_dir(&contracts_dir).await.map_err(|e| {
1235 ActorError::Functional {
1236 description: format!(
1237 "Can not read contracts directory {}: {}",
1238 contracts_dir.display(),
1239 e
1240 ),
1241 }
1242 })?;
1243
1244 while let Some(entry) =
1245 entries
1246 .next_entry()
1247 .await
1248 .map_err(|e| ActorError::Functional {
1249 description: format!(
1250 "Can not iterate contracts directory {}: {}",
1251 contracts_dir.display(),
1252 e
1253 ),
1254 })?
1255 {
1256 let file_name = entry.file_name().to_string_lossy().to_string();
1257 if file_name.starts_with(&prefix) {
1258 let path = entry.path();
1259 fs::remove_dir_all(&path).await.map_err(|e| {
1260 ActorError::Functional {
1261 description: format!(
1262 "Can not remove contract directory {}: {}",
1263 path.display(),
1264 e
1265 ),
1266 }
1267 })?;
1268 }
1269 }
1270
1271 Ok(())
1272 }
1273
1274 async fn build_childs(
1275 &self,
1276 ctx: &mut ActorContext<Self>,
1277 hash: &HashAlgorithm,
1278 network: &Arc<NetworkSender>,
1279 ) -> Result<(), ActorError> {
1280 let owner = *self.our_key == self.subject_metadata.owner;
1282 let new_owner = self.subject_metadata.new_owner.is_some();
1283 let i_new_owner =
1284 self.subject_metadata.new_owner == Some((*self.our_key).clone());
1285
1286 if new_owner {
1287 if i_new_owner {
1288 self.up_owner(ctx, hash, network).await?;
1289 } else {
1290 self.up_not_owner(ctx, hash, network).await?;
1291 }
1292 } else if owner {
1293 self.up_owner(ctx, hash, network).await?;
1294 } else {
1295 self.up_not_owner(ctx, hash, network).await?;
1296 }
1297
1298 let new_schemas_eval = {
1299 let schemas = self
1300 .properties
1301 .schemas(ProtocolTypes::Evaluation, &self.our_key);
1302 self.sweep_contract_artifacts(ctx, &schemas).await?;
1303 Self::up_compilers_schemas(
1304 ctx,
1305 &schemas,
1306 self.subject_metadata.subject_id.clone(),
1307 hash,
1308 )
1309 .await?;
1310
1311 schemas
1312 .iter()
1313 .map(|x| (x.0.clone(), x.1.initial_value.clone()))
1314 .collect::<BTreeMap<SchemaType, ValueWrapper>>()
1315 };
1316
1317 let schemas_namespace_eval = self
1318 .properties
1319 .schemas_namespace(ProtocolTypes::Evaluation, &self.our_key);
1320
1321 let schema_creators_eval = self
1322 .properties
1323 .schema_creators_namespace(schemas_namespace_eval.clone());
1324 let schema_issuers_eval = self
1325 .properties
1326 .schema_issuers_namespace(schemas_namespace_eval);
1327
1328 let schemas_namespace_vali = self
1329 .properties
1330 .schemas_namespace(ProtocolTypes::Validation, &self.our_key);
1331
1332 let schema_creators_vali = self
1333 .properties
1334 .schema_creators_namespace(schemas_namespace_vali);
1335
1336 let new_schemas_vali = self
1337 .properties
1338 .schemas_init_value(ProtocolTypes::Validation, &self.our_key);
1339
1340 self.up_schemas(
1341 ctx,
1342 SchemaRuntimeRoles {
1343 creators_eval: &schema_creators_eval,
1344 issuers_eval: &schema_issuers_eval,
1345 creators_vali: &schema_creators_vali,
1346 },
1347 &new_schemas_eval,
1348 &new_schemas_vali,
1349 (hash, network),
1350 )
1351 .await
1352 }
1353
1354 async fn up_not_owner(
1355 &self,
1356 ctx: &mut ActorContext<Self>,
1357 hash: &HashAlgorithm,
1358 network: &Arc<NetworkSender>,
1359 ) -> Result<(), ActorError> {
1360 let node_key = self.subject_metadata.new_owner.as_ref().map_or_else(
1361 || self.subject_metadata.owner.clone(),
1362 |new_owner| new_owner.clone(),
1363 );
1364
1365 if self.properties.has_this_role(HashThisRole::Gov {
1366 who: (*self.our_key).clone(),
1367 role: RoleTypes::Validator,
1368 }) {
1369 let current_roles = self
1370 .current_validation_roles(ctx, SchemaType::Governance)
1371 .await?;
1372 let validator = ValiWorker {
1374 node_key: node_key.clone(),
1375 our_key: self.our_key.clone(),
1376 init_state: None,
1377 governance_id: self.subject_metadata.subject_id.clone(),
1378 gov_version: self.properties.version,
1379 sn: self.subject_metadata.sn,
1380 hash: *hash,
1381 network: network.clone(),
1382 current_roles: crate::validation::worker::CurrentWorkerRoles {
1383 approval: current_roles.approval,
1384 evaluation:
1385 crate::governance::role_register::RoleDataRegister {
1386 workers: current_roles
1387 .schema
1388 .evaluation
1389 .iter()
1390 .map(|role| role.key.clone())
1391 .collect(),
1392 quorum: current_roles.schema.evaluation_quorum,
1393 },
1394 },
1395 stop: false,
1396 };
1397 ctx.create_child("validator", validator).await?;
1398 }
1399
1400 if self.properties.has_this_role(HashThisRole::Gov {
1401 who: (*self.our_key).clone(),
1402 role: RoleTypes::Evaluator,
1403 }) {
1404 let (issuers, issuer_any) = self.properties.governance_issuers();
1405 let evaluator = EvalWorker {
1407 node_key: node_key.clone(),
1408 our_key: self.our_key.clone(),
1409 governance_id: self.subject_metadata.subject_id.clone(),
1410 gov_version: self.properties.version,
1411 sn: self.subject_metadata.sn,
1412 context: EvalWorkerContext::Governance {
1413 issuers,
1414 issuer_any,
1415 },
1416 init_state: None,
1417 hash: *hash,
1418 network: network.clone(),
1419 stop: false,
1420 };
1421 ctx.create_child("evaluator", evaluator).await?;
1422 }
1423
1424 if self.properties.has_this_role(HashThisRole::Gov {
1425 who: (*self.our_key).clone(),
1426 role: RoleTypes::Approver,
1427 }) {
1428 let always_accept = if let Some(config) =
1429 ctx.system().get_helper::<ConfigHelper>("config").await
1430 {
1431 config.always_accept
1432 } else {
1433 return Err(ActorError::Helper {
1434 name: "config".to_owned(),
1435 reason: "Not found".to_owned(),
1436 });
1437 };
1438
1439 let pass_votation = if always_accept {
1440 VotationType::AlwaysAccept
1441 } else {
1442 VotationType::Manual
1443 };
1444
1445 let init_approver = InitApprPersist {
1446 our_key: self.our_key.clone(),
1447 node_key: node_key.clone(),
1448 subject_id: self.subject_metadata.subject_id.clone(),
1449 pass_votation,
1450 helpers: (*hash, network.clone()),
1451 };
1452
1453 ctx.create_child("approver", ApprPersist::initial(init_approver))
1454 .await?;
1455 }
1456
1457 Ok(())
1458 }
1459
1460 async fn up_down_not_owner(
1461 &self,
1462 ctx: &mut ActorContext<Self>,
1463 old_gov: &GovernanceData,
1464 hash: &HashAlgorithm,
1465 network: &Arc<NetworkSender>,
1466 ) -> Result<(), ActorError> {
1467 let node_key = self.subject_metadata.new_owner.as_ref().map_or_else(
1468 || self.subject_metadata.owner.clone(),
1469 |new_owner| new_owner.clone(),
1470 );
1471
1472 let old_val = old_gov.has_this_role(HashThisRole::Gov {
1473 who: (*self.our_key).clone(),
1474 role: RoleTypes::Validator,
1475 });
1476
1477 let new_val = self.properties.has_this_role(HashThisRole::Gov {
1478 who: (*self.our_key).clone(),
1479 role: RoleTypes::Validator,
1480 });
1481
1482 match (old_val, new_val) {
1483 (true, false) => {
1484 let actor = ctx.get_child::<ValiWorker>("validator").await?;
1485 actor.ask_stop().await?;
1486 }
1487 (false, true) => {
1488 let current_roles = self
1489 .current_validation_roles(ctx, SchemaType::Governance)
1490 .await?;
1491 let validator = ValiWorker {
1493 node_key: node_key.clone(),
1494 our_key: self.our_key.clone(),
1495 init_state: None,
1496 governance_id: self.subject_metadata.subject_id.clone(),
1497 gov_version: self.properties.version,
1498 sn: self.subject_metadata.sn,
1499 hash: *hash,
1500 network: network.clone(),
1501 current_roles: crate::validation::worker::CurrentWorkerRoles {
1502 approval: current_roles.approval,
1503 evaluation: crate::governance::role_register::RoleDataRegister {
1504 workers: current_roles
1505 .schema
1506 .evaluation
1507 .iter()
1508 .map(|role| role.key.clone())
1509 .collect(),
1510 quorum: current_roles.schema.evaluation_quorum,
1511 },
1512 },
1513 stop: false,
1514 };
1515 ctx.create_child("validator", validator).await?;
1516 }
1517 _ => {}
1518 };
1519
1520 let old_eval = old_gov.has_this_role(HashThisRole::Gov {
1521 who: (*self.our_key).clone(),
1522 role: RoleTypes::Evaluator,
1523 });
1524
1525 let new_eval = self.properties.has_this_role(HashThisRole::Gov {
1526 who: (*self.our_key).clone(),
1527 role: RoleTypes::Evaluator,
1528 });
1529
1530 match (old_eval, new_eval) {
1531 (true, false) => {
1532 let actor = ctx.get_child::<EvalWorker>("evaluator").await?;
1533
1534 actor.ask_stop().await?;
1535 }
1536 (false, true) => {
1537 let (issuers, issuer_any) =
1538 self.properties.governance_issuers();
1539 let evaluator = EvalWorker {
1540 node_key: node_key.clone(),
1541 our_key: self.our_key.clone(),
1542 governance_id: self.subject_metadata.subject_id.clone(),
1543 gov_version: self.properties.version,
1544 sn: self.subject_metadata.sn,
1545 context: EvalWorkerContext::Governance {
1546 issuers,
1547 issuer_any,
1548 },
1549 init_state: None,
1550 hash: *hash,
1551 network: network.clone(),
1552 stop: false,
1553 };
1554 ctx.create_child("evaluator", evaluator).await?;
1555 }
1556 _ => {}
1557 };
1558
1559 let old_appr = old_gov.has_this_role(HashThisRole::Gov {
1560 who: (*self.our_key).clone(),
1561 role: RoleTypes::Approver,
1562 });
1563
1564 let new_appr = self.properties.has_this_role(HashThisRole::Gov {
1565 who: (*self.our_key).clone(),
1566 role: RoleTypes::Approver,
1567 });
1568
1569 match (old_appr, new_appr) {
1570 (true, false) => {
1571 let actor = ctx.get_child::<ApprPersist>("approver").await?;
1572
1573 actor.ask_stop().await?;
1574 }
1575 (false, true) => {
1576 let always_accept = if let Some(config) =
1577 ctx.system().get_helper::<ConfigHelper>("config").await
1578 {
1579 config.always_accept
1580 } else {
1581 return Err(ActorError::Helper {
1582 name: "config".to_owned(),
1583 reason: "Not found".to_owned(),
1584 });
1585 };
1586
1587 let pass_votation = if always_accept {
1588 VotationType::AlwaysAccept
1589 } else {
1590 VotationType::Manual
1591 };
1592
1593 let init_approver = InitApprPersist {
1594 our_key: self.our_key.clone(),
1595 node_key: node_key.clone(),
1596 subject_id: self.subject_metadata.subject_id.clone(),
1597 pass_votation,
1598 helpers: (*hash, network.clone()),
1599 };
1600
1601 ctx.create_child(
1602 "approver",
1603 ApprPersist::initial(init_approver),
1604 )
1605 .await?;
1606 }
1607 _ => {}
1608 };
1609
1610 Ok(())
1611 }
1612
1613 async fn down_not_owner(
1614 ctx: &ActorContext<Self>,
1615 gov: &GovernanceData,
1616 our_key: Arc<PublicKey>,
1617 ) -> Result<(), ActorError> {
1618 if gov.has_this_role(HashThisRole::Gov {
1619 who: (*our_key).clone(),
1620 role: RoleTypes::Validator,
1621 }) {
1622 let actor = ctx.get_child::<ValiWorker>("validator").await?;
1623
1624 actor.ask_stop().await?;
1625 }
1626
1627 if gov.has_this_role(HashThisRole::Gov {
1628 who: (*our_key).clone(),
1629 role: RoleTypes::Evaluator,
1630 }) {
1631 let actor = ctx.get_child::<EvalWorker>("evaluator").await?;
1632
1633 actor.ask_stop().await?;
1634 }
1635
1636 if gov.has_this_role(HashThisRole::Gov {
1637 who: (*our_key).clone(),
1638 role: RoleTypes::Approver,
1639 }) {
1640 let actor = ctx.get_child::<ApprPersist>("approver").await?;
1641
1642 actor.ask_stop().await?;
1643 }
1644
1645 Ok(())
1646 }
1647
1648 async fn up_owner(
1649 &self,
1650 ctx: &mut ActorContext<Self>,
1651 hash: &HashAlgorithm,
1652 network: &Arc<NetworkSender>,
1653 ) -> Result<(), ActorError> {
1654 let always_accept = if let Some(config) =
1655 ctx.system().get_helper::<ConfigHelper>("config").await
1656 {
1657 config.always_accept
1658 } else {
1659 return Err(ActorError::Helper {
1660 name: "config".to_string(),
1661 reason: "Not Found".to_string(),
1662 });
1663 };
1664 let pass_votation = if always_accept {
1665 VotationType::AlwaysAccept
1666 } else {
1667 VotationType::Manual
1668 };
1669
1670 let init_approver = InitApprPersist {
1671 our_key: self.our_key.clone(),
1672 node_key: (*self.our_key).clone(),
1673 subject_id: self.subject_metadata.subject_id.clone(),
1674 pass_votation,
1675 helpers: (*hash, network.clone()),
1676 };
1677
1678 ctx.create_child("approver", ApprPersist::initial(init_approver))
1679 .await?;
1680
1681 Ok(())
1682 }
1683
1684 async fn down_owner(ctx: &ActorContext<Self>) -> Result<(), ActorError> {
1685 let actor = ctx.get_child::<ApprPersist>("approver").await?;
1686 actor.ask_stop().await?;
1687
1688 Ok(())
1689 }
1690
1691 async fn up_compilers_schemas(
1692 ctx: &mut ActorContext<Self>,
1693 schemas: &BTreeMap<SchemaType, Schema>,
1694 subject_id: DigestIdentifier,
1695 hash: &HashAlgorithm,
1696 ) -> Result<(), ActorError> {
1697 let contracts_path = if let Some(config) =
1698 ctx.system().get_helper::<ConfigHelper>("config").await
1699 {
1700 config.contracts_path
1701 } else {
1702 return Err(ActorError::Helper {
1703 name: "config".to_string(),
1704 reason: "Not Found".to_string(),
1705 });
1706 };
1707
1708 for (id, schema) in schemas {
1709 let actor_name = format!("{}_contract_compiler", id);
1710
1711 let compiler = ctx
1712 .create_child(&actor_name, ContractCompiler::new(*hash))
1713 .await?;
1714
1715 let Schema {
1716 contract,
1717 initial_value,
1718 viewpoints: _,
1719 } = schema;
1720
1721 let response = compiler
1722 .ask(ContractCompilerMessage::Compile {
1723 contract_name: format!("{}_{}", subject_id, id),
1724 contract: contract.clone(),
1725 initial_value: initial_value.0.clone(),
1726 contract_path: contracts_path
1727 .join("contracts")
1728 .join(format!("{}_{}", subject_id, id)),
1729 })
1730 .await?;
1731
1732 if let CompilerResponse::Error(error) = response {
1733 return Err(ActorError::Functional {
1734 description: format!(
1735 "Can not compile schema contract {}: {}",
1736 id, error
1737 ),
1738 });
1739 }
1740 }
1741
1742 Ok(())
1743 }
1744
1745 async fn down_compilers_schemas(
1746 ctx: &ActorContext<Self>,
1747 schemas: &BTreeSet<SchemaType>,
1748 subject_id: &DigestIdentifier,
1749 ) -> Result<(), ActorError> {
1750 let Some(config) =
1751 ctx.system().get_helper::<ConfigHelper>("config").await
1752 else {
1753 return Err(ActorError::Helper {
1754 name: "config".to_string(),
1755 reason: "Not Found".to_string(),
1756 });
1757 };
1758
1759 let Some(contracts) = ctx
1760 .system()
1761 .get_helper::<Arc<RwLock<HashMap<String, Arc<Module>>>>>(
1762 "contracts",
1763 )
1764 .await
1765 else {
1766 return Err(ActorError::Helper {
1767 name: "contracts".to_string(),
1768 reason: "Not Found".to_string(),
1769 });
1770 };
1771
1772 let contract_register = ctx
1773 .get_child::<ContractRegister>("contract_register")
1774 .await?;
1775
1776 for schema_id in schemas.iter() {
1777 let actor = ctx
1778 .get_child::<ContractCompiler>(&format!(
1779 "{}_contract_compiler",
1780 schema_id
1781 ))
1782 .await?;
1783
1784 actor.ask_stop().await?;
1785
1786 let contract_name = format!("{}_{}", subject_id, schema_id);
1787 contract_register
1788 .tell(ContractRegisterMessage::DeleteMetadata {
1789 contract_name: contract_name.clone(),
1790 })
1791 .await?;
1792
1793 {
1794 let mut contracts = contracts.write().await;
1795 contracts.remove(&contract_name);
1796 }
1797
1798 let contract_path = config.contracts_path.join(&contract_name);
1799 let _ = fs::remove_dir_all(contract_path).await;
1800 }
1801
1802 Ok(())
1803 }
1804
1805 async fn compile_schemas(
1806 ctx: &ActorContext<Self>,
1807 schemas: HashMap<SchemaType, Schema>,
1808 subject_id: DigestIdentifier,
1809 ) -> Result<(), ActorError> {
1810 let contracts_path = if let Some(config) =
1811 ctx.system().get_helper::<ConfigHelper>("config").await
1812 {
1813 config.contracts_path
1814 } else {
1815 return Err(ActorError::Helper {
1816 name: "config".to_owned(),
1817 reason: "Not found".to_owned(),
1818 });
1819 };
1820
1821 for (id, schema) in schemas {
1822 let actor = ctx
1823 .get_child::<ContractCompiler>(&format!(
1824 "{}_contract_compiler",
1825 id
1826 ))
1827 .await?;
1828
1829 let response = actor
1830 .ask(ContractCompilerMessage::Compile {
1831 contract_name: format!("{}_{}", subject_id, id),
1832 contract: schema.contract.clone(),
1833 initial_value: schema.initial_value.0.clone(),
1834 contract_path: contracts_path
1835 .join("contracts")
1836 .join(format!("{}_{}", subject_id, id)),
1837 })
1838 .await?;
1839
1840 if let CompilerResponse::Error(error) = response {
1841 return Err(ActorError::Functional {
1842 description: format!(
1843 "Can not refresh schema contract {}: {}",
1844 id, error
1845 ),
1846 });
1847 }
1848 }
1849
1850 Ok(())
1851 }
1852
1853 fn build_creators_register_fact(
1854 &self,
1855 new_creator: HashMap<
1856 (SchemaType, String, PublicKey),
1857 (CreatorQuantity, Vec<WitnessesType>),
1858 >,
1859 remove_creator: HashSet<(SchemaType, String, PublicKey)>,
1860 creator_update: CreatorRoleUpdate,
1861 new_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
1862 remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
1863 ) -> (SubjectRegisterMessage, WitnessesRegisterMessage) {
1864 let creator_registration = |schema_id: &SchemaType,
1865 namespace: &str,
1866 creator: &PublicKey,
1867 witnesses: Vec<WitnessesType>|
1868 -> CreatorWitnessRegistration {
1869 let namespace_value = Namespace::from(namespace.to_owned());
1870 let creator_name = self
1871 .properties
1872 .members
1873 .iter()
1874 .find(|(_, key)| *key == creator)
1875 .map(|(name, _)| name.clone());
1876
1877 let grants = creator_name
1878 .and_then(|creator_name| {
1879 self.properties
1880 .roles_schema
1881 .get(schema_id)
1882 .and_then(|roles_schema| {
1883 roles_schema.creator.get(&RoleCreator::create(
1884 &creator_name,
1885 namespace_value,
1886 ))
1887 })
1888 .map(|creator_role| {
1889 creator_role
1890 .witnesses
1891 .iter()
1892 .filter_map(|witness| {
1893 let witness_type = if witness.name
1894 == ReservedWords::Witnesses.to_string()
1895 {
1896 Some(WitnessesType::Witnesses)
1897 } else {
1898 self.properties
1899 .members
1900 .get(&witness.name)
1901 .cloned()
1902 .map(WitnessesType::User)
1903 }?;
1904
1905 let grant = if witness.viewpoints.contains(
1906 &ReservedWords::AllViewpoints
1907 .to_string(),
1908 ) {
1909 CreatorWitnessGrant::Full
1910 } else if witness.viewpoints.is_empty() {
1911 CreatorWitnessGrant::Hash
1912 } else {
1913 CreatorWitnessGrant::Clear(
1914 witness.viewpoints.clone(),
1915 )
1916 };
1917
1918 Some((witness_type, grant))
1919 })
1920 .collect::<Vec<_>>()
1921 })
1922 })
1923 .unwrap_or_default();
1924
1925 CreatorWitnessRegistration { witnesses, grants }
1926 };
1927
1928 let mut data: Vec<(PublicKey, SchemaType, String, CreatorQuantity)> =
1929 vec![];
1930
1931 let mut new_creator_data: HashMap<
1932 (SchemaType, String, PublicKey),
1933 CreatorWitnessRegistration,
1934 > = HashMap::new();
1935
1936 let mut update_creator_witnesses_data: HashMap<
1937 (SchemaType, String, PublicKey),
1938 CreatorWitnessRegistration,
1939 > = HashMap::new();
1940
1941 for ((schema_id, ns, creator), (quantity, witnesses)) in
1942 new_creator.iter()
1943 {
1944 data.push((
1945 creator.clone(),
1946 schema_id.clone(),
1947 ns.clone(),
1948 quantity.clone(),
1949 ));
1950
1951 new_creator_data.insert(
1952 (schema_id.clone(), ns.clone(), creator.clone()),
1953 creator_registration(schema_id, ns, creator, witnesses.clone()),
1954 );
1955 }
1956
1957 for (schema_id, ns, creator) in remove_creator.iter() {
1958 data.push((
1959 creator.clone(),
1960 schema_id.clone(),
1961 ns.clone(),
1962 CreatorQuantity::Quantity(0),
1963 ));
1964 }
1965
1966 for ((schema_id, ns, creator), (quantity, creator_witnesses)) in
1967 creator_update.new_creator.iter()
1968 {
1969 data.push((
1970 creator.clone(),
1971 schema_id.clone(),
1972 ns.clone(),
1973 quantity.clone(),
1974 ));
1975
1976 let mut witnesses = vec![];
1977 for witness in creator_witnesses.iter() {
1978 if witness.name == ReservedWords::Witnesses.to_string() {
1979 witnesses.push(WitnessesType::Witnesses);
1980 } else if let Some(w) =
1981 self.properties.members.get(&witness.name)
1982 {
1983 witnesses.push(WitnessesType::User(w.clone()));
1984 }
1985 }
1986
1987 new_creator_data.insert(
1988 (schema_id.clone(), ns.clone(), creator.clone()),
1989 creator_registration(schema_id, ns, creator, witnesses),
1990 );
1991 }
1992
1993 for (schema_id, ns, creator, quantity) in
1994 creator_update.update_creator_quantity.iter()
1995 {
1996 data.push((
1997 creator.clone(),
1998 schema_id.clone(),
1999 ns.clone(),
2000 quantity.clone(),
2001 ));
2002 }
2003
2004 for (schema_id, ns, creator, creator_witnesses) in
2005 creator_update.update_creator_witnesses.iter()
2006 {
2007 let mut witnesses = vec![];
2008 for witness in creator_witnesses.iter() {
2009 if witness.name == ReservedWords::Witnesses.to_string() {
2010 witnesses.push(WitnessesType::Witnesses);
2011 } else if let Some(w) =
2012 self.properties.members.get(&witness.name)
2013 {
2014 witnesses.push(WitnessesType::User(w.clone()));
2015 }
2016 }
2017
2018 update_creator_witnesses_data.insert(
2019 (schema_id.clone(), ns.clone(), creator.clone()),
2020 creator_registration(schema_id, ns, creator, witnesses),
2021 );
2022 }
2023
2024 for (schema_id, ns, creator) in creator_update.remove_creator.iter() {
2025 data.push((
2026 creator.clone(),
2027 schema_id.clone(),
2028 ns.clone(),
2029 CreatorQuantity::Quantity(0),
2030 ));
2031 }
2032
2033 (
2034 SubjectRegisterMessage::RegisterData {
2035 gov_version: self.properties.version,
2036 data,
2037 },
2038 WitnessesRegisterMessage::UpdateCreatorsWitnessesFact {
2039 version: self.properties.version,
2040 new_creator: new_creator_data,
2041 remove_creator,
2042 update_creator_witnesses: update_creator_witnesses_data,
2043 new_witnesses,
2044 remove_witnesses,
2045 },
2046 )
2047 }
2048
2049 fn build_creators_register_confirm(
2050 &self,
2051 remove_creator: HashSet<(SchemaType, String, PublicKey)>,
2052 remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
2053 ) -> (SubjectRegisterMessage, WitnessesRegisterMessage) {
2054 let data: Vec<(PublicKey, SchemaType, String, CreatorQuantity)> =
2055 remove_creator
2056 .iter()
2057 .map(|x| {
2058 (
2059 x.2.clone(),
2060 x.0.clone(),
2061 x.1.clone(),
2062 CreatorQuantity::Quantity(0),
2063 )
2064 })
2065 .collect();
2066 (
2067 SubjectRegisterMessage::RegisterData {
2068 gov_version: self.properties.version,
2069 data,
2070 },
2071 WitnessesRegisterMessage::UpdateCreatorsWitnessesConfirm {
2072 version: self.properties.version,
2073 remove_creator,
2074 remove_witnesses,
2075 },
2076 )
2077 }
2078
2079 async fn first_role_register(
2080 &self,
2081 ctx: &ActorContext<Self>,
2082 ) -> Result<(), ActorError> {
2083 let actor = ctx.get_child::<RoleRegister>("role_register").await?;
2084
2085 actor
2086 .tell(RoleRegisterMessage::UpdateFact {
2087 version: 0,
2088 appr_quorum: Some(Quorum::Majority),
2089 eval_quorum: HashMap::from([(
2090 SchemaType::Governance,
2091 Quorum::Majority,
2092 )]),
2093 new_approvers: vec![self.subject_metadata.owner.clone()],
2094 new_evaluators: HashMap::from([(
2095 (
2096 SchemaType::Governance,
2097 self.subject_metadata.owner.clone(),
2098 ),
2099 vec![Namespace::new()],
2100 )]),
2101 new_validators: HashMap::from([(
2102 (
2103 SchemaType::Governance,
2104 self.subject_metadata.owner.clone(),
2105 ),
2106 vec![Namespace::new()],
2107 )]),
2108 remove_approvers: vec![],
2109 remove_evaluators: HashMap::new(),
2110 remove_validators: HashMap::new(),
2111 vali_quorum: HashMap::from([(
2112 SchemaType::Governance,
2113 Quorum::Majority,
2114 )]),
2115 })
2116 .await
2117 }
2118
2119 async fn update_gov_version(
2120 &self,
2121 ctx: &ActorContext<Self>,
2122 ) -> Result<(), ActorError> {
2123 let actor = ctx.get_child::<RoleRegister>("role_register").await?;
2124
2125 actor
2126 .tell(RoleRegisterMessage::UpdateVersion {
2127 version: self.properties.version + 1,
2128 })
2129 .await
2130 }
2131
2132 async fn update_registers_fact(
2133 &self,
2134 ctx: &ActorContext<Self>,
2135 update: RolesUpdate,
2136 creator_update: CreatorRoleUpdate,
2137 ) -> Result<(), ActorError> {
2138 let RolesUpdate {
2139 appr_quorum,
2140 new_approvers,
2141 remove_approvers,
2142 eval_quorum,
2143 new_evaluators,
2144 remove_evaluators,
2145 vali_quorum,
2146 new_validators,
2147 remove_validators,
2148 new_creator,
2149 remove_creator,
2150 new_witnesses,
2151 remove_witnesses,
2152 } = update;
2153
2154 let actor = ctx.get_child::<RoleRegister>("role_register").await?;
2155 actor
2156 .tell(RoleRegisterMessage::UpdateFact {
2157 version: self.properties.version,
2158 appr_quorum,
2159 eval_quorum,
2160 new_approvers,
2161 new_evaluators,
2162 new_validators,
2163 remove_approvers,
2164 remove_evaluators,
2165 remove_validators,
2166 vali_quorum,
2167 })
2168 .await?;
2169
2170 let (subj_msg, wit_msg) = self.build_creators_register_fact(
2171 new_creator,
2172 remove_creator,
2173 creator_update,
2174 new_witnesses,
2175 remove_witnesses,
2176 );
2177
2178 let actor =
2179 ctx.get_child::<SubjectRegister>("subject_register").await?;
2180
2181 actor.tell(subj_msg).await?;
2182
2183 let actor = ctx
2184 .get_child::<WitnessesRegister>("witnesses_register")
2185 .await?;
2186
2187 actor.tell(wit_msg).await
2188 }
2189
2190 async fn update_registers_confirm(
2191 &self,
2192 ctx: &ActorContext<Self>,
2193 update: RolesUpdateConfirm,
2194 ) -> Result<(), ActorError> {
2195 let RolesUpdateConfirm {
2196 new_approver,
2197 remove_approver,
2198 new_evaluator,
2199 remove_evaluators,
2200 new_validator,
2201 remove_validators,
2202 remove_creator,
2203 remove_witnesses,
2204 } = update;
2205
2206 let actor = ctx.get_child::<RoleRegister>("role_register").await?;
2207 actor
2208 .tell(RoleRegisterMessage::UpdateConfirm {
2209 version: self.properties.version,
2210 new_approver,
2211 remove_approver,
2212 new_evaluator,
2213 remove_evaluators,
2214 new_validator,
2215 remove_validators,
2216 })
2217 .await?;
2218
2219 let (subj_msg, wit_msg) = self
2220 .build_creators_register_confirm(remove_creator, remove_witnesses);
2221
2222 let actor =
2223 ctx.get_child::<SubjectRegister>("subject_register").await?;
2224
2225 actor.tell(subj_msg).await?;
2226
2227 let actor = ctx
2228 .get_child::<WitnessesRegister>("witnesses_register")
2229 .await?;
2230
2231 actor.tell(wit_msg).await
2232 }
2233
2234 async fn verify_new_ledger_events(
2235 &mut self,
2236 ctx: &mut ActorContext<Self>,
2237 events: Vec<Ledger>,
2238 hash: &HashAlgorithm,
2239 ) -> Result<(), ActorError> {
2240 let mut iter = events.into_iter();
2241 let last_ledger = get_last_event(ctx).await?;
2242
2243 let mut last_ledger = if let Some(last_ledger) = last_ledger {
2244 last_ledger
2245 } else {
2246 let Some(first) = iter.next() else {
2247 return Ok(());
2248 };
2249 if let Err(e) = Self::verify_first_ledger_event(
2250 ctx,
2251 &first,
2252 hash,
2253 Metadata::from(self.clone()),
2254 )
2255 .await
2256 {
2257 return Err(ActorError::Functional {
2258 description: e.to_string(),
2259 });
2260 }
2261
2262 self.on_event(first.clone(), ctx).await;
2263 Self::register(
2264 ctx,
2265 RegisterMessage::RegisterGov {
2266 gov_id: self.subject_metadata.subject_id.to_string(),
2267 name: self.subject_metadata.name.clone(),
2268 description: self.subject_metadata.description.clone(),
2269 },
2270 )
2271 .await?;
2272
2273 self.first_role_register(ctx).await?;
2274
2275 let (issuer, event_request_timestamp) =
2276 first.get_issuer_event_request_timestamp();
2277 let event_request = first.get_event_request();
2278
2279 Self::event_to_sink(
2280 ctx,
2281 DataForSink {
2282 gov_id: None,
2283 subject_id: self.subject_metadata.subject_id.to_string(),
2284 sn: self.subject_metadata.sn,
2285 owner: self.subject_metadata.owner.to_string(),
2286 namespace: String::default(),
2287 schema_id: self.subject_metadata.schema_id.clone(),
2288 issuer,
2289 event_ledger_timestamp: first
2290 .ledger_seal_signature
2291 .timestamp
2292 .as_nanos(),
2293 event_request_timestamp,
2294 gov_version: first.gov_version,
2295 event_data_ledger: EventLedgerDataForSink::build(
2296 &first.protocols,
2297 &self.properties.to_value_wrapper().0,
2298 ),
2299 },
2300 event_request,
2301 )
2302 .await?;
2303
2304 first
2305 };
2306
2307 for event in iter {
2308 let actual_ledger_hash =
2309 last_ledger.ledger_hash(*hash).map_err(|e| {
2310 ActorError::FunctionalCritical {
2311 description: format!(
2312 "Can not creacte actual ledger event hash: {}",
2313 e
2314 ),
2315 }
2316 })?;
2317
2318 let last_data = LastData {
2319 gov_version: last_ledger.gov_version,
2320 vali_data: last_ledger.protocols.get_validation_data(),
2321 };
2322
2323 let last_event_is_ok = match Self::verify_new_ledger_event(
2324 ctx,
2325 Self::verify_new_ledger_event_args(
2326 &event,
2327 Metadata::from(self.clone()),
2328 actual_ledger_hash,
2329 last_data,
2330 hash,
2331 true,
2332 false,
2333 ),
2334 )
2335 .await
2336 {
2337 Ok(last_event_is_ok) => last_event_is_ok,
2338 Err(e) => {
2339 if matches!(e, SubjectError::InvalidSequenceNumber { .. }) {
2341 continue;
2343 } else {
2344 return Err(ActorError::Functional {
2345 description: e.to_string(),
2346 });
2347 }
2348 }
2349 };
2350
2351 let Some(event_request) = event.get_event_request() else {
2352 error!(
2353 subject_id = %self.subject_metadata.subject_id,
2354 sn = event.sn,
2355 "Governance replay event is missing clear event_request"
2356 );
2357 return Err(ActorError::Functional {
2358 description:
2359 "Governance replay event is missing clear event_request"
2360 .to_owned(),
2361 });
2362 };
2363
2364 let (update_fact, update_confirm) = if last_event_is_ok {
2365 match &event_request {
2366 EventRequest::Transfer(transfer_request) => {
2367 self.transfer(
2368 ctx,
2369 transfer_request.new_owner.clone(),
2370 0,
2371 )
2372 .await?;
2373
2374 self.update_gov_version(ctx).await?;
2375 }
2376 EventRequest::Reject(..) => {
2377 self.reject(ctx, 0).await?;
2378
2379 self.update_gov_version(ctx).await?;
2380 }
2381 EventRequest::EOL(..) => {
2382 self.eol(ctx).await?;
2383
2384 Self::register(
2385 ctx,
2386 RegisterMessage::EOLGov {
2387 gov_id: self
2388 .subject_metadata
2389 .subject_id
2390 .to_string(),
2391 },
2392 )
2393 .await?;
2394
2395 self.update_gov_version(ctx).await?;
2396 }
2397 _ => {}
2398 };
2399
2400 let update_confirm =
2401 if let EventRequest::Confirm(..) = &event_request {
2402 self.confirm(
2403 ctx,
2404 event.ledger_seal_signature.signer.clone(),
2405 0,
2406 )
2407 .await?;
2408
2409 if let Some(new_owner_key) =
2410 &self.subject_metadata.new_owner
2411 {
2412 Some(self.properties.roles_update_remove_confirm(
2413 &self.subject_metadata.owner,
2414 new_owner_key,
2415 ))
2416 } else {
2417 None
2418 }
2419 } else {
2420 None
2421 };
2422
2423 let update_fact = if let EventRequest::Fact(fact_request) =
2424 &event_request
2425 {
2426 let governance_event = serde_json::from_value::<GovernanceEvent>(fact_request.payload.0.clone()).map_err(|e| {
2427 ActorError::FunctionalCritical{description: format!("Can not convert payload into governance event in governance fact event: {}", e)}
2428 })?;
2429
2430 let rm_members = governance_event
2431 .members
2432 .as_ref()
2433 .map_or_else(|| None, |members| members.remove.clone());
2434
2435 let rm_schemas = governance_event
2436 .schemas
2437 .as_ref()
2438 .map_or_else(|| None, |schemas| schemas.remove.clone());
2439
2440 let rm_roles =
2441 if rm_members.is_some() || rm_schemas.is_some() {
2442 Some(self.properties.roles_update_remove_fact(
2443 rm_members, rm_schemas,
2444 ))
2445 } else {
2446 None
2447 };
2448
2449 let creator_update = governance_event_update_creator_change(
2450 &governance_event,
2451 &self.properties.members,
2452 &self.properties.roles_schema,
2453 );
2454
2455 Some((governance_event, creator_update, rm_roles))
2456 } else {
2457 None
2458 };
2459 (update_fact, update_confirm)
2460 } else {
2461 (None, None)
2462 };
2463
2464 self.on_event(event.clone(), ctx).await;
2466
2467 let (issuer, event_request_timestamp) =
2468 event.get_issuer_event_request_timestamp();
2469 Self::event_to_sink(
2470 ctx,
2471 DataForSink {
2472 gov_id: None,
2473 subject_id: self.subject_metadata.subject_id.to_string(),
2474 sn: self.subject_metadata.sn,
2475 owner: self.subject_metadata.owner.to_string(),
2476 namespace: String::default(),
2477 schema_id: self.subject_metadata.schema_id.clone(),
2478 issuer,
2479 event_ledger_timestamp: event
2480 .ledger_seal_signature
2481 .timestamp
2482 .as_nanos(),
2483 event_request_timestamp,
2484 gov_version: event.gov_version,
2485 event_data_ledger: EventLedgerDataForSink::build(
2486 &event.protocols,
2487 &self.properties.to_value_wrapper().0,
2488 ),
2489 },
2490 Some(event_request.clone()),
2491 )
2492 .await?;
2493
2494 if let Some((event, creator_update, rm_roles)) = update_fact {
2495 let update = governance_event_roles_update_fact(
2496 &event,
2497 &self.properties.members,
2498 rm_roles,
2499 );
2500
2501 self.update_registers_fact(ctx, update, creator_update)
2502 .await?;
2503 }
2504
2505 if let Some(update_confirm) = update_confirm {
2506 self.update_registers_confirm(ctx, update_confirm).await?;
2507 }
2508
2509 last_ledger = event.clone();
2511 }
2512
2513 Ok(())
2514 }
2515
2516 async fn delete_tracker_references(
2517 &self,
2518 ctx: &mut ActorContext<Self>,
2519 subject_id: DigestIdentifier,
2520 ) -> Result<(), ActorError> {
2521 let mut cleanup_errors = Vec::new();
2522
2523 let subject_register = match ctx
2524 .create_child("subject_register", SubjectRegister::initial(()))
2525 .await
2526 {
2527 Ok(actor) => Some(actor),
2528 Err(ActorError::Exists { .. }) => {
2529 match ctx.get_child::<SubjectRegister>("subject_register").await
2530 {
2531 Ok(actor) => Some(actor),
2532 Err(error) => {
2533 cleanup_errors
2534 .push(format!("subject_register lookup: {error}"));
2535 None
2536 }
2537 }
2538 }
2539 Err(error) => {
2540 cleanup_errors.push(format!("subject_register: {error}"));
2541 None
2542 }
2543 };
2544
2545 if let Some(subject_register) = subject_register {
2546 match subject_register
2547 .ask(SubjectRegisterMessage::DeleteSubject {
2548 subject_id: subject_id.clone(),
2549 })
2550 .await
2551 {
2552 Ok(SubjectRegisterResponse::Ok) => {}
2553 Ok(other) => cleanup_errors.push(format!(
2554 "subject_register: unexpected response {other:?}"
2555 )),
2556 Err(error) => {
2557 cleanup_errors.push(format!("subject_register: {error}"))
2558 }
2559 }
2560
2561 if let Err(error) = subject_register.ask_stop().await {
2562 cleanup_errors.push(format!("subject_register stop: {error}"));
2563 }
2564 }
2565
2566 let sn_register = match ctx
2567 .create_child("sn_register", SnRegister::initial(()))
2568 .await
2569 {
2570 Ok(actor) => Some(actor),
2571 Err(ActorError::Exists { .. }) => {
2572 match ctx.get_child::<SnRegister>("sn_register").await {
2573 Ok(actor) => Some(actor),
2574 Err(error) => {
2575 cleanup_errors
2576 .push(format!("sn_register lookup: {error}"));
2577 None
2578 }
2579 }
2580 }
2581 Err(error) => {
2582 cleanup_errors.push(format!("sn_register: {error}"));
2583 None
2584 }
2585 };
2586
2587 if let Some(sn_register) = sn_register {
2588 match sn_register
2589 .ask(SnRegisterMessage::DeleteSubject {
2590 subject_id: subject_id.clone(),
2591 })
2592 .await
2593 {
2594 Ok(SnRegisterResponse::Ok) => {}
2595 Ok(other) => cleanup_errors.push(format!(
2596 "sn_register: unexpected response {other:?}"
2597 )),
2598 Err(error) => {
2599 cleanup_errors.push(format!("sn_register: {error}"))
2600 }
2601 }
2602
2603 if let Err(error) = sn_register.ask_stop().await {
2604 cleanup_errors.push(format!("sn_register stop: {error}"));
2605 }
2606 }
2607
2608 let witnesses_register = match ctx
2609 .create_child(
2610 "witnesses_register",
2611 WitnessesRegister::initial(Self::ledger_batch_size(ctx).await?),
2612 )
2613 .await
2614 {
2615 Ok(actor) => Some(actor),
2616 Err(ActorError::Exists { .. }) => {
2617 match ctx
2618 .get_child::<WitnessesRegister>("witnesses_register")
2619 .await
2620 {
2621 Ok(actor) => Some(actor),
2622 Err(error) => {
2623 cleanup_errors.push(format!(
2624 "witnesses_register lookup: {error}"
2625 ));
2626 None
2627 }
2628 }
2629 }
2630 Err(error) => {
2631 cleanup_errors.push(format!("witnesses_register: {error}"));
2632 None
2633 }
2634 };
2635
2636 if let Some(witnesses_register) = witnesses_register {
2637 match witnesses_register
2638 .ask(WitnessesRegisterMessage::DeleteSubject {
2639 subject_id: subject_id.clone(),
2640 })
2641 .await
2642 {
2643 Ok(WitnessesRegisterResponse::Ok) => {}
2644 Ok(_) => cleanup_errors
2645 .push("witnesses_register: unexpected response".to_owned()),
2646 Err(error) => {
2647 cleanup_errors.push(format!("witnesses_register: {error}"))
2648 }
2649 }
2650
2651 if let Err(error) = witnesses_register.ask_stop().await {
2652 cleanup_errors
2653 .push(format!("witnesses_register stop: {error}"));
2654 }
2655 }
2656
2657 if cleanup_errors.is_empty() {
2658 Ok(())
2659 } else {
2660 Err(ActorError::Functional {
2661 description: cleanup_errors.join("; "),
2662 })
2663 }
2664 }
2665
2666 async fn delete_governance_storage(
2667 &self,
2668 ctx: &mut ActorContext<Self>,
2669 ) -> Result<(), ActorError> {
2670 let mut cleanup_errors = Vec::new();
2671
2672 if self.properties.has_this_role(HashThisRole::Gov {
2673 who: (*self.our_key).clone(),
2674 role: RoleTypes::Approver,
2675 }) {
2676 let hash = self.hash.map_or_else(
2677 || {
2678 cleanup_errors
2679 .push("approver init: missing hash".to_owned());
2680 None
2681 },
2682 Some,
2683 );
2684
2685 let network = ctx
2686 .system()
2687 .get_helper::<Arc<NetworkSender>>("network")
2688 .await
2689 .map_or_else(
2690 || {
2691 cleanup_errors.push(
2692 "approver init: missing network helper".to_owned(),
2693 );
2694 None
2695 },
2696 Some,
2697 );
2698
2699 if let (Some(hash), Some(network)) = (hash, network) {
2700 let approver = match ctx
2701 .get_child::<ApprPersist>("approver")
2702 .await
2703 {
2704 Ok(actor) => Some(actor),
2705 Err(_) => match self
2706 .up_approver_only(ctx, &hash, &network)
2707 .await
2708 {
2709 Ok(()) => match ctx
2710 .get_child::<ApprPersist>("approver")
2711 .await
2712 {
2713 Ok(actor) => Some(actor),
2714 Err(error) => {
2715 cleanup_errors
2716 .push(format!("approver lookup: {error}"));
2717 None
2718 }
2719 },
2720 Err(error) => {
2721 cleanup_errors.push(format!("approver: {error}"));
2722 None
2723 }
2724 },
2725 };
2726
2727 if let Some(approver) = approver {
2728 match approver
2729 .ask(crate::approval::persist::ApprPersistMessage::PurgeStorage)
2730 .await
2731 {
2732 Ok(crate::approval::persist::ApprPersistResponse::Ok) => {}
2733 Ok(_) => cleanup_errors
2734 .push("approver: unexpected response".to_owned()),
2735 Err(error) => {
2736 cleanup_errors.push(format!("approver: {error}"))
2737 }
2738 }
2739
2740 if let Err(error) = approver.ask_stop().await {
2741 cleanup_errors.push(format!("approver stop: {error}"));
2742 }
2743 }
2744 }
2745 }
2746
2747 let contract_register = match ctx
2748 .create_child("contract_register", ContractRegister::initial(()))
2749 .await
2750 {
2751 Ok(actor) => Some(actor),
2752 Err(ActorError::Exists { .. }) => {
2753 match ctx
2754 .get_child::<ContractRegister>("contract_register")
2755 .await
2756 {
2757 Ok(actor) => Some(actor),
2758 Err(error) => {
2759 cleanup_errors
2760 .push(format!("contract_register lookup: {error}"));
2761 None
2762 }
2763 }
2764 }
2765 Err(error) => {
2766 cleanup_errors.push(format!("contract_register: {error}"));
2767 None
2768 }
2769 };
2770
2771 if let Some(contract_register) = contract_register {
2772 if let Err(error) = self
2773 .delete_all_contract_artifacts(ctx, &contract_register)
2774 .await
2775 {
2776 cleanup_errors.push(format!("contract_artifacts: {error}"));
2777 }
2778
2779 match contract_register
2780 .ask(ContractRegisterMessage::PurgeStorage)
2781 .await
2782 {
2783 Ok(ContractRegisterResponse::Ok) => {}
2784 Ok(other) => cleanup_errors.push(format!(
2785 "contract_register: unexpected response {other:?}"
2786 )),
2787 Err(error) => {
2788 cleanup_errors.push(format!("contract_register: {error}"))
2789 }
2790 }
2791
2792 if let Err(error) = contract_register.ask_stop().await {
2793 cleanup_errors.push(format!("contract_register stop: {error}"));
2794 }
2795 }
2796
2797 let role_register = match ctx
2798 .create_child("role_register", RoleRegister::initial(()))
2799 .await
2800 {
2801 Ok(actor) => Some(actor),
2802 Err(ActorError::Exists { .. }) => {
2803 match ctx.get_child::<RoleRegister>("role_register").await {
2804 Ok(actor) => Some(actor),
2805 Err(error) => {
2806 cleanup_errors
2807 .push(format!("role_register lookup: {error}"));
2808 None
2809 }
2810 }
2811 }
2812 Err(error) => {
2813 cleanup_errors.push(format!("role_register: {error}"));
2814 None
2815 }
2816 };
2817
2818 if let Some(role_register) = role_register {
2819 match role_register.ask(RoleRegisterMessage::PurgeStorage).await {
2820 Ok(RoleRegisterResponse::Ok) => {}
2821 Ok(other) => cleanup_errors.push(format!(
2822 "role_register: unexpected response {other:?}"
2823 )),
2824 Err(error) => {
2825 cleanup_errors.push(format!("role_register: {error}"))
2826 }
2827 }
2828
2829 if let Err(error) = role_register.ask_stop().await {
2830 cleanup_errors.push(format!("role_register stop: {error}"));
2831 }
2832 }
2833
2834 let subject_register = match ctx
2835 .create_child("subject_register", SubjectRegister::initial(()))
2836 .await
2837 {
2838 Ok(actor) => Some(actor),
2839 Err(ActorError::Exists { .. }) => {
2840 match ctx.get_child::<SubjectRegister>("subject_register").await
2841 {
2842 Ok(actor) => Some(actor),
2843 Err(error) => {
2844 cleanup_errors
2845 .push(format!("subject_register lookup: {error}"));
2846 None
2847 }
2848 }
2849 }
2850 Err(error) => {
2851 cleanup_errors.push(format!("subject_register: {error}"));
2852 None
2853 }
2854 };
2855
2856 if let Some(subject_register) = subject_register {
2857 match subject_register
2858 .ask(SubjectRegisterMessage::PurgeStorage)
2859 .await
2860 {
2861 Ok(SubjectRegisterResponse::Ok) => {}
2862 Ok(other) => cleanup_errors.push(format!(
2863 "subject_register: unexpected response {other:?}"
2864 )),
2865 Err(error) => {
2866 cleanup_errors.push(format!("subject_register: {error}"))
2867 }
2868 }
2869
2870 if let Err(error) = subject_register.ask_stop().await {
2871 cleanup_errors.push(format!("subject_register stop: {error}"));
2872 }
2873 }
2874
2875 let sn_register = match ctx
2876 .create_child("sn_register", SnRegister::initial(()))
2877 .await
2878 {
2879 Ok(actor) => Some(actor),
2880 Err(ActorError::Exists { .. }) => {
2881 match ctx.get_child::<SnRegister>("sn_register").await {
2882 Ok(actor) => Some(actor),
2883 Err(error) => {
2884 cleanup_errors
2885 .push(format!("sn_register lookup: {error}"));
2886 None
2887 }
2888 }
2889 }
2890 Err(error) => {
2891 cleanup_errors.push(format!("sn_register: {error}"));
2892 None
2893 }
2894 };
2895
2896 if let Some(sn_register) = sn_register {
2897 match sn_register.ask(SnRegisterMessage::PurgeStorage).await {
2898 Ok(SnRegisterResponse::Ok) => {}
2899 Ok(other) => cleanup_errors.push(format!(
2900 "sn_register: unexpected response {other:?}"
2901 )),
2902 Err(error) => {
2903 cleanup_errors.push(format!("sn_register: {error}"))
2904 }
2905 }
2906
2907 if let Err(error) = sn_register.ask_stop().await {
2908 cleanup_errors.push(format!("sn_register stop: {error}"));
2909 }
2910 }
2911
2912 let witnesses_register = match ctx
2913 .create_child(
2914 "witnesses_register",
2915 WitnessesRegister::initial(Self::ledger_batch_size(ctx).await?),
2916 )
2917 .await
2918 {
2919 Ok(actor) => Some(actor),
2920 Err(ActorError::Exists { .. }) => {
2921 match ctx
2922 .get_child::<WitnessesRegister>("witnesses_register")
2923 .await
2924 {
2925 Ok(actor) => Some(actor),
2926 Err(error) => {
2927 cleanup_errors.push(format!(
2928 "witnesses_register lookup: {error}"
2929 ));
2930 None
2931 }
2932 }
2933 }
2934 Err(error) => {
2935 cleanup_errors.push(format!("witnesses_register: {error}"));
2936 None
2937 }
2938 };
2939
2940 if let Some(witnesses_register) = witnesses_register {
2941 match witnesses_register
2942 .ask(WitnessesRegisterMessage::PurgeStorage)
2943 .await
2944 {
2945 Ok(WitnessesRegisterResponse::Ok) => {}
2946 Ok(_) => cleanup_errors
2947 .push("witnesses_register: unexpected response".to_owned()),
2948 Err(error) => {
2949 cleanup_errors.push(format!("witnesses_register: {error}"))
2950 }
2951 }
2952
2953 if let Err(error) = witnesses_register.ask_stop().await {
2954 cleanup_errors
2955 .push(format!("witnesses_register stop: {error}"));
2956 }
2957 }
2958
2959 if let Err(error) = purge_storage(ctx).await {
2960 cleanup_errors.push(format!("governance: {error}"));
2961 }
2962
2963 if cleanup_errors.is_empty() {
2964 Ok(())
2965 } else {
2966 Err(ActorError::Functional {
2967 description: cleanup_errors.join("; "),
2968 })
2969 }
2970 }
2971}
2972
2973#[derive(Debug, Clone)]
2975pub enum GovernanceMessage {
2976 GetMetadata,
2977 GetLedger { lo_sn: Option<u64>, hi_sn: u64 },
2978 GetLastLedger,
2979 DeleteTrackerReferences { subject_id: DigestIdentifier },
2980 DeleteGovernanceStorage,
2981 UpdateLedger { events: Vec<Ledger> },
2982 GetGovernance,
2983 GetVersion,
2984}
2985
2986impl Message for GovernanceMessage {}
2987
2988#[derive(Debug, Clone)]
2989pub enum GovernanceResponse {
2990 Metadata(Box<Metadata>),
2992 UpdateResult(u64, PublicKey, Option<PublicKey>),
2993 Ledger {
2994 ledger: Vec<Ledger>,
2995 is_all: bool,
2996 },
2997 LastLedger {
2998 ledger_event: Box<Option<Ledger>>,
2999 },
3000 Governance(Box<GovernanceData>),
3001 NewCompilers(Vec<SchemaType>),
3002 Sn(u64),
3003 Version(u64),
3004 Ok,
3005}
3006impl Response for GovernanceResponse {}
3007
3008#[async_trait]
3009impl Actor for Governance {
3010 type Event = Ledger;
3011 type Message = GovernanceMessage;
3012 type Response = GovernanceResponse;
3013
3014 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
3015 parent_span.map_or_else(
3016 || info_span!("Governance", id),
3017 |parent_span| info_span!(parent: parent_span, "Governance", id),
3018 )
3019 }
3020
3021 async fn pre_start(
3022 &mut self,
3023 ctx: &mut ActorContext<Self>,
3024 ) -> Result<(), ActorError> {
3025 if let Err(e) = self.init_store("governance", None, true, ctx).await {
3026 error!(
3027 error = %e,
3028 "Failed to initialize governance store"
3029 );
3030 return Err(e);
3031 }
3032
3033 let safe_mode = if let Some(config) =
3034 ctx.system().get_helper::<ConfigHelper>("config").await
3035 {
3036 config.safe_mode
3037 } else {
3038 return Err(ActorError::Helper {
3039 name: "config".to_owned(),
3040 reason: "Not found".to_owned(),
3041 });
3042 };
3043
3044 if safe_mode {
3045 let Some(hash) = self.hash else {
3046 error!("Hash algorithm not found");
3047 return Err(ActorError::FunctionalCritical {
3048 description: "Hash algorithm is None".to_string(),
3049 });
3050 };
3051
3052 let Some(network) = ctx
3053 .system()
3054 .get_helper::<Arc<NetworkSender>>("network")
3055 .await
3056 else {
3057 error!("Network helper not found");
3058 return Err(ActorError::Helper {
3059 name: "network".to_owned(),
3060 reason: "Not found".to_owned(),
3061 });
3062 };
3063
3064 self.up_approver_only(ctx, &hash, &network).await?;
3065 return Ok(());
3066 }
3067
3068 let Some(hash) = self.hash else {
3069 error!("Hash algorithm not found");
3070 return Err(ActorError::FunctionalCritical {
3071 description: "Hash algorithm is None".to_string(),
3072 });
3073 };
3074
3075 let Some(ext_db): Option<Arc<ExternalDB>> =
3076 ctx.system().get_helper("ext_db").await
3077 else {
3078 error!("External database helper not found");
3079 return Err(ActorError::Helper {
3080 name: "ext_db".to_owned(),
3081 reason: "Not found".to_owned(),
3082 });
3083 };
3084
3085 let Some(ave_sink): Option<AveSink> =
3086 ctx.system().get_helper("sink").await
3087 else {
3088 error!("Sink helper not found");
3089 return Err(ActorError::Helper {
3090 name: "sink".to_owned(),
3091 reason: "Not found".to_owned(),
3092 });
3093 };
3094
3095 let Some(network) = ctx
3096 .system()
3097 .get_helper::<Arc<NetworkSender>>("network")
3098 .await
3099 else {
3100 error!("Network helper not found");
3101 return Err(ActorError::Helper {
3102 name: "network".to_owned(),
3103 reason: "Not found".to_owned(),
3104 });
3105 };
3106
3107 if let Err(e) = ctx
3108 .create_child("role_register", RoleRegister::initial(()))
3109 .await
3110 {
3111 error!(
3112 error = %e,
3113 "Failed to create role_register child"
3114 );
3115 return Err(e);
3116 }
3117
3118 if let Err(e) = ctx
3119 .create_child("subject_register", SubjectRegister::initial(()))
3120 .await
3121 {
3122 error!(
3123 error = %e,
3124 "Failed to create subject_register child"
3125 );
3126 return Err(e);
3127 }
3128
3129 if let Err(e) = ctx
3130 .create_child("sn_register", SnRegister::initial(()))
3131 .await
3132 {
3133 error!(
3134 error = %e,
3135 "Failed to create sn_register child"
3136 );
3137 return Err(e);
3138 }
3139
3140 if let Err(e) = ctx
3141 .create_child(
3142 "witnesses_register",
3143 WitnessesRegister::initial(Self::ledger_batch_size(ctx).await?),
3144 )
3145 .await
3146 {
3147 error!(
3148 error = %e,
3149 "Failed to create witnesses_register child"
3150 );
3151 return Err(e);
3152 }
3153
3154 if let Err(e) = ctx
3155 .create_child("contract_register", ContractRegister::initial(()))
3156 .await
3157 {
3158 error!(
3159 error = %e,
3160 "Failed to create contract_register child"
3161 );
3162 return Err(e);
3163 }
3164
3165 if self.subject_metadata.active {
3166 if let Err(e) = self.build_childs(ctx, &hash, &network).await {
3167 error!(
3168 error = %e,
3169 "Failed to build governance child actors"
3170 );
3171 return Err(e);
3172 }
3173
3174 let sink_actor = match ctx
3175 .create_child(
3176 "sink_data",
3177 SinkData {
3178 public_key: self.our_key.to_string(),
3179 },
3180 )
3181 .await
3182 {
3183 Ok(actor) => actor,
3184 Err(e) => {
3185 error!(
3186 error = %e,
3187 "Failed to create sink_data child"
3188 );
3189 return Err(e);
3190 }
3191 };
3192 let sink =
3193 Sink::new(sink_actor.subscribe(), ext_db.get_sink_data());
3194 ctx.system().run_sink(sink).await;
3195
3196 let sink = Sink::new(sink_actor.subscribe(), ave_sink.clone());
3197 ctx.system().run_sink(sink).await;
3198 }
3199
3200 if self.service {
3201 let Some(config): Option<ConfigHelper> =
3202 ctx.system().get_helper("config").await
3203 else {
3204 error!("Config helper not found");
3205 return Err(ActorError::Helper {
3206 name: "config".to_owned(),
3207 reason: "Not found".to_owned(),
3208 });
3209 };
3210
3211 let version_sync_tick_interval = Duration::from_secs(
3212 config.sync_governance.interval_secs.max(1),
3213 );
3214 let version_sync_response_timeout = Duration::from_secs(
3215 config.sync_governance.response_timeout_secs.max(1),
3216 );
3217 let tracker_sync_tick_interval =
3218 Duration::from_secs(config.sync_tracker.interval_secs.max(1));
3219 let tracker_sync_response_timeout = Duration::from_secs(
3220 config.sync_tracker.response_timeout_secs.max(1),
3221 );
3222 let tracker_sync_update_timeout = Duration::from_secs(
3223 config.sync_tracker.update_timeout_secs.max(1),
3224 );
3225
3226 if let Err(e) = ctx
3227 .create_child(
3228 "tracker_sync",
3229 TrackerSync::new(
3230 self.subject_metadata.subject_id.clone(),
3231 self.our_key.clone(),
3232 network.clone(),
3233 TrackerSyncConfig {
3234 service: self.service,
3235 tick_interval: tracker_sync_tick_interval,
3236 response_timeout: tracker_sync_response_timeout,
3237 page_size: config.sync_tracker.page_size,
3238 update_batch_size: config
3239 .sync_tracker
3240 .update_batch_size,
3241 update_timeout: tracker_sync_update_timeout,
3242 },
3243 ),
3244 )
3245 .await
3246 {
3247 error!(
3248 error = %e,
3249 subject_id = %self.subject_metadata.subject_id,
3250 "Failed to create tracker_sync child"
3251 );
3252 return Err(e);
3253 }
3254
3255 let version_sync = ctx
3256 .create_child(
3257 "version_sync",
3258 GovernanceVersionSync::new(
3259 self.subject_metadata.subject_id.clone(),
3260 self.our_key.clone(),
3261 network.clone(),
3262 self.properties.version,
3263 config.sync_governance.sample_size,
3264 version_sync_tick_interval,
3265 version_sync_response_timeout,
3266 ),
3267 )
3268 .await?;
3269
3270 let governance_peers = self
3271 .properties
3272 .get_witnesses(WitnessesData::Gov)
3273 .map_err(|e| ActorError::Functional {
3274 description: e.to_string(),
3275 })?;
3276
3277 let _ = version_sync
3278 .ask(GovernanceVersionSyncMessage::RefreshGovernance {
3279 version: self.properties.version,
3280 governance_peers,
3281 })
3282 .await?;
3283 }
3284
3285 Ok(())
3286 }
3287}
3288
3289#[async_trait]
3290impl Handler<Self> for Governance {
3291 async fn handle_message(
3292 &mut self,
3293 _sender: ActorPath,
3294 msg: GovernanceMessage,
3295 ctx: &mut ActorContext<Self>,
3296 ) -> Result<GovernanceResponse, ActorError> {
3297 match msg {
3298 GovernanceMessage::GetVersion => {
3299 Ok(GovernanceResponse::Version(self.properties.version))
3300 }
3301 GovernanceMessage::GetLedger { lo_sn, hi_sn } => {
3302 let (ledger, is_all) =
3303 self.get_ledger(ctx, lo_sn, hi_sn).await?;
3304 Ok(GovernanceResponse::Ledger { ledger, is_all })
3305 }
3306 GovernanceMessage::GetLastLedger => {
3307 let ledger_event = self.get_last_ledger(ctx).await?;
3308 Ok(GovernanceResponse::LastLedger {
3309 ledger_event: Box::new(ledger_event),
3310 })
3311 }
3312 GovernanceMessage::GetMetadata => Ok(GovernanceResponse::Metadata(
3313 Box::new(Metadata::from(self.clone())),
3314 )),
3315 GovernanceMessage::DeleteTrackerReferences { subject_id } => {
3316 self.delete_tracker_references(ctx, subject_id.clone())
3317 .await?;
3318
3319 debug!(
3320 msg_type = "DeleteTrackerReferences",
3321 subject_id = %subject_id,
3322 governance_id = %self.subject_metadata.subject_id,
3323 "Tracker references deleted from governance"
3324 );
3325
3326 Ok(GovernanceResponse::Ok)
3327 }
3328 GovernanceMessage::DeleteGovernanceStorage => {
3329 self.delete_governance_storage(ctx).await?;
3330
3331 debug!(
3332 msg_type = "DeleteGovernanceStorage",
3333 governance_id = %self.subject_metadata.subject_id,
3334 "Governance storage deleted"
3335 );
3336
3337 Ok(GovernanceResponse::Ok)
3338 }
3339 GovernanceMessage::UpdateLedger { events } => {
3340 let events_count = events.len();
3341 if let Err(e) =
3342 self.manager_new_ledger_events(ctx, events).await
3343 {
3344 warn!(
3345 msg_type = "UpdateLedger",
3346 error = %e,
3347 subject_id = %self.subject_metadata.subject_id,
3348 events_count = events_count,
3349 "Failed to verify new ledger events"
3350 );
3351 return Err(e);
3352 };
3353
3354 debug!(
3355 msg_type = "UpdateLedger",
3356 subject_id = %self.subject_metadata.subject_id,
3357 sn = self.subject_metadata.sn,
3358 events_count = events_count,
3359 "Ledger updated successfully"
3360 );
3361
3362 Ok(GovernanceResponse::UpdateResult(
3363 self.subject_metadata.sn,
3364 self.subject_metadata.owner.clone(),
3365 self.subject_metadata.new_owner.clone(),
3366 ))
3367 }
3368 GovernanceMessage::GetGovernance => {
3369 Ok(GovernanceResponse::Governance(Box::new(
3370 self.properties.clone(),
3371 )))
3372 }
3373 }
3374 }
3375
3376 async fn on_event(&mut self, event: Ledger, ctx: &mut ActorContext<Self>) {
3377 if let Err(e) = self.persist(&event, ctx).await {
3378 error!(
3379 error = %e,
3380 subject_id = %self.subject_metadata.subject_id,
3381 sn = self.subject_metadata.sn,
3382 "Failed to persist event"
3383 );
3384 emit_fail(ctx, e).await;
3385 };
3386
3387 if let Err(e) = ctx.publish_event(event).await {
3388 error!(
3389 error = %e,
3390 subject_id = %self.subject_metadata.subject_id,
3391 sn = self.subject_metadata.sn,
3392 "Failed to publish event"
3393 );
3394 emit_fail(ctx, e).await;
3395 } else {
3396 debug!(
3397 subject_id = %self.subject_metadata.subject_id,
3398 sn = self.subject_metadata.sn,
3399 "Event persisted and published successfully"
3400 );
3401 }
3402 }
3403
3404 async fn on_child_fault(
3405 &mut self,
3406 error: ActorError,
3407 ctx: &mut ActorContext<Self>,
3408 ) -> ChildAction {
3409 error!(
3410 error = %error,
3411 subject_id = %self.subject_metadata.subject_id,
3412 "Child fault occurred"
3413 );
3414 emit_fail(ctx, error).await;
3415 ChildAction::Stop
3416 }
3417}
3418
3419#[async_trait]
3420impl PersistentActor for Governance {
3421 type Persistence = FullPersistence;
3422 type InitParams = (
3423 Option<(SubjectMetadata, GovernanceData)>,
3424 Arc<PublicKey>,
3425 HashAlgorithm,
3426 bool,
3427 );
3428
3429 fn update(&mut self, state: Self) {
3430 self.properties = state.properties;
3431 self.subject_metadata = state.subject_metadata;
3432 }
3433
3434 fn create_initial(params: Self::InitParams) -> Self {
3435 let (subject_metadata, properties) =
3436 if let Some((subject_metadata, properties)) = params.0 {
3437 (subject_metadata, properties)
3438 } else {
3439 Default::default()
3440 };
3441 Self {
3442 hash: Some(params.2),
3443 our_key: params.1,
3444 service: params.3,
3445 subject_metadata,
3446 properties,
3447 }
3448 }
3449
3450 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
3451 match &event.protocols {
3452 Protocols::Create {
3453 validation,
3454 event_request,
3455 } => {
3456 if let EventRequest::Create(..) = event_request.content() {
3457 } else {
3458 error!(
3459 event_type = "Create",
3460 subject_id = %self.subject_metadata.subject_id,
3461 actual_request = ?event_request.content(),
3462 "Unexpected event request type for governance create apply"
3463 );
3464 return Err(ActorError::Functional {
3465 description:
3466 "In create event, event request must be Create"
3467 .to_owned(),
3468 });
3469 };
3470
3471 if let ValidationMetadata::Metadata(metadata) =
3472 &validation.validation_metadata
3473 {
3474 self.subject_metadata = SubjectMetadata::new(metadata);
3475 self.properties = serde_json::from_value::<GovernanceData>(
3476 metadata.properties.0.clone(),
3477 )
3478 .map_err(|e| {
3479 error!(
3480 event_type = "Create",
3481 subject_id = %self.subject_metadata.subject_id,
3482 error = %e,
3483 "Failed to convert properties into GovernanceData"
3484 );
3485 ActorError::Functional { description: format!("In create event, can not convert properties into GovernanceData: {e}")}
3486 })?;
3487
3488 debug!(
3489 event_type = "Create",
3490 subject_id = %self.subject_metadata.subject_id,
3491 sn = self.subject_metadata.sn,
3492 "Applied create event"
3493 );
3494 } else {
3495 error!(
3496 event_type = "Create",
3497 "Validation metadata must be Metadata type"
3498 );
3499 return Err(ActorError::Functional { description: "In create event, validation metadata must be a Metadata".to_owned() });
3500 }
3501
3502 return Ok(());
3503 }
3504 Protocols::GovFact {
3505 evaluation,
3506 approval,
3507 event_request,
3508 ..
3509 } => {
3510 if let EventRequest::Fact(..) = event_request.content() {
3511 } else {
3512 error!(
3513 event_type = "Fact",
3514 subject_id = %self.subject_metadata.subject_id,
3515 actual_request = ?event_request.content(),
3516 "Unexpected event request type for governance fact apply"
3517 );
3518 return Err(ActorError::Functional {
3519 description:
3520 "In fact event, event request must be Fact"
3521 .to_owned(),
3522 });
3523 };
3524
3525 if let Some(eval_res) = evaluation.evaluator_response_ok() {
3526 if let Some(appr_res) = approval {
3527 if appr_res.approved {
3528 self.apply_patch(eval_res.patch)?;
3529 debug!(
3530 event_type = "Fact",
3531 subject_id = %self.subject_metadata.subject_id,
3532 approved = true,
3533 "Applied fact event with patch"
3534 );
3535 } else {
3536 debug!(
3537 event_type = "Fact",
3538 subject_id = %self.subject_metadata.subject_id,
3539 approved = false,
3540 "Fact event not approved, patch not applied"
3541 );
3542 }
3543 } else {
3544 error!(
3545 event_type = "Fact",
3546 subject_id = %self.subject_metadata.subject_id,
3547 "Evaluation successful but no approval present"
3548 );
3549 return Err(ActorError::Functional { description: "The evaluation event was successful, but there is no approval".to_owned() });
3550 }
3551 }
3552 }
3553
3554 Protocols::Transfer {
3555 evaluation,
3556 event_request,
3557 ..
3558 } => {
3559 let EventRequest::Transfer(transfer_request) =
3560 event_request.content()
3561 else {
3562 error!(
3563 event_type = "Transfer",
3564 subject_id = %self.subject_metadata.subject_id,
3565 actual_request = ?event_request.content(),
3566 "Unexpected event request type for governance transfer apply"
3567 );
3568 return Err(ActorError::Functional {
3569 description:
3570 "In transfer event, event request must be Transfer"
3571 .to_owned(),
3572 });
3573 };
3574
3575 if evaluation.evaluator_response_ok().is_some() {
3576 self.subject_metadata.new_owner =
3577 Some(transfer_request.new_owner.clone());
3578 debug!(
3579 event_type = "Transfer",
3580 subject_id = %self.subject_metadata.subject_id,
3581 new_owner = %transfer_request.new_owner,
3582 "Applied transfer event"
3583 );
3584 }
3585 }
3586
3587 Protocols::GovConfirm {
3588 evaluation,
3589 event_request,
3590 ..
3591 } => {
3592 if let EventRequest::Confirm(..) = event_request.content() {
3593 } else {
3594 error!(
3595 event_type = "Confirm",
3596 subject_id = %self.subject_metadata.subject_id,
3597 actual_request = ?event_request.content(),
3598 "Unexpected event request type for governance confirm apply"
3599 );
3600 return Err(ActorError::Functional {
3601 description:
3602 "In confirm event, event request must be Confirm"
3603 .to_owned(),
3604 });
3605 };
3606
3607 if let Some(eval_res) = evaluation.evaluator_response_ok() {
3608 if let Some(new_owner) =
3609 self.subject_metadata.new_owner.take()
3610 {
3611 self.subject_metadata.owner = new_owner.clone();
3612 self.apply_patch(eval_res.patch)?;
3613 debug!(
3614 event_type = "Confirm",
3615 subject_id = %self.subject_metadata.subject_id,
3616 new_owner = %new_owner,
3617 "Applied confirm event with patch"
3618 );
3619 } else {
3620 error!(
3621 event_type = "Confirm",
3622 subject_id = %self.subject_metadata.subject_id,
3623 "New owner is None in confirm event"
3624 );
3625 return Err(ActorError::Functional {
3626 description: "In confirm event, new owner is None"
3627 .to_owned(),
3628 });
3629 }
3630 }
3631 }
3632 Protocols::Reject { event_request, .. } => {
3633 if let EventRequest::Reject(..) = event_request.content() {
3634 } else {
3635 error!(
3636 event_type = "Reject",
3637 subject_id = %self.subject_metadata.subject_id,
3638 actual_request = ?event_request.content(),
3639 "Unexpected event request type for governance reject apply"
3640 );
3641 return Err(ActorError::Functional {
3642 description:
3643 "In reject event, event request must be Reject"
3644 .to_owned(),
3645 });
3646 };
3647
3648 self.subject_metadata.new_owner = None;
3649 debug!(
3650 event_type = "Reject",
3651 subject_id = %self.subject_metadata.subject_id,
3652 "Applied reject event"
3653 );
3654 }
3655 Protocols::EOL { event_request, .. } => {
3656 if let EventRequest::EOL(..) = event_request.content() {
3657 } else {
3658 error!(
3659 event_type = "EOL",
3660 subject_id = %self.subject_metadata.subject_id,
3661 actual_request = ?event_request.content(),
3662 "Unexpected event request type for governance eol apply"
3663 );
3664 return Err(ActorError::Functional {
3665 description: "In EOL event, event request must be EOL"
3666 .to_owned(),
3667 });
3668 };
3669
3670 self.subject_metadata.active = false;
3671 debug!(
3672 event_type = "EOL",
3673 subject_id = %self.subject_metadata.subject_id,
3674 "Applied EOL event"
3675 );
3676 }
3677 _ => {
3678 error!(
3679 subject_id = %self.subject_metadata.subject_id,
3680 "Invalid protocol data for Governance"
3681 );
3682 return Err(ActorError::Functional {
3683 description: "Invalid protocol data for Governance"
3684 .to_owned(),
3685 });
3686 }
3687 }
3688
3689 if event.protocols.is_success() {
3690 self.properties.version += 1;
3691 }
3692
3693 self.subject_metadata.sn += 1;
3694 self.subject_metadata.prev_ledger_event_hash =
3695 event.prev_ledger_event_hash.clone();
3696
3697 Ok(())
3698 }
3699}
3700
3701impl Storable for Governance {}