chie_core/qos.rs

//! Quality of Service (QoS) for priority-based request handling.
//!
//! This module provides priority-based scheduling, bandwidth allocation, and SLA tracking
//! for content distribution requests. It ensures high-priority requests get preferential
//! treatment while maintaining fairness across different priority levels.
//!
//! # Example
//!
//! ```rust
//! use chie_core::qos::{QosManager, Priority, QosConfig, RequestInfo};
//!
//! #[tokio::main]
//! async fn main() {
//!     let config = QosConfig::default();
//!     let mut qos = QosManager::new(config);
//!
//!     // Enqueue a request with an explicit priority
//!     let req1 = RequestInfo {
//!         id: "req1".to_string(),
//!         cid: "QmCritical".to_string(),
//!         size_bytes: 1024 * 1024,
//!         priority: Priority::Critical,
//!         deadline_ms: None,
//!     };
//!     let _ = qos.enqueue(req1).await;
//!
//!     // Dequeue processes the highest priority first
//!     if let Some(next) = qos.dequeue().await {
//!         println!("Processing: {} (priority: {:?})", next.id, next.priority);
//!     }
//! }
//! ```

use crate::degradation::ResourcePressure;
use std::{
    collections::{HashMap, VecDeque},
    sync::{Arc, Mutex},
    time::Instant,
};

/// Request priority levels.
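///
/// A minimal sketch of how the levels compare (the `Ord` derive together with the
/// explicit discriminants makes `Critical` the largest value):
///
/// ```rust
/// use chie_core::qos::Priority;
///
/// assert!(Priority::Critical > Priority::High);
/// assert!(Priority::High > Priority::Normal);
/// assert_eq!(Priority::default(), Priority::Normal);
/// ```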
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Priority {
    /// Critical priority (highest) - reserved for system-critical operations.
    Critical = 4,
    /// High priority - time-sensitive user requests.
    High = 3,
    /// Normal priority - standard user requests.
    Normal = 2,
    /// Low priority - background operations, prefetching.
    Low = 1,
}

impl Default for Priority {
    #[inline]
    fn default() -> Self {
        Self::Normal
    }
}

/// Configuration for QoS manager.
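///
/// A minimal sketch of overriding the defaults (all fields shown are defined below):
///
/// ```rust
/// use chie_core::qos::{Priority, QosConfig};
///
/// let mut config = QosConfig::default();
/// // Switch from strict priority to weighted round-robin scheduling.
/// config.strict_priority = false;
/// // Tighten the SLA target for normal-priority requests to one second.
/// config.sla_target_latency_ms.insert(Priority::Normal, 1000);
/// assert_eq!(config.max_queue_size, 1000);
/// ```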
#[derive(Debug, Clone)]
pub struct QosConfig {
    /// Maximum queue size per priority level.
    pub max_queue_size: usize,
    /// Bandwidth allocation percentages by priority (must sum to 100).
    pub bandwidth_allocation: HashMap<Priority, u32>,
    /// Enable strict priority (higher priorities always go first).
    pub strict_priority: bool,
    /// Minimum time slice per priority in milliseconds (for fair scheduling).
    pub time_slice_ms: u64,
    /// SLA target latency in milliseconds per priority.
    pub sla_target_latency_ms: HashMap<Priority, u64>,
}

impl Default for QosConfig {
    #[inline]
    fn default() -> Self {
        let mut bandwidth_allocation = HashMap::new();
        bandwidth_allocation.insert(Priority::Critical, 40);
        bandwidth_allocation.insert(Priority::High, 30);
        bandwidth_allocation.insert(Priority::Normal, 20);
        bandwidth_allocation.insert(Priority::Low, 10);

        let mut sla_target_latency_ms = HashMap::new();
        sla_target_latency_ms.insert(Priority::Critical, 100);
        sla_target_latency_ms.insert(Priority::High, 500);
        sla_target_latency_ms.insert(Priority::Normal, 2000);
        sla_target_latency_ms.insert(Priority::Low, 10000);

        Self {
            max_queue_size: 1000,
            bandwidth_allocation,
            strict_priority: true,
            time_slice_ms: 100,
            sla_target_latency_ms,
        }
    }
}

/// Information about a queued request.
#[derive(Debug, Clone)]
pub struct RequestInfo {
    /// Unique request identifier.
    pub id: String,
    /// Content identifier.
    pub cid: String,
    /// Estimated size in bytes.
    pub size_bytes: u64,
    /// Request priority.
    pub priority: Priority,
    /// Optional deadline (absolute time in milliseconds since epoch).
    /// If set, requests approaching their deadline get a priority boost.
    pub deadline_ms: Option<i64>,
}

/// Internal queue entry with timing information.
#[derive(Debug, Clone)]
struct QueueEntry {
    info: RequestInfo,
    enqueued_at: Instant,
}

/// SLA metrics for a priority level.
#[derive(Debug, Clone, Default)]
pub struct SlaMetrics {
    /// Total requests processed.
    pub total_requests: u64,
    /// Requests that met the SLA target.
    pub met_sla: u64,
    /// Requests that violated the SLA target.
    pub violated_sla: u64,
    /// Average queue time in milliseconds.
    pub avg_queue_time_ms: u64,
    /// Maximum queue time in milliseconds.
    pub max_queue_time_ms: u64,
    /// Total bytes processed.
    pub total_bytes: u64,
}

impl SlaMetrics {
    /// Calculate SLA compliance rate (0.0 to 1.0).
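    ///
    /// A minimal sketch (an empty metrics set counts as fully compliant):
    ///
    /// ```rust
    /// use chie_core::qos::SlaMetrics;
    ///
    /// let m = SlaMetrics { total_requests: 10, met_sla: 9, ..Default::default() };
    /// assert!((m.compliance_rate() - 0.9).abs() < 1e-9);
    /// assert_eq!(SlaMetrics::default().compliance_rate(), 1.0);
    /// ```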
    #[must_use]
    #[inline]
    pub fn compliance_rate(&self) -> f64 {
        if self.total_requests == 0 {
            return 1.0;
        }
        self.met_sla as f64 / self.total_requests as f64
    }

    /// Calculate violation rate (0.0 to 1.0).
    #[must_use]
    #[inline]
    pub fn violation_rate(&self) -> f64 {
        1.0 - self.compliance_rate()
    }
}

/// Quality of Service manager for priority-based request handling.
pub struct QosManager {
    config: QosConfig,
    /// Queues per priority level.
    queues: Arc<Mutex<HashMap<Priority, VecDeque<QueueEntry>>>>,
    /// SLA metrics per priority level.
    metrics: Arc<Mutex<HashMap<Priority, SlaMetrics>>>,
    /// Last service time per priority (for fair scheduling).
    last_service: Arc<Mutex<HashMap<Priority, Instant>>>,
    /// Current resource pressure for adaptive behavior.
    resource_pressure: Arc<Mutex<ResourcePressure>>,
}

impl QosManager {
    /// Create a new QoS manager with the given configuration.
    #[must_use]
    pub fn new(config: QosConfig) -> Self {
        let mut queues = HashMap::new();
        let mut metrics = HashMap::new();
        let mut last_service = HashMap::new();

        for &priority in &[
            Priority::Critical,
            Priority::High,
            Priority::Normal,
            Priority::Low,
        ] {
            queues.insert(priority, VecDeque::new());
            metrics.insert(priority, SlaMetrics::default());
            last_service.insert(priority, Instant::now());
        }

        Self {
            config,
            queues: Arc::new(Mutex::new(queues)),
            metrics: Arc::new(Mutex::new(metrics)),
            last_service: Arc::new(Mutex::new(last_service)),
            resource_pressure: Arc::new(Mutex::new(ResourcePressure::default())),
        }
    }

    /// Enqueue a request with the specified priority.
    ///
    /// Returns `true` if the request was enqueued, `false` if the queue is full.
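    ///
    /// A minimal sketch of the back-pressure behaviour when a queue is full:
    ///
    /// ```rust
    /// use chie_core::qos::{Priority, QosConfig, QosManager, RequestInfo};
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let config = QosConfig { max_queue_size: 1, ..QosConfig::default() };
    ///     let mut qos = QosManager::new(config);
    ///
    ///     let request = |id: &str| RequestInfo {
    ///         id: id.to_string(),
    ///         cid: format!("Qm{}", id),
    ///         size_bytes: 1024,
    ///         priority: Priority::Normal,
    ///         deadline_ms: None,
    ///     };
    ///
    ///     assert!(qos.enqueue(request("first")).await);   // accepted
    ///     assert!(!qos.enqueue(request("second")).await); // queue full, rejected
    /// }
    /// ```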
    #[must_use]
    pub async fn enqueue(&mut self, request: RequestInfo) -> bool {
        let priority = request.priority;
        let entry = QueueEntry {
            info: request,
            enqueued_at: Instant::now(),
        };

        let mut queues = self.queues.lock().unwrap();
        if let Some(queue) = queues.get_mut(&priority) {
            if queue.len() >= self.config.max_queue_size {
                return false;
            }
            queue.push_back(entry);
            true
        } else {
            false
        }
    }

    /// Dequeue the next request based on priority and scheduling policy.
    ///
    /// Supports deadline scheduling: requests whose deadlines are approaching are served
    /// ahead of the normal priority ordering.
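    ///
    /// A minimal sketch of the default strict-priority ordering:
    ///
    /// ```rust
    /// use chie_core::qos::{Priority, QosConfig, QosManager, RequestInfo};
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let mut qos = QosManager::new(QosConfig::default());
    ///
    ///     for (id, priority) in [("bg", Priority::Low), ("urgent", Priority::Critical)] {
    ///         let _ = qos.enqueue(RequestInfo {
    ///             id: id.to_string(),
    ///             cid: format!("Qm{}", id),
    ///             size_bytes: 1024,
    ///             priority,
    ///             deadline_ms: None,
    ///         }).await;
    ///     }
    ///
    ///     // The critical request is served first even though it was enqueued last.
    ///     assert_eq!(qos.dequeue().await.unwrap().id, "urgent");
    ///     assert_eq!(qos.dequeue().await.unwrap().id, "bg");
    /// }
    /// ```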
    #[must_use]
    pub async fn dequeue(&mut self) -> Option<RequestInfo> {
        let mut queues = self.queues.lock().unwrap();
        let mut last_service = self.last_service.lock().unwrap();

        // First check for urgent deadline-critical requests
        if let Some((priority, index)) = self.find_urgent_deadline_request(&queues) {
            if let Some(queue) = queues.get_mut(&priority) {
                if let Some(entry) = queue.remove(index) {
                    let queue_time_ms = entry.enqueued_at.elapsed().as_millis() as u64;
                    last_service.insert(priority, Instant::now());
                    self.update_metrics(priority, entry.info.size_bytes, queue_time_ms);
                    return Some(entry.info);
                }
            }
        }

        // Otherwise, determine which priority to service next
        let priority = if self.config.strict_priority {
            // Strict priority: always serve highest priority first
            self.select_highest_priority(&queues)?
        } else {
            // Fair scheduling: weighted round-robin
            self.select_fair_priority(&queues, &last_service)?
        };

        // Dequeue from selected priority
        if let Some(queue) = queues.get_mut(&priority) {
            if let Some(entry) = queue.pop_front() {
                let queue_time_ms = entry.enqueued_at.elapsed().as_millis() as u64;

                // Update last service time
                last_service.insert(priority, Instant::now());

                // Update metrics
                self.update_metrics(priority, entry.info.size_bytes, queue_time_ms);

                return Some(entry.info);
            }
        }

        None
    }

    /// Find the most urgent request with an approaching deadline.
    ///
    /// Returns (priority, index) of the most urgent request, or None if no urgent requests.
    /// A request is considered urgent if its deadline is within 100ms.
    #[must_use]
    #[inline]
    fn find_urgent_deadline_request(
        &self,
        queues: &HashMap<Priority, VecDeque<QueueEntry>>,
    ) -> Option<(Priority, usize)> {
        let now = crate::utils::current_timestamp_ms();
        let mut most_urgent: Option<(Priority, usize, i64)> = None; // (priority, index, time_to_deadline)

        for (&priority, queue) in queues {
            for (idx, entry) in queue.iter().enumerate() {
                if let Some(deadline) = entry.info.deadline_ms {
                    let time_to_deadline = deadline - now;

                    // Consider urgent if deadline within next 100ms or already passed
                    if time_to_deadline < 100 {
                        // Update if this is more urgent (closer to/past deadline)
                        if let Some((_, _, prev_urgency)) = most_urgent {
                            if time_to_deadline < prev_urgency {
                                most_urgent = Some((priority, idx, time_to_deadline));
                            }
                        } else {
                            most_urgent = Some((priority, idx, time_to_deadline));
                        }
                    }
                }
            }
        }

        most_urgent.map(|(p, i, _)| (p, i))
    }

    /// Select the highest priority queue with items.
    #[must_use]
    #[inline]
    fn select_highest_priority(
        &self,
        queues: &HashMap<Priority, VecDeque<QueueEntry>>,
    ) -> Option<Priority> {
        for &priority in &[
            Priority::Critical,
            Priority::High,
            Priority::Normal,
            Priority::Low,
        ] {
            if let Some(queue) = queues.get(&priority) {
                if !queue.is_empty() {
                    return Some(priority);
                }
            }
        }
        None
    }

    /// Select priority using fair scheduling (weighted round-robin).
    #[must_use]
    #[inline]
    fn select_fair_priority(
        &self,
        queues: &HashMap<Priority, VecDeque<QueueEntry>>,
        last_service: &HashMap<Priority, Instant>,
    ) -> Option<Priority> {
        let mut candidates = Vec::new();

        // Find non-empty queues
        for &priority in &[
            Priority::Critical,
            Priority::High,
            Priority::Normal,
            Priority::Low,
        ] {
            if let Some(queue) = queues.get(&priority) {
                if !queue.is_empty() {
                    candidates.push(priority);
                }
            }
        }

        if candidates.is_empty() {
            return None;
        }

        // Select based on time since last service and priority weight
        candidates.into_iter().max_by_key(|&priority| {
            let time_since = last_service
                .get(&priority)
                .map(|t| t.elapsed().as_millis() as u64)
                .unwrap_or(0);
            let weight = self
                .config
                .bandwidth_allocation
                .get(&priority)
                .copied()
                .unwrap_or(1);
            time_since * u64::from(weight)
        })
    }

    /// Update SLA metrics for a completed request.
    #[inline]
    fn update_metrics(&self, priority: Priority, bytes: u64, queue_time_ms: u64) {
        let mut metrics = self.metrics.lock().unwrap();
        if let Some(m) = metrics.get_mut(&priority) {
            m.total_requests += 1;
            m.total_bytes += bytes;

            // Update average queue time
            let total_time = m.avg_queue_time_ms * (m.total_requests - 1) + queue_time_ms;
            m.avg_queue_time_ms = total_time / m.total_requests;

            // Update max queue time
            if queue_time_ms > m.max_queue_time_ms {
                m.max_queue_time_ms = queue_time_ms;
            }

            // Check SLA compliance
            if let Some(&target) = self.config.sla_target_latency_ms.get(&priority) {
                if queue_time_ms <= target {
                    m.met_sla += 1;
                } else {
                    m.violated_sla += 1;
                }
            }
        }
    }

    /// Get current queue depth for a priority level.
    #[must_use]
    #[inline]
    pub fn queue_depth(&self, priority: Priority) -> usize {
        self.queues
            .lock()
            .unwrap()
            .get(&priority)
            .map(|q| q.len())
            .unwrap_or(0)
    }

    /// Get total queue depth across all priorities.
    #[must_use]
    #[inline]
    pub fn total_queue_depth(&self) -> usize {
        self.queues.lock().unwrap().values().map(|q| q.len()).sum()
    }

    /// Get SLA metrics for a priority level.
    #[must_use]
    #[inline]
    pub fn get_sla_metrics(&self, priority: Priority) -> Option<SlaMetrics> {
        self.metrics.lock().unwrap().get(&priority).cloned()
    }

    /// Get SLA metrics for all priority levels.
    #[must_use]
    #[inline]
    pub fn get_all_sla_metrics(&self) -> HashMap<Priority, SlaMetrics> {
        self.metrics.lock().unwrap().clone()
    }

    /// Reset all SLA metrics.
    pub fn reset_metrics(&mut self) {
        let mut metrics = self.metrics.lock().unwrap();
        for m in metrics.values_mut() {
            *m = SlaMetrics::default();
        }
    }

    /// Check if any queue is near capacity (>80% full).
    #[must_use]
    #[inline]
    pub fn is_near_capacity(&self) -> bool {
        let queues = self.queues.lock().unwrap();
        let threshold = (self.config.max_queue_size * 80) / 100;
        queues.values().any(|q| q.len() > threshold)
    }

    /// Get overall SLA compliance rate across all priorities.
    #[must_use]
    #[inline]
    pub fn overall_compliance_rate(&self) -> f64 {
        let metrics = self.metrics.lock().unwrap();
        let mut total_requests = 0u64;
        let mut total_met = 0u64;

        for m in metrics.values() {
            total_requests += m.total_requests;
            total_met += m.met_sla;
        }

        if total_requests == 0 {
            return 1.0;
        }
        total_met as f64 / total_requests as f64
    }

    /// Update current resource pressure.
    ///
    /// This allows QoS to adapt behavior based on system resource availability.
    pub fn update_resource_pressure(&mut self, pressure: ResourcePressure) {
        let mut current = self.resource_pressure.lock().unwrap();
        *current = pressure;
    }

    /// Get current resource pressure.
    #[must_use]
    pub fn get_resource_pressure(&self) -> ResourcePressure {
        *self.resource_pressure.lock().unwrap()
    }

    /// Check if system is under high resource pressure.
    ///
    /// Returns true if overall resource pressure exceeds 0.80.
    #[must_use]
    #[inline]
    pub fn is_under_high_pressure(&self) -> bool {
        let pressure = self.resource_pressure.lock().unwrap();
        pressure.overall_score() > 0.80
    }

    /// Get adaptive queue size limit based on resource pressure.
    ///
    /// Reduces queue size when under pressure to prevent resource exhaustion.
    #[must_use]
    #[inline]
    pub fn adaptive_queue_limit(&self) -> usize {
        let pressure = self.resource_pressure.lock().unwrap();
        let pressure_score = pressure.overall_score();

        if pressure_score > 0.90 {
            // Severe pressure: reduce to 25% capacity
            self.config.max_queue_size / 4
        } else if pressure_score > 0.80 {
            // High pressure: reduce to 50% capacity
            self.config.max_queue_size / 2
        } else if pressure_score > 0.70 {
            // Moderate pressure: reduce to 75% capacity
            (self.config.max_queue_size * 3) / 4
        } else {
            // Normal: full capacity
            self.config.max_queue_size
        }
    }

    /// Check whether a request at the given priority should be throttled based on
    /// current resource pressure.
    ///
    /// Critical requests are never throttled; lower priorities are throttled at
    /// progressively lower pressure thresholds.
    #[must_use]
    #[inline]
    pub fn should_throttle_priority(&self, priority: Priority) -> bool {
        let pressure = self.resource_pressure.lock().unwrap();
        let pressure_score = pressure.overall_score();

        match priority {
            Priority::Critical => false,             // Never throttle critical
            Priority::High => pressure_score > 0.95, // Only under severe pressure
            Priority::Normal => pressure_score > 0.85,
            Priority::Low => pressure_score > 0.70,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    fn create_request(id: &str, priority: Priority) -> RequestInfo {
        RequestInfo {
            id: id.to_string(),
            cid: format!("Qm{}", id),
            size_bytes: 1024,
            priority,
            deadline_ms: None,
        }
    }

    #[tokio::test]
    async fn test_enqueue_dequeue() {
        let mut qos = QosManager::new(QosConfig::default());

        let req = create_request("test1", Priority::Normal);
        assert!(qos.enqueue(req.clone()).await);
        assert_eq!(qos.queue_depth(Priority::Normal), 1);

        let dequeued = qos.dequeue().await;
        assert!(dequeued.is_some());
        assert_eq!(dequeued.unwrap().id, "test1");
        assert_eq!(qos.queue_depth(Priority::Normal), 0);
    }

    #[tokio::test]
    async fn test_strict_priority_ordering() {
        let config = QosConfig {
            strict_priority: true,
            ..Default::default()
        };
        let mut qos = QosManager::new(config);

        // Enqueue in reverse priority order
        let _ = qos.enqueue(create_request("low", Priority::Low)).await;
        let _ = qos
            .enqueue(create_request("normal", Priority::Normal))
            .await;
        let _ = qos.enqueue(create_request("high", Priority::High)).await;
        let _ = qos
            .enqueue(create_request("critical", Priority::Critical))
            .await;

        // Should dequeue in priority order
        assert_eq!(qos.dequeue().await.unwrap().id, "critical");
        assert_eq!(qos.dequeue().await.unwrap().id, "high");
        assert_eq!(qos.dequeue().await.unwrap().id, "normal");
        assert_eq!(qos.dequeue().await.unwrap().id, "low");
    }

    #[tokio::test]
    async fn test_queue_capacity() {
        let config = QosConfig {
            max_queue_size: 3,
            ..Default::default()
        };
        let mut qos = QosManager::new(config);

        assert!(qos.enqueue(create_request("1", Priority::Normal)).await);
        assert!(qos.enqueue(create_request("2", Priority::Normal)).await);
        assert!(qos.enqueue(create_request("3", Priority::Normal)).await);
        assert!(!qos.enqueue(create_request("4", Priority::Normal)).await); // Should fail

        assert_eq!(qos.queue_depth(Priority::Normal), 3);
    }

    #[tokio::test]
    async fn test_sla_metrics() {
        let mut qos = QosManager::new(QosConfig::default());

        let req = create_request("test", Priority::High);
        let _ = qos.enqueue(req).await;

        // Small delay to simulate queue time
        tokio::time::sleep(Duration::from_millis(10)).await;

        let _ = qos.dequeue().await;

        let metrics = qos.get_sla_metrics(Priority::High).unwrap();
        assert_eq!(metrics.total_requests, 1);
        assert!(metrics.avg_queue_time_ms >= 10);
    }

    #[tokio::test]
    async fn test_sla_compliance() {
        let mut config = QosConfig::default();
        config.sla_target_latency_ms.insert(Priority::Normal, 1000);
        let mut qos = QosManager::new(config);

        // Enqueue and immediately dequeue (should meet SLA)
        let _ = qos.enqueue(create_request("fast", Priority::Normal)).await;
        let _ = qos.dequeue().await;

        let metrics = qos.get_sla_metrics(Priority::Normal).unwrap();
        assert_eq!(metrics.met_sla, 1);
        assert_eq!(metrics.violated_sla, 0);
        assert_eq!(metrics.compliance_rate(), 1.0);
    }

    #[tokio::test]
    async fn test_total_queue_depth() {
        let mut qos = QosManager::new(QosConfig::default());

        let _ = qos.enqueue(create_request("1", Priority::Critical)).await;
        let _ = qos.enqueue(create_request("2", Priority::High)).await;
        let _ = qos.enqueue(create_request("3", Priority::Normal)).await;
        let _ = qos.enqueue(create_request("4", Priority::Low)).await;

        assert_eq!(qos.total_queue_depth(), 4);
    }

    #[tokio::test]
    async fn test_near_capacity() {
        let config = QosConfig {
            max_queue_size: 10,
            ..Default::default()
        };
        let mut qos = QosManager::new(config);

        assert!(!qos.is_near_capacity());

        // Fill to 90% (9 out of 10)
        for i in 0..9 {
            let _ = qos
                .enqueue(create_request(&format!("{}", i), Priority::Normal))
                .await;
        }

        assert!(qos.is_near_capacity());
    }

    #[tokio::test]
    async fn test_reset_metrics() {
        let mut qos = QosManager::new(QosConfig::default());

        let _ = qos.enqueue(create_request("test", Priority::Normal)).await;
        let _ = qos.dequeue().await;

        let metrics = qos.get_sla_metrics(Priority::Normal).unwrap();
        assert_eq!(metrics.total_requests, 1);

        qos.reset_metrics();

        let metrics = qos.get_sla_metrics(Priority::Normal).unwrap();
        assert_eq!(metrics.total_requests, 0);
    }

    #[tokio::test]
    async fn test_overall_compliance_rate() {
        let mut qos = QosManager::new(QosConfig::default());

        // Initially should be 1.0 (100%)
        assert_eq!(qos.overall_compliance_rate(), 1.0);

        // Process some requests
        for priority in &[
            Priority::Critical,
            Priority::High,
            Priority::Normal,
            Priority::Low,
        ] {
            let _ = qos.enqueue(create_request("test", *priority)).await;
            let _ = qos.dequeue().await;
        }

        // Should still be high since we dequeued immediately
        assert!(qos.overall_compliance_rate() > 0.9);
    }

    #[tokio::test]
    async fn test_priority_default() {
        assert_eq!(Priority::default(), Priority::Normal);
    }

    #[tokio::test]
    async fn test_priority_ordering() {
        assert!(Priority::Critical > Priority::High);
        assert!(Priority::High > Priority::Normal);
        assert!(Priority::Normal > Priority::Low);
    }

    #[tokio::test]
    async fn test_fair_scheduling() {
        let config = QosConfig {
            strict_priority: false,
            ..Default::default()
        };
        let mut qos = QosManager::new(config);

        // Enqueue multiple requests at different priorities
        for _ in 0..3 {
            let _ = qos.enqueue(create_request("low", Priority::Low)).await;
            let _ = qos
                .enqueue(create_request("normal", Priority::Normal))
                .await;
            let _ = qos.enqueue(create_request("high", Priority::High)).await;
        }

        // Fair scheduling should eventually serve all priorities
        let mut served_priorities = std::collections::HashSet::new();
        for _ in 0..9 {
            if let Some(req) = qos.dequeue().await {
                served_priorities.insert(req.priority);
            }
        }

        // All priorities should have been served
        assert_eq!(served_priorities.len(), 3);
    }

    #[tokio::test]
    async fn test_resource_pressure_integration() {
        let mut qos = QosManager::new(QosConfig::default());

        // Initial pressure should be default (all zeros)
        let initial_pressure = qos.get_resource_pressure();
        assert!((initial_pressure.cpu_usage - 0.0).abs() < 0.01);

        // Update with moderate pressure
        let moderate_pressure = ResourcePressure {
            cpu_usage: 0.60,
            memory_usage: 0.70,
            disk_usage: 0.65,
            bandwidth_usage: 0.55,
        };
        qos.update_resource_pressure(moderate_pressure);

        assert!(!qos.is_under_high_pressure());
        assert_eq!(qos.adaptive_queue_limit(), qos.config.max_queue_size);
    }

    #[tokio::test]
    async fn test_adaptive_queue_limit() {
        let mut qos = QosManager::new(QosConfig::default());
        let base_limit = qos.config.max_queue_size;

        // High pressure: 50% capacity
        qos.update_resource_pressure(ResourcePressure {
            cpu_usage: 0.85,
            memory_usage: 0.80,
            disk_usage: 0.82,
            bandwidth_usage: 0.78,
        });
        assert_eq!(qos.adaptive_queue_limit(), base_limit / 2);

        // Severe pressure: 25% capacity
        qos.update_resource_pressure(ResourcePressure {
            cpu_usage: 0.95,
            memory_usage: 0.92,
            disk_usage: 0.90,
            bandwidth_usage: 0.88,
        });
        assert_eq!(qos.adaptive_queue_limit(), base_limit / 4);
    }

    #[tokio::test]
    async fn test_throttle_priority_based_on_pressure() {
        let mut qos = QosManager::new(QosConfig::default());

        // Low pressure: no throttling
        qos.update_resource_pressure(ResourcePressure {
            cpu_usage: 0.40,
            memory_usage: 0.50,
            disk_usage: 0.45,
            bandwidth_usage: 0.35,
        });

        assert!(!qos.should_throttle_priority(Priority::Critical));
        assert!(!qos.should_throttle_priority(Priority::High));
        assert!(!qos.should_throttle_priority(Priority::Normal));
        assert!(!qos.should_throttle_priority(Priority::Low));

        // High pressure: throttle low and normal
        qos.update_resource_pressure(ResourcePressure {
            cpu_usage: 0.88,
            memory_usage: 0.90,
            disk_usage: 0.87,
            bandwidth_usage: 0.85,
        });

        assert!(!qos.should_throttle_priority(Priority::Critical));
        assert!(!qos.should_throttle_priority(Priority::High));
        assert!(qos.should_throttle_priority(Priority::Normal));
        assert!(qos.should_throttle_priority(Priority::Low));
    }

    #[tokio::test]
    async fn test_high_pressure_detection() {
        let mut qos = QosManager::new(QosConfig::default());

        // Just below threshold
        qos.update_resource_pressure(ResourcePressure {
            cpu_usage: 0.75,
            memory_usage: 0.78,
            disk_usage: 0.72,
            bandwidth_usage: 0.70,
        });
        assert!(!qos.is_under_high_pressure());

        // Above threshold
        qos.update_resource_pressure(ResourcePressure {
            cpu_usage: 0.85,
            memory_usage: 0.88,
            disk_usage: 0.82,
            bandwidth_usage: 0.80,
        });
        assert!(qos.is_under_high_pressure());
    }
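
    // Hedged sketch of a deadline-scheduling test. It relies on
    // `crate::utils::current_timestamp_ms()` (the same clock used by
    // `find_urgent_deadline_request`) returning milliseconds since the Unix epoch.
    #[tokio::test]
    async fn test_deadline_urgency_overrides_priority() {
        let mut qos = QosManager::new(QosConfig::default());

        // A low-priority request whose deadline is only ~50ms away...
        let mut urgent = create_request("deadline", Priority::Low);
        urgent.deadline_ms = Some(crate::utils::current_timestamp_ms() + 50);

        let _ = qos
            .enqueue(create_request("critical", Priority::Critical))
            .await;
        let _ = qos.enqueue(urgent).await;

        // ...is served ahead of the critical request despite its lower priority.
        assert_eq!(qos.dequeue().await.unwrap().id, "deadline");
        assert_eq!(qos.dequeue().await.unwrap().id, "critical");
    }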
}