//! Quality-of-service (QoS) request scheduling: priority queues, SLA latency
//! tracking, and resource-pressure-aware throttling.

use crate::degradation::ResourcePressure;
use std::{
    collections::{HashMap, VecDeque},
    sync::{Arc, Mutex},
    time::Instant,
};

/// Request priority levels, ordered from `Low` (lowest) to `Critical` (highest).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Priority {
    /// Must be served as soon as possible.
    Critical = 4,
    /// Served before normal traffic.
    High = 3,
    /// Default priority for ordinary requests.
    Normal = 2,
    /// Background or best-effort traffic.
    Low = 1,
}

impl Default for Priority {
    #[inline]
    fn default() -> Self {
        Self::Normal
    }
}

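/// Configuration for the QoS scheduler: queue capacity, per-priority bandwidth
/// weights, scheduling mode, and per-priority SLA latency targets.
///
/// A minimal sketch of overriding selected defaults via struct-update syntax
/// (field names as defined on the struct below):
///
/// ```ignore
/// let config = QosConfig {
///     max_queue_size: 500,
///     strict_priority: false,
///     ..QosConfig::default()
/// };
/// ```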
#[derive(Debug, Clone)]
pub struct QosConfig {
    /// Maximum number of queued requests per priority class.
    pub max_queue_size: usize,
    /// Relative bandwidth weight per priority, used by fair scheduling.
    pub bandwidth_allocation: HashMap<Priority, u32>,
    /// If `true`, always serve the highest non-empty priority first;
    /// otherwise use weighted fair scheduling.
    pub strict_priority: bool,
    /// Scheduling time slice in milliseconds.
    pub time_slice_ms: u64,
    /// Target queueing latency (SLA) per priority, in milliseconds.
    pub sla_target_latency_ms: HashMap<Priority, u64>,
}

impl Default for QosConfig {
    #[inline]
    fn default() -> Self {
        let mut bandwidth_allocation = HashMap::new();
        bandwidth_allocation.insert(Priority::Critical, 40);
        bandwidth_allocation.insert(Priority::High, 30);
        bandwidth_allocation.insert(Priority::Normal, 20);
        bandwidth_allocation.insert(Priority::Low, 10);

        let mut sla_target_latency_ms = HashMap::new();
        sla_target_latency_ms.insert(Priority::Critical, 100);
        sla_target_latency_ms.insert(Priority::High, 500);
        sla_target_latency_ms.insert(Priority::Normal, 2000);
        sla_target_latency_ms.insert(Priority::Low, 10000);

        Self {
            max_queue_size: 1000,
            bandwidth_allocation,
            strict_priority: true,
            time_slice_ms: 100,
            sla_target_latency_ms,
        }
    }
}

/// Metadata for a request waiting to be scheduled.
#[derive(Debug, Clone)]
pub struct RequestInfo {
    /// Unique request identifier.
    pub id: String,
    /// Content identifier (CID) the request refers to.
    pub cid: String,
    /// Payload size in bytes.
    pub size_bytes: u64,
    /// Scheduling priority.
    pub priority: Priority,
    /// Optional absolute deadline in milliseconds, on the same clock as
    /// `crate::utils::current_timestamp_ms`.
    pub deadline_ms: Option<i64>,
}

#[derive(Debug, Clone)]
struct QueueEntry {
    info: RequestInfo,
    enqueued_at: Instant,
}

/// Per-priority SLA statistics, updated on every dequeue.
#[derive(Debug, Clone, Default)]
pub struct SlaMetrics {
    /// Total requests dequeued.
    pub total_requests: u64,
    /// Requests whose queue time met the SLA target.
    pub met_sla: u64,
    /// Requests whose queue time exceeded the SLA target.
    pub violated_sla: u64,
    /// Running average queue time, in milliseconds.
    pub avg_queue_time_ms: u64,
    /// Maximum observed queue time, in milliseconds.
    pub max_queue_time_ms: u64,
    /// Total bytes across all dequeued requests.
    pub total_bytes: u64,
}

impl SlaMetrics {
    /// Fraction of requests that met their SLA target (1.0 when no requests
    /// have been recorded yet).
    #[must_use]
    #[inline]
    pub fn compliance_rate(&self) -> f64 {
        if self.total_requests == 0 {
            return 1.0;
        }
        self.met_sla as f64 / self.total_requests as f64
    }

    /// Fraction of requests that violated their SLA target.
    #[must_use]
    #[inline]
    pub fn violation_rate(&self) -> f64 {
        1.0 - self.compliance_rate()
    }
}

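/// Priority-aware request scheduler with per-priority SLA tracking and
/// resource-pressure-based throttling.
///
/// Queues, metrics, and pressure state are shared behind `Arc<Mutex<_>>`; the
/// locks are never held across `.await` points.
///
/// A minimal usage sketch (the `your_crate::qos` path is a placeholder for this
/// crate's actual module path):
///
/// ```ignore
/// use your_crate::qos::{Priority, QosConfig, QosManager, RequestInfo};
///
/// let mut qos = QosManager::new(QosConfig::default());
/// let accepted = qos
///     .enqueue(RequestInfo {
///         id: "req-1".to_string(),
///         cid: "QmExample".to_string(),
///         size_bytes: 4096,
///         priority: Priority::High,
///         deadline_ms: None,
///     })
///     .await;
/// assert!(accepted);
/// let next = qos.dequeue().await;
/// ```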
pub struct QosManager {
    /// Scheduler configuration.
    config: QosConfig,
    /// One FIFO queue per priority class.
    queues: Arc<Mutex<HashMap<Priority, VecDeque<QueueEntry>>>>,
    /// SLA statistics per priority class.
    metrics: Arc<Mutex<HashMap<Priority, SlaMetrics>>>,
    /// Last time each priority class was served (used by fair scheduling).
    last_service: Arc<Mutex<HashMap<Priority, Instant>>>,
    /// Most recently reported resource pressure.
    resource_pressure: Arc<Mutex<ResourcePressure>>,
}

impl QosManager {
    /// Creates a new manager with empty queues for every priority class.
    #[must_use]
    pub fn new(config: QosConfig) -> Self {
        let mut queues = HashMap::new();
        let mut metrics = HashMap::new();
        let mut last_service = HashMap::new();

        for &priority in &[
            Priority::Critical,
            Priority::High,
            Priority::Normal,
            Priority::Low,
        ] {
            queues.insert(priority, VecDeque::new());
            metrics.insert(priority, SlaMetrics::default());
            last_service.insert(priority, Instant::now());
        }

        Self {
            config,
            queues: Arc::new(Mutex::new(queues)),
            metrics: Arc::new(Mutex::new(metrics)),
            last_service: Arc::new(Mutex::new(last_service)),
            resource_pressure: Arc::new(Mutex::new(ResourcePressure::default())),
        }
    }

    /// Enqueues a request into the queue for its priority class.
    ///
    /// Returns `false` if that queue is already at `max_queue_size`.
    #[must_use]
    pub async fn enqueue(&mut self, request: RequestInfo) -> bool {
        let priority = request.priority;
        let entry = QueueEntry {
            info: request,
            enqueued_at: Instant::now(),
        };

        let mut queues = self.queues.lock().unwrap();
        if let Some(queue) = queues.get_mut(&priority) {
            if queue.len() >= self.config.max_queue_size {
                return false;
            }
            queue.push_back(entry);
            true
        } else {
            false
        }
    }

    /// Dequeues the next request to service, or `None` if all queues are empty.
    ///
    /// Requests whose deadline is less than 100 ms away are served first,
    /// regardless of priority. Otherwise selection follows strict priority or
    /// weighted fair scheduling, depending on `QosConfig::strict_priority`.
    #[must_use]
    pub async fn dequeue(&mut self) -> Option<RequestInfo> {
        let mut queues = self.queues.lock().unwrap();
        let mut last_service = self.last_service.lock().unwrap();

        // Deadline-urgent requests preempt normal scheduling.
        if let Some((priority, index)) = self.find_urgent_deadline_request(&queues) {
            if let Some(queue) = queues.get_mut(&priority) {
                if let Some(entry) = queue.remove(index) {
                    let queue_time_ms = entry.enqueued_at.elapsed().as_millis() as u64;
                    last_service.insert(priority, Instant::now());
                    self.update_metrics(priority, entry.info.size_bytes, queue_time_ms);
                    return Some(entry.info);
                }
            }
        }

        let priority = if self.config.strict_priority {
            self.select_highest_priority(&queues)?
        } else {
            self.select_fair_priority(&queues, &last_service)?
        };

        if let Some(queue) = queues.get_mut(&priority) {
            if let Some(entry) = queue.pop_front() {
                let queue_time_ms = entry.enqueued_at.elapsed().as_millis() as u64;

                last_service.insert(priority, Instant::now());

                self.update_metrics(priority, entry.info.size_bytes, queue_time_ms);

                return Some(entry.info);
            }
        }

        None
    }

    /// Finds the queued request with the most imminent deadline (less than
    /// 100 ms away), returning its priority and position in that queue.
    #[must_use]
    #[inline]
    fn find_urgent_deadline_request(
        &self,
        queues: &HashMap<Priority, VecDeque<QueueEntry>>,
    ) -> Option<(Priority, usize)> {
        let now = crate::utils::current_timestamp_ms();
        let mut most_urgent: Option<(Priority, usize, i64)> = None;

        for (&priority, queue) in queues {
            for (idx, entry) in queue.iter().enumerate() {
                if let Some(deadline) = entry.info.deadline_ms {
                    let time_to_deadline = deadline - now;

                    if time_to_deadline < 100 {
                        if let Some((_, _, prev_urgency)) = most_urgent {
                            if time_to_deadline < prev_urgency {
                                most_urgent = Some((priority, idx, time_to_deadline));
                            }
                        } else {
                            most_urgent = Some((priority, idx, time_to_deadline));
                        }
                    }
                }
            }
        }

        most_urgent.map(|(p, i, _)| (p, i))
    }

    /// Returns the highest priority class that has a non-empty queue.
    #[must_use]
    #[inline]
    fn select_highest_priority(
        &self,
        queues: &HashMap<Priority, VecDeque<QueueEntry>>,
    ) -> Option<Priority> {
        for &priority in &[
            Priority::Critical,
            Priority::High,
            Priority::Normal,
            Priority::Low,
        ] {
            if let Some(queue) = queues.get(&priority) {
                if !queue.is_empty() {
                    return Some(priority);
                }
            }
        }
        None
    }

    /// Weighted fair selection: among non-empty queues, picks the priority with
    /// the largest product of time since last service and bandwidth weight.
    #[must_use]
    #[inline]
    fn select_fair_priority(
        &self,
        queues: &HashMap<Priority, VecDeque<QueueEntry>>,
        last_service: &HashMap<Priority, Instant>,
    ) -> Option<Priority> {
        let mut candidates = Vec::new();

        for &priority in &[
            Priority::Critical,
            Priority::High,
            Priority::Normal,
            Priority::Low,
        ] {
            if let Some(queue) = queues.get(&priority) {
                if !queue.is_empty() {
                    candidates.push(priority);
                }
            }
        }

        if candidates.is_empty() {
            return None;
        }

        candidates.into_iter().max_by_key(|&priority| {
            let time_since = last_service
                .get(&priority)
                .map(|t| t.elapsed().as_millis() as u64)
                .unwrap_or(0);
            let weight = self
                .config
                .bandwidth_allocation
                .get(&priority)
                .copied()
                .unwrap_or(1);
            time_since * u64::from(weight)
        })
    }

    /// Records queue time and SLA outcome for a dequeued request.
    #[inline]
    fn update_metrics(&self, priority: Priority, bytes: u64, queue_time_ms: u64) {
        let mut metrics = self.metrics.lock().unwrap();
        if let Some(m) = metrics.get_mut(&priority) {
            m.total_requests += 1;
            m.total_bytes += bytes;

            // Incrementally update the running average queue time.
            let total_time = m.avg_queue_time_ms * (m.total_requests - 1) + queue_time_ms;
            m.avg_queue_time_ms = total_time / m.total_requests;

            if queue_time_ms > m.max_queue_time_ms {
                m.max_queue_time_ms = queue_time_ms;
            }

            if let Some(&target) = self.config.sla_target_latency_ms.get(&priority) {
                if queue_time_ms <= target {
                    m.met_sla += 1;
                } else {
                    m.violated_sla += 1;
                }
            }
        }
    }

    /// Current depth of the queue for `priority`.
    #[must_use]
    #[inline]
    pub fn queue_depth(&self, priority: Priority) -> usize {
        self.queues
            .lock()
            .unwrap()
            .get(&priority)
            .map(|q| q.len())
            .unwrap_or(0)
    }

    /// Total number of queued requests across all priorities.
    #[must_use]
    #[inline]
    pub fn total_queue_depth(&self) -> usize {
        self.queues.lock().unwrap().values().map(|q| q.len()).sum()
    }

    /// SLA metrics for a single priority class, if tracked.
    #[must_use]
    #[inline]
    pub fn get_sla_metrics(&self, priority: Priority) -> Option<SlaMetrics> {
        self.metrics.lock().unwrap().get(&priority).cloned()
    }

    /// SLA metrics for all priority classes.
    #[must_use]
    #[inline]
    pub fn get_all_sla_metrics(&self) -> HashMap<Priority, SlaMetrics> {
        self.metrics.lock().unwrap().clone()
    }

    /// Resets all SLA metrics to their default (zeroed) state.
    pub fn reset_metrics(&mut self) {
        let mut metrics = self.metrics.lock().unwrap();
        for m in metrics.values_mut() {
            *m = SlaMetrics::default();
        }
    }

    /// Returns `true` if any queue is above 80% of `max_queue_size`.
    #[must_use]
    #[inline]
    pub fn is_near_capacity(&self) -> bool {
        let queues = self.queues.lock().unwrap();
        let threshold = (self.config.max_queue_size * 80) / 100;
        queues.values().any(|q| q.len() > threshold)
    }

    /// SLA compliance rate aggregated across all priorities (1.0 when no
    /// requests have been recorded).
    #[must_use]
    #[inline]
    pub fn overall_compliance_rate(&self) -> f64 {
        let metrics = self.metrics.lock().unwrap();
        let mut total_requests = 0u64;
        let mut total_met = 0u64;

        for m in metrics.values() {
            total_requests += m.total_requests;
            total_met += m.met_sla;
        }

        if total_requests == 0 {
            return 1.0;
        }
        total_met as f64 / total_requests as f64
    }

    /// Updates the resource pressure reading used for adaptive limits and
    /// throttling decisions.
    pub fn update_resource_pressure(&mut self, pressure: ResourcePressure) {
        let mut current = self.resource_pressure.lock().unwrap();
        *current = pressure;
    }

    /// Returns the most recently reported resource pressure.
    #[must_use]
    pub fn get_resource_pressure(&self) -> ResourcePressure {
        *self.resource_pressure.lock().unwrap()
    }

    /// Returns `true` when the overall pressure score exceeds 0.80.
    #[must_use]
    #[inline]
    pub fn is_under_high_pressure(&self) -> bool {
        let pressure = self.resource_pressure.lock().unwrap();
        pressure.overall_score() > 0.80
    }

    /// Effective queue limit, scaled down as resource pressure rises.
    #[must_use]
    #[inline]
    pub fn adaptive_queue_limit(&self) -> usize {
        let pressure = self.resource_pressure.lock().unwrap();
        let pressure_score = pressure.overall_score();

        if pressure_score > 0.90 {
            self.config.max_queue_size / 4
        } else if pressure_score > 0.80 {
            self.config.max_queue_size / 2
        } else if pressure_score > 0.70 {
            (self.config.max_queue_size * 3) / 4
        } else {
            self.config.max_queue_size
        }
    }

    /// Returns `true` if requests at `priority` should be throttled under the
    /// current resource pressure. Critical traffic is never throttled.
    #[must_use]
    #[inline]
    pub fn should_throttle_priority(&self, priority: Priority) -> bool {
        let pressure = self.resource_pressure.lock().unwrap();
        let pressure_score = pressure.overall_score();

        match priority {
            Priority::Critical => false,
            Priority::High => pressure_score > 0.95,
            Priority::Normal => pressure_score > 0.85,
            Priority::Low => pressure_score > 0.70,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    fn create_request(id: &str, priority: Priority) -> RequestInfo {
        RequestInfo {
            id: id.to_string(),
            cid: format!("Qm{}", id),
            size_bytes: 1024,
            priority,
            deadline_ms: None,
        }
    }

    #[tokio::test]
    async fn test_enqueue_dequeue() {
        let mut qos = QosManager::new(QosConfig::default());

        let req = create_request("test1", Priority::Normal);
        assert!(qos.enqueue(req.clone()).await);
        assert_eq!(qos.queue_depth(Priority::Normal), 1);

        let dequeued = qos.dequeue().await;
        assert!(dequeued.is_some());
        assert_eq!(dequeued.unwrap().id, "test1");
        assert_eq!(qos.queue_depth(Priority::Normal), 0);
    }

    #[tokio::test]
    async fn test_strict_priority_ordering() {
        let config = QosConfig {
            strict_priority: true,
            ..Default::default()
        };
        let mut qos = QosManager::new(config);

        let _ = qos.enqueue(create_request("low", Priority::Low)).await;
        let _ = qos
            .enqueue(create_request("normal", Priority::Normal))
            .await;
        let _ = qos.enqueue(create_request("high", Priority::High)).await;
        let _ = qos
            .enqueue(create_request("critical", Priority::Critical))
            .await;

        assert_eq!(qos.dequeue().await.unwrap().id, "critical");
        assert_eq!(qos.dequeue().await.unwrap().id, "high");
        assert_eq!(qos.dequeue().await.unwrap().id, "normal");
        assert_eq!(qos.dequeue().await.unwrap().id, "low");
    }

    #[tokio::test]
    async fn test_queue_capacity() {
        let config = QosConfig {
            max_queue_size: 3,
            ..Default::default()
        };
        let mut qos = QosManager::new(config);

        assert!(qos.enqueue(create_request("1", Priority::Normal)).await);
        assert!(qos.enqueue(create_request("2", Priority::Normal)).await);
        assert!(qos.enqueue(create_request("3", Priority::Normal)).await);
        assert!(!qos.enqueue(create_request("4", Priority::Normal)).await);

        assert_eq!(qos.queue_depth(Priority::Normal), 3);
    }

    #[tokio::test]
    async fn test_sla_metrics() {
        let mut qos = QosManager::new(QosConfig::default());

        let req = create_request("test", Priority::High);
        let _ = qos.enqueue(req).await;

        tokio::time::sleep(Duration::from_millis(10)).await;

        let _ = qos.dequeue().await;

        let metrics = qos.get_sla_metrics(Priority::High).unwrap();
        assert_eq!(metrics.total_requests, 1);
        assert!(metrics.avg_queue_time_ms >= 10);
    }

    #[tokio::test]
    async fn test_sla_compliance() {
        let mut config = QosConfig::default();
        config.sla_target_latency_ms.insert(Priority::Normal, 1000);
        let mut qos = QosManager::new(config);

        let _ = qos.enqueue(create_request("fast", Priority::Normal)).await;
        let _ = qos.dequeue().await;

        let metrics = qos.get_sla_metrics(Priority::Normal).unwrap();
        assert_eq!(metrics.met_sla, 1);
        assert_eq!(metrics.violated_sla, 0);
        assert_eq!(metrics.compliance_rate(), 1.0);
    }

    #[tokio::test]
    async fn test_total_queue_depth() {
        let mut qos = QosManager::new(QosConfig::default());

        let _ = qos.enqueue(create_request("1", Priority::Critical)).await;
        let _ = qos.enqueue(create_request("2", Priority::High)).await;
        let _ = qos.enqueue(create_request("3", Priority::Normal)).await;
        let _ = qos.enqueue(create_request("4", Priority::Low)).await;

        assert_eq!(qos.total_queue_depth(), 4);
    }

    #[tokio::test]
    async fn test_near_capacity() {
        let config = QosConfig {
            max_queue_size: 10,
            ..Default::default()
        };
        let mut qos = QosManager::new(config);

        assert!(!qos.is_near_capacity());

        for i in 0..9 {
            let _ = qos
                .enqueue(create_request(&format!("{}", i), Priority::Normal))
                .await;
        }

        assert!(qos.is_near_capacity());
    }

    #[tokio::test]
    async fn test_reset_metrics() {
        let mut qos = QosManager::new(QosConfig::default());

        let _ = qos.enqueue(create_request("test", Priority::Normal)).await;
        let _ = qos.dequeue().await;

        let metrics = qos.get_sla_metrics(Priority::Normal).unwrap();
        assert_eq!(metrics.total_requests, 1);

        qos.reset_metrics();

        let metrics = qos.get_sla_metrics(Priority::Normal).unwrap();
        assert_eq!(metrics.total_requests, 0);
    }

    #[tokio::test]
    async fn test_overall_compliance_rate() {
        let mut qos = QosManager::new(QosConfig::default());

        assert_eq!(qos.overall_compliance_rate(), 1.0);

        for priority in &[
            Priority::Critical,
            Priority::High,
            Priority::Normal,
            Priority::Low,
        ] {
            let _ = qos.enqueue(create_request("test", *priority)).await;
            let _ = qos.dequeue().await;
        }

        assert!(qos.overall_compliance_rate() > 0.9);
    }

    #[tokio::test]
    async fn test_priority_default() {
        assert_eq!(Priority::default(), Priority::Normal);
    }

    #[tokio::test]
    async fn test_priority_ordering() {
        assert!(Priority::Critical > Priority::High);
        assert!(Priority::High > Priority::Normal);
        assert!(Priority::Normal > Priority::Low);
    }

    #[tokio::test]
    async fn test_fair_scheduling() {
        let config = QosConfig {
            strict_priority: false,
            ..Default::default()
        };
        let mut qos = QosManager::new(config);

        for _ in 0..3 {
            let _ = qos.enqueue(create_request("low", Priority::Low)).await;
            let _ = qos
                .enqueue(create_request("normal", Priority::Normal))
                .await;
            let _ = qos.enqueue(create_request("high", Priority::High)).await;
        }

        let mut served_priorities = std::collections::HashSet::new();
        for _ in 0..9 {
            if let Some(req) = qos.dequeue().await {
                served_priorities.insert(req.priority);
            }
        }

        assert_eq!(served_priorities.len(), 3);
    }

    #[tokio::test]
    async fn test_resource_pressure_integration() {
        let mut qos = QosManager::new(QosConfig::default());

        let initial_pressure = qos.get_resource_pressure();
        assert!((initial_pressure.cpu_usage - 0.0).abs() < 0.01);

        let moderate_pressure = ResourcePressure {
            cpu_usage: 0.60,
            memory_usage: 0.70,
            disk_usage: 0.65,
            bandwidth_usage: 0.55,
        };
        qos.update_resource_pressure(moderate_pressure);

        assert!(!qos.is_under_high_pressure());
        assert_eq!(qos.adaptive_queue_limit(), qos.config.max_queue_size);
    }

    #[tokio::test]
    async fn test_adaptive_queue_limit() {
        let mut qos = QosManager::new(QosConfig::default());
        let base_limit = qos.config.max_queue_size;

        qos.update_resource_pressure(ResourcePressure {
            cpu_usage: 0.85,
            memory_usage: 0.80,
            disk_usage: 0.82,
            bandwidth_usage: 0.78,
        });
        assert_eq!(qos.adaptive_queue_limit(), base_limit / 2);

        qos.update_resource_pressure(ResourcePressure {
            cpu_usage: 0.95,
            memory_usage: 0.92,
            disk_usage: 0.90,
            bandwidth_usage: 0.88,
        });
        assert_eq!(qos.adaptive_queue_limit(), base_limit / 4);
    }

    #[tokio::test]
    async fn test_throttle_priority_based_on_pressure() {
        let mut qos = QosManager::new(QosConfig::default());

        qos.update_resource_pressure(ResourcePressure {
            cpu_usage: 0.40,
            memory_usage: 0.50,
            disk_usage: 0.45,
            bandwidth_usage: 0.35,
        });

        assert!(!qos.should_throttle_priority(Priority::Critical));
        assert!(!qos.should_throttle_priority(Priority::High));
        assert!(!qos.should_throttle_priority(Priority::Normal));
        assert!(!qos.should_throttle_priority(Priority::Low));

        qos.update_resource_pressure(ResourcePressure {
            cpu_usage: 0.88,
            memory_usage: 0.90,
            disk_usage: 0.87,
            bandwidth_usage: 0.85,
        });

        assert!(!qos.should_throttle_priority(Priority::Critical));
        assert!(!qos.should_throttle_priority(Priority::High));
        assert!(qos.should_throttle_priority(Priority::Normal));
        assert!(qos.should_throttle_priority(Priority::Low));
    }

    #[tokio::test]
    async fn test_high_pressure_detection() {
        let mut qos = QosManager::new(QosConfig::default());

        qos.update_resource_pressure(ResourcePressure {
            cpu_usage: 0.75,
            memory_usage: 0.78,
            disk_usage: 0.72,
            bandwidth_usage: 0.70,
        });
        assert!(!qos.is_under_high_pressure());

        qos.update_resource_pressure(ResourcePressure {
            cpu_usage: 0.85,
            memory_usage: 0.88,
            disk_usage: 0.82,
            bandwidth_usage: 0.80,
        });
        assert!(qos.is_under_high_pressure());
    }
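
    // Additional test sketch (not part of the original suite): exercises the
    // deadline-urgency path in `dequeue`. It assumes `crate::utils::current_timestamp_ms()`
    // returns the current time in milliseconds as an `i64`, as its use in
    // `find_urgent_deadline_request` implies.
    #[tokio::test]
    async fn test_deadline_urgency_overrides_priority() {
        let mut qos = QosManager::new(QosConfig::default());

        let _ = qos
            .enqueue(create_request("critical", Priority::Critical))
            .await;

        // A low-priority request whose deadline is ~50 ms away (inside the
        // 100 ms urgency window) should be served before the critical request.
        let mut urgent = create_request("urgent-low", Priority::Low);
        urgent.deadline_ms = Some(crate::utils::current_timestamp_ms() + 50);
        let _ = qos.enqueue(urgent).await;

        assert_eq!(qos.dequeue().await.unwrap().id, "urgent-low");
        assert_eq!(qos.dequeue().await.unwrap().id, "critical");
    }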
}