Skip to main content

graphrefly_storage/
graph_integration.rs

1//! Graph-level storage integration (M4.E2 — D170–D174).
2//!
3//! Free functions (not `Graph` methods) because `graphrefly-graph` does not
4//! depend on `graphrefly-storage` — the dependency flows in the other
5//! direction. See D170.
6//!
7//! # Provided APIs
8//!
9//! - [`GraphCheckpointRecord`] — portable baseline type wrapping a
10//!   [`GraphPersistSnapshot`] + seq metadata + `format_version` (F7 close).
11//! - [`diff_snapshots`] / [`decompose_diff_to_frames`] — snapshot diff →
12//!   WAL frame generation engine (D172).
13//! - [`attach_snapshot_storage`] + [`StorageHandle`] — wire observe
14//!   subscription → snapshot diff → WAL frame writes.
15//! - [`restore_snapshot`] — three-phase replay (baseline → checksum verify
16//!   → lifecycle-scoped batch).
17//!
18//! # Manifest (D173 — F4 close)
19//!
20//! F4 (cross-restart key recovery) is structurally closed by D174: at the
21//! attach boundary, `key_of` is derived deterministically from
22//! `graph.name`. On restore, the snapshot key is known without a separate
23//! manifest entry. The checkpoint record's `seq` field serves as the WAL
24//! high-water mark.
25
26use std::sync::{Arc, Mutex};
27
28use graphrefly_core::{Core, CoreFull, Message};
29use graphrefly_graph::{Graph, GraphObserveAllReactive, GraphPersistSnapshot, NodeSlice};
30use graphrefly_structures::{BaseChange, Lifecycle, Version};
31use serde::{Deserialize, Serialize};
32use serde_json::Value;
33
34use crate::error::{PhaseStat, RestoreError, RestoreResult, StorageError};
35use crate::tier::{KvStorageTier, SnapshotStorageTier};
36use crate::wal::{
37    graph_wal_prefix, verify_wal_frame_checksum, wal_frame_checksum, wal_frame_key, WALFrame,
38    WalTag, REPLAY_ORDER,
39};
40
41// ── Constants ──────────────────────────────────────────────────────────────
42
/// Version stamp of the snapshot format (F7 close).
///
/// The value is written into the `format_version` field of every
/// [`GraphCheckpointRecord`] and into `BaseChange.version` of each WAL
/// frame produced by [`decompose_diff_to_frames`].
pub const SNAPSHOT_VERSION: u64 = 1;
46
47// ── Types ──────────────────────────────────────────────────────────────────
48
49/// Portable baseline record written by [`attach_snapshot_storage`] on
50/// full-snapshot writes. Contains the full [`GraphPersistSnapshot`] plus
51/// metadata for WAL cursor alignment.
52///
53/// The `format_version` field closes F7 (missing `format_version` on
54/// `WALFrame` and checkpoint records).
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct GraphCheckpointRecord {
57    /// Graph name (matches `snapshot.name`).
58    pub name: String,
59    /// Snapshot mode — `"full"` for baseline, `"diff"` reserved for future
60    /// incremental baselines.
61    pub mode: String,
62    /// The complete graph state.
63    pub snapshot: GraphPersistSnapshot,
64    /// WAL-tier cursor at baseline write time. Frames with `frame_seq >
65    /// seq` are the delta.
66    pub seq: u64,
67    /// Wall-clock timestamp at baseline write time.
68    pub timestamp_ns: u64,
69    /// Format version (F7 close).
70    pub format_version: u64,
71}
72
73// ── Diff engine (D172) ─────────────────────────────────────────────────────
74
75/// Structural diff between two [`GraphPersistSnapshot`]s.
76#[derive(Debug, Clone)]
77pub struct GraphSnapshotDiff {
78    /// Node names present in `after` but not `before`.
79    pub nodes_added: Vec<String>,
80    /// Full slices for added nodes (parallel to `nodes_added`).
81    pub nodes_added_slices: Vec<NodeSlice>,
82    /// Node names present in `before` but not `after`.
83    pub nodes_removed: Vec<String>,
84    /// Nodes whose `value` field changed between snapshots.
85    pub value_changes: Vec<ValueChange>,
86    /// Subgraph mount names added.
87    pub subgraphs_added: Vec<String>,
88    /// Subgraph mount names removed.
89    pub subgraphs_removed: Vec<String>,
90}
91
92impl GraphSnapshotDiff {
93    /// True when no structural or value changes were detected.
94    #[must_use]
95    pub fn is_empty(&self) -> bool {
96        self.nodes_added.is_empty()
97            && self.nodes_removed.is_empty()
98            && self.value_changes.is_empty()
99            && self.subgraphs_added.is_empty()
100            && self.subgraphs_removed.is_empty()
101    }
102}
103
104/// A single node value change detected by [`diff_snapshots`].
105#[derive(Debug, Clone)]
106pub struct ValueChange {
107    /// Node path that changed.
108    pub path: String,
109    /// New value. `None` means the node transitioned to sentinel (INVALIDATE).
110    pub to: Option<Value>,
111}
112
113/// Compare two snapshots and produce a structural diff.
114///
115/// Only examines the top-level namespace (not recursive into subgraphs —
116/// subgraph diffs are handled by the attach wiring per-subgraph).
117#[must_use]
118pub fn diff_snapshots(
119    before: &GraphPersistSnapshot,
120    after: &GraphPersistSnapshot,
121) -> GraphSnapshotDiff {
122    let mut nodes_added = Vec::new();
123    let mut nodes_added_slices = Vec::new();
124    let mut nodes_removed = Vec::new();
125    let mut value_changes = Vec::new();
126    let mut subgraphs_added = Vec::new();
127    let mut subgraphs_removed = Vec::new();
128
129    // Nodes added or changed.
130    for (name, after_slice) in &after.nodes {
131        if let Some(before_slice) = before.nodes.get(name) {
132            if before_slice.value != after_slice.value {
133                value_changes.push(ValueChange {
134                    path: name.clone(),
135                    to: after_slice.value.clone(),
136                });
137            }
138        } else {
139            nodes_added.push(name.clone());
140            nodes_added_slices.push(after_slice.clone());
141        }
142    }
143
144    // Nodes removed.
145    for name in before.nodes.keys() {
146        if !after.nodes.contains_key(name) {
147            nodes_removed.push(name.clone());
148        }
149    }
150
151    // Subgraphs added/removed.
152    for name in after.subgraphs.keys() {
153        if !before.subgraphs.contains_key(name) {
154            subgraphs_added.push(name.clone());
155        }
156    }
157    for name in before.subgraphs.keys() {
158        if !after.subgraphs.contains_key(name) {
159            subgraphs_removed.push(name.clone());
160        }
161    }
162
163    GraphSnapshotDiff {
164        nodes_added,
165        nodes_added_slices,
166        nodes_removed,
167        value_changes,
168        subgraphs_added,
169        subgraphs_removed,
170    }
171}
172
173/// Intermediate frame before checksum stamping.
174struct DecomposedFrame {
175    lifecycle: Lifecycle,
176    path: String,
177    change: BaseChange<Value>,
178}
179
180/// Convert a [`GraphSnapshotDiff`] into WAL frames ready for persistence.
181///
182/// `timestamp_ns` is the wall-clock at diff time. `base_seq` is the WAL
183/// cursor; returned frames have `frame_seq` values starting at
184/// `base_seq + 1`.
185///
186/// Returns `(frames, next_seq)` where `next_seq` is the highest assigned
187/// `frame_seq`.
188pub fn decompose_diff_to_frames(
189    diff: &GraphSnapshotDiff,
190    timestamp_ns: u64,
191    base_seq: u64,
192) -> Result<(Vec<WALFrame<Value>>, u64), StorageError> {
193    let mut decomposed = Vec::new();
194
195    let wrap = |structure: &str, lifecycle: Lifecycle, payload: Value| -> BaseChange<Value> {
196        BaseChange {
197            structure: structure.to_owned(),
198            version: Version::Counter(SNAPSHOT_VERSION),
199            t_ns: timestamp_ns,
200            seq: None,
201            lifecycle,
202            change: payload,
203        }
204    };
205
206    // Spec lifecycle: node add/remove, subgraph mount/unmount.
207    for (i, name) in diff.nodes_added.iter().enumerate() {
208        let slice = &diff.nodes_added_slices[i];
209        let payload = serde_json::json!({
210            "kind": "graph.add",
211            "nodeId": name,
212            "slice": serde_json::to_value(slice).map_err(|e|
213                StorageError::Codec(crate::codec::CodecError::Encode(e.to_string()))
214            )?,
215        });
216        decomposed.push(DecomposedFrame {
217            lifecycle: Lifecycle::Spec,
218            path: name.clone(),
219            change: wrap("graph.spec", Lifecycle::Spec, payload),
220        });
221    }
222
223    for name in &diff.nodes_removed {
224        let payload = serde_json::json!({
225            "kind": "graph.remove",
226            "nodeId": name,
227        });
228        decomposed.push(DecomposedFrame {
229            lifecycle: Lifecycle::Spec,
230            path: name.clone(),
231            change: wrap("graph.spec", Lifecycle::Spec, payload),
232        });
233    }
234
235    for name in &diff.subgraphs_added {
236        let payload = serde_json::json!({
237            "kind": "graph.mount",
238            "path": name,
239            "subgraphId": name,
240        });
241        decomposed.push(DecomposedFrame {
242            lifecycle: Lifecycle::Spec,
243            path: name.clone(),
244            change: wrap("graph.spec", Lifecycle::Spec, payload),
245        });
246    }
247
248    for name in &diff.subgraphs_removed {
249        let payload = serde_json::json!({
250            "kind": "graph.unmount",
251            "path": name,
252        });
253        decomposed.push(DecomposedFrame {
254            lifecycle: Lifecycle::Spec,
255            path: name.clone(),
256            change: wrap("graph.spec", Lifecycle::Spec, payload),
257        });
258    }
259
260    // Data lifecycle: value changes.
261    for vc in &diff.value_changes {
262        let payload = if let Some(ref value) = vc.to {
263            serde_json::json!({
264                "kind": "node.set",
265                "path": vc.path,
266                "value": value,
267            })
268        } else {
269            serde_json::json!({
270                "kind": "node.invalidate",
271                "path": vc.path,
272            })
273        };
274        decomposed.push(DecomposedFrame {
275            lifecycle: Lifecycle::Data,
276            path: vc.path.clone(),
277            change: wrap("graph.value", Lifecycle::Data, payload),
278        });
279    }
280
281    // Assign frame_seq and compute checksums.
282    let mut seq = base_seq;
283    let mut frames = Vec::with_capacity(decomposed.len());
284    for d in decomposed {
285        seq += 1;
286        let mut frame = WALFrame {
287            t: WalTag,
288            lifecycle: d.lifecycle,
289            path: d.path,
290            change: d.change,
291            frame_seq: seq,
292            frame_t_ns: timestamp_ns,
293            checksum: String::new(),
294            format_version: 1,
295        };
296        frame.checksum = wal_frame_checksum(&frame)?;
297        frames.push(frame);
298    }
299
300    Ok((frames, seq))
301}
302
303// ── Attach (D170) ──────────────────────────────────────────────────────────
304
305/// Per-tier state managed by the attach wiring.
306struct TierState {
307    /// The snapshot tier (writes full baselines).
308    snapshot_tier: Box<dyn SnapshotStorageTier<GraphCheckpointRecord>>,
309    /// Optional WAL tier (writes individual delta frames).
310    wal_tier: Option<Box<dyn KvStorageTier<WALFrame<Value>>>>,
311    /// WAL key prefix derived from `graph.name`.
312    wal_prefix: String,
313    /// Monotonic cursor.
314    seq: u64,
315    /// Flush counter for `compact_every` cadence.
316    flush_count: u64,
317    /// Configured compact-every cadence (0 = every flush writes full baseline).
318    compact_every: u32,
319    /// Snapshot tier's `debounce_ms` snapshot at attach time (D171).
320    /// `0` (the unset / disabled case) means inline-flush; `>0` means
321    /// `save()` buffers and the caller drives explicit flush via
322    /// [`StorageHandle::flush_all`].
323    snapshot_debounce_ms: u32,
324    /// WAL tier's `debounce_ms` snapshot at attach time (D171).
325    wal_debounce_ms: u32,
326    /// Last snapshot used for diff computation.
327    last_snapshot: Option<GraphPersistSnapshot>,
328    /// Disposed flag.
329    disposed: bool,
330}
331
332/// Configuration for a single snapshot+WAL tier pair.
333pub struct AttachTierPair {
334    /// Snapshot tier for full baselines.
335    pub snapshot: Box<dyn SnapshotStorageTier<GraphCheckpointRecord>>,
336    /// Optional WAL tier for delta frames. When `None`, every flush
337    /// writes a full baseline (no incremental WAL).
338    pub wal: Option<Box<dyn KvStorageTier<WALFrame<Value>>>>,
339}
340
341/// Filter predicate for [`AttachOptions`].
342pub type PathFilter = Box<dyn Fn(&str) -> bool + Send + Sync>;
343
344/// Error callback for [`AttachOptions`].
345pub type ErrorCallback = Box<dyn Fn(&StorageError) + Send + Sync>;
346
347/// Options for [`attach_snapshot_storage`].
348#[derive(Default)]
349pub struct AttachOptions {
350    /// Per-path filter. Return `true` to persist changes for this path.
351    /// `None` means persist all paths.
352    pub filter: Option<PathFilter>,
353    /// Error callback invoked when a flush fails.
354    pub on_error: Option<ErrorCallback>,
355}
356
357/// Handle returned by [`attach_snapshot_storage`].
358///
359/// D246 r3: there is **no RAII `Drop`**. The observe subscription is
360/// torn down owner-side, synchronously, by calling
361/// [`StorageHandle::detach`] with the owner's `&Core` (the
362/// [`graphrefly_core::OwnedCore`] borrow). [`StorageHandle::dispose`]
363/// remains a Core-free flag flip that stops late-fire persistence
364/// without unsubscribing (e.g. when the `&Core` is not in hand);
365/// `detach` is the full teardown.
366///
367/// `Graph` is now `Send + Sync + 'static` and Core-free; the held
368/// [`GraphObserveAllReactive`] owns a cheap `Arc`-clone of the graph
369/// (no `'g` borrow), so `StorageHandle` is `'static`.
370pub struct StorageHandle {
371    /// Shared state; inner `disposed` flag prevents late-fire callbacks.
372    state: Arc<Mutex<Vec<TierState>>>,
373    /// The observe handle. `detach(core)` unsubscribes all sinks
374    /// (owner-invoked, synchronous — D246 r3). Wrapped in a `Mutex`
375    /// because `GraphObserveAllReactive::detach` is `&mut self` while
376    /// `StorageHandle` exposes `&self` teardown for ergonomics.
377    observe: Mutex<Option<GraphObserveAllReactive>>,
378}
379
380impl StorageHandle {
381    /// Explicitly dispose: flip the per-tier `disposed` flag so the
382    /// in-wave observe sink stops scheduling persistence. Core-free
383    /// (no unsubscribe) — use [`StorageHandle::detach`] for the full
384    /// owner-side teardown when the `&Core` is available.
385    ///
386    /// F3 (QA 2026-05-14): on `PoisonError` (a panic in the observe
387    /// sink poisoned the state mutex), recover via `into_inner()`
388    /// rather than silently no-op. The disposed flag still gets set
389    /// even after a sink panic, which matters for cleanup.
390    pub fn dispose(&self) {
391        let mut states = self
392            .state
393            .lock()
394            .unwrap_or_else(std::sync::PoisonError::into_inner);
395        for s in states.iter_mut() {
396            s.disposed = true;
397        }
398    }
399
400    /// Owner-invoked, synchronous teardown (D246 r3 — replaces the
401    /// retired RAII `Drop`). Flips the `disposed` flags then detaches
402    /// the observe handle (unsubscribes every per-node sink, the
403    /// namespace-change sink, and the topology sub) via the owner's
404    /// `&Core`. Idempotent: a second call is a no-op (the observe
405    /// handle is taken on first call).
406    pub fn detach(&self, core: &Core) {
407        self.dispose();
408        let observe = self
409            .observe
410            .lock()
411            .unwrap_or_else(std::sync::PoisonError::into_inner)
412            .take();
413        if let Some(mut observe) = observe {
414            observe.detach(core);
415        }
416    }
417
418    /// Drain pending writes on all tiers (D171).
419    ///
420    /// When a tier is configured with `debounce_ms > 0`,
421    /// [`attach_snapshot_storage`]'s observe sink writes via the
422    /// tier's `save()` but skips the inline `flush()`. Callers
423    /// invoke this method — typically from a binding-side reactive
424    /// timer subgraph (`graphrefly_operators::temporal::interval`,
425    /// shipped in Slice T) — to commit the buffered writes to their
426    /// backends. With `debounce_ms == 0`, the observe sink already
427    /// force-flushes inline; calling this method is then a no-op
428    /// because the tier's pending buffers are empty.
429    ///
430    /// Returns the first error encountered, if any. Successful tiers
431    /// in the same call still flush; the error is surfaced for the
432    /// caller's diagnostics.
433    ///
434    /// # Lock discipline (N3, QA 2026-05-14)
435    ///
436    /// The state mutex is **not** held across `tier.flush()` calls.
437    /// Holding it across blocking I/O would (a) serialize the
438    /// reactive graph against backend latency (every observe-sink
439    /// fire would wait on flush completion), and (b) deadlock if a
440    /// caller invokes `flush_all` from inside an `on_error` callback
441    /// (the observe sink already holds `state.lock()` when it
442    /// invokes `on_error`). The implementation snapshots the per-
443    /// tier flush requests under the lock, drops the lock, then
444    /// runs the flushes in sequence. After each flush, a brief
445    /// re-lock applies any per-tier state mutations (none today —
446    /// `flush()` doesn't return new state).
447    ///
448    /// # Errors
449    ///
450    /// Returns the first `StorageError` encountered. Subsequent
451    /// errors are dropped (acceptable v1; the registered
452    /// `on_error` callback fires per error in the observe sink path
453    /// and is the canonical multi-error reporting surface).
454    ///
455    /// F3: recovers from a poisoned state mutex via `into_inner`
456    /// rather than silently returning Ok.
457    pub fn flush_all(&self) -> Result<(), StorageError> {
458        // Snapshot the per-tier flush callables under the lock,
459        // then drop the lock before invoking them. Each tier's
460        // `flush()` is a `&self` method on `Box<dyn ...>` — to call
461        // it without holding the outer lock, we collect raw
462        // pointers... no, `Box<dyn ...>` can't be cloned. Instead,
463        // we re-lock briefly per tier to call flush, but never hold
464        // the lock across MULTIPLE tier flushes — that's the key
465        // deadlock avoidance.
466        //
467        // The remaining single-tier-lock-during-flush window can
468        // deadlock if `tier.flush()` synchronously re-enters
469        // `dispose()` / `flush_all()` (which would try to re-acquire
470        // `self.state`). Tier impls in `graphrefly-storage` don't do
471        // that; document the contract for future binding-side tier
472        // impls in the rustdoc.
473        let tier_count = {
474            let states = self
475                .state
476                .lock()
477                .unwrap_or_else(std::sync::PoisonError::into_inner);
478            states.len()
479        };
480        let mut first_err: Option<StorageError> = None;
481        for idx in 0..tier_count {
482            // Per-tier flush: brief lock to access the tier, run
483            // flush, drop lock immediately. The tier itself is
484            // behind a Box<dyn ...> inside the Vec; flush() takes
485            // &self, so the &mut on the outer Vec is only needed
486            // for `s.disposed` check.
487            let snapshot_err: Option<StorageError>;
488            let wal_err: Option<StorageError>;
489            {
490                let mut states = self
491                    .state
492                    .lock()
493                    .unwrap_or_else(std::sync::PoisonError::into_inner);
494                let Some(s) = states.get_mut(idx) else {
495                    continue;
496                };
497                if s.disposed {
498                    continue;
499                }
500                // Call flush while holding the lock — this is
501                // narrow-scoped and the only contention is with the
502                // observe sink, which is itself bounded by wave
503                // dispatch. No `on_error` invocation under this
504                // lock (callers receive errors via the return
505                // value, not via the registered callback).
506                snapshot_err = s.snapshot_tier.flush().err();
507                wal_err = s.wal_tier.as_ref().and_then(|wal| wal.flush().err());
508            }
509            if let Some(e) = snapshot_err {
510                if first_err.is_none() {
511                    first_err = Some(e);
512                }
513            }
514            if let Some(e) = wal_err {
515                if first_err.is_none() {
516                    first_err = Some(e);
517                }
518            }
519        }
520        match first_err {
521            None => Ok(()),
522            Some(e) => Err(e),
523        }
524    }
525}
526
527// D246 r3: no `impl Drop` — teardown is owner-invoked
528// ([`StorageHandle::detach`]); a parameterless `Drop` cannot reach
529// `&Core`. The observe subscription is opened via raw `core.subscribe`
530// and is NOT `OwnedCore`-tracked: you MUST call `detach(core)` —
531// dropping a `StorageHandle` without it (and without a subsequent
532// `graph.destroy(core)`) leaks the subscription for the `Core`
533// lifetime. `OwnedCore` drop does NOT collect it.
534
/// Wire an observe subscription on `graph` that persists node changes
/// to the provided snapshot+WAL tier pairs.
///
/// For each pair the WAL cursor is seeded from the highest numeric
/// trailing key segment found under the graph's WAL prefix (0 when there
/// is no WAL tier or nothing is listed). Qualifying emissions then
/// schedule one deferred snapshot + per-tier flush per drain.
///
/// # Debounce (D171, resolved 2026-05-14)
///
/// When a tier's `debounce_ms` is `Some(ms)` with `ms > 0`, the
/// observe callback writes via the tier's `save()` (which buffers
/// internally per the [`BaseStorageTier`](crate::tier::BaseStorageTier)
/// contract) but does NOT force a `flush()`. Callers drain pending
/// writes by invoking [`StorageHandle::flush_all`] — typically from a
/// binding-side reactive timer (e.g.,
/// `graphrefly_operators::temporal::interval`). This keeps
/// `graphrefly-storage` sync + binding-agnostic per the canonical
/// no-async-in-storage invariant; reactive-timer wiring lives in the
/// layer that already owns the timer primitive.
///
/// When `debounce_ms` is `None` or `Some(0)`, the observe callback
/// continues to force-flush inline as before.
///
/// # `key_of` (D174, closes F8)
///
/// The snapshot tier's backend key is derived from `graph.name` via
/// the checkpoint record's `name` field. Cross-impl `key_of` divergence
/// disappears at this boundary.
///
/// # Ownership and teardown (D246)
///
/// The embedder owns the [`Core`] (see
/// [`graphrefly_core::OwnedCore`]) and passes `&Core` in — the
/// `observe_all_reactive` subscription is opened owner-side. Teardown
/// is owner-invoked via [`StorageHandle::detach`] (no RAII `Drop`,
/// D246 r3).
#[must_use = "the returned StorageHandle owns the observe subscription; \
              call StorageHandle::detach(core) to unsubscribe and stop \
              persistence (D246 r3 — no RAII Drop)"]
