pjson_rs/application/services/
streaming_service.rs

1//! High-level streaming orchestration service
2
3use crate::{
4    application::{ApplicationResult, commands::*, handlers::CommandHandler},
5    domain::{
6        entities::Frame,
7        services::PriorityService,
8        value_objects::{Priority, SessionId, StreamId},
9    },
10};
11use std::sync::Arc;
12
13/// High-level service for streaming workflows and optimizations
14#[derive(Debug)]
15pub struct StreamingService<CH>
16where
17    CH: CommandHandler<GenerateFramesCommand, Vec<Frame>>
18        + CommandHandler<BatchGenerateFramesCommand, Vec<Frame>>
19        + CommandHandler<AdjustPriorityThresholdCommand, ()>,
20{
21    command_handler: Arc<CH>,
22    #[allow(dead_code)]
23    priority_service: Arc<PriorityService>,
24}
25
26impl<CH> StreamingService<CH>
27where
28    CH: CommandHandler<GenerateFramesCommand, Vec<Frame>>
29        + CommandHandler<BatchGenerateFramesCommand, Vec<Frame>>
30        + CommandHandler<AdjustPriorityThresholdCommand, ()>
31        + Send
32        + Sync,
33{
34    pub fn new(command_handler: Arc<CH>) -> Self {
35        Self {
36            command_handler,
37            priority_service: Arc::new(PriorityService::default()),
38        }
39    }
40
41    /// Generate optimized frames for a single stream based on adaptive priority
42    pub async fn generate_adaptive_frames(
43        &self,
44        session_id: SessionId,
45        stream_id: StreamId,
46        performance_context: &PerformanceContext,
47    ) -> ApplicationResult<AdaptiveFrameResult> {
48        // Calculate adaptive priority threshold based on performance
49        let priority_threshold = self.calculate_adaptive_priority(performance_context);
50
51        // Calculate optimal batch size based on network conditions
52        let max_frames = self.calculate_optimal_batch_size(performance_context);
53
54        // Generate frames
55        let command = GenerateFramesCommand {
56            session_id: session_id.into(),
57            stream_id: stream_id.into(),
58            priority_threshold: priority_threshold.into(),
59            max_frames,
60        };
61
62        let frames = self.command_handler.handle(command).await?;
63
64        Ok(AdaptiveFrameResult {
65            frames,
66            priority_threshold_used: priority_threshold,
67            batch_size_used: max_frames,
68            adaptation_reason: self.get_adaptation_reason(performance_context),
69        })
70    }
71
72    /// Generate priority-optimized frames across multiple streams
73    pub async fn generate_cross_stream_optimized_frames(
74        &self,
75        session_id: SessionId,
76        performance_context: &PerformanceContext,
77    ) -> ApplicationResult<CrossStreamFrameResult> {
78        // Calculate global priority threshold
79        let priority_threshold = self.calculate_global_priority_threshold(performance_context);
80
81        // Calculate total frame budget
82        let max_frames = self.calculate_total_frame_budget(performance_context);
83
84        // Generate cross-stream optimized batch
85        let command = BatchGenerateFramesCommand {
86            session_id: session_id.into(),
87            priority_threshold: priority_threshold.into(),
88            max_frames,
89        };
90
91        let frames = self.command_handler.handle(command).await?;
92
93        // Analyze the results
94        let frame_distribution = self.analyze_frame_distribution(&frames);
95
96        let optimization_metrics =
97            self.calculate_optimization_metrics(&frames, performance_context);
98
99        Ok(CrossStreamFrameResult {
100            frames,
101            priority_threshold_used: priority_threshold,
102            total_frames: max_frames,
103            frame_distribution,
104            optimization_metrics,
105        })
106    }
107
108    /// Automatically adjust priority thresholds based on streaming performance
109    pub async fn auto_adjust_priorities(
110        &self,
111        session_id: SessionId,
112        streaming_metrics: &StreamingMetrics,
113    ) -> ApplicationResult<PriorityAdjustmentResult> {
114        let mut adjustments = Vec::new();
115
116        // Analyze latency performance
117        if let Some(adjustment) = self.analyze_latency_adjustment(streaming_metrics) {
118            let command = AdjustPriorityThresholdCommand {
119                session_id: session_id.into(),
120                new_threshold: adjustment.new_threshold.into(),
121                reason: adjustment.reason.clone(),
122            };
123
124            self.command_handler.handle(command).await?;
125            adjustments.push(adjustment);
126        }
127
128        // Analyze throughput performance
129        if let Some(adjustment) = self.analyze_throughput_adjustment(streaming_metrics) {
130            let command = AdjustPriorityThresholdCommand {
131                session_id: session_id.into(),
132                new_threshold: adjustment.new_threshold.into(),
133                reason: adjustment.reason.clone(),
134            };
135
136            self.command_handler.handle(command).await?;
137            adjustments.push(adjustment);
138        }
139
140        // Analyze error rate performance
141        if let Some(adjustment) = self.analyze_error_rate_adjustment(streaming_metrics) {
142            let command = AdjustPriorityThresholdCommand {
143                session_id: session_id.into(),
144                new_threshold: adjustment.new_threshold.into(),
145                reason: adjustment.reason.clone(),
146            };
147
148            self.command_handler.handle(command).await?;
149            adjustments.push(adjustment);
150        }
151
152        Ok(PriorityAdjustmentResult {
153            adjustments,
154            metrics_analyzed: streaming_metrics.clone(),
155        })
156    }
157
158    /// Optimize streaming for specific use cases
159    pub async fn optimize_for_use_case(
160        &self,
161        session_id: SessionId,
162        use_case: StreamingUseCase,
163    ) -> ApplicationResult<UseCaseOptimizationResult> {
164        let optimization_strategy = match use_case {
165            StreamingUseCase::RealTimeDashboard => {
166                OptimizationStrategy {
167                    priority_threshold: Priority::HIGH,
168                    max_frame_size: 16 * 1024, // 16KB for low latency
169                    batch_size: 5,
170                    description: "Optimized for real-time dashboard updates".to_string(),
171                }
172            }
173            StreamingUseCase::BulkDataTransfer => {
174                OptimizationStrategy {
175                    priority_threshold: Priority::MEDIUM,
176                    max_frame_size: 256 * 1024, // 256KB for throughput
177                    batch_size: 20,
178                    description: "Optimized for bulk data transfer efficiency".to_string(),
179                }
180            }
181            StreamingUseCase::MobileApp => {
182                OptimizationStrategy {
183                    priority_threshold: Priority::HIGH,
184                    max_frame_size: 8 * 1024, // 8KB for mobile networks
185                    batch_size: 3,
186                    description: "Optimized for mobile network constraints".to_string(),
187                }
188            }
189            StreamingUseCase::ProgressiveWebApp => {
190                OptimizationStrategy {
191                    priority_threshold: Priority::CRITICAL,
192                    max_frame_size: 32 * 1024, // 32KB balanced
193                    batch_size: 8,
194                    description: "Optimized for progressive web app UX".to_string(),
195                }
196            }
197        };
198
199        // Apply optimization
200        let command = BatchGenerateFramesCommand {
201            session_id: session_id.into(),
202            priority_threshold: optimization_strategy.priority_threshold.into(),
203            max_frames: optimization_strategy.batch_size,
204        };
205
206        let frames = self.command_handler.handle(command).await?;
207
208        Ok(UseCaseOptimizationResult {
209            use_case,
210            strategy_applied: optimization_strategy,
211            frames_generated: frames,
212        })
213    }
214
215    /// Private: Calculate adaptive priority based on performance
216    fn calculate_adaptive_priority(&self, context: &PerformanceContext) -> Priority {
217        let mut priority = Priority::MEDIUM;
218
219        // Adjust based on latency
220        if context.average_latency_ms > 1000.0 {
221            priority = Priority::HIGH; // Only send high priority in high latency
222        } else if context.average_latency_ms < 100.0 {
223            priority = Priority::LOW; // Can afford to send more data
224        }
225
226        // Adjust based on bandwidth
227        if context.available_bandwidth_mbps < 1.0 {
228            priority = priority.increase_by(20); // Prioritize more aggressively
229        } else if context.available_bandwidth_mbps > 10.0 {
230            priority = priority.decrease_by(10); // Can send more data
231        }
232
233        // Adjust based on error rate
234        if context.error_rate > 0.05 {
235            priority = priority.increase_by(30); // Much more selective
236        }
237
238        priority
239    }
240
241    /// Private: Calculate optimal batch size
242    fn calculate_optimal_batch_size(&self, context: &PerformanceContext) -> usize {
243        let base_size = 10;
244
245        // Adjust based on latency (lower latency = smaller batches for responsiveness)
246        let latency_factor = if context.average_latency_ms < 50.0 {
247            0.5
248        } else if context.average_latency_ms > 500.0 {
249            2.0
250        } else {
251            1.0
252        };
253
254        // Adjust based on bandwidth
255        let bandwidth_factor = (context.available_bandwidth_mbps / 5.0).clamp(0.2, 3.0);
256
257        // Adjust based on CPU usage
258        let cpu_factor = if context.cpu_usage > 0.8 {
259            0.7 // Reduce batch size when CPU is high
260        } else {
261            1.0
262        };
263
264        ((base_size as f64) * latency_factor * bandwidth_factor * cpu_factor) as usize
265    }
266
267    /// Private: Calculate global priority threshold for multi-stream optimization
268    fn calculate_global_priority_threshold(&self, context: &PerformanceContext) -> Priority {
269        // More aggressive prioritization for global optimization
270        let individual_threshold = self.calculate_adaptive_priority(context);
271        individual_threshold.increase_by(10)
272    }
273
274    /// Private: Calculate total frame budget
275    fn calculate_total_frame_budget(&self, context: &PerformanceContext) -> usize {
276        let individual_budget = self.calculate_optimal_batch_size(context);
277        (individual_budget as f64 * 1.5) as usize // 50% more for multi-stream
278    }
279
280    /// Private: Get adaptation reason description
281    fn get_adaptation_reason(&self, context: &PerformanceContext) -> String {
282        let mut reasons = Vec::new();
283
284        if context.average_latency_ms > 1000.0 {
285            reasons.push("High latency detected".to_string());
286        }
287
288        if context.available_bandwidth_mbps < 1.0 {
289            reasons.push("Limited bandwidth".to_string());
290        }
291
292        if context.error_rate > 0.05 {
293            reasons.push("High error rate".to_string());
294        }
295
296        if context.cpu_usage > 0.8 {
297            reasons.push("High CPU usage".to_string());
298        }
299
300        if reasons.is_empty() {
301            "Optimal conditions".to_string()
302        } else {
303            reasons.join(", ")
304        }
305    }
306
307    /// Private: Analyze frame distribution
308    fn analyze_frame_distribution(&self, frames: &[Frame]) -> FrameDistribution {
309        let mut critical = 0;
310        let mut high = 0;
311        let mut medium = 0;
312        let mut low = 0;
313        let mut background = 0;
314
315        for frame in frames {
316            match frame.priority() {
317                p if p >= Priority::CRITICAL => critical += 1,
318                p if p >= Priority::HIGH => high += 1,
319                p if p >= Priority::MEDIUM => medium += 1,
320                p if p >= Priority::LOW => low += 1,
321                _ => background += 1,
322            }
323        }
324
325        FrameDistribution {
326            critical,
327            high,
328            medium,
329            low,
330            background,
331        }
332    }
333
334    /// Private: Calculate optimization metrics
335    fn calculate_optimization_metrics(
336        &self,
337        frames: &[Frame],
338        context: &PerformanceContext,
339    ) -> OptimizationMetrics {
340        let total_size: usize = frames.iter().map(|f| f.estimated_size()).sum();
341        let average_priority: f64 = frames
342            .iter()
343            .map(|f| f.priority().value() as f64)
344            .sum::<f64>()
345            / frames.len() as f64;
346
347        let estimated_transfer_time =
348            total_size as f64 / (context.available_bandwidth_mbps * 125_000.0); // Convert to bytes/sec
349
350        OptimizationMetrics {
351            total_frames: frames.len(),
352            total_bytes: total_size,
353            average_priority,
354            estimated_transfer_time_seconds: estimated_transfer_time,
355            compression_ratio: 1.0, // Would calculate actual compression
356        }
357    }
358
359    /// Private: Analyze latency-based adjustments
360    fn analyze_latency_adjustment(&self, metrics: &StreamingMetrics) -> Option<PriorityAdjustment> {
361        if metrics.average_latency_ms > 2000.0 {
362            Some(PriorityAdjustment {
363                new_threshold: Priority::CRITICAL,
364                reason: format!("Latency too high: {}ms", metrics.average_latency_ms),
365                impact: "Reducing data volume for latency".to_string(),
366            })
367        } else if metrics.average_latency_ms < 50.0 && metrics.throughput_mbps > 5.0 {
368            Some(PriorityAdjustment {
369                new_threshold: Priority::LOW,
370                reason: format!("Excellent latency: {}ms", metrics.average_latency_ms),
371                impact: "Increasing data volume for throughput".to_string(),
372            })
373        } else {
374            None
375        }
376    }
377
378    /// Private: Analyze throughput-based adjustments
379    fn analyze_throughput_adjustment(
380        &self,
381        metrics: &StreamingMetrics,
382    ) -> Option<PriorityAdjustment> {
383        if metrics.throughput_mbps < 0.5 {
384            Some(PriorityAdjustment {
385                new_threshold: Priority::HIGH,
386                reason: format!("Low throughput: {:.2} Mbps", metrics.throughput_mbps),
387                impact: "Prioritizing critical data only".to_string(),
388            })
389        } else {
390            None
391        }
392    }
393
394    /// Private: Analyze error rate adjustments
395    fn analyze_error_rate_adjustment(
396        &self,
397        metrics: &StreamingMetrics,
398    ) -> Option<PriorityAdjustment> {
399        if metrics.error_rate > 0.1 {
400            Some(PriorityAdjustment {
401                new_threshold: Priority::CRITICAL,
402                reason: format!("High error rate: {:.1}%", metrics.error_rate * 100.0),
403                impact: "Sending only most critical data".to_string(),
404            })
405        } else {
406            None
407        }
408    }
409}
410
/// Performance context for adaptive streaming
///
/// Snapshot of runtime conditions that `StreamingService` uses to choose
/// priority thresholds and frame batch sizes.
#[derive(Debug, Clone)]
pub struct PerformanceContext {
    /// Mean observed latency in milliseconds (compared against 50/100/500/1000ms thresholds).
    pub average_latency_ms: f64,
    /// Available bandwidth in megabits per second (1 Mbps = 125,000 bytes/sec in transfer-time math).
    pub available_bandwidth_mbps: f64,
    /// Error rate as a fraction in [0, 1] (0.05 == 5%).
    pub error_rate: f64,
    /// CPU utilization as a fraction in [0, 1] (0.8 == 80%).
    pub cpu_usage: f64,
    /// Memory utilization; presumably a fraction like `cpu_usage`, but not
    /// read by the visible code — TODO confirm against callers.
    pub memory_usage: f64,
}
420
/// Streaming metrics for analysis
///
/// Observed streaming performance, fed to `auto_adjust_priorities` to decide
/// whether priority thresholds should be tightened or relaxed.
#[derive(Debug, Clone)]
pub struct StreamingMetrics {
    /// Mean observed latency in milliseconds.
    pub average_latency_ms: f64,
    /// Achieved throughput in megabits per second.
    pub throughput_mbps: f64,
    /// Error rate as a fraction in [0, 1] (0.1 == 10%).
    pub error_rate: f64,
    /// Frame delivery rate; not read by the visible analyzers — TODO confirm usage.
    pub frames_per_second: f64,
    /// Number of concurrently active streams; not read by the visible analyzers.
    pub active_streams: usize,
}
430
/// Streaming use cases for optimization
///
/// Each variant selects a preset `OptimizationStrategy` in
/// `StreamingService::optimize_for_use_case`. A fieldless enum, so `Copy`,
/// `PartialEq`, `Eq`, and `Hash` are derived (they are free and let callers
/// compare/store use cases without cloning).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StreamingUseCase {
    /// Low-latency, small-frame preset for live dashboards.
    RealTimeDashboard,
    /// High-throughput, large-frame preset for bulk transfers.
    BulkDataTransfer,
    /// Small frames and tiny batches for constrained mobile networks.
    MobileApp,
    /// Balanced preset for progressive web apps.
    ProgressiveWebApp,
}
439
/// Optimization strategy configuration
///
/// Preset parameters applied by `StreamingService::optimize_for_use_case`.
#[derive(Debug, Clone)]
pub struct OptimizationStrategy {
    /// Minimum priority a frame must meet to be generated.
    pub priority_threshold: Priority,
    /// Maximum frame size in bytes. NOTE(review): not consumed by the visible
    /// batch command — presumably enforced downstream; confirm.
    pub max_frame_size: usize,
    /// Number of frames requested per batch (`max_frames` on the command).
    pub batch_size: usize,
    /// Human-readable description of the strategy's intent.
    pub description: String,
}
448
/// Result of adaptive frame generation
///
/// Returned by `StreamingService::generate_adaptive_frames`.
#[derive(Debug, Clone)]
pub struct AdaptiveFrameResult {
    /// Frames produced by the command handler.
    pub frames: Vec<Frame>,
    /// Priority threshold that was passed to the generate command.
    pub priority_threshold_used: Priority,
    /// Batch size (`max_frames`) that was requested — may exceed `frames.len()`.
    pub batch_size_used: usize,
    /// Human-readable explanation of why these parameters were chosen
    /// (e.g. "High latency detected, Limited bandwidth" or "Optimal conditions").
    pub adaptation_reason: String,
}
457
/// Result of cross-stream frame generation
///
/// Returned by `StreamingService::generate_cross_stream_optimized_frames`.
#[derive(Debug, Clone)]
pub struct CrossStreamFrameResult {
    /// Frames produced across all streams in the session.
    pub frames: Vec<Frame>,
    /// Global priority threshold that was passed to the batch command.
    pub priority_threshold_used: Priority,
    /// NOTE(review): this holds the *requested* frame budget (`max_frames`),
    /// not `frames.len()` — the name is misleading; confirm intent.
    pub total_frames: usize,
    /// Per-priority-band counts of the produced frames.
    pub frame_distribution: FrameDistribution,
    /// Size/priority/transfer-time summary of the produced batch.
    pub optimization_metrics: OptimizationMetrics,
}
467
/// Frame distribution by priority
///
/// Per-band frame counts produced by `analyze_frame_distribution`; each frame
/// is counted in the highest band whose lower bound its priority meets.
/// All-zero via `Default`; `PartialEq`/`Eq` derived so distributions can be
/// compared directly (all fields are `usize`, so the derives are free).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FrameDistribution {
    /// Frames with priority >= CRITICAL.
    pub critical: usize,
    /// Frames with priority in [HIGH, CRITICAL).
    pub high: usize,
    /// Frames with priority in [MEDIUM, HIGH).
    pub medium: usize,
    /// Frames with priority in [LOW, MEDIUM).
    pub low: usize,
    /// Frames below LOW priority.
    pub background: usize,
}
477
/// Optimization effectiveness metrics
///
/// Batch summary produced by `calculate_optimization_metrics`. `PartialEq`
/// is derived so metrics can be compared in tests and assertions (all fields
/// are `usize`/`f64`; `Eq` is intentionally omitted because of the floats).
#[derive(Debug, Clone, PartialEq)]
pub struct OptimizationMetrics {
    /// Number of frames in the batch.
    pub total_frames: usize,
    /// Sum of the frames' estimated sizes, in bytes.
    pub total_bytes: usize,
    /// Mean numeric priority value across the batch.
    pub average_priority: f64,
    /// Projected transfer time in seconds at the observed bandwidth
    /// (1 Mbps treated as 125,000 bytes/sec).
    pub estimated_transfer_time_seconds: f64,
    /// Compression ratio; currently always 1.0 (actual compression not yet measured).
    pub compression_ratio: f64,
}
487
/// Priority adjustment recommendation
///
/// Produced by the `analyze_*_adjustment` helpers and applied via
/// `AdjustPriorityThresholdCommand` in `auto_adjust_priorities`.
#[derive(Debug, Clone)]
pub struct PriorityAdjustment {
    /// Threshold the session should switch to.
    pub new_threshold: Priority,
    /// Human-readable trigger (e.g. "Latency too high: 2500ms").
    pub reason: String,
    /// Human-readable description of the expected effect.
    pub impact: String,
}
495
/// Result of priority adjustment analysis
///
/// Returned by `StreamingService::auto_adjust_priorities`.
#[derive(Debug, Clone)]
pub struct PriorityAdjustmentResult {
    /// Adjustments that were recommended and submitted (zero to three:
    /// latency, throughput, and error-rate analyzers each contribute at most one).
    pub adjustments: Vec<PriorityAdjustment>,
    /// Copy of the metrics the analysis was based on.
    pub metrics_analyzed: StreamingMetrics,
}
502
/// Result of use case optimization
///
/// Returned by `StreamingService::optimize_for_use_case`.
#[derive(Debug, Clone)]
pub struct UseCaseOptimizationResult {
    /// The use case the optimization was requested for.
    pub use_case: StreamingUseCase,
    /// The preset strategy that was applied.
    pub strategy_applied: OptimizationStrategy,
    /// Frames produced under that strategy.
    pub frames_generated: Vec<Frame>,
}