graphrefly_storage/graph_integration.rs
1//! Graph-level storage integration (M4.E2 — D170–D174).
2//!
3//! Free functions (not `Graph` methods) because `graphrefly-graph` does not
4//! depend on `graphrefly-storage` — the dependency flows in the other
5//! direction. See D170.
6//!
7//! # Provided APIs
8//!
9//! - [`GraphCheckpointRecord`] — portable baseline type wrapping a
10//! [`GraphPersistSnapshot`] + seq metadata + `format_version` (F7 close).
11//! - [`diff_snapshots`] / [`decompose_diff_to_frames`] — snapshot diff →
12//! WAL frame generation engine (D172).
13//! - [`attach_snapshot_storage`] + [`StorageHandle`] — wire observe
14//! subscription → snapshot diff → WAL frame writes.
15//! - [`restore_snapshot`] — three-phase replay (baseline → checksum verify
16//! → lifecycle-scoped batch).
17//!
18//! # Manifest (D173 — F4 close)
19//!
20//! F4 (cross-restart key recovery) is structurally closed by D174: at the
21//! attach boundary, `key_of` is derived deterministically from
22//! `graph.name`. On restore, the snapshot key is known without a separate
23//! manifest entry. The checkpoint record's `seq` field serves as the WAL
24//! high-water mark.
25
26use std::sync::{Arc, Mutex};
27
28use graphrefly_core::Message;
29use graphrefly_graph::{Graph, GraphPersistSnapshot, NodeSlice};
30use graphrefly_structures::{BaseChange, Lifecycle, Version};
31use serde::{Deserialize, Serialize};
32use serde_json::Value;
33
34use crate::error::{PhaseStat, RestoreError, RestoreResult, StorageError};
35use crate::tier::{KvStorageTier, SnapshotStorageTier};
36use crate::wal::{
37 graph_wal_prefix, verify_wal_frame_checksum, wal_frame_checksum, wal_frame_key, WALFrame,
38 WalTag, REPLAY_ORDER,
39};
40
41// ── Constants ──────────────────────────────────────────────────────────────
42
/// Current snapshot format version.
///
/// Written into [`GraphCheckpointRecord::format_version`] by the baseline
/// writer and into `BaseChange.version` (as `Version::Counter`) for every
/// frame produced by [`decompose_diff_to_frames`] (F7 close).
///
/// NOTE(review): `WALFrame.format_version` is stamped with a literal `1` in
/// [`decompose_diff_to_frames`] rather than with this constant — confirm
/// whether the two versions are meant to evolve independently.
pub const SNAPSHOT_VERSION: u64 = 1;
46
47// ── Types ──────────────────────────────────────────────────────────────────
48
/// Portable baseline record written by [`attach_snapshot_storage`] on
/// full-snapshot writes. Contains the full [`GraphPersistSnapshot`] plus
/// metadata for WAL cursor alignment.
///
/// [`restore_snapshot`] loads this record as its baseline and treats any
/// record whose `mode` is not `"full"` as a missing baseline.
///
/// The `format_version` field closes F7 (missing `format_version` on
/// `WALFrame` and checkpoint records).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphCheckpointRecord {
    /// Graph name. The writer copies it from `snapshot.name`; restore
    /// derives the WAL key prefix from it.
    pub name: String,
    /// Snapshot mode — `"full"` for baseline (the only value this module
    /// writes), `"diff"` reserved for future incremental baselines.
    pub mode: String,
    /// The complete graph state.
    pub snapshot: GraphPersistSnapshot,
    /// WAL-tier cursor at baseline write time. Frames with `frame_seq >
    /// seq` are the delta that restore replays on top of `snapshot`.
    pub seq: u64,
    /// Wall-clock timestamp (nanoseconds) at baseline write time.
    pub timestamp_ns: u64,
    /// Format version (F7 close). The writer sets it to
    /// [`SNAPSHOT_VERSION`].
    ///
    /// NOTE(review): the restore path in this file does not inspect this
    /// field — confirm whether a version gate belongs there.
    pub format_version: u64,
}
72
73// ── Diff engine (D172) ─────────────────────────────────────────────────────
74
/// Structural diff between two [`GraphPersistSnapshot`]s, as produced by
/// [`diff_snapshots`] and consumed by [`decompose_diff_to_frames`].
///
/// `nodes_added` and `nodes_added_slices` are parallel vectors: entry `i`
/// of one describes the same node as entry `i` of the other.
/// [`decompose_diff_to_frames`] looks slices up by that index, so callers
/// that build a diff by hand must keep both vectors the same length.
#[derive(Debug, Clone)]
pub struct GraphSnapshotDiff {
    /// Node names present in `after` but not `before`.
    pub nodes_added: Vec<String>,
    /// Full slices for added nodes (parallel to `nodes_added`).
    pub nodes_added_slices: Vec<NodeSlice>,
    /// Node names present in `before` but not `after`.
    pub nodes_removed: Vec<String>,
    /// Nodes present in both snapshots whose `value` field differs.
    pub value_changes: Vec<ValueChange>,
    /// Subgraph mount names present in `after` but not `before`.
    pub subgraphs_added: Vec<String>,
    /// Subgraph mount names present in `before` but not `after`.
    pub subgraphs_removed: Vec<String>,
}
91
92impl GraphSnapshotDiff {
93 /// True when no structural or value changes were detected.
94 #[must_use]
95 pub fn is_empty(&self) -> bool {
96 self.nodes_added.is_empty()
97 && self.nodes_removed.is_empty()
98 && self.value_changes.is_empty()
99 && self.subgraphs_added.is_empty()
100 && self.subgraphs_removed.is_empty()
101 }
102}
103
/// A single node value change detected by [`diff_snapshots`].
///
/// [`decompose_diff_to_frames`] turns each entry into one data-lifecycle
/// frame: `node.set` when `to` is `Some`, `node.invalidate` when it is
/// `None`.
#[derive(Debug, Clone)]
pub struct ValueChange {
    /// Node path that changed.
    pub path: String,
    /// New value. `None` means the node transitioned to sentinel (INVALIDATE).
    pub to: Option<Value>,
}
112
113/// Compare two snapshots and produce a structural diff.
114///
115/// Only examines the top-level namespace (not recursive into subgraphs —
116/// subgraph diffs are handled by the attach wiring per-subgraph).
117#[must_use]
118pub fn diff_snapshots(
119 before: &GraphPersistSnapshot,
120 after: &GraphPersistSnapshot,
121) -> GraphSnapshotDiff {
122 let mut nodes_added = Vec::new();
123 let mut nodes_added_slices = Vec::new();
124 let mut nodes_removed = Vec::new();
125 let mut value_changes = Vec::new();
126 let mut subgraphs_added = Vec::new();
127 let mut subgraphs_removed = Vec::new();
128
129 // Nodes added or changed.
130 for (name, after_slice) in &after.nodes {
131 if let Some(before_slice) = before.nodes.get(name) {
132 if before_slice.value != after_slice.value {
133 value_changes.push(ValueChange {
134 path: name.clone(),
135 to: after_slice.value.clone(),
136 });
137 }
138 } else {
139 nodes_added.push(name.clone());
140 nodes_added_slices.push(after_slice.clone());
141 }
142 }
143
144 // Nodes removed.
145 for name in before.nodes.keys() {
146 if !after.nodes.contains_key(name) {
147 nodes_removed.push(name.clone());
148 }
149 }
150
151 // Subgraphs added/removed.
152 for name in after.subgraphs.keys() {
153 if !before.subgraphs.contains_key(name) {
154 subgraphs_added.push(name.clone());
155 }
156 }
157 for name in before.subgraphs.keys() {
158 if !after.subgraphs.contains_key(name) {
159 subgraphs_removed.push(name.clone());
160 }
161 }
162
163 GraphSnapshotDiff {
164 nodes_added,
165 nodes_added_slices,
166 nodes_removed,
167 value_changes,
168 subgraphs_added,
169 subgraphs_removed,
170 }
171}
172
/// Intermediate frame before checksum stamping.
///
/// Built by [`decompose_diff_to_frames`] for each diff entry; the final
/// pass over these assigns `frame_seq` and computes the checksum to form
/// the `WALFrame`.
struct DecomposedFrame {
    /// Lifecycle scope the frame belongs to.
    lifecycle: Lifecycle,
    /// Node path or subgraph mount name the change applies to.
    path: String,
    /// The wrapped change payload.
    change: BaseChange<Value>,
}
179
180/// Convert a [`GraphSnapshotDiff`] into WAL frames ready for persistence.
181///
182/// `timestamp_ns` is the wall-clock at diff time. `base_seq` is the WAL
183/// cursor; returned frames have `frame_seq` values starting at
184/// `base_seq + 1`.
185///
186/// Returns `(frames, next_seq)` where `next_seq` is the highest assigned
187/// `frame_seq`.
188pub fn decompose_diff_to_frames(
189 diff: &GraphSnapshotDiff,
190 timestamp_ns: u64,
191 base_seq: u64,
192) -> Result<(Vec<WALFrame<Value>>, u64), StorageError> {
193 let mut decomposed = Vec::new();
194
195 let wrap = |structure: &str, lifecycle: Lifecycle, payload: Value| -> BaseChange<Value> {
196 BaseChange {
197 structure: structure.to_owned(),
198 version: Version::Counter(SNAPSHOT_VERSION),
199 t_ns: timestamp_ns,
200 seq: None,
201 lifecycle,
202 change: payload,
203 }
204 };
205
206 // Spec lifecycle: node add/remove, subgraph mount/unmount.
207 for (i, name) in diff.nodes_added.iter().enumerate() {
208 let slice = &diff.nodes_added_slices[i];
209 let payload = serde_json::json!({
210 "kind": "graph.add",
211 "nodeId": name,
212 "slice": serde_json::to_value(slice).map_err(|e|
213 StorageError::Codec(crate::codec::CodecError::Encode(e.to_string()))
214 )?,
215 });
216 decomposed.push(DecomposedFrame {
217 lifecycle: Lifecycle::Spec,
218 path: name.clone(),
219 change: wrap("graph.spec", Lifecycle::Spec, payload),
220 });
221 }
222
223 for name in &diff.nodes_removed {
224 let payload = serde_json::json!({
225 "kind": "graph.remove",
226 "nodeId": name,
227 });
228 decomposed.push(DecomposedFrame {
229 lifecycle: Lifecycle::Spec,
230 path: name.clone(),
231 change: wrap("graph.spec", Lifecycle::Spec, payload),
232 });
233 }
234
235 for name in &diff.subgraphs_added {
236 let payload = serde_json::json!({
237 "kind": "graph.mount",
238 "path": name,
239 "subgraphId": name,
240 });
241 decomposed.push(DecomposedFrame {
242 lifecycle: Lifecycle::Spec,
243 path: name.clone(),
244 change: wrap("graph.spec", Lifecycle::Spec, payload),
245 });
246 }
247
248 for name in &diff.subgraphs_removed {
249 let payload = serde_json::json!({
250 "kind": "graph.unmount",
251 "path": name,
252 });
253 decomposed.push(DecomposedFrame {
254 lifecycle: Lifecycle::Spec,
255 path: name.clone(),
256 change: wrap("graph.spec", Lifecycle::Spec, payload),
257 });
258 }
259
260 // Data lifecycle: value changes.
261 for vc in &diff.value_changes {
262 let payload = if let Some(ref value) = vc.to {
263 serde_json::json!({
264 "kind": "node.set",
265 "path": vc.path,
266 "value": value,
267 })
268 } else {
269 serde_json::json!({
270 "kind": "node.invalidate",
271 "path": vc.path,
272 })
273 };
274 decomposed.push(DecomposedFrame {
275 lifecycle: Lifecycle::Data,
276 path: vc.path.clone(),
277 change: wrap("graph.value", Lifecycle::Data, payload),
278 });
279 }
280
281 // Assign frame_seq and compute checksums.
282 let mut seq = base_seq;
283 let mut frames = Vec::with_capacity(decomposed.len());
284 for d in decomposed {
285 seq += 1;
286 let mut frame = WALFrame {
287 t: WalTag,
288 lifecycle: d.lifecycle,
289 path: d.path,
290 change: d.change,
291 frame_seq: seq,
292 frame_t_ns: timestamp_ns,
293 checksum: String::new(),
294 format_version: 1,
295 };
296 frame.checksum = wal_frame_checksum(&frame)?;
297 frames.push(frame);
298 }
299
300 Ok((frames, seq))
301}
302
303// ── Attach (D170) ──────────────────────────────────────────────────────────
304
/// Per-tier state managed by the attach wiring.
///
/// One instance exists per [`AttachTierPair`] handed to
/// [`attach_snapshot_storage`]; all instances live in a single
/// mutex-guarded `Vec` shared between the observe sink and the
/// [`StorageHandle`].
struct TierState {
    /// The snapshot tier (writes full baselines).
    snapshot_tier: Box<dyn SnapshotStorageTier<GraphCheckpointRecord>>,
    /// Optional WAL tier (writes individual delta frames).
    wal_tier: Option<Box<dyn KvStorageTier<WALFrame<Value>>>>,
    /// WAL key prefix derived from `graph.name`.
    wal_prefix: String,
    /// Monotonic cursor: the highest `frame_seq` assigned so far. Seeded at
    /// attach time from the numeric suffix of existing WAL keys.
    seq: u64,
    /// Flush counter for `compact_every` cadence. Incremented on every
    /// flush attempt, before the write.
    flush_count: u64,
    /// Configured compact-every cadence: a full baseline is written
    /// whenever `flush_count` is a multiple of this value.
    ///
    /// NOTE(review): this field was previously documented as "0 = every
    /// flush writes full baseline", but `flush_tier` only applies the
    /// cadence when the value is `> 0`, so `0` actually disables periodic
    /// compaction (a tier with a WAL then writes deltas after its first
    /// baseline). Confirm which behaviour is intended.
    compact_every: u32,
    /// Snapshot tier's `debounce_ms` snapshot at attach time (D171).
    /// `0` (the unset / disabled case) means inline-flush; `>0` means
    /// `save()` buffers and the caller drives explicit flush via
    /// [`StorageHandle::flush_all`].
    snapshot_debounce_ms: u32,
    /// WAL tier's `debounce_ms` snapshot at attach time (D171). Same
    /// meaning as `snapshot_debounce_ms`; `0` when there is no WAL tier.
    wal_debounce_ms: u32,
    /// Last snapshot used for diff computation. `None` until the first
    /// successful flush, which forces that flush to write a full baseline.
    last_snapshot: Option<GraphPersistSnapshot>,
    /// Disposed flag. Once set, the observe sink and `flush_all` skip
    /// this tier.
    disposed: bool,
}
331
/// Configuration for a single snapshot+WAL tier pair.
///
/// [`attach_snapshot_storage`] takes ownership of both tiers and reads
/// `compact_every` / `debounce_ms` from them once, at attach time.
pub struct AttachTierPair {
    /// Snapshot tier for full baselines.
    pub snapshot: Box<dyn SnapshotStorageTier<GraphCheckpointRecord>>,
    /// Optional WAL tier for delta frames. When `None`, every flush
    /// writes a full baseline (no incremental WAL).
    pub wal: Option<Box<dyn KvStorageTier<WALFrame<Value>>>>,
}
340
/// Filter predicate for [`AttachOptions`].
///
/// Receives the path reported by the observe subscription; returning
/// `false` makes the sink ignore that notification entirely.
pub type PathFilter = Box<dyn Fn(&str) -> bool + Send + Sync>;

