use std::sync::{Arc, Weak};
use graphrefly_core::{Core, HandleId, NodeId, NodeKind, TerminalKind, NO_HANDLE};
use indexmap::IndexMap;
use parking_lot::Mutex;
use serde::{Serialize, Serializer};
use crate::graph::{Graph, GraphInner};
/// Serializable snapshot of a graph, as assembled by `Graph::describe`.
#[derive(Debug, Clone, Serialize)]
pub struct GraphDescribeOutput {
/// Name of the described graph.
pub name: String,
/// Description of each named node, keyed by the node's name.
pub nodes: IndexMap<String, NodeDescribe>,
/// One entry per dependency link, running from the dependency to the node that depends on it.
pub edges: Vec<EdgeDescribe>,
/// Keys of the graph's `children` map (the names of its child graphs).
pub subgraphs: Vec<String>,
}
/// Description of a single node inside a [`GraphDescribeOutput`].
#[derive(Debug, Clone, Serialize)]
pub struct NodeDescribe {
/// Kind of the node; serialized under the key `type`.
#[serde(rename = "type")]
pub r#type: NodeTypeStr,
/// Lifecycle status derived for the node at snapshot time.
pub status: NodeStatus,
/// Handle of the cached value, or `None` when the node's cache equals `NO_HANDLE`.
/// Serialized through `ser_opt_handle` (the handle's raw value, or none).
#[serde(serialize_with = "ser_opt_handle")]
pub value: Option<HandleId>,
/// Names of the node's dependencies; a dependency without a name in the
/// described graph appears as `_anon_<raw id>`.
pub deps: Vec<String>,
/// Optional metadata; left out of the serialized output when `None`.
/// `Graph::describe` currently always sets this to `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub meta: Option<serde_json::Value>,
}
/// A directed dependency link between two nodes, identified by name.
#[derive(Debug, Clone, Serialize)]
pub struct EdgeDescribe {
/// Name of the dependency (the upstream node).
pub from: String,
/// Name of the node that depends on `from`.
pub to: String,
}
/// Node kind as it appears in describe output; serialized as a lowercase string.
#[derive(Debug, Clone, Copy, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum NodeTypeStr {
/// Serialized as `"state"`.
State,
/// Serialized as `"derived"`.
Derived,
/// Serialized as `"dynamic"`.
Dynamic,
/// Serialized as `"producer"`.
Producer,
/// Serialized as `"effect"`.
/// NOTE(review): `type_str_of` in this file never produces this variant.
Effect,
/// Serialized as `"operator"`.
Operator,
}
/// Node status as it appears in describe output; serialized as a lowercase string.
///
/// The per-variant notes describe how `status_of` in this file assigns each value.
#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum NodeStatus {
/// No cached value: either a state node, or a non-state node that has already fired.
Sentinel,
/// No cached value on a non-state node that has not fired yet.
Pending,
/// The node is flagged dirty and has not terminated.
Dirty,
/// The node holds a cached value, is not dirty and has not terminated.
Settled,
/// NOTE(review): `status_of` in this file never produces this variant.
Resolved,
/// The node terminated with `TerminalKind::Complete`.
Completed,
/// The node terminated with `TerminalKind::Error`.
Errored,
}
impl Graph {
    /// Builds a point-in-time [`GraphDescribeOutput`] for this graph.
    ///
    /// The graph's name, its node names and its child-graph names are copied
    /// while the inner lock is held; the lock is released before any per-node
    /// query is issued against `core`.
    ///
    /// Every named node contributes one entry to `nodes` and one edge per
    /// dependency. A dependency that has no name in this graph is reported as
    /// `_anon_<raw id>`. `meta` is always `None`.
    #[must_use]
    pub fn describe(&self) -> GraphDescribeOutput {
        // Copy everything needed from the locked state, then let the guard go
        // out of scope so the `core` queries below run without the lock held.
        let (graph_name, subgraphs, named) = {
            let guard = self.inner.lock();
            let named: Vec<(String, NodeId)> = guard
                .names
                .iter()
                .map(|(label, node)| (label.clone(), *node))
                .collect();
            let subgraphs: Vec<String> = guard.children.keys().cloned().collect();
            (guard.name.clone(), subgraphs, named)
        };

        // Reverse lookup (node id -> name) used to label dependencies.
        let label_of: IndexMap<NodeId, String> = named
            .iter()
            .map(|(label, node)| (*node, label.clone()))
            .collect();

        let mut nodes: IndexMap<String, NodeDescribe> = IndexMap::new();
        let mut edges: Vec<EdgeDescribe> = Vec::new();

        for (label, node) in &named {
            let node = *node;
            // A node whose kind cannot be looked up is described as a state node.
            let kind = self.core.kind_of(node).unwrap_or(NodeKind::State);
            let cache = self.core.cache_of(node);
            let terminal = self.core.is_terminal(node);
            let dirty = self.core.is_dirty(node);
            let fired = self.core.has_fired_once(node);
            let dep_ids = self.core.deps_of(node);

            let deps: Vec<String> = dep_ids
                .iter()
                .map(|dep| match label_of.get(dep) {
                    Some(known) => known.clone(),
                    None => format!("_anon_{}", dep.raw()),
                })
                .collect();

            edges.extend(deps.iter().map(|upstream| EdgeDescribe {
                from: upstream.clone(),
                to: label.clone(),
            }));

            let described = NodeDescribe {
                r#type: type_str_of(kind),
                status: status_of(kind, cache, terminal, dirty, fired),
                value: (cache != NO_HANDLE).then_some(cache),
                deps,
                meta: None,
            };
            nodes.insert(label.clone(), described);
        }

        GraphDescribeOutput {
            name: graph_name,
            nodes,
            edges,
            subgraphs,
        }
    }
}
#[allow(clippy::ref_option)]
fn ser_opt_handle<S: Serializer>(value: &Option<HandleId>, ser: S) -> Result<S::Ok, S::Error> {
match value {
Some(h) => ser.serialize_some(&h.raw()),
None => ser.serialize_none(),
}
}
/// Maps a core [`NodeKind`] to the [`NodeTypeStr`] used in describe output.
///
/// The payload of `NodeKind::Operator` is ignored. No kind maps to
/// `NodeTypeStr::Effect`.
fn type_str_of(kind: NodeKind) -> NodeTypeStr {
    // Deliberately exhaustive (no wildcard arm) so a new `NodeKind` variant
    // forces this mapping to be revisited.
    match kind {
        NodeKind::Operator(_) => NodeTypeStr::Operator,
        NodeKind::Dynamic => NodeTypeStr::Dynamic,
        NodeKind::Derived => NodeTypeStr::Derived,
        NodeKind::Producer => NodeTypeStr::Producer,
        NodeKind::State => NodeTypeStr::State,
    }
}
fn status_of(
kind: NodeKind,
cache: HandleId,
terminal: Option<TerminalKind>,
dirty: bool,
fired: bool,
) -> NodeStatus {
match terminal {
Some(TerminalKind::Error(_)) => return NodeStatus::Errored,
Some(TerminalKind::Complete) => return NodeStatus::Completed,
None => {}
}
if dirty {
return NodeStatus::Dirty;
}
if cache == NO_HANDLE {
return match kind {
NodeKind::State => NodeStatus::Sentinel,
NodeKind::Producer | NodeKind::Derived | NodeKind::Dynamic | NodeKind::Operator(_) => {
if fired {
NodeStatus::Sentinel
} else {
NodeStatus::Pending
}
}
};
}
NodeStatus::Settled
}
/// Shared, thread-safe callback that receives a borrowed [`GraphDescribeOutput`] snapshot.
pub type DescribeSink = Arc<dyn Fn(&GraphDescribeOutput) + Send + Sync>;
/// Handle returned by `Graph::describe_reactive`.
///
/// Its `Drop` implementation unsubscribes `ns_sink_id` from the graph's
/// namespace-change notifications, so the handle must be kept for as long as
/// updates are wanted.
#[must_use = "ReactiveDescribeHandle holds the subscription; dropping it unsubscribes"]
pub struct ReactiveDescribeHandle {
// Graph the subscription was registered on; used to unsubscribe on drop.
graph: Graph,
// Id returned by `subscribe_namespace_change` for this handle's callback.
ns_sink_id: u64,
}
impl Drop for ReactiveDescribeHandle {
    /// Cancels the namespace-change subscription that this handle represents.
    fn drop(&mut self) {
        let sink_id = self.ns_sink_id;
        self.graph.unsubscribe_namespace_change(sink_id);
    }
}
// Compile-time check: the build fails if `ReactiveDescribeHandle` ever stops
// being `Send + Sync`. The closure is never called.
const _: fn() = || {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<ReactiveDescribeHandle>();
};
impl Graph {
    /// Delivers describe snapshots to `sink`, now and on later namespace changes.
    ///
    /// `sink` is called once immediately with the current [`Graph::describe`]
    /// output. A callback is then registered through
    /// `subscribe_namespace_change`; each time it runs, it takes a fresh
    /// snapshot and passes it to `sink`.
    ///
    /// The registered callback keeps only a weak reference to the graph's
    /// inner state. If that state is already gone when the callback runs, the
    /// callback does nothing.
    ///
    /// Dropping the returned [`ReactiveDescribeHandle`] removes the
    /// subscription.
    pub fn describe_reactive(&self, sink: DescribeSink) -> ReactiveDescribeHandle {
        // Initial snapshot is emitted before the subscription is registered.
        sink(&self.describe());

        let inner_ref: Weak<Mutex<GraphInner>> = Arc::downgrade(&self.inner);
        let core: Core = self.core.clone();
        let on_change = Arc::new(move || {
            // Rebuild a temporary `Graph` only while the inner state is alive.
            if let Some(inner) = inner_ref.upgrade() {
                let revived = Graph {
                    core: core.clone(),
                    inner,
                };
                sink(&revived.describe());
            }
        });

        let ns_sink_id = self.subscribe_namespace_change(on_change);
        let graph = self.clone();
        ReactiveDescribeHandle { graph, ns_sink_id }
    }
}