1use std::{
5 collections::{HashMap, HashSet},
6 path::Path,
7 sync::Arc,
8};
9
10use borsh::{BorshDeserialize, BorshSerialize};
11use register::Register;
12use tokio::fs;
13use tracing::{Span, debug, error, info_span};
14
15use crate::{
16 auth::{Auth, AuthInitParams, AuthMessage, AuthResponse},
17 db::Storable,
18 distribution::worker::DistriWorker,
19 governance::{Governance, GovernanceMessage, GovernanceResponse},
20 helpers::{db::ExternalDB, network::service::NetworkSender},
21 manual_distribution::ManualDistribution,
22 model::{common::node::SignTypesNode, event::Ledger},
23 node::subject_manager::{SubjectManager, SubjectManagerMessage},
24 subject::replay_sink_events as replay_ledgers_to_sink_events,
25 system::ConfigHelper,
26 tracker::{Tracker, TrackerMessage, TrackerResponse},
27};
28
29use ave_common::{
30 SchemaType,
31 identity::{
32 DigestIdentifier, HashAlgorithm, PublicKey, Signature, keys::KeyPair,
33 },
34 response::SinkEventsPage,
35};
36
37use async_trait::async_trait;
38use ave_actors::{
39 Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
40 Message, Response, Sink,
41};
42use ave_actors::{LightPersistence, PersistentActor};
43use serde::{Deserialize, Serialize};
44
45pub mod register;
46pub mod subject_manager;
47
/// A subject ownership transfer together with the id of the subject it
/// applies to.
///
/// Carried by [`NodeMessage::TransferSubject`] and
/// [`NodeEvent::TransferSubject`], and returned in
/// [`NodeResponse::PendingTransfers`].
#[derive(
    Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
)]
pub struct TransferSubject {
    /// Optional name attached to the transfer.
    /// NOTE(review): presumably the subject's name — confirm with the caller.
    pub name: Option<String>,
    /// Identifier of the subject being transferred.
    pub subject_id: DigestIdentifier,
    /// Key that will own the subject if the transfer is confirmed.
    pub new_owner: PublicKey,
    /// Key that owns the subject while the transfer is pending.
    pub actual_owner: PublicKey,
}
57
/// Pending-transfer details stored by [`Node`], keyed by subject id.
///
/// Same fields as [`TransferSubject`] minus `subject_id`, which is the map
/// key in `Node::transfer_subjects`.
#[derive(
    Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
)]
pub struct TransferData {
    /// Optional name attached to the transfer.
    pub name: Option<String>,
    /// Key that will own the subject if the transfer is confirmed.
    pub new_owner: PublicKey,
    /// Key that owns the subject while the transfer is pending.
    pub actual_owner: PublicKey,
}
66
/// Per-subject bookkeeping kept by the node for every subject it owns or
/// knows about.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    BorshSerialize,
    BorshDeserialize,
    Eq,
    PartialEq,
)]
pub enum SubjectData {
    /// A tracker subject, which belongs to a governance.
    Tracker {
        /// Identifier of the governance this tracker is associated with.
        governance_id: DigestIdentifier,
        /// Schema of the tracker.
        schema_id: SchemaType,
        /// Namespace of the tracker.
        namespace: String,
        /// `false` once `eol` has been called on this entry.
        active: bool,
    },
    /// A governance subject.
    Governance {
        /// `false` once `eol` has been called on this entry.
        active: bool,
    },
}
88
89impl SubjectData {
90 pub fn get_schema_id(&self) -> SchemaType {
91 match self {
92 Self::Tracker { schema_id, .. } => schema_id.clone(),
93 Self::Governance { .. } => SchemaType::Governance,
94 }
95 }
96
97 pub fn get_governance_id(&self) -> Option<DigestIdentifier> {
98 match self {
99 Self::Tracker { governance_id, .. } => Some(governance_id.clone()),
100 Self::Governance { .. } => None,
101 }
102 }
103
104 pub fn get_namespace(&self) -> String {
105 match self {
106 Self::Tracker { namespace, .. } => namespace.clone(),
107 Self::Governance { .. } => String::default(),
108 }
109 }
110
111 pub const fn get_active(&self) -> bool {
112 match self {
113 Self::Tracker { active, .. } => *active,
114 Self::Governance { active } => *active,
115 }
116 }
117
118 pub const fn eol(&mut self) {
119 match self {
120 Self::Tracker { active, .. } => *active = false,
121 Self::Governance { active } => *active = false,
122 };
123 }
124}
125
/// Root node actor state.
///
/// The `#[serde(skip)]` fields are runtime configuration and are not part of
/// the persisted state; the hand-written Borsh impls for `Node` likewise
/// read and write only the four subject collections.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Node {
    /// Key pair used by `sign` to produce signatures.
    #[serde(skip)]
    owner: KeyPair,
    /// This node's public key; compared against subject owners to decide
    /// whether a subject is owned or merely known.
    #[serde(skip)]
    our_key: Arc<PublicKey>,
    /// Hash algorithm handed to the subject manager; `pre_start` fails when
    /// this is `None`.
    #[serde(skip)]
    hash: Option<HashAlgorithm>,
    /// Forwarded to `SubjectManager::new` in `pre_start`.
    #[serde(skip)]
    is_service: bool,
    /// Forwarded to `SubjectManager::new` in `pre_start`.
    #[serde(skip)]
    only_clear_events: bool,
    /// Page size used when collecting ledgers; also handed to every
    /// `DistriWorker` child.
    #[serde(skip)]
    ledger_batch_size: u64,
    /// Subjects registered with an owner equal to `our_key`.
    owned_subjects: HashMap<DigestIdentifier, SubjectData>,
    /// Subjects registered with any other owner.
    known_subjects: HashMap<DigestIdentifier, SubjectData>,

    /// Pending transfers, keyed by subject id.
    transfer_subjects: HashMap<DigestIdentifier, TransferData>,

    /// Subjects whose pending transfer to this node was removed through
    /// `delete_transfer`.
    reject_subjects: HashSet<DigestIdentifier>,
}
151
152impl BorshSerialize for Node {
154 fn serialize<W: std::io::Write>(
155 &self,
156 writer: &mut W,
157 ) -> std::io::Result<()> {
158 BorshSerialize::serialize(&self.owned_subjects, writer)?;
160 BorshSerialize::serialize(&self.known_subjects, writer)?;
161 BorshSerialize::serialize(&self.transfer_subjects, writer)?;
162 BorshSerialize::serialize(&self.reject_subjects, writer)?;
163 Ok(())
164 }
165}
166
167impl BorshDeserialize for Node {
168 fn deserialize_reader<R: std::io::Read>(
169 reader: &mut R,
170 ) -> std::io::Result<Self> {
171 let owned_subjects =
173 HashMap::<DigestIdentifier, SubjectData>::deserialize_reader(
174 reader,
175 )?;
176 let known_subjects =
177 HashMap::<DigestIdentifier, SubjectData>::deserialize_reader(
178 reader,
179 )?;
180 let transfer_subjects =
181 HashMap::<DigestIdentifier, TransferData>::deserialize_reader(
182 reader,
183 )?;
184 let reject_subjects =
185 HashSet::<DigestIdentifier>::deserialize_reader(reader)?;
186
187 let owner = KeyPair::default();
190 let our_key = Arc::new(PublicKey::default());
191 let hash = None;
192
193 Ok(Self {
194 hash,
195 our_key,
196 owner,
197 ledger_batch_size: 100,
198 owned_subjects,
199 known_subjects,
200 transfer_subjects,
201 reject_subjects,
202 is_service: false,
203 only_clear_events: false,
204 })
205 }
206}
207
208impl Node {
209 fn get_subject_data(
210 &self,
211 subject_id: &DigestIdentifier,
212 ) -> Option<SubjectData> {
213 self.owned_subjects
214 .get(subject_id)
215 .or_else(|| self.known_subjects.get(subject_id))
216 .cloned()
217 }
218
219 pub fn transfer_subject(&mut self, data: TransferSubject) {
221 if data.new_owner == *self.our_key {
222 self.reject_subjects.remove(&data.subject_id);
223 }
224
225 self.transfer_subjects.insert(
226 data.subject_id,
227 TransferData {
228 name: data.name,
229 new_owner: data.new_owner,
230 actual_owner: data.actual_owner,
231 },
232 );
233 }
234
235 pub fn delete_transfer(&mut self, subject_id: &DigestIdentifier) {
236 if let Some(data) = self.transfer_subjects.remove(subject_id)
237 && data.new_owner == *self.our_key
238 {
239 self.reject_subjects.insert(subject_id.clone());
240 }
241 }
242
243 pub fn confirm_transfer(&mut self, subject_id: DigestIdentifier) {
244 self.our_key.to_string();
245
246 if let Some(data) = self.transfer_subjects.remove(&subject_id) {
247 if data.actual_owner == *self.our_key {
248 if let Some(data) = self.owned_subjects.remove(&subject_id) {
249 self.known_subjects.insert(subject_id, data);
250 }
251 } else if data.new_owner == *self.our_key
252 && let Some(data) = self.known_subjects.remove(&subject_id)
253 {
254 self.owned_subjects.insert(subject_id, data);
255 };
256 };
257 }
258
259 pub fn eol(&mut self, subject_id: DigestIdentifier, i_owner: bool) {
260 if i_owner {
261 if let Some(data) = self.owned_subjects.get_mut(&subject_id) {
262 data.eol();
263 }
264 } else if let Some(data) = self.known_subjects.get_mut(&subject_id) {
265 data.eol();
266 }
267 }
268
269 pub fn register_subject(
270 &mut self,
271 subject_id: DigestIdentifier,
272 owner: PublicKey,
273 data: SubjectData,
274 ) {
275 if *self.our_key == owner {
276 self.owned_subjects.insert(subject_id, data);
277 } else {
278 self.known_subjects.insert(subject_id, data);
279 }
280 }
281
282 pub fn delete_subject(&mut self, subject_id: &DigestIdentifier) {
283 self.owned_subjects
284 .remove(subject_id)
285 .or_else(|| self.known_subjects.remove(subject_id));
286
287 self.transfer_subjects.remove(subject_id);
288 self.reject_subjects.remove(subject_id);
289 }
290
291 fn governance_trackers(
292 &self,
293 governance_id: &DigestIdentifier,
294 ) -> Vec<DigestIdentifier> {
295 self.owned_subjects
296 .iter()
297 .chain(self.known_subjects.iter())
298 .filter_map(|(subject_id, data)| match data {
299 SubjectData::Tracker {
300 governance_id: tracker_governance_id,
301 ..
302 } if tracker_governance_id == governance_id => {
303 Some(subject_id.clone())
304 }
305 _ => None,
306 })
307 .collect()
308 }
309
310 fn sign<T: BorshSerialize>(
311 &self,
312 content: &T,
313 ) -> Result<Signature, ActorError> {
314 Signature::new(content, &self.owner).map_err(|e| {
315 ActorError::Functional {
316 description: format!("{}", e),
317 }
318 })
319 }
320
321 async fn build_compilation_dir(
322 ctx: &ActorContext<Self>,
323 ) -> Result<(), ActorError> {
324 let contracts_path = if let Some(config) =
325 ctx.system().get_helper::<ConfigHelper>("config").await
326 {
327 config.contracts_path
328 } else {
329 error!("Config helper not found");
330 return Err(ActorError::Helper {
331 name: "config".to_owned(),
332 reason: "Not found".to_string(),
333 });
334 };
335
336 let dir = contracts_path.join("contracts");
337
338 if !Path::new(&dir).exists() {
339 fs::create_dir_all(&dir).await.map_err(|e| {
340 error!(
341 error = %e,
342 path = ?dir,
343 "Failed to create contracts directory"
344 );
345 ActorError::FunctionalCritical {
346 description: format!("Can not create contracts dir: {}", e),
347 }
348 })?;
349 }
350 Ok(())
351 }
352
353 async fn create_distributors(
354 &self,
355 ctx: &mut ActorContext<Self>,
356 network: &Arc<NetworkSender>,
357 ) -> Result<(), ActorError> {
358 for subject in self.owned_subjects.keys() {
359 let distributor_name = format!("distributor_{}", subject);
360 if ctx
361 .get_child::<DistriWorker>(&distributor_name)
362 .await
363 .is_err()
364 {
365 ctx.create_child(
366 &distributor_name,
367 DistriWorker {
368 our_key: self.our_key.clone(),
369 network: network.clone(),
370 ledger_batch_size: self.ledger_batch_size,
371 },
372 )
373 .await?;
374 }
375 }
376
377 for subject in self.known_subjects.keys() {
378 let distributor_name = format!("distributor_{}", subject);
379 if ctx
380 .get_child::<DistriWorker>(&distributor_name)
381 .await
382 .is_err()
383 {
384 ctx.create_child(
385 &distributor_name,
386 DistriWorker {
387 our_key: self.our_key.clone(),
388 network: network.clone(),
389 ledger_batch_size: self.ledger_batch_size,
390 },
391 )
392 .await?;
393 }
394 }
395
396 Ok(())
397 }
398
399 fn governance_ids(&self) -> Vec<DigestIdentifier> {
400 let mut governance_ids = self
401 .owned_subjects
402 .iter()
403 .filter(|(_, data)| matches!(data, SubjectData::Governance { .. }))
404 .map(|(subject_id, _)| subject_id.clone())
405 .collect::<HashSet<_>>();
406
407 governance_ids.extend(
408 self.known_subjects
409 .iter()
410 .filter(|(_, data)| {
411 matches!(data, SubjectData::Governance { .. })
412 })
413 .map(|(subject_id, _)| subject_id.clone()),
414 );
415
416 governance_ids.into_iter().collect()
417 }
418
419 async fn get_tracker_ledger_batch(
420 ctx: &ActorContext<Self>,
421 actor: &ave_actors::ActorRef<Tracker>,
422 lo_sn: Option<u64>,
423 hi_sn: u64,
424 ) -> Result<(Vec<Ledger>, bool), ActorError> {
425 let response = actor
426 .ask(TrackerMessage::GetLedger { lo_sn, hi_sn })
427 .await?;
428
429 match response {
430 TrackerResponse::Ledger { ledger, is_all } => Ok((ledger, is_all)),
431 _ => Err(ActorError::UnexpectedResponse {
432 path: ctx.path().clone() / "subject_manager",
433 expected: "TrackerResponse::Ledger".to_owned(),
434 }),
435 }
436 }
437
438 async fn get_governance_ledger_batch(
439 ctx: &ActorContext<Self>,
440 actor: &ave_actors::ActorRef<Governance>,
441 lo_sn: Option<u64>,
442 hi_sn: u64,
443 ) -> Result<(Vec<Ledger>, bool), ActorError> {
444 let response = actor
445 .ask(GovernanceMessage::GetLedger { lo_sn, hi_sn })
446 .await?;
447
448 match response {
449 GovernanceResponse::Ledger { ledger, is_all } => {
450 Ok((ledger, is_all))
451 }
452 _ => Err(ActorError::UnexpectedResponse {
453 path: ctx.path().clone() / "subject_manager",
454 expected: "GovernanceResponse::Ledger".to_owned(),
455 }),
456 }
457 }
458
459 async fn collect_tracker_ledger(
460 &self,
461 ctx: &ActorContext<Self>,
462 subject_id: &DigestIdentifier,
463 hi_sn: u64,
464 ) -> Result<Vec<Ledger>, ActorError> {
465 let path = ActorPath::from(format!(
466 "/user/node/subject_manager/{}",
467 subject_id
468 ));
469 let tracker = ctx.system().get_actor::<Tracker>(&path).await?;
470 let mut ledger = Vec::new();
471 let mut lo_sn = None;
472 let ledger_batch_size = self.ledger_batch_size;
473
474 loop {
475 let from_sn = lo_sn.map_or(0_u64, |sn: u64| sn.saturating_add(1));
476 let batch_hi_sn = from_sn
477 .saturating_add(ledger_batch_size)
478 .saturating_sub(1)
479 .min(hi_sn);
480 let (mut batch, is_all) = Self::get_tracker_ledger_batch(
481 ctx,
482 &tracker,
483 lo_sn,
484 batch_hi_sn,
485 )
486 .await?;
487 if batch.is_empty() {
488 break;
489 }
490 lo_sn = batch.last().map(|event| event.sn);
491 ledger.append(&mut batch);
492 if is_all {
493 break;
494 }
495 }
496
497 Ok(ledger)
498 }
499
500 async fn collect_governance_ledger(
501 &self,
502 ctx: &ActorContext<Self>,
503 subject_id: &DigestIdentifier,
504 hi_sn: u64,
505 ) -> Result<Vec<Ledger>, ActorError> {
506 let path = ActorPath::from(format!(
507 "/user/node/subject_manager/{}",
508 subject_id
509 ));
510 let governance = ctx.system().get_actor::<Governance>(&path).await?;
511 let mut ledger = Vec::new();
512 let mut lo_sn = None;
513 let ledger_batch_size = self.ledger_batch_size;
514
515 loop {
516 let from_sn = lo_sn.map_or(0_u64, |sn: u64| sn.saturating_add(1));
517 let batch_hi_sn = from_sn
518 .saturating_add(ledger_batch_size)
519 .saturating_sub(1)
520 .min(hi_sn);
521 let (mut batch, is_all) = Self::get_governance_ledger_batch(
522 ctx,
523 &governance,
524 lo_sn,
525 batch_hi_sn,
526 )
527 .await?;
528 if batch.is_empty() {
529 break;
530 }
531 lo_sn = batch.last().map(|event| event.sn);
532 ledger.append(&mut batch);
533 if is_all {
534 break;
535 }
536 }
537
538 Ok(ledger)
539 }
540
541 async fn replay_sink_events(
542 &self,
543 ctx: &ActorContext<Self>,
544 subject_id: DigestIdentifier,
545 from_sn: u64,
546 to_sn: Option<u64>,
547 limit: u64,
548 ) -> Result<SinkEventsPage, ActorError> {
549 if limit == 0 {
550 return Err(ActorError::Functional {
551 description: "Replay limit must be greater than zero"
552 .to_owned(),
553 });
554 }
555 if let Some(to_sn) = to_sn
556 && from_sn > to_sn
557 {
558 return Err(ActorError::Functional {
559 description: "Replay range requires from_sn <= to_sn"
560 .to_owned(),
561 });
562 }
563
564 let Some(subject_data) = self
565 .owned_subjects
566 .get(&subject_id)
567 .cloned()
568 .or_else(|| self.known_subjects.get(&subject_id).cloned())
569 else {
570 return Err(ActorError::NotFound {
571 path: ActorPath::from(format!(
572 "/user/node/subject_manager/{}",
573 subject_id
574 )),
575 });
576 };
577
578 let sink_timestamp = ave_common::identity::TimeStamp::now().as_nanos();
579 let public_key = self.our_key.to_string();
580
581 match subject_data {
582 SubjectData::Governance { .. } => {
583 let path = ActorPath::from(format!(
584 "/user/node/subject_manager/{}",
585 subject_id
586 ));
587 let governance =
588 ctx.system().get_actor::<Governance>(&path).await?;
589 let response =
590 governance.ask(GovernanceMessage::GetMetadata).await?;
591 let GovernanceResponse::Metadata(metadata) = response else {
592 return Err(ActorError::UnexpectedResponse {
593 path,
594 expected: "GovernanceResponse::Metadata".to_owned(),
595 });
596 };
597 let hi_sn = to_sn
598 .map(|to_sn| to_sn.min(metadata.sn))
599 .unwrap_or(metadata.sn);
600 let ledger = self
601 .collect_governance_ledger(ctx, &subject_id, hi_sn)
602 .await?;
603 replay_ledgers_to_sink_events(
604 &ledger,
605 &public_key,
606 from_sn,
607 to_sn,
608 limit,
609 sink_timestamp,
610 )
611 }
612 SubjectData::Tracker { .. } => {
613 let subject_manager =
614 ctx.get_child::<SubjectManager>("subject_manager").await?;
615 let requester = format!(
616 "node_replay_sink_events:{}:{}:{}",
617 subject_id, from_sn, limit
618 );
619 subject_manager
620 .ask(SubjectManagerMessage::Up {
621 subject_id: subject_id.clone(),
622 requester: requester.clone(),
623 create_ledger: None,
624 })
625 .await?;
626
627 let result = async {
628 let path = ActorPath::from(format!(
629 "/user/node/subject_manager/{}",
630 subject_id
631 ));
632 let tracker =
633 ctx.system().get_actor::<Tracker>(&path).await?;
634 let response =
635 tracker.ask(TrackerMessage::GetMetadata).await?;
636 let TrackerResponse::Metadata(metadata) = response else {
637 return Err(ActorError::UnexpectedResponse {
638 path,
639 expected: "TrackerResponse::Metadata".to_owned(),
640 });
641 };
642 let hi_sn = to_sn
643 .map(|to_sn| to_sn.min(metadata.sn))
644 .unwrap_or(metadata.sn);
645 let ledger = self
646 .collect_tracker_ledger(ctx, &subject_id, hi_sn)
647 .await?;
648 replay_ledgers_to_sink_events(
649 &ledger,
650 &public_key,
651 from_sn,
652 to_sn,
653 limit,
654 sink_timestamp,
655 )
656 }
657 .await;
658
659 let _ = subject_manager
660 .ask(SubjectManagerMessage::Finish {
661 subject_id,
662 requester,
663 })
664 .await;
665
666 result
667 }
668 }
669 }
670}
671
/// Messages handled by the [`Node`] actor.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum NodeMessage {
    /// List the ids of all governances, known or owned.
    GetGovernances,
    /// Replay sink events for a subject from its ledger.
    GetSinkEvents {
        /// Subject whose events are replayed.
        subject_id: DigestIdentifier,
        /// Lower bound of the replay range; must not exceed `to_sn`.
        from_sn: u64,
        /// Optional upper bound of the replay range.
        to_sn: Option<u64>,
        /// Page limit; must be greater than zero.
        limit: u64,
    },
    /// Sign the boxed content with the node key pair.
    SignRequest(Box<SignTypesNode>),
    /// List all pending transfers.
    PendingTransfers,
    /// Register a subject as owned or known, depending on `owner`.
    RegisterSubject {
        /// Owner of the subject; compared with this node's key.
        owner: PublicKey,
        /// Identifier of the subject.
        subject_id: DigestIdentifier,
        /// Data stored for the subject.
        data: SubjectData,
    },
    /// Fetch the stored data of a subject (owned first, then known).
    GetSubjectData(DigestIdentifier),
    /// List the trackers associated with the given governance id.
    GovernanceTrackers(DigestIdentifier),
    /// Remove a subject and its transfer records from the node state.
    DeleteSubject(DigestIdentifier),
    /// Ask whether this node owns the subject and whether it is the new
    /// owner in a pending transfer.
    IOwnerNewOwnerSubject(DigestIdentifier),
    /// Fetch the subject data from the known map when the subject is in the
    /// rejected set, otherwise from the owned map.
    ICanSendLastLedger(DigestIdentifier),
    /// Ask whether the subject is authorized, plus its stored data.
    AuthData(DigestIdentifier),
    /// Record a pending transfer.
    TransferSubject(TransferSubject),
    /// Drop a pending transfer.
    RejectTransfer(DigestIdentifier),
    /// Complete a pending transfer.
    ConfirmTransfer(DigestIdentifier),
    /// Mark a subject as end-of-life.
    EOLSubject {
        /// Identifier of the subject.
        subject_id: DigestIdentifier,
        /// `true` to look the subject up in the owned map, `false` for the
        /// known map.
        i_owner: bool,
    },
}
703
704impl Message for NodeMessage {
705 fn is_critical(&self) -> bool {
706 matches!(
707 self,
708 Self::TransferSubject(..)
709 | Self::DeleteSubject(..)
710 | Self::RejectTransfer(..)
711 | Self::ConfirmTransfer(..)
712 | Self::EOLSubject { .. }
713 )
714 }
715}
716
/// Replies produced by the [`Node`] actor.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum NodeResponse {
    /// Governance ids, in reply to `GetGovernances`.
    Governances(Vec<DigestIdentifier>),
    /// Replayed sink events, in reply to `GetSinkEvents`.
    SinkEvents(SinkEventsPage),
    /// Stored subject data, if any; reply to `GetSubjectData` and
    /// `ICanSendLastLedger`.
    SubjectData(Option<SubjectData>),
    /// Tracker ids, in reply to `GovernanceTrackers`.
    GovernanceTrackers(Vec<DigestIdentifier>),
    /// Pending transfers, in reply to `PendingTransfers`.
    PendingTransfers(Vec<TransferSubject>),
    /// Signature, in reply to `SignRequest`.
    SignRequest(Signature),
    /// Reply to `IOwnerNewOwnerSubject`.
    IOwnerNewOwner {
        /// Whether the subject is in the owned map.
        i_owner: bool,
        /// `None` when there is no pending transfer; otherwise whether this
        /// node is the transfer's new owner.
        i_new_owner: Option<bool>,
    },
    /// Reply to `AuthData`.
    AuthData {
        /// Whether the subject is among the authorized subjects.
        auth: bool,
        /// Stored data for the subject, if any.
        subject_data: Option<SubjectData>,
    },
    /// Acknowledgement for messages that return no data.
    Ok,
}

