paladin/queue/
mod.rs

//! Simplified interface for interacting with queues.
//!
//! Different queuing systems will provide many different configuration options
//! and features — we do not attempt to provide a unified interface for all of
//! them. Rather, we provide a bare minimum interface that is sufficient to
//! satisfy the semantics of this system. In particular, connection management,
//! queue declaration, queue consumption, and message publishing are the only
//! operations that are supported.
9
10use std::fmt::Debug;
11
12use anyhow::Result;
13use async_trait::async_trait;
14use futures::Stream;
15
16use crate::{acker::Acker, serializer::Serializable};
17
/// The delivery mode of a message.
///
/// Defaults to [`DeliveryMode::Persistent`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum DeliveryMode {
    /// A persistent message will be persisted to disk and will survive a broker
    /// restart if the queue is durable. If the queue is non-durable, the
    /// message will be persisted to disk until it is delivered to a consumer
    /// or until the broker is restarted.
    #[default]
    Persistent,
    /// An ephemeral message will not be persisted to disk and will be lost if
    /// the broker is restarted.
    Ephemeral,
}
31
/// The syndication mode of a queue.
///
/// Defaults to [`SyndicationMode::ExactlyOnce`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum SyndicationMode {
    /// A single-delivery queue will deliver a message to a single consumer.
    #[default]
    ExactlyOnce,
    /// A broadcast queue will deliver a message to all consumers.
    Broadcast,
}
41
/// The durability of a queue.
///
/// Defaults to [`QueueDurability::NonDurable`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum QueueDurability {
    /// A non-durable queue will be deleted when the broker is restarted.
    #[default]
    NonDurable,
    /// A durable queue will survive a broker restart.
    Durable,
}
51
52/// Queue declaration options.
53#[derive(Clone, Copy, Debug, Default)]
54pub struct QueueOptions {
55    /// The message delivery mode.
56    pub delivery_mode: DeliveryMode,
57    /// The syndication mode.
58    pub syndication_mode: SyndicationMode,
59    /// The durability of the queue.
60    pub durability: QueueDurability,
61}
62
63/// A connection to a queue.
64///
65/// Connections should be cheap to clone such that references need not be passed
66/// around.
67#[async_trait]
68pub trait Connection: Clone {
69    type QueueHandle: QueueHandle;
70
71    /// Close the connection.
72    async fn close(&self) -> Result<()>;
73
74    /// Declare a queue.
75    ///
76    /// Queue declaration should be idempotent, in that it should instantiate a
77    /// queue if it does not exist, and otherwise return the existing queue.
78    async fn declare_queue(&self, name: &str, options: QueueOptions) -> Result<Self::QueueHandle>;
79
80    /// Delete the queue.
81    async fn delete_queue(&self, name: &str) -> Result<()>;
82}
83
84/// A handle to a queue.
85///
86/// Handles should be cheap to clone such that references need not be passed
87/// around.
88#[async_trait]
89pub trait QueueHandle: Clone {
90    type Acker: Acker;
91    type Consumer<T: Serializable>: Stream<Item = (T, Self::Acker)>;
92    type Publisher<T: Serializable>: Publisher<T>;
93
94    fn publisher<PayloadTarget: Serializable>(&self) -> Self::Publisher<PayloadTarget>;
95
96    async fn publish<PayloadTarget: Serializable>(&self, payload: &PayloadTarget) -> Result<()>
97    where
98        Self::Publisher<PayloadTarget>: Send,
99    {
100        self.publisher().publish(payload).await
101    }
102
103    /// Declare a queue consumer.
104    async fn declare_consumer<PayloadTarget: Serializable>(
105        &self,
106        consumer_name: &str,
107    ) -> Result<Self::Consumer<PayloadTarget>>;
108}
109
110pub mod amqp;
111pub mod in_memory;
112mod publisher;
113pub use publisher::*;