Skip to main content

graphrefly_graph/
snapshot.rs

1//! `snapshot()` / `restore()` / `Graph::from_snapshot()` — portable
2//! serialization of graph state (M4.E1, R3.8).
3//!
4//! D246: `snapshot`/`restore`/`from_snapshot` are inherent [`Graph`]
5//! methods over the Core-free namespace tree, taking the embedder's
6//! `&Core` explicitly (D246 rule 2). `snapshot_of` is generic over
7//! `&dyn CoreFull` (the one facade) so the storage in-wave
8//! `MailboxOp::Defer` observe-sink can run it (read-only;
9//! `serialize_handle` delegates to the binding). No `SubgraphRef`/
10//! `GraphOps`/`SnapshotOps` — one `Graph`, plain free fns.
11//!
12//! # Handle-protocol boundary
13//!
14//! `snapshot()` calls `BindingBoundary::serialize_handle`; `restore()`
15//! / `from_snapshot()` call `BindingBoundary::deserialize_value`.
16//! Per D169 edges are omitted (derived from deps via `edges()`).
17
18use std::cell::RefCell;
19use std::collections::HashMap;
20use std::rc::Rc;
21use std::sync::Arc;
22
23use graphrefly_core::{BindingBoundary, Core, CoreFull, NodeId, NodeKind, TerminalKind, NO_HANDLE};
24use indexmap::IndexMap;
25use serde::{Deserialize, Serialize};
26
27use crate::graph::{resolve_checked, Graph, GraphInner, PATH_SEP};
28
29/// Portable snapshot of a graph's state.
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct GraphPersistSnapshot {
32    /// Graph name as set at construction / mount.
33    pub name: String,
34    /// Per-node state by local name, in namespace insertion order.
35    pub nodes: IndexMap<String, NodeSlice>,
36    /// Mounted subgraph snapshots, keyed by mount name.
37    #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
38    pub subgraphs: IndexMap<String, GraphPersistSnapshot>,
39}
40
41/// Per-node state within a snapshot.
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct NodeSlice {
44    /// `"state"` / `"derived"` / `"dynamic"` / `"producer"` / `"operator"`.
45    #[serde(rename = "type")]
46    pub node_type: String,
47    /// Serialized cache value. `None` when sentinel.
48    #[serde(default, skip_serializing_if = "Option::is_none")]
49    pub value: Option<serde_json::Value>,
50    /// Node lifecycle status.
51    pub status: NodeSnapshotStatus,
52    /// Dependency names in declaration order.
53    #[serde(default, skip_serializing_if = "Vec::is_empty")]
54    pub deps: Vec<String>,
55}
56
57/// Lifecycle status stored in a snapshot.
58#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
59#[serde(rename_all = "lowercase")]
60pub enum NodeSnapshotStatus {
61    /// Never emitted DATA.
62    Sentinel,
63    /// Has emitted at least one DATA.
64    Live,
65    /// Terminal: COMPLETE.
66    Completed,
67    /// Terminal: ERROR (carries the serialized error value).
68    Errored {
69        #[serde(default, skip_serializing_if = "Option::is_none")]
70        error: Option<serde_json::Value>,
71    },
72}
73
74/// Errors from [`Graph::restore`] and [`Graph::from_snapshot`].
75#[derive(Debug, thiserror::Error)]
76pub enum SnapshotError {
77    #[error("snapshot name `{expected}` does not match graph name `{actual}`")]
78    NameMismatch { expected: String, actual: String },
79    #[error("node `{0}` in snapshot not found in graph namespace")]
80    UnknownNode(String),
81    #[error("subgraph `{0}` in snapshot not found in graph mount tree")]
82    UnknownSubgraph(String),
83    #[error("auto-hydration: unresolvable deps for node `{0}` (deps: {1:?})")]
84    UnresolvableDeps(String, Vec<String>),
85    #[error("auto-hydration: no factory registered for node type `{0}` (node `{1}`)")]
86    MissingFactory(String, String),
87    /// D279 (2026-05-22, E-ii.1): a state node in the snapshot collides
88    /// with an existing child mount name on the owner graph at decode
89    /// time. Raised by Pass 1's pre-validation BEFORE any Core mutation
90    /// — prevents the orphan-`NodeId` leak that pre-D279 occurred when
91    /// `Graph::state` registered a `NodeId` before the namespace `add`
92    /// returned `NameError::Collision`. `graph_path` is the owner
93    /// graph's tree-relative path (empty string for the root).
94    #[error("snapshot decode: state node `{name}` at graph `{graph_path}` collides with an existing child mount of the same name")]
95    NameCollision { name: String, graph_path: String },
96}
97
98/// Factory for auto-hydration mode. D246: receives the embedder's
99/// `&Core` + the Core-free [`Graph`] handle.
100pub type NodeFactory =
101    Box<dyn Fn(&Core, &Graph, &str, &NodeSlice, &[NodeId]) -> Result<NodeId, SnapshotError>>;
102
103/// Builder function for `Graph::from_snapshot` builder mode (D246:
104/// `&Core` + Core-free [`Graph`]).
105pub type SnapshotBuilder = Box<dyn FnOnce(&Core, &Graph)>;
106
107/// D246: recursive snapshot over `(&dyn CoreFull, &Rc<RefCell<GraphInner>>)`
108/// — `&dyn CoreFull` (the one facade) so the storage in-wave
109/// `MailboxOp::Defer` observe-sink can run it (read-only;
110/// `serialize_handle` delegates to the binding).
111///
112/// **D276 (cross-mount deps):** the encoder pre-computes a tree-wide
113/// `id_to_tree_path: HashMap<NodeId, String>` covering every named
114/// node in the snapshot tree. For each dep the encoder emits an
115/// **owner-relative path** using [`PATH_SEP`] (`"::"`) and `".."`
116/// segments — the same syntax accepted by [`Graph::try_resolve`]:
117///
118/// - same-graph dep → bare local name (back-compat with snapshots
119///   produced by callers that never crossed a mount boundary).
120/// - cross-mount dep down → `"child::name"` / `"child::nested::name"`.
121/// - cross-mount dep up → `"..::name"`, `"..::..::name"`, …
122/// - cross-mount sibling → `"..::sibling::name"`.
123///
124/// Pre-D276 the encoder fell through to `"_anon_<rawid>"` for any
125/// dep whose `NodeId` wasn't in the LOCAL graph's `names` map,
126/// destroying every cross-mount reference at serialization time.
127/// The new owner-relative encoding round-trips through
128/// `Graph::from_snapshot`'s tree-wide hydration; the decoder
129/// resolves dep names via [`Graph::try_resolve`] on the owner graph,
130/// reusing Slice V3's cross-subgraph path machinery.
131pub(crate) fn snapshot_of(
132    core: &dyn CoreFull,
133    inner_arc: &Rc<RefCell<GraphInner>>,
134) -> GraphPersistSnapshot {
135    let id_to_tree_path = build_id_to_tree_path(inner_arc);
136    snapshot_of_with_tree_paths(core, inner_arc, &id_to_tree_path, "")
137}
138
139/// D276: walk the entire mount tree under `root` and build a
140/// `NodeId → absolute_path` map. Absolute paths are **relative to
141/// the snapshot root** (no leading root name) and use [`PATH_SEP`]
142/// (`"::"`) — the same syntax accepted by [`Graph::try_resolve`].
143fn build_id_to_tree_path(root: &Rc<RefCell<GraphInner>>) -> HashMap<NodeId, String> {
144    let mut map = HashMap::new();
145    walk_tree_paths(root, "", &mut map);
146    map
147}
148
149fn walk_tree_paths(
150    inner_arc: &Rc<RefCell<GraphInner>>,
151    path_prefix: &str,
152    out: &mut HashMap<NodeId, String>,
153) {
154    let inner = inner_arc.borrow_mut();
155    let names: Vec<(String, NodeId)> = inner.names.iter().map(|(n, &id)| (n.clone(), id)).collect();
156    let children: Vec<(String, Rc<RefCell<GraphInner>>)> = inner
157        .children
158        .iter()
159        .map(|(n, g)| (n.clone(), g.clone()))
160        .collect();
161    drop(inner);
162    for (name, id) in &names {
163        let abs_path = if path_prefix.is_empty() {
164            name.clone()
165        } else {
166            format!("{path_prefix}{PATH_SEP}{name}")
167        };
168        out.insert(*id, abs_path);
169    }
170    for (child_name, child_inner) in &children {
171        let child_prefix = if path_prefix.is_empty() {
172            child_name.clone()
173        } else {
174            format!("{path_prefix}{PATH_SEP}{child_name}")
175        };
176        walk_tree_paths(child_inner, &child_prefix, out);
177    }
178}
179
180/// D276 helper — convert a dep's absolute path to a path relative
181/// to the owner graph. Same-graph deps collapse to a bare local
182/// name (back-compat); cross-mount deps use `".."`/`"name"`
183/// segments separated by [`PATH_SEP`].
184///
185/// Examples (`owner_path` → `abs_path` → result):
186///
187/// - `""` → `"a"` → `"a"` (root local)
188/// - `"child"` → `"child::b"` → `"b"` (same-graph local)
189/// - `""` → `"child::b"` → `"child::b"` (descend)
190/// - `"child"` → `"a"` → `"..::a"` (ascend to root)
191/// - `"child::nested"` → `"a"` → `"..::..::a"` (ascend two)
192/// - `"child::a"` → `"other::b"` → `"..::other::b"` (sibling)
193fn absolute_to_owner_relative(owner_path: &str, abs_path: &str) -> String {
194    // /qa G2.7 (2026-05-22): self-dep guard. graphrefly rejects self-deps at
195    // registration (`SetDepsError::SelfDep`), so a healthy `Core::deps_of`
196    // never yields the owning node's own id. If a snapshot is hand-
197    // constructed pathologically OR an operator-internal NodeId is reused
198    // structurally as both owner and dep, computing `owner_path == abs_path`
199    // here would emit `""` — which `Graph::try_resolve("")` rejects with
200    // `PathError::Empty`, eventually surfacing as `UnresolvableDeps` from
201    // the decode retry loop. Catch it loudly at encode in debug builds.
202    debug_assert_ne!(
203        owner_path, abs_path,
204        "D276 invariant: self-deps are rejected at registration; \
205         encoding a dep whose absolute path equals the owner's would emit \
206         an empty relative path"
207    );
208    let owner_segs: Vec<&str> = if owner_path.is_empty() {
209        Vec::new()
210    } else {
211        owner_path.split(PATH_SEP).collect()
212    };
213    let abs_segs: Vec<&str> = if abs_path.is_empty() {
214        Vec::new()
215    } else {
216        abs_path.split(PATH_SEP).collect()
217    };
218    let mut common = 0;
219    while common < owner_segs.len()
220        && common < abs_segs.len()
221        && owner_segs[common] == abs_segs[common]
222    {
223        common += 1;
224    }
225    let up_count = owner_segs.len() - common;
226    let down_segs = &abs_segs[common..];
227    if up_count == 0 {
228        return down_segs.join(PATH_SEP);
229    }
230    let mut parts: Vec<&str> = vec![".."; up_count];
231    parts.extend(down_segs);
232    parts.join(PATH_SEP)
233}
234
235fn snapshot_of_with_tree_paths(
236    core: &dyn CoreFull,
237    inner_arc: &Rc<RefCell<GraphInner>>,
238    id_to_tree_path: &HashMap<NodeId, String>,
239    owner_path: &str,
240) -> GraphPersistSnapshot {
241    let (name, node_entries, children, id_to_name) = {
242        let inner = inner_arc.borrow_mut();
243        let name = inner.name.clone();
244        let node_entries: Vec<(String, NodeId)> =
245            inner.names.iter().map(|(n, &id)| (n.clone(), id)).collect();
246        let children: Vec<(String, Rc<RefCell<GraphInner>>)> = inner
247            .children
248            .iter()
249            .map(|(n, g)| (n.clone(), g.clone()))
250            .collect();
251        let id_to_name: IndexMap<NodeId, String> =
252            inner.names.iter().map(|(n, &id)| (id, n.clone())).collect();
253        (name, node_entries, children, id_to_name)
254    };
255
256    let mut nodes = IndexMap::new();
257
258    for (node_name, node_id) in &node_entries {
259        let kind = core.kind_of(*node_id);
260        let node_type = match kind {
261            Some(NodeKind::State) => "state",
262            Some(NodeKind::Derived) => "derived",
263            Some(NodeKind::Dynamic) => "dynamic",
264            Some(NodeKind::Producer) => "producer",
265            Some(NodeKind::Operator(_)) => "operator",
266            None => "unknown",
267        };
268
269        let cache = core.cache_of(*node_id);
270        let value = if cache == NO_HANDLE {
271            None
272        } else {
273            core.serialize_handle(cache)
274        };
275
276        let terminal = core.is_terminal(*node_id);
277        let status = match terminal {
278            Some(TerminalKind::Complete) => NodeSnapshotStatus::Completed,
279            Some(TerminalKind::Error(err_handle)) => NodeSnapshotStatus::Errored {
280                error: core.serialize_handle(err_handle),
281            },
282            None => {
283                if core.has_fired_once(*node_id) || cache != NO_HANDLE {
284                    NodeSnapshotStatus::Live
285                } else {
286                    NodeSnapshotStatus::Sentinel
287                }
288            }
289        };
290
291        // D276: dep-name encoding — 3 tiers, all owner-relative:
292        // (1) same-graph dep → bare local name (back-compat — pre-D276
293        //     snapshots used this shape exclusively).
294        // (2) cross-mount dep that IS named somewhere in the snapshot
295        //     tree → owner-relative path via [`PATH_SEP`] + `".."`
296        //     segments (resolves via `Graph::try_resolve` on decode).
297        // (3) anonymous dep (operator-internal NodeId with no name in
298        //     ANY graph) → `_anon_<rawid>` fallback (pre-D276 behavior,
299        //     unchanged; decode still fails with `UnresolvableDeps`
300        //     for these — not in M4.E1 scope).
301        //
302        // D301 B.b (Q4 sub-decision, 2026-05-26): persistence-vs-
303        // presentation distinction — snapshot encode KEEPS the
304        // `_anon_<rawid>` marker while describe (`describe.rs:229`,
305        // `graph.rs:1055`) converges to empty-string for TS parity.
306        // Rationale: the marker carries real consumer value at decode
307        // time. `SnapshotError::UnresolvableDeps` (snapshot.rs:84)
308        // formats as `"unresolvable deps for node `{0}` (deps: {1:?})"`
309        // — `{1:?}` is Debug-format of `Vec<String>`, preserving each
310        // `_anon_<rawid>` verbatim. Converging snapshot to `""` would
311        // degrade the diagnostic to `(deps: ["", ""])` — indistinguish-
312        // able collisions for multiple unresolvable anon deps.
313        // Describe is a presentation surface (cross-arm wire-shape
314        // parity matters more than per-NodeId disambiguation); snapshot
315        // is a persistence surface (decode-time diagnostic fidelity
316        // matters more than wire-shape parity to TS — which has its
317        // own analogous gap on the TS-snapshot side).
318        let dep_ids = core.deps_of(*node_id);
319        let deps: Vec<String> = dep_ids
320            .iter()
321            .map(|dep_id| {
322                if let Some(local_name) = id_to_name.get(dep_id) {
323                    local_name.clone()
324                } else if let Some(tree_path) = id_to_tree_path.get(dep_id) {
325                    absolute_to_owner_relative(owner_path, tree_path)
326                } else {
327                    format!("_anon_{}", dep_id.raw())
328                }
329            })
330            .collect();
331
332        nodes.insert(
333            node_name.clone(),
334            NodeSlice {
335                node_type: node_type.to_owned(),
336                value,
337                status,
338                deps,
339            },
340        );
341    }
342
343    let mut subgraphs = IndexMap::new();
344    for (child_name, child_inner) in children {
345        let child_owner_path = if owner_path.is_empty() {
346            child_name.clone()
347        } else {
348            format!("{owner_path}{PATH_SEP}{child_name}")
349        };
350        subgraphs.insert(
351            child_name,
352            snapshot_of_with_tree_paths(core, &child_inner, id_to_tree_path, &child_owner_path),
353        );
354    }
355
356    GraphPersistSnapshot {
357        name,
358        nodes,
359        subgraphs,
360    }
361}
362
363/// Recursive restore over `(&Core, &Rc<RefCell<GraphInner>>)`.
364fn restore_into(
365    core: &Core,
366    inner_arc: &Rc<RefCell<GraphInner>>,
367    snapshot: &GraphPersistSnapshot,
368) -> Result<(), SnapshotError> {
369    let graph_name = inner_arc.borrow_mut().name.clone();
370    if snapshot.name != graph_name {
371        return Err(SnapshotError::NameMismatch {
372            expected: snapshot.name.clone(),
373            actual: graph_name,
374        });
375    }
376
377    let binding = core.binding_ptr();
378
379    for (node_name, slice) in &snapshot.nodes {
380        let node_id = resolve_checked(inner_arc, node_name)
381            .ok()
382            .flatten()
383            .ok_or_else(|| SnapshotError::UnknownNode(node_name.clone()))?;
384
385        if slice.node_type == "state" {
386            if let Some(ref value) = slice.value {
387                let handle = binding.deserialize_value(value.clone());
388                core.emit(node_id, handle);
389            }
390        }
391
392        match &slice.status {
393            NodeSnapshotStatus::Completed => {
394                core.complete(node_id);
395            }
396            NodeSnapshotStatus::Errored { error } => {
397                if let Some(err_val) = error {
398                    let err_handle = binding.deserialize_value(err_val.clone());
399                    core.error(node_id, err_handle);
400                }
401            }
402            NodeSnapshotStatus::Sentinel | NodeSnapshotStatus::Live => {}
403        }
404    }
405
406    let child_pairs: Vec<(String, Rc<RefCell<GraphInner>>)> = {
407        let inner = inner_arc.borrow_mut();
408        snapshot
409            .subgraphs
410            .keys()
411            .map(|name| {
412                let child = inner
413                    .children
414                    .get(name)
415                    .ok_or_else(|| SnapshotError::UnknownSubgraph(name.clone()))?;
416                Ok((name.clone(), child.clone()))
417            })
418            .collect::<Result<Vec<_>, SnapshotError>>()?
419    };
420    for (child_name, child_inner) in child_pairs {
421        restore_into(core, &child_inner, &snapshot.subgraphs[&child_name])?;
422    }
423
424    Ok(())
425}
426
427impl Graph {
428    /// Serialize this graph's state into a portable snapshot.
429    ///
430    /// # Concurrent-mutation caveat (torn read; M4.E1 / D167)
431    ///
432    /// `snapshot()` is a **point-in-time best-effort capture**, not an
433    /// isolated read. The implementation holds the graph's inner lock
434    /// for the namespace walk (collect names + child mounts), then
435    /// drops it before per-node `core.cache_of` / `core.is_terminal`
436    /// queries. If another thread (or a re-entrant wave) mutates state
437    /// during the post-walk phase, the snapshot may capture a mix of
438    /// pre- and post-mutation values for different nodes — individual
439    /// node slices are internally consistent, but the cross-node
440    /// composition is not transaction-isolated.
441    ///
442    /// The TS impl has the same semantics. No user has requested
443    /// snapshot-level isolation. If you need a consistent cross-node
444    /// view, the supported pattern is:
445    ///
446    /// - Wrap the snapshot call in [`Core::batch`] (drains the wave
447    ///   before `snapshot()` returns; subsequent emissions wait).
448    /// - OR call `graph.signal(SignalKind::Pause(lock))` first, then
449    ///   `snapshot()`, then `Resume(lock)` — explicitly freezes the
450    ///   reactive layer for the duration.
451    ///
452    /// A future copy-on-write epoch / snapshot-under-lock would close
453    /// the torn-read window at the cost of holding the Core lock for
454    /// the full serialization walk; gated on D196 consumer-pressure
455    /// (no scenario today justifies the lock-contention trade).
456    #[must_use]
457    pub fn snapshot(&self, core: &Core) -> GraphPersistSnapshot {
458        snapshot_of(core, &self.inner)
459    }
460
461    /// [`Self::snapshot`] over the one object-safe facade (D246 rule 5)
462    /// — for the storage in-wave `MailboxOp::Defer(|cf: &dyn CoreFull|)`
463    /// path, which only has a `&dyn CoreFull` (not a concrete `&Core`).
464    /// Read-only; `serialize_handle` delegates to the binding.
465    ///
466    /// Inherits the same concurrent-mutation caveat as [`Self::snapshot`].
467    #[must_use]
468    pub fn snapshot_full(&self, core: &dyn CoreFull) -> GraphPersistSnapshot {
469        snapshot_of(core, &self.inner)
470    }
471
472    /// Restore state from a snapshot into this existing graph.
473    ///
474    /// # Errors
475    /// `NameMismatch` if names differ; `UnknownNode`/`UnknownSubgraph`
476    /// for snapshot entries absent from the graph.
477    pub fn restore(
478        &self,
479        core: &Core,
480        snapshot: &GraphPersistSnapshot,
481    ) -> Result<(), SnapshotError> {
482        restore_into(core, &self.inner, snapshot)
483    }
484
485    /// Reconstruct a graph from a snapshot. **Builder mode**
486    /// (`builder = Some`): build topology then `restore()` values.
487    /// **Auto-hydration** (`builder = None`): reconstruct topology +
488    /// state from the snapshot via `factories` (state nodes need none).
489    ///
490    /// D246: the embedder owns the `Core` (see
491    /// [`graphrefly_core::OwnedCore`]) and passes it in; the binding is
492    /// `core.binding_ptr()`.
493    ///
494    /// # Errors
495    /// `UnresolvableDeps` if auto-hydration can't resolve a node's
496    /// deps; `MissingFactory` for a non-state node type with no factory.
497    pub fn from_snapshot(
498        core: &Core,
499        snapshot: &GraphPersistSnapshot,
500        builder: Option<SnapshotBuilder>,
501        factories: Option<IndexMap<String, NodeFactory>>,
502    ) -> Result<Self, SnapshotError> {
503        let graph = Graph::new(&snapshot.name);
504        let binding: Arc<dyn BindingBoundary> = core.binding();
505
506        if let Some(build_fn) = builder {
507            build_fn(core, &graph);
508            graph.restore(core, snapshot)?;
509            return Ok(graph);
510        }
511
512        let factories = factories.unwrap_or_default();
513
514        // D276 tree-wide hydration — replaces the pre-D276 single-pass
515        // per-graph `hydrate_subgraph` / `hydrate_nodes` recursion.
516        // Four passes:
517        //
518        //   Pass 0 — mount tree: recursively `mount_new` every
519        //            subgraph; record `(absolute_path → Graph)` so
520        //            later passes can address each owning graph.
521        //   Pass 1 — state-first: walk tree creating ALL state nodes
522        //            (no deps to resolve). Each state node is
523        //            registered in its owner graph's `names`, so
524        //            subsequent passes can locate it via
525        //            [`Graph::try_resolve`].
526        //   Pass 2 — derived: collect ALL non-state nodes across the
527        //            tree into one queue. Run ONE shared retry loop;
528        //            dep names resolve via the owner graph's
529        //            `try_resolve`, which natively handles owner-
530        //            relative paths with `".."` and `"::"` segments
531        //            (Slice V3 cross-subgraph path machinery).
532        //   Pass 3 — status restore: walk tree, apply
533        //            `Completed` / `Errored` via the same
534        //            `try_resolve` lookup.
535        //
536        // Back-compat: snapshots produced by callers that never
537        // crossed a mount boundary use bare local names only; those
538        // resolve identically to the pre-D276 flat lookup (and
539        // identically to a one-segment `try_resolve` on the owner).
540        //
541        // See `~/src/graphrefly-ts/docs/rust-port-decisions.md` D276
542        // and the `docs/porting-deferred.md` M4.E1 closure block.
543
544        // Pass 0 — mount tree.
545        let mut graph_map: IndexMap<String, Graph> = IndexMap::new();
546        graph_map.insert(String::new(), graph.clone());
547        mount_subgraphs_recursive(core, &graph, snapshot, "", &mut graph_map)?;
548
549        // Pass 1 — state nodes tree-wide.
550        create_state_nodes_recursive(core, snapshot, "", &graph_map, &binding)?;
551
552        // Pass 2 — derived nodes tree-wide with shared retry loop.
553        let mut derived_queue: Vec<DerivedEntry> = Vec::new();
554        collect_derived_recursive(snapshot, "", &graph_map, &mut derived_queue);
555        create_derived_with_retry(core, &factories, derived_queue)?;
556
557        // Pass 3 — status restore.
558        apply_status_recursive(core, snapshot, "", &graph_map, &binding)?;
559
560        Ok(graph)
561    }
562}
563
564/// D276 auto-hydration: a non-state node awaiting derivation in
565/// Pass 2's shared retry loop. The owner graph is captured so the
566/// retry loop can call `owner_graph.try_resolve(dep_name)` against
567/// the right namespace (owner-relative paths walk via the owner's
568/// parent/child references in [`crate::graph::resolve_checked`]).
569struct DerivedEntry {
570    owner_graph: Graph,
571    name: String,
572    slice: NodeSlice,
573}
574
575/// D276 Pass 0 — mount every subgraph under `parent` and accumulate
576/// each subgraph's absolute path → [`Graph`] handle into `graph_map`.
577fn mount_subgraphs_recursive(
578    core: &Core,
579    parent: &Graph,
580    snap: &GraphPersistSnapshot,
581    parent_path: &str,
582    graph_map: &mut IndexMap<String, Graph>,
583) -> Result<(), SnapshotError> {
584    for (child_name, child_snap) in &snap.subgraphs {
585        let child_graph = parent
586            .mount_new(core, child_name)
587            .map_err(|_| SnapshotError::UnknownSubgraph(child_name.clone()))?;
588        let child_path = if parent_path.is_empty() {
589            child_name.clone()
590        } else {
591            format!("{parent_path}{PATH_SEP}{child_name}")
592        };
593        graph_map.insert(child_path.clone(), child_graph.clone());
594        mount_subgraphs_recursive(core, &child_graph, child_snap, &child_path, graph_map)?;
595    }
596    Ok(())
597}
598
599/// D276 Pass 1 — recursively create every state node in the tree
600/// (state nodes have no deps to resolve). Each new state node is
601/// registered in its owner graph's `names`; Pass 2/3 locate it via
602/// [`Graph::try_resolve`].
603fn create_state_nodes_recursive(
604    core: &Core,
605    snap: &GraphPersistSnapshot,
606    owner_path: &str,
607    graph_map: &IndexMap<String, Graph>,
608    binding: &Arc<dyn BindingBoundary>,
609) -> Result<(), SnapshotError> {
610    let owner_graph = graph_map
611        .get(owner_path)
612        .expect("D276 invariant: graph_map covers every subgraph mounted in Pass 0");
613
614    // D279 (2026-05-22, E-ii.1): pre-validate every state name against the
615    // owner graph's mount tree BEFORE any `register_state` call. Pre-D279,
616    // `Graph::state` called `core.register_state(...)` first (allocating a
617    // fresh NodeId + cache retention) THEN `add(name, ...)` — a name
618    // collision against a child mount populated by Pass 0 left an orphan
619    // NodeId in Core's registry with no path to teardown, and the surfaced
620    // error (`UnknownNode` via `map_err(|_| ...)`) discarded the real
621    // collision cause. Pre-validation closes both bugs at once: zero Core
622    // mutation on a doomed restore, and a dedicated `NameCollision`
623    // diagnostic.
624    let child_mount_names: std::collections::HashSet<String> =
625        owner_graph.child_names().into_iter().collect();
626    for (name, slice) in &snap.nodes {
627        if slice.node_type == "state" && child_mount_names.contains(name) {
628            return Err(SnapshotError::NameCollision {
629                name: name.clone(),
630                graph_path: owner_path.to_owned(),
631            });
632        }
633    }
634
635    for (name, slice) in &snap.nodes {
636        if slice.node_type == "state" {
637            let initial = slice
638                .value
639                .as_ref()
640                .map(|v| binding.deserialize_value(v.clone()));
641            owner_graph
642                .state(core, name, initial)
643                .map_err(|_| SnapshotError::UnknownNode(name.clone()))?;
644        }
645    }
646    for (child_name, child_snap) in &snap.subgraphs {
647        let child_path = if owner_path.is_empty() {
648            child_name.clone()
649        } else {
650            format!("{owner_path}{PATH_SEP}{child_name}")
651        };
652        create_state_nodes_recursive(core, child_snap, &child_path, graph_map, binding)?;
653    }
654    Ok(())
655}
656
657/// D276 Pass 2a — walk the entire tree collecting non-state nodes
658/// into one flat queue tagged with their owner graph + path. The
659/// queue is then run through [`create_derived_with_retry`] which
660/// can resolve deps that cross mount boundaries.
661fn collect_derived_recursive(
662    snap: &GraphPersistSnapshot,
663    owner_path: &str,
664    graph_map: &IndexMap<String, Graph>,
665    out: &mut Vec<DerivedEntry>,
666) {
667    let owner_graph = graph_map
668        .get(owner_path)
669        .expect("D276 invariant: graph_map covers every subgraph mounted in Pass 0");
670    for (name, slice) in &snap.nodes {
671        if slice.node_type != "state" {
672            out.push(DerivedEntry {
673                owner_graph: owner_graph.clone(),
674                name: name.clone(),
675                slice: slice.clone(),
676            });
677        }
678    }
679    for (child_name, child_snap) in &snap.subgraphs {
680        let child_path = if owner_path.is_empty() {
681            child_name.clone()
682        } else {
683            format!("{owner_path}{PATH_SEP}{child_name}")
684        };
685        collect_derived_recursive(child_snap, &child_path, graph_map, out);
686    }
687}
688
689/// D276 Pass 2b — tree-wide retry loop for derived/dynamic/operator
690/// nodes. Each iteration resolves an entry's deps via the owner
691/// graph's [`Graph::try_resolve`], which natively understands the
692/// owner-relative path syntax emitted by the D276 encoder:
693///
694/// - bare name → local lookup in owner's `names` (pre-D276 shape).
695/// - `"child::name"` → descend into a mounted subgraph.
696/// - `"..::name"` → walk to parent.
697/// - `"..::sibling::name"` → walk to parent then descend into a sibling.
698///
699/// Loop terminates when (a) all entries created — success — or
700/// (b) one pass made no progress — `UnresolvableDeps` on the first
701/// stuck entry.
702fn create_derived_with_retry(
703    core: &Core,
704    factories: &IndexMap<String, NodeFactory>,
705    entries: Vec<DerivedEntry>,
706) -> Result<(), SnapshotError> {
707    let mut remaining = entries;
708    loop {
709        let before = remaining.len();
710        let mut still_remaining = Vec::new();
711
712        for entry in remaining {
713            let mut resolved = Vec::with_capacity(entry.slice.deps.len());
714            let mut all_ok = true;
715            for dep_name in &entry.slice.deps {
716                if let Some(dep_id) = entry.owner_graph.try_resolve(dep_name) {
717                    resolved.push(dep_id);
718                } else {
719                    all_ok = false;
720                    break;
721                }
722            }
723            if all_ok {
724                let factory = factories.get(&entry.slice.node_type).ok_or_else(|| {
725                    SnapshotError::MissingFactory(entry.slice.node_type.clone(), entry.name.clone())
726                })?;
727                factory(
728                    core,
729                    &entry.owner_graph,
730                    &entry.name,
731                    &entry.slice,
732                    &resolved,
733                )?;
734            } else {
735                still_remaining.push(entry);
736            }
737        }
738
739        remaining = still_remaining;
740        if remaining.is_empty() {
741            break;
742        }
743        if remaining.len() == before {
744            let entry = &remaining[0];
745            return Err(SnapshotError::UnresolvableDeps(
746                entry.name.clone(),
747                entry.slice.deps.clone(),
748            ));
749        }
750    }
751    Ok(())
752}
753
754/// D276 Pass 3 — recursively apply each node's snapshot status
755/// (Completed / Errored) via the owner graph's [`Graph::try_resolve`].
756/// Sentinel / Live are no-ops (state nodes already received their
757/// cache during Pass 1's `Graph::state` initial-value path; derived
758/// recompute on first subscribe).
759fn apply_status_recursive(
760    core: &Core,
761    snap: &GraphPersistSnapshot,
762    owner_path: &str,
763    graph_map: &IndexMap<String, Graph>,
764    binding: &Arc<dyn BindingBoundary>,
765) -> Result<(), SnapshotError> {
766    let owner_graph = graph_map
767        .get(owner_path)
768        .expect("D276 invariant: graph_map covers every subgraph mounted in Pass 0");
769    for (name, slice) in &snap.nodes {
770        // /qa G2.1 (2026-05-22): mirror `restore_into:339` — a node listed
771        // in the snapshot that doesn't resolve via `try_resolve` is a
772        // structural inconsistency (Pass 1/Pass 2 should have created it).
773        // Silently dropping the status would mask a Pass-2 hydration bug
774        // as "successful restore with corrupted lifecycle state."
775        let node_id = owner_graph
776            .try_resolve(name)
777            .ok_or_else(|| SnapshotError::UnknownNode(name.clone()))?;
778        match &slice.status {
779            NodeSnapshotStatus::Completed => {
780                owner_graph.complete(core, node_id);
781            }
782            NodeSnapshotStatus::Errored { error } => {
783                if let Some(err_val) = error {
784                    let err_handle = binding.deserialize_value(err_val.clone());
785                    owner_graph.error(core, node_id, err_handle);
786                }
787            }
788            NodeSnapshotStatus::Sentinel | NodeSnapshotStatus::Live => {}
789        }
790    }
791    for (child_name, child_snap) in &snap.subgraphs {
792        let child_path = if owner_path.is_empty() {
793            child_name.clone()
794        } else {
795            format!("{owner_path}{PATH_SEP}{child_name}")
796        };
797        apply_status_recursive(core, child_snap, &child_path, graph_map, binding)?;
798    }
799    Ok(())
800}