use arkhe_forge_core::context::EventRecord;
use arkhe_forge_core::event::{
ArkheEvent, PerRegionErasureProgress, ProgressScope, RuntimeSignatureClass,
UserErasureCompleted, UserErasureScheduled,
};
use arkhe_forge_core::pii::DekId;
use arkhe_forge_core::user::UserId;
use arkhe_kernel::abi::{Tick, TypeCode};
use bytes::Bytes;
use std::collections::HashMap;
use crate::projection::{
ObserverState, Projection, ProjectionContext, ProjectionCursor, ProjectionError,
};
/// Irreversibly destroys data-encryption keys (DEKs) and returns a
/// verifiable attestation of the destruction.
///
/// Implementations should be idempotent on replay: shredding an
/// already-shredded DEK ought to return the cached attestation rather than
/// `DekShredError::AlreadyShredded` — the cascade observer treats
/// `AlreadyShredded` as a storage fault (see `ErasureCascadeObserver`).
pub trait DekShredder: Send + Sync {
    /// Shreds `dek_id`, returning the attestation for the destruction.
    fn shred(&mut self, dek_id: DekId) -> Result<DekShredAttestation, DekShredError>;

    /// Shreds `dek_id` and additionally reports per-region progress.
    ///
    /// The default implementation performs one overall shred and maps it onto
    /// a single synthetic "default-region" entry stamped with `shred_tick`.
    /// Multi-region backends override this to report real regional progress.
    fn shred_with_regions(
        &mut self,
        dek_id: DekId,
        shred_tick: Tick,
    ) -> Result<ShredResult, DekShredError> {
        let overall = self.shred(dek_id)?;
        // `regions` is built first so it can copy/clone from `overall`
        // before `overall` itself is moved into the result.
        Ok(ShredResult {
            regions: vec![RegionProgress {
                scope: default_region_scope(),
                shred_tick,
                attestation_class: overall.attestation_class,
                attestation_bytes: overall.attestation_bytes.clone(),
            }],
            overall,
        })
    }
}
/// Per-region shred progress: where, when, and with what attestation a DEK
/// copy was destroyed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RegionProgress {
    /// Region (or other scope) this progress entry applies to.
    pub scope: ProgressScope,
    /// Tick at which the shred took effect within this scope.
    pub shred_tick: Tick,
    /// Signature-scheme class of `attestation_bytes`.
    pub attestation_class: RuntimeSignatureClass,
    /// Opaque attestation payload for this region's shred.
    pub attestation_bytes: Bytes,
}
/// Outcome of `DekShredder::shred_with_regions`: the overall attestation plus
/// zero or more per-region progress entries.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShredResult {
    /// Per-region progress; empty when no regional breakdown exists.
    pub regions: Vec<RegionProgress>,
    /// Overall attestation covering the whole shred operation.
    pub overall: DekShredAttestation,
}
/// Scope used by the default single-region `shred_with_regions`
/// implementation.
#[allow(clippy::expect_used)]
fn default_region_scope() -> ProgressScope {
    // "default-region" is 14 bytes, well under the 64-byte cap, so this
    // `expect` can never fire at runtime.
    let label = arkhe_forge_core::component::BoundedString::<64>::new("default-region")
        .expect("'default-region' is 14 bytes, within the BoundedString<64> cap");
    ProgressScope::Region(label)
}
/// Proof that a DEK was destroyed, suitable for publishing in a
/// `UserErasureCompleted` event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DekShredAttestation {
    /// Signature-scheme class of `attestation_bytes`; `None` when a user had
    /// no DEK to shred (see `ErasureCascadeObserver::on_event`).
    pub attestation_class: RuntimeSignatureClass,
    /// Opaque attestation payload; empty when there was nothing to shred.
    pub attestation_bytes: Bytes,
    /// Position in the shredder's transparency log, if any. `Some(0)` is a
    /// real first entry and is deliberately distinct from `None` (no DEK).
    pub log_index: Option<u64>,
}
/// Failure modes of a `DekShredder`.
///
/// Marked `#[non_exhaustive]` so new backend failure variants can be added
/// without breaking downstream matches.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum DekShredError {
    /// The DEK id was never registered with (or is unknown to) the shredder.
    #[error("DEK id unknown to the shredder")]
    UnknownDek,
    /// The DEK was already shredded. Well-behaved implementations cache and
    /// return the prior attestation instead of raising this.
    #[error("DEK already shredded")]
    AlreadyShredded,
    /// Backend (e.g. KMS) failure with a static description.
    #[error("shredder backend error: {0}")]
    Backend(&'static str),
}
/// In-memory `DekShredder` for tests and local runs: no real key material is
/// destroyed; attestations are deterministic keyed hashes.
#[derive(Debug, Default)]
pub struct InMemoryDekShredder {
    // HashMap used as a set of live (registered, not-yet-shredded) DEK ids.
    live: HashMap<DekId, ()>,
    // Shredded DEKs with their cached attestations, so replays are idempotent.
    shredded: HashMap<DekId, DekShredAttestation>,
    // Monotonic transparency-log index handed out to successive shreds.
    next_log_index: u64,
}
impl InMemoryDekShredder {
    /// Creates an empty shredder with no registered DEKs.
    #[inline]
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers `dek_id` as live so a later `shred` call can destroy it.
    pub fn register(&mut self, dek_id: DekId) {
        self.live.insert(dek_id, ());
    }

