Skip to main content

graphrefly_graph/
snapshot.rs

1//! `snapshot()` / `restore()` / `Graph::from_snapshot()` — portable
2//! serialization of graph state (M4.E1, R3.8).
3//!
4//! D246: `snapshot`/`restore`/`from_snapshot` are inherent [`Graph`]
5//! methods over the Core-free namespace tree, taking the embedder's
6//! `&Core` explicitly (D246 rule 2). `snapshot_of` is generic over
7//! `&dyn CoreFull` (the one facade) so the storage in-wave
8//! `MailboxOp::Defer` observe-sink can run it (read-only;
9//! `serialize_handle` delegates to the binding). No `SubgraphRef`/
10//! `GraphOps`/`SnapshotOps` — one `Graph`, plain free fns.
11//!
12//! # Handle-protocol boundary
13//!
14//! `snapshot()` calls `BindingBoundary::serialize_handle`; `restore()`
15//! / `from_snapshot()` call `BindingBoundary::deserialize_value`.
16//! Per D169 edges are omitted (derived from deps via `edges()`).
17
18use std::cell::RefCell;
19use std::rc::Rc;
20use std::sync::Arc;
21
22use graphrefly_core::{BindingBoundary, Core, CoreFull, NodeId, NodeKind, TerminalKind, NO_HANDLE};
23use indexmap::IndexMap;
24use serde::{Deserialize, Serialize};
25
26use crate::graph::{resolve_checked, Graph, GraphInner};
27
/// Portable snapshot of a graph's state.
///
/// The structure is recursive: every mounted subgraph is stored as a
/// nested `GraphPersistSnapshot` under its mount name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphPersistSnapshot {
    /// Graph name as set at construction / mount.
    pub name: String,
    /// Per-node state by local name, in namespace insertion order.
    pub nodes: IndexMap<String, NodeSlice>,
    /// Mounted subgraph snapshots, keyed by mount name. Left out of the
    /// serialized form when empty; defaults to empty when the key is
    /// absent on deserialization.
    #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
    pub subgraphs: IndexMap<String, GraphPersistSnapshot>,
}
39
/// Per-node state within a snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeSlice {
    /// `"state"` / `"derived"` / `"dynamic"` / `"producer"` / `"operator"`.
    /// Serialized under the key `type`.
    #[serde(rename = "type")]
    pub node_type: String,
    /// Serialized cache value. `None` when sentinel; the key is then
    /// omitted from the serialized form.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub value: Option<serde_json::Value>,
    /// Node lifecycle status.
    pub status: NodeSnapshotStatus,
    /// Dependency names in declaration order. Omitted from the
    /// serialized form when the node has no deps.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub deps: Vec<String>,
}
55
/// Lifecycle status stored in a snapshot.
///
/// Variant names are serialized in lowercase (`rename_all = "lowercase"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum NodeSnapshotStatus {
    /// Never emitted DATA.
    Sentinel,
    /// Has emitted at least one DATA.
    Live,
    /// Terminal: COMPLETE.
    Completed,
    /// Terminal: ERROR (carries the serialized error value).
    Errored {
        /// Serialized error payload; `None` (and omitted from the
        /// serialized form) when no serialized value is available.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        error: Option<serde_json::Value>,
    },
}
72
/// Errors from [`Graph::restore`] and [`Graph::from_snapshot`].
#[derive(Debug, thiserror::Error)]
pub enum SnapshotError {
    /// The snapshot's `name` differs from the target graph's name.
    /// `expected` carries the snapshot's name, `actual` the graph's.
    #[error("snapshot name `{expected}` does not match graph name `{actual}`")]
    NameMismatch { expected: String, actual: String },
    /// A node named in the snapshot could not be resolved in the graph
    /// namespace. Auto-hydration also reports this when creating a
    /// state node fails.
    #[error("node `{0}` in snapshot not found in graph namespace")]
    UnknownNode(String),
    /// A subgraph named in the snapshot is not mounted on the graph.
    /// Auto-hydration also reports this when mounting a new child fails.
    #[error("subgraph `{0}` in snapshot not found in graph mount tree")]
    UnknownSubgraph(String),
    /// Auto-hydration made no progress: the named node's deps (second
    /// field) never became available among the nodes created so far.
    #[error("auto-hydration: unresolvable deps for node `{0}` (deps: {1:?})")]
    UnresolvableDeps(String, Vec<String>),
    /// Auto-hydration met a non-`"state"` node whose type (first field)
    /// has no registered factory; the second field is the node name.
    #[error("auto-hydration: no factory registered for node type `{0}` (node `{1}`)")]
    MissingFactory(String, String),
}
87
/// Factory for auto-hydration mode. D246: receives the embedder's
/// `&Core` + the Core-free [`Graph`] handle.
///
/// Arguments, in order: the `&Core`, the graph being hydrated, the
/// node's local name, its [`NodeSlice`] from the snapshot, and the ids
/// of its already-created deps in the slice's declaration order.
/// Factories are looked up by [`NodeSlice::node_type`] and must return
/// the id of the node they created.
pub type NodeFactory =
    Box<dyn Fn(&Core, &Graph, &str, &NodeSlice, &[NodeId]) -> Result<NodeId, SnapshotError>>;
