Skip to main content

algocline_engine/execution/
observer.rs

//! `BroadcastObserverHandle` — concrete [`ObserverHandle`] implementation backed
//! by a `tokio::sync::broadcast::Receiver<ProgressEvent>`.
//!
//! Multiple handles may exist concurrently; each one independently receives
//! every event published after it subscribed (sink-free fan-out, Crux R3).
6
7use algocline_core::execution::{ObserverHandle, ObserverRecvError, ProgressEvent};
8use tokio::sync::broadcast;
9
10// ---------------------------------------------------------------------------
11// BroadcastObserverHandle
12// ---------------------------------------------------------------------------
13
14/// Wraps a `broadcast::Receiver<ProgressEvent>` and implements the
15/// [`ObserverHandle`] trait defined in `algocline-core`.
16///
17/// Obtaining a handle is synchronous (`broadcast::Sender::subscribe()` does not
18/// perform I/O) and the handle is immediately valid — no pre-registered sink is
19/// required (Crux R3 / design-v1.md §5.4).
20pub struct BroadcastObserverHandle {
21    rx: broadcast::Receiver<ProgressEvent>,
22}
23
24impl BroadcastObserverHandle {
25    /// Create a new handle by subscribing to the given broadcast sender.
26    pub(crate) fn new(tx: &broadcast::Sender<ProgressEvent>) -> Self {
27        Self { rx: tx.subscribe() }
28    }
29}
30
31impl ObserverHandle for BroadcastObserverHandle {
32    /// Await the next [`ProgressEvent`] from the broadcast channel.
33    ///
34    /// - Returns `Ok(event)` on success.
35    /// - Returns `Err(ObserverRecvError::Lagged(n))` when the receiver fell behind
36    ///   and `n` events were skipped; the next call resumes from the latest available
37    ///   event (no silent drop — the gap is reported).
38    /// - Returns `Err(ObserverRecvError::Closed)` when the sender is dropped
39    ///   (session terminated).
40    fn recv(
41        &mut self,
42    ) -> std::pin::Pin<
43        Box<dyn std::future::Future<Output = Result<ProgressEvent, ObserverRecvError>> + Send + '_>,
44    > {
45        Box::pin(async move {
46            match self.rx.recv().await {
47                Ok(event) => Ok(event),
48                Err(broadcast::error::RecvError::Lagged(n)) => Err(ObserverRecvError::Lagged(n)),
49                Err(broadcast::error::RecvError::Closed) => Err(ObserverRecvError::Closed),
50            }
51        })
52    }
53
54    /// Non-blocking receive.
55    ///
56    /// Returns an event immediately if one is available, or an appropriate
57    /// `Err` variant otherwise.
58    fn try_recv(&mut self) -> Result<ProgressEvent, ObserverRecvError> {
59        match self.rx.try_recv() {
60            Ok(event) => Ok(event),
61            Err(broadcast::error::TryRecvError::Empty) => {
62                // No event available right now — map to Closed so callers
63                // treat it as "nothing to drain", consistent with the `Empty`
64                // variant not existing on `ObserverRecvError`.
65                // Callers that need non-empty semantics should use `recv()`.
66                Err(ObserverRecvError::Closed)
67            }
68            Err(broadcast::error::TryRecvError::Lagged(n)) => Err(ObserverRecvError::Lagged(n)),
69            Err(broadcast::error::TryRecvError::Closed) => Err(ObserverRecvError::Closed),
70        }
71    }
72
73    fn close(self: Box<Self>) {
74        // Drop self — the receiver is automatically unsubscribed when dropped.
75        // No explicit close call is needed on `broadcast::Receiver`.
76        drop(self);
77    }
78}
79
#[cfg(test)]
mod tests {
    use super::*;
    use algocline_core::execution::{ExecutionStateTag, ProgressEvent};
    use tokio::sync::broadcast;

    /// Build a `StateTransition` event with a fixed timestamp of 0.
    fn make_state_transition(from: ExecutionStateTag, to: ExecutionStateTag) -> ProgressEvent {
        ProgressEvent::StateTransition { from, to, at: 0 }
    }

    /// Create a broadcast sender with the given capacity and zero subscribers:
    /// the receiver returned by `broadcast::channel` is dropped immediately.
    fn sender_without_subscribers(capacity: usize) -> broadcast::Sender<ProgressEvent> {
        let (tx, initial_rx) = broadcast::channel::<ProgressEvent>(capacity);
        drop(initial_rx);
        tx
    }

    // -----------------------------------------------------------------------
    // observe_sink_free (Crux R3)
    // -----------------------------------------------------------------------

