Skip to main content

ave_core/tracker/
mod.rs

1use std::{collections::BTreeSet, sync::Arc};
2
3use crate::{
4    db::Storable,
5    governance::{
6        sn_register::{SnRegister, SnRegisterMessage},
7        subject_register::{SubjectRegister, SubjectRegisterMessage},
8        witnesses_register::{
9            WitnessesRegister, WitnessesRegisterMessage,
10            WitnessesRegisterResponse,
11        },
12    },
13    helpers::{db::ExternalDB, sink::AveSink},
14    model::{
15        common::{
16            TrackerEventVisibility, TrackerStoredVisibility,
17            TrackerVisibilityMode, TrackerVisibilityState, emit_fail,
18            get_last_event, purge_storage,
19        },
20        event::{Ledger, Protocols, ValidationMetadata},
21    },
22    node::{Node, NodeMessage, TransferSubject, register::RegisterMessage},
23    subject::{
24        DataForSink, EventLedgerDataForSink, Metadata, Subject,
25        SubjectMetadata,
26        error::SubjectError,
27        sinkdata::{SinkData, SinkDataMessage},
28    },
29    validation::request::LastData,
30};
31
32use ave_actors::{
33    Actor, ActorContext, ActorError, ActorPath, ChildAction, Handler, Message,
34    Response, Sink,
35};
36use ave_common::{
37    Namespace, ValueWrapper,
38    identity::{DigestIdentifier, HashAlgorithm, PublicKey},
39    request::EventRequest,
40    response::SubjectDB,
41};
42
43use async_trait::async_trait;
44use ave_actors::{FullPersistence, PersistentActor};
45use borsh::{BorshDeserialize, BorshSerialize};
46use json_patch::{Patch, patch};
47use serde::{Deserialize, Serialize};
48use tracing::{Span, debug, error, info_span, warn};
49
/// State of a tracker subject actor.
///
/// Fields marked `#[serde(skip)]` are runtime-only: they are left out of the
/// serde representation and are also not written by the manual Borsh
/// implementation of this type.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Tracker {
    /// Public key of the local node; compared against the subject owner to
    /// decide owner-only actions.
    #[serde(skip)]
    pub our_key: Arc<PublicKey>,
    /// When `true`, the subject register is notified even if this node is
    /// not the subject owner.
    #[serde(skip)]
    pub service: bool,
    /// Flag forwarded to the per-event ledger verification.
    #[serde(skip)]
    pub only_clear_events: bool,
    /// Hash algorithm used to hash ledger events; must be `Some` before new
    /// ledger events can be processed.
    #[serde(skip)]
    pub hash: Option<HashAlgorithm>,

    /// Common subject metadata (subject id, owner, sn, schema id, ...).
    pub subject_metadata: SubjectMetadata,
    /// Identifier of the governance this subject belongs to.
    pub governance_id: DigestIdentifier,
    /// The namespace of the subject.
    pub namespace: Namespace,
    /// The version of the governance contract that created the subject.
    pub genesis_gov_version: u64,
    /// Visibility mode currently applied to this tracker.
    pub visibility_mode: TrackerVisibilityMode,
    /// The current status of the subject.
    pub properties: ValueWrapper,
}
71
/// Initial values for a tracker, obtainable from a subject's [`Metadata`]
/// through the `From<&Metadata>` implementation.
#[derive(Default)]
pub struct TrackerInit {
    /// Common subject metadata.
    pub subject_metadata: SubjectMetadata,
    /// Identifier of the governance the subject belongs to.
    pub governance_id: DigestIdentifier,
    /// Namespace of the subject.
    pub namespace: Namespace,
    /// Governance version recorded at subject genesis.
    pub genesis_gov_version: u64,
    /// Initial subject properties.
    pub properties: ValueWrapper,
}
80
81impl From<&Metadata> for TrackerInit {
82    fn from(value: &Metadata) -> Self {
83        Self {
84            subject_metadata: SubjectMetadata::new(value),
85            governance_id: value.governance_id.clone(),
86            namespace: value.namespace.clone(),
87            genesis_gov_version: value.genesis_gov_version,
88            properties: value.properties.clone(),
89        }
90    }
91}
92
93impl BorshSerialize for Tracker {
94    fn serialize<W: std::io::Write>(
95        &self,
96        writer: &mut W,
97    ) -> std::io::Result<()> {
98        // Serialize only the fields we want to persist, skipping 'owner'
99        BorshSerialize::serialize(&self.subject_metadata, writer)?;
100        BorshSerialize::serialize(&self.governance_id, writer)?;
101        BorshSerialize::serialize(&self.namespace, writer)?;
102        BorshSerialize::serialize(&self.genesis_gov_version, writer)?;
103        BorshSerialize::serialize(&self.visibility_mode, writer)?;
104        BorshSerialize::serialize(&self.properties, writer)?;
105
106        Ok(())
107    }
108}
109
110impl BorshDeserialize for Tracker {
111    fn deserialize_reader<R: std::io::Read>(
112        reader: &mut R,
113    ) -> std::io::Result<Self> {
114        // Deserialize the persisted fields
115        let subject_metadata = SubjectMetadata::deserialize_reader(reader)?;
116        let governance_id = DigestIdentifier::deserialize_reader(reader)?;
117        let namespace = Namespace::deserialize_reader(reader)?;
118        let genesis_gov_version = u64::deserialize_reader(reader)?;
119        let visibility_mode =
120            TrackerVisibilityMode::deserialize_reader(reader)?;
121        let properties = ValueWrapper::deserialize_reader(reader)?;
122
123        // Create a default/placeholder KeyPair for 'owner'
124        // This will be replaced by the actual owner during actor initialization
125        let our_key = Arc::new(PublicKey::default());
126        let hash = None;
127
128        Ok(Self {
129            service: false,
130            only_clear_events: false,
131            hash,
132            our_key,
133            subject_metadata,
134            governance_id,
135            namespace,
136            genesis_gov_version,
137            visibility_mode,
138            properties,
139        })
140    }
141}
142
143#[async_trait]
144impl Subject for Tracker {
145    async fn update_sn(
146        &self,
147        ctx: &mut ActorContext<Self>,
148    ) -> Result<(), ActorError> {
149        let witnesses_register = ctx
150            .system()
151            .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
152                "/user/node/subject_manager/{}/witnesses_register",
153                self.governance_id
154            )))
155            .await?;
156        witnesses_register
157            .tell(WitnessesRegisterMessage::UpdateSn {
158                subject_id: self.subject_metadata.subject_id.clone(),
159                sn: self.subject_metadata.sn,
160            })
161            .await
162    }
163
164    async fn eol(
165        &self,
166        ctx: &mut ActorContext<Self>,
167    ) -> Result<(), ActorError> {
168        let node_path = ActorPath::from("/user/node");
169        let node = ctx.system().get_actor::<Node>(&node_path).await?;
170        node.tell(NodeMessage::EOLSubject {
171            subject_id: self.subject_metadata.subject_id.clone(),
172            i_owner: *self.our_key == self.subject_metadata.owner,
173        })
174        .await
175    }
176
177    async fn reject(
178        &self,
179        ctx: &mut ActorContext<Self>,
180        gov_version: u64,
181    ) -> Result<(), ActorError> {
182        let node_path = ActorPath::from("/user/node");
183        let node = ctx.system().get_actor::<Node>(&node_path).await?;
184        node.tell(NodeMessage::RejectTransfer(
185            self.subject_metadata.subject_id.clone(),
186        ))
187        .await?;
188
189        let witnesses_register = ctx
190            .system()
191            .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
192                "/user/node/subject_manager/{}/witnesses_register",
193                self.governance_id
194            )))
195            .await?;
196        witnesses_register
197            .tell(WitnessesRegisterMessage::Reject {
198                subject_id: self.subject_metadata.subject_id.clone(),
199                sn: self.subject_metadata.sn + 1,
200                gov_version,
201            })
202            .await
203    }
204
205    async fn confirm(
206        &self,
207        ctx: &mut ActorContext<Self>,
208        new_owner: PublicKey,
209        gov_version: u64,
210    ) -> Result<(), ActorError> {
211        let node_path = ActorPath::from("/user/node");
212        let node = ctx.system().get_actor::<Node>(&node_path).await?;
213        node.tell(NodeMessage::ConfirmTransfer(
214            self.subject_metadata.subject_id.clone(),
215        ))
216        .await?;
217
218        if self.service || *self.our_key == self.subject_metadata.owner {
219            let subject_register = ctx
220                .system()
221                .get_actor::<SubjectRegister>(&ActorPath::from(&format!(
222                    "/user/node/subject_manager/{}/subject_register",
223                    self.governance_id
224                )))
225                .await?;
226
227            let _response = subject_register
228                .ask(SubjectRegisterMessage::UpdateSubject {
229                    new_owner,
230                    old_owner: self.subject_metadata.owner.clone(),
231                    subject_id: self.subject_metadata.subject_id.clone(),
232                    namespace: self.namespace.to_string(),
233                    schema_id: self.subject_metadata.schema_id.clone(),
234                    gov_version,
235                })
236                .await?;
237        }
238
239        let witnesses_register = ctx
240            .system()
241            .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
242                "/user/node/subject_manager/{}/witnesses_register",
243                self.governance_id
244            )))
245            .await?;
246        witnesses_register
247            .tell(WitnessesRegisterMessage::Confirm {
248                subject_id: self.subject_metadata.subject_id.clone(),
249                sn: self.subject_metadata.sn + 1,
250                gov_version,
251            })
252            .await
253    }
254
255    async fn transfer(
256        &self,
257        ctx: &mut ActorContext<Self>,
258        new_owner: PublicKey,
259        gov_version: u64,
260    ) -> Result<(), ActorError> {
261        let node_path = ActorPath::from("/user/node");
262        let node = ctx.system().get_actor::<Node>(&node_path).await?;
263        node.tell(NodeMessage::TransferSubject(TransferSubject {
264            name: self.subject_metadata.name.clone(),
265            subject_id: self.subject_metadata.subject_id.clone(),
266            new_owner: new_owner.clone(),
267            actual_owner: self.subject_metadata.owner.clone(),
268        }))
269        .await?;
270
271        let witnesses_register = ctx
272            .system()
273            .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
274                "/user/node/subject_manager/{}/witnesses_register",
275                self.governance_id
276            )))
277            .await?;
278        witnesses_register
279            .tell(WitnessesRegisterMessage::Transfer {
280                subject_id: self.subject_metadata.subject_id.clone(),
281                new_owner,
282                gov_version,
283            })
284            .await
285    }
286
    /// Returns the result of `get_last_event` for this actor's context:
    /// the last ledger event if there is one, otherwise `None`.
    async fn get_last_ledger(
        &self,
        ctx: &mut ActorContext<Self>,
    ) -> Result<Option<Ledger>, ActorError> {
        get_last_event(ctx).await
    }
