1use crate::config::ServerConfig;
6use crate::connection::{Connection, ConnectionHandle};
7use aerosocket_core::Result;
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::{mpsc, Mutex};
12use tokio::time::interval;
13
/// Aggregate counters describing the connections tracked by a
/// [`ConnectionManager`].
///
/// `Default` yields a value with every counter set to zero.
#[derive(Debug, Clone, Default)]
pub struct ManagerStats {
    /// Number of connections currently held in the manager's map.
    pub active_connections: usize,
    /// Number of connections ever registered with the manager.
    pub total_connections: u64,
    /// Number of connections removed because they timed out.
    pub timeout_closures: u64,
    /// Number of connections removed because of an error.
    pub error_closures: u64,
    /// Number of connections removed as a normal closure.
    pub normal_closures: u64,
    /// Memory usage figure. NOTE(review): nothing in this file writes this
    /// field; the unit is presumably bytes — confirm.
    pub memory_usage: u64,
    /// Highest value `active_connections` has reached so far.
    pub peak_connections: usize,
}
46
47#[derive(Debug)]
49pub struct ConnectionManager {
50 config: ServerConfig,
52 connections: Arc<Mutex<HashMap<u64, ConnectionHandle>>>,
54 stats: Arc<Mutex<ManagerStats>>,
56 next_id: Arc<Mutex<u64>>,
58 cleanup_interval: Duration,
60 cleanup_tx: mpsc::Sender<u64>,
62 cleanup_rx: Arc<Mutex<mpsc::Receiver<u64>>>,
64}
65
66impl ConnectionManager {
67 pub fn new(config: ServerConfig) -> Self {
69 let (cleanup_tx, cleanup_rx) = mpsc::channel(1000);
70
71 Self {
72 cleanup_interval: Duration::from_secs(30), config,
74 connections: Arc::new(Mutex::new(HashMap::new())),
75 stats: Arc::new(Mutex::new(ManagerStats::default())),
76 next_id: Arc::new(Mutex::new(1)),
77 cleanup_tx,
78 cleanup_rx: Arc::new(Mutex::new(cleanup_rx)),
79 }
80 }
81
82 pub fn set_cleanup_interval(&mut self, interval: Duration) {
84 self.cleanup_interval = interval;
85 }
86
87 pub async fn add_connection(&self, connection: Connection) -> Result<ConnectionHandle> {
89 let mut next_id = self.next_id.lock().await;
90 let id = *next_id;
91 *next_id += 1;
92
93 let handle = ConnectionHandle::new(id, connection);
94
95 let mut connections = self.connections.lock().await;
96 connections.insert(id, handle.clone());
97
98 let mut stats = self.stats.lock().await;
100 stats.active_connections = connections.len();
101 stats.total_connections += 1;
102 stats.peak_connections = stats.peak_connections.max(stats.active_connections);
103
104 Ok(handle)
105 }
106
107 pub async fn remove_connection(&self, id: u64, reason: CloseReason) {
109 let mut connections = self.connections.lock().await;
110 if connections.remove(&id).is_some() {
111 let mut stats = self.stats.lock().await;
113 stats.active_connections = connections.len();
114
115 match reason {
116 CloseReason::Timeout => stats.timeout_closures += 1,
117 CloseReason::Error => stats.error_closures += 1,
118 CloseReason::Normal => stats.normal_closures += 1,
119 }
120 }
121 }
122
123 pub async fn get_connection(&self, id: u64) -> Option<ConnectionHandle> {
125 let connections = self.connections.lock().await;
126 connections.get(&id).cloned()
127 }
128
129 pub async fn get_all_connections(&self) -> Vec<ConnectionHandle> {
131 let connections = self.connections.lock().await;
132 connections.values().cloned().collect()
133 }
134
135 pub async fn connection_count(&self) -> usize {
137 let connections = self.connections.lock().await;
138 connections.len()
139 }
140
141 pub async fn get_stats(&self) -> ManagerStats {
143 let stats = self.stats.lock().await;
144 ManagerStats {
145 active_connections: stats.active_connections,
146 total_connections: stats.total_connections,
147 timeout_closures: stats.timeout_closures,
148 error_closures: stats.error_closures,
149 normal_closures: stats.normal_closures,
150 memory_usage: stats.memory_usage,
151 peak_connections: stats.peak_connections,
152 }
153 }
154
155 pub async fn start_cleanup_task(&self) {
157 let connections = self.connections.clone();
158 let stats = self.stats.clone();
159 let cleanup_rx = self.cleanup_rx.clone();
160 let cleanup_interval = self.cleanup_interval;
161 let idle_timeout = self.config.idle_timeout;
162
163 tokio::spawn(async move {
164 let mut cleanup_interval_timer = interval(cleanup_interval);
165 let mut cleanup_receiver = cleanup_rx.lock().await;
166
167 loop {
168 tokio::select! {
169 _ = cleanup_interval_timer.tick() => {
170 Self::cleanup_idle_connections(&connections, &stats, idle_timeout).await;
172 }
173 Some(id) = cleanup_receiver.recv() => {
174 Self::remove_connection_internal(&connections, &stats, id, CloseReason::Timeout).await;
176 }
177 }
178 }
179 });
180 }
181
182 async fn cleanup_idle_connections(
184 connections: &Arc<Mutex<HashMap<u64, ConnectionHandle>>>,
185 stats: &Arc<Mutex<ManagerStats>>,
186 idle_timeout: Duration,
187 ) {
188 let mut connections_map = connections.lock().await;
189 let mut to_remove = Vec::new();
190
191 for (id, handle) in connections_map.iter() {
192 if let Ok(connection) = handle.try_lock().await {
193 if connection.is_timed_out() {
194 to_remove.push(*id);
195 }
196 }
197 }
198
199 for id in to_remove {
200 connections_map.remove(&id);
201 let mut stats = stats.lock().await;
202 stats.active_connections = connections_map.len();
203 stats.timeout_closures += 1;
204 }
205 }
206
207 async fn remove_connection_internal(
209 connections: &Arc<Mutex<HashMap<u64, ConnectionHandle>>>,
210 stats: &Arc<Mutex<ManagerStats>>,
211 id: u64,
212 reason: CloseReason,
213 ) {
214 let mut connections_map = connections.lock().await;
215 if connections_map.remove(&id).is_some() {
216 let mut stats = stats.lock().await;
217 stats.active_connections = connections_map.len();
218
219 match reason {
220 CloseReason::Timeout => stats.timeout_closures += 1,
221 CloseReason::Error => stats.error_closures += 1,
222 CloseReason::Normal => stats.normal_closures += 1,
223 }
224 }
225 }
226
227 pub async fn monitor_connections(&self) -> Result<Vec<ConnectionHealth>> {
229 let connections = self.connections.lock().await;
230 let mut health_reports = Vec::new();
231
232 for (id, handle) in connections.iter() {
233 if let Ok(connection) = handle.try_lock().await {
234 let health = ConnectionHealth {
235 id: *id,
236 remote_addr: connection.remote_addr(),
237 state: connection.state(),
238 uptime: connection.metadata().established_at.elapsed(),
239 last_activity: connection.metadata().last_activity_at.elapsed(),
240 messages_sent: connection.metadata().messages_sent,
241 messages_received: connection.metadata().messages_received,
242 bytes_sent: connection.metadata().bytes_sent,
243 bytes_received: connection.metadata().bytes_received,
244 time_until_timeout: connection.time_until_timeout(),
245 };
246 health_reports.push(health);
247 }
248 }
249
250 Ok(health_reports)
251 }
252
253 pub async fn close_all_connections(&self) {
255 let connections = self.connections.lock().await;
256 let handles: Vec<_> = connections.values().cloned().collect();
257 let connection_count = connections.len();
258 drop(connections);
259
260 for handle in handles {
261 if let Ok(mut connection) = handle.try_lock().await {
262 let _ = connection.close(Some(1000), Some("Server shutdown")).await;
263 }
264 }
265
266 let mut connections_map = self.connections.lock().await;
268 connections_map.clear();
269
270 let mut stats = self.stats.lock().await;
272 stats.active_connections = 0;
273 stats.normal_closures += connection_count as u64;
274 }
275}
276
277impl Drop for ConnectionManager {
278 fn drop(&mut self) {
279 let connections = self.connections.clone();
281 tokio::spawn(async move {
282 let manager = ConnectionManager {
283 config: ServerConfig::default(),
284 connections,
285 stats: Arc::new(Mutex::new(ManagerStats::default())),
286 next_id: Arc::new(Mutex::new(0)),
287 cleanup_interval: Duration::ZERO,
288 cleanup_tx: mpsc::channel(1).0,
289 cleanup_rx: Arc::new(Mutex::new(mpsc::channel(1).1)),
290 };
291 manager.close_all_connections().await;
292 });
293 }
294}
295
/// Why a connection stopped being tracked; selects which closure counter
/// in the manager statistics is incremented.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CloseReason {
    /// The connection timed out.
    Timeout,
    /// The connection was closed because of an error.
    Error,
    /// The connection was closed normally.
    Normal,
}
306
307#[derive(Debug, Clone)]
309pub struct ConnectionHealth {
310 pub id: u64,
312 pub remote_addr: std::net::SocketAddr,
314 pub state: crate::connection::ConnectionState,
316 pub uptime: Duration,
318 pub last_activity: Duration,
320 pub messages_sent: u64,
322 pub messages_received: u64,
324 pub bytes_sent: u64,
326 pub bytes_received: u64,
328 pub time_until_timeout: Option<Duration>,
330}