1#![doc = include_str!("../README.md")]
2
3use async_trait::async_trait;
4use queue_core::{AckHandle, Consumer, Delivery, Producer, QueueError, Result};
5use tokio::sync::mpsc;
6
7pub fn channel<T: Send + 'static>(capacity: usize) -> (ChannelProducer<T>, ChannelConsumer<T>) {
9 let (tx, rx) = mpsc::channel(capacity);
10 (ChannelProducer { tx }, ChannelConsumer { rx })
11}
12
13pub struct ChannelProducer<T> {
15 tx: mpsc::Sender<T>,
16}
17
18impl<T> Clone for ChannelProducer<T> {
19 fn clone(&self) -> Self {
20 Self {
21 tx: self.tx.clone(),
22 }
23 }
24}
25
26impl<T> std::fmt::Debug for ChannelProducer<T> {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 f.debug_struct("ChannelProducer").finish_non_exhaustive()
29 }
30}
31
32#[async_trait]
33impl<T: Send + 'static> Producer<T> for ChannelProducer<T> {
34 async fn publish(&self, item: T) -> Result<()> {
35 self.tx.send(item).await.map_err(|_| QueueError::Closed)
36 }
37}
38
39pub struct ChannelConsumer<T> {
41 rx: mpsc::Receiver<T>,
42}
43
44impl<T> ChannelConsumer<T> {
45 pub fn is_empty(&self) -> bool {
50 self.rx.is_empty()
51 }
52}
53
54impl<T> std::fmt::Debug for ChannelConsumer<T> {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct("ChannelConsumer").finish_non_exhaustive()
57 }
58}
59
60#[async_trait]
61impl<T: Send + 'static> Consumer<T> for ChannelConsumer<T> {
62 async fn recv(&mut self) -> Result<Option<Delivery<T>>> {
63 Ok(self
64 .rx
65 .recv()
66 .await
67 .map(|item| Delivery::new(item, Box::new(ChannelAck))))
68 }
69}
70
71#[derive(Debug)]
73struct ChannelAck;
74
75#[async_trait]
76impl AckHandle for ChannelAck {
77 async fn ack(self: Box<Self>) -> Result<()> {
78 Ok(())
79 }
80
81 async fn nack(self: Box<Self>) -> Result<()> {
82 Ok(())
83 }
84}
85
86#[cfg(test)]
87#[allow(clippy::unwrap_used)]
88mod tests;