defect_agent/session/background.rs
1//! Session-level background task table.
2//!
3//! ## Problem
4//!
5//! Tools (primarily `spawn_agent { run_in_background: true }`) want to fire-and-forget
6//! a task without blocking the initiating turn. However, the turn main loop's
7//! `run_tools_concurrently` holds tool tasks in a function-local `JoinSet` — when the
8//! function returns, the `JoinSet` is dropped and tasks are aborted, so no task can
9//! outlive the turn that created it.
10//!
11//! [`BackgroundTasks`] moves task `JoinHandle`s to the **session level** (same lifetime
12//! as `events` / `history`), allowing tasks to outlive their initiating turn. It also
13//! uses a **session-level [`CancellationToken`]** (not a turn child token) to mint
14//! per-task child tokens, making cancellation lifecycle independent of the initiating
15//! turn.
16//!
17//! ## Reflow (phase 1: passive)
18//!
19//! When a task completes, it pushes a [`BackgroundOutcome`] into the `completed` queue.
20//! `DefaultSession::run_turn` calls [`drain_completed`](BackgroundTasks::drain_completed)
21//! before each turn, bringing pending results into history as **prefix blocks** of the
22//! current user prompt — the LLM sees the results alongside the next user input.
23//! Phase 2 (active continuation) is handled by the session input loop competing for a
24//! new turn when a background task completes.
25//!
26//! ## Introspection and single-point cancellation (control plane)
27//!
28//! Tasks **do not disappear immediately after completion**: each task retains a
29//! [`TaskEntry`] in the `tasks` table, recording status (running / completed / failed /
30//! cancelled) and a **shared handle to the task's history**.
31//!
32//! The progress "block" granularity is deliberately set to **message blocks submitted to
33//! the LLM** ([`crate::llm::Message`]) — not streaming deltas. Streaming
34//! `AssistantText` / `AssistantThought` chunks produce several words per chunk (mapping
35//! to ACP `AgentMessageChunk`), which are unhelpful for understanding "what is this
36//! subagent doing now". The meaningful granularity is at the turn / tool-call boundary.
37//! The main loop drains the entire batch, coalesces them into a single assistant
38//! `Message`, and appends it to history — that is the moment a "block" is sent to the
39//! AI. Therefore, `spawn_agent` shares the sub-turn history `Arc` into this table
40//! (the sub-turn appends to it), and `peek` snapshots that history directly, taking
41//! the **most recent N message blocks** — a single source of truth (identical to what
42//! is fed to the LLM), no replay/coalesce of streaming deltas needed elsewhere.
43//!
44//! This allows the main agent to inspect a background sub-agent's progress with
45//! `inspect_background_task`, or cancel a single task early via
46//! [`cancel_task`](BackgroundTasks::cancel_task) without affecting other tasks
47//! (each task has its own child token). Completed task entries are evicted by FIFO
48//! upper bound to prevent unbounded growth in long sessions.
49
50use std::collections::BTreeMap;
51use std::future::Future;
52use std::sync::{Arc, Mutex};
53
54use tokio::sync::Notify;
55use tokio::task::JoinHandle;
56use tokio_util::sync::CancellationToken;
57
58use crate::llm::{Message, MessageContent, Role};
59use crate::session::History;
60
/// Fallback for [`BackgroundProgressConfig::default_recent_blocks`]: how many recent
/// message blocks `inspect_background_task` reports when the caller does not pass
/// `recent_blocks`.
const DEFAULT_RECENT_BLOCKS: usize = 10;

