use crate::error::RustRabbitError;
use lapin::{Channel, Connection as LapinConnection, ConnectionProperties};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use url::Url;
/// Settings describing how to reach a RabbitMQ broker.
///
/// Build one with [`ConnectionConfig::new`] and refine it with the chained
/// setters, each of which consumes the config and returns the updated value.
#[derive(Debug, Clone)]
pub struct ConnectionConfig {
    /// Broker URL, stored exactly as supplied by the caller.
    pub url: String,
    /// Connection timeout, in seconds.
    pub connection_timeout: u64,
    /// Heartbeat interval, in seconds.
    pub heartbeat: u64,
}

impl ConnectionConfig {
    /// Connection timeout (seconds) used when the caller does not override it.
    const DEFAULT_CONNECTION_TIMEOUT_SECS: u64 = 30;
    /// Heartbeat interval (seconds) used when the caller does not override it.
    const DEFAULT_HEARTBEAT_SECS: u64 = 60;

    /// Creates a configuration for `url` with the default timeout and heartbeat.
    ///
    /// The URL is copied as-is; no validation is performed here.
    pub fn new(url: &str) -> Self {
        Self {
            url: String::from(url),
            connection_timeout: Self::DEFAULT_CONNECTION_TIMEOUT_SECS,
            heartbeat: Self::DEFAULT_HEARTBEAT_SECS,
        }
    }

    /// Returns the configuration with its connection timeout set to `timeout_secs`.
    pub fn connection_timeout(self, timeout_secs: u64) -> Self {
        Self {
            connection_timeout: timeout_secs,
            ..self
        }
    }

    /// Returns the configuration with its heartbeat interval set to `heartbeat_secs`.
    pub fn heartbeat(self, heartbeat_secs: u64) -> Self {
        Self {
            heartbeat: heartbeat_secs,
            ..self
        }
    }
}
/// A RabbitMQ connection handle that can re-establish its underlying
/// lapin connection.
#[derive(Debug)]
pub struct Connection {
/// The live lapin connection, guarded by an async read/write lock.
/// It is `None` when the handle is first built and again after `close`
/// takes the connection out; `connect` stores `Some(..)` on success.
inner: Arc<RwLock<Option<LapinConnection>>>,
/// Configuration supplied at construction; `connect` reads the URL from it.
config: ConnectionConfig,
}
impl Connection {
pub async fn new(url: &str) -> Result<Arc<Self>, RustRabbitError> {
let config = ConnectionConfig::new(url);
Self::with_config(config).await
}
pub async fn with_config(config: ConnectionConfig) -> Result<Arc<Self>, RustRabbitError> {
let connection = Self {
inner: Arc::new(RwLock::new(None)),
config,
};
let arc_connection = Arc::new(connection);
arc_connection.connect().await?;
Ok(arc_connection)
}
pub fn url(&self) -> &str {
&self.config.url
}
pub async fn is_connected(&self) -> bool {
let conn_guard = self.inner.read().await;
if let Some(ref conn) = *conn_guard {
conn.status().connected()
} else {
false
}
}
pub async fn create_channel(&self) -> Result<Channel, RustRabbitError> {
if !self.is_connected().await {
warn!("Connection lost, attempting to reconnect...");
self.reconnect().await?;
}
let conn_guard = self.inner.read().await;
if let Some(ref conn) = *conn_guard {
let channel = conn.create_channel().await?;
debug!("Created new channel");
Ok(channel)
} else {
Err(RustRabbitError::Connection(
"No active connection".to_string(),
))
}
}
pub async fn reconnect(&self) -> Result<(), RustRabbitError> {
info!("Reconnecting to RabbitMQ...");
self.connect().await
}
pub async fn close(&self) -> Result<(), RustRabbitError> {
let mut conn_guard = self.inner.write().await;
if let Some(conn) = conn_guard.take() {
conn.close(200, "Normal shutdown").await?;
info!("Connection closed");
}
Ok(())
}
async fn connect(&self) -> Result<(), RustRabbitError> {
let _parsed_url = Url::parse(&self.config.url)
.map_err(|e| RustRabbitError::Configuration(format!("Invalid URL: {}", e)))?;
let properties = ConnectionProperties::default();
debug!("Connecting to RabbitMQ at {}", self.config.url);
let connection = LapinConnection::connect(&self.config.url, properties).await?;
info!("Successfully connected to RabbitMQ");
let mut conn_guard = self.inner.write().await;
*conn_guard = Some(connection);
Ok(())
}
}
/// Emits a debug log line when a `Connection` value is dropped.
///
/// This impl only logs; it does not call `close` on the handle.
impl Drop for Connection {
fn drop(&mut self) {
debug!("Connection dropped");
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder setters override the defaults and leave the URL untouched.
    #[test]
    fn test_connection_config() {
        let config = ConnectionConfig::new("amqp://localhost:5672")
            .connection_timeout(60)
            .heartbeat(30);
        assert_eq!(config.url, "amqp://localhost:5672");
        assert_eq!(config.connection_timeout, 60);
        assert_eq!(config.heartbeat, 30);
    }

    /// The URL handed to the config is kept intact, component by component.
    #[test]
    fn test_connection_url_validation() {
        // Assert on the value stored in the config rather than on a local literal,
        // so the test actually exercises `ConnectionConfig::new`.
        let config = ConnectionConfig::new("amqp://localhost:5672");
        assert!(config.url.starts_with("amqp://"));
        assert!(config.url.contains("localhost"));
        assert!(config.url.ends_with(":5672"));
    }

    /// Constructing a config is infallible: a malformed URL is stored verbatim
    /// with the default settings (URL parsing only happens in `connect`).
    #[test]
    fn test_invalid_url() {
        let config = ConnectionConfig::new("invalid-url");
        assert_eq!(config.url, "invalid-url");
        assert_eq!(config.connection_timeout, 30);
        assert_eq!(config.heartbeat, 60);
    }
}