Skip to main content

ave_core/tracker/
mod.rs

1use std::sync::Arc;
2
3use crate::{
4    db::Storable,
5    governance::{
6        sn_register::{SnRegister, SnRegisterMessage},
7        subject_register::{SubjectRegister, SubjectRegisterMessage},
8        witnesses_register::{WitnessesRegister, WitnessesRegisterMessage},
9    },
10    helpers::{db::ExternalDB, sink::AveSink},
11    model::{
12        common::{emit_fail, get_last_event, purge_storage},
13        event::{Protocols, ValidationMetadata},
14    },
15    node::{Node, NodeMessage, TransferSubject, register::RegisterMessage},
16    subject::{
17        DataForSink, EventLedgerDataForSink, Metadata, SignedLedger, Subject,
18        SubjectMetadata,
19        error::SubjectError,
20        sinkdata::{SinkData, SinkDataMessage},
21    },
22    validation::request::LastData,
23};
24
25use ave_actors::{
26    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
27    Response, Sink,
28};
29use ave_common::{
30    Namespace, ValueWrapper,
31    identity::{DigestIdentifier, HashAlgorithm, PublicKey, hash_borsh},
32    request::EventRequest,
33};
34
35use async_trait::async_trait;
36use ave_actors::{FullPersistence, PersistentActor};
37use borsh::{BorshDeserialize, BorshSerialize};
38use json_patch::{Patch, patch};
39use serde::{Deserialize, Serialize};
40use tracing::{Span, debug, error, info_span, warn};
41
42#[derive(Default, Debug, Serialize, Deserialize, Clone)]
43pub struct Tracker {
44    #[serde(skip)]
45    pub our_key: Arc<PublicKey>,
46    #[serde(skip)]
47    pub service: bool,
48    #[serde(skip)]
49    pub hash: Option<HashAlgorithm>,
50
51    pub subject_metadata: SubjectMetadata,
52    pub governance_id: DigestIdentifier,
53    /// The namespace of the subject.
54    pub namespace: Namespace,
55    /// The version of the governance contract that created the subject.
56    pub genesis_gov_version: u64,
57    /// The current status of the subject.
58    pub properties: ValueWrapper,
59}
60
61#[derive(Default)]
62pub struct TrackerInit {
63    pub subject_metadata: SubjectMetadata,
64    pub governance_id: DigestIdentifier,
65    pub namespace: Namespace,
66    pub genesis_gov_version: u64,
67    pub properties: ValueWrapper,
68}
69
70impl From<&Metadata> for TrackerInit {
71    fn from(value: &Metadata) -> Self {
72        Self {
73            subject_metadata: SubjectMetadata::new(value),
74            governance_id: value.governance_id.clone(),
75            namespace: value.namespace.clone(),
76            genesis_gov_version: value.genesis_gov_version,
77            properties: value.properties.clone(),
78        }
79    }
80}
81
82impl BorshSerialize for Tracker {
83    fn serialize<W: std::io::Write>(
84        &self,
85        writer: &mut W,
86    ) -> std::io::Result<()> {
87        // Serialize only the fields we want to persist, skipping 'owner'
88        BorshSerialize::serialize(&self.subject_metadata, writer)?;
89        BorshSerialize::serialize(&self.governance_id, writer)?;
90        BorshSerialize::serialize(&self.namespace, writer)?;
91        BorshSerialize::serialize(&self.genesis_gov_version, writer)?;
92        BorshSerialize::serialize(&self.properties, writer)?;
93
94        Ok(())
95    }
96}
97
98impl BorshDeserialize for Tracker {
99    fn deserialize_reader<R: std::io::Read>(
100        reader: &mut R,
101    ) -> std::io::Result<Self> {
102        // Deserialize the persisted fields
103        let subject_metadata = SubjectMetadata::deserialize_reader(reader)?;
104        let governance_id = DigestIdentifier::deserialize_reader(reader)?;
105        let namespace = Namespace::deserialize_reader(reader)?;
106        let genesis_gov_version = u64::deserialize_reader(reader)?;
107        let properties = ValueWrapper::deserialize_reader(reader)?;
108
109        // Create a default/placeholder KeyPair for 'owner'
110        // This will be replaced by the actual owner during actor initialization
111        let our_key = Arc::new(PublicKey::default());
112        let hash = None;
113
114        Ok(Self {
115            service: false,
116            hash,
117            our_key,
118            subject_metadata,
119            governance_id,
120            namespace,
121            genesis_gov_version,
122            properties,
123        })
124    }
125}
126
127#[async_trait]
128impl Subject for Tracker {
129    async fn update_sn(
130        &self,
131        ctx: &mut ActorContext<Self>,
132    ) -> Result<(), ActorError> {
133        let witnesses_register = ctx
134            .system()
135            .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
136                "/user/node/subject_manager/{}/witnesses_register",
137                self.governance_id
138            )))
139            .await?;
140        witnesses_register
141            .tell(WitnessesRegisterMessage::UpdateSn {
142                subject_id: self.subject_metadata.subject_id.clone(),
143                sn: self.subject_metadata.sn,
144            })
145            .await
146    }
147
148    async fn eol(
149        &self,
150        ctx: &mut ActorContext<Self>,
151    ) -> Result<(), ActorError> {
152        let node_path = ActorPath::from("/user/node");
153        let node = ctx.system().get_actor::<Node>(&node_path).await?;
154        node.tell(NodeMessage::EOLSubject {
155            subject_id: self.subject_metadata.subject_id.clone(),
156            i_owner: *self.our_key == self.subject_metadata.owner,
157        })
158        .await
159    }
160
161    async fn reject(
162        &self,
163        ctx: &mut ActorContext<Self>,
164        gov_version: u64,
165    ) -> Result<(), ActorError> {
166        let node_path = ActorPath::from("/user/node");
167        let node = ctx.system().get_actor::<Node>(&node_path).await?;
168        node.tell(NodeMessage::RejectTransfer(
169            self.subject_metadata.subject_id.clone(),
170        ))
171        .await?;
172
173        let witnesses_register = ctx
174            .system()
175            .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
176                "/user/node/subject_manager/{}/witnesses_register",
177                self.governance_id
178            )))
179            .await?;
180        witnesses_register
181            .tell(WitnessesRegisterMessage::Reject {
182                subject_id: self.subject_metadata.subject_id.clone(),
183                sn: self.subject_metadata.sn + 1,
184                gov_version,
185            })
186            .await
187    }
188
189    async fn confirm(
190        &self,
191        ctx: &mut ActorContext<Self>,
192        new_owner: PublicKey,
193        gov_version: u64,
194    ) -> Result<(), ActorError> {
195        let node_path = ActorPath::from("/user/node");
196        let node = ctx.system().get_actor::<Node>(&node_path).await?;
197        node.tell(NodeMessage::ConfirmTransfer(
198            self.subject_metadata.subject_id.clone(),
199        ))
200        .await?;
201
202        if self.service || *self.our_key == self.subject_metadata.owner {
203            let subject_register = ctx
204                .system()
205                .get_actor::<SubjectRegister>(&ActorPath::from(&format!(
206                    "/user/node/subject_manager/{}/subject_register",
207                    self.governance_id
208                )))
209                .await?;
210
211            let _response = subject_register
212                .ask(SubjectRegisterMessage::UpdateSubject {
213                    new_owner,
214                    old_owner: self.subject_metadata.owner.clone(),
215                    subject_id: self.subject_metadata.subject_id.clone(),
216                    namespace: self.namespace.to_string(),
217                    schema_id: self.subject_metadata.schema_id.clone(),
218                    gov_version,
219                })
220                .await?;
221        }
222
223        let witnesses_register = ctx
224            .system()
225            .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
226                "/user/node/subject_manager/{}/witnesses_register",
227                self.governance_id
228            )))
229            .await?;
230        witnesses_register
231            .tell(WitnessesRegisterMessage::Confirm {
232                subject_id: self.subject_metadata.subject_id.clone(),
233                sn: self.subject_metadata.sn + 1,
234                gov_version,
235            })
236            .await
237    }
238
239    async fn transfer(
240        &self,
241        ctx: &mut ActorContext<Self>,
242        new_owner: PublicKey,
243        gov_version: u64,
244    ) -> Result<(), ActorError> {
245        let node_path = ActorPath::from("/user/node");
246        let node = ctx.system().get_actor::<Node>(&node_path).await?;
247        node.tell(NodeMessage::TransferSubject(TransferSubject {
248            name: self.subject_metadata.name.clone(),
249            subject_id: self.subject_metadata.subject_id.clone(),
250            new_owner: new_owner.clone(),
251            actual_owner: self.subject_metadata.owner.clone(),
252        }))
253        .await?;
254
255        let witnesses_register = ctx
256            .system()
257            .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
258                "/user/node/subject_manager/{}/witnesses_register",
259                self.governance_id
260            )))
261            .await?;
262        witnesses_register
263            .tell(WitnessesRegisterMessage::Transfer {
264                subject_id: self.subject_metadata.subject_id.clone(),
265                new_owner,
266                gov_version,
267            })
268            .await
269    }
270
271    async fn get_last_ledger(
272        &self,
273        ctx: &mut ActorContext<Self>,
274    ) -> Result<Option<SignedLedger>, ActorError> {
275        get_last_event(ctx).await
276    }
277
278    fn apply_patch(
279        &mut self,
280        json_patch: ValueWrapper,
281    ) -> Result<(), ActorError> {
282        let patch_json = serde_json::from_value::<Patch>(json_patch.0)
283            .map_err(|e| {
284                let error = SubjectError::PatchConversionFailed {
285                    details: e.to_string(),
286                };
287                error!(
288                    error = %e,
289                    subject_id = %self.subject_metadata.subject_id,
290                    "Failed to convert patch from JSON"
291                );
292                ActorError::Functional {
293                    description: error.to_string(),
294                }
295            })?;
296
297        patch(&mut self.properties.0, &patch_json).map_err(|e| {
298            let error = SubjectError::PatchApplicationFailed {
299                details: e.to_string(),
300            };
301            error!(
302                error = %e,
303                subject_id = %self.subject_metadata.subject_id,
304                "Failed to apply patch to properties"
305            );
306            ActorError::Functional {
307                description: error.to_string(),
308            }
309        })?;
310
311        debug!(
312            subject_id = %self.subject_metadata.subject_id,
313            "Patch applied successfully"
314        );
315
316        Ok(())
317    }
318
319    async fn manager_new_ledger_events(
320        &mut self,
321        ctx: &mut ActorContext<Self>,
322        events: Vec<SignedLedger>,
323    ) -> Result<(), ActorError> {
324        let Some(hash) = self.hash else {
325            return Err(ActorError::FunctionalCritical {
326                description: "Can not obtain Hash".to_string(),
327            });
328        };
329
330        let current_sn = self.subject_metadata.sn;
331
332        if let Err(e) = self.verify_new_ledger_events(ctx, events, &hash).await
333        {
334            if let ActorError::Functional { description } = e.clone() {
335                warn!(
336                    error = %description,
337                    subject_id = %self.subject_metadata.subject_id,
338                    sn = self.subject_metadata.sn,
339                    "Error verifying new ledger events"
340                );
341
342                // Falló en la creación
343                if self.subject_metadata.sn == 0 {
344                    return Err(e);
345                }
346            } else {
347                error!(
348                    error = %e,
349                    subject_id = %self.subject_metadata.subject_id,
350                    sn = self.subject_metadata.sn,
351                    "Critical error verifying new ledger events"
352                );
353                return Err(e);
354            }
355        };
356
357        if current_sn < self.subject_metadata.sn || current_sn == 0 {
358            Self::publish_sink(
359                ctx,
360                SinkDataMessage::UpdateState(Box::new(Metadata::from(
361                    self.clone(),
362                ))),
363            )
364            .await?;
365
366            self.update_sn(ctx).await?;
367        }
368
369        Ok(())
370    }
371}
372
373impl Tracker {
374    async fn create(
375        &self,
376        ctx: &ActorContext<Self>,
377        gov_version: u64,
378    ) -> Result<(), ActorError> {
379        let sn_register = ctx
380            .system()
381            .get_actor::<SnRegister>(&ActorPath::from(format!(
382                "/user/node/subject_manager/{}/sn_register",
383                self.governance_id
384            )))
385            .await?;
386
387        sn_register
388            .tell(SnRegisterMessage::RegisterSn {
389                subject_id: self.subject_metadata.subject_id.clone(),
390                gov_version,
391                sn: 0,
392            })
393            .await?;
394
395        if self.service || *self.our_key == self.subject_metadata.owner {
396            let subject_register = ctx
397                .system()
398                .get_actor::<SubjectRegister>(&ActorPath::from(&format!(
399                    "/user/node/subject_manager/{}/subject_register",
400                    self.governance_id
401                )))
402                .await?;
403
404            let _response = subject_register
405                .ask(SubjectRegisterMessage::CreateSubject {
406                    creator: self.subject_metadata.owner.clone(),
407                    subject_id: self.subject_metadata.subject_id.clone(),
408                    namespace: self.namespace.to_string(),
409                    schema_id: self.subject_metadata.schema_id.clone(),
410                    gov_version,
411                })
412                .await?;
413        }
414
415        let witnesses_register = ctx
416            .system()
417            .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
418                "/user/node/subject_manager/{}/witnesses_register",
419                self.governance_id
420            )))
421            .await?;
422
423        witnesses_register
424            .tell(WitnessesRegisterMessage::Create {
425                subject_id: self.subject_metadata.subject_id.clone(),
426                gov_version,
427                owner: self.subject_metadata.owner.clone(),
428            })
429            .await
430    }
431
432    async fn register_gov_version_sn(
433        &self,
434        ctx: &ActorContext<Self>,
435        gov_version: u64,
436    ) -> Result<(), ActorError> {
437        let sn_register = ctx
438            .system()
439            .get_actor::<SnRegister>(&ActorPath::from(format!(
440                "/user/node/subject_manager/{}/sn_register",
441                self.governance_id
442            )))
443            .await?;
444
445        sn_register
446            .tell(SnRegisterMessage::RegisterSn {
447                subject_id: self.subject_metadata.subject_id.clone(),
448                gov_version,
449                sn: self.subject_metadata.sn,
450            })
451            .await
452    }
453
454    async fn verify_new_ledger_events(
455        &mut self,
456        ctx: &mut ActorContext<Self>,
457        events: Vec<SignedLedger>,
458        hash: &HashAlgorithm,
459    ) -> Result<(), ActorError> {
460        let mut iter = events.into_iter();
461        let last_ledger = get_last_event(ctx).await?;
462
463        let Some(first) = iter.next() else {
464            return Ok(());
465        };
466
467        let mut pending = Vec::new();
468
469        let mut last_ledger = if let Some(last_ledger) = last_ledger {
470            pending.push(first);
471            last_ledger
472        } else {
473            if let Err(e) = Self::verify_first_ledger_event(
474                ctx,
475                &first,
476                hash,
477                Metadata::from(self.clone()),
478            )
479            .await
480            {
481                return Err(ActorError::Functional {
482                    description: e.to_string(),
483                });
484            }
485
486            self.create(ctx, first.content().gov_version).await?;
487
488            self.on_event(first.clone(), ctx).await;
489
490            Self::register(
491                ctx,
492                RegisterMessage::RegisterSubj {
493                    gov_id: self.governance_id.to_string(),
494                    subject_id: self.subject_metadata.subject_id.to_string(),
495                    schema_id: self.subject_metadata.schema_id.clone(),
496                    namespace: self.namespace.to_string(),
497                    name: self.subject_metadata.name.clone(),
498                    description: self.subject_metadata.description.clone(),
499                },
500            )
501            .await?;
502
503            Self::event_to_sink(
504                ctx,
505                DataForSink {
506                    gov_id: Some(self.governance_id.to_string()),
507                    subject_id: self.subject_metadata.subject_id.to_string(),
508                    sn: self.subject_metadata.sn,
509                    owner: self.subject_metadata.owner.to_string(),
510                    namespace: self.namespace.to_string(),
511                    schema_id: self.subject_metadata.schema_id.clone(),
512                    issuer: first
513                        .content()
514                        .event_request
515                        .signature()
516                        .signer
517                        .to_string(),
518                    event_ledger_timestamp: first
519                        .signature()
520                        .timestamp
521                        .as_nanos(),
522                    event_request_timestamp: first
523                        .content()
524                        .event_request
525                        .signature()
526                        .timestamp
527                        .as_nanos(),
528                    gov_version: first.content().gov_version,
529                    event_data_ledger: EventLedgerDataForSink::build(
530                        &first.content().protocols,
531                        &self.properties.0,
532                    ),
533                },
534                first.content().event_request.content(),
535            )
536            .await?;
537
538            first
539        };
540
541        pending.extend(iter);
542
543        for event in pending {
544            let actual_ledger_hash =
545                hash_borsh(&*hash.hasher(), &last_ledger.0).map_err(|e| {
546                    ActorError::FunctionalCritical {
547                        description: format!(
548                            "Can not creacte actual ledger event hash: {}",
549                            e
550                        ),
551                    }
552                })?;
553            let last_data = LastData {
554                gov_version: last_ledger.content().gov_version,
555                vali_data: last_ledger
556                    .content()
557                    .protocols
558                    .get_validation_data(),
559            };
560
561            let last_gov_version = last_data.gov_version;
562
563            let last_event_is_ok = match Self::verify_new_ledger_event(
564                ctx,
565                &event,
566                Metadata::from(self.clone()),
567                actual_ledger_hash,
568                last_data,
569                hash,
570            )
571            .await
572            {
573                Ok(last_event_is_ok) => last_event_is_ok,
574                Err(e) => {
575                    // Check if it's a sequence number error
576                    if matches!(e, SubjectError::InvalidSequenceNumber { .. }) {
577                        // El evento que estamos aplicando no es el siguiente.
578                        continue;
579                    } else {
580                        return Err(ActorError::Functional {
581                            description: e.to_string(),
582                        });
583                    }
584                }
585            };
586
587            let event_gov_version = event.content().gov_version;
588
589            if last_event_is_ok {
590                if last_gov_version != event_gov_version {
591                    self.register_gov_version_sn(ctx, last_gov_version).await?;
592                }
593
594                match event.content().event_request.content().clone() {
595                    EventRequest::Transfer(transfer_request) => {
596                        self.transfer(
597                            ctx,
598                            transfer_request.new_owner,
599                            event.content().gov_version,
600                        )
601                        .await?;
602                    }
603                    EventRequest::Reject(..) => {
604                        self.reject(ctx, event.content().gov_version).await?;
605                    }
606                    EventRequest::Confirm(..) => {
607                        self.confirm(
608                            ctx,
609                            event.signature().signer.clone(),
610                            event.content().gov_version,
611                        )
612                        .await?;
613                    }
614                    EventRequest::EOL(..) => {
615                        self.eol(ctx).await?;
616
617                        Self::register(
618                            ctx,
619                            RegisterMessage::EOLSubj {
620                                gov_id: self.governance_id.to_string(),
621                                subj_id: self
622                                    .subject_metadata
623                                    .subject_id
624                                    .to_string(),
625                            },
626                        )
627                        .await?
628                    }
629                    _ => {}
630                };
631
632                Self::event_to_sink(
633                    ctx,
634                    DataForSink {
635                        gov_id: Some(self.governance_id.to_string()),
636                        subject_id: self
637                            .subject_metadata
638                            .subject_id
639                            .to_string(),
640                        sn: self.subject_metadata.sn,
641                        owner: self.subject_metadata.owner.to_string(),
642                        namespace: self.namespace.to_string(),
643                        schema_id: self.subject_metadata.schema_id.clone(),
644                        issuer: event
645                            .content()
646                            .event_request
647                            .signature()
648                            .signer
649                            .to_string(),
650                        event_ledger_timestamp: event
651                            .signature()
652                            .timestamp
653                            .as_nanos(),
654                        event_request_timestamp: event
655                            .content()
656                            .event_request
657                            .signature()
658                            .timestamp
659                            .as_nanos(),
660                        gov_version: event.content().gov_version,
661                        event_data_ledger: EventLedgerDataForSink::build(
662                            &event.content().protocols,
663                            &self.properties.0,
664                        ),
665                    },
666                    event.content().event_request.content(),
667                )
668                .await?;
669            }
670
671            // Aplicar evento.
672            self.on_event(event.clone(), ctx).await;
673
674            // Registrar la gov_version del evento con el sn ya actualizado.
675            // Necesario cuando varios eventos comparten la misma gov_version:
676            // la transición (línea anterior) solo captura el sn antes del primer
677            // evento del nuevo gov_version, pero no el sn final de ese tramo.
678            self.register_gov_version_sn(ctx, event_gov_version).await?;
679
680            // Actualizar último evento.
681            last_ledger = event.clone();
682        }
683
684        Ok(())
685    }
686}
687
688#[derive(Debug, Clone)]
689pub enum TrackerMessage {
690    GetMetadata,
691    GetLedger { lo_sn: Option<u64>, hi_sn: u64 },
692    GetLastLedger,
693    PurgeStorage,
694    UpdateLedger { events: Vec<SignedLedger> },
695}
696
697impl Message for TrackerMessage {}
698
699#[derive(Debug, Clone)]
700pub enum TrackerResponse {
701    /// The subject metadata.
702    Metadata(Box<Metadata>),
703    UpdateResult(u64, PublicKey, Option<PublicKey>),
704    Ledger {
705        ledger: Vec<SignedLedger>,
706        is_all: bool,
707    },
708    LastLedger {
709        ledger_event: Box<Option<SignedLedger>>,
710    },
711    Sn(u64),
712    Ok,
713}
714impl Response for TrackerResponse {}
715
716#[async_trait]
717impl Actor for Tracker {
718    type Event = SignedLedger;
719    type Message = TrackerMessage;
720    type Response = TrackerResponse;
721
722    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
723        parent_span.map_or_else(
724            || info_span!("Tracker", id),
725            |parent_span| info_span!(parent: parent_span, "Tracker", id),
726        )
727    }
728
729    async fn pre_start(
730        &mut self,
731        ctx: &mut ActorContext<Self>,
732    ) -> Result<(), ActorError> {
733        if let Err(e) = self.init_store("tracker", None, true, ctx).await {
734            error!(
735                error = %e,
736                "Failed to initialize tracker store"
737            );
738            return Err(e);
739        }
740
741        let Some(config): Option<crate::system::ConfigHelper> =
742            ctx.system().get_helper("config").await
743        else {
744            return Err(ActorError::Helper {
745                name: "config".to_owned(),
746                reason: "Not found".to_owned(),
747            });
748        };
749
750        if config.safe_mode {
751            return Ok(());
752        }
753
754        let our_key = self.our_key.clone();
755
756        if self.subject_metadata.active {
757            let Some(ext_db): Option<Arc<ExternalDB>> =
758                ctx.system().get_helper("ext_db").await
759            else {
760                error!("External database helper not found");
761                return Err(ActorError::Helper {
762                    name: "ext_db".to_owned(),
763                    reason: "Not found".to_owned(),
764                });
765            };
766
767            let Some(ave_sink): Option<AveSink> =
768                ctx.system().get_helper("sink").await
769            else {
770                error!("Sink helper not found");
771                return Err(ActorError::Helper {
772                    name: "sink".to_owned(),
773                    reason: "Not found".to_owned(),
774                });
775            };
776
777            let sink_actor = match ctx
778                .create_child(
779                    "sink_data",
780                    SinkData {
781                        public_key: our_key.to_string(),
782                    },
783                )
784                .await
785            {
786                Ok(actor) => actor,
787                Err(e) => {
788                    error!(
789                        error = %e,
790                        "Failed to create sink_data child"
791                    );
792                    return Err(e);
793                }
794            };
795            let sink =
796                Sink::new(sink_actor.subscribe(), ext_db.get_sink_data());
797            ctx.system().run_sink(sink).await;
798
799            let sink = Sink::new(sink_actor.subscribe(), ave_sink.clone());
800            ctx.system().run_sink(sink).await;
801        }
802
803        Ok(())
804    }
805}
806
807#[async_trait]
808impl Handler<Self> for Tracker {
809    async fn handle_message(
810        &mut self,
811        _sender: ActorPath,
812        msg: TrackerMessage,
813        ctx: &mut ActorContext<Self>,
814    ) -> Result<TrackerResponse, ActorError> {
815        match msg {
816            TrackerMessage::GetLedger { lo_sn, hi_sn } => {
817                let (ledger, is_all) =
818                    self.get_ledger(ctx, lo_sn, hi_sn).await?;
819                Ok(TrackerResponse::Ledger { ledger, is_all })
820            }
821            TrackerMessage::GetLastLedger => {
822                let ledger_event = self.get_last_ledger(ctx).await?;
823                Ok(TrackerResponse::LastLedger {
824                    ledger_event: Box::new(ledger_event),
825                })
826            }
827            TrackerMessage::GetMetadata => Ok(TrackerResponse::Metadata(
828                Box::new(Metadata::from(self.clone())),
829            )),
830            TrackerMessage::PurgeStorage => {
831                purge_storage(ctx).await?;
832
833                debug!(
834                    msg_type = "PurgeStorage",
835                    subject_id = %self.subject_metadata.subject_id,
836                    "Tracker storage purged"
837                );
838
839                Ok(TrackerResponse::Ok)
840            }
841            TrackerMessage::UpdateLedger { events } => {
842                let events_count = events.len();
843                if let Err(e) =
844                    self.manager_new_ledger_events(ctx, events).await
845                {
846                    warn!(
847                        msg_type = "UpdateLedger",
848                        error = %e,
849                        subject_id = %self.subject_metadata.subject_id,
850                        events_count = events_count,
851                        "Failed to verify new ledger events"
852                    );
853                    return Err(e);
854                };
855
856                debug!(
857                    msg_type = "UpdateLedger",
858                    subject_id = %self.subject_metadata.subject_id,
859                    sn = self.subject_metadata.sn,
860                    events_count = events_count,
861                    "Ledger updated successfully"
862                );
863
864                Ok(TrackerResponse::UpdateResult(
865                    self.subject_metadata.sn,
866                    self.subject_metadata.owner.clone(),
867                    self.subject_metadata.new_owner.clone(),
868                ))
869            }
870        }
871    }
872
873    async fn on_event(
874        &mut self,
875        event: SignedLedger,
876        ctx: &mut ActorContext<Self>,
877    ) {
878        if let Err(e) = self.persist(&event, ctx).await {
879            error!(
880                error = %e,
881                subject_id = %self.subject_metadata.subject_id,
882                sn = self.subject_metadata.sn,
883                "Failed to persist event"
884            );
885            emit_fail(ctx, e).await;
886        };
887
888        if let Err(e) = ctx.publish_event(event.clone()).await {
889            error!(
890                error = %e,
891                subject_id = %self.subject_metadata.subject_id,
892                sn = self.subject_metadata.sn,
893                "Failed to publish event"
894            );
895            emit_fail(ctx, e).await;
896        } else {
897            debug!(
898                subject_id = %self.subject_metadata.subject_id,
899                sn = self.subject_metadata.sn,
900                "Event persisted and published successfully"
901            );
902        }
903    }
904
905    async fn on_child_fault(
906        &mut self,
907        error: ActorError,
908        ctx: &mut ActorContext<Self>,
909    ) -> ChildAction {
910        error!(
911            subject_id = %self.subject_metadata.subject_id,
912            sn = self.subject_metadata.sn,
913            error = %error,
914            "Child fault in tracker"
915        );
916        emit_fail(ctx, error).await;
917        ChildAction::Stop
918    }
919}
920
921pub struct InitParamsTracker {
922    pub data: Option<TrackerInit>,
923    pub public_key: Arc<PublicKey>,
924    pub hash: HashAlgorithm,
925    pub is_service: bool,
926}
927
928#[async_trait]
929impl PersistentActor for Tracker {
930    type Persistence = FullPersistence;
931    type InitParams = InitParamsTracker;
932
933    fn update(&mut self, state: Self) {
934        self.properties = state.properties;
935        self.governance_id = state.governance_id;
936        self.namespace = state.namespace;
937        self.genesis_gov_version = state.genesis_gov_version;
938        self.subject_metadata = state.subject_metadata;
939    }
940
941    fn create_initial(params: Self::InitParams) -> Self {
942        let init = params.data.unwrap_or_default();
943
944        Self {
945            service: params.is_service,
946            hash: Some(params.hash),
947            our_key: params.public_key,
948            subject_metadata: init.subject_metadata,
949            properties: init.properties,
950            genesis_gov_version: init.genesis_gov_version,
951            governance_id: init.governance_id,
952            namespace: init.namespace,
953        }
954    }
955
956    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
957        match (
958            event.content().event_request.content(),
959            &event.content().protocols,
960        ) {
961            (EventRequest::Create(..), Protocols::Create { validation }) => {
962                if let ValidationMetadata::Metadata(metadata) =
963                    &validation.validation_metadata
964                {
965                    self.subject_metadata = SubjectMetadata::new(metadata);
966                    self.properties = metadata.properties.clone();
967
968                    debug!(
969                        event_type = "Create",
970                        subject_id = %self.subject_metadata.subject_id,
971                        sn = self.subject_metadata.sn,
972                        "Applied create event"
973                    );
974                } else {
975                    error!(
976                        event_type = "Create",
977                        "Validation metadata must be Metadata type"
978                    );
979                    return Err(ActorError::Functional { description: "In create event, validation metadata must be a Metadata".to_owned() });
980                }
981
982                return Ok(());
983            }
984            (
985                EventRequest::Fact(..),
986                Protocols::TrackerFact { evaluation, .. },
987            ) => {
988                if let Some(eval_res) = evaluation.evaluator_res() {
989                    self.apply_patch(eval_res.patch)?;
990                    debug!(
991                        event_type = "Fact",
992                        subject_id = %self.subject_metadata.subject_id,
993                        "Applied fact event with patch"
994                    );
995                }
996            }
997            (
998                EventRequest::Transfer(transfer_request),
999                Protocols::Transfer { evaluation, .. },
1000            ) => {
1001                if evaluation.evaluator_res().is_some() {
1002                    self.subject_metadata.new_owner =
1003                        Some(transfer_request.new_owner.clone());
1004                    debug!(
1005                        event_type = "Transfer",
1006                        subject_id = %self.subject_metadata.subject_id,
1007                        new_owner = %transfer_request.new_owner,
1008                        "Applied transfer event"
1009                    );
1010                }
1011            }
1012            (EventRequest::Confirm(..), Protocols::TrackerConfirm { .. }) => {
1013                if let Some(new_owner) = self.subject_metadata.new_owner.take()
1014                {
1015                    self.subject_metadata.owner = new_owner.clone();
1016                    debug!(
1017                        event_type = "Confirm",
1018                        subject_id = %self.subject_metadata.subject_id,
1019                        new_owner = %new_owner,
1020                        "Applied confirm event"
1021                    );
1022                } else {
1023                    error!(
1024                        event_type = "Confirm",
1025                        subject_id = %self.subject_metadata.subject_id,
1026                        "New owner is None in confirm event"
1027                    );
1028                    return Err(ActorError::Functional {
1029                        description: "In confirm event, new owner is None"
1030                            .to_owned(),
1031                    });
1032                }
1033            }
1034            (EventRequest::Reject(..), Protocols::Reject { .. }) => {
1035                self.subject_metadata.new_owner = None;
1036                debug!(
1037                    event_type = "Reject",
1038                    subject_id = %self.subject_metadata.subject_id,
1039                    "Applied reject event"
1040                );
1041            }
1042            (EventRequest::EOL(..), Protocols::EOL { .. }) => {
1043                self.subject_metadata.active = false;
1044                debug!(
1045                    event_type = "EOL",
1046                    subject_id = %self.subject_metadata.subject_id,
1047                    "Applied EOL event"
1048                );
1049            }
1050            _ => {
1051                error!(
1052                    subject_id = %self.subject_metadata.subject_id,
1053                    "Invalid protocol data for Tracker"
1054                );
1055                return Err(ActorError::Functional {
1056                    description:
1057                        "Protocols data is for Governance and this is a Tracker"
1058                            .to_owned(),
1059                });
1060            }
1061        }
1062
1063        self.subject_metadata.sn += 1;
1064        self.subject_metadata.prev_ledger_event_hash =
1065            event.content().prev_ledger_event_hash.clone();
1066
1067        Ok(())
1068    }
1069}
1070
1071impl Storable for Tracker {}