Skip to main content

graphrefly_graph/
snapshot.rs

1//! `snapshot()` / `restore()` / `Graph::from_snapshot()` — portable
2//! serialization of graph state (M4.E1, R3.8).
3//!
4//! D246: `snapshot`/`restore`/`from_snapshot` are inherent [`Graph`]
5//! methods over the Core-free namespace tree, taking the embedder's
6//! `&Core` explicitly (D246 rule 2). `snapshot_of` is generic over
7//! `&dyn CoreFull` (the one facade) so the storage in-wave
8//! `MailboxOp::Defer` observe-sink can run it (read-only;
9//! `serialize_handle` delegates to the binding). No `SubgraphRef`/
10//! `GraphOps`/`SnapshotOps` — one `Graph`, plain free fns.
11//!
12//! # Handle-protocol boundary
13//!
14//! `snapshot()` calls `BindingBoundary::serialize_handle`; `restore()`
15//! / `from_snapshot()` call `BindingBoundary::deserialize_value`.
16//! Per D169 edges are omitted (derived from deps via `edges()`).
17
18use std::cell::RefCell;
19use std::collections::HashMap;
20use std::rc::Rc;
21use std::sync::Arc;
22
23use graphrefly_core::{BindingBoundary, Core, CoreFull, NodeId, NodeKind, TerminalKind, NO_HANDLE};
24use indexmap::IndexMap;
25use serde::{Deserialize, Serialize};
26
27use crate::graph::{resolve_checked, Graph, GraphInner, PATH_SEP};
28
29/// Portable snapshot of a graph's state.
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct GraphPersistSnapshot {
32    /// Graph name as set at construction / mount.
33    pub name: String,
34    /// Per-node state by local name, in namespace insertion order.
35    pub nodes: IndexMap<String, NodeSlice>,
36    /// Mounted subgraph snapshots, keyed by mount name.
37    #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
38    pub subgraphs: IndexMap<String, GraphPersistSnapshot>,
39}
40
41/// Per-node state within a snapshot.
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct NodeSlice {
44    /// `"state"` / `"derived"` / `"dynamic"` / `"producer"` / `"operator"`.
45    #[serde(rename = "type")]
46    pub node_type: String,
47    /// Serialized cache value. `None` when sentinel.
48    #[serde(default, skip_serializing_if = "Option::is_none")]
49    pub value: Option<serde_json::Value>,
50    /// Node lifecycle status.
51    pub status: NodeSnapshotStatus,
52    /// Dependency names in declaration order.
53    #[serde(default, skip_serializing_if = "Vec::is_empty")]
54    pub deps: Vec<String>,
55}
56
57/// Lifecycle status stored in a snapshot.
58#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
59#[serde(rename_all = "lowercase")]
60pub enum NodeSnapshotStatus {
61    /// Never emitted DATA.
62    Sentinel,
63    /// Has emitted at least one DATA.
64    Live,
65    /// Terminal: COMPLETE.
66    Completed,
67    /// Terminal: ERROR (carries the serialized error value).
68    Errored {
69        #[serde(default, skip_serializing_if = "Option::is_none")]
70        error: Option<serde_json::Value>,
71    },
72}
73
74/// Errors from [`Graph::restore`] and [`Graph::from_snapshot`].
75#[derive(Debug, thiserror::Error)]
76pub enum SnapshotError {
77    #[error("snapshot name `{expected}` does not match graph name `{actual}`")]
78    NameMismatch { expected: String, actual: String },
79    #[error("node `{0}` in snapshot not found in graph namespace")]
80    UnknownNode(String),
81    #[error("subgraph `{0}` in snapshot not found in graph mount tree")]
82    UnknownSubgraph(String),
83    #[error("auto-hydration: unresolvable deps for node `{0}` (deps: {1:?})")]
84    UnresolvableDeps(String, Vec<String>),
85    #[error("auto-hydration: no factory registered for node type `{0}` (node `{1}`)")]
86    MissingFactory(String, String),
87    /// D279 (2026-05-22, E-ii.1): a state node in the snapshot collides
88    /// with an existing child mount name on the owner graph at decode
89    /// time. Raised by Pass 1's pre-validation BEFORE any Core mutation
90    /// — prevents the orphan-`NodeId` leak that pre-D279 occurred when
91    /// `Graph::state` registered a `NodeId` before the namespace `add`
92    /// returned `NameError::Collision`. `graph_path` is the owner
93    /// graph's tree-relative path (empty string for the root).
94    #[error("snapshot decode: state node `{name}` at graph `{graph_path}` collides with an existing child mount of the same name")]
95    NameCollision { name: String, graph_path: String },
96}
97
98/// Factory for auto-hydration mode. D246: receives the embedder's
99/// `&Core` + the Core-free [`Graph`] handle.
100pub type NodeFactory =
101    Box<dyn Fn(&Core, &Graph, &str, &NodeSlice, &[NodeId]) -> Result<NodeId, SnapshotError>>;
102
103/// Builder function for `Graph::from_snapshot` builder mode (D246:
104/// `&Core` + Core-free [`Graph`]).
105pub type SnapshotBuilder = Box<dyn FnOnce(&Core, &Graph)>;
106
107/// D246: recursive snapshot over `(&dyn CoreFull, &Rc<RefCell<GraphInner>>)`
108/// — `&dyn CoreFull` (the one facade) so the storage in-wave
109/// `MailboxOp::Defer` observe-sink can run it (read-only;
110/// `serialize_handle` delegates to the binding).
111///
112/// **D276 (cross-mount deps):** the encoder pre-computes a tree-wide
113/// `id_to_tree_path: HashMap<NodeId, String>` covering every named
114/// node in the snapshot tree. For each dep the encoder emits an
115/// **owner-relative path** using [`PATH_SEP`] (`"::"`) and `".."`
116/// segments — the same syntax accepted by [`Graph::try_resolve`]:
117///
118/// - same-graph dep → bare local name (back-compat with snapshots
119///   produced by callers that never crossed a mount boundary).
120/// - cross-mount dep down → `"child::name"` / `"child::nested::name"`.
121/// - cross-mount dep up → `"..::name"`, `"..::..::name"`, …
122/// - cross-mount sibling → `"..::sibling::name"`.
123///
124/// Pre-D276 the encoder fell through to `"_anon_<rawid>"` for any
125/// dep whose `NodeId` wasn't in the LOCAL graph's `names` map,
126/// destroying every cross-mount reference at serialization time.
127/// The new owner-relative encoding round-trips through
128/// `Graph::from_snapshot`'s tree-wide hydration; the decoder
129/// resolves dep names via [`Graph::try_resolve`] on the owner graph,
130/// reusing Slice V3's cross-subgraph path machinery.
131pub(crate) fn snapshot_of(
132    core: &dyn CoreFull,
133    inner_arc: &Rc<RefCell<GraphInner>>,
134) -> GraphPersistSnapshot {
135    let id_to_tree_path = build_id_to_tree_path(inner_arc);
136    snapshot_of_with_tree_paths(core, inner_arc, &id_to_tree_path, "")
137}
138
139/// D276: walk the entire mount tree under `root` and build a
140/// `NodeId → absolute_path` map. Absolute paths are **relative to
141/// the snapshot root** (no leading root name) and use [`PATH_SEP`]
142/// (`"::"`) — the same syntax accepted by [`Graph::try_resolve`].
143fn build_id_to_tree_path(root: &Rc<RefCell<GraphInner>>) -> HashMap<NodeId, String> {
144    let mut map = HashMap::new();
145    walk_tree_paths(root, "", &mut map);
146    map
147}
148
149fn walk_tree_paths(
150    inner_arc: &Rc<RefCell<GraphInner>>,
151    path_prefix: &str,
152    out: &mut HashMap<NodeId, String>,
153) {
154    let inner = inner_arc.borrow_mut();
155    let names: Vec<(String, NodeId)> = inner.names.iter().map(|(n, &id)| (n.clone(), id)).collect();
156    let children: Vec<(String, Rc<RefCell<GraphInner>>)> = inner
157        .children
158        .iter()
159        .map(|(n, g)| (n.clone(), g.clone()))
160        .collect();
161    drop(inner);
162    for (name, id) in &names {
163        let abs_path = if path_prefix.is_empty() {
164            name.clone()
165        } else {
166            format!("{path_prefix}{PATH_SEP}{name}")
167        };
168        out.insert(*id, abs_path);
169    }
170    for (child_name, child_inner) in &children {
171        let child_prefix = if path_prefix.is_empty() {
172            child_name.clone()
173        } else {
174            format!("{path_prefix}{PATH_SEP}{child_name}")
175        };
176        walk_tree_paths(child_inner, &child_prefix, out);
177    }
178}
179
180/// D276 helper — convert a dep's absolute path to a path relative
181/// to the owner graph. Same-graph deps collapse to a bare local
182/// name (back-compat); cross-mount deps use `".."`/`"name"`
183/// segments separated by [`PATH_SEP`].
184///
185/// Examples (`owner_path` → `abs_path` → result):
186///
187/// - `""` → `"a"` → `"a"` (root local)
188/// - `"child"` → `"child::b"` → `"b"` (same-graph local)
189/// - `""` → `"child::b"` → `"child::b"` (descend)
190/// - `"child"` → `"a"` → `"..::a"` (ascend to root)
191/// - `"child::nested"` → `"a"` → `"..::..::a"` (ascend two)
192/// - `"child::a"` → `"other::b"` → `"..::other::b"` (sibling)
193fn absolute_to_owner_relative(owner_path: &str, abs_path: &str) -> String {
194    // /qa G2.7 (2026-05-22): self-dep guard. graphrefly rejects self-deps at
195    // registration (`SetDepsError::SelfDep`), so a healthy `Core::deps_of`
196    // never yields the owning node's own id. If a snapshot is hand-
197    // constructed pathologically OR an operator-internal NodeId is reused
198    // structurally as both owner and dep, computing `owner_path == abs_path`
199    // here would emit `""` — which `Graph::try_resolve("")` rejects with
200    // `PathError::Empty`, eventually surfacing as `UnresolvableDeps` from
201    // the decode retry loop. Catch it loudly at encode in debug builds.
202    debug_assert_ne!(
203        owner_path, abs_path,
204        "D276 invariant: self-deps are rejected at registration; \
205         encoding a dep whose absolute path equals the owner's would emit \
206         an empty relative path"
207    );
208    let owner_segs: Vec<&str> = if owner_path.is_empty() {
209        Vec::new()
210    } else {
211        owner_path.split(PATH_SEP).collect()
212    };
213    let abs_segs: Vec<&str> = if abs_path.is_empty() {
214        Vec::new()
215    } else {
216        abs_path.split(PATH_SEP).collect()
217    };
218    let mut common = 0;
219    while common < owner_segs.len()
220        && common < abs_segs.len()
221        && owner_segs[common] == abs_segs[common]
222    {
223        common += 1;
224    }
225    let up_count = owner_segs.len() - common;
226    let down_segs = &abs_segs[common..];
227    if up_count == 0 {
228        return down_segs.join(PATH_SEP);
229    }
230    let mut parts: Vec<&str> = vec![".."; up_count];
231    parts.extend(down_segs);
232    parts.join(PATH_SEP)
233}
234
235fn snapshot_of_with_tree_paths(
236    core: &dyn CoreFull,
237    inner_arc: &Rc<RefCell<GraphInner>>,
238    id_to_tree_path: &HashMap<NodeId, String>,
239    owner_path: &str,
240) -> GraphPersistSnapshot {
241    let (name, node_entries, children, id_to_name) = {
242        let inner = inner_arc.borrow_mut();
243        let name = inner.name.clone();
244        let node_entries: Vec<(String, NodeId)> =
245            inner.names.iter().map(|(n, &id)| (n.clone(), id)).collect();
246        let children: Vec<(String, Rc<RefCell<GraphInner>>)> = inner
247            .children
248            .iter()
249            .map(|(n, g)| (n.clone(), g.clone()))
250            .collect();
251        let id_to_name: IndexMap<NodeId, String> =
252            inner.names.iter().map(|(n, &id)| (id, n.clone())).collect();
253        (name, node_entries, children, id_to_name)
254    };
255
256    let mut nodes = IndexMap::new();
257
258    for (node_name, node_id) in &node_entries {
259        let kind = core.kind_of(*node_id);
260        let node_type = match kind {
261            Some(NodeKind::State) => "state",
262            Some(NodeKind::Derived) => "derived",
263            Some(NodeKind::Dynamic) => "dynamic",
264            Some(NodeKind::Producer) => "producer",
265            Some(NodeKind::Operator(_)) => "operator",
266            None => "unknown",
267        };
268
269        let cache = core.cache_of(*node_id);
270        let value = if cache == NO_HANDLE {
271            None
272        } else {
273            core.serialize_handle(cache)
274        };
275
276        let terminal = core.is_terminal(*node_id);
277        let status = match terminal {
278            Some(TerminalKind::Complete) => NodeSnapshotStatus::Completed,
279            Some(TerminalKind::Error(err_handle)) => NodeSnapshotStatus::Errored {
280                error: core.serialize_handle(err_handle),
281            },
282            None => {
283                if core.has_fired_once(*node_id) || cache != NO_HANDLE {
284                    NodeSnapshotStatus::Live
285                } else {
286                    NodeSnapshotStatus::Sentinel
287                }
288            }
289        };
290
291        // D276: dep-name encoding — 3 tiers, all owner-relative:
292        // (1) same-graph dep → bare local name (back-compat — pre-D276
293        //     snapshots used this shape exclusively).
294        // (2) cross-mount dep that IS named somewhere in the snapshot
295        //     tree → owner-relative path via [`PATH_SEP`] + `".."`
296        //     segments (resolves via `Graph::try_resolve` on decode).
297        // (3) anonymous dep (operator-internal NodeId with no name in
298        //     ANY graph) → `_anon_<rawid>` fallback (pre-D276 behavior,
299        //     unchanged; decode still fails with `UnresolvableDeps`
300        //     for these — not in M4.E1 scope).
301        let dep_ids = core.deps_of(*node_id);
302        let deps: Vec<String> = dep_ids
303            .iter()
304            .map(|dep_id| {
305                if let Some(local_name) = id_to_name.get(dep_id) {
306                    local_name.clone()
307                } else if let Some(tree_path) = id_to_tree_path.get(dep_id) {
308                    absolute_to_owner_relative(owner_path, tree_path)
309                } else {
310                    format!("_anon_{}", dep_id.raw())
311                }
312            })
313            .collect();
314
315        nodes.insert(
316            node_name.clone(),
317            NodeSlice {
318                node_type: node_type.to_owned(),
319                value,
320                status,
321                deps,
322            },
323        );
324    }
325
326    let mut subgraphs = IndexMap::new();
327    for (child_name, child_inner) in children {
328        let child_owner_path = if owner_path.is_empty() {
329            child_name.clone()
330        } else {
331            format!("{owner_path}{PATH_SEP}{child_name}")
332        };
333        subgraphs.insert(
334            child_name,
335            snapshot_of_with_tree_paths(core, &child_inner, id_to_tree_path, &child_owner_path),
336        );
337    }
338
339    GraphPersistSnapshot {
340        name,
341        nodes,
342        subgraphs,
343    }
344}
345
346/// Recursive restore over `(&Core, &Rc<RefCell<GraphInner>>)`.
347fn restore_into(
348    core: &Core,
349    inner_arc: &Rc<RefCell<GraphInner>>,
350    snapshot: &GraphPersistSnapshot,
351) -> Result<(), SnapshotError> {
352    let graph_name = inner_arc.borrow_mut().name.clone();
353    if snapshot.name != graph_name {
354        return Err(SnapshotError::NameMismatch {
355            expected: snapshot.name.clone(),
356            actual: graph_name,
357        });
358    }
359
360    let binding = core.binding_ptr();
361
362    for (node_name, slice) in &snapshot.nodes {
363        let node_id = resolve_checked(inner_arc, node_name)
364            .ok()
365            .flatten()
366            .ok_or_else(|| SnapshotError::UnknownNode(node_name.clone()))?;
367
368        if slice.node_type == "state" {
369            if let Some(ref value) = slice.value {
370                let handle = binding.deserialize_value(value.clone());
371                core.emit(node_id, handle);
372            }
373        }
374
375        match &slice.status {
376            NodeSnapshotStatus::Completed => {
377                core.complete(node_id);
378            }
379            NodeSnapshotStatus::Errored { error } => {
380                if let Some(err_val) = error {
381                    let err_handle = binding.deserialize_value(err_val.clone());
382                    core.error(node_id, err_handle);
383                }
384            }
385            NodeSnapshotStatus::Sentinel | NodeSnapshotStatus::Live => {}
386        }
387    }
388
389    let child_pairs: Vec<(String, Rc<RefCell<GraphInner>>)> = {
390        let inner = inner_arc.borrow_mut();
391        snapshot
392            .subgraphs
393            .keys()
394            .map(|name| {
395                let child = inner
396                    .children
397                    .get(name)
398                    .ok_or_else(|| SnapshotError::UnknownSubgraph(name.clone()))?;
399                Ok((name.clone(), child.clone()))
400            })
401            .collect::<Result<Vec<_>, SnapshotError>>()?
402    };
403    for (child_name, child_inner) in child_pairs {
404        restore_into(core, &child_inner, &snapshot.subgraphs[&child_name])?;
405    }
406
407    Ok(())
408}
409
410impl Graph {
411    /// Serialize this graph's state into a portable snapshot.
412    ///
413    /// # Concurrent-mutation caveat (torn read; M4.E1 / D167)
414    ///
415    /// `snapshot()` is a **point-in-time best-effort capture**, not an
416    /// isolated read. The implementation holds the graph's inner lock
417    /// for the namespace walk (collect names + child mounts), then
418    /// drops it before per-node `core.cache_of` / `core.is_terminal`
419    /// queries. If another thread (or a re-entrant wave) mutates state
420    /// during the post-walk phase, the snapshot may capture a mix of
421    /// pre- and post-mutation values for different nodes — individual
422    /// node slices are internally consistent, but the cross-node
423    /// composition is not transaction-isolated.
424    ///
425    /// The TS impl has the same semantics. No user has requested
426    /// snapshot-level isolation. If you need a consistent cross-node
427    /// view, the supported pattern is:
428    ///
429    /// - Wrap the snapshot call in [`Core::batch`] (drains the wave
430    ///   before `snapshot()` returns; subsequent emissions wait).
431    /// - OR call `graph.signal(SignalKind::Pause(lock))` first, then
432    ///   `snapshot()`, then `Resume(lock)` — explicitly freezes the
433    ///   reactive layer for the duration.
434    ///
435    /// A future copy-on-write epoch / snapshot-under-lock would close
436    /// the torn-read window at the cost of holding the Core lock for
437    /// the full serialization walk; gated on D196 consumer-pressure
438    /// (no scenario today justifies the lock-contention trade).
439    #[must_use]
440    pub fn snapshot(&self, core: &Core) -> GraphPersistSnapshot {
441        snapshot_of(core, &self.inner)
442    }
443
444    /// [`Self::snapshot`] over the one object-safe facade (D246 rule 5)
445    /// — for the storage in-wave `MailboxOp::Defer(|cf: &dyn CoreFull|)`
446    /// path, which only has a `&dyn CoreFull` (not a concrete `&Core`).
447    /// Read-only; `serialize_handle` delegates to the binding.
448    ///
449    /// Inherits the same concurrent-mutation caveat as [`Self::snapshot`].
450    #[must_use]
451    pub fn snapshot_full(&self, core: &dyn CoreFull) -> GraphPersistSnapshot {
452        snapshot_of(core, &self.inner)
453    }
454
455    /// Restore state from a snapshot into this existing graph.
456    ///
457    /// # Errors
458    /// `NameMismatch` if names differ; `UnknownNode`/`UnknownSubgraph`
459    /// for snapshot entries absent from the graph.
460    pub fn restore(
461        &self,
462        core: &Core,
463        snapshot: &GraphPersistSnapshot,
464    ) -> Result<(), SnapshotError> {
465        restore_into(core, &self.inner, snapshot)
466    }
467
468    /// Reconstruct a graph from a snapshot. **Builder mode**
469    /// (`builder = Some`): build topology then `restore()` values.
470    /// **Auto-hydration** (`builder = None`): reconstruct topology +
471    /// state from the snapshot via `factories` (state nodes need none).
472    ///
473    /// D246: the embedder owns the `Core` (see
474    /// [`graphrefly_core::OwnedCore`]) and passes it in; the binding is
475    /// `core.binding_ptr()`.
476    ///
477    /// # Errors
478    /// `UnresolvableDeps` if auto-hydration can't resolve a node's
479    /// deps; `MissingFactory` for a non-state node type with no factory.
480    pub fn from_snapshot(
481        core: &Core,
482        snapshot: &GraphPersistSnapshot,
483        builder: Option<SnapshotBuilder>,
484        factories: Option<IndexMap<String, NodeFactory>>,
485    ) -> Result<Self, SnapshotError> {
486        let graph = Graph::new(&snapshot.name);
487        let binding: Arc<dyn BindingBoundary> = core.binding();
488
489        if let Some(build_fn) = builder {
490            build_fn(core, &graph);
491            graph.restore(core, snapshot)?;
492            return Ok(graph);
493        }
494
495        let factories = factories.unwrap_or_default();
496
497        // D276 tree-wide hydration — replaces the pre-D276 single-pass
498        // per-graph `hydrate_subgraph` / `hydrate_nodes` recursion.
499        // Four passes:
500        //
501        //   Pass 0 — mount tree: recursively `mount_new` every
502        //            subgraph; record `(absolute_path → Graph)` so
503        //            later passes can address each owning graph.
504        //   Pass 1 — state-first: walk tree creating ALL state nodes
505        //            (no deps to resolve). Each state node is
506        //            registered in its owner graph's `names`, so
507        //            subsequent passes can locate it via
508        //            [`Graph::try_resolve`].
509        //   Pass 2 — derived: collect ALL non-state nodes across the
510        //            tree into one queue. Run ONE shared retry loop;
511        //            dep names resolve via the owner graph's
512        //            `try_resolve`, which natively handles owner-
513        //            relative paths with `".."` and `"::"` segments
514        //            (Slice V3 cross-subgraph path machinery).
515        //   Pass 3 — status restore: walk tree, apply
516        //            `Completed` / `Errored` via the same
517        //            `try_resolve` lookup.
518        //
519        // Back-compat: snapshots produced by callers that never
520        // crossed a mount boundary use bare local names only; those
521        // resolve identically to the pre-D276 flat lookup (and
522        // identically to a one-segment `try_resolve` on the owner).
523        //
524        // See `~/src/graphrefly-ts/docs/rust-port-decisions.md` D276
525        // and the `docs/porting-deferred.md` M4.E1 closure block.
526
527        // Pass 0 — mount tree.
528        let mut graph_map: IndexMap<String, Graph> = IndexMap::new();
529        graph_map.insert(String::new(), graph.clone());
530        mount_subgraphs_recursive(core, &graph, snapshot, "", &mut graph_map)?;
531
532        // Pass 1 — state nodes tree-wide.
533        create_state_nodes_recursive(core, snapshot, "", &graph_map, &binding)?;
534
535        // Pass 2 — derived nodes tree-wide with shared retry loop.
536        let mut derived_queue: Vec<DerivedEntry> = Vec::new();
537        collect_derived_recursive(snapshot, "", &graph_map, &mut derived_queue);
538        create_derived_with_retry(core, &factories, derived_queue)?;
539
540        // Pass 3 — status restore.
541        apply_status_recursive(core, snapshot, "", &graph_map, &binding)?;
542
543        Ok(graph)
544    }
545}
546
547/// D276 auto-hydration: a non-state node awaiting derivation in
548/// Pass 2's shared retry loop. The owner graph is captured so the
549/// retry loop can call `owner_graph.try_resolve(dep_name)` against
550/// the right namespace (owner-relative paths walk via the owner's
551/// parent/child references in [`crate::graph::resolve_checked`]).
552struct DerivedEntry {
553    owner_graph: Graph,
554    name: String,
555    slice: NodeSlice,
556}
557
558/// D276 Pass 0 — mount every subgraph under `parent` and accumulate
559/// each subgraph's absolute path → [`Graph`] handle into `graph_map`.
560fn mount_subgraphs_recursive(
561    core: &Core,
562    parent: &Graph,
563    snap: &GraphPersistSnapshot,
564    parent_path: &str,
565    graph_map: &mut IndexMap<String, Graph>,
566) -> Result<(), SnapshotError> {
567    for (child_name, child_snap) in &snap.subgraphs {
568        let child_graph = parent
569            .mount_new(core, child_name)
570            .map_err(|_| SnapshotError::UnknownSubgraph(child_name.clone()))?;
571        let child_path = if parent_path.is_empty() {
572            child_name.clone()
573        } else {
574            format!("{parent_path}{PATH_SEP}{child_name}")
575        };
576        graph_map.insert(child_path.clone(), child_graph.clone());
577        mount_subgraphs_recursive(core, &child_graph, child_snap, &child_path, graph_map)?;
578    }
579    Ok(())
580}
581
582/// D276 Pass 1 — recursively create every state node in the tree
583/// (state nodes have no deps to resolve). Each new state node is
584/// registered in its owner graph's `names`; Pass 2/3 locate it via
585/// [`Graph::try_resolve`].
586fn create_state_nodes_recursive(
587    core: &Core,
588    snap: &GraphPersistSnapshot,
589    owner_path: &str,
590    graph_map: &IndexMap<String, Graph>,
591    binding: &Arc<dyn BindingBoundary>,
592) -> Result<(), SnapshotError> {
593    let owner_graph = graph_map
594        .get(owner_path)
595        .expect("D276 invariant: graph_map covers every subgraph mounted in Pass 0");
596
597    // D279 (2026-05-22, E-ii.1): pre-validate every state name against the
598    // owner graph's mount tree BEFORE any `register_state` call. Pre-D279,
599    // `Graph::state` called `core.register_state(...)` first (allocating a
600    // fresh NodeId + cache retention) THEN `add(name, ...)` — a name
601    // collision against a child mount populated by Pass 0 left an orphan
602    // NodeId in Core's registry with no path to teardown, and the surfaced
603    // error (`UnknownNode` via `map_err(|_| ...)`) discarded the real
604    // collision cause. Pre-validation closes both bugs at once: zero Core
605    // mutation on a doomed restore, and a dedicated `NameCollision`
606    // diagnostic.
607    let child_mount_names: std::collections::HashSet<String> =
608        owner_graph.child_names().into_iter().collect();
609    for (name, slice) in &snap.nodes {
610        if slice.node_type == "state" && child_mount_names.contains(name) {
611            return Err(SnapshotError::NameCollision {
612                name: name.clone(),
613                graph_path: owner_path.to_owned(),
614            });
615        }
616    }
617
618    for (name, slice) in &snap.nodes {
619        if slice.node_type == "state" {
620            let initial = slice
621                .value
622                .as_ref()
623                .map(|v| binding.deserialize_value(v.clone()));
624            owner_graph
625                .state(core, name, initial)
626                .map_err(|_| SnapshotError::UnknownNode(name.clone()))?;
627        }
628    }
629    for (child_name, child_snap) in &snap.subgraphs {
630        let child_path = if owner_path.is_empty() {
631            child_name.clone()
632        } else {
633            format!("{owner_path}{PATH_SEP}{child_name}")
634        };
635        create_state_nodes_recursive(core, child_snap, &child_path, graph_map, binding)?;
636    }
637    Ok(())
638}
639
640/// D276 Pass 2a — walk the entire tree collecting non-state nodes
641/// into one flat queue tagged with their owner graph + path. The
642/// queue is then run through [`create_derived_with_retry`] which
643/// can resolve deps that cross mount boundaries.
644fn collect_derived_recursive(
645    snap: &GraphPersistSnapshot,
646    owner_path: &str,
647    graph_map: &IndexMap<String, Graph>,
648    out: &mut Vec<DerivedEntry>,
649) {
650    let owner_graph = graph_map
651        .get(owner_path)
652        .expect("D276 invariant: graph_map covers every subgraph mounted in Pass 0");
653    for (name, slice) in &snap.nodes {
654        if slice.node_type != "state" {
655            out.push(DerivedEntry {
656                owner_graph: owner_graph.clone(),
657                name: name.clone(),
658                slice: slice.clone(),
659            });
660        }
661    }
662    for (child_name, child_snap) in &snap.subgraphs {
663        let child_path = if owner_path.is_empty() {
664            child_name.clone()
665        } else {
666            format!("{owner_path}{PATH_SEP}{child_name}")
667        };
668        collect_derived_recursive(child_snap, &child_path, graph_map, out);
669    }
670}
671
672/// D276 Pass 2b — tree-wide retry loop for derived/dynamic/operator
673/// nodes. Each iteration resolves an entry's deps via the owner
674/// graph's [`Graph::try_resolve`], which natively understands the
675/// owner-relative path syntax emitted by the D276 encoder:
676///
677/// - bare name → local lookup in owner's `names` (pre-D276 shape).
678/// - `"child::name"` → descend into a mounted subgraph.
679/// - `"..::name"` → walk to parent.
680/// - `"..::sibling::name"` → walk to parent then descend into a sibling.
681///
682/// Loop terminates when (a) all entries created — success — or
683/// (b) one pass made no progress — `UnresolvableDeps` on the first
684/// stuck entry.
685fn create_derived_with_retry(
686    core: &Core,
687    factories: &IndexMap<String, NodeFactory>,
688    entries: Vec<DerivedEntry>,
689) -> Result<(), SnapshotError> {
690    let mut remaining = entries;
691    loop {
692        let before = remaining.len();
693        let mut still_remaining = Vec::new();
694
695        for entry in remaining {
696            let mut resolved = Vec::with_capacity(entry.slice.deps.len());
697            let mut all_ok = true;
698            for dep_name in &entry.slice.deps {
699                if let Some(dep_id) = entry.owner_graph.try_resolve(dep_name) {
700                    resolved.push(dep_id);
701                } else {
702                    all_ok = false;
703                    break;
704                }
705            }
706            if all_ok {
707                let factory = factories.get(&entry.slice.node_type).ok_or_else(|| {
708                    SnapshotError::MissingFactory(entry.slice.node_type.clone(), entry.name.clone())
709                })?;
710                factory(
711                    core,
712                    &entry.owner_graph,
713                    &entry.name,
714                    &entry.slice,
715                    &resolved,
716                )?;
717            } else {
718                still_remaining.push(entry);
719            }
720        }
721
722        remaining = still_remaining;
723        if remaining.is_empty() {
724            break;
725        }
726        if remaining.len() == before {
727            let entry = &remaining[0];
728            return Err(SnapshotError::UnresolvableDeps(
729                entry.name.clone(),
730                entry.slice.deps.clone(),
731            ));
732        }
733    }
734    Ok(())
735}
736
737/// D276 Pass 3 — recursively apply each node's snapshot status
738/// (Completed / Errored) via the owner graph's [`Graph::try_resolve`].
739/// Sentinel / Live are no-ops (state nodes already received their
740/// cache during Pass 1's `Graph::state` initial-value path; derived
741/// recompute on first subscribe).
742fn apply_status_recursive(
743    core: &Core,
744    snap: &GraphPersistSnapshot,
745    owner_path: &str,
746    graph_map: &IndexMap<String, Graph>,
747    binding: &Arc<dyn BindingBoundary>,
748) -> Result<(), SnapshotError> {
749    let owner_graph = graph_map
750        .get(owner_path)
751        .expect("D276 invariant: graph_map covers every subgraph mounted in Pass 0");
752    for (name, slice) in &snap.nodes {
753        // /qa G2.1 (2026-05-22): mirror `restore_into:339` — a node listed
754        // in the snapshot that doesn't resolve via `try_resolve` is a
755        // structural inconsistency (Pass 1/Pass 2 should have created it).
756        // Silently dropping the status would mask a Pass-2 hydration bug
757        // as "successful restore with corrupted lifecycle state."
758        let node_id = owner_graph
759            .try_resolve(name)
760            .ok_or_else(|| SnapshotError::UnknownNode(name.clone()))?;
761        match &slice.status {
762            NodeSnapshotStatus::Completed => {
763                owner_graph.complete(core, node_id);
764            }
765            NodeSnapshotStatus::Errored { error } => {
766                if let Some(err_val) = error {
767                    let err_handle = binding.deserialize_value(err_val.clone());
768                    owner_graph.error(core, node_id, err_handle);
769                }
770            }
771            NodeSnapshotStatus::Sentinel | NodeSnapshotStatus::Live => {}
772        }
773    }
774    for (child_name, child_snap) in &snap.subgraphs {
775        let child_path = if owner_path.is_empty() {
776            child_name.clone()
777        } else {
778            format!("{owner_path}{PATH_SEP}{child_name}")
779        };
780        apply_status_recursive(core, child_snap, &child_path, graph_map, binding)?;
781    }
782    Ok(())
783}