    /// Returns `true` once `dek_id` has been shredded.
    #[must_use]
    pub fn is_shredded(&self, dek_id: &DekId) -> bool {
        self.shredded.contains_key(dek_id)
    }

    /// Mints a deterministic test attestation bound to `dek_id` and to the
    /// next transparency-log index.
    fn issue_attestation(&mut self, dek_id: DekId) -> DekShredAttestation {
        let index = self.next_log_index;
        // Saturating add: the counter pins at u64::MAX rather than wrapping.
        self.next_log_index = index.saturating_add(1);
        // Keying the hash on the DEK id means two different DEKs can never
        // produce colliding attestations from the index alone.
        let key = blake3::derive_key("arkhe-forge-dek-shred-attestation", &dek_id.0);
        let digest = blake3::Hasher::new_keyed(&key)
            .update(&index.to_be_bytes())
            .finalize();
        DekShredAttestation {
            attestation_class: RuntimeSignatureClass::Ed25519,
            attestation_bytes: Bytes::copy_from_slice(digest.as_bytes()),
            log_index: Some(index),
        }
    }
}
impl DekShredder for InMemoryDekShredder {
    /// Shreds a registered DEK. Idempotent: a replay for an already-shredded
    /// DEK returns the cached attestation instead of an error.
    fn shred(&mut self, dek_id: DekId) -> Result<DekShredAttestation, DekShredError> {
        if let Some(existing) = self.shredded.get(&dek_id) {
            return Ok(existing.clone());
        }
        // Must have been registered (live) to be shredded at all.
        self.live
            .remove(&dek_id)
            .ok_or(DekShredError::UnknownDek)?;
        let attestation = self.issue_attestation(dek_id);
        self.shredded.insert(dek_id, attestation.clone());
        Ok(attestation)
    }
}
/// A user's PII row handles plus the DEK protecting them.
///
/// The `Default` value (no rows, no DEK) doubles as the "unknown or already
/// erased user" answer from `PiiRowStore::rows_for`.
#[derive(Debug, Clone, Default)]
pub struct UserPiiRows {
    /// Opaque row identifiers holding the user's PII.
    pub rows: Vec<u64>,
    /// DEK encrypting those rows; `None` when the user has no PII/DEK.
    pub dek_id: Option<DekId>,
}
/// Storage of per-user PII rows with tombstoning support for erasure.
pub trait PiiRowStore: Send + Sync {
    /// Returns the user's rows; a default (empty) value when unknown.
    fn rows_for(&self, user: UserId) -> UserPiiRows;
    /// Removes the user's rows and marks the user permanently erased.
    fn tombstone(&mut self, user: UserId) -> Result<(), ProjectionError>;
    /// Whether `tombstone` has been applied for this user.
    fn is_tombstoned(&self, user: UserId) -> bool;
}
/// In-memory `PiiRowStore` for tests and local runs.
#[derive(Debug, Default)]
pub struct InMemoryPiiRowStore {
    // Live per-user rows.
    users: HashMap<UserId, UserPiiRows>,
    // HashMap used as a set of erased users; survives row removal so replays
    // can observe the tombstone.
    tombstoned: HashMap<UserId, ()>,
}
impl InMemoryPiiRowStore {
    /// Creates an empty store.
    #[inline]
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Inserts or replaces the PII rows (and their DEK) for `user`.
    pub fn upsert(&mut self, user: UserId, rows: Vec<u64>, dek_id: DekId) {
        let entry = UserPiiRows {
            rows,
            dek_id: Some(dek_id),
        };
        self.users.insert(user, entry);
    }
}
impl PiiRowStore for InMemoryPiiRowStore {
fn rows_for(&self, user: UserId) -> UserPiiRows {
self.users.get(&user).cloned().unwrap_or_default()
}
fn tombstone(&mut self, user: UserId) -> Result<(), ProjectionError> {
self.users.remove(&user);
self.tombstoned.insert(user, ());
Ok(())
}
fn is_tombstoned(&self, user: UserId) -> bool {
self.tombstoned.contains_key(&user)
}
}
/// Record of one finished erasure cascade for a single user.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ErasureCompletion {
    /// The erased user.
    pub user: UserId,
    /// Tick at which the cascade completed (the projection context's tick).
    pub completed_tick: Tick,
    /// How many PII rows were tombstoned (0 for unknown/replayed users).
    pub tombstoned_rows: usize,
    /// Overall DEK-shred attestation (class `None` when no DEK existed).
    pub attestation: DekShredAttestation,
    /// Per-region shred progress (empty when no DEK existed).
    pub regions: Vec<RegionProgress>,
}
/// Projection that reacts to `UserErasureScheduled` events by shredding the
/// user's DEK and tombstoning their PII rows, accumulating completions for
/// later publication.
pub struct ErasureCascadeObserver {
    // Single type code this projection subscribes to.
    observes: [TypeCode; 1],
    // Last applied (sequence, tick); `None` until the first event.
    cursor: Option<ProjectionCursor>,
    // PII row storage to tombstone.
    rows: Box<dyn PiiRowStore>,
    // DEK destruction backend.
    shredder: Box<dyn DekShredder>,
    // Completions accumulated since the last `drain_completions`.
    completions: Vec<ErasureCompletion>,
}
impl ErasureCascadeObserver {
    /// Builds an observer over the given row store and shredder, subscribed
    /// to `UserErasureScheduled` only.
    #[must_use]
    pub fn new(rows: Box<dyn PiiRowStore>, shredder: Box<dyn DekShredder>) -> Self {
        Self {
            observes: [TypeCode(UserErasureScheduled::TYPE_CODE)],
            cursor: None,
            rows,
            shredder,
            completions: Vec::new(),
        }
    }

