mod backoff_strategy;
mod events;
mod guard;
mod manager;
mod stats;
pub use backoff_strategy::BackoffStrategy;
pub use events::StatEvent;
pub use guard::ConnectionGuard;
pub use manager::Manager as ConnectionManager;
pub use stats::ClientStats;
pub use stats::ConnectionStats;
pub use stats::IpStats;
#[cfg(test)]
mod tests {
use tokio::{
sync::{Mutex, mpsc},
time::sleep,
};
use crate::{
ConnectionError, RelayError, StatsConfig, StatsManager,
config::{BackoffConfig, ConnectionConfig},
};
use super::*;
use std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
time::Duration,
};
/// With `per_ip_limits` set to `Some(1)`, a second connection attempt from the
/// same socket address must be rejected with `ConnectionError::LimitExceeded`,
/// and the error message must mention `127.0.0.1:1234`.
#[tokio::test]
async fn test_connection_limits() {
    let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234);
    let limits = ConnectionConfig {
        max_connections: 2,
        per_ip_limits: Some(1),
        idle_timeout: Duration::from_secs(60),
        error_timeout: Duration::from_secs(300),
        connect_timeout: Duration::from_secs(5),
        backoff: BackoffConfig::default(),
    };

    // The receiving half is discarded immediately; this test never reads stats.
    // NOTE(review): any stat events the manager emits here go to a closed
    // channel — confirm the manager tolerates that.
    let (stats_tx, _) = mpsc::channel(100);
    let manager = Arc::new(ConnectionManager::new(limits, stats_tx));

    // Keep the first result alive so its connection still counts against the
    // limit while the second attempt is made.
    let first = manager.accept_connection(peer).await;
    assert!(first.is_ok(), "First connection should succeed");

    let second = manager.accept_connection(peer).await;
    match second {
        Err(RelayError::Connection(ConnectionError::LimitExceeded(msg))) => assert!(
            msg.contains("127.0.0.1:1234"),
            "Wrong IP in error message: {}",
            msg
        ),
        other => panic!("Expected LimitExceeded error, got: {:?}", other),
    }
}
/// After one accepted connection and one rejected attempt from the same
/// address, the reported stats must show exactly one active and one total
/// connection.
#[tokio::test]
async fn test_connection_stats_after_limit() {
    let config = ConnectionConfig {
        max_connections: 1,
        per_ip_limits: Some(1),
        ..Default::default()
    };

    // Run a real stats manager in the background until the shutdown flag flips.
    let (stats_manager, stats_tx) = StatsManager::new(StatsConfig::default());
    let stats_manager = Arc::new(Mutex::new(stats_manager));
    let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
    let stats_task = tokio::spawn(async move {
        stats_manager.lock().await.run(shutdown_rx).await;
    });

    let manager = Arc::new(ConnectionManager::new(config, stats_tx));
    let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234);

    // First attempt is accepted; the second must fail while the first is held.
    let accepted = manager.accept_connection(peer).await.unwrap();
    let _rejected = manager.accept_connection(peer).await.unwrap_err();

    let snapshot = manager.get_stats().await.unwrap();
    assert_eq!(
        snapshot.active_connections, 1,
        "Should have one active connection"
    );
    assert_eq!(
        snapshot.total_connections, 1,
        "Should have one total connection"
    );

    // Release the connection, then stop the stats task and wait for it to exit.
    drop(accepted);
    shutdown_tx.send(true).unwrap();
    stats_task.await.unwrap();
}
/// A connection left untouched for longer than the configured idle timeout
/// must no longer be counted as active once `cleanup_idle_connections` has run.
#[tokio::test]
async fn test_idle_connection_cleanup() {
    // One short duration drives both the connection config and the stats config.
    let idle = Duration::from_millis(100);
    let config = ConnectionConfig {
        idle_timeout: idle,
        ..Default::default()
    };
    let stats_config = StatsConfig {
        cleanup_interval: idle,
        idle_timeout: idle,
        error_timeout: config.error_timeout,
        max_events_per_second: 10000,
    };

    // Run a real stats manager in the background until the shutdown flag flips.
    let (stats_manager, stats_tx) = StatsManager::new(stats_config);
    let stats_manager = Arc::new(Mutex::new(stats_manager));
    let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
    let stats_task = tokio::spawn(async move {
        stats_manager.lock().await.run(shutdown_rx).await;
    });

    let manager = Arc::new(ConnectionManager::new(config, stats_tx));
    let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234);

    // The guard stays alive for the whole test; only idleness should evict it.
    let _conn = manager.accept_connection(peer).await.unwrap();
    let before = manager.get_stats().await.unwrap();
    assert_eq!(before.active_connections, 1);

    // Wait twice the idle timeout (200 ms) before triggering the cleanup pass.
    sleep(idle * 2).await;
    assert!(manager.cleanup_idle_connections().await.is_ok());

    let after = manager.get_stats().await.unwrap();
    assert_eq!(after.active_connections, 0);

    shutdown_tx.send(true).unwrap();
    stats_task.await.unwrap();
}
/// Dropping the guard returned by `accept_connection` must bring the reported
/// active-connection count back to zero.
#[tokio::test]
async fn test_connection_guard_cleanup() {
    let config = ConnectionConfig::default();
    let stats_config = StatsConfig {
        cleanup_interval: config.idle_timeout,
        idle_timeout: config.idle_timeout,
        error_timeout: config.error_timeout,
        max_events_per_second: 10000,
    };

    // Run a real stats manager in the background until the shutdown flag flips.
    let (stats_manager, stats_tx) = StatsManager::new(stats_config);
    let stats_manager = Arc::new(Mutex::new(stats_manager));
    let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
    let stats_task = tokio::spawn(async move {
        stats_manager.lock().await.run(shutdown_rx).await;
    });

    let manager = Arc::new(ConnectionManager::new(config, stats_tx));
    let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234);

    // While the guard is held, one connection is reported as active.
    let guard = manager.accept_connection(peer).await.unwrap();
    let while_held = manager.get_stats().await.unwrap();
    assert_eq!(while_held.active_connections, 1);
    drop(guard);

    // Pause 50 ms before re-reading the stats.
    // NOTE(review): presumably this lets the guard's drop notification reach
    // the stats task — confirm against the guard implementation.
    sleep(Duration::from_millis(50)).await;
    let after_drop = manager.get_stats().await.unwrap();
    assert_eq!(after_drop.active_connections, 0);

    shutdown_tx.send(true).unwrap();
    stats_task.await.unwrap();
}
/// With a 100 ms initial interval, a multiplier of 2.0 and `max_retries` of 3,
/// the strategy must yield 100, 200 and 400 ms, then `None`; after `reset()`
/// it must start again at 100 ms.
#[tokio::test]
async fn test_backoff_strategy() {
    let mut strategy = BackoffStrategy::new(BackoffConfig {
        initial_interval: Duration::from_millis(100),
        max_interval: Duration::from_secs(1),
        multiplier: 2.0,
        max_retries: 3,
    });

    // Each retry doubles the previous delay.
    for expected_ms in [100, 200, 400] {
        assert_eq!(strategy.next_backoff().unwrap().as_millis(), expected_ms);
    }

    // The fourth call exceeds `max_retries`.
    assert!(strategy.next_backoff().is_none());

    // Resetting restarts the sequence from the initial interval.
    strategy.reset();
    assert_eq!(strategy.next_backoff().unwrap().as_millis(), 100);
}
#[tokio::test]
async fn test_connection_lifecycle() {
let config = ConnectionConfig::default();
let (stats_tx, mut stats_rx) = mpsc::channel(100);
let manager = Arc::new(ConnectionManager::new(config, stats_tx));
tokio::spawn(async move {
while let Some(event) = stats_rx.recv().await {
match event {
StatEvent::QueryConnectionStats { response_tx } => {
let _ = response_tx.send(ConnectionStats {
total_connections: 1,
active_connections: 1,
total_requests: 0,
total_errors: 0,
requests_per_second: 0.0,
avg_response_time_ms: 0,
per_ip_stats: HashMap::new(),
});
}
_ => {}
}
}
});
let addr = "127.0.0.1:8080".parse().unwrap();
let guard = manager.accept_connection(addr).await.unwrap();
assert_eq!(manager.get_connection_count(&addr).await, 1);
let stats = manager.get_stats().await.unwrap();
assert_eq!(stats.active_connections, 1);
drop(guard);
sleep(Duration::from_millis(100)).await;
assert_eq!(manager.get_connection_count(&addr).await, 0);
}
}