1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
2
3use crate::governance::sn_register::{
4 SnLimit, SnRegister, SnRegisterMessage, SnRegisterResponse,
5};
6use crate::governance::subject_register::{
7 SubjectRegister, SubjectRegisterMessage, SubjectRegisterResponse,
8};
9use crate::model::common::{
10 Interval, IntervalSet, TrackerEventVisibility, TrackerStoredVisibility,
11 TrackerVisibilityMode, TrackerVisibilityState, emit_fail, purge_storage,
12};
13use async_trait::async_trait;
14use ave_actors::{
15 Actor, ActorContext, ActorError, ActorPath, Event, Handler, Message,
16 Response,
17};
18use ave_actors::{LightPersistence, PersistentActor};
19use ave_common::identity::{DigestIdentifier, PublicKey};
20use ave_common::{Namespace, SchemaType};
21use borsh::{BorshDeserialize, BorshSerialize};
22use serde::{Deserialize, Serialize};
23use tracing::{Span, debug, error, info_span, warn};
24
25use crate::db::Storable;
26
/// Persistent state of the witnesses register actor.
///
/// Holds per-subject ownership data plus the governance-version intervals
/// during which nodes act as witnesses, either for a schema or for a specific
/// creator.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Default,
    BorshDeserialize,
    BorshSerialize,
)]
pub struct WitnessesRegister {
    /// Governance sequence number.
    // NOTE(review): presumably written by `UpdateSnGov` and read by
    // `GetSnGov`; the handler is not visible in this part of the file.
    gov_sn: u64,
    /// Ownership/transfer data for each tracked subject, keyed by subject id.
    subjects: HashMap<DigestIdentifier, TransferData>,
    /// Schema witnesses, keyed by `(node, schema)`. Each entry maps a
    /// namespace to the intervals in which the node witnesses it.
    witnesses:
        HashMap<(PublicKey, SchemaType), HashMap<Namespace, IntervalData>>,
    /// Creator witnesses, keyed by `(creator, namespace, schema)`. Each entry
    /// maps a witness kind to the intervals in which it is registered.
    witnesses_creator: HashMap<
        (PublicKey, String, SchemaType),
        HashMap<WitnessesType, IntervalData>,
    >,
    /// Grant history per witness kind, using the same key as
    /// `witnesses_creator`.
    witnesses_creator_grants: HashMap<
        (PublicKey, String, SchemaType),
        HashMap<WitnessesType, CreatorWitnessGrantHistory>,
    >,
    /// Excluded from serde (de)serialization only; no Borsh skip attribute is
    /// present on this field.
    #[serde(skip)]
    ledger_batch_size: usize,
}
52
/// Witness activity expressed in governance versions: the set of already
/// closed intervals plus, when `Some`, the version at which the currently
/// open interval started.
type IntervalData = (IntervalSet, Option<u64>);
54
/// Outcome of one step of the witness search for a current owner.
pub enum ActualSearch {
    /// The search is finished with this limit.
    End(SnLimit),
    /// Nothing conclusive yet; carries the best governance version found so
    /// far (if any) so the caller can keep searching.
    Continue { gov_version: Option<u64> },
}
59
/// Kind of witness a creator can register.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    BorshDeserialize,
    BorshSerialize,
    Hash,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
)]
pub enum WitnessesType {
    /// A specific node, identified by its public key.
    User(PublicKey),
    /// The schema-level witnesses: lookups for this kind additionally
    /// consult the `witnesses` map of the register.
    Witnesses,
}
77
/// Level of access a creator grants to a witness.
///
/// When merged (`WitnessesRegister::merge_grant`), `Full` dominates, two
/// `Clear` sets are united and `Hash` is the weakest level. The derived
/// ordering follows declaration order (`Hash < Clear(_) < Full`).
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    BorshDeserialize,
    BorshSerialize,
    Hash,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
)]
pub enum CreatorWitnessGrant {
    /// Never allows clear delivery.
    Hash,
    /// Allows clear delivery when the event viewpoints are a subset of this
    /// set (or the event has no viewpoints).
    Clear(BTreeSet<String>),
    /// Always allows clear delivery.
    Full,
}
96
/// Witness configuration a creator registers for a governance version.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Default,
    BorshDeserialize,
    BorshSerialize,
)]
pub struct CreatorWitnessRegistration {
    /// Witness kinds that are active from the registration version on.
    pub witnesses: Vec<WitnessesType>,
    /// Grant assigned to each witness kind.
    pub grants: Vec<(WitnessesType, CreatorWitnessGrant)>,
}
110
/// A finished period of a grant history: the grant that was in force over a
/// closed interval of governance versions.
#[derive(
    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub struct CreatorWitnessGrantRange {
    /// Governance-version interval covered by `grant`.
    pub interval: Interval,
    /// Grant that applied during `interval`.
    pub grant: CreatorWitnessGrant,
}
118
/// Grant history of one witness kind for one creator registration key.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Default,
    BorshDeserialize,
    BorshSerialize,
)]
pub struct CreatorWitnessGrantHistory {
    /// Past grants, in the order in which they were closed.
    pub closed: Vec<CreatorWitnessGrantRange>,
    /// Governance version at which the current grant started, if one is open.
    pub current_from: Option<u64>,
    /// Grant currently in force, if any. `apply_version` sets and clears it
    /// together with `current_from`.
    pub current_grant: Option<CreatorWitnessGrant>,
}
133
/// How a tracker event is delivered to a requesting node, as decided by
/// `WitnessesRegister::event_delivery_mode`.
// NOTE(review): presumably `Clear` means readable content and `Opaque` means
// hidden content; confirm against the consumer of `TrackerDeliveryRange`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TrackerDeliveryMode {
    Clear,
    Opaque,
}
139
/// Run of consecutive sequence numbers that share one delivery mode.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrackerDeliveryRange {
    /// First sequence number of the run (inclusive).
    pub from_sn: u64,
    /// Last sequence number of the run (inclusive).
    pub to_sn: u64,
    /// Delivery mode shared by every sequence number in the run.
    pub mode: TrackerDeliveryMode,
}
146
/// Ownership and visibility data tracked for one subject.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Default,
    BorshDeserialize,
    BorshSerialize,
)]
pub struct TransferData {
    /// Current owner of the subject.
    actual_owner: PublicKey,
    /// Pending new owner together with the governance version attached to
    /// it, if any.
    actual_new_owner_data: Option<(PublicKey, u64)>,
    /// Sequence number used as the upper access bound for owners.
    sn: u64,
    /// Governance version from which `actual_owner` is considered owner when
    /// grants are resolved.
    gov_version: u64,
    /// Former owners and the data kept about their ownership period.
    old_owners: HashMap<PublicKey, OldOwnerData>,
    /// Stored/event visibility spans, queried per sequence number.
    visibility_state: TrackerVisibilityState,
}
164
/// Data kept about a former owner of a subject.
#[derive(
    Debug,
    Clone,
    Serialize,
    Deserialize,
    Default,
    BorshDeserialize,
    BorshSerialize,
)]
pub struct OldOwnerData {
    /// Highest sequence number the former owner still has access to.
    sn: u64,
    /// Governance-version intervals associated with the ownership period.
    interval_gov_version: IntervalSet,
}
178
/// Messages accepted by the witnesses register.
///
/// The variants `UpdateCreatorsWitnessesFact` through `Reject` (except the
/// queries) have a same-named counterpart in `WitnessesRegisterEvent`.
/// `PurgeStorage` and the query variants (`GetSnGov`, `GetTrackerSnOwner`,
/// `ListCurrentWitnessSubjects`, `Access`, `GetTrackerVisibilityState`,
/// `GetTrackerWindow`) have none.
#[derive(Debug, Clone)]
pub enum WitnessesRegisterMessage {
    PurgeStorage,
    /// Query for the governance sequence number.
    GetSnGov,
    /// Query for owner/sn data of a tracker subject.
    GetTrackerSnOwner {
        subject_id: DigestIdentifier,
    },
    /// Paged query; `after_subject_id` is the cursor and `limit` the page
    /// size.
    // NOTE(review): exact paging semantics live in the handler, which is not
    // visible here.
    ListCurrentWitnessSubjects {
        node: PublicKey,
        governance_version: u64,
        after_subject_id: Option<DigestIdentifier>,
        limit: usize,
    },
    /// Creator/witness changes introduced at governance `version`.
    UpdateCreatorsWitnessesFact {
        version: u64,
        new_creator: HashMap<
            (SchemaType, String, PublicKey),
            CreatorWitnessRegistration,
        >,
        remove_creator: HashSet<(SchemaType, String, PublicKey)>,
        update_creator_witnesses: HashMap<
            (SchemaType, String, PublicKey),
            CreatorWitnessRegistration,
        >,

        new_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
        remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
    },
    /// Creator/witness removals introduced at governance `version`.
    UpdateCreatorsWitnessesConfirm {
        version: u64,
        remove_creator: HashSet<(SchemaType, String, PublicKey)>,
        remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
    },
    UpdateSn {
        subject_id: DigestIdentifier,
        sn: u64,
    },
    UpdateSnGov {
        sn: u64,
    },
    Create {
        subject_id: DigestIdentifier,
        owner: PublicKey,
        gov_version: u64,
    },
    Transfer {
        subject_id: DigestIdentifier,
        new_owner: PublicKey,
        gov_version: u64,
    },
    Confirm {
        subject_id: DigestIdentifier,
        sn: u64,
        gov_version: u64,
    },
    DeleteSubject {
        subject_id: DigestIdentifier,
    },
    UpdateTrackerVisibility {
        subject_id: DigestIdentifier,
        sn: u64,
        mode: TrackerVisibilityMode,
        stored_visibility: TrackerStoredVisibility,
        event_visibility: TrackerEventVisibility,
    },
    Reject {
        subject_id: DigestIdentifier,
        sn: u64,
        gov_version: u64,
    },
    /// Query for the access limit of `node` on a subject.
    Access {
        subject_id: DigestIdentifier,
        node: PublicKey,
        namespace: String,
        schema_id: SchemaType,
    },
    GetTrackerVisibilityState {
        subject_id: DigestIdentifier,
    },
    /// Query for the delivery window of `node`; `actual_sn` is the last
    /// sequence number the requester already has, if any.
    GetTrackerWindow {
        subject_id: DigestIdentifier,
        node: PublicKey,
        namespace: String,
        schema_id: SchemaType,
        actual_sn: Option<u64>,
    },
}
266
267impl Message for WitnessesRegisterMessage {
268 fn is_critical(&self) -> bool {
269 matches!(
270 self,
271 Self::PurgeStorage
272 | Self::UpdateCreatorsWitnessesFact { .. }
273 | Self::UpdateCreatorsWitnessesConfirm { .. }
274 | Self::UpdateSn { .. }
275 | Self::UpdateSnGov { .. }
276 | Self::Create { .. }
277 | Self::Transfer { .. }
278 | Self::Confirm { .. }
279 | Self::UpdateTrackerVisibility { .. }
280 | Self::DeleteSubject { .. }
281 | Self::Reject { .. }
282 )
283 }
284}
285
/// Events of the witnesses register.
///
/// Each variant mirrors the same-named `WitnessesRegisterMessage` variant and
/// carries the same fields. The enum is Borsh-serializable, so variant and
/// field order are part of the encoded format.
#[derive(
    Debug, Clone, Serialize, Deserialize, BorshDeserialize, BorshSerialize,
)]
pub enum WitnessesRegisterEvent {
    /// Creator/witness changes introduced at governance `version`.
    UpdateCreatorsWitnessesFact {
        version: u64,
        new_creator: HashMap<
            (SchemaType, String, PublicKey),
            CreatorWitnessRegistration,
        >,
        remove_creator: HashSet<(SchemaType, String, PublicKey)>,
        update_creator_witnesses: HashMap<
            (SchemaType, String, PublicKey),
            CreatorWitnessRegistration,
        >,

        new_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
        remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
    },
    /// Creator/witness removals introduced at governance `version`.
    UpdateCreatorsWitnessesConfirm {
        version: u64,
        remove_creator: HashSet<(SchemaType, String, PublicKey)>,
        remove_witnesses: HashMap<(SchemaType, PublicKey), Vec<Namespace>>,
    },
    UpdateSn {
        subject_id: DigestIdentifier,
        sn: u64,
    },
    UpdateSnGov {
        sn: u64,
    },
    Create {
        subject_id: DigestIdentifier,
        owner: PublicKey,
        gov_version: u64,
    },
    Transfer {
        subject_id: DigestIdentifier,
        new_owner: PublicKey,
        gov_version: u64,
    },
    Confirm {
        subject_id: DigestIdentifier,
        sn: u64,
        gov_version: u64,
    },
    DeleteSubject {
        subject_id: DigestIdentifier,
    },
    UpdateTrackerVisibility {
        subject_id: DigestIdentifier,
        sn: u64,
        mode: TrackerVisibilityMode,
        stored_visibility: TrackerStoredVisibility,
        event_visibility: TrackerEventVisibility,
    },
    Reject {
        subject_id: DigestIdentifier,
        sn: u64,
        gov_version: u64,
    },
}
348
// Marker implementation: no trait items are overridden.
impl Event for WitnessesRegisterEvent {}
350
/// Replies produced by the witnesses register.
pub enum WitnessesRegisterResponse {
    /// Access limit for a node; `None` when the node has no access.
    Access {
        sn: Option<u64>,
    },
    /// Governance sequence number.
    GovSn {
        sn: u64,
    },
    /// Owner and sequence number of a tracker subject, when known.
    TrackerOwnerSn {
        data: Option<(PublicKey, u64)>,
    },
    /// One page of subjects; `next_cursor` is `Some` when more can be
    /// requested.
    // NOTE(review): paging semantics are inferred from the field names; the
    // producing handler is not visible here.
    CurrentWitnessSubjects {
        governance_version: u64,
        items: Vec<CurrentWitnessSubject>,
        next_cursor: Option<DigestIdentifier>,
    },
    TrackerVisibilityState {
        state: TrackerVisibilityState,
    },
    /// Delivery window for a node.
    // NOTE(review): the fields line up with the tuple returned by
    // `build_tracker_window` (access limit, last contiguous clear sn, flag,
    // ranges); confirm in the handler.
    TrackerWindow {
        sn: Option<u64>,
        clear_sn: Option<u64>,
        is_all: bool,
        ranges: Vec<TrackerDeliveryRange>,
    },
    /// Acknowledgement without payload.
    Ok,
}
377
// Marker implementation: no trait items are overridden.
impl Response for WitnessesRegisterResponse {}
379
/// Item of a `WitnessesRegisterResponse::CurrentWitnessSubjects` page.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CurrentWitnessSubject {
    /// Subject the entry refers to.
    pub subject_id: DigestIdentifier,
    /// Sequence number associated with the subject for the requester.
    // NOTE(review): presumably the sn the witness should reach; confirm in
    // the `ListCurrentWitnessSubjects` handler.
    pub target_sn: u64,
}
385
386impl CreatorWitnessGrantHistory {
387 fn apply_version(
388 &mut self,
389 version: u64,
390 grant: Option<CreatorWitnessGrant>,
391 ) {
392 match (&self.current_from, &self.current_grant, &grant) {
393 (Some(current_from), Some(current_grant), Some(next_grant))
394 if current_grant != next_grant =>
395 {
396 self.closed.push(CreatorWitnessGrantRange {
397 interval: Interval::new(*current_from, version - 1),
398 grant: current_grant.clone(),
399 });
400 self.current_from = Some(version);
401 self.current_grant = Some(next_grant.clone());
402 }
403 (Some(_), Some(_), Some(_)) => {}
404 (Some(current_from), Some(current_grant), None) => {
405 self.closed.push(CreatorWitnessGrantRange {
406 interval: Interval::new(*current_from, version - 1),
407 grant: current_grant.clone(),
408 });
409 self.current_from = None;
410 self.current_grant = None;
411 }
412 (None, None, Some(next_grant)) => {
413 self.current_from = Some(version);
414 self.current_grant = Some(next_grant.clone());
415 }
416 _ => {}
417 }
418 }
419}
420
421impl WitnessesRegister {
422 fn merge_grant(
423 actual: Option<CreatorWitnessGrant>,
424 next: &CreatorWitnessGrant,
425 ) -> CreatorWitnessGrant {
426 match (actual, next) {
427 (Some(CreatorWitnessGrant::Full), ..)
428 | (_, CreatorWitnessGrant::Full) => CreatorWitnessGrant::Full,
429 (
430 Some(CreatorWitnessGrant::Clear(mut left)),
431 CreatorWitnessGrant::Clear(right),
432 ) => {
433 left.extend(right.iter().cloned());
434 CreatorWitnessGrant::Clear(left)
435 }
436 (
437 Some(CreatorWitnessGrant::Clear(left)),
438 CreatorWitnessGrant::Hash,
439 ) => CreatorWitnessGrant::Clear(left),
440 (
441 Some(CreatorWitnessGrant::Hash),
442 CreatorWitnessGrant::Clear(right),
443 ) => CreatorWitnessGrant::Clear(right.clone()),
444 (Some(CreatorWitnessGrant::Hash), CreatorWitnessGrant::Hash)
445 | (None, CreatorWitnessGrant::Hash) => CreatorWitnessGrant::Hash,
446 (None, CreatorWitnessGrant::Clear(right)) => {
447 CreatorWitnessGrant::Clear(right.clone())
448 }
449 }
450 }
451
452 fn interval_overlaps_owner(
453 current_from: Option<u64>,
454 intervals: &IntervalSet,
455 owner_lo: u64,
456 owner_hi: u64,
457 ) -> bool {
458 current_from.is_some_and(|from| from <= owner_hi)
459 || intervals.max_covered_in(owner_lo, owner_hi).is_some()
460 }
461
462 fn covered_old_owner_intervals(
463 current_from: Option<u64>,
464 intervals: &IntervalSet,
465 old_owner: &OldOwnerData,
466 ) -> IntervalSet {
467 let mut covered = IntervalSet::new();
468
469 for owner_range in old_owner.interval_gov_version.iter() {
470 if let Some(from) = current_from {
471 let lo = from.max(owner_range.lo);
472 if lo <= owner_range.hi {
473 covered.insert(Interval {
474 lo,
475 hi: owner_range.hi,
476 });
477 }
478 }
479
480 for witness_range in intervals.iter() {
481 let lo = witness_range.lo.max(owner_range.lo);
482 let hi = witness_range.hi.min(owner_range.hi);
483
484 if lo <= hi {
485 covered.insert(Interval { lo, hi });
486 }
487 }
488 }
489
490 covered
491 }
492
493 fn max_covered_old_owner_gov_version(
494 current_from: Option<u64>,
495 intervals: &IntervalSet,
496 old_owner: &OldOwnerData,
497 ) -> Option<u64> {
498 Self::covered_old_owner_intervals(current_from, intervals, old_owner)
499 .iter()
500 .last()
501 .map(|range| range.hi)
502 }
503
504 fn grant_for_owner_interval(
505 history: &CreatorWitnessGrantHistory,
506 owner_lo: u64,
507 owner_hi: u64,
508 ) -> Option<&CreatorWitnessGrant> {
509 if let (Some(current_from), Some(current_grant)) =
510 (history.current_from, history.current_grant.as_ref())
511 && current_from <= owner_hi
512 {
513 return Some(current_grant);
514 }
515
516 history
517 .closed
518 .iter()
519 .rev()
520 .find(|range| {
521 range.interval.lo <= owner_hi && range.interval.hi >= owner_lo
522 })
523 .map(|range| &range.grant)
524 }
525
526 fn schema_witness_covers_owner_interval(
527 &self,
528 node: &PublicKey,
529 schema_id: &SchemaType,
530 namespace: &Namespace,
531 owner_lo: u64,
532 owner_hi: u64,
533 ) -> bool {
534 let matches = |witness_data: &HashMap<Namespace, IntervalData>| {
535 witness_data.iter().any(
536 |(current_namespace, (intervals, current_from))| {
537 current_namespace.is_ancestor_or_equal_of(namespace)
538 && Self::interval_overlaps_owner(
539 *current_from,
540 intervals,
541 owner_lo,
542 owner_hi,
543 )
544 },
545 )
546 };
547
548 self.witnesses
549 .get(&(node.clone(), schema_id.clone()))
550 .is_some_and(matches)
551 || self
552 .witnesses
553 .get(&(node.clone(), SchemaType::TrackerSchemas))
554 .is_some_and(matches)
555 }
556
557 fn creator_grant_for_owner_interval(
558 &self,
559 node: &PublicKey,
560 creator: &PublicKey,
561 schema_id: &SchemaType,
562 namespace: &Namespace,
563 owner_lo: u64,
564 owner_hi: u64,
565 ) -> Option<CreatorWitnessGrant> {
566 let grants = self.witnesses_creator_grants.get(&(
567 creator.clone(),
568 namespace.to_string(),
569 schema_id.clone(),
570 ))?;
571 let intervals = self.witnesses_creator.get(&(
572 creator.clone(),
573 namespace.to_string(),
574 schema_id.clone(),
575 ))?;
576
577 let mut out = None;
578
579 if let (Some(history), Some((creator_intervals, creator_current_from))) = (
580 grants.get(&WitnessesType::User(node.clone())),
581 intervals.get(&WitnessesType::User(node.clone())),
582 ) && Self::interval_overlaps_owner(
583 *creator_current_from,
584 creator_intervals,
585 owner_lo,
586 owner_hi,
587 ) && let Some(grant) =
588 Self::grant_for_owner_interval(history, owner_lo, owner_hi)
589 {
590 out = Some(Self::merge_grant(out, grant));
591 }
592
593 if let (Some(history), Some((creator_intervals, creator_current_from))) = (
594 grants.get(&WitnessesType::Witnesses),
595 intervals.get(&WitnessesType::Witnesses),
596 ) && Self::interval_overlaps_owner(
597 *creator_current_from,
598 creator_intervals,
599 owner_lo,
600 owner_hi,
601 ) && self.schema_witness_covers_owner_interval(
602 node, schema_id, namespace, owner_lo, owner_hi,
603 ) && let Some(grant) =
604 Self::grant_for_owner_interval(history, owner_lo, owner_hi)
605 {
606 out = Some(Self::merge_grant(out, grant));
607 }
608
609 out
610 }
611
612 fn creator_grant_for_event_or_current_owner(
613 &self,
614 node: &PublicKey,
615 creator: &PublicKey,
616 schema_id: &SchemaType,
617 namespace: &Namespace,
618 event_gov_version: u64,
619 owner_from_gov_version: u64,
620 ) -> Option<CreatorWitnessGrant> {
621 self.creator_grant_for_owner_interval(
622 node,
623 creator,
624 schema_id,
625 namespace,
626 event_gov_version,
627 event_gov_version,
628 )
629 .or_else(|| {
630 self.creator_grant_for_owner_interval(
631 node,
632 creator,
633 schema_id,
634 namespace,
635 owner_from_gov_version,
636 u64::MAX,
637 )
638 })
639 }
640
641 fn grant_allows_clear(
642 grant: Option<CreatorWitnessGrant>,
643 viewpoints: &BTreeSet<String>,
644 ) -> bool {
645 match grant {
646 Some(CreatorWitnessGrant::Full) => true,
647 Some(CreatorWitnessGrant::Clear(allowed)) => {
648 viewpoints.is_empty() || viewpoints.is_subset(&allowed)
649 }
650 Some(CreatorWitnessGrant::Hash) | None => false,
651 }
652 }
653
654 async fn get_gov_version_window(
655 &self,
656 ctx: &ActorContext<Self>,
657 subject_id: &DigestIdentifier,
658 from_sn: u64,
659 to_sn: u64,
660 ) -> Result<Vec<(Interval, u64)>, ActorError> {
661 let governance_id = ctx.path().parent().key();
662 let path = ActorPath::from(format!(
663 "/user/node/subject_manager/{}/sn_register",
664 governance_id
665 ));
666 let sn_register = ctx.system().get_actor::<SnRegister>(&path).await?;
667 let response = sn_register
668 .ask(SnRegisterMessage::GetGovVersionWindow {
669 subject_id: subject_id.clone(),
670 from_sn,
671 to_sn,
672 })
673 .await?;
674
675 match response {
676 SnRegisterResponse::GovVersionWindow(ranges) => Ok(ranges
677 .into_iter()
678 .map(|range| {
679 (
680 Interval::new(range.from_sn, range.to_sn),
681 range.gov_version,
682 )
683 })
684 .collect()),
685 _ => Err(ActorError::UnexpectedResponse {
686 path,
687 expected: "SnRegisterResponse::GovVersionWindow".to_owned(),
688 }),
689 }
690 }
691
692 fn gov_version_for_sn(
693 gov_versions: &[(Interval, u64)],
694 sn: u64,
695 ) -> Option<u64> {
696 gov_versions
697 .iter()
698 .find(|(interval, _)| interval.contains(sn))
699 .map(|(_, gov_version)| *gov_version)
700 }
701
702 fn event_delivery_mode(
703 &self,
704 data: &TransferData,
705 node: &PublicKey,
706 namespace: &Namespace,
707 schema_id: &SchemaType,
708 sn: u64,
709 gov_version: u64,
710 ) -> TrackerDeliveryMode {
711 let stored_span = data.visibility_state.iter_stored(sn, sn).next();
712 let event_span = data.visibility_state.iter_events(sn, sn).next();
713
714 let Some(stored_span) = stored_span else {
715 return TrackerDeliveryMode::Opaque;
716 };
717 let Some(event_span) = event_span else {
718 return TrackerDeliveryMode::Opaque;
719 };
720
721 match event_span.visibility {
722 TrackerEventVisibility::NonFact => TrackerDeliveryMode::Clear,
723 TrackerEventVisibility::Fact(viewpoints) => {
724 if viewpoints.is_empty() {
725 return TrackerDeliveryMode::Clear;
726 }
727
728 if data.actual_owner == *node
729 || data
730 .actual_new_owner_data
731 .as_ref()
732 .is_some_and(|(new_owner, _)| new_owner == node)
733 {
734 return TrackerDeliveryMode::Clear;
735 }
736
737 if let Some(old_owner) = data.old_owners.get(node)
738 && sn <= old_owner.sn
739 {
740 return TrackerDeliveryMode::Clear;
741 }
742
743 if matches!(
744 stored_span.visibility,
745 TrackerStoredVisibility::None
746 ) {
747 return TrackerDeliveryMode::Opaque;
748 }
749
750 let mut grant = None;
751
752 if gov_version >= data.gov_version {
753 grant = Some(Self::merge_grant(
754 grant,
755 &self
756 .creator_grant_for_event_or_current_owner(
757 node,
758 &data.actual_owner,
759 schema_id,
760 namespace,
761 gov_version,
762 data.gov_version,
763 )
764 .unwrap_or(CreatorWitnessGrant::Hash),
765 ));
766 }
767
768 if let Some((new_owner, new_owner_gov_version)) =
769 &data.actual_new_owner_data
770 && gov_version >= *new_owner_gov_version
771 {
772 grant = Some(Self::merge_grant(
773 grant,
774 &self
775 .creator_grant_for_event_or_current_owner(
776 node,
777 new_owner,
778 schema_id,
779 namespace,
780 gov_version,
781 *new_owner_gov_version,
782 )
783 .unwrap_or(CreatorWitnessGrant::Hash),
784 ));
785 }
786
787 for (creator, old_owner) in &data.old_owners {
788 if sn > old_owner.sn {
789 continue;
790 }
791
792 for range in old_owner.interval_gov_version.iter().rev() {
793 if !range.contains(gov_version) {
794 continue;
795 }
796
797 grant = Some(Self::merge_grant(
798 grant,
799 &self
800 .creator_grant_for_owner_interval(
801 node,
802 creator,
803 schema_id,
804 namespace,
805 gov_version,
806 gov_version,
807 )
808 .unwrap_or(CreatorWitnessGrant::Hash),
809 ));
810 break;
811 }
812 }
813
814 if Self::grant_allows_clear(grant, viewpoints) {
815 TrackerDeliveryMode::Clear
816 } else {
817 TrackerDeliveryMode::Opaque
818 }
819 }
820 }
821 }
822
823 async fn build_tracker_window(
824 &self,
825 ctx: &ActorContext<Self>,
826 subject_id: &DigestIdentifier,
827 node: &PublicKey,
828 namespace: String,
829 schema_id: SchemaType,
830 actual_sn: Option<u64>,
831 ) -> Result<
832 (Option<u64>, Option<u64>, bool, Vec<TrackerDeliveryRange>),
833 ActorError,
834 > {
835 let Some(data) = self.subjects.get(subject_id) else {
836 return Ok((None, None, true, Vec::new()));
837 };
838
839 let access_limit = match self
840 .search_witnesses(
841 ctx,
842 node,
843 data,
844 namespace.clone(),
845 schema_id.clone(),
846 subject_id.clone(),
847 )
848 .await?
849 {
850 SnLimit::Sn(sn) => Some(sn),
851 SnLimit::LastSn => Some(data.sn),
852 SnLimit::NotSn => {
853 if data.actual_owner == *node
854 || data
855 .actual_new_owner_data
856 .as_ref()
857 .is_some_and(|(new_owner, _)| new_owner == node)
858 {
859 Some(data.sn)
860 } else {
861 data.old_owners.get(node).map(|old_owner| old_owner.sn)
862 }
863 }
864 };
865
866 let Some(access_limit) = access_limit else {
867 return Ok((None, None, true, Vec::new()));
868 };
869
870 let from_sn = actual_sn.map_or(0, |sn| sn.saturating_add(1));
871 if from_sn > access_limit {
872 return Ok((None, None, true, Vec::new()));
873 }
874
875 let namespace = Namespace::from(namespace);
876 let gov_versions = self
877 .get_gov_version_window(ctx, subject_id, from_sn, access_limit)
878 .await?;
879
880 let mut ranges: Vec<TrackerDeliveryRange> = Vec::new();
881 let mut clear_sn = None;
882
883 for sn in from_sn..=access_limit {
884 let Some(gov_version) = Self::gov_version_for_sn(&gov_versions, sn)
885 .or_else(|| (sn == 0).then_some(data.gov_version))
886 else {
887 continue;
888 };
889
890 let mode = self.event_delivery_mode(
891 data,
892 node,
893 &namespace,
894 &schema_id,
895 sn,
896 gov_version,
897 );
898
899 match ranges.last_mut() {
900 Some(last)
901 if std::mem::discriminant(&last.mode)
902 == std::mem::discriminant(&mode)
903 && last.to_sn + 1 == sn =>
904 {
905 last.to_sn = sn;
906 }
907 _ => ranges.push(TrackerDeliveryRange {
908 from_sn: sn,
909 to_sn: sn,
910 mode: mode.clone(),
911 }),
912 }
913
914 if matches!(mode, TrackerDeliveryMode::Clear)
915 && ((clear_sn.is_none()
916 && matches!(
917 ranges.first().map(|x| &x.mode),
918 Some(TrackerDeliveryMode::Clear)
919 ))
920 || clear_sn == Some(sn.saturating_sub(1)))
921 {
922 clear_sn = Some(sn);
923 }
924 }
925
926 Ok((Some(access_limit), clear_sn, true, ranges))
927 }
928
929 async fn access_limit_for_node(
930 &self,
931 ctx: &ActorContext<Self>,
932 subject_id: &DigestIdentifier,
933 node: &PublicKey,
934 namespace: &str,
935 schema_id: &SchemaType,
936 ) -> Result<Option<u64>, ActorError> {
937 let Some(data) = self.subjects.get(subject_id) else {
938 return Ok(None);
939 };
940
941 let sn = if data.actual_owner == *node {
942 Some(data.sn)
943 } else if let Some((new_owner, ..)) = &data.actual_new_owner_data
944 && new_owner == node
945 {
946 Some(data.sn)
947 } else if let Some(old_data) = data.old_owners.get(node) {
948 let sn_limit = self
949 .search_witnesses(
950 ctx,
951 node,
952 data,
953 namespace.to_owned(),
954 schema_id.clone(),
955 subject_id.clone(),
956 )
957 .await?;
958
959 let sn = match sn_limit {
960 SnLimit::Sn(sn) => sn.max(old_data.sn),
961 SnLimit::LastSn => unreachable!(
962 "search_witnesses can not return SnLimit::LastSn"
963 ),
964 SnLimit::NotSn => old_data.sn,
965 };
966
967 Some(sn)
968 } else {
969 let sn_limit = self
970 .search_witnesses(
971 ctx,
972 node,
973 data,
974 namespace.to_owned(),
975 schema_id.clone(),
976 subject_id.clone(),
977 )
978 .await?;
979
980 match sn_limit {
981 SnLimit::Sn(sn) => Some(sn),
982 SnLimit::LastSn => unreachable!(
983 "search_witnesses can not return SnLimit::LastSn"
984 ),
985 SnLimit::NotSn => None,
986 }
987 };
988
989 Ok(sn)
990 }
991
992 fn close_creator_registration(
993 &mut self,
994 schema_id: &SchemaType,
995 namespace: &str,
996 creator: &PublicKey,
997 version: u64,
998 ) {
999 if let Some(witnesses) = self.witnesses_creator.get_mut(&(
1000 creator.clone(),
1001 namespace.to_owned(),
1002 schema_id.clone(),
1003 )) {
1004 for (.., (interval, last)) in witnesses.iter_mut() {
1005 if let Some(last) = last.take() {
1006 interval.insert(Interval {
1007 lo: last,
1008 hi: version - 1,
1009 });
1010 }
1011 }
1012 }
1013
1014 if let Some(grants) = self.witnesses_creator_grants.get_mut(&(
1015 creator.clone(),
1016 namespace.to_owned(),
1017 schema_id.clone(),
1018 )) {
1019 for history in grants.values_mut() {
1020 history.apply_version(version, None);
1021 }
1022 }
1023 }
1024
1025 fn apply_creator_registration(
1026 &mut self,
1027 schema_id: &SchemaType,
1028 namespace: &str,
1029 creator: &PublicKey,
1030 registration: &CreatorWitnessRegistration,
1031 version: u64,
1032 ) {
1033 let creator_entry = self
1034 .witnesses_creator
1035 .entry((creator.clone(), namespace.to_owned(), schema_id.clone()))
1036 .or_default();
1037
1038 let witnesses: HashSet<_> =
1039 registration.witnesses.iter().cloned().collect();
1040 for (witness_type, (interval, last)) in creator_entry.iter_mut() {
1041 if !witnesses.contains(witness_type)
1042 && let Some(lo) = last.take()
1043 {
1044 interval.insert(Interval {
1045 lo,
1046 hi: version - 1,
1047 });
1048 }
1049 }
1050
1051 for witness in ®istration.witnesses {
1052 if let Some((.., last)) = creator_entry.get_mut(witness) {
1053 if last.is_none() {
1054 *last = Some(version);
1055 }
1056 } else {
1057 creator_entry.insert(
1058 witness.clone(),
1059 (IntervalSet::new(), Some(version)),
1060 );
1061 }
1062 }
1063
1064 let creator_grants = self
1065 .witnesses_creator_grants
1066 .entry((creator.clone(), namespace.to_owned(), schema_id.clone()))
1067 .or_default();
1068
1069 let grant_map: HashMap<_, _> =
1070 registration.grants.iter().cloned().collect();
1071
1072 for (witness_type, history) in creator_grants.iter_mut() {
1073 history
1074 .apply_version(version, grant_map.get(witness_type).cloned());
1075 }
1076
1077 for (witness_type, grant) in grant_map {
1078 creator_grants
1079 .entry(witness_type)
1080 .or_default()
1081 .apply_version(version, Some(grant));
1082 }
1083 }
1084
1085 async fn get_sn(
1086 &self,
1087 ctx: &ActorContext<Self>,
1088 subject_id: DigestIdentifier,
1089 gov_version: u64,
1090 ) -> Result<SnLimit, ActorError> {
1091 let governance_id = ctx.path().parent().key();
1092
1093 let path = ActorPath::from(format!(
1094 "/user/node/subject_manager/{}/sn_register",
1095 governance_id
1096 ));
1097 let sn_register = ctx.system().get_actor::<SnRegister>(&path).await?;
1098 let response = sn_register
1099 .ask(SnRegisterMessage::GetSn {
1100 subject_id,
1101 gov_version,
1102 })
1103 .await?;
1104
1105 match response {
1106 SnRegisterResponse::Sn(sn_limit) => Ok(sn_limit),
1107 _ => Err(ActorError::UnexpectedResponse {
1108 path,
1109 expected: "SnRegisterResponse::Sn".to_owned(),
1110 }),
1111 }
1112 }
1113
1114 fn search_in_schema(
1115 witness_data: &HashMap<Namespace, (IntervalSet, Option<u64>)>,
1116 parse_namespace: &Namespace,
1117 data: &OldOwnerData,
1118 mut better_gov_version: Option<u64>,
1119 ) -> Option<u64> {
1120 for (namespace, (interval, actual_lo)) in witness_data.iter() {
1121 if !namespace.is_ancestor_or_equal_of(parse_namespace) {
1122 continue;
1123 }
1124
1125 if let Some(gov_version) = Self::max_covered_old_owner_gov_version(
1126 *actual_lo, interval, data,
1127 ) {
1128 better_gov_version = better_gov_version.max(Some(gov_version));
1129 }
1130 }
1131
1132 better_gov_version
1133 }
1134
1135 async fn search_in_schema_actual(
1136 witness_data: &HashMap<Namespace, (IntervalSet, Option<u64>)>,
1137 parse_namespace: &Namespace,
1138 gov_version: u64,
1139 sn: u64,
1140 mut better_gov_version: Option<u64>,
1141 ) -> ActualSearch {
1142 for (namespace, (interval, actual_lo)) in witness_data.iter() {
1143 if namespace.is_ancestor_or_equal_of(parse_namespace) {
1144 if actual_lo.is_some() {
1146 return ActualSearch::End(SnLimit::Sn(sn));
1147 }
1148
1149 if let Some(range) = interval.iter().last()
1150 && gov_version <= range.hi
1151 {
1152 better_gov_version = better_gov_version.max(Some(range.hi));
1154 }
1155 }
1156 }
1157
1158 ActualSearch::Continue {
1159 gov_version: better_gov_version,
1160 }
1161 }
1162
1163 async fn search_schemas_actual(
1165 &self,
1166 node: &PublicKey,
1167 schema_id: &SchemaType,
1168 parse_namespace: &Namespace,
1169 gov_version: u64,
1170 sn: u64,
1171 better_gov_version: Option<u64>,
1172 ) -> ActualSearch {
1173 let better_gov_version = if let Some(witness_data) =
1175 self.witnesses.get(&(node.clone(), schema_id.clone()))
1176 {
1177 match Self::search_in_schema_actual(
1178 witness_data,
1179 parse_namespace,
1180 gov_version,
1181 sn,
1182 better_gov_version,
1183 )
1184 .await
1185 {
1186 ActualSearch::End(sn_limit) => {
1187 return ActualSearch::End(sn_limit);
1188 }
1189 ActualSearch::Continue { gov_version } => gov_version,
1190 }
1191 } else {
1192 better_gov_version
1193 };
1194
1195 if let Some(witness_data) = self
1197 .witnesses
1198 .get(&(node.clone(), SchemaType::TrackerSchemas))
1199 {
1200 return Self::search_in_schema_actual(
1201 witness_data,
1202 parse_namespace,
1203 gov_version,
1204 sn,
1205 better_gov_version,
1206 )
1207 .await;
1208 }
1209
1210 ActualSearch::Continue {
1211 gov_version: better_gov_version,
1212 }
1213 }
1214
1215 fn search_schemas_old(
1217 &self,
1218 node: &PublicKey,
1219 schema_id: &SchemaType,
1220 parse_namespace: &Namespace,
1221 data: &OldOwnerData,
1222 better_gov_version: Option<u64>,
1223 ) -> Option<u64> {
1224 let better_gov_version = self
1226 .witnesses
1227 .get(&(node.clone(), schema_id.clone()))
1228 .map_or(better_gov_version, |witness_data| {
1229 Self::search_in_schema(
1230 witness_data,
1231 parse_namespace,
1232 data,
1233 better_gov_version,
1234 )
1235 });
1236
1237 if let Some(witness_data) = self
1239 .witnesses
1240 .get(&(node.clone(), SchemaType::TrackerSchemas))
1241 {
1242 return Self::search_in_schema(
1243 witness_data,
1244 parse_namespace,
1245 data,
1246 better_gov_version,
1247 );
1248 }
1249
1250 better_gov_version
1251 }
1252
1253 async fn check_current_owner(
1255 &self,
1256 witnesses_creator: &HashMap<WitnessesType, (IntervalSet, Option<u64>)>,
1257 node: &PublicKey,
1258 schema_id: &SchemaType,
1259 parse_namespace: &Namespace,
1260 sn: u64,
1261 owner_better_gov_version: (u64, Option<u64>),
1262 ) -> ActualSearch {
1263 let (owner_gov_version, mut better_gov_version) =
1264 owner_better_gov_version;
1265
1266 if let Some((interval, actual_lo)) =
1268 witnesses_creator.get(&WitnessesType::User(node.clone()))
1269 {
1270 if actual_lo.is_some() {
1272 return ActualSearch::End(SnLimit::Sn(sn));
1273 }
1274 if let Some(range) = interval.iter().last()
1277 && owner_gov_version <= range.hi
1278 {
1279 better_gov_version = better_gov_version.max(Some(range.hi));
1281 }
1282 }
1283
1284 if let Some((interval, actual_lo)) =
1289 witnesses_creator.get(&WitnessesType::Witnesses)
1290 {
1291 let witnesses_active = actual_lo.is_some()
1292 || interval
1293 .iter()
1294 .last()
1295 .is_some_and(|range| owner_gov_version <= range.hi);
1296
1297 if witnesses_active {
1298 return self
1299 .search_schemas_actual(
1300 node,
1301 schema_id,
1302 parse_namespace,
1303 owner_gov_version,
1304 sn,
1305 better_gov_version,
1306 )
1307 .await;
1308 }
1309 }
1310
1311 ActualSearch::Continue {
1312 gov_version: better_gov_version,
1313 }
1314 }
1315
1316 async fn search_witnesses(
1317 &self,
1318 ctx: &ActorContext<Self>,
1319 node: &PublicKey,
1320 data: &TransferData,
1321 namespace: String,
1322 schema_id: SchemaType,
1323 subject_id: DigestIdentifier,
1324 ) -> Result<SnLimit, ActorError> {
1325 let mut better_gov_version: Option<u64> = None;
1326 let mut better_sn: Option<u64> = None;
1327 let parse_namespace = Namespace::from(namespace.clone());
1328
1329 if let Some(witnesses_creator) = self.witnesses_creator.get(&(
1331 data.actual_owner.to_owned(),
1332 namespace.clone(),
1333 schema_id.clone(),
1334 )) {
1335 match self
1336 .check_current_owner(
1337 witnesses_creator,
1338 node,
1339 &schema_id,
1340 &parse_namespace,
1341 data.sn,
1342 (data.gov_version, better_gov_version),
1343 )
1344 .await
1345 {
1346 ActualSearch::End(sn_limit) => return Ok(sn_limit),
1347 ActualSearch::Continue { gov_version } => {
1348 better_gov_version = gov_version;
1349 }
1350 }
1351 }
1352
1353 if let Some((new_owner, new_owner_gov_version)) =
1354 &data.actual_new_owner_data
1355 && let Some(witnesses_creator) = self.witnesses_creator.get(&(
1356 new_owner.to_owned(),
1357 namespace.clone(),
1358 schema_id.clone(),
1359 ))
1360 {
1361 match self
1362 .check_current_owner(
1363 witnesses_creator,
1364 node,
1365 &schema_id,
1366 &parse_namespace,
1367 data.sn,
1368 (*new_owner_gov_version, better_gov_version),
1369 )
1370 .await
1371 {
1372 ActualSearch::End(sn_limit) => return Ok(sn_limit),
1373 ActualSearch::Continue { gov_version } => {
1374 better_gov_version = gov_version;
1375 }
1376 }
1377 }
1378
1379 for (creator, old_data) in data.old_owners.iter() {
1381 if let Some(witnesses_creator) = self.witnesses_creator.get(&(
1382 creator.to_owned(),
1383 namespace.clone(),
1384 schema_id.clone(),
1385 )) {
1386 if let Some((interval, actual_lo)) =
1387 witnesses_creator.get(&WitnessesType::User(node.clone()))
1388 && let Some(gov_version) =
1389 Self::max_covered_old_owner_gov_version(
1390 *actual_lo, interval, old_data,
1391 )
1392 {
1393 match self
1394 .get_sn(ctx, subject_id.clone(), gov_version)
1395 .await?
1396 {
1397 SnLimit::Sn(sn) => {
1398 better_sn =
1399 better_sn.max(Some(sn.min(old_data.sn)));
1400 }
1401 SnLimit::LastSn => {
1402 better_sn = better_sn.max(Some(old_data.sn));
1403 }
1404 SnLimit::NotSn => {}
1405 }
1406 }
1407
1408 if let Some((interval, actual_lo)) =
1410 witnesses_creator.get(&WitnessesType::Witnesses)
1411 {
1412 let covered_old_owner = Self::covered_old_owner_intervals(
1413 *actual_lo, interval, old_data,
1414 );
1415
1416 if covered_old_owner.iter().next().is_some() {
1417 let capped_old_owner = OldOwnerData {
1418 sn: old_data.sn,
1419 interval_gov_version: covered_old_owner,
1420 };
1421
1422 if let Some(gov_version) = self.search_schemas_old(
1423 node,
1424 &schema_id,
1425 &parse_namespace,
1426 &capped_old_owner,
1427 better_gov_version,
1428 ) {
1429 match self
1430 .get_sn(ctx, subject_id.clone(), gov_version)
1431 .await?
1432 {
1433 SnLimit::Sn(sn) => {
1434 better_sn = better_sn
1435 .max(Some(sn.min(old_data.sn)));
1436 }
1437 SnLimit::LastSn => {
1438 better_sn =
1439 better_sn.max(Some(old_data.sn));
1440 }
1441 SnLimit::NotSn => {}
1442 }
1443 }
1444 }
1445 }
1446 }
1447 }
1448
1449 let sn_limit = if let Some(gov_version) = better_gov_version {
1450 match self.get_sn(ctx, subject_id.clone(), gov_version).await? {
1451 SnLimit::Sn(sn) => better_sn
1452 .map_or(SnLimit::Sn(sn), |better_sn| {
1453 SnLimit::Sn(sn.max(better_sn))
1454 }),
1455 SnLimit::LastSn => SnLimit::Sn(data.sn),
1456 SnLimit::NotSn => better_sn.map_or(SnLimit::NotSn, SnLimit::Sn),
1457 }
1458 } else if let Some(better_sn) = better_sn {
1459 SnLimit::Sn(better_sn)
1460 } else {
1461 SnLimit::NotSn
1462 };
1463
1464 Ok(sn_limit)
1465 }
1466
1467 fn has_active_schema_witness(
1468 &self,
1469 node: &PublicKey,
1470 schema_id: &SchemaType,
1471 namespace: &Namespace,
1472 ) -> bool {
1473 let has_match = |witness_data: &HashMap<Namespace, IntervalData>| {
1474 witness_data
1475 .iter()
1476 .any(|(current_namespace, (_, current_lo))| {
1477 current_lo.is_some()
1478 && current_namespace.is_ancestor_or_equal_of(namespace)
1479 })
1480 };
1481
1482 self.witnesses
1483 .get(&(node.clone(), schema_id.clone()))
1484 .is_some_and(has_match)
1485 || self
1486 .witnesses
1487 .get(&(node.clone(), SchemaType::TrackerSchemas))
1488 .is_some_and(has_match)
1489 }
1490
1491 fn is_current_witness_for_entry(
1492 &self,
1493 node: &PublicKey,
1494 schema_id: &SchemaType,
1495 namespace: &str,
1496 creator_witnesses: &HashMap<WitnessesType, IntervalData>,
1497 ) -> bool {
1498 if creator_witnesses
1499 .get(&WitnessesType::User(node.clone()))
1500 .is_some_and(|(_, current_lo)| current_lo.is_some())
1501 {
1502 return true;
1503 }
1504
1505 if !creator_witnesses
1506 .get(&WitnessesType::Witnesses)
1507 .is_some_and(|(_, current_lo)| current_lo.is_some())
1508 {
1509 return false;
1510 }
1511
1512 self.has_active_schema_witness(
1513 node,
1514 schema_id,
1515 &Namespace::from(namespace.to_owned()),
1516 )
1517 }
1518
1519 async fn get_subjects_for_owner_schema(
1520 &self,
1521 ctx: &ActorContext<Self>,
1522 owner: &PublicKey,
1523 schema_id: &SchemaType,
1524 namespace: &str,
1525 ) -> Result<Vec<DigestIdentifier>, ActorError> {
1526 let governance_id = ctx.path().parent().key();
1527 let path = ActorPath::from(format!(
1528 "/user/node/subject_manager/{}/subject_register",
1529 governance_id
1530 ));
1531 let actor = ctx.system().get_actor::<SubjectRegister>(&path).await?;
1532 let response = actor
1533 .ask(SubjectRegisterMessage::GetSubjectsByOwnerSchema {
1534 owner: owner.clone(),
1535 schema_id: schema_id.clone(),
1536 namespace: namespace.to_owned(),
1537 })
1538 .await?;
1539
1540 match response {
1541 SubjectRegisterResponse::Subjects(subjects) => Ok(subjects),
1542 _ => Err(ActorError::UnexpectedResponse {
1543 path,
1544 expected: "SubjectRegisterResponse::Subjects".to_owned(),
1545 }),
1546 }
1547 }
1548
1549 async fn list_current_witness_subjects(
1550 &self,
1551 ctx: &ActorContext<Self>,
1552 node: &PublicKey,
1553 governance_version: u64,
1554 after_subject_id: Option<DigestIdentifier>,
1555 limit: usize,
1556 ) -> Result<
1557 (Vec<CurrentWitnessSubject>, Option<DigestIdentifier>),
1558 ActorError,
1559 > {
1560 let mut subjects = BTreeMap::new();
1561
1562 for ((creator, namespace, schema_id), creator_witnesses) in
1563 &self.witnesses_creator
1564 {
1565 if !self.is_current_witness_for_entry(
1566 node,
1567 schema_id,
1568 namespace,
1569 creator_witnesses,
1570 ) {
1571 continue;
1572 }
1573
1574 let current_subjects = self
1575 .get_subjects_for_owner_schema(
1576 ctx, creator, schema_id, namespace,
1577 )
1578 .await?;
1579
1580 for subject_id in current_subjects {
1581 if let Some(data) = self.subjects.get(&subject_id) {
1582 subjects.insert(subject_id, data.sn);
1583 }
1584 }
1585 }
1586
1587 let limit = limit.max(1);
1588 let mut items = Vec::with_capacity(limit + 1);
1589 let effective_cursor = if governance_version == self.gov_sn {
1590 after_subject_id
1591 } else {
1592 None
1593 };
1594
1595 for (subject_id, target_sn) in subjects {
1596 if effective_cursor
1597 .as_ref()
1598 .is_some_and(|cursor| &subject_id <= cursor)
1599 {
1600 continue;
1601 }
1602
1603 items.push(CurrentWitnessSubject {
1604 subject_id,
1605 target_sn,
1606 });
1607
1608 if items.len() > limit {
1609 break;
1610 }
1611 }
1612
1613 let next_cursor = if items.len() > limit {
1614 let extra = items.pop();
1615 let _ = extra;
1616 items.last().map(|item| item.subject_id.clone())
1617 } else {
1618 None
1619 };
1620
1621 Ok((items, next_cursor))
1622 }
1623}
1624
1625#[async_trait]
1626impl Actor for WitnessesRegister {
1627 type Event = WitnessesRegisterEvent;
1628 type Message = WitnessesRegisterMessage;
1629 type Response = WitnessesRegisterResponse;
1630
1631 fn get_span(_id: &str, parent_span: Option<Span>) -> tracing::Span {
1632 parent_span.map_or_else(
1633 || info_span!("WitnessesRegister"),
1634 |parent_span| info_span!(parent: parent_span, "WitnessesRegister"),
1635 )
1636 }
1637
1638 async fn pre_start(
1639 &mut self,
1640 ctx: &mut ActorContext<Self>,
1641 ) -> Result<(), ActorError> {
1642 let prefix = ctx.path().parent().key();
1643 if let Err(e) = self
1644 .init_store("witnesses_register", Some(prefix), false, ctx)
1645 .await
1646 {
1647 error!(
1648 error = %e,
1649 "Failed to initialize witnesses_register store"
1650 );
1651 return Err(e);
1652 }
1653 Ok(())
1654 }
1655}
1656
1657#[async_trait]
1658impl Handler<Self> for WitnessesRegister {
1659 async fn handle_message(
1660 &mut self,
1661 _sender: ActorPath,
1662 msg: WitnessesRegisterMessage,
1663 ctx: &mut ActorContext<Self>,
1664 ) -> Result<WitnessesRegisterResponse, ActorError> {
1665 match msg {
1666 WitnessesRegisterMessage::PurgeStorage => {
1667 purge_storage(ctx).await?;
1668
1669 debug!(
1670 msg_type = "PurgeStorage",
1671 "Witnesses register storage purged"
1672 );
1673
1674 return Ok(WitnessesRegisterResponse::Ok);
1675 }
1676 WitnessesRegisterMessage::ListCurrentWitnessSubjects {
1677 node,
1678 governance_version,
1679 after_subject_id,
1680 limit,
1681 } => {
1682 let (items, next_cursor) = self
1683 .list_current_witness_subjects(
1684 ctx,
1685 &node,
1686 governance_version,
1687 after_subject_id,
1688 limit,
1689 )
1690 .await?;
1691
1692 return Ok(WitnessesRegisterResponse::CurrentWitnessSubjects {
1693 governance_version: self.gov_sn,
1694 items,
1695 next_cursor,
1696 });
1697 }
1698 WitnessesRegisterMessage::GetTrackerSnOwner { subject_id } => {
1699 let data = self
1700 .subjects
1701 .get(&subject_id)
1702 .map(|data| (data.actual_owner.clone(), data.sn));
1703
1704 debug!(
1705 msg_type = "GetTrackerSnOwner",
1706 subject_id = %subject_id,
1707 found = data.is_some(),
1708 "Tracker sn owner lookup completed"
1709 );
1710
1711 return Ok(WitnessesRegisterResponse::TrackerOwnerSn { data });
1712 }
1713 WitnessesRegisterMessage::GetSnGov => {
1714 debug!(
1715 msg_type = "GetSnGov",
1716 sn = self.gov_sn,
1717 "Governance sn retrieved"
1718 );
1719 return Ok(WitnessesRegisterResponse::GovSn {
1720 sn: self.gov_sn,
1721 });
1722 }
1723 WitnessesRegisterMessage::UpdateSnGov { sn } => {
1724 self.on_event(WitnessesRegisterEvent::UpdateSnGov { sn }, ctx)
1725 .await;
1726
1727 debug!(
1728 msg_type = "UpdateSnGov",
1729 sn = sn,
1730 "Governance sn updated"
1731 );
1732 }
1733 WitnessesRegisterMessage::UpdateCreatorsWitnessesConfirm {
1734 version,
1735 remove_creator,
1736 remove_witnesses,
1737 } => {
1738 let remove_creator_count = remove_creator.len();
1739 let remove_witnesses_count = remove_witnesses.len();
1740 self.on_event(
1741 WitnessesRegisterEvent::UpdateCreatorsWitnessesConfirm {
1742 version,
1743 remove_creator,
1744 remove_witnesses,
1745 },
1746 ctx,
1747 )
1748 .await;
1749
1750 debug!(
1751 msg_type = "UpdateCreatorsWitnessesConfirm",
1752 version = version,
1753 remove_creator_count = remove_creator_count,
1754 remove_witnesses_count = remove_witnesses_count,
1755 "Creators and witnesses confirm updated"
1756 );
1757 }
1758 WitnessesRegisterMessage::UpdateCreatorsWitnessesFact {
1759 version,
1760 new_creator,
1761 remove_creator,
1762 update_creator_witnesses,
1763 new_witnesses,
1764 remove_witnesses,
1765 } => {
1766 let new_creator_count = new_creator.len();
1767 let remove_creator_count = remove_creator.len();
1768 self.on_event(
1769 WitnessesRegisterEvent::UpdateCreatorsWitnessesFact {
1770 version,
1771 new_creator,
1772 remove_creator,
1773 update_creator_witnesses,
1774 new_witnesses,
1775 remove_witnesses,
1776 },
1777 ctx,
1778 )
1779 .await;
1780
1781 debug!(
1782 msg_type = "UpdateCreatorsWitnessesFact",
1783 version = version,
1784 new_creator_count = new_creator_count,
1785 remove_creator_count = remove_creator_count,
1786 "Creators and witnesses updated"
1787 );
1788 }
1789 WitnessesRegisterMessage::UpdateSn { sn, subject_id } => {
1790 self.on_event(
1791 WitnessesRegisterEvent::UpdateSn {
1792 sn,
1793 subject_id: subject_id.clone(),
1794 },
1795 ctx,
1796 )
1797 .await;
1798
1799 debug!(
1800 msg_type = "UpdateSn",
1801 subject_id = %subject_id,
1802 sn = sn,
1803 "Sequence number updated"
1804 );
1805 }
1806 WitnessesRegisterMessage::UpdateTrackerVisibility {
1807 subject_id,
1808 sn,
1809 mode,
1810 stored_visibility,
1811 event_visibility,
1812 } => {
1813 self.on_event(
1814 WitnessesRegisterEvent::UpdateTrackerVisibility {
1815 subject_id: subject_id.clone(),
1816 sn,
1817 mode,
1818 stored_visibility: stored_visibility.clone(),
1819 event_visibility: event_visibility.clone(),
1820 },
1821 ctx,
1822 )
1823 .await;
1824
1825 debug!(
1826 msg_type = "UpdateTrackerVisibility",
1827 subject_id = %subject_id,
1828 sn = sn,
1829 "Tracker visibility updated"
1830 );
1831 }
1832 WitnessesRegisterMessage::Create {
1833 subject_id,
1834 owner,
1835 gov_version,
1836 } => {
1837 self.on_event(
1838 WitnessesRegisterEvent::Create {
1839 subject_id: subject_id.clone(),
1840 owner: owner.clone(),
1841 gov_version,
1842 },
1843 ctx,
1844 )
1845 .await;
1846
1847 debug!(
1848 msg_type = "Create",
1849 subject_id = %subject_id,
1850 owner = %owner,
1851 gov_version = gov_version,
1852 "Transfer entry created"
1853 );
1854 }
1855 WitnessesRegisterMessage::Transfer {
1856 subject_id,
1857 new_owner,
1858 gov_version,
1859 } => {
1860 self.on_event(
1861 WitnessesRegisterEvent::Transfer {
1862 subject_id: subject_id.clone(),
1863 new_owner: new_owner.clone(),
1864 gov_version,
1865 },
1866 ctx,
1867 )
1868 .await;
1869
1870 debug!(
1871 msg_type = "Transfer",
1872 subject_id = %subject_id,
1873 new_owner = %new_owner,
1874 gov_version = gov_version,
1875 "New transfer registered"
1876 );
1877 }
1878 WitnessesRegisterMessage::Reject {
1879 subject_id,
1880 sn,
1881 gov_version,
1882 } => {
1883 self.on_event(
1884 WitnessesRegisterEvent::Reject {
1885 subject_id: subject_id.clone(),
1886 sn,
1887 gov_version,
1888 },
1889 ctx,
1890 )
1891 .await;
1892
1893 debug!(
1894 msg_type = "Reject",
1895 subject_id = %subject_id,
1896 sn = sn,
1897 gov_version = gov_version,
1898 "The transfer was rejected"
1899 );
1900 }
1901 WitnessesRegisterMessage::Confirm {
1902 subject_id,
1903 sn,
1904 gov_version,
1905 } => {
1906 self.on_event(
1907 WitnessesRegisterEvent::Confirm {
1908 subject_id: subject_id.clone(),
1909 sn,
1910 gov_version,
1911 },
1912 ctx,
1913 )
1914 .await;
1915
1916 debug!(
1917 msg_type = "Confirm",
1918 subject_id = %subject_id,
1919 sn = sn,
1920 gov_version = gov_version,
1921 "The transfer was confirmed"
1922 );
1923 }
1924 WitnessesRegisterMessage::DeleteSubject { subject_id } => {
1925 self.on_event(
1926 WitnessesRegisterEvent::DeleteSubject {
1927 subject_id: subject_id.clone(),
1928 },
1929 ctx,
1930 )
1931 .await;
1932
1933 debug!(
1934 msg_type = "DeleteSubject",
1935 subject_id = %subject_id,
1936 "Witness subject entry deleted"
1937 );
1938 }
1939 WitnessesRegisterMessage::Access {
1940 subject_id,
1941 node,
1942 namespace,
1943 schema_id,
1944 } => {
1945 let sn = self
1946 .access_limit_for_node(
1947 ctx,
1948 &subject_id,
1949 &node,
1950 &namespace,
1951 &schema_id,
1952 )
1953 .await?;
1954
1955 debug!(
1956 msg_type = "Access",
1957 subject_id = %subject_id,
1958 node = %node,
1959 namespace = %namespace,
1960 schema_id = %schema_id,
1961 sn = sn,
1962 "Checked access status"
1963 );
1964
1965 return Ok(WitnessesRegisterResponse::Access { sn });
1966 }
1967 WitnessesRegisterMessage::GetTrackerVisibilityState {
1968 subject_id,
1969 } => {
1970 let state = self
1971 .subjects
1972 .get(&subject_id)
1973 .map(|data| data.visibility_state.clone())
1974 .unwrap_or_default();
1975
1976 return Ok(WitnessesRegisterResponse::TrackerVisibilityState {
1977 state,
1978 });
1979 }
1980 WitnessesRegisterMessage::GetTrackerWindow {
1981 subject_id,
1982 node,
1983 namespace,
1984 schema_id,
1985 actual_sn,
1986 } => {
1987 let (sn, clear_sn, is_all, ranges) = self
1988 .build_tracker_window(
1989 ctx,
1990 &subject_id,
1991 &node,
1992 namespace,
1993 schema_id,
1994 actual_sn,
1995 )
1996 .await?;
1997
1998 return Ok(WitnessesRegisterResponse::TrackerWindow {
1999 sn,
2000 clear_sn,
2001 is_all,
2002 ranges,
2003 });
2004 }
2005 };
2006
2007 Ok(WitnessesRegisterResponse::Ok)
2008 }
2009
2010 async fn on_event(
2011 &mut self,
2012 event: WitnessesRegisterEvent,
2013 ctx: &mut ActorContext<Self>,
2014 ) {
2015 if let Err(e) = self.persist(&event, ctx).await {
2016 error!(
2017 event = ?event,
2018 error = %e,
2019 "Failed to persist witnesses register event"
2020 );
2021 emit_fail(ctx, e).await;
2022 }
2023 }
2024}
2025
2026#[async_trait]
2027impl PersistentActor for WitnessesRegister {
2028 type Persistence = LightPersistence;
2029 type InitParams = usize;
2030
2031 fn create_initial(params: Self::InitParams) -> Self {
2032 Self {
2033 ledger_batch_size: params,
2034 ..Self::default()
2035 }
2036 }
2037
2038 fn apply(&mut self, event: &Self::Event) -> Result<(), ActorError> {
2039 match event {
2040 WitnessesRegisterEvent::UpdateSnGov { sn } => {
2041 self.gov_sn = *sn;
2042
2043 debug!(
2044 event_type = "UpdateSnGov",
2045 sn = sn,
2046 "Governance sn updated in state"
2047 );
2048 }
2049 WitnessesRegisterEvent::UpdateCreatorsWitnessesConfirm {
2050 version,
2051 remove_creator,
2052 remove_witnesses,
2053 } => {
2054 for (schema_id, ns, creator) in remove_creator.iter() {
2055 self.close_creator_registration(
2056 schema_id, ns, creator, *version,
2057 );
2058 }
2059
2060 for ((schema_id, witness), namespace) in remove_witnesses {
2061 if let Some(witness_namespace) = self
2062 .witnesses
2063 .get_mut(&(witness.clone(), schema_id.clone()))
2064 {
2065 for ns in namespace.iter() {
2066 if let Some((interval, last)) =
2067 witness_namespace.get_mut(ns)
2068 && let Some(last) = last.take()
2069 {
2070 interval.insert(Interval {
2071 lo: last,
2072 hi: *version - 1,
2073 });
2074 }
2075 }
2076 }
2077 }
2078
2079 debug!(
2080 event_type = "UpdateCreatorsWitnessesConfirm",
2081 version = version,
2082 remove_witnesses_count = remove_witnesses.len(),
2083 remove_creator_count = remove_creator.len(),
2084 "Creators and witnesses updated in state"
2085 );
2086 }
2087 WitnessesRegisterEvent::UpdateCreatorsWitnessesFact {
2088 version,
2089 new_creator,
2090 remove_creator,
2091 update_creator_witnesses,
2092 new_witnesses,
2093 remove_witnesses,
2094 } => {
2095 for ((schema_id, ns, creator), registration) in
2096 new_creator.iter()
2097 {
2098 self.apply_creator_registration(
2099 schema_id,
2100 ns,
2101 creator,
2102 registration,
2103 *version,
2104 );
2105 }
2106
2107 for (schema_id, ns, creator) in remove_creator.iter() {
2108 self.close_creator_registration(
2109 schema_id, ns, creator, *version,
2110 );
2111 }
2112
2113 for ((schema_id, ns, creator), registration) in
2114 update_creator_witnesses.iter()
2115 {
2116 self.apply_creator_registration(
2117 schema_id,
2118 ns,
2119 creator,
2120 registration,
2121 *version,
2122 );
2123 }
2124
2125 for ((schema_id, witness), namespace) in new_witnesses {
2126 for ns in namespace.iter() {
2127 self.witnesses
2128 .entry((witness.clone(), schema_id.clone()))
2129 .or_default()
2130 .entry(ns.clone())
2131 .or_default()
2132 .1 = Some(*version);
2133 }
2134 }
2135
2136 for ((schema_id, witness), namespace) in remove_witnesses {
2137 if let Some(witness_namespace) = self
2138 .witnesses
2139 .get_mut(&(witness.clone(), schema_id.clone()))
2140 {
2141 for ns in namespace.iter() {
2142 if let Some((interval, last)) =
2143 witness_namespace.get_mut(ns)
2144 && let Some(last) = last.take()
2145 {
2146 interval.insert(Interval {
2147 lo: last,
2148 hi: *version - 1,
2149 });
2150 }
2151 }
2152 }
2153 }
2154
2155 debug!(
2156 event_type = "UpdateCreatorsWitnessesFact",
2157 version = version,
2158 remove_creator_count = remove_creator.len(),
2159 update_creator_witnesses_count =
2160 update_creator_witnesses.len(),
2161 new_witnesses_count = new_witnesses.len(),
2162 new_creator_count = new_creator.len(),
2163 remove_creator_count = remove_creator.len(),
2164 "Creators and witnesses updated in state"
2165 );
2166 }
2167 WitnessesRegisterEvent::UpdateSn { subject_id, sn } => {
2168 if let Some(data) = self.subjects.get_mut(subject_id) {
2169 data.sn = *sn;
2170
2171 debug!(
2172 event_type = "UpdateSn",
2173 subject_id = %subject_id,
2174 sn = sn,
2175 "Sequence number updated"
2176 );
2177 } else {
2178 error!(
2179 event_type = "UpdateSn",
2180 subject_id = %subject_id,
2181 "Subject not found in register"
2182 );
2183 };
2184 }
2185 WitnessesRegisterEvent::UpdateTrackerVisibility {
2186 subject_id,
2187 sn,
2188 mode,
2189 stored_visibility,
2190 event_visibility,
2191 } => {
2192 if let Some(data) = self.subjects.get_mut(subject_id) {
2193 data.visibility_state.set_mode(*mode);
2194 data.visibility_state.record_event(
2195 *sn,
2196 stored_visibility.clone(),
2197 event_visibility.clone(),
2198 );
2199 } else {
2200 warn!(
2201 event_type = "UpdateTrackerVisibility",
2202 subject_id = %subject_id,
2203 sn = sn,
2204 "Tracker visibility update ignored because subject was not found"
2205 );
2206 }
2207 }
2208 WitnessesRegisterEvent::Create {
2209 subject_id,
2210 owner,
2211 gov_version,
2212 } => {
2213 let data = self.subjects.entry(subject_id.clone()).or_default();
2214
2215 data.actual_owner = owner.clone();
2216 data.gov_version = *gov_version;
2217 data.visibility_state = TrackerVisibilityState::default();
2218 data.visibility_state.record_event(
2219 0,
2220 TrackerStoredVisibility::Full,
2221 TrackerEventVisibility::NonFact,
2222 );
2223
2224 debug!(
2225 event_type = "Create",
2226 subject_id = %subject_id,
2227 owner = %owner,
2228 gov_version = gov_version,
2229 "Transfer entry created"
2230 );
2231 }
2232 WitnessesRegisterEvent::Transfer {
2233 subject_id,
2234 new_owner,
2235 gov_version,
2236 } => {
2237 if let Some(data) = self.subjects.get_mut(subject_id) {
2238 data.actual_new_owner_data =
2239 Some((new_owner.clone(), *gov_version));
2240
2241 debug!(
2242 event_type = "Transfer",
2243 subject_id = %subject_id,
2244 new_owner = %new_owner,
2245 gov_version = gov_version,
2246 "Transfer initiated"
2247 );
2248 } else {
2249 error!(
2250 event_type = "Transfer",
2251 subject_id = %subject_id,
2252 new_owner = %new_owner,
2253 "Subject not found in register"
2254 );
2255 };
2256 }
2257 WitnessesRegisterEvent::Confirm {
2258 subject_id,
2259 sn,
2260 gov_version,
2261 } => {
2262 if let Some(data) = self.subjects.get_mut(subject_id) {
2263 let new_owner = data.actual_new_owner_data.take();
2264
2265 if let Some((new_owner, new_owner_gov_version)) = new_owner
2266 {
2267 let entry = data
2268 .old_owners
2269 .entry(data.actual_owner.clone())
2270 .or_default();
2271 entry.sn = *sn;
2272 entry.interval_gov_version.insert(Interval {
2273 lo: data.gov_version,
2274 hi: *gov_version,
2275 });
2276
2277 data.actual_owner = new_owner;
2278 data.gov_version = new_owner_gov_version;
2279
2280 debug!(
2281 event_type = "Confirm",
2282 subject_id = %subject_id,
2283 sn = sn,
2284 gov_version = gov_version,
2285 "Transfer confirmed"
2286 );
2287 } else {
2288 error!(
2289 event_type = "Confirm",
2290 subject_id = %subject_id,
2291 sn = sn,
2292 "No pending new owner to confirm"
2293 );
2294 };
2295 } else {
2296 error!(
2297 event_type = "Confirm",
2298 subject_id = %subject_id,
2299 sn = sn,
2300 "Subject not found in register"
2301 );
2302 };
2303 }
2304 WitnessesRegisterEvent::DeleteSubject { subject_id } => {
2305 self.subjects.remove(subject_id);
2306
2307 debug!(
2308 event_type = "DeleteSubject",
2309 subject_id = %subject_id,
2310 "Witness subject entry deleted from state"
2311 );
2312 }
2313 WitnessesRegisterEvent::Reject {
2314 subject_id,
2315 sn,
2316 gov_version,
2317 } => {
2318 if let Some(data) = self.subjects.get_mut(subject_id) {
2319 let new_owner = data.actual_new_owner_data.take();
2320
2321 if let Some((new_owner, new_owner_gov_version)) = new_owner
2322 {
2323 let entry =
2324 data.old_owners.entry(new_owner).or_default();
2325 entry.sn = *sn;
2326 entry.interval_gov_version.insert(Interval {
2327 lo: new_owner_gov_version,
2328 hi: *gov_version,
2329 });
2330
2331 debug!(
2332 event_type = "Reject",
2333 subject_id = %subject_id,
2334 sn = sn,
2335 gov_version = gov_version,
2336 "Transfer rejected"
2337 );
2338 } else {
2339 error!(
2340 event_type = "Reject",
2341 subject_id = %subject_id,
2342 sn = sn,
2343 "No pending new owner to reject"
2344 );
2345 };
2346 } else {
2347 error!(
2348 event_type = "Reject",
2349 subject_id = %subject_id,
2350 sn = sn,
2351 "Subject not found in register"
2352 );
2353 };
2354 }
2355 };
2356
2357 Ok(())
2358 }
2359}
2360
// `Storable` is implemented with an empty body: no trait items are
// overridden for `WitnessesRegister`.
impl Storable for WitnessesRegister {}