use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use tokio::sync::{oneshot, watch};
use tokio_util::sync::CancellationToken;
use tokio_util::task::AbortOnDropHandle;
use crate::engine::EngineEvent;
/// Lifecycle status of a background agent task.
///
/// Internally tagged for serde as `{"kind": "...", ...}` with snake_case
/// variant names (e.g. `{"kind": "running", "iter": 3}`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum AgentStatus {
    /// Reserved/registered but not yet reporting progress.
    Pending,
    /// Actively running; `iter` presumably counts agent-loop iterations —
    /// the exact semantics are defined by whoever drives the emitter.
    Running {
        iter: u8,
    },
    /// Terminal: the task was cancelled.
    Cancelled,
    /// Terminal: the task finished successfully with a short summary.
    Completed {
        summary: String,
    },
    /// Terminal: the task failed with an error message.
    Errored {
        error: String,
    },
}
/// Point-in-time view of one pending background task, as produced by
/// [`BgAgentRegistry::snapshot`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BgTaskSnapshot {
    /// Registry-assigned task id.
    pub task_id: u32,
    /// Name of the agent running the task.
    pub agent_name: String,
    /// Prompt the task was started with.
    pub prompt: String,
    /// Elapsed time since the task was registered.
    pub age: Duration,
    /// Last status observed on the task's watch channel.
    pub status: AgentStatus,
    /// Id of the spawning (sub-agent) task, or `None` for top-level tasks.
    pub spawner: Option<u32>,
}
/// Payload a task delivers on completion: `(output text, event-trace lines)`.
pub type BgPayload = (String, Vec<String>);
/// Final result of a background agent task, surfaced via
/// `drain_completed` or `wait_for_completion`.
#[derive(Debug)]
pub struct BgAgentResult {
    pub agent_name: String,
    pub prompt: String,
    /// Agent output on success, or the error/cancellation text on failure.
    pub output: String,
    /// `true` iff the task sent an `Ok` payload.
    pub success: bool,
    /// Event-trace lines captured while the task ran
    /// (empty when the task was cancelled before sending).
    pub events: Vec<String>,
}
/// Internal bookkeeping for one in-flight background task.
struct BgAgentEntry {
    agent_name: String,
    prompt: String,
    /// Receives the final payload: `Ok` on success, `Err` on failure.
    rx: oneshot::Receiver<Result<BgPayload, BgPayload>>,
    /// Firing this token signals the task to stop.
    cancel: CancellationToken,
    /// Observes live status updates published by the task.
    status_rx: watch::Receiver<AgentStatus>,
    /// Registration time; used to compute snapshot `age`.
    started_at: Instant,
    /// Spawning task id (`None` = top-level); gates caller-scoped operations.
    spawner: Option<u32>,
    /// Aborts the underlying tokio task when this entry is dropped.
    _handle: AbortOnDropHandle<()>,
}
/// Tracks in-flight background agent tasks and queues their status events.
///
/// All interior state sits behind `parking_lot` mutexes, so the registry is
/// shareable via `Arc` (see [`new_shared`]).
pub struct BgAgentRegistry {
    /// Pending tasks keyed by task id.
    pending: Mutex<HashMap<u32, BgAgentEntry>>,
    /// Next task id to hand out (ids start at 1).
    next_id: Mutex<u32>,
    /// FIFO queue of status events awaiting `drain_status_events`.
    events: Mutex<std::collections::VecDeque<EngineEvent>>,
}
/// Cloneable handle a running task uses to publish status updates.
///
/// Each `send` both updates the task's watch channel and enqueues an
/// `EngineEvent::BgTaskUpdate` on the shared registry; clones share both.
#[derive(Clone)]
pub struct BgStatusEmitter {
    task_id: u32,
    spawner: Option<u32>,
    status_tx: watch::Sender<AgentStatus>,
    registry: Arc<BgAgentRegistry>,
}
impl BgStatusEmitter {
    /// Builds an emitter bound to one task and the registry's event queue.
    pub fn new(
        task_id: u32,
        spawner: Option<u32>,
        status_tx: watch::Sender<AgentStatus>,
        registry: Arc<BgAgentRegistry>,
    ) -> Self {
        Self {
            task_id,
            spawner,
            status_tx,
            registry,
        }
    }

    /// Publishes `status` on the watch channel, then queues a matching
    /// `EngineEvent::BgTaskUpdate` on the registry.
    pub fn send(&self, status: AgentStatus) {
        let update = EngineEvent::BgTaskUpdate {
            task_id: self.task_id,
            spawner: self.spawner,
            status: status.clone(),
        };
        // A closed watch channel is not an error worth surfacing here.
        let _ = self.status_tx.send(status);
        self.registry.push_status_event(update);
    }

    /// Returns the most recently published status.
    pub fn current(&self) -> AgentStatus {
        let borrowed = self.status_tx.borrow();
        borrowed.clone()
    }

