1use crate::{
5 approval::{
6 persist::{ApprPersist, InitApprPersist},
7 types::VotationType,
8 },
9 db::Storable,
10 evaluation::{
11 compiler::{
12 CompilerResponse, ContractCompiler, ContractCompilerMessage,
13 },
14 request::EvalWorkerContext,
15 schema::{EvaluationSchema, EvaluationSchemaMessage},
16 worker::{EvalWorker, EvalWorkerMessage},
17 },
18 governance::{
19 contract_register::{
20 ContractRegister, ContractRegisterMessage, ContractRegisterResponse,
21 },
22 data::GovernanceData,
23 events::{
24 GovernanceEvent, governance_event_roles_update_fact,
25 governance_event_update_creator_change,
26 },
27 model::{
28 CreatorQuantity, CreatorWitness, HashThisRole, ProtocolTypes,
29 Quorum, RoleCreator, RoleTypes, Schema, WitnessesData,
30 },
31 role_register::{
32 CurrentValidationRoles, RoleRegister, RoleRegisterMessage,
33 RoleRegisterResponse,
34 },
35 sn_register::{SnRegister, SnRegisterMessage, SnRegisterResponse},
36 subject_register::{
37 SubjectRegister, SubjectRegisterMessage, SubjectRegisterResponse,
38 },
39 tracker_sync::{TrackerSync, TrackerSyncConfig},
40 version_sync::{GovernanceVersionSync, GovernanceVersionSyncMessage},
41 witnesses_register::{
42 CreatorWitnessGrant, CreatorWitnessRegistration, WitnessesRegister,
43 WitnessesRegisterMessage, WitnessesRegisterResponse, WitnessesType,
44 },
45 },
46 helpers::{db::ExternalDB, network::service::NetworkSender, sink::AveSink},
47 model::{
48 common::{
49 emit_fail, get_last_event, purge_storage, subject::make_obsolete,
50 },
51 event::{Ledger, Protocols, ValidationMetadata},
52 },
53 node::{Node, NodeMessage, TransferSubject, register::RegisterMessage},
54 subject::{
55 DataForSink, EventLedgerDataForSink, Metadata, Subject,
56 SubjectMetadata,
57 error::SubjectError,
58 sinkdata::{SinkData, SinkDataMessage},
59 },
60 system::ConfigHelper,
61 validation::{
62 request::LastData,
63 schema::{ValidationSchema, ValidationSchemaMessage},
64 worker::{ValiWorker, ValiWorkerMessage},
65 },
66};
67
68use ave_actors::{
69 Actor, ActorContext, ActorError, ActorPath, ActorRef, ChildAction, Handler,
70 Message, Response, Sink,
71};
72use ave_common::{
73 Namespace, SchemaType, ValueWrapper,
74 identity::{DigestIdentifier, HashAlgorithm, PublicKey},
75 request::EventRequest,
76 response::SubjectDB,
77 schematype::ReservedWords,
78};
79
80use async_trait::async_trait;
81use ave_actors::{FullPersistence, PersistentActor};
82use borsh::{BorshDeserialize, BorshSerialize};
83use json_patch::{Patch, patch};
84use serde::{Deserialize, Serialize};
85use tracing::{Span, debug, error, info_span, warn};
86
87use std::{
88 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
89 sync::Arc,
90 time::Duration,
91};
92use tokio::{fs, sync::RwLock};
93use wasmtime::Module;
94
// Child modules declared by this file. Most of them are also imported at the
// top of the file through `crate::governance::...` paths (presumably these
// same modules — confirm against the crate layout).
pub mod contract_register;
pub mod data;
pub mod error;
pub mod events;
pub mod model;
pub mod role_register;
pub mod sn_register;
pub mod subject_register;
pub mod tracker_sync;
pub mod version_sync;
pub mod witnesses_register;
106
/// Per-schema table mapping each public key to a set of namespaces.
///
/// This is the type of the `creators_eval` and `creators_vali` fields of
/// `SchemaRuntimeRoles`.
type SchemaCreatorsByNamespace =
    BTreeMap<SchemaType, BTreeMap<PublicKey, BTreeSet<Namespace>>>;
/// Per-schema pair of (public key -> set of namespaces, flag).
///
/// This is the type of the `issuers_eval` field of `SchemaRuntimeRoles`;
/// `up_schemas` reads the map as `issuers` and the flag as `issuer_any`.
type SchemaIssuersByNamespace =
    BTreeMap<SchemaType, (BTreeMap<PublicKey, BTreeSet<Namespace>>, bool)>;
111
/// Borrowed per-schema role tables passed to `Governance::up_schemas` when
/// schema actors are created.
struct SchemaRuntimeRoles<'a> {
    /// Source of the `creators` field of each new `EvaluationSchema`.
    creators_eval: &'a SchemaCreatorsByNamespace,
    /// Source of the `issuers` and `issuer_any` fields of each new
    /// `EvaluationSchema`.
    issuers_eval: &'a SchemaIssuersByNamespace,
    /// Source of the `creators` field of each new `ValidationSchema`.
    creators_vali: &'a SchemaCreatorsByNamespace,
}
117
/// Bundle of role changes grouped by role kind (approvers, evaluators,
/// validators, creators and witnesses).
///
/// NOTE(review): this type is not used in the visible part of the file. The
/// per-field notes below are read off the field names and types only and
/// should be confirmed against the code that builds and consumes it.
pub struct RolesUpdate {
    /// Approval quorum, when one is supplied.
    pub appr_quorum: Option<Quorum>,
    /// Approver keys to add (per the field name).
    pub new_approvers: Vec<PublicKey>,
    /// Approver keys to remove (per the field name).
    pub remove_approvers: Vec<PublicKey>,

    /// Evaluation quorum keyed by schema.
    pub eval_quorum: HashMap<SchemaType, Quorum>,
    /// Namespaces keyed by `(schema, key)` for evaluators to add.
    pub new_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
    /// Namespaces keyed by `(schema, key)` for evaluators to remove.
    pub remove_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,

    /// Validation quorum keyed by schema.
    pub vali_quorum: HashMap<SchemaType, Quorum>,
    /// Namespaces keyed by `(schema, key)` for validators to add.
    pub new_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
    /// Namespaces keyed by `(schema, key)` for validators to remove.
    pub remove_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,

    /// Creators to add, keyed by `(schema, String, key)`; the `String` is
    /// presumably a namespace — TODO confirm. The value carries the creator
    /// quantity and its witness list.
    pub new_creator: HashMap<
        (SchemaType, String, PublicKey),
        (CreatorQuantity, Vec<WitnessesType>),
    >,
    /// Creators to remove, using the same `(schema, String, key)` key shape.
    pub remove_creator: HashSet<(SchemaType, String, PublicKey)>,

    /// Namespaces keyed by `(schema, key)` for witnesses to add.
    pub new_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
    /// Namespaces keyed by `(schema, key)` for witnesses to remove.
    pub remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
}
140
/// Role changes expressed around a single added key per role kind plus the
/// entries to remove.
///
/// NOTE(review): not used in the visible part of the file; the name suggests
/// it relates to a "confirm" event, but that and the per-field notes below
/// are inferred from names and types only — confirm against the callers.
pub struct RolesUpdateConfirm {
    /// Optional approver key to add.
    pub new_approver: Option<PublicKey>,
    /// Approver key to remove (always present, unlike `new_approver`).
    pub remove_approver: PublicKey,

    /// Optional evaluator key to add.
    pub new_evaluator: Option<PublicKey>,
    /// Namespaces keyed by `(schema, key)` for evaluators to remove.
    pub remove_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,

    /// Optional validator key to add.
    pub new_validator: Option<PublicKey>,
    /// Namespaces keyed by `(schema, key)` for validators to remove.
    pub remove_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,

    /// Creators to remove, keyed by `(schema, String, key)`.
    pub remove_creator: HashSet<(SchemaType, String, PublicKey)>,
    /// Namespaces keyed by `(schema, key)` for witnesses to remove.
    pub remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
}
154
/// Removal-only set of role entries, one collection per role kind.
///
/// NOTE(review): not used in the visible part of the file; field meanings
/// are inferred from names and types — confirm against the callers.
pub struct RolesUpdateRemove {
    /// Namespaces keyed by `(schema, key)` for witnesses.
    pub witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
    /// Creator entries keyed by `(schema, String, key)`.
    pub creator: HashSet<(SchemaType, String, PublicKey)>,
    /// Approver keys.
    pub approvers: Vec<PublicKey>,
    /// Namespaces keyed by `(schema, key)` for evaluators.
    pub evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
    /// Namespaces keyed by `(schema, key)` for validators.
    pub validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
}
162
/// Changes to creator entries, split into additions, quantity updates,
/// witness updates and removals.
///
/// NOTE(review): not used in the visible part of the file; the `String`
/// component of each key is presumably a namespace — TODO confirm.
pub struct CreatorRoleUpdate {
    /// New creator entries with their quantity and witness set.
    pub new_creator: HashMap<
        (SchemaType, String, PublicKey),
        (CreatorQuantity, BTreeSet<CreatorWitness>),
    >,

    /// Entries whose quantity changes; the new quantity is the last tuple
    /// element.
    pub update_creator_quantity:
        HashSet<(SchemaType, String, PublicKey, CreatorQuantity)>,

    /// Entries whose witness set changes; the new set is the last tuple
    /// element.
    pub update_creator_witnesses:
        HashSet<(SchemaType, String, PublicKey, BTreeSet<CreatorWitness>)>,

