#[allow(unused_imports)]
use crate::sync_util::LockExt;
use indexmap::IndexMap;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
/// Cap handed to `head_cap` when trimming the error text of a failed task
/// (presumably counted in characters, per the name — confirm against `head_cap`).
const MAX_TASK_OUTPUT_CHARS: usize = 3000;
/// Maximum number of task entries kept in the store; inserting a new id once
/// this many are present evicts the entry at the front of the map.
const STORE_CAPACITY: usize = 32;
/// Value reported by `BackgroundStore::max_concurrent`. The code that enforces
/// the limit is not part of this file's visible code.
const MAX_CONCURRENT_SUBAGENTS: usize = 4;
/// Event pushed to the optional UI sink of a `BackgroundStore`.
#[derive(Debug, Clone)]
pub enum LifecycleEvent {
/// Sent by `BackgroundStore::notify_started` for the given task id.
Started { id: String },
/// Sent by `BackgroundStore::notify` once a task reached a terminal state.
Finished(TaskNotification),
}
/// Sending half of the unbounded channel that carries `LifecycleEvent`s.
pub type LifecycleSender = mpsc::UnboundedSender<LifecycleEvent>;
/// Receiving half of the unbounded channel that carries `LifecycleEvent`s.
pub type LifecycleReceiver = mpsc::UnboundedReceiver<LifecycleEvent>;
/// Shared registry of background tasks and their completion notifications.
///
/// Cloning is cheap: clones share the same `Inner` through the `Arc`, so a
/// change made through one clone is visible through every other clone.
#[derive(Debug, Clone, Default)]
pub struct BackgroundStore {
/// Mutex-protected state shared by all clones of this store.
inner: Arc<Mutex<Inner>>,
/// Optional channel that receives `LifecycleEvent`s; `None` for stores
/// built with `new()` / `Default`.
ui_sink: Option<LifecycleSender>,
}
/// Mutable state guarded by the store's mutex.
#[derive(Debug, Default)]
struct Inner {
/// Known tasks keyed by id, kept in insertion order so the front entry can be evicted.
tasks: IndexMap<String, BackgroundTask>,
/// Finished-task notifications waiting to be drained, oldest first.
pending: VecDeque<TaskNotification>,
/// Join handles attached via `attach_handle`, keyed by task id.
handles: HashMap<String, JoinHandle<()>>,
}
/// Lifecycle state of a background task.
#[derive(Debug, Clone, PartialEq)]
pub enum TaskState {
/// State assigned by `BackgroundStore::insert`; no result is available yet.
Running,
/// Terminal state carrying the task's output text.
Completed(String),
/// Terminal state carrying an error (or cancellation) description.
Failed(String),
}
/// Entry stored per task id inside the `BackgroundStore`.
#[derive(Debug, Clone)]
pub struct BackgroundTask {
/// Most recently recorded state of the task.
pub state: TaskState,
}
/// Snapshot describing a task that reached a terminal state.
///
/// The state is copied at notification time, so it stays available even if
/// the task entry itself is later evicted from the store.
#[derive(Debug, Clone, PartialEq)]
pub struct TaskNotification {
/// Id of the task the notification refers to.
pub id: String,
/// State recorded for the task when the notification was created.
pub state: TaskState,
}
impl BackgroundStore {
    /// Creates an empty store with no UI sink attached.
    #[allow(dead_code)]
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an empty store that additionally forwards lifecycle events to
    /// `ui_sink`.
    pub fn with_ui_sink(ui_sink: LifecycleSender) -> Self {
        Self {
            inner: Arc::new(Mutex::new(Inner::default())),
            ui_sink: Some(ui_sink),
        }
    }

    /// Registers `id` as a running task.
    ///
    /// When `id` is new and the store already holds `STORE_CAPACITY` entries,
    /// the entry at the front of the map is evicted first. Re-inserting an
    /// existing id never evicts; it resets that entry's state to `Running`.
    pub fn insert(&self, id: String) {
        let mut inner = self.lock();
        if !inner.tasks.contains_key(&id) && inner.tasks.len() >= STORE_CAPACITY {
            inner.tasks.shift_remove_index(0);
        }
        inner.tasks.insert(
            id,
            BackgroundTask {
                state: TaskState::Running,
            },
        );
    }

