1use futures::{StreamExt};
4use crate::auto_ack::{AutoAck, auto_ack};
5use lapin::{Channel, Consumer};
6use crate::consumer::ConsumerWrapper;
7use anyhow::Result;
8use lapin::acker::Acker;
9use lapin::message::Delivery;
10use lapin::options::QueueDeleteOptions;
11use crate::comms::{RabbitDispatcher};
12
13#[derive(Default)]
14pub struct StreamBuilderWithName<Q, T, K, E> {
15 pub queue_name: Option<Q>,
17 pub tag: T,
19 pub exchange: E,
21 pub routing_key: K,
23 pub incompatible_delete: bool,
25 pub qos: Option<(u16, Option<lapin::options::BasicQosOptions>)>,
26 pub declare: Option<lapin::options::QueueDeclareOptions>,
27 pub declare_fields: Option<lapin::types::FieldTable>,
28 pub binding: Option<lapin::options::QueueBindOptions>,
29 pub binding_fields: Option<lapin::types::FieldTable>,
30 pub consume: Option<lapin::options::BasicConsumeOptions>,
31 pub consume_fields: Option<lapin::types::FieldTable>,
32}
33
34#[async_trait::async_trait]
35impl<Q, T, K, E> RabbitDispatcher for StreamBuilderWithName<Q, T, K, E>
36 where T: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
37 K: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
38 E: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
39 Q: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
40{
41 type Object = Consumer;
42
43 async fn dispatcher(&self, channel: &Channel) -> Result<Self::Object> {
44 let consumer = {
45 if let Some((qos, options)) = self.qos {
46 channel.basic_qos(qos,
47 options.unwrap_or_else(|| lapin::options::BasicQosOptions::default())
48 ).await?;
49 }
50 let queue_name = if let Some(queue_name) = self.queue_name.as_ref() {
51 queue_name.as_ref()
52 } else {
53 self.routing_key.as_ref()
54 };
55 if let Err(err) = channel.queue_declare(queue_name,
56 self.declare
57 .unwrap_or_else(|| lapin::options::QueueDeclareOptions::default()),
58 self.declare_fields.clone().unwrap_or_else(|| lapin::types::FieldTable::default()),
59 ).await {
60 log::warn!("Deleting incompatible queue: {}", err);
61 if self.incompatible_delete {
62 let msgs = channel.queue_delete(queue_name, QueueDeleteOptions {
63 if_unused: false,
64 if_empty: false,
65 nowait: true,
66 }).await?;
67 log::warn!("Incompatible queue deleted");
68 if msgs > 0 {
69 log::warn!("Deleting the previous queue purged {} messages", msgs);
70 }
71 Err(err)?;
72 } else {
73 Err(err)?;
74 }
75 }
76 log::trace!("Queue declared, binding it");
77
78 channel
79 .queue_bind(
80 queue_name,
81 self.exchange.as_ref(),
82 self.routing_key.as_ref(),
83 self.binding.unwrap_or_else(|| lapin::options::QueueBindOptions::default()),
84 self.binding_fields.clone().unwrap_or_else(|| lapin::types::FieldTable::default()),
85 )
86 .await?;
87 channel
88 .basic_consume(
89 queue_name,
90 self.tag.as_ref(),
91 self.consume.unwrap_or_else(|| lapin::options::BasicConsumeOptions::default()),
92 self.consume_fields.clone().unwrap_or_else(|| lapin::types::FieldTable::default()),
93 )
94 .await?
95 };
96 Ok(consumer)
97 }
98}
99
/// Conversion from a raw byte payload into a typed value.
///
/// The stream constructors in this file call `deserialise` on the data of
/// each delivery they receive.
pub trait Deserialise: Sized {
    /// Builds `Self` from the owned bytes in `data`, or returns an error
    /// when the bytes cannot be converted.
    fn deserialise(data: Vec<u8>) -> Result<Self>;
}
103
104
105pub mod queue_options;
106
107impl<Q, T,K,E> StreamBuilderWithName<Q, T, K, E>
108 where T: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
109 K: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
110 E: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
111 Q: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
112{
113 pub async fn create_plain(self) -> impl StreamExt<Item = Delivery> + Unpin + Send {
115 ConsumerWrapper::new(Box::new(self)).await
116 }
117
118 pub async fn create<I: Deserialise>(self) -> impl StreamExt<Item = (Acker, Result<I>)> + Unpin + Send {
120 let consumer = self.create_plain().await;
121 consumer.map(| delivery| (delivery.acker, I::deserialise(delivery.data)))
122 }
123
124 pub async fn create_auto_ack<I: Deserialise>(self) -> impl StreamExt<Item = (AutoAck, Result<I>)> + Unpin + Send {
126 let consumer = self.create_plain().await;
127 auto_ack(consumer).map(|(ack, delivery)| (ack, I::deserialise(delivery)))
128 }
129}
130
131
/// Configuration for a consumer stream whose queue has no explicit name.
///
/// It is converted into a `StreamBuilderWithName` with `queue_name: None`,
/// so the queue is named after `routing_key`. Every `Option` setting falls
/// back to the corresponding lapin default when left as `None`.
#[derive(Clone, Default)]
pub struct StreamBuilder<T, K, E> {
    /// Consumer tag passed to `basic_consume`.
    pub tag: T,
    /// Exchange the queue is bound to.
    pub exchange: E,
    /// Routing key of the binding; also used as the queue name.
    pub routing_key: K,
    /// When `true`, a queue whose declaration fails is deleted before the
    /// declaration error is returned to the caller.
    pub incompatible_delete: bool,
    /// Optional `basic_qos` settings: the `u16` is passed as the first
    /// argument (lapin's prefetch count) together with the optional options.
    /// `basic_qos` is not called at all when this is `None`.
    pub qos: Option<(u16, Option<lapin::options::BasicQosOptions>)>,
    /// Options passed to `queue_declare`.
    pub declare: Option<lapin::options::QueueDeclareOptions>,
    /// Argument table passed to `queue_declare`.
    pub declare_fields: Option<lapin::types::FieldTable>,
    /// Options passed to `queue_bind`.
    pub binding: Option<lapin::options::QueueBindOptions>,
    /// Argument table passed to `queue_bind`.
    pub binding_fields: Option<lapin::types::FieldTable>,
    /// Options passed to `basic_consume`.
    pub consume: Option<lapin::options::BasicConsumeOptions>,
    /// Argument table passed to `basic_consume`.
    pub consume_fields: Option<lapin::types::FieldTable>,
}
150
151impl<T, K, E> StreamBuilder<T, K, E> {
152 fn into_with_name(self) -> StreamBuilderWithName<&'static str, T, K, E> {
153 StreamBuilderWithName::<&'static str, T, K, E> {
154 tag: self.tag,
155 exchange: self.exchange,
156 routing_key: self.routing_key,
157 queue_name: None,
158 incompatible_delete: self.incompatible_delete,
159 qos: self.qos,
160 declare: self.declare,
161 declare_fields: self.declare_fields,
162 binding: self.binding,
163 binding_fields: self.binding_fields,
164 consume: self.consume,
165 consume_fields: self.consume_fields,
166 }
167 }
168}
169
170
171impl<T, K, E> StreamBuilder<T, K, E>
172 where T: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
173 K: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
174 E: AsRef<str> + Send + Unpin + 'static + Clone + Sync,
175{
176 pub async fn create_plain(self) -> impl StreamExt<Item = Delivery> + Unpin + Send {
178 self.into_with_name().create_plain().await
179 }
180
181 pub async fn create<I: Deserialise>(self) -> impl StreamExt<Item = (Acker, Result<I>)> + Unpin + Send {
183 let consumer = self.create_plain().await;
184 consumer.map(| delivery| (delivery.acker, I::deserialise(delivery.data)))
185 }
186
187 pub async fn create_auto_ack<I: Deserialise>(self) -> impl StreamExt<Item = (AutoAck, Result<I>)> + Unpin + Send {
189 let consumer = self.create_plain().await;
190 auto_ack(consumer).map(|(ack, delivery)| (ack, I::deserialise(delivery)))
191 }
192}