use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::watch;
#[derive(Debug, Clone)]
pub struct CompletedJob {
pub agent_name: String,
pub output: String,
}
#[derive(Debug)]
pub struct JobHandle {
pub cancel_tx: watch::Sender<bool>,
pub task_handle: tokio::task::JoinHandle<()>,
}
#[derive(Clone, Debug)]
pub struct BackgroundJobManager {
active: Arc<AtomicUsize>,
max_concurrent: usize,
jobs: Arc<Mutex<Vec<JobHandle>>>,
}
impl BackgroundJobManager {
pub fn new(max_concurrent: usize) -> Self {
Self {
active: Arc::new(AtomicUsize::new(0)),
max_concurrent,
jobs: Arc::new(Mutex::new(Vec::new())),
}
}
pub fn try_acquire(&self) -> Result<(), usize> {
let current = self.active.load(Ordering::SeqCst);
if current >= self.max_concurrent {
return Err(current);
}
self.active.fetch_add(1, Ordering::SeqCst);
Ok(())
}
pub fn release(&self, job: CompletedJob) {
self.active.fetch_sub(1, Ordering::SeqCst);
let content = if job.output.starts_with("ERROR: ") {
format!(
"[Async agent '{}' failed]\n\n{}",
job.agent_name,
job.output.trim_start_matches("ERROR: ")
)
} else {
format!(
"[Async agent '{}' completed]\n\n{}",
job.agent_name, job.output
)
};
crate::session::inbox::push_inbox_message(crate::session::inbox::InboxMessage {
source: crate::session::inbox::InboxSource::BackgroundAgent {
name: job.agent_name,
},
content,
});
}
pub fn register_job(&self, handle: JobHandle) {
let mut jobs = self.jobs.lock().unwrap();
jobs.push(handle);
}
pub fn remove_job(&self, task_id: tokio::task::Id) {
let mut jobs = self.jobs.lock().unwrap();
jobs.retain(|j| j.task_handle.id() != task_id);
}
pub fn active_count(&self) -> usize {
self.active.load(Ordering::SeqCst)
}
pub async fn wait_all(&self) -> usize {
let handles: Vec<_> = {
let mut jobs = self.jobs.lock().unwrap();
std::mem::take(&mut *jobs)
};
let count = handles.len();
for handle in handles {
let _ = handle.task_handle.await;
}
count
}
pub fn kill_all(&self) {
let handles: Vec<_> = {
let mut jobs = self.jobs.lock().unwrap();
std::mem::take(&mut *jobs)
};
for handle in handles {
let _ = handle.cancel_tx.send(true);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_acquire_and_release() {
let mgr = BackgroundJobManager::new(2);
assert!(mgr.try_acquire().is_ok());
assert!(mgr.try_acquire().is_ok());
assert!(mgr.try_acquire().is_err());
mgr.release(CompletedJob {
agent_name: "a".into(),
output: "done".into(),
});
assert!(mgr.try_acquire().is_ok());
}
#[test]
fn test_active_count() {
let mgr = BackgroundJobManager::new(10);
assert_eq!(mgr.active_count(), 0);
mgr.try_acquire().unwrap();
mgr.try_acquire().unwrap();
assert_eq!(mgr.active_count(), 2);
mgr.release(CompletedJob {
agent_name: "a".into(),
output: "x".into(),
});
assert_eq!(mgr.active_count(), 1);
}
}