Skip to main content

graphrefly_graph/
describe.rs

1// D248: post-S2c the substrate is `!Send + !Sync` single-owner Core; the
2// Sink/TopologySink callbacks were deliberately relaxed to `Arc<dyn Fn>`
3// (dropped `+ Send + Sync`). Rc would suffice and is the architecturally
4// correct type for inherently single-owner sinks — the Arc→Rc cleanup is
5// a separate slice tracked in porting-deferred.md. Until then, `Arc` is
6// over-conservative but correct, and this file's Arc<Sink> sites cite
7// the deliberate D248 relaxation, not a missed Send+Sync bound.
8#![allow(clippy::arc_with_non_send_sync)]
9
10//! `Graph::describe()` — JSON form of canonical spec §3.6 + Appendix B.
11//!
12//! D246: describe logic is a free fn [`describe_of`] over
13//! `(&dyn CoreFull, &Rc<RefCell<GraphInner>>)` so the one [`Graph`]
14//! (`crate::Graph`) reuses it, AND so the in-wave reactive-describe
15//! `MailboxOp::Defer` closure (D246 rule 6) can run it through the
16//! `&dyn CoreFull` it is handed (the one facade carries read-only
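//! inspection). `ReactiveDescribeHandle` is Core-free: it holds the
//! namespace handle (`Rc<RefCell<GraphInner>>`) plus the two
//! subscription ids, so it is owner-thread-only (`!Send`). There is
//! **no RAII `Drop`** (D246 rule 3) — teardown is the
//! owner-invoked [`ReactiveDescribeHandle::detach`].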
20//!
21//! # Value rendering — raw vs. binding-rendered
22//!
23//! Canonical TS surfaces `value: T` directly. The Rust port preserves
24//! the handle-protocol cleaving plane (`value: DescribeValue`):
25//! `Handle(HandleId)` raw u64 (default) or `Rendered(serde_json::Value)`
26//! via [`DebugBindingBoundary`].
27
28use std::cell::RefCell;
29use std::rc::{Rc, Weak};
30use std::sync::Arc;
31
32use graphrefly_core::{
33    Core, CoreFull, HandleId, NodeId, NodeKind, OperatorOp, TerminalKind, TopologyEvent,
34    TopologySubscriptionId, NO_HANDLE,
35};
36use indexmap::IndexMap;
37use serde::{Serialize, Serializer};
38
39use crate::debug::DebugBindingBoundary;
40use crate::graph::{register_ns_sink, unregister_ns_sink, GraphInner};
41
42/// Top-level `describe()` output (canonical Appendix B JSON schema).
43#[derive(Debug, Clone, Serialize)]
44pub struct GraphDescribeOutput {
45    /// Graph name as set at construction / mount.
46    pub name: String,
47    /// Local nodes by name.
48    pub nodes: IndexMap<String, NodeDescribe>,
49    /// Local edges (dep → consumer).
50    pub edges: Vec<EdgeDescribe>,
51    /// Mounted child names.
52    pub subgraphs: Vec<String>,
53}
54
55/// Per-node descriptor.
56#[derive(Debug, Clone, Serialize)]
57pub struct NodeDescribe {
58    /// `"state"` / `"derived"` / `"dynamic"` / `"producer"`.
59    #[serde(rename = "type")]
60    pub r#type: NodeTypeStr,
61    /// Lifecycle status (canonical Appendix B enum).
62    pub status: NodeStatus,
63    /// Current cache value. `None` when sentinel (`NO_HANDLE`).
64    pub value: Option<DescribeValue>,
65    /// Dep names in declaration order (`_anon_<NodeId>` for unnamed).
66    pub deps: Vec<String>,
67    /// Operator discriminant (e.g. `"map"`); `None` for non-operators.
68    #[serde(default, skip_serializing_if = "Option::is_none", rename = "operator")]
69    pub operator_kind: Option<String>,
70    /// Free-form metadata per canonical Appendix B. Always `None` in
71    /// this slice (metadata-storage primitive not yet shipped).
72    #[serde(default, skip_serializing_if = "Option::is_none")]
73    pub meta: Option<serde_json::Value>,
74}
75
76/// Per-node cache value in `describe` output. Serialized uniformly
77/// without an enum tag.
78#[derive(Debug, Clone, PartialEq)]
79pub enum DescribeValue {
80    /// Raw handle view (default for [`crate::GraphOps::describe`]).
81    Handle(HandleId),
82    /// Binding-rendered view (from [`crate::GraphOps::describe_with_debug`]).
83    Rendered(serde_json::Value),
84}
85
86impl Serialize for DescribeValue {
87    fn serialize<S: Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
88        match self {
89            DescribeValue::Handle(h) => ser.serialize_u64(h.raw()),
90            DescribeValue::Rendered(v) => v.serialize(ser),
91        }
92    }
93}
94
95/// Edge between two named nodes (or a named node and `_anon_<NodeId>`).
96#[derive(Debug, Clone, Serialize)]
97pub struct EdgeDescribe {
98    pub from: String,
99    pub to: String,
100}
101
102/// Canonical Appendix B `type` enum.
103#[derive(Debug, Clone, Copy, Serialize)]
104#[serde(rename_all = "lowercase")]
105pub enum NodeTypeStr {
106    State,
107    Derived,
108    Dynamic,
109    Producer,
110    Effect,
111    Operator,
112}
113
114/// Canonical Appendix B `status` enum.
115#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)]
116#[serde(rename_all = "lowercase")]
117pub enum NodeStatus {
118    Sentinel,
119    Pending,
120    Dirty,
121    Settled,
122    Resolved,
123    Completed,
124    Errored,
125}
126
127/// β/D243: describe over the read-only-inspection `&dyn CoreFull` (so
128/// the in-wave `MailboxOp::Defer` reactive-describe closure can run it)
129/// + the namespace handle. Pure read; no `Core`-mutation.
130pub(crate) fn describe_of(
131    core: &dyn CoreFull,
132    inner_arc: &Rc<RefCell<GraphInner>>,
133    debug: Option<&dyn DebugBindingBoundary>,
134) -> GraphDescribeOutput {
135    let (graph_name, local_names, subgraphs, names_iter) = {
136        let inner = inner_arc.borrow_mut();
137        let graph_name = inner.name.clone();
138        let local_names: IndexMap<NodeId, String> = inner
139            .names
140            .iter()
141            .map(|(name, id)| (*id, name.clone()))
142            .collect();
143        let subgraphs: Vec<String> = inner.children.keys().cloned().collect();
144        let names_iter: Vec<(String, NodeId)> =
145            inner.names.iter().map(|(n, id)| (n.clone(), *id)).collect();
146        (graph_name, local_names, subgraphs, names_iter)
147    };
148
149    let mut nodes: IndexMap<String, NodeDescribe> = IndexMap::new();
150    let mut edges: Vec<EdgeDescribe> = Vec::new();
151
152    for (name, id) in &names_iter {
153        let kind = core.kind_of(*id).unwrap_or(NodeKind::State);
154        let cache = core.cache_of(*id);
155        let terminal = core.is_terminal(*id);
156        let dirty = core.is_dirty(*id);
157        let fired = core.has_fired_once(*id);
158
159        let dep_ids = core.deps_of(*id);
160        let dep_names: Vec<String> = dep_ids
161            .iter()
162            .map(|d| {
163                local_names
164                    .get(d)
165                    .cloned()
166                    .unwrap_or_else(|| format!("_anon_{}", d.raw()))
167            })
168            .collect();
169        for dep_name in &dep_names {
170            edges.push(EdgeDescribe {
171                from: dep_name.clone(),
172                to: name.clone(),
173            });
174        }
175
176        let value = if cache == NO_HANDLE {
177            None
178        } else if let Some(debug) = debug {
179            Some(DescribeValue::Rendered(debug.handle_to_debug(cache)))
180        } else {
181            Some(DescribeValue::Handle(cache))
182        };
183
184        let operator_kind = match kind {
185            NodeKind::Operator(op) => Some(operator_op_name(op)),
186            _ => None,
187        };
188        nodes.insert(
189            name.clone(),
190            NodeDescribe {
191                r#type: type_str_of(kind),
192                status: status_of(kind, cache, terminal, dirty, fired),
193                value,
194                deps: dep_names,
195                operator_kind,
196                meta: None,
197            },
198        );
199    }
200
201    GraphDescribeOutput {
202        name: graph_name,
203        nodes,
204        edges,
205        subgraphs,
206    }
207}
208
209fn type_str_of(kind: NodeKind) -> NodeTypeStr {
210    match kind {
211        NodeKind::State => NodeTypeStr::State,
212        NodeKind::Producer => NodeTypeStr::Producer,
213        NodeKind::Derived => NodeTypeStr::Derived,
214        NodeKind::Dynamic => NodeTypeStr::Dynamic,
215        NodeKind::Operator(_) => NodeTypeStr::Operator,
216    }
217}
218
219fn operator_op_name(op: OperatorOp) -> String {
220    match op {
221        OperatorOp::Map { .. } => "map",
222        OperatorOp::Filter { .. } => "filter",
223        OperatorOp::Scan { .. } => "scan",
224        OperatorOp::Reduce { .. } => "reduce",
225        OperatorOp::DistinctUntilChanged { .. } => "distinctUntilChanged",
226        OperatorOp::Pairwise { .. } => "pairwise",
227        OperatorOp::Combine { .. } => "combine",
228        OperatorOp::WithLatestFrom { .. } => "withLatestFrom",
229        OperatorOp::Merge => "merge",
230        OperatorOp::Take { .. } => "take",
231        OperatorOp::Skip { .. } => "skip",
232        OperatorOp::TakeWhile { .. } => "takeWhile",
233        OperatorOp::Last { .. } => "last",
234        OperatorOp::Tap { .. } => "tap",
235        OperatorOp::TapFirst { .. } => "tapFirst",
236        OperatorOp::Valve => "valve",
237        OperatorOp::Settle { .. } => "settle",
238    }
239    .to_owned()
240}
241
242/// Canonical-spec §3.6.1 status mapping. Precedence: errored >
243/// completed > dirty > (cache-cleared) > settled > pending > sentinel.
244/// R1.3.7.b: `cache == NO_HANDLE` discriminates Sentinel-vs-Settled
245/// BEFORE the `fired` check (post-INVALIDATE fired compute → Sentinel).
246fn status_of(
247    kind: NodeKind,
248    cache: HandleId,
249    terminal: Option<TerminalKind>,
250    dirty: bool,
251    fired: bool,
252) -> NodeStatus {
253    match terminal {
254        Some(TerminalKind::Error(_)) => return NodeStatus::Errored,
255        Some(TerminalKind::Complete) => return NodeStatus::Completed,
256        None => {}
257    }
258    if dirty {
259        return NodeStatus::Dirty;
260    }
261    if cache == NO_HANDLE {
262        return match kind {
263            NodeKind::State => NodeStatus::Sentinel,
264            NodeKind::Producer | NodeKind::Derived | NodeKind::Dynamic | NodeKind::Operator(_) => {
265                if fired {
266                    NodeStatus::Sentinel
267                } else {
268                    NodeStatus::Pending
269                }
270            }
271        };
272    }
273    NodeStatus::Settled
274}
275
276// -------------------------------------------------------------------
277// Reactive describe (canonical §3.6.1 `reactive: true` mode)
278// -------------------------------------------------------------------
279
/// Sink type for reactive describe: a shared callback invoked with a
/// borrowed [`GraphDescribeOutput`] snapshot. No `Send + Sync` bound is
/// placed on the callback.
pub type DescribeSink = Arc<dyn Fn(&GraphDescribeOutput)>;
282
/// Id-bearing handle for a reactive describe subscription.
///
/// D246 rule 3: **no RAII `Drop`** — teardown is the owner-invoked,
/// synchronous [`Self::detach`]`(core)`, and calling it is REQUIRED.
/// This eliminates the "unsubscribe in `Drop`" deadlock class.
///
/// The ns-sink is also collected by `graph.destroy(core)`; the Core
/// topology sub is opened via raw `core.subscribe_topology` and is NOT
/// `OwnedCore`-tracked, so only `detach(core)` collects it.
///
/// NOTE(review): D246 describes this handle as Core-free (`Send`), but
/// the `inner` field is an `Rc<RefCell<GraphInner>>`, which is `!Send`
/// — confirm whether the `Send` wording is stale.
#[must_use = "ReactiveDescribeHandle holds a Core topology sub NOT tracked by OwnedCore; you MUST call detach(core) or it leaks"]
pub struct ReactiveDescribeHandle {
    /// Namespace handle the ns-sink was registered on; needed again at
    /// detach time to unregister it.
    inner: Rc<RefCell<GraphInner>>,
    /// Id returned by `register_ns_sink` for the namespace-change sink.
    ns_sink_id: u64,
    /// Slice V3 D5: Core topology sub for `DepsChanged` (edges change
    /// without a namespace change). D246 r6: re-snapshot is in-wave
    /// `MailboxOp::Defer`'d (the topology event fires inside a Core
    /// wave; `describe_of` runs via the handed `&dyn CoreFull`).
    topo_sub_id: TopologySubscriptionId,
}
302
303impl ReactiveDescribeHandle {
304    /// Owner-invoked, synchronous detach (D246 rule 3). Topology sub
305    /// first (so a topo fire mid-detach can't re-snapshot through a
306    /// half-removed namespace sink), then the namespace sink.
307    pub fn detach(&self, core: &Core) {
308        core.unsubscribe_topology(self.topo_sub_id);
309        unregister_ns_sink(&self.inner, self.ns_sink_id);
310    }
311}
312
313/// Build a reactive-describe subscription. Push-on-subscribe fires
314/// the current snapshot once, then re-fires on every namespace change
315/// (owner-side `&Core`, D246 r2) and on `set_deps` `DepsChanged`
316/// (in-wave `MailboxOp::Defer` → `&dyn CoreFull`, D246 r6).
317pub(crate) fn describe_reactive_in(
318    core: &Core,
319    inner: &Rc<RefCell<GraphInner>>,
320    sink: &DescribeSink,
321) -> ReactiveDescribeHandle {
322    // Push-on-subscribe (no lock held).
323    sink(&describe_of(core, inner, None));
324
325    // Namespace-change path (owner-side `&Core`, β/D231).
326    let weak_inner: Weak<RefCell<GraphInner>> = Rc::downgrade(inner);
327    let sink_ns = sink.clone();
328    let ns_sink: crate::graph::NamespaceChangeSink = Arc::new(move |c: &Core| {
329        let Some(arc_inner) = weak_inner.upgrade() else {
330            return;
331        };
332        sink_ns(&describe_of(c, &arc_inner, None));
333    });
334    let ns_sink_id = register_ns_sink(inner, ns_sink);
335
336    // Topology path (set_deps → `DepsChanged`, fired inside a Core
337    // wave): re-snapshot via an in-wave `MailboxOp::Defer` so it runs
338    // owner-side with a real `&dyn CoreFull` (D243/D233).
339    let weak_inner_topo: Weak<RefCell<GraphInner>> = Rc::downgrade(inner);
340    // D249/S2c: owner-side `!Send` `DeferQueue` (the closure captures a
341    // `Weak<RefCell<GraphInner>>`, `!Send`). Owner-thread-only `Rc` —
342    // fine: this topo sink is `!Send` (D248) and fires owner-side.
343    let deferred = core.defer_queue();
344    let sink_topo = sink.clone();
345    // D246 rule 8 (S4): reusable coalescing slot. Re-snapshot is
346    // idempotent at drain time (`describe_of` reads current state), so
347    // N `DepsChanged` in one wave need only ONE deferred re-snapshot,
348    // not N boxed closures. `scheduled` (owner-thread-only `Cell`) gates
349    // a single `Box` post per drain; the closure clears it so the next
350    // wave re-arms. Behaviour-equivalent (deferred-snapshot acceptable,
351    // D243/D244) — one alloc + one snapshot per wave, not per emission.
352    let scheduled = Rc::new(std::cell::Cell::new(false));
353    let topo_sink: Arc<dyn Fn(&TopologyEvent)> = Arc::new(move |event: &TopologyEvent| {
354        if matches!(event, TopologyEvent::DepsChanged { .. }) {
355            if scheduled.get() {
356                return; // already armed for this drain — coalesce.
357            }
358            // INVARIANT (QA, 2026-05-19): the `upgrade()` check runs
359            // BEFORE `scheduled.set(true)`, so a graph-gone fire never
360            // poisons the slot (`scheduled` stays `false`; a later
361            // fire on the next wave re-tries the upgrade fresh).
362            let Some(arc_inner) = weak_inner_topo.upgrade() else {
363                return;
364            };
365            let s = sink_topo.clone();
366            let sched = Rc::clone(&scheduled);
367            sched.set(true);
368            // The Defer closure captures no `HandleId` (only an
369            // `Arc<sink>` + a `Weak`-upgraded inner) — if the Core
370            // is gone (`false`) the snapshot simply won't fire;
371            // nothing to release (D235 P8 pattern).
372            let _ = deferred.post(Box::new(move |cf: &dyn CoreFull| {
373                sched.set(false);
374                s(&describe_of(cf, &arc_inner, None));
375            }));
376        }
377    });
378    let topo_sub_id = core.subscribe_topology(topo_sink);
379
380    ReactiveDescribeHandle {
381        inner: inner.clone(),
382        ns_sink_id,
383        topo_sub_id,
384    }
385}