Skip to main content

graphrefly_storage/
graph_integration.rs

1//! Graph-level storage integration (M4.E2 — D170–D174).
2//!
3//! Free functions (not `Graph` methods) because `graphrefly-graph` does not
4//! depend on `graphrefly-storage` — the dependency flows in the other
5//! direction. See D170.
6//!
7//! # Provided APIs
8//!
9//! - [`GraphCheckpointRecord`] — portable baseline type wrapping a
10//!   [`GraphPersistSnapshot`] + seq metadata + `format_version` (F7 close).
11//! - [`diff_snapshots`] / [`decompose_diff_to_frames`] — snapshot diff →
12//!   WAL frame generation engine (D172).
13//! - [`attach_snapshot_storage`] + [`StorageHandle`] — wire observe
14//!   subscription → snapshot diff → WAL frame writes.
15//! - [`restore_snapshot`] — three-phase replay (baseline → checksum verify
16//!   → lifecycle-scoped batch).
17//!
18//! # Manifest (D173 — F4 close)
19//!
20//! F4 (cross-restart key recovery) is structurally closed by D174: at the
21//! attach boundary, `key_of` is derived deterministically from
22//! `graph.name`. On restore, the snapshot key is known without a separate
23//! manifest entry. The checkpoint record's `seq` field serves as the WAL
24//! high-water mark.
25
26use std::sync::{Arc, Mutex};
27
28use graphrefly_core::{Core, CoreFull, Message};
29use graphrefly_graph::{Graph, GraphObserveAllReactive, GraphPersistSnapshot, NodeSlice};
30use graphrefly_structures::{BaseChange, Lifecycle, Version};
31use serde::{Deserialize, Serialize};
32use serde_json::Value;
33
34use crate::error::{PhaseStat, RestoreError, RestoreResult, StorageError};
35use crate::tier::{KvStorageTier, SnapshotStorageTier};
36use crate::wal::{
37    graph_wal_prefix, verify_wal_frame_checksum, wal_frame_checksum, wal_frame_key, WALFrame,
38    WalTag, REPLAY_ORDER,
39};
40
41// ── Constants ──────────────────────────────────────────────────────────────
42
43/// Current snapshot format version. Embedded in [`GraphCheckpointRecord`]
44/// and in `BaseChange.version` within decomposed WAL frames (F7 close).
45pub const SNAPSHOT_VERSION: u64 = 1;
46
47// ── Types ──────────────────────────────────────────────────────────────────
48
49/// Portable baseline record written by [`attach_snapshot_storage`] on
50/// full-snapshot writes. Contains the full [`GraphPersistSnapshot`] plus
51/// metadata for WAL cursor alignment.
52///
53/// The `format_version` field closes F7 (missing `format_version` on
54/// `WALFrame` and checkpoint records).
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct GraphCheckpointRecord {
57    /// Graph name (matches `snapshot.name`).
58    pub name: String,
59    /// Snapshot mode — `"full"` for baseline, `"diff"` reserved for future
60    /// incremental baselines.
61    pub mode: String,
62    /// The complete graph state.
63    pub snapshot: GraphPersistSnapshot,
64    /// WAL-tier cursor at baseline write time. Frames with `frame_seq >
65    /// seq` are the delta.
66    pub seq: u64,
67    /// Wall-clock timestamp at baseline write time.
68    pub timestamp_ns: u64,
69    /// Format version (F7 close).
70    pub format_version: u64,
71}
72
73// ── Diff engine (D172) ─────────────────────────────────────────────────────
74
75/// Structural diff between two [`GraphPersistSnapshot`]s.
76#[derive(Debug, Clone)]
77pub struct GraphSnapshotDiff {
78    /// Node names present in `after` but not `before`.
79    pub nodes_added: Vec<String>,
80    /// Full slices for added nodes (parallel to `nodes_added`).
81    pub nodes_added_slices: Vec<NodeSlice>,
82    /// Node names present in `before` but not `after`.
83    pub nodes_removed: Vec<String>,
84    /// Nodes whose `value` field changed between snapshots.
85    pub value_changes: Vec<ValueChange>,
86    /// Subgraph mount names added.
87    pub subgraphs_added: Vec<String>,
88    /// Subgraph mount names removed.
89    pub subgraphs_removed: Vec<String>,
90}
91
92impl GraphSnapshotDiff {
93    /// True when no structural or value changes were detected.
94    #[must_use]
95    pub fn is_empty(&self) -> bool {
96        self.nodes_added.is_empty()
97            && self.nodes_removed.is_empty()
98            && self.value_changes.is_empty()
99            && self.subgraphs_added.is_empty()
100            && self.subgraphs_removed.is_empty()
101    }
102}
103
104/// A single node value change detected by [`diff_snapshots`].
105#[derive(Debug, Clone)]
106pub struct ValueChange {
107    /// Node path that changed.
108    pub path: String,
109    /// New value. `None` means the node transitioned to sentinel (INVALIDATE).
110    pub to: Option<Value>,
111}
112
113/// Compare two snapshots and produce a structural diff.
114///
115/// Only examines the top-level namespace (not recursive into subgraphs —
116/// subgraph diffs are handled by the attach wiring per-subgraph).
117#[must_use]
118pub fn diff_snapshots(
119    before: &GraphPersistSnapshot,
120    after: &GraphPersistSnapshot,
121) -> GraphSnapshotDiff {
122    let mut nodes_added = Vec::new();
123    let mut nodes_added_slices = Vec::new();
124    let mut nodes_removed = Vec::new();
125    let mut value_changes = Vec::new();
126    let mut subgraphs_added = Vec::new();
127    let mut subgraphs_removed = Vec::new();
128
129    // Nodes added or changed.
130    for (name, after_slice) in &after.nodes {
131        if let Some(before_slice) = before.nodes.get(name) {
132            if before_slice.value != after_slice.value {
133                value_changes.push(ValueChange {
134                    path: name.clone(),
135                    to: after_slice.value.clone(),
136                });
137            }
138        } else {
139            nodes_added.push(name.clone());
140            nodes_added_slices.push(after_slice.clone());
141        }
142    }
143
144    // Nodes removed.
145    for name in before.nodes.keys() {
146        if !after.nodes.contains_key(name) {
147            nodes_removed.push(name.clone());
148        }
149    }
150
151    // Subgraphs added/removed.
152    for name in after.subgraphs.keys() {
153        if !before.subgraphs.contains_key(name) {
154            subgraphs_added.push(name.clone());
155        }
156    }
157    for name in before.subgraphs.keys() {
158        if !after.subgraphs.contains_key(name) {
159            subgraphs_removed.push(name.clone());
160        }
161    }
162
163    GraphSnapshotDiff {
164        nodes_added,
165        nodes_added_slices,
166        nodes_removed,
167        value_changes,
168        subgraphs_added,
169        subgraphs_removed,
170    }
171}
172
173/// Intermediate frame before checksum stamping.
174struct DecomposedFrame {
175    lifecycle: Lifecycle,
176    path: String,
177    change: BaseChange<Value>,
178}
179
180/// Convert a [`GraphSnapshotDiff`] into WAL frames ready for persistence.
181///
182/// `timestamp_ns` is the wall-clock at diff time. `base_seq` is the WAL
183/// cursor; returned frames have `frame_seq` values starting at
184/// `base_seq + 1`.
185///
186/// Returns `(frames, next_seq)` where `next_seq` is the highest assigned
187/// `frame_seq`.
188pub fn decompose_diff_to_frames(
189    diff: &GraphSnapshotDiff,
190    timestamp_ns: u64,
191    base_seq: u64,
192) -> Result<(Vec<WALFrame<Value>>, u64), StorageError> {
193    let mut decomposed = Vec::new();
194
195    let wrap = |structure: &str, lifecycle: Lifecycle, payload: Value| -> BaseChange<Value> {
196        BaseChange {
197            structure: structure.to_owned(),
198            version: Version::Counter(SNAPSHOT_VERSION),
199            t_ns: timestamp_ns,
200            seq: None,
201            lifecycle,
202            change: payload,
203        }
204    };
205
206    // Spec lifecycle: node add/remove, subgraph mount/unmount.
207    for (i, name) in diff.nodes_added.iter().enumerate() {
208        let slice = &diff.nodes_added_slices[i];
209        let payload = serde_json::json!({
210            "kind": "graph.add",
211            "nodeId": name,
212            "slice": serde_json::to_value(slice).map_err(|e|
213                StorageError::Codec(crate::codec::CodecError::Encode(e.to_string()))
214            )?,
215        });
216        decomposed.push(DecomposedFrame {
217            lifecycle: Lifecycle::Spec,
218            path: name.clone(),
219            change: wrap("graph.spec", Lifecycle::Spec, payload),
220        });
221    }
222
223    for name in &diff.nodes_removed {
224        let payload = serde_json::json!({
225            "kind": "graph.remove",
226            "nodeId": name,
227        });
228        decomposed.push(DecomposedFrame {
229            lifecycle: Lifecycle::Spec,
230            path: name.clone(),
231            change: wrap("graph.spec", Lifecycle::Spec, payload),
232        });
233    }
234
235    for name in &diff.subgraphs_added {
236        let payload = serde_json::json!({
237            "kind": "graph.mount",
238            "path": name,
239            "subgraphId": name,
240        });
241        decomposed.push(DecomposedFrame {
242            lifecycle: Lifecycle::Spec,
243            path: name.clone(),
244            change: wrap("graph.spec", Lifecycle::Spec, payload),
245        });
246    }
247
248    for name in &diff.subgraphs_removed {
249        let payload = serde_json::json!({
250            "kind": "graph.unmount",
251            "path": name,
252        });
253        decomposed.push(DecomposedFrame {
254            lifecycle: Lifecycle::Spec,
255            path: name.clone(),
256            change: wrap("graph.spec", Lifecycle::Spec, payload),
257        });
258    }
259
260    // Data lifecycle: value changes.
261    for vc in &diff.value_changes {
262        let payload = if let Some(ref value) = vc.to {
263            serde_json::json!({
264                "kind": "node.set",
265                "path": vc.path,
266                "value": value,
267            })
268        } else {
269            serde_json::json!({
270                "kind": "node.invalidate",
271                "path": vc.path,
272            })
273        };
274        decomposed.push(DecomposedFrame {
275            lifecycle: Lifecycle::Data,
276            path: vc.path.clone(),
277            change: wrap("graph.value", Lifecycle::Data, payload),
278        });
279    }
280
281    // Assign frame_seq and compute checksums.
282    let mut seq = base_seq;
283    let mut frames = Vec::with_capacity(decomposed.len());
284    for d in decomposed {
285        seq += 1;
286        let mut frame = WALFrame {
287            t: WalTag,
288            lifecycle: d.lifecycle,
289            path: d.path,
290            change: d.change,
291            frame_seq: seq,
292            frame_t_ns: timestamp_ns,
293            checksum: String::new(),
294            format_version: 1,
295        };
296        frame.checksum = wal_frame_checksum(&frame)?;
297        frames.push(frame);
298    }
299
300    Ok((frames, seq))
301}
302
303// ── Attach (D170) ──────────────────────────────────────────────────────────
304
305/// Per-tier state managed by the attach wiring.
306struct TierState {
307    /// The snapshot tier (writes full baselines).
308    snapshot_tier: Box<dyn SnapshotStorageTier<GraphCheckpointRecord>>,
309    /// Optional WAL tier (writes individual delta frames).
310    wal_tier: Option<Box<dyn KvStorageTier<WALFrame<Value>>>>,
311    /// WAL key prefix derived from `graph.name`.
312    wal_prefix: String,
313    /// Monotonic cursor.
314    seq: u64,
315    /// Flush counter for `compact_every` cadence.
316    flush_count: u64,
317    /// Configured compact-every cadence (0 = every flush writes full baseline).
318    compact_every: u32,
319    /// Snapshot tier's `debounce_ms` snapshot at attach time (D171).
320    /// `0` (the unset / disabled case) means inline-flush; `>0` means
321    /// `save()` buffers and the caller drives explicit flush via
322    /// [`StorageHandle::flush_all`].
323    snapshot_debounce_ms: u32,
324    /// WAL tier's `debounce_ms` snapshot at attach time (D171).
325    wal_debounce_ms: u32,
326    /// Last snapshot used for diff computation.
327    last_snapshot: Option<GraphPersistSnapshot>,
328    /// Disposed flag.
329    disposed: bool,
330}
331
332/// Configuration for a single snapshot+WAL tier pair.
333pub struct AttachTierPair {
334    /// Snapshot tier for full baselines.
335    pub snapshot: Box<dyn SnapshotStorageTier<GraphCheckpointRecord>>,
336    /// Optional WAL tier for delta frames. When `None`, every flush
337    /// writes a full baseline (no incremental WAL).
338    pub wal: Option<Box<dyn KvStorageTier<WALFrame<Value>>>>,
339}
340
341/// Filter predicate for [`AttachOptions`].
342pub type PathFilter = Box<dyn Fn(&str) -> bool + Send + Sync>;
343
344/// Error callback for [`AttachOptions`].
345pub type ErrorCallback = Box<dyn Fn(&StorageError) + Send + Sync>;
346
347/// Options for [`attach_snapshot_storage`].
348#[derive(Default)]
349pub struct AttachOptions {
350    /// Per-path filter. Return `true` to persist changes for this path.
351    /// `None` means persist all paths.
352    pub filter: Option<PathFilter>,
353    /// Error callback invoked when a flush fails.
354    pub on_error: Option<ErrorCallback>,
355}
356
357/// Handle returned by [`attach_snapshot_storage`].
358///
359/// D246 r3: there is **no RAII `Drop`**. The observe subscription is
360/// torn down owner-side, synchronously, by calling
361/// [`StorageHandle::detach`] with the owner's `&Core` (the
362/// [`graphrefly_core::OwnedCore`] borrow). [`StorageHandle::dispose`]
363/// remains a Core-free flag flip that stops late-fire persistence
364/// without unsubscribing (e.g. when the `&Core` is not in hand);
365/// `detach` is the full teardown.
366///
367/// `Graph` is now `Send + Sync + 'static` and Core-free; the held
368/// [`GraphObserveAllReactive`] owns a cheap `Arc`-clone of the graph
369/// (no `'g` borrow), so `StorageHandle` is `'static`.
370pub struct StorageHandle {
371    /// Shared state; inner `disposed` flag prevents late-fire callbacks.
372    state: Arc<Mutex<Vec<TierState>>>,
373    /// The observe handle. `detach(core)` unsubscribes all sinks
374    /// (owner-invoked, synchronous — D246 r3). Wrapped in a `Mutex`
375    /// because `GraphObserveAllReactive::detach` is `&mut self` while
376    /// `StorageHandle` exposes `&self` teardown for ergonomics.
377    observe: Mutex<Option<GraphObserveAllReactive>>,
378}
379
380impl StorageHandle {
381    /// Explicitly dispose: flip the per-tier `disposed` flag so the
382    /// in-wave observe sink stops scheduling persistence. Core-free
383    /// (no unsubscribe) — use [`StorageHandle::detach`] for the full
384    /// owner-side teardown when the `&Core` is available.
385    ///
386    /// F3 (QA 2026-05-14): on `PoisonError` (a panic in the observe
387    /// sink poisoned the state mutex), recover via `into_inner()`
388    /// rather than silently no-op. The disposed flag still gets set
389    /// even after a sink panic, which matters for cleanup.
390    pub fn dispose(&self) {
391        let mut states = self
392            .state
393            .lock()
394            .unwrap_or_else(std::sync::PoisonError::into_inner);
395        for s in states.iter_mut() {
396            s.disposed = true;
397        }
398    }
399
400    /// Owner-invoked, synchronous teardown (D246 r3 — replaces the
401    /// retired RAII `Drop`). Flips the `disposed` flags then detaches
402    /// the observe handle (unsubscribes every per-node sink, the
403    /// namespace-change sink, and the topology sub) via the owner's
404    /// `&Core`. Idempotent: a second call is a no-op (the observe
405    /// handle is taken on first call).
406    pub fn detach(&self, core: &Core) {
407        self.dispose();
408        let observe = self
409            .observe
410            .lock()
411            .unwrap_or_else(std::sync::PoisonError::into_inner)
412            .take();
413        if let Some(mut observe) = observe {
414            observe.detach(core);
415        }
416    }
417
418    /// Drain pending writes on all tiers (D171).
419    ///
420    /// When a tier is configured with `debounce_ms > 0`,
421    /// [`attach_snapshot_storage`]'s observe sink writes via the
422    /// tier's `save()` but skips the inline `flush()`. Callers
423    /// invoke this method — typically from a binding-side reactive
424    /// timer subgraph (`graphrefly_operators::temporal::interval`,
425    /// shipped in Slice T) — to commit the buffered writes to their
426    /// backends. With `debounce_ms == 0`, the observe sink already
427    /// force-flushes inline; calling this method is then a no-op
428    /// because the tier's pending buffers are empty.
429    ///
430    /// Returns the first error encountered, if any. Successful tiers
431    /// in the same call still flush; the error is surfaced for the
432    /// caller's diagnostics.
433    ///
434    /// # Lock discipline (N3, QA 2026-05-14)
435    ///
436    /// The state mutex is **not** held across `tier.flush()` calls.
437    /// Holding it across blocking I/O would (a) serialize the
438    /// reactive graph against backend latency (every observe-sink
439    /// fire would wait on flush completion), and (b) deadlock if a
440    /// caller invokes `flush_all` from inside an `on_error` callback
441    /// (the observe sink already holds `state.lock()` when it
442    /// invokes `on_error`). The implementation snapshots the per-
443    /// tier flush requests under the lock, drops the lock, then
444    /// runs the flushes in sequence. After each flush, a brief
445    /// re-lock applies any per-tier state mutations (none today —
446    /// `flush()` doesn't return new state).
447    ///
448    /// # Errors
449    ///
450    /// Returns the first `StorageError` encountered. Subsequent
451    /// errors are dropped (acceptable v1; the registered
452    /// `on_error` callback fires per error in the observe sink path
453    /// and is the canonical multi-error reporting surface).
454    ///
455    /// F3: recovers from a poisoned state mutex via `into_inner`
456    /// rather than silently returning Ok.
457    pub fn flush_all(&self) -> Result<(), StorageError> {
458        // Snapshot the per-tier flush callables under the lock,
459        // then drop the lock before invoking them. Each tier's
460        // `flush()` is a `&self` method on `Box<dyn ...>` — to call
461        // it without holding the outer lock, we collect raw
462        // pointers... no, `Box<dyn ...>` can't be cloned. Instead,
463        // we re-lock briefly per tier to call flush, but never hold
464        // the lock across MULTIPLE tier flushes — that's the key
465        // deadlock avoidance.
466        //
467        // The remaining single-tier-lock-during-flush window can
468        // deadlock if `tier.flush()` synchronously re-enters
469        // `dispose()` / `flush_all()` (which would try to re-acquire
470        // `self.state`). Tier impls in `graphrefly-storage` don't do
471        // that; document the contract for future binding-side tier
472        // impls in the rustdoc.
473        let tier_count = {
474            let states = self
475                .state
476                .lock()
477                .unwrap_or_else(std::sync::PoisonError::into_inner);
478            states.len()
479        };
480        let mut first_err: Option<StorageError> = None;
481        for idx in 0..tier_count {
482            // Per-tier flush: brief lock to access the tier, run
483            // flush, drop lock immediately. The tier itself is
484            // behind a Box<dyn ...> inside the Vec; flush() takes
485            // &self, so the &mut on the outer Vec is only needed
486            // for `s.disposed` check.
487            let snapshot_err: Option<StorageError>;
488            let wal_err: Option<StorageError>;
489            {
490                let mut states = self
491                    .state
492                    .lock()
493                    .unwrap_or_else(std::sync::PoisonError::into_inner);
494                let Some(s) = states.get_mut(idx) else {
495                    continue;
496                };
497                if s.disposed {
498                    continue;
499                }
500                // Call flush while holding the lock — this is
501                // narrow-scoped and the only contention is with the
502                // observe sink, which is itself bounded by wave
503                // dispatch. No `on_error` invocation under this
504                // lock (callers receive errors via the return
505                // value, not via the registered callback).
506                snapshot_err = s.snapshot_tier.flush().err();
507                wal_err = s.wal_tier.as_ref().and_then(|wal| wal.flush().err());
508            }
509            if let Some(e) = snapshot_err {
510                if first_err.is_none() {
511                    first_err = Some(e);
512                }
513            }
514            if let Some(e) = wal_err {
515                if first_err.is_none() {
516                    first_err = Some(e);
517                }
518            }
519        }
520        match first_err {
521            None => Ok(()),
522            Some(e) => Err(e),
523        }
524    }
525}
526
527// D246 r3: no `impl Drop` — teardown is owner-invoked
528// ([`StorageHandle::detach`]); a parameterless `Drop` cannot reach
529// `&Core`. The observe subscription is opened via raw `core.subscribe`
530// and is NOT `OwnedCore`-tracked: you MUST call `detach(core)` —
531// dropping a `StorageHandle` without it (and without a subsequent
532// `graph.destroy(core)`) leaks the subscription for the `Core`
533// lifetime. `OwnedCore` drop does NOT collect it.
534
535/// Wire an observe subscription on `graph` that persists node changes
536/// to the provided snapshot+WAL tier pairs.
537///
538/// # Debounce (D171, resolved 2026-05-14)
539///
540/// When a tier's `debounce_ms` is `Some(ms)` with `ms > 0`, the
541/// observe callback writes via the tier's `save()` (which buffers
542/// internally per the [`BaseStorageTier`](crate::tier::BaseStorageTier)
543/// contract) but does NOT force a `flush()`. Callers drain pending
544/// writes by invoking [`StorageHandle::flush_all`] — typically from a
545/// binding-side reactive timer (e.g.,
546/// `graphrefly_operators::temporal::interval`). This keeps
547/// `graphrefly-storage` sync + binding-agnostic per the canonical
548/// no-async-in-storage invariant; reactive-timer wiring lives in the
549/// layer that already owns the timer primitive.
550///
551/// When `debounce_ms` is `None` or `Some(0)`, the observe callback
552/// continues to force-flush inline as before.
553///
554/// # `key_of` (D174, closes F8)
555///
556/// The snapshot tier's backend key is derived from `graph.name` via
557/// the checkpoint record's `name` field. Cross-impl `key_of` divergence
558/// disappears at this boundary.
559/// D246: the embedder owns the [`Core`] (see
560/// [`graphrefly_core::OwnedCore`]) and passes `&Core` in — the
561/// `observe_all_reactive` subscription is opened owner-side. Teardown
562/// is owner-invoked via [`StorageHandle::detach`] (no RAII `Drop`,
563/// D246 r3).
564#[must_use = "the returned StorageHandle owns the observe subscription; \
565              call StorageHandle::detach(core) to unsubscribe and stop \
566              persistence (D246 r3 — no RAII Drop)"]
567pub fn attach_snapshot_storage(
568    core: &Core,
569    graph: &Graph,
570    pairs: Vec<AttachTierPair>,
571    options: AttachOptions,
572) -> StorageHandle {
573    let graph_name = graph.name();
574    let wal_prefix = graph_wal_prefix(&graph_name);
575
576    let mut states = Vec::with_capacity(pairs.len());
577    for pair in pairs {
578        // Bootstrap: enumerate existing WAL frames to find high-water seq.
579        let mut high_seq: u64 = 0;
580        if let Some(ref wal) = pair.wal {
581            if let Ok(keys) = wal.list(&wal_prefix) {
582                for key in keys {
583                    if let Some(seg) = key.rsplit('/').next() {
584                        if let Ok(s) = seg.parse::<u64>() {
585                            high_seq = high_seq.max(s);
586                        }
587                    }
588                }
589            }
590        }
591
592        let compact_every = pair.snapshot.compact_every().unwrap_or(10);
593        let snapshot_debounce = pair.snapshot.debounce_ms().unwrap_or(0);
594        let wal_debounce = pair.wal.as_ref().and_then(|w| w.debounce_ms()).unwrap_or(0);
595
596        states.push(TierState {
597            snapshot_tier: pair.snapshot,
598            wal_tier: pair.wal,
599            wal_prefix: wal_prefix.clone(),
600            seq: high_seq,
601            flush_count: 0,
602            compact_every,
603            snapshot_debounce_ms: snapshot_debounce,
604            wal_debounce_ms: wal_debounce,
605            last_snapshot: None,
606            disposed: false,
607        });
608    }
609
610    let shared_states = Arc::new(Mutex::new(states));
611    let states_for_sink = shared_states.clone();
612    let filter = options.filter;
613    // β/D244: wrap in `Arc` so each per-fire `MailboxOp::Defer` can
614    // clone the callback (`ErrorCallback` is a non-`Clone` `Box`).
615    let on_error = options.on_error.map(Arc::new);
616    // β/D242/D244/D246: the observe sink fires *in-wave* (during Core
617    // dispatch) and has no `&Core`. `Graph` is now Core-free and
618    // `Send + Sync + 'static` (D246), so a cheap `Arc`-clone is
619    // captured directly into the deferred closure (the old
620    // `NamespaceHandle` split is gone). The snapshot + tier-flush is
621    // posted as a `MailboxOp::Defer` and runs on the owner with a real
622    // `&dyn CoreFull`. Behaviour: snapshot+persist is in-wave-deferred
623    // to quiescence rather than synchronous in the sink — consistent
624    // with D243/D244 "deferred snapshot acceptable" (storage
625    // persistence is a downstream observation).
626    let graph_for_sink = graph.clone();
627    // D249/S2c: the defer closure captures a `Graph`
628    // (`Rc<RefCell<GraphInner>>`, `!Send` post-D248), so it must use
629    // the owner-only `!Send` `DeferQueue`, not the `Send` `CoreMailbox`
630    // Defer. Owner-thread-only `Rc` — fine: `observe_all_reactive`'s
631    // sink is `!Send` and fires owner-side.
632    let deferred = core.defer_queue();
633
634    // D246 rule 8 (S4): reusable coalescing slot. The snapshot+persist
635    // is idempotent at drain time (`snapshot_full` reads current state),
636    // so N qualifying emissions in one wave need only ONE deferred
637    // snapshot+flush, not N boxed closures. `scheduled` (owner-thread-
638    // only `Cell`) gates a single `Box` post per drain; the closure
639    // clears it so the next wave re-arms.
640    //
641    // M2 (QA, 2026-05-19): `compact_every` cadence is parameterised
642    // per-emission (TS parity), so the coalescing MUST NOT also
643    // collapse the count. `pending_count` tracks the number of
644    // qualifying emits observed in the wave; the closure consumes it
645    // and `flush_tier` increments `flush_count` by that count + tests
646    // boundary-crossing of `compact_every`, preserving the per-emission
647    // compact cadence under the per-wave snapshot. Persisted state is
648    // the final wave state either way (deferred-snapshot acceptable,
649    // D243/D244) — only the *cadence* needs the count.
650    let scheduled = std::rc::Rc::new(std::cell::Cell::new(false));
651    let pending_count = std::rc::Rc::new(std::cell::Cell::new(0usize));
652
653    // Wire observe_all_reactive so late-added nodes are also covered.
654    let mut observe = graph.observe_all_reactive();
655    observe.subscribe(core, move |path: &str, messages: &[Message]| {
656        // Filter: only tiers 3–5 (DATA/RESOLVED, INVALIDATE, COMPLETE/ERROR).
657        let dominated_by_tier = messages.iter().any(|m| {
658            let t = m.tier();
659            (3..6).contains(&t)
660        });
661        if !dominated_by_tier {
662            return;
663        }
664
665        // Optional path filter (cheap, no Core — stays synchronous).
666        if let Some(ref f) = filter {
667            if !f(path) {
668                return;
669            }
670        }
671
672        // M2: count this qualifying emit BEFORE the coalesce gate, so
673        // `compact_every` cadence stays per-emission. Saturating add —
674        // a `usize` overflow on count-per-wave is unreachable in any
675        // realistic workload (max waves are bounded by `max_ops`).
676        pending_count.set(pending_count.get().saturating_add(1));
677
678        if scheduled.get() {
679            return; // already armed for this drain — coalesce.
680        }
681        let graph_for_defer = graph_for_sink.clone();
682        let states_for_defer = states_for_sink.clone();
683        let on_error = on_error.clone();
684        let sched = std::rc::Rc::clone(&scheduled);
685        let pc = std::rc::Rc::clone(&pending_count);
686        sched.set(true);
687        // Defer the snapshot + flush owner-side (D244). Core-gone
688        // (`false`) ⇒ dropped unrun: nothing to persist on a
689        // torn-down graph, no handles captured (no leak).
690        let _ = deferred.post(Box::new(move |cf: &dyn CoreFull| {
691            sched.set(false);
692            // M2: consume the per-emission count and reset for the
693            // next wave.
694            let count = pc.replace(0);
695            // Take a snapshot once, shared across all sync tiers.
696            // D246: the in-wave facade `&dyn CoreFull` drives the
697            // snapshot through the one public `Graph` type (Core-free,
698            // Send+Sync — captured directly).
699            let snapshot = graph_for_defer.snapshot_full(cf);
700
701            // N3 (QA 2026-05-14) — collect errors INSIDE the lock,
702            // invoke `on_error` AFTER releasing the lock (re-entrant
703            // `flush_all`/`dispose` would otherwise deadlock). F3:
704            // recover from a poisoned lock via `into_inner`.
705            let collected_errors: Vec<StorageError> = {
706                let mut states = states_for_defer
707                    .lock()
708                    .unwrap_or_else(std::sync::PoisonError::into_inner);
709                let mut errs = Vec::new();
710                for s in states.iter_mut() {
711                    if s.disposed {
712                        continue;
713                    }
714                    if let Err(e) = flush_tier(s, &snapshot, count) {
715                        errs.push(e);
716                    }
717                }
718                errs
719            };
720            if !collected_errors.is_empty() {
721                if let Some(ref cb) = on_error {
722                    for e in &collected_errors {
723                        cb(e);
724                    }
725                }
726            }
727        }));
728    });
729
730    StorageHandle {
731        state: shared_states,
732        observe: Mutex::new(Some(observe)),
733    }
734}
735
736/// Flush a single tier: either full baseline or WAL delta.
737///
738/// `count` is the number of qualifying observed emits this coalesced
739/// flush represents (M2 / QA 2026-05-19). `flush_count` advances by
740/// `count` to preserve the TS-parity per-emission cadence of
741/// `compact_every`: `write_full` fires if any of the increments in
742/// `before+1..=before+count` would have been a multiple of
743/// `compact_every` (boundary-crossing test) — equivalent to firing on
744/// each emit one-by-one, just batched into the coalesced flush.
745fn flush_tier(
746    s: &mut TierState,
747    snapshot: &GraphPersistSnapshot,
748    count: usize,
749) -> Result<(), StorageError> {
750    let before = s.flush_count;
751    // Defensive: a `count == 0` call (no qualifying emits) is a no-op
752    // here — still walks the write path so first-baseline & WAL deltas
753    // honor `last_snapshot` correctness, but doesn't tick cadence.
754    let inc = count as u64;
755    s.flush_count = s.flush_count.saturating_add(inc);
756
757    let write_full = s.wal_tier.is_none()
758        || s.last_snapshot.is_none()
759        || (s.compact_every > 0 && {
760            // Did the batch [before+1 ..= before+count] cross at least
761            // one multiple of `compact_every`? Integer-division test.
762            let cmp = u64::from(s.compact_every);
763            (before / cmp) < (s.flush_count / cmp)
764        });
765
766    if write_full {
767        write_full_baseline(s, snapshot)?;
768    } else {
769        write_wal_delta(s, snapshot)?;
770    }
771
772    s.last_snapshot = Some(snapshot.clone());
773    Ok(())
774}
775
776/// Write a full baseline snapshot to the snapshot tier.
777///
778/// When `snapshot_debounce_ms > 0` (D171), the inner `save()` buffers
779/// per the tier's `BaseStorageTier` contract and we DON'T force a
780/// `flush()`. The caller drains via [`StorageHandle::flush_all`].
781fn write_full_baseline(
782    s: &mut TierState,
783    snapshot: &GraphPersistSnapshot,
784) -> Result<(), StorageError> {
785    let timestamp_ns = graphrefly_core::wall_clock_ns();
786    let record = GraphCheckpointRecord {
787        name: snapshot.name.clone(),
788        mode: "full".to_owned(),
789        snapshot: snapshot.clone(),
790        seq: s.seq,
791        timestamp_ns,
792        format_version: SNAPSHOT_VERSION,
793    };
794
795    s.snapshot_tier.save(record)?;
796    if s.snapshot_debounce_ms == 0 {
797        s.snapshot_tier.flush()?;
798    }
799    Ok(())
800}
801
802/// Write WAL delta frames for the diff between `last_snapshot` and current.
803///
804/// D171: WAL `flush()` is skipped when the WAL tier has
805/// `debounce_ms > 0`; the caller drains via
806/// [`StorageHandle::flush_all`].
807fn write_wal_delta(s: &mut TierState, snapshot: &GraphPersistSnapshot) -> Result<(), StorageError> {
808    let last = s
809        .last_snapshot
810        .as_ref()
811        .expect("caller ensures last_snapshot is Some");
812    let diff = diff_snapshots(last, snapshot);
813
814    if diff.is_empty() {
815        return Ok(());
816    }
817
818    let timestamp_ns = graphrefly_core::wall_clock_ns();
819    let (frames, next_seq) = decompose_diff_to_frames(&diff, timestamp_ns, s.seq)?;
820
821    if let Some(ref wal) = s.wal_tier {
822        for frame in &frames {
823            let key = wal_frame_key(&s.wal_prefix, frame.frame_seq);
824            wal.save(&key, frame.clone())?;
825        }
826        if s.wal_debounce_ms == 0 {
827            wal.flush()?;
828        }
829    }
830
831    s.seq = next_seq;
832    Ok(())
833}
834
835// ── Restore (D170) ─────────────────────────────────────────────────────────
836
837/// Torn-write policy for mid-stream checksum failures.
838#[derive(Debug, Clone, Copy, PartialEq, Eq)]
839pub enum TornWritePolicy {
840    /// Drop the frame and continue (default for WAL tail).
841    Skip,
842    /// Abort the entire restore.
843    Abort,
844}
845
846/// Callback for torn-write decisions. Receives the `frame_seq` and
847/// reason; returns the desired policy.
848pub type OnTornWrite = Box<dyn Fn(u64, &str) -> TornWritePolicy + Send + Sync>;
849
850/// Options for [`restore_snapshot`].
851pub struct RestoreOptions<'a> {
852    /// Snapshot tier to load the baseline from.
853    pub snapshot_tier: &'a dyn SnapshotStorageTier<GraphCheckpointRecord>,
854    /// WAL tier to enumerate delta frames from.
855    pub wal_tier: &'a dyn KvStorageTier<WALFrame<Value>>,
856    /// Optional max `frame_seq` to replay up to. `None` = replay all.
857    pub target_seq: Option<u64>,
858    /// Torn-write callback. If `None`, defaults: tail = Skip, mid = Abort.
859    pub on_torn_write: Option<OnTornWrite>,
860}
861
862/// Three-phase WAL replay: baseline load → checksum verify → lifecycle-
863/// scoped batch.
864///
865/// # Phase 1: Baseline
866///
867/// Loads the `mode:"full"` baseline from the snapshot tier. The snapshot
868/// key is derived from `graph.name` (D174).
869///
870/// # Phase 2: Checksum verification
871///
872/// Enumerates WAL frames with `frame_seq > baseline.seq`, verifies each
873/// frame's SHA-256 checksum, applies torn-write policy on mismatch.
874///
875/// # Phase 3: Lifecycle-scoped batch replay
876///
877/// Groups verified frames by lifecycle. Replays in cross-scope order
878/// (`Spec → Data → Ownership`). Each lifecycle runs in a `graph.batch()`
879/// for atomic partial-restore semantics (Q2).
880///
881/// D246: the embedder owns the [`Core`] (see
882/// [`graphrefly_core::OwnedCore`]) and threads `&Core` into every
883/// Core-touching graph mutation (baseline restore + WAL replay).
884pub fn restore_snapshot(
885    core: &Core,
886    graph: &Graph,
887    opts: &RestoreOptions<'_>,
888) -> Result<RestoreResult, RestoreError> {
889    // Phase 1: Load and apply baseline.
890    let baseline = load_baseline(core, graph, opts)?;
891    let baseline_seq = baseline.seq;
892
893    // Phase 1b: Collect WAL frames post-baseline.
894    let collected = collect_wal_frames(opts, &baseline.name, baseline_seq)?;
895
896    // Phase 2: Checksum verification.
897    let (verified, skipped) = verify_frames(collected, opts.on_torn_write.as_ref())?;
898
899    // Phase 3: Lifecycle-scoped batch replay.
900    Ok(replay_by_lifecycle(
901        core,
902        graph,
903        &verified,
904        baseline_seq,
905        skipped,
906    ))
907}
908
909/// Phase 1: Load baseline from snapshot tier + apply to graph.
910fn load_baseline(
911    core: &Core,
912    graph: &Graph,
913    opts: &RestoreOptions<'_>,
914) -> Result<GraphCheckpointRecord, RestoreError> {
915    let baseline = opts
916        .snapshot_tier
917        .load()
918        .map_err(|e| RestoreError::PhaseFailed {
919            lifecycle: Lifecycle::Spec,
920            frame_seq: 0,
921            message: format!("baseline load failed: {e}"),
922        })?
923        .ok_or(RestoreError::BaselineMissing)?;
924
925    if baseline.mode != "full" {
926        return Err(RestoreError::BaselineMissing);
927    }
928
929    graph
930        .restore(core, &baseline.snapshot)
931        .map_err(|e| RestoreError::PhaseFailed {
932            lifecycle: Lifecycle::Spec,
933            frame_seq: 0,
934            message: format!("baseline restore failed: {e}"),
935        })?;
936
937    Ok(baseline)
938}
939
940/// Phase 1b: Enumerate + filter WAL frames.
941fn collect_wal_frames(
942    opts: &RestoreOptions<'_>,
943    graph_name: &str,
944    baseline_seq: u64,
945) -> Result<Vec<WALFrame<Value>>, RestoreError> {
946    let wal_prefix = graph_wal_prefix(graph_name);
947    let keys = opts
948        .wal_tier
949        .list(&wal_prefix)
950        .map_err(|e| RestoreError::PhaseFailed {
951            lifecycle: Lifecycle::Spec,
952            frame_seq: 0,
953            message: format!("WAL frame enumeration failed: {e}"),
954        })?;
955
956    let mut collected: Vec<WALFrame<Value>> = Vec::new();
957    for key in keys {
958        let frame_seq = key
959            .rsplit('/')
960            .next()
961            .and_then(|s| s.parse::<u64>().ok())
962            .unwrap_or(0);
963
964        if frame_seq <= baseline_seq {
965            continue;
966        }
967        if let Some(target) = opts.target_seq {
968            if frame_seq > target {
969                continue;
970            }
971        }
972
973        if let Some(frame) = opts
974            .wal_tier
975            .load(&key)
976            .map_err(|e| RestoreError::PhaseFailed {
977                lifecycle: Lifecycle::Data,
978                frame_seq,
979                message: format!("WAL frame load failed: {e}"),
980            })?
981        {
982            collected.push(frame);
983        }
984    }
985
986    collected.sort_by_key(|f| f.frame_seq);
987    Ok(collected)
988}
989
990/// Phase 2: Verify checksums, apply torn-write policy.
991fn verify_frames(
992    collected: Vec<WALFrame<Value>>,
993    on_torn_write: Option<&OnTornWrite>,
994) -> Result<(Vec<WALFrame<Value>>, u64), RestoreError> {
995    let mut verified = Vec::new();
996    let mut skipped: u64 = 0;
997    let total = collected.len();
998
999    for (i, frame) in collected.into_iter().enumerate() {
1000        if verify_wal_frame_checksum(&frame).unwrap_or(false) {
1001            verified.push(frame);
1002            continue;
1003        }
1004
1005        let is_tail = i == total - 1;
1006        let policy = if let Some(cb) = on_torn_write {
1007            cb(frame.frame_seq, "checksum-mismatch")
1008        } else if is_tail {
1009            TornWritePolicy::Skip
1010        } else {
1011            TornWritePolicy::Abort
1012        };
1013
1014        match policy {
1015            TornWritePolicy::Skip => skipped += 1,
1016            TornWritePolicy::Abort => {
1017                return Err(RestoreError::TornWriteMidStream {
1018                    frame_seq: frame.frame_seq,
1019                    reason: "checksum-mismatch".to_owned(),
1020                });
1021            }
1022        }
1023    }
1024
1025    Ok((verified, skipped))
1026}
1027
1028/// Phase 3: Group by lifecycle, replay in cross-scope order.
1029fn replay_by_lifecycle(
1030    core: &Core,
1031    graph: &Graph,
1032    verified: &[WALFrame<Value>],
1033    baseline_seq: u64,
1034    skipped: u64,
1035) -> RestoreResult {
1036    let mut grouped: [Vec<WALFrame<Value>>; 3] = [Vec::new(), Vec::new(), Vec::new()];
1037    for frame in verified {
1038        for (idx, lifecycle) in REPLAY_ORDER.iter().enumerate() {
1039            if frame.lifecycle == *lifecycle {
1040                grouped[idx].push(frame.clone());
1041                break;
1042            }
1043        }
1044    }
1045
1046    let mut phases = Vec::new();
1047    let mut replayed: u64 = 0;
1048    let mut final_seq: u64 = baseline_seq;
1049
1050    for (idx, lifecycle) in REPLAY_ORDER.iter().enumerate() {
1051        let life_frames = &grouped[idx];
1052        if life_frames.is_empty() {
1053            continue;
1054        }
1055        let frame_count = life_frames.len() as u64;
1056        let max_seq = life_frames.iter().map(|f| f.frame_seq).max().unwrap_or(0);
1057
1058        graph.batch(core, || {
1059            for frame in life_frames {
1060                apply_wal_frame(core, graph, frame);
1061            }
1062        });
1063
1064        replayed += frame_count;
1065        final_seq = final_seq.max(max_seq);
1066        phases.push(PhaseStat {
1067            lifecycle: *lifecycle,
1068            frames: frame_count,
1069        });
1070    }
1071
1072    RestoreResult {
1073        replayed_frames: replayed,
1074        skipped_frames: skipped,
1075        final_seq,
1076        phases,
1077    }
1078}
1079
1080/// Apply a single WAL frame to a graph. Mirrors TS `applyWalFrame`.
1081///
1082/// D246: every Core-touching `graph.*` call takes the owner's `&Core`;
1083/// binding access is `core.binding_ptr()` (the retired `Graph::core()`
1084/// is gone — `Graph` is Core-free).
1085fn apply_wal_frame(core: &Core, graph: &Graph, frame: &WALFrame<Value>) {
1086    let change = &frame.change.change;
1087    let kind = change.get("kind").and_then(Value::as_str).unwrap_or("");
1088
1089    match frame.lifecycle {
1090        Lifecycle::Spec => match kind {
1091            "graph.add" => {
1092                let node_id_str = change.get("nodeId").and_then(Value::as_str).unwrap_or("");
1093                if node_id_str.is_empty() || graph.try_resolve(node_id_str).is_some() {
1094                    return; // already present or invalid
1095                }
1096                // Only auto-create state nodes (matches TS behavior).
1097                let slice = change.get("slice");
1098                let node_type = slice
1099                    .and_then(|s| s.get("type"))
1100                    .and_then(Value::as_str)
1101                    .unwrap_or("");
1102                if node_type != "state" {
1103                    return;
1104                }
1105                let initial_value = slice.and_then(|s| s.get("value")).cloned();
1106                let handle = initial_value.map_or(graphrefly_core::NO_HANDLE, |v| {
1107                    core.binding_ptr().deserialize_value(v)
1108                });
1109                let _ = graph.state(core, node_id_str, Some(handle));
1110            }
1111            "graph.remove" => {
1112                let node_id_str = change.get("nodeId").and_then(Value::as_str).unwrap_or("");
1113                if !node_id_str.is_empty() && graph.try_resolve(node_id_str).is_some() {
1114                    let _ = graph.remove(core, node_id_str);
1115                }
1116            }
1117            // graph.mount, graph.unmount — deferred (Phase 14.6+)
1118            _ => {}
1119        },
1120        Lifecycle::Data => match kind {
1121            "node.set" => {
1122                let path = change.get("path").and_then(Value::as_str).unwrap_or("");
1123                if let Some(value) = change.get("value") {
1124                    if !path.is_empty() && graph.try_resolve(path).is_some() {
1125                        let handle = core.binding_ptr().deserialize_value(value.clone());
1126                        graph.set(core, path, handle);
1127                    }
1128                }
1129            }
1130            "node.invalidate" => {
1131                let path = change.get("path").and_then(Value::as_str).unwrap_or("");
1132                if !path.is_empty() {
1133                    if let Some(id) = graph.try_resolve(path) {
1134                        graph.invalidate(core, id);
1135                    }
1136                }
1137            }
1138            // node.versionBump — deferred (V0 versioning is internal)
1139            _ => {}
1140        },
1141        // Ownership lifecycle — deferred (Phase 13)
1142        Lifecycle::Ownership => {}
1143    }
1144}