Skip to main content

graphrefly_storage/
memory.rs

1//! Concrete tier implementations + memory factories (M4.B 2026-05-10).
2//!
3//! Three generic structs back the three tier sub-traits:
4//! - [`SnapshotStorage<B, T, C>`] → [`SnapshotStorageTier<T>`]
5//! - [`AppendLogStorage<B, T, C>`] → [`AppendLogStorageTier<T>`]
6//! - [`KvStorage<B, T, C>`] → [`KvStorageTier<T>`]
7//!
8//! Each holds:
9//! - `backend: Arc<B>` (shared, multi-tier-share-one-backend supported per D147)
10//! - `codec: C` (default `JsonCodec` per D148)
11//! - `name: String`, `debounce_ms`, `compact_every` — cadence knobs
12//! - `filter`, `key_of` — optional closures (boxed `dyn Fn` per D149)
13//! - Internal pending buffer (`parking_lot::Mutex`'d)
14//!
15//! Convenience factories `memory_snapshot()` / `memory_append_log()` /
16//! `memory_kv()` wrap a fresh in-process backend.
17
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::Arc;
20
21use parking_lot::Mutex;
22use serde::{de::DeserializeOwned, Serialize};
23
24use crate::backend::{memory_backend, MemoryBackend, StorageBackend};
25use crate::codec::{Codec, JsonCodec};
26use crate::error::StorageError;
27use crate::tier::{
28    AppendCursor, AppendLoadResult, AppendLogMode, AppendLogStorageTier, BaseStorageTier,
29    KvStorageTier, LoadEntriesOpts, PrefixIter, SnapshotStorageTier,
30};
31
/// Optional predicate deciding whether a value is accepted by a tier.
type FilterFn<T> = Box<dyn Fn(&T) -> bool + Sync + Send>;
/// Maps a value to the backend key it is stored under.
type KeyOfFn<T> = Box<dyn Fn(&T) -> String + Sync + Send>;
/// KV-tier predicate: sees the key as well as the value.
type KvFilterFn<T> = Box<dyn Fn(&str, &T) -> bool + Sync + Send>;
35
36// ── Snapshot tier ─────────────────────────────────────────────────────────
37
38/// Snapshot tier — buffers one pending snapshot; `flush()` encodes via codec
39/// and writes to the backend under `key_of(snapshot)`. Mirrors TS
40/// `snapshotStorage(backend, opts)`.
41pub struct SnapshotStorage<B, T, C = JsonCodec>
42where
43    B: StorageBackend + ?Sized,
44    T: Send + Sync + 'static,
45    C: Codec<T>,
46{
47    backend: Arc<B>,
48    codec: C,
49    name: String,
50    debounce_ms: Option<u32>,
51    compact_every: Option<u32>,
52    filter: Option<FilterFn<T>>,
53    key_of: KeyOfFn<T>,
54    /// Single buffered snapshot pending flush. `Mutex<Option<T>>` rather
55    /// than `Mutex<T>` so `rollback` and `flush` can take ownership cheaply.
56    pending: Mutex<Option<T>>,
57    /// Total `save()` calls accepted (post-filter). Used by `compact_every`.
58    write_count: Mutex<u64>,
59    /// Last key written to the backend; cached so `load()` can find the
60    /// most recent baseline when `key_of` varies per snapshot.
61    last_saved_key: Mutex<Option<String>>,
62}
63
64/// Options for [`SnapshotStorage`] construction.
65pub struct SnapshotStorageOptions<T, C = JsonCodec>
66where
67    T: Send + Sync + 'static,
68    C: Codec<T>,
69{
70    pub name: Option<String>,
71    pub codec: C,
72    pub debounce_ms: Option<u32>,
73    pub compact_every: Option<u32>,
74    pub filter: Option<FilterFn<T>>,
75    pub key_of: Option<KeyOfFn<T>>,
76}
77
78impl<T> Default for SnapshotStorageOptions<T, JsonCodec>
79where
80    T: Serialize + DeserializeOwned + Send + Sync + 'static,
81{
82    fn default() -> Self {
83        Self {
84            name: None,
85            codec: JsonCodec,
86            debounce_ms: None,
87            compact_every: None,
88            filter: None,
89            key_of: None,
90        }
91    }
92}
93
94/// Factory: wrap a backend as a snapshot tier.
95///
96/// # Panics
97///
98/// Panics if `opts.compact_every == Some(0)` (use `None` to disable
99/// compaction; values ≥ 1 specify the cadence). Pre-1.0 footgun guard
100/// per /qa A4.
101pub fn snapshot_storage<B, T, C>(
102    backend: Arc<B>,
103    opts: SnapshotStorageOptions<T, C>,
104) -> SnapshotStorage<B, T, C>
105where
106    B: StorageBackend + ?Sized,
107    T: Send + Sync + 'static,
108    C: Codec<T>,
109{
110    assert!(
111        opts.compact_every != Some(0),
112        "snapshot_storage: compact_every must be None or Some(n) where n >= 1, got Some(0)",
113    );
114    let name = opts.name.unwrap_or_else(|| backend.name().to_string());
115    let fallback_key = name.clone();
116    let key_of = opts
117        .key_of
118        .unwrap_or_else(|| Box::new(move |_| fallback_key.clone()));
119    SnapshotStorage {
120        backend,
121        codec: opts.codec,
122        name,
123        debounce_ms: opts.debounce_ms,
124        compact_every: opts.compact_every,
125        filter: opts.filter,
126        key_of,
127        pending: Mutex::new(None),
128        write_count: Mutex::new(0),
129        last_saved_key: Mutex::new(None),
130    }
131}
132
133/// Convenience: snapshot tier over a fresh in-memory backend.
134pub fn memory_snapshot<T, C>(
135    opts: SnapshotStorageOptions<T, C>,
136) -> SnapshotStorage<MemoryBackend, T, C>
137where
138    T: Send + Sync + 'static,
139    C: Codec<T>,
140{
141    snapshot_storage(memory_backend(), opts)
142}
143
144impl<B, T, C> SnapshotStorage<B, T, C>
145where
146    B: StorageBackend + ?Sized,
147    T: Send + Sync + 'static,
148    C: Codec<T>,
149{
150    /// Encode + write a snapshot to the backend, updating `last_saved_key` on
151    /// success. Returns `Err((snapshot, error))` on failure so the caller can
152    /// restore the snapshot to the pending slot (D165 — F1 fix).
153    fn try_flush(
154        backend: &B,
155        codec: &C,
156        key_of: &KeyOfFn<T>,
157        last_saved_key: &Mutex<Option<String>>,
158        snapshot: T,
159    ) -> Result<(), (T, StorageError)> {
160        let key = key_of(&snapshot);
161        let bytes = match codec.encode(&snapshot) {
162            Ok(b) => b,
163            Err(e) => return Err((snapshot, e.into())),
164        };
165        if let Err(e) = backend.write(&key, &bytes) {
166            return Err((snapshot, e));
167        }
168        *last_saved_key.lock() = Some(key);
169        Ok(())
170    }
171}
172
173impl<B, T, C> BaseStorageTier for SnapshotStorage<B, T, C>
174where
175    B: StorageBackend + ?Sized,
176    T: Send + Sync + 'static,
177    C: Codec<T>,
178{
179    fn name(&self) -> &str {
180        &self.name
181    }
182    fn debounce_ms(&self) -> Option<u32> {
183        self.debounce_ms
184    }
185    fn compact_every(&self) -> Option<u32> {
186        self.compact_every
187    }
188
189    // D165 — F1 fix: take pending, attempt encode+write, restore on failure.
190    // Pre-fix: `mem::take` before encode lost pending on encode/write error.
191    fn flush(&self) -> Result<(), StorageError> {
192        let slot = self.pending.lock().take();
193        let Some(snapshot) = slot else {
194            return Ok(());
195        };
196        match Self::try_flush(
197            &*self.backend,
198            &self.codec,
199            &self.key_of,
200            &self.last_saved_key,
201            snapshot,
202        ) {
203            Ok(()) => Ok(()),
204            Err((snapshot, err)) => {
205                // Restore pending so the caller can retry.
206                *self.pending.lock() = Some(snapshot);
207                Err(err)
208            }
209        }
210    }
211
212    fn rollback(&self) -> Result<(), StorageError> {
213        *self.pending.lock() = None;
214        Ok(())
215    }
216
217    fn list_by_prefix_bytes<'a>(
218        &'a self,
219        prefix: &str,
220    ) -> Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a> {
221        Box::new(PrefixIter::new(&*self.backend, prefix))
222    }
223
224    fn compact(&self) -> Result<(), StorageError> {
225        self.flush()
226    }
227}
228
229impl<B, T, C> SnapshotStorageTier<T> for SnapshotStorage<B, T, C>
230where
231    B: StorageBackend + ?Sized,
232    T: Send + Sync + 'static,
233    C: Codec<T>,
234{
235    fn save(&self, snapshot: T) -> Result<(), StorageError> {
236        if let Some(filter) = &self.filter {
237            if !filter(&snapshot) {
238                return Ok(());
239            }
240        }
241        // /qa F2 + A1 (D138-followup, 2026-05-10): hold `pending` lock across
242        // the count update + trigger decision + capture, so the snapshot that
243        // triggers a compact cadence is THE snapshot that gets persisted
244        // (closes the snapshot-compact-trigger race). Boundary-crossing
245        // trigger logic (`prev/N != new/N`) replaces strict `is_multiple_of`
246        // so batch save patterns can't skip the trigger when count jumps
247        // multiple boundaries — matches TS-side fix.
248        let captured: Option<T> = {
249            let mut pending = self.pending.lock();
250            *pending = Some(snapshot);
251            let mut count = self.write_count.lock();
252            let prev = *count;
253            *count = count.saturating_add(1);
254            let new = *count;
255            let compact_trigger = matches!(
256                self.compact_every,
257                Some(n) if n > 0 && (prev / u64::from(n)) != (new / u64::from(n))
258            );
259            let trigger = compact_trigger || self.debounce_ms.is_none();
260            if trigger {
261                pending.take()
262            } else {
263                None
264            }
265        };
266        if let Some(snap) = captured {
267            // D165 — F1 fix: restore pending on encode/write failure.
268            if let Err((snap, err)) = Self::try_flush(
269                &self.backend,
270                &self.codec,
271                &self.key_of,
272                &self.last_saved_key,
273                snap,
274            ) {
275                *self.pending.lock() = Some(snap);
276                return Err(err);
277            }
278        }
279        Ok(())
280    }
281
282    fn load(&self) -> Result<Option<T>, StorageError> {
283        let key = self
284            .last_saved_key
285            .lock()
286            .clone()
287            .unwrap_or_else(|| self.name.clone());
288        match self.backend.read(&key)? {
289            Some(bytes) if !bytes.is_empty() => Ok(Some(self.codec.decode(&bytes)?)),
290            _ => Ok(None),
291        }
292    }
293}
294
295// ── Append-log tier ───────────────────────────────────────────────────────
296
297/// Append-log tier — buffers per-key entries; `flush()` encodes each
298/// bucket as an array via codec and merge-writes it into the backend.
299/// Mirrors TS `appendLogStorage`.
300pub struct AppendLogStorage<B, T, C = JsonCodec>
301where
302    B: StorageBackend + ?Sized,
303    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
304    C: Codec<Vec<T>>,
305{
306    backend: Arc<B>,
307    codec: C,
308    name: String,
309    debounce_ms: Option<u32>,
310    compact_every: Option<u32>,
311    /// **D269 — persistence mode (memo:Re P1 parity).** See
312    /// [`AppendLogMode`]. Default `Append`.
313    mode: AppendLogMode,
314    key_of: KeyOfFn<T>,
315    /// Per-key pending buckets (matches TS `Map<string, T[]>`).
316    pending: Mutex<std::collections::HashMap<String, Vec<T>>>,
317    /// Total entries appended (post-filter); drives `compact_every`.
318    append_count: Mutex<u64>,
319    /// **D268 — rollback epoch token (memo:Re P0(d) parity).** Mirrors
320    /// the TS `rollbackEpoch` (`packages/pure-ts/src/extra/storage/
321    /// tiers.ts`). Bumped by [`AppendLogStorage::rollback`]; captured
322    /// by [`AppendLogStorage::flush`] at start. If a concurrent
323    /// `rollback` advances the epoch while a `flush` is in flight
324    /// (lock dropped between `pending.lock` take and per-bucket
325    /// `backend.write`), the flush aborts before each subsequent
326    /// per-bucket write — entries that haven't yet hit the backend
327    /// are dropped. Best-effort: an `backend.write` already past the
328    /// epoch check can't be un-sent.
329    ///
330    /// Note: Rust's `flush()` is sync end-to-end (no async chained-
331    /// microtask hazard like TS), so the *primary* TS rollback bug
332    /// (epoch tracking pending in-flight writes scheduled pre-rollback
333    /// across microtask boundaries) doesn't structurally apply. The
334    /// epoch here covers the narrower **concurrent multi-thread**
335    /// rollback-during-flush window — a real correctness improvement
336    /// for callers that race rollback against flush on different
337    /// threads.
338    rollback_epoch: AtomicU64,
339}
340
341pub struct AppendLogStorageOptions<T, C = JsonCodec>
342where
343    T: Send + Sync + 'static,
344    C: Codec<Vec<T>>,
345{
346    pub name: Option<String>,
347    pub codec: C,
348    pub debounce_ms: Option<u32>,
349    pub compact_every: Option<u32>,
350    pub key_of: Option<KeyOfFn<T>>,
351    /// D269: persistence mode. Default `Append` (read-merge).
352    pub mode: AppendLogMode,
353}
354
355impl<T> Default for AppendLogStorageOptions<T, JsonCodec>
356where
357    T: Serialize + DeserializeOwned + Send + Sync + 'static,
358{
359    fn default() -> Self {
360        Self {
361            name: None,
362            codec: JsonCodec,
363            debounce_ms: None,
364            compact_every: None,
365            key_of: None,
366            mode: AppendLogMode::Append,
367        }
368    }
369}
370
371/// Factory: wrap a backend as an append-log tier.
372///
373/// # Panics
374///
375/// Panics if `opts.compact_every == Some(0)`. See [`snapshot_storage`] for
376/// the rationale (pre-1.0 footgun guard per /qa A4).
377pub fn append_log_storage<B, T, C>(
378    backend: Arc<B>,
379    opts: AppendLogStorageOptions<T, C>,
380) -> AppendLogStorage<B, T, C>
381where
382    B: StorageBackend + ?Sized,
383    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
384    C: Codec<Vec<T>>,
385{
386    assert!(
387        opts.compact_every != Some(0),
388        "append_log_storage: compact_every must be None or Some(n) where n >= 1, got Some(0)",
389    );
390    let name = opts.name.unwrap_or_else(|| backend.name().to_string());
391    let fallback_key = name.clone();
392    let key_of = opts
393        .key_of
394        .unwrap_or_else(|| Box::new(move |_| fallback_key.clone()));
395    AppendLogStorage {
396        backend,
397        codec: opts.codec,
398        name,
399        debounce_ms: opts.debounce_ms,
400        compact_every: opts.compact_every,
401        mode: opts.mode,
402        key_of,
403        pending: Mutex::new(std::collections::HashMap::new()),
404        append_count: Mutex::new(0),
405        rollback_epoch: AtomicU64::new(0),
406    }
407}
408
409pub fn memory_append_log<T, C>(
410    opts: AppendLogStorageOptions<T, C>,
411) -> AppendLogStorage<MemoryBackend, T, C>
412where
413    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
414    C: Codec<Vec<T>>,
415{
416    append_log_storage(memory_backend(), opts)
417}
418
419impl<B, T, C> BaseStorageTier for AppendLogStorage<B, T, C>
420where
421    B: StorageBackend + ?Sized,
422    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
423    C: Codec<Vec<T>>,
424{
425    fn name(&self) -> &str {
426        &self.name
427    }
428    fn debounce_ms(&self) -> Option<u32> {
429        self.debounce_ms
430    }
431    fn compact_every(&self) -> Option<u32> {
432        self.compact_every
433    }
434
435    // D165 — F1 fix: take pending, attempt per-bucket encode+write, restore
436    // unprocessed buckets on failure so the caller can retry.
437    //
438    // D268 — rollback-epoch check (memo:Re P0(d) parity). Captures the
439    // epoch at the top of flush; before each per-bucket backend write,
440    // verifies the epoch hasn't been advanced by a concurrent rollback.
441    // If advanced, drop all remaining buckets (they're considered
442    // rolled back) and return Ok — already-written buckets are
443    // best-effort; the epoch check is the abort boundary.
444    fn flush(&self) -> Result<(), StorageError> {
445        let scheduled_epoch = self.rollback_epoch.load(Ordering::Acquire);
446        let mut buckets = std::mem::take(&mut *self.pending.lock());
447        let keys: Vec<String> = buckets.keys().cloned().collect();
448        for key in keys {
449            // D268 epoch check: a concurrent `rollback()` invalidates
450            // any not-yet-written bucket.
451            if self.rollback_epoch.load(Ordering::Acquire) != scheduled_epoch {
452                return Ok(());
453            }
454            let bucket = match buckets.remove(&key) {
455                Some(b) if !b.is_empty() => b,
456                _ => continue,
457            };
458            // D269: Overwrite mode skips read+merge — the bucket
459            // IS the full contents to persist. Append mode (default)
460            // reads existing bytes, decodes, extends, encodes.
461            //
462            // `restore_payload` holds what we put back into `pending`
463            // if encode/write fails: in Append mode, only the NEW
464            // entries (so a retry re-reads existing + re-merges);
465            // in Overwrite mode, the bucket itself (a retry writes
466            // the same snapshot). /qa-fix 2026-05-21: was previously
467            // restoring `final_payload` (existing + new) in Append
468            // mode, which caused existing entries to duplicate on
469            // retry. Covered by
470            // `append_log_append_mode_encode_failure_does_not_duplicate_on_retry`
471            // and `append_log_append_mode_write_failure_does_not_duplicate_on_retry`.
472            let (final_payload, restore_payload): (Vec<T>, Vec<T>) = match self.mode {
473                AppendLogMode::Overwrite => {
474                    let snapshot = bucket.clone();
475                    (bucket, snapshot)
476                }
477                AppendLogMode::Append => {
478                    let existing = match self.backend.read(&key) {
479                        Ok(e) => e,
480                        Err(e) => {
481                            buckets.insert(key, bucket);
482                            *self.pending.lock() = buckets;
483                            return Err(e);
484                        }
485                    };
486                    let mut merged = match existing {
487                        Some(bytes) if !bytes.is_empty() => match self.codec.decode(&bytes) {
488                            Ok(v) => v,
489                            Err(e) => {
490                                buckets.insert(key, bucket);
491                                *self.pending.lock() = buckets;
492                                return Err(e.into());
493                            }
494                        },
495                        _ => Vec::new(),
496                    };
497                    let new_entries_backup = bucket.clone();
498                    merged.extend(bucket);
499                    (merged, new_entries_backup)
500                }
501            };
502            let encoded = match self.codec.encode(&final_payload) {
503                Ok(b) => b,
504                Err(e) => {
505                    buckets.insert(key, restore_payload);
506                    *self.pending.lock() = buckets;
507                    return Err(e.into());
508                }
509            };
510            if let Err(e) = self.backend.write(&key, &encoded) {
511                buckets.insert(key, restore_payload);
512                *self.pending.lock() = buckets;
513                return Err(e);
514            }
515        }
516        Ok(())
517    }
518
519    // D268 — rollback bumps the epoch atomically before clearing
520    // pending so a concurrent `flush` sees the bump on its next
521    // per-bucket check and aborts. Bump+clear order matters: clearing
522    // pending without bumping the epoch would let a flush that has
523    // already taken pending (via `mem::take`) proceed unaware that
524    // rollback was called.
525    fn rollback(&self) -> Result<(), StorageError> {
526        self.rollback_epoch.fetch_add(1, Ordering::AcqRel);
527        self.pending.lock().clear();
528        Ok(())
529    }
530
531    fn list_by_prefix_bytes<'a>(
532        &'a self,
533        prefix: &str,
534    ) -> Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a> {
535        Box::new(PrefixIter::new(&*self.backend, prefix))
536    }
537}
538
539impl<B, T, C> AppendLogStorageTier<T> for AppendLogStorage<B, T, C>
540where
541    B: StorageBackend + ?Sized,
542    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
543    C: Codec<Vec<T>>,
544{
545    fn append_entries(&self, entries: &[T]) -> Result<(), StorageError> {
546        if entries.is_empty() {
547            return Ok(());
548        }
549        // /qa F2 (D138-followup, 2026-05-10): boundary-crossing trigger logic
550        // — a batch that jumps multiple compact_every boundaries fires one
551        // flush (closes the strict-divisibility gap where
552        // `append_entries(&[a,b,c,d,e])` with compact_every=3 would miss the
553        // boundary at 3). Matches TS-side fix.
554        let trigger_now = {
555            let mut pending = self.pending.lock();
556            for entry in entries {
557                let k = (self.key_of)(entry);
558                pending.entry(k).or_default().push(entry.clone());
559            }
560            let mut count = self.append_count.lock();
561            let prev = *count;
562            *count = count.saturating_add(entries.len() as u64);
563            let new = *count;
564            let compact_trigger = matches!(
565                self.compact_every,
566                Some(n) if n > 0 && (prev / u64::from(n)) != (new / u64::from(n))
567            );
568            compact_trigger || self.debounce_ms.is_none()
569        };
570        if trigger_now {
571            self.flush()?;
572        }
573        Ok(())
574    }
575
576    fn mode(&self) -> AppendLogMode {
577        self.mode
578    }
579
580    // D269 — windowed cursor pagination (memo:Re loadEntries-pagination
581    // parity). Bare `load_entries(LoadEntriesOpts::default())` returns
582    // the whole log + `cursor: None` (back-compat). With `page_size =
583    // Some(n)` returns the `[start, start+n)` window of the flattened
584    // lex-ASC-by-key, entry-order-within-key sequence + a forward-only
585    // cursor (`None` ⇒ no more). For a *partitioned* multi-key log we
586    // short-circuit decoding past `start + page_size + 1` entries so
587    // the consumer's per-page working set is bounded; a single-key log
588    // still decodes its one blob (per-key codec-blob model — pagination
589    // bounds the *consumer's* page, not the tier's per-key decode).
590    fn load_entries(&self, opts: LoadEntriesOpts<'_>) -> Result<AppendLoadResult<T>, StorageError> {
591        // (1) Enumerate backend keys (deterministic order — `PrefixIter`
592        // sorts lex-ASC). If `list` isn't supported, fall back to the
593        // tier name as a single key.
594        let mut keys = match self.backend.list(opts.key_filter.unwrap_or("")) {
595            Ok(ks) => ks,
596            Err(StorageError::BackendNoListSupport { .. }) => match opts.key_filter {
597                Some(k) => vec![k.to_string()],
598                None => vec![self.name.clone()],
599            },
600            Err(e) => return Err(e),
601        };
602        keys.sort();
603
604        let start: u64 = opts.cursor.map_or(0, |c| c.position);
605        // page_size <= 0 (None or Some(0)) ⇒ whole tail, no further cursor.
606        let page_size = opts.page_size.filter(|n| *n > 0);
607
608        // Compute the early-stop boundary: one entry past the window
609        // (start + page_size). When unbounded, decode everything.
610        let want_decoded_at_least = page_size.map(|n| start + u64::from(n) + 1);
611
612        let mut decoded: Vec<T> = Vec::new();
613        let mut total_seen: u64 = 0;
614
615        for k in keys {
616            if let Some(want) = want_decoded_at_least {
617                if total_seen >= want {
618                    break;
619                }
620            }
621            if let Some(bytes) = self.backend.read(&k)? {
622                if !bytes.is_empty() {
623                    let entries: Vec<T> = self.codec.decode(&bytes)?;
624                    total_seen = total_seen.saturating_add(entries.len() as u64);
625                    decoded.extend(entries);
626                }
627            }
628        }
629
630        // (2) Slice the decoded window. `start` past end ⇒ empty page;
631        // cursor advances to `None`.
632        let start_idx: usize = start.try_into().unwrap_or(usize::MAX).min(decoded.len());
633        let mut window: Vec<T> = decoded.split_off(start_idx);
634
635        let next_cursor: Option<AppendCursor> = match page_size {
636            Some(n) => {
637                let n_usize: usize = (n as usize).min(window.len());
638                let has_more = window.len() > n_usize;
639                window.truncate(n_usize);
640                if has_more {
641                    Some(AppendCursor::from_position(start + u64::from(n)))
642                } else {
643                    None
644                }
645            }
646            None => None,
647        };
648
649        Ok(AppendLoadResult {
650            entries: window,
651            cursor: next_cursor,
652        })
653    }
654}
655
656// ── KV tier ───────────────────────────────────────────────────────────────
657
658/// Key-value tier — buffers per-key pending writes; `flush()` encodes each
659/// value via codec and writes it to the backend. Mirrors TS `kvStorage`.
660pub struct KvStorage<B, T, C = JsonCodec>
661where
662    B: StorageBackend + ?Sized,
663    T: Send + Sync + 'static,
664    C: Codec<T>,
665{
666    backend: Arc<B>,
667    codec: C,
668    name: String,
669    debounce_ms: Option<u32>,
670    compact_every: Option<u32>,
671    filter: Option<KvFilterFn<T>>,
672    pending: Mutex<std::collections::HashMap<String, T>>,
673    write_count: Mutex<u64>,
674}
675
676pub struct KvStorageOptions<T, C = JsonCodec>
677where
678    T: Send + Sync + 'static,
679    C: Codec<T>,
680{
681    pub name: Option<String>,
682    pub codec: C,
683    pub debounce_ms: Option<u32>,
684    pub compact_every: Option<u32>,
685    pub filter: Option<KvFilterFn<T>>,
686}
687
688impl<T> Default for KvStorageOptions<T, JsonCodec>
689where
690    T: Serialize + DeserializeOwned + Send + Sync + 'static,
691{
692    fn default() -> Self {
693        Self {
694            name: None,
695            codec: JsonCodec,
696            debounce_ms: None,
697            compact_every: None,
698            filter: None,
699        }
700    }
701}
702
703/// Factory: wrap a backend as a kv tier.
704///
705/// # Panics
706///
707/// Panics if `opts.compact_every == Some(0)`. See [`snapshot_storage`] for
708/// the rationale (pre-1.0 footgun guard per /qa A4).
709pub fn kv_storage<B, T, C>(backend: Arc<B>, opts: KvStorageOptions<T, C>) -> KvStorage<B, T, C>
710where
711    B: StorageBackend + ?Sized,
712    T: Send + Sync + 'static,
713    C: Codec<T>,
714{
715    assert!(
716        opts.compact_every != Some(0),
717        "kv_storage: compact_every must be None or Some(n) where n >= 1, got Some(0)",
718    );
719    let name = opts.name.unwrap_or_else(|| backend.name().to_string());
720    KvStorage {
721        backend,
722        codec: opts.codec,
723        name,
724        debounce_ms: opts.debounce_ms,
725        compact_every: opts.compact_every,
726        filter: opts.filter,
727        pending: Mutex::new(std::collections::HashMap::new()),
728        write_count: Mutex::new(0),
729    }
730}
731
732pub fn memory_kv<T, C>(opts: KvStorageOptions<T, C>) -> KvStorage<MemoryBackend, T, C>
733where
734    T: Send + Sync + 'static,
735    C: Codec<T>,
736{
737    kv_storage(memory_backend(), opts)
738}
739
740impl<B, T, C> BaseStorageTier for KvStorage<B, T, C>
741where
742    B: StorageBackend + ?Sized,
743    T: Send + Sync + 'static,
744    C: Codec<T>,
745{
746    fn name(&self) -> &str {
747        &self.name
748    }
749    fn debounce_ms(&self) -> Option<u32> {
750        self.debounce_ms
751    }
752    fn compact_every(&self) -> Option<u32> {
753        self.compact_every
754    }
755
756    // D165 — F1 fix: take pending, attempt per-entry encode+write, restore
757    // unprocessed entries on failure so the caller can retry.
758    fn flush(&self) -> Result<(), StorageError> {
759        let mut entries = std::mem::take(&mut *self.pending.lock());
760        let keys: Vec<String> = entries.keys().cloned().collect();
761        for key in keys {
762            let Some(value) = entries.remove(&key) else {
763                continue;
764            };
765            let bytes = match self.codec.encode(&value) {
766                Ok(b) => b,
767                Err(e) => {
768                    entries.insert(key, value);
769                    *self.pending.lock() = entries;
770                    return Err(e.into());
771                }
772            };
773            if let Err(e) = self.backend.write(&key, &bytes) {
774                entries.insert(key, value);
775                *self.pending.lock() = entries;
776                return Err(e);
777            }
778        }
779        Ok(())
780    }
781
782    fn rollback(&self) -> Result<(), StorageError> {
783        self.pending.lock().clear();
784        Ok(())
785    }
786
787    fn list_by_prefix_bytes<'a>(
788        &'a self,
789        prefix: &str,
790    ) -> Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a> {
791        Box::new(PrefixIter::new(&*self.backend, prefix))
792    }
793}
794
795impl<B, T, C> KvStorageTier<T> for KvStorage<B, T, C>
796where
797    B: StorageBackend + ?Sized,
798    T: Send + Sync + 'static,
799    C: Codec<T>,
800{
801    fn save(&self, key: &str, value: T) -> Result<(), StorageError> {
802        if let Some(filter) = &self.filter {
803            if !filter(key, &value) {
804                return Ok(());
805            }
806        }
807        // /qa F2 (D138-followup, 2026-05-10): boundary-crossing trigger logic
808        // — matches the Snapshot/AppendLog fix above. A batch of saves that
809        // jumps a compact_every boundary fires one flush.
810        let trigger_now = {
811            self.pending.lock().insert(key.to_string(), value);
812            let mut count = self.write_count.lock();
813            let prev = *count;
814            *count = count.saturating_add(1);
815            let new = *count;
816            let compact_trigger = matches!(
817                self.compact_every,
818                Some(n) if n > 0 && (prev / u64::from(n)) != (new / u64::from(n))
819            );
820            compact_trigger || self.debounce_ms.is_none()
821        };
822        if trigger_now {
823            self.flush()?;
824        }
825        Ok(())
826    }
827
828    fn load(&self, key: &str) -> Result<Option<T>, StorageError> {
829        match self.backend.read(key)? {
830            Some(bytes) if !bytes.is_empty() => Ok(Some(self.codec.decode(&bytes)?)),
831            _ => Ok(None),
832        }
833    }
834
835    fn delete(&self, key: &str) -> Result<(), StorageError> {
836        // /qa A2 (2026-05-10): backend.delete fires FIRST so a failure leaves
837        // pending intact (caller can retry). Pre-fix had pending cleared
838        // before backend.delete, meaning a backend.delete failure left the
839        // backend with stale data + pending empty — silent data divergence
840        // visible only on next `load(key)`.
841        self.backend.delete(key)?;
842        self.pending.lock().remove(key);
843        Ok(())
844    }
845
846    fn list(&self, prefix: &str) -> Result<Vec<String>, StorageError> {
847        self.backend.list(prefix)
848    }
849}