Skip to main content

graphrefly_storage/
tier.rs

1//! Tier abstraction layer (Phase 14.6 — DS-14-storage Audit 4 + Q5 + Q8
2//! locks, M4.B 2026-05-10).
3//!
4//! Three-layer tier model (matching `packages/pure-ts/src/extra/storage/tiers.ts`):
5//!
6//! - **Layer 1** — [`StorageBackend`](crate::backend::StorageBackend): generic
7//!   bytes-level kv I/O.
8//! - **Layer 2** — tier specializations parametric over `T`:
9//!   - [`SnapshotStorageTier<T>`] — one record per `save(snapshot)` call.
10//!   - [`AppendLogStorageTier<T>`] — sequential entries with optional
11//!     partitioning via `key_of`.
12//!   - [`KvStorageTier<T>`] — many records, addressable by string key.
13//! - **Layer 3** — high-level wiring (`Graph::attach_storage`, M4.E
14//!   integration).
15//!
16//! All sub-traits inherit [`BaseStorageTier`], which carries the cadence
17//! knobs (`debounce_ms`, `compact_every`), transaction lifecycle (`flush`,
18//! `rollback`), and the dyn-safe bytes-level
19//! [`BaseStorageTier::list_by_prefix_bytes`] enumeration. Typed enumeration
20//! (decoding via the tier's codec) lives on the typed sub-traits as free
21//! functions or default-impl helpers.
22//!
23//! # Sync, NOT async (D143)
24//!
25//! Every method returns directly (no `Future`). Memory / redb / `std::fs`
26//! backends are all sync-compatible; tokio-backed networking backends wrap
//! their async surface at the adapter layer (e.g. `tokio::runtime::Handle::block_on`).
28//! See [`crate::backend`] module doc.
29//!
30//! # `debounce_ms` semantics at M4.B (D144 — option (b))
31//!
32//! The accessor [`BaseStorageTier::debounce_ms`] returns the configured
33//! window. **The tier itself does NOT drive a timer.** The Graph layer
34//! consumes this value at attach time (M4.E) and schedules `flush()` via its
35//! own reactive timer source (`from_timer` / `from_cron`). Until then,
36//! `save` always buffers and `flush` always commits — debounce has no
37//! automatic effect.
38
39use crate::backend::StorageBackend;
40use crate::error::StorageError;
41
42/// Boxed lazy iterator yielded by [`BaseStorageTier::list_by_prefix_bytes`].
43/// Each item is either a `(key, bytes)` entry decoded from the backend or a
44/// surfaced [`StorageError`] (e.g. on first-yield if the backend doesn't
45/// support enumeration — lazy-throw semantics from the TS impl).
46pub type ListByPrefixIter<'a> =
47    Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a>;
48
49// ── Layer 2 — BaseStorageTier (cadence + transaction surface) ─────────────
50
51/// Common tier surface — cadence knobs + transaction lifecycle + bytes-level
52/// enumeration.
53///
54/// Implements Audit 4's "one wave = one transaction" model:
55/// - After every wave (and on `batch()` close), Graph iterates attached tiers
56///   and calls `tier.flush()`.
57/// - On wave-throw, Graph calls `tier.rollback()` to discard pending writes.
58/// - Cross-tier atomicity is best-effort; each tier owns its own transaction.
59///
60/// Lock 4.D / 6.E defaults:
61/// - `debounce_ms = None` — sync-through flush at wave close.
62/// - `compact_every = None` — no forced flush cap.
63pub trait BaseStorageTier: Send + Sync {
64    /// Diagnostic tier name (e.g. `"snapshot:my-graph"`). Surfaces in error
65    /// messages and `Display` impls.
66    fn name(&self) -> &str;
67
68    /// Debounce window in milliseconds. `None` (default) = sync-through.
69    /// Graph-layer attach reads this and schedules timed `flush()` via its
70    /// own reactive timer (M4.E); the tier itself does NOT drive a timer.
71    fn debounce_ms(&self) -> Option<u32> {
72        None
73    }
74
75    /// Force a flush every Nth accepted write regardless of debounce.
76    /// `None` = no cap; `Some(N)` triggers `flush()` on the Nth `save` (or
77    /// `append_entries` for log tiers).
78    fn compact_every(&self) -> Option<u32> {
79        None
80    }
81
82    /// Commit pending writes. Graph calls at wave-close / debounce-fire.
83    fn flush(&self) -> Result<(), StorageError>;
84
85    /// Discard pending writes. Graph calls on wave-throw.
86    fn rollback(&self) -> Result<(), StorageError>;
87
88    /// Lazily enumerate raw `(key, bytes)` pairs under a literal byte-prefix
89    /// (DS-14-storage Q5 lock).
90    ///
91    /// Yields in lex-ASC key order; for the canonical WAL key format
92    /// `${prefix}/${frame_seq:020}`, lex-ASC string sort = numeric ASC
93    /// `frame_seq` sort.
94    ///
95    /// Typed enumeration (decode via the tier's codec) lives on the typed
96    /// sub-traits as free helpers — see [`crate::wal::iterate_wal_frames`]
97    /// (M4.E) for the WAL-aware pattern. Decoupling the enumeration from
98    /// the codec keeps this method dyn-safe (avoids a generic method on the
99    /// trait).
100    ///
101    /// Backends without `list` support surface
102    /// [`StorageError::BackendNoListSupport`] on first iteration.
103    fn list_by_prefix_bytes<'a>(&'a self, prefix: &str) -> ListByPrefixIter<'a>;
104
105    /// Force a `mode:"full"` baseline immediately (Q8 lock). Bypasses
106    /// `compact_every` cadence; useful at deploy boundaries, end-of-process
107    /// drains, or test fixtures. Default impl is a `flush()` so tiers that
108    /// don't write baselines can still implement the method trivially.
109    fn compact(&self) -> Result<(), StorageError> {
110        self.flush()
111    }
112}
113
114// ── Layer 2 — Snapshot tier ───────────────────────────────────────────────
115
116/// Snapshot tier — writes a single record per `save(snapshot)` call. Mirrors
117/// TS `SnapshotStorageTier<T>`.
118///
119/// Backend key is determined by the tier's `key_of` closure (default: a
120/// constant `tier.name`). Successive saves overwrite the same key.
121pub trait SnapshotStorageTier<T>: BaseStorageTier
122where
123    T: Send + Sync + 'static,
124{
125    /// Buffer a snapshot pending flush. Honors `compact_every` and
126    /// `debounce_ms` semantics per the [`BaseStorageTier`] contract.
127    fn save(&self, snapshot: T) -> Result<(), StorageError>;
128
129    /// Load the most-recently-saved snapshot. Returns `Ok(None)` if no
130    /// snapshot has been persisted yet.
131    fn load(&self) -> Result<Option<T>, StorageError>;
132}
133
134// ── Layer 2 — Append-log tier ─────────────────────────────────────────────
135
136/// Append-log tier — bulk-friendly entry persistence with optional
137/// partitioning via `key_of`. Mirrors TS `AppendLogStorageTier<T>`.
138///
139/// Storage shape: each backend key holds a serialized array of all entries
140/// for that partition, growing on every flush. Adapters that need true
141/// append-only semantics layer their own tier over the same backend.
142pub trait AppendLogStorageTier<T>: BaseStorageTier
143where
144    T: Send + Sync + 'static,
145{
146    /// Append entries to the per-key buckets (each bucket determined by the
147    /// tier's `key_of` closure). Honors `compact_every` cadence.
148    fn append_entries(&self, entries: &[T]) -> Result<(), StorageError>;
149
150    /// Load all entries across all known keys (or filtered by `key_filter`).
151    /// Eager — for paginated reads, a future cursor-based API can be added.
152    fn load_entries(&self, key_filter: Option<&str>) -> Result<Vec<T>, StorageError>;
153}
154
155// ── Layer 2 — KV tier ─────────────────────────────────────────────────────
156
157/// Key-value tier — typed records under arbitrary string keys with codec
158/// serialization at the storage boundary. Mirrors TS `KvStorageTier<T>`.
159///
160/// Use for content-addressed caches (replay), multi-record archives
161/// (snapshot index, AI memory), and fixture stores. Snapshot is "one
162/// record"; append-log is "sequential entries"; kv is "many records,
163/// addressable by key".
164pub trait KvStorageTier<T>: BaseStorageTier
165where
166    T: Send + Sync + 'static,
167{
168    /// Buffer a key→value mapping pending flush. Repeated `save(k, _)`
169    /// before flush overwrites the buffered value for that key. Honors
170    /// `compact_every` cadence.
171    fn save(&self, key: &str, value: T) -> Result<(), StorageError>;
172
173    /// Read the value at `key`. Returns `Ok(None)` on miss.
174    fn load(&self, key: &str) -> Result<Option<T>, StorageError>;
175
176    /// Delete the value at `key`. Flushes through to the backend
177    /// immediately (matches TS `delete` semantics — no debounce).
178    fn delete(&self, key: &str) -> Result<(), StorageError>;
179
180    /// Enumerate keys under `prefix` (lex-ASC). Delegates to
181    /// [`StorageBackend::list`] via the tier's backend.
182    fn list(&self, prefix: &str) -> Result<Vec<String>, StorageError>;
183}
184
185// ── Shared bytes-level prefix-iter helper ─────────────────────────────────
186
187/// Iterator yielded by [`BaseStorageTier::list_by_prefix_bytes`] for tiers
188/// backed by a generic `StorageBackend`. Reads the key list eagerly (the
189/// backend's `list` is sync); reads + yields each entry lazily as the
190/// consumer pulls.
191///
192/// The eager-list / lazy-read split matches the TS impl behavior: the
193/// backend may have to walk a directory or query an index to produce keys
194/// (one round-trip); reading the bytes per key is the dominant cost and
195/// should stay lazy.
196pub(crate) struct PrefixIter<'a, B: StorageBackend + ?Sized> {
197    backend: &'a B,
198    keys: std::vec::IntoIter<String>,
199    /// First-yield error (e.g. backend doesn't support list). Stored on
200    /// construction so the consumer sees it on the first `next()` call —
201    /// mirrors TS lazy-throw semantics for `backend-no-list-support`.
202    pending_error: Option<StorageError>,
203}
204
205impl<'a, B: StorageBackend + ?Sized> PrefixIter<'a, B> {
206    pub(crate) fn new(backend: &'a B, prefix: &str) -> Self {
207        match backend.list(prefix) {
208            Ok(mut keys) => {
209                // Filter to literal byte-prefix matches (defensive — some
210                // backends may return overly-wide lists) and sort lex-ASC.
211                keys.retain(|k| k.starts_with(prefix));
212                keys.sort();
213                Self {
214                    backend,
215                    keys: keys.into_iter(),
216                    pending_error: None,
217                }
218            }
219            Err(e) => Self {
220                backend,
221                keys: Vec::new().into_iter(),
222                pending_error: Some(e),
223            },
224        }
225    }
226}
227
228impl<B: StorageBackend + ?Sized> Iterator for PrefixIter<'_, B> {
229    type Item = Result<(String, Vec<u8>), StorageError>;
230
231    fn next(&mut self) -> Option<Self::Item> {
232        if let Some(e) = self.pending_error.take() {
233            return Some(Err(e));
234        }
235        loop {
236            let key = self.keys.next()?;
237            match self.backend.read(&key) {
238                Ok(Some(bytes)) if !bytes.is_empty() => return Some(Ok((key, bytes))),
239                // Empty / absent entries are skipped (the key was listed but
240                // is no longer readable — race with a concurrent delete, or
241                // a backend that holds empty placeholders).
242                Ok(_) => {}
243                Err(e) => return Some(Err(e)),
244            }
245        }
246    }
247}
248
249// Cross-tier helper traits (e.g. `HasBackend` / `HasCodec` for typed
250// list-by-prefix decoders) land in M4.E when the Graph integration actually
251// consumes them — keeping speculative infrastructure out of M4.B.
252
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::MemoryBackend;

    /// Compile-time guard: all four tier traits are object-safe, so callers
    /// can keep heterogeneous tiers in `Vec<Box<dyn ...>>`. The test passes
    /// simply by compiling.
    #[test]
    fn tier_traits_are_dyn_safe() {
        fn nameable_as_dyn<T: ?Sized>() {}
        nameable_as_dyn::<dyn BaseStorageTier>();
        nameable_as_dyn::<dyn SnapshotStorageTier<u64>>();
        nameable_as_dyn::<dyn AppendLogStorageTier<u64>>();
        nameable_as_dyn::<dyn KvStorageTier<u64>>();
    }

    /// Keys written out of order come back sorted lex-ASC, and keys outside
    /// the requested prefix are left out.
    #[test]
    fn prefix_iter_yields_lex_asc() {
        let backend = MemoryBackend::new();
        let writes: [(&str, &[u8]); 4] = [
            ("g/02", b"two"),
            ("g/01", b"one"),
            ("g/10", b"ten"),
            ("other", b"x"),
        ];
        for (key, payload) in writes.iter() {
            backend.write(key, payload).unwrap();
        }

        let entries = PrefixIter::new(&backend, "g/")
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        let expected: Vec<(String, Vec<u8>)> = [("g/01", "one"), ("g/02", "two"), ("g/10", "ten")]
            .iter()
            .map(|&(key, text)| (key.to_string(), text.as_bytes().to_vec()))
            .collect();
        assert_eq!(entries, expected);
    }

    /// A backend that does not override `list` still lets the iterator be
    /// constructed; the failure only shows up on the first pull.
    #[test]
    fn prefix_iter_surfaces_backend_no_list_support_lazily() {
        struct NoList;
        impl StorageBackend for NoList {
            fn name(&self) -> &'static str {
                "no-list"
            }
            fn read(&self, _key: &str) -> Result<Option<Vec<u8>>, StorageError> {
                Ok(None)
            }
            fn write(&self, _k: &str, _b: &[u8]) -> Result<(), StorageError> {
                Ok(())
            }
        }

        let backend = NoList;
        // Building the iterator must not fail — the error waits for `next()`.
        let mut entries = PrefixIter::new(&backend, "g/");
        let first_pull = entries.next();
        assert!(matches!(
            first_pull,
            Some(Err(StorageError::BackendNoListSupport { .. }))
        ));
        // Once the deferred error is consumed the iterator is exhausted.
        assert!(entries.next().is_none());
    }
}