1use crate::domain::{
4 DomainError, DomainResult,
5 entities::{Frame, Stream, stream::StreamConfig},
6 events::{DomainEvent, SessionState},
7 value_objects::{JsonData, Priority, SessionId, StreamId},
8};
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, VecDeque};
12
13mod serde_session_id {
15 use crate::domain::value_objects::SessionId;
16 use serde::{Deserialize, Deserializer, Serialize, Serializer};
17
18 pub fn serialize<S>(id: &SessionId, serializer: S) -> Result<S::Ok, S::Error>
19 where
20 S: Serializer,
21 {
22 id.as_uuid().serialize(serializer)
23 }
24
25 pub fn deserialize<'de, D>(deserializer: D) -> Result<SessionId, D::Error>
26 where
27 D: Deserializer<'de>,
28 {
29 let uuid = uuid::Uuid::deserialize(deserializer)?;
30 Ok(SessionId::from_uuid(uuid))
31 }
32}
33
34#[allow(dead_code)]
36mod serde_stream_id {
37 use crate::domain::value_objects::StreamId;
38 use serde::{Deserialize, Deserializer, Serialize, Serializer};
39
40 pub fn serialize<S>(id: &StreamId, serializer: S) -> Result<S::Ok, S::Error>
41 where
42 S: Serializer,
43 {
44 id.as_uuid().serialize(serializer)
45 }
46
47 pub fn deserialize<'de, D>(deserializer: D) -> Result<StreamId, D::Error>
48 where
49 D: Deserializer<'de>,
50 {
51 let uuid = uuid::Uuid::deserialize(deserializer)?;
52 Ok(StreamId::from_uuid(uuid))
53 }
54}
55
56mod serde_stream_map {
58 use crate::domain::{entities::Stream, value_objects::StreamId};
59 use serde::{Deserialize, Deserializer, Serialize, Serializer};
60 use std::collections::HashMap;
61
62 pub fn serialize<S>(map: &HashMap<StreamId, Stream>, serializer: S) -> Result<S::Ok, S::Error>
63 where
64 S: Serializer,
65 {
66 let uuid_map: HashMap<String, &Stream> = map
67 .iter()
68 .map(|(k, v)| (k.as_uuid().to_string(), v))
69 .collect();
70 uuid_map.serialize(serializer)
71 }
72
73 pub fn deserialize<'de, D>(deserializer: D) -> Result<HashMap<StreamId, Stream>, D::Error>
74 where
75 D: Deserializer<'de>,
76 {
77 let uuid_map: HashMap<String, Stream> = HashMap::deserialize(deserializer)?;
78 uuid_map
79 .into_iter()
80 .map(|(k, v)| {
81 uuid::Uuid::parse_str(&k)
82 .map(|uuid| (StreamId::from_uuid(uuid), v))
83 .map_err(serde::de::Error::custom)
84 })
85 .collect()
86 }
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct SessionConfig {
92 pub max_concurrent_streams: usize,
94 pub session_timeout_seconds: u64,
96 pub default_stream_config: StreamConfig,
98 pub enable_compression: bool,
100 pub metadata: HashMap<String, String>,
102}
103
104impl Default for SessionConfig {
105 fn default() -> Self {
106 Self {
107 max_concurrent_streams: 10,
108 session_timeout_seconds: 3600, default_stream_config: StreamConfig::default(),
110 enable_compression: true,
111 metadata: HashMap::new(),
112 }
113 }
114}
115
116#[derive(Debug, Clone, Default, Serialize, Deserialize)]
118pub struct SessionStats {
119 pub total_streams: u64,
120 pub active_streams: u64,
121 pub completed_streams: u64,
122 pub failed_streams: u64,
123 pub total_frames: u64,
124 pub total_bytes: u64,
125 pub average_stream_duration_ms: f64,
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct StreamSession {
131 #[serde(with = "serde_session_id")]
132 id: SessionId,
133 state: SessionState,
134 config: SessionConfig,
135 stats: SessionStats,
136 created_at: DateTime<Utc>,
137 updated_at: DateTime<Utc>,
138 expires_at: DateTime<Utc>,
139 completed_at: Option<DateTime<Utc>>,
140
141 #[serde(with = "serde_stream_map")]
143 streams: HashMap<StreamId, Stream>,
144 pending_events: VecDeque<DomainEvent>,
145
146 client_info: Option<String>,
148 user_agent: Option<String>,
149 ip_address: Option<String>,
150}
151
152impl StreamSession {
153 pub fn new(config: SessionConfig) -> Self {
155 let now = Utc::now();
156 let expires_at = now + chrono::Duration::seconds(config.session_timeout_seconds as i64);
157
158 Self {
159 id: SessionId::new(),
160 state: SessionState::Initializing,
161 config,
162 stats: SessionStats::default(),
163 created_at: now,
164 updated_at: now,
165 expires_at,
166 completed_at: None,
167 streams: HashMap::new(),
168 pending_events: VecDeque::new(),
169 client_info: None,
170 user_agent: None,
171 ip_address: None,
172 }
173 }
174
175 pub fn id(&self) -> SessionId {
177 self.id
178 }
179
180 pub fn state(&self) -> &SessionState {
182 &self.state
183 }
184
185 pub fn config(&self) -> &SessionConfig {
187 &self.config
188 }
189
190 pub fn stats(&self) -> &SessionStats {
192 &self.stats
193 }
194
195 pub fn created_at(&self) -> DateTime<Utc> {
197 self.created_at
198 }
199
200 pub fn updated_at(&self) -> DateTime<Utc> {
202 self.updated_at
203 }
204
205 pub fn expires_at(&self) -> DateTime<Utc> {
207 self.expires_at
208 }
209
210 pub fn completed_at(&self) -> Option<DateTime<Utc>> {
212 self.completed_at
213 }
214
215 pub fn duration(&self) -> Option<chrono::Duration> {
217 self.completed_at.map(|end| end - self.created_at)
218 }
219
220 pub fn is_expired(&self) -> bool {
222 Utc::now() > self.expires_at
223 }
224
225 pub fn is_active(&self) -> bool {
227 matches!(self.state, SessionState::Active) && !self.is_expired()
228 }
229
230 pub fn streams(&self) -> &HashMap<StreamId, Stream> {
232 &self.streams
233 }
234
235 pub fn get_stream(&self, stream_id: StreamId) -> Option<&Stream> {
237 self.streams.get(&stream_id)
238 }
239
240 pub fn get_stream_mut(&mut self, stream_id: StreamId) -> Option<&mut Stream> {
242 self.streams.get_mut(&stream_id)
243 }
244
245 pub fn activate(&mut self) -> DomainResult<()> {
247 match self.state {
248 SessionState::Initializing => {
249 self.state = SessionState::Active;
250 self.update_timestamp();
251
252 self.add_event(DomainEvent::SessionActivated {
253 session_id: self.id,
254 timestamp: Utc::now(),
255 });
256
257 Ok(())
258 }
259 _ => Err(DomainError::InvalidStateTransition(format!(
260 "Cannot activate session from state: {:?}",
261 self.state
262 ))),
263 }
264 }
265
266 pub fn create_stream(&mut self, source_data: JsonData) -> DomainResult<StreamId> {
268 if !self.is_active() {
269 return Err(DomainError::InvalidSessionState(
270 "Session is not active".to_string(),
271 ));
272 }
273
274 if self.streams.len() >= self.config.max_concurrent_streams {
275 return Err(DomainError::TooManyStreams(format!(
276 "Maximum {} concurrent streams exceeded",
277 self.config.max_concurrent_streams
278 )));
279 }
280
281 let domain_data = source_data;
283
284 let stream = Stream::new(
285 self.id,
286 domain_data,
287 self.config.default_stream_config.clone(),
288 );
289 let stream_id = stream.id();
290
291 self.streams.insert(stream_id, stream);
292 self.stats.total_streams += 1;
293 self.stats.active_streams += 1;
294 self.update_timestamp();
295
296 self.add_event(DomainEvent::StreamCreated {
297 session_id: self.id,
298 stream_id,
299 timestamp: Utc::now(),
300 });
301
302 Ok(stream_id)
303 }
304
305 pub fn start_stream(&mut self, stream_id: StreamId) -> DomainResult<()> {
307 let stream = self
308 .streams
309 .get_mut(&stream_id)
310 .ok_or_else(|| DomainError::StreamNotFound(stream_id.to_string()))?;
311
312 stream.start_streaming()?;
313 self.update_timestamp();
314
315 self.add_event(DomainEvent::StreamStarted {
316 session_id: self.id,
317 stream_id,
318 timestamp: Utc::now(),
319 });
320
321 Ok(())
322 }
323
324 pub fn complete_stream(&mut self, stream_id: StreamId) -> DomainResult<()> {
326 let stream = self
327 .streams
328 .get_mut(&stream_id)
329 .ok_or_else(|| DomainError::StreamNotFound(stream_id.to_string()))?;
330
331 stream.complete()?;
332
333 self.stats.active_streams = self.stats.active_streams.saturating_sub(1);
335 self.stats.completed_streams += 1;
336
337 if let Some(duration) = stream.duration() {
339 let duration_ms = duration.num_milliseconds() as f64;
340 self.stats.average_stream_duration_ms =
341 (self.stats.average_stream_duration_ms + duration_ms) / 2.0;
342 }
343
344 self.update_timestamp();
345
346 self.add_event(DomainEvent::StreamCompleted {
347 session_id: self.id,
348 stream_id,
349 timestamp: Utc::now(),
350 });
351
352 Ok(())
353 }
354
355 pub fn fail_stream(&mut self, stream_id: StreamId, error: String) -> DomainResult<()> {
357 let stream = self
358 .streams
359 .get_mut(&stream_id)
360 .ok_or_else(|| DomainError::StreamNotFound(stream_id.to_string()))?;
361
362 stream.fail(error.clone())?;
363
364 self.stats.active_streams = self.stats.active_streams.saturating_sub(1);
366 self.stats.failed_streams += 1;
367
368 self.update_timestamp();
369
370 self.add_event(DomainEvent::StreamFailed {
371 session_id: self.id,
372 stream_id,
373 error,
374 timestamp: Utc::now(),
375 });
376
377 Ok(())
378 }
379
380 pub fn create_priority_frames(&mut self, batch_size: usize) -> DomainResult<Vec<Frame>> {
382 if !self.is_active() {
383 return Err(DomainError::InvalidSessionState(
384 "Session is not active".to_string(),
385 ));
386 }
387
388 let mut all_frames = Vec::new();
389 let mut frame_count = 0;
390
391 let mut stream_frames: Vec<(Priority, StreamId, Frame)> = Vec::new();
393
394 for (stream_id, stream) in &mut self.streams {
395 if !stream.is_active() {
396 continue;
397 }
398
399 let frames = stream.create_patch_frames(Priority::BACKGROUND, 5)?;
401
402 for frame in frames {
403 let priority = frame.priority();
404 stream_frames.push((priority, *stream_id, frame));
405 }
406 }
407
408 stream_frames.sort_by_key(|frame| std::cmp::Reverse(frame.0));
410
411 for (_, _, frame) in stream_frames.into_iter().take(batch_size) {
413 all_frames.push(frame);
414 frame_count += 1;
415 }
416
417 self.stats.total_frames += frame_count;
419 self.update_timestamp();
420
421 if !all_frames.is_empty() {
422 self.add_event(DomainEvent::FramesBatched {
423 session_id: self.id,
424 frame_count: all_frames.len(),
425 timestamp: Utc::now(),
426 });
427 }
428
429 Ok(all_frames)
430 }
431
432 pub fn close(&mut self) -> DomainResult<()> {
434 match self.state {
435 SessionState::Active => {
436 self.state = SessionState::Closing;
437
438 let active_stream_ids: Vec<_> = self
440 .streams
441 .iter()
442 .filter(|(_, stream)| stream.is_active())
443 .map(|(id, _)| *id)
444 .collect();
445
446 for stream_id in active_stream_ids {
447 if let Some(stream) = self.streams.get_mut(&stream_id) {
448 let _ = stream.cancel(); }
450 }
451
452 self.state = SessionState::Completed;
453 self.completed_at = Some(Utc::now());
454 self.update_timestamp();
455
456 self.add_event(DomainEvent::SessionClosed {
457 session_id: self.id,
458 timestamp: Utc::now(),
459 });
460
461 Ok(())
462 }
463 _ => Err(DomainError::InvalidStateTransition(format!(
464 "Cannot close session from state: {:?}",
465 self.state
466 ))),
467 }
468 }
469
470 pub fn force_close_expired(&mut self) -> DomainResult<bool> {
472 if !self.is_expired() {
473 return Ok(false);
474 }
475
476 let old_state = self.state.clone();
478 self.state = SessionState::Failed;
479 self.completed_at = Some(Utc::now());
480 self.update_timestamp();
481
482 for stream in self.streams.values_mut() {
484 let _ = stream.cancel(); }
486
487 self.streams.clear();
489
490 self.add_event(DomainEvent::SessionTimedOut {
492 session_id: self.id,
493 original_state: old_state,
494 timeout_duration: self.config.session_timeout_seconds,
495 timestamp: Utc::now(),
496 });
497
498 Ok(true)
499 }
500
501 pub fn extend_timeout(&mut self, additional_seconds: u64) -> DomainResult<()> {
503 if self.is_expired() {
504 return Err(DomainError::InvalidStateTransition(
505 "Cannot extend timeout for expired session".to_string(),
506 ));
507 }
508
509 self.expires_at += chrono::Duration::seconds(additional_seconds as i64);
510 self.update_timestamp();
511
512 self.add_event(DomainEvent::SessionTimeoutExtended {
513 session_id: self.id,
514 additional_seconds,
515 new_expires_at: self.expires_at,
516 timestamp: Utc::now(),
517 });
518
519 Ok(())
520 }
521
522 pub fn set_client_info(
524 &mut self,
525 client_info: String,
526 user_agent: Option<String>,
527 ip_address: Option<String>,
528 ) {
529 self.client_info = Some(client_info);
530 self.user_agent = user_agent;
531 self.ip_address = ip_address;
532 self.update_timestamp();
533 }
534
535 pub fn pending_events(&self) -> &VecDeque<DomainEvent> {
537 &self.pending_events
538 }
539
540 pub fn take_events(&mut self) -> VecDeque<DomainEvent> {
542 std::mem::take(&mut self.pending_events)
543 }
544
545 pub fn health_check(&self) -> SessionHealth {
547 let active_count = self.streams.values().filter(|s| s.is_active()).count();
548 let failed_count = self
549 .streams
550 .values()
551 .filter(|s| {
552 matches!(
553 s.state(),
554 crate::domain::entities::stream::StreamState::Failed
555 )
556 })
557 .count();
558
559 SessionHealth {
560 is_healthy: self.is_active() && failed_count == 0,
561 active_streams: active_count,
562 failed_streams: failed_count,
563 is_expired: self.is_expired(),
564 uptime_seconds: (Utc::now() - self.created_at).num_seconds(),
565 }
566 }
567
568 fn add_event(&mut self, event: DomainEvent) {
570 self.pending_events.push_back(event);
571 }
572
573 fn update_timestamp(&mut self) {
575 self.updated_at = Utc::now();
576 }
577}
578
579#[derive(Debug, Clone, Serialize, Deserialize)]
581pub struct SessionHealth {
582 pub is_healthy: bool,
583 pub active_streams: usize,
584 pub failed_streams: usize,
585 pub is_expired: bool,
586 pub uptime_seconds: i64,
587}
588
589#[cfg(test)]
590mod tests {
591 use super::*;
592
593 #[test]
594 fn test_session_creation_and_activation() {
595 let mut session = StreamSession::new(SessionConfig::default());
596
597 assert_eq!(session.state(), &SessionState::Initializing);
598 assert!(!session.is_active());
599
600 assert!(session.activate().is_ok());
601 assert_eq!(session.state(), &SessionState::Active);
602 assert!(session.is_active());
603 }
604
605 #[test]
606 fn test_stream_management() {
607 let mut session = StreamSession::new(SessionConfig::default());
608 assert!(session.activate().is_ok());
609
610 let mut map = HashMap::new();
611 map.insert("test".to_string(), JsonData::String("data".to_string()));
612 let source_data = JsonData::Object(map);
613
614 let stream_id = session.create_stream(source_data).unwrap();
616 assert_eq!(session.streams().len(), 1);
617 assert_eq!(session.stats().total_streams, 1);
618 assert_eq!(session.stats().active_streams, 1);
619
620 assert!(session.start_stream(stream_id).is_ok());
622
623 assert!(session.complete_stream(stream_id).is_ok());
625 assert_eq!(session.stats().active_streams, 0);
626 assert_eq!(session.stats().completed_streams, 1);
627 }
628
629 #[test]
630 fn test_concurrent_stream_limit() {
631 let config = SessionConfig {
632 max_concurrent_streams: 2,
633 ..Default::default()
634 };
635 let mut session = StreamSession::new(config);
636 assert!(session.activate().is_ok());
637
638 let source_data = JsonData::Object(HashMap::new());
639
640 assert!(session.create_stream(source_data.clone()).is_ok());
642 assert!(session.create_stream(source_data.clone()).is_ok());
643
644 assert!(session.create_stream(source_data).is_err());
646 }
647
648 #[test]
649 fn test_session_expiration() {
650 let config = SessionConfig {
651 session_timeout_seconds: 1,
652 ..Default::default()
653 };
654 let session = StreamSession::new(config);
655
656 assert!(!session.is_expired());
658
659 assert!(session.expires_at > session.created_at);
662 }
663
664 #[test]
665 fn test_domain_events() {
666 let mut session = StreamSession::new(SessionConfig::default());
667
668 assert!(session.activate().is_ok());
670 assert!(!session.pending_events().is_empty());
671
672 let events = session.take_events();
673 assert_eq!(events.len(), 1);
674
675 assert!(session.pending_events().is_empty());
677 }
678
679 #[test]
680 fn test_session_health() {
681 let mut session = StreamSession::new(SessionConfig::default());
682 assert!(session.activate().is_ok());
683
684 let health = session.health_check();
685 assert!(health.is_healthy);
686 assert_eq!(health.active_streams, 0);
687 assert_eq!(health.failed_streams, 0);
688 assert!(!health.is_expired);
689 assert!(health.uptime_seconds >= 0);
690 }
691}