1use std::sync::Arc;
2
3use crate::{
4 db::Storable,
5 governance::{
6 Governance, GovernanceMessage, GovernanceResponse,
7 data::GovernanceData,
8 sn_register::{SnRegister, SnRegisterMessage},
9 subject_register::{SubjectRegister, SubjectRegisterMessage},
10 witnesses_register::{WitnessesRegister, WitnessesRegisterMessage},
11 },
12 helpers::{db::ExternalDB, sink::AveSink},
13 model::{
14 common::{emit_fail, get_last_event},
15 event::{Protocols, ValidationMetadata},
16 },
17 node::{Node, NodeMessage, TransferSubject, register::RegisterMessage},
18 subject::{
19 DataForSink, EventLedgerDataForSink, Metadata, SignedLedger, Subject,
20 SubjectMetadata,
21 error::SubjectError,
22 sinkdata::{SinkData, SinkDataMessage},
23 },
24 validation::request::LastData,
25};
26
27use ave_actors::{
28 Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
29 Response, Sink,
30};
31use ave_common::{
32 Namespace, ValueWrapper,
33 identity::{DigestIdentifier, HashAlgorithm, PublicKey, hash_borsh},
34 request::EventRequest,
35};
36
37use async_trait::async_trait;
38use ave_actors::{FullPersistence, PersistentActor};
39use borsh::{BorshDeserialize, BorshSerialize};
40use json_patch::{Patch, patch};
41use serde::{Deserialize, Serialize};
42use tracing::{Span, debug, error, info_span, warn};
43
44#[derive(Default, Debug, Serialize, Deserialize, Clone)]
45pub struct Tracker {
46 #[serde(skip)]
47 pub our_key: Arc<PublicKey>,
48 #[serde(skip)]
49 pub service: bool,
50 #[serde(skip)]
51 pub hash: Option<HashAlgorithm>,
52
53 pub subject_metadata: SubjectMetadata,
54 pub governance_id: DigestIdentifier,
55 pub namespace: Namespace,
57 pub genesis_gov_version: u64,
59 pub properties: ValueWrapper,
61}
62
63#[derive(Default)]
64pub struct TrackerInit {
65 pub subject_metadata: SubjectMetadata,
66 pub governance_id: DigestIdentifier,
67 pub namespace: Namespace,
68 pub genesis_gov_version: u64,
69 pub properties: ValueWrapper,
70}
71
72impl From<&Metadata> for TrackerInit {
73 fn from(value: &Metadata) -> Self {
74 Self {
75 subject_metadata: SubjectMetadata::new(value),
76 governance_id: value.governance_id.clone(),
77 namespace: value.namespace.clone(),
78 genesis_gov_version: value.genesis_gov_version,
79 properties: value.properties.clone(),
80 }
81 }
82}
83
84impl BorshSerialize for Tracker {
85 fn serialize<W: std::io::Write>(
86 &self,
87 writer: &mut W,
88 ) -> std::io::Result<()> {
89 BorshSerialize::serialize(&self.subject_metadata, writer)?;
91 BorshSerialize::serialize(&self.governance_id, writer)?;
92 BorshSerialize::serialize(&self.namespace, writer)?;
93 BorshSerialize::serialize(&self.genesis_gov_version, writer)?;
94 BorshSerialize::serialize(&self.properties, writer)?;
95
96 Ok(())
97 }
98}
99
100impl BorshDeserialize for Tracker {
101 fn deserialize_reader<R: std::io::Read>(
102 reader: &mut R,
103 ) -> std::io::Result<Self> {
104 let subject_metadata = SubjectMetadata::deserialize_reader(reader)?;
106 let governance_id = DigestIdentifier::deserialize_reader(reader)?;
107 let namespace = Namespace::deserialize_reader(reader)?;
108 let genesis_gov_version = u64::deserialize_reader(reader)?;
109 let properties = ValueWrapper::deserialize_reader(reader)?;
110
111 let our_key = Arc::new(PublicKey::default());
114 let hash = None;
115
116 Ok(Self {
117 service: false,
118 hash,
119 our_key,
120 subject_metadata,
121 governance_id,
122 namespace,
123 genesis_gov_version,
124 properties,
125 })
126 }
127}
128
129#[async_trait]
130impl Subject for Tracker {
131 async fn update_sn(
132 &self,
133 ctx: &mut ActorContext<Self>,
134 ) -> Result<(), ActorError> {
135 let witnesses_register = ctx
136 .system()
137 .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
138 "/user/node/{}/witnesses_register",
139 self.governance_id
140 )))
141 .await?;
142 witnesses_register
143 .tell(WitnessesRegisterMessage::UpdateSn {
144 subject_id: self.subject_metadata.subject_id.clone(),
145 sn: self.subject_metadata.sn,
146 })
147 .await
148 }
149
150 async fn eol(
151 &self,
152 ctx: &mut ActorContext<Self>,
153 ) -> Result<(), ActorError> {
154 let node = ctx.get_parent::<Node>().await?;
155 node.tell(NodeMessage::EOLSubject {
156 subject_id: self.subject_metadata.subject_id.clone(),
157 i_owner: *self.our_key == self.subject_metadata.owner,
158 })
159 .await
160 }
161
162 async fn reject(
163 &self,
164 ctx: &mut ActorContext<Self>,
165 gov_version: u64,
166 ) -> Result<(), ActorError> {
167 let node = ctx.get_parent::<Node>().await?;
168 node.tell(NodeMessage::RejectTransfer(
169 self.subject_metadata.subject_id.clone(),
170 ))
171 .await?;
172
173 let witnesses_register = ctx
174 .system()
175 .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
176 "/user/node/{}/witnesses_register",
177 self.governance_id
178 )))
179 .await?;
180 witnesses_register
181 .tell(WitnessesRegisterMessage::Reject {
182 subject_id: self.subject_metadata.subject_id.clone(),
183 sn: self.subject_metadata.sn + 1,
184 gov_version,
185 })
186 .await
187 }
188
189 async fn confirm(
190 &self,
191 ctx: &mut ActorContext<Self>,
192 new_owner: PublicKey,
193 gov_version: u64,
194 ) -> Result<(), ActorError> {
195 let node = ctx.get_parent::<Node>().await?;
196 node.tell(NodeMessage::ConfirmTransfer(
197 self.subject_metadata.subject_id.clone(),
198 ))
199 .await?;
200
201 if self.service || *self.our_key == self.subject_metadata.owner {
202 let subject_register = ctx
203 .system()
204 .get_actor::<SubjectRegister>(&ActorPath::from(&format!(
205 "/user/node/{}/subject_register",
206 self.governance_id
207 )))
208 .await?;
209
210 let _response = subject_register
211 .ask(SubjectRegisterMessage::UpdateSubject {
212 new_owner,
213 old_owner: self.subject_metadata.owner.clone(),
214 subject_id: self.subject_metadata.subject_id.clone(),
215 namespace: self.namespace.to_string(),
216 schema_id: self.subject_metadata.schema_id.clone(),
217 gov_version,
218 })
219 .await?;
220 }
221
222 let witnesses_register = ctx
223 .system()
224 .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
225 "/user/node/{}/witnesses_register",
226 self.governance_id
227 )))
228 .await?;
229 witnesses_register
230 .tell(WitnessesRegisterMessage::Confirm {
231 subject_id: self.subject_metadata.subject_id.clone(),
232 sn: self.subject_metadata.sn + 1,
233 gov_version,
234 })
235 .await
236 }
237
238 async fn transfer(
239 &self,
240 ctx: &mut ActorContext<Self>,
241 new_owner: PublicKey,
242 gov_version: u64,
243 ) -> Result<(), ActorError> {
244 let node = ctx.get_parent::<Node>().await?;
245 node.tell(NodeMessage::TransferSubject(TransferSubject {
246 name: self.subject_metadata.name.clone(),
247 subject_id: self.subject_metadata.subject_id.clone(),
248 new_owner: new_owner.clone(),
249 actual_owner: self.subject_metadata.owner.clone(),
250 }))
251 .await?;
252
253 let witnesses_register = ctx
254 .system()
255 .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
256 "/user/node/{}/witnesses_register",
257 self.governance_id
258 )))
259 .await?;
260 witnesses_register
261 .tell(WitnessesRegisterMessage::Transfer {
262 subject_id: self.subject_metadata.subject_id.clone(),
263 new_owner,
264 gov_version,
265 })
266 .await
267 }
268
269 async fn get_last_ledger(
270 &self,
271 ctx: &mut ActorContext<Self>,
272 ) -> Result<Option<SignedLedger>, ActorError> {
273 get_last_event(ctx).await
274 }
275
276 fn apply_patch(
277 &mut self,
278 json_patch: ValueWrapper,
279 ) -> Result<(), ActorError> {
280 let patch_json = serde_json::from_value::<Patch>(json_patch.0)
281 .map_err(|e| {
282 let error = SubjectError::PatchConversionFailed {
283 details: e.to_string(),
284 };
285 error!(
286 error = %e,
287 subject_id = %self.subject_metadata.subject_id,
288 "Failed to convert patch from JSON"
289 );
290 ActorError::Functional {
291 description: error.to_string(),
292 }
293 })?;
294
295 patch(&mut self.properties.0, &patch_json).map_err(|e| {
296 let error = SubjectError::PatchApplicationFailed {
297 details: e.to_string(),
298 };
299 error!(
300 error = %e,
301 subject_id = %self.subject_metadata.subject_id,
302 "Failed to apply patch to properties"
303 );
304 ActorError::Functional {
305 description: error.to_string(),
306 }
307 })?;
308
309 debug!(
310 subject_id = %self.subject_metadata.subject_id,
311 "Patch applied successfully"
312 );
313
314 Ok(())
315 }
316
317 async fn manager_new_ledger_events(
318 &mut self,
319 ctx: &mut ActorContext<Self>,
320 events: Vec<SignedLedger>,
321 ) -> Result<(), ActorError> {
322 let Some(hash) = self.hash else {
323 return Err(ActorError::FunctionalCritical {
324 description: "Can not obtain Hash".to_string(),
325 });
326 };
327
328 let current_sn = self.subject_metadata.sn;
329
330 if let Err(e) = self.verify_new_ledger_events(ctx, events, &hash).await
331 {
332 if let ActorError::Functional { description } = e.clone() {
333 warn!(
334 error = %description,
335 subject_id = %self.subject_metadata.subject_id,
336 sn = self.subject_metadata.sn,
337 "Error verifying new ledger events"
338 );
339
340 if self.subject_metadata.sn == 0 {
342 return Err(e);
343 }
344 } else {
345 error!(
346 error = %e,
347 subject_id = %self.subject_metadata.subject_id,
348 sn = self.subject_metadata.sn,
349 "Critical error verifying new ledger events"
350 );
351 return Err(e);
352 }
353 };
354
355 if current_sn < self.subject_metadata.sn || current_sn == 0 {
356 Self::publish_sink(
357 ctx,
358 SinkDataMessage::UpdateState(Box::new(Metadata::from(
359 self.clone(),
360 ))),
361 )
362 .await?;
363
364 self.update_sn(ctx).await?;
365 }
366
367 Ok(())
368 }
369}
370
371impl Tracker {
372 async fn create(
373 &self,
374 ctx: &ActorContext<Self>,
375 gov_version: u64,
376 ) -> Result<(), ActorError> {
377 let sn_register = ctx
378 .system()
379 .get_actor::<SnRegister>(&ActorPath::from(format!(
380 "/user/node/{}/sn_register",
381 self.governance_id
382 )))
383 .await?;
384
385 sn_register
386 .tell(SnRegisterMessage::RegisterSn {
387 subject_id: self.subject_metadata.subject_id.clone(),
388 gov_version,
389 sn: 0,
390 })
391 .await?;
392
393 if self.service || *self.our_key == self.subject_metadata.owner {
394 let subject_register = ctx
395 .system()
396 .get_actor::<SubjectRegister>(&ActorPath::from(&format!(
397 "/user/node/{}/subject_register",
398 self.governance_id
399 )))
400 .await?;
401
402 let _response = subject_register
403 .ask(SubjectRegisterMessage::CreateSubject {
404 creator: self.subject_metadata.owner.clone(),
405 subject_id: self.subject_metadata.subject_id.clone(),
406 namespace: self.namespace.to_string(),
407 schema_id: self.subject_metadata.schema_id.clone(),
408 gov_version,
409 })
410 .await?;
411 }
412
413 let witnesses_register = ctx
414 .system()
415 .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
416 "/user/node/{}/witnesses_register",
417 self.governance_id
418 )))
419 .await?;
420
421 witnesses_register
422 .tell(WitnessesRegisterMessage::Create {
423 subject_id: self.subject_metadata.subject_id.clone(),
424 gov_version,
425 owner: self.subject_metadata.owner.clone(),
426 })
427 .await
428 }
429
430 async fn register_gov_version_sn(
431 &self,
432 ctx: &ActorContext<Self>,
433 gov_version: u64,
434 ) -> Result<(), ActorError> {
435 let sn_register = ctx
436 .system()
437 .get_actor::<SnRegister>(&ActorPath::from(format!(
438 "/user/node/{}/sn_register",
439 self.governance_id
440 )))
441 .await?;
442
443 sn_register
444 .tell(SnRegisterMessage::RegisterSn {
445 subject_id: self.subject_metadata.subject_id.clone(),
446 gov_version,
447 sn: self.subject_metadata.sn,
448 })
449 .await
450 }
451
452 async fn get_governance(
453 &self,
454 ctx: &ActorContext<Self>,
455 ) -> Result<GovernanceData, ActorError> {
456 let governance_path =
457 ActorPath::from(format!("/user/node/{}", self.governance_id));
458
459 let governance_actor = ctx
460 .system()
461 .get_actor::<Governance>(&governance_path)
462 .await?;
463
464 let response = governance_actor
465 .ask(GovernanceMessage::GetGovernance)
466 .await?;
467
468 match response {
469 GovernanceResponse::Governance(gov) => Ok(*gov),
470 _ => Err(ActorError::UnexpectedResponse {
471 path: governance_path,
472 expected: "GovernanceResponse::Governance".to_owned(),
473 }),
474 }
475 }
476
477 async fn verify_new_ledger_events(
478 &mut self,
479 ctx: &mut ActorContext<Self>,
480 events: Vec<SignedLedger>,
481 hash: &HashAlgorithm,
482 ) -> Result<(), ActorError> {
483 let mut iter = events.into_iter();
484 let last_ledger = get_last_event(ctx).await?;
485
486 let Some(first) = iter.next() else {
487 return Ok(());
488 };
489
490 let mut pending = Vec::new();
491
492 let mut last_ledger = if let Some(last_ledger) = last_ledger {
493 pending.push(first);
494 last_ledger
495 } else {
496 if let Err(e) = Self::verify_first_ledger_event(
497 ctx,
498 &first,
499 hash,
500 Metadata::from(self.clone()),
501 )
502 .await
503 {
504 return Err(ActorError::Functional {
505 description: e.to_string(),
506 });
507 }
508
509 self.create(ctx, first.content().gov_version).await?;
510
511 self.on_event(first.clone(), ctx).await;
512
513 Self::register(
514 ctx,
515 RegisterMessage::RegisterSubj {
516 gov_id: self.governance_id.to_string(),
517 subject_id: self.subject_metadata.subject_id.to_string(),
518 schema_id: self.subject_metadata.schema_id.clone(),
519 namespace: self.namespace.to_string(),
520 name: self.subject_metadata.name.clone(),
521 description: self.subject_metadata.description.clone(),
522 },
523 )
524 .await?;
525
526 Self::event_to_sink(
527 ctx,
528 DataForSink {
529 gov_id: Some(self.governance_id.to_string()),
530 subject_id: self.subject_metadata.subject_id.to_string(),
531 sn: self.subject_metadata.sn,
532 owner: self.subject_metadata.owner.to_string(),
533 namespace: self.namespace.to_string(),
534 schema_id: self.subject_metadata.schema_id.clone(),
535 issuer: first
536 .content()
537 .event_request
538 .signature()
539 .signer
540 .to_string(),
541 event_ledger_timestamp: first
542 .signature()
543 .timestamp
544 .as_nanos(),
545 event_request_timestamp: first
546 .content()
547 .event_request
548 .signature()
549 .timestamp
550 .as_nanos(),
551 gov_version: first.content().gov_version,
552 event_data_ledger: EventLedgerDataForSink::build(
553 &first.content().protocols,
554 &self.properties.0,
555 ),
556 },
557 first.content().event_request.content(),
558 )
559 .await?;
560
561 first
562 };
563
564 pending.extend(iter);
565
566 for event in pending {
567 let actual_ledger_hash =
568 hash_borsh(&*hash.hasher(), &last_ledger.0).map_err(|e| {
569 ActorError::FunctionalCritical {
570 description: format!(
571 "Can not creacte actual ledger event hash: {}",
572 e
573 ),
574 }
575 })?;
576 let last_data = LastData {
577 gov_version: last_ledger.content().gov_version,
578 vali_data: last_ledger
579 .content()
580 .protocols
581 .get_validation_data(),
582 };
583
584 let last_gov_version = last_data.gov_version;
585
586 let last_event_is_ok = match Self::verify_new_ledger_event(
587 ctx,
588 &event,
589 Metadata::from(self.clone()),
590 actual_ledger_hash,
591 last_data,
592 hash,
593 )
594 .await
595 {
596 Ok(last_event_is_ok) => last_event_is_ok,
597 Err(e) => {
598 if matches!(e, SubjectError::InvalidSequenceNumber { .. }) {
600 continue;
602 } else {
603 return Err(ActorError::Functional {
604 description: e.to_string(),
605 });
606 }
607 }
608 };
609
610 let event_gov_version = event.content().gov_version;
611
612 if last_event_is_ok {
613 if last_gov_version != event_gov_version {
614 self.register_gov_version_sn(ctx, last_gov_version).await?;
615 }
616
617 match event.content().event_request.content().clone() {
618 EventRequest::Transfer(transfer_request) => {
619 self.transfer(
620 ctx,
621 transfer_request.new_owner,
622 event.content().gov_version,
623 )
624 .await?;
625 }
626 EventRequest::Reject(..) => {
627 self.reject(ctx, event.content().gov_version).await?;
628 }
629 EventRequest::Confirm(..) => {
630 self.confirm(
631 ctx,
632 event.signature().signer.clone(),
633 event.content().gov_version,
634 )
635 .await?;
636 }
637 EventRequest::EOL(..) => {
638 self.eol(ctx).await?;
639
640 Self::register(
641 ctx,
642 RegisterMessage::EOLSubj {
643 gov_id: self.governance_id.to_string(),
644 subj_id: self
645 .subject_metadata
646 .subject_id
647 .to_string(),
648 },
649 )
650 .await?
651 }
652 _ => {}
653 };
654
655 Self::event_to_sink(
656 ctx,
657 DataForSink {
658 gov_id: Some(self.governance_id.to_string()),
659 subject_id: self
660 .subject_metadata
661 .subject_id
662 .to_string(),
663 sn: self.subject_metadata.sn,
664 owner: self.subject_metadata.owner.to_string(),
665 namespace: self.namespace.to_string(),
666 schema_id: self.subject_metadata.schema_id.clone(),
667 issuer: event
668 .content()
669 .event_request
670 .signature()
671 .signer
672 .to_string(),
673 event_ledger_timestamp: event
674 .signature()
675 .timestamp
676 .as_nanos(),
677 event_request_timestamp: event
678 .content()
679 .event_request
680 .signature()
681 .timestamp
682 .as_nanos(),
683 gov_version: event.content().gov_version,
684 event_data_ledger: EventLedgerDataForSink::build(
685 &event.content().protocols,
686 &self.properties.0,
687 ),
688 },
689 event.content().event_request.content(),
690 )
691 .await?;
692 }
693
694 self.on_event(event.clone(), ctx).await;
696
697 self.register_gov_version_sn(ctx, event_gov_version).await?;
702
703 last_ledger = event.clone();
705 }
706
707 Ok(())
708 }
709}
710
711#[derive(Debug, Clone)]
712pub enum TrackerMessage {
713 GetMetadata,
714 GetLedger { lo_sn: Option<u64>, hi_sn: u64 },
715 GetLastLedger,
716 UpdateLedger { events: Vec<SignedLedger> },
717 GetGovernance,
718}
719
720impl Message for TrackerMessage {}
721
722#[derive(Debug, Clone)]
723pub enum TrackerResponse {
724 Metadata(Box<Metadata>),
726 UpdateResult(u64, PublicKey, Option<PublicKey>),
727 Ledger {
728 ledger: Vec<SignedLedger>,
729 is_all: bool,
730 },
731 LastLedger {
732 ledger_event: Box<Option<SignedLedger>>,
733 },
734 Governance(Box<GovernanceData>),
735 Sn(u64),
736 Ok,
737}
738impl Response for TrackerResponse {}
739
740#[async_trait]
741impl Actor for Tracker {
742 type Event = SignedLedger;
743 type Message = TrackerMessage;
744 type Response = TrackerResponse;
745
746 fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
747 parent_span.map_or_else(
748 || info_span!("Tracker", id),
749 |parent_span| info_span!(parent: parent_span, "Tracker", id),
750 )
751 }
752
753 async fn pre_start(
754 &mut self,
755 ctx: &mut ActorContext<Self>,
756 ) -> Result<(), ActorError> {
757 if let Err(e) = self.init_store("tracker", None, true, ctx).await {
758 error!(
759 error = %e,
760 "Failed to initialize tracker store"
761 );
762 return Err(e);
763 }
764
765 let our_key = self.our_key.clone();
766
767 if self.subject_metadata.active {
768 let Some(ext_db): Option<Arc<ExternalDB>> =
769 ctx.system().get_helper("ext_db").await
770 else {
771 error!("External database helper not found");
772 return Err(ActorError::Helper {
773 name: "ext_db".to_owned(),
774 reason: "Not found".to_owned(),
775 });
776 };
777
778 let Some(ave_sink): Option<AveSink> =
779 ctx.system().get_helper("sink").await
780 else {
781 error!("Sink helper not found");
782 return Err(ActorError::Helper {
783 name: "sink".to_owned(),
784 reason: "Not found".to_owned(),
785 });
786 };
787
788 let sink_actor = match ctx
789 .create_child(
790 "sink_data",
791 SinkData {
792 public_key: our_key.to_string(),
793 },
794 )
795 .await
796 {
797 Ok(actor) => actor,
798 Err(e) => {
799 error!(
800 error = %e,
801 "Failed to create sink_data child"
802 );
803 return Err(e);
804 }
805 };
806 let sink =
807 Sink::new(sink_actor.subscribe(), ext_db.get_sink_data());
808 ctx.system().run_sink(sink).await;
809
810 let sink = Sink::new(sink_actor.subscribe(), ave_sink.clone());
811 ctx.system().run_sink(sink).await;
812 }
813
814 Ok(())
815 }
816}
817
818#[async_trait]
819impl Handler<Self> for Tracker {
820 async fn handle_message(
821 &mut self,
822 _sender: ActorPath,
823 msg: TrackerMessage,
824 ctx: &mut ActorContext<Self>,
825 ) -> Result<TrackerResponse, ActorError> {
826 match msg {
827 TrackerMessage::GetLedger { lo_sn, hi_sn } => {
828 let (ledger, is_all) =
829 self.get_ledger(ctx, lo_sn, hi_sn).await?;
830 Ok(TrackerResponse::Ledger { ledger, is_all })
831 }
832 TrackerMessage::GetLastLedger => {
833 let ledger_event = self.get_last_ledger(ctx).await?;
834 Ok(TrackerResponse::LastLedger {
835 ledger_event: Box::new(ledger_event),
836 })
837 }
838 TrackerMessage::GetMetadata => Ok(TrackerResponse::Metadata(
839 Box::new(Metadata::from(self.clone())),
840 )),
841 TrackerMessage::UpdateLedger { events } => {
842 let events_count = events.len();
843 if let Err(e) =
844 self.manager_new_ledger_events(ctx, events).await
845 {
846 warn!(
847 msg_type = "UpdateLedger",
848 error = %e,
849 subject_id = %self.subject_metadata.subject_id,
850 events_count = events_count,
851 "Failed to verify new ledger events"
852 );
853 return Err(e);
854 };
855
856 debug!(
857 msg_type = "UpdateLedger",
858 subject_id = %self.subject_metadata.subject_id,
859 sn = self.subject_metadata.sn,
860 events_count = events_count,
861 "Ledger updated successfully"
862 );
863
864 Ok(TrackerResponse::UpdateResult(
865 self.subject_metadata.sn,
866 self.subject_metadata.owner.clone(),
867 self.subject_metadata.new_owner.clone(),
868 ))
869 }
870 TrackerMessage::GetGovernance => {
871 return Ok(TrackerResponse::Governance(Box::new(
872 self.get_governance(ctx).await?,
873 )));
874 }
875 }
876 }
877
878 async fn on_event(
879 &mut self,
880 event: SignedLedger,
881 ctx: &mut ActorContext<Self>,
882 ) {
883 if let Err(e) = self.persist(&event, ctx).await {
884 error!(
885 error = %e,
886 subject_id = %self.subject_metadata.subject_id,
887 sn = self.subject_metadata.sn,
888 "Failed to persist event"
889 );
890 emit_fail(ctx, e).await;
891 };
892
893 if let Err(e) = ctx.publish_event(event.clone()).await {
894 error!(
895 error = %e,
896 subject_id = %self.subject_metadata.subject_id,
897 sn = self.subject_metadata.sn,
898 "Failed to publish event"
899 );
900 emit_fail(ctx, e).await;
901 } else {
902 debug!(
903 subject_id = %self.subject_metadata.subject_id,
904 sn = self.subject_metadata.sn,
905 "Event persisted and published successfully"
906 );
907 }
908 }
909
910 async fn on_child_fault(
911 &mut self,
912 error: ActorError,
913 ctx: &mut ActorContext<Self>,
914 ) -> ChildAction {
915 error!(
916 subject_id = %self.subject_metadata.subject_id,
917 sn = self.subject_metadata.sn,
918 error = %error,
919 "Child fault in tracker"
920 );
921 emit_fail(ctx, error).await;
922 ChildAction::Stop
923 }
924}
925
926pub struct InitParamsTracker {
927 pub data: Option<TrackerInit>,
928 pub public_key: Arc<PublicKey>,
929 pub hash: HashAlgorithm,
930 pub is_service: bool,
931}
932
933#[async_trait]
934impl PersistentActor for Tracker {
935 type Persistence = FullPersistence;
936 type InitParams = InitParamsTracker;
937
938 fn update(&mut self, state: Self) {
939 self.properties = state.properties;
940 self.governance_id = state.governance_id;
941 self.namespace = state.namespace;
942 self.genesis_gov_version = state.genesis_gov_version;
943 self.subject_metadata = state.subject_metadata;
944 }
945
946 fn create_initial(params: Self::InitParams) -> Self {
947 let init = params.data.unwrap_or_default();
948
949 Self {
950 service: params.is_service,
951 hash: Some(params.hash),
952 our_key: params.public_key,
953 subject_metadata: init.subject_metadata,
954 properties: init.properties,
955 genesis_gov_version: init.genesis_gov_version,
956 governance_id: init.governance_id,
957 namespace: init.namespace,
958 }
959 }
960
961 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
962 match (
963 event.content().event_request.content(),
964 &event.content().protocols,
965 ) {
966 (EventRequest::Create(..), Protocols::Create { validation }) => {
967 if let ValidationMetadata::Metadata(metadata) =
968 &validation.validation_metadata
969 {
970 self.subject_metadata = SubjectMetadata::new(metadata);
971 self.properties = metadata.properties.clone();
972
973 debug!(
974 event_type = "Create",
975 subject_id = %self.subject_metadata.subject_id,
976 sn = self.subject_metadata.sn,
977 "Applied create event"
978 );
979 } else {
980 error!(
981 event_type = "Create",
982 "Validation metadata must be Metadata type"
983 );
984 return Err(ActorError::Functional { description: "In create event, validation metadata must be a Metadata".to_owned() });
985 }
986
987 return Ok(());
988 }
989 (
990 EventRequest::Fact(..),
991 Protocols::TrackerFact { evaluation, .. },
992 ) => {
993 if let Some(eval_res) = evaluation.evaluator_res() {
994 self.apply_patch(eval_res.patch)?;
995 debug!(
996 event_type = "Fact",
997 subject_id = %self.subject_metadata.subject_id,
998 "Applied fact event with patch"
999 );
1000 }
1001 }
1002 (
1003 EventRequest::Transfer(transfer_request),
1004 Protocols::Transfer { evaluation, .. },
1005 ) => {
1006 if evaluation.evaluator_res().is_some() {
1007 self.subject_metadata.new_owner =
1008 Some(transfer_request.new_owner.clone());
1009 debug!(
1010 event_type = "Transfer",
1011 subject_id = %self.subject_metadata.subject_id,
1012 new_owner = %transfer_request.new_owner,
1013 "Applied transfer event"
1014 );
1015 }
1016 }
1017 (EventRequest::Confirm(..), Protocols::TrackerConfirm { .. }) => {
1018 if let Some(new_owner) = self.subject_metadata.new_owner.take()
1019 {
1020 self.subject_metadata.owner = new_owner.clone();
1021 debug!(
1022 event_type = "Confirm",
1023 subject_id = %self.subject_metadata.subject_id,
1024 new_owner = %new_owner,
1025 "Applied confirm event"
1026 );
1027 } else {
1028 error!(
1029 event_type = "Confirm",
1030 subject_id = %self.subject_metadata.subject_id,
1031 "New owner is None in confirm event"
1032 );
1033 return Err(ActorError::Functional {
1034 description: "In confirm event, new owner is None"
1035 .to_owned(),
1036 });
1037 }
1038 }
1039 (EventRequest::Reject(..), Protocols::Reject { .. }) => {
1040 self.subject_metadata.new_owner = None;
1041 debug!(
1042 event_type = "Reject",
1043 subject_id = %self.subject_metadata.subject_id,
1044 "Applied reject event"
1045 );
1046 }
1047 (EventRequest::EOL(..), Protocols::EOL { .. }) => {
1048 self.subject_metadata.active = false;
1049 debug!(
1050 event_type = "EOL",
1051 subject_id = %self.subject_metadata.subject_id,
1052 "Applied EOL event"
1053 );
1054 }
1055 _ => {
1056 error!(
1057 subject_id = %self.subject_metadata.subject_id,
1058 "Invalid protocol data for Tracker"
1059 );
1060 return Err(ActorError::Functional {
1061 description:
1062 "Protocols data is for Governance and this is a Tracker"
1063 .to_owned(),
1064 });
1065 }
1066 }
1067
1068 self.subject_metadata.sn += 1;
1069 self.subject_metadata.prev_ledger_event_hash =
1070 event.content().prev_ledger_event_hash.clone();
1071
1072 Ok(())
1073 }
1074}
1075
1076impl Storable for Tracker {}