graphrefly-graph 0.0.6

GraphReFly Graph container, describe/observe, content-addressed snapshots
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
//! `Graph::describe()` — JSON form of canonical spec §3.6 + Appendix B.
//!
//! Static JSON form (Slice E+) + reactive describe (Slice F+). Pretty
//! / mermaid / d2 / stage-log / explain / reachable variants are
//! deferred (subsequent slices).
//!
//! # Value rendering — raw vs. binding-rendered (F sub-slice, 2026-05-10)
//!
//! Canonical TS surfaces `value: T` directly. The Rust port preserves
//! the handle-protocol cleaving plane (Core operates on opaque
//! `HandleId` integers; binding-side owns `HandleId → T`) by surfacing
//! `value: DescribeValue`:
//!
//! - `DescribeValue::Handle(HandleId)` — raw u64 view, used by
//!   `Graph::describe()` (the default). Suitable for parity tests
//!   that compare against TS by mapping handles through the binding
//!   manually, and for debug contexts that don't have a debug
//!   binding wired up.
//! - `DescribeValue::Rendered(serde_json::Value)` — binding-rendered
//!   view, used by `Graph::describe_with_debug(debug)`. The caller
//!   passes a [`DebugBindingBoundary`] impl that knows how to
//!   project each registered value into a JSON form. This is the
//!   "developer-friendly" surface — looks just like TS's `value: T`
//!   in the serialized JSON because the rendering happens at the
//!   binding boundary, off the Core hot path.
//!
//! Each value field serializes uniformly: as a u64 number for
//! `Handle`, or as the binding's chosen JSON shape for `Rendered`.
//! `None` (sentinel cache) serializes as `null` in both modes.

use std::sync::{Arc, Weak};

use graphrefly_core::{
    Core, HandleId, NodeId, NodeKind, OperatorOp, TerminalKind, TopologyEvent,
    TopologySubscription, NO_HANDLE,
};
use indexmap::IndexMap;
use parking_lot::Mutex;
use serde::{Serialize, Serializer};

use crate::debug::DebugBindingBoundary;
use crate::graph::{Graph, GraphInner};

/// Result of [`Graph::describe`] / [`Graph::describe_with_debug`]: one
/// graph level rendered in the canonical Appendix B JSON shape.
///
/// Field declaration order is the serialized key order. `nodes` is an
/// [`IndexMap`] so entries keep the namespace's iteration order
/// (registration order); that ordering is what keeps serialized output
/// stable from run to run.
#[derive(Debug, Clone, Serialize)]
pub struct GraphDescribeOutput {
    /// Name of the described graph (as given at construction / mount).
    pub name: String,
    /// Descriptor for every node registered under a local name, keyed by
    /// that name.
    pub nodes: IndexMap<String, NodeDescribe>,
    /// One `dep → consumer` entry per dependency of each local node.
    pub edges: Vec<EdgeDescribe>,
    /// Names of mounted child graphs. Children are listed by name only;
    /// their contents are not inlined and must be described separately.
    pub subgraphs: Vec<String>,
}

