use std::collections::VecDeque;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use tokio::sync::{Mutex, Notify, OwnedSemaphorePermit, Semaphore};
use tracing;
use super::models::{TaskId, TaskRequest};
/// Number of tasks allowed to be active at once when `TaskQueue::new` is given `None`.
const DEFAULT_MAX_CONCURRENT: u32 = 3;
/// A task that is waiting in the pending queue and has not been given a slot yet.
#[derive(Debug)]
pub struct QueuedTask {
/// Identifier of the task; `TaskQueue::enqueue` generates it as a UUID v4 string.
pub task_id: TaskId,
/// Agent identifier supplied by the caller of `enqueue`.
pub agent_id: String,
/// Request payload supplied by the caller of `enqueue`.
pub request: TaskRequest,
/// UTC timestamp taken when the task was placed on the queue.
pub queued_at: DateTime<Utc>,
}
/// A task that has been moved out of the pending queue and currently occupies a slot.
#[derive(Debug)]
pub struct ActiveTask {
/// Identifier carried over from the corresponding `QueuedTask`.
pub task_id: TaskId,
/// Agent identifier carried over from the corresponding `QueuedTask`.
pub agent_id: String,
/// Request payload carried over from the corresponding `QueuedTask`.
pub request: TaskRequest,
/// UTC timestamp taken when the task was moved to the active set.
pub started_at: DateTime<Utc>,
/// Semaphore permit backing this task's slot. It is never read; it is held only so
/// that the permit is returned to the semaphore when this `ActiveTask` is dropped.
pub _permit: OwnedSemaphorePermit,
}
/// FIFO task queue with a fixed number of concurrent execution slots.
///
/// Tasks wait in `pending` until `try_dequeue` obtains a semaphore permit; they are
/// then stored in `active` together with that permit until they are removed again.
pub struct TaskQueue {
/// Tasks waiting to start: pushed at the back, popped from the front.
pending: Mutex<VecDeque<QueuedTask>>,
/// Tasks currently holding a slot, keyed by task id.
active: DashMap<TaskId, ActiveTask>,
/// Slot count the queue was constructed with; also the initial permit count of `slots`.
max_concurrent: u32,
/// Semaphore whose permits represent free execution slots.
slots: Arc<Semaphore>,
/// Signalled with `notify_one` when a task is enqueued or an active task is removed.
notify: Arc<Notify>,
}
impl TaskQueue {
    /// Creates a new queue wrapped in an [`Arc`].
    ///
    /// `max_concurrent` is the number of tasks that may be active at the same time;
    /// `None` selects [`DEFAULT_MAX_CONCURRENT`].
    pub fn new(max_concurrent: Option<u32>) -> Arc<Self> {
        let max = max_concurrent.unwrap_or(DEFAULT_MAX_CONCURRENT);
        Arc::new(Self {
            pending: Mutex::new(VecDeque::new()),
            active: DashMap::new(),
            max_concurrent: max,
            // One permit per slot. `u32 -> usize` is lossless on 32- and 64-bit targets.
            slots: Arc::new(Semaphore::new(max as usize)),
            notify: Arc::new(Notify::new()),
        })
    }

    /// Returns a handle to the queue's [`Notify`].
    ///
    /// The queue calls `notify_one` on it whenever a task is enqueued or an active
    /// task is completed or cancelled.
    pub fn notifier(&self) -> Arc<Notify> {
        Arc::clone(&self.notify)
    }

    /// Appends a new task to the back of the pending queue and returns its id.
    ///
    /// The id is a freshly generated UUID v4 string. One notification is sent on the
    /// queue's notifier after the task has been stored.
    pub async fn enqueue(&self, agent_id: String, request: TaskRequest) -> TaskId {
        let task_id = uuid::Uuid::new_v4().to_string();
        let queued_task = QueuedTask {
            task_id: task_id.clone(),
            agent_id,
            request,
            queued_at: Utc::now(),
        };
        // The guard is a temporary, so the lock is released before logging and notifying.
        self.pending.lock().await.push_back(queued_task);
        tracing::debug!(task_id = %task_id, "Task enqueued");
        self.notify.notify_one();
        task_id
    }

