ipfrs_network/
session.rs

1//! Connection Session Management
2//!
3//! This module provides utilities for tracking connection sessions and their lifecycles.
4//!
5//! # Features
6//!
7//! - Track active connection sessions with metadata
8//! - Session lifecycle management (created, active, idle, closing, closed)
9//! - Idle timeout detection
10//! - Session statistics and metrics
11//! - Callback hooks for lifecycle events
12//!
13//! # Example
14//!
15//! ```
16//! use ipfrs_network::session::{SessionManager, SessionConfig, SessionState};
17//! use libp2p::PeerId;
18//! use std::time::Duration;
19//!
20//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
21//! let config = SessionConfig {
22//!     idle_timeout: Duration::from_secs(300),
23//!     max_sessions: 1000,
24//!     ..Default::default()
25//! };
26//!
//! let manager = SessionManager::new(config);
28//!
29//! // Create a new session
30//! let peer_id = PeerId::random();
31//! manager.create_session(peer_id);
32//!
33//! // Check session state
34//! if let Some(session) = manager.get_session(&peer_id) {
35//!     println!("Session state: {:?}", session.state);
36//!     println!("Duration: {:?}", session.duration());
37//! }
38//!
39//! // Update activity
40//! manager.mark_activity(&peer_id);
41//!
42//! // Close session
43//! manager.close_session(&peer_id);
44//! # Ok(())
45//! # }
46//! ```
47
48use dashmap::DashMap;
49use libp2p::PeerId;
50use serde::{Deserialize, Serialize};
51use std::sync::Arc;
52use std::time::{Duration, Instant};
53
/// Lifecycle state of a connection session.
///
/// Within this file the states are assigned as follows:
/// [`Session::new`] starts in `Creating`, `SessionManager::activate_session`
/// moves a session to `Active`, `SessionManager::check_idle_sessions` demotes
/// timed-out `Active` sessions to `Idle`, [`Session::mark_activity`] promotes
/// `Idle` back to `Active`, and `SessionManager::close_session` sets `Closing`.
///
/// NOTE(review): no non-test code in this file assigns `Closed`; callers are
/// presumably expected to set it themselves before cleanup — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SessionState {
    /// Session is being created (initial state of a new `Session`)
    Creating,
    /// Session is active
    Active,
    /// Session is idle (no recent activity)
    Idle,
    /// Session is being closed
    Closing,
    /// Session is closed
    Closed,
}
68
69impl SessionState {
70    /// Check if session is active (creating or active)
71    pub fn is_active(&self) -> bool {
72        matches!(self, Self::Creating | Self::Active)
73    }
74
75    /// Check if session is terminated (closing or closed)
76    pub fn is_terminated(&self) -> bool {
77        matches!(self, Self::Closing | Self::Closed)
78    }
79}
80
/// Connection session information
///
/// A plain record describing one peer's session: its lifecycle state, the
/// timestamps used for duration and idle calculations, traffic counters, and
/// free-form metadata. All fields are public, so callers may read or modify
/// them directly in addition to using the helper methods on `Session`.
#[derive(Debug, Clone)]
pub struct Session {
    /// Peer ID
    pub peer_id: PeerId,
    /// Session state
    pub state: SessionState,
    /// Session creation time
    pub created_at: Instant,
    /// Last activity time (refreshed by `mark_activity` and the counter helpers)
    pub last_activity: Instant,
    /// Session closed time (if closed); `None` while the session is open
    pub closed_at: Option<Instant>,
    /// Bytes sent in this session
    pub bytes_sent: u64,
    /// Bytes received in this session
    pub bytes_received: u64,
    /// Number of messages sent
    pub messages_sent: u64,
    /// Number of messages received
    pub messages_received: u64,
    /// Custom session metadata
    pub metadata: SessionMetadata,
}
105
106impl Session {
107    /// Create a new session
108    pub fn new(peer_id: PeerId) -> Self {
109        let now = Instant::now();
110        Self {
111            peer_id,
112            state: SessionState::Creating,
113            created_at: now,
114            last_activity: now,
115            closed_at: None,
116            bytes_sent: 0,
117            bytes_received: 0,
118            messages_sent: 0,
119            messages_received: 0,
120            metadata: SessionMetadata::default(),
121        }
122    }
123
124    /// Get session duration
125    pub fn duration(&self) -> Duration {
126        if let Some(closed_at) = self.closed_at {
127            closed_at.duration_since(self.created_at)
128        } else {
129            Instant::now().duration_since(self.created_at)
130        }
131    }
132
133    /// Get time since last activity
134    pub fn idle_duration(&self) -> Duration {
135        Instant::now().duration_since(self.last_activity)
136    }
137
138    /// Check if session is idle (exceeds timeout)
139    pub fn is_idle(&self, timeout: Duration) -> bool {
140        self.idle_duration() > timeout
141    }
142
143    /// Mark activity
144    pub fn mark_activity(&mut self) {
145        self.last_activity = Instant::now();
146        if self.state == SessionState::Idle {
147            self.state = SessionState::Active;
148        }
149    }
150
151    /// Update sent bytes
152    pub fn add_bytes_sent(&mut self, bytes: u64) {
153        self.bytes_sent += bytes;
154        self.mark_activity();
155    }
156
157    /// Update received bytes
158    pub fn add_bytes_received(&mut self, bytes: u64) {
159        self.bytes_received += bytes;
160        self.mark_activity();
161    }
162
163    /// Record message sent
164    pub fn record_message_sent(&mut self) {
165        self.messages_sent += 1;
166        self.mark_activity();
167    }
168
169    /// Record message received
170    pub fn record_message_received(&mut self) {
171        self.messages_received += 1;
172        self.mark_activity();
173    }
174}
175
/// Session metadata
///
/// Optional, caller-supplied descriptive data attached to a `Session`.
/// The derived `Default` leaves every option as `None` and `tags` empty.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionMetadata {
    /// Connection endpoint (address)
    pub endpoint: Option<String>,
    /// Connection protocol
    pub protocol: Option<String>,
    /// Session tags
    pub tags: Vec<String>,
    /// Connection quality score (0.0-1.0)
    // NOTE(review): the 0.0-1.0 range is documented only; nothing in this
    // file validates or clamps the value.
    pub quality_score: Option<f64>,
}
188
/// Session configuration
///
/// Tunables consumed by `SessionManager`. See the `Default` implementation
/// for the values used when a field is not overridden.
#[derive(Debug, Clone)]
pub struct SessionConfig {
    /// Idle timeout before marking session as idle
    /// (compared against each session's idle duration in `check_idle_sessions`)
    pub idle_timeout: Duration,
    /// Maximum number of concurrent sessions
    /// (`create_session` refuses new sessions once the map holds this many)
    pub max_sessions: usize,
    /// Enable automatic cleanup of closed sessions
    // NOTE(review): not read anywhere in this file — presumably consumed by
    // a caller that schedules `cleanup_closed_sessions`; confirm.
    pub auto_cleanup: bool,
    /// Cleanup interval for closed sessions
    // NOTE(review): not read anywhere in this file; see `auto_cleanup`.
    pub cleanup_interval: Duration,
}
201
202impl Default for SessionConfig {
203    fn default() -> Self {
204        Self {
205            idle_timeout: Duration::from_secs(300), // 5 minutes
206            max_sessions: 1000,
207            auto_cleanup: true,
208            cleanup_interval: Duration::from_secs(60), // 1 minute
209        }
210    }
211}
212
/// Session statistics
///
/// Aggregate counters maintained by `SessionManager`. The cumulative fields
/// are updated as sessions are created, closed, or report traffic through
/// the manager; `active_sessions` and `idle_sessions` are recomputed from
/// the live session map each time `SessionManager::stats` is called.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionStats {
    /// Total sessions created
    pub total_created: u64,
    /// Currently active sessions (state is exactly `Active`)
    pub active_sessions: usize,
    /// Currently idle sessions (state is exactly `Idle`)
    pub idle_sessions: usize,
    /// Total sessions closed
    pub total_closed: u64,
    /// Total bytes sent across all sessions
    pub total_bytes_sent: u64,
    /// Total bytes received across all sessions
    pub total_bytes_received: u64,
    /// Total messages sent
    pub total_messages_sent: u64,
    /// Total messages received
    pub total_messages_received: u64,
    /// Average session duration (running mean updated on each close)
    pub avg_session_duration: Duration,
}
235
/// Session manager
///
/// Owns the per-peer session table and the aggregate statistics. Every
/// method on the manager takes `&self`; mutation goes through the `DashMap`
/// and the `parking_lot::RwLock` held in the fields below.
pub struct SessionManager {
    /// Configuration
    config: SessionConfig,
    /// Sessions keyed by peer ID (holds sessions in any state until they are
    /// removed, not only active ones)
    sessions: Arc<DashMap<PeerId, Session>>,
    /// Statistics
    stats: Arc<parking_lot::RwLock<SessionStats>>,
}
245
246impl SessionManager {
247    /// Create a new session manager
248    pub fn new(config: SessionConfig) -> Self {
249        Self {
250            config,
251            sessions: Arc::new(DashMap::new()),
252            stats: Arc::new(parking_lot::RwLock::new(SessionStats::default())),
253        }
254    }
255
256    /// Create a new session
257    pub fn create_session(&self, peer_id: PeerId) -> bool {
258        // Check max sessions limit
259        if self.sessions.len() >= self.config.max_sessions {
260            return false;
261        }
262
263        let session = Session::new(peer_id);
264        let inserted = self.sessions.insert(peer_id, session).is_none();
265
266        if inserted {
267            let mut stats = self.stats.write();
268            stats.total_created += 1;
269        }
270
271        inserted
272    }
273
274    /// Get a session
275    pub fn get_session(&self, peer_id: &PeerId) -> Option<Session> {
276        self.sessions.get(peer_id).map(|s| s.clone())
277    }
278
279    /// Mark session as active
280    pub fn activate_session(&self, peer_id: &PeerId) {
281        if let Some(mut session) = self.sessions.get_mut(peer_id) {
282            session.state = SessionState::Active;
283            session.mark_activity();
284        }
285    }
286
287    /// Mark activity on a session
288    pub fn mark_activity(&self, peer_id: &PeerId) {
289        if let Some(mut session) = self.sessions.get_mut(peer_id) {
290            session.mark_activity();
291        }
292    }
293
294    /// Update session bandwidth
295    pub fn update_bandwidth(&self, peer_id: &PeerId, sent: u64, received: u64) {
296        if let Some(mut session) = self.sessions.get_mut(peer_id) {
297            session.add_bytes_sent(sent);
298            session.add_bytes_received(received);
299
300            let mut stats = self.stats.write();
301            stats.total_bytes_sent += sent;
302            stats.total_bytes_received += received;
303        }
304    }
305
306    /// Record message activity
307    pub fn record_message(&self, peer_id: &PeerId, sent: bool) {
308        if let Some(mut session) = self.sessions.get_mut(peer_id) {
309            if sent {
310                session.record_message_sent();
311                let mut stats = self.stats.write();
312                stats.total_messages_sent += 1;
313            } else {
314                session.record_message_received();
315                let mut stats = self.stats.write();
316                stats.total_messages_received += 1;
317            }
318        }
319    }
320
321    /// Update session metadata
322    pub fn update_metadata(&self, peer_id: &PeerId, metadata: SessionMetadata) {
323        if let Some(mut session) = self.sessions.get_mut(peer_id) {
324            session.metadata = metadata;
325        }
326    }
327
328    /// Close a session
329    pub fn close_session(&self, peer_id: &PeerId) {
330        if let Some(mut session) = self.sessions.get_mut(peer_id) {
331            session.state = SessionState::Closing;
332            session.closed_at = Some(Instant::now());
333
334            let mut stats = self.stats.write();
335            stats.total_closed += 1;
336
337            // Update average duration
338            let total_duration = stats.avg_session_duration.as_secs_f64()
339                * (stats.total_closed - 1) as f64
340                + session.duration().as_secs_f64();
341            stats.avg_session_duration =
342                Duration::from_secs_f64(total_duration / stats.total_closed as f64);
343        }
344    }
345
346    /// Remove a closed session
347    pub fn remove_session(&self, peer_id: &PeerId) -> Option<Session> {
348        self.sessions.remove(peer_id).map(|(_, s)| s)
349    }
350
351    /// Get all sessions
352    pub fn get_all_sessions(&self) -> Vec<Session> {
353        self.sessions.iter().map(|entry| entry.clone()).collect()
354    }
355
356    /// Get sessions by state
357    pub fn get_sessions_by_state(&self, state: SessionState) -> Vec<Session> {
358        self.sessions
359            .iter()
360            .filter(|entry| entry.state == state)
361            .map(|entry| entry.clone())
362            .collect()
363    }
364
365    /// Check for idle sessions and mark them
366    pub fn check_idle_sessions(&self) -> Vec<PeerId> {
367        let mut idle_peers = Vec::new();
368
369        for mut entry in self.sessions.iter_mut() {
370            if entry.state == SessionState::Active && entry.is_idle(self.config.idle_timeout) {
371                entry.state = SessionState::Idle;
372                idle_peers.push(entry.peer_id);
373            }
374        }
375
376        idle_peers
377    }
378
379    /// Cleanup closed sessions
380    pub fn cleanup_closed_sessions(&self) -> usize {
381        let closed_sessions: Vec<PeerId> = self
382            .sessions
383            .iter()
384            .filter(|entry| entry.state == SessionState::Closed)
385            .map(|entry| entry.peer_id)
386            .collect();
387
388        let count = closed_sessions.len();
389        for peer_id in closed_sessions {
390            self.sessions.remove(&peer_id);
391        }
392
393        count
394    }
395
396    /// Get statistics
397    pub fn stats(&self) -> SessionStats {
398        let mut stats = self.stats.read().clone();
399
400        // Update current session counts
401        stats.active_sessions = self
402            .sessions
403            .iter()
404            .filter(|e| e.state == SessionState::Active)
405            .count();
406        stats.idle_sessions = self
407            .sessions
408            .iter()
409            .filter(|e| e.state == SessionState::Idle)
410            .count();
411
412        stats
413    }
414
415    /// Get session count
416    pub fn session_count(&self) -> usize {
417        self.sessions.len()
418    }
419
420    /// Get active session count
421    pub fn active_session_count(&self) -> usize {
422        self.sessions.iter().filter(|e| e.state.is_active()).count()
423    }
424}
425
#[cfg(test)]
mod tests {
    use super::*;

