// arkhe_forge_platform/crypto_erasure.rs

1//! Erasure cascade observer — E-user-3 cascade activation.
2//!
3//! When the L1 compute emits `UserErasureScheduled` (via `GdprEraseUser`
4//! lease), this observer drains the user's `EncryptedPii<T>` rows, writes
5//! per-row tombstones, and emits the terminal `UserErasureCompleted`
6//! receipt once the DEK has been shredded. The SLA is `p95 < 24h`; this
7//! module ships the deterministic in-memory path used for tests + the
8//! Tier-0 dev harness. The real HSM `delete_key` + multi-region 2PC
9//! fanout land alongside the `hf2_kms` backend — the observer's
10//! [`Projection`] surface is stable.
11//!
12//! The observer is **not** a Band-1 compute path; it is a Band-2 derived
13//! projection that consumes WAL-anchored events. Every mutation goes
14//! through the same `Projection::on_event` entry-point so the router's
15//! dedup + gap detection catches replay drift.
16
17use arkhe_forge_core::context::EventRecord;
18use arkhe_forge_core::event::{
19    ArkheEvent, PerRegionErasureProgress, ProgressScope, RuntimeSignatureClass,
20    UserErasureCompleted, UserErasureScheduled,
21};
22use arkhe_forge_core::pii::DekId;
23use arkhe_forge_core::user::UserId;
24use arkhe_kernel::abi::{Tick, TypeCode};
25use bytes::Bytes;
26use std::collections::HashMap;
27
28use crate::projection::{
29    ObserverState, Projection, ProjectionContext, ProjectionCursor, ProjectionError,
30};
31
32// ===================== DEK shredder =====================
33
34/// Per-user DEK shred backend. Production wires this
35/// to an HSM `delete_key` RPC; the in-memory implementation (below) is
36/// sufficient for the Tier-0 dev harness.
37///
38/// **Idempotency contract**: a compliant implementation caches the
39/// attestation emitted on the first shred call for `dek_id` and returns
40/// `Ok(cached)` on any subsequent call for the same id. `Err(AlreadyShredded)`
41/// is reserved for stateless backends that cannot cache; the cascade
42/// observer surfaces it as [`ProjectionError::Storage`] and refuses
43/// completion — the emit path must never synthesise a replacement
44/// attestation, otherwise the `UserErasureCompleted` receipt loses its
45/// cryptographic binding to the HSM destruction event.
46pub trait DekShredder: Send + Sync {
47    /// Drop plaintext DEK material for `dek_id` and return an
48    /// attestation payload the observer folds into the
49    /// `UserErasureCompleted.attestation_bytes` field. See the trait-
50    /// level idempotency contract for replay semantics.
51    fn shred(&mut self, dek_id: DekId) -> Result<DekShredAttestation, DekShredError>;
52
53    /// Multi-region 2PC variant. Real backends
54    /// override this to drive a multi-KMS / multi-region shred and return
55    /// the per-region progress entries so the cascade observer can emit
56    /// matching `PerRegionErasureProgress` events.
57    ///
58    /// Default implementation calls [`Self::shred`] and wraps the result
59    /// as a single-region [`ShredResult`] — preserves the current single-
60    /// region semantics for backends that do not opt in.
61    fn shred_with_regions(
62        &mut self,
63        dek_id: DekId,
64        shred_tick: Tick,
65    ) -> Result<ShredResult, DekShredError> {
66        let overall = self.shred(dek_id)?;
67        let region = RegionProgress {
68            scope: default_region_scope(),
69            shred_tick,
70            attestation_class: overall.attestation_class,
71            attestation_bytes: overall.attestation_bytes.clone(),
72        };
73        Ok(ShredResult {
74            regions: vec![region],
75            overall,
76        })
77    }
78}
79
/// Per-region shred progress entry (two-phase commit).
///
/// The default [`DekShredder::shred_with_regions`] implementation emits
/// exactly one entry, scoped to [`ProgressScope::Region`] with the label
/// `"default-region"`. Multi-region backends are expected to emit one
/// entry per participating region or KMS endpoint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RegionProgress {
    /// Region or KMS scope identifier this entry reports on.
    pub scope: ProgressScope,
    /// Tick at which this scope's DEK shred completed.
    pub shred_tick: Tick,
    /// Signature class used for this scope's attestation payload.
    pub attestation_class: RuntimeSignatureClass,
    /// Signed attestation bytes for this scope.
    pub attestation_bytes: Bytes,
}
96
/// Aggregate result returned by [`DekShredder::shred_with_regions`].
///
/// The observer stores `overall` as the completion's attestation (the
/// one folded into the terminal `UserErasureCompleted` receipt) and
/// stores `regions` as the list that drives the intermediate
/// `PerRegionErasureProgress` events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShredResult {
    /// Per-region progress, in shred-completion order.
    pub regions: Vec<RegionProgress>,
    /// Aggregate attestation. With the default single-region
    /// implementation its class and bytes equal those of the only
    /// `RegionProgress` entry; multi-region backends are expected to
    /// supply a coordinator-signed roll-up.
    pub overall: DekShredAttestation,
}
111
112/// Single-region default scope — `Region("default-region")`. Stable
113/// sentinel for backends that do not opt into multi-region shred.
114///
115/// `"default-region"` is 14 ASCII bytes, well within `BoundedString::<64>`'s
116/// cap, so construction is infallible; the `expect` documents the invariant.
117#[allow(clippy::expect_used)]
118fn default_region_scope() -> ProgressScope {
119    let label = arkhe_forge_core::component::BoundedString::<64>::new("default-region")
120        .expect("'default-region' is 14 bytes, within the BoundedString<64> cap");
121    ProgressScope::Region(label)
122}
123
/// HSM / KMS key-destruction receipt: what a shredder returns once a DEK
/// is gone. The attestation class and bytes are forwarded into the
/// `UserErasureCompleted` event.
///
/// `log_index` is an `Option<u64>` so that the observer's no-DEK branch
/// (a user whose row store holds no DEK id) can report `None` instead of
/// a sentinel `0`, which would collide with the genuine first log entry
/// (`Some(0)`) issued by a real shredder.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DekShredAttestation {
    /// Signature class used for the attestation (`Ed25519`, `Hybrid`, …).
    pub attestation_class: RuntimeSignatureClass,
    /// Signed attestation payload.
    pub attestation_bytes: Bytes,
    /// Monotonic destruction-log sequence, intended for gap detection by
    /// the transparency layer. `None` marks the synthetic no-DEK
    /// placeholder (nothing to anchor); `Some(n)` means a shredder issued
    /// the receipt.
    ///
    /// This field is observer-local: it is not copied into
    /// `UserErasureCompleted` by
    /// [`ErasureCascadeObserver::into_completed_event`], whose caller
    /// supplies the `transparency_log_index` independently.
    pub log_index: Option<u64>,
}
148
/// DEK shred failure taxonomy.
///
/// The cascade observer maps every variant to
/// `ProjectionError::Storage` and records no completion for the event.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum DekShredError {
    /// `dek_id` was not recognised by the shredder. The in-memory
    /// shredder returns this for an id that was never registered and has
    /// no cached attestation.
    #[error("DEK id unknown to the shredder")]
    UnknownDek,
    /// The DEK was shredded on a previous call and the backend has no
    /// cached attestation to replay. Reserved for stateless backends;
    /// the observer refuses completion rather than synthesising a
    /// replacement attestation.
    #[error("DEK already shredded")]
    AlreadyShredded,
    /// Backend-specific failure (network, auth, RPC error). The observer
    /// forwards the message via `ProjectionError::Storage`.
    #[error("shredder backend error: {0}")]
    Backend(&'static str),
}
166
167/// In-memory [`DekShredder`] — deterministic Ed25519-style placeholder
168/// attestation for tests and the Tier-0 harness. All paths emit
169/// `RuntimeSignatureClass::Ed25519` regardless of compliance tier.
170#[derive(Debug, Default)]
171pub struct InMemoryDekShredder {
172    live: HashMap<DekId, ()>,
173    shredded: HashMap<DekId, DekShredAttestation>,
174    next_log_index: u64,
175}
176
177impl InMemoryDekShredder {
178    /// Construct an empty shredder — no DEKs registered.
179    #[inline]
180    #[must_use]
181    pub fn new() -> Self {
182        Self::default()
183    }
184
185    /// Register a DEK the observer will later be asked to shred. The
186    /// shredder does not store plaintext key material — just the id.
187    pub fn register(&mut self, dek_id: DekId) {
188        self.live.insert(dek_id, ());
189    }
190
191    /// Check whether a given DEK id was shredded.
192    #[must_use]
193    pub fn is_shredded(&self, dek_id: &DekId) -> bool {
194        self.shredded.contains_key(dek_id)
195    }
196
197    fn issue_attestation(&mut self, dek_id: DekId) -> DekShredAttestation {
198        let log_index = self.next_log_index;
199        self.next_log_index = self.next_log_index.saturating_add(1);
200        // Deterministic payload — domain-separated BLAKE3 keyed by
201        // `dek_id`. Production paths sign with an HSM-held key.
202        let key = blake3::derive_key("arkhe-forge-dek-shred-attestation", &dek_id.0);
203        let mut h = blake3::Hasher::new_keyed(&key);
204        h.update(&log_index.to_be_bytes());
205        let digest = h.finalize();
206        DekShredAttestation {
207            attestation_class: RuntimeSignatureClass::Ed25519,
208            attestation_bytes: Bytes::copy_from_slice(digest.as_bytes()),
209            log_index: Some(log_index),
210        }
211    }
212}
213
214impl DekShredder for InMemoryDekShredder {
215    fn shred(&mut self, dek_id: DekId) -> Result<DekShredAttestation, DekShredError> {
216        if let Some(cached) = self.shredded.get(&dek_id) {
217            return Ok(cached.clone());
218        }
219        if self.live.remove(&dek_id).is_none() {
220            return Err(DekShredError::UnknownDek);
221        }
222        let attestation = self.issue_attestation(dek_id);
223        self.shredded.insert(dek_id, attestation.clone());
224        Ok(attestation)
225    }
226}
227
228// ===================== PII tombstone store =====================
229
/// Per-user encrypted-PII row descriptor, opaque to the cascade. The
/// observer never decrypts: it counts `rows`, asks the shredder to
/// destroy `dek_id` when one is present, and then tombstones the user's
/// rows in the store.
#[derive(Debug, Clone, Default)]
pub struct UserPiiRows {
    /// Encrypted-PII row ids attached to the user.
    pub rows: Vec<u64>,
    /// `DekId` the user's ciphertexts are wrapped under; `None` when the
    /// store holds no DEK for the user (the `Default` value).
    pub dek_id: Option<DekId>,
}
240
/// Per-user PII row store. Production uses a PG-backed table; the
/// in-memory map in this module suits tests.
pub trait PiiRowStore: Send + Sync {
    /// Fetch the set of encrypted-PII rows attached to `user`. Returns
    /// a fresh `UserPiiRows` (possibly empty) when the user has none.
    fn rows_for(&self, user: UserId) -> UserPiiRows;
    /// Mark every row of `user` as tombstoned. Must be idempotent: a
    /// repeat call after cascade completion must be a no-op.
    ///
    /// # Errors
    ///
    /// Returns a [`ProjectionError`] when the store cannot record the
    /// tombstone; the observer propagates it unchanged.
    fn tombstone(&mut self, user: UserId) -> Result<(), ProjectionError>;
    /// Whether `user`'s rows have already been tombstoned.
    fn is_tombstoned(&self, user: UserId) -> bool;
}
253
254/// In-memory [`PiiRowStore`] for tests.
255#[derive(Debug, Default)]
256pub struct InMemoryPiiRowStore {
257    users: HashMap<UserId, UserPiiRows>,
258    tombstoned: HashMap<UserId, ()>,
259}
260
261impl InMemoryPiiRowStore {
262    /// Empty store.
263    #[inline]
264    #[must_use]
265    pub fn new() -> Self {
266        Self::default()
267    }
268
269    /// Attach `rows` to `user` under the given DEK id. Test fixture
270    /// setter.
271    pub fn upsert(&mut self, user: UserId, rows: Vec<u64>, dek_id: DekId) {
272        self.users.insert(
273            user,
274            UserPiiRows {
275                rows,
276                dek_id: Some(dek_id),
277            },
278        );
279    }
280}
281
282impl PiiRowStore for InMemoryPiiRowStore {
283    fn rows_for(&self, user: UserId) -> UserPiiRows {
284        self.users.get(&user).cloned().unwrap_or_default()
285    }
286
287    fn tombstone(&mut self, user: UserId) -> Result<(), ProjectionError> {
288        self.users.remove(&user);
289        self.tombstoned.insert(user, ());
290        Ok(())
291    }
292
293    fn is_tombstoned(&self, user: UserId) -> bool {
294        self.tombstoned.contains_key(&user)
295    }
296}
297
298// ===================== ErasureCascadeObserver =====================
299
/// Completion record: one is pushed per successfully processed
/// `UserErasureScheduled` event. Callers collect them with
/// [`ErasureCascadeObserver::drain_completions`] and turn each into the
/// matching `PerRegionErasureProgress` events (one per `regions` entry)
/// plus the terminal `UserErasureCompleted` event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ErasureCompletion {
    /// Subject of the erasure.
    pub user: UserId,
    /// Tick at which the cascade finished (the projection context's tick
    /// when the event was processed).
    pub completed_tick: Tick,
    /// Number of rows the store reported for the user immediately before
    /// they were tombstoned.
    pub tombstoned_rows: usize,
    /// Aggregate DEK shred attestation, folded into
    /// `UserErasureCompleted.attestation_bytes`.
    pub attestation: DekShredAttestation,
    /// Per-region progress entries: a single element with the default
    /// single-region shredder, empty when the user had no DEK. Each entry
    /// maps 1:1 to a `PerRegionErasureProgress` event the caller emits
    /// before the terminal `UserErasureCompleted` (two-phase commit).
    pub regions: Vec<RegionProgress>,
}
321
/// L2 observer that drives the **E-user-3 cascade**. On every
/// `UserErasureScheduled` event it:
///
/// 1. Looks up the user's encrypted-PII rows via the attached
///    [`PiiRowStore`].
/// 2. Invokes the attached [`DekShredder`] to drop the underlying DEK
///    (skipped when the store reports no DEK for the user). A shred
///    failure aborts here, before any row state changes.
/// 3. Calls [`PiiRowStore::tombstone`] to mark every row soft-deleted.
/// 4. Records an [`ErasureCompletion`] the caller can convert into a
///    `UserErasureCompleted` event on the next tick.
///
/// The observer holds its own cursor; the router still centralises
/// dedup + gap detection via the `Projection` trait.
pub struct ErasureCascadeObserver {
    /// Event type codes this observer subscribes to (only
    /// `UserErasureScheduled`).
    observes: [TypeCode; 1],
    /// Sequence/tick of the last successfully applied event, if any.
    cursor: Option<ProjectionCursor>,
    /// Row store backend.
    rows: Box<dyn PiiRowStore>,
    /// DEK shred backend.
    shredder: Box<dyn DekShredder>,
    /// Completions accumulated since the last drain.
    completions: Vec<ErasureCompletion>,
}
341
342impl ErasureCascadeObserver {
343    /// Construct the observer with concrete backends.
344    #[must_use]
345    pub fn new(rows: Box<dyn PiiRowStore>, shredder: Box<dyn DekShredder>) -> Self {
346        Self {
347            observes: [TypeCode(UserErasureScheduled::TYPE_CODE)],
348            cursor: None,
349            rows,
350            shredder,
351            completions: Vec::new(),
352        }
353    }
354
355    /// Drain accumulated [`ErasureCompletion`]s. The caller feeds each
356    /// one back into an `ActionContext::emit_event(UserErasureCompleted
357    /// { ... })` so the next WAL tick anchors the receipt.
358    pub fn drain_completions(&mut self) -> Vec<ErasureCompletion> {
359        core::mem::take(&mut self.completions)
360    }
361
362    /// Borrow the store for inspection — tests only.
363    #[must_use]
364    pub fn pii_rows(&self) -> &dyn PiiRowStore {
365        self.rows.as_ref()
366    }
367
368    /// Borrow the shredder for inspection — tests only.
369    #[must_use]
370    pub fn shredder(&self) -> &dyn DekShredder {
371        self.shredder.as_ref()
372    }
373
374    /// Convenience — build a `UserErasureCompleted` event from a
375    /// drained completion. The caller chooses the `schema_version` /
376    /// transparency log index from its own anchor; this helper wires
377    /// the remaining five fields.
378    #[must_use]
379    pub fn into_completed_event(
380        completion: &ErasureCompletion,
381        schema_version: u16,
382        transparency_log_index: u64,
383    ) -> UserErasureCompleted {
384        UserErasureCompleted {
385            schema_version,
386            user: completion.user,
387            dek_shred_tick: completion.completed_tick,
388            attestation_class: completion.attestation.attestation_class,
389            attestation_bytes: completion.attestation.attestation_bytes.clone(),
390            transparency_log_index,
391        }
392    }
393
394    /// Convenience — fan out a completion's per-region progress entries
395    /// as `PerRegionErasureProgress` events (two-phase
396    /// commit). Single-region backends emit one event; multi-
397    /// region backends emit one event per participating region. The
398    /// caller emits these *before* the terminal `UserErasureCompleted`
399    /// receipt so external consumers see the full erasure transcript.
400    #[must_use]
401    pub fn per_region_events(
402        completion: &ErasureCompletion,
403        schema_version: u16,
404    ) -> Vec<PerRegionErasureProgress> {
405        completion
406            .regions
407            .iter()
408            .map(|r| PerRegionErasureProgress {
409                schema_version,
410                user: completion.user,
411                scope: r.scope.clone(),
412                shred_tick: r.shred_tick,
413                attestation_class: r.attestation_class,
414                attestation_bytes: r.attestation_bytes.clone(),
415            })
416            .collect()
417    }
418}
419
420impl Projection for ErasureCascadeObserver {
421    fn observes(&self) -> &[TypeCode] {
422        &self.observes
423    }
424
425    fn on_event(
426        &mut self,
427        event: &EventRecord,
428        ctx: &ProjectionContext<'_>,
429    ) -> Result<(), ProjectionError> {
430        let scheduled: UserErasureScheduled = postcard::from_bytes(&event.payload)
431            .map_err(|_| ProjectionError::DecodeFailed("UserErasureScheduled payload"))?;
432        let user = scheduled.user;
433        let rows = self.rows.rows_for(user);
434        let tombstoned = rows.rows.len();
435
436        // 2PC ordering — **shred first**, tombstone second. A failure in
437        // the shred step aborts before any row state changes, so rows
438        // remain live + DEK live (a clean retry state). A failure in the
439        // tombstone step (post-shred) leaves the ciphertext already
440        // crypto-erased: replay re-runs tombstone under the cached
441        // attestation the idempotent shredder returns. The inverse order
442        // would expose a backup-replay window where rows look dead while
443        // the DEK is still live, letting an adversary with a pre-
444        // tombstone snapshot unwrap the ciphertext.
445        let result: ShredResult = match rows.dek_id {
446            Some(dek_id) => match self.shredder.shred_with_regions(dek_id, ctx.tick) {
447                Ok(r) => r,
448                Err(DekShredError::AlreadyShredded) => {
449                    // Shredder violated the idempotency contract (must
450                    // cache + replay `Ok`). Refuse completion so the
451                    // operator can re-register the DEK rather than emit
452                    // an empty-attestation receipt that a regulator
453                    // would reject as invalid proof of destruction.
454                    return Err(ProjectionError::Storage(
455                        "shredder returned AlreadyShredded; implementations must cache attestation",
456                    ));
457                }
458                Err(DekShredError::UnknownDek) => {
459                    return Err(ProjectionError::Storage("DEK unknown to shredder"));
460                }
461                Err(DekShredError::Backend(msg)) => return Err(ProjectionError::Storage(msg)),
462            },
463            None => ShredResult {
464                regions: Vec::new(),
465                overall: DekShredAttestation {
466                    attestation_class: RuntimeSignatureClass::None,
467                    attestation_bytes: Bytes::new(),
468                    log_index: None,
469                },
470            },
471        };
472
473        // DEK is destroyed; tombstone the rows for storage hygiene. If
474        // this step fails the ciphertext is already GDPR-erased and the
475        // next replay redoes the tombstone idempotently.
476        self.rows.tombstone(user)?;
477
478        self.completions.push(ErasureCompletion {
479            user,
480            completed_tick: ctx.tick,
481            tombstoned_rows: tombstoned,
482            attestation: result.overall,
483            regions: result.regions,
484        });
485
486        self.cursor = Some(ProjectionCursor {
487            sequence: event.sequence,
488            tick: event.tick,
489        });
490        Ok(())
491    }
492
493    fn on_state_change(&mut self, _new_state: ObserverState) -> Result<(), ProjectionError> {
494        // No-op — completions persist across promotions. Demotion /
495        // drain callers pull pending completions via
496        // `drain_completions` before ceding primary.
497        Ok(())
498    }
499
500    fn last_applied(&self) -> Option<(u64, Tick)> {
501        self.cursor.map(|c| (c.sequence, c.tick))
502    }
503}
504
505// ===================== Tests =====================
506
507#[cfg(test)]
508#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
509mod tests {
510    use super::*;
511    use crate::projection::ProjectionRouter;
512    use arkhe_kernel::abi::{EntityId, InstanceId};
513
514    fn uid(v: u64) -> UserId {
515        UserId::new(EntityId::new(v).unwrap())
516    }
517
518    fn make_scheduled_event(user: UserId, seq: u64, tick: u64) -> EventRecord {
519        let ev = UserErasureScheduled {
520            schema_version: 1,
521            user,
522            scheduled_tick: Tick(tick),
523        };
524        EventRecord {
525            type_code: UserErasureScheduled::TYPE_CODE,
526            sequence: seq,
527            tick: Tick(tick),
528            payload: Bytes::from(postcard::to_stdvec(&ev).unwrap()),
529        }
530    }
531
532    fn ctx(tick: u64) -> ProjectionContext<'static> {
533        ProjectionContext::new(Tick(tick), InstanceId::new(1).unwrap())
534    }
535
536    #[test]
537    fn observer_observes_user_erasure_scheduled_only() {
538        let obs = ErasureCascadeObserver::new(
539            Box::new(InMemoryPiiRowStore::new()),
540            Box::new(InMemoryDekShredder::new()),
541        );
542        assert_eq!(obs.observes(), &[TypeCode(UserErasureScheduled::TYPE_CODE)]);
543    }
544
545    #[test]
546    fn cascade_tombstones_rows_and_shreds_dek() {
547        let mut store = InMemoryPiiRowStore::new();
548        let user = uid(42);
549        let dek_id = DekId([0xAB; 16]);
550        store.upsert(user, vec![10, 11, 12], dek_id);
551        let mut shredder = InMemoryDekShredder::new();
552        shredder.register(dek_id);
553        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(shredder));
554
555        obs.on_event(&make_scheduled_event(user, 0, 100), &ctx(100))
556            .unwrap();
557
558        assert!(obs.pii_rows().is_tombstoned(user));
559        let completions = obs.drain_completions();
560        assert_eq!(completions.len(), 1);
561        assert_eq!(completions[0].user, user);
562        assert_eq!(completions[0].tombstoned_rows, 3);
563        assert_eq!(completions[0].completed_tick, Tick(100));
564        assert_eq!(
565            completions[0].attestation.attestation_class,
566            RuntimeSignatureClass::Ed25519
567        );
568    }
569
570    #[test]
571    fn cascade_no_rows_still_emits_completion() {
572        let obs_store = InMemoryPiiRowStore::new();
573        let shredder = InMemoryDekShredder::new();
574        let mut obs = ErasureCascadeObserver::new(Box::new(obs_store), Box::new(shredder));
575        let user = uid(7);
576        obs.on_event(&make_scheduled_event(user, 0, 5), &ctx(5))
577            .unwrap();
578        let completions = obs.drain_completions();
579        assert_eq!(completions.len(), 1);
580        assert_eq!(completions[0].tombstoned_rows, 0);
581        assert_eq!(
582            completions[0].attestation.attestation_class,
583            RuntimeSignatureClass::None
584        );
585        // m13 — synthetic no-DEK attestation must not collide with the
586        // genuine `Some(0)` log entry of a real shredder.
587        assert_eq!(completions[0].attestation.log_index, None);
588    }
589
590    #[test]
591    fn first_shred_log_index_is_some_zero_distinct_from_no_dek() {
592        // m13 regression — `InMemoryDekShredder::issue_attestation`
593        // hands out `Some(0)` on the first shred. Cascading a user
594        // with a real DEK and another with no DEK must show the two
595        // attestations are distinguishable on `log_index` alone.
596        let mut store = InMemoryPiiRowStore::new();
597        let user_real = uid(101);
598        let dek_id = DekId([0xAA; 16]);
599        store.upsert(user_real, vec![1], dek_id);
600        let mut shredder = InMemoryDekShredder::new();
601        shredder.register(dek_id);
602        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(shredder));
603
604        obs.on_event(&make_scheduled_event(user_real, 0, 50), &ctx(50))
605            .unwrap();
606        let real = obs.drain_completions();
607        assert_eq!(real[0].attestation.log_index, Some(0));
608
609        let user_empty = uid(202);
610        obs.on_event(&make_scheduled_event(user_empty, 1, 51), &ctx(51))
611            .unwrap();
612        let empty = obs.drain_completions();
613        assert_eq!(empty[0].attestation.log_index, None);
614    }
615
616    #[test]
617    fn cascade_unknown_dek_surfaces_storage_error() {
618        let mut store = InMemoryPiiRowStore::new();
619        let user = uid(9);
620        // DEK referenced by the store is NOT registered with the shredder.
621        store.upsert(user, vec![1], DekId([0x99; 16]));
622        let shredder = InMemoryDekShredder::new();
623        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(shredder));
624        let err = obs
625            .on_event(&make_scheduled_event(user, 0, 5), &ctx(5))
626            .unwrap_err();
627        assert!(matches!(err, ProjectionError::Storage(_)));
628        // Shred-first ordering — failure aborts before tombstone.
629        assert!(!obs.pii_rows().is_tombstoned(user));
630    }
631
632    #[test]
633    fn shred_failure_keeps_rows_live() {
634        // Backend error during shred must leave rows untombstoned so a
635        // replay can retry without a backup-replay window.
636        struct FailingShredder;
637        impl DekShredder for FailingShredder {
638            fn shred(&mut self, _dek_id: DekId) -> Result<DekShredAttestation, DekShredError> {
639                Err(DekShredError::Backend("inject: KMS unavailable"))
640            }
641        }
642
643        let mut store = InMemoryPiiRowStore::new();
644        let user = uid(777);
645        let dek_id = DekId([0xAA; 16]);
646        store.upsert(user, vec![1, 2, 3], dek_id);
647        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(FailingShredder));
648
649        let err = obs
650            .on_event(&make_scheduled_event(user, 0, 10), &ctx(10))
651            .unwrap_err();
652        assert!(matches!(err, ProjectionError::Storage(_)));
653
654        // Rows must survive — tombstone was never attempted.
655        assert!(!obs.pii_rows().is_tombstoned(user));
656        assert_eq!(obs.pii_rows().rows_for(user).rows.len(), 3);
657
658        // No completion emitted.
659        assert!(obs.drain_completions().is_empty());
660    }
661
662    #[test]
663    fn already_shredded_surfaces_as_storage_error() {
664        // A shredder that breaks the idempotency contract by returning
665        // `AlreadyShredded` instead of the cached attestation must be
666        // refused — the observer will not synthesise a replacement
667        // attestation that would fail regulator verification.
668        struct BrokenShredder;
669        impl DekShredder for BrokenShredder {
670            fn shred(&mut self, _dek_id: DekId) -> Result<DekShredAttestation, DekShredError> {
671                Err(DekShredError::AlreadyShredded)
672            }
673        }
674
675        let mut store = InMemoryPiiRowStore::new();
676        let user = uid(999);
677        let dek_id = DekId([0xCC; 16]);
678        store.upsert(user, vec![1], dek_id);
679        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(BrokenShredder));
680
681        let err = obs
682            .on_event(&make_scheduled_event(user, 0, 30), &ctx(30))
683            .unwrap_err();
684        assert!(matches!(err, ProjectionError::Storage(_)));
685        // Shred-first ordering preserved row state for retry.
686        assert!(!obs.pii_rows().is_tombstoned(user));
687        assert!(obs.drain_completions().is_empty());
688    }
689
690    #[test]
691    fn cascade_replay_after_tombstone_holds_rows_dead() {
692        // Backup-restore smoke — a crashed cascade replayed from the WAL
693        // finds the rows already tombstoned. The in-memory store drops
694        // the DEK reference on tombstone, so the replay takes the no-
695        // rows branch and emits a placeholder completion; the key
696        // invariant is that rows stay tombstoned throughout and no
697        // panic / data resurface occurs.
698        let mut store = InMemoryPiiRowStore::new();
699        let user = uid(1234);
700        let dek_id = DekId([0xEF; 16]);
701        store.upsert(user, vec![1, 2], dek_id);
702        let mut shredder = InMemoryDekShredder::new();
703        shredder.register(dek_id);
704        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(shredder));
705
706        obs.on_event(&make_scheduled_event(user, 0, 40), &ctx(40))
707            .unwrap();
708        let first = obs.drain_completions();
709        assert_eq!(first.len(), 1);
710        assert_eq!(
711            first[0].attestation.attestation_class,
712            RuntimeSignatureClass::Ed25519
713        );
714        assert!(obs.pii_rows().is_tombstoned(user));
715
716        // Replay (WAL-driven recovery).
717        obs.on_event(&make_scheduled_event(user, 1, 41), &ctx(41))
718            .unwrap();
719        let replayed = obs.drain_completions();
720        assert_eq!(replayed.len(), 1);
721        assert_eq!(
722            replayed[0].attestation.attestation_class,
723            RuntimeSignatureClass::None
724        );
725        assert!(obs.pii_rows().is_tombstoned(user));
726    }
727
728    #[test]
729    fn cascade_participates_in_projection_router_dispatch() {
730        let mut store = InMemoryPiiRowStore::new();
731        let user = uid(123);
732        let dek_id = DekId([0xEE; 16]);
733        store.upsert(user, vec![1, 2], dek_id);
734        let mut shredder = InMemoryDekShredder::new();
735        shredder.register(dek_id);
736
737        let mut router = ProjectionRouter::new();
738        router.promote_to_active().unwrap();
739        router.register(Box::new(ErasureCascadeObserver::new(
740            Box::new(store),
741            Box::new(shredder),
742        )));
743
744        let applied = router
745            .dispatch(&make_scheduled_event(user, 0, 300), &ctx(300))
746            .unwrap();
747        assert_eq!(applied, 1);
748    }
749
750    #[test]
751    fn completed_event_roundtrip_via_helper() {
752        let completion = ErasureCompletion {
753            user: uid(1),
754            completed_tick: Tick(250),
755            tombstoned_rows: 4,
756            attestation: DekShredAttestation {
757                attestation_class: RuntimeSignatureClass::Hybrid,
758                attestation_bytes: Bytes::from_static(&[0u8; 128]),
759                log_index: Some(7),
760            },
761            regions: Vec::new(),
762        };
763        let event = ErasureCascadeObserver::into_completed_event(&completion, 1, 99);
764        assert_eq!(event.user, uid(1));
765        assert_eq!(event.dek_shred_tick, Tick(250));
766        assert_eq!(event.attestation_class, RuntimeSignatureClass::Hybrid);
767        assert_eq!(event.transparency_log_index, 99);
768
769        // Wire-level roundtrip — confirm the event struct postcard-encodes.
770        let bytes = postcard::to_stdvec(&event).unwrap();
771        let back: UserErasureCompleted = postcard::from_bytes(&bytes).unwrap();
772        assert_eq!(back, event);
773    }
774
775    #[test]
776    fn per_region_events_default_emits_one_entry_per_completion() {
777        // Single-region default — `DekShredder::shred_with_regions` wraps
778        // the single attestation as a 1-element Vec, so the cascade emits
779        // exactly one `PerRegionErasureProgress` per user.
780        let mut store = InMemoryPiiRowStore::new();
781        let user = uid(11);
782        let dek_id = DekId([0xAA; 16]);
783        store.upsert(user, vec![1, 2, 3], dek_id);
784        let mut shredder = InMemoryDekShredder::new();
785        shredder.register(dek_id);
786        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(shredder));
787
788        obs.on_event(&make_scheduled_event(user, 0, 100), &ctx(100))
789            .unwrap();
790        let completions = obs.drain_completions();
791        assert_eq!(completions.len(), 1);
792        let completion = &completions[0];
793        assert_eq!(
794            completion.regions.len(),
795            1,
796            "single-region default emits 1 entry"
797        );
798        let region = &completion.regions[0];
799        assert!(matches!(region.scope, ProgressScope::Region(_)));
800        assert_eq!(region.shred_tick, Tick(100));
801        assert_eq!(region.attestation_class, RuntimeSignatureClass::Ed25519);
802
803        let events = ErasureCascadeObserver::per_region_events(completion, 1);
804        assert_eq!(events.len(), 1);
805        assert_eq!(events[0].user, user);
806        assert_eq!(events[0].shred_tick, Tick(100));
807
808        // Wire-level roundtrip on the per-region event.
809        let bytes = postcard::to_stdvec(&events[0]).unwrap();
810        let back: PerRegionErasureProgress = postcard::from_bytes(&bytes).unwrap();
811        assert_eq!(back, events[0]);
812    }
813
814    #[test]
815    fn per_region_events_no_dek_user_emits_zero_entries() {
816        // A user whose row store never carried a DEK has `regions` left
817        // empty (the synthetic `RuntimeSignatureClass::None` attestation
818        // is overall-only) — zero `PerRegionErasureProgress` events.
819        let store = InMemoryPiiRowStore::new();
820        let user = uid(12);
821        let shredder = InMemoryDekShredder::new();
822        let mut obs = ErasureCascadeObserver::new(Box::new(store), Box::new(shredder));
823
824        obs.on_event(&make_scheduled_event(user, 0, 200), &ctx(200))
825            .unwrap();
826        let completions = obs.drain_completions();
827        assert_eq!(completions.len(), 1);
828        assert!(completions[0].regions.is_empty());
829        let events = ErasureCascadeObserver::per_region_events(&completions[0], 1);
830        assert!(events.is_empty());
831    }
832
833    /// **E-user-3 integration** — the axiom harness in `arkhe-forge-core`
834    /// pins that `GdprEraseUser::compute` emits `UserErasureScheduled`.
835    /// This platform-level test shows the cascade observer picks the
836    /// event up and reaches the `UserErasureCompleted` completion
837    /// record — E-user-3 cascade trigger.
838    #[test]
839    fn e_user_3_cascade_activates_end_to_end() {
840        use arkhe_forge_core::action::ActionCompute;
841        use arkhe_forge_core::context::ActionContext as L1ActionContext;
842        use arkhe_forge_core::user::GdprEraseUser;
843        use arkhe_kernel::abi::{CapabilityMask, Principal};
844
845        let user = uid(7777);
846
847        // 1. Run the L1 compute to produce the scheduling event.
848        let act = GdprEraseUser {
849            schema_version: 1,
850            user,
851        };
852        let mut l1 = L1ActionContext::new(
853            [0u8; 32],
854            InstanceId::new(1).unwrap(),
855            Tick(100),
856            Principal::System,
857            CapabilityMask::SYSTEM,
858        );
859        act.compute(&mut l1).unwrap();
860        let mut events = l1.drain_events();
861        assert_eq!(events.len(), 1);
862        let scheduling_record = events.pop().unwrap();
863
864        // 2. Feed the event into the cascade observer.
865        let mut store = InMemoryPiiRowStore::new();
866        let dek_id = DekId([0xCD; 16]);
867        store.upsert(user, vec![100, 101, 102, 103], dek_id);
868        let mut shredder = InMemoryDekShredder::new();
869        shredder.register(dek_id);
870        let mut router = ProjectionRouter::new();
871        router.promote_to_active().unwrap();
872        router.register(Box::new(ErasureCascadeObserver::new(
873            Box::new(store),
874            Box::new(shredder),
875        )));
876        let event_record = EventRecord {
877            type_code: scheduling_record.type_code,
878            sequence: 0,
879            tick: scheduling_record.tick,
880            payload: scheduling_record.payload.clone(),
881        };
882        let applied = router.dispatch(&event_record, &ctx(100)).unwrap();
883        assert_eq!(applied, 1);
884    }
885}