Skip to main content

atomr_streams/
queue.rs

1//! Source.Queue — a source fed by an explicit producer handle.
2
3use tokio::sync::mpsc;
4
5use crate::source::Source;
6
/// Outcome reported to a producer after offering an element to a queue.
///
/// The `offer` method in this file only ever produces [`Enqueued`] or
/// [`QueueClosed`]; the remaining variants are declared but not constructed
/// here (presumably reserved for other queue strategies — confirm with callers).
///
/// [`Enqueued`]: QueueOfferResult::Enqueued
/// [`QueueClosed`]: QueueOfferResult::QueueClosed
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueueOfferResult {
    /// The element was accepted by the queue.
    Enqueued,
    /// The element was discarded instead of being queued.
    Dropped,
    /// The offer failed.
    Failure,
    /// The queue no longer accepts elements.
    QueueClosed,
}
15
16pub struct SourceQueue<T> {
17    tx: mpsc::UnboundedSender<T>,
18}
19
20impl<T: Send + 'static> SourceQueue<T> {
21    /// Create a source + producer pair.
22    pub fn new() -> (Self, Source<T>) {
23        let (tx, rx) = mpsc::unbounded_channel();
24        (Self { tx }, Source::from_receiver(rx))
25    }
26
27    /// Offer an element synchronously. `QueueClosed` if the downstream stopped.
28    pub fn offer(&self, value: T) -> QueueOfferResult {
29        match self.tx.send(value) {
30            Ok(()) => QueueOfferResult::Enqueued,
31            Err(_) => QueueOfferResult::QueueClosed,
32        }
33    }
34
35    /// Close the queue so the source completes normally.
36    pub fn complete(self) {
37        drop(self.tx);
38    }
39
40    pub fn is_closed(&self) -> bool {
41        self.tx.is_closed()
42    }
43}
44
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sink::Sink;

    /// Offers two elements, completes the queue, and checks that the spawned
    /// collector task yields exactly those elements in order.
    #[tokio::test]
    async fn source_queue_delivers_then_completes_on_drop() {
        let (queue, source) = SourceQueue::<i32>::new();
        let collector = tokio::spawn(async move { Sink::collect(source).await });

        for item in [1, 2] {
            assert_eq!(queue.offer(item), QueueOfferResult::Enqueued);
        }

        // Releasing the producer is what lets the collector finish.
        queue.complete();

        let collected = collector.await.unwrap();
        assert_eq!(collected, vec![1, 2]);
    }
}