    /// A new session carries the given peer, starts in `Creating`, and has
    /// zeroed byte counters.
    #[test]
    fn test_session_creation() {
        let peer = PeerId::random();
        let fresh = Session::new(peer);

        assert_eq!(fresh.peer_id, peer);
        assert_eq!(fresh.state, SessionState::Creating);
        assert_eq!(fresh.bytes_sent, 0);
        assert_eq!(fresh.bytes_received, 0);
    }

    /// The state predicates classify each variant as expected.
    #[test]
    fn test_session_state() {
        assert!(SessionState::Creating.is_active());
        assert!(SessionState::Active.is_active());
        assert!(!SessionState::Idle.is_active());
        assert!(SessionState::Closing.is_terminated());
        assert!(SessionState::Closed.is_terminated());
    }

    /// Marking activity moves `last_activity` forward.
    #[test]
    fn test_session_activity() {
        let mut session = Session::new(PeerId::random());
        let before = session.last_activity;

        // Give the monotonic clock time to advance measurably.
        std::thread::sleep(Duration::from_millis(10));
        session.mark_activity();

        assert!(session.last_activity > before);
    }

    /// Byte counters accumulate the reported amounts.
    #[test]
    fn test_session_bandwidth() {
        let mut session = Session::new(PeerId::random());

        session.add_bytes_sent(1024);
        session.add_bytes_received(2048);

        assert_eq!(session.bytes_sent, 1024);
        assert_eq!(session.bytes_received, 2048);
    }