    /// Returns a copy of the entry stored for `id`, or `None` if the id is
    /// unknown (never inserted, or evicted).
    pub fn get(&self, id: &str) -> Option<BackgroundTask> {
        self.lock().tasks.get(id).cloned()
    }

    /// Returns the number of join handles currently tracked by the store.
    ///
    /// Handles are added by `attach_handle` and released by `notify` and
    /// `cancel_all`.
    pub fn running_count(&self) -> usize {
        self.lock().handles.len()
    }

    /// Returns the configured maximum number of concurrent subagents.
    pub fn max_concurrent() -> usize {
        MAX_CONCURRENT_SUBAGENTS
    }

    /// Records a terminal `state` for task `id`.
    ///
    /// * `TaskState::Running` is ignored entirely.
    /// * The payload is passed through `truncate_state` before being stored.
    /// * The join handle tracked for `id` is always released, even when the
    ///   task entry itself is no longer in the store.
    /// * For a known id the stored state is updated, a notification is queued
    ///   unless one for the same id is already pending (the queued snapshot is
    ///   not refreshed in that case), and a `Finished` event is sent to the UI
    ///   sink if one is attached.
    /// * For an unknown id nothing is queued and no event is sent.
    pub fn notify(&self, id: &str, state: TaskState) {
        if matches!(state, TaskState::Running) {
            return;
        }
        // Shrink the payload before taking the lock so the mutex is not held
        // while the truncation helpers run.
        let truncated = truncate_state(state);
        let id_owned = id.to_string();
        let mut inner = self.lock();
        // Release the handle before the existence check: `insert` can evict a
        // task entry while its handle is still tracked, and returning early
        // without this would leave that handle in the map (and counted by
        // `running_count`) until the next `cancel_all`.
        inner.handles.remove(id);
        let Some(task) = inner.tasks.get_mut(id) else {
            return;
        };
        task.state = truncated.clone();
        if !inner.pending.iter().any(|n| n.id == id_owned) {
            inner.pending.push_back(TaskNotification {
                id: id_owned.clone(),
                state: truncated.clone(),
            });
        }
        // Unlock before touching the channel.
        drop(inner);
        if let Some(sink) = &self.ui_sink {
            // A closed receiver is not an error for the store; the send result
            // is deliberately ignored.
            let _ = sink.send(LifecycleEvent::Finished(TaskNotification {
                id: id_owned,
                state: truncated,
            }));
        }
    }

    /// Associates a join handle with task `id`.
    ///
    /// Does nothing (the handle is dropped, not aborted) when `id` is not in
    /// the store. If a handle was already attached for `id`, the previous one
    /// is aborted and replaced.
    pub fn attach_handle(&self, id: &str, handle: JoinHandle<()>) {
        let mut inner = self.lock();
        if !inner.tasks.contains_key(id) {
            return;
        }
        // NOTE(review): if the task already called `notify` before its handle
        // is attached here, the handle stays tracked until `cancel_all` —
        // confirm that callers cannot hit this ordering.
        if let Some(prev) = inner.handles.insert(id.to_string(), handle) {
            prev.abort();
        }
    }

    /// Aborts every tracked handle, marks all still-running tasks as failed
    /// with a cancellation message, and discards all pending notifications.
    pub fn cancel_all(&self) {
        let mut inner = self.lock();
        for (_, h) in inner.handles.drain() {
            h.abort();
        }
        let cancelled_label = "cancelled — session switched".to_string();
        for task in inner.tasks.values_mut() {
            if matches!(task.state, TaskState::Running) {
                task.state = TaskState::Failed(cancelled_label.clone());
            }
        }
        inner.pending.clear();
    }

