graphrefly_graph/observe.rs
1//! `Graph::observe()` / `Graph::observe_all()` — default sink-style
2//! message tap (canonical spec §3.6.2 default mode).
3//!
4//! `observe_all_reactive()` auto-subscribes late-added nodes via
5//! the Core topology-change notification primitive (Slice F+).
6//!
7//! Async-iterable / reactive (`Node<ObserveChangeset>`) / changeset
8//! variants are deferred (Phase 14).
9
10use std::collections::HashSet;
11use std::sync::{Arc, Weak};
12
13use graphrefly_core::{Core, LockId, Message, NodeId, PauseError, Sink, Subscription};
14use parking_lot::Mutex;
15
16use crate::graph::{Graph, GraphInner};
17
/// Single-node observe handle. `subscribe` taps downstream messages
/// from the observed node (same payload shape as a direct
/// [`graphrefly_core::Core::subscribe`], including the
/// `[Start, Data?]` handshake when a cache is present per R1.2.3
/// / R1.3.5.a).
///
/// `up` methods send tier-2 / tier-4 messages upstream
/// (`PAUSE` / `RESUME` / `INVALIDATE`) — the `up(messages)` API per
/// canonical §3.6.2 specialized to the supported message kinds.
/// Tier-3 / tier-5 / tier-6 messages have no upstream-injection
/// semantics.
///
/// Every method on the handle forwards to the stored [`Graph`] using
/// the stored [`NodeId`]; the handle itself keeps no subscriptions —
/// `subscribe` hands the [`Subscription`] back to the caller.
#[must_use = "GraphObserveOne does nothing until you call subscribe()"]
pub struct GraphObserveOne {
    /// Graph handle that every call is forwarded to.
    graph: Graph,
    /// Id of the node this handle observes.
    node_id: NodeId,
}
34
35impl GraphObserveOne {
36 pub(crate) fn new(graph: Graph, node_id: NodeId) -> Self {
37 Self { graph, node_id }
38 }
39
40 /// The observed `NodeId`.
41 #[must_use]
42 pub fn node_id(&self) -> NodeId {
43 self.node_id
44 }
45
46 /// Subscribe a sink. Drop the returned [`Subscription`] to detach.
47 pub fn subscribe(&self, sink: Sink) -> Subscription {
48 self.graph.subscribe(self.node_id, sink)
49 }
50
51 /// Send `[PAUSE, lock]` upstream (per canonical §3.6.2 `up(...)`).
52 pub fn pause(&self, lock: LockId) -> Result<(), PauseError> {
53 self.graph.pause(self.node_id, lock)
54 }
55
56 /// Send `[RESUME, lock]` upstream.
57 pub fn resume(
58 &self,
59 lock: LockId,
60 ) -> Result<Option<graphrefly_core::ResumeReport>, PauseError> {
61 self.graph.resume(self.node_id, lock)
62 }
63
64 /// Send `[INVALIDATE]` upstream.
65 pub fn invalidate(&self) {
66 self.graph.invalidate(self.node_id);
67 }
68}
69
/// All-nodes observe handle. `subscribe` multiplexes across every
/// named node in the graph (NOT recursive into mounts — observers
/// wanting recursion compose with `child.observe_all()` per
/// subgraph).
///
/// The sink receives `(name, &[Message])` tuples; `name` is the
/// local namespace name. Subscriptions stay tied to the set of
/// nodes named at the moment `subscribe()` is called (creating the
/// handle via `observe_all()` subscribes nothing). Nodes named
/// after that call are NOT auto-subscribed by this handle; use
/// [`GraphObserveAllReactive`] when late-added nodes must be
/// picked up.
#[must_use = "GraphObserveAll holds Subscriptions; dropping it unsubscribes all sinks"]
pub struct GraphObserveAll {
    /// Graph whose namespace is snapshotted by `subscribe()`.
    graph: Graph,
    /// One `Subscription` per named node for each `subscribe()` call
    /// made on this handle. Held by the handle; dropping the handle
    /// unsubscribes every fan-out sink.
    subs: Vec<Subscription>,
}
88
89impl GraphObserveAll {
90 pub(crate) fn new(graph: Graph) -> Self {
91 Self {
92 graph,
93 subs: Vec::new(),
94 }
95 }
96
97 /// Multi-cast subscribe — registers `sink` against every named
98 /// node at this exact moment. Each underlying subscription is
99 /// kept alive in `self`; drop the handle to unsubscribe all.
100 ///
101 /// Returns the number of nodes the sink was registered against.
102 pub fn subscribe<F>(&mut self, sink: F) -> usize
103 where
104 F: Fn(&str, &[Message]) + Send + Sync + 'static,
105 {
106 let names_to_ids: Vec<(String, NodeId)> = {
107 let inner = self.graph.inner.lock();
108 inner.names.iter().map(|(n, id)| (n.clone(), *id)).collect()
109 };
110 let sink_arc: Arc<F> = Arc::new(sink);
111 let count = names_to_ids.len();
112 for (name, id) in names_to_ids {
113 let sink_clone = sink_arc.clone();
114 let owned_name = name;
115 let inner_sink: Sink = Arc::new(move |msgs: &[Message]| {
116 sink_clone(&owned_name, msgs);
117 });
118 let sub = self.graph.subscribe(id, inner_sink);
119 self.subs.push(sub);
120 }
121 count
122 }
123}
124
125impl Graph {
126 /// Tap a single node's downstream message stream.
127 ///
128 /// # Panics
129 ///
130 /// Panics if `path` doesn't resolve. Use [`Graph::try_resolve`]
131 /// + a manual subscribe if non-panicking is required.
132 pub fn observe(&self, path: &str) -> GraphObserveOne {
133 let id = self.node(path);
134 GraphObserveOne::new(self.clone(), id)
135 }
136
137 /// Tap every named node in this graph.
138 ///
139 /// # Snapshot semantics
140 ///
141 /// The returned handle subscribes against the namespace at
142 /// the moment `subscribe()` is called. Nodes named AFTER that
143 /// call are not auto-subscribed. Use [`Self::observe_all_reactive`]
144 /// for dynamic membership.
145 pub fn observe_all(&self) -> GraphObserveAll {
146 GraphObserveAll::new(self.clone())
147 }
148
149 /// Tap every named node AND auto-subscribe late-added nodes.
150 ///
151 /// Like [`Self::observe_all`] but subscribes to topology changes
152 /// so that nodes registered AFTER the initial `subscribe()` call
153 /// are automatically picked up. Dropping the returned handle
154 /// unsubscribes all fan-out sinks AND the topology listener.
155 pub fn observe_all_reactive(&self) -> GraphObserveAllReactive {
156 GraphObserveAllReactive::new(self.clone())
157 }
158}
159
160// -------------------------------------------------------------------
161// Reactive observe_all — auto-subscribe late-added nodes
162// -------------------------------------------------------------------
163
/// Shared state for the reactive `observe_all` subscription.
///
/// Lives behind an `Arc<Mutex<_>>` that is held both by the
/// `GraphObserveAllReactive` handle and by the namespace-change
/// listener closure it installs.
struct ObserveAllReactiveInner {
    /// Set of `NodeId`s we've already subscribed to. An id is
    /// inserted BEFORE its subscription is created, so a successful
    /// insert is the claim that keeps the listener and the initial
    /// snapshot from subscribing the same node twice.
    subscribed: HashSet<NodeId>,
    /// Live subscriptions — kept alive so dropping them unsubscribes.
    subs: Vec<Subscription>,
}
171
/// Reactive variant of [`GraphObserveAll`] that auto-subscribes
/// late-added named nodes via Graph namespace-change notifications.
///
/// Drop to unsubscribe everything.
#[must_use = "GraphObserveAllReactive holds Subscriptions; dropping it unsubscribes all sinks"]
pub struct GraphObserveAllReactive {
    /// Graph being observed; also used to install and remove the
    /// namespace-change listener.
    graph: Graph,
    /// Namespace-change subscription id; `None` until `subscribe()`
    /// installs the listener. The `Drop` impl unsubscribes it BEFORE
    /// the `inner` field is dropped, to avoid deadlock: unsubscribe
    /// removes the sink (which holds a `Weak<Mutex<GraphInner>>`);
    /// the closure's captured Arcs are not the last refs because
    /// `inner` is still owned by `Self` at that point.
    ns_sink_id: Option<u64>,
    /// Dedup set and live subscriptions, shared with the
    /// namespace-change listener closure.
    inner: Arc<Mutex<ObserveAllReactiveInner>>,
}
186
187// Send + Sync compile-time assertion.
188const _: fn() = || {
189 fn assert_send_sync<T: Send + Sync>() {}
190 assert_send_sync::<GraphObserveAllReactive>();
191};
192
193impl Drop for GraphObserveAllReactive {
194 fn drop(&mut self) {
195 // Unsubscribe namespace sink BEFORE inner drops, to avoid
196 // the deadlock described in the field comment.
197 if let Some(id) = self.ns_sink_id.take() {
198 self.graph.unsubscribe_namespace_change(id);
199 }
200 }
201}
202
203impl GraphObserveAllReactive {
204 fn new(graph: Graph) -> Self {
205 Self {
206 graph,
207 ns_sink_id: None,
208 inner: Arc::new(Mutex::new(ObserveAllReactiveInner {
209 subscribed: HashSet::new(),
210 subs: Vec::new(),
211 })),
212 }
213 }
214
215 /// Subscribe a sink to all current AND future named nodes.
216 ///
217 /// The sink receives `(name, &[Message])` tuples, same as
218 /// [`GraphObserveAll::subscribe`]. Returns the number of nodes
219 /// subscribed at call time (future auto-subscriptions are not
220 /// counted).
221 ///
222 /// # Panics
223 ///
224 /// Panics if called more than once on the same handle. The v1
225 /// contract is "single-shot wiring"; rebuild the handle via
226 /// [`Graph::observe_all_reactive`] to install another sink.
227 pub fn subscribe<F>(&mut self, sink: F) -> usize
228 where
229 F: Fn(&str, &[Message]) + Send + Sync + 'static,
230 {
231 // P5 — subscribe-once contract. A second call would leak the
232 // first namespace sink (we'd overwrite `ns_sink_id` without
233 // unsubscribing the prior). Panic on misuse instead.
234 assert!(
235 self.ns_sink_id.is_none(),
236 "GraphObserveAllReactive::subscribe is single-shot; called twice on the same handle"
237 );
238
239 let sink_arc: Arc<F> = Arc::new(sink);
240
241 // P4 — install the namespace listener BEFORE the initial
242 // snapshot. A node added concurrently between snapshot and
243 // listener-install would otherwise be permanently missed.
244 // The listener's `inner.subscribed.insert(id)` dedups against
245 // the snapshot, so an idempotent overlap is harmless.
246 //
247 // P6 — capture Weak<inner> + Core (clone) instead of the full
248 // Graph clone. This breaks the namespace_sinks → sink → Graph
249 // → namespace_sinks Arc cycle so leaking the handle does not
250 // leak the graph.
251 let weak_graph_inner: Weak<Mutex<GraphInner>> = Arc::downgrade(&self.graph.inner);
252 let core: Core = self.graph.core.clone();
253 let inner_for_ns = self.inner.clone();
254 let sink_for_ns = sink_arc.clone();
255 let ns_sink = Arc::new(move || {
256 let Some(arc_inner) = weak_graph_inner.upgrade() else {
257 // Graph dropped; silent no-op.
258 return;
259 };
260 let graph = Graph {
261 core: core.clone(),
262 inner: arc_inner,
263 };
264 // Scan for any newly-named nodes we haven't subscribed to.
265 let new_nodes: Vec<(String, NodeId)> = {
266 let graph_inner = graph.inner.lock();
267 let state = inner_for_ns.lock();
268 graph_inner
269 .names
270 .iter()
271 .filter(|(_name, id)| !state.subscribed.contains(id))
272 .map(|(n, id)| (n.clone(), *id))
273 .collect()
274 };
275 for (name, id) in new_nodes {
276 let should_subscribe = {
277 let mut state = inner_for_ns.lock();
278 state.subscribed.insert(id)
279 };
280 if should_subscribe {
281 let sink_clone = sink_for_ns.clone();
282 let owned_name = name;
283 let msg_sink: Sink = Arc::new(move |msgs: &[Message]| {
284 sink_clone(&owned_name, msgs);
285 });
286 let sub = graph.subscribe(id, msg_sink);
287 inner_for_ns.lock().subs.push(sub);
288 }
289 }
290 });
291 self.ns_sink_id = Some(self.graph.subscribe_namespace_change(ns_sink));
292
293 // Now take the initial snapshot. Any node added between
294 // listener-install and snapshot will be picked up by the
295 // listener's idempotent walk — `inner.subscribed.insert(id)`
296 // returns false if we beat the listener to it.
297 let names_to_ids: Vec<(String, NodeId)> = {
298 let graph_inner = self.graph.inner.lock();
299 graph_inner
300 .names
301 .iter()
302 .map(|(n, id)| (n.clone(), *id))
303 .collect()
304 };
305 let initial_count = names_to_ids.len();
306 let to_subscribe: Vec<(String, NodeId)> = {
307 let mut inner = self.inner.lock();
308 names_to_ids
309 .into_iter()
310 .filter(|(_name, id)| inner.subscribed.insert(*id))
311 .collect()
312 };
313 for (name, id) in to_subscribe {
314 let sink_clone = sink_arc.clone();
315 let msg_sink: Sink = Arc::new(move |msgs: &[Message]| {
316 sink_clone(&name, msgs);
317 });
318 let sub = self.graph.subscribe(id, msg_sink);
319 self.inner.lock().subs.push(sub);
320 }
321
322 initial_count
323 }
324}