    /// Message counters track sent and received messages independently.
    #[test]
    fn test_session_messages() {
        let mut session = Session::new(PeerId::random());

        session.record_message_sent();
        session.record_message_sent();
        session.record_message_received();

        assert_eq!(session.messages_sent, 2);
        assert_eq!(session.messages_received, 1);
    }

    /// Creating a session through the manager stores it and counts it.
    #[test]
    fn test_session_manager_creation() {
        let manager = SessionManager::new(SessionConfig::default());
        let peer = PeerId::random();

        assert!(manager.create_session(peer));
        assert_eq!(manager.session_count(), 1);

        let snapshot = manager.stats();
        assert_eq!(snapshot.total_created, 1);
    }

    /// The manager refuses sessions beyond `max_sessions`.
    #[test]
    fn test_session_manager_max_sessions() {
        let manager = SessionManager::new(SessionConfig {
            max_sessions: 2,
            ..Default::default()
        });

        let first = PeerId::random();
        let second = PeerId::random();
        let third = PeerId::random();

        assert!(manager.create_session(first));
        assert!(manager.create_session(second));
        // The limit of two has been reached, so the third must be rejected.
        assert!(!manager.create_session(third));

        assert_eq!(manager.session_count(), 2);
    }

    /// Activating a session through the manager sets its state to `Active`.
    #[test]
    fn test_session_manager_activity() {
        let manager = SessionManager::new(SessionConfig::default());
        let peer = PeerId::random();

        manager.create_session(peer);
        manager.activate_session(&peer);

        let snapshot = manager.get_session(&peer).unwrap();
        assert_eq!(snapshot.state, SessionState::Active);
    }

