1use crate::core::metrics::ErrorCategory;
9use crate::core::{
10 sync_resource_with_limit, sync_session_error, sync_timeout_error, sync_validation_error,
11 MetricsCollector, SyncConfig, SyncResult,
12};
13use aura_core::time::PhysicalTime;
14use aura_core::{DeviceId, SessionId};
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::time::Duration;
18
19#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
23pub enum SessionState<T> {
24 Initializing {
26 participants: Vec<DeviceId>,
27 timeout_at: PhysicalTime,
29 created_at: PhysicalTime,
31 },
32 Active {
34 protocol_state: T,
35 started_at: PhysicalTime,
37 participants: Vec<DeviceId>,
38 timeout_at: PhysicalTime,
40 },
41 Terminating {
43 result: SessionResult,
44 cleanup_deadline: PhysicalTime,
46 },
47 Completed(SessionResult),
49}
50
51impl<T> SessionState<T> {
52 pub fn is_timed_out(&self, now: &PhysicalTime) -> bool {
56 match self {
57 SessionState::Initializing { timeout_at, .. } => now.ts_ms >= timeout_at.ts_ms,
58 SessionState::Active { timeout_at, .. } => now.ts_ms >= timeout_at.ts_ms,
59 SessionState::Terminating {
60 cleanup_deadline, ..
61 } => now.ts_ms >= cleanup_deadline.ts_ms,
62 SessionState::Completed(_) => false,
63 }
64 }
65
66 pub fn is_timed_out_ms(&self, now_ms: u64) -> bool {
70 self.is_timed_out(&PhysicalTime {
71 ts_ms: now_ms,
72 uncertainty: None,
73 })
74 }
75
76 pub fn participants(&self) -> &[DeviceId] {
78 match self {
79 SessionState::Initializing { participants, .. } => participants,
80 SessionState::Active { participants, .. } => participants,
81 SessionState::Terminating { .. } => &[],
82 SessionState::Completed(_) => &[],
83 }
84 }
85
86 pub fn duration_ms(&self, current_time: &PhysicalTime) -> Option<u64> {
90 match self {
91 SessionState::Active { started_at, .. } => {
92 Some(current_time.ts_ms.saturating_sub(started_at.ts_ms))
93 }
94 SessionState::Terminating { result, .. } | SessionState::Completed(result) => {
95 match result {
96 SessionResult::Success { duration_ms, .. } => Some(*duration_ms),
97 SessionResult::Failure { duration_ms, .. } => Some(*duration_ms),
98 SessionResult::Timeout { duration_ms, .. } => Some(*duration_ms),
99 }
100 }
101 SessionState::Initializing { created_at, .. } => {
102 Some(current_time.ts_ms.saturating_sub(created_at.ts_ms))
103 }
104 }
105 }
106
107 pub fn is_terminal(&self) -> bool {
109 matches!(self, SessionState::Completed(_))
110 }
111
112 pub fn is_active(&self) -> bool {
114 matches!(self, SessionState::Active { .. })
115 }
116}
117
118#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
120pub enum SessionResult {
121 Success {
122 duration_ms: u64,
123 operations_count: u64,
124 bytes_transferred: u64,
125 participants: Vec<DeviceId>,
126 metadata: HashMap<String, String>,
127 },
128 Failure {
129 error: SessionError,
130 duration_ms: u64,
131 partial_results: Option<PartialResults>,
132 },
133 Timeout {
134 duration_ms: u64,
135 last_known_state: String,
136 },
137}
138
139impl SessionResult {
140 pub fn is_success(&self) -> bool {
142 matches!(self, SessionResult::Success { .. })
143 }
144
145 pub fn duration_ms(&self) -> u64 {
147 match self {
148 SessionResult::Success { duration_ms, .. } => *duration_ms,
149 SessionResult::Failure { duration_ms, .. } => *duration_ms,
150 SessionResult::Timeout { duration_ms, .. } => *duration_ms,
151 }
152 }
153
154 pub fn operations_count(&self) -> u64 {
156 match self {
157 SessionResult::Success {
158 operations_count, ..
159 } => *operations_count,
160 SessionResult::Failure {
161 partial_results: Some(partial),
162 ..
163 } => partial.operations_completed,
164 _ => 0,
165 }
166 }
167}
168
169#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
171pub struct PartialResults {
172 pub operations_completed: u64,
173 pub bytes_transferred: u64,
174 pub completed_participants: Vec<DeviceId>,
175 pub last_successful_operation: Option<String>,
176}
177
178#[derive(Debug, Clone, PartialEq, thiserror::Error, Serialize, Deserialize)]
180pub enum SessionError {
181 #[error("Session timeout after {duration_ms}ms")]
182 Timeout { duration_ms: u64 },
183
184 #[error("Participant {participant} disconnected")]
185 ParticipantDisconnected { participant: DeviceId },
186
187 #[error("Resource limit exceeded: {limit_type}")]
188 ResourceLimitExceeded { limit_type: String },
189
190 #[error("Protocol constraint violation: {constraint}")]
191 ProtocolViolation { constraint: String },
192
193 #[error("Session capacity exceeded: {current}/{max}")]
194 CapacityExceeded { current: u64, max: u64 },
195
196 #[error("Invalid session state transition: {from} -> {to}")]
197 InvalidStateTransition { from: String, to: String },
198}
199
200#[derive(Debug, Clone)]
202pub struct SessionConfig {
203 pub timeout: Duration,
205 pub max_participants: u32,
207 pub cleanup_interval: Duration,
209 pub max_concurrent_sessions: u32,
211 pub resource_limits: SessionResourceLimits,
213}
214
215impl Default for SessionConfig {
216 fn default() -> Self {
217 Self {
218 timeout: Duration::from_secs(300), max_participants: 10,
220 cleanup_interval: Duration::from_secs(60), max_concurrent_sessions: 20,
222 resource_limits: SessionResourceLimits::default(),
223 }
224 }
225}
226
227impl From<&SyncConfig> for SessionConfig {
228 fn from(sync_config: &SyncConfig) -> Self {
229 Self {
230 timeout: sync_config.network.sync_timeout,
231 max_participants: 10, cleanup_interval: sync_config.network.cleanup_interval,
233 max_concurrent_sessions: sync_config.peer_management.max_concurrent_syncs,
234 resource_limits: SessionResourceLimits::from(&sync_config.performance),
235 }
236 }
237}
238
/// Resource limits that apply to a single session.
#[derive(Debug, Clone)]
pub struct SessionResourceLimits {
    /// Memory budget per session (presumably bytes; the default is
    /// `10 * 1024 * 1024`).
    pub max_memory_per_session: u64,
    /// Longest time a session may stay active; used to compute the deadline
    /// of an activated session.
    pub max_session_duration: Duration,
    /// Upper bound on the number of operations in one session.
    pub max_operations_per_session: u32,
}
249
250impl Default for SessionResourceLimits {
251 fn default() -> Self {
252 Self {
253 max_memory_per_session: 10 * 1024 * 1024, max_session_duration: Duration::from_secs(3600), max_operations_per_session: 10000,
256 }
257 }
258}
259
260impl From<&crate::core::PerformanceConfig> for SessionResourceLimits {
261 fn from(perf_config: &crate::core::PerformanceConfig) -> Self {
262 Self {
263 max_memory_per_session: perf_config.memory_limit / 10, max_session_duration: Duration::from_secs(3600), max_operations_per_session: 10000,
266 }
267 }
268}
269
270pub struct SessionManager<T> {
274 sessions: HashMap<SessionId, SessionState<T>>,
276 config: SessionConfig,
278 metrics: Option<MetricsCollector>,
280 last_cleanup: PhysicalTime,
282 session_counter: u64,
284}
285
286impl<T> SessionManager<T>
287where
288 T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de>,
289{
290 pub fn new(config: SessionConfig, now: PhysicalTime) -> Self {
294 Self {
295 sessions: HashMap::new(),
296 config,
297 metrics: None,
298 last_cleanup: now,
299 session_counter: 0,
300 }
301 }
302
303 pub fn new_from_ms(config: SessionConfig, now_ms: u64) -> Self {
307 Self::new(
308 config,
309 PhysicalTime {
310 ts_ms: now_ms,
311 uncertainty: None,
312 },
313 )
314 }
315
316 pub fn with_metrics(
320 config: SessionConfig,
321 metrics: MetricsCollector,
322 now: PhysicalTime,
323 ) -> Self {
324 Self {
325 sessions: HashMap::new(),
326 config,
327 metrics: Some(metrics),
328 last_cleanup: now,
329 session_counter: 0,
330 }
331 }
332
333 fn generate_session_id(&mut self, now: &PhysicalTime) -> SessionId {
336 let mut input = Vec::new();
337 input.extend_from_slice(b"aura.sync.session.id");
338 input.extend_from_slice(&now.ts_ms.to_le_bytes());
339 input.extend_from_slice(&self.session_counter.to_le_bytes());
340 self.session_counter = self.session_counter.wrapping_add(1);
341
342 let digest = aura_core::hash::hash(&input);
343 let mut uuid_bytes = [0u8; 16];
344 uuid_bytes.copy_from_slice(&digest[..16]);
345 SessionId::from_uuid(uuid::Uuid::from_bytes(uuid_bytes))
346 }
347
348 pub fn create_session(
353 &mut self,
354 participants: Vec<DeviceId>,
355 now: &PhysicalTime,
356 ) -> SyncResult<SessionId> {
357 if participants.len() > self.config.max_participants as usize {
359 return Err(sync_validation_error(format!(
360 "Too many participants: {} > {}",
361 participants.len(),
362 self.config.max_participants
363 )));
364 }
365
366 let active_count = self.count_active_sessions();
368 if active_count >= self.config.max_concurrent_sessions as usize {
369 return Err(sync_resource_with_limit(
370 "concurrent_sessions",
371 "Maximum concurrent sessions exceeded",
372 u64::from(self.config.max_concurrent_sessions),
373 ));
374 }
375
376 let session_id = self.generate_session_id(now);
377 let timeout_ms = now.ts_ms + self.config.timeout.as_millis() as u64;
378
379 let session_state = SessionState::Initializing {
380 participants,
381 timeout_at: PhysicalTime {
382 ts_ms: timeout_ms,
383 uncertainty: now.uncertainty,
384 },
385 created_at: now.clone(),
386 };
387
388 self.sessions.insert(session_id, session_state);
389
390 if let Some(ref metrics) = self.metrics {
392 metrics.record_sync_start(&session_id.to_string(), now.ts_ms);
393 }
394
395 Ok(session_id)
396 }
397
398 pub fn create_session_ms(
402 &mut self,
403 participants: Vec<DeviceId>,
404 now_ms: u64,
405 ) -> SyncResult<SessionId> {
406 self.create_session(
407 participants,
408 &PhysicalTime {
409 ts_ms: now_ms,
410 uncertainty: None,
411 },
412 )
413 }
414
415 pub fn activate_session(
419 &mut self,
420 session_id: SessionId,
421 protocol_state: T,
422 current_time: &PhysicalTime,
423 ) -> SyncResult<()> {
424 let session = self
425 .sessions
426 .get_mut(&session_id)
427 .ok_or_else(|| sync_session_error(format!("Session {session_id} not found")))?;
428
429 if session.is_timed_out(current_time) {
431 return Err(sync_timeout_error(
432 "session_activation",
433 self.config.timeout,
434 ));
435 }
436
437 match session {
438 SessionState::Initializing { participants, .. } => {
439 let participants = participants.clone();
440 let timeout_ms = current_time.ts_ms
441 + self.config.resource_limits.max_session_duration.as_millis() as u64;
442 *session = SessionState::Active {
443 protocol_state,
444 started_at: current_time.clone(),
445 participants,
446 timeout_at: PhysicalTime {
447 ts_ms: timeout_ms,
448 uncertainty: current_time.uncertainty,
449 },
450 };
451
452 Ok(())
453 }
454 _ => Err(sync_session_error(format!(
455 "Session {session_id} is not in initializing state"
456 ))),
457 }
458 }
459
460 pub fn activate_session_ms(
464 &mut self,
465 session_id: SessionId,
466 protocol_state: T,
467 current_timestamp_ms: u64,
468 ) -> SyncResult<()> {
469 self.activate_session(
470 session_id,
471 protocol_state,
472 &PhysicalTime {
473 ts_ms: current_timestamp_ms,
474 uncertainty: None,
475 },
476 )
477 }
478
479 pub fn update_session(
483 &mut self,
484 session_id: SessionId,
485 new_state: T,
486 current_time: &PhysicalTime,
487 ) -> SyncResult<()>
488 where
489 T: std::fmt::Debug,
490 {
491 let session = self
492 .sessions
493 .get_mut(&session_id)
494 .ok_or_else(|| sync_session_error(format!("Session {session_id} not found")))?;
495
496 if session.is_timed_out(current_time) {
498 self.timeout_session(session_id, current_time)?;
499 return Err(sync_timeout_error("session_update", self.config.timeout));
500 }
501
502 match session {
503 SessionState::Active { protocol_state, .. } => {
504 *protocol_state = new_state;
505 Ok(())
506 }
507 _ => Err(sync_session_error(format!(
508 "Session {session_id} is not active"
509 ))),
510 }
511 }
512
513 pub fn update_session_ms(
517 &mut self,
518 session_id: SessionId,
519 new_state: T,
520 current_timestamp_ms: u64,
521 ) -> SyncResult<()>
522 where
523 T: std::fmt::Debug,
524 {
525 self.update_session(
526 session_id,
527 new_state,
528 &PhysicalTime {
529 ts_ms: current_timestamp_ms,
530 uncertainty: None,
531 },
532 )
533 }
534
535 pub fn complete_session(
539 &mut self,
540 session_id: SessionId,
541 operations_count: u64,
542 bytes_transferred: u64,
543 metadata: HashMap<String, String>,
544 current_time: &PhysicalTime,
545 ) -> SyncResult<()> {
546 let session = self
547 .sessions
548 .get_mut(&session_id)
549 .ok_or_else(|| sync_session_error(format!("Session {session_id} not found")))?;
550
551 let duration_ms = session.duration_ms(current_time).unwrap_or(0);
552 let participants = session.participants().to_vec();
553
554 let result = SessionResult::Success {
555 duration_ms,
556 operations_count,
557 bytes_transferred,
558 participants,
559 metadata,
560 };
561
562 *session = SessionState::Completed(result);
563
564 if let Some(ref metrics) = self.metrics {
566 metrics.record_sync_completion(
567 &session_id.to_string(),
568 operations_count,
569 bytes_transferred,
570 current_time.ts_ms,
571 );
572 }
573
574 Ok(())
575 }
576
577 pub fn complete_session_ms(
581 &mut self,
582 session_id: SessionId,
583 operations_count: u64,
584 bytes_transferred: u64,
585 metadata: HashMap<String, String>,
586 current_timestamp_ms: u64,
587 ) -> SyncResult<()> {
588 self.complete_session(
589 session_id,
590 operations_count,
591 bytes_transferred,
592 metadata,
593 &PhysicalTime {
594 ts_ms: current_timestamp_ms,
595 uncertainty: None,
596 },
597 )
598 }
599
600 pub fn fail_session(
604 &mut self,
605 session_id: SessionId,
606 error: SessionError,
607 partial_results: Option<PartialResults>,
608 current_time: &PhysicalTime,
609 ) -> SyncResult<()> {
610 let session = self
611 .sessions
612 .get_mut(&session_id)
613 .ok_or_else(|| sync_session_error(format!("Session {session_id} not found")))?;
614
615 let duration_ms = session.duration_ms(current_time).unwrap_or(0);
616
617 let result = SessionResult::Failure {
618 error: error.clone(),
619 duration_ms,
620 partial_results,
621 };
622
623 *session = SessionState::Completed(result);
624
625 if let Some(ref metrics) = self.metrics {
627 let category = match error {
628 SessionError::Timeout { .. } => ErrorCategory::Timeout,
629 SessionError::ParticipantDisconnected { .. } => ErrorCategory::Network,
630 SessionError::ResourceLimitExceeded { .. } => ErrorCategory::Resource,
631 SessionError::ProtocolViolation { .. } => ErrorCategory::Protocol,
632 SessionError::CapacityExceeded { .. } => ErrorCategory::Resource,
633 SessionError::InvalidStateTransition { .. } => ErrorCategory::Protocol,
634 };
635 metrics.record_sync_failure(&session_id.to_string(), category, &error.to_string());
636 }
637
638 Ok(())
639 }
640
641 pub fn fail_session_ms(
645 &mut self,
646 session_id: SessionId,
647 error: SessionError,
648 partial_results: Option<PartialResults>,
649 current_timestamp_ms: u64,
650 ) -> SyncResult<()> {
651 self.fail_session(
652 session_id,
653 error,
654 partial_results,
655 &PhysicalTime {
656 ts_ms: current_timestamp_ms,
657 uncertainty: None,
658 },
659 )
660 }
661
662 pub fn timeout_session(
666 &mut self,
667 session_id: SessionId,
668 current_time: &PhysicalTime,
669 ) -> SyncResult<()>
670 where
671 T: std::fmt::Debug,
672 {
673 let session = self
674 .sessions
675 .get_mut(&session_id)
676 .ok_or_else(|| sync_session_error(format!("Session {session_id} not found")))?;
677
678 let duration_ms = session.duration_ms(current_time).unwrap_or(0);
679 let last_known_state = format!("{session:?}");
680
681 let result = SessionResult::Timeout {
682 duration_ms,
683 last_known_state,
684 };
685
686 *session = SessionState::Completed(result);
687
688 if let Some(ref metrics) = self.metrics {
690 metrics.record_sync_failure(
691 &session_id.to_string(),
692 ErrorCategory::Timeout,
693 "Session timeout",
694 );
695 }
696
697 Ok(())
698 }
699
700 pub fn timeout_session_ms(
704 &mut self,
705 session_id: SessionId,
706 current_timestamp_ms: u64,
707 ) -> SyncResult<()>
708 where
709 T: std::fmt::Debug,
710 {
711 self.timeout_session(
712 session_id,
713 &PhysicalTime {
714 ts_ms: current_timestamp_ms,
715 uncertainty: None,
716 },
717 )
718 }
719
720 pub fn get_session(&self, session_id: &SessionId) -> Option<&SessionState<T>> {
722 self.sessions.get(session_id)
723 }
724
725 pub fn get_protocol_state(&self, session_id: &SessionId) -> Option<&T> {
727 match self.sessions.get(session_id)? {
728 SessionState::Active { protocol_state, .. } => Some(protocol_state),
729 _ => None,
730 }
731 }
732
733 pub fn active_sessions(&self) -> Vec<(SessionId, &SessionState<T>)> {
735 self.sessions
736 .iter()
737 .filter(|(_, state)| state.is_active())
738 .map(|(id, state)| (*id, state))
739 .collect()
740 }
741
742 pub fn count_active_sessions(&self) -> usize {
744 self.sessions
745 .values()
746 .filter(|state| state.is_active())
747 .count()
748 }
749
750 pub fn count_completed_sessions(&self) -> usize {
752 self.sessions
753 .values()
754 .filter(|state| state.is_terminal())
755 .count()
756 }
757
758 pub fn cleanup_stale_sessions(&mut self, now: &PhysicalTime) -> SyncResult<usize>
763 where
764 T: std::fmt::Debug,
765 {
766 let elapsed_ms = now.ts_ms.saturating_sub(self.last_cleanup.ts_ms);
767 if elapsed_ms < self.config.cleanup_interval.as_millis() as u64 {
768 return Ok(0);
769 }
770
771 let mut removed = 0;
772 let mut to_timeout = Vec::new();
773 let mut to_remove = Vec::new();
774
775 for (session_id, session) in &self.sessions {
777 if session.is_timed_out(now) && !session.is_terminal() {
778 to_timeout.push(*session_id);
779 } else if session.is_terminal() {
780 to_remove.push(*session_id);
782 }
783 }
784
785 for session_id in to_timeout {
787 self.timeout_session(session_id, now)?;
788 removed += 1;
789 }
790
791 for session_id in to_remove {
793 self.sessions.remove(&session_id);
794 removed += 1;
795 }
796
797 self.last_cleanup = now.clone();
798 Ok(removed)
799 }
800
801 pub fn cleanup_stale_sessions_ms(&mut self, now_ms: u64) -> SyncResult<usize>
805 where
806 T: std::fmt::Debug,
807 {
808 self.cleanup_stale_sessions(&PhysicalTime {
809 ts_ms: now_ms,
810 uncertainty: None,
811 })
812 }
813
814 pub fn get_statistics(&self) -> SessionManagerStatistics {
816 let mut active_count = 0u64;
817 let mut completed_count = 0u64;
818 let mut failed_count = 0u64;
819 let mut timeout_count = 0u64;
820 let mut total_duration_ms = 0u64;
821 let mut total_operations = 0u64;
822
823 for session in self.sessions.values() {
824 match session {
825 SessionState::Active { .. } => active_count += 1,
826 SessionState::Completed(result) => match result {
827 SessionResult::Success {
828 duration_ms,
829 operations_count,
830 ..
831 } => {
832 completed_count += 1;
833 total_duration_ms += duration_ms;
834 total_operations += operations_count;
835 }
836 SessionResult::Failure { duration_ms, .. } => {
837 failed_count += 1;
838 total_duration_ms += duration_ms;
839 }
840 SessionResult::Timeout { duration_ms, .. } => {
841 timeout_count += 1;
842 total_duration_ms += duration_ms;
843 }
844 },
845 _ => {} }
847 }
848
849 let total_sessions = completed_count + failed_count + timeout_count;
850 let success_rate = if total_sessions > 0 {
851 (completed_count as f64 / total_sessions as f64) * 100.0
852 } else {
853 100.0
854 };
855
856 let average_duration_ms = if total_sessions > 0 {
857 total_duration_ms / total_sessions
858 } else {
859 0
860 };
861
862 SessionManagerStatistics {
863 active_sessions: active_count,
864 completed_sessions: completed_count,
865 failed_sessions: failed_count,
866 timeout_sessions: timeout_count,
867 total_sessions,
868 success_rate_percent: success_rate,
869 average_duration_ms,
870 total_operations,
871 }
872 }
873
874 pub fn close_session(&mut self, peer: DeviceId) -> SyncResult<()> {
876 let session_ids_to_remove: Vec<SessionId> = self
878 .sessions
879 .iter()
880 .filter_map(|(session_id, session_state)| match session_state {
881 SessionState::Active { participants, .. } => {
882 if participants.contains(&peer) {
883 Some(*session_id)
884 } else {
885 None
886 }
887 }
888 _ => None,
889 })
890 .collect();
891
892 for session_id in session_ids_to_remove {
893 self.sessions.remove(&session_id);
894 }
895
896 Ok(())
897 }
898
899 pub fn has_active_session(&self, peer: DeviceId) -> bool {
901 self.sessions.values().any(|session_state| {
902 matches!(session_state, SessionState::Active { participants, .. } if participants.contains(&peer))
903 })
904 }
905}
906
907#[derive(Debug, Clone, Serialize, Deserialize)]
909pub struct SessionManagerStatistics {
910 pub active_sessions: u64,
911 pub completed_sessions: u64,
912 pub failed_sessions: u64,
913 pub timeout_sessions: u64,
914 pub total_sessions: u64,
915 pub success_rate_percent: f64,
916 pub average_duration_ms: u64,
917 pub total_operations: u64,
918}
919
920impl Default for SessionManagerStatistics {
921 fn default() -> Self {
922 Self {
923 active_sessions: 0,
924 completed_sessions: 0,
925 failed_sessions: 0,
926 timeout_sessions: 0,
927 total_sessions: 0,
928 success_rate_percent: 100.0,
929 average_duration_ms: 0,
930 total_operations: 0,
931 }
932 }
933}
934
935pub struct SessionManagerBuilder<T> {
937 config: SessionConfig,
938 metrics: Option<MetricsCollector>,
939 _phantom: std::marker::PhantomData<T>,
940}
941
942impl<T> SessionManagerBuilder<T>
943where
944 T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de>,
945{
946 pub fn new() -> Self {
948 Self {
949 config: SessionConfig::default(),
950 metrics: None,
951 _phantom: std::marker::PhantomData,
952 }
953 }
954
955 pub fn config(mut self, config: SessionConfig) -> Self {
957 self.config = config;
958 self
959 }
960
961 pub fn with_metrics(mut self, metrics: MetricsCollector) -> Self {
963 self.metrics = Some(metrics);
964 self
965 }
966
967 pub fn build(self, now: PhysicalTime) -> SessionManager<T> {
972 if let Some(metrics) = self.metrics {
973 SessionManager::with_metrics(self.config, metrics, now)
974 } else {
975 SessionManager::new(self.config, now)
976 }
977 }
978
979 pub fn build_ms(self, now_ms: u64) -> SessionManager<T> {
983 self.build(PhysicalTime {
984 ts_ms: now_ms,
985 uncertainty: None,
986 })
987 }
988}
989
990impl<T> Default for SessionManagerBuilder<T>
991where
992 T: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de>,
993{
994 fn default() -> Self {
995 Self::new()
996 }
997}
998
#[cfg(test)]
mod tests {
    use super::*;
    use aura_core::AuraError;
    use aura_testkit::builders::test_device_id;