impl Response for NodeResponse {}
738
/// State-changing events persisted by the [`Node`] actor and replayed
/// through `PersistentActor::apply`.
#[derive(
    Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
)]
pub enum NodeEvent {
    /// A subject was registered.
    RegisterSubject {
        /// Owner of the subject; decides owned vs known.
        owner: PublicKey,
        /// Identifier of the subject.
        subject_id: DigestIdentifier,
        /// Data stored for the subject.
        data: SubjectData,
    },
    /// A subject was deleted.
    DeleteSubject(DigestIdentifier),
    /// A transfer was recorded as pending.
    TransferSubject(TransferSubject),
    /// A pending transfer was dropped.
    RejectTransfer(DigestIdentifier),
    /// A pending transfer was completed.
    ConfirmTransfer(DigestIdentifier),
    /// A subject reached end-of-life.
    EOLSubject {
        /// Identifier of the subject.
        subject_id: DigestIdentifier,
        /// `true` to update the owned map, `false` for the known map.
        i_owner: bool,
    },
}

impl Event for NodeEvent {}
760
761#[async_trait]
762impl Actor for Node {
763 type Event = NodeEvent;
764 type Message = NodeMessage;
765 type Response = NodeResponse;
766
767 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
768 parent_span.map_or_else(
769 || info_span!("Node"),
770 |parent_span| info_span!(parent: parent_span, "Node"),
771 )
772 }
773
774 async fn pre_start(
775 &mut self,
776 ctx: &mut ave_actors::ActorContext<Self>,
777 ) -> Result<(), ActorError> {
778 if let Err(e) = Self::build_compilation_dir(ctx).await {
779 error!(
780 error = %e,
781 "Failed to build compilation directory"
782 );
783 return Err(e);
784 }
785
786 if let Err(e) = self.init_store("node", None, true, ctx).await {
788 error!(
789 error = %e,
790 "Failed to initialize node store"
791 );
792 return Err(e);
793 }
794
795 let Some(config) =
796 ctx.system().get_helper::<ConfigHelper>("config").await
797 else {
798 error!("Config helper not found");
799 return Err(ActorError::Helper {
800 name: "config".to_string(),
801 reason: "Not found".to_string(),
802 });
803 };
804 let safe_mode = config.safe_mode;
805 let Some(network): Option<Arc<NetworkSender>> =
806 ctx.system().get_helper("network").await
807 else {
808 error!("Network helper not found");
809 return Err(ActorError::Helper {
810 name: "network".to_string(),
811 reason: "Not found".to_string(),
812 });
813 };
814
815 if !safe_mode {
816 let register_actor =
817 match ctx.create_child("register", Register).await {
818 Ok(actor) => actor,
819 Err(e) => {
820 error!(error = %e, "Failed to create register child");
821 return Err(e);
822 }
823 };
824
825 let Some(ext_db): Option<Arc<ExternalDB>> =
826 ctx.system().get_helper("ext_db").await
827 else {
828 error!("External DB helper not found");
829 return Err(ActorError::Helper {
830 name: "ext_db".to_string(),
831 reason: "Not found".to_string(),
832 });
833 };
834
835 let sink =
836 Sink::new(register_actor.subscribe(), ext_db.get_register());
837 ctx.system().run_sink(sink).await;
838
839 if let Err(e) = ctx
840 .create_child(
841 "manual_distribution",
842 ManualDistribution::new(self.our_key.clone()),
843 )
844 .await
845 {
846 error!(
847 error = %e,
848 "Failed to create manual_distribution child"
849 );
850 return Err(e);
851 }
852
853 if let Err(e) = self.create_distributors(ctx, &network).await {
854 error!(
855 error = %e,
856 "Failed to create distributors"
857 );
858 return Err(e);
859 }
860 }
861
862 let Some(hash) = self.hash else {
863 error!("Hash is None during subject manager startup");
864 return Err(ActorError::FunctionalCritical {
865 description: "Hash is None".to_string(),
866 });
867 };
868
869 let subject_manager = match ctx
870 .create_child(
871 "subject_manager",
872 SubjectManager::new(
873 self.our_key.clone(),
874 hash,
875 self.is_service,
876 self.only_clear_events,
877 ),
878 )
879 .await
880 {
881 Ok(actor) => actor,
882 Err(e) => {
883 error!(error = %e, "Failed to create subject_manager child");
884 return Err(e);
885 }
886 };
887
888 if let Err(e) = subject_manager
889 .ask(SubjectManagerMessage::UpGovernances {
890 governance_ids: self.governance_ids(),
891 })
892 .await
893 {
894 error!(error = %e, "Failed to bootstrap governances");
895 return Err(e);
896 }
897
898 if let Err(e) = ctx
899 .create_child(
900 "auth",
901 Auth::initial(AuthInitParams {
902 network: network.clone(),
903 our_key: self.our_key.clone(),
904 round_retry_interval_secs: config
905 .sync_update
906 .round_retry_interval_secs,
907 max_round_retries: config.sync_update.max_round_retries,
908 witness_retry_count: config.sync_update.witness_retry_count,
909 witness_retry_interval_secs: config
910 .sync_update
911 .witness_retry_interval_secs,
912 }),
913 )
914 .await
915 {
916 error!(
917 error = %e,
918 "Failed to create auth child"
919 );
920 return Err(e);
921 }
922
923 if !safe_mode
924 && let Err(e) = ctx
925 .create_child(
926 "distributor",
927 DistriWorker {
928 our_key: self.our_key.clone(),
929 network,
930 ledger_batch_size: self.ledger_batch_size,
931 },
932 )
933 .await
934 {
935 error!(
936 error = %e,
937 "Failed to create distributor child"
938 );
939 return Err(e);
940 }
941
942 Ok(())
943 }
944}
945
946#[async_trait]
947impl Handler<Self> for Node {
948 async fn handle_message(
949 &mut self,
950 _sender: ActorPath,
951 msg: NodeMessage,
952 ctx: &mut ave_actors::ActorContext<Self>,
953 ) -> Result<NodeResponse, ActorError> {
954 match msg {
955 NodeMessage::GetSinkEvents {
956 subject_id,
957 from_sn,
958 to_sn,
959 limit,
960 } => Ok(NodeResponse::SinkEvents(
961 self.replay_sink_events(ctx, subject_id, from_sn, to_sn, limit)
962 .await?,
963 )),
964 NodeMessage::RegisterSubject {
965 owner,
966 subject_id,
967 data,
968 } => {
969 let Some(network): Option<Arc<NetworkSender>> =
970 ctx.system().get_helper("network").await
971 else {
972 error!(
973 msg_type = "RegisterSubject",
974 subject_id = %subject_id,
975 "Network helper not found"
976 );
977 return Err(ActorError::Helper {
978 name: "network".to_string(),
979 reason: "Not found".to_string(),
980 });
981 };
982
983 self.on_event(
984 NodeEvent::RegisterSubject {
985 owner,
986 subject_id: subject_id.clone(),
987 data,
988 },
989 ctx,
990 )
991 .await;
992
993 let distributor_name = format!("distributor_{}", subject_id);
994 if ctx
995 .get_child::<DistriWorker>(&distributor_name)
996 .await
997 .is_err()
998 {
999 ctx.create_child(
1000 &distributor_name,
1001 DistriWorker {
1002 our_key: self.our_key.clone(),
1003 network,
1004 ledger_batch_size: self.ledger_batch_size,
1005 },
1006 )
1007 .await?;
1008 }
1009
1010 Ok(NodeResponse::Ok)
1011 }
1012 NodeMessage::EOLSubject {
1013 subject_id,
1014 i_owner,
1015 } => {
1016 self.on_event(
1017 NodeEvent::EOLSubject {
1018 subject_id: subject_id.clone(),
1019 i_owner,
1020 },
1021 ctx,
1022 )
1023 .await;
1024
1025 debug!(
1026 msg_type = "EOLSubject",
1027 subject_id = %subject_id,
1028 i_owner = %i_owner,
1029 "EOL confirmed"
1030 );
1031
1032 Ok(NodeResponse::Ok)
1033 }
1034 NodeMessage::GetGovernances => {
1035 let mut gov_know = self
1036 .known_subjects
1037 .iter()
1038 .filter(|x| matches!(x.1, SubjectData::Governance { .. }))
1039 .map(|x| x.0.clone())
1040 .collect::<Vec<DigestIdentifier>>();
1041 let mut gov_owned = self
1042 .owned_subjects
1043 .iter()
1044 .filter(|x| matches!(x.1, SubjectData::Governance { .. }))
1045 .map(|x| x.0.clone())
1046 .collect::<Vec<DigestIdentifier>>();
1047 gov_know.append(&mut gov_owned);
1048
1049 return Ok(NodeResponse::Governances(gov_know));
1050 }
1051 NodeMessage::ICanSendLastLedger(subject_id) => {
1052 let subject_data = if self.reject_subjects.contains(&subject_id)
1053 {
1054 self.known_subjects.get(&subject_id).cloned()
1055 } else {
1056 self.owned_subjects.get(&subject_id).cloned()
1057 };
1058
1059 Ok(NodeResponse::SubjectData(subject_data))
1060 }
1061 NodeMessage::GetSubjectData(subject_id) => {
1062 let data = self.get_subject_data(&subject_id);
1063 if data.is_none() {
1064 debug!(
1065 msg_type = "GetSubjectData",
1066 subject_id = %subject_id,
1067 "Subject not found"
1068 );
1069 } else {
1070 debug!(
1071 msg_type = "GetSubjectData",
1072 subject_id = %subject_id,
1073 "Subject data retrieved successfully"
1074 );
1075 }
1076
1077 Ok(NodeResponse::SubjectData(data))
1078 }
1079 NodeMessage::GovernanceTrackers(governance_id) => {
1080 let trackers = self.governance_trackers(&governance_id);
1081
1082 debug!(
1083 msg_type = "GovernanceTrackers",
1084 governance_id = %governance_id,
1085 count = trackers.len(),
1086 "Governance tracker association check completed"
1087 );
1088
1089 Ok(NodeResponse::GovernanceTrackers(trackers))
1090 }
1091 NodeMessage::DeleteSubject(subject_id) => {
1092 self.on_event(
1093 NodeEvent::DeleteSubject(subject_id.clone()),
1094 ctx,
1095 )
1096 .await;
1097
1098 debug!(
1099 msg_type = "DeleteSubject",
1100 subject_id = %subject_id,
1101 "Subject deleted from node state"
1102 );
1103
1104 Ok(NodeResponse::Ok)
1105 }
1106 NodeMessage::PendingTransfers => {
1107 let transfers: Vec<TransferSubject> = self
1108 .transfer_subjects
1109 .iter()
1110 .map(|x| TransferSubject {
1111 name: x.1.name.clone(),
1112 subject_id: x.0.clone(),
1113 new_owner: x.1.new_owner.clone(),
1114 actual_owner: x.1.actual_owner.clone(),
1115 })
1116 .collect();
1117
1118 debug!(
1119 msg_type = "PendingTransfers",
1120 count = transfers.len(),
1121 "Retrieved pending transfers"
1122 );
1123
1124 Ok(NodeResponse::PendingTransfers(transfers))
1125 }
1126 NodeMessage::RejectTransfer(subject_id) => {
1127 self.on_event(
1128 NodeEvent::RejectTransfer(subject_id.clone()),
1129 ctx,
1130 )
1131 .await;
1132
1133 debug!(
1134 msg_type = "RejectTransfer",
1135 subject_id = %subject_id,
1136 "Transfer rejected successfully"
1137 );
1138
1139 Ok(NodeResponse::Ok)
1140 }
1141 NodeMessage::TransferSubject(data) => {
1142 let subject_id = data.subject_id.clone();
1143 self.on_event(NodeEvent::TransferSubject(data), ctx).await;
1144
1145 debug!(
1146 msg_type = "TransferSubject",
1147 subject_id = %subject_id,
1148 "Subject transfer registered successfully"
1149 );
1150
1151 Ok(NodeResponse::Ok)
1152 }
1153 NodeMessage::ConfirmTransfer(subject_id) => {
1154 self.on_event(
1155 NodeEvent::ConfirmTransfer(subject_id.clone()),
1156 ctx,
1157 )
1158 .await;
1159
1160 debug!(
1161 msg_type = "ConfirmTransfer",
1162 subject_id = %subject_id,
1163 "Transfer confirmed between other parties"
1164 );
1165
1166 Ok(NodeResponse::Ok)
1167 }
1168 NodeMessage::SignRequest(content) => {
1169 let content = *content;
1170 let content_type = match &content {
1171 SignTypesNode::EventRequest(_) => "EventRequest",
1172 SignTypesNode::ValidationReq(_) => "ValidationReq",
1173 SignTypesNode::ValidationRes(_) => "ValidationRes",
1174 SignTypesNode::EvaluationReq(_) => "EvaluationReq",
1175 SignTypesNode::EvaluationSignature(_) => "EvaluationRes",
1176 SignTypesNode::ApprovalReq(_) => "ApprovalReq",
1177 SignTypesNode::ApprovalRes(_) => "ApprovalRes",
1178 SignTypesNode::LedgerSeal(_) => "LedgerSeal",
1179 };
1180
1181 let sign = match content {
1182 SignTypesNode::EventRequest(event_req) => {
1183 self.sign(&event_req)
1184 }
1185 SignTypesNode::ValidationReq(validation_req) => {
1186 self.sign(&*validation_req)
1187 }
1188 SignTypesNode::ValidationRes(validation_res) => {
1189 self.sign(&validation_res)
1190 }
1191 SignTypesNode::EvaluationReq(evaluation_req) => {
1192 self.sign(&*evaluation_req)
1193 }
1194 SignTypesNode::EvaluationSignature(evaluation_res) => {
1195 self.sign(&evaluation_res)
1196 }
1197 SignTypesNode::ApprovalReq(approval_req) => {
1198 self.sign(&approval_req)
1199 }
1200 SignTypesNode::ApprovalRes(approval_res) => {
1201 self.sign(&*approval_res)
1202 }
1203 SignTypesNode::LedgerSeal(ledger) => self.sign(&ledger),
1204 }
1205 .map_err(|e| {
1206 error!(
1207 msg_type = "SignRequest",
1208 content_type = content_type,
1209 error = %e,
1210 "Failed to sign content"
1211 );
1212 ActorError::FunctionalCritical {
1213 description: format!("Can not sign event: {}", e),
1214 }
1215 })?;
1216
1217 debug!(
1218 msg_type = "SignRequest",
1219 content_type = content_type,
1220 "Content signed successfully"
1221 );
1222
1223 Ok(NodeResponse::SignRequest(sign))
1224 }
1225 NodeMessage::IOwnerNewOwnerSubject(subject_id) => {
1226 let i_owner =
1227 self.owned_subjects.keys().any(|x| *x == subject_id);
1228
1229 let i_new_owner = if let Some(data) =
1230 self.transfer_subjects.get(&subject_id)
1231 {
1232 Some(data.new_owner == *self.our_key)
1233 } else {
1234 None
1235 };
1236
1237 debug!(
1238 msg_type = "OwnerPendingSubject",
1239 subject_id = %subject_id,
1240 i_owner = i_owner,
1241 i_new_owner = i_new_owner,
1242 "Checked owner/pending status"
1243 );
1244
1245 Ok(NodeResponse::IOwnerNewOwner {
1246 i_owner,
1247 i_new_owner,
1248 })
1249 }
1250 NodeMessage::AuthData(subject_id) => {
1251 let authorized_subjects = match ctx
1252 .get_child::<Auth>("auth")
1253 .await
1254 {
1255 Ok(auth) => {
1256 let res = match auth.ask(AuthMessage::GetAuths).await {
1257 Ok(res) => res,
1258 Err(e) => {
1259 error!(
1260 msg_type = "AuthData",
1261 subject_id = %subject_id,
1262 error = %e,
1263 "Failed to get authorizations from auth actor"
1264 );
1265 ctx.system().crash_system();
1266 return Err(e);
1267 }
1268 };
1269 let AuthResponse::Auths { subjects } = res else {
1270 error!(
1271 msg_type = "AuthData",
1272 subject_id = %subject_id,
1273 "Unexpected response from auth actor"
1274 );
1275 ctx.system().crash_system();
1276 return Err(ActorError::UnexpectedResponse {
1277 expected: "AuthResponse::Auths".to_owned(),
1278 path: ctx.path().clone() / "auth",
1279 });
1280 };
1281 subjects
1282 }
1283 Err(e) => {
1284 error!(
1285 msg_type = "AuthData",
1286 subject_id = %subject_id,
1287 error = %e,
1288 "Auth actor not found"
1289 );
1290 ctx.system().crash_system();
1291 return Err(e);
1292 }
1293 };
1294
1295 let auth_subj =
1296 authorized_subjects.iter().any(|x| x.clone() == subject_id);
1297
1298 let subj_data = self
1299 .known_subjects
1300 .get(&subject_id)
1301 .or_else(|| self.owned_subjects.get(&subject_id))
1302 .cloned();
1303
1304 debug!(
1305 msg_type = "AuthData",
1306 subject_id = %subject_id,
1307 authorized = auth_subj,
1308 subject_data = ?subj_data,
1309 "Checked subject authorization status"
1310 );
1311
1312 Ok(NodeResponse::AuthData {
1313 auth: auth_subj,
1314 subject_data: subj_data,
1315 })
1316 }
1317 }
1318 }
1319
    /// Handles a fault reported by a child actor: logs the error, asks the
    /// actor system to crash and tells the runtime to stop the child.
    async fn on_child_fault(
        &mut self,
        error: ActorError,
        ctx: &mut ActorContext<Self>,
    ) -> ChildAction {
        error!(
            error = %error,
            "Child actor fault, stopping system"
        );
        ctx.system().crash_system();
        ChildAction::Stop
    }
1332
1333 async fn on_event(
1334 &mut self,
1335 event: NodeEvent,
1336 ctx: &mut ActorContext<Self>,
1337 ) {
1338 if let Err(e) = self.persist(&event, ctx).await {
1339 error!(
1340 event = ?event,
1341 error = %e,
1342 "Failed to persist node event"
1343 );
1344 ctx.system().crash_system();
1345 }
1346 }
1347}
1348
/// Parameters used by `PersistentActor::create_initial` to build a fresh
/// [`Node`] with empty subject collections.
pub struct InitParamsNode {
    /// Key pair the node signs with (stored as `Node::owner`).
    pub key_pair: KeyPair,
    /// Public key of the node (stored as `Node::our_key`).
    /// NOTE(review): presumably the public half of `key_pair` — nothing here
    /// enforces it.
    pub public_key: Arc<PublicKey>,
    /// Hash algorithm handed to the subject manager.
    pub hash: HashAlgorithm,
    /// Forwarded to the subject manager.
    pub is_service: bool,
    /// Forwarded to the subject manager.
    pub only_clear_events: bool,
    /// Page size for ledger collection and distributors.
    pub ledger_batch_size: u64,
}
1357
1358#[async_trait]
1359impl PersistentActor for Node {
1360 type Persistence = LightPersistence;
1361 type InitParams = InitParamsNode;
1362
1363 fn update(&mut self, state: Self) {
1364 self.owned_subjects = state.owned_subjects;
1365 self.known_subjects = state.known_subjects;
1366 self.transfer_subjects = state.transfer_subjects;
1367 self.reject_subjects = state.reject_subjects
1368 }
1369
1370 fn create_initial(params: Self::InitParams) -> Self {
1371 Self {
1372 hash: Some(params.hash),
1373 owner: params.key_pair,
1374 our_key: params.public_key,
1375 is_service: params.is_service,
1376 only_clear_events: params.only_clear_events,
1377 ledger_batch_size: params.ledger_batch_size,
1378 owned_subjects: HashMap::new(),
1379 known_subjects: HashMap::new(),
1380 transfer_subjects: HashMap::new(),
1381 reject_subjects: HashSet::new(),
1382 }
1383 }
1384
1385 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
1387 match event {
1388 NodeEvent::EOLSubject {
1389 subject_id,
1390 i_owner,
1391 } => {
1392 self.eol(subject_id.clone(), *i_owner);
1393 debug!(
1394 event_type = "EOLSubject",
1395 subject_id = %subject_id,
1396 i_owner = &i_owner,
1397 "Applied eol"
1398 );
1399 }
1400 NodeEvent::ConfirmTransfer(subject_id) => {
1401 self.confirm_transfer(subject_id.clone());
1402 debug!(
1403 event_type = "ConfirmTransfer",
1404 subject_id = %subject_id,
1405 "Applied transfer confirmation"
1406 );
1407 }
1408 NodeEvent::RegisterSubject {
1409 subject_id,
1410 data,
1411 owner,
1412 } => {
1413 self.register_subject(
1414 subject_id.clone(),
1415 owner.clone(),
1416 data.clone(),
1417 );
1418 debug!(
1419 event_type = "RegisterSubject",
1420 subject_id = %subject_id,
1421 owner = %owner,
1422 "Applied subject registration"
1423 );
1424 }
1425 NodeEvent::DeleteSubject(subject_id) => {
1426 self.delete_subject(subject_id);
1427 debug!(
1428 event_type = "DeleteSubject",
1429 subject_id = %subject_id,
1430 "Applied subject deletion"
1431 );
1432 }
1433 NodeEvent::RejectTransfer(subject_id) => {
1434 self.delete_transfer(subject_id);
1435 debug!(
1436 event_type = "RejectTransfer",
1437 subject_id = %subject_id,
1438 "Applied transfer rejection"
1439 );
1440 }
1441 NodeEvent::TransferSubject(transfer) => {
1442 self.transfer_subject(transfer.clone());
1443 debug!(
1444 event_type = "TransferSubject",
1445 subject_id = %transfer.subject_id,
1446 new_owner = %transfer.new_owner,
1447 "Applied subject transfer"
1448 );
1449 }
1450 };
1451
1452 Ok(())
1453 }
1454}
1455
// `Node` relies entirely on the default `Storable` implementation.
#[async_trait]
impl Storable for Node {}