use std::collections::VecDeque;
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use parking_lot::{Mutex, RwLock};
use tokio::sync::Notify;
use crate::batching::BatchPriority;
/// Number of distinct priority levels; arrays in this module are indexed by
/// `BatchPriority::as_level()` (0 = background … 3 = critical).
pub const NUM_PRIORITY_LEVELS: usize = 4;
/// Tunables for [`RequestQueue`].
#[derive(Debug, Clone)]
pub struct QueueConfig {
    /// Hard cap on total queued requests across all priority levels.
    pub max_queue_depth: usize,
    /// Cap on queued requests within any single priority level.
    pub max_per_priority: usize,
    /// WFQ weights indexed by priority level (background, normal, high, critical).
    pub priority_weights: [f32; NUM_PRIORITY_LEVELS],
    /// Wait beyond this duration makes a background/normal request eligible
    /// for starvation promotion.
    pub starvation_timeout: Duration,
    /// Priority assigned to a request rescued from starvation.
    pub starvation_promotion_priority: BatchPriority,
    /// Use weighted fair queuing on dequeue; strict priority order when false.
    pub enable_wfq: bool,
    /// NOTE(review): not consulted anywhere in this file — the
    /// `WouldExceedMaxWait` rejection is never produced. Confirm whether
    /// enforcement lives elsewhere or is still pending.
    pub max_wait_time: Duration,
}
impl Default for QueueConfig {
fn default() -> Self {
Self {
max_queue_depth: 1000,
max_per_priority: 500,
priority_weights: [1.0, 1.0, 2.0, 4.0],
starvation_timeout: Duration::from_secs(30),
starvation_promotion_priority: BatchPriority::High,
enable_wfq: true,
max_wait_time: Duration::from_secs(60),
}
}
}
impl QueueConfig {
    /// Equivalent to [`QueueConfig::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the global depth cap across all priority levels.
    pub fn with_max_queue_depth(mut self, max_depth: usize) -> Self {
        self.max_queue_depth = max_depth;
        self
    }

    /// Sets the depth cap for each individual priority level.
    pub fn with_max_per_priority(mut self, per_priority: usize) -> Self {
        self.max_per_priority = per_priority;
        self
    }

    /// Replaces the WFQ weight table (indexed by priority level).
    pub fn with_priority_weights(mut self, table: [f32; NUM_PRIORITY_LEVELS]) -> Self {
        self.priority_weights = table;
        self
    }

    /// Sets how long a request may wait before starvation promotion.
    pub fn with_starvation_timeout(mut self, limit: Duration) -> Self {
        self.starvation_timeout = limit;
        self
    }

    /// Toggles weighted fair queuing (strict priority order when disabled).
    pub fn with_wfq(mut self, on: bool) -> Self {
        self.enable_wfq = on;
        self
    }

    /// Sets the maximum tolerated wait budget.
    pub fn with_max_wait_time(mut self, limit: Duration) -> Self {
        self.max_wait_time = limit;
        self
    }

    /// Looks up the WFQ weight configured for `priority`.
    pub fn weight_for(&self, priority: BatchPriority) -> f32 {
        let level = priority.as_level() as usize;
        self.priority_weights[level]
    }
}
/// A request waiting in the queue: scheduling metadata plus an opaque payload.
#[derive(Debug)]
pub struct QueuedRequest {
    /// Caller-supplied identifier (used by `RequestQueue::remove`).
    pub id: String,
    /// Effective priority; may be raised by starvation promotion.
    pub priority: BatchPriority,
    /// Priority at construction time, preserved across promotion.
    pub original_priority: BatchPriority,
    /// Set when the request object is constructed (not when it enters a
    /// queue); `wait_time` measures from this instant.
    pub enqueued_at: Instant,
    /// Token estimate; doubles as the WFQ dequeue cost (minimum 1).
    pub estimated_tokens: usize,
    /// Target model name.
    pub model: String,
    /// Whether the response may be streamed.
    pub streamable: bool,
    /// Opaque request body; access via `payload`/`take_payload`.
    payload: Vec<u8>,
}
impl QueuedRequest {
    /// Builds a request with the given identity, priority, target model and
    /// token estimate. The wait-time clock starts immediately; payload is
    /// empty and streaming is off until set via the builder methods.
    pub fn new(
        id: impl Into<String>,
        priority: BatchPriority,
        model: impl Into<String>,
        estimated_tokens: usize,
    ) -> Self {
        Self {
            id: id.into(),
            model: model.into(),
            priority,
            original_priority: priority,
            enqueued_at: Instant::now(),
            estimated_tokens,
            streamable: false,
            payload: Vec::new(),
        }
    }

    /// Attaches the opaque request body (builder style).
    pub fn with_payload(mut self, body: Vec<u8>) -> Self {
        self.payload = body;
        self
    }

    /// Marks whether the response may be streamed (builder style).
    pub fn with_streamable(mut self, on: bool) -> Self {
        self.streamable = on;
        self
    }

    /// Borrows the payload bytes.
    pub fn payload(&self) -> &[u8] {
        self.payload.as_slice()
    }

    /// Moves the payload out, leaving an empty buffer behind.
    pub fn take_payload(&mut self) -> Vec<u8> {
        std::mem::take(&mut self.payload)
    }

    /// Time elapsed since this request was constructed.
    pub fn wait_time(&self) -> Duration {
        Instant::now().duration_since(self.enqueued_at)
    }