    /// Test-only access to the underlying watch sender.
    #[cfg(test)]
    pub fn status_sender(&self) -> watch::Sender<AgentStatus> {
        self.status_tx.clone()
    }
}
/// Channels and token allocated by [`BgAgentRegistry::reserve`] for a task
/// that has an id but is not yet tracked (tracking starts at `attach`).
pub struct BgAgentReservation {
    pub task_id: u32,
    /// Task side: send the final payload here.
    pub tx: oneshot::Sender<Result<BgPayload, BgPayload>>,
    /// Registry side: handed back to `attach`.
    pub rx: oneshot::Receiver<Result<BgPayload, BgPayload>>,
    /// Child of the parent token given to `reserve`; parent cancel cascades.
    pub cancel: CancellationToken,
    /// Task side: publish status updates here.
    pub status_tx: watch::Sender<AgentStatus>,
    /// Registry side: handed back to `attach`.
    pub status_rx: watch::Receiver<AgentStatus>,
    pub spawner: Option<u32>,
}
impl BgAgentRegistry {
    /// Creates an empty registry; task ids are handed out starting at 1.
    pub fn new() -> Self {
        Self {
            pending: Mutex::new(HashMap::new()),
            next_id: Mutex::new(1),
            events: Mutex::new(std::collections::VecDeque::new()),
        }
    }

    /// Queues a status event for a later `drain_status_events` call.
    pub(crate) fn push_status_event(&self, event: EngineEvent) {
        self.events.lock().push_back(event);
    }

    /// Removes and returns all queued status events in FIFO order.
    pub fn drain_status_events(&self) -> Vec<EngineEvent> {
        let mut q = self.events.lock();
        q.drain(..).collect()
    }

    /// Allocates a fresh task id plus the channel/token plumbing for a new
    /// background task.
    ///
    /// The cancel token is a child of `parent_cancel`, so cancelling the
    /// parent cascades to the reservation. The task is not visible to the
    /// registry until `attach` is called with the same id.
    pub fn reserve(
        &self,
        parent_cancel: &CancellationToken,
        spawner: Option<u32>,
    ) -> BgAgentReservation {
        let (tx, rx) = oneshot::channel();
        let (status_tx, status_rx) = watch::channel(AgentStatus::Pending);
        // Hold the id lock only long enough to bump the counter.
        let mut id = self.next_id.lock();
        let task_id = *id;
        *id += 1;
        BgAgentReservation {
            task_id,
            tx,
            rx,
            cancel: parent_cancel.child_token(),
            status_tx,
            status_rx,
            spawner,
        }
    }

    /// Starts tracking a previously reserved task.
    ///
    /// The join handle is wrapped in `AbortOnDropHandle`, so removing or
    /// dropping the entry aborts the underlying tokio task.
    #[allow(clippy::too_many_arguments)]
    pub fn attach(
        &self,
        reservation_id: u32,
        agent_name: &str,
        prompt: &str,
        rx: oneshot::Receiver<Result<BgPayload, BgPayload>>,
        cancel: CancellationToken,
        status_rx: watch::Receiver<AgentStatus>,
        spawner: Option<u32>,
        handle: tokio::task::JoinHandle<()>,
    ) {
        self.pending.lock().insert(
            reservation_id,
            BgAgentEntry {
                agent_name: agent_name.to_string(),
                prompt: prompt.to_string(),
                rx,
                cancel,
                status_rx,
                started_at: Instant::now(),
                spawner,
                _handle: AbortOnDropHandle::new(handle),
            },
        );
    }

    /// Test helper: registers a top-level task, discarding the status
    /// sender and cancel observer.
    #[cfg(test)]
    pub fn register_test(
        &self,
        agent_name: &str,
        prompt: &str,
    ) -> (u32, oneshot::Sender<Result<BgPayload, BgPayload>>) {
        let (id, tx, _status_tx, _cancel) =
            self.register_test_with_status(agent_name, prompt, None);
        (id, tx)
    }

    /// Test helper: registers a task backed by a no-op tokio task, returning
    /// the payload sender, status sender, and a cancel-token observer.
    #[cfg(test)]
    pub fn register_test_with_status(
        &self,
        agent_name: &str,
        prompt: &str,
        spawner: Option<u32>,
    ) -> (
        u32,
        oneshot::Sender<Result<BgPayload, BgPayload>>,
        watch::Sender<AgentStatus>,
        CancellationToken,
    ) {
        let (tx, rx) = oneshot::channel();
        let (status_tx, status_rx) = watch::channel(AgentStatus::Pending);
        let mut id = self.next_id.lock();
        let task_id = *id;
        *id += 1;
        // Release the id lock before touching the pending map.
        drop(id);
        let cancel = CancellationToken::new();
        let cancel_observer = cancel.clone();
        let noop = tokio::spawn(async {});
        self.pending.lock().insert(
            task_id,
            BgAgentEntry {
                agent_name: agent_name.to_string(),
                prompt: prompt.to_string(),
                rx,
                cancel,
                status_rx,
                started_at: Instant::now(),
                spawner,
                _handle: AbortOnDropHandle::new(noop),
            },
        );
        (task_id, tx, status_tx, cancel_observer)
    }

