graphrefly_graph/snapshot.rs
1//! `snapshot()` / `restore()` / `Graph::from_snapshot()` — portable
2//! serialization of graph state (M4.E1, R3.8).
3//!
//! D246: `snapshot`/`restore`/`from_snapshot` are inherent [`Graph`]
//! methods over the Core-free namespace tree, taking the embedder's
//! `&Core` explicitly (D246 rule 2). `snapshot_of` takes a
//! `&dyn CoreFull` trait object (the one facade) rather than a concrete
//! `&Core`, so the storage in-wave `MailboxOp::Defer` observe-sink can
//! run it (read-only; `serialize_handle` delegates to the binding). No
//! `SubgraphRef`/`GraphOps`/`SnapshotOps` — one `Graph`, plain free fns.
11//!
12//! # Handle-protocol boundary
13//!
14//! `snapshot()` calls `BindingBoundary::serialize_handle`; `restore()`
15//! / `from_snapshot()` call `BindingBoundary::deserialize_value`.
16//! Per D169 edges are omitted (derived from deps via `edges()`).
17
18use std::cell::RefCell;
19use std::collections::HashMap;
20use std::rc::Rc;
21use std::sync::Arc;
22
23use graphrefly_core::{BindingBoundary, Core, CoreFull, NodeId, NodeKind, TerminalKind, NO_HANDLE};
24use indexmap::IndexMap;
25use serde::{Deserialize, Serialize};
26
27use crate::graph::{resolve_checked, Graph, GraphInner, PATH_SEP};
28
/// Portable snapshot of a graph's state.
///
/// The structure is recursive: every mounted subgraph is captured as a
/// nested `GraphPersistSnapshot` under `subgraphs`. Edges are not stored;
/// each node carries its dependency names in [`NodeSlice::deps`] instead.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphPersistSnapshot {
    /// Graph name as set at construction / mount.
    pub name: String,
    /// Per-node state by local name, in namespace insertion order.
    pub nodes: IndexMap<String, NodeSlice>,
    /// Mounted subgraph snapshots, keyed by mount name.
    ///
    /// Omitted from the serialized form when empty, and defaulted to an
    /// empty map when the field is missing on deserialize.
    #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
    pub subgraphs: IndexMap<String, GraphPersistSnapshot>,
}
40
/// Per-node state within a snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeSlice {
    /// `"state"` / `"derived"` / `"dynamic"` / `"producer"` / `"operator"`.
    ///
    /// The encoder additionally writes `"unknown"` when the Core reports
    /// no kind for the node. Serialized under the key `type`.
    #[serde(rename = "type")]
    pub node_type: String,
    /// Serialized cache value. `None` when sentinel.
    ///
    /// Also `None` when the binding's `serialize_handle` returns `None`
    /// for a non-sentinel cache. Omitted from the serialized form when
    /// `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub value: Option<serde_json::Value>,
    /// Node lifecycle status.
    pub status: NodeSnapshotStatus,
    /// Dependency names in declaration order.
    ///
    /// Each entry is one of: a bare local name (same-graph dep), an
    /// owner-relative path using `"::"` and `".."` segments (cross-mount
    /// dep), or an `_anon_<rawid>` marker for a dep that has no name
    /// anywhere in the snapshot tree. Omitted when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub deps: Vec<String>,
}
56
/// Lifecycle status stored in a snapshot.
///
/// Variant names are serialized in lowercase (`sentinel`, `live`,
/// `completed`, `errored`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum NodeSnapshotStatus {
    /// Never emitted DATA.
    ///
    /// Written by the encoder for a non-terminal node that has not fired
    /// and whose cache is the sentinel handle.
    Sentinel,
    /// Has emitted at least one DATA.
    ///
    /// Written by the encoder for a non-terminal node that has fired at
    /// least once or holds a non-sentinel cache.
    Live,
    /// Terminal: COMPLETE.
    Completed,
    /// Terminal: ERROR (carries the serialized error value).
    Errored {
        /// Serialized error payload; `None` when the binding could not
        /// serialize the error handle. On restore, an `Errored` status
        /// with `error: None` is not re-applied to the node.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        error: Option<serde_json::Value>,
    },
}
73
/// Errors from [`Graph::restore`] and [`Graph::from_snapshot`].
#[derive(Debug, thiserror::Error)]
pub enum SnapshotError {
    /// The snapshot's `name` differs from the name of the graph being
    /// restored into. `expected` is the snapshot's name, `actual` the
    /// graph's.
    #[error("snapshot name `{expected}` does not match graph name `{actual}`")]
    NameMismatch { expected: String, actual: String },
    /// A node named in the snapshot does not resolve in the target graph.
    /// Also returned by auto-hydration when creating a state node fails,
    /// and by the status-restore pass when a node was never created.
    #[error("node `{0}` in snapshot not found in graph namespace")]
    UnknownNode(String),
    /// A subgraph named in the snapshot is not mounted on the target
    /// graph. Also returned by auto-hydration when mounting the subgraph
    /// fails.
    #[error("subgraph `{0}` in snapshot not found in graph mount tree")]
    UnknownSubgraph(String),
    /// Auto-hydration made a full pass over the pending non-state nodes
    /// without creating any; carries the first stuck node's name and its
    /// dep names exactly as stored in the snapshot.
    #[error("auto-hydration: unresolvable deps for node `{0}` (deps: {1:?})")]
    UnresolvableDeps(String, Vec<String>),
    /// Auto-hydration found no factory for a non-state node's type.
    /// Fields are `(node_type, node_name)`.
    #[error("auto-hydration: no factory registered for node type `{0}` (node `{1}`)")]
    MissingFactory(String, String),
    /// D279 (2026-05-22, E-ii.1): a state node in the snapshot collides
    /// with an existing child mount name on the owner graph at decode
    /// time. Raised by Pass 1's pre-validation BEFORE any Core mutation
    /// — prevents the orphan-`NodeId` leak that pre-D279 occurred when
    /// `Graph::state` registered a `NodeId` before the namespace `add`
    /// returned `NameError::Collision`. `graph_path` is the owner
    /// graph's tree-relative path (empty string for the root).
    #[error("snapshot decode: state node `{name}` at graph `{graph_path}` collides with an existing child mount of the same name")]
    NameCollision { name: String, graph_path: String },
}
97
/// Factory for auto-hydration mode. D246: receives the embedder's
/// `&Core` + the Core-free [`Graph`] handle.
///
/// Factories are looked up by the snapshot node's `node_type` string and
/// are invoked once all of that node's deps resolve. Arguments, in order:
/// the Core, the owner graph, the node's local name, its [`NodeSlice`],
/// and the resolved dep ids in the same order as `NodeSlice::deps`.
/// Returns the id of the node it created, or a [`SnapshotError`] that
/// aborts hydration.
pub type NodeFactory =
    Box<dyn Fn(&Core, &Graph, &str, &NodeSlice, &[NodeId]) -> Result<NodeId, SnapshotError>>;
