graphrefly_storage/graph_integration.rs
1//! Graph-level storage integration (M4.E2 — D170–D174).
2//!
3//! Free functions (not `Graph` methods) because `graphrefly-graph` does not
4//! depend on `graphrefly-storage` — the dependency flows in the other
5//! direction. See D170.
6//!
7//! # Provided APIs
8//!
9//! - [`GraphCheckpointRecord`] — portable baseline type wrapping a
10//! [`GraphPersistSnapshot`] + seq metadata + `format_version` (F7 close).
11//! - [`diff_snapshots`] / [`decompose_diff_to_frames`] — snapshot diff →
12//! WAL frame generation engine (D172).
13//! - [`attach_snapshot_storage`] + [`StorageHandle`] — wire observe
14//! subscription → snapshot diff → WAL frame writes.
15//! - [`restore_snapshot`] — three-phase replay (baseline → checksum verify
16//! → lifecycle-scoped batch).
17//!
18//! # Manifest (D173 — F4 close)
19//!
20//! F4 (cross-restart key recovery) is structurally closed by D174: at the
21//! attach boundary, `key_of` is derived deterministically from
22//! `graph.name`. On restore, the snapshot key is known without a separate
23//! manifest entry. The checkpoint record's `seq` field serves as the WAL
24//! high-water mark.
25
26use std::sync::{Arc, Mutex};
27
28use graphrefly_core::{Core, CoreFull, Message};
29use graphrefly_graph::{Graph, GraphObserveAllReactive, GraphPersistSnapshot, NodeSlice};
30use graphrefly_structures::{BaseChange, Lifecycle, Version};
31use serde::{Deserialize, Serialize};
32use serde_json::Value;
33
34use crate::error::{PhaseStat, RestoreError, RestoreResult, StorageError};
35use crate::tier::{KvStorageTier, SnapshotStorageTier};
36use crate::wal::{
37 graph_wal_prefix, verify_wal_frame_checksum, wal_frame_checksum, wal_frame_key, WALFrame,
38 WalTag, REPLAY_ORDER,
39};
40
41// ── Constants ──────────────────────────────────────────────────────────────
42
43/// Current snapshot format version. Embedded in [`GraphCheckpointRecord`]
44/// and in `BaseChange.version` within decomposed WAL frames (F7 close).
45pub const SNAPSHOT_VERSION: u64 = 1;
46
47// ── Types ──────────────────────────────────────────────────────────────────
48
49/// Portable baseline record written by [`attach_snapshot_storage`] on
50/// full-snapshot writes. Contains the full [`GraphPersistSnapshot`] plus
51/// metadata for WAL cursor alignment.
52///
53/// The `format_version` field closes F7 (missing `format_version` on
54/// `WALFrame` and checkpoint records).
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct GraphCheckpointRecord {
57 /// Graph name (matches `snapshot.name`).
58 pub name: String,
59 /// Snapshot mode — `"full"` for baseline, `"diff"` reserved for future
60 /// incremental baselines.
61 pub mode: String,
62 /// The complete graph state.
63 pub snapshot: GraphPersistSnapshot,
64 /// WAL-tier cursor at baseline write time. Frames with `frame_seq >
65 /// seq` are the delta.
66 pub seq: u64,
67 /// Wall-clock timestamp at baseline write time.
68 pub timestamp_ns: u64,
69 /// Format version (F7 close).
70 pub format_version: u64,
71}
72
73// ── Diff engine (D172) ─────────────────────────────────────────────────────
74
75/// Structural diff between two [`GraphPersistSnapshot`]s.
76#[derive(Debug, Clone)]
77pub struct GraphSnapshotDiff {
78 /// Node names present in `after` but not `before`.
79 pub nodes_added: Vec<String>,
80 /// Full slices for added nodes (parallel to `nodes_added`).
81 pub nodes_added_slices: Vec<NodeSlice>,
82 /// Node names present in `before` but not `after`.
83 pub nodes_removed: Vec<String>,
84 /// Nodes whose `value` field changed between snapshots.
85 pub value_changes: Vec<ValueChange>,
86 /// Subgraph mount names added.
87 pub subgraphs_added: Vec<String>,
88 /// Subgraph mount names removed.
89 pub subgraphs_removed: Vec<String>,
90}
91
92impl GraphSnapshotDiff {
93 /// True when no structural or value changes were detected.
94 #[must_use]
95 pub fn is_empty(&self) -> bool {
96 self.nodes_added.is_empty()
97 && self.nodes_removed.is_empty()
98 && self.value_changes.is_empty()
99 && self.subgraphs_added.is_empty()
100 && self.subgraphs_removed.is_empty()
101 }
102}
103
104/// A single node value change detected by [`diff_snapshots`].
105#[derive(Debug, Clone)]
106pub struct ValueChange {
107 /// Node path that changed.
108 pub path: String,
109 /// New value. `None` means the node transitioned to sentinel (INVALIDATE).
110 pub to: Option<Value>,
111}
112
113/// Compare two snapshots and produce a structural diff.
114///
115/// Only examines the top-level namespace (not recursive into subgraphs —
116/// subgraph diffs are handled by the attach wiring per-subgraph).
117#[must_use]
118pub fn diff_snapshots(
119 before: &GraphPersistSnapshot,
120 after: &GraphPersistSnapshot,
121) -> GraphSnapshotDiff {
122 let mut nodes_added = Vec::new();
123 let mut nodes_added_slices = Vec::new();
124 let mut nodes_removed = Vec::new();
125 let mut value_changes = Vec::new();
126 let mut subgraphs_added = Vec::new();
127 let mut subgraphs_removed = Vec::new();
128
129 // Nodes added or changed.
130 for (name, after_slice) in &after.nodes {
131 if let Some(before_slice) = before.nodes.get(name) {
132 if before_slice.value != after_slice.value {
133 value_changes.push(ValueChange {
134 path: name.clone(),
135 to: after_slice.value.clone(),
136 });
137 }
138 } else {
139 nodes_added.push(name.clone());
140 nodes_added_slices.push(after_slice.clone());
141 }
142 }
143
144 // Nodes removed.
145 for name in before.nodes.keys() {
146 if !after.nodes.contains_key(name) {
147 nodes_removed.push(name.clone());
148 }
149 }
150
151 // Subgraphs added/removed.
152 for name in after.subgraphs.keys() {
153 if !before.subgraphs.contains_key(name) {
154 subgraphs_added.push(name.clone());
155 }
156 }
157 for name in before.subgraphs.keys() {
158 if !after.subgraphs.contains_key(name) {
159 subgraphs_removed.push(name.clone());
160 }
161 }
162
163 GraphSnapshotDiff {
164 nodes_added,
165 nodes_added_slices,
166 nodes_removed,
167 value_changes,
168 subgraphs_added,
169 subgraphs_removed,
170 }
171}
172
173/// Intermediate frame before checksum stamping.
174struct DecomposedFrame {
175 lifecycle: Lifecycle,
176 path: String,
177 change: BaseChange<Value>,
178}
179
180/// Convert a [`GraphSnapshotDiff`] into WAL frames ready for persistence.
181///
182/// `timestamp_ns` is the wall-clock at diff time. `base_seq` is the WAL
183/// cursor; returned frames have `frame_seq` values starting at
184/// `base_seq + 1`.
185///
186/// Returns `(frames, next_seq)` where `next_seq` is the highest assigned
187/// `frame_seq`.
188pub fn decompose_diff_to_frames(
189 diff: &GraphSnapshotDiff,
190 timestamp_ns: u64,
191 base_seq: u64,
192) -> Result<(Vec<WALFrame<Value>>, u64), StorageError> {
193 let mut decomposed = Vec::new();
194
195 let wrap = |structure: &str, lifecycle: Lifecycle, payload: Value| -> BaseChange<Value> {
196 BaseChange {
197 structure: structure.to_owned(),
198 version: Version::Counter(SNAPSHOT_VERSION),
199 t_ns: timestamp_ns,
200 seq: None,
201 lifecycle,
202 change: payload,
203 }
204 };
205
206 // Spec lifecycle: node add/remove, subgraph mount/unmount.
207 for (i, name) in diff.nodes_added.iter().enumerate() {
208 let slice = &diff.nodes_added_slices[i];
209 let payload = serde_json::json!({
210 "kind": "graph.add",
211 "nodeId": name,
212 "slice": serde_json::to_value(slice).map_err(|e|
213 StorageError::Codec(crate::codec::CodecError::Encode(e.to_string()))
214 )?,
215 });
216 decomposed.push(DecomposedFrame {
217 lifecycle: Lifecycle::Spec,
218 path: name.clone(),
219 change: wrap("graph.spec", Lifecycle::Spec, payload),
220 });
221 }
222
223 for name in &diff.nodes_removed {
224 let payload = serde_json::json!({
225 "kind": "graph.remove",
226 "nodeId": name,
227 });
228 decomposed.push(DecomposedFrame {
229 lifecycle: Lifecycle::Spec,
230 path: name.clone(),
231 change: wrap("graph.spec", Lifecycle::Spec, payload),
232 });
233 }
234
235 for name in &diff.subgraphs_added {
236 let payload = serde_json::json!({
237 "kind": "graph.mount",
238 "path": name,
239 "subgraphId": name,
240 });
241 decomposed.push(DecomposedFrame {
242 lifecycle: Lifecycle::Spec,
243 path: name.clone(),
244 change: wrap("graph.spec", Lifecycle::Spec, payload),
245 });
246 }
247
248 for name in &diff.subgraphs_removed {
249 let payload = serde_json::json!({
250 "kind": "graph.unmount",
251 "path": name,
252 });
253 decomposed.push(DecomposedFrame {
254 lifecycle: Lifecycle::Spec,
255 path: name.clone(),
256 change: wrap("graph.spec", Lifecycle::Spec, payload),
257 });
258 }
259
260 // Data lifecycle: value changes.
261 for vc in &diff.value_changes {
262 let payload = if let Some(ref value) = vc.to {
263 serde_json::json!({
264 "kind": "node.set",
265 "path": vc.path,
266 "value": value,
267 })
268 } else {
269 serde_json::json!({
270 "kind": "node.invalidate",
271 "path": vc.path,
272 })
273 };
274 decomposed.push(DecomposedFrame {
275 lifecycle: Lifecycle::Data,
276 path: vc.path.clone(),
277 change: wrap("graph.value", Lifecycle::Data, payload),
278 });
279 }
280
281 // Assign frame_seq and compute checksums.
282 let mut seq = base_seq;
283 let mut frames = Vec::with_capacity(decomposed.len());
284 for d in decomposed {
285 seq += 1;
286 let mut frame = WALFrame {
287 t: WalTag,
288 lifecycle: d.lifecycle,
289 path: d.path,
290 change: d.change,
291 frame_seq: seq,
292 frame_t_ns: timestamp_ns,
293 checksum: String::new(),
294 format_version: 1,
295 };
296 frame.checksum = wal_frame_checksum(&frame)?;
297 frames.push(frame);
298 }
299
300 Ok((frames, seq))
301}
302
303// ── Attach (D170) ──────────────────────────────────────────────────────────
304
305/// Per-tier state managed by the attach wiring.
306struct TierState {
307 /// The snapshot tier (writes full baselines).
308 snapshot_tier: Box<dyn SnapshotStorageTier<GraphCheckpointRecord>>,
309 /// Optional WAL tier (writes individual delta frames).
310 wal_tier: Option<Box<dyn KvStorageTier<WALFrame<Value>>>>,
311 /// WAL key prefix derived from `graph.name`.
312 wal_prefix: String,
313 /// Monotonic cursor.
314 seq: u64,
315 /// Flush counter for `compact_every` cadence.
316 flush_count: u64,
317 /// Configured compact-every cadence (0 = every flush writes full baseline).
318 compact_every: u32,
319 /// Snapshot tier's `debounce_ms` snapshot at attach time (D171).
320 /// `0` (the unset / disabled case) means inline-flush; `>0` means
321 /// `save()` buffers and the caller drives explicit flush via
322 /// [`StorageHandle::flush_all`].
323 snapshot_debounce_ms: u32,
324 /// WAL tier's `debounce_ms` snapshot at attach time (D171).
325 wal_debounce_ms: u32,
326 /// Last snapshot used for diff computation.
327 last_snapshot: Option<GraphPersistSnapshot>,
328 /// Disposed flag.
329 disposed: bool,
330}
331
332/// Configuration for a single snapshot+WAL tier pair.
333pub struct AttachTierPair {
334 /// Snapshot tier for full baselines.
335 pub snapshot: Box<dyn SnapshotStorageTier<GraphCheckpointRecord>>,
336 /// Optional WAL tier for delta frames. When `None`, every flush
337 /// writes a full baseline (no incremental WAL).
338 pub wal: Option<Box<dyn KvStorageTier<WALFrame<Value>>>>,
339}
340
341/// Filter predicate for [`AttachOptions`].
342pub type PathFilter = Box<dyn Fn(&str) -> bool + Send + Sync>;
343
344/// Error callback for [`AttachOptions`].
345pub type ErrorCallback = Box<dyn Fn(&StorageError) + Send + Sync>;
346
347/// Options for [`attach_snapshot_storage`].
348#[derive(Default)]
349pub struct AttachOptions {
350 /// Per-path filter. Return `true` to persist changes for this path.
351 /// `None` means persist all paths.
352 pub filter: Option<PathFilter>,
353 /// Error callback invoked when a flush fails.
354 pub on_error: Option<ErrorCallback>,
355}
356
357/// Handle returned by [`attach_snapshot_storage`].
358///
359/// D246 r3: there is **no RAII `Drop`**. The observe subscription is
360/// torn down owner-side, synchronously, by calling
361/// [`StorageHandle::detach`] with the owner's `&Core` (the
362/// [`graphrefly_core::OwnedCore`] borrow). [`StorageHandle::dispose`]
363/// remains a Core-free flag flip that stops late-fire persistence
364/// without unsubscribing (e.g. when the `&Core` is not in hand);
365/// `detach` is the full teardown.
366///
367/// `Graph` is now `Send + Sync + 'static` and Core-free; the held
368/// [`GraphObserveAllReactive`] owns a cheap `Arc`-clone of the graph
369/// (no `'g` borrow), so `StorageHandle` is `'static`.
370pub struct StorageHandle {
371 /// Shared state; inner `disposed` flag prevents late-fire callbacks.
372 state: Arc<Mutex<Vec<TierState>>>,
373 /// The observe handle. `detach(core)` unsubscribes all sinks
374 /// (owner-invoked, synchronous — D246 r3). Wrapped in a `Mutex`
375 /// because `GraphObserveAllReactive::detach` is `&mut self` while
376 /// `StorageHandle` exposes `&self` teardown for ergonomics.
377 observe: Mutex<Option<GraphObserveAllReactive>>,
378}
379
380impl StorageHandle {
381 /// Explicitly dispose: flip the per-tier `disposed` flag so the
382 /// in-wave observe sink stops scheduling persistence. Core-free
383 /// (no unsubscribe) — use [`StorageHandle::detach`] for the full
384 /// owner-side teardown when the `&Core` is available.
385 ///
386 /// F3 (QA 2026-05-14): on `PoisonError` (a panic in the observe
387 /// sink poisoned the state mutex), recover via `into_inner()`
388 /// rather than silently no-op. The disposed flag still gets set
389 /// even after a sink panic, which matters for cleanup.
390 pub fn dispose(&self) {
391 let mut states = self
392 .state
393 .lock()
394 .unwrap_or_else(std::sync::PoisonError::into_inner);
395 for s in states.iter_mut() {
396 s.disposed = true;
397 }
398 }
399
400 /// Owner-invoked, synchronous teardown (D246 r3 — replaces the
401 /// retired RAII `Drop`). Flips the `disposed` flags then detaches
402 /// the observe handle (unsubscribes every per-node sink, the
403 /// namespace-change sink, and the topology sub) via the owner's
404 /// `&Core`. Idempotent: a second call is a no-op (the observe
405 /// handle is taken on first call).
406 pub fn detach(&self, core: &Core) {
407 self.dispose();
408 let observe = self
409 .observe
410 .lock()
411 .unwrap_or_else(std::sync::PoisonError::into_inner)
412 .take();
413 if let Some(mut observe) = observe {
414 observe.detach(core);
415 }
416 }
417
418 /// Drain pending writes on all tiers (D171).
419 ///
420 /// When a tier is configured with `debounce_ms > 0`,
421 /// [`attach_snapshot_storage`]'s observe sink writes via the
422 /// tier's `save()` but skips the inline `flush()`. Callers
423 /// invoke this method — typically from a binding-side reactive
424 /// timer subgraph (`graphrefly_operators::temporal::interval`,
425 /// shipped in Slice T) — to commit the buffered writes to their
426 /// backends. With `debounce_ms == 0`, the observe sink already
427 /// force-flushes inline; calling this method is then a no-op
428 /// because the tier's pending buffers are empty.
429 ///
430 /// Returns the first error encountered, if any. Successful tiers
431 /// in the same call still flush; the error is surfaced for the
432 /// caller's diagnostics.
433 ///
434 /// # Lock discipline (N3, QA 2026-05-14)
435 ///
436 /// The state mutex is **not** held across `tier.flush()` calls.
437 /// Holding it across blocking I/O would (a) serialize the
438 /// reactive graph against backend latency (every observe-sink
439 /// fire would wait on flush completion), and (b) deadlock if a
440 /// caller invokes `flush_all` from inside an `on_error` callback
441 /// (the observe sink already holds `state.lock()` when it
442 /// invokes `on_error`). The implementation snapshots the per-
443 /// tier flush requests under the lock, drops the lock, then
444 /// runs the flushes in sequence. After each flush, a brief
445 /// re-lock applies any per-tier state mutations (none today —
446 /// `flush()` doesn't return new state).
447 ///
448 /// # Errors
449 ///
450 /// Returns the first `StorageError` encountered. Subsequent
451 /// errors are dropped (acceptable v1; the registered
452 /// `on_error` callback fires per error in the observe sink path
453 /// and is the canonical multi-error reporting surface).
454 ///
455 /// F3: recovers from a poisoned state mutex via `into_inner`
456 /// rather than silently returning Ok.
457 pub fn flush_all(&self) -> Result<(), StorageError> {
458 // Snapshot the per-tier flush callables under the lock,
459 // then drop the lock before invoking them. Each tier's
460 // `flush()` is a `&self` method on `Box<dyn ...>` — to call
461 // it without holding the outer lock, we collect raw
462 // pointers... no, `Box<dyn ...>` can't be cloned. Instead,
463 // we re-lock briefly per tier to call flush, but never hold
464 // the lock across MULTIPLE tier flushes — that's the key
465 // deadlock avoidance.
466 //
467 // The remaining single-tier-lock-during-flush window can
468 // deadlock if `tier.flush()` synchronously re-enters
469 // `dispose()` / `flush_all()` (which would try to re-acquire
470 // `self.state`). Tier impls in `graphrefly-storage` don't do
471 // that; document the contract for future binding-side tier
472 // impls in the rustdoc.
473 let tier_count = {
474 let states = self
475 .state
476 .lock()
477 .unwrap_or_else(std::sync::PoisonError::into_inner);
478 states.len()
479 };
480 let mut first_err: Option<StorageError> = None;
481 for idx in 0..tier_count {
482 // Per-tier flush: brief lock to access the tier, run
483 // flush, drop lock immediately. The tier itself is
484 // behind a Box<dyn ...> inside the Vec; flush() takes
485 // &self, so the &mut on the outer Vec is only needed
486 // for `s.disposed` check.
487 let snapshot_err: Option<StorageError>;
488 let wal_err: Option<StorageError>;
489 {
490 let mut states = self
491 .state
492 .lock()
493 .unwrap_or_else(std::sync::PoisonError::into_inner);
494 let Some(s) = states.get_mut(idx) else {
495 continue;
496 };
497 if s.disposed {
498 continue;
499 }
500 // Call flush while holding the lock — this is
501 // narrow-scoped and the only contention is with the
502 // observe sink, which is itself bounded by wave
503 // dispatch. No `on_error` invocation under this
504 // lock (callers receive errors via the return
505 // value, not via the registered callback).
506 snapshot_err = s.snapshot_tier.flush().err();
507 wal_err = s.wal_tier.as_ref().and_then(|wal| wal.flush().err());
508 }
509 if let Some(e) = snapshot_err {
510 if first_err.is_none() {
511 first_err = Some(e);
512 }
513 }
514 if let Some(e) = wal_err {
515 if first_err.is_none() {
516 first_err = Some(e);
517 }
518 }
519 }
520 match first_err {
521 None => Ok(()),
522 Some(e) => Err(e),
523 }
524 }
525}
526
527// D246 r3: no `impl Drop` — teardown is owner-invoked
528// ([`StorageHandle::detach`]); a parameterless `Drop` cannot reach
529// `&Core`. The observe subscription is opened via raw `core.subscribe`
530// and is NOT `OwnedCore`-tracked: you MUST call `detach(core)` —
531// dropping a `StorageHandle` without it (and without a subsequent
532// `graph.destroy(core)`) leaks the subscription for the `Core`
533// lifetime. `OwnedCore` drop does NOT collect it.
534
535/// Wire an observe subscription on `graph` that persists node changes
536/// to the provided snapshot+WAL tier pairs.
537///
538/// # Debounce (D171, resolved 2026-05-14)
539///
540/// When a tier's `debounce_ms` is `Some(ms)` with `ms > 0`, the
541/// observe callback writes via the tier's `save()` (which buffers
542/// internally per the [`BaseStorageTier`](crate::tier::BaseStorageTier)
543/// contract) but does NOT force a `flush()`. Callers drain pending
544/// writes by invoking [`StorageHandle::flush_all`] — typically from a
545/// binding-side reactive timer (e.g.,
546/// `graphrefly_operators::temporal::interval`). This keeps
547/// `graphrefly-storage` sync + binding-agnostic per the canonical
548/// no-async-in-storage invariant; reactive-timer wiring lives in the
549/// layer that already owns the timer primitive.
550///
551/// When `debounce_ms` is `None` or `Some(0)`, the observe callback
552/// continues to force-flush inline as before.
553///
554/// # `key_of` (D174, closes F8)
555///
556/// The snapshot tier's backend key is derived from `graph.name` via
557/// the checkpoint record's `name` field. Cross-impl `key_of` divergence
558/// disappears at this boundary.
559/// D246: the embedder owns the [`Core`] (see
560/// [`graphrefly_core::OwnedCore`]) and passes `&Core` in — the
561/// `observe_all_reactive` subscription is opened owner-side. Teardown
562/// is owner-invoked via [`StorageHandle::detach`] (no RAII `Drop`,
563/// D246 r3).
564#[must_use = "the returned StorageHandle owns the observe subscription; \
565 call StorageHandle::detach(core) to unsubscribe and stop \
566 persistence (D246 r3 — no RAII Drop)"]
567pub fn attach_snapshot_storage(
568 core: &Core,
569 graph: &Graph,
570 pairs: Vec<AttachTierPair>,
571 options: AttachOptions,
572) -> StorageHandle {
573 let graph_name = graph.name();
574 let wal_prefix = graph_wal_prefix(&graph_name);
575
576 let mut states = Vec::with_capacity(pairs.len());
577 for pair in pairs {
578 // Bootstrap: enumerate existing WAL frames to find high-water seq.
579 let mut high_seq: u64 = 0;
580 if let Some(ref wal) = pair.wal {
581 if let Ok(keys) = wal.list(&wal_prefix) {
582 for key in keys {
583 if let Some(seg) = key.rsplit('/').next() {
584 if let Ok(s) = seg.parse::<u64>() {
585 high_seq = high_seq.max(s);
586 }
587 }
588 }
589 }
590 }
591
592 let compact_every = pair.snapshot.compact_every().unwrap_or(10);
593 let snapshot_debounce = pair.snapshot.debounce_ms().unwrap_or(0);
594 let wal_debounce = pair.wal.as_ref().and_then(|w| w.debounce_ms()).unwrap_or(0);
595
596 states.push(TierState {
597 snapshot_tier: pair.snapshot,
598 wal_tier: pair.wal,
599 wal_prefix: wal_prefix.clone(),
600 seq: high_seq,
601 flush_count: 0,
602 compact_every,
603 snapshot_debounce_ms: snapshot_debounce,
604 wal_debounce_ms: wal_debounce,
605 last_snapshot: None,
606 disposed: false,
607 });
608 }
609
610 let shared_states = Arc::new(Mutex::new(states));
611 let states_for_sink = shared_states.clone();
612 let filter = options.filter;
613 // β/D244: wrap in `Arc` so each per-fire `MailboxOp::Defer` can
614 // clone the callback (`ErrorCallback` is a non-`Clone` `Box`).
615 let on_error = options.on_error.map(Arc::new);
616 // β/D242/D244/D246: the observe sink fires *in-wave* (during Core
617 // dispatch) and has no `&Core`. `Graph` is now Core-free and
618 // `Send + Sync + 'static` (D246), so a cheap `Arc`-clone is
619 // captured directly into the deferred closure (the old
620 // `NamespaceHandle` split is gone). The snapshot + tier-flush is
621 // posted as a `MailboxOp::Defer` and runs on the owner with a real
622 // `&dyn CoreFull`. Behaviour: snapshot+persist is in-wave-deferred
623 // to quiescence rather than synchronous in the sink — consistent
624 // with D243/D244 "deferred snapshot acceptable" (storage
625 // persistence is a downstream observation).
626 let graph_for_sink = graph.clone();
627 // D249/S2c: the defer closure captures a `Graph`
628 // (`Rc<RefCell<GraphInner>>`, `!Send` post-D248), so it must use
629 // the owner-only `!Send` `DeferQueue`, not the `Send` `CoreMailbox`
630 // Defer. Owner-thread-only `Rc` — fine: `observe_all_reactive`'s
631 // sink is `!Send` and fires owner-side.
632 let deferred = core.defer_queue();
633
634 // D246 rule 8 (S4): reusable coalescing slot. The snapshot+persist
635 // is idempotent at drain time (`snapshot_full` reads current state),
636 // so N qualifying emissions in one wave need only ONE deferred
637 // snapshot+flush, not N boxed closures. `scheduled` (owner-thread-
638 // only `Cell`) gates a single `Box` post per drain; the closure
639 // clears it so the next wave re-arms.
640 //
641 // M2 (QA, 2026-05-19): `compact_every` cadence is parameterised
642 // per-emission (TS parity), so the coalescing MUST NOT also
643 // collapse the count. `pending_count` tracks the number of
644 // qualifying emits observed in the wave; the closure consumes it
645 // and `flush_tier` increments `flush_count` by that count + tests
646 // boundary-crossing of `compact_every`, preserving the per-emission
647 // compact cadence under the per-wave snapshot. Persisted state is
648 // the final wave state either way (deferred-snapshot acceptable,
649 // D243/D244) — only the *cadence* needs the count.
650 let scheduled = std::rc::Rc::new(std::cell::Cell::new(false));
651 let pending_count = std::rc::Rc::new(std::cell::Cell::new(0usize));
652
653 // Wire observe_all_reactive so late-added nodes are also covered.
654 let mut observe = graph.observe_all_reactive();
655 observe.subscribe(core, move |path: &str, messages: &[Message]| {
656 // Filter: only tiers 3–5 (DATA/RESOLVED, INVALIDATE, COMPLETE/ERROR).
657 let dominated_by_tier = messages.iter().any(|m| {
658 let t = m.tier();
659 (3..6).contains(&t)
660 });
661 if !dominated_by_tier {
662 return;
663 }
664
665 // Optional path filter (cheap, no Core — stays synchronous).
666 if let Some(ref f) = filter {
667 if !f(path) {
668 return;
669 }
670 }
671
672 // M2: count this qualifying emit BEFORE the coalesce gate, so
673 // `compact_every` cadence stays per-emission. Saturating add —
674 // a `usize` overflow on count-per-wave is unreachable in any
675 // realistic workload (max waves are bounded by `max_ops`).
676 pending_count.set(pending_count.get().saturating_add(1));
677
678 if scheduled.get() {
679 return; // already armed for this drain — coalesce.
680 }
681 let graph_for_defer = graph_for_sink.clone();
682 let states_for_defer = states_for_sink.clone();
683 let on_error = on_error.clone();
684 let sched = std::rc::Rc::clone(&scheduled);
685 let pc = std::rc::Rc::clone(&pending_count);
686 sched.set(true);
687 // Defer the snapshot + flush owner-side (D244). Core-gone
688 // (`false`) ⇒ dropped unrun: nothing to persist on a
689 // torn-down graph, no handles captured (no leak).
690 let _ = deferred.post(Box::new(move |cf: &dyn CoreFull| {
691 sched.set(false);
692 // M2: consume the per-emission count and reset for the
693 // next wave.
694 let count = pc.replace(0);
695 // Take a snapshot once, shared across all sync tiers.
696 // D246: the in-wave facade `&dyn CoreFull` drives the
697 // snapshot through the one public `Graph` type (Core-free,
698 // Send+Sync — captured directly).
699 let snapshot = graph_for_defer.snapshot_full(cf);
700
701 // N3 (QA 2026-05-14) — collect errors INSIDE the lock,
702 // invoke `on_error` AFTER releasing the lock (re-entrant
703 // `flush_all`/`dispose` would otherwise deadlock). F3:
704 // recover from a poisoned lock via `into_inner`.
705 let collected_errors: Vec<StorageError> = {
706 let mut states = states_for_defer
707 .lock()
708 .unwrap_or_else(std::sync::PoisonError::into_inner);
709 let mut errs = Vec::new();
710 for s in states.iter_mut() {
711 if s.disposed {
712 continue;
713 }
714 if let Err(e) = flush_tier(s, &snapshot, count) {
715 errs.push(e);
716 }
717 }
718 errs
719 };
720 if !collected_errors.is_empty() {
721 if let Some(ref cb) = on_error {
722 for e in &collected_errors {
723 cb(e);
724 }
725 }
726 }
727 }));
728 });
729
730 StorageHandle {
731 state: shared_states,
732 observe: Mutex::new(Some(observe)),
733 }
734}
735
736/// Flush a single tier: either full baseline or WAL delta.
737///
738/// `count` is the number of qualifying observed emits this coalesced
739/// flush represents (M2 / QA 2026-05-19). `flush_count` advances by
740/// `count` to preserve the TS-parity per-emission cadence of
741/// `compact_every`: `write_full` fires if any of the increments in
742/// `before+1..=before+count` would have been a multiple of
743/// `compact_every` (boundary-crossing test) — equivalent to firing on
744/// each emit one-by-one, just batched into the coalesced flush.
745fn flush_tier(
746 s: &mut TierState,
747 snapshot: &GraphPersistSnapshot,
748 count: usize,
749) -> Result<(), StorageError> {
750 let before = s.flush_count;
751 // Defensive: a `count == 0` call (no qualifying emits) is a no-op
752 // here — still walks the write path so first-baseline & WAL deltas
753 // honor `last_snapshot` correctness, but doesn't tick cadence.
754 let inc = count as u64;
755 s.flush_count = s.flush_count.saturating_add(inc);
756
757 let write_full = s.wal_tier.is_none()
758 || s.last_snapshot.is_none()
759 || (s.compact_every > 0 && {
760 // Did the batch [before+1 ..= before+count] cross at least
761 // one multiple of `compact_every`? Integer-division test.
762 let cmp = u64::from(s.compact_every);
763 (before / cmp) < (s.flush_count / cmp)
764 });
765
766 if write_full {
767 write_full_baseline(s, snapshot)?;
768 } else {
769 write_wal_delta(s, snapshot)?;
770 }
771
772 s.last_snapshot = Some(snapshot.clone());
773 Ok(())
774}
775
776/// Write a full baseline snapshot to the snapshot tier.
777///
778/// When `snapshot_debounce_ms > 0` (D171), the inner `save()` buffers
779/// per the tier's `BaseStorageTier` contract and we DON'T force a
780/// `flush()`. The caller drains via [`StorageHandle::flush_all`].
781fn write_full_baseline(
782 s: &mut TierState,
783 snapshot: &GraphPersistSnapshot,
784) -> Result<(), StorageError> {
785 let timestamp_ns = graphrefly_core::wall_clock_ns();
786 let record = GraphCheckpointRecord {
787 name: snapshot.name.clone(),
788 mode: "full".to_owned(),
789 snapshot: snapshot.clone(),
790 seq: s.seq,
791 timestamp_ns,
792 format_version: SNAPSHOT_VERSION,
793 };
794
795 s.snapshot_tier.save(record)?;
796 if s.snapshot_debounce_ms == 0 {
797 s.snapshot_tier.flush()?;
798 }
799 Ok(())
800}
801
802/// Write WAL delta frames for the diff between `last_snapshot` and current.
803///
804/// D171: WAL `flush()` is skipped when the WAL tier has
805/// `debounce_ms > 0`; the caller drains via
806/// [`StorageHandle::flush_all`].
807fn write_wal_delta(s: &mut TierState, snapshot: &GraphPersistSnapshot) -> Result<(), StorageError> {
808 let last = s
809 .last_snapshot
810 .as_ref()
811 .expect("caller ensures last_snapshot is Some");
812 let diff = diff_snapshots(last, snapshot);
813
814 if diff.is_empty() {
815 return Ok(());
816 }
817
818 let timestamp_ns = graphrefly_core::wall_clock_ns();
819 let (frames, next_seq) = decompose_diff_to_frames(&diff, timestamp_ns, s.seq)?;
820
821 if let Some(ref wal) = s.wal_tier {
822 for frame in &frames {
823 let key = wal_frame_key(&s.wal_prefix, frame.frame_seq);
824 wal.save(&key, frame.clone())?;
825 }
826 if s.wal_debounce_ms == 0 {
827 wal.flush()?;
828 }
829 }
830
831 s.seq = next_seq;
832 Ok(())
833}
834
835// ── Restore (D170) ─────────────────────────────────────────────────────────
836
837/// Torn-write policy for mid-stream checksum failures.
838#[derive(Debug, Clone, Copy, PartialEq, Eq)]
839pub enum TornWritePolicy {
840 /// Drop the frame and continue (default for WAL tail).
841 Skip,
842 /// Abort the entire restore.
843 Abort,
844}
845
846/// Callback for torn-write decisions. Receives the `frame_seq` and
847/// reason; returns the desired policy.
848pub type OnTornWrite = Box<dyn Fn(u64, &str) -> TornWritePolicy + Send + Sync>;
849
850/// Options for [`restore_snapshot`].
851pub struct RestoreOptions<'a> {
852 /// Snapshot tier to load the baseline from.
853 pub snapshot_tier: &'a dyn SnapshotStorageTier<GraphCheckpointRecord>,
854 /// WAL tier to enumerate delta frames from.
855 pub wal_tier: &'a dyn KvStorageTier<WALFrame<Value>>,
856 /// Optional max `frame_seq` to replay up to. `None` = replay all.
857 pub target_seq: Option<u64>,
858 /// Torn-write callback. If `None`, defaults: tail = Skip, mid = Abort.
859 pub on_torn_write: Option<OnTornWrite>,
860}
861
862/// Three-phase WAL replay: baseline load → checksum verify → lifecycle-
863/// scoped batch.
864///
865/// # Phase 1: Baseline
866///
867/// Loads the `mode:"full"` baseline from the snapshot tier. The snapshot
868/// key is derived from `graph.name` (D174).
869///
870/// # Phase 2: Checksum verification
871///
872/// Enumerates WAL frames with `frame_seq > baseline.seq`, verifies each
873/// frame's SHA-256 checksum, applies torn-write policy on mismatch.
874///
875/// # Phase 3: Lifecycle-scoped batch replay
876///
877/// Groups verified frames by lifecycle. Replays in cross-scope order
878/// (`Spec → Data → Ownership`). Each lifecycle runs in a `graph.batch()`
879/// for atomic partial-restore semantics (Q2).
880///
881/// D246: the embedder owns the [`Core`] (see
882/// [`graphrefly_core::OwnedCore`]) and threads `&Core` into every
883/// Core-touching graph mutation (baseline restore + WAL replay).
884pub fn restore_snapshot(
885 core: &Core,
886 graph: &Graph,
887 opts: &RestoreOptions<'_>,
888) -> Result<RestoreResult, RestoreError> {
889 // Phase 1: Load and apply baseline.
890 let baseline = load_baseline(core, graph, opts)?;
891 let baseline_seq = baseline.seq;
892
893 // Phase 1b: Collect WAL frames post-baseline.
894 let collected = collect_wal_frames(opts, &baseline.name, baseline_seq)?;
895
896 // Phase 2: Checksum verification.
897 let (verified, skipped) = verify_frames(collected, opts.on_torn_write.as_ref())?;
898
899 // Phase 3: Lifecycle-scoped batch replay.
900 Ok(replay_by_lifecycle(
901 core,
902 graph,
903 &verified,
904 baseline_seq,
905 skipped,
906 ))
907}
908
909/// Phase 1: Load baseline from snapshot tier + apply to graph.
910fn load_baseline(
911 core: &Core,
912 graph: &Graph,
913 opts: &RestoreOptions<'_>,
914) -> Result<GraphCheckpointRecord, RestoreError> {
915 let baseline = opts
916 .snapshot_tier
917 .load()
918 .map_err(|e| RestoreError::PhaseFailed {
919 lifecycle: Lifecycle::Spec,
920 frame_seq: 0,
921 message: format!("baseline load failed: {e}"),
922 })?
923 .ok_or(RestoreError::BaselineMissing)?;
924
925 if baseline.mode != "full" {
926 return Err(RestoreError::BaselineMissing);
927 }
928
929 graph
930 .restore(core, &baseline.snapshot)
931 .map_err(|e| RestoreError::PhaseFailed {
932 lifecycle: Lifecycle::Spec,
933 frame_seq: 0,
934 message: format!("baseline restore failed: {e}"),
935 })?;
936
937 Ok(baseline)
938}
939
940/// Phase 1b: Enumerate + filter WAL frames.
941fn collect_wal_frames(
942 opts: &RestoreOptions<'_>,
943 graph_name: &str,
944 baseline_seq: u64,
945) -> Result<Vec<WALFrame<Value>>, RestoreError> {
946 let wal_prefix = graph_wal_prefix(graph_name);
947 let keys = opts
948 .wal_tier
949 .list(&wal_prefix)
950 .map_err(|e| RestoreError::PhaseFailed {
951 lifecycle: Lifecycle::Spec,
952 frame_seq: 0,
953 message: format!("WAL frame enumeration failed: {e}"),
954 })?;
955
956 let mut collected: Vec<WALFrame<Value>> = Vec::new();
957 for key in keys {
958 let frame_seq = key
959 .rsplit('/')
960 .next()
961 .and_then(|s| s.parse::<u64>().ok())
962 .unwrap_or(0);
963
964 if frame_seq <= baseline_seq {
965 continue;
966 }
967 if let Some(target) = opts.target_seq {
968 if frame_seq > target {
969 continue;
970 }
971 }
972
973 if let Some(frame) = opts
974 .wal_tier
975 .load(&key)
976 .map_err(|e| RestoreError::PhaseFailed {
977 lifecycle: Lifecycle::Data,
978 frame_seq,
979 message: format!("WAL frame load failed: {e}"),
980 })?
981 {
982 collected.push(frame);
983 }
984 }
985
986 collected.sort_by_key(|f| f.frame_seq);
987 Ok(collected)
988}
989
990/// Phase 2: Verify checksums, apply torn-write policy.
991fn verify_frames(
992 collected: Vec<WALFrame<Value>>,
993 on_torn_write: Option<&OnTornWrite>,
994) -> Result<(Vec<WALFrame<Value>>, u64), RestoreError> {
995 let mut verified = Vec::new();
996 let mut skipped: u64 = 0;
997 let total = collected.len();
998
999 for (i, frame) in collected.into_iter().enumerate() {
1000 if verify_wal_frame_checksum(&frame).unwrap_or(false) {
1001 verified.push(frame);
1002 continue;
1003 }
1004
1005 let is_tail = i == total - 1;
1006 let policy = if let Some(cb) = on_torn_write {
1007 cb(frame.frame_seq, "checksum-mismatch")
1008 } else if is_tail {
1009 TornWritePolicy::Skip
1010 } else {
1011 TornWritePolicy::Abort
1012 };
1013
1014 match policy {
1015 TornWritePolicy::Skip => skipped += 1,
1016 TornWritePolicy::Abort => {
1017 return Err(RestoreError::TornWriteMidStream {
1018 frame_seq: frame.frame_seq,
1019 reason: "checksum-mismatch".to_owned(),
1020 });
1021 }
1022 }
1023 }
1024
1025 Ok((verified, skipped))
1026}
1027
1028/// Phase 3: Group by lifecycle, replay in cross-scope order.
1029fn replay_by_lifecycle(
1030 core: &Core,
1031 graph: &Graph,
1032 verified: &[WALFrame<Value>],
1033 baseline_seq: u64,
1034 skipped: u64,
1035) -> RestoreResult {
1036 let mut grouped: [Vec<WALFrame<Value>>; 3] = [Vec::new(), Vec::new(), Vec::new()];
1037 for frame in verified {
1038 for (idx, lifecycle) in REPLAY_ORDER.iter().enumerate() {
1039 if frame.lifecycle == *lifecycle {
1040 grouped[idx].push(frame.clone());
1041 break;
1042 }
1043 }
1044 }
1045
1046 let mut phases = Vec::new();
1047 let mut replayed: u64 = 0;
1048 let mut final_seq: u64 = baseline_seq;
1049
1050 for (idx, lifecycle) in REPLAY_ORDER.iter().enumerate() {
1051 let life_frames = &grouped[idx];
1052 if life_frames.is_empty() {
1053 continue;
1054 }
1055 let frame_count = life_frames.len() as u64;
1056 let max_seq = life_frames.iter().map(|f| f.frame_seq).max().unwrap_or(0);
1057
1058 graph.batch(core, || {
1059 for frame in life_frames {
1060 apply_wal_frame(core, graph, frame);
1061 }
1062 });
1063
1064 replayed += frame_count;
1065 final_seq = final_seq.max(max_seq);
1066 phases.push(PhaseStat {
1067 lifecycle: *lifecycle,
1068 frames: frame_count,
1069 });
1070 }
1071
1072 RestoreResult {
1073 replayed_frames: replayed,
1074 skipped_frames: skipped,
1075 final_seq,
1076 phases,
1077 }
1078}
1079
1080/// Apply a single WAL frame to a graph. Mirrors TS `applyWalFrame`.
1081///
1082/// D246: every Core-touching `graph.*` call takes the owner's `&Core`;
1083/// binding access is `core.binding_ptr()` (the retired `Graph::core()`
1084/// is gone — `Graph` is Core-free).
1085fn apply_wal_frame(core: &Core, graph: &Graph, frame: &WALFrame<Value>) {
1086 let change = &frame.change.change;
1087 let kind = change.get("kind").and_then(Value::as_str).unwrap_or("");
1088
1089 match frame.lifecycle {
1090 Lifecycle::Spec => match kind {
1091 "graph.add" => {
1092 let node_id_str = change.get("nodeId").and_then(Value::as_str).unwrap_or("");
1093 if node_id_str.is_empty() || graph.try_resolve(node_id_str).is_some() {
1094 return; // already present or invalid
1095 }
1096 // Only auto-create state nodes (matches TS behavior).
1097 let slice = change.get("slice");
1098 let node_type = slice
1099 .and_then(|s| s.get("type"))
1100 .and_then(Value::as_str)
1101 .unwrap_or("");
1102 if node_type != "state" {
1103 return;
1104 }
1105 let initial_value = slice.and_then(|s| s.get("value")).cloned();
1106 let handle = initial_value.map_or(graphrefly_core::NO_HANDLE, |v| {
1107 core.binding_ptr().deserialize_value(v)
1108 });
1109 let _ = graph.state(core, node_id_str, Some(handle));
1110 }
1111 "graph.remove" => {
1112 let node_id_str = change.get("nodeId").and_then(Value::as_str).unwrap_or("");
1113 if !node_id_str.is_empty() && graph.try_resolve(node_id_str).is_some() {
1114 let _ = graph.remove(core, node_id_str);
1115 }
1116 }
1117 // graph.mount, graph.unmount — deferred (Phase 14.6+)
1118 _ => {}
1119 },
1120 Lifecycle::Data => match kind {
1121 "node.set" => {
1122 let path = change.get("path").and_then(Value::as_str).unwrap_or("");
1123 if let Some(value) = change.get("value") {
1124 if !path.is_empty() && graph.try_resolve(path).is_some() {
1125 let handle = core.binding_ptr().deserialize_value(value.clone());
1126 graph.set(core, path, handle);
1127 }
1128 }
1129 }
1130 "node.invalidate" => {
1131 let path = change.get("path").and_then(Value::as_str).unwrap_or("");
1132 if !path.is_empty() {
1133 if let Some(id) = graph.try_resolve(path) {
1134 graph.invalidate(core, id);
1135 }
1136 }
1137 }
1138 // node.versionBump — deferred (V0 versioning is internal)
1139 _ => {}
1140 },
1141 // Ownership lifecycle — deferred (Phase 13)
1142 Lifecycle::Ownership => {}
1143 }
1144}