Skip to main content

atomr_streams/
queue.rs

1//! Source.Queue — a source fed by an explicit producer handle.
2//! akka.net: `ISourceQueue`, `Source.Queue`.
3
4use tokio::sync::mpsc;
5
6use crate::source::Source;
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9#[non_exhaustive]
10pub enum QueueOfferResult {
11    Enqueued,
12    Dropped,
13    Failure,
14    QueueClosed,
15}
16
17pub struct SourceQueue<T> {
18    tx: mpsc::UnboundedSender<T>,
19}
20
21impl<T: Send + 'static> SourceQueue<T> {
22    /// Create a source + producer pair. akka.net: `Source.Queue<T>(size, overflow)`.
23    pub fn new() -> (Self, Source<T>) {
24        let (tx, rx) = mpsc::unbounded_channel();
25        (Self { tx }, Source::from_receiver(rx))
26    }
27
28    /// Offer an element synchronously. `QueueClosed` if the downstream stopped.
29    pub fn offer(&self, value: T) -> QueueOfferResult {
30        match self.tx.send(value) {
31            Ok(()) => QueueOfferResult::Enqueued,
32            Err(_) => QueueOfferResult::QueueClosed,
33        }
34    }
35
36    /// Close the queue so the source completes normally.
37    pub fn complete(self) {
38        drop(self.tx);
39    }
40
41    pub fn is_closed(&self) -> bool {
42        self.tx.is_closed()
43    }
44}
45
46#[cfg(test)]
47mod tests {
48    use super::*;
49    use crate::sink::Sink;
50
51    #[tokio::test]
52    async fn source_queue_delivers_then_completes_on_drop() {
53        let (q, src) = SourceQueue::<i32>::new();
54        let handle = tokio::spawn(async move { Sink::collect(src).await });
55        assert_eq!(q.offer(1), QueueOfferResult::Enqueued);
56        assert_eq!(q.offer(2), QueueOfferResult::Enqueued);
57        q.complete();
58        assert_eq!(handle.await.unwrap(), vec![1, 2]);
59    }
60}