Skip to main content

queue_channel/
lib.rs

1//! An in-process work queue over a bounded `tokio` mpsc channel.
2//!
3//! [`channel`] returns a cloneable [`ChannelProducer`] and a single
4//! [`ChannelConsumer`]. The bound gives backpressure (publishing waits when the
5//! channel is full); the queue closes and drains once every producer is
6//! dropped.
7//!
8//! Acknowledgement is a no-op: an in-memory channel has nothing to make
9//! durable, so redelivery for the change pipeline comes from the source (the
10//! replication slot only advances once work is confirmed downstream), not the
11//! channel. The [`AckHandle`] surface is still honored so the same engine loop
12//! works unchanged against a durable backend later.
13
14use async_trait::async_trait;
15use queue_core::{AckHandle, Consumer, Delivery, Producer, QueueError, Result};
16use tokio::sync::mpsc;
17
18/// Create a bounded in-process queue with room for `capacity` pending items.
19pub fn channel<T: Send + 'static>(capacity: usize) -> (ChannelProducer<T>, ChannelConsumer<T>) {
20    let (tx, rx) = mpsc::channel(capacity);
21    (ChannelProducer { tx }, ChannelConsumer { rx })
22}
23
24/// The publishing half. Clone it to publish from many tasks.
25pub struct ChannelProducer<T> {
26    tx: mpsc::Sender<T>,
27}
28
29impl<T> Clone for ChannelProducer<T> {
30    fn clone(&self) -> Self {
31        Self {
32            tx: self.tx.clone(),
33        }
34    }
35}
36
37impl<T> std::fmt::Debug for ChannelProducer<T> {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        f.debug_struct("ChannelProducer").finish_non_exhaustive()
40    }
41}
42
43#[async_trait]
44impl<T: Send + 'static> Producer<T> for ChannelProducer<T> {
45    async fn publish(&self, item: T) -> Result<()> {
46        self.tx.send(item).await.map_err(|_| QueueError::Closed)
47    }
48}
49
50/// The consuming half — a single consumer (clone the producer, not this).
51pub struct ChannelConsumer<T> {
52    rx: mpsc::Receiver<T>,
53}
54
55impl<T> ChannelConsumer<T> {
56    /// Whether no items are currently waiting in the queue — a point-in-time
57    /// snapshot of whether the pipeline has drained everything captured so far.
58    /// The engine reads this at a batch boundary to decide whether a flush has
59    /// *caught up* (see `Sink::flush`'s `caught_up`).
60    pub fn is_empty(&self) -> bool {
61        self.rx.is_empty()
62    }
63}
64
65impl<T> std::fmt::Debug for ChannelConsumer<T> {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        f.debug_struct("ChannelConsumer").finish_non_exhaustive()
68    }
69}
70
71#[async_trait]
72impl<T: Send + 'static> Consumer<T> for ChannelConsumer<T> {
73    async fn recv(&mut self) -> Result<Option<Delivery<T>>> {
74        Ok(self
75            .rx
76            .recv()
77            .await
78            .map(|item| Delivery::new(item, Box::new(ChannelAck))))
79    }
80}
81
82/// In-memory deliveries need no acknowledgement.
83#[derive(Debug)]
84struct ChannelAck;
85
86#[async_trait]
87impl AckHandle for ChannelAck {
88    async fn ack(self: Box<Self>) -> Result<()> {
89        Ok(())
90    }
91
92    async fn nack(self: Box<Self>) -> Result<()> {
93        Ok(())
94    }
95}
96
97#[cfg(test)]
98#[allow(clippy::unwrap_used)]
99mod tests;