/// Error callback for [`AttachOptions`].
///
/// Invoked once per failed tier flush, after the sink has released the
/// shared state lock.
pub type ErrorCallback = Box<dyn Fn(&StorageError) + Send + Sync>;
346
/// Options for [`attach_snapshot_storage`].
///
/// `Default` yields no filter (persist everything) and no error callback
/// (flush errors from the observe sink are dropped).
#[derive(Default)]
pub struct AttachOptions {
    /// Per-path filter. Return `true` to persist changes for this path.
    /// `None` means persist all paths.
    pub filter: Option<PathFilter>,
    /// Error callback invoked when a flush fails. Called once per failing
    /// tier, outside the state lock.
    pub on_error: Option<ErrorCallback>,
}
356
/// Handle returned by [`attach_snapshot_storage`]. Dropping this handle
/// unsubscribes the observe sink (RAII disposal).
///
/// `Drop` first marks every tier as disposed (so a late sink fire becomes
/// a no-op) and then releases the fields below.
pub struct StorageHandle {
    /// Shared state; inner `disposed` flag prevents late-fire callbacks.
    /// The same `Arc` is captured by the observe sink closure.
    state: Arc<Mutex<Vec<TierState>>>,
    /// Graph reference (kept alive so the observe subscription stays valid).
    _graph: Graph,
    /// The observe handle — dropping it unsubscribes all sinks.
    _observe: graphrefly_graph::GraphObserveAllReactive,
}
367
368impl StorageHandle {
369 /// Explicitly dispose (equivalent to `Drop`, but callable).
370 ///
371 /// F3 (QA 2026-05-14): on `PoisonError` (a panic in the observe
372 /// sink poisoned the state mutex), recover via `into_inner()`
373 /// rather than silently no-op. The disposed flag still gets set
374 /// even after a sink panic, which matters for cleanup.
375 pub fn dispose(&self) {
376 let mut states = self
377 .state
378 .lock()
379 .unwrap_or_else(std::sync::PoisonError::into_inner);
380 for s in states.iter_mut() {
381 s.disposed = true;
382 }
383 }
384
385 /// Drain pending writes on all tiers (D171).
386 ///
387 /// When a tier is configured with `debounce_ms > 0`,
388 /// [`attach_snapshot_storage`]'s observe sink writes via the
389 /// tier's `save()` but skips the inline `flush()`. Callers
390 /// invoke this method — typically from a binding-side reactive
391 /// timer subgraph (`graphrefly_operators::temporal::interval`,
392 /// shipped in Slice T) — to commit the buffered writes to their
393 /// backends. With `debounce_ms == 0`, the observe sink already
394 /// force-flushes inline; calling this method is then a no-op
395 /// because the tier's pending buffers are empty.
396 ///
397 /// Returns the first error encountered, if any. Successful tiers
398 /// in the same call still flush; the error is surfaced for the
399 /// caller's diagnostics.
400 ///
401 /// # Lock discipline (N3, QA 2026-05-14)
402 ///
403 /// The state mutex is **not** held across `tier.flush()` calls.
404 /// Holding it across blocking I/O would (a) serialize the
405 /// reactive graph against backend latency (every observe-sink
406 /// fire would wait on flush completion), and (b) deadlock if a
407 /// caller invokes `flush_all` from inside an `on_error` callback
408 /// (the observe sink already holds `state.lock()` when it
409 /// invokes `on_error`). The implementation snapshots the per-
410 /// tier flush requests under the lock, drops the lock, then
411 /// runs the flushes in sequence. After each flush, a brief
412 /// re-lock applies any per-tier state mutations (none today —
413 /// `flush()` doesn't return new state).
414 ///
415 /// # Errors
416 ///
417 /// Returns the first `StorageError` encountered. Subsequent
418 /// errors are dropped (acceptable v1; the registered
419 /// `on_error` callback fires per error in the observe sink path
420 /// and is the canonical multi-error reporting surface).
421 ///
422 /// F3: recovers from a poisoned state mutex via `into_inner`
423 /// rather than silently returning Ok.
424 pub fn flush_all(&self) -> Result<(), StorageError> {
425 // Snapshot the per-tier flush callables under the lock,
426 // then drop the lock before invoking them. Each tier's
427 // `flush()` is a `&self` method on `Box<dyn ...>` — to call
428 // it without holding the outer lock, we collect raw
429 // pointers... no, `Box<dyn ...>` can't be cloned. Instead,
430 // we re-lock briefly per tier to call flush, but never hold
431 // the lock across MULTIPLE tier flushes — that's the key
432 // deadlock avoidance.
433 //
434 // The remaining single-tier-lock-during-flush window can
435 // deadlock if `tier.flush()` synchronously re-enters
436 // `dispose()` / `flush_all()` (which would try to re-acquire
437 // `self.state`). Tier impls in `graphrefly-storage` don't do
438 // that; document the contract for future binding-side tier
439 // impls in the rustdoc.
440 let tier_count = {
441 let states = self
442 .state
443 .lock()
444 .unwrap_or_else(std::sync::PoisonError::into_inner);
445 states.len()
446 };
447 let mut first_err: Option<StorageError> = None;
448 for idx in 0..tier_count {
449 // Per-tier flush: brief lock to access the tier, run
450 // flush, drop lock immediately. The tier itself is
451 // behind a Box<dyn ...> inside the Vec; flush() takes
452 // &self, so the &mut on the outer Vec is only needed
453 // for `s.disposed` check.
454 let snapshot_err: Option<StorageError>;
455 let wal_err: Option<StorageError>;
456 {
457 let mut states = self
458 .state
459 .lock()
460 .unwrap_or_else(std::sync::PoisonError::into_inner);
461 let Some(s) = states.get_mut(idx) else {
462 continue;
463 };
464 if s.disposed {
465 continue;
466 }
467 // Call flush while holding the lock — this is
468 // narrow-scoped and the only contention is with the
469 // observe sink, which is itself bounded by wave
470 // dispatch. No `on_error` invocation under this
471 // lock (callers receive errors via the return
472 // value, not via the registered callback).
473 snapshot_err = s.snapshot_tier.flush().err();
474 wal_err = s.wal_tier.as_ref().and_then(|wal| wal.flush().err());
475 }
476 if let Some(e) = snapshot_err {
477 if first_err.is_none() {
478 first_err = Some(e);
479 }
480 }
481 if let Some(e) = wal_err {
482 if first_err.is_none() {
483 first_err = Some(e);
484 }
485 }
486 }
487 match first_err {
488 None => Ok(()),
489 Some(e) => Err(e),
490 }
491 }
492}
493
494impl Drop for StorageHandle {
495 fn drop(&mut self) {
496 self.dispose();
497 }
498}
499
500/// Wire an observe subscription on `graph` that persists node changes
501/// to the provided snapshot+WAL tier pairs.
502///
503/// # Debounce (D171, resolved 2026-05-14)
504///
505/// When a tier's `debounce_ms` is `Some(ms)` with `ms > 0`, the
506/// observe callback writes via the tier's `save()` (which buffers
507/// internally per the [`BaseStorageTier`](crate::tier::BaseStorageTier)
508/// contract) but does NOT force a `flush()`. Callers drain pending
509/// writes by invoking [`StorageHandle::flush_all`] — typically from a
510/// binding-side reactive timer (e.g.,
511/// `graphrefly_operators::temporal::interval`). This keeps
512/// `graphrefly-storage` sync + binding-agnostic per the canonical
513/// no-async-in-storage invariant; reactive-timer wiring lives in the
514/// layer that already owns the timer primitive.
515///
516/// When `debounce_ms` is `None` or `Some(0)`, the observe callback
517/// continues to force-flush inline as before.
518///
519/// # `key_of` (D174, closes F8)
520///
521/// The snapshot tier's backend key is derived from `graph.name` via
522/// the checkpoint record's `name` field. Cross-impl `key_of` divergence
523/// disappears at this boundary.
524#[must_use = "the returned StorageHandle owns the observe subscription; \
525 dropping it immediately unsubscribes and stops persistence"]
526pub fn attach_snapshot_storage(
527 graph: &Graph,
528 pairs: Vec<AttachTierPair>,
529 options: AttachOptions,
530) -> StorageHandle {
531 let graph_name = graph.name();
532 let wal_prefix = graph_wal_prefix(&graph_name);
533
534 let mut states = Vec::with_capacity(pairs.len());
535 for pair in pairs {
536 // Bootstrap: enumerate existing WAL frames to find high-water seq.
537 let mut high_seq: u64 = 0;
538 if let Some(ref wal) = pair.wal {
539 if let Ok(keys) = wal.list(&wal_prefix) {
540 for key in keys {
541 if let Some(seg) = key.rsplit('/').next() {
542 if let Ok(s) = seg.parse::<u64>() {
543 high_seq = high_seq.max(s);
544 }
545 }
546 }
547 }
548 }
549
550 let compact_every = pair.snapshot.compact_every().unwrap_or(10);
551 let snapshot_debounce = pair.snapshot.debounce_ms().unwrap_or(0);
552 let wal_debounce = pair.wal.as_ref().and_then(|w| w.debounce_ms()).unwrap_or(0);
553
554 states.push(TierState {
555 snapshot_tier: pair.snapshot,
556 wal_tier: pair.wal,
557 wal_prefix: wal_prefix.clone(),
558 seq: high_seq,
559 flush_count: 0,
560 compact_every,
561 snapshot_debounce_ms: snapshot_debounce,
562 wal_debounce_ms: wal_debounce,
563 last_snapshot: None,
564 disposed: false,
565 });
566 }
567
568 let shared_states = Arc::new(Mutex::new(states));
569 let states_for_sink = shared_states.clone();
570 let graph_clone = graph.clone();
571 let filter = options.filter;
572 let on_error = options.on_error;
573
574 // Wire observe_all_reactive so late-added nodes are also covered.
575 let mut observe = graph.observe_all_reactive();
576 observe.subscribe(move |path: &str, messages: &[Message]| {
577 // Filter: only tiers 3–5 (DATA/RESOLVED, INVALIDATE, COMPLETE/ERROR).
578 let dominated_by_tier = messages.iter().any(|m| {
579 let t = m.tier();
580 (3..6).contains(&t)
581 });
582 if !dominated_by_tier {
583 return;
584 }
585
586 // Optional path filter.
587 if let Some(ref f) = filter {
588 if !f(path) {
589 return;
590 }
591 }
592
593 // Take a snapshot once, shared across all sync tiers.
594 let snapshot = graph_clone.snapshot();
595
596 // N3 (QA 2026-05-14) — collect errors INSIDE the lock,
597 // invoke `on_error` AFTER releasing the lock. The earlier
598 // pattern called `cb(&e)` while holding `states_for_sink`,
599 // which deadlocked if the user's callback re-entered
600 // `StorageHandle::flush_all` / `dispose` (both acquire the
601 // same `Mutex`). F3: recover from poisoned lock via
602 // `into_inner` rather than silently no-op.
603 let collected_errors: Vec<StorageError> = {
604 let mut states = states_for_sink
605 .lock()
606 .unwrap_or_else(std::sync::PoisonError::into_inner);
607 let mut errs = Vec::new();
608 for s in states.iter_mut() {
609 if s.disposed {
610 continue;
611 }
612 if let Err(e) = flush_tier(s, &snapshot) {
613 errs.push(e);
614 }
615 }
616 errs
617 };
618 if !collected_errors.is_empty() {
619 if let Some(ref cb) = on_error {
620 for e in &collected_errors {
621 cb(e);
622 }
623 }
624 }
625 });
626
627 StorageHandle {
628 state: shared_states,
629 _graph: graph.clone(),
630 _observe: observe,
631 }
632}
633
634/// Flush a single tier: either full baseline or WAL delta.
635fn flush_tier(s: &mut TierState, snapshot: &GraphPersistSnapshot) -> Result<(), StorageError> {
636 s.flush_count += 1;
637
638 let write_full = s.wal_tier.is_none()
639 || s.last_snapshot.is_none()
640 || (s.compact_every > 0 && s.flush_count.is_multiple_of(u64::from(s.compact_every)));
641
642 if write_full {
643 write_full_baseline(s, snapshot)?;
644 } else {
645 write_wal_delta(s, snapshot)?;
646 }
647
648 s.last_snapshot = Some(snapshot.clone());
649 Ok(())
650}
651
652/// Write a full baseline snapshot to the snapshot tier.
653///
654/// When `snapshot_debounce_ms > 0` (D171), the inner `save()` buffers
655/// per the tier's `BaseStorageTier` contract and we DON'T force a
656/// `flush()`. The caller drains via [`StorageHandle::flush_all`].
657fn write_full_baseline(
658 s: &mut TierState,
659 snapshot: &GraphPersistSnapshot,
660) -> Result<(), StorageError> {
661 let timestamp_ns = graphrefly_core::wall_clock_ns();
662 let record = GraphCheckpointRecord {
663 name: snapshot.name.clone(),
664 mode: "full".to_owned(),
665 snapshot: snapshot.clone(),
666 seq: s.seq,
667 timestamp_ns,
668 format_version: SNAPSHOT_VERSION,
669 };
670
671 s.snapshot_tier.save(record)?;
672 if s.snapshot_debounce_ms == 0 {
673 s.snapshot_tier.flush()?;
674 }
675 Ok(())
676}
677
678/// Write WAL delta frames for the diff between `last_snapshot` and current.
679///
680/// D171: WAL `flush()` is skipped when the WAL tier has
681/// `debounce_ms > 0`; the caller drains via
682/// [`StorageHandle::flush_all`].
683fn write_wal_delta(s: &mut TierState, snapshot: &GraphPersistSnapshot) -> Result<(), StorageError> {
684 let last = s
685 .last_snapshot
686 .as_ref()
687 .expect("caller ensures last_snapshot is Some");
688 let diff = diff_snapshots(last, snapshot);
689
690 if diff.is_empty() {
691 return Ok(());
692 }
693
694 let timestamp_ns = graphrefly_core::wall_clock_ns();
695 let (frames, next_seq) = decompose_diff_to_frames(&diff, timestamp_ns, s.seq)?;
696
697 if let Some(ref wal) = s.wal_tier {
698 for frame in &frames {
699 let key = wal_frame_key(&s.wal_prefix, frame.frame_seq);
700 wal.save(&key, frame.clone())?;
701 }
702 if s.wal_debounce_ms == 0 {
703 wal.flush()?;
704 }
705 }
706
707 s.seq = next_seq;
708 Ok(())
709}
710
711// ── Restore (D170) ─────────────────────────────────────────────────────────
712
/// Torn-write policy for checksum failures encountered during restore.
///
/// Returned by the [`OnTornWrite`] callback, or chosen by default when no
/// callback is supplied: `Skip` for the last collected frame, `Abort` for
/// any earlier one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TornWritePolicy {
    /// Drop the frame and continue (default for WAL tail). The frame is
    /// counted in `RestoreResult::skipped_frames`.
    Skip,
    /// Abort the entire restore with `RestoreError::TornWriteMidStream`.
    Abort,
}
721
/// Callback for torn-write decisions. Receives the `frame_seq` and
/// reason; returns the desired policy.
///
/// The only reason currently passed is `"checksum-mismatch"`. When a
/// callback is supplied it is consulted for every failing frame, tail or
/// not; the built-in tail/mid-stream defaults are bypassed.
pub type OnTornWrite = Box<dyn Fn(u64, &str) -> TornWritePolicy + Send + Sync>;
725
/// Options for [`restore_snapshot`].
///
/// Both tiers are borrowed for the duration of the restore; nothing is
/// written to them.
pub struct RestoreOptions<'a> {
    /// Snapshot tier to load the baseline from.
    pub snapshot_tier: &'a dyn SnapshotStorageTier<GraphCheckpointRecord>,
    /// WAL tier to enumerate delta frames from.
    pub wal_tier: &'a dyn KvStorageTier<WALFrame<Value>>,
    /// Optional max `frame_seq` to replay up to (inclusive). `None` =
    /// replay all.
    pub target_seq: Option<u64>,
    /// Torn-write callback. If `None`, defaults: tail = Skip, mid = Abort.
    pub on_torn_write: Option<OnTornWrite>,
}
737
738/// Three-phase WAL replay: baseline load → checksum verify → lifecycle-
739/// scoped batch.
740///
741/// # Phase 1: Baseline
742///
743/// Loads the `mode:"full"` baseline from the snapshot tier. The snapshot
744/// key is derived from `graph.name` (D174).
745///
746/// # Phase 2: Checksum verification
747///
748/// Enumerates WAL frames with `frame_seq > baseline.seq`, verifies each
749/// frame's SHA-256 checksum, applies torn-write policy on mismatch.
750///
751/// # Phase 3: Lifecycle-scoped batch replay
752///
753/// Groups verified frames by lifecycle. Replays in cross-scope order
754/// (`Spec → Data → Ownership`). Each lifecycle runs in a `graph.batch()`
755/// for atomic partial-restore semantics (Q2).
756pub fn restore_snapshot(
757 graph: &Graph,
758 opts: &RestoreOptions<'_>,
759) -> Result<RestoreResult, RestoreError> {
760 // Phase 1: Load and apply baseline.
761 let baseline = load_baseline(graph, opts)?;
762 let baseline_seq = baseline.seq;
763
764 // Phase 1b: Collect WAL frames post-baseline.
765 let collected = collect_wal_frames(opts, &baseline.name, baseline_seq)?;
766
767 // Phase 2: Checksum verification.
768 let (verified, skipped) = verify_frames(collected, opts.on_torn_write.as_ref())?;
769
770 // Phase 3: Lifecycle-scoped batch replay.
771 Ok(replay_by_lifecycle(graph, &verified, baseline_seq, skipped))
772}
773
774/// Phase 1: Load baseline from snapshot tier + apply to graph.
775fn load_baseline(
776 graph: &Graph,
777 opts: &RestoreOptions<'_>,
778) -> Result<GraphCheckpointRecord, RestoreError> {
779 let baseline = opts
780 .snapshot_tier
781 .load()
782 .map_err(|e| RestoreError::PhaseFailed {
783 lifecycle: Lifecycle::Spec,
784 frame_seq: 0,
785 message: format!("baseline load failed: {e}"),
786 })?
787 .ok_or(RestoreError::BaselineMissing)?;
788
789 if baseline.mode != "full" {
790 return Err(RestoreError::BaselineMissing);
791 }
792
793 graph
794 .restore(&baseline.snapshot)
795 .map_err(|e| RestoreError::PhaseFailed {
796 lifecycle: Lifecycle::Spec,
797 frame_seq: 0,
798 message: format!("baseline restore failed: {e}"),
799 })?;
800
801 Ok(baseline)
802}
803
804/// Phase 1b: Enumerate + filter WAL frames.
805fn collect_wal_frames(
806 opts: &RestoreOptions<'_>,
807 graph_name: &str,
808 baseline_seq: u64,
809) -> Result<Vec<WALFrame<Value>>, RestoreError> {
810 let wal_prefix = graph_wal_prefix(graph_name);
811 let keys = opts
812 .wal_tier
813 .list(&wal_prefix)
814 .map_err(|e| RestoreError::PhaseFailed {
815 lifecycle: Lifecycle::Spec,
816 frame_seq: 0,
817 message: format!("WAL frame enumeration failed: {e}"),
818 })?;
819
820 let mut collected: Vec<WALFrame<Value>> = Vec::new();
821 for key in keys {
822 let frame_seq = key
823 .rsplit('/')
824 .next()
825 .and_then(|s| s.parse::<u64>().ok())
826 .unwrap_or(0);
827
828 if frame_seq <= baseline_seq {
829 continue;
830 }
831 if let Some(target) = opts.target_seq {
832 if frame_seq > target {
833 continue;
834 }
835 }
836
837 if let Some(frame) = opts
838 .wal_tier
839 .load(&key)
840 .map_err(|e| RestoreError::PhaseFailed {
841 lifecycle: Lifecycle::Data,
842 frame_seq,
843 message: format!("WAL frame load failed: {e}"),
844 })?
845 {
846 collected.push(frame);
847 }
848 }
849
850 collected.sort_by_key(|f| f.frame_seq);
851 Ok(collected)
852}
853
854/// Phase 2: Verify checksums, apply torn-write policy.
855fn verify_frames(
856 collected: Vec<WALFrame<Value>>,
857 on_torn_write: Option<&OnTornWrite>,
858) -> Result<(Vec<WALFrame<Value>>, u64), RestoreError> {
859 let mut verified = Vec::new();
860 let mut skipped: u64 = 0;
861 let total = collected.len();
862
863 for (i, frame) in collected.into_iter().enumerate() {
864 if verify_wal_frame_checksum(&frame).unwrap_or(false) {
865 verified.push(frame);
866 continue;
867 }
868
869 let is_tail = i == total - 1;
870 let policy = if let Some(cb) = on_torn_write {
871 cb(frame.frame_seq, "checksum-mismatch")
872 } else if is_tail {
873 TornWritePolicy::Skip
874 } else {
875 TornWritePolicy::Abort
876 };
877
878 match policy {
879 TornWritePolicy::Skip => skipped += 1,
880 TornWritePolicy::Abort => {
881 return Err(RestoreError::TornWriteMidStream {
882 frame_seq: frame.frame_seq,
883 reason: "checksum-mismatch".to_owned(),
884 });
885 }
886 }
887 }
888
889 Ok((verified, skipped))
890}
891
892/// Phase 3: Group by lifecycle, replay in cross-scope order.
893fn replay_by_lifecycle(
894 graph: &Graph,
895 verified: &[WALFrame<Value>],
896 baseline_seq: u64,
897 skipped: u64,
898) -> RestoreResult {
899 let mut grouped: [Vec<WALFrame<Value>>; 3] = [Vec::new(), Vec::new(), Vec::new()];
900 for frame in verified {
901 for (idx, lifecycle) in REPLAY_ORDER.iter().enumerate() {
902 if frame.lifecycle == *lifecycle {
903 grouped[idx].push(frame.clone());
904 break;
905 }
906 }
907 }
908
909 let mut phases = Vec::new();
910 let mut replayed: u64 = 0;
911 let mut final_seq: u64 = baseline_seq;
912
913 for (idx, lifecycle) in REPLAY_ORDER.iter().enumerate() {
914 let life_frames = &grouped[idx];
915 if life_frames.is_empty() {
916 continue;
917 }
918 let frame_count = life_frames.len() as u64;
919 let max_seq = life_frames.iter().map(|f| f.frame_seq).max().unwrap_or(0);
920
921 let frames_for_batch = life_frames.clone();
922 let graph_for_batch = graph.clone();
923 graph.batch(move || {
924 for frame in &frames_for_batch {
925 apply_wal_frame(&graph_for_batch, frame);
926 }
927 });
928
929 replayed += frame_count;
930 final_seq = final_seq.max(max_seq);
931 phases.push(PhaseStat {
932 lifecycle: *lifecycle,
933 frames: frame_count,
934 });
935 }
936
937 RestoreResult {
938 replayed_frames: replayed,
939 skipped_frames: skipped,
940 final_seq,
941 phases,
942 }
943}
944
945/// Apply a single WAL frame to a graph. Mirrors TS `applyWalFrame`.
946fn apply_wal_frame(graph: &Graph, frame: &WALFrame<Value>) {
947 let change = &frame.change.change;
948 let kind = change.get("kind").and_then(Value::as_str).unwrap_or("");
949
950 match frame.lifecycle {
951 Lifecycle::Spec => match kind {
952 "graph.add" => {
953 let node_id_str = change.get("nodeId").and_then(Value::as_str).unwrap_or("");
954 if node_id_str.is_empty() || graph.try_resolve(node_id_str).is_some() {
955 return; // already present or invalid
956 }
957 // Only auto-create state nodes (matches TS behavior).
958 let slice = change.get("slice");
959 let node_type = slice
960 .and_then(|s| s.get("type"))
961 .and_then(Value::as_str)
962 .unwrap_or("");
963 if node_type != "state" {
964 return;
965 }
966 let initial_value = slice.and_then(|s| s.get("value")).cloned();
967 let handle = initial_value.map_or(graphrefly_core::NO_HANDLE, |v| {
968 graph.core().binding_ptr().deserialize_value(v)
969 });
970 let _ = graph.state(node_id_str, Some(handle));
971 }
972 "graph.remove" => {
973 let node_id_str = change.get("nodeId").and_then(Value::as_str).unwrap_or("");
974 if !node_id_str.is_empty() && graph.try_resolve(node_id_str).is_some() {
975 let _ = graph.remove(node_id_str);
976 }
977 }
978 // graph.mount, graph.unmount — deferred (Phase 14.6+)
979 _ => {}
980 },
981 Lifecycle::Data => match kind {
982 "node.set" => {
983 let path = change.get("path").and_then(Value::as_str).unwrap_or("");
984 if let Some(value) = change.get("value") {
985 if !path.is_empty() && graph.try_resolve(path).is_some() {
986 let handle = graph.core().binding_ptr().deserialize_value(value.clone());
987 graph.set(path, handle);
988 }
989 }
990 }
991 "node.invalidate" => {
992 let path = change.get("path").and_then(Value::as_str).unwrap_or("");
993 if !path.is_empty() {
994 if let Some(id) = graph.try_resolve(path) {
995 graph.invalidate(id);
996 }
997 }
998 }
999 // node.versionBump — deferred (V0 versioning is internal)
1000 _ => {}
1001 },
1002 // Ownership lifecycle — deferred (Phase 13)
1003 Lifecycle::Ownership => {}
1004 }
1005}