    /// Takes and returns every completion accumulated since the last drain,
    /// leaving the internal buffer empty.
    pub fn drain_completions(&mut self) -> Vec<ErasureCompletion> {
        core::mem::take(&mut self.completions)
    }

    /// Read-only view of the underlying PII row store.
    #[must_use]
    pub fn pii_rows(&self) -> &dyn PiiRowStore {
        &*self.rows
    }

    /// Read-only view of the underlying DEK shredder.
    #[must_use]
    pub fn shredder(&self) -> &dyn DekShredder {
        &*self.shredder
    }

    /// Maps a completion onto the externally published
    /// `UserErasureCompleted` event.
    ///
    /// NOTE(review): despite the `into_` prefix this only borrows the
    /// completion; callers retain ownership.
    #[must_use]
    pub fn into_completed_event(
        completion: &ErasureCompletion,
        schema_version: u16,
        transparency_log_index: u64,
    ) -> UserErasureCompleted {
        UserErasureCompleted {
            schema_version,
            user: completion.user,
            dek_shred_tick: completion.completed_tick,
            attestation_class: completion.attestation.attestation_class,
            attestation_bytes: completion.attestation.attestation_bytes.clone(),
            transparency_log_index,
        }
    }

    /// Expands a completion's regional progress into one
    /// `PerRegionErasureProgress` event per region, preserving order.
    #[must_use]
    pub fn per_region_events(
        completion: &ErasureCompletion,
        schema_version: u16,
    ) -> Vec<PerRegionErasureProgress> {
        let mut events = Vec::with_capacity(completion.regions.len());
        for region in &completion.regions {
            events.push(PerRegionErasureProgress {
                schema_version,
                user: completion.user,
                scope: region.scope.clone(),
                shred_tick: region.shred_tick,
                attestation_class: region.attestation_class,
                attestation_bytes: region.attestation_bytes.clone(),
            });
        }
        events
    }
}
impl Projection for ErasureCascadeObserver {
    fn observes(&self) -> &[TypeCode] {
        &self.observes
    }