    /// Entries to remove.
    pub remove_creator: HashSet<(SchemaType, String, PublicKey)>,
}
177
/// State of the governance subject actor.
///
/// `subject_metadata` and `properties` are the persisted part: they are the
/// only fields written by the hand-written `BorshSerialize` impl and the only
/// ones not marked `#[serde(skip)]`. `our_key`, `service` and `hash` are
/// runtime-only; after Borsh deserialization they hold
/// `PublicKey::default()`, `false` and `None` respectively.
#[derive(Default, Debug, Serialize, Deserialize, Clone)]
pub struct Governance {
    /// Public key compared against the owner / new owner recorded in
    /// `subject_metadata` to decide which child actors to run.
    #[serde(skip)]
    pub our_key: Arc<PublicKey>,
    /// When `false`, `refresh_version_sync` returns without doing anything.
    #[serde(skip)]
    pub service: bool,
    /// Hash algorithm; `manager_new_ledger_events` and
    /// `manager_schemas_compilers` fail with `FunctionalCritical` while this
    /// is `None`.
    #[serde(skip)]
    pub hash: Option<HashAlgorithm>,
    /// Subject metadata (id, name, owner, new owner, sn, active flag, ...).
    pub subject_metadata: SubjectMetadata,
    /// Governance data; replaced wholesale by `apply_patch`.
    pub properties: GovernanceData,
}
189
190impl BorshSerialize for Governance {
191 fn serialize<W: std::io::Write>(
192 &self,
193 writer: &mut W,
194 ) -> std::io::Result<()> {
195 BorshSerialize::serialize(&self.subject_metadata, writer)?;
197 BorshSerialize::serialize(&self.properties, writer)?;
198
199 Ok(())
200 }
201}
202
203impl BorshDeserialize for Governance {
204 fn deserialize_reader<R: std::io::Read>(
205 reader: &mut R,
206 ) -> std::io::Result<Self> {
207 let subject_metadata = SubjectMetadata::deserialize_reader(reader)?;
209 let properties = GovernanceData::deserialize_reader(reader)?;
210
211 let our_key = Arc::new(PublicKey::default());
214 let hash = None;
215
216 Ok(Self {
217 hash,
218 our_key,
219 service: false,
220 subject_metadata,
221 properties,
222 })
223 }
224}
225
226#[async_trait]
227impl Subject for Governance {
228 async fn update_sn(
229 &self,
230 ctx: &mut ActorContext<Self>,
231 ) -> Result<(), ActorError> {
232 let witnesses_register = ctx
233 .get_child::<WitnessesRegister>("witnesses_register")
234 .await?;
235
236 witnesses_register
237 .tell(WitnessesRegisterMessage::UpdateSnGov {
238 sn: self.subject_metadata.sn,
239 })
240 .await
241 }
242
243 async fn eol(
244 &self,
245 ctx: &mut ActorContext<Self>,
246 ) -> Result<(), ActorError> {
247 let node_path = ActorPath::from("/user/node");
248 let node = ctx.system().get_actor::<Node>(&node_path).await?;
249 node.tell(NodeMessage::EOLSubject {
250 subject_id: self.subject_metadata.subject_id.clone(),
251 i_owner: *self.our_key == self.subject_metadata.owner,
252 })
253 .await
254 }
255
256 async fn reject(
257 &self,
258 ctx: &mut ActorContext<Self>,
259 _gov_version: u64,
260 ) -> Result<(), ActorError> {
261 let node_path = ActorPath::from("/user/node");
262 let node = ctx.system().get_actor::<Node>(&node_path).await?;
263 node.tell(NodeMessage::RejectTransfer(
264 self.subject_metadata.subject_id.clone(),
265 ))
266 .await
267 }
268
269 async fn confirm(
270 &self,
271 ctx: &mut ActorContext<Self>,
272 _new_owner: PublicKey,
273 _gov_version: u64,
274 ) -> Result<(), ActorError> {
275 let node_path = ActorPath::from("/user/node");
276 let node = ctx.system().get_actor::<Node>(&node_path).await?;
277 node.tell(NodeMessage::ConfirmTransfer(
278 self.subject_metadata.subject_id.clone(),
279 ))
280 .await
281 }
282
283 async fn transfer(
284 &self,
285 ctx: &mut ActorContext<Self>,
286 new_owner: PublicKey,
287 _gov_version: u64,
288 ) -> Result<(), ActorError> {
289 let node_path = ActorPath::from("/user/node");
290 let node = ctx.system().get_actor::<Node>(&node_path).await?;
291 node.tell(NodeMessage::TransferSubject(TransferSubject {
292 name: self.subject_metadata.name.clone(),
293 subject_id: self.subject_metadata.subject_id.clone(),
294 new_owner: new_owner.clone(),
295 actual_owner: self.subject_metadata.owner.clone(),
296 }))
297 .await
298 }
299
    /// Returns the result of the shared `get_last_event` helper for this
    /// actor's context (`None` or the last `Ledger`, or the helper's error).
    async fn get_last_ledger(
        &self,
        ctx: &mut ActorContext<Self>,
    ) -> Result<Option<Ledger>, ActorError> {
        get_last_event(ctx).await
    }
306
307 fn apply_patch(
308 &mut self,
309 json_patch: ValueWrapper,
310 ) -> Result<(), ActorError> {
311 let patch_json = serde_json::from_value::<Patch>(json_patch.0)
312 .map_err(|e| {
313 let error = SubjectError::PatchConversionFailed {
314 details: e.to_string(),
315 };
316 error!(
317 error = %e,
318 subject_id = %self.subject_metadata.subject_id,
319 "Failed to convert patch from JSON"
320 );
321 ActorError::Functional {
322 description: error.to_string(),
323 }
324 })?;
325
326 let mut properties = self.properties.to_value_wrapper();
327
328 patch(&mut properties.0, &patch_json).map_err(|e| {
329 let error = SubjectError::PatchApplicationFailed {
330 details: e.to_string(),
331 };
332 error!(
333 error = %e,
334 subject_id = %self.subject_metadata.subject_id,
335 "Failed to apply patch to properties"
336 );
337 ActorError::Functional {
338 description: error.to_string(),
339 }
340 })?;
341
342 self.properties = serde_json::from_value::<GovernanceData>(
343 properties.0,
344 )
345 .map_err(|e| {
346 let error = SubjectError::GovernanceDataConversionFailed {
347 details: e.to_string(),
348 };
349 error!(
350 error = %e,
351 subject_id = %self.subject_metadata.subject_id,
352 "Failed to convert properties to GovernanceData"
353 );
354 ActorError::Functional {
355 description: error.to_string(),
356 }
357 })?;
358
359 debug!(
360 subject_id = %self.subject_metadata.subject_id,
361 "Patch applied successfully"
362 );
363
364 Ok(())
365 }
366
    /// Processes a batch of ledger events and reconciles the child actors
    /// with the resulting state.
    ///
    /// The method snapshots `sn`, owner, new-owner and properties, then calls
    /// `verify_new_ledger_events`. If `sn` advanced, it either tears the
    /// children down (subject no longer active) or switches between the
    /// "owner" and "not owner" child sets according to how ownership
    /// changed, then updates schema actors and workers. Finally, when `sn`
    /// advanced or the snapshot `sn` was 0, it publishes the new state to the
    /// sink, updates the witnesses register and refreshes version sync.
    ///
    /// # Errors
    /// - `ActorError::Helper` if the `network` helper is missing.
    /// - `ActorError::FunctionalCritical` if `self.hash` is `None`.
    /// - A `Functional` verification error is returned only when `sn` is 0
    ///   after the call; any other verification error is always returned.
    /// - Errors from the child management / publishing calls are propagated.
    async fn manager_new_ledger_events(
        &mut self,
        ctx: &mut ActorContext<Self>,
        events: Vec<Ledger>,
    ) -> Result<(), ActorError> {
        let Some(network) = ctx
            .system()
            .get_helper::<Arc<NetworkSender>>("network")
            .await
        else {
            return Err(ActorError::Helper {
                name: "network".to_owned(),
                reason: "Not found".to_owned(),
            });
        };

        let Some(hash) = self.hash else {
            return Err(ActorError::FunctionalCritical {
                description: "Hash algorithm is None".to_string(),
            });
        };

        // Snapshot of the state BEFORE the events are applied; every
        // `current_*` name below refers to this pre-update state.
        let current_sn = self.subject_metadata.sn;
        let current_new_owner_some = self.subject_metadata.new_owner.is_some();
        let i_current_new_owner = self.subject_metadata.new_owner.clone()
            == Some((*self.our_key).clone());
        let current_owner = self.subject_metadata.owner.clone();

        let current_properties = self.properties.clone();

        if let Err(e) = self.verify_new_ledger_events(ctx, events, &hash).await
        {
            if let ActorError::Functional { description } = e.clone() {
                warn!(
                    error = %description,
                    subject_id = %self.subject_metadata.subject_id,
                    sn = self.subject_metadata.sn,
                    "Error verifying new ledger events"
                );

                // A functional error is fatal only while `sn` is still 0;
                // otherwise processing continues with whatever state
                // `verify_new_ledger_events` left behind (presumably some
                // events of the batch may already be applied — confirm).
                if self.subject_metadata.sn == 0 {
                    return Err(e);
                }
            } else {
                error!(
                    error = %e,
                    subject_id = %self.subject_metadata.subject_id,
                    sn = self.subject_metadata.sn,
                    "Critical error verifying new ledger events"
                );
                return Err(e);
            }
        };

        // `sn` advanced: child actors must be brought in line with the new
        // state.
        if current_sn < self.subject_metadata.sn {
            let old_gov = current_properties;
            if !self.subject_metadata.active {
                // Subject is no longer active: stop the role children that
                // matched the previous ownership, then the compilers and
                // schema actors derived from the previous governance data.
                if current_owner == *self.our_key {
                    Self::down_owner(ctx).await?;
                } else {
                    Self::down_not_owner(ctx, &old_gov, self.our_key.clone())
                        .await?;
                }

                let old_schemas_eval = old_gov
                    .schemas_name(ProtocolTypes::Evaluation, &self.our_key);

                Self::down_compilers_schemas(
                    ctx,
                    &old_schemas_eval,
                    &self.subject_metadata.subject_id,
                )
                .await?;

                let old_schemas_val = old_gov
                    .schemas_name(ProtocolTypes::Validation, &self.our_key);

                Self::down_schemas(ctx, &old_schemas_eval, &old_schemas_val)
                    .await?;
            } else {
                // State AFTER the events were applied.
                let new_owner_some = self.subject_metadata.new_owner.is_some();
                let i_new_owner = self.subject_metadata.new_owner.clone()
                    == Some((*self.our_key).clone());
                // At most one of these ends up `true`; both `false` means the
                // owner / not-owner child set does not need to be swapped.
                let mut up_not_owner: bool = false;
                let mut up_owner: bool = false;

                if current_owner == *self.our_key {
                    // We were the owner before this batch.
                    if current_owner != self.subject_metadata.owner {
                        // The owner field now holds a different key.
                        if !current_new_owner_some && !i_new_owner {
                            up_not_owner = true;
                        } else if current_new_owner_some && i_new_owner {
                            up_owner = true;
                        }
                    } else {
                        // Owner field unchanged: react to `new_owner` being
                        // cleared or set.
                        if current_new_owner_some && !new_owner_some {
                            up_owner = true;
                        } else if !current_new_owner_some && new_owner_some {
                            up_not_owner = true;
                        }
                    }
                } else {
                    // We were not the owner before this batch.
                    if current_owner != self.subject_metadata.owner
                        && self.subject_metadata.owner == *self.our_key
                    {
                        // The owner field changed and now holds our key.
                        if !new_owner_some && !i_current_new_owner {
                            up_owner = true;
                        } else if new_owner_some && i_current_new_owner {
                            up_not_owner = true;
                        }
                    } else if i_current_new_owner && !i_new_owner {
                        // We were the pending new owner and no longer are.
                        up_not_owner = true;
                    } else if !i_current_new_owner && i_new_owner {
                        // We became the pending new owner.
                        up_owner = true;
                    }
                }

                // Swap child sets: stop the set for the previous side, start
                // the set for the new side.
                if up_not_owner {
                    Self::down_owner(ctx).await?;
                    self.up_not_owner(ctx, &hash, &network).await?;
                } else if up_owner {
                    Self::down_not_owner(ctx, &old_gov, self.our_key.clone())
                        .await?;
                    self.up_owner(ctx, &hash, &network).await?;
                }

                // No swap and we are not the owner: adjust the not-owner
                // children against the previous governance data.
                if !up_not_owner
                    && !up_owner
                    && *self.our_key != self.subject_metadata.owner
                {
                    self.up_down_not_owner(ctx, &old_gov, &hash, &network)
                        .await?;
                }

                self.manager_schemas_compilers(ctx, &old_gov).await?;
                self.update_childs(ctx).await?;
            }

            // The result of `make_obsolete` is deliberately ignored.
            let _ = make_obsolete(ctx, &self.subject_metadata.subject_id).await;
        }

        // Publish when `sn` advanced, and also whenever the snapshot `sn` was
        // 0 even if it did not advance.
        if current_sn < self.subject_metadata.sn || current_sn == 0 {
            Self::publish_sink(
                ctx,
                SinkDataMessage::UpdateState(Box::new(SubjectDB::from(
                    self.clone(),
                ))),
            )
            .await?;

            self.update_sn(ctx).await?;
            self.refresh_version_sync(ctx).await?;
        }

        Ok(())
    }
532}
533
534impl Governance {
535 async fn ledger_batch_size(
536 ctx: &ActorContext<Self>,
537 ) -> Result<usize, ActorError> {
538 let Some(config) =
539 ctx.system().get_helper::<ConfigHelper>("config").await
540 else {
541 return Err(ActorError::Helper {
542 name: "config".to_string(),
543 reason: "Not Found".to_string(),
544 });
545 };
546
547 Ok(config.ledger_batch_size)
548 }
549
550 async fn up_approver_only(
551 &self,
552 ctx: &mut ActorContext<Self>,
553 hash: &HashAlgorithm,
554 network: &Arc<NetworkSender>,
555 ) -> Result<(), ActorError> {
556 if !self.properties.has_this_role(HashThisRole::Gov {
557 who: (*self.our_key).clone(),
558 role: RoleTypes::Approver,
559 }) {
560 return Ok(());
561 }
562
563 let always_accept = if let Some(config) =
564 ctx.system().get_helper::<ConfigHelper>("config").await
565 {
566 config.always_accept
567 } else {
568 return Err(ActorError::Helper {
569 name: "config".to_string(),
570 reason: "Not Found".to_string(),
571 });
572 };
573
574 let pass_votation = if always_accept {
575 VotationType::AlwaysAccept
576 } else {
577 VotationType::Manual
578 };
579
580 let owner = *self.our_key == self.subject_metadata.owner;
581 let i_new_owner =
582 self.subject_metadata.new_owner == Some((*self.our_key).clone());
583
584 let node_key = if (owner && self.subject_metadata.new_owner.is_none())
585 || i_new_owner
586 {
587 (*self.our_key).clone()
588 } else {
589 self.subject_metadata
590 .new_owner
591 .clone()
592 .unwrap_or_else(|| self.subject_metadata.owner.clone())
593 };
594
595 let init_approver = InitApprPersist {
596 our_key: self.our_key.clone(),
597 node_key,
598 subject_id: self.subject_metadata.subject_id.clone(),
599 pass_votation,
600 helpers: (*hash, network.clone()),
601 };
602
603 ctx.create_child("approver", ApprPersist::initial(init_approver))
604 .await?;
605
606 Ok(())
607 }
608
609 async fn current_validation_roles(
610 &self,
611 ctx: &ActorContext<Self>,
612 schema_id: SchemaType,
613 ) -> Result<CurrentValidationRoles, ActorError> {
614 let actor = ctx.get_child::<RoleRegister>("role_register").await?;
615 let response = actor
616 .ask(RoleRegisterMessage::GetCurrentValidationRoles { schema_id })
617 .await?;
618
619 match response {
620 RoleRegisterResponse::CurrentValidationRoles(roles) => Ok(roles),
621 _ => Err(ActorError::UnexpectedResponse {
622 path: ActorPath::from(format!(
623 "/user/node/subject_manager/{}/role_register",
624 self.subject_metadata.subject_id
625 )),
626 expected: "RoleRegisterResponse::CurrentValidationRoles"
627 .to_owned(),
628 }),
629 }
630 }
631
632 async fn refresh_version_sync(
633 &self,
634 ctx: &ActorContext<Self>,
635 ) -> Result<(), ActorError> {
636 if !self.service {
637 return Ok(());
638 }
639
640 let version_sync = ctx
641 .get_child::<GovernanceVersionSync>("version_sync")
642 .await?;
643 let governance_peers = self
644 .properties
645 .get_witnesses(WitnessesData::Gov)
646 .map_err(|e| ActorError::Functional {
647 description: e.to_string(),
648 })?;
649
650 let _ = version_sync
651 .ask(GovernanceVersionSyncMessage::RefreshGovernance {
652 version: self.properties.version,
653 governance_peers,
654 })
655 .await?;
656
657 Ok(())
658 }
659
660 async fn update_schemas(
661 &self,
662 ctx: &ActorContext<Self>,
663 schema_creators_eval: &BTreeMap<
664 SchemaType,
665 BTreeMap<PublicKey, BTreeSet<Namespace>>,
666 >,
667 schema_issuers_eval: &BTreeMap<
668 SchemaType,
669 (BTreeMap<PublicKey, BTreeSet<Namespace>>, bool),
670 >,
671 schema_creators_vali: &BTreeMap<
672 SchemaType,
673 BTreeMap<PublicKey, BTreeSet<Namespace>>,
674 >,
675 update_eval: &BTreeMap<SchemaType, ValueWrapper>,
676 update_vali: &BTreeMap<SchemaType, ValueWrapper>,
677 ) -> Result<(), ActorError> {
678 for (schema_id, init_state) in update_eval.iter() {
679 let actor = ctx
680 .get_child::<EvaluationSchema>(&format!(
681 "{}_evaluation",
682 schema_id
683 ))
684 .await?;
685
686 actor
687 .tell(EvaluationSchemaMessage::Update {
688 members: self.properties.members.values().cloned().collect(),
689 creators: schema_creators_eval
690 .get(schema_id)
691 .cloned()
692 .unwrap_or_default(),
693 issuers: schema_issuers_eval
694 .get(schema_id)
695 .map(|(issuers, _)| issuers.clone())
696 .unwrap_or_default(),
697 issuer_any: schema_issuers_eval
698 .get(schema_id)
699 .map(|(_, issuer_any)| *issuer_any)
700 .unwrap_or(false),
701 schema_viewpoints: self
702 .properties
703 .schemas
704 .get(schema_id)
705 .map(|schema| schema.viewpoints.clone())
706 .unwrap_or_default(),
707 sn: self.subject_metadata.sn,
708 gov_version: self.properties.version,
709 init_state: init_state.clone(),
710 })
711 .await?;
712 }
713
714 for (schema_id, init_state) in update_vali.iter() {
715 let current_roles = self
716 .current_validation_roles(ctx, schema_id.clone())
717 .await?;
718 let actor = ctx
719 .get_child::<ValidationSchema>(&format!(
720 "{}_validation",
721 schema_id
722 ))
723 .await?;
724
725 actor
726 .tell(ValidationSchemaMessage::Update {
727 creators: schema_creators_vali
728 .get(schema_id)
729 .cloned()
730 .unwrap_or_default(),
731 sn: self.subject_metadata.sn,
732 gov_version: self.properties.version,
733 init_state: init_state.clone(),
734 current_roles: current_roles.schema,
735 })
736 .await?;
737 }
738
739 Ok(())
740 }
741
742 async fn down_schemas(
743 ctx: &ActorContext<Self>,
744 old_schemas_eval: &BTreeSet<SchemaType>,
745 old_schemas_val: &BTreeSet<SchemaType>,
746 ) -> Result<(), ActorError> {
747 for schema_id in old_schemas_eval {
748 let actor = ctx
749 .get_child::<EvaluationSchema>(&format!(
750 "{}_evaluation",
751 schema_id
752 ))
753 .await?;
754 actor.ask_stop().await?;
755 }
756
757 for schema_id in old_schemas_val {
758 let actor = ctx
759 .get_child::<ValidationSchema>(&format!(
760 "{}_validation",
761 schema_id
762 ))
763 .await?;
764 actor.ask_stop().await?;
765 }
766
767 Ok(())
768 }
769
770 async fn up_schemas(
771 &self,
772 ctx: &mut ActorContext<Self>,
773 schema_roles: SchemaRuntimeRoles<'_>,
774 up_eval: &BTreeMap<SchemaType, ValueWrapper>,
775 up_vali: &BTreeMap<SchemaType, ValueWrapper>,
776 hash_network: (&HashAlgorithm, &Arc<NetworkSender>),
777 ) -> Result<(), ActorError> {
778 for (schema_id, init_state) in up_eval.iter() {
779 let eval_actor = EvaluationSchema {
780 our_key: self.our_key.clone(),
781 governance_id: self.subject_metadata.subject_id.clone(),
782 gov_version: self.properties.version,
783 sn: self.subject_metadata.sn,
784 members: self.properties.members.values().cloned().collect(),
785 creators: schema_roles
786 .creators_eval
787 .get(schema_id)
788 .cloned()
789 .unwrap_or_default(),
790 issuers: schema_roles
791 .issuers_eval
792 .get(schema_id)
793 .map(|(issuers, _)| issuers.clone())
794 .unwrap_or_default(),
795 issuer_any: schema_roles
796 .issuers_eval
797 .get(schema_id)
798 .map(|(_, issuer_any)| *issuer_any)
799 .unwrap_or(false),
800 schema_viewpoints: self
801 .properties
802 .schemas
803 .get(schema_id)
804 .map(|schema| schema.viewpoints.clone())
805 .unwrap_or_default(),
806 schema_id: schema_id.clone(),
807 init_state: init_state.clone(),
808 hash: *hash_network.0,
809 network: hash_network.1.clone(),
810 };
811
812 ctx.create_child(&format!("{}_evaluation", schema_id), eval_actor)
813 .await?;
814 }
815
816 for (schema_id, init_state) in up_vali.iter() {
817 let current_roles = self
818 .current_validation_roles(ctx, schema_id.clone())
819 .await?;
820 let vali_actor = ValidationSchema {
821 our_key: self.our_key.clone(),
822 governance_id: self.subject_metadata.subject_id.clone(),
823 gov_version: self.properties.version,
824 sn: self.subject_metadata.sn,
825 creators: schema_roles
826 .creators_vali
827 .get(schema_id)
828 .cloned()
829 .unwrap_or_default(),
830 schema_id: schema_id.clone(),
831 init_state: init_state.clone(),
832 current_roles: current_roles.schema,
833 hash: *hash_network.0,
834 network: hash_network.1.clone(),
835 };
836
837 ctx.create_child(&format!("{}_validation", schema_id), vali_actor)
838 .await?;
839 }
840
841 Ok(())
842 }
843
844 async fn manager_schemas_compilers(
845 &self,
846 ctx: &mut ActorContext<Self>,
847 old_gov: &GovernanceData,
848 ) -> Result<(), ActorError> {
849 let Some(network) = ctx
850 .system()
851 .get_helper::<Arc<NetworkSender>>("network")
852 .await
853 else {
854 return Err(ActorError::Helper {
855 name: "network".to_owned(),
856 reason: "Not found".to_owned(),
857 });
858 };
859
860 let Some(hash) = self.hash else {
861 return Err(ActorError::FunctionalCritical {
862 description: "Hash algorithm is None".to_string(),
863 });
864 };
865
866 let (old_schemas_eval, new_schemas_eval) = {
867 let old_schemas_eval =
868 old_gov.schemas_name(ProtocolTypes::Evaluation, &self.our_key);
869
870 let new_schemas_eval = self
871 .properties
872 .schemas(ProtocolTypes::Evaluation, &self.our_key);
873
874 let down = old_schemas_eval
877 .clone()
878 .iter()
879 .filter(|x| !new_schemas_eval.contains_key(x))
880 .cloned()
881 .collect();
882 Self::down_compilers_schemas(
883 ctx,
884 &down,
885 &self.subject_metadata.subject_id,
886 )
887 .await?;
888
889 let up = new_schemas_eval
891 .clone()
892 .iter()
893 .filter(|x| !old_schemas_eval.contains(x.0))
894 .map(|x| (x.0.clone(), x.1.clone()))
895 .collect();
896
897 Self::up_compilers_schemas(
898 ctx,
899 &up,
900 self.subject_metadata.subject_id.clone(),
901 &hash,
902 )
903 .await?;
904
905 let current = new_schemas_eval
907 .clone()
908 .iter()
909 .filter(|x| old_schemas_eval.contains(x.0))
910 .map(|x| (x.0.clone(), x.1.clone()))
911 .collect();
912
913 Self::compile_schemas(
914 ctx,
915 current,
916 self.subject_metadata.subject_id.clone(),
917 )
918 .await?;
919
920 (
921 old_schemas_eval,
922 new_schemas_eval
923 .iter()
924 .map(|x| (x.0.clone(), x.1.initial_value.clone()))
925 .collect::<BTreeMap<SchemaType, ValueWrapper>>(),
926 )
927 };
928 let old_schemas_vali =
929 old_gov.schemas_name(ProtocolTypes::Validation, &self.our_key);
930
931 let new_schemas_vali = self
932 .properties
933 .schemas_init_value(ProtocolTypes::Validation, &self.our_key);
934
935 let down_eval = old_schemas_eval
937 .clone()
938 .iter()
939 .filter(|x| !new_schemas_eval.contains_key(x))
940 .cloned()
941 .collect();
942
943 let down_vali = old_schemas_vali
944 .clone()
945 .iter()
946 .filter(|x| !new_schemas_vali.contains_key(x))
947 .cloned()
948 .collect();
949
950 Self::down_schemas(ctx, &down_eval, &down_vali).await?;
951
952 let schemas_namespace_eval = self
954 .properties
955 .schemas_namespace(ProtocolTypes::Evaluation, &self.our_key);
956
957 let schema_creators_eval = self
958 .properties
959 .schema_creators_namespace(schemas_namespace_eval.clone());
960 let schema_issuers_eval = self
961 .properties
962 .schema_issuers_namespace(schemas_namespace_eval);
963
964 let up_eval = new_schemas_eval
965 .clone()
966 .iter()
967 .filter(|x| !old_schemas_eval.contains(x.0))
968 .map(|x| (x.0.clone(), x.1.clone()))
969 .collect::<BTreeMap<SchemaType, ValueWrapper>>();
970
971 let schemas_namespace_vali = self
972 .properties
973 .schemas_namespace(ProtocolTypes::Validation, &self.our_key);
974
975 let schema_creators_vali = self
976 .properties
977 .schema_creators_namespace(schemas_namespace_vali);
978
979 let up_vali = new_schemas_vali
980 .clone()
981 .iter()
982 .filter(|x| !old_schemas_vali.contains(x.0))
983 .map(|x| (x.0.clone(), x.1.clone()))
984 .collect::<BTreeMap<SchemaType, ValueWrapper>>();
985 self.up_schemas(
987 ctx,
988 SchemaRuntimeRoles {
989 creators_eval: &schema_creators_eval,
990 issuers_eval: &schema_issuers_eval,
991 creators_vali: &schema_creators_vali,
992 },
993 &up_eval,
994 &up_vali,
995 (&hash, &network),
996 )
997 .await?;
998
999 let update_eval = new_schemas_eval
1001 .clone()
1002 .iter()
1003 .filter(|x| old_schemas_eval.contains(x.0))
1004 .map(|x| (x.0.clone(), x.1.clone()))
1005 .collect::<BTreeMap<SchemaType, ValueWrapper>>();
1006
1007 let update_vali = new_schemas_vali
1008 .clone()
1009 .iter()
1010 .filter(|x| old_schemas_vali.contains(x.0))
1011 .map(|x| (x.0.clone(), x.1.clone()))
1012 .collect::<BTreeMap<SchemaType, ValueWrapper>>();
1013
1014 self.update_schemas(
1015 ctx,
1016 &schema_creators_eval,
1017 &schema_issuers_eval,
1018 &schema_creators_vali,
1019 &update_eval,
1020 &update_vali,
1021 )
1022 .await
1023 }
1024
1025 async fn update_childs(
1026 &self,
1027 ctx: &ActorContext<Self>,
1028 ) -> Result<(), ActorError> {
1029 if let Ok(evaluator) = ctx.get_child::<EvalWorker>("evaluator").await {
1030 let (issuers, issuer_any) = self.properties.governance_issuers();
1031 evaluator
1032 .tell(EvalWorkerMessage::Update {
1033 gov_version: self.properties.version,
1034 issuers,
1035 issuer_any,
1036 })
1037 .await?;
1038 }
1039
1040 if let Ok(validator) = ctx.get_child::<ValiWorker>("validator").await {
1041 let current_roles = self
1042 .current_validation_roles(ctx, SchemaType::Governance)
1043 .await?;
1044 validator
1045 .tell(ValiWorkerMessage::UpdateCurrentRoles {
1046 gov_version: self.properties.version,
1047 current_roles: crate::validation::worker::CurrentWorkerRoles {
1048 approval: current_roles.approval,
1049 evaluation: crate::governance::role_register::RoleDataRegister {
1050 workers: current_roles
1051 .schema
1052 .evaluation
1053 .iter()
1054 .map(|role| role.key.clone())
1055 .collect(),
1056 quorum: current_roles.schema.evaluation_quorum,
1057 },
1058 },
1059 })
1060 .await?;
1061 }
1062
1063 Ok(())
1064 }
1065
1066 async fn sweep_contract_artifacts(
1067 &self,
1068 ctx: &ActorContext<Self>,
1069 schemas: &BTreeMap<SchemaType, Schema>,
1070 ) -> Result<(), ActorError> {
1071 let Some(config) =
1072 ctx.system().get_helper::<ConfigHelper>("config").await
1073 else {
1074 return Err(ActorError::Helper {
1075 name: "config".to_string(),
1076 reason: "Not Found".to_string(),
1077 });
1078 };
1079
1080 let Some(contracts) = ctx
1081 .system()
1082 .get_helper::<Arc<RwLock<HashMap<String, Arc<Module>>>>>(
1083 "contracts",
1084 )
1085 .await
1086 else {
1087 return Err(ActorError::Helper {
1088 name: "contracts".to_string(),
1089 reason: "Not Found".to_string(),
1090 });
1091 };
1092
1093 let contract_register = ctx
1094 .get_child::<ContractRegister>("contract_register")
1095 .await?;
1096
1097 let prefix = format!("{}_", self.subject_metadata.subject_id);
1098 let mut allowed: HashSet<String> = schemas
1099 .keys()
1100 .map(|schema_id| {
1101 format!("{}_{}", self.subject_metadata.subject_id, schema_id)
1102 })
1103 .collect();
1104
1105 let registered: Vec<String> = match contract_register
1106 .ask(ContractRegisterMessage::ListContracts)
1107 .await?
1108 {
1109 ContractRegisterResponse::Contracts(contracts) => contracts,
1110 _ => Vec::new(),
1111 };
1112
1113 for contract_name in registered {
1114 if contract_name.starts_with(&prefix)
1115 && !allowed.contains(&contract_name)
1116 {
1117 contract_register
1118 .tell(ContractRegisterMessage::DeleteMetadata {
1119 contract_name: contract_name.clone(),
1120 })
1121 .await?;
1122 let mut contracts = contracts.write().await;
1123 contracts.remove(&contract_name);
1124 }
1125 }
1126
1127 let contracts_dir = config.contracts_path.join("contracts");
1128 if !contracts_dir.exists() {
1129 return Ok(());
1130 }
1131
1132 let mut entries = fs::read_dir(&contracts_dir).await.map_err(|e| {
1133 ActorError::Functional {
1134 description: format!(
1135 "Can not read contracts directory {}: {}",
1136 contracts_dir.display(),
1137 e
1138 ),
1139 }
1140 })?;
1141
1142 while let Some(entry) =
1143 entries
1144 .next_entry()
1145 .await
1146 .map_err(|e| ActorError::Functional {
1147 description: format!(
1148 "Can not iterate contracts directory {}: {}",
1149 contracts_dir.display(),
1150 e
1151 ),
1152 })?
1153 {
1154 let file_name = entry.file_name().to_string_lossy().to_string();
1155 if !file_name.starts_with(&prefix) {
1156 continue;
1157 }
1158
1159 let is_temp = file_name.starts_with(&format!(
1160 "{}_temp_",
1161 self.subject_metadata.subject_id
1162 ));
1163 if is_temp || !allowed.contains(&file_name) {
1164 let path = entry.path();
1165 let _ = fs::remove_dir_all(path).await;
1166 if !is_temp {
1167 allowed.remove(&file_name);
1168 }
1169 }
1170 }
1171
1172 Ok(())
1173 }
1174
1175 async fn delete_all_contract_artifacts(
1176 &self,
1177 ctx: &ActorContext<Self>,
1178 contract_register: &ActorRef<ContractRegister>,
1179 ) -> Result<(), ActorError> {
1180 let Some(config) =
1181 ctx.system().get_helper::<ConfigHelper>("config").await
1182 else {
1183 return Err(ActorError::Helper {
1184 name: "config".to_string(),
1185 reason: "Not Found".to_string(),
1186 });
1187 };
1188
1189 let Some(contracts) = ctx
1190 .system()
1191 .get_helper::<Arc<RwLock<HashMap<String, Arc<Module>>>>>(
1192 "contracts",
1193 )
1194 .await
1195 else {
1196 return Err(ActorError::Helper {
1197 name: "contracts".to_string(),
1198 reason: "Not Found".to_string(),
1199 });
1200 };
1201
1202 let prefix = format!("{}_", self.subject_metadata.subject_id);
1203
1204 let registered: Vec<String> = match contract_register
1205 .ask(ContractRegisterMessage::ListContracts)
1206 .await?
1207 {
1208 ContractRegisterResponse::Contracts(contracts) => contracts,
1209 _ => Vec::new(),
1210 };
1211
1212 for contract_name in registered {
1213 if contract_name.starts_with(&prefix) {
1214 contract_register
1215 .ask(ContractRegisterMessage::DeleteMetadata {
1216 contract_name: contract_name.clone(),
1217 })
1218 .await?;
1219 let mut contracts = contracts.write().await;
1220 contracts.remove(&contract_name);
1221 }
1222 }
1223
1224 let contracts_dir = config.contracts_path.join("contracts");
1225 if !contracts_dir.exists() {
1226 return Ok(());
1227 }
1228
1229 let mut entries = fs::read_dir(&contracts_dir).await.map_err(|e| {
1230 ActorError::Functional {
1231 description: format!(
1232 "Can not read contracts directory {}: {}",
1233 contracts_dir.display(),
1234 e
1235 ),
1236 }
1237 })?;
1238
1239 while let Some(entry) =
1240 entries
1241 .next_entry()
1242 .await
1243 .map_err(|e| ActorError::Functional {
1244 description: format!(
1245 "Can not iterate contracts directory {}: {}",
1246 contracts_dir.display(),
1247 e
1248 ),
1249 })?
1250 {
1251 let file_name = entry.file_name().to_string_lossy().to_string();
1252 if file_name.starts_with(&prefix) {
1253 let path = entry.path();
1254 fs::remove_dir_all(&path).await.map_err(|e| {
1255 ActorError::Functional {
1256 description: format!(
1257 "Can not remove contract directory {}: {}",
1258 path.display(),
1259 e
1260 ),
1261 }
1262 })?;
1263 }
1264 }
1265
1266 Ok(())
1267 }
1268
1269 async fn build_childs(
1270 &self,
1271 ctx: &mut ActorContext<Self>,
1272 hash: &HashAlgorithm,
1273 network: &Arc<NetworkSender>,
1274 ) -> Result<(), ActorError> {
1275 let owner = *self.our_key == self.subject_metadata.owner;
1277 let new_owner = self.subject_metadata.new_owner.is_some();
1278 let i_new_owner =
1279 self.subject_metadata.new_owner == Some((*self.our_key).clone());
1280
1281 if new_owner {
1282 if i_new_owner {
1283 self.up_owner(ctx, hash, network).await?;
1284 } else {
1285 self.up_not_owner(ctx, hash, network).await?;
1286 }
1287 } else if owner {
1288 self.up_owner(ctx, hash, network).await?;
1289 } else {
1290 self.up_not_owner(ctx, hash, network).await?;
1291 }
1292
1293 let new_schemas_eval = {
1294 let schemas = self
1295 .properties
1296 .schemas(ProtocolTypes::Evaluation, &self.our_key);
1297 self.sweep_contract_artifacts(ctx, &schemas).await?;
1298 Self::up_compilers_schemas(
1299 ctx,
1300 &schemas,
1301 self.subject_metadata.subject_id.clone(),
1302 hash,
1303 )
1304 .await?;
1305
1306 schemas
1307 .iter()
1308 .map(|x| (x.0.clone(), x.1.initial_value.clone()))
1309 .collect::<BTreeMap<SchemaType, ValueWrapper>>()
1310 };
1311
1312 let schemas_namespace_eval = self
1313 .properties
1314 .schemas_namespace(ProtocolTypes::Evaluation, &self.our_key);
1315
1316 let schema_creators_eval = self
1317 .properties
1318 .schema_creators_namespace(schemas_namespace_eval.clone());
1319 let schema_issuers_eval = self
1320 .properties
1321 .schema_issuers_namespace(schemas_namespace_eval);
1322
1323 let schemas_namespace_vali = self
1324 .properties
1325 .schemas_namespace(ProtocolTypes::Validation, &self.our_key);
1326
1327 let schema_creators_vali = self
1328 .properties
1329 .schema_creators_namespace(schemas_namespace_vali);
1330
1331 let new_schemas_vali = self
1332 .properties
1333 .schemas_init_value(ProtocolTypes::Validation, &self.our_key);
1334
1335 self.up_schemas(
1336 ctx,
1337 SchemaRuntimeRoles {
1338 creators_eval: &schema_creators_eval,
1339 issuers_eval: &schema_issuers_eval,
1340 creators_vali: &schema_creators_vali,
1341 },
1342 &new_schemas_eval,
1343 &new_schemas_vali,
1344 (hash, network),
1345 )
1346 .await
1347 }
1348
1349 async fn up_not_owner(
1350 &self,
1351 ctx: &mut ActorContext<Self>,
1352 hash: &HashAlgorithm,
1353 network: &Arc<NetworkSender>,
1354 ) -> Result<(), ActorError> {
1355 let node_key = self.subject_metadata.new_owner.as_ref().map_or_else(
1356 || self.subject_metadata.owner.clone(),
1357 |new_owner| new_owner.clone(),
1358 );
1359
1360 if self.properties.has_this_role(HashThisRole::Gov {
1361 who: (*self.our_key).clone(),
1362 role: RoleTypes::Validator,
1363 }) {
1364 let current_roles = self
1365 .current_validation_roles(ctx, SchemaType::Governance)
1366 .await?;
1367 let validator = ValiWorker {
1369 node_key: node_key.clone(),
1370 our_key: self.our_key.clone(),
1371 init_state: None,
1372 governance_id: self.subject_metadata.subject_id.clone(),
1373 gov_version: self.properties.version,
1374 sn: self.subject_metadata.sn,
1375 hash: *hash,
1376 network: network.clone(),
1377 current_roles: crate::validation::worker::CurrentWorkerRoles {
1378 approval: current_roles.approval,
1379 evaluation:
1380 crate::governance::role_register::RoleDataRegister {
1381 workers: current_roles
1382 .schema
1383 .evaluation
1384 .iter()
1385 .map(|role| role.key.clone())
1386 .collect(),
1387 quorum: current_roles.schema.evaluation_quorum,
1388 },
1389 },
1390 stop: false,
1391 };
1392 ctx.create_child("validator", validator).await?;
1393 }
1394
1395 if self.properties.has_this_role(HashThisRole::Gov {
1396 who: (*self.our_key).clone(),
1397 role: RoleTypes::Evaluator,
1398 }) {
1399 let (issuers, issuer_any) = self.properties.governance_issuers();
1400 let evaluator = EvalWorker {
1402 node_key: node_key.clone(),
1403 our_key: self.our_key.clone(),
1404 governance_id: self.subject_metadata.subject_id.clone(),
1405 gov_version: self.properties.version,
1406 sn: self.subject_metadata.sn,
1407 context: EvalWorkerContext::Governance {
1408 issuers,
1409 issuer_any,
1410 },
1411 init_state: None,
1412 hash: *hash,
1413 network: network.clone(),
1414 stop: false,
1415 };
1416 ctx.create_child("evaluator", evaluator).await?;
1417 }
1418
1419 if self.properties.has_this_role(HashThisRole::Gov {
1420 who: (*self.our_key).clone(),
1421 role: RoleTypes::Approver,
1422 }) {
1423 let always_accept = if let Some(config) =
1424 ctx.system().get_helper::<ConfigHelper>("config").await
1425 {
1426 config.always_accept
1427 } else {
1428 return Err(ActorError::Helper {
1429 name: "config".to_owned(),
1430 reason: "Not found".to_owned(),
1431 });
1432 };
1433
1434 let pass_votation = if always_accept {
1435 VotationType::AlwaysAccept
1436 } else {
1437 VotationType::Manual
1438 };
1439
1440 let init_approver = InitApprPersist {
1441 our_key: self.our_key.clone(),
1442 node_key: node_key.clone(),
1443 subject_id: self.subject_metadata.subject_id.clone(),
1444 pass_votation,
1445 helpers: (*hash, network.clone()),
1446 };
1447
1448 ctx.create_child("approver", ApprPersist::initial(init_approver))
1449 .await?;
1450 }
1451
1452 Ok(())
1453 }
1454
1455 async fn up_down_not_owner(
1456 &self,
1457 ctx: &mut ActorContext<Self>,
1458 old_gov: &GovernanceData,
1459 hash: &HashAlgorithm,
1460 network: &Arc<NetworkSender>,
1461 ) -> Result<(), ActorError> {
1462 let node_key = self.subject_metadata.new_owner.as_ref().map_or_else(
1463 || self.subject_metadata.owner.clone(),
1464 |new_owner| new_owner.clone(),
1465 );
1466
1467 let old_val = old_gov.has_this_role(HashThisRole::Gov {
1468 who: (*self.our_key).clone(),
1469 role: RoleTypes::Validator,
1470 });
1471
1472 let new_val = self.properties.has_this_role(HashThisRole::Gov {
1473 who: (*self.our_key).clone(),
1474 role: RoleTypes::Validator,
1475 });
1476
1477 match (old_val, new_val) {
1478 (true, false) => {
1479 let actor = ctx.get_child::<ValiWorker>("validator").await?;
1480 actor.ask_stop().await?;
1481 }
1482 (false, true) => {
1483 let current_roles = self
1484 .current_validation_roles(ctx, SchemaType::Governance)
1485 .await?;
1486 let validator = ValiWorker {
1488 node_key: node_key.clone(),
1489 our_key: self.our_key.clone(),
1490 init_state: None,
1491 governance_id: self.subject_metadata.subject_id.clone(),
1492 gov_version: self.properties.version,
1493 sn: self.subject_metadata.sn,
1494 hash: *hash,
1495 network: network.clone(),
1496 current_roles: crate::validation::worker::CurrentWorkerRoles {
1497 approval: current_roles.approval,
1498 evaluation: crate::governance::role_register::RoleDataRegister {
1499 workers: current_roles
1500 .schema
1501 .evaluation
1502 .iter()
1503 .map(|role| role.key.clone())
1504 .collect(),
1505 quorum: current_roles.schema.evaluation_quorum,
1506 },
1507 },
1508 stop: false,
1509 };
1510 ctx.create_child("validator", validator).await?;
1511 }
1512 _ => {}
1513 };
1514
1515 let old_eval = old_gov.has_this_role(HashThisRole::Gov {
1516 who: (*self.our_key).clone(),
1517 role: RoleTypes::Evaluator,
1518 });
1519
1520 let new_eval = self.properties.has_this_role(HashThisRole::Gov {
1521 who: (*self.our_key).clone(),
1522 role: RoleTypes::Evaluator,
1523 });
1524
1525 match (old_eval, new_eval) {
1526 (true, false) => {
1527 let actor = ctx.get_child::<EvalWorker>("evaluator").await?;
1528
1529 actor.ask_stop().await?;
1530 }
1531 (false, true) => {
1532 let (issuers, issuer_any) = self.properties.governance_issuers();
1533 let evaluator = EvalWorker {
1534 node_key: node_key.clone(),
1535 our_key: self.our_key.clone(),
1536 governance_id: self.subject_metadata.subject_id.clone(),
1537 gov_version: self.properties.version,
1538 sn: self.subject_metadata.sn,
1539 context: EvalWorkerContext::Governance {
1540 issuers,
1541 issuer_any,
1542 },
1543 init_state: None,
1544 hash: *hash,
1545 network: network.clone(),
1546 stop: false,
1547 };
1548 ctx.create_child("evaluator", evaluator).await?;
1549 }
1550 _ => {}
1551 };
1552
1553 let old_appr = old_gov.has_this_role(HashThisRole::Gov {
1554 who: (*self.our_key).clone(),
1555 role: RoleTypes::Approver,
1556 });
1557
1558 let new_appr = self.properties.has_this_role(HashThisRole::Gov {
1559 who: (*self.our_key).clone(),
1560 role: RoleTypes::Approver,
1561 });
1562
1563 match (old_appr, new_appr) {
1564 (true, false) => {
1565 let actor = ctx.get_child::<ApprPersist>("approver").await?;
1566
1567 actor.ask_stop().await?;
1568 }
1569 (false, true) => {
1570 let always_accept = if let Some(config) =
1571 ctx.system().get_helper::<ConfigHelper>("config").await
1572 {
1573 config.always_accept
1574 } else {
1575 return Err(ActorError::Helper {
1576 name: "config".to_owned(),
1577 reason: "Not found".to_owned(),
1578 });
1579 };
1580
1581 let pass_votation = if always_accept {
1582 VotationType::AlwaysAccept
1583 } else {
1584 VotationType::Manual
1585 };
1586
1587 let init_approver = InitApprPersist {
1588 our_key: self.our_key.clone(),
1589 node_key: node_key.clone(),
1590 subject_id: self.subject_metadata.subject_id.clone(),
1591 pass_votation,
1592 helpers: (*hash, network.clone()),
1593 };
1594
1595 ctx.create_child(
1596 "approver",
1597 ApprPersist::initial(init_approver),
1598 )
1599 .await?;
1600 }
1601 _ => {}
1602 };
1603
1604 Ok(())
1605 }
1606
1607 async fn down_not_owner(
1608 ctx: &ActorContext<Self>,
1609 gov: &GovernanceData,
1610 our_key: Arc<PublicKey>,
1611 ) -> Result<(), ActorError> {
1612 if gov.has_this_role(HashThisRole::Gov {
1613 who: (*our_key).clone(),
1614 role: RoleTypes::Validator,
1615 }) {
1616 let actor = ctx.get_child::<ValiWorker>("validator").await?;
1617
1618 actor.ask_stop().await?;
1619 }
1620
1621 if gov.has_this_role(HashThisRole::Gov {
1622 who: (*our_key).clone(),
1623 role: RoleTypes::Evaluator,
1624 }) {
1625 let actor = ctx.get_child::<EvalWorker>("evaluator").await?;
1626
1627 actor.ask_stop().await?;
1628 }
1629
1630 if gov.has_this_role(HashThisRole::Gov {
1631 who: (*our_key).clone(),
1632 role: RoleTypes::Approver,
1633 }) {
1634 let actor = ctx.get_child::<ApprPersist>("approver").await?;
1635
1636 actor.ask_stop().await?;
1637 }
1638
1639 Ok(())
1640 }
1641
1642 async fn up_owner(
1643 &self,
1644 ctx: &mut ActorContext<Self>,
1645 hash: &HashAlgorithm,
1646 network: &Arc<NetworkSender>,
1647 ) -> Result<(), ActorError> {
1648 let always_accept = if let Some(config) =
1649 ctx.system().get_helper::<ConfigHelper>("config").await
1650 {
1651 config.always_accept
1652 } else {
1653 return Err(ActorError::Helper {
1654 name: "config".to_string(),
1655 reason: "Not Found".to_string(),
1656 });
1657 };
1658 let pass_votation = if always_accept {
1659 VotationType::AlwaysAccept
1660 } else {
1661 VotationType::Manual
1662 };
1663
1664 let init_approver = InitApprPersist {
1665 our_key: self.our_key.clone(),
1666 node_key: (*self.our_key).clone(),
1667 subject_id: self.subject_metadata.subject_id.clone(),
1668 pass_votation,
1669 helpers: (*hash, network.clone()),
1670 };
1671
1672 ctx.create_child("approver", ApprPersist::initial(init_approver))
1673 .await?;
1674
1675 Ok(())
1676 }
1677
1678 async fn down_owner(ctx: &ActorContext<Self>) -> Result<(), ActorError> {
1679 let actor = ctx.get_child::<ApprPersist>("approver").await?;
1680 actor.ask_stop().await?;
1681
1682 Ok(())
1683 }
1684
1685 async fn up_compilers_schemas(
1686 ctx: &mut ActorContext<Self>,
1687 schemas: &BTreeMap<SchemaType, Schema>,
1688 subject_id: DigestIdentifier,
1689 hash: &HashAlgorithm,
1690 ) -> Result<(), ActorError> {
1691 let contracts_path = if let Some(config) =
1692 ctx.system().get_helper::<ConfigHelper>("config").await
1693 {
1694 config.contracts_path
1695 } else {
1696 return Err(ActorError::Helper {
1697 name: "config".to_string(),
1698 reason: "Not Found".to_string(),
1699 });
1700 };
1701
1702 for (id, schema) in schemas {
1703 let actor_name = format!("{}_contract_compiler", id);
1704
1705 let compiler = ctx
1706 .create_child(&actor_name, ContractCompiler::new(*hash))
1707 .await?;
1708
1709 let Schema {
1710 contract,
1711 initial_value,
1712 viewpoints: _,
1713 } = schema;
1714
1715 let response = compiler
1716 .ask(ContractCompilerMessage::Compile {
1717 contract_name: format!("{}_{}", subject_id, id),
1718 contract: contract.clone(),
1719 initial_value: initial_value.0.clone(),
1720 contract_path: contracts_path
1721 .join("contracts")
1722 .join(format!("{}_{}", subject_id, id)),
1723 })
1724 .await?;
1725
1726 if let CompilerResponse::Error(error) = response {
1727 return Err(ActorError::Functional {
1728 description: format!(
1729 "Can not compile schema contract {}: {}",
1730 id, error
1731 ),
1732 });
1733 }
1734 }
1735
1736 Ok(())
1737 }
1738
1739 async fn down_compilers_schemas(
1740 ctx: &ActorContext<Self>,
1741 schemas: &BTreeSet<SchemaType>,
1742 subject_id: &DigestIdentifier,
1743 ) -> Result<(), ActorError> {
1744 let Some(config) =
1745 ctx.system().get_helper::<ConfigHelper>("config").await
1746 else {
1747 return Err(ActorError::Helper {
1748 name: "config".to_string(),
1749 reason: "Not Found".to_string(),
1750 });
1751 };
1752
1753 let Some(contracts) = ctx
1754 .system()
1755 .get_helper::<Arc<RwLock<HashMap<String, Arc<Module>>>>>(
1756 "contracts",
1757 )
1758 .await
1759 else {
1760 return Err(ActorError::Helper {
1761 name: "contracts".to_string(),
1762 reason: "Not Found".to_string(),
1763 });
1764 };
1765
1766 let contract_register = ctx
1767 .get_child::<ContractRegister>("contract_register")
1768 .await?;
1769
1770 for schema_id in schemas.iter() {
1771 let actor = ctx
1772 .get_child::<ContractCompiler>(&format!(
1773 "{}_contract_compiler",
1774 schema_id
1775 ))
1776 .await?;
1777
1778 actor.ask_stop().await?;
1779
1780 let contract_name = format!("{}_{}", subject_id, schema_id);
1781 contract_register
1782 .tell(ContractRegisterMessage::DeleteMetadata {
1783 contract_name: contract_name.clone(),
1784 })
1785 .await?;
1786
1787 {
1788 let mut contracts = contracts.write().await;
1789 contracts.remove(&contract_name);
1790 }
1791
1792 let contract_path =
1793 config.contracts_path.join("contracts").join(&contract_name);
1794 let _ = fs::remove_dir_all(contract_path).await;
1795 }
1796
1797 Ok(())
1798 }
1799
1800 async fn compile_schemas(
1801 ctx: &ActorContext<Self>,
1802 schemas: HashMap<SchemaType, Schema>,
1803 subject_id: DigestIdentifier,
1804 ) -> Result<(), ActorError> {
1805 let contracts_path = if let Some(config) =
1806 ctx.system().get_helper::<ConfigHelper>("config").await
1807 {
1808 config.contracts_path
1809 } else {
1810 return Err(ActorError::Helper {
1811 name: "config".to_owned(),
1812 reason: "Not found".to_owned(),
1813 });
1814 };
1815
1816 for (id, schema) in schemas {
1817 let actor = ctx
1818 .get_child::<ContractCompiler>(&format!(
1819 "{}_contract_compiler",
1820 id
1821 ))
1822 .await?;
1823
1824 let response = actor
1825 .ask(ContractCompilerMessage::Compile {
1826 contract_name: format!("{}_{}", subject_id, id),
1827 contract: schema.contract.clone(),
1828 initial_value: schema.initial_value.0.clone(),
1829 contract_path: contracts_path
1830 .join("contracts")
1831 .join(format!("{}_{}", subject_id, id)),
1832 })
1833 .await?;
1834
1835 if let CompilerResponse::Error(error) = response {
1836 return Err(ActorError::Functional {
1837 description: format!(
1838 "Can not refresh schema contract {}: {}",
1839 id, error
1840 ),
1841 });
1842 }
1843 }
1844
1845 Ok(())
1846 }
1847
1848 fn build_creators_register_fact(
1849 &self,
1850 new_creator: HashMap<
1851 (SchemaType, String, PublicKey),
1852 (CreatorQuantity, Vec<WitnessesType>),
1853 >,
1854 remove_creator: HashSet<(SchemaType, String, PublicKey)>,
1855 creator_update: CreatorRoleUpdate,
1856 new_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
1857 remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
1858 ) -> (SubjectRegisterMessage, WitnessesRegisterMessage) {
1859 let creator_registration = |schema_id: &SchemaType,
1860 namespace: &str,
1861 creator: &PublicKey,
1862 witnesses: Vec<WitnessesType>|
1863 -> CreatorWitnessRegistration {
1864 let namespace_value = Namespace::from(namespace.to_owned());
1865 let creator_name = self
1866 .properties
1867 .members
1868 .iter()
1869 .find(|(_, key)| *key == creator)
1870 .map(|(name, _)| name.clone());
1871
1872 let grants = creator_name
1873 .and_then(|creator_name| {
1874 self.properties
1875 .roles_schema
1876 .get(schema_id)
1877 .and_then(|roles_schema| {
1878 roles_schema.creator.get(&RoleCreator::create(
1879 &creator_name,
1880 namespace_value,
1881 ))
1882 })
1883 .map(|creator_role| {
1884 creator_role
1885 .witnesses
1886 .iter()
1887 .filter_map(|witness| {
1888 let witness_type = if witness.name
1889 == ReservedWords::Witnesses.to_string()
1890 {
1891 Some(WitnessesType::Witnesses)
1892 } else {
1893 self.properties
1894 .members
1895 .get(&witness.name)
1896 .cloned()
1897 .map(WitnessesType::User)
1898 }?;
1899
1900 let grant = if witness.viewpoints.contains(
1901 &ReservedWords::AllViewpoints
1902 .to_string(),
1903 ) {
1904 CreatorWitnessGrant::Full
1905 } else if witness.viewpoints.is_empty() {
1906 CreatorWitnessGrant::Hash
1907 } else {
1908 CreatorWitnessGrant::Clear(
1909 witness.viewpoints.clone(),
1910 )
1911 };
1912
1913 Some((witness_type, grant))
1914 })
1915 .collect::<Vec<_>>()
1916 })
1917 })
1918 .unwrap_or_default();
1919
1920 CreatorWitnessRegistration { witnesses, grants }
1921 };
1922
1923 let mut data: Vec<(PublicKey, SchemaType, String, CreatorQuantity)> =
1924 vec![];
1925
1926 let mut new_creator_data: HashMap<
1927 (SchemaType, String, PublicKey),
1928 CreatorWitnessRegistration,
1929 > = HashMap::new();
1930
1931 let mut update_creator_witnesses_data: HashMap<
1932 (SchemaType, String, PublicKey),
1933 CreatorWitnessRegistration,
1934 > = HashMap::new();
1935
1936 for ((schema_id, ns, creator), (quantity, witnesses)) in
1937 new_creator.iter()
1938 {
1939 data.push((
1940 creator.clone(),
1941 schema_id.clone(),
1942 ns.clone(),
1943 quantity.clone(),
1944 ));
1945
1946 new_creator_data.insert(
1947 (schema_id.clone(), ns.clone(), creator.clone()),
1948 creator_registration(schema_id, ns, creator, witnesses.clone()),
1949 );
1950 }
1951
1952 for (schema_id, ns, creator) in remove_creator.iter() {
1953 data.push((
1954 creator.clone(),
1955 schema_id.clone(),
1956 ns.clone(),
1957 CreatorQuantity::Quantity(0),
1958 ));
1959 }
1960
1961 for ((schema_id, ns, creator), (quantity, creator_witnesses)) in
1962 creator_update.new_creator.iter()
1963 {
1964 data.push((
1965 creator.clone(),
1966 schema_id.clone(),
1967 ns.clone(),
1968 quantity.clone(),
1969 ));
1970
1971 let mut witnesses = vec![];
1972 for witness in creator_witnesses.iter() {
1973 if witness.name == ReservedWords::Witnesses.to_string() {
1974 witnesses.push(WitnessesType::Witnesses);
1975 } else if let Some(w) =
1976 self.properties.members.get(&witness.name)
1977 {
1978 witnesses.push(WitnessesType::User(w.clone()));
1979 }
1980 }
1981
1982 new_creator_data.insert(
1983 (schema_id.clone(), ns.clone(), creator.clone()),
1984 creator_registration(schema_id, ns, creator, witnesses),
1985 );
1986 }
1987
1988 for (schema_id, ns, creator, quantity) in
1989 creator_update.update_creator_quantity.iter()
1990 {
1991 data.push((
1992 creator.clone(),
1993 schema_id.clone(),
1994 ns.clone(),
1995 quantity.clone(),
1996 ));
1997 }
1998
1999 for (schema_id, ns, creator, creator_witnesses) in
2000 creator_update.update_creator_witnesses.iter()
2001 {
2002 let mut witnesses = vec![];
2003 for witness in creator_witnesses.iter() {
2004 if witness.name == ReservedWords::Witnesses.to_string() {
2005 witnesses.push(WitnessesType::Witnesses);
2006 } else if let Some(w) =
2007 self.properties.members.get(&witness.name)
2008 {
2009 witnesses.push(WitnessesType::User(w.clone()));
2010 }
2011 }
2012
2013 update_creator_witnesses_data.insert(
2014 (schema_id.clone(), ns.clone(), creator.clone()),
2015 creator_registration(schema_id, ns, creator, witnesses),
2016 );
2017 }
2018
2019 for (schema_id, ns, creator) in creator_update.remove_creator.iter() {
2020 data.push((
2021 creator.clone(),
2022 schema_id.clone(),
2023 ns.clone(),
2024 CreatorQuantity::Quantity(0),
2025 ));
2026 }
2027
2028 (
2029 SubjectRegisterMessage::RegisterData {
2030 gov_version: self.properties.version,
2031 data,
2032 },
2033 WitnessesRegisterMessage::UpdateCreatorsWitnessesFact {
2034 version: self.properties.version,
2035 new_creator: new_creator_data,
2036 remove_creator,
2037 update_creator_witnesses: update_creator_witnesses_data,
2038 new_witnesses,
2039 remove_witnesses,
2040 },
2041 )
2042 }
2043
2044 fn build_creators_register_confirm(
2045 &self,
2046 remove_creator: HashSet<(SchemaType, String, PublicKey)>,
2047 remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
2048 ) -> (SubjectRegisterMessage, WitnessesRegisterMessage) {
2049 let data: Vec<(PublicKey, SchemaType, String, CreatorQuantity)> =
2050 remove_creator
2051 .iter()
2052 .map(|x| {
2053 (
2054 x.2.clone(),
2055 x.0.clone(),
2056 x.1.clone(),
2057 CreatorQuantity::Quantity(0),
2058 )
2059 })
2060 .collect();
2061 (
2062 SubjectRegisterMessage::RegisterData {
2063 gov_version: self.properties.version,
2064 data,
2065 },
2066 WitnessesRegisterMessage::UpdateCreatorsWitnessesConfirm {
2067 version: self.properties.version,
2068 remove_creator,
2069 remove_witnesses,
2070 },
2071 )
2072 }
2073
2074 async fn first_role_register(
2075 &self,
2076 ctx: &ActorContext<Self>,
2077 ) -> Result<(), ActorError> {
2078 let actor = ctx.get_child::<RoleRegister>("role_register").await?;
2079
2080 actor
2081 .tell(RoleRegisterMessage::UpdateFact {
2082 version: 0,
2083 appr_quorum: Some(Quorum::Majority),
2084 eval_quorum: HashMap::from([(
2085 SchemaType::Governance,
2086 Quorum::Majority,
2087 )]),
2088 new_approvers: vec![self.subject_metadata.owner.clone()],
2089 new_evaluators: HashMap::from([(
2090 (
2091 SchemaType::Governance,
2092 self.subject_metadata.owner.clone(),
2093 ),
2094 vec![Namespace::new()],
2095 )]),
2096 new_validators: HashMap::from([(
2097 (
2098 SchemaType::Governance,
2099 self.subject_metadata.owner.clone(),
2100 ),
2101 vec![Namespace::new()],
2102 )]),
2103 remove_approvers: vec![],
2104 remove_evaluators: HashMap::new(),
2105 remove_validators: HashMap::new(),
2106 vali_quorum: HashMap::from([(
2107 SchemaType::Governance,
2108 Quorum::Majority,
2109 )]),
2110 })
2111 .await
2112 }
2113
2114 async fn update_gov_version(
2115 &self,
2116 ctx: &ActorContext<Self>,
2117 ) -> Result<(), ActorError> {
2118 let actor = ctx.get_child::<RoleRegister>("role_register").await?;
2119
2120 actor
2121 .tell(RoleRegisterMessage::UpdateVersion {
2122 version: self.properties.version + 1,
2123 })
2124 .await
2125 }
2126
2127 async fn update_registers_fact(
2128 &self,
2129 ctx: &ActorContext<Self>,
2130 update: RolesUpdate,
2131 creator_update: CreatorRoleUpdate,
2132 ) -> Result<(), ActorError> {
2133 let RolesUpdate {
2134 appr_quorum,
2135 new_approvers,
2136 remove_approvers,
2137 eval_quorum,
2138 new_evaluators,
2139 remove_evaluators,
2140 vali_quorum,
2141 new_validators,
2142 remove_validators,
2143 new_creator,
2144 remove_creator,
2145 new_witnesses,
2146 remove_witnesses,
2147 } = update;
2148
2149 let actor = ctx.get_child::<RoleRegister>("role_register").await?;
2150 actor
2151 .tell(RoleRegisterMessage::UpdateFact {
2152 version: self.properties.version,
2153 appr_quorum,
2154 eval_quorum,
2155 new_approvers,
2156 new_evaluators,
2157 new_validators,
2158 remove_approvers,
2159 remove_evaluators,
2160 remove_validators,
2161 vali_quorum,
2162 })
2163 .await?;
2164
2165 let (subj_msg, wit_msg) = self.build_creators_register_fact(
2166 new_creator,
2167 remove_creator,
2168 creator_update,
2169 new_witnesses,
2170 remove_witnesses,
2171 );
2172
2173 let actor =
2174 ctx.get_child::<SubjectRegister>("subject_register").await?;
2175
2176 actor.tell(subj_msg).await?;
2177
2178 let actor = ctx
2179 .get_child::<WitnessesRegister>("witnesses_register")
2180 .await?;
2181
2182 actor.tell(wit_msg).await
2183 }
2184
2185 async fn update_registers_confirm(
2186 &self,
2187 ctx: &ActorContext<Self>,
2188 update: RolesUpdateConfirm,
2189 ) -> Result<(), ActorError> {
2190 let RolesUpdateConfirm {
2191 new_approver,
2192 remove_approver,
2193 new_evaluator,
2194 remove_evaluators,
2195 new_validator,
2196 remove_validators,
2197 remove_creator,
2198 remove_witnesses,
2199 } = update;
2200
2201 let actor = ctx.get_child::<RoleRegister>("role_register").await?;
2202 actor
2203 .tell(RoleRegisterMessage::UpdateConfirm {
2204 version: self.properties.version,
2205 new_approver,
2206 remove_approver,
2207 new_evaluator,
2208 remove_evaluators,
2209 new_validator,
2210 remove_validators,
2211 })
2212 .await?;
2213
2214 let (subj_msg, wit_msg) = self
2215 .build_creators_register_confirm(remove_creator, remove_witnesses);
2216
2217 let actor =
2218 ctx.get_child::<SubjectRegister>("subject_register").await?;
2219
2220 actor.tell(subj_msg).await?;
2221
2222 let actor = ctx
2223 .get_child::<WitnessesRegister>("witnesses_register")
2224 .await?;
2225
2226 actor.tell(wit_msg).await
2227 }
2228
2229 async fn verify_new_ledger_events(
2230 &mut self,
2231 ctx: &mut ActorContext<Self>,
2232 events: Vec<Ledger>,
2233 hash: &HashAlgorithm,
2234 ) -> Result<(), ActorError> {
2235 let mut iter = events.into_iter();
2236 let last_ledger = get_last_event(ctx).await?;
2237
2238 let mut last_ledger = if let Some(last_ledger) = last_ledger {
2239 last_ledger
2240 } else {
2241 let Some(first) = iter.next() else {
2242 return Ok(());
2243 };
2244 if let Err(e) = Self::verify_first_ledger_event(
2245 ctx,
2246 &first,
2247 hash,
2248 Metadata::from(self.clone()),
2249 )
2250 .await
2251 {
2252 return Err(ActorError::Functional {
2253 description: e.to_string(),
2254 });
2255 }
2256
2257 self.on_event(first.clone(), ctx).await;
2258 Self::register(
2259 ctx,
2260 RegisterMessage::RegisterGov {
2261 gov_id: self.subject_metadata.subject_id.to_string(),
2262 name: self.subject_metadata.name.clone(),
2263 description: self.subject_metadata.description.clone(),
2264 },
2265 )
2266 .await?;
2267
2268 self.first_role_register(ctx).await?;
2269
2270 let (issuer, event_request_timestamp) =
2271 first.get_issuer_event_request_timestamp();
2272 let event_request = first.get_event_request();
2273
2274 Self::event_to_sink(
2275 ctx,
2276 DataForSink {
2277 gov_id: None,
2278 subject_id: self.subject_metadata.subject_id.to_string(),
2279 sn: self.subject_metadata.sn,
2280 owner: self.subject_metadata.owner.to_string(),
2281 namespace: String::default(),
2282 schema_id: self.subject_metadata.schema_id.clone(),
2283 issuer,
2284 event_ledger_timestamp: first
2285 .ledger_seal_signature
2286 .timestamp
2287 .as_nanos(),
2288 event_request_timestamp,
2289 gov_version: first.gov_version,
2290 event_data_ledger: EventLedgerDataForSink::build(
2291 &first.protocols,
2292 &self.properties.to_value_wrapper().0,
2293 ),
2294 },
2295 event_request,
2296 )
2297 .await?;
2298
2299 first
2300 };
2301
2302 for event in iter {
2303 let actual_ledger_hash =
2304 last_ledger.ledger_hash(*hash).map_err(|e| {
2305 ActorError::FunctionalCritical {
2306 description: format!(
2307 "Can not creacte actual ledger event hash: {}",
2308 e
2309 ),
2310 }
2311 })?;
2312
2313 let last_data = LastData {
2314 gov_version: last_ledger.gov_version,
2315 vali_data: last_ledger.protocols.get_validation_data(),
2316 };
2317
2318 let last_event_is_ok = match Self::verify_new_ledger_event(
2319 ctx,
2320 Self::verify_new_ledger_event_args(
2321 &event,
2322 Metadata::from(self.clone()),
2323 actual_ledger_hash,
2324 last_data,
2325 hash,
2326 true,
2327 false,
2328 ),
2329 )
2330 .await
2331 {
2332 Ok(last_event_is_ok) => last_event_is_ok,
2333 Err(e) => {
2334 if matches!(e, SubjectError::InvalidSequenceNumber { .. }) {
2336 continue;
2338 } else {
2339 return Err(ActorError::Functional {
2340 description: e.to_string(),
2341 });
2342 }
2343 }
2344 };
2345
2346 let Some(event_request) = event.get_event_request() else {
2347 error!(
2348 subject_id = %self.subject_metadata.subject_id,
2349 sn = event.sn,
2350 "Governance replay event is missing clear event_request"
2351 );
2352 return Err(ActorError::Functional {
2353 description:
2354 "Governance replay event is missing clear event_request"
2355 .to_owned(),
2356 });
2357 };
2358
2359 let (update_fact, update_confirm) = if last_event_is_ok {
2360 match &event_request {
2361 EventRequest::Transfer(transfer_request) => {
2362 self.transfer(
2363 ctx,
2364 transfer_request.new_owner.clone(),
2365 0,
2366 )
2367 .await?;
2368
2369 self.update_gov_version(ctx).await?;
2370 }
2371 EventRequest::Reject(..) => {
2372 self.reject(ctx, 0).await?;
2373
2374 self.update_gov_version(ctx).await?;
2375 }
2376 EventRequest::EOL(..) => {
2377 self.eol(ctx).await?;
2378
2379 Self::register(
2380 ctx,
2381 RegisterMessage::EOLGov {
2382 gov_id: self
2383 .subject_metadata
2384 .subject_id
2385 .to_string(),
2386 },
2387 )
2388 .await?;
2389
2390 self.update_gov_version(ctx).await?;
2391 }
2392 _ => {}
2393 };
2394
2395 let update_confirm =
2396 if let EventRequest::Confirm(..) = &event_request {
2397 self.confirm(
2398 ctx,
2399 event.ledger_seal_signature.signer.clone(),
2400 0,
2401 )
2402 .await?;
2403
2404 if let Some(new_owner_key) =
2405 &self.subject_metadata.new_owner
2406 {
2407 Some(self.properties.roles_update_remove_confirm(
2408 &self.subject_metadata.owner,
2409 new_owner_key,
2410 ))
2411 } else {
2412 None
2413 }
2414 } else {
2415 None
2416 };
2417
2418 let update_fact = if let EventRequest::Fact(fact_request) =
2419 &event_request
2420 {
2421 let governance_event = serde_json::from_value::<GovernanceEvent>(fact_request.payload.0.clone()).map_err(|e| {
2422 ActorError::FunctionalCritical{description: format!("Can not convert payload into governance event in governance fact event: {}", e)}
2423 })?;
2424
2425 let rm_members = governance_event
2426 .members
2427 .as_ref()
2428 .map_or_else(|| None, |members| members.remove.clone());
2429
2430 let rm_schemas = governance_event
2431 .schemas
2432 .as_ref()
2433 .map_or_else(|| None, |schemas| schemas.remove.clone());
2434
2435 let rm_roles =
2436 if rm_members.is_some() || rm_schemas.is_some() {
2437 Some(self.properties.roles_update_remove_fact(
2438 rm_members, rm_schemas,
2439 ))
2440 } else {
2441 None
2442 };
2443
2444 let creator_update = governance_event_update_creator_change(
2445 &governance_event,
2446 &self.properties.members,
2447 &self.properties.roles_schema,
2448 );
2449
2450 Some((governance_event, creator_update, rm_roles))
2451 } else {
2452 None
2453 };
2454 (update_fact, update_confirm)
2455 } else {
2456 (None, None)
2457 };
2458
2459 self.on_event(event.clone(), ctx).await;
2461
2462 let (issuer, event_request_timestamp) =
2463 event.get_issuer_event_request_timestamp();
2464 Self::event_to_sink(
2465 ctx,
2466 DataForSink {
2467 gov_id: None,
2468 subject_id: self.subject_metadata.subject_id.to_string(),
2469 sn: self.subject_metadata.sn,
2470 owner: self.subject_metadata.owner.to_string(),
2471 namespace: String::default(),
2472 schema_id: self.subject_metadata.schema_id.clone(),
2473 issuer,
2474 event_ledger_timestamp: event
2475 .ledger_seal_signature
2476 .timestamp
2477 .as_nanos(),
2478 event_request_timestamp,
2479 gov_version: event.gov_version,
2480 event_data_ledger: EventLedgerDataForSink::build(
2481 &event.protocols,
2482 &self.properties.to_value_wrapper().0,
2483 ),
2484 },
2485 Some(event_request.clone()),
2486 )
2487 .await?;
2488
2489 if let Some((event, creator_update, rm_roles)) = update_fact {
2490 let update = governance_event_roles_update_fact(
2491 &event,
2492 &self.properties.members,
2493 rm_roles,
2494 );
2495
2496 self.update_registers_fact(ctx, update, creator_update)
2497 .await?;
2498 }
2499
2500 if let Some(update_confirm) = update_confirm {
2501 self.update_registers_confirm(ctx, update_confirm).await?;
2502 }
2503
2504 last_ledger = event.clone();
2506 }
2507
2508 Ok(())
2509 }
2510
2511 async fn delete_tracker_references(
2512 &self,
2513 ctx: &mut ActorContext<Self>,
2514 subject_id: DigestIdentifier,
2515 ) -> Result<(), ActorError> {
2516 let mut cleanup_errors = Vec::new();
2517
2518 let subject_register = match ctx
2519 .create_child("subject_register", SubjectRegister::initial(()))
2520 .await
2521 {
2522 Ok(actor) => Some(actor),
2523 Err(ActorError::Exists { .. }) => {
2524 match ctx.get_child::<SubjectRegister>("subject_register").await
2525 {
2526 Ok(actor) => Some(actor),
2527 Err(error) => {
2528 cleanup_errors
2529 .push(format!("subject_register lookup: {error}"));
2530 None
2531 }
2532 }
2533 }
2534 Err(error) => {
2535 cleanup_errors.push(format!("subject_register: {error}"));
2536 None
2537 }
2538 };
2539
2540 if let Some(subject_register) = subject_register {
2541 match subject_register
2542 .ask(SubjectRegisterMessage::DeleteSubject {
2543 subject_id: subject_id.clone(),
2544 })
2545 .await
2546 {
2547 Ok(SubjectRegisterResponse::Ok) => {}
2548 Ok(other) => cleanup_errors.push(format!(
2549 "subject_register: unexpected response {other:?}"
2550 )),
2551 Err(error) => {
2552 cleanup_errors.push(format!("subject_register: {error}"))
2553 }
2554 }
2555
2556 if let Err(error) = subject_register.ask_stop().await {
2557 cleanup_errors.push(format!("subject_register stop: {error}"));
2558 }
2559 }
2560
2561 let sn_register = match ctx
2562 .create_child("sn_register", SnRegister::initial(()))
2563 .await
2564 {
2565 Ok(actor) => Some(actor),
2566 Err(ActorError::Exists { .. }) => {
2567 match ctx.get_child::<SnRegister>("sn_register").await {
2568 Ok(actor) => Some(actor),
2569 Err(error) => {
2570 cleanup_errors
2571 .push(format!("sn_register lookup: {error}"));
2572 None
2573 }
2574 }
2575 }
2576 Err(error) => {
2577 cleanup_errors.push(format!("sn_register: {error}"));
2578 None
2579 }
2580 };
2581
2582 if let Some(sn_register) = sn_register {
2583 match sn_register
2584 .ask(SnRegisterMessage::DeleteSubject {
2585 subject_id: subject_id.clone(),
2586 })
2587 .await
2588 {
2589 Ok(SnRegisterResponse::Ok) => {}
2590 Ok(other) => cleanup_errors.push(format!(
2591 "sn_register: unexpected response {other:?}"
2592 )),
2593 Err(error) => {
2594 cleanup_errors.push(format!("sn_register: {error}"))
2595 }
2596 }
2597
2598 if let Err(error) = sn_register.ask_stop().await {
2599 cleanup_errors.push(format!("sn_register stop: {error}"));
2600 }
2601 }
2602
2603 let witnesses_register = match ctx
2604 .create_child(
2605 "witnesses_register",
2606 WitnessesRegister::initial(Self::ledger_batch_size(ctx).await?),
2607 )
2608 .await
2609 {
2610 Ok(actor) => Some(actor),
2611 Err(ActorError::Exists { .. }) => {
2612 match ctx
2613 .get_child::<WitnessesRegister>("witnesses_register")
2614 .await
2615 {
2616 Ok(actor) => Some(actor),
2617 Err(error) => {
2618 cleanup_errors.push(format!(
2619 "witnesses_register lookup: {error}"
2620 ));
2621 None
2622 }
2623 }
2624 }
2625 Err(error) => {
2626 cleanup_errors.push(format!("witnesses_register: {error}"));
2627 None
2628 }
2629 };
2630
2631 if let Some(witnesses_register) = witnesses_register {
2632 match witnesses_register
2633 .ask(WitnessesRegisterMessage::DeleteSubject {
2634 subject_id: subject_id.clone(),
2635 })
2636 .await
2637 {
2638 Ok(WitnessesRegisterResponse::Ok) => {}
2639 Ok(_) => cleanup_errors
2640 .push("witnesses_register: unexpected response".to_owned()),
2641 Err(error) => {
2642 cleanup_errors.push(format!("witnesses_register: {error}"))
2643 }
2644 }
2645
2646 if let Err(error) = witnesses_register.ask_stop().await {
2647 cleanup_errors
2648 .push(format!("witnesses_register stop: {error}"));
2649 }
2650 }
2651
2652 if cleanup_errors.is_empty() {
2653 Ok(())
2654 } else {
2655 Err(ActorError::Functional {
2656 description: cleanup_errors.join("; "),
2657 })
2658 }
2659 }
2660
2661 async fn delete_governance_storage(
2662 &self,
2663 ctx: &mut ActorContext<Self>,
2664 ) -> Result<(), ActorError> {
2665 let mut cleanup_errors = Vec::new();
2666
2667 if self.properties.has_this_role(HashThisRole::Gov {
2668 who: (*self.our_key).clone(),
2669 role: RoleTypes::Approver,
2670 }) {
2671 let hash = self.hash.map_or_else(
2672 || {
2673 cleanup_errors
2674 .push("approver init: missing hash".to_owned());
2675 None
2676 },
2677 Some,
2678 );
2679
2680 let network = ctx
2681 .system()
2682 .get_helper::<Arc<NetworkSender>>("network")
2683 .await
2684 .map_or_else(
2685 || {
2686 cleanup_errors.push(
2687 "approver init: missing network helper".to_owned(),
2688 );
2689 None
2690 },
2691 Some,
2692 );
2693
2694 if let (Some(hash), Some(network)) = (hash, network) {
2695 let approver = match ctx
2696 .get_child::<ApprPersist>("approver")
2697 .await
2698 {
2699 Ok(actor) => Some(actor),
2700 Err(_) => match self
2701 .up_approver_only(ctx, &hash, &network)
2702 .await
2703 {
2704 Ok(()) => match ctx
2705 .get_child::<ApprPersist>("approver")
2706 .await
2707 {
2708 Ok(actor) => Some(actor),
2709 Err(error) => {
2710 cleanup_errors
2711 .push(format!("approver lookup: {error}"));
2712 None
2713 }
2714 },
2715 Err(error) => {
2716 cleanup_errors.push(format!("approver: {error}"));
2717 None
2718 }
2719 },
2720 };
2721
2722 if let Some(approver) = approver {
2723 match approver
2724 .ask(crate::approval::persist::ApprPersistMessage::PurgeStorage)
2725 .await
2726 {
2727 Ok(crate::approval::persist::ApprPersistResponse::Ok) => {}
2728 Ok(_) => cleanup_errors
2729 .push("approver: unexpected response".to_owned()),
2730 Err(error) => {
2731 cleanup_errors.push(format!("approver: {error}"))
2732 }
2733 }
2734
2735 if let Err(error) = approver.ask_stop().await {
2736 cleanup_errors.push(format!("approver stop: {error}"));
2737 }
2738 }
2739 }
2740 }
2741
2742 let contract_register = match ctx
2743 .create_child("contract_register", ContractRegister::initial(()))
2744 .await
2745 {
2746 Ok(actor) => Some(actor),
2747 Err(ActorError::Exists { .. }) => {
2748 match ctx
2749 .get_child::<ContractRegister>("contract_register")
2750 .await
2751 {
2752 Ok(actor) => Some(actor),
2753 Err(error) => {
2754 cleanup_errors
2755 .push(format!("contract_register lookup: {error}"));
2756 None
2757 }
2758 }
2759 }
2760 Err(error) => {
2761 cleanup_errors.push(format!("contract_register: {error}"));
2762 None
2763 }
2764 };
2765
2766 if let Some(contract_register) = contract_register {
2767 if let Err(error) = self
2768 .delete_all_contract_artifacts(ctx, &contract_register)
2769 .await
2770 {
2771 cleanup_errors.push(format!("contract_artifacts: {error}"));
2772 }
2773
2774 match contract_register
2775 .ask(ContractRegisterMessage::PurgeStorage)
2776 .await
2777 {
2778 Ok(ContractRegisterResponse::Ok) => {}
2779 Ok(other) => cleanup_errors.push(format!(
2780 "contract_register: unexpected response {other:?}"
2781 )),
2782 Err(error) => {
2783 cleanup_errors.push(format!("contract_register: {error}"))
2784 }
2785 }
2786
2787 if let Err(error) = contract_register.ask_stop().await {
2788 cleanup_errors.push(format!("contract_register stop: {error}"));
2789 }
2790 }
2791
2792 let role_register = match ctx
2793 .create_child("role_register", RoleRegister::initial(()))
2794 .await
2795 {
2796 Ok(actor) => Some(actor),
2797 Err(ActorError::Exists { .. }) => {
2798 match ctx.get_child::<RoleRegister>("role_register").await {
2799 Ok(actor) => Some(actor),
2800 Err(error) => {
2801 cleanup_errors
2802 .push(format!("role_register lookup: {error}"));
2803 None
2804 }
2805 }
2806 }
2807 Err(error) => {
2808 cleanup_errors.push(format!("role_register: {error}"));
2809 None
2810 }
2811 };
2812
2813 if let Some(role_register) = role_register {
2814 match role_register.ask(RoleRegisterMessage::PurgeStorage).await {
2815 Ok(RoleRegisterResponse::Ok) => {}
2816 Ok(other) => cleanup_errors.push(format!(
2817 "role_register: unexpected response {other:?}"
2818 )),
2819 Err(error) => {
2820 cleanup_errors.push(format!("role_register: {error}"))
2821 }
2822 }
2823
2824 if let Err(error) = role_register.ask_stop().await {
2825 cleanup_errors.push(format!("role_register stop: {error}"));
2826 }
2827 }
2828
2829 let subject_register = match ctx
2830 .create_child("subject_register", SubjectRegister::initial(()))
2831 .await
2832 {
2833 Ok(actor) => Some(actor),
2834 Err(ActorError::Exists { .. }) => {
2835 match ctx.get_child::<SubjectRegister>("subject_register").await
2836 {
2837 Ok(actor) => Some(actor),
2838 Err(error) => {
2839 cleanup_errors
2840 .push(format!("subject_register lookup: {error}"));
2841 None
2842 }
2843 }
2844 }
2845 Err(error) => {
2846 cleanup_errors.push(format!("subject_register: {error}"));
2847 None
2848 }
2849 };
2850
2851 if let Some(subject_register) = subject_register {
2852 match subject_register
2853 .ask(SubjectRegisterMessage::PurgeStorage)
2854 .await
2855 {
2856 Ok(SubjectRegisterResponse::Ok) => {}
2857 Ok(other) => cleanup_errors.push(format!(
2858 "subject_register: unexpected response {other:?}"
2859 )),
2860 Err(error) => {
2861 cleanup_errors.push(format!("subject_register: {error}"))
2862 }
2863 }
2864
2865 if let Err(error) = subject_register.ask_stop().await {
2866 cleanup_errors.push(format!("subject_register stop: {error}"));
2867 }
2868 }
2869
2870 let sn_register = match ctx
2871 .create_child("sn_register", SnRegister::initial(()))
2872 .await
2873 {
2874 Ok(actor) => Some(actor),
2875 Err(ActorError::Exists { .. }) => {
2876 match ctx.get_child::<SnRegister>("sn_register").await {
2877 Ok(actor) => Some(actor),
2878 Err(error) => {
2879 cleanup_errors
2880 .push(format!("sn_register lookup: {error}"));
2881 None
2882 }
2883 }
2884 }
2885 Err(error) => {
2886 cleanup_errors.push(format!("sn_register: {error}"));
2887 None
2888 }
2889 };
2890
2891 if let Some(sn_register) = sn_register {
2892 match sn_register.ask(SnRegisterMessage::PurgeStorage).await {
2893 Ok(SnRegisterResponse::Ok) => {}
2894 Ok(other) => cleanup_errors.push(format!(
2895 "sn_register: unexpected response {other:?}"
2896 )),
2897 Err(error) => {
2898 cleanup_errors.push(format!("sn_register: {error}"))
2899 }
2900 }
2901
2902 if let Err(error) = sn_register.ask_stop().await {
2903 cleanup_errors.push(format!("sn_register stop: {error}"));
2904 }
2905 }
2906
2907 let witnesses_register = match ctx
2908 .create_child(
2909 "witnesses_register",
2910 WitnessesRegister::initial(Self::ledger_batch_size(ctx).await?),
2911 )
2912 .await
2913 {
2914 Ok(actor) => Some(actor),
2915 Err(ActorError::Exists { .. }) => {
2916 match ctx
2917 .get_child::<WitnessesRegister>("witnesses_register")
2918 .await
2919 {
2920 Ok(actor) => Some(actor),
2921 Err(error) => {
2922 cleanup_errors.push(format!(
2923 "witnesses_register lookup: {error}"
2924 ));
2925 None
2926 }
2927 }
2928 }
2929 Err(error) => {
2930 cleanup_errors.push(format!("witnesses_register: {error}"));
2931 None
2932 }
2933 };
2934
2935 if let Some(witnesses_register) = witnesses_register {
2936 match witnesses_register
2937 .ask(WitnessesRegisterMessage::PurgeStorage)
2938 .await
2939 {
2940 Ok(WitnessesRegisterResponse::Ok) => {}
2941 Ok(_) => cleanup_errors
2942 .push("witnesses_register: unexpected response".to_owned()),
2943 Err(error) => {
2944 cleanup_errors.push(format!("witnesses_register: {error}"))
2945 }
2946 }
2947
2948 if let Err(error) = witnesses_register.ask_stop().await {
2949 cleanup_errors
2950 .push(format!("witnesses_register stop: {error}"));
2951 }
2952 }
2953
2954 if let Err(error) = purge_storage(ctx).await {
2955 cleanup_errors.push(format!("governance: {error}"));
2956 }
2957
2958 if cleanup_errors.is_empty() {
2959 Ok(())
2960 } else {
2961 Err(ActorError::Functional {
2962 description: cleanup_errors.join("; "),
2963 })
2964 }
2965 }
2966}
2967
/// Messages handled by the `Governance` actor.
#[derive(Debug, Clone)]
pub enum GovernanceMessage {
    /// Requests the governance metadata; answered with
    /// `GovernanceResponse::Metadata`.
    GetMetadata,
    /// Requests ledger events selected by `lo_sn` and `hi_sn`; answered with
    /// `GovernanceResponse::Ledger`.
    // NOTE(review): presumably `lo_sn`/`hi_sn` bound a sequence-number range;
    // the exact inclusivity is defined by `get_ledger` — confirm there.
    GetLedger { lo_sn: Option<u64>, hi_sn: u64 },
    /// Requests the last ledger event; answered with
    /// `GovernanceResponse::LastLedger`.
    GetLastLedger,
    /// Deletes the references to the tracker `subject_id` held by the
    /// governance registers; answered with `GovernanceResponse::Ok`.
    DeleteTrackerReferences { subject_id: DigestIdentifier },
    /// Purges the governance storage; answered with
    /// `GovernanceResponse::Ok`.
    DeleteGovernanceStorage,
    /// Submits new ledger `events` to be processed; answered with
    /// `GovernanceResponse::UpdateResult`.
    UpdateLedger { events: Vec<Ledger> },
    /// Requests a copy of the governance properties; answered with
    /// `GovernanceResponse::Governance`.
    GetGovernance,
    /// Requests the current governance version; answered with
    /// `GovernanceResponse::Version`.
    GetVersion,
}

