1use std::cell::RefCell;
19use std::rc::Rc;
20use std::sync::Arc;
21
22use graphrefly_core::{BindingBoundary, Core, CoreFull, NodeId, NodeKind, TerminalKind, NO_HANDLE};
23use indexmap::IndexMap;
24use serde::{Deserialize, Serialize};
25
26use crate::graph::{resolve_checked, Graph, GraphInner};
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct GraphPersistSnapshot {
31 pub name: String,
33 pub nodes: IndexMap<String, NodeSlice>,
35 #[serde(default, skip_serializing_if = "IndexMap::is_empty")]
37 pub subgraphs: IndexMap<String, GraphPersistSnapshot>,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct NodeSlice {
43 #[serde(rename = "type")]
45 pub node_type: String,
46 #[serde(default, skip_serializing_if = "Option::is_none")]
48 pub value: Option<serde_json::Value>,
49 pub status: NodeSnapshotStatus,
51 #[serde(default, skip_serializing_if = "Vec::is_empty")]
53 pub deps: Vec<String>,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
58#[serde(rename_all = "lowercase")]
59pub enum NodeSnapshotStatus {
60 Sentinel,
62 Live,
64 Completed,
66 Errored {
68 #[serde(default, skip_serializing_if = "Option::is_none")]
69 error: Option<serde_json::Value>,
70 },
71}
72
73#[derive(Debug, thiserror::Error)]
75pub enum SnapshotError {
76 #[error("snapshot name `{expected}` does not match graph name `{actual}`")]
77 NameMismatch { expected: String, actual: String },
78 #[error("node `{0}` in snapshot not found in graph namespace")]
79 UnknownNode(String),
80 #[error("subgraph `{0}` in snapshot not found in graph mount tree")]
81 UnknownSubgraph(String),
82 #[error("auto-hydration: unresolvable deps for node `{0}` (deps: {1:?})")]
83 UnresolvableDeps(String, Vec<String>),
84 #[error("auto-hydration: no factory registered for node type `{0}` (node `{1}`)")]
85 MissingFactory(String, String),
86}
87
88pub type NodeFactory =
91 Box<dyn Fn(&Core, &Graph, &str, &NodeSlice, &[NodeId]) -> Result<NodeId, SnapshotError>>;
92
93pub type SnapshotBuilder = Box<dyn FnOnce(&Core, &Graph)>;
96
97pub(crate) fn snapshot_of(
102 core: &dyn CoreFull,
103 inner_arc: &Rc<RefCell<GraphInner>>,
104) -> GraphPersistSnapshot {
105 let (name, node_entries, children, id_to_name) = {
106 let inner = inner_arc.borrow_mut();
107 let name = inner.name.clone();
108 let node_entries: Vec<(String, NodeId)> =
109 inner.names.iter().map(|(n, &id)| (n.clone(), id)).collect();
110 let children: Vec<(String, Rc<RefCell<GraphInner>>)> = inner
111 .children
112 .iter()
113 .map(|(n, g)| (n.clone(), g.clone()))
114 .collect();
115 let id_to_name: IndexMap<NodeId, String> =
116 inner.names.iter().map(|(n, &id)| (id, n.clone())).collect();
117 (name, node_entries, children, id_to_name)
118 };
119
120 let mut nodes = IndexMap::new();
121
122 for (node_name, node_id) in &node_entries {
123 let kind = core.kind_of(*node_id);
124 let node_type = match kind {
125 Some(NodeKind::State) => "state",
126 Some(NodeKind::Derived) => "derived",
127 Some(NodeKind::Dynamic) => "dynamic",
128 Some(NodeKind::Producer) => "producer",
129 Some(NodeKind::Operator(_)) => "operator",
130 None => "unknown",
131 };
132
133 let cache = core.cache_of(*node_id);
134 let value = if cache == NO_HANDLE {
135 None
136 } else {
137 core.serialize_handle(cache)
138 };
139
140 let terminal = core.is_terminal(*node_id);
141 let status = match terminal {
142 Some(TerminalKind::Complete) => NodeSnapshotStatus::Completed,
143 Some(TerminalKind::Error(err_handle)) => NodeSnapshotStatus::Errored {
144 error: core.serialize_handle(err_handle),
145 },
146 None => {
147 if core.has_fired_once(*node_id) || cache != NO_HANDLE {
148 NodeSnapshotStatus::Live
149 } else {
150 NodeSnapshotStatus::Sentinel
151 }
152 }
153 };
154
155 let dep_ids = core.deps_of(*node_id);
156 let deps: Vec<String> = dep_ids
157 .iter()
158 .map(|dep_id| {
159 id_to_name
160 .get(dep_id)
161 .cloned()
162 .unwrap_or_else(|| format!("_anon_{}", dep_id.raw()))
163 })
164 .collect();
165
166 nodes.insert(
167 node_name.clone(),
168 NodeSlice {
169 node_type: node_type.to_owned(),
170 value,
171 status,
172 deps,
173 },
174 );
175 }
176
177 let mut subgraphs = IndexMap::new();
178 for (child_name, child_inner) in children {
179 subgraphs.insert(child_name, snapshot_of(core, &child_inner));
180 }
181
182 GraphPersistSnapshot {
183 name,
184 nodes,
185 subgraphs,
186 }
187}
188
189fn restore_into(
191 core: &Core,
192 inner_arc: &Rc<RefCell<GraphInner>>,
193 snapshot: &GraphPersistSnapshot,
194) -> Result<(), SnapshotError> {
195 let graph_name = inner_arc.borrow_mut().name.clone();
196 if snapshot.name != graph_name {
197 return Err(SnapshotError::NameMismatch {
198 expected: snapshot.name.clone(),
199 actual: graph_name,
200 });
201 }
202
203 let binding = core.binding_ptr();
204
205 for (node_name, slice) in &snapshot.nodes {
206 let node_id = resolve_checked(inner_arc, node_name)
207 .ok()
208 .flatten()
209 .ok_or_else(|| SnapshotError::UnknownNode(node_name.clone()))?;
210
211 if slice.node_type == "state" {
212 if let Some(ref value) = slice.value {
213 let handle = binding.deserialize_value(value.clone());
214 core.emit(node_id, handle);
215 }
216 }
217
218 match &slice.status {
219 NodeSnapshotStatus::Completed => {
220 core.complete(node_id);
221 }
222 NodeSnapshotStatus::Errored { error } => {
223 if let Some(err_val) = error {
224 let err_handle = binding.deserialize_value(err_val.clone());
225 core.error(node_id, err_handle);
226 }
227 }
228 NodeSnapshotStatus::Sentinel | NodeSnapshotStatus::Live => {}
229 }
230 }
231
232 let child_pairs: Vec<(String, Rc<RefCell<GraphInner>>)> = {
233 let inner = inner_arc.borrow_mut();
234 snapshot
235 .subgraphs
236 .keys()
237 .map(|name| {
238 let child = inner
239 .children
240 .get(name)
241 .ok_or_else(|| SnapshotError::UnknownSubgraph(name.clone()))?;
242 Ok((name.clone(), child.clone()))
243 })
244 .collect::<Result<Vec<_>, SnapshotError>>()?
245 };
246 for (child_name, child_inner) in child_pairs {
247 restore_into(core, &child_inner, &snapshot.subgraphs[&child_name])?;
248 }
249
250 Ok(())
251}
252
253impl Graph {
254 #[must_use]
256 pub fn snapshot(&self, core: &Core) -> GraphPersistSnapshot {
257 snapshot_of(core, &self.inner)
258 }
259
260 #[must_use]
265 pub fn snapshot_full(&self, core: &dyn CoreFull) -> GraphPersistSnapshot {
266 snapshot_of(core, &self.inner)
267 }
268
269 pub fn restore(
275 &self,
276 core: &Core,
277 snapshot: &GraphPersistSnapshot,
278 ) -> Result<(), SnapshotError> {
279 restore_into(core, &self.inner, snapshot)
280 }
281
282 pub fn from_snapshot(
295 core: &Core,
296 snapshot: &GraphPersistSnapshot,
297 builder: Option<SnapshotBuilder>,
298 factories: Option<IndexMap<String, NodeFactory>>,
299 ) -> Result<Self, SnapshotError> {
300 let graph = Graph::new(&snapshot.name);
301 let binding: Arc<dyn BindingBoundary> = core.binding();
302
303 if let Some(build_fn) = builder {
304 build_fn(core, &graph);
305 graph.restore(core, snapshot)?;
306 return Ok(graph);
307 }
308
309 let factories = factories.unwrap_or_default();
310 for (child_name, child_snapshot) in &snapshot.subgraphs {
311 let child = graph
312 .mount_new(core, child_name)
313 .map_err(|_| SnapshotError::UnknownSubgraph(child_name.clone()))?;
314 hydrate_subgraph(core, &child, child_snapshot, &binding, &factories)?;
315 }
316 hydrate_nodes(core, &graph, snapshot, &binding, &factories)?;
317
318 Ok(graph)
319 }
320}
321
322fn hydrate_subgraph(
323 core: &Core,
324 g: &Graph,
325 snapshot: &GraphPersistSnapshot,
326 binding: &Arc<dyn BindingBoundary>,
327 factories: &IndexMap<String, NodeFactory>,
328) -> Result<(), SnapshotError> {
329 for (child_name, child_snapshot) in &snapshot.subgraphs {
330 let child = g
331 .mount_new(core, child_name)
332 .map_err(|_| SnapshotError::UnknownSubgraph(child_name.clone()))?;
333 hydrate_subgraph(core, &child, child_snapshot, binding, factories)?;
334 }
335 hydrate_nodes(core, g, snapshot, binding, factories)
336}
337
338fn hydrate_nodes(
339 core: &Core,
340 g: &Graph,
341 snapshot: &GraphPersistSnapshot,
342 binding: &Arc<dyn BindingBoundary>,
343 factories: &IndexMap<String, NodeFactory>,
344) -> Result<(), SnapshotError> {
345 let mut created: IndexMap<String, NodeId> = IndexMap::new();
346 let mut remaining: Vec<(String, NodeSlice)> = snapshot
347 .nodes
348 .iter()
349 .map(|(n, s)| (n.clone(), s.clone()))
350 .collect();
351
352 loop {
353 let before = remaining.len();
354 let mut still_remaining = Vec::new();
355
356 for (name, slice) in remaining {
357 let deps_resolved: Option<Vec<NodeId>> = if slice.deps.is_empty() {
358 Some(Vec::new())
359 } else {
360 let mut resolved = Vec::with_capacity(slice.deps.len());
361 let mut all_ok = true;
362 for dep_name in &slice.deps {
363 if let Some(&dep_id) = created.get(dep_name) {
364 resolved.push(dep_id);
365 } else {
366 all_ok = false;
367 break;
368 }
369 }
370 if all_ok {
371 Some(resolved)
372 } else {
373 None
374 }
375 };
376
377 if let Some(dep_ids) = deps_resolved {
378 let node_id = if slice.node_type == "state" {
379 let initial = slice
380 .value
381 .as_ref()
382 .map(|v| binding.deserialize_value(v.clone()));
383 g.state(core, &name, initial)
384 .map_err(|_| SnapshotError::UnknownNode(name.clone()))?
385 } else {
386 let factory = factories.get(&slice.node_type).ok_or_else(|| {
387 SnapshotError::MissingFactory(slice.node_type.clone(), name.clone())
388 })?;
389 factory(core, g, &name, &slice, &dep_ids)?
390 };
391 created.insert(name, node_id);
392 } else {
393 still_remaining.push((name, slice));
394 }
395 }
396
397 remaining = still_remaining;
398 if remaining.is_empty() {
399 break;
400 }
401 if remaining.len() == before {
402 let (name, slice) = &remaining[0];
403 return Err(SnapshotError::UnresolvableDeps(
404 name.clone(),
405 slice.deps.clone(),
406 ));
407 }
408 }
409
410 for (name, slice) in &snapshot.nodes {
411 if let Some(&node_id) = created.get(name) {
412 match &slice.status {
413 NodeSnapshotStatus::Completed => {
414 g.complete(core, node_id);
415 }
416 NodeSnapshotStatus::Errored { error } => {
417 if let Some(err_val) = error {
418 let err_handle = binding.deserialize_value(err_val.clone());
419 g.error(core, node_id, err_handle);
420 }
421 }
422 NodeSnapshotStatus::Sentinel | NodeSnapshotStatus::Live => {}
423 }
424 }
425 }
426
427 Ok(())
428}