    /// Collects results from all finished tasks and removes their entries.
    ///
    /// A task whose sender was dropped without sending (`Closed`) is
    /// reported as a failed, cancelled result with an empty event trace.
    /// Still-running tasks (`Empty`) are left in place.
    pub fn drain_completed(&self) -> Vec<BgAgentResult> {
        let mut guard = self.pending.lock();
        let mut completed = Vec::new();
        let mut done_ids = Vec::new();
        for (id, entry) in guard.iter_mut() {
            match entry.rx.try_recv() {
                Ok(Ok((output, events))) => {
                    done_ids.push(*id);
                    completed.push(BgAgentResult {
                        agent_name: entry.agent_name.clone(),
                        prompt: entry.prompt.clone(),
                        output,
                        success: true,
                        events,
                    });
                }
                Ok(Err((err, events))) => {
                    done_ids.push(*id);
                    completed.push(BgAgentResult {
                        agent_name: entry.agent_name.clone(),
                        prompt: entry.prompt.clone(),
                        output: err,
                        success: false,
                        events,
                    });
                }
                Err(oneshot::error::TryRecvError::Empty) => {
                    // Task still running; keep the entry.
                }
                Err(oneshot::error::TryRecvError::Closed) => {
                    done_ids.push(*id);
                    completed.push(BgAgentResult {
                        agent_name: entry.agent_name.clone(),
                        prompt: entry.prompt.clone(),
                        output: "[background agent task was cancelled]".to_string(),
                        success: false,
                        events: Vec::new(),
                    });
                }
            }
        }
        // Remove after iteration; can't mutate the map while iterating it.
        for id in done_ids {
            guard.remove(&id);
        }
        completed
    }

    /// Number of tasks currently tracked (not yet drained or consumed).
    pub fn pending_count(&self) -> usize {
        self.pending.lock().len()
    }

    /// Fires the cancel token for `task_id`; returns `false` for unknown ids.
    ///
    /// The entry stays pending until the task reacts and its result is
    /// drained, so repeated calls keep returning `true`.
    pub fn cancel(&self, task_id: u32) -> bool {
        let guard = self.pending.lock();
        match guard.get(&task_id) {
            Some(entry) => {
                entry.cancel.cancel();
                true
            }
            None => false,
        }
    }

    /// Snapshots every pending task, sorted by ascending task id.
    pub fn snapshot(&self) -> Vec<BgTaskSnapshot> {
        let guard = self.pending.lock();
        let now = Instant::now();
        let mut out: Vec<_> = guard
            .iter()
            .map(|(id, entry)| BgTaskSnapshot {
                task_id: *id,
                agent_name: entry.agent_name.clone(),
                prompt: entry.prompt.clone(),
                age: now.saturating_duration_since(entry.started_at),
                status: entry.status_rx.borrow().clone(),
                spawner: entry.spawner,
            })
            .collect();
        out.sort_by_key(|s| s.task_id);
        out
    }

    /// Like [`snapshot`], but restricted to tasks whose `spawner` matches
    /// the caller (`None` = top-level caller).
    pub fn snapshot_for_caller(&self, caller_spawner: Option<u32>) -> Vec<BgTaskSnapshot> {
        self.snapshot()
            .into_iter()
            .filter(|s| s.spawner == caller_spawner)
            .collect()
    }

    /// Returns a clone of the task's status watch receiver, if it is pending.
    pub fn subscribe(&self, task_id: u32) -> Option<watch::Receiver<AgentStatus>> {
        let guard = self.pending.lock();
        guard.get(&task_id).map(|e| e.status_rx.clone())
    }

    /// Cancels `task_id` only if the caller owns it (same `spawner`); the
    /// ownership check runs before the token fires, so forbidden calls have
    /// no side effects.
    pub fn cancel_as_caller(&self, task_id: u32, caller_spawner: Option<u32>) -> CancelOutcome {
        let guard = self.pending.lock();
        match guard.get(&task_id) {
            None => CancelOutcome::NotFound,
            Some(entry) if entry.spawner != caller_spawner => CancelOutcome::Forbidden,
            Some(entry) => {
                entry.cancel.cancel();
                CancelOutcome::Cancelled
            }
        }
    }