    /// Runs the erasure cascade for one `UserErasureScheduled` event.
    ///
    /// Ordering matters: the DEK is shredded *before* the rows are
    /// tombstoned, so any shred failure leaves the rows live and the event
    /// fully retryable.
    fn on_event(
        &mut self,
        event: &EventRecord,
        ctx: &ProjectionContext<'_>,
    ) -> Result<(), ProjectionError> {
        let scheduled: UserErasureScheduled = postcard::from_bytes(&event.payload)
            .map_err(|_| ProjectionError::DecodeFailed("UserErasureScheduled payload"))?;
        let user = scheduled.user;

        let pii = self.rows.rows_for(user);
        let tombstoned_rows = pii.rows.len();

        // A user without a DEK (unknown, or a replay after tombstoning)
        // still yields a completion — just with an empty attestation and no
        // regional progress.
        let result: ShredResult = if let Some(dek_id) = pii.dek_id {
            self.shredder
                .shred_with_regions(dek_id, ctx.tick)
                .map_err(|e| match e {
                    DekShredError::AlreadyShredded => ProjectionError::Storage(
                        "shredder returned AlreadyShredded; implementations must cache attestation",
                    ),
                    DekShredError::UnknownDek => {
                        ProjectionError::Storage("DEK unknown to shredder")
                    }
                    DekShredError::Backend(msg) => ProjectionError::Storage(msg),
                })?
        } else {
            ShredResult {
                regions: Vec::new(),
                overall: DekShredAttestation {
                    attestation_class: RuntimeSignatureClass::None,
                    attestation_bytes: Bytes::new(),
                    log_index: None,
                },
            }
        };

        self.rows.tombstone(user)?;
        self.completions.push(ErasureCompletion {
            user,
            completed_tick: ctx.tick,
            tombstoned_rows,
            attestation: result.overall,
            regions: result.regions,
        });
        self.cursor = Some(ProjectionCursor {
            sequence: event.sequence,
            tick: event.tick,
        });
        Ok(())
    }

    fn on_state_change(&mut self, _new_state: ObserverState) -> Result<(), ProjectionError> {
        Ok(())
    }

    fn last_applied(&self) -> Option<(u64, Tick)> {
        self.cursor.map(|c| (c.sequence, c.tick))
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    //! Behavioral tests for the erasure cascade: shred ordering, idempotent
    //! replay, error propagation, and event round-trips.
    use super::*;
    use crate::projection::ProjectionRouter;
    use arkhe_kernel::abi::{EntityId, InstanceId};

    /// Builds a `UserId` from a raw u64 (test ids are always valid/nonzero).
    fn uid(v: u64) -> UserId {
        UserId::new(EntityId::new(v).unwrap())
    }

    /// Encodes a `UserErasureScheduled` into an `EventRecord` at the given
    /// sequence and tick.
    fn make_scheduled_event(user: UserId, seq: u64, tick: u64) -> EventRecord {
        let ev = UserErasureScheduled {
            schema_version: 1,
            user,
            scheduled_tick: Tick(tick),
        };
        EventRecord {
            type_code: UserErasureScheduled::TYPE_CODE,
            sequence: seq,
            tick: Tick(tick),
            payload: Bytes::from(postcard::to_stdvec(&ev).unwrap()),
        }
    }

    /// Projection context at the given tick for instance 1.
    fn ctx(tick: u64) -> ProjectionContext<'static> {
        ProjectionContext::new(Tick(tick), InstanceId::new(1).unwrap())
    }

