1use std::{
5 collections::{HashMap, HashSet},
6 path::Path,
7 sync::Arc,
8};
9
10use borsh::{BorshDeserialize, BorshSerialize};
11use register::Register;
12use tokio::fs;
13use tracing::{Span, debug, error, info_span};
14
15use crate::{
16 auth::{Auth, AuthMessage, AuthResponse},
17 db::Storable,
18 distribution::worker::DistriWorker,
19 governance::{Governance, GovernanceMessage, GovernanceResponse},
20 helpers::{db::ExternalDB, network::service::NetworkSender},
21 manual_distribution::ManualDistribution,
22 model::common::node::SignTypesNode,
23 node::subject_manager::{SubjectManager, SubjectManagerMessage},
24 subject::{
25 SignedLedger, replay_sink_events as replay_ledgers_to_sink_events,
26 },
27 system::ConfigHelper,
28 tracker::{Tracker, TrackerMessage, TrackerResponse},
29};
30
31use ave_common::{
32 SchemaType,
33 identity::{
34 DigestIdentifier, HashAlgorithm, PublicKey, Signature, keys::KeyPair,
35 },
36 response::SinkEventsPage,
37};
38
39use async_trait::async_trait;
40use ave_actors::{
41 Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
42 Message, Response, Sink,
43};
44use ave_actors::{LightPersistence, PersistentActor};
45use serde::{Deserialize, Serialize};
46
47pub mod register;
48pub mod subject_manager;
49
50#[derive(
51 Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
52)]
53pub struct TransferSubject {
54 pub name: Option<String>,
55 pub subject_id: DigestIdentifier,
56 pub new_owner: PublicKey,
57 pub actual_owner: PublicKey,
58}
59
60#[derive(
61 Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
62)]
63pub struct TransferData {
64 pub name: Option<String>,
65 pub new_owner: PublicKey,
66 pub actual_owner: PublicKey,
67}
68
69#[derive(
70 Clone,
71 Debug,
72 Serialize,
73 Deserialize,
74 BorshSerialize,
75 BorshDeserialize,
76 Eq,
77 PartialEq,
78)]
79pub enum SubjectData {
80 Tracker {
81 governance_id: DigestIdentifier,
82 schema_id: SchemaType,
83 namespace: String,
84 active: bool,
85 },
86 Governance {
87 active: bool,
88 },
89}
90
91impl SubjectData {
92 pub fn get_schema_id(&self) -> SchemaType {
93 match self {
94 Self::Tracker { schema_id, .. } => schema_id.clone(),
95 Self::Governance { .. } => SchemaType::Governance,
96 }
97 }
98
99 pub fn get_governance_id(&self) -> Option<DigestIdentifier> {
100 match self {
101 Self::Tracker { governance_id, .. } => Some(governance_id.clone()),
102 Self::Governance { .. } => None,
103 }
104 }
105
106 pub fn get_namespace(&self) -> String {
107 match self {
108 Self::Tracker { namespace, .. } => namespace.clone(),
109 Self::Governance { .. } => String::default(),
110 }
111 }
112
113 pub const fn get_active(&self) -> bool {
114 match self {
115 Self::Tracker { active, .. } => *active,
116 Self::Governance { active } => *active,
117 }
118 }
119
120 pub const fn eol(&mut self) {
121 match self {
122 Self::Tracker { active, .. } => *active = false,
123 Self::Governance { active } => *active = false,
124 };
125 }
126}
127
128#[derive(Debug, Serialize, Deserialize, Clone)]
130pub struct Node {
131 #[serde(skip)]
133 owner: KeyPair,
134 #[serde(skip)]
135 our_key: Arc<PublicKey>,
136 #[serde(skip)]
137 hash: Option<HashAlgorithm>,
138 #[serde(skip)]
139 is_service: bool,
140 owned_subjects: HashMap<DigestIdentifier, SubjectData>,
142 known_subjects: HashMap<DigestIdentifier, SubjectData>,
144
145 transfer_subjects: HashMap<DigestIdentifier, TransferData>,
146
147 reject_subjects: HashSet<DigestIdentifier>,
148}
149
150impl BorshSerialize for Node {
152 fn serialize<W: std::io::Write>(
153 &self,
154 writer: &mut W,
155 ) -> std::io::Result<()> {
156 BorshSerialize::serialize(&self.owned_subjects, writer)?;
158 BorshSerialize::serialize(&self.known_subjects, writer)?;
159 BorshSerialize::serialize(&self.transfer_subjects, writer)?;
160 BorshSerialize::serialize(&self.reject_subjects, writer)?;
161 Ok(())
162 }
163}
164
165impl BorshDeserialize for Node {
166 fn deserialize_reader<R: std::io::Read>(
167 reader: &mut R,
168 ) -> std::io::Result<Self> {
169 let owned_subjects =
171 HashMap::<DigestIdentifier, SubjectData>::deserialize_reader(
172 reader,
173 )?;
174 let known_subjects =
175 HashMap::<DigestIdentifier, SubjectData>::deserialize_reader(
176 reader,
177 )?;
178 let transfer_subjects =
179 HashMap::<DigestIdentifier, TransferData>::deserialize_reader(
180 reader,
181 )?;
182 let reject_subjects =
183 HashSet::<DigestIdentifier>::deserialize_reader(reader)?;
184
185 let owner = KeyPair::default();
188 let our_key = Arc::new(PublicKey::default());
189 let hash = None;
190
191 Ok(Self {
192 hash,
193 our_key,
194 owner,
195 owned_subjects,
196 known_subjects,
197 transfer_subjects,
198 reject_subjects,
199 is_service: false,
200 })
201 }
202}
203
204impl Node {
205 fn get_subject_data(
206 &self,
207 subject_id: &DigestIdentifier,
208 ) -> Option<SubjectData> {
209 self.owned_subjects
210 .get(subject_id)
211 .or_else(|| self.known_subjects.get(subject_id))
212 .cloned()
213 }
214
215 pub fn transfer_subject(&mut self, data: TransferSubject) {
217 if data.new_owner == *self.our_key {
218 self.reject_subjects.remove(&data.subject_id);
219 }
220
221 self.transfer_subjects.insert(
222 data.subject_id,
223 TransferData {
224 name: data.name,
225 new_owner: data.new_owner,
226 actual_owner: data.actual_owner,
227 },
228 );
229 }
230
231 pub fn delete_transfer(&mut self, subject_id: &DigestIdentifier) {
232 if let Some(data) = self.transfer_subjects.remove(subject_id)
233 && data.actual_owner == *self.our_key
234 {
235 self.reject_subjects.insert(subject_id.clone());
236 }
237 }
238
239 pub fn confirm_transfer(&mut self, subject_id: DigestIdentifier) {
240 self.our_key.to_string();
241
242 if let Some(data) = self.transfer_subjects.remove(&subject_id) {
243 if data.actual_owner == *self.our_key {
244 if let Some(data) = self.owned_subjects.remove(&subject_id) {
245 self.known_subjects.insert(subject_id, data);
246 }
247 } else if data.new_owner == *self.our_key
248 && let Some(data) = self.known_subjects.remove(&subject_id)
249 {
250 self.owned_subjects.insert(subject_id, data);
251 };
252 };
253 }
254
255 pub fn eol(&mut self, subject_id: DigestIdentifier, i_owner: bool) {
256 if i_owner {
257 if let Some(data) = self.owned_subjects.get_mut(&subject_id) {
258 data.eol();
259 }
260 } else if let Some(data) = self.known_subjects.get_mut(&subject_id) {
261 data.eol();
262 }
263 }
264
265 pub fn register_subject(
266 &mut self,
267 subject_id: DigestIdentifier,
268 owner: PublicKey,
269 data: SubjectData,
270 ) {
271 if *self.our_key == owner {
272 self.owned_subjects.insert(subject_id, data);
273 } else {
274 self.known_subjects.insert(subject_id, data);
275 }
276 }
277
278 pub fn delete_subject(&mut self, subject_id: &DigestIdentifier) {
279 self.owned_subjects
280 .remove(subject_id)
281 .or_else(|| self.known_subjects.remove(subject_id));
282
283 self.transfer_subjects.remove(subject_id);
284 self.reject_subjects.remove(subject_id);
285 }
286
287 fn governance_trackers(
288 &self,
289 governance_id: &DigestIdentifier,
290 ) -> Vec<DigestIdentifier> {
291 self.owned_subjects
292 .iter()
293 .chain(self.known_subjects.iter())
294 .filter_map(|(subject_id, data)| match data {
295 SubjectData::Tracker {
296 governance_id: tracker_governance_id,
297 ..
298 } if tracker_governance_id == governance_id => {
299 Some(subject_id.clone())
300 }
301 _ => None,
302 })
303 .collect()
304 }
305
306 fn sign<T: BorshSerialize>(
307 &self,
308 content: &T,
309 ) -> Result<Signature, ActorError> {
310 Signature::new(content, &self.owner).map_err(|e| {
311 ActorError::Functional {
312 description: format!("{}", e),
313 }
314 })
315 }
316
317 async fn build_compilation_dir(
318 ctx: &ActorContext<Self>,
319 ) -> Result<(), ActorError> {
320 let contracts_path = if let Some(config) =
321 ctx.system().get_helper::<ConfigHelper>("config").await
322 {
323 config.contracts_path
324 } else {
325 error!("Config helper not found");
326 return Err(ActorError::Helper {
327 name: "config".to_owned(),
328 reason: "Not found".to_string(),
329 });
330 };
331
332 let dir = contracts_path.join("contracts");
333
334 if !Path::new(&dir).exists() {
335 fs::create_dir_all(&dir).await.map_err(|e| {
336 error!(
337 error = %e,
338 path = ?dir,
339 "Failed to create contracts directory"
340 );
341 ActorError::FunctionalCritical {
342 description: format!("Can not create contracts dir: {}", e),
343 }
344 })?;
345 }
346 Ok(())
347 }
348
349 async fn create_distributors(
350 &self,
351 ctx: &mut ActorContext<Self>,
352 network: &Arc<NetworkSender>,
353 ) -> Result<(), ActorError> {
354 for subject in self.owned_subjects.keys() {
355 let distributor_name = format!("distributor_{}", subject);
356 if ctx
357 .get_child::<DistriWorker>(&distributor_name)
358 .await
359 .is_err()
360 {
361 ctx.create_child(
362 &distributor_name,
363 DistriWorker {
364 our_key: self.our_key.clone(),
365 network: network.clone(),
366 },
367 )
368 .await?;
369 }
370 }
371
372 for subject in self.known_subjects.keys() {
373 let distributor_name = format!("distributor_{}", subject);
374 if ctx
375 .get_child::<DistriWorker>(&distributor_name)
376 .await
377 .is_err()
378 {
379 ctx.create_child(
380 &distributor_name,
381 DistriWorker {
382 our_key: self.our_key.clone(),
383 network: network.clone(),
384 },
385 )
386 .await?;
387 }
388 }
389
390 Ok(())
391 }
392
393 fn governance_ids(&self) -> Vec<DigestIdentifier> {
394 let mut governance_ids = self
395 .owned_subjects
396 .iter()
397 .filter(|(_, data)| matches!(data, SubjectData::Governance { .. }))
398 .map(|(subject_id, _)| subject_id.clone())
399 .collect::<HashSet<_>>();
400
401 governance_ids.extend(
402 self.known_subjects
403 .iter()
404 .filter(|(_, data)| {
405 matches!(data, SubjectData::Governance { .. })
406 })
407 .map(|(subject_id, _)| subject_id.clone()),
408 );
409
410 governance_ids.into_iter().collect()
411 }
412
413 async fn get_tracker_ledger_batch(
414 ctx: &ActorContext<Self>,
415 actor: &ave_actors::ActorRef<Tracker>,
416 lo_sn: Option<u64>,
417 hi_sn: u64,
418 ) -> Result<(Vec<SignedLedger>, bool), ActorError> {
419 let response = actor
420 .ask(TrackerMessage::GetLedger { lo_sn, hi_sn })
421 .await?;
422
423 match response {
424 TrackerResponse::Ledger { ledger, is_all } => Ok((ledger, is_all)),
425 _ => Err(ActorError::UnexpectedResponse {
426 path: ctx.path().clone() / "subject_manager",
427 expected: "TrackerResponse::Ledger".to_owned(),
428 }),
429 }
430 }
431
432 async fn get_governance_ledger_batch(
433 ctx: &ActorContext<Self>,
434 actor: &ave_actors::ActorRef<Governance>,
435 lo_sn: Option<u64>,
436 hi_sn: u64,
437 ) -> Result<(Vec<SignedLedger>, bool), ActorError> {
438 let response = actor
439 .ask(GovernanceMessage::GetLedger { lo_sn, hi_sn })
440 .await?;
441
442 match response {
443 GovernanceResponse::Ledger { ledger, is_all } => {
444 Ok((ledger, is_all))
445 }
446 _ => Err(ActorError::UnexpectedResponse {
447 path: ctx.path().clone() / "subject_manager",
448 expected: "GovernanceResponse::Ledger".to_owned(),
449 }),
450 }
451 }
452
453 async fn collect_tracker_ledger(
454 ctx: &ActorContext<Self>,
455 subject_id: &DigestIdentifier,
456 hi_sn: u64,
457 ) -> Result<Vec<SignedLedger>, ActorError> {
458 let path = ActorPath::from(format!(
459 "/user/node/subject_manager/{}",
460 subject_id
461 ));
462 let tracker = ctx.system().get_actor::<Tracker>(&path).await?;
463 let mut ledger = Vec::new();
464 let mut lo_sn = None;
465
466 loop {
467 let (mut batch, is_all) =
468 Self::get_tracker_ledger_batch(ctx, &tracker, lo_sn, hi_sn)
469 .await?;
470 if batch.is_empty() {
471 break;
472 }
473 lo_sn = batch.last().map(|event| event.content().sn);
474 ledger.append(&mut batch);
475 if is_all {
476 break;
477 }
478 }
479
480 Ok(ledger)
481 }
482
483 async fn collect_governance_ledger(
484 ctx: &ActorContext<Self>,
485 subject_id: &DigestIdentifier,
486 hi_sn: u64,
487 ) -> Result<Vec<SignedLedger>, ActorError> {
488 let path = ActorPath::from(format!(
489 "/user/node/subject_manager/{}",
490 subject_id
491 ));
492 let governance = ctx.system().get_actor::<Governance>(&path).await?;
493 let mut ledger = Vec::new();
494 let mut lo_sn = None;
495
496 loop {
497 let (mut batch, is_all) = Self::get_governance_ledger_batch(
498 ctx,
499 &governance,
500 lo_sn,
501 hi_sn,
502 )
503 .await?;
504 if batch.is_empty() {
505 break;
506 }
507 lo_sn = batch.last().map(|event| event.content().sn);
508 ledger.append(&mut batch);
509 if is_all {
510 break;
511 }
512 }
513
514 Ok(ledger)
515 }
516
517 async fn replay_sink_events(
518 &self,
519 ctx: &ActorContext<Self>,
520 subject_id: DigestIdentifier,
521 from_sn: u64,
522 to_sn: Option<u64>,
523 limit: u64,
524 ) -> Result<SinkEventsPage, ActorError> {
525 if limit == 0 {
526 return Err(ActorError::Functional {
527 description: "Replay limit must be greater than zero"
528 .to_owned(),
529 });
530 }
531 if let Some(to_sn) = to_sn
532 && from_sn > to_sn
533 {
534 return Err(ActorError::Functional {
535 description: "Replay range requires from_sn <= to_sn"
536 .to_owned(),
537 });
538 }
539
540 let Some(subject_data) = self
541 .owned_subjects
542 .get(&subject_id)
543 .cloned()
544 .or_else(|| self.known_subjects.get(&subject_id).cloned())
545 else {
546 return Err(ActorError::NotFound {
547 path: ActorPath::from(format!(
548 "/user/node/subject_manager/{}",
549 subject_id
550 )),
551 });
552 };
553
554 let sink_timestamp = ave_common::identity::TimeStamp::now().as_nanos();
555 let public_key = self.our_key.to_string();
556
557 match subject_data {
558 SubjectData::Governance { .. } => {
559 let path = ActorPath::from(format!(
560 "/user/node/subject_manager/{}",
561 subject_id
562 ));
563 let governance =
564 ctx.system().get_actor::<Governance>(&path).await?;
565 let response =
566 governance.ask(GovernanceMessage::GetMetadata).await?;
567 let GovernanceResponse::Metadata(metadata) = response else {
568 return Err(ActorError::UnexpectedResponse {
569 path,
570 expected: "GovernanceResponse::Metadata".to_owned(),
571 });
572 };
573 let hi_sn = to_sn
574 .map(|to_sn| to_sn.min(metadata.sn))
575 .unwrap_or(metadata.sn);
576 let ledger =
577 Self::collect_governance_ledger(ctx, &subject_id, hi_sn)
578 .await?;
579 replay_ledgers_to_sink_events(
580 &ledger,
581 &public_key,
582 from_sn,
583 to_sn,
584 limit,
585 sink_timestamp,
586 )
587 }
588 SubjectData::Tracker { .. } => {
589 let subject_manager =
590 ctx.get_child::<SubjectManager>("subject_manager").await?;
591 let requester = format!(
592 "node_replay_sink_events:{}:{}:{}",
593 subject_id, from_sn, limit
594 );
595 subject_manager
596 .ask(SubjectManagerMessage::Up {
597 subject_id: subject_id.clone(),
598 requester: requester.clone(),
599 create_ledger: None,
600 })
601 .await?;
602
603 let result = async {
604 let path = ActorPath::from(format!(
605 "/user/node/subject_manager/{}",
606 subject_id
607 ));
608 let tracker =
609 ctx.system().get_actor::<Tracker>(&path).await?;
610 let response =
611 tracker.ask(TrackerMessage::GetMetadata).await?;
612 let TrackerResponse::Metadata(metadata) = response else {
613 return Err(ActorError::UnexpectedResponse {
614 path,
615 expected: "TrackerResponse::Metadata".to_owned(),
616 });
617 };
618 let hi_sn = to_sn
619 .map(|to_sn| to_sn.min(metadata.sn))
620 .unwrap_or(metadata.sn);
621 let ledger =
622 Self::collect_tracker_ledger(ctx, &subject_id, hi_sn)
623 .await?;
624 replay_ledgers_to_sink_events(
625 &ledger,
626 &public_key,
627 from_sn,
628 to_sn,
629 limit,
630 sink_timestamp,
631 )
632 }
633 .await;
634
635 let _ = subject_manager
636 .ask(SubjectManagerMessage::Finish {
637 subject_id,
638 requester,
639 })
640 .await;
641
642 result
643 }
644 }
645 }
646}
647
648#[derive(Clone, Debug, Serialize, Deserialize)]
650pub enum NodeMessage {
651 GetGovernances,
652 GetSinkEvents {
653 subject_id: DigestIdentifier,
654 from_sn: u64,
655 to_sn: Option<u64>,
656 limit: u64,
657 },
658 SignRequest(Box<SignTypesNode>),
659 PendingTransfers,
660 RegisterSubject {
661 owner: PublicKey,
662 subject_id: DigestIdentifier,
663 data: SubjectData,
664 },
665 GetSubjectData(DigestIdentifier),
666 GovernanceTrackers(DigestIdentifier),
667 DeleteSubject(DigestIdentifier),
668 IOwnerNewOwnerSubject(DigestIdentifier),
669 ICanSendLastLedger(DigestIdentifier),
670 AuthData(DigestIdentifier),
671 TransferSubject(TransferSubject),
672 RejectTransfer(DigestIdentifier),
673 ConfirmTransfer(DigestIdentifier),
674 EOLSubject {
675 subject_id: DigestIdentifier,
676 i_owner: bool,
677 },
678}
679
680impl Message for NodeMessage {
681 fn is_critical(&self) -> bool {
682 matches!(
683 self,
684 Self::TransferSubject(..)
685 | Self::DeleteSubject(..)
686 | Self::RejectTransfer(..)
687 | Self::ConfirmTransfer(..)
688 | Self::EOLSubject { .. }
689 )
690 }
691}
692
693#[derive(Clone, Debug, Serialize, Deserialize)]
695pub enum NodeResponse {
696 Governances(Vec<DigestIdentifier>),
697 SinkEvents(SinkEventsPage),
698 SubjectData(Option<SubjectData>),
699 GovernanceTrackers(Vec<DigestIdentifier>),
700 PendingTransfers(Vec<TransferSubject>),
701 SignRequest(Signature),
702 IOwnerNewOwner {
703 i_owner: bool,
704 i_new_owner: Option<bool>,
705 },
706 AuthData {
707 auth: bool,
708 subject_data: Option<SubjectData>,
709 },
710 Ok,
711}
712
713impl Response for NodeResponse {}
714
715#[derive(
717 Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
718)]
719pub enum NodeEvent {
720 RegisterSubject {
721 owner: PublicKey,
722 subject_id: DigestIdentifier,
723 data: SubjectData,
724 },
725 DeleteSubject(DigestIdentifier),
726 TransferSubject(TransferSubject),
727 RejectTransfer(DigestIdentifier),
728 ConfirmTransfer(DigestIdentifier),
729 EOLSubject {
730 subject_id: DigestIdentifier,
731 i_owner: bool,
732 },
733}
734
735impl Event for NodeEvent {}
736
737#[async_trait]
738impl Actor for Node {
739 type Event = NodeEvent;
740 type Message = NodeMessage;
741 type Response = NodeResponse;
742
743 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
744 parent_span.map_or_else(
745 || info_span!("Node"),
746 |parent_span| info_span!(parent: parent_span, "Node"),
747 )
748 }
749
750 async fn pre_start(
751 &mut self,
752 ctx: &mut ave_actors::ActorContext<Self>,
753 ) -> Result<(), ActorError> {
754 if let Err(e) = Self::build_compilation_dir(ctx).await {
755 error!(
756 error = %e,
757 "Failed to build compilation directory"
758 );
759 return Err(e);
760 }
761
762 if let Err(e) = self.init_store("node", None, true, ctx).await {
764 error!(
765 error = %e,
766 "Failed to initialize node store"
767 );
768 return Err(e);
769 }
770
771 let Some(config) =
772 ctx.system().get_helper::<ConfigHelper>("config").await
773 else {
774 error!("Config helper not found");
775 return Err(ActorError::Helper {
776 name: "config".to_string(),
777 reason: "Not found".to_string(),
778 });
779 };
780 let safe_mode = config.safe_mode;
781 let Some(network): Option<Arc<NetworkSender>> =
782 ctx.system().get_helper("network").await
783 else {
784 error!("Network helper not found");
785 return Err(ActorError::Helper {
786 name: "network".to_string(),
787 reason: "Not found".to_string(),
788 });
789 };
790
791 if !safe_mode {
792 let register_actor =
793 match ctx.create_child("register", Register).await {
794 Ok(actor) => actor,
795 Err(e) => {
796 error!(error = %e, "Failed to create register child");
797 return Err(e);
798 }
799 };
800
801 let Some(ext_db): Option<Arc<ExternalDB>> =
802 ctx.system().get_helper("ext_db").await
803 else {
804 error!("External DB helper not found");
805 return Err(ActorError::Helper {
806 name: "ext_db".to_string(),
807 reason: "Not found".to_string(),
808 });
809 };
810
811 let sink =
812 Sink::new(register_actor.subscribe(), ext_db.get_register());
813 ctx.system().run_sink(sink).await;
814
815 if let Err(e) = ctx
816 .create_child(
817 "manual_distribution",
818 ManualDistribution::new(self.our_key.clone()),
819 )
820 .await
821 {
822 error!(
823 error = %e,
824 "Failed to create manual_distribution child"
825 );
826 return Err(e);
827 }
828
829 if let Err(e) = self.create_distributors(ctx, &network).await {
830 error!(
831 error = %e,
832 "Failed to create distributors"
833 );
834 return Err(e);
835 }
836 }
837
838 let Some(hash) = self.hash else {
839 error!("Hash is None during subject manager startup");
840 return Err(ActorError::FunctionalCritical {
841 description: "Hash is None".to_string(),
842 });
843 };
844
845 let subject_manager = match ctx
846 .create_child(
847 "subject_manager",
848 SubjectManager::new(
849 self.our_key.clone(),
850 hash,
851 self.is_service,
852 ),
853 )
854 .await
855 {
856 Ok(actor) => actor,
857 Err(e) => {
858 error!(error = %e, "Failed to create subject_manager child");
859 return Err(e);
860 }
861 };
862
863 if let Err(e) = subject_manager
864 .ask(SubjectManagerMessage::UpGovernances {
865 governance_ids: self.governance_ids(),
866 })
867 .await
868 {
869 error!(error = %e, "Failed to bootstrap governances");
870 return Err(e);
871 }
872
873 if let Err(e) = ctx
874 .create_child(
875 "auth",
876 Auth::initial((network.clone(), self.our_key.clone())),
877 )
878 .await
879 {
880 error!(
881 error = %e,
882 "Failed to create auth child"
883 );
884 return Err(e);
885 }
886
887 if !safe_mode
888 && let Err(e) = ctx
889 .create_child(
890 "distributor",
891 DistriWorker {
892 our_key: self.our_key.clone(),
893 network,
894 },
895 )
896 .await
897 {
898 error!(
899 error = %e,
900 "Failed to create distributor child"
901 );
902 return Err(e);
903 }
904
905 Ok(())
906 }
907}
908
909#[async_trait]
910impl Handler<Self> for Node {
911 async fn handle_message(
912 &mut self,
913 _sender: ActorPath,
914 msg: NodeMessage,
915 ctx: &mut ave_actors::ActorContext<Self>,
916 ) -> Result<NodeResponse, ActorError> {
917 match msg {
918 NodeMessage::GetSinkEvents {
919 subject_id,
920 from_sn,
921 to_sn,
922 limit,
923 } => Ok(NodeResponse::SinkEvents(
924 self.replay_sink_events(ctx, subject_id, from_sn, to_sn, limit)
925 .await?,
926 )),
927 NodeMessage::RegisterSubject {
928 owner,
929 subject_id,
930 data,
931 } => {
932 let Some(network): Option<Arc<NetworkSender>> =
933 ctx.system().get_helper("network").await
934 else {
935 error!(
936 msg_type = "RegisterSubject",
937 subject_id = %subject_id,
938 "Network helper not found"
939 );
940 return Err(ActorError::Helper {
941 name: "network".to_string(),
942 reason: "Not found".to_string(),
943 });
944 };
945
946 self.on_event(
947 NodeEvent::RegisterSubject {
948 owner,
949 subject_id: subject_id.clone(),
950 data,
951 },
952 ctx,
953 )
954 .await;
955
956 let distributor_name = format!("distributor_{}", subject_id);
957 if ctx
958 .get_child::<DistriWorker>(&distributor_name)
959 .await
960 .is_err()
961 {
962 ctx.create_child(
963 &distributor_name,
964 DistriWorker {
965 our_key: self.our_key.clone(),
966 network,
967 },
968 )
969 .await?;
970 }
971
972 Ok(NodeResponse::Ok)
973 }
974 NodeMessage::EOLSubject {
975 subject_id,
976 i_owner,
977 } => {
978 self.on_event(
979 NodeEvent::EOLSubject {
980 subject_id: subject_id.clone(),
981 i_owner,
982 },
983 ctx,
984 )
985 .await;
986
987 debug!(
988 msg_type = "EOLSubject",
989 subject_id = %subject_id,
990 i_owner = %i_owner,
991 "EOL confirmed"
992 );
993
994 Ok(NodeResponse::Ok)
995 }
996 NodeMessage::GetGovernances => {
997 let mut gov_know = self
998 .known_subjects
999 .iter()
1000 .filter(|x| matches!(x.1, SubjectData::Governance { .. }))
1001 .map(|x| x.0.clone())
1002 .collect::<Vec<DigestIdentifier>>();
1003 let mut gov_owned = self
1004 .owned_subjects
1005 .iter()
1006 .filter(|x| matches!(x.1, SubjectData::Governance { .. }))
1007 .map(|x| x.0.clone())
1008 .collect::<Vec<DigestIdentifier>>();
1009 gov_know.append(&mut gov_owned);
1010
1011 return Ok(NodeResponse::Governances(gov_know));
1012 }
1013 NodeMessage::ICanSendLastLedger(subject_id) => {
1014 let subject_data = if self.reject_subjects.contains(&subject_id)
1015 {
1016 self.known_subjects.get(&subject_id).cloned()
1017 } else {
1018 self.owned_subjects.get(&subject_id).cloned()
1019 };
1020
1021 Ok(NodeResponse::SubjectData(subject_data))
1022 }
1023 NodeMessage::GetSubjectData(subject_id) => {
1024 let data = self.get_subject_data(&subject_id);
1025 if data.is_none() {
1026 debug!(
1027 msg_type = "GetSubjectData",
1028 subject_id = %subject_id,
1029 "Subject not found"
1030 );
1031 }
1032
1033 debug!(
1034 msg_type = "GetSubjectData",
1035 subject_id = %subject_id,
1036 "Subject data retrieved successfully"
1037 );
1038
1039 Ok(NodeResponse::SubjectData(data))
1040 }
1041 NodeMessage::GovernanceTrackers(governance_id) => {
1042 let trackers = self.governance_trackers(&governance_id);
1043
1044 debug!(
1045 msg_type = "GovernanceTrackers",
1046 governance_id = %governance_id,
1047 count = trackers.len(),
1048 "Governance tracker association check completed"
1049 );
1050
1051 Ok(NodeResponse::GovernanceTrackers(trackers))
1052 }
1053 NodeMessage::DeleteSubject(subject_id) => {
1054 self.on_event(
1055 NodeEvent::DeleteSubject(subject_id.clone()),
1056 ctx,
1057 )
1058 .await;
1059
1060 debug!(
1061 msg_type = "DeleteSubject",
1062 subject_id = %subject_id,
1063 "Subject deleted from node state"
1064 );
1065
1066 Ok(NodeResponse::Ok)
1067 }
1068 NodeMessage::PendingTransfers => {
1069 let transfers: Vec<TransferSubject> = self
1070 .transfer_subjects
1071 .iter()
1072 .map(|x| TransferSubject {
1073 name: x.1.name.clone(),
1074 subject_id: x.0.clone(),
1075 new_owner: x.1.new_owner.clone(),
1076 actual_owner: x.1.actual_owner.clone(),
1077 })
1078 .collect();
1079
1080 debug!(
1081 msg_type = "PendingTransfers",
1082 count = transfers.len(),
1083 "Retrieved pending transfers"
1084 );
1085
1086 Ok(NodeResponse::PendingTransfers(transfers))
1087 }
1088 NodeMessage::RejectTransfer(subject_id) => {
1089 self.on_event(
1090 NodeEvent::RejectTransfer(subject_id.clone()),
1091 ctx,
1092 )
1093 .await;
1094
1095 debug!(
1096 msg_type = "RejectTransfer",
1097 subject_id = %subject_id,
1098 "Transfer rejected successfully"
1099 );
1100
1101 Ok(NodeResponse::Ok)
1102 }
1103 NodeMessage::TransferSubject(data) => {
1104 let subject_id = data.subject_id.clone();
1105 self.on_event(NodeEvent::TransferSubject(data), ctx).await;
1106
1107 debug!(
1108 msg_type = "TransferSubject",
1109 subject_id = %subject_id,
1110 "Subject transfer registered successfully"
1111 );
1112
1113 Ok(NodeResponse::Ok)
1114 }
1115 NodeMessage::ConfirmTransfer(subject_id) => {
1116 self.on_event(
1117 NodeEvent::ConfirmTransfer(subject_id.clone()),
1118 ctx,
1119 )
1120 .await;
1121
1122 debug!(
1123 msg_type = "ConfirmTransfer",
1124 subject_id = %subject_id,
1125 "Transfer confirmed between other parties"
1126 );
1127
1128 Ok(NodeResponse::Ok)
1129 }
1130 NodeMessage::SignRequest(content) => {
1131 let content = *content;
1132 let content_type = match &content {
1133 SignTypesNode::EventRequest(_) => "EventRequest",
1134 SignTypesNode::ValidationReq(_) => "ValidationReq",
1135 SignTypesNode::ValidationRes(_) => "ValidationRes",
1136 SignTypesNode::EvaluationReq(_) => "EvaluationReq",
1137 SignTypesNode::EvaluationRes(_) => "EvaluationRes",
1138 SignTypesNode::ApprovalReq(_) => "ApprovalReq",
1139 SignTypesNode::ApprovalRes(_) => "ApprovalRes",
1140 SignTypesNode::Ledger(_) => "Ledger",
1141 };
1142
1143 let sign = match content {
1144 SignTypesNode::EventRequest(event_req) => {
1145 self.sign(&event_req)
1146 }
1147 SignTypesNode::ValidationReq(validation_req) => {
1148 self.sign(&*validation_req)
1149 }
1150 SignTypesNode::ValidationRes(validation_res) => {
1151 self.sign(&validation_res)
1152 }
1153 SignTypesNode::EvaluationReq(evaluation_req) => {
1154 self.sign(&evaluation_req)
1155 }
1156 SignTypesNode::EvaluationRes(evaluation_res) => {
1157 self.sign(&evaluation_res)
1158 }
1159 SignTypesNode::ApprovalReq(approval_req) => {
1160 self.sign(&approval_req)
1161 }
1162 SignTypesNode::ApprovalRes(approval_res) => {
1163 self.sign(&*approval_res)
1164 }
1165 SignTypesNode::Ledger(ledger) => self.sign(&ledger),
1166 }
1167 .map_err(|e| {
1168 error!(
1169 msg_type = "SignRequest",
1170 content_type = content_type,
1171 error = %e,
1172 "Failed to sign content"
1173 );
1174 ActorError::FunctionalCritical {
1175 description: format!("Can not sign event: {}", e),
1176 }
1177 })?;
1178
1179 debug!(
1180 msg_type = "SignRequest",
1181 content_type = content_type,
1182 "Content signed successfully"
1183 );
1184
1185 Ok(NodeResponse::SignRequest(sign))
1186 }
1187 NodeMessage::IOwnerNewOwnerSubject(subject_id) => {
1188 let i_owner =
1189 self.owned_subjects.keys().any(|x| *x == subject_id);
1190
1191 let i_new_owner = if let Some(data) =
1192 self.transfer_subjects.get(&subject_id)
1193 {
1194 Some(data.new_owner == *self.our_key)
1195 } else {
1196 None
1197 };
1198
1199 debug!(
1200 msg_type = "OwnerPendingSubject",
1201 subject_id = %subject_id,
1202 i_owner = i_owner,
1203 i_new_owner = i_new_owner,
1204 "Checked owner/pending status"
1205 );
1206
1207 Ok(NodeResponse::IOwnerNewOwner {
1208 i_owner,
1209 i_new_owner,
1210 })
1211 }
1212 NodeMessage::AuthData(subject_id) => {
1213 let authorized_subjects = match ctx
1214 .get_child::<Auth>("auth")
1215 .await
1216 {
1217 Ok(auth) => {
1218 let res = match auth.ask(AuthMessage::GetAuths).await {
1219 Ok(res) => res,
1220 Err(e) => {
1221 error!(
1222 msg_type = "AuthData",
1223 subject_id = %subject_id,
1224 error = %e,
1225 "Failed to get authorizations from auth actor"
1226 );
1227 ctx.system().crash_system();
1228 return Err(e);
1229 }
1230 };
1231 let AuthResponse::Auths { subjects } = res else {
1232 error!(
1233 msg_type = "AuthData",
1234 subject_id = %subject_id,
1235 "Unexpected response from auth actor"
1236 );
1237 ctx.system().crash_system();
1238 return Err(ActorError::UnexpectedResponse {
1239 expected: "AuthResponse::Auths".to_owned(),
1240 path: ctx.path().clone() / "auth",
1241 });
1242 };
1243 subjects
1244 }
1245 Err(e) => {
1246 error!(
1247 msg_type = "AuthData",
1248 subject_id = %subject_id,
1249 "Auth actor not found"
1250 );
1251 ctx.system().crash_system();
1252 return Err(e);
1253 }
1254 };
1255
1256 let auth_subj =
1257 authorized_subjects.iter().any(|x| x.clone() == subject_id);
1258
1259 let subj_data = self
1260 .known_subjects
1261 .get(&subject_id)
1262 .or_else(|| self.owned_subjects.get(&subject_id))
1263 .cloned();
1264
1265 debug!(
1266 msg_type = "AuthData",
1267 subject_id = %subject_id,
1268 authorized = auth_subj,
1269 subject_data = ?subj_data,
1270 "Checked subject authorization status"
1271 );
1272
1273 Ok(NodeResponse::AuthData {
1274 auth: auth_subj,
1275 subject_data: subj_data,
1276 })
1277 }
1278 }
1279 }
1280
1281 async fn on_child_fault(
1282 &mut self,
1283 error: ActorError,
1284 ctx: &mut ActorContext<Self>,
1285 ) -> ChildAction {
1286 error!(
1287 error = %error,
1288 "Child actor fault, stopping system"
1289 );
1290 ctx.system().crash_system();
1291 ChildAction::Stop
1292 }
1293
1294 async fn on_event(
1295 &mut self,
1296 event: NodeEvent,
1297 ctx: &mut ActorContext<Self>,
1298 ) {
1299 if let Err(e) = self.persist(&event, ctx).await {
1300 error!(
1301 event = ?event,
1302 error = %e,
1303 "Failed to persist node event"
1304 );
1305 ctx.system().crash_system();
1306 }
1307 }
1308}
1309
1310pub struct InitParamsNode {
1311 pub key_pair: KeyPair,
1312 pub public_key: Arc<PublicKey>,
1313 pub hash: HashAlgorithm,
1314 pub is_service: bool,
1315}
1316
1317#[async_trait]
1318impl PersistentActor for Node {
1319 type Persistence = LightPersistence;
1320 type InitParams = InitParamsNode;
1321
1322 fn update(&mut self, state: Self) {
1323 self.owned_subjects = state.owned_subjects;
1324 self.known_subjects = state.known_subjects;
1325 self.transfer_subjects = state.transfer_subjects;
1326 self.reject_subjects = state.reject_subjects
1327 }
1328
1329 fn create_initial(params: Self::InitParams) -> Self {
1330 Self {
1331 hash: Some(params.hash),
1332 owner: params.key_pair,
1333 our_key: params.public_key,
1334 is_service: params.is_service,
1335 owned_subjects: HashMap::new(),
1336 known_subjects: HashMap::new(),
1337 transfer_subjects: HashMap::new(),
1338 reject_subjects: HashSet::new(),
1339 }
1340 }
1341
1342 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
1344 match event {
1345 NodeEvent::EOLSubject {
1346 subject_id,
1347 i_owner,
1348 } => {
1349 self.eol(subject_id.clone(), *i_owner);
1350 debug!(
1351 event_type = "EOLSubject",
1352 subject_id = %subject_id,
1353 i_owner = &i_owner,
1354 "Applied eol"
1355 );
1356 }
1357 NodeEvent::ConfirmTransfer(subject_id) => {
1358 self.confirm_transfer(subject_id.clone());
1359 debug!(
1360 event_type = "ConfirmTransfer",
1361 subject_id = %subject_id,
1362 "Applied transfer confirmation"
1363 );
1364 }
1365 NodeEvent::RegisterSubject {
1366 subject_id,
1367 data,
1368 owner,
1369 } => {
1370 self.register_subject(
1371 subject_id.clone(),
1372 owner.clone(),
1373 data.clone(),
1374 );
1375 debug!(
1376 event_type = "RegisterSubject",
1377 subject_id = %subject_id,
1378 owner = %owner,
1379 "Applied subject registration"
1380 );
1381 }
1382 NodeEvent::DeleteSubject(subject_id) => {
1383 self.delete_subject(subject_id);
1384 debug!(
1385 event_type = "DeleteSubject",
1386 subject_id = %subject_id,
1387 "Applied subject deletion"
1388 );
1389 }
1390 NodeEvent::RejectTransfer(subject_id) => {
1391 self.delete_transfer(subject_id);
1392 debug!(
1393 event_type = "RejectTransfer",
1394 subject_id = %subject_id,
1395 "Applied transfer rejection"
1396 );
1397 }
1398 NodeEvent::TransferSubject(transfer) => {
1399 self.transfer_subject(transfer.clone());
1400 debug!(
1401 event_type = "TransferSubject",
1402 subject_id = %transfer.subject_id,
1403 new_owner = %transfer.new_owner,
1404 "Applied subject transfer"
1405 );
1406 }
1407 };
1408
1409 Ok(())
1410 }
1411}
1412
1413#[async_trait]
1414impl Storable for Node {}