use algocline_core::execution::{ObserverHandle, ObserverRecvError, ProgressEvent};
use tokio::sync::broadcast;
/// [`ObserverHandle`] implementation that reads [`ProgressEvent`]s from a
/// `tokio::sync::broadcast` channel.
///
/// Each handle owns its own receiver, so several handles subscribed to the
/// same sender each get their own copy of every event sent after they
/// subscribed.
pub struct BroadcastObserverHandle {
// Receiving half of the broadcast channel, obtained by subscribing to the
// sender passed to `new`.
rx: broadcast::Receiver<ProgressEvent>,
}
impl BroadcastObserverHandle {
    /// Creates a handle by subscribing a fresh receiver to `tx`.
    ///
    /// The sender is only borrowed; the returned handle owns the new
    /// receiver and observes events sent on `tx` from this point on.
    pub(crate) fn new(tx: &broadcast::Sender<ProgressEvent>) -> Self {
        let rx = tx.subscribe();
        Self { rx }
    }
}
impl ObserverHandle for BroadcastObserverHandle {
fn recv(
&mut self,
) -> std::pin::Pin<
Box<dyn std::future::Future<Output = Result<ProgressEvent, ObserverRecvError>> + Send + '_>,
> {
Box::pin(async move {
match self.rx.recv().await {
Ok(event) => Ok(event),
Err(broadcast::error::RecvError::Lagged(n)) => Err(ObserverRecvError::Lagged(n)),
Err(broadcast::error::RecvError::Closed) => Err(ObserverRecvError::Closed),
}
})
}
fn try_recv(&mut self) -> Result<ProgressEvent, ObserverRecvError> {
match self.rx.try_recv() {
Ok(event) => Ok(event),
Err(broadcast::error::TryRecvError::Empty) => {
Err(ObserverRecvError::Closed)
}
Err(broadcast::error::TryRecvError::Lagged(n)) => Err(ObserverRecvError::Lagged(n)),
Err(broadcast::error::TryRecvError::Closed) => Err(ObserverRecvError::Closed),
}
}
fn close(self: Box<Self>) {
drop(self);
}
}
#[cfg(test)]
mod tests {
use super::*;
use algocline_core::execution::{ExecutionStateTag, ProgressEvent};
use tokio::sync::broadcast;
/// Builds a `StateTransition` event for the given states with `at` fixed to 0.
fn make_state_transition(from: ExecutionStateTag, to: ExecutionStateTag) -> ProgressEvent {
    let at = 0;
    ProgressEvent::StateTransition { from, to, at }
}
/// A handle created after an event was already sent must see only the
/// events sent after it subscribed.
#[tokio::test]
async fn observe_sink_free() {
    let (tx, initial_rx) = broadcast::channel::<ProgressEvent>(256);
    drop(initial_rx);

    // Sent before any handle exists; the outcome of this send is
    // deliberately ignored.
    let _ = tx.send(make_state_transition(
        ExecutionStateTag::Running,
        ExecutionStateTag::Done,
    ));

    let mut handle = BroadcastObserverHandle::new(&tx);
    tx.send(make_state_transition(
        ExecutionStateTag::Running,
        ExecutionStateTag::Paused,
    ))
    .expect("send after subscribe");

    // The first thing the handle sees must be the post-subscribe event.
    let received = handle.recv().await.expect("recv after subscribe");
    assert!(matches!(
        received,
        ProgressEvent::StateTransition {
            to: ExecutionStateTag::Paused,
            ..
        }
    ));
}
/// Every subscribed handle must receive all events that were sent, and then
/// observe `Closed` once the sender is gone.
#[tokio::test]
async fn observe_multi_subscriber_fan_out() {
    let (tx, initial_rx) = broadcast::channel::<ProgressEvent>(256);
    drop(initial_rx);

    let mut subscribers = [
        BroadcastObserverHandle::new(&tx),
        BroadcastObserverHandle::new(&tx),
        BroadcastObserverHandle::new(&tx),
    ];

    let transitions = [
        (ExecutionStateTag::Running, ExecutionStateTag::Paused),
        (ExecutionStateTag::Paused, ExecutionStateTag::Running),
        (ExecutionStateTag::Running, ExecutionStateTag::Done),
    ];
    for (from, to) in transitions {
        tx.send(make_state_transition(from, to)).expect("send");
    }
    // Dropping the sender lets each drain loop below end on `Closed`.
    drop(tx);

    for subscriber in &mut subscribers {
        let mut received = Vec::new();
        loop {
            let event = match subscriber.recv().await {
                Ok(event) => event,
                Err(ObserverRecvError::Closed) => break,
                Err(ObserverRecvError::Lagged(n)) => {
                    panic!("unexpected lag: {n}")
                }
            };
            received.push(event);
        }
        assert_eq!(
            received.len(),
            3,
            "each subscriber must receive all 3 events"
        );
    }
}
/// A handle that does not keep up with the channel capacity must be told it
/// lagged on its next receive.
#[tokio::test]
async fn observe_lagged_observable() {
    let (tx, initial_rx) = broadcast::channel::<ProgressEvent>(4);
    drop(initial_rx);
    let mut laggard = BroadcastObserverHandle::new(&tx);

    // Five sends into a channel created with capacity 4, with no receive in
    // between; send results are intentionally discarded.
    (0..5).for_each(|_| {
        let _ = tx.send(make_state_transition(
            ExecutionStateTag::Running,
            ExecutionStateTag::Running,
        ));
    });

    let result = laggard.recv().await;
    assert!(
        matches!(result, Err(ObserverRecvError::Lagged(_))),
        "slow observer must receive Lagged error, got: {result:?}"
    );
}
/// After the sender is dropped, a handle still delivers the already-sent
/// event and only then reports `Closed`.
#[tokio::test]
async fn terminal_event_closes_receiver() {
    let (tx, initial_rx) = broadcast::channel::<ProgressEvent>(256);
    drop(initial_rx);
    let mut handle = BroadcastObserverHandle::new(&tx);

    let terminal = make_state_transition(ExecutionStateTag::Running, ExecutionStateTag::Done);
    tx.send(terminal).expect("send terminal");
    drop(tx);

    // The queued event is still delivered even though the sender is gone.
    let first = handle.recv().await.expect("terminal event");
    assert!(matches!(
        first,
        ProgressEvent::StateTransition {
            to: ExecutionStateTag::Done,
            ..
        }
    ));

    // With the queue drained and no sender left, the next receive must fail.
    let result = handle.recv().await;
    assert!(
        matches!(result, Err(ObserverRecvError::Closed)),
        "after sender drop, recv must return Closed, got: {result:?}"
    );
}
}