graphrefly_graph/observe.rs
1//! `Graph::observe()` / `observe_all()` / `observe_all_reactive()` —
2//! default sink-style message tap (canonical §3.6.2 default mode).
3//!
4//! D246: observe handles carry a Core-free [`Graph`] (a cheap `Arc`
5//! clone) + the embedder's `&Core` passed explicitly per call. There is
6//! **no RAII `Drop`** below the binding (D246 rule 3 — this eliminates
7//! the Blind #4 lock-across-unsubscribe-in-`Drop` deadlock class
8//! entirely): `subscribe` returns ids; teardown is the owner-invoked
9//! [`detach`](GraphObserveAllReactive::detach) (synchronous,
10//! `&Core`-explicit). The embedder's [`graphrefly_core::OwnedCore`] is
11//! the one RAII boundary.
12//!
13//! The reactive `observe_all` ns-listener fires owner-side with `&Core`
14//! (D246 rule 2). The Core-topology prune of torn-down nodes fires
15//! *inside* a wave, so its `unsubscribe` is routed through
16//! `MailboxOp::Defer` (D246 rule 6 — sink-in-wave defers).
17
18use std::cell::RefCell;
19use std::collections::HashSet;
20use std::rc::{Rc, Weak};
21use std::sync::Arc;
22
23use graphrefly_core::{
24 Core, CoreFull, LockId, Message, NodeId, PauseError, ResumeReport, Sink, SubscriptionId,
25 TopologyEvent, TopologySubscriptionId,
26};
27
28use crate::graph::{register_ns_sink, Graph, GraphInner, NamespaceChangeSink};
29
30/// Id pair for a single observe subscription. D246: a plain value (no
31/// `Drop`); detach explicitly via [`Self::detach`] or let the
32/// embedder's `OwnedCore` tear down on owner-thread drop.
33#[derive(Debug, Clone, Copy)]
34pub struct ObserveSub {
35 node_id: NodeId,
36 sub_id: SubscriptionId,
37}
38
39impl ObserveSub {
40 /// The observed node.
41 #[must_use]
42 pub fn node_id(&self) -> NodeId {
43 self.node_id
44 }
45
46 /// The subscription id.
47 #[must_use]
48 pub fn sub_id(&self) -> SubscriptionId {
49 self.sub_id
50 }
51
52 /// Owner-invoked, synchronous detach (D246 rule 3).
53 pub fn detach(&self, core: &Core) {
54 core.unsubscribe(self.node_id, self.sub_id);
55 }
56}
57
58/// Single-node observe handle (canonical §3.6.2).
59///
60/// D246: holds a Core-free [`Graph`]; `&Core` is passed per call.
61///
62/// # Deliberate divergence from canonical R3.6.2 `up(messages)` (D280 doc-lock)
63///
64/// Canonical R3.6.2 specifies a unified `up(messages: Messages)`
65/// upstream-injection API. This impl exposes the per-tier control
66/// methods [`Self::pause`] / [`Self::resume`] / [`Self::invalidate`]
67/// as separate methods rather than a single `up(messages)` shape.
68/// The split is deliberate, on two grounds:
69///
70/// 1. **Non-allocating ergonomics.** A unified
71/// `up(Vec<Message>)` forces a `Vec` allocation on every upstream
72/// call. The per-tier methods take their args by value and take a
73/// direct path into [`Core::pause`] / [`Core::resume`] /
74/// [`Core::invalidate`] — zero heap churn on the control plane.
75///
76/// 2. **Avoids re-exposing an imperative-shaped public surface.**
77/// The Rust port's collaboration directive (`feedback_no_imperative`
78/// user memory) is to expose reactive `NodeInput` shapes at the
79/// public surface, not imperative message-injection. The per-tier
80/// methods read as intent-named control calls (`pause(lock)`,
81/// `resume(lock)`, `invalidate()`); a unified `up([PAUSE, lock])`
82/// re-exposes the protocol-internal `Messages` shape and tier
83/// numbers as call-site vocabulary.
84///
85/// Cross-binding wrappers (napi-rs `BenchGraph`, future pyo3, etc.)
86/// may reassemble a unified `up(messages)` if a JS/Python idiomatic
87/// API needs it; the substrate-side split is the Rust public-surface
88/// contract.
89///
90/// # Lift point (only if needed)
91///
92/// If a Rust consumer surfaces a need for a unified `up(messages)` —
93/// e.g., a cross-impl parity scenario authored against the canonical
94/// R3.6.2 shape — add it alongside the per-tier methods (additive,
95/// non-breaking). Gated on D196 consumer pressure.
96#[must_use = "GraphObserveOne does nothing until you call subscribe()"]
97pub struct GraphObserveOne {
98 graph: Graph,
99 node_id: NodeId,
100}
101
102impl GraphObserveOne {
103 pub(crate) fn new(graph: Graph, node_id: NodeId) -> Self {
104 Self { graph, node_id }
105 }
106
107 /// The observed `NodeId`.
108 #[must_use]
109 pub fn node_id(&self) -> NodeId {
110 self.node_id
111 }
112
113 /// Subscribe a sink. Returns an [`ObserveSub`] id pair — detach
114 /// owner-invoked (D246 rule 3).
115 pub fn subscribe(&self, core: &Core, sink: Sink) -> ObserveSub {
116 let sub_id = core.subscribe(self.node_id, sink);
117 ObserveSub {
118 node_id: self.node_id,
119 sub_id,
120 }
121 }
122
123 /// Send `[PAUSE, lock]` upstream.
124 ///
125 /// # Errors
126 /// See [`PauseError`].
127 pub fn pause(&self, core: &Core, lock: LockId) -> Result<(), PauseError> {
128 core.pause(self.node_id, lock)
129 }
130
131 /// Send `[RESUME, lock]` upstream.
132 ///
133 /// # Errors
134 /// See [`PauseError`].
135 pub fn resume(&self, core: &Core, lock: LockId) -> Result<Option<ResumeReport>, PauseError> {
136 core.resume(self.node_id, lock)
137 }
138
139 /// Send `[INVALIDATE]` upstream.
140 pub fn invalidate(&self, core: &Core) {
141 core.invalidate(self.node_id);
142 }
143
144 /// The backing graph handle.
145 #[must_use]
146 pub fn graph(&self) -> &Graph {
147 &self.graph
148 }
149}
150
151/// All-nodes observe handle. Subscriptions are tied to the set of
152/// nodes named at `subscribe()` call time. D246: no RAII `Drop` —
153/// You MUST call [`Self::detach`]`(core)` (owner-invoked) — these Core
154/// subscriptions are opened via raw `core.subscribe` and are NOT
155/// `OwnedCore`-tracked, so dropping the handle without `detach` leaks
156/// them for the `Core` lifetime.
157#[must_use = "GraphObserveAll holds Core subscriptions NOT tracked by OwnedCore; you MUST call detach(core) or they leak"]
158pub struct GraphObserveAll {
159 graph: Graph,
160 subs: Vec<(NodeId, SubscriptionId)>,
161}
162
163impl GraphObserveAll {
164 pub(crate) fn new(graph: Graph) -> Self {
165 Self {
166 graph,
167 subs: Vec::new(),
168 }
169 }
170
171 /// Multi-cast subscribe against every named node at this moment.
172 /// Returns the node count.
173 pub fn subscribe<F>(&mut self, core: &Core, sink: F) -> usize
174 where
175 F: Fn(&str, &[Message]) + 'static,
176 {
177 let names_to_ids: Vec<(String, NodeId)> = {
178 let inner = self.graph.inner_arc().borrow_mut();
179 inner.names.iter().map(|(n, id)| (n.clone(), *id)).collect()
180 };
181 let sink_arc: Arc<F> = Arc::new(sink);
182 let count = names_to_ids.len();
183 for (name, id) in names_to_ids {
184 let sink_clone = sink_arc.clone();
185 let owned_name = name;
186 let inner_sink: Sink = Rc::new(move |msgs: &[Message]| {
187 sink_clone(&owned_name, msgs);
188 });
189 let sub_id = core.subscribe(id, inner_sink);
190 self.subs.push((id, sub_id));
191 }
192 count
193 }
194
195 /// Owner-invoked, synchronous detach of every fan-out sink
196 /// (D246 rule 3 — no `Drop`, so no Blind #4 deadlock class).
197 pub fn detach(&mut self, core: &Core) {
198 for (node_id, sub_id) in self.subs.drain(..) {
199 core.unsubscribe(node_id, sub_id);
200 }
201 }
202}
203
204// -------------------------------------------------------------------
205// Reactive observe_all — auto-subscribe late-added nodes
206// -------------------------------------------------------------------
207
/// State shared (via `Rc<RefCell<_>>`) between a
/// `GraphObserveAllReactive` handle and the listener closures it
/// installs.
struct ObserveAllReactiveInner {
    /// Dedup set: `NodeId`s we've already subscribed to. Checked and
    /// inserted into before opening a tap so the same node is not
    /// tapped twice.
    subscribed: HashSet<NodeId>,
    /// Live `(node, sub)` pairs — unsubscribed on detach / prune.
    subs: Vec<(NodeId, SubscriptionId)>,
}
214
215/// Reactive `observe_all` — auto-subscribes late-added named nodes via
216/// the owner-side namespace-change listener, and prunes torn-down
217/// nodes via the Core topology sub (the prune `unsubscribe` is
218/// `MailboxOp::Defer`'d since `NodeTornDown` fires in-wave — D246 r6).
219/// D246 rule 3: no RAII `Drop`; teardown is the owner-invoked
220/// [`Self::detach`]`(core)` — owner-invoked, REQUIRED. The ns-sink is
221/// collected by `graph.destroy(core)`; the Core topology sub + fan-out
222/// subs are opened via raw `core.subscribe*` and are NOT
223/// `OwnedCore`-tracked, so `detach(core)` is the only thing that
224/// collects them (dropping the handle without it leaks them).
225#[must_use = "GraphObserveAllReactive holds a Core topology sub + fan-out subs NOT tracked by OwnedCore; you MUST call detach(core) or they leak"]
226pub struct GraphObserveAllReactive {
227 graph: Graph,
228 ns_sink_id: Option<u64>,
229 topo_sub_id: Option<TopologySubscriptionId>,
230 inner: Rc<RefCell<ObserveAllReactiveInner>>,
231}
232
233impl GraphObserveAllReactive {
234 pub(crate) fn new(graph: Graph) -> Self {
235 Self {
236 graph,
237 ns_sink_id: None,
238 topo_sub_id: None,
239 inner: Rc::new(RefCell::new(ObserveAllReactiveInner {
240 subscribed: HashSet::new(),
241 subs: Vec::new(),
242 })),
243 }
244 }
245
246 /// Subscribe a sink to all current AND future named nodes.
247 ///
248 /// # Panics
249 ///
250 /// Panics if called more than once on the same handle (single-shot
251 /// wiring; rebuild via `observe_all_reactive`).
252 //
253 // Single load-bearing subscribe path: wires initial-snapshot taps for
254 // every named node PLUS a namespace-change sink that wires late
255 // additions. Splitting mechanically would interleave shared state
256 // (sink Arc, weak inner, ns_sink_id) across helpers without clarifying
257 // intent — keep cohesive.
258 #[allow(clippy::too_many_lines)]
259 pub fn subscribe<F>(&mut self, core: &Core, sink: F) -> usize
260 where
261 F: Fn(&str, &[Message]) + 'static,
262 {
263 assert!(
264 self.ns_sink_id.is_none(),
265 "GraphObserveAllReactive::subscribe is single-shot; called twice on the same handle"
266 );
267
268 let sink_arc: Arc<F> = Arc::new(sink);
269
270 // P4: install the namespace listener BEFORE the initial
271 // snapshot (the listener's `subscribed.insert` dedups).
272 // D246 rule 2: the listener receives the owner's `&Core` at
273 // fire-time (no stored/cloned Core); it subscribes new nodes
274 // synchronously (fire_namespace_change is owner-side, not
275 // in-wave).
276 let weak_graph_inner: Weak<RefCell<GraphInner>> = Rc::downgrade(self.graph.inner_arc());
277 let inner_for_ns = self.inner.clone();
278 let sink_for_ns = sink_arc.clone();
279 let ns_sink: NamespaceChangeSink = Rc::new(move |core: &Core| {
280 let Some(arc_inner) = weak_graph_inner.upgrade() else {
281 return;
282 };
283 let new_nodes: Vec<(String, NodeId)> = {
284 let graph_inner = arc_inner.borrow_mut();
285 let state = inner_for_ns.borrow_mut();
286 graph_inner
287 .names
288 .iter()
289 .filter(|(_n, id)| !state.subscribed.contains(id))
290 .map(|(n, id)| (n.clone(), *id))
291 .collect()
292 };
293 for (name, id) in new_nodes {
294 let should = {
295 let mut state = inner_for_ns.borrow_mut();
296 state.subscribed.insert(id)
297 };
298 if should {
299 let sink_clone = sink_for_ns.clone();
300 let owned_name = name;
301 let msg_sink: Sink = Rc::new(move |msgs: &[Message]| {
302 sink_clone(&owned_name, msgs);
303 });
304 let sub_id = core.subscribe(id, msg_sink);
305 inner_for_ns.borrow_mut().subs.push((id, sub_id));
306 }
307 }
308 });
309 self.ns_sink_id = Some(register_ns_sink(self.graph.inner_arc(), ns_sink));
310
311 // Slice V3 D2: prune torn-down nodes. `NodeTornDown` fires
312 // in-wave → the `unsubscribe` is `MailboxOp::Defer`'d (D246 r6:
313 // no synchronous sink-side Core mutation).
314 let inner_for_topo = self.inner.clone();
315 // D249/S2c: post to the owner-side `!Send` `DeferQueue` (the
316 // closure captures the `Rc<RefCell<ObserveAllReactiveInner>>`,
317 // `!Send`); `Rc<DeferQueue>` is owner-thread-only — fine, this
318 // topo sink is `!Send` (D248) and fires on the owner thread.
319 let deferred = core.defer_queue();
320 // D246 rule 8 (S4): reusable coalescing slot. Prune is NOT
321 // idempotent (each torn id must be unsubscribed once), so
322 // accumulate torn ids into a shared owner-thread-only buffer
323 // and post ONE `Box` per drain that processes all of them —
324 // instead of one boxed closure per `NodeTornDown`.
325 // Behaviour-equivalent: the set of (node,sub) pairs unsubscribed
326 // is exactly the union, just batched into one deferred pass.
327 let pending: Rc<RefCell<Vec<NodeId>>> = Rc::new(RefCell::new(Vec::new()));
328 let scheduled = Rc::new(std::cell::Cell::new(false));
329 let topo_sink: Rc<dyn Fn(&TopologyEvent)> = Rc::new(move |event: &TopologyEvent| {
330 if let TopologyEvent::NodeTornDown(id) = event {
331 // INVARIANT (QA, 2026-05-19): push BEFORE the
332 // `scheduled.get()` check so a fire arriving while a
333 // defer is in-flight (after `sched.set(false)`,
334 // before the next batch) still gets captured by the
335 // in-flight drain's `mem::take`. Re-entry from
336 // `cf.unsubscribe` (a future code path adding
337 // teardown-on-last-unsub) would land here; the
338 // closure releases the `pending` borrow via
339 // `mem::take` BEFORE invoking `cf.*` so no
340 // `already-borrowed` panic.
341 pending.borrow_mut().push(*id);
342 if scheduled.get() {
343 return; // already armed for this drain — coalesce.
344 }
345 scheduled.set(true);
346 let inner_for_defer = inner_for_topo.clone();
347 let pending_for_defer = Rc::clone(&pending);
348 let sched = Rc::clone(&scheduled);
349 // No `HandleId` captured — Core-gone (`false`) just
350 // skips the prune; nothing to release (D235 P8).
351 let _ = deferred.post(Box::new(move |cf: &dyn CoreFull| {
352 sched.set(false);
353 let torn: Vec<NodeId> = std::mem::take(&mut *pending_for_defer.borrow_mut());
354 let to_unsub: Vec<(NodeId, SubscriptionId)> = {
355 let mut state = inner_for_defer.borrow_mut();
356 let mut acc = Vec::new();
357 for id in torn {
358 if state.subscribed.remove(&id) {
359 let (keep, drop_): (Vec<_>, Vec<_>) =
360 state.subs.drain(..).partition(|(n, _)| *n != id);
361 state.subs = keep;
362 acc.extend(drop_);
363 }
364 }
365 acc
366 };
367 for (n, s) in to_unsub {
368 cf.unsubscribe(n, s);
369 }
370 }));
371 }
372 });
373 self.topo_sub_id = Some(core.subscribe_topology(topo_sink));
374
375 // Initial snapshot (listener's idempotent walk dedups overlap).
376 let names_to_ids: Vec<(String, NodeId)> = {
377 let graph_inner = self.graph.inner_arc().borrow_mut();
378 graph_inner
379 .names
380 .iter()
381 .map(|(n, id)| (n.clone(), *id))
382 .collect()
383 };
384 let initial_count = names_to_ids.len();
385 let to_subscribe: Vec<(String, NodeId)> = {
386 let mut state = self.inner.borrow_mut();
387 names_to_ids
388 .into_iter()
389 .filter(|(_n, id)| state.subscribed.insert(*id))
390 .collect()
391 };
392 for (name, id) in to_subscribe {
393 let sink_clone = sink_arc.clone();
394 let msg_sink: Sink = Rc::new(move |msgs: &[Message]| {
395 sink_clone(&name, msgs);
396 });
397 let sub_id = core.subscribe(id, msg_sink);
398 self.inner.borrow_mut().subs.push((id, sub_id));
399 }
400
401 initial_count
402 }
403
404 /// Owner-invoked, synchronous teardown (D246 rule 3 — replaces the
405 /// retired RAII `Drop`; eliminates the Blind #4 deadlock class).
406 /// Topology sub first, then namespace sink, then drain the fan-out
407 /// subs into a local `Vec` and release the `inner` lock BEFORE the
408 /// `core.unsubscribe` cascade (`Core::unsubscribe` runs the full
409 /// deactivation chain and can fire sinks synchronously; holding
410 /// `inner` across it would self-deadlock — the pre-β invariant,
411 /// preserved).
412 pub fn detach(&mut self, core: &Core) {
413 if let Some(id) = self.topo_sub_id.take() {
414 core.unsubscribe_topology(id);
415 }
416 if let Some(id) = self.ns_sink_id.take() {
417 crate::graph::unregister_ns_sink(self.graph.inner_arc(), id);
418 }
419 let drained: Vec<(NodeId, SubscriptionId)> = {
420 let mut state = self.inner.borrow_mut();
421 state.subs.drain(..).collect()
422 };
423 for (node_id, sub_id) in drained {
424 core.unsubscribe(node_id, sub_id);
425 }
426 }
427
428 /// The backing graph handle.
429 #[must_use]
430 pub fn graph(&self) -> &Graph {
431 &self.graph
432 }
433}