    /// Timestamp (ms) at which every test creates its manager and sessions.
    const START_MS: u64 = 1_000_000;

    /// Builds a `PhysicalTime` at `ts_ms` with no uncertainty.
    fn test_time(ts_ms: u64) -> PhysicalTime {
        PhysicalTime {
            ts_ms,
            uncertainty: None,
        }
    }

    /// Minimal protocol state used to instantiate the generic manager.
    #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
    struct TestProtocolState {
        phase: String,
        data: Vec<u8>,
    }

    /// Protocol state in the "test" phase with an empty payload.
    fn blank_state() -> TestProtocolState {
        TestProtocolState {
            phase: "test".to_string(),
            data: vec![],
        }
    }

    /// Manager without metrics, created at `START_MS` with `config`.
    fn manager_with(config: SessionConfig) -> SessionManager<TestProtocolState> {
        SessionManager::<TestProtocolState>::new(config, test_time(START_MS))
    }

    /// Creates a single-participant session and activates it, both at `START_MS`.
    fn start_active_session(manager: &mut SessionManager<TestProtocolState>) -> SessionId {
        let now = test_time(START_MS);
        let session_id = manager
            .create_session(vec![test_device_id(1)], &now)
            .unwrap();
        manager
            .activate_session(session_id, blank_state(), &now)
            .unwrap();
        session_id
    }