/// Fallback for [`BackgroundProgressConfig::finished_tasks_cap`]: how many finished
/// task entries stay in the task table before the oldest ones are evicted.
const DEFAULT_FINISHED_TASKS_CAP: usize = 64;
67
/// Tuning knobs for the background-task **progress view**.
///
/// The progress view exists to give the main agent an overview of what a sub-agent is
/// doing right now; it is **not** meant to copy whole sub-turns into the main agent's
/// context. The defaults are therefore conservative: free-form text (assistant output,
/// thinking, tool results) is dropped entirely (`block_text_limit = 0`), leaving only
/// the block kind and short metadata such as tool names. Raise `block_text_limit` when
/// more detail is wanted.
///
/// This struct is the source of truth on the agent side. `defect-config`'s
/// `ToolsConfig.background` reuses it directly (the same cross-crate pattern as
/// `TurnConfig` / `SessionCapabilitiesConfig`: config depends on agent, never the
/// reverse).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BackgroundProgressConfig {
    /// Number of recent message blocks `inspect_background_task` returns when it is
    /// called without a `recent_blocks` argument. A value of `0` is treated as `1`, so
    /// a peek always reports at least one block when any exist.
    pub default_recent_blocks: usize,
    /// Upper bound, in Unicode scalar values, on the **body** text of one block. It
    /// applies to free-form text (assistant messages, thinking, tool results); longer
    /// text is cut at the limit and suffixed with a truncation marker. `0` (the
    /// default) keeps no body text at all — only the block kind and metadata such as
    /// the tool name survive — which minimizes pollution of the main agent's context.
    pub block_text_limit: usize,
    /// How many **finished** entries the `tasks` table retains. Running entries never
    /// count against this cap, because they must stay cancelable and peekable. Once the
    /// cap is exceeded the oldest finished entry is evicted, which bounds memory use in
    /// long-lived sessions that spawn many background tasks.
    pub finished_tasks_cap: usize,
}
101
102impl Default for BackgroundProgressConfig {
103 fn default() -> Self {
104 Self {
105 default_recent_blocks: DEFAULT_RECENT_BLOCKS,
106 // By default, only summary/metadata is provided, not the full body — the goal
107 // is an overview, not context transfer.
108 block_text_limit: 0,
109 finished_tasks_cap: DEFAULT_FINISHED_TASKS_CAP,
110 }
111 }
112}
113
114impl BackgroundProgressConfig {
115 /// Normalize `recent_blocks`: if the caller passes `Some(n)`, use `n` (at least 1);
116 /// if `None`, use the config default (at least 1).
117 fn resolve_recent(&self, requested: Option<usize>) -> usize {
118 requested.unwrap_or(self.default_recent_blocks).max(1)
119 }
120}
121
122/// The outcome produced after a background task completes.
123#[derive(Debug, Clone, PartialEq, Eq)]
124pub struct BackgroundOutcome {
125 /// The task ID (same as returned by `spawn`), used for backflow message annotation
126 /// and external diagnostics.
127 pub task_id: String,
128 /// Task label (primarily from the `spawn_agent` profile name), included in the return
129 /// message so the model or user can identify the source.
130 pub label: String,
131 /// The result of the background task.
132 pub result: BackgroundResult,
133}
134
/// Terminal result of a background task.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BackgroundResult {
    /// The task succeeded; the payload is its final text output.
    Completed(String),
    /// The task failed (cancellation is reported this way too); the payload describes
    /// the error.
    Failed(String),
}

impl BackgroundResult {
    /// `true` for [`BackgroundResult::Failed`], `false` for
    /// [`BackgroundResult::Completed`].
    fn is_error(&self) -> bool {
        match self {
            BackgroundResult::Failed(_) => true,
            BackgroundResult::Completed(_) => false,
        }
    }

    /// Borrows the payload text, whichever variant carries it.
    fn text(&self) -> &str {
        match self {
            BackgroundResult::Completed(output) => output,
            BackgroundResult::Failed(reason) => reason,
        }
    }
}
155
/// Where a background task is in its lifecycle, as reported by the
/// `inspect_background_task` control plane.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskStatus {
    /// The task has not finished yet.
    Running,
    /// The task finished successfully.
    Completed,
    /// The task finished with an error.
    Failed,
    /// The task ended after being canceled through
    /// [`cancel_task`](BackgroundTasks::cancel_task) or
    /// [`cancel_all`](BackgroundTasks::cancel_all).
    Canceled,
}

