Skip to main content

aura_sync/core/
session.rs

1//! Unified session management for aura-sync protocols
2//!
//! This module provides a centralized session manager that handles session
//! lifecycle, state tracking, and coordination for all aura-sync protocols.
5//!
6//! **Time System**: Uses `PhysicalTime` for timestamps per the unified time architecture.
7
8use crate::core::metrics::ErrorCategory;
9use crate::core::{
10    sync_resource_with_limit, sync_session_error, sync_timeout_error, sync_validation_error,
11    MetricsCollector, SyncConfig, SyncResult,
12};
13use aura_core::time::PhysicalTime;
14use aura_core::{DeviceId, SessionId};
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::time::Duration;
18
/// Unified session state machine following choreographic patterns
///
/// A session managed by `SessionManager` starts in `Initializing`, moves to
/// `Active` via `activate_session`, and ends in `Completed` via
/// `complete_session`, `fail_session`, or `timeout_session`.
/// NOTE(review): no code in this section constructs `Terminating`.
///
/// **Time System**: Uses `PhysicalTime` for timestamps per the unified time architecture.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum SessionState<T> {
    /// Session initialization phase: participants are known, no protocol state yet.
    Initializing {
        /// Devices taking part in the session.
        participants: Vec<DeviceId>,
        /// Timeout timestamp (unified time system); the session counts as timed
        /// out once the current time reaches it.
        timeout_at: PhysicalTime,
        /// Creation timestamp (unified time system)
        created_at: PhysicalTime,
    },
    /// Active session with protocol-specific state
    Active {
        /// Caller-defined protocol state, replaced by `update_session`.
        protocol_state: T,
        /// Start timestamp (unified time system)
        started_at: PhysicalTime,
        /// Devices taking part in the session.
        participants: Vec<DeviceId>,
        /// Timeout timestamp (unified time system)
        timeout_at: PhysicalTime,
    },
    /// Session termination phase with results
    Terminating {
        /// Outcome the session is ending with.
        result: SessionResult,
        /// Cleanup deadline timestamp (unified time system); used as the
        /// timeout for this phase by `is_timed_out`.
        cleanup_deadline: PhysicalTime,
    },
    /// Session completed and cleaned up; never times out.
    Completed(SessionResult),
}
50
51impl<T> SessionState<T> {
52    /// Check if session has timed out
53    ///
54    /// **Time System**: Accepts `PhysicalTime` for comparison.
55    pub fn is_timed_out(&self, now: &PhysicalTime) -> bool {
56        match self {
57            SessionState::Initializing { timeout_at, .. } => now.ts_ms >= timeout_at.ts_ms,
58            SessionState::Active { timeout_at, .. } => now.ts_ms >= timeout_at.ts_ms,
59            SessionState::Terminating {
60                cleanup_deadline, ..
61            } => now.ts_ms >= cleanup_deadline.ts_ms,
62            SessionState::Completed(_) => false,
63        }
64    }
65
66    /// Check if session has timed out (from milliseconds)
67    ///
68    /// Convenience method for backward compatibility.
69    pub fn is_timed_out_ms(&self, now_ms: u64) -> bool {
70        self.is_timed_out(&PhysicalTime {
71            ts_ms: now_ms,
72            uncertainty: None,
73        })
74    }
75
76    /// Get session participants
77    pub fn participants(&self) -> &[DeviceId] {
78        match self {
79            SessionState::Initializing { participants, .. } => participants,
80            SessionState::Active { participants, .. } => participants,
81            SessionState::Terminating { .. } => &[],
82            SessionState::Completed(_) => &[],
83        }
84    }
85
86    /// Get session duration in milliseconds (if active or completed)
87    ///
88    /// **Time System**: Accepts `PhysicalTime` for comparison.
89    pub fn duration_ms(&self, current_time: &PhysicalTime) -> Option<u64> {
90        match self {
91            SessionState::Active { started_at, .. } => {
92                Some(current_time.ts_ms.saturating_sub(started_at.ts_ms))
93            }
94            SessionState::Terminating { result, .. } | SessionState::Completed(result) => {
95                match result {
96                    SessionResult::Success { duration_ms, .. } => Some(*duration_ms),
97                    SessionResult::Failure { duration_ms, .. } => Some(*duration_ms),
98                    SessionResult::Timeout { duration_ms, .. } => Some(*duration_ms),
99                }
100            }
101            SessionState::Initializing { created_at, .. } => {
102                Some(current_time.ts_ms.saturating_sub(created_at.ts_ms))
103            }
104        }
105    }
106
107    /// Check if session is in a terminal state
108    pub fn is_terminal(&self) -> bool {
109        matches!(self, SessionState::Completed(_))
110    }
111
112    /// Check if session is active
113    pub fn is_active(&self) -> bool {
114        matches!(self, SessionState::Active { .. })
115    }
116}
117
/// Session results with comprehensive context
///
/// The final outcome stored in `SessionState::Completed` (and carried by
/// `SessionState::Terminating`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum SessionResult {
    /// The session finished normally (built by `SessionManager::complete_session`).
    Success {
        /// Elapsed session time in milliseconds.
        duration_ms: u64,
        /// Number of operations performed, as reported by the caller.
        operations_count: u64,
        /// Bytes transferred, as reported by the caller.
        bytes_transferred: u64,
        /// Participants of the session at the time it was completed.
        participants: Vec<DeviceId>,
        /// Free-form key/value details supplied by the caller.
        metadata: HashMap<String, String>,
    },
    /// The session failed (built by `SessionManager::fail_session`).
    Failure {
        /// Cause of the failure.
        error: SessionError,
        /// Elapsed session time in milliseconds.
        duration_ms: u64,
        /// Progress made before the failure, if the caller supplied any.
        partial_results: Option<PartialResults>,
    },
    /// The session was timed out (built by `SessionManager::timeout_session`).
    Timeout {
        /// Elapsed session time in milliseconds.
        duration_ms: u64,
        /// `Debug` rendering of the session state at the moment it timed out.
        last_known_state: String,
    },
}
138
139impl SessionResult {
140    /// Check if result represents success
141    pub fn is_success(&self) -> bool {
142        matches!(self, SessionResult::Success { .. })
143    }
144
145    /// Get duration regardless of outcome
146    pub fn duration_ms(&self) -> u64 {
147        match self {
148            SessionResult::Success { duration_ms, .. } => *duration_ms,
149            SessionResult::Failure { duration_ms, .. } => *duration_ms,
150            SessionResult::Timeout { duration_ms, .. } => *duration_ms,
151        }
152    }
153
154    /// Get operations count for successful sessions
155    pub fn operations_count(&self) -> u64 {
156        match self {
157            SessionResult::Success {
158                operations_count, ..
159            } => *operations_count,
160            SessionResult::Failure {
161                partial_results: Some(partial),
162                ..
163            } => partial.operations_completed,
164            _ => 0,
165        }
166    }
167}
168
/// Partial results for failed sessions
///
/// Optionally attached to `SessionResult::Failure` by the caller of
/// `SessionManager::fail_session`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PartialResults {
    /// Operations finished before the failure; surfaced by
    /// `SessionResult::operations_count`.
    pub operations_completed: u64,
    /// Bytes transferred before the failure.
    pub bytes_transferred: u64,
    /// Presumably the participants that finished their part before the
    /// failure — not read in this section; confirm with producers.
    pub completed_participants: Vec<DeviceId>,
    /// Description of the last operation that succeeded, if known.
    /// NOTE(review): format is caller-defined; not read in this section.
    pub last_successful_operation: Option<String>,
}
177
/// Session-specific errors
///
/// Used as the `error` of `SessionResult::Failure`. The metrics category noted
/// on each variant is the one `SessionManager::fail_session` reports for it.
#[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)]
pub enum SessionError {
    /// The session timed out after `duration_ms` milliseconds
    /// (metrics category: `Timeout`).
    #[error("Session timeout after {duration_ms}ms")]
    Timeout { duration_ms: u64 },

