pjson_rs/application/services/
prioritization_service.rs

1//! Service responsible for priority calculation and adaptation
2//!
3//! This service focuses solely on determining the optimal priority thresholds
4//! for streaming operations based on performance context and business rules.
5
6use crate::{application::ApplicationResult, domain::value_objects::Priority};
7
8/// Context information for priority calculations
9#[derive(Debug, Clone)]
10pub struct PerformanceContext {
11    pub average_latency_ms: f64,
12    pub available_bandwidth_mbps: f64,
13    pub error_rate: f64,
14    pub cpu_usage: f64,
15    pub memory_usage_percent: f64,
16    pub connection_count: usize,
17}
18
19impl Default for PerformanceContext {
20    fn default() -> Self {
21        Self {
22            average_latency_ms: 100.0,
23            available_bandwidth_mbps: 10.0,
24            error_rate: 0.01,
25            cpu_usage: 0.5,
26            memory_usage_percent: 60.0,
27            connection_count: 1,
28        }
29    }
30}
31
32/// Strategies for priority calculation
33#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
34pub enum PrioritizationStrategy {
35    /// Conservative - prioritize stability over performance
36    Conservative,
37    /// Balanced - balance between performance and stability
38    Balanced,
39    /// Aggressive - prioritize performance over stability
40    Aggressive,
41    /// Custom - use custom rules
42    Custom(CustomPriorityRules),
43}
44
45/// Custom priority calculation rules
46#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
47pub struct CustomPriorityRules {
48    pub latency_threshold_ms: f64,
49    pub bandwidth_threshold_mbps: f64,
50    pub error_rate_threshold: f64,
51    pub priority_boost_on_error: u8,
52    pub priority_reduction_on_good_performance: u8,
53}
54
55impl Default for CustomPriorityRules {
56    fn default() -> Self {
57        Self {
58            latency_threshold_ms: 500.0,
59            bandwidth_threshold_mbps: 5.0,
60            error_rate_threshold: 0.03,
61            priority_boost_on_error: 20,
62            priority_reduction_on_good_performance: 10,
63        }
64    }
65}
66
67/// Result of priority calculation
68#[derive(Debug, Clone)]
69pub struct PriorityCalculationResult {
70    pub calculated_priority: Priority,
71    pub reasoning: Vec<String>,
72    pub confidence_score: f64,
73    pub strategy_used: PrioritizationStrategy,
74}
75
76/// Service for calculating and adapting priorities
77#[derive(Debug)]
78pub struct PrioritizationService {
79    strategy: PrioritizationStrategy,
80}
81
82impl PrioritizationService {
83    pub fn new(strategy: PrioritizationStrategy) -> Self {
84        Self { strategy }
85    }
86
87    /// Calculate adaptive priority based on current performance context
88    pub fn calculate_adaptive_priority(
89        &self,
90        context: &PerformanceContext,
91    ) -> ApplicationResult<PriorityCalculationResult> {
92        let mut reasoning = Vec::new();
93
94        let priority = match &self.strategy {
95            PrioritizationStrategy::Conservative => {
96                self.calculate_conservative_priority(context, &mut reasoning)?
97            }
98            PrioritizationStrategy::Balanced => {
99                self.calculate_balanced_priority(context, &mut reasoning)?
100            }
101            PrioritizationStrategy::Aggressive => {
102                self.calculate_aggressive_priority(context, &mut reasoning)?
103            }
104            PrioritizationStrategy::Custom(rules) => {
105                self.calculate_custom_priority(context, rules, &mut reasoning)?
106            }
107        };
108
109        // Adjust confidence based on context stability
110        let confidence = self.calculate_confidence_score(context);
111
112        Ok(PriorityCalculationResult {
113            calculated_priority: priority,
114            reasoning,
115            confidence_score: confidence,
116            strategy_used: self.strategy.clone(),
117        })
118    }
119
120    /// Calculate priority for global multi-stream optimization
121    pub fn calculate_global_priority(
122        &self,
123        context: &PerformanceContext,
124        stream_count: usize,
125    ) -> ApplicationResult<PriorityCalculationResult> {
126        let mut base_result = self.calculate_adaptive_priority(context)?;
127
128        // Adjust for multi-stream scenarios
129        let stream_factor = match stream_count {
130            1..=3 => 0,
131            4..=10 => 10,
132            11..=50 => 20,
133            _ => 30,
134        };
135
136        base_result.calculated_priority =
137            base_result.calculated_priority.increase_by(stream_factor);
138        base_result.reasoning.push(format!(
139            "Increased priority by {stream_factor} for {stream_count} concurrent streams"
140        ));
141
142        Ok(base_result)
143    }
144
145    /// Calculate priority adjustments based on streaming metrics
146    pub fn analyze_priority_adjustments(
147        &self,
148        metrics: &StreamingMetrics,
149    ) -> ApplicationResult<Vec<PriorityAdjustment>> {
150        let mut adjustments = Vec::new();
151
152        // Analyze latency-based adjustments
153        if let Some(adjustment) = self.analyze_latency_adjustment(metrics)? {
154            adjustments.push(adjustment);
155        }
156
157        // Analyze throughput-based adjustments
158        if let Some(adjustment) = self.analyze_throughput_adjustment(metrics)? {
159            adjustments.push(adjustment);
160        }
161
162        // Analyze error-rate-based adjustments
163        if let Some(adjustment) = self.analyze_error_rate_adjustment(metrics)? {
164            adjustments.push(adjustment);
165        }
166
167        Ok(adjustments)
168    }
169
170    /// Update prioritization strategy dynamically
171    pub fn update_strategy(&mut self, new_strategy: PrioritizationStrategy) {
172        self.strategy = new_strategy;
173    }
174
175    // Private implementation methods
176
177    fn calculate_conservative_priority(
178        &self,
179        context: &PerformanceContext,
180        reasoning: &mut Vec<String>,
181    ) -> ApplicationResult<Priority> {
182        let mut priority = Priority::HIGH; // Start conservatively high
183
184        // Conservative approach: err on the side of higher priority
185        if context.error_rate > 0.02 {
186            priority = Priority::CRITICAL;
187            reasoning.push("Conservative: High error rate detected".to_string());
188        }
189
190        if context.average_latency_ms > 500.0 {
191            priority = Priority::CRITICAL;
192            reasoning.push("Conservative: High latency detected".to_string());
193        }
194
195        if context.cpu_usage > 0.7 {
196            priority = priority.increase_by(10);
197            reasoning.push("Conservative: High CPU usage".to_string());
198        }
199
200        Ok(priority)
201    }
202
203    fn calculate_balanced_priority(
204        &self,
205        context: &PerformanceContext,
206        reasoning: &mut Vec<String>,
207    ) -> ApplicationResult<Priority> {
208        let mut priority = Priority::MEDIUM; // Start balanced
209        reasoning.push("Balanced: Starting with medium priority".to_string());
210
211        // Latency adjustments
212        if context.average_latency_ms > 1000.0 {
213            priority = Priority::HIGH;
214            reasoning.push("Balanced: High latency - prioritizing critical data".to_string());
215        } else if context.average_latency_ms < 100.0 {
216            priority = Priority::LOW;
217            reasoning.push("Balanced: Low latency - can send more data".to_string());
218        }
219
220        // Bandwidth adjustments
221        if context.available_bandwidth_mbps < 1.0 {
222            priority = priority.increase_by(20);
223            reasoning.push("Balanced: Limited bandwidth - being more selective".to_string());
224        } else if context.available_bandwidth_mbps > 10.0 {
225            priority = priority.decrease_by(10);
226            reasoning.push("Balanced: Good bandwidth - can send more data".to_string());
227        }
228
229        // Error rate adjustments
230        if context.error_rate > 0.05 {
231            priority = priority.increase_by(30);
232            reasoning.push("Balanced: High error rate - much more selective".to_string());
233        }
234
235        Ok(priority)
236    }
237
238    fn calculate_aggressive_priority(
239        &self,
240        context: &PerformanceContext,
241        reasoning: &mut Vec<String>,
242    ) -> ApplicationResult<Priority> {
243        let mut priority = Priority::LOW; // Start aggressively low
244
245        // Aggressive approach: only increase priority when absolutely necessary
246        if context.error_rate > 0.1 {
247            priority = Priority::HIGH;
248            reasoning.push("Aggressive: Very high error rate - must prioritize".to_string());
249        } else if context.error_rate > 0.05 {
250            priority = Priority::MEDIUM;
251            reasoning.push("Aggressive: High error rate - moderate prioritization".to_string());
252        }
253
254        if context.average_latency_ms > 2000.0 {
255            priority = Priority::HIGH;
256            reasoning.push("Aggressive: Extremely high latency".to_string());
257        }
258
259        if context.available_bandwidth_mbps < 0.5 {
260            priority = priority.increase_by(40);
261            reasoning.push("Aggressive: Very limited bandwidth".to_string());
262        }
263
264        Ok(priority)
265    }
266
267    fn calculate_custom_priority(
268        &self,
269        context: &PerformanceContext,
270        rules: &CustomPriorityRules,
271        reasoning: &mut Vec<String>,
272    ) -> ApplicationResult<Priority> {
273        let mut priority = Priority::MEDIUM;
274
275        if context.average_latency_ms > rules.latency_threshold_ms {
276            priority = priority.increase_by(rules.priority_boost_on_error);
277            reasoning.push(format!(
278                "Custom: Latency {:.1}ms exceeds threshold {:.1}ms",
279                context.average_latency_ms, rules.latency_threshold_ms
280            ));
281        }
282
283        if context.available_bandwidth_mbps < rules.bandwidth_threshold_mbps {
284            priority = priority.increase_by(rules.priority_boost_on_error);
285            reasoning.push(format!(
286                "Custom: Bandwidth {:.1}Mbps below threshold {:.1}Mbps",
287                context.available_bandwidth_mbps, rules.bandwidth_threshold_mbps
288            ));
289        }
290
291        if context.error_rate > rules.error_rate_threshold {
292            priority = priority.increase_by(rules.priority_boost_on_error);
293            reasoning.push(format!(
294                "Custom: Error rate {:.3} exceeds threshold {:.3}",
295                context.error_rate, rules.error_rate_threshold
296            ));
297        }
298
299        // Apply reduction for good performance
300        if context.average_latency_ms < rules.latency_threshold_ms / 2.0
301            && context.available_bandwidth_mbps > rules.bandwidth_threshold_mbps * 2.0
302            && context.error_rate < rules.error_rate_threshold / 2.0
303        {
304            priority = priority.decrease_by(rules.priority_reduction_on_good_performance);
305            reasoning.push("Custom: Excellent performance - reducing priority".to_string());
306        }
307
308        Ok(priority)
309    }
310
311    fn calculate_confidence_score(&self, context: &PerformanceContext) -> f64 {
312        let mut confidence: f64 = 1.0;
313
314        // Reduce confidence based on volatility indicators
315        if context.error_rate > 0.1 {
316            confidence *= 0.7; // High error rate reduces confidence
317        }
318
319        if context.cpu_usage > 0.9 {
320            confidence *= 0.8; // High CPU usage may affect measurements
321        }
322
323        if context.connection_count > 100 {
324            confidence *= 0.9; // High load may affect accuracy
325        }
326
327        confidence.max(0.1) // Minimum confidence of 10%
328    }
329
330    fn analyze_latency_adjustment(
331        &self,
332        metrics: &StreamingMetrics,
333    ) -> ApplicationResult<Option<PriorityAdjustment>> {
334        if metrics.average_latency_ms > 1500.0 && metrics.p99_latency_ms > 3000.0 {
335            Ok(Some(PriorityAdjustment {
336                new_threshold: Priority::CRITICAL,
337                reason: format!(
338                    "Latency degradation: avg {:.1}ms, p99 {:.1}ms",
339                    metrics.average_latency_ms, metrics.p99_latency_ms
340                ),
341                confidence: 0.9,
342                urgency: AdjustmentUrgency::High,
343            }))
344        } else if metrics.average_latency_ms < 100.0 && metrics.p99_latency_ms < 200.0 {
345            Ok(Some(PriorityAdjustment {
346                new_threshold: Priority::LOW,
347                reason: format!(
348                    "Excellent latency: avg {:.1}ms, p99 {:.1}ms",
349                    metrics.average_latency_ms, metrics.p99_latency_ms
350                ),
351                confidence: 0.8,
352                urgency: AdjustmentUrgency::Low,
353            }))
354        } else {
355            Ok(None)
356        }
357    }
358
359    fn analyze_throughput_adjustment(
360        &self,
361        metrics: &StreamingMetrics,
362    ) -> ApplicationResult<Option<PriorityAdjustment>> {
363        if metrics.throughput_mbps < 1.0 && metrics.error_rate < 0.02 {
364            // Low throughput but no errors suggests we can be more aggressive
365            Ok(Some(PriorityAdjustment {
366                new_threshold: Priority::LOW,
367                reason: format!(
368                    "Low throughput {:.1}Mbps with good stability",
369                    metrics.throughput_mbps
370                ),
371                confidence: 0.7,
372                urgency: AdjustmentUrgency::Medium,
373            }))
374        } else if metrics.throughput_mbps > 50.0 {
375            // Very high throughput suggests we can be more selective
376            Ok(Some(PriorityAdjustment {
377                new_threshold: Priority::MEDIUM,
378                reason: format!(
379                    "High throughput {:.1}Mbps allows selectivity",
380                    metrics.throughput_mbps
381                ),
382                confidence: 0.8,
383                urgency: AdjustmentUrgency::Low,
384            }))
385        } else {
386            Ok(None)
387        }
388    }
389
390    fn analyze_error_rate_adjustment(
391        &self,
392        metrics: &StreamingMetrics,
393    ) -> ApplicationResult<Option<PriorityAdjustment>> {
394        if metrics.error_rate > 0.1 {
395            Ok(Some(PriorityAdjustment {
396                new_threshold: Priority::CRITICAL,
397                reason: format!("High error rate {:.1}%", metrics.error_rate * 100.0),
398                confidence: 0.95,
399                urgency: AdjustmentUrgency::Critical,
400            }))
401        } else if metrics.error_rate < 0.001 {
402            Ok(Some(PriorityAdjustment {
403                new_threshold: Priority::LOW,
404                reason: format!(
405                    "Excellent stability {:.3}% errors",
406                    metrics.error_rate * 100.0
407                ),
408                confidence: 0.8,
409                urgency: AdjustmentUrgency::Low,
410            }))
411        } else {
412            Ok(None)
413        }
414    }
415}
416
417impl Default for PrioritizationService {
418    fn default() -> Self {
419        Self::new(PrioritizationStrategy::Balanced)
420    }
421}
422
423// Supporting types
424
425/// Metrics for streaming performance analysis
426#[derive(Debug, Clone)]
427pub struct StreamingMetrics {
428    pub average_latency_ms: f64,
429    pub p50_latency_ms: f64,
430    pub p95_latency_ms: f64,
431    pub p99_latency_ms: f64,
432    pub throughput_mbps: f64,
433    pub error_rate: f64,
434    pub frames_sent: u64,
435    pub bytes_sent: u64,
436    pub connections_active: usize,
437}
438
439/// Recommended priority adjustment
440#[derive(Debug, Clone)]
441pub struct PriorityAdjustment {
442    pub new_threshold: Priority,
443    pub reason: String,
444    pub confidence: f64,
445    pub urgency: AdjustmentUrgency,
446}
447
448// Use shared AdjustmentUrgency type
449use crate::application::shared::AdjustmentUrgency;
450
451#[cfg(test)]
452mod tests {
453    use super::*;
454
455    #[test]
456    fn test_conservative_strategy() {
457        let service = PrioritizationService::new(PrioritizationStrategy::Conservative);
458        let context = PerformanceContext {
459            average_latency_ms: 600.0,
460            error_rate: 0.03,
461            ..Default::default()
462        };
463
464        let result = service.calculate_adaptive_priority(&context).unwrap();
465        assert_eq!(result.calculated_priority, Priority::CRITICAL);
466        assert!(!result.reasoning.is_empty());
467    }
468
469    #[test]
470    fn test_balanced_strategy() {
471        let service = PrioritizationService::new(PrioritizationStrategy::Balanced);
472        let context = PerformanceContext::default();
473
474        let result = service.calculate_adaptive_priority(&context).unwrap();
475        assert!(result.confidence_score > 0.0);
476        assert!(!result.reasoning.is_empty());
477    }
478
479    #[test]
480    fn test_custom_strategy() {
481        let custom_rules = CustomPriorityRules {
482            latency_threshold_ms: 200.0,
483            bandwidth_threshold_mbps: 5.0,
484            error_rate_threshold: 0.02,
485            priority_boost_on_error: 25,
486            priority_reduction_on_good_performance: 15,
487        };
488
489        let service = PrioritizationService::new(PrioritizationStrategy::Custom(custom_rules));
490        let context = PerformanceContext {
491            average_latency_ms: 50.0,       // Good
492            available_bandwidth_mbps: 15.0, // Good
493            error_rate: 0.005,              // Good
494            ..Default::default()
495        };
496
497        let result = service.calculate_adaptive_priority(&context).unwrap();
498        // Should reduce priority due to excellent performance
499        assert!(result.calculated_priority <= Priority::MEDIUM);
500    }
501}