1use crate::{
5 approval::{
6 persist::{ApprPersist, InitApprPersist},
7 types::VotationType,
8 },
9 db::Storable,
10 evaluation::{
11 compiler::{
12 CompilerResponse, ContractCompiler, ContractCompilerMessage,
13 },
14 schema::{EvaluationSchema, EvaluationSchemaMessage},
15 worker::{EvalWorker, EvalWorkerMessage},
16 },
17 governance::{
18 contract_register::{
19 ContractRegister, ContractRegisterMessage, ContractRegisterResponse,
20 },
21 data::GovernanceData,
22 events::{
23 GovernanceEvent, governance_event_roles_update_fact,
24 governance_event_update_creator_change,
25 },
26 model::{
27 CreatorQuantity, HashThisRole, ProtocolTypes, Quorum, RoleTypes,
28 Schema, WitnessesData,
29 },
30 role_register::{
31 CurrentValidationRoles, RoleRegister, RoleRegisterMessage,
32 RoleRegisterResponse,
33 },
34 sn_register::{SnRegister, SnRegisterMessage, SnRegisterResponse},
35 subject_register::{
36 SubjectRegister, SubjectRegisterMessage, SubjectRegisterResponse,
37 },
38 tracker_sync::{TrackerSync, TrackerSyncConfig},
39 version_sync::{GovernanceVersionSync, GovernanceVersionSyncMessage},
40 witnesses_register::{
41 WitnessesRegister, WitnessesRegisterMessage,
42 WitnessesRegisterResponse, WitnessesType,
43 },
44 },
45 helpers::{db::ExternalDB, network::service::NetworkSender, sink::AveSink},
46 model::{
47 common::{
48 emit_fail, get_last_event, purge_storage, subject::make_obsolete,
49 },
50 event::{Protocols, ValidationMetadata},
51 },
52 node::{Node, NodeMessage, TransferSubject, register::RegisterMessage},
53 subject::{
54 DataForSink, EventLedgerDataForSink, Metadata, SignedLedger, Subject,
55 SubjectMetadata,
56 error::SubjectError,
57 sinkdata::{SinkData, SinkDataMessage},
58 },
59 system::ConfigHelper,
60 validation::{
61 request::LastData,
62 schema::{ValidationSchema, ValidationSchemaMessage},
63 worker::{ValiWorker, ValiWorkerMessage},
64 },
65};
66
67use ave_actors::{
68 Actor, ActorContext, ActorError, ActorPath, ActorRef, ChildAction, Handler,
69 Message, Response, Sink,
70};
71use ave_common::{
72 Namespace, SchemaType, ValueWrapper,
73 identity::{DigestIdentifier, HashAlgorithm, PublicKey, hash_borsh},
74 request::EventRequest,
75 schematype::ReservedWords,
76};
77
78use async_trait::async_trait;
79use ave_actors::{FullPersistence, PersistentActor};
80use borsh::{BorshDeserialize, BorshSerialize};
81use json_patch::{Patch, patch};
82use serde::{Deserialize, Serialize};
83use tracing::{Span, debug, error, info_span, warn};
84
85use std::{
86 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
87 sync::Arc,
88 time::Duration,
89};
90use tokio::{fs, sync::RwLock};
91use wasmtime::Module;
92
93pub mod contract_register;
94pub mod data;
95pub mod error;
96pub mod events;
97pub mod model;
98pub mod role_register;
99pub mod sn_register;
100pub mod subject_register;
101pub mod tracker_sync;
102pub mod version_sync;
103pub mod witnesses_register;
104
105pub struct RolesUpdate {
106 pub appr_quorum: Option<Quorum>,
107 pub new_approvers: Vec<PublicKey>,
108 pub remove_approvers: Vec<PublicKey>,
109
110 pub eval_quorum: HashMap<SchemaType, Quorum>,
111 pub new_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
112 pub remove_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
113
114 pub vali_quorum: HashMap<SchemaType, Quorum>,
115 pub new_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
116 pub remove_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
117
118 pub new_creator: HashMap<
119 (SchemaType, String, PublicKey),
120 (CreatorQuantity, Vec<WitnessesType>),
121 >,
122 pub remove_creator: HashSet<(SchemaType, String, PublicKey)>,
123
124 pub new_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
125 pub remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
126}
127
128pub struct RolesUpdateConfirm {
129 pub new_approver: Option<PublicKey>,
130 pub remove_approver: PublicKey,
131
132 pub new_evaluator: Option<PublicKey>,
133 pub remove_evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
134
135 pub new_validator: Option<PublicKey>,
136 pub remove_validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
137
138 pub remove_creator: HashSet<(SchemaType, String, PublicKey)>,
139 pub remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
140}
141
142pub struct RolesUpdateRemove {
143 pub witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
144 pub creator: HashSet<(SchemaType, String, PublicKey)>,
145 pub approvers: Vec<PublicKey>,
146 pub evaluators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
147 pub validators: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
148}
149
150pub struct CreatorRoleUpdate {
151 pub new_creator: HashMap<
152 (SchemaType, String, PublicKey),
153 (CreatorQuantity, BTreeSet<String>),
154 >,
155
156 pub update_creator_quantity:
157 HashSet<(SchemaType, String, PublicKey, CreatorQuantity)>,
158
159 pub update_creator_witnesses:
160 HashSet<(SchemaType, String, PublicKey, BTreeSet<String>)>,
161
162 pub remove_creator: HashSet<(SchemaType, String, PublicKey)>,
163}
164
165#[derive(Default, Debug, Serialize, Deserialize, Clone)]
166pub struct Governance {
167 #[serde(skip)]
168 pub our_key: Arc<PublicKey>,
169 #[serde(skip)]
170 pub service: bool,
171 #[serde(skip)]
172 pub hash: Option<HashAlgorithm>,
173 pub subject_metadata: SubjectMetadata,
174 pub properties: GovernanceData,
175}
176
177impl BorshSerialize for Governance {
178 fn serialize<W: std::io::Write>(
179 &self,
180 writer: &mut W,
181 ) -> std::io::Result<()> {
182 BorshSerialize::serialize(&self.subject_metadata, writer)?;
184 BorshSerialize::serialize(&self.properties, writer)?;
185
186 Ok(())
187 }
188}
189
190impl BorshDeserialize for Governance {
191 fn deserialize_reader<R: std::io::Read>(
192 reader: &mut R,
193 ) -> std::io::Result<Self> {
194 let subject_metadata = SubjectMetadata::deserialize_reader(reader)?;
196 let properties = GovernanceData::deserialize_reader(reader)?;
197
198 let our_key = Arc::new(PublicKey::default());
201 let hash = None;
202
203 Ok(Self {
204 hash,
205 our_key,
206 service: false,
207 subject_metadata,
208 properties,
209 })
210 }
211}
212
213#[async_trait]
214impl Subject for Governance {
215 async fn update_sn(
216 &self,
217 ctx: &mut ActorContext<Self>,
218 ) -> Result<(), ActorError> {
219 let witnesses_register = ctx
220 .get_child::<WitnessesRegister>("witnesses_register")
221 .await?;
222
223 witnesses_register
224 .tell(WitnessesRegisterMessage::UpdateSnGov {
225 sn: self.subject_metadata.sn,
226 })
227 .await
228 }
229
230 async fn eol(
231 &self,
232 ctx: &mut ActorContext<Self>,
233 ) -> Result<(), ActorError> {
234 let node_path = ActorPath::from("/user/node");
235 let node = ctx.system().get_actor::<Node>(&node_path).await?;
236 node.tell(NodeMessage::EOLSubject {
237 subject_id: self.subject_metadata.subject_id.clone(),
238 i_owner: *self.our_key == self.subject_metadata.owner,
239 })
240 .await
241 }
242
243 async fn reject(
244 &self,
245 ctx: &mut ActorContext<Self>,
246 _gov_version: u64,
247 ) -> Result<(), ActorError> {
248 let node_path = ActorPath::from("/user/node");
249 let node = ctx.system().get_actor::<Node>(&node_path).await?;
250 node.tell(NodeMessage::RejectTransfer(
251 self.subject_metadata.subject_id.clone(),
252 ))
253 .await
254 }
255
256 async fn confirm(
257 &self,
258 ctx: &mut ActorContext<Self>,
259 _new_owner: PublicKey,
260 _gov_version: u64,
261 ) -> Result<(), ActorError> {
262 let node_path = ActorPath::from("/user/node");
263 let node = ctx.system().get_actor::<Node>(&node_path).await?;
264 node.tell(NodeMessage::ConfirmTransfer(
265 self.subject_metadata.subject_id.clone(),
266 ))
267 .await
268 }
269
270 async fn transfer(
271 &self,
272 ctx: &mut ActorContext<Self>,
273 new_owner: PublicKey,
274 _gov_version: u64,
275 ) -> Result<(), ActorError> {
276 let node_path = ActorPath::from("/user/node");
277 let node = ctx.system().get_actor::<Node>(&node_path).await?;
278 node.tell(NodeMessage::TransferSubject(TransferSubject {
279 name: self.subject_metadata.name.clone(),
280 subject_id: self.subject_metadata.subject_id.clone(),
281 new_owner: new_owner.clone(),
282 actual_owner: self.subject_metadata.owner.clone(),
283 }))
284 .await
285 }
286
287 async fn get_last_ledger(
288 &self,
289 ctx: &mut ActorContext<Self>,
290 ) -> Result<Option<SignedLedger>, ActorError> {
291 get_last_event(ctx).await
292 }
293
294 fn apply_patch(
295 &mut self,
296 json_patch: ValueWrapper,
297 ) -> Result<(), ActorError> {
298 let patch_json = serde_json::from_value::<Patch>(json_patch.0)
299 .map_err(|e| {
300 let error = SubjectError::PatchConversionFailed {
301 details: e.to_string(),
302 };
303 error!(
304 error = %e,
305 subject_id = %self.subject_metadata.subject_id,
306 "Failed to convert patch from JSON"
307 );
308 ActorError::Functional {
309 description: error.to_string(),
310 }
311 })?;
312
313 let mut properties = self.properties.to_value_wrapper();
314
315 patch(&mut properties.0, &patch_json).map_err(|e| {
316 let error = SubjectError::PatchApplicationFailed {
317 details: e.to_string(),
318 };
319 error!(
320 error = %e,
321 subject_id = %self.subject_metadata.subject_id,
322 "Failed to apply patch to properties"
323 );
324 ActorError::Functional {
325 description: error.to_string(),
326 }
327 })?;
328
329 self.properties = serde_json::from_value::<GovernanceData>(
330 properties.0,
331 )
332 .map_err(|e| {
333 let error = SubjectError::GovernanceDataConversionFailed {
334 details: e.to_string(),
335 };
336 error!(
337 error = %e,
338 subject_id = %self.subject_metadata.subject_id,
339 "Failed to convert properties to GovernanceData"
340 );
341 ActorError::Functional {
342 description: error.to_string(),
343 }
344 })?;
345
346 debug!(
347 subject_id = %self.subject_metadata.subject_id,
348 "Patch applied successfully"
349 );
350
351 Ok(())
352 }
353
354 async fn manager_new_ledger_events(
355 &mut self,
356 ctx: &mut ActorContext<Self>,
357 events: Vec<SignedLedger>,
358 ) -> Result<(), ActorError> {
359 let Some(network) = ctx
360 .system()
361 .get_helper::<Arc<NetworkSender>>("network")
362 .await
363 else {
364 return Err(ActorError::Helper {
365 name: "network".to_owned(),
366 reason: "Not found".to_owned(),
367 });
368 };
369
370 let Some(hash) = self.hash else {
371 return Err(ActorError::FunctionalCritical {
372 description: "Hash algorithm is None".to_string(),
373 });
374 };
375
376 let current_sn = self.subject_metadata.sn;
377 let current_new_owner_some = self.subject_metadata.new_owner.is_some();
378 let i_current_new_owner = self.subject_metadata.new_owner.clone()
379 == Some((*self.our_key).clone());
380 let current_owner = self.subject_metadata.owner.clone();
381
382 let current_properties = self.properties.clone();
383
384 if let Err(e) = self.verify_new_ledger_events(ctx, events, &hash).await
385 {
386 if let ActorError::Functional { description } = e.clone() {
387 warn!(
388 error = %description,
389 subject_id = %self.subject_metadata.subject_id,
390 sn = self.subject_metadata.sn,
391 "Error verifying new ledger events"
392 );
393
394 if self.subject_metadata.sn == 0 {
396 return Err(e);
397 }
398 } else {
399 error!(
400 error = %e,
401 subject_id = %self.subject_metadata.subject_id,
402 sn = self.subject_metadata.sn,
403 "Critical error verifying new ledger events"
404 );
405 return Err(e);
406 }
407 };
408
409 if current_sn < self.subject_metadata.sn {
410 let old_gov = current_properties;
411 if !self.subject_metadata.active {
412 if current_owner == *self.our_key {
413 Self::down_owner(ctx).await?;
414 } else {
415 Self::down_not_owner(ctx, &old_gov, self.our_key.clone())
416 .await?;
417 }
418
419 let old_schemas_eval = old_gov
420 .schemas_name(ProtocolTypes::Evaluation, &self.our_key);
421
422 Self::down_compilers_schemas(
423 ctx,
424 &old_schemas_eval,
425 &self.subject_metadata.subject_id,
426 )
427 .await?;
428
429 let old_schemas_val = old_gov
430 .schemas_name(ProtocolTypes::Validation, &self.our_key);
431
432 Self::down_schemas(ctx, &old_schemas_eval, &old_schemas_val)
433 .await?;
434 } else {
435 let new_owner_some = self.subject_metadata.new_owner.is_some();
436 let i_new_owner = self.subject_metadata.new_owner.clone()
437 == Some((*self.our_key).clone());
438 let mut up_not_owner: bool = false;
439 let mut up_owner: bool = false;
440
441 if current_owner == *self.our_key {
442 if current_owner != self.subject_metadata.owner {
444 if !current_new_owner_some && !i_new_owner {
446 up_not_owner = true;
448 } else if current_new_owner_some && i_new_owner {
449 up_owner = true;
450 }
451 } else {
452 if current_new_owner_some && !new_owner_some {
454 up_owner = true;
455 } else if !current_new_owner_some && new_owner_some {
456 up_not_owner = true;
457 }
458 }
459 } else {
460 if current_owner != self.subject_metadata.owner
462 && self.subject_metadata.owner == *self.our_key
463 {
464 if !new_owner_some && !i_current_new_owner {
466 up_owner = true;
468 } else if new_owner_some && i_current_new_owner {
469 up_not_owner = true;
470 }
471 } else if i_current_new_owner && !i_new_owner {
472 up_not_owner = true;
473 } else if !i_current_new_owner && i_new_owner {
474 up_owner = true;
475 }
476 }
477
478 if up_not_owner {
479 Self::down_owner(ctx).await?;
480 self.up_not_owner(ctx, &hash, &network).await?;
481 } else if up_owner {
482 Self::down_not_owner(ctx, &old_gov, self.our_key.clone())
483 .await?;
484 self.up_owner(ctx, &hash, &network).await?;
485 }
486
487 if !up_not_owner
490 && !up_owner
491 && *self.our_key != self.subject_metadata.owner
492 {
493 self.up_down_not_owner(ctx, &old_gov, &hash, &network)
494 .await?;
495 }
496
497 self.manager_schemas_compilers(ctx, &old_gov).await?;
498 self.update_childs(ctx).await?;
499 }
500
501 let _ = make_obsolete(ctx, &self.subject_metadata.subject_id).await;
502 }
503
504 if current_sn < self.subject_metadata.sn || current_sn == 0 {
505 Self::publish_sink(
506 ctx,
507 SinkDataMessage::UpdateState(Box::new(Metadata::from(
508 self.clone(),
509 ))),
510 )
511 .await?;
512
513 self.update_sn(ctx).await?;
514 self.refresh_version_sync(ctx).await?;
515 }
516
517 Ok(())
518 }
519}
520
521impl Governance {
522 async fn up_approver_only(
523 &self,
524 ctx: &mut ActorContext<Self>,
525 hash: &HashAlgorithm,
526 network: &Arc<NetworkSender>,
527 ) -> Result<(), ActorError> {
528 if !self.properties.has_this_role(HashThisRole::Gov {
529 who: (*self.our_key).clone(),
530 role: RoleTypes::Approver,
531 }) {
532 return Ok(());
533 }
534
535 let always_accept = if let Some(config) =
536 ctx.system().get_helper::<ConfigHelper>("config").await
537 {
538 config.always_accept
539 } else {
540 return Err(ActorError::Helper {
541 name: "config".to_string(),
542 reason: "Not Found".to_string(),
543 });
544 };
545
546 let pass_votation = if always_accept {
547 VotationType::AlwaysAccept
548 } else {
549 VotationType::Manual
550 };
551
552 let owner = *self.our_key == self.subject_metadata.owner;
553 let i_new_owner =
554 self.subject_metadata.new_owner == Some((*self.our_key).clone());
555
556 let node_key = if (owner && self.subject_metadata.new_owner.is_none())
557 || i_new_owner
558 {
559 (*self.our_key).clone()
560 } else {
561 self.subject_metadata
562 .new_owner
563 .clone()
564 .unwrap_or_else(|| self.subject_metadata.owner.clone())
565 };
566
567 let init_approver = InitApprPersist {
568 our_key: self.our_key.clone(),
569 node_key,
570 subject_id: self.subject_metadata.subject_id.clone(),
571 pass_votation,
572 helpers: (*hash, network.clone()),
573 };
574
575 ctx.create_child("approver", ApprPersist::initial(init_approver))
576 .await?;
577
578 Ok(())
579 }
580
581 async fn current_validation_roles(
582 &self,
583 ctx: &ActorContext<Self>,
584 schema_id: SchemaType,
585 ) -> Result<CurrentValidationRoles, ActorError> {
586 let actor = ctx.get_child::<RoleRegister>("role_register").await?;
587 let response = actor
588 .ask(RoleRegisterMessage::GetCurrentValidationRoles { schema_id })
589 .await?;
590
591 match response {
592 RoleRegisterResponse::CurrentValidationRoles(roles) => Ok(roles),
593 _ => Err(ActorError::UnexpectedResponse {
594 path: ActorPath::from(format!(
595 "/user/node/subject_manager/{}/role_register",
596 self.subject_metadata.subject_id
597 )),
598 expected: "RoleRegisterResponse::CurrentValidationRoles"
599 .to_owned(),
600 }),
601 }
602 }
603
604 async fn refresh_version_sync(
605 &self,
606 ctx: &ActorContext<Self>,
607 ) -> Result<(), ActorError> {
608 if !self.service {
609 return Ok(());
610 }
611
612 let version_sync = ctx
613 .get_child::<GovernanceVersionSync>("version_sync")
614 .await?;
615 let governance_peers = self
616 .properties
617 .get_witnesses(WitnessesData::Gov)
618 .map_err(|e| ActorError::Functional {
619 description: e.to_string(),
620 })?;
621
622 let _ = version_sync
623 .ask(GovernanceVersionSyncMessage::RefreshGovernance {
624 version: self.properties.version,
625 governance_peers,
626 })
627 .await?;
628
629 Ok(())
630 }
631
632 async fn update_schemas(
633 &self,
634 ctx: &ActorContext<Self>,
635 schema_creators_eval: &BTreeMap<
636 SchemaType,
637 BTreeMap<PublicKey, BTreeSet<Namespace>>,
638 >,
639 schema_creators_vali: &BTreeMap<
640 SchemaType,
641 BTreeMap<PublicKey, BTreeSet<Namespace>>,
642 >,
643 update_eval: &BTreeMap<SchemaType, ValueWrapper>,
644 update_vali: &BTreeMap<SchemaType, ValueWrapper>,
645 ) -> Result<(), ActorError> {
646 for (schema_id, init_state) in update_eval.iter() {
647 let actor = ctx
648 .get_child::<EvaluationSchema>(&format!(
649 "{}_evaluation",
650 schema_id
651 ))
652 .await?;
653
654 actor
655 .tell(EvaluationSchemaMessage::Update {
656 creators: schema_creators_eval
657 .get(schema_id)
658 .cloned()
659 .unwrap_or_default(),
660 sn: self.subject_metadata.sn,
661 gov_version: self.properties.version,
662 init_state: init_state.clone(),
663 })
664 .await?;
665 }
666
667 for (schema_id, init_state) in update_vali.iter() {
668 let current_roles = self
669 .current_validation_roles(ctx, schema_id.clone())
670 .await?;
671 let actor = ctx
672 .get_child::<ValidationSchema>(&format!(
673 "{}_validation",
674 schema_id
675 ))
676 .await?;
677
678 actor
679 .tell(ValidationSchemaMessage::Update {
680 creators: schema_creators_vali
681 .get(schema_id)
682 .cloned()
683 .unwrap_or_default(),
684 sn: self.subject_metadata.sn,
685 gov_version: self.properties.version,
686 init_state: init_state.clone(),
687 current_roles: current_roles.schema,
688 })
689 .await?;
690 }
691
692 Ok(())
693 }
694
695 async fn down_schemas(
696 ctx: &ActorContext<Self>,
697 old_schemas_eval: &BTreeSet<SchemaType>,
698 old_schemas_val: &BTreeSet<SchemaType>,
699 ) -> Result<(), ActorError> {
700 for schema_id in old_schemas_eval {
701 let actor = ctx
702 .get_child::<EvaluationSchema>(&format!(
703 "{}_evaluation",
704 schema_id
705 ))
706 .await?;
707 actor.ask_stop().await?;
708 }
709
710 for schema_id in old_schemas_val {
711 let actor = ctx
712 .get_child::<ValidationSchema>(&format!(
713 "{}_validation",
714 schema_id
715 ))
716 .await?;
717 actor.ask_stop().await?;
718 }
719
720 Ok(())
721 }
722
723 async fn up_schemas(
724 &self,
725 ctx: &mut ActorContext<Self>,
726 schema_creators_eval: &BTreeMap<
727 SchemaType,
728 BTreeMap<PublicKey, BTreeSet<Namespace>>,
729 >,
730 schema_creators_vali: &BTreeMap<
731 SchemaType,
732 BTreeMap<PublicKey, BTreeSet<Namespace>>,
733 >,
734 up_eval: &BTreeMap<SchemaType, ValueWrapper>,
735 up_vali: &BTreeMap<SchemaType, ValueWrapper>,
736 hash_network: (&HashAlgorithm, &Arc<NetworkSender>),
737 ) -> Result<(), ActorError> {
738 for (schema_id, init_state) in up_eval.iter() {
739 let eval_actor = EvaluationSchema {
740 our_key: self.our_key.clone(),
741 governance_id: self.subject_metadata.subject_id.clone(),
742 gov_version: self.properties.version,
743 sn: self.subject_metadata.sn,
744 creators: schema_creators_eval
745 .get(schema_id)
746 .cloned()
747 .unwrap_or_default(),
748 schema_id: schema_id.clone(),
749 init_state: init_state.clone(),
750 hash: *hash_network.0,
751 network: hash_network.1.clone(),
752 };
753
754 ctx.create_child(&format!("{}_evaluation", schema_id), eval_actor)
755 .await?;
756 }
757
758 for (schema_id, init_state) in up_vali.iter() {
759 let current_roles = self
760 .current_validation_roles(ctx, schema_id.clone())
761 .await?;
762 let vali_actor = ValidationSchema {
763 our_key: self.our_key.clone(),
764 governance_id: self.subject_metadata.subject_id.clone(),
765 gov_version: self.properties.version,
766 sn: self.subject_metadata.sn,
767 creators: schema_creators_vali
768 .get(schema_id)
769 .cloned()
770 .unwrap_or_default(),
771 schema_id: schema_id.clone(),
772 init_state: init_state.clone(),
773 current_roles: current_roles.schema,
774 hash: *hash_network.0,
775 network: hash_network.1.clone(),
776 };
777
778 ctx.create_child(&format!("{}_validation", schema_id), vali_actor)
779 .await?;
780 }
781
782 Ok(())
783 }
784
785 async fn manager_schemas_compilers(
786 &self,
787 ctx: &mut ActorContext<Self>,
788 old_gov: &GovernanceData,
789 ) -> Result<(), ActorError> {
790 let Some(network) = ctx
791 .system()
792 .get_helper::<Arc<NetworkSender>>("network")
793 .await
794 else {
795 return Err(ActorError::Helper {
796 name: "network".to_owned(),
797 reason: "Not found".to_owned(),
798 });
799 };
800
801 let Some(hash) = self.hash else {
802 return Err(ActorError::FunctionalCritical {
803 description: "Hash algorithm is None".to_string(),
804 });
805 };
806
807 let (old_schemas_eval, new_schemas_eval) = {
808 let old_schemas_eval =
809 old_gov.schemas_name(ProtocolTypes::Evaluation, &self.our_key);
810
811 let new_schemas_eval = self
812 .properties
813 .schemas(ProtocolTypes::Evaluation, &self.our_key);
814
815 let down = old_schemas_eval
818 .clone()
819 .iter()
820 .filter(|x| !new_schemas_eval.contains_key(x))
821 .cloned()
822 .collect();
823 Self::down_compilers_schemas(
824 ctx,
825 &down,
826 &self.subject_metadata.subject_id,
827 )
828 .await?;
829
830 let up = new_schemas_eval
832 .clone()
833 .iter()
834 .filter(|x| !old_schemas_eval.contains(x.0))
835 .map(|x| (x.0.clone(), x.1.clone()))
836 .collect();
837
838 Self::up_compilers_schemas(
839 ctx,
840 &up,
841 self.subject_metadata.subject_id.clone(),
842 &hash,
843 )
844 .await?;
845
846 let current = new_schemas_eval
848 .clone()
849 .iter()
850 .filter(|x| old_schemas_eval.contains(x.0))
851 .map(|x| (x.0.clone(), x.1.clone()))
852 .collect();
853
854 Self::compile_schemas(
855 ctx,
856 current,
857 self.subject_metadata.subject_id.clone(),
858 )
859 .await?;
860
861 (
862 old_schemas_eval,
863 new_schemas_eval
864 .iter()
865 .map(|x| (x.0.clone(), x.1.initial_value.clone()))
866 .collect::<BTreeMap<SchemaType, ValueWrapper>>(),
867 )
868 };
869 let old_schemas_vali =
870 old_gov.schemas_name(ProtocolTypes::Validation, &self.our_key);
871
872 let new_schemas_vali = self
873 .properties
874 .schemas_init_value(ProtocolTypes::Validation, &self.our_key);
875
876 let down_eval = old_schemas_eval
878 .clone()
879 .iter()
880 .filter(|x| !new_schemas_eval.contains_key(x))
881 .cloned()
882 .collect();
883
884 let down_vali = old_schemas_vali
885 .clone()
886 .iter()
887 .filter(|x| !new_schemas_vali.contains_key(x))
888 .cloned()
889 .collect();
890
891 Self::down_schemas(ctx, &down_eval, &down_vali).await?;
892
893 let schemas_namespace_eval = self
895 .properties
896 .schemas_namespace(ProtocolTypes::Evaluation, &self.our_key);
897
898 let schema_creators_eval = self
899 .properties
900 .schema_creators_namespace(schemas_namespace_eval);
901
902 let up_eval = new_schemas_eval
903 .clone()
904 .iter()
905 .filter(|x| !old_schemas_eval.contains(x.0))
906 .map(|x| (x.0.clone(), x.1.clone()))
907 .collect::<BTreeMap<SchemaType, ValueWrapper>>();
908
909 let schemas_namespace_vali = self
910 .properties
911 .schemas_namespace(ProtocolTypes::Validation, &self.our_key);
912
913 let schema_creators_vali = self
914 .properties
915 .schema_creators_namespace(schemas_namespace_vali);
916
917 let up_vali = new_schemas_vali
918 .clone()
919 .iter()
920 .filter(|x| !old_schemas_vali.contains(x.0))
921 .map(|x| (x.0.clone(), x.1.clone()))
922 .collect::<BTreeMap<SchemaType, ValueWrapper>>();
923 self.up_schemas(
925 ctx,
926 &schema_creators_eval,
927 &schema_creators_vali,
928 &up_eval,
929 &up_vali,
930 (&hash, &network),
931 )
932 .await?;
933
934 let update_eval = new_schemas_eval
936 .clone()
937 .iter()
938 .filter(|x| old_schemas_eval.contains(x.0))
939 .map(|x| (x.0.clone(), x.1.clone()))
940 .collect::<BTreeMap<SchemaType, ValueWrapper>>();
941
942 let update_vali = new_schemas_vali
943 .clone()
944 .iter()
945 .filter(|x| old_schemas_vali.contains(x.0))
946 .map(|x| (x.0.clone(), x.1.clone()))
947 .collect::<BTreeMap<SchemaType, ValueWrapper>>();
948
949 self.update_schemas(
950 ctx,
951 &schema_creators_eval,
952 &schema_creators_vali,
953 &update_eval,
954 &update_vali,
955 )
956 .await
957 }
958
959 async fn update_childs(
960 &self,
961 ctx: &ActorContext<Self>,
962 ) -> Result<(), ActorError> {
963 if let Ok(evaluator) = ctx.get_child::<EvalWorker>("evaluator").await {
964 evaluator
965 .tell(EvalWorkerMessage::UpdateGovVersion {
966 gov_version: self.properties.version,
967 })
968 .await?;
969 }
970
971 if let Ok(validator) = ctx.get_child::<ValiWorker>("validator").await {
972 let current_roles = self
973 .current_validation_roles(ctx, SchemaType::Governance)
974 .await?;
975 validator
976 .tell(ValiWorkerMessage::UpdateCurrentRoles {
977 gov_version: self.properties.version,
978 current_roles: crate::validation::worker::CurrentWorkerRoles {
979 approval: current_roles.approval,
980 evaluation: crate::governance::role_register::RoleDataRegister {
981 workers: current_roles
982 .schema
983 .evaluation
984 .iter()
985 .map(|role| role.key.clone())
986 .collect(),
987 quorum: current_roles.schema.evaluation_quorum,
988 },
989 },
990 })
991 .await?;
992 }
993
994 Ok(())
995 }
996
997 async fn sweep_contract_artifacts(
998 &self,
999 ctx: &ActorContext<Self>,
1000 schemas: &BTreeMap<SchemaType, Schema>,
1001 ) -> Result<(), ActorError> {
1002 let Some(config) =
1003 ctx.system().get_helper::<ConfigHelper>("config").await
1004 else {
1005 return Err(ActorError::Helper {
1006 name: "config".to_string(),
1007 reason: "Not Found".to_string(),
1008 });
1009 };
1010
1011 let Some(contracts) = ctx
1012 .system()
1013 .get_helper::<Arc<RwLock<HashMap<String, Arc<Module>>>>>(
1014 "contracts",
1015 )
1016 .await
1017 else {
1018 return Err(ActorError::Helper {
1019 name: "contracts".to_string(),
1020 reason: "Not Found".to_string(),
1021 });
1022 };
1023
1024 let contract_register = ctx
1025 .get_child::<ContractRegister>("contract_register")
1026 .await?;
1027
1028 let prefix = format!("{}_", self.subject_metadata.subject_id);
1029 let mut allowed: HashSet<String> = schemas
1030 .keys()
1031 .map(|schema_id| {
1032 format!("{}_{}", self.subject_metadata.subject_id, schema_id)
1033 })
1034 .collect();
1035
1036 let registered: Vec<String> = match contract_register
1037 .ask(ContractRegisterMessage::ListContracts)
1038 .await?
1039 {
1040 ContractRegisterResponse::Contracts(contracts) => contracts,
1041 _ => Vec::new(),
1042 };
1043
1044 for contract_name in registered {
1045 if contract_name.starts_with(&prefix)
1046 && !allowed.contains(&contract_name)
1047 {
1048 contract_register
1049 .tell(ContractRegisterMessage::DeleteMetadata {
1050 contract_name: contract_name.clone(),
1051 })
1052 .await?;
1053 let mut contracts = contracts.write().await;
1054 contracts.remove(&contract_name);
1055 }
1056 }
1057
1058 let contracts_dir = config.contracts_path.join("contracts");
1059 if !contracts_dir.exists() {
1060 return Ok(());
1061 }
1062
1063 let mut entries = fs::read_dir(&contracts_dir).await.map_err(|e| {
1064 ActorError::Functional {
1065 description: format!(
1066 "Can not read contracts directory {}: {}",
1067 contracts_dir.display(),
1068 e
1069 ),
1070 }
1071 })?;
1072
1073 while let Some(entry) =
1074 entries
1075 .next_entry()
1076 .await
1077 .map_err(|e| ActorError::Functional {
1078 description: format!(
1079 "Can not iterate contracts directory {}: {}",
1080 contracts_dir.display(),
1081 e
1082 ),
1083 })?
1084 {
1085 let file_name = entry.file_name().to_string_lossy().to_string();
1086 if !file_name.starts_with(&prefix) {
1087 continue;
1088 }
1089
1090 let is_temp = file_name.starts_with(&format!(
1091 "{}_temp_",
1092 self.subject_metadata.subject_id
1093 ));
1094 if is_temp || !allowed.contains(&file_name) {
1095 let path = entry.path();
1096 let _ = fs::remove_dir_all(path).await;
1097 if !is_temp {
1098 allowed.remove(&file_name);
1099 }
1100 }
1101 }
1102
1103 Ok(())
1104 }
1105
1106 async fn delete_all_contract_artifacts(
1107 &self,
1108 ctx: &ActorContext<Self>,
1109 contract_register: &ActorRef<ContractRegister>,
1110 ) -> Result<(), ActorError> {
1111 let Some(config) =
1112 ctx.system().get_helper::<ConfigHelper>("config").await
1113 else {
1114 return Err(ActorError::Helper {
1115 name: "config".to_string(),
1116 reason: "Not Found".to_string(),
1117 });
1118 };
1119
1120 let Some(contracts) = ctx
1121 .system()
1122 .get_helper::<Arc<RwLock<HashMap<String, Arc<Module>>>>>(
1123 "contracts",
1124 )
1125 .await
1126 else {
1127 return Err(ActorError::Helper {
1128 name: "contracts".to_string(),
1129 reason: "Not Found".to_string(),
1130 });
1131 };
1132
1133 let prefix = format!("{}_", self.subject_metadata.subject_id);
1134
1135 let registered: Vec<String> = match contract_register
1136 .ask(ContractRegisterMessage::ListContracts)
1137 .await?
1138 {
1139 ContractRegisterResponse::Contracts(contracts) => contracts,
1140 _ => Vec::new(),
1141 };
1142
1143 for contract_name in registered {
1144 if contract_name.starts_with(&prefix) {
1145 contract_register
1146 .ask(ContractRegisterMessage::DeleteMetadata {
1147 contract_name: contract_name.clone(),
1148 })
1149 .await?;
1150 let mut contracts = contracts.write().await;
1151 contracts.remove(&contract_name);
1152 }
1153 }
1154
1155 let contracts_dir = config.contracts_path.join("contracts");
1156 if !contracts_dir.exists() {
1157 return Ok(());
1158 }
1159
1160 let mut entries = fs::read_dir(&contracts_dir).await.map_err(|e| {
1161 ActorError::Functional {
1162 description: format!(
1163 "Can not read contracts directory {}: {}",
1164 contracts_dir.display(),
1165 e
1166 ),
1167 }
1168 })?;
1169
1170 while let Some(entry) =
1171 entries
1172 .next_entry()
1173 .await
1174 .map_err(|e| ActorError::Functional {
1175 description: format!(
1176 "Can not iterate contracts directory {}: {}",
1177 contracts_dir.display(),
1178 e
1179 ),
1180 })?
1181 {
1182 let file_name = entry.file_name().to_string_lossy().to_string();
1183 if file_name.starts_with(&prefix) {
1184 let path = entry.path();
1185 fs::remove_dir_all(&path).await.map_err(|e| {
1186 ActorError::Functional {
1187 description: format!(
1188 "Can not remove contract directory {}: {}",
1189 path.display(),
1190 e
1191 ),
1192 }
1193 })?;
1194 }
1195 }
1196
1197 Ok(())
1198 }
1199
1200 async fn build_childs(
1201 &self,
1202 ctx: &mut ActorContext<Self>,
1203 hash: &HashAlgorithm,
1204 network: &Arc<NetworkSender>,
1205 ) -> Result<(), ActorError> {
1206 let owner = *self.our_key == self.subject_metadata.owner;
1208 let new_owner = self.subject_metadata.new_owner.is_some();
1209 let i_new_owner =
1210 self.subject_metadata.new_owner == Some((*self.our_key).clone());
1211
1212 if new_owner {
1213 if i_new_owner {
1214 self.up_owner(ctx, hash, network).await?;
1215 } else {
1216 self.up_not_owner(ctx, hash, network).await?;
1217 }
1218 } else if owner {
1219 self.up_owner(ctx, hash, network).await?;
1220 } else {
1221 self.up_not_owner(ctx, hash, network).await?;
1222 }
1223
1224 let new_schemas_eval = {
1225 let schemas = self
1226 .properties
1227 .schemas(ProtocolTypes::Evaluation, &self.our_key);
1228 self.sweep_contract_artifacts(ctx, &schemas).await?;
1229 Self::up_compilers_schemas(
1230 ctx,
1231 &schemas,
1232 self.subject_metadata.subject_id.clone(),
1233 hash,
1234 )
1235 .await?;
1236
1237 schemas
1238 .iter()
1239 .map(|x| (x.0.clone(), x.1.initial_value.clone()))
1240 .collect::<BTreeMap<SchemaType, ValueWrapper>>()
1241 };
1242
1243 let schemas_namespace_eval = self
1244 .properties
1245 .schemas_namespace(ProtocolTypes::Evaluation, &self.our_key);
1246
1247 let schema_creators_eval = self
1248 .properties
1249 .schema_creators_namespace(schemas_namespace_eval);
1250
1251 let schemas_namespace_vali = self
1252 .properties
1253 .schemas_namespace(ProtocolTypes::Validation, &self.our_key);
1254
1255 let schema_creators_vali = self
1256 .properties
1257 .schema_creators_namespace(schemas_namespace_vali);
1258
1259 let new_schemas_vali = self
1260 .properties
1261 .schemas_init_value(ProtocolTypes::Validation, &self.our_key);
1262
1263 self.up_schemas(
1264 ctx,
1265 &schema_creators_eval,
1266 &schema_creators_vali,
1267 &new_schemas_eval,
1268 &new_schemas_vali,
1269 (hash, network),
1270 )
1271 .await
1272 }
1273
1274 async fn up_not_owner(
1275 &self,
1276 ctx: &mut ActorContext<Self>,
1277 hash: &HashAlgorithm,
1278 network: &Arc<NetworkSender>,
1279 ) -> Result<(), ActorError> {
1280 let node_key = self.subject_metadata.new_owner.as_ref().map_or_else(
1281 || self.subject_metadata.owner.clone(),
1282 |new_owner| new_owner.clone(),
1283 );
1284
1285 if self.properties.has_this_role(HashThisRole::Gov {
1286 who: (*self.our_key).clone(),
1287 role: RoleTypes::Validator,
1288 }) {
1289 let current_roles = self
1290 .current_validation_roles(ctx, SchemaType::Governance)
1291 .await?;
1292 let validator = ValiWorker {
1294 node_key: node_key.clone(),
1295 our_key: self.our_key.clone(),
1296 init_state: None,
1297 governance_id: self.subject_metadata.subject_id.clone(),
1298 gov_version: self.properties.version,
1299 sn: self.subject_metadata.sn,
1300 hash: *hash,
1301 network: network.clone(),
1302 current_roles: crate::validation::worker::CurrentWorkerRoles {
1303 approval: current_roles.approval,
1304 evaluation:
1305 crate::governance::role_register::RoleDataRegister {
1306 workers: current_roles
1307 .schema
1308 .evaluation
1309 .iter()
1310 .map(|role| role.key.clone())
1311 .collect(),
1312 quorum: current_roles.schema.evaluation_quorum,
1313 },
1314 },
1315 stop: false,
1316 };
1317 ctx.create_child("validator", validator).await?;
1318 }
1319
1320 if self.properties.has_this_role(HashThisRole::Gov {
1321 who: (*self.our_key).clone(),
1322 role: RoleTypes::Evaluator,
1323 }) {
1324 let evaluator = EvalWorker {
1326 node_key: node_key.clone(),
1327 our_key: self.our_key.clone(),
1328 governance_id: self.subject_metadata.subject_id.clone(),
1329 gov_version: self.properties.version,
1330 sn: self.subject_metadata.sn,
1331 init_state: None,
1332 hash: *hash,
1333 network: network.clone(),
1334 stop: false,
1335 };
1336 ctx.create_child("evaluator", evaluator).await?;
1337 }
1338
1339 if self.properties.has_this_role(HashThisRole::Gov {
1340 who: (*self.our_key).clone(),
1341 role: RoleTypes::Approver,
1342 }) {
1343 let always_accept = if let Some(config) =
1344 ctx.system().get_helper::<ConfigHelper>("config").await
1345 {
1346 config.always_accept
1347 } else {
1348 return Err(ActorError::Helper {
1349 name: "config".to_owned(),
1350 reason: "Not found".to_owned(),
1351 });
1352 };
1353
1354 let pass_votation = if always_accept {
1355 VotationType::AlwaysAccept
1356 } else {
1357 VotationType::Manual
1358 };
1359
1360 let init_approver = InitApprPersist {
1361 our_key: self.our_key.clone(),
1362 node_key: node_key.clone(),
1363 subject_id: self.subject_metadata.subject_id.clone(),
1364 pass_votation,
1365 helpers: (*hash, network.clone()),
1366 };
1367
1368 ctx.create_child("approver", ApprPersist::initial(init_approver))
1369 .await?;
1370 }
1371
1372 Ok(())
1373 }
1374
1375 async fn up_down_not_owner(
1376 &self,
1377 ctx: &mut ActorContext<Self>,
1378 old_gov: &GovernanceData,
1379 hash: &HashAlgorithm,
1380 network: &Arc<NetworkSender>,
1381 ) -> Result<(), ActorError> {
1382 let node_key = self.subject_metadata.new_owner.as_ref().map_or_else(
1383 || self.subject_metadata.owner.clone(),
1384 |new_owner| new_owner.clone(),
1385 );
1386
1387 let old_val = old_gov.has_this_role(HashThisRole::Gov {
1388 who: (*self.our_key).clone(),
1389 role: RoleTypes::Validator,
1390 });
1391
1392 let new_val = self.properties.has_this_role(HashThisRole::Gov {
1393 who: (*self.our_key).clone(),
1394 role: RoleTypes::Validator,
1395 });
1396
1397 match (old_val, new_val) {
1398 (true, false) => {
1399 let actor = ctx.get_child::<ValiWorker>("validator").await?;
1400 actor.ask_stop().await?;
1401 }
1402 (false, true) => {
1403 let current_roles = self
1404 .current_validation_roles(ctx, SchemaType::Governance)
1405 .await?;
1406 let validator = ValiWorker {
1408 node_key: node_key.clone(),
1409 our_key: self.our_key.clone(),
1410 init_state: None,
1411 governance_id: self.subject_metadata.subject_id.clone(),
1412 gov_version: self.properties.version,
1413 sn: self.subject_metadata.sn,
1414 hash: *hash,
1415 network: network.clone(),
1416 current_roles: crate::validation::worker::CurrentWorkerRoles {
1417 approval: current_roles.approval,
1418 evaluation: crate::governance::role_register::RoleDataRegister {
1419 workers: current_roles
1420 .schema
1421 .evaluation
1422 .iter()
1423 .map(|role| role.key.clone())
1424 .collect(),
1425 quorum: current_roles.schema.evaluation_quorum,
1426 },
1427 },
1428 stop: false,
1429 };
1430 ctx.create_child("validator", validator).await?;
1431 }
1432 _ => {}
1433 };
1434
1435 let old_eval = old_gov.has_this_role(HashThisRole::Gov {
1436 who: (*self.our_key).clone(),
1437 role: RoleTypes::Evaluator,
1438 });
1439
1440 let new_eval = self.properties.has_this_role(HashThisRole::Gov {
1441 who: (*self.our_key).clone(),
1442 role: RoleTypes::Evaluator,
1443 });
1444
1445 match (old_eval, new_eval) {
1446 (true, false) => {
1447 let actor = ctx.get_child::<EvalWorker>("evaluator").await?;
1448
1449 actor.ask_stop().await?;
1450 }
1451 (false, true) => {
1452 let evaluator = EvalWorker {
1453 node_key: node_key.clone(),
1454 our_key: self.our_key.clone(),
1455 governance_id: self.subject_metadata.subject_id.clone(),
1456 gov_version: self.properties.version,
1457 sn: self.subject_metadata.sn,
1458 init_state: None,
1459 hash: *hash,
1460 network: network.clone(),
1461 stop: false,
1462 };
1463 ctx.create_child("evaluator", evaluator).await?;
1464 }
1465 _ => {}
1466 };
1467
1468 let old_appr = old_gov.has_this_role(HashThisRole::Gov {
1469 who: (*self.our_key).clone(),
1470 role: RoleTypes::Approver,
1471 });
1472
1473 let new_appr = self.properties.has_this_role(HashThisRole::Gov {
1474 who: (*self.our_key).clone(),
1475 role: RoleTypes::Approver,
1476 });
1477
1478 match (old_appr, new_appr) {
1479 (true, false) => {
1480 let actor = ctx.get_child::<ApprPersist>("approver").await?;
1481
1482 actor.ask_stop().await?;
1483 }
1484 (false, true) => {
1485 let always_accept = if let Some(config) =
1486 ctx.system().get_helper::<ConfigHelper>("config").await
1487 {
1488 config.always_accept
1489 } else {
1490 return Err(ActorError::Helper {
1491 name: "config".to_owned(),
1492 reason: "Not found".to_owned(),
1493 });
1494 };
1495
1496 let pass_votation = if always_accept {
1497 VotationType::AlwaysAccept
1498 } else {
1499 VotationType::Manual
1500 };
1501
1502 let init_approver = InitApprPersist {
1503 our_key: self.our_key.clone(),
1504 node_key: node_key.clone(),
1505 subject_id: self.subject_metadata.subject_id.clone(),
1506 pass_votation,
1507 helpers: (*hash, network.clone()),
1508 };
1509
1510 ctx.create_child(
1511 "approver",
1512 ApprPersist::initial(init_approver),
1513 )
1514 .await?;
1515 }
1516 _ => {}
1517 };
1518
1519 Ok(())
1520 }
1521
1522 async fn down_not_owner(
1523 ctx: &ActorContext<Self>,
1524 gov: &GovernanceData,
1525 our_key: Arc<PublicKey>,
1526 ) -> Result<(), ActorError> {
1527 if gov.has_this_role(HashThisRole::Gov {
1528 who: (*our_key).clone(),
1529 role: RoleTypes::Validator,
1530 }) {
1531 let actor = ctx.get_child::<ValiWorker>("validator").await?;
1532
1533 actor.ask_stop().await?;
1534 }
1535
1536 if gov.has_this_role(HashThisRole::Gov {
1537 who: (*our_key).clone(),
1538 role: RoleTypes::Evaluator,
1539 }) {
1540 let actor = ctx.get_child::<EvalWorker>("evaluator").await?;
1541
1542 actor.ask_stop().await?;
1543 }
1544
1545 if gov.has_this_role(HashThisRole::Gov {
1546 who: (*our_key).clone(),
1547 role: RoleTypes::Approver,
1548 }) {
1549 let actor = ctx.get_child::<ApprPersist>("approver").await?;
1550
1551 actor.ask_stop().await?;
1552 }
1553
1554 Ok(())
1555 }
1556
1557 async fn up_owner(
1558 &self,
1559 ctx: &mut ActorContext<Self>,
1560 hash: &HashAlgorithm,
1561 network: &Arc<NetworkSender>,
1562 ) -> Result<(), ActorError> {
1563 let always_accept = if let Some(config) =
1564 ctx.system().get_helper::<ConfigHelper>("config").await
1565 {
1566 config.always_accept
1567 } else {
1568 return Err(ActorError::Helper {
1569 name: "config".to_string(),
1570 reason: "Not Found".to_string(),
1571 });
1572 };
1573 let pass_votation = if always_accept {
1574 VotationType::AlwaysAccept
1575 } else {
1576 VotationType::Manual
1577 };
1578
1579 let init_approver = InitApprPersist {
1580 our_key: self.our_key.clone(),
1581 node_key: (*self.our_key).clone(),
1582 subject_id: self.subject_metadata.subject_id.clone(),
1583 pass_votation,
1584 helpers: (*hash, network.clone()),
1585 };
1586
1587 ctx.create_child("approver", ApprPersist::initial(init_approver))
1588 .await?;
1589
1590 Ok(())
1591 }
1592
1593 async fn down_owner(ctx: &ActorContext<Self>) -> Result<(), ActorError> {
1594 let actor = ctx.get_child::<ApprPersist>("approver").await?;
1595 actor.ask_stop().await?;
1596
1597 Ok(())
1598 }
1599
1600 async fn up_compilers_schemas(
1601 ctx: &mut ActorContext<Self>,
1602 schemas: &BTreeMap<SchemaType, Schema>,
1603 subject_id: DigestIdentifier,
1604 hash: &HashAlgorithm,
1605 ) -> Result<(), ActorError> {
1606 let contracts_path = if let Some(config) =
1607 ctx.system().get_helper::<ConfigHelper>("config").await
1608 {
1609 config.contracts_path
1610 } else {
1611 return Err(ActorError::Helper {
1612 name: "config".to_string(),
1613 reason: "Not Found".to_string(),
1614 });
1615 };
1616
1617 for (id, schema) in schemas {
1618 let actor_name = format!("{}_contract_compiler", id);
1619
1620 let compiler = ctx
1621 .create_child(&actor_name, ContractCompiler::new(*hash))
1622 .await?;
1623
1624 let Schema {
1625 contract,
1626 initial_value,
1627 } = schema;
1628
1629 let response = compiler
1630 .ask(ContractCompilerMessage::Compile {
1631 contract_name: format!("{}_{}", subject_id, id),
1632 contract: contract.clone(),
1633 initial_value: initial_value.0.clone(),
1634 contract_path: contracts_path
1635 .join("contracts")
1636 .join(format!("{}_{}", subject_id, id)),
1637 })
1638 .await?;
1639
1640 if let CompilerResponse::Error(error) = response {
1641 return Err(ActorError::Functional {
1642 description: format!(
1643 "Can not compile schema contract {}: {}",
1644 id, error
1645 ),
1646 });
1647 }
1648 }
1649
1650 Ok(())
1651 }
1652
1653 async fn down_compilers_schemas(
1654 ctx: &ActorContext<Self>,
1655 schemas: &BTreeSet<SchemaType>,
1656 subject_id: &DigestIdentifier,
1657 ) -> Result<(), ActorError> {
1658 let Some(config) =
1659 ctx.system().get_helper::<ConfigHelper>("config").await
1660 else {
1661 return Err(ActorError::Helper {
1662 name: "config".to_string(),
1663 reason: "Not Found".to_string(),
1664 });
1665 };
1666
1667 let Some(contracts) = ctx
1668 .system()
1669 .get_helper::<Arc<RwLock<HashMap<String, Arc<Module>>>>>(
1670 "contracts",
1671 )
1672 .await
1673 else {
1674 return Err(ActorError::Helper {
1675 name: "contracts".to_string(),
1676 reason: "Not Found".to_string(),
1677 });
1678 };
1679
1680 let contract_register = ctx
1681 .get_child::<ContractRegister>("contract_register")
1682 .await?;
1683
1684 for schema_id in schemas.iter() {
1685 let actor = ctx
1686 .get_child::<ContractCompiler>(&format!(
1687 "{}_contract_compiler",
1688 schema_id
1689 ))
1690 .await?;
1691
1692 actor.ask_stop().await?;
1693
1694 let contract_name = format!("{}_{}", subject_id, schema_id);
1695 contract_register
1696 .tell(ContractRegisterMessage::DeleteMetadata {
1697 contract_name: contract_name.clone(),
1698 })
1699 .await?;
1700
1701 {
1702 let mut contracts = contracts.write().await;
1703 contracts.remove(&contract_name);
1704 }
1705
1706 let contract_path =
1707 config.contracts_path.join("contracts").join(&contract_name);
1708 let _ = fs::remove_dir_all(contract_path).await;
1709 }
1710
1711 Ok(())
1712 }
1713
1714 async fn compile_schemas(
1715 ctx: &ActorContext<Self>,
1716 schemas: HashMap<SchemaType, Schema>,
1717 subject_id: DigestIdentifier,
1718 ) -> Result<(), ActorError> {
1719 let contracts_path = if let Some(config) =
1720 ctx.system().get_helper::<ConfigHelper>("config").await
1721 {
1722 config.contracts_path
1723 } else {
1724 return Err(ActorError::Helper {
1725 name: "config".to_owned(),
1726 reason: "Not found".to_owned(),
1727 });
1728 };
1729
1730 for (id, schema) in schemas {
1731 let actor = ctx
1732 .get_child::<ContractCompiler>(&format!(
1733 "{}_contract_compiler",
1734 id
1735 ))
1736 .await?;
1737
1738 let response = actor
1739 .ask(ContractCompilerMessage::Compile {
1740 contract_name: format!("{}_{}", subject_id, id),
1741 contract: schema.contract.clone(),
1742 initial_value: schema.initial_value.0.clone(),
1743 contract_path: contracts_path
1744 .join("contracts")
1745 .join(format!("{}_{}", subject_id, id)),
1746 })
1747 .await?;
1748
1749 if let CompilerResponse::Error(error) = response {
1750 return Err(ActorError::Functional {
1751 description: format!(
1752 "Can not refresh schema contract {}: {}",
1753 id, error
1754 ),
1755 });
1756 }
1757 }
1758
1759 Ok(())
1760 }
1761
1762 fn build_creators_register_fact(
1763 &self,
1764 new_creator: HashMap<
1765 (SchemaType, String, PublicKey),
1766 (CreatorQuantity, Vec<WitnessesType>),
1767 >,
1768 remove_creator: HashSet<(SchemaType, String, PublicKey)>,
1769 creator_update: CreatorRoleUpdate,
1770 new_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
1771 remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
1772 ) -> (SubjectRegisterMessage, WitnessesRegisterMessage) {
1773 let mut data: Vec<(PublicKey, SchemaType, String, CreatorQuantity)> =
1774 vec![];
1775
1776 let mut new_creator_data: HashMap<
1777 (SchemaType, String, PublicKey),
1778 Vec<WitnessesType>,
1779 > = HashMap::new();
1780
1781 let mut update_creator_witnesses_data: HashSet<(
1782 SchemaType,
1783 String,
1784 PublicKey,
1785 Vec<WitnessesType>,
1786 )> = HashSet::new();
1787
1788 for ((schema_id, ns, creator), (quantity, witnesses)) in
1789 new_creator.iter()
1790 {
1791 data.push((
1792 creator.clone(),
1793 schema_id.clone(),
1794 ns.clone(),
1795 quantity.clone(),
1796 ));
1797
1798 new_creator_data.insert(
1799 (schema_id.clone(), ns.clone(), creator.clone()),
1800 witnesses.clone(),
1801 );
1802 }
1803
1804 for (schema_id, ns, creator) in remove_creator.iter() {
1805 data.push((
1806 creator.clone(),
1807 schema_id.clone(),
1808 ns.clone(),
1809 CreatorQuantity::Quantity(0),
1810 ));
1811 }
1812
1813 for ((schema_id, ns, creator), (quantity, creator_witnesses)) in
1814 creator_update.new_creator.iter()
1815 {
1816 data.push((
1817 creator.clone(),
1818 schema_id.clone(),
1819 ns.clone(),
1820 quantity.clone(),
1821 ));
1822
1823 let mut witnesses = vec![];
1824 for witness in creator_witnesses.iter() {
1825 if witness == &ReservedWords::Witnesses.to_string() {
1826 witnesses.push(WitnessesType::Witnesses);
1827 } else if let Some(w) = self.properties.members.get(witness) {
1828 witnesses.push(WitnessesType::User(w.clone()));
1829 }
1830 }
1831
1832 new_creator_data.insert(
1833 (schema_id.clone(), ns.clone(), creator.clone()),
1834 witnesses,
1835 );
1836 }
1837
1838 for (schema_id, ns, creator, quantity) in
1839 creator_update.update_creator_quantity.iter()
1840 {
1841 data.push((
1842 creator.clone(),
1843 schema_id.clone(),
1844 ns.clone(),
1845 quantity.clone(),
1846 ));
1847 }
1848
1849 for (schema_id, ns, creator, creator_witnesses) in
1850 creator_update.update_creator_witnesses.iter()
1851 {
1852 let mut witnesses = vec![];
1853 for witness in creator_witnesses.iter() {
1854 if witness == &ReservedWords::Witnesses.to_string() {
1855 witnesses.push(WitnessesType::Witnesses);
1856 } else if let Some(w) = self.properties.members.get(witness) {
1857 witnesses.push(WitnessesType::User(w.clone()));
1858 }
1859 }
1860
1861 update_creator_witnesses_data.insert((
1862 schema_id.clone(),
1863 ns.clone(),
1864 creator.clone(),
1865 witnesses,
1866 ));
1867 }
1868
1869 for (schema_id, ns, creator) in creator_update.remove_creator.iter() {
1870 data.push((
1871 creator.clone(),
1872 schema_id.clone(),
1873 ns.clone(),
1874 CreatorQuantity::Quantity(0),
1875 ));
1876 }
1877
1878 (
1879 SubjectRegisterMessage::RegisterData {
1880 gov_version: self.properties.version,
1881 data,
1882 },
1883 WitnessesRegisterMessage::UpdateCreatorsWitnessesFact {
1884 version: self.properties.version,
1885 new_creator: new_creator_data,
1886 remove_creator,
1887 update_creator_witnesses: update_creator_witnesses_data,
1888 new_witnesses,
1889 remove_witnesses,
1890 },
1891 )
1892 }
1893
1894 fn build_creators_register_confirm(
1895 &self,
1896 remove_creator: HashSet<(SchemaType, String, PublicKey)>,
1897 remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
1898 ) -> (SubjectRegisterMessage, WitnessesRegisterMessage) {
1899 let data: Vec<(PublicKey, SchemaType, String, CreatorQuantity)> =
1900 remove_creator
1901 .iter()
1902 .map(|x| {
1903 (
1904 x.2.clone(),
1905 x.0.clone(),
1906 x.1.clone(),
1907 CreatorQuantity::Quantity(0),
1908 )
1909 })
1910 .collect();
1911 (
1912 SubjectRegisterMessage::RegisterData {
1913 gov_version: self.properties.version,
1914 data,
1915 },
1916 WitnessesRegisterMessage::UpdateCreatorsWitnessesConfirm {
1917 version: self.properties.version,
1918 remove_creator,
1919 remove_witnesses,
1920 },
1921 )
1922 }
1923
1924 async fn first_role_register(
1925 &self,
1926 ctx: &ActorContext<Self>,
1927 ) -> Result<(), ActorError> {
1928 let actor = ctx.get_child::<RoleRegister>("role_register").await?;
1929
1930 actor
1931 .tell(RoleRegisterMessage::UpdateFact {
1932 version: 0,
1933 appr_quorum: Some(Quorum::Majority),
1934 eval_quorum: HashMap::from([(
1935 SchemaType::Governance,
1936 Quorum::Majority,
1937 )]),
1938 new_approvers: vec![self.subject_metadata.owner.clone()],
1939 new_evaluators: HashMap::from([(
1940 (
1941 SchemaType::Governance,
1942 self.subject_metadata.owner.clone(),
1943 ),
1944 vec![Namespace::new()],
1945 )]),
1946 new_validators: HashMap::from([(
1947 (
1948 SchemaType::Governance,
1949 self.subject_metadata.owner.clone(),
1950 ),
1951 vec![Namespace::new()],
1952 )]),
1953 remove_approvers: vec![],
1954 remove_evaluators: HashMap::new(),
1955 remove_validators: HashMap::new(),
1956 vali_quorum: HashMap::from([(
1957 SchemaType::Governance,
1958 Quorum::Majority,
1959 )]),
1960 })
1961 .await
1962 }
1963
1964 async fn update_gov_version(
1965 &self,
1966 ctx: &ActorContext<Self>,
1967 ) -> Result<(), ActorError> {
1968 let actor = ctx.get_child::<RoleRegister>("role_register").await?;
1969
1970 actor
1971 .tell(RoleRegisterMessage::UpdateVersion {
1972 version: self.properties.version + 1,
1973 })
1974 .await
1975 }
1976
1977 async fn update_registers_fact(
1978 &self,
1979 ctx: &ActorContext<Self>,
1980 update: RolesUpdate,
1981 creator_update: CreatorRoleUpdate,
1982 ) -> Result<(), ActorError> {
1983 let RolesUpdate {
1984 appr_quorum,
1985 new_approvers,
1986 remove_approvers,
1987 eval_quorum,
1988 new_evaluators,
1989 remove_evaluators,
1990 vali_quorum,
1991 new_validators,
1992 remove_validators,
1993 new_creator,
1994 remove_creator,
1995 new_witnesses,
1996 remove_witnesses,
1997 } = update;
1998
1999 let actor = ctx.get_child::<RoleRegister>("role_register").await?;
2000 actor
2001 .tell(RoleRegisterMessage::UpdateFact {
2002 version: self.properties.version,
2003 appr_quorum,
2004 eval_quorum,
2005 new_approvers,
2006 new_evaluators,
2007 new_validators,
2008 remove_approvers,
2009 remove_evaluators,
2010 remove_validators,
2011 vali_quorum,
2012 })
2013 .await?;
2014
2015 let (subj_msg, wit_msg) = self.build_creators_register_fact(
2016 new_creator,
2017 remove_creator,
2018 creator_update,
2019 new_witnesses,
2020 remove_witnesses,
2021 );
2022
2023 let actor =
2024 ctx.get_child::<SubjectRegister>("subject_register").await?;
2025
2026 actor.tell(subj_msg).await?;
2027
2028 let actor = ctx
2029 .get_child::<WitnessesRegister>("witnesses_register")
2030 .await?;
2031
2032 actor.tell(wit_msg).await
2033 }
2034
2035 async fn update_registers_confirm(
2036 &self,
2037 ctx: &ActorContext<Self>,
2038 update: RolesUpdateConfirm,
2039 ) -> Result<(), ActorError> {
2040 let RolesUpdateConfirm {
2041 new_approver,
2042 remove_approver,
2043 new_evaluator,
2044 remove_evaluators,
2045 new_validator,
2046 remove_validators,
2047 remove_creator,
2048 remove_witnesses,
2049 } = update;
2050
2051 let actor = ctx.get_child::<RoleRegister>("role_register").await?;
2052 actor
2053 .tell(RoleRegisterMessage::UpdateConfirm {
2054 version: self.properties.version,
2055 new_approver,
2056 remove_approver,
2057 new_evaluator,
2058 remove_evaluators,
2059 new_validator,
2060 remove_validators,
2061 })
2062 .await?;
2063
2064 let (subj_msg, wit_msg) = self
2065 .build_creators_register_confirm(remove_creator, remove_witnesses);
2066
2067 let actor =
2068 ctx.get_child::<SubjectRegister>("subject_register").await?;
2069
2070 actor.tell(subj_msg).await?;
2071
2072 let actor = ctx
2073 .get_child::<WitnessesRegister>("witnesses_register")
2074 .await?;
2075
2076 actor.tell(wit_msg).await
2077 }
2078
2079 async fn verify_new_ledger_events(
2080 &mut self,
2081 ctx: &mut ActorContext<Self>,
2082 events: Vec<SignedLedger>,
2083 hash: &HashAlgorithm,
2084 ) -> Result<(), ActorError> {
2085 let mut iter = events.into_iter();
2086 let last_ledger = get_last_event(ctx).await?;
2087
2088 let mut last_ledger = if let Some(last_ledger) = last_ledger {
2089 last_ledger
2090 } else {
2091 let Some(first) = iter.next() else {
2092 return Ok(());
2093 };
2094 if let Err(e) = Self::verify_first_ledger_event(
2095 ctx,
2096 &first,
2097 hash,
2098 Metadata::from(self.clone()),
2099 )
2100 .await
2101 {
2102 return Err(ActorError::Functional {
2103 description: e.to_string(),
2104 });
2105 }
2106
2107 self.on_event(first.clone(), ctx).await;
2108 Self::register(
2109 ctx,
2110 RegisterMessage::RegisterGov {
2111 gov_id: self.subject_metadata.subject_id.to_string(),
2112 name: self.subject_metadata.name.clone(),
2113 description: self.subject_metadata.description.clone(),
2114 },
2115 )
2116 .await?;
2117
2118 self.first_role_register(ctx).await?;
2119
2120 Self::event_to_sink(
2121 ctx,
2122 DataForSink {
2123 gov_id: None,
2124 subject_id: self.subject_metadata.subject_id.to_string(),
2125 sn: self.subject_metadata.sn,
2126 owner: self.subject_metadata.owner.to_string(),
2127 namespace: String::default(),
2128 schema_id: self.subject_metadata.schema_id.clone(),
2129 issuer: first
2130 .content()
2131 .event_request
2132 .signature()
2133 .signer
2134 .to_string(),
2135 event_ledger_timestamp: first
2136 .signature()
2137 .timestamp
2138 .as_nanos(),
2139 event_request_timestamp: first
2140 .content()
2141 .event_request
2142 .signature()
2143 .timestamp
2144 .as_nanos(),
2145 gov_version: first.content().gov_version,
2146 event_data_ledger: EventLedgerDataForSink::build(
2147 &first.content().protocols,
2148 &self.properties.to_value_wrapper().0,
2149 ),
2150 },
2151 first.content().event_request.content(),
2152 )
2153 .await?;
2154
2155 first
2156 };
2157
2158 for event in iter {
2159 let actual_ledger_hash =
2160 hash_borsh(&*hash.hasher(), &last_ledger.0).map_err(|e| {
2161 ActorError::FunctionalCritical {
2162 description: format!(
2163 "Can not creacte actual ledger event hash: {}",
2164 e
2165 ),
2166 }
2167 })?;
2168 let last_data = LastData {
2169 gov_version: last_ledger.content().gov_version,
2170 vali_data: last_ledger
2171 .content()
2172 .protocols
2173 .get_validation_data(),
2174 };
2175
2176 let last_event_is_ok = match Self::verify_new_ledger_event(
2177 ctx,
2178 &event,
2179 Metadata::from(self.clone()),
2180 actual_ledger_hash,
2181 last_data,
2182 hash,
2183 )
2184 .await
2185 {
2186 Ok(last_event_is_ok) => last_event_is_ok,
2187 Err(e) => {
2188 if matches!(e, SubjectError::InvalidSequenceNumber { .. }) {
2190 continue;
2192 } else {
2193 return Err(ActorError::Functional {
2194 description: e.to_string(),
2195 });
2196 }
2197 }
2198 };
2199 let (update_fact, update_confirm) = if last_event_is_ok {
2200 match event.content().event_request.content().clone() {
2201 EventRequest::Transfer(transfer_request) => {
2202 self.transfer(
2203 ctx,
2204 transfer_request.new_owner.clone(),
2205 0,
2206 )
2207 .await?;
2208
2209 self.update_gov_version(ctx).await?;
2210 }
2211 EventRequest::Reject(..) => {
2212 self.reject(ctx, 0).await?;
2213
2214 self.update_gov_version(ctx).await?;
2215 }
2216 EventRequest::EOL(..) => {
2217 self.eol(ctx).await?;
2218
2219 Self::register(
2220 ctx,
2221 RegisterMessage::EOLGov {
2222 gov_id: self
2223 .subject_metadata
2224 .subject_id
2225 .to_string(),
2226 },
2227 )
2228 .await?;
2229
2230 self.update_gov_version(ctx).await?;
2231 }
2232 _ => {}
2233 };
2234
2235 Self::event_to_sink(
2236 ctx,
2237 DataForSink {
2238 gov_id: None,
2239 subject_id: self
2240 .subject_metadata
2241 .subject_id
2242 .to_string(),
2243 sn: self.subject_metadata.sn,
2244 owner: self.subject_metadata.owner.to_string(),
2245 namespace: String::default(),
2246 schema_id: self.subject_metadata.schema_id.clone(),
2247 issuer: event
2248 .content()
2249 .event_request
2250 .signature()
2251 .signer
2252 .to_string(),
2253 event_ledger_timestamp: event
2254 .signature()
2255 .timestamp
2256 .as_nanos(),
2257 event_request_timestamp: event
2258 .content()
2259 .event_request
2260 .signature()
2261 .timestamp
2262 .as_nanos(),
2263 gov_version: event.content().gov_version,
2264 event_data_ledger: EventLedgerDataForSink::build(
2265 &event.content().protocols,
2266 &self.properties.to_value_wrapper().0,
2267 ),
2268 },
2269 event.content().event_request.content(),
2270 )
2271 .await?;
2272
2273 let update_confirm = if let EventRequest::Confirm(..) =
2274 &event.content().event_request.content()
2275 {
2276 self.confirm(ctx, event.signature().signer.clone(), 0)
2277 .await?;
2278
2279 if let Some(new_owner_key) =
2280 &self.subject_metadata.new_owner
2281 {
2282 Some(self.properties.roles_update_remove_confirm(
2283 &self.subject_metadata.owner,
2284 new_owner_key,
2285 ))
2286 } else {
2287 None
2288 }
2289 } else {
2290 None
2291 };
2292
2293 let update_fact = if let EventRequest::Fact(fact_request) =
2294 &event.content().event_request.content()
2295 {
2296 let governance_event = serde_json::from_value::<GovernanceEvent>(fact_request.payload.0.clone()).map_err(|e| {
2297 ActorError::FunctionalCritical{description: format!("Can not convert payload into governance event in governance fact event: {}", e)}
2298 })?;
2299
2300 let rm_members = governance_event
2301 .members
2302 .as_ref()
2303 .map_or_else(|| None, |members| members.remove.clone());
2304
2305 let rm_schemas = governance_event
2306 .schemas
2307 .as_ref()
2308 .map_or_else(|| None, |schemas| schemas.remove.clone());
2309
2310 let rm_roles =
2311 if rm_members.is_some() || rm_schemas.is_some() {
2312 Some(self.properties.roles_update_remove_fact(
2313 rm_members, rm_schemas,
2314 ))
2315 } else {
2316 None
2317 };
2318
2319 let creator_update = governance_event_update_creator_change(
2320 &governance_event,
2321 &self.properties.members,
2322 &self.properties.roles_schema,
2323 );
2324
2325 Some((governance_event, creator_update, rm_roles))
2326 } else {
2327 None
2328 };
2329 (update_fact, update_confirm)
2330 } else {
2331 (None, None)
2332 };
2333
2334 self.on_event(event.clone(), ctx).await;
2336
2337 if let Some((event, creator_update, rm_roles)) = update_fact {
2338 let update = governance_event_roles_update_fact(
2339 &event,
2340 &self.properties.members,
2341 rm_roles,
2342 );
2343
2344 self.update_registers_fact(ctx, update, creator_update)
2345 .await?;
2346 }
2347
2348 if let Some(update_confirm) = update_confirm {
2349 self.update_registers_confirm(ctx, update_confirm).await?;
2350 }
2351
2352 last_ledger = event.clone();
2354 }
2355
2356 Ok(())
2357 }
2358
2359 async fn delete_tracker_references(
2360 &self,
2361 ctx: &mut ActorContext<Self>,
2362 subject_id: DigestIdentifier,
2363 ) -> Result<(), ActorError> {
2364 let mut cleanup_errors = Vec::new();
2365
2366 let subject_register = match ctx
2367 .create_child("subject_register", SubjectRegister::initial(()))
2368 .await
2369 {
2370 Ok(actor) => Some(actor),
2371 Err(ActorError::Exists { .. }) => {
2372 match ctx.get_child::<SubjectRegister>("subject_register").await
2373 {
2374 Ok(actor) => Some(actor),
2375 Err(error) => {
2376 cleanup_errors
2377 .push(format!("subject_register lookup: {error}"));
2378 None
2379 }
2380 }
2381 }
2382 Err(error) => {
2383 cleanup_errors.push(format!("subject_register: {error}"));
2384 None
2385 }
2386 };
2387
2388 if let Some(subject_register) = subject_register {
2389 match subject_register
2390 .ask(SubjectRegisterMessage::DeleteSubject {
2391 subject_id: subject_id.clone(),
2392 })
2393 .await
2394 {
2395 Ok(SubjectRegisterResponse::Ok) => {}
2396 Ok(other) => cleanup_errors.push(format!(
2397 "subject_register: unexpected response {other:?}"
2398 )),
2399 Err(error) => {
2400 cleanup_errors.push(format!("subject_register: {error}"))
2401 }
2402 }
2403
2404 if let Err(error) = subject_register.ask_stop().await {
2405 cleanup_errors.push(format!("subject_register stop: {error}"));
2406 }
2407 }
2408
2409 let sn_register = match ctx
2410 .create_child("sn_register", SnRegister::initial(()))
2411 .await
2412 {
2413 Ok(actor) => Some(actor),
2414 Err(ActorError::Exists { .. }) => {
2415 match ctx.get_child::<SnRegister>("sn_register").await {
2416 Ok(actor) => Some(actor),
2417 Err(error) => {
2418 cleanup_errors
2419 .push(format!("sn_register lookup: {error}"));
2420 None
2421 }
2422 }
2423 }
2424 Err(error) => {
2425 cleanup_errors.push(format!("sn_register: {error}"));
2426 None
2427 }
2428 };
2429
2430 if let Some(sn_register) = sn_register {
2431 match sn_register
2432 .ask(SnRegisterMessage::DeleteSubject {
2433 subject_id: subject_id.clone(),
2434 })
2435 .await
2436 {
2437 Ok(SnRegisterResponse::Ok) => {}
2438 Ok(other) => cleanup_errors.push(format!(
2439 "sn_register: unexpected response {other:?}"
2440 )),
2441 Err(error) => {
2442 cleanup_errors.push(format!("sn_register: {error}"))
2443 }
2444 }
2445
2446 if let Err(error) = sn_register.ask_stop().await {
2447 cleanup_errors.push(format!("sn_register stop: {error}"));
2448 }
2449 }
2450
2451 let witnesses_register = match ctx
2452 .create_child("witnesses_register", WitnessesRegister::initial(()))
2453 .await
2454 {
2455 Ok(actor) => Some(actor),
2456 Err(ActorError::Exists { .. }) => {
2457 match ctx
2458 .get_child::<WitnessesRegister>("witnesses_register")
2459 .await
2460 {
2461 Ok(actor) => Some(actor),
2462 Err(error) => {
2463 cleanup_errors.push(format!(
2464 "witnesses_register lookup: {error}"
2465 ));
2466 None
2467 }
2468 }
2469 }
2470 Err(error) => {
2471 cleanup_errors.push(format!("witnesses_register: {error}"));
2472 None
2473 }
2474 };
2475
2476 if let Some(witnesses_register) = witnesses_register {
2477 match witnesses_register
2478 .ask(WitnessesRegisterMessage::DeleteSubject {
2479 subject_id: subject_id.clone(),
2480 })
2481 .await
2482 {
2483 Ok(WitnessesRegisterResponse::Ok) => {}
2484 Ok(_) => cleanup_errors
2485 .push("witnesses_register: unexpected response".to_owned()),
2486 Err(error) => {
2487 cleanup_errors.push(format!("witnesses_register: {error}"))
2488 }
2489 }
2490
2491 if let Err(error) = witnesses_register.ask_stop().await {
2492 cleanup_errors
2493 .push(format!("witnesses_register stop: {error}"));
2494 }
2495 }
2496
2497 if cleanup_errors.is_empty() {
2498 Ok(())
2499 } else {
2500 Err(ActorError::Functional {
2501 description: cleanup_errors.join("; "),
2502 })
2503 }
2504 }
2505
2506 async fn delete_governance_storage(
2507 &self,
2508 ctx: &mut ActorContext<Self>,
2509 ) -> Result<(), ActorError> {
2510 let mut cleanup_errors = Vec::new();
2511
2512 if self.properties.has_this_role(HashThisRole::Gov {
2513 who: (*self.our_key).clone(),
2514 role: RoleTypes::Approver,
2515 }) {
2516 let hash = self.hash.map_or_else(
2517 || {
2518 cleanup_errors
2519 .push("approver init: missing hash".to_owned());
2520 None
2521 },
2522 Some,
2523 );
2524
2525 let network = ctx
2526 .system()
2527 .get_helper::<Arc<NetworkSender>>("network")
2528 .await
2529 .map_or_else(
2530 || {
2531 cleanup_errors.push(
2532 "approver init: missing network helper".to_owned(),
2533 );
2534 None
2535 },
2536 Some,
2537 );
2538
2539 if let (Some(hash), Some(network)) = (hash, network) {
2540 let approver = match ctx
2541 .get_child::<ApprPersist>("approver")
2542 .await
2543 {
2544 Ok(actor) => Some(actor),
2545 Err(_) => match self
2546 .up_approver_only(ctx, &hash, &network)
2547 .await
2548 {
2549 Ok(()) => match ctx
2550 .get_child::<ApprPersist>("approver")
2551 .await
2552 {
2553 Ok(actor) => Some(actor),
2554 Err(error) => {
2555 cleanup_errors
2556 .push(format!("approver lookup: {error}"));
2557 None
2558 }
2559 },
2560 Err(error) => {
2561 cleanup_errors.push(format!("approver: {error}"));
2562 None
2563 }
2564 },
2565 };
2566
2567 if let Some(approver) = approver {
2568 match approver
2569 .ask(crate::approval::persist::ApprPersistMessage::PurgeStorage)
2570 .await
2571 {
2572 Ok(crate::approval::persist::ApprPersistResponse::Ok) => {}
2573 Ok(_) => cleanup_errors
2574 .push("approver: unexpected response".to_owned()),
2575 Err(error) => {
2576 cleanup_errors.push(format!("approver: {error}"))
2577 }
2578 }
2579
2580 if let Err(error) = approver.ask_stop().await {
2581 cleanup_errors.push(format!("approver stop: {error}"));
2582 }
2583 }
2584 }
2585 }
2586
2587 let contract_register = match ctx
2588 .create_child("contract_register", ContractRegister::initial(()))
2589 .await
2590 {
2591 Ok(actor) => Some(actor),
2592 Err(ActorError::Exists { .. }) => {
2593 match ctx
2594 .get_child::<ContractRegister>("contract_register")
2595 .await
2596 {
2597 Ok(actor) => Some(actor),
2598 Err(error) => {
2599 cleanup_errors
2600 .push(format!("contract_register lookup: {error}"));
2601 None
2602 }
2603 }
2604 }
2605 Err(error) => {
2606 cleanup_errors.push(format!("contract_register: {error}"));
2607 None
2608 }
2609 };
2610
2611 if let Some(contract_register) = contract_register {
2612 if let Err(error) = self
2613 .delete_all_contract_artifacts(ctx, &contract_register)
2614 .await
2615 {
2616 cleanup_errors.push(format!("contract_artifacts: {error}"));
2617 }
2618
2619 match contract_register
2620 .ask(ContractRegisterMessage::PurgeStorage)
2621 .await
2622 {
2623 Ok(ContractRegisterResponse::Ok) => {}
2624 Ok(other) => cleanup_errors.push(format!(
2625 "contract_register: unexpected response {other:?}"
2626 )),
2627 Err(error) => {
2628 cleanup_errors.push(format!("contract_register: {error}"))
2629 }
2630 }
2631
2632 if let Err(error) = contract_register.ask_stop().await {
2633 cleanup_errors.push(format!("contract_register stop: {error}"));
2634 }
2635 }
2636
2637 let role_register = match ctx
2638 .create_child("role_register", RoleRegister::initial(()))
2639 .await
2640 {
2641 Ok(actor) => Some(actor),
2642 Err(ActorError::Exists { .. }) => {
2643 match ctx.get_child::<RoleRegister>("role_register").await {
2644 Ok(actor) => Some(actor),
2645 Err(error) => {
2646 cleanup_errors
2647 .push(format!("role_register lookup: {error}"));
2648 None
2649 }
2650 }
2651 }
2652 Err(error) => {
2653 cleanup_errors.push(format!("role_register: {error}"));
2654 None
2655 }
2656 };
2657
2658 if let Some(role_register) = role_register {
2659 match role_register.ask(RoleRegisterMessage::PurgeStorage).await {
2660 Ok(RoleRegisterResponse::Ok) => {}
2661 Ok(other) => cleanup_errors.push(format!(
2662 "role_register: unexpected response {other:?}"
2663 )),
2664 Err(error) => {
2665 cleanup_errors.push(format!("role_register: {error}"))
2666 }
2667 }
2668
2669 if let Err(error) = role_register.ask_stop().await {
2670 cleanup_errors.push(format!("role_register stop: {error}"));
2671 }
2672 }
2673
2674 let subject_register = match ctx
2675 .create_child("subject_register", SubjectRegister::initial(()))
2676 .await
2677 {
2678 Ok(actor) => Some(actor),
2679 Err(ActorError::Exists { .. }) => {
2680 match ctx.get_child::<SubjectRegister>("subject_register").await
2681 {
2682 Ok(actor) => Some(actor),
2683 Err(error) => {
2684 cleanup_errors
2685 .push(format!("subject_register lookup: {error}"));
2686 None
2687 }
2688 }
2689 }
2690 Err(error) => {
2691 cleanup_errors.push(format!("subject_register: {error}"));
2692 None
2693 }
2694 };
2695
2696 if let Some(subject_register) = subject_register {
2697 match subject_register
2698 .ask(SubjectRegisterMessage::PurgeStorage)
2699 .await
2700 {
2701 Ok(SubjectRegisterResponse::Ok) => {}
2702 Ok(other) => cleanup_errors.push(format!(
2703 "subject_register: unexpected response {other:?}"
2704 )),
2705 Err(error) => {
2706 cleanup_errors.push(format!("subject_register: {error}"))
2707 }
2708 }
2709
2710 if let Err(error) = subject_register.ask_stop().await {
2711 cleanup_errors.push(format!("subject_register stop: {error}"));
2712 }
2713 }
2714
2715 let sn_register = match ctx
2716 .create_child("sn_register", SnRegister::initial(()))
2717 .await
2718 {
2719 Ok(actor) => Some(actor),
2720 Err(ActorError::Exists { .. }) => {
2721 match ctx.get_child::<SnRegister>("sn_register").await {
2722 Ok(actor) => Some(actor),
2723 Err(error) => {
2724 cleanup_errors
2725 .push(format!("sn_register lookup: {error}"));
2726 None
2727 }
2728 }
2729 }
2730 Err(error) => {
2731 cleanup_errors.push(format!("sn_register: {error}"));
2732 None
2733 }
2734 };
2735
2736 if let Some(sn_register) = sn_register {
2737 match sn_register.ask(SnRegisterMessage::PurgeStorage).await {
2738 Ok(SnRegisterResponse::Ok) => {}
2739 Ok(other) => cleanup_errors.push(format!(
2740 "sn_register: unexpected response {other:?}"
2741 )),
2742 Err(error) => {
2743 cleanup_errors.push(format!("sn_register: {error}"))
2744 }
2745 }
2746
2747 if let Err(error) = sn_register.ask_stop().await {
2748 cleanup_errors.push(format!("sn_register stop: {error}"));
2749 }
2750 }
2751
2752 let witnesses_register = match ctx
2753 .create_child("witnesses_register", WitnessesRegister::initial(()))
2754 .await
2755 {
2756 Ok(actor) => Some(actor),
2757 Err(ActorError::Exists { .. }) => {
2758 match ctx
2759 .get_child::<WitnessesRegister>("witnesses_register")
2760 .await
2761 {
2762 Ok(actor) => Some(actor),
2763 Err(error) => {
2764 cleanup_errors.push(format!(
2765 "witnesses_register lookup: {error}"
2766 ));
2767 None
2768 }
2769 }
2770 }
2771 Err(error) => {
2772 cleanup_errors.push(format!("witnesses_register: {error}"));
2773 None
2774 }
2775 };
2776
2777 if let Some(witnesses_register) = witnesses_register {
2778 match witnesses_register
2779 .ask(WitnessesRegisterMessage::PurgeStorage)
2780 .await
2781 {
2782 Ok(WitnessesRegisterResponse::Ok) => {}
2783 Ok(_) => cleanup_errors
2784 .push("witnesses_register: unexpected response".to_owned()),
2785 Err(error) => {
2786 cleanup_errors.push(format!("witnesses_register: {error}"))
2787 }
2788 }
2789
2790 if let Err(error) = witnesses_register.ask_stop().await {
2791 cleanup_errors
2792 .push(format!("witnesses_register stop: {error}"));
2793 }
2794 }
2795
2796 if let Err(error) = purge_storage(ctx).await {
2797 cleanup_errors.push(format!("governance: {error}"));
2798 }
2799
2800 if cleanup_errors.is_empty() {
2801 Ok(())
2802 } else {
2803 Err(ActorError::Functional {
2804 description: cleanup_errors.join("; "),
2805 })
2806 }
2807 }
2808}
2809
2810#[derive(Debug, Clone)]
2812pub enum GovernanceMessage {
2813 GetMetadata,
2814 GetLedger { lo_sn: Option<u64>, hi_sn: u64 },
2815 GetLastLedger,
2816 DeleteTrackerReferences { subject_id: DigestIdentifier },
2817 DeleteGovernanceStorage,
2818 UpdateLedger { events: Vec<SignedLedger> },
2819 GetGovernance,
2820 GetVersion,
2821}
2822
2823impl Message for GovernanceMessage {}
2824
2825#[derive(Debug, Clone)]
2826pub enum GovernanceResponse {
2827 Metadata(Box<Metadata>),
2829 UpdateResult(u64, PublicKey, Option<PublicKey>),
2830 Ledger {
2831 ledger: Vec<SignedLedger>,
2832 is_all: bool,
2833 },
2834 LastLedger {
2835 ledger_event: Box<Option<SignedLedger>>,
2836 },
2837 Governance(Box<GovernanceData>),
2838 NewCompilers(Vec<SchemaType>),
2839 Sn(u64),
2840 Version(u64),
2841 Ok,
2842}
2843impl Response for GovernanceResponse {}
2844
2845#[async_trait]
2846impl Actor for Governance {
2847 type Event = SignedLedger;
2848 type Message = GovernanceMessage;
2849 type Response = GovernanceResponse;
2850
2851 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
2852 parent_span.map_or_else(
2853 || info_span!("Governance", id),
2854 |parent_span| info_span!(parent: parent_span, "Governance", id),
2855 )
2856 }
2857
2858 async fn pre_start(
2859 &mut self,
2860 ctx: &mut ActorContext<Self>,
2861 ) -> Result<(), ActorError> {
2862 if let Err(e) = self.init_store("governance", None, true, ctx).await {
2863 error!(
2864 error = %e,
2865 "Failed to initialize governance store"
2866 );
2867 return Err(e);
2868 }
2869
2870 let safe_mode = if let Some(config) =
2871 ctx.system().get_helper::<ConfigHelper>("config").await
2872 {
2873 config.safe_mode
2874 } else {
2875 return Err(ActorError::Helper {
2876 name: "config".to_owned(),
2877 reason: "Not found".to_owned(),
2878 });
2879 };
2880
2881 if safe_mode {
2882 let Some(hash) = self.hash else {
2883 error!("Hash algorithm not found");
2884 return Err(ActorError::FunctionalCritical {
2885 description: "Hash algorithm is None".to_string(),
2886 });
2887 };
2888
2889 let Some(network) = ctx
2890 .system()
2891 .get_helper::<Arc<NetworkSender>>("network")
2892 .await
2893 else {
2894 error!("Network helper not found");
2895 return Err(ActorError::Helper {
2896 name: "network".to_owned(),
2897 reason: "Not found".to_owned(),
2898 });
2899 };
2900
2901 self.up_approver_only(ctx, &hash, &network).await?;
2902 return Ok(());
2903 }
2904
2905 let Some(hash) = self.hash else {
2906 error!("Hash algorithm not found");
2907 return Err(ActorError::FunctionalCritical {
2908 description: "Hash algorithm is None".to_string(),
2909 });
2910 };
2911
2912 let Some(ext_db): Option<Arc<ExternalDB>> =
2913 ctx.system().get_helper("ext_db").await
2914 else {
2915 error!("External database helper not found");
2916 return Err(ActorError::Helper {
2917 name: "ext_db".to_owned(),
2918 reason: "Not found".to_owned(),
2919 });
2920 };
2921
2922 let Some(ave_sink): Option<AveSink> =
2923 ctx.system().get_helper("sink").await
2924 else {
2925 error!("Sink helper not found");
2926 return Err(ActorError::Helper {
2927 name: "sink".to_owned(),
2928 reason: "Not found".to_owned(),
2929 });
2930 };
2931
2932 let Some(network) = ctx
2933 .system()
2934 .get_helper::<Arc<NetworkSender>>("network")
2935 .await
2936 else {
2937 error!("Network helper not found");
2938 return Err(ActorError::Helper {
2939 name: "network".to_owned(),
2940 reason: "Not found".to_owned(),
2941 });
2942 };
2943
2944 if let Err(e) = ctx
2945 .create_child("role_register", RoleRegister::initial(()))
2946 .await
2947 {
2948 error!(
2949 error = %e,
2950 "Failed to create role_register child"
2951 );
2952 return Err(e);
2953 }
2954
2955 if let Err(e) = ctx
2956 .create_child("subject_register", SubjectRegister::initial(()))
2957 .await
2958 {
2959 error!(
2960 error = %e,
2961 "Failed to create subject_register child"
2962 );
2963 return Err(e);
2964 }
2965
2966 if let Err(e) = ctx
2967 .create_child("sn_register", SnRegister::initial(()))
2968 .await
2969 {
2970 error!(
2971 error = %e,
2972 "Failed to create sn_register child"
2973 );
2974 return Err(e);
2975 }
2976
2977 if let Err(e) = ctx
2978 .create_child("witnesses_register", WitnessesRegister::initial(()))
2979 .await
2980 {
2981 error!(
2982 error = %e,
2983 "Failed to create witnesses_register child"
2984 );
2985 return Err(e);
2986 }
2987
2988 if let Err(e) = ctx
2989 .create_child("contract_register", ContractRegister::initial(()))
2990 .await
2991 {
2992 error!(
2993 error = %e,
2994 "Failed to create contract_register child"
2995 );
2996 return Err(e);
2997 }
2998
2999 if self.subject_metadata.active {
3000 if let Err(e) = self.build_childs(ctx, &hash, &network).await {
3001 error!(
3002 error = %e,
3003 "Failed to build governance child actors"
3004 );
3005 return Err(e);
3006 }
3007
3008 let sink_actor = match ctx
3009 .create_child(
3010 "sink_data",
3011 SinkData {
3012 public_key: self.our_key.to_string(),
3013 },
3014 )
3015 .await
3016 {
3017 Ok(actor) => actor,
3018 Err(e) => {
3019 error!(
3020 error = %e,
3021 "Failed to create sink_data child"
3022 );
3023 return Err(e);
3024 }
3025 };
3026 let sink =
3027 Sink::new(sink_actor.subscribe(), ext_db.get_sink_data());
3028 ctx.system().run_sink(sink).await;
3029
3030 let sink = Sink::new(sink_actor.subscribe(), ave_sink.clone());
3031 ctx.system().run_sink(sink).await;
3032 }
3033
3034 if self.service {
3035 let Some(config): Option<ConfigHelper> =
3036 ctx.system().get_helper("config").await
3037 else {
3038 error!("Config helper not found");
3039 return Err(ActorError::Helper {
3040 name: "config".to_owned(),
3041 reason: "Not found".to_owned(),
3042 });
3043 };
3044
3045 let version_sync_tick_interval = Duration::from_secs(
3046 config.sync_governance.interval_secs.max(1),
3047 );
3048 let version_sync_response_timeout = Duration::from_secs(
3049 config.sync_governance.response_timeout_secs.max(1),
3050 );
3051 let tracker_sync_tick_interval =
3052 Duration::from_secs(config.sync_tracker.interval_secs.max(1));
3053 let tracker_sync_response_timeout = Duration::from_secs(
3054 config.sync_tracker.response_timeout_secs.max(1),
3055 );
3056 let tracker_sync_update_timeout = Duration::from_secs(
3057 config.sync_tracker.update_timeout_secs.max(1),
3058 );
3059
3060 if let Err(e) = ctx
3061 .create_child(
3062 "tracker_sync",
3063 TrackerSync::new(
3064 self.subject_metadata.subject_id.clone(),
3065 self.our_key.clone(),
3066 network.clone(),
3067 TrackerSyncConfig {
3068 service: self.service,
3069 tick_interval: tracker_sync_tick_interval,
3070 response_timeout: tracker_sync_response_timeout,
3071 page_size: config.sync_tracker.page_size,
3072 update_batch_size: config
3073 .sync_tracker
3074 .update_batch_size,
3075 update_timeout: tracker_sync_update_timeout,
3076 },
3077 ),
3078 )
3079 .await
3080 {
3081 error!(
3082 error = %e,
3083 subject_id = %self.subject_metadata.subject_id,
3084 "Failed to create tracker_sync child"
3085 );
3086 return Err(e);
3087 }
3088
3089 let version_sync = ctx
3090 .create_child(
3091 "version_sync",
3092 GovernanceVersionSync::new(
3093 self.subject_metadata.subject_id.clone(),
3094 self.our_key.clone(),
3095 network.clone(),
3096 self.properties.version,
3097 config.sync_governance.sample_size,
3098 version_sync_tick_interval,
3099 version_sync_response_timeout,
3100 ),
3101 )
3102 .await?;
3103
3104 let governance_peers = self
3105 .properties
3106 .get_witnesses(WitnessesData::Gov)
3107 .map_err(|e| ActorError::Functional {
3108 description: e.to_string(),
3109 })?;
3110
3111 let _ = version_sync
3112 .ask(GovernanceVersionSyncMessage::RefreshGovernance {
3113 version: self.properties.version,
3114 governance_peers,
3115 })
3116 .await?;
3117 }
3118
3119 Ok(())
3120 }
3121}
3122
3123#[async_trait]
3124impl Handler<Self> for Governance {
3125 async fn handle_message(
3126 &mut self,
3127 _sender: ActorPath,
3128 msg: GovernanceMessage,
3129 ctx: &mut ActorContext<Self>,
3130 ) -> Result<GovernanceResponse, ActorError> {
3131 match msg {
3132 GovernanceMessage::GetVersion => {
3133 Ok(GovernanceResponse::Version(self.properties.version))
3134 }
3135 GovernanceMessage::GetLedger { lo_sn, hi_sn } => {
3136 let (ledger, is_all) =
3137 self.get_ledger(ctx, lo_sn, hi_sn).await?;
3138 Ok(GovernanceResponse::Ledger { ledger, is_all })
3139 }
3140 GovernanceMessage::GetLastLedger => {
3141 let ledger_event = self.get_last_ledger(ctx).await?;
3142 Ok(GovernanceResponse::LastLedger {
3143 ledger_event: Box::new(ledger_event),
3144 })
3145 }
3146 GovernanceMessage::GetMetadata => Ok(GovernanceResponse::Metadata(
3147 Box::new(Metadata::from(self.clone())),
3148 )),
3149 GovernanceMessage::DeleteTrackerReferences { subject_id } => {
3150 self.delete_tracker_references(ctx, subject_id.clone())
3151 .await?;
3152
3153 debug!(
3154 msg_type = "DeleteTrackerReferences",
3155 subject_id = %subject_id,
3156 governance_id = %self.subject_metadata.subject_id,
3157 "Tracker references deleted from governance"
3158 );
3159
3160 Ok(GovernanceResponse::Ok)
3161 }
3162 GovernanceMessage::DeleteGovernanceStorage => {
3163 self.delete_governance_storage(ctx).await?;
3164
3165 debug!(
3166 msg_type = "DeleteGovernanceStorage",
3167 governance_id = %self.subject_metadata.subject_id,
3168 "Governance storage deleted"
3169 );
3170
3171 Ok(GovernanceResponse::Ok)
3172 }
3173 GovernanceMessage::UpdateLedger { events } => {
3174 let events_count = events.len();
3175 if let Err(e) =
3176 self.manager_new_ledger_events(ctx, events).await
3177 {
3178 warn!(
3179 msg_type = "UpdateLedger",
3180 error = %e,
3181 subject_id = %self.subject_metadata.subject_id,
3182 events_count = events_count,
3183 "Failed to verify new ledger events"
3184 );
3185 return Err(e);
3186 };
3187
3188 debug!(
3189 msg_type = "UpdateLedger",
3190 subject_id = %self.subject_metadata.subject_id,
3191 sn = self.subject_metadata.sn,
3192 events_count = events_count,
3193 "Ledger updated successfully"
3194 );
3195
3196 Ok(GovernanceResponse::UpdateResult(
3197 self.subject_metadata.sn,
3198 self.subject_metadata.owner.clone(),
3199 self.subject_metadata.new_owner.clone(),
3200 ))
3201 }
3202 GovernanceMessage::GetGovernance => {
3203 Ok(GovernanceResponse::Governance(Box::new(
3204 self.properties.clone(),
3205 )))
3206 }
3207 }
3208 }
3209
3210 async fn on_event(
3211 &mut self,
3212 event: SignedLedger,
3213 ctx: &mut ActorContext<Self>,
3214 ) {
3215 if let Err(e) = self.persist(&event, ctx).await {
3216 error!(
3217 error = %e,
3218 subject_id = %self.subject_metadata.subject_id,
3219 sn = self.subject_metadata.sn,
3220 "Failed to persist event"
3221 );
3222 emit_fail(ctx, e).await;
3223 };
3224
3225 if let Err(e) = ctx.publish_event(event).await {
3226 error!(
3227 error = %e,
3228 subject_id = %self.subject_metadata.subject_id,
3229 sn = self.subject_metadata.sn,
3230 "Failed to publish event"
3231 );
3232 emit_fail(ctx, e).await;
3233 } else {
3234 debug!(
3235 subject_id = %self.subject_metadata.subject_id,
3236 sn = self.subject_metadata.sn,
3237 "Event persisted and published successfully"
3238 );
3239 }
3240 }
3241
3242 async fn on_child_fault(
3243 &mut self,
3244 error: ActorError,
3245 ctx: &mut ActorContext<Self>,
3246 ) -> ChildAction {
3247 error!(
3248 error = %error,
3249 subject_id = %self.subject_metadata.subject_id,
3250 "Child fault occurred"
3251 );
3252 emit_fail(ctx, error).await;
3253 ChildAction::Stop
3254 }
3255}
3256
3257#[async_trait]
3258impl PersistentActor for Governance {
3259 type Persistence = FullPersistence;
3260 type InitParams = (
3261 Option<(SubjectMetadata, GovernanceData)>,
3262 Arc<PublicKey>,
3263 HashAlgorithm,
3264 bool,
3265 );
3266
3267 fn update(&mut self, state: Self) {
3268 self.properties = state.properties;
3269 self.subject_metadata = state.subject_metadata;
3270 }
3271
3272 fn create_initial(params: Self::InitParams) -> Self {
3273 let (subject_metadata, properties) =
3274 if let Some((subject_metadata, properties)) = params.0 {
3275 (subject_metadata, properties)
3276 } else {
3277 Default::default()
3278 };
3279 Self {
3280 hash: Some(params.2),
3281 our_key: params.1,
3282 service: params.3,
3283 subject_metadata,
3284 properties,
3285 }
3286 }
3287
3288 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
3289 match (
3290 event.content().event_request.content(),
3291 &event.content().protocols,
3292 ) {
3293 (EventRequest::Create(..), Protocols::Create { validation }) => {
3294 if let ValidationMetadata::Metadata(metadata) =
3295 &validation.validation_metadata
3296 {
3297 self.subject_metadata = SubjectMetadata::new(metadata);
3298 self.properties = serde_json::from_value::<GovernanceData>(
3299 metadata.properties.0.clone(),
3300 )
3301 .map_err(|e| {
3302 error!(
3303 event_type = "Create",
3304 subject_id = %self.subject_metadata.subject_id,
3305 error = %e,
3306 "Failed to convert properties into GovernanceData"
3307 );
3308 ActorError::Functional { description: format!("In create event, can not convert properties into GovernanceData: {e}")}
3309 })?;
3310
3311 debug!(
3312 event_type = "Create",
3313 subject_id = %self.subject_metadata.subject_id,
3314 sn = self.subject_metadata.sn,
3315 "Applied create event"
3316 );
3317 } else {
3318 error!(
3319 event_type = "Create",
3320 "Validation metadata must be Metadata type"
3321 );
3322 return Err(ActorError::Functional { description: "In create event, validation metadata must be a Metadata".to_owned() });
3323 }
3324
3325 return Ok(());
3326 }
3327 (
3328 EventRequest::Fact(..),
3329 Protocols::GovFact {
3330 evaluation,
3331 approval,
3332 ..
3333 },
3334 ) => {
3335 if let Some(eval_res) = evaluation.evaluator_res() {
3336 if let Some(appr_res) = approval {
3337 if appr_res.approved {
3338 self.apply_patch(eval_res.patch)?;
3339 debug!(
3340 event_type = "Fact",
3341 subject_id = %self.subject_metadata.subject_id,
3342 approved = true,
3343 "Applied fact event with patch"
3344 );
3345 } else {
3346 debug!(
3347 event_type = "Fact",
3348 subject_id = %self.subject_metadata.subject_id,
3349 approved = false,
3350 "Fact event not approved, patch not applied"
3351 );
3352 }
3353 } else {
3354 error!(
3355 event_type = "Fact",
3356 subject_id = %self.subject_metadata.subject_id,
3357 "Evaluation successful but no approval present"
3358 );
3359 return Err(ActorError::Functional { description: "The evaluation event was successful, but there is no approval".to_owned() });
3360 }
3361 }
3362 }
3363 (
3364 EventRequest::Transfer(transfer_request),
3365 Protocols::Transfer { evaluation, .. },
3366 ) => {
3367 if evaluation.evaluator_res().is_some() {
3368 self.subject_metadata.new_owner =
3369 Some(transfer_request.new_owner.clone());
3370 debug!(
3371 event_type = "Transfer",
3372 subject_id = %self.subject_metadata.subject_id,
3373 new_owner = %transfer_request.new_owner,
3374 "Applied transfer event"
3375 );
3376 }
3377 }
3378 (
3379 EventRequest::Confirm(..),
3380 Protocols::GovConfirm { evaluation, .. },
3381 ) => {
3382 if let Some(eval_res) = evaluation.evaluator_res() {
3383 if let Some(new_owner) =
3384 self.subject_metadata.new_owner.take()
3385 {
3386 self.subject_metadata.owner = new_owner.clone();
3387 self.apply_patch(eval_res.patch)?;
3388 debug!(
3389 event_type = "Confirm",
3390 subject_id = %self.subject_metadata.subject_id,
3391 new_owner = %new_owner,
3392 "Applied confirm event with patch"
3393 );
3394 } else {
3395 error!(
3396 event_type = "Confirm",
3397 subject_id = %self.subject_metadata.subject_id,
3398 "New owner is None in confirm event"
3399 );
3400 return Err(ActorError::Functional {
3401 description: "In confirm event, new owner is None"
3402 .to_owned(),
3403 });
3404 }
3405 }
3406 }
3407 (EventRequest::Reject(..), Protocols::Reject { .. }) => {
3408 self.subject_metadata.new_owner = None;
3409 debug!(
3410 event_type = "Reject",
3411 subject_id = %self.subject_metadata.subject_id,
3412 "Applied reject event"
3413 );
3414 }
3415 (EventRequest::EOL(..), Protocols::EOL { .. }) => {
3416 self.subject_metadata.active = false;
3417 debug!(
3418 event_type = "EOL",
3419 subject_id = %self.subject_metadata.subject_id,
3420 "Applied EOL event"
3421 );
3422 }
3423 _ => {
3424 error!(
3425 subject_id = %self.subject_metadata.subject_id,
3426 "Invalid protocol data for Governance"
3427 );
3428 return Err(ActorError::Functional {
3429 description: "Invalid protocol data for Governance"
3430 .to_owned(),
3431 });
3432 }
3433 }
3434
3435 if event.content().protocols.is_success() {
3436 self.properties.version += 1;
3437 }
3438
3439 self.subject_metadata.sn += 1;
3440 self.subject_metadata.prev_ledger_event_hash =
3441 event.content().prev_ledger_event_hash.clone();
3442
3443 Ok(())
3444 }
3445}
3446
3447impl Storable for Governance {}