    /// Fires the cancel token of every pending task spawned by `spawner`
    /// and returns how many matched (idempotent: already-cancelled tasks
    /// still count while their entries remain pending).
    pub fn cancel_for_spawner(&self, spawner: u32) -> usize {
        let guard = self.pending.lock();
        let mut count = 0;
        for entry in guard.values() {
            if entry.spawner == Some(spawner) {
                entry.cancel.cancel();
                count += 1;
            }
        }
        count
    }
}
/// Result of a caller-scoped cancellation attempt
/// ([`BgAgentRegistry::cancel_as_caller`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CancelOutcome {
    /// The task's cancel token was fired.
    Cancelled,
    /// No pending task with that id.
    NotFound,
    /// The task exists but belongs to a different spawner.
    Forbidden,
}
/// Result of [`BgAgentRegistry::wait_for_completion`].
#[derive(Debug)]
pub enum WaitOutcome {
    /// Terminal status reached and the final payload was received.
    Completed(BgAgentResult),
    /// The task's sender was dropped, or the payload never arrived.
    Cancelled,
    /// Deadline elapsed first; carries the task's current snapshot.
    TimedOut(BgTaskSnapshot),
    /// No pending task with that id.
    NotFound,
    /// The task belongs to a different spawner.
    Forbidden,
}
impl BgAgentRegistry {
    /// Waits until the task reaches a terminal status, then consumes its
    /// entry and returns the final payload.
    ///
    /// Ownership is checked first: callers may only wait on tasks they
    /// spawned. The pending-map lock is only taken in short scopes and is
    /// never held across an `.await`.
    ///
    /// Once a terminal status is observed the entry is removed, and the
    /// payload oneshot gets a short (50 ms) grace period — status and
    /// payload travel on separate channels, so the status can flip terminal
    /// slightly before the payload is sent. A payload that never arrives is
    /// reported as `Cancelled`.
    pub async fn wait_for_completion(
        &self,
        task_id: u32,
        caller_spawner: Option<u32>,
        timeout: Duration,
    ) -> WaitOutcome {
        // Grab a status receiver under the lock, then release before awaiting.
        let status_rx = {
            let guard = self.pending.lock();
            match guard.get(&task_id) {
                None => return WaitOutcome::NotFound,
                Some(entry) if entry.spawner != caller_spawner => {
                    return WaitOutcome::Forbidden;
                }
                Some(entry) => entry.status_rx.clone(),
            }
        };
        let wait_fut = wait_for_terminal_status(status_rx);
        let result = tokio::time::timeout(timeout, wait_fut).await;
        match result {
            Err(_elapsed) => {
                // Timed out: leave the entry in place and report its current
                // snapshot (it may have been drained/consumed concurrently).
                let snap = self.snapshot().into_iter().find(|s| s.task_id == task_id);
                match snap {
                    Some(s) => WaitOutcome::TimedOut(s),
                    None => WaitOutcome::NotFound,
                }
            }
            Ok(()) => {
                // Terminal status reached: take ownership of the entry so
                // drain_completed/snapshot no longer see it.
                let entry = {
                    let mut guard = self.pending.lock();
                    let Some(entry) = guard.remove(&task_id) else {
                        return WaitOutcome::NotFound;
                    };
                    entry
                };
                let agent_name = entry.agent_name;
                let prompt = entry.prompt;
                // Grace period for the payload to land after the status flip.
                match tokio::time::timeout(Duration::from_millis(50), entry.rx).await {
                    Ok(Ok(Ok((output, events)))) => WaitOutcome::Completed(BgAgentResult {
                        agent_name,
                        prompt,
                        output,
                        success: true,
                        events,
                    }),
                    Ok(Ok(Err((err, events)))) => WaitOutcome::Completed(BgAgentResult {
                        agent_name,
                        prompt,
                        output: err,
                        success: false,
                        events,
                    }),
                    // Sender dropped, or grace period elapsed without payload.
                    Ok(Err(_)) | Err(_) => WaitOutcome::Cancelled,
                }
            }
        }
    }
}
/// Resolves once the watched status is terminal (`Completed`, `Errored`,
/// or `Cancelled`), or as soon as the sender side is dropped.
async fn wait_for_terminal_status(mut rx: watch::Receiver<AgentStatus>) {
    while !matches!(
        *rx.borrow(),
        AgentStatus::Completed { .. } | AgentStatus::Errored { .. } | AgentStatus::Cancelled
    ) {
        // A closed channel ends the wait as well.
        if rx.changed().await.is_err() {
            return;
        }
    }
}
impl Default for BgAgentRegistry {
fn default() -> Self {
Self::new()
}
}
impl Drop for BgAgentRegistry {
    /// Dropping the registry drops every pending entry, and each entry's
    /// `AbortOnDropHandle` aborts its underlying tokio task.
    fn drop(&mut self) {
        let pending = std::mem::take(&mut *self.pending.lock());
        let count = pending.len();
        if count > 0 {
            tracing::debug!(
                count,
                "BgAgentRegistry dropped with pending tasks; aborting"
            );
        }
    }
}
/// Convenience constructor: a fresh registry wrapped in an [`Arc`] for
/// sharing across tasks.
pub fn new_shared() -> Arc<BgAgentRegistry> {
    let registry = BgAgentRegistry::new();
    Arc::new(registry)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    // Happy path: register, send Ok payload, drain exactly one success.
    #[tokio::test]
    async fn register_and_complete() {
        let reg = BgAgentRegistry::new();
        let (task_id, tx) = reg.register_test("explore", "find all tests");
        assert_eq!(task_id, 1);
        assert_eq!(reg.pending_count(), 1);
        assert!(reg.drain_completed().is_empty());
        tx.send(Ok(("found 42 tests".to_string(), Vec::new())))
            .unwrap();
        let results = reg.drain_completed();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].agent_name, "explore");
        assert_eq!(results[0].output, "found 42 tests");
        assert!(results[0].success);
        assert_eq!(reg.pending_count(), 0);
    }

    // drain_completed must leave still-running tasks in place.
    #[tokio::test]
    async fn drain_only_completed() {
        let reg = BgAgentRegistry::new();
        let (_id1, tx1) = reg.register_test("task", "build");
        let (_id2, _tx2) = reg.register_test("explore", "search");
        tx1.send(Ok(("done".to_string(), Vec::new()))).unwrap();
        let results = reg.drain_completed();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].agent_name, "task");
        assert_eq!(reg.pending_count(), 1);
    }

    // Dropping the sender (Closed) surfaces as a failed, cancelled result.
    #[tokio::test]
    async fn dropped_sender_reports_cancelled() {
        let reg = BgAgentRegistry::new();
        let (_id, tx) = reg.register_test("task", "build");
        drop(tx);
        let results = reg.drain_completed();
        assert_eq!(results.len(), 1);
        assert!(!results[0].success);
        assert!(results[0].output.contains("cancelled"));
    }

    // An Err payload drains as a failure carrying the error text.
    #[tokio::test]
    async fn error_result() {
        let reg = BgAgentRegistry::new();
        let (_id, tx) = reg.register_test("verify", "check");
        tx.send(Err(("test failures".to_string(), Vec::new())))
            .unwrap();
        let results = reg.drain_completed();
        assert_eq!(results.len(), 1);
        assert!(!results[0].success);
        assert_eq!(results[0].output, "test failures");
    }

    // The event trace must survive the Ok path of drain_completed intact.
    #[tokio::test]
    async fn events_propagate_through_drain_for_success() {
        let reg = BgAgentRegistry::new();
        let (_id, tx) = reg.register_test("explore", "map repo");
        let trace = vec![
            " \u{1f527} Read".to_string(),
            " \u{1f527} Grep".to_string(),
            " \u{26a1} cache hit".to_string(),
        ];
        tx.send(Ok(("map result".to_string(), trace.clone())))
            .unwrap();
        let results = reg.drain_completed();
        assert_eq!(results.len(), 1);
        assert!(results[0].success);
        assert_eq!(
            results[0].events, trace,
            "trace lost between sender and BgAgentResult"
        );
    }

    // The event trace must survive the Err path as well.
    #[tokio::test]
    async fn events_propagate_through_drain_for_failure() {
        let reg = BgAgentRegistry::new();
        let (_id, tx) = reg.register_test("build", "compile");
        let trace = vec![
            " \u{1f527} Bash".to_string(),
            " \u{2398} approval auto-rejected for Delete (no user channel)".to_string(),
        ];
        tx.send(Err(("compile failed".to_string(), trace.clone())))
            .unwrap();
        let results = reg.drain_completed();
        assert_eq!(results.len(), 1);
        assert!(!results[0].success);
        assert_eq!(results[0].events, trace);
    }

    // Cancelled tasks have no payload, so the trace must be empty.
    #[tokio::test]
    async fn cancelled_task_has_empty_event_trace() {
        let reg = BgAgentRegistry::new();
        let (_id, tx) = reg.register_test("flaky", "x");
        drop(tx);
        let results = reg.drain_completed();
        assert_eq!(results.len(), 1);
        assert!(!results[0].success);
        assert!(
            results[0].events.is_empty(),
            "cancel path must yield empty trace"
        );
    }

    // Dropping the registry must abort attached tasks via AbortOnDropHandle.
    #[tokio::test]
    async fn registry_drop_aborts_pending_tasks() {
        let reg = BgAgentRegistry::new();
        let parent = CancellationToken::new();
        let reservation = reg.reserve(&parent, None);
        let task_id = reservation.task_id;
        let cancel_for_task = reservation.cancel.clone();
        let tx = reservation.tx;
        let rx = reservation.rx;
        let cancel_for_entry = reservation.cancel;
        let status_rx = reservation.status_rx;
        let ran_to_completion = Arc::new(AtomicBool::new(false));
        let flag = ran_to_completion.clone();
        let handle = tokio::spawn(async move {
            tokio::select! {
                _ = cancel_for_task.cancelled() => {}
                _ = tokio::time::sleep(Duration::from_secs(60)) => {
                    flag.store(true, Ordering::SeqCst);
                }
            }
            let _ = tx.send(Ok(("done".to_string(), Vec::new())));
        });
        reg.attach(
            task_id,
            "explore",
            "long task",
            rx,
            cancel_for_entry,
            status_rx,
            None,
            handle,
        );
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(reg.pending_count(), 1);
        drop(reg);
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(
            !ran_to_completion.load(Ordering::SeqCst),
            "task slept to completion — AbortOnDropHandle did not abort it"
        );
    }

    // reserve() hands out child tokens, so parent cancel reaches them all.
    #[tokio::test]
    async fn parent_cancel_cascades_to_reserved_child() {
        let reg = BgAgentRegistry::new();
        let parent = CancellationToken::new();
        let r1 = reg.reserve(&parent, None);
        let r2 = reg.reserve(&parent, None);
        assert!(!r1.cancel.is_cancelled());
        assert!(!r2.cancel.is_cancelled());
        parent.cancel();
        assert!(
            r1.cancel.is_cancelled(),
            "child 1 token should observe parent cancel"
        );
        assert!(
            r2.cancel.is_cancelled(),
            "child 2 token should observe parent cancel"
        );
    }

    // cancel() on a known id fires the task's token.
    #[tokio::test]
    async fn cancel_known_task_fires_token() {
        let reg = BgAgentRegistry::new();
        let (task_id, _tx, _status_tx, observer) =
            reg.register_test_with_status("explore", "map repo", None);
        assert!(!observer.is_cancelled(), "precondition");
        let fired = reg.cancel(task_id);
        assert!(fired, "cancel(known_id) should report success");
        assert!(
            observer.is_cancelled(),
            "the task's cancel token should observe the cancellation"
        );
    }

    // cancel() on an unknown id is a no-op.
    #[tokio::test]
    async fn cancel_unknown_task_returns_false() {
        let reg = BgAgentRegistry::new();
        assert!(
            !reg.cancel(999),
            "cancel of an unknown id should be a no-op returning false"
        );
    }

    // Repeated cancel keeps succeeding while the entry is still pending.
    #[tokio::test]
    async fn cancel_is_idempotent_while_pending() {
        let reg = BgAgentRegistry::new();
        let (task_id, _tx, _status_tx, _observer) =
            reg.register_test_with_status("explore", "x", None);
        assert!(reg.cancel(task_id));
        assert!(
            reg.cancel(task_id),
            "second cancel should still find the entry and report success"
        );
    }

    // snapshot() is sorted by ascending task id and reflects Pending status.
    #[tokio::test]
    async fn snapshot_lists_pending_tasks_in_id_order() {
        let reg = BgAgentRegistry::new();
        let (id_a, _tx_a) = reg.register_test("explore", "map");
        let (id_b, _tx_b) = reg.register_test("verify", "check");
        let snap = reg.snapshot();
        assert_eq!(snap.len(), 2);
        assert_eq!(snap[0].task_id, id_a);
        assert_eq!(snap[0].agent_name, "explore");
        assert_eq!(snap[0].prompt, "map");
        assert_eq!(snap[0].status, AgentStatus::Pending);
        assert_eq!(snap[1].task_id, id_b);
        assert_eq!(snap[1].agent_name, "verify");
        assert_eq!(snap[1].status, AgentStatus::Pending);
    }

    // Each watch-channel write must be visible in the next snapshot.
    #[tokio::test]
    async fn snapshot_reflects_status_writes() {
        let reg = BgAgentRegistry::new();
        let (task_id, _tx, status_tx, _cancel) =
            reg.register_test_with_status("explore", "map", None);
        assert_eq!(reg.snapshot()[0].status, AgentStatus::Pending);
        status_tx.send(AgentStatus::Running { iter: 3 }).unwrap();
        let snap = reg.snapshot();
        assert_eq!(snap.len(), 1);
        assert_eq!(snap[0].task_id, task_id);
        assert_eq!(snap[0].status, AgentStatus::Running { iter: 3 });
        status_tx
            .send(AgentStatus::Completed {
                summary: "42 files".to_string(),
            })
            .unwrap();
        assert_eq!(
            reg.snapshot()[0].status,
            AgentStatus::Completed {
                summary: "42 files".to_string()
            }
        );
    }

    // age is computed from Instant, so it can only grow between snapshots.
    #[tokio::test]
    async fn snapshot_age_is_monotonic() {
        let reg = BgAgentRegistry::new();
        let (_id, _tx) = reg.register_test("explore", "x");
        let age1 = reg.snapshot()[0].age;
        tokio::time::sleep(Duration::from_millis(15)).await;
        let age2 = reg.snapshot()[0].age;
        assert!(
            age2 >= age1,
            "age should be monotonic non-decreasing across snapshots"
        );
    }

    #[tokio::test]
    async fn snapshot_empty_registry_is_empty_vec() {
        let reg = BgAgentRegistry::new();
        assert!(reg.snapshot().is_empty());
    }

    // Drained entries must disappear from subsequent snapshots.
    #[tokio::test]
    async fn snapshot_drops_drained_tasks() {
        let reg = BgAgentRegistry::new();
        let (_id, tx) = reg.register_test("explore", "x");
        assert_eq!(reg.snapshot().len(), 1);
        tx.send(Ok(("done".to_string(), Vec::new()))).unwrap();
        let _ = reg.drain_completed();
        assert!(
            reg.snapshot().is_empty(),
            "drained tasks must not appear in snapshots"
        );
    }

    // snapshot_for_caller only shows tasks with a matching spawner.
    #[tokio::test]
    async fn snapshot_for_caller_filters_by_spawner() {
        let reg = BgAgentRegistry::new();
        let (top_id, _tx, _, _) = reg.register_test_with_status("a", "top", None);
        let (sub_a_id, _tx, _, _) = reg.register_test_with_status("b", "sub-a", Some(7));
        let (_sub_b_id, _tx, _, _) = reg.register_test_with_status("c", "sub-b", Some(9));
        let top = reg.snapshot_for_caller(None);
        assert_eq!(top.len(), 1);
        assert_eq!(top[0].task_id, top_id);
        let sub_a = reg.snapshot_for_caller(Some(7));
        assert_eq!(sub_a.len(), 1);
        assert_eq!(sub_a[0].task_id, sub_a_id);
        assert!(reg.snapshot_for_caller(Some(42)).is_empty());
    }

    // Ownership check must run before the token fires: forbidden callers
    // leave no side effects.
    #[tokio::test]
    async fn cancel_as_caller_returns_forbidden_for_other_spawner() {
        let reg = BgAgentRegistry::new();
        let (id, _tx, _, observer) = reg.register_test_with_status("x", "y", Some(7));
        assert_eq!(
            reg.cancel_as_caller(id, None),
            CancelOutcome::Forbidden,
            "top-level must not be able to cancel sub-agent's task"
        );
        assert_eq!(
            reg.cancel_as_caller(id, Some(99)),
            CancelOutcome::Forbidden,
            "sibling sub-agent must not be able to cancel"
        );
        assert!(
            !observer.is_cancelled(),
            "forbidden calls must NOT fire the cancel token"
        );
        assert_eq!(reg.cancel_as_caller(id, Some(7)), CancelOutcome::Cancelled);
        assert!(observer.is_cancelled());
    }

    #[tokio::test]
    async fn cancel_as_caller_returns_not_found_for_unknown_id() {
        let reg = BgAgentRegistry::new();
        assert_eq!(reg.cancel_as_caller(999, None), CancelOutcome::NotFound);
    }

    // cancel_for_spawner fires only matching children and counts them,
    // idempotently while entries remain pending.
    #[tokio::test]
    async fn cancel_for_spawner_kills_only_matching_children() {
        let reg = BgAgentRegistry::new();
        let (_top, _, _, top_obs) = reg.register_test_with_status("top", "t", None);
        let (_a1, _, _, a1_obs) = reg.register_test_with_status("a1", "x", Some(7));
        let (_a2, _, _, a2_obs) = reg.register_test_with_status("a2", "y", Some(7));
        let (_b, _, _, b_obs) = reg.register_test_with_status("b", "z", Some(9));
        let count = reg.cancel_for_spawner(7);
        assert_eq!(count, 2, "both of spawner 7's children must be signalled");
        assert!(a1_obs.is_cancelled());
        assert!(a2_obs.is_cancelled());
        assert!(!top_obs.is_cancelled(), "top-level must be untouched");
        assert!(!b_obs.is_cancelled(), "sibling spawner's task untouched");
        assert_eq!(reg.cancel_for_spawner(7), 2);
        assert_eq!(reg.cancel_for_spawner(99), 0);
    }

    // wait_for_completion returns the result AND removes the entry, so
    // neither drain nor snapshot sees it afterwards.
    #[tokio::test]
    async fn wait_for_completion_consumes_completed_task() {
        let reg = BgAgentRegistry::new();
        let (id, tx, status_tx, _) = reg.register_test_with_status("explore", "map", Some(3));
        tx.send(Ok(("final answer".to_string(), vec!["step 1".to_string()])))
            .unwrap();
        status_tx
            .send(AgentStatus::Completed {
                summary: "final answer".to_string(),
            })
            .unwrap();
        let outcome = reg
            .wait_for_completion(id, Some(3), Duration::from_secs(1))
            .await;
        match outcome {
            WaitOutcome::Completed(result) => {
                assert!(result.success);
                assert_eq!(result.output, "final answer");
                assert_eq!(result.events, vec!["step 1".to_string()]);
            }
            other => panic!("expected Completed, got {other:?}"),
        }
        assert_eq!(reg.drain_completed().len(), 0);
        assert!(reg.snapshot().is_empty());
    }

    // Timeout must NOT consume the entry; it stays pending with a snapshot.
    #[tokio::test]
    async fn wait_for_completion_timeout_preserves_entry() {
        let reg = BgAgentRegistry::new();
        let (id, _tx, status_tx, _) = reg.register_test_with_status("slow", "x", None);
        status_tx.send(AgentStatus::Running { iter: 2 }).unwrap();
        let outcome = reg
            .wait_for_completion(id, None, Duration::from_millis(40))
            .await;
        match outcome {
            WaitOutcome::TimedOut(snap) => {
                assert_eq!(snap.task_id, id);
                assert_eq!(snap.status, AgentStatus::Running { iter: 2 });
            }
            other => panic!("expected TimedOut, got {other:?}"),
        }
        assert_eq!(reg.snapshot().len(), 1);
    }

    // Waiting is gated by the same spawner-ownership rule as cancelling.
    #[tokio::test]
    async fn wait_for_completion_returns_forbidden_for_other_spawner() {
        let reg = BgAgentRegistry::new();
        let (id, _tx, _, _) = reg.register_test_with_status("x", "y", Some(5));
        let outcome = reg
            .wait_for_completion(id, None, Duration::from_millis(20))
            .await;
        assert!(
            matches!(outcome, WaitOutcome::Forbidden),
            "top-level must not be able to wait on sub-agent task; got {outcome:?}"
        );
        let outcome = reg
            .wait_for_completion(id, Some(99), Duration::from_millis(20))
            .await;
        assert!(
            matches!(outcome, WaitOutcome::Forbidden),
            "sibling sub-agent must not be able to wait; got {outcome:?}"
        );
    }

    // A dropped sender with a terminal status maps to Cancelled and the
    // entry is still reaped.
    #[tokio::test]
    async fn wait_for_completion_returns_cancelled_when_sender_dropped() {
        let reg = BgAgentRegistry::new();
        let (id, tx, status_tx, _) = reg.register_test_with_status("x", "y", None);
        drop(tx);
        status_tx.send(AgentStatus::Cancelled).unwrap();
        let outcome = reg
            .wait_for_completion(id, None, Duration::from_secs(1))
            .await;
        assert!(matches!(outcome, WaitOutcome::Cancelled), "got {outcome:?}");
        assert!(reg.snapshot().is_empty(), "entry must be reaped");
    }

    #[tokio::test]
    async fn wait_for_completion_returns_not_found_for_unknown_id() {
        let reg = BgAgentRegistry::new();
        let outcome = reg
            .wait_for_completion(999, None, Duration::from_millis(10))
            .await;
        assert!(matches!(outcome, WaitOutcome::NotFound), "got {outcome:?}");
    }

    // Exercises the 50 ms payload grace period: status flips terminal,
    // scheduler yields, THEN the payload arrives.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn wait_for_completion_handles_status_then_yield_then_payload() {
        let reg = Arc::new(BgAgentRegistry::new());
        let (id, tx, status_tx, _observer) =
            reg.register_test_with_status("explore", "map repo", None);
        let send_task = tokio::spawn(async move {
            status_tx
                .send(AgentStatus::Completed {
                    summary: "done".into(),
                })
                .unwrap();
            tokio::task::yield_now().await;
            tokio::task::yield_now().await;
            let _ = tx.send(Ok(("final".into(), vec!["e1".into()])));
        });
        let outcome = reg
            .wait_for_completion(id, None, Duration::from_secs(2))
            .await;
        send_task.await.unwrap();
        match outcome {
            WaitOutcome::Completed(result) => {
                assert_eq!(result.output, "final");
                assert!(result.success);
                assert_eq!(result.events, vec!["e1".to_string()]);
            }
            other => panic!("expected Completed, got {other:?}"),
        }
    }

    // Helper: an emitter whose watch receiver is discarded (queue-only use).
    fn emitter_for(
        reg: &Arc<BgAgentRegistry>,
        task_id: u32,
        spawner: Option<u32>,
    ) -> BgStatusEmitter {
        let (tx, _rx) = watch::channel(AgentStatus::Pending);
        BgStatusEmitter::new(task_id, spawner, tx, reg.clone())
    }

    // Helper: destructure a BgTaskUpdate event or fail loudly.
    fn extract(event: &EngineEvent) -> (u32, Option<u32>, &AgentStatus) {
        match event {
            EngineEvent::BgTaskUpdate {
                task_id,
                spawner,
                status,
            } => (*task_id, *spawner, status),
            other => panic!("expected BgTaskUpdate, got {other:?}"),
        }
    }

    #[test]
    fn emitter_send_queues_engine_event_on_registry() {
        let reg = Arc::new(BgAgentRegistry::new());
        // Fixed: this call site was garbled ("®" for "&reg") and did not compile.
        let emitter = emitter_for(&reg, 7, Some(42));
        assert!(
            reg.drain_status_events().is_empty(),
            "fresh registry must have an empty event queue"
        );
        emitter.send(AgentStatus::Running { iter: 0 });
        let drained = reg.drain_status_events();
        assert_eq!(drained.len(), 1, "single send must produce one event");
        let (id, spawner, status) = extract(&drained[0]);
        assert_eq!(id, 7);
        assert_eq!(spawner, Some(42));
        assert!(matches!(status, AgentStatus::Running { iter: 0 }));
    }

    #[test]
    fn emitter_drain_is_fifo_and_clears_queue() {
        let reg = Arc::new(BgAgentRegistry::new());
        // Fixed: this call site was garbled ("®" for "&reg") and did not compile.
        let emitter = emitter_for(&reg, 1, None);
        emitter.send(AgentStatus::Running { iter: 0 });
        emitter.send(AgentStatus::Running { iter: 1 });
        emitter.send(AgentStatus::Running { iter: 2 });
        emitter.send(AgentStatus::Completed {
            summary: "done".into(),
        });
        let drained = reg.drain_status_events();
        assert_eq!(drained.len(), 4, "all four sends must surface");
        let iters: Vec<_> = drained
            .iter()
            .filter_map(|e| match e {
                EngineEvent::BgTaskUpdate {
                    status: AgentStatus::Running { iter },
                    ..
                } => Some(*iter),
                _ => None,
            })
            .collect();
        assert_eq!(iters, vec![0, 1, 2]);
        assert!(matches!(
            extract(&drained[3]).2,
            AgentStatus::Completed { .. }
        ));
        assert!(
            reg.drain_status_events().is_empty(),
            "drain must clear the queue"
        );
    }

    // send() must hit BOTH channels: watch receiver and registry queue.
    #[test]
    fn emitter_send_also_updates_watch_channel() {
        let reg = Arc::new(BgAgentRegistry::new());
        let (tx, mut rx) = watch::channel(AgentStatus::Pending);
        let emitter = BgStatusEmitter::new(3, None, tx, reg.clone());
        emitter.send(AgentStatus::Running { iter: 5 });
        assert!(matches!(
            *rx.borrow_and_update(),
            AgentStatus::Running { iter: 5 }
        ));
        let drained = reg.drain_status_events();
        assert_eq!(drained.len(), 1);
        assert!(matches!(
            extract(&drained[0]).2,
            AgentStatus::Running { iter: 5 }
        ));
    }

    // Cloned emitters share the same watch channel and event queue.
    #[test]
    fn emitter_clones_share_queue_and_watch() {
        let reg = Arc::new(BgAgentRegistry::new());
        let (tx, _rx) = watch::channel(AgentStatus::Pending);
        let a = BgStatusEmitter::new(11, Some(2), tx, reg.clone());
        let b = a.clone();
        a.send(AgentStatus::Running { iter: 1 });
        b.send(AgentStatus::Completed {
            summary: "ok".into(),
        });
        let drained = reg.drain_status_events();
        assert_eq!(drained.len(), 2, "clones must share the registry queue");
        assert!(matches!(a.current(), AgentStatus::Completed { .. }));
    }

    // Every AgentStatus variant must survive a JSON round trip inside an
    // EngineEvent unchanged.
    #[test]
    fn agent_status_round_trips_through_serde() {
        for status in [
            AgentStatus::Pending,
            AgentStatus::Running { iter: 0 },
            AgentStatus::Running { iter: 17 },
            AgentStatus::Cancelled,
            AgentStatus::Completed {
                summary: "hello".into(),
            },
            AgentStatus::Errored {
                error: "boom".into(),
            },
        ] {
            let event = EngineEvent::BgTaskUpdate {
                task_id: 1,
                spawner: Some(2),
                status: status.clone(),
            };
            let json = serde_json::to_string(&event).expect("serialize");
            let back: EngineEvent = serde_json::from_str(&json).expect("deserialize");
            match back {
                EngineEvent::BgTaskUpdate {
                    task_id,
                    spawner,
                    status: round_tripped,
                } => {
                    assert_eq!(task_id, 1);
                    assert_eq!(spawner, Some(2));
                    assert_eq!(round_tripped, status, "json round-trip lost data: {json}");
                }
                other => panic!("round-trip changed variant: {other:?}"),
            }
        }
    }

    #[test]
    fn drain_status_events_is_empty_on_fresh_registry() {
        let reg = BgAgentRegistry::new();
        let drained = reg.drain_status_events();
        assert!(drained.is_empty());
    }
}