1use crate::degradation::ResourcePressure;
35use std::{
36 collections::{HashMap, VecDeque},
37 sync::{Arc, Mutex},
38 time::Instant,
39};
40
/// Scheduling priority of a request.
///
/// The explicit discriminants define the derived ordering, so
/// `Critical > High > Normal > Low`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Priority {
    /// Lowest priority level.
    Low = 1,
    /// Default priority level.
    Normal = 2,
    /// Above-normal priority level.
    High = 3,
    /// Highest priority level.
    Critical = 4,
}

impl Default for Priority {
    /// Returns [`Priority::Normal`].
    #[inline]
    fn default() -> Self {
        Priority::Normal
    }
}
60
61#[derive(Debug, Clone)]
63pub struct QosConfig {
64 pub max_queue_size: usize,
66 pub bandwidth_allocation: HashMap<Priority, u32>,
68 pub strict_priority: bool,
70 pub time_slice_ms: u64,
72 pub sla_target_latency_ms: HashMap<Priority, u64>,
74}
75
76impl Default for QosConfig {
77 #[inline]
78 fn default() -> Self {
79 let mut bandwidth_allocation = HashMap::new();
80 bandwidth_allocation.insert(Priority::Critical, 40);
81 bandwidth_allocation.insert(Priority::High, 30);
82 bandwidth_allocation.insert(Priority::Normal, 20);
83 bandwidth_allocation.insert(Priority::Low, 10);
84
85 let mut sla_target_latency_ms = HashMap::new();
86 sla_target_latency_ms.insert(Priority::Critical, 100);
87 sla_target_latency_ms.insert(Priority::High, 500);
88 sla_target_latency_ms.insert(Priority::Normal, 2000);
89 sla_target_latency_ms.insert(Priority::Low, 10000);
90
91 Self {
92 max_queue_size: 1000,
93 bandwidth_allocation,
94 strict_priority: true,
95 time_slice_ms: 100,
96 sla_target_latency_ms,
97 }
98 }
99}
100
101#[derive(Debug, Clone)]
103pub struct RequestInfo {
104 pub id: String,
106 pub cid: String,
108 pub size_bytes: u64,
110 pub priority: Priority,
112 pub deadline_ms: Option<i64>,
115}
116
117#[derive(Debug, Clone)]
119struct QueueEntry {
120 info: RequestInfo,
121 enqueued_at: Instant,
122}
123
/// SLA counters accumulated for one priority level.
#[derive(Debug, Clone, Default)]
pub struct SlaMetrics {
    /// Number of requests recorded.
    pub total_requests: u64,
    /// Requests whose queue time met the SLA target.
    pub met_sla: u64,
    /// Requests whose queue time exceeded the SLA target.
    pub violated_sla: u64,
    /// Average queue time in whole milliseconds.
    pub avg_queue_time_ms: u64,
    /// Largest queue time observed, in milliseconds.
    pub max_queue_time_ms: u64,
    /// Sum of the sizes of all recorded requests, in bytes.
    pub total_bytes: u64,
}

impl SlaMetrics {
    /// Fraction of recorded requests that met their SLA, in `[0.0, 1.0]`.
    ///
    /// Returns `1.0` when nothing has been recorded. The denominator is
    /// `total_requests`, so any request counted there but not in `met_sla`
    /// lowers the rate.
    #[must_use]
    #[inline]
    pub fn compliance_rate(&self) -> f64 {
        match self.total_requests {
            0 => 1.0,
            total => self.met_sla as f64 / total as f64,
        }
    }

