Skip to main content

ave_core/node/
mod.rs

1//! Node module
2//!
3
4use std::{
5    collections::{HashMap, HashSet},
6    path::Path,
7    sync::Arc,
8};
9
10use borsh::{BorshDeserialize, BorshSerialize};
11use register::Register;
12use tokio::fs;
13use tracing::{Span, debug, error, info_span};
14
15use crate::{
16    auth::{Auth, AuthMessage, AuthResponse},
17    db::Storable,
18    distribution::worker::DistriWorker,
19    governance::{Governance, GovernanceMessage, GovernanceResponse},
20    helpers::{db::ExternalDB, network::service::NetworkSender},
21    manual_distribution::ManualDistribution,
22    model::common::node::SignTypesNode,
23    node::subject_manager::{SubjectManager, SubjectManagerMessage},
24    subject::{
25        SignedLedger, replay_sink_events as replay_ledgers_to_sink_events,
26    },
27    system::ConfigHelper,
28    tracker::{Tracker, TrackerMessage, TrackerResponse},
29};
30
31use ave_common::{
32    SchemaType,
33    identity::{
34        DigestIdentifier, HashAlgorithm, PublicKey, Signature, keys::KeyPair,
35    },
36    response::SinkEventsPage,
37};
38
39use async_trait::async_trait;
40use ave_actors::{
41    Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
42    Message, Response, Sink,
43};
44use ave_actors::{LightPersistence, PersistentActor};
45use serde::{Deserialize, Serialize};
46
47pub mod register;
48pub mod subject_manager;
49
50#[derive(
51    Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
52)]
53pub struct TransferSubject {
54    pub name: Option<String>,
55    pub subject_id: DigestIdentifier,
56    pub new_owner: PublicKey,
57    pub actual_owner: PublicKey,
58}
59
60#[derive(
61    Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
62)]
63pub struct TransferData {
64    pub name: Option<String>,
65    pub new_owner: PublicKey,
66    pub actual_owner: PublicKey,
67}
68
69#[derive(
70    Clone,
71    Debug,
72    Serialize,
73    Deserialize,
74    BorshSerialize,
75    BorshDeserialize,
76    Eq,
77    PartialEq,
78)]
79pub enum SubjectData {
80    Tracker {
81        governance_id: DigestIdentifier,
82        schema_id: SchemaType,
83        namespace: String,
84        active: bool,
85    },
86    Governance {
87        active: bool,
88    },
89}
90
91impl SubjectData {
92    pub fn get_schema_id(&self) -> SchemaType {
93        match self {
94            Self::Tracker { schema_id, .. } => schema_id.clone(),
95            Self::Governance { .. } => SchemaType::Governance,
96        }
97    }
98
99    pub fn get_governance_id(&self) -> Option<DigestIdentifier> {
100        match self {
101            Self::Tracker { governance_id, .. } => Some(governance_id.clone()),
102            Self::Governance { .. } => None,
103        }
104    }
105
106    pub fn get_namespace(&self) -> String {
107        match self {
108            Self::Tracker { namespace, .. } => namespace.clone(),
109            Self::Governance { .. } => String::default(),
110        }
111    }
112
113    pub const fn get_active(&self) -> bool {
114        match self {
115            Self::Tracker { active, .. } => *active,
116            Self::Governance { active } => *active,
117        }
118    }
119
120    pub const fn eol(&mut self) {
121        match self {
122            Self::Tracker { active, .. } => *active = false,
123            Self::Governance { active } => *active = false,
124        };
125    }
126}
127
128/// Node struct.
129#[derive(Debug, Serialize, Deserialize, Clone)]
130pub struct Node {
131    /// Owner of the node.
132    #[serde(skip)]
133    owner: KeyPair,
134    #[serde(skip)]
135    our_key: Arc<PublicKey>,
136    #[serde(skip)]
137    hash: Option<HashAlgorithm>,
138    #[serde(skip)]
139    is_service: bool,
140    /// The node's owned subjects.
141    owned_subjects: HashMap<DigestIdentifier, SubjectData>,
142    /// The node's known subjects.
143    known_subjects: HashMap<DigestIdentifier, SubjectData>,
144
145    transfer_subjects: HashMap<DigestIdentifier, TransferData>,
146
147    reject_subjects: HashSet<DigestIdentifier>,
148}
149
150// Manual Borsh implementation to skip the 'owner' field
151impl BorshSerialize for Node {
152    fn serialize<W: std::io::Write>(
153        &self,
154        writer: &mut W,
155    ) -> std::io::Result<()> {
156        // Serialize only the fields we want to persist, skipping 'owner'
157        BorshSerialize::serialize(&self.owned_subjects, writer)?;
158        BorshSerialize::serialize(&self.known_subjects, writer)?;
159        BorshSerialize::serialize(&self.transfer_subjects, writer)?;
160        BorshSerialize::serialize(&self.reject_subjects, writer)?;
161        Ok(())
162    }
163}
164
165impl BorshDeserialize for Node {
166    fn deserialize_reader<R: std::io::Read>(
167        reader: &mut R,
168    ) -> std::io::Result<Self> {
169        // Deserialize the persisted fields
170        let owned_subjects =
171            HashMap::<DigestIdentifier, SubjectData>::deserialize_reader(
172                reader,
173            )?;
174        let known_subjects =
175            HashMap::<DigestIdentifier, SubjectData>::deserialize_reader(
176                reader,
177            )?;
178        let transfer_subjects =
179            HashMap::<DigestIdentifier, TransferData>::deserialize_reader(
180                reader,
181            )?;
182        let reject_subjects =
183            HashSet::<DigestIdentifier>::deserialize_reader(reader)?;
184
185        // Create a default/placeholder KeyPair for 'owner'
186        // This will be replaced by the actual owner during actor initialization
187        let owner = KeyPair::default();
188        let our_key = Arc::new(PublicKey::default());
189        let hash = None;
190
191        Ok(Self {
192            hash,
193            our_key,
194            owner,
195            owned_subjects,
196            known_subjects,
197            transfer_subjects,
198            reject_subjects,
199            is_service: false,
200        })
201    }
202}
203
204impl Node {
205    fn get_subject_data(
206        &self,
207        subject_id: &DigestIdentifier,
208    ) -> Option<SubjectData> {
209        self.owned_subjects
210            .get(subject_id)
211            .or_else(|| self.known_subjects.get(subject_id))
212            .cloned()
213    }
214
215    /// Adds a subject to the node's owned subjects.
216    pub fn transfer_subject(&mut self, data: TransferSubject) {
217        if data.new_owner == *self.our_key {
218            self.reject_subjects.remove(&data.subject_id);
219        }
220
221        self.transfer_subjects.insert(
222            data.subject_id,
223            TransferData {
224                name: data.name,
225                new_owner: data.new_owner,
226                actual_owner: data.actual_owner,
227            },
228        );
229    }
230
231    pub fn delete_transfer(&mut self, subject_id: &DigestIdentifier) {
232        if let Some(data) = self.transfer_subjects.remove(subject_id)
233            && data.actual_owner == *self.our_key
234        {
235            self.reject_subjects.insert(subject_id.clone());
236        }
237    }
238
239    pub fn confirm_transfer(&mut self, subject_id: DigestIdentifier) {
240        self.our_key.to_string();
241
242        if let Some(data) = self.transfer_subjects.remove(&subject_id) {
243            if data.actual_owner == *self.our_key {
244                if let Some(data) = self.owned_subjects.remove(&subject_id) {
245                    self.known_subjects.insert(subject_id, data);
246                }
247            } else if data.new_owner == *self.our_key
248                && let Some(data) = self.known_subjects.remove(&subject_id)
249            {
250                self.owned_subjects.insert(subject_id, data);
251            };
252        };
253    }
254
255    pub fn eol(&mut self, subject_id: DigestIdentifier, i_owner: bool) {
256        if i_owner {
257            if let Some(data) = self.owned_subjects.get_mut(&subject_id) {
258                data.eol();
259            }
260        } else if let Some(data) = self.known_subjects.get_mut(&subject_id) {
261            data.eol();
262        }
263    }
264
265    pub fn register_subject(
266        &mut self,
267        subject_id: DigestIdentifier,
268        owner: PublicKey,
269        data: SubjectData,
270    ) {
271        if *self.our_key == owner {
272            self.owned_subjects.insert(subject_id, data);
273        } else {
274            self.known_subjects.insert(subject_id, data);
275        }
276    }
277
278    pub fn delete_subject(&mut self, subject_id: &DigestIdentifier) {
279        self.owned_subjects
280            .remove(subject_id)
281            .or_else(|| self.known_subjects.remove(subject_id));
282
283        self.transfer_subjects.remove(subject_id);
284        self.reject_subjects.remove(subject_id);
285    }
286
287    fn governance_trackers(
288        &self,
289        governance_id: &DigestIdentifier,
290    ) -> Vec<DigestIdentifier> {
291        self.owned_subjects
292            .iter()
293            .chain(self.known_subjects.iter())
294            .filter_map(|(subject_id, data)| match data {
295                SubjectData::Tracker {
296                    governance_id: tracker_governance_id,
297                    ..
298                } if tracker_governance_id == governance_id => {
299                    Some(subject_id.clone())
300                }
301                _ => None,
302            })
303            .collect()
304    }
305
306    fn sign<T: BorshSerialize>(
307        &self,
308        content: &T,
309    ) -> Result<Signature, ActorError> {
310        Signature::new(content, &self.owner).map_err(|e| {
311            ActorError::Functional {
312                description: format!("{}", e),
313            }
314        })
315    }
316
317    async fn build_compilation_dir(
318        ctx: &ActorContext<Self>,
319    ) -> Result<(), ActorError> {
320        let contracts_path = if let Some(config) =
321            ctx.system().get_helper::<ConfigHelper>("config").await
322        {
323            config.contracts_path
324        } else {
325            error!("Config helper not found");
326            return Err(ActorError::Helper {
327                name: "config".to_owned(),
328                reason: "Not found".to_string(),
329            });
330        };
331
332        let dir = contracts_path.join("contracts");
333
334        if !Path::new(&dir).exists() {
335            fs::create_dir_all(&dir).await.map_err(|e| {
336                error!(
337                    error = %e,
338                    path = ?dir,
339                    "Failed to create contracts directory"
340                );
341                ActorError::FunctionalCritical {
342                    description: format!("Can not create contracts dir: {}", e),
343                }
344            })?;
345        }
346        Ok(())
347    }
348
349    async fn create_distributors(
350        &self,
351        ctx: &mut ActorContext<Self>,
352        network: &Arc<NetworkSender>,
353    ) -> Result<(), ActorError> {
354        for subject in self.owned_subjects.keys() {
355            let distributor_name = format!("distributor_{}", subject);
356            if ctx
357                .get_child::<DistriWorker>(&distributor_name)
358                .await
359                .is_err()
360            {
361                ctx.create_child(
362                    &distributor_name,
363                    DistriWorker {
364                        our_key: self.our_key.clone(),
365                        network: network.clone(),
366                    },
367                )
368                .await?;
369            }
370        }
371
372        for subject in self.known_subjects.keys() {
373            let distributor_name = format!("distributor_{}", subject);
374            if ctx
375                .get_child::<DistriWorker>(&distributor_name)
376                .await
377                .is_err()
378            {
379                ctx.create_child(
380                    &distributor_name,
381                    DistriWorker {
382                        our_key: self.our_key.clone(),
383                        network: network.clone(),
384                    },
385                )
386                .await?;
387            }
388        }
389
390        Ok(())
391    }
392
393    fn governance_ids(&self) -> Vec<DigestIdentifier> {
394        let mut governance_ids = self
395            .owned_subjects
396            .iter()
397            .filter(|(_, data)| matches!(data, SubjectData::Governance { .. }))
398            .map(|(subject_id, _)| subject_id.clone())
399            .collect::<HashSet<_>>();
400
401        governance_ids.extend(
402            self.known_subjects
403                .iter()
404                .filter(|(_, data)| {
405                    matches!(data, SubjectData::Governance { .. })
406                })
407                .map(|(subject_id, _)| subject_id.clone()),
408        );
409
410        governance_ids.into_iter().collect()
411    }
412
413    async fn get_tracker_ledger_batch(
414        ctx: &ActorContext<Self>,
415        actor: &ave_actors::ActorRef<Tracker>,
416        lo_sn: Option<u64>,
417        hi_sn: u64,
418    ) -> Result<(Vec<SignedLedger>, bool), ActorError> {
419        let response = actor
420            .ask(TrackerMessage::GetLedger { lo_sn, hi_sn })
421            .await?;
422
423        match response {
424            TrackerResponse::Ledger { ledger, is_all } => Ok((ledger, is_all)),
425            _ => Err(ActorError::UnexpectedResponse {
426                path: ctx.path().clone() / "subject_manager",
427                expected: "TrackerResponse::Ledger".to_owned(),
428            }),
429        }
430    }
431
432    async fn get_governance_ledger_batch(
433        ctx: &ActorContext<Self>,
434        actor: &ave_actors::ActorRef<Governance>,
435        lo_sn: Option<u64>,
436        hi_sn: u64,
437    ) -> Result<(Vec<SignedLedger>, bool), ActorError> {
438        let response = actor
439            .ask(GovernanceMessage::GetLedger { lo_sn, hi_sn })
440            .await?;
441
442        match response {
443            GovernanceResponse::Ledger { ledger, is_all } => {
444                Ok((ledger, is_all))
445            }
446            _ => Err(ActorError::UnexpectedResponse {
447                path: ctx.path().clone() / "subject_manager",
448                expected: "GovernanceResponse::Ledger".to_owned(),
449            }),
450        }
451    }
452
453    async fn collect_tracker_ledger(
454        ctx: &ActorContext<Self>,
455        subject_id: &DigestIdentifier,
456        hi_sn: u64,
457    ) -> Result<Vec<SignedLedger>, ActorError> {
458        let path = ActorPath::from(format!(
459            "/user/node/subject_manager/{}",
460            subject_id
461        ));
462        let tracker = ctx.system().get_actor::<Tracker>(&path).await?;
463        let mut ledger = Vec::new();
464        let mut lo_sn = None;
465
466        loop {
467            let (mut batch, is_all) =
468                Self::get_tracker_ledger_batch(ctx, &tracker, lo_sn, hi_sn)
469                    .await?;
470            if batch.is_empty() {
471                break;
472            }
473            lo_sn = batch.last().map(|event| event.content().sn);
474            ledger.append(&mut batch);
475            if is_all {
476                break;
477            }
478        }
479
480        Ok(ledger)
481    }
482
483    async fn collect_governance_ledger(
484        ctx: &ActorContext<Self>,
485        subject_id: &DigestIdentifier,
486        hi_sn: u64,
487    ) -> Result<Vec<SignedLedger>, ActorError> {
488        let path = ActorPath::from(format!(
489            "/user/node/subject_manager/{}",
490            subject_id
491        ));
492        let governance = ctx.system().get_actor::<Governance>(&path).await?;
493        let mut ledger = Vec::new();
494        let mut lo_sn = None;
495
496        loop {
497            let (mut batch, is_all) = Self::get_governance_ledger_batch(
498                ctx,
499                &governance,
500                lo_sn,
501                hi_sn,
502            )
503            .await?;
504            if batch.is_empty() {
505                break;
506            }
507            lo_sn = batch.last().map(|event| event.content().sn);
508            ledger.append(&mut batch);
509            if is_all {
510                break;
511            }
512        }
513
514        Ok(ledger)
515    }
516
517    async fn replay_sink_events(
518        &self,
519        ctx: &ActorContext<Self>,
520        subject_id: DigestIdentifier,
521        from_sn: u64,
522        to_sn: Option<u64>,
523        limit: u64,
524    ) -> Result<SinkEventsPage, ActorError> {
525        if limit == 0 {
526            return Err(ActorError::Functional {
527                description: "Replay limit must be greater than zero"
528                    .to_owned(),
529            });
530        }
531        if let Some(to_sn) = to_sn
532            && from_sn > to_sn
533        {
534            return Err(ActorError::Functional {
535                description: "Replay range requires from_sn <= to_sn"
536                    .to_owned(),
537            });
538        }
539
540        let Some(subject_data) = self
541            .owned_subjects
542            .get(&subject_id)
543            .cloned()
544            .or_else(|| self.known_subjects.get(&subject_id).cloned())
545        else {
546            return Err(ActorError::NotFound {
547                path: ActorPath::from(format!(
548                    "/user/node/subject_manager/{}",
549                    subject_id
550                )),
551            });
552        };
553
554        let sink_timestamp = ave_common::identity::TimeStamp::now().as_nanos();
555        let public_key = self.our_key.to_string();
556
557        match subject_data {
558            SubjectData::Governance { .. } => {
559                let path = ActorPath::from(format!(
560                    "/user/node/subject_manager/{}",
561                    subject_id
562                ));
563                let governance =
564                    ctx.system().get_actor::<Governance>(&path).await?;
565                let response =
566                    governance.ask(GovernanceMessage::GetMetadata).await?;
567                let GovernanceResponse::Metadata(metadata) = response else {
568                    return Err(ActorError::UnexpectedResponse {
569                        path,
570                        expected: "GovernanceResponse::Metadata".to_owned(),
571                    });
572                };
573                let hi_sn = to_sn
574                    .map(|to_sn| to_sn.min(metadata.sn))
575                    .unwrap_or(metadata.sn);
576                let ledger =
577                    Self::collect_governance_ledger(ctx, &subject_id, hi_sn)
578                        .await?;
579                replay_ledgers_to_sink_events(
580                    &ledger,
581                    &public_key,
582                    from_sn,
583                    to_sn,
584                    limit,
585                    sink_timestamp,
586                )
587            }
588            SubjectData::Tracker { .. } => {
589                let subject_manager =
590                    ctx.get_child::<SubjectManager>("subject_manager").await?;
591                let requester = format!(
592                    "node_replay_sink_events:{}:{}:{}",
593                    subject_id, from_sn, limit
594                );
595                subject_manager
596                    .ask(SubjectManagerMessage::Up {
597                        subject_id: subject_id.clone(),
598                        requester: requester.clone(),
599                        create_ledger: None,
600                    })
601                    .await?;
602
603                let result = async {
604                    let path = ActorPath::from(format!(
605                        "/user/node/subject_manager/{}",
606                        subject_id
607                    ));
608                    let tracker =
609                        ctx.system().get_actor::<Tracker>(&path).await?;
610                    let response =
611                        tracker.ask(TrackerMessage::GetMetadata).await?;
612                    let TrackerResponse::Metadata(metadata) = response else {
613                        return Err(ActorError::UnexpectedResponse {
614                            path,
615                            expected: "TrackerResponse::Metadata".to_owned(),
616                        });
617                    };
618                    let hi_sn = to_sn
619                        .map(|to_sn| to_sn.min(metadata.sn))
620                        .unwrap_or(metadata.sn);
621                    let ledger =
622                        Self::collect_tracker_ledger(ctx, &subject_id, hi_sn)
623                            .await?;
624                    replay_ledgers_to_sink_events(
625                        &ledger,
626                        &public_key,
627                        from_sn,
628                        to_sn,
629                        limit,
630                        sink_timestamp,
631                    )
632                }
633                .await;
634
635                let _ = subject_manager
636                    .ask(SubjectManagerMessage::Finish {
637                        subject_id,
638                        requester,
639                    })
640                    .await;
641
642                result
643            }
644        }
645    }
646}
647
648/// Node message.
649#[derive(Clone, Debug, Serialize, Deserialize)]
650pub enum NodeMessage {
651    GetGovernances,
652    GetSinkEvents {
653        subject_id: DigestIdentifier,
654        from_sn: u64,
655        to_sn: Option<u64>,
656        limit: u64,
657    },
658    SignRequest(Box<SignTypesNode>),
659    PendingTransfers,
660    RegisterSubject {
661        owner: PublicKey,
662        subject_id: DigestIdentifier,
663        data: SubjectData,
664    },
665    GetSubjectData(DigestIdentifier),
666    GovernanceTrackers(DigestIdentifier),
667    DeleteSubject(DigestIdentifier),
668    IOwnerNewOwnerSubject(DigestIdentifier),
669    ICanSendLastLedger(DigestIdentifier),
670    AuthData(DigestIdentifier),
671    TransferSubject(TransferSubject),
672    RejectTransfer(DigestIdentifier),
673    ConfirmTransfer(DigestIdentifier),
674    EOLSubject {
675        subject_id: DigestIdentifier,
676        i_owner: bool,
677    },
678}
679
680impl Message for NodeMessage {
681    fn is_critical(&self) -> bool {
682        matches!(
683            self,
684            Self::TransferSubject(..)
685                | Self::DeleteSubject(..)
686                | Self::RejectTransfer(..)
687                | Self::ConfirmTransfer(..)
688                | Self::EOLSubject { .. }
689        )
690    }
691}
692
693/// Node response.
694#[derive(Clone, Debug, Serialize, Deserialize)]
695pub enum NodeResponse {
696    Governances(Vec<DigestIdentifier>),
697    SinkEvents(SinkEventsPage),
698    SubjectData(Option<SubjectData>),
699    GovernanceTrackers(Vec<DigestIdentifier>),
700    PendingTransfers(Vec<TransferSubject>),
701    SignRequest(Signature),
702    IOwnerNewOwner {
703        i_owner: bool,
704        i_new_owner: Option<bool>,
705    },
706    AuthData {
707        auth: bool,
708        subject_data: Option<SubjectData>,
709    },
710    Ok,
711}
712
713impl Response for NodeResponse {}
714
715/// Node event.
716#[derive(
717    Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
718)]
719pub enum NodeEvent {
720    RegisterSubject {
721        owner: PublicKey,
722        subject_id: DigestIdentifier,
723        data: SubjectData,
724    },
725    DeleteSubject(DigestIdentifier),
726    TransferSubject(TransferSubject),
727    RejectTransfer(DigestIdentifier),
728    ConfirmTransfer(DigestIdentifier),
729    EOLSubject {
730        subject_id: DigestIdentifier,
731        i_owner: bool,
732    },
733}
734
735impl Event for NodeEvent {}
736
737#[async_trait]
738impl Actor for Node {
739    type Event = NodeEvent;
740    type Message = NodeMessage;
741    type Response = NodeResponse;
742
743    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
744        parent_span.map_or_else(
745            || info_span!("Node"),
746            |parent_span| info_span!(parent: parent_span, "Node"),
747        )
748    }
749
750    async fn pre_start(
751        &mut self,
752        ctx: &mut ave_actors::ActorContext<Self>,
753    ) -> Result<(), ActorError> {
754        if let Err(e) = Self::build_compilation_dir(ctx).await {
755            error!(
756                error = %e,
757                "Failed to build compilation directory"
758            );
759            return Err(e);
760        }
761
762        // Start store
763        if let Err(e) = self.init_store("node", None, true, ctx).await {
764            error!(
765                error = %e,
766                "Failed to initialize node store"
767            );
768            return Err(e);
769        }
770
771        let Some(config) =
772            ctx.system().get_helper::<ConfigHelper>("config").await
773        else {
774            error!("Config helper not found");
775            return Err(ActorError::Helper {
776                name: "config".to_string(),
777                reason: "Not found".to_string(),
778            });
779        };
780        let safe_mode = config.safe_mode;
781        let Some(network): Option<Arc<NetworkSender>> =
782            ctx.system().get_helper("network").await
783        else {
784            error!("Network helper not found");
785            return Err(ActorError::Helper {
786                name: "network".to_string(),
787                reason: "Not found".to_string(),
788            });
789        };
790
791        if !safe_mode {
792            let register_actor =
793                match ctx.create_child("register", Register).await {
794                    Ok(actor) => actor,
795                    Err(e) => {
796                        error!(error = %e, "Failed to create register child");
797                        return Err(e);
798                    }
799                };
800
801            let Some(ext_db): Option<Arc<ExternalDB>> =
802                ctx.system().get_helper("ext_db").await
803            else {
804                error!("External DB helper not found");
805                return Err(ActorError::Helper {
806                    name: "ext_db".to_string(),
807                    reason: "Not found".to_string(),
808                });
809            };
810
811            let sink =
812                Sink::new(register_actor.subscribe(), ext_db.get_register());
813            ctx.system().run_sink(sink).await;
814
815            if let Err(e) = ctx
816                .create_child(
817                    "manual_distribution",
818                    ManualDistribution::new(self.our_key.clone()),
819                )
820                .await
821            {
822                error!(
823                    error = %e,
824                    "Failed to create manual_distribution child"
825                );
826                return Err(e);
827            }
828
829            if let Err(e) = self.create_distributors(ctx, &network).await {
830                error!(
831                    error = %e,
832                    "Failed to create distributors"
833                );
834                return Err(e);
835            }
836        }
837
838        let Some(hash) = self.hash else {
839            error!("Hash is None during subject manager startup");
840            return Err(ActorError::FunctionalCritical {
841                description: "Hash is None".to_string(),
842            });
843        };
844
845        let subject_manager = match ctx
846            .create_child(
847                "subject_manager",
848                SubjectManager::new(
849                    self.our_key.clone(),
850                    hash,
851                    self.is_service,
852                ),
853            )
854            .await
855        {
856            Ok(actor) => actor,
857            Err(e) => {
858                error!(error = %e, "Failed to create subject_manager child");
859                return Err(e);
860            }
861        };
862
863        if let Err(e) = subject_manager
864            .ask(SubjectManagerMessage::UpGovernances {
865                governance_ids: self.governance_ids(),
866            })
867            .await
868        {
869            error!(error = %e, "Failed to bootstrap governances");
870            return Err(e);
871        }
872
873        if let Err(e) = ctx
874            .create_child(
875                "auth",
876                Auth::initial((network.clone(), self.our_key.clone())),
877            )
878            .await
879        {
880            error!(
881                error = %e,
882                "Failed to create auth child"
883            );
884            return Err(e);
885        }
886
887        if !safe_mode
888            && let Err(e) = ctx
889                .create_child(
890                    "distributor",
891                    DistriWorker {
892                        our_key: self.our_key.clone(),
893                        network,
894                    },
895                )
896                .await
897        {
898            error!(
899                error = %e,
900                "Failed to create distributor child"
901            );
902            return Err(e);
903        }
904
905        Ok(())
906    }
907}
908
909#[async_trait]
910impl Handler<Self> for Node {
911    async fn handle_message(
912        &mut self,
913        _sender: ActorPath,
914        msg: NodeMessage,
915        ctx: &mut ave_actors::ActorContext<Self>,
916    ) -> Result<NodeResponse, ActorError> {
917        match msg {
918            NodeMessage::GetSinkEvents {
919                subject_id,
920                from_sn,
921                to_sn,
922                limit,
923            } => Ok(NodeResponse::SinkEvents(
924                self.replay_sink_events(ctx, subject_id, from_sn, to_sn, limit)
925                    .await?,
926            )),
927            NodeMessage::RegisterSubject {
928                owner,
929                subject_id,
930                data,
931            } => {
932                let Some(network): Option<Arc<NetworkSender>> =
933                    ctx.system().get_helper("network").await
934                else {
935                    error!(
936                        msg_type = "RegisterSubject",
937                        subject_id = %subject_id,
938                        "Network helper not found"
939                    );
940                    return Err(ActorError::Helper {
941                        name: "network".to_string(),
942                        reason: "Not found".to_string(),
943                    });
944                };
945
946                self.on_event(
947                    NodeEvent::RegisterSubject {
948                        owner,
949                        subject_id: subject_id.clone(),
950                        data,
951                    },
952                    ctx,
953                )
954                .await;
955
956                let distributor_name = format!("distributor_{}", subject_id);
957                if ctx
958                    .get_child::<DistriWorker>(&distributor_name)
959                    .await
960                    .is_err()
961                {
962                    ctx.create_child(
963                        &distributor_name,
964                        DistriWorker {
965                            our_key: self.our_key.clone(),
966                            network,
967                        },
968                    )
969                    .await?;
970                }
971
972                Ok(NodeResponse::Ok)
973            }
974            NodeMessage::EOLSubject {
975                subject_id,
976                i_owner,
977            } => {
978                self.on_event(
979                    NodeEvent::EOLSubject {
980                        subject_id: subject_id.clone(),
981                        i_owner,
982                    },
983                    ctx,
984                )
985                .await;
986
987                debug!(
988                    msg_type = "EOLSubject",
989                    subject_id = %subject_id,
990                    i_owner = %i_owner,
991                    "EOL confirmed"
992                );
993
994                Ok(NodeResponse::Ok)
995            }
996            NodeMessage::GetGovernances => {
997                let mut gov_know = self
998                    .known_subjects
999                    .iter()
1000                    .filter(|x| matches!(x.1, SubjectData::Governance { .. }))
1001                    .map(|x| x.0.clone())
1002                    .collect::<Vec<DigestIdentifier>>();
1003                let mut gov_owned = self
1004                    .owned_subjects
1005                    .iter()
1006                    .filter(|x| matches!(x.1, SubjectData::Governance { .. }))
1007                    .map(|x| x.0.clone())
1008                    .collect::<Vec<DigestIdentifier>>();
1009                gov_know.append(&mut gov_owned);
1010
1011                return Ok(NodeResponse::Governances(gov_know));
1012            }
1013            NodeMessage::ICanSendLastLedger(subject_id) => {
1014                let subject_data = if self.reject_subjects.contains(&subject_id)
1015                {
1016                    self.known_subjects.get(&subject_id).cloned()
1017                } else {
1018                    self.owned_subjects.get(&subject_id).cloned()
1019                };
1020
1021                Ok(NodeResponse::SubjectData(subject_data))
1022            }
1023            NodeMessage::GetSubjectData(subject_id) => {
1024                let data = self.get_subject_data(&subject_id);
1025                if data.is_none() {
1026                    debug!(
1027                        msg_type = "GetSubjectData",
1028                        subject_id = %subject_id,
1029                        "Subject not found"
1030                    );
1031                }
1032
1033                debug!(
1034                    msg_type = "GetSubjectData",
1035                    subject_id = %subject_id,
1036                    "Subject data retrieved successfully"
1037                );
1038
1039                Ok(NodeResponse::SubjectData(data))
1040            }
1041            NodeMessage::GovernanceTrackers(governance_id) => {
1042                let trackers = self.governance_trackers(&governance_id);
1043
1044                debug!(
1045                    msg_type = "GovernanceTrackers",
1046                    governance_id = %governance_id,
1047                    count = trackers.len(),
1048                    "Governance tracker association check completed"
1049                );
1050
1051                Ok(NodeResponse::GovernanceTrackers(trackers))
1052            }
1053            NodeMessage::DeleteSubject(subject_id) => {
1054                self.on_event(
1055                    NodeEvent::DeleteSubject(subject_id.clone()),
1056                    ctx,
1057                )
1058                .await;
1059
1060                debug!(
1061                    msg_type = "DeleteSubject",
1062                    subject_id = %subject_id,
1063                    "Subject deleted from node state"
1064                );
1065
1066                Ok(NodeResponse::Ok)
1067            }
1068            NodeMessage::PendingTransfers => {
1069                let transfers: Vec<TransferSubject> = self
1070                    .transfer_subjects
1071                    .iter()
1072                    .map(|x| TransferSubject {
1073                        name: x.1.name.clone(),
1074                        subject_id: x.0.clone(),
1075                        new_owner: x.1.new_owner.clone(),
1076                        actual_owner: x.1.actual_owner.clone(),
1077                    })
1078                    .collect();
1079
1080                debug!(
1081                    msg_type = "PendingTransfers",
1082                    count = transfers.len(),
1083                    "Retrieved pending transfers"
1084                );
1085
1086                Ok(NodeResponse::PendingTransfers(transfers))
1087            }
1088            NodeMessage::RejectTransfer(subject_id) => {
1089                self.on_event(
1090                    NodeEvent::RejectTransfer(subject_id.clone()),
1091                    ctx,
1092                )
1093                .await;
1094
1095                debug!(
1096                    msg_type = "RejectTransfer",
1097                    subject_id = %subject_id,
1098                    "Transfer rejected successfully"
1099                );
1100
1101                Ok(NodeResponse::Ok)
1102            }
1103            NodeMessage::TransferSubject(data) => {
1104                let subject_id = data.subject_id.clone();
1105                self.on_event(NodeEvent::TransferSubject(data), ctx).await;
1106
1107                debug!(
1108                    msg_type = "TransferSubject",
1109                    subject_id = %subject_id,
1110                    "Subject transfer registered successfully"
1111                );
1112
1113                Ok(NodeResponse::Ok)
1114            }
1115            NodeMessage::ConfirmTransfer(subject_id) => {
1116                self.on_event(
1117                    NodeEvent::ConfirmTransfer(subject_id.clone()),
1118                    ctx,
1119                )
1120                .await;
1121
1122                debug!(
1123                    msg_type = "ConfirmTransfer",
1124                    subject_id = %subject_id,
1125                    "Transfer confirmed between other parties"
1126                );
1127
1128                Ok(NodeResponse::Ok)
1129            }
1130            NodeMessage::SignRequest(content) => {
1131                let content = *content;
1132                let content_type = match &content {
1133                    SignTypesNode::EventRequest(_) => "EventRequest",
1134                    SignTypesNode::ValidationReq(_) => "ValidationReq",
1135                    SignTypesNode::ValidationRes(_) => "ValidationRes",
1136                    SignTypesNode::EvaluationReq(_) => "EvaluationReq",
1137                    SignTypesNode::EvaluationRes(_) => "EvaluationRes",
1138                    SignTypesNode::ApprovalReq(_) => "ApprovalReq",
1139                    SignTypesNode::ApprovalRes(_) => "ApprovalRes",
1140                    SignTypesNode::Ledger(_) => "Ledger",
1141                };
1142
1143                let sign = match content {
1144                    SignTypesNode::EventRequest(event_req) => {
1145                        self.sign(&event_req)
1146                    }
1147                    SignTypesNode::ValidationReq(validation_req) => {
1148                        self.sign(&*validation_req)
1149                    }
1150                    SignTypesNode::ValidationRes(validation_res) => {
1151                        self.sign(&validation_res)
1152                    }
1153                    SignTypesNode::EvaluationReq(evaluation_req) => {
1154                        self.sign(&evaluation_req)
1155                    }
1156                    SignTypesNode::EvaluationRes(evaluation_res) => {
1157                        self.sign(&evaluation_res)
1158                    }
1159                    SignTypesNode::ApprovalReq(approval_req) => {
1160                        self.sign(&approval_req)
1161                    }
1162                    SignTypesNode::ApprovalRes(approval_res) => {
1163                        self.sign(&*approval_res)
1164                    }
1165                    SignTypesNode::Ledger(ledger) => self.sign(&ledger),
1166                }
1167                .map_err(|e| {
1168                    error!(
1169                        msg_type = "SignRequest",
1170                        content_type = content_type,
1171                        error = %e,
1172                        "Failed to sign content"
1173                    );
1174                    ActorError::FunctionalCritical {
1175                        description: format!("Can not sign event: {}", e),
1176                    }
1177                })?;
1178
1179                debug!(
1180                    msg_type = "SignRequest",
1181                    content_type = content_type,
1182                    "Content signed successfully"
1183                );
1184
1185                Ok(NodeResponse::SignRequest(sign))
1186            }
1187            NodeMessage::IOwnerNewOwnerSubject(subject_id) => {
1188                let i_owner =
1189                    self.owned_subjects.keys().any(|x| *x == subject_id);
1190
1191                let i_new_owner = if let Some(data) =
1192                    self.transfer_subjects.get(&subject_id)
1193                {
1194                    Some(data.new_owner == *self.our_key)
1195                } else {
1196                    None
1197                };
1198
1199                debug!(
1200                    msg_type = "OwnerPendingSubject",
1201                    subject_id = %subject_id,
1202                    i_owner = i_owner,
1203                    i_new_owner = i_new_owner,
1204                    "Checked owner/pending status"
1205                );
1206
1207                Ok(NodeResponse::IOwnerNewOwner {
1208                    i_owner,
1209                    i_new_owner,
1210                })
1211            }
1212            NodeMessage::AuthData(subject_id) => {
1213                let authorized_subjects = match ctx
1214                    .get_child::<Auth>("auth")
1215                    .await
1216                {
1217                    Ok(auth) => {
1218                        let res = match auth.ask(AuthMessage::GetAuths).await {
1219                            Ok(res) => res,
1220                            Err(e) => {
1221                                error!(
1222                                    msg_type = "AuthData",
1223                                    subject_id = %subject_id,
1224                                    error = %e,
1225                                    "Failed to get authorizations from auth actor"
1226                                );
1227                                ctx.system().crash_system();
1228                                return Err(e);
1229                            }
1230                        };
1231                        let AuthResponse::Auths { subjects } = res else {
1232                            error!(
1233                                msg_type = "AuthData",
1234                                subject_id = %subject_id,
1235                                "Unexpected response from auth actor"
1236                            );
1237                            ctx.system().crash_system();
1238                            return Err(ActorError::UnexpectedResponse {
1239                                expected: "AuthResponse::Auths".to_owned(),
1240                                path: ctx.path().clone() / "auth",
1241                            });
1242                        };
1243                        subjects
1244                    }
1245                    Err(e) => {
1246                        error!(
1247                            msg_type = "AuthData",
1248                            subject_id = %subject_id,
1249                            "Auth actor not found"
1250                        );
1251                        ctx.system().crash_system();
1252                        return Err(e);
1253                    }
1254                };
1255
1256                let auth_subj =
1257                    authorized_subjects.iter().any(|x| x.clone() == subject_id);
1258
1259                let subj_data = self
1260                    .known_subjects
1261                    .get(&subject_id)
1262                    .or_else(|| self.owned_subjects.get(&subject_id))
1263                    .cloned();
1264
1265                debug!(
1266                    msg_type = "AuthData",
1267                    subject_id = %subject_id,
1268                    authorized = auth_subj,
1269                    subject_data = ?subj_data,
1270                    "Checked subject authorization status"
1271                );
1272
1273                Ok(NodeResponse::AuthData {
1274                    auth: auth_subj,
1275                    subject_data: subj_data,
1276                })
1277            }
1278        }
1279    }
1280
1281    async fn on_child_fault(
1282        &mut self,
1283        error: ActorError,
1284        ctx: &mut ActorContext<Self>,
1285    ) -> ChildAction {
1286        error!(
1287            error = %error,
1288            "Child actor fault, stopping system"
1289        );
1290        ctx.system().crash_system();
1291        ChildAction::Stop
1292    }
1293
1294    async fn on_event(
1295        &mut self,
1296        event: NodeEvent,
1297        ctx: &mut ActorContext<Self>,
1298    ) {
1299        if let Err(e) = self.persist(&event, ctx).await {
1300            error!(
1301                event = ?event,
1302                error = %e,
1303                "Failed to persist node event"
1304            );
1305            ctx.system().crash_system();
1306        }
1307    }
1308}
1309
1310pub struct InitParamsNode {
1311    pub key_pair: KeyPair,
1312    pub public_key: Arc<PublicKey>,
1313    pub hash: HashAlgorithm,
1314    pub is_service: bool,
1315}
1316
1317#[async_trait]
1318impl PersistentActor for Node {
1319    type Persistence = LightPersistence;
1320    type InitParams = InitParamsNode;
1321
1322    fn update(&mut self, state: Self) {
1323        self.owned_subjects = state.owned_subjects;
1324        self.known_subjects = state.known_subjects;
1325        self.transfer_subjects = state.transfer_subjects;
1326        self.reject_subjects = state.reject_subjects
1327    }
1328
1329    fn create_initial(params: Self::InitParams) -> Self {
1330        Self {
1331            hash: Some(params.hash),
1332            owner: params.key_pair,
1333            our_key: params.public_key,
1334            is_service: params.is_service,
1335            owned_subjects: HashMap::new(),
1336            known_subjects: HashMap::new(),
1337            transfer_subjects: HashMap::new(),
1338            reject_subjects: HashSet::new(),
1339        }
1340    }
1341
1342    /// Change node state.
1343    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
1344        match event {
1345            NodeEvent::EOLSubject {
1346                subject_id,
1347                i_owner,
1348            } => {
1349                self.eol(subject_id.clone(), *i_owner);
1350                debug!(
1351                    event_type = "EOLSubject",
1352                    subject_id = %subject_id,
1353                    i_owner = &i_owner,
1354                    "Applied eol"
1355                );
1356            }
1357            NodeEvent::ConfirmTransfer(subject_id) => {
1358                self.confirm_transfer(subject_id.clone());
1359                debug!(
1360                    event_type = "ConfirmTransfer",
1361                    subject_id = %subject_id,
1362                    "Applied transfer confirmation"
1363                );
1364            }
1365            NodeEvent::RegisterSubject {
1366                subject_id,
1367                data,
1368                owner,
1369            } => {
1370                self.register_subject(
1371                    subject_id.clone(),
1372                    owner.clone(),
1373                    data.clone(),
1374                );
1375                debug!(
1376                    event_type = "RegisterSubject",
1377                    subject_id = %subject_id,
1378                    owner = %owner,
1379                    "Applied subject registration"
1380                );
1381            }
1382            NodeEvent::DeleteSubject(subject_id) => {
1383                self.delete_subject(subject_id);
1384                debug!(
1385                    event_type = "DeleteSubject",
1386                    subject_id = %subject_id,
1387                    "Applied subject deletion"
1388                );
1389            }
1390            NodeEvent::RejectTransfer(subject_id) => {
1391                self.delete_transfer(subject_id);
1392                debug!(
1393                    event_type = "RejectTransfer",
1394                    subject_id = %subject_id,
1395                    "Applied transfer rejection"
1396                );
1397            }
1398            NodeEvent::TransferSubject(transfer) => {
1399                self.transfer_subject(transfer.clone());
1400                debug!(
1401                    event_type = "TransferSubject",
1402                    subject_id = %transfer.subject_id,
1403                    new_owner = %transfer.new_owner,
1404                    "Applied subject transfer"
1405                );
1406            }
1407        };
1408
1409        Ok(())
1410    }
1411}
1412
1413#[async_trait]
1414impl Storable for Node {}