rust_rabbit/
connection.rs1use crate::error::RustRabbitError;
7use lapin::{Channel, Connection as LapinConnection, ConnectionProperties};
8use std::sync::Arc;
9use tokio::sync::RwLock;
10use tracing::{debug, info, warn};
11use url::Url;
12
/// Settings describing how to reach a RabbitMQ broker.
///
/// Values are stored as given; nothing is validated when the config is built.
#[derive(Debug, Clone)]
pub struct ConnectionConfig {
    /// Broker URL exactly as supplied by the caller.
    pub url: String,

    /// Connection timeout, in seconds (see [`ConnectionConfig::connection_timeout`]).
    pub connection_timeout: u64,

    /// Heartbeat interval, in seconds (see [`ConnectionConfig::heartbeat`]).
    pub heartbeat: u64,
}

impl ConnectionConfig {
    /// Connection timeout (seconds) used when the caller does not override it.
    const DEFAULT_CONNECTION_TIMEOUT_SECS: u64 = 30;

    /// Heartbeat interval (seconds) used when the caller does not override it.
    const DEFAULT_HEARTBEAT_SECS: u64 = 60;

    /// Creates a config for `url` with the default timeout (30) and heartbeat (60).
    ///
    /// The URL is copied verbatim and is not parsed here.
    pub fn new(url: &str) -> Self {
        Self {
            url: String::from(url),
            connection_timeout: Self::DEFAULT_CONNECTION_TIMEOUT_SECS,
            heartbeat: Self::DEFAULT_HEARTBEAT_SECS,
        }
    }

    /// Returns the config with its connection timeout replaced by `timeout_secs`.
    pub fn connection_timeout(self, timeout_secs: u64) -> Self {
        Self {
            connection_timeout: timeout_secs,
            ..self
        }
    }

    /// Returns the config with its heartbeat interval replaced by `heartbeat_secs`.
    pub fn heartbeat(self, heartbeat_secs: u64) -> Self {
        Self {
            heartbeat: heartbeat_secs,
            ..self
        }
    }
}
48
49#[derive(Debug)]
51pub struct Connection {
52 inner: Arc<RwLock<Option<LapinConnection>>>,
53 config: ConnectionConfig,
54}
55
56impl Connection {
57 pub async fn new(url: &str) -> Result<Arc<Self>, RustRabbitError> {
72 let config = ConnectionConfig::new(url);
73 Self::with_config(config).await
74 }
75
76 pub async fn with_config(config: ConnectionConfig) -> Result<Arc<Self>, RustRabbitError> {
78 let connection = Self {
79 inner: Arc::new(RwLock::new(None)),
80 config,
81 };
82
83 let arc_connection = Arc::new(connection);
84 arc_connection.connect().await?;
85
86 Ok(arc_connection)
87 }
88
89 pub fn url(&self) -> &str {
91 &self.config.url
92 }
93
94 pub async fn is_connected(&self) -> bool {
96 let conn_guard = self.inner.read().await;
97 if let Some(ref conn) = *conn_guard {
98 conn.status().connected()
99 } else {
100 false
101 }
102 }
103
104 pub async fn create_channel(&self) -> Result<Channel, RustRabbitError> {
108 if !self.is_connected().await {
110 warn!("Connection lost, attempting to reconnect...");
111 self.reconnect().await?;
112 }
113
114 let conn_guard = self.inner.read().await;
115 if let Some(ref conn) = *conn_guard {
116 let channel = conn.create_channel().await?;
117 debug!("Created new channel");
118 Ok(channel)
119 } else {
120 Err(RustRabbitError::Connection(
121 "No active connection".to_string(),
122 ))
123 }
124 }
125
126 pub async fn reconnect(&self) -> Result<(), RustRabbitError> {
128 info!("Reconnecting to RabbitMQ...");
129 self.connect().await
130 }
131
132 pub async fn close(&self) -> Result<(), RustRabbitError> {
134 let mut conn_guard = self.inner.write().await;
135 if let Some(conn) = conn_guard.take() {
136 conn.close(200, "Normal shutdown").await?;
137 info!("Connection closed");
138 }
139 Ok(())
140 }
141
142 async fn connect(&self) -> Result<(), RustRabbitError> {
144 let _parsed_url = Url::parse(&self.config.url)
146 .map_err(|e| RustRabbitError::Configuration(format!("Invalid URL: {}", e)))?;
147
148 let properties = ConnectionProperties::default();
150
151 debug!("Connecting to RabbitMQ at {}", self.config.url);
153
154 let connection = LapinConnection::connect(&self.config.url, properties).await?;
155
156 info!("Successfully connected to RabbitMQ");
157
158 let mut conn_guard = self.inner.write().await;
160 *conn_guard = Some(connection);
161
162 Ok(())
163 }
164}
165
166impl Drop for Connection {
167 fn drop(&mut self) {
168 debug!("Connection dropped");
170 }
171}
172
173pub struct ConnectionBuilder {
175 config: ConnectionConfig,
176}
177
178impl ConnectionBuilder {
179 pub fn new(url: &str) -> Self {
181 Self {
182 config: ConnectionConfig::new(url),
183 }
184 }
185
186 pub fn connection_timeout(mut self, timeout_secs: u64) -> Self {
188 self.config = self.config.connection_timeout(timeout_secs);
189 self
190 }
191
192 pub fn heartbeat(mut self, heartbeat_secs: u64) -> Self {
194 self.config = self.config.heartbeat(heartbeat_secs);
195 self
196 }
197
198 pub async fn connect(self) -> Result<Arc<Connection>, RustRabbitError> {
200 Connection::with_config(self.config).await
201 }
202}
203
#[cfg(test)]
mod tests {
    use super::*;

    // Builder-style setters on the config override the defaults.
    #[test]
    fn test_connection_config() {
        let config = ConnectionConfig::new("amqp://localhost:5672")
            .connection_timeout(60)
            .heartbeat(30);

        assert_eq!(config.url, "amqp://localhost:5672");
        assert_eq!(config.connection_timeout, 60);
        assert_eq!(config.heartbeat, 30);
    }

    // The builder forwards every setting into its inner config.
    #[test]
    fn test_connection_builder() {
        let builder = ConnectionBuilder::new("amqp://localhost:5672")
            .connection_timeout(45)
            .heartbeat(20);

        assert_eq!(builder.config.url, "amqp://localhost:5672");
        assert_eq!(builder.config.connection_timeout, 45);
        assert_eq!(builder.config.heartbeat, 20);
    }

    // Building a config never validates the URL: a malformed value is stored
    // verbatim with the defaults intact. A panic here would fail the test on
    // its own, so no `catch_unwind` wrapper is needed.
    #[test]
    fn test_invalid_url() {
        let config = ConnectionConfig::new("invalid-url");

        assert_eq!(config.url, "invalid-url");
        assert_eq!(config.connection_timeout, 30);
        assert_eq!(config.heartbeat, 60);
    }
}