    /// Sends a `Started` event for `id` to the UI sink, if one is attached.
    ///
    /// The store's task map and pending queue are not touched.
    pub fn notify_started(&self, id: &str) {
        if let Some(sink) = &self.ui_sink {
            let _ = sink.send(LifecycleEvent::Started { id: id.to_string() });
        }
    }

    /// Removes and returns all pending notifications, oldest first.
    pub fn drain_notifications(&self) -> Vec<TaskNotification> {
        self.lock().pending.drain(..).collect()
    }

    /// Returns `true` when at least one notification is waiting to be drained.
    pub fn has_pending_notifications(&self) -> bool {
        !self.lock().pending.is_empty()
    }

    /// Acquires the guard for the shared state via `LockExt::lock_ignore_poison`
    /// (presumably recovering the guard from a poisoned mutex — see `sync_util`).
    fn lock(&self) -> std::sync::MutexGuard<'_, Inner> {
        self.inner.lock_ignore_poison()
    }

    /// Test helper: number of task entries currently stored.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.lock().tasks.len()
    }

    /// Test helper: number of notifications currently queued.
    #[cfg(test)]
    fn pending_len(&self) -> usize {
        self.lock().pending.len()
    }
}
/// Builds the follow-up hook that reports finished background tasks.
///
/// Every invocation of the returned closure drains the store's pending
/// notifications. With nothing pending it resolves to an empty vector;
/// otherwise it resolves to a single user message whose content is one
/// `<system-reminder>` block listing every drained task, in queue order.
pub fn followup_from_background_store(
    store: BackgroundStore,
) -> crate::agent::agent_loop::hooks::GetFollowupMessagesFn {
    use crate::agent::agent_loop::message::{LoopMessage, UserMessage};
    Arc::new(move || {
        // Each call gets its own handle onto the shared store.
        let handle = store.clone();
        Box::pin(async move {
            let finished = handle.drain_notifications();
            if finished.is_empty() {
                return Vec::new();
            }
            // One rendered entry per notification; entries are separated by a
            // blank line when joined below.
            let entries: Vec<String> = finished
                .iter()
                .map(|note| match &note.state {
                    TaskState::Completed(text) => {
                        format!("[task {}] completed: {}\n", note.id, text)
                    }
                    TaskState::Failed(err) => {
                        format!("[task {}] failed: {}\n", note.id, err)
                    }
                    TaskState::Running => String::new(),
                })
                .collect();
            let mut content = String::with_capacity(256);
            content.push_str("<system-reminder>\n");
            content.push_str("The following background tasks finished since your last turn:\n\n");
            content.push_str(&entries.join("\n"));
            content.push_str("</system-reminder>");
            vec![LoopMessage::User(UserMessage { content })]
        })
    })
}
/// Prefixes `prompt` with a `<system-reminder>` block describing finished
/// background tasks.
///
/// Returns `prompt` unchanged when `store` is `None` or has nothing pending.
/// Otherwise the store's pending notifications are drained and listed, in
/// queue order, ahead of the prompt text.
pub(crate) fn prepend_pending_notifications(
    prompt: &str,
    store: Option<&BackgroundStore>,
) -> String {
    let finished = match store {
        Some(store) => store.drain_notifications(),
        None => return prompt.to_string(),
    };
    if finished.is_empty() {
        return prompt.to_string();
    }

    let mut reminder = String::with_capacity(prompt.len() + 256);
    reminder.push_str("<system-reminder>\n");
    reminder.push_str("The following background tasks finished since your last turn:\n\n");

    let mut is_first = true;
    for note in &finished {
        // Blank line between consecutive entries, never before the first one.
        if !is_first {
            reminder.push('\n');
        }
        is_first = false;

        let entry = match &note.state {
            TaskState::Completed(text) => {
                Some(format!("Task {} (completed):\n{}\n", note.id, text))
            }
            TaskState::Failed(err) => Some(format!("Task {} (failed):\n{}\n", note.id, err)),
            TaskState::Running => None,
        };
        if let Some(entry) = entry {
            reminder.push_str(&entry);
        }
    }

    reminder.push_str("</system-reminder>\n\n");
    reminder.push_str(prompt);
    reminder
}
fn truncate_state(state: TaskState) -> TaskState {
match state {
TaskState::Completed(text) => {
let outcome = crate::agent::tools::output_relay::relay_if_large("task", text, "");
TaskState::Completed(outcome.text)
}
TaskState::Failed(err) => {
TaskState::Failed(crate::agent::tools::head_cap(
err,
MAX_TASK_OUTPUT_CHARS,
"task error",
))
}
s => s,
}
}
// Unit tests for `BackgroundStore`, the reminder formatters, the UI sink and
// the follow-up hook.
#[cfg(test)]
mod tests {
use super::*;
// A freshly inserted task is reported as `Running`.
#[test]
fn insert_then_get_returns_running() {
let store = BackgroundStore::new();
store.insert("t1".into());
let task = store.get("t1").expect("task present");
assert_eq!(task.state, TaskState::Running);
}
// Looking up an id that was never inserted yields `None`.
#[test]
fn get_on_missing_returns_none() {
assert!(BackgroundStore::new().get("nope").is_none());
}
// `cancel_all` clears the pending queue and marks running tasks as failed
// with a cancellation reason.
#[tokio::test]
async fn cancel_all_aborts_in_flight_tasks() {
let store = BackgroundStore::new();
store.insert("t1".into());
let store_for_task = store.clone();
let handle = tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
store_for_task.notify("t1", TaskState::Completed("should not run".into()));
});
store.attach_handle("t1", handle);
// A notification left over from an already finished task must be dropped too.
store.insert("t_stale".into());
store.notify("t_stale", TaskState::Completed("prev session".into()));
assert_eq!(store.pending_len(), 1);
store.cancel_all();
assert_eq!(store.pending_len(), 0, "cancel_all must clear pending");
let t1 = store.get("t1").expect("t1 retained");
match &t1.state {
TaskState::Failed(reason) => assert!(
reason.contains("cancelled"),
"expected cancellation reason; got {:?}",
reason
),
other => panic!("expected Failed cancelled, got {:?}", other),
}
tokio::task::yield_now().await;
}
// Reading a completed task repeatedly neither removes nor alters it.
#[test]
fn regression_get_is_read_only_after_completion() {
let store = BackgroundStore::new();
store.insert("t1".into());
store.notify("t1", TaskState::Completed("done".into()));
for _ in 0..3 {
let task = store.get("t1").expect("must remain after read");
assert_eq!(task.state, TaskState::Completed("done".into()));
}
assert_eq!(store.len(), 1);
}
// A completion enqueues exactly one pending notification.
#[test]
fn notify_pushes_completed_to_pending_queue() {
let store = BackgroundStore::new();
store.insert("t1".into());
store.notify("t1", TaskState::Completed("done".into()));
assert_eq!(store.pending_len(), 1);
}
// A very large completed payload is replaced by a relay summary.
#[test]
fn regression_notify_relays_large_completed_payload() {
let store = BackgroundStore::new();
store.insert("t1".into());
let huge: String = (0..5_000)
.map(|i| format!("subagent output line {i}\n"))
.collect();
store.notify("t1", TaskState::Completed(huge));
let TaskState::Completed(text) = store.get("t1").unwrap().state else {
panic!("expected Completed");
};
assert!(
text.contains("`read`"),
"relayed summary must mention `read` tool: {text}",
);
assert!(
text.contains("transient") || text.contains(".dirge"),
"relayed summary must reference the transient path: {text}",
);
}
// An oversized error keeps its head up to the cap and gains a truncation marker.
#[test]
fn regression_notify_truncates_failed_error() {
let store = BackgroundStore::new();
store.insert("t1".into());
let huge = "e".repeat(MAX_TASK_OUTPUT_CHARS * 2);
let huge_len = huge.len();
store.notify("t1", TaskState::Failed(huge));
let TaskState::Failed(text) = store.get("t1").unwrap().state else {
panic!("expected Failed");
};
assert!(
text.starts_with(&"e".repeat(MAX_TASK_OUTPUT_CHARS)),
"head must be preserved up to the cap"
);
assert!(
text.contains("truncated"),
"must carry a truncation marker: {text}"
);
assert!(text.len() < huge_len, "must be shorter than the original");
}
// Small completed payloads are stored unchanged.
#[test]
fn small_completed_payload_passes_through_relay_verbatim() {
let store = BackgroundStore::new();
store.insert("t1".into());
let small = "subagent answer: 42\nplus a couple more lines.\n".to_string();
store.notify("t1", TaskState::Completed(small.clone()));
let TaskState::Completed(text) = store.get("t1").unwrap().state else {
panic!("expected Completed");
};
assert_eq!(text, small, "small payload must round-trip unchanged");
}
// Notifying an unknown id neither creates a task nor queues a notification.
#[test]
fn notify_on_missing_id_is_noop() {
let store = BackgroundStore::new();
store.notify("ghost", TaskState::Completed("never inserted".into()));
assert!(store.get("ghost").is_none());
assert_eq!(store.pending_len(), 0);
}
// `Running` is not a terminal state and must be ignored by `notify`.
#[test]
fn notify_with_running_state_is_noop() {
let store = BackgroundStore::new();
store.insert("t1".into());
store.notify("t1", TaskState::Running);
assert_eq!(store.pending_len(), 0);
assert_eq!(store.get("t1").unwrap().state, TaskState::Running);
}
// A second notify for the same id updates the stored state but does not
// queue a second notification.
#[test]
fn regression_double_notify_enqueues_once() {
let store = BackgroundStore::new();
store.insert("t1".into());
store.notify("t1", TaskState::Completed("first".into()));
store.notify("t1", TaskState::Completed("second".into()));
assert_eq!(store.pending_len(), 1);
let TaskState::Completed(text) = store.get("t1").unwrap().state else {
panic!("expected Completed");
};
assert_eq!(text, "second");
}
// Draining returns notifications in queue order, empties the queue and
// leaves the task entries in place.
#[test]
fn drain_returns_pending_then_empties_queue() {
let store = BackgroundStore::new();
store.insert("t1".into());
store.insert("t2".into());
store.notify("t1", TaskState::Completed("a".into()));
store.notify("t2", TaskState::Failed("b".into()));
let drained = store.drain_notifications();
assert_eq!(drained.len(), 2);
assert_eq!(drained[0].id, "t1");
assert_eq!(drained[0].state, TaskState::Completed("a".into()));
assert_eq!(drained[1].id, "t2");
assert_eq!(drained[1].state, TaskState::Failed("b".into()));
assert!(store.drain_notifications().is_empty());
assert!(store.get("t1").is_some());
assert!(store.get("t2").is_some());
}
// A running task produces nothing to drain.
#[test]
fn drain_is_empty_when_nothing_pending() {
let store = BackgroundStore::new();
store.insert("t1".into());
assert!(store.drain_notifications().is_empty());
}
// Inserting beyond `STORE_CAPACITY` evicts the oldest entry only.
#[test]
fn regression_lru_evicts_oldest_at_capacity() {
let store = BackgroundStore::new();
for i in 0..STORE_CAPACITY {
store.insert(format!("t{i}"));
}
assert_eq!(store.len(), STORE_CAPACITY);
store.insert("overflow".into());
assert_eq!(store.len(), STORE_CAPACITY);
assert!(store.get("t0").is_none());
assert!(store.get("overflow").is_some());
assert!(store.get(&format!("t{}", STORE_CAPACITY - 1)).is_some());
}
// Re-inserting an id that is already present must not trigger eviction.
#[test]
fn re_insert_existing_id_does_not_evict() {
let store = BackgroundStore::new();
for i in 0..STORE_CAPACITY {
store.insert(format!("t{i}"));
}
store.insert("t5".into());
assert_eq!(store.len(), STORE_CAPACITY);
assert!(store.get("t0").is_some(), "oldest must NOT be evicted");
assert_eq!(store.get("t5").unwrap().state, TaskState::Running);
}
// A late notify for an evicted id queues nothing.
#[test]
fn regression_notify_on_evicted_id_is_noop() {
let store = BackgroundStore::new();
for i in 0..STORE_CAPACITY {
store.insert(format!("t{i}"));
}
store.insert("overflow".into());
store.notify("t0", TaskState::Completed("late".into()));
assert_eq!(store.pending_len(), 0);
assert!(store.drain_notifications().is_empty());
}
// Clones of a store observe each other's changes.
#[test]
fn clones_share_state() {
let a = BackgroundStore::new();
let b = a.clone();
a.insert("t1".into());
b.notify("t1", TaskState::Completed("via b".into()));
let drained = a.drain_notifications();
assert_eq!(drained.len(), 1);
assert_eq!(drained[0].id, "t1");
}
// Without a store the prompt is returned untouched.
#[test]
fn prepend_passthrough_when_store_is_none() {
let out = prepend_pending_notifications("hello", None);
assert_eq!(out, "hello");
}
// With a store but nothing pending the prompt is returned untouched.
#[test]
fn prepend_passthrough_when_nothing_pending() {
let store = BackgroundStore::new();
store.insert("t1".into()); let out = prepend_pending_notifications("hello", Some(&store));
assert_eq!(out, "hello");
}
// The reminder block precedes the prompt and carries the task result.
#[test]
fn prepend_formats_system_reminder() {
let store = BackgroundStore::new();
store.insert("t1".into());
store.notify("t1", TaskState::Completed("the result".into()));
let out = prepend_pending_notifications("user msg", Some(&store));
assert!(out.starts_with("<system-reminder>\n"));
assert!(out.contains("Task t1 (completed):"));
assert!(out.contains("the result"));
assert!(out.contains("</system-reminder>\n\n"));
assert!(out.ends_with("user msg"));
}
// Failed tasks are listed with a failure label and their error text.
#[test]
fn prepend_includes_failed_tasks() {
let store = BackgroundStore::new();
store.insert("t1".into());
store.notify("t1", TaskState::Failed("kaboom".into()));
let out = prepend_pending_notifications("user msg", Some(&store));
assert!(out.contains("Task t1 (failed):"));
assert!(out.contains("kaboom"));
}
// Notifications are delivered once; a second call sees an empty queue.
#[test]
fn regression_prepend_drains_queue_once() {
let store = BackgroundStore::new();
store.insert("t1".into());
store.notify("t1", TaskState::Completed("once".into()));
let first = prepend_pending_notifications("msg", Some(&store));
assert!(first.contains("once"));
let second = prepend_pending_notifications("msg", Some(&store));
assert_eq!(second, "msg");
}
// Multiple pending tasks appear in the order they finished.
#[test]
fn prepend_includes_all_pending_tasks_in_order() {
let store = BackgroundStore::new();
for i in 0..3 {
store.insert(format!("t{i}"));
store.notify(&format!("t{i}"), TaskState::Completed(format!("r{i}")));
}
let out = prepend_pending_notifications("msg", Some(&store));
let i0 = out.find("Task t0").unwrap();
let i1 = out.find("Task t1").unwrap();
let i2 = out.find("Task t2").unwrap();
assert!(i0 < i1 && i1 < i2);
}
// Helper: extracts the notification from a `Finished` event, panicking on
// any other variant.
fn unwrap_finished(evt: LifecycleEvent) -> TaskNotification {
match evt {
LifecycleEvent::Finished(n) => n,
other => panic!("expected Finished, got {:?}", other),
}
}
// A completion is forwarded to the UI sink as a `Finished` event.
#[tokio::test]
async fn ui_sink_receives_completion_event() {
let (tx, mut rx) = mpsc::unbounded_channel();
let store = BackgroundStore::with_ui_sink(tx);
store.insert("t1".into());
store.notify("t1", TaskState::Completed("done".into()));
let notif = unwrap_finished(rx.recv().await.expect("event delivered"));
assert_eq!(notif.id, "t1");
assert_eq!(notif.state, TaskState::Completed("done".into()));
}
// A failure is forwarded to the UI sink as a `Finished` event.
#[tokio::test]
async fn ui_sink_receives_failure_event() {
let (tx, mut rx) = mpsc::unbounded_channel();
let store = BackgroundStore::with_ui_sink(tx);
store.insert("t1".into());
store.notify("t1", TaskState::Failed("boom".into()));
let notif = unwrap_finished(rx.recv().await.unwrap());
assert_eq!(notif.state, TaskState::Failed("boom".into()));
}
// The UI event carries the relayed summary, not the raw oversized payload.
#[tokio::test]
async fn ui_sink_event_carries_relayed_payload() {
let (tx, mut rx) = mpsc::unbounded_channel();
let store = BackgroundStore::with_ui_sink(tx);
store.insert("t1".into());
let huge: String = (0..5_000)
.map(|i| format!("subagent output line {i}\n"))
.collect();
let original_len = huge.len();
store.notify("t1", TaskState::Completed(huge));
let notif = unwrap_finished(rx.recv().await.unwrap());
let TaskState::Completed(text) = notif.state else {
panic!("expected Completed");
};
assert!(
text.len() < original_len / 2,
"relayed summary should be much smaller than the original (got {} of {} bytes)",
text.len(),
original_len,
);
assert!(
text.contains("`read`"),
"relayed summary must mention `read` tool",
);
}
// `notify` with `Running` sends nothing to the UI sink.
#[tokio::test]
async fn ui_sink_does_not_receive_running_state() {
let (tx, mut rx) = mpsc::unbounded_channel();
let store = BackgroundStore::with_ui_sink(tx);
store.insert("t1".into());
store.notify("t1", TaskState::Running);
assert!(rx.try_recv().is_err());
}
// `notify` for an id that is not in the store sends nothing to the UI sink.
#[tokio::test]
async fn ui_sink_no_event_for_evicted_id() {
let (tx, mut rx) = mpsc::unbounded_channel();
let store = BackgroundStore::with_ui_sink(tx);
store.notify("ghost", TaskState::Completed("late".into()));
assert!(rx.try_recv().is_err());
}
// A queued notification keeps its snapshot even after the task entry is evicted.
#[test]
fn regression_drain_returns_snapshotted_state_after_eviction() {
let store = BackgroundStore::new();
store.insert("victim".into());
store.notify("victim", TaskState::Completed("the result".into()));
for i in 0..STORE_CAPACITY {
store.insert(format!("filler{i}"));
}
assert!(store.get("victim").is_none(), "victim must be evicted");
let drained = store.drain_notifications();
assert_eq!(drained.len(), 1);
assert_eq!(drained[0].id, "victim");
assert_eq!(drained[0].state, TaskState::Completed("the result".into()));
}
// `notify_started` reaches the UI sink but never the pending queue.
#[tokio::test]
async fn notify_started_fires_only_on_ui_sink() {
let (tx, mut rx) = mpsc::unbounded_channel();
let store = BackgroundStore::with_ui_sink(tx);
store.insert("t1".into());
store.notify_started("t1");
let evt = rx.recv().await.expect("Started event delivered");
match evt {
LifecycleEvent::Started { id } => assert_eq!(id, "t1"),
other => panic!("expected Started, got {other:?}"),
}
assert!(store.drain_notifications().is_empty());
}
// Without a UI sink `notify_started` does nothing.
#[tokio::test]
async fn notify_started_with_no_ui_sink_is_noop() {
let store = BackgroundStore::new();
store.notify_started("t1");
assert_eq!(store.pending_len(), 0);
}
// A dropped receiver must not prevent the notification from being queued.
#[tokio::test]
async fn ui_sink_send_after_receiver_dropped_is_silent() {
let (tx, rx) = mpsc::unbounded_channel();
let store = BackgroundStore::with_ui_sink(tx);
store.insert("t1".into());
drop(rx);
store.notify("t1", TaskState::Completed("payload".into()));
let drained = store.drain_notifications();
assert_eq!(drained.len(), 1);
}
use crate::agent::agent_loop::message::LoopMessage;
// A completion yields one synthesized user message; the next poll is empty.
#[tokio::test]
async fn subagent_completion_injects_followup_message() {
let store = BackgroundStore::new();
store.insert("abc123".into());
store.notify("abc123", TaskState::Completed("the answer is 42".into()));
let hook = followup_from_background_store(store.clone());
let messages = hook().await;
assert_eq!(messages.len(), 1, "exactly one synthesized user message");
let LoopMessage::User(u) = &messages[0] else {
panic!("expected User message, got {:?}", messages[0]);
};
assert!(u.content.starts_with("<system-reminder>\n"));
assert!(u.content.ends_with("</system-reminder>"));
assert!(u.content.contains("[task abc123] completed:"));
assert!(u.content.contains("the answer is 42"));
assert!(hook().await.is_empty());
}
// A failure is tagged as failed and never as completed.
#[tokio::test]
async fn subagent_failure_injects_followup_with_error_marker() {
let store = BackgroundStore::new();
store.insert("xyz789".into());
store.notify(
"xyz789",
TaskState::Failed("connection reset by peer".into()),
);
let hook = followup_from_background_store(store);
let messages = hook().await;
assert_eq!(messages.len(), 1);
let LoopMessage::User(u) = &messages[0] else {
panic!("expected User message");
};
assert!(u.content.contains("[task xyz789] failed:"));
assert!(u.content.contains("connection reset by peer"));
assert!(
!u.content.contains("completed:"),
"failures must not be tagged 'completed': {}",
u.content,
);
}
// Several completions are batched into a single reminder, in order.
#[tokio::test]
async fn followup_batches_multiple_completions_in_one_reminder() {
let store = BackgroundStore::new();
for i in 0..3 {
store.insert(format!("t{i}"));
store.notify(
&format!("t{i}"),
TaskState::Completed(format!("result-{i}")),
);
}
let hook = followup_from_background_store(store);
let messages = hook().await;
assert_eq!(messages.len(), 1, "one reminder, not one per task");
let LoopMessage::User(u) = &messages[0] else {
panic!("expected User");
};
assert!(u.content.contains("[task t0] completed: result-0"));
assert!(u.content.contains("[task t1] completed: result-1"));
assert!(u.content.contains("[task t2] completed: result-2"));
let i0 = u.content.find("t0").unwrap();
let i1 = u.content.find("t1").unwrap();
let i2 = u.content.find("t2").unwrap();
assert!(i0 < i1 && i1 < i2);
}
// A store holding only running tasks produces no follow-up message.
#[tokio::test]
async fn followup_returns_empty_when_no_completions() {
let store = BackgroundStore::new();
store.insert("running".into()); let hook = followup_from_background_store(store);
assert!(hook().await.is_empty());
}
// The hook delivers each notification once.
#[tokio::test]
async fn followup_drains_queue_once() {
let store = BackgroundStore::new();
store.insert("t1".into());
store.notify("t1", TaskState::Completed("once".into()));
let hook = followup_from_background_store(store);
let first = hook().await;
assert_eq!(first.len(), 1);
let second = hook().await;
assert!(second.is_empty(), "second poll must not redeliver");
}
// Inserts and notifies from many threads all end up in the pending queue.
// Exactly `STORE_CAPACITY` tasks are used so that no eviction occurs.
#[test]
fn concurrent_inserts_and_notifies() {
let store = BackgroundStore::new();
let mut handles = Vec::new();
let n = STORE_CAPACITY;
for i in 0..n {
let s = store.clone();
let id = format!("t{i}");
handles.push(std::thread::spawn(move || {
s.insert(id.clone());
s.notify(&id, TaskState::Completed(format!("done-{i}")));
}));
}
for h in handles {
h.join().unwrap();
}
let drained = store.drain_notifications();
assert_eq!(drained.len(), n);
let mut ids: Vec<String> = drained.into_iter().map(|n| n.id).collect();
ids.sort();
let mut expected: Vec<String> = (0..n).map(|i| format!("t{i}")).collect();
expected.sort();
assert_eq!(ids, expected);
}
}