1use std::sync::Arc;
2
3use crate::{
4 db::Storable,
5 governance::{
6 sn_register::{SnRegister, SnRegisterMessage},
7 subject_register::{SubjectRegister, SubjectRegisterMessage},
8 witnesses_register::{WitnessesRegister, WitnessesRegisterMessage},
9 },
10 helpers::{db::ExternalDB, sink::AveSink},
11 model::{
12 common::{emit_fail, get_last_event, purge_storage},
13 event::{Protocols, ValidationMetadata},
14 },
15 node::{Node, NodeMessage, TransferSubject, register::RegisterMessage},
16 subject::{
17 DataForSink, EventLedgerDataForSink, Metadata, SignedLedger, Subject,
18 SubjectMetadata,
19 error::SubjectError,
20 sinkdata::{SinkData, SinkDataMessage},
21 },
22 validation::request::LastData,
23};
24
25use ave_actors::{
26 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
27 Response, Sink,
28};
29use ave_common::{
30 Namespace, ValueWrapper,
31 identity::{DigestIdentifier, HashAlgorithm, PublicKey, hash_borsh},
32 request::EventRequest,
33};
34
35use async_trait::async_trait;
36use ave_actors::{FullPersistence, PersistentActor};
37use borsh::{BorshDeserialize, BorshSerialize};
38use json_patch::{Patch, patch};
39use serde::{Deserialize, Serialize};
40use tracing::{Span, debug, error, info_span, warn};
41
/// State of a tracker subject actor.
///
/// The three `#[serde(skip)]` fields are runtime-only: the hand-written Borsh
/// encoding of this type also leaves them out, and Borsh decoding resets them
/// to a default key, `false` and `None` respectively.
#[derive(Default, Debug, Serialize, Deserialize, Clone)]
pub struct Tracker {
    /// Key compared against the subject owner (for example to compute the
    /// `i_owner` flag sent to the node on EOL).
    #[serde(skip)]
    pub our_key: Arc<PublicKey>,
    /// When `true`, the subject is registered in the subject register even if
    /// `our_key` is not the owner.
    #[serde(skip)]
    pub service: bool,
    /// Hash algorithm used when verifying incoming ledger events; processing
    /// new ledger events fails while this is `None`.
    #[serde(skip)]
    pub hash: Option<HashAlgorithm>,