    /// True once the request has waited strictly longer than `timeout`.
    pub fn is_starving(&self, timeout: Duration) -> bool {
        timeout < self.wait_time()
    }

    /// Raises the effective priority. Demotion attempts are ignored and
    /// `original_priority` is left untouched.
    pub fn promote(&mut self, new_priority: BatchPriority) {
        if self.priority < new_priority {
            self.priority = new_priority;
        }
    }
}
/// Reasons an `enqueue` attempt can be rejected.
#[derive(Debug, Clone)]
pub enum QueueError {
    /// Total depth across all priorities reached `max_queue_depth`.
    QueueFull {
        current: usize,
        max: usize,
    },
    /// A single priority level reached `max_per_priority`.
    PriorityQueueFull {
        priority: BatchPriority,
        current: usize,
        max: usize,
    },
    /// Predicted wait would exceed the configured maximum.
    /// NOTE(review): never constructed in this file — confirm whether wait
    /// estimation is implemented elsewhere or still pending.
    WouldExceedMaxWait {
        estimated_wait: Duration,
        max_wait: Duration,
    },
    /// Queue is draining or closed; no new requests are accepted.
    ShuttingDown,
}
impl fmt::Display for QueueError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::QueueFull { current, max } => {
write!(f, "Queue full: {}/{}", current, max)
},
Self::PriorityQueueFull {
priority,
current,
max,
} => {
write!(f, "Priority queue {} full: {}/{}", priority, current, max)
},
Self::WouldExceedMaxWait {
estimated_wait,
max_wait,
} => {
write!(
f,
"Would exceed max wait: {:?} > {:?}",
estimated_wait, max_wait
)
},
Self::ShuttingDown => write!(f, "Queue is shutting down"),
}
}
}
impl std::error::Error for QueueError {}
/// Lifecycle state of a [`RequestQueue`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueState {
    /// Accepting enqueues and serving dequeues.
    Open,
    /// Rejecting new enqueues; existing requests may still be dequeued.
    Draining,
    /// Terminal state; all pending requests were drained by `close`.
    Closed,
}
impl fmt::Display for QueueState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Open => write!(f, "open"),
Self::Draining => write!(f, "draining"),
Self::Closed => write!(f, "closed"),
}
}
}
/// Multi-priority request queue with optional weighted-fair dequeuing and
/// starvation promotion for long-waiting low-priority requests.
pub struct RequestQueue {
    /// Immutable configuration captured at construction.
    config: QueueConfig,
    /// Lifecycle state gating enqueue admission.
    state: RwLock<QueueState>,
    /// One FIFO per priority level, indexed by `BatchPriority::as_level()`.
    queues: [Mutex<VecDeque<QueuedRequest>>; NUM_PRIORITY_LEVELS],
    /// WFQ credit counters, replenished on each WFQ dequeue attempt.
    credits: [AtomicU64; NUM_PRIORITY_LEVELS],
    /// Counters exposed via `metrics()` / `stats()`.
    metrics: QueueMetrics,
    /// Signalled once per successful enqueue.
    notify_enqueue: Notify,
    /// Signalled when a dequeue/removal frees capacity, and broadcast on close.
    notify_dequeue: Notify,
}
impl RequestQueue {
    /// Creates a queue in the [`QueueState::Open`] state with empty
    /// per-priority queues and zeroed WFQ credits.
    pub fn new(config: QueueConfig) -> Self {
        Self {
            config,
            state: RwLock::new(QueueState::Open),
            queues: [
                Mutex::new(VecDeque::new()),
                Mutex::new(VecDeque::new()),
                Mutex::new(VecDeque::new()),
                Mutex::new(VecDeque::new()),
            ],
            credits: [
                AtomicU64::new(0),
                AtomicU64::new(0),
                AtomicU64::new(0),
                AtomicU64::new(0),
            ],
            metrics: QueueMetrics::new(),
            notify_enqueue: Notify::new(),
            notify_dequeue: Notify::new(),
        }
    }

    /// Returns the configuration this queue was built with.
    pub fn config(&self) -> &QueueConfig {
        &self.config
    }

    /// Current lifecycle state.
    pub fn state(&self) -> QueueState {
        *self.state.read()
    }

    /// Live metrics counters.
    pub fn metrics(&self) -> &QueueMetrics {
        &self.metrics
    }

    /// Total queued requests across all priorities. Snapshot only: each
    /// per-priority lock is taken in turn, so the sum may be stale under
    /// concurrent enqueue/dequeue.
    pub fn len(&self) -> usize {
        self.queues.iter().map(|q| q.lock().len()).sum()
    }

    /// True when every priority queue is empty (snapshot, as with `len`).
    pub fn is_empty(&self) -> bool {
        self.queues.iter().all(|q| q.lock().is_empty())
    }

    /// Number of queued requests at one priority level.
    pub fn len_at_priority(&self, priority: BatchPriority) -> usize {
        self.queues[priority.as_level() as usize].lock().len()
    }

