modbus_relay/connection/
mod.rs

1mod backoff_strategy;
2mod events;
3mod guard;
4mod manager;
5mod stats;
6
7pub use backoff_strategy::BackoffStrategy;
8pub use events::StatEvent;
9pub use guard::ConnectionGuard;
10pub use manager::Manager as ConnectionManager;
11pub use stats::ClientStats;
12pub use stats::ConnectionStats;
13pub use stats::IpStats;
14
15#[cfg(test)]
16mod tests {
17    use tokio::{
18        sync::{Mutex, mpsc},
19        time::sleep,
20    };
21
22    use crate::{
23        ConnectionError, RelayError, StatsConfig, StatsManager,
24        config::{BackoffConfig, ConnectionConfig},
25    };
26
27    use super::*;
28    use std::{
29        collections::HashMap,
30        net::{IpAddr, Ipv4Addr, SocketAddr},
31        sync::Arc,
32        time::Duration,
33    };
34
35    #[tokio::test]
36    async fn test_connection_limits() {
37        let config = ConnectionConfig {
38            max_connections: 2,
39            per_ip_limits: Some(1),
40            idle_timeout: Duration::from_secs(60),
41            error_timeout: Duration::from_secs(300),
42            connect_timeout: Duration::from_secs(5),
43            backoff: BackoffConfig::default(),
44        };
45
46        let (stats_tx, _) = mpsc::channel(100);
47        let manager = Arc::new(ConnectionManager::new(config, stats_tx));
48        let addr1 = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234);
49
50        // First connection should succeed
51        let conn1 = manager.accept_connection(addr1).await;
52        assert!(conn1.is_ok(), "First connection should succeed");
53
54        // Second connection from same IP should fail immediately (per-IP limit)
55        let conn2 = manager.accept_connection(addr1).await;
56        match conn2 {
57            Err(RelayError::Connection(ConnectionError::LimitExceeded(msg))) => {
58                assert!(
59                    msg.contains("127.0.0.1:1234"),
60                    "Wrong IP in error message: {}",
61                    msg
62                );
63                return; // <-- Return here after checking error
64            }
65            other => panic!("Expected LimitExceeded error, got: {:?}", other),
66        }
67    }
68
69    #[tokio::test]
70    async fn test_connection_stats_after_limit() {
71        let config = ConnectionConfig {
72            max_connections: 1,
73            per_ip_limits: Some(1),
74            ..Default::default()
75        };
76
77        let stats_config = StatsConfig::default();
78
79        let (stats_manager, stats_tx) = StatsManager::new(stats_config);
80        let stats_manager = Arc::new(Mutex::new(stats_manager));
81
82        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
83
84        let stats_handle = tokio::spawn({
85            async move {
86                let mut stats_manager = stats_manager.lock().await;
87                stats_manager.run(shutdown_rx).await;
88            }
89        });
90
91        let manager = Arc::new(ConnectionManager::new(config, stats_tx));
92
93        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234);
94
95        // First connection succeeds
96        let conn = manager.accept_connection(addr).await.unwrap();
97
98        // Second connection fails
99        let _err = manager.accept_connection(addr).await.unwrap_err();
100
101        // Check stats
102        let stats = manager.get_stats().await.unwrap();
103
104        assert_eq!(
105            stats.active_connections, 1,
106            "Should have one active connection"
107        );
108        assert_eq!(
109            stats.total_connections, 1,
110            "Should have one total connection"
111        );
112
113        // Cleanup
114        drop(conn);
115
116        shutdown_tx.send(true).unwrap();
117        stats_handle.await.unwrap();
118    }
119
120    #[tokio::test]
121    async fn test_idle_connection_cleanup() {
122        let config = ConnectionConfig {
123            idle_timeout: Duration::from_millis(100),
124            ..Default::default()
125        };
126
127        let stats_config = StatsConfig {
128            cleanup_interval: config.idle_timeout,
129            idle_timeout: config.idle_timeout,
130            error_timeout: config.error_timeout,
131            max_events_per_second: 10000, // TODO(aljen): Make configurable
132        };
133
134        let (stats_manager, stats_tx) = StatsManager::new(stats_config);
135        let stats_manager = Arc::new(Mutex::new(stats_manager));
136
137        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
138
139        let stats_handle = tokio::spawn({
140            async move {
141                let mut stats_manager = stats_manager.lock().await;
142                stats_manager.run(shutdown_rx).await;
143            }
144        });
145
146        let manager = Arc::new(ConnectionManager::new(config, stats_tx));
147        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234);
148
149        // Create a connection
150        let _conn = manager.accept_connection(addr).await.unwrap();
151
152        // Verify connection is active
153        let stats = manager.get_stats().await.unwrap();
154        assert_eq!(stats.active_connections, 1);
155
156        // Wait for connection to become idle
157        sleep(Duration::from_millis(200)).await;
158
159        // Cleanup should work
160        assert!(manager.cleanup_idle_connections().await.is_ok());
161
162        // Verify connection was cleaned up
163        let stats = manager.get_stats().await.unwrap();
164        assert_eq!(stats.active_connections, 0);
165
166        shutdown_tx.send(true).unwrap();
167        stats_handle.await.unwrap();
168    }
169
170    #[tokio::test]
171    async fn test_connection_guard_cleanup() {
172        let config = ConnectionConfig::default();
173
174        let stats_config = StatsConfig {
175            cleanup_interval: config.idle_timeout,
176            idle_timeout: config.idle_timeout,
177            error_timeout: config.error_timeout,
178            max_events_per_second: 10000, // TODO(aljen): Make configurable
179        };
180
181        let (stats_manager, stats_tx) = StatsManager::new(stats_config);
182        let stats_manager = Arc::new(Mutex::new(stats_manager));
183
184        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
185
186        let stats_handle = tokio::spawn({
187            async move {
188                let mut stats_manager = stats_manager.lock().await;
189                stats_manager.run(shutdown_rx).await;
190            }
191        });
192
193        let manager = Arc::new(ConnectionManager::new(config, stats_tx));
194
195        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 1234);
196
197        {
198            let guard = manager.accept_connection(addr).await.unwrap();
199            let stats = manager.get_stats().await.unwrap();
200            assert_eq!(stats.active_connections, 1);
201
202            // Guard should clean up when dropped
203            drop(guard);
204        }
205
206        // Wait a bit for async cleanup
207        sleep(Duration::from_millis(50)).await;
208
209        let stats = manager.get_stats().await.unwrap();
210        assert_eq!(stats.active_connections, 0);
211
212        shutdown_tx.send(true).unwrap();
213        stats_handle.await.unwrap();
214    }
215
216    #[tokio::test]
217    async fn test_backoff_strategy() {
218        let config = BackoffConfig {
219            initial_interval: Duration::from_millis(100),
220            max_interval: Duration::from_secs(1),
221            multiplier: 2.0,
222            max_retries: 3,
223        };
224
225        let mut strategy = BackoffStrategy::new(config);
226
227        // The first attempts should return increasing values
228        assert_eq!(strategy.next_backoff().unwrap().as_millis(), 100);
229        assert_eq!(strategy.next_backoff().unwrap().as_millis(), 200);
230        assert_eq!(strategy.next_backoff().unwrap().as_millis(), 400);
231
232        // After exhausting attempts, it should return None
233        assert!(strategy.next_backoff().is_none());
234
235        // After reset, it should start from the beginning
236        strategy.reset();
237        assert_eq!(strategy.next_backoff().unwrap().as_millis(), 100);
238    }
239
240    #[tokio::test]
241    async fn test_connection_lifecycle() {
242        let config = ConnectionConfig::default();
243        let (stats_tx, mut stats_rx) = mpsc::channel(100);
244        let manager = Arc::new(ConnectionManager::new(config, stats_tx));
245
246        // Handle stats events in background
247        tokio::spawn(async move {
248            while let Some(event) = stats_rx.recv().await {
249                match event {
250                    StatEvent::QueryConnectionStats { response_tx } => {
251                        let _ = response_tx.send(ConnectionStats {
252                            total_connections: 1,
253                            active_connections: 1,
254                            total_requests: 0,
255                            total_errors: 0,
256                            requests_per_second: 0.0,
257                            avg_response_time_ms: 0,
258                            per_ip_stats: HashMap::new(),
259                        });
260                    }
261                    _ => {}
262                }
263            }
264        });
265
266        let addr = "127.0.0.1:8080".parse().unwrap();
267
268        // Test connection acceptance
269        let guard = manager.accept_connection(addr).await.unwrap();
270        assert_eq!(manager.get_connection_count(&addr).await, 1);
271
272        // Test statistics
273        let stats = manager.get_stats().await.unwrap();
274        assert_eq!(stats.active_connections, 1);
275
276        // Test connection cleanup
277        drop(guard);
278        sleep(Duration::from_millis(100)).await;
279        assert_eq!(manager.get_connection_count(&addr).await, 0);
280    }
281}