    #[test]
    fn test_session_creation_and_activation() {
        let now = test_time(START_MS);
        let mut manager = manager_with(SessionConfig::default());
        let participants = vec![test_device_id(1), test_device_id(2)];

        // A freshly created session is initializing, not active.
        let session_id = manager.create_session(participants.clone(), &now).unwrap();
        assert_eq!(manager.count_active_sessions(), 0);

        let initial_state = TestProtocolState {
            phase: "initialization".to_string(),
            data: vec![1, 2, 3],
        };
        manager
            .activate_session(session_id, initial_state.clone(), &now)
            .unwrap();
        assert_eq!(manager.count_active_sessions(), 1);

        match manager.get_session(&session_id).unwrap() {
            SessionState::Active {
                protocol_state,
                participants: session_participants,
                ..
            } => {
                assert_eq!(protocol_state, &initial_state);
                assert_eq!(session_participants, &participants);
            }
            _ => panic!("Session should be active"),
        }
    }

    #[test]
    fn test_session_completion() {
        let mut manager = manager_with(SessionConfig::default());
        let session_id = start_active_session(&mut manager);

        let mut metadata = HashMap::new();
        metadata.insert("test_key".to_string(), "test_value".to_string());

        manager
            .complete_session(session_id, 100, 1024, metadata, &test_time(START_MS + 100))
            .unwrap();
        assert_eq!(manager.count_active_sessions(), 0);
        assert_eq!(manager.count_completed_sessions(), 1);

        match manager.get_session(&session_id).unwrap() {
            SessionState::Completed(SessionResult::Success {
                operations_count,
                bytes_transferred,
                ..
            }) => {
                assert_eq!(*operations_count, 100);
                assert_eq!(*bytes_transferred, 1024);
            }
            _ => panic!("Session should be completed successfully"),
        }
    }

