1use crate::api::connection_manager::ConnectionCommand;
2use crate::domain::config::Config;
3use crate::{
4 api::{
5 connection_manager::ConnectionManager,
6 utils::{
7 Confirmations, ContentEncoding, DeliveryMode, Handler,
8 QueueOptions, RPCHandler, compress,
9 },
10 },
11 errors::{AppError, AppErrorType},
12};
13use std::{
14 sync::{
15 Arc,
16 atomic::{AtomicBool, Ordering},
17 },
18};
19use tokio::{
20 sync::{mpsc, oneshot},
21 time::{Duration, timeout},
22};
23
24
25#[derive(Clone)]
27pub struct AsyncConnection {
28 sender: mpsc::UnboundedSender<ConnectionCommand>,
29 publisher_confirms: Confirmations,
30 is_closing: Arc<AtomicBool>,
31}
32
33impl AsyncConnection {
34 pub fn new(
35 config: Arc<Config>,
36 publisher_confirms: Confirmations,
37 auto_ack: bool,
38 prefetch_count: Option<u16>,
39 ) -> Self {
40 let (tx, rx) = mpsc::unbounded_channel();
41
42 let manager = ConnectionManager::new(
43 config,
44 tx.clone(),
45 rx,
46 publisher_confirms,
47 auto_ack,
48 prefetch_count,
49 );
50 tokio::spawn(async move {
51 manager.run().await;
52 });
53 Self {
54 sender: tx,
55 publisher_confirms,
56 is_closing: Arc::new(AtomicBool::new(false)),
57 }
58 }
59
60 pub async fn publish(
61 &self,
62 exchange_name: &str,
63 routing_key: &str,
64 body: impl Into<Vec<u8>>,
65 content_type: &str,
66 content_encoding: ContentEncoding,
67 command_timeout: Option<Duration>,
68 delivery_mode: DeliveryMode,
69 expiration: Option<u32>,
70 ) -> Result<(), AppError> {
71 if self.is_closing.load(Ordering::Acquire) {
72 return Err(AppError::new(
73 Some("Connection is shutting down".to_owned()),
74 None,
75 AppErrorType::InternalError, ));
77 }
78 let (resp_tx, resp_rx) = oneshot::channel();
79 let body = compress(body, content_encoding)?;
80 if self.publisher_confirms == Confirmations::PublisherConfirms {
81 let confirmation = oneshot::channel();
82
83 let cmd = ConnectionCommand::Publish {
84 exchange_name: exchange_name.to_string(),
85 routing_key: routing_key.to_string(),
86 body,
87 content_type: content_type.to_string(),
88 content_encoding,
89 delivery_mode,
90 expiration,
91 response: resp_tx,
92 confirm: Some(confirmation.0),
93 };
94 let (_, _) =
95 tokio::try_join!(self.send_command(cmd, resp_rx, command_timeout), async {
96 match timeout(
97 command_timeout.unwrap_or(Duration::from_secs(16)),
98 confirmation.1,
99 )
100 .await
101 {
102 Ok(Ok(res)) => res,
103 Ok(Err(_)) => Err(AppError::new(
104 Some("Confirm channel closed".to_owned()),
105 None,
106 AppErrorType::InternalError,
107 )),
108 Err(_) => Err(AppError::new(
109 Some("Timeout waiting for confirmation".to_owned()),
110 None,
111 AppErrorType::TimeoutError,
112 )),
113 }
114 })?;
115 Ok(())
116 } else {
117 let cmd = ConnectionCommand::Publish {
118 exchange_name: exchange_name.to_string(),
119 routing_key: routing_key.to_string(),
120 body,
121 content_type: content_type.to_string(),
122 content_encoding,
123 delivery_mode,
124 expiration,
125 response: resp_tx,
126 confirm: None,
127 };
128 self.send_command(cmd, resp_rx, command_timeout).await
129 }
130 }
131
132 pub async fn subscribe(
133 &self,
134 handler: Handler,
135 routing_key: &str,
136 exchange_name: &str,
137 exchange_type: &str,
138 queue_name: &str,
139 process_timeout: Option<Duration>,
140 timeout_duration: Option<Duration>,
141 queue_options: QueueOptions,
142 ) -> Result<(), AppError> {
143 if self.is_closing.load(Ordering::Acquire) {
144 return Err(AppError::new(
145 Some("Connection is shutting down".to_string()),
146 None,
147 AppErrorType::InternalError, ));
149 }
150 let (resp_tx, resp_rx) = oneshot::channel();
151 let cmd = ConnectionCommand::Subscribe {
152 handler,
153 routing_key: routing_key.to_string(),
154 exchange_name: exchange_name.to_string(),
155 exchange_type: exchange_type.to_string(),
156 queue_name: queue_name.to_string(),
157 response: resp_tx,
158 process_timeout,
159 queue_options,
160 };
161 self.send_command(cmd, resp_rx, timeout_duration).await
162 }
163
164 pub async fn rpc_server(
165 &self,
166 handler: RPCHandler,
167 routing_key: &str,
168 exchange_name: &str,
169 exchange_type: &str,
170 queue_name: &str,
171 response_timeout: Option<Duration>,
172 timeout_duration: Option<Duration>,
173 queue_options: QueueOptions,
174 ) -> Result<(), AppError> {
175 if self.is_closing.load(Ordering::Acquire) {
176 return Err(AppError::new(
177 Some("Connection is shutting down".to_string()),
178 None,
179 AppErrorType::InternalError,
180 ));
181 }
182 let (resp_tx, resp_rx) = oneshot::channel();
183 let cmd = ConnectionCommand::RpcServer {
184 handler,
185 routing_key: routing_key.to_string(),
186 exchange_name: exchange_name.to_string(),
187 exchange_type: exchange_type.to_string(),
188 queue_name: queue_name.to_string(),
189 response: resp_tx,
190 response_timeout,
191 queue_options,
192 };
193 self.send_command(cmd, resp_rx, timeout_duration).await
194 }
195
196 pub async fn rpc_client(
197 &self,
198 exchange_name: &str,
199 routing_key: &str,
200 body: impl Into<Vec<u8>>,
201 content_type: &str,
202 content_encoding: ContentEncoding,
203 response_timeout_millis: u32,
204 command_timeout: Option<Duration>,
205 delivery_mode: DeliveryMode,
206 expiration: Option<u32>,
207 ) -> Result<Vec<u8>, AppError> {
208 if self.is_closing.load(Ordering::Acquire) {
209 return Err(AppError::new(
210 Some("Connection is shutting down".to_string()),
211 None,
212 AppErrorType::InternalError,
213 ));
214 }
215 let (resp_tx, resp_rx) = oneshot::channel();
216 let body = compress(body.into(), content_encoding)?;
217 if self.publisher_confirms == Confirmations::RPCClientPublisherConfirms {
218 let confirmation = oneshot::channel();
219 let cmd = ConnectionCommand::RpcClient {
220 exchange_name: exchange_name.to_string(),
221 routing_key: routing_key.to_string(),
222 body,
223 content_type: content_type.to_string(),
224 content_encoding,
225 response_timeout_millis,
226 delivery_mode,
227 expiration,
228 response: resp_tx,
229 confirm: Some(confirmation.0),
230 };
231 let confirmation = async {
232 match timeout(
233 command_timeout.unwrap_or(Duration::from_secs(16)),
234 confirmation.1,
235 )
236 .await
237 {
238 Ok(Ok(res)) => res,
239 Ok(Err(_)) => Err(AppError::new(
240 Some("Confirm channel closed".to_owned()),
241 None,
242 AppErrorType::InternalError,
243 )),
244 Err(_) => Err(AppError::new(
245 Some("Timeout waiting for confirmation".to_owned()),
246 None,
247 AppErrorType::TimeoutError,
248 )),
249 }
250 };
251 let (response, _) = tokio::try_join!(
252 self.send_command(cmd, resp_rx, command_timeout),
253 confirmation
254 )?;
255 Ok(response)
256 } else {
257 let cmd = ConnectionCommand::RpcClient {
258 exchange_name: exchange_name.to_string(),
259 routing_key: routing_key.to_string(),
260 body,
261 content_type: content_type.to_string(),
262 content_encoding,
263 response_timeout_millis,
264 delivery_mode,
265 expiration,
266 response: resp_tx,
267 confirm: None,
268 };
269 self.send_command(cmd, resp_rx, command_timeout).await
270 }
271 }
272
273 pub async fn update_secret(
274 &self,
275 new_secret: &str,
276 reason: &str,
277 command_timeout: Option<Duration>,
278 ) -> Result<(), AppError> {
279 if self.is_closing.load(Ordering::Acquire) {
280 return Err(AppError::new(
281 Some("Connection is shutting down".to_string()),
282 None,
283 AppErrorType::InternalError,
284 ));
285 }
286 let (resp_tx, resp_rx) = oneshot::channel();
287 let cmd = ConnectionCommand::UpdateSecret {
288 new_secret: new_secret.to_string(),
289 reason: reason.to_string(),
290 response: resp_tx,
291 };
292 self.send_command(cmd, resp_rx, command_timeout).await
293 }
294
295 async fn send_command<T>(
296 &self,
297 cmd: ConnectionCommand,
298 rx: oneshot::Receiver<Result<T, AppError>>,
299 command_timeout: Option<Duration>,
300 ) -> Result<T, AppError> {
301 if self.sender.send(cmd).is_err() {
302 return Err(AppError::new(
303 Some("Connection manager dropped".to_string()),
304 None,
305 AppErrorType::InternalError,
306 ));
307 }
308
309 match command_timeout {
310 Some(dur) => match timeout(dur, rx).await {
311 Ok(Ok(res)) => res,
312 Ok(Err(_)) => Err(AppError::new(
313 Some("Response channel closed".to_owned()),
314 None,
315 AppErrorType::InternalError,
316 )),
317 Err(_) => Err(AppError::new(
318 Some("Timeout waiting for connection".to_owned()),
319 None,
320 AppErrorType::TimeoutError,
321 )),
322 },
323 None => match rx.await {
324 Ok(res) => res,
325 Err(_) => Err(AppError::new(
326 Some("Response channel closed".to_owned()),
327 None,
328 AppErrorType::InternalError,
329 )),
330 },
331 }
332 }
333
334 pub async fn close(&self) -> Result<(), Box<dyn std::error::Error>> {
335 self.is_closing.store(true, Ordering::Release);
336 let (tx, rx) = oneshot::channel();
337 self.sender
338 .send(ConnectionCommand::Close { response: tx })?;
339 rx.await?;
340 Ok(())
341 }
342}