1use async_trait::async_trait;
15use queue_core::{AckHandle, Consumer, Delivery, Producer, QueueError, Result};
16use tokio::sync::mpsc;
17
18pub fn channel<T: Send + 'static>(capacity: usize) -> (ChannelProducer<T>, ChannelConsumer<T>) {
20 let (tx, rx) = mpsc::channel(capacity);
21 (ChannelProducer { tx }, ChannelConsumer { rx })
22}
23
24pub struct ChannelProducer<T> {
26 tx: mpsc::Sender<T>,
27}
28
29impl<T> Clone for ChannelProducer<T> {
30 fn clone(&self) -> Self {
31 Self {
32 tx: self.tx.clone(),
33 }
34 }
35}
36
37impl<T> std::fmt::Debug for ChannelProducer<T> {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 f.debug_struct("ChannelProducer").finish_non_exhaustive()
40 }
41}
42
43#[async_trait]
44impl<T: Send + 'static> Producer<T> for ChannelProducer<T> {
45 async fn publish(&self, item: T) -> Result<()> {
46 self.tx.send(item).await.map_err(|_| QueueError::Closed)
47 }
48}
49
50pub struct ChannelConsumer<T> {
52 rx: mpsc::Receiver<T>,
53}
54
55impl<T> ChannelConsumer<T> {
56 pub fn is_empty(&self) -> bool {
61 self.rx.is_empty()
62 }
63}
64
65impl<T> std::fmt::Debug for ChannelConsumer<T> {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 f.debug_struct("ChannelConsumer").finish_non_exhaustive()
68 }
69}
70
71#[async_trait]
72impl<T: Send + 'static> Consumer<T> for ChannelConsumer<T> {
73 async fn recv(&mut self) -> Result<Option<Delivery<T>>> {
74 Ok(self
75 .rx
76 .recv()
77 .await
78 .map(|item| Delivery::new(item, Box::new(ChannelAck))))
79 }
80}
81
82#[derive(Debug)]
84struct ChannelAck;
85
86#[async_trait]
87impl AckHandle for ChannelAck {
88 async fn ack(self: Box<Self>) -> Result<()> {
89 Ok(())
90 }
91
92 async fn nack(self: Box<Self>) -> Result<()> {
93 Ok(())
94 }
95}
96
97#[cfg(test)]
98#[allow(clippy::unwrap_used)]
99mod tests;