use crate::degradation::ResourcePressure;
use std::{
collections::{HashMap, VecDeque},
sync::{Arc, Mutex},
time::Instant,
};
/// Service class assigned to a request.
///
/// The explicit discriminants make the derived ordering follow urgency:
/// `Critical > High > Normal > Low`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Priority {
    /// Highest service class.
    Critical = 4,
    /// Served after `Critical`.
    High = 3,
    /// The default service class.
    Normal = 2,
    /// Lowest service class.
    Low = 1,
}

impl Default for Priority {
    /// Returns [`Priority::Normal`].
    #[inline]
    fn default() -> Self {
        Priority::Normal
    }
}
/// Tunable parameters for the QoS manager.
#[derive(Debug, Clone)]
pub struct QosConfig {
/// Maximum number of entries each per-priority queue may hold; `enqueue`
/// rejects a request once its queue has reached this length.
pub max_queue_size: usize,
/// Per-priority weight. The fair scheduler multiplies it by the time elapsed
/// since that priority was last served.
pub bandwidth_allocation: HashMap<Priority, u32>,
/// When `true`, `dequeue` serves the highest non-empty priority; otherwise
/// the weighted fair selection is used.
pub strict_priority: bool,
/// Time slice in milliseconds.
/// NOTE(review): not read by any code visible in this file — confirm intended use.
pub time_slice_ms: u64,
/// Per-priority queue-time target in milliseconds, used to classify each
/// dequeued request as meeting or violating its SLA.
pub sla_target_latency_ms: HashMap<Priority, u64>,
}
impl Default for QosConfig {
#[inline]
fn default() -> Self {
let mut bandwidth_allocation = HashMap::new();
bandwidth_allocation.insert(Priority::Critical, 40);
bandwidth_allocation.insert(Priority::High, 30);
bandwidth_allocation.insert(Priority::Normal, 20);
bandwidth_allocation.insert(Priority::Low, 10);
let mut sla_target_latency_ms = HashMap::new();
sla_target_latency_ms.insert(Priority::Critical, 100);
sla_target_latency_ms.insert(Priority::High, 500);
sla_target_latency_ms.insert(Priority::Normal, 2000);
sla_target_latency_ms.insert(Priority::Low, 10000);
Self {
max_queue_size: 1000,
bandwidth_allocation,
strict_priority: true,
time_slice_ms: 100,
sla_target_latency_ms,
}
}
}
/// Description of a request submitted to the QoS queues.
#[derive(Debug, Clone)]
pub struct RequestInfo {
/// Request identifier supplied by the caller.
pub id: String,
/// Presumably the content identifier being requested — TODO confirm with callers.
pub cid: String,
/// Request size in bytes; added to the per-priority `total_bytes` metric on dequeue.
pub size_bytes: u64,
/// Selects the queue the request is placed in.
pub priority: Priority,
/// Optional deadline, compared against `crate::utils::current_timestamp_ms()`.
/// A request whose deadline is less than 100 away (or already past) is
/// dequeued ahead of normal scheduling.
pub deadline_ms: Option<i64>,
}
/// A queued request together with the moment it entered its queue.
#[derive(Debug, Clone)]
struct QueueEntry {
/// The request as submitted.
info: RequestInfo,
/// Time of enqueue; the elapsed time at dequeue is the measured queue time.
enqueued_at: Instant,
}
/// Running SLA statistics for one priority class.
#[derive(Debug, Clone, Default)]
pub struct SlaMetrics {
    /// Number of requests dequeued so far.
    pub total_requests: u64,
    /// Requests whose queue time was within the configured target.
    pub met_sla: u64,
    /// Requests whose queue time exceeded the configured target.
    pub violated_sla: u64,
    /// Integer running average of queue time, in milliseconds.
    pub avg_queue_time_ms: u64,
    /// Longest queue time observed, in milliseconds.
    pub max_queue_time_ms: u64,
    /// Sum of the sizes of all dequeued requests, in bytes.
    pub total_bytes: u64,
}

