1use std::{
2 collections::{HashMap, HashSet},
3 sync::Arc,
4};
5
6use async_trait::async_trait;
7use ave_actors::{
8 Actor, ActorContext, ActorError, ActorPath, ActorRef, Handler, Message,
9 NotPersistentActor, PersistentActor, Response, Sink,
10};
11use ave_common::identity::{DigestIdentifier, HashAlgorithm, PublicKey};
12use serde::{Deserialize, Serialize};
13use tracing::{Span, debug, error, info_span};
14
15use crate::{
16 governance::{
17 Governance, GovernanceMessage, GovernanceResponse, data::GovernanceData,
18 },
19 helpers::db::ExternalDB,
20 model::event::{Ledger, Protocols, ValidationMetadata},
21 node::{Node, NodeMessage, NodeResponse, SubjectData},
22 subject::SubjectMetadata,
23 tracker::{
24 InitParamsTracker, Tracker, TrackerInit, TrackerMessage,
25 TrackerResponse,
26 },
27};
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub enum SubjectManagerMessage {
31 UpGovernances {
32 governance_ids: Vec<DigestIdentifier>,
33 },
34 Up {
35 subject_id: DigestIdentifier,
36 requester: String,
37 create_ledger: Option<Box<Ledger>>,
38 },
39 Finish {
40 subject_id: DigestIdentifier,
41 requester: String,
42 },
43 DeleteTracker {
44 subject_id: DigestIdentifier,
45 },
46 DeleteGovernance {
47 subject_id: DigestIdentifier,
48 },
49}
50
51impl Message for SubjectManagerMessage {}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub enum SubjectManagerResponse {
55 Up,
56 Finish,
57 DeleteTracker,
58 DeleteGovernance,
59}
60
61impl Response for SubjectManagerResponse {}
62
/// Bookkeeping kept for a subject whose tracker is currently up.
#[derive(Debug, Default, Clone)]
struct SubjectEntry {
    /// Names of the requesters that asked for the subject and have not yet
    /// sent `Finish`.
    requesters: HashSet<String>,
}
67
68pub struct SubjectManager {
69 our_key: Arc<PublicKey>,
70 hash: HashAlgorithm,
71 is_service: bool,
72 only_clear_events: bool,
73 subjects: HashMap<DigestIdentifier, SubjectEntry>,
74}
75
76impl SubjectManager {
77 pub fn new(
78 our_key: Arc<PublicKey>,
79 hash: HashAlgorithm,
80 is_service: bool,
81 only_clear_events: bool,
82 ) -> Self {
83 Self {
84 our_key,
85 hash,
86 is_service,
87 only_clear_events,
88 subjects: HashMap::new(),
89 }
90 }
91
92 async fn up_governances(
93 &self,
94 ctx: &mut ActorContext<Self>,
95 governance_ids: Vec<DigestIdentifier>,
96 ) -> Result<(), ActorError> {
97 let safe_mode = if let Some(config) = ctx
98 .system()
99 .get_helper::<crate::system::ConfigHelper>("config")
100 .await
101 {
102 config.safe_mode
103 } else {
104 return Err(ActorError::Helper {
105 name: "config".to_owned(),
106 reason: "Not found".to_owned(),
107 });
108 };
109
110 for governance_id in governance_ids {
111 let actor: ActorRef<Governance> = ctx
112 .create_child(
113 &governance_id.to_string(),
114 Governance::initial((
115 None,
116 self.our_key.clone(),
117 self.hash,
118 self.is_service,
119 )),
120 )
121 .await?;
122
123 if !safe_mode {
124 let Some(ext_db): Option<Arc<ExternalDB>> =
125 ctx.system().get_helper("ext_db").await
126 else {
127 return Err(ActorError::Helper {
128 name: "ext_db".to_owned(),
129 reason: "Not found".to_owned(),
130 });
131 };
132
133 let sink = Sink::new(actor.subscribe(), ext_db.get_subject());
134 ctx.system().run_sink(sink).await;
135 }
136 }
137
138 Ok(())
139 }
140
141 async fn up(
142 &mut self,
143 ctx: &mut ActorContext<Self>,
144 subject_id: DigestIdentifier,
145 requester: String,
146 create_ledger: Option<Box<Ledger>>,
147 ) -> Result<(), ActorError> {
148 if let Some(entry) = self.subjects.get_mut(&subject_id) {
149 entry.requesters.insert(requester);
150 return Ok(());
151 }
152
153 if let Some(ledger) = create_ledger {
154 let ledger = *ledger;
155 let metadata = Self::metadata_from_create_ledger(&ledger)?;
156
157 if metadata.schema_id.is_gov() {
158 self.create_governance(ctx, &subject_id, metadata, ledger)
159 .await?;
160 return Ok(());
161 }
162
163 self.create_tracker(ctx, &subject_id, metadata, ledger)
164 .await?;
165 } else {
166 self.load_tracker(ctx, &subject_id).await?;
167 }
168
169 let entry = self.subjects.entry(subject_id).or_default();
170 entry.requesters.insert(requester);
171
172 Ok(())
173 }
174
175 async fn finish(
176 &mut self,
177 ctx: &ActorContext<Self>,
178 subject_id: DigestIdentifier,
179 requester: String,
180 ) -> Result<(), ActorError> {
181 let Some(entry) = self.subjects.get_mut(&subject_id) else {
182 return Ok(());
183 };
184
185 entry.requesters.remove(&requester);
186
187 if !entry.requesters.is_empty() {
188 return Ok(());
189 }
190
191 let tracker = ctx.get_child::<Tracker>(&subject_id.to_string()).await?;
192 tracker.ask_stop().await?;
193 self.subjects.remove(&subject_id);
194
195 Ok(())
196 }
197
198 async fn delete_tracker(
199 &mut self,
200 ctx: &mut ActorContext<Self>,
201 subject_id: DigestIdentifier,
202 ) -> Result<(), ActorError> {
203 let mut cleanup_errors = Vec::new();
204
205 let tracker = match ctx
206 .create_child(
207 &subject_id.to_string(),
208 Tracker::initial(InitParamsTracker {
209 data: None,
210 hash: self.hash,
211 is_service: self.is_service,
212 only_clear_events: self.only_clear_events,
213 public_key: self.our_key.clone(),
214 }),
215 )
216 .await
217 {
218 Ok(actor) => Some(actor),
219 Err(ActorError::Exists { .. }) => {
220 match ctx.get_child::<Tracker>(&subject_id.to_string()).await {
221 Ok(actor) => Some(actor),
222 Err(error) => {
223 cleanup_errors.push(format!("tracker lookup: {error}"));
224 None
225 }
226 }
227 }
228 Err(error) => {
229 cleanup_errors.push(format!("tracker: {error}"));
230 None
231 }
232 };
233
234 if let Some(tracker) = tracker {
235 match tracker.ask(TrackerMessage::PurgeStorage).await {
236 Ok(TrackerResponse::Ok) => {}
237 Ok(other) => cleanup_errors
238 .push(format!("tracker: unexpected response {other:?}")),
239 Err(error) => cleanup_errors.push(format!("tracker: {error}")),
240 }
241
242 if let Err(error) = tracker.ask_stop().await {
243 cleanup_errors.push(format!("tracker stop: {error}"));
244 }
245 }
246
247 self.subjects.remove(&subject_id);
248
249 let governance_id = match ctx.get_parent::<Node>().await {
250 Ok(node) => match node
251 .ask(NodeMessage::GetSubjectData(subject_id.clone()))
252 .await
253 {
254 Ok(NodeResponse::SubjectData(Some(SubjectData::Tracker {
255 governance_id,
256 ..
257 }))) => Some(governance_id),
258 Ok(NodeResponse::SubjectData(Some(
259 SubjectData::Governance { .. },
260 ))) => {
261 cleanup_errors.push(format!(
262 "subject '{}' is governance, not tracker",
263 subject_id
264 ));
265 None
266 }
267 Ok(NodeResponse::SubjectData(None)) => {
268 cleanup_errors.push(format!(
269 "subject '{}' not found in node",
270 subject_id
271 ));
272 None
273 }
274 Ok(other) => {
275 cleanup_errors
276 .push(format!("node: unexpected response {other:?}"));
277 None
278 }
279 Err(error) => {
280 cleanup_errors.push(format!("node: {error}"));
281 None
282 }
283 },
284 Err(error) => {
285 cleanup_errors.push(format!("node parent: {error}"));
286 None
287 }
288 };
289
290 if let Some(governance_id) = governance_id {
291 match ctx
292 .get_child::<Governance>(&governance_id.to_string())
293 .await
294 {
295 Ok(governance) => {
296 match governance
297 .ask(GovernanceMessage::DeleteTrackerReferences {
298 subject_id: subject_id.clone(),
299 })
300 .await
301 {
302 Ok(GovernanceResponse::Ok) => {}
303 Ok(other) => cleanup_errors.push(format!(
304 "governance: unexpected response {other:?}"
305 )),
306 Err(error) => {
307 cleanup_errors.push(format!("governance: {error}"))
308 }
309 }
310 }
311 Err(error) => {
312 cleanup_errors.push(format!("governance lookup: {error}"));
313 }
314 }
315 }
316
317 if cleanup_errors.is_empty() {
318 Ok(())
319 } else {
320 Err(ActorError::Functional {
321 description: cleanup_errors.join("; "),
322 })
323 }
324 }
325
326 async fn delete_governance(
327 &mut self,
328 ctx: &mut ActorContext<Self>,
329 subject_id: DigestIdentifier,
330 ) -> Result<(), ActorError> {
331 let mut cleanup_errors = Vec::new();
332
333 let governance = match ctx
334 .create_child(
335 &subject_id.to_string(),
336 Governance::initial((
337 None,
338 self.our_key.clone(),
339 self.hash,
340 self.is_service,
341 )),
342 )
343 .await
344 {
345 Ok(actor) => Some(actor),
346 Err(ActorError::Exists { .. }) => {
347 match ctx.get_child::<Governance>(&subject_id.to_string()).await
348 {
349 Ok(actor) => Some(actor),
350 Err(error) => {
351 cleanup_errors
352 .push(format!("governance lookup: {error}"));
353 None
354 }
355 }
356 }
357 Err(error) => {
358 cleanup_errors.push(format!("governance: {error}"));
359 None
360 }
361 };
362
363 if let Some(governance) = governance {
364 match governance
365 .ask(GovernanceMessage::DeleteGovernanceStorage)
366 .await
367 {
368 Ok(GovernanceResponse::Ok) => {}
369 Ok(other) => cleanup_errors
370 .push(format!("governance: unexpected response {other:?}")),
371 Err(error) => {
372 cleanup_errors.push(format!("governance: {error}"))
373 }
374 }
375
376 if let Err(error) = governance.ask_stop().await {
377 cleanup_errors.push(format!("governance stop: {error}"));
378 }
379 }
380
381 self.subjects.remove(&subject_id);
382
383 if cleanup_errors.is_empty() {
384 Ok(())
385 } else {
386 Err(ActorError::Functional {
387 description: cleanup_errors.join("; "),
388 })
389 }
390 }
391
392 async fn load_tracker(
393 &self,
394 ctx: &mut ActorContext<Self>,
395 subject_id: &DigestIdentifier,
396 ) -> Result<(), ActorError> {
397 let tracker_actor: ActorRef<Tracker> = ctx
398 .create_child(
399 &subject_id.to_string(),
400 Tracker::initial(InitParamsTracker {
401 data: None,
402 hash: self.hash,
403 is_service: self.is_service,
404 only_clear_events: self.only_clear_events,
405 public_key: self.our_key.clone(),
406 }),
407 )
408 .await?;
409
410 self.run_tracker_sink(ctx, tracker_actor).await
411 }
412
413 async fn create_tracker(
414 &self,
415 ctx: &mut ActorContext<Self>,
416 subject_id: &DigestIdentifier,
417 metadata: crate::subject::Metadata,
418 ledger: Ledger,
419 ) -> Result<(), ActorError> {
420 let tracker_actor: ActorRef<Tracker> = ctx
421 .create_child(
422 &subject_id.to_string(),
423 Tracker::initial(InitParamsTracker {
424 data: Some(TrackerInit::from(&metadata)),
425 hash: self.hash,
426 is_service: self.is_service,
427 only_clear_events: self.only_clear_events,
428 public_key: self.our_key.clone(),
429 }),
430 )
431 .await?;
432
433 self.run_tracker_sink(ctx, tracker_actor.clone()).await?;
434
435 if let Err(error) = tracker_actor
436 .ask(TrackerMessage::UpdateLedger {
437 events: vec![ledger],
438 })
439 .await
440 {
441 tracker_actor.tell_stop().await;
442 return Err(error);
443 }
444
445 self.register_subject_in_node(
446 ctx,
447 metadata.owner.clone(),
448 metadata.subject_id.clone(),
449 SubjectData::Tracker {
450 governance_id: metadata.governance_id.clone(),
451 schema_id: metadata.schema_id.clone(),
452 namespace: metadata.namespace.to_string(),
453 active: true,
454 },
455 )
456 .await?;
457
458 Ok(())
459 }
460
461 async fn create_governance(
462 &self,
463 ctx: &mut ActorContext<Self>,
464 subject_id: &DigestIdentifier,
465 metadata: crate::subject::Metadata,
466 ledger: Ledger,
467 ) -> Result<(), ActorError> {
468 let governance_data = serde_json::from_value::<GovernanceData>(
469 metadata.properties.0.clone(),
470 )
471 .map_err(|e| ActorError::Functional {
472 description: format!(
473 "Governance properties must be GovernanceData: {e}"
474 ),
475 })?;
476
477 if ctx
478 .get_child::<Governance>(&subject_id.to_string())
479 .await
480 .is_ok()
481 {
482 return Ok(());
483 }
484
485 let governance_actor: ActorRef<Governance> = ctx
486 .create_child(
487 &subject_id.to_string(),
488 Governance::initial((
489 Some((SubjectMetadata::new(&metadata), governance_data)),
490 self.our_key.clone(),
491 self.hash,
492 self.is_service,
493 )),
494 )
495 .await?;
496
497 self.run_governance_sink(ctx, governance_actor.clone())
498 .await?;
499
500 if let Err(error) = governance_actor
501 .ask(GovernanceMessage::UpdateLedger {
502 events: vec![ledger],
503 })
504 .await
505 {
506 governance_actor.tell_stop().await;
507 return Err(error);
508 }
509
510 self.register_subject_in_node(
511 ctx,
512 metadata.owner.clone(),
513 metadata.subject_id.clone(),
514 SubjectData::Governance { active: true },
515 )
516 .await?;
517
518 Ok(())
519 }
520
521 fn metadata_from_create_ledger(
522 ledger: &Ledger,
523 ) -> Result<crate::subject::Metadata, ActorError> {
524 match &ledger.protocols {
525 Protocols::Create { validation, .. } => {
526 if let ValidationMetadata::Metadata(metadata) =
527 &validation.validation_metadata
528 {
529 Ok(*metadata.clone())
530 } else {
531 Err(ActorError::Functional {
532 description:
533 "Create validation metadata must be Metadata"
534 .to_owned(),
535 })
536 }
537 }
538 _ => Err(ActorError::Functional {
539 description:
540 "SubjectManager create flow requires a create ledger"
541 .to_owned(),
542 }),
543 }
544 }
545
546 async fn run_tracker_sink(
547 &self,
548 ctx: &ActorContext<Self>,
549 actor: ActorRef<Tracker>,
550 ) -> Result<(), ActorError> {
551 let safe_mode = if let Some(config) = ctx
552 .system()
553 .get_helper::<crate::system::ConfigHelper>("config")
554 .await
555 {
556 config.safe_mode
557 } else {
558 return Err(ActorError::Helper {
559 name: "config".to_owned(),
560 reason: "Not found".to_owned(),
561 });
562 };
563
564 if safe_mode {
565 return Ok(());
566 }
567
568 let Some(ext_db): Option<Arc<ExternalDB>> =
569 ctx.system().get_helper("ext_db").await
570 else {
571 return Err(ActorError::Helper {
572 name: "ext_db".to_owned(),
573 reason: "Not found".to_owned(),
574 });
575 };
576
577 let sink = Sink::new(actor.subscribe(), ext_db.get_subject());
578 ctx.system().run_sink(sink).await;
579 Ok(())
580 }
581
582 async fn run_governance_sink(
583 &self,
584 ctx: &ActorContext<Self>,
585 actor: ActorRef<Governance>,
586 ) -> Result<(), ActorError> {
587 let safe_mode = if let Some(config) = ctx
588 .system()
589 .get_helper::<crate::system::ConfigHelper>("config")
590 .await
591 {
592 config.safe_mode
593 } else {
594 return Err(ActorError::Helper {
595 name: "config".to_owned(),
596 reason: "Not found".to_owned(),
597 });
598 };
599
600 if safe_mode {
601 return Ok(());
602 }
603
604 let Some(ext_db): Option<Arc<ExternalDB>> =
605 ctx.system().get_helper("ext_db").await
606 else {
607 return Err(ActorError::Helper {
608 name: "ext_db".to_owned(),
609 reason: "Not found".to_owned(),
610 });
611 };
612
613 let sink = Sink::new(actor.subscribe(), ext_db.get_subject());
614 ctx.system().run_sink(sink).await;
615 Ok(())
616 }
617
618 async fn register_subject_in_node(
619 &self,
620 ctx: &ActorContext<Self>,
621 owner: PublicKey,
622 subject_id: DigestIdentifier,
623 data: SubjectData,
624 ) -> Result<(), ActorError> {
625 let node = ctx.get_parent::<Node>().await?;
626 let response = node
627 .ask(NodeMessage::RegisterSubject {
628 owner,
629 subject_id,
630 data,
631 })
632 .await?;
633
634 match response {
635 NodeResponse::Ok => Ok(()),
636 _ => Err(ActorError::UnexpectedResponse {
637 path: ctx.path().parent(),
638 expected: "NodeResponse::Ok".to_owned(),
639 }),
640 }
641 }
642}
643
644#[async_trait]
645impl Actor for SubjectManager {
646 type Event = ();
647 type Message = SubjectManagerMessage;
648 type Response = SubjectManagerResponse;
649
650 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
651 parent_span.map_or_else(
652 || info_span!("SubjectManager"),
653 |parent_span| info_span!(parent: parent_span, "SubjectManager"),
654 )
655 }
656}
657
658impl NotPersistentActor for SubjectManager {}
659
660#[async_trait]
661impl Handler<Self> for SubjectManager {
662 async fn handle_message(
663 &mut self,
664 _sender: ActorPath,
665 msg: SubjectManagerMessage,
666 ctx: &mut ActorContext<Self>,
667 ) -> Result<SubjectManagerResponse, ActorError> {
668 match msg {
669 SubjectManagerMessage::UpGovernances { governance_ids } => {
670 debug!(
671 governance_count = governance_ids.len(),
672 "Governance bootstrap requested"
673 );
674 self.up_governances(ctx, governance_ids).await?;
675 Ok(SubjectManagerResponse::Up)
676 }
677 SubjectManagerMessage::Up {
678 subject_id,
679 requester,
680 create_ledger,
681 } => {
682 debug!(
683 subject_id = %subject_id,
684 requester = %requester,
685 "Subject up requested"
686 );
687 self.up(ctx, subject_id, requester, create_ledger).await?;
688 Ok(SubjectManagerResponse::Up)
689 }
690 SubjectManagerMessage::Finish {
691 subject_id,
692 requester,
693 } => {
694 debug!(
695 subject_id = %subject_id,
696 requester = %requester,
697 "Subject finish requested"
698 );
699 self.finish(ctx, subject_id, requester).await?;
700 Ok(SubjectManagerResponse::Finish)
701 }
702 SubjectManagerMessage::DeleteTracker { subject_id } => {
703 debug!(
704 subject_id = %subject_id,
705 "Tracker delete requested"
706 );
707 self.delete_tracker(ctx, subject_id).await?;
708 Ok(SubjectManagerResponse::DeleteTracker)
709 }
710 SubjectManagerMessage::DeleteGovernance { subject_id } => {
711 debug!(
712 subject_id = %subject_id,
713 "Governance delete requested"
714 );
715 self.delete_governance(ctx, subject_id).await?;
716 Ok(SubjectManagerResponse::DeleteGovernance)
717 }
718 }
719 }
720
721 async fn on_child_fault(
722 &mut self,
723 error: ActorError,
724 ctx: &mut ActorContext<Self>,
725 ) -> ave_actors::ChildAction {
726 error!(error = %error, "Child fault in subject manager");
727 ctx.system().crash_system();
728 ave_actors::ChildAction::Stop
729 }
730}