/// Descriptor for one named node in [`GraphDescribeOutput::nodes`].
#[derive(Debug, Clone, Serialize)]
pub struct NodeDescribe {
    /// Node classification, serialized under the JSON key `"type"`.
    ///
    /// Filled by mapping Core's `NodeKind` one-to-one onto
    /// [`NodeTypeStr`], so the value is one of `"state"`, `"producer"`,
    /// `"derived"`, `"dynamic"`, or `"operator"`. `"effect"` is never
    /// produced by that mapping. No binding-side inference (e.g.
    /// recognising a producer *pattern*, canonical §3.6.1) happens here.
    #[serde(rename = "type")]
    pub r#type: NodeTypeStr,
    /// Lifecycle status (canonical Appendix B enum).
    pub status: NodeStatus,
    /// Current cache value (F sub-slice, 2026-05-10). `None` when the
    /// cache is sentinel (`NO_HANDLE`); otherwise:
    ///
    /// - `DescribeValue::Handle(HandleId)` — raw u64 (from
    ///   [`Graph::describe`]).
    /// - `DescribeValue::Rendered(serde_json::Value)` — binding-rendered
    ///   (from [`Graph::describe_with_debug`]).
    ///
    /// Serialization is uniform: the inner u64 or JSON value appears
    /// directly in the output (no enum tag); `None` serializes as `null`.
    pub value: Option<DescribeValue>,
    /// Dep names in declaration order. Deps without a local name surface
    /// as `_anon_<NodeId>`, which keeps the output lossless without
    /// promoting Core-only nodes into the namespace.
    pub deps: Vec<String>,
    /// Operator discriminant, serialized under the JSON key `"operator"`
    /// and omitted entirely when `None` (non-operator nodes). Slice V5:
    /// carries the `OperatorOp` variant name (e.g. `"map"`, `"filter"`,
    /// `"combine"`, `"distinctUntilChanged"`) so consumers can tell
    /// operator kinds apart beyond `type: "operator"`.
    #[serde(default, skip_serializing_if = "Option::is_none", rename = "operator")]
    pub operator_kind: Option<String>,
    /// Free-form metadata per canonical Appendix B (e.g. `{
    /// "description": "...", "type": "integer", "range": [1, 10] }`).
    /// Always `None` in this slice — the metadata-storage primitive on
    /// Core hasn't shipped yet. Typed as `Option<serde_json::Value>` so the
    /// JSON shape stays forward-compatible; omitted from the output while
    /// `None` to keep current payloads slim.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub meta: Option<serde_json::Value>,
}

/// Cache value attached to a [`NodeDescribe`].
///
/// The variant depends on the entry point used: [`Graph::describe`]
/// yields [`DescribeValue::Handle`], while [`Graph::describe_with_debug`]
/// yields [`DescribeValue::Rendered`]. The JSON encoding is untagged —
/// only the payload is written (a number, or the binding's JSON).
#[derive(Debug, Clone, PartialEq)]
pub enum DescribeValue {
    /// Opaque Core handle. Encoded as a JSON number holding the handle's
    /// raw `u64`.
    Handle(HandleId),
    /// JSON returned by a [`DebugBindingBoundary`]'s `handle_to_debug`.
    /// Written verbatim, so its shape (string, number, object, …) is
    /// entirely up to the binding.
    Rendered(serde_json::Value),
}

impl Serialize for DescribeValue {
    /// Untagged encoding: emit the payload only, never the variant name.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        match self {
            Self::Rendered(json) => json.serialize(serializer),
            Self::Handle(handle) => serializer.serialize_u64(handle.raw()),
        }
    }
}

/// Directed dependency edge in [`GraphDescribeOutput::edges`], pointing
/// from a dependency to the local node that consumes it.
///
/// `to` is always a local node name. `from` is the dependency's local name,
/// or an `_anon_<NodeId>` placeholder when the dependency is not
/// registered in this graph's namespace.
#[derive(Debug, Clone, Serialize)]
pub struct EdgeDescribe {
    /// Upstream end: the dependency's label.
    pub from: String,
    /// Downstream end: the consuming node's name.
    pub to: String,
}

/// Canonical Appendix B `type` enum, serialized in lowercase.
///
/// `describe()` fills this by mapping Core's `NodeKind` one-to-one:
/// `State`, `Producer`, `Derived`, `Dynamic` and `Operator(_)` each have a
/// matching variant below. Only [`NodeTypeStr::Effect`] has no Core
/// counterpart and is never emitted by that mapping.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum NodeTypeStr {
    /// Core `NodeKind::State`.
    State,
    /// Core `NodeKind::Derived`.
    Derived,
    /// Core `NodeKind::Dynamic`.
    Dynamic,
    /// Core `NodeKind::Producer`. Binding-side producer-*pattern*
    /// inference (canonical §3.6.1) is not performed on the Rust side;
    /// only nodes Core itself reports as producers get this tag.
    Producer,
    /// Reserved for future side-effect classification; not emitted today.
    Effect,
    /// Core `NodeKind::Operator(_)`. The specific operator is reported
    /// separately, in the `operator` field of [`NodeDescribe`].
    Operator,
}