    /// Subject-level metadata (subject id, owner, sn, ...).
    pub subject_metadata: SubjectMetadata,
    /// Governance this subject belongs to; used to build the paths of the
    /// register actors under `/user/node/subject_manager/<governance_id>/`.
    pub governance_id: DigestIdentifier,
    /// Namespace of the subject.
    pub namespace: Namespace,
    /// Governance version copied from the subject `Metadata` at creation.
    pub genesis_gov_version: u64,
    /// JSON state of the subject; fact events patch this value.
    pub properties: ValueWrapper,
}
60
/// Persistable part of a [`Tracker`] used to seed a new actor.
///
/// It carries the same five fields that `Tracker` persists; the runtime-only
/// fields (`our_key`, `service`, `hash`) are supplied separately through
/// `InitParamsTracker`.
#[derive(Default)]
pub struct TrackerInit {
    /// Subject-level metadata.
    pub subject_metadata: SubjectMetadata,
    /// Governance the subject belongs to.
    pub governance_id: DigestIdentifier,
    /// Namespace of the subject.
    pub namespace: Namespace,
    /// Governance version recorded at genesis.
    pub genesis_gov_version: u64,
    /// Initial JSON state of the subject.
    pub properties: ValueWrapper,
}
69
70impl From<&Metadata> for TrackerInit {
71 fn from(value: &Metadata) -> Self {
72 Self {
73 subject_metadata: SubjectMetadata::new(value),
74 governance_id: value.governance_id.clone(),
75 namespace: value.namespace.clone(),
76 genesis_gov_version: value.genesis_gov_version,
77 properties: value.properties.clone(),
78 }
79 }
80}
81
82impl BorshSerialize for Tracker {
83 fn serialize<W: std::io::Write>(
84 &self,
85 writer: &mut W,
86 ) -> std::io::Result<()> {
87 BorshSerialize::serialize(&self.subject_metadata, writer)?;
89 BorshSerialize::serialize(&self.governance_id, writer)?;
90 BorshSerialize::serialize(&self.namespace, writer)?;
91 BorshSerialize::serialize(&self.genesis_gov_version, writer)?;
92 BorshSerialize::serialize(&self.properties, writer)?;
93
94 Ok(())
95 }
96}
97
98impl BorshDeserialize for Tracker {
99 fn deserialize_reader<R: std::io::Read>(
100 reader: &mut R,
101 ) -> std::io::Result<Self> {
102 let subject_metadata = SubjectMetadata::deserialize_reader(reader)?;
104 let governance_id = DigestIdentifier::deserialize_reader(reader)?;
105 let namespace = Namespace::deserialize_reader(reader)?;
106 let genesis_gov_version = u64::deserialize_reader(reader)?;
107 let properties = ValueWrapper::deserialize_reader(reader)?;
108
109 let our_key = Arc::new(PublicKey::default());
112 let hash = None;
113
114 Ok(Self {
115 service: false,
116 hash,
117 our_key,
118 subject_metadata,
119 governance_id,
120 namespace,
121 genesis_gov_version,
122 properties,
123 })
124 }
125}
126
127#[async_trait]
128impl Subject for Tracker {
129 async fn update_sn(
130 &self,
131 ctx: &mut ActorContext<Self>,
132 ) -> Result<(), ActorError> {
133 let witnesses_register = ctx
134 .system()
135 .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
136 "/user/node/subject_manager/{}/witnesses_register",
137 self.governance_id
138 )))
139 .await?;
140 witnesses_register
141 .tell(WitnessesRegisterMessage::UpdateSn {
142 subject_id: self.subject_metadata.subject_id.clone(),
143 sn: self.subject_metadata.sn,
144 })
145 .await
146 }
147
148 async fn eol(
149 &self,
150 ctx: &mut ActorContext<Self>,
151 ) -> Result<(), ActorError> {
152 let node_path = ActorPath::from("/user/node");
153 let node = ctx.system().get_actor::<Node>(&node_path).await?;
154 node.tell(NodeMessage::EOLSubject {
155 subject_id: self.subject_metadata.subject_id.clone(),
156 i_owner: *self.our_key == self.subject_metadata.owner,
157 })
158 .await
159 }
160
161 async fn reject(
162 &self,
163 ctx: &mut ActorContext<Self>,
164 gov_version: u64,
165 ) -> Result<(), ActorError> {
166 let node_path = ActorPath::from("/user/node");
167 let node = ctx.system().get_actor::<Node>(&node_path).await?;
168 node.tell(NodeMessage::RejectTransfer(
169 self.subject_metadata.subject_id.clone(),
170 ))
171 .await?;
172
173 let witnesses_register = ctx
174 .system()
175 .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
176 "/user/node/subject_manager/{}/witnesses_register",
177 self.governance_id
178 )))
179 .await?;
180 witnesses_register
181 .tell(WitnessesRegisterMessage::Reject {
182 subject_id: self.subject_metadata.subject_id.clone(),
183 sn: self.subject_metadata.sn + 1,
184 gov_version,
185 })
186 .await
187 }
188
189 async fn confirm(
190 &self,
191 ctx: &mut ActorContext<Self>,
192 new_owner: PublicKey,
193 gov_version: u64,
194 ) -> Result<(), ActorError> {
195 let node_path = ActorPath::from("/user/node");
196 let node = ctx.system().get_actor::<Node>(&node_path).await?;
197 node.tell(NodeMessage::ConfirmTransfer(
198 self.subject_metadata.subject_id.clone(),
199 ))
200 .await?;
201
202 if self.service || *self.our_key == self.subject_metadata.owner {
203 let subject_register = ctx
204 .system()
205 .get_actor::<SubjectRegister>(&ActorPath::from(&format!(
206 "/user/node/subject_manager/{}/subject_register",
207 self.governance_id
208 )))
209 .await?;
210
211 let _response = subject_register
212 .ask(SubjectRegisterMessage::UpdateSubject {
213 new_owner,
214 old_owner: self.subject_metadata.owner.clone(),
215 subject_id: self.subject_metadata.subject_id.clone(),
216 namespace: self.namespace.to_string(),
217 schema_id: self.subject_metadata.schema_id.clone(),
218 gov_version,
219 })
220 .await?;
221 }
222
223 let witnesses_register = ctx
224 .system()
225 .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
226 "/user/node/subject_manager/{}/witnesses_register",
227 self.governance_id
228 )))
229 .await?;
230 witnesses_register
231 .tell(WitnessesRegisterMessage::Confirm {
232 subject_id: self.subject_metadata.subject_id.clone(),
233 sn: self.subject_metadata.sn + 1,
234 gov_version,
235 })
236 .await
237 }
238
239 async fn transfer(
240 &self,
241 ctx: &mut ActorContext<Self>,
242 new_owner: PublicKey,
243 gov_version: u64,
244 ) -> Result<(), ActorError> {
245 let node_path = ActorPath::from("/user/node");
246 let node = ctx.system().get_actor::<Node>(&node_path).await?;
247 node.tell(NodeMessage::TransferSubject(TransferSubject {
248 name: self.subject_metadata.name.clone(),
249 subject_id: self.subject_metadata.subject_id.clone(),
250 new_owner: new_owner.clone(),
251 actual_owner: self.subject_metadata.owner.clone(),
252 }))
253 .await?;
254
255 let witnesses_register = ctx
256 .system()
257 .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
258 "/user/node/subject_manager/{}/witnesses_register",
259 self.governance_id
260 )))
261 .await?;
262 witnesses_register
263 .tell(WitnessesRegisterMessage::Transfer {
264 subject_id: self.subject_metadata.subject_id.clone(),
265 new_owner,
266 gov_version,
267 })
268 .await
269 }
270
271 async fn get_last_ledger(
272 &self,
273 ctx: &mut ActorContext<Self>,
274 ) -> Result<Option<SignedLedger>, ActorError> {
275 get_last_event(ctx).await
276 }
277
278 fn apply_patch(
279 &mut self,
280 json_patch: ValueWrapper,
281 ) -> Result<(), ActorError> {
282 let patch_json = serde_json::from_value::<Patch>(json_patch.0)
283 .map_err(|e| {
284 let error = SubjectError::PatchConversionFailed {
285 details: e.to_string(),
286 };
287 error!(
288 error = %e,
289 subject_id = %self.subject_metadata.subject_id,
290 "Failed to convert patch from JSON"
291 );
292 ActorError::Functional {
293 description: error.to_string(),
294 }
295 })?;
296
297 patch(&mut self.properties.0, &patch_json).map_err(|e| {
298 let error = SubjectError::PatchApplicationFailed {
299 details: e.to_string(),
300 };
301 error!(
302 error = %e,
303 subject_id = %self.subject_metadata.subject_id,
304 "Failed to apply patch to properties"
305 );
306 ActorError::Functional {
307 description: error.to_string(),
308 }
309 })?;
310
311 debug!(
312 subject_id = %self.subject_metadata.subject_id,
313 "Patch applied successfully"
314 );
315
316 Ok(())
317 }
318
319 async fn manager_new_ledger_events(
320 &mut self,
321 ctx: &mut ActorContext<Self>,
322 events: Vec<SignedLedger>,
323 ) -> Result<(), ActorError> {
324 let Some(hash) = self.hash else {
325 return Err(ActorError::FunctionalCritical {
326 description: "Can not obtain Hash".to_string(),
327 });
328 };
329
330 let current_sn = self.subject_metadata.sn;
331
332 if let Err(e) = self.verify_new_ledger_events(ctx, events, &hash).await
333 {
334 if let ActorError::Functional { description } = e.clone() {
335 warn!(
336 error = %description,
337 subject_id = %self.subject_metadata.subject_id,
338 sn = self.subject_metadata.sn,
339 "Error verifying new ledger events"
340 );
341
342 if self.subject_metadata.sn == 0 {
344 return Err(e);
345 }
346 } else {
347 error!(
348 error = %e,
349 subject_id = %self.subject_metadata.subject_id,
350 sn = self.subject_metadata.sn,
351 "Critical error verifying new ledger events"
352 );
353 return Err(e);
354 }
355 };
356
357 if current_sn < self.subject_metadata.sn || current_sn == 0 {
358 Self::publish_sink(
359 ctx,
360 SinkDataMessage::UpdateState(Box::new(Metadata::from(
361 self.clone(),
362 ))),
363 )
364 .await?;
365
366 self.update_sn(ctx).await?;
367 }
368
369 Ok(())
370 }
371}
372
373impl Tracker {
374 async fn create(
375 &self,
376 ctx: &ActorContext<Self>,
377 gov_version: u64,
378 ) -> Result<(), ActorError> {
379 let sn_register = ctx
380 .system()
381 .get_actor::<SnRegister>(&ActorPath::from(format!(
382 "/user/node/subject_manager/{}/sn_register",
383 self.governance_id
384 )))
385 .await?;
386
387 sn_register
388 .tell(SnRegisterMessage::RegisterSn {
389 subject_id: self.subject_metadata.subject_id.clone(),
390 gov_version,
391 sn: 0,
392 })
393 .await?;
394
395 if self.service || *self.our_key == self.subject_metadata.owner {
396 let subject_register = ctx
397 .system()
398 .get_actor::<SubjectRegister>(&ActorPath::from(&format!(
399 "/user/node/subject_manager/{}/subject_register",
400 self.governance_id
401 )))
402 .await?;
403
404 let _response = subject_register
405 .ask(SubjectRegisterMessage::CreateSubject {
406 creator: self.subject_metadata.owner.clone(),
407 subject_id: self.subject_metadata.subject_id.clone(),
408 namespace: self.namespace.to_string(),
409 schema_id: self.subject_metadata.schema_id.clone(),
410 gov_version,
411 })
412 .await?;
413 }
414
415 let witnesses_register = ctx
416 .system()
417 .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
418 "/user/node/subject_manager/{}/witnesses_register",
419 self.governance_id
420 )))
421 .await?;
422
423 witnesses_register
424 .tell(WitnessesRegisterMessage::Create {
425 subject_id: self.subject_metadata.subject_id.clone(),
426 gov_version,
427 owner: self.subject_metadata.owner.clone(),
428 })
429 .await
430 }
431
432 async fn register_gov_version_sn(
433 &self,
434 ctx: &ActorContext<Self>,
435 gov_version: u64,
436 ) -> Result<(), ActorError> {
437 let sn_register = ctx
438 .system()
439 .get_actor::<SnRegister>(&ActorPath::from(format!(
440 "/user/node/subject_manager/{}/sn_register",
441 self.governance_id
442 )))
443 .await?;
444
445 sn_register
446 .tell(SnRegisterMessage::RegisterSn {
447 subject_id: self.subject_metadata.subject_id.clone(),
448 gov_version,
449 sn: self.subject_metadata.sn,
450 })
451 .await
452 }
453
454 async fn verify_new_ledger_events(
455 &mut self,
456 ctx: &mut ActorContext<Self>,
457 events: Vec<SignedLedger>,
458 hash: &HashAlgorithm,
459 ) -> Result<(), ActorError> {
460 let mut iter = events.into_iter();
461 let last_ledger = get_last_event(ctx).await?;
462
463 let Some(first) = iter.next() else {
464 return Ok(());
465 };
466
467 let mut pending = Vec::new();
468
469 let mut last_ledger = if let Some(last_ledger) = last_ledger {
470 pending.push(first);
471 last_ledger
472 } else {
473 if let Err(e) = Self::verify_first_ledger_event(
474 ctx,
475 &first,
476 hash,
477 Metadata::from(self.clone()),
478 )
479 .await
480 {
481 return Err(ActorError::Functional {
482 description: e.to_string(),
483 });
484 }
485
486 self.create(ctx, first.content().gov_version).await?;
487
488 self.on_event(first.clone(), ctx).await;
489
490 Self::register(
491 ctx,
492 RegisterMessage::RegisterSubj {
493 gov_id: self.governance_id.to_string(),
494 subject_id: self.subject_metadata.subject_id.to_string(),
495 schema_id: self.subject_metadata.schema_id.clone(),
496 namespace: self.namespace.to_string(),
497 name: self.subject_metadata.name.clone(),
498 description: self.subject_metadata.description.clone(),
499 },
500 )
501 .await?;
502
503 Self::event_to_sink(
504 ctx,
505 DataForSink {
506 gov_id: Some(self.governance_id.to_string()),
507 subject_id: self.subject_metadata.subject_id.to_string(),
508 sn: self.subject_metadata.sn,
509 owner: self.subject_metadata.owner.to_string(),
510 namespace: self.namespace.to_string(),
511 schema_id: self.subject_metadata.schema_id.clone(),
512 issuer: first
513 .content()
514 .event_request
515 .signature()
516 .signer
517 .to_string(),
518 event_ledger_timestamp: first
519 .signature()
520 .timestamp
521 .as_nanos(),
522 event_request_timestamp: first
523 .content()
524 .event_request
525 .signature()
526 .timestamp
527 .as_nanos(),
528 gov_version: first.content().gov_version,
529 event_data_ledger: EventLedgerDataForSink::build(
530 &first.content().protocols,
531 &self.properties.0,
532 ),
533 },
534 first.content().event_request.content(),
535 )
536 .await?;
537
538 first
539 };
540
541 pending.extend(iter);
542
543 for event in pending {
544 let actual_ledger_hash =
545 hash_borsh(&*hash.hasher(), &last_ledger.0).map_err(|e| {
546 ActorError::FunctionalCritical {
547 description: format!(
548 "Can not creacte actual ledger event hash: {}",
549 e
550 ),
551 }
552 })?;
553 let last_data = LastData {
554 gov_version: last_ledger.content().gov_version,
555 vali_data: last_ledger
556 .content()
557 .protocols
558 .get_validation_data(),
559 };
560
561 let last_gov_version = last_data.gov_version;
562
563 let last_event_is_ok = match Self::verify_new_ledger_event(
564 ctx,
565 &event,
566 Metadata::from(self.clone()),
567 actual_ledger_hash,
568 last_data,
569 hash,
570 )
571 .await
572 {
573 Ok(last_event_is_ok) => last_event_is_ok,
574 Err(e) => {
575 if matches!(e, SubjectError::InvalidSequenceNumber { .. }) {
577 continue;
579 } else {
580 return Err(ActorError::Functional {
581 description: e.to_string(),
582 });
583 }
584 }
585 };
586
587 let event_gov_version = event.content().gov_version;
588
589 if last_event_is_ok {
590 if last_gov_version != event_gov_version {
591 self.register_gov_version_sn(ctx, last_gov_version).await?;
592 }
593
594 match event.content().event_request.content().clone() {
595 EventRequest::Transfer(transfer_request) => {
596 self.transfer(
597 ctx,
598 transfer_request.new_owner,
599 event.content().gov_version,
600 )
601 .await?;
602 }
603 EventRequest::Reject(..) => {
604 self.reject(ctx, event.content().gov_version).await?;
605 }
606 EventRequest::Confirm(..) => {
607 self.confirm(
608 ctx,
609 event.signature().signer.clone(),
610 event.content().gov_version,
611 )
612 .await?;
613 }
614 EventRequest::EOL(..) => {
615 self.eol(ctx).await?;
616
617 Self::register(
618 ctx,
619 RegisterMessage::EOLSubj {
620 gov_id: self.governance_id.to_string(),
621 subj_id: self
622 .subject_metadata
623 .subject_id
624 .to_string(),
625 },
626 )
627 .await?
628 }
629 _ => {}
630 };
631
632 Self::event_to_sink(
633 ctx,
634 DataForSink {
635 gov_id: Some(self.governance_id.to_string()),
636 subject_id: self
637 .subject_metadata
638 .subject_id
639 .to_string(),
640 sn: self.subject_metadata.sn,
641 owner: self.subject_metadata.owner.to_string(),
642 namespace: self.namespace.to_string(),
643 schema_id: self.subject_metadata.schema_id.clone(),
644 issuer: event
645 .content()
646 .event_request
647 .signature()
648 .signer
649 .to_string(),
650 event_ledger_timestamp: event
651 .signature()
652 .timestamp
653 .as_nanos(),
654 event_request_timestamp: event
655 .content()
656 .event_request
657 .signature()
658 .timestamp
659 .as_nanos(),
660 gov_version: event.content().gov_version,
661 event_data_ledger: EventLedgerDataForSink::build(
662 &event.content().protocols,
663 &self.properties.0,
664 ),
665 },
666 event.content().event_request.content(),
667 )
668 .await?;
669 }
670
671 self.on_event(event.clone(), ctx).await;
673
674 self.register_gov_version_sn(ctx, event_gov_version).await?;
679
680 last_ledger = event.clone();
682 }
683
684 Ok(())
685 }
686}
687
/// Messages handled by the [`Tracker`] actor.
#[derive(Debug, Clone)]
pub enum TrackerMessage {
    /// Request the subject metadata built from the current tracker state;
    /// answered with `TrackerResponse::Metadata`.
    GetMetadata,
    /// Request ledger events for the given sn bounds (forwarded to
    /// `get_ledger`); answered with `TrackerResponse::Ledger`.
    GetLedger { lo_sn: Option<u64>, hi_sn: u64 },
    /// Request the last stored ledger event; answered with
    /// `TrackerResponse::LastLedger`.
    GetLastLedger,
    /// Purge the tracker's storage; answered with `TrackerResponse::Ok`.
    PurgeStorage,
    /// Verify and store new ledger events; answered with
    /// `TrackerResponse::UpdateResult`.
    UpdateLedger { events: Vec<SignedLedger> },
}
696
// Marks `TrackerMessage` as an actor message; it is the `Actor::Message` type
// of `Tracker`.
impl Message for TrackerMessage {}
698
/// Responses returned by the [`Tracker`] actor.
#[derive(Debug, Clone)]
pub enum TrackerResponse {
    /// Subject metadata built from the tracker state.
    Metadata(Box<Metadata>),
    /// Result of `UpdateLedger`: the sn, the owner and the pending new owner
    /// (if any) after processing the events.
    UpdateResult(u64, PublicKey, Option<PublicKey>),
    /// Ledger events returned by `get_ledger`, together with its `is_all`
    /// flag.
    Ledger {
        ledger: Vec<SignedLedger>,
        is_all: bool,
    },
    /// Last stored ledger event, or `None` when nothing is stored.
    LastLedger {
        ledger_event: Box<Option<SignedLedger>>,
    },
    /// A sequence number. Not produced by the handler in this file.
    Sn(u64),
    /// Acknowledgement without payload (used for `PurgeStorage`).
    Ok,
}
// Marks `TrackerResponse` as an actor response; it is the `Actor::Response`
// type of `Tracker`.
impl Response for TrackerResponse {}
715
716#[async_trait]
717impl Actor for Tracker {
718 type Event = SignedLedger;
719 type Message = TrackerMessage;
720 type Response = TrackerResponse;
721
722 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
723 parent_span.map_or_else(
724 || info_span!("Tracker", id),
725 |parent_span| info_span!(parent: parent_span, "Tracker", id),
726 )
727 }
728
729 async fn pre_start(
730 &mut self,
731 ctx: &mut ActorContext<Self>,
732 ) -> Result<(), ActorError> {
733 if let Err(e) = self.init_store("tracker", None, true, ctx).await {
734 error!(
735 error = %e,
736 "Failed to initialize tracker store"
737 );
738 return Err(e);
739 }
740
741 let Some(config): Option<crate::system::ConfigHelper> =
742 ctx.system().get_helper("config").await
743 else {
744 return Err(ActorError::Helper {
745 name: "config".to_owned(),
746 reason: "Not found".to_owned(),
747 });
748 };
749
750 if config.safe_mode {
751 return Ok(());
752 }
753
754 let our_key = self.our_key.clone();
755
756 if self.subject_metadata.active {
757 let Some(ext_db): Option<Arc<ExternalDB>> =
758 ctx.system().get_helper("ext_db").await
759 else {
760 error!("External database helper not found");
761 return Err(ActorError::Helper {
762 name: "ext_db".to_owned(),
763 reason: "Not found".to_owned(),
764 });
765 };
766
767 let Some(ave_sink): Option<AveSink> =
768 ctx.system().get_helper("sink").await
769 else {
770 error!("Sink helper not found");
771 return Err(ActorError::Helper {
772 name: "sink".to_owned(),
773 reason: "Not found".to_owned(),
774 });
775 };
776
777 let sink_actor = match ctx
778 .create_child(
779 "sink_data",
780 SinkData {
781 public_key: our_key.to_string(),
782 },
783 )
784 .await
785 {
786 Ok(actor) => actor,
787 Err(e) => {
788 error!(
789 error = %e,
790 "Failed to create sink_data child"
791 );
792 return Err(e);
793 }
794 };
795 let sink =
796 Sink::new(sink_actor.subscribe(), ext_db.get_sink_data());
797 ctx.system().run_sink(sink).await;
798
799 let sink = Sink::new(sink_actor.subscribe(), ave_sink.clone());
800 ctx.system().run_sink(sink).await;
801 }
802
803 Ok(())
804 }
805}
806
807#[async_trait]
808impl Handler<Self> for Tracker {
809 async fn handle_message(
810 &mut self,
811 _sender: ActorPath,
812 msg: TrackerMessage,
813 ctx: &mut ActorContext<Self>,
814 ) -> Result<TrackerResponse, ActorError> {
815 match msg {
816 TrackerMessage::GetLedger { lo_sn, hi_sn } => {
817 let (ledger, is_all) =
818 self.get_ledger(ctx, lo_sn, hi_sn).await?;
819 Ok(TrackerResponse::Ledger { ledger, is_all })
820 }
821 TrackerMessage::GetLastLedger => {
822 let ledger_event = self.get_last_ledger(ctx).await?;
823 Ok(TrackerResponse::LastLedger {
824 ledger_event: Box::new(ledger_event),
825 })
826 }
827 TrackerMessage::GetMetadata => Ok(TrackerResponse::Metadata(
828 Box::new(Metadata::from(self.clone())),
829 )),
830 TrackerMessage::PurgeStorage => {
831 purge_storage(ctx).await?;
832
833 debug!(
834 msg_type = "PurgeStorage",
835 subject_id = %self.subject_metadata.subject_id,
836 "Tracker storage purged"
837 );
838
839 Ok(TrackerResponse::Ok)
840 }
841 TrackerMessage::UpdateLedger { events } => {
842 let events_count = events.len();
843 if let Err(e) =
844 self.manager_new_ledger_events(ctx, events).await
845 {
846 warn!(
847 msg_type = "UpdateLedger",
848 error = %e,
849 subject_id = %self.subject_metadata.subject_id,
850 events_count = events_count,
851 "Failed to verify new ledger events"
852 );
853 return Err(e);
854 };
855
856 debug!(
857 msg_type = "UpdateLedger",
858 subject_id = %self.subject_metadata.subject_id,
859 sn = self.subject_metadata.sn,
860 events_count = events_count,
861 "Ledger updated successfully"
862 );
863
864 Ok(TrackerResponse::UpdateResult(
865 self.subject_metadata.sn,
866 self.subject_metadata.owner.clone(),
867 self.subject_metadata.new_owner.clone(),
868 ))
869 }
870 }
871 }
872
873 async fn on_event(
874 &mut self,
875 event: SignedLedger,
876 ctx: &mut ActorContext<Self>,
877 ) {
878 if let Err(e) = self.persist(&event, ctx).await {
879 error!(
880 error = %e,
881 subject_id = %self.subject_metadata.subject_id,
882 sn = self.subject_metadata.sn,
883 "Failed to persist event"
884 );
885 emit_fail(ctx, e).await;
886 };
887
888 if let Err(e) = ctx.publish_event(event.clone()).await {
889 error!(
890 error = %e,
891 subject_id = %self.subject_metadata.subject_id,
892 sn = self.subject_metadata.sn,
893 "Failed to publish event"
894 );
895 emit_fail(ctx, e).await;
896 } else {
897 debug!(
898 subject_id = %self.subject_metadata.subject_id,
899 sn = self.subject_metadata.sn,
900 "Event persisted and published successfully"
901 );
902 }
903 }
904
905 async fn on_child_fault(
906 &mut self,
907 error: ActorError,
908 ctx: &mut ActorContext<Self>,
909 ) -> ChildAction {
910 error!(
911 subject_id = %self.subject_metadata.subject_id,
912 sn = self.subject_metadata.sn,
913 error = %error,
914 "Child fault in tracker"
915 );
916 emit_fail(ctx, error).await;
917 ChildAction::Stop
918 }
919}
920
/// Parameters consumed by `Tracker::create_initial`.
pub struct InitParamsTracker {
    /// Persistable seed data; `TrackerInit::default()` is used when `None`.
    pub data: Option<TrackerInit>,
    /// Stored as the tracker's `our_key`.
    pub public_key: Arc<PublicKey>,
    /// Stored as the tracker's `hash` (wrapped in `Some`).
    pub hash: HashAlgorithm,
    /// Stored as the tracker's `service` flag.
    pub is_service: bool,
}
927
928#[async_trait]
929impl PersistentActor for Tracker {
930 type Persistence = FullPersistence;
931 type InitParams = InitParamsTracker;
932
933 fn update(&mut self, state: Self) {
934 self.properties = state.properties;
935 self.governance_id = state.governance_id;
936 self.namespace = state.namespace;
937 self.genesis_gov_version = state.genesis_gov_version;
938 self.subject_metadata = state.subject_metadata;
939 }
940
941 fn create_initial(params: Self::InitParams) -> Self {
942 let init = params.data.unwrap_or_default();
943
944 Self {
945 service: params.is_service,
946 hash: Some(params.hash),
947 our_key: params.public_key,
948 subject_metadata: init.subject_metadata,
949 properties: init.properties,
950 genesis_gov_version: init.genesis_gov_version,
951 governance_id: init.governance_id,
952 namespace: init.namespace,
953 }
954 }
955
956 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
957 match (
958 event.content().event_request.content(),
959 &event.content().protocols,
960 ) {
961 (EventRequest::Create(..), Protocols::Create { validation }) => {
962 if let ValidationMetadata::Metadata(metadata) =
963 &validation.validation_metadata
964 {
965 self.subject_metadata = SubjectMetadata::new(metadata);
966 self.properties = metadata.properties.clone();
967
968 debug!(
969 event_type = "Create",
970 subject_id = %self.subject_metadata.subject_id,
971 sn = self.subject_metadata.sn,
972 "Applied create event"
973 );
974 } else {
975 error!(
976 event_type = "Create",
977 "Validation metadata must be Metadata type"
978 );
979 return Err(ActorError::Functional { description: "In create event, validation metadata must be a Metadata".to_owned() });
980 }
981
982 return Ok(());
983 }
984 (
985 EventRequest::Fact(..),
986 Protocols::TrackerFact { evaluation, .. },
987 ) => {
988 if let Some(eval_res) = evaluation.evaluator_res() {
989 self.apply_patch(eval_res.patch)?;
990 debug!(
991 event_type = "Fact",
992 subject_id = %self.subject_metadata.subject_id,
993 "Applied fact event with patch"
994 );
995 }
996 }
997 (
998 EventRequest::Transfer(transfer_request),
999 Protocols::Transfer { evaluation, .. },
1000 ) => {
1001 if evaluation.evaluator_res().is_some() {
1002 self.subject_metadata.new_owner =
1003 Some(transfer_request.new_owner.clone());
1004 debug!(
1005 event_type = "Transfer",
1006 subject_id = %self.subject_metadata.subject_id,
1007 new_owner = %transfer_request.new_owner,
1008 "Applied transfer event"
1009 );
1010 }
1011 }
1012 (EventRequest::Confirm(..), Protocols::TrackerConfirm { .. }) => {
1013 if let Some(new_owner) = self.subject_metadata.new_owner.take()
1014 {
1015 self.subject_metadata.owner = new_owner.clone();
1016 debug!(
1017 event_type = "Confirm",
1018 subject_id = %self.subject_metadata.subject_id,
1019 new_owner = %new_owner,
1020 "Applied confirm event"
1021 );
1022 } else {
1023 error!(
1024 event_type = "Confirm",
1025 subject_id = %self.subject_metadata.subject_id,
1026 "New owner is None in confirm event"
1027 );
1028 return Err(ActorError::Functional {
1029 description: "In confirm event, new owner is None"
1030 .to_owned(),
1031 });
1032 }
1033 }
1034 (EventRequest::Reject(..), Protocols::Reject { .. }) => {
1035 self.subject_metadata.new_owner = None;
1036 debug!(
1037 event_type = "Reject",
1038 subject_id = %self.subject_metadata.subject_id,
1039 "Applied reject event"
1040 );
1041 }
1042 (EventRequest::EOL(..), Protocols::EOL { .. }) => {
1043 self.subject_metadata.active = false;
1044 debug!(
1045 event_type = "EOL",
1046 subject_id = %self.subject_metadata.subject_id,
1047 "Applied EOL event"
1048 );
1049 }
1050 _ => {
1051 error!(
1052 subject_id = %self.subject_metadata.subject_id,
1053 "Invalid protocol data for Tracker"
1054 );
1055 return Err(ActorError::Functional {
1056 description:
1057 "Protocols data is for Governance and this is a Tracker"
1058 .to_owned(),
1059 });
1060 }
1061 }
1062
1063 self.subject_metadata.sn += 1;
1064 self.subject_metadata.prev_ledger_event_hash =
1065 event.content().prev_ledger_event_hash.clone();
1066
1067 Ok(())
1068 }
1069}
1070
1071impl Storable for Tracker {}