Skip to main content

ave_core/governance/
witnesses_register.rs

1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
2
3use crate::governance::sn_register::{
4    SnLimit, SnRegister, SnRegisterMessage, SnRegisterResponse,
5};
6use crate::governance::subject_register::{
7    SubjectRegister, SubjectRegisterMessage, SubjectRegisterResponse,
8};
9use crate::model::common::{
10    Interval, IntervalSet, TrackerEventVisibility, TrackerStoredVisibility,
11    TrackerVisibilityMode, TrackerVisibilityState, emit_fail, purge_storage,
12};
13use async_trait::async_trait;
14use ave_actors::{
15    Actor, ActorContext, ActorError, ActorPath, Event, Handler, Message,
16    Response,
17};
18use ave_actors::{LightPersistence, PersistentActor};
19use ave_common::identity::{DigestIdentifier, PublicKey};
20use ave_common::{Namespace, SchemaType};
21use borsh::{BorshDeserialize, BorshSerialize};
22use serde::{Deserialize, Serialize};
23use tracing::{Span, debug, error, info_span, warn};
24
25use crate::db::Storable;
26
27#[derive(
28    Debug,
29    Clone,
30    Serialize,
31    Deserialize,
32    Default,
33    BorshDeserialize,
34    BorshSerialize,
35)]
36pub struct WitnessesRegister {
37    gov_sn: u64,
38    subjects: HashMap<DigestIdentifier, TransferData>,
39    witnesses:
40        HashMap<(PublicKey, SchemaType), HashMap<Namespace, IntervalData>>,
41    witnesses_creator: HashMap<
42        (PublicKey, String, SchemaType),
43        HashMap<WitnessesType, IntervalData>,
44    >,
45    witnesses_creator_grants: HashMap<
46        (PublicKey, String, SchemaType),
47        HashMap<WitnessesType, CreatorWitnessGrantHistory>,
48    >,
49    #[serde(skip)]
50    ledger_batch_size: usize,
51}
52
53type IntervalData = (IntervalSet, Option<u64>);
54
55pub enum ActualSearch {
56    End(SnLimit),
57    Continue { gov_version: Option<u64> },
58}
59
60#[derive(
61    Debug,
62    Clone,
63    Serialize,
64    Deserialize,
65    BorshDeserialize,
66    BorshSerialize,
67    Hash,
68    PartialEq,
69    Eq,
70    PartialOrd,
71    Ord,
72)]
73pub enum WitnessesType {
74    User(PublicKey),
75    Witnesses,
76}
77
78#[derive(
79    Debug,
80    Clone,
81    Serialize,
82    Deserialize,
83    BorshDeserialize,
84    BorshSerialize,
85    Hash,
86    PartialEq,
87    Eq,
88    PartialOrd,
89    Ord,
90)]
91pub enum CreatorWitnessGrant {
92    Hash,
93    Clear(BTreeSet<String>),
94    Full,
95}
96
97#[derive(
98    Debug,
99    Clone,
100    Serialize,
101    Deserialize,
102    Default,
103    BorshDeserialize,
104    BorshSerialize,
105)]
106pub struct CreatorWitnessRegistration {
107    pub witnesses: Vec<WitnessesType>,
108    pub grants: Vec<(WitnessesType, CreatorWitnessGrant)>,
109}
110
111#[derive(
112    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
113)]
114pub struct CreatorWitnessGrantRange {
115    pub interval: Interval,
116    pub grant: CreatorWitnessGrant,
117}
118
119#[derive(
120    Debug,
121    Clone,
122    Serialize,
123    Deserialize,
124    Default,
125    BorshDeserialize,
126    BorshSerialize,
127)]
128pub struct CreatorWitnessGrantHistory {
129    pub closed: Vec<CreatorWitnessGrantRange>,
130    pub current_from: Option<u64>,
131    pub current_grant: Option<CreatorWitnessGrant>,
132}
133
134#[derive(Debug, Clone, Serialize, Deserialize)]
135pub enum TrackerDeliveryMode {
136    Clear,
137    Opaque,
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct TrackerDeliveryRange {
142    pub from_sn: u64,
143    pub to_sn: u64,
144    pub mode: TrackerDeliveryMode,
145}
146
147#[derive(
148    Debug,
149    Clone,
150    Serialize,
151    Deserialize,
152    Default,
153    BorshDeserialize,
154    BorshSerialize,
155)]
156pub struct TransferData {
157    actual_owner: PublicKey,
158    actual_new_owner_data: Option<(PublicKey, u64)>,
159    sn: u64,
160    gov_version: u64,
161    old_owners: HashMap<PublicKey, OldOwnerData>,
162    visibility_state: TrackerVisibilityState,
163}
164
165#[derive(
166    Debug,
167    Clone,
168    Serialize,
169    Deserialize,
170    Default,
171    BorshDeserialize,
172    BorshSerialize,
173)]
174pub struct OldOwnerData {
175    sn: u64,
176    interval_gov_version: IntervalSet,
177}
178
179#[derive(Debug, Clone)]
180pub enum WitnessesRegisterMessage {
181    PurgeStorage,
182    GetSnGov,
183    GetTrackerSnOwner {
184        subject_id: DigestIdentifier,
185    },
186    ListCurrentWitnessSubjects {
187        node: PublicKey,
188        governance_version: u64,
189        after_subject_id: Option<DigestIdentifier>,
190        limit: usize,
191    },
192    UpdateCreatorsWitnessesFact {
193        version: u64,
194        new_creator: HashMap<
195            (SchemaType, String, PublicKey),
196            CreatorWitnessRegistration,
197        >,
198        remove_creator: HashSet<(SchemaType, String, PublicKey)>,
199        update_creator_witnesses: HashMap<
200            (SchemaType, String, PublicKey),
201            CreatorWitnessRegistration,
202        >,
203
204        new_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
205        remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
206    },
207    UpdateCreatorsWitnessesConfirm {
208        version: u64,
209        remove_creator: HashSet<(SchemaType, String, PublicKey)>,
210        remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
211    },
212    UpdateSn {
213        subject_id: DigestIdentifier,
214        sn: u64,
215    },
216    UpdateSnGov {
217        sn: u64,
218    },
219    Create {
220        subject_id: DigestIdentifier,
221        owner: PublicKey,
222        gov_version: u64,
223    },
224    Transfer {
225        subject_id: DigestIdentifier,
226        new_owner: PublicKey,
227        gov_version: u64,
228    },
229    Confirm {
230        subject_id: DigestIdentifier,
231        sn: u64,
232        gov_version: u64,
233    },
234    DeleteSubject {
235        subject_id: DigestIdentifier,
236    },
237    UpdateTrackerVisibility {
238        subject_id: DigestIdentifier,
239        sn: u64,
240        mode: TrackerVisibilityMode,
241        stored_visibility: TrackerStoredVisibility,
242        event_visibility: TrackerEventVisibility,
243    },
244    Reject {
245        subject_id: DigestIdentifier,
246        sn: u64,
247        gov_version: u64,
248    },
249    Access {
250        subject_id: DigestIdentifier,
251        node: PublicKey,
252        namespace: String,
253        schema_id: SchemaType,
254    },
255    GetTrackerVisibilityState {
256        subject_id: DigestIdentifier,
257    },
258    GetTrackerWindow {
259        subject_id: DigestIdentifier,
260        node: PublicKey,
261        namespace: String,
262        schema_id: SchemaType,
263        actual_sn: Option<u64>,
264    },
265}
266
267impl Message for WitnessesRegisterMessage {
268    fn is_critical(&self) -> bool {
269        matches!(
270            self,
271            Self::PurgeStorage
272                | Self::UpdateCreatorsWitnessesFact { .. }
273                | Self::UpdateCreatorsWitnessesConfirm { .. }
274                | Self::UpdateSn { .. }
275                | Self::UpdateSnGov { .. }
276                | Self::Create { .. }
277                | Self::Transfer { .. }
278                | Self::Confirm { .. }
279                | Self::UpdateTrackerVisibility { .. }
280                | Self::DeleteSubject { .. }
281                | Self::Reject { .. }
282        )
283    }
284}
285
286#[derive(
287    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
288)]
289pub enum WitnessesRegisterEvent {
290    UpdateCreatorsWitnessesFact {
291        version: u64,
292        new_creator: HashMap<
293            (SchemaType, String, PublicKey),
294            CreatorWitnessRegistration,
295        >,
296        remove_creator: HashSet<(SchemaType, String, PublicKey)>,
297        update_creator_witnesses: HashMap<
298            (SchemaType, String, PublicKey),
299            CreatorWitnessRegistration,
300        >,
301
302        new_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
303        remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
304    },
305    UpdateCreatorsWitnessesConfirm {
306        version: u64,
307        remove_creator: HashSet<(SchemaType, String, PublicKey)>,
308        remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
309    },
310    UpdateSn {
311        subject_id: DigestIdentifier,
312        sn: u64,
313    },
314    UpdateSnGov {
315        sn: u64,
316    },
317    Create {
318        subject_id: DigestIdentifier,
319        owner: PublicKey,
320        gov_version: u64,
321    },
322    Transfer {
323        subject_id: DigestIdentifier,
324        new_owner: PublicKey,
325        gov_version: u64,
326    },
327    Confirm {
328        subject_id: DigestIdentifier,
329        sn: u64,
330        gov_version: u64,
331    },
332    DeleteSubject {
333        subject_id: DigestIdentifier,
334    },
335    UpdateTrackerVisibility {
336        subject_id: DigestIdentifier,
337        sn: u64,
338        mode: TrackerVisibilityMode,
339        stored_visibility: TrackerStoredVisibility,
340        event_visibility: TrackerEventVisibility,
341    },
342    Reject {
343        subject_id: DigestIdentifier,
344        sn: u64,
345        gov_version: u64,
346    },
347}
348
349impl Event for WitnessesRegisterEvent {}
350
351pub enum WitnessesRegisterResponse {
352    Access {
353        sn: Option<u64>,
354    },
355    GovSn {
356        sn: u64,
357    },
358    TrackerOwnerSn {
359        data: Option<(PublicKey, u64)>,
360    },
361    CurrentWitnessSubjects {
362        governance_version: u64,
363        items: Vec<CurrentWitnessSubject>,
364        next_cursor: Option<DigestIdentifier>,
365    },
366    TrackerVisibilityState {
367        state: TrackerVisibilityState,
368    },
369    TrackerWindow {
370        sn: Option<u64>,
371        clear_sn: Option<u64>,
372        is_all: bool,
373        ranges: Vec<TrackerDeliveryRange>,
374    },
375    Ok,
376}
377
378impl Response for WitnessesRegisterResponse {}
379
380#[derive(Debug, Clone, Serialize, Deserialize)]
381pub struct CurrentWitnessSubject {
382    pub subject_id: DigestIdentifier,
383    pub target_sn: u64,
384}
385
386impl CreatorWitnessGrantHistory {
387    fn apply_version(
388        &mut self,
389        version: u64,
390        grant: Option<CreatorWitnessGrant>,
391    ) {
392        match (&self.current_from, &self.current_grant, &grant) {
393            (Some(current_from), Some(current_grant), Some(next_grant))
394                if current_grant != next_grant =>
395            {
396                self.closed.push(CreatorWitnessGrantRange {
397                    interval: Interval::new(*current_from, version - 1),
398                    grant: current_grant.clone(),
399                });
400                self.current_from = Some(version);
401                self.current_grant = Some(next_grant.clone());
402            }
403            (Some(_), Some(_), Some(_)) => {}
404            (Some(current_from), Some(current_grant), None) => {
405                self.closed.push(CreatorWitnessGrantRange {
406                    interval: Interval::new(*current_from, version - 1),
407                    grant: current_grant.clone(),
408                });
409                self.current_from = None;
410                self.current_grant = None;
411            }
412            (None, None, Some(next_grant)) => {
413                self.current_from = Some(version);
414                self.current_grant = Some(next_grant.clone());
415            }
416            _ => {}
417        }
418    }
419}
420
421impl WitnessesRegister {
422    fn merge_grant(
423        actual: Option<CreatorWitnessGrant>,
424        next: &CreatorWitnessGrant,
425    ) -> CreatorWitnessGrant {
426        match (actual, next) {
427            (Some(CreatorWitnessGrant::Full), ..)
428            | (_, CreatorWitnessGrant::Full) => CreatorWitnessGrant::Full,
429            (
430                Some(CreatorWitnessGrant::Clear(mut left)),
431                CreatorWitnessGrant::Clear(right),
432            ) => {
433                left.extend(right.iter().cloned());
434                CreatorWitnessGrant::Clear(left)
435            }
436            (
437                Some(CreatorWitnessGrant::Clear(left)),
438                CreatorWitnessGrant::Hash,
439            ) => CreatorWitnessGrant::Clear(left),
440            (
441                Some(CreatorWitnessGrant::Hash),
442                CreatorWitnessGrant::Clear(right),
443            ) => CreatorWitnessGrant::Clear(right.clone()),
444            (Some(CreatorWitnessGrant::Hash), CreatorWitnessGrant::Hash)
445            | (None, CreatorWitnessGrant::Hash) => CreatorWitnessGrant::Hash,
446            (None, CreatorWitnessGrant::Clear(right)) => {
447                CreatorWitnessGrant::Clear(right.clone())
448            }
449        }
450    }
451
452    fn interval_overlaps_owner(
453        current_from: Option<u64>,
454        intervals: &IntervalSet,
455        owner_lo: u64,
456        owner_hi: u64,
457    ) -> bool {
458        current_from.is_some_and(|from| from <= owner_hi)
459            || intervals.max_covered_in(owner_lo, owner_hi).is_some()
460    }
461
462    fn covered_old_owner_intervals(
463        current_from: Option<u64>,
464        intervals: &IntervalSet,
465        old_owner: &OldOwnerData,
466    ) -> IntervalSet {
467        let mut covered = IntervalSet::new();
468
469        for owner_range in old_owner.interval_gov_version.iter() {
470            if let Some(from) = current_from {
471                let lo = from.max(owner_range.lo);
472                if lo <= owner_range.hi {
473                    covered.insert(Interval {
474                        lo,
475                        hi: owner_range.hi,
476                    });
477                }
478            }
479
480            for witness_range in intervals.iter() {
481                let lo = witness_range.lo.max(owner_range.lo);
482                let hi = witness_range.hi.min(owner_range.hi);
483
484                if lo <= hi {
485                    covered.insert(Interval { lo, hi });
486                }
487            }
488        }
489
490        covered
491    }
492
493    fn max_covered_old_owner_gov_version(
494        current_from: Option<u64>,
495        intervals: &IntervalSet,
496        old_owner: &OldOwnerData,
497    ) -> Option<u64> {
498        Self::covered_old_owner_intervals(current_from, intervals, old_owner)
499            .iter()
500            .last()
501            .map(|range| range.hi)
502    }
503
504    fn grant_for_owner_interval(
505        history: &CreatorWitnessGrantHistory,
506        owner_lo: u64,
507        owner_hi: u64,
508    ) -> Option<&CreatorWitnessGrant> {
509        if let (Some(current_from), Some(current_grant)) =
510            (history.current_from, history.current_grant.as_ref())
511            && current_from <= owner_hi
512        {
513            return Some(current_grant);
514        }
515
516        history
517            .closed
518            .iter()
519            .rev()
520            .find(|range| {
521                range.interval.lo <= owner_hi && range.interval.hi >= owner_lo
522            })
523            .map(|range| &range.grant)
524    }
525
526    fn schema_witness_covers_owner_interval(
527        &self,
528        node: &PublicKey,
529        schema_id: &SchemaType,
530        namespace: &Namespace,
531        owner_lo: u64,
532        owner_hi: u64,
533    ) -> bool {
534        let matches = |witness_data: &HashMap<Namespace, IntervalData>| {
535            witness_data.iter().any(
536                |(current_namespace, (intervals, current_from))| {
537                    current_namespace.is_ancestor_or_equal_of(namespace)
538                        && Self::interval_overlaps_owner(
539                            *current_from,
540                            intervals,
541                            owner_lo,
542                            owner_hi,
543                        )
544                },
545            )
546        };
547
548        self.witnesses
549            .get(&(node.clone(), schema_id.clone()))
550            .is_some_and(matches)
551            || self
552                .witnesses
553                .get(&(node.clone(), SchemaType::TrackerSchemas))
554                .is_some_and(matches)
555    }
556
557    fn creator_grant_for_owner_interval(
558        &self,
559        node: &PublicKey,
560        creator: &PublicKey,
561        schema_id: &SchemaType,
562        namespace: &Namespace,
563        owner_lo: u64,
564        owner_hi: u64,
565    ) -> Option<CreatorWitnessGrant> {
566        let grants = self.witnesses_creator_grants.get(&(
567            creator.clone(),
568            namespace.to_string(),
569            schema_id.clone(),
570        ))?;
571        let intervals = self.witnesses_creator.get(&(
572            creator.clone(),
573            namespace.to_string(),
574            schema_id.clone(),
575        ))?;
576
577        let mut out = None;
578
579        if let (Some(history), Some((creator_intervals, creator_current_from))) = (
580            grants.get(&WitnessesType::User(node.clone())),
581            intervals.get(&WitnessesType::User(node.clone())),
582        ) && Self::interval_overlaps_owner(
583            *creator_current_from,
584            creator_intervals,
585            owner_lo,
586            owner_hi,
587        ) && let Some(grant) =
588            Self::grant_for_owner_interval(history, owner_lo, owner_hi)
589        {
590            out = Some(Self::merge_grant(out, grant));
591        }
592
593        if let (Some(history), Some((creator_intervals, creator_current_from))) = (
594            grants.get(&WitnessesType::Witnesses),
595            intervals.get(&WitnessesType::Witnesses),
596        ) && Self::interval_overlaps_owner(
597            *creator_current_from,
598            creator_intervals,
599            owner_lo,
600            owner_hi,
601        ) && self.schema_witness_covers_owner_interval(
602            node, schema_id, namespace, owner_lo, owner_hi,
603        ) && let Some(grant) =
604            Self::grant_for_owner_interval(history, owner_lo, owner_hi)
605        {
606            out = Some(Self::merge_grant(out, grant));
607        }
608
609        out
610    }
611
612    fn creator_grant_for_event_or_current_owner(
613        &self,
614        node: &PublicKey,
615        creator: &PublicKey,
616        schema_id: &SchemaType,
617        namespace: &Namespace,
618        event_gov_version: u64,
619        owner_from_gov_version: u64,
620    ) -> Option<CreatorWitnessGrant> {
621        self.creator_grant_for_owner_interval(
622            node,
623            creator,
624            schema_id,
625            namespace,
626            event_gov_version,
627            event_gov_version,
628        )
629        .or_else(|| {
630            self.creator_grant_for_owner_interval(
631                node,
632                creator,
633                schema_id,
634                namespace,
635                owner_from_gov_version,
636                u64::MAX,
637            )
638        })
639    }
640
641    fn grant_allows_clear(
642        grant: Option<CreatorWitnessGrant>,
643        viewpoints: &BTreeSet<String>,
644    ) -> bool {
645        match grant {
646            Some(CreatorWitnessGrant::Full) => true,
647            Some(CreatorWitnessGrant::Clear(allowed)) => {
648                viewpoints.is_empty() || viewpoints.is_subset(&allowed)
649            }
650            Some(CreatorWitnessGrant::Hash) | None => false,
651        }
652    }
653
654    async fn get_gov_version_window(
655        &self,
656        ctx: &ActorContext<Self>,
657        subject_id: &DigestIdentifier,
658        from_sn: u64,
659        to_sn: u64,
660    ) -> Result<Vec<(Interval, u64)>, ActorError> {
661        let governance_id = ctx.path().parent().key();
662        let path = ActorPath::from(format!(
663            "/user/node/subject_manager/{}/sn_register",
664            governance_id
665        ));
666        let sn_register = ctx.system().get_actor::<SnRegister>(&path).await?;
667        let response = sn_register
668            .ask(SnRegisterMessage::GetGovVersionWindow {
669                subject_id: subject_id.clone(),
670                from_sn,
671                to_sn,
672            })
673            .await?;
674
675        match response {
676            SnRegisterResponse::GovVersionWindow(ranges) => Ok(ranges
677                .into_iter()
678                .map(|range| {
679                    (
680                        Interval::new(range.from_sn, range.to_sn),
681                        range.gov_version,
682                    )
683                })
684                .collect()),
685            _ => Err(ActorError::UnexpectedResponse {
686                path,
687                expected: "SnRegisterResponse::GovVersionWindow".to_owned(),
688            }),
689        }
690    }
691
692    fn gov_version_for_sn(
693        gov_versions: &[(Interval, u64)],
694        sn: u64,
695    ) -> Option<u64> {
696        gov_versions
697            .iter()
698            .find(|(interval, _)| interval.contains(sn))
699            .map(|(_, gov_version)| *gov_version)
700    }
701
702    fn event_delivery_mode(
703        &self,
704        data: &TransferData,
705        node: &PublicKey,
706        namespace: &Namespace,
707        schema_id: &SchemaType,
708        sn: u64,
709        gov_version: u64,
710    ) -> TrackerDeliveryMode {
711        let stored_span = data.visibility_state.iter_stored(sn, sn).next();
712        let event_span = data.visibility_state.iter_events(sn, sn).next();
713
714        let Some(stored_span) = stored_span else {
715            return TrackerDeliveryMode::Opaque;
716        };
717        let Some(event_span) = event_span else {
718            return TrackerDeliveryMode::Opaque;
719        };
720
721        match event_span.visibility {
722            TrackerEventVisibility::NonFact => TrackerDeliveryMode::Clear,
723            TrackerEventVisibility::Fact(viewpoints) => {
724                if viewpoints.is_empty() {
725                    return TrackerDeliveryMode::Clear;
726                }
727
728                if data.actual_owner == *node
729                    || data
730                        .actual_new_owner_data
731                        .as_ref()
732                        .is_some_and(|(new_owner, _)| new_owner == node)
733                {
734                    return TrackerDeliveryMode::Clear;
735                }
736
737                if let Some(old_owner) = data.old_owners.get(node)
738                    && sn <= old_owner.sn
739                {
740                    return TrackerDeliveryMode::Clear;
741                }
742
743                if matches!(
744                    stored_span.visibility,
745                    TrackerStoredVisibility::None
746                ) {
747                    return TrackerDeliveryMode::Opaque;
748                }
749
750                let mut grant = None;
751
752                if gov_version >= data.gov_version {
753                    grant = Some(Self::merge_grant(
754                        grant,
755                        &self
756                            .creator_grant_for_event_or_current_owner(
757                                node,
758                                &data.actual_owner,
759                                schema_id,
760                                namespace,
761                                gov_version,
762                                data.gov_version,
763                            )
764                            .unwrap_or(CreatorWitnessGrant::Hash),
765                    ));
766                }
767
768                if let Some((new_owner, new_owner_gov_version)) =
769                    &data.actual_new_owner_data
770                    && gov_version >= *new_owner_gov_version
771                {
772                    grant = Some(Self::merge_grant(
773                        grant,
774                        &self
775                            .creator_grant_for_event_or_current_owner(
776                                node,
777                                new_owner,
778                                schema_id,
779                                namespace,
780                                gov_version,
781                                *new_owner_gov_version,
782                            )
783                            .unwrap_or(CreatorWitnessGrant::Hash),
784                    ));
785                }
786
787                for (creator, old_owner) in &data.old_owners {
788                    if sn > old_owner.sn {
789                        continue;
790                    }
791
792                    for range in old_owner.interval_gov_version.iter().rev() {
793                        if !range.contains(gov_version) {
794                            continue;
795                        }
796
797                        grant = Some(Self::merge_grant(
798                            grant,
799                            &self
800                                .creator_grant_for_owner_interval(
801                                    node,
802                                    creator,
803                                    schema_id,
804                                    namespace,
805                                    gov_version,
806                                    gov_version,
807                                )
808                                .unwrap_or(CreatorWitnessGrant::Hash),
809                        ));
810                        break;
811                    }
812                }
813
814                if Self::grant_allows_clear(grant, viewpoints) {
815                    TrackerDeliveryMode::Clear
816                } else {
817                    TrackerDeliveryMode::Opaque
818                }
819            }
820        }
821    }
822
823    async fn build_tracker_window(
824        &self,
825        ctx: &ActorContext<Self>,
826        subject_id: &DigestIdentifier,
827        node: &PublicKey,
828        namespace: String,
829        schema_id: SchemaType,
830        actual_sn: Option<u64>,
831    ) -> Result<
832        (Option<u64>, Option<u64>, bool, Vec<TrackerDeliveryRange>),
833        ActorError,
834    > {
835        let Some(data) = self.subjects.get(subject_id) else {
836            return Ok((None, None, true, Vec::new()));
837        };
838
839        let access_limit = match self
840            .search_witnesses(
841                ctx,
842                node,
843                data,
844                namespace.clone(),
845                schema_id.clone(),
846                subject_id.clone(),
847            )
848            .await?
849        {
850            SnLimit::Sn(sn) => Some(sn),
851            SnLimit::LastSn => Some(data.sn),
852            SnLimit::NotSn => {
853                if data.actual_owner == *node
854                    || data
855                        .actual_new_owner_data
856                        .as_ref()
857                        .is_some_and(|(new_owner, _)| new_owner == node)
858                {
859                    Some(data.sn)
860                } else {
861                    data.old_owners.get(node).map(|old_owner| old_owner.sn)
862                }
863            }
864        };
865
866        let Some(access_limit) = access_limit else {
867            return Ok((None, None, true, Vec::new()));
868        };
869
870        let from_sn = actual_sn.map_or(0, |sn| sn.saturating_add(1));
871        if from_sn > access_limit {
872            return Ok((None, None, true, Vec::new()));
873        }
874
875        let namespace = Namespace::from(namespace);
876        let gov_versions = self
877            .get_gov_version_window(ctx, subject_id, from_sn, access_limit)
878            .await?;
879
880        let mut ranges: Vec<TrackerDeliveryRange> = Vec::new();
881        let mut clear_sn = None;
882
883        for sn in from_sn..=access_limit {
884            let Some(gov_version) = Self::gov_version_for_sn(&gov_versions, sn)
885                .or_else(|| (sn == 0).then_some(data.gov_version))
886            else {
887                continue;
888            };
889
890            let mode = self.event_delivery_mode(
891                data,
892                node,
893                &namespace,
894                &schema_id,
895                sn,
896                gov_version,
897            );
898
899            match ranges.last_mut() {
900                Some(last)
901                    if std::mem::discriminant(&last.mode)
902                        == std::mem::discriminant(&mode)
903                        && last.to_sn + 1 == sn =>
904                {
905                    last.to_sn = sn;
906                }
907                _ => ranges.push(TrackerDeliveryRange {
908                    from_sn: sn,
909                    to_sn: sn,
910                    mode: mode.clone(),
911                }),
912            }
913
914            if matches!(mode, TrackerDeliveryMode::Clear)
915                && ((clear_sn.is_none()
916                    && matches!(
917                        ranges.first().map(|x| &x.mode),
918                        Some(TrackerDeliveryMode::Clear)
919                    ))
920                    || clear_sn == Some(sn.saturating_sub(1)))
921            {
922                clear_sn = Some(sn);
923            }
924        }
925
926        Ok((Some(access_limit), clear_sn, true, ranges))
927    }
928
929    async fn access_limit_for_node(
930        &self,
931        ctx: &ActorContext<Self>,
932        subject_id: &DigestIdentifier,
933        node: &PublicKey,
934        namespace: &str,
935        schema_id: &SchemaType,
936    ) -> Result<Option<u64>, ActorError> {
937        let Some(data) = self.subjects.get(subject_id) else {
938            return Ok(None);
939        };
940
941        let sn = if data.actual_owner == *node {
942            Some(data.sn)
943        } else if let Some((new_owner, ..)) = &data.actual_new_owner_data
944            && new_owner == node
945        {
946            Some(data.sn)
947        } else if let Some(old_data) = data.old_owners.get(node) {
948            let sn_limit = self
949                .search_witnesses(
950                    ctx,
951                    node,
952                    data,
953                    namespace.to_owned(),
954                    schema_id.clone(),
955                    subject_id.clone(),
956                )
957                .await?;
958
959            let sn = match sn_limit {
960                SnLimit::Sn(sn) => sn.max(old_data.sn),
961                SnLimit::LastSn => unreachable!(
962                    "search_witnesses can not return SnLimit::LastSn"
963                ),
964                SnLimit::NotSn => old_data.sn,
965            };
966
967            Some(sn)
968        } else {
969            let sn_limit = self
970                .search_witnesses(
971                    ctx,
972                    node,
973                    data,
974                    namespace.to_owned(),
975                    schema_id.clone(),
976                    subject_id.clone(),
977                )
978                .await?;
979
980            match sn_limit {
981                SnLimit::Sn(sn) => Some(sn),
982                SnLimit::LastSn => unreachable!(
983                    "search_witnesses can not return SnLimit::LastSn"
984                ),
985                SnLimit::NotSn => None,
986            }
987        };
988
989        Ok(sn)
990    }
991
992    fn close_creator_registration(
993        &mut self,
994        schema_id: &SchemaType,
995        namespace: &str,
996        creator: &PublicKey,
997        version: u64,
998    ) {
999        if let Some(witnesses) = self.witnesses_creator.get_mut(&(
1000            creator.clone(),
1001            namespace.to_owned(),
1002            schema_id.clone(),
1003        )) {
1004            for (.., (interval, last)) in witnesses.iter_mut() {
1005                if let Some(last) = last.take() {
1006                    interval.insert(Interval {
1007                        lo: last,
1008                        hi: version - 1,
1009                    });
1010                }
1011            }
1012        }
1013
1014        if let Some(grants) = self.witnesses_creator_grants.get_mut(&(
1015            creator.clone(),
1016            namespace.to_owned(),
1017            schema_id.clone(),
1018        )) {
1019            for history in grants.values_mut() {
1020                history.apply_version(version, None);
1021            }
1022        }
1023    }
1024
1025    fn apply_creator_registration(
1026        &mut self,
1027        schema_id: &SchemaType,
1028        namespace: &str,
1029        creator: &PublicKey,
1030        registration: &CreatorWitnessRegistration,
1031        version: u64,
1032    ) {
1033        let creator_entry = self
1034            .witnesses_creator
1035            .entry((creator.clone(), namespace.to_owned(), schema_id.clone()))
1036            .or_default();
1037
1038        let witnesses: HashSet<_> =
1039            registration.witnesses.iter().cloned().collect();
1040        for (witness_type, (interval, last)) in creator_entry.iter_mut() {
1041            if !witnesses.contains(witness_type)
1042                && let Some(lo) = last.take()
1043            {
1044                interval.insert(Interval {
1045                    lo,
1046                    hi: version - 1,
1047                });
1048            }
1049        }
1050
1051        for witness in &registration.witnesses {
1052            if let Some((.., last)) = creator_entry.get_mut(witness) {
1053                if last.is_none() {
1054                    *last = Some(version);
1055                }
1056            } else {
1057                creator_entry.insert(
1058                    witness.clone(),
1059                    (IntervalSet::new(), Some(version)),
1060                );
1061            }
1062        }
1063
1064        let creator_grants = self
1065            .witnesses_creator_grants
1066            .entry((creator.clone(), namespace.to_owned(), schema_id.clone()))
1067            .or_default();
1068
1069        let grant_map: HashMap<_, _> =
1070            registration.grants.iter().cloned().collect();
1071
1072        for (witness_type, history) in creator_grants.iter_mut() {
1073            history
1074                .apply_version(version, grant_map.get(witness_type).cloned());
1075        }
1076
1077        for (witness_type, grant) in grant_map {
1078            creator_grants
1079                .entry(witness_type)
1080                .or_default()
1081                .apply_version(version, Some(grant));
1082        }
1083    }
1084
1085    async fn get_sn(
1086        &self,
1087        ctx: &ActorContext<Self>,
1088        subject_id: DigestIdentifier,
1089        gov_version: u64,
1090    ) -> Result<SnLimit, ActorError> {
1091        let governance_id = ctx.path().parent().key();
1092
1093        let path = ActorPath::from(format!(
1094            "/user/node/subject_manager/{}/sn_register",
1095            governance_id
1096        ));
1097        let sn_register = ctx.system().get_actor::<SnRegister>(&path).await?;
1098        let response = sn_register
1099            .ask(SnRegisterMessage::GetSn {
1100                subject_id,
1101                gov_version,
1102            })
1103            .await?;
1104
1105        match response {
1106            SnRegisterResponse::Sn(sn_limit) => Ok(sn_limit),
1107            _ => Err(ActorError::UnexpectedResponse {
1108                path,
1109                expected: "SnRegisterResponse::Sn".to_owned(),
1110            }),
1111        }
1112    }
1113
1114    fn search_in_schema(
1115        witness_data: &HashMap<Namespace, (IntervalSet, Option<u64>)>,
1116        parse_namespace: &Namespace,
1117        data: &OldOwnerData,
1118        mut better_gov_version: Option<u64>,
1119    ) -> Option<u64> {
1120        for (namespace, (interval, actual_lo)) in witness_data.iter() {
1121            if !namespace.is_ancestor_or_equal_of(parse_namespace) {
1122                continue;
1123            }
1124
1125            if let Some(gov_version) = Self::max_covered_old_owner_gov_version(
1126                *actual_lo, interval, data,
1127            ) {
1128                better_gov_version = better_gov_version.max(Some(gov_version));
1129            }
1130        }
1131
1132        better_gov_version
1133    }
1134
1135    async fn search_in_schema_actual(
1136        witness_data: &HashMap<Namespace, (IntervalSet, Option<u64>)>,
1137        parse_namespace: &Namespace,
1138        gov_version: u64,
1139        sn: u64,
1140        mut better_gov_version: Option<u64>,
1141    ) -> ActualSearch {
1142        for (namespace, (interval, actual_lo)) in witness_data.iter() {
1143            if namespace.is_ancestor_or_equal_of(parse_namespace) {
1144                // Actualmente soy testigo del owner
1145                if actual_lo.is_some() {
1146                    return ActualSearch::End(SnLimit::Sn(sn));
1147                }
1148
1149                if let Some(range) = interval.iter().last()
1150                    && gov_version <= range.hi
1151                {
1152                    // range.hi es la máxima gov_version que puede acceder, hay que pedir cual es ese sn.
1153                    better_gov_version = better_gov_version.max(Some(range.hi));
1154                }
1155            }
1156        }
1157
1158        ActualSearch::Continue {
1159            gov_version: better_gov_version,
1160        }
1161    }
1162
1163    /// Busca en los testigos de schema para ambos tipos (específico y TrackerSchemas) usando search_in_schema_actual
1164    async fn search_schemas_actual(
1165        &self,
1166        node: &PublicKey,
1167        schema_id: &SchemaType,
1168        parse_namespace: &Namespace,
1169        gov_version: u64,
1170        sn: u64,
1171        better_gov_version: Option<u64>,
1172    ) -> ActualSearch {
1173        // el esquema específico
1174        let better_gov_version = if let Some(witness_data) =
1175            self.witnesses.get(&(node.clone(), schema_id.clone()))
1176        {
1177            match Self::search_in_schema_actual(
1178                witness_data,
1179                parse_namespace,
1180                gov_version,
1181                sn,
1182                better_gov_version,
1183            )
1184            .await
1185            {
1186                ActualSearch::End(sn_limit) => {
1187                    return ActualSearch::End(sn_limit);
1188                }
1189                ActualSearch::Continue { gov_version } => gov_version,
1190            }
1191        } else {
1192            better_gov_version
1193        };
1194
1195        // todos los esquemas
1196        if let Some(witness_data) = self
1197            .witnesses
1198            .get(&(node.clone(), SchemaType::TrackerSchemas))
1199        {
1200            return Self::search_in_schema_actual(
1201                witness_data,
1202                parse_namespace,
1203                gov_version,
1204                sn,
1205                better_gov_version,
1206            )
1207            .await;
1208        }
1209
1210        ActualSearch::Continue {
1211            gov_version: better_gov_version,
1212        }
1213    }
1214
1215    /// Busca en los testigos de schema para ambos tipos (específico y TrackerSchemas) usando search_in_schema para old owners
1216    fn search_schemas_old(
1217        &self,
1218        node: &PublicKey,
1219        schema_id: &SchemaType,
1220        parse_namespace: &Namespace,
1221        data: &OldOwnerData,
1222        better_gov_version: Option<u64>,
1223    ) -> Option<u64> {
1224        // el esquema específico
1225        let better_gov_version = self
1226            .witnesses
1227            .get(&(node.clone(), schema_id.clone()))
1228            .map_or(better_gov_version, |witness_data| {
1229                Self::search_in_schema(
1230                    witness_data,
1231                    parse_namespace,
1232                    data,
1233                    better_gov_version,
1234                )
1235            });
1236
1237        // todos los esquemas
1238        if let Some(witness_data) = self
1239            .witnesses
1240            .get(&(node.clone(), SchemaType::TrackerSchemas))
1241        {
1242            return Self::search_in_schema(
1243                witness_data,
1244                parse_namespace,
1245                data,
1246                better_gov_version,
1247            );
1248        }
1249
1250        better_gov_version
1251    }
1252
1253    /// Busca testigos para un owner actual (actual_owner o new_owner en transferencia)
1254    async fn check_current_owner(
1255        &self,
1256        witnesses_creator: &HashMap<WitnessesType, (IntervalSet, Option<u64>)>,
1257        node: &PublicKey,
1258        schema_id: &SchemaType,
1259        parse_namespace: &Namespace,
1260        sn: u64,
1261        owner_better_gov_version: (u64, Option<u64>),
1262    ) -> ActualSearch {
1263        let (owner_gov_version, mut better_gov_version) =
1264            owner_better_gov_version;
1265
1266        // Si el nodo es testigo explicito
1267        if let Some((interval, actual_lo)) =
1268            witnesses_creator.get(&WitnessesType::User(node.clone()))
1269        {
1270            // Actualmente soy testigo del owner
1271            if actual_lo.is_some() {
1272                return ActualSearch::End(SnLimit::Sn(sn));
1273            }
1274            // Ya no soy testigo del owner, mira mi último intervalo, si era testigo cuando él empezó
1275            // a ser owner puedo recibir la copia hasta que dejé de ser testigo, mi rango.hi
1276            if let Some(range) = interval.iter().last()
1277                && owner_gov_version <= range.hi
1278            {
1279                // range.hi es la máxima gov_version que puede acceder, hay que pedir cual es ese sn.
1280                better_gov_version = better_gov_version.max(Some(range.hi));
1281            }
1282        }
1283
1284        // Solo delegar a los testigos de schema si el rol Witnesses estaba activo
1285        // cuando el owner empezó a serlo (actual_lo activo, o intervalo cerrado que llega
1286        // hasta owner_gov_version). Si el intervalo cerró antes de que el owner empezase,
1287        // contains_key sería true pero no debe triggear search_schemas_actual.
1288        if let Some((interval, actual_lo)) =
1289            witnesses_creator.get(&WitnessesType::Witnesses)
1290        {
1291            let witnesses_active = actual_lo.is_some()
1292                || interval
1293                    .iter()
1294                    .last()
1295                    .is_some_and(|range| owner_gov_version <= range.hi);
1296
1297            if witnesses_active {
1298                return self
1299                    .search_schemas_actual(
1300                        node,
1301                        schema_id,
1302                        parse_namespace,
1303                        owner_gov_version,
1304                        sn,
1305                        better_gov_version,
1306                    )
1307                    .await;
1308            }
1309        }
1310
1311        ActualSearch::Continue {
1312            gov_version: better_gov_version,
1313        }
1314    }
1315
1316    async fn search_witnesses(
1317        &self,
1318        ctx: &ActorContext<Self>,
1319        node: &PublicKey,
1320        data: &TransferData,
1321        namespace: String,
1322        schema_id: SchemaType,
1323        subject_id: DigestIdentifier,
1324    ) -> Result<SnLimit, ActorError> {
1325        let mut better_gov_version: Option<u64> = None;
1326        let mut better_sn: Option<u64> = None;
1327        let parse_namespace = Namespace::from(namespace.clone());
1328
1329        // Obtengo los testigos del owner
1330        if let Some(witnesses_creator) = self.witnesses_creator.get(&(
1331            data.actual_owner.to_owned(),
1332            namespace.clone(),
1333            schema_id.clone(),
1334        )) {
1335            match self
1336                .check_current_owner(
1337                    witnesses_creator,
1338                    node,
1339                    &schema_id,
1340                    &parse_namespace,
1341                    data.sn,
1342                    (data.gov_version, better_gov_version),
1343                )
1344                .await
1345            {
1346                ActualSearch::End(sn_limit) => return Ok(sn_limit),
1347                ActualSearch::Continue { gov_version } => {
1348                    better_gov_version = gov_version;
1349                }
1350            }
1351        }
1352
1353        if let Some((new_owner, new_owner_gov_version)) =
1354            &data.actual_new_owner_data
1355            && let Some(witnesses_creator) = self.witnesses_creator.get(&(
1356                new_owner.to_owned(),
1357                namespace.clone(),
1358                schema_id.clone(),
1359            ))
1360        {
1361            match self
1362                .check_current_owner(
1363                    witnesses_creator,
1364                    node,
1365                    &schema_id,
1366                    &parse_namespace,
1367                    data.sn,
1368                    (*new_owner_gov_version, better_gov_version),
1369                )
1370                .await
1371            {
1372                ActualSearch::End(sn_limit) => return Ok(sn_limit),
1373                ActualSearch::Continue { gov_version } => {
1374                    better_gov_version = gov_version;
1375                }
1376            }
1377        }
1378
1379        // Not_owners
1380        for (creator, old_data) in data.old_owners.iter() {
1381            if let Some(witnesses_creator) = self.witnesses_creator.get(&(
1382                creator.to_owned(),
1383                namespace.clone(),
1384                schema_id.clone(),
1385            )) {
1386                if let Some((interval, actual_lo)) =
1387                    witnesses_creator.get(&WitnessesType::User(node.clone()))
1388                    && let Some(gov_version) =
1389                        Self::max_covered_old_owner_gov_version(
1390                            *actual_lo, interval, old_data,
1391                        )
1392                {
1393                    match self
1394                        .get_sn(ctx, subject_id.clone(), gov_version)
1395                        .await?
1396                    {
1397                        SnLimit::Sn(sn) => {
1398                            better_sn =
1399                                better_sn.max(Some(sn.min(old_data.sn)));
1400                        }
1401                        SnLimit::LastSn => {
1402                            better_sn = better_sn.max(Some(old_data.sn));
1403                        }
1404                        SnLimit::NotSn => {}
1405                    }
1406                }
1407
1408                // Witness de schema.
1409                if let Some((interval, actual_lo)) =
1410                    witnesses_creator.get(&WitnessesType::Witnesses)
1411                {
1412                    let covered_old_owner = Self::covered_old_owner_intervals(
1413                        *actual_lo, interval, old_data,
1414                    );
1415
1416                    if covered_old_owner.iter().next().is_some() {
1417                        let capped_old_owner = OldOwnerData {
1418                            sn: old_data.sn,
1419                            interval_gov_version: covered_old_owner,
1420                        };
1421
1422                        if let Some(gov_version) = self.search_schemas_old(
1423                            node,
1424                            &schema_id,
1425                            &parse_namespace,
1426                            &capped_old_owner,
1427                            better_gov_version,
1428                        ) {
1429                            match self
1430                                .get_sn(ctx, subject_id.clone(), gov_version)
1431                                .await?
1432                            {
1433                                SnLimit::Sn(sn) => {
1434                                    better_sn = better_sn
1435                                        .max(Some(sn.min(old_data.sn)));
1436                                }
1437                                SnLimit::LastSn => {
1438                                    better_sn =
1439                                        better_sn.max(Some(old_data.sn));
1440                                }
1441                                SnLimit::NotSn => {}
1442                            }
1443                        }
1444                    }
1445                }
1446            }
1447        }
1448
1449        let sn_limit = if let Some(gov_version) = better_gov_version {
1450            match self.get_sn(ctx, subject_id.clone(), gov_version).await? {
1451                SnLimit::Sn(sn) => better_sn
1452                    .map_or(SnLimit::Sn(sn), |better_sn| {
1453                        SnLimit::Sn(sn.max(better_sn))
1454                    }),
1455                SnLimit::LastSn => SnLimit::Sn(data.sn),
1456                SnLimit::NotSn => better_sn.map_or(SnLimit::NotSn, SnLimit::Sn),
1457            }
1458        } else if let Some(better_sn) = better_sn {
1459            SnLimit::Sn(better_sn)
1460        } else {
1461            SnLimit::NotSn
1462        };
1463
1464        Ok(sn_limit)
1465    }
1466
1467    fn has_active_schema_witness(
1468        &self,
1469        node: &PublicKey,
1470        schema_id: &SchemaType,
1471        namespace: &Namespace,
1472    ) -> bool {
1473        let has_match = |witness_data: &HashMap<Namespace, IntervalData>| {
1474            witness_data
1475                .iter()
1476                .any(|(current_namespace, (_, current_lo))| {
1477                    current_lo.is_some()
1478                        && current_namespace.is_ancestor_or_equal_of(namespace)
1479                })
1480        };
1481
1482        self.witnesses
1483            .get(&(node.clone(), schema_id.clone()))
1484            .is_some_and(has_match)
1485            || self
1486                .witnesses
1487                .get(&(node.clone(), SchemaType::TrackerSchemas))
1488                .is_some_and(has_match)
1489    }
1490
1491    fn is_current_witness_for_entry(
1492        &self,
1493        node: &PublicKey,
1494        schema_id: &SchemaType,
1495        namespace: &str,
1496        creator_witnesses: &HashMap<WitnessesType, IntervalData>,
1497    ) -> bool {
1498        if creator_witnesses
1499            .get(&WitnessesType::User(node.clone()))
1500            .is_some_and(|(_, current_lo)| current_lo.is_some())
1501        {
1502            return true;
1503        }
1504
1505        if !creator_witnesses
1506            .get(&WitnessesType::Witnesses)
1507            .is_some_and(|(_, current_lo)| current_lo.is_some())
1508        {
1509            return false;
1510        }
1511
1512        self.has_active_schema_witness(
1513            node,
1514            schema_id,
1515            &Namespace::from(namespace.to_owned()),
1516        )
1517    }
1518
1519    async fn get_subjects_for_owner_schema(
1520        &self,
1521        ctx: &ActorContext<Self>,
1522        owner: &PublicKey,
1523        schema_id: &SchemaType,
1524        namespace: &str,
1525    ) -> Result<Vec<DigestIdentifier>, ActorError> {
1526        let governance_id = ctx.path().parent().key();
1527        let path = ActorPath::from(format!(
1528            "/user/node/subject_manager/{}/subject_register",
1529            governance_id
1530        ));
1531        let actor = ctx.system().get_actor::<SubjectRegister>(&path).await?;
1532        let response = actor
1533            .ask(SubjectRegisterMessage::GetSubjectsByOwnerSchema {
1534                owner: owner.clone(),
1535                schema_id: schema_id.clone(),
1536                namespace: namespace.to_owned(),
1537            })
1538            .await?;
1539
1540        match response {
1541            SubjectRegisterResponse::Subjects(subjects) => Ok(subjects),
1542            _ => Err(ActorError::UnexpectedResponse {
1543                path,
1544                expected: "SubjectRegisterResponse::Subjects".to_owned(),
1545            }),
1546        }
1547    }
1548
1549    async fn list_current_witness_subjects(
1550        &self,
1551        ctx: &ActorContext<Self>,
1552        node: &PublicKey,
1553        governance_version: u64,
1554        after_subject_id: Option<DigestIdentifier>,
1555        limit: usize,
1556    ) -> Result<
1557        (Vec<CurrentWitnessSubject>, Option<DigestIdentifier>),
1558        ActorError,
1559    > {
1560        let mut subjects = BTreeMap::new();
1561
1562        for ((creator, namespace, schema_id), creator_witnesses) in
1563            &self.witnesses_creator
1564        {
1565            if !self.is_current_witness_for_entry(
1566                node,
1567                schema_id,
1568                namespace,
1569                creator_witnesses,
1570            ) {
1571                continue;
1572            }
1573
1574            let current_subjects = self
1575                .get_subjects_for_owner_schema(
1576                    ctx, creator, schema_id, namespace,
1577                )
1578                .await?;
1579
1580            for subject_id in current_subjects {
1581                if let Some(data) = self.subjects.get(&subject_id) {
1582                    subjects.insert(subject_id, data.sn);
1583                }
1584            }
1585        }
1586
1587        let limit = limit.max(1);
1588        let mut items = Vec::with_capacity(limit + 1);
1589        let effective_cursor = if governance_version == self.gov_sn {
1590            after_subject_id
1591        } else {
1592            None
1593        };
1594
1595        for (subject_id, target_sn) in subjects {
1596            if effective_cursor
1597                .as_ref()
1598                .is_some_and(|cursor| &subject_id <= cursor)
1599            {
1600                continue;
1601            }
1602
1603            items.push(CurrentWitnessSubject {
1604                subject_id,
1605                target_sn,
1606            });
1607
1608            if items.len() > limit {
1609                break;
1610            }
1611        }
1612
1613        let next_cursor = if items.len() > limit {
1614            let extra = items.pop();
1615            let _ = extra;
1616            items.last().map(|item| item.subject_id.clone())
1617        } else {
1618            None
1619        };
1620
1621        Ok((items, next_cursor))
1622    }
1623}
1624
1625#[async_trait]
1626impl Actor for WitnessesRegister {
1627    type Event = WitnessesRegisterEvent;
1628    type Message = WitnessesRegisterMessage;
1629    type Response = WitnessesRegisterResponse;
1630
1631    fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
1632        parent_span.map_or_else(
1633            || info_span!("WitnessesRegister"),
1634            |parent_span| info_span!(parent: parent_span, "WitnessesRegister"),
1635        )
1636    }
1637
1638    async fn pre_start(
1639        &mut self,
1640        ctx: &mut ActorContext<Self>,
1641    ) -> Result<(), ActorError> {
1642        let prefix = ctx.path().parent().key();
1643        if let Err(e) = self
1644            .init_store("witnesses_register", Some(prefix), false, ctx)
1645            .await
1646        {
1647            error!(
1648                error = %e,
1649                "Failed to initialize witnesses_register store"
1650            );
1651            return Err(e);
1652        }
1653        Ok(())
1654    }
1655}
1656
1657#[async_trait]
1658impl Handler<Self> for WitnessesRegister {
1659    async fn handle_message(
1660        &mut self,
1661        _sender: ActorPath,
1662        msg: WitnessesRegisterMessage,
1663        ctx: &mut ActorContext<Self>,
1664    ) -> Result<WitnessesRegisterResponse, ActorError> {
1665        match msg {
1666            WitnessesRegisterMessage::PurgeStorage => {
1667                purge_storage(ctx).await?;
1668
1669                debug!(
1670                    msg_type = "PurgeStorage",
1671                    "Witnesses register storage purged"
1672                );
1673
1674                return Ok(WitnessesRegisterResponse::Ok);
1675            }
1676            WitnessesRegisterMessage::ListCurrentWitnessSubjects {
1677                node,
1678                governance_version,
1679                after_subject_id,
1680                limit,
1681            } => {
1682                let (items, next_cursor) = self
1683                    .list_current_witness_subjects(
1684                        ctx,
1685                        &node,
1686                        governance_version,
1687                        after_subject_id,
1688                        limit,
1689                    )
1690                    .await?;
1691
1692                return Ok(WitnessesRegisterResponse::CurrentWitnessSubjects {
1693                    governance_version: self.gov_sn,
1694                    items,
1695                    next_cursor,
1696                });
1697            }
1698            WitnessesRegisterMessage::GetTrackerSnOwner { subject_id } => {
1699                let data = self
1700                    .subjects
1701                    .get(&subject_id)
1702                    .map(|data| (data.actual_owner.clone(), data.sn));
1703
1704                debug!(
1705                    msg_type = "GetTrackerSnOwner",
1706                    subject_id = %subject_id,
1707                    found = data.is_some(),
1708                    "Tracker sn owner lookup completed"
1709                );
1710
1711                return Ok(WitnessesRegisterResponse::TrackerOwnerSn { data });
1712            }
1713            WitnessesRegisterMessage::GetSnGov => {
1714                debug!(
1715                    msg_type = "GetSnGov",
1716                    sn = self.gov_sn,
1717                    "Governance sn retrieved"
1718                );
1719                return Ok(WitnessesRegisterResponse::GovSn {
1720                    sn: self.gov_sn,
1721                });
1722            }
1723            WitnessesRegisterMessage::UpdateSnGov { sn } => {
1724                self.on_event(WitnessesRegisterEvent::UpdateSnGov { sn }, ctx)
1725                    .await;
1726
1727                debug!(
1728                    msg_type = "UpdateSnGov",
1729                    sn = sn,
1730                    "Governance sn updated"
1731                );
1732            }
1733            WitnessesRegisterMessage::UpdateCreatorsWitnessesConfirm {
1734                version,
1735                remove_creator,
1736                remove_witnesses,
1737            } => {
1738                let remove_creator_count = remove_creator.len();
1739                let remove_witnesses_count = remove_witnesses.len();
1740                self.on_event(
1741                    WitnessesRegisterEvent::UpdateCreatorsWitnessesConfirm {
1742                        version,
1743                        remove_creator,
1744                        remove_witnesses,
1745                    },
1746                    ctx,
1747                )
1748                .await;
1749
1750                debug!(
1751                    msg_type = "UpdateCreatorsWitnessesConfirm",
1752                    version = version,
1753                    remove_creator_count = remove_creator_count,
1754                    remove_witnesses_count = remove_witnesses_count,
1755                    "Creators and witnesses confirm updated"
1756                );
1757            }
1758            WitnessesRegisterMessage::UpdateCreatorsWitnessesFact {
1759                version,
1760                new_creator,
1761                remove_creator,
1762                update_creator_witnesses,
1763                new_witnesses,
1764                remove_witnesses,
1765            } => {
1766                let new_creator_count = new_creator.len();
1767                let remove_creator_count = remove_creator.len();
1768                self.on_event(
1769                    WitnessesRegisterEvent::UpdateCreatorsWitnessesFact {
1770                        version,
1771                        new_creator,
1772                        remove_creator,
1773                        update_creator_witnesses,
1774                        new_witnesses,
1775                        remove_witnesses,
1776                    },
1777                    ctx,
1778                )
1779                .await;
1780
1781                debug!(
1782                    msg_type = "UpdateCreatorsWitnessesFact",
1783                    version = version,
1784                    new_creator_count = new_creator_count,
1785                    remove_creator_count = remove_creator_count,
1786                    "Creators and witnesses updated"
1787                );
1788            }
1789            WitnessesRegisterMessage::UpdateSn { sn, subject_id } => {
1790                self.on_event(
1791                    WitnessesRegisterEvent::UpdateSn {
1792                        sn,
1793                        subject_id: subject_id.clone(),
1794                    },
1795                    ctx,
1796                )
1797                .await;
1798
1799                debug!(
1800                    msg_type = "UpdateSn",
1801                    subject_id = %subject_id,
1802                    sn = sn,
1803                    "Sequence number updated"
1804                );
1805            }
1806            WitnessesRegisterMessage::UpdateTrackerVisibility {
1807                subject_id,
1808                sn,
1809                mode,
1810                stored_visibility,
1811                event_visibility,
1812            } => {
1813                self.on_event(
1814                    WitnessesRegisterEvent::UpdateTrackerVisibility {
1815                        subject_id: subject_id.clone(),
1816                        sn,
1817                        mode,
1818                        stored_visibility: stored_visibility.clone(),
1819                        event_visibility: event_visibility.clone(),
1820                    },
1821                    ctx,
1822                )
1823                .await;
1824
1825                debug!(
1826                    msg_type = "UpdateTrackerVisibility",
1827                    subject_id = %subject_id,
1828                    sn = sn,
1829                    "Tracker visibility updated"
1830                );
1831            }
1832            WitnessesRegisterMessage::Create {
1833                subject_id,
1834                owner,
1835                gov_version,
1836            } => {
1837                self.on_event(
1838                    WitnessesRegisterEvent::Create {
1839                        subject_id: subject_id.clone(),
1840                        owner: owner.clone(),
1841                        gov_version,
1842                    },
1843                    ctx,
1844                )
1845                .await;
1846
1847                debug!(
1848                    msg_type = "Create",
1849                    subject_id = %subject_id,
1850                    owner = %owner,
1851                    gov_version = gov_version,
1852                    "Transfer entry created"
1853                );
1854            }
1855            WitnessesRegisterMessage::Transfer {
1856                subject_id,
1857                new_owner,
1858                gov_version,
1859            } => {
1860                self.on_event(
1861                    WitnessesRegisterEvent::Transfer {
1862                        subject_id: subject_id.clone(),
1863                        new_owner: new_owner.clone(),
1864                        gov_version,
1865                    },
1866                    ctx,
1867                )
1868                .await;
1869
1870                debug!(
1871                    msg_type = "Transfer",
1872                    subject_id = %subject_id,
1873                    new_owner = %new_owner,
1874                    gov_version = gov_version,
1875                    "New transfer registered"
1876                );
1877            }
1878            WitnessesRegisterMessage::Reject {
1879                subject_id,
1880                sn,
1881                gov_version,
1882            } => {
1883                self.on_event(
1884                    WitnessesRegisterEvent::Reject {
1885                        subject_id: subject_id.clone(),
1886                        sn,
1887                        gov_version,
1888                    },
1889                    ctx,
1890                )
1891                .await;
1892
1893                debug!(
1894                    msg_type = "Reject",
1895                    subject_id = %subject_id,
1896                    sn = sn,
1897                    gov_version = gov_version,
1898                    "The transfer was rejected"
1899                );
1900            }
1901            WitnessesRegisterMessage::Confirm {
1902                subject_id,
1903                sn,
1904                gov_version,
1905            } => {
1906                self.on_event(
1907                    WitnessesRegisterEvent::Confirm {
1908                        subject_id: subject_id.clone(),
1909                        sn,
1910                        gov_version,
1911                    },
1912                    ctx,
1913                )
1914                .await;
1915
1916                debug!(
1917                    msg_type = "Confirm",
1918                    subject_id = %subject_id,
1919                    sn = sn,
1920                    gov_version = gov_version,
1921                    "The transfer was confirmed"
1922                );
1923            }
1924            WitnessesRegisterMessage::DeleteSubject { subject_id } => {
1925                self.on_event(
1926                    WitnessesRegisterEvent::DeleteSubject {
1927                        subject_id: subject_id.clone(),
1928                    },
1929                    ctx,
1930                )
1931                .await;
1932
1933                debug!(
1934                    msg_type = "DeleteSubject",
1935                    subject_id = %subject_id,
1936                    "Witness subject entry deleted"
1937                );
1938            }
1939            WitnessesRegisterMessage::Access {
1940                subject_id,
1941                node,
1942                namespace,
1943                schema_id,
1944            } => {
1945                let sn = self
1946                    .access_limit_for_node(
1947                        ctx,
1948                        &subject_id,
1949                        &node,
1950                        &namespace,
1951                        &schema_id,
1952                    )
1953                    .await?;
1954
1955                debug!(
1956                    msg_type = "Access",
1957                    subject_id = %subject_id,
1958                    node = %node,
1959                    namespace = %namespace,
1960                    schema_id = %schema_id,
1961                    sn = sn,
1962                    "Checked access status"
1963                );
1964
1965                return Ok(WitnessesRegisterResponse::Access { sn });
1966            }
1967            WitnessesRegisterMessage::GetTrackerVisibilityState {
1968                subject_id,
1969            } => {
1970                let state = self
1971                    .subjects
1972                    .get(&subject_id)
1973                    .map(|data| data.visibility_state.clone())
1974                    .unwrap_or_default();
1975
1976                return Ok(WitnessesRegisterResponse::TrackerVisibilityState {
1977                    state,
1978                });
1979            }
1980            WitnessesRegisterMessage::GetTrackerWindow {
1981                subject_id,
1982                node,
1983                namespace,
1984                schema_id,
1985                actual_sn,
1986            } => {
1987                let (sn, clear_sn, is_all, ranges) = self
1988                    .build_tracker_window(
1989                        ctx,
1990                        &subject_id,
1991                        &node,
1992                        namespace,
1993                        schema_id,
1994                        actual_sn,
1995                    )
1996                    .await?;
1997
1998                return Ok(WitnessesRegisterResponse::TrackerWindow {
1999                    sn,
2000                    clear_sn,
2001                    is_all,
2002                    ranges,
2003                });
2004            }
2005        };
2006
2007        Ok(WitnessesRegisterResponse::Ok)
2008    }
2009
2010    async fn on_event(
2011        &mut self,
2012        event: WitnessesRegisterEvent,
2013        ctx: &mut ActorContext<Self>,
2014    ) {
2015        if let Err(e) = self.persist(&event, ctx).await {
2016            error!(
2017                event = ?event,
2018                error = %e,
2019                "Failed to persist witnesses register event"
2020            );
2021            emit_fail(ctx, e).await;
2022        }
2023    }
2024}
2025
2026#[async_trait]
2027impl PersistentActor for WitnessesRegister {
2028    type Persistence = LightPersistence;
2029    type InitParams = usize;
2030
2031    fn create_initial(params: Self::InitParams) -> Self {
2032        Self {
2033            ledger_batch_size: params,
2034            ..Self::default()
2035        }
2036    }
2037
2038    fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
2039        match event {
2040            WitnessesRegisterEvent::UpdateSnGov { sn } => {
2041                self.gov_sn = *sn;
2042
2043                debug!(
2044                    event_type = "UpdateSnGov",
2045                    sn = sn,
2046                    "Governance sn updated in state"
2047                );
2048            }
2049            WitnessesRegisterEvent::UpdateCreatorsWitnessesConfirm {
2050                version,
2051                remove_creator,
2052                remove_witnesses,
2053            } => {
2054                for (schema_id, ns, creator) in remove_creator.iter() {
2055                    self.close_creator_registration(
2056                        schema_id, ns, creator, *version,
2057                    );
2058                }
2059
2060                for ((schema_id, witness), namespace) in remove_witnesses {
2061                    if let Some(witness_namespace) = self
2062                        .witnesses
2063                        .get_mut(&(witness.clone(), schema_id.clone()))
2064                    {
2065                        for ns in namespace.iter() {
2066                            if let Some((interval, last)) =
2067                                witness_namespace.get_mut(ns)
2068                                && let Some(last) = last.take()
2069                            {
2070                                interval.insert(Interval {
2071                                    lo: last,
2072                                    hi: *version - 1,
2073                                });
2074                            }
2075                        }
2076                    }
2077                }
2078
2079                debug!(
2080                    event_type = "UpdateCreatorsWitnessesConfirm",
2081                    version = version,
2082                    remove_witnesses_count = remove_witnesses.len(),
2083                    remove_creator_count = remove_creator.len(),
2084                    "Creators and witnesses updated in state"
2085                );
2086            }
2087            WitnessesRegisterEvent::UpdateCreatorsWitnessesFact {
2088                version,
2089                new_creator,
2090                remove_creator,
2091                update_creator_witnesses,
2092                new_witnesses,
2093                remove_witnesses,
2094            } => {
2095                for ((schema_id, ns, creator), registration) in
2096                    new_creator.iter()
2097                {
2098                    self.apply_creator_registration(
2099                        schema_id,
2100                        ns,
2101                        creator,
2102                        registration,
2103                        *version,
2104                    );
2105                }
2106
2107                for (schema_id, ns, creator) in remove_creator.iter() {
2108                    self.close_creator_registration(
2109                        schema_id, ns, creator, *version,
2110                    );
2111                }
2112
2113                for ((schema_id, ns, creator), registration) in
2114                    update_creator_witnesses.iter()
2115                {
2116                    self.apply_creator_registration(
2117                        schema_id,
2118                        ns,
2119                        creator,
2120                        registration,
2121                        *version,
2122                    );
2123                }
2124
2125                for ((schema_id, witness), namespace) in new_witnesses {
2126                    for ns in namespace.iter() {
2127                        self.witnesses
2128                            .entry((witness.clone(), schema_id.clone()))
2129                            .or_default()
2130                            .entry(ns.clone())
2131                            .or_default()
2132                            .1 = Some(*version);
2133                    }
2134                }
2135
2136                for ((schema_id, witness), namespace) in remove_witnesses {
2137                    if let Some(witness_namespace) = self
2138                        .witnesses
2139                        .get_mut(&(witness.clone(), schema_id.clone()))
2140                    {
2141                        for ns in namespace.iter() {
2142                            if let Some((interval, last)) =
2143                                witness_namespace.get_mut(ns)
2144                                && let Some(last) = last.take()
2145                            {
2146                                interval.insert(Interval {
2147                                    lo: last,
2148                                    hi: *version - 1,
2149                                });
2150                            }
2151                        }
2152                    }
2153                }
2154
2155                debug!(
2156                    event_type = "UpdateCreatorsWitnessesFact",
2157                    version = version,
2158                    remove_creator_count = remove_creator.len(),
2159                    update_creator_witnesses_count =
2160                        update_creator_witnesses.len(),
2161                    new_witnesses_count = new_witnesses.len(),
2162                    new_creator_count = new_creator.len(),
2163                    remove_creator_count = remove_creator.len(),
2164                    "Creators and witnesses updated in state"
2165                );
2166            }
2167            WitnessesRegisterEvent::UpdateSn { subject_id, sn } => {
2168                if let Some(data) = self.subjects.get_mut(subject_id) {
2169                    data.sn = *sn;
2170
2171                    debug!(
2172                        event_type = "UpdateSn",
2173                        subject_id = %subject_id,
2174                        sn = sn,
2175                        "Sequence number updated"
2176                    );
2177                } else {
2178                    error!(
2179                        event_type = "UpdateSn",
2180                        subject_id = %subject_id,
2181                        "Subject not found in register"
2182                    );
2183                };
2184            }
2185            WitnessesRegisterEvent::UpdateTrackerVisibility {
2186                subject_id,
2187                sn,
2188                mode,
2189                stored_visibility,
2190                event_visibility,
2191            } => {
2192                if let Some(data) = self.subjects.get_mut(subject_id) {
2193                    data.visibility_state.set_mode(*mode);
2194                    data.visibility_state.record_event(
2195                        *sn,
2196                        stored_visibility.clone(),
2197                        event_visibility.clone(),
2198                    );
2199                } else {
2200                    warn!(
2201                        event_type = "UpdateTrackerVisibility",
2202                        subject_id = %subject_id,
2203                        sn = sn,
2204                        "Tracker visibility update ignored because subject was not found"
2205                    );
2206                }
2207            }
2208            WitnessesRegisterEvent::Create {
2209                subject_id,
2210                owner,
2211                gov_version,
2212            } => {
2213                let data = self.subjects.entry(subject_id.clone()).or_default();
2214
2215                data.actual_owner = owner.clone();
2216                data.gov_version = *gov_version;
2217                data.visibility_state = TrackerVisibilityState::default();
2218                data.visibility_state.record_event(
2219                    0,
2220                    TrackerStoredVisibility::Full,
2221                    TrackerEventVisibility::NonFact,
2222                );
2223
2224                debug!(
2225                    event_type = "Create",
2226                    subject_id = %subject_id,
2227                    owner = %owner,
2228                    gov_version = gov_version,
2229                    "Transfer entry created"
2230                );
2231            }
2232            WitnessesRegisterEvent::Transfer {
2233                subject_id,
2234                new_owner,
2235                gov_version,
2236            } => {
2237                if let Some(data) = self.subjects.get_mut(subject_id) {
2238                    data.actual_new_owner_data =
2239                        Some((new_owner.clone(), *gov_version));
2240
2241                    debug!(
2242                        event_type = "Transfer",
2243                        subject_id = %subject_id,
2244                        new_owner = %new_owner,
2245                        gov_version = gov_version,
2246                        "Transfer initiated"
2247                    );
2248                } else {
2249                    error!(
2250                        event_type = "Transfer",
2251                        subject_id = %subject_id,
2252                        new_owner = %new_owner,
2253                        "Subject not found in register"
2254                    );
2255                };
2256            }
2257            WitnessesRegisterEvent::Confirm {
2258                subject_id,
2259                sn,
2260                gov_version,
2261            } => {
2262                if let Some(data) = self.subjects.get_mut(subject_id) {
2263                    let new_owner = data.actual_new_owner_data.take();
2264
2265                    if let Some((new_owner, new_owner_gov_version)) = new_owner
2266                    {
2267                        let entry = data
2268                            .old_owners
2269                            .entry(data.actual_owner.clone())
2270                            .or_default();
2271                        entry.sn = *sn;
2272                        entry.interval_gov_version.insert(Interval {
2273                            lo: data.gov_version,
2274                            hi: *gov_version,
2275                        });
2276
2277                        data.actual_owner = new_owner;
2278                        data.gov_version = new_owner_gov_version;
2279
2280                        debug!(
2281                            event_type = "Confirm",
2282                            subject_id = %subject_id,
2283                            sn = sn,
2284                            gov_version = gov_version,
2285                            "Transfer confirmed"
2286                        );
2287                    } else {
2288                        error!(
2289                            event_type = "Confirm",
2290                            subject_id = %subject_id,
2291                            sn = sn,
2292                            "No pending new owner to confirm"
2293                        );
2294                    };
2295                } else {
2296                    error!(
2297                        event_type = "Confirm",
2298                        subject_id = %subject_id,
2299                        sn = sn,
2300                        "Subject not found in register"
2301                    );
2302                };
2303            }
2304            WitnessesRegisterEvent::DeleteSubject { subject_id } => {
2305                self.subjects.remove(subject_id);
2306
2307                debug!(
2308                    event_type = "DeleteSubject",
2309                    subject_id = %subject_id,
2310                    "Witness subject entry deleted from state"
2311                );
2312            }
2313            WitnessesRegisterEvent::Reject {
2314                subject_id,
2315                sn,
2316                gov_version,
2317            } => {
2318                if let Some(data) = self.subjects.get_mut(subject_id) {
2319                    let new_owner = data.actual_new_owner_data.take();
2320
2321                    if let Some((new_owner, new_owner_gov_version)) = new_owner
2322                    {
2323                        let entry =
2324                            data.old_owners.entry(new_owner).or_default();
2325                        entry.sn = *sn;
2326                        entry.interval_gov_version.insert(Interval {
2327                            lo: new_owner_gov_version,
2328                            hi: *gov_version,
2329                        });
2330
2331                        debug!(
2332                            event_type = "Reject",
2333                            subject_id = %subject_id,
2334                            sn = sn,
2335                            gov_version = gov_version,
2336                            "Transfer rejected"
2337                        );
2338                    } else {
2339                        error!(
2340                            event_type = "Reject",
2341                            subject_id = %subject_id,
2342                            sn = sn,
2343                            "No pending new owner to reject"
2344                        );
2345                    };
2346                } else {
2347                    error!(
2348                        event_type = "Reject",
2349                        subject_id = %subject_id,
2350                        sn = sn,
2351                        "Subject not found in register"
2352                    );
2353                };
2354            }
2355        };
2356
2357        Ok(())
2358    }
2359}
2360
2361impl Storable for WitnessesRegister {}