use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{RwLock, broadcast, oneshot};
/// Per-resource waiting lines for agents, together with a bounded history of
/// observed wait times and a broadcast channel of queue events.
pub struct WaitQueue {
/// Waiting entries keyed by resource key. `register` inserts each entry in
/// front of the first entry with a larger `priority` value; `cancel` and
/// `notify_released` drop a key from the map once its queue becomes empty.
queues: RwLock<HashMap<String, VecDeque<WaitEntry>>>,
/// Recorded wait durations per resource key, oldest first.
wait_history: RwLock<HashMap<String, Vec<Duration>>>,
/// Sender side of the broadcast channel on which `WaitQueueEvent`s are published.
event_sender: broadcast::Sender<WaitQueueEvent>,
/// Maximum number of durations kept per resource in `wait_history`.
max_history_entries: usize,
}
/// One agent waiting in a resource's queue.
#[derive(Debug)]
pub struct WaitEntry {
/// Identifier of the waiting agent.
pub agent_id: String,
/// Ordering key: entries with a smaller value are placed ahead of entries
/// with a larger value; equal values keep arrival order.
pub priority: u8,
/// Moment the entry was created by `WaitQueue::register`.
pub registered_at: Instant,
/// Flag reported by `WaitQueue::should_auto_acquire` when this entry is at the front.
pub auto_acquire: bool,
/// One-shot sender fired by `WaitQueue::notify_released` when this entry is
/// popped from the front of its queue.
notify_sender: Option<oneshot::Sender<()>>,
}
/// Handle returned by `WaitQueue::register` for a single queued entry.
pub struct WaitQueueHandle {
/// Receives `()` when `WaitQueue::notify_released` pops this entry.
pub ready: oneshot::Receiver<()>,
/// Zero-based index at which the entry was inserted at registration time.
pub initial_position: usize,
/// Resource key the entry was registered under.
pub resource_key: String,
/// Identifier of the agent that registered.
pub agent_id: String,
/// Queue that issued this handle; used by `cancel`.
wait_queue: Arc<WaitQueue>,
}
impl WaitQueueHandle {
    /// Consumes the handle and asks the owning queue to drop this agent's
    /// entry for the handle's resource.
    ///
    /// Returns whatever `WaitQueue::cancel` reports: `true` when a matching
    /// entry was found and removed, `false` otherwise.
    pub async fn cancel(self) -> bool {
        let Self {
            wait_queue,
            resource_key,
            agent_id,
            ..
        } = self;
        let removed = wait_queue.cancel(&resource_key, &agent_id).await;
        removed
    }
}
/// Events published on the queue's broadcast channel.
///
/// Serialized with an internal `type` tag whose value is the snake_case
/// variant name.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WaitQueueEvent {
/// An agent was inserted into a resource's queue at zero-based `position`.
Registered {
agent_id: String,
resource_key: String,
position: usize,
priority: u8,
},
/// An already-queued agent moved because another entry was inserted or removed.
PositionChanged {
agent_id: String,
resource_key: String,
old_position: usize,
new_position: usize,
},
/// The agent at the front was popped by `notify_released`;
/// `wait_duration_ms` is the time since its registration, in milliseconds.
Ready {
agent_id: String,
resource_key: String,
wait_duration_ms: u64,
},
/// An agent's entry was removed from the queue for the given `reason`.
Removed {
agent_id: String,
resource_key: String,
reason: RemovalReason,
},
/// The last entry left the queue and the resource's queue was dropped.
QueueEmpty {
resource_key: String,
},
}
/// Why an entry was removed from a queue; serialized as a snake_case string.
///
/// NOTE(review): within this file only `Cancelled` is ever constructed (by
/// `WaitQueue::cancel`); the other variants are presumably emitted by callers
/// elsewhere — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RemovalReason {
/// The entry was removed through `WaitQueue::cancel`.
Cancelled,
Acquired,
Timeout,
ResourceUnavailable,
}
/// Snapshot of one resource's queue, as produced by `WaitQueue::get_queue_status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueStatus {
/// Resource key the snapshot describes.
pub resource_key: String,
/// Number of entries in the queue when the snapshot was taken.
pub queue_length: usize,
/// One record per queued entry, in queue order (front first).
pub waiters: Vec<WaiterInfo>,
/// Mean of the recorded wait times for this resource in milliseconds, or
/// `None` when no history is available.
pub estimated_wait_ms: Option<u64>,
}
/// Serializable view of a single queued entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WaiterInfo {
/// Identifier of the waiting agent.
pub agent_id: String,
/// Zero-based index in the queue (0 is the front).
pub position: usize,
/// Priority value the agent registered with.
pub priority: u8,
/// Whole seconds elapsed since the entry was registered (an elapsed
/// duration, not a timestamp).
pub waiting_since_secs: u64,
/// Auto-acquire flag the agent registered with.
pub auto_acquire: bool,
}
impl WaitQueue {
pub fn new() -> Arc<Self> {
Self::with_max_history(100)
}
pub fn with_max_history(max_history_entries: usize) -> Arc<Self> {
let (event_sender, _) = broadcast::channel(256);
Arc::new(Self {
queues: RwLock::new(HashMap::new()),
wait_history: RwLock::new(HashMap::new()),
event_sender,
max_history_entries,
})
}
pub fn subscribe(&self) -> broadcast::Receiver<WaitQueueEvent> {
self.event_sender.subscribe()
}
pub async fn register(
self: &Arc<Self>,
resource_key: &str,
agent_id: &str,
priority: u8,
auto_acquire: bool,
) -> WaitQueueHandle {
let (notify_sender, notify_receiver) = oneshot::channel();
let entry = WaitEntry {
agent_id: agent_id.to_string(),
priority,
registered_at: Instant::now(),
auto_acquire,
notify_sender: Some(notify_sender),
};
let position = {
let mut queues = self.queues.write().await;
let queue = queues.entry(resource_key.to_string()).or_default();
let insert_pos = queue
.iter()
.position(|e| e.priority > priority)
.unwrap_or(queue.len());
queue.insert(insert_pos, entry);
for (i, e) in queue.iter().enumerate().skip(insert_pos + 1) {
let _ = self.event_sender.send(WaitQueueEvent::PositionChanged {
agent_id: e.agent_id.clone(),
resource_key: resource_key.to_string(),
old_position: i - 1,
new_position: i,
});
}
insert_pos
};
let _ = self.event_sender.send(WaitQueueEvent::Registered {
agent_id: agent_id.to_string(),
resource_key: resource_key.to_string(),
position,
priority,
});
WaitQueueHandle {
ready: notify_receiver,
initial_position: position,
resource_key: resource_key.to_string(),
agent_id: agent_id.to_string(),
wait_queue: Arc::clone(self),
}
}
pub async fn cancel(&self, resource_key: &str, agent_id: &str) -> bool {
let mut queues = self.queues.write().await;
if let Some(queue) = queues.get_mut(resource_key)
&& let Some(pos) = queue.iter().position(|e| e.agent_id == agent_id)
{
queue.remove(pos);
for (i, e) in queue.iter().enumerate().skip(pos) {
let _ = self.event_sender.send(WaitQueueEvent::PositionChanged {
agent_id: e.agent_id.clone(),
resource_key: resource_key.to_string(),
old_position: i + 1,
new_position: i,
});
}
let _ = self.event_sender.send(WaitQueueEvent::Removed {
agent_id: agent_id.to_string(),
resource_key: resource_key.to_string(),
reason: RemovalReason::Cancelled,
});
if queue.is_empty() {
queues.remove(resource_key);
let _ = self.event_sender.send(WaitQueueEvent::QueueEmpty {
resource_key: resource_key.to_string(),
});
}
return true;
}
false
}
pub async fn notify_released(&self, resource_key: &str) -> Option<String> {
let mut queues = self.queues.write().await;
if let Some(queue) = queues.get_mut(resource_key)
&& let Some(mut entry) = queue.pop_front()
{
let wait_duration = entry.registered_at.elapsed();
{
let mut history = self.wait_history.write().await;
let times = history.entry(resource_key.to_string()).or_default();
times.push(wait_duration);
if times.len() > self.max_history_entries {
times.remove(0);
}
}
if let Some(sender) = entry.notify_sender.take() {
let _ = sender.send(());
}
let agent_id = entry.agent_id.clone();
let _ = self.event_sender.send(WaitQueueEvent::Ready {
agent_id: agent_id.clone(),
resource_key: resource_key.to_string(),
wait_duration_ms: wait_duration.as_millis() as u64,
});
for (i, e) in queue.iter().enumerate() {
let _ = self.event_sender.send(WaitQueueEvent::PositionChanged {
agent_id: e.agent_id.clone(),
resource_key: resource_key.to_string(),
old_position: i + 1,
new_position: i,
});
}
if queue.is_empty() {
queues.remove(resource_key);
let _ = self.event_sender.send(WaitQueueEvent::QueueEmpty {
resource_key: resource_key.to_string(),
});
}
return Some(agent_id);
}
None
}
pub async fn queue_length(&self, resource_key: &str) -> usize {
let queues = self.queues.read().await;
queues.get(resource_key).map_or(0, |q| q.len())
}
pub async fn position(&self, resource_key: &str, agent_id: &str) -> Option<usize> {
let queues = self.queues.read().await;
queues
.get(resource_key)
.and_then(|q| q.iter().position(|e| e.agent_id == agent_id))
}
pub async fn estimate_wait(&self, resource_key: &str) -> Option<Duration> {
let history = self.wait_history.read().await;
if let Some(times) = history.get(resource_key) {
if times.is_empty() {
return None;
}
let total: Duration = times.iter().sum();
Some(total / times.len() as u32)
} else {
None
}
}
pub async fn estimate_wait_at_position(
&self,
resource_key: &str,
position: usize,
) -> Option<Duration> {
let base_estimate = self.estimate_wait(resource_key).await?;
Some(base_estimate * (position as u32 + 1))
}
pub async fn get_queue_status(&self, resource_key: &str) -> Option<QueueStatus> {
let queues = self.queues.read().await;
let queue = queues.get(resource_key)?;
let waiters: Vec<WaiterInfo> = queue
.iter()
.enumerate()
.map(|(i, e)| WaiterInfo {
agent_id: e.agent_id.clone(),
position: i,
priority: e.priority,
waiting_since_secs: e.registered_at.elapsed().as_secs(),
auto_acquire: e.auto_acquire,
})
.collect();
let estimated_wait_ms = self
.estimate_wait(resource_key)
.await
.map(|d| d.as_millis() as u64);
Some(QueueStatus {
resource_key: resource_key.to_string(),
queue_length: queue.len(),
waiters,
estimated_wait_ms,
})
}
pub async fn list_queues(&self) -> Vec<String> {
let queues = self.queues.read().await;
queues.keys().cloned().collect()
}
pub async fn is_waiting(&self, agent_id: &str) -> bool {
let queues = self.queues.read().await;
queues
.values()
.any(|q| q.iter().any(|e| e.agent_id == agent_id))
}
pub async fn waiting_for(&self, agent_id: &str) -> Vec<String> {
let queues = self.queues.read().await;
queues
.iter()
.filter(|(_, q)| q.iter().any(|e| e.agent_id == agent_id))
.map(|(k, _)| k.clone())
.collect()
}
pub async fn record_wait_time(&self, resource_key: &str, duration: Duration) {
let mut history = self.wait_history.write().await;
let times = history.entry(resource_key.to_string()).or_default();
times.push(duration);
if times.len() > self.max_history_entries {
times.remove(0);
}
}
pub async fn peek_next(&self, resource_key: &str) -> Option<WaiterInfo> {
let queues = self.queues.read().await;
queues.get(resource_key).and_then(|q| {
q.front().map(|e| WaiterInfo {
agent_id: e.agent_id.clone(),
position: 0,
priority: e.priority,
waiting_since_secs: e.registered_at.elapsed().as_secs(),
auto_acquire: e.auto_acquire,
})
})
}
pub async fn should_auto_acquire(&self, resource_key: &str, agent_id: &str) -> bool {
let queues = self.queues.read().await;
if let Some(queue) = queues.get(resource_key)
&& let Some(front) = queue.front()
{
return front.agent_id == agent_id && front.auto_acquire;
}
false
}
}
impl Default for WaitQueue {
    /// Builds an un-shared queue with empty state, a 256-slot event channel
    /// and a wait history capped at 100 samples per resource.
    fn default() -> Self {
        // Only the sender half is kept; receivers are created via `subscribe`.
        let event_sender = broadcast::channel(256).0;
        let queues = RwLock::new(HashMap::new());
        let wait_history = RwLock::new(HashMap::new());
        Self {
            queues,
            wait_history,
            event_sender,
            max_history_entries: 100,
        }
    }
}
/// Builds a resource key of the form `<operation_type>:<scope>`.
pub fn resource_key(operation_type: &str, scope: &str) -> String {
    let mut key = String::with_capacity(operation_type.len() + 1 + scope.len());
    key.push_str(operation_type);
    key.push(':');
    key.push_str(scope);
    key
}
/// Builds a resource key of the form `file:<path>`, using the path's
/// `display()` rendering.
pub fn file_resource_key(path: &std::path::Path) -> String {
    let rendered = path.display().to_string();
    let mut key = String::with_capacity("file:".len() + rendered.len());
    key.push_str("file:");
    key.push_str(&rendered);
    key
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Resource key shared by the single-resource tests.
    const BUILD: &str = "build:/project";

    /// Same-priority registrations are queued in arrival order.
    #[tokio::test]
    async fn test_register_and_position() {
        let wq = WaitQueue::new();
        let first = wq.register(BUILD, "agent-1", 5, false).await;
        let second = wq.register(BUILD, "agent-2", 5, false).await;

        assert_eq!(first.initial_position, 0);
        assert_eq!(second.initial_position, 1);

        assert_eq!(wq.position(BUILD, "agent-1").await, Some(0));
        assert_eq!(wq.position(BUILD, "agent-2").await, Some(1));
        assert_eq!(wq.queue_length(BUILD).await, 2);
    }

    /// A smaller priority value is placed ahead of larger ones.
    #[tokio::test]
    async fn test_priority_ordering() {
        let wq = WaitQueue::new();
        let _medium = wq.register(BUILD, "agent-1", 5, false).await;
        let urgent = wq.register(BUILD, "agent-2", 1, false).await;
        let _relaxed = wq.register(BUILD, "agent-3", 10, false).await;

        assert_eq!(urgent.initial_position, 0);

        assert_eq!(wq.position(BUILD, "agent-2").await, Some(0));
        assert_eq!(wq.position(BUILD, "agent-1").await, Some(1));
        assert_eq!(wq.position(BUILD, "agent-3").await, Some(2));
    }

    /// Cancelling the front entry promotes the one behind it.
    #[tokio::test]
    async fn test_cancel() {
        let wq = WaitQueue::new();
        let _first = wq.register(BUILD, "agent-1", 5, false).await;
        let _second = wq.register(BUILD, "agent-2", 5, false).await;

        let cancelled = wq.cancel(BUILD, "agent-1").await;
        assert!(cancelled);

        assert_eq!(wq.position(BUILD, "agent-1").await, None);
        assert_eq!(wq.position(BUILD, "agent-2").await, Some(0));
        assert_eq!(wq.queue_length(BUILD).await, 1);
    }

    /// Releasing a resource hands it to the front entry and shifts the rest.
    #[tokio::test]
    async fn test_notify_released() {
        let wq = WaitQueue::new();
        let _first = wq.register(BUILD, "agent-1", 5, false).await;
        let _second = wq.register(BUILD, "agent-2", 5, false).await;

        let notified = wq.notify_released(BUILD).await;
        assert_eq!(notified, Some("agent-1".to_string()));

        assert_eq!(wq.position(BUILD, "agent-2").await, Some(0));
        assert_eq!(wq.queue_length(BUILD).await, 1);
    }

    /// A queue that loses its last entry disappears from the listing.
    #[tokio::test]
    async fn test_empty_queue_cleanup() {
        let wq = WaitQueue::new();
        let _only = wq.register(BUILD, "agent-1", 5, false).await;

        assert!(wq.cancel(BUILD, "agent-1").await);

        assert_eq!(wq.queue_length(BUILD).await, 0);
        let remaining = wq.list_queues().await;
        assert!(remaining.is_empty());
    }

    /// The estimate is the mean of the recorded samples.
    #[tokio::test]
    async fn test_wait_time_estimation() {
        let wq = WaitQueue::new();
        for secs in [10, 20, 30] {
            wq.record_wait_time(BUILD, Duration::from_secs(secs)).await;
        }

        let estimate = wq.estimate_wait(BUILD).await.unwrap();
        assert_eq!(estimate, Duration::from_secs(20));
    }

    /// Only registered agents are reported as waiting.
    #[tokio::test]
    async fn test_is_waiting() {
        let wq = WaitQueue::new();
        let _only = wq.register(BUILD, "agent-1", 5, false).await;

        assert!(wq.is_waiting("agent-1").await);
        assert!(!wq.is_waiting("agent-2").await);
    }

    /// An agent queued on two resources is reported for both.
    #[tokio::test]
    async fn test_waiting_for() {
        let wq = WaitQueue::new();
        let _on_first = wq.register("build:/project1", "agent-1", 5, false).await;
        let _on_second = wq.register("build:/project2", "agent-1", 5, false).await;

        let resources = wq.waiting_for("agent-1").await;
        assert_eq!(resources.len(), 2);
        assert!(resources.contains(&"build:/project1".to_string()));
        assert!(resources.contains(&"build:/project2".to_string()));
    }

    /// Peeking reports the front entry and leaves the queue untouched.
    #[tokio::test]
    async fn test_peek_next() {
        let wq = WaitQueue::new();
        let _only = wq.register(BUILD, "agent-1", 5, true).await;

        let front = wq.peek_next(BUILD).await.unwrap();
        assert_eq!(front.agent_id, "agent-1");
        assert_eq!(front.priority, 5);
        assert!(front.auto_acquire);

        assert_eq!(wq.queue_length(BUILD).await, 1);
    }

    /// Auto-acquire applies only to the front entry that asked for it.
    #[tokio::test]
    async fn test_should_auto_acquire() {
        let wq = WaitQueue::new();
        let _auto = wq.register(BUILD, "agent-1", 5, true).await;
        let _manual = wq.register(BUILD, "agent-2", 5, false).await;

        assert!(wq.should_auto_acquire(BUILD, "agent-1").await);
        assert!(!wq.should_auto_acquire(BUILD, "agent-2").await);
    }

    /// The status snapshot lists waiters in priority order.
    #[tokio::test]
    async fn test_queue_status() {
        let wq = WaitQueue::new();
        let _later = wq.register(BUILD, "agent-1", 5, false).await;
        let _sooner = wq.register(BUILD, "agent-2", 3, true).await;

        let status = wq.get_queue_status(BUILD).await.unwrap();
        assert_eq!(status.queue_length, 2);
        assert_eq!(status.waiters.len(), 2);
        assert_eq!(status.waiters[0].agent_id, "agent-2");
        assert_eq!(status.waiters[1].agent_id, "agent-1");
    }

    /// Registering publishes a `Registered` event to subscribers.
    #[tokio::test]
    async fn test_event_subscription() {
        let wq = WaitQueue::new();
        let mut events = wq.subscribe();

        let _only = wq.register(BUILD, "agent-1", 5, false).await;

        let first_event = events.try_recv().unwrap();
        let WaitQueueEvent::Registered { agent_id, .. } = first_event else {
            panic!("Expected Registered event");
        };
        assert_eq!(agent_id, "agent-1");
    }
}