// Marker impl that lets `GovernanceMessage` be used as an actor message.
impl Message for GovernanceMessage {}
2982
/// Responses produced by the `Governance` actor.
#[derive(Debug, Clone)]
pub enum GovernanceResponse {
    /// Governance metadata, returned for `GovernanceMessage::GetMetadata`.
    Metadata(Box<Metadata>),
    /// Result of `GovernanceMessage::UpdateLedger`: the subject `sn`, the
    /// current owner and the pending new owner (if any) after the update.
    UpdateResult(u64, PublicKey, Option<PublicKey>),
    /// Ledger events returned for `GovernanceMessage::GetLedger`.
    // NOTE(review): `is_all` presumably tells whether the requested range was
    // fully returned — confirm against `get_ledger`.
    Ledger {
        ledger: Vec<Ledger>,
        is_all: bool,
    },
    /// Last ledger event (if any), returned for
    /// `GovernanceMessage::GetLastLedger`.
    LastLedger {
        ledger_event: Box<Option<Ledger>>,
    },
    /// Copy of the governance properties, returned for
    /// `GovernanceMessage::GetGovernance`.
    Governance(Box<GovernanceData>),
    // NOTE(review): no producer of this variant is visible in this part of
    // the file — confirm its meaning where it is emitted.
    NewCompilers(Vec<SchemaType>),
    // NOTE(review): no producer of this variant is visible in this part of
    // the file; presumably carries a sequence number — confirm.
    Sn(u64),
    /// Current governance version, returned for
    /// `GovernanceMessage::GetVersion`.
    Version(u64),
    /// Generic acknowledgement, returned by the delete operations.
    Ok,
}
// Marker impl that lets `GovernanceResponse` be used as an actor response.
impl Response for GovernanceResponse {}
3002
3003#[async_trait]
3004impl Actor for Governance {
3005 type Event = Ledger;
3006 type Message = GovernanceMessage;
3007 type Response = GovernanceResponse;
3008
3009 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
3010 parent_span.map_or_else(
3011 || info_span!("Governance", id),
3012 |parent_span| info_span!(parent: parent_span, "Governance", id),
3013 )
3014 }
3015
3016 async fn pre_start(
3017 &mut self,
3018 ctx: &mut ActorContext<Self>,
3019 ) -> Result<(), ActorError> {
3020 if let Err(e) = self.init_store("governance", None, true, ctx).await {
3021 error!(
3022 error = %e,
3023 "Failed to initialize governance store"
3024 );
3025 return Err(e);
3026 }
3027
3028 let safe_mode = if let Some(config) =
3029 ctx.system().get_helper::<ConfigHelper>("config").await
3030 {
3031 config.safe_mode
3032 } else {
3033 return Err(ActorError::Helper {
3034 name: "config".to_owned(),
3035 reason: "Not found".to_owned(),
3036 });
3037 };
3038
3039 if safe_mode {
3040 let Some(hash) = self.hash else {
3041 error!("Hash algorithm not found");
3042 return Err(ActorError::FunctionalCritical {
3043 description: "Hash algorithm is None".to_string(),
3044 });
3045 };
3046
3047 let Some(network) = ctx
3048 .system()
3049 .get_helper::<Arc<NetworkSender>>("network")
3050 .await
3051 else {
3052 error!("Network helper not found");
3053 return Err(ActorError::Helper {
3054 name: "network".to_owned(),
3055 reason: "Not found".to_owned(),
3056 });
3057 };
3058
3059 self.up_approver_only(ctx, &hash, &network).await?;
3060 return Ok(());
3061 }
3062
3063 let Some(hash) = self.hash else {
3064 error!("Hash algorithm not found");
3065 return Err(ActorError::FunctionalCritical {
3066 description: "Hash algorithm is None".to_string(),
3067 });
3068 };
3069
3070 let Some(ext_db): Option<Arc<ExternalDB>> =
3071 ctx.system().get_helper("ext_db").await
3072 else {
3073 error!("External database helper not found");
3074 return Err(ActorError::Helper {
3075 name: "ext_db".to_owned(),
3076 reason: "Not found".to_owned(),
3077 });
3078 };
3079
3080 let Some(ave_sink): Option<AveSink> =
3081 ctx.system().get_helper("sink").await
3082 else {
3083 error!("Sink helper not found");
3084 return Err(ActorError::Helper {
3085 name: "sink".to_owned(),
3086 reason: "Not found".to_owned(),
3087 });
3088 };
3089
3090 let Some(network) = ctx
3091 .system()
3092 .get_helper::<Arc<NetworkSender>>("network")
3093 .await
3094 else {
3095 error!("Network helper not found");
3096 return Err(ActorError::Helper {
3097 name: "network".to_owned(),
3098 reason: "Not found".to_owned(),
3099 });
3100 };
3101
3102 if let Err(e) = ctx
3103 .create_child("role_register", RoleRegister::initial(()))
3104 .await
3105 {
3106 error!(
3107 error = %e,
3108 "Failed to create role_register child"
3109 );
3110 return Err(e);
3111 }
3112
3113 if let Err(e) = ctx
3114 .create_child("subject_register", SubjectRegister::initial(()))
3115 .await
3116 {
3117 error!(
3118 error = %e,
3119 "Failed to create subject_register child"
3120 );
3121 return Err(e);
3122 }
3123
3124 if let Err(e) = ctx
3125 .create_child("sn_register", SnRegister::initial(()))
3126 .await
3127 {
3128 error!(
3129 error = %e,
3130 "Failed to create sn_register child"
3131 );
3132 return Err(e);
3133 }
3134
3135 if let Err(e) = ctx
3136 .create_child(
3137 "witnesses_register",
3138 WitnessesRegister::initial(Self::ledger_batch_size(ctx).await?),
3139 )
3140 .await
3141 {
3142 error!(
3143 error = %e,
3144 "Failed to create witnesses_register child"
3145 );
3146 return Err(e);
3147 }
3148
3149 if let Err(e) = ctx
3150 .create_child("contract_register", ContractRegister::initial(()))
3151 .await
3152 {
3153 error!(
3154 error = %e,
3155 "Failed to create contract_register child"
3156 );
3157 return Err(e);
3158 }
3159
3160 if self.subject_metadata.active {
3161 if let Err(e) = self.build_childs(ctx, &hash, &network).await {
3162 error!(
3163 error = %e,
3164 "Failed to build governance child actors"
3165 );
3166 return Err(e);
3167 }
3168
3169 let sink_actor = match ctx
3170 .create_child(
3171 "sink_data",
3172 SinkData {
3173 public_key: self.our_key.to_string(),
3174 },
3175 )
3176 .await
3177 {
3178 Ok(actor) => actor,
3179 Err(e) => {
3180 error!(
3181 error = %e,
3182 "Failed to create sink_data child"
3183 );
3184 return Err(e);
3185 }
3186 };
3187 let sink =
3188 Sink::new(sink_actor.subscribe(), ext_db.get_sink_data());
3189 ctx.system().run_sink(sink).await;
3190
3191 let sink = Sink::new(sink_actor.subscribe(), ave_sink.clone());
3192 ctx.system().run_sink(sink).await;
3193 }
3194
3195 if self.service {
3196 let Some(config): Option<ConfigHelper> =
3197 ctx.system().get_helper("config").await
3198 else {
3199 error!("Config helper not found");
3200 return Err(ActorError::Helper {
3201 name: "config".to_owned(),
3202 reason: "Not found".to_owned(),
3203 });
3204 };
3205
3206 let version_sync_tick_interval = Duration::from_secs(
3207 config.sync_governance.interval_secs.max(1),
3208 );
3209 let version_sync_response_timeout = Duration::from_secs(
3210 config.sync_governance.response_timeout_secs.max(1),
3211 );
3212 let tracker_sync_tick_interval =
3213 Duration::from_secs(config.sync_tracker.interval_secs.max(1));
3214 let tracker_sync_response_timeout = Duration::from_secs(
3215 config.sync_tracker.response_timeout_secs.max(1),
3216 );
3217 let tracker_sync_update_timeout = Duration::from_secs(
3218 config.sync_tracker.update_timeout_secs.max(1),
3219 );
3220
3221 if let Err(e) = ctx
3222 .create_child(
3223 "tracker_sync",
3224 TrackerSync::new(
3225 self.subject_metadata.subject_id.clone(),
3226 self.our_key.clone(),
3227 network.clone(),
3228 TrackerSyncConfig {
3229 service: self.service,
3230 tick_interval: tracker_sync_tick_interval,
3231 response_timeout: tracker_sync_response_timeout,
3232 page_size: config.sync_tracker.page_size,
3233 update_batch_size: config
3234 .sync_tracker
3235 .update_batch_size,
3236 update_timeout: tracker_sync_update_timeout,
3237 },
3238 ),
3239 )
3240 .await
3241 {
3242 error!(
3243 error = %e,
3244 subject_id = %self.subject_metadata.subject_id,
3245 "Failed to create tracker_sync child"
3246 );
3247 return Err(e);
3248 }
3249
3250 let version_sync = ctx
3251 .create_child(
3252 "version_sync",
3253 GovernanceVersionSync::new(
3254 self.subject_metadata.subject_id.clone(),
3255 self.our_key.clone(),
3256 network.clone(),
3257 self.properties.version,
3258 config.sync_governance.sample_size,
3259 version_sync_tick_interval,
3260 version_sync_response_timeout,
3261 ),
3262 )
3263 .await?;
3264
3265 let governance_peers = self
3266 .properties
3267 .get_witnesses(WitnessesData::Gov)
3268 .map_err(|e| ActorError::Functional {
3269 description: e.to_string(),
3270 })?;
3271
3272 let _ = version_sync
3273 .ask(GovernanceVersionSyncMessage::RefreshGovernance {
3274 version: self.properties.version,
3275 governance_peers,
3276 })
3277 .await?;
3278 }
3279
3280 Ok(())
3281 }
3282}
3283
3284#[async_trait]
3285impl Handler<Self> for Governance {
3286 async fn handle_message(
3287 &mut self,
3288 _sender: ActorPath,
3289 msg: GovernanceMessage,
3290 ctx: &mut ActorContext<Self>,
3291 ) -> Result<GovernanceResponse, ActorError> {
3292 match msg {
3293 GovernanceMessage::GetVersion => {
3294 Ok(GovernanceResponse::Version(self.properties.version))
3295 }
3296 GovernanceMessage::GetLedger { lo_sn, hi_sn } => {
3297 let (ledger, is_all) =
3298 self.get_ledger(ctx, lo_sn, hi_sn).await?;
3299 Ok(GovernanceResponse::Ledger { ledger, is_all })
3300 }
3301 GovernanceMessage::GetLastLedger => {
3302 let ledger_event = self.get_last_ledger(ctx).await?;
3303 Ok(GovernanceResponse::LastLedger {
3304 ledger_event: Box::new(ledger_event),
3305 })
3306 }
3307 GovernanceMessage::GetMetadata => Ok(GovernanceResponse::Metadata(
3308 Box::new(Metadata::from(self.clone())),
3309 )),
3310 GovernanceMessage::DeleteTrackerReferences { subject_id } => {
3311 self.delete_tracker_references(ctx, subject_id.clone())
3312 .await?;
3313
3314 debug!(
3315 msg_type = "DeleteTrackerReferences",
3316 subject_id = %subject_id,
3317 governance_id = %self.subject_metadata.subject_id,
3318 "Tracker references deleted from governance"
3319 );
3320
3321 Ok(GovernanceResponse::Ok)
3322 }
3323 GovernanceMessage::DeleteGovernanceStorage => {
3324 self.delete_governance_storage(ctx).await?;
3325
3326 debug!(
3327 msg_type = "DeleteGovernanceStorage",
3328 governance_id = %self.subject_metadata.subject_id,
3329 "Governance storage deleted"
3330 );
3331
3332 Ok(GovernanceResponse::Ok)
3333 }
3334 GovernanceMessage::UpdateLedger { events } => {
3335 let events_count = events.len();
3336 if let Err(e) =
3337 self.manager_new_ledger_events(ctx, events).await
3338 {
3339 warn!(
3340 msg_type = "UpdateLedger",
3341 error = %e,
3342 subject_id = %self.subject_metadata.subject_id,
3343 events_count = events_count,
3344 "Failed to verify new ledger events"
3345 );
3346 return Err(e);
3347 };
3348
3349 debug!(
3350 msg_type = "UpdateLedger",
3351 subject_id = %self.subject_metadata.subject_id,
3352 sn = self.subject_metadata.sn,
3353 events_count = events_count,
3354 "Ledger updated successfully"
3355 );
3356
3357 Ok(GovernanceResponse::UpdateResult(
3358 self.subject_metadata.sn,
3359 self.subject_metadata.owner.clone(),
3360 self.subject_metadata.new_owner.clone(),
3361 ))
3362 }
3363 GovernanceMessage::GetGovernance => {
3364 Ok(GovernanceResponse::Governance(Box::new(
3365 self.properties.clone(),
3366 )))
3367 }
3368 }
3369 }
3370
3371 async fn on_event(&mut self, event: Ledger, ctx: &mut ActorContext<Self>) {
3372 if let Err(e) = self.persist(&event, ctx).await {
3373 error!(
3374 error = %e,
3375 subject_id = %self.subject_metadata.subject_id,
3376 sn = self.subject_metadata.sn,
3377 "Failed to persist event"
3378 );
3379 emit_fail(ctx, e).await;
3380 };
3381
3382 if let Err(e) = ctx.publish_event(event).await {
3383 error!(
3384 error = %e,
3385 subject_id = %self.subject_metadata.subject_id,
3386 sn = self.subject_metadata.sn,
3387 "Failed to publish event"
3388 );
3389 emit_fail(ctx, e).await;
3390 } else {
3391 debug!(
3392 subject_id = %self.subject_metadata.subject_id,
3393 sn = self.subject_metadata.sn,
3394 "Event persisted and published successfully"
3395 );
3396 }
3397 }
3398
3399 async fn on_child_fault(
3400 &mut self,
3401 error: ActorError,
3402 ctx: &mut ActorContext<Self>,
3403 ) -> ChildAction {
3404 error!(
3405 error = %error,
3406 subject_id = %self.subject_metadata.subject_id,
3407 "Child fault occurred"
3408 );
3409 emit_fail(ctx, error).await;
3410 ChildAction::Stop
3411 }
3412}
3413
3414#[async_trait]
3415impl PersistentActor for Governance {
3416 type Persistence = FullPersistence;
3417 type InitParams = (
3418 Option<(SubjectMetadata, GovernanceData)>,
3419 Arc<PublicKey>,
3420 HashAlgorithm,
3421 bool,
3422 );
3423
3424 fn update(&mut self, state: Self) {
3425 self.properties = state.properties;
3426 self.subject_metadata = state.subject_metadata;
3427 }
3428
3429 fn create_initial(params: Self::InitParams) -> Self {
3430 let (subject_metadata, properties) =
3431 if let Some((subject_metadata, properties)) = params.0 {
3432 (subject_metadata, properties)
3433 } else {
3434 Default::default()
3435 };
3436 Self {
3437 hash: Some(params.2),
3438 our_key: params.1,
3439 service: params.3,
3440 subject_metadata,
3441 properties,
3442 }
3443 }
3444
3445 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
3446 match &event.protocols {
3447 Protocols::Create {
3448 validation,
3449 event_request,
3450 } => {
3451 if let EventRequest::Create(..) = event_request.content() {
3452 } else {
3453 error!(
3454 event_type = "Create",
3455 subject_id = %self.subject_metadata.subject_id,
3456 actual_request = ?event_request.content(),
3457 "Unexpected event request type for governance create apply"
3458 );
3459 return Err(ActorError::Functional {
3460 description:
3461 "In create event, event request must be Create"
3462 .to_owned(),
3463 });
3464 };
3465
3466 if let ValidationMetadata::Metadata(metadata) =
3467 &validation.validation_metadata
3468 {
3469 self.subject_metadata = SubjectMetadata::new(metadata);
3470 self.properties = serde_json::from_value::<GovernanceData>(
3471 metadata.properties.0.clone(),
3472 )
3473 .map_err(|e| {
3474 error!(
3475 event_type = "Create",
3476 subject_id = %self.subject_metadata.subject_id,
3477 error = %e,
3478 "Failed to convert properties into GovernanceData"
3479 );
3480 ActorError::Functional { description: format!("In create event, can not convert properties into GovernanceData: {e}")}
3481 })?;
3482
3483 debug!(
3484 event_type = "Create",
3485 subject_id = %self.subject_metadata.subject_id,
3486 sn = self.subject_metadata.sn,
3487 "Applied create event"
3488 );
3489 } else {
3490 error!(
3491 event_type = "Create",
3492 "Validation metadata must be Metadata type"
3493 );
3494 return Err(ActorError::Functional { description: "In create event, validation metadata must be a Metadata".to_owned() });
3495 }
3496
3497 return Ok(());
3498 }
3499 Protocols::GovFact {
3500 evaluation,
3501 approval,
3502 event_request,
3503 ..
3504 } => {
3505 if let EventRequest::Fact(..) = event_request.content() {
3506 } else {
3507 error!(
3508 event_type = "Fact",
3509 subject_id = %self.subject_metadata.subject_id,
3510 actual_request = ?event_request.content(),
3511 "Unexpected event request type for governance fact apply"
3512 );
3513 return Err(ActorError::Functional {
3514 description:
3515 "In fact event, event request must be Fact"
3516 .to_owned(),
3517 });
3518 };
3519
3520 if let Some(eval_res) = evaluation.evaluator_response_ok() {
3521 if let Some(appr_res) = approval {
3522 if appr_res.approved {
3523 self.apply_patch(eval_res.patch)?;
3524 debug!(
3525 event_type = "Fact",
3526 subject_id = %self.subject_metadata.subject_id,
3527 approved = true,
3528 "Applied fact event with patch"
3529 );
3530 } else {
3531 debug!(
3532 event_type = "Fact",
3533 subject_id = %self.subject_metadata.subject_id,
3534 approved = false,
3535 "Fact event not approved, patch not applied"
3536 );
3537 }
3538 } else {
3539 error!(
3540 event_type = "Fact",
3541 subject_id = %self.subject_metadata.subject_id,
3542 "Evaluation successful but no approval present"
3543 );
3544 return Err(ActorError::Functional { description: "The evaluation event was successful, but there is no approval".to_owned() });
3545 }
3546 }
3547 }
3548
3549 Protocols::Transfer {
3550 evaluation,
3551 event_request,
3552 ..
3553 } => {
3554 let EventRequest::Transfer(transfer_request) =
3555 event_request.content()
3556 else {
3557 error!(
3558 event_type = "Transfer",
3559 subject_id = %self.subject_metadata.subject_id,
3560 actual_request = ?event_request.content(),
3561 "Unexpected event request type for governance transfer apply"
3562 );
3563 return Err(ActorError::Functional {
3564 description:
3565 "In transfer event, event request must be Transfer"
3566 .to_owned(),
3567 });
3568 };
3569
3570 if evaluation.evaluator_response_ok().is_some() {
3571 self.subject_metadata.new_owner =
3572 Some(transfer_request.new_owner.clone());
3573 debug!(
3574 event_type = "Transfer",
3575 subject_id = %self.subject_metadata.subject_id,
3576 new_owner = %transfer_request.new_owner,
3577 "Applied transfer event"
3578 );
3579 }
3580 }
3581
3582 Protocols::GovConfirm {
3583 evaluation,
3584 event_request,
3585 ..
3586 } => {
3587 if let EventRequest::Confirm(..) = event_request.content() {
3588 } else {
3589 error!(
3590 event_type = "Confirm",
3591 subject_id = %self.subject_metadata.subject_id,
3592 actual_request = ?event_request.content(),
3593 "Unexpected event request type for governance confirm apply"
3594 );
3595 return Err(ActorError::Functional {
3596 description:
3597 "In confirm event, event request must be Confirm"
3598 .to_owned(),
3599 });
3600 };
3601
3602 if let Some(eval_res) = evaluation.evaluator_response_ok() {
3603 if let Some(new_owner) =
3604 self.subject_metadata.new_owner.take()
3605 {
3606 self.subject_metadata.owner = new_owner.clone();
3607 self.apply_patch(eval_res.patch)?;
3608 debug!(
3609 event_type = "Confirm",
3610 subject_id = %self.subject_metadata.subject_id,
3611 new_owner = %new_owner,
3612 "Applied confirm event with patch"
3613 );
3614 } else {
3615 error!(
3616 event_type = "Confirm",
3617 subject_id = %self.subject_metadata.subject_id,
3618 "New owner is None in confirm event"
3619 );
3620 return Err(ActorError::Functional {
3621 description: "In confirm event, new owner is None"
3622 .to_owned(),
3623 });
3624 }
3625 }
3626 }
3627 Protocols::Reject { event_request, .. } => {
3628 if let EventRequest::Reject(..) = event_request.content() {
3629 } else {
3630 error!(
3631 event_type = "Reject",
3632 subject_id = %self.subject_metadata.subject_id,
3633 actual_request = ?event_request.content(),
3634 "Unexpected event request type for governance reject apply"
3635 );
3636 return Err(ActorError::Functional {
3637 description:
3638 "In reject event, event request must be Reject"
3639 .to_owned(),
3640 });
3641 };
3642
3643 self.subject_metadata.new_owner = None;
3644 debug!(
3645 event_type = "Reject",
3646 subject_id = %self.subject_metadata.subject_id,
3647 "Applied reject event"
3648 );
3649 }
3650 Protocols::EOL { event_request, .. } => {
3651 if let EventRequest::EOL(..) = event_request.content() {
3652 } else {
3653 error!(
3654 event_type = "EOL",
3655 subject_id = %self.subject_metadata.subject_id,
3656 actual_request = ?event_request.content(),
3657 "Unexpected event request type for governance eol apply"
3658 );
3659 return Err(ActorError::Functional {
3660 description: "In EOL event, event request must be EOL"
3661 .to_owned(),
3662 });
3663 };
3664
3665 self.subject_metadata.active = false;
3666 debug!(
3667 event_type = "EOL",
3668 subject_id = %self.subject_metadata.subject_id,
3669 "Applied EOL event"
3670 );
3671 }
3672 _ => {
3673 error!(
3674 subject_id = %self.subject_metadata.subject_id,
3675 "Invalid protocol data for Governance"
3676 );
3677 return Err(ActorError::Functional {
3678 description: "Invalid protocol data for Governance"
3679 .to_owned(),
3680 });
3681 }
3682 }
3683
3684 if event.protocols.is_success() {
3685 self.properties.version += 1;
3686 }
3687
3688 self.subject_metadata.sn += 1;
3689 self.subject_metadata.prev_ledger_event_hash =
3690 event.prev_ledger_event_hash.clone();
3691
3692 Ok(())
3693 }
3694}
3695
// `Storable` is implemented with an empty body: no trait items are
// overridden for `Governance`.
impl Storable for Governance {}