paladin/queue/
amqp.rs

1//! AMQP queue binding using [`lapin`].
2//!
3//! # Example
4//!
5//! ```no_run
6//! use paladin::{
7//!     acker::Acker,
8//!     queue::{
9//!         Connection, QueueOptions, QueueHandle,
10//!         amqp::{AMQPConnection, AMQPConnectionOptions}
11//!     }
12//! };
13//! use serde::{Serialize, Deserialize};
14//! use anyhow::Result;
15//! use futures::StreamExt;
16//!
17//! #[derive(Serialize, Deserialize)]
18//! struct MyStruct {
19//!     field: String,
20//! }
21//!
22//! #[tokio::main]
23//! async fn main() -> Result<()> {
24//!     let conn = AMQPConnection::new(AMQPConnectionOptions {
25//!         uri: "amqp://localhost:5672",
26//!         qos: Some(1),
27//!         serializer: Default::default(),
28//!     }).await?;
29//!     let queue = conn.declare_queue("my_queue", QueueOptions::default()).await?;
30//!
31//!     // Publish a message
32//!     queue.publish(&MyStruct { field: "hello world".to_string() }).await?;
33//!
34//!     let mut consumer = queue.declare_consumer::<MyStruct>("my_consumer").await?;
35//!     while let Some((payload, delivery)) = consumer.next().await {
36//!         // ...
37//!         delivery.ack().await?;
38//!         break;
39//!     }
40//!     // ...
41//!
42//!     Ok(())
43//! }
44//! ```
45use std::{
46    pin::Pin,
47    sync::Arc,
48    task::{Context, Poll},
49};
50
51use anyhow::Result;
52use async_trait::async_trait;
53use dashmap::{mapref::entry::Entry, DashMap};
54use futures::Stream;
55use lapin::{options::QueueDeleteOptions, types::FieldTable, ExchangeKind};
56use pin_project::pin_project;
57use tracing::{error, instrument};
58
59use super::{
60    Connection, DeliveryMode, Publisher, QueueDurability, QueueHandle, QueueOptions,
61    SyndicationMode,
62};
63use crate::{
64    acker::Acker,
65    serializer::{Serializable, Serializer},
66};
67
68impl DeliveryMode {
69    fn into_message_properties(self) -> lapin::BasicProperties {
70        match self {
71            DeliveryMode::Persistent => lapin::BasicProperties::default().with_delivery_mode(2),
72            DeliveryMode::Ephemeral => lapin::BasicProperties::default(),
73        }
74    }
75}
76
77impl QueueDurability {
78    fn into_queue_declare_options(self) -> lapin::options::QueueDeclareOptions {
79        match self {
80            QueueDurability::NonDurable => lapin::options::QueueDeclareOptions::default(),
81            QueueDurability::Durable => lapin::options::QueueDeclareOptions {
82                durable: true,
83                ..Default::default()
84            },
85        }
86    }
87
88    fn into_exchange_declare_options(self) -> lapin::options::ExchangeDeclareOptions {
89        match self {
90            QueueDurability::NonDurable => lapin::options::ExchangeDeclareOptions::default(),
91            QueueDurability::Durable => lapin::options::ExchangeDeclareOptions {
92                durable: true,
93                ..Default::default()
94            },
95        }
96    }
97}
98
99impl QueueOptions {
100    fn into_declare_arguments(self) -> FieldTable {
101        let mut arguments = FieldTable::default();
102        if let QueueDurability::NonDurable = self.durability {
103            // Auto expire non-durable queues after 30 minutes of inactivity
104            arguments.insert("x-expires".into(), 1800000.into());
105        }
106        arguments
107    }
108
109    fn into_queue_declare_options(self) -> lapin::options::QueueDeclareOptions {
110        self.durability.into_queue_declare_options()
111    }
112
113    fn into_exchange_declare_options(self) -> lapin::options::ExchangeDeclareOptions {
114        self.durability.into_exchange_declare_options()
115    }
116
117    fn into_message_properties(self) -> lapin::BasicProperties {
118        self.delivery_mode.into_message_properties()
119    }
120}
121
/// Options for creating an [`AMQPConnection`].
pub struct AMQPConnectionOptions<'a> {
    /// The AMQP URI to connect to.
    pub uri: &'a str,
    /// The Quality of Service to use for the queue.
    /// This determines how many unacknowledged messages the broker will deliver
    /// to the consumer before requiring acknowledgements. By setting this,
    /// you can control the rate at which messages are delivered to a consumer,
    /// thus affecting throughput and ensuring that a single consumer
    /// doesn't get overwhelmed. See <https://www.rabbitmq.com/consumer-prefetch.html>
    ///
    /// When `None`, [`AMQPConnection::new`] falls back to a value of `1`.
    pub qos: Option<u16>,
    /// The serializer stored on the connection and handed to every queue
    /// handle the connection creates.
    pub serializer: Serializer,
}
135
/// An instance of a connection to an AMQP queue.
///
/// # Example
/// ```no_run
/// use paladin::queue::{
///     amqp::{AMQPConnection, AMQPConnectionOptions}
/// };
/// # use anyhow::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let conn = AMQPConnection::new(AMQPConnectionOptions {
///     uri: "amqp://localhost:5672",
///     qos: Some(1),
///     serializer: Default::default(),
/// }).await?;
///
/// Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
pub struct AMQPConnection {
    /// Channel created on `connection`; queue and exchange operations are
    /// issued through it.
    channel: lapin::Channel,
    /// The underlying AMQP connection, retained so it can be closed
    /// explicitly.
    connection: Arc<lapin::Connection>,
    /// Serializer copied into every queue handle created from this
    /// connection.
    serializer: Serializer,
    /// Queue options keyed by queue name; `delete_queue` consults this map to
    /// choose how a queue is deleted.
    queue_options: DashMap<String, QueueOptions>,
}
162
163impl AMQPConnection {
164    /// Get a handle to the connection.
165    pub async fn new(
166        AMQPConnectionOptions {
167            uri,
168            qos,
169            serializer,
170        }: AMQPConnectionOptions<'_>,
171    ) -> Result<Self> {
172        let options = lapin::ConnectionProperties::default()
173            .with_executor(tokio_executor_trait::Tokio::current())
174            .with_reactor(tokio_reactor_trait::Tokio);
175
176        let connection = lapin::Connection::connect(uri, options).await?;
177        let channel = connection.create_channel().await?;
178
179        channel
180            .basic_qos(qos.unwrap_or(1), Default::default())
181            .await?;
182
183        Ok(AMQPConnection {
184            channel,
185            connection: Arc::new(connection),
186            serializer,
187            queue_options: DashMap::new(),
188        })
189    }
190
191    async fn declare_broadcast(
192        &self,
193        name: &str,
194        options: QueueOptions,
195    ) -> Result<<AMQPConnection as Connection>::QueueHandle> {
196        FieldTable::default().insert("x-expires".into(), 60000.into());
197        self.channel
198            .exchange_declare(
199                name,
200                ExchangeKind::Fanout,
201                options.into_exchange_declare_options(),
202                options.into_declare_arguments(),
203            )
204            .await?;
205
206        self.channel
207            .queue_declare(
208                name,
209                options.into_queue_declare_options(),
210                options.into_declare_arguments(),
211            )
212            .await?;
213
214        self.channel
215            .queue_bind(
216                name,
217                name,
218                "", // Routing key is ignored in fanout exchanges
219                Default::default(),
220                Default::default(),
221            )
222            .await?;
223
224        Ok(AMQPQueueHandle {
225            channel: self.channel.clone(),
226            name: name.to_string(),
227            serializer: self.serializer,
228            options,
229        })
230    }
231
232    async fn delete_broadcast(&self, name: &str) -> Result<()> {
233        self.channel
234            .queue_delete(name, QueueDeleteOptions::default())
235            .await?;
236
237        self.channel
238            .exchange_delete(name, Default::default())
239            .await?;
240
241        Ok(())
242    }
243
244    async fn declare_single(
245        &self,
246        name: &str,
247        options: QueueOptions,
248    ) -> Result<<AMQPConnection as Connection>::QueueHandle> {
249        self.channel
250            .queue_declare(
251                name,
252                options.into_queue_declare_options(),
253                options.into_declare_arguments(),
254            )
255            .await?;
256
257        Ok(AMQPQueueHandle {
258            channel: self.channel.clone(),
259            name: name.to_string(),
260            serializer: self.serializer,
261            options,
262        })
263    }
264
265    async fn delete_single(&self, name: &str) -> Result<()> {
266        self.channel
267            .queue_delete(name, QueueDeleteOptions::default())
268            .await?;
269
270        Ok(())
271    }
272}
273
274#[async_trait]
275impl Connection for AMQPConnection {
276    type QueueHandle = AMQPQueueHandle;
277
278    async fn close(&self) -> Result<()> {
279        _ = self.channel.close(200, "Goodbye").await;
280        _ = self.connection.close(200, "Goodbye").await;
281
282        Ok(())
283    }
284
285    /// Declare an AMQP queue with the given name.
286    ///
287    /// ```no_run
288    /// # use paladin::{
289    ///     queue::{Connection, QueueOptions, amqp::{AMQPConnection, AMQPConnectionOptions}}
290    /// };
291    /// # use anyhow::Result;
292    /// # #[tokio::main]
293    /// # async fn main() -> Result<()> {
294    /// let conn = AMQPConnection::new(AMQPConnectionOptions {
295    ///     uri: "amqp://localhost:5672",
296    ///     qos: Some(1),
297    ///     serializer: Default::default(),
298    /// }).await?;
299    /// let queue = conn.declare_queue("my_queue", QueueOptions::default()).await?;
300    ///
301    /// Ok(())
302    /// # }
303    async fn declare_queue(&self, name: &str, options: QueueOptions) -> Result<Self::QueueHandle> {
304        match options.syndication_mode {
305            SyndicationMode::ExactlyOnce => self.declare_single(name, options).await,
306            SyndicationMode::Broadcast => self.declare_broadcast(name, options).await,
307        }
308    }
309
310    async fn delete_queue(&self, name: &str) -> Result<()> {
311        match self.queue_options.entry(name.to_string()) {
312            Entry::Occupied(options) => {
313                match options.get().syndication_mode {
314                    SyndicationMode::ExactlyOnce => self.delete_single(name).await?,
315                    SyndicationMode::Broadcast => self.delete_broadcast(name).await?,
316                };
317
318                options.remove();
319
320                Ok(())
321            }
322            Entry::Vacant(_) => self.delete_broadcast(name).await,
323        }
324    }
325}
326
327impl AMQPQueueHandle {
328    async fn publish_broadcast<PayloadTarget: Serializable>(
329        &self,
330        payload: &PayloadTarget,
331    ) -> Result<()> {
332        self.channel
333            .basic_publish(
334                &self.name,
335                "", // Routing key is ignored in fanout exchanges
336                Default::default(),
337                &self.serializer.to_bytes(payload)?,
338                self.options.into_message_properties(),
339            )
340            .await?
341            .await?;
342
343        Ok(())
344    }
345
346    async fn publish_single<PayloadTarget: Serializable>(
347        &self,
348        payload: &PayloadTarget,
349    ) -> Result<()> {
350        self.channel
351            .basic_publish(
352                "",
353                &self.name,
354                Default::default(),
355                &self.serializer.to_bytes(payload)?,
356                self.options.into_message_properties(),
357            )
358            .await?
359            .await?;
360
361        Ok(())
362    }
363}
364
/// A handle to an AMQP queue.
///
/// Publishers and consumers for the queue are created from this handle.
#[derive(Clone)]
pub struct AMQPQueueHandle {
    /// Channel used to publish to and consume from the queue.
    channel: lapin::Channel,
    /// Name of the queue; broadcast publishing also uses it as the exchange
    /// name.
    name: String,
    /// Serializer used to encode published payloads; it is also copied into
    /// consumers created from this handle.
    serializer: Serializer,
    /// Options for the queue; they select the publishing route and the
    /// message properties.
    options: QueueOptions,
}
373
/// A publisher of `T` payloads backed by an [`AMQPQueueHandle`].
pub struct AMQPPublisher<T> {
    /// Handle of the queue that messages are published to.
    queue_handle: AMQPQueueHandle,
    /// Ties the publisher to its payload type without storing a `T`.
    _phantom: std::marker::PhantomData<T>,
}
378
379impl<T> AMQPPublisher<T> {
380    pub fn new(queue_handle: AMQPQueueHandle) -> Self {
381        Self {
382            queue_handle,
383            _phantom: std::marker::PhantomData,
384        }
385    }
386}
387
388#[async_trait]
389impl<T: Serializable> Publisher<T> for AMQPPublisher<T> {
390    /// Publish a message to the queue.
391    ///
392    /// # Example
393    /// ```no_run
394    /// use paladin::{
395    ///     queue::{
396    ///         Connection,
397    ///         QueueHandle,
398    ///         QueueOptions,
399    ///         amqp::{AMQPConnection, AMQPConnectionOptions}
400    ///     }
401    /// };
402    /// use serde::{Serialize, Deserialize};
403    /// use anyhow::Result;
404    ///
405    /// #[derive(Serialize, Deserialize)]
406    /// struct MyStruct {
407    ///     field: String,
408    /// }
409    ///
410    /// #[tokio::main]
411    /// async fn main() -> Result<()> {
412    ///     let conn = AMQPConnection::new(AMQPConnectionOptions {
413    ///         uri: "amqp://localhost:5672",
414    ///         qos: Some(1),
415    ///         serializer: Default::default(),
416    ///     }).await?;
417    ///     let queue = conn.declare_queue("my_queue", QueueOptions::default()).await?;
418    ///
419    ///     let payload = MyStruct {
420    ///        field: "hello world".to_string(),
421    ///     };
422    ///
423    ///     queue.publish(&payload).await?;
424    ///
425    ///     Ok(())
426    /// }
427    #[instrument(skip_all, level = "trace")]
428    async fn publish(&self, payload: &T) -> Result<()> {
429        match self.queue_handle.options.syndication_mode {
430            SyndicationMode::ExactlyOnce => self.queue_handle.publish_single(payload).await,
431            SyndicationMode::Broadcast => self.queue_handle.publish_broadcast(payload).await,
432        }
433    }
434
435    async fn close(&self) -> Result<()> {
436        Ok(())
437    }
438}
439
440#[async_trait]
441impl QueueHandle for AMQPQueueHandle {
442    type Acker = AMQPAcker;
443    type Consumer<PayloadTarget: Serializable> = AMQPConsumer<PayloadTarget>;
444    type Publisher<PayloadTarget: Serializable> = AMQPPublisher<PayloadTarget>;
445
446    fn publisher<PayloadTarget: Serializable>(&self) -> Self::Publisher<PayloadTarget> {
447        AMQPPublisher::new(self.clone())
448    }
449
450    /// Get a consumer instance to the queue.
451    /// ```no_run
452    /// use paladin::{
453    ///     queue::{
454    ///         Connection,
455    ///         QueueHandle,
456    ///         QueueOptions,
457    ///         amqp::{AMQPConnection, AMQPConnectionOptions}
458    ///     }
459    /// };
460    /// use serde::{Serialize, Deserialize};
461    /// use anyhow::Result;
462    ///
463    /// #[derive(Serialize, Deserialize)]
464    /// struct MyStruct {
465    ///     field: String,
466    /// }
467    ///
468    /// #[tokio::main]
469    /// async fn main() -> Result<()> {
470    ///     let conn = AMQPConnection::new(AMQPConnectionOptions {
471    ///         uri: "amqp://localhost:5672",
472    ///         qos: Some(1),
473    ///         serializer: Default::default(),
474    ///     }).await?;
475    ///     let queue = conn.declare_queue("my_queue", QueueOptions::default()).await?;
476    ///
477    ///     let consumer = queue.declare_consumer::<MyStruct>("my_consumer").await?;
478    ///
479    ///     Ok(())
480    /// }
481    #[instrument(skip(self), level = "trace")]
482    async fn declare_consumer<PayloadTarget: Serializable>(
483        &self,
484        consumer_name: &str,
485    ) -> Result<Self::Consumer<PayloadTarget>> {
486        let consumer = self
487            .channel
488            .basic_consume(
489                &self.name,
490                consumer_name,
491                Default::default(),
492                Default::default(),
493            )
494            .await?;
495
496        Ok(AMQPConsumer {
497            inner: consumer,
498            serializer: self.serializer,
499            _phantom: std::marker::PhantomData,
500        })
501    }
502}
503
/// A stream of deserialized payloads from an AMQP queue, each paired with the
/// [`AMQPAcker`] for its delivery.
#[pin_project]
pub struct AMQPConsumer<PayloadTarget> {
    /// The underlying lapin consumer that yields raw deliveries.
    #[pin]
    inner: lapin::Consumer,
    /// Serializer used to decode each delivery's bytes into a `PayloadTarget`.
    serializer: Serializer,
    /// Ties the consumer to its payload type without storing a value of it.
    _phantom: std::marker::PhantomData<PayloadTarget>,
}
511
512impl<PayloadTarget: Serializable> Stream for AMQPConsumer<PayloadTarget> {
513    type Item = (PayloadTarget, AMQPAcker);
514
515    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
516        let this = self.project();
517        match this.inner.poll_next(cx) {
518            Poll::Ready(Some(Ok(delivery))) => {
519                let payload = this.serializer.from_bytes(&delivery.data);
520                match payload {
521                    Ok(payload) => Poll::Ready(Some((payload, AMQPAcker { delivery }))),
522                    Err(err) => {
523                        error!("Error deserializing message, error: {err}");
524                        Poll::Pending
525                    }
526                }
527            }
528            Poll::Ready(Some(Err(err))) => {
529                let err = anyhow::Error::from(err);
530                error!("Error receiving message, error: {err}",);
531                Poll::Pending
532            }
533            Poll::Ready(None) => Poll::Ready(None),
534            Poll::Pending => Poll::Pending,
535        }
536    }
537}
538
#[derive(Debug)]
/// An acker for an AMQP consumer.
///
/// It wraps one delivery and acknowledges or rejects that delivery.
pub struct AMQPAcker {
    /// The delivery that `ack` and `nack` act on.
    delivery: lapin::message::Delivery,
}
544
545#[async_trait]
546impl Acker for AMQPAcker {
547    async fn ack(&self) -> Result<()> {
548        Ok(self.delivery.ack(Default::default()).await?)
549    }
550
551    async fn nack(&self) -> Result<()> {
552        Ok(self.delivery.nack(Default::default()).await?)
553    }
554}