rabbit_auto/
stream_builder.rs

1// #[cfg(feature = "async_std_runtime")]
2// use async_std::sync::RwLock;
3use futures::{StreamExt};
4use crate::auto_ack::{AutoAck, auto_ack};
5use lapin::{Channel, Consumer};
6use crate::consumer::ConsumerWrapper;
7use anyhow::Result;
8use lapin::acker::Acker;
9use lapin::message::Delivery;
10use lapin::options::QueueDeleteOptions;
11use crate::comms::{RabbitDispatcher};
12
13#[derive(Default)]
14pub struct StreamBuilderWithName<Q, T, K, E> {
15    /// Gives this name to the queue, or the default would be the `routing_key`.
16    pub queue_name: Option<Q>,
17    /// The tag of the consumer
18    pub tag: T,
19    /// The exchange where to bind this queue
20    pub exchange: E,
21    /// The routing key of the consumer
22    pub routing_key: K,
23    /// When creating the queue, delete the old if it is not compatible
24    pub incompatible_delete: bool,
25    pub qos: Option<(u16, Option<lapin::options::BasicQosOptions>)>,
26    pub declare: Option<lapin::options::QueueDeclareOptions>,
27    pub declare_fields: Option<lapin::types::FieldTable>,
28    pub binding: Option<lapin::options::QueueBindOptions>,
29    pub binding_fields: Option<lapin::types::FieldTable>,
30    pub consume: Option<lapin::options::BasicConsumeOptions>,
31    pub consume_fields: Option<lapin::types::FieldTable>,
32}
33
34#[async_trait::async_trait]
35impl<Q, T, K, E> RabbitDispatcher for StreamBuilderWithName<Q, T, K, E>
36    where T: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
37          K: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
38          E: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
39          Q: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
40{
41    type Object = Consumer;
42
43    async fn dispatcher(&self, channel: &Channel) -> Result<Self::Object> {
44        let consumer = {
45            if let Some((qos, options)) = self.qos {
46                channel.basic_qos(qos,
47                                  options.unwrap_or_else(|| lapin::options::BasicQosOptions::default())
48                ).await?;
49            }
50            let queue_name = if let Some(queue_name) = self.queue_name.as_ref() {
51                queue_name.as_ref()
52            } else {
53                self.routing_key.as_ref()
54            };
55            if let Err(err) = channel.queue_declare(queue_name,
56                                                    self.declare
57                                                        .unwrap_or_else(|| lapin::options::QueueDeclareOptions::default()),
58                                                    self.declare_fields.clone().unwrap_or_else(|| lapin::types::FieldTable::default()),
59            ).await {
60                log::warn!("Deleting incompatible queue: {}", err);
61                if self.incompatible_delete {
62                    let msgs = channel.queue_delete(queue_name, QueueDeleteOptions {
63                        if_unused: false,
64                        if_empty: false,
65                        nowait: true,
66                    }).await?;
67                    log::warn!("Incompatible queue deleted");
68                    if msgs > 0 {
69                        log::warn!("Deleting the previous queue purged {} messages", msgs);
70                    }
71                    Err(err)?;
72                } else {
73                    Err(err)?;
74                }
75            }
76            log::trace!("Queue declared, binding it");
77
78            channel
79                .queue_bind(
80                    queue_name,
81                    self.exchange.as_ref(),
82                    self.routing_key.as_ref(),
83                    self.binding.unwrap_or_else(|| lapin::options::QueueBindOptions::default()),
84                    self.binding_fields.clone().unwrap_or_else(|| lapin::types::FieldTable::default()),
85                )
86                .await?;
87            channel
88                .basic_consume(
89                    queue_name,
90                    self.tag.as_ref(),
91                    self.consume.unwrap_or_else(|| lapin::options::BasicConsumeOptions::default()),
92                    self.consume_fields.clone().unwrap_or_else(|| lapin::types::FieldTable::default()),
93                )
94                .await?
95        };
96        Ok(consumer)
97    }
98}
99
100pub trait Deserialise: Sized {
101    fn deserialise(data: Vec<u8>) -> Result<Self>;
102}
103
104
105pub mod queue_options;
106
107impl<Q, T,K,E> StreamBuilderWithName<Q, T, K, E>
108    where T: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
109          K: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
110          E: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
111          Q: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
112{
113    /// Creates a consumer which returns channel and the delivery.
114    pub async fn create_plain(self) -> impl StreamExt<Item = Delivery> + Unpin + Send {
115        ConsumerWrapper::new(Box::new(self)).await
116    }
117
118    /// Creates a consumer which returns an acker and the deserialised item
119    pub async fn create<I: Deserialise>(self) -> impl StreamExt<Item = (Acker, Result<I>)> + Unpin + Send {
120        let consumer = self.create_plain().await;
121        consumer.map(| delivery| (delivery.acker, I::deserialise(delivery.data)))
122    }
123
124    /// Creates a consumer which returns autoack and the item
125    pub async fn create_auto_ack<I: Deserialise>(self) -> impl StreamExt<Item = (AutoAck, Result<I>)> + Unpin + Send {
126        let consumer = self.create_plain().await;
127        auto_ack(consumer).map(|(ack, delivery)| (ack, I::deserialise(delivery)))
128    }
129}
130
131
132#[derive(Clone, Default)]
133pub struct StreamBuilder<T, K, E> {
134    /// The tag of the consumer
135    pub tag: T,
136    /// The exchange where to bind this queue
137    pub exchange: E,
138    /// The routing key of the consumer
139    pub routing_key: K,
140    /// When creating the queue, delete the old if it is not compatible
141    pub incompatible_delete: bool,
142    pub qos: Option<(u16, Option<lapin::options::BasicQosOptions>)>,
143    pub declare: Option<lapin::options::QueueDeclareOptions>,
144    pub declare_fields: Option<lapin::types::FieldTable>,
145    pub binding: Option<lapin::options::QueueBindOptions>,
146    pub binding_fields: Option<lapin::types::FieldTable>,
147    pub consume: Option<lapin::options::BasicConsumeOptions>,
148    pub consume_fields: Option<lapin::types::FieldTable>,
149}
150
151impl<T, K, E> StreamBuilder<T, K, E> {
152    fn into_with_name(self) -> StreamBuilderWithName<&'static str, T, K, E> {
153        StreamBuilderWithName::<&'static str, T, K, E> {
154            tag: self.tag,
155            exchange: self.exchange,
156            routing_key: self.routing_key,
157            queue_name: None,
158            incompatible_delete: self.incompatible_delete,
159            qos: self.qos,
160            declare: self.declare,
161            declare_fields: self.declare_fields,
162            binding: self.binding,
163            binding_fields: self.binding_fields,
164            consume: self.consume,
165            consume_fields: self.consume_fields,
166        }
167    }
168}
169
170
171impl<T, K, E> StreamBuilder<T, K, E>
172    where T: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
173          K: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
174          E: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
175{
176    /// Creates a consumer which returns channel and the delivery.
177    pub async fn create_plain(self) -> impl StreamExt<Item = Delivery> + Unpin + Send {
178            self.into_with_name().create_plain().await
179    }
180
181    /// Creates a consumer which returns an acker and the deserialised item
182    pub async fn create<I: Deserialise>(self) -> impl StreamExt<Item = (Acker, Result<I>)> + Unpin + Send {
183        let consumer = self.create_plain().await;
184        consumer.map(| delivery| (delivery.acker, I::deserialise(delivery.data)))
185    }
186
187    /// Creates a consumer which returns autoack and the item
188    pub async fn create_auto_ack<I: Deserialise>(self) -> impl StreamExt<Item = (AutoAck, Result<I>)> + Unpin + Send {
189        let consumer = self.create_plain().await;
190        auto_ack(consumer).map(|(ack, delivery)| (ack, I::deserialise(delivery)))
191    }
192}