impl TaskStatus {
    /// Stable lowercase name of the status, intended for control-plane tool output.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Running => "running",
            Self::Completed => "completed",
            Self::Failed => "failed",
            Self::Canceled => "canceled",
        }
    }

    /// Whether the task is done, i.e. in any state other than `Running`.
    fn is_terminal(&self) -> bool {
        match *self {
            Self::Running => false,
            Self::Completed | Self::Failed | Self::Canceled => true,
        }
    }
}
187
/// Category of a progress block. Each kind corresponds to one kind of content inside a
/// [`crate::llm::Message`] submitted to the LLM.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockKind {
    /// Input on the user/task side (this also covers background-result backflow, tool
    /// result re-injection, and similar).
    User,
    /// Text written by the assistant.
    AssistantText,
    /// The assistant's chain of thought.
    Thought,
    /// A tool call issued by the assistant.
    ToolUse,
    /// The result of a tool call, as fed back to the model.
    ToolResult,
    /// Anything else (multimodal content, provider activity, ...), normalized for
    /// display.
    Other,
}

impl BlockKind {
    /// Stable lowercase name of the kind, intended for control-plane tool output.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::User => "user",
            Self::AssistantText => "assistant",
            Self::Thought => "thought",
            Self::ToolUse => "tool_use",
            Self::ToolResult => "tool_result",
            Self::Other => "other",
        }
    }

    /// Whether blocks of this kind carry a free-form body and are therefore subject to
    /// [`BackgroundProgressConfig::block_text_limit`]. Tool-call names are one-line
    /// summaries rather than body text, so `ToolUse` (like `Other`) is exempt.
    fn is_free_form_body(&self) -> bool {
        match *self {
            Self::User | Self::AssistantText | Self::Thought | Self::ToolResult => true,
            Self::ToolUse | Self::Other => false,
        }
    }
}
231
232/// A single progress block returned by `peek`: kind + text summary (truncated per
233/// configuration).
234#[derive(Debug, Clone, PartialEq, Eq)]
235pub struct ProgressBlock {
236 pub kind: BlockKind,
237 pub text: String,
238}
239
240/// A snapshot of a task in the control plane (returned by `list` / `peek`).
241#[derive(Debug, Clone, PartialEq, Eq)]
242pub struct TaskSnapshot {
243 pub task_id: String,
244 pub label: String,
245 pub status: TaskStatus,
246 /// Total number of progress blocks currently in this task's history (only populated
247 /// by `peek`; `list` returns `0` because it does not read history).
248 pub block_count: usize,
249 /// Recent blocks (empty for `list`; contains the latest N blocks for `peek`).
250 pub recent: Vec<ProgressBlock>,
251}
252
/// Caps free-form text at `limit` Unicode scalar values.
///
/// * `limit == 0` yields an empty string (metadata-only mode).
/// * Text with at most `limit` characters is returned unchanged.
/// * Longer text is cut on a character boundary (never inside a character) and suffixed
///   with ` …(+N more chars)`, where `N` is the number of characters dropped.
fn truncate_body(text: &str, limit: usize) -> String {
    if limit == 0 {
        return String::new();
    }
    // The character at index `limit` (if any) is the first one to drop; its byte offset
    // is where the kept prefix ends.
    let Some((cut, _)) = text.char_indices().nth(limit) else {
        return text.to_owned();
    };
    let kept = &text[..cut];
    let dropped = text[cut..].chars().count();
    format!("{kept} …(+{} more chars)", dropped)
}
267
268/// Extract a human-readable text snippet from a
269/// [`ToolResultBody`](crate::llm::ToolResultBody) (for a bird's-eye summary only).
270fn tool_result_text(body: &crate::llm::ToolResultBody) -> String {
271 use crate::llm::{ToolResultBody, ToolResultContent};
272 match body {
273 ToolResultBody::Text { text } => text.clone(),
274 ToolResultBody::Json { value } => value.to_string(),
275 ToolResultBody::Content { blocks } => blocks
276 .iter()
277 .map(|b| match b {
278 ToolResultContent::Text { text } => text.clone(),
279 ToolResultContent::Image { .. } => "<image>".to_string(),
280 })
281 .collect::<Vec<_>>()
282 .join("\n"),
283 }
284}
285
286/// Maps a [`MessageContent`] to a progress block, truncating the body to `limit` (free
287/// text only).
288fn block_of_content(content: &MessageContent, role: Role, limit: usize) -> ProgressBlock {
289 let (kind, raw): (BlockKind, String) = match content {
290 MessageContent::Text { text } => {
291 let kind = if role == Role::Assistant {
292 BlockKind::AssistantText
293 } else {
294 BlockKind::User
295 };
296 (kind, text.clone())
297 }
298 MessageContent::Thinking { text, .. } => (BlockKind::Thought, text.clone()),
299 // The tool name is a one-line summary and should not be truncated as body text;
300 // parameters are excluded from the bird's-eye view (see the langfuse trace for
301 // details).
302 MessageContent::ToolUse { name, .. } => (BlockKind::ToolUse, name.clone()),
303 MessageContent::ToolResult { output, .. } => {
304 (BlockKind::ToolResult, tool_result_text(output))
305 }
306 MessageContent::Image { .. } => (BlockKind::Other, "<image>".to_string()),
307 MessageContent::ProviderActivity { kind, .. } => {
308 (BlockKind::Other, format!("provider activity: {kind:?}"))
309 }
310 };
311 let text = if kind.is_free_form_body() {
312 truncate_body(&raw, limit)
313 } else {
314 raw
315 };
316 ProgressBlock { kind, text }
317}
318
319/// Extracts the **most recent `n`** message blocks from a history snapshot (flattens each
320/// [`Message`]'s content fragments into individual blocks while preserving chronological
321/// order), truncating each block's body to `limit`. Returns `(total_blocks,
322/// last_n_blocks)`.
323fn recent_blocks_of(messages: &[Message], n: usize, limit: usize) -> (usize, Vec<ProgressBlock>) {
324 let mut all: Vec<ProgressBlock> = Vec::new();
325 for m in messages {
326 for c in m.content.iter() {
327 all.push(block_of_content(c, m.role, limit));
328 }
329 }
330 let total = all.len();
331 let skip = total.saturating_sub(n);
332 (total, all.into_iter().skip(skip).collect())
333}
334
335/// An entry in the `tasks` table.
336struct TaskEntry {
337 label: String,
338 status: TaskStatus,
339 /// Cancellation token specific to this task (child of the session-level token).
340 /// `cancel_task` calls `cancel` on it individually.
341 cancel: CancellationToken,
342 /// Shared handle to this task's history. `peek` uses it to snapshot the message
343 /// blocks submitted to the LLM.
344 /// `Some`: the tool that spawned the task (`spawn_agent`) shared the child turn's
345 /// history via `Arc`;
346 /// `None`: the task does not expose history (no progress to query; `peek` only
347 /// returns status).
348 history: Option<Arc<dyn History>>,
349 /// The `JoinHandle` that keeps the task alive past the turn that spawned it. Set to
350 /// `None` after completion.
351 handle: Option<JoinHandle<()>>,
352 /// Sequence number for termination order (only present on finished entries), used for
353 /// FIFO eviction.
354 finished_seq: Option<u64>,
355}
356
357struct BackgroundInner {
358 /// Monotonically increasing task ID counter.
359 next_id: u64,
360 /// Monotonically increasing "finish order" counter for FIFO eviction of finished
361 /// entries.
362 next_finished_seq: u64,
363 /// All tasks (running + recently finished). When finished entries exceed
364 /// [`finished_tasks_cap`](Self::finished_tasks_cap), the oldest are evicted.
365 tasks: BTreeMap<String, TaskEntry>,
366 /// Completed results pending drain (FIFO). Emptied by `drain_completed`. Orthogonal
367 /// to the `tasks` table: this drives passive draining, while `tasks` supports
368 /// control-plane queries and interrupts.
369 completed: Vec<BackgroundOutcome>,
370 /// Cap on retained finished entries (from
371 /// [`BackgroundProgressConfig::finished_tasks_cap`]).
372 finished_tasks_cap: usize,
373}
374
375impl BackgroundInner {
376 /// Marks a task as finished, records its finish sequence number, and evicts the
377 /// oldest finished entries up to the capacity limit.
378 fn finish(&mut self, id: &str, status: TaskStatus) {
379 let seq = self.next_finished_seq;
380 self.next_finished_seq += 1;
381 if let Some(entry) = self.tasks.get_mut(id) {
382 entry.status = status;
383 entry.handle = None;
384 entry.finished_seq = Some(seq);
385 }
386 self.prune_finished();
387 }
388
389 /// When finished entries exceed the cap, evict the oldest ones by finish sequence.
390 /// Running entries are never evicted.
391 fn prune_finished(&mut self) {
392 let mut finished: Vec<(u64, String)> = self
393 .tasks
394 .iter()
395 .filter_map(|(id, e)| e.finished_seq.map(|seq| (seq, id.clone())))
396 .collect();
397 if finished.len() <= self.finished_tasks_cap {
398 return;
399 }
400 finished.sort_by_key(|(seq, _)| *seq);
401 let drop_count = finished.len() - self.finished_tasks_cap;
402 for (_, id) in finished.into_iter().take(drop_count) {
403 self.tasks.remove(&id);
404 }
405 }
406}
407
408/// Session-level background task table. `Clone` is cheap (inner `Arc`) — `DefaultSession`
409/// holds one copy, cloned to tools via `ToolContext`.
410#[derive(Clone)]
411pub struct BackgroundTasks {
412 /// Session-level cancellation token. Each task derives its token via `child_token()`,
413 /// so `cancel_all` cancels all tasks at once, while cancelling any single task does
414 /// not affect the others.
415 cancel: CancellationToken,
416 /// Notifies when a task completes. Each time a task result is enqueued, `notify_one`
417 /// is called — the session driver waits on this and, when woken, starts an autonomous
418 /// turn to continue processing (phase two). Passive backpressure does not rely on it.
419 completed_notify: Arc<Notify>,
420 /// Progress view configuration (default block count / body limit). `peek` renders
421 /// based on this.
422 progress_config: BackgroundProgressConfig,
423 inner: Arc<Mutex<BackgroundInner>>,
424}
425
426impl BackgroundTasks {
427 /// Constructs a new instance with a session-level cancellation token and a
428 /// progress-view configuration. `session_cancel` is owned by the session and is
429 /// cancelled when the session terminates.
430 #[must_use]
431 pub fn new(
432 session_cancel: CancellationToken,
433 progress_config: BackgroundProgressConfig,
434 ) -> Self {
435 Self {
436 cancel: session_cancel,
437 completed_notify: Arc::new(Notify::new()),
438 progress_config,
439 inner: Arc::new(Mutex::new(BackgroundInner {
440 next_id: 0,
441 next_finished_seq: 0,
442 tasks: BTreeMap::new(),
443 completed: Vec::new(),
444 finished_tasks_cap: progress_config.finished_tasks_cap,
445 })),
446 }
447 }
448
449 /// Wait for a "task completion enqueued" event. The session driver uses this to drive
450 /// proactive continuation.
451 ///
452 /// Uses `Notify`: the driver first calls `notified()` to obtain a future, then checks
453 /// the queue, then awaits — avoiding missed notifications that arrive between checks
454 /// (`Notify`'s permit semantics guarantee that already-fired notifies are not lost).
455 pub async fn wait_for_completion(&self) {
456 self.completed_notify.notified().await;
457 }
458
459 /// Whether there are completed results waiting to be collected. The driver checks
460 /// this after waking up to decide whether to start a turn.
461 #[must_use]
462 pub fn has_completed(&self) -> bool {
463 !self
464 .inner
465 .lock()
466 .expect("BackgroundTasks mutex poisoned")
467 .completed
468 .is_empty()
469 }
470
471 /// Spawns a background task and returns its ID **immediately**.
472 ///
473 /// `make_fut` receives two handles: a [`CancellationToken`] specific to this task (a
474 /// child of the session-level token, which the task body should use to observe
475 /// cancellation) and a [`TaskHandle`] (the task body shares its history `Arc` into
476 /// the table via [`TaskHandle::attach_history`], allowing the control plane to peek
477 /// at the **message chunks submitted to the LLM**). On completion, the result is
478 /// placed in the `completed` queue and the corresponding entry in the `tasks` table
479 /// is marked as terminal (the entry is retained for later inspection).
480 ///
481 /// The closure form that "receives token/handle and then creates the future" is used
482 /// because both must be minted inside `spawn`, and the future needs to capture them —
483 /// accepting a future directly would not allow obtaining a token whose lifetime is
484 /// independent of the turn.
485 pub fn spawn<F, Fut>(&self, label: String, make_fut: F) -> String
486 where
487 F: FnOnce(CancellationToken, TaskHandle) -> Fut,
488 Fut: Future<Output = BackgroundResult> + Send + 'static,
489 {
490 let mut inner = self.inner.lock().expect("BackgroundTasks mutex poisoned");
491 let id = format!("bg-{}", inner.next_id);
492 inner.next_id += 1;
493
494 let task_cancel = self.cancel.child_token();
495 let handle = TaskHandle {
496 inner: self.inner.clone(),
497 task_id: id.clone(),
498 };
499 // The task body can detect whether it was cancelled, so that completion
500 // distinguishes between `Failed` and `Canceled` states.
501 let cancel_for_task = task_cancel.clone();
502 let fut = make_fut(task_cancel.clone(), handle);
503
504 let inner_arc = self.inner.clone();
505 let notify = self.completed_notify.clone();
506 let id_for_task = id.clone();
507 let label_for_task = label.clone();
508 let join = tokio::spawn(async move {
509 let result = fut.await;
510 // Distinguish between a task error and an explicit cancellation: the latter
511 // records the status as `Canceled`, the former as `Failed`.
512 let status = if cancel_for_task.is_cancelled() {
513 TaskStatus::Canceled
514 } else if result.is_error() {
515 TaskStatus::Failed
516 } else {
517 TaskStatus::Completed
518 };
519 if let Ok(mut inner) = inner_arc.lock() {
520 inner.finish(&id_for_task, status);
521 inner.completed.push(BackgroundOutcome {
522 task_id: id_for_task,
523 label: label_for_task,
524 result,
525 });
526 }
527 // Wakes the session driver waiting on `wait_for_completion` (active
528 // continuation).
529 // Uses `notify_one` instead of `notify_waiters`: the former **retains a
530 // permit** when no waiters exist,
531 // so the next `notified().await` returns immediately — avoiding lost wakeups
532 // when a task completes
533 // before the driver parks. Single consumer (exactly one driver), so
534 // `notify_one` semantics are correct.
535 // Notify outside the lock.
536 notify.notify_one();
537 });
538
539 inner.tasks.insert(
540 id.clone(),
541 TaskEntry {
542 label,
543 status: TaskStatus::Running,
544 cancel: task_cancel,
545 history: None,
546 handle: Some(join),
547 finished_seq: None,
548 },
549 );
550 id
551 }
552
553 /// Drain all completed results (clears the queue). Called by `run_turn` before
554 /// starting a turn to passively collect results.
555 pub fn drain_completed(&self) -> Vec<BackgroundOutcome> {
556 let mut inner = self.inner.lock().expect("BackgroundTasks mutex poisoned");
557 std::mem::take(&mut inner.completed)
558 }
559
560 /// Number of currently running tasks. Used for diagnostics / control plane.
561 #[must_use]
562 pub fn running_count(&self) -> usize {
563 self.inner
564 .lock()
565 .expect("BackgroundTasks mutex poisoned")
566 .tasks
567 .values()
568 .filter(|e| e.status == TaskStatus::Running)
569 .count()
570 }
571
572 /// Returns a snapshot of all tasks (running + recently finished), **without reading
573 /// history** (`recent` is empty, `block_count` is 0). Sorted by task ID in ascending
574 /// order. Used by `inspect_background_task` when called without arguments.
575 #[must_use]
576 pub fn list(&self) -> Vec<TaskSnapshot> {
577 let inner = self.inner.lock().expect("BackgroundTasks mutex poisoned");
578 inner
579 .tasks
580 .iter()
581 .map(|(id, e)| TaskSnapshot {
582 task_id: id.clone(),
583 label: e.label.clone(),
584 status: e.status,
585 block_count: 0,
586 recent: Vec::new(),
587 })
588 .collect()
589 }
590
591 /// Take a snapshot of a single task, including the most recent `recent_blocks`
592 /// message blocks submitted to the LLM (`None` uses the config default). Returns
593 /// `None` if the task does not exist (never spawned or already evicted); blocks are
594 /// empty if the task does not expose history.
595 ///
596 /// Implementation: clone the task's history `Arc` while holding the table lock, then
597 /// release the table lock before snapshotting (snapshotting uses the history's own
598 /// lock). This avoids performing a potentially expensive deep copy of history while
599 /// holding the table lock, which would block spawn/finish.
600 #[must_use]
601 pub fn peek(&self, id: &str, recent_blocks: Option<usize>) -> Option<TaskSnapshot> {
602 let n = self.progress_config.resolve_recent(recent_blocks);
603 let limit = self.progress_config.block_text_limit;
604 let (label, status, history) = {
605 let inner = self.inner.lock().expect("BackgroundTasks mutex poisoned");
606 let entry = inner.tasks.get(id)?;
607 (entry.label.clone(), entry.status, entry.history.clone())
608 };
609 let (block_count, recent) = match history {
610 Some(h) => recent_blocks_of(&h.snapshot(), n, limit),
611 None => (0, Vec::new()),
612 };
613 Some(TaskSnapshot {
614 task_id: id.to_string(),
615 label,
616 status,
617 block_count,
618 recent,
619 })
620 }
621
622 /// Cancel a single task early: cancels only its dedicated child token, without
623 /// affecting other tasks.
624 ///
625 /// Returns `Some(true)` if a running task was found and cancellation was requested;
626 /// `Some(false)` if the task exists but is already in a terminal state (no-op);
627 /// `None` if no such id exists. Cancellation is **cooperative** — the task body must
628 /// observe its cancel token and exit; the status transitions to `Canceled` only when
629 /// the task actually finishes.
630 pub fn cancel_task(&self, id: &str) -> Option<bool> {
631 let inner = self.inner.lock().expect("BackgroundTasks mutex poisoned");
632 let entry = inner.tasks.get(id)?;
633 if entry.status.is_terminal() {
634 return Some(false);
635 }
636 entry.cancel.cancel();
637 Some(true)
638 }
639
640 /// Cancels all background tasks (called when the session ends). Idempotent.
641 pub fn cancel_all(&self) {
642 self.cancel.cancel();
643 }
644}
645
646/// A handle given to a background task, allowing it to share its history `Arc` into the
647/// task table so the control plane can peek at the message chunks it submits to the LLM.
648/// `Clone` is cheap (inner `Arc` + small string).
649#[derive(Clone)]
650pub struct TaskHandle {
651 inner: Arc<Mutex<BackgroundInner>>,
652 task_id: String,
653}
654
655impl TaskHandle {
656 /// Shares this task's history handle into the task table. Called by the `spawn_agent`
657 /// background path before constructing a child turn, passing the child turn's history
658 /// `Arc` — afterwards `peek` can snapshot the message chunks the child agent has
659 /// committed. The task entry may have already been evicted (in extreme cases the task
660 /// finishes instantly and is dropped by FIFO), in which case the operation is
661 /// silently ignored.
662 pub fn attach_history(&self, history: Arc<dyn History>) {
663 if let Ok(mut inner) = self.inner.lock()
664 && let Some(entry) = inner.tasks.get_mut(&self.task_id)
665 {
666 entry.history = Some(history);
667 }
668 }
669}
670
671/// Formats a background task outcome into a text block that is fed back into the
672/// conversation.
673///
674/// The wording is structured as a "deferred tool result return", clearly marking the
675/// source (task id + label) and success/failure, to prevent the model from
676/// misinterpreting it as user speech.
677/// Phase 2 will replace this with the proper ingest path using `IngestSource::Background`,
678/// at which point this function will be superseded by the corresponding payload.
679#[must_use]
680pub fn format_background_outcome(outcome: &BackgroundOutcome) -> String {
681 let status = if outcome.result.is_error() {
682 "failed"
683 } else {
684 "completed"
685 };
686 format!(
687 "⟨background task {} ({}) {}⟩\n{}",
688 outcome.task_id,
689 outcome.label,
690 status,
691 outcome.result.text()
692 )
693}
694
695#[cfg(test)]
696mod tests;