queue_core/queue.rs
1use async_trait::async_trait;
2
3use crate::Result;
4
5/// Publishes work items onto a queue. Cloneable backends let many producers
6/// share one queue.
7#[async_trait]
8pub trait Producer<T: Send>: std::fmt::Debug + Send + Sync {
9 /// Publish an item. May wait (backpressure) until the queue has room.
10 async fn publish(&self, item: T) -> Result<()>;
11}
12
13/// Pulls work items from a queue for processing.
14#[async_trait]
15pub trait Consumer<T: Send>: std::fmt::Debug + Send {
16 /// The next delivery, or `None` once the queue is closed and drained.
17 async fn recv(&mut self) -> Result<Option<Delivery<T>>>;
18}
19
20/// A received item paired with the handle that confirms or returns it.
21///
22/// Processing is complete only when the [`AckHandle`] taken from
23/// [`into_parts`](Self::into_parts) is acked; until then a durable backend may
24/// redeliver after a crash. Dropping a delivery without acking leaves it
25/// unconfirmed (a durable backend redelivers it later).
26#[derive(Debug)]
27pub struct Delivery<T> {
28 item: T,
29 handle: Box<dyn AckHandle>,
30}
31
32impl<T> Delivery<T> {
33 /// Build a delivery from an item and its backend ack handle.
34 pub fn new(item: T, handle: Box<dyn AckHandle>) -> Self {
35 Self { item, handle }
36 }
37
38 /// Split into the item and its ack handle, so the item can be processed and
39 /// the handle acked once the work is durably done.
40 pub fn into_parts(self) -> (T, Box<dyn AckHandle>) {
41 (self.item, self.handle)
42 }
43
44 /// Return the item to the queue for redelivery (processing failed).
45 pub async fn nack(self) -> Result<()> {
46 self.handle.nack().await
47 }
48}
49
50/// Confirms or returns a single delivery. Backend-specific: a no-op for
51/// in-memory channels, a server acknowledgement for a durable broker.
52#[async_trait]
53pub trait AckHandle: std::fmt::Debug + Send {
54 /// Confirm the delivery; it will not be redelivered.
55 async fn ack(self: Box<Self>) -> Result<()>;
56
57 /// Return the delivery for redelivery.
58 async fn nack(self: Box<Self>) -> Result<()>;
59}