1use std::{
5 collections::{HashMap, HashSet},
6 path::Path,
7 sync::Arc,
8};
9
10use borsh::{BorshDeserialize, BorshSerialize};
11use register::Register;
12use tokio::fs;
13use tracing::{Span, debug, error, info_span};
14
15use crate::{
16 auth::{Auth, AuthMessage, AuthResponse},
17 db::Storable,
18 distribution::worker::DistriWorker,
19 governance::{Governance, GovernanceMessage, data::GovernanceData},
20 helpers::{db::ExternalDB, network::service::NetworkSender},
21 manual_distribution::ManualDistribution,
22 model::{
23 common::node::SignTypesNode,
24 event::{Protocols, ValidationMetadata},
25 },
26 subject::{SignedLedger, SubjectMetadata},
27 system::ConfigHelper,
28 tracker::{InitParamsTracker, Tracker, TrackerInit, TrackerMessage},
29};
30
31use ave_common::{
32 SchemaType,
33 identity::{
34 DigestIdentifier, HashAlgorithm, PublicKey, Signature, keys::KeyPair,
35 },
36};
37
38use async_trait::async_trait;
39use ave_actors::{
40 Actor, ActorContext, ActorError, ActorPath, ChildAction, Event, Handler,
41 Message, Response, Sink,
42};
43use ave_actors::{LightPersistence, PersistentActor};
44use serde::{Deserialize, Serialize};
45
/// Submodule providing the `Register` actor, which `Node` spawns as its
/// `register` child while starting up.
pub mod register;
47
/// A subject ownership transfer together with the identifier of the subject
/// it applies to.
///
/// This is the payload of the `TransferSubject` message/event and the item
/// type returned when listing pending transfers.
#[derive(
    Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
)]
pub struct TransferSubject {
    /// Optional name attached to the transfer (presumably the subject's
    /// name — confirm against the producer of this value).
    pub name: Option<String>,
    /// Identifier of the subject being transferred.
    pub subject_id: DigestIdentifier,
    /// Public key of the node the subject is being transferred to.
    pub new_owner: PublicKey,
    /// Public key of the node that owns the subject while the transfer is
    /// pending.
    pub actual_owner: PublicKey,
}
57
/// Stored form of a pending transfer.
///
/// Same information as [`TransferSubject`] without the subject identifier,
/// which is used as the key of the node's `transfer_subjects` map instead.
#[derive(
    Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
)]
pub struct TransferData {
    /// Optional name attached to the transfer.
    pub name: Option<String>,
    /// Public key of the node the subject is being transferred to.
    pub new_owner: PublicKey,
    /// Public key of the node that owns the subject while the transfer is
    /// pending.
    pub actual_owner: PublicKey,
}
66
/// Summary information the node keeps for every subject it has registered.
///
/// NOTE: this type is Borsh-serialized as part of the persisted node state,
/// so variant and field order are part of the stored format.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    BorshSerialize,
    BorshDeserialize,
    Eq,
    PartialEq,
)]
pub enum SubjectData {
    /// A non-governance subject.
    Tracker {
        /// Identifier of the governance this subject belongs to.
        governance_id: DigestIdentifier,
        /// Schema of the subject.
        schema_id: SchemaType,
        /// Namespace of the subject.
        namespace: String,
        /// `true` until the subject is marked end-of-life via `eol`.
        active: bool,
    },
    /// A governance subject.
    Governance {
        /// `true` until the governance is marked end-of-life via `eol`.
        active: bool,
    },
}
88
89impl SubjectData {
90 pub fn get_schema_id(&self) -> SchemaType {
91 match self {
92 Self::Tracker { schema_id, .. } => schema_id.clone(),
93 Self::Governance { .. } => SchemaType::Governance,
94 }
95 }
96
97 pub fn get_governance_id(&self) -> Option<DigestIdentifier> {
98 match self {
99 Self::Tracker { governance_id, .. } => Some(governance_id.clone()),
100 Self::Governance { .. } => None,
101 }
102 }
103
104 pub fn get_namespace(&self) -> String {
105 match self {
106 Self::Tracker { namespace, .. } => namespace.clone(),
107 Self::Governance { .. } => String::default(),
108 }
109 }
110
111 pub const fn get_active(&self) -> bool {
112 match self {
113 Self::Tracker { active, .. } => *active,
114 Self::Governance { active } => *active,
115 }
116 }
117
118 pub const fn eol(&mut self) {
119 match self {
120 Self::Tracker { active, .. } => *active = false,
121 Self::Governance { active } => *active = false,
122 };
123 }
124}
125
/// State of the node actor.
///
/// Only the four subject collections are persisted (see the manual Borsh
/// implementations); the key material, hash algorithm and service flag are
/// runtime-only and are skipped by serde as well.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Node {
    /// Key pair used to produce signatures in `sign`. Not persisted.
    #[serde(skip)]
    owner: KeyPair,
    /// Public key of this node; compared against subject owners to decide
    /// whether a subject is owned or merely known. Not persisted.
    #[serde(skip)]
    our_key: Arc<PublicKey>,
    /// Hash algorithm handed to child subject actors. `None` on a value
    /// produced by Borsh deserialization; set from the init params.
    #[serde(skip)]
    hash: Option<HashAlgorithm>,
    /// Flag forwarded to every tracker this node creates. Not persisted.
    #[serde(skip)]
    is_service: bool,
    /// Subjects registered with an owner equal to `our_key`.
    owned_subjects: HashMap<DigestIdentifier, SubjectData>,
    /// Subjects registered with any other owner.
    known_subjects: HashMap<DigestIdentifier, SubjectData>,

    /// Pending transfers, keyed by subject identifier.
    transfer_subjects: HashMap<DigestIdentifier, TransferData>,

    /// Subjects whose pending transfer was removed via `delete_transfer`
    /// while this node was the current owner. An entry is cleared when a new
    /// transfer of that subject names this node as the new owner.
    reject_subjects: HashSet<DigestIdentifier>,
}
147
148impl BorshSerialize for Node {
150 fn serialize<W: std::io::Write>(
151 &self,
152 writer: &mut W,
153 ) -> std::io::Result<()> {
154 BorshSerialize::serialize(&self.owned_subjects, writer)?;
156 BorshSerialize::serialize(&self.known_subjects, writer)?;
157 BorshSerialize::serialize(&self.transfer_subjects, writer)?;
158 BorshSerialize::serialize(&self.reject_subjects, writer)?;
159 Ok(())
160 }
161}
162
163impl BorshDeserialize for Node {
164 fn deserialize_reader<R: std::io::Read>(
165 reader: &mut R,
166 ) -> std::io::Result<Self> {
167 let owned_subjects =
169 HashMap::<DigestIdentifier, SubjectData>::deserialize_reader(
170 reader,
171 )?;
172 let known_subjects =
173 HashMap::<DigestIdentifier, SubjectData>::deserialize_reader(
174 reader,
175 )?;
176 let transfer_subjects =
177 HashMap::<DigestIdentifier, TransferData>::deserialize_reader(
178 reader,
179 )?;
180 let reject_subjects =
181 HashSet::<DigestIdentifier>::deserialize_reader(reader)?;
182
183 let owner = KeyPair::default();
186 let our_key = Arc::new(PublicKey::default());
187 let hash = None;
188
189 Ok(Self {
190 hash,
191 our_key,
192 owner,
193 owned_subjects,
194 known_subjects,
195 transfer_subjects,
196 reject_subjects,
197 is_service: false,
198 })
199 }
200}
201
202impl Node {
203 pub fn transfer_subject(&mut self, data: TransferSubject) {
205 if data.new_owner == *self.our_key {
206 self.reject_subjects.remove(&data.subject_id);
207 }
208
209 self.transfer_subjects.insert(
210 data.subject_id,
211 TransferData {
212 name: data.name,
213 new_owner: data.new_owner,
214 actual_owner: data.actual_owner,
215 },
216 );
217 }
218
219 pub fn delete_transfer(&mut self, subject_id: &DigestIdentifier) {
220 if let Some(data) = self.transfer_subjects.remove(subject_id)
221 && data.actual_owner == *self.our_key
222 {
223 self.reject_subjects.insert(subject_id.clone());
224 }
225 }
226
227 pub fn confirm_transfer(&mut self, subject_id: DigestIdentifier) {
228 self.our_key.to_string();
229
230 if let Some(data) = self.transfer_subjects.remove(&subject_id) {
231 if data.actual_owner == *self.our_key {
232 if let Some(data) = self.owned_subjects.remove(&subject_id) {
233 self.known_subjects.insert(subject_id, data);
234 }
235 } else if data.new_owner == *self.our_key
236 && let Some(data) = self.known_subjects.remove(&subject_id)
237 {
238 self.owned_subjects.insert(subject_id, data);
239 };
240 };
241 }
242
243 pub fn eol(&mut self, subject_id: DigestIdentifier, i_owner: bool) {
244 if i_owner {
245 if let Some(data) = self.owned_subjects.get_mut(&subject_id) {
246 data.eol();
247 }
248 } else if let Some(data) = self.known_subjects.get_mut(&subject_id) {
249 data.eol();
250 }
251 }
252
253 pub fn register_subject(
254 &mut self,
255 subject_id: DigestIdentifier,
256 owner: PublicKey,
257 data: SubjectData,
258 ) {
259 if *self.our_key == owner {
260 self.owned_subjects.insert(subject_id, data);
261 } else {
262 self.known_subjects.insert(subject_id, data);
263 }
264 }
265
266 fn sign<T: BorshSerialize>(
267 &self,
268 content: &T,
269 ) -> Result<Signature, ActorError> {
270 Signature::new(content, &self.owner).map_err(|e| {
271 ActorError::Functional {
272 description: format!("{}", e),
273 }
274 })
275 }
276
277 async fn build_compilation_dir(
278 ctx: &ActorContext<Self>,
279 ) -> Result<(), ActorError> {
280 let contracts_path = if let Some(config) =
281 ctx.system().get_helper::<ConfigHelper>("config").await
282 {
283 config.contracts_path
284 } else {
285 error!("Config helper not found");
286 return Err(ActorError::Helper {
287 name: "config".to_owned(),
288 reason: "Not found".to_string(),
289 });
290 };
291
292 let dir = contracts_path.join("contracts");
293
294 if !Path::new(&dir).exists() {
295 fs::create_dir_all(&dir).await.map_err(|e| {
296 error!(
297 error = %e,
298 path = ?dir,
299 "Failed to create contracts directory"
300 );
301 ActorError::FunctionalCritical {
302 description: format!("Can not create contracts dir: {}", e),
303 }
304 })?;
305 }
306 Ok(())
307 }
308
309 async fn create_subjects(
310 &self,
311 ctx: &mut ActorContext<Self>,
312 network: &Arc<NetworkSender>,
313 ) -> Result<(), ActorError> {
314 let Some(ext_db): Option<Arc<ExternalDB>> =
315 ctx.system().get_helper("ext_db").await
316 else {
317 error!("External database helper not found");
318 return Err(ActorError::Helper {
319 name: "ext_db".to_string(),
320 reason: "Not found".to_string(),
321 });
322 };
323
324 let Some(hash) = self.hash else {
325 error!("Hash is None during subject creation");
326 return Err(ActorError::FunctionalCritical {
327 description: "Hash is None".to_string(),
328 });
329 };
330
331 for (subject, data) in self.owned_subjects.clone() {
332 if let SubjectData::Governance { .. } = data {
333 let governance_actor = ctx
334 .create_child(
335 &subject.to_string(),
336 Governance::initial((None, self.our_key.clone(), hash)),
337 )
338 .await?;
339
340 let sink = Sink::new(
341 governance_actor.subscribe(),
342 ext_db.get_subject(),
343 );
344
345 ctx.system().run_sink(sink).await;
346
347 ctx.create_child(
348 &format!("distributor_{}", subject),
349 DistriWorker {
350 our_key: self.our_key.clone(),
351 network: network.clone(),
352 },
353 )
354 .await?;
355 } else {
356 let tracker_actor = ctx
357 .create_child(
358 &subject.to_string(),
359 Tracker::initial(InitParamsTracker {
360 data: None,
361 hash,
362 is_service: self.is_service,
363 public_key: self.our_key.clone(),
364 }),
365 )
366 .await?;
367
368 let sink =
369 Sink::new(tracker_actor.subscribe(), ext_db.get_subject());
370
371 ctx.system().run_sink(sink).await;
372
373 ctx.create_child(
374 &format!("distributor_{}", subject),
375 DistriWorker {
376 our_key: self.our_key.clone(),
377 network: network.clone(),
378 },
379 )
380 .await?;
381 }
382 }
383
384 for (subject, data) in self.known_subjects.clone() {
385 let i_new_owner = self
386 .transfer_subjects
387 .get(&subject)
388 .is_some_and(|transfer| transfer.new_owner == *self.our_key);
389
390 if let SubjectData::Governance { .. } = data {
391 let governance_actor = ctx
392 .create_child(
393 &subject.to_string(),
394 Governance::initial((None, self.our_key.clone(), hash)),
395 )
396 .await?;
397
398 let sink = Sink::new(
399 governance_actor.subscribe(),
400 ext_db.get_subject(),
401 );
402
403 ctx.system().run_sink(sink).await;
404
405 ctx.create_child(
406 &format!("distributor_{}", subject),
407 DistriWorker {
408 our_key: self.our_key.clone(),
409 network: network.clone(),
410 },
411 )
412 .await?;
413 } else if i_new_owner {
414 let tracker_actor = ctx
415 .create_child(
416 &subject.to_string(),
417 Tracker::initial(InitParamsTracker {
418 data: None,
419 hash,
420 is_service: self.is_service,
421 public_key: self.our_key.clone(),
422 }),
423 )
424 .await?;
425
426 let sink =
427 Sink::new(tracker_actor.subscribe(), ext_db.get_subject());
428
429 ctx.system().run_sink(sink).await;
430
431 ctx.create_child(
432 &format!("distributor_{}", subject),
433 DistriWorker {
434 our_key: self.our_key.clone(),
435 network: network.clone(),
436 },
437 )
438 .await?;
439 }
440 }
441
442 Ok(())
443 }
444}
445
/// Messages accepted by the [`Node`] actor.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum NodeMessage {
    /// Lists the identifiers of all governances, known and owned.
    GetGovernances,
    /// Signs the given content with the node's key pair.
    SignRequest(SignTypesNode),
    /// Lists every pending transfer stored by the node.
    PendingTransfers,
    /// Starts the tracker actor for an existing subject.
    UpSubject {
        /// Subject to bring up.
        subject_id: DigestIdentifier,
        /// When `true`, no sink to the external database is attached.
        light: bool,
    },
    /// Looks a subject up in the owned map, then in the known map.
    GetSubjectData(DigestIdentifier),
    /// Creates and registers a new subject from its signed creation ledger.
    CreateNewSubject(SignedLedger),
    /// Asks whether this node owns the subject and whether a pending
    /// transfer names it as the new owner.
    IOwnerNewOwnerSubject(DigestIdentifier),
    /// Returns the subject data from the known map when the subject is in
    /// the rejected set, and from the owned map otherwise.
    ICanSendLastLedger(DigestIdentifier),
    /// Asks whether the subject is in the auth actor's authorized list and
    /// returns whatever data the node holds for it.
    AuthData(DigestIdentifier),
    /// Records a pending transfer.
    TransferSubject(TransferSubject),
    /// Removes a pending transfer.
    RejectTransfer(DigestIdentifier),
    /// Completes a pending transfer.
    ConfirmTransfer(DigestIdentifier),
    /// Marks a subject as end-of-life.
    EOLSubject {
        /// Subject to mark.
        subject_id: DigestIdentifier,
        /// Whether to look the subject up among the owned subjects
        /// (`true`) or the known ones (`false`).
        i_owner: bool,
    },
}
469
470impl Message for NodeMessage {
471 fn is_critical(&self) -> bool {
472 matches!(
473 self,
474 Self::TransferSubject(..)
475 | Self::RejectTransfer(..)
476 | Self::ConfirmTransfer(..)
477 | Self::EOLSubject { .. }
478 )
479 }
480}
481
/// Replies produced by the [`Node`] actor.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum NodeResponse {
    /// Governance identifiers, answering `GetGovernances`.
    Governances(Vec<DigestIdentifier>),
    /// Subject data, or `None` when the lookup found nothing. Answers
    /// `GetSubjectData` and `ICanSendLastLedger`.
    SubjectData(Option<SubjectData>),
    /// Pending transfers, answering `PendingTransfers`.
    PendingTransfers(Vec<TransferSubject>),
    /// Signature produced for a `SignRequest`.
    SignRequest(Signature),
    /// Ownership status, answering `IOwnerNewOwnerSubject`.
    IOwnerNewOwner {
        /// Whether the subject is in the owned map.
        i_owner: bool,
        /// `None` when no transfer is pending for the subject; otherwise
        /// whether this node is the transfer's new owner.
        i_new_owner: Option<bool>,
    },
    /// Authorization status, answering `AuthData`.
    AuthData {
        /// Whether the subject is in the auth actor's authorized list.
        auth: bool,
        /// Data held for the subject (known or owned), if any.
        subject_data: Option<SubjectData>,
    },
    /// Generic acknowledgement for messages with no payload to return.
    Ok,
}