    /// Moves the oldest pending task into the active set if a slot is free.
    ///
    /// Returns the id of the task that became active, or `None` when either no slot
    /// is available or the pending queue is empty. This never waits for a slot.
    pub async fn try_dequeue(&self) -> Option<TaskId> {
        // Reserve a slot first so a task is only popped when it can actually start.
        let permit = Arc::clone(&self.slots).try_acquire_owned().ok()?;
        let next = self.pending.lock().await.pop_front();
        // Nothing waiting: returning here drops `permit`, handing the slot back.
        let task = next?;

        let task_id = task.task_id.clone();
        self.active.insert(
            task_id.clone(),
            ActiveTask {
                task_id: task.task_id,
                agent_id: task.agent_id,
                request: task.request,
                started_at: Utc::now(),
                _permit: permit,
            },
        );
        tracing::debug!(task_id = %task_id, "Task moved to active");
        Some(task_id)
    }

    /// Removes `task_id` from the active set, freeing its slot.
    ///
    /// Returns `true` if the task was active, `false` (with a warning logged) if no
    /// active task has that id.
    pub fn complete_task(&self, task_id: &str) -> bool {
        match self.active.remove(task_id) {
            Some((_, finished)) => {
                // Release the slot *before* waking a consumer. Otherwise a consumer
                // woken on another thread could call `try_dequeue` while the permit
                // is still held, find no free slot, and go back to waiting.
                drop(finished);
                tracing::debug!(task_id = %task_id, "Task completed, slot released");
                self.notify.notify_one();
                true
            }
            None => {
                tracing::warn!(task_id = %task_id, "Attempted to complete unknown task");
                false
            }
        }
    }

    /// Cancels a task whether it is still pending or already active.
    ///
    /// Returns `true` if the task was found and removed, `false` (with a warning
    /// logged) otherwise. Cancelling an active task frees its slot and sends one
    /// notification; cancelling a pending task does neither.
    pub async fn cancel_task(&self, task_id: &str) -> bool {
        // Tasks only ever move pending -> active, so look in `pending` first: a task
        // that is dequeued while we wait for the lock is then still found in `active`
        // below instead of being missed in both places.
        // NOTE(review): `try_dequeue` inserts into `active` after releasing the pending
        // lock, so a task caught exactly between those two steps can still be reported
        // as unknown.
        let removed_pending = {
            let mut pending = self.pending.lock().await;
            let pos = pending.iter().position(|t| t.task_id == task_id);
            pos.and_then(|pos| pending.remove(pos)).is_some()
        };
        if removed_pending {
            tracing::debug!(task_id = %task_id, "Pending task cancelled");
            return true;
        }

        if let Some((_, cancelled)) = self.active.remove(task_id) {
            // Free the slot before notifying; see `complete_task`.
            drop(cancelled);
            tracing::debug!(task_id = %task_id, "Active task cancelled, slot released");
            self.notify.notify_one();
            return true;
        }

        tracing::warn!(task_id = %task_id, "Attempted to cancel unknown task");
        false
    }

    /// Number of tasks currently in the active set.
    pub fn active_count(&self) -> usize {
        self.active.len()
    }

    /// Number of tasks currently waiting in the pending queue.
    pub async fn pending_count(&self) -> usize {
        self.pending.lock().await.len()
    }

    /// The slot count this queue was created with.
    pub fn max_concurrent(&self) -> u32 {
        self.max_concurrent
    }

