1use dashmap::DashMap;
49use libp2p::PeerId;
50use serde::{Deserialize, Serialize};
51use std::sync::Arc;
52use std::time::{Duration, Instant};
53
54#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
56pub enum SessionState {
57 Creating,
59 Active,
61 Idle,
63 Closing,
65 Closed,
67}
68
69impl SessionState {
70 pub fn is_active(&self) -> bool {
72 matches!(self, Self::Creating | Self::Active)
73 }
74
75 pub fn is_terminated(&self) -> bool {
77 matches!(self, Self::Closing | Self::Closed)
78 }
79}
80
81#[derive(Debug, Clone)]
83pub struct Session {
84 pub peer_id: PeerId,
86 pub state: SessionState,
88 pub created_at: Instant,
90 pub last_activity: Instant,
92 pub closed_at: Option<Instant>,
94 pub bytes_sent: u64,
96 pub bytes_received: u64,
98 pub messages_sent: u64,
100 pub messages_received: u64,
102 pub metadata: SessionMetadata,
104}
105
106impl Session {
107 pub fn new(peer_id: PeerId) -> Self {
109 let now = Instant::now();
110 Self {
111 peer_id,
112 state: SessionState::Creating,
113 created_at: now,
114 last_activity: now,
115 closed_at: None,
116 bytes_sent: 0,
117 bytes_received: 0,
118 messages_sent: 0,
119 messages_received: 0,
120 metadata: SessionMetadata::default(),
121 }
122 }
123
124 pub fn duration(&self) -> Duration {
126 if let Some(closed_at) = self.closed_at {
127 closed_at.duration_since(self.created_at)
128 } else {
129 Instant::now().duration_since(self.created_at)
130 }
131 }
132
133 pub fn idle_duration(&self) -> Duration {
135 Instant::now().duration_since(self.last_activity)
136 }
137
138 pub fn is_idle(&self, timeout: Duration) -> bool {
140 self.idle_duration() > timeout
141 }
142
143 pub fn mark_activity(&mut self) {
145 self.last_activity = Instant::now();
146 if self.state == SessionState::Idle {
147 self.state = SessionState::Active;
148 }
149 }
150
151 pub fn add_bytes_sent(&mut self, bytes: u64) {
153 self.bytes_sent += bytes;
154 self.mark_activity();
155 }
156
157 pub fn add_bytes_received(&mut self, bytes: u64) {
159 self.bytes_received += bytes;
160 self.mark_activity();
161 }
162
163 pub fn record_message_sent(&mut self) {
165 self.messages_sent += 1;
166 self.mark_activity();
167 }
168
169 pub fn record_message_received(&mut self) {
171 self.messages_received += 1;
172 self.mark_activity();
173 }
174}
175
176#[derive(Debug, Clone, Default, Serialize, Deserialize)]
178pub struct SessionMetadata {
179 pub endpoint: Option<String>,
181 pub protocol: Option<String>,
183 pub tags: Vec<String>,
185 pub quality_score: Option<f64>,
187}
188
189#[derive(Debug, Clone)]
191pub struct SessionConfig {
192 pub idle_timeout: Duration,
194 pub max_sessions: usize,
196 pub auto_cleanup: bool,
198 pub cleanup_interval: Duration,
200}
201
202impl Default for SessionConfig {
203 fn default() -> Self {
204 Self {
205 idle_timeout: Duration::from_secs(300), max_sessions: 1000,
207 auto_cleanup: true,
208 cleanup_interval: Duration::from_secs(60), }
210 }
211}
212
213#[derive(Debug, Clone, Default, Serialize, Deserialize)]
215pub struct SessionStats {
216 pub total_created: u64,
218 pub active_sessions: usize,
220 pub idle_sessions: usize,
222 pub total_closed: u64,
224 pub total_bytes_sent: u64,
226 pub total_bytes_received: u64,
228 pub total_messages_sent: u64,
230 pub total_messages_received: u64,
232 pub avg_session_duration: Duration,
234}
235
236pub struct SessionManager {
238 config: SessionConfig,
240 sessions: Arc<DashMap<PeerId, Session>>,
242 stats: Arc<parking_lot::RwLock<SessionStats>>,
244}
245
246impl SessionManager {
247 pub fn new(config: SessionConfig) -> Self {
249 Self {
250 config,
251 sessions: Arc::new(DashMap::new()),
252 stats: Arc::new(parking_lot::RwLock::new(SessionStats::default())),
253 }
254 }
255
256 pub fn create_session(&self, peer_id: PeerId) -> bool {
258 if self.sessions.len() >= self.config.max_sessions {
260 return false;
261 }
262
263 let session = Session::new(peer_id);
264 let inserted = self.sessions.insert(peer_id, session).is_none();
265
266 if inserted {
267 let mut stats = self.stats.write();
268 stats.total_created += 1;
269 }
270
271 inserted
272 }
273
274 pub fn get_session(&self, peer_id: &PeerId) -> Option<Session> {
276 self.sessions.get(peer_id).map(|s| s.clone())
277 }
278
279 pub fn activate_session(&self, peer_id: &PeerId) {
281 if let Some(mut session) = self.sessions.get_mut(peer_id) {
282 session.state = SessionState::Active;
283 session.mark_activity();
284 }
285 }
286
287 pub fn mark_activity(&self, peer_id: &PeerId) {
289 if let Some(mut session) = self.sessions.get_mut(peer_id) {
290 session.mark_activity();
291 }
292 }
293
294 pub fn update_bandwidth(&self, peer_id: &PeerId, sent: u64, received: u64) {
296 if let Some(mut session) = self.sessions.get_mut(peer_id) {
297 session.add_bytes_sent(sent);
298 session.add_bytes_received(received);
299
300 let mut stats = self.stats.write();
301 stats.total_bytes_sent += sent;
302 stats.total_bytes_received += received;
303 }
304 }
305
306 pub fn record_message(&self, peer_id: &PeerId, sent: bool) {
308 if let Some(mut session) = self.sessions.get_mut(peer_id) {
309 if sent {
310 session.record_message_sent();
311 let mut stats = self.stats.write();
312 stats.total_messages_sent += 1;
313 } else {
314 session.record_message_received();
315 let mut stats = self.stats.write();
316 stats.total_messages_received += 1;
317 }
318 }
319 }
320
321 pub fn update_metadata(&self, peer_id: &PeerId, metadata: SessionMetadata) {
323 if let Some(mut session) = self.sessions.get_mut(peer_id) {
324 session.metadata = metadata;
325 }
326 }
327
328 pub fn close_session(&self, peer_id: &PeerId) {
330 if let Some(mut session) = self.sessions.get_mut(peer_id) {
331 session.state = SessionState::Closing;
332 session.closed_at = Some(Instant::now());
333
334 let mut stats = self.stats.write();
335 stats.total_closed += 1;
336
337 let total_duration = stats.avg_session_duration.as_secs_f64()
339 * (stats.total_closed - 1) as f64
340 + session.duration().as_secs_f64();
341 stats.avg_session_duration =
342 Duration::from_secs_f64(total_duration / stats.total_closed as f64);
343 }
344 }
345
346 pub fn remove_session(&self, peer_id: &PeerId) -> Option<Session> {
348 self.sessions.remove(peer_id).map(|(_, s)| s)
349 }
350
351 pub fn get_all_sessions(&self) -> Vec<Session> {
353 self.sessions.iter().map(|entry| entry.clone()).collect()
354 }
355
356 pub fn get_sessions_by_state(&self, state: SessionState) -> Vec<Session> {
358 self.sessions
359 .iter()
360 .filter(|entry| entry.state == state)
361 .map(|entry| entry.clone())
362 .collect()
363 }
364
365 pub fn check_idle_sessions(&self) -> Vec<PeerId> {
367 let mut idle_peers = Vec::new();
368
369 for mut entry in self.sessions.iter_mut() {
370 if entry.state == SessionState::Active && entry.is_idle(self.config.idle_timeout) {
371 entry.state = SessionState::Idle;
372 idle_peers.push(entry.peer_id);
373 }
374 }
375
376 idle_peers
377 }
378
379 pub fn cleanup_closed_sessions(&self) -> usize {
381 let closed_sessions: Vec<PeerId> = self
382 .sessions
383 .iter()
384 .filter(|entry| entry.state == SessionState::Closed)
385 .map(|entry| entry.peer_id)
386 .collect();
387
388 let count = closed_sessions.len();
389 for peer_id in closed_sessions {
390 self.sessions.remove(&peer_id);
391 }
392
393 count
394 }
395
396 pub fn stats(&self) -> SessionStats {
398 let mut stats = self.stats.read().clone();
399
400 stats.active_sessions = self
402 .sessions
403 .iter()
404 .filter(|e| e.state == SessionState::Active)
405 .count();
406 stats.idle_sessions = self
407 .sessions
408 .iter()
409 .filter(|e| e.state == SessionState::Idle)
410 .count();
411
412 stats
413 }
414
415 pub fn session_count(&self) -> usize {
417 self.sessions.len()
418 }
419
420 pub fn active_session_count(&self) -> usize {
422 self.sessions.iter().filter(|e| e.state.is_active()).count()
423 }
424}
425
426#[cfg(test)]
427mod tests {
428 use super::*;
429
430 #[test]
431 fn test_session_creation() {
432 let peer_id = PeerId::random();
433 let session = Session::new(peer_id);
434
435 assert_eq!(session.peer_id, peer_id);
436 assert_eq!(session.state, SessionState::Creating);
437 assert_eq!(session.bytes_sent, 0);
438 assert_eq!(session.bytes_received, 0);
439 }
440
441 #[test]
442 fn test_session_state() {
443 assert!(SessionState::Creating.is_active());
444 assert!(SessionState::Active.is_active());
445 assert!(!SessionState::Idle.is_active());
446 assert!(SessionState::Closing.is_terminated());
447 assert!(SessionState::Closed.is_terminated());
448 }
449
450 #[test]
451 fn test_session_activity() {
452 let peer_id = PeerId::random();
453 let mut session = Session::new(peer_id);
454
455 let initial_time = session.last_activity;
456 std::thread::sleep(std::time::Duration::from_millis(10));
457
458 session.mark_activity();
459 assert!(session.last_activity > initial_time);
460 }
461
462 #[test]
463 fn test_session_bandwidth() {
464 let peer_id = PeerId::random();
465 let mut session = Session::new(peer_id);
466
467 session.add_bytes_sent(1024);
468 session.add_bytes_received(2048);
469
470 assert_eq!(session.bytes_sent, 1024);
471 assert_eq!(session.bytes_received, 2048);
472 }
473
474 #[test]
475 fn test_session_messages() {
476 let peer_id = PeerId::random();
477 let mut session = Session::new(peer_id);
478
479 session.record_message_sent();
480 session.record_message_sent();
481 session.record_message_received();
482
483 assert_eq!(session.messages_sent, 2);
484 assert_eq!(session.messages_received, 1);
485 }
486
487 #[test]
488 fn test_session_manager_creation() {
489 let config = SessionConfig::default();
490 let manager = SessionManager::new(config);
491
492 let peer_id = PeerId::random();
493 assert!(manager.create_session(peer_id));
494 assert_eq!(manager.session_count(), 1);
495
496 let stats = manager.stats();
497 assert_eq!(stats.total_created, 1);
498 }
499
500 #[test]
501 fn test_session_manager_max_sessions() {
502 let config = SessionConfig {
503 max_sessions: 2,
504 ..Default::default()
505 };
506 let manager = SessionManager::new(config);
507
508 let peer1 = PeerId::random();
509 let peer2 = PeerId::random();
510 let peer3 = PeerId::random();
511
512 assert!(manager.create_session(peer1));
513 assert!(manager.create_session(peer2));
514 assert!(!manager.create_session(peer3)); assert_eq!(manager.session_count(), 2);
517 }
518
519 #[test]
520 fn test_session_manager_activity() {
521 let manager = SessionManager::new(SessionConfig::default());
522 let peer_id = PeerId::random();
523
524 manager.create_session(peer_id);
525 manager.activate_session(&peer_id);
526
527 let session = manager.get_session(&peer_id).unwrap();
528 assert_eq!(session.state, SessionState::Active);
529 }
530
531 #[test]
532 fn test_session_manager_bandwidth() {
533 let manager = SessionManager::new(SessionConfig::default());
534 let peer_id = PeerId::random();
535
536 manager.create_session(peer_id);
537 manager.update_bandwidth(&peer_id, 1024, 2048);
538
539 let session = manager.get_session(&peer_id).unwrap();
540 assert_eq!(session.bytes_sent, 1024);
541 assert_eq!(session.bytes_received, 2048);
542
543 let stats = manager.stats();
544 assert_eq!(stats.total_bytes_sent, 1024);
545 assert_eq!(stats.total_bytes_received, 2048);
546 }
547
548 #[test]
549 fn test_session_manager_close() {
550 let manager = SessionManager::new(SessionConfig::default());
551 let peer_id = PeerId::random();
552
553 manager.create_session(peer_id);
554 manager.close_session(&peer_id);
555
556 let session = manager.get_session(&peer_id).unwrap();
557 assert_eq!(session.state, SessionState::Closing);
558 assert!(session.closed_at.is_some());
559
560 let stats = manager.stats();
561 assert_eq!(stats.total_closed, 1);
562 }
563
564 #[test]
565 fn test_session_manager_remove() {
566 let manager = SessionManager::new(SessionConfig::default());
567 let peer_id = PeerId::random();
568
569 manager.create_session(peer_id);
570 assert_eq!(manager.session_count(), 1);
571
572 manager.remove_session(&peer_id);
573 assert_eq!(manager.session_count(), 0);
574 }
575
576 #[test]
577 fn test_session_manager_filter_by_state() {
578 let manager = SessionManager::new(SessionConfig::default());
579
580 let peer1 = PeerId::random();
581 let peer2 = PeerId::random();
582
583 manager.create_session(peer1);
584 manager.create_session(peer2);
585 manager.activate_session(&peer1);
586
587 let active = manager.get_sessions_by_state(SessionState::Active);
588 assert_eq!(active.len(), 1);
589
590 let creating = manager.get_sessions_by_state(SessionState::Creating);
591 assert_eq!(creating.len(), 1);
592 }
593
594 #[test]
595 fn test_session_manager_cleanup() {
596 let manager = SessionManager::new(SessionConfig::default());
597
598 let peer1 = PeerId::random();
599 let peer2 = PeerId::random();
600
601 manager.create_session(peer1);
602 manager.create_session(peer2);
603
604 manager.close_session(&peer1);
605 if let Some(mut session) = manager.sessions.get_mut(&peer1) {
606 session.state = SessionState::Closed;
607 }
608
609 let cleaned = manager.cleanup_closed_sessions();
610 assert_eq!(cleaned, 1);
611 assert_eq!(manager.session_count(), 1);
612 }
613
614 #[test]
615 fn test_session_idle_detection() {
616 let peer_id = PeerId::random();
617 let session = Session::new(peer_id);
618
619 assert!(!session.is_idle(Duration::from_secs(1)));
621
622 let mut old_session = Session::new(peer_id);
624 old_session.last_activity = Instant::now() - Duration::from_secs(10);
625
626 assert!(old_session.is_idle(Duration::from_secs(5)));
628 }
629}