pjson_rs/application/services/
prioritization_service.rs1use crate::{application::ApplicationResult, domain::value_objects::Priority};
7
/// Snapshot of runtime performance measurements that the prioritization
/// strategies read when computing a priority.
#[derive(Debug, Clone)]
pub struct PerformanceContext {
    /// Average latency, in milliseconds.
    pub average_latency_ms: f64,
    /// Available bandwidth, in Mbps.
    pub available_bandwidth_mbps: f64,
    /// Error rate; the strategies in this file compare it against small
    /// fractional thresholds (for example `0.02` or `0.05`).
    pub error_rate: f64,
    /// CPU usage; compared against fractional thresholds such as `0.7`
    /// and `0.9` in this file.
    pub cpu_usage: f64,
    /// Memory usage, as a percentage.
    pub memory_usage_percent: f64,
    /// Number of connections.
    pub connection_count: usize,
}

impl Default for PerformanceContext {
    /// Returns the baseline context: 100 ms latency, 10 Mbps bandwidth,
    /// an error rate of 0.01, CPU usage of 0.5, 60% memory usage and a
    /// single connection.
    fn default() -> Self {
        PerformanceContext {
            connection_count: 1,
            memory_usage_percent: 60.0,
            cpu_usage: 0.5,
            error_rate: 0.01,
            available_bandwidth_mbps: 10.0,
            average_latency_ms: 100.0,
        }
    }
}
31
32#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
34pub enum PrioritizationStrategy {
35 Conservative,
37 Balanced,
39 Aggressive,
41 Custom(CustomPriorityRules),
43}
44
45#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
47pub struct CustomPriorityRules {
48 pub latency_threshold_ms: f64,
49 pub bandwidth_threshold_mbps: f64,
50 pub error_rate_threshold: f64,
51 pub priority_boost_on_error: u8,
52 pub priority_reduction_on_good_performance: u8,
53}
54
55impl Default for CustomPriorityRules {
56 fn default() -> Self {
57 Self {
58 latency_threshold_ms: 500.0,
59 bandwidth_threshold_mbps: 5.0,
60 error_rate_threshold: 0.03,
61 priority_boost_on_error: 20,
62 priority_reduction_on_good_performance: 10,
63 }
64 }
65}
66
67#[derive(Debug, Clone)]
69pub struct PriorityCalculationResult {
70 pub calculated_priority: Priority,
71 pub reasoning: Vec<String>,
72 pub confidence_score: f64,
73 pub strategy_used: PrioritizationStrategy,
74}
75
76#[derive(Debug)]
78pub struct PrioritizationService {
79 strategy: PrioritizationStrategy,
80}
81
82impl PrioritizationService {
83 pub fn new(strategy: PrioritizationStrategy) -> Self {
84 Self { strategy }
85 }
86
87 pub fn calculate_adaptive_priority(
89 &self,
90 context: &PerformanceContext,
91 ) -> ApplicationResult<PriorityCalculationResult> {
92 let mut reasoning = Vec::new();
93
94 let priority = match &self.strategy {
95 PrioritizationStrategy::Conservative => {
96 self.calculate_conservative_priority(context, &mut reasoning)?
97 }
98 PrioritizationStrategy::Balanced => {
99 self.calculate_balanced_priority(context, &mut reasoning)?
100 }
101 PrioritizationStrategy::Aggressive => {
102 self.calculate_aggressive_priority(context, &mut reasoning)?
103 }
104 PrioritizationStrategy::Custom(rules) => {
105 self.calculate_custom_priority(context, rules, &mut reasoning)?
106 }
107 };
108
109 let confidence = self.calculate_confidence_score(context);
111
112 Ok(PriorityCalculationResult {
113 calculated_priority: priority,
114 reasoning,
115 confidence_score: confidence,
116 strategy_used: self.strategy.clone(),
117 })
118 }
119
120 pub fn calculate_global_priority(
122 &self,
123 context: &PerformanceContext,
124 stream_count: usize,
125 ) -> ApplicationResult<PriorityCalculationResult> {
126 let mut base_result = self.calculate_adaptive_priority(context)?;
127
128 let stream_factor = match stream_count {
130 1..=3 => 0,
131 4..=10 => 10,
132 11..=50 => 20,
133 _ => 30,
134 };
135
136 base_result.calculated_priority =
137 base_result.calculated_priority.increase_by(stream_factor);
138 base_result.reasoning.push(format!(
139 "Increased priority by {stream_factor} for {stream_count} concurrent streams"
140 ));
141
142 Ok(base_result)
143 }
144
145 pub fn analyze_priority_adjustments(
147 &self,
148 metrics: &StreamingMetrics,
149 ) -> ApplicationResult<Vec<PriorityAdjustment>> {
150 let mut adjustments = Vec::new();
151
152 if let Some(adjustment) = self.analyze_latency_adjustment(metrics)? {
154 adjustments.push(adjustment);
155 }
156
157 if let Some(adjustment) = self.analyze_throughput_adjustment(metrics)? {
159 adjustments.push(adjustment);
160 }
161
162 if let Some(adjustment) = self.analyze_error_rate_adjustment(metrics)? {
164 adjustments.push(adjustment);
165 }
166
167 Ok(adjustments)
168 }
169
170 pub fn update_strategy(&mut self, new_strategy: PrioritizationStrategy) {
172 self.strategy = new_strategy;
173 }
174
175 fn calculate_conservative_priority(
178 &self,
179 context: &PerformanceContext,
180 reasoning: &mut Vec<String>,
181 ) -> ApplicationResult<Priority> {
182 let mut priority = Priority::HIGH; if context.error_rate > 0.02 {
186 priority = Priority::CRITICAL;
187 reasoning.push("Conservative: High error rate detected".to_string());
188 }
189
190 if context.average_latency_ms > 500.0 {
191 priority = Priority::CRITICAL;
192 reasoning.push("Conservative: High latency detected".to_string());
193 }
194
195 if context.cpu_usage > 0.7 {
196 priority = priority.increase_by(10);
197 reasoning.push("Conservative: High CPU usage".to_string());
198 }
199
200 Ok(priority)
201 }
202
203 fn calculate_balanced_priority(
204 &self,
205 context: &PerformanceContext,
206 reasoning: &mut Vec<String>,
207 ) -> ApplicationResult<Priority> {
208 let mut priority = Priority::MEDIUM; reasoning.push("Balanced: Starting with medium priority".to_string());
210
211 if context.average_latency_ms > 1000.0 {
213 priority = Priority::HIGH;
214 reasoning.push("Balanced: High latency - prioritizing critical data".to_string());
215 } else if context.average_latency_ms < 100.0 {
216 priority = Priority::LOW;
217 reasoning.push("Balanced: Low latency - can send more data".to_string());
218 }
219
220 if context.available_bandwidth_mbps < 1.0 {
222 priority = priority.increase_by(20);
223 reasoning.push("Balanced: Limited bandwidth - being more selective".to_string());
224 } else if context.available_bandwidth_mbps > 10.0 {
225 priority = priority.decrease_by(10);
226 reasoning.push("Balanced: Good bandwidth - can send more data".to_string());
227 }
228
229 if context.error_rate > 0.05 {
231 priority = priority.increase_by(30);
232 reasoning.push("Balanced: High error rate - much more selective".to_string());
233 }
234
235 Ok(priority)
236 }
237
238 fn calculate_aggressive_priority(
239 &self,
240 context: &PerformanceContext,
241 reasoning: &mut Vec<String>,
242 ) -> ApplicationResult<Priority> {
243 let mut priority = Priority::LOW; if context.error_rate > 0.1 {
247 priority = Priority::HIGH;
248 reasoning.push("Aggressive: Very high error rate - must prioritize".to_string());
249 } else if context.error_rate > 0.05 {
250 priority = Priority::MEDIUM;
251 reasoning.push("Aggressive: High error rate - moderate prioritization".to_string());
252 }
253
254 if context.average_latency_ms > 2000.0 {
255 priority = Priority::HIGH;
256 reasoning.push("Aggressive: Extremely high latency".to_string());
257 }
258
259 if context.available_bandwidth_mbps < 0.5 {
260 priority = priority.increase_by(40);
261 reasoning.push("Aggressive: Very limited bandwidth".to_string());
262 }
263
264 Ok(priority)
265 }
266
267 fn calculate_custom_priority(
268 &self,
269 context: &PerformanceContext,
270 rules: &CustomPriorityRules,
271 reasoning: &mut Vec<String>,
272 ) -> ApplicationResult<Priority> {
273 let mut priority = Priority::MEDIUM;
274
275 if context.average_latency_ms > rules.latency_threshold_ms {
276 priority = priority.increase_by(rules.priority_boost_on_error);
277 reasoning.push(format!(
278 "Custom: Latency {:.1}ms exceeds threshold {:.1}ms",
279 context.average_latency_ms, rules.latency_threshold_ms
280 ));
281 }
282
283 if context.available_bandwidth_mbps < rules.bandwidth_threshold_mbps {
284 priority = priority.increase_by(rules.priority_boost_on_error);
285 reasoning.push(format!(
286 "Custom: Bandwidth {:.1}Mbps below threshold {:.1}Mbps",
287 context.available_bandwidth_mbps, rules.bandwidth_threshold_mbps
288 ));
289 }
290
291 if context.error_rate > rules.error_rate_threshold {
292 priority = priority.increase_by(rules.priority_boost_on_error);
293 reasoning.push(format!(
294 "Custom: Error rate {:.3} exceeds threshold {:.3}",
295 context.error_rate, rules.error_rate_threshold
296 ));
297 }
298
299 if context.average_latency_ms < rules.latency_threshold_ms / 2.0
301 && context.available_bandwidth_mbps > rules.bandwidth_threshold_mbps * 2.0
302 && context.error_rate < rules.error_rate_threshold / 2.0
303 {
304 priority = priority.decrease_by(rules.priority_reduction_on_good_performance);
305 reasoning.push("Custom: Excellent performance - reducing priority".to_string());
306 }
307
308 Ok(priority)
309 }
310
311 fn calculate_confidence_score(&self, context: &PerformanceContext) -> f64 {
312 let mut confidence: f64 = 1.0;
313
314 if context.error_rate > 0.1 {
316 confidence *= 0.7; }
318
319 if context.cpu_usage > 0.9 {
320 confidence *= 0.8; }
322
323 if context.connection_count > 100 {
324 confidence *= 0.9; }
326
327 confidence.max(0.1) }
329
330 fn analyze_latency_adjustment(
331 &self,
332 metrics: &StreamingMetrics,
333 ) -> ApplicationResult<Option<PriorityAdjustment>> {
334 if metrics.average_latency_ms > 1500.0 && metrics.p99_latency_ms > 3000.0 {
335 Ok(Some(PriorityAdjustment {
336 new_threshold: Priority::CRITICAL,
337 reason: format!(
338 "Latency degradation: avg {:.1}ms, p99 {:.1}ms",
339 metrics.average_latency_ms, metrics.p99_latency_ms
340 ),
341 confidence: 0.9,
342 urgency: AdjustmentUrgency::High,
343 }))
344 } else if metrics.average_latency_ms < 100.0 && metrics.p99_latency_ms < 200.0 {
345 Ok(Some(PriorityAdjustment {
346 new_threshold: Priority::LOW,
347 reason: format!(
348 "Excellent latency: avg {:.1}ms, p99 {:.1}ms",
349 metrics.average_latency_ms, metrics.p99_latency_ms
350 ),
351 confidence: 0.8,
352 urgency: AdjustmentUrgency::Low,
353 }))
354 } else {
355 Ok(None)
356 }
357 }
358
359 fn analyze_throughput_adjustment(
360 &self,
361 metrics: &StreamingMetrics,
362 ) -> ApplicationResult<Option<PriorityAdjustment>> {
363 if metrics.throughput_mbps < 1.0 && metrics.error_rate < 0.02 {
364 Ok(Some(PriorityAdjustment {
366 new_threshold: Priority::LOW,
367 reason: format!(
368 "Low throughput {:.1}Mbps with good stability",
369 metrics.throughput_mbps
370 ),
371 confidence: 0.7,
372 urgency: AdjustmentUrgency::Medium,
373 }))
374 } else if metrics.throughput_mbps > 50.0 {
375 Ok(Some(PriorityAdjustment {
377 new_threshold: Priority::MEDIUM,
378 reason: format!(
379 "High throughput {:.1}Mbps allows selectivity",
380 metrics.throughput_mbps
381 ),
382 confidence: 0.8,
383 urgency: AdjustmentUrgency::Low,
384 }))
385 } else {
386 Ok(None)
387 }
388 }
389
390 fn analyze_error_rate_adjustment(
391 &self,
392 metrics: &StreamingMetrics,
393 ) -> ApplicationResult<Option<PriorityAdjustment>> {
394 if metrics.error_rate > 0.1 {
395 Ok(Some(PriorityAdjustment {
396 new_threshold: Priority::CRITICAL,
397 reason: format!("High error rate {:.1}%", metrics.error_rate * 100.0),
398 confidence: 0.95,
399 urgency: AdjustmentUrgency::Critical,
400 }))
401 } else if metrics.error_rate < 0.001 {
402 Ok(Some(PriorityAdjustment {
403 new_threshold: Priority::LOW,
404 reason: format!(
405 "Excellent stability {:.3}% errors",
406 metrics.error_rate * 100.0
407 ),
408 confidence: 0.8,
409 urgency: AdjustmentUrgency::Low,
410 }))
411 } else {
412 Ok(None)
413 }
414 }
415}
416
417impl Default for PrioritizationService {
418 fn default() -> Self {
419 Self::new(PrioritizationStrategy::Balanced)
420 }
421}
422
/// Streaming measurements consumed by the priority-adjustment analyses.
#[derive(Clone, Debug)]
pub struct StreamingMetrics {
    /// Average latency, in milliseconds.
    pub average_latency_ms: f64,
    /// p50 latency, in milliseconds.
    pub p50_latency_ms: f64,
    /// p95 latency, in milliseconds.
    pub p95_latency_ms: f64,
    /// p99 latency, in milliseconds.
    pub p99_latency_ms: f64,
    /// Throughput, in Mbps.
    pub throughput_mbps: f64,
    /// Error rate as a fraction; the analyses multiply it by 100 when
    /// rendering it as a percentage.
    pub error_rate: f64,
    /// Number of frames sent.
    pub frames_sent: u64,
    /// Number of bytes sent.
    pub bytes_sent: u64,
    /// Number of active connections.
    pub connections_active: usize,
}
438
439#[derive(Debug, Clone)]
441pub struct PriorityAdjustment {
442 pub new_threshold: Priority,
443 pub reason: String,
444 pub confidence: f64,
445 pub urgency: AdjustmentUrgency,
446}
447
448use crate::application::shared::AdjustmentUrgency;
450
#[cfg(test)]
mod tests {
    use super::*;