impl Response for NodeResponse {}
501
/// Events persisted by the [`Node`] actor; each one maps to a state change
/// performed in `PersistentActor::apply`.
#[derive(
    Clone, Debug, Serialize, Deserialize, BorshSerialize, BorshDeserialize,
)]
pub enum NodeEvent {
    /// A subject was registered as owned or known, depending on `owner`.
    RegisterSubject {
        /// Owner of the subject at registration time.
        owner: PublicKey,
        /// Identifier of the registered subject.
        subject_id: DigestIdentifier,
        /// Summary data stored for the subject.
        data: SubjectData,
    },
    /// A pending transfer was recorded.
    TransferSubject(TransferSubject),
    /// A pending transfer was removed.
    RejectTransfer(DigestIdentifier),
    /// A pending transfer was completed.
    ConfirmTransfer(DigestIdentifier),
    /// A subject was marked as end-of-life.
    EOLSubject {
        /// Identifier of the subject.
        subject_id: DigestIdentifier,
        /// Whether the subject is looked up among the owned subjects.
        i_owner: bool,
    },
}

impl Event for NodeEvent {}
522
523#[async_trait]
524impl Actor for Node {
525 type Event = NodeEvent;
526 type Message = NodeMessage;
527 type Response = NodeResponse;
528
529 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
530 parent_span.map_or_else(
531 || info_span!("Node"),
532 |parent_span| info_span!(parent: parent_span, "Node"),
533 )
534 }
535
536 async fn pre_start(
537 &mut self,
538 ctx: &mut ave_actors::ActorContext<Self>,
539 ) -> Result<(), ActorError> {
540 if let Err(e) = Self::build_compilation_dir(ctx).await {
541 error!(
542 error = %e,
543 "Failed to build compilation directory"
544 );
545 return Err(e);
546 }
547
548 if let Err(e) = self.init_store("node", None, true, ctx).await {
550 error!(
551 error = %e,
552 "Failed to initialize node store"
553 );
554 return Err(e);
555 }
556
557 let Some(network): Option<Arc<NetworkSender>> =
558 ctx.system().get_helper("network").await
559 else {
560 error!("Network helper not found");
561 return Err(ActorError::Helper {
562 name: "network".to_string(),
563 reason: "Not found".to_string(),
564 });
565 };
566
567 let register_actor = match ctx.create_child("register", Register).await {
568 Ok(actor) => actor,
569 Err(e) => {
570 error!(error = %e, "Failed to create register child");
571 return Err(e);
572 }
573 };
574
575 let Some(ext_db): Option<Arc<ExternalDB>> =
576 ctx.system().get_helper("ext_db").await
577 else {
578 error!("External DB helper not found");
579 return Err(ActorError::Helper {
580 name: "ext_db".to_string(),
581 reason: "Not found".to_string(),
582 });
583 };
584
585 let sink = Sink::new(register_actor.subscribe(), ext_db.get_register());
586 ctx.system().run_sink(sink).await;
587
588 if let Err(e) = ctx
589 .create_child(
590 "manual_distribution",
591 ManualDistribution::new(self.our_key.clone()),
592 )
593 .await
594 {
595 error!(
596 error = %e,
597 "Failed to create manual_distribution child"
598 );
599 return Err(e);
600 }
601
602 if let Err(e) = self.create_subjects(ctx, &network).await {
603 error!(
604 error = %e,
605 "Failed to create subjects"
606 );
607 return Err(e);
608 }
609
610 if let Err(e) = ctx
611 .create_child(
612 "auth",
613 Auth::initial((network.clone(), self.our_key.clone())),
614 )
615 .await
616 {
617 error!(
618 error = %e,
619 "Failed to create auth child"
620 );
621 return Err(e);
622 }
623
624 if let Err(e) = ctx
625 .create_child(
626 "distributor",
627 DistriWorker {
628 our_key: self.our_key.clone(),
629 network,
630 },
631 )
632 .await
633 {
634 error!(
635 error = %e,
636 "Failed to create distributor child"
637 );
638 return Err(e);
639 }
640
641 Ok(())
642 }
643}
644
645#[async_trait]
646impl Handler<Self> for Node {
647 async fn handle_message(
648 &mut self,
649 _sender: ActorPath,
650 msg: NodeMessage,
651 ctx: &mut ave_actors::ActorContext<Self>,
652 ) -> Result<NodeResponse, ActorError> {
653 match msg {
654 NodeMessage::EOLSubject {
655 subject_id,
656 i_owner,
657 } => {
658 self.on_event(
659 NodeEvent::EOLSubject {
660 subject_id: subject_id.clone(),
661 i_owner,
662 },
663 ctx,
664 )
665 .await;
666
667 debug!(
668 msg_type = "EOLSubject",
669 subject_id = %subject_id,
670 i_owner = %i_owner,
671 "EOL confirmed"
672 );
673
674 Ok(NodeResponse::Ok)
675 }
676 NodeMessage::GetGovernances => {
677 let mut gov_know = self
678 .known_subjects
679 .iter()
680 .filter(|x| matches!(x.1, SubjectData::Governance { .. }))
681 .map(|x| x.0.clone())
682 .collect::<Vec<DigestIdentifier>>();
683 let mut gov_owned = self
684 .owned_subjects
685 .iter()
686 .filter(|x| matches!(x.1, SubjectData::Governance { .. }))
687 .map(|x| x.0.clone())
688 .collect::<Vec<DigestIdentifier>>();
689 gov_know.append(&mut gov_owned);
690
691 return Ok(NodeResponse::Governances(gov_know));
692 }
693 NodeMessage::ICanSendLastLedger(subject_id) => {
694 let subject_data = if self.reject_subjects.contains(&subject_id)
695 {
696 self.known_subjects.get(&subject_id).cloned()
697 } else {
698 self.owned_subjects.get(&subject_id).cloned()
699 };
700
701 Ok(NodeResponse::SubjectData(subject_data))
702 }
703 NodeMessage::UpSubject { subject_id, light } => {
704 let Some(ext_db): Option<Arc<ExternalDB>> =
705 ctx.system().get_helper("ext_db").await
706 else {
707 error!(
708 msg_type = "UpSubject",
709 subject_id = %subject_id,
710 "External database helper not found"
711 );
712 return Err(ActorError::Helper {
713 name: "ext_db".to_string(),
714 reason: "Not found".to_string(),
715 });
716 };
717
718 let Some(hash) = self.hash else {
719 error!(
720 msg_type = "UpSubject",
721 subject_id = %subject_id,
722 "Hash is None"
723 );
724 return Err(ActorError::FunctionalCritical {
725 description: "Hash is None".to_string(),
726 });
727 };
728
729 let tracker_actor = ctx
730 .create_child(
731 &subject_id.to_string(),
732 Tracker::initial(InitParamsTracker {
733 data: None,
734 hash,
735 is_service: self.is_service,
736 public_key: self.our_key.clone(),
737 }),
738 )
739 .await?;
740 if !light {
741 let sink = Sink::new(
742 tracker_actor.subscribe(),
743 ext_db.get_subject(),
744 );
745 ctx.system().run_sink(sink).await;
746 }
747
748 debug!(
749 msg_type = "UpSubject",
750 subject_id = %subject_id,
751 light = light,
752 "Subject brought up successfully"
753 );
754
755 Ok(NodeResponse::Ok)
756 }
757 NodeMessage::GetSubjectData(subject_id) => {
758 let data = if let Some(data) =
759 self.owned_subjects.get(&subject_id)
760 {
761 Some(data.clone())
762 } else if let Some(data) = self.known_subjects.get(&subject_id)
763 {
764 Some(data.clone())
765 } else {
766 debug!(
767 msg_type = "GetSubjectData",
768 subject_id = %subject_id,
769 "Subject not found"
770 );
771
772 None
773 };
774
775 debug!(
776 msg_type = "GetSubjectData",
777 subject_id = %subject_id,
778 "Subject data retrieved successfully"
779 );
780
781 Ok(NodeResponse::SubjectData(data))
782 }
783 NodeMessage::PendingTransfers => {
784 let transfers: Vec<TransferSubject> = self
785 .transfer_subjects
786 .iter()
787 .map(|x| TransferSubject {
788 name: x.1.name.clone(),
789 subject_id: x.0.clone(),
790 new_owner: x.1.new_owner.clone(),
791 actual_owner: x.1.actual_owner.clone(),
792 })
793 .collect();
794
795 debug!(
796 msg_type = "PendingTransfers",
797 count = transfers.len(),
798 "Retrieved pending transfers"
799 );
800
801 Ok(NodeResponse::PendingTransfers(transfers))
802 }
803 NodeMessage::RejectTransfer(subject_id) => {
804 self.on_event(
805 NodeEvent::RejectTransfer(subject_id.clone()),
806 ctx,
807 )
808 .await;
809
810 debug!(
811 msg_type = "RejectTransfer",
812 subject_id = %subject_id,
813 "Transfer rejected successfully"
814 );
815
816 Ok(NodeResponse::Ok)
817 }
818 NodeMessage::TransferSubject(data) => {
819 let subject_id = data.subject_id.clone();
820 self.on_event(NodeEvent::TransferSubject(data), ctx).await;
821
822 debug!(
823 msg_type = "TransferSubject",
824 subject_id = %subject_id,
825 "Subject transfer registered successfully"
826 );
827
828 Ok(NodeResponse::Ok)
829 }
830 NodeMessage::ConfirmTransfer(subject_id) => {
831 self.on_event(
832 NodeEvent::ConfirmTransfer(subject_id.clone()),
833 ctx,
834 )
835 .await;
836
837 debug!(
838 msg_type = "ConfirmTransfer",
839 subject_id = %subject_id,
840 "Transfer confirmed between other parties"
841 );
842
843 Ok(NodeResponse::Ok)
844 }
845 NodeMessage::CreateNewSubject(ledger) => {
846 let Some(ext_db): Option<Arc<ExternalDB>> =
847 ctx.system().get_helper("ext_db").await
848 else {
849 error!(
850 msg_type = "CreateNewSubject",
851 "External database helper not found"
852 );
853 return Err(ActorError::Helper {
854 name: "ext_db".to_string(),
855 reason: "Not found".to_string(),
856 });
857 };
858
859 let Some(network): Option<Arc<NetworkSender>> =
860 ctx.system().get_helper("network").await
861 else {
862 error!(
863 msg_type = "CreateNewSubject",
864 "Network helper not found"
865 );
866 return Err(ActorError::Helper {
867 name: "network".to_string(),
868 reason: "Not found".to_string(),
869 });
870 };
871
872 let Some(hash) = self.hash else {
873 error!(msg_type = "CreateNewSubject", "Hash is None");
874 return Err(ActorError::FunctionalCritical {
875 description: "Hash is None".to_string(),
876 });
877 };
878
879 let metadata = match &ledger.content().protocols {
880 Protocols::Create { validation } => {
881 if let ValidationMetadata::Metadata(metadata) =
882 &validation.validation_metadata
883 {
884 metadata.clone()
885 } else {
886 error!(
887 msg_type = "CreateNewSubject",
888 "ValidationMetadata must be Metadata in Create event"
889 );
890 return Err(ActorError::Functional { description: "In Create event ValidationMetadata must be Metadata".to_string() });
891 }
892 }
893 _ => {
894 error!(
895 msg_type = "CreateNewSubject",
896 "Event must be a Create event"
897 );
898 return Err(ActorError::Functional {
899 description: "Event must be a Create event"
900 .to_string(),
901 });
902 }
903 };
904
905 let subject_id = metadata.subject_id.to_string();
906
907 let subject_data = if metadata.schema_id.is_gov() {
908 let subject_metadata = SubjectMetadata::new(&metadata);
909 let governance_data =
910 serde_json::from_value::<GovernanceData>(
911 metadata.properties.0,
912 )
913 .map_err(|e| {
914 error!(
915 msg_type = "CreateNewSubject",
916 subject_id = %subject_id,
917 error = %e,
918 "Governance properties must be GovernanceData"
919 );
920 ActorError::Functional { description: format!("In governance properties must be a GovernanceData: {e}")}
921 })?;
922
923 let governance = Governance::initial((
924 Some((subject_metadata, governance_data)),
925 self.our_key.clone(),
926 hash,
927 ));
928
929 let governance_actor =
930 ctx.create_child(&subject_id, governance).await?;
931
932 let sink = Sink::new(
933 governance_actor.subscribe(),
934 ext_db.get_subject(),
935 );
936 ctx.system().run_sink(sink).await;
937
938 if let Err(e) = governance_actor
939 .ask(GovernanceMessage::UpdateLedger {
940 events: vec![ledger.clone()],
941 })
942 .await
943 {
944 error!(
945 msg_type = "CreateNewSubject",
946 subject_id = %subject_id,
947 error = %e,
948 "Failed to update governance ledger"
949 );
950 governance_actor.tell_stop().await;
951 return Err(e);
952 };
953
954 debug!(
955 msg_type = "CreateNewSubject",
956 subject_id = %subject_id,
957 "Governance subject created successfully"
958 );
959
960 SubjectData::Governance { active: true }
961 } else {
962 let tracker_init = TrackerInit::from(&*metadata);
963
964 let tracker = Tracker::initial(InitParamsTracker {
965 data: Some(tracker_init),
966 hash,
967 is_service: self.is_service,
968 public_key: self.our_key.clone(),
969 });
970
971 let tracker_actor =
972 ctx.create_child(&subject_id, tracker).await?;
973
974 let sink = Sink::new(
975 tracker_actor.subscribe(),
976 ext_db.get_subject(),
977 );
978 ctx.system().run_sink(sink).await;
979
980 if let Err(e) = tracker_actor
981 .ask(TrackerMessage::UpdateLedger {
982 events: vec![ledger.clone()],
983 })
984 .await
985 {
986 error!(
987 msg_type = "CreateNewSubject",
988 subject_id = %subject_id,
989 error = %e,
990 "Failed to update tracker ledger"
991 );
992 tracker_actor.tell_stop().await;
993 return Err(e);
994 };
995
996 debug!(
997 msg_type = "CreateNewSubject",
998 subject_id = %subject_id,
999 governance_id = %metadata.governance_id,
1000 "Tracker subject created successfully"
1001 );
1002
1003 SubjectData::Tracker {
1004 governance_id: metadata.governance_id.clone(),
1005 schema_id: metadata.schema_id.clone(),
1006 namespace: metadata.namespace.to_string(),
1007 active: true,
1008 }
1009 };
1010
1011 self.on_event(
1012 NodeEvent::RegisterSubject {
1013 subject_id: metadata.subject_id.clone(),
1014 owner: metadata.owner.clone(),
1015 data: subject_data,
1016 },
1017 ctx,
1018 )
1019 .await;
1020
1021 ctx.create_child(
1022 &format!("distributor_{}", subject_id),
1023 DistriWorker {
1024 our_key: self.our_key.clone(),
1025 network,
1026 },
1027 )
1028 .await?;
1029
1030 debug!(
1031 msg_type = "CreateNewSubject",
1032 subject_id = %subject_id,
1033 "New subject and distributor created successfully"
1034 );
1035
1036 Ok(NodeResponse::Ok)
1037 }
1038 NodeMessage::SignRequest(content) => {
1039 let content_type = match &content {
1040 SignTypesNode::EventRequest(_) => "EventRequest",
1041 SignTypesNode::ValidationReq(_) => "ValidationReq",
1042 SignTypesNode::ValidationRes(_) => "ValidationRes",
1043 SignTypesNode::EvaluationReq(_) => "EvaluationReq",
1044 SignTypesNode::EvaluationRes(_) => "EvaluationRes",
1045 SignTypesNode::ApprovalReq(_) => "ApprovalReq",
1046 SignTypesNode::ApprovalRes(_) => "ApprovalRes",
1047 SignTypesNode::Ledger(_) => "Ledger",
1048 };
1049
1050 let sign = match content {
1051 SignTypesNode::EventRequest(event_req) => {
1052 self.sign(&event_req)
1053 }
1054 SignTypesNode::ValidationReq(validation_req) => {
1055 self.sign(&*validation_req)
1056 }
1057 SignTypesNode::ValidationRes(validation_res) => {
1058 self.sign(&validation_res)
1059 }
1060 SignTypesNode::EvaluationReq(evaluation_req) => {
1061 self.sign(&evaluation_req)
1062 }
1063 SignTypesNode::EvaluationRes(evaluation_res) => {
1064 self.sign(&evaluation_res)
1065 }
1066 SignTypesNode::ApprovalReq(approval_req) => {
1067 self.sign(&approval_req)
1068 }
1069 SignTypesNode::ApprovalRes(approval_res) => {
1070 self.sign(&*approval_res)
1071 }
1072 SignTypesNode::Ledger(ledger) => self.sign(&ledger),
1073 }
1074 .map_err(|e| {
1075 error!(
1076 msg_type = "SignRequest",
1077 content_type = content_type,
1078 error = %e,
1079 "Failed to sign content"
1080 );
1081 ActorError::FunctionalCritical {
1082 description: format!("Can not sign event: {}", e),
1083 }
1084 })?;
1085
1086 debug!(
1087 msg_type = "SignRequest",
1088 content_type = content_type,
1089 "Content signed successfully"
1090 );
1091
1092 Ok(NodeResponse::SignRequest(sign))
1093 }
1094 NodeMessage::IOwnerNewOwnerSubject(subject_id) => {
1095 let i_owner =
1096 self.owned_subjects.keys().any(|x| *x == subject_id);
1097
1098 let i_new_owner = if let Some(data) =
1099 self.transfer_subjects.get(&subject_id)
1100 {
1101 Some(data.new_owner == *self.our_key)
1102 } else {
1103 None
1104 };
1105
1106 debug!(
1107 msg_type = "OwnerPendingSubject",
1108 subject_id = %subject_id,
1109 i_owner = i_owner,
1110 i_new_owner = i_new_owner,
1111 "Checked owner/pending status"
1112 );
1113
1114 Ok(NodeResponse::IOwnerNewOwner {
1115 i_owner,
1116 i_new_owner,
1117 })
1118 }
1119 NodeMessage::AuthData(subject_id) => {
1120 let authorized_subjects = match ctx
1121 .get_child::<Auth>("auth")
1122 .await
1123 {
1124 Ok(auth) => {
1125 let res = match auth.ask(AuthMessage::GetAuths).await {
1126 Ok(res) => res,
1127 Err(e) => {
1128 error!(
1129 msg_type = "AuthData",
1130 subject_id = %subject_id,
1131 error = %e,
1132 "Failed to get authorizations from auth actor"
1133 );
1134 ctx.system().crash_system();
1135 return Err(e);
1136 }
1137 };
1138 let AuthResponse::Auths { subjects } = res else {
1139 error!(
1140 msg_type = "AuthData",
1141 subject_id = %subject_id,
1142 "Unexpected response from auth actor"
1143 );
1144 ctx.system().crash_system();
1145 return Err(ActorError::UnexpectedResponse {
1146 expected: "AuthResponse::Auths".to_owned(),
1147 path: ctx.path().clone() / "auth",
1148 });
1149 };
1150 subjects
1151 }
1152 Err(e) => {
1153 error!(
1154 msg_type = "AuthData",
1155 subject_id = %subject_id,
1156 "Auth actor not found"
1157 );
1158 ctx.system().crash_system();
1159 return Err(e);
1160 }
1161 };
1162
1163 let auth_subj =
1164 authorized_subjects.iter().any(|x| x.clone() == subject_id);
1165
1166 let subj_data = self
1167 .known_subjects
1168 .get(&subject_id)
1169 .or_else(|| self.owned_subjects.get(&subject_id))
1170 .cloned();
1171
1172 debug!(
1173 msg_type = "AuthData",
1174 subject_id = %subject_id,
1175 authorized = auth_subj,
1176 subject_data = ?subj_data,
1177 "Checked subject authorization status"
1178 );
1179
1180 Ok(NodeResponse::AuthData {
1181 auth: auth_subj,
1182 subject_data: subj_data,
1183 })
1184 }
1185 }
1186 }
1187
1188 async fn on_child_fault(
1189 &mut self,
1190 error: ActorError,
1191 ctx: &mut ActorContext<Self>,
1192 ) -> ChildAction {
1193 error!(
1194 error = %error,
1195 "Child actor fault, stopping system"
1196 );
1197 ctx.system().crash_system();
1198 ChildAction::Stop
1199 }
1200
1201 async fn on_event(
1202 &mut self,
1203 event: NodeEvent,
1204 ctx: &mut ActorContext<Self>,
1205 ) {
1206 if let Err(e) = self.persist(&event, ctx).await {
1207 error!(
1208 event = ?event,
1209 error = %e,
1210 "Failed to persist node event"
1211 );
1212 ctx.system().crash_system();
1213 }
1214 }
1215}
1216
/// Parameters used to build the initial [`Node`] state; they supply the
/// runtime-only fields that are never persisted.
pub struct InitParamsNode {
    /// Key pair the node signs with.
    pub key_pair: KeyPair,
    /// Public key identifying this node.
    pub public_key: Arc<PublicKey>,
    /// Hash algorithm handed to the subject actors the node creates.
    pub hash: HashAlgorithm,
    /// Flag forwarded to every tracker the node creates.
    pub is_service: bool,
}
1223
1224#[async_trait]
1225impl PersistentActor for Node {
1226 type Persistence = LightPersistence;
1227 type InitParams = InitParamsNode;
1228
1229 fn update(&mut self, state: Self) {
1230 self.owned_subjects = state.owned_subjects;
1231 self.known_subjects = state.known_subjects;
1232 self.transfer_subjects = state.transfer_subjects;
1233 self.reject_subjects = state.reject_subjects
1234 }
1235
1236 fn create_initial(params: Self::InitParams) -> Self {
1237 Self {
1238 hash: Some(params.hash),
1239 owner: params.key_pair,
1240 our_key: params.public_key,
1241 is_service: params.is_service,
1242 owned_subjects: HashMap::new(),
1243 known_subjects: HashMap::new(),
1244 transfer_subjects: HashMap::new(),
1245 reject_subjects: HashSet::new(),
1246 }
1247 }
1248
1249 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
1251 match event {
1252 NodeEvent::EOLSubject {
1253 subject_id,
1254 i_owner,
1255 } => {
1256 self.eol(subject_id.clone(), *i_owner);
1257 debug!(
1258 event_type = "EOLSubject",
1259 subject_id = %subject_id,
1260 i_owner = &i_owner,
1261 "Applied eol"
1262 );
1263 }
1264 NodeEvent::ConfirmTransfer(subject_id) => {
1265 self.confirm_transfer(subject_id.clone());
1266 debug!(
1267 event_type = "ConfirmTransfer",
1268 subject_id = %subject_id,
1269 "Applied transfer confirmation"
1270 );
1271 }
1272 NodeEvent::RegisterSubject {
1273 subject_id,
1274 data,
1275 owner,
1276 } => {
1277 self.register_subject(
1278 subject_id.clone(),
1279 owner.clone(),
1280 data.clone(),
1281 );
1282 debug!(
1283 event_type = "RegisterSubject",
1284 subject_id = %subject_id,
1285 owner = %owner,
1286 "Applied subject registration"
1287 );
1288 }
1289 NodeEvent::RejectTransfer(subject_id) => {
1290 self.delete_transfer(subject_id);
1291 debug!(
1292 event_type = "RejectTransfer",
1293 subject_id = %subject_id,
1294 "Applied transfer rejection"
1295 );
1296 }
1297 NodeEvent::TransferSubject(transfer) => {
1298 self.transfer_subject(transfer.clone());
1299 debug!(
1300 event_type = "TransferSubject",
1301 subject_id = %transfer.subject_id,
1302 new_owner = %transfer.new_owner,
1303 "Applied subject transfer"
1304 );
1305 }
1306 };
1307
1308 Ok(())
1309 }
1310}
1311
// `Node` relies entirely on the default `Storable` behaviour; nothing is
// overridden here.
#[async_trait]
impl Storable for Node {}
1314
// Test-only module; currently empty.
#[cfg(test)]
pub mod tests {}