Skip to main content

queue_channel/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use async_trait::async_trait;
4use queue_core::{AckHandle, Consumer, Delivery, Producer, QueueError, Result};
5use tokio::sync::mpsc;
6
7/// Create a bounded in-process queue with room for `capacity` pending items.
8pub fn channel<T: Send + 'static>(capacity: usize) -> (ChannelProducer<T>, ChannelConsumer<T>) {
9    let (tx, rx) = mpsc::channel(capacity);
10    (ChannelProducer { tx }, ChannelConsumer { rx })
11}
12
13/// The publishing half. Clone it to publish from many tasks.
14pub struct ChannelProducer<T> {
15    tx: mpsc::Sender<T>,
16}
17
18impl<T> Clone for ChannelProducer<T> {
19    fn clone(&self) -> Self {
20        Self {
21            tx: self.tx.clone(),
22        }
23    }
24}
25
26impl<T> std::fmt::Debug for ChannelProducer<T> {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        f.debug_struct("ChannelProducer").finish_non_exhaustive()
29    }
30}
31
32#[async_trait]
33impl<T: Send + 'static> Producer<T> for ChannelProducer<T> {
34    async fn publish(&self, item: T) -> Result<()> {
35        self.tx.send(item).await.map_err(|_| QueueError::Closed)
36    }
37}
38
39/// The consuming half — a single consumer (clone the producer, not this).
40pub struct ChannelConsumer<T> {
41    rx: mpsc::Receiver<T>,
42}
43
44impl<T> ChannelConsumer<T> {
45    /// Whether no items are currently waiting in the queue — a point-in-time
46    /// snapshot of whether the pipeline has drained everything captured so far.
47    /// The engine reads this at a batch boundary to decide whether a flush has
48    /// *caught up* (see `Sink::flush`'s `caught_up`).
49    pub fn is_empty(&self) -> bool {
50        self.rx.is_empty()
51    }
52}
53
54impl<T> std::fmt::Debug for ChannelConsumer<T> {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("ChannelConsumer").finish_non_exhaustive()
57    }
58}
59
60#[async_trait]
61impl<T: Send + 'static> Consumer<T> for ChannelConsumer<T> {
62    async fn recv(&mut self) -> Result<Option<Delivery<T>>> {
63        Ok(self
64            .rx
65            .recv()
66            .await
67            .map(|item| Delivery::new(item, Box::new(ChannelAck))))
68    }
69}
70
71/// In-memory deliveries need no acknowledgement.
72#[derive(Debug)]
73struct ChannelAck;
74
75#[async_trait]
76impl AckHandle for ChannelAck {
77    async fn ack(self: Box<Self>) -> Result<()> {
78        Ok(())
79    }
80
81    async fn nack(self: Box<Self>) -> Result<()> {
82        Ok(())
83    }
84}
85
86#[cfg(test)]
87#[allow(clippy::unwrap_used)]
88mod tests;