general_mq/amqp/
connection.rs

1use std::{
2    collections::HashMap,
3    error::Error as StdError,
4    str::FromStr,
5    sync::{Arc, Mutex},
6    time::Duration,
7};
8
9use amqprs::{
10    connection::{Connection as AmqprsConnection, OpenConnectionArguments},
11    error::Error as AmqprsError,
12    security::SecurityCredentials,
13    tls::TlsAdaptor,
14};
15use async_trait::async_trait;
16use lapin::uri::{AMQPScheme, AMQPUri};
17use tokio::{
18    task::{self, JoinHandle},
19    time,
20};
21
22use crate::{
23    ID_SIZE,
24    connection::{EventHandler, GmqConnection, Status},
25    randomstring,
26};
27
28/// Manages an AMQP connection.
29#[derive(Clone)]
30pub struct AmqpConnection {
31    /// Options of the connection.
32    opts: InnerOptions,
33    /// Connection status.
34    status: Arc<Mutex<Status>>,
35    /// Hold the connection instance.
36    conn: Arc<Mutex<Option<AmqprsConnection>>>,
37    /// Event handlers.
38    handlers: Arc<Mutex<HashMap<String, Arc<dyn EventHandler>>>>,
39    /// The event loop to manage and monitor the connection instance.
40    ev_loop: Arc<Mutex<Option<JoinHandle<()>>>>,
41}
42
/// The connection options.
///
/// `Debug` is intentionally not derived: `uri` may embed a password.
#[derive(Clone)]
pub struct AmqpConnectionOptions {
    /// Connection URI. Use `amqp|amqps://username:password@host:port/vhost` format.
    ///
    /// Default is `amqp://localhost`. An empty virtual host is treated as `/`.
    pub uri: String,
    /// Connection timeout in milliseconds.
    ///
    /// Default or zero value is `3000`.
    pub connect_timeout_millis: u64,
    /// Time in milliseconds from disconnection to reconnection.
    ///
    /// Default or zero value is `1000`.
    pub reconnect_millis: u64,
}
58
59/// The validated options for management.
60#[derive(Clone)]
61struct InnerOptions {
62    /// The formatted URI resource.
63    args: OpenConnectionArguments,
64    /// Time in milliseconds from disconnection to reconnection.
65    reconnect_millis: u64,
66}
67
/// Connect timeout, in milliseconds, used when the option is left at zero.
const DEF_CONN_TIMEOUT_MS: u64 = 3_000;
/// Reconnect interval, in milliseconds, used when the option is left at zero.
const DEF_RECONN_TIME_MS: u64 = 1_000;
72
73impl AmqpConnection {
74    /// Create a connection instance.
75    pub fn new(opts: AmqpConnectionOptions) -> Result<AmqpConnection, String> {
76        let mut uri = AMQPUri::from_str(opts.uri.as_str())?;
77        uri.query.connection_timeout = match opts.connect_timeout_millis {
78            0 => Some(DEF_CONN_TIMEOUT_MS),
79            _ => Some(opts.connect_timeout_millis),
80        };
81        if uri.vhost.len() == 0 {
82            uri.vhost = "/".to_string();
83        }
84        let mut args = OpenConnectionArguments::default();
85        args.host(&uri.authority.host)
86            .port(uri.authority.port)
87            .credentials(SecurityCredentials::new_plain(
88                &uri.authority.userinfo.username,
89                &uri.authority.userinfo.password,
90            ))
91            .virtual_host(&uri.vhost);
92        if uri.scheme == AMQPScheme::AMQPS {
93            let adaptor = match TlsAdaptor::without_client_auth(None, uri.authority.host.clone()) {
94                Err(e) => return Err(e.to_string()),
95                Ok(adaptor) => adaptor,
96            };
97            args.tls_adaptor(adaptor);
98        }
99
100        Ok(AmqpConnection {
101            opts: InnerOptions {
102                args,
103                reconnect_millis: match opts.reconnect_millis {
104                    0 => DEF_RECONN_TIME_MS,
105                    _ => opts.reconnect_millis,
106                },
107            },
108            status: Arc::new(Mutex::new(Status::Closed)),
109            conn: Arc::new(Mutex::new(None)),
110            handlers: Arc::new(Mutex::new(HashMap::<String, Arc<dyn EventHandler>>::new())),
111            ev_loop: Arc::new(Mutex::new(None)),
112        })
113    }
114
115    /// To get the raw AMQP connection instance for channel declaration.
116    pub(super) fn get_raw_connection(&self) -> Option<AmqprsConnection> {
117        match self.conn.lock().unwrap().as_ref() {
118            None => None,
119            Some(conn) => Some(conn.clone()),
120        }
121    }
122}
123
124#[async_trait]
125impl GmqConnection for AmqpConnection {
126    fn status(&self) -> Status {
127        *self.status.lock().unwrap()
128    }
129
130    fn add_handler(&mut self, handler: Arc<dyn EventHandler>) -> String {
131        let id = randomstring(ID_SIZE);
132        self.handlers.lock().unwrap().insert(id.clone(), handler);
133        id
134    }
135
136    fn remove_handler(&mut self, id: &str) {
137        self.handlers.lock().unwrap().remove(id);
138    }
139
140    fn connect(&mut self) -> Result<(), Box<dyn StdError>> {
141        {
142            let mut task_handle_mutex = self.ev_loop.lock().unwrap();
143            if (*task_handle_mutex).is_some() {
144                return Ok(());
145            }
146            *self.status.lock().unwrap() = Status::Connecting;
147            *task_handle_mutex = Some(create_event_loop(self));
148        }
149        Ok(())
150    }
151
152    async fn close(&mut self) -> Result<(), Box<dyn StdError + Send + Sync>> {
153        match { self.ev_loop.lock().unwrap().take() } {
154            None => return Ok(()),
155            Some(handle) => handle.abort(),
156        }
157        {
158            *self.status.lock().unwrap() = Status::Closing;
159        }
160
161        let conn = { self.conn.lock().unwrap().take() };
162        let mut result: Result<(), AmqprsError> = Ok(());
163        if let Some(conn) = conn {
164            result = conn.close().await;
165        }
166
167        {
168            *self.status.lock().unwrap() = Status::Closed;
169        }
170        let handlers = { (*self.handlers.lock().unwrap()).clone() };
171        for (id, handler) in handlers {
172            let conn = Arc::new(self.clone());
173            task::spawn(async move {
174                handler.on_status(id.clone(), conn, Status::Closed).await;
175            });
176        }
177
178        result?;
179        Ok(())
180    }
181}
182
183impl Default for AmqpConnectionOptions {
184    fn default() -> Self {
185        AmqpConnectionOptions {
186            uri: "amqp://localhost".to_string(),
187            connect_timeout_millis: DEF_CONN_TIMEOUT_MS,
188            reconnect_millis: DEF_RECONN_TIME_MS,
189        }
190    }
191}
192
193/// To create an event loop runtime task.
194fn create_event_loop(conn: &AmqpConnection) -> JoinHandle<()> {
195    let this = Arc::new(conn.clone());
196    task::spawn(async move {
197        loop {
198            match this.status() {
199                Status::Closing | Status::Closed => break,
200                Status::Connecting => {
201                    let conn = match AmqprsConnection::open(&this.opts.args).await {
202                        Err(_) => {
203                            time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
204                            continue;
205                        }
206                        Ok(conn) => conn,
207                    };
208                    {
209                        let mut status_mutex = this.status.lock().unwrap();
210                        if *status_mutex == Status::Closing || *status_mutex == Status::Closed {
211                            continue;
212                        }
213                        *status_mutex = Status::Connected;
214                    }
215                    {
216                        *this.conn.lock().unwrap() = Some(conn);
217                    }
218
219                    let handlers = { (*this.handlers.lock().unwrap()).clone() };
220                    for (id, handler) in handlers {
221                        let conn = this.clone();
222                        task::spawn(async move {
223                            handler.on_status(id.clone(), conn, Status::Connected).await;
224                        });
225                    }
226                }
227                Status::Connected => {
228                    time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
229                    let mut to_disconnected = true;
230                    {
231                        if let Some(conn) = (*this.conn.lock().unwrap()).as_ref() {
232                            if conn.is_open() {
233                                to_disconnected = false;
234                            }
235                        }
236                    }
237                    if !to_disconnected {
238                        continue;
239                    }
240
241                    {
242                        let mut status_mutex = this.status.lock().unwrap();
243                        if *status_mutex == Status::Closing || *status_mutex == Status::Closed {
244                            continue;
245                        }
246                        *status_mutex = Status::Disconnected;
247                    }
248                    let conn = { this.conn.lock().unwrap().take() };
249                    if let Some(conn) = conn {
250                        let _ = conn.close().await;
251                    }
252
253                    let handlers = { (*this.handlers.lock().unwrap()).clone() };
254                    for (id, handler) in handlers {
255                        let conn = this.clone();
256                        task::spawn(async move {
257                            handler
258                                .on_status(id.clone(), conn, Status::Disconnected)
259                                .await;
260                        });
261                    }
262                    time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
263                    {
264                        let mut status_mutex = this.status.lock().unwrap();
265                        if *status_mutex == Status::Closing || *status_mutex == Status::Closed {
266                            continue;
267                        }
268                        *status_mutex = Status::Connecting;
269                    }
270                    let handlers = { (*this.handlers.lock().unwrap()).clone() };
271                    for (id, handler) in handlers {
272                        let conn = this.clone();
273                        task::spawn(async move {
274                            handler
275                                .on_status(id.clone(), conn, Status::Connecting)
276                                .await;
277                        });
278                    }
279                }
280                Status::Disconnected => {
281                    *this.status.lock().unwrap() = Status::Connecting;
282                }
283            }
284        }
285    })
286}