graphrefly_storage/tier.rs
//! Tier abstraction layer (Phase 14.6 — DS-14-storage Audit 4 + Q5 + Q8
//! locks, M4.B 2026-05-10).
//!
//! Three-layer tier model (matching `packages/pure-ts/src/extra/storage/tiers.ts`):
//!
//! - **Layer 1** — [`StorageBackend`](crate::backend::StorageBackend): generic
//!   bytes-level kv I/O.
//! - **Layer 2** — tier specializations parametric over `T`:
//!   - [`SnapshotStorageTier<T>`] — one record per `save(snapshot)` call.
//!   - [`AppendLogStorageTier<T>`] — sequential entries with optional
//!     partitioning via `key_of`.
//!   - [`KvStorageTier<T>`] — many records, addressable by string key.
//! - **Layer 3** — high-level wiring (`Graph::attach_storage`, M4.E
//!   integration).
//!
//! All sub-traits inherit [`BaseStorageTier`], which carries the cadence
//! knobs (`debounce_ms`, `compact_every`), transaction lifecycle (`flush`,
//! `rollback`), and the dyn-safe bytes-level
//! [`BaseStorageTier::list_by_prefix_bytes`] enumeration. Typed enumeration
//! (decoding via the tier's codec) lives alongside the typed sub-traits, as
//! free functions or default-impl helpers.
//!
//! # Sync, NOT async (D143)
//!
//! Every method returns directly (no `Future`). Memory / redb / `std::fs`
//! backends are all sync-compatible; tokio-backed networking backends wrap
//! their async surface at the adapter layer (e.g.
//! `tokio::runtime::Handle::block_on`). See the [`crate::backend`] module doc.
//!
//! # `debounce_ms` semantics at M4.B (D144 — option (b))
//!
//! The accessor [`BaseStorageTier::debounce_ms`] returns the configured
//! window. **The tier itself does NOT drive a timer.** The Graph layer
//! consumes this value at attach time (M4.E) and schedules `flush()` via its
//! own reactive timer source (`from_timer` / `from_cron`). Until that
//! integration lands, `save` always buffers and `flush` always commits —
//! debounce has no automatic effect.
38
39use crate::backend::StorageBackend;
40use crate::error::StorageError;
41
42/// Boxed lazy iterator yielded by [`BaseStorageTier::list_by_prefix_bytes`].
43/// Each item is either a `(key, bytes)` entry decoded from the backend or a
44/// surfaced [`StorageError`] (e.g. on first-yield if the backend doesn't
45/// support enumeration — lazy-throw semantics from the TS impl).
46pub type ListByPrefixIter<'a> =
47 Box<dyn Iterator<Item = Result<(String, Vec<u8>), StorageError>> + 'a>;
48
// ── Layer 2 — BaseStorageTier (cadence + transaction surface) ─────────────
50
51/// Common tier surface — cadence knobs + transaction lifecycle + bytes-level
52/// enumeration.
53///
54/// Implements Audit 4's "one wave = one transaction" model:
55/// - After every wave (and on `batch()` close), Graph iterates attached tiers
56/// and calls `tier.flush()`.
57/// - On wave-throw, Graph calls `tier.rollback()` to discard pending writes.
58/// - Cross-tier atomicity is best-effort; each tier owns its own transaction.
59///
60/// Lock 4.D / 6.E defaults:
61/// - `debounce_ms = None` — sync-through flush at wave close.
62/// - `compact_every = None` — no forced flush cap.
63pub trait BaseStorageTier: Send + Sync {
64 /// Diagnostic tier name (e.g. `"snapshot:my-graph"`). Surfaces in error
65 /// messages and `Display` impls.
66 fn name(&self) -> &str;
67
68 /// Debounce window in milliseconds. `None` (default) = sync-through.
69 /// Graph-layer attach reads this and schedules timed `flush()` via its
70 /// own reactive timer (M4.E); the tier itself does NOT drive a timer.
71 fn debounce_ms(&self) -> Option<u32> {
72 None
73 }
74
75 /// Force a flush every Nth accepted write regardless of debounce.
76 /// `None` = no cap; `Some(N)` triggers `flush()` on the Nth `save` (or
77 /// `append_entries` for log tiers).
78 fn compact_every(&self) -> Option<u32> {
79 None
80 }
81
82 /// Commit pending writes. Graph calls at wave-close / debounce-fire.
83 fn flush(&self) -> Result<(), StorageError>;
84
85 /// Discard pending writes. Graph calls on wave-throw.
86 fn rollback(&self) -> Result<(), StorageError>;
87
88 /// Lazily enumerate raw `(key, bytes)` pairs under a literal byte-prefix
89 /// (DS-14-storage Q5 lock).
90 ///
91 /// Yields in lex-ASC key order; for the canonical WAL key format
92 /// `${prefix}/${frame_seq:020}`, lex-ASC string sort = numeric ASC
93 /// `frame_seq` sort.
94 ///
95 /// Typed enumeration (decode via the tier's codec) lives on the typed
96 /// sub-traits as free helpers — see [`crate::wal::iterate_wal_frames`]
97 /// (M4.E) for the WAL-aware pattern. Decoupling the enumeration from
98 /// the codec keeps this method dyn-safe (avoids a generic method on the
99 /// trait).
100 ///
101 /// Backends without `list` support surface
102 /// [`StorageError::BackendNoListSupport`] on first iteration.
103 fn list_by_prefix_bytes<'a>(&'a self, prefix: &str) -> ListByPrefixIter<'a>;
104
105 /// Force a `mode:"full"` baseline immediately (Q8 lock). Bypasses
106 /// `compact_every` cadence; useful at deploy boundaries, end-of-process
107 /// drains, or test fixtures. Default impl is a `flush()` so tiers that
108 /// don't write baselines can still implement the method trivially.
109 fn compact(&self) -> Result<(), StorageError> {
110 self.flush()
111 }
112}
113
// ── Layer 2 — Snapshot tier ───────────────────────────────────────────────
115
116/// Snapshot tier — writes a single record per `save(snapshot)` call. Mirrors
117/// TS `SnapshotStorageTier<T>`.
118///
119/// Backend key is determined by the tier's `key_of` closure (default: a
120/// constant `tier.name`). Successive saves overwrite the same key.
121pub trait SnapshotStorageTier<T>: BaseStorageTier
122where
123 T: Send + Sync + 'static,
124{
125 /// Buffer a snapshot pending flush. Honors `compact_every` and
126 /// `debounce_ms` semantics per the [`BaseStorageTier`] contract.
127 fn save(&self, snapshot: T) -> Result<(), StorageError>;
128
129 /// Load the most-recently-saved snapshot. Returns `Ok(None)` if no
130 /// snapshot has been persisted yet.
131 fn load(&self) -> Result<Option<T>, StorageError>;
132}
133
// ── Layer 2 — Append-log tier ─────────────────────────────────────────────
135
136/// Append-log tier — bulk-friendly entry persistence with optional
137/// partitioning via `key_of`. Mirrors TS `AppendLogStorageTier<T>`.
138///
139/// Storage shape: each backend key holds a serialized array of all entries
140/// for that partition, growing on every flush. Adapters that need true
141/// append-only semantics layer their own tier over the same backend.
142pub trait AppendLogStorageTier<T>: BaseStorageTier
143where
144 T: Send + Sync + 'static,
145{
146 /// Append entries to the per-key buckets (each bucket determined by the
147 /// tier's `key_of` closure). Honors `compact_every` cadence.
148 fn append_entries(&self, entries: &[T]) -> Result<(), StorageError>;
149
150 /// Load all entries across all known keys (or filtered by `key_filter`).
151 /// Eager — for paginated reads, a future cursor-based API can be added.
152 fn load_entries(&self, key_filter: Option<&str>) -> Result<Vec<T>, StorageError>;
153}
154
// ── Layer 2 — KV tier ─────────────────────────────────────────────────────
156
157/// Key-value tier — typed records under arbitrary string keys with codec
158/// serialization at the storage boundary. Mirrors TS `KvStorageTier<T>`.
159///
160/// Use for content-addressed caches (replay), multi-record archives
161/// (snapshot index, AI memory), and fixture stores. Snapshot is "one
162/// record"; append-log is "sequential entries"; kv is "many records,
163/// addressable by key".
164pub trait KvStorageTier<T>: BaseStorageTier
165where
166 T: Send + Sync + 'static,
167{
168 /// Buffer a key→value mapping pending flush. Repeated `save(k, _)`
169 /// before flush overwrites the buffered value for that key. Honors
170 /// `compact_every` cadence.
171 fn save(&self, key: &str, value: T) -> Result<(), StorageError>;
172
173 /// Read the value at `key`. Returns `Ok(None)` on miss.
174 fn load(&self, key: &str) -> Result<Option<T>, StorageError>;
175
176 /// Delete the value at `key`. Flushes through to the backend
177 /// immediately (matches TS `delete` semantics — no debounce).
178 fn delete(&self, key: &str) -> Result<(), StorageError>;
179
180 /// Enumerate keys under `prefix` (lex-ASC). Delegates to
181 /// [`StorageBackend::list`] via the tier's backend.
182 fn list(&self, prefix: &str) -> Result<Vec<String>, StorageError>;
183}
184
// ── Shared bytes-level prefix-iter helper ─────────────────────────────────
186
187/// Iterator yielded by [`BaseStorageTier::list_by_prefix_bytes`] for tiers
188/// backed by a generic `StorageBackend`. Reads the key list eagerly (the
189/// backend's `list` is sync); reads + yields each entry lazily as the
190/// consumer pulls.
191///
192/// The eager-list / lazy-read split matches the TS impl behavior: the
193/// backend may have to walk a directory or query an index to produce keys
194/// (one round-trip); reading the bytes per key is the dominant cost and
195/// should stay lazy.
196pub(crate) struct PrefixIter<'a, B: StorageBackend + ?Sized> {
197 backend: &'a B,
198 keys: std::vec::IntoIter<String>,
199 /// First-yield error (e.g. backend doesn't support list). Stored on
200 /// construction so the consumer sees it on the first `next()` call —
201 /// mirrors TS lazy-throw semantics for `backend-no-list-support`.
202 pending_error: Option<StorageError>,
203}
204
205impl<'a, B: StorageBackend + ?Sized> PrefixIter<'a, B> {
206 pub(crate) fn new(backend: &'a B, prefix: &str) -> Self {
207 match backend.list(prefix) {
208 Ok(mut keys) => {
209 // Filter to literal byte-prefix matches (defensive — some
210 // backends may return overly-wide lists) and sort lex-ASC.
211 keys.retain(|k| k.starts_with(prefix));
212 keys.sort();
213 Self {
214 backend,
215 keys: keys.into_iter(),
216 pending_error: None,
217 }
218 }
219 Err(e) => Self {
220 backend,
221 keys: Vec::new().into_iter(),
222 pending_error: Some(e),
223 },
224 }
225 }
226}
227
228impl<B: StorageBackend + ?Sized> Iterator for PrefixIter<'_, B> {
229 type Item = Result<(String, Vec<u8>), StorageError>;
230
231 fn next(&mut self) -> Option<Self::Item> {
232 if let Some(e) = self.pending_error.take() {
233 return Some(Err(e));
234 }
235 loop {
236 let key = self.keys.next()?;
237 match self.backend.read(&key) {
238 Ok(Some(bytes)) if !bytes.is_empty() => return Some(Ok((key, bytes))),
239 // Empty / absent entries are skipped (the key was listed but
240 // is no longer readable — race with a concurrent delete, or
241 // a backend that holds empty placeholders).
242 Ok(_) => {}
243 Err(e) => return Some(Err(e)),
244 }
245 }
246 }
247}
248
// Cross-tier helper traits (e.g. `HasBackend` / `HasCodec` for typed
// list-by-prefix decoders) land in M4.E when the Graph integration actually
// consumes them — keeping speculative infrastructure out of M4.B.
252
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backend::MemoryBackend;

    /// Compile-time check: all four tier traits are object-safe so callers
    /// can store heterogeneous tiers in `Vec<Box<dyn ...>>`.
    #[test]
    fn tier_traits_are_dyn_safe() {
        fn assert_dyn<T: ?Sized>() {}
        assert_dyn::<dyn BaseStorageTier>();
        assert_dyn::<dyn SnapshotStorageTier<u64>>();
        assert_dyn::<dyn AppendLogStorageTier<u64>>();
        assert_dyn::<dyn KvStorageTier<u64>>();
    }

    /// Keys written out of order come back lex-ASC, and keys outside the
    /// prefix are excluded.
    #[test]
    fn prefix_iter_yields_lex_asc() {
        let b = MemoryBackend::new();
        b.write("g/02", b"two").unwrap();
        b.write("g/01", b"one").unwrap();
        b.write("g/10", b"ten").unwrap();
        b.write("other", b"x").unwrap();
        let iter = PrefixIter::new(&b, "g/");
        let collected: Vec<_> = iter.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(
            collected,
            vec![
                ("g/01".to_string(), b"one".to_vec()),
                ("g/02".to_string(), b"two".to_vec()),
                ("g/10".to_string(), b"ten".to_vec()),
            ],
        );
    }

    /// A key whose stored value is empty is listed but not yielded.
    #[test]
    fn prefix_iter_skips_empty_entries() {
        let b = MemoryBackend::new();
        b.write("g/01", b"one").unwrap();
        b.write("g/02", b"").unwrap();
        b.write("g/03", b"three").unwrap();
        let collected: Vec<_> = PrefixIter::new(&b, "g/")
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(
            collected,
            vec![
                ("g/01".to_string(), b"one".to_vec()),
                ("g/03".to_string(), b"three".to_vec()),
            ],
        );
    }

    /// A backend relying on the default `list` surfaces
    /// `BackendNoListSupport` on the first `next()`, not at construction.
    #[test]
    fn prefix_iter_surfaces_backend_no_list_support_lazily() {
        struct NoList;
        impl StorageBackend for NoList {
            fn name(&self) -> &'static str {
                "no-list"
            }
            fn read(&self, _key: &str) -> Result<Option<Vec<u8>>, StorageError> {
                Ok(None)
            }
            fn write(&self, _k: &str, _b: &[u8]) -> Result<(), StorageError> {
                Ok(())
            }
        }
        let b = NoList;
        // Construction succeeds — error is deferred to first `next()`.
        let mut iter = PrefixIter::new(&b, "g/");
        let first = iter.next();
        assert!(matches!(
            first,
            Some(Err(StorageError::BackendNoListSupport { .. }))
        ));
        // Subsequent `next()` returns `None` (iterator exhausted).
        assert!(iter.next().is_none());
    }
}