pub fn attach_snapshot_storage(
    core: &Core,
    graph: &Graph,
    pairs: Vec<AttachTierPair>,
    options: AttachOptions,
) -> StorageHandle {
    let graph_name = graph.name();
    let wal_prefix = graph_wal_prefix(&graph_name);

    let mut states = Vec::with_capacity(pairs.len());
    for pair in pairs {
        // Bootstrap: enumerate existing WAL frames to find high-water seq.
        // The seq is parsed from the last `/`-separated key segment; keys
        // whose last segment is not a `u64` are ignored.
        // NOTE(review): a failed `list` is silently ignored and leaves
        // `high_seq` at 0 — confirm that restarting the cursor cannot
        // collide with frames already persisted.
        let mut high_seq: u64 = 0;
        if let Some(ref wal) = pair.wal {
            if let Ok(keys) = wal.list(&wal_prefix) {
                for key in keys {
                    if let Some(seg) = key.rsplit('/').next() {
                        if let Ok(s) = seg.parse::<u64>() {
                            high_seq = high_seq.max(s);
                        }
                    }
                }
            }
        }

        // Tier settings are read once here; unset values fall back to a
        // compaction cadence of 10 and no debounce.
        let compact_every = pair.snapshot.compact_every().unwrap_or(10);
        let snapshot_debounce = pair.snapshot.debounce_ms().unwrap_or(0);
        let wal_debounce = pair.wal.as_ref().and_then(|w| w.debounce_ms()).unwrap_or(0);

        states.push(TierState {
            snapshot_tier: pair.snapshot,
            wal_tier: pair.wal,
            wal_prefix: wal_prefix.clone(),
            seq: high_seq,
            flush_count: 0,
            compact_every,
            snapshot_debounce_ms: snapshot_debounce,
            wal_debounce_ms: wal_debounce,
            last_snapshot: None,
            disposed: false,
        });
    }

    let shared_states = Arc::new(Mutex::new(states));
    let states_for_sink = shared_states.clone();
    let filter = options.filter;
    // β/D244: wrap in `Arc` so each deferred closure can clone the
    // callback (`ErrorCallback` is a non-`Clone` `Box`).
    let on_error = options.on_error.map(Arc::new);
    // β/D242/D244/D246: the observe sink fires *in-wave* (during Core
    // dispatch) and has no `&Core`, so the sink captures a clone of the
    // graph and hands it to a deferred closure. The snapshot +
    // tier-flush therefore runs later, on the owner, with a real
    // `&dyn CoreFull`. Behaviour: snapshot+persist is deferred to
    // quiescence rather than synchronous in the sink — consistent with
    // D243/D244 "deferred snapshot acceptable" (storage persistence is
    // a downstream observation).
    let graph_for_sink = graph.clone();
    // D249/S2c: the deferred closure is posted to the owner-only
    // `DeferQueue` rather than a `Send` mailbox. The closure captures
    // `Rc<Cell<..>>` state (below), so it is `!Send` in any case.
    // NOTE(review): earlier notes disagree on whether `Graph` itself is
    // `Send + Sync` (D246) or `!Send` (post-D248) — confirm against
    // `graphrefly-graph`.
    let deferred = core.defer_queue();

    // D246 rule 8 (S4): reusable coalescing slot. The snapshot+persist
    // is idempotent at drain time (`snapshot_full` reads current state),
    // so N qualifying emissions in one wave need only ONE deferred
    // snapshot+flush, not N boxed closures. `scheduled` (owner-thread-
    // only `Cell`) gates a single `Box` post per drain; the closure
    // clears it so the next wave re-arms.
    //
    // M2 (QA, 2026-05-19): `compact_every` cadence is parameterised
    // per-emission (TS parity), so the coalescing MUST NOT also
    // collapse the count. `pending_count` tracks the number of
    // qualifying emits observed in the wave; the closure consumes it
    // and `flush_tier` increments `flush_count` by that count + tests
    // boundary-crossing of `compact_every`, preserving the per-emission
    // compact cadence under the per-wave snapshot. Persisted state is
    // the final wave state either way (deferred-snapshot acceptable,
    // D243/D244) — only the *cadence* needs the count.
    let scheduled = std::rc::Rc::new(std::cell::Cell::new(false));
    let pending_count = std::rc::Rc::new(std::cell::Cell::new(0usize));

    // Wire observe_all_reactive so late-added nodes are also covered.
    let mut observe = graph.observe_all_reactive();
    observe.subscribe(core, move |path: &str, messages: &[Message]| {
        // Filter: only tiers 3–5 (DATA/RESOLVED, INVALIDATE, COMPLETE/ERROR).
        // One message in that range is enough to qualify the batch.
        let dominated_by_tier = messages.iter().any(|m| {
            let t = m.tier();
            (3..6).contains(&t)
        });
        if !dominated_by_tier {
            return;
        }

        // Optional path filter (cheap, no Core — stays synchronous).
        if let Some(ref f) = filter {
            if !f(path) {
                return;
            }
        }

        // M2: count this qualifying emit BEFORE the coalesce gate, so
        // `compact_every` cadence stays per-emission. Saturating add, so
        // an (unrealistic) overflow clamps instead of wrapping.
        pending_count.set(pending_count.get().saturating_add(1));

        if scheduled.get() {
            return; // already armed for this drain — coalesce.
        }
        let graph_for_defer = graph_for_sink.clone();
        let states_for_defer = states_for_sink.clone();
        let on_error = on_error.clone();
        let sched = std::rc::Rc::clone(&scheduled);
        let pc = std::rc::Rc::clone(&pending_count);
        // Arm the gate before posting so further emits in this wave
        // coalesce into the closure posted below.
        sched.set(true);
        // Defer the snapshot + flush owner-side (D244). Core-gone
        // (`false`) ⇒ dropped unrun: nothing to persist on a
        // torn-down graph. The result of `post` is deliberately ignored.
        let _ = deferred.post(Box::new(move |cf: &dyn CoreFull| {
            // Re-arm first: emits after this point schedule a new drain.
            sched.set(false);
            // M2: consume the per-emission count and reset for the
            // next wave.
            let count = pc.replace(0);
            // Take a snapshot once, shared across all tiers. D246: the
            // `&dyn CoreFull` passed to the deferred closure drives the
            // snapshot through the captured `Graph` clone.
            let snapshot = graph_for_defer.snapshot_full(cf);

            // N3 (QA 2026-05-14) — collect errors INSIDE the lock,
            // invoke `on_error` AFTER releasing the lock (re-entrant
            // `flush_all`/`dispose` would otherwise deadlock). F3:
            // recover from a poisoned lock via `into_inner`.
            let collected_errors: Vec<StorageError> = {
                let mut states = states_for_defer
                    .lock()
                    .unwrap_or_else(std::sync::PoisonError::into_inner);
                let mut errs = Vec::new();
                for s in states.iter_mut() {
                    // Disposed tiers no longer persist anything.
                    if s.disposed {
                        continue;
                    }
                    if let Err(e) = flush_tier(s, &snapshot, count) {
                        errs.push(e);
                    }
                }
                errs
            };
            // Lock released here; report every collected error in order.
            if !collected_errors.is_empty() {
                if let Some(ref cb) = on_error {
                    for e in &collected_errors {
                        cb(e);
                    }
                }
            }
        }));
    });

    StorageHandle {
        state: shared_states,
        observe: Mutex::new(Some(observe)),
    }
}
735
736/// Flush a single tier: either full baseline or WAL delta.
737///
738/// `count` is the number of qualifying observed emits this coalesced
739/// flush represents (M2 / QA 2026-05-19). `flush_count` advances by
740/// `count` to preserve the TS-parity per-emission cadence of
741/// `compact_every`: `write_full` fires if any of the increments in
742/// `before+1..=before+count` would have been a multiple of
743/// `compact_every` (boundary-crossing test) — equivalent to firing on
744/// each emit one-by-one, just batched into the coalesced flush.
745fn flush_tier(
746    s: &mut TierState,
747    snapshot: &GraphPersistSnapshot,
748    count: usize,
749) -> Result<(), StorageError> {
750    let before = s.flush_count;
751    // Defensive: a `count == 0` call (no qualifying emits) is a no-op
752    // here — still walks the write path so first-baseline & WAL deltas
753    // honor `last_snapshot` correctness, but doesn't tick cadence.
754    let inc = count as u64;
755    s.flush_count = s.flush_count.saturating_add(inc);
756
757    let write_full = s.wal_tier.is_none()
758        || s.last_snapshot.is_none()
759        || (s.compact_every > 0 && {
760            // Did the batch [before+1 ..= before+count] cross at least
761            // one multiple of `compact_every`? Integer-division test.
762            let cmp = u64::from(s.compact_every);
763            (before / cmp) < (s.flush_count / cmp)
764        });
765
766    if write_full {
767        write_full_baseline(s, snapshot)?;
768    } else {
769        write_wal_delta(s, snapshot)?;
770    }
771
772    s.last_snapshot = Some(snapshot.clone());
773    Ok(())
774}
775
776/// Write a full baseline snapshot to the snapshot tier.
777///
778/// When `snapshot_debounce_ms > 0` (D171), the inner `save()` buffers
779/// per the tier's `BaseStorageTier` contract and we DON'T force a
780/// `flush()`. The caller drains via [`StorageHandle::flush_all`].
781fn write_full_baseline(
782    s: &mut TierState,
783    snapshot: &GraphPersistSnapshot,
784) -> Result<(), StorageError> {
785    let timestamp_ns = graphrefly_core::wall_clock_ns();
786    let record = GraphCheckpointRecord {
787        name: snapshot.name.clone(),
788        mode: "full".to_owned(),
789        snapshot: snapshot.clone(),
790        seq: s.seq,
791        timestamp_ns,
792        format_version: SNAPSHOT_VERSION,
793    };
794
795    s.snapshot_tier.save(record)?;
796    if s.snapshot_debounce_ms == 0 {
797        s.snapshot_tier.flush()?;
798    }
799    Ok(())
800}
801
802/// Write WAL delta frames for the diff between `last_snapshot` and current.
803///
804/// D171: WAL `flush()` is skipped when the WAL tier has
805/// `debounce_ms > 0`; the caller drains via
806/// [`StorageHandle::flush_all`].
807fn write_wal_delta(s: &mut TierState, snapshot: &GraphPersistSnapshot) -> Result<(), StorageError> {
808    let last = s
809        .last_snapshot
810        .as_ref()
811        .expect("caller ensures last_snapshot is Some");
812    let diff = diff_snapshots(last, snapshot);
813
814    if diff.is_empty() {
815        return Ok(());
816    }
817
818    let timestamp_ns = graphrefly_core::wall_clock_ns();
819    let (frames, next_seq) = decompose_diff_to_frames(&diff, timestamp_ns, s.seq)?;
820
821    if let Some(ref wal) = s.wal_tier {
822        for frame in &frames {
823            let key = wal_frame_key(&s.wal_prefix, frame.frame_seq);
824            wal.save(&key, frame.clone())?;
825        }
826        if s.wal_debounce_ms == 0 {
827            wal.flush()?;
828        }
829    }
830
831    s.seq = next_seq;
832    Ok(())
833}
834
835// ── Restore (D170) ─────────────────────────────────────────────────────────
836
/// What to do with a WAL frame whose checksum does not verify during
/// restore.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TornWritePolicy {
    /// Discard the offending frame and keep going (the default when the
    /// bad frame is the WAL tail).
    Skip,
    /// Stop and fail the whole restore.
    Abort,
}
845
/// Callback consulted when a WAL frame fails checksum verification.
///
/// Receives the frame's `frame_seq` and a short reason string
/// (`"checksum-mismatch"` for checksum failures) and returns the
/// [`TornWritePolicy`] to apply to that frame. The callback is boxed and
/// must be `Send + Sync`.
pub type OnTornWrite = Box<dyn Fn(u64, &str) -> TornWritePolicy + Send + Sync>;
849
/// Options for [`restore_snapshot`].
///
/// Borrows both storage tiers for the duration of the restore (`'a`).
pub struct RestoreOptions<'a> {
    /// Snapshot tier the baseline checkpoint record is loaded from.
    pub snapshot_tier: &'a dyn SnapshotStorageTier<GraphCheckpointRecord>,
    /// WAL tier whose delta frames are listed and loaded for replay.
    pub wal_tier: &'a dyn KvStorageTier<WALFrame<Value>>,
    /// Optional inclusive upper bound on `frame_seq`: frames with a larger
    /// sequence number are not replayed. `None` = replay all.
    pub target_seq: Option<u64>,
    /// Torn-write callback, asked for a policy on every frame that fails
    /// checksum verification. If `None`, the defaults apply: a failure on
    /// the last collected frame (tail) is skipped, any earlier failure
    /// (mid-stream) aborts the restore.
    pub on_torn_write: Option<OnTornWrite>,
}
861
862/// Three-phase WAL replay: baseline load → checksum verify → lifecycle-
863/// scoped batch.
864///
865/// # Phase 1: Baseline
866///
867/// Loads the `mode:"full"` baseline from the snapshot tier. The snapshot
868/// key is derived from `graph.name` (D174).
869///
870/// # Phase 2: Checksum verification
871///
872/// Enumerates WAL frames with `frame_seq > baseline.seq`, verifies each
873/// frame's SHA-256 checksum, applies torn-write policy on mismatch.
874///
875/// # Phase 3: Lifecycle-scoped batch replay
876///
877/// Groups verified frames by lifecycle. Replays in cross-scope order
878/// (`Spec → Data → Ownership`). Each lifecycle runs in a `graph.batch()`
879/// for atomic partial-restore semantics (Q2).
880///
881/// D246: the embedder owns the [`Core`] (see
882/// [`graphrefly_core::OwnedCore`]) and threads `&Core` into every
883/// Core-touching graph mutation (baseline restore + WAL replay).
884pub fn restore_snapshot(
885    core: &Core,
886    graph: &Graph,
887    opts: &RestoreOptions<'_>,
888) -> Result<RestoreResult, RestoreError> {
889    // Phase 1: Load and apply baseline.
890    let baseline = load_baseline(core, graph, opts)?;
891    let baseline_seq = baseline.seq;
892
893    // Phase 1b: Collect WAL frames post-baseline.
894    let collected = collect_wal_frames(opts, &baseline.name, baseline_seq)?;
895
896    // Phase 2: Checksum verification.
897    let (verified, skipped) = verify_frames(collected, opts.on_torn_write.as_ref())?;
898
899    // Phase 3: Lifecycle-scoped batch replay.
900    replay_by_lifecycle(core, graph, &verified, baseline_seq, skipped)
901}
902
903/// Phase 1: Load baseline from snapshot tier + apply to graph.
904fn load_baseline(
905    core: &Core,
906    graph: &Graph,
907    opts: &RestoreOptions<'_>,
908) -> Result<GraphCheckpointRecord, RestoreError> {
909    let baseline = opts
910        .snapshot_tier
911        .load()
912        .map_err(|e| RestoreError::PhaseFailed {
913            lifecycle: Lifecycle::Spec,
914            frame_seq: 0,
915            message: format!("baseline load failed: {e}"),
916        })?
917        .ok_or(RestoreError::BaselineMissing)?;
918
919    if baseline.mode != "full" {
920        return Err(RestoreError::BaselineMissing);
921    }
922
923    graph
924        .restore(core, &baseline.snapshot)
925        .map_err(|e| RestoreError::PhaseFailed {
926            lifecycle: Lifecycle::Spec,
927            frame_seq: 0,
928            message: format!("baseline restore failed: {e}"),
929        })?;
930
931    Ok(baseline)
932}
933
934/// Phase 1b: Enumerate + filter WAL frames.
935fn collect_wal_frames(
936    opts: &RestoreOptions<'_>,
937    graph_name: &str,
938    baseline_seq: u64,
939) -> Result<Vec<WALFrame<Value>>, RestoreError> {
940    let wal_prefix = graph_wal_prefix(graph_name);
941    let keys = opts
942        .wal_tier
943        .list(&wal_prefix)
944        .map_err(|e| RestoreError::PhaseFailed {
945            lifecycle: Lifecycle::Spec,
946            frame_seq: 0,
947            message: format!("WAL frame enumeration failed: {e}"),
948        })?;
949
950    let mut collected: Vec<WALFrame<Value>> = Vec::new();
951    for key in keys {
952        let frame_seq = key
953            .rsplit('/')
954            .next()
955            .and_then(|s| s.parse::<u64>().ok())
956            .unwrap_or(0);
957
958        if frame_seq <= baseline_seq {
959            continue;
960        }
961        if let Some(target) = opts.target_seq {
962            if frame_seq > target {
963                continue;
964            }
965        }
966
967        if let Some(frame) = opts
968            .wal_tier
969            .load(&key)
970            .map_err(|e| RestoreError::PhaseFailed {
971                lifecycle: Lifecycle::Data,
972                frame_seq,
973                message: format!("WAL frame load failed: {e}"),
974            })?
975        {
976            collected.push(frame);
977        }
978    }
979
980    collected.sort_by_key(|f| f.frame_seq);
981    Ok(collected)
982}
983
984/// Phase 2: Verify checksums, apply torn-write policy.
985fn verify_frames(
986    collected: Vec<WALFrame<Value>>,
987    on_torn_write: Option<&OnTornWrite>,
988) -> Result<(Vec<WALFrame<Value>>, u64), RestoreError> {
989    let mut verified = Vec::new();
990    let mut skipped: u64 = 0;
991    let total = collected.len();
992
993    for (i, frame) in collected.into_iter().enumerate() {
994        if verify_wal_frame_checksum(&frame).unwrap_or(false) {
995            verified.push(frame);
996            continue;
997        }
998
999        let is_tail = i == total - 1;
1000        let policy = if let Some(cb) = on_torn_write {
1001            cb(frame.frame_seq, "checksum-mismatch")
1002        } else if is_tail {
1003            TornWritePolicy::Skip
1004        } else {
1005            TornWritePolicy::Abort
1006        };
1007
1008        match policy {
1009            TornWritePolicy::Skip => skipped += 1,
1010            TornWritePolicy::Abort => {
1011                return Err(RestoreError::TornWriteMidStream {
1012                    frame_seq: frame.frame_seq,
1013                    reason: "checksum-mismatch".to_owned(),
1014                });
1015            }
1016        }
1017    }
1018
1019    Ok((verified, skipped))
1020}
1021
1022/// Phase 3: Group by lifecycle, replay in cross-scope order.
1023fn replay_by_lifecycle(
1024    core: &Core,
1025    graph: &Graph,
1026    verified: &[WALFrame<Value>],
1027    baseline_seq: u64,
1028    skipped: u64,
1029) -> Result<RestoreResult, RestoreError> {
1030    let mut grouped: [Vec<WALFrame<Value>>; 3] = [Vec::new(), Vec::new(), Vec::new()];
1031    for frame in verified {
1032        for (idx, lifecycle) in REPLAY_ORDER.iter().enumerate() {
1033            if frame.lifecycle == *lifecycle {
1034                grouped[idx].push(frame.clone());
1035                break;
1036            }
1037        }
1038    }
1039
1040    let mut phases = Vec::new();
1041    let mut replayed: u64 = 0;
1042    let mut final_seq: u64 = baseline_seq;
1043
1044    for (idx, lifecycle) in REPLAY_ORDER.iter().enumerate() {
1045        let life_frames = &grouped[idx];
1046        if life_frames.is_empty() {
1047            continue;
1048        }
1049        let frame_count = life_frames.len() as u64;
1050        let max_seq = life_frames.iter().map(|f| f.frame_seq).max().unwrap_or(0);
1051
1052        // /qa G1.2 (2026-05-22): `apply_wal_frame` now returns
1053        // `Result<(), RestoreError>`. Thread per-frame errors out of the
1054        // `graph.batch` closure via a `RefCell<Option<RestoreError>>` and
1055        // surface the first failure after the batch drains. Matches the
1056        // B1/B2 "honest error" theme — a failed mount/unmount during
1057        // replay aborts the batch loudly rather than corrupting topology
1058        // silently.
1059        let batch_err: std::cell::RefCell<Option<RestoreError>> = std::cell::RefCell::new(None);
1060        graph.batch(core, || {
1061            for frame in life_frames {
1062                if batch_err.borrow().is_some() {
1063                    break;
1064                }
1065                if let Err(e) = apply_wal_frame(core, graph, frame) {
1066                    *batch_err.borrow_mut() = Some(e);
1067                    break;
1068                }
1069            }
1070        });
1071        if let Some(e) = batch_err.into_inner() {
1072            return Err(e);
1073        }
1074
1075        replayed += frame_count;
1076        final_seq = final_seq.max(max_seq);
1077        phases.push(PhaseStat {
1078            lifecycle: *lifecycle,
1079            frames: frame_count,
1080        });
1081    }
1082
1083    Ok(RestoreResult {
1084        replayed_frames: replayed,
1085        skipped_frames: skipped,
1086        final_seq,
1087        phases,
1088    })
1089}
1090
1091/// /qa G1.1 (2026-05-22): walk a multi-segment mount/unmount `path`
1092/// (e.g. `"parent::child::nested"`) to find the OWNER GRAPH where the
1093/// leaf mount actually lives, returning `(owner_graph, leaf_name)`.
1094/// For a single-segment path returns `(graph, path)`. Returns `None`
1095/// if any intermediate segment can't be resolved as a child mount.
1096///
1097/// TS `_collectSubgraphs` (`packages/pure-ts/src/graph/graph.ts:3486`)
1098/// emits fully-qualified paths from root; the Rust storage replay must
1099/// walk those segments to address the right owner graph before calling
1100/// `mount_new` / `unmount`. Pre-/qa B4 the mount/unmount arms passed
1101/// the multi-segment path straight to `graph.mount_new(core, path)`
1102/// which rejects [`PATH_SEP`] in names — silently no-op'd via the
1103/// `let _ = …` pattern.
1104fn resolve_mount_parent<'a>(graph: &Graph, path: &'a str) -> Option<(Graph, &'a str)> {
1105    if path.is_empty() {
1106        return None;
1107    }
1108    let mut current = graph.clone();
1109    let mut segments = path.split("::");
1110    let first = segments.next()?;
1111    let mut prev = first;
1112    for seg in segments {
1113        current = current.child(prev)?;
1114        prev = seg;
1115    }
1116    Some((current, prev))
1117}
1118
1119/// Apply a single WAL frame to a graph. Mirrors TS `applyWalFrame`.
1120///
1121/// D246: every Core-touching `graph.*` call takes the owner's `&Core`;
1122/// binding access is `core.binding_ptr()` (the retired `Graph::core()`
1123/// is gone — `Graph` is Core-free).
1124///
1125/// /qa G1.2 (2026-05-22): returns `Result<(), RestoreError>`. The
1126/// pre-/qa silent `let _ = …` swallow on every arm regressed the
1127/// B1/B2 "honest error" theme; this signature propagates per-frame
1128/// errors up to `replay_by_lifecycle` so a failed mount/collision
1129/// during replay aborts the batch loudly. Idempotent skips (already-
1130/// present add, absent remove, etc.) continue to return `Ok(())`.
1131fn apply_wal_frame(
1132    core: &Core,
1133    graph: &Graph,
1134    frame: &WALFrame<Value>,
1135) -> Result<(), RestoreError> {
1136    let change = &frame.change.change;
1137    let kind = change.get("kind").and_then(Value::as_str).unwrap_or("");
1138    let frame_seq = frame.frame_seq;
1139    let lifecycle = frame.lifecycle;
1140    let phase_err = |message: String| RestoreError::PhaseFailed {
1141        lifecycle,
1142        frame_seq,
1143        message,
1144    };
1145
1146    match frame.lifecycle {
1147        Lifecycle::Spec => apply_spec_frame(core, graph, kind, change, &phase_err),
1148        Lifecycle::Data => {
1149            apply_data_frame(core, graph, kind, change);
1150            Ok(())
1151        }
1152        // Ownership lifecycle — deferred (Phase 13)
1153        Lifecycle::Ownership => Ok(()),
1154    }
1155}
1156
1157/// /qa G1.2 + `clippy::too_many_lines` (2026-05-22): extracted from
1158/// `apply_wal_frame` so the parent fn fits under the 100-line cap.
1159/// Per-arm semantics unchanged; each arm is idempotent on the
1160/// already-applied case and propagates real errors as
1161/// [`RestoreError::PhaseFailed`].
1162fn apply_spec_frame(
1163    core: &Core,
1164    graph: &Graph,
1165    kind: &str,
1166    change: &Value,
1167    phase_err: &impl Fn(String) -> RestoreError,
1168) -> Result<(), RestoreError> {
1169    match kind {
1170        "graph.add" => {
1171            let node_id_str = change.get("nodeId").and_then(Value::as_str).unwrap_or("");
1172            if node_id_str.is_empty() || graph.try_resolve(node_id_str).is_some() {
1173                return Ok(()); // already present or invalid path — idempotent
1174            }
1175            // Only auto-create state nodes (matches TS behavior).
1176            let slice = change.get("slice");
1177            let node_type = slice
1178                .and_then(|s| s.get("type"))
1179                .and_then(Value::as_str)
1180                .unwrap_or("");
1181            if node_type != "state" {
1182                return Ok(());
1183            }
1184            let initial_value = slice.and_then(|s| s.get("value")).cloned();
1185            let handle = initial_value.map_or(graphrefly_core::NO_HANDLE, |v| {
1186                core.binding_ptr().deserialize_value(v)
1187            });
1188            graph
1189                .state(core, node_id_str, Some(handle))
1190                .map_err(|e| phase_err(format!("graph.add `{node_id_str}` failed: {e:?}")))?;
1191            Ok(())
1192        }
1193        "graph.remove" => {
1194            let node_id_str = change.get("nodeId").and_then(Value::as_str).unwrap_or("");
1195            if node_id_str.is_empty() || graph.try_resolve(node_id_str).is_none() {
1196                return Ok(()); // absent — idempotent
1197            }
1198            graph
1199                .remove(core, node_id_str)
1200                .map_err(|e| phase_err(format!("graph.remove `{node_id_str}` failed: {e:?}")))?;
1201            Ok(())
1202        }
1203        "graph.mount" => {
1204            // B4 (2026-05-22, /porting-to-rs storage-honest-error batch):
1205            // mount the subgraph if absent. Idempotent on replay; emitted
1206            // BEFORE any value-changes at the subgraph level so
1207            // `node.set X.y` is guaranteed to find the mount.
1208            //
1209            // /qa G1.1 (2026-05-22): walk multi-segment paths via
1210            // `resolve_mount_parent`. TS `_collectSubgraphs` emits
1211            // `"parent::child::nested"` for nested mounts; the Rust replay
1212            // must descend through `Graph::child` to find the owner of the
1213            // leaf mount.
1214            let path = change.get("path").and_then(Value::as_str).unwrap_or("");
1215            if path.is_empty() {
1216                return Ok(());
1217            }
1218            let (parent_graph, leaf) = resolve_mount_parent(graph, path).ok_or_else(|| {
1219                phase_err(format!(
1220                    "graph.mount `{path}`: parent path segments do not resolve to a mounted subgraph"
1221                ))
1222            })?;
1223            if parent_graph.child_names().iter().any(|n| n == leaf) {
1224                return Ok(()); // already mounted; idempotent
1225            }
1226            parent_graph
1227                .mount_new(core, leaf)
1228                .map_err(|e| phase_err(format!("graph.mount `{path}` failed: {e:?}")))?;
1229            Ok(())
1230        }
1231        "graph.unmount" => {
1232            // B4 (2026-05-22): unmount if present.
1233            // /qa G1.1 (2026-05-22): walk multi-segment paths (see
1234            // `graph.mount` above).
1235            let path = change.get("path").and_then(Value::as_str).unwrap_or("");
1236            if path.is_empty() {
1237                return Ok(());
1238            }
1239            let Some((parent_graph, leaf)) = resolve_mount_parent(graph, path) else {
1240                return Ok(()); // parent absent → leaf necessarily absent → idempotent
1241            };
1242            if !parent_graph.child_names().iter().any(|n| n == leaf) {
1243                return Ok(()); // already absent; idempotent
1244            }
1245            parent_graph
1246                .unmount(core, leaf)
1247                .map_err(|e| phase_err(format!("graph.unmount `{path}` failed: {e:?}")))?;
1248            Ok(())
1249        }
1250        _ => Ok(()),
1251    }
1252}
1253
1254/// /qa G1.2 + `clippy::too_many_lines` (2026-05-22): extracted from
1255/// `apply_wal_frame`. `node.set` / `node.invalidate` arms — these never
1256/// returned errors pre-/qa (each is best-effort against the current
1257/// graph state) so the signature returns `Result` purely for symmetry
1258/// with the spec-lifecycle arm.
1259fn apply_data_frame(core: &Core, graph: &Graph, kind: &str, change: &Value) {
1260    match kind {
1261        "node.set" => {
1262            let path = change.get("path").and_then(Value::as_str).unwrap_or("");
1263            if let Some(value) = change.get("value") {
1264                if !path.is_empty() && graph.try_resolve(path).is_some() {
1265                    let handle = core.binding_ptr().deserialize_value(value.clone());
1266                    graph.set(core, path, handle);
1267                }
1268            }
1269        }
1270        "node.invalidate" => {
1271            let path = change.get("path").and_then(Value::as_str).unwrap_or("");
1272            if !path.is_empty() {
1273                if let Some(id) = graph.try_resolve(path) {
1274                    graph.invalidate(core, id);
1275                }
1276            }
1277        }
1278        // node.versionBump — deferred (V0 versioning is internal)
1279        _ => {}
1280    }
1281}