graphrefly_storage/tier.rs
1//! Tier abstraction layer (Phase 14.6 — DS-14-storage Audit 4 + Q5 + Q8
2//! locks, M4.B 2026-05-10).
3//!
4//! Three-layer tier model (matching `packages/pure-ts/src/extra/storage/tiers.ts`):
5//!
6//! - **Layer 1** — [`StorageBackend`](crate::backend::StorageBackend): generic
7//! bytes-level kv I/O.
8//! - **Layer 2** — tier specializations parametric over `T`:
9//! - [`SnapshotStorageTier<T>`] — one record per `save(snapshot)` call.
10//! - [`AppendLogStorageTier<T>`] — sequential entries with optional
11//! partitioning via `key_of`.
12//! - [`KvStorageTier<T>`] — many records, addressable by string key.
13//! - **Layer 3** — high-level wiring (`Graph::attach_storage`, M4.E
14//! integration).
15//!
16//! All sub-traits inherit [`BaseStorageTier`], which carries the cadence
17//! knobs (`debounce_ms`, `compact_every`), transaction lifecycle (`flush`,
18//! `rollback`), and the dyn-safe bytes-level
19//! [`BaseStorageTier::list_by_prefix_bytes`] enumeration. Typed enumeration
20//! (decoding via the tier's codec) lives on the typed sub-traits as free
21//! functions or default-impl helpers.
22//!
23//! # Sync, NOT async (D143)
24//!
25//! Every method returns directly (no `Future`). Memory / redb / `std::fs`
26//! backends are all sync-compatible; tokio-backed networking backends wrap
//! their async surface at the adapter layer (e.g. `tokio::runtime::Handle::block_on`).
28//! See [`crate::backend`] module doc.
29//!
30//! # `debounce_ms` semantics at M4.B (D144 — option (b))
31//!
32//! The accessor [`BaseStorageTier::debounce_ms`] returns the configured
33//! window. **The tier itself does NOT drive a timer.** The Graph layer
34//! consumes this value at attach time (M4.E) and schedules `flush()` via its
35//! own reactive timer source (`from_timer` / `from_cron`). Until then,
36//! `save` always buffers and `flush` always commits — debounce has no
37//! automatic effect.
38
39use crate::backend::StorageBackend;
40use crate::error::StorageError;
41
42/// Boxed lazy iterator yielded by [`BaseStorageTier::list_by_prefix_bytes`].
43/// Each item is either a `(key, bytes)` entry decoded from the backend or a
44/// surfaced [`StorageError`] (e.g. on first-yield if the backend doesn't
45/// support enumeration — lazy-throw semantics from the TS impl).
46pub type ListByPrefixIter<'a> =
47 Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a>;
48
49// ── Layer 2 — BaseStorageTier (cadence + transaction surface) ─────────────
50
51/// Common tier surface — cadence knobs + transaction lifecycle + bytes-level
52/// enumeration.
53///
54/// Implements Audit 4's "one wave = one transaction" model:
55/// - After every wave (and on `batch()` close), Graph iterates attached tiers
56/// and calls `tier.flush()`.
57/// - On wave-throw, Graph calls `tier.rollback()` to discard pending writes.
58/// - Cross-tier atomicity is best-effort; each tier owns its own transaction.
59///
60/// Lock 4.D / 6.E defaults:
61/// - `debounce_ms = None` — sync-through flush at wave close.
62/// - `compact_every = None` — no forced flush cap.
63pub trait BaseStorageTier: Send + Sync {
64 /// Diagnostic tier name (e.g. `"snapshot:my-graph"`). Surfaces in error
65 /// messages and `Display` impls.
66 fn name(&self) -> &str;
67
68 /// Debounce window in milliseconds. `None` (default) = sync-through.
69 /// Graph-layer attach reads this and schedules timed `flush()` via its
70 /// own reactive timer (M4.E); the tier itself does NOT drive a timer.
71 fn debounce_ms(&self) -> Option<u32> {
72 None
73 }
74
75 /// Force a flush every Nth accepted write regardless of debounce.
76 /// `None` = no cap; `Some(N)` triggers `flush()` on the Nth `save` (or
77 /// `append_entries` for log tiers).
78 fn compact_every(&self) -> Option<u32> {
79 None
80 }
81
82 /// Commit pending writes. Graph calls at wave-close / debounce-fire.
83 fn flush(&self) -> Result<(), StorageError>;
84
85 /// Discard pending writes. Graph calls on wave-throw.
86 fn rollback(&self) -> Result<(), StorageError>;
87
88 /// Lazily enumerate raw `(key, bytes)` pairs under a literal byte-prefix
89 /// (DS-14-storage Q5 lock).
90 ///
91 /// Yields in lex-ASC key order; for the canonical WAL key format
92 /// `${prefix}/${frame_seq:020}`, lex-ASC string sort = numeric ASC
93 /// `frame_seq` sort.
94 ///
95 /// Typed enumeration (decode via the tier's codec) lives on the typed
96 /// sub-traits as free helpers — see [`crate::wal::iterate_wal_frames`]
97 /// (M4.E) for the WAL-aware pattern. Decoupling the enumeration from
98 /// the codec keeps this method dyn-safe (avoids a generic method on the
99 /// trait).
100 ///
101 /// Backends without `list` support surface
102 /// [`StorageError::BackendNoListSupport`] on first iteration.
103 fn list_by_prefix_bytes<'a>(&'a self, prefix: &str) -> ListByPrefixIter<'a>;
104
105 /// Force a `mode:"full"` baseline immediately (Q8 lock). Bypasses
106 /// `compact_every` cadence; useful at deploy boundaries, end-of-process
107 /// drains, or test fixtures. Default impl is a `flush()` so tiers that
108 /// don't write baselines can still implement the method trivially.
109 fn compact(&self) -> Result<(), StorageError> {
110 self.flush()
111 }
112}
113
114// ── Layer 2 — Snapshot tier ───────────────────────────────────────────────
115
116/// Snapshot tier — writes a single record per `save(snapshot)` call. Mirrors
117/// TS `SnapshotStorageTier<T>`.
118///
119/// Backend key is determined by the tier's `key_of` closure (default: a
120/// constant `tier.name`). Successive saves overwrite the same key.
121pub trait SnapshotStorageTier<T>: BaseStorageTier
122where
123 T: Send + Sync + 'static,
124{
125 /// Buffer a snapshot pending flush. Honors `compact_every` and
126 /// `debounce_ms` semantics per the [`BaseStorageTier`] contract.
127 fn save(&self, snapshot: T) -> Result<(), StorageError>;
128
129 /// Load the most-recently-saved snapshot. Returns `Ok(None)` if no
130 /// snapshot has been persisted yet.
131 fn load(&self) -> Result<Option<T>, StorageError>;
132}
133
134// ── Layer 2 — Append-log tier ─────────────────────────────────────────────
135
/// **D269 — Persistence mode (memo:Re P1 parity).** Rust mirror of TS
/// `AppendLogStorageOptions.mode`.
///
/// - `Append` (the default, and the M4.B behavior): read the existing bucket
///   bytes, decode them, merge in the new entries, re-encode, and write back.
/// - `Overwrite`: skip the read/merge step and store the current batch as the
///   bucket's complete contents. Meant for callers that ship a full snapshot
///   every wave (e.g. WAL replay drivers) instead of deltas.
///
/// Feeding deltas into an `Overwrite` tier silently truncates the log down to
/// the last batch, which is why `attach_storage` rejects overwrite sinks at
/// attachment time.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum AppendLogMode {
    /// Read-merge-write the existing bucket. M4.B default.
    #[default]
    Append,
    /// Replace the bucket with the current batch; no read-merge.
    Overwrite,
}
153
/// **D269 — Opaque cursor for windowed `load_entries` pagination
/// (memo:Re loadEntries-pagination parity).** Rust mirror of TS
/// `AppendCursor`.
///
/// `position` is a forward-only offset into the flattened entry sequence:
/// keys in lex-ASC order, entries in their stored order within each key.
/// `tag` is reserved for future stable-iteration tokens and is currently
/// always `None`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct AppendCursor {
    pub position: u64,
    pub tag: Option<u64>,
}

