Skip to main content

pjson_rs/domain/aggregates/
stream_session.rs

1//! StreamSession aggregate root managing multiple streams
2
3use crate::domain::{
4    DomainError, DomainResult,
5    entities::{Frame, Stream, stream::StreamConfig},
6    events::{DomainEvent, SessionState},
7    value_objects::{JsonData, Priority, SessionId, StreamId},
8};
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, VecDeque};
12
13/// Custom serde for SessionId within aggregates
14mod serde_session_id {
15    use crate::domain::value_objects::SessionId;
16    use serde::{Deserialize, Deserializer, Serialize, Serializer};
17
18    pub fn serialize<S>(id: &SessionId, serializer: S) -> Result<S::Ok, S::Error>
19    where
20        S: Serializer,
21    {
22        id.as_uuid().serialize(serializer)
23    }
24
25    pub fn deserialize<'de, D>(deserializer: D) -> Result<SessionId, D::Error>
26    where
27        D: Deserializer<'de>,
28    {
29        let uuid = uuid::Uuid::deserialize(deserializer)?;
30        Ok(SessionId::from_uuid(uuid))
31    }
32}
33
34/// Custom serde for HashMap<StreamId, Stream>
35mod serde_stream_map {
36    use crate::domain::{entities::Stream, value_objects::StreamId};
37    use serde::{Deserialize, Deserializer, Serialize, Serializer};
38    use std::collections::HashMap;
39
40    pub fn serialize<S>(map: &HashMap<StreamId, Stream>, serializer: S) -> Result<S::Ok, S::Error>
41    where
42        S: Serializer,
43    {
44        let uuid_map: HashMap<String, &Stream> = map
45            .iter()
46            .map(|(k, v)| (k.as_uuid().to_string(), v))
47            .collect();
48        uuid_map.serialize(serializer)
49    }
50
51    pub fn deserialize<'de, D>(deserializer: D) -> Result<HashMap<StreamId, Stream>, D::Error>
52    where
53        D: Deserializer<'de>,
54    {
55        let uuid_map: HashMap<String, Stream> = HashMap::deserialize(deserializer)?;
56        uuid_map
57            .into_iter()
58            .map(|(k, v)| {
59                uuid::Uuid::parse_str(&k)
60                    .map(|uuid| (StreamId::from_uuid(uuid), v))
61                    .map_err(serde::de::Error::custom)
62            })
63            .collect()
64    }
65}
66
/// Session configuration.
///
/// Passed to `StreamSession::new` and kept by the session for its lifetime.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionConfig {
    /// Maximum concurrent streams; checked by `StreamSession::create_stream`
    /// before a new stream is added.
    pub max_concurrent_streams: usize,
    /// Session timeout in seconds; added to the creation time to compute the
    /// session's expiry instant.
    pub session_timeout_seconds: u64,
    /// Default stream configuration; cloned into every stream the session
    /// creates.
    pub default_stream_config: StreamConfig,
    /// Enable session-level compression.
    // NOTE(review): this flag is not read anywhere in this module — confirm
    // where it is consumed.
    pub enable_compression: bool,
    /// Custom metadata (free-form string key/value pairs).
    pub metadata: HashMap<String, String>,
}
81
82impl Default for SessionConfig {
83    fn default() -> Self {
84        Self {
85            max_concurrent_streams: 10,
86            session_timeout_seconds: 3600, // 1 hour
87            default_stream_config: StreamConfig::default(),
88            enable_compression: true,
89            metadata: HashMap::new(),
90        }
91    }
92}
93
/// Session statistics and monitoring.
///
/// All counters start at zero (`Default`) and are maintained by the
/// `StreamSession` methods that create, complete, or fail streams and that
/// produce frames.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionStats {
    /// Total number of streams the session has hosted (bumped on stream
    /// creation).
    pub total_streams: u64,
    /// Number of streams currently in an active state (bumped on creation,
    /// decremented when a stream is completed or failed).
    pub active_streams: u64,
    /// Number of streams that completed successfully.
    pub completed_streams: u64,
    /// Number of streams that terminated with an error.
    pub failed_streams: u64,
    /// Total number of frames emitted by the session.
    pub total_frames: u64,
    /// Total payload bytes emitted by the session.
    // NOTE(review): nothing in this module updates this counter — confirm
    // whether it is maintained elsewhere.
    pub total_bytes: u64,
    /// Average per-stream duration, in milliseconds (updated when a stream is
    /// completed and reports a duration).
    pub average_stream_duration_ms: f64,
}
112
/// StreamSession aggregate root - manages multiple prioritized streams.
///
/// Owns its child `Stream`s, tracks lifecycle state and statistics, and
/// queues `DomainEvent`s raised by its mutating methods until the caller
/// drains them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamSession {
    // Serialized as the underlying UUID.
    #[serde(with = "serde_session_id")]
    id: SessionId,
    // Lifecycle state; starts as `Initializing`.
    state: SessionState,
    config: SessionConfig,
    stats: SessionStats,
    created_at: DateTime<Utc>,
    // Refreshed by every mutating operation.
    updated_at: DateTime<Utc>,
    // Creation time plus the configured timeout; can be pushed out later.
    expires_at: DateTime<Utc>,
    // Set when the session is closed or force-closed.
    completed_at: Option<DateTime<Utc>>,

    // Aggregate state
    // Serialized with UUID-string keys.
    #[serde(with = "serde_stream_map")]
    streams: HashMap<StreamId, Stream>,
    // Events raised but not yet taken by the caller (FIFO).
    pending_events: VecDeque<DomainEvent>,

    // Session metadata
    client_info: Option<String>,
    user_agent: Option<String>,
    ip_address: Option<String>,
}
136
137impl StreamSession {
138    /// Create new session
139    pub fn new(config: SessionConfig) -> Self {
140        let now = Utc::now();
141        let expires_at = now + chrono::Duration::seconds(config.session_timeout_seconds as i64);
142
143        Self {
144            id: SessionId::new(),
145            state: SessionState::Initializing,
146            config,
147            stats: SessionStats::default(),
148            created_at: now,
149            updated_at: now,
150            expires_at,
151            completed_at: None,
152            streams: HashMap::new(),
153            pending_events: VecDeque::new(),
154            client_info: None,
155            user_agent: None,
156            ip_address: None,
157        }
158    }
159
160    /// Get session ID
161    pub fn id(&self) -> SessionId {
162        self.id
163    }
164
165    /// Get current state
166    pub fn state(&self) -> &SessionState {
167        &self.state
168    }
169
170    /// Get configuration
171    pub fn config(&self) -> &SessionConfig {
172        &self.config
173    }
174
175    /// Get statistics
176    pub fn stats(&self) -> &SessionStats {
177        &self.stats
178    }
179
180    /// Get creation timestamp
181    pub fn created_at(&self) -> DateTime<Utc> {
182        self.created_at
183    }
184
185    /// Get last update timestamp
186    pub fn updated_at(&self) -> DateTime<Utc> {
187        self.updated_at
188    }
189
190    /// Get expiration timestamp
191    pub fn expires_at(&self) -> DateTime<Utc> {
192        self.expires_at
193    }
194
195    /// Get completion timestamp
196    pub fn completed_at(&self) -> Option<DateTime<Utc>> {
197        self.completed_at
198    }
199
200    /// Get client info metadata
201    pub fn client_info(&self) -> Option<&str> {
202        self.client_info.as_deref()
203    }
204
205    /// Get session duration if completed
206    pub fn duration(&self) -> Option<chrono::Duration> {
207        self.completed_at.map(|end| end - self.created_at)
208    }
209
210    /// Check if session is expired
211    pub fn is_expired(&self) -> bool {
212        Utc::now() > self.expires_at
213    }
214
215    /// Check if session is active
216    pub fn is_active(&self) -> bool {
217        matches!(self.state, SessionState::Active) && !self.is_expired()
218    }
219
220    /// Get all streams
221    pub fn streams(&self) -> &HashMap<StreamId, Stream> {
222        &self.streams
223    }
224
225    /// Get stream by ID
226    pub fn get_stream(&self, stream_id: StreamId) -> Option<&Stream> {
227        self.streams.get(&stream_id)
228    }
229
230    /// Update configuration of a child stream through the aggregate root.
231    ///
232    /// Child mutations must flow through the aggregate so the session-level
233    /// timestamp is bumped and a [`DomainEvent::StreamConfigUpdated`] event is
234    /// raised. Application code MUST NOT reach into `Stream` directly to
235    /// mutate config — the previous `get_stream_mut` accessor that allowed
236    /// this was a DDD violation (see issue #259).
237    pub fn update_stream_config(
238        &mut self,
239        stream_id: StreamId,
240        config: StreamConfig,
241    ) -> DomainResult<()> {
242        let stream = self
243            .streams
244            .get_mut(&stream_id)
245            .ok_or_else(|| DomainError::StreamNotFound(stream_id.to_string()))?;
246
247        stream.update_config(config)?;
248        self.update_timestamp();
249
250        self.add_event(DomainEvent::StreamConfigUpdated {
251            session_id: self.id,
252            stream_id,
253            timestamp: Utc::now(),
254        });
255
256        Ok(())
257    }
258
259    /// Generate patch frames for a child stream through the aggregate root.
260    ///
261    /// Wraps [`Stream::create_patch_frames`] so that session-level statistics
262    /// (`stats.total_frames`) and the `updated_at` timestamp stay consistent
263    /// with the child mutation, and a [`DomainEvent::FramesBatched`] event is
264    /// raised when frames are produced.
265    pub fn create_stream_patch_frames(
266        &mut self,
267        stream_id: StreamId,
268        priority_threshold: Priority,
269        max_frames: usize,
270    ) -> DomainResult<Vec<Frame>> {
271        let stream = self
272            .streams
273            .get_mut(&stream_id)
274            .ok_or_else(|| DomainError::StreamNotFound(stream_id.to_string()))?;
275
276        let frames = stream.create_patch_frames(priority_threshold, max_frames)?;
277
278        self.stats.total_frames += frames.len() as u64;
279        self.update_timestamp();
280
281        if !frames.is_empty() {
282            self.add_event(DomainEvent::FramesBatched {
283                session_id: self.id,
284                frame_count: frames.len(),
285                timestamp: Utc::now(),
286            });
287        }
288
289        Ok(frames)
290    }
291
292    /// Activate session
293    pub fn activate(&mut self) -> DomainResult<()> {
294        match self.state {
295            SessionState::Initializing => {
296                self.state = SessionState::Active;
297                self.update_timestamp();
298
299                self.add_event(DomainEvent::SessionActivated {
300                    session_id: self.id,
301                    timestamp: Utc::now(),
302                });
303
304                Ok(())
305            }
306            _ => Err(DomainError::InvalidStateTransition(format!(
307                "Cannot activate session from state: {:?}",
308                self.state
309            ))),
310        }
311    }
312
313    /// Create new stream in this session
314    pub fn create_stream(&mut self, source_data: JsonData) -> DomainResult<StreamId> {
315        if !self.is_active() {
316            return Err(DomainError::InvalidSessionState(
317                "Session is not active".to_string(),
318            ));
319        }
320
321        if self.streams.len() >= self.config.max_concurrent_streams {
322            return Err(DomainError::TooManyStreams(format!(
323                "Maximum {} concurrent streams exceeded",
324                self.config.max_concurrent_streams
325            )));
326        }
327
328        // source_data is now already JsonData (domain type)
329        let domain_data = source_data;
330
331        let stream = Stream::new(
332            self.id,
333            domain_data,
334            self.config.default_stream_config.clone(),
335        );
336        let stream_id = stream.id();
337
338        self.streams.insert(stream_id, stream);
339        self.stats.total_streams += 1;
340        self.stats.active_streams += 1;
341        self.update_timestamp();
342
343        self.add_event(DomainEvent::StreamCreated {
344            session_id: self.id,
345            stream_id,
346            timestamp: Utc::now(),
347        });
348
349        Ok(stream_id)
350    }
351
352    /// Start streaming for a specific stream
353    pub fn start_stream(&mut self, stream_id: StreamId) -> DomainResult<()> {
354        let stream = self
355            .streams
356            .get_mut(&stream_id)
357            .ok_or_else(|| DomainError::StreamNotFound(stream_id.to_string()))?;
358
359        stream.start_streaming()?;
360        self.update_timestamp();
361
362        self.add_event(DomainEvent::StreamStarted {
363            session_id: self.id,
364            stream_id,
365            timestamp: Utc::now(),
366        });
367
368        Ok(())
369    }
370
371    /// Complete a specific stream
372    pub fn complete_stream(&mut self, stream_id: StreamId) -> DomainResult<()> {
373        let stream = self
374            .streams
375            .get_mut(&stream_id)
376            .ok_or_else(|| DomainError::StreamNotFound(stream_id.to_string()))?;
377
378        stream.complete()?;
379
380        // Update session stats
381        self.stats.active_streams = self.stats.active_streams.saturating_sub(1);
382        self.stats.completed_streams += 1;
383
384        // Update average duration
385        if let Some(duration) = stream.duration() {
386            let duration_ms = duration.num_milliseconds() as f64;
387            self.stats.average_stream_duration_ms =
388                (self.stats.average_stream_duration_ms + duration_ms) / 2.0;
389        }
390
391        self.update_timestamp();
392
393        self.add_event(DomainEvent::StreamCompleted {
394            session_id: self.id,
395            stream_id,
396            timestamp: Utc::now(),
397        });
398
399        Ok(())
400    }
401
402    /// Fail a specific stream
403    pub fn fail_stream(&mut self, stream_id: StreamId, error: String) -> DomainResult<()> {
404        let stream = self
405            .streams
406            .get_mut(&stream_id)
407            .ok_or_else(|| DomainError::StreamNotFound(stream_id.to_string()))?;
408
409        stream.fail(error.clone())?;
410
411        // Update session stats
412        self.stats.active_streams = self.stats.active_streams.saturating_sub(1);
413        self.stats.failed_streams += 1;
414
415        self.update_timestamp();
416
417        self.add_event(DomainEvent::StreamFailed {
418            session_id: self.id,
419            stream_id,
420            error,
421            timestamp: Utc::now(),
422        });
423
424        Ok(())
425    }
426
427    /// Create frames for all active streams based on priority
428    pub fn create_priority_frames(&mut self, batch_size: usize) -> DomainResult<Vec<Frame>> {
429        if !self.is_active() {
430            return Err(DomainError::InvalidSessionState(
431                "Session is not active".to_string(),
432            ));
433        }
434
435        let mut all_frames = Vec::new();
436        let mut frame_count = 0;
437
438        // Collect frames from all active streams, sorted by priority
439        let mut stream_frames: Vec<(Priority, StreamId, Frame)> = Vec::new();
440
441        for (stream_id, stream) in &mut self.streams {
442            if !stream.is_active() {
443                continue;
444            }
445
446            // Try to create frames from this stream
447            let frames = stream.create_patch_frames(Priority::BACKGROUND, 5)?;
448
449            for frame in frames {
450                let priority = frame.priority();
451                stream_frames.push((priority, *stream_id, frame));
452            }
453        }
454
455        // Sort by priority (descending)
456        stream_frames.sort_by_key(|frame| std::cmp::Reverse(frame.0));
457
458        // Take up to batch_size frames
459        for (_, _, frame) in stream_frames.into_iter().take(batch_size) {
460            all_frames.push(frame);
461            frame_count += 1;
462        }
463
464        // Update session stats
465        self.stats.total_frames += frame_count;
466        self.update_timestamp();
467
468        if !all_frames.is_empty() {
469            self.add_event(DomainEvent::FramesBatched {
470                session_id: self.id,
471                frame_count: all_frames.len(),
472                timestamp: Utc::now(),
473            });
474        }
475
476        Ok(all_frames)
477    }
478
479    /// Close session gracefully
480    pub fn close(&mut self) -> DomainResult<()> {
481        match self.state {
482            SessionState::Active => {
483                self.state = SessionState::Closing;
484
485                // Close all active streams
486                let active_stream_ids: Vec<_> = self
487                    .streams
488                    .iter()
489                    .filter(|(_, stream)| stream.is_active())
490                    .map(|(id, _)| *id)
491                    .collect();
492
493                for stream_id in active_stream_ids {
494                    if let Some(stream) = self.streams.get_mut(&stream_id) {
495                        let _ = stream.cancel(); // Best effort
496                    }
497                }
498
499                self.state = SessionState::Completed;
500                self.completed_at = Some(Utc::now());
501                self.update_timestamp();
502
503                self.add_event(DomainEvent::SessionClosed {
504                    session_id: self.id,
505                    timestamp: Utc::now(),
506                });
507
508                Ok(())
509            }
510            _ => Err(DomainError::InvalidStateTransition(format!(
511                "Cannot close session from state: {:?}",
512                self.state
513            ))),
514        }
515    }
516
517    /// Force close expired session with proper cleanup
518    pub fn force_close_expired(&mut self) -> DomainResult<bool> {
519        if !self.is_expired() {
520            return Ok(false);
521        }
522
523        // Force close regardless of current state
524        let old_state = self.state.clone();
525        self.state = SessionState::Failed;
526        self.completed_at = Some(Utc::now());
527        self.update_timestamp();
528
529        // Force cancel all streams with timeout reason
530        for stream in self.streams.values_mut() {
531            let _ = stream.cancel(); // Best effort cleanup
532        }
533
534        // Clear stream collections for memory cleanup
535        self.streams.clear();
536
537        // Emit timeout event
538        self.add_event(DomainEvent::SessionTimedOut {
539            session_id: self.id,
540            original_state: old_state,
541            timeout_duration: self.config.session_timeout_seconds,
542            timestamp: Utc::now(),
543        });
544
545        Ok(true)
546    }
547
548    /// Extend session timeout (if allowed)
549    pub fn extend_timeout(&mut self, additional_seconds: u64) -> DomainResult<()> {
550        if self.is_expired() {
551            return Err(DomainError::InvalidStateTransition(
552                "Cannot extend timeout for expired session".to_string(),
553            ));
554        }
555
556        self.expires_at += chrono::Duration::seconds(additional_seconds as i64);
557        self.update_timestamp();
558
559        self.add_event(DomainEvent::SessionTimeoutExtended {
560            session_id: self.id,
561            additional_seconds,
562            new_expires_at: self.expires_at,
563            timestamp: Utc::now(),
564        });
565
566        Ok(())
567    }
568
569    /// Set client information
570    pub fn set_client_info(
571        &mut self,
572        client_info: String,
573        user_agent: Option<String>,
574        ip_address: Option<String>,
575    ) {
576        self.client_info = Some(client_info);
577        self.user_agent = user_agent;
578        self.ip_address = ip_address;
579        self.update_timestamp();
580    }
581
582    /// Get pending domain events
583    pub fn pending_events(&self) -> &VecDeque<DomainEvent> {
584        &self.pending_events
585    }
586
587    /// Take all pending events (clears the queue)
588    pub fn take_events(&mut self) -> VecDeque<DomainEvent> {
589        std::mem::take(&mut self.pending_events)
590    }
591
592    /// Check session health
593    pub fn health_check(&self) -> SessionHealth {
594        let active_count = self.streams.values().filter(|s| s.is_active()).count();
595        let failed_count = self
596            .streams
597            .values()
598            .filter(|s| {
599                matches!(
600                    s.state(),
601                    crate::domain::entities::stream::StreamState::Failed
602                )
603            })
604            .count();
605
606        SessionHealth {
607            is_healthy: self.is_active() && failed_count == 0,
608            active_streams: active_count,
609            failed_streams: failed_count,
610            is_expired: self.is_expired(),
611            uptime_seconds: (Utc::now() - self.created_at).num_seconds(),
612        }
613    }
614
615    /// Private helper: Add domain event
616    fn add_event(&mut self, event: DomainEvent) {
617        self.pending_events.push_back(event);
618    }
619
620    /// Private helper: Update timestamp
621    fn update_timestamp(&mut self) {
622        self.updated_at = Utc::now();
623    }
624}
625
/// Session health information.
///
/// Point-in-time snapshot produced by `StreamSession::health_check`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionHealth {
    /// `true` when the session is active (in the `Active` state and not
    /// expired) and none of its streams is in the `Failed` state.
    pub is_healthy: bool,
    /// Number of streams that reported themselves active at check time.
    pub active_streams: usize,
    /// Number of streams in the `Failed` state at check time.
    pub failed_streams: usize,
    /// Whether the session has passed its expiry instant.
    pub is_expired: bool,
    /// Whole seconds elapsed since the session was created.
    pub uptime_seconds: i64,
}
640
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh session starts in `Initializing` and becomes active on
    /// `activate`.
    #[test]
    fn test_session_creation_and_activation() {
        let mut session = StreamSession::new(SessionConfig::default());

        assert_eq!(session.state(), &SessionState::Initializing);
        assert!(!session.is_active());

        assert!(session.activate().is_ok());
        assert_eq!(session.state(), &SessionState::Active);
        assert!(session.is_active());
    }

    /// Creating, starting and completing a stream keeps the counters in sync.
    #[test]
    fn test_stream_management() {
        let mut session = StreamSession::new(SessionConfig::default());
        assert!(session.activate().is_ok());

        let mut map = HashMap::new();
        map.insert("test".to_string(), JsonData::String("data".to_string()));
        let source_data = JsonData::Object(map);

        // Create stream
        let stream_id = session.create_stream(source_data).unwrap();
        assert_eq!(session.streams().len(), 1);
        assert_eq!(session.stats().total_streams, 1);
        assert_eq!(session.stats().active_streams, 1);

        // Start stream
        assert!(session.start_stream(stream_id).is_ok());

        // Complete stream
        assert!(session.complete_stream(stream_id).is_ok());
        assert_eq!(session.stats().active_streams, 0);
        assert_eq!(session.stats().completed_streams, 1);
    }

    /// The configured stream limit rejects one stream too many.
    #[test]
    fn test_concurrent_stream_limit() {
        let config = SessionConfig {
            max_concurrent_streams: 2,
            ..Default::default()
        };
        let mut session = StreamSession::new(config);
        assert!(session.activate().is_ok());

        let source_data = JsonData::Object(HashMap::new());

        // Create max streams
        assert!(session.create_stream(source_data.clone()).is_ok());
        assert!(session.create_stream(source_data.clone()).is_ok());

        // Should fail to create third stream
        assert!(session.create_stream(source_data).is_err());
    }

    /// Expiry is driven by `expires_at`; an expired session is inactive,
    /// cannot be extended, and can be force-closed.
    #[test]
    fn test_session_expiration() {
        let config = SessionConfig {
            session_timeout_seconds: 1,
            ..Default::default()
        };
        let mut session = StreamSession::new(config);
        assert!(session.activate().is_ok());

        // Session should not be expired immediately
        assert!(!session.is_expired());
        assert!(session.expires_at > session.created_at);
        assert!(matches!(session.force_close_expired(), Ok(false)));
        assert_eq!(session.state(), &SessionState::Active);

        // Rewind the deadline instead of sleeping: this module can reach the
        // private field, which keeps the test fast and deterministic.
        session.expires_at = Utc::now() - chrono::Duration::seconds(1);

        assert!(session.is_expired());
        assert!(!session.is_active());
        assert!(session.extend_timeout(60).is_err());

        assert!(matches!(session.force_close_expired(), Ok(true)));
        assert_eq!(session.state(), &SessionState::Failed);
        assert!(session.completed_at().is_some());
        assert!(session.streams().is_empty());
    }

    /// State transitions queue events, and `take_events` drains the queue.
    #[test]
    fn test_domain_events() {
        let mut session = StreamSession::new(SessionConfig::default());

        // Events should be generated for state transitions
        assert!(session.activate().is_ok());
        assert!(!session.pending_events().is_empty());

        let events = session.take_events();
        assert_eq!(events.len(), 1);

        // Events queue should be empty after taking
        assert!(session.pending_events().is_empty());
    }

    /// An active session without streams reports as healthy.
    #[test]
    fn test_session_health() {
        let mut session = StreamSession::new(SessionConfig::default());
        assert!(session.activate().is_ok());

        let health = session.health_check();
        assert!(health.is_healthy);
        assert_eq!(health.active_streams, 0);
        assert_eq!(health.failed_streams, 0);
        assert!(!health.is_expired);
        assert!(health.uptime_seconds >= 0);
    }
}