Skip to main content

graphrefly_graph/
observe.rs

1//! `Graph::observe()` / `Graph::observe_all()` — default sink-style
2//! message tap (canonical spec §3.6.2 default mode).
3//!
4//! `observe_all_reactive()` auto-subscribes late-added nodes via
5//! the Core topology-change notification primitive (Slice F+).
6//!
7//! Async-iterable / reactive (`Node<ObserveChangeset>`) / changeset
8//! variants are deferred (Phase 14).
9
10use std::collections::HashSet;
11use std::sync::{Arc, Weak};
12
13use graphrefly_core::{
14    Core, LockId, Message, NodeId, PauseError, Sink, Subscription, TopologyEvent,
15    TopologySubscription,
16};
17use parking_lot::Mutex;
18
19use crate::graph::{Graph, GraphInner};
20
21/// Single-node observe handle. `subscribe` taps downstream messages
22/// from the observed node (same payload shape as a direct
23/// [`graphrefly_core::Core::subscribe`], including the
24/// `[Start, Data?]` handshake when a cache is present per R1.2.3
25/// / R1.3.5.a).
26///
27/// `up` methods send tier-2 / tier-4 messages upstream
28/// (`PAUSE` / `RESUME` / `INVALIDATE`) — the `up(messages)` API per
29/// canonical §3.6.2 specialized to the supported message kinds.
30/// Tier-3 / tier-5 / tier-6 messages have no upstream-injection
31/// semantics.
32#[must_use = "GraphObserveOne does nothing until you call subscribe()"]
33pub struct GraphObserveOne {
34    graph: Graph,
35    node_id: NodeId,
36}
37
38impl GraphObserveOne {
39    pub(crate) fn new(graph: Graph, node_id: NodeId) -> Self {
40        Self { graph, node_id }
41    }
42
43    /// The observed `NodeId`.
44    #[must_use]
45    pub fn node_id(&self) -> NodeId {
46        self.node_id
47    }
48
49    /// Subscribe a sink. Drop the returned [`Subscription`] to detach.
50    pub fn subscribe(&self, sink: Sink) -> Subscription {
51        self.graph.subscribe(self.node_id, sink)
52    }
53
54    /// Send `[PAUSE, lock]` upstream (per canonical §3.6.2 `up(...)`).
55    pub fn pause(&self, lock: LockId) -> Result<(), PauseError> {
56        self.graph.pause(self.node_id, lock)
57    }
58
59    /// Send `[RESUME, lock]` upstream.
60    pub fn resume(
61        &self,
62        lock: LockId,
63    ) -> Result<Option<graphrefly_core::ResumeReport>, PauseError> {
64        self.graph.resume(self.node_id, lock)
65    }
66
67    /// Send `[INVALIDATE]` upstream.
68    pub fn invalidate(&self) {
69        self.graph.invalidate(self.node_id);
70    }
71}
72
73/// All-nodes observe handle. `subscribe` multiplexes across every
74/// named node in the graph (NOT recursive into mounts — observers
75/// wanting recursion compose with `child.observe_all()` per
76/// subgraph).
77///
78/// The sink receives `(name, &[Message])` tuples; `name` is the
79/// local namespace name. Subscriptions stay tied to the set of
80/// nodes named at `observe_all()` call time — late-added nodes
81/// are NOT auto-subscribed in this slice (lifts with the reactive
82/// observe topology-change primitive in a later slice).
83#[must_use = "GraphObserveAll holds Subscriptions; dropping it unsubscribes all sinks"]
84pub struct GraphObserveAll {
85    graph: Graph,
86    /// One `Subscription` per named node at `observe_all()` call time.
87    /// Held by the handle; dropping the handle unsubscribes every
88    /// fan-out sink.
89    subs: Vec<Subscription>,
90}
91
92impl GraphObserveAll {
93    pub(crate) fn new(graph: Graph) -> Self {
94        Self {
95            graph,
96            subs: Vec::new(),
97        }
98    }
99
100    /// Multi-cast subscribe — registers `sink` against every named
101    /// node at this exact moment. Each underlying subscription is
102    /// kept alive in `self`; drop the handle to unsubscribe all.
103    ///
104    /// Returns the number of nodes the sink was registered against.
105    pub fn subscribe<F>(&mut self, sink: F) -> usize
106    where
107        F: Fn(&str, &[Message]) + Send + Sync + 'static,
108    {
109        let names_to_ids: Vec<(String, NodeId)> = {
110            let inner = self.graph.inner.lock();
111            inner.names.iter().map(|(n, id)| (n.clone(), *id)).collect()
112        };
113        let sink_arc: Arc<F> = Arc::new(sink);
114        let count = names_to_ids.len();
115        for (name, id) in names_to_ids {
116            let sink_clone = sink_arc.clone();
117            let owned_name = name;
118            let inner_sink: Sink = Arc::new(move |msgs: &[Message]| {
119                sink_clone(&owned_name, msgs);
120            });
121            let sub = self.graph.subscribe(id, inner_sink);
122            self.subs.push(sub);
123        }
124        count
125    }
126}
127
128impl Graph {
129    /// Tap a single node's downstream message stream.
130    ///
131    /// # Panics
132    ///
133    /// Panics if `path` doesn't resolve. Use [`Graph::try_resolve`]
134    /// + a manual subscribe if non-panicking is required.
135    pub fn observe(&self, path: &str) -> GraphObserveOne {
136        let id = self.node(path);
137        GraphObserveOne::new(self.clone(), id)
138    }
139
140    /// Tap every named node in this graph.
141    ///
142    /// # Snapshot semantics
143    ///
144    /// The returned handle subscribes against the namespace at
145    /// the moment `subscribe()` is called. Nodes named AFTER that
146    /// call are not auto-subscribed. Use [`Self::observe_all_reactive`]
147    /// for dynamic membership.
148    pub fn observe_all(&self) -> GraphObserveAll {
149        GraphObserveAll::new(self.clone())
150    }
151
152    /// Tap every named node AND auto-subscribe late-added nodes.
153    ///
154    /// Like [`Self::observe_all`] but subscribes to topology changes
155    /// so that nodes registered AFTER the initial `subscribe()` call
156    /// are automatically picked up. Dropping the returned handle
157    /// unsubscribes all fan-out sinks AND the topology listener.
158    pub fn observe_all_reactive(&self) -> GraphObserveAllReactive {
159        GraphObserveAllReactive::new(self.clone())
160    }
161}
162
163// -------------------------------------------------------------------
164// Reactive observe_all — auto-subscribe late-added nodes
165// -------------------------------------------------------------------
166
167/// Shared state for the reactive `observe_all` subscription.
168struct ObserveAllReactiveInner {
169    /// Set of `NodeId`s we've already subscribed to.
170    subscribed: HashSet<NodeId>,
171    /// Live subscriptions — kept alive so dropping them unsubscribes.
172    subs: Vec<Subscription>,
173}
174
175/// Reactive variant of [`GraphObserveAll`] that auto-subscribes
176/// late-added named nodes via Graph namespace-change notifications.
177///
178/// Drop to unsubscribe everything.
179#[must_use = "GraphObserveAllReactive holds Subscriptions; dropping it unsubscribes all sinks"]
180pub struct GraphObserveAllReactive {
181    graph: Graph,
182    /// Namespace-change subscription id. Dropped BEFORE `inner` to
183    /// avoid deadlock: unsubscribe removes the sink (which holds a
184    /// `Weak<Mutex<GraphInner>>`); the closure's captured Arcs are
185    /// not the last refs because `inner` is still owned by `Self`.
186    ns_sink_id: Option<u64>,
187    /// Slice V3: Core topology subscription for pruning torn-down nodes
188    /// from `inner.subscribed` + `inner.subs`. Prevents unbounded
189    /// accumulation on long-running graphs (D2 in porting-deferred.md).
190    topo_sub: Option<TopologySubscription>,
191    inner: Arc<Mutex<ObserveAllReactiveInner>>,
192}
193
194// Send + Sync compile-time assertion.
195const _: fn() = || {
196    fn assert_send_sync<T: Send + Sync>() {}
197    assert_send_sync::<GraphObserveAllReactive>();
198};
199
200impl Drop for GraphObserveAllReactive {
201    fn drop(&mut self) {
202        // Unsubscribe namespace sink BEFORE inner drops, to avoid
203        // the deadlock described in the field comment.
204        if let Some(id) = self.ns_sink_id.take() {
205            self.graph.unsubscribe_namespace_change(id);
206        }
207    }
208}
209
210impl GraphObserveAllReactive {
211    fn new(graph: Graph) -> Self {
212        Self {
213            graph,
214            ns_sink_id: None,
215            topo_sub: None,
216            inner: Arc::new(Mutex::new(ObserveAllReactiveInner {
217                subscribed: HashSet::new(),
218                subs: Vec::new(),
219            })),
220        }
221    }
222
223    /// Subscribe a sink to all current AND future named nodes.
224    ///
225    /// The sink receives `(name, &[Message])` tuples, same as
226    /// [`GraphObserveAll::subscribe`]. Returns the number of nodes
227    /// subscribed at call time (future auto-subscriptions are not
228    /// counted).
229    ///
230    /// # Panics
231    ///
232    /// Panics if called more than once on the same handle. The v1
233    /// contract is "single-shot wiring"; rebuild the handle via
234    /// [`Graph::observe_all_reactive`] to install another sink.
235    pub fn subscribe<F>(&mut self, sink: F) -> usize
236    where
237        F: Fn(&str, &[Message]) + Send + Sync + 'static,
238    {
239        // P5 — subscribe-once contract. A second call would leak the
240        // first namespace sink (we'd overwrite `ns_sink_id` without
241        // unsubscribing the prior). Panic on misuse instead.
242        assert!(
243            self.ns_sink_id.is_none(),
244            "GraphObserveAllReactive::subscribe is single-shot; called twice on the same handle"
245        );
246
247        let sink_arc: Arc<F> = Arc::new(sink);
248
249        // P4 — install the namespace listener BEFORE the initial
250        // snapshot. A node added concurrently between snapshot and
251        // listener-install would otherwise be permanently missed.
252        // The listener's `inner.subscribed.insert(id)` dedups against
253        // the snapshot, so an idempotent overlap is harmless.
254        //
255        // P6 — capture Weak<inner> + Core (clone) instead of the full
256        // Graph clone. This breaks the namespace_sinks → sink → Graph
257        // → namespace_sinks Arc cycle so leaking the handle does not
258        // leak the graph.
259        let weak_graph_inner: Weak<Mutex<GraphInner>> = Arc::downgrade(&self.graph.inner);
260        let core: Core = self.graph.core.clone();
261        let inner_for_ns = self.inner.clone();
262        let sink_for_ns = sink_arc.clone();
263        let ns_sink = Arc::new(move || {
264            let Some(arc_inner) = weak_graph_inner.upgrade() else {
265                // Graph dropped; silent no-op.
266                return;
267            };
268            let graph = Graph {
269                core: core.clone(),
270                inner: arc_inner,
271            };
272            // Scan for any newly-named nodes we haven't subscribed to.
273            let new_nodes: Vec<(String, NodeId)> = {
274                let graph_inner = graph.inner.lock();
275                let state = inner_for_ns.lock();
276                graph_inner
277                    .names
278                    .iter()
279                    .filter(|(_name, id)| !state.subscribed.contains(id))
280                    .map(|(n, id)| (n.clone(), *id))
281                    .collect()
282            };
283            for (name, id) in new_nodes {
284                let should_subscribe = {
285                    let mut state = inner_for_ns.lock();
286                    state.subscribed.insert(id)
287                };
288                if should_subscribe {
289                    let sink_clone = sink_for_ns.clone();
290                    let owned_name = name;
291                    let msg_sink: Sink = Arc::new(move |msgs: &[Message]| {
292                        sink_clone(&owned_name, msgs);
293                    });
294                    let sub = graph.subscribe(id, msg_sink);
295                    inner_for_ns.lock().subs.push(sub);
296                }
297            }
298        });
299        self.ns_sink_id = Some(self.graph.subscribe_namespace_change(ns_sink));
300
301        // Slice V3: subscribe to Core topology events to prune torn-down
302        // nodes from subscribed + subs. Prevents unbounded accumulation
303        // on long-running graphs (D2 in porting-deferred.md).
304        let inner_for_topo = self.inner.clone();
305        let topo_sink: Arc<dyn Fn(&TopologyEvent) + Send + Sync> =
306            Arc::new(move |event: &TopologyEvent| {
307                if let TopologyEvent::NodeTornDown(id) = event {
308                    let mut state = inner_for_topo.lock();
309                    if state.subscribed.remove(id) {
310                        // Drop the Subscription for this node. The
311                        // Subscription's Drop will unsubscribe the sink
312                        // from Core.
313                        state.subs.retain(|sub| sub.node_id() != *id);
314                    }
315                }
316            });
317        self.topo_sub = Some(self.graph.core.subscribe_topology(topo_sink));
318
319        // Now take the initial snapshot. Any node added between
320        // listener-install and snapshot will be picked up by the
321        // listener's idempotent walk — `inner.subscribed.insert(id)`
322        // returns false if we beat the listener to it.
323        let names_to_ids: Vec<(String, NodeId)> = {
324            let graph_inner = self.graph.inner.lock();
325            graph_inner
326                .names
327                .iter()
328                .map(|(n, id)| (n.clone(), *id))
329                .collect()
330        };
331        let initial_count = names_to_ids.len();
332        let to_subscribe: Vec<(String, NodeId)> = {
333            let mut inner = self.inner.lock();
334            names_to_ids
335                .into_iter()
336                .filter(|(_name, id)| inner.subscribed.insert(*id))
337                .collect()
338        };
339        for (name, id) in to_subscribe {
340            let sink_clone = sink_arc.clone();
341            let msg_sink: Sink = Arc::new(move |msgs: &[Message]| {
342                sink_clone(&name, msgs);
343            });
344            let sub = self.graph.subscribe(id, msg_sink);
345            self.inner.lock().subs.push(sub);
346        }
347
348        initial_count
349    }
350}