1use std::{
2 collections::{HashMap, HashSet},
3 sync::Arc,
4};
5
6use async_trait::async_trait;
7use ave_actors::{
8 Actor, ActorContext, ActorError, ActorPath, ActorRef, Handler, Message,
9 NotPersistentActor, PersistentActor, Response, Sink,
10};
11use ave_common::identity::{DigestIdentifier, HashAlgorithm, PublicKey};
12use serde::{Deserialize, Serialize};
13use tracing::{Span, debug, error, info_span};
14
15use crate::{
16 governance::{
17 Governance, GovernanceMessage, GovernanceResponse, data::GovernanceData,
18 },
19 helpers::db::ExternalDB,
20 model::event::{Protocols, ValidationMetadata},
21 node::{Node, NodeMessage, NodeResponse, SubjectData},
22 subject::{SignedLedger, SubjectMetadata},
23 tracker::{
24 InitParamsTracker, Tracker, TrackerInit, TrackerMessage,
25 TrackerResponse,
26 },
27};
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub enum SubjectManagerMessage {
31 UpGovernances {
32 governance_ids: Vec<DigestIdentifier>,
33 },
34 Up {
35 subject_id: DigestIdentifier,
36 requester: String,
37 create_ledger: Option<Box<SignedLedger>>,
38 },
39 Finish {
40 subject_id: DigestIdentifier,
41 requester: String,
42 },
43 DeleteTracker {
44 subject_id: DigestIdentifier,
45 },
46 DeleteGovernance {
47 subject_id: DigestIdentifier,
48 },
49}
50
51impl Message for SubjectManagerMessage {}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub enum SubjectManagerResponse {
55 Up,
56 Finish,
57 DeleteTracker,
58 DeleteGovernance,
59}
60
61impl Response for SubjectManagerResponse {}
62
63#[derive(Debug, Default, Clone)]
64struct SubjectEntry {
65 requesters: HashSet<String>,
66}
67
68pub struct SubjectManager {
69 our_key: Arc<PublicKey>,
70 hash: HashAlgorithm,
71 is_service: bool,
72 subjects: HashMap<DigestIdentifier, SubjectEntry>,
73}
74
75impl SubjectManager {
76 pub fn new(
77 our_key: Arc<PublicKey>,
78 hash: HashAlgorithm,
79 is_service: bool,
80 ) -> Self {
81 Self {
82 our_key,
83 hash,
84 is_service,
85 subjects: HashMap::new(),
86 }
87 }
88
89 async fn up_governances(
90 &self,
91 ctx: &mut ActorContext<Self>,
92 governance_ids: Vec<DigestIdentifier>,
93 ) -> Result<(), ActorError> {
94 let safe_mode = if let Some(config) = ctx
95 .system()
96 .get_helper::<crate::system::ConfigHelper>("config")
97 .await
98 {
99 config.safe_mode
100 } else {
101 return Err(ActorError::Helper {
102 name: "config".to_owned(),
103 reason: "Not found".to_owned(),
104 });
105 };
106
107 for governance_id in governance_ids {
108 let actor: ActorRef<Governance> = ctx
109 .create_child(
110 &governance_id.to_string(),
111 Governance::initial((
112 None,
113 self.our_key.clone(),
114 self.hash,
115 self.is_service,
116 )),
117 )
118 .await?;
119
120 if !safe_mode {
121 let Some(ext_db): Option<Arc<ExternalDB>> =
122 ctx.system().get_helper("ext_db").await
123 else {
124 return Err(ActorError::Helper {
125 name: "ext_db".to_owned(),
126 reason: "Not found".to_owned(),
127 });
128 };
129
130 let sink = Sink::new(actor.subscribe(), ext_db.get_subject());
131 ctx.system().run_sink(sink).await;
132 }
133 }
134
135 Ok(())
136 }
137
138 async fn up(
139 &mut self,
140 ctx: &mut ActorContext<Self>,
141 subject_id: DigestIdentifier,
142 requester: String,
143 create_ledger: Option<Box<SignedLedger>>,
144 ) -> Result<(), ActorError> {
145 if let Some(entry) = self.subjects.get_mut(&subject_id) {
146 entry.requesters.insert(requester);
147 return Ok(());
148 }
149
150 if let Some(ledger) = create_ledger {
151 let ledger = *ledger;
152 let metadata = Self::metadata_from_create_ledger(&ledger)?;
153
154 if metadata.schema_id.is_gov() {
155 self.create_governance(ctx, &subject_id, metadata, ledger)
156 .await?;
157 return Ok(());
158 }
159
160 self.create_tracker(ctx, &subject_id, metadata, ledger)
161 .await?;
162 } else {
163 self.load_tracker(ctx, &subject_id).await?;
164 }
165
166 let entry = self.subjects.entry(subject_id).or_default();
167 entry.requesters.insert(requester);
168
169 Ok(())
170 }
171
172 async fn finish(
173 &mut self,
174 ctx: &ActorContext<Self>,
175 subject_id: DigestIdentifier,
176 requester: String,
177 ) -> Result<(), ActorError> {
178 let Some(entry) = self.subjects.get_mut(&subject_id) else {
179 return Ok(());
180 };
181
182 entry.requesters.remove(&requester);
183
184 if !entry.requesters.is_empty() {
185 return Ok(());
186 }
187
188 let tracker = ctx.get_child::<Tracker>(&subject_id.to_string()).await?;
189 tracker.ask_stop().await?;
190 self.subjects.remove(&subject_id);
191
192 Ok(())
193 }
194
195 async fn delete_tracker(
196 &mut self,
197 ctx: &mut ActorContext<Self>,
198 subject_id: DigestIdentifier,
199 ) -> Result<(), ActorError> {
200 let mut cleanup_errors = Vec::new();
201
202 let tracker = match ctx
203 .create_child(
204 &subject_id.to_string(),
205 Tracker::initial(InitParamsTracker {
206 data: None,
207 hash: self.hash,
208 is_service: self.is_service,
209 public_key: self.our_key.clone(),
210 }),
211 )
212 .await
213 {
214 Ok(actor) => Some(actor),
215 Err(ActorError::Exists { .. }) => {
216 match ctx.get_child::<Tracker>(&subject_id.to_string()).await {
217 Ok(actor) => Some(actor),
218 Err(error) => {
219 cleanup_errors.push(format!("tracker lookup: {error}"));
220 None
221 }
222 }
223 }
224 Err(error) => {
225 cleanup_errors.push(format!("tracker: {error}"));
226 None
227 }
228 };
229
230 if let Some(tracker) = tracker {
231 match tracker.ask(TrackerMessage::PurgeStorage).await {
232 Ok(TrackerResponse::Ok) => {}
233 Ok(other) => cleanup_errors
234 .push(format!("tracker: unexpected response {other:?}")),
235 Err(error) => cleanup_errors.push(format!("tracker: {error}")),
236 }
237
238 if let Err(error) = tracker.ask_stop().await {
239 cleanup_errors.push(format!("tracker stop: {error}"));
240 }
241 }
242
243 self.subjects.remove(&subject_id);
244
245 let governance_id = match ctx.get_parent::<Node>().await {
246 Ok(node) => match node
247 .ask(NodeMessage::GetSubjectData(subject_id.clone()))
248 .await
249 {
250 Ok(NodeResponse::SubjectData(Some(SubjectData::Tracker {
251 governance_id,
252 ..
253 }))) => Some(governance_id),
254 Ok(NodeResponse::SubjectData(Some(
255 SubjectData::Governance { .. },
256 ))) => {
257 cleanup_errors.push(format!(
258 "subject '{}' is governance, not tracker",
259 subject_id
260 ));
261 None
262 }
263 Ok(NodeResponse::SubjectData(None)) => {
264 cleanup_errors.push(format!(
265 "subject '{}' not found in node",
266 subject_id
267 ));
268 None
269 }
270 Ok(other) => {
271 cleanup_errors
272 .push(format!("node: unexpected response {other:?}"));
273 None
274 }
275 Err(error) => {
276 cleanup_errors.push(format!("node: {error}"));
277 None
278 }
279 },
280 Err(error) => {
281 cleanup_errors.push(format!("node parent: {error}"));
282 None
283 }
284 };
285
286 if let Some(governance_id) = governance_id {
287 match ctx
288 .get_child::<Governance>(&governance_id.to_string())
289 .await
290 {
291 Ok(governance) => {
292 match governance
293 .ask(GovernanceMessage::DeleteTrackerReferences {
294 subject_id: subject_id.clone(),
295 })
296 .await
297 {
298 Ok(GovernanceResponse::Ok) => {}
299 Ok(other) => cleanup_errors.push(format!(
300 "governance: unexpected response {other:?}"
301 )),
302 Err(error) => {
303 cleanup_errors.push(format!("governance: {error}"))
304 }
305 }
306 }
307 Err(error) => {
308 cleanup_errors.push(format!("governance lookup: {error}"));
309 }
310 }
311 }
312
313 if cleanup_errors.is_empty() {
314 Ok(())
315 } else {
316 Err(ActorError::Functional {
317 description: cleanup_errors.join("; "),
318 })
319 }
320 }
321
322 async fn delete_governance(
323 &mut self,
324 ctx: &mut ActorContext<Self>,
325 subject_id: DigestIdentifier,
326 ) -> Result<(), ActorError> {
327 let mut cleanup_errors = Vec::new();
328
329 let governance = match ctx
330 .create_child(
331 &subject_id.to_string(),
332 Governance::initial((
333 None,
334 self.our_key.clone(),
335 self.hash,
336 self.is_service,
337 )),
338 )
339 .await
340 {
341 Ok(actor) => Some(actor),
342 Err(ActorError::Exists { .. }) => {
343 match ctx.get_child::<Governance>(&subject_id.to_string()).await
344 {
345 Ok(actor) => Some(actor),
346 Err(error) => {
347 cleanup_errors
348 .push(format!("governance lookup: {error}"));
349 None
350 }
351 }
352 }
353 Err(error) => {
354 cleanup_errors.push(format!("governance: {error}"));
355 None
356 }
357 };
358
359 if let Some(governance) = governance {
360 match governance
361 .ask(GovernanceMessage::DeleteGovernanceStorage)
362 .await
363 {
364 Ok(GovernanceResponse::Ok) => {}
365 Ok(other) => cleanup_errors
366 .push(format!("governance: unexpected response {other:?}")),
367 Err(error) => {
368 cleanup_errors.push(format!("governance: {error}"))
369 }
370 }
371
372 if let Err(error) = governance.ask_stop().await {
373 cleanup_errors.push(format!("governance stop: {error}"));
374 }
375 }
376
377 self.subjects.remove(&subject_id);
378
379 if cleanup_errors.is_empty() {
380 Ok(())
381 } else {
382 Err(ActorError::Functional {
383 description: cleanup_errors.join("; "),
384 })
385 }
386 }
387
388 async fn load_tracker(
389 &self,
390 ctx: &mut ActorContext<Self>,
391 subject_id: &DigestIdentifier,
392 ) -> Result<(), ActorError> {
393 let tracker_actor: ActorRef<Tracker> = ctx
394 .create_child(
395 &subject_id.to_string(),
396 Tracker::initial(InitParamsTracker {
397 data: None,
398 hash: self.hash,
399 is_service: self.is_service,
400 public_key: self.our_key.clone(),
401 }),
402 )
403 .await?;
404
405 self.run_tracker_sink(ctx, tracker_actor).await
406 }
407
408 async fn create_tracker(
409 &self,
410 ctx: &mut ActorContext<Self>,
411 subject_id: &DigestIdentifier,
412 metadata: crate::subject::Metadata,
413 ledger: SignedLedger,
414 ) -> Result<(), ActorError> {
415 let tracker_actor: ActorRef<Tracker> = ctx
416 .create_child(
417 &subject_id.to_string(),
418 Tracker::initial(InitParamsTracker {
419 data: Some(TrackerInit::from(&metadata)),
420 hash: self.hash,
421 is_service: self.is_service,
422 public_key: self.our_key.clone(),
423 }),
424 )
425 .await?;
426
427 self.run_tracker_sink(ctx, tracker_actor.clone()).await?;
428
429 if let Err(error) = tracker_actor
430 .ask(TrackerMessage::UpdateLedger {
431 events: vec![ledger],
432 })
433 .await
434 {
435 tracker_actor.tell_stop().await;
436 return Err(error);
437 }
438
439 self.register_subject_in_node(
440 ctx,
441 metadata.owner.clone(),
442 metadata.subject_id.clone(),
443 SubjectData::Tracker {
444 governance_id: metadata.governance_id.clone(),
445 schema_id: metadata.schema_id.clone(),
446 namespace: metadata.namespace.to_string(),
447 active: true,
448 },
449 )
450 .await?;
451
452 Ok(())
453 }
454
455 async fn create_governance(
456 &self,
457 ctx: &mut ActorContext<Self>,
458 subject_id: &DigestIdentifier,
459 metadata: crate::subject::Metadata,
460 ledger: SignedLedger,
461 ) -> Result<(), ActorError> {
462 let governance_data = serde_json::from_value::<GovernanceData>(
463 metadata.properties.0.clone(),
464 )
465 .map_err(|e| ActorError::Functional {
466 description: format!(
467 "Governance properties must be GovernanceData: {e}"
468 ),
469 })?;
470
471 if ctx
472 .get_child::<Governance>(&subject_id.to_string())
473 .await
474 .is_ok()
475 {
476 return Ok(());
477 }
478
479 let governance_actor: ActorRef<Governance> = ctx
480 .create_child(
481 &subject_id.to_string(),
482 Governance::initial((
483 Some((SubjectMetadata::new(&metadata), governance_data)),
484 self.our_key.clone(),
485 self.hash,
486 self.is_service,
487 )),
488 )
489 .await?;
490
491 self.run_governance_sink(ctx, governance_actor.clone())
492 .await?;
493
494 if let Err(error) = governance_actor
495 .ask(GovernanceMessage::UpdateLedger {
496 events: vec![ledger],
497 })
498 .await
499 {
500 governance_actor.tell_stop().await;
501 return Err(error);
502 }
503
504 self.register_subject_in_node(
505 ctx,
506 metadata.owner.clone(),
507 metadata.subject_id.clone(),
508 SubjectData::Governance { active: true },
509 )
510 .await?;
511
512 Ok(())
513 }
514
515 fn metadata_from_create_ledger(
516 ledger: &SignedLedger,
517 ) -> Result<crate::subject::Metadata, ActorError> {
518 match &ledger.content().protocols {
519 Protocols::Create { validation } => {
520 if let ValidationMetadata::Metadata(metadata) =
521 &validation.validation_metadata
522 {
523 Ok(*metadata.clone())
524 } else {
525 Err(ActorError::Functional {
526 description:
527 "Create validation metadata must be Metadata"
528 .to_owned(),
529 })
530 }
531 }
532 _ => Err(ActorError::Functional {
533 description:
534 "SubjectManager create flow requires a create ledger"
535 .to_owned(),
536 }),
537 }
538 }
539
540 async fn run_tracker_sink(
541 &self,
542 ctx: &ActorContext<Self>,
543 actor: ActorRef<Tracker>,
544 ) -> Result<(), ActorError> {
545 let safe_mode = if let Some(config) = ctx
546 .system()
547 .get_helper::<crate::system::ConfigHelper>("config")
548 .await
549 {
550 config.safe_mode
551 } else {
552 return Err(ActorError::Helper {
553 name: "config".to_owned(),
554 reason: "Not found".to_owned(),
555 });
556 };
557
558 if safe_mode {
559 return Ok(());
560 }
561
562 let Some(ext_db): Option<Arc<ExternalDB>> =
563 ctx.system().get_helper("ext_db").await
564 else {
565 return Err(ActorError::Helper {
566 name: "ext_db".to_owned(),
567 reason: "Not found".to_owned(),
568 });
569 };
570
571 let sink = Sink::new(actor.subscribe(), ext_db.get_subject());
572 ctx.system().run_sink(sink).await;
573 Ok(())
574 }
575
576 async fn run_governance_sink(
577 &self,
578 ctx: &ActorContext<Self>,
579 actor: ActorRef<Governance>,
580 ) -> Result<(), ActorError> {
581 let safe_mode = if let Some(config) = ctx
582 .system()
583 .get_helper::<crate::system::ConfigHelper>("config")
584 .await
585 {
586 config.safe_mode
587 } else {
588 return Err(ActorError::Helper {
589 name: "config".to_owned(),
590 reason: "Not found".to_owned(),
591 });
592 };
593
594 if safe_mode {
595 return Ok(());
596 }
597
598 let Some(ext_db): Option<Arc<ExternalDB>> =
599 ctx.system().get_helper("ext_db").await
600 else {
601 return Err(ActorError::Helper {
602 name: "ext_db".to_owned(),
603 reason: "Not found".to_owned(),
604 });
605 };
606
607 let sink = Sink::new(actor.subscribe(), ext_db.get_subject());
608 ctx.system().run_sink(sink).await;
609 Ok(())
610 }
611
612 async fn register_subject_in_node(
613 &self,
614 ctx: &ActorContext<Self>,
615 owner: PublicKey,
616 subject_id: DigestIdentifier,
617 data: SubjectData,
618 ) -> Result<(), ActorError> {
619 let node = ctx.get_parent::<Node>().await?;
620 let response = node
621 .ask(NodeMessage::RegisterSubject {
622 owner,
623 subject_id,
624 data,
625 })
626 .await?;
627
628 match response {
629 NodeResponse::Ok => Ok(()),
630 _ => Err(ActorError::UnexpectedResponse {
631 path: ctx.path().parent(),
632 expected: "NodeResponse::Ok".to_owned(),
633 }),
634 }
635 }
636}
637
638#[async_trait]
639impl Actor for SubjectManager {
640 type Event = ();
641 type Message = SubjectManagerMessage;
642 type Response = SubjectManagerResponse;
643
644 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
645 parent_span.map_or_else(
646 || info_span!("SubjectManager"),
647 |parent_span| info_span!(parent: parent_span, "SubjectManager"),
648 )
649 }
650}
651
652impl NotPersistentActor for SubjectManager {}
653
654#[async_trait]
655impl Handler<Self> for SubjectManager {
656 async fn handle_message(
657 &mut self,
658 _sender: ActorPath,
659 msg: SubjectManagerMessage,
660 ctx: &mut ActorContext<Self>,
661 ) -> Result<SubjectManagerResponse, ActorError> {
662 match msg {
663 SubjectManagerMessage::UpGovernances { governance_ids } => {
664 debug!(
665 governance_count = governance_ids.len(),
666 "Governance bootstrap requested"
667 );
668 self.up_governances(ctx, governance_ids).await?;
669 Ok(SubjectManagerResponse::Up)
670 }
671 SubjectManagerMessage::Up {
672 subject_id,
673 requester,
674 create_ledger,
675 } => {
676 debug!(
677 subject_id = %subject_id,
678 requester = %requester,
679 "Subject up requested"
680 );
681 self.up(ctx, subject_id, requester, create_ledger).await?;
682 Ok(SubjectManagerResponse::Up)
683 }
684 SubjectManagerMessage::Finish {
685 subject_id,
686 requester,
687 } => {
688 debug!(
689 subject_id = %subject_id,
690 requester = %requester,
691 "Subject finish requested"
692 );
693 self.finish(ctx, subject_id, requester).await?;
694 Ok(SubjectManagerResponse::Finish)
695 }
696 SubjectManagerMessage::DeleteTracker { subject_id } => {
697 debug!(
698 subject_id = %subject_id,
699 "Tracker delete requested"
700 );
701 self.delete_tracker(ctx, subject_id).await?;
702 Ok(SubjectManagerResponse::DeleteTracker)
703 }
704 SubjectManagerMessage::DeleteGovernance { subject_id } => {
705 debug!(
706 subject_id = %subject_id,
707 "Governance delete requested"
708 );
709 self.delete_governance(ctx, subject_id).await?;
710 Ok(SubjectManagerResponse::DeleteGovernance)
711 }
712 }
713 }
714
715 async fn on_child_fault(
716 &mut self,
717 error: ActorError,
718 ctx: &mut ActorContext<Self>,
719 ) -> ave_actors::ChildAction {
720 error!(error = %error, "Child fault in subject manager");
721 ctx.system().crash_system();
722 ave_actors::ChildAction::Stop
723 }
724}