1use tokio::sync::mpsc;
5
6use crate::source::Source;
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9#[non_exhaustive]
10pub enum QueueOfferResult {
11 Enqueued,
12 Dropped,
13 Failure,
14 QueueClosed,
15}
16
17pub struct SourceQueue<T> {
18 tx: mpsc::UnboundedSender<T>,
19}
20
21impl<T: Send + 'static> SourceQueue<T> {
22 pub fn new() -> (Self, Source<T>) {
24 let (tx, rx) = mpsc::unbounded_channel();
25 (Self { tx }, Source::from_receiver(rx))
26 }
27
28 pub fn offer(&self, value: T) -> QueueOfferResult {
30 match self.tx.send(value) {
31 Ok(()) => QueueOfferResult::Enqueued,
32 Err(_) => QueueOfferResult::QueueClosed,
33 }
34 }
35
36 pub fn complete(self) {
38 drop(self.tx);
39 }
40
41 pub fn is_closed(&self) -> bool {
42 self.tx.is_closed()
43 }
44}
45
46#[cfg(test)]
47mod tests {
48 use super::*;
49 use crate::sink::Sink;
50
51 #[tokio::test]
52 async fn source_queue_delivers_then_completes_on_drop() {
53 let (q, src) = SourceQueue::<i32>::new();
54 let handle = tokio::spawn(async move { Sink::collect(src).await });
55 assert_eq!(q.offer(1), QueueOfferResult::Enqueued);
56 assert_eq!(q.offer(2), QueueOfferResult::Enqueued);
57 q.complete();
58 assert_eq!(handle.await.unwrap(), vec![1, 2]);
59 }
60}