1use arkhe_forge_core::context::EventRecord;
18use arkhe_forge_core::event::{
19 ArkheEvent, PerRegionErasureProgress, ProgressScope, RuntimeSignatureClass,
20 UserErasureCompleted, UserErasureScheduled,
21};
22use arkhe_forge_core::pii::DekId;
23use arkhe_forge_core::user::UserId;
24use arkhe_kernel::abi::{Tick, TypeCode};
25use bytes::Bytes;
26use std::collections::HashMap;
27
28use crate::projection::{
29 ObserverState, Projection, ProjectionContext, ProjectionCursor, ProjectionError,
30};
31
32pub trait DekShredder: Send + Sync {
47 fn shred(&mut self, dek_id: DekId) -> Result<DekShredAttestation, DekShredError>;
52
53 fn shred_with_regions(
62 &mut self,
63 dek_id: DekId,
64 shred_tick: Tick,
65 ) -> Result<ShredResult, DekShredError> {
66 let overall = self.shred(dek_id)?;
67 let region = RegionProgress {
68 scope: default_region_scope(),
69 shred_tick,
70 attestation_class: overall.attestation_class,
71 attestation_bytes: overall.attestation_bytes.clone(),
72 };
73 Ok(ShredResult {
74 regions: vec![region],
75 overall,
76 })
77 }
78}
79
/// Shred outcome reported for a single [`ProgressScope`].
///
/// The default [`DekShredder::shred_with_regions`] builds exactly one of these
/// from the overall attestation, and
/// [`ErasureCascadeObserver::per_region_events`] converts each entry into a
/// [`PerRegionErasureProgress`] event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RegionProgress {
    /// Scope this entry reports on.
    pub scope: ProgressScope,
    /// Tick recorded for the shred in this scope.
    pub shred_tick: Tick,
    /// Signature class that `attestation_bytes` is tagged with.
    pub attestation_class: RuntimeSignatureClass,
    /// Raw attestation bytes for this scope.
    pub attestation_bytes: Bytes,
}
96
/// Result of [`DekShredder::shred_with_regions`]: the overall attestation plus
/// the per-region entries that accompany it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShredResult {
    /// Per-region progress entries; empty when nothing was shredded.
    pub regions: Vec<RegionProgress>,
    /// Attestation covering the shred as a whole.
    pub overall: DekShredAttestation,
}
111
112#[allow(clippy::expect_used)]
118fn default_region_scope() -> ProgressScope {
119 let label = arkhe_forge_core::component::BoundedString::<64>::new("default-region")
120 .expect("'default-region' is 14 bytes, within the BoundedString<64> cap");
121 ProgressScope::Region(label)
122}
123
/// Attestation returned by a [`DekShredder`] for a shred.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DekShredAttestation {
    /// Signature class that `attestation_bytes` is tagged with.
    pub attestation_class: RuntimeSignatureClass,
    /// Raw attestation bytes.
    pub attestation_bytes: Bytes,
    /// Log index assigned by the shredder, if any. The cascade observer uses
    /// `None` for users that had no DEK to shred, which keeps that case
    /// distinct from a real first shred at index `Some(0)`.
    pub log_index: Option<u64>,
}
148
/// Failure modes a [`DekShredder`] can report.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum DekShredError {
    /// The shredder has no record of the requested DEK.
    #[error("DEK id unknown to the shredder")]
    UnknownDek,
    /// The DEK was already shredded. The cascade observer treats this as a
    /// shredder contract violation: repeat shreds are expected to return the
    /// cached attestation instead.
    #[error("DEK already shredded")]
    AlreadyShredded,
    /// Backend-specific failure described by a static message.
    #[error("shredder backend error: {0}")]
    Backend(&'static str),
}
166
167#[derive(Debug, Default)]
171pub struct InMemoryDekShredder {
172 live: HashMap<DekId, ()>,
173 shredded: HashMap<DekId, DekShredAttestation>,
174 next_log_index: u64,
175}
176
177impl InMemoryDekShredder {
178 #[inline]
180 #[must_use]
181 pub fn new() -> Self {
182 Self::default()
183 }
184
185 pub fn register(&mut self, dek_id: DekId) {
188 self.live.insert(dek_id, ());
189 }
190
191 #[must_use]
193 pub fn is_shredded(&self, dek_id: &DekId) -> bool {
194 self.shredded.contains_key(dek_id)
195 }
196
197 fn issue_attestation(&mut self, dek_id: DekId) -> DekShredAttestation {
198 let log_index = self.next_log_index;
199 self.next_log_index = self.next_log_index.saturating_add(1);
200 let key = blake3::derive_key("arkhe-forge-dek-shred-attestation", &dek_id.0);
203 let mut h = blake3::Hasher::new_keyed(&key);
204 h.update(&log_index.to_be_bytes());
205 let digest = h.finalize();
206 DekShredAttestation {
207 attestation_class: RuntimeSignatureClass::Ed25519,
208 attestation_bytes: Bytes::copy_from_slice(digest.as_bytes()),
209 log_index: Some(log_index),
210 }
211 }
212}
213
214impl DekShredder for InMemoryDekShredder {
215 fn shred(&mut self, dek_id: DekId) -> Result<DekShredAttestation, DekShredError> {
216 if let Some(cached) = self.shredded.get(&dek_id) {
217 return Ok(cached.clone());
218 }
219 if self.live.remove(&dek_id).is_none() {
220 return Err(DekShredError::UnknownDek);
221 }
222 let attestation = self.issue_attestation(dek_id);
223 self.shredded.insert(dek_id, attestation.clone());
224 Ok(attestation)
225 }
226}
227
/// PII row identifiers held for one user, together with the DEK associated
/// with them. The `Default` value (no rows, no DEK) is what
/// [`InMemoryPiiRowStore`] returns for a user it has no entry for.
#[derive(Debug, Clone, Default)]
pub struct UserPiiRows {
    /// Identifiers of the user's PII rows.
    pub rows: Vec<u64>,
    /// DEK to shred when the user is erased; `None` when there is none.
    pub dek_id: Option<DekId>,
}
240
/// Storage abstraction the erasure cascade uses to look up and tombstone a
/// user's PII rows.
pub trait PiiRowStore: Send + Sync {
    /// Returns the rows and DEK currently held for `user`.
    fn rows_for(&self, user: UserId) -> UserPiiRows;
    /// Tombstones `user`.
    ///
    /// # Errors
    ///
    /// Returns a [`ProjectionError`] when the store cannot apply the tombstone.
    fn tombstone(&mut self, user: UserId) -> Result<(), ProjectionError>;
    /// Returns `true` when `user` has been tombstoned.
    fn is_tombstoned(&self, user: UserId) -> bool;
}
253
254#[derive(Debug, Default)]
256pub struct InMemoryPiiRowStore {
257 users: HashMap<UserId, UserPiiRows>,
258 tombstoned: HashMap<UserId, ()>,
259}
260
261impl InMemoryPiiRowStore {
262 #[inline]
264 #[must_use]
265 pub fn new() -> Self {
266 Self::default()
267 }
268
269 pub fn upsert(&mut self, user: UserId, rows: Vec<u64>, dek_id: DekId) {
272 self.users.insert(
273 user,
274 UserPiiRows {
275 rows,
276 dek_id: Some(dek_id),
277 },
278 );
279 }
280}
281
282impl PiiRowStore for InMemoryPiiRowStore {
283 fn rows_for(&self, user: UserId) -> UserPiiRows {
284 self.users.get(&user).cloned().unwrap_or_default()
285 }
286
287 fn tombstone(&mut self, user: UserId) -> Result<(), ProjectionError> {
288 self.users.remove(&user);
289 self.tombstoned.insert(user, ());
290 Ok(())
291 }
292
293 fn is_tombstoned(&self, user: UserId) -> bool {
294 self.tombstoned.contains_key(&user)
295 }
296}
297
/// Record of one finished erasure cascade, queued by
/// [`ErasureCascadeObserver`] and retrieved with
/// [`ErasureCascadeObserver::drain_completions`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ErasureCompletion {
    /// User whose data was erased.
    pub user: UserId,
    /// Tick of the projection context in which the cascade ran.
    pub completed_tick: Tick,
    /// Number of PII rows the store reported for the user before tombstoning.
    pub tombstoned_rows: usize,
    /// Overall shred attestation; class `None` with empty bytes when the user
    /// had no DEK.
    pub attestation: DekShredAttestation,
    /// Per-region shred entries; empty when the user had no DEK.
    pub regions: Vec<RegionProgress>,
}
321
/// Projection that reacts to `UserErasureScheduled` events by shredding the
/// user's DEK, tombstoning their PII rows, and queueing an
/// [`ErasureCompletion`].
pub struct ErasureCascadeObserver {
    /// Type codes this projection subscribes to (only `UserErasureScheduled`).
    observes: [TypeCode; 1],
    /// Position of the last successfully applied event, if any.
    cursor: Option<ProjectionCursor>,
    /// Store holding the PII rows to tombstone.
    rows: Box<dyn PiiRowStore>,
    /// Backend used to shred DEKs.
    shredder: Box<dyn DekShredder>,
    /// Completions accumulated since the last drain.
    completions: Vec<ErasureCompletion>,
}
341
342impl ErasureCascadeObserver {
343 #[must_use]
345 pub fn new(rows: Box<dyn PiiRowStore>, shredder: Box<dyn DekShredder>) -> Self {
346 Self {
347 observes: [TypeCode(UserErasureScheduled::TYPE_CODE)],
348 cursor: None,
349 rows,
350 shredder,
351 completions: Vec::new(),
352 }
353 }
354
355 pub fn drain_completions(&mut self) -> Vec<ErasureCompletion> {
359 core::mem::take(&mut self.completions)
360 }
361
362 #[must_use]
364 pub fn pii_rows(&self) -> &dyn PiiRowStore {
365 self.rows.as_ref()
366 }
367
368 #[must_use]
370 pub fn shredder(&self) -> &dyn DekShredder {
371 self.shredder.as_ref()
372 }
373
374 #[must_use]
379 pub fn into_completed_event(
380 completion: &ErasureCompletion,
381 schema_version: u16,
382 transparency_log_index: u64,
383 ) -> UserErasureCompleted {
384 UserErasureCompleted {
385 schema_version,
386 user: completion.user,
387 dek_shred_tick: completion.completed_tick,
388 attestation_class: completion.attestation.attestation_class,
389 attestation_bytes: completion.attestation.attestation_bytes.clone(),
390 transparency_log_index,
391 }
392 }
393
394 #[must_use]
401 pub fn per_region_events(
402 completion: &ErasureCompletion,
403 schema_version: u16,
404 ) -> Vec<PerRegionErasureProgress> {
405 completion
406 .regions
407 .iter()
408 .map(|r| PerRegionErasureProgress {
409 schema_version,
410 user: completion.user,
411 scope: r.scope.clone(),
412 shred_tick: r.shred_tick,
413 attestation_class: r.attestation_class,
414 attestation_bytes: r.attestation_bytes.clone(),
415 })
416 .collect()
417 }
418}
419
420impl Projection for ErasureCascadeObserver {
421 fn observes(&self) -> &[TypeCode] {
422 &self.observes
423 }
424
425 fn on_event(
426 &mut self,
427 event: &EventRecord,
428 ctx: &ProjectionContext<'_>,
429 ) -> Result<(), ProjectionError> {
430 let scheduled: UserErasureScheduled = postcard::from_bytes(&event.payload)
431 .map_err(|_| ProjectionError::DecodeFailed("UserErasureScheduled payload"))?;
432 let user = scheduled.user;
433 let rows = self.rows.rows_for(user);
434 let tombstoned = rows.rows.len();
435
436 let result: ShredResult = match rows.dek_id {
446 Some(dek_id) => match self.shredder.shred_with_regions(dek_id, ctx.tick) {
447 Ok(r) => r,
448 Err(DekShredError::AlreadyShredded) => {
449 return Err(ProjectionError::Storage(
455 "shredder returned AlreadyShredded; implementations must cache attestation",
456 ));
457 }
458 Err(DekShredError::UnknownDek) => {
459 return Err(ProjectionError::Storage("DEK unknown to shredder"));
460 }
461 Err(DekShredError::Backend(msg)) => return Err(ProjectionError::Storage(msg)),
462 },
463 None => ShredResult {
464 regions: Vec::new(),
465 overall: DekShredAttestation {
466 attestation_class: RuntimeSignatureClass::None,
467 attestation_bytes: Bytes::new(),
468 log_index: None,
469 },
470 },
471 };
472
473 self.rows.tombstone(user)?;
477
478 self.completions.push(ErasureCompletion {
479 user,
480 completed_tick: ctx.tick,
481 tombstoned_rows: tombstoned,
482 attestation: result.overall,
483 regions: result.regions,
484 });
485
486 self.cursor = Some(ProjectionCursor {
487 sequence: event.sequence,
488 tick: event.tick,
489 });
490 Ok(())
491 }
492
493 fn on_state_change(&mut self, _new_state: ObserverState) -> Result<(), ProjectionError> {
494 Ok(())
498 }
499
500 fn last_applied(&self) -> Option<(u64, Tick)> {
501 self.cursor.map(|c| (c.sequence, c.tick))
502 }
503}
504
// Unit tests for the erasure cascade: the in-memory doubles, the observer's
// success and failure paths, and the event-building helpers.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::projection::ProjectionRouter;
    use arkhe_kernel::abi::{EntityId, InstanceId};

    /// Builds a `UserId` from a raw entity id.
    fn uid(v: u64) -> UserId {
        UserId::new(EntityId::new(v).unwrap())
    }

    /// Builds an `EventRecord` carrying a postcard-encoded
    /// `UserErasureScheduled` for `user` at the given sequence and tick.
    fn make_scheduled_event(user: UserId, seq: u64, tick: u64) -> EventRecord {
        let ev = UserErasureScheduled {
            schema_version: 1,
            user,
            scheduled_tick: Tick(tick),
        };
        EventRecord {
            type_code: UserErasureScheduled::TYPE_CODE,
            sequence: seq,
            tick: Tick(tick),
            payload: Bytes::from(postcard::to_stdvec(&ev).unwrap()),
        }
    }

    /// Builds a projection context at `tick` for instance 1.
    fn ctx(tick: u64) -> ProjectionContext<'static> {
        ProjectionContext::new(Tick(tick), InstanceId::new(1).unwrap())
    }

    /// The observer subscribes to exactly one type code.
    #[test]
    fn observer_observes_user_erasure_scheduled_only() {
        let obs = ErasureCascadeObserver::new(
            Box::new(InMemoryPiiRowStore::new()),
            Box::new(InMemoryDekShredder::new()),
        );
        assert_eq!(obs.observes(), &[TypeCode(UserErasureScheduled::TYPE_CODE)]);
    }

    /// Happy path: rows are tombstoned, the DEK is shredded, and one
    /// completion with the expected fields is queued.
    #[test]
    fn cascade_tombstones_rows_and_shreds_dek() {
        let mut store = InMemoryPiiRowStore::new();
        let user = uid(42);
        let dek_id = DekId([0xAB; 16]);
        store.upsert(user, vec![10, 11, 12], dek_id);
        let mut shredder = InMemoryDekShredder::new();
        shredder.register(dek_id);
        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(shredder));

        obs.on_event(&make_scheduled_event(user, 0, 100), &ctx(100))
            .unwrap();

        assert!(obs.pii_rows().is_tombstoned(user));
        let completions = obs.drain_completions();
        assert_eq!(completions.len(), 1);
        assert_eq!(completions[0].user, user);
        assert_eq!(completions[0].tombstoned_rows, 3);
        assert_eq!(completions[0].completed_tick, Tick(100));
        assert_eq!(
            completions[0].attestation.attestation_class,
            RuntimeSignatureClass::Ed25519
        );
    }

    /// A user with no stored rows (and therefore no DEK) still yields a
    /// completion, carrying an empty attestation.
    #[test]
    fn cascade_no_rows_still_emits_completion() {
        let obs_store = InMemoryPiiRowStore::new();
        let shredder = InMemoryDekShredder::new();
        let mut obs = ErasureCascadeObserver::new(Box::new(obs_store), Box::new(shredder));
        let user = uid(7);
        obs.on_event(&make_scheduled_event(user, 0, 5), &ctx(5))
            .unwrap();
        let completions = obs.drain_completions();
        assert_eq!(completions.len(), 1);
        assert_eq!(completions[0].tombstoned_rows, 0);
        assert_eq!(
            completions[0].attestation.attestation_class,
            RuntimeSignatureClass::None
        );
        // No shred happened, so there is no log index at all.
        assert_eq!(completions[0].attestation.log_index, None);
    }

    /// The first real shred gets `Some(0)`, which must stay distinguishable
    /// from the `None` reported for a user without a DEK.
    #[test]
    fn first_shred_log_index_is_some_zero_distinct_from_no_dek() {
        let mut store = InMemoryPiiRowStore::new();
        let user_real = uid(101);
        let dek_id = DekId([0xAA; 16]);
        store.upsert(user_real, vec![1], dek_id);
        let mut shredder = InMemoryDekShredder::new();
        shredder.register(dek_id);
        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(shredder));

        obs.on_event(&make_scheduled_event(user_real, 0, 50), &ctx(50))
            .unwrap();
        let real = obs.drain_completions();
        assert_eq!(real[0].attestation.log_index, Some(0));

        // This user was never upserted, so the observer finds no DEK.
        let user_empty = uid(202);
        obs.on_event(&make_scheduled_event(user_empty, 1, 51), &ctx(51))
            .unwrap();
        let empty = obs.drain_completions();
        assert_eq!(empty[0].attestation.log_index, None);
    }

    /// A DEK referenced by the row store but never registered with the
    /// shredder surfaces as a storage error and leaves the user untouched.
    #[test]
    fn cascade_unknown_dek_surfaces_storage_error() {
        let mut store = InMemoryPiiRowStore::new();
        let user = uid(9);
        store.upsert(user, vec![1], DekId([0x99; 16]));
        // Deliberately no `register` call on the shredder.
        let shredder = InMemoryDekShredder::new();
        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(shredder));
        let err = obs
            .on_event(&make_scheduled_event(user, 0, 5), &ctx(5))
            .unwrap_err();
        assert!(matches!(err, ProjectionError::Storage(_)));
        assert!(!obs.pii_rows().is_tombstoned(user));
    }

    /// A backend failure during the shred must not tombstone rows or queue a
    /// completion.
    #[test]
    fn shred_failure_keeps_rows_live() {
        // Shredder double that always fails with a backend error.
        struct FailingShredder;
        impl DekShredder for FailingShredder {
            fn shred(&mut self, _dek_id: DekId) -> Result<DekShredAttestation, DekShredError> {
                Err(DekShredError::Backend("inject: KMS unavailable"))
            }
        }

        let mut store = InMemoryPiiRowStore::new();
        let user = uid(777);
        let dek_id = DekId([0xAA; 16]);
        store.upsert(user, vec![1, 2, 3], dek_id);
        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(FailingShredder));

        let err = obs
            .on_event(&make_scheduled_event(user, 0, 10), &ctx(10))
            .unwrap_err();
        assert!(matches!(err, ProjectionError::Storage(_)));

        // Rows are still present and not tombstoned.
        assert!(!obs.pii_rows().is_tombstoned(user));
        assert_eq!(obs.pii_rows().rows_for(user).rows.len(), 3);

        // Nothing was queued for the failed cascade.
        assert!(obs.drain_completions().is_empty());
    }

    /// A shredder that returns `AlreadyShredded` is reported as a storage
    /// error; nothing is tombstoned or queued.
    #[test]
    fn already_shredded_surfaces_as_storage_error() {
        // Shredder double that always reports `AlreadyShredded`.
        struct BrokenShredder;
        impl DekShredder for BrokenShredder {
            fn shred(&mut self, _dek_id: DekId) -> Result<DekShredAttestation, DekShredError> {
                Err(DekShredError::AlreadyShredded)
            }
        }

        let mut store = InMemoryPiiRowStore::new();
        let user = uid(999);
        let dek_id = DekId([0xCC; 16]);
        store.upsert(user, vec![1], dek_id);
        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(BrokenShredder));

        let err = obs
            .on_event(&make_scheduled_event(user, 0, 30), &ctx(30))
            .unwrap_err();
        assert!(matches!(err, ProjectionError::Storage(_)));
        assert!(!obs.pii_rows().is_tombstoned(user));
        assert!(obs.drain_completions().is_empty());
    }

    /// Delivering a second scheduled event for an already erased user keeps
    /// the user tombstoned and yields a completion with an empty attestation.
    #[test]
    fn cascade_replay_after_tombstone_holds_rows_dead() {
        let mut store = InMemoryPiiRowStore::new();
        let user = uid(1234);
        let dek_id = DekId([0xEF; 16]);
        store.upsert(user, vec![1, 2], dek_id);
        let mut shredder = InMemoryDekShredder::new();
        shredder.register(dek_id);
        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(shredder));

        obs.on_event(&make_scheduled_event(user, 0, 40), &ctx(40))
            .unwrap();
        let first = obs.drain_completions();
        assert_eq!(first.len(), 1);
        assert_eq!(
            first[0].attestation.attestation_class,
            RuntimeSignatureClass::Ed25519
        );
        assert!(obs.pii_rows().is_tombstoned(user));

        // Second delivery: the store entry is gone, so no DEK is found.
        obs.on_event(&make_scheduled_event(user, 1, 41), &ctx(41))
            .unwrap();
        let replayed = obs.drain_completions();
        assert_eq!(replayed.len(), 1);
        assert_eq!(
            replayed[0].attestation.attestation_class,
            RuntimeSignatureClass::None
        );
        assert!(obs.pii_rows().is_tombstoned(user));
    }

    /// The observer can be registered with a `ProjectionRouter` and receives
    /// the dispatched event.
    #[test]
    fn cascade_participates_in_projection_router_dispatch() {
        let mut store = InMemoryPiiRowStore::new();
        let user = uid(123);
        let dek_id = DekId([0xEE; 16]);
        store.upsert(user, vec![1, 2], dek_id);
        let mut shredder = InMemoryDekShredder::new();
        shredder.register(dek_id);

        let mut router = ProjectionRouter::new();
        router.promote_to_active().unwrap();
        router.register(Box::new(ErasureCascadeObserver::new(
            Box::new(store),
            Box::new(shredder),
        )));

        let applied = router
            .dispatch(&make_scheduled_event(user, 0, 300), &ctx(300))
            .unwrap();
        assert_eq!(applied, 1);
    }

    /// `into_completed_event` copies the expected fields and the resulting
    /// event survives a postcard encode/decode round trip.
    #[test]
    fn completed_event_roundtrip_via_helper() {
        let completion = ErasureCompletion {
            user: uid(1),
            completed_tick: Tick(250),
            tombstoned_rows: 4,
            attestation: DekShredAttestation {
                attestation_class: RuntimeSignatureClass::Hybrid,
                attestation_bytes: Bytes::from_static(&[0u8; 128]),
                log_index: Some(7),
            },
            regions: Vec::new(),
        };
        let event = ErasureCascadeObserver::into_completed_event(&completion, 1, 99);
        assert_eq!(event.user, uid(1));
        assert_eq!(event.dek_shred_tick, Tick(250));
        assert_eq!(event.attestation_class, RuntimeSignatureClass::Hybrid);
        // The index comes from the argument, not from `attestation.log_index`.
        assert_eq!(event.transparency_log_index, 99);

        let bytes = postcard::to_stdvec(&event).unwrap();
        let back: UserErasureCompleted = postcard::from_bytes(&bytes).unwrap();
        assert_eq!(back, event);
    }

    /// With the default `shred_with_regions`, a completion carries exactly one
    /// region entry, and the derived event round-trips through postcard.
    #[test]
    fn per_region_events_default_emits_one_entry_per_completion() {
        let mut store = InMemoryPiiRowStore::new();
        let user = uid(11);
        let dek_id = DekId([0xAA; 16]);
        store.upsert(user, vec![1, 2, 3], dek_id);
        let mut shredder = InMemoryDekShredder::new();
        shredder.register(dek_id);
        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(shredder));

        obs.on_event(&make_scheduled_event(user, 0, 100), &ctx(100))
            .unwrap();
        let completions = obs.drain_completions();
        assert_eq!(completions.len(), 1);
        let completion = &completions[0];
        assert_eq!(
            completion.regions.len(),
            1,
            "single-region default emits 1 entry"
        );
        let region = &completion.regions[0];
        assert!(matches!(region.scope, ProgressScope::Region(_)));
        assert_eq!(region.shred_tick, Tick(100));
        assert_eq!(region.attestation_class, RuntimeSignatureClass::Ed25519);

        let events = ErasureCascadeObserver::per_region_events(completion, 1);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].user, user);
        assert_eq!(events[0].shred_tick, Tick(100));

        let bytes = postcard::to_stdvec(&events[0]).unwrap();
        let back: PerRegionErasureProgress = postcard::from_bytes(&bytes).unwrap();
        assert_eq!(back, events[0]);
    }

    /// A user without a DEK produces a completion with no regions, and hence
    /// no per-region events.
    #[test]
    fn per_region_events_no_dek_user_emits_zero_entries() {
        let store = InMemoryPiiRowStore::new();
        let user = uid(12);
        let shredder = InMemoryDekShredder::new();
        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(shredder));

        obs.on_event(&make_scheduled_event(user, 0, 200), &ctx(200))
            .unwrap();
        let completions = obs.drain_completions();
        assert_eq!(completions.len(), 1);
        assert!(completions[0].regions.is_empty());
        let events = ErasureCascadeObserver::per_region_events(&completions[0], 1);
        assert!(events.is_empty());
    }

    /// End to end: the event produced by computing a `GdprEraseUser` action is
    /// fed through a router and applied by the cascade observer.
    #[test]
    fn e_user_3_cascade_activates_end_to_end() {
        use arkhe_forge_core::action::ActionCompute;
        use arkhe_forge_core::context::ActionContext as L1ActionContext;
        use arkhe_forge_core::user::GdprEraseUser;
        use arkhe_kernel::abi::{CapabilityMask, Principal};

        let user = uid(7777);

        // Step 1: compute the action and capture the single event it emits.
        let act = GdprEraseUser {
            schema_version: 1,
            user,
        };
        let mut l1 = L1ActionContext::new(
            [0u8; 32],
            InstanceId::new(1).unwrap(),
            Tick(100),
            Principal::System,
            CapabilityMask::SYSTEM,
        );
        act.compute(&mut l1).unwrap();
        let mut events = l1.drain_events();
        assert_eq!(events.len(), 1);
        let scheduling_record = events.pop().unwrap();

        // Step 2: set up a store, shredder and router around the observer.
        let mut store = InMemoryPiiRowStore::new();
        let dek_id = DekId([0xCD; 16]);
        store.upsert(user, vec![100, 101, 102, 103], dek_id);
        let mut shredder = InMemoryDekShredder::new();
        shredder.register(dek_id);
        let mut router = ProjectionRouter::new();
        router.promote_to_active().unwrap();
        router.register(Box::new(ErasureCascadeObserver::new(
            Box::new(store),
            Box::new(shredder),
        )));
        // Step 3: re-wrap the emitted record with sequence 0 and dispatch it.
        let event_record = EventRecord {
            type_code: scheduling_record.type_code,
            sequence: 0,
            tick: scheduling_record.tick,
            payload: scheduling_record.payload.clone(),
        };
        let applied = router.dispatch(&event_record, &ctx(100)).unwrap();
        assert_eq!(applied, 1);
    }
}