    /// Admits a request, or rejects it when the queue is not open, the
    /// global depth cap is hit, or the per-priority cap is hit.
    ///
    /// # Errors
    /// [`QueueError::ShuttingDown`], [`QueueError::QueueFull`], or
    /// [`QueueError::PriorityQueueFull`].
    pub fn enqueue(&self, request: QueuedRequest) -> Result<(), QueueError> {
        if *self.state.read() != QueueState::Open {
            return Err(QueueError::ShuttingDown);
        }
        let priority = request.priority;
        let priority_idx = priority.as_level() as usize;
        // Global cap: inherently approximate (other queues are not locked),
        // matching the snapshot semantics of `len()`.
        let total_depth = self.len();
        if total_depth >= self.config.max_queue_depth {
            self.metrics.record_rejected(priority);
            return Err(QueueError::QueueFull {
                current: total_depth,
                max: self.config.max_queue_depth,
            });
        }
        // Per-priority cap: hold the lock across both the check and the
        // push so two concurrent enqueues cannot both pass the check and
        // overshoot `max_per_priority` (the original re-locked between the
        // two steps).
        let mut queue = self.queues[priority_idx].lock();
        let priority_depth = queue.len();
        if priority_depth >= self.config.max_per_priority {
            drop(queue);
            self.metrics.record_rejected(priority);
            return Err(QueueError::PriorityQueueFull {
                priority,
                current: priority_depth,
                max: self.config.max_per_priority,
            });
        }
        queue.push_back(request);
        drop(queue);
        self.metrics.record_enqueued(priority);
        self.notify_enqueue.notify_one();
        Ok(())
    }

    /// Removes the next request. Starving background/normal requests are
    /// rescued first; otherwise dispatch follows WFQ or strict priority
    /// order depending on `config.enable_wfq`.
    pub fn dequeue(&self) -> Option<QueuedRequest> {
        if let Some(request) = self.dequeue_starving() {
            self.metrics.record_starvation_promotion();
            return Some(request);
        }
        if self.config.enable_wfq {
            self.dequeue_wfq()
        } else {
            self.dequeue_priority()
        }
    }

    /// Strict priority dispatch: first request found scanning from
    /// Critical down to Background.
    fn dequeue_priority(&self) -> Option<QueuedRequest> {
        for priority in [
            BatchPriority::Critical,
            BatchPriority::High,
            BatchPriority::Normal,
            BatchPriority::Background,
        ] {
            let idx = priority.as_level() as usize;
            let mut queue = self.queues[idx].lock();
            if let Some(request) = queue.pop_front() {
                drop(queue);
                self.metrics.record_dequeued(priority);
                self.metrics.record_wait_time(request.wait_time());
                self.notify_dequeue.notify_one();
                return Some(request);
            }
        }
        None
    }

    /// Weighted-fair dispatch: each call replenishes per-priority credits
    /// in proportion to the configured weights, serves the non-empty level
    /// holding the most credit, and charges it the request's token cost.
    fn dequeue_wfq(&self) -> Option<QueuedRequest> {
        for idx in 0..NUM_PRIORITY_LEVELS {
            if self.queues[idx].lock().is_empty() {
                // Reset credit for idle levels so a long-idle priority
                // cannot bank unbounded credit (eventually overflowing the
                // u64) and then monopolize dispatch once traffic arrives.
                self.credits[idx].store(0, Ordering::Relaxed);
            } else {
                let replenish = (self.config.priority_weights[idx] * 100.0) as u64;
                self.credits[idx].fetch_add(replenish, Ordering::Relaxed);
            }
        }
        // Pick the non-empty level with the most credit. Scanning from the
        // highest level down makes ties favor higher priority, and tracking
        // an Option (instead of requiring credits > 0) keeps a zero-weight
        // level eligible when it is the only non-empty queue — previously
        // such requests could never be dequeued via WFQ.
        let mut best: Option<(usize, u64)> = None;
        for idx in (0..NUM_PRIORITY_LEVELS).rev() {
            if self.queues[idx].lock().is_empty() {
                continue;
            }
            let credits = self.credits[idx].load(Ordering::Relaxed);
            if best.map_or(true, |(_, best_credits)| credits > best_credits) {
                best = Some((idx, credits));
            }
        }
        let (idx, credit_snapshot) = best?;
        // The queue may have been drained between the scan and this pop
        // (e.g. by `remove`/`close`); in that case give up like before.
        let request = self.queues[idx].lock().pop_front()?;
        // Charge at most the observed credit so the counter cannot underflow.
        let cost = (request.estimated_tokens.max(1) as u64).min(credit_snapshot);
        self.credits[idx].fetch_sub(cost, Ordering::Relaxed);
        let priority = BatchPriority::from_level(idx as u8);
        self.metrics.record_dequeued(priority);
        self.metrics.record_wait_time(request.wait_time());
        self.notify_dequeue.notify_one();
        Some(request)
    }

    /// Rescues the first background/normal request that has waited past
    /// `starvation_timeout`, promoting it to the configured priority.
    /// Dequeue metrics are recorded against the level it was pulled from.
    fn dequeue_starving(&self) -> Option<QueuedRequest> {
        for priority in [BatchPriority::Background, BatchPriority::Normal] {
            let idx = priority.as_level() as usize;
            let mut queue = self.queues[idx].lock();
            if let Some(pos) = queue
                .iter()
                .position(|r| r.is_starving(self.config.starvation_timeout))
            {
                if let Some(mut request) = queue.remove(pos) {
                    drop(queue);
                    request.promote(self.config.starvation_promotion_priority);
                    self.metrics.record_dequeued(priority);
                    self.metrics.record_wait_time(request.wait_time());
                    self.notify_dequeue.notify_one();
                    return Some(request);
                }
            }
        }
        None
    }