/// Canonical Appendix B `status` enum, serialized in lowercase.
#[derive(Debug, Clone, Copy, Serialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum NodeStatus {
    /// No cached value and nothing pending. Reported for a state node
    /// whose cache is sentinel (whether it never held a value or was
    /// `INVALIDATE`-d), and for a compute node that has fired before but
    /// whose cache has since been cleared (canonical R1.3.7.b).
    Sentinel,
    /// Compute node (producer / derived / dynamic / operator) with a
    /// sentinel cache that has never fired — the first-run gate is not
    /// yet satisfied.
    Pending,
    /// DIRTY queued; tier-3 settle has not flushed yet.
    Dirty,
    /// Has a value, no terminal, no DIRTY pending.
    Settled,
    /// Wave-internal "resolved-this-wave" state. It does not survive a
    /// flush, so the static describe never reports it; reserved for
    /// reactive describe.
    Resolved,
    /// Terminated via `[COMPLETE]`.
    Completed,
    /// Terminated via `[ERROR, h]`.
    Errored,
}

impl Graph {
    /// Take a one-shot, JSON-form snapshot of this graph level: every
    /// locally named node (type, status, cached value, deps), the dep
    /// edges feeding those nodes, and the names of mounted children.
    ///
    /// Cached values come out as raw `u64` handle numbers. For the
    /// binding's own rendering of each value (`value: T`-shaped output),
    /// call [`Self::describe_with_debug`](Self::describe_with_debug) with a
    /// [`DebugBindingBoundary`] instead.
    #[must_use]
    pub fn describe(&self) -> GraphDescribeOutput {
        self.describe_inner(None)
    }

    /// Same snapshot as [`Self::describe`], except every non-sentinel cached
    /// value is passed through `debug` so the output carries the binding's
    /// JSON rendering rather than opaque handle numbers. Intended for
    /// human-facing consumers such as debugging UIs and log scrapers.
    ///
    /// `DebugBindingBoundary` is deliberately kept apart from
    /// [`graphrefly_core::BindingBoundary`] so the hot-path FFI surface
    /// stays narrow; bindings opt in by implementing both. Pre-1.0 there is
    /// no fallback — without a debug binding, callers use the raw
    /// [`Self::describe`]. See [`crate::debug`] for the trait's contract.
    #[must_use]
    pub fn describe_with_debug(&self, debug: &dyn DebugBindingBoundary) -> GraphDescribeOutput {
        self.describe_inner(Some(debug))
    }

    /// Shared body of `describe` / `describe_with_debug`.
    ///
    /// With `debug == None`, values are reported as
    /// [`DescribeValue::Handle`]; with `Some(d)`, each value is rendered by
    /// `d` into a [`DescribeValue::Rendered`]. The Graph lock is held only
    /// while copying namespace data out; all Core queries and debug
    /// rendering run after it has been released.
    fn describe_inner(&self, debug: Option<&dyn DebugBindingBoundary>) -> GraphDescribeOutput {
        // Copy everything needed from `GraphInner` in one short critical
        // section; the guard is released at the end of this block.
        let (graph_name, registered, subgraphs) = {
            let guard = self.inner.lock();
            let graph_name = guard.name.clone();
            let registered: Vec<(String, NodeId)> = guard
                .names
                .iter()
                .map(|(node_name, node_id)| (node_name.clone(), *node_id))
                .collect();
            let children: Vec<String> = guard.children.keys().cloned().collect();
            (graph_name, registered, children)
        };

        // Reverse index used to label deps; ids without a local name fall
        // back to a lossless `_anon_<id>` placeholder.
        let name_by_id: IndexMap<NodeId, String> = registered
            .iter()
            .map(|(node_name, node_id)| (*node_id, node_name.clone()))
            .collect();
        let label_of = |dep: &NodeId| -> String {
            name_by_id
                .get(dep)
                .cloned()
                .unwrap_or_else(|| format!("_anon_{}", dep.raw()))
        };

        let mut nodes: IndexMap<String, NodeDescribe> = IndexMap::new();
        let mut edges: Vec<EdgeDescribe> = Vec::new();

        for (name, id) in &registered {
            let id = *id;
            let kind = self.core.kind_of(id).unwrap_or(NodeKind::State);
            let cache = self.core.cache_of(id);
            let terminal = self.core.is_terminal(id);
            let dirty = self.core.is_dirty(id);
            let fired = self.core.has_fired_once(id);

            let deps: Vec<String> = self.core.deps_of(id).iter().map(&label_of).collect();
            edges.extend(deps.iter().map(|from| EdgeDescribe {
                from: from.clone(),
                to: name.clone(),
            }));

            // F sub-slice (2026-05-10): a sentinel cache never carries a
            // value in either mode; a real handle is rendered through the
            // debug binding when one was supplied, else surfaced raw.
            let value = (cache != NO_HANDLE).then(|| match debug {
                Some(renderer) => DescribeValue::Rendered(renderer.handle_to_debug(cache)),
                None => DescribeValue::Handle(cache),
            });

            let operator_kind = if let NodeKind::Operator(op) = kind {
                Some(operator_op_name(op))
            } else {
                None
            };

            nodes.insert(
                name.clone(),
                NodeDescribe {
                    r#type: type_str_of(kind),
                    status: status_of(kind, cache, terminal, dirty, fired),
                    value,
                    deps,
                    operator_kind,
                    meta: None,
                },
            );
        }

        GraphDescribeOutput {
            name: graph_name,
            nodes,
            edges,
            subgraphs,
        }
    }
}