    // Subscription list is exactly the one scheduled-erasure type code.
    #[test]
    fn observer_observes_user_erasure_scheduled_only() {
        let obs = ErasureCascadeObserver::new(
            Box::new(InMemoryPiiRowStore::new()),
            Box::new(InMemoryDekShredder::new()),
        );
        assert_eq!(obs.observes(), &[TypeCode(UserErasureScheduled::TYPE_CODE)]);
    }

    // Happy path: rows tombstoned, DEK shredded, completion recorded.
    #[test]
    fn cascade_tombstones_rows_and_shreds_dek() {
        let mut store = InMemoryPiiRowStore::new();
        let user = uid(42);
        let dek_id = DekId([0xAB; 16]);
        store.upsert(user, vec![10, 11, 12], dek_id);
        let mut shredder = InMemoryDekShredder::new();
        shredder.register(dek_id);
        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(shredder));
        obs.on_event(&make_scheduled_event(user, 0, 100), &ctx(100))
            .unwrap();
        assert!(obs.pii_rows().is_tombstoned(user));
        let completions = obs.drain_completions();
        assert_eq!(completions.len(), 1);
        assert_eq!(completions[0].user, user);
        assert_eq!(completions[0].tombstoned_rows, 3);
        assert_eq!(completions[0].completed_tick, Tick(100));
        assert_eq!(
            completions[0].attestation.attestation_class,
            RuntimeSignatureClass::Ed25519
        );
    }

    // Unknown user (no rows, no DEK) still completes, with empty attestation.
    #[test]
    fn cascade_no_rows_still_emits_completion() {
        let obs_store = InMemoryPiiRowStore::new();
        let shredder = InMemoryDekShredder::new();
        let mut obs = ErasureCascadeObserver::new(Box::new(obs_store), Box::new(shredder));
        let user = uid(7);
        obs.on_event(&make_scheduled_event(user, 0, 5), &ctx(5))
            .unwrap();
        let completions = obs.drain_completions();
        assert_eq!(completions.len(), 1);
        assert_eq!(completions[0].tombstoned_rows, 0);
        assert_eq!(
            completions[0].attestation.attestation_class,
            RuntimeSignatureClass::None
        );
        assert_eq!(completions[0].attestation.log_index, None);
    }

    // Some(0) (a real first log entry) must stay distinct from None (no DEK).
    #[test]
    fn first_shred_log_index_is_some_zero_distinct_from_no_dek() {
        let mut store = InMemoryPiiRowStore::new();
        let user_real = uid(101);
        let dek_id = DekId([0xAA; 16]);
        store.upsert(user_real, vec![1], dek_id);
        let mut shredder = InMemoryDekShredder::new();
        shredder.register(dek_id);
        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(shredder));
        obs.on_event(&make_scheduled_event(user_real, 0, 50), &ctx(50))
            .unwrap();
        let real = obs.drain_completions();
        assert_eq!(real[0].attestation.log_index, Some(0));
        let user_empty = uid(202);
        obs.on_event(&make_scheduled_event(user_empty, 1, 51), &ctx(51))
            .unwrap();
        let empty = obs.drain_completions();
        assert_eq!(empty[0].attestation.log_index, None);
    }

    // DEK never registered with the shredder surfaces as a storage error and
    // leaves the rows un-tombstoned.
    #[test]
    fn cascade_unknown_dek_surfaces_storage_error() {
        let mut store = InMemoryPiiRowStore::new();
        let user = uid(9);
        store.upsert(user, vec![1], DekId([0x99; 16]));
        let shredder = InMemoryDekShredder::new();
        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(shredder));
        let err = obs
            .on_event(&make_scheduled_event(user, 0, 5), &ctx(5))
            .unwrap_err();
        assert!(matches!(err, ProjectionError::Storage(_)));
        assert!(!obs.pii_rows().is_tombstoned(user));
    }

    // Shred-before-tombstone ordering: a backend failure must leave all rows
    // live and produce no completion.
    #[test]
    fn shred_failure_keeps_rows_live() {
        struct FailingShredder;
        impl DekShredder for FailingShredder {
            fn shred(&mut self, _dek_id: DekId) -> Result<DekShredAttestation, DekShredError> {
                Err(DekShredError::Backend("inject: KMS unavailable"))
            }
        }
        let mut store = InMemoryPiiRowStore::new();
        let user = uid(777);
        let dek_id = DekId([0xAA; 16]);
        store.upsert(user, vec![1, 2, 3], dek_id);
        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(FailingShredder));
        let err = obs
            .on_event(&make_scheduled_event(user, 0, 10), &ctx(10))
            .unwrap_err();
        assert!(matches!(err, ProjectionError::Storage(_)));
        assert!(!obs.pii_rows().is_tombstoned(user));
        assert_eq!(obs.pii_rows().rows_for(user).rows.len(), 3);
        assert!(obs.drain_completions().is_empty());
    }

    // AlreadyShredded is a contract violation (implementations must cache the
    // attestation) and is reported as a storage error.
    #[test]
    fn already_shredded_surfaces_as_storage_error() {
        struct BrokenShredder;
        impl DekShredder for BrokenShredder {
            fn shred(&mut self, _dek_id: DekId) -> Result<DekShredAttestation, DekShredError> {
                Err(DekShredError::AlreadyShredded)
            }
        }
        let mut store = InMemoryPiiRowStore::new();
        let user = uid(999);
        let dek_id = DekId([0xCC; 16]);
        store.upsert(user, vec![1], dek_id);
        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(BrokenShredder));
        let err = obs
            .on_event(&make_scheduled_event(user, 0, 30), &ctx(30))
            .unwrap_err();
        assert!(matches!(err, ProjectionError::Storage(_)));
        assert!(!obs.pii_rows().is_tombstoned(user));
        assert!(obs.drain_completions().is_empty());
    }

    // Replaying the scheduled event after a tombstone completes again with an
    // empty attestation and keeps the rows dead.
    #[test]
    fn cascade_replay_after_tombstone_holds_rows_dead() {
        let mut store = InMemoryPiiRowStore::new();
        let user = uid(1234);
        let dek_id = DekId([0xEF; 16]);
        store.upsert(user, vec![1, 2], dek_id);
        let mut shredder = InMemoryDekShredder::new();
        shredder.register(dek_id);
        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(shredder));
        obs.on_event(&make_scheduled_event(user, 0, 40), &ctx(40))
            .unwrap();
        let first = obs.drain_completions();
        assert_eq!(first.len(), 1);
        assert_eq!(
            first[0].attestation.attestation_class,
            RuntimeSignatureClass::Ed25519
        );
        assert!(obs.pii_rows().is_tombstoned(user));
        obs.on_event(&make_scheduled_event(user, 1, 41), &ctx(41))
            .unwrap();
        let replayed = obs.drain_completions();
        assert_eq!(replayed.len(), 1);
        assert_eq!(
            replayed[0].attestation.attestation_class,
            RuntimeSignatureClass::None
        );
        assert!(obs.pii_rows().is_tombstoned(user));
    }

    // The observer dispatches correctly when registered with the router.
    #[test]
    fn cascade_participates_in_projection_router_dispatch() {
        let mut store = InMemoryPiiRowStore::new();
        let user = uid(123);
        let dek_id = DekId([0xEE; 16]);
        store.upsert(user, vec![1, 2], dek_id);
        let mut shredder = InMemoryDekShredder::new();
        shredder.register(dek_id);
        let mut router = ProjectionRouter::new();
        router.promote_to_active().unwrap();
        router.register(Box::new(ErasureCascadeObserver::new(
            Box::new(store),
            Box::new(shredder),
        )));
        let applied = router
            .dispatch(&make_scheduled_event(user, 0, 300), &ctx(300))
            .unwrap();
        assert_eq!(applied, 1);
    }

    // Helper-built UserErasureCompleted survives a postcard round-trip.
    #[test]
    fn completed_event_roundtrip_via_helper() {
        let completion = ErasureCompletion {
            user: uid(1),
            completed_tick: Tick(250),
            tombstoned_rows: 4,
            attestation: DekShredAttestation {
                attestation_class: RuntimeSignatureClass::Hybrid,
                attestation_bytes: Bytes::from_static(&[0u8; 128]),
                log_index: Some(7),
            },
            regions: Vec::new(),
        };
        let event = ErasureCascadeObserver::into_completed_event(&completion, 1, 99);
        assert_eq!(event.user, uid(1));
        assert_eq!(event.dek_shred_tick, Tick(250));
        assert_eq!(event.attestation_class, RuntimeSignatureClass::Hybrid);
        assert_eq!(event.transparency_log_index, 99);
        let bytes = postcard::to_stdvec(&event).unwrap();
        let back: UserErasureCompleted = postcard::from_bytes(&bytes).unwrap();
        assert_eq!(back, event);
    }

    // Default shred_with_regions yields exactly one region entry, and the
    // derived per-region event round-trips via postcard.
    #[test]
    fn per_region_events_default_emits_one_entry_per_completion() {
        let mut store = InMemoryPiiRowStore::new();
        let user = uid(11);
        let dek_id = DekId([0xAA; 16]);
        store.upsert(user, vec![1, 2, 3], dek_id);
        let mut shredder = InMemoryDekShredder::new();
        shredder.register(dek_id);
        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(shredder));
        obs.on_event(&make_scheduled_event(user, 0, 100), &ctx(100))
            .unwrap();
        let completions = obs.drain_completions();
        assert_eq!(completions.len(), 1);
        let completion = &completions[0];
        assert_eq!(
            completion.regions.len(),
            1,
            "single-region default emits 1 entry"
        );
        let region = &completion.regions[0];
        assert!(matches!(region.scope, ProgressScope::Region(_)));
        assert_eq!(region.shred_tick, Tick(100));
        assert_eq!(region.attestation_class, RuntimeSignatureClass::Ed25519);
        let events = ErasureCascadeObserver::per_region_events(completion, 1);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].user, user);
        assert_eq!(events[0].shred_tick, Tick(100));
        let bytes = postcard::to_stdvec(&events[0]).unwrap();
        let back: PerRegionErasureProgress = postcard::from_bytes(&bytes).unwrap();
        assert_eq!(back, events[0]);
    }

    // No DEK means no regions, and therefore zero per-region events.
    #[test]
    fn per_region_events_no_dek_user_emits_zero_entries() {
        let store = InMemoryPiiRowStore::new();
        let user = uid(12);
        let shredder = InMemoryDekShredder::new();
        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(shredder));
        obs.on_event(&make_scheduled_event(user, 0, 200), &ctx(200))
            .unwrap();
        let completions = obs.drain_completions();
        assert_eq!(completions.len(), 1);
        assert!(completions[0].regions.is_empty());
        let events = ErasureCascadeObserver::per_region_events(&completions[0], 1);
        assert!(events.is_empty());
    }

    // End-to-end: the L1 GdprEraseUser action emits the scheduling event,
    // which the router then dispatches into the cascade observer.
    #[test]
    fn e_user_3_cascade_activates_end_to_end() {
        use arkhe_forge_core::action::ActionCompute;
        use arkhe_forge_core::context::ActionContext as L1ActionContext;
        use arkhe_forge_core::user::GdprEraseUser;
        use arkhe_kernel::abi::{CapabilityMask, Principal};
        let user = uid(7777);
        let act = GdprEraseUser {
            schema_version: 1,
            user,
        };
        let mut l1 = L1ActionContext::new(
            [0u8; 32],
            InstanceId::new(1).unwrap(),
            Tick(100),
            Principal::System,
            CapabilityMask::SYSTEM,
        );
        act.compute(&mut l1).unwrap();
        let mut events = l1.drain_events();
        assert_eq!(events.len(), 1);
        let scheduling_record = events.pop().unwrap();
        let mut store = InMemoryPiiRowStore::new();
        let dek_id = DekId([0xCD; 16]);
        store.upsert(user, vec![100, 101, 102, 103], dek_id);
        let mut shredder = InMemoryDekShredder::new();
        shredder.register(dek_id);
        let mut router = ProjectionRouter::new();
        router.promote_to_active().unwrap();
        router.register(Box::new(ErasureCascadeObserver::new(
            Box::new(store),
            Box::new(shredder),
        )));
        let event_record = EventRecord {
            type_code: scheduling_record.type_code,
            sequence: 0,
            tick: scheduling_record.tick,
            payload: scheduling_record.payload.clone(),
        };
        let applied = router.dispatch(&event_record, &ctx(100)).unwrap();
        assert_eq!(applied, 1);
    }
}