102
/// Builder function for `Graph::from_snapshot` builder mode (D246:
/// `&Core` + Core-free [`Graph`]).
///
/// Called exactly once with a freshly constructed graph named after the
/// snapshot; it is expected to build the topology that the snapshot's
/// values are then restored into.
pub type SnapshotBuilder = Box<dyn FnOnce(&Core, &Graph)>;
106
107/// D246: recursive snapshot over `(&dyn CoreFull, &Rc<RefCell<GraphInner>>)`
108/// — `&dyn CoreFull` (the one facade) so the storage in-wave
109/// `MailboxOp::Defer` observe-sink can run it (read-only;
110/// `serialize_handle` delegates to the binding).
111///
112/// **D276 (cross-mount deps):** the encoder pre-computes a tree-wide
113/// `id_to_tree_path: HashMap<NodeId, String>` covering every named
114/// node in the snapshot tree. For each dep the encoder emits an
115/// **owner-relative path** using [`PATH_SEP`] (`"::"`) and `".."`
116/// segments — the same syntax accepted by [`Graph::try_resolve`]:
117///
118/// - same-graph dep → bare local name (back-compat with snapshots
119/// produced by callers that never crossed a mount boundary).
120/// - cross-mount dep down → `"child::name"` / `"child::nested::name"`.
121/// - cross-mount dep up → `"..::name"`, `"..::..::name"`, …
122/// - cross-mount sibling → `"..::sibling::name"`.
123///
124/// Pre-D276 the encoder fell through to `"_anon_<rawid>"` for any
125/// dep whose `NodeId` wasn't in the LOCAL graph's `names` map,
126/// destroying every cross-mount reference at serialization time.
127/// The new owner-relative encoding round-trips through
128/// `Graph::from_snapshot`'s tree-wide hydration; the decoder
129/// resolves dep names via [`Graph::try_resolve`] on the owner graph,
130/// reusing Slice V3's cross-subgraph path machinery.
131pub(crate) fn snapshot_of(
132 core: &dyn CoreFull,
133 inner_arc: &Rc<RefCell<GraphInner>>,
134) -> GraphPersistSnapshot {
135 let id_to_tree_path = build_id_to_tree_path(inner_arc);
136 snapshot_of_with_tree_paths(core, inner_arc, &id_to_tree_path, "")
137}
138
139/// D276: walk the entire mount tree under `root` and build a
140/// `NodeId → absolute_path` map. Absolute paths are **relative to
141/// the snapshot root** (no leading root name) and use [`PATH_SEP`]
142/// (`"::"`) — the same syntax accepted by [`Graph::try_resolve`].
143fn build_id_to_tree_path(root: &Rc<RefCell<GraphInner>>) -> HashMap<NodeId, String> {
144 let mut map = HashMap::new();
145 walk_tree_paths(root, "", &mut map);
146 map
147}
148
149fn walk_tree_paths(
150 inner_arc: &Rc<RefCell<GraphInner>>,
151 path_prefix: &str,
152 out: &mut HashMap<NodeId, String>,
153) {
154 let inner = inner_arc.borrow_mut();
155 let names: Vec<(String, NodeId)> = inner.names.iter().map(|(n, &id)| (n.clone(), id)).collect();
156 let children: Vec<(String, Rc<RefCell<GraphInner>>)> = inner
157 .children
158 .iter()
159 .map(|(n, g)| (n.clone(), g.clone()))
160 .collect();
161 drop(inner);
162 for (name, id) in &names {
163 let abs_path = if path_prefix.is_empty() {
164 name.clone()
165 } else {
166 format!("{path_prefix}{PATH_SEP}{name}")
167 };
168 out.insert(*id, abs_path);
169 }
170 for (child_name, child_inner) in &children {
171 let child_prefix = if path_prefix.is_empty() {
172 child_name.clone()
173 } else {
174 format!("{path_prefix}{PATH_SEP}{child_name}")
175 };
176 walk_tree_paths(child_inner, &child_prefix, out);
177 }
178}
179
180/// D276 helper — convert a dep's absolute path to a path relative
181/// to the owner graph. Same-graph deps collapse to a bare local
182/// name (back-compat); cross-mount deps use `".."`/`"name"`
183/// segments separated by [`PATH_SEP`].
184///
185/// Examples (`owner_path` → `abs_path` → result):
186///
187/// - `""` → `"a"` → `"a"` (root local)
188/// - `"child"` → `"child::b"` → `"b"` (same-graph local)
189/// - `""` → `"child::b"` → `"child::b"` (descend)
190/// - `"child"` → `"a"` → `"..::a"` (ascend to root)
191/// - `"child::nested"` → `"a"` → `"..::..::a"` (ascend two)
192/// - `"child::a"` → `"other::b"` → `"..::other::b"` (sibling)
193fn absolute_to_owner_relative(owner_path: &str, abs_path: &str) -> String {
194 // /qa G2.7 (2026-05-22): self-dep guard. graphrefly rejects self-deps at
195 // registration (`SetDepsError::SelfDep`), so a healthy `Core::deps_of`
196 // never yields the owning node's own id. If a snapshot is hand-
197 // constructed pathologically OR an operator-internal NodeId is reused
198 // structurally as both owner and dep, computing `owner_path == abs_path`
199 // here would emit `""` — which `Graph::try_resolve("")` rejects with
200 // `PathError::Empty`, eventually surfacing as `UnresolvableDeps` from
201 // the decode retry loop. Catch it loudly at encode in debug builds.
202 debug_assert_ne!(
203 owner_path, abs_path,
204 "D276 invariant: self-deps are rejected at registration; \
205 encoding a dep whose absolute path equals the owner's would emit \
206 an empty relative path"
207 );
208 let owner_segs: Vec<&str> = if owner_path.is_empty() {
209 Vec::new()
210 } else {
211 owner_path.split(PATH_SEP).collect()
212 };
213 let abs_segs: Vec<&str> = if abs_path.is_empty() {
214 Vec::new()
215 } else {
216 abs_path.split(PATH_SEP).collect()
217 };
218 let mut common = 0;
219 while common < owner_segs.len()
220 && common < abs_segs.len()
221 && owner_segs[common] == abs_segs[common]
222 {
223 common += 1;
224 }
225 let up_count = owner_segs.len() - common;
226 let down_segs = &abs_segs[common..];
227 if up_count == 0 {
228 return down_segs.join(PATH_SEP);
229 }
230 let mut parts: Vec<&str> = vec![".."; up_count];
231 parts.extend(down_segs);
232 parts.join(PATH_SEP)
233}
234
235fn snapshot_of_with_tree_paths(
236 core: &dyn CoreFull,
237 inner_arc: &Rc<RefCell<GraphInner>>,
238 id_to_tree_path: &HashMap<NodeId, String>,
239 owner_path: &str,
240) -> GraphPersistSnapshot {
241 let (name, node_entries, children, id_to_name) = {
242 let inner = inner_arc.borrow_mut();
243 let name = inner.name.clone();
244 let node_entries: Vec<(String, NodeId)> =
245 inner.names.iter().map(|(n, &id)| (n.clone(), id)).collect();
246 let children: Vec<(String, Rc<RefCell<GraphInner>>)> = inner
247 .children
248 .iter()
249 .map(|(n, g)| (n.clone(), g.clone()))
250 .collect();
251 let id_to_name: IndexMap<NodeId, String> =
252 inner.names.iter().map(|(n, &id)| (id, n.clone())).collect();
253 (name, node_entries, children, id_to_name)
254 };
255
256 let mut nodes = IndexMap::new();
257
258 for (node_name, node_id) in &node_entries {
259 let kind = core.kind_of(*node_id);
260 let node_type = match kind {
261 Some(NodeKind::State) => "state",
262 Some(NodeKind::Derived) => "derived",
263 Some(NodeKind::Dynamic) => "dynamic",
264 Some(NodeKind::Producer) => "producer",
265 Some(NodeKind::Operator(_)) => "operator",
266 None => "unknown",
267 };
268
269 let cache = core.cache_of(*node_id);
270 let value = if cache == NO_HANDLE {
271 None
272 } else {
273 core.serialize_handle(cache)
274 };
275
276 let terminal = core.is_terminal(*node_id);
277 let status = match terminal {
278 Some(TerminalKind::Complete) => NodeSnapshotStatus::Completed,
279 Some(TerminalKind::Error(err_handle)) => NodeSnapshotStatus::Errored {
280 error: core.serialize_handle(err_handle),
281 },
282 None => {
283 if core.has_fired_once(*node_id) || cache != NO_HANDLE {
284 NodeSnapshotStatus::Live
285 } else {
286 NodeSnapshotStatus::Sentinel
287 }
288 }
289 };
290
291 // D276: dep-name encoding — 3 tiers, all owner-relative:
292 // (1) same-graph dep → bare local name (back-compat — pre-D276
293 // snapshots used this shape exclusively).
294 // (2) cross-mount dep that IS named somewhere in the snapshot
295 // tree → owner-relative path via [`PATH_SEP`] + `".."`
296 // segments (resolves via `Graph::try_resolve` on decode).
297 // (3) anonymous dep (operator-internal NodeId with no name in
298 // ANY graph) → `_anon_<rawid>` fallback (pre-D276 behavior,
299 // unchanged; decode still fails with `UnresolvableDeps`
300 // for these — not in M4.E1 scope).
301 //
302 // D301 B.b (Q4 sub-decision, 2026-05-26): persistence-vs-
303 // presentation distinction — snapshot encode KEEPS the
304 // `_anon_<rawid>` marker while describe (`describe.rs:229`,
305 // `graph.rs:1055`) converges to empty-string for TS parity.
306 // Rationale: the marker carries real consumer value at decode
307 // time. `SnapshotError::UnresolvableDeps` (snapshot.rs:84)
308 // formats as `"unresolvable deps for node `{0}` (deps: {1:?})"`
309 // — `{1:?}` is Debug-format of `Vec<String>`, preserving each
310 // `_anon_<rawid>` verbatim. Converging snapshot to `""` would
311 // degrade the diagnostic to `(deps: ["", ""])` — indistinguish-
312 // able collisions for multiple unresolvable anon deps.
313 // Describe is a presentation surface (cross-arm wire-shape
314 // parity matters more than per-NodeId disambiguation); snapshot
315 // is a persistence surface (decode-time diagnostic fidelity
316 // matters more than wire-shape parity to TS — which has its
317 // own analogous gap on the TS-snapshot side).
318 let dep_ids = core.deps_of(*node_id);
319 let deps: Vec<String> = dep_ids
320 .iter()
321 .map(|dep_id| {
322 if let Some(local_name) = id_to_name.get(dep_id) {
323 local_name.clone()
324 } else if let Some(tree_path) = id_to_tree_path.get(dep_id) {
325 absolute_to_owner_relative(owner_path, tree_path)
326 } else {
327 format!("_anon_{}", dep_id.raw())
328 }
329 })
330 .collect();
331
332 nodes.insert(
333 node_name.clone(),
334 NodeSlice {
335 node_type: node_type.to_owned(),
336 value,
337 status,
338 deps,
339 },
340 );
341 }
342
343 let mut subgraphs = IndexMap::new();
344 for (child_name, child_inner) in children {
345 let child_owner_path = if owner_path.is_empty() {
346 child_name.clone()
347 } else {
348 format!("{owner_path}{PATH_SEP}{child_name}")
349 };
350 subgraphs.insert(
351 child_name,
352 snapshot_of_with_tree_paths(core, &child_inner, id_to_tree_path, &child_owner_path),
353 );
354 }
355
356 GraphPersistSnapshot {
357 name,
358 nodes,
359 subgraphs,
360 }
361}
362
363/// Recursive restore over `(&Core, &Rc<RefCell<GraphInner>>)`.
364fn restore_into(
365 core: &Core,
366 inner_arc: &Rc<RefCell<GraphInner>>,
367 snapshot: &GraphPersistSnapshot,
368) -> Result<(), SnapshotError> {
369 let graph_name = inner_arc.borrow_mut().name.clone();
370 if snapshot.name != graph_name {
371 return Err(SnapshotError::NameMismatch {
372 expected: snapshot.name.clone(),
373 actual: graph_name,
374 });
375 }
376
377 let binding = core.binding_ptr();
378
379 for (node_name, slice) in &snapshot.nodes {
380 let node_id = resolve_checked(inner_arc, node_name)
381 .ok()
382 .flatten()
383 .ok_or_else(|| SnapshotError::UnknownNode(node_name.clone()))?;
384
385 if slice.node_type == "state" {
386 if let Some(ref value) = slice.value {
387 let handle = binding.deserialize_value(value.clone());
388 core.emit(node_id, handle);
389 }
390 }
391
392 match &slice.status {
393 NodeSnapshotStatus::Completed => {
394 core.complete(node_id);
395 }
396 NodeSnapshotStatus::Errored { error } => {
397 if let Some(err_val) = error {
398 let err_handle = binding.deserialize_value(err_val.clone());
399 core.error(node_id, err_handle);
400 }
401 }
402 NodeSnapshotStatus::Sentinel | NodeSnapshotStatus::Live => {}
403 }
404 }
405
406 let child_pairs: Vec<(String, Rc<RefCell<GraphInner>>)> = {
407 let inner = inner_arc.borrow_mut();
408 snapshot
409 .subgraphs
410 .keys()
411 .map(|name| {
412 let child = inner
413 .children
414 .get(name)
415 .ok_or_else(|| SnapshotError::UnknownSubgraph(name.clone()))?;
416 Ok((name.clone(), child.clone()))
417 })
418 .collect::<Result<Vec<_>, SnapshotError>>()?
419 };
420 for (child_name, child_inner) in child_pairs {
421 restore_into(core, &child_inner, &snapshot.subgraphs[&child_name])?;
422 }
423
424 Ok(())
425}
426
427impl Graph {
428 /// Serialize this graph's state into a portable snapshot.
429 ///
430 /// # Concurrent-mutation caveat (torn read; M4.E1 / D167)
431 ///
432 /// `snapshot()` is a **point-in-time best-effort capture**, not an
433 /// isolated read. The implementation holds the graph's inner lock
434 /// for the namespace walk (collect names + child mounts), then
435 /// drops it before per-node `core.cache_of` / `core.is_terminal`
436 /// queries. If another thread (or a re-entrant wave) mutates state
437 /// during the post-walk phase, the snapshot may capture a mix of
438 /// pre- and post-mutation values for different nodes — individual
439 /// node slices are internally consistent, but the cross-node
440 /// composition is not transaction-isolated.
441 ///
442 /// The TS impl has the same semantics. No user has requested
443 /// snapshot-level isolation. If you need a consistent cross-node
444 /// view, the supported pattern is:
445 ///
446 /// - Wrap the snapshot call in [`Core::batch`] (drains the wave
447 /// before `snapshot()` returns; subsequent emissions wait).
448 /// - OR call `graph.signal(SignalKind::Pause(lock))` first, then
449 /// `snapshot()`, then `Resume(lock)` — explicitly freezes the
450 /// reactive layer for the duration.
451 ///
452 /// A future copy-on-write epoch / snapshot-under-lock would close
453 /// the torn-read window at the cost of holding the Core lock for
454 /// the full serialization walk; gated on D196 consumer-pressure
455 /// (no scenario today justifies the lock-contention trade).
456 #[must_use]
457 pub fn snapshot(&self, core: &Core) -> GraphPersistSnapshot {
458 snapshot_of(core, &self.inner)
459 }
460
461 /// [`Self::snapshot`] over the one object-safe facade (D246 rule 5)
462 /// — for the storage in-wave `MailboxOp::Defer(|cf: &dyn CoreFull|)`
463 /// path, which only has a `&dyn CoreFull` (not a concrete `&Core`).
464 /// Read-only; `serialize_handle` delegates to the binding.
465 ///
466 /// Inherits the same concurrent-mutation caveat as [`Self::snapshot`].
467 #[must_use]
468 pub fn snapshot_full(&self, core: &dyn CoreFull) -> GraphPersistSnapshot {
469 snapshot_of(core, &self.inner)
470 }
471
472 /// Restore state from a snapshot into this existing graph.
473 ///
474 /// # Errors
475 /// `NameMismatch` if names differ; `UnknownNode`/`UnknownSubgraph`
476 /// for snapshot entries absent from the graph.
477 pub fn restore(
478 &self,
479 core: &Core,
480 snapshot: &GraphPersistSnapshot,
481 ) -> Result<(), SnapshotError> {
482 restore_into(core, &self.inner, snapshot)
483 }
484
485 /// Reconstruct a graph from a snapshot. **Builder mode**
486 /// (`builder = Some`): build topology then `restore()` values.
487 /// **Auto-hydration** (`builder = None`): reconstruct topology +
488 /// state from the snapshot via `factories` (state nodes need none).
489 ///
490 /// D246: the embedder owns the `Core` (see
491 /// [`graphrefly_core::OwnedCore`]) and passes it in; the binding is
492 /// `core.binding_ptr()`.
493 ///
494 /// # Errors
495 /// `UnresolvableDeps` if auto-hydration can't resolve a node's
496 /// deps; `MissingFactory` for a non-state node type with no factory.
497 pub fn from_snapshot(
498 core: &Core,
499 snapshot: &GraphPersistSnapshot,
500 builder: Option<SnapshotBuilder>,
501 factories: Option<IndexMap<String, NodeFactory>>,
502 ) -> Result<Self, SnapshotError> {
503 let graph = Graph::new(&snapshot.name);
504 let binding: Arc<dyn BindingBoundary> = core.binding();
505
506 if let Some(build_fn) = builder {
507 build_fn(core, &graph);
508 graph.restore(core, snapshot)?;
509 return Ok(graph);
510 }
511
512 let factories = factories.unwrap_or_default();
513
514 // D276 tree-wide hydration — replaces the pre-D276 single-pass
515 // per-graph `hydrate_subgraph` / `hydrate_nodes` recursion.
516 // Four passes:
517 //
518 // Pass 0 — mount tree: recursively `mount_new` every
519 // subgraph; record `(absolute_path → Graph)` so
520 // later passes can address each owning graph.
521 // Pass 1 — state-first: walk tree creating ALL state nodes
522 // (no deps to resolve). Each state node is
523 // registered in its owner graph's `names`, so
524 // subsequent passes can locate it via
525 // [`Graph::try_resolve`].
526 // Pass 2 — derived: collect ALL non-state nodes across the
527 // tree into one queue. Run ONE shared retry loop;
528 // dep names resolve via the owner graph's
529 // `try_resolve`, which natively handles owner-
530 // relative paths with `".."` and `"::"` segments
531 // (Slice V3 cross-subgraph path machinery).
532 // Pass 3 — status restore: walk tree, apply
533 // `Completed` / `Errored` via the same
534 // `try_resolve` lookup.
535 //
536 // Back-compat: snapshots produced by callers that never
537 // crossed a mount boundary use bare local names only; those
538 // resolve identically to the pre-D276 flat lookup (and
539 // identically to a one-segment `try_resolve` on the owner).
540 //
541 // See `~/src/graphrefly-ts/docs/rust-port-decisions.md` D276
542 // and the `docs/porting-deferred.md` M4.E1 closure block.
543
544 // Pass 0 — mount tree.
545 let mut graph_map: IndexMap<String, Graph> = IndexMap::new();
546 graph_map.insert(String::new(), graph.clone());
547 mount_subgraphs_recursive(core, &graph, snapshot, "", &mut graph_map)?;
548
549 // Pass 1 — state nodes tree-wide.
550 create_state_nodes_recursive(core, snapshot, "", &graph_map, &binding)?;
551
552 // Pass 2 — derived nodes tree-wide with shared retry loop.
553 let mut derived_queue: Vec<DerivedEntry> = Vec::new();
554 collect_derived_recursive(snapshot, "", &graph_map, &mut derived_queue);
555 create_derived_with_retry(core, &factories, derived_queue)?;
556
557 // Pass 3 — status restore.
558 apply_status_recursive(core, snapshot, "", &graph_map, &binding)?;
559
560 Ok(graph)
561 }
562}
563
/// D276 auto-hydration: a non-state node awaiting derivation in
/// Pass 2's shared retry loop. The owner graph is captured so the
/// retry loop can call `owner_graph.try_resolve(dep_name)` against
/// the right namespace (owner-relative paths walk via the owner's
/// parent/child references in [`crate::graph::resolve_checked`]).
struct DerivedEntry {
    /// Graph the node belongs to; dep names are resolved against it and
    /// it is handed to the node's factory.
    owner_graph: Graph,
    /// The node's local name within `owner_graph`.
    name: String,
    /// The node's snapshot slice (type, deps, value, status), cloned out
    /// of the snapshot.
    slice: NodeSlice,
}
574
575/// D276 Pass 0 — mount every subgraph under `parent` and accumulate
576/// each subgraph's absolute path → [`Graph`] handle into `graph_map`.
577fn mount_subgraphs_recursive(
578 core: &Core,
579 parent: &Graph,
580 snap: &GraphPersistSnapshot,
581 parent_path: &str,
582 graph_map: &mut IndexMap<String, Graph>,
583) -> Result<(), SnapshotError> {
584 for (child_name, child_snap) in &snap.subgraphs {
585 let child_graph = parent
586 .mount_new(core, child_name)
587 .map_err(|_| SnapshotError::UnknownSubgraph(child_name.clone()))?;
588 let child_path = if parent_path.is_empty() {
589 child_name.clone()
590 } else {
591 format!("{parent_path}{PATH_SEP}{child_name}")
592 };
593 graph_map.insert(child_path.clone(), child_graph.clone());
594 mount_subgraphs_recursive(core, &child_graph, child_snap, &child_path, graph_map)?;
595 }
596 Ok(())
597}
598
599/// D276 Pass 1 — recursively create every state node in the tree
600/// (state nodes have no deps to resolve). Each new state node is
601/// registered in its owner graph's `names`; Pass 2/3 locate it via
602/// [`Graph::try_resolve`].
603fn create_state_nodes_recursive(
604 core: &Core,
605 snap: &GraphPersistSnapshot,
606 owner_path: &str,
607 graph_map: &IndexMap<String, Graph>,
608 binding: &Arc<dyn BindingBoundary>,
609) -> Result<(), SnapshotError> {
610 let owner_graph = graph_map
611 .get(owner_path)
612 .expect("D276 invariant: graph_map covers every subgraph mounted in Pass 0");
613
614 // D279 (2026-05-22, E-ii.1): pre-validate every state name against the
615 // owner graph's mount tree BEFORE any `register_state` call. Pre-D279,
616 // `Graph::state` called `core.register_state(...)` first (allocating a
617 // fresh NodeId + cache retention) THEN `add(name, ...)` — a name
618 // collision against a child mount populated by Pass 0 left an orphan
619 // NodeId in Core's registry with no path to teardown, and the surfaced
620 // error (`UnknownNode` via `map_err(|_| ...)`) discarded the real
621 // collision cause. Pre-validation closes both bugs at once: zero Core
622 // mutation on a doomed restore, and a dedicated `NameCollision`
623 // diagnostic.
624 let child_mount_names: std::collections::HashSet<String> =
625 owner_graph.child_names().into_iter().collect();
626 for (name, slice) in &snap.nodes {
627 if slice.node_type == "state" && child_mount_names.contains(name) {
628 return Err(SnapshotError::NameCollision {
629 name: name.clone(),
630 graph_path: owner_path.to_owned(),
631 });
632 }
633 }
634
635 for (name, slice) in &snap.nodes {
636 if slice.node_type == "state" {
637 let initial = slice
638 .value
639 .as_ref()
640 .map(|v| binding.deserialize_value(v.clone()));
641 owner_graph
642 .state(core, name, initial)
643 .map_err(|_| SnapshotError::UnknownNode(name.clone()))?;
644 }
645 }
646 for (child_name, child_snap) in &snap.subgraphs {
647 let child_path = if owner_path.is_empty() {
648 child_name.clone()
649 } else {
650 format!("{owner_path}{PATH_SEP}{child_name}")
651 };
652 create_state_nodes_recursive(core, child_snap, &child_path, graph_map, binding)?;
653 }
654 Ok(())
655}
656
657/// D276 Pass 2a — walk the entire tree collecting non-state nodes
658/// into one flat queue tagged with their owner graph + path. The
659/// queue is then run through [`create_derived_with_retry`] which
660/// can resolve deps that cross mount boundaries.
661fn collect_derived_recursive(
662 snap: &GraphPersistSnapshot,
663 owner_path: &str,
664 graph_map: &IndexMap<String, Graph>,
665 out: &mut Vec<DerivedEntry>,
666) {
667 let owner_graph = graph_map
668 .get(owner_path)
669 .expect("D276 invariant: graph_map covers every subgraph mounted in Pass 0");
670 for (name, slice) in &snap.nodes {
671 if slice.node_type != "state" {
672 out.push(DerivedEntry {
673 owner_graph: owner_graph.clone(),
674 name: name.clone(),
675 slice: slice.clone(),
676 });
677 }
678 }
679 for (child_name, child_snap) in &snap.subgraphs {
680 let child_path = if owner_path.is_empty() {
681 child_name.clone()
682 } else {
683 format!("{owner_path}{PATH_SEP}{child_name}")
684 };
685 collect_derived_recursive(child_snap, &child_path, graph_map, out);
686 }
687}
688
689/// D276 Pass 2b — tree-wide retry loop for derived/dynamic/operator
690/// nodes. Each iteration resolves an entry's deps via the owner
691/// graph's [`Graph::try_resolve`], which natively understands the
692/// owner-relative path syntax emitted by the D276 encoder:
693///
694/// - bare name → local lookup in owner's `names` (pre-D276 shape).
695/// - `"child::name"` → descend into a mounted subgraph.
696/// - `"..::name"` → walk to parent.
697/// - `"..::sibling::name"` → walk to parent then descend into a sibling.
698///
699/// Loop terminates when (a) all entries created — success — or
700/// (b) one pass made no progress — `UnresolvableDeps` on the first
701/// stuck entry.
702fn create_derived_with_retry(
703 core: &Core,
704 factories: &IndexMap<String, NodeFactory>,
705 entries: Vec<DerivedEntry>,
706) -> Result<(), SnapshotError> {
707 let mut remaining = entries;
708 loop {
709 let before = remaining.len();
710 let mut still_remaining = Vec::new();
711
712 for entry in remaining {
713 let mut resolved = Vec::with_capacity(entry.slice.deps.len());
714 let mut all_ok = true;
715 for dep_name in &entry.slice.deps {
716 if let Some(dep_id) = entry.owner_graph.try_resolve(dep_name) {
717 resolved.push(dep_id);
718 } else {
719 all_ok = false;
720 break;
721 }
722 }
723 if all_ok {
724 let factory = factories.get(&entry.slice.node_type).ok_or_else(|| {
725 SnapshotError::MissingFactory(entry.slice.node_type.clone(), entry.name.clone())
726 })?;
727 factory(
728 core,
729 &entry.owner_graph,
730 &entry.name,
731 &entry.slice,
732 &resolved,
733 )?;
734 } else {
735 still_remaining.push(entry);
736 }
737 }
738
739 remaining = still_remaining;
740 if remaining.is_empty() {
741 break;
742 }
743 if remaining.len() == before {
744 let entry = &remaining[0];
745 return Err(SnapshotError::UnresolvableDeps(
746 entry.name.clone(),
747 entry.slice.deps.clone(),
748 ));
749 }
750 }
751 Ok(())
752}
753
754/// D276 Pass 3 — recursively apply each node's snapshot status
755/// (Completed / Errored) via the owner graph's [`Graph::try_resolve`].
756/// Sentinel / Live are no-ops (state nodes already received their
757/// cache during Pass 1's `Graph::state` initial-value path; derived
758/// recompute on first subscribe).
759fn apply_status_recursive(
760 core: &Core,
761 snap: &GraphPersistSnapshot,
762 owner_path: &str,
763 graph_map: &IndexMap<String, Graph>,
764 binding: &Arc<dyn BindingBoundary>,
765) -> Result<(), SnapshotError> {
766 let owner_graph = graph_map
767 .get(owner_path)
768 .expect("D276 invariant: graph_map covers every subgraph mounted in Pass 0");
769 for (name, slice) in &snap.nodes {
770 // /qa G2.1 (2026-05-22): mirror `restore_into:339` — a node listed
771 // in the snapshot that doesn't resolve via `try_resolve` is a
772 // structural inconsistency (Pass 1/Pass 2 should have created it).
773 // Silently dropping the status would mask a Pass-2 hydration bug
774 // as "successful restore with corrupted lifecycle state."
775 let node_id = owner_graph
776 .try_resolve(name)
777 .ok_or_else(|| SnapshotError::UnknownNode(name.clone()))?;
778 match &slice.status {
779 NodeSnapshotStatus::Completed => {
780 owner_graph.complete(core, node_id);
781 }
782 NodeSnapshotStatus::Errored { error } => {
783 if let Some(err_val) = error {
784 let err_handle = binding.deserialize_value(err_val.clone());
785 owner_graph.error(core, node_id, err_handle);
786 }
787 }
788 NodeSnapshotStatus::Sentinel | NodeSnapshotStatus::Live => {}
789 }
790 }
791 for (child_name, child_snap) in &snap.subgraphs {
792 let child_path = if owner_path.is_empty() {
793 child_name.clone()
794 } else {
795 format!("{owner_path}{PATH_SEP}{child_name}")
796 };
797 apply_status_recursive(core, child_snap, &child_path, graph_map, binding)?;
798 }
799 Ok(())
800}