Skip to main content

graphrefly_graph/
observe.rs

1//! `Graph::observe()` / `observe_all()` / `observe_all_reactive()` —
2//! default sink-style message tap (canonical §3.6.2 default mode).
3//!
4//! D246: observe handles carry a Core-free [`Graph`] (a cheap `Arc`
5//! clone) + the embedder's `&Core` passed explicitly per call. There is
6//! **no RAII `Drop`** below the binding (D246 rule 3 — this eliminates
7//! the Blind #4 lock-across-unsubscribe-in-`Drop` deadlock class
8//! entirely): `subscribe` returns ids; teardown is the owner-invoked
9//! [`detach`](GraphObserveAllReactive::detach) (synchronous,
10//! `&Core`-explicit). The embedder's [`graphrefly_core::OwnedCore`] is
11//! the one RAII boundary.
12//!
13//! The reactive `observe_all` ns-listener fires owner-side with `&Core`
14//! (D246 rule 2). The Core-topology prune of torn-down nodes fires
15//! *inside* a wave, so its `unsubscribe` is routed through
16//! `MailboxOp::Defer` (D246 rule 6 — sink-in-wave defers).
17
18use std::cell::RefCell;
19use std::collections::HashSet;
20use std::rc::{Rc, Weak};
21use std::sync::Arc;
22
23use graphrefly_core::{
24    Core, CoreFull, LockId, Message, NodeId, PauseError, ResumeReport, Sink, SubscriptionId,
25    TopologyEvent, TopologySubscriptionId, UpError,
26};
27
28use crate::graph::{register_ns_sink, Graph, GraphInner, NamespaceChangeSink};
29
30/// Id pair for a single observe subscription. D246: a plain value (no
31/// `Drop`); detach explicitly via [`Self::detach`] or let the
32/// embedder's `OwnedCore` tear down on owner-thread drop.
33#[derive(Debug, Clone, Copy)]
34pub struct ObserveSub {
35    node_id: NodeId,
36    sub_id: SubscriptionId,
37}
38
39impl ObserveSub {
40    /// The observed node.
41    #[must_use]
42    pub fn node_id(&self) -> NodeId {
43        self.node_id
44    }
45
46    /// The subscription id.
47    #[must_use]
48    pub fn sub_id(&self) -> SubscriptionId {
49        self.sub_id
50    }
51
52    /// Owner-invoked, synchronous detach (D246 rule 3).
53    pub fn detach(&self, core: &Core) {
54        core.unsubscribe(self.node_id, self.sub_id);
55    }
56}
57
58/// Single-node observe handle (canonical §3.6.2).
59///
60/// D246: holds a Core-free [`Graph`]; `&Core` is passed per call.
61///
62/// # Two coexisting message-injection shapes (D298, 2026-05-26)
63///
64/// This impl exposes BOTH the canonical `up(messages)` open-vocab
65/// shape AND per-tier typed methods. **They have different semantics
66/// and are NOT sugar wrappers for each other.** Both shapes are
67/// supported as first-class public API; users pick based on intent.
68///
69/// 1. **Canonical R3.6.2 — [`Self::up`]`(messages)`** — send messages
70///    UPSTREAM toward the observed node's sources. Routes through
71///    [`Core::up`], which iterates each dep and dispatches the
72///    message-specific upstream action (`Pause` → pause each dep;
73///    `Resume` → resume each dep; `Invalidate` → recursive plain-
74///    forward to leaves per R1.4.2; `Teardown` → teardown each dep).
75///    For a leaf node with no deps (e.g., a state node), `up()` is a
76///    no-op — there is no upstream. Open message vocabulary mirrors
77///    pure-ts `Node.up(messages)` (`core/node.ts:1430`).
78///
79/// 2. **Direct typed methods — [`Self::pause`] / [`Self::resume`] /
80///    [`Self::invalidate`]** — mutate the OBSERVED NODE directly
81///    (not its upstream). `observer.pause(lock)` calls
82///    [`Core::pause`]`(self.node_id, lock)` and pauses the observed
83///    node itself; for a leaf state node, this DOES pause it.
84///    Rust-idiomatic typed-arg shortcuts for the common
85///    "pause/resume/invalidate the thing I'm observing" pattern.
86///    Non-allocating (no `Vec<Message>` heap churn on the control
87///    plane). The collaboration directive in
88///    `feedback_no_imperative` user memory favors these
89///    intent-named typed calls over message-injection vocabulary
90///    when the operation IS "act on this node directly."
91///
92/// Use `up(messages)` for canonical upstream-injection (matches TS
93/// pure-ts wire shape; required for cross-impl parity scenarios
94/// asserting canonical R3.6.2 behavior). Use typed methods for
95/// direct-node ergonomic mutation. Same `GraphObserveOne` handle
96/// exposes both.
97///
98/// # Decision provenance
99///
100/// - Q1 user-locked Option 2 (canonical primary + typed sugar
101///   coexisting). Override of D196 (consumer-pressure gate) per the
102///   `/porting-to-rs clear all the perf items and the deferred items.
103///   regardless if they need a valid consumer demand` directive.
104/// - D298 (`~/src/graphrefly-ts/docs/rust-port-decisions.md`).
105#[must_use = "GraphObserveOne does nothing until you call subscribe()"]
106pub struct GraphObserveOne {
107    graph: Graph,
108    node_id: NodeId,
109}
110
111impl GraphObserveOne {
112    pub(crate) fn new(graph: Graph, node_id: NodeId) -> Self {
113        Self { graph, node_id }
114    }
115
116    /// The observed `NodeId`.
117    #[must_use]
118    pub fn node_id(&self) -> NodeId {
119        self.node_id
120    }
121
122    /// Subscribe a sink. Returns an [`ObserveSub`] id pair — detach
123    /// owner-invoked (D246 rule 3).
124    pub fn subscribe(&self, core: &Core, sink: Sink) -> ObserveSub {
125        let sub_id = core.subscribe(self.node_id, sink);
126        ObserveSub {
127            node_id: self.node_id,
128            sub_id,
129        }
130    }
131
132    /// Send `[PAUSE, lock]` upstream.
133    ///
134    /// # Errors
135    /// See [`PauseError`].
136    pub fn pause(&self, core: &Core, lock: LockId) -> Result<(), PauseError> {
137        core.pause(self.node_id, lock)
138    }
139
140    /// Send `[RESUME, lock]` upstream.
141    ///
142    /// # Errors
143    /// See [`PauseError`].
144    pub fn resume(&self, core: &Core, lock: LockId) -> Result<Option<ResumeReport>, PauseError> {
145        core.resume(self.node_id, lock)
146    }
147
148    /// Send `[INVALIDATE]` upstream.
149    pub fn invalidate(&self, core: &Core) {
150        core.invalidate(self.node_id);
151    }
152
153    /// Send messages upstream toward the observed node's sources
154    /// (canonical R3.6.2 `up(messages)` shape — D298, 2026-05-26).
155    ///
156    /// Each message is dispatched via [`Core::up`] to the observed
157    /// node, which then forwards per the tier-specific upstream
158    /// routing:
159    ///
160    /// - `Message::Pause(lock)` → [`Core::pause`] on each dep.
161    /// - `Message::Resume(lock)` → [`Core::resume`] on each dep.
162    /// - `Message::Invalidate` → recursive plain-forward to leaf
163    ///   sources per R1.4.2 (no self-process at intermediates).
164    /// - `Message::Teardown` → [`Core::teardown`] cascade on each
165    ///   dep.
166    /// - `Message::Start` / `Message::Dirty` → no-op upstream per
167    ///   the routing table in [`Core::up`].
168    ///
169    /// For a leaf node (no deps — e.g., a state node) `up()` is a
170    /// no-op. Use the typed [`Self::pause`] / [`Self::resume`] /
171    /// [`Self::invalidate`] methods if the intent is to mutate the
172    /// observed node directly.
173    ///
174    /// Fails on the first message that errors; subsequent messages
175    /// are NOT dispatched. Empty `messages` is `Ok(())`.
176    ///
177    /// # Errors
178    ///
179    /// - [`UpError::UnknownNode`] — observed node not registered
180    ///   (only reachable if `teardown` removed it after `observe()`).
181    /// - [`UpError::TierForbidden`] — `messages` contains a
182    ///   tier-3 (`Data`/`Resolved`) or tier-5 (`Complete`/`Error`)
183    ///   variant; these are downstream-only per R1.4.1.
184    pub fn up(&self, core: &Core, messages: &[Message]) -> Result<(), UpError> {
185        for &msg in messages {
186            core.up(self.node_id, msg)?;
187        }
188        Ok(())
189    }
190
191    /// The backing graph handle.
192    #[must_use]
193    pub fn graph(&self) -> &Graph {
194        &self.graph
195    }
196}
197
198/// All-nodes observe handle. Subscriptions are tied to the set of
199/// nodes named at `subscribe()` call time. D246: no RAII `Drop` —
200/// You MUST call [`Self::detach`]`(core)` (owner-invoked) — these Core
201/// subscriptions are opened via raw `core.subscribe` and are NOT
202/// `OwnedCore`-tracked, so dropping the handle without `detach` leaks
203/// them for the `Core` lifetime.
204#[must_use = "GraphObserveAll holds Core subscriptions NOT tracked by OwnedCore; you MUST call detach(core) or they leak"]
205pub struct GraphObserveAll {
206    graph: Graph,
207    subs: Vec<(NodeId, SubscriptionId)>,
208}
209
210impl GraphObserveAll {
211    pub(crate) fn new(graph: Graph) -> Self {
212        Self {
213            graph,
214            subs: Vec::new(),
215        }
216    }
217
218    /// Multi-cast subscribe against every named node at this moment.
219    /// Returns the node count.
220    pub fn subscribe<F>(&mut self, core: &Core, sink: F) -> usize
221    where
222        F: Fn(&str, &[Message]) + 'static,
223    {
224        let names_to_ids: Vec<(String, NodeId)> = {
225            let inner = self.graph.inner_arc().borrow_mut();
226            inner.names.iter().map(|(n, id)| (n.clone(), *id)).collect()
227        };
228        let sink_arc: Arc<F> = Arc::new(sink);
229        let count = names_to_ids.len();
230        for (name, id) in names_to_ids {
231            let sink_clone = sink_arc.clone();
232            let owned_name = name;
233            let inner_sink: Sink = Rc::new(move |msgs: &[Message]| {
234                sink_clone(&owned_name, msgs);
235            });
236            let sub_id = core.subscribe(id, inner_sink);
237            self.subs.push((id, sub_id));
238        }
239        count
240    }
241
242    /// Owner-invoked, synchronous detach of every fan-out sink
243    /// (D246 rule 3 — no `Drop`, so no Blind #4 deadlock class).
244    pub fn detach(&mut self, core: &Core) {
245        for (node_id, sub_id) in self.subs.drain(..) {
246            core.unsubscribe(node_id, sub_id);
247        }
248    }
249}
250
251// -------------------------------------------------------------------
252// Reactive observe_all — auto-subscribe late-added nodes
253// -------------------------------------------------------------------
254
255struct ObserveAllReactiveInner {
256    /// Set of `NodeId`s we've already subscribed to.
257    subscribed: HashSet<NodeId>,
258    /// Live `(node, sub)` pairs — unsubscribed on detach / prune.
259    subs: Vec<(NodeId, SubscriptionId)>,
260}
261
262/// Reactive `observe_all` — auto-subscribes late-added named nodes via
263/// the owner-side namespace-change listener, and prunes torn-down
264/// nodes via the Core topology sub (the prune `unsubscribe` is
265/// `MailboxOp::Defer`'d since `NodeTornDown` fires in-wave — D246 r6).
266/// D246 rule 3: no RAII `Drop`; teardown is the owner-invoked
267/// [`Self::detach`]`(core)` — owner-invoked, REQUIRED. The ns-sink is
268/// collected by `graph.destroy(core)`; the Core topology sub + fan-out
269/// subs are opened via raw `core.subscribe*` and are NOT
270/// `OwnedCore`-tracked, so `detach(core)` is the only thing that
271/// collects them (dropping the handle without it leaks them).
272#[must_use = "GraphObserveAllReactive holds a Core topology sub + fan-out subs NOT tracked by OwnedCore; you MUST call detach(core) or they leak"]
273pub struct GraphObserveAllReactive {
274    graph: Graph,
275    ns_sink_id: Option<u64>,
276    topo_sub_id: Option<TopologySubscriptionId>,
277    inner: Rc<RefCell<ObserveAllReactiveInner>>,
278}
279
280impl GraphObserveAllReactive {
281    pub(crate) fn new(graph: Graph) -> Self {
282        Self {
283            graph,
284            ns_sink_id: None,
285            topo_sub_id: None,
286            inner: Rc::new(RefCell::new(ObserveAllReactiveInner {
287                subscribed: HashSet::new(),
288                subs: Vec::new(),
289            })),
290        }
291    }
292
293    /// Subscribe a sink to all current AND future named nodes.
294    ///
295    /// # Panics
296    ///
297    /// Panics if called more than once on the same handle (single-shot
298    /// wiring; rebuild via `observe_all_reactive`).
299    //
300    // Single load-bearing subscribe path: wires initial-snapshot taps for
301    // every named node PLUS a namespace-change sink that wires late
302    // additions. Splitting mechanically would interleave shared state
303    // (sink Arc, weak inner, ns_sink_id) across helpers without clarifying
304    // intent — keep cohesive.
305    #[allow(clippy::too_many_lines)]
306    pub fn subscribe<F>(&mut self, core: &Core, sink: F) -> usize
307    where
308        F: Fn(&str, &[Message]) + 'static,
309    {
310        assert!(
311            self.ns_sink_id.is_none(),
312            "GraphObserveAllReactive::subscribe is single-shot; called twice on the same handle"
313        );
314
315        let sink_arc: Arc<F> = Arc::new(sink);
316
317        // P4: install the namespace listener BEFORE the initial
318        // snapshot (the listener's `subscribed.insert` dedups).
319        // D246 rule 2: the listener receives the owner's `&Core` at
320        // fire-time (no stored/cloned Core); it subscribes new nodes
321        // synchronously (fire_namespace_change is owner-side, not
322        // in-wave).
323        let weak_graph_inner: Weak<RefCell<GraphInner>> = Rc::downgrade(self.graph.inner_arc());
324        let inner_for_ns = self.inner.clone();
325        let sink_for_ns = sink_arc.clone();
326        let ns_sink: NamespaceChangeSink = Rc::new(move |core: &Core| {
327            let Some(arc_inner) = weak_graph_inner.upgrade() else {
328                return;
329            };
330            let new_nodes: Vec<(String, NodeId)> = {
331                let graph_inner = arc_inner.borrow_mut();
332                let state = inner_for_ns.borrow_mut();
333                graph_inner
334                    .names
335                    .iter()
336                    .filter(|(_n, id)| !state.subscribed.contains(id))
337                    .map(|(n, id)| (n.clone(), *id))
338                    .collect()
339            };
340            for (name, id) in new_nodes {
341                let should = {
342                    let mut state = inner_for_ns.borrow_mut();
343                    state.subscribed.insert(id)
344                };
345                if should {
346                    let sink_clone = sink_for_ns.clone();
347                    let owned_name = name;
348                    let msg_sink: Sink = Rc::new(move |msgs: &[Message]| {
349                        sink_clone(&owned_name, msgs);
350                    });
351                    let sub_id = core.subscribe(id, msg_sink);
352                    inner_for_ns.borrow_mut().subs.push((id, sub_id));
353                }
354            }
355        });
356        self.ns_sink_id = Some(register_ns_sink(self.graph.inner_arc(), ns_sink));
357
358        // Slice V3 D2: prune torn-down nodes. `NodeTornDown` fires
359        // in-wave → the `unsubscribe` is `MailboxOp::Defer`'d (D246 r6:
360        // no synchronous sink-side Core mutation).
361        let inner_for_topo = self.inner.clone();
362        // D249/S2c: post to the owner-side `!Send` `DeferQueue` (the
363        // closure captures the `Rc<RefCell<ObserveAllReactiveInner>>`,
364        // `!Send`); `Rc<DeferQueue>` is owner-thread-only — fine, this
365        // topo sink is `!Send` (D248) and fires on the owner thread.
366        let deferred = core.defer_queue();
367        // D246 rule 8 (S4): reusable coalescing slot. Prune is NOT
368        // idempotent (each torn id must be unsubscribed once), so
369        // accumulate torn ids into a shared owner-thread-only buffer
370        // and post ONE `Box` per drain that processes all of them —
371        // instead of one boxed closure per `NodeTornDown`.
372        // Behaviour-equivalent: the set of (node,sub) pairs unsubscribed
373        // is exactly the union, just batched into one deferred pass.
374        let pending: Rc<RefCell<Vec<NodeId>>> = Rc::new(RefCell::new(Vec::new()));
375        let scheduled = Rc::new(std::cell::Cell::new(false));
376        let topo_sink: Rc<dyn Fn(&TopologyEvent)> = Rc::new(move |event: &TopologyEvent| {
377            if let TopologyEvent::NodeTornDown(id) = event {
378                // INVARIANT (QA, 2026-05-19): push BEFORE the
379                // `scheduled.get()` check so a fire arriving while a
380                // defer is in-flight (after `sched.set(false)`,
381                // before the next batch) still gets captured by the
382                // in-flight drain's `mem::take`. Re-entry from
383                // `cf.unsubscribe` (a future code path adding
384                // teardown-on-last-unsub) would land here; the
385                // closure releases the `pending` borrow via
386                // `mem::take` BEFORE invoking `cf.*` so no
387                // `already-borrowed` panic.
388                pending.borrow_mut().push(*id);
389                if scheduled.get() {
390                    return; // already armed for this drain — coalesce.
391                }
392                scheduled.set(true);
393                let inner_for_defer = inner_for_topo.clone();
394                let pending_for_defer = Rc::clone(&pending);
395                let sched = Rc::clone(&scheduled);
396                // No `HandleId` captured — Core-gone (`false`) just
397                // skips the prune; nothing to release (D235 P8).
398                let _ = deferred.post(Box::new(move |cf: &dyn CoreFull| {
399                    sched.set(false);
400                    let torn: Vec<NodeId> = std::mem::take(&mut *pending_for_defer.borrow_mut());
401                    let to_unsub: Vec<(NodeId, SubscriptionId)> = {
402                        let mut state = inner_for_defer.borrow_mut();
403                        let mut acc = Vec::new();
404                        for id in torn {
405                            if state.subscribed.remove(&id) {
406                                let (keep, drop_): (Vec<_>, Vec<_>) =
407                                    state.subs.drain(..).partition(|(n, _)| *n != id);
408                                state.subs = keep;
409                                acc.extend(drop_);
410                            }
411                        }
412                        acc
413                    };
414                    for (n, s) in to_unsub {
415                        cf.unsubscribe(n, s);
416                    }
417                }));
418            }
419        });
420        self.topo_sub_id = Some(core.subscribe_topology(topo_sink));
421
422        // Initial snapshot (listener's idempotent walk dedups overlap).
423        let names_to_ids: Vec<(String, NodeId)> = {
424            let graph_inner = self.graph.inner_arc().borrow_mut();
425            graph_inner
426                .names
427                .iter()
428                .map(|(n, id)| (n.clone(), *id))
429                .collect()
430        };
431        let initial_count = names_to_ids.len();
432        let to_subscribe: Vec<(String, NodeId)> = {
433            let mut state = self.inner.borrow_mut();
434            names_to_ids
435                .into_iter()
436                .filter(|(_n, id)| state.subscribed.insert(*id))
437                .collect()
438        };
439        for (name, id) in to_subscribe {
440            let sink_clone = sink_arc.clone();
441            let msg_sink: Sink = Rc::new(move |msgs: &[Message]| {
442                sink_clone(&name, msgs);
443            });
444            let sub_id = core.subscribe(id, msg_sink);
445            self.inner.borrow_mut().subs.push((id, sub_id));
446        }
447
448        initial_count
449    }
450
451    /// Owner-invoked, synchronous teardown (D246 rule 3 — replaces the
452    /// retired RAII `Drop`; eliminates the Blind #4 deadlock class).
453    /// Topology sub first, then namespace sink, then drain the fan-out
454    /// subs into a local `Vec` and release the `inner` lock BEFORE the
455    /// `core.unsubscribe` cascade (`Core::unsubscribe` runs the full
456    /// deactivation chain and can fire sinks synchronously; holding
457    /// `inner` across it would self-deadlock — the pre-β invariant,
458    /// preserved).
459    pub fn detach(&mut self, core: &Core) {
460        if let Some(id) = self.topo_sub_id.take() {
461            core.unsubscribe_topology(id);
462        }
463        if let Some(id) = self.ns_sink_id.take() {
464            crate::graph::unregister_ns_sink(self.graph.inner_arc(), id);
465        }
466        let drained: Vec<(NodeId, SubscriptionId)> = {
467            let mut state = self.inner.borrow_mut();
468            state.subs.drain(..).collect()
469        };
470        for (node_id, sub_id) in drained {
471            core.unsubscribe(node_id, sub_id);
472        }
473    }
474
475    /// The backing graph handle.
476    #[must_use]
477    pub fn graph(&self) -> &Graph {
478        &self.graph
479    }
480}