Skip to main content

graphrefly_storage/
tier.rs

1//! Tier abstraction layer (Phase 14.6 — DS-14-storage Audit 4 + Q5 + Q8
2//! locks, M4.B 2026-05-10).
3//!
4//! Three-layer tier model (matching `packages/pure-ts/src/extra/storage/tiers.ts`):
5//!
6//! - **Layer 1** — [`StorageBackend`](crate::backend::StorageBackend): generic
7//!   bytes-level kv I/O.
8//! - **Layer 2** — tier specializations parametric over `T`:
9//!   - [`SnapshotStorageTier<T>`] — one record per `save(snapshot)` call.
10//!   - [`AppendLogStorageTier<T>`] — sequential entries with optional
11//!     partitioning via `key_of`.
12//!   - [`KvStorageTier<T>`] — many records, addressable by string key.
13//! - **Layer 3** — high-level wiring (`Graph::attach_storage`, M4.E
14//!   integration).
15//!
16//! All sub-traits inherit [`BaseStorageTier`], which carries the cadence
17//! knobs (`debounce_ms`, `compact_every`), transaction lifecycle (`flush`,
18//! `rollback`), and the dyn-safe bytes-level
19//! [`BaseStorageTier::list_by_prefix_bytes`] enumeration. Typed enumeration
20//! (decoding via the tier's codec) lives on the typed sub-traits as free
21//! functions or default-impl helpers.
22//!
23//! # Sync, NOT async (D143)
24//!
25//! Every method returns directly (no `Future`). Memory / redb / `std::fs`
26//! backends are all sync-compatible; tokio-backed networking backends wrap
//! their async surface at the adapter layer (e.g. `tokio::runtime::Handle::block_on`).
28//! See [`crate::backend`] module doc.
29//!
30//! # `debounce_ms` semantics at M4.B (D144 — option (b))
31//!
32//! The accessor [`BaseStorageTier::debounce_ms`] returns the configured
33//! window. **The tier itself does NOT drive a timer.** The Graph layer
34//! consumes this value at attach time (M4.E) and schedules `flush()` via its
35//! own reactive timer source (`from_timer` / `from_cron`). Until then,
36//! `save` always buffers and `flush` always commits — debounce has no
37//! automatic effect.
38
39use crate::backend::StorageBackend;
40use crate::error::StorageError;
41
42/// Boxed lazy iterator yielded by [`BaseStorageTier::list_by_prefix_bytes`].
43/// Each item is either a `(key, bytes)` entry decoded from the backend or a
44/// surfaced [`StorageError`] (e.g. on first-yield if the backend doesn't
45/// support enumeration — lazy-throw semantics from the TS impl).
46pub type ListByPrefixIter<'a> =
47    Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a>;
48
49// ── Layer 2 — BaseStorageTier (cadence + transaction surface) ─────────────
50
51/// Common tier surface — cadence knobs + transaction lifecycle + bytes-level
52/// enumeration.
53///
54/// Implements Audit 4's "one wave = one transaction" model:
55/// - After every wave (and on `batch()` close), Graph iterates attached tiers
56///   and calls `tier.flush()`.
57/// - On wave-throw, Graph calls `tier.rollback()` to discard pending writes.
58/// - Cross-tier atomicity is best-effort; each tier owns its own transaction.
59///
60/// Lock 4.D / 6.E defaults:
61/// - `debounce_ms = None` — sync-through flush at wave close.
62/// - `compact_every = None` — no forced flush cap.
63pub trait BaseStorageTier: Send + Sync {
64    /// Diagnostic tier name (e.g. `"snapshot:my-graph"`). Surfaces in error
65    /// messages and `Display` impls.
66    fn name(&self) -> &str;
67
68    /// Debounce window in milliseconds. `None` (default) = sync-through.
69    /// Graph-layer attach reads this and schedules timed `flush()` via its
70    /// own reactive timer (M4.E); the tier itself does NOT drive a timer.
71    fn debounce_ms(&self) -> Option<u32> {
72        None
73    }
74
75    /// Force a flush every Nth accepted write regardless of debounce.
76    /// `None` = no cap; `Some(N)` triggers `flush()` on the Nth `save` (or
77    /// `append_entries` for log tiers).
78    fn compact_every(&self) -> Option<u32> {
79        None
80    }
81
82    /// Commit pending writes. Graph calls at wave-close / debounce-fire.
83    fn flush(&self) -> Result<(), StorageError>;
84
85    /// Discard pending writes. Graph calls on wave-throw.
86    fn rollback(&self) -> Result<(), StorageError>;
87
88    /// Lazily enumerate raw `(key, bytes)` pairs under a literal byte-prefix
89    /// (DS-14-storage Q5 lock).
90    ///
91    /// Yields in lex-ASC key order; for the canonical WAL key format
92    /// `${prefix}/${frame_seq:020}`, lex-ASC string sort = numeric ASC
93    /// `frame_seq` sort.
94    ///
95    /// Typed enumeration (decode via the tier's codec) lives on the typed
96    /// sub-traits as free helpers — see [`crate::wal::iterate_wal_frames`]
97    /// (M4.E) for the WAL-aware pattern. Decoupling the enumeration from
98    /// the codec keeps this method dyn-safe (avoids a generic method on the
99    /// trait).
100    ///
101    /// Backends without `list` support surface
102    /// [`StorageError::BackendNoListSupport`] on first iteration.
103    fn list_by_prefix_bytes<'a>(&'a self, prefix: &str) -> ListByPrefixIter<'a>;
104
105    /// Force a `mode:"full"` baseline immediately (Q8 lock). Bypasses
106    /// `compact_every` cadence; useful at deploy boundaries, end-of-process
107    /// drains, or test fixtures. Default impl is a `flush()` so tiers that
108    /// don't write baselines can still implement the method trivially.
109    fn compact(&self) -> Result<(), StorageError> {
110        self.flush()
111    }
112}
113
114// ── Layer 2 — Snapshot tier ───────────────────────────────────────────────
115
116/// Snapshot tier — writes a single record per `save(snapshot)` call. Mirrors
117/// TS `SnapshotStorageTier<T>`.
118///
119/// Backend key is determined by the tier's `key_of` closure (default: a
120/// constant `tier.name`). Successive saves overwrite the same key.
121pub trait SnapshotStorageTier<T>: BaseStorageTier
122where
123    T: Send + Sync + 'static,
124{
125    /// Buffer a snapshot pending flush. Honors `compact_every` and
126    /// `debounce_ms` semantics per the [`BaseStorageTier`] contract.
127    fn save(&self, snapshot: T) -> Result<(), StorageError>;
128
129    /// Load the most-recently-saved snapshot. Returns `Ok(None)` if no
130    /// snapshot has been persisted yet.
131    fn load(&self) -> Result<Option<T>, StorageError>;
132}
133
134// ── Layer 2 — Append-log tier ─────────────────────────────────────────────
135
/// **D269 — Persistence mode (memo:Re P1 parity).** TS counterpart:
/// `AppendLogStorageOptions.mode`.
///
/// - [`AppendLogMode::Append`] is the default and the M4.B behavior. It loads
///   the existing bucket bytes, decodes them, merges in the new entries,
///   re-encodes, and writes the result back.
/// - [`AppendLogMode::Overwrite`] skips the read/merge step and stores the
///   current batch as the complete bucket contents. It suits callers that ship
///   a full snapshot every wave (e.g. WAL replay drivers) rather than deltas.
///
/// Feeding deltas into an `Overwrite` tier would silently truncate the log to
/// its last batch. That is why `attach_storage` refuses overwrite sinks at
/// attachment time.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AppendLogMode {
    /// Read the existing bucket, merge the new entries, write back (M4.B default).
    #[default]
    Append,
    /// Replace the bucket contents with the current batch; no read-merge.
    Overwrite,
}
153
/// **D269 — Opaque cursor for windowed `load_entries` pagination
/// (memo:Re loadEntries-pagination parity).** TS counterpart: `AppendCursor`.
///
/// The cursor indexes the flattened entry sequence. Keys are taken in lex-ASC
/// order, and entries keep their stored order within each key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct AppendCursor {
    /// Forward-only offset into the flattened sequence.
    pub position: u64,
    /// Reserved for future stable-iteration tokens; currently always `None`.
    pub tag: Option<u64>,
}

