graphrefly_graph/snapshot.rs
1//! `Graph::snapshot()` / `Graph::restore()` / `Graph::from_snapshot()`
2//! — portable serialization of graph state (M4.E1, R3.8).
3//!
4//! # Handle-protocol boundary
5//!
6//! `snapshot()` calls `BindingBoundary::serialize_handle` to project
7//! each node's cached `HandleId` into a `serde_json::Value`. `restore()`
8//! / `from_snapshot()` call `BindingBoundary::deserialize_value` to
9//! re-intern values from JSON back into handles.
10//!
11//! # Edges
12//!
13//! Per D169, edges are omitted from the snapshot (they're derived from
14//! deps via `Graph::edges()`). This keeps the format lean; edges can be
15//! added as an additive field later without breaking the format.
16
17use std::sync::Arc;
18
19use graphrefly_core::{BindingBoundary, NodeId, NodeKind, TerminalKind, NO_HANDLE};
20use indexmap::IndexMap;
21use serde::{Deserialize, Serialize};
22
23use crate::graph::Graph;
24
25/// Portable snapshot of a graph's state — survives serialization
26/// round-trips (JSON, CBOR, etc.).
27///
28/// Contains the graph name, per-node state slices, and mounted
29/// subgraph names (recursive). Edges are omitted (derived from
30/// deps per D169).
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct GraphPersistSnapshot {
33 /// Graph name as set at construction / mount.
34 pub name: String,
35 /// Per-node state by local name, in namespace insertion order.
36 pub nodes: IndexMap<String, NodeSlice>,
37 /// Mounted subgraph snapshots, keyed by mount name.
38 #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
39 pub subgraphs: IndexMap<String, GraphPersistSnapshot>,
40}
41
42/// Per-node state within a snapshot.
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct NodeSlice {
45 /// Node kind: `"state"`, `"derived"`, `"dynamic"`, `"producer"`,
46 /// `"operator"`.
47 #[serde(rename = "type")]
48 pub node_type: String,
49 /// Serialized cache value. `None` when the cache is sentinel
50 /// (node has never emitted DATA).
51 #[serde(default, skip_serializing_if = "Option::is_none")]
52 pub value: Option<serde_json::Value>,
53 /// Node lifecycle status.
54 pub status: NodeSnapshotStatus,
55 /// Dependency names in declaration order (empty for state/producer).
56 #[serde(default, skip_serializing_if = "Vec::is_empty")]
57 pub deps: Vec<String>,
58}
59
60/// Lifecycle status stored in a snapshot.
61#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
62#[serde(rename_all = "lowercase")]
63pub enum NodeSnapshotStatus {
64 /// Never emitted DATA.
65 Sentinel,
66 /// Has emitted at least one DATA.
67 Live,
68 /// Terminal: COMPLETE.
69 Completed,
70 /// Terminal: ERROR. Carries the serialized error value.
71 Errored {
72 #[serde(default, skip_serializing_if = "Option::is_none")]
73 error: Option<serde_json::Value>,
74 },
75}
76
77/// Errors from [`Graph::restore`] and [`Graph::from_snapshot`].
78#[derive(Debug, thiserror::Error)]
79pub enum SnapshotError {
80 #[error("snapshot name `{expected}` does not match graph name `{actual}`")]
81 NameMismatch { expected: String, actual: String },
82 #[error("node `{0}` in snapshot not found in graph namespace")]
83 UnknownNode(String),
84 #[error("subgraph `{0}` in snapshot not found in graph mount tree")]
85 UnknownSubgraph(String),
86 #[error("auto-hydration: unresolvable deps for node `{0}` (deps: {1:?})")]
87 UnresolvableDeps(String, Vec<String>),
88 #[error("auto-hydration: no factory registered for node type `{0}` (node `{1}`)")]
89 MissingFactory(String, String),
90}
91
92/// Factory function for auto-hydration mode of `Graph::from_snapshot`.
93/// Given a node name, its snapshot slice, and already-resolved dep
94/// `NodeId`s, returns the `NodeId` of the reconstructed node registered
95/// in the provided `Graph`.
96///
97/// The factory is responsible for calling `graph.state(...)`,
98/// `graph.derived(...)`, etc. — it knows the node's type and how to
99/// wire the fn/equals from the application's closure registry.
100pub type NodeFactory =
101 Box<dyn Fn(&Graph, &str, &NodeSlice, &[NodeId]) -> Result<NodeId, SnapshotError>>;
102
103/// Builder function for `Graph::from_snapshot` builder mode.
104pub type SnapshotBuilder = Box<dyn FnOnce(&Graph)>;
105
106impl Graph {
107 /// Serialize this graph's state into a portable snapshot.
108 ///
109 /// Walks the namespace and mounted subgraphs recursively. Each
110 /// node's cache value is serialized via
111 /// [`BindingBoundary::serialize_handle`]. Terminal error payloads
112 /// are also serialized.
113 ///
114 /// The snapshot captures state-at-call-time; concurrent mutations
115 /// during snapshot may produce a torn read. Use `Graph::batch` or
116 /// `Graph::signal(Pause)` for consistency if needed.
117 #[must_use]
118 pub fn snapshot(&self) -> GraphPersistSnapshot {
119 self.snapshot_inner()
120 }
121
122 fn snapshot_inner(&self) -> GraphPersistSnapshot {
123 let inner = self.inner.lock();
124 let name = inner.name.clone();
125
126 // Build a name→NodeId map + collect dep NodeIds for each node,
127 // all under the inner lock.
128 let node_entries: Vec<(String, NodeId)> =
129 inner.names.iter().map(|(n, &id)| (n.clone(), id)).collect();
130 let children: Vec<(String, Graph)> = inner
131 .children
132 .iter()
133 .map(|(n, g)| (n.clone(), g.clone()))
134 .collect();
135
136 // Build reverse map for dep name resolution.
137 let id_to_name: IndexMap<NodeId, String> =
138 inner.names.iter().map(|(n, &id)| (id, n.clone())).collect();
139 drop(inner);
140
141 let binding = self.core.binding_ptr();
142 let mut nodes = IndexMap::new();
143
144 for (node_name, node_id) in &node_entries {
145 let kind = self.core.kind_of(*node_id);
146 let node_type = match kind {
147 Some(NodeKind::State) => "state",
148 Some(NodeKind::Derived) => "derived",
149 Some(NodeKind::Dynamic) => "dynamic",
150 Some(NodeKind::Producer) => "producer",
151 Some(NodeKind::Operator(_)) => "operator",
152 None => "unknown",
153 };
154
155 // Serialize cache value.
156 let cache = self.core.cache_of(*node_id);
157 let value = if cache == NO_HANDLE {
158 None
159 } else {
160 binding.serialize_handle(cache)
161 };
162
163 // Terminal status.
164 let terminal = self.core.is_terminal(*node_id);
165 let status = match terminal {
166 Some(TerminalKind::Complete) => NodeSnapshotStatus::Completed,
167 Some(TerminalKind::Error(err_handle)) => NodeSnapshotStatus::Errored {
168 error: binding.serialize_handle(err_handle),
169 },
170 None => {
171 if self.core.has_fired_once(*node_id) || cache != NO_HANDLE {
172 NodeSnapshotStatus::Live
173 } else {
174 NodeSnapshotStatus::Sentinel
175 }
176 }
177 };
178
179 // Dep names in declaration order.
180 let dep_ids = self.core.deps_of(*node_id);
181 let deps: Vec<String> = dep_ids
182 .iter()
183 .map(|dep_id| {
184 id_to_name
185 .get(dep_id)
186 .cloned()
187 .unwrap_or_else(|| format!("_anon_{}", dep_id.raw()))
188 })
189 .collect();
190
191 nodes.insert(
192 node_name.clone(),
193 NodeSlice {
194 node_type: node_type.to_owned(),
195 value,
196 status,
197 deps,
198 },
199 );
200 }
201
202 // Recurse into mounted subgraphs.
203 let mut subgraphs = IndexMap::new();
204 for (child_name, child_graph) in children {
205 subgraphs.insert(child_name, child_graph.snapshot_inner());
206 }
207
208 GraphPersistSnapshot {
209 name,
210 nodes,
211 subgraphs,
212 }
213 }
214
215 /// Restore state from a snapshot into an existing graph.
216 ///
217 /// The graph must already have nodes registered with matching names.
218 /// For each node in the snapshot:
219 /// - State nodes: emits the serialized value via `Core::emit`.
220 /// - Compute nodes (derived/dynamic/operator): skipped (their state
221 /// is derived from deps; they'll recompute when deps emit).
222 /// - Terminal status is NOT restored (terminals require a separate
223 /// lifecycle mechanism — `complete()` / `error()` calls).
224 ///
225 /// Recurses into mounted subgraphs.
226 ///
227 /// # Errors
228 ///
229 /// Returns `SnapshotError::NameMismatch` if the snapshot name
230 /// doesn't match the graph name. Returns `SnapshotError::UnknownNode`
231 /// for nodes in the snapshot that aren't in the graph namespace.
232 pub fn restore(&self, snapshot: &GraphPersistSnapshot) -> Result<(), SnapshotError> {
233 let graph_name = self.name();
234 if snapshot.name != graph_name {
235 return Err(SnapshotError::NameMismatch {
236 expected: snapshot.name.clone(),
237 actual: graph_name,
238 });
239 }
240
241 let binding = self.core.binding_ptr();
242
243 for (node_name, slice) in &snapshot.nodes {
244 let node_id = self
245 .try_resolve(node_name)
246 .ok_or_else(|| SnapshotError::UnknownNode(node_name.clone()))?;
247
248 // Only restore values for state nodes — compute nodes derive
249 // their state from deps and will recompute.
250 if slice.node_type == "state" {
251 if let Some(ref value) = slice.value {
252 let handle = binding.deserialize_value(value.clone());
253 self.core.emit(node_id, handle);
254 }
255 }
256
257 // Restore terminal status.
258 match &slice.status {
259 NodeSnapshotStatus::Completed => {
260 self.core.complete(node_id);
261 }
262 NodeSnapshotStatus::Errored { error } => {
263 if let Some(err_val) = error {
264 let err_handle = binding.deserialize_value(err_val.clone());
265 self.core.error(node_id, err_handle);
266 }
267 }
268 NodeSnapshotStatus::Sentinel | NodeSnapshotStatus::Live => {}
269 }
270 }
271
272 // Recurse into subgraphs. Collect under lock, restore after drop.
273 let child_pairs: Vec<(String, Graph)> = {
274 let inner = self.inner.lock();
275 snapshot
276 .subgraphs
277 .keys()
278 .map(|name| {
279 let child = inner
280 .children
281 .get(name)
282 .ok_or_else(|| SnapshotError::UnknownSubgraph(name.clone()))?;
283 Ok((name.clone(), child.clone()))
284 })
285 .collect::<Result<Vec<_>, SnapshotError>>()?
286 };
287 for (child_name, child) in child_pairs {
288 child.restore(&snapshot.subgraphs[&child_name])?;
289 }
290
291 Ok(())
292 }
293
294 /// Reconstruct a graph from a snapshot.
295 ///
296 /// **Builder mode** (when `builder` is `Some`): creates an empty
297 /// graph, runs the builder to register nodes, then calls `restore()`
298 /// to populate state from the snapshot. The builder controls topology;
299 /// the snapshot only provides values.
300 ///
301 /// **Auto-hydration mode** (when `builder` is `None`): reconstructs
302 /// both topology and state from the snapshot. Requires `factories`
303 /// for non-state nodes — a map from node type string (e.g.
304 /// `"derived"`) to a factory function that registers the node in
305 /// the graph given its name, snapshot slice, and resolved dep ids.
306 ///
307 /// State nodes are auto-created without a factory (they only need
308 /// a name and optional initial value).
309 ///
310 /// # Errors
311 ///
312 /// Returns `SnapshotError::UnresolvableDeps` if auto-hydration
313 /// can't resolve a node's dependencies after iterating all nodes.
314 /// Returns `SnapshotError::MissingFactory` if a non-state node
315 /// type has no registered factory.
316 pub fn from_snapshot(
317 snapshot: &GraphPersistSnapshot,
318 binding: &Arc<dyn BindingBoundary>,
319 builder: Option<SnapshotBuilder>,
320 factories: Option<IndexMap<String, NodeFactory>>,
321 ) -> Result<Self, SnapshotError> {
322 let graph = Graph::new(&snapshot.name, Arc::clone(binding));
323
324 if let Some(build_fn) = builder {
325 // Builder mode: user registers nodes, snapshot restores state.
326 build_fn(&graph);
327 graph.restore(snapshot)?;
328 return Ok(graph);
329 }
330
331 // Auto-hydration mode.
332 let factories = factories.unwrap_or_default();
333
334 // Phase 1: Mount hierarchy (recursive).
335 for (child_name, child_snapshot) in &snapshot.subgraphs {
336 let child = graph
337 .mount_new(child_name)
338 .map_err(|_| SnapshotError::UnknownSubgraph(child_name.clone()))?;
339 Self::hydrate_subgraph(&child, child_snapshot, binding, &factories)?;
340 }
341
342 // Phase 2: Iterative node reconstruction.
343 Self::hydrate_nodes(&graph, snapshot, binding, &factories)?;
344
345 Ok(graph)
346 }
347
348 fn hydrate_subgraph(
349 graph: &Graph,
350 snapshot: &GraphPersistSnapshot,
351 binding: &Arc<dyn BindingBoundary>,
352 factories: &IndexMap<String, NodeFactory>,
353 ) -> Result<(), SnapshotError> {
354 // Mount children first.
355 for (child_name, child_snapshot) in &snapshot.subgraphs {
356 let child = graph
357 .mount_new(child_name)
358 .map_err(|_| SnapshotError::UnknownSubgraph(child_name.clone()))?;
359 Self::hydrate_subgraph(&child, child_snapshot, binding, factories)?;
360 }
361 // Then hydrate nodes.
362 Self::hydrate_nodes(graph, snapshot, binding, factories)
363 }
364
365 fn hydrate_nodes(
366 graph: &Graph,
367 snapshot: &GraphPersistSnapshot,
368 binding: &Arc<dyn BindingBoundary>,
369 factories: &IndexMap<String, NodeFactory>,
370 ) -> Result<(), SnapshotError> {
371 let mut created: IndexMap<String, NodeId> = IndexMap::new();
372 let mut remaining: Vec<(String, NodeSlice)> = snapshot
373 .nodes
374 .iter()
375 .map(|(n, s)| (n.clone(), s.clone()))
376 .collect();
377
378 // Iterative resolution: keep passing until all nodes are created
379 // or no progress is made.
380 loop {
381 let before = remaining.len();
382 let mut still_remaining = Vec::new();
383
384 for (name, slice) in remaining {
385 // Check if all deps are resolved.
386 let deps_resolved: Option<Vec<NodeId>> = if slice.deps.is_empty() {
387 Some(Vec::new())
388 } else {
389 let mut resolved = Vec::with_capacity(slice.deps.len());
390 let mut all_ok = true;
391 for dep_name in &slice.deps {
392 if let Some(&dep_id) = created.get(dep_name) {
393 resolved.push(dep_id);
394 } else {
395 all_ok = false;
396 break;
397 }
398 }
399 if all_ok {
400 Some(resolved)
401 } else {
402 None
403 }
404 };
405
406 if let Some(dep_ids) = deps_resolved {
407 // Create the node.
408 let node_id = if slice.node_type == "state" {
409 // State nodes: auto-create with initial value.
410 let initial = slice
411 .value
412 .as_ref()
413 .map(|v| binding.deserialize_value(v.clone()));
414 graph
415 .state(&name, initial)
416 .map_err(|_| SnapshotError::UnknownNode(name.clone()))?
417 } else {
418 // Non-state nodes: use factory.
419 let factory = factories.get(&slice.node_type).ok_or_else(|| {
420 SnapshotError::MissingFactory(slice.node_type.clone(), name.clone())
421 })?;
422 factory(graph, &name, &slice, &dep_ids)?
423 };
424 created.insert(name, node_id);
425 } else {
426 still_remaining.push((name, slice));
427 }
428 }
429
430 remaining = still_remaining;
431 if remaining.is_empty() {
432 break;
433 }
434 if remaining.len() == before {
435 // No progress — unresolvable deps.
436 let (name, slice) = &remaining[0];
437 return Err(SnapshotError::UnresolvableDeps(
438 name.clone(),
439 slice.deps.clone(),
440 ));
441 }
442 }
443
444 // Phase 3: Restore terminal status for all nodes.
445 for (name, slice) in &snapshot.nodes {
446 if let Some(&node_id) = created.get(name) {
447 match &slice.status {
448 NodeSnapshotStatus::Completed => {
449 graph.complete(node_id);
450 }
451 NodeSnapshotStatus::Errored { error } => {
452 if let Some(err_val) = error {
453 let err_handle = binding.deserialize_value(err_val.clone());
454 graph.error(node_id, err_handle);
455 }
456 }
457 NodeSnapshotStatus::Sentinel | NodeSnapshotStatus::Live => {}
458 }
459 }
460 }
461
462 Ok(())
463 }
464}