algocline_engine/execution/observer.rs
1//! `BroadcastObserverHandle` — concrete [`ObserverHandle`] implementation backed
2//! by a `tokio::sync::broadcast::Receiver<ProgressEvent>`.
3//!
4//! Multiple handles may exist concurrently; each receives the full event stream
5//! independently (sink-free fan-out, Crux R3).
6
7use algocline_core::execution::{ObserverHandle, ObserverRecvError, ProgressEvent};
8use tokio::sync::broadcast;
9
10// ---------------------------------------------------------------------------
11// BroadcastObserverHandle
12// ---------------------------------------------------------------------------
13
14/// Wraps a `broadcast::Receiver<ProgressEvent>` and implements the
15/// [`ObserverHandle`] trait defined in `algocline-core`.
16///
17/// Obtaining a handle is synchronous (`broadcast::Sender::subscribe()` does not
18/// perform I/O) and the handle is immediately valid — no pre-registered sink is
19/// required (Crux R3 / design-v1.md §5.4).
20pub struct BroadcastObserverHandle {
21 rx: broadcast::Receiver<ProgressEvent>,
22}
23
24impl BroadcastObserverHandle {
25 /// Create a new handle by subscribing to the given broadcast sender.
26 pub(crate) fn new(tx: &broadcast::Sender<ProgressEvent>) -> Self {
27 Self { rx: tx.subscribe() }
28 }
29}
30
31impl ObserverHandle for BroadcastObserverHandle {
32 /// Await the next [`ProgressEvent`] from the broadcast channel.
33 ///
34 /// - Returns `Ok(event)` on success.
35 /// - Returns `Err(ObserverRecvError::Lagged(n))` when the receiver fell behind
36 /// and `n` events were skipped; the next call resumes from the latest available
37 /// event (no silent drop — the gap is reported).
38 /// - Returns `Err(ObserverRecvError::Closed)` when the sender is dropped
39 /// (session terminated).
40 fn recv(
41 &mut self,
42 ) -> std::pin::Pin<
43 Box<dyn std::future::Future<Output = Result<ProgressEvent, ObserverRecvError>> + Send + '_>,
44 > {
45 Box::pin(async move {
46 match self.rx.recv().await {
47 Ok(event) => Ok(event),
48 Err(broadcast::error::RecvError::Lagged(n)) => Err(ObserverRecvError::Lagged(n)),
49 Err(broadcast::error::RecvError::Closed) => Err(ObserverRecvError::Closed),
50 }
51 })
52 }
53
54 /// Non-blocking receive.
55 ///
56 /// Returns an event immediately if one is available, or an appropriate
57 /// `Err` variant otherwise.
58 fn try_recv(&mut self) -> Result<ProgressEvent, ObserverRecvError> {
59 match self.rx.try_recv() {
60 Ok(event) => Ok(event),
61 Err(broadcast::error::TryRecvError::Empty) => {
62 // No event available right now — map to Closed so callers
63 // treat it as "nothing to drain", consistent with the `Empty`
64 // variant not existing on `ObserverRecvError`.
65 // Callers that need non-empty semantics should use `recv()`.
66 Err(ObserverRecvError::Closed)
67 }
68 Err(broadcast::error::TryRecvError::Lagged(n)) => Err(ObserverRecvError::Lagged(n)),
69 Err(broadcast::error::TryRecvError::Closed) => Err(ObserverRecvError::Closed),
70 }
71 }
72
73 fn close(self: Box<Self>) {
74 // Drop self — the receiver is automatically unsubscribed when dropped.
75 // No explicit close call is needed on `broadcast::Receiver`.
76 drop(self);
77 }
78}
79
80#[cfg(test)]
81mod tests {
82 use super::*;
83 use algocline_core::execution::{ExecutionStateTag, ProgressEvent};
84 use tokio::sync::broadcast;
85
86 fn make_state_transition(from: ExecutionStateTag, to: ExecutionStateTag) -> ProgressEvent {
87 ProgressEvent::StateTransition { from, to, at: 0 }
88 }
89
90 // -----------------------------------------------------------------------
91 // observe_sink_free (Crux R3)
92 // -----------------------------------------------------------------------
93
94 /// Sink-free property (Crux R3): a new observer can subscribe at any point
95 /// in the session lifetime without any pre-registration requirement.
96 /// Events published before subscription are NOT delivered (broadcast does not
97 /// buffer past events for new subscribers), but all events published after
98 /// subscription are received correctly.
99 ///
100 /// Note: `tokio::sync::broadcast::Sender::send` returns `Err` when there are
101 /// zero active receivers — this is expected tokio behavior. The driver_loop
102 /// uses `let _ = bus_tx.send(...)` to tolerate zero-receiver sends gracefully.
103 /// Sink-free means "no pre-registration is required to subscribe", not that
104 /// send must succeed with 0 receivers.
105 #[tokio::test]
106 async fn observe_sink_free() {
107 let (tx, _initial_rx) = broadcast::channel::<ProgressEvent>(256);
108 // Drop the initial receiver → 0 subscribers.
109 drop(_initial_rx);
110
111 // Send with 0 receivers: tokio broadcast returns Err here, which is
112 // expected. The driver tolerates this with `let _ = bus_tx.send(...)`.
113 let event = make_state_transition(ExecutionStateTag::Running, ExecutionStateTag::Done);
114 let _ = tx.send(event); // result intentionally ignored
115
116 // Subscribe after the above send; the historical event is NOT delivered
117 // (broadcast does not buffer past events for new subscribers).
118 let mut handle = BroadcastObserverHandle::new(&tx);
119
120 // Publish a new event — the fresh subscriber must receive it.
121 let new_event =
122 make_state_transition(ExecutionStateTag::Running, ExecutionStateTag::Paused);
123 tx.send(new_event).expect("send after subscribe");
124
125 let received = handle.recv().await.expect("recv after subscribe");
126 assert!(matches!(
127 received,
128 ProgressEvent::StateTransition {
129 to: ExecutionStateTag::Paused,
130 ..
131 }
132 ));
133 }
134
135 // -----------------------------------------------------------------------
136 // observe_multi_subscriber_fan_out (Crux R3)
137 // -----------------------------------------------------------------------
138
139 /// N=3 independent receivers each get the same event without interfering.
140 #[tokio::test]
141 async fn observe_multi_subscriber_fan_out() {
142 let (tx, _initial_rx) = broadcast::channel::<ProgressEvent>(256);
143 drop(_initial_rx);
144
145 let mut h1 = BroadcastObserverHandle::new(&tx);
146 let mut h2 = BroadcastObserverHandle::new(&tx);
147 let mut h3 = BroadcastObserverHandle::new(&tx);
148
149 let events = vec![
150 make_state_transition(ExecutionStateTag::Running, ExecutionStateTag::Paused),
151 make_state_transition(ExecutionStateTag::Paused, ExecutionStateTag::Running),
152 make_state_transition(ExecutionStateTag::Running, ExecutionStateTag::Done),
153 ];
154
155 for e in &events {
156 tx.send(e.clone()).expect("send");
157 }
158
159 // Drop tx so receivers eventually get Closed after draining.
160 drop(tx);
161
162 for handle in [&mut h1, &mut h2, &mut h3] {
163 let mut received = Vec::new();
164 loop {
165 match handle.recv().await {
166 Ok(e) => received.push(e),
167 Err(ObserverRecvError::Closed) => break,
168 Err(ObserverRecvError::Lagged(n)) => {
169 panic!("unexpected lag: {n}")
170 }
171 }
172 }
173 assert_eq!(
174 received.len(),
175 3,
176 "each subscriber must receive all 3 events"
177 );
178 }
179 }
180
181 // -----------------------------------------------------------------------
182 // observe_lagged_observable
183 // -----------------------------------------------------------------------
184
185 /// A slow observer that falls behind capacity gets `RecvError::Lagged(n)`.
186 #[tokio::test]
187 async fn observe_lagged_observable() {
188 // Small capacity so overflow is easy to trigger.
189 let (tx, _initial_rx) = broadcast::channel::<ProgressEvent>(4);
190 drop(_initial_rx);
191
192 let mut slow_handle = BroadcastObserverHandle::new(&tx);
193
194 // Publish more events than the channel capacity (4+1=5 events).
195 for _ in 0..5 {
196 tx.send(make_state_transition(
197 ExecutionStateTag::Running,
198 ExecutionStateTag::Running,
199 ))
200 .ok(); // ok() — we intentionally overflow
201 }
202
203 // The slow receiver must see Lagged, not silently drop events.
204 let result = slow_handle.recv().await;
205 assert!(
206 matches!(result, Err(ObserverRecvError::Lagged(_))),
207 "slow observer must receive Lagged error, got: {result:?}"
208 );
209 }
210
211 // -----------------------------------------------------------------------
212 // terminal_event_closes_receiver
213 // -----------------------------------------------------------------------
214
215 /// After the sender is dropped (session terminated), receivers observe Closed.
216 #[tokio::test]
217 async fn terminal_event_closes_receiver() {
218 let (tx, _initial_rx) = broadcast::channel::<ProgressEvent>(256);
219 drop(_initial_rx);
220
221 let mut handle = BroadcastObserverHandle::new(&tx);
222
223 // Publish a terminal transition and then drop the sender.
224 tx.send(make_state_transition(
225 ExecutionStateTag::Running,
226 ExecutionStateTag::Done,
227 ))
228 .expect("send terminal");
229 drop(tx);
230
231 // Drain the one event.
232 let first = handle.recv().await.expect("terminal event");
233 assert!(matches!(
234 first,
235 ProgressEvent::StateTransition {
236 to: ExecutionStateTag::Done,
237 ..
238 }
239 ));
240
241 // Next call must return Closed.
242 let result = handle.recv().await;
243 assert!(
244 matches!(result, Err(ObserverRecvError::Closed)),
245 "after sender drop, recv must return Closed, got: {result:?}"
246 );
247 }
248}