Skip to main content

graphrefly_storage/
graph_integration.rs

1//! Graph-level storage integration (M4.E2 — D170–D174).
2//!
3//! Free functions (not `Graph` methods) because `graphrefly-graph` does not
4//! depend on `graphrefly-storage` — the dependency flows in the other
5//! direction. See D170.
6//!
7//! # Provided APIs
8//!
9//! - [`GraphCheckpointRecord`] — portable baseline type wrapping a
10//!   [`GraphPersistSnapshot`] + seq metadata + `format_version` (F7 close).
11//! - [`diff_snapshots`] / [`decompose_diff_to_frames`] — snapshot diff →
12//!   WAL frame generation engine (D172).
13//! - [`attach_snapshot_storage`] + [`StorageHandle`] — wire observe
14//!   subscription → snapshot diff → WAL frame writes.
15//! - [`restore_snapshot`] — three-phase replay (baseline → checksum verify
16//!   → lifecycle-scoped batch).
17//!
18//! # Manifest (D173 — F4 close)
19//!
20//! F4 (cross-restart key recovery) is structurally closed by D174: at the
21//! attach boundary, `key_of` is derived deterministically from
22//! `graph.name`. On restore, the snapshot key is known without a separate
23//! manifest entry. The checkpoint record's `seq` field serves as the WAL
24//! high-water mark.
25
26use std::sync::{Arc, Mutex};
27
28use graphrefly_core::Message;
29use graphrefly_graph::{Graph, GraphPersistSnapshot, NodeSlice};
30use graphrefly_structures::{BaseChange, Lifecycle, Version};
31use serde::{Deserialize, Serialize};
32use serde_json::Value;
33
34use crate::error::{PhaseStat, RestoreError, RestoreResult, StorageError};
35use crate::tier::{KvStorageTier, SnapshotStorageTier};
36use crate::wal::{
37    graph_wal_prefix, verify_wal_frame_checksum, wal_frame_checksum, wal_frame_key, WALFrame,
38    WalTag, REPLAY_ORDER,
39};
40
41// ── Constants ──────────────────────────────────────────────────────────────
42
/// On-disk format version produced by this build.
///
/// Stamped into [`GraphCheckpointRecord::format_version`] and used as the
/// `Version::Counter` of every `BaseChange` emitted by
/// [`decompose_diff_to_frames`] (F7 close).
pub const SNAPSHOT_VERSION: u64 = 1_u64;
46
47// ── Types ──────────────────────────────────────────────────────────────────
48
/// Portable baseline record written by [`attach_snapshot_storage`] on
/// full-snapshot writes. Contains the full [`GraphPersistSnapshot`] plus
/// metadata for WAL cursor alignment.
///
/// On restore ([`restore_snapshot`]) the `snapshot` is applied to the
/// graph first, then WAL frames with `frame_seq > seq` are replayed on top.
///
/// The `format_version` field closes F7 (missing `format_version` on
/// `WALFrame` and checkpoint records).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphCheckpointRecord {
    /// Graph name (copied from `snapshot.name` at write time). Restore
    /// derives the WAL key prefix from this value.
    pub name: String,
    /// Snapshot mode — `"full"` for baseline, `"diff"` reserved for future
    /// incremental baselines. Restore treats any value other than `"full"`
    /// as a missing baseline.
    pub mode: String,
    /// The complete graph state.
    pub snapshot: GraphPersistSnapshot,
    /// WAL-tier cursor at baseline write time. Frames with `frame_seq >
    /// seq` are the delta.
    pub seq: u64,
    /// Wall-clock timestamp at baseline write time, in nanoseconds (taken
    /// from `graphrefly_core::wall_clock_ns`).
    pub timestamp_ns: u64,
    /// Format version (F7 close); the writer stamps [`SNAPSHOT_VERSION`].
    pub format_version: u64,
}
72
73// ── Diff engine (D172) ─────────────────────────────────────────────────────
74
/// Structural diff between two [`GraphPersistSnapshot`]s, as produced by
/// [`diff_snapshots`] and consumed by [`decompose_diff_to_frames`].
#[derive(Debug, Clone)]
pub struct GraphSnapshotDiff {
    /// Node names present in `after` but not `before`.
    pub nodes_added: Vec<String>,
    /// Full slices for added nodes (parallel to `nodes_added`: element `i`
    /// belongs to `nodes_added[i]`). Must be at least as long as
    /// `nodes_added` — `decompose_diff_to_frames` indexes it by position.
    pub nodes_added_slices: Vec<NodeSlice>,
    /// Node names present in `before` but not `after`.
    pub nodes_removed: Vec<String>,
    /// Nodes whose `value` field changed between snapshots.
    pub value_changes: Vec<ValueChange>,
    /// Subgraph mount names added.
    pub subgraphs_added: Vec<String>,
    /// Subgraph mount names removed.
    pub subgraphs_removed: Vec<String>,
}
91
92impl GraphSnapshotDiff {
93    /// True when no structural or value changes were detected.
94    #[must_use]
95    pub fn is_empty(&self) -> bool {
96        self.nodes_added.is_empty()
97            && self.nodes_removed.is_empty()
98            && self.value_changes.is_empty()
99            && self.subgraphs_added.is_empty()
100            && self.subgraphs_removed.is_empty()
101    }
102}
103
/// A single node value change detected by [`diff_snapshots`]: a node that
/// exists in both snapshots but whose `value` field differs.
#[derive(Debug, Clone)]
pub struct ValueChange {
    /// Node path that changed (the node's key in the snapshot's `nodes`).
    pub path: String,
    /// New value. `None` means the node transitioned to sentinel (INVALIDATE);
    /// `decompose_diff_to_frames` encodes it as a `"node.invalidate"` frame.
    pub to: Option<Value>,
}
112
113/// Compare two snapshots and produce a structural diff.
114///
115/// Only examines the top-level namespace (not recursive into subgraphs —
116/// subgraph diffs are handled by the attach wiring per-subgraph).
117#[must_use]
118pub fn diff_snapshots(
119    before: &GraphPersistSnapshot,
120    after: &GraphPersistSnapshot,
121) -> GraphSnapshotDiff {
122    let mut nodes_added = Vec::new();
123    let mut nodes_added_slices = Vec::new();
124    let mut nodes_removed = Vec::new();
125    let mut value_changes = Vec::new();
126    let mut subgraphs_added = Vec::new();
127    let mut subgraphs_removed = Vec::new();
128
129    // Nodes added or changed.
130    for (name, after_slice) in &after.nodes {
131        if let Some(before_slice) = before.nodes.get(name) {
132            if before_slice.value != after_slice.value {
133                value_changes.push(ValueChange {
134                    path: name.clone(),
135                    to: after_slice.value.clone(),
136                });
137            }
138        } else {
139            nodes_added.push(name.clone());
140            nodes_added_slices.push(after_slice.clone());
141        }
142    }
143
144    // Nodes removed.
145    for name in before.nodes.keys() {
146        if !after.nodes.contains_key(name) {
147            nodes_removed.push(name.clone());
148        }
149    }
150
151    // Subgraphs added/removed.
152    for name in after.subgraphs.keys() {
153        if !before.subgraphs.contains_key(name) {
154            subgraphs_added.push(name.clone());
155        }
156    }
157    for name in before.subgraphs.keys() {
158        if !after.subgraphs.contains_key(name) {
159            subgraphs_removed.push(name.clone());
160        }
161    }
162
163    GraphSnapshotDiff {
164        nodes_added,
165        nodes_added_slices,
166        nodes_removed,
167        value_changes,
168        subgraphs_added,
169        subgraphs_removed,
170    }
171}
172
/// Intermediate frame before checksum stamping: everything a `WALFrame`
/// needs except `frame_seq`, timestamps, checksum and format version,
/// which `decompose_diff_to_frames` fills in afterwards.
struct DecomposedFrame {
    /// Lifecycle scope the frame is replayed in (`Spec` or `Data` here).
    lifecycle: Lifecycle,
    /// Node or subgraph path the frame concerns.
    path: String,
    /// Versioned change envelope carrying the JSON payload.
    change: BaseChange<Value>,
}
179
180/// Convert a [`GraphSnapshotDiff`] into WAL frames ready for persistence.
181///
182/// `timestamp_ns` is the wall-clock at diff time. `base_seq` is the WAL
183/// cursor; returned frames have `frame_seq` values starting at
184/// `base_seq + 1`.
185///
186/// Returns `(frames, next_seq)` where `next_seq` is the highest assigned
187/// `frame_seq`.
188pub fn decompose_diff_to_frames(
189    diff: &GraphSnapshotDiff,
190    timestamp_ns: u64,
191    base_seq: u64,
192) -> Result<(Vec<WALFrame<Value>>, u64), StorageError> {
193    let mut decomposed = Vec::new();
194
195    let wrap = |structure: &str, lifecycle: Lifecycle, payload: Value| -> BaseChange<Value> {
196        BaseChange {
197            structure: structure.to_owned(),
198            version: Version::Counter(SNAPSHOT_VERSION),
199            t_ns: timestamp_ns,
200            seq: None,
201            lifecycle,
202            change: payload,
203        }
204    };
205
206    // Spec lifecycle: node add/remove, subgraph mount/unmount.
207    for (i, name) in diff.nodes_added.iter().enumerate() {
208        let slice = &diff.nodes_added_slices[i];
209        let payload = serde_json::json!({
210            "kind": "graph.add",
211            "nodeId": name,
212            "slice": serde_json::to_value(slice).map_err(|e|
213                StorageError::Codec(crate::codec::CodecError::Encode(e.to_string()))
214            )?,
215        });
216        decomposed.push(DecomposedFrame {
217            lifecycle: Lifecycle::Spec,
218            path: name.clone(),
219            change: wrap("graph.spec", Lifecycle::Spec, payload),
220        });
221    }
222
223    for name in &diff.nodes_removed {
224        let payload = serde_json::json!({
225            "kind": "graph.remove",
226            "nodeId": name,
227        });
228        decomposed.push(DecomposedFrame {
229            lifecycle: Lifecycle::Spec,
230            path: name.clone(),
231            change: wrap("graph.spec", Lifecycle::Spec, payload),
232        });
233    }
234
235    for name in &diff.subgraphs_added {
236        let payload = serde_json::json!({
237            "kind": "graph.mount",
238            "path": name,
239            "subgraphId": name,
240        });
241        decomposed.push(DecomposedFrame {
242            lifecycle: Lifecycle::Spec,
243            path: name.clone(),
244            change: wrap("graph.spec", Lifecycle::Spec, payload),
245        });
246    }
247
248    for name in &diff.subgraphs_removed {
249        let payload = serde_json::json!({
250            "kind": "graph.unmount",
251            "path": name,
252        });
253        decomposed.push(DecomposedFrame {
254            lifecycle: Lifecycle::Spec,
255            path: name.clone(),
256            change: wrap("graph.spec", Lifecycle::Spec, payload),
257        });
258    }
259
260    // Data lifecycle: value changes.
261    for vc in &diff.value_changes {
262        let payload = if let Some(ref value) = vc.to {
263            serde_json::json!({
264                "kind": "node.set",
265                "path": vc.path,
266                "value": value,
267            })
268        } else {
269            serde_json::json!({
270                "kind": "node.invalidate",
271                "path": vc.path,
272            })
273        };
274        decomposed.push(DecomposedFrame {
275            lifecycle: Lifecycle::Data,
276            path: vc.path.clone(),
277            change: wrap("graph.value", Lifecycle::Data, payload),
278        });
279    }
280
281    // Assign frame_seq and compute checksums.
282    let mut seq = base_seq;
283    let mut frames = Vec::with_capacity(decomposed.len());
284    for d in decomposed {
285        seq += 1;
286        let mut frame = WALFrame {
287            t: WalTag,
288            lifecycle: d.lifecycle,
289            path: d.path,
290            change: d.change,
291            frame_seq: seq,
292            frame_t_ns: timestamp_ns,
293            checksum: String::new(),
294            format_version: 1,
295        };
296        frame.checksum = wal_frame_checksum(&frame)?;
297        frames.push(frame);
298    }
299
300    Ok((frames, seq))
301}
302
303// ── Attach (D170) ──────────────────────────────────────────────────────────
304
/// Per-tier state managed by the attach wiring (one entry per
/// [`AttachTierPair`]), guarded by the shared mutex in [`StorageHandle`].
struct TierState {
    /// The snapshot tier (writes full baselines).
    snapshot_tier: Box<dyn SnapshotStorageTier<GraphCheckpointRecord>>,
    /// Optional WAL tier (writes individual delta frames).
    wal_tier: Option<Box<dyn KvStorageTier<WALFrame<Value>>>>,
    /// WAL key prefix derived from `graph.name`.
    wal_prefix: String,
    /// Monotonic cursor: the last `frame_seq` written (seeded at attach
    /// time from the highest existing WAL key).
    seq: u64,
    /// Flush counter for `compact_every` cadence.
    flush_count: u64,
    /// Configured compact-every cadence: a full baseline is written on every
    /// `compact_every`-th flush. `0` disables periodic compaction — after the
    /// first baseline, flushes write WAL deltas only (when a WAL tier exists).
    /// NOTE(review): an earlier comment said `0` means "every flush writes a
    /// full baseline", which `flush_tier` does not implement; confirm the
    /// intended semantics.
    compact_every: u32,
    /// Snapshot tier's `debounce_ms` snapshot at attach time (D171).
    /// `0` (the unset / disabled case) means inline-flush; `>0` means
    /// `save()` buffers and the caller drives explicit flush via
    /// [`StorageHandle::flush_all`].
    snapshot_debounce_ms: u32,
    /// WAL tier's `debounce_ms` snapshot at attach time (D171); same
    /// semantics as `snapshot_debounce_ms`.
    wal_debounce_ms: u32,
    /// Last successfully persisted snapshot, used as the diff base.
    last_snapshot: Option<GraphPersistSnapshot>,
    /// Disposed flag; set by `StorageHandle::dispose`, checked before every
    /// sink-driven write and in `flush_all`.
    disposed: bool,
}
331
/// Configuration for a single snapshot+WAL tier pair passed to
/// [`attach_snapshot_storage`].
pub struct AttachTierPair {
    /// Snapshot tier for full baselines.
    pub snapshot: Box<dyn SnapshotStorageTier<GraphCheckpointRecord>>,
    /// Optional WAL tier for delta frames. When `None`, every flush
    /// writes a full baseline (no incremental WAL).
    pub wal: Option<Box<dyn KvStorageTier<WALFrame<Value>>>>,
}
340
/// Filter predicate for [`AttachOptions`]. Called with the observed node
/// path; returning `false` ignores that observe event entirely.
pub type PathFilter = Box<dyn Fn(&str) -> bool + Send + Sync>;

