Skip to main content

graphrefly_graph/
observe.rs

1//! `Graph::observe()` / `Graph::observe_all()` — default sink-style
2//! message tap (canonical spec §3.6.2 default mode).
3//!
4//! `observe_all_reactive()` auto-subscribes late-added nodes via
5//! the Core topology-change notification primitive (Slice F+).
6//!
7//! Async-iterable / reactive (`Node<ObserveChangeset>`) / changeset
8//! variants are deferred (Phase 14).
9
10use std::collections::HashSet;
11use std::sync::{Arc, Weak};
12
13use graphrefly_core::{Core, LockId, Message, NodeId, PauseError, Sink, Subscription};
14use parking_lot::Mutex;
15
16use crate::graph::{Graph, GraphInner};
17
18/// Single-node observe handle. `subscribe` taps downstream messages
19/// from the observed node (same payload shape as a direct
20/// [`graphrefly_core::Core::subscribe`], including the
21/// `[Start, Data?]` handshake when a cache is present per R1.2.3
22/// / R1.3.5.a).
23///
24/// `up` methods send tier-2 / tier-4 messages upstream
25/// (`PAUSE` / `RESUME` / `INVALIDATE`) — the `up(messages)` API per
26/// canonical §3.6.2 specialized to the supported message kinds.
27/// Tier-3 / tier-5 / tier-6 messages have no upstream-injection
28/// semantics.
29#[must_use = "GraphObserveOne does nothing until you call subscribe()"]
30pub struct GraphObserveOne {
31    graph: Graph,
32    node_id: NodeId,
33}
34
35impl GraphObserveOne {
36    pub(crate) fn new(graph: Graph, node_id: NodeId) -> Self {
37        Self { graph, node_id }
38    }
39
40    /// The observed `NodeId`.
41    #[must_use]
42    pub fn node_id(&self) -> NodeId {
43        self.node_id
44    }
45
46    /// Subscribe a sink. Drop the returned [`Subscription`] to detach.
47    pub fn subscribe(&self, sink: Sink) -> Subscription {
48        self.graph.subscribe(self.node_id, sink)
49    }
50
51    /// Send `[PAUSE, lock]` upstream (per canonical §3.6.2 `up(...)`).
52    pub fn pause(&self, lock: LockId) -> Result<(), PauseError> {
53        self.graph.pause(self.node_id, lock)
54    }
55
56    /// Send `[RESUME, lock]` upstream.
57    pub fn resume(
58        &self,
59        lock: LockId,
60    ) -> Result<Option<graphrefly_core::ResumeReport>, PauseError> {
61        self.graph.resume(self.node_id, lock)
62    }
63
64    /// Send `[INVALIDATE]` upstream.
65    pub fn invalidate(&self) {
66        self.graph.invalidate(self.node_id);
67    }
68}
69
70/// All-nodes observe handle. `subscribe` multiplexes across every
71/// named node in the graph (NOT recursive into mounts — observers
72/// wanting recursion compose with `child.observe_all()` per
73/// subgraph).
74///
75/// The sink receives `(name, &[Message])` tuples; `name` is the
76/// local namespace name. Subscriptions stay tied to the set of
77/// nodes named at `observe_all()` call time — late-added nodes
78/// are NOT auto-subscribed in this slice (lifts with the reactive
79/// observe topology-change primitive in a later slice).
80#[must_use = "GraphObserveAll holds Subscriptions; dropping it unsubscribes all sinks"]
81pub struct GraphObserveAll {
82    graph: Graph,
83    /// One `Subscription` per named node at `observe_all()` call time.
84    /// Held by the handle; dropping the handle unsubscribes every
85    /// fan-out sink.
86    subs: Vec<Subscription>,
87}
88
89impl GraphObserveAll {
90    pub(crate) fn new(graph: Graph) -> Self {
91        Self {
92            graph,
93            subs: Vec::new(),
94        }
95    }
96
97    /// Multi-cast subscribe — registers `sink` against every named
98    /// node at this exact moment. Each underlying subscription is
99    /// kept alive in `self`; drop the handle to unsubscribe all.
100    ///
101    /// Returns the number of nodes the sink was registered against.
102    pub fn subscribe<F>(&mut self, sink: F) -> usize
103    where
104        F: Fn(&str, &[Message]) + Send + Sync + 'static,
105    {
106        let names_to_ids: Vec<(String, NodeId)> = {
107            let inner = self.graph.inner.lock();
108            inner.names.iter().map(|(n, id)| (n.clone(), *id)).collect()
109        };
110        let sink_arc: Arc<F> = Arc::new(sink);
111        let count = names_to_ids.len();
112        for (name, id) in names_to_ids {
113            let sink_clone = sink_arc.clone();
114            let owned_name = name;
115            let inner_sink: Sink = Arc::new(move |msgs: &[Message]| {
116                sink_clone(&owned_name, msgs);
117            });
118            let sub = self.graph.subscribe(id, inner_sink);
119            self.subs.push(sub);
120        }
121        count
122    }
123}
124
125impl Graph {
126    /// Tap a single node's downstream message stream.
127    ///
128    /// # Panics
129    ///
130    /// Panics if `path` doesn't resolve. Use [`Graph::try_resolve`]
131    /// + a manual subscribe if non-panicking is required.
132    pub fn observe(&self, path: &str) -> GraphObserveOne {
133        let id = self.node(path);
134        GraphObserveOne::new(self.clone(), id)
135    }
136
137    /// Tap every named node in this graph.
138    ///
139    /// # Snapshot semantics
140    ///
141    /// The returned handle subscribes against the namespace at
142    /// the moment `subscribe()` is called. Nodes named AFTER that
143    /// call are not auto-subscribed. Use [`Self::observe_all_reactive`]
144    /// for dynamic membership.
145    pub fn observe_all(&self) -> GraphObserveAll {
146        GraphObserveAll::new(self.clone())
147    }
148
149    /// Tap every named node AND auto-subscribe late-added nodes.
150    ///
151    /// Like [`Self::observe_all`] but subscribes to topology changes
152    /// so that nodes registered AFTER the initial `subscribe()` call
153    /// are automatically picked up. Dropping the returned handle
154    /// unsubscribes all fan-out sinks AND the topology listener.
155    pub fn observe_all_reactive(&self) -> GraphObserveAllReactive {
156        GraphObserveAllReactive::new(self.clone())
157    }
158}
159
160// -------------------------------------------------------------------
161// Reactive observe_all — auto-subscribe late-added nodes
162// -------------------------------------------------------------------
163
164/// Shared state for the reactive `observe_all` subscription.
165struct ObserveAllReactiveInner {
166    /// Set of `NodeId`s we've already subscribed to.
167    subscribed: HashSet<NodeId>,
168    /// Live subscriptions — kept alive so dropping them unsubscribes.
169    subs: Vec<Subscription>,
170}
171
172/// Reactive variant of [`GraphObserveAll`] that auto-subscribes
173/// late-added named nodes via Graph namespace-change notifications.
174///
175/// Drop to unsubscribe everything.
176#[must_use = "GraphObserveAllReactive holds Subscriptions; dropping it unsubscribes all sinks"]
177pub struct GraphObserveAllReactive {
178    graph: Graph,
179    /// Namespace-change subscription id. Dropped BEFORE `inner` to
180    /// avoid deadlock: unsubscribe removes the sink (which holds a
181    /// `Weak<Mutex<GraphInner>>`); the closure's captured Arcs are
182    /// not the last refs because `inner` is still owned by `Self`.
183    ns_sink_id: Option<u64>,
184    inner: Arc<Mutex<ObserveAllReactiveInner>>,
185}
186
187// Send + Sync compile-time assertion.
188const _: fn() = || {
189    fn assert_send_sync<T: Send + Sync>() {}
190    assert_send_sync::<GraphObserveAllReactive>();
191};
192
193impl Drop for GraphObserveAllReactive {
194    fn drop(&mut self) {
195        // Unsubscribe namespace sink BEFORE inner drops, to avoid
196        // the deadlock described in the field comment.
197        if let Some(id) = self.ns_sink_id.take() {
198            self.graph.unsubscribe_namespace_change(id);
199        }
200    }
201}
202
203impl GraphObserveAllReactive {
204    fn new(graph: Graph) -> Self {
205        Self {
206            graph,
207            ns_sink_id: None,
208            inner: Arc::new(Mutex::new(ObserveAllReactiveInner {
209                subscribed: HashSet::new(),
210                subs: Vec::new(),
211            })),
212        }
213    }
214
215    /// Subscribe a sink to all current AND future named nodes.
216    ///
217    /// The sink receives `(name, &[Message])` tuples, same as
218    /// [`GraphObserveAll::subscribe`]. Returns the number of nodes
219    /// subscribed at call time (future auto-subscriptions are not
220    /// counted).
221    ///
222    /// # Panics
223    ///
224    /// Panics if called more than once on the same handle. The v1
225    /// contract is "single-shot wiring"; rebuild the handle via
226    /// [`Graph::observe_all_reactive`] to install another sink.
227    pub fn subscribe<F>(&mut self, sink: F) -> usize
228    where
229        F: Fn(&str, &[Message]) + Send + Sync + 'static,
230    {
231        // P5 — subscribe-once contract. A second call would leak the
232        // first namespace sink (we'd overwrite `ns_sink_id` without
233        // unsubscribing the prior). Panic on misuse instead.
234        assert!(
235            self.ns_sink_id.is_none(),
236            "GraphObserveAllReactive::subscribe is single-shot; called twice on the same handle"
237        );
238
239        let sink_arc: Arc<F> = Arc::new(sink);
240
241        // P4 — install the namespace listener BEFORE the initial
242        // snapshot. A node added concurrently between snapshot and
243        // listener-install would otherwise be permanently missed.
244        // The listener's `inner.subscribed.insert(id)` dedups against
245        // the snapshot, so an idempotent overlap is harmless.
246        //
247        // P6 — capture Weak<inner> + Core (clone) instead of the full
248        // Graph clone. This breaks the namespace_sinks → sink → Graph
249        // → namespace_sinks Arc cycle so leaking the handle does not
250        // leak the graph.
251        let weak_graph_inner: Weak<Mutex<GraphInner>> = Arc::downgrade(&self.graph.inner);
252        let core: Core = self.graph.core.clone();
253        let inner_for_ns = self.inner.clone();
254        let sink_for_ns = sink_arc.clone();
255        let ns_sink = Arc::new(move || {
256            let Some(arc_inner) = weak_graph_inner.upgrade() else {
257                // Graph dropped; silent no-op.
258                return;
259            };
260            let graph = Graph {
261                core: core.clone(),
262                inner: arc_inner,
263            };
264            // Scan for any newly-named nodes we haven't subscribed to.
265            let new_nodes: Vec<(String, NodeId)> = {
266                let graph_inner = graph.inner.lock();
267                let state = inner_for_ns.lock();
268                graph_inner
269                    .names
270                    .iter()
271                    .filter(|(_name, id)| !state.subscribed.contains(id))
272                    .map(|(n, id)| (n.clone(), *id))
273                    .collect()
274            };
275            for (name, id) in new_nodes {
276                let should_subscribe = {
277                    let mut state = inner_for_ns.lock();
278                    state.subscribed.insert(id)
279                };
280                if should_subscribe {
281                    let sink_clone = sink_for_ns.clone();
282                    let owned_name = name;
283                    let msg_sink: Sink = Arc::new(move |msgs: &[Message]| {
284                        sink_clone(&owned_name, msgs);
285                    });
286                    let sub = graph.subscribe(id, msg_sink);
287                    inner_for_ns.lock().subs.push(sub);
288                }
289            }
290        });
291        self.ns_sink_id = Some(self.graph.subscribe_namespace_change(ns_sink));
292
293        // Now take the initial snapshot. Any node added between
294        // listener-install and snapshot will be picked up by the
295        // listener's idempotent walk — `inner.subscribed.insert(id)`
296        // returns false if we beat the listener to it.
297        let names_to_ids: Vec<(String, NodeId)> = {
298            let graph_inner = self.graph.inner.lock();
299            graph_inner
300                .names
301                .iter()
302                .map(|(n, id)| (n.clone(), *id))
303                .collect()
304        };
305        let initial_count = names_to_ids.len();
306        let to_subscribe: Vec<(String, NodeId)> = {
307            let mut inner = self.inner.lock();
308            names_to_ids
309                .into_iter()
310                .filter(|(_name, id)| inner.subscribed.insert(*id))
311                .collect()
312        };
313        for (name, id) in to_subscribe {
314            let sink_clone = sink_arc.clone();
315            let msg_sink: Sink = Arc::new(move |msgs: &[Message]| {
316                sink_clone(&name, msgs);
317            });
318            let sub = self.graph.subscribe(id, msg_sink);
319            self.inner.lock().subs.push(sub);
320        }
321
322        initial_count
323    }
324}