/// Translate a Core [`NodeKind`] into its Appendix B `type` tag.
///
/// The mapping is one-to-one; an operator's payload is ignored here (its
/// discriminant is reported separately via `operator_op_name`).
fn type_str_of(kind: NodeKind) -> NodeTypeStr {
    match kind {
        NodeKind::Operator(_) => NodeTypeStr::Operator,
        NodeKind::Dynamic => NodeTypeStr::Dynamic,
        NodeKind::Derived => NodeTypeStr::Derived,
        NodeKind::Producer => NodeTypeStr::Producer,
        NodeKind::State => NodeTypeStr::State,
    }
}

/// Slice V5: name of an [`OperatorOp`] variant, reported in the `operator`
/// field of [`NodeDescribe`].
///
/// Names are camelCase, not all-lowercase: multi-word operators read
/// `"distinctUntilChanged"`, `"withLatestFrom"`, `"takeWhile"`,
/// `"tapFirst"`, while single-word ones are plain lowercase (`"map"`).
/// Variant payloads are ignored — only the discriminant is reported.
fn operator_op_name(op: OperatorOp) -> String {
    let name: &'static str = match op {
        OperatorOp::Map { .. } => "map",
        OperatorOp::Filter { .. } => "filter",
        OperatorOp::Scan { .. } => "scan",
        OperatorOp::Reduce { .. } => "reduce",
        OperatorOp::DistinctUntilChanged { .. } => "distinctUntilChanged",
        OperatorOp::Pairwise { .. } => "pairwise",
        OperatorOp::Combine { .. } => "combine",
        OperatorOp::WithLatestFrom { .. } => "withLatestFrom",
        OperatorOp::Merge => "merge",
        OperatorOp::Take { .. } => "take",
        OperatorOp::Skip { .. } => "skip",
        OperatorOp::TakeWhile { .. } => "takeWhile",
        OperatorOp::Last { .. } => "last",
        OperatorOp::Tap { .. } => "tap",
        OperatorOp::TapFirst { .. } => "tapFirst",
        OperatorOp::Valve => "valve",
        OperatorOp::Settle { .. } => "settle",
    };
    name.to_owned()
}

