strev 0.6.0

Event-driven pub/sub messaging library with compile-time ack safety
Documentation
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::{Stream, StreamExt};

use crate::message::{Message, Pending};

pub struct MessageStream {
    inner: ReceiverStream<Message<Pending>>,
}

impl MessageStream {
    pub fn channel(buffer: usize) -> (mpsc::Sender<Message<Pending>>, Self) {
        let (tx, rx) = mpsc::channel(buffer);
        (
            tx,
            Self {
                inner: ReceiverStream::new(rx),
            },
        )
    }
}

impl Stream for MessageStream {
    type Item = Message<Pending>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.inner).poll_next(cx)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}

pub async fn bulk_read(
    stream: &mut MessageStream,
    limit: usize,
    timeout: Duration,
) -> Vec<Message<Pending>> {
    let mut messages = Vec::with_capacity(limit);
    let deadline = tokio::time::sleep(timeout);
    tokio::pin!(deadline);

    loop {
        if messages.len() >= limit {
            break;
        }
        tokio::select! {
            _ = &mut deadline => break,
            msg = stream.next() => {
                match msg {
                    Some(m) => messages.push(m),
                    None => break,
                }
            }
        }
    }

    messages
}