    /// Dequeues up to `n` requests, stopping early once the queue is empty.
    pub fn dequeue_batch(&self, n: usize) -> Vec<QueuedRequest> {
        let mut batch = Vec::with_capacity(n);
        for _ in 0..n {
            if let Some(request) = self.dequeue() {
                batch.push(request);
            } else {
                break;
            }
        }
        batch
    }

    /// Non-destructively reports what `dequeue` would likely serve next:
    /// a starving background/normal request if any, otherwise the head of
    /// the highest-priority non-empty queue.
    pub fn peek(&self) -> Option<PeekResult> {
        for priority in [BatchPriority::Background, BatchPriority::Normal] {
            let idx = priority.as_level() as usize;
            let queue = self.queues[idx].lock();
            if let Some(request) = queue
                .iter()
                .find(|r| r.is_starving(self.config.starvation_timeout))
            {
                return Some(PeekResult {
                    id: request.id.clone(),
                    priority: request.priority,
                    wait_time: request.wait_time(),
                    is_starving: true,
                });
            }
        }
        for priority in [
            BatchPriority::Critical,
            BatchPriority::High,
            BatchPriority::Normal,
            BatchPriority::Background,
        ] {
            let idx = priority.as_level() as usize;
            let queue = self.queues[idx].lock();
            if let Some(request) = queue.front() {
                return Some(PeekResult {
                    id: request.id.clone(),
                    priority: request.priority,
                    wait_time: request.wait_time(),
                    is_starving: false,
                });
            }
        }
        None
    }

    /// Removes a request by id from whichever priority queue holds it,
    /// recording a cancellation. Returns `None` when no request matches.
    pub fn remove(&self, request_id: &str) -> Option<QueuedRequest> {
        for (idx, queue_mutex) in self.queues.iter().enumerate() {
            let mut queue = queue_mutex.lock();
            if let Some(pos) = queue.iter().position(|r| r.id == request_id) {
                if let Some(request) = queue.remove(pos) {
                    drop(queue);
                    let priority = BatchPriority::from_level(idx as u8);
                    self.metrics.record_cancelled(priority);
                    self.notify_dequeue.notify_one();
                    return Some(request);
                }
            }
        }
        None
    }

    /// Stops admitting new requests while allowing existing ones to drain.
    pub fn start_drain(&self) {
        *self.state.write() = QueueState::Draining;
    }

    /// Closes the queue, returning every still-pending request and waking
    /// all tasks blocked in `wait_for_enqueue` / `wait_for_dequeue`.
    pub fn close(&self) -> Vec<QueuedRequest> {
        *self.state.write() = QueueState::Closed;
        let mut cancelled = Vec::new();
        for queue_mutex in &self.queues {
            let mut queue = queue_mutex.lock();
            cancelled.extend(queue.drain(..));
        }
        self.notify_enqueue.notify_waiters();
        self.notify_dequeue.notify_waiters();
        cancelled
    }

    /// Waits until the next enqueue notification.
    pub async fn wait_for_enqueue(&self) {
        self.notify_enqueue.notified().await;
    }

    /// Waits until the next dequeue/removal notification.
    pub async fn wait_for_dequeue(&self) {
        self.notify_dequeue.notified().await;
    }

    /// Snapshot of depths, oldest wait, and lifetime counters. Depths are
    /// read one lock at a time, so the snapshot is approximate under load.
    pub fn stats(&self) -> QueueStats {
        let depths: [usize; NUM_PRIORITY_LEVELS] = [
            self.queues[0].lock().len(),
            self.queues[1].lock().len(),
            self.queues[2].lock().len(),
            self.queues[3].lock().len(),
        ];
        let oldest_wait = self.oldest_wait_time();
        QueueStats {
            state: self.state(),
            total_depth: depths.iter().sum(),
            depths_by_priority: depths,
            oldest_wait_time: oldest_wait,
            enqueued: self.metrics.enqueued(),
            dequeued: self.metrics.dequeued(),
            rejected: self.metrics.rejected(),
            cancelled: self.metrics.cancelled(),
            starvation_promotions: self.metrics.starvation_promotions(),
        }
    }

