Skip to main content

amqp_client_rust/api/
connection.rs

1use crate::api::connection_manager::ConnectionCommand;
2use crate::domain::config::Config;
3use crate::{
4    api::{
5        connection_manager::ConnectionManager,
6        utils::{
7            Confirmations, ContentEncoding, DeliveryMode, Handler,
8            QueueOptions, RPCHandler, compress,
9        },
10    },
11    errors::{AppError, AppErrorType},
12};
13use std::{
14    sync::{
15        Arc,
16        atomic::{AtomicBool, Ordering},
17    },
18};
19use tokio::{
20    sync::{mpsc, oneshot},
21    time::{Duration, timeout},
22};
23
24
25// The Handle exposed to the EventBus
26#[derive(Clone)]
27pub struct AsyncConnection {
28    sender: mpsc::UnboundedSender<ConnectionCommand>,
29    publisher_confirms: Confirmations,
30    is_closing: Arc<AtomicBool>,
31}
32
33impl AsyncConnection {
34    pub fn new(
35        config: Arc<Config>,
36        publisher_confirms: Confirmations,
37        auto_ack: bool,
38        prefetch_count: Option<u16>,
39    ) -> Self {
40        let (tx, rx) = mpsc::unbounded_channel();
41
42        let manager = ConnectionManager::new(
43            config,
44            tx.clone(),
45            rx,
46            publisher_confirms,
47            auto_ack,
48            prefetch_count,
49        );
50        tokio::spawn(async move {
51            manager.run().await;
52        });
53        Self {
54            sender: tx,
55            publisher_confirms,
56            is_closing: Arc::new(AtomicBool::new(false)),
57        }
58    }
59
60    pub async fn publish(
61        &self,
62        exchange_name: &str,
63        routing_key: &str,
64        body: impl Into<Vec<u8>>,
65        content_type: &str,
66        content_encoding: ContentEncoding,
67        command_timeout: Option<Duration>,
68        delivery_mode: DeliveryMode,
69        expiration: Option<u32>,
70    ) -> Result<(), AppError> {
71        if self.is_closing.load(Ordering::Acquire) {
72            return Err(AppError::new(
73                Some("Connection is shutting down".to_owned()),
74                None,
75                AppErrorType::InternalError, // Or a new ConnectionClosed type
76            ));
77        }
78        let (resp_tx, resp_rx) = oneshot::channel();
79        let body = compress(body, content_encoding)?;
80        if self.publisher_confirms == Confirmations::PublisherConfirms {
81            let confirmation = oneshot::channel();
82
83            let cmd = ConnectionCommand::Publish {
84                exchange_name: exchange_name.to_string(),
85                routing_key: routing_key.to_string(),
86                body,
87                content_type: content_type.to_string(),
88                content_encoding,
89                delivery_mode,
90                expiration,
91                response: resp_tx,
92                confirm: Some(confirmation.0),
93            };
94            let (_, _) =
95                tokio::try_join!(self.send_command(cmd, resp_rx, command_timeout), async {
96                    match timeout(
97                        command_timeout.unwrap_or(Duration::from_secs(16)),
98                        confirmation.1,
99                    )
100                    .await
101                    {
102                        Ok(Ok(res)) => res,
103                        Ok(Err(_)) => Err(AppError::new(
104                            Some("Confirm channel closed".to_owned()),
105                            None,
106                            AppErrorType::InternalError,
107                        )),
108                        Err(_) => Err(AppError::new(
109                            Some("Timeout waiting for confirmation".to_owned()),
110                            None,
111                            AppErrorType::TimeoutError,
112                        )),
113                    }
114                })?;
115            Ok(())
116        } else {
117            let cmd = ConnectionCommand::Publish {
118                exchange_name: exchange_name.to_string(),
119                routing_key: routing_key.to_string(),
120                body,
121                content_type: content_type.to_string(),
122                content_encoding,
123                delivery_mode,
124                expiration,
125                response: resp_tx,
126                confirm: None,
127            };
128            self.send_command(cmd, resp_rx, command_timeout).await
129        }
130    }
131
132    pub async fn subscribe(
133        &self,
134        handler: Handler,
135        routing_key: &str,
136        exchange_name: &str,
137        exchange_type: &str,
138        queue_name: &str,
139        process_timeout: Option<Duration>,
140        timeout_duration: Option<Duration>,
141        queue_options: QueueOptions,
142    ) -> Result<(), AppError> {
143        if self.is_closing.load(Ordering::Acquire) {
144            return Err(AppError::new(
145                Some("Connection is shutting down".to_string()),
146                None,
147                AppErrorType::InternalError, // Or a new ConnectionClosed type
148            ));
149        }
150        let (resp_tx, resp_rx) = oneshot::channel();
151        let cmd = ConnectionCommand::Subscribe {
152            handler,
153            routing_key: routing_key.to_string(),
154            exchange_name: exchange_name.to_string(),
155            exchange_type: exchange_type.to_string(),
156            queue_name: queue_name.to_string(),
157            response: resp_tx,
158            process_timeout,
159            queue_options,
160        };
161        self.send_command(cmd, resp_rx, timeout_duration).await
162    }
163
164    pub async fn rpc_server(
165        &self,
166        handler: RPCHandler,
167        routing_key: &str,
168        exchange_name: &str,
169        exchange_type: &str,
170        queue_name: &str,
171        response_timeout: Option<Duration>,
172        timeout_duration: Option<Duration>,
173        queue_options: QueueOptions,
174    ) -> Result<(), AppError> {
175        if self.is_closing.load(Ordering::Acquire) {
176            return Err(AppError::new(
177                Some("Connection is shutting down".to_string()),
178                None,
179                AppErrorType::InternalError,
180            ));
181        }
182        let (resp_tx, resp_rx) = oneshot::channel();
183        let cmd = ConnectionCommand::RpcServer {
184            handler,
185            routing_key: routing_key.to_string(),
186            exchange_name: exchange_name.to_string(),
187            exchange_type: exchange_type.to_string(),
188            queue_name: queue_name.to_string(),
189            response: resp_tx,
190            response_timeout,
191            queue_options,
192        };
193        self.send_command(cmd, resp_rx, timeout_duration).await
194    }
195
196    pub async fn rpc_client(
197        &self,
198        exchange_name: &str,
199        routing_key: &str,
200        body: impl Into<Vec<u8>>,
201        content_type: &str,
202        content_encoding: ContentEncoding,
203        response_timeout_millis: u32,
204        command_timeout: Option<Duration>,
205        delivery_mode: DeliveryMode,
206        expiration: Option<u32>,
207    ) -> Result<Vec<u8>, AppError> {
208        if self.is_closing.load(Ordering::Acquire) {
209            return Err(AppError::new(
210                Some("Connection is shutting down".to_string()),
211                None,
212                AppErrorType::InternalError,
213            ));
214        }
215        let (resp_tx, resp_rx) = oneshot::channel();
216        let body = compress(body.into(), content_encoding)?;
217        if self.publisher_confirms == Confirmations::RPCClientPublisherConfirms {
218            let confirmation = oneshot::channel();
219            let cmd = ConnectionCommand::RpcClient {
220                exchange_name: exchange_name.to_string(),
221                routing_key: routing_key.to_string(),
222                body,
223                content_type: content_type.to_string(),
224                content_encoding,
225                response_timeout_millis,
226                delivery_mode,
227                expiration,
228                response: resp_tx,
229                confirm: Some(confirmation.0),
230            };
231            let confirmation = async {
232                match timeout(
233                    command_timeout.unwrap_or(Duration::from_secs(16)),
234                    confirmation.1,
235                )
236                .await
237                {
238                    Ok(Ok(res)) => res,
239                    Ok(Err(_)) => Err(AppError::new(
240                        Some("Confirm channel closed".to_owned()),
241                        None,
242                        AppErrorType::InternalError,
243                    )),
244                    Err(_) => Err(AppError::new(
245                        Some("Timeout waiting for confirmation".to_owned()),
246                        None,
247                        AppErrorType::TimeoutError,
248                    )),
249                }
250            };
251            let (response, _) = tokio::try_join!(
252                self.send_command(cmd, resp_rx, command_timeout),
253                confirmation
254            )?;
255            Ok(response)
256        } else {
257            let cmd = ConnectionCommand::RpcClient {
258                exchange_name: exchange_name.to_string(),
259                routing_key: routing_key.to_string(),
260                body,
261                content_type: content_type.to_string(),
262                content_encoding,
263                response_timeout_millis,
264                delivery_mode,
265                expiration,
266                response: resp_tx,
267                confirm: None,
268            };
269            self.send_command(cmd, resp_rx, command_timeout).await
270        }
271    }
272
273    pub async fn update_secret(
274        &self,
275        new_secret: &str,
276        reason: &str,
277        command_timeout: Option<Duration>,
278    ) -> Result<(), AppError> {
279        if self.is_closing.load(Ordering::Acquire) {
280            return Err(AppError::new(
281                Some("Connection is shutting down".to_string()),
282                None,
283                AppErrorType::InternalError,
284            ));
285        }
286        let (resp_tx, resp_rx) = oneshot::channel();
287        let cmd = ConnectionCommand::UpdateSecret {
288            new_secret: new_secret.to_string(),
289            reason: reason.to_string(),
290            response: resp_tx,
291        };
292        self.send_command(cmd, resp_rx, command_timeout).await
293    }
294
295    async fn send_command<T>(
296        &self,
297        cmd: ConnectionCommand,
298        rx: oneshot::Receiver<Result<T, AppError>>,
299        command_timeout: Option<Duration>,
300    ) -> Result<T, AppError> {
301        if self.sender.send(cmd).is_err() {
302            return Err(AppError::new(
303                Some("Connection manager dropped".to_string()),
304                None,
305                AppErrorType::InternalError,
306            ));
307        }
308
309        match command_timeout {
310            Some(dur) => match timeout(dur, rx).await {
311                Ok(Ok(res)) => res,
312                Ok(Err(_)) => Err(AppError::new(
313                    Some("Response channel closed".to_owned()),
314                    None,
315                    AppErrorType::InternalError,
316                )),
317                Err(_) => Err(AppError::new(
318                    Some("Timeout waiting for connection".to_owned()),
319                    None,
320                    AppErrorType::TimeoutError,
321                )),
322            },
323            None => match rx.await {
324                Ok(res) => res,
325                Err(_) => Err(AppError::new(
326                    Some("Response channel closed".to_owned()),
327                    None,
328                    AppErrorType::InternalError,
329                )),
330            },
331        }
332    }
333
334    pub async fn close(&self) -> Result<(), Box<dyn std::error::Error>> {
335        self.is_closing.store(true, Ordering::Release);
336        let (tx, rx) = oneshot::channel();
337        self.sender
338            .send(ConnectionCommand::Close { response: tx })?;
339        rx.await?;
340        Ok(())
341    }
342}