Skip to main content

ave_core/node/
subject_manager.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::Arc,
4};
5
6use async_trait::async_trait;
7use ave_actors::{
8    Actor, ActorContext, ActorError, ActorPath, ActorRef, Handler, Message,
9    NotPersistentActor, PersistentActor, Response, Sink,
10};
11use ave_common::identity::{DigestIdentifier, HashAlgorithm, PublicKey};
12use serde::{Deserialize, Serialize};
13use tracing::{Span, debug, error, info_span};
14
15use crate::{
16    governance::{
17        Governance, GovernanceMessage, GovernanceResponse, data::GovernanceData,
18    },
19    helpers::db::ExternalDB,
20    model::event::{Ledger, Protocols, ValidationMetadata},
21    node::{Node, NodeMessage, NodeResponse, SubjectData},
22    subject::SubjectMetadata,
23    tracker::{
24        InitParamsTracker, Tracker, TrackerInit, TrackerMessage,
25        TrackerResponse,
26    },
27};
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub enum SubjectManagerMessage {
31    UpGovernances {
32        governance_ids: Vec<DigestIdentifier>,
33    },
34    Up {
35        subject_id: DigestIdentifier,
36        requester: String,
37        create_ledger: Option<Box<Ledger>>,
38    },
39    Finish {
40        subject_id: DigestIdentifier,
41        requester: String,
42    },
43    DeleteTracker {
44        subject_id: DigestIdentifier,
45    },
46    DeleteGovernance {
47        subject_id: DigestIdentifier,
48    },
49}
50
51impl Message for SubjectManagerMessage {}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub enum SubjectManagerResponse {
55    Up,
56    Finish,
57    DeleteTracker,
58    DeleteGovernance,
59}
60
61impl Response for SubjectManagerResponse {}
62
63#[derive(Debug, Default, Clone)]
64struct SubjectEntry {
65    requesters: HashSet<String>,
66}
67
68pub struct SubjectManager {
69    our_key: Arc<PublicKey>,
70    hash: HashAlgorithm,
71    is_service: bool,
72    only_clear_events: bool,
73    subjects: HashMap<DigestIdentifier, SubjectEntry>,
74}
75
76impl SubjectManager {
77    pub fn new(
78        our_key: Arc<PublicKey>,
79        hash: HashAlgorithm,
80        is_service: bool,
81        only_clear_events: bool,
82    ) -> Self {
83        Self {
84            our_key,
85            hash,
86            is_service,
87            only_clear_events,
88            subjects: HashMap::new(),
89        }
90    }
91
92    async fn up_governances(
93        &self,
94        ctx: &mut ActorContext<Self>,
95        governance_ids: Vec<DigestIdentifier>,
96    ) -> Result<(), ActorError> {
97        let safe_mode = if let Some(config) = ctx
98            .system()
99            .get_helper::<crate::system::ConfigHelper>("config")
100            .await
101        {
102            config.safe_mode
103        } else {
104            return Err(ActorError::Helper {
105                name: "config".to_owned(),
106                reason: "Not found".to_owned(),
107            });
108        };
109
110        for governance_id in governance_ids {
111            let actor: ActorRef<Governance> = ctx
112                .create_child(
113                    &governance_id.to_string(),
114                    Governance::initial((
115                        None,
116                        self.our_key.clone(),
117                        self.hash,
118                        self.is_service,
119                    )),
120                )
121                .await?;
122
123            if !safe_mode {
124                let Some(ext_db): Option<Arc<ExternalDB>> =
125                    ctx.system().get_helper("ext_db").await
126                else {
127                    return Err(ActorError::Helper {
128                        name: "ext_db".to_owned(),
129                        reason: "Not found".to_owned(),
130                    });
131                };
132
133                let sink = Sink::new(actor.subscribe(), ext_db.get_subject());
134                ctx.system().run_sink(sink).await;
135            }
136        }
137
138        Ok(())
139    }
140
141    async fn up(
142        &mut self,
143        ctx: &mut ActorContext<Self>,
144        subject_id: DigestIdentifier,
145        requester: String,
146        create_ledger: Option<Box<Ledger>>,
147    ) -> Result<(), ActorError> {
148        if let Some(entry) = self.subjects.get_mut(&subject_id) {
149            entry.requesters.insert(requester);
150            return Ok(());
151        }
152
153        if let Some(ledger) = create_ledger {
154            let ledger = *ledger;
155            let metadata = Self::metadata_from_create_ledger(&ledger)?;
156
157            if metadata.schema_id.is_gov() {
158                self.create_governance(ctx, &subject_id, metadata, ledger)
159                    .await?;
160                return Ok(());
161            }
162
163            self.create_tracker(ctx, &subject_id, metadata, ledger)
164                .await?;
165        } else {
166            self.load_tracker(ctx, &subject_id).await?;
167        }
168
169        let entry = self.subjects.entry(subject_id).or_default();
170        entry.requesters.insert(requester);
171
172        Ok(())
173    }
174
175    async fn finish(
176        &mut self,
177        ctx: &ActorContext<Self>,
178        subject_id: DigestIdentifier,
179        requester: String,
180    ) -> Result<(), ActorError> {
181        let Some(entry) = self.subjects.get_mut(&subject_id) else {
182            return Ok(());
183        };
184
185        entry.requesters.remove(&requester);
186
187        if !entry.requesters.is_empty() {
188            return Ok(());
189        }
190
191        let tracker = ctx.get_child::<Tracker>(&subject_id.to_string()).await?;
192        tracker.ask_stop().await?;
193        self.subjects.remove(&subject_id);
194
195        Ok(())
196    }
197
198    async fn delete_tracker(
199        &mut self,
200        ctx: &mut ActorContext<Self>,
201        subject_id: DigestIdentifier,
202    ) -> Result<(), ActorError> {
203        let mut cleanup_errors = Vec::new();
204
205        let tracker = match ctx
206            .create_child(
207                &subject_id.to_string(),
208                Tracker::initial(InitParamsTracker {
209                    data: None,
210                    hash: self.hash,
211                    is_service: self.is_service,
212                    only_clear_events: self.only_clear_events,
213                    public_key: self.our_key.clone(),
214                }),
215            )
216            .await
217        {
218            Ok(actor) => Some(actor),
219            Err(ActorError::Exists { .. }) => {
220                match ctx.get_child::<Tracker>(&subject_id.to_string()).await {
221                    Ok(actor) => Some(actor),
222                    Err(error) => {
223                        cleanup_errors.push(format!("tracker lookup: {error}"));
224                        None
225                    }
226                }
227            }
228            Err(error) => {
229                cleanup_errors.push(format!("tracker: {error}"));
230                None
231            }
232        };
233
234        if let Some(tracker) = tracker {
235            match tracker.ask(TrackerMessage::PurgeStorage).await {
236                Ok(TrackerResponse::Ok) => {}
237                Ok(other) => cleanup_errors
238                    .push(format!("tracker: unexpected response {other:?}")),
239                Err(error) => cleanup_errors.push(format!("tracker: {error}")),
240            }
241
242            if let Err(error) = tracker.ask_stop().await {
243                cleanup_errors.push(format!("tracker stop: {error}"));
244            }
245        }
246
247        self.subjects.remove(&subject_id);
248
249        let governance_id = match ctx.get_parent::<Node>().await {
250            Ok(node) => match node
251                .ask(NodeMessage::GetSubjectData(subject_id.clone()))
252                .await
253            {
254                Ok(NodeResponse::SubjectData(Some(SubjectData::Tracker {
255                    governance_id,
256                    ..
257                }))) => Some(governance_id),
258                Ok(NodeResponse::SubjectData(Some(
259                    SubjectData::Governance { .. },
260                ))) => {
261                    cleanup_errors.push(format!(
262                        "subject '{}' is governance, not tracker",
263                        subject_id
264                    ));
265                    None
266                }
267                Ok(NodeResponse::SubjectData(None)) => {
268                    cleanup_errors.push(format!(
269                        "subject '{}' not found in node",
270                        subject_id
271                    ));
272                    None
273                }
274                Ok(other) => {
275                    cleanup_errors
276                        .push(format!("node: unexpected response {other:?}"));
277                    None
278                }
279                Err(error) => {
280                    cleanup_errors.push(format!("node: {error}"));
281                    None
282                }
283            },
284            Err(error) => {
285                cleanup_errors.push(format!("node parent: {error}"));
286                None
287            }
288        };
289
290        if let Some(governance_id) = governance_id {
291            match ctx
292                .get_child::<Governance>(&governance_id.to_string())
293                .await
294            {
295                Ok(governance) => {
296                    match governance
297                        .ask(GovernanceMessage::DeleteTrackerReferences {
298                            subject_id: subject_id.clone(),
299                        })
300                        .await
301                    {
302                        Ok(GovernanceResponse::Ok) => {}
303                        Ok(other) => cleanup_errors.push(format!(
304                            "governance: unexpected response {other:?}"
305                        )),
306                        Err(error) => {
307                            cleanup_errors.push(format!("governance: {error}"))
308                        }
309                    }
310                }
311                Err(error) => {
312                    cleanup_errors.push(format!("governance lookup: {error}"));
313                }
314            }
315        }
316
317        if cleanup_errors.is_empty() {
318            Ok(())
319        } else {
320            Err(ActorError::Functional {
321                description: cleanup_errors.join("; "),
322            })
323        }
324    }
325
326    async fn delete_governance(
327        &mut self,
328        ctx: &mut ActorContext<Self>,
329        subject_id: DigestIdentifier,
330    ) -> Result<(), ActorError> {
331        let mut cleanup_errors = Vec::new();
332
333        let governance = match ctx
334            .create_child(
335                &subject_id.to_string(),
336                Governance::initial((
337                    None,
338                    self.our_key.clone(),
339                    self.hash,
340                    self.is_service,
341                )),
342            )
343            .await
344        {
345            Ok(actor) => Some(actor),
346            Err(ActorError::Exists { .. }) => {
347                match ctx.get_child::<Governance>(&subject_id.to_string()).await
348                {
349                    Ok(actor) => Some(actor),
350                    Err(error) => {
351                        cleanup_errors
352                            .push(format!("governance lookup: {error}"));
353                        None
354                    }
355                }
356            }
357            Err(error) => {
358                cleanup_errors.push(format!("governance: {error}"));
359                None
360            }
361        };
362
363        if let Some(governance) = governance {
364            match governance
365                .ask(GovernanceMessage::DeleteGovernanceStorage)
366                .await
367            {
368                Ok(GovernanceResponse::Ok) => {}
369                Ok(other) => cleanup_errors
370                    .push(format!("governance: unexpected response {other:?}")),
371                Err(error) => {
372                    cleanup_errors.push(format!("governance: {error}"))
373                }
374            }
375
376            if let Err(error) = governance.ask_stop().await {
377                cleanup_errors.push(format!("governance stop: {error}"));
378            }
379        }
380
381        self.subjects.remove(&subject_id);
382
383        if cleanup_errors.is_empty() {
384            Ok(())
385        } else {
386            Err(ActorError::Functional {
387                description: cleanup_errors.join("; "),
388            })
389        }
390    }
391
392    async fn load_tracker(
393        &self,
394        ctx: &mut ActorContext<Self>,
395        subject_id: &DigestIdentifier,
396    ) -> Result<(), ActorError> {
397        let tracker_actor: ActorRef<Tracker> = ctx
398            .create_child(
399                &subject_id.to_string(),
400                Tracker::initial(InitParamsTracker {
401                    data: None,
402                    hash: self.hash,
403                    is_service: self.is_service,
404                    only_clear_events: self.only_clear_events,
405                    public_key: self.our_key.clone(),
406                }),
407            )
408            .await?;
409
410        self.run_tracker_sink(ctx, tracker_actor).await
411    }
412
413    async fn create_tracker(
414        &self,
415        ctx: &mut ActorContext<Self>,
416        subject_id: &DigestIdentifier,
417        metadata: crate::subject::Metadata,
418        ledger: Ledger,
419    ) -> Result<(), ActorError> {
420        let tracker_actor: ActorRef<Tracker> = ctx
421            .create_child(
422                &subject_id.to_string(),
423                Tracker::initial(InitParamsTracker {
424                    data: Some(TrackerInit::from(&metadata)),
425                    hash: self.hash,
426                    is_service: self.is_service,
427                    only_clear_events: self.only_clear_events,
428                    public_key: self.our_key.clone(),
429                }),
430            )
431            .await?;
432
433        self.run_tracker_sink(ctx, tracker_actor.clone()).await?;
434
435        if let Err(error) = tracker_actor
436            .ask(TrackerMessage::UpdateLedger {
437                events: vec![ledger],
438            })
439            .await
440        {
441            tracker_actor.tell_stop().await;
442            return Err(error);
443        }
444
445        self.register_subject_in_node(
446            ctx,
447            metadata.owner.clone(),
448            metadata.subject_id.clone(),
449            SubjectData::Tracker {
450                governance_id: metadata.governance_id.clone(),
451                schema_id: metadata.schema_id.clone(),
452                namespace: metadata.namespace.to_string(),
453                active: true,
454            },
455        )
456        .await?;
457
458        Ok(())
459    }
460
461    async fn create_governance(
462        &self,
463        ctx: &mut ActorContext<Self>,
464        subject_id: &DigestIdentifier,
465        metadata: crate::subject::Metadata,
466        ledger: Ledger,
467    ) -> Result<(), ActorError> {
468        let governance_data = serde_json::from_value::<GovernanceData>(
469            metadata.properties.0.clone(),
470        )
471        .map_err(|e| ActorError::Functional {
472            description: format!(
473                "Governance properties must be GovernanceData: {e}"
474            ),
475        })?;
476
477        if ctx
478            .get_child::<Governance>(&subject_id.to_string())
479            .await
480            .is_ok()
481        {
482            return Ok(());
483        }
484
485        let governance_actor: ActorRef<Governance> = ctx
486            .create_child(
487                &subject_id.to_string(),
488                Governance::initial((
489                    Some((SubjectMetadata::new(&metadata), governance_data)),
490                    self.our_key.clone(),
491                    self.hash,
492                    self.is_service,
493                )),
494            )
495            .await?;
496
497        self.run_governance_sink(ctx, governance_actor.clone())
498            .await?;
499
500        if let Err(error) = governance_actor
501            .ask(GovernanceMessage::UpdateLedger {
502                events: vec![ledger],
503            })
504            .await
505        {
506            governance_actor.tell_stop().await;
507            return Err(error);
508        }
509
510        self.register_subject_in_node(
511            ctx,
512            metadata.owner.clone(),
513            metadata.subject_id.clone(),
514            SubjectData::Governance { active: true },
515        )
516        .await?;
517
518        Ok(())
519    }
520
521    fn metadata_from_create_ledger(
522        ledger: &Ledger,
523    ) -> Result<crate::subject::Metadata, ActorError> {
524        match &ledger.protocols {
525            Protocols::Create { validation, .. } => {
526                if let ValidationMetadata::Metadata(metadata) =
527                    &validation.validation_metadata
528                {
529                    Ok(*metadata.clone())
530                } else {
531                    Err(ActorError::Functional {
532                        description:
533                            "Create validation metadata must be Metadata"
534                                .to_owned(),
535                    })
536                }
537            }
538            _ => Err(ActorError::Functional {
539                description:
540                    "SubjectManager create flow requires a create ledger"
541                        .to_owned(),
542            }),
543        }
544    }
545
546    async fn run_tracker_sink(
547        &self,
548        ctx: &ActorContext<Self>,
549        actor: ActorRef<Tracker>,
550    ) -> Result<(), ActorError> {
551        let safe_mode = if let Some(config) = ctx
552            .system()
553            .get_helper::<crate::system::ConfigHelper>("config")
554            .await
555        {
556            config.safe_mode
557        } else {
558            return Err(ActorError::Helper {
559                name: "config".to_owned(),
560                reason: "Not found".to_owned(),
561            });
562        };
563
564        if safe_mode {
565            return Ok(());
566        }
567
568        let Some(ext_db): Option<Arc<ExternalDB>> =
569            ctx.system().get_helper("ext_db").await
570        else {
571            return Err(ActorError::Helper {
572                name: "ext_db".to_owned(),
573                reason: "Not found".to_owned(),
574            });
575        };
576
577        let sink = Sink::new(actor.subscribe(), ext_db.get_subject());
578        ctx.system().run_sink(sink).await;
579        Ok(())
580    }
581
582    async fn run_governance_sink(
583        &self,
584        ctx: &ActorContext<Self>,
585        actor: ActorRef<Governance>,
586    ) -> Result<(), ActorError> {
587        let safe_mode = if let Some(config) = ctx
588            .system()
589            .get_helper::<crate::system::ConfigHelper>("config")
590            .await
591        {
592            config.safe_mode
593        } else {
594            return Err(ActorError::Helper {
595                name: "config".to_owned(),
596                reason: "Not found".to_owned(),
597            });
598        };
599
600        if safe_mode {
601            return Ok(());
602        }
603
604        let Some(ext_db): Option<Arc<ExternalDB>> =
605            ctx.system().get_helper("ext_db").await
606        else {
607            return Err(ActorError::Helper {
608                name: "ext_db".to_owned(),
609                reason: "Not found".to_owned(),
610            });
611        };
612
613        let sink = Sink::new(actor.subscribe(), ext_db.get_subject());
614        ctx.system().run_sink(sink).await;
615        Ok(())
616    }
617
618    async fn register_subject_in_node(
619        &self,
620        ctx: &ActorContext<Self>,
621        owner: PublicKey,
622        subject_id: DigestIdentifier,
623        data: SubjectData,
624    ) -> Result<(), ActorError> {
625        let node = ctx.get_parent::<Node>().await?;
626        let response = node
627            .ask(NodeMessage::RegisterSubject {
628                owner,
629                subject_id,
630                data,
631            })
632            .await?;
633
634        match response {
635            NodeResponse::Ok => Ok(()),
636            _ => Err(ActorError::UnexpectedResponse {
637                path: ctx.path().parent(),
638                expected: "NodeResponse::Ok".to_owned(),
639            }),
640        }
641    }
642}
643
644#[async_trait]
645impl Actor for SubjectManager {
646    type Event = ();
647    type Message = SubjectManagerMessage;
648    type Response = SubjectManagerResponse;
649
650    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
651        parent_span.map_or_else(
652            || info_span!("SubjectManager"),
653            |parent_span| info_span!(parent: parent_span, "SubjectManager"),
654        )
655    }
656}
657
658impl NotPersistentActor for SubjectManager {}
659
660#[async_trait]
661impl Handler<Self> for SubjectManager {
662    async fn handle_message(
663        &mut self,
664        _sender: ActorPath,
665        msg: SubjectManagerMessage,
666        ctx: &mut ActorContext<Self>,
667    ) -> Result<SubjectManagerResponse, ActorError> {
668        match msg {
669            SubjectManagerMessage::UpGovernances { governance_ids } => {
670                debug!(
671                    governance_count = governance_ids.len(),
672                    "Governance bootstrap requested"
673                );
674                self.up_governances(ctx, governance_ids).await?;
675                Ok(SubjectManagerResponse::Up)
676            }
677            SubjectManagerMessage::Up {
678                subject_id,
679                requester,
680                create_ledger,
681            } => {
682                debug!(
683                    subject_id = %subject_id,
684                    requester = %requester,
685                    "Subject up requested"
686                );
687                self.up(ctx, subject_id, requester, create_ledger).await?;
688                Ok(SubjectManagerResponse::Up)
689            }
690            SubjectManagerMessage::Finish {
691                subject_id,
692                requester,
693            } => {
694                debug!(
695                    subject_id = %subject_id,
696                    requester = %requester,
697                    "Subject finish requested"
698                );
699                self.finish(ctx, subject_id, requester).await?;
700                Ok(SubjectManagerResponse::Finish)
701            }
702            SubjectManagerMessage::DeleteTracker { subject_id } => {
703                debug!(
704                    subject_id = %subject_id,
705                    "Tracker delete requested"
706                );
707                self.delete_tracker(ctx, subject_id).await?;
708                Ok(SubjectManagerResponse::DeleteTracker)
709            }
710            SubjectManagerMessage::DeleteGovernance { subject_id } => {
711                debug!(
712                    subject_id = %subject_id,
713                    "Governance delete requested"
714                );
715                self.delete_governance(ctx, subject_id).await?;
716                Ok(SubjectManagerResponse::DeleteGovernance)
717            }
718        }
719    }
720
721    async fn on_child_fault(
722        &mut self,
723        error: ActorError,
724        ctx: &mut ActorContext<Self>,
725    ) -> ave_actors::ChildAction {
726        error!(error = %error, "Child fault in subject manager");
727        ctx.system().crash_system();
728        ave_actors::ChildAction::Stop
729    }
730}