    #[test]
    fn test_session_failure() {
        let mut manager = manager_with(SessionConfig::default());
        let session_id = start_active_session(&mut manager);

        let error = SessionError::ProtocolViolation {
            constraint: "test constraint".to_string(),
        };
        manager
            .fail_session(session_id, error, None, &test_time(START_MS + 10))
            .unwrap();

        match manager.get_session(&session_id).unwrap() {
            SessionState::Completed(SessionResult::Failure {
                error: session_error,
                ..
            }) => match session_error {
                SessionError::ProtocolViolation { constraint } => {
                    assert_eq!(constraint, "test constraint");
                }
                _ => panic!("Wrong error type"),
            },
            _ => panic!("Session should be completed with failure"),
        }
    }

    #[test]
    fn test_concurrent_session_limit() {
        let now = test_time(START_MS);
        let mut manager = manager_with(SessionConfig {
            max_concurrent_sessions: 2,
            ..SessionConfig::default()
        });

        // Create both sessions first, then activate them to reach the limit.
        let session1 = manager
            .create_session(vec![test_device_id(1)], &now)
            .unwrap();
        let session2 = manager
            .create_session(vec![test_device_id(1)], &now)
            .unwrap();
        manager
            .activate_session(session1, blank_state(), &now)
            .unwrap();
        manager
            .activate_session(session2, blank_state(), &now)
            .unwrap();

        // A third session must be refused.
        let result = manager.create_session(vec![test_device_id(1)], &now);
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), AuraError::Internal { .. }));
    }

    #[test]
    fn test_session_timeout() {
        let now = test_time(START_MS);
        let mut manager = manager_with(SessionConfig {
            timeout: Duration::from_millis(100),
            ..SessionConfig::default()
        });

        let session_id = manager
            .create_session(vec![test_device_id(1)], &now)
            .unwrap();

        // 200 ms later the 100 ms initialization timeout has passed.
        let future_time = test_time(START_MS + 200);
        let result = manager.activate_session(session_id, blank_state(), &future_time);
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), AuraError::Internal { .. }));
    }

    #[test]
    fn test_cleanup_stale_sessions() {
        let mut manager = manager_with(SessionConfig {
            cleanup_interval: Duration::from_millis(50),
            ..SessionConfig::default()
        });

        let session_id = start_active_session(&mut manager);
        manager
            .complete_session(
                session_id,
                0,
                0,
                HashMap::new(),
                &test_time(START_MS + 50),
            )
            .unwrap();
        assert_eq!(manager.sessions.len(), 1);

        // Well past the cleanup interval, the completed session is collected.
        let cleanup_time = test_time(START_MS + 200);
        let removed = manager.cleanup_stale_sessions(&cleanup_time).unwrap();
        assert!(removed > 0);
    }

    #[test]
    fn test_session_statistics() {
        let mut manager = manager_with(SessionConfig::default());

        // Two successful sessions followed by one failed session.
        for i in 0..3u64 {
            let session_id = start_active_session(&mut manager);

            if i < 2 {
                manager
                    .complete_session(
                        session_id,
                        10 * (i + 1),
                        100 * (i + 1),
                        HashMap::new(),
                        &test_time(START_MS + 100 * (i + 1)),
                    )
                    .unwrap();
            } else {
                let error = SessionError::ProtocolViolation {
                    constraint: "test".to_string(),
                };
                manager
                    .fail_session(session_id, error, None, &test_time(START_MS + 50))
                    .unwrap();
            }
        }

        let stats = manager.get_statistics();
        assert_eq!(stats.total_sessions, 3);
        assert_eq!(stats.completed_sessions, 2);
        assert_eq!(stats.failed_sessions, 1);
        assert_eq!(stats.timeout_sessions, 0);
        assert!((stats.success_rate_percent - 66.67).abs() < 0.1);
    }
}