/// Canonical-spec §3.6.1 status mapping used by the static `describe()` walk.
///
/// Checks run in this order; the first one that applies wins:
///
/// 1. terminal `Error` → [`NodeStatus::Errored`]
/// 2. terminal `Complete` → [`NodeStatus::Completed`]
/// 3. DIRTY pending → [`NodeStatus::Dirty`]
/// 4. cache holds a value (`cache != NO_HANDLE`) → [`NodeStatus::Settled`]
/// 5. sentinel cache (`cache == NO_HANDLE`):
///    - state node → [`NodeStatus::Sentinel`]
///    - compute node that has fired before → [`NodeStatus::Sentinel`]
///    - compute node that has never fired → [`NodeStatus::Pending`]
///
/// [`NodeStatus::Resolved`] is never produced here.
///
/// # R1.3.7.b post-INVALIDATE classification (Slice F, A8 — 2026-05-07)
///
/// Per canonical R1.3.7.b: "The emitting node's status transitions to
/// 'sentinel' (no value, nothing pending) — NOT 'dirty' (value about to
/// change) — because INVALIDATE has cleared the cache outright with no new
/// value pending."
///
/// A *fired* compute node with `cache == NO_HANDLE`, no terminal and no
/// DIRTY pending has been `INVALIDATE`-d (the only path that clears the
/// cache without setting a terminal), so it reports `Sentinel` — not
/// `Settled` (the pre-A8 bug) and not `Pending` (which means the first-fire
/// gate is still unsatisfied). For state nodes, `cache == NO_HANDLE`
/// always means `Sentinel`, regardless of `fired`.
///
/// # Reactive-describe note
///
/// When both `terminal.is_some()` AND `dirty == true` (a wave that began
/// before the terminal was installed and still has unflushed tier-1
/// traffic), this static classifier reports the terminal status. Reactive
/// describe will need a `terminating` substate to surface the unflushed
/// wave — not modeled here because the static walk happens between waves
/// in practice.
fn status_of(
    kind: NodeKind,
    cache: HandleId,
    terminal: Option<TerminalKind>,
    dirty: bool,
    fired: bool,
) -> NodeStatus {
    // Terminal status outranks everything, including a pending DIRTY.
    match terminal {
        Some(TerminalKind::Error(_)) => return NodeStatus::Errored,
        Some(TerminalKind::Complete) => return NodeStatus::Completed,
        None => {}
    }
    if dirty {
        return NodeStatus::Dirty;
    }
    if cache != NO_HANDLE {
        return NodeStatus::Settled;
    }
    // Sentinel cache from here on. The match stays exhaustive on purpose so
    // a new `NodeKind` variant forces an explicit classification decision.
    match kind {
        NodeKind::State => NodeStatus::Sentinel,
        NodeKind::Producer | NodeKind::Derived | NodeKind::Dynamic | NodeKind::Operator(_) => {
            if fired {
                // Fired before but cache is now sentinel → INVALIDATE
                // wiped it (R1.3.7.b).
                NodeStatus::Sentinel
            } else {
                // First-run gate not yet satisfied.
                NodeStatus::Pending
            }
        }
    }
}

// -------------------------------------------------------------------
// Reactive describe (canonical §3.6.1 `reactive: true` mode)
// -------------------------------------------------------------------

/// Callback type for [`Graph::describe_reactive`].
///
/// Invoked with a freshly taken [`GraphDescribeOutput`] once at subscribe
/// time, then again after every Graph namespace change and every Core
/// `TopologyEvent::DepsChanged` event.
pub type DescribeSink = Arc<dyn Fn(&GraphDescribeOutput) + Send + Sync>;

/// RAII handle returned by [`Graph::describe_reactive`]. Dropping it
/// removes both listeners feeding the describe sink: the Graph-level
/// namespace listener and the Core topology subscription.
///
/// Snapshots are delivered from two sources:
///
/// - Graph-level namespace mutations (`add`, `remove`, `destroy`, `mount`,
///   `unmount`, and the cascaded teardowns of `core.teardown`);
/// - Core `DepsChanged` topology events (e.g. `set_deps`), which change
///   edges without touching the namespace.
///
/// Each delivery re-runs the full `Graph::describe()` and passes the result
/// to the sink. The handle also holds a strong clone of the [`Graph`] it was
/// created from.
#[must_use = "ReactiveDescribeHandle holds the subscription; dropping it unsubscribes"]
pub struct ReactiveDescribeHandle {
    /// Graph whose namespace listener is removed on drop.
    graph: Graph,
    /// Id returned by `subscribe_namespace_change`, used to unsubscribe.
    ns_sink_id: u64,
    /// Slice V3 D5: Core topology subscription for `DepsChanged` events.
    /// When deps change (via `set_deps`), edges in describe output change
    /// even though the namespace hasn't changed. Taken and dropped first
    /// in `Drop`, which unsubscribes from Core topology events.
    topo_sub: Option<TopologySubscription>,
}

