1use crate::domain::{
4 DomainError, DomainResult,
5 entities::{Frame, Stream, stream::StreamConfig},
6 events::{DomainEvent, SessionState},
7 value_objects::{JsonData, Priority, SessionId, StreamId},
8};
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, VecDeque};
12
13mod serde_session_id {
15 use crate::domain::value_objects::SessionId;
16 use serde::{Deserialize, Deserializer, Serialize, Serializer};
17
18 pub fn serialize<S>(id: &SessionId, serializer: S) -> Result<S::Ok, S::Error>
19 where
20 S: Serializer,
21 {
22 id.as_uuid().serialize(serializer)
23 }
24
25 pub fn deserialize<'de, D>(deserializer: D) -> Result<SessionId, D::Error>
26 where
27 D: Deserializer<'de>,
28 {
29 let uuid = uuid::Uuid::deserialize(deserializer)?;
30 Ok(SessionId::from_uuid(uuid))
31 }
32}
33
34mod serde_stream_map {
36 use crate::domain::{entities::Stream, value_objects::StreamId};
37 use serde::{Deserialize, Deserializer, Serialize, Serializer};
38 use std::collections::HashMap;
39
40 pub fn serialize<S>(map: &HashMap<StreamId, Stream>, serializer: S) -> Result<S::Ok, S::Error>
41 where
42 S: Serializer,
43 {
44 let uuid_map: HashMap<String, &Stream> = map
45 .iter()
46 .map(|(k, v)| (k.as_uuid().to_string(), v))
47 .collect();
48 uuid_map.serialize(serializer)
49 }
50
51 pub fn deserialize<'de, D>(deserializer: D) -> Result<HashMap<StreamId, Stream>, D::Error>
52 where
53 D: Deserializer<'de>,
54 {
55 let uuid_map: HashMap<String, Stream> = HashMap::deserialize(deserializer)?;
56 uuid_map
57 .into_iter()
58 .map(|(k, v)| {
59 uuid::Uuid::parse_str(&k)
60 .map(|uuid| (StreamId::from_uuid(uuid), v))
61 .map_err(serde::de::Error::custom)
62 })
63 .collect()
64 }
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct SessionConfig {
70 pub max_concurrent_streams: usize,
72 pub session_timeout_seconds: u64,
74 pub default_stream_config: StreamConfig,
76 pub enable_compression: bool,
78 pub metadata: HashMap<String, String>,
80}
81
82impl Default for SessionConfig {
83 fn default() -> Self {
84 Self {
85 max_concurrent_streams: 10,
86 session_timeout_seconds: 3600, default_stream_config: StreamConfig::default(),
88 enable_compression: true,
89 metadata: HashMap::new(),
90 }
91 }
92}
93
94#[derive(Debug, Clone, Default, Serialize, Deserialize)]
96pub struct SessionStats {
97 pub total_streams: u64,
99 pub active_streams: u64,
101 pub completed_streams: u64,
103 pub failed_streams: u64,
105 pub total_frames: u64,
107 pub total_bytes: u64,
109 pub average_stream_duration_ms: f64,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct StreamSession {
116 #[serde(with = "serde_session_id")]
117 id: SessionId,
118 state: SessionState,
119 config: SessionConfig,
120 stats: SessionStats,
121 created_at: DateTime<Utc>,
122 updated_at: DateTime<Utc>,
123 expires_at: DateTime<Utc>,
124 completed_at: Option<DateTime<Utc>>,
125
126 #[serde(with = "serde_stream_map")]
128 streams: HashMap<StreamId, Stream>,
129 pending_events: VecDeque<DomainEvent>,
130
131 client_info: Option<String>,
133 user_agent: Option<String>,
134 ip_address: Option<String>,
135}
136
137impl StreamSession {
138 pub fn new(config: SessionConfig) -> Self {
140 let now = Utc::now();
141 let expires_at = now + chrono::Duration::seconds(config.session_timeout_seconds as i64);
142
143 Self {
144 id: SessionId::new(),
145 state: SessionState::Initializing,
146 config,
147 stats: SessionStats::default(),
148 created_at: now,
149 updated_at: now,
150 expires_at,
151 completed_at: None,
152 streams: HashMap::new(),
153 pending_events: VecDeque::new(),
154 client_info: None,
155 user_agent: None,
156 ip_address: None,
157 }
158 }
159
160 pub fn id(&self) -> SessionId {
162 self.id
163 }
164
165 pub fn state(&self) -> &SessionState {
167 &self.state
168 }
169
170 pub fn config(&self) -> &SessionConfig {
172 &self.config
173 }
174
175 pub fn stats(&self) -> &SessionStats {
177 &self.stats
178 }
179
180 pub fn created_at(&self) -> DateTime<Utc> {
182 self.created_at
183 }
184
185 pub fn updated_at(&self) -> DateTime<Utc> {
187 self.updated_at
188 }
189
190 pub fn expires_at(&self) -> DateTime<Utc> {
192 self.expires_at
193 }
194
195 pub fn completed_at(&self) -> Option<DateTime<Utc>> {
197 self.completed_at
198 }
199
200 pub fn client_info(&self) -> Option<&str> {
202 self.client_info.as_deref()
203 }
204
205 pub fn duration(&self) -> Option<chrono::Duration> {
207 self.completed_at.map(|end| end - self.created_at)
208 }
209
210 pub fn is_expired(&self) -> bool {
212 Utc::now() > self.expires_at
213 }
214
215 pub fn is_active(&self) -> bool {
217 matches!(self.state, SessionState::Active) && !self.is_expired()
218 }
219
220 pub fn streams(&self) -> &HashMap<StreamId, Stream> {
222 &self.streams
223 }
224
225 pub fn get_stream(&self, stream_id: StreamId) -> Option<&Stream> {
227 self.streams.get(&stream_id)
228 }
229
230 pub fn update_stream_config(
238 &mut self,
239 stream_id: StreamId,
240 config: StreamConfig,
241 ) -> DomainResult<()> {
242 let stream = self
243 .streams
244 .get_mut(&stream_id)
245 .ok_or_else(|| DomainError::StreamNotFound(stream_id.to_string()))?;
246
247 stream.update_config(config)?;
248 self.update_timestamp();
249
250 self.add_event(DomainEvent::StreamConfigUpdated {
251 session_id: self.id,
252 stream_id,
253 timestamp: Utc::now(),
254 });
255
256 Ok(())
257 }
258
259 pub fn create_stream_patch_frames(
266 &mut self,
267 stream_id: StreamId,
268 priority_threshold: Priority,
269 max_frames: usize,
270 ) -> DomainResult<Vec<Frame>> {
271 let stream = self
272 .streams
273 .get_mut(&stream_id)
274 .ok_or_else(|| DomainError::StreamNotFound(stream_id.to_string()))?;
275
276 let frames = stream.create_patch_frames(priority_threshold, max_frames)?;
277
278 self.stats.total_frames += frames.len() as u64;
279 self.update_timestamp();
280
281 if !frames.is_empty() {
282 self.add_event(DomainEvent::FramesBatched {
283 session_id: self.id,
284 frame_count: frames.len(),
285 timestamp: Utc::now(),
286 });
287 }
288
289 Ok(frames)
290 }
291
292 pub fn activate(&mut self) -> DomainResult<()> {
294 match self.state {
295 SessionState::Initializing => {
296 self.state = SessionState::Active;
297 self.update_timestamp();
298
299 self.add_event(DomainEvent::SessionActivated {
300 session_id: self.id,
301 timestamp: Utc::now(),
302 });
303
304 Ok(())
305 }
306 _ => Err(DomainError::InvalidStateTransition(format!(
307 "Cannot activate session from state: {:?}",
308 self.state
309 ))),
310 }
311 }
312
313 pub fn create_stream(&mut self, source_data: JsonData) -> DomainResult<StreamId> {
315 if !self.is_active() {
316 return Err(DomainError::InvalidSessionState(
317 "Session is not active".to_string(),
318 ));
319 }
320
321 if self.streams.len() >= self.config.max_concurrent_streams {
322 return Err(DomainError::TooManyStreams(format!(
323 "Maximum {} concurrent streams exceeded",
324 self.config.max_concurrent_streams
325 )));
326 }
327
328 let domain_data = source_data;
330
331 let stream = Stream::new(
332 self.id,
333 domain_data,
334 self.config.default_stream_config.clone(),
335 );
336 let stream_id = stream.id();
337
338 self.streams.insert(stream_id, stream);
339 self.stats.total_streams += 1;
340 self.stats.active_streams += 1;
341 self.update_timestamp();
342
343 self.add_event(DomainEvent::StreamCreated {
344 session_id: self.id,
345 stream_id,
346 timestamp: Utc::now(),
347 });
348
349 Ok(stream_id)
350 }
351
352 pub fn start_stream(&mut self, stream_id: StreamId) -> DomainResult<()> {
354 let stream = self
355 .streams
356 .get_mut(&stream_id)
357 .ok_or_else(|| DomainError::StreamNotFound(stream_id.to_string()))?;
358
359 stream.start_streaming()?;
360 self.update_timestamp();
361
362 self.add_event(DomainEvent::StreamStarted {
363 session_id: self.id,
364 stream_id,
365 timestamp: Utc::now(),
366 });
367
368 Ok(())
369 }
370
371 pub fn complete_stream(&mut self, stream_id: StreamId) -> DomainResult<()> {
373 let stream = self
374 .streams
375 .get_mut(&stream_id)
376 .ok_or_else(|| DomainError::StreamNotFound(stream_id.to_string()))?;
377
378 stream.complete()?;
379
380 self.stats.active_streams = self.stats.active_streams.saturating_sub(1);
382 self.stats.completed_streams += 1;
383
384 if let Some(duration) = stream.duration() {
386 let duration_ms = duration.num_milliseconds() as f64;
387 self.stats.average_stream_duration_ms =
388 (self.stats.average_stream_duration_ms + duration_ms) / 2.0;
389 }
390
391 self.update_timestamp();
392
393 self.add_event(DomainEvent::StreamCompleted {
394 session_id: self.id,
395 stream_id,
396 timestamp: Utc::now(),
397 });
398
399 Ok(())
400 }
401
402 pub fn fail_stream(&mut self, stream_id: StreamId, error: String) -> DomainResult<()> {
404 let stream = self
405 .streams
406 .get_mut(&stream_id)
407 .ok_or_else(|| DomainError::StreamNotFound(stream_id.to_string()))?;
408
409 stream.fail(error.clone())?;
410
411 self.stats.active_streams = self.stats.active_streams.saturating_sub(1);
413 self.stats.failed_streams += 1;
414
415 self.update_timestamp();
416
417 self.add_event(DomainEvent::StreamFailed {
418 session_id: self.id,
419 stream_id,
420 error,
421 timestamp: Utc::now(),
422 });
423
424 Ok(())
425 }
426
427 pub fn create_priority_frames(&mut self, batch_size: usize) -> DomainResult<Vec<Frame>> {
429 if !self.is_active() {
430 return Err(DomainError::InvalidSessionState(
431 "Session is not active".to_string(),
432 ));
433 }
434
435 let mut all_frames = Vec::new();
436 let mut frame_count = 0;
437
438 let mut stream_frames: Vec<(Priority, StreamId, Frame)> = Vec::new();
440
441 for (stream_id, stream) in &mut self.streams {
442 if !stream.is_active() {
443 continue;
444 }
445
446 let frames = stream.create_patch_frames(Priority::BACKGROUND, 5)?;
448
449 for frame in frames {
450 let priority = frame.priority();
451 stream_frames.push((priority, *stream_id, frame));
452 }
453 }
454
455 stream_frames.sort_by_key(|frame| std::cmp::Reverse(frame.0));
457
458 for (_, _, frame) in stream_frames.into_iter().take(batch_size) {
460 all_frames.push(frame);
461 frame_count += 1;
462 }
463
464 self.stats.total_frames += frame_count;
466 self.update_timestamp();
467
468 if !all_frames.is_empty() {
469 self.add_event(DomainEvent::FramesBatched {
470 session_id: self.id,
471 frame_count: all_frames.len(),
472 timestamp: Utc::now(),
473 });
474 }
475
476 Ok(all_frames)
477 }
478
479 pub fn close(&mut self) -> DomainResult<()> {
481 match self.state {
482 SessionState::Active => {
483 self.state = SessionState::Closing;
484
485 let active_stream_ids: Vec<_> = self
487 .streams
488 .iter()
489 .filter(|(_, stream)| stream.is_active())
490 .map(|(id, _)| *id)
491 .collect();
492
493 for stream_id in active_stream_ids {
494 if let Some(stream) = self.streams.get_mut(&stream_id) {
495 let _ = stream.cancel(); }
497 }
498
499 self.state = SessionState::Completed;
500 self.completed_at = Some(Utc::now());
501 self.update_timestamp();
502
503 self.add_event(DomainEvent::SessionClosed {
504 session_id: self.id,
505 timestamp: Utc::now(),
506 });
507
508 Ok(())
509 }
510 _ => Err(DomainError::InvalidStateTransition(format!(
511 "Cannot close session from state: {:?}",
512 self.state
513 ))),
514 }
515 }
516
517 pub fn force_close_expired(&mut self) -> DomainResult<bool> {
519 if !self.is_expired() {
520 return Ok(false);
521 }
522
523 let old_state = self.state.clone();
525 self.state = SessionState::Failed;
526 self.completed_at = Some(Utc::now());
527 self.update_timestamp();
528
529 for stream in self.streams.values_mut() {
531 let _ = stream.cancel(); }
533
534 self.streams.clear();
536
537 self.add_event(DomainEvent::SessionTimedOut {
539 session_id: self.id,
540 original_state: old_state,
541 timeout_duration: self.config.session_timeout_seconds,
542 timestamp: Utc::now(),
543 });
544
545 Ok(true)
546 }
547
548 pub fn extend_timeout(&mut self, additional_seconds: u64) -> DomainResult<()> {
550 if self.is_expired() {
551 return Err(DomainError::InvalidStateTransition(
552 "Cannot extend timeout for expired session".to_string(),
553 ));
554 }
555
556 self.expires_at += chrono::Duration::seconds(additional_seconds as i64);
557 self.update_timestamp();
558
559 self.add_event(DomainEvent::SessionTimeoutExtended {
560 session_id: self.id,
561 additional_seconds,
562 new_expires_at: self.expires_at,
563 timestamp: Utc::now(),
564 });
565
566 Ok(())
567 }
568
569 pub fn set_client_info(
571 &mut self,
572 client_info: String,
573 user_agent: Option<String>,
574 ip_address: Option<String>,
575 ) {
576 self.client_info = Some(client_info);
577 self.user_agent = user_agent;
578 self.ip_address = ip_address;
579 self.update_timestamp();
580 }
581
582 pub fn pending_events(&self) -> &VecDeque<DomainEvent> {
584 &self.pending_events
585 }
586
587 pub fn take_events(&mut self) -> VecDeque<DomainEvent> {
589 std::mem::take(&mut self.pending_events)
590 }
591
592 pub fn health_check(&self) -> SessionHealth {
594 let active_count = self.streams.values().filter(|s| s.is_active()).count();
595 let failed_count = self
596 .streams
597 .values()
598 .filter(|s| {
599 matches!(
600 s.state(),
601 crate::domain::entities::stream::StreamState::Failed
602 )
603 })
604 .count();
605
606 SessionHealth {
607 is_healthy: self.is_active() && failed_count == 0,
608 active_streams: active_count,
609 failed_streams: failed_count,
610 is_expired: self.is_expired(),
611 uptime_seconds: (Utc::now() - self.created_at).num_seconds(),
612 }
613 }
614
615 fn add_event(&mut self, event: DomainEvent) {
617 self.pending_events.push_back(event);
618 }
619
620 fn update_timestamp(&mut self) {
622 self.updated_at = Utc::now();
623 }
624}
625
626#[derive(Debug, Clone, Serialize, Deserialize)]
628pub struct SessionHealth {
629 pub is_healthy: bool,
631 pub active_streams: usize,
633 pub failed_streams: usize,
635 pub is_expired: bool,
637 pub uptime_seconds: i64,
639}
640
641#[cfg(test)]
642mod tests {
643 use super::*;
644
645 #[test]
646 fn test_session_creation_and_activation() {
647 let mut session = StreamSession::new(SessionConfig::default());
648
649 assert_eq!(session.state(), &SessionState::Initializing);
650 assert!(!session.is_active());
651
652 assert!(session.activate().is_ok());
653 assert_eq!(session.state(), &SessionState::Active);
654 assert!(session.is_active());
655 }
656
657 #[test]
658 fn test_stream_management() {
659 let mut session = StreamSession::new(SessionConfig::default());
660 assert!(session.activate().is_ok());
661
662 let mut map = HashMap::new();
663 map.insert("test".to_string(), JsonData::String("data".to_string()));
664 let source_data = JsonData::Object(map);
665
666 let stream_id = session.create_stream(source_data).unwrap();
668 assert_eq!(session.streams().len(), 1);
669 assert_eq!(session.stats().total_streams, 1);
670 assert_eq!(session.stats().active_streams, 1);
671
672 assert!(session.start_stream(stream_id).is_ok());
674
675 assert!(session.complete_stream(stream_id).is_ok());
677 assert_eq!(session.stats().active_streams, 0);
678 assert_eq!(session.stats().completed_streams, 1);
679 }
680
681 #[test]
682 fn test_concurrent_stream_limit() {
683 let config = SessionConfig {
684 max_concurrent_streams: 2,
685 ..Default::default()
686 };
687 let mut session = StreamSession::new(config);
688 assert!(session.activate().is_ok());
689
690 let source_data = JsonData::Object(HashMap::new());
691
692 assert!(session.create_stream(source_data.clone()).is_ok());
694 assert!(session.create_stream(source_data.clone()).is_ok());
695
696 assert!(session.create_stream(source_data).is_err());
698 }
699
700 #[test]
701 fn test_session_expiration() {
702 let config = SessionConfig {
703 session_timeout_seconds: 1,
704 ..Default::default()
705 };
706 let session = StreamSession::new(config);
707
708 assert!(!session.is_expired());
710
711 assert!(session.expires_at > session.created_at);
714 }
715
716 #[test]
717 fn test_domain_events() {
718 let mut session = StreamSession::new(SessionConfig::default());
719
720 assert!(session.activate().is_ok());
722 assert!(!session.pending_events().is_empty());
723
724 let events = session.take_events();
725 assert_eq!(events.len(), 1);
726
727 assert!(session.pending_events().is_empty());
729 }
730
731 #[test]
732 fn test_session_health() {
733 let mut session = StreamSession::new(SessionConfig::default());
734 assert!(session.activate().is_ok());
735
736 let health = session.health_check();
737 assert!(health.is_healthy);
738 assert_eq!(health.active_streams, 0);
739 assert_eq!(health.failed_streams, 0);
740 assert!(!health.is_expired);
741 assert!(health.uptime_seconds >= 0);
742 }
743}