Skip to main content

graphrefly_storage/
memory.rs

1//! Concrete tier implementations + memory factories (M4.B 2026-05-10).
2//!
3//! Three generic structs back the three tier sub-traits:
4//! - [`SnapshotStorage<B, T, C>`] → [`SnapshotStorageTier<T>`]
5//! - [`AppendLogStorage<B, T, C>`] → [`AppendLogStorageTier<T>`]
6//! - [`KvStorage<B, T, C>`] → [`KvStorageTier<T>`]
7//!
8//! Each holds:
9//! - `backend: Arc<B>` (shared, multi-tier-share-one-backend supported per D147)
10//! - `codec: C` (default `JsonCodec` per D148)
11//! - `name: String`, `debounce_ms`, `compact_every` — cadence knobs
12//! - `filter`, `key_of` — optional closures (boxed `dyn Fn` per D149)
13//! - Internal pending buffer (`parking_lot::Mutex`'d)
14//!
15//! Convenience factories `memory_snapshot()` / `memory_append_log()` /
16//! `memory_kv()` wrap a fresh in-process backend.
17
18use std::sync::Arc;
19
20use parking_lot::Mutex;
21use serde::{de::DeserializeOwned, Serialize};
22
23use crate::backend::{memory_backend, MemoryBackend, StorageBackend};
24use crate::codec::{Codec, JsonCodec};
25use crate::error::StorageError;
26use crate::tier::{
27    AppendLogStorageTier, BaseStorageTier, KvStorageTier, PrefixIter, SnapshotStorageTier,
28};
29
30type FilterFn<T> = Box<dyn Fn(&T) -> bool + Send + Sync>;
31type KeyOfFn<T> = Box<dyn Fn(&T) -> String + Send + Sync>;
32type KvFilterFn<T> = Box<dyn Fn(&str, &T) -> bool + Send + Sync>;
33
34// ── Snapshot tier ─────────────────────────────────────────────────────────
35
36/// Snapshot tier — buffers one pending snapshot; `flush()` encodes via codec
37/// and writes to the backend under `key_of(snapshot)`. Mirrors TS
38/// `snapshotStorage(backend, opts)`.
39pub struct SnapshotStorage<B, T, C = JsonCodec>
40where
41    B: StorageBackend + ?Sized,
42    T: Send + Sync + 'static,
43    C: Codec<T>,
44{
45    backend: Arc<B>,
46    codec: C,
47    name: String,
48    debounce_ms: Option<u32>,
49    compact_every: Option<u32>,
50    filter: Option<FilterFn<T>>,
51    key_of: KeyOfFn<T>,
52    /// Single buffered snapshot pending flush. `Mutex<Option<T>>` rather
53    /// than `Mutex<T>` so `rollback` and `flush` can take ownership cheaply.
54    pending: Mutex<Option<T>>,
55    /// Total `save()` calls accepted (post-filter). Used by `compact_every`.
56    write_count: Mutex<u64>,
57    /// Last key written to the backend; cached so `load()` can find the
58    /// most recent baseline when `key_of` varies per snapshot.
59    last_saved_key: Mutex<Option<String>>,
60}
61
62/// Options for [`SnapshotStorage`] construction.
63pub struct SnapshotStorageOptions<T, C = JsonCodec>
64where
65    T: Send + Sync + 'static,
66    C: Codec<T>,
67{
68    pub name: Option<String>,
69    pub codec: C,
70    pub debounce_ms: Option<u32>,
71    pub compact_every: Option<u32>,
72    pub filter: Option<FilterFn<T>>,
73    pub key_of: Option<KeyOfFn<T>>,
74}
75
76impl<T> Default for SnapshotStorageOptions<T, JsonCodec>
77where
78    T: Serialize + DeserializeOwned + Send + Sync + 'static,
79{
80    fn default() -> Self {
81        Self {
82            name: None,
83            codec: JsonCodec,
84            debounce_ms: None,
85            compact_every: None,
86            filter: None,
87            key_of: None,
88        }
89    }
90}
91
92/// Factory: wrap a backend as a snapshot tier.
93///
94/// # Panics
95///
96/// Panics if `opts.compact_every == Some(0)` (use `None` to disable
97/// compaction; values ≥ 1 specify the cadence). Pre-1.0 footgun guard
98/// per /qa A4.
99pub fn snapshot_storage<B, T, C>(
100    backend: Arc<B>,
101    opts: SnapshotStorageOptions<T, C>,
102) -> SnapshotStorage<B, T, C>
103where
104    B: StorageBackend + ?Sized,
105    T: Send + Sync + 'static,
106    C: Codec<T>,
107{
108    assert!(
109        opts.compact_every != Some(0),
110        "snapshot_storage: compact_every must be None or Some(n) where n >= 1, got Some(0)",
111    );
112    let name = opts.name.unwrap_or_else(|| backend.name().to_string());
113    let fallback_key = name.clone();
114    let key_of = opts
115        .key_of
116        .unwrap_or_else(|| Box::new(move |_| fallback_key.clone()));
117    SnapshotStorage {
118        backend,
119        codec: opts.codec,
120        name,
121        debounce_ms: opts.debounce_ms,
122        compact_every: opts.compact_every,
123        filter: opts.filter,
124        key_of,
125        pending: Mutex::new(None),
126        write_count: Mutex::new(0),
127        last_saved_key: Mutex::new(None),
128    }
129}
130
131/// Convenience: snapshot tier over a fresh in-memory backend.
132pub fn memory_snapshot<T, C>(
133    opts: SnapshotStorageOptions<T, C>,
134) -> SnapshotStorage<MemoryBackend, T, C>
135where
136    T: Send + Sync + 'static,
137    C: Codec<T>,
138{
139    snapshot_storage(memory_backend(), opts)
140}
141
142impl<B, T, C> SnapshotStorage<B, T, C>
143where
144    B: StorageBackend + ?Sized,
145    T: Send + Sync + 'static,
146    C: Codec<T>,
147{
148    /// Encode + write a snapshot to the backend, updating `last_saved_key` on
149    /// success. Returns `Err((snapshot, error))` on failure so the caller can
150    /// restore the snapshot to the pending slot (D165 — F1 fix).
151    fn try_flush(
152        backend: &B,
153        codec: &C,
154        key_of: &KeyOfFn<T>,
155        last_saved_key: &Mutex<Option<String>>,
156        snapshot: T,
157    ) -> Result<(), (T, StorageError)> {
158        let key = key_of(&snapshot);
159        let bytes = match codec.encode(&snapshot) {
160            Ok(b) => b,
161            Err(e) => return Err((snapshot, e.into())),
162        };
163        if let Err(e) = backend.write(&key, &bytes) {
164            return Err((snapshot, e));
165        }
166        *last_saved_key.lock() = Some(key);
167        Ok(())
168    }
169}
170
171impl<B, T, C> BaseStorageTier for SnapshotStorage<B, T, C>
172where
173    B: StorageBackend + ?Sized,
174    T: Send + Sync + 'static,
175    C: Codec<T>,
176{
177    fn name(&self) -> &str {
178        &self.name
179    }
180    fn debounce_ms(&self) -> Option<u32> {
181        self.debounce_ms
182    }
183    fn compact_every(&self) -> Option<u32> {
184        self.compact_every
185    }
186
187    // D165 — F1 fix: take pending, attempt encode+write, restore on failure.
188    // Pre-fix: `mem::take` before encode lost pending on encode/write error.
189    fn flush(&self) -> Result<(), StorageError> {
190        let slot = self.pending.lock().take();
191        let Some(snapshot) = slot else {
192            return Ok(());
193        };
194        match Self::try_flush(
195            &*self.backend,
196            &self.codec,
197            &self.key_of,
198            &self.last_saved_key,
199            snapshot,
200        ) {
201            Ok(()) => Ok(()),
202            Err((snapshot, err)) => {
203                // Restore pending so the caller can retry.
204                *self.pending.lock() = Some(snapshot);
205                Err(err)
206            }
207        }
208    }
209
210    fn rollback(&self) -> Result<(), StorageError> {
211        *self.pending.lock() = None;
212        Ok(())
213    }
214
215    fn list_by_prefix_bytes<'a>(
216        &'a self,
217        prefix: &str,
218    ) -> Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a> {
219        Box::new(PrefixIter::new(&*self.backend, prefix))
220    }
221
222    fn compact(&self) -> Result<(), StorageError> {
223        self.flush()
224    }
225}
226
227impl<B, T, C> SnapshotStorageTier<T> for SnapshotStorage<B, T, C>
228where
229    B: StorageBackend + ?Sized,
230    T: Send + Sync + 'static,
231    C: Codec<T>,
232{
233    fn save(&self, snapshot: T) -> Result<(), StorageError> {
234        if let Some(filter) = &self.filter {
235            if !filter(&snapshot) {
236                return Ok(());
237            }
238        }
239        // /qa F2 + A1 (D138-followup, 2026-05-10): hold `pending` lock across
240        // the count update + trigger decision + capture, so the snapshot that
241        // triggers a compact cadence is THE snapshot that gets persisted
242        // (closes the snapshot-compact-trigger race). Boundary-crossing
243        // trigger logic (`prev/N != new/N`) replaces strict `is_multiple_of`
244        // so batch save patterns can't skip the trigger when count jumps
245        // multiple boundaries — matches TS-side fix.
246        let captured: Option<T> = {
247            let mut pending = self.pending.lock();
248            *pending = Some(snapshot);
249            let mut count = self.write_count.lock();
250            let prev = *count;
251            *count = count.saturating_add(1);
252            let new = *count;
253            let compact_trigger = matches!(
254                self.compact_every,
255                Some(n) if n > 0 && (prev / u64::from(n)) != (new / u64::from(n))
256            );
257            let trigger = compact_trigger || self.debounce_ms.is_none();
258            if trigger {
259                pending.take()
260            } else {
261                None
262            }
263        };
264        if let Some(snap) = captured {
265            // D165 — F1 fix: restore pending on encode/write failure.
266            if let Err((snap, err)) = Self::try_flush(
267                &self.backend,
268                &self.codec,
269                &self.key_of,
270                &self.last_saved_key,
271                snap,
272            ) {
273                *self.pending.lock() = Some(snap);
274                return Err(err);
275            }
276        }
277        Ok(())
278    }
279
280    fn load(&self) -> Result<Option<T>, StorageError> {
281        let key = self
282            .last_saved_key
283            .lock()
284            .clone()
285            .unwrap_or_else(|| self.name.clone());
286        match self.backend.read(&key)? {
287            Some(bytes) if !bytes.is_empty() => Ok(Some(self.codec.decode(&bytes)?)),
288            _ => Ok(None),
289        }
290    }
291}
292
293// ── Append-log tier ───────────────────────────────────────────────────────
294
295/// Append-log tier — buffers per-key entries; `flush()` encodes each
296/// bucket as an array via codec and merge-writes it into the backend.
297/// Mirrors TS `appendLogStorage`.
298pub struct AppendLogStorage<B, T, C = JsonCodec>
299where
300    B: StorageBackend + ?Sized,
301    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
302    C: Codec<Vec<T>>,
303{
304    backend: Arc<B>,
305    codec: C,
306    name: String,
307    debounce_ms: Option<u32>,
308    compact_every: Option<u32>,
309    key_of: KeyOfFn<T>,
310    /// Per-key pending buckets (matches TS `Map<string, T[]>`).
311    pending: Mutex<std::collections::HashMap<String, Vec<T>>>,
312    /// Total entries appended (post-filter); drives `compact_every`.
313    append_count: Mutex<u64>,
314}
315
316pub struct AppendLogStorageOptions<T, C = JsonCodec>
317where
318    T: Send + Sync + 'static,
319    C: Codec<Vec<T>>,
320{
321    pub name: Option<String>,
322    pub codec: C,
323    pub debounce_ms: Option<u32>,
324    pub compact_every: Option<u32>,
325    pub key_of: Option<KeyOfFn<T>>,
326}
327
328impl<T> Default for AppendLogStorageOptions<T, JsonCodec>
329where
330    T: Serialize + DeserializeOwned + Send + Sync + 'static,
331{
332    fn default() -> Self {
333        Self {
334            name: None,
335            codec: JsonCodec,
336            debounce_ms: None,
337            compact_every: None,
338            key_of: None,
339        }
340    }
341}
342
343/// Factory: wrap a backend as an append-log tier.
344///
345/// # Panics
346///
347/// Panics if `opts.compact_every == Some(0)`. See [`snapshot_storage`] for
348/// the rationale (pre-1.0 footgun guard per /qa A4).
349pub fn append_log_storage<B, T, C>(
350    backend: Arc<B>,
351    opts: AppendLogStorageOptions<T, C>,
352) -> AppendLogStorage<B, T, C>
353where
354    B: StorageBackend + ?Sized,
355    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
356    C: Codec<Vec<T>>,
357{
358    assert!(
359        opts.compact_every != Some(0),
360        "append_log_storage: compact_every must be None or Some(n) where n >= 1, got Some(0)",
361    );
362    let name = opts.name.unwrap_or_else(|| backend.name().to_string());
363    let fallback_key = name.clone();
364    let key_of = opts
365        .key_of
366        .unwrap_or_else(|| Box::new(move |_| fallback_key.clone()));
367    AppendLogStorage {
368        backend,
369        codec: opts.codec,
370        name,
371        debounce_ms: opts.debounce_ms,
372        compact_every: opts.compact_every,
373        key_of,
374        pending: Mutex::new(std::collections::HashMap::new()),
375        append_count: Mutex::new(0),
376    }
377}
378
379pub fn memory_append_log<T, C>(
380    opts: AppendLogStorageOptions<T, C>,
381) -> AppendLogStorage<MemoryBackend, T, C>
382where
383    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
384    C: Codec<Vec<T>>,
385{
386    append_log_storage(memory_backend(), opts)
387}
388
389impl<B, T, C> BaseStorageTier for AppendLogStorage<B, T, C>
390where
391    B: StorageBackend + ?Sized,
392    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
393    C: Codec<Vec<T>>,
394{
395    fn name(&self) -> &str {
396        &self.name
397    }
398    fn debounce_ms(&self) -> Option<u32> {
399        self.debounce_ms
400    }
401    fn compact_every(&self) -> Option<u32> {
402        self.compact_every
403    }
404
405    // D165 — F1 fix: take pending, attempt per-bucket encode+write, restore
406    // unprocessed buckets on failure so the caller can retry.
407    fn flush(&self) -> Result<(), StorageError> {
408        let mut buckets = std::mem::take(&mut *self.pending.lock());
409        let keys: Vec<String> = buckets.keys().cloned().collect();
410        for key in keys {
411            let bucket = match buckets.remove(&key) {
412                Some(b) if !b.is_empty() => b,
413                _ => continue,
414            };
415            // Read existing, merge, write back.
416            let existing = match self.backend.read(&key) {
417                Ok(e) => e,
418                Err(e) => {
419                    buckets.insert(key, bucket);
420                    *self.pending.lock() = buckets;
421                    return Err(e);
422                }
423            };
424            let mut merged = match existing {
425                Some(bytes) if !bytes.is_empty() => match self.codec.decode(&bytes) {
426                    Ok(v) => v,
427                    Err(e) => {
428                        buckets.insert(key, bucket);
429                        *self.pending.lock() = buckets;
430                        return Err(e.into());
431                    }
432                },
433                _ => Vec::new(),
434            };
435            let bucket_backup = bucket.clone();
436            merged.extend(bucket);
437            let encoded = match self.codec.encode(&merged) {
438                Ok(b) => b,
439                Err(e) => {
440                    buckets.insert(key, bucket_backup);
441                    *self.pending.lock() = buckets;
442                    return Err(e.into());
443                }
444            };
445            if let Err(e) = self.backend.write(&key, &encoded) {
446                *self.pending.lock() = buckets;
447                return Err(e);
448            }
449        }
450        Ok(())
451    }
452
453    fn rollback(&self) -> Result<(), StorageError> {
454        self.pending.lock().clear();
455        Ok(())
456    }
457
458    fn list_by_prefix_bytes<'a>(
459        &'a self,
460        prefix: &str,
461    ) -> Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a> {
462        Box::new(PrefixIter::new(&*self.backend, prefix))
463    }
464}
465
466impl<B, T, C> AppendLogStorageTier<T> for AppendLogStorage<B, T, C>
467where
468    B: StorageBackend + ?Sized,
469    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
470    C: Codec<Vec<T>>,
471{
472    fn append_entries(&self, entries: &[T]) -> Result<(), StorageError> {
473        if entries.is_empty() {
474            return Ok(());
475        }
476        // /qa F2 (D138-followup, 2026-05-10): boundary-crossing trigger logic
477        // — a batch that jumps multiple compact_every boundaries fires one
478        // flush (closes the strict-divisibility gap where
479        // `append_entries(&[a,b,c,d,e])` with compact_every=3 would miss the
480        // boundary at 3). Matches TS-side fix.
481        let trigger_now = {
482            let mut pending = self.pending.lock();
483            for entry in entries {
484                let k = (self.key_of)(entry);
485                pending.entry(k).or_default().push(entry.clone());
486            }
487            let mut count = self.append_count.lock();
488            let prev = *count;
489            *count = count.saturating_add(entries.len() as u64);
490            let new = *count;
491            let compact_trigger = matches!(
492                self.compact_every,
493                Some(n) if n > 0 && (prev / u64::from(n)) != (new / u64::from(n))
494            );
495            compact_trigger || self.debounce_ms.is_none()
496        };
497        if trigger_now {
498            self.flush()?;
499        }
500        Ok(())
501    }
502
503    fn load_entries(&self, key_filter: Option<&str>) -> Result<Vec<T>, StorageError> {
504        // Use the backend's enumeration to find keys. If `list` isn't
505        // supported, fall back to the tier name as the single key.
506        let keys = match self.backend.list(key_filter.unwrap_or("")) {
507            Ok(ks) => ks,
508            Err(StorageError::BackendNoListSupport { .. }) => match key_filter {
509                Some(k) => vec![k.to_string()],
510                None => vec![self.name.clone()],
511            },
512            Err(e) => return Err(e),
513        };
514        let mut all = Vec::new();
515        for k in keys {
516            if let Some(bytes) = self.backend.read(&k)? {
517                if !bytes.is_empty() {
518                    let entries: Vec<T> = self.codec.decode(&bytes)?;
519                    all.extend(entries);
520                }
521            }
522        }
523        Ok(all)
524    }
525}
526
527// ── KV tier ───────────────────────────────────────────────────────────────
528
529/// Key-value tier — buffers per-key pending writes; `flush()` encodes each
530/// value via codec and writes it to the backend. Mirrors TS `kvStorage`.
531pub struct KvStorage<B, T, C = JsonCodec>
532where
533    B: StorageBackend + ?Sized,
534    T: Send + Sync + 'static,
535    C: Codec<T>,
536{
537    backend: Arc<B>,
538    codec: C,
539    name: String,
540    debounce_ms: Option<u32>,
541    compact_every: Option<u32>,
542    filter: Option<KvFilterFn<T>>,
543    pending: Mutex<std::collections::HashMap<String, T>>,
544    write_count: Mutex<u64>,
545}
546
547pub struct KvStorageOptions<T, C = JsonCodec>
548where
549    T: Send + Sync + 'static,
550    C: Codec<T>,
551{
552    pub name: Option<String>,
553    pub codec: C,
554    pub debounce_ms: Option<u32>,
555    pub compact_every: Option<u32>,
556    pub filter: Option<KvFilterFn<T>>,
557}
558
559impl<T> Default for KvStorageOptions<T, JsonCodec>
560where
561    T: Serialize + DeserializeOwned + Send + Sync + 'static,
562{
563    fn default() -> Self {
564        Self {
565            name: None,
566            codec: JsonCodec,
567            debounce_ms: None,
568            compact_every: None,
569            filter: None,
570        }
571    }
572}
573
574/// Factory: wrap a backend as a kv tier.
575///
576/// # Panics
577///
578/// Panics if `opts.compact_every == Some(0)`. See [`snapshot_storage`] for
579/// the rationale (pre-1.0 footgun guard per /qa A4).
580pub fn kv_storage<B, T, C>(backend: Arc<B>, opts: KvStorageOptions<T, C>) -> KvStorage<B, T, C>
581where
582    B: StorageBackend + ?Sized,
583    T: Send + Sync + 'static,
584    C: Codec<T>,
585{
586    assert!(
587        opts.compact_every != Some(0),
588        "kv_storage: compact_every must be None or Some(n) where n >= 1, got Some(0)",
589    );
590    let name = opts.name.unwrap_or_else(|| backend.name().to_string());
591    KvStorage {
592        backend,
593        codec: opts.codec,
594        name,
595        debounce_ms: opts.debounce_ms,
596        compact_every: opts.compact_every,
597        filter: opts.filter,
598        pending: Mutex::new(std::collections::HashMap::new()),
599        write_count: Mutex::new(0),
600    }
601}
602
603pub fn memory_kv<T, C>(opts: KvStorageOptions<T, C>) -> KvStorage<MemoryBackend, T, C>
604where
605    T: Send + Sync + 'static,
606    C: Codec<T>,
607{
608    kv_storage(memory_backend(), opts)
609}
610
611impl<B, T, C> BaseStorageTier for KvStorage<B, T, C>
612where
613    B: StorageBackend + ?Sized,
614    T: Send + Sync + 'static,
615    C: Codec<T>,
616{
617    fn name(&self) -> &str {
618        &self.name
619    }
620    fn debounce_ms(&self) -> Option<u32> {
621        self.debounce_ms
622    }
623    fn compact_every(&self) -> Option<u32> {
624        self.compact_every
625    }
626
627    // D165 — F1 fix: take pending, attempt per-entry encode+write, restore
628    // unprocessed entries on failure so the caller can retry.
629    fn flush(&self) -> Result<(), StorageError> {
630        let mut entries = std::mem::take(&mut *self.pending.lock());
631        let keys: Vec<String> = entries.keys().cloned().collect();
632        for key in keys {
633            let Some(value) = entries.remove(&key) else {
634                continue;
635            };
636            let bytes = match self.codec.encode(&value) {
637                Ok(b) => b,
638                Err(e) => {
639                    entries.insert(key, value);
640                    *self.pending.lock() = entries;
641                    return Err(e.into());
642                }
643            };
644            if let Err(e) = self.backend.write(&key, &bytes) {
645                entries.insert(key, value);
646                *self.pending.lock() = entries;
647                return Err(e);
648            }
649        }
650        Ok(())
651    }
652
653    fn rollback(&self) -> Result<(), StorageError> {
654        self.pending.lock().clear();
655        Ok(())
656    }
657
658    fn list_by_prefix_bytes<'a>(
659        &'a self,
660        prefix: &str,
661    ) -> Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a> {
662        Box::new(PrefixIter::new(&*self.backend, prefix))
663    }
664}
665
666impl<B, T, C> KvStorageTier<T> for KvStorage<B, T, C>
667where
668    B: StorageBackend + ?Sized,
669    T: Send + Sync + 'static,
670    C: Codec<T>,
671{
672    fn save(&self, key: &str, value: T) -> Result<(), StorageError> {
673        if let Some(filter) = &self.filter {
674            if !filter(key, &value) {
675                return Ok(());
676            }
677        }
678        // /qa F2 (D138-followup, 2026-05-10): boundary-crossing trigger logic
679        // — matches the Snapshot/AppendLog fix above. A batch of saves that
680        // jumps a compact_every boundary fires one flush.
681        let trigger_now = {
682            self.pending.lock().insert(key.to_string(), value);
683            let mut count = self.write_count.lock();
684            let prev = *count;
685            *count = count.saturating_add(1);
686            let new = *count;
687            let compact_trigger = matches!(
688                self.compact_every,
689                Some(n) if n > 0 && (prev / u64::from(n)) != (new / u64::from(n))
690            );
691            compact_trigger || self.debounce_ms.is_none()
692        };
693        if trigger_now {
694            self.flush()?;
695        }
696        Ok(())
697    }
698
699    fn load(&self, key: &str) -> Result<Option<T>, StorageError> {
700        match self.backend.read(key)? {
701            Some(bytes) if !bytes.is_empty() => Ok(Some(self.codec.decode(&bytes)?)),
702            _ => Ok(None),
703        }
704    }
705
706    fn delete(&self, key: &str) -> Result<(), StorageError> {
707        // /qa A2 (2026-05-10): backend.delete fires FIRST so a failure leaves
708        // pending intact (caller can retry). Pre-fix had pending cleared
709        // before backend.delete, meaning a backend.delete failure left the
710        // backend with stale data + pending empty — silent data divergence
711        // visible only on next `load(key)`.
712        self.backend.delete(key)?;
713        self.pending.lock().remove(key);
714        Ok(())
715    }
716
717    fn list(&self, prefix: &str) -> Result<Vec<String>, StorageError> {
718        self.backend.list(prefix)
719    }
720}