1use crate::domain::config::QoSConfig;
2use crate::{
3 api::connection::AsyncConnection,
4 domain::config::Config,
5 errors::AppError,
6};
7use std::error::Error as StdError;
8use std::future::Future;
9use std::sync::Arc;
10use tokio::time::Duration;
11use std::pin::Pin;
12use crate::api::utils::{Confirmations, ContentEncoding, DeliveryMode, Message, QueueOptions};
13
14#[derive(Clone)]
15pub struct AsyncEventbusRabbitMQ {
16 config: Arc<Config>,
17 pub_connection: AsyncConnection,
18 sub_connection: AsyncConnection,
19 rpc_client_connection: AsyncConnection,
20 rpc_server_connection: AsyncConnection,
21}
22
23impl AsyncEventbusRabbitMQ {
24 pub fn new(config: Config, qos_config: QoSConfig) -> Self {
25 let config = Arc::new(config);
26 Self {
27 config: Arc::clone(&config),
28 pub_connection: AsyncConnection::new(Arc::clone(&config), if qos_config.pub_confirm { Confirmations::PublisherConfirms } else { Confirmations::Disables }, false, None),
29 sub_connection: AsyncConnection::new(Arc::clone(&config), Confirmations::Disables, qos_config.sub_auto_ack, qos_config.sub_prefetch),
30 rpc_client_connection: AsyncConnection::new(Arc::clone(&config), if qos_config.rpc_client_confirm { Confirmations::RPCClientPublisherConfirms } else { Confirmations::Disables }, qos_config.rpc_client_auto_ack, qos_config.rpc_client_prefetch),
31 rpc_server_connection: AsyncConnection::new(Arc::clone(&config), if qos_config.rpc_server_confirm { Confirmations::RPCServerPublisherConfirms } else { Confirmations::Disables }, qos_config.rpc_server_auto_ack, qos_config.rpc_server_prefetch),
32 }
33 }
34
35 pub async fn update_secret(&self, new_secret: &str, reason: &str, command_timeout: Option<Duration>) -> Result<(), AppError> {
36 tokio::try_join!(
37 self.pub_connection.update_secret(new_secret, reason, command_timeout),
38 self.sub_connection.update_secret(new_secret, reason, command_timeout),
39 self.rpc_client_connection.update_secret(new_secret, reason, command_timeout),
40 self.rpc_server_connection.update_secret(new_secret, reason, command_timeout)
41 )?;
42 Ok(())
43 }
44 pub async fn publish(
45 &self,
46 exchange_name: &str,
47 routing_key: &str,
48 body: impl Into<Vec<u8>>,
49 content_type: Option<&str>,
50 content_encoding: ContentEncoding,
51 command_timeout: Option<Duration>,
52 delivery_mode: Option<DeliveryMode>,
53 expiration: Option<u32>,
54 ) -> Result<(), AppError> {
55 let content_type = content_type.unwrap_or("application/json");
56 let delivery_mode = delivery_mode.unwrap_or(DeliveryMode::Transient);
57 let command_timeout = command_timeout.or(Some(Duration::from_secs(16)));
58
59 self.pub_connection.publish(
60 exchange_name,
61 routing_key,
62 body,
63 content_type,
64 content_encoding,
65 command_timeout,
66 delivery_mode,
67 expiration,
68 ).await
69 }
70
71 pub async fn subscribe<F, Fut>(
72 &self,
73 exchange_name: &str,
74 routing_key: &str,
75 handler: F,
76 process_timeout: Option<Duration>,
77 command_timeout: Option<Duration>,
78 ) -> Result<(), AppError>
79 where
80 F: Fn(Message) -> Fut + Send + Sync + 'static,
81 Fut: Future<Output = Result<(), Box<dyn StdError + Send + Sync>>> + Send + 'static,
82 {
83 let command_timeout = command_timeout.or(Some(Duration::from_secs(16)));
84 let queue_name = &self.config.options.queue_name;
85 let exchange_type = "topic";
86
87 let handler = Arc::new(move |data| {
88 Box::pin(handler(data)) as Pin<Box<dyn Future<Output = Result<(), Box<dyn StdError + Send + Sync>>> + Send>>
89 });
90 let queue_options = QueueOptions::new()
91 .auto_delete(false)
92 .durable(true)
93 .exclusive(false)
94 .no_create(false);
95
96 self.sub_connection.subscribe(
97 handler,
98 routing_key,
99 exchange_name,
100 exchange_type,
101 queue_name,
102 process_timeout,
103 command_timeout,
104 queue_options
105 ).await
106 }
107
108 pub async fn rpc_client(
109 &self,
110 exchange_name: &str,
111 routing_key: &str,
112 body: impl Into<Vec<u8>>,
113 content_type: &str,
114 content_encoding: ContentEncoding,
115 response_timeout_millis: u32,
116 command_timeout: Option<Duration>,
117 delivery_mode: Option<DeliveryMode>,
118 expiration: Option<u32>
119 ) -> Result<Vec<u8>, AppError>
120 {
121 let command_timeout = command_timeout.or(Some(Duration::from_secs(32)));
122 let delivery_mode = delivery_mode.unwrap_or(DeliveryMode::Transient);
123
124 self.rpc_client_connection.rpc_client(
125 exchange_name,
126 routing_key,
127 body,
128 content_type,
129 content_encoding,
130 response_timeout_millis,
131 command_timeout,
132 delivery_mode,
133 expiration,
134 ).await
135 }
136
137 pub async fn provide_resource<F, Fut>(
138 &self,
139 routing_key: &str,
140 handler: F,
141 process_timeout: Option<Duration>,
142 command_timeout: Option<Duration>,
143 ) -> Result<(), AppError>
144 where
145 F: Fn(Message) -> Fut + Send + Sync + 'static,
146 Fut: Future<Output = Result<Message, Box<dyn StdError + Send + Sync>>> + Send + 'static,
147 {
148 let command_timeout = command_timeout.or(Some(Duration::from_secs(16)));
149 let queue_name = &self.config.options.rpc_queue_name;
150 let exchange_name = &self.config.options.rpc_exchange_name;
151 let exchange_type = "topic";
152
153 let handler = Arc::new(move |data| {
154 Box::pin(handler(data)) as Pin<Box<dyn Future<Output = Result<Message, Box<dyn StdError + Send + Sync>>> + Send>>
155 });
156
157 let queue_options = QueueOptions::new()
158 .auto_delete(false)
159 .durable(true)
160 .exclusive(false)
161 .no_create(false);
162
163 self.rpc_server_connection.rpc_server(
164 handler,
165 routing_key,
166 exchange_name,
167 exchange_type,
168 queue_name,
169 process_timeout,
170 command_timeout,
171 queue_options
172 ).await
173 }
174
175 pub async fn dispose(&self) -> Result<(), Box<dyn std::error::Error>> {
176 self.sub_connection.close().await?;
177 self.rpc_server_connection.close().await?;
178 self.pub_connection.close().await?;
179 self.rpc_client_connection.close().await?;
180 Ok(())
181 }
182}