use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::Stream;
use tokio::sync::broadcast;
use tokio_util::sync::ReusableBoxFuture;
use crate::json_rpc::SequencedEvent;
/// Guard that runs a stored callback exactly once when it is dropped.
///
/// The callback is optional: a guard created with [`Subscription::noop`] or
/// consumed by [`Subscription::detach`] does nothing on drop.
#[must_use = "dropping the Subscription immediately unsubscribes"]
pub struct Subscription {
    /// Callback invoked from `Drop`; `None` once taken or when inactive.
    on_drop: Option<Box<dyn FnOnce() + Send + Sync>>,
}

impl Subscription {
    /// Creates an active guard that will call `on_drop` when dropped.
    pub fn new(on_drop: impl FnOnce() + Send + Sync + 'static) -> Self {
        let callback: Box<dyn FnOnce() + Send + Sync> = Box::new(on_drop);
        Self {
            on_drop: Some(callback),
        }
    }

    /// Creates an inactive guard whose drop has no effect.
    pub fn noop() -> Self {
        Self { on_drop: None }
    }

    /// Consumes the guard and discards its callback without invoking it.
    pub fn detach(mut self) {
        // Removing the callback first means the `Drop` impl that runs when
        // `self` goes out of scope finds nothing to call.
        drop(self.on_drop.take());
    }
}

impl std::fmt::Debug for Subscription {
    /// Formats as `Subscription { active: <bool> }`, where `active` reports
    /// whether a callback is still pending.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let active = self.on_drop.is_some();
        let mut repr = f.debug_struct("Subscription");
        repr.field("active", &active);
        repr.finish()
    }
}

impl Drop for Subscription {
    /// Invokes the pending callback, if any.
    fn drop(&mut self) {
        // `take` leaves `None` behind, so the callback can never run twice.
        if let Some(callback) = self.on_drop.take() {
            callback();
        }
    }
}
pub struct ByteStream {
inner: ReusableBoxFuture<'static, (Result<Vec<u8>, broadcast::error::RecvError>, broadcast::Receiver<Vec<u8>>)>,
}
impl ByteStream {
    /// Wraps `rx`, immediately preparing the first receive future.
    ///
    /// The future is only constructed here, not polled.
    pub fn new(rx: broadcast::Receiver<Vec<u8>>) -> Self {
        let first_recv = recv_bytes(rx);
        Self {
            inner: ReusableBoxFuture::new(first_recv),
        }
    }
}
async fn recv_bytes(
mut rx: broadcast::Receiver<Vec<u8>>,
) -> (Result<Vec<u8>, broadcast::error::RecvError>, broadcast::Receiver<Vec<u8>>) {
let result = rx.recv().await;
(result, rx)
}
impl Stream for ByteStream {
type Item = Vec<u8>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let (result, rx) = match self.inner.poll(cx) {
Poll::Ready(value) => value,
Poll::Pending => return Poll::Pending,
};
self.inner.set(recv_bytes(rx));
match result {
Ok(bytes) => return Poll::Ready(Some(bytes)),
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => return Poll::Ready(None),
}
}
}
}
/// State threaded through the stream built by `subscribe_with_replay`.
struct ReplayState {
    /// Backlog of events that is drained from the front before the live
    /// receiver is consulted.
    buffered: VecDeque<SequencedEvent>,
    /// Live source of events, used once the backlog is empty.
    rx: broadcast::Receiver<SequencedEvent>,
    /// Sequence number of the most recently yielded event (initially the
    /// caller's `start_after`); events at or below it are skipped.
    last_delivered: i64,
}
/// Builds a stream that yields backlog events first and live events after.
///
/// # Parameters
/// * `buffered` - backlog of events; used only when `replay` is `true`,
///   otherwise ignored.
/// * `rx` - broadcast receiver supplying live events.
/// * `start_after` - events whose `sequence_number` is less than or equal
///   to this value are never yielded.
/// * `replay` - whether the backlog should be delivered at all.
///
/// # Behavior
/// Every yielded event has a `sequence_number` strictly greater than the
/// previously yielded one (and than `start_after`); anything else, from
/// either source, is silently dropped. `RecvError::Lagged` from the
/// receiver is ignored, and `RecvError::Closed` ends the stream.
pub fn subscribe_with_replay(
    buffered: VecDeque<SequencedEvent>,
    rx: broadcast::Receiver<SequencedEvent>,
    start_after: i64,
    replay: bool,
) -> impl Stream<Item = SequencedEvent> + Send {
    let initial = ReplayState {
        buffered: if replay { buffered } else { VecDeque::new() },
        rx,
        last_delivered: start_after,
    };

    futures::stream::unfold(initial, |mut state| async move {
        loop {
            // The backlog has priority. It is never refilled, so once it
            // runs dry every later iteration goes straight to the receiver.
            let candidate = match state.buffered.pop_front() {
                Some(event) => event,
                None => match state.rx.recv().await {
                    Ok(event) => event,
                    Err(broadcast::error::RecvError::Lagged(_)) => continue,
                    Err(broadcast::error::RecvError::Closed) => return None,
                },
            };

            // Yield only events that advance the sequence; skip the rest.
            if candidate.sequence_number > state.last_delivered {
                state.last_delivered = candidate.sequence_number;
                return Some((candidate, state));
            }
        }
    })
}