impl Drop for ReactiveDescribeHandle {
    /// Tear down in a fixed order: Core topology subscription first, then
    /// the Graph namespace listener.
    fn drop(&mut self) {
        // Releasing the topology subscription before the namespace sink
        // avoids a potential deadlock should the topology sink fire while
        // the namespace sink is being unsubscribed.
        if let Some(topology) = self.topo_sub.take() {
            drop(topology);
        }
        self.graph.unsubscribe_namespace_change(self.ns_sink_id);
    }
}

// Compile-time proof that `ReactiveDescribeHandle` is `Send + Sync`: the
// closure is never called, but it only type-checks if the bound holds.
const _: fn() = || {
    fn require_send_and_sync<H: Send + Sync>() {}
    require_send_and_sync::<ReactiveDescribeHandle>();
};

impl Graph {
    /// Subscribe to live topology snapshots. The sink fires immediately
    /// with the current [`GraphDescribeOutput`] (push-on-subscribe per
    /// canonical §2.5.2 / R3.6.1) and then again with a fresh snapshot
    /// every time a node is added, removed, mounted, unmounted, or the
    /// graph is destroyed.
    ///
    /// Returns a [`ReactiveDescribeHandle`] — dropping it unsubscribes.
    ///
    /// This is the `reactive: true` mode from canonical §3.6.1. The
    /// `reactive: "diff"` (changeset) mode is deferred to Phase 14.
    ///
    /// Note: `set_deps` topology changes fire via Core's topology
    /// primitive, not this Graph-level namespace hook. If callers also
    /// need `set_deps` notifications, compose with
    /// [`graphrefly_core::Core::subscribe_topology`].
    ///
    /// The sink captures only a [`Weak`] reference to the graph's inner
    /// state, so the `namespace_sinks` → sink → Graph → `namespace_sinks`
    /// Arc cycle is broken at the sink edge (see P6 in the Slice F /qa
    /// closing notes).
    pub fn describe_reactive(&self, sink: DescribeSink) -> ReactiveDescribeHandle {
        // Push-on-subscribe: fire current snapshot once before installing
        // the listener. Sink runs without any Graph lock held.
        sink(&self.describe());

        // Capture Weak<inner> + Core (clone) to break the
        // namespace_sinks → sink → Graph → namespace_sinks Arc cycle.
        // If the user leaks the handle, the graph still drops cleanly
        // because the sink's Weak ref does not keep `inner` alive.
        let weak_inner: Weak<Mutex<GraphInner>> = Arc::downgrade(&self.inner);
        let core: Core = self.core.clone();
        let sink_for_ns = sink.clone();
        let ns_sink = Arc::new(move || {
            let Some(arc_inner) = weak_inner.upgrade() else {
                return;
            };
            let graph = Graph {
                core: core.clone(),
                inner: arc_inner,
            };
            let snapshot = graph.describe();
            sink_for_ns(&snapshot);
        });
        let ns_sink_id = self.subscribe_namespace_change(ns_sink);

        // Slice V3 D5: subscribe to Core topology events so that
        // `set_deps` changes (which alter edges without touching the
        // namespace) also trigger a describe update.
        let weak_inner_topo: Weak<Mutex<GraphInner>> = Arc::downgrade(&self.inner);
        let core_topo: Core = self.core.clone();
        let topo_sink: Arc<dyn Fn(&TopologyEvent) + Send + Sync> =
            Arc::new(move |event: &TopologyEvent| {
                if matches!(event, TopologyEvent::DepsChanged { .. }) {
                    let Some(arc_inner) = weak_inner_topo.upgrade() else {
                        return;
                    };
                    let graph = Graph {
                        core: core_topo.clone(),
                        inner: arc_inner,
                    };
                    let snapshot = graph.describe();
                    sink(&snapshot);
                }
            });
        let topo_sub = self.core.subscribe_topology(topo_sink);

        ReactiveDescribeHandle {
            graph: self.clone(),
            ns_sink_id,
            topo_sub: Some(topo_sub),
        }
    }
}