    /// Longest wait among the queue heads. Since each queue is FIFO
    /// (push_back/pop_front), the front of each queue is its oldest entry.
    fn oldest_wait_time(&self) -> Option<Duration> {
        let mut oldest: Option<Duration> = None;
        for queue_mutex in &self.queues {
            let queue = queue_mutex.lock();
            if let Some(request) = queue.front() {
                let wait = request.wait_time();
                oldest = Some(oldest.map_or(wait, |o| o.max(wait)));
            }
        }
        oldest
    }
}
impl fmt::Debug for RequestQueue {
    /// Manual `Debug`: the queue holds non-`Debug` sync primitives, so only
    /// state, total depth, and config are shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("RequestQueue");
        builder.field("state", &self.state());
        builder.field("total_depth", &self.len());
        builder.field("config", &self.config);
        builder.finish()
    }
}
/// Snapshot of the request `RequestQueue::peek` expects to be served next.
#[derive(Debug, Clone)]
pub struct PeekResult {
    /// Identifier of the head request.
    pub id: String,
    /// Its current (possibly promoted) priority.
    pub priority: BatchPriority,
    /// How long it has been waiting.
    pub wait_time: Duration,
    /// True when it was found by the starvation scan.
    pub is_starving: bool,
}
/// Point-in-time snapshot returned by `RequestQueue::stats`.
#[derive(Debug, Clone)]
pub struct QueueStats {
    /// Lifecycle state at snapshot time.
    pub state: QueueState,
    /// Sum of `depths_by_priority`.
    pub total_depth: usize,
    /// Queue depth per priority level (index = `BatchPriority::as_level()`).
    pub depths_by_priority: [usize; NUM_PRIORITY_LEVELS],
    /// Longest head-of-queue wait, `None` when all queues are empty.
    pub oldest_wait_time: Option<Duration>,
    /// Lifetime enqueue count.
    pub enqueued: u64,
    /// Lifetime dequeue count.
    pub dequeued: u64,
    /// Lifetime rejection count.
    pub rejected: u64,
    /// Lifetime cancellation count (via `remove`).
    pub cancelled: u64,
    /// Lifetime starvation promotions.
    pub starvation_promotions: u64,
}
/// Lock-free lifetime counters for a [`RequestQueue`].
/// All counters use relaxed atomics: totals are eventually consistent and
/// individual reads may be slightly stale relative to each other.
#[derive(Debug)]
pub struct QueueMetrics {
    /// Enqueues per priority level.
    enqueued: [AtomicU64; NUM_PRIORITY_LEVELS],
    /// Dequeues per priority level.
    dequeued: [AtomicU64; NUM_PRIORITY_LEVELS],
    /// Rejections per priority level.
    rejected: [AtomicU64; NUM_PRIORITY_LEVELS],
    /// Cancellations per priority level.
    cancelled: [AtomicU64; NUM_PRIORITY_LEVELS],
    /// Requests rescued by the starvation scan.
    starvation_promotions: AtomicU64,
    /// Accumulated wait time in nanoseconds (truncated from u128).
    total_wait_ns: AtomicU64,
    /// Number of wait-time samples recorded.
    total_processed: AtomicU64,
}
impl QueueMetrics {
pub fn new() -> Self {
Self {
enqueued: [
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
],
dequeued: [
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
],
rejected: [
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
],
cancelled: [
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
AtomicU64::new(0),
],
starvation_promotions: AtomicU64::new(0),
total_wait_ns: AtomicU64::new(0),
total_processed: AtomicU64::new(0),
}
}
pub fn record_enqueued(&self, priority: BatchPriority) {
self.enqueued[priority.as_level() as usize].fetch_add(1, Ordering::Relaxed);
}
pub fn record_dequeued(&self, priority: BatchPriority) {
self.dequeued[priority.as_level() as usize].fetch_add(1, Ordering::Relaxed);
}
pub fn record_rejected(&self, priority: BatchPriority) {
self.rejected[priority.as_level() as usize].fetch_add(1, Ordering::Relaxed);
}
pub fn record_cancelled(&self, priority: BatchPriority) {
self.cancelled[priority.as_level() as usize].fetch_add(1, Ordering::Relaxed);
}
pub fn record_starvation_promotion(&self) {
self.starvation_promotions.fetch_add(1, Ordering::Relaxed);
}
pub fn record_wait_time(&self, wait: Duration) {
self.total_wait_ns
.fetch_add(wait.as_nanos() as u64, Ordering::Relaxed);
self.total_processed.fetch_add(1, Ordering::Relaxed);
}
pub fn enqueued(&self) -> u64 {
self.enqueued
.iter()
.map(|c| c.load(Ordering::Relaxed))
.sum()
}
pub fn enqueued_by_priority(&self, priority: BatchPriority) -> u64 {
self.enqueued[priority.as_level() as usize].load(Ordering::Relaxed)
}
pub fn dequeued(&self) -> u64 {
self.dequeued
.iter()
.map(|c| c.load(Ordering::Relaxed))
.sum()
}
pub fn dequeued_by_priority(&self, priority: BatchPriority) -> u64 {
self.dequeued[priority.as_level() as usize].load(Ordering::Relaxed)
}
pub fn rejected(&self) -> u64 {
self.rejected
.iter()
.map(|c| c.load(Ordering::Relaxed))
.sum()
}
pub fn rejected_by_priority(&self, priority: BatchPriority) -> u64 {
self.rejected[priority.as_level() as usize].load(Ordering::Relaxed)
}
pub fn cancelled(&self) -> u64 {
self.cancelled
.iter()
.map(|c| c.load(Ordering::Relaxed))
.sum()
}
pub fn starvation_promotions(&self) -> u64 {
self.starvation_promotions.load(Ordering::Relaxed)
}
pub fn avg_wait_time(&self) -> Duration {
let total_ns = self.total_wait_ns.load(Ordering::Relaxed);
let count = self.total_processed.load(Ordering::Relaxed);
if count > 0 {
Duration::from_nanos(total_ns / count)
} else {
Duration::ZERO
}
}
pub fn prometheus(&self) -> String {
let mut output = String::new();
let priorities = ["background", "normal", "high", "critical"];
output.push_str("# HELP infernum_queue_enqueued_total Requests enqueued\n");
output.push_str("# TYPE infernum_queue_enqueued_total counter\n");
for (idx, name) in priorities.iter().enumerate() {
output.push_str(&format!(
"infernum_queue_enqueued_total{{priority=\"{}\"}} {}\n",
name,
self.enqueued[idx].load(Ordering::Relaxed)
));
}
output.push_str("# HELP infernum_queue_dequeued_total Requests dequeued\n");
output.push_str("# TYPE infernum_queue_dequeued_total counter\n");
for (idx, name) in priorities.iter().enumerate() {
output.push_str(&format!(
"infernum_queue_dequeued_total{{priority=\"{}\"}} {}\n",
name,
self.dequeued[idx].load(Ordering::Relaxed)
));
}
output.push_str("# HELP infernum_queue_rejected_total Requests rejected\n");
output.push_str("# TYPE infernum_queue_rejected_total counter\n");
for (idx, name) in priorities.iter().enumerate() {
output.push_str(&format!(
"infernum_queue_rejected_total{{priority=\"{}\"}} {}\n",
name,
self.rejected[idx].load(Ordering::Relaxed)
));
}
output.push_str("# HELP infernum_queue_cancelled_total Requests cancelled\n");
output.push_str("# TYPE infernum_queue_cancelled_total counter\n");
output.push_str(&format!(
"infernum_queue_cancelled_total {}\n",
self.cancelled()
));
output
.push_str("# HELP infernum_queue_starvation_promotions_total Starvation promotions\n");
output.push_str("# TYPE infernum_queue_starvation_promotions_total counter\n");
output.push_str(&format!(
"infernum_queue_starvation_promotions_total {}\n",
self.starvation_promotions()
));
output.push_str("# HELP infernum_queue_avg_wait_seconds Average wait time\n");
output.push_str("# TYPE infernum_queue_avg_wait_seconds gauge\n");
output.push_str(&format!(
"infernum_queue_avg_wait_seconds {:.6}\n",
self.avg_wait_time().as_secs_f64()
));
output
}
}
impl Default for QueueMetrics {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // ---- QueueConfig ----

