Skip to main content

ave_core/node/
mod.rs

1//! Node module
2//!
3
4use std::{
5    collections::{HashMap, HashSet},
6    path::Path,
7    sync::Arc,
8};
9
10use borsh::{BorshDeserialize, BorshSerialize};
11use register::Register;
12use tokio::fs;
13use tracing::{Span, debug, error, info_span};
14
15use crate::{
16    auth::{Auth, AuthMessage, AuthResponse},
17    db::Storable,
18    distribution::worker::DistriWorker,
19    governance::{Governance, GovernanceMessage, data::GovernanceData},
20    helpers::{db::ExternalDB, network::service::NetworkSender},
21    manual_distribution::ManualDistribution,
22    model::{
23        common::node::SignTypesNode,
24        event::{Protocols, ValidationMetadata},
25    },
26    subject::{SignedLedger, SubjectMetadata},
27    system::ConfigHelper,
28    tracker::{InitParamsTracker, Tracker, TrackerInit, TrackerMessage},
29};
30
31use ave_common::{
32    SchemaType,
33    identity::{
34        DigestIdentifier, HashAlgorithm, PublicKey, Signature, keys::KeyPair,
35    },
36};
37
38use async_trait::async_trait;
39use ave_actors::{
40    Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
41    Message, Response, Sink,
42};
43use ave_actors::{LightPersistence, PersistentActor};
44use serde::{Deserialize, Serialize};
45
46pub mod register;
47
48#[derive(
49    Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
50)]
51pub struct TransferSubject {
52    pub name: Option<String>,
53    pub subject_id: DigestIdentifier,
54    pub new_owner: PublicKey,
55    pub actual_owner: PublicKey,
56}
57
58#[derive(
59    Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
60)]
61pub struct TransferData {
62    pub name: Option<String>,
63    pub new_owner: PublicKey,
64    pub actual_owner: PublicKey,
65}
66
67#[derive(
68    Clone,
69    Debug,
70    Serialize,
71    Deserialize,
72    BorshSerialize,
73    BorshDeserialize,
74    Eq,
75    PartialEq,
76)]
77pub enum SubjectData {
78    Tracker {
79        governance_id: DigestIdentifier,
80        schema_id: SchemaType,
81        namespace: String,
82        active: bool,
83    },
84    Governance {
85        active: bool,
86    },
87}
88
89impl SubjectData {
90    pub fn get_schema_id(&self) -> SchemaType {
91        match self {
92            Self::Tracker { schema_id, .. } => schema_id.clone(),
93            Self::Governance { .. } => SchemaType::Governance,
94        }
95    }
96
97    pub fn get_governance_id(&self) -> Option<DigestIdentifier> {
98        match self {
99            Self::Tracker { governance_id, .. } => Some(governance_id.clone()),
100            Self::Governance { .. } => None,
101        }
102    }
103
104    pub fn get_namespace(&self) -> String {
105        match self {
106            Self::Tracker { namespace, .. } => namespace.clone(),
107            Self::Governance { .. } => String::default(),
108        }
109    }
110
111    pub const fn get_active(&self) -> bool {
112        match self {
113            Self::Tracker { active, .. } => *active,
114            Self::Governance { active } => *active,
115        }
116    }
117
118    pub const fn eol(&mut self) {
119        match self {
120            Self::Tracker { active, .. } => *active = false,
121            Self::Governance { active } => *active = false,
122        };
123    }
124}
125
126/// Node struct.
127#[derive(Debug, Serialize, Deserialize, Clone)]
128pub struct Node {
129    /// Owner of the node.
130    #[serde(skip)]
131    owner: KeyPair,
132    #[serde(skip)]
133    our_key: Arc<PublicKey>,
134    #[serde(skip)]
135    hash: Option<HashAlgorithm>,
136    #[serde(skip)]
137    is_service: bool,
138    /// The node's owned subjects.
139    owned_subjects: HashMap<DigestIdentifier, SubjectData>,
140    /// The node's known subjects.
141    known_subjects: HashMap<DigestIdentifier, SubjectData>,
142
143    transfer_subjects: HashMap<DigestIdentifier, TransferData>,
144
145    reject_subjects: HashSet<DigestIdentifier>,
146}
147
148// Manual Borsh implementation to skip the 'owner' field
149impl BorshSerialize for Node {
150    fn serialize<W: std::io::Write>(
151        &self,
152        writer: &mut W,
153    ) -> std::io::Result<()> {
154        // Serialize only the fields we want to persist, skipping 'owner'
155        BorshSerialize::serialize(&self.owned_subjects, writer)?;
156        BorshSerialize::serialize(&self.known_subjects, writer)?;
157        BorshSerialize::serialize(&self.transfer_subjects, writer)?;
158        BorshSerialize::serialize(&self.reject_subjects, writer)?;
159        Ok(())
160    }
161}
162
163impl BorshDeserialize for Node {
164    fn deserialize_reader<R: std::io::Read>(
165        reader: &mut R,
166    ) -> std::io::Result<Self> {
167        // Deserialize the persisted fields
168        let owned_subjects =
169            HashMap::<DigestIdentifier, SubjectData>::deserialize_reader(
170                reader,
171            )?;
172        let known_subjects =
173            HashMap::<DigestIdentifier, SubjectData>::deserialize_reader(
174                reader,
175            )?;
176        let transfer_subjects =
177            HashMap::<DigestIdentifier, TransferData>::deserialize_reader(
178                reader,
179            )?;
180        let reject_subjects =
181            HashSet::<DigestIdentifier>::deserialize_reader(reader)?;
182
183        // Create a default/placeholder KeyPair for 'owner'
184        // This will be replaced by the actual owner during actor initialization
185        let owner = KeyPair::default();
186        let our_key = Arc::new(PublicKey::default());
187        let hash = None;
188
189        Ok(Self {
190            hash,
191            our_key,
192            owner,
193            owned_subjects,
194            known_subjects,
195            transfer_subjects,
196            reject_subjects,
197            is_service: false,
198        })
199    }
200}
201
202impl Node {
203    /// Adds a subject to the node's owned subjects.
204    pub fn transfer_subject(&mut self, data: TransferSubject) {
205        if data.new_owner == *self.our_key {
206            self.reject_subjects.remove(&data.subject_id);
207        }
208
209        self.transfer_subjects.insert(
210            data.subject_id,
211            TransferData {
212                name: data.name,
213                new_owner: data.new_owner,
214                actual_owner: data.actual_owner,
215            },
216        );
217    }
218
219    pub fn delete_transfer(&mut self, subject_id: &DigestIdentifier) {
220        if let Some(data) = self.transfer_subjects.remove(subject_id)
221            && data.actual_owner == *self.our_key
222        {
223            self.reject_subjects.insert(subject_id.clone());
224        }
225    }
226
227    pub fn confirm_transfer(&mut self, subject_id: DigestIdentifier) {
228        self.our_key.to_string();
229
230        if let Some(data) = self.transfer_subjects.remove(&subject_id) {
231            if data.actual_owner == *self.our_key {
232                if let Some(data) = self.owned_subjects.remove(&subject_id) {
233                    self.known_subjects.insert(subject_id, data);
234                }
235            } else if data.new_owner == *self.our_key
236                && let Some(data) = self.known_subjects.remove(&subject_id)
237            {
238                self.owned_subjects.insert(subject_id, data);
239            };
240        };
241    }
242
243    pub fn eol(&mut self, subject_id: DigestIdentifier, i_owner: bool) {
244        if i_owner {
245            if let Some(data) = self.owned_subjects.get_mut(&subject_id) {
246                data.eol();
247            }
248        } else if let Some(data) = self.known_subjects.get_mut(&subject_id) {
249            data.eol();
250        }
251    }
252
253    pub fn register_subject(
254        &mut self,
255        subject_id: DigestIdentifier,
256        owner: PublicKey,
257        data: SubjectData,
258    ) {
259        if *self.our_key == owner {
260            self.owned_subjects.insert(subject_id, data);
261        } else {
262            self.known_subjects.insert(subject_id, data);
263        }
264    }
265
266    fn sign<T: BorshSerialize>(
267        &self,
268        content: &T,
269    ) -> Result<Signature, ActorError> {
270        Signature::new(content, &self.owner).map_err(|e| {
271            ActorError::Functional {
272                description: format!("{}", e),
273            }
274        })
275    }
276
277    async fn build_compilation_dir(
278        ctx: &ActorContext<Self>,
279    ) -> Result<(), ActorError> {
280        let contracts_path = if let Some(config) =
281            ctx.system().get_helper::<ConfigHelper>("config").await
282        {
283            config.contracts_path
284        } else {
285            error!("Config helper not found");
286            return Err(ActorError::Helper {
287                name: "config".to_owned(),
288                reason: "Not found".to_string(),
289            });
290        };
291
292        let dir = contracts_path.join("contracts");
293
294        if !Path::new(&dir).exists() {
295            fs::create_dir_all(&dir).await.map_err(|e| {
296                error!(
297                    error = %e,
298                    path = ?dir,
299                    "Failed to create contracts directory"
300                );
301                ActorError::FunctionalCritical {
302                    description: format!("Can not create contracts dir: {}", e),
303                }
304            })?;
305        }
306        Ok(())
307    }
308
309    async fn create_subjects(
310        &self,
311        ctx: &mut ActorContext<Self>,
312        network: &Arc<NetworkSender>,
313    ) -> Result<(), ActorError> {
314        let Some(ext_db): Option<Arc<ExternalDB>> =
315            ctx.system().get_helper("ext_db").await
316        else {
317            error!("External database helper not found");
318            return Err(ActorError::Helper {
319                name: "ext_db".to_string(),
320                reason: "Not found".to_string(),
321            });
322        };
323
324        let Some(hash) = self.hash else {
325            error!("Hash is None during subject creation");
326            return Err(ActorError::FunctionalCritical {
327                description: "Hash is None".to_string(),
328            });
329        };
330
331        for (subject, data) in self.owned_subjects.clone() {
332            if let SubjectData::Governance { .. } = data {
333                let governance_actor = ctx
334                    .create_child(
335                        &subject.to_string(),
336                        Governance::initial((None, self.our_key.clone(), hash)),
337                    )
338                    .await?;
339
340                let sink = Sink::new(
341                    governance_actor.subscribe(),
342                    ext_db.get_subject(),
343                );
344
345                ctx.system().run_sink(sink).await;
346
347                ctx.create_child(
348                    &format!("distributor_{}", subject),
349                    DistriWorker {
350                        our_key: self.our_key.clone(),
351                        network: network.clone(),
352                    },
353                )
354                .await?;
355            } else {
356                let tracker_actor = ctx
357                    .create_child(
358                        &subject.to_string(),
359                        Tracker::initial(InitParamsTracker {
360                            data: None,
361                            hash,
362                            is_service: self.is_service,
363                            public_key: self.our_key.clone(),
364                        }),
365                    )
366                    .await?;
367
368                let sink =
369                    Sink::new(tracker_actor.subscribe(), ext_db.get_subject());
370
371                ctx.system().run_sink(sink).await;
372
373                ctx.create_child(
374                    &format!("distributor_{}", subject),
375                    DistriWorker {
376                        our_key: self.our_key.clone(),
377                        network: network.clone(),
378                    },
379                )
380                .await?;
381            }
382        }
383
384        for (subject, data) in self.known_subjects.clone() {
385            let i_new_owner = self
386                .transfer_subjects
387                .get(&subject)
388                .is_some_and(|transfer| transfer.new_owner == *self.our_key);
389
390            if let SubjectData::Governance { .. } = data {
391                let governance_actor = ctx
392                    .create_child(
393                        &subject.to_string(),
394                        Governance::initial((None, self.our_key.clone(), hash)),
395                    )
396                    .await?;
397
398                let sink = Sink::new(
399                    governance_actor.subscribe(),
400                    ext_db.get_subject(),
401                );
402
403                ctx.system().run_sink(sink).await;
404
405                ctx.create_child(
406                    &format!("distributor_{}", subject),
407                    DistriWorker {
408                        our_key: self.our_key.clone(),
409                        network: network.clone(),
410                    },
411                )
412                .await?;
413            } else if i_new_owner {
414                let tracker_actor = ctx
415                    .create_child(
416                        &subject.to_string(),
417                        Tracker::initial(InitParamsTracker {
418                            data: None,
419                            hash,
420                            is_service: self.is_service,
421                            public_key: self.our_key.clone(),
422                        }),
423                    )
424                    .await?;
425
426                let sink =
427                    Sink::new(tracker_actor.subscribe(), ext_db.get_subject());
428
429                ctx.system().run_sink(sink).await;
430
431                ctx.create_child(
432                    &format!("distributor_{}", subject),
433                    DistriWorker {
434                        our_key: self.our_key.clone(),
435                        network: network.clone(),
436                    },
437                )
438                .await?;
439            }
440        }
441
442        Ok(())
443    }
444}
445
446/// Node message.
447#[derive(Clone, Debug, Serialize, Deserialize)]
448pub enum NodeMessage {
449    GetGovernances,
450    SignRequest(SignTypesNode),
451    PendingTransfers,
452    UpSubject {
453        subject_id: DigestIdentifier,
454        light: bool,
455    },
456    GetSubjectData(DigestIdentifier),
457    CreateNewSubject(SignedLedger),
458    IOwnerNewOwnerSubject(DigestIdentifier),
459    ICanSendLastLedger(DigestIdentifier),
460    AuthData(DigestIdentifier),
461    TransferSubject(TransferSubject),
462    RejectTransfer(DigestIdentifier),
463    ConfirmTransfer(DigestIdentifier),
464    EOLSubject {
465        subject_id: DigestIdentifier,
466        i_owner: bool,
467    },
468}
469
470impl Message for NodeMessage {
471    fn is_critical(&self) -> bool {
472        matches!(
473            self,
474            Self::TransferSubject(..)
475                | Self::RejectTransfer(..)
476                | Self::ConfirmTransfer(..)
477                | Self::EOLSubject { .. }
478        )
479    }
480}
481
482/// Node response.
483#[derive(Clone, Debug, Serialize, Deserialize)]
484pub enum NodeResponse {
485    Governances(Vec<DigestIdentifier>),
486    SubjectData(Option<SubjectData>),
487    PendingTransfers(Vec<TransferSubject>),
488    SignRequest(Signature),
489    IOwnerNewOwner {
490        i_owner: bool,
491        i_new_owner: Option<bool>,
492    },
493    AuthData {
494        auth: bool,
495        subject_data: Option<SubjectData>,
496    },
497    Ok,
498}
499
500impl Response for NodeResponse {}
501
502/// Node event.
503#[derive(
504    Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
505)]
506pub enum NodeEvent {
507    RegisterSubject {
508        owner: PublicKey,
509        subject_id: DigestIdentifier,
510        data: SubjectData,
511    },
512    TransferSubject(TransferSubject),
513    RejectTransfer(DigestIdentifier),
514    ConfirmTransfer(DigestIdentifier),
515    EOLSubject {
516        subject_id: DigestIdentifier,
517        i_owner: bool,
518    },
519}
520
521impl Event for NodeEvent {}
522
523#[async_trait]
524impl Actor for Node {
525    type Event = NodeEvent;
526    type Message = NodeMessage;
527    type Response = NodeResponse;
528
529    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
530        parent_span.map_or_else(
531            || info_span!("Node"),
532            |parent_span| info_span!(parent: parent_span, "Node"),
533        )
534    }
535
536    async fn pre_start(
537        &mut self,
538        ctx: &mut ave_actors::ActorContext<Self>,
539    ) -> Result<(), ActorError> {
540        if let Err(e) = Self::build_compilation_dir(ctx).await {
541            error!(
542                error = %e,
543                "Failed to build compilation directory"
544            );
545            return Err(e);
546        }
547
548        // Start store
549        if let Err(e) = self.init_store("node", None, true, ctx).await {
550            error!(
551                error = %e,
552                "Failed to initialize node store"
553            );
554            return Err(e);
555        }
556
557        let Some(network): Option<Arc<NetworkSender>> =
558            ctx.system().get_helper("network").await
559        else {
560            error!("Network helper not found");
561            return Err(ActorError::Helper {
562                name: "network".to_string(),
563                reason: "Not found".to_string(),
564            });
565        };
566
567        let register_actor = match ctx.create_child("register", Register).await {
568            Ok(actor) => actor,
569            Err(e) => {
570                error!(error = %e, "Failed to create register child");
571                return Err(e);
572            }
573        };
574
575        let Some(ext_db): Option<Arc<ExternalDB>> =
576            ctx.system().get_helper("ext_db").await
577        else {
578            error!("External DB helper not found");
579            return Err(ActorError::Helper {
580                name: "ext_db".to_string(),
581                reason: "Not found".to_string(),
582            });
583        };
584
585        let sink = Sink::new(register_actor.subscribe(), ext_db.get_register());
586        ctx.system().run_sink(sink).await;
587
588        if let Err(e) = ctx
589            .create_child(
590                "manual_distribution",
591                ManualDistribution::new(self.our_key.clone()),
592            )
593            .await
594        {
595            error!(
596                error = %e,
597                "Failed to create manual_distribution child"
598            );
599            return Err(e);
600        }
601
602        if let Err(e) = self.create_subjects(ctx, &network).await {
603            error!(
604                error = %e,
605                "Failed to create subjects"
606            );
607            return Err(e);
608        }
609
610        if let Err(e) = ctx
611            .create_child(
612                "auth",
613                Auth::initial((network.clone(), self.our_key.clone())),
614            )
615            .await
616        {
617            error!(
618                error = %e,
619                "Failed to create auth child"
620            );
621            return Err(e);
622        }
623
624        if let Err(e) = ctx
625            .create_child(
626                "distributor",
627                DistriWorker {
628                    our_key: self.our_key.clone(),
629                    network,
630                },
631            )
632            .await
633        {
634            error!(
635                error = %e,
636                "Failed to create distributor child"
637            );
638            return Err(e);
639        }
640
641        Ok(())
642    }
643}
644
645#[async_trait]
646impl Handler<Self> for Node {
647    async fn handle_message(
648        &mut self,
649        _sender: ActorPath,
650        msg: NodeMessage,
651        ctx: &mut ave_actors::ActorContext<Self>,
652    ) -> Result<NodeResponse, ActorError> {
653        match msg {
654            NodeMessage::EOLSubject {
655                subject_id,
656                i_owner,
657            } => {
658                self.on_event(
659                    NodeEvent::EOLSubject {
660                        subject_id: subject_id.clone(),
661                        i_owner,
662                    },
663                    ctx,
664                )
665                .await;
666
667                debug!(
668                    msg_type = "EOLSubject",
669                    subject_id = %subject_id,
670                    i_owner = %i_owner,
671                    "EOL confirmed"
672                );
673
674                Ok(NodeResponse::Ok)
675            }
676            NodeMessage::GetGovernances => {
677                let mut gov_know = self
678                    .known_subjects
679                    .iter()
680                    .filter(|x| matches!(x.1, SubjectData::Governance { .. }))
681                    .map(|x| x.0.clone())
682                    .collect::<Vec<DigestIdentifier>>();
683                let mut gov_owned = self
684                    .owned_subjects
685                    .iter()
686                    .filter(|x| matches!(x.1, SubjectData::Governance { .. }))
687                    .map(|x| x.0.clone())
688                    .collect::<Vec<DigestIdentifier>>();
689                gov_know.append(&mut gov_owned);
690
691                return Ok(NodeResponse::Governances(gov_know));
692            }
693            NodeMessage::ICanSendLastLedger(subject_id) => {
694                let subject_data = if self.reject_subjects.contains(&subject_id)
695                {
696                    self.known_subjects.get(&subject_id).cloned()
697                } else {
698                    self.owned_subjects.get(&subject_id).cloned()
699                };
700
701                Ok(NodeResponse::SubjectData(subject_data))
702            }
703            NodeMessage::UpSubject { subject_id, light } => {
704                let Some(ext_db): Option<Arc<ExternalDB>> =
705                    ctx.system().get_helper("ext_db").await
706                else {
707                    error!(
708                        msg_type = "UpSubject",
709                        subject_id = %subject_id,
710                        "External database helper not found"
711                    );
712                    return Err(ActorError::Helper {
713                        name: "ext_db".to_string(),
714                        reason: "Not found".to_string(),
715                    });
716                };
717
718                let Some(hash) = self.hash else {
719                    error!(
720                        msg_type = "UpSubject",
721                        subject_id = %subject_id,
722                        "Hash is None"
723                    );
724                    return Err(ActorError::FunctionalCritical {
725                        description: "Hash is None".to_string(),
726                    });
727                };
728
729                let tracker_actor = ctx
730                    .create_child(
731                        &subject_id.to_string(),
732                        Tracker::initial(InitParamsTracker {
733                            data: None,
734                            hash,
735                            is_service: self.is_service,
736                            public_key: self.our_key.clone(),
737                        }),
738                    )
739                    .await?;
740                if !light {
741                    let sink = Sink::new(
742                        tracker_actor.subscribe(),
743                        ext_db.get_subject(),
744                    );
745                    ctx.system().run_sink(sink).await;
746                }
747
748                debug!(
749                    msg_type = "UpSubject",
750                    subject_id = %subject_id,
751                    light = light,
752                    "Subject brought up successfully"
753                );
754
755                Ok(NodeResponse::Ok)
756            }
757            NodeMessage::GetSubjectData(subject_id) => {
758                let data = if let Some(data) =
759                    self.owned_subjects.get(&subject_id)
760                {
761                    Some(data.clone())
762                } else if let Some(data) = self.known_subjects.get(&subject_id)
763                {
764                    Some(data.clone())
765                } else {
766                    debug!(
767                        msg_type = "GetSubjectData",
768                        subject_id = %subject_id,
769                        "Subject not found"
770                    );
771
772                    None
773                };
774
775                debug!(
776                    msg_type = "GetSubjectData",
777                    subject_id = %subject_id,
778                    "Subject data retrieved successfully"
779                );
780
781                Ok(NodeResponse::SubjectData(data))
782            }
783            NodeMessage::PendingTransfers => {
784                let transfers: Vec<TransferSubject> = self
785                    .transfer_subjects
786                    .iter()
787                    .map(|x| TransferSubject {
788                        name: x.1.name.clone(),
789                        subject_id: x.0.clone(),
790                        new_owner: x.1.new_owner.clone(),
791                        actual_owner: x.1.actual_owner.clone(),
792                    })
793                    .collect();
794
795                debug!(
796                    msg_type = "PendingTransfers",
797                    count = transfers.len(),
798                    "Retrieved pending transfers"
799                );
800
801                Ok(NodeResponse::PendingTransfers(transfers))
802            }
803            NodeMessage::RejectTransfer(subject_id) => {
804                self.on_event(
805                    NodeEvent::RejectTransfer(subject_id.clone()),
806                    ctx,
807                )
808                .await;
809
810                debug!(
811                    msg_type = "RejectTransfer",
812                    subject_id = %subject_id,
813                    "Transfer rejected successfully"
814                );
815
816                Ok(NodeResponse::Ok)
817            }
818            NodeMessage::TransferSubject(data) => {
819                let subject_id = data.subject_id.clone();
820                self.on_event(NodeEvent::TransferSubject(data), ctx).await;
821
822                debug!(
823                    msg_type = "TransferSubject",
824                    subject_id = %subject_id,
825                    "Subject transfer registered successfully"
826                );
827
828                Ok(NodeResponse::Ok)
829            }
830            NodeMessage::ConfirmTransfer(subject_id) => {
831                self.on_event(
832                    NodeEvent::ConfirmTransfer(subject_id.clone()),
833                    ctx,
834                )
835                .await;
836
837                debug!(
838                    msg_type = "ConfirmTransfer",
839                    subject_id = %subject_id,
840                    "Transfer confirmed between other parties"
841                );
842
843                Ok(NodeResponse::Ok)
844            }
845            NodeMessage::CreateNewSubject(ledger) => {
846                let Some(ext_db): Option<Arc<ExternalDB>> =
847                    ctx.system().get_helper("ext_db").await
848                else {
849                    error!(
850                        msg_type = "CreateNewSubject",
851                        "External database helper not found"
852                    );
853                    return Err(ActorError::Helper {
854                        name: "ext_db".to_string(),
855                        reason: "Not found".to_string(),
856                    });
857                };
858
859                let Some(network): Option<Arc<NetworkSender>> =
860                    ctx.system().get_helper("network").await
861                else {
862                    error!(
863                        msg_type = "CreateNewSubject",
864                        "Network helper not found"
865                    );
866                    return Err(ActorError::Helper {
867                        name: "network".to_string(),
868                        reason: "Not found".to_string(),
869                    });
870                };
871
872                let Some(hash) = self.hash else {
873                    error!(msg_type = "CreateNewSubject", "Hash is None");
874                    return Err(ActorError::FunctionalCritical {
875                        description: "Hash is None".to_string(),
876                    });
877                };
878
879                let metadata = match &ledger.content().protocols {
880                    Protocols::Create { validation } => {
881                        if let ValidationMetadata::Metadata(metadata) =
882                            &validation.validation_metadata
883                        {
884                            metadata.clone()
885                        } else {
886                            error!(
887                                msg_type = "CreateNewSubject",
888                                "ValidationMetadata must be Metadata in Create event"
889                            );
890                            return Err(ActorError::Functional { description: "In Create event ValidationMetadata must be Metadata".to_string() });
891                        }
892                    }
893                    _ => {
894                        error!(
895                            msg_type = "CreateNewSubject",
896                            "Event must be a Create event"
897                        );
898                        return Err(ActorError::Functional {
899                            description: "Event must be a Create event"
900                                .to_string(),
901                        });
902                    }
903                };
904
905                let subject_id = metadata.subject_id.to_string();
906
907                let subject_data = if metadata.schema_id.is_gov() {
908                    let subject_metadata = SubjectMetadata::new(&metadata);
909                    let governance_data =
910                        serde_json::from_value::<GovernanceData>(
911                            metadata.properties.0,
912                        )
913                        .map_err(|e| {
914                            error!(
915                                msg_type = "CreateNewSubject",
916                                subject_id = %subject_id,
917                                error = %e,
918                                "Governance properties must be GovernanceData"
919                            );
920                            ActorError::Functional { description: format!("In governance properties must be a GovernanceData: {e}")}
921                        })?;
922
923                    let governance = Governance::initial((
924                        Some((subject_metadata, governance_data)),
925                        self.our_key.clone(),
926                        hash,
927                    ));
928
929                    let governance_actor =
930                        ctx.create_child(&subject_id, governance).await?;
931
932                    let sink = Sink::new(
933                        governance_actor.subscribe(),
934                        ext_db.get_subject(),
935                    );
936                    ctx.system().run_sink(sink).await;
937
938                    if let Err(e) = governance_actor
939                        .ask(GovernanceMessage::UpdateLedger {
940                            events: vec![ledger.clone()],
941                        })
942                        .await
943                    {
944                        error!(
945                            msg_type = "CreateNewSubject",
946                            subject_id = %subject_id,
947                            error = %e,
948                            "Failed to update governance ledger"
949                        );
950                        governance_actor.tell_stop().await;
951                        return Err(e);
952                    };
953
954                    debug!(
955                        msg_type = "CreateNewSubject",
956                        subject_id = %subject_id,
957                        "Governance subject created successfully"
958                    );
959
960                    SubjectData::Governance { active: true }
961                } else {
962                    let tracker_init = TrackerInit::from(&*metadata);
963
964                    let tracker = Tracker::initial(InitParamsTracker {
965                        data: Some(tracker_init),
966                        hash,
967                        is_service: self.is_service,
968                        public_key: self.our_key.clone(),
969                    });
970
971                    let tracker_actor =
972                        ctx.create_child(&subject_id, tracker).await?;
973
974                    let sink = Sink::new(
975                        tracker_actor.subscribe(),
976                        ext_db.get_subject(),
977                    );
978                    ctx.system().run_sink(sink).await;
979
980                    if let Err(e) = tracker_actor
981                        .ask(TrackerMessage::UpdateLedger {
982                            events: vec![ledger.clone()],
983                        })
984                        .await
985                    {
986                        error!(
987                            msg_type = "CreateNewSubject",
988                            subject_id = %subject_id,
989                            error = %e,
990                            "Failed to update tracker ledger"
991                        );
992                        tracker_actor.tell_stop().await;
993                        return Err(e);
994                    };
995
996                    debug!(
997                        msg_type = "CreateNewSubject",
998                        subject_id = %subject_id,
999                        governance_id = %metadata.governance_id,
1000                        "Tracker subject created successfully"
1001                    );
1002
1003                    SubjectData::Tracker {
1004                        governance_id: metadata.governance_id.clone(),
1005                        schema_id: metadata.schema_id.clone(),
1006                        namespace: metadata.namespace.to_string(),
1007                        active: true,
1008                    }
1009                };
1010
1011                self.on_event(
1012                    NodeEvent::RegisterSubject {
1013                        subject_id: metadata.subject_id.clone(),
1014                        owner: metadata.owner.clone(),
1015                        data: subject_data,
1016                    },
1017                    ctx,
1018                )
1019                .await;
1020
1021                ctx.create_child(
1022                    &format!("distributor_{}", subject_id),
1023                    DistriWorker {
1024                        our_key: self.our_key.clone(),
1025                        network,
1026                    },
1027                )
1028                .await?;
1029
1030                debug!(
1031                    msg_type = "CreateNewSubject",
1032                    subject_id = %subject_id,
1033                    "New subject and distributor created successfully"
1034                );
1035
1036                Ok(NodeResponse::Ok)
1037            }
1038            NodeMessage::SignRequest(content) => {
1039                let content_type = match &content {
1040                    SignTypesNode::EventRequest(_) => "EventRequest",
1041                    SignTypesNode::ValidationReq(_) => "ValidationReq",
1042                    SignTypesNode::ValidationRes(_) => "ValidationRes",
1043                    SignTypesNode::EvaluationReq(_) => "EvaluationReq",
1044                    SignTypesNode::EvaluationRes(_) => "EvaluationRes",
1045                    SignTypesNode::ApprovalReq(_) => "ApprovalReq",
1046                    SignTypesNode::ApprovalRes(_) => "ApprovalRes",
1047                    SignTypesNode::Ledger(_) => "Ledger",
1048                };
1049
1050                let sign = match content {
1051                    SignTypesNode::EventRequest(event_req) => {
1052                        self.sign(&event_req)
1053                    }
1054                    SignTypesNode::ValidationReq(validation_req) => {
1055                        self.sign(&*validation_req)
1056                    }
1057                    SignTypesNode::ValidationRes(validation_res) => {
1058                        self.sign(&validation_res)
1059                    }
1060                    SignTypesNode::EvaluationReq(evaluation_req) => {
1061                        self.sign(&evaluation_req)
1062                    }
1063                    SignTypesNode::EvaluationRes(evaluation_res) => {
1064                        self.sign(&evaluation_res)
1065                    }
1066                    SignTypesNode::ApprovalReq(approval_req) => {
1067                        self.sign(&approval_req)
1068                    }
1069                    SignTypesNode::ApprovalRes(approval_res) => {
1070                        self.sign(&*approval_res)
1071                    }
1072                    SignTypesNode::Ledger(ledger) => self.sign(&ledger),
1073                }
1074                .map_err(|e| {
1075                    error!(
1076                        msg_type = "SignRequest",
1077                        content_type = content_type,
1078                        error = %e,
1079                        "Failed to sign content"
1080                    );
1081                    ActorError::FunctionalCritical {
1082                        description: format!("Can not sign event: {}", e),
1083                    }
1084                })?;
1085
1086                debug!(
1087                    msg_type = "SignRequest",
1088                    content_type = content_type,
1089                    "Content signed successfully"
1090                );
1091
1092                Ok(NodeResponse::SignRequest(sign))
1093            }
1094            NodeMessage::IOwnerNewOwnerSubject(subject_id) => {
1095                let i_owner =
1096                    self.owned_subjects.keys().any(|x| *x == subject_id);
1097
1098                let i_new_owner = if let Some(data) =
1099                    self.transfer_subjects.get(&subject_id)
1100                {
1101                    Some(data.new_owner == *self.our_key)
1102                } else {
1103                    None
1104                };
1105
1106                debug!(
1107                    msg_type = "OwnerPendingSubject",
1108                    subject_id = %subject_id,
1109                    i_owner = i_owner,
1110                    i_new_owner = i_new_owner,
1111                    "Checked owner/pending status"
1112                );
1113
1114                Ok(NodeResponse::IOwnerNewOwner {
1115                    i_owner,
1116                    i_new_owner,
1117                })
1118            }
1119            NodeMessage::AuthData(subject_id) => {
1120                let authorized_subjects = match ctx
1121                    .get_child::<Auth>("auth")
1122                    .await
1123                {
1124                    Ok(auth) => {
1125                        let res = match auth.ask(AuthMessage::GetAuths).await {
1126                            Ok(res) => res,
1127                            Err(e) => {
1128                                error!(
1129                                    msg_type = "AuthData",
1130                                    subject_id = %subject_id,
1131                                    error = %e,
1132                                    "Failed to get authorizations from auth actor"
1133                                );
1134                                ctx.system().crash_system();
1135                                return Err(e);
1136                            }
1137                        };
1138                        let AuthResponse::Auths { subjects } = res else {
1139                            error!(
1140                                msg_type = "AuthData",
1141                                subject_id = %subject_id,
1142                                "Unexpected response from auth actor"
1143                            );
1144                            ctx.system().crash_system();
1145                            return Err(ActorError::UnexpectedResponse {
1146                                expected: "AuthResponse::Auths".to_owned(),
1147                                path: ctx.path().clone() / "auth",
1148                            });
1149                        };
1150                        subjects
1151                    }
1152                    Err(e) => {
1153                        error!(
1154                            msg_type = "AuthData",
1155                            subject_id = %subject_id,
1156                            "Auth actor not found"
1157                        );
1158                        ctx.system().crash_system();
1159                        return Err(e);
1160                    }
1161                };
1162
1163                let auth_subj =
1164                    authorized_subjects.iter().any(|x| x.clone() == subject_id);
1165
1166                let subj_data = self
1167                    .known_subjects
1168                    .get(&subject_id)
1169                    .or_else(|| self.owned_subjects.get(&subject_id))
1170                    .cloned();
1171
1172                debug!(
1173                    msg_type = "AuthData",
1174                    subject_id = %subject_id,
1175                    authorized = auth_subj,
1176                    subject_data = ?subj_data,
1177                    "Checked subject authorization status"
1178                );
1179
1180                Ok(NodeResponse::AuthData {
1181                    auth: auth_subj,
1182                    subject_data: subj_data,
1183                })
1184            }
1185        }
1186    }
1187
1188    async fn on_child_fault(
1189        &mut self,
1190        error: ActorError,
1191        ctx: &mut ActorContext<Self>,
1192    ) -> ChildAction {
1193        error!(
1194            error = %error,
1195            "Child actor fault, stopping system"
1196        );
1197        ctx.system().crash_system();
1198        ChildAction::Stop
1199    }
1200
1201    async fn on_event(
1202        &mut self,
1203        event: NodeEvent,
1204        ctx: &mut ActorContext<Self>,
1205    ) {
1206        if let Err(e) = self.persist(&event, ctx).await {
1207            error!(
1208                event = ?event,
1209                error = %e,
1210                "Failed to persist node event"
1211            );
1212            ctx.system().crash_system();
1213        }
1214    }
1215}
1216
/// Parameters consumed by `Node::create_initial` to build the initial
/// `Node` state.
pub struct InitParamsNode {
    /// Key pair stored in the node's `owner` field.
    pub key_pair: KeyPair,
    /// Shared public key stored in the node's `our_key` field.
    pub public_key: Arc<PublicKey>,
    /// Hash algorithm; stored wrapped in `Some` in the node's `hash` field.
    pub hash: HashAlgorithm,
    /// Copied as-is into the node's `is_service` flag.
    pub is_service: bool,
}
1223
1224#[async_trait]
1225impl PersistentActor for Node {
1226    type Persistence = LightPersistence;
1227    type InitParams = InitParamsNode;
1228
1229    fn update(&mut self, state: Self) {
1230        self.owned_subjects = state.owned_subjects;
1231        self.known_subjects = state.known_subjects;
1232        self.transfer_subjects = state.transfer_subjects;
1233        self.reject_subjects = state.reject_subjects
1234    }
1235
1236    fn create_initial(params: Self::InitParams) -> Self {
1237        Self {
1238            hash: Some(params.hash),
1239            owner: params.key_pair,
1240            our_key: params.public_key,
1241            is_service: params.is_service,
1242            owned_subjects: HashMap::new(),
1243            known_subjects: HashMap::new(),
1244            transfer_subjects: HashMap::new(),
1245            reject_subjects: HashSet::new(),
1246        }
1247    }
1248
1249    /// Change node state.
1250    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
1251        match event {
1252            NodeEvent::EOLSubject {
1253                subject_id,
1254                i_owner,
1255            } => {
1256                self.eol(subject_id.clone(), *i_owner);
1257                debug!(
1258                    event_type = "EOLSubject",
1259                    subject_id = %subject_id,
1260                    i_owner = &i_owner,
1261                    "Applied eol"
1262                );
1263            }
1264            NodeEvent::ConfirmTransfer(subject_id) => {
1265                self.confirm_transfer(subject_id.clone());
1266                debug!(
1267                    event_type = "ConfirmTransfer",
1268                    subject_id = %subject_id,
1269                    "Applied transfer confirmation"
1270                );
1271            }
1272            NodeEvent::RegisterSubject {
1273                subject_id,
1274                data,
1275                owner,
1276            } => {
1277                self.register_subject(
1278                    subject_id.clone(),
1279                    owner.clone(),
1280                    data.clone(),
1281                );
1282                debug!(
1283                    event_type = "RegisterSubject",
1284                    subject_id = %subject_id,
1285                    owner = %owner,
1286                    "Applied subject registration"
1287                );
1288            }
1289            NodeEvent::RejectTransfer(subject_id) => {
1290                self.delete_transfer(subject_id);
1291                debug!(
1292                    event_type = "RejectTransfer",
1293                    subject_id = %subject_id,
1294                    "Applied transfer rejection"
1295                );
1296            }
1297            NodeEvent::TransferSubject(transfer) => {
1298                self.transfer_subject(transfer.clone());
1299                debug!(
1300                    event_type = "TransferSubject",
1301                    subject_id = %transfer.subject_id,
1302                    new_owner = %transfer.new_owner,
1303                    "Applied subject transfer"
1304                );
1305            }
1306        };
1307
1308        Ok(())
1309    }
1310}
1311
// Empty impl: `Node` overrides nothing here and relies on whatever default
// items `Storable` provides.
#[async_trait]
impl Storable for Node {}
1314
// Test-only module; it currently contains no tests.
#[cfg(test)]
pub mod tests {}