1use crate::{
4 application::{ApplicationResult, commands::*, handlers::CommandHandler},
5 domain::{
6 entities::Frame,
7 services::PriorityService,
8 value_objects::{Priority, SessionId, StreamId},
9 },
10};
11use std::sync::Arc;
12
13#[derive(Debug)]
15pub struct StreamingService<CH>
16where
17 CH: CommandHandler<GenerateFramesCommand, Vec<Frame>>
18 + CommandHandler<BatchGenerateFramesCommand, Vec<Frame>>
19 + CommandHandler<AdjustPriorityThresholdCommand, ()>,
20{
21 command_handler: Arc<CH>,
22 #[allow(dead_code)]
23 priority_service: Arc<PriorityService>,
24}
25
26impl<CH> StreamingService<CH>
27where
28 CH: CommandHandler<GenerateFramesCommand, Vec<Frame>>
29 + CommandHandler<BatchGenerateFramesCommand, Vec<Frame>>
30 + CommandHandler<AdjustPriorityThresholdCommand, ()>
31 + Send
32 + Sync,
33{
34 pub fn new(command_handler: Arc<CH>) -> Self {
35 Self {
36 command_handler,
37 priority_service: Arc::new(PriorityService::default()),
38 }
39 }
40
41 pub async fn generate_adaptive_frames(
43 &self,
44 session_id: SessionId,
45 stream_id: StreamId,
46 performance_context: &PerformanceContext,
47 ) -> ApplicationResult<AdaptiveFrameResult> {
48 let priority_threshold = self.calculate_adaptive_priority(performance_context);
50
51 let max_frames = self.calculate_optimal_batch_size(performance_context);
53
54 let command = GenerateFramesCommand {
56 session_id: session_id.into(),
57 stream_id: stream_id.into(),
58 priority_threshold: priority_threshold.into(),
59 max_frames,
60 };
61
62 let frames = self.command_handler.handle(command).await?;
63
64 Ok(AdaptiveFrameResult {
65 frames,
66 priority_threshold_used: priority_threshold,
67 batch_size_used: max_frames,
68 adaptation_reason: self.get_adaptation_reason(performance_context),
69 })
70 }
71
72 pub async fn generate_cross_stream_optimized_frames(
74 &self,
75 session_id: SessionId,
76 performance_context: &PerformanceContext,
77 ) -> ApplicationResult<CrossStreamFrameResult> {
78 let priority_threshold = self.calculate_global_priority_threshold(performance_context);
80
81 let max_frames = self.calculate_total_frame_budget(performance_context);
83
84 let command = BatchGenerateFramesCommand {
86 session_id: session_id.into(),
87 priority_threshold: priority_threshold.into(),
88 max_frames,
89 };
90
91 let frames = self.command_handler.handle(command).await?;
92
93 let frame_distribution = self.analyze_frame_distribution(&frames);
95
96 let optimization_metrics =
97 self.calculate_optimization_metrics(&frames, performance_context);
98
99 Ok(CrossStreamFrameResult {
100 frames,
101 priority_threshold_used: priority_threshold,
102 total_frames: max_frames,
103 frame_distribution,
104 optimization_metrics,
105 })
106 }
107
108 pub async fn auto_adjust_priorities(
110 &self,
111 session_id: SessionId,
112 streaming_metrics: &StreamingMetrics,
113 ) -> ApplicationResult<PriorityAdjustmentResult> {
114 let mut adjustments = Vec::new();
115
116 if let Some(adjustment) = self.analyze_latency_adjustment(streaming_metrics) {
118 let command = AdjustPriorityThresholdCommand {
119 session_id: session_id.into(),
120 new_threshold: adjustment.new_threshold.into(),
121 reason: adjustment.reason.clone(),
122 };
123
124 self.command_handler.handle(command).await?;
125 adjustments.push(adjustment);
126 }
127
128 if let Some(adjustment) = self.analyze_throughput_adjustment(streaming_metrics) {
130 let command = AdjustPriorityThresholdCommand {
131 session_id: session_id.into(),
132 new_threshold: adjustment.new_threshold.into(),
133 reason: adjustment.reason.clone(),
134 };
135
136 self.command_handler.handle(command).await?;
137 adjustments.push(adjustment);
138 }
139
140 if let Some(adjustment) = self.analyze_error_rate_adjustment(streaming_metrics) {
142 let command = AdjustPriorityThresholdCommand {
143 session_id: session_id.into(),
144 new_threshold: adjustment.new_threshold.into(),
145 reason: adjustment.reason.clone(),
146 };
147
148 self.command_handler.handle(command).await?;
149 adjustments.push(adjustment);
150 }
151
152 Ok(PriorityAdjustmentResult {
153 adjustments,
154 metrics_analyzed: streaming_metrics.clone(),
155 })
156 }
157
158 pub async fn optimize_for_use_case(
160 &self,
161 session_id: SessionId,
162 use_case: StreamingUseCase,
163 ) -> ApplicationResult<UseCaseOptimizationResult> {
164 let optimization_strategy = match use_case {
165 StreamingUseCase::RealTimeDashboard => {
166 OptimizationStrategy {
167 priority_threshold: Priority::HIGH,
168 max_frame_size: 16 * 1024, batch_size: 5,
170 description: "Optimized for real-time dashboard updates".to_string(),
171 }
172 }
173 StreamingUseCase::BulkDataTransfer => {
174 OptimizationStrategy {
175 priority_threshold: Priority::MEDIUM,
176 max_frame_size: 256 * 1024, batch_size: 20,
178 description: "Optimized for bulk data transfer efficiency".to_string(),
179 }
180 }
181 StreamingUseCase::MobileApp => {
182 OptimizationStrategy {
183 priority_threshold: Priority::HIGH,
184 max_frame_size: 8 * 1024, batch_size: 3,
186 description: "Optimized for mobile network constraints".to_string(),
187 }
188 }
189 StreamingUseCase::ProgressiveWebApp => {
190 OptimizationStrategy {
191 priority_threshold: Priority::CRITICAL,
192 max_frame_size: 32 * 1024, batch_size: 8,
194 description: "Optimized for progressive web app UX".to_string(),
195 }
196 }
197 };
198
199 let command = BatchGenerateFramesCommand {
201 session_id: session_id.into(),
202 priority_threshold: optimization_strategy.priority_threshold.into(),
203 max_frames: optimization_strategy.batch_size,
204 };
205
206 let frames = self.command_handler.handle(command).await?;
207
208 Ok(UseCaseOptimizationResult {
209 use_case,
210 strategy_applied: optimization_strategy,
211 frames_generated: frames,
212 })
213 }
214
215 fn calculate_adaptive_priority(&self, context: &PerformanceContext) -> Priority {
217 let mut priority = Priority::MEDIUM;
218
219 if context.average_latency_ms > 1000.0 {
221 priority = Priority::HIGH; } else if context.average_latency_ms < 100.0 {
223 priority = Priority::LOW; }
225
226 if context.available_bandwidth_mbps < 1.0 {
228 priority = priority.increase_by(20); } else if context.available_bandwidth_mbps > 10.0 {
230 priority = priority.decrease_by(10); }
232
233 if context.error_rate > 0.05 {
235 priority = priority.increase_by(30); }
237
238 priority
239 }
240
241 fn calculate_optimal_batch_size(&self, context: &PerformanceContext) -> usize {
243 let base_size = 10;
244
245 let latency_factor = if context.average_latency_ms < 50.0 {
247 0.5
248 } else if context.average_latency_ms > 500.0 {
249 2.0
250 } else {
251 1.0
252 };
253
254 let bandwidth_factor = (context.available_bandwidth_mbps / 5.0).clamp(0.2, 3.0);
256
257 let cpu_factor = if context.cpu_usage > 0.8 {
259 0.7 } else {
261 1.0
262 };
263
264 ((base_size as f64) * latency_factor * bandwidth_factor * cpu_factor) as usize
265 }
266
267 fn calculate_global_priority_threshold(&self, context: &PerformanceContext) -> Priority {
269 let individual_threshold = self.calculate_adaptive_priority(context);
271 individual_threshold.increase_by(10)
272 }
273
274 fn calculate_total_frame_budget(&self, context: &PerformanceContext) -> usize {
276 let individual_budget = self.calculate_optimal_batch_size(context);
277 (individual_budget as f64 * 1.5) as usize }
279
280 fn get_adaptation_reason(&self, context: &PerformanceContext) -> String {
282 let mut reasons = Vec::new();
283
284 if context.average_latency_ms > 1000.0 {
285 reasons.push("High latency detected".to_string());
286 }
287
288 if context.available_bandwidth_mbps < 1.0 {
289 reasons.push("Limited bandwidth".to_string());
290 }
291
292 if context.error_rate > 0.05 {
293 reasons.push("High error rate".to_string());
294 }
295
296 if context.cpu_usage > 0.8 {
297 reasons.push("High CPU usage".to_string());
298 }
299
300 if reasons.is_empty() {
301 "Optimal conditions".to_string()
302 } else {
303 reasons.join(", ")
304 }
305 }
306
307 fn analyze_frame_distribution(&self, frames: &[Frame]) -> FrameDistribution {
309 let mut critical = 0;
310 let mut high = 0;
311 let mut medium = 0;
312 let mut low = 0;
313 let mut background = 0;
314
315 for frame in frames {
316 match frame.priority() {
317 p if p >= Priority::CRITICAL => critical += 1,
318 p if p >= Priority::HIGH => high += 1,
319 p if p >= Priority::MEDIUM => medium += 1,
320 p if p >= Priority::LOW => low += 1,
321 _ => background += 1,
322 }
323 }
324
325 FrameDistribution {
326 critical,
327 high,
328 medium,
329 low,
330 background,
331 }
332 }
333
334 fn calculate_optimization_metrics(
336 &self,
337 frames: &[Frame],
338 context: &PerformanceContext,
339 ) -> OptimizationMetrics {
340 let total_size: usize = frames.iter().map(|f| f.estimated_size()).sum();
341 let average_priority: f64 = frames
342 .iter()
343 .map(|f| f.priority().value() as f64)
344 .sum::<f64>()
345 / frames.len() as f64;
346
347 let estimated_transfer_time =
348 total_size as f64 / (context.available_bandwidth_mbps * 125_000.0); OptimizationMetrics {
351 total_frames: frames.len(),
352 total_bytes: total_size,
353 average_priority,
354 estimated_transfer_time_seconds: estimated_transfer_time,
355 compression_ratio: 1.0, }
357 }
358
359 fn analyze_latency_adjustment(&self, metrics: &StreamingMetrics) -> Option<PriorityAdjustment> {
361 if metrics.average_latency_ms > 2000.0 {
362 Some(PriorityAdjustment {
363 new_threshold: Priority::CRITICAL,
364 reason: format!("Latency too high: {}ms", metrics.average_latency_ms),
365 impact: "Reducing data volume for latency".to_string(),
366 })
367 } else if metrics.average_latency_ms < 50.0 && metrics.throughput_mbps > 5.0 {
368 Some(PriorityAdjustment {
369 new_threshold: Priority::LOW,
370 reason: format!("Excellent latency: {}ms", metrics.average_latency_ms),
371 impact: "Increasing data volume for throughput".to_string(),
372 })
373 } else {
374 None
375 }
376 }
377
378 fn analyze_throughput_adjustment(
380 &self,
381 metrics: &StreamingMetrics,
382 ) -> Option<PriorityAdjustment> {
383 if metrics.throughput_mbps < 0.5 {
384 Some(PriorityAdjustment {
385 new_threshold: Priority::HIGH,
386 reason: format!("Low throughput: {:.2} Mbps", metrics.throughput_mbps),
387 impact: "Prioritizing critical data only".to_string(),
388 })
389 } else {
390 None
391 }
392 }
393
394 fn analyze_error_rate_adjustment(
396 &self,
397 metrics: &StreamingMetrics,
398 ) -> Option<PriorityAdjustment> {
399 if metrics.error_rate > 0.1 {
400 Some(PriorityAdjustment {
401 new_threshold: Priority::CRITICAL,
402 reason: format!("High error rate: {:.1}%", metrics.error_rate * 100.0),
403 impact: "Sending only most critical data".to_string(),
404 })
405 } else {
406 None
407 }
408 }
409}
410
/// Snapshot of the conditions the adaptive calculations react to.
///
/// Derives `PartialEq` so snapshots can be compared directly (for example in
/// tests or change detection).
#[derive(Debug, Clone, PartialEq)]
pub struct PerformanceContext {
    /// Average latency in milliseconds; the service compares it against
    /// 50, 100, 500 and 1000 ms.
    pub average_latency_ms: f64,
    /// Available bandwidth in megabits per second; the service compares it
    /// against 1 and 10 Mbps and scales batch sizes by it.
    pub available_bandwidth_mbps: f64,
    /// Error rate; the service treats values above 0.05 as high
    /// (presumably a fraction in `0.0..=1.0` — confirm with the producer).
    pub error_rate: f64,
    /// CPU usage; the service treats values above 0.8 as high
    /// (presumably a fraction in `0.0..=1.0` — confirm with the producer).
    pub cpu_usage: f64,
    /// Memory usage. Not read by any calculation in this file.
    pub memory_usage: f64,
}
420
/// Observed streaming metrics used by the automatic priority adjustment.
///
/// Derives `PartialEq` so metric snapshots can be compared directly.
#[derive(Debug, Clone, PartialEq)]
pub struct StreamingMetrics {
    /// Average latency in milliseconds; the latency rule reacts above
    /// 2000 ms and below 50 ms.
    pub average_latency_ms: f64,
    /// Throughput in megabits per second; the throughput rule reacts below
    /// 0.5 Mbps.
    pub throughput_mbps: f64,
    /// Error rate; the error rule reacts above 0.1 and multiplies the value
    /// by 100 when reporting it as a percentage.
    pub error_rate: f64,
    /// Frames per second. Not read by any rule in this file.
    pub frames_per_second: f64,
    /// Number of active streams. Not read by any rule in this file.
    pub active_streams: usize,
}
430
/// Use cases for which the service has a preset optimization strategy.
///
/// Derives `PartialEq`, `Eq` and `Hash` so a use case can be compared and
/// used as a map or set key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum StreamingUseCase {
    /// Real-time dashboard updates.
    RealTimeDashboard,
    /// Bulk data transfer.
    BulkDataTransfer,
    /// Mobile application on a constrained network.
    MobileApp,
    /// Progressive web application.
    ProgressiveWebApp,
}
439
440#[derive(Debug, Clone)]
442pub struct OptimizationStrategy {
443 pub priority_threshold: Priority,
444 pub max_frame_size: usize,
445 pub batch_size: usize,
446 pub description: String,
447}
448
449#[derive(Debug, Clone)]
451pub struct AdaptiveFrameResult {
452 pub frames: Vec<Frame>,
453 pub priority_threshold_used: Priority,
454 pub batch_size_used: usize,
455 pub adaptation_reason: String,
456}
457
458#[derive(Debug, Clone)]
460pub struct CrossStreamFrameResult {
461 pub frames: Vec<Frame>,
462 pub priority_threshold_used: Priority,
463 pub total_frames: usize,
464 pub frame_distribution: FrameDistribution,
465 pub optimization_metrics: OptimizationMetrics,
466}
467
/// Number of frames in each priority band.
///
/// Derives `Default` (all counters zero) plus `PartialEq`/`Eq` so
/// distributions can be created empty and compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FrameDistribution {
    /// Frames with priority at or above `Priority::CRITICAL`.
    pub critical: usize,
    /// Frames at or above `Priority::HIGH` but below `CRITICAL`.
    pub high: usize,
    /// Frames at or above `Priority::MEDIUM` but below `HIGH`.
    pub medium: usize,
    /// Frames at or above `Priority::LOW` but below `MEDIUM`.
    pub low: usize,
    /// Frames below `Priority::LOW`.
    pub background: usize,
}
477
/// Summary statistics for a generated batch of frames.
///
/// Derives `PartialEq` so metric values can be compared directly.
#[derive(Debug, Clone, PartialEq)]
pub struct OptimizationMetrics {
    /// Number of frames in the batch.
    pub total_frames: usize,
    /// Sum of the frames' estimated sizes.
    pub total_bytes: usize,
    /// Mean of the frames' numeric priority values.
    pub average_priority: f64,
    /// Estimated time, in seconds, to transfer `total_bytes` at the
    /// available bandwidth.
    pub estimated_transfer_time_seconds: f64,
    /// Compression ratio. The producer in this file always reports `1.0`.
    pub compression_ratio: f64,
}
487
488#[derive(Debug, Clone)]
490pub struct PriorityAdjustment {
491 pub new_threshold: Priority,
492 pub reason: String,
493 pub impact: String,
494}
495
496#[derive(Debug, Clone)]
498pub struct PriorityAdjustmentResult {
499 pub adjustments: Vec<PriorityAdjustment>,
500 pub metrics_analyzed: StreamingMetrics,
501}
502
503#[derive(Debug, Clone)]
505pub struct UseCaseOptimizationResult {
506 pub use_case: StreamingUseCase,
507 pub strategy_applied: OptimizationStrategy,
508 pub frames_generated: Vec<Frame>,
509}