1use std::{
46 pin::Pin,
47 sync::Arc,
48 task::{Context, Poll},
49};
50
51use anyhow::Result;
52use async_trait::async_trait;
53use dashmap::{mapref::entry::Entry, DashMap};
54use futures::Stream;
55use lapin::{options::QueueDeleteOptions, types::FieldTable, ExchangeKind};
56use pin_project::pin_project;
57use tracing::{error, instrument};
58
59use super::{
60 Connection, DeliveryMode, Publisher, QueueDurability, QueueHandle, QueueOptions,
61 SyndicationMode,
62};
63use crate::{
64 acker::Acker,
65 serializer::{Serializable, Serializer},
66};
67
68impl DeliveryMode {
69 fn into_message_properties(self) -> lapin::BasicProperties {
70 match self {
71 DeliveryMode::Persistent => lapin::BasicProperties::default().with_delivery_mode(2),
72 DeliveryMode::Ephemeral => lapin::BasicProperties::default(),
73 }
74 }
75}
76
77impl QueueDurability {
78 fn into_queue_declare_options(self) -> lapin::options::QueueDeclareOptions {
79 match self {
80 QueueDurability::NonDurable => lapin::options::QueueDeclareOptions::default(),
81 QueueDurability::Durable => lapin::options::QueueDeclareOptions {
82 durable: true,
83 ..Default::default()
84 },
85 }
86 }
87
88 fn into_exchange_declare_options(self) -> lapin::options::ExchangeDeclareOptions {
89 match self {
90 QueueDurability::NonDurable => lapin::options::ExchangeDeclareOptions::default(),
91 QueueDurability::Durable => lapin::options::ExchangeDeclareOptions {
92 durable: true,
93 ..Default::default()
94 },
95 }
96 }
97}
98
99impl QueueOptions {
100 fn into_declare_arguments(self) -> FieldTable {
101 let mut arguments = FieldTable::default();
102 if let QueueDurability::NonDurable = self.durability {
103 arguments.insert("x-expires".into(), 1800000.into());
105 }
106 arguments
107 }
108
109 fn into_queue_declare_options(self) -> lapin::options::QueueDeclareOptions {
110 self.durability.into_queue_declare_options()
111 }
112
113 fn into_exchange_declare_options(self) -> lapin::options::ExchangeDeclareOptions {
114 self.durability.into_exchange_declare_options()
115 }
116
117 fn into_message_properties(self) -> lapin::BasicProperties {
118 self.delivery_mode.into_message_properties()
119 }
120}
121
122pub struct AMQPConnectionOptions<'a> {
124 pub uri: &'a str,
126 pub qos: Option<u16>,
133 pub serializer: Serializer,
134}
135
136#[derive(Clone, Debug)]
156pub struct AMQPConnection {
157 channel: lapin::Channel,
158 connection: Arc<lapin::Connection>,
159 serializer: Serializer,
160 queue_options: DashMap<String, QueueOptions>,
161}
162
163impl AMQPConnection {
164 pub async fn new(
166 AMQPConnectionOptions {
167 uri,
168 qos,
169 serializer,
170 }: AMQPConnectionOptions<'_>,
171 ) -> Result<Self> {
172 let options = lapin::ConnectionProperties::default()
173 .with_executor(tokio_executor_trait::Tokio::current())
174 .with_reactor(tokio_reactor_trait::Tokio);
175
176 let connection = lapin::Connection::connect(uri, options).await?;
177 let channel = connection.create_channel().await?;
178
179 channel
180 .basic_qos(qos.unwrap_or(1), Default::default())
181 .await?;
182
183 Ok(AMQPConnection {
184 channel,
185 connection: Arc::new(connection),
186 serializer,
187 queue_options: DashMap::new(),
188 })
189 }
190
191 async fn declare_broadcast(
192 &self,
193 name: &str,
194 options: QueueOptions,
195 ) -> Result<<AMQPConnection as Connection>::QueueHandle> {
196 FieldTable::default().insert("x-expires".into(), 60000.into());
197 self.channel
198 .exchange_declare(
199 name,
200 ExchangeKind::Fanout,
201 options.into_exchange_declare_options(),
202 options.into_declare_arguments(),
203 )
204 .await?;
205
206 self.channel
207 .queue_declare(
208 name,
209 options.into_queue_declare_options(),
210 options.into_declare_arguments(),
211 )
212 .await?;
213
214 self.channel
215 .queue_bind(
216 name,
217 name,
218 "", Default::default(),
220 Default::default(),
221 )
222 .await?;
223
224 Ok(AMQPQueueHandle {
225 channel: self.channel.clone(),
226 name: name.to_string(),
227 serializer: self.serializer,
228 options,
229 })
230 }
231
232 async fn delete_broadcast(&self, name: &str) -> Result<()> {
233 self.channel
234 .queue_delete(name, QueueDeleteOptions::default())
235 .await?;
236
237 self.channel
238 .exchange_delete(name, Default::default())
239 .await?;
240
241 Ok(())
242 }
243
244 async fn declare_single(
245 &self,
246 name: &str,
247 options: QueueOptions,
248 ) -> Result<<AMQPConnection as Connection>::QueueHandle> {
249 self.channel
250 .queue_declare(
251 name,
252 options.into_queue_declare_options(),
253 options.into_declare_arguments(),
254 )
255 .await?;
256
257 Ok(AMQPQueueHandle {
258 channel: self.channel.clone(),
259 name: name.to_string(),
260 serializer: self.serializer,
261 options,
262 })
263 }
264
265 async fn delete_single(&self, name: &str) -> Result<()> {
266 self.channel
267 .queue_delete(name, QueueDeleteOptions::default())
268 .await?;
269
270 Ok(())
271 }
272}
273
274#[async_trait]
275impl Connection for AMQPConnection {
276 type QueueHandle = AMQPQueueHandle;
277
278 async fn close(&self) -> Result<()> {
279 _ = self.channel.close(200, "Goodbye").await;
280 _ = self.connection.close(200, "Goodbye").await;
281
282 Ok(())
283 }
284
285 async fn declare_queue(&self, name: &str, options: QueueOptions) -> Result<Self::QueueHandle> {
304 match options.syndication_mode {
305 SyndicationMode::ExactlyOnce => self.declare_single(name, options).await,
306 SyndicationMode::Broadcast => self.declare_broadcast(name, options).await,
307 }
308 }
309
310 async fn delete_queue(&self, name: &str) -> Result<()> {
311 match self.queue_options.entry(name.to_string()) {
312 Entry::Occupied(options) => {
313 match options.get().syndication_mode {
314 SyndicationMode::ExactlyOnce => self.delete_single(name).await?,
315 SyndicationMode::Broadcast => self.delete_broadcast(name).await?,
316 };
317
318 options.remove();
319
320 Ok(())
321 }
322 Entry::Vacant(_) => self.delete_broadcast(name).await,
323 }
324 }
325}
326
327impl AMQPQueueHandle {
328 async fn publish_broadcast<PayloadTarget: Serializable>(
329 &self,
330 payload: &PayloadTarget,
331 ) -> Result<()> {
332 self.channel
333 .basic_publish(
334 &self.name,
335 "", Default::default(),
337 &self.serializer.to_bytes(payload)?,
338 self.options.into_message_properties(),
339 )
340 .await?
341 .await?;
342
343 Ok(())
344 }
345
346 async fn publish_single<PayloadTarget: Serializable>(
347 &self,
348 payload: &PayloadTarget,
349 ) -> Result<()> {
350 self.channel
351 .basic_publish(
352 "",
353 &self.name,
354 Default::default(),
355 &self.serializer.to_bytes(payload)?,
356 self.options.into_message_properties(),
357 )
358 .await?
359 .await?;
360
361 Ok(())
362 }
363}
364
365#[derive(Clone)]
367pub struct AMQPQueueHandle {
368 channel: lapin::Channel,
369 name: String,
370 serializer: Serializer,
371 options: QueueOptions,
372}
373
374pub struct AMQPPublisher<T> {
375 queue_handle: AMQPQueueHandle,
376 _phantom: std::marker::PhantomData<T>,
377}
378
379impl<T> AMQPPublisher<T> {
380 pub fn new(queue_handle: AMQPQueueHandle) -> Self {
381 Self {
382 queue_handle,
383 _phantom: std::marker::PhantomData,
384 }
385 }
386}
387
388#[async_trait]
389impl<T: Serializable> Publisher<T> for AMQPPublisher<T> {
390 #[instrument(skip_all, level = "trace")]
428 async fn publish(&self, payload: &T) -> Result<()> {
429 match self.queue_handle.options.syndication_mode {
430 SyndicationMode::ExactlyOnce => self.queue_handle.publish_single(payload).await,
431 SyndicationMode::Broadcast => self.queue_handle.publish_broadcast(payload).await,
432 }
433 }
434
435 async fn close(&self) -> Result<()> {
436 Ok(())
437 }
438}
439
440#[async_trait]
441impl QueueHandle for AMQPQueueHandle {
442 type Acker = AMQPAcker;
443 type Consumer<PayloadTarget: Serializable> = AMQPConsumer<PayloadTarget>;
444 type Publisher<PayloadTarget: Serializable> = AMQPPublisher<PayloadTarget>;
445
446 fn publisher<PayloadTarget: Serializable>(&self) -> Self::Publisher<PayloadTarget> {
447 AMQPPublisher::new(self.clone())
448 }
449
450 #[instrument(skip(self), level = "trace")]
482 async fn declare_consumer<PayloadTarget: Serializable>(
483 &self,
484 consumer_name: &str,
485 ) -> Result<Self::Consumer<PayloadTarget>> {
486 let consumer = self
487 .channel
488 .basic_consume(
489 &self.name,
490 consumer_name,
491 Default::default(),
492 Default::default(),
493 )
494 .await?;
495
496 Ok(AMQPConsumer {
497 inner: consumer,
498 serializer: self.serializer,
499 _phantom: std::marker::PhantomData,
500 })
501 }
502}
503
504#[pin_project]
505pub struct AMQPConsumer<PayloadTarget> {
506 #[pin]
507 inner: lapin::Consumer,
508 serializer: Serializer,
509 _phantom: std::marker::PhantomData<PayloadTarget>,
510}
511
512impl<PayloadTarget: Serializable> Stream for AMQPConsumer<PayloadTarget> {
513 type Item = (PayloadTarget, AMQPAcker);
514
515 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
516 let this = self.project();
517 match this.inner.poll_next(cx) {
518 Poll::Ready(Some(Ok(delivery))) => {
519 let payload = this.serializer.from_bytes(&delivery.data);
520 match payload {
521 Ok(payload) => Poll::Ready(Some((payload, AMQPAcker { delivery }))),
522 Err(err) => {
523 error!("Error deserializing message, error: {err}");
524 Poll::Pending
525 }
526 }
527 }
528 Poll::Ready(Some(Err(err))) => {
529 let err = anyhow::Error::from(err);
530 error!("Error receiving message, error: {err}",);
531 Poll::Pending
532 }
533 Poll::Ready(None) => Poll::Ready(None),
534 Poll::Pending => Poll::Pending,
535 }
536 }
537}
538
539#[derive(Debug)]
540pub struct AMQPAcker {
542 delivery: lapin::message::Delivery,
543}
544
545#[async_trait]
546impl Acker for AMQPAcker {
547 async fn ack(&self) -> Result<()> {
548 Ok(self.delivery.ack(Default::default()).await?)
549 }
550
551 async fn nack(&self) -> Result<()> {
552 Ok(self.delivery.nack(Default::default()).await?)
553 }
554}