pjson_rs/application/services/
stream_context.rs1use serde::{Deserialize, Serialize};
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct StreamConfig {
12 pub max_bandwidth_mbps: f64,
14
15 pub latency_threshold_ms: f64,
17
18 pub max_error_rate: f64,
20
21 pub cpu_usage_threshold: f64,
23
24 pub memory_usage_threshold_percent: f64,
26
27 pub max_connection_count: usize,
29
30 pub default_prioritization_strategy: PrioritizationStrategy,
32
33 pub enable_adaptive_priority: bool,
35
36 pub buffer_size_kb: usize,
38
39 pub enable_compression: bool,
41 pub compression_level: u8,
42
43 pub session_timeout_seconds: u64,
45 pub frame_timeout_ms: u64,
46}
47
48impl Default for StreamConfig {
49 fn default() -> Self {
50 Self {
51 max_bandwidth_mbps: 100.0,
52 latency_threshold_ms: 500.0,
53 max_error_rate: 0.05,
54 cpu_usage_threshold: 0.8,
55 memory_usage_threshold_percent: 80.0,
56 max_connection_count: 100,
57 default_prioritization_strategy: PrioritizationStrategy::Balanced,
58 enable_adaptive_priority: true,
59 buffer_size_kb: 64,
60 enable_compression: true,
61 compression_level: 6,
62 session_timeout_seconds: 3600, frame_timeout_ms: 30000, }
65 }
66}
67
68#[derive(Debug, Clone)]
71pub struct StreamSession {
72 pub current_latency_ms: f64,
74
75 pub current_bandwidth_mbps: f64,
77
78 pub current_error_rate: f64,
80
81 pub current_cpu_usage: f64,
83
84 pub current_memory_usage_percent: f64,
86
87 pub active_connection_count: usize,
89
90 pub frames_processed: u64,
92
93 pub bytes_transferred: u64,
95
96 pub session_start_time: std::time::Instant,
98
99 pub last_metrics_update: std::time::Instant,
101
102 pub priority_override: Option<crate::domain::value_objects::Priority>,
104
105 pub performance_trend: PerformanceTrend,
107
108 pub adaptive_adjustments: Vec<AdaptiveAdjustment>,
110}
111
112impl Default for StreamSession {
113 fn default() -> Self {
114 let now = std::time::Instant::now();
115 Self {
116 current_latency_ms: 100.0,
117 current_bandwidth_mbps: 10.0,
118 current_error_rate: 0.01,
119 current_cpu_usage: 0.5,
120 current_memory_usage_percent: 60.0,
121 active_connection_count: 1,
122 frames_processed: 0,
123 bytes_transferred: 0,
124 session_start_time: now,
125 last_metrics_update: now,
126 priority_override: None,
127 performance_trend: PerformanceTrend::Stable,
128 adaptive_adjustments: Vec::new(),
129 }
130 }
131}
132
133#[derive(Debug, Clone, PartialEq)]
135pub enum PerformanceTrend {
136 Improving,
137 Stable,
138 Degrading,
139}
140
141#[derive(Debug, Clone)]
143pub struct AdaptiveAdjustment {
144 pub timestamp: std::time::Instant,
145 pub adjustment_type: AdjustmentType,
146 pub reason: String,
147 pub impact_estimate: f64, }
149
150#[derive(Debug, Clone)]
151pub enum AdjustmentType {
152 PriorityIncrease(u8),
153 PriorityDecrease(u8),
154 BufferSizeChange(usize),
155 CompressionToggle(bool),
156 ThrottleEnable,
157 ThrottleDisable,
158}
159
160pub use super::prioritization_service::PrioritizationStrategy;
162
163#[derive(Debug)]
166pub struct StreamContext<'a> {
167 pub config: &'a StreamConfig,
168 pub session: &'a StreamSession,
169}
170
171impl<'a> StreamContext<'a> {
172 pub fn new(config: &'a StreamConfig, session: &'a StreamSession) -> Self {
173 Self { config, session }
174 }
175
176 pub fn is_threshold_exceeded(&self) -> bool {
178 self.session.current_latency_ms > self.config.latency_threshold_ms
179 || self.session.current_error_rate > self.config.max_error_rate
180 || self.session.current_cpu_usage > self.config.cpu_usage_threshold
181 || self.session.current_memory_usage_percent
182 > self.config.memory_usage_threshold_percent
183 }
184
185 pub fn performance_score(&self) -> f64 {
187 let latency_score = (self.config.latency_threshold_ms
188 - self
189 .session
190 .current_latency_ms
191 .min(self.config.latency_threshold_ms))
192 / self.config.latency_threshold_ms;
193 let error_score =
194 1.0 - (self.session.current_error_rate / self.config.max_error_rate).min(1.0);
195 let cpu_score =
196 1.0 - (self.session.current_cpu_usage / self.config.cpu_usage_threshold).min(1.0);
197 let memory_score = 1.0
198 - (self.session.current_memory_usage_percent
199 / self.config.memory_usage_threshold_percent)
200 .min(1.0);
201
202 (latency_score + error_score + cpu_score + memory_score) / 4.0
203 }
204
205 pub fn session_duration(&self) -> std::time::Duration {
207 self.session
208 .last_metrics_update
209 .duration_since(self.session.session_start_time)
210 }
211
212 pub fn should_timeout(&self) -> bool {
214 self.session_duration().as_secs() > self.config.session_timeout_seconds
215 }
216}
217
218impl StreamSession {
220 pub fn from_performance_context(
222 ctx: &super::prioritization_service::PerformanceContext,
223 ) -> Self {
224 Self {
225 current_latency_ms: ctx.average_latency_ms,
226 current_bandwidth_mbps: ctx.available_bandwidth_mbps,
227 current_error_rate: ctx.error_rate,
228 current_cpu_usage: ctx.cpu_usage,
229 current_memory_usage_percent: ctx.memory_usage_percent,
230 active_connection_count: ctx.connection_count,
231 ..Default::default()
232 }
233 }
234
235 pub fn to_performance_context(&self) -> super::prioritization_service::PerformanceContext {
237 super::prioritization_service::PerformanceContext {
238 average_latency_ms: self.current_latency_ms,
239 available_bandwidth_mbps: self.current_bandwidth_mbps,
240 error_rate: self.current_error_rate,
241 cpu_usage: self.current_cpu_usage,
242 memory_usage_percent: self.current_memory_usage_percent,
243 connection_count: self.active_connection_count,
244 }
245 }
246}
247
248#[cfg(test)]
249mod tests {
250 use super::*;
251
252 #[test]
253 fn test_stream_config_default() {
254 let config = StreamConfig::default();
255 assert_eq!(config.max_bandwidth_mbps, 100.0);
256 assert_eq!(config.latency_threshold_ms, 500.0);
257 assert!(config.enable_adaptive_priority);
258 }
259
260 #[test]
261 fn test_stream_session_default() {
262 let session = StreamSession::default();
263 assert_eq!(session.frames_processed, 0);
264 assert_eq!(session.bytes_transferred, 0);
265 assert_eq!(session.performance_trend, PerformanceTrend::Stable);
266 }
267
268 #[test]
269 fn test_stream_context_threshold_check() {
270 let config = StreamConfig::default();
271 let mut session = StreamSession::default();
272
273 let context = StreamContext::new(&config, &session);
275 assert!(!context.is_threshold_exceeded());
276
277 session.current_latency_ms = 600.0; let context = StreamContext::new(&config, &session);
280 assert!(context.is_threshold_exceeded());
281 }
282
283 #[test]
284 fn test_performance_score_calculation() {
285 let config = StreamConfig::default();
286 let session = StreamSession::default(); let context = StreamContext::new(&config, &session);
289 let score = context.performance_score();
290
291 assert!(score > 0.5);
293 assert!(score <= 1.0);
294 }
295
296 #[test]
297 fn test_migration_from_performance_context() {
298 let perf_ctx = crate::application::services::prioritization_service::PerformanceContext {
299 average_latency_ms: 200.0,
300 available_bandwidth_mbps: 50.0,
301 error_rate: 0.02,
302 cpu_usage: 0.7,
303 memory_usage_percent: 75.0,
304 connection_count: 5,
305 };
306
307 let session = StreamSession::from_performance_context(&perf_ctx);
308
309 assert_eq!(session.current_latency_ms, 200.0);
310 assert_eq!(session.current_bandwidth_mbps, 50.0);
311 assert_eq!(session.current_error_rate, 0.02);
312 assert_eq!(session.current_cpu_usage, 0.7);
313 assert_eq!(session.current_memory_usage_percent, 75.0);
314 assert_eq!(session.active_connection_count, 5);
315
316 let converted_back = session.to_performance_context();
318 assert_eq!(
319 converted_back.average_latency_ms,
320 perf_ctx.average_latency_ms
321 );
322 assert_eq!(
323 converted_back.available_bandwidth_mbps,
324 perf_ctx.available_bandwidth_mbps
325 );
326 }
327
328 #[test]
329 fn test_timeout_detection() {
330 let config = StreamConfig {
331 session_timeout_seconds: 1, ..StreamConfig::default()
333 };
334
335 let mut session = StreamSession {
336 session_start_time: std::time::Instant::now() - std::time::Duration::from_secs(2),
337 ..StreamSession::default()
338 };
339 session.last_metrics_update = std::time::Instant::now();
340
341 let context = StreamContext::new(&config, &session);
342 assert!(context.should_timeout());
343 }
344}