Skip to main content

chie_core/
qos.rs

1//! Quality of Service (QoS) for priority-based request handling.
2//!
3//! This module provides priority-based scheduling, bandwidth allocation, and SLA tracking
4//! for content distribution requests. It ensures high-priority requests get preferential
5//! treatment while maintaining fairness across different priority levels.
6//!
7//! # Example
8//!
9//! ```rust
10//! use chie_core::qos::{QosManager, Priority, QosConfig, RequestInfo};
11//!
12//! #[tokio::main]
13//! async fn main() {
14//!     let config = QosConfig::default();
15//!     let mut qos = QosManager::new(config);
16//!
17//!     // Enqueue requests with different priorities
18//!     let req1 = RequestInfo {
19//!         id: "req1".to_string(),
20//!         cid: "QmCritical".to_string(),
21//!         size_bytes: 1024 * 1024,
22//!         priority: Priority::Critical,
23//!         deadline_ms: None,
24//!     };
25//!     qos.enqueue(req1).await;
26//!
27//!     // Dequeue processes highest priority first
28//!     if let Some(next) = qos.dequeue().await {
29//!         println!("Processing: {} (priority: {:?})", next.id, next.priority);
30//!     }
31//! }
32//! ```
33
34use crate::degradation::ResourcePressure;
35use std::{
36    collections::{HashMap, VecDeque},
37    sync::{Arc, Mutex},
38    time::Instant,
39};
40
41/// Request priority levels.
42#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
43pub enum Priority {
44    /// Critical priority (highest) - reserved for system-critical operations.
45    Critical = 4,
46    /// High priority - time-sensitive user requests.
47    High = 3,
48    /// Normal priority - standard user requests.
49    Normal = 2,
50    /// Low priority - background operations, prefetching.
51    Low = 1,
52}
53
54impl Default for Priority {
55    #[inline]
56    fn default() -> Self {
57        Self::Normal
58    }
59}
60
61/// Configuration for QoS manager.
62#[derive(Debug, Clone)]
63pub struct QosConfig {
64    /// Maximum queue size per priority level.
65    pub max_queue_size: usize,
66    /// Bandwidth allocation percentages by priority (must sum to 100).
67    pub bandwidth_allocation: HashMap<Priority, u32>,
68    /// Enable strict priority (higher priorities always go first).
69    pub strict_priority: bool,
70    /// Minimum time slice per priority in milliseconds (for fair scheduling).
71    pub time_slice_ms: u64,
72    /// SLA target latency in milliseconds per priority.
73    pub sla_target_latency_ms: HashMap<Priority, u64>,
74}
75
76impl Default for QosConfig {
77    #[inline]
78    fn default() -> Self {
79        let mut bandwidth_allocation = HashMap::new();
80        bandwidth_allocation.insert(Priority::Critical, 40);
81        bandwidth_allocation.insert(Priority::High, 30);
82        bandwidth_allocation.insert(Priority::Normal, 20);
83        bandwidth_allocation.insert(Priority::Low, 10);
84
85        let mut sla_target_latency_ms = HashMap::new();
86        sla_target_latency_ms.insert(Priority::Critical, 100);
87        sla_target_latency_ms.insert(Priority::High, 500);
88        sla_target_latency_ms.insert(Priority::Normal, 2000);
89        sla_target_latency_ms.insert(Priority::Low, 10000);
90
91        Self {
92            max_queue_size: 1000,
93            bandwidth_allocation,
94            strict_priority: true,
95            time_slice_ms: 100,
96            sla_target_latency_ms,
97        }
98    }
99}
100
101/// Information about a queued request.
102#[derive(Debug, Clone)]
103pub struct RequestInfo {
104    /// Unique request identifier.
105    pub id: String,
106    /// Content identifier.
107    pub cid: String,
108    /// Estimated size in bytes.
109    pub size_bytes: u64,
110    /// Request priority.
111    pub priority: Priority,
112    /// Optional deadline (absolute time in milliseconds since epoch).
113    /// If set, requests approaching deadline get priority boost.
114    pub deadline_ms: Option<i64>,
115}
116
117/// Internal queue entry with timing information.
118#[derive(Debug, Clone)]
119struct QueueEntry {
120    info: RequestInfo,
121    enqueued_at: Instant,
122}
123
124/// SLA metrics for a priority level.
125#[derive(Debug, Clone, Default)]
126pub struct SlaMetrics {
127    /// Total requests processed.
128    pub total_requests: u64,
129    /// Requests that met SLA target.
130    pub met_sla: u64,
131    /// Requests that violated SLA target.
132    pub violated_sla: u64,
133    /// Average queue time in milliseconds.
134    pub avg_queue_time_ms: u64,
135    /// Maximum queue time in milliseconds.
136    pub max_queue_time_ms: u64,
137    /// Total bytes processed.
138    pub total_bytes: u64,
139}
140
141impl SlaMetrics {
142    /// Calculate SLA compliance rate (0.0 to 1.0).
143    #[must_use]
144    #[inline]
145    pub fn compliance_rate(&self) -> f64 {
146        if self.total_requests == 0 {
147            return 1.0;
148        }
149        self.met_sla as f64 / self.total_requests as f64
150    }
151
152    /// Calculate violation rate (0.0 to 1.0).
153    #[must_use]
154    #[inline]
155    pub fn violation_rate(&self) -> f64 {
156        1.0 - self.compliance_rate()
157    }
158}
159
160/// Quality of Service manager for priority-based request handling.
161pub struct QosManager {
162    config: QosConfig,
163    /// Queues per priority level.
164    queues: Arc<Mutex<HashMap<Priority, VecDeque<QueueEntry>>>>,
165    /// SLA metrics per priority level.
166    metrics: Arc<Mutex<HashMap<Priority, SlaMetrics>>>,
167    /// Last service time per priority (for fair scheduling).
168    last_service: Arc<Mutex<HashMap<Priority, Instant>>>,
169    /// Current resource pressure for adaptive behavior.
170    resource_pressure: Arc<Mutex<ResourcePressure>>,
171}
172
173impl QosManager {
174    /// Create a new QoS manager with the given configuration.
175    #[must_use]
176    pub fn new(config: QosConfig) -> Self {
177        let mut queues = HashMap::new();
178        let mut metrics = HashMap::new();
179        let mut last_service = HashMap::new();
180
181        for &priority in &[
182            Priority::Critical,
183            Priority::High,
184            Priority::Normal,
185            Priority::Low,
186        ] {
187            queues.insert(priority, VecDeque::new());
188            metrics.insert(priority, SlaMetrics::default());
189            last_service.insert(priority, Instant::now());
190        }
191
192        Self {
193            config,
194            queues: Arc::new(Mutex::new(queues)),
195            metrics: Arc::new(Mutex::new(metrics)),
196            last_service: Arc::new(Mutex::new(last_service)),
197            resource_pressure: Arc::new(Mutex::new(ResourcePressure::default())),
198        }
199    }
200
201    /// Enqueue a request with the specified priority.
202    ///
203    /// Returns `true` if the request was enqueued, `false` if the queue is full.
204    #[must_use]
205    pub async fn enqueue(&mut self, request: RequestInfo) -> bool {
206        let priority = request.priority;
207        let entry = QueueEntry {
208            info: request,
209            enqueued_at: Instant::now(),
210        };
211
212        let mut queues = self.queues.lock().unwrap();
213        if let Some(queue) = queues.get_mut(&priority) {
214            if queue.len() >= self.config.max_queue_size {
215                return false;
216            }
217            queue.push_back(entry);
218            true
219        } else {
220            false
221        }
222    }
223
224    /// Dequeue the next request based on priority and scheduling policy.
225    ///
226    /// Supports deadline scheduling: requests with approaching deadlines are prioritized
227    /// over normal priority scheduling.
228    #[must_use]
229    pub async fn dequeue(&mut self) -> Option<RequestInfo> {
230        let mut queues = self.queues.lock().unwrap();
231        let mut last_service = self.last_service.lock().unwrap();
232
233        // First check for urgent deadline-critical requests
234        if let Some((priority, index)) = self.find_urgent_deadline_request(&queues) {
235            if let Some(queue) = queues.get_mut(&priority) {
236                if let Some(entry) = queue.remove(index) {
237                    let queue_time_ms = entry.enqueued_at.elapsed().as_millis() as u64;
238                    last_service.insert(priority, Instant::now());
239                    self.update_metrics(priority, entry.info.size_bytes, queue_time_ms);
240                    return Some(entry.info);
241                }
242            }
243        }
244
245        // Otherwise, determine which priority to service next
246        let priority = if self.config.strict_priority {
247            // Strict priority: always serve highest priority first
248            self.select_highest_priority(&queues)?
249        } else {
250            // Fair scheduling: weighted round-robin
251            self.select_fair_priority(&queues, &last_service)?
252        };
253
254        // Dequeue from selected priority
255        if let Some(queue) = queues.get_mut(&priority) {
256            if let Some(entry) = queue.pop_front() {
257                let queue_time_ms = entry.enqueued_at.elapsed().as_millis() as u64;
258
259                // Update last service time
260                last_service.insert(priority, Instant::now());
261
262                // Update metrics
263                self.update_metrics(priority, entry.info.size_bytes, queue_time_ms);
264
265                return Some(entry.info);
266            }
267        }
268
269        None
270    }
271
272    /// Find the most urgent request with an approaching deadline.
273    ///
274    /// Returns (priority, index) of the most urgent request, or None if no urgent requests.
275    /// A request is considered urgent if its deadline is within 100ms.
276    #[must_use]
277    #[inline]
278    fn find_urgent_deadline_request(
279        &self,
280        queues: &HashMap<Priority, VecDeque<QueueEntry>>,
281    ) -> Option<(Priority, usize)> {
282        let now = crate::utils::current_timestamp_ms();
283        let mut most_urgent: Option<(Priority, usize, i64)> = None; // (priority, index, time_to_deadline)
284
285        for (&priority, queue) in queues {
286            for (idx, entry) in queue.iter().enumerate() {
287                if let Some(deadline) = entry.info.deadline_ms {
288                    let time_to_deadline = deadline - now;
289
290                    // Consider urgent if deadline within next 100ms or already passed
291                    if time_to_deadline < 100 {
292                        // Update if this is more urgent (closer to/past deadline)
293                        if let Some((_, _, prev_urgency)) = most_urgent {
294                            if time_to_deadline < prev_urgency {
295                                most_urgent = Some((priority, idx, time_to_deadline));
296                            }
297                        } else {
298                            most_urgent = Some((priority, idx, time_to_deadline));
299                        }
300                    }
301                }
302            }
303        }
304
305        most_urgent.map(|(p, i, _)| (p, i))
306    }
307
308    /// Select the highest priority queue with items.
309    #[must_use]
310    #[inline]
311    fn select_highest_priority(
312        &self,
313        queues: &HashMap<Priority, VecDeque<QueueEntry>>,
314    ) -> Option<Priority> {
315        for &priority in &[
316            Priority::Critical,
317            Priority::High,
318            Priority::Normal,
319            Priority::Low,
320        ] {
321            if let Some(queue) = queues.get(&priority) {
322                if !queue.is_empty() {
323                    return Some(priority);
324                }
325            }
326        }
327        None
328    }
329
330    /// Select priority using fair scheduling (weighted round-robin).
331    #[must_use]
332    #[inline]
333    fn select_fair_priority(
334        &self,
335        queues: &HashMap<Priority, VecDeque<QueueEntry>>,
336        last_service: &HashMap<Priority, Instant>,
337    ) -> Option<Priority> {
338        let mut candidates = Vec::new();
339
340        // Find non-empty queues
341        for &priority in &[
342            Priority::Critical,
343            Priority::High,
344            Priority::Normal,
345            Priority::Low,
346        ] {
347            if let Some(queue) = queues.get(&priority) {
348                if !queue.is_empty() {
349                    candidates.push(priority);
350                }
351            }
352        }
353
354        if candidates.is_empty() {
355            return None;
356        }
357
358        // Select based on time since last service and priority weight
359        candidates.into_iter().max_by_key(|&priority| {
360            let time_since = last_service
361                .get(&priority)
362                .map(|t| t.elapsed().as_millis() as u64)
363                .unwrap_or(0);
364            let weight = self
365                .config
366                .bandwidth_allocation
367                .get(&priority)
368                .copied()
369                .unwrap_or(1);
370            time_since * u64::from(weight)
371        })
372    }
373
374    /// Update SLA metrics for a completed request.
375    #[inline]
376    fn update_metrics(&self, priority: Priority, bytes: u64, queue_time_ms: u64) {
377        let mut metrics = self.metrics.lock().unwrap();
378        if let Some(m) = metrics.get_mut(&priority) {
379            m.total_requests += 1;
380            m.total_bytes += bytes;
381
382            // Update average queue time
383            let total_time = m.avg_queue_time_ms * (m.total_requests - 1) + queue_time_ms;
384            m.avg_queue_time_ms = total_time / m.total_requests;
385
386            // Update max queue time
387            if queue_time_ms > m.max_queue_time_ms {
388                m.max_queue_time_ms = queue_time_ms;
389            }
390
391            // Check SLA compliance
392            if let Some(&target) = self.config.sla_target_latency_ms.get(&priority) {
393                if queue_time_ms <= target {
394                    m.met_sla += 1;
395                } else {
396                    m.violated_sla += 1;
397                }
398            }
399        }
400    }
401
402    /// Get current queue depth for a priority level.
403    #[must_use]
404    #[inline]
405    pub fn queue_depth(&self, priority: Priority) -> usize {
406        self.queues
407            .lock()
408            .unwrap()
409            .get(&priority)
410            .map(|q| q.len())
411            .unwrap_or(0)
412    }
413
414    /// Get total queue depth across all priorities.
415    #[must_use]
416    #[inline]
417    pub fn total_queue_depth(&self) -> usize {
418        self.queues.lock().unwrap().values().map(|q| q.len()).sum()
419    }
420
421    /// Get SLA metrics for a priority level.
422    #[must_use]
423    #[inline]
424    pub fn get_sla_metrics(&self, priority: Priority) -> Option<SlaMetrics> {
425        self.metrics.lock().unwrap().get(&priority).cloned()
426    }
427
428    /// Get SLA metrics for all priority levels.
429    #[must_use]
430    #[inline]
431    pub fn get_all_sla_metrics(&self) -> HashMap<Priority, SlaMetrics> {
432        self.metrics.lock().unwrap().clone()
433    }
434
435    /// Reset all SLA metrics.
436    pub fn reset_metrics(&mut self) {
437        let mut metrics = self.metrics.lock().unwrap();
438        for m in metrics.values_mut() {
439            *m = SlaMetrics::default();
440        }
441    }
442
443    /// Check if any queue is near capacity (>80% full).
444    #[must_use]
445    #[inline]
446    pub fn is_near_capacity(&self) -> bool {
447        let queues = self.queues.lock().unwrap();
448        let threshold = (self.config.max_queue_size * 80) / 100;
449        queues.values().any(|q| q.len() > threshold)
450    }
451
452    /// Get overall SLA compliance rate across all priorities.
453    #[must_use]
454    #[inline]
455    pub fn overall_compliance_rate(&self) -> f64 {
456        let metrics = self.metrics.lock().unwrap();
457        let mut total_requests = 0u64;
458        let mut total_met = 0u64;
459
460        for m in metrics.values() {
461            total_requests += m.total_requests;
462            total_met += m.met_sla;
463        }
464
465        if total_requests == 0 {
466            return 1.0;
467        }
468        total_met as f64 / total_requests as f64
469    }
470
471    /// Update current resource pressure.
472    ///
473    /// This allows QoS to adapt behavior based on system resource availability.
474    pub fn update_resource_pressure(&mut self, pressure: ResourcePressure) {
475        let mut current = self.resource_pressure.lock().unwrap();
476        *current = pressure;
477    }
478
479    /// Get current resource pressure.
480    #[must_use]
481    pub fn get_resource_pressure(&self) -> ResourcePressure {
482        *self.resource_pressure.lock().unwrap()
483    }
484
485    /// Check if system is under high resource pressure.
486    ///
487    /// Returns true if overall resource pressure exceeds 0.80.
488    #[must_use]
489    #[inline]
490    pub fn is_under_high_pressure(&self) -> bool {
491        let pressure = self.resource_pressure.lock().unwrap();
492        pressure.overall_score() > 0.80
493    }
494
495    /// Get adaptive queue size limit based on resource pressure.
496    ///
497    /// Reduces queue size when under pressure to prevent resource exhaustion.
498    #[must_use]
499    #[inline]
500    pub fn adaptive_queue_limit(&self) -> usize {
501        let pressure = self.resource_pressure.lock().unwrap();
502        let pressure_score = pressure.overall_score();
503
504        if pressure_score > 0.90 {
505            // Severe pressure: reduce to 25% capacity
506            self.config.max_queue_size / 4
507        } else if pressure_score > 0.80 {
508            // High pressure: reduce to 50% capacity
509            self.config.max_queue_size / 2
510        } else if pressure_score > 0.70 {
511            // Moderate pressure: reduce to 75% capacity
512            (self.config.max_queue_size * 3) / 4
513        } else {
514            // Normal: full capacity
515            self.config.max_queue_size
516        }
517    }
518
519    /// Check if should throttle low-priority requests based on resource pressure.
520    ///
521    /// Returns true if resource pressure is high and priority is Low.
522    #[must_use]
523    #[inline]
524    pub fn should_throttle_priority(&self, priority: Priority) -> bool {
525        let pressure = self.resource_pressure.lock().unwrap();
526        let pressure_score = pressure.overall_score();
527
528        match priority {
529            Priority::Critical => false,             // Never throttle critical
530            Priority::High => pressure_score > 0.95, // Only under severe pressure
531            Priority::Normal => pressure_score > 0.85,
532            Priority::Low => pressure_score > 0.70,
533        }
534    }
535}
536
537#[cfg(test)]
538mod tests {
539    use super::*;
540    use std::time::Duration;
541
542    fn create_request(id: &str, priority: Priority) -> RequestInfo {
543        RequestInfo {
544            id: id.to_string(),
545            cid: format!("Qm{}", id),
546            size_bytes: 1024,
547            priority,
548            deadline_ms: None,
549        }
550    }
551
552    #[tokio::test]
553    async fn test_enqueue_dequeue() {
554        let mut qos = QosManager::new(QosConfig::default());
555
556        let req = create_request("test1", Priority::Normal);
557        assert!(qos.enqueue(req.clone()).await);
558        assert_eq!(qos.queue_depth(Priority::Normal), 1);
559
560        let dequeued = qos.dequeue().await;
561        assert!(dequeued.is_some());
562        assert_eq!(dequeued.unwrap().id, "test1");
563        assert_eq!(qos.queue_depth(Priority::Normal), 0);
564    }
565
566    #[tokio::test]
567    async fn test_strict_priority_ordering() {
568        let config = QosConfig {
569            strict_priority: true,
570            ..Default::default()
571        };
572        let mut qos = QosManager::new(config);
573
574        // Enqueue in reverse priority order
575        let _ = qos.enqueue(create_request("low", Priority::Low)).await;
576        let _ = qos
577            .enqueue(create_request("normal", Priority::Normal))
578            .await;
579        let _ = qos.enqueue(create_request("high", Priority::High)).await;
580        let _ = qos
581            .enqueue(create_request("critical", Priority::Critical))
582            .await;
583
584        // Should dequeue in priority order
585        assert_eq!(qos.dequeue().await.unwrap().id, "critical");
586        assert_eq!(qos.dequeue().await.unwrap().id, "high");
587        assert_eq!(qos.dequeue().await.unwrap().id, "normal");
588        assert_eq!(qos.dequeue().await.unwrap().id, "low");
589    }
590
591    #[tokio::test]
592    async fn test_queue_capacity() {
593        let config = QosConfig {
594            max_queue_size: 3,
595            ..Default::default()
596        };
597        let mut qos = QosManager::new(config);
598
599        assert!(qos.enqueue(create_request("1", Priority::Normal)).await);
600        assert!(qos.enqueue(create_request("2", Priority::Normal)).await);
601        assert!(qos.enqueue(create_request("3", Priority::Normal)).await);
602        assert!(!qos.enqueue(create_request("4", Priority::Normal)).await); // Should fail
603
604        assert_eq!(qos.queue_depth(Priority::Normal), 3);
605    }
606
607    #[tokio::test]
608    async fn test_sla_metrics() {
609        let mut qos = QosManager::new(QosConfig::default());
610
611        let req = create_request("test", Priority::High);
612        let _ = qos.enqueue(req).await;
613
614        // Small delay to simulate queue time
615        tokio::time::sleep(Duration::from_millis(10)).await;
616
617        let _ = qos.dequeue().await;
618
619        let metrics = qos.get_sla_metrics(Priority::High).unwrap();
620        assert_eq!(metrics.total_requests, 1);
621        assert!(metrics.avg_queue_time_ms >= 10);
622    }
623
624    #[tokio::test]
625    async fn test_sla_compliance() {
626        let mut config = QosConfig::default();
627        config.sla_target_latency_ms.insert(Priority::Normal, 1000);
628        let mut qos = QosManager::new(config);
629
630        // Enqueue and immediately dequeue (should meet SLA)
631        let _ = qos.enqueue(create_request("fast", Priority::Normal)).await;
632        let _ = qos.dequeue().await;
633
634        let metrics = qos.get_sla_metrics(Priority::Normal).unwrap();
635        assert_eq!(metrics.met_sla, 1);
636        assert_eq!(metrics.violated_sla, 0);
637        assert_eq!(metrics.compliance_rate(), 1.0);
638    }
639
640    #[tokio::test]
641    async fn test_total_queue_depth() {
642        let mut qos = QosManager::new(QosConfig::default());
643
644        let _ = qos.enqueue(create_request("1", Priority::Critical)).await;
645        let _ = qos.enqueue(create_request("2", Priority::High)).await;
646        let _ = qos.enqueue(create_request("3", Priority::Normal)).await;
647        let _ = qos.enqueue(create_request("4", Priority::Low)).await;
648
649        assert_eq!(qos.total_queue_depth(), 4);
650    }
651
652    #[tokio::test]
653    async fn test_near_capacity() {
654        let config = QosConfig {
655            max_queue_size: 10,
656            ..Default::default()
657        };
658        let mut qos = QosManager::new(config);
659
660        assert!(!qos.is_near_capacity());
661
662        // Fill to 85% (9 out of 10)
663        for i in 0..9 {
664            let _ = qos
665                .enqueue(create_request(&format!("{}", i), Priority::Normal))
666                .await;
667        }
668
669        assert!(qos.is_near_capacity());
670    }
671
672    #[tokio::test]
673    async fn test_reset_metrics() {
674        let mut qos = QosManager::new(QosConfig::default());
675
676        let _ = qos.enqueue(create_request("test", Priority::Normal)).await;
677        let _ = qos.dequeue().await;
678
679        let metrics = qos.get_sla_metrics(Priority::Normal).unwrap();
680        assert_eq!(metrics.total_requests, 1);
681
682        qos.reset_metrics();
683
684        let metrics = qos.get_sla_metrics(Priority::Normal).unwrap();
685        assert_eq!(metrics.total_requests, 0);
686    }
687
688    #[tokio::test]
689    async fn test_overall_compliance_rate() {
690        let mut qos = QosManager::new(QosConfig::default());
691
692        // Initially should be 1.0 (100%)
693        assert_eq!(qos.overall_compliance_rate(), 1.0);
694
695        // Process some requests
696        for priority in &[
697            Priority::Critical,
698            Priority::High,
699            Priority::Normal,
700            Priority::Low,
701        ] {
702            let _ = qos.enqueue(create_request("test", *priority)).await;
703            let _ = qos.dequeue().await;
704        }
705
706        // Should still be high since we dequeued immediately
707        assert!(qos.overall_compliance_rate() > 0.9);
708    }
709
710    #[tokio::test]
711    async fn test_priority_default() {
712        assert_eq!(Priority::default(), Priority::Normal);
713    }
714
715    #[tokio::test]
716    async fn test_priority_ordering() {
717        assert!(Priority::Critical > Priority::High);
718        assert!(Priority::High > Priority::Normal);
719        assert!(Priority::Normal > Priority::Low);
720    }
721
722    #[tokio::test]
723    async fn test_fair_scheduling() {
724        let config = QosConfig {
725            strict_priority: false,
726            ..Default::default()
727        };
728        let mut qos = QosManager::new(config);
729
730        // Enqueue multiple requests at different priorities
731        for _ in 0..3 {
732            let _ = qos.enqueue(create_request("low", Priority::Low)).await;
733            let _ = qos
734                .enqueue(create_request("normal", Priority::Normal))
735                .await;
736            let _ = qos.enqueue(create_request("high", Priority::High)).await;
737        }
738
739        // Fair scheduling should eventually serve all priorities
740        let mut served_priorities = std::collections::HashSet::new();
741        for _ in 0..9 {
742            if let Some(req) = qos.dequeue().await {
743                served_priorities.insert(req.priority);
744            }
745        }
746
747        // All priorities should have been served
748        assert_eq!(served_priorities.len(), 3);
749    }
750
751    #[tokio::test]
752    async fn test_resource_pressure_integration() {
753        let mut qos = QosManager::new(QosConfig::default());
754
755        // Initial pressure should be default (all zeros)
756        let initial_pressure = qos.get_resource_pressure();
757        assert!((initial_pressure.cpu_usage - 0.0).abs() < 0.01);
758
759        // Update with moderate pressure
760        let moderate_pressure = ResourcePressure {
761            cpu_usage: 0.60,
762            memory_usage: 0.70,
763            disk_usage: 0.65,
764            bandwidth_usage: 0.55,
765        };
766        qos.update_resource_pressure(moderate_pressure);
767
768        assert!(!qos.is_under_high_pressure());
769        assert_eq!(qos.adaptive_queue_limit(), qos.config.max_queue_size);
770    }
771
772    #[tokio::test]
773    async fn test_adaptive_queue_limit() {
774        let mut qos = QosManager::new(QosConfig::default());
775        let base_limit = qos.config.max_queue_size;
776
777        // High pressure: 50% capacity
778        qos.update_resource_pressure(ResourcePressure {
779            cpu_usage: 0.85,
780            memory_usage: 0.80,
781            disk_usage: 0.82,
782            bandwidth_usage: 0.78,
783        });
784        assert_eq!(qos.adaptive_queue_limit(), base_limit / 2);
785
786        // Severe pressure: 25% capacity
787        qos.update_resource_pressure(ResourcePressure {
788            cpu_usage: 0.95,
789            memory_usage: 0.92,
790            disk_usage: 0.90,
791            bandwidth_usage: 0.88,
792        });
793        assert_eq!(qos.adaptive_queue_limit(), base_limit / 4);
794    }
795
796    #[tokio::test]
797    async fn test_throttle_priority_based_on_pressure() {
798        let mut qos = QosManager::new(QosConfig::default());
799
800        // Low pressure: no throttling
801        qos.update_resource_pressure(ResourcePressure {
802            cpu_usage: 0.40,
803            memory_usage: 0.50,
804            disk_usage: 0.45,
805            bandwidth_usage: 0.35,
806        });
807
808        assert!(!qos.should_throttle_priority(Priority::Critical));
809        assert!(!qos.should_throttle_priority(Priority::High));
810        assert!(!qos.should_throttle_priority(Priority::Normal));
811        assert!(!qos.should_throttle_priority(Priority::Low));
812
813        // High pressure: throttle low and normal
814        qos.update_resource_pressure(ResourcePressure {
815            cpu_usage: 0.88,
816            memory_usage: 0.90,
817            disk_usage: 0.87,
818            bandwidth_usage: 0.85,
819        });
820
821        assert!(!qos.should_throttle_priority(Priority::Critical));
822        assert!(!qos.should_throttle_priority(Priority::High));
823        assert!(qos.should_throttle_priority(Priority::Normal));
824        assert!(qos.should_throttle_priority(Priority::Low));
825    }
826
827    #[tokio::test]
828    async fn test_high_pressure_detection() {
829        let mut qos = QosManager::new(QosConfig::default());
830
831        // Just below threshold
832        qos.update_resource_pressure(ResourcePressure {
833            cpu_usage: 0.75,
834            memory_usage: 0.78,
835            disk_usage: 0.72,
836            bandwidth_usage: 0.70,
837        });
838        assert!(!qos.is_under_high_pressure());
839
840        // Above threshold
841        qos.update_resource_pressure(ResourcePressure {
842            cpu_usage: 0.85,
843            memory_usage: 0.88,
844            disk_usage: 0.82,
845            bandwidth_usage: 0.80,
846        });
847        assert!(qos.is_under_high_pressure());
848    }
849}