impl SlaMetrics {
    /// Fraction of requests that met their SLA (`met_sla / total_requests`).
    ///
    /// Returns `1.0` when no request has been recorded yet.
    #[must_use]
    #[inline]
    pub fn compliance_rate(&self) -> f64 {
        match self.total_requests {
            0 => 1.0,
            total => self.met_sla as f64 / total as f64,
        }
    }

    /// Complement of [`SlaMetrics::compliance_rate`].
    #[must_use]
    #[inline]
    pub fn violation_rate(&self) -> f64 {
        let compliant = self.compliance_rate();
        1.0 - compliant
    }
}
/// Priority-aware request queue with per-priority SLA accounting.
///
/// Each piece of mutable state sits behind its own `Arc<Mutex<..>>`.
pub struct QosManager {
/// Scheduling and SLA parameters supplied at construction.
config: QosConfig,
/// One FIFO queue per priority.
queues: Arc<Mutex<HashMap<Priority, VecDeque<QueueEntry>>>>,
/// Running SLA statistics per priority, updated on every dequeue.
metrics: Arc<Mutex<HashMap<Priority, SlaMetrics>>>,
/// When each priority was last served (initialised at construction time).
last_service: Arc<Mutex<HashMap<Priority, Instant>>>,
/// Most recently reported resource pressure.
resource_pressure: Arc<Mutex<ResourcePressure>>,
}
impl QosManager {
#[must_use]
pub fn new(config: QosConfig) -> Self {
let mut queues = HashMap::new();
let mut metrics = HashMap::new();
let mut last_service = HashMap::new();
for &priority in &[
Priority::Critical,
Priority::High,
Priority::Normal,
Priority::Low,
] {
queues.insert(priority, VecDeque::new());
metrics.insert(priority, SlaMetrics::default());
last_service.insert(priority, Instant::now());
}
Self {
config,
queues: Arc::new(Mutex::new(queues)),
metrics: Arc::new(Mutex::new(metrics)),
last_service: Arc::new(Mutex::new(last_service)),
resource_pressure: Arc::new(Mutex::new(ResourcePressure::default())),
}
}
/// Appends `request` to the queue matching its priority.
///
/// Returns `true` when the request was queued, and `false` when that queue
/// already holds `max_queue_size` entries or no queue exists for the priority.
///
/// # Panics
///
/// Panics if the queue mutex is poisoned.
#[must_use]
pub async fn enqueue(&mut self, request: RequestInfo) -> bool {
    let level = request.priority;
    // Stamp the entry before taking the lock so lock wait counts as queue time.
    let entry = QueueEntry {
        info: request,
        enqueued_at: Instant::now(),
    };

    let mut queues = self.queues.lock().unwrap();
    match queues.get_mut(&level) {
        Some(queue) if queue.len() < self.config.max_queue_size => {
            queue.push_back(entry);
            true
        }
        _ => false,
    }
}
/// Removes and returns the next request to serve, or `None` when every queue
/// is empty.
///
/// A request flagged by `find_urgent_deadline_request` is taken first, from
/// wherever it sits in its queue. Otherwise the front of the queue chosen by
/// strict-priority or fair selection (per `config.strict_priority`) is taken.
/// In both cases the priority's last-service time and SLA metrics are updated.
///
/// # Panics
///
/// Panics if any of the internal mutexes is poisoned.
#[must_use]
pub async fn dequeue(&mut self) -> Option<RequestInfo> {
    let mut queues = self.queues.lock().unwrap();
    let mut last_service = self.last_service.lock().unwrap();

    // Deadline-driven pick; yields nothing if the slot cannot be removed.
    let urgent_slot = self.find_urgent_deadline_request(&queues);
    let urgent_pick = urgent_slot.and_then(|(level, position)| {
        let entry = queues.get_mut(&level)?.remove(position)?;
        Some((level, entry))
    });

    let (level, entry) = match urgent_pick {
        Some(pick) => pick,
        None => {
            let level = if self.config.strict_priority {
                self.select_highest_priority(&queues)?
            } else {
                self.select_fair_priority(&queues, &last_service)?
            };
            let entry = queues.get_mut(&level)?.pop_front()?;
            (level, entry)
        }
    };

    // Shared bookkeeping for both selection paths.
    let waited_ms = entry.enqueued_at.elapsed().as_millis() as u64;
    last_service.insert(level, Instant::now());
    self.update_metrics(level, entry.info.size_bytes, waited_ms);
    Some(entry.info)
}
#[must_use]
#[inline]
fn find_urgent_deadline_request(
&self,
queues: &HashMap<Priority, VecDeque<QueueEntry>>,
) -> Option<(Priority, usize)> {
let now = crate::utils::current_timestamp_ms();
let mut most_urgent: Option<(Priority, usize, i64)> = None;
for (&priority, queue) in queues {
for (idx, entry) in queue.iter().enumerate() {
if let Some(deadline) = entry.info.deadline_ms {
let time_to_deadline = deadline - now;
if time_to_deadline < 100 {
if let Some((_, _, prev_urgency)) = most_urgent {
if time_to_deadline < prev_urgency {
most_urgent = Some((priority, idx, time_to_deadline));
}
} else {
most_urgent = Some((priority, idx, time_to_deadline));
}
}
}
}
}
most_urgent.map(|(p, i, _)| (p, i))
}
/// Returns the highest priority whose queue is non-empty, or `None` when all
/// queues are empty.
#[must_use]
#[inline]
fn select_highest_priority(
    &self,
    queues: &HashMap<Priority, VecDeque<QueueEntry>>,
) -> Option<Priority> {
    [
        Priority::Critical,
        Priority::High,
        Priority::Normal,
        Priority::Low,
    ]
    .iter()
    .copied()
    .find(|level| queues.get(level).map_or(false, |queue| !queue.is_empty()))
}
/// Picks the non-empty priority with the largest weighted wait, i.e.
/// `time since last service × bandwidth_allocation weight`.
///
/// Returns `None` when every queue is empty. A priority with no recorded
/// service time scores 0, and a priority with no configured weight uses 1.
///
/// Wait time is measured in nanoseconds: with whole milliseconds every score
/// collapses to 0 when requests are dequeued in quick succession. Equal
/// scores are resolved in favour of the higher priority; `max_by_key` alone
/// returns the last maximum, which here would be the lowest priority.
#[must_use]
#[inline]
fn select_fair_priority(
    &self,
    queues: &HashMap<Priority, VecDeque<QueueEntry>>,
    last_service: &HashMap<Priority, Instant>,
) -> Option<Priority> {
    [
        Priority::Critical,
        Priority::High,
        Priority::Normal,
        Priority::Low,
    ]
    .iter()
    .copied()
    .filter(|level| queues.get(level).map_or(false, |queue| !queue.is_empty()))
    .max_by_key(|&priority| {
        let waited_ns = last_service
            .get(&priority)
            .map_or(0, |served_at| served_at.elapsed().as_nanos());
        let weight = self
            .config
            .bandwidth_allocation
            .get(&priority)
            .copied()
            .unwrap_or(1);
        // The priority itself is the secondary key, so ties go to the higher class.
        (waited_ns.saturating_mul(u128::from(weight)), priority)
    })
}
/// Folds one served request into the statistics for `priority`.
///
/// Updates the request and byte counters, the integer running average and
/// maximum of queue time, and — when an SLA target is configured for the
/// priority — the met/violated counters. Does nothing if no metrics entry
/// exists for `priority`.
///
/// # Panics
///
/// Panics if the metrics mutex is poisoned.
#[inline]
fn update_metrics(&self, priority: Priority, bytes: u64, queue_time_ms: u64) {
    let mut all_metrics = self.metrics.lock().unwrap();
    let stats = match all_metrics.get_mut(&priority) {
        Some(stats) => stats,
        None => return,
    };

    let served_before = stats.total_requests;
    stats.total_requests += 1;
    stats.total_bytes += bytes;

    // Rebuild the previous sum from the stored average, then re-average.
    let total_time = stats.avg_queue_time_ms * served_before + queue_time_ms;
    stats.avg_queue_time_ms = total_time / stats.total_requests;
    stats.max_queue_time_ms = stats.max_queue_time_ms.max(queue_time_ms);

    match self.config.sla_target_latency_ms.get(&priority) {
        Some(&target) if queue_time_ms <= target => stats.met_sla += 1,
        Some(_) => stats.violated_sla += 1,
        None => {}
    }
}
/// Number of requests currently waiting at `priority` (0 if it has no queue).
///
/// # Panics
///
/// Panics if the queue mutex is poisoned.
#[must_use]
#[inline]
pub fn queue_depth(&self, priority: Priority) -> usize {
    let queues = self.queues.lock().unwrap();
    match queues.get(&priority) {
        Some(queue) => queue.len(),
        None => 0,
    }
}
/// Number of requests currently waiting across all priorities.
///
/// # Panics
///
/// Panics if the queue mutex is poisoned.
#[must_use]
#[inline]
pub fn total_queue_depth(&self) -> usize {
    let queues = self.queues.lock().unwrap();
    let mut depth = 0;
    for queue in queues.values() {
        depth += queue.len();
    }
    depth
}
#[must_use]
#[inline]
pub fn get_sla_metrics(&self, priority: Priority) -> Option<SlaMetrics> {
self.metrics.lock().unwrap().get(&priority).cloned()
}
/// Returns a snapshot of the SLA statistics for every priority.
///
/// # Panics
///
/// Panics if the metrics mutex is poisoned.
#[must_use]
#[inline]
pub fn get_all_sla_metrics(&self) -> HashMap<Priority, SlaMetrics> {
    let snapshot = self.metrics.lock().unwrap();
    HashMap::clone(&snapshot)
}
/// Resets the SLA statistics of every priority to their default (zero) values.
///
/// # Panics
///
/// Panics if the metrics mutex is poisoned.
pub fn reset_metrics(&mut self) {
    self.metrics
        .lock()
        .unwrap()
        .values_mut()
        .for_each(|slot| *slot = SlaMetrics::default());
}
/// Reports whether any single queue holds more than 80% of `max_queue_size`.
///
/// # Panics
///
/// Panics if the queue mutex is poisoned.
#[must_use]
#[inline]
pub fn is_near_capacity(&self) -> bool {
    let queues = self.queues.lock().unwrap();
    let threshold = (self.config.max_queue_size * 80) / 100;
    for queue in queues.values() {
        if queue.len() > threshold {
            return true;
        }
    }
    false
}
/// Fraction of all served requests, across priorities, that met their SLA.
///
/// Returns `1.0` when nothing has been served yet.
///
/// # Panics
///
/// Panics if the metrics mutex is poisoned.
#[must_use]
#[inline]
pub fn overall_compliance_rate(&self) -> f64 {
    let metrics = self.metrics.lock().unwrap();
    let (served, met) = metrics
        .values()
        .fold((0u64, 0u64), |(served, met), stats| {
            (served + stats.total_requests, met + stats.met_sla)
        });
    match served {
        0 => 1.0,
        _ => met as f64 / served as f64,
    }
}
/// Replaces the stored resource-pressure reading with `pressure`.
///
/// # Panics
///
/// Panics if the resource-pressure mutex is poisoned.
pub fn update_resource_pressure(&mut self, pressure: ResourcePressure) {
    *self.resource_pressure.lock().unwrap() = pressure;
}
/// Returns a copy of the most recently stored resource-pressure reading.
///
/// # Panics
///
/// Panics if the resource-pressure mutex is poisoned.
#[must_use]
pub fn get_resource_pressure(&self) -> ResourcePressure {
    let current = self.resource_pressure.lock().unwrap();
    *current
}
/// Reports whether the stored pressure's `overall_score()` exceeds 0.80.
///
/// # Panics
///
/// Panics if the resource-pressure mutex is poisoned.
#[must_use]
#[inline]
pub fn is_under_high_pressure(&self) -> bool {
    self.resource_pressure.lock().unwrap().overall_score() > 0.80
}
/// Queue limit scaled down according to the stored pressure's `overall_score()`.
///
/// Above 0.90 the limit is a quarter of `max_queue_size`, above 0.80 a half,
/// above 0.70 three quarters; otherwise the full `max_queue_size`.
///
/// # Panics
///
/// Panics if the resource-pressure mutex is poisoned.
#[must_use]
#[inline]
pub fn adaptive_queue_limit(&self) -> usize {
    let score = self.resource_pressure.lock().unwrap().overall_score();
    let base = self.config.max_queue_size;
    match score {
        s if s > 0.90 => base / 4,
        s if s > 0.80 => base / 2,
        s if s > 0.70 => (base * 3) / 4,
        _ => base,
    }
}
/// Reports whether requests of `priority` should be throttled under the
/// stored resource pressure.
///
/// `Critical` is never throttled. `High`, `Normal` and `Low` are throttled
/// once `overall_score()` exceeds 0.95, 0.85 and 0.70 respectively.
///
/// # Panics
///
/// Panics if the resource-pressure mutex is poisoned.
#[must_use]
#[inline]
pub fn should_throttle_priority(&self, priority: Priority) -> bool {
    let score = self.resource_pressure.lock().unwrap().overall_score();
    let cutoff = match priority {
        Priority::Critical => return false,
        Priority::High => 0.95,
        Priority::Normal => 0.85,
        Priority::Low => 0.70,
    };
    score > cutoff
}
}
// Unit tests for the QoS manager: queueing, scheduling order, SLA metrics and
// the resource-pressure helpers.
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
// Builds a 1024-byte request without a deadline; the cid is "Qm" + id.
fn create_request(id: &str, priority: Priority) -> RequestInfo {
RequestInfo {
id: id.to_string(),
cid: format!("Qm{}", id),
size_bytes: 1024,
priority,
deadline_ms: None,
}
}
// A single request can be queued and comes back out unchanged.
#[tokio::test]
async fn test_enqueue_dequeue() {
let mut qos = QosManager::new(QosConfig::default());
let req = create_request("test1", Priority::Normal);
assert!(qos.enqueue(req.clone()).await);
assert_eq!(qos.queue_depth(Priority::Normal), 1);
let dequeued = qos.dequeue().await;
assert!(dequeued.is_some());
assert_eq!(dequeued.unwrap().id, "test1");
assert_eq!(qos.queue_depth(Priority::Normal), 0);
}
// With strict priority, requests leave in Critical -> Low order regardless
// of the order they were queued in.
#[tokio::test]
async fn test_strict_priority_ordering() {
let config = QosConfig {
strict_priority: true,
..Default::default()
};
let mut qos = QosManager::new(config);
let _ = qos.enqueue(create_request("low", Priority::Low)).await;
let _ = qos
.enqueue(create_request("normal", Priority::Normal))
.await;
let _ = qos.enqueue(create_request("high", Priority::High)).await;
let _ = qos
.enqueue(create_request("critical", Priority::Critical))
.await;
assert_eq!(qos.dequeue().await.unwrap().id, "critical");
assert_eq!(qos.dequeue().await.unwrap().id, "high");
assert_eq!(qos.dequeue().await.unwrap().id, "normal");
assert_eq!(qos.dequeue().await.unwrap().id, "low");
}
// A queue rejects the request that would exceed max_queue_size.
#[tokio::test]
async fn test_queue_capacity() {
let config = QosConfig {
max_queue_size: 3,
..Default::default()
};
let mut qos = QosManager::new(config);
assert!(qos.enqueue(create_request("1", Priority::Normal)).await);
assert!(qos.enqueue(create_request("2", Priority::Normal)).await);
assert!(qos.enqueue(create_request("3", Priority::Normal)).await);
assert!(!qos.enqueue(create_request("4", Priority::Normal)).await);
assert_eq!(qos.queue_depth(Priority::Normal), 3);
}
// Queue time is measured from enqueue to dequeue; the 10 ms sleep sets the
// lower bound asserted on the average.
#[tokio::test]
async fn test_sla_metrics() {
let mut qos = QosManager::new(QosConfig::default());
let req = create_request("test", Priority::High);
let _ = qos.enqueue(req).await;
tokio::time::sleep(Duration::from_millis(10)).await;
let _ = qos.dequeue().await;
let metrics = qos.get_sla_metrics(Priority::High).unwrap();
assert_eq!(metrics.total_requests, 1);
assert!(metrics.avg_queue_time_ms >= 10);
}
// A request dequeued immediately is counted as meeting a 1000 ms target.
#[tokio::test]
async fn test_sla_compliance() {
let mut config = QosConfig::default();
config.sla_target_latency_ms.insert(Priority::Normal, 1000);
let mut qos = QosManager::new(config);
let _ = qos.enqueue(create_request("fast", Priority::Normal)).await;
let _ = qos.dequeue().await;
let metrics = qos.get_sla_metrics(Priority::Normal).unwrap();
assert_eq!(metrics.met_sla, 1);
assert_eq!(metrics.violated_sla, 0);
assert_eq!(metrics.compliance_rate(), 1.0);
}
// The total depth sums the depth of every priority queue.
#[tokio::test]
async fn test_total_queue_depth() {
let mut qos = QosManager::new(QosConfig::default());
let _ = qos.enqueue(create_request("1", Priority::Critical)).await;
let _ = qos.enqueue(create_request("2", Priority::High)).await;
let _ = qos.enqueue(create_request("3", Priority::Normal)).await;
let _ = qos.enqueue(create_request("4", Priority::Low)).await;
assert_eq!(qos.total_queue_depth(), 4);
}
// Nine entries in a ten-slot queue exceed the 80% threshold.
#[tokio::test]
async fn test_near_capacity() {
let config = QosConfig {
max_queue_size: 10,
..Default::default()
};
let mut qos = QosManager::new(config);
assert!(!qos.is_near_capacity());
for i in 0..9 {
let _ = qos
.enqueue(create_request(&format!("{}", i), Priority::Normal))
.await;
}
assert!(qos.is_near_capacity());
}
// reset_metrics clears previously recorded statistics.
#[tokio::test]
async fn test_reset_metrics() {
let mut qos = QosManager::new(QosConfig::default());
let _ = qos.enqueue(create_request("test", Priority::Normal)).await;
let _ = qos.dequeue().await;
let metrics = qos.get_sla_metrics(Priority::Normal).unwrap();
assert_eq!(metrics.total_requests, 1);
qos.reset_metrics();
let metrics = qos.get_sla_metrics(Priority::Normal).unwrap();
assert_eq!(metrics.total_requests, 0);
}
// The overall rate is 1.0 with no traffic and stays high when every
// priority is served immediately.
#[tokio::test]
async fn test_overall_compliance_rate() {
let mut qos = QosManager::new(QosConfig::default());
assert_eq!(qos.overall_compliance_rate(), 1.0);
for priority in &[
Priority::Critical,
Priority::High,
Priority::Normal,
Priority::Low,
] {
let _ = qos.enqueue(create_request("test", *priority)).await;
let _ = qos.dequeue().await;
}
assert!(qos.overall_compliance_rate() > 0.9);
}
// Priority defaults to Normal.
#[tokio::test]
async fn test_priority_default() {
assert_eq!(Priority::default(), Priority::Normal);
}
// The derived ordering ranks Critical highest and Low lowest.
#[tokio::test]
async fn test_priority_ordering() {
assert!(Priority::Critical > Priority::High);
assert!(Priority::High > Priority::Normal);
assert!(Priority::Normal > Priority::Low);
}
// In fair mode every queued priority is served by the time all nine
// requests have been dequeued; the exact order is not asserted.
#[tokio::test]
async fn test_fair_scheduling() {
let config = QosConfig {
strict_priority: false,
..Default::default()
};
let mut qos = QosManager::new(config);
for _ in 0..3 {
let _ = qos.enqueue(create_request("low", Priority::Low)).await;
let _ = qos
.enqueue(create_request("normal", Priority::Normal))
.await;
let _ = qos.enqueue(create_request("high", Priority::High)).await;
}
let mut served_priorities = std::collections::HashSet::new();
for _ in 0..9 {
if let Some(req) = qos.dequeue().await {
served_priorities.insert(req.priority);
}
}
assert_eq!(served_priorities.len(), 3);
}
// A fresh manager reports (near-)zero CPU pressure; moderate readings
// neither count as high pressure nor shrink the queue limit.
#[tokio::test]
async fn test_resource_pressure_integration() {
let mut qos = QosManager::new(QosConfig::default());
let initial_pressure = qos.get_resource_pressure();
assert!((initial_pressure.cpu_usage - 0.0).abs() < 0.01);
let moderate_pressure = ResourcePressure {
cpu_usage: 0.60,
memory_usage: 0.70,
disk_usage: 0.65,
bandwidth_usage: 0.55,
};
qos.update_resource_pressure(moderate_pressure);
assert!(!qos.is_under_high_pressure());
assert_eq!(qos.adaptive_queue_limit(), qos.config.max_queue_size);
}
// The first reading is expected to halve the limit and the second to
// quarter it.
#[tokio::test]
async fn test_adaptive_queue_limit() {
let mut qos = QosManager::new(QosConfig::default());
let base_limit = qos.config.max_queue_size;
qos.update_resource_pressure(ResourcePressure {
cpu_usage: 0.85,
memory_usage: 0.80,
disk_usage: 0.82,
bandwidth_usage: 0.78,
});
assert_eq!(qos.adaptive_queue_limit(), base_limit / 2);
qos.update_resource_pressure(ResourcePressure {
cpu_usage: 0.95,
memory_usage: 0.92,
disk_usage: 0.90,
bandwidth_usage: 0.88,
});
assert_eq!(qos.adaptive_queue_limit(), base_limit / 4);
}
// Nothing is throttled under the first reading; under the second only
// Normal and Low are.
#[tokio::test]
async fn test_throttle_priority_based_on_pressure() {
let mut qos = QosManager::new(QosConfig::default());
qos.update_resource_pressure(ResourcePressure {
cpu_usage: 0.40,
memory_usage: 0.50,
disk_usage: 0.45,
bandwidth_usage: 0.35,
});
assert!(!qos.should_throttle_priority(Priority::Critical));
assert!(!qos.should_throttle_priority(Priority::High));
assert!(!qos.should_throttle_priority(Priority::Normal));
assert!(!qos.should_throttle_priority(Priority::Low));
qos.update_resource_pressure(ResourcePressure {
cpu_usage: 0.88,
memory_usage: 0.90,
disk_usage: 0.87,
bandwidth_usage: 0.85,
});
assert!(!qos.should_throttle_priority(Priority::Critical));
assert!(!qos.should_throttle_priority(Priority::High));
assert!(qos.should_throttle_priority(Priority::Normal));
assert!(qos.should_throttle_priority(Priority::Low));
}
// The first reading is expected to stay below the high-pressure threshold,
// the second to exceed it.
#[tokio::test]
async fn test_high_pressure_detection() {
let mut qos = QosManager::new(QosConfig::default());
qos.update_resource_pressure(ResourcePressure {
cpu_usage: 0.75,
memory_usage: 0.78,
disk_usage: 0.72,
bandwidth_usage: 0.70,
});
assert!(!qos.is_under_high_pressure());
qos.update_resource_pressure(ResourcePressure {
cpu_usage: 0.85,
memory_usage: 0.88,
disk_usage: 0.82,
bandwidth_usage: 0.80,
});
assert!(qos.is_under_high_pressure());
}
}