    /// The given participant device disconnected (metrics category: `Network`).
    #[error("Participant {participant} disconnected")]
    ParticipantDisconnected { participant: DeviceId },

    /// A resource limit named by `limit_type` was exceeded
    /// (metrics category: `Resource`).
    #[error("Resource limit exceeded: {limit_type}")]
    ResourceLimitExceeded { limit_type: String },

    /// A protocol constraint, described by `constraint`, was violated
    /// (metrics category: `Protocol`).
    #[error("Protocol constraint violation: {constraint}")]
    ProtocolViolation { constraint: String },

    /// Capacity was exceeded: `current` out of an allowed `max`
    /// (metrics category: `Resource`).
    #[error("Session capacity exceeded: {current}/{max}")]
    CapacityExceeded { current: u64, max: u64 },

    /// A state transition from `from` to `to` is not allowed
    /// (metrics category: `Protocol`).
    #[error("Invalid session state transition: {from} -> {to}")]
    InvalidStateTransition { from: String, to: String },
}
199
/// Session configuration derived from main SyncConfig
#[derive(Debug, Clone)]
pub struct SessionConfig {
    /// Session timeout duration: how long a newly created session may stay in
    /// `Initializing` (added to the creation time by `create_session`).
    pub timeout: Duration,
    /// Maximum number of participants per session; `create_session` rejects
    /// larger participant lists.
    pub max_participants: u32,
    /// Cleanup interval for stale sessions: the minimum time between two sweeps
    /// performed by `cleanup_stale_sessions`.
    pub cleanup_interval: Duration,
    /// Maximum concurrent sessions, checked by `create_session`.
    pub max_concurrent_sessions: u32,
    /// Session resource limits
    pub resource_limits: SessionResourceLimits,
}
214
215impl Default for SessionConfig {
216    fn default() -> Self {
217        Self {
218            timeout: Duration::from_secs(300), // 5 minutes
219            max_participants: 10,
220            cleanup_interval: Duration::from_secs(60), // 1 minute
221            max_concurrent_sessions: 20,
222            resource_limits: SessionResourceLimits::default(),
223        }
224    }
225}
226
227impl From<&SyncConfig> for SessionConfig {
228    fn from(sync_config: &SyncConfig) -> Self {
229        Self {
230            timeout: sync_config.network.sync_timeout,
231            max_participants: 10, // Could be configurable
232            cleanup_interval: sync_config.network.cleanup_interval,
233            max_concurrent_sessions: sync_config.peer_management.max_concurrent_syncs,
234            resource_limits: SessionResourceLimits::from(&sync_config.performance),
235        }
236    }
237}
238
/// Resource limits for session management
#[derive(Debug, Clone)]
pub struct SessionResourceLimits {
    /// Maximum memory usage per session in bytes.
    /// NOTE(review): not read anywhere in this section — confirm where (or
    /// whether) it is enforced.
    pub max_memory_per_session: u64,
    /// Maximum duration a session can be active. `activate_session` sets an
    /// active session's deadline to its activation time plus this value.
    pub max_session_duration: Duration,
    /// Maximum number of operations per session.
    /// NOTE(review): not read anywhere in this section — confirm where (or
    /// whether) it is enforced.
    pub max_operations_per_session: u32,
}
249
250impl Default for SessionResourceLimits {
251    fn default() -> Self {
252        Self {
253            max_memory_per_session: 10 * 1024 * 1024,        // 10 MB
254            max_session_duration: Duration::from_secs(3600), // 1 hour
255            max_operations_per_session: 10000,
256        }
257    }
258}
259
260impl From<&crate::core::PerformanceConfig> for SessionResourceLimits {
261    fn from(perf_config: &crate::core::PerformanceConfig) -> Self {
262        Self {
263            max_memory_per_session: perf_config.memory_limit / 10, // 1/10th of total limit
264            max_session_duration: Duration::from_secs(3600),       // 1 hour
265            max_operations_per_session: 10000,
266        }
267    }
268}
269
/// Generic session manager for protocol-agnostic session coordination
///
/// Keeps every session it creates, keyed by `SessionId`, until
/// `cleanup_stale_sessions` or `close_session` removes it. Completed sessions
/// therefore remain visible (e.g. to `get_statistics`) until the next cleanup
/// sweep.
///
/// **Time System**: Uses `PhysicalTime` for timestamps per the unified time architecture.
pub struct SessionManager<T> {
    /// All known sessions, in any state (including completed ones awaiting
    /// cleanup), indexed by session ID.
    sessions: HashMap<SessionId, SessionState<T>>,
    /// Session configuration
    config: SessionConfig,
    /// Metrics collector for session telemetry; `None` disables reporting.
    metrics: Option<MetricsCollector>,
    /// Time of the last cleanup sweep (initially the construction time); used to
    /// rate-limit `cleanup_stale_sessions` (unified time system).
    last_cleanup: PhysicalTime,
    /// Monotonic counter used to derive deterministic-but-unique session IDs;
    /// advanced with wrapping arithmetic on every new ID.
    session_counter: u64,
}
285
286impl<T> SessionManager<T>
287where
288    T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de>,
289{
290    /// Create a new session manager
291    ///
292    /// **Time System**: Uses `PhysicalTime` for timestamps.
293    pub fn new(config: SessionConfig, now: PhysicalTime) -> Self {
294        Self {
295            sessions: HashMap::new(),
296            config,
297            metrics: None,
298            last_cleanup: now,
299            session_counter: 0,
300        }
301    }
302
303    /// Create a new session manager from milliseconds timestamp
304    ///
305    /// Convenience constructor for backward compatibility.
306    pub fn new_from_ms(config: SessionConfig, now_ms: u64) -> Self {
307        Self::new(
308            config,
309            PhysicalTime {
310                ts_ms: now_ms,
311                uncertainty: None,
312            },
313        )
314    }
315
316    /// Create session manager with metrics collection
317    ///
318    /// **Time System**: Uses `PhysicalTime` for timestamps.
319    pub fn with_metrics(
320        config: SessionConfig,
321        metrics: MetricsCollector,
322        now: PhysicalTime,
323    ) -> Self {
324        Self {
325            sessions: HashMap::new(),
326            config,
327            metrics: Some(metrics),
328            last_cleanup: now,
329            session_counter: 0,
330        }
331    }
332
333    /// Deterministically derive a unique session ID using the caller-supplied timestamp
334    /// and a local monotonic counter (no ambient randomness).
335    fn generate_session_id(&mut self, now: &PhysicalTime) -> SessionId {
336        let mut input = Vec::new();
337        input.extend_from_slice(b"aura.sync.session.id");
338        input.extend_from_slice(&now.ts_ms.to_le_bytes());
339        input.extend_from_slice(&self.session_counter.to_le_bytes());
340        self.session_counter = self.session_counter.wrapping_add(1);
341
342        let digest = aura_core::hash::hash(&input);
343        let mut uuid_bytes = [0u8; 16];
344        uuid_bytes.copy_from_slice(&digest[..16]);
345        SessionId::from_uuid(uuid::Uuid::from_bytes(uuid_bytes))
346    }
347
348    /// Create a new session with participants
349    ///
350    /// **Time System**: Uses `PhysicalTime` for timestamps.
351    /// Note: Callers should obtain `now` via their time provider and pass it to this method
352    pub fn create_session(
353        &mut self,
354        participants: Vec<DeviceId>,
355        now: &PhysicalTime,
356    ) -> SyncResult<SessionId> {
357        // Validate participant count
358        if participants.len() > self.config.max_participants as usize {
359            return Err(sync_validation_error(format!(
360                "Too many participants: {} > {}",
361                participants.len(),
362                self.config.max_participants
363            )));
364        }
365
366        // Check concurrent session limit
367        let active_count = self.count_active_sessions();
368        if active_count >= self.config.max_concurrent_sessions as usize {
369            return Err(sync_resource_with_limit(
370                "concurrent_sessions",
371                "Maximum concurrent sessions exceeded",
372                u64::from(self.config.max_concurrent_sessions),
373            ));
374        }
375
376        let session_id = self.generate_session_id(now);
377        let timeout_ms = now.ts_ms + self.config.timeout.as_millis() as u64;
378
379        let session_state = SessionState::Initializing {
380            participants,
381            timeout_at: PhysicalTime {
382                ts_ms: timeout_ms,
383                uncertainty: now.uncertainty,
384            },
385            created_at: now.clone(),
386        };
387
388        self.sessions.insert(session_id, session_state);
389
390        // Record metrics with the provided now parameter
391        if let Some(ref metrics) = self.metrics {
392            metrics.record_sync_start(&session_id.to_string(), now.ts_ms);
393        }
394
395        Ok(session_id)
396    }
397
398    /// Create a new session with participants (from milliseconds)
399    ///
400    /// Convenience method for backward compatibility.
401    pub fn create_session_ms(
402        &mut self,
403        participants: Vec<DeviceId>,
404        now_ms: u64,
405    ) -> SyncResult<SessionId> {
406        self.create_session(
407            participants,
408            &PhysicalTime {
409                ts_ms: now_ms,
410                uncertainty: None,
411            },
412        )
413    }
414
415    /// Activate a session with initial protocol state
416    ///
417    /// **Time System**: Uses `PhysicalTime` for timestamps.
418    pub fn activate_session(
419        &mut self,
420        session_id: SessionId,
421        protocol_state: T,
422        current_time: &PhysicalTime,
423    ) -> SyncResult<()> {
424        let session = self
425            .sessions
426            .get_mut(&session_id)
427            .ok_or_else(|| sync_session_error(format!("Session {session_id} not found")))?;
428
429        // Check timeout before pattern matching to avoid borrow conflicts
430        if session.is_timed_out(current_time) {
431            return Err(sync_timeout_error(
432                "session_activation",
433                self.config.timeout,
434            ));
435        }
436
437        match session {
438            SessionState::Initializing { participants, .. } => {
439                let participants = participants.clone();
440                let timeout_ms = current_time.ts_ms
441                    + self.config.resource_limits.max_session_duration.as_millis() as u64;
442                *session = SessionState::Active {
443                    protocol_state,
444                    started_at: current_time.clone(),
445                    participants,
446                    timeout_at: PhysicalTime {
447                        ts_ms: timeout_ms,
448                        uncertainty: current_time.uncertainty,
449                    },
450                };
451
452                Ok(())
453            }
454            _ => Err(sync_session_error(format!(
455                "Session {session_id} is not in initializing state"
456            ))),
457        }
458    }
459
460    /// Activate a session with initial protocol state (from milliseconds)
461    ///
462    /// Convenience method for backward compatibility.
463    pub fn activate_session_ms(
464        &mut self,
465        session_id: SessionId,
466        protocol_state: T,
467        current_timestamp_ms: u64,
468    ) -> SyncResult<()> {
469        self.activate_session(
470            session_id,
471            protocol_state,
472            &PhysicalTime {
473                ts_ms: current_timestamp_ms,
474                uncertainty: None,
475            },
476        )
477    }
478
479    /// Update session protocol state
480    ///
481    /// **Time System**: Uses `PhysicalTime` for timestamps.
482    pub fn update_session(
483        &mut self,
484        session_id: SessionId,
485        new_state: T,
486        current_time: &PhysicalTime,
487    ) -> SyncResult<()>
488    where
489        T: std::fmt::Debug,
490    {
491        let session = self
492            .sessions
493            .get_mut(&session_id)
494            .ok_or_else(|| sync_session_error(format!("Session {session_id} not found")))?;
495
496        // Check timeout before pattern matching to avoid borrow conflicts
497        if session.is_timed_out(current_time) {
498            self.timeout_session(session_id, current_time)?;
499            return Err(sync_timeout_error("session_update", self.config.timeout));
500        }
501
502        match session {
503            SessionState::Active { protocol_state, .. } => {
504                *protocol_state = new_state;
505                Ok(())
506            }
507            _ => Err(sync_session_error(format!(
508                "Session {session_id} is not active"
509            ))),
510        }
511    }
512
513    /// Update session protocol state (from milliseconds)
514    ///
515    /// Convenience method for backward compatibility.
516    pub fn update_session_ms(
517        &mut self,
518        session_id: SessionId,
519        new_state: T,
520        current_timestamp_ms: u64,
521    ) -> SyncResult<()>
522    where
523        T: std::fmt::Debug,
524    {
525        self.update_session(
526            session_id,
527            new_state,
528            &PhysicalTime {
529                ts_ms: current_timestamp_ms,
530                uncertainty: None,
531            },
532        )
533    }
534
535    /// Complete a session successfully
536    ///
537    /// **Time System**: Uses `PhysicalTime` for timestamps.
538    pub fn complete_session(
539        &mut self,
540        session_id: SessionId,
541        operations_count: u64,
542        bytes_transferred: u64,
543        metadata: HashMap<String, String>,
544        current_time: &PhysicalTime,
545    ) -> SyncResult<()> {
546        let session = self
547            .sessions
548            .get_mut(&session_id)
549            .ok_or_else(|| sync_session_error(format!("Session {session_id} not found")))?;
550
551        let duration_ms = session.duration_ms(current_time).unwrap_or(0);
552        let participants = session.participants().to_vec();
553
554        let result = SessionResult::Success {
555            duration_ms,
556            operations_count,
557            bytes_transferred,
558            participants,
559            metadata,
560        };
561
562        *session = SessionState::Completed(result);
563
564        // Record metrics
565        if let Some(ref metrics) = self.metrics {
566            metrics.record_sync_completion(
567                &session_id.to_string(),
568                operations_count,
569                bytes_transferred,
570                current_time.ts_ms,
571            );
572        }
573
574        Ok(())
575    }
576
577    /// Complete a session successfully (from milliseconds)
578    ///
579    /// Convenience method for backward compatibility.
580    pub fn complete_session_ms(
581        &mut self,
582        session_id: SessionId,
583        operations_count: u64,
584        bytes_transferred: u64,
585        metadata: HashMap<String, String>,
586        current_timestamp_ms: u64,
587    ) -> SyncResult<()> {
588        self.complete_session(
589            session_id,
590            operations_count,
591            bytes_transferred,
592            metadata,
593            &PhysicalTime {
594                ts_ms: current_timestamp_ms,
595                uncertainty: None,
596            },
597        )
598    }
599
600    /// Fail a session with error context
601    ///
602    /// **Time System**: Uses `PhysicalTime` for timestamps.
603    pub fn fail_session(
604        &mut self,
605        session_id: SessionId,
606        error: SessionError,
607        partial_results: Option<PartialResults>,
608        current_time: &PhysicalTime,
609    ) -> SyncResult<()> {
610        let session = self
611            .sessions
612            .get_mut(&session_id)
613            .ok_or_else(|| sync_session_error(format!("Session {session_id} not found")))?;
614
615        let duration_ms = session.duration_ms(current_time).unwrap_or(0);
616
617        let result = SessionResult::Failure {
618            error: error.clone(),
619            duration_ms,
620            partial_results,
621        };
622
623        *session = SessionState::Completed(result);
624
625        // Record metrics
626        if let Some(ref metrics) = self.metrics {
627            let category = match error {
628                SessionError::Timeout { .. } => ErrorCategory::Timeout,
629                SessionError::ParticipantDisconnected { .. } => ErrorCategory::Network,
630                SessionError::ResourceLimitExceeded { .. } => ErrorCategory::Resource,
631                SessionError::ProtocolViolation { .. } => ErrorCategory::Protocol,
632                SessionError::CapacityExceeded { .. } => ErrorCategory::Resource,
633                SessionError::InvalidStateTransition { .. } => ErrorCategory::Protocol,
634            };
635            metrics.record_sync_failure(&session_id.to_string(), category, &error.to_string());
636        }
637
638        Ok(())
639    }
640
641    /// Fail a session with error context (from milliseconds)
642    ///
643    /// Convenience method for backward compatibility.
644    pub fn fail_session_ms(
645        &mut self,
646        session_id: SessionId,
647        error: SessionError,
648        partial_results: Option<PartialResults>,
649        current_timestamp_ms: u64,
650    ) -> SyncResult<()> {
651        self.fail_session(
652            session_id,
653            error,
654            partial_results,
655            &PhysicalTime {
656                ts_ms: current_timestamp_ms,
657                uncertainty: None,
658            },
659        )
660    }
661
662    /// Timeout a session
663    ///
664    /// **Time System**: Uses `PhysicalTime` for timestamps.
665    pub fn timeout_session(
666        &mut self,
667        session_id: SessionId,
668        current_time: &PhysicalTime,
669    ) -> SyncResult<()>
670    where
671        T: std::fmt::Debug,
672    {
673        let session = self
674            .sessions
675            .get_mut(&session_id)
676            .ok_or_else(|| sync_session_error(format!("Session {session_id} not found")))?;
677
678        let duration_ms = session.duration_ms(current_time).unwrap_or(0);
679        let last_known_state = format!("{session:?}");
680
681        let result = SessionResult::Timeout {
682            duration_ms,
683            last_known_state,
684        };
685
686        *session = SessionState::Completed(result);
687
688        // Record metrics
689        if let Some(ref metrics) = self.metrics {
690            metrics.record_sync_failure(
691                &session_id.to_string(),
692                ErrorCategory::Timeout,
693                "Session timeout",
694            );
695        }
696
697        Ok(())
698    }
699
700    /// Timeout a session (from milliseconds)
701    ///
702    /// Convenience method for backward compatibility.
703    pub fn timeout_session_ms(
704        &mut self,
705        session_id: SessionId,
706        current_timestamp_ms: u64,
707    ) -> SyncResult<()>
708    where
709        T: std::fmt::Debug,
710    {
711        self.timeout_session(
712            session_id,
713            &PhysicalTime {
714                ts_ms: current_timestamp_ms,
715                uncertainty: None,
716            },
717        )
718    }
719
720    /// Get session state
721    pub fn get_session(&self, session_id: &SessionId) -> Option<&SessionState<T>> {
722        self.sessions.get(session_id)
723    }
724
725    /// Get protocol state for active session
726    pub fn get_protocol_state(&self, session_id: &SessionId) -> Option<&T> {
727        match self.sessions.get(session_id)? {
728            SessionState::Active { protocol_state, .. } => Some(protocol_state),
729            _ => None,
730        }
731    }
732
733    /// List all active sessions
734    pub fn active_sessions(&self) -> Vec<(SessionId, &SessionState<T>)> {
735        self.sessions
736            .iter()
737            .filter(|(_, state)| state.is_active())
738            .map(|(id, state)| (*id, state))
739            .collect()
740    }
741
742    /// Count active sessions
743    pub fn count_active_sessions(&self) -> usize {
744        self.sessions
745            .values()
746            .filter(|state| state.is_active())
747            .count()
748    }
749
750    /// Count completed sessions
751    pub fn count_completed_sessions(&self) -> usize {
752        self.sessions
753            .values()
754            .filter(|state| state.is_terminal())
755            .count()
756    }
757
758    /// Cleanup stale and completed sessions
759    ///
760    /// **Time System**: Uses `PhysicalTime` for timestamps.
761    /// Note: Callers should obtain `now` via their time provider and pass it to this method
762    pub fn cleanup_stale_sessions(&mut self, now: &PhysicalTime) -> SyncResult<usize>
763    where
764        T: std::fmt::Debug,
765    {
766        let elapsed_ms = now.ts_ms.saturating_sub(self.last_cleanup.ts_ms);
767        if elapsed_ms < self.config.cleanup_interval.as_millis() as u64 {
768            return Ok(0);
769        }
770
771        let mut removed = 0;
772        let mut to_timeout = Vec::new();
773        let mut to_remove = Vec::new();
774
775        // Identify sessions to timeout or remove
776        for (session_id, session) in &self.sessions {
777            if session.is_timed_out(now) && !session.is_terminal() {
778                to_timeout.push(*session_id);
779            } else if session.is_terminal() {
780                // Completed sessions are removed on the next cleanup run
781                to_remove.push(*session_id);
782            }
783        }
784
785        // Timeout sessions
786        for session_id in to_timeout {
787            self.timeout_session(session_id, now)?;
788            removed += 1;
789        }
790
791        // Remove completed sessions
792        for session_id in to_remove {
793            self.sessions.remove(&session_id);
794            removed += 1;
795        }
796
797        self.last_cleanup = now.clone();
798        Ok(removed)
799    }
800
801    /// Cleanup stale and completed sessions (from milliseconds)
802    ///
803    /// Convenience method for backward compatibility.
804    pub fn cleanup_stale_sessions_ms(&mut self, now_ms: u64) -> SyncResult<usize>
805    where
806        T: std::fmt::Debug,
807    {
808        self.cleanup_stale_sessions(&PhysicalTime {
809            ts_ms: now_ms,
810            uncertainty: None,
811        })
812    }
813
814    /// Get session statistics
815    pub fn get_statistics(&self) -> SessionManagerStatistics {
816        let mut active_count = 0u64;
817        let mut completed_count = 0u64;
818        let mut failed_count = 0u64;
819        let mut timeout_count = 0u64;
820        let mut total_duration_ms = 0u64;
821        let mut total_operations = 0u64;
822
823        for session in self.sessions.values() {
824            match session {
825                SessionState::Active { .. } => active_count += 1,
826                SessionState::Completed(result) => match result {
827                    SessionResult::Success {
828                        duration_ms,
829                        operations_count,
830                        ..
831                    } => {
832                        completed_count += 1;
833                        total_duration_ms += duration_ms;
834                        total_operations += operations_count;
835                    }
836                    SessionResult::Failure { duration_ms, .. } => {
837                        failed_count += 1;
838                        total_duration_ms += duration_ms;
839                    }
840                    SessionResult::Timeout { duration_ms, .. } => {
841                        timeout_count += 1;
842                        total_duration_ms += duration_ms;
843                    }
844                },
845                _ => {} // Ignore initializing/terminating for stats
846            }
847        }
848
849        let total_sessions = completed_count + failed_count + timeout_count;
850        let success_rate = if total_sessions > 0 {
851            (completed_count as f64 / total_sessions as f64) * 100.0
852        } else {
853            100.0
854        };
855
856        let average_duration_ms = if total_sessions > 0 {
857            total_duration_ms / total_sessions
858        } else {
859            0
860        };
861
862        SessionManagerStatistics {
863            active_sessions: active_count,
864            completed_sessions: completed_count,
865            failed_sessions: failed_count,
866            timeout_sessions: timeout_count,
867            total_sessions,
868            success_rate_percent: success_rate,
869            average_duration_ms,
870            total_operations,
871        }
872    }
873
874    /// Close a session for a specific peer
875    pub fn close_session(&mut self, peer: DeviceId) -> SyncResult<()> {
876        // Find sessions involving this peer and close them
877        let session_ids_to_remove: Vec<SessionId> = self
878            .sessions
879            .iter()
880            .filter_map(|(session_id, session_state)| match session_state {
881                SessionState::Active { participants, .. } => {
882                    if participants.contains(&peer) {
883                        Some(*session_id)
884                    } else {
885                        None
886                    }
887                }
888                _ => None,
889            })
890            .collect();
891
892        for session_id in session_ids_to_remove {
893            self.sessions.remove(&session_id);
894        }
895
896        Ok(())
897    }
898
899    /// Check if peer has an active session
900    pub fn has_active_session(&self, peer: DeviceId) -> bool {
901        self.sessions.values().any(|session_state| {
902            matches!(session_state, SessionState::Active { participants, .. } if participants.contains(&peer))
903        })
904    }
905}
906
/// Session manager statistics
///
/// Snapshot of session counters assembled by the session manager's statistics
/// method (`get_statistics`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionManagerStatistics {
    /// Count of sessions reported as active when the snapshot was taken.
    pub active_sessions: u64,
    /// Count of sessions that finished successfully.
    pub completed_sessions: u64,
    /// Count of sessions that finished with a failure.
    pub failed_sessions: u64,
    /// Count of sessions counted as timed out.
    pub timeout_sessions: u64,
    /// `completed_sessions + failed_sessions + timeout_sessions`; active sessions
    /// are not included.
    pub total_sessions: u64,
    /// `completed_sessions / total_sessions * 100.0`, or `100.0` when
    /// `total_sessions` is zero.
    pub success_rate_percent: f64,
    /// Accumulated duration divided by `total_sessions` (integer division, so the
    /// result is truncated), or `0` when `total_sessions` is zero.
    pub average_duration_ms: u64,
    /// Aggregate operation count. NOTE(review): presumably summed over finished
    /// sessions; the aggregation happens before this chunk, so confirm there.
    pub total_operations: u64,
}
919
920impl Default for SessionManagerStatistics {
921    fn default() -> Self {
922        Self {
923            active_sessions: 0,
924            completed_sessions: 0,
925            failed_sessions: 0,
926            timeout_sessions: 0,
927            total_sessions: 0,
928            success_rate_percent: 100.0,
929            average_duration_ms: 0,
930            total_operations: 0,
931        }
932    }
933}
934
935/// Session manager builder for easy configuration
936pub struct SessionManagerBuilder<T> {
937    config: SessionConfig,
938    metrics: Option<MetricsCollector>,
939    _phantom: std::marker::PhantomData<T>,
940}
941
942impl<T> SessionManagerBuilder<T>
943where
944    T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de>,
945{
946    /// Create new builder with default configuration
947    pub fn new() -> Self {
948        Self {
949            config: SessionConfig::default(),
950            metrics: None,
951            _phantom: std::marker::PhantomData,
952        }
953    }
954
955    /// Set custom configuration
956    pub fn config(mut self, config: SessionConfig) -> Self {
957        self.config = config;
958        self
959    }
960
961    /// Enable metrics collection
962    pub fn with_metrics(mut self, metrics: MetricsCollector) -> Self {
963        self.metrics = Some(metrics);
964        self
965    }
966
967    /// Build the session manager
968    ///
969    /// **Time System**: Uses `PhysicalTime` for timestamps.
970    /// Note: Callers should obtain `now` via their time provider and pass it to this method
971    pub fn build(self, now: PhysicalTime) -> SessionManager<T> {
972        if let Some(metrics) = self.metrics {
973            SessionManager::with_metrics(self.config, metrics, now)
974        } else {
975            SessionManager::new(self.config, now)
976        }
977    }
978
979    /// Build the session manager (from milliseconds)
980    ///
981    /// Convenience method for backward compatibility.
982    pub fn build_ms(self, now_ms: u64) -> SessionManager<T> {
983        self.build(PhysicalTime {
984            ts_ms: now_ms,
985            uncertainty: None,
986        })
987    }
988}
989
990impl<T> Default for SessionManagerBuilder<T>
991where
992    T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de>,
993{
994    fn default() -> Self {
995        Self::new()
996    }
997}
998
// Unit tests for the session lifecycle (create -> activate -> complete/fail), the
// concurrent-session limit, activation timeouts, stale-session cleanup and the
// statistics snapshot.
#[cfg(test)]
mod tests {
    use super::*;
    use aura_core::AuraError;
    use aura_testkit::builders::test_device_id;

