#![allow(clippy::arc_with_non_send_sync)]
use std::cell::RefCell;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use graphrefly_core::{
Core, CoreFull, HandleId, NodeId, NodeKind, OperatorOp, TerminalKind, TopologyEvent,
TopologySubscriptionId, NO_HANDLE,
};
use indexmap::IndexMap;
use serde::{Serialize, Serializer};
use crate::debug::DebugBindingBoundary;
use crate::graph::{register_ns_sink, unregister_ns_sink, GraphInner};
#[derive(Debug, Clone, Serialize)]
pub struct GraphDescribeOutput {
pub name: String,
pub nodes: IndexMap<String, NodeDescribe>,
pub edges: Vec<EdgeDescribe>,
pub subgraphs: Vec<String>,
}
#[derive(Debug, Clone, Serialize)]
pub struct NodeDescribe {
#[serde(rename = "type")]
pub r#type: NodeTypeStr,
pub status: NodeStatus,
pub value: Option<DescribeValue>,
pub deps: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none", rename = "operator")]
pub operator_kind: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub meta: Option<serde_json::Value>,
}
#[derive(Debug, Clone, PartialEq)]
pub enum DescribeValue {
Handle(HandleId),
Rendered(serde_json::Value),
}
impl Serialize for DescribeValue {
fn serialize<S: Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
match self {
DescribeValue::Handle(h) => ser.serialize_u64(h.raw()),
DescribeValue::Rendered(v) => v.serialize(ser),
}
}
}
#[derive(Debug, Clone, Serialize)]
pub struct EdgeDescribe {
pub from: String,
pub to: String,
}
#[derive(Debug, Clone, Copy, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum NodeTypeStr {
State,
Derived,
Dynamic,
Producer,
Effect,
Operator,
}
#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum NodeStatus {
Sentinel,
Pending,
Dirty,
Settled,
Resolved,
Completed,
Errored,
}
pub(crate) fn describe_of(
core: &dyn CoreFull,
inner_arc: &Rc<RefCell<GraphInner>>,
debug: Option<&dyn DebugBindingBoundary>,
) -> GraphDescribeOutput {
let (graph_name, local_names, subgraphs, names_iter) = {
let inner = inner_arc.borrow_mut();
let graph_name = inner.name.clone();
let local_names: IndexMap<NodeId, String> = inner
.names
.iter()
.map(|(name, id)| (*id, name.clone()))
.collect();
let subgraphs: Vec<String> = inner.children.keys().cloned().collect();
let names_iter: Vec<(String, NodeId)> =
inner.names.iter().map(|(n, id)| (n.clone(), *id)).collect();
(graph_name, local_names, subgraphs, names_iter)
};
let mut nodes: IndexMap<String, NodeDescribe> = IndexMap::new();
let mut edges: Vec<EdgeDescribe> = Vec::new();
for (name, id) in &names_iter {
let kind = core.kind_of(*id).unwrap_or(NodeKind::State);
let cache = core.cache_of(*id);
let terminal = core.is_terminal(*id);
let dirty = core.is_dirty(*id);
let fired = core.has_fired_once(*id);
let dep_ids = core.deps_of(*id);
let dep_names: Vec<String> = dep_ids
.iter()
.map(|d| {
local_names
.get(d)
.cloned()
.unwrap_or_else(|| format!("_anon_{}", d.raw()))
})
.collect();
for dep_name in &dep_names {
edges.push(EdgeDescribe {
from: dep_name.clone(),
to: name.clone(),
});
}
let value = if cache == NO_HANDLE {
None
} else if let Some(debug) = debug {
Some(DescribeValue::Rendered(debug.handle_to_debug(cache)))
} else {
Some(DescribeValue::Handle(cache))
};
let operator_kind = match kind {
NodeKind::Operator(op) => Some(operator_op_name(op)),
_ => None,
};
nodes.insert(
name.clone(),
NodeDescribe {
r#type: type_str_of(kind),
status: status_of(kind, cache, terminal, dirty, fired),
value,
deps: dep_names,
operator_kind,
meta: None,
},
);
}
GraphDescribeOutput {
name: graph_name,
nodes,
edges,
subgraphs,
}
}
fn type_str_of(kind: NodeKind) -> NodeTypeStr {
match kind {
NodeKind::State => NodeTypeStr::State,
NodeKind::Producer => NodeTypeStr::Producer,
NodeKind::Derived => NodeTypeStr::Derived,
NodeKind::Dynamic => NodeTypeStr::Dynamic,
NodeKind::Operator(_) => NodeTypeStr::Operator,
}
}
fn operator_op_name(op: OperatorOp) -> String {
match op {
OperatorOp::Map { .. } => "map",
OperatorOp::Filter { .. } => "filter",
OperatorOp::Scan { .. } => "scan",
OperatorOp::Reduce { .. } => "reduce",
OperatorOp::DistinctUntilChanged { .. } => "distinctUntilChanged",
OperatorOp::Pairwise { .. } => "pairwise",
OperatorOp::Combine { .. } => "combine",
OperatorOp::WithLatestFrom { .. } => "withLatestFrom",
OperatorOp::Merge => "merge",
OperatorOp::Take { .. } => "take",
OperatorOp::Skip { .. } => "skip",
OperatorOp::TakeWhile { .. } => "takeWhile",
OperatorOp::Last { .. } => "last",
OperatorOp::Tap { .. } => "tap",
OperatorOp::TapFirst { .. } => "tapFirst",
OperatorOp::Valve => "valve",
OperatorOp::Settle { .. } => "settle",
}
.to_owned()
}
fn status_of(
kind: NodeKind,
cache: HandleId,
terminal: Option<TerminalKind>,
dirty: bool,
fired: bool,
) -> NodeStatus {
match terminal {
Some(TerminalKind::Error(_)) => return NodeStatus::Errored,
Some(TerminalKind::Complete) => return NodeStatus::Completed,
None => {}
}
if dirty {
return NodeStatus::Dirty;
}
if cache == NO_HANDLE {
return match kind {
NodeKind::State => NodeStatus::Sentinel,
NodeKind::Producer | NodeKind::Derived | NodeKind::Dynamic | NodeKind::Operator(_) => {
if fired {
NodeStatus::Sentinel
} else {
NodeStatus::Pending
}
}
};
}
NodeStatus::Settled
}
pub type DescribeSink = Arc<dyn Fn(&GraphDescribeOutput)>;
#[must_use = "ReactiveDescribeHandle holds a Core topology sub NOT tracked by OwnedCore; you MUST call detach(core) or it leaks"]
pub struct ReactiveDescribeHandle {
inner: Rc<RefCell<GraphInner>>,
ns_sink_id: u64,
topo_sub_id: TopologySubscriptionId,
}
impl ReactiveDescribeHandle {
pub fn detach(&self, core: &Core) {
core.unsubscribe_topology(self.topo_sub_id);
unregister_ns_sink(&self.inner, self.ns_sink_id);
}
}
pub(crate) fn describe_reactive_in(
core: &Core,
inner: &Rc<RefCell<GraphInner>>,
sink: &DescribeSink,
) -> ReactiveDescribeHandle {
sink(&describe_of(core, inner, None));
let weak_inner: Weak<RefCell<GraphInner>> = Rc::downgrade(inner);
let sink_ns = sink.clone();
let ns_sink: crate::graph::NamespaceChangeSink = Arc::new(move |c: &Core| {
let Some(arc_inner) = weak_inner.upgrade() else {
return;
};
sink_ns(&describe_of(c, &arc_inner, None));
});
let ns_sink_id = register_ns_sink(inner, ns_sink);
let weak_inner_topo: Weak<RefCell<GraphInner>> = Rc::downgrade(inner);
let deferred = core.defer_queue();
let sink_topo = sink.clone();
let scheduled = Rc::new(std::cell::Cell::new(false));
let topo_sink: Arc<dyn Fn(&TopologyEvent)> = Arc::new(move |event: &TopologyEvent| {
if matches!(event, TopologyEvent::DepsChanged { .. }) {
if scheduled.get() {
return; }
let Some(arc_inner) = weak_inner_topo.upgrade() else {
return;
};
let s = sink_topo.clone();
let sched = Rc::clone(&scheduled);
sched.set(true);
let _ = deferred.post(Box::new(move |cf: &dyn CoreFull| {
sched.set(false);
s(&describe_of(cf, &arc_inner, None));
}));
}
});
let topo_sub_id = core.subscribe_topology(topo_sink);
ReactiveDescribeHandle {
inner: inner.clone(),
ns_sink_id,
topo_sub_id,
}
}