92
/// Builder function for `Graph::from_snapshot` builder mode (D246:
/// `&Core` + Core-free [`Graph`]).
///
/// Invoked once with the freshly created graph, before the snapshot's
/// values are restored into it.
pub type SnapshotBuilder = Box<dyn FnOnce(&Core, &Graph)>;
96
97/// D246: recursive snapshot over `(&dyn CoreFull, &Rc<RefCell<GraphInner>>)`
98/// — `&dyn CoreFull` (the one facade) so the storage in-wave
99/// `MailboxOp::Defer` observe-sink can run it (read-only;
100/// `serialize_handle` delegates to the binding).
101pub(crate) fn snapshot_of(
102    core: &dyn CoreFull,
103    inner_arc: &Rc<RefCell<GraphInner>>,
104) -> GraphPersistSnapshot {
105    let (name, node_entries, children, id_to_name) = {
106        let inner = inner_arc.borrow_mut();
107        let name = inner.name.clone();
108        let node_entries: Vec<(String, NodeId)> =
109            inner.names.iter().map(|(n, &id)| (n.clone(), id)).collect();
110        let children: Vec<(String, Rc<RefCell<GraphInner>>)> = inner
111            .children
112            .iter()
113            .map(|(n, g)| (n.clone(), g.clone()))
114            .collect();
115        let id_to_name: IndexMap<NodeId, String> =
116            inner.names.iter().map(|(n, &id)| (id, n.clone())).collect();
117        (name, node_entries, children, id_to_name)
118    };
119
120    let mut nodes = IndexMap::new();
121
122    for (node_name, node_id) in &node_entries {
123        let kind = core.kind_of(*node_id);
124        let node_type = match kind {
125            Some(NodeKind::State) => "state",
126            Some(NodeKind::Derived) => "derived",
127            Some(NodeKind::Dynamic) => "dynamic",
128            Some(NodeKind::Producer) => "producer",
129            Some(NodeKind::Operator(_)) => "operator",
130            None => "unknown",
131        };
132
133        let cache = core.cache_of(*node_id);
134        let value = if cache == NO_HANDLE {
135            None
136        } else {
137            core.serialize_handle(cache)
138        };
139
140        let terminal = core.is_terminal(*node_id);
141        let status = match terminal {
142            Some(TerminalKind::Complete) => NodeSnapshotStatus::Completed,
143            Some(TerminalKind::Error(err_handle)) => NodeSnapshotStatus::Errored {
144                error: core.serialize_handle(err_handle),
145            },
146            None => {
147                if core.has_fired_once(*node_id) || cache != NO_HANDLE {
148                    NodeSnapshotStatus::Live
149                } else {
150                    NodeSnapshotStatus::Sentinel
151                }
152            }
153        };
154
155        let dep_ids = core.deps_of(*node_id);
156        let deps: Vec<String> = dep_ids
157            .iter()
158            .map(|dep_id| {
159                id_to_name
160                    .get(dep_id)
161                    .cloned()
162                    .unwrap_or_else(|| format!("_anon_{}", dep_id.raw()))
163            })
164            .collect();
165
166        nodes.insert(
167            node_name.clone(),
168            NodeSlice {
169                node_type: node_type.to_owned(),
170                value,
171                status,
172                deps,
173            },
174        );
175    }
176
177    let mut subgraphs = IndexMap::new();
178    for (child_name, child_inner) in children {
179        subgraphs.insert(child_name, snapshot_of(core, &child_inner));
180    }
181
182    GraphPersistSnapshot {
183        name,
184        nodes,
185        subgraphs,
186    }
187}
188
189/// Recursive restore over `(&Core, &Rc<RefCell<GraphInner>>)`.
190fn restore_into(
191    core: &Core,
192    inner_arc: &Rc<RefCell<GraphInner>>,
193    snapshot: &GraphPersistSnapshot,
194) -> Result<(), SnapshotError> {
195    let graph_name = inner_arc.borrow_mut().name.clone();
196    if snapshot.name != graph_name {
197        return Err(SnapshotError::NameMismatch {
198            expected: snapshot.name.clone(),
199            actual: graph_name,
200        });
201    }
202
203    let binding = core.binding_ptr();
204
205    for (node_name, slice) in &snapshot.nodes {
206        let node_id = resolve_checked(inner_arc, node_name)
207            .ok()
208            .flatten()
209            .ok_or_else(|| SnapshotError::UnknownNode(node_name.clone()))?;
210
211        if slice.node_type == "state" {
212            if let Some(ref value) = slice.value {
213                let handle = binding.deserialize_value(value.clone());
214                core.emit(node_id, handle);
215            }
216        }
217
218        match &slice.status {
219            NodeSnapshotStatus::Completed => {
220                core.complete(node_id);
221            }
222            NodeSnapshotStatus::Errored { error } => {
223                if let Some(err_val) = error {
224                    let err_handle = binding.deserialize_value(err_val.clone());
225                    core.error(node_id, err_handle);
226                }
227            }
228            NodeSnapshotStatus::Sentinel | NodeSnapshotStatus::Live => {}
229        }
230    }
231
232    let child_pairs: Vec<(String, Rc<RefCell<GraphInner>>)> = {
233        let inner = inner_arc.borrow_mut();
234        snapshot
235            .subgraphs
236            .keys()
237            .map(|name| {
238                let child = inner
239                    .children
240                    .get(name)
241                    .ok_or_else(|| SnapshotError::UnknownSubgraph(name.clone()))?;
242                Ok((name.clone(), child.clone()))
243            })
244            .collect::<Result<Vec<_>, SnapshotError>>()?
245    };
246    for (child_name, child_inner) in child_pairs {
247        restore_into(core, &child_inner, &snapshot.subgraphs[&child_name])?;
248    }
249
250    Ok(())
251}
252
253impl Graph {
254    /// Serialize this graph's state into a portable snapshot.
255    #[must_use]
256    pub fn snapshot(&self, core: &Core) -> GraphPersistSnapshot {
257        snapshot_of(core, &self.inner)
258    }
259
260    /// [`Self::snapshot`] over the one object-safe facade (D246 rule 5)
261    /// — for the storage in-wave `MailboxOp::Defer(|cf: &dyn CoreFull|)`
262    /// path, which only has a `&dyn CoreFull` (not a concrete `&Core`).
263    /// Read-only; `serialize_handle` delegates to the binding.
264    #[must_use]
265    pub fn snapshot_full(&self, core: &dyn CoreFull) -> GraphPersistSnapshot {
266        snapshot_of(core, &self.inner)
267    }
268
269    /// Restore state from a snapshot into this existing graph.
270    ///
271    /// # Errors
272    /// `NameMismatch` if names differ; `UnknownNode`/`UnknownSubgraph`
273    /// for snapshot entries absent from the graph.
274    pub fn restore(
275        &self,
276        core: &Core,
277        snapshot: &GraphPersistSnapshot,
278    ) -> Result<(), SnapshotError> {
279        restore_into(core, &self.inner, snapshot)
280    }
281
282    /// Reconstruct a graph from a snapshot. **Builder mode**
283    /// (`builder = Some`): build topology then `restore()` values.
284    /// **Auto-hydration** (`builder = None`): reconstruct topology +
285    /// state from the snapshot via `factories` (state nodes need none).
286    ///
287    /// D246: the embedder owns the `Core` (see
288    /// [`graphrefly_core::OwnedCore`]) and passes it in; the binding is
289    /// `core.binding_ptr()`.
290    ///
291    /// # Errors
292    /// `UnresolvableDeps` if auto-hydration can't resolve a node's
293    /// deps; `MissingFactory` for a non-state node type with no factory.
294    pub fn from_snapshot(
295        core: &Core,
296        snapshot: &GraphPersistSnapshot,
297        builder: Option<SnapshotBuilder>,
298        factories: Option<IndexMap<String, NodeFactory>>,
299    ) -> Result<Self, SnapshotError> {
300        let graph = Graph::new(&snapshot.name);
301        let binding: Arc<dyn BindingBoundary> = core.binding();
302
303        if let Some(build_fn) = builder {
304            build_fn(core, &graph);
305            graph.restore(core, snapshot)?;
306            return Ok(graph);
307        }
308
309        let factories = factories.unwrap_or_default();
310        for (child_name, child_snapshot) in &snapshot.subgraphs {
311            let child = graph
312                .mount_new(core, child_name)
313                .map_err(|_| SnapshotError::UnknownSubgraph(child_name.clone()))?;
314            hydrate_subgraph(core, &child, child_snapshot, &binding, &factories)?;
315        }
316        hydrate_nodes(core, &graph, snapshot, &binding, &factories)?;
317
318        Ok(graph)
319    }
320}
321
322fn hydrate_subgraph(
323    core: &Core,
324    g: &Graph,
325    snapshot: &GraphPersistSnapshot,
326    binding: &Arc<dyn BindingBoundary>,
327    factories: &IndexMap<String, NodeFactory>,
328) -> Result<(), SnapshotError> {
329    for (child_name, child_snapshot) in &snapshot.subgraphs {
330        let child = g
331            .mount_new(core, child_name)
332            .map_err(|_| SnapshotError::UnknownSubgraph(child_name.clone()))?;
333        hydrate_subgraph(core, &child, child_snapshot, binding, factories)?;
334    }
335    hydrate_nodes(core, g, snapshot, binding, factories)
336}
337
338fn hydrate_nodes(
339    core: &Core,
340    g: &Graph,
341    snapshot: &GraphPersistSnapshot,
342    binding: &Arc<dyn BindingBoundary>,
343    factories: &IndexMap<String, NodeFactory>,
344) -> Result<(), SnapshotError> {
345    let mut created: IndexMap<String, NodeId> = IndexMap::new();
346    let mut remaining: Vec<(String, NodeSlice)> = snapshot
347        .nodes
348        .iter()
349        .map(|(n, s)| (n.clone(), s.clone()))
350        .collect();
351
352    loop {
353        let before = remaining.len();
354        let mut still_remaining = Vec::new();
355
356        for (name, slice) in remaining {
357            let deps_resolved: Option<Vec<NodeId>> = if slice.deps.is_empty() {
358                Some(Vec::new())
359            } else {
360                let mut resolved = Vec::with_capacity(slice.deps.len());
361                let mut all_ok = true;
362                for dep_name in &slice.deps {
363                    if let Some(&dep_id) = created.get(dep_name) {
364                        resolved.push(dep_id);
365                    } else {
366                        all_ok = false;
367                        break;
368                    }
369                }
370                if all_ok {
371                    Some(resolved)
372                } else {
373                    None
374                }
375            };
376
377            if let Some(dep_ids) = deps_resolved {
378                let node_id = if slice.node_type == "state" {
379                    let initial = slice
380                        .value
381                        .as_ref()
382                        .map(|v| binding.deserialize_value(v.clone()));
383                    g.state(core, &name, initial)
384                        .map_err(|_| SnapshotError::UnknownNode(name.clone()))?
385                } else {
386                    let factory = factories.get(&slice.node_type).ok_or_else(|| {
387                        SnapshotError::MissingFactory(slice.node_type.clone(), name.clone())
388                    })?;
389                    factory(core, g, &name, &slice, &dep_ids)?
390                };
391                created.insert(name, node_id);
392            } else {
393                still_remaining.push((name, slice));
394            }
395        }
396
397        remaining = still_remaining;
398        if remaining.is_empty() {
399            break;
400        }
401        if remaining.len() == before {
402            let (name, slice) = &remaining[0];
403            return Err(SnapshotError::UnresolvableDeps(
404                name.clone(),
405                slice.deps.clone(),
406            ));
407        }
408    }
409
410    for (name, slice) in &snapshot.nodes {
411        if let Some(&node_id) = created.get(name) {
412            match &slice.status {
413                NodeSnapshotStatus::Completed => {
414                    g.complete(core, node_id);
415                }
416                NodeSnapshotStatus::Errored { error } => {
417                    if let Some(err_val) = error {
418                        let err_handle = binding.deserialize_value(err_val.clone());
419                        g.error(core, node_id, err_handle);
420                    }
421                }
422                NodeSnapshotStatus::Sentinel | NodeSnapshotStatus::Live => {}
423            }
424        }
425    }
426
427    Ok(())
428}