    /// Build a `PhysicalTime` at `ts_ms` milliseconds with no uncertainty bound.
    fn test_time(ts_ms: u64) -> PhysicalTime {
        PhysicalTime {
            ts_ms,
            uncertainty: None,
        }
    }

    /// Minimal protocol state used as the `T` parameter of `SessionManager` in these tests.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestProtocolState {
        phase: String,
        data: Vec<u8>,
    }

    /// A freshly created session is not counted as active until `activate_session`
    /// is called; afterwards it is `Active` with the supplied protocol state and the
    /// original participant list.
    #[test]
    fn test_session_creation_and_activation() {
        let now = test_time(1000000); // Unix timestamp in ms
        let mut manager =
            SessionManager::<TestProtocolState>::new(SessionConfig::default(), now.clone());
        let participants = vec![test_device_id(1), test_device_id(2)];

        // Create session
        let session_id = manager.create_session(participants.clone(), &now).unwrap();
        assert_eq!(manager.count_active_sessions(), 0); // Not active yet

        // Activate session
        let initial_state = TestProtocolState {
            phase: "initialization".to_string(),
            data: vec![1, 2, 3],
        };
        manager
            .activate_session(session_id, initial_state.clone(), &now)
            .unwrap();
        assert_eq!(manager.count_active_sessions(), 1);

        // Verify session state
        let session = manager.get_session(&session_id).unwrap();
        match session {
            SessionState::Active {
                protocol_state,
                participants: session_participants,
                ..
            } => {
                assert_eq!(protocol_state, &initial_state);
                assert_eq!(session_participants, &participants);
            }
            _ => panic!("Session should be active"),
        }
    }

    /// Completing an active session moves it to `Completed(SessionResult::Success)`
    /// carrying the operation and byte counts passed to `complete_session`, and it is
    /// no longer counted as active.
    #[test]
    fn test_session_completion() {
        let now = test_time(1000000); // Unix timestamp in ms
        let mut manager =
            SessionManager::<TestProtocolState>::new(SessionConfig::default(), now.clone());
        let session_id = manager
            .create_session(vec![test_device_id(1)], &now)
            .unwrap();

        let initial_state = TestProtocolState {
            phase: "test".to_string(),
            data: vec![],
        };
        manager
            .activate_session(session_id, initial_state, &now)
            .unwrap();

        // Complete session
        let mut metadata = HashMap::new();
        metadata.insert("test_key".to_string(), "test_value".to_string());

        manager
            .complete_session(session_id, 100, 1024, metadata, &test_time(1000100))
            .unwrap();
        assert_eq!(manager.count_active_sessions(), 0);
        assert_eq!(manager.count_completed_sessions(), 1);

        // Verify result
        let session = manager.get_session(&session_id).unwrap();
        match session {
            SessionState::Completed(SessionResult::Success {
                operations_count,
                bytes_transferred,
                ..
            }) => {
                assert_eq!(*operations_count, 100);
                assert_eq!(*bytes_transferred, 1024);
            }
            _ => panic!("Session should be completed successfully"),
        }
    }

    /// Failing an active session stores the supplied `SessionError` inside
    /// `Completed(SessionResult::Failure)`.
    #[test]
    fn test_session_failure() {
        let now = test_time(1000000); // Unix timestamp in ms
        let mut manager =
            SessionManager::<TestProtocolState>::new(SessionConfig::default(), now.clone());
        let session_id = manager
            .create_session(vec![test_device_id(1)], &now)
            .unwrap();

        let initial_state = TestProtocolState {
            phase: "test".to_string(),
            data: vec![],
        };
        manager
            .activate_session(session_id, initial_state, &now)
            .unwrap();

        // Fail session
        let error = SessionError::ProtocolViolation {
            constraint: "test constraint".to_string(),
        };
        manager
            .fail_session(session_id, error, None, &test_time(1000010))
            .unwrap();

        // Verify failure
        let session = manager.get_session(&session_id).unwrap();
        match session {
            SessionState::Completed(SessionResult::Failure {
                error: session_error,
                ..
            }) => match session_error {
                SessionError::ProtocolViolation { constraint } => {
                    assert_eq!(constraint, "test constraint");
                }
                _ => panic!("Wrong error type"),
            },
            _ => panic!("Session should be completed with failure"),
        }
    }

    /// With `max_concurrent_sessions = 2` and two active sessions, creating a third
    /// session is rejected with `AuraError::Internal`.
    #[test]
    fn test_concurrent_session_limit() {
        let config = SessionConfig {
            max_concurrent_sessions: 2,
            ..SessionConfig::default()
        };
        let now = test_time(1000000); // Unix timestamp in ms
        let mut manager = SessionManager::<TestProtocolState>::new(config, now.clone());

        // Create and activate maximum sessions
        let session1 = manager
            .create_session(vec![test_device_id(1)], &now)
            .unwrap();
        let session2 = manager
            .create_session(vec![test_device_id(1)], &now)
            .unwrap();

        let state = TestProtocolState {
            phase: "test".to_string(),
            data: vec![],
        };
        manager
            .activate_session(session1, state.clone(), &now)
            .unwrap();
        manager.activate_session(session2, state, &now).unwrap();

        // Try to exceed limit: the limit is enforced when creating a new session
        let result = manager.create_session(vec![test_device_id(1)], &now);
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), AuraError::Internal { .. }));
    }

    /// Activating a session after its timeout has elapsed (100 ms timeout, 200 ms
    /// later) fails with `AuraError::Internal`.
    #[test]
    fn test_session_timeout() {
        let config = SessionConfig {
            timeout: Duration::from_millis(100),
            ..SessionConfig::default()
        };
        let now = test_time(1000000); // Unix timestamp in ms
        let mut manager = SessionManager::<TestProtocolState>::new(config, now.clone());

        let session_id = manager
            .create_session(vec![test_device_id(1)], &now)
            .unwrap();

        // Advance time past timeout (100ms timeout, we advance 200ms)
        let future_time = test_time(1000200);

        // Try to activate - should fail due to timeout
        let state = TestProtocolState {
            phase: "test".to_string(),
            data: vec![],
        };
        let result = manager.activate_session(session_id, state, &future_time);
        assert!(result.is_err());
        // Timeout errors are surfaced as `AuraError::Internal`
        assert!(matches!(result.unwrap_err(), AuraError::Internal { .. }));
    }

    /// A completed session is removed by `cleanup_stale_sessions` when it is called
    /// 150 ms after completion with a 50 ms `cleanup_interval`.
    ///
    /// NOTE(review): only `removed > 0` is asserted; also checking that
    /// `manager.sessions` is empty afterwards would pin the behavior more tightly.
    #[test]
    fn test_cleanup_stale_sessions() {
        let config = SessionConfig {
            cleanup_interval: Duration::from_millis(50),
            ..SessionConfig::default()
        };
        let now = test_time(1000000); // Unix timestamp in ms
        let mut manager = SessionManager::<TestProtocolState>::new(config, now.clone());

        // Create and complete a session
        let session_id = manager
            .create_session(vec![test_device_id(1)], &now)
            .unwrap();
        let state = TestProtocolState {
            phase: "test".to_string(),
            data: vec![],
        };
        manager.activate_session(session_id, state, &now).unwrap();
        manager
            .complete_session(session_id, 0, 0, HashMap::new(), &test_time(1000050))
            .unwrap();

        // The completed session is still tracked until cleanup runs
        assert_eq!(manager.sessions.len(), 1);

        // Advance time past cleanup interval (50ms interval, we advance 200ms total)
        let cleanup_time = test_time(1000200);

        // Cleanup should remove completed sessions
        let removed = manager.cleanup_stale_sessions(&cleanup_time).unwrap();
        assert!(removed > 0);
    }

    /// Two successful sessions and one failed session yield 3 total, 2 completed,
    /// 1 failed, 0 timed out, and a success rate of roughly 66.67%.
    #[test]
    fn test_session_statistics() {
        let now = test_time(1000000); // Unix timestamp in ms
        let mut manager =
            SessionManager::<TestProtocolState>::new(SessionConfig::default(), now.clone());

        // Create and complete some sessions: iterations 0 and 1 succeed, iteration 2 fails
        for i in 0..3 {
            let session_id = manager
                .create_session(vec![test_device_id(1)], &now)
                .unwrap();
            let state = TestProtocolState {
                phase: "test".to_string(),
                data: vec![],
            };
            manager.activate_session(session_id, state, &now).unwrap();

            if i < 2 {
                manager
                    .complete_session(
                        session_id,
                        10 * (i + 1),
                        100 * (i + 1),
                        HashMap::new(),
                        &test_time(1000000 + 100 * (i + 1)),
                    )
                    .unwrap();
            } else {
                let error = SessionError::ProtocolViolation {
                    constraint: "test".to_string(),
                };
                manager
                    .fail_session(session_id, error, None, &test_time(1000050))
                    .unwrap();
            }
        }

        let stats = manager.get_statistics();
        assert_eq!(stats.total_sessions, 3);
        assert_eq!(stats.completed_sessions, 2);
        assert_eq!(stats.failed_sessions, 1);
        assert_eq!(stats.timeout_sessions, 0);
        assert!((stats.success_rate_percent - 66.67).abs() < 0.1); // 2/3 * 100
    }
}