    #[test]
    fn test_queue_config_default() {
        let config = QueueConfig::default();
        assert_eq!(config.max_queue_depth, 1000);
        assert_eq!(config.max_per_priority, 500);
        assert_eq!(config.starvation_timeout, Duration::from_secs(30));
        assert!(config.enable_wfq);
    }

    #[test]
    fn test_queue_config_builder() {
        let config = QueueConfig::new()
            .with_max_queue_depth(500)
            .with_max_per_priority(100)
            .with_starvation_timeout(Duration::from_secs(60))
            .with_wfq(false);
        assert_eq!(config.max_queue_depth, 500);
        assert_eq!(config.max_per_priority, 100);
        assert_eq!(config.starvation_timeout, Duration::from_secs(60));
        assert!(!config.enable_wfq);
    }

    #[test]
    fn test_queue_config_weights() {
        // Default weights by level: background/normal 1.0, high 2.0, critical 4.0.
        let config = QueueConfig::default();
        assert_eq!(config.weight_for(BatchPriority::Background), 1.0);
        assert_eq!(config.weight_for(BatchPriority::Normal), 1.0);
        assert_eq!(config.weight_for(BatchPriority::High), 2.0);
        assert_eq!(config.weight_for(BatchPriority::Critical), 4.0);
    }

    // ---- QueuedRequest ----

    #[test]
    fn test_queued_request_new() {
        let request = QueuedRequest::new("req-1", BatchPriority::Normal, "llama", 100);
        assert_eq!(request.id, "req-1");
        assert_eq!(request.priority, BatchPriority::Normal);
        assert_eq!(request.estimated_tokens, 100);
        assert!(!request.streamable);
    }

    #[test]
    fn test_queued_request_with_payload() {
        let request = QueuedRequest::new("req-1", BatchPriority::High, "llama", 50)
            .with_payload(vec![1, 2, 3])
            .with_streamable(true);
        assert_eq!(request.payload(), &[1, 2, 3]);
        assert!(request.streamable);
    }

    #[test]
    fn test_queued_request_promote() {
        let mut request = QueuedRequest::new("req-1", BatchPriority::Normal, "llama", 100);
        request.promote(BatchPriority::High);
        assert_eq!(request.priority, BatchPriority::High);
        // The pre-promotion priority is preserved.
        assert_eq!(request.original_priority, BatchPriority::Normal);
        // Demotion attempts are ignored.
        request.promote(BatchPriority::Normal);
        assert_eq!(request.priority, BatchPriority::High);
    }

    // ---- Error / state display ----

    #[test]
    fn test_queue_error_display() {
        let err = QueueError::QueueFull {
            current: 100,
            max: 100,
        };
        assert!(err.to_string().contains("Queue full"));
        let err = QueueError::PriorityQueueFull {
            priority: BatchPriority::High,
            current: 50,
            max: 50,
        };
        assert!(err.to_string().contains("Priority queue"));
        let err = QueueError::ShuttingDown;
        assert!(err.to_string().contains("shutting down"));
    }

    #[test]
    fn test_queue_state_display() {
        assert_eq!(QueueState::Open.to_string(), "open");
        assert_eq!(QueueState::Draining.to_string(), "draining");
        assert_eq!(QueueState::Closed.to_string(), "closed");
    }

    // ---- RequestQueue basics ----

    #[test]
    fn test_request_queue_new() {
        let config = QueueConfig::default();
        let queue = RequestQueue::new(config);
        assert_eq!(queue.state(), QueueState::Open);
        assert!(queue.is_empty());
        assert_eq!(queue.len(), 0);
    }

