Skip to main content

queue_core/
queue.rs

1use async_trait::async_trait;
2
3use crate::Result;
4
5/// Publishes work items onto a queue. Cloneable backends let many producers
6/// share one queue.
7#[async_trait]
8pub trait Producer<T: Send>: std::fmt::Debug + Send + Sync {
9    /// Publish an item. May wait (backpressure) until the queue has room.
10    async fn publish(&self, item: T) -> Result<()>;
11}
12
13/// Pulls work items from a queue for processing.
14#[async_trait]
15pub trait Consumer<T: Send>: std::fmt::Debug + Send {
16    /// The next delivery, or `None` once the queue is closed and drained.
17    async fn recv(&mut self) -> Result<Option<Delivery<T>>>;
18}
19
20/// A received item paired with the handle that confirms or returns it.
21///
22/// Processing is complete only when the [`AckHandle`] taken from
23/// [`into_parts`](Self::into_parts) is acked; until then a durable backend may
24/// redeliver after a crash. Dropping a delivery without acking leaves it
25/// unconfirmed (a durable backend redelivers it later).
26#[derive(Debug)]
27pub struct Delivery<T> {
28    item: T,
29    handle: Box<dyn AckHandle>,
30}
31
32impl<T> Delivery<T> {
33    /// Build a delivery from an item and its backend ack handle.
34    pub fn new(item: T, handle: Box<dyn AckHandle>) -> Self {
35        Self { item, handle }
36    }
37
38    /// Split into the item and its ack handle, so the item can be processed and
39    /// the handle acked once the work is durably done.
40    pub fn into_parts(self) -> (T, Box<dyn AckHandle>) {
41        (self.item, self.handle)
42    }
43
44    /// Return the item to the queue for redelivery (processing failed).
45    pub async fn nack(self) -> Result<()> {
46        self.handle.nack().await
47    }
48}
49
50/// Confirms or returns a single delivery. Backend-specific: a no-op for
51/// in-memory channels, a server acknowledgement for a durable broker.
52#[async_trait]
53pub trait AckHandle: std::fmt::Debug + Send {
54    /// Confirm the delivery; it will not be redelivered.
55    async fn ack(self: Box<Self>) -> Result<()>;
56
57    /// Return the delivery for redelivery.
58    async fn nack(self: Box<Self>) -> Result<()>;
59}