use core::pin::Pin;
use futures::Stream;
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tsoracle_consensus::LeaderState;
/// Error returned by [`LeaderEventSender::send`].
#[derive(Debug, thiserror::Error)]
pub enum SendError {
/// No receiver is left to observe the value, so the send was rejected.
#[error("all receivers have been dropped")]
Closed,
}
/// Builds a connected sender/stream pair for leader-state notifications.
///
/// The underlying `watch` channel is seeded with [`LeaderState::Unknown`], and
/// the receiving half is wrapped in a `WatchStream` so it can be consumed as a
/// [`Stream`].
pub fn leader_event_channel() -> (LeaderEventSender, LeaderEventStream) {
    let (tx, rx) = watch::channel(LeaderState::Unknown);

    let sender = LeaderEventSender { tx };
    let stream = LeaderEventStream {
        inner: WatchStream::new(rx),
    };

    (sender, stream)
}
/// Sending half produced by [`leader_event_channel`].
///
/// Wraps a `tokio::sync::watch::Sender`; the type derives `Clone`, so several
/// handles may publish into the same channel.
#[derive(Clone)]
pub struct LeaderEventSender {
/// Underlying watch sender that stores the most recent `LeaderState`.
tx: watch::Sender<LeaderState>,
}
impl LeaderEventSender {
    /// Publishes `state` to the channel, skipping the update (and the receiver
    /// notification) when it equals the value currently stored.
    ///
    /// The comparison and the store happen inside a single
    /// `watch::Sender::send_if_modified` call. A separate "read, compare, then
    /// send" sequence would let two clones of this sender race with the same
    /// payload and both notify receivers, defeating the debounce.
    ///
    /// # Errors
    ///
    /// Returns [`SendError::Closed`] when every receiver has been dropped,
    /// whether or not `state` would have been debounced. The stored value is
    /// left untouched in that case.
    pub fn send(&self, state: LeaderState) -> Result<(), SendError> {
        // `send_if_modified` succeeds even when nobody is listening, so closure
        // must be detected explicitly to keep reporting it to the caller.
        if self.tx.is_closed() {
            return Err(SendError::Closed);
        }

        self.tx.send_if_modified(move |current| {
            if *current == state {
                // Unchanged payload: returning `false` suppresses the wake-up.
                false
            } else {
                *current = state;
                true
            }
        });

        Ok(())
    }
}
/// Receiving half produced by [`leader_event_channel`], consumable as a
/// `Stream` whose items are `LeaderState` values.
pub struct LeaderEventStream {
/// Watch receiver adapted into a stream by `tokio_stream`'s `WatchStream`.
inner: WatchStream<LeaderState>,
}
impl LeaderEventStream {
    /// Consumes the stream and returns it as a pinned, heap-allocated,
    /// type-erased `Stream` of [`LeaderState`] items that is `Send`.
    #[must_use]
    pub fn into_pin(self) -> Pin<Box<dyn Stream<Item = LeaderState> + Send>> {
        // Erase the concrete type first, then pin the existing allocation.
        let erased: Box<dyn Stream<Item = LeaderState> + Send> = Box::new(self);
        Box::into_pin(erased)
    }
}
impl Stream for LeaderEventStream {
type Item = LeaderState;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(cx)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use tokio::task::yield_now;
    use tsoracle_consensus::LeaderState;
    use tsoracle_core::Epoch;

    /// A consumer that polls between sends sees the initial value, then each
    /// change in order, and finally `None` once the sender is gone.
    #[tokio::test]
    async fn stream_yields_changes_polled_between_sends() {
        let (tx, mut events) = leader_event_channel();

        // The seed value is delivered before anything has been sent.
        assert_eq!(events.next().await, Some(LeaderState::Unknown));

        let transitions = [
            LeaderState::Leader { epoch: Epoch(1) },
            LeaderState::Leader { epoch: Epoch(2) },
        ];
        for expected in transitions {
            tx.send(expected.clone()).unwrap();
            yield_now().await;
            assert_eq!(events.next().await, Some(expected));
        }

        drop(tx);
        assert_eq!(events.next().await, None);
    }

    /// Sending the same payload twice, then a different one, succeeds each
    /// time while the stream is alive.
    #[tokio::test]
    async fn send_if_modified_accepts_repeated_payload() {
        // Keep the stream bound so the channel stays open for the whole test.
        let (tx, _events) = leader_event_channel();
        let repeated = LeaderState::Leader { epoch: Epoch(1) };

        assert!(tx.send(repeated.clone()).is_ok());
        assert!(tx.send(repeated).is_ok());
        assert!(tx.send(LeaderState::Leader { epoch: Epoch(2) }).is_ok());
    }

    /// A fresh payload is rejected with `Closed` once the stream is dropped.
    #[tokio::test]
    async fn send_returns_closed_after_stream_dropped() {
        let (tx, events) = leader_event_channel();
        drop(events);

        let outcome = tx.send(LeaderState::Leader { epoch: Epoch(1) });
        assert!(matches!(outcome, Err(SendError::Closed)));
    }

    /// A payload equal to the stored value is also rejected with `Closed`
    /// once the stream is dropped.
    #[tokio::test]
    async fn send_returns_closed_for_debounced_payload_after_drop() {
        let (tx, events) = leader_event_channel();
        tx.send(LeaderState::Leader { epoch: Epoch(1) })
            .expect("first send succeeds while stream alive");
        drop(events);

        let outcome = tx.send(LeaderState::Leader { epoch: Epoch(1) });
        assert!(matches!(outcome, Err(SendError::Closed)));
    }

    /// The boxed stream from `into_pin` yields the seed value followed by a
    /// subsequently sent change.
    #[tokio::test]
    async fn into_pin_yields_stream_with_initial_and_changes() {
        let (tx, events) = leader_event_channel();
        let mut boxed = events.into_pin();

        assert_eq!(boxed.next().await, Some(LeaderState::Unknown));

        let elected = LeaderState::Leader { epoch: Epoch(7) };
        tx.send(elected.clone()).unwrap();
        yield_now().await;
        assert_eq!(boxed.next().await, Some(elected));
    }
}