paladin/queue/mod.rs
//! Simplified interface for interacting with queues.
//!
//! Different queuing systems will provide many different configuration options
//! and features — we do not attempt to provide a unified interface for all of
//! them. Rather, we provide a bare minimum interface that is sufficient to
//! satisfy the semantics of this system. In particular, connection management,
//! queue declaration, queue consumption, and message publishing are the only
//! operations that are supported.
9
10use std::fmt::Debug;
11
12use anyhow::Result;
13use async_trait::async_trait;
14use futures::Stream;
15
16use crate::{acker::Acker, serializer::Serializable};
17
/// The delivery mode of a message.
///
/// Defaults to [`DeliveryMode::Persistent`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum DeliveryMode {
    /// A persistent message will be persisted to disk and will survive a broker
    /// restart if the queue is durable. If the queue is non-durable, the
    /// message will be persisted to disk until it is delivered to a consumer
    /// or until the broker is restarted.
    #[default]
    Persistent,
    /// An ephemeral message will not be persisted to disk and will be lost if
    /// the broker is restarted.
    Ephemeral,
}
31
/// The syndication mode of a queue.
///
/// Defaults to [`SyndicationMode::ExactlyOnce`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum SyndicationMode {
    /// A single-delivery queue will deliver a message to a single consumer.
    #[default]
    ExactlyOnce,
    /// A broadcast queue will deliver a message to all consumers.
    Broadcast,
}
41
/// The durability of a queue.
///
/// Defaults to [`QueueDurability::NonDurable`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum QueueDurability {
    /// A non-durable queue will be deleted when the broker is restarted.
    #[default]
    NonDurable,
    /// A durable queue will survive a broker restart.
    Durable,
}
51
52/// Queue declaration options.
53#[derive(Clone, Copy, Debug, Default)]
54pub struct QueueOptions {
55 /// The message delivery mode.
56 pub delivery_mode: DeliveryMode,
57 /// The syndication mode.
58 pub syndication_mode: SyndicationMode,
59 /// The durability of the queue.
60 pub durability: QueueDurability,
61}
62
63/// A connection to a queue.
64///
65/// Connections should be cheap to clone such that references need not be passed
66/// around.
67#[async_trait]
68pub trait Connection: Clone {
69 type QueueHandle: QueueHandle;
70
71 /// Close the connection.
72 async fn close(&self) -> Result<()>;
73
74 /// Declare a queue.
75 ///
76 /// Queue declaration should be idempotent, in that it should instantiate a
77 /// queue if it does not exist, and otherwise return the existing queue.
78 async fn declare_queue(&self, name: &str, options: QueueOptions) -> Result<Self::QueueHandle>;
79
80 /// Delete the queue.
81 async fn delete_queue(&self, name: &str) -> Result<()>;
82}
83
84/// A handle to a queue.
85///
86/// Handles should be cheap to clone such that references need not be passed
87/// around.
88#[async_trait]
89pub trait QueueHandle: Clone {
90 type Acker: Acker;
91 type Consumer<T: Serializable>: Stream<Item = (T, Self::Acker)>;
92 type Publisher<T: Serializable>: Publisher<T>;
93
94 fn publisher<PayloadTarget: Serializable>(&self) -> Self::Publisher<PayloadTarget>;
95
96 async fn publish<PayloadTarget: Serializable>(&self, payload: &PayloadTarget) -> Result<()>
97 where
98 Self::Publisher<PayloadTarget>: Send,
99 {
100 self.publisher().publish(payload).await
101 }
102
103 /// Declare a queue consumer.
104 async fn declare_consumer<PayloadTarget: Serializable>(
105 &self,
106 consumer_name: &str,
107 ) -> Result<Self::Consumer<PayloadTarget>>;
108}
109
110pub mod amqp;
111pub mod in_memory;
112mod publisher;
113pub use publisher::*;