1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
use crate::configuration::config_model::ConnectionProperties as LocalProperties;
use futures::future::{BoxFuture, FutureExt};
use lapin::Channel;
use lapin::{Connection, ConnectionProperties};
use std::{thread, time};
#[derive(Debug)]
pub struct GenericError<T> {
pub why: ErrorType,
pub last_reason: T,
}
#[derive(Debug)]
pub enum ErrorType {
MaximumConnectionRetriesReached,
CannotCreateChannel,
Unknown,
}
pub fn get_connection<'a>(
addr: &'a str,
retry: u64,
total_retries: u64,
) -> BoxFuture<'a, Result<Connection, GenericError<lapin::Error>>> {
return Box::pin(
async move {
let con_promise = Connection::connect(
&addr,
ConnectionProperties::default().with_default_executor(8),
);
let conn_res = con_promise.await;
let connection = match conn_res {
Ok(c) => Ok(c),
Err(why) => {
println!("[{}] - {:?}", line!(), why);
if retry > total_retries {
GenericError {
why: ErrorType::MaximumConnectionRetriesReached,
last_reason: why,
};
}
let hibernate = time::Duration::from_millis(retry * 1000);
thread::sleep(hibernate);
let c = get_connection(addr, retry + 1, total_retries);
c.await
}
};
connection
}
.boxed(),
);
}
pub fn build_url(config: LocalProperties) -> String {
let url = format!(
"amqp://{}:{}@{}:{}/{}?heartbeat={}&connection_timeout={}",
config.username,
config.password,
config.host,
config.port,
config.vhost,
config.heartbeat,
config.connection_timeout
);
return url;
}
pub async fn create_channel<'a>(
addr: &'a str,
total_retries: u64,
) -> Result<Channel, GenericError<lapin::Error>> {
let conn = get_connection(&addr, 0, total_retries).await?;
return match conn.create_channel().await {
Ok(ch) => Ok(ch),
Err(why) => Err(GenericError {
why: ErrorType::CannotCreateChannel,
last_reason: why,
}),
};
}
#[cfg(test)]
mod tests {
use crate::configuration::config_model::*;
use crate::consumer::connection_manager::build_url;
#[test]
fn amqp_url_generated_succesfully() {
let url = build_url(ConnectionProperties::default());
assert_eq!(
"amqp://guest:guest@127.0.0.1:5672//?heartbeat=10&connection_timeout=1000",
url
);
}
}