1#![allow(clippy::arc_with_non_send_sync)]
9
10use std::cell::RefCell;
29use std::rc::{Rc, Weak};
30use std::sync::Arc;
31
32use graphrefly_core::{
33 Core, CoreFull, HandleId, NodeId, NodeKind, OperatorOp, TerminalKind, TopologyEvent,
34 TopologySubscriptionId, NO_HANDLE,
35};
36use indexmap::IndexMap;
37use serde::{Serialize, Serializer};
38
39use crate::debug::DebugBindingBoundary;
40use crate::graph::{register_ns_sink, unregister_ns_sink, GraphInner};
41
42#[derive(Debug, Clone, Serialize)]
44pub struct GraphDescribeOutput {
45 pub name: String,
47 pub nodes: IndexMap<String, NodeDescribe>,
49 pub edges: Vec<EdgeDescribe>,
51 pub subgraphs: Vec<String>,
53}
54
55#[derive(Debug, Clone, Serialize)]
57pub struct NodeDescribe {
58 #[serde(rename = "type")]
60 pub r#type: NodeTypeStr,
61 pub status: NodeStatus,
63 pub value: Option<DescribeValue>,
65 pub deps: Vec<String>,
67 #[serde(default, skip_serializing_if = "Option::is_none", rename = "operator")]
69 pub operator_kind: Option<String>,
70 #[serde(default, skip_serializing_if = "Option::is_none")]
73 pub meta: Option<serde_json::Value>,
74}
75
76#[derive(Debug, Clone, PartialEq)]
79pub enum DescribeValue {
80 Handle(HandleId),
82 Rendered(serde_json::Value),
84}
85
86impl Serialize for DescribeValue {
87 fn serialize<S: Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
88 match self {
89 DescribeValue::Handle(h) => ser.serialize_u64(h.raw()),
90 DescribeValue::Rendered(v) => v.serialize(ser),
91 }
92 }
93}
94
95#[derive(Debug, Clone, Serialize)]
97pub struct EdgeDescribe {
98 pub from: String,
99 pub to: String,
100}
101
102#[derive(Debug, Clone, Copy, Serialize)]
104#[serde(rename_all = "lowercase")]
105pub enum NodeTypeStr {
106 State,
107 Derived,
108 Dynamic,
109 Producer,
110 Effect,
111 Operator,
112}
113
114#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)]
116#[serde(rename_all = "lowercase")]
117pub enum NodeStatus {
118 Sentinel,
119 Pending,
120 Dirty,
121 Settled,
122 Resolved,
123 Completed,
124 Errored,
125}
126
127pub(crate) fn describe_of(
131 core: &dyn CoreFull,
132 inner_arc: &Rc<RefCell<GraphInner>>,
133 debug: Option<&dyn DebugBindingBoundary>,
134) -> GraphDescribeOutput {
135 let (graph_name, local_names, subgraphs, names_iter) = {
136 let inner = inner_arc.borrow_mut();
137 let graph_name = inner.name.clone();
138 let local_names: IndexMap<NodeId, String> = inner
139 .names
140 .iter()
141 .map(|(name, id)| (*id, name.clone()))
142 .collect();
143 let subgraphs: Vec<String> = inner.children.keys().cloned().collect();
144 let names_iter: Vec<(String, NodeId)> =
145 inner.names.iter().map(|(n, id)| (n.clone(), *id)).collect();
146 (graph_name, local_names, subgraphs, names_iter)
147 };
148
149 let mut nodes: IndexMap<String, NodeDescribe> = IndexMap::new();
150 let mut edges: Vec<EdgeDescribe> = Vec::new();
151
152 for (name, id) in &names_iter {
153 let kind = core.kind_of(*id).unwrap_or(NodeKind::State);
154 let cache = core.cache_of(*id);
155 let terminal = core.is_terminal(*id);
156 let dirty = core.is_dirty(*id);
157 let fired = core.has_fired_once(*id);
158
159 let dep_ids = core.deps_of(*id);
160 let dep_names: Vec<String> = dep_ids
161 .iter()
162 .map(|d| {
163 local_names
164 .get(d)
165 .cloned()
166 .unwrap_or_else(|| format!("_anon_{}", d.raw()))
167 })
168 .collect();
169 for dep_name in &dep_names {
170 edges.push(EdgeDescribe {
171 from: dep_name.clone(),
172 to: name.clone(),
173 });
174 }
175
176 let value = if cache == NO_HANDLE {
177 None
178 } else if let Some(debug) = debug {
179 Some(DescribeValue::Rendered(debug.handle_to_debug(cache)))
180 } else {
181 Some(DescribeValue::Handle(cache))
182 };
183
184 let operator_kind = match kind {
185 NodeKind::Operator(op) => Some(operator_op_name(op)),
186 _ => None,
187 };
188 nodes.insert(
189 name.clone(),
190 NodeDescribe {
191 r#type: type_str_of(kind),
192 status: status_of(kind, cache, terminal, dirty, fired),
193 value,
194 deps: dep_names,
195 operator_kind,
196 meta: None,
197 },
198 );
199 }
200
201 GraphDescribeOutput {
202 name: graph_name,
203 nodes,
204 edges,
205 subgraphs,
206 }
207}
208
209fn type_str_of(kind: NodeKind) -> NodeTypeStr {
210 match kind {
211 NodeKind::State => NodeTypeStr::State,
212 NodeKind::Producer => NodeTypeStr::Producer,
213 NodeKind::Derived => NodeTypeStr::Derived,
214 NodeKind::Dynamic => NodeTypeStr::Dynamic,
215 NodeKind::Operator(_) => NodeTypeStr::Operator,
216 }
217}
218
219fn operator_op_name(op: OperatorOp) -> String {
220 match op {
221 OperatorOp::Map { .. } => "map",
222 OperatorOp::Filter { .. } => "filter",
223 OperatorOp::Scan { .. } => "scan",
224 OperatorOp::Reduce { .. } => "reduce",
225 OperatorOp::DistinctUntilChanged { .. } => "distinctUntilChanged",
226 OperatorOp::Pairwise { .. } => "pairwise",
227 OperatorOp::Combine { .. } => "combine",
228 OperatorOp::WithLatestFrom { .. } => "withLatestFrom",
229 OperatorOp::Merge => "merge",
230 OperatorOp::Take { .. } => "take",
231 OperatorOp::Skip { .. } => "skip",
232 OperatorOp::TakeWhile { .. } => "takeWhile",
233 OperatorOp::Last { .. } => "last",
234 OperatorOp::Tap { .. } => "tap",
235 OperatorOp::TapFirst { .. } => "tapFirst",
236 OperatorOp::Valve => "valve",
237 OperatorOp::Settle { .. } => "settle",
238 }
239 .to_owned()
240}
241
242fn status_of(
247 kind: NodeKind,
248 cache: HandleId,
249 terminal: Option<TerminalKind>,
250 dirty: bool,
251 fired: bool,
252) -> NodeStatus {
253 match terminal {
254 Some(TerminalKind::Error(_)) => return NodeStatus::Errored,
255 Some(TerminalKind::Complete) => return NodeStatus::Completed,
256 None => {}
257 }
258 if dirty {
259 return NodeStatus::Dirty;
260 }
261 if cache == NO_HANDLE {
262 return match kind {
263 NodeKind::State => NodeStatus::Sentinel,
264 NodeKind::Producer | NodeKind::Derived | NodeKind::Dynamic | NodeKind::Operator(_) => {
265 if fired {
266 NodeStatus::Sentinel
267 } else {
268 NodeStatus::Pending
269 }
270 }
271 };
272 }
273 NodeStatus::Settled
274}
275
276pub type DescribeSink = Arc<dyn Fn(&GraphDescribeOutput)>;
282
283#[must_use = "ReactiveDescribeHandle holds a Core topology sub NOT tracked by OwnedCore; you MUST call detach(core) or it leaks"]
293pub struct ReactiveDescribeHandle {
294 inner: Rc<RefCell<GraphInner>>,
295 ns_sink_id: u64,
296 topo_sub_id: TopologySubscriptionId,
301}
302
303impl ReactiveDescribeHandle {
304 pub fn detach(&self, core: &Core) {
308 core.unsubscribe_topology(self.topo_sub_id);
309 unregister_ns_sink(&self.inner, self.ns_sink_id);
310 }
311}
312
313pub(crate) fn describe_reactive_in(
318 core: &Core,
319 inner: &Rc<RefCell<GraphInner>>,
320 sink: &DescribeSink,
321) -> ReactiveDescribeHandle {
322 sink(&describe_of(core, inner, None));
324
325 let weak_inner: Weak<RefCell<GraphInner>> = Rc::downgrade(inner);
327 let sink_ns = sink.clone();
328 let ns_sink: crate::graph::NamespaceChangeSink = Arc::new(move |c: &Core| {
329 let Some(arc_inner) = weak_inner.upgrade() else {
330 return;
331 };
332 sink_ns(&describe_of(c, &arc_inner, None));
333 });
334 let ns_sink_id = register_ns_sink(inner, ns_sink);
335
336 let weak_inner_topo: Weak<RefCell<GraphInner>> = Rc::downgrade(inner);
340 let deferred = core.defer_queue();
344 let sink_topo = sink.clone();
345 let scheduled = Rc::new(std::cell::Cell::new(false));
353 let topo_sink: Arc<dyn Fn(&TopologyEvent)> = Arc::new(move |event: &TopologyEvent| {
354 if matches!(event, TopologyEvent::DepsChanged { .. }) {
355 if scheduled.get() {
356 return; }
358 let Some(arc_inner) = weak_inner_topo.upgrade() else {
363 return;
364 };
365 let s = sink_topo.clone();
366 let sched = Rc::clone(&scheduled);
367 sched.set(true);
368 let _ = deferred.post(Box::new(move |cf: &dyn CoreFull| {
373 sched.set(false);
374 s(&describe_of(cf, &arc_inner, None));
375 }));
376 }
377 });
378 let topo_sub_id = core.subscribe_topology(topo_sink);
379
380 ReactiveDescribeHandle {
381 inner: inner.clone(),
382 ns_sink_id,
383 topo_sub_id,
384 }
385}