rust_rabbit/
connection.rs

1//! Simplified Connection Management for rust-rabbit
2//!
3//! Basic RabbitMQ connection handling without complex pooling or health monitoring.
4//! Just simple, reliable connection management.
5
6use crate::error::RustRabbitError;
7use lapin::{Channel, Connection as LapinConnection, ConnectionProperties};
8use std::sync::Arc;
9use tokio::sync::RwLock;
10use tracing::{debug, info, warn};
11use url::Url;
12
/// Settings used when opening a RabbitMQ connection.
///
/// Holds the broker URL together with two timing values, both expressed in
/// whole seconds.
#[derive(Clone, Debug)]
pub struct ConnectionConfig {
    /// AMQP URL of the broker, e.g. `"amqp://user:pass@localhost:5672/vhost"`.
    pub url: String,

    /// Connection timeout, in seconds.
    pub connection_timeout: u64,

    /// Heartbeat interval, in seconds; `0` disables heartbeats.
    pub heartbeat: u64,
}
25
26impl ConnectionConfig {
27    /// Create a new connection config with URL
28    pub fn new(url: &str) -> Self {
29        Self {
30            url: url.to_string(),
31            connection_timeout: 30,
32            heartbeat: 60,
33        }
34    }
35
36    /// Set connection timeout
37    pub fn connection_timeout(mut self, timeout_secs: u64) -> Self {
38        self.connection_timeout = timeout_secs;
39        self
40    }
41
42    /// Set heartbeat interval (0 to disable)
43    pub fn heartbeat(mut self, heartbeat_secs: u64) -> Self {
44        self.heartbeat = heartbeat_secs;
45        self
46    }
47}
48
49/// Simple RabbitMQ connection wrapper
50#[derive(Debug)]
51pub struct Connection {
52    inner: Arc<RwLock<Option<LapinConnection>>>,
53    config: ConnectionConfig,
54}
55
56impl Connection {
57    /// Create a new connection
58    ///
59    /// # Arguments
60    /// * `url` - RabbitMQ connection URL (e.g., "amqp://localhost:5672")
61    ///
62    /// # Example
63    /// ```rust,no_run
64    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
65    /// use rust_rabbit::Connection;
66    ///
67    /// let connection = Connection::new("amqp://guest:guest@localhost:5672").await?;
68    /// # Ok(())
69    /// # }
70    /// ```
71    pub async fn new(url: &str) -> Result<Arc<Self>, RustRabbitError> {
72        let config = ConnectionConfig::new(url);
73        Self::with_config(config).await
74    }
75
76    /// Create a new connection with custom configuration
77    pub async fn with_config(config: ConnectionConfig) -> Result<Arc<Self>, RustRabbitError> {
78        let connection = Self {
79            inner: Arc::new(RwLock::new(None)),
80            config,
81        };
82
83        let arc_connection = Arc::new(connection);
84        arc_connection.connect().await?;
85
86        Ok(arc_connection)
87    }
88
89    /// Get connection URL for debugging
90    pub fn url(&self) -> &str {
91        &self.config.url
92    }
93
94    /// Check if connection is active
95    pub async fn is_connected(&self) -> bool {
96        let conn_guard = self.inner.read().await;
97        if let Some(ref conn) = *conn_guard {
98            conn.status().connected()
99        } else {
100            false
101        }
102    }
103
104    /// Create a new channel
105    ///
106    /// Automatically reconnects if the connection is lost.
107    pub async fn create_channel(&self) -> Result<Channel, RustRabbitError> {
108        // Check if we need to reconnect
109        if !self.is_connected().await {
110            warn!("Connection lost, attempting to reconnect...");
111            self.reconnect().await?;
112        }
113
114        let conn_guard = self.inner.read().await;
115        if let Some(ref conn) = *conn_guard {
116            let channel = conn.create_channel().await?;
117            debug!("Created new channel");
118            Ok(channel)
119        } else {
120            Err(RustRabbitError::Connection(
121                "No active connection".to_string(),
122            ))
123        }
124    }
125
126    /// Manually reconnect
127    pub async fn reconnect(&self) -> Result<(), RustRabbitError> {
128        info!("Reconnecting to RabbitMQ...");
129        self.connect().await
130    }
131
132    /// Close the connection
133    pub async fn close(&self) -> Result<(), RustRabbitError> {
134        let mut conn_guard = self.inner.write().await;
135        if let Some(conn) = conn_guard.take() {
136            conn.close(200, "Normal shutdown").await?;
137            info!("Connection closed");
138        }
139        Ok(())
140    }
141
142    /// Internal connection establishment
143    async fn connect(&self) -> Result<(), RustRabbitError> {
144        // Validate URL
145        let _parsed_url = Url::parse(&self.config.url)
146            .map_err(|e| RustRabbitError::Configuration(format!("Invalid URL: {}", e)))?;
147
148        // Create connection properties (heartbeat removed - not available in this lapin version)
149        let properties = ConnectionProperties::default();
150
151        // Establish connection
152        debug!("Connecting to RabbitMQ at {}", self.config.url);
153
154        let connection = LapinConnection::connect(&self.config.url, properties).await?;
155
156        info!("Successfully connected to RabbitMQ");
157
158        // Store connection
159        let mut conn_guard = self.inner.write().await;
160        *conn_guard = Some(connection);
161
162        Ok(())
163    }
164}
165
166impl Drop for Connection {
167    fn drop(&mut self) {
168        // Connection will be automatically closed when dropped
169        debug!("Connection dropped");
170    }
171}
172
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder setters override the defaults and leave the URL untouched.
    #[test]
    fn test_connection_config() {
        let config = ConnectionConfig::new("amqp://localhost:5672")
            .connection_timeout(60)
            .heartbeat(30);

        assert_eq!(config.url, "amqp://localhost:5672");
        assert_eq!(config.connection_timeout, 60);
        assert_eq!(config.heartbeat, 30);
    }

    /// `ConnectionConfig::new` applies the documented default timings.
    #[test]
    fn test_connection_config_defaults() {
        let config = ConnectionConfig::new("amqp://localhost:5672");

        assert_eq!(config.connection_timeout, 30);
        assert_eq!(config.heartbeat, 60);
    }

    /// A well-formed AMQP URL passes the same parser that `connect` uses for
    /// validation, and its components come out as expected.
    #[test]
    fn test_connection_url_validation() {
        let config = ConnectionConfig::new("amqp://guest:guest@localhost:5672/vhost");
        let parsed = Url::parse(&config.url).expect("well-formed AMQP URL must parse");

        assert_eq!(parsed.scheme(), "amqp");
        assert_eq!(parsed.host_str(), Some("localhost"));
        assert_eq!(parsed.port(), Some(5672));
        assert_eq!(parsed.path(), "/vhost");
    }

    /// Config creation accepts any string; the URL is only rejected by the
    /// parser used at connect time.
    #[test]
    fn test_invalid_url() {
        let config = ConnectionConfig::new("invalid-url");

        assert_eq!(config.url, "invalid-url");
        assert!(Url::parse(&config.url).is_err());
    }
}