Skip to main content

ave_core/node/
subject_manager.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::Arc,
4};
5
6use async_trait::async_trait;
7use ave_actors::{
8    Actor, ActorContext, ActorError, ActorPath, ActorRef, Handler, Message,
9    NotPersistentActor, PersistentActor, Response, Sink,
10};
11use ave_common::identity::{DigestIdentifier, HashAlgorithm, PublicKey};
12use serde::{Deserialize, Serialize};
13use tracing::{Span, debug, error, info_span};
14
15use crate::{
16    governance::{
17        Governance, GovernanceMessage, GovernanceResponse, data::GovernanceData,
18    },
19    helpers::db::ExternalDB,
20    model::event::{Protocols, ValidationMetadata},
21    node::{Node, NodeMessage, NodeResponse, SubjectData},
22    subject::{SignedLedger, SubjectMetadata},
23    tracker::{
24        InitParamsTracker, Tracker, TrackerInit, TrackerMessage,
25        TrackerResponse,
26    },
27};
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub enum SubjectManagerMessage {
31    UpGovernances {
32        governance_ids: Vec<DigestIdentifier>,
33    },
34    Up {
35        subject_id: DigestIdentifier,
36        requester: String,
37        create_ledger: Option<Box<SignedLedger>>,
38    },
39    Finish {
40        subject_id: DigestIdentifier,
41        requester: String,
42    },
43    DeleteTracker {
44        subject_id: DigestIdentifier,
45    },
46    DeleteGovernance {
47        subject_id: DigestIdentifier,
48    },
49}
50
51impl Message for SubjectManagerMessage {}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub enum SubjectManagerResponse {
55    Up,
56    Finish,
57    DeleteTracker,
58    DeleteGovernance,
59}
60
61impl Response for SubjectManagerResponse {}
62
63#[derive(Debug, Default, Clone)]
64struct SubjectEntry {
65    requesters: HashSet<String>,
66}
67
68pub struct SubjectManager {
69    our_key: Arc<PublicKey>,
70    hash: HashAlgorithm,
71    is_service: bool,
72    subjects: HashMap<DigestIdentifier, SubjectEntry>,
73}
74
75impl SubjectManager {
76    pub fn new(
77        our_key: Arc<PublicKey>,
78        hash: HashAlgorithm,
79        is_service: bool,
80    ) -> Self {
81        Self {
82            our_key,
83            hash,
84            is_service,
85            subjects: HashMap::new(),
86        }
87    }
88
89    async fn up_governances(
90        &self,
91        ctx: &mut ActorContext<Self>,
92        governance_ids: Vec<DigestIdentifier>,
93    ) -> Result<(), ActorError> {
94        let safe_mode = if let Some(config) = ctx
95            .system()
96            .get_helper::<crate::system::ConfigHelper>("config")
97            .await
98        {
99            config.safe_mode
100        } else {
101            return Err(ActorError::Helper {
102                name: "config".to_owned(),
103                reason: "Not found".to_owned(),
104            });
105        };
106
107        for governance_id in governance_ids {
108            let actor: ActorRef<Governance> = ctx
109                .create_child(
110                    &governance_id.to_string(),
111                    Governance::initial((
112                        None,
113                        self.our_key.clone(),
114                        self.hash,
115                        self.is_service,
116                    )),
117                )
118                .await?;
119
120            if !safe_mode {
121                let Some(ext_db): Option<Arc<ExternalDB>> =
122                    ctx.system().get_helper("ext_db").await
123                else {
124                    return Err(ActorError::Helper {
125                        name: "ext_db".to_owned(),
126                        reason: "Not found".to_owned(),
127                    });
128                };
129
130                let sink = Sink::new(actor.subscribe(), ext_db.get_subject());
131                ctx.system().run_sink(sink).await;
132            }
133        }
134
135        Ok(())
136    }
137
138    async fn up(
139        &mut self,
140        ctx: &mut ActorContext<Self>,
141        subject_id: DigestIdentifier,
142        requester: String,
143        create_ledger: Option<Box<SignedLedger>>,
144    ) -> Result<(), ActorError> {
145        if let Some(entry) = self.subjects.get_mut(&subject_id) {
146            entry.requesters.insert(requester);
147            return Ok(());
148        }
149
150        if let Some(ledger) = create_ledger {
151            let ledger = *ledger;
152            let metadata = Self::metadata_from_create_ledger(&ledger)?;
153
154            if metadata.schema_id.is_gov() {
155                self.create_governance(ctx, &subject_id, metadata, ledger)
156                    .await?;
157                return Ok(());
158            }
159
160            self.create_tracker(ctx, &subject_id, metadata, ledger)
161                .await?;
162        } else {
163            self.load_tracker(ctx, &subject_id).await?;
164        }
165
166        let entry = self.subjects.entry(subject_id).or_default();
167        entry.requesters.insert(requester);
168
169        Ok(())
170    }
171
172    async fn finish(
173        &mut self,
174        ctx: &ActorContext<Self>,
175        subject_id: DigestIdentifier,
176        requester: String,
177    ) -> Result<(), ActorError> {
178        let Some(entry) = self.subjects.get_mut(&subject_id) else {
179            return Ok(());
180        };
181
182        entry.requesters.remove(&requester);
183
184        if !entry.requesters.is_empty() {
185            return Ok(());
186        }
187
188        let tracker = ctx.get_child::<Tracker>(&subject_id.to_string()).await?;
189        tracker.ask_stop().await?;
190        self.subjects.remove(&subject_id);
191
192        Ok(())
193    }
194
195    async fn delete_tracker(
196        &mut self,
197        ctx: &mut ActorContext<Self>,
198        subject_id: DigestIdentifier,
199    ) -> Result<(), ActorError> {
200        let mut cleanup_errors = Vec::new();
201
202        let tracker = match ctx
203            .create_child(
204                &subject_id.to_string(),
205                Tracker::initial(InitParamsTracker {
206                    data: None,
207                    hash: self.hash,
208                    is_service: self.is_service,
209                    public_key: self.our_key.clone(),
210                }),
211            )
212            .await
213        {
214            Ok(actor) => Some(actor),
215            Err(ActorError::Exists { .. }) => {
216                match ctx.get_child::<Tracker>(&subject_id.to_string()).await {
217                    Ok(actor) => Some(actor),
218                    Err(error) => {
219                        cleanup_errors.push(format!("tracker lookup: {error}"));
220                        None
221                    }
222                }
223            }
224            Err(error) => {
225                cleanup_errors.push(format!("tracker: {error}"));
226                None
227            }
228        };
229
230        if let Some(tracker) = tracker {
231            match tracker.ask(TrackerMessage::PurgeStorage).await {
232                Ok(TrackerResponse::Ok) => {}
233                Ok(other) => cleanup_errors
234                    .push(format!("tracker: unexpected response {other:?}")),
235                Err(error) => cleanup_errors.push(format!("tracker: {error}")),
236            }
237
238            if let Err(error) = tracker.ask_stop().await {
239                cleanup_errors.push(format!("tracker stop: {error}"));
240            }
241        }
242
243        self.subjects.remove(&subject_id);
244
245        let governance_id = match ctx.get_parent::<Node>().await {
246            Ok(node) => match node
247                .ask(NodeMessage::GetSubjectData(subject_id.clone()))
248                .await
249            {
250                Ok(NodeResponse::SubjectData(Some(SubjectData::Tracker {
251                    governance_id,
252                    ..
253                }))) => Some(governance_id),
254                Ok(NodeResponse::SubjectData(Some(
255                    SubjectData::Governance { .. },
256                ))) => {
257                    cleanup_errors.push(format!(
258                        "subject '{}' is governance, not tracker",
259                        subject_id
260                    ));
261                    None
262                }
263                Ok(NodeResponse::SubjectData(None)) => {
264                    cleanup_errors.push(format!(
265                        "subject '{}' not found in node",
266                        subject_id
267                    ));
268                    None
269                }
270                Ok(other) => {
271                    cleanup_errors
272                        .push(format!("node: unexpected response {other:?}"));
273                    None
274                }
275                Err(error) => {
276                    cleanup_errors.push(format!("node: {error}"));
277                    None
278                }
279            },
280            Err(error) => {
281                cleanup_errors.push(format!("node parent: {error}"));
282                None
283            }
284        };
285
286        if let Some(governance_id) = governance_id {
287            match ctx
288                .get_child::<Governance>(&governance_id.to_string())
289                .await
290            {
291                Ok(governance) => {
292                    match governance
293                        .ask(GovernanceMessage::DeleteTrackerReferences {
294                            subject_id: subject_id.clone(),
295                        })
296                        .await
297                    {
298                        Ok(GovernanceResponse::Ok) => {}
299                        Ok(other) => cleanup_errors.push(format!(
300                            "governance: unexpected response {other:?}"
301                        )),
302                        Err(error) => {
303                            cleanup_errors.push(format!("governance: {error}"))
304                        }
305                    }
306                }
307                Err(error) => {
308                    cleanup_errors.push(format!("governance lookup: {error}"));
309                }
310            }
311        }
312
313        if cleanup_errors.is_empty() {
314            Ok(())
315        } else {
316            Err(ActorError::Functional {
317                description: cleanup_errors.join("; "),
318            })
319        }
320    }
321
322    async fn delete_governance(
323        &mut self,
324        ctx: &mut ActorContext<Self>,
325        subject_id: DigestIdentifier,
326    ) -> Result<(), ActorError> {
327        let mut cleanup_errors = Vec::new();
328
329        let governance = match ctx
330            .create_child(
331                &subject_id.to_string(),
332                Governance::initial((
333                    None,
334                    self.our_key.clone(),
335                    self.hash,
336                    self.is_service,
337                )),
338            )
339            .await
340        {
341            Ok(actor) => Some(actor),
342            Err(ActorError::Exists { .. }) => {
343                match ctx.get_child::<Governance>(&subject_id.to_string()).await
344                {
345                    Ok(actor) => Some(actor),
346                    Err(error) => {
347                        cleanup_errors
348                            .push(format!("governance lookup: {error}"));
349                        None
350                    }
351                }
352            }
353            Err(error) => {
354                cleanup_errors.push(format!("governance: {error}"));
355                None
356            }
357        };
358
359        if let Some(governance) = governance {
360            match governance
361                .ask(GovernanceMessage::DeleteGovernanceStorage)
362                .await
363            {
364                Ok(GovernanceResponse::Ok) => {}
365                Ok(other) => cleanup_errors
366                    .push(format!("governance: unexpected response {other:?}")),
367                Err(error) => {
368                    cleanup_errors.push(format!("governance: {error}"))
369                }
370            }
371
372            if let Err(error) = governance.ask_stop().await {
373                cleanup_errors.push(format!("governance stop: {error}"));
374            }
375        }
376
377        self.subjects.remove(&subject_id);
378
379        if cleanup_errors.is_empty() {
380            Ok(())
381        } else {
382            Err(ActorError::Functional {
383                description: cleanup_errors.join("; "),
384            })
385        }
386    }
387
388    async fn load_tracker(
389        &self,
390        ctx: &mut ActorContext<Self>,
391        subject_id: &DigestIdentifier,
392    ) -> Result<(), ActorError> {
393        let tracker_actor: ActorRef<Tracker> = ctx
394            .create_child(
395                &subject_id.to_string(),
396                Tracker::initial(InitParamsTracker {
397                    data: None,
398                    hash: self.hash,
399                    is_service: self.is_service,
400                    public_key: self.our_key.clone(),
401                }),
402            )
403            .await?;
404
405        self.run_tracker_sink(ctx, tracker_actor).await
406    }
407
408    async fn create_tracker(
409        &self,
410        ctx: &mut ActorContext<Self>,
411        subject_id: &DigestIdentifier,
412        metadata: crate::subject::Metadata,
413        ledger: SignedLedger,
414    ) -> Result<(), ActorError> {
415        let tracker_actor: ActorRef<Tracker> = ctx
416            .create_child(
417                &subject_id.to_string(),
418                Tracker::initial(InitParamsTracker {
419                    data: Some(TrackerInit::from(&metadata)),
420                    hash: self.hash,
421                    is_service: self.is_service,
422                    public_key: self.our_key.clone(),
423                }),
424            )
425            .await?;
426
427        self.run_tracker_sink(ctx, tracker_actor.clone()).await?;
428
429        if let Err(error) = tracker_actor
430            .ask(TrackerMessage::UpdateLedger {
431                events: vec![ledger],
432            })
433            .await
434        {
435            tracker_actor.tell_stop().await;
436            return Err(error);
437        }
438
439        self.register_subject_in_node(
440            ctx,
441            metadata.owner.clone(),
442            metadata.subject_id.clone(),
443            SubjectData::Tracker {
444                governance_id: metadata.governance_id.clone(),
445                schema_id: metadata.schema_id.clone(),
446                namespace: metadata.namespace.to_string(),
447                active: true,
448            },
449        )
450        .await?;
451
452        Ok(())
453    }
454
455    async fn create_governance(
456        &self,
457        ctx: &mut ActorContext<Self>,
458        subject_id: &DigestIdentifier,
459        metadata: crate::subject::Metadata,
460        ledger: SignedLedger,
461    ) -> Result<(), ActorError> {
462        let governance_data = serde_json::from_value::<GovernanceData>(
463            metadata.properties.0.clone(),
464        )
465        .map_err(|e| ActorError::Functional {
466            description: format!(
467                "Governance properties must be GovernanceData: {e}"
468            ),
469        })?;
470
471        if ctx
472            .get_child::<Governance>(&subject_id.to_string())
473            .await
474            .is_ok()
475        {
476            return Ok(());
477        }
478
479        let governance_actor: ActorRef<Governance> = ctx
480            .create_child(
481                &subject_id.to_string(),
482                Governance::initial((
483                    Some((SubjectMetadata::new(&metadata), governance_data)),
484                    self.our_key.clone(),
485                    self.hash,
486                    self.is_service,
487                )),
488            )
489            .await?;
490
491        self.run_governance_sink(ctx, governance_actor.clone())
492            .await?;
493
494        if let Err(error) = governance_actor
495            .ask(GovernanceMessage::UpdateLedger {
496                events: vec![ledger],
497            })
498            .await
499        {
500            governance_actor.tell_stop().await;
501            return Err(error);
502        }
503
504        self.register_subject_in_node(
505            ctx,
506            metadata.owner.clone(),
507            metadata.subject_id.clone(),
508            SubjectData::Governance { active: true },
509        )
510        .await?;
511
512        Ok(())
513    }
514
515    fn metadata_from_create_ledger(
516        ledger: &SignedLedger,
517    ) -> Result<crate::subject::Metadata, ActorError> {
518        match &ledger.content().protocols {
519            Protocols::Create { validation } => {
520                if let ValidationMetadata::Metadata(metadata) =
521                    &validation.validation_metadata
522                {
523                    Ok(*metadata.clone())
524                } else {
525                    Err(ActorError::Functional {
526                        description:
527                            "Create validation metadata must be Metadata"
528                                .to_owned(),
529                    })
530                }
531            }
532            _ => Err(ActorError::Functional {
533                description:
534                    "SubjectManager create flow requires a create ledger"
535                        .to_owned(),
536            }),
537        }
538    }
539
540    async fn run_tracker_sink(
541        &self,
542        ctx: &ActorContext<Self>,
543        actor: ActorRef<Tracker>,
544    ) -> Result<(), ActorError> {
545        let safe_mode = if let Some(config) = ctx
546            .system()
547            .get_helper::<crate::system::ConfigHelper>("config")
548            .await
549        {
550            config.safe_mode
551        } else {
552            return Err(ActorError::Helper {
553                name: "config".to_owned(),
554                reason: "Not found".to_owned(),
555            });
556        };
557
558        if safe_mode {
559            return Ok(());
560        }
561
562        let Some(ext_db): Option<Arc<ExternalDB>> =
563            ctx.system().get_helper("ext_db").await
564        else {
565            return Err(ActorError::Helper {
566                name: "ext_db".to_owned(),
567                reason: "Not found".to_owned(),
568            });
569        };
570
571        let sink = Sink::new(actor.subscribe(), ext_db.get_subject());
572        ctx.system().run_sink(sink).await;
573        Ok(())
574    }
575
576    async fn run_governance_sink(
577        &self,
578        ctx: &ActorContext<Self>,
579        actor: ActorRef<Governance>,
580    ) -> Result<(), ActorError> {
581        let safe_mode = if let Some(config) = ctx
582            .system()
583            .get_helper::<crate::system::ConfigHelper>("config")
584            .await
585        {
586            config.safe_mode
587        } else {
588            return Err(ActorError::Helper {
589                name: "config".to_owned(),
590                reason: "Not found".to_owned(),
591            });
592        };
593
594        if safe_mode {
595            return Ok(());
596        }
597
598        let Some(ext_db): Option<Arc<ExternalDB>> =
599            ctx.system().get_helper("ext_db").await
600        else {
601            return Err(ActorError::Helper {
602                name: "ext_db".to_owned(),
603                reason: "Not found".to_owned(),
604            });
605        };
606
607        let sink = Sink::new(actor.subscribe(), ext_db.get_subject());
608        ctx.system().run_sink(sink).await;
609        Ok(())
610    }
611
612    async fn register_subject_in_node(
613        &self,
614        ctx: &ActorContext<Self>,
615        owner: PublicKey,
616        subject_id: DigestIdentifier,
617        data: SubjectData,
618    ) -> Result<(), ActorError> {
619        let node = ctx.get_parent::<Node>().await?;
620        let response = node
621            .ask(NodeMessage::RegisterSubject {
622                owner,
623                subject_id,
624                data,
625            })
626            .await?;
627
628        match response {
629            NodeResponse::Ok => Ok(()),
630            _ => Err(ActorError::UnexpectedResponse {
631                path: ctx.path().parent(),
632                expected: "NodeResponse::Ok".to_owned(),
633            }),
634        }
635    }
636}
637
638#[async_trait]
639impl Actor for SubjectManager {
640    type Event = ();
641    type Message = SubjectManagerMessage;
642    type Response = SubjectManagerResponse;
643
644    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
645        parent_span.map_or_else(
646            || info_span!("SubjectManager"),
647            |parent_span| info_span!(parent: parent_span, "SubjectManager"),
648        )
649    }
650}
651
652impl NotPersistentActor for SubjectManager {}
653
654#[async_trait]
655impl Handler<Self> for SubjectManager {
656    async fn handle_message(
657        &mut self,
658        _sender: ActorPath,
659        msg: SubjectManagerMessage,
660        ctx: &mut ActorContext<Self>,
661    ) -> Result<SubjectManagerResponse, ActorError> {
662        match msg {
663            SubjectManagerMessage::UpGovernances { governance_ids } => {
664                debug!(
665                    governance_count = governance_ids.len(),
666                    "Governance bootstrap requested"
667                );
668                self.up_governances(ctx, governance_ids).await?;
669                Ok(SubjectManagerResponse::Up)
670            }
671            SubjectManagerMessage::Up {
672                subject_id,
673                requester,
674                create_ledger,
675            } => {
676                debug!(
677                    subject_id = %subject_id,
678                    requester = %requester,
679                    "Subject up requested"
680                );
681                self.up(ctx, subject_id, requester, create_ledger).await?;
682                Ok(SubjectManagerResponse::Up)
683            }
684            SubjectManagerMessage::Finish {
685                subject_id,
686                requester,
687            } => {
688                debug!(
689                    subject_id = %subject_id,
690                    requester = %requester,
691                    "Subject finish requested"
692                );
693                self.finish(ctx, subject_id, requester).await?;
694                Ok(SubjectManagerResponse::Finish)
695            }
696            SubjectManagerMessage::DeleteTracker { subject_id } => {
697                debug!(
698                    subject_id = %subject_id,
699                    "Tracker delete requested"
700                );
701                self.delete_tracker(ctx, subject_id).await?;
702                Ok(SubjectManagerResponse::DeleteTracker)
703            }
704            SubjectManagerMessage::DeleteGovernance { subject_id } => {
705                debug!(
706                    subject_id = %subject_id,
707                    "Governance delete requested"
708                );
709                self.delete_governance(ctx, subject_id).await?;
710                Ok(SubjectManagerResponse::DeleteGovernance)
711            }
712        }
713    }
714
715    async fn on_child_fault(
716        &mut self,
717        error: ActorError,
718        ctx: &mut ActorContext<Self>,
719    ) -> ave_actors::ChildAction {
720        error!(error = %error, "Child fault in subject manager");
721        ctx.system().crash_system();
722        ave_actors::ChildAction::Stop
723    }
724}