1use std::{collections::BTreeSet, sync::Arc};
2
3use crate::{
4 db::Storable,
5 governance::{
6 sn_register::{SnRegister, SnRegisterMessage},
7 subject_register::{SubjectRegister, SubjectRegisterMessage},
8 witnesses_register::{
9 WitnessesRegister, WitnessesRegisterMessage,
10 WitnessesRegisterResponse,
11 },
12 },
13 helpers::{db::ExternalDB, sink::AveSink},
14 model::{
15 common::{
16 TrackerEventVisibility, TrackerStoredVisibility,
17 TrackerVisibilityMode, TrackerVisibilityState, emit_fail,
18 get_last_event, purge_storage,
19 },
20 event::{Ledger, Protocols, ValidationMetadata},
21 },
22 node::{Node, NodeMessage, TransferSubject, register::RegisterMessage},
23 subject::{
24 DataForSink, EventLedgerDataForSink, Metadata, Subject,
25 SubjectMetadata,
26 error::SubjectError,
27 sinkdata::{SinkData, SinkDataMessage},
28 },
29 validation::request::LastData,
30};
31
32use ave_actors::{
33 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
34 Response, Sink,
35};
36use ave_common::{
37 Namespace, ValueWrapper,
38 identity::{DigestIdentifier, HashAlgorithm, PublicKey},
39 request::EventRequest,
40 response::SubjectDB,
41};
42
43use async_trait::async_trait;
44use ave_actors::{FullPersistence, PersistentActor};
45use borsh::{BorshDeserialize, BorshSerialize};
46use json_patch::{Patch, patch};
47use serde::{Deserialize, Serialize};
48use tracing::{Span, debug, error, info_span, warn};
49
50#[derive(Debug, Serialize, Deserialize, Clone)]
51pub struct Tracker {
52 #[serde(skip)]
53 pub our_key: Arc<PublicKey>,
54 #[serde(skip)]
55 pub service: bool,
56 #[serde(skip)]
57 pub only_clear_events: bool,
58 #[serde(skip)]
59 pub hash: Option<HashAlgorithm>,
60
61 pub subject_metadata: SubjectMetadata,
62 pub governance_id: DigestIdentifier,
63 pub namespace: Namespace,
65 pub genesis_gov_version: u64,
67 pub visibility_mode: TrackerVisibilityMode,
68 pub properties: ValueWrapper,
70}
71
72#[derive(Default)]
73pub struct TrackerInit {
74 pub subject_metadata: SubjectMetadata,
75 pub governance_id: DigestIdentifier,
76 pub namespace: Namespace,
77 pub genesis_gov_version: u64,
78 pub properties: ValueWrapper,
79}
80
81impl From<&Metadata> for TrackerInit {
82 fn from(value: &Metadata) -> Self {
83 Self {
84 subject_metadata: SubjectMetadata::new(value),
85 governance_id: value.governance_id.clone(),
86 namespace: value.namespace.clone(),
87 genesis_gov_version: value.genesis_gov_version,
88 properties: value.properties.clone(),
89 }
90 }
91}
92
93impl BorshSerialize for Tracker {
94 fn serialize<W: std::io::Write>(
95 &self,
96 writer: &mut W,
97 ) -> std::io::Result<()> {
98 BorshSerialize::serialize(&self.subject_metadata, writer)?;
100 BorshSerialize::serialize(&self.governance_id, writer)?;
101 BorshSerialize::serialize(&self.namespace, writer)?;
102 BorshSerialize::serialize(&self.genesis_gov_version, writer)?;
103 BorshSerialize::serialize(&self.visibility_mode, writer)?;
104 BorshSerialize::serialize(&self.properties, writer)?;
105
106 Ok(())
107 }
108}
109
110impl BorshDeserialize for Tracker {
111 fn deserialize_reader<R: std::io::Read>(
112 reader: &mut R,
113 ) -> std::io::Result<Self> {
114 let subject_metadata = SubjectMetadata::deserialize_reader(reader)?;
116 let governance_id = DigestIdentifier::deserialize_reader(reader)?;
117 let namespace = Namespace::deserialize_reader(reader)?;
118 let genesis_gov_version = u64::deserialize_reader(reader)?;
119 let visibility_mode =
120 TrackerVisibilityMode::deserialize_reader(reader)?;
121 let properties = ValueWrapper::deserialize_reader(reader)?;
122
123 let our_key = Arc::new(PublicKey::default());
126 let hash = None;
127
128 Ok(Self {
129 service: false,
130 only_clear_events: false,
131 hash,
132 our_key,
133 subject_metadata,
134 governance_id,
135 namespace,
136 genesis_gov_version,
137 visibility_mode,
138 properties,
139 })
140 }
141}
142
143#[async_trait]
144impl Subject for Tracker {
145 async fn update_sn(
146 &self,
147 ctx: &mut ActorContext<Self>,
148 ) -> Result<(), ActorError> {
149 let witnesses_register = ctx
150 .system()
151 .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
152 "/user/node/subject_manager/{}/witnesses_register",
153 self.governance_id
154 )))
155 .await?;
156 witnesses_register
157 .tell(WitnessesRegisterMessage::UpdateSn {
158 subject_id: self.subject_metadata.subject_id.clone(),
159 sn: self.subject_metadata.sn,
160 })
161 .await
162 }
163
164 async fn eol(
165 &self,
166 ctx: &mut ActorContext<Self>,
167 ) -> Result<(), ActorError> {
168 let node_path = ActorPath::from("/user/node");
169 let node = ctx.system().get_actor::<Node>(&node_path).await?;
170 node.tell(NodeMessage::EOLSubject {
171 subject_id: self.subject_metadata.subject_id.clone(),
172 i_owner: *self.our_key == self.subject_metadata.owner,
173 })
174 .await
175 }
176
177 async fn reject(
178 &self,
179 ctx: &mut ActorContext<Self>,
180 gov_version: u64,
181 ) -> Result<(), ActorError> {
182 let node_path = ActorPath::from("/user/node");
183 let node = ctx.system().get_actor::<Node>(&node_path).await?;
184 node.tell(NodeMessage::RejectTransfer(
185 self.subject_metadata.subject_id.clone(),
186 ))
187 .await?;
188
189 let witnesses_register = ctx
190 .system()
191 .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
192 "/user/node/subject_manager/{}/witnesses_register",
193 self.governance_id
194 )))
195 .await?;
196 witnesses_register
197 .tell(WitnessesRegisterMessage::Reject {
198 subject_id: self.subject_metadata.subject_id.clone(),
199 sn: self.subject_metadata.sn + 1,
200 gov_version,
201 })
202 .await
203 }
204
205 async fn confirm(
206 &self,
207 ctx: &mut ActorContext<Self>,
208 new_owner: PublicKey,
209 gov_version: u64,
210 ) -> Result<(), ActorError> {
211 let node_path = ActorPath::from("/user/node");
212 let node = ctx.system().get_actor::<Node>(&node_path).await?;
213 node.tell(NodeMessage::ConfirmTransfer(
214 self.subject_metadata.subject_id.clone(),
215 ))
216 .await?;
217
218 if self.service || *self.our_key == self.subject_metadata.owner {
219 let subject_register = ctx
220 .system()
221 .get_actor::<SubjectRegister>(&ActorPath::from(&format!(
222 "/user/node/subject_manager/{}/subject_register",
223 self.governance_id
224 )))
225 .await?;
226
227 let _response = subject_register
228 .ask(SubjectRegisterMessage::UpdateSubject {
229 new_owner,
230 old_owner: self.subject_metadata.owner.clone(),
231 subject_id: self.subject_metadata.subject_id.clone(),
232 namespace: self.namespace.to_string(),
233 schema_id: self.subject_metadata.schema_id.clone(),
234 gov_version,
235 })
236 .await?;
237 }
238
239 let witnesses_register = ctx
240 .system()
241 .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
242 "/user/node/subject_manager/{}/witnesses_register",
243 self.governance_id
244 )))
245 .await?;
246 witnesses_register
247 .tell(WitnessesRegisterMessage::Confirm {
248 subject_id: self.subject_metadata.subject_id.clone(),
249 sn: self.subject_metadata.sn + 1,
250 gov_version,
251 })
252 .await
253 }
254
255 async fn transfer(
256 &self,
257 ctx: &mut ActorContext<Self>,
258 new_owner: PublicKey,
259 gov_version: u64,
260 ) -> Result<(), ActorError> {
261 let node_path = ActorPath::from("/user/node");
262 let node = ctx.system().get_actor::<Node>(&node_path).await?;
263 node.tell(NodeMessage::TransferSubject(TransferSubject {
264 name: self.subject_metadata.name.clone(),
265 subject_id: self.subject_metadata.subject_id.clone(),
266 new_owner: new_owner.clone(),
267 actual_owner: self.subject_metadata.owner.clone(),
268 }))
269 .await?;
270
271 let witnesses_register = ctx
272 .system()
273 .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
274 "/user/node/subject_manager/{}/witnesses_register",
275 self.governance_id
276 )))
277 .await?;
278 witnesses_register
279 .tell(WitnessesRegisterMessage::Transfer {
280 subject_id: self.subject_metadata.subject_id.clone(),
281 new_owner,
282 gov_version,
283 })
284 .await
285 }
286
287 async fn get_last_ledger(
288 &self,
289 ctx: &mut ActorContext<Self>,
290 ) -> Result<Option<Ledger>, ActorError> {
291 get_last_event(ctx).await
292 }
293
294 fn apply_patch(
295 &mut self,
296 json_patch: ValueWrapper,
297 ) -> Result<(), ActorError> {
298 let patch_json = serde_json::from_value::<Patch>(json_patch.0)
299 .map_err(|e| {
300 let error = SubjectError::PatchConversionFailed {
301 details: e.to_string(),
302 };
303 error!(
304 error = %e,
305 subject_id = %self.subject_metadata.subject_id,
306 "Failed to convert patch from JSON"
307 );
308 ActorError::Functional {
309 description: error.to_string(),
310 }
311 })?;
312
313 patch(&mut self.properties.0, &patch_json).map_err(|e| {
314 let error = SubjectError::PatchApplicationFailed {
315 details: e.to_string(),
316 };
317 error!(
318 error = %e,
319 subject_id = %self.subject_metadata.subject_id,
320 "Failed to apply patch to properties"
321 );
322 ActorError::Functional {
323 description: error.to_string(),
324 }
325 })?;
326
327 debug!(
328 subject_id = %self.subject_metadata.subject_id,
329 "Patch applied successfully"
330 );
331
332 Ok(())
333 }
334
335 async fn manager_new_ledger_events(
336 &mut self,
337 ctx: &mut ActorContext<Self>,
338 events: Vec<Ledger>,
339 ) -> Result<(), ActorError> {
340 let Some(hash) = self.hash else {
341 return Err(ActorError::FunctionalCritical {
342 description: "Can not obtain Hash".to_string(),
343 });
344 };
345
346 let current_sn = self.subject_metadata.sn;
347
348 if let Err(e) = self.verify_new_ledger_events(ctx, events, &hash).await
349 {
350 if let ActorError::Functional { description } = e.clone() {
351 warn!(
352 error = %description,
353 subject_id = %self.subject_metadata.subject_id,
354 sn = self.subject_metadata.sn,
355 "Error verifying new ledger events"
356 );
357
358 if self.subject_metadata.sn == 0 {
360 return Err(e);
361 }
362 } else {
363 error!(
364 error = %e,
365 subject_id = %self.subject_metadata.subject_id,
366 sn = self.subject_metadata.sn,
367 "Critical error verifying new ledger events"
368 );
369 return Err(e);
370 }
371 };
372
373 if current_sn < self.subject_metadata.sn || current_sn == 0 {
374 let subject_db = self.build_subject_db(ctx).await?;
375 Self::publish_sink(
376 ctx,
377 SinkDataMessage::UpdateState(Box::new(subject_db)),
378 )
379 .await?;
380
381 self.update_sn(ctx).await?;
382 }
383
384 Ok(())
385 }
386}
387
388impl Tracker {
389 const fn public_visibilities()
390 -> (TrackerStoredVisibility, TrackerEventVisibility) {
391 (
392 TrackerStoredVisibility::Full,
393 TrackerEventVisibility::NonFact,
394 )
395 }
396
397 fn fact_visibilities(
398 viewpoints: &BTreeSet<String>,
399 opaque: bool,
400 ) -> (TrackerStoredVisibility, TrackerEventVisibility) {
401 let event_visibility = TrackerEventVisibility::Fact(viewpoints.clone());
402
403 let stored_visibility = if opaque {
404 TrackerStoredVisibility::None
405 } else if viewpoints.is_empty() {
406 TrackerStoredVisibility::Full
407 } else {
408 TrackerStoredVisibility::Only(viewpoints.clone())
409 };
410
411 (stored_visibility, event_visibility)
412 }
413
414 const fn is_full(&self) -> bool {
415 matches!(self.visibility_mode, TrackerVisibilityMode::Full)
416 }
417
418 async fn record_visibility_event(
419 &self,
420 ctx: &ActorContext<Self>,
421 event: &Ledger,
422 ) -> Result<(), ActorError> {
423 let (stored_visibility, event_visibility, mode) = match &event.protocols
424 {
425 Protocols::Create { .. } => {
426 let (stored_visibility, event_visibility) =
427 Self::public_visibilities();
428 (
429 stored_visibility,
430 event_visibility,
431 TrackerVisibilityMode::Full,
432 )
433 }
434 Protocols::TrackerFactFull { event_request, .. } => {
435 let EventRequest::Fact(fact_request) = event_request.content()
436 else {
437 return Err(ActorError::Functional {
438 description:
439 "In fact event, event request must be Fact"
440 .to_owned(),
441 });
442 };
443 let (stored_visibility, event_visibility) =
444 Self::fact_visibilities(&fact_request.viewpoints, false);
445 (stored_visibility, event_visibility, self.visibility_mode)
446 }
447 Protocols::TrackerFactOpaque { evaluation, .. } => {
448 let (stored_visibility, event_visibility) =
449 Self::fact_visibilities(&evaluation.viewpoints, true);
450 let mode = if evaluation.is_ok() {
451 TrackerVisibilityMode::Opaque
452 } else {
453 self.visibility_mode
454 };
455 (stored_visibility, event_visibility, mode)
456 }
457 Protocols::Transfer { .. }
458 | Protocols::TrackerConfirm { .. }
459 | Protocols::Reject { .. }
460 | Protocols::EOL { .. } => {
461 let (stored_visibility, event_visibility) =
462 Self::public_visibilities();
463 (stored_visibility, event_visibility, self.visibility_mode)
464 }
465 _ => {
466 return Err(ActorError::Functional {
467 description: "Invalid protocol data for tracker visibility"
468 .to_owned(),
469 });
470 }
471 };
472
473 let witnesses_register = ctx
474 .system()
475 .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
476 "/user/node/subject_manager/{}/witnesses_register",
477 self.governance_id
478 )))
479 .await?;
480
481 witnesses_register
482 .tell(WitnessesRegisterMessage::UpdateTrackerVisibility {
483 subject_id: self.subject_metadata.subject_id.clone(),
484 sn: event.sn,
485 mode,
486 stored_visibility,
487 event_visibility,
488 })
489 .await
490 }
491
492 async fn get_tracker_visibility_state(
493 &self,
494 ctx: &ActorContext<Self>,
495 ) -> Result<TrackerVisibilityState, ActorError> {
496 let witnesses_register = ctx
497 .system()
498 .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
499 "/user/node/subject_manager/{}/witnesses_register",
500 self.governance_id
501 )))
502 .await?;
503
504 let response = witnesses_register
505 .ask(WitnessesRegisterMessage::GetTrackerVisibilityState {
506 subject_id: self.subject_metadata.subject_id.clone(),
507 })
508 .await?;
509
510 match response {
511 WitnessesRegisterResponse::TrackerVisibilityState { state } => {
512 Ok(state)
513 }
514 _ => Err(ActorError::UnexpectedResponse {
515 path: ActorPath::from(format!(
516 "/user/node/subject_manager/{}/witnesses_register",
517 self.governance_id
518 )),
519 expected: "WitnessesRegisterResponse::TrackerVisibilityState"
520 .to_owned(),
521 }),
522 }
523 }
524
525 async fn build_subject_db(
526 &self,
527 ctx: &ActorContext<Self>,
528 ) -> Result<SubjectDB, ActorError> {
529 let visibility_state = self.get_tracker_visibility_state(ctx).await?;
530
531 Ok(SubjectDB {
532 name: self.subject_metadata.name.clone(),
533 description: self.subject_metadata.description.clone(),
534 subject_id: self.subject_metadata.subject_id.to_string(),
535 governance_id: self.governance_id.to_string(),
536 genesis_gov_version: self.genesis_gov_version,
537 prev_ledger_event_hash: if self
538 .subject_metadata
539 .prev_ledger_event_hash
540 .is_empty()
541 {
542 None
543 } else {
544 Some(self.subject_metadata.prev_ledger_event_hash.to_string())
545 },
546 schema_id: self.subject_metadata.schema_id.to_string(),
547 namespace: self.namespace.to_string(),
548 sn: self.subject_metadata.sn,
549 creator: self.subject_metadata.creator.to_string(),
550 owner: self.subject_metadata.owner.to_string(),
551 new_owner: self
552 .subject_metadata
553 .new_owner
554 .clone()
555 .map(|owner| owner.to_string()),
556 active: self.subject_metadata.active,
557 tracker_visibility: Some(visibility_state.into()),
558 properties: self.properties.0.clone(),
559 })
560 }
561
562 async fn create(
563 &self,
564 ctx: &ActorContext<Self>,
565 gov_version: u64,
566 ) -> Result<(), ActorError> {
567 let sn_register = ctx
568 .system()
569 .get_actor::<SnRegister>(&ActorPath::from(format!(
570 "/user/node/subject_manager/{}/sn_register",
571 self.governance_id
572 )))
573 .await?;
574
575 sn_register
576 .tell(SnRegisterMessage::RegisterSn {
577 subject_id: self.subject_metadata.subject_id.clone(),
578 gov_version,
579 sn: 0,
580 })
581 .await?;
582
583 if self.service || *self.our_key == self.subject_metadata.owner {
584 let subject_register = ctx
585 .system()
586 .get_actor::<SubjectRegister>(&ActorPath::from(&format!(
587 "/user/node/subject_manager/{}/subject_register",
588 self.governance_id
589 )))
590 .await?;
591
592 let _response = subject_register
593 .ask(SubjectRegisterMessage::CreateSubject {
594 creator: self.subject_metadata.owner.clone(),
595 subject_id: self.subject_metadata.subject_id.clone(),
596 namespace: self.namespace.to_string(),
597 schema_id: self.subject_metadata.schema_id.clone(),
598 gov_version,
599 })
600 .await?;
601 }
602
603 let witnesses_register = ctx
604 .system()
605 .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
606 "/user/node/subject_manager/{}/witnesses_register",
607 self.governance_id
608 )))
609 .await?;
610
611 witnesses_register
612 .tell(WitnessesRegisterMessage::Create {
613 subject_id: self.subject_metadata.subject_id.clone(),
614 gov_version,
615 owner: self.subject_metadata.owner.clone(),
616 })
617 .await
618 }
619
620 async fn register_gov_version_sn(
621 &self,
622 ctx: &ActorContext<Self>,
623 gov_version: u64,
624 ) -> Result<(), ActorError> {
625 let sn_register = ctx
626 .system()
627 .get_actor::<SnRegister>(&ActorPath::from(format!(
628 "/user/node/subject_manager/{}/sn_register",
629 self.governance_id
630 )))
631 .await?;
632
633 sn_register
634 .tell(SnRegisterMessage::RegisterSn {
635 subject_id: self.subject_metadata.subject_id.clone(),
636 gov_version,
637 sn: self.subject_metadata.sn,
638 })
639 .await
640 }
641
642 async fn verify_new_ledger_events(
643 &mut self,
644 ctx: &mut ActorContext<Self>,
645 events: Vec<Ledger>,
646 hash: &HashAlgorithm,
647 ) -> Result<(), ActorError> {
648 let mut iter = events.into_iter();
649 let last_ledger = get_last_event(ctx).await?;
650
651 let Some(first) = iter.next() else {
652 return Ok(());
653 };
654
655 let mut pending = Vec::new();
656
657 let mut last_ledger = if let Some(last_ledger) = last_ledger {
658 pending.push(first);
659 last_ledger
660 } else {
661 if let Err(e) = Self::verify_first_ledger_event(
662 ctx,
663 &first,
664 hash,
665 Metadata::from(self.clone()),
666 )
667 .await
668 {
669 return Err(ActorError::Functional {
670 description: e.to_string(),
671 });
672 }
673
674 self.create(ctx, first.gov_version).await?;
675
676 self.on_event(first.clone(), ctx).await;
677 self.record_visibility_event(ctx, &first).await?;
678
679 Self::register(
680 ctx,
681 RegisterMessage::RegisterSubj {
682 gov_id: self.governance_id.to_string(),
683 subject_id: self.subject_metadata.subject_id.to_string(),
684 schema_id: self.subject_metadata.schema_id.clone(),
685 namespace: self.namespace.to_string(),
686 name: self.subject_metadata.name.clone(),
687 description: self.subject_metadata.description.clone(),
688 },
689 )
690 .await?;
691
692 let (issuer, event_request_timestamp) =
693 first.get_issuer_event_request_timestamp();
694 let event_request = first.get_event_request();
695
696 Self::event_to_sink(
697 ctx,
698 DataForSink {
699 gov_id: Some(self.governance_id.to_string()),
700 subject_id: self.subject_metadata.subject_id.to_string(),
701 sn: self.subject_metadata.sn,
702 owner: self.subject_metadata.owner.to_string(),
703 namespace: self.namespace.to_string(),
704 schema_id: self.subject_metadata.schema_id.clone(),
705 issuer,
706 event_ledger_timestamp: first
707 .ledger_seal_signature
708 .timestamp
709 .as_nanos(),
710 event_request_timestamp,
711 gov_version: first.gov_version,
712 event_data_ledger: EventLedgerDataForSink::build(
713 &first.protocols,
714 &self.properties.0,
715 ),
716 },
717 event_request,
718 )
719 .await?;
720
721 first
722 };
723
724 pending.extend(iter);
725
726 for event in pending {
727 let actual_ledger_hash =
728 last_ledger.ledger_hash(*hash).map_err(|e| {
729 ActorError::FunctionalCritical {
730 description: format!(
731 "Can not creacte actual ledger event hash: {}",
732 e
733 ),
734 }
735 })?;
736
737 let last_data = LastData {
738 gov_version: last_ledger.gov_version,
739 vali_data: last_ledger.protocols.get_validation_data(),
740 };
741
742 let last_gov_version = last_data.gov_version;
743
744 let last_event_is_ok = match Self::verify_new_ledger_event(
745 ctx,
746 Self::verify_new_ledger_event_args(
747 &event,
748 Metadata::from(self.clone()),
749 actual_ledger_hash,
750 last_data,
751 hash,
752 self.is_full(),
753 self.only_clear_events,
754 ),
755 )
756 .await
757 {
758 Ok(last_event_is_ok) => last_event_is_ok,
759 Err(e) => {
760 if matches!(e, SubjectError::InvalidSequenceNumber { .. }) {
762 continue;
764 } else {
765 return Err(ActorError::Functional {
766 description: e.to_string(),
767 });
768 }
769 }
770 };
771
772 let event_gov_version = event.gov_version;
773
774 let event_request = event.get_event_request();
775
776 if last_event_is_ok {
777 if last_gov_version != event_gov_version {
778 self.register_gov_version_sn(ctx, last_gov_version).await?;
779 }
780 if let Some(event_request) = &event_request {
781 match event_request {
782 EventRequest::Transfer(transfer_request) => {
783 self.transfer(
784 ctx,
785 transfer_request.new_owner.clone(),
786 event.gov_version,
787 )
788 .await?;
789 }
790 EventRequest::Reject(..) => {
791 self.reject(ctx, event.gov_version).await?;
792 }
793 EventRequest::Confirm(..) => {
794 self.confirm(
795 ctx,
796 event.ledger_seal_signature.signer.clone(),
797 event.gov_version,
798 )
799 .await?;
800 }
801 EventRequest::EOL(..) => {
802 self.eol(ctx).await?;
803
804 Self::register(
805 ctx,
806 RegisterMessage::EOLSubj {
807 gov_id: self.governance_id.to_string(),
808 subj_id: self
809 .subject_metadata
810 .subject_id
811 .to_string(),
812 },
813 )
814 .await?
815 }
816 _ => {}
817 };
818 }
819 }
820
821 self.on_event(event.clone(), ctx).await;
823 self.record_visibility_event(ctx, &event).await?;
824
825 let (issuer, event_request_timestamp) =
826 event.get_issuer_event_request_timestamp();
827 Self::event_to_sink(
828 ctx,
829 DataForSink {
830 gov_id: Some(self.governance_id.to_string()),
831 subject_id: self.subject_metadata.subject_id.to_string(),
832 sn: self.subject_metadata.sn,
833 owner: self.subject_metadata.owner.to_string(),
834 namespace: self.namespace.to_string(),
835 schema_id: self.subject_metadata.schema_id.clone(),
836 issuer,
837 event_ledger_timestamp: event
838 .ledger_seal_signature
839 .timestamp
840 .as_nanos(),
841 event_request_timestamp,
842 gov_version: event.gov_version,
843 event_data_ledger: EventLedgerDataForSink::build(
844 &event.protocols,
845 &self.properties.0,
846 ),
847 },
848 event_request,
849 )
850 .await?;
851
852 self.register_gov_version_sn(ctx, event_gov_version).await?;
857
858 last_ledger = event.clone();
860 }
861
862 Ok(())
863 }
864}
865
866#[derive(Debug, Clone)]
867pub enum TrackerMessage {
868 GetMetadata,
869 GetLedger { lo_sn: Option<u64>, hi_sn: u64 },
870 GetLastLedger,
871 PurgeStorage,
872 UpdateLedger { events: Vec<Ledger> },
873}
874
875impl Message for TrackerMessage {}
876
877#[derive(Debug, Clone)]
878pub enum TrackerResponse {
879 Metadata(Box<Metadata>),
881 UpdateResult(u64, PublicKey, Option<PublicKey>),
882 Ledger {
883 ledger: Vec<Ledger>,
884 is_all: bool,
885 },
886 LastLedger {
887 ledger_event: Box<Option<Ledger>>,
888 },
889 Sn(u64),
890 Ok,
891}
892impl Response for TrackerResponse {}
893
894#[async_trait]
895impl Actor for Tracker {
896 type Event = Ledger;
897 type Message = TrackerMessage;
898 type Response = TrackerResponse;
899
900 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
901 parent_span.map_or_else(
902 || info_span!("Tracker", id),
903 |parent_span| info_span!(parent: parent_span, "Tracker", id),
904 )
905 }
906
907 async fn pre_start(
908 &mut self,
909 ctx: &mut ActorContext<Self>,
910 ) -> Result<(), ActorError> {
911 if let Err(e) = self.init_store("tracker", None, true, ctx).await {
912 error!(
913 error = %e,
914 "Failed to initialize tracker store"
915 );
916 return Err(e);
917 }
918
919 let Some(config): Option<crate::system::ConfigHelper> =
920 ctx.system().get_helper("config").await
921 else {
922 return Err(ActorError::Helper {
923 name: "config".to_owned(),
924 reason: "Not found".to_owned(),
925 });
926 };
927
928 if config.safe_mode {
929 return Ok(());
930 }
931
932 let our_key = self.our_key.clone();
933
934 if self.subject_metadata.active {
935 let Some(ext_db): Option<Arc<ExternalDB>> =
936 ctx.system().get_helper("ext_db").await
937 else {
938 error!("External database helper not found");
939 return Err(ActorError::Helper {
940 name: "ext_db".to_owned(),
941 reason: "Not found".to_owned(),
942 });
943 };
944
945 let Some(ave_sink): Option<AveSink> =
946 ctx.system().get_helper("sink").await
947 else {
948 error!("Sink helper not found");
949 return Err(ActorError::Helper {
950 name: "sink".to_owned(),
951 reason: "Not found".to_owned(),
952 });
953 };
954
955 let sink_actor = match ctx
956 .create_child(
957 "sink_data",
958 SinkData {
959 public_key: our_key.to_string(),
960 },
961 )
962 .await
963 {
964 Ok(actor) => actor,
965 Err(e) => {
966 error!(
967 error = %e,
968 "Failed to create sink_data child"
969 );
970 return Err(e);
971 }
972 };
973 let sink =
974 Sink::new(sink_actor.subscribe(), ext_db.get_sink_data());
975 ctx.system().run_sink(sink).await;
976
977 let sink = Sink::new(sink_actor.subscribe(), ave_sink.clone());
978 ctx.system().run_sink(sink).await;
979 }
980
981 Ok(())
982 }
983}
984
985#[async_trait]
986impl Handler<Self> for Tracker {
987 async fn handle_message(
988 &mut self,
989 _sender: ActorPath,
990 msg: TrackerMessage,
991 ctx: &mut ActorContext<Self>,
992 ) -> Result<TrackerResponse, ActorError> {
993 match msg {
994 TrackerMessage::GetLedger { lo_sn, hi_sn } => {
995 let (ledger, is_all) =
996 self.get_ledger(ctx, lo_sn, hi_sn).await?;
997 Ok(TrackerResponse::Ledger { ledger, is_all })
998 }
999 TrackerMessage::GetLastLedger => {
1000 let ledger_event = self.get_last_ledger(ctx).await?;
1001 Ok(TrackerResponse::LastLedger {
1002 ledger_event: Box::new(ledger_event),
1003 })
1004 }
1005 TrackerMessage::GetMetadata => Ok(TrackerResponse::Metadata(
1006 Box::new(Metadata::from(self.clone())),
1007 )),
1008 TrackerMessage::PurgeStorage => {
1009 purge_storage(ctx).await?;
1010
1011 debug!(
1012 msg_type = "PurgeStorage",
1013 subject_id = %self.subject_metadata.subject_id,
1014 "Tracker storage purged"
1015 );
1016
1017 Ok(TrackerResponse::Ok)
1018 }
1019 TrackerMessage::UpdateLedger { events } => {
1020 let events_count = events.len();
1021 if let Err(e) =
1022 self.manager_new_ledger_events(ctx, events).await
1023 {
1024 warn!(
1025 msg_type = "UpdateLedger",
1026 error = %e,
1027 subject_id = %self.subject_metadata.subject_id,
1028 events_count = events_count,
1029 "Failed to verify new ledger events"
1030 );
1031 return Err(e);
1032 };
1033
1034 debug!(
1035 msg_type = "UpdateLedger",
1036 subject_id = %self.subject_metadata.subject_id,
1037 sn = self.subject_metadata.sn,
1038 events_count = events_count,
1039 "Ledger updated successfully"
1040 );
1041
1042 Ok(TrackerResponse::UpdateResult(
1043 self.subject_metadata.sn,
1044 self.subject_metadata.owner.clone(),
1045 self.subject_metadata.new_owner.clone(),
1046 ))
1047 }
1048 }
1049 }
1050
1051 async fn on_event(&mut self, event: Ledger, ctx: &mut ActorContext<Self>) {
1052 if let Err(e) = self.persist(&event, ctx).await {
1053 error!(
1054 error = %e,
1055 subject_id = %self.subject_metadata.subject_id,
1056 sn = self.subject_metadata.sn,
1057 "Failed to persist event"
1058 );
1059 emit_fail(ctx, e).await;
1060 };
1061
1062 if let Err(e) = ctx.publish_event(event.clone()).await {
1063 error!(
1064 error = %e,
1065 subject_id = %self.subject_metadata.subject_id,
1066 sn = self.subject_metadata.sn,
1067 "Failed to publish event"
1068 );
1069 emit_fail(ctx, e).await;
1070 } else {
1071 debug!(
1072 subject_id = %self.subject_metadata.subject_id,
1073 sn = self.subject_metadata.sn,
1074 "Event persisted and published successfully"
1075 );
1076 }
1077 }
1078
1079 async fn on_child_fault(
1080 &mut self,
1081 error: ActorError,
1082 ctx: &mut ActorContext<Self>,
1083 ) -> ChildAction {
1084 error!(
1085 subject_id = %self.subject_metadata.subject_id,
1086 sn = self.subject_metadata.sn,
1087 error = %error,
1088 "Child fault in tracker"
1089 );
1090 emit_fail(ctx, error).await;
1091 ChildAction::Stop
1092 }
1093}
1094
1095pub struct InitParamsTracker {
1096 pub data: Option<TrackerInit>,
1097 pub public_key: Arc<PublicKey>,
1098 pub hash: HashAlgorithm,
1099 pub is_service: bool,
1100 pub only_clear_events: bool,
1101}
1102
1103#[async_trait]
1104impl PersistentActor for Tracker {
1105 type Persistence = FullPersistence;
1106 type InitParams = InitParamsTracker;
1107
1108 fn update(&mut self, state: Self) {
1109 self.properties = state.properties;
1110 self.visibility_mode = state.visibility_mode;
1111 self.governance_id = state.governance_id;
1112 self.namespace = state.namespace;
1113 self.genesis_gov_version = state.genesis_gov_version;
1114 self.subject_metadata = state.subject_metadata;
1115 }
1116
1117 fn create_initial(params: Self::InitParams) -> Self {
1118 let init = params.data.unwrap_or_default();
1119
1120 Self {
1121 service: params.is_service,
1122 only_clear_events: params.only_clear_events,
1123 hash: Some(params.hash),
1124 our_key: params.public_key,
1125 subject_metadata: init.subject_metadata,
1126 properties: init.properties,
1127 genesis_gov_version: init.genesis_gov_version,
1128 governance_id: init.governance_id,
1129 namespace: init.namespace,
1130 visibility_mode: TrackerVisibilityMode::Full,
1131 }
1132 }
1133
1134 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
1135 match &event.protocols {
1136 Protocols::Create {
1137 validation,
1138 event_request,
1139 } => {
1140 if let EventRequest::Create(..) = event_request.content() {
1141 } else {
1142 error!(
1143 event_type = "Create",
1144 subject_id = %self.subject_metadata.subject_id,
1145 actual_request = ?event_request.content(),
1146 "Unexpected event request type for tracker create apply"
1147 );
1148 return Err(ActorError::Functional {
1149 description:
1150 "In create event, event request must be Create"
1151 .to_owned(),
1152 });
1153 }
1154
1155 if let ValidationMetadata::Metadata(metadata) =
1156 &validation.validation_metadata
1157 {
1158 self.subject_metadata = SubjectMetadata::new(metadata);
1159 self.properties = metadata.properties.clone();
1160 self.visibility_mode = TrackerVisibilityMode::Full;
1161
1162 debug!(
1163 event_type = "Create",
1164 subject_id = %self.subject_metadata.subject_id,
1165 sn = self.subject_metadata.sn,
1166 "Applied create event"
1167 );
1168 } else {
1169 error!(
1170 event_type = "Create",
1171 "Validation metadata must be Metadata type"
1172 );
1173 return Err(ActorError::Functional { description: "In create event, validation metadata must be a Metadata".to_owned() });
1174 }
1175
1176 return Ok(());
1177 }
1178 Protocols::TrackerFactFull {
1179 evaluation,
1180 event_request,
1181 ..
1182 } => {
1183 let EventRequest::Fact(_fact_request) = event_request.content()
1184 else {
1185 error!(
1186 event_type = "Fact",
1187 subject_id = %self.subject_metadata.subject_id,
1188 actual_request = ?event_request.content(),
1189 "Unexpected event request type for tracker fact apply"
1190 );
1191 return Err(ActorError::Functional {
1192 description:
1193 "In fact event, event request must be Fact"
1194 .to_owned(),
1195 });
1196 };
1197
1198 if let Some(eval_res) = evaluation.evaluator_response_ok() {
1199 if self.is_full() {
1200 self.apply_patch(eval_res.patch)?;
1201 debug!(
1202 event_type = "Fact",
1203 subject_id = %self.subject_metadata.subject_id,
1204 "Applied fact event with patch"
1205 );
1206 } else {
1207 debug!(
1208 event_type = "Fact",
1209 subject_id = %self.subject_metadata.subject_id,
1210 "Tracker is not in full mode, fact patch not applied"
1211 );
1212 }
1213 }
1214 }
1215 Protocols::TrackerFactOpaque { evaluation, .. } => {
1216 if evaluation.is_ok() {
1217 self.visibility_mode = TrackerVisibilityMode::Opaque;
1218 }
1219 debug!(
1220 event_type = "FactOpaque",
1221 subject_id = %self.subject_metadata.subject_id,
1222 "Applied tracker opaque fact event"
1223 );
1224 }
1225 Protocols::Transfer {
1226 evaluation,
1227 event_request,
1228 ..
1229 } => {
1230 let EventRequest::Transfer(transfer_request) =
1231 event_request.content()
1232 else {
1233 error!(
1234 event_type = "Transfer",
1235 subject_id = %self.subject_metadata.subject_id,
1236 actual_request = ?event_request.content(),
1237 "Unexpected event request type for tracker transfer apply"
1238 );
1239 return Err(ActorError::Functional {
1240 description:
1241 "In transfer event, event request must be Transfer"
1242 .to_owned(),
1243 });
1244 };
1245
1246 if evaluation.evaluator_response_ok().is_some() {
1247 if self.is_full()
1248 && let Some(eval_res) =
1249 evaluation.evaluator_response_ok()
1250 {
1251 self.apply_patch(eval_res.patch)?;
1252 } else if !self.is_full() {
1253 debug!(
1254 event_type = "Transfer",
1255 subject_id = %self.subject_metadata.subject_id,
1256 "Tracker is not in full mode, transfer patch not applied"
1257 );
1258 }
1259 self.subject_metadata.new_owner =
1260 Some(transfer_request.new_owner.clone());
1261 debug!(
1262 event_type = "Transfer",
1263 subject_id = %self.subject_metadata.subject_id,
1264 new_owner = %transfer_request.new_owner,
1265 "Applied transfer event"
1266 );
1267 }
1268 }
1269 Protocols::TrackerConfirm { event_request, .. } => {
1270 if let EventRequest::Confirm(..) = event_request.content() {
1271 } else {
1272 error!(
1273 event_type = "Confirm",
1274 subject_id = %self.subject_metadata.subject_id,
1275 actual_request = ?event_request.content(),
1276 "Unexpected event request type for tracker confirm apply"
1277 );
1278 return Err(ActorError::Functional {
1279 description:
1280 "In confirm event, event request must be Confirm"
1281 .to_owned(),
1282 });
1283 }
1284
1285 if let Some(new_owner) = self.subject_metadata.new_owner.take()
1286 {
1287 self.subject_metadata.owner = new_owner.clone();
1288 debug!(
1289 event_type = "Confirm",
1290 subject_id = %self.subject_metadata.subject_id,
1291 new_owner = %new_owner,
1292 "Applied confirm event"
1293 );
1294 } else {
1295 error!(
1296 event_type = "Confirm",
1297 subject_id = %self.subject_metadata.subject_id,
1298 "New owner is None in confirm event"
1299 );
1300 return Err(ActorError::Functional {
1301 description: "In confirm event, new owner is None"
1302 .to_owned(),
1303 });
1304 }
1305 }
1306 Protocols::Reject { event_request, .. } => {
1307 if let EventRequest::Reject(..) = event_request.content() {
1308 } else {
1309 error!(
1310 event_type = "Reject",
1311 subject_id = %self.subject_metadata.subject_id,
1312 actual_request = ?event_request.content(),
1313 "Unexpected event request type for tracker reject apply"
1314 );
1315 return Err(ActorError::Functional {
1316 description:
1317 "In reject event, event request must be Reject"
1318 .to_owned(),
1319 });
1320 }
1321
1322 self.subject_metadata.new_owner = None;
1323 debug!(
1324 event_type = "Reject",
1325 subject_id = %self.subject_metadata.subject_id,
1326 "Applied reject event"
1327 );
1328 }
1329 Protocols::EOL { event_request, .. } => {
1330 if let EventRequest::EOL(..) = event_request.content() {
1331 } else {
1332 error!(
1333 event_type = "EOL",
1334 subject_id = %self.subject_metadata.subject_id,
1335 actual_request = ?event_request.content(),
1336 "Unexpected event request type for tracker eol apply"
1337 );
1338 return Err(ActorError::Functional {
1339 description: "In EOL event, event request must be EOL"
1340 .to_owned(),
1341 });
1342 }
1343
1344 self.subject_metadata.active = false;
1345 debug!(
1346 event_type = "EOL",
1347 subject_id = %self.subject_metadata.subject_id,
1348 "Applied EOL event"
1349 );
1350 }
1351 _ => {
1352 error!(
1353 subject_id = %self.subject_metadata.subject_id,
1354 "Invalid protocol data for Tracker"
1355 );
1356 return Err(ActorError::Functional {
1357 description:
1358 "Protocols data is for Governance and this is a Tracker"
1359 .to_owned(),
1360 });
1361 }
1362 }
1363
1364 self.subject_metadata.sn += 1;
1365 self.subject_metadata.prev_ledger_event_hash =
1366 event.prev_ledger_event_hash.clone();
1367
1368 Ok(())
1369 }
1370}
1371
1372impl Storable for Tracker {}