general_mq/amqp/
connection.rs1use std::{
2 collections::HashMap,
3 error::Error as StdError,
4 str::FromStr,
5 sync::{Arc, Mutex},
6 time::Duration,
7};
8
9use amqprs::{
10 connection::{Connection as AmqprsConnection, OpenConnectionArguments},
11 error::Error as AmqprsError,
12 security::SecurityCredentials,
13 tls::TlsAdaptor,
14};
15use async_trait::async_trait;
16use lapin::uri::{AMQPScheme, AMQPUri};
17use tokio::{
18 task::{self, JoinHandle},
19 time,
20};
21
22use crate::{
23 ID_SIZE,
24 connection::{EventHandler, GmqConnection, Status},
25 randomstring,
26};
27
28#[derive(Clone)]
30pub struct AmqpConnection {
31 opts: InnerOptions,
33 status: Arc<Mutex<Status>>,
35 conn: Arc<Mutex<Option<AmqprsConnection>>>,
37 handlers: Arc<Mutex<HashMap<String, Arc<dyn EventHandler>>>>,
39 ev_loop: Arc<Mutex<Option<JoinHandle<()>>>>,
41}
42
43pub struct AmqpConnectionOptions {
45 pub uri: String,
49 pub connect_timeout_millis: u64,
53 pub reconnect_millis: u64,
57}
58
59#[derive(Clone)]
61struct InnerOptions {
62 args: OpenConnectionArguments,
64 reconnect_millis: u64,
66}
67
68const DEF_CONN_TIMEOUT_MS: u64 = 3000;
70const DEF_RECONN_TIME_MS: u64 = 1000;
72
73impl AmqpConnection {
74 pub fn new(opts: AmqpConnectionOptions) -> Result<AmqpConnection, String> {
76 let mut uri = AMQPUri::from_str(opts.uri.as_str())?;
77 uri.query.connection_timeout = match opts.connect_timeout_millis {
78 0 => Some(DEF_CONN_TIMEOUT_MS),
79 _ => Some(opts.connect_timeout_millis),
80 };
81 if uri.vhost.len() == 0 {
82 uri.vhost = "/".to_string();
83 }
84 let mut args = OpenConnectionArguments::default();
85 args.host(&uri.authority.host)
86 .port(uri.authority.port)
87 .credentials(SecurityCredentials::new_plain(
88 &uri.authority.userinfo.username,
89 &uri.authority.userinfo.password,
90 ))
91 .virtual_host(&uri.vhost);
92 if uri.scheme == AMQPScheme::AMQPS {
93 let adaptor = match TlsAdaptor::without_client_auth(None, uri.authority.host.clone()) {
94 Err(e) => return Err(e.to_string()),
95 Ok(adaptor) => adaptor,
96 };
97 args.tls_adaptor(adaptor);
98 }
99
100 Ok(AmqpConnection {
101 opts: InnerOptions {
102 args,
103 reconnect_millis: match opts.reconnect_millis {
104 0 => DEF_RECONN_TIME_MS,
105 _ => opts.reconnect_millis,
106 },
107 },
108 status: Arc::new(Mutex::new(Status::Closed)),
109 conn: Arc::new(Mutex::new(None)),
110 handlers: Arc::new(Mutex::new(HashMap::<String, Arc<dyn EventHandler>>::new())),
111 ev_loop: Arc::new(Mutex::new(None)),
112 })
113 }
114
115 pub(super) fn get_raw_connection(&self) -> Option<AmqprsConnection> {
117 match self.conn.lock().unwrap().as_ref() {
118 None => None,
119 Some(conn) => Some(conn.clone()),
120 }
121 }
122}
123
124#[async_trait]
125impl GmqConnection for AmqpConnection {
126 fn status(&self) -> Status {
127 *self.status.lock().unwrap()
128 }
129
130 fn add_handler(&mut self, handler: Arc<dyn EventHandler>) -> String {
131 let id = randomstring(ID_SIZE);
132 self.handlers.lock().unwrap().insert(id.clone(), handler);
133 id
134 }
135
136 fn remove_handler(&mut self, id: &str) {
137 self.handlers.lock().unwrap().remove(id);
138 }
139
140 fn connect(&mut self) -> Result<(), Box<dyn StdError>> {
141 {
142 let mut task_handle_mutex = self.ev_loop.lock().unwrap();
143 if (*task_handle_mutex).is_some() {
144 return Ok(());
145 }
146 *self.status.lock().unwrap() = Status::Connecting;
147 *task_handle_mutex = Some(create_event_loop(self));
148 }
149 Ok(())
150 }
151
152 async fn close(&mut self) -> Result<(), Box<dyn StdError + Send + Sync>> {
153 match { self.ev_loop.lock().unwrap().take() } {
154 None => return Ok(()),
155 Some(handle) => handle.abort(),
156 }
157 {
158 *self.status.lock().unwrap() = Status::Closing;
159 }
160
161 let conn = { self.conn.lock().unwrap().take() };
162 let mut result: Result<(), AmqprsError> = Ok(());
163 if let Some(conn) = conn {
164 result = conn.close().await;
165 }
166
167 {
168 *self.status.lock().unwrap() = Status::Closed;
169 }
170 let handlers = { (*self.handlers.lock().unwrap()).clone() };
171 for (id, handler) in handlers {
172 let conn = Arc::new(self.clone());
173 task::spawn(async move {
174 handler.on_status(id.clone(), conn, Status::Closed).await;
175 });
176 }
177
178 result?;
179 Ok(())
180 }
181}
182
183impl Default for AmqpConnectionOptions {
184 fn default() -> Self {
185 AmqpConnectionOptions {
186 uri: "amqp://localhost".to_string(),
187 connect_timeout_millis: DEF_CONN_TIMEOUT_MS,
188 reconnect_millis: DEF_RECONN_TIME_MS,
189 }
190 }
191}
192
193fn create_event_loop(conn: &AmqpConnection) -> JoinHandle<()> {
195 let this = Arc::new(conn.clone());
196 task::spawn(async move {
197 loop {
198 match this.status() {
199 Status::Closing | Status::Closed => break,
200 Status::Connecting => {
201 let conn = match AmqprsConnection::open(&this.opts.args).await {
202 Err(_) => {
203 time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
204 continue;
205 }
206 Ok(conn) => conn,
207 };
208 {
209 let mut status_mutex = this.status.lock().unwrap();
210 if *status_mutex == Status::Closing || *status_mutex == Status::Closed {
211 continue;
212 }
213 *status_mutex = Status::Connected;
214 }
215 {
216 *this.conn.lock().unwrap() = Some(conn);
217 }
218
219 let handlers = { (*this.handlers.lock().unwrap()).clone() };
220 for (id, handler) in handlers {
221 let conn = this.clone();
222 task::spawn(async move {
223 handler.on_status(id.clone(), conn, Status::Connected).await;
224 });
225 }
226 }
227 Status::Connected => {
228 time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
229 let mut to_disconnected = true;
230 {
231 if let Some(conn) = (*this.conn.lock().unwrap()).as_ref() {
232 if conn.is_open() {
233 to_disconnected = false;
234 }
235 }
236 }
237 if !to_disconnected {
238 continue;
239 }
240
241 {
242 let mut status_mutex = this.status.lock().unwrap();
243 if *status_mutex == Status::Closing || *status_mutex == Status::Closed {
244 continue;
245 }
246 *status_mutex = Status::Disconnected;
247 }
248 let conn = { this.conn.lock().unwrap().take() };
249 if let Some(conn) = conn {
250 let _ = conn.close().await;
251 }
252
253 let handlers = { (*this.handlers.lock().unwrap()).clone() };
254 for (id, handler) in handlers {
255 let conn = this.clone();
256 task::spawn(async move {
257 handler
258 .on_status(id.clone(), conn, Status::Disconnected)
259 .await;
260 });
261 }
262 time::sleep(Duration::from_millis(this.opts.reconnect_millis)).await;
263 {
264 let mut status_mutex = this.status.lock().unwrap();
265 if *status_mutex == Status::Closing || *status_mutex == Status::Closed {
266 continue;
267 }
268 *status_mutex = Status::Connecting;
269 }
270 let handlers = { (*this.handlers.lock().unwrap()).clone() };
271 for (id, handler) in handlers {
272 let conn = this.clone();
273 task::spawn(async move {
274 handler
275 .on_status(id.clone(), conn, Status::Connecting)
276 .await;
277 });
278 }
279 }
280 Status::Disconnected => {
281 *this.status.lock().unwrap() = Status::Connecting;
282 }
283 }
284 }
285 })
286}