    #[test]
    fn test_request_queue_enqueue_dequeue() {
        let config = QueueConfig::default();
        let queue = RequestQueue::new(config);
        let request = QueuedRequest::new("req-1", BatchPriority::Normal, "llama", 100);
        queue.enqueue(request).unwrap();
        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());
        let dequeued = queue.dequeue();
        assert!(dequeued.is_some());
        assert_eq!(dequeued.unwrap().id, "req-1");
        assert!(queue.is_empty());
    }

    #[test]
    fn test_request_queue_priority_ordering() {
        // Strict priority mode (WFQ off) drains Critical -> Background.
        let config = QueueConfig::new().with_wfq(false);
        let queue = RequestQueue::new(config);
        queue
            .enqueue(QueuedRequest::new(
                "low",
                BatchPriority::Background,
                "m",
                10,
            ))
            .unwrap();
        queue
            .enqueue(QueuedRequest::new("normal", BatchPriority::Normal, "m", 10))
            .unwrap();
        queue
            .enqueue(QueuedRequest::new("high", BatchPriority::High, "m", 10))
            .unwrap();
        queue
            .enqueue(QueuedRequest::new(
                "critical",
                BatchPriority::Critical,
                "m",
                10,
            ))
            .unwrap();
        assert_eq!(queue.dequeue().unwrap().id, "critical");
        assert_eq!(queue.dequeue().unwrap().id, "high");
        assert_eq!(queue.dequeue().unwrap().id, "normal");
        assert_eq!(queue.dequeue().unwrap().id, "low");
    }

    // ---- Admission caps ----

    #[test]
    fn test_request_queue_max_depth() {
        let config = QueueConfig::new().with_max_queue_depth(2);
        let queue = RequestQueue::new(config);
        queue
            .enqueue(QueuedRequest::new("req-1", BatchPriority::Normal, "m", 10))
            .unwrap();
        queue
            .enqueue(QueuedRequest::new("req-2", BatchPriority::Normal, "m", 10))
            .unwrap();
        // Third enqueue exceeds the global depth cap.
        let result = queue.enqueue(QueuedRequest::new("req-3", BatchPriority::Normal, "m", 10));
        assert!(matches!(result, Err(QueueError::QueueFull { .. })));
    }

    #[test]
    fn test_request_queue_max_per_priority() {
        let config = QueueConfig::new()
            .with_max_queue_depth(100)
            .with_max_per_priority(1);
        let queue = RequestQueue::new(config);
        queue
            .enqueue(QueuedRequest::new("req-1", BatchPriority::High, "m", 10))
            .unwrap();
        // Second High request exceeds the per-priority cap...
        let result = queue.enqueue(QueuedRequest::new("req-2", BatchPriority::High, "m", 10));
        assert!(matches!(result, Err(QueueError::PriorityQueueFull { .. })));
        // ...but a different priority level is still admitted.
        queue
            .enqueue(QueuedRequest::new("req-3", BatchPriority::Normal, "m", 10))
            .unwrap();
    }

    #[test]
    fn test_request_queue_len_at_priority() {
        let config = QueueConfig::default();
        let queue = RequestQueue::new(config);
        queue
            .enqueue(QueuedRequest::new("req-1", BatchPriority::High, "m", 10))
            .unwrap();
        queue
            .enqueue(QueuedRequest::new("req-2", BatchPriority::High, "m", 10))
            .unwrap();
        queue
            .enqueue(QueuedRequest::new("req-3", BatchPriority::Normal, "m", 10))
            .unwrap();
        assert_eq!(queue.len_at_priority(BatchPriority::High), 2);
        assert_eq!(queue.len_at_priority(BatchPriority::Normal), 1);
        assert_eq!(queue.len_at_priority(BatchPriority::Critical), 0);
    }

    #[test]
    fn test_request_queue_dequeue_batch() {
        let config = QueueConfig::default();
        let queue = RequestQueue::new(config);
        for i in 0..5 {
            queue
                .enqueue(QueuedRequest::new(
                    format!("req-{}", i),
                    BatchPriority::Normal,
                    "m",
                    10,
                ))
                .unwrap();
        }
        let batch = queue.dequeue_batch(3);
        assert_eq!(batch.len(), 3);
        assert_eq!(queue.len(), 2);
    }

    #[test]
    fn test_request_queue_peek() {
        let config = QueueConfig::default();
        let queue = RequestQueue::new(config);
        assert!(queue.peek().is_none());
        queue
            .enqueue(QueuedRequest::new("req-1", BatchPriority::Normal, "m", 10))
            .unwrap();
        let peek = queue.peek();
        assert!(peek.is_some());
        assert_eq!(peek.unwrap().id, "req-1");
        // Peek must not consume the request.
        assert_eq!(queue.len(), 1);
    }

    #[test]
    fn test_request_queue_remove() {
        let config = QueueConfig::default();
        let queue = RequestQueue::new(config);
        queue
            .enqueue(QueuedRequest::new("req-1", BatchPriority::Normal, "m", 10))
            .unwrap();
        queue
            .enqueue(QueuedRequest::new("req-2", BatchPriority::Normal, "m", 10))
            .unwrap();
        let removed = queue.remove("req-1");
        assert!(removed.is_some());
        assert_eq!(removed.unwrap().id, "req-1");
        assert_eq!(queue.len(), 1);
        // Unknown ids yield None.
        let removed = queue.remove("req-999");
        assert!(removed.is_none());
    }

    // ---- Lifecycle ----

    #[test]
    fn test_request_queue_drain_close() {
        let config = QueueConfig::default();
        let queue = RequestQueue::new(config);
        queue
            .enqueue(QueuedRequest::new("req-1", BatchPriority::Normal, "m", 10))
            .unwrap();
        queue.start_drain();
        assert_eq!(queue.state(), QueueState::Draining);
        // Draining rejects new requests but still serves pending ones.
        let result = queue.enqueue(QueuedRequest::new("req-2", BatchPriority::Normal, "m", 10));
        assert!(matches!(result, Err(QueueError::ShuttingDown)));
        assert!(queue.dequeue().is_some());
        queue
            .enqueue(QueuedRequest::new("req-3", BatchPriority::Normal, "m", 10))
            .unwrap_err();
        // Everything was drained already, so close returns no leftovers.
        let cancelled = queue.close();
        assert_eq!(queue.state(), QueueState::Closed);
        assert!(cancelled.is_empty());
    }

    #[test]
    fn test_request_queue_stats() {
        let config = QueueConfig::default();
        let queue = RequestQueue::new(config);
        queue
            .enqueue(QueuedRequest::new("req-1", BatchPriority::High, "m", 10))
            .unwrap();
        queue
            .enqueue(QueuedRequest::new("req-2", BatchPriority::Normal, "m", 10))
            .unwrap();
        let stats = queue.stats();
        assert_eq!(stats.state, QueueState::Open);
        assert_eq!(stats.total_depth, 2);
        // Depth indices follow priority levels: 2 = High, 1 = Normal.
        assert_eq!(stats.depths_by_priority[2], 1);
        assert_eq!(stats.depths_by_priority[1], 1);
    }

    // ---- QueueMetrics ----

    #[test]
    fn test_queue_metrics_new() {
        let metrics = QueueMetrics::new();
        assert_eq!(metrics.enqueued(), 0);
        assert_eq!(metrics.dequeued(), 0);
        assert_eq!(metrics.rejected(), 0);
    }

    #[test]
    fn test_queue_metrics_record() {
        let metrics = QueueMetrics::new();
        metrics.record_enqueued(BatchPriority::High);
        metrics.record_enqueued(BatchPriority::High);
        metrics.record_enqueued(BatchPriority::Normal);
        metrics.record_dequeued(BatchPriority::High);
        metrics.record_rejected(BatchPriority::Critical);
        assert_eq!(metrics.enqueued(), 3);
        assert_eq!(metrics.enqueued_by_priority(BatchPriority::High), 2);
        assert_eq!(metrics.dequeued(), 1);
        assert_eq!(metrics.rejected(), 1);
    }

    #[test]
    fn test_queue_metrics_wait_time() {
        let metrics = QueueMetrics::new();
        metrics.record_wait_time(Duration::from_millis(100));
        metrics.record_wait_time(Duration::from_millis(200));
        // Mean of 100ms and 200ms should be ~150ms.
        let avg = metrics.avg_wait_time();
        assert!(avg >= Duration::from_millis(140) && avg <= Duration::from_millis(160));
    }

    #[test]
    fn test_queue_metrics_prometheus() {
        let metrics = QueueMetrics::new();
        metrics.record_enqueued(BatchPriority::Normal);
        metrics.record_starvation_promotion();
        let output = metrics.prometheus();
        assert!(output.contains("infernum_queue_enqueued_total"));
        assert!(output.contains("infernum_queue_starvation_promotions_total 1"));
    }

    // ---- Scheduling behavior ----

    #[test]
    fn test_wfq_fairness() {
        // With weights 4:1, critical should dominate a mixed backlog.
        let config = QueueConfig::new()
            .with_wfq(true)
            .with_priority_weights([1.0, 1.0, 2.0, 4.0]);
        let queue = RequestQueue::new(config);
        for i in 0..10 {
            queue
                .enqueue(QueuedRequest::new(
                    format!("critical-{}", i),
                    BatchPriority::Critical,
                    "m",
                    10,
                ))
                .unwrap();
            queue
                .enqueue(QueuedRequest::new(
                    format!("normal-{}", i),
                    BatchPriority::Normal,
                    "m",
                    10,
                ))
                .unwrap();
        }
        let mut critical_count = 0;
        let mut normal_count = 0;
        for _ in 0..10 {
            if let Some(req) = queue.dequeue() {
                if req.id.starts_with("critical") {
                    critical_count += 1;
                } else {
                    normal_count += 1;
                }
            }
        }
        assert!(
            critical_count > normal_count,
            "Critical: {}, Normal: {}",
            critical_count,
            normal_count
        );
    }

    #[test]
    fn test_starvation_prevention() {
        // A Normal request that outwaits the (1 ms) starvation timeout must
        // be served before a later Critical request, promoted to High.
        let config = QueueConfig::new()
            .with_wfq(false)
            .with_starvation_timeout(Duration::from_millis(1));
        let queue = RequestQueue::new(config);
        queue
            .enqueue(QueuedRequest::new("normal", BatchPriority::Normal, "m", 10))
            .unwrap();
        std::thread::sleep(Duration::from_millis(5));
        queue
            .enqueue(QueuedRequest::new(
                "critical",
                BatchPriority::Critical,
                "m",
                10,
            ))
            .unwrap();
        let first = queue.dequeue().unwrap();
        assert_eq!(first.id, "normal");
        assert_eq!(first.priority, BatchPriority::High);
    }
}