    /// High latency combined with an elevated error rate must yield
    /// `CRITICAL` under the conservative strategy, with reasoning recorded.
    #[test]
    fn test_conservative_strategy() {
        let degraded = PerformanceContext {
            average_latency_ms: 600.0,
            error_rate: 0.03,
            ..PerformanceContext::default()
        };

        let outcome = PrioritizationService::new(PrioritizationStrategy::Conservative)
            .calculate_adaptive_priority(&degraded)
            .unwrap();

        assert_eq!(outcome.calculated_priority, Priority::CRITICAL);
        assert!(!outcome.reasoning.is_empty());
    }

    /// The balanced strategy on the default context reports a positive
    /// confidence and at least one reasoning entry.
    #[test]
    fn test_balanced_strategy() {
        let baseline = PerformanceContext::default();

        let outcome = PrioritizationService::new(PrioritizationStrategy::Balanced)
            .calculate_adaptive_priority(&baseline)
            .unwrap();

        assert!(outcome.confidence_score > 0.0);
        assert!(!outcome.reasoning.is_empty());
    }

    /// With every measurement comfortably inside the custom limits, the
    /// priority must not exceed `MEDIUM`.
    #[test]
    fn test_custom_strategy() {
        let rules = CustomPriorityRules {
            latency_threshold_ms: 200.0,
            bandwidth_threshold_mbps: 5.0,
            error_rate_threshold: 0.02,
            priority_boost_on_error: 25,
            priority_reduction_on_good_performance: 15,
        };
        let healthy = PerformanceContext {
            average_latency_ms: 50.0,
            available_bandwidth_mbps: 15.0,
            error_rate: 0.005,
            ..PerformanceContext::default()
        };

        let outcome = PrioritizationService::new(PrioritizationStrategy::Custom(rules))
            .calculate_adaptive_priority(&healthy)
            .unwrap();

        assert!(outcome.calculated_priority <= Priority::MEDIUM);
    }
}