    /// Borrows the map of active tasks, keyed by task id.
    ///
    /// Entries removed directly through this reference bypass the notification that
    /// `complete_task` and `cancel_task` send.
    pub fn active_tasks(&self) -> &DashMap<TaskId, ActiveTask> {
        &self.active
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::coding_agent::models::{ReplyTarget, TaskRequest, TaskTrigger};

    /// Builds a minimal control-panel request carrying `description`.
    fn make_request(description: &str) -> TaskRequest {
        let trigger = TaskTrigger::ControlPanel {
            user_id: "test-user".to_string(),
        };
        let reply_to = ReplyTarget {
            channel_type: "telegram".to_string(),
            channel_id: "12345".to_string(),
            message_id: None,
        };
        TaskRequest {
            description: description.to_string(),
            trigger,
            workspace: None,
            file_context: None,
            reply_to,
        }
    }

    /// Builds a `QueuedTask` with a caller-chosen id, stamped with the current time.
    fn queued(task_id: &str, agent_id: &str, description: &str) -> QueuedTask {
        QueuedTask {
            task_id: task_id.to_string(),
            agent_id: agent_id.to_string(),
            request: make_request(description),
            queued_at: Utc::now(),
        }
    }

    /// Pushes `task` straight onto the pending deque, bypassing `enqueue`
    /// (so the id is deterministic and no notification is sent).
    async fn seed(queue: &TaskQueue, task: QueuedTask) {
        queue.pending.lock().await.push_back(task);
    }

    #[tokio::test]
    async fn test_new_creates_queue_with_default_concurrency() {
        let queue = TaskQueue::new(None);
        assert_eq!(queue.max_concurrent(), DEFAULT_MAX_CONCURRENT);
        assert_eq!(queue.active_count(), 0);
        assert_eq!(queue.pending_count().await, 0);
    }

    #[tokio::test]
    async fn test_new_creates_queue_with_custom_concurrency() {
        let queue = TaskQueue::new(Some(5));
        assert_eq!(queue.max_concurrent(), 5);
    }

    #[tokio::test]
    async fn test_enqueue_adds_task_to_pending() {
        let queue = TaskQueue::new(Some(3));
        let request = make_request("fix bug");
        let task_id = queue.enqueue("agent-1".to_string(), request).await;
        assert!(!task_id.is_empty());
        assert_eq!(queue.pending_count().await, 1);
        assert_eq!(queue.active_count(), 0);
    }

    #[tokio::test]
    async fn test_enqueue_returns_unique_task_ids() {
        let queue = TaskQueue::new(Some(10));
        let mut ids = Vec::new();
        for description in ["task 1", "task 2", "task 3"].iter() {
            let id = queue
                .enqueue("agent-1".to_string(), make_request(description))
                .await;
            ids.push(id);
        }
        assert_ne!(ids[0], ids[1]);
        assert_ne!(ids[1], ids[2]);
        assert_ne!(ids[0], ids[2]);
    }

    #[tokio::test]
    async fn test_try_dequeue_moves_task_to_active() {
        let queue = TaskQueue::new(Some(3));
        seed(&queue, queued("test-task-1", "agent-1", "test task")).await;

        let dequeued = queue.try_dequeue().await;
        assert_eq!(dequeued, Some("test-task-1".to_string()));
        assert_eq!(queue.active_count(), 1);
    }

    #[tokio::test]
    async fn test_try_dequeue_returns_none_when_empty() {
        let queue = TaskQueue::new(Some(3));
        assert_eq!(queue.try_dequeue().await, None);
    }

    #[tokio::test]
    async fn test_try_dequeue_returns_none_when_no_slots() {
        let queue = TaskQueue::new(Some(1));
        // The first task takes the only slot.
        seed(&queue, queued("task-1", "agent-1", "task 1")).await;
        queue.try_dequeue().await;

        // The second task has to stay pending.
        seed(&queue, queued("task-2", "agent-1", "task 2")).await;
        let result = queue.try_dequeue().await;
        assert_eq!(result, None);
        assert_eq!(queue.active_count(), 1);
    }

    #[tokio::test]
    async fn test_complete_task_releases_slot() {
        let queue = TaskQueue::new(Some(1));
        seed(&queue, queued("task-1", "agent-1", "task 1")).await;
        queue.try_dequeue().await;
        assert_eq!(queue.active_count(), 1);

        let completed = queue.complete_task("task-1");
        assert!(completed);
        assert_eq!(queue.active_count(), 0);

        // With the slot free again, the next task can start.
        seed(&queue, queued("task-2", "agent-1", "task 2")).await;
        let result = queue.try_dequeue().await;
        assert_eq!(result, Some("task-2".to_string()));
    }

    #[tokio::test]
    async fn test_complete_task_returns_false_for_unknown() {
        let queue = TaskQueue::new(Some(3));
        assert!(!queue.complete_task("nonexistent"));
    }

    #[tokio::test]
    async fn test_cancel_active_task() {
        let queue = TaskQueue::new(Some(3));
        seed(&queue, queued("task-1", "agent-1", "task 1")).await;
        queue.try_dequeue().await;
        assert_eq!(queue.active_count(), 1);

        let cancelled = queue.cancel_task("task-1").await;
        assert!(cancelled);
        assert_eq!(queue.active_count(), 0);
    }

    #[tokio::test]
    async fn test_cancel_pending_task() {
        // Zero slots: nothing can ever leave the pending queue.
        let queue = TaskQueue::new(Some(0));
        seed(&queue, queued("task-1", "agent-1", "task 1")).await;
        assert_eq!(queue.pending_count().await, 1);

        let cancelled = queue.cancel_task("task-1").await;
        assert!(cancelled);
        assert_eq!(queue.pending_count().await, 0);
    }

    #[tokio::test]
    async fn test_cancel_unknown_task_returns_false() {
        let queue = TaskQueue::new(Some(3));
        assert!(!queue.cancel_task("nonexistent").await);
    }

    #[tokio::test]
    async fn test_concurrency_limit_enforced() {
        let queue = TaskQueue::new(Some(2));
        queue.pending.lock().await.extend(
            (0..3).map(|i| queued(&format!("task-{}", i), "agent-1", &format!("task {}", i))),
        );

        assert!(queue.try_dequeue().await.is_some());
        assert!(queue.try_dequeue().await.is_some());
        assert!(queue.try_dequeue().await.is_none());
        assert_eq!(queue.active_count(), 2);
        assert_eq!(queue.pending_count().await, 1);
    }

    #[tokio::test]
    async fn test_executor_is_single_consumer_via_try_dequeue() {
        let queue = TaskQueue::new(Some(3));
        let task_id = queue
            .enqueue("agent-1".to_string(), make_request("manual-dequeue test"))
            .await;

        // Give any background consumer a chance to run; the task must remain pending.
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        assert_eq!(queue.active_count(), 0);
        assert_eq!(queue.pending_count().await, 1);

        let dequeued = queue.try_dequeue().await;
        assert_eq!(dequeued, Some(task_id.clone()));
        assert_eq!(queue.active_count(), 1);
        assert_eq!(queue.pending_count().await, 0);
        assert!(queue.active_tasks().contains_key(&task_id));
    }

    #[tokio::test]
    async fn test_slot_frees_on_completion_allows_next_dequeue() {
        let queue = TaskQueue::new(Some(1));
        let task_id_1 = queue
            .enqueue("agent-1".to_string(), make_request("task 1"))
            .await;
        assert_eq!(queue.try_dequeue().await, Some(task_id_1.clone()));
        assert_eq!(queue.active_count(), 1);

        let _task_id_2 = queue
            .enqueue("agent-1".to_string(), make_request("task 2"))
            .await;
        assert!(queue.try_dequeue().await.is_none());
        assert_eq!(queue.active_count(), 1);
        assert_eq!(queue.pending_count().await, 1);

        queue.complete_task(&task_id_1);
        assert!(queue.try_dequeue().await.is_some());
        assert_eq!(queue.active_count(), 1);
        assert_eq!(queue.pending_count().await, 0);
    }

    #[tokio::test]
    async fn test_fifo_ordering() {
        let queue = TaskQueue::new(Some(10));
        queue.pending.lock().await.extend(
            (0..5).map(|i| queued(&format!("task-{}", i), "agent-1", &format!("task {}", i))),
        );

        for i in 0..5 {
            assert_eq!(queue.try_dequeue().await, Some(format!("task-{}", i)));
        }
    }

    #[tokio::test]
    async fn test_active_task_holds_correct_data() {
        let queue = TaskQueue::new(Some(3));
        seed(
            &queue,
            queued("task-abc", "claude-code-1", "fix authentication bug"),
        )
        .await;
        queue.try_dequeue().await;

        let active = queue.active_tasks().get("task-abc").unwrap();
        assert_eq!(active.task_id, "task-abc");
        assert_eq!(active.agent_id, "claude-code-1");
        assert_eq!(active.request.description, "fix authentication bug");
        assert!(active.started_at <= Utc::now());
    }
}