impl AppendCursor {
    /// Build a cursor at `position` with no `tag` attached.
    #[must_use]
    pub const fn from_position(position: u64) -> Self {
        Self {
            tag: None,
            position,
        }
    }
}
174
175/// **D269** — Options for [`AppendLogStorageTier::load_entries`].
176#[derive(Debug, Clone, Default)]
177pub struct LoadEntriesOpts<'a> {
178    pub cursor: Option<AppendCursor>,
179    pub page_size: Option<u32>,
180    pub key_filter: Option<&'a str>,
181}
182
183/// **D269** — Result of a paginated [`AppendLogStorageTier::load_entries`].
184/// `cursor.is_none()` ⇒ no more entries (consumer should stop). Returning
185/// the whole tail with `cursor.is_none()` matches the back-compat shape
186/// (bare `load_entries(LoadEntriesOpts::default())` is byte-for-byte the
187/// pre-D269 behavior).
188#[must_use = "AppendLoadResult.cursor must be threaded into the next load_entries call; ignoring it voids the pagination contract"]
189#[derive(Debug, Clone)]
190pub struct AppendLoadResult<T> {
191    pub entries: Vec<T>,
192    pub cursor: Option<AppendCursor>,
193}
194
195/// Append-log tier — bulk-friendly entry persistence with optional
196/// partitioning via `key_of`. Mirrors TS `AppendLogStorageTier<T>`.
197///
198/// Storage shape: each backend key holds a serialized array of all entries
199/// for that partition, growing on every flush. Adapters that need true
200/// append-only semantics layer their own tier over the same backend.
201pub trait AppendLogStorageTier<T>: BaseStorageTier
202where
203    T: Send + Sync + 'static,
204{
205    /// Append entries to the per-key buckets (each bucket determined by the
206    /// tier's `key_of` closure). Honors `compact_every` cadence.
207    fn append_entries(&self, entries: &[T]) -> Result<(), StorageError>;
208
209    /// D269: persistence mode (`Append` default — read-merge; `Overwrite` —
210    /// replace bucket per flush). Exposed so delta-shipping consumers like
211    /// `ReactiveLog::attach_storage` can reject `Overwrite` tiers at
212    /// attach time.
213    fn mode(&self) -> AppendLogMode {
214        AppendLogMode::Append
215    }
216
217    /// D269 — windowed cursor pagination (memo:Re loadEntries-pagination
218    /// parity). With `LoadEntriesOpts::default()` returns the whole log
219    /// (back-compat shape) and `cursor: None`. With `page_size = Some(n)`
220    /// returns `[start, start+n)` of the flattened (lex-ASC-by-key,
221    /// entry-order-within-key) sequence and a forward-only cursor
222    /// (`None` ⇒ no more). Pre-D269 callers using
223    /// `load_entries_legacy(key_filter)` get the old signature.
224    fn load_entries(&self, opts: LoadEntriesOpts<'_>) -> Result<AppendLoadResult<T>, StorageError>;
225
226    /// Pre-D269 convenience: load all entries (no pagination, no cursor).
227    /// Equivalent to `load_entries(LoadEntriesOpts { key_filter, .. })`.
228    fn load_entries_all(&self, key_filter: Option<&str>) -> Result<Vec<T>, StorageError> {
229        Ok(self
230            .load_entries(LoadEntriesOpts {
231                cursor: None,
232                page_size: None,
233                key_filter,
234            })?
235            .entries)
236    }
237}
238
239// ── Layer 2 — KV tier ─────────────────────────────────────────────────────
240
241/// Key-value tier — typed records under arbitrary string keys with codec
242/// serialization at the storage boundary. Mirrors TS `KvStorageTier<T>`.
243///
244/// Use for content-addressed caches (replay), multi-record archives
245/// (snapshot index, AI memory), and fixture stores. Snapshot is "one
246/// record"; append-log is "sequential entries"; kv is "many records,
247/// addressable by key".
248pub trait KvStorageTier<T>: BaseStorageTier
249where
250    T: Send + Sync + 'static,
251{
252    /// Buffer a key→value mapping pending flush. Repeated `save(k, _)`
253    /// before flush overwrites the buffered value for that key. Honors
254    /// `compact_every` cadence.
255    fn save(&self, key: &str, value: T) -> Result<(), StorageError>;
256
257    /// Read the value at `key`. Returns `Ok(None)` on miss.
258    fn load(&self, key: &str) -> Result<Option<T>, StorageError>;
259
260    /// Delete the value at `key`. Flushes through to the backend
261    /// immediately (matches TS `delete` semantics — no debounce).
262    fn delete(&self, key: &str) -> Result<(), StorageError>;
263
264    /// Enumerate keys under `prefix` (lex-ASC). Delegates to
265    /// [`StorageBackend::list`] via the tier's backend.
266    fn list(&self, prefix: &str) -> Result<Vec<String>, StorageError>;
267}
268
269// ── Shared bytes-level prefix-iter helper ─────────────────────────────────
270
271/// Iterator yielded by [`BaseStorageTier::list_by_prefix_bytes`] for tiers
272/// backed by a generic `StorageBackend`. Reads the key list eagerly (the
273/// backend's `list` is sync); reads + yields each entry lazily as the
274/// consumer pulls.
275///
276/// The eager-list / lazy-read split matches the TS impl behavior: the
277/// backend may have to walk a directory or query an index to produce keys
278/// (one round-trip); reading the bytes per key is the dominant cost and
279/// should stay lazy.
280pub(crate) struct PrefixIter<'a, B: StorageBackend + ?Sized> {
281    backend: &'a B,
282    keys: std::vec::IntoIter<String>,
283    /// First-yield error (e.g. backend doesn't support list). Stored on
284    /// construction so the consumer sees it on the first `next()` call —
285    /// mirrors TS lazy-throw semantics for `backend-no-list-support`.
286    pending_error: Option<StorageError>,
287}
288
289impl<'a, B: StorageBackend + ?Sized> PrefixIter<'a, B> {
290    pub(crate) fn new(backend: &'a B, prefix: &str) -> Self {
291        match backend.list(prefix) {
292            Ok(mut keys) => {
293                // Filter to literal byte-prefix matches (defensive — some
294                // backends may return overly-wide lists) and sort lex-ASC.
295                keys.retain(|k| k.starts_with(prefix));
296                keys.sort();
297                Self {
298                    backend,
299                    keys: keys.into_iter(),
300                    pending_error: None,
301                }
302            }
303            Err(e) => Self {
304                backend,
305                keys: Vec::new().into_iter(),
306                pending_error: Some(e),
307            },
308        }
309    }
310}
311
312impl<B: StorageBackend + ?Sized> Iterator for PrefixIter<'_, B> {
313    type Item = Result<(String, Vec<u8>), StorageError>;
314
315    fn next(&mut self) -> Option<Self::Item> {
316        if let Some(e) = self.pending_error.take() {
317            return Some(Err(e));
318        }
319        loop {
320            let key = self.keys.next()?;
321            match self.backend.read(&key) {
322                Ok(Some(bytes)) if !bytes.is_empty() => return Some(Ok((key, bytes))),
323                // Empty / absent entries are skipped (the key was listed but
324                // is no longer readable — race with a concurrent delete, or
325                // a backend that holds empty placeholders).
326                Ok(_) => {}
327                Err(e) => return Some(Err(e)),
328            }
329        }
330    }
331}
332
333// Cross-tier helper traits (e.g. `HasBackend` / `HasCodec` for typed
334// list-by-prefix decoders) land in M4.E when the Graph integration actually
335// consumes them — keeping speculative infrastructure out of M4.B.
336
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::MemoryBackend;

    /// Compile-time guard: naming `dyn Trait` only type-checks for object-safe
    /// traits. Object safety is what lets callers keep heterogeneous tiers in
    /// `Vec<Box<dyn ...>>`.
    #[test]
    fn tier_traits_are_dyn_safe() {
        fn require_object_safe<Tr: ?Sized>() {}
        require_object_safe::<dyn BaseStorageTier>();
        require_object_safe::<dyn SnapshotStorageTier<u64>>();
        require_object_safe::<dyn AppendLogStorageTier<u64>>();
        require_object_safe::<dyn KvStorageTier<u64>>();
    }

    #[test]
    fn prefix_iter_yields_lex_asc() {
        let backend = MemoryBackend::new();
        // Written out of order on purpose; `other` lies outside the prefix.
        let fixtures: [(&str, &[u8]); 4] = [
            ("g/02", b"two"),
            ("g/01", b"one"),
            ("g/10", b"ten"),
            ("other", b"x"),
        ];
        for (key, bytes) in fixtures {
            backend.write(key, bytes).unwrap();
        }

        let collected = PrefixIter::new(&backend, "g/")
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        let expected: Vec<(String, Vec<u8>)> = [("g/01", "one"), ("g/02", "two"), ("g/10", "ten")]
            .iter()
            .map(|&(key, text)| (key.to_owned(), text.as_bytes().to_vec()))
            .collect();
        assert_eq!(collected, expected);
    }

    #[test]
    fn prefix_iter_surfaces_backend_no_list_support_lazily() {
        // Backend that does not override `list`, so it cannot enumerate keys.
        struct NoList;
        impl StorageBackend for NoList {
            fn name(&self) -> &'static str {
                "no-list"
            }
            fn read(&self, _key: &str) -> Result<Option<Vec<u8>>, StorageError> {
                Ok(None)
            }
            fn write(&self, _k: &str, _b: &[u8]) -> Result<(), StorageError> {
                Ok(())
            }
        }

        let backend = NoList;
        // Building the iterator must not fail; the listing error is parked
        // until the first pull.
        let mut iter = PrefixIter::new(&backend, "g/");
        assert!(matches!(
            iter.next(),
            Some(Err(StorageError::BackendNoListSupport { .. }))
        ));
        // The error is reported exactly once; after that the iterator is drained.
        assert!(iter.next().is_none());
    }
}