graphrefly_graph/observe.rs
1//! `Graph::observe()` / `observe_all()` / `observe_all_reactive()` —
2//! default sink-style message tap (canonical §3.6.2 default mode).
3//!
//! D246: observe handles carry a Core-free [`Graph`] (a cheap `Rc`
//! clone) + the embedder's `&Core` passed explicitly per call. There is
6//! **no RAII `Drop`** below the binding (D246 rule 3 — this eliminates
7//! the Blind #4 lock-across-unsubscribe-in-`Drop` deadlock class
8//! entirely): `subscribe` returns ids; teardown is the owner-invoked
9//! [`detach`](GraphObserveAllReactive::detach) (synchronous,
10//! `&Core`-explicit). The embedder's [`graphrefly_core::OwnedCore`] is
11//! the one RAII boundary.
12//!
13//! The reactive `observe_all` ns-listener fires owner-side with `&Core`
14//! (D246 rule 2). The Core-topology prune of torn-down nodes fires
15//! *inside* a wave, so its `unsubscribe` is routed through
16//! `MailboxOp::Defer` (D246 rule 6 — sink-in-wave defers).
17
18use std::cell::RefCell;
19use std::collections::HashSet;
20use std::rc::{Rc, Weak};
21use std::sync::Arc;
22
23use graphrefly_core::{
24 Core, CoreFull, LockId, Message, NodeId, PauseError, ResumeReport, Sink, SubscriptionId,
25 TopologyEvent, TopologySubscriptionId,
26};
27
28use crate::graph::{register_ns_sink, Graph, GraphInner, NamespaceChangeSink};
29
30/// Id pair for a single observe subscription. D246: a plain value (no
31/// `Drop`); detach explicitly via [`Self::detach`] or let the
32/// embedder's `OwnedCore` tear down on owner-thread drop.
33#[derive(Debug, Clone, Copy)]
34pub struct ObserveSub {
35 node_id: NodeId,
36 sub_id: SubscriptionId,
37}
38
39impl ObserveSub {
40 /// The observed node.
41 #[must_use]
42 pub fn node_id(&self) -> NodeId {
43 self.node_id
44 }
45
46 /// The subscription id.
47 #[must_use]
48 pub fn sub_id(&self) -> SubscriptionId {
49 self.sub_id
50 }
51
52 /// Owner-invoked, synchronous detach (D246 rule 3).
53 pub fn detach(&self, core: &Core) {
54 core.unsubscribe(self.node_id, self.sub_id);
55 }
56}
57
58/// Single-node observe handle (canonical §3.6.2). D246: holds a
59/// Core-free [`Graph`]; `&Core` is passed per call.
60#[must_use = "GraphObserveOne does nothing until you call subscribe()"]
61pub struct GraphObserveOne {
62 graph: Graph,
63 node_id: NodeId,
64}
65
66impl GraphObserveOne {
67 pub(crate) fn new(graph: Graph, node_id: NodeId) -> Self {
68 Self { graph, node_id }
69 }
70
71 /// The observed `NodeId`.
72 #[must_use]
73 pub fn node_id(&self) -> NodeId {
74 self.node_id
75 }
76
77 /// Subscribe a sink. Returns an [`ObserveSub`] id pair — detach
78 /// owner-invoked (D246 rule 3).
79 pub fn subscribe(&self, core: &Core, sink: Sink) -> ObserveSub {
80 let sub_id = core.subscribe(self.node_id, sink);
81 ObserveSub {
82 node_id: self.node_id,
83 sub_id,
84 }
85 }
86
87 /// Send `[PAUSE, lock]` upstream.
88 ///
89 /// # Errors
90 /// See [`PauseError`].
91 pub fn pause(&self, core: &Core, lock: LockId) -> Result<(), PauseError> {
92 core.pause(self.node_id, lock)
93 }
94
95 /// Send `[RESUME, lock]` upstream.
96 ///
97 /// # Errors
98 /// See [`PauseError`].
99 pub fn resume(&self, core: &Core, lock: LockId) -> Result<Option<ResumeReport>, PauseError> {
100 core.resume(self.node_id, lock)
101 }
102
103 /// Send `[INVALIDATE]` upstream.
104 pub fn invalidate(&self, core: &Core) {
105 core.invalidate(self.node_id);
106 }
107
108 /// The backing graph handle.
109 #[must_use]
110 pub fn graph(&self) -> &Graph {
111 &self.graph
112 }
113}
114
115/// All-nodes observe handle. Subscriptions are tied to the set of
116/// nodes named at `subscribe()` call time. D246: no RAII `Drop` —
117/// You MUST call [`Self::detach`]`(core)` (owner-invoked) — these Core
118/// subscriptions are opened via raw `core.subscribe` and are NOT
119/// `OwnedCore`-tracked, so dropping the handle without `detach` leaks
120/// them for the `Core` lifetime.
121#[must_use = "GraphObserveAll holds Core subscriptions NOT tracked by OwnedCore; you MUST call detach(core) or they leak"]
122pub struct GraphObserveAll {
123 graph: Graph,
124 subs: Vec<(NodeId, SubscriptionId)>,
125}
126
127impl GraphObserveAll {
128 pub(crate) fn new(graph: Graph) -> Self {
129 Self {
130 graph,
131 subs: Vec::new(),
132 }
133 }
134
135 /// Multi-cast subscribe against every named node at this moment.
136 /// Returns the node count.
137 pub fn subscribe<F>(&mut self, core: &Core, sink: F) -> usize
138 where
139 F: Fn(&str, &[Message]) + 'static,
140 {
141 let names_to_ids: Vec<(String, NodeId)> = {
142 let inner = self.graph.inner_arc().borrow_mut();
143 inner.names.iter().map(|(n, id)| (n.clone(), *id)).collect()
144 };
145 let sink_arc: Arc<F> = Arc::new(sink);
146 let count = names_to_ids.len();
147 for (name, id) in names_to_ids {
148 let sink_clone = sink_arc.clone();
149 let owned_name = name;
150 let inner_sink: Sink = Arc::new(move |msgs: &[Message]| {
151 sink_clone(&owned_name, msgs);
152 });
153 let sub_id = core.subscribe(id, inner_sink);
154 self.subs.push((id, sub_id));
155 }
156 count
157 }
158
159 /// Owner-invoked, synchronous detach of every fan-out sink
160 /// (D246 rule 3 — no `Drop`, so no Blind #4 deadlock class).
161 pub fn detach(&mut self, core: &Core) {
162 for (node_id, sub_id) in self.subs.drain(..) {
163 core.unsubscribe(node_id, sub_id);
164 }
165 }
166}
167
168// -------------------------------------------------------------------
169// Reactive observe_all — auto-subscribe late-added nodes
170// -------------------------------------------------------------------
171
172struct ObserveAllReactiveInner {
173 /// Set of `NodeId`s we've already subscribed to.
174 subscribed: HashSet<NodeId>,
175 /// Live `(node, sub)` pairs — unsubscribed on detach / prune.
176 subs: Vec<(NodeId, SubscriptionId)>,
177}
178
179/// Reactive `observe_all` — auto-subscribes late-added named nodes via
180/// the owner-side namespace-change listener, and prunes torn-down
181/// nodes via the Core topology sub (the prune `unsubscribe` is
182/// `MailboxOp::Defer`'d since `NodeTornDown` fires in-wave — D246 r6).
183/// D246 rule 3: no RAII `Drop`; teardown is the owner-invoked
184/// [`Self::detach`]`(core)` — owner-invoked, REQUIRED. The ns-sink is
185/// collected by `graph.destroy(core)`; the Core topology sub + fan-out
186/// subs are opened via raw `core.subscribe*` and are NOT
187/// `OwnedCore`-tracked, so `detach(core)` is the only thing that
188/// collects them (dropping the handle without it leaks them).
189#[must_use = "GraphObserveAllReactive holds a Core topology sub + fan-out subs NOT tracked by OwnedCore; you MUST call detach(core) or they leak"]
190pub struct GraphObserveAllReactive {
191 graph: Graph,
192 ns_sink_id: Option<u64>,
193 topo_sub_id: Option<TopologySubscriptionId>,
194 inner: Rc<RefCell<ObserveAllReactiveInner>>,
195}
196
197impl GraphObserveAllReactive {
198 pub(crate) fn new(graph: Graph) -> Self {
199 Self {
200 graph,
201 ns_sink_id: None,
202 topo_sub_id: None,
203 inner: Rc::new(RefCell::new(ObserveAllReactiveInner {
204 subscribed: HashSet::new(),
205 subs: Vec::new(),
206 })),
207 }
208 }
209
210 /// Subscribe a sink to all current AND future named nodes.
211 ///
212 /// # Panics
213 ///
214 /// Panics if called more than once on the same handle (single-shot
215 /// wiring; rebuild via `observe_all_reactive`).
216 //
217 // Single load-bearing subscribe path: wires initial-snapshot taps for
218 // every named node PLUS a namespace-change sink that wires late
219 // additions. Splitting mechanically would interleave shared state
220 // (sink Arc, weak inner, ns_sink_id) across helpers without clarifying
221 // intent — keep cohesive.
222 #[allow(clippy::too_many_lines)]
223 pub fn subscribe<F>(&mut self, core: &Core, sink: F) -> usize
224 where
225 F: Fn(&str, &[Message]) + 'static,
226 {
227 assert!(
228 self.ns_sink_id.is_none(),
229 "GraphObserveAllReactive::subscribe is single-shot; called twice on the same handle"
230 );
231
232 let sink_arc: Arc<F> = Arc::new(sink);
233
234 // P4: install the namespace listener BEFORE the initial
235 // snapshot (the listener's `subscribed.insert` dedups).
236 // D246 rule 2: the listener receives the owner's `&Core` at
237 // fire-time (no stored/cloned Core); it subscribes new nodes
238 // synchronously (fire_namespace_change is owner-side, not
239 // in-wave).
240 let weak_graph_inner: Weak<RefCell<GraphInner>> = Rc::downgrade(self.graph.inner_arc());
241 let inner_for_ns = self.inner.clone();
242 let sink_for_ns = sink_arc.clone();
243 let ns_sink: NamespaceChangeSink = Arc::new(move |core: &Core| {
244 let Some(arc_inner) = weak_graph_inner.upgrade() else {
245 return;
246 };
247 let new_nodes: Vec<(String, NodeId)> = {
248 let graph_inner = arc_inner.borrow_mut();
249 let state = inner_for_ns.borrow_mut();
250 graph_inner
251 .names
252 .iter()
253 .filter(|(_n, id)| !state.subscribed.contains(id))
254 .map(|(n, id)| (n.clone(), *id))
255 .collect()
256 };
257 for (name, id) in new_nodes {
258 let should = {
259 let mut state = inner_for_ns.borrow_mut();
260 state.subscribed.insert(id)
261 };
262 if should {
263 let sink_clone = sink_for_ns.clone();
264 let owned_name = name;
265 let msg_sink: Sink = Arc::new(move |msgs: &[Message]| {
266 sink_clone(&owned_name, msgs);
267 });
268 let sub_id = core.subscribe(id, msg_sink);
269 inner_for_ns.borrow_mut().subs.push((id, sub_id));
270 }
271 }
272 });
273 self.ns_sink_id = Some(register_ns_sink(self.graph.inner_arc(), ns_sink));
274
275 // Slice V3 D2: prune torn-down nodes. `NodeTornDown` fires
276 // in-wave → the `unsubscribe` is `MailboxOp::Defer`'d (D246 r6:
277 // no synchronous sink-side Core mutation).
278 let inner_for_topo = self.inner.clone();
279 // D249/S2c: post to the owner-side `!Send` `DeferQueue` (the
280 // closure captures the `Rc<RefCell<ObserveAllReactiveInner>>`,
281 // `!Send`); `Rc<DeferQueue>` is owner-thread-only — fine, this
282 // topo sink is `!Send` (D248) and fires on the owner thread.
283 let deferred = core.defer_queue();
284 // D246 rule 8 (S4): reusable coalescing slot. Prune is NOT
285 // idempotent (each torn id must be unsubscribed once), so
286 // accumulate torn ids into a shared owner-thread-only buffer
287 // and post ONE `Box` per drain that processes all of them —
288 // instead of one boxed closure per `NodeTornDown`.
289 // Behaviour-equivalent: the set of (node,sub) pairs unsubscribed
290 // is exactly the union, just batched into one deferred pass.
291 let pending: Rc<RefCell<Vec<NodeId>>> = Rc::new(RefCell::new(Vec::new()));
292 let scheduled = Rc::new(std::cell::Cell::new(false));
293 let topo_sink: Arc<dyn Fn(&TopologyEvent)> = Arc::new(move |event: &TopologyEvent| {
294 if let TopologyEvent::NodeTornDown(id) = event {
295 // INVARIANT (QA, 2026-05-19): push BEFORE the
296 // `scheduled.get()` check so a fire arriving while a
297 // defer is in-flight (after `sched.set(false)`,
298 // before the next batch) still gets captured by the
299 // in-flight drain's `mem::take`. Re-entry from
300 // `cf.unsubscribe` (a future code path adding
301 // teardown-on-last-unsub) would land here; the
302 // closure releases the `pending` borrow via
303 // `mem::take` BEFORE invoking `cf.*` so no
304 // `already-borrowed` panic.
305 pending.borrow_mut().push(*id);
306 if scheduled.get() {
307 return; // already armed for this drain — coalesce.
308 }
309 scheduled.set(true);
310 let inner_for_defer = inner_for_topo.clone();
311 let pending_for_defer = Rc::clone(&pending);
312 let sched = Rc::clone(&scheduled);
313 // No `HandleId` captured — Core-gone (`false`) just
314 // skips the prune; nothing to release (D235 P8).
315 let _ = deferred.post(Box::new(move |cf: &dyn CoreFull| {
316 sched.set(false);
317 let torn: Vec<NodeId> = std::mem::take(&mut *pending_for_defer.borrow_mut());
318 let to_unsub: Vec<(NodeId, SubscriptionId)> = {
319 let mut state = inner_for_defer.borrow_mut();
320 let mut acc = Vec::new();
321 for id in torn {
322 if state.subscribed.remove(&id) {
323 let (keep, drop_): (Vec<_>, Vec<_>) =
324 state.subs.drain(..).partition(|(n, _)| *n != id);
325 state.subs = keep;
326 acc.extend(drop_);
327 }
328 }
329 acc
330 };
331 for (n, s) in to_unsub {
332 cf.unsubscribe(n, s);
333 }
334 }));
335 }
336 });
337 self.topo_sub_id = Some(core.subscribe_topology(topo_sink));
338
339 // Initial snapshot (listener's idempotent walk dedups overlap).
340 let names_to_ids: Vec<(String, NodeId)> = {
341 let graph_inner = self.graph.inner_arc().borrow_mut();
342 graph_inner
343 .names
344 .iter()
345 .map(|(n, id)| (n.clone(), *id))
346 .collect()
347 };
348 let initial_count = names_to_ids.len();
349 let to_subscribe: Vec<(String, NodeId)> = {
350 let mut state = self.inner.borrow_mut();
351 names_to_ids
352 .into_iter()
353 .filter(|(_n, id)| state.subscribed.insert(*id))
354 .collect()
355 };
356 for (name, id) in to_subscribe {
357 let sink_clone = sink_arc.clone();
358 let msg_sink: Sink = Arc::new(move |msgs: &[Message]| {
359 sink_clone(&name, msgs);
360 });
361 let sub_id = core.subscribe(id, msg_sink);
362 self.inner.borrow_mut().subs.push((id, sub_id));
363 }
364
365 initial_count
366 }
367
368 /// Owner-invoked, synchronous teardown (D246 rule 3 — replaces the
369 /// retired RAII `Drop`; eliminates the Blind #4 deadlock class).
370 /// Topology sub first, then namespace sink, then drain the fan-out
371 /// subs into a local `Vec` and release the `inner` lock BEFORE the
372 /// `core.unsubscribe` cascade (`Core::unsubscribe` runs the full
373 /// deactivation chain and can fire sinks synchronously; holding
374 /// `inner` across it would self-deadlock — the pre-β invariant,
375 /// preserved).
376 pub fn detach(&mut self, core: &Core) {
377 if let Some(id) = self.topo_sub_id.take() {
378 core.unsubscribe_topology(id);
379 }
380 if let Some(id) = self.ns_sink_id.take() {
381 crate::graph::unregister_ns_sink(self.graph.inner_arc(), id);
382 }
383 let drained: Vec<(NodeId, SubscriptionId)> = {
384 let mut state = self.inner.borrow_mut();
385 state.subs.drain(..).collect()
386 };
387 for (node_id, sub_id) in drained {
388 core.unsubscribe(node_id, sub_id);
389 }
390 }
391
392 /// The backing graph handle.
393 #[must_use]
394 pub fn graph(&self) -> &Graph {
395 &self.graph
396 }
397}