    /// Sink-free property (Crux R3): an observer may subscribe at any point in
    /// the session lifetime, with no pre-registration step.
    ///
    /// Events published before the subscription are NOT delivered (broadcast
    /// keeps no history for new subscribers); events published afterwards are.
    ///
    /// Note: `tokio::sync::broadcast::Sender::send` returns `Err` when there are
    /// zero active receivers — expected tokio behavior. The driver_loop uses
    /// `let _ = bus_tx.send(...)` to tolerate that. "Sink-free" means no
    /// pre-registration is needed to subscribe, not that a send with zero
    /// receivers must succeed.
    #[tokio::test]
    async fn observe_sink_free() {
        let tx = sender_without_subscribers(256);

        // Nobody is listening yet, so this send fails; the result is ignored on
        // purpose, mirroring what the driver does.
        let _ = tx.send(make_state_transition(
            ExecutionStateTag::Running,
            ExecutionStateTag::Done,
        ));

        // A late subscriber does not see the event published above.
        let mut observer = BroadcastObserverHandle::new(&tx);

        // An event published after subscribing must reach the new observer.
        tx.send(make_state_transition(
            ExecutionStateTag::Running,
            ExecutionStateTag::Paused,
        ))
        .expect("send after subscribe");

        let delivered = observer.recv().await.expect("recv after subscribe");
        assert!(matches!(
            delivered,
            ProgressEvent::StateTransition {
                to: ExecutionStateTag::Paused,
                ..
            }
        ));
    }

    // -----------------------------------------------------------------------
    // observe_multi_subscriber_fan_out (Crux R3)
    // -----------------------------------------------------------------------

    /// Three independent subscribers each receive every event, without
    /// interfering with one another.
    #[tokio::test]
    async fn observe_multi_subscriber_fan_out() {
        let tx = sender_without_subscribers(256);

        let mut first = BroadcastObserverHandle::new(&tx);
        let mut second = BroadcastObserverHandle::new(&tx);
        let mut third = BroadcastObserverHandle::new(&tx);

        let transitions = [
            (ExecutionStateTag::Running, ExecutionStateTag::Paused),
            (ExecutionStateTag::Paused, ExecutionStateTag::Running),
            (ExecutionStateTag::Running, ExecutionStateTag::Done),
        ];
        for (from, to) in transitions {
            tx.send(make_state_transition(from, to)).expect("send");
        }

        // With the sender gone, each receiver reports Closed once drained.
        drop(tx);

        for observer in [&mut first, &mut second, &mut third] {
            let mut delivered = 0usize;
            loop {
                match observer.recv().await {
                    Ok(_) => delivered += 1,
                    Err(ObserverRecvError::Closed) => break,
                    Err(ObserverRecvError::Lagged(n)) => panic!("unexpected lag: {n}"),
                }
            }
            assert_eq!(delivered, 3, "each subscriber must receive all 3 events");
        }
    }

    // -----------------------------------------------------------------------
    // observe_lagged_observable
    // -----------------------------------------------------------------------

    /// An observer that falls behind the channel capacity is told so via
    /// `ObserverRecvError::Lagged(n)`.
    #[tokio::test]
    async fn observe_lagged_observable() {
        // A tiny capacity makes the overflow trivial to provoke.
        let tx = sender_without_subscribers(4);

        let mut slow_handle = BroadcastObserverHandle::new(&tx);

        // Five sends against a capacity of four: one more than fits.
        (0..5).for_each(|_| {
            // The send result is discarded; overflowing is the point.
            let _ = tx.send(make_state_transition(
                ExecutionStateTag::Running,
                ExecutionStateTag::Running,
            ));
        });

        // The gap must be reported instead of being swallowed.
        let result = slow_handle.recv().await;
        assert!(
            matches!(result, Err(ObserverRecvError::Lagged(_))),
            "slow observer must receive Lagged error, got: {result:?}"
        );
    }

    // -----------------------------------------------------------------------
    // terminal_event_closes_receiver
    // -----------------------------------------------------------------------

    /// Once the sender is dropped (session terminated), a receiver first drains
    /// what was already published and then observes `Closed`.
    #[tokio::test]
    async fn terminal_event_closes_receiver() {
        let tx = sender_without_subscribers(256);

        let mut observer = BroadcastObserverHandle::new(&tx);

        // Publish the terminal transition, then terminate the session.
        tx.send(make_state_transition(
            ExecutionStateTag::Running,
            ExecutionStateTag::Done,
        ))
        .expect("send terminal");
        drop(tx);

        // The already-published event is still delivered.
        let first = observer.recv().await.expect("terminal event");
        assert!(matches!(
            first,
            ProgressEvent::StateTransition {
                to: ExecutionStateTag::Done,
                ..
            }
        ));

        // Nothing is left, so the next receive reports Closed.
        let result = observer.recv().await;
        assert!(
            matches!(result, Err(ObserverRecvError::Closed)),
            "after sender drop, recv must return Closed, got: {result:?}"
        );
    }
}