pjson_rs/application/services/stream_context.rs

//! Stream context separation - Config vs Session state
//!
//! Separates static configuration from dynamic session state (runtime data),
//! following the separation-of-concerns principle and improving performance.
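//!
//! A minimal usage sketch (type names are the ones defined in this module;
//! the concrete values are illustrative only):
//!
//! ```ignore
//! let config = StreamConfig::default();        // static, shareable settings
//! let mut session = StreamSession::default();  // per-session runtime metrics
//! session.current_latency_ms = 250.0;          // updated as measurements arrive
//!
//! let ctx = StreamContext::new(&config, &session);
//! if ctx.is_threshold_exceeded() {
//!     // react, e.g. by recording an AdaptiveAdjustment on the session
//! }
//! ```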

use serde::{Deserialize, Serialize};

/// Static configuration for streaming operations.
///
/// This should be immutable after creation and can be shared across sessions.
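///
/// A minimal sketch of overriding selected defaults with struct-update syntax
/// (the chosen values and the use of `Arc` are assumptions, not requirements):
///
/// ```ignore
/// use std::sync::Arc;
///
/// let config = Arc::new(StreamConfig {
///     max_bandwidth_mbps: 250.0,
///     latency_threshold_ms: 200.0,
///     ..StreamConfig::default()
/// });
/// // The same Arc<StreamConfig> can back any number of StreamSession values.
/// ```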
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamConfig {
    /// Maximum bandwidth allocation in Mbps
    pub max_bandwidth_mbps: f64,

    /// Latency threshold for priority adjustment in milliseconds
    pub latency_threshold_ms: f64,

    /// Maximum acceptable error rate (0.0-1.0)
    pub max_error_rate: f64,

    /// CPU usage threshold for throttling (0.0-1.0)
    pub cpu_usage_threshold: f64,

    /// Memory usage threshold for throttling (0.0-100.0)
    pub memory_usage_threshold_percent: f64,

    /// Maximum concurrent connections per session
    pub max_connection_count: usize,

    /// Default priority strategy
    pub default_prioritization_strategy: PrioritizationStrategy,

    /// Enable adaptive priority adjustment
    pub enable_adaptive_priority: bool,

    /// Buffer size in kilobytes
    pub buffer_size_kb: usize,

    // Compression settings
    /// Enable compression
    pub enable_compression: bool,
    /// Compression level used when compression is enabled
    pub compression_level: u8,

    // Timeout configurations
    /// Session timeout in seconds
    pub session_timeout_seconds: u64,
    /// Per-frame timeout in milliseconds
    pub frame_timeout_ms: u64,
}

impl Default for StreamConfig {
    fn default() -> Self {
        Self {
            max_bandwidth_mbps: 100.0,
            latency_threshold_ms: 500.0,
            max_error_rate: 0.05,
            cpu_usage_threshold: 0.8,
            memory_usage_threshold_percent: 80.0,
            max_connection_count: 100,
            default_prioritization_strategy: PrioritizationStrategy::Balanced,
            enable_adaptive_priority: true,
            buffer_size_kb: 64,
            enable_compression: true,
            compression_level: 6,
            session_timeout_seconds: 3600, // 1 hour
            frame_timeout_ms: 30000,       // 30 seconds
        }
    }
}

/// Dynamic session state tracking runtime metrics and performance.
///
/// This data changes frequently during a session's lifetime.
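///
/// A minimal sketch of keeping the state current (which fields to update and
/// when is up to the caller; the values here are illustrative):
///
/// ```ignore
/// let mut session = StreamSession::default();
/// session.current_latency_ms = 180.0;
/// session.frames_processed += 1;
/// session.bytes_transferred += 4096;
/// session.last_metrics_update = std::time::Instant::now();
/// ```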
#[derive(Debug, Clone)]
pub struct StreamSession {
    /// Current average latency in milliseconds
    pub current_latency_ms: f64,

    /// Current bandwidth utilization in Mbps
    pub current_bandwidth_mbps: f64,

    /// Current error rate (0.0-1.0)
    pub current_error_rate: f64,

    /// Current CPU usage (0.0-1.0)
    pub current_cpu_usage: f64,

    /// Current memory usage percentage (0.0-100.0)
    pub current_memory_usage_percent: f64,

    /// Current active connection count
    pub active_connection_count: usize,

    /// Number of frames processed in this session
    pub frames_processed: u64,

    /// Total bytes transferred in this session
    pub bytes_transferred: u64,

    /// Session start time
    pub session_start_time: std::time::Instant,

    /// Last update time for metrics
    pub last_metrics_update: std::time::Instant,

    /// Current priority override (if any)
    pub priority_override: Option<crate::domain::value_objects::Priority>,

    /// Performance trend (improving/degrading/stable)
    pub performance_trend: PerformanceTrend,

    /// Adaptive adjustments made during this session
    pub adaptive_adjustments: Vec<AdaptiveAdjustment>,
}

impl Default for StreamSession {
    fn default() -> Self {
        let now = std::time::Instant::now();
        Self {
            current_latency_ms: 100.0,
            current_bandwidth_mbps: 10.0,
            current_error_rate: 0.01,
            current_cpu_usage: 0.5,
            current_memory_usage_percent: 60.0,
            active_connection_count: 1,
            frames_processed: 0,
            bytes_transferred: 0,
            session_start_time: now,
            last_metrics_update: now,
            priority_override: None,
            performance_trend: PerformanceTrend::Stable,
            adaptive_adjustments: Vec::new(),
        }
    }
}

/// Performance trend analysis
#[derive(Debug, Clone, PartialEq)]
pub enum PerformanceTrend {
    Improving,
    Stable,
    Degrading,
}

/// Record of an adaptive adjustment made during a session
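///
/// A minimal construction sketch (the reason text and impact figure are
/// placeholders, not calibrated values):
///
/// ```ignore
/// // `session` is a mutable StreamSession, as in the examples above.
/// let adjustment = AdaptiveAdjustment {
///     timestamp: std::time::Instant::now(),
///     adjustment_type: AdjustmentType::BufferSizeChange(128),
///     reason: "sustained latency above threshold".to_string(),
///     impact_estimate: 0.2,
/// };
/// session.adaptive_adjustments.push(adjustment);
/// ```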
#[derive(Debug, Clone)]
pub struct AdaptiveAdjustment {
    pub timestamp: std::time::Instant,
    pub adjustment_type: AdjustmentType,
    pub reason: String,
    pub impact_estimate: f64, // estimated improvement (0.0-1.0)
}

/// Kind of adjustment applied to a stream
#[derive(Debug, Clone)]
pub enum AdjustmentType {
    PriorityIncrease(u8),
    PriorityDecrease(u8),
    BufferSizeChange(usize),
    CompressionToggle(bool),
    ThrottleEnable,
    ThrottleDisable,
}

/// Re-export the prioritization strategy from the existing prioritization module
pub use super::prioritization_service::PrioritizationStrategy;

/// Combined context that references both config and session.
///
/// This provides a unified interface while keeping concerns separated.
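///
/// A minimal sketch of evaluating a session against its configuration (how to
/// react to an exceeded threshold is left to the caller):
///
/// ```ignore
/// let config = StreamConfig::default();
/// let session = StreamSession::default();
///
/// let ctx = StreamContext::new(&config, &session);
/// let score = ctx.performance_score(); // 0.0 = poor, 1.0 = excellent
/// if ctx.is_threshold_exceeded() || ctx.should_timeout() {
///     // degrade gracefully, shed load, or close the session
/// }
/// ```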
#[derive(Debug)]
pub struct StreamContext<'a> {
    pub config: &'a StreamConfig,
    pub session: &'a StreamSession,
}

impl<'a> StreamContext<'a> {
    pub fn new(config: &'a StreamConfig, session: &'a StreamSession) -> Self {
        Self { config, session }
    }

    /// Check whether any current metric exceeds its configured threshold
    pub fn is_threshold_exceeded(&self) -> bool {
        self.session.current_latency_ms > self.config.latency_threshold_ms
            || self.session.current_error_rate > self.config.max_error_rate
            || self.session.current_cpu_usage > self.config.cpu_usage_threshold
            || self.session.current_memory_usage_percent
                > self.config.memory_usage_threshold_percent
    }

    /// Calculate a performance score as the mean of four normalized sub-scores
    /// for latency, error rate, CPU, and memory (0.0 = poor, 1.0 = excellent)
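    ///
    /// A worked sketch using the `Default` values of both types
    /// (latency 100/500, error 0.01/0.05, CPU 0.5/0.8, memory 60/80):
    ///
    /// ```ignore
    /// let config = StreamConfig::default();
    /// let session = StreamSession::default();
    /// let ctx = StreamContext::new(&config, &session);
    /// // (0.8 + 0.8 + 0.375 + 0.25) / 4 = 0.55625
    /// assert!((ctx.performance_score() - 0.55625).abs() < 1e-9);
    /// ```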
    pub fn performance_score(&self) -> f64 {
        let latency_score = (self.config.latency_threshold_ms
            - self
                .session
                .current_latency_ms
                .min(self.config.latency_threshold_ms))
            / self.config.latency_threshold_ms;
        let error_score =
            1.0 - (self.session.current_error_rate / self.config.max_error_rate).min(1.0);
        let cpu_score =
            1.0 - (self.session.current_cpu_usage / self.config.cpu_usage_threshold).min(1.0);
        let memory_score = 1.0
            - (self.session.current_memory_usage_percent
                / self.config.memory_usage_threshold_percent)
                .min(1.0);

        (latency_score + error_score + cpu_score + memory_score) / 4.0
    }

    /// Get the session duration, measured from session start to the last metrics update
    pub fn session_duration(&self) -> std::time::Duration {
        self.session
            .last_metrics_update
            .duration_since(self.session.session_start_time)
    }

    /// Check whether the session has exceeded its configured timeout
    pub fn should_timeout(&self) -> bool {
        self.session_duration().as_secs() > self.config.session_timeout_seconds
    }
}

/// Utility functions for migrating from the legacy `PerformanceContext`
impl StreamSession {
    /// Create from a legacy `PerformanceContext`
    pub fn from_performance_context(
        ctx: &super::prioritization_service::PerformanceContext,
    ) -> Self {
        Self {
            current_latency_ms: ctx.average_latency_ms,
            current_bandwidth_mbps: ctx.available_bandwidth_mbps,
            current_error_rate: ctx.error_rate,
            current_cpu_usage: ctx.cpu_usage,
            current_memory_usage_percent: ctx.memory_usage_percent,
            active_connection_count: ctx.connection_count,
            ..Default::default()
        }
    }

    /// Convert to a legacy `PerformanceContext` for backward compatibility
    pub fn to_performance_context(&self) -> super::prioritization_service::PerformanceContext {
        super::prioritization_service::PerformanceContext {
            average_latency_ms: self.current_latency_ms,
            available_bandwidth_mbps: self.current_bandwidth_mbps,
            error_rate: self.current_error_rate,
            cpu_usage: self.current_cpu_usage,
            memory_usage_percent: self.current_memory_usage_percent,
            connection_count: self.active_connection_count,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_stream_config_default() {
        let config = StreamConfig::default();
        assert_eq!(config.max_bandwidth_mbps, 100.0);
        assert_eq!(config.latency_threshold_ms, 500.0);
        assert!(config.enable_adaptive_priority);
    }

    #[test]
    fn test_stream_session_default() {
        let session = StreamSession::default();
        assert_eq!(session.frames_processed, 0);
        assert_eq!(session.bytes_transferred, 0);
        assert_eq!(session.performance_trend, PerformanceTrend::Stable);
    }

    #[test]
    fn test_stream_context_threshold_check() {
        let config = StreamConfig::default();
        let mut session = StreamSession::default();

        // Normal operation - no thresholds exceeded
        let context = StreamContext::new(&config, &session);
        assert!(!context.is_threshold_exceeded());

        // Exceed latency threshold
        session.current_latency_ms = 600.0; // > 500.0 threshold
        let context = StreamContext::new(&config, &session);
        assert!(context.is_threshold_exceeded());
    }

    #[test]
    fn test_performance_score_calculation() {
        let config = StreamConfig::default();
        let session = StreamSession::default(); // Good defaults

        let context = StreamContext::new(&config, &session);
        let score = context.performance_score();

        // Should be a good score with default values
        assert!(score > 0.5);
        assert!(score <= 1.0);
    }

    #[test]
    fn test_migration_from_performance_context() {
        let perf_ctx = crate::application::services::prioritization_service::PerformanceContext {
            average_latency_ms: 200.0,
            available_bandwidth_mbps: 50.0,
            error_rate: 0.02,
            cpu_usage: 0.7,
            memory_usage_percent: 75.0,
            connection_count: 5,
        };

        let session = StreamSession::from_performance_context(&perf_ctx);

        assert_eq!(session.current_latency_ms, 200.0);
        assert_eq!(session.current_bandwidth_mbps, 50.0);
        assert_eq!(session.current_error_rate, 0.02);
        assert_eq!(session.current_cpu_usage, 0.7);
        assert_eq!(session.current_memory_usage_percent, 75.0);
        assert_eq!(session.active_connection_count, 5);

        // Test round-trip conversion
        let converted_back = session.to_performance_context();
        assert_eq!(
            converted_back.average_latency_ms,
            perf_ctx.average_latency_ms
        );
        assert_eq!(
            converted_back.available_bandwidth_mbps,
            perf_ctx.available_bandwidth_mbps
        );
    }

    #[test]
    fn test_timeout_detection() {
        let config = StreamConfig {
            session_timeout_seconds: 1, // 1 second timeout
            ..StreamConfig::default()
        };

        let mut session = StreamSession {
            session_start_time: std::time::Instant::now() - std::time::Duration::from_secs(2),
            ..StreamSession::default()
        };
        session.last_metrics_update = std::time::Instant::now();

        let context = StreamContext::new(&config, &session);
        assert!(context.should_timeout());
    }
}