impl AppendCursor {
    /// Builds a cursor pointing at `position`, with no `tag`.
    #[must_use]
    pub const fn from_position(position: u64) -> Self {
        let tag = None;
        Self { position, tag }
    }
}
174
175/// **D269** — Options for [`AppendLogStorageTier::load_entries`].
176#[derive(Debug, Clone, Default)]
177pub struct LoadEntriesOpts<'a> {
178 pub cursor: Option<AppendCursor>,
179 pub page_size: Option<u32>,
180 pub key_filter: Option<&'a str>,
181}
182
183/// **D269** — Result of a paginated [`AppendLogStorageTier::load_entries`].
184/// `cursor.is_none()` ⇒ no more entries (consumer should stop). Returning
185/// the whole tail with `cursor.is_none()` matches the back-compat shape
186/// (bare `load_entries(LoadEntriesOpts::default())` is byte-for-byte the
187/// pre-D269 behavior).
188#[must_use = "AppendLoadResult.cursor must be threaded into the next load_entries call; ignoring it voids the pagination contract"]
189#[derive(Debug, Clone)]
190pub struct AppendLoadResult<T> {
191 pub entries: Vec<T>,
192 pub cursor: Option<AppendCursor>,
193}
194
195/// Append-log tier — bulk-friendly entry persistence with optional
196/// partitioning via `key_of`. Mirrors TS `AppendLogStorageTier<T>`.
197///
198/// Storage shape: each backend key holds a serialized array of all entries
199/// for that partition, growing on every flush. Adapters that need true
200/// append-only semantics layer their own tier over the same backend.
201pub trait AppendLogStorageTier<T>: BaseStorageTier
202where
203 T: Send + Sync + 'static,
204{
205 /// Append entries to the per-key buckets (each bucket determined by the
206 /// tier's `key_of` closure). Honors `compact_every` cadence.
207 fn append_entries(&self, entries: &[T]) -> Result<(), StorageError>;
208
209 /// D269: persistence mode (`Append` default — read-merge; `Overwrite` —
210 /// replace bucket per flush). Exposed so delta-shipping consumers like
211 /// `ReactiveLog::attach_storage` can reject `Overwrite` tiers at
212 /// attach time.
213 fn mode(&self) -> AppendLogMode {
214 AppendLogMode::Append
215 }
216
217 /// D269 — windowed cursor pagination (memo:Re loadEntries-pagination
218 /// parity). With `LoadEntriesOpts::default()` returns the whole log
219 /// (back-compat shape) and `cursor: None`. With `page_size = Some(n)`
220 /// returns `[start, start+n)` of the flattened (lex-ASC-by-key,
221 /// entry-order-within-key) sequence and a forward-only cursor
222 /// (`None` ⇒ no more). Pre-D269 callers using
223 /// `load_entries_legacy(key_filter)` get the old signature.
224 fn load_entries(&self, opts: LoadEntriesOpts<'_>) -> Result<AppendLoadResult<T>, StorageError>;
225
226 /// Pre-D269 convenience: load all entries (no pagination, no cursor).
227 /// Equivalent to `load_entries(LoadEntriesOpts { key_filter, .. })`.
228 fn load_entries_all(&self, key_filter: Option<&str>) -> Result<Vec<T>, StorageError> {
229 Ok(self
230 .load_entries(LoadEntriesOpts {
231 cursor: None,
232 page_size: None,
233 key_filter,
234 })?
235 .entries)
236 }
237}
238
239// ── Layer 2 — KV tier ─────────────────────────────────────────────────────
240
241/// Key-value tier — typed records under arbitrary string keys with codec
242/// serialization at the storage boundary. Mirrors TS `KvStorageTier<T>`.
243///
244/// Use for content-addressed caches (replay), multi-record archives
245/// (snapshot index, AI memory), and fixture stores. Snapshot is "one
246/// record"; append-log is "sequential entries"; kv is "many records,
247/// addressable by key".
248pub trait KvStorageTier<T>: BaseStorageTier
249where
250 T: Send + Sync + 'static,
251{
252 /// Buffer a key→value mapping pending flush. Repeated `save(k, _)`
253 /// before flush overwrites the buffered value for that key. Honors
254 /// `compact_every` cadence.
255 fn save(&self, key: &str, value: T) -> Result<(), StorageError>;
256
257 /// Read the value at `key`. Returns `Ok(None)` on miss.
258 fn load(&self, key: &str) -> Result<Option<T>, StorageError>;
259
260 /// Delete the value at `key`. Flushes through to the backend
261 /// immediately (matches TS `delete` semantics — no debounce).
262 fn delete(&self, key: &str) -> Result<(), StorageError>;
263
264 /// Enumerate keys under `prefix` (lex-ASC). Delegates to
265 /// [`StorageBackend::list`] via the tier's backend.
266 fn list(&self, prefix: &str) -> Result<Vec<String>, StorageError>;
267}
268
269// ── Shared bytes-level prefix-iter helper ─────────────────────────────────
270
271/// Iterator yielded by [`BaseStorageTier::list_by_prefix_bytes`] for tiers
272/// backed by a generic `StorageBackend`. Reads the key list eagerly (the
273/// backend's `list` is sync); reads + yields each entry lazily as the
274/// consumer pulls.
275///
276/// The eager-list / lazy-read split matches the TS impl behavior: the
277/// backend may have to walk a directory or query an index to produce keys
278/// (one round-trip); reading the bytes per key is the dominant cost and
279/// should stay lazy.
280pub(crate) struct PrefixIter<'a, B: StorageBackend + ?Sized> {
281 backend: &'a B,
282 keys: std::vec::IntoIter<String>,
283 /// First-yield error (e.g. backend doesn't support list). Stored on
284 /// construction so the consumer sees it on the first `next()` call —
285 /// mirrors TS lazy-throw semantics for `backend-no-list-support`.
286 pending_error: Option<StorageError>,
287}
288
289impl<'a, B: StorageBackend + ?Sized> PrefixIter<'a, B> {
290 pub(crate) fn new(backend: &'a B, prefix: &str) -> Self {
291 match backend.list(prefix) {
292 Ok(mut keys) => {
293 // Filter to literal byte-prefix matches (defensive — some
294 // backends may return overly-wide lists) and sort lex-ASC.
295 keys.retain(|k| k.starts_with(prefix));
296 keys.sort();
297 Self {
298 backend,
299 keys: keys.into_iter(),
300 pending_error: None,
301 }
302 }
303 Err(e) => Self {
304 backend,
305 keys: Vec::new().into_iter(),
306 pending_error: Some(e),
307 },
308 }
309 }
310}
311
312impl<B: StorageBackend + ?Sized> Iterator for PrefixIter<'_, B> {
313 type Item = Result<(String, Vec<u8>), StorageError>;
314
315 fn next(&mut self) -> Option<Self::Item> {
316 if let Some(e) = self.pending_error.take() {
317 return Some(Err(e));
318 }
319 loop {
320 let key = self.keys.next()?;
321 match self.backend.read(&key) {
322 Ok(Some(bytes)) if !bytes.is_empty() => return Some(Ok((key, bytes))),
323 // Empty / absent entries are skipped (the key was listed but
324 // is no longer readable — race with a concurrent delete, or
325 // a backend that holds empty placeholders).
326 Ok(_) => {}
327 Err(e) => return Some(Err(e)),
328 }
329 }
330 }
331}
332
333// Cross-tier helper traits (e.g. `HasBackend` / `HasCodec` for typed
334// list-by-prefix decoders) land in M4.E when the Graph integration actually
335// consumes them — keeping speculative infrastructure out of M4.B.
336
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::MemoryBackend;

    /// Compile-time guarantee that every tier trait stays object-safe, so
    /// heterogeneous tiers can be stored in `Vec<Box<dyn ...>>`. Merely
    /// naming `dyn Trait` below fails to compile if a trait loses that
    /// property.
    #[test]
    fn tier_traits_are_dyn_safe() {
        fn require_object_safe<Tr: ?Sized>() {}
        require_object_safe::<dyn BaseStorageTier>();
        require_object_safe::<dyn SnapshotStorageTier<u64>>();
        require_object_safe::<dyn AppendLogStorageTier<u64>>();
        require_object_safe::<dyn KvStorageTier<u64>>();
    }

    /// Only keys under the prefix come back, sorted lex-ASC regardless of
    /// insertion order.
    #[test]
    fn prefix_iter_yields_lex_asc() {
        let backend = MemoryBackend::new();
        let fixtures: [(&str, &[u8]); 4] = [
            ("g/02", &b"two"[..]),
            ("g/01", &b"one"[..]),
            ("g/10", &b"ten"[..]),
            ("other", &b"x"[..]),
        ];
        for (key, bytes) in fixtures.iter() {
            backend.write(key, bytes).unwrap();
        }

        let got = PrefixIter::new(&backend, "g/")
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        let want: Vec<(String, Vec<u8>)> = [("g/01", "one"), ("g/02", "two"), ("g/10", "ten")]
            .iter()
            .map(|&(k, v)| (k.to_string(), v.as_bytes().to_vec()))
            .collect();
        assert_eq!(got, want);
    }

    /// A backend without `list` support must not fail at construction; the
    /// error appears on the first `next()` and the iterator is then empty.
    #[test]
    fn prefix_iter_surfaces_backend_no_list_support_lazily() {
        // Implements only the methods below, leaving `list` to the trait's
        // default implementation.
        struct NoList;
        impl StorageBackend for NoList {
            fn name(&self) -> &'static str {
                "no-list"
            }
            fn read(&self, _key: &str) -> Result<Option<Vec<u8>>, StorageError> {
                Ok(None)
            }
            fn write(&self, _k: &str, _b: &[u8]) -> Result<(), StorageError> {
                Ok(())
            }
        }

        let backend = NoList;
        let mut stream = PrefixIter::new(&backend, "g/");
        let head = stream.next();
        assert!(matches!(
            head,
            Some(Err(StorageError::BackendNoListSupport { .. }))
        ));
        assert!(stream.next().is_none());
    }
}