graphrefly_graph/snapshot.rs
1//! `snapshot()` / `restore()` / `Graph::from_snapshot()` — portable
2//! serialization of graph state (M4.E1, R3.8).
3//!
4//! D246: `snapshot`/`restore`/`from_snapshot` are inherent [`Graph`]
5//! methods over the Core-free namespace tree, taking the embedder's
6//! `&Core` explicitly (D246 rule 2). `snapshot_of` is generic over
7//! `&dyn CoreFull` (the one facade) so the storage in-wave
8//! `MailboxOp::Defer` observe-sink can run it (read-only;
9//! `serialize_handle` delegates to the binding). No `SubgraphRef`/
10//! `GraphOps`/`SnapshotOps` — one `Graph`, plain free fns.
11//!
12//! # Handle-protocol boundary
13//!
14//! `snapshot()` calls `BindingBoundary::serialize_handle`; `restore()`
15//! / `from_snapshot()` call `BindingBoundary::deserialize_value`.
16//! Per D169 edges are omitted (derived from deps via `edges()`).
17
18use std::cell::RefCell;
19use std::collections::HashMap;
20use std::rc::Rc;
21use std::sync::Arc;
22
23use graphrefly_core::{BindingBoundary, Core, CoreFull, NodeId, NodeKind, TerminalKind, NO_HANDLE};
24use indexmap::IndexMap;
25use serde::{Deserialize, Serialize};
26
27use crate::graph::{resolve_checked, Graph, GraphInner, PATH_SEP};
28
29/// Portable snapshot of a graph's state.
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct GraphPersistSnapshot {
32 /// Graph name as set at construction / mount.
33 pub name: String,
34 /// Per-node state by local name, in namespace insertion order.
35 pub nodes: IndexMap<String, NodeSlice>,
36 /// Mounted subgraph snapshots, keyed by mount name.
37 #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
38 pub subgraphs: IndexMap<String, GraphPersistSnapshot>,
39}
40
41/// Per-node state within a snapshot.
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct NodeSlice {
44 /// `"state"` / `"derived"` / `"dynamic"` / `"producer"` / `"operator"`.
45 #[serde(rename = "type")]
46 pub node_type: String,
47 /// Serialized cache value. `None` when sentinel.
48 #[serde(default, skip_serializing_if = "Option::is_none")]
49 pub value: Option<serde_json::Value>,
50 /// Node lifecycle status.
51 pub status: NodeSnapshotStatus,
52 /// Dependency names in declaration order.
53 #[serde(default, skip_serializing_if = "Vec::is_empty")]
54 pub deps: Vec<String>,
55}
56
57/// Lifecycle status stored in a snapshot.
58#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
59#[serde(rename_all = "lowercase")]
60pub enum NodeSnapshotStatus {
61 /// Never emitted DATA.
62 Sentinel,
63 /// Has emitted at least one DATA.
64 Live,
65 /// Terminal: COMPLETE.
66 Completed,
67 /// Terminal: ERROR (carries the serialized error value).
68 Errored {
69 #[serde(default, skip_serializing_if = "Option::is_none")]
70 error: Option<serde_json::Value>,
71 },
72}
73
74/// Errors from [`Graph::restore`] and [`Graph::from_snapshot`].
75#[derive(Debug, thiserror::Error)]
76pub enum SnapshotError {
77 #[error("snapshot name `{expected}` does not match graph name `{actual}`")]
78 NameMismatch { expected: String, actual: String },
79 #[error("node `{0}` in snapshot not found in graph namespace")]
80 UnknownNode(String),
81 #[error("subgraph `{0}` in snapshot not found in graph mount tree")]
82 UnknownSubgraph(String),
83 #[error("auto-hydration: unresolvable deps for node `{0}` (deps: {1:?})")]
84 UnresolvableDeps(String, Vec<String>),
85 #[error("auto-hydration: no factory registered for node type `{0}` (node `{1}`)")]
86 MissingFactory(String, String),
87 /// D279 (2026-05-22, E-ii.1): a state node in the snapshot collides
88 /// with an existing child mount name on the owner graph at decode
89 /// time. Raised by Pass 1's pre-validation BEFORE any Core mutation
90 /// — prevents the orphan-`NodeId` leak that pre-D279 occurred when
91 /// `Graph::state` registered a `NodeId` before the namespace `add`
92 /// returned `NameError::Collision`. `graph_path` is the owner
93 /// graph's tree-relative path (empty string for the root).
94 #[error("snapshot decode: state node `{name}` at graph `{graph_path}` collides with an existing child mount of the same name")]
95 NameCollision { name: String, graph_path: String },
96}
97
98/// Factory for auto-hydration mode. D246: receives the embedder's
99/// `&Core` + the Core-free [`Graph`] handle.
100pub type NodeFactory =
101 Box<dyn Fn(&Core, &Graph, &str, &NodeSlice, &[NodeId]) -> Result<NodeId, SnapshotError>>;
102
103/// Builder function for `Graph::from_snapshot` builder mode (D246:
104/// `&Core` + Core-free [`Graph`]).
105pub type SnapshotBuilder = Box<dyn FnOnce(&Core, &Graph)>;
106
107/// D246: recursive snapshot over `(&dyn CoreFull, &Rc<RefCell<GraphInner>>)`
108/// — `&dyn CoreFull` (the one facade) so the storage in-wave
109/// `MailboxOp::Defer` observe-sink can run it (read-only;
110/// `serialize_handle` delegates to the binding).
111///
112/// **D276 (cross-mount deps):** the encoder pre-computes a tree-wide
113/// `id_to_tree_path: HashMap<NodeId, String>` covering every named
114/// node in the snapshot tree. For each dep the encoder emits an
115/// **owner-relative path** using [`PATH_SEP`] (`"::"`) and `".."`
116/// segments — the same syntax accepted by [`Graph::try_resolve`]:
117///
118/// - same-graph dep → bare local name (back-compat with snapshots
119/// produced by callers that never crossed a mount boundary).
120/// - cross-mount dep down → `"child::name"` / `"child::nested::name"`.
121/// - cross-mount dep up → `"..::name"`, `"..::..::name"`, …
122/// - cross-mount sibling → `"..::sibling::name"`.
123///
124/// Pre-D276 the encoder fell through to `"_anon_<rawid>"` for any
125/// dep whose `NodeId` wasn't in the LOCAL graph's `names` map,
126/// destroying every cross-mount reference at serialization time.
127/// The new owner-relative encoding round-trips through
128/// `Graph::from_snapshot`'s tree-wide hydration; the decoder
129/// resolves dep names via [`Graph::try_resolve`] on the owner graph,
130/// reusing Slice V3's cross-subgraph path machinery.
131pub(crate) fn snapshot_of(
132 core: &dyn CoreFull,
133 inner_arc: &Rc<RefCell<GraphInner>>,
134) -> GraphPersistSnapshot {
135 let id_to_tree_path = build_id_to_tree_path(inner_arc);
136 snapshot_of_with_tree_paths(core, inner_arc, &id_to_tree_path, "")
137}
138
139/// D276: walk the entire mount tree under `root` and build a
140/// `NodeId → absolute_path` map. Absolute paths are **relative to
141/// the snapshot root** (no leading root name) and use [`PATH_SEP`]
142/// (`"::"`) — the same syntax accepted by [`Graph::try_resolve`].
143fn build_id_to_tree_path(root: &Rc<RefCell<GraphInner>>) -> HashMap<NodeId, String> {
144 let mut map = HashMap::new();
145 walk_tree_paths(root, "", &mut map);
146 map
147}
148
149fn walk_tree_paths(
150 inner_arc: &Rc<RefCell<GraphInner>>,
151 path_prefix: &str,
152 out: &mut HashMap<NodeId, String>,
153) {
154 let inner = inner_arc.borrow_mut();
155 let names: Vec<(String, NodeId)> = inner.names.iter().map(|(n, &id)| (n.clone(), id)).collect();
156 let children: Vec<(String, Rc<RefCell<GraphInner>>)> = inner
157 .children
158 .iter()
159 .map(|(n, g)| (n.clone(), g.clone()))
160 .collect();
161 drop(inner);
162 for (name, id) in &names {
163 let abs_path = if path_prefix.is_empty() {
164 name.clone()
165 } else {
166 format!("{path_prefix}{PATH_SEP}{name}")
167 };
168 out.insert(*id, abs_path);
169 }
170 for (child_name, child_inner) in &children {
171 let child_prefix = if path_prefix.is_empty() {
172 child_name.clone()
173 } else {
174 format!("{path_prefix}{PATH_SEP}{child_name}")
175 };
176 walk_tree_paths(child_inner, &child_prefix, out);
177 }
178}
179
180/// D276 helper — convert a dep's absolute path to a path relative
181/// to the owner graph. Same-graph deps collapse to a bare local
182/// name (back-compat); cross-mount deps use `".."`/`"name"`
183/// segments separated by [`PATH_SEP`].
184///
185/// Examples (`owner_path` → `abs_path` → result):
186///
187/// - `""` → `"a"` → `"a"` (root local)
188/// - `"child"` → `"child::b"` → `"b"` (same-graph local)
189/// - `""` → `"child::b"` → `"child::b"` (descend)
190/// - `"child"` → `"a"` → `"..::a"` (ascend to root)
191/// - `"child::nested"` → `"a"` → `"..::..::a"` (ascend two)
192/// - `"child::a"` → `"other::b"` → `"..::other::b"` (sibling)
193fn absolute_to_owner_relative(owner_path: &str, abs_path: &str) -> String {
194 // /qa G2.7 (2026-05-22): self-dep guard. graphrefly rejects self-deps at
195 // registration (`SetDepsError::SelfDep`), so a healthy `Core::deps_of`
196 // never yields the owning node's own id. If a snapshot is hand-
197 // constructed pathologically OR an operator-internal NodeId is reused
198 // structurally as both owner and dep, computing `owner_path == abs_path`
199 // here would emit `""` — which `Graph::try_resolve("")` rejects with
200 // `PathError::Empty`, eventually surfacing as `UnresolvableDeps` from
201 // the decode retry loop. Catch it loudly at encode in debug builds.
202 debug_assert_ne!(
203 owner_path, abs_path,
204 "D276 invariant: self-deps are rejected at registration; \
205 encoding a dep whose absolute path equals the owner's would emit \
206 an empty relative path"
207 );
208 let owner_segs: Vec<&str> = if owner_path.is_empty() {
209 Vec::new()
210 } else {
211 owner_path.split(PATH_SEP).collect()
212 };
213 let abs_segs: Vec<&str> = if abs_path.is_empty() {
214 Vec::new()
215 } else {
216 abs_path.split(PATH_SEP).collect()
217 };
218 let mut common = 0;
219 while common < owner_segs.len()
220 && common < abs_segs.len()
221 && owner_segs[common] == abs_segs[common]
222 {
223 common += 1;
224 }
225 let up_count = owner_segs.len() - common;
226 let down_segs = &abs_segs[common..];
227 if up_count == 0 {
228 return down_segs.join(PATH_SEP);
229 }
230 let mut parts: Vec<&str> = vec![".."; up_count];
231 parts.extend(down_segs);
232 parts.join(PATH_SEP)
233}
234
235fn snapshot_of_with_tree_paths(
236 core: &dyn CoreFull,
237 inner_arc: &Rc<RefCell<GraphInner>>,
238 id_to_tree_path: &HashMap<NodeId, String>,
239 owner_path: &str,
240) -> GraphPersistSnapshot {
241 let (name, node_entries, children, id_to_name) = {
242 let inner = inner_arc.borrow_mut();
243 let name = inner.name.clone();
244 let node_entries: Vec<(String, NodeId)> =
245 inner.names.iter().map(|(n, &id)| (n.clone(), id)).collect();
246 let children: Vec<(String, Rc<RefCell<GraphInner>>)> = inner
247 .children
248 .iter()
249 .map(|(n, g)| (n.clone(), g.clone()))
250 .collect();
251 let id_to_name: IndexMap<NodeId, String> =
252 inner.names.iter().map(|(n, &id)| (id, n.clone())).collect();
253 (name, node_entries, children, id_to_name)
254 };
255
256 let mut nodes = IndexMap::new();
257
258 for (node_name, node_id) in &node_entries {
259 let kind = core.kind_of(*node_id);
260 let node_type = match kind {
261 Some(NodeKind::State) => "state",
262 Some(NodeKind::Derived) => "derived",
263 Some(NodeKind::Dynamic) => "dynamic",
264 Some(NodeKind::Producer) => "producer",
265 Some(NodeKind::Operator(_)) => "operator",
266 None => "unknown",
267 };
268
269 let cache = core.cache_of(*node_id);
270 let value = if cache == NO_HANDLE {
271 None
272 } else {
273 core.serialize_handle(cache)
274 };
275
276 let terminal = core.is_terminal(*node_id);
277 let status = match terminal {
278 Some(TerminalKind::Complete) => NodeSnapshotStatus::Completed,
279 Some(TerminalKind::Error(err_handle)) => NodeSnapshotStatus::Errored {
280 error: core.serialize_handle(err_handle),
281 },
282 None => {
283 if core.has_fired_once(*node_id) || cache != NO_HANDLE {
284 NodeSnapshotStatus::Live
285 } else {
286 NodeSnapshotStatus::Sentinel
287 }
288 }
289 };
290
291 // D276: dep-name encoding — 3 tiers, all owner-relative:
292 // (1) same-graph dep → bare local name (back-compat — pre-D276
293 // snapshots used this shape exclusively).
294 // (2) cross-mount dep that IS named somewhere in the snapshot
295 // tree → owner-relative path via [`PATH_SEP`] + `".."`
296 // segments (resolves via `Graph::try_resolve` on decode).
297 // (3) anonymous dep (operator-internal NodeId with no name in
298 // ANY graph) → `_anon_<rawid>` fallback (pre-D276 behavior,
299 // unchanged; decode still fails with `UnresolvableDeps`
300 // for these — not in M4.E1 scope).
301 let dep_ids = core.deps_of(*node_id);
302 let deps: Vec<String> = dep_ids
303 .iter()
304 .map(|dep_id| {
305 if let Some(local_name) = id_to_name.get(dep_id) {
306 local_name.clone()
307 } else if let Some(tree_path) = id_to_tree_path.get(dep_id) {
308 absolute_to_owner_relative(owner_path, tree_path)
309 } else {
310 format!("_anon_{}", dep_id.raw())
311 }
312 })
313 .collect();
314
315 nodes.insert(
316 node_name.clone(),
317 NodeSlice {
318 node_type: node_type.to_owned(),
319 value,
320 status,
321 deps,
322 },
323 );
324 }
325
326 let mut subgraphs = IndexMap::new();
327 for (child_name, child_inner) in children {
328 let child_owner_path = if owner_path.is_empty() {
329 child_name.clone()
330 } else {
331 format!("{owner_path}{PATH_SEP}{child_name}")
332 };
333 subgraphs.insert(
334 child_name,
335 snapshot_of_with_tree_paths(core, &child_inner, id_to_tree_path, &child_owner_path),
336 );
337 }
338
339 GraphPersistSnapshot {
340 name,
341 nodes,
342 subgraphs,
343 }
344}
345
346/// Recursive restore over `(&Core, &Rc<RefCell<GraphInner>>)`.
347fn restore_into(
348 core: &Core,
349 inner_arc: &Rc<RefCell<GraphInner>>,
350 snapshot: &GraphPersistSnapshot,
351) -> Result<(), SnapshotError> {
352 let graph_name = inner_arc.borrow_mut().name.clone();
353 if snapshot.name != graph_name {
354 return Err(SnapshotError::NameMismatch {
355 expected: snapshot.name.clone(),
356 actual: graph_name,
357 });
358 }
359
360 let binding = core.binding_ptr();
361
362 for (node_name, slice) in &snapshot.nodes {
363 let node_id = resolve_checked(inner_arc, node_name)
364 .ok()
365 .flatten()
366 .ok_or_else(|| SnapshotError::UnknownNode(node_name.clone()))?;
367
368 if slice.node_type == "state" {
369 if let Some(ref value) = slice.value {
370 let handle = binding.deserialize_value(value.clone());
371 core.emit(node_id, handle);
372 }
373 }
374
375 match &slice.status {
376 NodeSnapshotStatus::Completed => {
377 core.complete(node_id);
378 }
379 NodeSnapshotStatus::Errored { error } => {
380 if let Some(err_val) = error {
381 let err_handle = binding.deserialize_value(err_val.clone());
382 core.error(node_id, err_handle);
383 }
384 }
385 NodeSnapshotStatus::Sentinel | NodeSnapshotStatus::Live => {}
386 }
387 }
388
389 let child_pairs: Vec<(String, Rc<RefCell<GraphInner>>)> = {
390 let inner = inner_arc.borrow_mut();
391 snapshot
392 .subgraphs
393 .keys()
394 .map(|name| {
395 let child = inner
396 .children
397 .get(name)
398 .ok_or_else(|| SnapshotError::UnknownSubgraph(name.clone()))?;
399 Ok((name.clone(), child.clone()))
400 })
401 .collect::<Result<Vec<_>, SnapshotError>>()?
402 };
403 for (child_name, child_inner) in child_pairs {
404 restore_into(core, &child_inner, &snapshot.subgraphs[&child_name])?;
405 }
406
407 Ok(())
408}
409
410impl Graph {
411 /// Serialize this graph's state into a portable snapshot.
412 ///
413 /// # Concurrent-mutation caveat (torn read; M4.E1 / D167)
414 ///
415 /// `snapshot()` is a **point-in-time best-effort capture**, not an
416 /// isolated read. The implementation holds the graph's inner lock
417 /// for the namespace walk (collect names + child mounts), then
418 /// drops it before per-node `core.cache_of` / `core.is_terminal`
419 /// queries. If another thread (or a re-entrant wave) mutates state
420 /// during the post-walk phase, the snapshot may capture a mix of
421 /// pre- and post-mutation values for different nodes — individual
422 /// node slices are internally consistent, but the cross-node
423 /// composition is not transaction-isolated.
424 ///
425 /// The TS impl has the same semantics. No user has requested
426 /// snapshot-level isolation. If you need a consistent cross-node
427 /// view, the supported pattern is:
428 ///
429 /// - Wrap the snapshot call in [`Core::batch`] (drains the wave
430 /// before `snapshot()` returns; subsequent emissions wait).
431 /// - OR call `graph.signal(SignalKind::Pause(lock))` first, then
432 /// `snapshot()`, then `Resume(lock)` — explicitly freezes the
433 /// reactive layer for the duration.
434 ///
435 /// A future copy-on-write epoch / snapshot-under-lock would close
436 /// the torn-read window at the cost of holding the Core lock for
437 /// the full serialization walk; gated on D196 consumer-pressure
438 /// (no scenario today justifies the lock-contention trade).
439 #[must_use]
440 pub fn snapshot(&self, core: &Core) -> GraphPersistSnapshot {
441 snapshot_of(core, &self.inner)
442 }
443
444 /// [`Self::snapshot`] over the one object-safe facade (D246 rule 5)
445 /// — for the storage in-wave `MailboxOp::Defer(|cf: &dyn CoreFull|)`
446 /// path, which only has a `&dyn CoreFull` (not a concrete `&Core`).
447 /// Read-only; `serialize_handle` delegates to the binding.
448 ///
449 /// Inherits the same concurrent-mutation caveat as [`Self::snapshot`].
450 #[must_use]
451 pub fn snapshot_full(&self, core: &dyn CoreFull) -> GraphPersistSnapshot {
452 snapshot_of(core, &self.inner)
453 }
454
455 /// Restore state from a snapshot into this existing graph.
456 ///
457 /// # Errors
458 /// `NameMismatch` if names differ; `UnknownNode`/`UnknownSubgraph`
459 /// for snapshot entries absent from the graph.
460 pub fn restore(
461 &self,
462 core: &Core,
463 snapshot: &GraphPersistSnapshot,
464 ) -> Result<(), SnapshotError> {
465 restore_into(core, &self.inner, snapshot)
466 }
467
468 /// Reconstruct a graph from a snapshot. **Builder mode**
469 /// (`builder = Some`): build topology then `restore()` values.
470 /// **Auto-hydration** (`builder = None`): reconstruct topology +
471 /// state from the snapshot via `factories` (state nodes need none).
472 ///
473 /// D246: the embedder owns the `Core` (see
474 /// [`graphrefly_core::OwnedCore`]) and passes it in; the binding is
475 /// `core.binding_ptr()`.
476 ///
477 /// # Errors
478 /// `UnresolvableDeps` if auto-hydration can't resolve a node's
479 /// deps; `MissingFactory` for a non-state node type with no factory.
480 pub fn from_snapshot(
481 core: &Core,
482 snapshot: &GraphPersistSnapshot,
483 builder: Option<SnapshotBuilder>,
484 factories: Option<IndexMap<String, NodeFactory>>,
485 ) -> Result<Self, SnapshotError> {
486 let graph = Graph::new(&snapshot.name);
487 let binding: Arc<dyn BindingBoundary> = core.binding();
488
489 if let Some(build_fn) = builder {
490 build_fn(core, &graph);
491 graph.restore(core, snapshot)?;
492 return Ok(graph);
493 }
494
495 let factories = factories.unwrap_or_default();
496
497 // D276 tree-wide hydration — replaces the pre-D276 single-pass
498 // per-graph `hydrate_subgraph` / `hydrate_nodes` recursion.
499 // Four passes:
500 //
501 // Pass 0 — mount tree: recursively `mount_new` every
502 // subgraph; record `(absolute_path → Graph)` so
503 // later passes can address each owning graph.
504 // Pass 1 — state-first: walk tree creating ALL state nodes
505 // (no deps to resolve). Each state node is
506 // registered in its owner graph's `names`, so
507 // subsequent passes can locate it via
508 // [`Graph::try_resolve`].
509 // Pass 2 — derived: collect ALL non-state nodes across the
510 // tree into one queue. Run ONE shared retry loop;
511 // dep names resolve via the owner graph's
512 // `try_resolve`, which natively handles owner-
513 // relative paths with `".."` and `"::"` segments
514 // (Slice V3 cross-subgraph path machinery).
515 // Pass 3 — status restore: walk tree, apply
516 // `Completed` / `Errored` via the same
517 // `try_resolve` lookup.
518 //
519 // Back-compat: snapshots produced by callers that never
520 // crossed a mount boundary use bare local names only; those
521 // resolve identically to the pre-D276 flat lookup (and
522 // identically to a one-segment `try_resolve` on the owner).
523 //
524 // See `~/src/graphrefly-ts/docs/rust-port-decisions.md` D276
525 // and the `docs/porting-deferred.md` M4.E1 closure block.
526
527 // Pass 0 — mount tree.
528 let mut graph_map: IndexMap<String, Graph> = IndexMap::new();
529 graph_map.insert(String::new(), graph.clone());
530 mount_subgraphs_recursive(core, &graph, snapshot, "", &mut graph_map)?;
531
532 // Pass 1 — state nodes tree-wide.
533 create_state_nodes_recursive(core, snapshot, "", &graph_map, &binding)?;
534
535 // Pass 2 — derived nodes tree-wide with shared retry loop.
536 let mut derived_queue: Vec<DerivedEntry> = Vec::new();
537 collect_derived_recursive(snapshot, "", &graph_map, &mut derived_queue);
538 create_derived_with_retry(core, &factories, derived_queue)?;
539
540 // Pass 3 — status restore.
541 apply_status_recursive(core, snapshot, "", &graph_map, &binding)?;
542
543 Ok(graph)
544 }
545}
546
547/// D276 auto-hydration: a non-state node awaiting derivation in
548/// Pass 2's shared retry loop. The owner graph is captured so the
549/// retry loop can call `owner_graph.try_resolve(dep_name)` against
550/// the right namespace (owner-relative paths walk via the owner's
551/// parent/child references in [`crate::graph::resolve_checked`]).
552struct DerivedEntry {
553 owner_graph: Graph,
554 name: String,
555 slice: NodeSlice,
556}
557
558/// D276 Pass 0 — mount every subgraph under `parent` and accumulate
559/// each subgraph's absolute path → [`Graph`] handle into `graph_map`.
560fn mount_subgraphs_recursive(
561 core: &Core,
562 parent: &Graph,
563 snap: &GraphPersistSnapshot,
564 parent_path: &str,
565 graph_map: &mut IndexMap<String, Graph>,
566) -> Result<(), SnapshotError> {
567 for (child_name, child_snap) in &snap.subgraphs {
568 let child_graph = parent
569 .mount_new(core, child_name)
570 .map_err(|_| SnapshotError::UnknownSubgraph(child_name.clone()))?;
571 let child_path = if parent_path.is_empty() {
572 child_name.clone()
573 } else {
574 format!("{parent_path}{PATH_SEP}{child_name}")
575 };
576 graph_map.insert(child_path.clone(), child_graph.clone());
577 mount_subgraphs_recursive(core, &child_graph, child_snap, &child_path, graph_map)?;
578 }
579 Ok(())
580}
581
582/// D276 Pass 1 — recursively create every state node in the tree
583/// (state nodes have no deps to resolve). Each new state node is
584/// registered in its owner graph's `names`; Pass 2/3 locate it via
585/// [`Graph::try_resolve`].
586fn create_state_nodes_recursive(
587 core: &Core,
588 snap: &GraphPersistSnapshot,
589 owner_path: &str,
590 graph_map: &IndexMap<String, Graph>,
591 binding: &Arc<dyn BindingBoundary>,
592) -> Result<(), SnapshotError> {
593 let owner_graph = graph_map
594 .get(owner_path)
595 .expect("D276 invariant: graph_map covers every subgraph mounted in Pass 0");
596
597 // D279 (2026-05-22, E-ii.1): pre-validate every state name against the
598 // owner graph's mount tree BEFORE any `register_state` call. Pre-D279,
599 // `Graph::state` called `core.register_state(...)` first (allocating a
600 // fresh NodeId + cache retention) THEN `add(name, ...)` — a name
601 // collision against a child mount populated by Pass 0 left an orphan
602 // NodeId in Core's registry with no path to teardown, and the surfaced
603 // error (`UnknownNode` via `map_err(|_| ...)`) discarded the real
604 // collision cause. Pre-validation closes both bugs at once: zero Core
605 // mutation on a doomed restore, and a dedicated `NameCollision`
606 // diagnostic.
607 let child_mount_names: std::collections::HashSet<String> =
608 owner_graph.child_names().into_iter().collect();
609 for (name, slice) in &snap.nodes {
610 if slice.node_type == "state" && child_mount_names.contains(name) {
611 return Err(SnapshotError::NameCollision {
612 name: name.clone(),
613 graph_path: owner_path.to_owned(),
614 });
615 }
616 }
617
618 for (name, slice) in &snap.nodes {
619 if slice.node_type == "state" {
620 let initial = slice
621 .value
622 .as_ref()
623 .map(|v| binding.deserialize_value(v.clone()));
624 owner_graph
625 .state(core, name, initial)
626 .map_err(|_| SnapshotError::UnknownNode(name.clone()))?;
627 }
628 }
629 for (child_name, child_snap) in &snap.subgraphs {
630 let child_path = if owner_path.is_empty() {
631 child_name.clone()
632 } else {
633 format!("{owner_path}{PATH_SEP}{child_name}")
634 };
635 create_state_nodes_recursive(core, child_snap, &child_path, graph_map, binding)?;
636 }
637 Ok(())
638}
639
640/// D276 Pass 2a — walk the entire tree collecting non-state nodes
641/// into one flat queue tagged with their owner graph + path. The
642/// queue is then run through [`create_derived_with_retry`] which
643/// can resolve deps that cross mount boundaries.
644fn collect_derived_recursive(
645 snap: &GraphPersistSnapshot,
646 owner_path: &str,
647 graph_map: &IndexMap<String, Graph>,
648 out: &mut Vec<DerivedEntry>,
649) {
650 let owner_graph = graph_map
651 .get(owner_path)
652 .expect("D276 invariant: graph_map covers every subgraph mounted in Pass 0");
653 for (name, slice) in &snap.nodes {
654 if slice.node_type != "state" {
655 out.push(DerivedEntry {
656 owner_graph: owner_graph.clone(),
657 name: name.clone(),
658 slice: slice.clone(),
659 });
660 }
661 }
662 for (child_name, child_snap) in &snap.subgraphs {
663 let child_path = if owner_path.is_empty() {
664 child_name.clone()
665 } else {
666 format!("{owner_path}{PATH_SEP}{child_name}")
667 };
668 collect_derived_recursive(child_snap, &child_path, graph_map, out);
669 }
670}
671
672/// D276 Pass 2b — tree-wide retry loop for derived/dynamic/operator
673/// nodes. Each iteration resolves an entry's deps via the owner
674/// graph's [`Graph::try_resolve`], which natively understands the
675/// owner-relative path syntax emitted by the D276 encoder:
676///
677/// - bare name → local lookup in owner's `names` (pre-D276 shape).
678/// - `"child::name"` → descend into a mounted subgraph.
679/// - `"..::name"` → walk to parent.
680/// - `"..::sibling::name"` → walk to parent then descend into a sibling.
681///
682/// Loop terminates when (a) all entries created — success — or
683/// (b) one pass made no progress — `UnresolvableDeps` on the first
684/// stuck entry.
685fn create_derived_with_retry(
686 core: &Core,
687 factories: &IndexMap<String, NodeFactory>,
688 entries: Vec<DerivedEntry>,
689) -> Result<(), SnapshotError> {
690 let mut remaining = entries;
691 loop {
692 let before = remaining.len();
693 let mut still_remaining = Vec::new();
694
695 for entry in remaining {
696 let mut resolved = Vec::with_capacity(entry.slice.deps.len());
697 let mut all_ok = true;
698 for dep_name in &entry.slice.deps {
699 if let Some(dep_id) = entry.owner_graph.try_resolve(dep_name) {
700 resolved.push(dep_id);
701 } else {
702 all_ok = false;
703 break;
704 }
705 }
706 if all_ok {
707 let factory = factories.get(&entry.slice.node_type).ok_or_else(|| {
708 SnapshotError::MissingFactory(entry.slice.node_type.clone(), entry.name.clone())
709 })?;
710 factory(
711 core,
712 &entry.owner_graph,
713 &entry.name,
714 &entry.slice,
715 &resolved,
716 )?;
717 } else {
718 still_remaining.push(entry);
719 }
720 }
721
722 remaining = still_remaining;
723 if remaining.is_empty() {
724 break;
725 }
726 if remaining.len() == before {
727 let entry = &remaining[0];
728 return Err(SnapshotError::UnresolvableDeps(
729 entry.name.clone(),
730 entry.slice.deps.clone(),
731 ));
732 }
733 }
734 Ok(())
735}
736
737/// D276 Pass 3 — recursively apply each node's snapshot status
738/// (Completed / Errored) via the owner graph's [`Graph::try_resolve`].
739/// Sentinel / Live are no-ops (state nodes already received their
740/// cache during Pass 1's `Graph::state` initial-value path; derived
741/// recompute on first subscribe).
742fn apply_status_recursive(
743 core: &Core,
744 snap: &GraphPersistSnapshot,
745 owner_path: &str,
746 graph_map: &IndexMap<String, Graph>,
747 binding: &Arc<dyn BindingBoundary>,
748) -> Result<(), SnapshotError> {
749 let owner_graph = graph_map
750 .get(owner_path)
751 .expect("D276 invariant: graph_map covers every subgraph mounted in Pass 0");
752 for (name, slice) in &snap.nodes {
753 // /qa G2.1 (2026-05-22): mirror `restore_into:339` — a node listed
754 // in the snapshot that doesn't resolve via `try_resolve` is a
755 // structural inconsistency (Pass 1/Pass 2 should have created it).
756 // Silently dropping the status would mask a Pass-2 hydration bug
757 // as "successful restore with corrupted lifecycle state."
758 let node_id = owner_graph
759 .try_resolve(name)
760 .ok_or_else(|| SnapshotError::UnknownNode(name.clone()))?;
761 match &slice.status {
762 NodeSnapshotStatus::Completed => {
763 owner_graph.complete(core, node_id);
764 }
765 NodeSnapshotStatus::Errored { error } => {
766 if let Some(err_val) = error {
767 let err_handle = binding.deserialize_value(err_val.clone());
768 owner_graph.error(core, node_id, err_handle);
769 }
770 }
771 NodeSnapshotStatus::Sentinel | NodeSnapshotStatus::Live => {}
772 }
773 }
774 for (child_name, child_snap) in &snap.subgraphs {
775 let child_path = if owner_path.is_empty() {
776 child_name.clone()
777 } else {
778 format!("{owner_path}{PATH_SEP}{child_name}")
779 };
780 apply_status_recursive(core, child_snap, &child_path, graph_map, binding)?;
781 }
782 Ok(())
783}