Skip to main content

graphrefly_storage/
memory.rs

1//! Concrete tier implementations + memory factories (M4.B 2026-05-10).
2//!
3//! Three generic structs back the three tier sub-traits:
4//! - [`SnapshotStorage<B, T, C>`] → [`SnapshotStorageTier<T>`]
5//! - [`AppendLogStorage<B, T, C>`] → [`AppendLogStorageTier<T>`]
6//! - [`KvStorage<B, T, C>`] → [`KvStorageTier<T>`]
7//!
8//! Each holds:
9//! - `backend: Arc<B>` (shared, multi-tier-share-one-backend supported per D147)
10//! - `codec: C` (default `JsonCodec` per D148)
11//! - `name: String`, `debounce_ms`, `compact_every` — cadence knobs
//! - `filter` (snapshot + kv tiers) and `key_of` (snapshot + append-log
//!   tiers) — optional closures (boxed `dyn Fn` per D149)
//! - An internal pending buffer guarded by a `parking_lot::Mutex`
14//!
15//! Convenience factories `memory_snapshot()` / `memory_append_log()` /
16//! `memory_kv()` wrap a fresh in-process backend.
17
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::Arc;
20
21use parking_lot::Mutex;
22use serde::{de::DeserializeOwned, Serialize};
23
24use crate::backend::{memory_backend, MemoryBackend, StorageBackend};
25use crate::codec::{Codec, JsonCodec};
26use crate::error::StorageError;
27use crate::tier::{
28    AppendCursor, AppendLoadResult, AppendLogMode, AppendLogStorageTier, BaseStorageTier,
29    KvStorageTier, LoadEntriesOpts, PrefixIter, SnapshotStorageTier,
30};
31
/// Admission predicate for the snapshot tier's `save`; returning `false`
/// makes `save` drop the snapshot and return `Ok(())`.
type FilterFn<T> = Box<dyn Fn(&T) -> bool + Send + Sync>;
/// Maps a value to the backend key it is stored under (the snapshot key, or
/// the bucket key of an append-log entry).
type KeyOfFn<T> = Box<dyn Fn(&T) -> String + Send + Sync>;
/// Admission predicate for the kv tier's `save`, given both the key and the
/// value; returning `false` drops the write and returns `Ok(())`.
type KvFilterFn<T> = Box<dyn Fn(&str, &T) -> bool + Send + Sync>;
35
36// ── Snapshot tier ─────────────────────────────────────────────────────────
37
/// Snapshot tier — buffers one pending snapshot; `flush()` encodes via codec
/// and writes to the backend under `key_of(snapshot)`. Mirrors TS
/// `snapshotStorage(backend, opts)`.
///
/// Built by [`snapshot_storage`] (any backend) or [`memory_snapshot`]
/// (fresh in-memory backend).
pub struct SnapshotStorage<B, T, C = JsonCodec>
where
    B: StorageBackend + ?Sized,
    T: Send + Sync + 'static,
    C: Codec<T>,
{
    /// Backend the encoded snapshots are written to and read back from.
    backend: Arc<B>,
    /// Encodes snapshots on flush and decodes them in `load`.
    codec: C,
    /// Tier name; also the default backend key when no custom `key_of` was
    /// supplied, and the key `load` falls back to before any write.
    name: String,
    /// Reported via `debounce_ms()`; when `None`, every accepted `save`
    /// flushes immediately.
    debounce_ms: Option<u32>,
    /// Accepted-save cadence at which `save` forces a flush; `None` disables it.
    compact_every: Option<u32>,
    /// Optional admission predicate; snapshots it rejects are dropped by `save`.
    filter: Option<FilterFn<T>>,
    /// Computes the backend key a snapshot is written under.
    key_of: KeyOfFn<T>,
    /// Single buffered snapshot pending flush. `Mutex<Option<T>>` rather
    /// than `Mutex<T>` so `rollback` and `flush` can take ownership cheaply.
    pending: Mutex<Option<T>>,
    /// Total `save()` calls accepted (post-filter). Used by `compact_every`.
    write_count: Mutex<u64>,
    /// Last key written to the backend; cached so `load()` can find the
    /// most recent baseline when `key_of` varies per snapshot.
    last_saved_key: Mutex<Option<String>>,
}
63
/// Options for [`SnapshotStorage`] construction (see [`snapshot_storage`]).
///
/// `Default` is implemented for the [`JsonCodec`] case.
pub struct SnapshotStorageOptions<T, C = JsonCodec>
where
    T: Send + Sync + 'static,
    C: Codec<T>,
{
    /// Tier name; `None` falls back to the backend's name.
    pub name: Option<String>,
    /// Codec used to encode/decode snapshots.
    pub codec: C,
    /// Debounce setting; `None` flushes on every accepted `save`.
    pub debounce_ms: Option<u32>,
    /// Force a flush whenever the accepted-save count crosses a multiple of
    /// `n`; must not be `Some(0)` (the factory panics).
    pub compact_every: Option<u32>,
    /// Optional admission predicate; rejected snapshots are dropped.
    pub filter: Option<FilterFn<T>>,
    /// Backend-key function; `None` stores every snapshot under the tier name.
    pub key_of: Option<KeyOfFn<T>>,
}
77
78impl<T> Default for SnapshotStorageOptions<T, JsonCodec>
79where
80    T: Serialize + DeserializeOwned + Send + Sync + 'static,
81{
82    fn default() -> Self {
83        Self {
84            name: None,
85            codec: JsonCodec,
86            debounce_ms: None,
87            compact_every: None,
88            filter: None,
89            key_of: None,
90        }
91    }
92}
93
94/// Factory: wrap a backend as a snapshot tier.
95///
96/// # Panics
97///
98/// Panics if `opts.compact_every == Some(0)` (use `None` to disable
99/// compaction; values ≥ 1 specify the cadence). Pre-1.0 footgun guard
100/// per /qa A4.
101pub fn snapshot_storage<B, T, C>(
102    backend: Arc<B>,
103    opts: SnapshotStorageOptions<T, C>,
104) -> SnapshotStorage<B, T, C>
105where
106    B: StorageBackend + ?Sized,
107    T: Send + Sync + 'static,
108    C: Codec<T>,
109{
110    assert!(
111        opts.compact_every != Some(0),
112        "snapshot_storage: compact_every must be None or Some(n) where n >= 1, got Some(0)",
113    );
114    let name = opts.name.unwrap_or_else(|| backend.name().to_string());
115    let fallback_key = name.clone();
116    let key_of = opts
117        .key_of
118        .unwrap_or_else(|| Box::new(move |_| fallback_key.clone()));
119    SnapshotStorage {
120        backend,
121        codec: opts.codec,
122        name,
123        debounce_ms: opts.debounce_ms,
124        compact_every: opts.compact_every,
125        filter: opts.filter,
126        key_of,
127        pending: Mutex::new(None),
128        write_count: Mutex::new(0),
129        last_saved_key: Mutex::new(None),
130    }
131}
132
133/// Convenience: snapshot tier over a fresh in-memory backend.
134pub fn memory_snapshot<T, C>(
135    opts: SnapshotStorageOptions<T, C>,
136) -> SnapshotStorage<MemoryBackend, T, C>
137where
138    T: Send + Sync + 'static,
139    C: Codec<T>,
140{
141    snapshot_storage(memory_backend(), opts)
142}
143
144impl<B, T, C> SnapshotStorage<B, T, C>
145where
146    B: StorageBackend + ?Sized,
147    T: Send + Sync + 'static,
148    C: Codec<T>,
149{
150    /// Encode + write a snapshot to the backend, updating `last_saved_key` on
151    /// success. Returns `Err((snapshot, error))` on failure so the caller can
152    /// restore the snapshot to the pending slot (D165 — F1 fix).
153    fn try_flush(
154        backend: &B,
155        codec: &C,
156        key_of: &KeyOfFn<T>,
157        last_saved_key: &Mutex<Option<String>>,
158        snapshot: T,
159    ) -> Result<(), (T, StorageError)> {
160        let key = key_of(&snapshot);
161        let bytes = match codec.encode(&snapshot) {
162            Ok(b) => b,
163            Err(e) => return Err((snapshot, e.into())),
164        };
165        if let Err(e) = backend.write(&key, &bytes) {
166            return Err((snapshot, e));
167        }
168        *last_saved_key.lock() = Some(key);
169        Ok(())
170    }
171}
172
173impl<B, T, C> BaseStorageTier for SnapshotStorage<B, T, C>
174where
175    B: StorageBackend + ?Sized,
176    T: Send + Sync + 'static,
177    C: Codec<T>,
178{
179    fn name(&self) -> &str {
180        &self.name
181    }
182    fn debounce_ms(&self) -> Option<u32> {
183        self.debounce_ms
184    }
185    fn compact_every(&self) -> Option<u32> {
186        self.compact_every
187    }
188
189    // D165 — F1 fix: take pending, attempt encode+write, restore on failure.
190    // Pre-fix: `mem::take` before encode lost pending on encode/write error.
191    fn flush(&self) -> Result<(), StorageError> {
192        let slot = self.pending.lock().take();
193        let Some(snapshot) = slot else {
194            return Ok(());
195        };
196        match Self::try_flush(
197            &*self.backend,
198            &self.codec,
199            &self.key_of,
200            &self.last_saved_key,
201            snapshot,
202        ) {
203            Ok(()) => Ok(()),
204            Err((snapshot, err)) => {
205                // Restore pending so the caller can retry.
206                *self.pending.lock() = Some(snapshot);
207                Err(err)
208            }
209        }
210    }
211
212    fn rollback(&self) -> Result<(), StorageError> {
213        *self.pending.lock() = None;
214        Ok(())
215    }
216
217    fn list_by_prefix_bytes<'a>(
218        &'a self,
219        prefix: &str,
220    ) -> Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a> {
221        Box::new(PrefixIter::new(&*self.backend, prefix))
222    }
223
224    fn compact(&self) -> Result<(), StorageError> {
225        self.flush()
226    }
227}
228
229impl<B, T, C> SnapshotStorageTier<T> for SnapshotStorage<B, T, C>
230where
231    B: StorageBackend + ?Sized,
232    T: Send + Sync + 'static,
233    C: Codec<T>,
234{
235    fn save(&self, snapshot: T) -> Result<(), StorageError> {
236        if let Some(filter) = &self.filter {
237            if !filter(&snapshot) {
238                return Ok(());
239            }
240        }
241        // /qa F2 + A1 (D138-followup, 2026-05-10): hold `pending` lock across
242        // the count update + trigger decision + capture, so the snapshot that
243        // triggers a compact cadence is THE snapshot that gets persisted
244        // (closes the snapshot-compact-trigger race). Boundary-crossing
245        // trigger logic (`prev/N != new/N`) replaces strict `is_multiple_of`
246        // so batch save patterns can't skip the trigger when count jumps
247        // multiple boundaries — matches TS-side fix.
248        let captured: Option<T> = {
249            let mut pending = self.pending.lock();
250            *pending = Some(snapshot);
251            let mut count = self.write_count.lock();
252            let prev = *count;
253            *count = count.saturating_add(1);
254            let new = *count;
255            let compact_trigger = matches!(
256                self.compact_every,
257                Some(n) if n > 0 && (prev / u64::from(n)) != (new / u64::from(n))
258            );
259            let trigger = compact_trigger || self.debounce_ms.is_none();
260            if trigger {
261                pending.take()
262            } else {
263                None
264            }
265        };
266        if let Some(snap) = captured {
267            // D165 — F1 fix: restore pending on encode/write failure.
268            if let Err((snap, err)) = Self::try_flush(
269                &self.backend,
270                &self.codec,
271                &self.key_of,
272                &self.last_saved_key,
273                snap,
274            ) {
275                *self.pending.lock() = Some(snap);
276                return Err(err);
277            }
278        }
279        Ok(())
280    }
281
282    fn load(&self) -> Result<Option<T>, StorageError> {
283        let key = self
284            .last_saved_key
285            .lock()
286            .clone()
287            .unwrap_or_else(|| self.name.clone());
288        match self.backend.read(&key)? {
289            Some(bytes) if !bytes.is_empty() => Ok(Some(self.codec.decode(&bytes)?)),
290            _ => Ok(None),
291        }
292    }
293}
294
295// ── Append-log tier ───────────────────────────────────────────────────────
296
/// Append-log tier — buffers per-key entries; `flush()` encodes each
/// bucket as an array via codec and writes it to the backend. In
/// [`AppendLogMode::Append`] mode the bucket is merged with the key's
/// existing entries; in [`AppendLogMode::Overwrite`] mode it replaces them.
/// Mirrors TS `appendLogStorage`.
pub struct AppendLogStorage<B, T, C = JsonCodec>
where
    B: StorageBackend + ?Sized,
    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
    C: Codec<Vec<T>>,
{
    /// Backend the encoded buckets are written to and read back from.
    backend: Arc<B>,
    /// Encodes/decodes a whole per-key bucket (`Vec<T>`) at once.
    codec: C,
    /// Tier name; also the default bucket key when no custom `key_of` was
    /// supplied.
    name: String,
    /// Reported via `debounce_ms()`; when `None`, every `append_entries`
    /// call flushes immediately.
    debounce_ms: Option<u32>,
    /// Entry-count cadence at which `append_entries` forces a flush.
    compact_every: Option<u32>,
    /// **D269 — persistence mode (memo:Re P1 parity).** See
    /// [`AppendLogMode`]. Default `Append`.
    mode: AppendLogMode,
    /// Maps each entry to the backend key (bucket) it is appended under.
    key_of: KeyOfFn<T>,
    /// Per-key pending buckets (matches TS `Map<string, T[]>`).
    pending: Mutex<std::collections::HashMap<String, Vec<T>>>,
    /// Total entries appended; drives `compact_every`.
    append_count: Mutex<u64>,
    /// **D268 — rollback epoch token (memo:Re P0(d) parity).** Mirrors
    /// the TS `rollbackEpoch` (`packages/pure-ts/src/extra/storage/
    /// tiers.ts`). Bumped by [`AppendLogStorage::rollback`]; captured
    /// by [`AppendLogStorage::flush`] at start. If a concurrent
    /// `rollback` advances the epoch while a `flush` is in flight
    /// (lock dropped between `pending.lock` take and per-bucket
    /// `backend.write`), the flush aborts before each subsequent
    /// per-bucket write — entries that haven't yet hit the backend
    /// are dropped. Best-effort: a `backend.write` already past the
    /// epoch check can't be un-sent.
    ///
    /// Note: Rust's `flush()` is sync end-to-end (no async chained-
    /// microtask hazard like TS), so the *primary* TS rollback bug
    /// (epoch tracking pending in-flight writes scheduled pre-rollback
    /// across microtask boundaries) doesn't structurally apply. The
    /// epoch here covers the narrower **concurrent multi-thread**
    /// rollback-during-flush window — a real correctness improvement
    /// for callers that race rollback against flush on different
    /// threads.
    rollback_epoch: AtomicU64,
}
340
/// Options for [`AppendLogStorage`] construction (see [`append_log_storage`]).
///
/// `Default` is implemented for the [`JsonCodec`] case.
pub struct AppendLogStorageOptions<T, C = JsonCodec>
where
    T: Send + Sync + 'static,
    C: Codec<Vec<T>>,
{
    /// Tier name; `None` falls back to the backend's name.
    pub name: Option<String>,
    /// Codec applied to each per-key bucket (`Vec<T>`).
    pub codec: C,
    /// Debounce setting; `None` flushes on every `append_entries` call.
    pub debounce_ms: Option<u32>,
    /// Force a flush whenever the running entry count crosses a multiple of
    /// `n`; must not be `Some(0)` (the factory panics).
    pub compact_every: Option<u32>,
    /// Maps an entry to its bucket key; `None` puts every entry under the
    /// tier name.
    pub key_of: Option<KeyOfFn<T>>,
    /// D269: persistence mode. Default `Append` (read-merge).
    pub mode: AppendLogMode,
}
354
355impl<T> Default for AppendLogStorageOptions<T, JsonCodec>
356where
357    T: Serialize + DeserializeOwned + Send + Sync + 'static,
358{
359    fn default() -> Self {
360        Self {
361            name: None,
362            codec: JsonCodec,
363            debounce_ms: None,
364            compact_every: None,
365            key_of: None,
366            mode: AppendLogMode::Append,
367        }
368    }
369}
370
371/// Factory: wrap a backend as an append-log tier.
372///
373/// # Panics
374///
375/// Panics if `opts.compact_every == Some(0)`. See [`snapshot_storage`] for
376/// the rationale (pre-1.0 footgun guard per /qa A4).
377pub fn append_log_storage<B, T, C>(
378    backend: Arc<B>,
379    opts: AppendLogStorageOptions<T, C>,
380) -> AppendLogStorage<B, T, C>
381where
382    B: StorageBackend + ?Sized,
383    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
384    C: Codec<Vec<T>>,
385{
386    assert!(
387        opts.compact_every != Some(0),
388        "append_log_storage: compact_every must be None or Some(n) where n >= 1, got Some(0)",
389    );
390    let name = opts.name.unwrap_or_else(|| backend.name().to_string());
391    let fallback_key = name.clone();
392    let key_of = opts
393        .key_of
394        .unwrap_or_else(|| Box::new(move |_| fallback_key.clone()));
395    AppendLogStorage {
396        backend,
397        codec: opts.codec,
398        name,
399        debounce_ms: opts.debounce_ms,
400        compact_every: opts.compact_every,
401        mode: opts.mode,
402        key_of,
403        pending: Mutex::new(std::collections::HashMap::new()),
404        append_count: Mutex::new(0),
405        rollback_epoch: AtomicU64::new(0),
406    }
407}
408
409pub fn memory_append_log<T, C>(
410    opts: AppendLogStorageOptions<T, C>,
411) -> AppendLogStorage<MemoryBackend, T, C>
412where
413    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
414    C: Codec<Vec<T>>,
415{
416    append_log_storage(memory_backend(), opts)
417}
418
419impl<B, T, C> BaseStorageTier for AppendLogStorage<B, T, C>
420where
421    B: StorageBackend + ?Sized,
422    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
423    C: Codec<Vec<T>>,
424{
425    fn name(&self) -> &str {
426        &self.name
427    }
428    fn debounce_ms(&self) -> Option<u32> {
429        self.debounce_ms
430    }
431    fn compact_every(&self) -> Option<u32> {
432        self.compact_every
433    }
434
435    // D165 — F1 fix: take pending, attempt per-bucket encode+write, restore
436    // unprocessed buckets on failure so the caller can retry.
437    //
438    // D268 — rollback-epoch check (memo:Re P0(d) parity). Captures the
439    // epoch at the top of flush; before each per-bucket backend write,
440    // verifies the epoch hasn't been advanced by a concurrent rollback.
441    // If advanced, drop all remaining buckets (they're considered
442    // rolled back) and return Ok — already-written buckets are
443    // best-effort; the epoch check is the abort boundary.
444    fn flush(&self) -> Result<(), StorageError> {
445        let scheduled_epoch = self.rollback_epoch.load(Ordering::Acquire);
446        let mut buckets = std::mem::take(&mut *self.pending.lock());
447        let keys: Vec<String> = buckets.keys().cloned().collect();
448        for key in keys {
449            // D268 epoch check: a concurrent `rollback()` invalidates
450            // any not-yet-written bucket.
451            if self.rollback_epoch.load(Ordering::Acquire) != scheduled_epoch {
452                return Ok(());
453            }
454            let bucket = match buckets.remove(&key) {
455                Some(b) if !b.is_empty() => b,
456                _ => continue,
457            };
458            // D269: Overwrite mode skips read+merge — the bucket
459            // IS the full contents to persist. Append mode (default)
460            // reads existing bytes, decodes, extends, encodes.
461            //
462            // `restore_payload` holds what we put back into `pending`
463            // if encode/write fails: in Append mode, only the NEW
464            // entries (so a retry re-reads existing + re-merges);
465            // in Overwrite mode, the bucket itself (a retry writes
466            // the same snapshot). /qa-fix 2026-05-21: was previously
467            // restoring `final_payload` (existing + new) in Append
468            // mode, which caused existing entries to duplicate on
469            // retry. Covered by
470            // `append_log_append_mode_encode_failure_does_not_duplicate_on_retry`
471            // and `append_log_append_mode_write_failure_does_not_duplicate_on_retry`.
472            // D-B (next batch, 2026-05-21) — rollback-epoch check is also
473            // applied to the error-restore path. If a concurrent
474            // `rollback()` advances the epoch DURING a failing
475            // `backend.read` / `codec.decode` / `codec.encode` /
476            // `backend.write`, the bucket-restore is silently dropped
477            // (the user's rollback intent supersedes the failed write's
478            // restore intent — without this check, retry would re-
479            // resurrect the rolled-back bucket).
480            let restore_or_drop = |buckets: &mut std::collections::HashMap<String, Vec<T>>,
481                                   key: String,
482                                   payload: Vec<T>| {
483                if self.rollback_epoch.load(Ordering::Acquire) == scheduled_epoch {
484                    buckets.insert(key, payload);
485                    *self.pending.lock() = std::mem::take(buckets);
486                }
487                // else: epoch advanced — drop the bucket; the user's
488                // `rollback()` wins. Already-written buckets stay
489                // (best-effort, same as the in-loop epoch abort).
490            };
491            let (final_payload, restore_payload): (Vec<T>, Vec<T>) = match self.mode {
492                AppendLogMode::Overwrite => {
493                    let snapshot = bucket.clone();
494                    (bucket, snapshot)
495                }
496                AppendLogMode::Append => {
497                    let existing = match self.backend.read(&key) {
498                        Ok(e) => e,
499                        Err(e) => {
500                            restore_or_drop(&mut buckets, key, bucket);
501                            return Err(e);
502                        }
503                    };
504                    let mut merged = match existing {
505                        Some(bytes) if !bytes.is_empty() => match self.codec.decode(&bytes) {
506                            Ok(v) => v,
507                            Err(e) => {
508                                restore_or_drop(&mut buckets, key, bucket);
509                                return Err(e.into());
510                            }
511                        },
512                        _ => Vec::new(),
513                    };
514                    let new_entries_backup = bucket.clone();
515                    merged.extend(bucket);
516                    (merged, new_entries_backup)
517                }
518            };
519            let encoded = match self.codec.encode(&final_payload) {
520                Ok(b) => b,
521                Err(e) => {
522                    restore_or_drop(&mut buckets, key, restore_payload);
523                    return Err(e.into());
524                }
525            };
526            if let Err(e) = self.backend.write(&key, &encoded) {
527                restore_or_drop(&mut buckets, key, restore_payload);
528                return Err(e);
529            }
530        }
531        Ok(())
532    }
533
534    // D268 — rollback bumps the epoch atomically before clearing
535    // pending so a concurrent `flush` sees the bump on its next
536    // per-bucket check and aborts. Bump+clear order matters: clearing
537    // pending without bumping the epoch would let a flush that has
538    // already taken pending (via `mem::take`) proceed unaware that
539    // rollback was called.
540    fn rollback(&self) -> Result<(), StorageError> {
541        self.rollback_epoch.fetch_add(1, Ordering::AcqRel);
542        self.pending.lock().clear();
543        Ok(())
544    }
545
546    fn list_by_prefix_bytes<'a>(
547        &'a self,
548        prefix: &str,
549    ) -> Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a> {
550        Box::new(PrefixIter::new(&*self.backend, prefix))
551    }
552}
553
554impl<B, T, C> AppendLogStorageTier<T> for AppendLogStorage<B, T, C>
555where
556    B: StorageBackend + ?Sized,
557    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
558    C: Codec<Vec<T>>,
559{
560    fn append_entries(&self, entries: &[T]) -> Result<(), StorageError> {
561        if entries.is_empty() {
562            return Ok(());
563        }
564        // /qa F2 (D138-followup, 2026-05-10): boundary-crossing trigger logic
565        // — a batch that jumps multiple compact_every boundaries fires one
566        // flush (closes the strict-divisibility gap where
567        // `append_entries(&[a,b,c,d,e])` with compact_every=3 would miss the
568        // boundary at 3). Matches TS-side fix.
569        let trigger_now = {
570            let mut pending = self.pending.lock();
571            for entry in entries {
572                let k = (self.key_of)(entry);
573                pending.entry(k).or_default().push(entry.clone());
574            }
575            let mut count = self.append_count.lock();
576            let prev = *count;
577            *count = count.saturating_add(entries.len() as u64);
578            let new = *count;
579            let compact_trigger = matches!(
580                self.compact_every,
581                Some(n) if n > 0 && (prev / u64::from(n)) != (new / u64::from(n))
582            );
583            compact_trigger || self.debounce_ms.is_none()
584        };
585        if trigger_now {
586            self.flush()?;
587        }
588        Ok(())
589    }
590
591    fn mode(&self) -> AppendLogMode {
592        self.mode
593    }
594
595    // D269 — windowed cursor pagination (memo:Re loadEntries-pagination
596    // parity). Bare `load_entries(LoadEntriesOpts::default())` returns
597    // the whole log + `cursor: None` (back-compat). With `page_size =
598    // Some(n)` returns the `[start, start+n)` window of the flattened
599    // lex-ASC-by-key, entry-order-within-key sequence + a forward-only
600    // cursor (`None` ⇒ no more). For a *partitioned* multi-key log we
601    // short-circuit decoding past `start + page_size + 1` entries so
602    // the consumer's per-page working set is bounded; a single-key log
603    // still decodes its one blob (per-key codec-blob model — pagination
604    // bounds the *consumer's* page, not the tier's per-key decode).
605    fn load_entries(&self, opts: LoadEntriesOpts<'_>) -> Result<AppendLoadResult<T>, StorageError> {
606        // (1) Enumerate backend keys (deterministic order — `PrefixIter`
607        // sorts lex-ASC). If `list` isn't supported, fall back to the
608        // tier name as a single key.
609        let mut keys = match self.backend.list(opts.key_filter.unwrap_or("")) {
610            Ok(ks) => ks,
611            Err(StorageError::BackendNoListSupport { .. }) => match opts.key_filter {
612                Some(k) => vec![k.to_string()],
613                None => vec![self.name.clone()],
614            },
615            Err(e) => return Err(e),
616        };
617        keys.sort();
618
619        let start: u64 = opts.cursor.map_or(0, |c| c.position);
620        // page_size <= 0 (None or Some(0)) ⇒ whole tail, no further cursor.
621        let page_size = opts.page_size.filter(|n| *n > 0);
622
623        // Compute the early-stop boundary: one entry past the window
624        // (start + page_size). When unbounded, decode everything.
625        let want_decoded_at_least = page_size.map(|n| start + u64::from(n) + 1);
626
627        let mut decoded: Vec<T> = Vec::new();
628        let mut total_seen: u64 = 0;
629
630        for k in keys {
631            if let Some(want) = want_decoded_at_least {
632                if total_seen >= want {
633                    break;
634                }
635            }
636            if let Some(bytes) = self.backend.read(&k)? {
637                if !bytes.is_empty() {
638                    let entries: Vec<T> = self.codec.decode(&bytes)?;
639                    total_seen = total_seen.saturating_add(entries.len() as u64);
640                    decoded.extend(entries);
641                }
642            }
643        }
644
645        // (2) Slice the decoded window. `start` past end ⇒ empty page;
646        // cursor advances to `None`.
647        let start_idx: usize = start.try_into().unwrap_or(usize::MAX).min(decoded.len());
648        let mut window: Vec<T> = decoded.split_off(start_idx);
649
650        let next_cursor: Option<AppendCursor> = match page_size {
651            Some(n) => {
652                let n_usize: usize = (n as usize).min(window.len());
653                let has_more = window.len() > n_usize;
654                window.truncate(n_usize);
655                if has_more {
656                    Some(AppendCursor::from_position(start + u64::from(n)))
657                } else {
658                    None
659                }
660            }
661            None => None,
662        };
663
664        Ok(AppendLoadResult {
665            entries: window,
666            cursor: next_cursor,
667        })
668    }
669}
670
671// ── KV tier ───────────────────────────────────────────────────────────────
672
/// Key-value tier — buffers per-key pending writes; `flush()` encodes each
/// value via codec and writes it to the backend. Mirrors TS `kvStorage`.
///
/// Built by [`kv_storage`] (any backend) or [`memory_kv`] (fresh in-memory
/// backend).
pub struct KvStorage<B, T, C = JsonCodec>
where
    B: StorageBackend + ?Sized,
    T: Send + Sync + 'static,
    C: Codec<T>,
{
    /// Backend that values are written to, read from, deleted from and listed.
    backend: Arc<B>,
    /// Encodes values on flush and decodes them in `load`.
    codec: C,
    /// Tier name (defaults to the backend's name).
    name: String,
    /// Reported via `debounce_ms()`; when `None`, every accepted `save`
    /// flushes immediately.
    debounce_ms: Option<u32>,
    /// Accepted-save cadence at which `save` forces a flush; `None` disables it.
    compact_every: Option<u32>,
    /// Optional `(key, value)` admission predicate; rejected saves are dropped.
    filter: Option<KvFilterFn<T>>,
    /// Latest unflushed value per key (a later `save` to the same key
    /// replaces the earlier one).
    pending: Mutex<std::collections::HashMap<String, T>>,
    /// Total `save()` calls accepted (post-filter); drives `compact_every`.
    write_count: Mutex<u64>,
}
690
/// Options for [`KvStorage`] construction (see [`kv_storage`]).
///
/// `Default` is implemented for the [`JsonCodec`] case.
pub struct KvStorageOptions<T, C = JsonCodec>
where
    T: Send + Sync + 'static,
    C: Codec<T>,
{
    /// Tier name; `None` falls back to the backend's name.
    pub name: Option<String>,
    /// Codec used to encode/decode values.
    pub codec: C,
    /// Debounce setting; `None` flushes on every accepted `save`.
    pub debounce_ms: Option<u32>,
    /// Force a flush whenever the accepted-save count crosses a multiple of
    /// `n`; must not be `Some(0)` (the factory panics).
    pub compact_every: Option<u32>,
    /// Optional `(key, value)` admission predicate; rejected saves are dropped.
    pub filter: Option<KvFilterFn<T>>,
}
702
703impl<T> Default for KvStorageOptions<T, JsonCodec>
704where
705    T: Serialize + DeserializeOwned + Send + Sync + 'static,
706{
707    fn default() -> Self {
708        Self {
709            name: None,
710            codec: JsonCodec,
711            debounce_ms: None,
712            compact_every: None,
713            filter: None,
714        }
715    }
716}
717
718/// Factory: wrap a backend as a kv tier.
719///
720/// # Panics
721///
722/// Panics if `opts.compact_every == Some(0)`. See [`snapshot_storage`] for
723/// the rationale (pre-1.0 footgun guard per /qa A4).
724pub fn kv_storage<B, T, C>(backend: Arc<B>, opts: KvStorageOptions<T, C>) -> KvStorage<B, T, C>
725where
726    B: StorageBackend + ?Sized,
727    T: Send + Sync + 'static,
728    C: Codec<T>,
729{
730    assert!(
731        opts.compact_every != Some(0),
732        "kv_storage: compact_every must be None or Some(n) where n >= 1, got Some(0)",
733    );
734    let name = opts.name.unwrap_or_else(|| backend.name().to_string());
735    KvStorage {
736        backend,
737        codec: opts.codec,
738        name,
739        debounce_ms: opts.debounce_ms,
740        compact_every: opts.compact_every,
741        filter: opts.filter,
742        pending: Mutex::new(std::collections::HashMap::new()),
743        write_count: Mutex::new(0),
744    }
745}
746
747pub fn memory_kv<T, C>(opts: KvStorageOptions<T, C>) -> KvStorage<MemoryBackend, T, C>
748where
749    T: Send + Sync + 'static,
750    C: Codec<T>,
751{
752    kv_storage(memory_backend(), opts)
753}
754
755impl<B, T, C> BaseStorageTier for KvStorage<B, T, C>
756where
757    B: StorageBackend + ?Sized,
758    T: Send + Sync + 'static,
759    C: Codec<T>,
760{
761    fn name(&self) -> &str {
762        &self.name
763    }
764    fn debounce_ms(&self) -> Option<u32> {
765        self.debounce_ms
766    }
767    fn compact_every(&self) -> Option<u32> {
768        self.compact_every
769    }
770
771    // D165 — F1 fix: take pending, attempt per-entry encode+write, restore
772    // unprocessed entries on failure so the caller can retry.
773    fn flush(&self) -> Result<(), StorageError> {
774        let mut entries = std::mem::take(&mut *self.pending.lock());
775        let keys: Vec<String> = entries.keys().cloned().collect();
776        for key in keys {
777            let Some(value) = entries.remove(&key) else {
778                continue;
779            };
780            let bytes = match self.codec.encode(&value) {
781                Ok(b) => b,
782                Err(e) => {
783                    entries.insert(key, value);
784                    *self.pending.lock() = entries;
785                    return Err(e.into());
786                }
787            };
788            if let Err(e) = self.backend.write(&key, &bytes) {
789                entries.insert(key, value);
790                *self.pending.lock() = entries;
791                return Err(e);
792            }
793        }
794        Ok(())
795    }
796
797    fn rollback(&self) -> Result<(), StorageError> {
798        self.pending.lock().clear();
799        Ok(())
800    }
801
802    fn list_by_prefix_bytes<'a>(
803        &'a self,
804        prefix: &str,
805    ) -> Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a> {
806        Box::new(PrefixIter::new(&*self.backend, prefix))
807    }
808}
809
810impl<B, T, C> KvStorageTier<T> for KvStorage<B, T, C>
811where
812    B: StorageBackend + ?Sized,
813    T: Send + Sync + 'static,
814    C: Codec<T>,
815{
816    fn save(&self, key: &str, value: T) -> Result<(), StorageError> {
817        if let Some(filter) = &self.filter {
818            if !filter(key, &value) {
819                return Ok(());
820            }
821        }
822        // /qa F2 (D138-followup, 2026-05-10): boundary-crossing trigger logic
823        // — matches the Snapshot/AppendLog fix above. A batch of saves that
824        // jumps a compact_every boundary fires one flush.
825        let trigger_now = {
826            self.pending.lock().insert(key.to_string(), value);
827            let mut count = self.write_count.lock();
828            let prev = *count;
829            *count = count.saturating_add(1);
830            let new = *count;
831            let compact_trigger = matches!(
832                self.compact_every,
833                Some(n) if n > 0 && (prev / u64::from(n)) != (new / u64::from(n))
834            );
835            compact_trigger || self.debounce_ms.is_none()
836        };
837        if trigger_now {
838            self.flush()?;
839        }
840        Ok(())
841    }
842
843    fn load(&self, key: &str) -> Result<Option<T>, StorageError> {
844        match self.backend.read(key)? {
845            Some(bytes) if !bytes.is_empty() => Ok(Some(self.codec.decode(&bytes)?)),
846            _ => Ok(None),
847        }
848    }
849
850    fn delete(&self, key: &str) -> Result<(), StorageError> {
851        // /qa A2 (2026-05-10): backend.delete fires FIRST so a failure leaves
852        // pending intact (caller can retry). Pre-fix had pending cleared
853        // before backend.delete, meaning a backend.delete failure left the
854        // backend with stale data + pending empty — silent data divergence
855        // visible only on next `load(key)`.
856        self.backend.delete(key)?;
857        self.pending.lock().remove(key);
858        Ok(())
859    }
860
861    fn list(&self, prefix: &str) -> Result<Vec<String>, StorageError> {
862        self.backend.list(prefix)
863    }
864}