Skip to main content

ave_core/tracker/
mod.rs

1use std::sync::Arc;
2
3use crate::{
4    db::Storable,
5    governance::{
6        Governance, GovernanceMessage, GovernanceResponse,
7        data::GovernanceData,
8        sn_register::{SnRegister, SnRegisterMessage},
9        subject_register::{SubjectRegister, SubjectRegisterMessage},
10        witnesses_register::{WitnessesRegister, WitnessesRegisterMessage},
11    },
12    helpers::{db::ExternalDB, sink::AveSink},
13    model::{
14        common::{emit_fail, get_last_event},
15        event::{Protocols, ValidationMetadata},
16    },
17    node::{Node, NodeMessage, TransferSubject, register::RegisterMessage},
18    subject::{
19        DataForSink, EventLedgerDataForSink, Metadata, SignedLedger, Subject,
20        SubjectMetadata,
21        error::SubjectError,
22        sinkdata::{SinkData, SinkDataMessage},
23    },
24    validation::request::LastData,
25};
26
27use ave_actors::{
28    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
29    Response, Sink,
30};
31use ave_common::{
32    Namespace, ValueWrapper,
33    identity::{DigestIdentifier, HashAlgorithm, PublicKey, hash_borsh},
34    request::EventRequest,
35};
36
37use async_trait::async_trait;
38use ave_actors::{FullPersistence, PersistentActor};
39use borsh::{BorshDeserialize, BorshSerialize};
40use json_patch::{Patch, patch};
41use serde::{Deserialize, Serialize};
42use tracing::{Span, debug, error, info_span, warn};
43
44#[derive(Default, Debug, Serialize, Deserialize, Clone)]
45pub struct Tracker {
46    #[serde(skip)]
47    pub our_key: Arc<PublicKey>,
48    #[serde(skip)]
49    pub service: bool,
50    #[serde(skip)]
51    pub hash: Option<HashAlgorithm>,
52
53    pub subject_metadata: SubjectMetadata,
54    pub governance_id: DigestIdentifier,
55    /// The namespace of the subject.
56    pub namespace: Namespace,
57    /// The version of the governance contract that created the subject.
58    pub genesis_gov_version: u64,
59    /// The current status of the subject.
60    pub properties: ValueWrapper,
61}
62
63#[derive(Default)]
64pub struct TrackerInit {
65    pub subject_metadata: SubjectMetadata,
66    pub governance_id: DigestIdentifier,
67    pub namespace: Namespace,
68    pub genesis_gov_version: u64,
69    pub properties: ValueWrapper,
70}
71
72impl From<&Metadata> for TrackerInit {
73    fn from(value: &Metadata) -> Self {
74        Self {
75            subject_metadata: SubjectMetadata::new(value),
76            governance_id: value.governance_id.clone(),
77            namespace: value.namespace.clone(),
78            genesis_gov_version: value.genesis_gov_version,
79            properties: value.properties.clone(),
80        }
81    }
82}
83
84impl BorshSerialize for Tracker {
85    fn serialize<W: std::io::Write>(
86        &self,
87        writer: &mut W,
88    ) -> std::io::Result<()> {
89        // Serialize only the fields we want to persist, skipping 'owner'
90        BorshSerialize::serialize(&self.subject_metadata, writer)?;
91        BorshSerialize::serialize(&self.governance_id, writer)?;
92        BorshSerialize::serialize(&self.namespace, writer)?;
93        BorshSerialize::serialize(&self.genesis_gov_version, writer)?;
94        BorshSerialize::serialize(&self.properties, writer)?;
95
96        Ok(())
97    }
98}
99
100impl BorshDeserialize for Tracker {
101    fn deserialize_reader<R: std::io::Read>(
102        reader: &mut R,
103    ) -> std::io::Result<Self> {
104        // Deserialize the persisted fields
105        let subject_metadata = SubjectMetadata::deserialize_reader(reader)?;
106        let governance_id = DigestIdentifier::deserialize_reader(reader)?;
107        let namespace = Namespace::deserialize_reader(reader)?;
108        let genesis_gov_version = u64::deserialize_reader(reader)?;
109        let properties = ValueWrapper::deserialize_reader(reader)?;
110
111        // Create a default/placeholder KeyPair for 'owner'
112        // This will be replaced by the actual owner during actor initialization
113        let our_key = Arc::new(PublicKey::default());
114        let hash = None;
115
116        Ok(Self {
117            service: false,
118            hash,
119            our_key,
120            subject_metadata,
121            governance_id,
122            namespace,
123            genesis_gov_version,
124            properties,
125        })
126    }
127}
128
129#[async_trait]
130impl Subject for Tracker {
131    async fn update_sn(
132        &self,
133        ctx: &mut ActorContext<Self>,
134    ) -> Result<(), ActorError> {
135        let witnesses_register = ctx
136            .system()
137            .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
138                "/user/node/{}/witnesses_register",
139                self.governance_id
140            )))
141            .await?;
142        witnesses_register
143            .tell(WitnessesRegisterMessage::UpdateSn {
144                subject_id: self.subject_metadata.subject_id.clone(),
145                sn: self.subject_metadata.sn,
146            })
147            .await
148    }
149
150    async fn eol(
151        &self,
152        ctx: &mut ActorContext<Self>,
153    ) -> Result<(), ActorError> {
154        let node = ctx.get_parent::<Node>().await?;
155        node.tell(NodeMessage::EOLSubject {
156            subject_id: self.subject_metadata.subject_id.clone(),
157            i_owner: *self.our_key == self.subject_metadata.owner,
158        })
159        .await
160    }
161
162    async fn reject(
163        &self,
164        ctx: &mut ActorContext<Self>,
165        gov_version: u64,
166    ) -> Result<(), ActorError> {
167        let node = ctx.get_parent::<Node>().await?;
168        node.tell(NodeMessage::RejectTransfer(
169            self.subject_metadata.subject_id.clone(),
170        ))
171        .await?;
172
173        let witnesses_register = ctx
174            .system()
175            .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
176                "/user/node/{}/witnesses_register",
177                self.governance_id
178            )))
179            .await?;
180        witnesses_register
181            .tell(WitnessesRegisterMessage::Reject {
182                subject_id: self.subject_metadata.subject_id.clone(),
183                sn: self.subject_metadata.sn + 1,
184                gov_version,
185            })
186            .await
187    }
188
189    async fn confirm(
190        &self,
191        ctx: &mut ActorContext<Self>,
192        new_owner: PublicKey,
193        gov_version: u64,
194    ) -> Result<(), ActorError> {
195        let node = ctx.get_parent::<Node>().await?;
196        node.tell(NodeMessage::ConfirmTransfer(
197            self.subject_metadata.subject_id.clone(),
198        ))
199        .await?;
200
201        if self.service || *self.our_key == self.subject_metadata.owner {
202            let subject_register = ctx
203                .system()
204                .get_actor::<SubjectRegister>(&ActorPath::from(&format!(
205                    "/user/node/{}/subject_register",
206                    self.governance_id
207                )))
208                .await?;
209
210            let _response = subject_register
211                .ask(SubjectRegisterMessage::UpdateSubject {
212                    new_owner,
213                    old_owner: self.subject_metadata.owner.clone(),
214                    subject_id: self.subject_metadata.subject_id.clone(),
215                    namespace: self.namespace.to_string(),
216                    schema_id: self.subject_metadata.schema_id.clone(),
217                    gov_version,
218                })
219                .await?;
220        }
221
222        let witnesses_register = ctx
223            .system()
224            .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
225                "/user/node/{}/witnesses_register",
226                self.governance_id
227            )))
228            .await?;
229        witnesses_register
230            .tell(WitnessesRegisterMessage::Confirm {
231                subject_id: self.subject_metadata.subject_id.clone(),
232                sn: self.subject_metadata.sn + 1,
233                gov_version,
234            })
235            .await
236    }
237
238    async fn transfer(
239        &self,
240        ctx: &mut ActorContext<Self>,
241        new_owner: PublicKey,
242        gov_version: u64,
243    ) -> Result<(), ActorError> {
244        let node = ctx.get_parent::<Node>().await?;
245        node.tell(NodeMessage::TransferSubject(TransferSubject {
246            name: self.subject_metadata.name.clone(),
247            subject_id: self.subject_metadata.subject_id.clone(),
248            new_owner: new_owner.clone(),
249            actual_owner: self.subject_metadata.owner.clone(),
250        }))
251        .await?;
252
253        let witnesses_register = ctx
254            .system()
255            .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
256                "/user/node/{}/witnesses_register",
257                self.governance_id
258            )))
259            .await?;
260        witnesses_register
261            .tell(WitnessesRegisterMessage::Transfer {
262                subject_id: self.subject_metadata.subject_id.clone(),
263                new_owner,
264                gov_version,
265            })
266            .await
267    }
268
269    async fn get_last_ledger(
270        &self,
271        ctx: &mut ActorContext<Self>,
272    ) -> Result<Option<SignedLedger>, ActorError> {
273        get_last_event(ctx).await
274    }
275
276    fn apply_patch(
277        &mut self,
278        json_patch: ValueWrapper,
279    ) -> Result<(), ActorError> {
280        let patch_json = serde_json::from_value::<Patch>(json_patch.0)
281            .map_err(|e| {
282                let error = SubjectError::PatchConversionFailed {
283                    details: e.to_string(),
284                };
285                error!(
286                    error = %e,
287                    subject_id = %self.subject_metadata.subject_id,
288                    "Failed to convert patch from JSON"
289                );
290                ActorError::Functional {
291                    description: error.to_string(),
292                }
293            })?;
294
295        patch(&mut self.properties.0, &patch_json).map_err(|e| {
296            let error = SubjectError::PatchApplicationFailed {
297                details: e.to_string(),
298            };
299            error!(
300                error = %e,
301                subject_id = %self.subject_metadata.subject_id,
302                "Failed to apply patch to properties"
303            );
304            ActorError::Functional {
305                description: error.to_string(),
306            }
307        })?;
308
309        debug!(
310            subject_id = %self.subject_metadata.subject_id,
311            "Patch applied successfully"
312        );
313
314        Ok(())
315    }
316
317    async fn manager_new_ledger_events(
318        &mut self,
319        ctx: &mut ActorContext<Self>,
320        events: Vec<SignedLedger>,
321    ) -> Result<(), ActorError> {
322        let Some(hash) = self.hash else {
323            return Err(ActorError::FunctionalCritical {
324                description: "Can not obtain Hash".to_string(),
325            });
326        };
327
328        let current_sn = self.subject_metadata.sn;
329
330        if let Err(e) = self.verify_new_ledger_events(ctx, events, &hash).await
331        {
332            if let ActorError::Functional { description } = e.clone() {
333                warn!(
334                    error = %description,
335                    subject_id = %self.subject_metadata.subject_id,
336                    sn = self.subject_metadata.sn,
337                    "Error verifying new ledger events"
338                );
339
340                // Falló en la creación
341                if self.subject_metadata.sn == 0 {
342                    return Err(e);
343                }
344            } else {
345                error!(
346                    error = %e,
347                    subject_id = %self.subject_metadata.subject_id,
348                    sn = self.subject_metadata.sn,
349                    "Critical error verifying new ledger events"
350                );
351                return Err(e);
352            }
353        };
354
355        if current_sn < self.subject_metadata.sn || current_sn == 0 {
356            Self::publish_sink(
357                ctx,
358                SinkDataMessage::UpdateState(Box::new(Metadata::from(
359                    self.clone(),
360                ))),
361            )
362            .await?;
363
364            self.update_sn(ctx).await?;
365        }
366
367        Ok(())
368    }
369}
370
371impl Tracker {
372    async fn create(
373        &self,
374        ctx: &ActorContext<Self>,
375        gov_version: u64,
376    ) -> Result<(), ActorError> {
377        let sn_register = ctx
378            .system()
379            .get_actor::<SnRegister>(&ActorPath::from(format!(
380                "/user/node/{}/sn_register",
381                self.governance_id
382            )))
383            .await?;
384
385        sn_register
386            .tell(SnRegisterMessage::RegisterSn {
387                subject_id: self.subject_metadata.subject_id.clone(),
388                gov_version,
389                sn: 0,
390            })
391            .await?;
392
393        if self.service || *self.our_key == self.subject_metadata.owner {
394            let subject_register = ctx
395                .system()
396                .get_actor::<SubjectRegister>(&ActorPath::from(&format!(
397                    "/user/node/{}/subject_register",
398                    self.governance_id
399                )))
400                .await?;
401
402            let _response = subject_register
403                .ask(SubjectRegisterMessage::CreateSubject {
404                    creator: self.subject_metadata.owner.clone(),
405                    subject_id: self.subject_metadata.subject_id.clone(),
406                    namespace: self.namespace.to_string(),
407                    schema_id: self.subject_metadata.schema_id.clone(),
408                    gov_version,
409                })
410                .await?;
411        }
412
413        let witnesses_register = ctx
414            .system()
415            .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
416                "/user/node/{}/witnesses_register",
417                self.governance_id
418            )))
419            .await?;
420
421        witnesses_register
422            .tell(WitnessesRegisterMessage::Create {
423                subject_id: self.subject_metadata.subject_id.clone(),
424                gov_version,
425                owner: self.subject_metadata.owner.clone(),
426            })
427            .await
428    }
429
430    async fn register_gov_version_sn(
431        &self,
432        ctx: &ActorContext<Self>,
433        gov_version: u64,
434    ) -> Result<(), ActorError> {
435        let sn_register = ctx
436            .system()
437            .get_actor::<SnRegister>(&ActorPath::from(format!(
438                "/user/node/{}/sn_register",
439                self.governance_id
440            )))
441            .await?;
442
443        sn_register
444            .tell(SnRegisterMessage::RegisterSn {
445                subject_id: self.subject_metadata.subject_id.clone(),
446                gov_version,
447                sn: self.subject_metadata.sn,
448            })
449            .await
450    }
451
452    async fn get_governance(
453        &self,
454        ctx: &ActorContext<Self>,
455    ) -> Result<GovernanceData, ActorError> {
456        let governance_path =
457            ActorPath::from(format!("/user/node/{}", self.governance_id));
458
459        let governance_actor = ctx
460            .system()
461            .get_actor::<Governance>(&governance_path)
462            .await?;
463
464        let response = governance_actor
465            .ask(GovernanceMessage::GetGovernance)
466            .await?;
467
468        match response {
469            GovernanceResponse::Governance(gov) => Ok(*gov),
470            _ => Err(ActorError::UnexpectedResponse {
471                path: governance_path,
472                expected: "GovernanceResponse::Governance".to_owned(),
473            }),
474        }
475    }
476
477    async fn verify_new_ledger_events(
478        &mut self,
479        ctx: &mut ActorContext<Self>,
480        events: Vec<SignedLedger>,
481        hash: &HashAlgorithm,
482    ) -> Result<(), ActorError> {
483        let mut iter = events.into_iter();
484        let last_ledger = get_last_event(ctx).await?;
485
486        let Some(first) = iter.next() else {
487            return Ok(());
488        };
489
490        let mut pending = Vec::new();
491
492        let mut last_ledger = if let Some(last_ledger) = last_ledger {
493            pending.push(first);
494            last_ledger
495        } else {
496            if let Err(e) = Self::verify_first_ledger_event(
497                ctx,
498                &first,
499                hash,
500                Metadata::from(self.clone()),
501            )
502            .await
503            {
504                return Err(ActorError::Functional {
505                    description: e.to_string(),
506                });
507            }
508
509            self.create(ctx, first.content().gov_version).await?;
510
511            self.on_event(first.clone(), ctx).await;
512
513            Self::register(
514                ctx,
515                RegisterMessage::RegisterSubj {
516                    gov_id: self.governance_id.to_string(),
517                    subject_id: self.subject_metadata.subject_id.to_string(),
518                    schema_id: self.subject_metadata.schema_id.clone(),
519                    namespace: self.namespace.to_string(),
520                    name: self.subject_metadata.name.clone(),
521                    description: self.subject_metadata.description.clone(),
522                },
523            )
524            .await?;
525
526            Self::event_to_sink(
527                ctx,
528                DataForSink {
529                    gov_id: Some(self.governance_id.to_string()),
530                    subject_id: self.subject_metadata.subject_id.to_string(),
531                    sn: self.subject_metadata.sn,
532                    owner: self.subject_metadata.owner.to_string(),
533                    namespace: self.namespace.to_string(),
534                    schema_id: self.subject_metadata.schema_id.clone(),
535                    issuer: first
536                        .content()
537                        .event_request
538                        .signature()
539                        .signer
540                        .to_string(),
541                    event_ledger_timestamp: first
542                        .signature()
543                        .timestamp
544                        .as_nanos(),
545                    event_request_timestamp: first
546                        .content()
547                        .event_request
548                        .signature()
549                        .timestamp
550                        .as_nanos(),
551                    gov_version: first.content().gov_version,
552                    event_data_ledger: EventLedgerDataForSink::build(
553                        &first.content().protocols,
554                        &self.properties.0,
555                    ),
556                },
557                first.content().event_request.content(),
558            )
559            .await?;
560
561            first
562        };
563
564        pending.extend(iter);
565
566        for event in pending {
567            let actual_ledger_hash =
568                hash_borsh(&*hash.hasher(), &last_ledger.0).map_err(|e| {
569                    ActorError::FunctionalCritical {
570                        description: format!(
571                            "Can not creacte actual ledger event hash: {}",
572                            e
573                        ),
574                    }
575                })?;
576            let last_data = LastData {
577                gov_version: last_ledger.content().gov_version,
578                vali_data: last_ledger
579                    .content()
580                    .protocols
581                    .get_validation_data(),
582            };
583
584            let last_gov_version = last_data.gov_version;
585
586            let last_event_is_ok = match Self::verify_new_ledger_event(
587                ctx,
588                &event,
589                Metadata::from(self.clone()),
590                actual_ledger_hash,
591                last_data,
592                hash,
593            )
594            .await
595            {
596                Ok(last_event_is_ok) => last_event_is_ok,
597                Err(e) => {
598                    // Check if it's a sequence number error
599                    if matches!(e, SubjectError::InvalidSequenceNumber { .. }) {
600                        // El evento que estamos aplicando no es el siguiente.
601                        continue;
602                    } else {
603                        return Err(ActorError::Functional {
604                            description: e.to_string(),
605                        });
606                    }
607                }
608            };
609
610            let event_gov_version = event.content().gov_version;
611
612            if last_event_is_ok {
613                if last_gov_version != event_gov_version {
614                    self.register_gov_version_sn(ctx, last_gov_version).await?;
615                }
616
617                match event.content().event_request.content().clone() {
618                    EventRequest::Transfer(transfer_request) => {
619                        self.transfer(
620                            ctx,
621                            transfer_request.new_owner,
622                            event.content().gov_version,
623                        )
624                        .await?;
625                    }
626                    EventRequest::Reject(..) => {
627                        self.reject(ctx, event.content().gov_version).await?;
628                    }
629                    EventRequest::Confirm(..) => {
630                        self.confirm(
631                            ctx,
632                            event.signature().signer.clone(),
633                            event.content().gov_version,
634                        )
635                        .await?;
636                    }
637                    EventRequest::EOL(..) => {
638                        self.eol(ctx).await?;
639
640                        Self::register(
641                            ctx,
642                            RegisterMessage::EOLSubj {
643                                gov_id: self.governance_id.to_string(),
644                                subj_id: self
645                                    .subject_metadata
646                                    .subject_id
647                                    .to_string(),
648                            },
649                        )
650                        .await?
651                    }
652                    _ => {}
653                };
654
655                Self::event_to_sink(
656                    ctx,
657                    DataForSink {
658                        gov_id: Some(self.governance_id.to_string()),
659                        subject_id: self
660                            .subject_metadata
661                            .subject_id
662                            .to_string(),
663                        sn: self.subject_metadata.sn,
664                        owner: self.subject_metadata.owner.to_string(),
665                        namespace: self.namespace.to_string(),
666                        schema_id: self.subject_metadata.schema_id.clone(),
667                        issuer: event
668                            .content()
669                            .event_request
670                            .signature()
671                            .signer
672                            .to_string(),
673                        event_ledger_timestamp: event
674                            .signature()
675                            .timestamp
676                            .as_nanos(),
677                        event_request_timestamp: event
678                            .content()
679                            .event_request
680                            .signature()
681                            .timestamp
682                            .as_nanos(),
683                        gov_version: event.content().gov_version,
684                        event_data_ledger: EventLedgerDataForSink::build(
685                            &event.content().protocols,
686                            &self.properties.0,
687                        ),
688                    },
689                    event.content().event_request.content(),
690                )
691                .await?;
692            }
693
694            // Aplicar evento.
695            self.on_event(event.clone(), ctx).await;
696
697            // Registrar la gov_version del evento con el sn ya actualizado.
698            // Necesario cuando varios eventos comparten la misma gov_version:
699            // la transición (línea anterior) solo captura el sn antes del primer
700            // evento del nuevo gov_version, pero no el sn final de ese tramo.
701            self.register_gov_version_sn(ctx, event_gov_version).await?;
702
703            // Actualizar último evento.
704            last_ledger = event.clone();
705        }
706
707        Ok(())
708    }
709}
710
711#[derive(Debug, Clone)]
712pub enum TrackerMessage {
713    GetMetadata,
714    GetLedger { lo_sn: Option<u64>, hi_sn: u64 },
715    GetLastLedger,
716    UpdateLedger { events: Vec<SignedLedger> },
717    GetGovernance,
718}
719
720impl Message for TrackerMessage {}
721
722#[derive(Debug, Clone)]
723pub enum TrackerResponse {
724    /// The subject metadata.
725    Metadata(Box<Metadata>),
726    UpdateResult(u64, PublicKey, Option<PublicKey>),
727    Ledger {
728        ledger: Vec<SignedLedger>,
729        is_all: bool,
730    },
731    LastLedger {
732        ledger_event: Box<Option<SignedLedger>>,
733    },
734    Governance(Box<GovernanceData>),
735    Sn(u64),
736    Ok,
737}
738impl Response for TrackerResponse {}
739
740#[async_trait]
741impl Actor for Tracker {
742    type Event = SignedLedger;
743    type Message = TrackerMessage;
744    type Response = TrackerResponse;
745
746    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
747        parent_span.map_or_else(
748            || info_span!("Tracker", id),
749            |parent_span| info_span!(parent: parent_span, "Tracker", id),
750        )
751    }
752
753    async fn pre_start(
754        &mut self,
755        ctx: &mut ActorContext<Self>,
756    ) -> Result<(), ActorError> {
757        if let Err(e) = self.init_store("tracker", None, true, ctx).await {
758            error!(
759                error = %e,
760                "Failed to initialize tracker store"
761            );
762            return Err(e);
763        }
764
765        let our_key = self.our_key.clone();
766
767        if self.subject_metadata.active {
768            let Some(ext_db): Option<Arc<ExternalDB>> =
769                ctx.system().get_helper("ext_db").await
770            else {
771                error!("External database helper not found");
772                return Err(ActorError::Helper {
773                    name: "ext_db".to_owned(),
774                    reason: "Not found".to_owned(),
775                });
776            };
777
778            let Some(ave_sink): Option<AveSink> =
779                ctx.system().get_helper("sink").await
780            else {
781                error!("Sink helper not found");
782                return Err(ActorError::Helper {
783                    name: "sink".to_owned(),
784                    reason: "Not found".to_owned(),
785                });
786            };
787
788            let sink_actor = match ctx
789                .create_child(
790                    "sink_data",
791                    SinkData {
792                        public_key: our_key.to_string(),
793                    },
794                )
795                .await
796            {
797                Ok(actor) => actor,
798                Err(e) => {
799                    error!(
800                        error = %e,
801                        "Failed to create sink_data child"
802                    );
803                    return Err(e);
804                }
805            };
806            let sink =
807                Sink::new(sink_actor.subscribe(), ext_db.get_sink_data());
808            ctx.system().run_sink(sink).await;
809
810            let sink = Sink::new(sink_actor.subscribe(), ave_sink.clone());
811            ctx.system().run_sink(sink).await;
812        }
813
814        Ok(())
815    }
816}
817
818#[async_trait]
819impl Handler<Self> for Tracker {
820    async fn handle_message(
821        &mut self,
822        _sender: ActorPath,
823        msg: TrackerMessage,
824        ctx: &mut ActorContext<Self>,
825    ) -> Result<TrackerResponse, ActorError> {
826        match msg {
827            TrackerMessage::GetLedger { lo_sn, hi_sn } => {
828                let (ledger, is_all) =
829                    self.get_ledger(ctx, lo_sn, hi_sn).await?;
830                Ok(TrackerResponse::Ledger { ledger, is_all })
831            }
832            TrackerMessage::GetLastLedger => {
833                let ledger_event = self.get_last_ledger(ctx).await?;
834                Ok(TrackerResponse::LastLedger {
835                    ledger_event: Box::new(ledger_event),
836                })
837            }
838            TrackerMessage::GetMetadata => Ok(TrackerResponse::Metadata(
839                Box::new(Metadata::from(self.clone())),
840            )),
841            TrackerMessage::UpdateLedger { events } => {
842                let events_count = events.len();
843                if let Err(e) =
844                    self.manager_new_ledger_events(ctx, events).await
845                {
846                    warn!(
847                        msg_type = "UpdateLedger",
848                        error = %e,
849                        subject_id = %self.subject_metadata.subject_id,
850                        events_count = events_count,
851                        "Failed to verify new ledger events"
852                    );
853                    return Err(e);
854                };
855
856                debug!(
857                    msg_type = "UpdateLedger",
858                    subject_id = %self.subject_metadata.subject_id,
859                    sn = self.subject_metadata.sn,
860                    events_count = events_count,
861                    "Ledger updated successfully"
862                );
863
864                Ok(TrackerResponse::UpdateResult(
865                    self.subject_metadata.sn,
866                    self.subject_metadata.owner.clone(),
867                    self.subject_metadata.new_owner.clone(),
868                ))
869            }
870            TrackerMessage::GetGovernance => {
871                return Ok(TrackerResponse::Governance(Box::new(
872                    self.get_governance(ctx).await?,
873                )));
874            }
875        }
876    }
877
878    async fn on_event(
879        &mut self,
880        event: SignedLedger,
881        ctx: &mut ActorContext<Self>,
882    ) {
883        if let Err(e) = self.persist(&event, ctx).await {
884            error!(
885                error = %e,
886                subject_id = %self.subject_metadata.subject_id,
887                sn = self.subject_metadata.sn,
888                "Failed to persist event"
889            );
890            emit_fail(ctx, e).await;
891        };
892
893        if let Err(e) = ctx.publish_event(event.clone()).await {
894            error!(
895                error = %e,
896                subject_id = %self.subject_metadata.subject_id,
897                sn = self.subject_metadata.sn,
898                "Failed to publish event"
899            );
900            emit_fail(ctx, e).await;
901        } else {
902            debug!(
903                subject_id = %self.subject_metadata.subject_id,
904                sn = self.subject_metadata.sn,
905                "Event persisted and published successfully"
906            );
907        }
908    }
909
910    async fn on_child_fault(
911        &mut self,
912        error: ActorError,
913        ctx: &mut ActorContext<Self>,
914    ) -> ChildAction {
915        error!(
916            subject_id = %self.subject_metadata.subject_id,
917            sn = self.subject_metadata.sn,
918            error = %error,
919            "Child fault in tracker"
920        );
921        emit_fail(ctx, error).await;
922        ChildAction::Stop
923    }
924}
925
/// Parameters consumed by `Tracker::create_initial`; this is the
/// `InitParams` type of the tracker's `PersistentActor` implementation.
pub struct InitParamsTracker {
    /// Optional initial tracker data. When `None`, `create_initial` falls
    /// back to `TrackerInit::default()`.
    pub data: Option<TrackerInit>,
    /// Stored as the tracker's `our_key`.
    pub public_key: Arc<PublicKey>,
    /// Stored as the tracker's `hash`, wrapped in `Some`.
    pub hash: HashAlgorithm,
    /// Stored as the tracker's `service` flag.
    pub is_service: bool,
}
932
933#[async_trait]
934impl PersistentActor for Tracker {
935    type Persistence = FullPersistence;
936    type InitParams = InitParamsTracker;
937
938    fn update(&mut self, state: Self) {
939        self.properties = state.properties;
940        self.governance_id = state.governance_id;
941        self.namespace = state.namespace;
942        self.genesis_gov_version = state.genesis_gov_version;
943        self.subject_metadata = state.subject_metadata;
944    }
945
946    fn create_initial(params: Self::InitParams) -> Self {
947        let init = params.data.unwrap_or_default();
948
949        Self {
950            service: params.is_service,
951            hash: Some(params.hash),
952            our_key: params.public_key,
953            subject_metadata: init.subject_metadata,
954            properties: init.properties,
955            genesis_gov_version: init.genesis_gov_version,
956            governance_id: init.governance_id,
957            namespace: init.namespace,
958        }
959    }
960
961    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
962        match (
963            event.content().event_request.content(),
964            &event.content().protocols,
965        ) {
966            (EventRequest::Create(..), Protocols::Create { validation }) => {
967                if let ValidationMetadata::Metadata(metadata) =
968                    &validation.validation_metadata
969                {
970                    self.subject_metadata = SubjectMetadata::new(metadata);
971                    self.properties = metadata.properties.clone();
972
973                    debug!(
974                        event_type = "Create",
975                        subject_id = %self.subject_metadata.subject_id,
976                        sn = self.subject_metadata.sn,
977                        "Applied create event"
978                    );
979                } else {
980                    error!(
981                        event_type = "Create",
982                        "Validation metadata must be Metadata type"
983                    );
984                    return Err(ActorError::Functional { description: "In create event, validation metadata must be a Metadata".to_owned() });
985                }
986
987                return Ok(());
988            }
989            (
990                EventRequest::Fact(..),
991                Protocols::TrackerFact { evaluation, .. },
992            ) => {
993                if let Some(eval_res) = evaluation.evaluator_res() {
994                    self.apply_patch(eval_res.patch)?;
995                    debug!(
996                        event_type = "Fact",
997                        subject_id = %self.subject_metadata.subject_id,
998                        "Applied fact event with patch"
999                    );
1000                }
1001            }
1002            (
1003                EventRequest::Transfer(transfer_request),
1004                Protocols::Transfer { evaluation, .. },
1005            ) => {
1006                if evaluation.evaluator_res().is_some() {
1007                    self.subject_metadata.new_owner =
1008                        Some(transfer_request.new_owner.clone());
1009                    debug!(
1010                        event_type = "Transfer",
1011                        subject_id = %self.subject_metadata.subject_id,
1012                        new_owner = %transfer_request.new_owner,
1013                        "Applied transfer event"
1014                    );
1015                }
1016            }
1017            (EventRequest::Confirm(..), Protocols::TrackerConfirm { .. }) => {
1018                if let Some(new_owner) = self.subject_metadata.new_owner.take()
1019                {
1020                    self.subject_metadata.owner = new_owner.clone();
1021                    debug!(
1022                        event_type = "Confirm",
1023                        subject_id = %self.subject_metadata.subject_id,
1024                        new_owner = %new_owner,
1025                        "Applied confirm event"
1026                    );
1027                } else {
1028                    error!(
1029                        event_type = "Confirm",
1030                        subject_id = %self.subject_metadata.subject_id,
1031                        "New owner is None in confirm event"
1032                    );
1033                    return Err(ActorError::Functional {
1034                        description: "In confirm event, new owner is None"
1035                            .to_owned(),
1036                    });
1037                }
1038            }
1039            (EventRequest::Reject(..), Protocols::Reject { .. }) => {
1040                self.subject_metadata.new_owner = None;
1041                debug!(
1042                    event_type = "Reject",
1043                    subject_id = %self.subject_metadata.subject_id,
1044                    "Applied reject event"
1045                );
1046            }
1047            (EventRequest::EOL(..), Protocols::EOL { .. }) => {
1048                self.subject_metadata.active = false;
1049                debug!(
1050                    event_type = "EOL",
1051                    subject_id = %self.subject_metadata.subject_id,
1052                    "Applied EOL event"
1053                );
1054            }
1055            _ => {
1056                error!(
1057                    subject_id = %self.subject_metadata.subject_id,
1058                    "Invalid protocol data for Tracker"
1059                );
1060                return Err(ActorError::Functional {
1061                    description:
1062                        "Protocols data is for Governance and this is a Tracker"
1063                            .to_owned(),
1064                });
1065            }
1066        }
1067
1068        self.subject_metadata.sn += 1;
1069        self.subject_metadata.prev_ledger_event_hash =
1070            event.content().prev_ledger_event_hash.clone();
1071
1072        Ok(())
1073    }
1074}
1075
// `Tracker` implements `Storable` with an empty body, i.e. without
// overriding any item of the trait.
impl Storable for Tracker {}