1use tokio::sync::mpsc;
4
5use crate::source::Source;
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8#[non_exhaustive]
9pub enum QueueOfferResult {
10 Enqueued,
11 Dropped,
12 Failure,
13 QueueClosed,
14}
15
16pub struct SourceQueue<T> {
17 tx: mpsc::UnboundedSender<T>,
18}
19
20impl<T: Send + 'static> SourceQueue<T> {
21 pub fn new() -> (Self, Source<T>) {
23 let (tx, rx) = mpsc::unbounded_channel();
24 (Self { tx }, Source::from_receiver(rx))
25 }
26
27 pub fn offer(&self, value: T) -> QueueOfferResult {
29 match self.tx.send(value) {
30 Ok(()) => QueueOfferResult::Enqueued,
31 Err(_) => QueueOfferResult::QueueClosed,
32 }
33 }
34
35 pub fn complete(self) {
37 drop(self.tx);
38 }
39
40 pub fn is_closed(&self) -> bool {
41 self.tx.is_closed()
42 }
43}
44
45#[cfg(test)]
46mod tests {
47 use super::*;
48 use crate::sink::Sink;
49
50 #[tokio::test]
51 async fn source_queue_delivers_then_completes_on_drop() {
52 let (q, src) = SourceQueue::<i32>::new();
53 let handle = tokio::spawn(async move { Sink::collect(src).await });
54 assert_eq!(q.offer(1), QueueOfferResult::Enqueued);
55 assert_eq!(q.offer(2), QueueOfferResult::Enqueued);
56 q.complete();
57 assert_eq!(handle.await.unwrap(), vec![1, 2]);
58 }
59}