graphrefly_graph/observe.rs
1//! `Graph::observe()` / `Graph::observe_all()` — default sink-style
2//! message tap (canonical spec §3.6.2 default mode).
3//!
4//! `observe_all_reactive()` auto-subscribes late-added nodes via
5//! the Core topology-change notification primitive (Slice F+).
6//!
7//! Async-iterable / reactive (`Node<ObserveChangeset>`) / changeset
8//! variants are deferred (Phase 14).
9
10use std::collections::HashSet;
11use std::sync::{Arc, Weak};
12
13use graphrefly_core::{
14 Core, LockId, Message, NodeId, PauseError, Sink, Subscription, TopologyEvent,
15 TopologySubscription,
16};
17use parking_lot::Mutex;
18
19use crate::graph::{Graph, GraphInner};
20
21/// Single-node observe handle. `subscribe` taps downstream messages
22/// from the observed node (same payload shape as a direct
23/// [`graphrefly_core::Core::subscribe`], including the
24/// `[Start, Data?]` handshake when a cache is present per R1.2.3
25/// / R1.3.5.a).
26///
27/// `up` methods send tier-2 / tier-4 messages upstream
28/// (`PAUSE` / `RESUME` / `INVALIDATE`) — the `up(messages)` API per
29/// canonical §3.6.2 specialized to the supported message kinds.
30/// Tier-3 / tier-5 / tier-6 messages have no upstream-injection
31/// semantics.
32#[must_use = "GraphObserveOne does nothing until you call subscribe()"]
33pub struct GraphObserveOne {
34 graph: Graph,
35 node_id: NodeId,
36}
37
38impl GraphObserveOne {
39 pub(crate) fn new(graph: Graph, node_id: NodeId) -> Self {
40 Self { graph, node_id }
41 }
42
43 /// The observed `NodeId`.
44 #[must_use]
45 pub fn node_id(&self) -> NodeId {
46 self.node_id
47 }
48
49 /// Subscribe a sink. Drop the returned [`Subscription`] to detach.
50 pub fn subscribe(&self, sink: Sink) -> Subscription {
51 self.graph.subscribe(self.node_id, sink)
52 }
53
54 /// Send `[PAUSE, lock]` upstream (per canonical §3.6.2 `up(...)`).
55 pub fn pause(&self, lock: LockId) -> Result<(), PauseError> {
56 self.graph.pause(self.node_id, lock)
57 }
58
59 /// Send `[RESUME, lock]` upstream.
60 pub fn resume(
61 &self,
62 lock: LockId,
63 ) -> Result<Option<graphrefly_core::ResumeReport>, PauseError> {
64 self.graph.resume(self.node_id, lock)
65 }
66
67 /// Send `[INVALIDATE]` upstream.
68 pub fn invalidate(&self) {
69 self.graph.invalidate(self.node_id);
70 }
71}
72
73/// All-nodes observe handle. `subscribe` multiplexes across every
74/// named node in the graph (NOT recursive into mounts — observers
75/// wanting recursion compose with `child.observe_all()` per
76/// subgraph).
77///
78/// The sink receives `(name, &[Message])` tuples; `name` is the
79/// local namespace name. Subscriptions stay tied to the set of
80/// nodes named at `observe_all()` call time — late-added nodes
81/// are NOT auto-subscribed in this slice (lifts with the reactive
82/// observe topology-change primitive in a later slice).
83#[must_use = "GraphObserveAll holds Subscriptions; dropping it unsubscribes all sinks"]
84pub struct GraphObserveAll {
85 graph: Graph,
86 /// One `Subscription` per named node at `observe_all()` call time.
87 /// Held by the handle; dropping the handle unsubscribes every
88 /// fan-out sink.
89 subs: Vec<Subscription>,
90}
91
92impl GraphObserveAll {
93 pub(crate) fn new(graph: Graph) -> Self {
94 Self {
95 graph,
96 subs: Vec::new(),
97 }
98 }
99
100 /// Multi-cast subscribe — registers `sink` against every named
101 /// node at this exact moment. Each underlying subscription is
102 /// kept alive in `self`; drop the handle to unsubscribe all.
103 ///
104 /// Returns the number of nodes the sink was registered against.
105 pub fn subscribe<F>(&mut self, sink: F) -> usize
106 where
107 F: Fn(&str, &[Message]) + Send + Sync + 'static,
108 {
109 let names_to_ids: Vec<(String, NodeId)> = {
110 let inner = self.graph.inner.lock();
111 inner.names.iter().map(|(n, id)| (n.clone(), *id)).collect()
112 };
113 let sink_arc: Arc<F> = Arc::new(sink);
114 let count = names_to_ids.len();
115 for (name, id) in names_to_ids {
116 let sink_clone = sink_arc.clone();
117 let owned_name = name;
118 let inner_sink: Sink = Arc::new(move |msgs: &[Message]| {
119 sink_clone(&owned_name, msgs);
120 });
121 let sub = self.graph.subscribe(id, inner_sink);
122 self.subs.push(sub);
123 }
124 count
125 }
126}
127
128impl Graph {
129 /// Tap a single node's downstream message stream.
130 ///
131 /// # Panics
132 ///
133 /// Panics if `path` doesn't resolve. Use [`Graph::try_resolve`]
134 /// + a manual subscribe if non-panicking is required.
135 pub fn observe(&self, path: &str) -> GraphObserveOne {
136 let id = self.node(path);
137 GraphObserveOne::new(self.clone(), id)
138 }
139
140 /// Tap every named node in this graph.
141 ///
142 /// # Snapshot semantics
143 ///
144 /// The returned handle subscribes against the namespace at
145 /// the moment `subscribe()` is called. Nodes named AFTER that
146 /// call are not auto-subscribed. Use [`Self::observe_all_reactive`]
147 /// for dynamic membership.
148 pub fn observe_all(&self) -> GraphObserveAll {
149 GraphObserveAll::new(self.clone())
150 }
151
152 /// Tap every named node AND auto-subscribe late-added nodes.
153 ///
154 /// Like [`Self::observe_all`] but subscribes to topology changes
155 /// so that nodes registered AFTER the initial `subscribe()` call
156 /// are automatically picked up. Dropping the returned handle
157 /// unsubscribes all fan-out sinks AND the topology listener.
158 pub fn observe_all_reactive(&self) -> GraphObserveAllReactive {
159 GraphObserveAllReactive::new(self.clone())
160 }
161}
162
163// -------------------------------------------------------------------
164// Reactive observe_all — auto-subscribe late-added nodes
165// -------------------------------------------------------------------
166
167/// Shared state for the reactive `observe_all` subscription.
168struct ObserveAllReactiveInner {
169 /// Set of `NodeId`s we've already subscribed to.
170 subscribed: HashSet<NodeId>,
171 /// Live subscriptions — kept alive so dropping them unsubscribes.
172 subs: Vec<Subscription>,
173}
174
175/// Reactive variant of [`GraphObserveAll`] that auto-subscribes
176/// late-added named nodes via Graph namespace-change notifications.
177///
178/// Drop to unsubscribe everything.
179#[must_use = "GraphObserveAllReactive holds Subscriptions; dropping it unsubscribes all sinks"]
180pub struct GraphObserveAllReactive {
181 graph: Graph,
182 /// Namespace-change subscription id. Dropped BEFORE `inner` to
183 /// avoid deadlock: unsubscribe removes the sink (which holds a
184 /// `Weak<Mutex<GraphInner>>`); the closure's captured Arcs are
185 /// not the last refs because `inner` is still owned by `Self`.
186 ns_sink_id: Option<u64>,
187 /// Slice V3: Core topology subscription for pruning torn-down nodes
188 /// from `inner.subscribed` + `inner.subs`. Prevents unbounded
189 /// accumulation on long-running graphs (D2 in porting-deferred.md).
190 topo_sub: Option<TopologySubscription>,
191 inner: Arc<Mutex<ObserveAllReactiveInner>>,
192}
193
194// Send + Sync compile-time assertion.
195const _: fn() = || {
196 fn assert_send_sync<T: Send + Sync>() {}
197 assert_send_sync::<GraphObserveAllReactive>();
198};
199
200impl Drop for GraphObserveAllReactive {
201 fn drop(&mut self) {
202 // Unsubscribe namespace sink BEFORE inner drops, to avoid
203 // the deadlock described in the field comment.
204 if let Some(id) = self.ns_sink_id.take() {
205 self.graph.unsubscribe_namespace_change(id);
206 }
207 }
208}
209
210impl GraphObserveAllReactive {
211 fn new(graph: Graph) -> Self {
212 Self {
213 graph,
214 ns_sink_id: None,
215 topo_sub: None,
216 inner: Arc::new(Mutex::new(ObserveAllReactiveInner {
217 subscribed: HashSet::new(),
218 subs: Vec::new(),
219 })),
220 }
221 }
222
223 /// Subscribe a sink to all current AND future named nodes.
224 ///
225 /// The sink receives `(name, &[Message])` tuples, same as
226 /// [`GraphObserveAll::subscribe`]. Returns the number of nodes
227 /// subscribed at call time (future auto-subscriptions are not
228 /// counted).
229 ///
230 /// # Panics
231 ///
232 /// Panics if called more than once on the same handle. The v1
233 /// contract is "single-shot wiring"; rebuild the handle via
234 /// [`Graph::observe_all_reactive`] to install another sink.
235 pub fn subscribe<F>(&mut self, sink: F) -> usize
236 where
237 F: Fn(&str, &[Message]) + Send + Sync + 'static,
238 {
239 // P5 — subscribe-once contract. A second call would leak the
240 // first namespace sink (we'd overwrite `ns_sink_id` without
241 // unsubscribing the prior). Panic on misuse instead.
242 assert!(
243 self.ns_sink_id.is_none(),
244 "GraphObserveAllReactive::subscribe is single-shot; called twice on the same handle"
245 );
246
247 let sink_arc: Arc<F> = Arc::new(sink);
248
249 // P4 — install the namespace listener BEFORE the initial
250 // snapshot. A node added concurrently between snapshot and
251 // listener-install would otherwise be permanently missed.
252 // The listener's `inner.subscribed.insert(id)` dedups against
253 // the snapshot, so an idempotent overlap is harmless.
254 //
255 // P6 — capture Weak<inner> + Core (clone) instead of the full
256 // Graph clone. This breaks the namespace_sinks → sink → Graph
257 // → namespace_sinks Arc cycle so leaking the handle does not
258 // leak the graph.
259 let weak_graph_inner: Weak<Mutex<GraphInner>> = Arc::downgrade(&self.graph.inner);
260 let core: Core = self.graph.core.clone();
261 let inner_for_ns = self.inner.clone();
262 let sink_for_ns = sink_arc.clone();
263 let ns_sink = Arc::new(move || {
264 let Some(arc_inner) = weak_graph_inner.upgrade() else {
265 // Graph dropped; silent no-op.
266 return;
267 };
268 let graph = Graph {
269 core: core.clone(),
270 inner: arc_inner,
271 };
272 // Scan for any newly-named nodes we haven't subscribed to.
273 let new_nodes: Vec<(String, NodeId)> = {
274 let graph_inner = graph.inner.lock();
275 let state = inner_for_ns.lock();
276 graph_inner
277 .names
278 .iter()
279 .filter(|(_name, id)| !state.subscribed.contains(id))
280 .map(|(n, id)| (n.clone(), *id))
281 .collect()
282 };
283 for (name, id) in new_nodes {
284 let should_subscribe = {
285 let mut state = inner_for_ns.lock();
286 state.subscribed.insert(id)
287 };
288 if should_subscribe {
289 let sink_clone = sink_for_ns.clone();
290 let owned_name = name;
291 let msg_sink: Sink = Arc::new(move |msgs: &[Message]| {
292 sink_clone(&owned_name, msgs);
293 });
294 let sub = graph.subscribe(id, msg_sink);
295 inner_for_ns.lock().subs.push(sub);
296 }
297 }
298 });
299 self.ns_sink_id = Some(self.graph.subscribe_namespace_change(ns_sink));
300
301 // Slice V3: subscribe to Core topology events to prune torn-down
302 // nodes from subscribed + subs. Prevents unbounded accumulation
303 // on long-running graphs (D2 in porting-deferred.md).
304 let inner_for_topo = self.inner.clone();
305 let topo_sink: Arc<dyn Fn(&TopologyEvent) + Send + Sync> =
306 Arc::new(move |event: &TopologyEvent| {
307 if let TopologyEvent::NodeTornDown(id) = event {
308 let mut state = inner_for_topo.lock();
309 if state.subscribed.remove(id) {
310 // Drop the Subscription for this node. The
311 // Subscription's Drop will unsubscribe the sink
312 // from Core.
313 state.subs.retain(|sub| sub.node_id() != *id);
314 }
315 }
316 });
317 self.topo_sub = Some(self.graph.core.subscribe_topology(topo_sink));
318
319 // Now take the initial snapshot. Any node added between
320 // listener-install and snapshot will be picked up by the
321 // listener's idempotent walk — `inner.subscribed.insert(id)`
322 // returns false if we beat the listener to it.
323 let names_to_ids: Vec<(String, NodeId)> = {
324 let graph_inner = self.graph.inner.lock();
325 graph_inner
326 .names
327 .iter()
328 .map(|(n, id)| (n.clone(), *id))
329 .collect()
330 };
331 let initial_count = names_to_ids.len();
332 let to_subscribe: Vec<(String, NodeId)> = {
333 let mut inner = self.inner.lock();
334 names_to_ids
335 .into_iter()
336 .filter(|(_name, id)| inner.subscribed.insert(*id))
337 .collect()
338 };
339 for (name, id) in to_subscribe {
340 let sink_clone = sink_arc.clone();
341 let msg_sink: Sink = Arc::new(move |msgs: &[Message]| {
342 sink_clone(&name, msgs);
343 });
344 let sub = self.graph.subscribe(id, msg_sink);
345 self.inner.lock().subs.push(sub);
346 }
347
348 initial_count
349 }
350}