293
294    fn apply_patch(
295        &mut self,
296        json_patch: ValueWrapper,
297    ) -> Result<(), ActorError> {
298        let patch_json = serde_json::from_value::<Patch>(json_patch.0)
299            .map_err(|e| {
300                let error = SubjectError::PatchConversionFailed {
301                    details: e.to_string(),
302                };
303                error!(
304                    error = %e,
305                    subject_id = %self.subject_metadata.subject_id,
306                    "Failed to convert patch from JSON"
307                );
308                ActorError::Functional {
309                    description: error.to_string(),
310                }
311            })?;
312
313        patch(&mut self.properties.0, &patch_json).map_err(|e| {
314            let error = SubjectError::PatchApplicationFailed {
315                details: e.to_string(),
316            };
317            error!(
318                error = %e,
319                subject_id = %self.subject_metadata.subject_id,
320                "Failed to apply patch to properties"
321            );
322            ActorError::Functional {
323                description: error.to_string(),
324            }
325        })?;
326
327        debug!(
328            subject_id = %self.subject_metadata.subject_id,
329            "Patch applied successfully"
330        );
331
332        Ok(())
333    }
334
335    async fn manager_new_ledger_events(
336        &mut self,
337        ctx: &mut ActorContext<Self>,
338        events: Vec<Ledger>,
339    ) -> Result<(), ActorError> {
340        let Some(hash) = self.hash else {
341            return Err(ActorError::FunctionalCritical {
342                description: "Can not obtain Hash".to_string(),
343            });
344        };
345
346        let current_sn = self.subject_metadata.sn;
347
348        if let Err(e) = self.verify_new_ledger_events(ctx, events, &hash).await
349        {
350            if let ActorError::Functional { description } = e.clone() {
351                warn!(
352                    error = %description,
353                    subject_id = %self.subject_metadata.subject_id,
354                    sn = self.subject_metadata.sn,
355                    "Error verifying new ledger events"
356                );
357
358                // Falló en la creación
359                if self.subject_metadata.sn == 0 {
360                    return Err(e);
361                }
362            } else {
363                error!(
364                    error = %e,
365                    subject_id = %self.subject_metadata.subject_id,
366                    sn = self.subject_metadata.sn,
367                    "Critical error verifying new ledger events"
368                );
369                return Err(e);
370            }
371        };
372
373        if current_sn < self.subject_metadata.sn || current_sn == 0 {
374            let subject_db = self.build_subject_db(ctx).await?;
375            Self::publish_sink(
376                ctx,
377                SinkDataMessage::UpdateState(Box::new(subject_db)),
378            )
379            .await?;
380
381            self.update_sn(ctx).await?;
382        }
383
384        Ok(())
385    }
386}
387
388impl Tracker {
    /// Visibility pair used for events that are not facts: stored
    /// visibility `Full` and event visibility `NonFact`.
    const fn public_visibilities()
    -> (TrackerStoredVisibility, TrackerEventVisibility) {
        (
            TrackerStoredVisibility::Full,
            TrackerEventVisibility::NonFact,
        )
    }
396
397    fn fact_visibilities(
398        viewpoints: &BTreeSet<String>,
399        opaque: bool,
400    ) -> (TrackerStoredVisibility, TrackerEventVisibility) {
401        let event_visibility = TrackerEventVisibility::Fact(viewpoints.clone());
402
403        let stored_visibility = if opaque {
404            TrackerStoredVisibility::None
405        } else if viewpoints.is_empty() {
406            TrackerStoredVisibility::Full
407        } else {
408            TrackerStoredVisibility::Only(viewpoints.clone())
409        };
410
411        (stored_visibility, event_visibility)
412    }
413
    /// Returns `true` when the tracker's visibility mode is
    /// `TrackerVisibilityMode::Full`.
    const fn is_full(&self) -> bool {
        matches!(self.visibility_mode, TrackerVisibilityMode::Full)
    }
417
418    async fn record_visibility_event(
419        &self,
420        ctx: &ActorContext<Self>,
421        event: &Ledger,
422    ) -> Result<(), ActorError> {
423        let (stored_visibility, event_visibility, mode) = match &event.protocols
424        {
425            Protocols::Create { .. } => {
426                let (stored_visibility, event_visibility) =
427                    Self::public_visibilities();
428                (
429                    stored_visibility,
430                    event_visibility,
431                    TrackerVisibilityMode::Full,
432                )
433            }
434            Protocols::TrackerFactFull { event_request, .. } => {
435                let EventRequest::Fact(fact_request) = event_request.content()
436                else {
437                    return Err(ActorError::Functional {
438                        description:
439                            "In fact event, event request must be Fact"
440                                .to_owned(),
441                    });
442                };
443                let (stored_visibility, event_visibility) =
444                    Self::fact_visibilities(&fact_request.viewpoints, false);
445                (stored_visibility, event_visibility, self.visibility_mode)
446            }
447            Protocols::TrackerFactOpaque { evaluation, .. } => {
448                let (stored_visibility, event_visibility) =
449                    Self::fact_visibilities(&evaluation.viewpoints, true);
450                let mode = if evaluation.is_ok() {
451                    TrackerVisibilityMode::Opaque
452                } else {
453                    self.visibility_mode
454                };
455                (stored_visibility, event_visibility, mode)
456            }
457            Protocols::Transfer { .. }
458            | Protocols::TrackerConfirm { .. }
459            | Protocols::Reject { .. }
460            | Protocols::EOL { .. } => {
461                let (stored_visibility, event_visibility) =
462                    Self::public_visibilities();
463                (stored_visibility, event_visibility, self.visibility_mode)
464            }
465            _ => {
466                return Err(ActorError::Functional {
467                    description: "Invalid protocol data for tracker visibility"
468                        .to_owned(),
469                });
470            }
471        };
472
473        let witnesses_register = ctx
474            .system()
475            .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
476                "/user/node/subject_manager/{}/witnesses_register",
477                self.governance_id
478            )))
479            .await?;
480
481        witnesses_register
482            .tell(WitnessesRegisterMessage::UpdateTrackerVisibility {
483                subject_id: self.subject_metadata.subject_id.clone(),
484                sn: event.sn,
485                mode,
486                stored_visibility,
487                event_visibility,
488            })
489            .await
490    }
491
492    async fn get_tracker_visibility_state(
493        &self,
494        ctx: &ActorContext<Self>,
495    ) -> Result<TrackerVisibilityState, ActorError> {
496        let witnesses_register = ctx
497            .system()
498            .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
499                "/user/node/subject_manager/{}/witnesses_register",
500                self.governance_id
501            )))
502            .await?;
503
504        let response = witnesses_register
505            .ask(WitnessesRegisterMessage::GetTrackerVisibilityState {
506                subject_id: self.subject_metadata.subject_id.clone(),
507            })
508            .await?;
509
510        match response {
511            WitnessesRegisterResponse::TrackerVisibilityState { state } => {
512                Ok(state)
513            }
514            _ => Err(ActorError::UnexpectedResponse {
515                path: ActorPath::from(format!(
516                    "/user/node/subject_manager/{}/witnesses_register",
517                    self.governance_id
518                )),
519                expected: "WitnessesRegisterResponse::TrackerVisibilityState"
520                    .to_owned(),
521            }),
522        }
523    }
524
525    async fn build_subject_db(
526        &self,
527        ctx: &ActorContext<Self>,
528    ) -> Result<SubjectDB, ActorError> {
529        let visibility_state = self.get_tracker_visibility_state(ctx).await?;
530
531        Ok(SubjectDB {
532            name: self.subject_metadata.name.clone(),
533            description: self.subject_metadata.description.clone(),
534            subject_id: self.subject_metadata.subject_id.to_string(),
535            governance_id: self.governance_id.to_string(),
536            genesis_gov_version: self.genesis_gov_version,
537            prev_ledger_event_hash: if self
538                .subject_metadata
539                .prev_ledger_event_hash
540                .is_empty()
541            {
542                None
543            } else {
544                Some(self.subject_metadata.prev_ledger_event_hash.to_string())
545            },
546            schema_id: self.subject_metadata.schema_id.to_string(),
547            namespace: self.namespace.to_string(),
548            sn: self.subject_metadata.sn,
549            creator: self.subject_metadata.creator.to_string(),
550            owner: self.subject_metadata.owner.to_string(),
551            new_owner: self
552                .subject_metadata
553                .new_owner
554                .clone()
555                .map(|owner| owner.to_string()),
556            active: self.subject_metadata.active,
557            tracker_visibility: Some(visibility_state.into()),
558            properties: self.properties.0.clone(),
559        })
560    }
561
562    async fn create(
563        &self,
564        ctx: &ActorContext<Self>,
565        gov_version: u64,
566    ) -> Result<(), ActorError> {
567        let sn_register = ctx
568            .system()
569            .get_actor::<SnRegister>(&ActorPath::from(format!(
570                "/user/node/subject_manager/{}/sn_register",
571                self.governance_id
572            )))
573            .await?;
574
575        sn_register
576            .tell(SnRegisterMessage::RegisterSn {
577                subject_id: self.subject_metadata.subject_id.clone(),
578                gov_version,
579                sn: 0,
580            })
581            .await?;
582
583        if self.service || *self.our_key == self.subject_metadata.owner {
584            let subject_register = ctx
585                .system()
586                .get_actor::<SubjectRegister>(&ActorPath::from(&format!(
587                    "/user/node/subject_manager/{}/subject_register",
588                    self.governance_id
589                )))
590                .await?;
591
592            let _response = subject_register
593                .ask(SubjectRegisterMessage::CreateSubject {
594                    creator: self.subject_metadata.owner.clone(),
595                    subject_id: self.subject_metadata.subject_id.clone(),
596                    namespace: self.namespace.to_string(),
597                    schema_id: self.subject_metadata.schema_id.clone(),
598                    gov_version,
599                })
600                .await?;
601        }
602
603        let witnesses_register = ctx
604            .system()
605            .get_actor::<WitnessesRegister>(&ActorPath::from(format!(
606                "/user/node/subject_manager/{}/witnesses_register",
607                self.governance_id
608            )))
609            .await?;
610
611        witnesses_register
612            .tell(WitnessesRegisterMessage::Create {
613                subject_id: self.subject_metadata.subject_id.clone(),
614                gov_version,
615                owner: self.subject_metadata.owner.clone(),
616            })
617            .await
618    }
619
620    async fn register_gov_version_sn(
621        &self,
622        ctx: &ActorContext<Self>,
623        gov_version: u64,
624    ) -> Result<(), ActorError> {
625        let sn_register = ctx
626            .system()
627            .get_actor::<SnRegister>(&ActorPath::from(format!(
628                "/user/node/subject_manager/{}/sn_register",
629                self.governance_id
630            )))
631            .await?;
632
633        sn_register
634            .tell(SnRegisterMessage::RegisterSn {
635                subject_id: self.subject_metadata.subject_id.clone(),
636                gov_version,
637                sn: self.subject_metadata.sn,
638            })
639            .await
640    }
641
642    async fn verify_new_ledger_events(
643        &mut self,
644        ctx: &mut ActorContext<Self>,
645        events: Vec<Ledger>,
646        hash: &HashAlgorithm,
647    ) -> Result<(), ActorError> {
648        let mut iter = events.into_iter();
649        let last_ledger = get_last_event(ctx).await?;
650
651        let Some(first) = iter.next() else {
652            return Ok(());
653        };
654
655        let mut pending = Vec::new();
656
657        let mut last_ledger = if let Some(last_ledger) = last_ledger {
658            pending.push(first);
659            last_ledger
660        } else {
661            if let Err(e) = Self::verify_first_ledger_event(
662                ctx,
663                &first,
664                hash,
665                Metadata::from(self.clone()),
666            )
667            .await
668            {
669                return Err(ActorError::Functional {
670                    description: e.to_string(),
671                });
672            }
673
674            self.create(ctx, first.gov_version).await?;
675
676            self.on_event(first.clone(), ctx).await;
677            self.record_visibility_event(ctx, &first).await?;
678
679            Self::register(
680                ctx,
681                RegisterMessage::RegisterSubj {
682                    gov_id: self.governance_id.to_string(),
683                    subject_id: self.subject_metadata.subject_id.to_string(),
684                    schema_id: self.subject_metadata.schema_id.clone(),
685                    namespace: self.namespace.to_string(),
686                    name: self.subject_metadata.name.clone(),
687                    description: self.subject_metadata.description.clone(),
688                },
689            )
690            .await?;
691
692            let (issuer, event_request_timestamp) =
693                first.get_issuer_event_request_timestamp();
694            let event_request = first.get_event_request();
695
696            Self::event_to_sink(
697                ctx,
698                DataForSink {
699                    gov_id: Some(self.governance_id.to_string()),
700                    subject_id: self.subject_metadata.subject_id.to_string(),
701                    sn: self.subject_metadata.sn,
702                    owner: self.subject_metadata.owner.to_string(),
703                    namespace: self.namespace.to_string(),
704                    schema_id: self.subject_metadata.schema_id.clone(),
705                    issuer,
706                    event_ledger_timestamp: first
707                        .ledger_seal_signature
708                        .timestamp
709                        .as_nanos(),
710                    event_request_timestamp,
711                    gov_version: first.gov_version,
712                    event_data_ledger: EventLedgerDataForSink::build(
713                        &first.protocols,
714                        &self.properties.0,
715                    ),
716                },
717                event_request,
718            )
719            .await?;
720
721            first
722        };
723
724        pending.extend(iter);
725
726        for event in pending {
727            let actual_ledger_hash =
728                last_ledger.ledger_hash(*hash).map_err(|e| {
729                    ActorError::FunctionalCritical {
730                        description: format!(
731                            "Can not creacte actual ledger event hash: {}",
732                            e
733                        ),
734                    }
735                })?;
736
737            let last_data = LastData {
738                gov_version: last_ledger.gov_version,
739                vali_data: last_ledger.protocols.get_validation_data(),
740            };
741
742            let last_gov_version = last_data.gov_version;
743
744            let last_event_is_ok = match Self::verify_new_ledger_event(
745                ctx,
746                Self::verify_new_ledger_event_args(
747                    &event,
748                    Metadata::from(self.clone()),
749                    actual_ledger_hash,
750                    last_data,
751                    hash,
752                    self.is_full(),
753                    self.only_clear_events,
754                ),
755            )
756            .await
757            {
758                Ok(last_event_is_ok) => last_event_is_ok,
759                Err(e) => {
760                    // Check if it's a sequence number error
761                    if matches!(e, SubjectError::InvalidSequenceNumber { .. }) {
762                        // El evento que estamos aplicando no es el siguiente.
763                        continue;
764                    } else {
765                        return Err(ActorError::Functional {
766                            description: e.to_string(),
767                        });
768                    }
769                }
770            };
771
772            let event_gov_version = event.gov_version;
773
774            let event_request = event.get_event_request();
775
776            if last_event_is_ok {
777                if last_gov_version != event_gov_version {
778                    self.register_gov_version_sn(ctx, last_gov_version).await?;
779                }
780                if let Some(event_request) = &event_request {
781                    match event_request {
782                        EventRequest::Transfer(transfer_request) => {
783                            self.transfer(
784                                ctx,
785                                transfer_request.new_owner.clone(),
786                                event.gov_version,
787                            )
788                            .await?;
789                        }
790                        EventRequest::Reject(..) => {
791                            self.reject(ctx, event.gov_version).await?;
792                        }
793                        EventRequest::Confirm(..) => {
794                            self.confirm(
795                                ctx,
796                                event.ledger_seal_signature.signer.clone(),
797                                event.gov_version,
798                            )
799                            .await?;
800                        }
801                        EventRequest::EOL(..) => {
802                            self.eol(ctx).await?;
803
804                            Self::register(
805                                ctx,
806                                RegisterMessage::EOLSubj {
807                                    gov_id: self.governance_id.to_string(),
808                                    subj_id: self
809                                        .subject_metadata
810                                        .subject_id
811                                        .to_string(),
812                                },
813                            )
814                            .await?
815                        }
816                        _ => {}
817                    };
818                }
819            }
820
821            // Aplicar evento.
822            self.on_event(event.clone(), ctx).await;
823            self.record_visibility_event(ctx, &event).await?;
824
825            let (issuer, event_request_timestamp) =
826                event.get_issuer_event_request_timestamp();
827            Self::event_to_sink(
828                ctx,
829                DataForSink {
830                    gov_id: Some(self.governance_id.to_string()),
831                    subject_id: self.subject_metadata.subject_id.to_string(),
832                    sn: self.subject_metadata.sn,
833                    owner: self.subject_metadata.owner.to_string(),
834                    namespace: self.namespace.to_string(),
835                    schema_id: self.subject_metadata.schema_id.clone(),
836                    issuer,
837                    event_ledger_timestamp: event
838                        .ledger_seal_signature
839                        .timestamp
840                        .as_nanos(),
841                    event_request_timestamp,
842                    gov_version: event.gov_version,
843                    event_data_ledger: EventLedgerDataForSink::build(
844                        &event.protocols,
845                        &self.properties.0,
846                    ),
847                },
848                event_request,
849            )
850            .await?;
851
852            // Registrar la gov_version del evento con el sn ya actualizado.
853            // Necesario cuando varios eventos comparten la misma gov_version:
854            // la transición (línea anterior) solo captura el sn antes del primer
855            // evento del nuevo gov_version, pero no el sn final de ese tramo.
856            self.register_gov_version_sn(ctx, event_gov_version).await?;
857
858            // Actualizar último evento.
859            last_ledger = event.clone();
860        }
861
862        Ok(())
863    }
864}
865
866#[derive(Debug, Clone)]
867pub enum TrackerMessage {
868    GetMetadata,
869    GetLedger { lo_sn: Option<u64>, hi_sn: u64 },
870    GetLastLedger,
871    PurgeStorage,
872    UpdateLedger { events: Vec<Ledger> },
873}
874
875impl Message for TrackerMessage {}
876
877#[derive(Debug, Clone)]
878pub enum TrackerResponse {
879    /// The subject metadata.
880    Metadata(Box<Metadata>),
881    UpdateResult(u64, PublicKey, Option<PublicKey>),
882    Ledger {
883        ledger: Vec<Ledger>,
884        is_all: bool,
885    },
886    LastLedger {
887        ledger_event: Box<Option<Ledger>>,
888    },
889    Sn(u64),
890    Ok,
891}
892impl Response for TrackerResponse {}
893
894#[async_trait]
895impl Actor for Tracker {
896    type Event = Ledger;
897    type Message = TrackerMessage;
898    type Response = TrackerResponse;
899
900    fn get_span(id: &str, parent_span: Option<Span>) -> tracing::Span {
901        parent_span.map_or_else(
902            || info_span!("Tracker", id),
903            |parent_span| info_span!(parent: parent_span, "Tracker", id),
904        )
905    }
906
907    async fn pre_start(
908        &mut self,
909        ctx: &mut ActorContext<Self>,
910    ) -> Result<(), ActorError> {
911        if let Err(e) = self.init_store("tracker", None, true, ctx).await {
912            error!(
913                error = %e,
914                "Failed to initialize tracker store"
915            );
916            return Err(e);
917        }
918
919        let Some(config): Option<crate::system::ConfigHelper> =
920            ctx.system().get_helper("config").await
921        else {
922            return Err(ActorError::Helper {
923                name: "config".to_owned(),
924                reason: "Not found".to_owned(),
925            });
926        };
927
928        if config.safe_mode {
929            return Ok(());
930        }
931
932        let our_key = self.our_key.clone();
933
934        if self.subject_metadata.active {
935            let Some(ext_db): Option<Arc<ExternalDB>> =
936                ctx.system().get_helper("ext_db").await
937            else {
938                error!("External database helper not found");
939                return Err(ActorError::Helper {
940                    name: "ext_db".to_owned(),
941                    reason: "Not found".to_owned(),
942                });
943            };
944
945            let Some(ave_sink): Option<AveSink> =
946                ctx.system().get_helper("sink").await
947            else {
948                error!("Sink helper not found");
949                return Err(ActorError::Helper {
950                    name: "sink".to_owned(),
951                    reason: "Not found".to_owned(),
952                });
953            };
954
955            let sink_actor = match ctx
956                .create_child(
957                    "sink_data",
958                    SinkData {
959                        public_key: our_key.to_string(),
960                    },
961                )
962                .await
963            {
964                Ok(actor) => actor,
965                Err(e) => {
966                    error!(
967                        error = %e,
968                        "Failed to create sink_data child"
969                    );
970                    return Err(e);
971                }
972            };
973            let sink =
974                Sink::new(sink_actor.subscribe(), ext_db.get_sink_data());
975            ctx.system().run_sink(sink).await;
976
977            let sink = Sink::new(sink_actor.subscribe(), ave_sink.clone());
978            ctx.system().run_sink(sink).await;
979        }
980
981        Ok(())
982    }
983}
984
985#[async_trait]
986impl Handler<Self> for Tracker {
987    async fn handle_message(
988        &mut self,
989        _sender: ActorPath,
990        msg: TrackerMessage,
991        ctx: &mut ActorContext<Self>,
992    ) -> Result<TrackerResponse, ActorError> {
993        match msg {
994            TrackerMessage::GetLedger { lo_sn, hi_sn } => {
995                let (ledger, is_all) =
996                    self.get_ledger(ctx, lo_sn, hi_sn).await?;
997                Ok(TrackerResponse::Ledger { ledger, is_all })
998            }
999            TrackerMessage::GetLastLedger => {
1000                let ledger_event = self.get_last_ledger(ctx).await?;
1001                Ok(TrackerResponse::LastLedger {
1002                    ledger_event: Box::new(ledger_event),
1003                })
1004            }
1005            TrackerMessage::GetMetadata => Ok(TrackerResponse::Metadata(
1006                Box::new(Metadata::from(self.clone())),
1007            )),
1008            TrackerMessage::PurgeStorage => {
1009                purge_storage(ctx).await?;
1010
1011                debug!(
1012                    msg_type = "PurgeStorage",
1013                    subject_id = %self.subject_metadata.subject_id,
1014                    "Tracker storage purged"
1015                );
1016
1017                Ok(TrackerResponse::Ok)
1018            }
1019            TrackerMessage::UpdateLedger { events } => {
1020                let events_count = events.len();
1021                if let Err(e) =
1022                    self.manager_new_ledger_events(ctx, events).await
1023                {
1024                    warn!(
1025                        msg_type = "UpdateLedger",
1026                        error = %e,
1027                        subject_id = %self.subject_metadata.subject_id,
1028                        events_count = events_count,
1029                        "Failed to verify new ledger events"
1030                    );
1031                    return Err(e);
1032                };
1033
1034                debug!(
1035                    msg_type = "UpdateLedger",
1036                    subject_id = %self.subject_metadata.subject_id,
1037                    sn = self.subject_metadata.sn,
1038                    events_count = events_count,
1039                    "Ledger updated successfully"
1040                );
1041
1042                Ok(TrackerResponse::UpdateResult(
1043                    self.subject_metadata.sn,
1044                    self.subject_metadata.owner.clone(),
1045                    self.subject_metadata.new_owner.clone(),
1046                ))
1047            }
1048        }
1049    }
1050
1051    async fn on_event(&mut self, event: Ledger, ctx: &mut ActorContext<Self>) {
1052        if let Err(e) = self.persist(&event, ctx).await {
1053            error!(
1054                error = %e,
1055                subject_id = %self.subject_metadata.subject_id,
1056                sn = self.subject_metadata.sn,
1057                "Failed to persist event"
1058            );
1059            emit_fail(ctx, e).await;
1060        };
1061
1062        if let Err(e) = ctx.publish_event(event.clone()).await {
1063            error!(
1064                error = %e,
1065                subject_id = %self.subject_metadata.subject_id,
1066                sn = self.subject_metadata.sn,
1067                "Failed to publish event"
1068            );
1069            emit_fail(ctx, e).await;
1070        } else {
1071            debug!(
1072                subject_id = %self.subject_metadata.subject_id,
1073                sn = self.subject_metadata.sn,
1074                "Event persisted and published successfully"
1075            );
1076        }
1077    }
1078
1079    async fn on_child_fault(
1080        &mut self,
1081        error: ActorError,
1082        ctx: &mut ActorContext<Self>,
1083    ) -> ChildAction {
1084        error!(
1085            subject_id = %self.subject_metadata.subject_id,
1086            sn = self.subject_metadata.sn,
1087            error = %error,
1088            "Child fault in tracker"
1089        );
1090        emit_fail(ctx, error).await;
1091        ChildAction::Stop
1092    }
1093}
1094
1095pub struct InitParamsTracker {
1096    pub data: Option<TrackerInit>,
1097    pub public_key: Arc<PublicKey>,
1098    pub hash: HashAlgorithm,
1099    pub is_service: bool,
1100    pub only_clear_events: bool,
1101}
1102
1103#[async_trait]
1104impl PersistentActor for Tracker {
1105    type Persistence = FullPersistence;
1106    type InitParams = InitParamsTracker;
1107
1108    fn update(&mut self, state: Self) {
1109        self.properties = state.properties;
1110        self.visibility_mode = state.visibility_mode;
1111        self.governance_id = state.governance_id;
1112        self.namespace = state.namespace;
1113        self.genesis_gov_version = state.genesis_gov_version;
1114        self.subject_metadata = state.subject_metadata;
1115    }
1116
1117    fn create_initial(params: Self::InitParams) -> Self {
1118        let init = params.data.unwrap_or_default();
1119
1120        Self {
1121            service: params.is_service,
1122            only_clear_events: params.only_clear_events,
1123            hash: Some(params.hash),
1124            our_key: params.public_key,
1125            subject_metadata: init.subject_metadata,
1126            properties: init.properties,
1127            genesis_gov_version: init.genesis_gov_version,
1128            governance_id: init.governance_id,
1129            namespace: init.namespace,
1130            visibility_mode: TrackerVisibilityMode::Full,
1131        }
1132    }
1133
1134    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
1135        match &event.protocols {
1136            Protocols::Create {
1137                validation,
1138                event_request,
1139            } => {
1140                if let EventRequest::Create(..) = event_request.content() {
1141                } else {
1142                    error!(
1143                        event_type = "Create",
1144                        subject_id = %self.subject_metadata.subject_id,
1145                        actual_request = ?event_request.content(),
1146                        "Unexpected event request type for tracker create apply"
1147                    );
1148                    return Err(ActorError::Functional {
1149                        description:
1150                            "In create event, event request must be Create"
1151                                .to_owned(),
1152                    });
1153                }
1154
1155                if let ValidationMetadata::Metadata(metadata) =
1156                    &validation.validation_metadata
1157                {
1158                    self.subject_metadata = SubjectMetadata::new(metadata);
1159                    self.properties = metadata.properties.clone();
1160                    self.visibility_mode = TrackerVisibilityMode::Full;
1161
1162                    debug!(
1163                        event_type = "Create",
1164                        subject_id = %self.subject_metadata.subject_id,
1165                        sn = self.subject_metadata.sn,
1166                        "Applied create event"
1167                    );
1168                } else {
1169                    error!(
1170                        event_type = "Create",
1171                        "Validation metadata must be Metadata type"
1172                    );
1173                    return Err(ActorError::Functional { description: "In create event, validation metadata must be a Metadata".to_owned() });
1174                }
1175
1176                return Ok(());
1177            }
1178            Protocols::TrackerFactFull {
1179                evaluation,
1180                event_request,
1181                ..
1182            } => {
1183                let EventRequest::Fact(_fact_request) = event_request.content()
1184                else {
1185                    error!(
1186                        event_type = "Fact",
1187                        subject_id = %self.subject_metadata.subject_id,
1188                        actual_request = ?event_request.content(),
1189                        "Unexpected event request type for tracker fact apply"
1190                    );
1191                    return Err(ActorError::Functional {
1192                        description:
1193                            "In fact event, event request must be Fact"
1194                                .to_owned(),
1195                    });
1196                };
1197
1198                if let Some(eval_res) = evaluation.evaluator_response_ok() {
1199                    if self.is_full() {
1200                        self.apply_patch(eval_res.patch)?;
1201                        debug!(
1202                            event_type = "Fact",
1203                            subject_id = %self.subject_metadata.subject_id,
1204                            "Applied fact event with patch"
1205                        );
1206                    } else {
1207                        debug!(
1208                            event_type = "Fact",
1209                            subject_id = %self.subject_metadata.subject_id,
1210                            "Tracker is not in full mode, fact patch not applied"
1211                        );
1212                    }
1213                }
1214            }
1215            Protocols::TrackerFactOpaque { evaluation, .. } => {
1216                if evaluation.is_ok() {
1217                    self.visibility_mode = TrackerVisibilityMode::Opaque;
1218                }
1219                debug!(
1220                    event_type = "FactOpaque",
1221                    subject_id = %self.subject_metadata.subject_id,
1222                    "Applied tracker opaque fact event"
1223                );
1224            }
1225            Protocols::Transfer {
1226                evaluation,
1227                event_request,
1228                ..
1229            } => {
1230                let EventRequest::Transfer(transfer_request) =
1231                    event_request.content()
1232                else {
1233                    error!(
1234                        event_type = "Transfer",
1235                        subject_id = %self.subject_metadata.subject_id,
1236                        actual_request = ?event_request.content(),
1237                        "Unexpected event request type for tracker transfer apply"
1238                    );
1239                    return Err(ActorError::Functional {
1240                        description:
1241                            "In transfer event, event request must be Transfer"
1242                                .to_owned(),
1243                    });
1244                };
1245
1246                if evaluation.evaluator_response_ok().is_some() {
1247                    if self.is_full()
1248                        && let Some(eval_res) =
1249                            evaluation.evaluator_response_ok()
1250                    {
1251                        self.apply_patch(eval_res.patch)?;
1252                    } else if !self.is_full() {
1253                        debug!(
1254                            event_type = "Transfer",
1255                            subject_id = %self.subject_metadata.subject_id,
1256                            "Tracker is not in full mode, transfer patch not applied"
1257                        );
1258                    }
1259                    self.subject_metadata.new_owner =
1260                        Some(transfer_request.new_owner.clone());
1261                    debug!(
1262                        event_type = "Transfer",
1263                        subject_id = %self.subject_metadata.subject_id,
1264                        new_owner = %transfer_request.new_owner,
1265                        "Applied transfer event"
1266                    );
1267                }
1268            }
1269            Protocols::TrackerConfirm { event_request, .. } => {
1270                if let EventRequest::Confirm(..) = event_request.content() {
1271                } else {
1272                    error!(
1273                        event_type = "Confirm",
1274                        subject_id = %self.subject_metadata.subject_id,
1275                        actual_request = ?event_request.content(),
1276                        "Unexpected event request type for tracker confirm apply"
1277                    );
1278                    return Err(ActorError::Functional {
1279                        description:
1280                            "In confirm event, event request must be Confirm"
1281                                .to_owned(),
1282                    });
1283                }
1284
1285                if let Some(new_owner) = self.subject_metadata.new_owner.take()
1286                {
1287                    self.subject_metadata.owner = new_owner.clone();
1288                    debug!(
1289                        event_type = "Confirm",
1290                        subject_id = %self.subject_metadata.subject_id,
1291                        new_owner = %new_owner,
1292                        "Applied confirm event"
1293                    );
1294                } else {
1295                    error!(
1296                        event_type = "Confirm",
1297                        subject_id = %self.subject_metadata.subject_id,
1298                        "New owner is None in confirm event"
1299                    );
1300                    return Err(ActorError::Functional {
1301                        description: "In confirm event, new owner is None"
1302                            .to_owned(),
1303                    });
1304                }
1305            }
1306            Protocols::Reject { event_request, .. } => {
1307                if let EventRequest::Reject(..) = event_request.content() {
1308                } else {
1309                    error!(
1310                        event_type = "Reject",
1311                        subject_id = %self.subject_metadata.subject_id,
1312                        actual_request = ?event_request.content(),
1313                        "Unexpected event request type for tracker reject apply"
1314                    );
1315                    return Err(ActorError::Functional {
1316                        description:
1317                            "In reject event, event request must be Reject"
1318                                .to_owned(),
1319                    });
1320                }
1321
1322                self.subject_metadata.new_owner = None;
1323                debug!(
1324                    event_type = "Reject",
1325                    subject_id = %self.subject_metadata.subject_id,
1326                    "Applied reject event"
1327                );
1328            }
1329            Protocols::EOL { event_request, .. } => {
1330                if let EventRequest::EOL(..) = event_request.content() {
1331                } else {
1332                    error!(
1333                        event_type = "EOL",
1334                        subject_id = %self.subject_metadata.subject_id,
1335                        actual_request = ?event_request.content(),
1336                        "Unexpected event request type for tracker eol apply"
1337                    );
1338                    return Err(ActorError::Functional {
1339                        description: "In EOL event, event request must be EOL"
1340                            .to_owned(),
1341                    });
1342                }
1343
1344                self.subject_metadata.active = false;
1345                debug!(
1346                    event_type = "EOL",
1347                    subject_id = %self.subject_metadata.subject_id,
1348                    "Applied EOL event"
1349                );
1350            }
1351            _ => {
1352                error!(
1353                    subject_id = %self.subject_metadata.subject_id,
1354                    "Invalid protocol data for Tracker"
1355                );
1356                return Err(ActorError::Functional {
1357                    description:
1358                        "Protocols data is for Governance and this is a Tracker"
1359                            .to_owned(),
1360                });
1361            }
1362        }
1363
1364        self.subject_metadata.sn += 1;
1365        self.subject_metadata.prev_ledger_event_hash =
1366            event.prev_ledger_event_hash.clone();
1367
1368        Ok(())
1369    }
1370}
1371
1372impl Storable for Tracker {}