1mod backoff_strategy;
2mod events;
3mod guard;
4mod manager;
5mod stats;
6
7pub use backoff_strategy::BackoffStrategy;
8pub use events::StatEvent;
9pub use guard::ConnectionGuard;
10pub use manager::Manager as ConnectionManager;
11pub use stats::ClientStats;
12pub use stats::ConnectionStats;
13pub use stats::IpStats;
14
15#[cfg(test)]
16mod tests {
17 use tokio::{
18 sync::{Mutex, mpsc},
19 time::sleep,
20 };
21
22 use crate::{
23 ConnectionError, RelayError, StatsConfig, StatsManager,
24 config::{BackoffConfig, ConnectionConfig},
25 };
26
27 use super::*;
28 use std::{
29 collections::HashMap,
30 net::{IpAddr, Ipv4Addr, SocketAddr},
31 sync::Arc,
32 time::Duration,
33 };
34
35 #[tokio::test]
36 async fn test_connection_limits() {
37 let config = ConnectionConfig {
38 max_connections: 2,
39 per_ip_limits: Some(1),
40 idle_timeout: Duration::from_secs(60),
41 error_timeout: Duration::from_secs(300),
42 connect_timeout: Duration::from_secs(5),
43 backoff: BackoffConfig::default(),
44 };
45
46 let (stats_tx, _) = mpsc::channel(100);
47 let manager = Arc::new(ConnectionManager::new(config, stats_tx));
48 let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234);
49
50 let conn1 = manager.accept_connection(addr1).await;
52 assert!(conn1.is_ok(), "First connection should succeed");
53
54 let conn2 = manager.accept_connection(addr1).await;
56 match conn2 {
57 Err(RelayError::Connection(ConnectionError::LimitExceeded(msg))) => {
58 assert!(
59 msg.contains("127.0.0.1:1234"),
60 "Wrong IP in error message: {}",
61 msg
62 );
63 return; }
65 other => panic!("Expected LimitExceeded error, got: {:?}", other),
66 }
67 }
68
69 #[tokio::test]
70 async fn test_connection_stats_after_limit() {
71 let config = ConnectionConfig {
72 max_connections: 1,
73 per_ip_limits: Some(1),
74 ..Default::default()
75 };
76
77 let stats_config = StatsConfig::default();
78
79 let (stats_manager, stats_tx) = StatsManager::new(stats_config);
80 let stats_manager = Arc::new(Mutex::new(stats_manager));
81
82 let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
83
84 let stats_handle = tokio::spawn({
85 async move {
86 let mut stats_manager = stats_manager.lock().await;
87 stats_manager.run(shutdown_rx).await;
88 }
89 });
90
91 let manager = Arc::new(ConnectionManager::new(config, stats_tx));
92
93 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234);
94
95 let conn = manager.accept_connection(addr).await.unwrap();
97
98 let _err = manager.accept_connection(addr).await.unwrap_err();
100
101 let stats = manager.get_stats().await.unwrap();
103
104 assert_eq!(
105 stats.active_connections, 1,
106 "Should have one active connection"
107 );
108 assert_eq!(
109 stats.total_connections, 1,
110 "Should have one total connection"
111 );
112
113 drop(conn);
115
116 shutdown_tx.send(true).unwrap();
117 stats_handle.await.unwrap();
118 }
119
120 #[tokio::test]
121 async fn test_idle_connection_cleanup() {
122 let config = ConnectionConfig {
123 idle_timeout: Duration::from_millis(100),
124 ..Default::default()
125 };
126
127 let stats_config = StatsConfig {
128 cleanup_interval: config.idle_timeout,
129 idle_timeout: config.idle_timeout,
130 error_timeout: config.error_timeout,
131 max_events_per_second: 10000, };
133
134 let (stats_manager, stats_tx) = StatsManager::new(stats_config);
135 let stats_manager = Arc::new(Mutex::new(stats_manager));
136
137 let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
138
139 let stats_handle = tokio::spawn({
140 async move {
141 let mut stats_manager = stats_manager.lock().await;
142 stats_manager.run(shutdown_rx).await;
143 }
144 });
145
146 let manager = Arc::new(ConnectionManager::new(config, stats_tx));
147 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234);
148
149 let _conn = manager.accept_connection(addr).await.unwrap();
151
152 let stats = manager.get_stats().await.unwrap();
154 assert_eq!(stats.active_connections, 1);
155
156 sleep(Duration::from_millis(200)).await;
158
159 assert!(manager.cleanup_idle_connections().await.is_ok());
161
162 let stats = manager.get_stats().await.unwrap();
164 assert_eq!(stats.active_connections, 0);
165
166 shutdown_tx.send(true).unwrap();
167 stats_handle.await.unwrap();
168 }
169
170 #[tokio::test]
171 async fn test_connection_guard_cleanup() {
172 let config = ConnectionConfig::default();
173
174 let stats_config = StatsConfig {
175 cleanup_interval: config.idle_timeout,
176 idle_timeout: config.idle_timeout,
177 error_timeout: config.error_timeout,
178 max_events_per_second: 10000, };
180
181 let (stats_manager, stats_tx) = StatsManager::new(stats_config);
182 let stats_manager = Arc::new(Mutex::new(stats_manager));
183
184 let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
185
186 let stats_handle = tokio::spawn({
187 async move {
188 let mut stats_manager = stats_manager.lock().await;
189 stats_manager.run(shutdown_rx).await;
190 }
191 });
192
193 let manager = Arc::new(ConnectionManager::new(config, stats_tx));
194
195 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234);
196
197 {
198 let guard = manager.accept_connection(addr).await.unwrap();
199 let stats = manager.get_stats().await.unwrap();
200 assert_eq!(stats.active_connections, 1);
201
202 drop(guard);
204 }
205
206 sleep(Duration::from_millis(50)).await;
208
209 let stats = manager.get_stats().await.unwrap();
210 assert_eq!(stats.active_connections, 0);
211
212 shutdown_tx.send(true).unwrap();
213 stats_handle.await.unwrap();
214 }
215
216 #[tokio::test]
217 async fn test_backoff_strategy() {
218 let config = BackoffConfig {
219 initial_interval: Duration::from_millis(100),
220 max_interval: Duration::from_secs(1),
221 multiplier: 2.0,
222 max_retries: 3,
223 };
224
225 let mut strategy = BackoffStrategy::new(config);
226
227 assert_eq!(strategy.next_backoff().unwrap().as_millis(), 100);
229 assert_eq!(strategy.next_backoff().unwrap().as_millis(), 200);
230 assert_eq!(strategy.next_backoff().unwrap().as_millis(), 400);
231
232 assert!(strategy.next_backoff().is_none());
234
235 strategy.reset();
237 assert_eq!(strategy.next_backoff().unwrap().as_millis(), 100);
238 }
239
240 #[tokio::test]
241 async fn test_connection_lifecycle() {
242 let config = ConnectionConfig::default();
243 let (stats_tx, mut stats_rx) = mpsc::channel(100);
244 let manager = Arc::new(ConnectionManager::new(config, stats_tx));
245
246 tokio::spawn(async move {
248 while let Some(event) = stats_rx.recv().await {
249 match event {
250 StatEvent::QueryConnectionStats { response_tx } => {
251 let _ = response_tx.send(ConnectionStats {
252 total_connections: 1,
253 active_connections: 1,
254 total_requests: 0,
255 total_errors: 0,
256 requests_per_second: 0.0,
257 avg_response_time_ms: 0,
258 per_ip_stats: HashMap::new(),
259 });
260 }
261 _ => {}
262 }
263 }
264 });
265
266 let addr = "127.0.0.1:8080".parse().unwrap();
267
268 let guard = manager.accept_connection(addr).await.unwrap();
270 assert_eq!(manager.get_connection_count(&addr).await, 1);
271
272 let stats = manager.get_stats().await.unwrap();
274 assert_eq!(stats.active_connections, 1);
275
276 drop(guard);
278 sleep(Duration::from_millis(100)).await;
279 assert_eq!(manager.get_connection_count(&addr).await, 0);
280 }
281}