    /// Complement of [`SlaMetrics::compliance_rate`].
    #[must_use]
    #[inline]
    pub fn violation_rate(&self) -> f64 {
        let compliant = self.compliance_rate();
        1.0 - compliant
    }
}
159
160pub struct QosManager {
162 config: QosConfig,
163 queues: Arc<Mutex<HashMap<Priority, VecDeque<QueueEntry>>>>,
165 metrics: Arc<Mutex<HashMap<Priority, SlaMetrics>>>,
167 last_service: Arc<Mutex<HashMap<Priority, Instant>>>,
169 resource_pressure: Arc<Mutex<ResourcePressure>>,
171}
172
173impl QosManager {
174 #[must_use]
176 pub fn new(config: QosConfig) -> Self {
177 let mut queues = HashMap::new();
178 let mut metrics = HashMap::new();
179 let mut last_service = HashMap::new();
180
181 for &priority in &[
182 Priority::Critical,
183 Priority::High,
184 Priority::Normal,
185 Priority::Low,
186 ] {
187 queues.insert(priority, VecDeque::new());
188 metrics.insert(priority, SlaMetrics::default());
189 last_service.insert(priority, Instant::now());
190 }
191
192 Self {
193 config,
194 queues: Arc::new(Mutex::new(queues)),
195 metrics: Arc::new(Mutex::new(metrics)),
196 last_service: Arc::new(Mutex::new(last_service)),
197 resource_pressure: Arc::new(Mutex::new(ResourcePressure::default())),
198 }
199 }
200
201 #[must_use]
205 pub async fn enqueue(&mut self, request: RequestInfo) -> bool {
206 let priority = request.priority;
207 let entry = QueueEntry {
208 info: request,
209 enqueued_at: Instant::now(),
210 };
211
212 let mut queues = self.queues.lock().unwrap();
213 if let Some(queue) = queues.get_mut(&priority) {
214 if queue.len() >= self.config.max_queue_size {
215 return false;
216 }
217 queue.push_back(entry);
218 true
219 } else {
220 false
221 }
222 }
223
224 #[must_use]
229 pub async fn dequeue(&mut self) -> Option<RequestInfo> {
230 let mut queues = self.queues.lock().unwrap();
231 let mut last_service = self.last_service.lock().unwrap();
232
233 if let Some((priority, index)) = self.find_urgent_deadline_request(&queues) {
235 if let Some(queue) = queues.get_mut(&priority) {
236 if let Some(entry) = queue.remove(index) {
237 let queue_time_ms = entry.enqueued_at.elapsed().as_millis() as u64;
238 last_service.insert(priority, Instant::now());
239 self.update_metrics(priority, entry.info.size_bytes, queue_time_ms);
240 return Some(entry.info);
241 }
242 }
243 }
244
245 let priority = if self.config.strict_priority {
247 self.select_highest_priority(&queues)?
249 } else {
250 self.select_fair_priority(&queues, &last_service)?
252 };
253
254 if let Some(queue) = queues.get_mut(&priority) {
256 if let Some(entry) = queue.pop_front() {
257 let queue_time_ms = entry.enqueued_at.elapsed().as_millis() as u64;
258
259 last_service.insert(priority, Instant::now());
261
262 self.update_metrics(priority, entry.info.size_bytes, queue_time_ms);
264
265 return Some(entry.info);
266 }
267 }
268
269 None
270 }
271
272 #[must_use]
277 #[inline]
278 fn find_urgent_deadline_request(
279 &self,
280 queues: &HashMap<Priority, VecDeque<QueueEntry>>,
281 ) -> Option<(Priority, usize)> {
282 let now = crate::utils::current_timestamp_ms();
283 let mut most_urgent: Option<(Priority, usize, i64)> = None; for (&priority, queue) in queues {
286 for (idx, entry) in queue.iter().enumerate() {
287 if let Some(deadline) = entry.info.deadline_ms {
288 let time_to_deadline = deadline - now;
289
290 if time_to_deadline < 100 {
292 if let Some((_, _, prev_urgency)) = most_urgent {
294 if time_to_deadline < prev_urgency {
295 most_urgent = Some((priority, idx, time_to_deadline));
296 }
297 } else {
298 most_urgent = Some((priority, idx, time_to_deadline));
299 }
300 }
301 }
302 }
303 }
304
305 most_urgent.map(|(p, i, _)| (p, i))
306 }
307
308 #[must_use]
310 #[inline]
311 fn select_highest_priority(
312 &self,
313 queues: &HashMap<Priority, VecDeque<QueueEntry>>,
314 ) -> Option<Priority> {
315 for &priority in &[
316 Priority::Critical,
317 Priority::High,
318 Priority::Normal,
319 Priority::Low,
320 ] {
321 if let Some(queue) = queues.get(&priority) {
322 if !queue.is_empty() {
323 return Some(priority);
324 }
325 }
326 }
327 None
328 }
329
330 #[must_use]
332 #[inline]
333 fn select_fair_priority(
334 &self,
335 queues: &HashMap<Priority, VecDeque<QueueEntry>>,
336 last_service: &HashMap<Priority, Instant>,
337 ) -> Option<Priority> {
338 let mut candidates = Vec::new();
339
340 for &priority in &[
342 Priority::Critical,
343 Priority::High,
344 Priority::Normal,
345 Priority::Low,
346 ] {
347 if let Some(queue) = queues.get(&priority) {
348 if !queue.is_empty() {
349 candidates.push(priority);
350 }
351 }
352 }
353
354 if candidates.is_empty() {
355 return None;
356 }
357
358 candidates.into_iter().max_by_key(|&priority| {
360 let time_since = last_service
361 .get(&priority)
362 .map(|t| t.elapsed().as_millis() as u64)
363 .unwrap_or(0);
364 let weight = self
365 .config
366 .bandwidth_allocation
367 .get(&priority)
368 .copied()
369 .unwrap_or(1);
370 time_since * u64::from(weight)
371 })
372 }
373
374 #[inline]
376 fn update_metrics(&self, priority: Priority, bytes: u64, queue_time_ms: u64) {
377 let mut metrics = self.metrics.lock().unwrap();
378 if let Some(m) = metrics.get_mut(&priority) {
379 m.total_requests += 1;
380 m.total_bytes += bytes;
381
382 let total_time = m.avg_queue_time_ms * (m.total_requests - 1) + queue_time_ms;
384 m.avg_queue_time_ms = total_time / m.total_requests;
385
386 if queue_time_ms > m.max_queue_time_ms {
388 m.max_queue_time_ms = queue_time_ms;
389 }
390
391 if let Some(&target) = self.config.sla_target_latency_ms.get(&priority) {
393 if queue_time_ms <= target {
394 m.met_sla += 1;
395 } else {
396 m.violated_sla += 1;
397 }
398 }
399 }
400 }
401
402 #[must_use]
404 #[inline]
405 pub fn queue_depth(&self, priority: Priority) -> usize {
406 self.queues
407 .lock()
408 .unwrap()
409 .get(&priority)
410 .map(|q| q.len())
411 .unwrap_or(0)
412 }
413
414 #[must_use]
416 #[inline]
417 pub fn total_queue_depth(&self) -> usize {
418 self.queues.lock().unwrap().values().map(|q| q.len()).sum()
419 }
420
421 #[must_use]
423 #[inline]
424 pub fn get_sla_metrics(&self, priority: Priority) -> Option<SlaMetrics> {
425 self.metrics.lock().unwrap().get(&priority).cloned()
426 }
427
428 #[must_use]
430 #[inline]
431 pub fn get_all_sla_metrics(&self) -> HashMap<Priority, SlaMetrics> {
432 self.metrics.lock().unwrap().clone()
433 }
434
435 pub fn reset_metrics(&mut self) {
437 let mut metrics = self.metrics.lock().unwrap();
438 for m in metrics.values_mut() {
439 *m = SlaMetrics::default();
440 }
441 }
442
443 #[must_use]
445 #[inline]
446 pub fn is_near_capacity(&self) -> bool {
447 let queues = self.queues.lock().unwrap();
448 let threshold = (self.config.max_queue_size * 80) / 100;
449 queues.values().any(|q| q.len() > threshold)
450 }
451
452 #[must_use]
454 #[inline]
455 pub fn overall_compliance_rate(&self) -> f64 {
456 let metrics = self.metrics.lock().unwrap();
457 let mut total_requests = 0u64;
458 let mut total_met = 0u64;
459
460 for m in metrics.values() {
461 total_requests += m.total_requests;
462 total_met += m.met_sla;
463 }
464
465 if total_requests == 0 {
466 return 1.0;
467 }
468 total_met as f64 / total_requests as f64
469 }
470
471 pub fn update_resource_pressure(&mut self, pressure: ResourcePressure) {
475 let mut current = self.resource_pressure.lock().unwrap();
476 *current = pressure;
477 }
478
479 #[must_use]
481 pub fn get_resource_pressure(&self) -> ResourcePressure {
482 *self.resource_pressure.lock().unwrap()
483 }
484
485 #[must_use]
489 #[inline]
490 pub fn is_under_high_pressure(&self) -> bool {
491 let pressure = self.resource_pressure.lock().unwrap();
492 pressure.overall_score() > 0.80
493 }
494
495 #[must_use]
499 #[inline]
500 pub fn adaptive_queue_limit(&self) -> usize {
501 let pressure = self.resource_pressure.lock().unwrap();
502 let pressure_score = pressure.overall_score();
503
504 if pressure_score > 0.90 {
505 self.config.max_queue_size / 4
507 } else if pressure_score > 0.80 {
508 self.config.max_queue_size / 2
510 } else if pressure_score > 0.70 {
511 (self.config.max_queue_size * 3) / 4
513 } else {
514 self.config.max_queue_size
516 }
517 }
518
519 #[must_use]
523 #[inline]
524 pub fn should_throttle_priority(&self, priority: Priority) -> bool {
525 let pressure = self.resource_pressure.lock().unwrap();
526 let pressure_score = pressure.overall_score();
527
528 match priority {
529 Priority::Critical => false, Priority::High => pressure_score > 0.95, Priority::Normal => pressure_score > 0.85,
532 Priority::Low => pressure_score > 0.70,
533 }
534 }
535}
536
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds a 1 KiB request without a deadline; the `cid` is `"Qm" + id`.
    fn create_request(id: &str, priority: Priority) -> RequestInfo {
        let cid = format!("Qm{}", id);
        RequestInfo {
            id: id.to_string(),
            cid,
            size_bytes: 1024,
            priority,
            deadline_ms: None,
        }
    }

    /// A queued request comes back out and the depth returns to zero.
    #[tokio::test]
    async fn test_enqueue_dequeue() {
        let mut manager = QosManager::new(QosConfig::default());

        let accepted = manager
            .enqueue(create_request("test1", Priority::Normal))
            .await;
        assert!(accepted);
        assert_eq!(manager.queue_depth(Priority::Normal), 1);

        let served = manager.dequeue().await;
        assert!(served.is_some());
        assert_eq!(served.unwrap().id, "test1");
        assert_eq!(manager.queue_depth(Priority::Normal), 0);
    }

    /// Strict mode serves requests from highest to lowest priority.
    #[tokio::test]
    async fn test_strict_priority_ordering() {
        let mut manager = QosManager::new(QosConfig {
            strict_priority: true,
            ..Default::default()
        });

        let submissions = [
            ("low", Priority::Low),
            ("normal", Priority::Normal),
            ("high", Priority::High),
            ("critical", Priority::Critical),
        ];
        for &(id, priority) in &submissions {
            let _ = manager.enqueue(create_request(id, priority)).await;
        }

        for expected in &["critical", "high", "normal", "low"] {
            assert_eq!(manager.dequeue().await.unwrap().id, *expected);
        }
    }

    /// A full queue rejects further requests of that priority.
    #[tokio::test]
    async fn test_queue_capacity() {
        let mut manager = QosManager::new(QosConfig {
            max_queue_size: 3,
            ..Default::default()
        });

        for id in &["1", "2", "3"] {
            assert!(manager.enqueue(create_request(id, Priority::Normal)).await);
        }
        assert!(!manager.enqueue(create_request("4", Priority::Normal)).await);
        assert_eq!(manager.queue_depth(Priority::Normal), 3);
    }

    /// Queue time is recorded in the metrics of the served priority.
    #[tokio::test]
    async fn test_sla_metrics() {
        let mut manager = QosManager::new(QosConfig::default());

        let _ = manager.enqueue(create_request("test", Priority::High)).await;

        // Let the request sit in the queue for a measurable amount of time.
        tokio::time::sleep(Duration::from_millis(10)).await;

        let _ = manager.dequeue().await;

        let recorded = manager.get_sla_metrics(Priority::High).unwrap();
        assert_eq!(recorded.total_requests, 1);
        assert!(recorded.avg_queue_time_ms >= 10);
    }

    /// A request served well inside its target counts as SLA-compliant.
    #[tokio::test]
    async fn test_sla_compliance() {
        let mut config = QosConfig::default();
        config.sla_target_latency_ms.insert(Priority::Normal, 1000);
        let mut manager = QosManager::new(config);

        let _ = manager.enqueue(create_request("fast", Priority::Normal)).await;
        let _ = manager.dequeue().await;

        let recorded = manager.get_sla_metrics(Priority::Normal).unwrap();
        assert_eq!(recorded.met_sla, 1);
        assert_eq!(recorded.violated_sla, 0);
        assert_eq!(recorded.compliance_rate(), 1.0);
    }

    /// The total depth sums the depth of every priority queue.
    #[tokio::test]
    async fn test_total_queue_depth() {
        let mut manager = QosManager::new(QosConfig::default());

        let submissions = [
            ("1", Priority::Critical),
            ("2", Priority::High),
            ("3", Priority::Normal),
            ("4", Priority::Low),
        ];
        for &(id, priority) in &submissions {
            let _ = manager.enqueue(create_request(id, priority)).await;
        }

        assert_eq!(manager.total_queue_depth(), 4);
    }

    /// Nine of ten slots used crosses the 80% near-capacity threshold.
    #[tokio::test]
    async fn test_near_capacity() {
        let mut manager = QosManager::new(QosConfig {
            max_queue_size: 10,
            ..Default::default()
        });

        assert!(!manager.is_near_capacity());

        for i in 0..9 {
            let id = i.to_string();
            let _ = manager.enqueue(create_request(&id, Priority::Normal)).await;
        }

        assert!(manager.is_near_capacity());
    }

    /// Resetting clears previously recorded metrics.
    #[tokio::test]
    async fn test_reset_metrics() {
        let mut manager = QosManager::new(QosConfig::default());

        let _ = manager.enqueue(create_request("test", Priority::Normal)).await;
        let _ = manager.dequeue().await;

        let before = manager.get_sla_metrics(Priority::Normal).unwrap();
        assert_eq!(before.total_requests, 1);

        manager.reset_metrics();

        let after = manager.get_sla_metrics(Priority::Normal).unwrap();
        assert_eq!(after.total_requests, 0);
    }

    /// Overall compliance is 1.0 when idle and stays high for fast requests.
    #[tokio::test]
    async fn test_overall_compliance_rate() {
        let mut manager = QosManager::new(QosConfig::default());

        assert_eq!(manager.overall_compliance_rate(), 1.0);

        let levels = [
            Priority::Critical,
            Priority::High,
            Priority::Normal,
            Priority::Low,
        ];
        for &level in &levels {
            let _ = manager.enqueue(create_request("test", level)).await;
            let _ = manager.dequeue().await;
        }

        assert!(manager.overall_compliance_rate() > 0.9);
    }

    /// `Normal` is the default priority.
    #[tokio::test]
    async fn test_priority_default() {
        assert_eq!(Priority::default(), Priority::Normal);
    }

    /// Priorities compare as Critical > High > Normal > Low.
    #[tokio::test]
    async fn test_priority_ordering() {
        assert!(Priority::Critical > Priority::High);
        assert!(Priority::High > Priority::Normal);
        assert!(Priority::Normal > Priority::Low);
    }

    /// Draining a fair-mode scheduler serves every priority that had work.
    #[tokio::test]
    async fn test_fair_scheduling() {
        let mut manager = QosManager::new(QosConfig {
            strict_priority: false,
            ..Default::default()
        });

        let round = [
            ("low", Priority::Low),
            ("normal", Priority::Normal),
            ("high", Priority::High),
        ];
        for _ in 0..3 {
            for &(id, priority) in &round {
                let _ = manager.enqueue(create_request(id, priority)).await;
            }
        }

        let mut served_priorities = std::collections::HashSet::new();
        for _ in 0..9 {
            if let Some(request) = manager.dequeue().await {
                served_priorities.insert(request.priority);
            }
        }

        assert_eq!(served_priorities.len(), 3);
    }

    /// Moderate pressure leaves the queue limit untouched.
    #[tokio::test]
    async fn test_resource_pressure_integration() {
        let mut manager = QosManager::new(QosConfig::default());

        let initial = manager.get_resource_pressure();
        assert!((initial.cpu_usage - 0.0).abs() < 0.01);

        manager.update_resource_pressure(ResourcePressure {
            cpu_usage: 0.60,
            memory_usage: 0.70,
            disk_usage: 0.65,
            bandwidth_usage: 0.55,
        });

        assert!(!manager.is_under_high_pressure());
        assert_eq!(manager.adaptive_queue_limit(), manager.config.max_queue_size);
    }

    /// The adaptive limit shrinks as pressure rises.
    #[tokio::test]
    async fn test_adaptive_queue_limit() {
        let mut manager = QosManager::new(QosConfig::default());
        let base_limit = manager.config.max_queue_size;

        let high = ResourcePressure {
            cpu_usage: 0.85,
            memory_usage: 0.80,
            disk_usage: 0.82,
            bandwidth_usage: 0.78,
        };
        manager.update_resource_pressure(high);
        assert_eq!(manager.adaptive_queue_limit(), base_limit / 2);

        let critical = ResourcePressure {
            cpu_usage: 0.95,
            memory_usage: 0.92,
            disk_usage: 0.90,
            bandwidth_usage: 0.88,
        };
        manager.update_resource_pressure(critical);
        assert_eq!(manager.adaptive_queue_limit(), base_limit / 4);
    }

    /// Lower priorities are throttled first as pressure rises.
    #[tokio::test]
    async fn test_throttle_priority_based_on_pressure() {
        let mut manager = QosManager::new(QosConfig::default());

        manager.update_resource_pressure(ResourcePressure {
            cpu_usage: 0.40,
            memory_usage: 0.50,
            disk_usage: 0.45,
            bandwidth_usage: 0.35,
        });

        for &level in &[
            Priority::Critical,
            Priority::High,
            Priority::Normal,
            Priority::Low,
        ] {
            assert!(!manager.should_throttle_priority(level));
        }

        manager.update_resource_pressure(ResourcePressure {
            cpu_usage: 0.88,
            memory_usage: 0.90,
            disk_usage: 0.87,
            bandwidth_usage: 0.85,
        });

        assert!(!manager.should_throttle_priority(Priority::Critical));
        assert!(!manager.should_throttle_priority(Priority::High));
        assert!(manager.should_throttle_priority(Priority::Normal));
        assert!(manager.should_throttle_priority(Priority::Low));
    }

    /// High pressure is reported only once the overall score is high enough.
    #[tokio::test]
    async fn test_high_pressure_detection() {
        let mut manager = QosManager::new(QosConfig::default());

        manager.update_resource_pressure(ResourcePressure {
            cpu_usage: 0.75,
            memory_usage: 0.78,
            disk_usage: 0.72,
            bandwidth_usage: 0.70,
        });
        assert!(!manager.is_under_high_pressure());

        manager.update_resource_pressure(ResourcePressure {
            cpu_usage: 0.85,
            memory_usage: 0.88,
            disk_usage: 0.82,
            bandwidth_usage: 0.80,
        });
        assert!(manager.is_under_high_pressure());
    }
}