Skip to main content

ave_core/node/
mod.rs

1//! Node module
2//!
3
4use std::{
5    collections::{HashMap, HashSet},
6    path::Path,
7    sync::Arc,
8};
9
10use borsh::{BorshDeserialize, BorshSerialize};
11use register::Register;
12use tokio::fs;
13use tracing::{Span, debug, error, info_span};
14
15use crate::{
16    auth::{Auth, AuthInitParams, AuthMessage, AuthResponse},
17    db::Storable,
18    distribution::worker::DistriWorker,
19    governance::{Governance, GovernanceMessage, GovernanceResponse},
20    helpers::{db::ExternalDB, network::service::NetworkSender},
21    manual_distribution::ManualDistribution,
22    model::{common::node::SignTypesNode, event::Ledger},
23    node::subject_manager::{SubjectManager, SubjectManagerMessage},
24    subject::replay_sink_events as replay_ledgers_to_sink_events,
25    system::ConfigHelper,
26    tracker::{Tracker, TrackerMessage, TrackerResponse},
27};
28
29use ave_common::{
30    SchemaType,
31    identity::{
32        DigestIdentifier, HashAlgorithm, PublicKey, Signature, keys::KeyPair,
33    },
34    response::SinkEventsPage,
35};
36
37use async_trait::async_trait;
38use ave_actors::{
39    Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
40    Message, Response, Sink,
41};
42use ave_actors::{LightPersistence, PersistentActor};
43use serde::{Deserialize, Serialize};
44
45pub mod register;
46pub mod subject_manager;
47
48#[derive(
49    Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
50)]
51pub struct TransferSubject {
52    pub name: Option<String>,
53    pub subject_id: DigestIdentifier,
54    pub new_owner: PublicKey,
55    pub actual_owner: PublicKey,
56}
57
58#[derive(
59    Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
60)]
61pub struct TransferData {
62    pub name: Option<String>,
63    pub new_owner: PublicKey,
64    pub actual_owner: PublicKey,
65}
66
67#[derive(
68    Clone,
69    Debug,
70    Serialize,
71    Deserialize,
72    BorshSerialize,
73    BorshDeserialize,
74    Eq,
75    PartialEq,
76)]
77pub enum SubjectData {
78    Tracker {
79        governance_id: DigestIdentifier,
80        schema_id: SchemaType,
81        namespace: String,
82        active: bool,
83    },
84    Governance {
85        active: bool,
86    },
87}
88
89impl SubjectData {
90    pub fn get_schema_id(&self) -> SchemaType {
91        match self {
92            Self::Tracker { schema_id, .. } => schema_id.clone(),
93            Self::Governance { .. } => SchemaType::Governance,
94        }
95    }
96
97    pub fn get_governance_id(&self) -> Option<DigestIdentifier> {
98        match self {
99            Self::Tracker { governance_id, .. } => Some(governance_id.clone()),
100            Self::Governance { .. } => None,
101        }
102    }
103
104    pub fn get_namespace(&self) -> String {
105        match self {
106            Self::Tracker { namespace, .. } => namespace.clone(),
107            Self::Governance { .. } => String::default(),
108        }
109    }
110
111    pub const fn get_active(&self) -> bool {
112        match self {
113            Self::Tracker { active, .. } => *active,
114            Self::Governance { active } => *active,
115        }
116    }
117
118    pub const fn eol(&mut self) {
119        match self {
120            Self::Tracker { active, .. } => *active = false,
121            Self::Governance { active } => *active = false,
122        };
123    }
124}
125
126/// Node struct.
127#[derive(Debug, Serialize, Deserialize, Clone)]
128pub struct Node {
129    /// Owner of the node.
130    #[serde(skip)]
131    owner: KeyPair,
132    #[serde(skip)]
133    our_key: Arc<PublicKey>,
134    #[serde(skip)]
135    hash: Option<HashAlgorithm>,
136    #[serde(skip)]
137    is_service: bool,
138    #[serde(skip)]
139    only_clear_events: bool,
140    #[serde(skip)]
141    ledger_batch_size: u64,
142    /// The node's owned subjects.
143    owned_subjects: HashMap<DigestIdentifier, SubjectData>,
144    /// The node's known subjects.
145    known_subjects: HashMap<DigestIdentifier, SubjectData>,
146
147    transfer_subjects: HashMap<DigestIdentifier, TransferData>,
148
149    reject_subjects: HashSet<DigestIdentifier>,
150}
151
152// Manual Borsh implementation to skip the 'owner' field
153impl BorshSerialize for Node {
154    fn serialize<W: std::io::Write>(
155        &self,
156        writer: &mut W,
157    ) -> std::io::Result<()> {
158        // Serialize only the fields we want to persist, skipping 'owner'
159        BorshSerialize::serialize(&self.owned_subjects, writer)?;
160        BorshSerialize::serialize(&self.known_subjects, writer)?;
161        BorshSerialize::serialize(&self.transfer_subjects, writer)?;
162        BorshSerialize::serialize(&self.reject_subjects, writer)?;
163        Ok(())
164    }
165}
166
167impl BorshDeserialize for Node {
168    fn deserialize_reader<R: std::io::Read>(
169        reader: &mut R,
170    ) -> std::io::Result<Self> {
171        // Deserialize the persisted fields
172        let owned_subjects =
173            HashMap::<DigestIdentifier, SubjectData>::deserialize_reader(
174                reader,
175            )?;
176        let known_subjects =
177            HashMap::<DigestIdentifier, SubjectData>::deserialize_reader(
178                reader,
179            )?;
180        let transfer_subjects =
181            HashMap::<DigestIdentifier, TransferData>::deserialize_reader(
182                reader,
183            )?;
184        let reject_subjects =
185            HashSet::<DigestIdentifier>::deserialize_reader(reader)?;
186
187        // Create a default/placeholder KeyPair for 'owner'
188        // This will be replaced by the actual owner during actor initialization
189        let owner = KeyPair::default();
190        let our_key = Arc::new(PublicKey::default());
191        let hash = None;
192
193        Ok(Self {
194            hash,
195            our_key,
196            owner,
197            ledger_batch_size: 100,
198            owned_subjects,
199            known_subjects,
200            transfer_subjects,
201            reject_subjects,
202            is_service: false,
203            only_clear_events: false,
204        })
205    }
206}
207
208impl Node {
209    fn get_subject_data(
210        &self,
211        subject_id: &DigestIdentifier,
212    ) -> Option<SubjectData> {
213        self.owned_subjects
214            .get(subject_id)
215            .or_else(|| self.known_subjects.get(subject_id))
216            .cloned()
217    }
218
219    /// Adds a subject to the node's owned subjects.
220    pub fn transfer_subject(&mut self, data: TransferSubject) {
221        if data.new_owner == *self.our_key {
222            self.reject_subjects.remove(&data.subject_id);
223        }
224
225        self.transfer_subjects.insert(
226            data.subject_id,
227            TransferData {
228                name: data.name,
229                new_owner: data.new_owner,
230                actual_owner: data.actual_owner,
231            },
232        );
233    }
234
235    pub fn delete_transfer(&mut self, subject_id: &DigestIdentifier) {
236        if let Some(data) = self.transfer_subjects.remove(subject_id)
237            && data.new_owner == *self.our_key
238        {
239            self.reject_subjects.insert(subject_id.clone());
240        }
241    }
242
243    pub fn confirm_transfer(&mut self, subject_id: DigestIdentifier) {
244        self.our_key.to_string();
245
246        if let Some(data) = self.transfer_subjects.remove(&subject_id) {
247            if data.actual_owner == *self.our_key {
248                if let Some(data) = self.owned_subjects.remove(&subject_id) {
249                    self.known_subjects.insert(subject_id, data);
250                }
251            } else if data.new_owner == *self.our_key
252                && let Some(data) = self.known_subjects.remove(&subject_id)
253            {
254                self.owned_subjects.insert(subject_id, data);
255            };
256        };
257    }
258
259    pub fn eol(&mut self, subject_id: DigestIdentifier, i_owner: bool) {
260        if i_owner {
261            if let Some(data) = self.owned_subjects.get_mut(&subject_id) {
262                data.eol();
263            }
264        } else if let Some(data) = self.known_subjects.get_mut(&subject_id) {
265            data.eol();
266        }
267    }
268
269    pub fn register_subject(
270        &mut self,
271        subject_id: DigestIdentifier,
272        owner: PublicKey,
273        data: SubjectData,
274    ) {
275        if *self.our_key == owner {
276            self.owned_subjects.insert(subject_id, data);
277        } else {
278            self.known_subjects.insert(subject_id, data);
279        }
280    }
281
282    pub fn delete_subject(&mut self, subject_id: &DigestIdentifier) {
283        self.owned_subjects
284            .remove(subject_id)
285            .or_else(|| self.known_subjects.remove(subject_id));
286
287        self.transfer_subjects.remove(subject_id);
288        self.reject_subjects.remove(subject_id);
289    }
290
291    fn governance_trackers(
292        &self,
293        governance_id: &DigestIdentifier,
294    ) -> Vec<DigestIdentifier> {
295        self.owned_subjects
296            .iter()
297            .chain(self.known_subjects.iter())
298            .filter_map(|(subject_id, data)| match data {
299                SubjectData::Tracker {
300                    governance_id: tracker_governance_id,
301                    ..
302                } if tracker_governance_id == governance_id => {
303                    Some(subject_id.clone())
304                }
305                _ => None,
306            })
307            .collect()
308    }
309
310    fn sign<T: BorshSerialize>(
311        &self,
312        content: &T,
313    ) -> Result<Signature, ActorError> {
314        Signature::new(content, &self.owner).map_err(|e| {
315            ActorError::Functional {
316                description: format!("{}", e),
317            }
318        })
319    }
320
321    async fn build_compilation_dir(
322        ctx: &ActorContext<Self>,
323    ) -> Result<(), ActorError> {
324        let contracts_path = if let Some(config) =
325            ctx.system().get_helper::<ConfigHelper>("config").await
326        {
327            config.contracts_path
328        } else {
329            error!("Config helper not found");
330            return Err(ActorError::Helper {
331                name: "config".to_owned(),
332                reason: "Not found".to_string(),
333            });
334        };
335
336        let dir = contracts_path.join("contracts");
337
338        if !Path::new(&dir).exists() {
339            fs::create_dir_all(&dir).await.map_err(|e| {
340                error!(
341                    error = %e,
342                    path = ?dir,
343                    "Failed to create contracts directory"
344                );
345                ActorError::FunctionalCritical {
346                    description: format!("Can not create contracts dir: {}", e),
347                }
348            })?;
349        }
350        Ok(())
351    }
352
353    async fn create_distributors(
354        &self,
355        ctx: &mut ActorContext<Self>,
356        network: &Arc<NetworkSender>,
357    ) -> Result<(), ActorError> {
358        for subject in self.owned_subjects.keys() {
359            let distributor_name = format!("distributor_{}", subject);
360            if ctx
361                .get_child::<DistriWorker>(&distributor_name)
362                .await
363                .is_err()
364            {
365                ctx.create_child(
366                    &distributor_name,
367                    DistriWorker {
368                        our_key: self.our_key.clone(),
369                        network: network.clone(),
370                        ledger_batch_size: self.ledger_batch_size,
371                    },
372                )
373                .await?;
374            }
375        }
376
377        for subject in self.known_subjects.keys() {
378            let distributor_name = format!("distributor_{}", subject);
379            if ctx
380                .get_child::<DistriWorker>(&distributor_name)
381                .await
382                .is_err()
383            {
384                ctx.create_child(
385                    &distributor_name,
386                    DistriWorker {
387                        our_key: self.our_key.clone(),
388                        network: network.clone(),
389                        ledger_batch_size: self.ledger_batch_size,
390                    },
391                )
392                .await?;
393            }
394        }
395
396        Ok(())
397    }
398
399    fn governance_ids(&self) -> Vec<DigestIdentifier> {
400        let mut governance_ids = self
401            .owned_subjects
402            .iter()
403            .filter(|(_, data)| matches!(data, SubjectData::Governance { .. }))
404            .map(|(subject_id, _)| subject_id.clone())
405            .collect::<HashSet<_>>();
406
407        governance_ids.extend(
408            self.known_subjects
409                .iter()
410                .filter(|(_, data)| {
411                    matches!(data, SubjectData::Governance { .. })
412                })
413                .map(|(subject_id, _)| subject_id.clone()),
414        );
415
416        governance_ids.into_iter().collect()
417    }
418
419    async fn get_tracker_ledger_batch(
420        ctx: &ActorContext<Self>,
421        actor: &ave_actors::ActorRef<Tracker>,
422        lo_sn: Option<u64>,
423        hi_sn: u64,
424    ) -> Result<(Vec<Ledger>, bool), ActorError> {
425        let response = actor
426            .ask(TrackerMessage::GetLedger { lo_sn, hi_sn })
427            .await?;
428
429        match response {
430            TrackerResponse::Ledger { ledger, is_all } => Ok((ledger, is_all)),
431            _ => Err(ActorError::UnexpectedResponse {
432                path: ctx.path().clone() / "subject_manager",
433                expected: "TrackerResponse::Ledger".to_owned(),
434            }),
435        }
436    }
437
438    async fn get_governance_ledger_batch(
439        ctx: &ActorContext<Self>,
440        actor: &ave_actors::ActorRef<Governance>,
441        lo_sn: Option<u64>,
442        hi_sn: u64,
443    ) -> Result<(Vec<Ledger>, bool), ActorError> {
444        let response = actor
445            .ask(GovernanceMessage::GetLedger { lo_sn, hi_sn })
446            .await?;
447
448        match response {
449            GovernanceResponse::Ledger { ledger, is_all } => {
450                Ok((ledger, is_all))
451            }
452            _ => Err(ActorError::UnexpectedResponse {
453                path: ctx.path().clone() / "subject_manager",
454                expected: "GovernanceResponse::Ledger".to_owned(),
455            }),
456        }
457    }
458
459    async fn collect_tracker_ledger(
460        &self,
461        ctx: &ActorContext<Self>,
462        subject_id: &DigestIdentifier,
463        hi_sn: u64,
464    ) -> Result<Vec<Ledger>, ActorError> {
465        let path = ActorPath::from(format!(
466            "/user/node/subject_manager/{}",
467            subject_id
468        ));
469        let tracker = ctx.system().get_actor::<Tracker>(&path).await?;
470        let mut ledger = Vec::new();
471        let mut lo_sn = None;
472        let ledger_batch_size = self.ledger_batch_size;
473
474        loop {
475            let from_sn = lo_sn.map_or(0_u64, |sn: u64| sn.saturating_add(1));
476            let batch_hi_sn = from_sn
477                .saturating_add(ledger_batch_size)
478                .saturating_sub(1)
479                .min(hi_sn);
480            let (mut batch, is_all) = Self::get_tracker_ledger_batch(
481                ctx,
482                &tracker,
483                lo_sn,
484                batch_hi_sn,
485            )
486            .await?;
487            if batch.is_empty() {
488                break;
489            }
490            lo_sn = batch.last().map(|event| event.sn);
491            ledger.append(&mut batch);
492            if is_all {
493                break;
494            }
495        }
496
497        Ok(ledger)
498    }
499
500    async fn collect_governance_ledger(
501        &self,
502        ctx: &ActorContext<Self>,
503        subject_id: &DigestIdentifier,
504        hi_sn: u64,
505    ) -> Result<Vec<Ledger>, ActorError> {
506        let path = ActorPath::from(format!(
507            "/user/node/subject_manager/{}",
508            subject_id
509        ));
510        let governance = ctx.system().get_actor::<Governance>(&path).await?;
511        let mut ledger = Vec::new();
512        let mut lo_sn = None;
513        let ledger_batch_size = self.ledger_batch_size;
514
515        loop {
516            let from_sn = lo_sn.map_or(0_u64, |sn: u64| sn.saturating_add(1));
517            let batch_hi_sn = from_sn
518                .saturating_add(ledger_batch_size)
519                .saturating_sub(1)
520                .min(hi_sn);
521            let (mut batch, is_all) = Self::get_governance_ledger_batch(
522                ctx,
523                &governance,
524                lo_sn,
525                batch_hi_sn,
526            )
527            .await?;
528            if batch.is_empty() {
529                break;
530            }
531            lo_sn = batch.last().map(|event| event.sn);
532            ledger.append(&mut batch);
533            if is_all {
534                break;
535            }
536        }
537
538        Ok(ledger)
539    }
540
541    async fn replay_sink_events(
542        &self,
543        ctx: &ActorContext<Self>,
544        subject_id: DigestIdentifier,
545        from_sn: u64,
546        to_sn: Option<u64>,
547        limit: u64,
548    ) -> Result<SinkEventsPage, ActorError> {
549        if limit == 0 {
550            return Err(ActorError::Functional {
551                description: "Replay limit must be greater than zero"
552                    .to_owned(),
553            });
554        }
555        if let Some(to_sn) = to_sn
556            && from_sn > to_sn
557        {
558            return Err(ActorError::Functional {
559                description: "Replay range requires from_sn <= to_sn"
560                    .to_owned(),
561            });
562        }
563
564        let Some(subject_data) = self
565            .owned_subjects
566            .get(&subject_id)
567            .cloned()
568            .or_else(|| self.known_subjects.get(&subject_id).cloned())
569        else {
570            return Err(ActorError::NotFound {
571                path: ActorPath::from(format!(
572                    "/user/node/subject_manager/{}",
573                    subject_id
574                )),
575            });
576        };
577
578        let sink_timestamp = ave_common::identity::TimeStamp::now().as_nanos();
579        let public_key = self.our_key.to_string();
580
581        match subject_data {
582            SubjectData::Governance { .. } => {
583                let path = ActorPath::from(format!(
584                    "/user/node/subject_manager/{}",
585                    subject_id
586                ));
587                let governance =
588                    ctx.system().get_actor::<Governance>(&path).await?;
589                let response =
590                    governance.ask(GovernanceMessage::GetMetadata).await?;
591                let GovernanceResponse::Metadata(metadata) = response else {
592                    return Err(ActorError::UnexpectedResponse {
593                        path,
594                        expected: "GovernanceResponse::Metadata".to_owned(),
595                    });
596                };
597                let hi_sn = to_sn
598                    .map(|to_sn| to_sn.min(metadata.sn))
599                    .unwrap_or(metadata.sn);
600                let ledger = self
601                    .collect_governance_ledger(ctx, &subject_id, hi_sn)
602                    .await?;
603                replay_ledgers_to_sink_events(
604                    &ledger,
605                    &public_key,
606                    from_sn,
607                    to_sn,
608                    limit,
609                    sink_timestamp,
610                )
611            }
612            SubjectData::Tracker { .. } => {
613                let subject_manager =
614                    ctx.get_child::<SubjectManager>("subject_manager").await?;
615                let requester = format!(
616                    "node_replay_sink_events:{}:{}:{}",
617                    subject_id, from_sn, limit
618                );
619                subject_manager
620                    .ask(SubjectManagerMessage::Up {
621                        subject_id: subject_id.clone(),
622                        requester: requester.clone(),
623                        create_ledger: None,
624                    })
625                    .await?;
626
627                let result = async {
628                    let path = ActorPath::from(format!(
629                        "/user/node/subject_manager/{}",
630                        subject_id
631                    ));
632                    let tracker =
633                        ctx.system().get_actor::<Tracker>(&path).await?;
634                    let response =
635                        tracker.ask(TrackerMessage::GetMetadata).await?;
636                    let TrackerResponse::Metadata(metadata) = response else {
637                        return Err(ActorError::UnexpectedResponse {
638                            path,
639                            expected: "TrackerResponse::Metadata".to_owned(),
640                        });
641                    };
642                    let hi_sn = to_sn
643                        .map(|to_sn| to_sn.min(metadata.sn))
644                        .unwrap_or(metadata.sn);
645                    let ledger = self
646                        .collect_tracker_ledger(ctx, &subject_id, hi_sn)
647                        .await?;
648                    replay_ledgers_to_sink_events(
649                        &ledger,
650                        &public_key,
651                        from_sn,
652                        to_sn,
653                        limit,
654                        sink_timestamp,
655                    )
656                }
657                .await;
658
659                let _ = subject_manager
660                    .ask(SubjectManagerMessage::Finish {
661                        subject_id,
662                        requester,
663                    })
664                    .await;
665
666                result
667            }
668        }
669    }
670}
671
672/// Node message.
673#[derive(Clone, Debug, Serialize, Deserialize)]
674pub enum NodeMessage {
675    GetGovernances,
676    GetSinkEvents {
677        subject_id: DigestIdentifier,
678        from_sn: u64,
679        to_sn: Option<u64>,
680        limit: u64,
681    },
682    SignRequest(Box<SignTypesNode>),
683    PendingTransfers,
684    RegisterSubject {
685        owner: PublicKey,
686        subject_id: DigestIdentifier,
687        data: SubjectData,
688    },
689    GetSubjectData(DigestIdentifier),
690    GovernanceTrackers(DigestIdentifier),
691    DeleteSubject(DigestIdentifier),
692    IOwnerNewOwnerSubject(DigestIdentifier),
693    ICanSendLastLedger(DigestIdentifier),
694    AuthData(DigestIdentifier),
695    TransferSubject(TransferSubject),
696    RejectTransfer(DigestIdentifier),
697    ConfirmTransfer(DigestIdentifier),
698    EOLSubject {
699        subject_id: DigestIdentifier,
700        i_owner: bool,
701    },
702}
703
704impl Message for NodeMessage {
705    fn is_critical(&self) -> bool {
706        matches!(
707            self,
708            Self::TransferSubject(..)
709                | Self::DeleteSubject(..)
710                | Self::RejectTransfer(..)
711                | Self::ConfirmTransfer(..)
712                | Self::EOLSubject { .. }
713        )
714    }
715}
716
717/// Node response.
718#[derive(Clone, Debug, Serialize, Deserialize)]
719pub enum NodeResponse {
720    Governances(Vec<DigestIdentifier>),
721    SinkEvents(SinkEventsPage),
722    SubjectData(Option<SubjectData>),
723    GovernanceTrackers(Vec<DigestIdentifier>),
724    PendingTransfers(Vec<TransferSubject>),
725    SignRequest(Signature),
726    IOwnerNewOwner {
727        i_owner: bool,
728        i_new_owner: Option<bool>,
729    },
730    AuthData {
731        auth: bool,
732        subject_data: Option<SubjectData>,
733    },
734    Ok,
735}
736
737impl Response for NodeResponse {}
738
739/// Node event.
740#[derive(
741    Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
742)]
743pub enum NodeEvent {
744    RegisterSubject {
745        owner: PublicKey,
746        subject_id: DigestIdentifier,
747        data: SubjectData,
748    },
749    DeleteSubject(DigestIdentifier),
750    TransferSubject(TransferSubject),
751    RejectTransfer(DigestIdentifier),
752    ConfirmTransfer(DigestIdentifier),
753    EOLSubject {
754        subject_id: DigestIdentifier,
755        i_owner: bool,
756    },
757}
758
759impl Event for NodeEvent {}
760
761#[async_trait]
762impl Actor for Node {
763    type Event = NodeEvent;
764    type Message = NodeMessage;
765    type Response = NodeResponse;
766
767    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
768        parent_span.map_or_else(
769            || info_span!("Node"),
770            |parent_span| info_span!(parent: parent_span, "Node"),
771        )
772    }
773
774    async fn pre_start(
775        &mut self,
776        ctx: &mut ave_actors::ActorContext<Self>,
777    ) -> Result<(), ActorError> {
778        if let Err(e) = Self::build_compilation_dir(ctx).await {
779            error!(
780                error = %e,
781                "Failed to build compilation directory"
782            );
783            return Err(e);
784        }
785
786        // Start store
787        if let Err(e) = self.init_store("node", None, true, ctx).await {
788            error!(
789                error = %e,
790                "Failed to initialize node store"
791            );
792            return Err(e);
793        }
794
795        let Some(config) =
796            ctx.system().get_helper::<ConfigHelper>("config").await
797        else {
798            error!("Config helper not found");
799            return Err(ActorError::Helper {
800                name: "config".to_string(),
801                reason: "Not found".to_string(),
802            });
803        };
804        let safe_mode = config.safe_mode;
805        let Some(network): Option<Arc<NetworkSender>> =
806            ctx.system().get_helper("network").await
807        else {
808            error!("Network helper not found");
809            return Err(ActorError::Helper {
810                name: "network".to_string(),
811                reason: "Not found".to_string(),
812            });
813        };
814
815        if !safe_mode {
816            let register_actor =
817                match ctx.create_child("register", Register).await {
818                    Ok(actor) => actor,
819                    Err(e) => {
820                        error!(error = %e, "Failed to create register child");
821                        return Err(e);
822                    }
823                };
824
825            let Some(ext_db): Option<Arc<ExternalDB>> =
826                ctx.system().get_helper("ext_db").await
827            else {
828                error!("External DB helper not found");
829                return Err(ActorError::Helper {
830                    name: "ext_db".to_string(),
831                    reason: "Not found".to_string(),
832                });
833            };
834
835            let sink =
836                Sink::new(register_actor.subscribe(), ext_db.get_register());
837            ctx.system().run_sink(sink).await;
838
839            if let Err(e) = ctx
840                .create_child(
841                    "manual_distribution",
842                    ManualDistribution::new(self.our_key.clone()),
843                )
844                .await
845            {
846                error!(
847                    error = %e,
848                    "Failed to create manual_distribution child"
849                );
850                return Err(e);
851            }
852
853            if let Err(e) = self.create_distributors(ctx, &network).await {
854                error!(
855                    error = %e,
856                    "Failed to create distributors"
857                );
858                return Err(e);
859            }
860        }
861
862        let Some(hash) = self.hash else {
863            error!("Hash is None during subject manager startup");
864            return Err(ActorError::FunctionalCritical {
865                description: "Hash is None".to_string(),
866            });
867        };
868
869        let subject_manager = match ctx
870            .create_child(
871                "subject_manager",
872                SubjectManager::new(
873                    self.our_key.clone(),
874                    hash,
875                    self.is_service,
876                    self.only_clear_events,
877                ),
878            )
879            .await
880        {
881            Ok(actor) => actor,
882            Err(e) => {
883                error!(error = %e, "Failed to create subject_manager child");
884                return Err(e);
885            }
886        };
887
888        if let Err(e) = subject_manager
889            .ask(SubjectManagerMessage::UpGovernances {
890                governance_ids: self.governance_ids(),
891            })
892            .await
893        {
894            error!(error = %e, "Failed to bootstrap governances");
895            return Err(e);
896        }
897
898        if let Err(e) = ctx
899            .create_child(
900                "auth",
901                Auth::initial(AuthInitParams {
902                    network: network.clone(),
903                    our_key: self.our_key.clone(),
904                    round_retry_interval_secs: config
905                        .sync_update
906                        .round_retry_interval_secs,
907                    max_round_retries: config.sync_update.max_round_retries,
908                    witness_retry_count: config.sync_update.witness_retry_count,
909                    witness_retry_interval_secs: config
910                        .sync_update
911                        .witness_retry_interval_secs,
912                }),
913            )
914            .await
915        {
916            error!(
917                error = %e,
918                "Failed to create auth child"
919            );
920            return Err(e);
921        }
922
923        if !safe_mode
924            && let Err(e) = ctx
925                .create_child(
926                    "distributor",
927                    DistriWorker {
928                        our_key: self.our_key.clone(),
929                        network,
930                        ledger_batch_size: self.ledger_batch_size,
931                    },
932                )
933                .await
934        {
935            error!(
936                error = %e,
937                "Failed to create distributor child"
938            );
939            return Err(e);
940        }
941
942        Ok(())
943    }
944}
945
946#[async_trait]
947impl Handler<Self> for Node {
948    async fn handle_message(
949        &mut self,
950        _sender: ActorPath,
951        msg: NodeMessage,
952        ctx: &mut ave_actors::ActorContext<Self>,
953    ) -> Result<NodeResponse, ActorError> {
954        match msg {
955            NodeMessage::GetSinkEvents {
956                subject_id,
957                from_sn,
958                to_sn,
959                limit,
960            } => Ok(NodeResponse::SinkEvents(
961                self.replay_sink_events(ctx, subject_id, from_sn, to_sn, limit)
962                    .await?,
963            )),
964            NodeMessage::RegisterSubject {
965                owner,
966                subject_id,
967                data,
968            } => {
969                let Some(network): Option<Arc<NetworkSender>> =
970                    ctx.system().get_helper("network").await
971                else {
972                    error!(
973                        msg_type = "RegisterSubject",
974                        subject_id = %subject_id,
975                        "Network helper not found"
976                    );
977                    return Err(ActorError::Helper {
978                        name: "network".to_string(),
979                        reason: "Not found".to_string(),
980                    });
981                };
982
983                self.on_event(
984                    NodeEvent::RegisterSubject {
985                        owner,
986                        subject_id: subject_id.clone(),
987                        data,
988                    },
989                    ctx,
990                )
991                .await;
992
993                let distributor_name = format!("distributor_{}", subject_id);
994                if ctx
995                    .get_child::<DistriWorker>(&distributor_name)
996                    .await
997                    .is_err()
998                {
999                    ctx.create_child(
1000                        &distributor_name,
1001                        DistriWorker {
1002                            our_key: self.our_key.clone(),
1003                            network,
1004                            ledger_batch_size: self.ledger_batch_size,
1005                        },
1006                    )
1007                    .await?;
1008                }
1009
1010                Ok(NodeResponse::Ok)
1011            }
1012            NodeMessage::EOLSubject {
1013                subject_id,
1014                i_owner,
1015            } => {
1016                self.on_event(
1017                    NodeEvent::EOLSubject {
1018                        subject_id: subject_id.clone(),
1019                        i_owner,
1020                    },
1021                    ctx,
1022                )
1023                .await;
1024
1025                debug!(
1026                    msg_type = "EOLSubject",
1027                    subject_id = %subject_id,
1028                    i_owner = %i_owner,
1029                    "EOL confirmed"
1030                );
1031
1032                Ok(NodeResponse::Ok)
1033            }
1034            NodeMessage::GetGovernances => {
1035                let mut gov_know = self
1036                    .known_subjects
1037                    .iter()
1038                    .filter(|x| matches!(x.1, SubjectData::Governance { .. }))
1039                    .map(|x| x.0.clone())
1040                    .collect::<Vec<DigestIdentifier>>();
1041                let mut gov_owned = self
1042                    .owned_subjects
1043                    .iter()
1044                    .filter(|x| matches!(x.1, SubjectData::Governance { .. }))
1045                    .map(|x| x.0.clone())
1046                    .collect::<Vec<DigestIdentifier>>();
1047                gov_know.append(&mut gov_owned);
1048
1049                return Ok(NodeResponse::Governances(gov_know));
1050            }
1051            NodeMessage::ICanSendLastLedger(subject_id) => {
1052                let subject_data = if self.reject_subjects.contains(&subject_id)
1053                {
1054                    self.known_subjects.get(&subject_id).cloned()
1055                } else {
1056                    self.owned_subjects.get(&subject_id).cloned()
1057                };
1058
1059                Ok(NodeResponse::SubjectData(subject_data))
1060            }
1061            NodeMessage::GetSubjectData(subject_id) => {
1062                let data = self.get_subject_data(&subject_id);
1063                if data.is_none() {
1064                    debug!(
1065                        msg_type = "GetSubjectData",
1066                        subject_id = %subject_id,
1067                        "Subject not found"
1068                    );
1069                } else {
1070                    debug!(
1071                        msg_type = "GetSubjectData",
1072                        subject_id = %subject_id,
1073                        "Subject data retrieved successfully"
1074                    );
1075                }
1076
1077                Ok(NodeResponse::SubjectData(data))
1078            }
1079            NodeMessage::GovernanceTrackers(governance_id) => {
1080                let trackers = self.governance_trackers(&governance_id);
1081
1082                debug!(
1083                    msg_type = "GovernanceTrackers",
1084                    governance_id = %governance_id,
1085                    count = trackers.len(),
1086                    "Governance tracker association check completed"
1087                );
1088
1089                Ok(NodeResponse::GovernanceTrackers(trackers))
1090            }
1091            NodeMessage::DeleteSubject(subject_id) => {
1092                self.on_event(
1093                    NodeEvent::DeleteSubject(subject_id.clone()),
1094                    ctx,
1095                )
1096                .await;
1097
1098                debug!(
1099                    msg_type = "DeleteSubject",
1100                    subject_id = %subject_id,
1101                    "Subject deleted from node state"
1102                );
1103
1104                Ok(NodeResponse::Ok)
1105            }
1106            NodeMessage::PendingTransfers => {
1107                let transfers: Vec<TransferSubject> = self
1108                    .transfer_subjects
1109                    .iter()
1110                    .map(|x| TransferSubject {
1111                        name: x.1.name.clone(),
1112                        subject_id: x.0.clone(),
1113                        new_owner: x.1.new_owner.clone(),
1114                        actual_owner: x.1.actual_owner.clone(),
1115                    })
1116                    .collect();
1117
1118                debug!(
1119                    msg_type = "PendingTransfers",
1120                    count = transfers.len(),
1121                    "Retrieved pending transfers"
1122                );
1123
1124                Ok(NodeResponse::PendingTransfers(transfers))
1125            }
1126            NodeMessage::RejectTransfer(subject_id) => {
1127                self.on_event(
1128                    NodeEvent::RejectTransfer(subject_id.clone()),
1129                    ctx,
1130                )
1131                .await;
1132
1133                debug!(
1134                    msg_type = "RejectTransfer",
1135                    subject_id = %subject_id,
1136                    "Transfer rejected successfully"
1137                );
1138
1139                Ok(NodeResponse::Ok)
1140            }
1141            NodeMessage::TransferSubject(data) => {
1142                let subject_id = data.subject_id.clone();
1143                self.on_event(NodeEvent::TransferSubject(data), ctx).await;
1144
1145                debug!(
1146                    msg_type = "TransferSubject",
1147                    subject_id = %subject_id,
1148                    "Subject transfer registered successfully"
1149                );
1150
1151                Ok(NodeResponse::Ok)
1152            }
1153            NodeMessage::ConfirmTransfer(subject_id) => {
1154                self.on_event(
1155                    NodeEvent::ConfirmTransfer(subject_id.clone()),
1156                    ctx,
1157                )
1158                .await;
1159
1160                debug!(
1161                    msg_type = "ConfirmTransfer",
1162                    subject_id = %subject_id,
1163                    "Transfer confirmed between other parties"
1164                );
1165
1166                Ok(NodeResponse::Ok)
1167            }
1168            NodeMessage::SignRequest(content) => {
1169                let content = *content;
1170                let content_type = match &content {
1171                    SignTypesNode::EventRequest(_) => "EventRequest",
1172                    SignTypesNode::ValidationReq(_) => "ValidationReq",
1173                    SignTypesNode::ValidationRes(_) => "ValidationRes",
1174                    SignTypesNode::EvaluationReq(_) => "EvaluationReq",
1175                    SignTypesNode::EvaluationSignature(_) => "EvaluationRes",
1176                    SignTypesNode::ApprovalReq(_) => "ApprovalReq",
1177                    SignTypesNode::ApprovalRes(_) => "ApprovalRes",
1178                    SignTypesNode::LedgerSeal(_) => "LedgerSeal",
1179                };
1180
1181                let sign = match content {
1182                    SignTypesNode::EventRequest(event_req) => {
1183                        self.sign(&event_req)
1184                    }
1185                    SignTypesNode::ValidationReq(validation_req) => {
1186                        self.sign(&*validation_req)
1187                    }
1188                    SignTypesNode::ValidationRes(validation_res) => {
1189                        self.sign(&validation_res)
1190                    }
1191                    SignTypesNode::EvaluationReq(evaluation_req) => {
1192                        self.sign(&*evaluation_req)
1193                    }
1194                    SignTypesNode::EvaluationSignature(evaluation_res) => {
1195                        self.sign(&evaluation_res)
1196                    }
1197                    SignTypesNode::ApprovalReq(approval_req) => {
1198                        self.sign(&approval_req)
1199                    }
1200                    SignTypesNode::ApprovalRes(approval_res) => {
1201                        self.sign(&*approval_res)
1202                    }
1203                    SignTypesNode::LedgerSeal(ledger) => self.sign(&ledger),
1204                }
1205                .map_err(|e| {
1206                    error!(
1207                        msg_type = "SignRequest",
1208                        content_type = content_type,
1209                        error = %e,
1210                        "Failed to sign content"
1211                    );
1212                    ActorError::FunctionalCritical {
1213                        description: format!("Can not sign event: {}", e),
1214                    }
1215                })?;
1216
1217                debug!(
1218                    msg_type = "SignRequest",
1219                    content_type = content_type,
1220                    "Content signed successfully"
1221                );
1222
1223                Ok(NodeResponse::SignRequest(sign))
1224            }
1225            NodeMessage::IOwnerNewOwnerSubject(subject_id) => {
1226                let i_owner =
1227                    self.owned_subjects.keys().any(|x| *x == subject_id);
1228
1229                let i_new_owner = if let Some(data) =
1230                    self.transfer_subjects.get(&subject_id)
1231                {
1232                    Some(data.new_owner == *self.our_key)
1233                } else {
1234                    None
1235                };
1236
1237                debug!(
1238                    msg_type = "OwnerPendingSubject",
1239                    subject_id = %subject_id,
1240                    i_owner = i_owner,
1241                    i_new_owner = i_new_owner,
1242                    "Checked owner/pending status"
1243                );
1244
1245                Ok(NodeResponse::IOwnerNewOwner {
1246                    i_owner,
1247                    i_new_owner,
1248                })
1249            }
1250            NodeMessage::AuthData(subject_id) => {
1251                let authorized_subjects = match ctx
1252                    .get_child::<Auth>("auth")
1253                    .await
1254                {
1255                    Ok(auth) => {
1256                        let res = match auth.ask(AuthMessage::GetAuths).await {
1257                            Ok(res) => res,
1258                            Err(e) => {
1259                                error!(
1260                                    msg_type = "AuthData",
1261                                    subject_id = %subject_id,
1262                                    error = %e,
1263                                    "Failed to get authorizations from auth actor"
1264                                );
1265                                ctx.system().crash_system();
1266                                return Err(e);
1267                            }
1268                        };
1269                        let AuthResponse::Auths { subjects } = res else {
1270                            error!(
1271                                msg_type = "AuthData",
1272                                subject_id = %subject_id,
1273                                "Unexpected response from auth actor"
1274                            );
1275                            ctx.system().crash_system();
1276                            return Err(ActorError::UnexpectedResponse {
1277                                expected: "AuthResponse::Auths".to_owned(),
1278                                path: ctx.path().clone() / "auth",
1279                            });
1280                        };
1281                        subjects
1282                    }
1283                    Err(e) => {
1284                        error!(
1285                            msg_type = "AuthData",
1286                            subject_id = %subject_id,
1287                            error = %e,
1288                            "Auth actor not found"
1289                        );
1290                        ctx.system().crash_system();
1291                        return Err(e);
1292                    }
1293                };
1294
1295                let auth_subj =
1296                    authorized_subjects.iter().any(|x| x.clone() == subject_id);
1297
1298                let subj_data = self
1299                    .known_subjects
1300                    .get(&subject_id)
1301                    .or_else(|| self.owned_subjects.get(&subject_id))
1302                    .cloned();
1303
1304                debug!(
1305                    msg_type = "AuthData",
1306                    subject_id = %subject_id,
1307                    authorized = auth_subj,
1308                    subject_data = ?subj_data,
1309                    "Checked subject authorization status"
1310                );
1311
1312                Ok(NodeResponse::AuthData {
1313                    auth: auth_subj,
1314                    subject_data: subj_data,
1315                })
1316            }
1317        }
1318    }
1319
1320    async fn on_child_fault(
1321        &mut self,
1322        error: ActorError,
1323        ctx: &mut ActorContext<Self>,
1324    ) -> ChildAction {
1325        error!(
1326            error = %error,
1327            "Child actor fault, stopping system"
1328        );
1329        ctx.system().crash_system();
1330        ChildAction::Stop
1331    }
1332
1333    async fn on_event(
1334        &mut self,
1335        event: NodeEvent,
1336        ctx: &mut ActorContext<Self>,
1337    ) {
1338        if let Err(e) = self.persist(&event, ctx).await {
1339            error!(
1340                event = ?event,
1341                error = %e,
1342                "Failed to persist node event"
1343            );
1344            ctx.system().crash_system();
1345        }
1346    }
1347}
1348
1349pub struct InitParamsNode {
1350    pub key_pair: KeyPair,
1351    pub public_key: Arc<PublicKey>,
1352    pub hash: HashAlgorithm,
1353    pub is_service: bool,
1354    pub only_clear_events: bool,
1355    pub ledger_batch_size: u64,
1356}
1357
1358#[async_trait]
1359impl PersistentActor for Node {
1360    type Persistence = LightPersistence;
1361    type InitParams = InitParamsNode;
1362
1363    fn update(&mut self, state: Self) {
1364        self.owned_subjects = state.owned_subjects;
1365        self.known_subjects = state.known_subjects;
1366        self.transfer_subjects = state.transfer_subjects;
1367        self.reject_subjects = state.reject_subjects
1368    }
1369
1370    fn create_initial(params: Self::InitParams) -> Self {
1371        Self {
1372            hash: Some(params.hash),
1373            owner: params.key_pair,
1374            our_key: params.public_key,
1375            is_service: params.is_service,
1376            only_clear_events: params.only_clear_events,
1377            ledger_batch_size: params.ledger_batch_size,
1378            owned_subjects: HashMap::new(),
1379            known_subjects: HashMap::new(),
1380            transfer_subjects: HashMap::new(),
1381            reject_subjects: HashSet::new(),
1382        }
1383    }
1384
1385    /// Change node state.
1386    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
1387        match event {
1388            NodeEvent::EOLSubject {
1389                subject_id,
1390                i_owner,
1391            } => {
1392                self.eol(subject_id.clone(), *i_owner);
1393                debug!(
1394                    event_type = "EOLSubject",
1395                    subject_id = %subject_id,
1396                    i_owner = &i_owner,
1397                    "Applied eol"
1398                );
1399            }
1400            NodeEvent::ConfirmTransfer(subject_id) => {
1401                self.confirm_transfer(subject_id.clone());
1402                debug!(
1403                    event_type = "ConfirmTransfer",
1404                    subject_id = %subject_id,
1405                    "Applied transfer confirmation"
1406                );
1407            }
1408            NodeEvent::RegisterSubject {
1409                subject_id,
1410                data,
1411                owner,
1412            } => {
1413                self.register_subject(
1414                    subject_id.clone(),
1415                    owner.clone(),
1416                    data.clone(),
1417                );
1418                debug!(
1419                    event_type = "RegisterSubject",
1420                    subject_id = %subject_id,
1421                    owner = %owner,
1422                    "Applied subject registration"
1423                );
1424            }
1425            NodeEvent::DeleteSubject(subject_id) => {
1426                self.delete_subject(subject_id);
1427                debug!(
1428                    event_type = "DeleteSubject",
1429                    subject_id = %subject_id,
1430                    "Applied subject deletion"
1431                );
1432            }
1433            NodeEvent::RejectTransfer(subject_id) => {
1434                self.delete_transfer(subject_id);
1435                debug!(
1436                    event_type = "RejectTransfer",
1437                    subject_id = %subject_id,
1438                    "Applied transfer rejection"
1439                );
1440            }
1441            NodeEvent::TransferSubject(transfer) => {
1442                self.transfer_subject(transfer.clone());
1443                debug!(
1444                    event_type = "TransferSubject",
1445                    subject_id = %transfer.subject_id,
1446                    new_owner = %transfer.new_owner,
1447                    "Applied subject transfer"
1448                );
1449            }
1450        };
1451
1452        Ok(())
1453    }
1454}
1455
1456#[async_trait]
1457impl Storable for Node {}