Skip to main content

graphrefly_graph/
snapshot.rs

//! `Graph::snapshot()` / `Graph::restore()` / `Graph::from_snapshot()`
//! — portable serialization of graph state (M4.E1, R3.8).
//!
//! # Handle-protocol boundary
//!
//! `snapshot()` calls `BindingBoundary::serialize_handle` to project
//! each node's cached `HandleId` (and any terminal error payload) into a
//! `serde_json::Value`. `restore()` / `from_snapshot()` call
//! `BindingBoundary::deserialize_value` to re-intern those JSON values
//! back into handles.
//!
//! # Edges
//!
//! Per D169, edges are omitted from the snapshot (they're derived from
//! deps via `Graph::edges()`). This keeps the format lean; edges can be
//! added later as an extra field without breaking the format.
16
17use std::sync::Arc;
18
19use graphrefly_core::{BindingBoundary, NodeId, NodeKind, TerminalKind, NO_HANDLE};
20use indexmap::IndexMap;
21use serde::{Deserialize, Serialize};
22
23use crate::graph::Graph;
24
25/// Portable snapshot of a graph's state — survives serialization
26/// round-trips (JSON, CBOR, etc.).
27///
28/// Contains the graph name, per-node state slices, and mounted
29/// subgraph names (recursive). Edges are omitted (derived from
30/// deps per D169).
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct GraphPersistSnapshot {
33    /// Graph name as set at construction / mount.
34    pub name: String,
35    /// Per-node state by local name, in namespace insertion order.
36    pub nodes: IndexMap<String, NodeSlice>,
37    /// Mounted subgraph snapshots, keyed by mount name.
38    #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
39    pub subgraphs: IndexMap<String, GraphPersistSnapshot>,
40}
41
42/// Per-node state within a snapshot.
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct NodeSlice {
45    /// Node kind: `"state"`, `"derived"`, `"dynamic"`, `"producer"`,
46    /// `"operator"`.
47    #[serde(rename = "type")]
48    pub node_type: String,
49    /// Serialized cache value. `None` when the cache is sentinel
50    /// (node has never emitted DATA).
51    #[serde(default, skip_serializing_if = "Option::is_none")]
52    pub value: Option<serde_json::Value>,
53    /// Node lifecycle status.
54    pub status: NodeSnapshotStatus,
55    /// Dependency names in declaration order (empty for state/producer).
56    #[serde(default, skip_serializing_if = "Vec::is_empty")]
57    pub deps: Vec<String>,
58}
59
60/// Lifecycle status stored in a snapshot.
61#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
62#[serde(rename_all = "lowercase")]
63pub enum NodeSnapshotStatus {
64    /// Never emitted DATA.
65    Sentinel,
66    /// Has emitted at least one DATA.
67    Live,
68    /// Terminal: COMPLETE.
69    Completed,
70    /// Terminal: ERROR. Carries the serialized error value.
71    Errored {
72        #[serde(default, skip_serializing_if = "Option::is_none")]
73        error: Option<serde_json::Value>,
74    },
75}
76
77/// Errors from [`Graph::restore`] and [`Graph::from_snapshot`].
78#[derive(Debug, thiserror::Error)]
79pub enum SnapshotError {
80    #[error("snapshot name `{expected}` does not match graph name `{actual}`")]
81    NameMismatch { expected: String, actual: String },
82    #[error("node `{0}` in snapshot not found in graph namespace")]
83    UnknownNode(String),
84    #[error("subgraph `{0}` in snapshot not found in graph mount tree")]
85    UnknownSubgraph(String),
86    #[error("auto-hydration: unresolvable deps for node `{0}` (deps: {1:?})")]
87    UnresolvableDeps(String, Vec<String>),
88    #[error("auto-hydration: no factory registered for node type `{0}` (node `{1}`)")]
89    MissingFactory(String, String),
90}
91
92/// Factory function for auto-hydration mode of `Graph::from_snapshot`.
93/// Given a node name, its snapshot slice, and already-resolved dep
94/// `NodeId`s, returns the `NodeId` of the reconstructed node registered
95/// in the provided `Graph`.
96///
97/// The factory is responsible for calling `graph.state(...)`,
98/// `graph.derived(...)`, etc. — it knows the node's type and how to
99/// wire the fn/equals from the application's closure registry.
100pub type NodeFactory =
101    Box<dyn Fn(&Graph, &str, &NodeSlice, &[NodeId]) -> Result<NodeId, SnapshotError>>;
102
103/// Builder function for `Graph::from_snapshot` builder mode.
104pub type SnapshotBuilder = Box<dyn FnOnce(&Graph)>;
105
106impl Graph {
107    /// Serialize this graph's state into a portable snapshot.
108    ///
109    /// Walks the namespace and mounted subgraphs recursively. Each
110    /// node's cache value is serialized via
111    /// [`BindingBoundary::serialize_handle`]. Terminal error payloads
112    /// are also serialized.
113    ///
114    /// The snapshot captures state-at-call-time; concurrent mutations
115    /// during snapshot may produce a torn read. Use `Graph::batch` or
116    /// `Graph::signal(Pause)` for consistency if needed.
117    #[must_use]
118    pub fn snapshot(&self) -> GraphPersistSnapshot {
119        self.snapshot_inner()
120    }
121
122    fn snapshot_inner(&self) -> GraphPersistSnapshot {
123        let inner = self.inner.lock();
124        let name = inner.name.clone();
125
126        // Build a name→NodeId map + collect dep NodeIds for each node,
127        // all under the inner lock.
128        let node_entries: Vec<(String, NodeId)> =
129            inner.names.iter().map(|(n, &id)| (n.clone(), id)).collect();
130        let children: Vec<(String, Graph)> = inner
131            .children
132            .iter()
133            .map(|(n, g)| (n.clone(), g.clone()))
134            .collect();
135
136        // Build reverse map for dep name resolution.
137        let id_to_name: IndexMap<NodeId, String> =
138            inner.names.iter().map(|(n, &id)| (id, n.clone())).collect();
139        drop(inner);
140
141        let binding = self.core.binding_ptr();
142        let mut nodes = IndexMap::new();
143
144        for (node_name, node_id) in &node_entries {
145            let kind = self.core.kind_of(*node_id);
146            let node_type = match kind {
147                Some(NodeKind::State) => "state",
148                Some(NodeKind::Derived) => "derived",
149                Some(NodeKind::Dynamic) => "dynamic",
150                Some(NodeKind::Producer) => "producer",
151                Some(NodeKind::Operator(_)) => "operator",
152                None => "unknown",
153            };
154
155            // Serialize cache value.
156            let cache = self.core.cache_of(*node_id);
157            let value = if cache == NO_HANDLE {
158                None
159            } else {
160                binding.serialize_handle(cache)
161            };
162
163            // Terminal status.
164            let terminal = self.core.is_terminal(*node_id);
165            let status = match terminal {
166                Some(TerminalKind::Complete) => NodeSnapshotStatus::Completed,
167                Some(TerminalKind::Error(err_handle)) => NodeSnapshotStatus::Errored {
168                    error: binding.serialize_handle(err_handle),
169                },
170                None => {
171                    if self.core.has_fired_once(*node_id) || cache != NO_HANDLE {
172                        NodeSnapshotStatus::Live
173                    } else {
174                        NodeSnapshotStatus::Sentinel
175                    }
176                }
177            };
178
179            // Dep names in declaration order.
180            let dep_ids = self.core.deps_of(*node_id);
181            let deps: Vec<String> = dep_ids
182                .iter()
183                .map(|dep_id| {
184                    id_to_name
185                        .get(dep_id)
186                        .cloned()
187                        .unwrap_or_else(|| format!("_anon_{}", dep_id.raw()))
188                })
189                .collect();
190
191            nodes.insert(
192                node_name.clone(),
193                NodeSlice {
194                    node_type: node_type.to_owned(),
195                    value,
196                    status,
197                    deps,
198                },
199            );
200        }
201
202        // Recurse into mounted subgraphs.
203        let mut subgraphs = IndexMap::new();
204        for (child_name, child_graph) in children {
205            subgraphs.insert(child_name, child_graph.snapshot_inner());
206        }
207
208        GraphPersistSnapshot {
209            name,
210            nodes,
211            subgraphs,
212        }
213    }
214
215    /// Restore state from a snapshot into an existing graph.
216    ///
217    /// The graph must already have nodes registered with matching names.
218    /// For each node in the snapshot:
219    /// - State nodes: emits the serialized value via `Core::emit`.
220    /// - Compute nodes (derived/dynamic/operator): skipped (their state
221    ///   is derived from deps; they'll recompute when deps emit).
222    /// - Terminal status is NOT restored (terminals require a separate
223    ///   lifecycle mechanism — `complete()` / `error()` calls).
224    ///
225    /// Recurses into mounted subgraphs.
226    ///
227    /// # Errors
228    ///
229    /// Returns `SnapshotError::NameMismatch` if the snapshot name
230    /// doesn't match the graph name. Returns `SnapshotError::UnknownNode`
231    /// for nodes in the snapshot that aren't in the graph namespace.
232    pub fn restore(&self, snapshot: &GraphPersistSnapshot) -> Result<(), SnapshotError> {
233        let graph_name = self.name();
234        if snapshot.name != graph_name {
235            return Err(SnapshotError::NameMismatch {
236                expected: snapshot.name.clone(),
237                actual: graph_name,
238            });
239        }
240
241        let binding = self.core.binding_ptr();
242
243        for (node_name, slice) in &snapshot.nodes {
244            let node_id = self
245                .try_resolve(node_name)
246                .ok_or_else(|| SnapshotError::UnknownNode(node_name.clone()))?;
247
248            // Only restore values for state nodes — compute nodes derive
249            // their state from deps and will recompute.
250            if slice.node_type == "state" {
251                if let Some(ref value) = slice.value {
252                    let handle = binding.deserialize_value(value.clone());
253                    self.core.emit(node_id, handle);
254                }
255            }
256
257            // Restore terminal status.
258            match &slice.status {
259                NodeSnapshotStatus::Completed => {
260                    self.core.complete(node_id);
261                }
262                NodeSnapshotStatus::Errored { error } => {
263                    if let Some(err_val) = error {
264                        let err_handle = binding.deserialize_value(err_val.clone());
265                        self.core.error(node_id, err_handle);
266                    }
267                }
268                NodeSnapshotStatus::Sentinel | NodeSnapshotStatus::Live => {}
269            }
270        }
271
272        // Recurse into subgraphs. Collect under lock, restore after drop.
273        let child_pairs: Vec<(String, Graph)> = {
274            let inner = self.inner.lock();
275            snapshot
276                .subgraphs
277                .keys()
278                .map(|name| {
279                    let child = inner
280                        .children
281                        .get(name)
282                        .ok_or_else(|| SnapshotError::UnknownSubgraph(name.clone()))?;
283                    Ok((name.clone(), child.clone()))
284                })
285                .collect::<Result<Vec<_>, SnapshotError>>()?
286        };
287        for (child_name, child) in child_pairs {
288            child.restore(&snapshot.subgraphs[&child_name])?;
289        }
290
291        Ok(())
292    }
293
294    /// Reconstruct a graph from a snapshot.
295    ///
296    /// **Builder mode** (when `builder` is `Some`): creates an empty
297    /// graph, runs the builder to register nodes, then calls `restore()`
298    /// to populate state from the snapshot. The builder controls topology;
299    /// the snapshot only provides values.
300    ///
301    /// **Auto-hydration mode** (when `builder` is `None`): reconstructs
302    /// both topology and state from the snapshot. Requires `factories`
303    /// for non-state nodes — a map from node type string (e.g.
304    /// `"derived"`) to a factory function that registers the node in
305    /// the graph given its name, snapshot slice, and resolved dep ids.
306    ///
307    /// State nodes are auto-created without a factory (they only need
308    /// a name and optional initial value).
309    ///
310    /// # Errors
311    ///
312    /// Returns `SnapshotError::UnresolvableDeps` if auto-hydration
313    /// can't resolve a node's dependencies after iterating all nodes.
314    /// Returns `SnapshotError::MissingFactory` if a non-state node
315    /// type has no registered factory.
316    pub fn from_snapshot(
317        snapshot: &GraphPersistSnapshot,
318        binding: &Arc<dyn BindingBoundary>,
319        builder: Option<SnapshotBuilder>,
320        factories: Option<IndexMap<String, NodeFactory>>,
321    ) -> Result<Self, SnapshotError> {
322        let graph = Graph::new(&snapshot.name, Arc::clone(binding));
323
324        if let Some(build_fn) = builder {
325            // Builder mode: user registers nodes, snapshot restores state.
326            build_fn(&graph);
327            graph.restore(snapshot)?;
328            return Ok(graph);
329        }
330
331        // Auto-hydration mode.
332        let factories = factories.unwrap_or_default();
333
334        // Phase 1: Mount hierarchy (recursive).
335        for (child_name, child_snapshot) in &snapshot.subgraphs {
336            let child = graph
337                .mount_new(child_name)
338                .map_err(|_| SnapshotError::UnknownSubgraph(child_name.clone()))?;
339            Self::hydrate_subgraph(&child, child_snapshot, binding, &factories)?;
340        }
341
342        // Phase 2: Iterative node reconstruction.
343        Self::hydrate_nodes(&graph, snapshot, binding, &factories)?;
344
345        Ok(graph)
346    }
347
348    fn hydrate_subgraph(
349        graph: &Graph,
350        snapshot: &GraphPersistSnapshot,
351        binding: &Arc<dyn BindingBoundary>,
352        factories: &IndexMap<String, NodeFactory>,
353    ) -> Result<(), SnapshotError> {
354        // Mount children first.
355        for (child_name, child_snapshot) in &snapshot.subgraphs {
356            let child = graph
357                .mount_new(child_name)
358                .map_err(|_| SnapshotError::UnknownSubgraph(child_name.clone()))?;
359            Self::hydrate_subgraph(&child, child_snapshot, binding, factories)?;
360        }
361        // Then hydrate nodes.
362        Self::hydrate_nodes(graph, snapshot, binding, factories)
363    }
364
365    fn hydrate_nodes(
366        graph: &Graph,
367        snapshot: &GraphPersistSnapshot,
368        binding: &Arc<dyn BindingBoundary>,
369        factories: &IndexMap<String, NodeFactory>,
370    ) -> Result<(), SnapshotError> {
371        let mut created: IndexMap<String, NodeId> = IndexMap::new();
372        let mut remaining: Vec<(String, NodeSlice)> = snapshot
373            .nodes
374            .iter()
375            .map(|(n, s)| (n.clone(), s.clone()))
376            .collect();
377
378        // Iterative resolution: keep passing until all nodes are created
379        // or no progress is made.
380        loop {
381            let before = remaining.len();
382            let mut still_remaining = Vec::new();
383
384            for (name, slice) in remaining {
385                // Check if all deps are resolved.
386                let deps_resolved: Option<Vec<NodeId>> = if slice.deps.is_empty() {
387                    Some(Vec::new())
388                } else {
389                    let mut resolved = Vec::with_capacity(slice.deps.len());
390                    let mut all_ok = true;
391                    for dep_name in &slice.deps {
392                        if let Some(&dep_id) = created.get(dep_name) {
393                            resolved.push(dep_id);
394                        } else {
395                            all_ok = false;
396                            break;
397                        }
398                    }
399                    if all_ok {
400                        Some(resolved)
401                    } else {
402                        None
403                    }
404                };
405
406                if let Some(dep_ids) = deps_resolved {
407                    // Create the node.
408                    let node_id = if slice.node_type == "state" {
409                        // State nodes: auto-create with initial value.
410                        let initial = slice
411                            .value
412                            .as_ref()
413                            .map(|v| binding.deserialize_value(v.clone()));
414                        graph
415                            .state(&name, initial)
416                            .map_err(|_| SnapshotError::UnknownNode(name.clone()))?
417                    } else {
418                        // Non-state nodes: use factory.
419                        let factory = factories.get(&slice.node_type).ok_or_else(|| {
420                            SnapshotError::MissingFactory(slice.node_type.clone(), name.clone())
421                        })?;
422                        factory(graph, &name, &slice, &dep_ids)?
423                    };
424                    created.insert(name, node_id);
425                } else {
426                    still_remaining.push((name, slice));
427                }
428            }
429
430            remaining = still_remaining;
431            if remaining.is_empty() {
432                break;
433            }
434            if remaining.len() == before {
435                // No progress — unresolvable deps.
436                let (name, slice) = &remaining[0];
437                return Err(SnapshotError::UnresolvableDeps(
438                    name.clone(),
439                    slice.deps.clone(),
440                ));
441            }
442        }
443
444        // Phase 3: Restore terminal status for all nodes.
445        for (name, slice) in &snapshot.nodes {
446            if let Some(&node_id) = created.get(name) {
447                match &slice.status {
448                    NodeSnapshotStatus::Completed => {
449                        graph.complete(node_id);
450                    }
451                    NodeSnapshotStatus::Errored { error } => {
452                        if let Some(err_val) = error {
453                            let err_handle = binding.deserialize_value(err_val.clone());
454                            graph.error(node_id, err_handle);
455                        }
456                    }
457                    NodeSnapshotStatus::Sentinel | NodeSnapshotStatus::Live => {}
458                }
459            }
460        }
461
462        Ok(())
463    }
464}