use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::Arc;
use graphrefly_core::{BindingBoundary, Core, CoreFull, NodeId, NodeKind, TerminalKind, NO_HANDLE};
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
use crate::graph::{resolve_checked, Graph, GraphInner, PATH_SEP};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphPersistSnapshot {
pub name: String,
pub nodes: IndexMap<String, NodeSlice>,
#[serde(default, skip_serializing_if = "IndexMap::is_empty")]
pub subgraphs: IndexMap<String, GraphPersistSnapshot>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeSlice {
#[serde(rename = "type")]
pub node_type: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub value: Option<serde_json::Value>,
pub status: NodeSnapshotStatus,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub deps: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum NodeSnapshotStatus {
Sentinel,
Live,
Completed,
Errored {
#[serde(default, skip_serializing_if = "Option::is_none")]
error: Option<serde_json::Value>,
},
}
#[derive(Debug, thiserror::Error)]
pub enum SnapshotError {
#[error("snapshot name `{expected}` does not match graph name `{actual}`")]
NameMismatch { expected: String, actual: String },
#[error("node `{0}` in snapshot not found in graph namespace")]
UnknownNode(String),
#[error("subgraph `{0}` in snapshot not found in graph mount tree")]
UnknownSubgraph(String),
#[error("auto-hydration: unresolvable deps for node `{0}` (deps: {1:?})")]
UnresolvableDeps(String, Vec<String>),
#[error("auto-hydration: no factory registered for node type `{0}` (node `{1}`)")]
MissingFactory(String, String),
#[error("snapshot decode: state node `{name}` at graph `{graph_path}` collides with an existing child mount of the same name")]
NameCollision { name: String, graph_path: String },
}
pub type NodeFactory =
Box<dyn Fn(&Core, &Graph, &str, &NodeSlice, &[NodeId]) -> Result<NodeId, SnapshotError>>;
pub type SnapshotBuilder = Box<dyn FnOnce(&Core, &Graph)>;
pub(crate) fn snapshot_of(
core: &dyn CoreFull,
inner_arc: &Rc<RefCell<GraphInner>>,
) -> GraphPersistSnapshot {
let id_to_tree_path = build_id_to_tree_path(inner_arc);
snapshot_of_with_tree_paths(core, inner_arc, &id_to_tree_path, "")
}
fn build_id_to_tree_path(root: &Rc<RefCell<GraphInner>>) -> HashMap<NodeId, String> {
let mut map = HashMap::new();
walk_tree_paths(root, "", &mut map);
map
}
fn walk_tree_paths(
inner_arc: &Rc<RefCell<GraphInner>>,
path_prefix: &str,
out: &mut HashMap<NodeId, String>,
) {
let inner = inner_arc.borrow_mut();
let names: Vec<(String, NodeId)> = inner.names.iter().map(|(n, &id)| (n.clone(), id)).collect();
let children: Vec<(String, Rc<RefCell<GraphInner>>)> = inner
.children
.iter()
.map(|(n, g)| (n.clone(), g.clone()))
.collect();
drop(inner);
for (name, id) in &names {
let abs_path = if path_prefix.is_empty() {
name.clone()
} else {
format!("{path_prefix}{PATH_SEP}{name}")
};
out.insert(*id, abs_path);
}
for (child_name, child_inner) in &children {
let child_prefix = if path_prefix.is_empty() {
child_name.clone()
} else {
format!("{path_prefix}{PATH_SEP}{child_name}")
};
walk_tree_paths(child_inner, &child_prefix, out);
}
}
fn absolute_to_owner_relative(owner_path: &str, abs_path: &str) -> String {
debug_assert_ne!(
owner_path, abs_path,
"D276 invariant: self-deps are rejected at registration; \
encoding a dep whose absolute path equals the owner's would emit \
an empty relative path"
);
let owner_segs: Vec<&str> = if owner_path.is_empty() {
Vec::new()
} else {
owner_path.split(PATH_SEP).collect()
};
let abs_segs: Vec<&str> = if abs_path.is_empty() {
Vec::new()
} else {
abs_path.split(PATH_SEP).collect()
};
let mut common = 0;
while common < owner_segs.len()
&& common < abs_segs.len()
&& owner_segs[common] == abs_segs[common]
{
common += 1;
}
let up_count = owner_segs.len() - common;
let down_segs = &abs_segs[common..];
if up_count == 0 {
return down_segs.join(PATH_SEP);
}
let mut parts: Vec<&str> = vec![".."; up_count];
parts.extend(down_segs);
parts.join(PATH_SEP)
}
fn snapshot_of_with_tree_paths(
core: &dyn CoreFull,
inner_arc: &Rc<RefCell<GraphInner>>,
id_to_tree_path: &HashMap<NodeId, String>,
owner_path: &str,
) -> GraphPersistSnapshot {
let (name, node_entries, children, id_to_name) = {
let inner = inner_arc.borrow_mut();
let name = inner.name.clone();
let node_entries: Vec<(String, NodeId)> =
inner.names.iter().map(|(n, &id)| (n.clone(), id)).collect();
let children: Vec<(String, Rc<RefCell<GraphInner>>)> = inner
.children
.iter()
.map(|(n, g)| (n.clone(), g.clone()))
.collect();
let id_to_name: IndexMap<NodeId, String> =
inner.names.iter().map(|(n, &id)| (id, n.clone())).collect();
(name, node_entries, children, id_to_name)
};
let mut nodes = IndexMap::new();
for (node_name, node_id) in &node_entries {
let kind = core.kind_of(*node_id);
let node_type = match kind {
Some(NodeKind::State) => "state",
Some(NodeKind::Derived) => "derived",
Some(NodeKind::Dynamic) => "dynamic",
Some(NodeKind::Producer) => "producer",
Some(NodeKind::Operator(_)) => "operator",
None => "unknown",
};
let cache = core.cache_of(*node_id);
let value = if cache == NO_HANDLE {
None
} else {
core.serialize_handle(cache)
};
let terminal = core.is_terminal(*node_id);
let status = match terminal {
Some(TerminalKind::Complete) => NodeSnapshotStatus::Completed,
Some(TerminalKind::Error(err_handle)) => NodeSnapshotStatus::Errored {
error: core.serialize_handle(err_handle),
},
None => {
if core.has_fired_once(*node_id) || cache != NO_HANDLE {
NodeSnapshotStatus::Live
} else {
NodeSnapshotStatus::Sentinel
}
}
};
let dep_ids = core.deps_of(*node_id);
let deps: Vec<String> = dep_ids
.iter()
.map(|dep_id| {
if let Some(local_name) = id_to_name.get(dep_id) {
local_name.clone()
} else if let Some(tree_path) = id_to_tree_path.get(dep_id) {
absolute_to_owner_relative(owner_path, tree_path)
} else {
format!("_anon_{}", dep_id.raw())
}
})
.collect();
nodes.insert(
node_name.clone(),
NodeSlice {
node_type: node_type.to_owned(),
value,
status,
deps,
},
);
}
let mut subgraphs = IndexMap::new();
for (child_name, child_inner) in children {
let child_owner_path = if owner_path.is_empty() {
child_name.clone()
} else {
format!("{owner_path}{PATH_SEP}{child_name}")
};
subgraphs.insert(
child_name,
snapshot_of_with_tree_paths(core, &child_inner, id_to_tree_path, &child_owner_path),
);
}
GraphPersistSnapshot {
name,
nodes,
subgraphs,
}
}
fn restore_into(
core: &Core,
inner_arc: &Rc<RefCell<GraphInner>>,
snapshot: &GraphPersistSnapshot,
) -> Result<(), SnapshotError> {
let graph_name = inner_arc.borrow_mut().name.clone();
if snapshot.name != graph_name {
return Err(SnapshotError::NameMismatch {
expected: snapshot.name.clone(),
actual: graph_name,
});
}
let binding = core.binding_ptr();
for (node_name, slice) in &snapshot.nodes {
let node_id = resolve_checked(inner_arc, node_name)
.ok()
.flatten()
.ok_or_else(|| SnapshotError::UnknownNode(node_name.clone()))?;
if slice.node_type == "state" {
if let Some(ref value) = slice.value {
let handle = binding.deserialize_value(value.clone());
core.emit(node_id, handle);
}
}
match &slice.status {
NodeSnapshotStatus::Completed => {
core.complete(node_id);
}
NodeSnapshotStatus::Errored { error } => {
if let Some(err_val) = error {
let err_handle = binding.deserialize_value(err_val.clone());
core.error(node_id, err_handle);
}
}
NodeSnapshotStatus::Sentinel | NodeSnapshotStatus::Live => {}
}
}
let child_pairs: Vec<(String, Rc<RefCell<GraphInner>>)> = {
let inner = inner_arc.borrow_mut();
snapshot
.subgraphs
.keys()
.map(|name| {
let child = inner
.children
.get(name)
.ok_or_else(|| SnapshotError::UnknownSubgraph(name.clone()))?;
Ok((name.clone(), child.clone()))
})
.collect::<Result<Vec<_>, SnapshotError>>()?
};
for (child_name, child_inner) in child_pairs {
restore_into(core, &child_inner, &snapshot.subgraphs[&child_name])?;
}
Ok(())
}
impl Graph {
#[must_use]
pub fn snapshot(&self, core: &Core) -> GraphPersistSnapshot {
snapshot_of(core, &self.inner)
}
#[must_use]
pub fn snapshot_full(&self, core: &dyn CoreFull) -> GraphPersistSnapshot {
snapshot_of(core, &self.inner)
}
pub fn restore(
&self,
core: &Core,
snapshot: &GraphPersistSnapshot,
) -> Result<(), SnapshotError> {
restore_into(core, &self.inner, snapshot)
}
pub fn from_snapshot(
core: &Core,
snapshot: &GraphPersistSnapshot,
builder: Option<SnapshotBuilder>,
factories: Option<IndexMap<String, NodeFactory>>,
) -> Result<Self, SnapshotError> {
let graph = Graph::new(&snapshot.name);
let binding: Arc<dyn BindingBoundary> = core.binding();
if let Some(build_fn) = builder {
build_fn(core, &graph);
graph.restore(core, snapshot)?;
return Ok(graph);
}
let factories = factories.unwrap_or_default();
let mut graph_map: IndexMap<String, Graph> = IndexMap::new();
graph_map.insert(String::new(), graph.clone());
mount_subgraphs_recursive(core, &graph, snapshot, "", &mut graph_map)?;
create_state_nodes_recursive(core, snapshot, "", &graph_map, &binding)?;
let mut derived_queue: Vec<DerivedEntry> = Vec::new();
collect_derived_recursive(snapshot, "", &graph_map, &mut derived_queue);
create_derived_with_retry(core, &factories, derived_queue)?;
apply_status_recursive(core, snapshot, "", &graph_map, &binding)?;
Ok(graph)
}
}
struct DerivedEntry {
owner_graph: Graph,
name: String,
slice: NodeSlice,
}
fn mount_subgraphs_recursive(
core: &Core,
parent: &Graph,
snap: &GraphPersistSnapshot,
parent_path: &str,
graph_map: &mut IndexMap<String, Graph>,
) -> Result<(), SnapshotError> {
for (child_name, child_snap) in &snap.subgraphs {
let child_graph = parent
.mount_new(core, child_name)
.map_err(|_| SnapshotError::UnknownSubgraph(child_name.clone()))?;
let child_path = if parent_path.is_empty() {
child_name.clone()
} else {
format!("{parent_path}{PATH_SEP}{child_name}")
};
graph_map.insert(child_path.clone(), child_graph.clone());
mount_subgraphs_recursive(core, &child_graph, child_snap, &child_path, graph_map)?;
}
Ok(())
}
fn create_state_nodes_recursive(
core: &Core,
snap: &GraphPersistSnapshot,
owner_path: &str,
graph_map: &IndexMap<String, Graph>,
binding: &Arc<dyn BindingBoundary>,
) -> Result<(), SnapshotError> {
let owner_graph = graph_map
.get(owner_path)
.expect("D276 invariant: graph_map covers every subgraph mounted in Pass 0");
let child_mount_names: std::collections::HashSet<String> =
owner_graph.child_names().into_iter().collect();
for (name, slice) in &snap.nodes {
if slice.node_type == "state" && child_mount_names.contains(name) {
return Err(SnapshotError::NameCollision {
name: name.clone(),
graph_path: owner_path.to_owned(),
});
}
}
for (name, slice) in &snap.nodes {
if slice.node_type == "state" {
let initial = slice
.value
.as_ref()
.map(|v| binding.deserialize_value(v.clone()));
owner_graph
.state(core, name, initial)
.map_err(|_| SnapshotError::UnknownNode(name.clone()))?;
}
}
for (child_name, child_snap) in &snap.subgraphs {
let child_path = if owner_path.is_empty() {
child_name.clone()
} else {
format!("{owner_path}{PATH_SEP}{child_name}")
};
create_state_nodes_recursive(core, child_snap, &child_path, graph_map, binding)?;
}
Ok(())
}
fn collect_derived_recursive(
snap: &GraphPersistSnapshot,
owner_path: &str,
graph_map: &IndexMap<String, Graph>,
out: &mut Vec<DerivedEntry>,
) {
let owner_graph = graph_map
.get(owner_path)
.expect("D276 invariant: graph_map covers every subgraph mounted in Pass 0");
for (name, slice) in &snap.nodes {
if slice.node_type != "state" {
out.push(DerivedEntry {
owner_graph: owner_graph.clone(),
name: name.clone(),
slice: slice.clone(),
});
}
}
for (child_name, child_snap) in &snap.subgraphs {
let child_path = if owner_path.is_empty() {
child_name.clone()
} else {
format!("{owner_path}{PATH_SEP}{child_name}")
};
collect_derived_recursive(child_snap, &child_path, graph_map, out);
}
}
fn create_derived_with_retry(
core: &Core,
factories: &IndexMap<String, NodeFactory>,
entries: Vec<DerivedEntry>,
) -> Result<(), SnapshotError> {
let mut remaining = entries;
loop {
let before = remaining.len();
let mut still_remaining = Vec::new();
for entry in remaining {
let mut resolved = Vec::with_capacity(entry.slice.deps.len());
let mut all_ok = true;
for dep_name in &entry.slice.deps {
if let Some(dep_id) = entry.owner_graph.try_resolve(dep_name) {
resolved.push(dep_id);
} else {
all_ok = false;
break;
}
}
if all_ok {
let factory = factories.get(&entry.slice.node_type).ok_or_else(|| {
SnapshotError::MissingFactory(entry.slice.node_type.clone(), entry.name.clone())
})?;
factory(
core,
&entry.owner_graph,
&entry.name,
&entry.slice,
&resolved,
)?;
} else {
still_remaining.push(entry);
}
}
remaining = still_remaining;
if remaining.is_empty() {
break;
}
if remaining.len() == before {
let entry = &remaining[0];
return Err(SnapshotError::UnresolvableDeps(
entry.name.clone(),
entry.slice.deps.clone(),
));
}
}
Ok(())
}
fn apply_status_recursive(
core: &Core,
snap: &GraphPersistSnapshot,
owner_path: &str,
graph_map: &IndexMap<String, Graph>,
binding: &Arc<dyn BindingBoundary>,
) -> Result<(), SnapshotError> {
let owner_graph = graph_map
.get(owner_path)
.expect("D276 invariant: graph_map covers every subgraph mounted in Pass 0");
for (name, slice) in &snap.nodes {
let node_id = owner_graph
.try_resolve(name)
.ok_or_else(|| SnapshotError::UnknownNode(name.clone()))?;
match &slice.status {
NodeSnapshotStatus::Completed => {
owner_graph.complete(core, node_id);
}
NodeSnapshotStatus::Errored { error } => {
if let Some(err_val) = error {
let err_handle = binding.deserialize_value(err_val.clone());
owner_graph.error(core, node_id, err_handle);
}
}
NodeSnapshotStatus::Sentinel | NodeSnapshotStatus::Live => {}
}
}
for (child_name, child_snap) in &snap.subgraphs {
let child_path = if owner_path.is_empty() {
child_name.clone()
} else {
format!("{owner_path}{PATH_SEP}{child_name}")
};
apply_status_recursive(core, child_snap, &child_path, graph_map, binding)?;
}
Ok(())
}