pjson_rs/domain/aggregates/
stream_session.rs

1//! StreamSession aggregate root managing multiple streams
2
3use crate::domain::{
4    DomainError, DomainResult,
5    entities::{Frame, Stream, stream::StreamConfig},
6    events::{DomainEvent, SessionState},
7    value_objects::{JsonData, Priority, SessionId, StreamId},
8};
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, VecDeque};
12
13/// Custom serde for SessionId within aggregates
14mod serde_session_id {
15    use crate::domain::value_objects::SessionId;
16    use serde::{Deserialize, Deserializer, Serialize, Serializer};
17
18    pub fn serialize<S>(id: &SessionId, serializer: S) -> Result<S::Ok, S::Error>
19    where
20        S: Serializer,
21    {
22        id.as_uuid().serialize(serializer)
23    }
24
25    pub fn deserialize<'de, D>(deserializer: D) -> Result<SessionId, D::Error>
26    where
27        D: Deserializer<'de>,
28    {
29        let uuid = uuid::Uuid::deserialize(deserializer)?;
30        Ok(SessionId::from_uuid(uuid))
31    }
32}
33
34/// Custom serde for StreamId within aggregates
35#[allow(dead_code)]
36mod serde_stream_id {
37    use crate::domain::value_objects::StreamId;
38    use serde::{Deserialize, Deserializer, Serialize, Serializer};
39
40    pub fn serialize<S>(id: &StreamId, serializer: S) -> Result<S::Ok, S::Error>
41    where
42        S: Serializer,
43    {
44        id.as_uuid().serialize(serializer)
45    }
46
47    pub fn deserialize<'de, D>(deserializer: D) -> Result<StreamId, D::Error>
48    where
49        D: Deserializer<'de>,
50    {
51        let uuid = uuid::Uuid::deserialize(deserializer)?;
52        Ok(StreamId::from_uuid(uuid))
53    }
54}
55
56/// Custom serde for HashMap<StreamId, Stream>
57mod serde_stream_map {
58    use crate::domain::{entities::Stream, value_objects::StreamId};
59    use serde::{Deserialize, Deserializer, Serialize, Serializer};
60    use std::collections::HashMap;
61
62    pub fn serialize<S>(map: &HashMap<StreamId, Stream>, serializer: S) -> Result<S::Ok, S::Error>
63    where
64        S: Serializer,
65    {
66        let uuid_map: HashMap<String, &Stream> = map
67            .iter()
68            .map(|(k, v)| (k.as_uuid().to_string(), v))
69            .collect();
70        uuid_map.serialize(serializer)
71    }
72
73    pub fn deserialize<'de, D>(deserializer: D) -> Result<HashMap<StreamId, Stream>, D::Error>
74    where
75        D: Deserializer<'de>,
76    {
77        let uuid_map: HashMap<String, Stream> = HashMap::deserialize(deserializer)?;
78        uuid_map
79            .into_iter()
80            .map(|(k, v)| {
81                uuid::Uuid::parse_str(&k)
82                    .map(|uuid| (StreamId::from_uuid(uuid), v))
83                    .map_err(serde::de::Error::custom)
84            })
85            .collect()
86    }
87}
88
89/// Session configuration
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct SessionConfig {
92    /// Maximum concurrent streams
93    pub max_concurrent_streams: usize,
94    /// Session timeout in seconds
95    pub session_timeout_seconds: u64,
96    /// Default stream configuration
97    pub default_stream_config: StreamConfig,
98    /// Enable session-level compression
99    pub enable_compression: bool,
100    /// Custom metadata
101    pub metadata: HashMap<String, String>,
102}
103
104impl Default for SessionConfig {
105    fn default() -> Self {
106        Self {
107            max_concurrent_streams: 10,
108            session_timeout_seconds: 3600, // 1 hour
109            default_stream_config: StreamConfig::default(),
110            enable_compression: true,
111            metadata: HashMap::new(),
112        }
113    }
114}
115
116/// Session statistics and monitoring
117#[derive(Debug, Clone, Default, Serialize, Deserialize)]
118pub struct SessionStats {
119    pub total_streams: u64,
120    pub active_streams: u64,
121    pub completed_streams: u64,
122    pub failed_streams: u64,
123    pub total_frames: u64,
124    pub total_bytes: u64,
125    pub average_stream_duration_ms: f64,
126}
127
128/// StreamSession aggregate root - manages multiple prioritized streams
129#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct StreamSession {
131    #[serde(with = "serde_session_id")]
132    id: SessionId,
133    state: SessionState,
134    config: SessionConfig,
135    stats: SessionStats,
136    created_at: DateTime<Utc>,
137    updated_at: DateTime<Utc>,
138    expires_at: DateTime<Utc>,
139    completed_at: Option<DateTime<Utc>>,
140
141    // Aggregate state
142    #[serde(with = "serde_stream_map")]
143    streams: HashMap<StreamId, Stream>,
144    pending_events: VecDeque<DomainEvent>,
145
146    // Session metadata
147    client_info: Option<String>,
148    user_agent: Option<String>,
149    ip_address: Option<String>,
150}
151
152impl StreamSession {
153    /// Create new session
154    pub fn new(config: SessionConfig) -> Self {
155        let now = Utc::now();
156        let expires_at = now + chrono::Duration::seconds(config.session_timeout_seconds as i64);
157
158        Self {
159            id: SessionId::new(),
160            state: SessionState::Initializing,
161            config,
162            stats: SessionStats::default(),
163            created_at: now,
164            updated_at: now,
165            expires_at,
166            completed_at: None,
167            streams: HashMap::new(),
168            pending_events: VecDeque::new(),
169            client_info: None,
170            user_agent: None,
171            ip_address: None,
172        }
173    }
174
175    /// Get session ID
176    pub fn id(&self) -> SessionId {
177        self.id
178    }
179
180    /// Get current state
181    pub fn state(&self) -> &SessionState {
182        &self.state
183    }
184
185    /// Get configuration
186    pub fn config(&self) -> &SessionConfig {
187        &self.config
188    }
189
190    /// Get statistics
191    pub fn stats(&self) -> &SessionStats {
192        &self.stats
193    }
194
195    /// Get creation timestamp
196    pub fn created_at(&self) -> DateTime<Utc> {
197        self.created_at
198    }
199
200    /// Get last update timestamp
201    pub fn updated_at(&self) -> DateTime<Utc> {
202        self.updated_at
203    }
204
205    /// Get expiration timestamp
206    pub fn expires_at(&self) -> DateTime<Utc> {
207        self.expires_at
208    }
209
210    /// Get completion timestamp
211    pub fn completed_at(&self) -> Option<DateTime<Utc>> {
212        self.completed_at
213    }
214
215    /// Get session duration if completed
216    pub fn duration(&self) -> Option<chrono::Duration> {
217        self.completed_at.map(|end| end - self.created_at)
218    }
219
220    /// Check if session is expired
221    pub fn is_expired(&self) -> bool {
222        Utc::now() > self.expires_at
223    }
224
225    /// Check if session is active
226    pub fn is_active(&self) -> bool {
227        matches!(self.state, SessionState::Active) && !self.is_expired()
228    }
229
230    /// Get all streams
231    pub fn streams(&self) -> &HashMap<StreamId, Stream> {
232        &self.streams
233    }
234
235    /// Get stream by ID
236    pub fn get_stream(&self, stream_id: StreamId) -> Option<&Stream> {
237        self.streams.get(&stream_id)
238    }
239
240    /// Get mutable stream by ID
241    pub fn get_stream_mut(&mut self, stream_id: StreamId) -> Option<&mut Stream> {
242        self.streams.get_mut(&stream_id)
243    }
244
245    /// Activate session
246    pub fn activate(&mut self) -> DomainResult<()> {
247        match self.state {
248            SessionState::Initializing => {
249                self.state = SessionState::Active;
250                self.update_timestamp();
251
252                self.add_event(DomainEvent::SessionActivated {
253                    session_id: self.id,
254                    timestamp: Utc::now(),
255                });
256
257                Ok(())
258            }
259            _ => Err(DomainError::InvalidStateTransition(format!(
260                "Cannot activate session from state: {:?}",
261                self.state
262            ))),
263        }
264    }
265
266    /// Create new stream in this session
267    pub fn create_stream(&mut self, source_data: JsonData) -> DomainResult<StreamId> {
268        if !self.is_active() {
269            return Err(DomainError::InvalidSessionState(
270                "Session is not active".to_string(),
271            ));
272        }
273
274        if self.streams.len() >= self.config.max_concurrent_streams {
275            return Err(DomainError::TooManyStreams(format!(
276                "Maximum {} concurrent streams exceeded",
277                self.config.max_concurrent_streams
278            )));
279        }
280
281        // source_data is now already JsonData (domain type)
282        let domain_data = source_data;
283
284        let stream = Stream::new(
285            self.id,
286            domain_data,
287            self.config.default_stream_config.clone(),
288        );
289        let stream_id = stream.id();
290
291        self.streams.insert(stream_id, stream);
292        self.stats.total_streams += 1;
293        self.stats.active_streams += 1;
294        self.update_timestamp();
295
296        self.add_event(DomainEvent::StreamCreated {
297            session_id: self.id,
298            stream_id,
299            timestamp: Utc::now(),
300        });
301
302        Ok(stream_id)
303    }
304
305    /// Start streaming for a specific stream
306    pub fn start_stream(&mut self, stream_id: StreamId) -> DomainResult<()> {
307        let stream = self
308            .streams
309            .get_mut(&stream_id)
310            .ok_or_else(|| DomainError::StreamNotFound(stream_id.to_string()))?;
311
312        stream.start_streaming()?;
313        self.update_timestamp();
314
315        self.add_event(DomainEvent::StreamStarted {
316            session_id: self.id,
317            stream_id,
318            timestamp: Utc::now(),
319        });
320
321        Ok(())
322    }
323
324    /// Complete a specific stream
325    pub fn complete_stream(&mut self, stream_id: StreamId) -> DomainResult<()> {
326        let stream = self
327            .streams
328            .get_mut(&stream_id)
329            .ok_or_else(|| DomainError::StreamNotFound(stream_id.to_string()))?;
330
331        stream.complete()?;
332
333        // Update session stats
334        self.stats.active_streams = self.stats.active_streams.saturating_sub(1);
335        self.stats.completed_streams += 1;
336
337        // Update average duration
338        if let Some(duration) = stream.duration() {
339            let duration_ms = duration.num_milliseconds() as f64;
340            self.stats.average_stream_duration_ms =
341                (self.stats.average_stream_duration_ms + duration_ms) / 2.0;
342        }
343
344        self.update_timestamp();
345
346        self.add_event(DomainEvent::StreamCompleted {
347            session_id: self.id,
348            stream_id,
349            timestamp: Utc::now(),
350        });
351
352        Ok(())
353    }
354
355    /// Fail a specific stream
356    pub fn fail_stream(&mut self, stream_id: StreamId, error: String) -> DomainResult<()> {
357        let stream = self
358            .streams
359            .get_mut(&stream_id)
360            .ok_or_else(|| DomainError::StreamNotFound(stream_id.to_string()))?;
361
362        stream.fail(error.clone())?;
363
364        // Update session stats
365        self.stats.active_streams = self.stats.active_streams.saturating_sub(1);
366        self.stats.failed_streams += 1;
367
368        self.update_timestamp();
369
370        self.add_event(DomainEvent::StreamFailed {
371            session_id: self.id,
372            stream_id,
373            error,
374            timestamp: Utc::now(),
375        });
376
377        Ok(())
378    }
379
380    /// Create frames for all active streams based on priority
381    pub fn create_priority_frames(&mut self, batch_size: usize) -> DomainResult<Vec<Frame>> {
382        if !self.is_active() {
383            return Err(DomainError::InvalidSessionState(
384                "Session is not active".to_string(),
385            ));
386        }
387
388        let mut all_frames = Vec::new();
389        let mut frame_count = 0;
390
391        // Collect frames from all active streams, sorted by priority
392        let mut stream_frames: Vec<(Priority, StreamId, Frame)> = Vec::new();
393
394        for (stream_id, stream) in &mut self.streams {
395            if !stream.is_active() {
396                continue;
397            }
398
399            // Try to create frames from this stream
400            let frames = stream.create_patch_frames(Priority::BACKGROUND, 5)?;
401
402            for frame in frames {
403                let priority = frame.priority();
404                stream_frames.push((priority, *stream_id, frame));
405            }
406        }
407
408        // Sort by priority (descending)
409        stream_frames.sort_by_key(|frame| std::cmp::Reverse(frame.0));
410
411        // Take up to batch_size frames
412        for (_, _, frame) in stream_frames.into_iter().take(batch_size) {
413            all_frames.push(frame);
414            frame_count += 1;
415        }
416
417        // Update session stats
418        self.stats.total_frames += frame_count;
419        self.update_timestamp();
420
421        if !all_frames.is_empty() {
422            self.add_event(DomainEvent::FramesBatched {
423                session_id: self.id,
424                frame_count: all_frames.len(),
425                timestamp: Utc::now(),
426            });
427        }
428
429        Ok(all_frames)
430    }
431
432    /// Close session gracefully
433    pub fn close(&mut self) -> DomainResult<()> {
434        match self.state {
435            SessionState::Active => {
436                self.state = SessionState::Closing;
437
438                // Close all active streams
439                let active_stream_ids: Vec<_> = self
440                    .streams
441                    .iter()
442                    .filter(|(_, stream)| stream.is_active())
443                    .map(|(id, _)| *id)
444                    .collect();
445
446                for stream_id in active_stream_ids {
447                    if let Some(stream) = self.streams.get_mut(&stream_id) {
448                        let _ = stream.cancel(); // Best effort
449                    }
450                }
451
452                self.state = SessionState::Completed;
453                self.completed_at = Some(Utc::now());
454                self.update_timestamp();
455
456                self.add_event(DomainEvent::SessionClosed {
457                    session_id: self.id,
458                    timestamp: Utc::now(),
459                });
460
461                Ok(())
462            }
463            _ => Err(DomainError::InvalidStateTransition(format!(
464                "Cannot close session from state: {:?}",
465                self.state
466            ))),
467        }
468    }
469
470    /// Force close expired session with proper cleanup
471    pub fn force_close_expired(&mut self) -> DomainResult<bool> {
472        if !self.is_expired() {
473            return Ok(false);
474        }
475
476        // Force close regardless of current state
477        let old_state = self.state.clone();
478        self.state = SessionState::Failed;
479        self.completed_at = Some(Utc::now());
480        self.update_timestamp();
481
482        // Force cancel all streams with timeout reason
483        for stream in self.streams.values_mut() {
484            let _ = stream.cancel(); // Best effort cleanup
485        }
486
487        // Clear stream collections for memory cleanup
488        self.streams.clear();
489
490        // Emit timeout event
491        self.add_event(DomainEvent::SessionTimedOut {
492            session_id: self.id,
493            original_state: old_state,
494            timeout_duration: self.config.session_timeout_seconds,
495            timestamp: Utc::now(),
496        });
497
498        Ok(true)
499    }
500
501    /// Extend session timeout (if allowed)
502    pub fn extend_timeout(&mut self, additional_seconds: u64) -> DomainResult<()> {
503        if self.is_expired() {
504            return Err(DomainError::InvalidStateTransition(
505                "Cannot extend timeout for expired session".to_string(),
506            ));
507        }
508
509        self.expires_at += chrono::Duration::seconds(additional_seconds as i64);
510        self.update_timestamp();
511
512        self.add_event(DomainEvent::SessionTimeoutExtended {
513            session_id: self.id,
514            additional_seconds,
515            new_expires_at: self.expires_at,
516            timestamp: Utc::now(),
517        });
518
519        Ok(())
520    }
521
522    /// Set client information
523    pub fn set_client_info(
524        &mut self,
525        client_info: String,
526        user_agent: Option<String>,
527        ip_address: Option<String>,
528    ) {
529        self.client_info = Some(client_info);
530        self.user_agent = user_agent;
531        self.ip_address = ip_address;
532        self.update_timestamp();
533    }
534
535    /// Get pending domain events
536    pub fn pending_events(&self) -> &VecDeque<DomainEvent> {
537        &self.pending_events
538    }
539
540    /// Take all pending events (clears the queue)
541    pub fn take_events(&mut self) -> VecDeque<DomainEvent> {
542        std::mem::take(&mut self.pending_events)
543    }
544
545    /// Check session health
546    pub fn health_check(&self) -> SessionHealth {
547        let active_count = self.streams.values().filter(|s| s.is_active()).count();
548        let failed_count = self
549            .streams
550            .values()
551            .filter(|s| {
552                matches!(
553                    s.state(),
554                    crate::domain::entities::stream::StreamState::Failed
555                )
556            })
557            .count();
558
559        SessionHealth {
560            is_healthy: self.is_active() && failed_count == 0,
561            active_streams: active_count,
562            failed_streams: failed_count,
563            is_expired: self.is_expired(),
564            uptime_seconds: (Utc::now() - self.created_at).num_seconds(),
565        }
566    }
567
568    /// Private helper: Add domain event
569    fn add_event(&mut self, event: DomainEvent) {
570        self.pending_events.push_back(event);
571    }
572
573    /// Private helper: Update timestamp
574    fn update_timestamp(&mut self) {
575        self.updated_at = Utc::now();
576    }
577}
578
579/// Session health information
580#[derive(Debug, Clone, Serialize, Deserialize)]
581pub struct SessionHealth {
582    pub is_healthy: bool,
583    pub active_streams: usize,
584    pub failed_streams: usize,
585    pub is_expired: bool,
586    pub uptime_seconds: i64,
587}
588
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a session from `config` and activate it, asserting that the
    /// activation succeeds.
    fn activated(config: SessionConfig) -> StreamSession {
        let mut session = StreamSession::new(config);
        assert!(session.activate().is_ok());
        session
    }

    #[test]
    fn test_session_creation_and_activation() {
        let mut session = StreamSession::new(SessionConfig::default());

        // A fresh session starts out inactive.
        assert!(matches!(session.state(), SessionState::Initializing));
        assert!(!session.is_active());

        // Activation flips it to the active state.
        assert!(session.activate().is_ok());
        assert!(matches!(session.state(), SessionState::Active));
        assert!(session.is_active());
    }

    #[test]
    fn test_stream_management() {
        let mut session = activated(SessionConfig::default());

        let source_data = JsonData::Object(HashMap::from([(
            "test".to_string(),
            JsonData::String("data".to_string()),
        )]));

        // Creating a stream registers it and bumps the counters.
        let stream_id = session
            .create_stream(source_data)
            .expect("stream creation should succeed on an active session");
        assert_eq!(session.streams().len(), 1);
        assert_eq!(session.stats().total_streams, 1);
        assert_eq!(session.stats().active_streams, 1);

        // The stream can then be started...
        assert!(session.start_stream(stream_id).is_ok());

        // ...and completed, which moves it between counters.
        assert!(session.complete_stream(stream_id).is_ok());
        assert_eq!(session.stats().active_streams, 0);
        assert_eq!(session.stats().completed_streams, 1);
    }

    #[test]
    fn test_concurrent_stream_limit() {
        let mut session = activated(SessionConfig {
            max_concurrent_streams: 2,
            ..Default::default()
        });

        let source_data = JsonData::Object(HashMap::new());

        // Fill the session up to its limit.
        for _ in 0..2 {
            assert!(session.create_stream(source_data.clone()).is_ok());
        }

        // One more stream must be rejected.
        assert!(session.create_stream(source_data).is_err());
    }

    #[test]
    fn test_session_expiration() {
        let session = StreamSession::new(SessionConfig {
            session_timeout_seconds: 1,
            ..Default::default()
        });

        // A session with a one-second timeout is not expired right away.
        assert!(!session.is_expired());

        // Observing the actual expiry would need a sleep of more than a
        // second, so only check that the expiry lies after the creation time.
        assert!(session.expires_at > session.created_at);
    }

    #[test]
    fn test_domain_events() {
        let mut session = StreamSession::new(SessionConfig::default());

        // A state transition records an event.
        assert!(session.activate().is_ok());
        assert!(!session.pending_events().is_empty());

        let drained = session.take_events();
        assert_eq!(drained.len(), 1);

        // Taking the events leaves the queue empty.
        assert!(session.pending_events().is_empty());
    }

    #[test]
    fn test_session_health() {
        let session = activated(SessionConfig::default());

        let report = session.health_check();
        assert!(report.is_healthy);
        assert_eq!(report.active_streams, 0);
        assert_eq!(report.failed_streams, 0);
        assert!(!report.is_expired);
        assert!(report.uptime_seconds >= 0);
    }
}
691}