    /// Bandwidth updates land on both the session and the aggregate stats.
    #[test]
    fn test_session_manager_bandwidth() {
        let manager = SessionManager::new(SessionConfig::default());
        let peer = PeerId::random();

        manager.create_session(peer);
        manager.update_bandwidth(&peer, 1024, 2048);

        let snapshot = manager.get_session(&peer).unwrap();
        assert_eq!(snapshot.bytes_sent, 1024);
        assert_eq!(snapshot.bytes_received, 2048);

        let totals = manager.stats();
        assert_eq!(totals.total_bytes_sent, 1024);
        assert_eq!(totals.total_bytes_received, 2048);
    }

    /// Closing a session marks it `Closing`, stamps `closed_at`, and counts it.
    #[test]
    fn test_session_manager_close() {
        let manager = SessionManager::new(SessionConfig::default());
        let peer = PeerId::random();

        manager.create_session(peer);
        manager.close_session(&peer);

        let snapshot = manager.get_session(&peer).unwrap();
        assert_eq!(snapshot.state, SessionState::Closing);
        assert!(snapshot.closed_at.is_some());

        let totals = manager.stats();
        assert_eq!(totals.total_closed, 1);
    }

    /// Removing a session takes it out of the map.
    #[test]
    fn test_session_manager_remove() {
        let manager = SessionManager::new(SessionConfig::default());
        let peer = PeerId::random();

        manager.create_session(peer);
        assert_eq!(manager.session_count(), 1);

        manager.remove_session(&peer);
        assert_eq!(manager.session_count(), 0);
    }