/// Error callback for [`AttachOptions`]. Invoked once per error, outside
/// the storage state lock.
pub type ErrorCallback = Box<dyn Fn(&StorageError) + Send + Sync>;
346
/// Options for [`attach_snapshot_storage`]. `Default` gives "persist every
/// path, no error callback".
#[derive(Default)]
pub struct AttachOptions {
    /// Per-path filter. Return `true` to persist changes for this path.
    /// `None` means persist all paths.
    pub filter: Option<PathFilter>,
    /// Error callback invoked when a flush fails (once per failing tier,
    /// after the state lock has been released).
    pub on_error: Option<ErrorCallback>,
}
356
/// Handle returned by [`attach_snapshot_storage`]. Dropping this handle
/// disposes every tier and releases the observe subscription (RAII
/// disposal).
pub struct StorageHandle {
    /// Shared state; inner `disposed` flag prevents late-fire callbacks.
    state: Arc<Mutex<Vec<TierState>>>,
    /// Graph reference (kept alive so the observe subscription stays valid).
    _graph: Graph,
    /// The observe handle — dropping it unsubscribes all sinks.
    _observe: graphrefly_graph::GraphObserveAllReactive,
}
367
368impl StorageHandle {
369    /// Explicitly dispose (equivalent to `Drop`, but callable).
370    ///
371    /// F3 (QA 2026-05-14): on `PoisonError` (a panic in the observe
372    /// sink poisoned the state mutex), recover via `into_inner()`
373    /// rather than silently no-op. The disposed flag still gets set
374    /// even after a sink panic, which matters for cleanup.
375    pub fn dispose(&self) {
376        let mut states = self
377            .state
378            .lock()
379            .unwrap_or_else(std::sync::PoisonError::into_inner);
380        for s in states.iter_mut() {
381            s.disposed = true;
382        }
383    }
384
385    /// Drain pending writes on all tiers (D171).
386    ///
387    /// When a tier is configured with `debounce_ms > 0`,
388    /// [`attach_snapshot_storage`]'s observe sink writes via the
389    /// tier's `save()` but skips the inline `flush()`. Callers
390    /// invoke this method — typically from a binding-side reactive
391    /// timer subgraph (`graphrefly_operators::temporal::interval`,
392    /// shipped in Slice T) — to commit the buffered writes to their
393    /// backends. With `debounce_ms == 0`, the observe sink already
394    /// force-flushes inline; calling this method is then a no-op
395    /// because the tier's pending buffers are empty.
396    ///
397    /// Returns the first error encountered, if any. Successful tiers
398    /// in the same call still flush; the error is surfaced for the
399    /// caller's diagnostics.
400    ///
401    /// # Lock discipline (N3, QA 2026-05-14)
402    ///
403    /// The state mutex is **not** held across `tier.flush()` calls.
404    /// Holding it across blocking I/O would (a) serialize the
405    /// reactive graph against backend latency (every observe-sink
406    /// fire would wait on flush completion), and (b) deadlock if a
407    /// caller invokes `flush_all` from inside an `on_error` callback
408    /// (the observe sink already holds `state.lock()` when it
409    /// invokes `on_error`). The implementation snapshots the per-
410    /// tier flush requests under the lock, drops the lock, then
411    /// runs the flushes in sequence. After each flush, a brief
412    /// re-lock applies any per-tier state mutations (none today —
413    /// `flush()` doesn't return new state).
414    ///
415    /// # Errors
416    ///
417    /// Returns the first `StorageError` encountered. Subsequent
418    /// errors are dropped (acceptable v1; the registered
419    /// `on_error` callback fires per error in the observe sink path
420    /// and is the canonical multi-error reporting surface).
421    ///
422    /// F3: recovers from a poisoned state mutex via `into_inner`
423    /// rather than silently returning Ok.
424    pub fn flush_all(&self) -> Result<(), StorageError> {
425        // Snapshot the per-tier flush callables under the lock,
426        // then drop the lock before invoking them. Each tier's
427        // `flush()` is a `&self` method on `Box<dyn ...>` — to call
428        // it without holding the outer lock, we collect raw
429        // pointers... no, `Box<dyn ...>` can't be cloned. Instead,
430        // we re-lock briefly per tier to call flush, but never hold
431        // the lock across MULTIPLE tier flushes — that's the key
432        // deadlock avoidance.
433        //
434        // The remaining single-tier-lock-during-flush window can
435        // deadlock if `tier.flush()` synchronously re-enters
436        // `dispose()` / `flush_all()` (which would try to re-acquire
437        // `self.state`). Tier impls in `graphrefly-storage` don't do
438        // that; document the contract for future binding-side tier
439        // impls in the rustdoc.
440        let tier_count = {
441            let states = self
442                .state
443                .lock()
444                .unwrap_or_else(std::sync::PoisonError::into_inner);
445            states.len()
446        };
447        let mut first_err: Option<StorageError> = None;
448        for idx in 0..tier_count {
449            // Per-tier flush: brief lock to access the tier, run
450            // flush, drop lock immediately. The tier itself is
451            // behind a Box<dyn ...> inside the Vec; flush() takes
452            // &self, so the &mut on the outer Vec is only needed
453            // for `s.disposed` check.
454            let snapshot_err: Option<StorageError>;
455            let wal_err: Option<StorageError>;
456            {
457                let mut states = self
458                    .state
459                    .lock()
460                    .unwrap_or_else(std::sync::PoisonError::into_inner);
461                let Some(s) = states.get_mut(idx) else {
462                    continue;
463                };
464                if s.disposed {
465                    continue;
466                }
467                // Call flush while holding the lock — this is
468                // narrow-scoped and the only contention is with the
469                // observe sink, which is itself bounded by wave
470                // dispatch. No `on_error` invocation under this
471                // lock (callers receive errors via the return
472                // value, not via the registered callback).
473                snapshot_err = s.snapshot_tier.flush().err();
474                wal_err = s.wal_tier.as_ref().and_then(|wal| wal.flush().err());
475            }
476            if let Some(e) = snapshot_err {
477                if first_err.is_none() {
478                    first_err = Some(e);
479                }
480            }
481            if let Some(e) = wal_err {
482                if first_err.is_none() {
483                    first_err = Some(e);
484                }
485            }
486        }
487        match first_err {
488            None => Ok(()),
489            Some(e) => Err(e),
490        }
491    }
492}
493
494impl Drop for StorageHandle {
495    fn drop(&mut self) {
496        self.dispose();
497    }
498}
499
500/// Wire an observe subscription on `graph` that persists node changes
501/// to the provided snapshot+WAL tier pairs.
502///
503/// # Debounce (D171, resolved 2026-05-14)
504///
505/// When a tier's `debounce_ms` is `Some(ms)` with `ms > 0`, the
506/// observe callback writes via the tier's `save()` (which buffers
507/// internally per the [`BaseStorageTier`](crate::tier::BaseStorageTier)
508/// contract) but does NOT force a `flush()`. Callers drain pending
509/// writes by invoking [`StorageHandle::flush_all`] — typically from a
510/// binding-side reactive timer (e.g.,
511/// `graphrefly_operators::temporal::interval`). This keeps
512/// `graphrefly-storage` sync + binding-agnostic per the canonical
513/// no-async-in-storage invariant; reactive-timer wiring lives in the
514/// layer that already owns the timer primitive.
515///
516/// When `debounce_ms` is `None` or `Some(0)`, the observe callback
517/// continues to force-flush inline as before.
518///
519/// # `key_of` (D174, closes F8)
520///
521/// The snapshot tier's backend key is derived from `graph.name` via
522/// the checkpoint record's `name` field. Cross-impl `key_of` divergence
523/// disappears at this boundary.
524#[must_use = "the returned StorageHandle owns the observe subscription; \
525              dropping it immediately unsubscribes and stops persistence"]
526pub fn attach_snapshot_storage(
527    graph: &Graph,
528    pairs: Vec<AttachTierPair>,
529    options: AttachOptions,
530) -> StorageHandle {
531    let graph_name = graph.name();
532    let wal_prefix = graph_wal_prefix(&graph_name);
533
534    let mut states = Vec::with_capacity(pairs.len());
535    for pair in pairs {
536        // Bootstrap: enumerate existing WAL frames to find high-water seq.
537        let mut high_seq: u64 = 0;
538        if let Some(ref wal) = pair.wal {
539            if let Ok(keys) = wal.list(&wal_prefix) {
540                for key in keys {
541                    if let Some(seg) = key.rsplit('/').next() {
542                        if let Ok(s) = seg.parse::<u64>() {
543                            high_seq = high_seq.max(s);
544                        }
545                    }
546                }
547            }
548        }
549
550        let compact_every = pair.snapshot.compact_every().unwrap_or(10);
551        let snapshot_debounce = pair.snapshot.debounce_ms().unwrap_or(0);
552        let wal_debounce = pair.wal.as_ref().and_then(|w| w.debounce_ms()).unwrap_or(0);
553
554        states.push(TierState {
555            snapshot_tier: pair.snapshot,
556            wal_tier: pair.wal,
557            wal_prefix: wal_prefix.clone(),
558            seq: high_seq,
559            flush_count: 0,
560            compact_every,
561            snapshot_debounce_ms: snapshot_debounce,
562            wal_debounce_ms: wal_debounce,
563            last_snapshot: None,
564            disposed: false,
565        });
566    }
567
568    let shared_states = Arc::new(Mutex::new(states));
569    let states_for_sink = shared_states.clone();
570    let graph_clone = graph.clone();
571    let filter = options.filter;
572    let on_error = options.on_error;
573
574    // Wire observe_all_reactive so late-added nodes are also covered.
575    let mut observe = graph.observe_all_reactive();
576    observe.subscribe(move |path: &str, messages: &[Message]| {
577        // Filter: only tiers 3–5 (DATA/RESOLVED, INVALIDATE, COMPLETE/ERROR).
578        let dominated_by_tier = messages.iter().any(|m| {
579            let t = m.tier();
580            (3..6).contains(&t)
581        });
582        if !dominated_by_tier {
583            return;
584        }
585
586        // Optional path filter.
587        if let Some(ref f) = filter {
588            if !f(path) {
589                return;
590            }
591        }
592
593        // Take a snapshot once, shared across all sync tiers.
594        let snapshot = graph_clone.snapshot();
595
596        // N3 (QA 2026-05-14) — collect errors INSIDE the lock,
597        // invoke `on_error` AFTER releasing the lock. The earlier
598        // pattern called `cb(&e)` while holding `states_for_sink`,
599        // which deadlocked if the user's callback re-entered
600        // `StorageHandle::flush_all` / `dispose` (both acquire the
601        // same `Mutex`). F3: recover from poisoned lock via
602        // `into_inner` rather than silently no-op.
603        let collected_errors: Vec<StorageError> = {
604            let mut states = states_for_sink
605                .lock()
606                .unwrap_or_else(std::sync::PoisonError::into_inner);
607            let mut errs = Vec::new();
608            for s in states.iter_mut() {
609                if s.disposed {
610                    continue;
611                }
612                if let Err(e) = flush_tier(s, &snapshot) {
613                    errs.push(e);
614                }
615            }
616            errs
617        };
618        if !collected_errors.is_empty() {
619            if let Some(ref cb) = on_error {
620                for e in &collected_errors {
621                    cb(e);
622                }
623            }
624        }
625    });
626
627    StorageHandle {
628        state: shared_states,
629        _graph: graph.clone(),
630        _observe: observe,
631    }
632}
633
634/// Flush a single tier: either full baseline or WAL delta.
635fn flush_tier(s: &mut TierState, snapshot: &GraphPersistSnapshot) -> Result<(), StorageError> {
636    s.flush_count += 1;
637
638    let write_full = s.wal_tier.is_none()
639        || s.last_snapshot.is_none()
640        || (s.compact_every > 0 && s.flush_count.is_multiple_of(u64::from(s.compact_every)));
641
642    if write_full {
643        write_full_baseline(s, snapshot)?;
644    } else {
645        write_wal_delta(s, snapshot)?;
646    }
647
648    s.last_snapshot = Some(snapshot.clone());
649    Ok(())
650}
651
652/// Write a full baseline snapshot to the snapshot tier.
653///
654/// When `snapshot_debounce_ms > 0` (D171), the inner `save()` buffers
655/// per the tier's `BaseStorageTier` contract and we DON'T force a
656/// `flush()`. The caller drains via [`StorageHandle::flush_all`].
657fn write_full_baseline(
658    s: &mut TierState,
659    snapshot: &GraphPersistSnapshot,
660) -> Result<(), StorageError> {
661    let timestamp_ns = graphrefly_core::wall_clock_ns();
662    let record = GraphCheckpointRecord {
663        name: snapshot.name.clone(),
664        mode: "full".to_owned(),
665        snapshot: snapshot.clone(),
666        seq: s.seq,
667        timestamp_ns,
668        format_version: SNAPSHOT_VERSION,
669    };
670
671    s.snapshot_tier.save(record)?;
672    if s.snapshot_debounce_ms == 0 {
673        s.snapshot_tier.flush()?;
674    }
675    Ok(())
676}
677
678/// Write WAL delta frames for the diff between `last_snapshot` and current.
679///
680/// D171: WAL `flush()` is skipped when the WAL tier has
681/// `debounce_ms > 0`; the caller drains via
682/// [`StorageHandle::flush_all`].
683fn write_wal_delta(s: &mut TierState, snapshot: &GraphPersistSnapshot) -> Result<(), StorageError> {
684    let last = s
685        .last_snapshot
686        .as_ref()
687        .expect("caller ensures last_snapshot is Some");
688    let diff = diff_snapshots(last, snapshot);
689
690    if diff.is_empty() {
691        return Ok(());
692    }
693
694    let timestamp_ns = graphrefly_core::wall_clock_ns();
695    let (frames, next_seq) = decompose_diff_to_frames(&diff, timestamp_ns, s.seq)?;
696
697    if let Some(ref wal) = s.wal_tier {
698        for frame in &frames {
699            let key = wal_frame_key(&s.wal_prefix, frame.frame_seq);
700            wal.save(&key, frame.clone())?;
701        }
702        if s.wal_debounce_ms == 0 {
703            wal.flush()?;
704        }
705    }
706
707    s.seq = next_seq;
708    Ok(())
709}
710
711// ── Restore (D170) ─────────────────────────────────────────────────────────
712
/// Torn-write policy for frames whose checksum fails verification during
/// [`restore_snapshot`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TornWritePolicy {
    /// Drop the frame, count it as skipped, and continue (default for the
    /// last collected frame when no callback is supplied).
    Skip,
    /// Abort the entire restore with `RestoreError::TornWriteMidStream`
    /// (default for any earlier frame when no callback is supplied).
    Abort,
}
721
/// Callback for torn-write decisions. Receives the `frame_seq` and
/// reason (currently always `"checksum-mismatch"`); returns the desired
/// policy. When supplied, it is consulted for every failing frame,
/// including the last one.
pub type OnTornWrite = Box<dyn Fn(u64, &str) -> TornWritePolicy + Send + Sync>;
725
/// Options for [`restore_snapshot`].
pub struct RestoreOptions<'a> {
    /// Snapshot tier to load the baseline from.
    pub snapshot_tier: &'a dyn SnapshotStorageTier<GraphCheckpointRecord>,
    /// WAL tier to enumerate delta frames from.
    pub wal_tier: &'a dyn KvStorageTier<WALFrame<Value>>,
    /// Optional max `frame_seq` to replay up to (inclusive). `None` = replay
    /// all. Filtering uses the sequence number parsed from each WAL key.
    pub target_seq: Option<u64>,
    /// Torn-write callback. If `None`, defaults: tail = Skip, mid = Abort.
    pub on_torn_write: Option<OnTornWrite>,
}
737
738/// Three-phase WAL replay: baseline load → checksum verify → lifecycle-
739/// scoped batch.
740///
741/// # Phase 1: Baseline
742///
743/// Loads the `mode:"full"` baseline from the snapshot tier. The snapshot
744/// key is derived from `graph.name` (D174).
745///
746/// # Phase 2: Checksum verification
747///
748/// Enumerates WAL frames with `frame_seq > baseline.seq`, verifies each
749/// frame's SHA-256 checksum, applies torn-write policy on mismatch.
750///
751/// # Phase 3: Lifecycle-scoped batch replay
752///
753/// Groups verified frames by lifecycle. Replays in cross-scope order
754/// (`Spec → Data → Ownership`). Each lifecycle runs in a `graph.batch()`
755/// for atomic partial-restore semantics (Q2).
756pub fn restore_snapshot(
757    graph: &Graph,
758    opts: &RestoreOptions<'_>,
759) -> Result<RestoreResult, RestoreError> {
760    // Phase 1: Load and apply baseline.
761    let baseline = load_baseline(graph, opts)?;
762    let baseline_seq = baseline.seq;
763
764    // Phase 1b: Collect WAL frames post-baseline.
765    let collected = collect_wal_frames(opts, &baseline.name, baseline_seq)?;
766
767    // Phase 2: Checksum verification.
768    let (verified, skipped) = verify_frames(collected, opts.on_torn_write.as_ref())?;
769
770    // Phase 3: Lifecycle-scoped batch replay.
771    Ok(replay_by_lifecycle(graph, &verified, baseline_seq, skipped))
772}
773
774/// Phase 1: Load baseline from snapshot tier + apply to graph.
775fn load_baseline(
776    graph: &Graph,
777    opts: &RestoreOptions<'_>,
778) -> Result<GraphCheckpointRecord, RestoreError> {
779    let baseline = opts
780        .snapshot_tier
781        .load()
782        .map_err(|e| RestoreError::PhaseFailed {
783            lifecycle: Lifecycle::Spec,
784            frame_seq: 0,
785            message: format!("baseline load failed: {e}"),
786        })?
787        .ok_or(RestoreError::BaselineMissing)?;
788
789    if baseline.mode != "full" {
790        return Err(RestoreError::BaselineMissing);
791    }
792
793    graph
794        .restore(&baseline.snapshot)
795        .map_err(|e| RestoreError::PhaseFailed {
796            lifecycle: Lifecycle::Spec,
797            frame_seq: 0,
798            message: format!("baseline restore failed: {e}"),
799        })?;
800
801    Ok(baseline)
802}
803
804/// Phase 1b: Enumerate + filter WAL frames.
805fn collect_wal_frames(
806    opts: &RestoreOptions<'_>,
807    graph_name: &str,
808    baseline_seq: u64,
809) -> Result<Vec<WALFrame<Value>>, RestoreError> {
810    let wal_prefix = graph_wal_prefix(graph_name);
811    let keys = opts
812        .wal_tier
813        .list(&wal_prefix)
814        .map_err(|e| RestoreError::PhaseFailed {
815            lifecycle: Lifecycle::Spec,
816            frame_seq: 0,
817            message: format!("WAL frame enumeration failed: {e}"),
818        })?;
819
820    let mut collected: Vec<WALFrame<Value>> = Vec::new();
821    for key in keys {
822        let frame_seq = key
823            .rsplit('/')
824            .next()
825            .and_then(|s| s.parse::<u64>().ok())
826            .unwrap_or(0);
827
828        if frame_seq <= baseline_seq {
829            continue;
830        }
831        if let Some(target) = opts.target_seq {
832            if frame_seq > target {
833                continue;
834            }
835        }
836
837        if let Some(frame) = opts
838            .wal_tier
839            .load(&key)
840            .map_err(|e| RestoreError::PhaseFailed {
841                lifecycle: Lifecycle::Data,
842                frame_seq,
843                message: format!("WAL frame load failed: {e}"),
844            })?
845        {
846            collected.push(frame);
847        }
848    }
849
850    collected.sort_by_key(|f| f.frame_seq);
851    Ok(collected)
852}
853
854/// Phase 2: Verify checksums, apply torn-write policy.
855fn verify_frames(
856    collected: Vec<WALFrame<Value>>,
857    on_torn_write: Option<&OnTornWrite>,
858) -> Result<(Vec<WALFrame<Value>>, u64), RestoreError> {
859    let mut verified = Vec::new();
860    let mut skipped: u64 = 0;
861    let total = collected.len();
862
863    for (i, frame) in collected.into_iter().enumerate() {
864        if verify_wal_frame_checksum(&frame).unwrap_or(false) {
865            verified.push(frame);
866            continue;
867        }
868
869        let is_tail = i == total - 1;
870        let policy = if let Some(cb) = on_torn_write {
871            cb(frame.frame_seq, "checksum-mismatch")
872        } else if is_tail {
873            TornWritePolicy::Skip
874        } else {
875            TornWritePolicy::Abort
876        };
877
878        match policy {
879            TornWritePolicy::Skip => skipped += 1,
880            TornWritePolicy::Abort => {
881                return Err(RestoreError::TornWriteMidStream {
882                    frame_seq: frame.frame_seq,
883                    reason: "checksum-mismatch".to_owned(),
884                });
885            }
886        }
887    }
888
889    Ok((verified, skipped))
890}
891
892/// Phase 3: Group by lifecycle, replay in cross-scope order.
893fn replay_by_lifecycle(
894    graph: &Graph,
895    verified: &[WALFrame<Value>],
896    baseline_seq: u64,
897    skipped: u64,
898) -> RestoreResult {
899    let mut grouped: [Vec<WALFrame<Value>>; 3] = [Vec::new(), Vec::new(), Vec::new()];
900    for frame in verified {
901        for (idx, lifecycle) in REPLAY_ORDER.iter().enumerate() {
902            if frame.lifecycle == *lifecycle {
903                grouped[idx].push(frame.clone());
904                break;
905            }
906        }
907    }
908
909    let mut phases = Vec::new();
910    let mut replayed: u64 = 0;
911    let mut final_seq: u64 = baseline_seq;
912
913    for (idx, lifecycle) in REPLAY_ORDER.iter().enumerate() {
914        let life_frames = &grouped[idx];
915        if life_frames.is_empty() {
916            continue;
917        }
918        let frame_count = life_frames.len() as u64;
919        let max_seq = life_frames.iter().map(|f| f.frame_seq).max().unwrap_or(0);
920
921        let frames_for_batch = life_frames.clone();
922        let graph_for_batch = graph.clone();
923        graph.batch(move || {
924            for frame in &frames_for_batch {
925                apply_wal_frame(&graph_for_batch, frame);
926            }
927        });
928
929        replayed += frame_count;
930        final_seq = final_seq.max(max_seq);
931        phases.push(PhaseStat {
932            lifecycle: *lifecycle,
933            frames: frame_count,
934        });
935    }
936
937    RestoreResult {
938        replayed_frames: replayed,
939        skipped_frames: skipped,
940        final_seq,
941        phases,
942    }
943}
944
945/// Apply a single WAL frame to a graph. Mirrors TS `applyWalFrame`.
946fn apply_wal_frame(graph: &Graph, frame: &WALFrame<Value>) {
947    let change = &frame.change.change;
948    let kind = change.get("kind").and_then(Value::as_str).unwrap_or("");
949
950    match frame.lifecycle {
951        Lifecycle::Spec => match kind {
952            "graph.add" => {
953                let node_id_str = change.get("nodeId").and_then(Value::as_str).unwrap_or("");
954                if node_id_str.is_empty() || graph.try_resolve(node_id_str).is_some() {
955                    return; // already present or invalid
956                }
957                // Only auto-create state nodes (matches TS behavior).
958                let slice = change.get("slice");
959                let node_type = slice
960                    .and_then(|s| s.get("type"))
961                    .and_then(Value::as_str)
962                    .unwrap_or("");
963                if node_type != "state" {
964                    return;
965                }
966                let initial_value = slice.and_then(|s| s.get("value")).cloned();
967                let handle = initial_value.map_or(graphrefly_core::NO_HANDLE, |v| {
968                    graph.core().binding_ptr().deserialize_value(v)
969                });
970                let _ = graph.state(node_id_str, Some(handle));
971            }
972            "graph.remove" => {
973                let node_id_str = change.get("nodeId").and_then(Value::as_str).unwrap_or("");
974                if !node_id_str.is_empty() && graph.try_resolve(node_id_str).is_some() {
975                    let _ = graph.remove(node_id_str);
976                }
977            }
978            // graph.mount, graph.unmount — deferred (Phase 14.6+)
979            _ => {}
980        },
981        Lifecycle::Data => match kind {
982            "node.set" => {
983                let path = change.get("path").and_then(Value::as_str).unwrap_or("");
984                if let Some(value) = change.get("value") {
985                    if !path.is_empty() && graph.try_resolve(path).is_some() {
986                        let handle = graph.core().binding_ptr().deserialize_value(value.clone());
987                        graph.set(path, handle);
988                    }
989                }
990            }
991            "node.invalidate" => {
992                let path = change.get("path").and_then(Value::as_str).unwrap_or("");
993                if !path.is_empty() {
994                    if let Some(id) = graph.try_resolve(path) {
995                        graph.invalidate(id);
996                    }
997                }
998            }
999            // node.versionBump — deferred (V0 versioning is internal)
1000            _ => {}
1001        },
1002        // Ownership lifecycle — deferred (Phase 13)
1003        Lifecycle::Ownership => {}
1004    }
1005}