    /// Filtering by state returns only the sessions in that state.
    #[test]
    fn test_session_manager_filter_by_state() {
        let manager = SessionManager::new(SessionConfig::default());

        let activated = PeerId::random();
        let untouched = PeerId::random();

        manager.create_session(activated);
        manager.create_session(untouched);
        manager.activate_session(&activated);

        let active = manager.get_sessions_by_state(SessionState::Active);
        assert_eq!(active.len(), 1);

        let creating = manager.get_sessions_by_state(SessionState::Creating);
        assert_eq!(creating.len(), 1);
    }

    /// Cleanup removes sessions in the `Closed` state and leaves the rest.
    #[test]
    fn test_session_manager_cleanup() {
        let manager = SessionManager::new(SessionConfig::default());

        let closed_peer = PeerId::random();
        let open_peer = PeerId::random();

        manager.create_session(closed_peer);
        manager.create_session(open_peer);

        manager.close_session(&closed_peer);
        // `close_session` only reaches `Closing`; force the final state by hand.
        if let Some(mut session) = manager.sessions.get_mut(&closed_peer) {
            session.state = SessionState::Closed;
        }

        let cleaned = manager.cleanup_closed_sessions();
        assert_eq!(cleaned, 1);
        assert_eq!(manager.session_count(), 1);
    }

    /// Idle detection compares time since last activity with the timeout.
    #[test]
    fn test_session_idle_detection() {
        let peer = PeerId::random();

        // A brand-new session has had activity just now.
        let fresh = Session::new(peer);
        assert!(!fresh.is_idle(Duration::from_secs(1)));

        // Backdate the last activity by ten seconds.
        let mut stale = Session::new(peer);
        stale.last_activity = Instant::now() - Duration::from_secs(10);

        // Ten seconds of silence exceeds a five-second timeout.
        assert!(stale.is_idle(Duration::from_secs(5)));
    }
}