defect_agent/session/background.rs
//! Session-level background task table.
//!
//! ## Problem
//!
//! Tools (primarily `spawn_agent { run_in_background: true }`) want to fire-and-forget
//! a task without blocking the initiating turn. However, the turn main loop's
//! `run_tools_concurrently` holds tool tasks in a function-local `JoinSet`: when the
//! function returns, the `JoinSet` is dropped and its tasks are aborted, so no task can
//! outlive the turn that created it.
//!
//! [`BackgroundTasks`] moves task `JoinHandle`s to the **session level** (same lifetime
//! as `events` / `history`), allowing tasks to outlive their initiating turn. It also
//! uses a **session-level [`CancellationToken`]** (not a turn child token) to mint
//! per-task child tokens, so a task's cancellation lifecycle is independent of the
//! initiating turn.
//!
//! ## Reflow (phase 1: passive)
//!
//! When a task completes, it pushes a [`BackgroundOutcome`] into the `completed` queue.
//! `DefaultSession::run_turn` calls [`drain_completed`](BackgroundTasks::drain_completed)
//! before each turn and brings the pending results into history as **prefix blocks** of
//! the current user prompt, so the LLM sees the results alongside the next user input.
//! Phase 2 (active continuation) is handled by the session input loop, which competes
//! for a new turn when a background task completes.
//!
//! ## Introspection and single-point cancellation (control plane)
//!
//! Tasks **do not disappear immediately after completion**: each task retains a
//! [`TaskEntry`] in the `tasks` table, recording its status (running / completed /
//! failed / canceled) and a **shared handle to the task's history**.
//!
//! The progress "block" granularity is deliberately set to **message blocks submitted to
//! the LLM** ([`crate::llm::Message`]), not streaming deltas. Streaming
//! `AssistantText` / `AssistantThought` chunks carry only a few words each (they map
//! to ACP `AgentMessageChunk`), which does not help answer "what is this subagent
//! doing now". The meaningful granularity is the turn / tool-call boundary: the main
//! loop drains the entire batch of chunks, coalesces it into a single assistant
//! `Message`, and appends it to history, and that is the moment a "block" is sent to
//! the LLM. Therefore, `spawn_agent` shares the sub-turn history `Arc` into this table
//! (the sub-turn appends to it), and `peek` snapshots that history directly, taking
//! the **most recent N message blocks**. This gives a single source of truth (identical
//! to what is fed to the LLM), with no replay or coalescing of streaming deltas needed
//! elsewhere.
//!
//! This allows the main agent to inspect a background subagent's progress with
//! `inspect_background_task`, or cancel a single task early via
//! [`cancel_task`](BackgroundTasks::cancel_task) without affecting other tasks
//! (each task has its own child token). Finished task entries are evicted in FIFO
//! order once they exceed a fixed cap, to prevent unbounded growth in long sessions.

use std::collections::BTreeMap;
use std::future::Future;
use std::sync::{Arc, Mutex};

use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

use crate::llm::{Message, MessageContent, Role};
use crate::session::History;

/// Default number of recent message blocks returned by `inspect_background_task` when
/// `recent_blocks` is not specified.
const DEFAULT_RECENT_BLOCKS: usize = 10;

/// How many **finished** task entries to keep in the `tasks` table. Running entries do
/// not count toward the cap, because they must remain cancelable and peekable. When the
/// cap is exceeded, the oldest finished entries are evicted.
const FINISHED_TASKS_CAP: usize = 64;

/// Configuration for the background task **progress view**.
///
/// The goal is to give the main agent a **bird's-eye** view of what a subagent is
/// currently doing, **not** to flood the main agent's context with the full text of
/// sub-turns. The defaults are therefore conservative: free-form body text (assistant
/// messages, thinking, tool results) is **omitted** by default (`block_text_limit = 0`,
/// reporting only metadata such as "there is an assistant text / thinking block"),
/// while tool-call names, which are naturally short, are kept as-is. Users can increase
/// `block_text_limit` when more detail is needed.
///
/// The source of truth for configuration lives on the agent side (here).
/// `defect-config`'s `ToolsConfig.background` reuses this struct directly (the same
/// cross-crate reuse pattern as `TurnConfig` / `SessionCapabilitiesConfig`: config
/// depends on agent, not the other way around).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BackgroundProgressConfig {
    /// How many recent message blocks `inspect_background_task` returns by default when
    /// called without the `recent_blocks` argument.
    /// `0` is treated as `1` (at least one block is always returned, otherwise peek would
    /// always be empty).
    pub default_recent_blocks: usize,
    /// Maximum number of Unicode scalar values kept for the **body** of a single block,
    /// applied to free-form text such as assistant messages, thinking blocks, and tool
    /// results. Text exceeding this limit is cut at a scalar boundary and followed by an
    /// ellipsis marker. `0` means no body text is kept (only the block's type and
    /// metadata, e.g. the tool name); this is the default, and it minimizes pollution of
    /// the main agent's context.
    pub block_text_limit: usize,
}

impl Default for BackgroundProgressConfig {
    fn default() -> Self {
        Self {
            default_recent_blocks: DEFAULT_RECENT_BLOCKS,
            // By default, only summary/metadata is provided, not the full body: the goal
            // is an overview, not context transfer.
            block_text_limit: 0,
        }
    }
}

impl BackgroundProgressConfig {
    /// Normalize `recent_blocks`: if the caller passes `Some(n)`, use `n` (at least 1);
    /// if `None`, use the config default (at least 1).
    fn resolve_recent(&self, requested: Option<usize>) -> usize {
        requested.unwrap_or(self.default_recent_blocks).max(1)
    }
}

/// The outcome produced after a background task completes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BackgroundOutcome {
    /// The task ID (same as returned by `spawn`), used to annotate the reflow message
    /// and for external diagnostics.
    pub task_id: String,
    /// Task label (primarily the `spawn_agent` profile name), included in the reflow
    /// message so the model or user can identify the source.
    pub label: String,
    /// The result of the background task.
    pub result: BackgroundResult,
}

/// The final result of a background task.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BackgroundResult {
    /// Completed successfully, containing the task's final text output.
    Completed(String),
    /// Failed (including cancellation), with an error description.
    Failed(String),
}

impl BackgroundResult {
    fn is_error(&self) -> bool {
        matches!(self, BackgroundResult::Failed(_))
    }

    fn text(&self) -> &str {
        match self {
            BackgroundResult::Completed(t) | BackgroundResult::Failed(t) => t,
        }
    }
}

/// Lifecycle status of a background task, exposed via the `inspect_background_task`
/// control plane.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskStatus {
    /// Still running.
    Running,
    /// The task completed successfully.
    Completed,
    /// The task failed.
    Failed,
    /// Canceled by [`cancel_task`](BackgroundTasks::cancel_task) /
    /// [`cancel_all`](BackgroundTasks::cancel_all).
    Canceled,
}

impl TaskStatus {
    /// Stable lowercase string name for control-plane tool output.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match self {
            TaskStatus::Running => "running",
            TaskStatus::Completed => "completed",
            TaskStatus::Failed => "failed",
            TaskStatus::Canceled => "canceled",
        }
    }

    fn is_terminal(&self) -> bool {
        !matches!(self, TaskStatus::Running)
    }
}

/// The role/category of a progress block. Directly corresponds to the content of a
/// [`crate::llm::Message`] submitted to the LLM.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockKind {
    /// User/task input message (including reflow of background results, tool result
    /// re-injection, etc.).
    User,
    /// Text produced by the assistant.
    AssistantText,
    /// The assistant's chain of thought.
    Thought,
    /// A tool call initiated by the assistant.
    ToolUse,
    /// Tool result (fed back to the model).
    ToolResult,
    /// Other (multimodal / provider activity, etc.), normalized for display.
    Other,
}

impl BlockKind {
    /// Stable lowercase string name for control-plane tool output.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match self {
            BlockKind::User => "user",
            BlockKind::AssistantText => "assistant",
            BlockKind::Thought => "thought",
            BlockKind::ToolUse => "tool_use",
            BlockKind::ToolResult => "tool_result",
            BlockKind::Other => "other",
        }
    }

    /// Whether this kind of block's text is a "free-form body" and therefore subject to
    /// the limit in [`BackgroundProgressConfig::block_text_limit`]. Tool call names are
    /// inherently one-line summaries, not body text, and are not subject to the limit.
    fn is_free_form_body(&self) -> bool {
        matches!(
            self,
            BlockKind::User | BlockKind::AssistantText | BlockKind::Thought | BlockKind::ToolResult
        )
    }
}

/// A single progress block returned by `peek`: kind + text summary (truncated per
/// configuration).
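///
/// A minimal sketch of how a consumer might render one block for tool output. The
/// helper `render_block`, its `[kind] text` layout, and the tool name `read_file` are
/// assumptions made for this illustration, not part of this module:
///
/// ```rust
/// // Hypothetical renderer: `kind` is the stable name from `BlockKind::as_str`,
/// // `text` is the (possibly empty) truncated body.
/// fn render_block(kind: &str, text: &str) -> String {
///     if text.is_empty() {
///         // With `block_text_limit = 0` only the kind survives.
///         format!("[{kind}]")
///     } else {
///         format!("[{kind}] {text}")
///     }
/// }
/// ```
///
/// Under this sketch a tool call renders as `[tool_use] read_file`, while an assistant
/// message whose body was omitted renders as `[assistant]`.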
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProgressBlock {
    pub kind: BlockKind,
    pub text: String,
}

/// A snapshot of a task in the control plane (returned by `list` / `peek`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TaskSnapshot {
    pub task_id: String,
    pub label: String,
    pub status: TaskStatus,
    /// Total number of progress blocks currently in this task's history (only populated
    /// by `peek`; `list` returns `0` because it does not read history).
    pub block_count: usize,
    /// Recent blocks (empty for `list`; contains the latest N blocks for `peek`).
    pub recent: Vec<ProgressBlock>,
}

/// Truncate free-form text to a character limit (splits on Unicode scalar boundaries,
/// never breaking a character). `limit == 0` returns an empty string (metadata only).
/// Appends ` …(+N more chars)` to indicate truncation.
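///
/// Why scalar values rather than bytes: slicing a `&str` at an arbitrary byte offset
/// panics when the offset falls inside a multi-byte character. The sketch below shows
/// an alternative, allocation-free way to find the same cut point with `char_indices`;
/// the helper `scalar_prefix` is hypothetical and is not what this function uses:
///
/// ```rust
/// // Hypothetical helper: borrow the first `limit` Unicode scalar values of `text`.
/// fn scalar_prefix(text: &str, limit: usize) -> &str {
///     match text.char_indices().nth(limit) {
///         // Byte offset where the scalar at index `limit` starts: a valid boundary.
///         Some((byte_idx, _)) => &text[..byte_idx],
///         // `text` has at most `limit` scalars: keep all of it.
///         None => text,
///     }
/// }
/// ```
///
/// For `"héllo wörld"` and a limit of 5 this sketch yields `"héllo"`, which is six
/// bytes long because `é` occupies two bytes in UTF-8.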
fn truncate_body(text: &str, limit: usize) -> String {
    if limit == 0 {
        return String::new();
    }
    let total = text.chars().count();
    if total <= limit {
        return text.to_string();
    }
    let kept: String = text.chars().take(limit).collect();
    format!("{kept} …(+{} more chars)", total - limit)
}

/// Extract a human-readable text snippet from a
/// [`ToolResultBody`](crate::llm::ToolResultBody) (for a bird's-eye summary only).
fn tool_result_text(body: &crate::llm::ToolResultBody) -> String {
    use crate::llm::{ToolResultBody, ToolResultContent};
    match body {
        ToolResultBody::Text { text } => text.clone(),
        ToolResultBody::Json { value } => value.to_string(),
        ToolResultBody::Content { blocks } => blocks
            .iter()
            .map(|b| match b {
                ToolResultContent::Text { text } => text.clone(),
                ToolResultContent::Image { .. } => "<image>".to_string(),
            })
            .collect::<Vec<_>>()
            .join("\n"),
    }
}

/// Maps a [`MessageContent`] to a progress block, truncating the body to `limit` (free
/// text only).
fn block_of_content(content: &MessageContent, role: Role, limit: usize) -> ProgressBlock {
    let (kind, raw): (BlockKind, String) = match content {
        MessageContent::Text { text } => {
            let kind = if role == Role::Assistant {
                BlockKind::AssistantText
            } else {
                BlockKind::User
            };
            (kind, text.clone())
        }
        MessageContent::Thinking { text, .. } => (BlockKind::Thought, text.clone()),
        // The tool name is a one-line summary and should not be truncated as body text;
        // parameters are excluded from the bird's-eye view (see the langfuse trace for
        // details).
        MessageContent::ToolUse { name, .. } => (BlockKind::ToolUse, name.clone()),
        MessageContent::ToolResult { output, .. } => {
            (BlockKind::ToolResult, tool_result_text(output))
        }
        MessageContent::Image { .. } => (BlockKind::Other, "<image>".to_string()),
        MessageContent::ProviderActivity { kind, .. } => {
            (BlockKind::Other, format!("provider activity: {kind:?}"))
        }
    };
    let text = if kind.is_free_form_body() {
        truncate_body(&raw, limit)
    } else {
        raw
    };
    ProgressBlock { kind, text }
}

/// Extracts the **most recent `n`** message blocks from a history snapshot (flattens each
/// [`Message`]'s content fragments into individual blocks while preserving chronological
/// order), truncating each block's body to `limit`. Returns `(total_blocks,
/// last_n_blocks)`.
fn recent_blocks_of(messages: &[Message], n: usize, limit: usize) -> (usize, Vec<ProgressBlock>) {
    let mut all: Vec<ProgressBlock> = Vec::new();
    for m in messages {
        for c in m.content.iter() {
            all.push(block_of_content(c, m.role, limit));
        }
    }
    let total = all.len();
    let skip = total.saturating_sub(n);
    (total, all.into_iter().skip(skip).collect())
}

/// An entry in the `tasks` table.
struct TaskEntry {
    label: String,
    status: TaskStatus,
    /// Cancellation token specific to this task (child of the session-level token).
    /// `cancel_task` calls `cancel` on it individually.
    cancel: CancellationToken,
    /// Shared handle to this task's history. `peek` uses it to snapshot the message
    /// blocks submitted to the LLM.
    /// `Some`: the tool that spawned the task (`spawn_agent`) shared the child turn's
    /// history via `Arc`;
    /// `None`: the task does not expose history (no progress to query; `peek` only
    /// returns status).
    history: Option<Arc<dyn History>>,
    /// The `JoinHandle` that keeps the task alive past the turn that spawned it. Set to
    /// `None` after completion.
    handle: Option<JoinHandle<()>>,
    /// Sequence number for termination order (only present on finished entries), used for
    /// FIFO eviction.
    finished_seq: Option<u64>,
}

struct BackgroundInner {
    /// Monotonically increasing task ID counter.
    next_id: u64,
    /// Monotonically increasing "finish order" counter for FIFO eviction of finished
    /// entries.
    next_finished_seq: u64,
    /// All tasks (running + recently finished). When finished entries exceed
    /// [`FINISHED_TASKS_CAP`], the oldest are evicted.
    tasks: BTreeMap<String, TaskEntry>,
    /// Completed results pending drain (FIFO). Emptied by `drain_completed`. Orthogonal
    /// to the `tasks` table: this queue drives the passive reflow, while `tasks` supports
    /// control-plane queries and cancellation.
    completed: Vec<BackgroundOutcome>,
}

impl BackgroundInner {
    /// Marks a task as finished, records its finish sequence number, and evicts the
    /// oldest finished entries until at most [`FINISHED_TASKS_CAP`] remain.
    fn finish(&mut self, id: &str, status: TaskStatus) {
        let seq = self.next_finished_seq;
        self.next_finished_seq += 1;
        if let Some(entry) = self.tasks.get_mut(id) {
            entry.status = status;
            entry.handle = None;
            entry.finished_seq = Some(seq);
        }
        self.prune_finished();
    }

    /// When finished entries exceed the cap, evict the oldest ones by finish sequence.
    /// Running entries are never evicted.
    fn prune_finished(&mut self) {
        let mut finished: Vec<(u64, String)> = self
            .tasks
            .iter()
            .filter_map(|(id, e)| e.finished_seq.map(|seq| (seq, id.clone())))
            .collect();
        if finished.len() <= FINISHED_TASKS_CAP {
            return;
        }
        finished.sort_by_key(|(seq, _)| *seq);
        let drop_count = finished.len() - FINISHED_TASKS_CAP;
        for (_, id) in finished.into_iter().take(drop_count) {
            self.tasks.remove(&id);
        }
    }
}

/// Session-level background task table. `Clone` is cheap (inner `Arc`): `DefaultSession`
/// holds one copy, which is cloned to tools via `ToolContext`.
#[derive(Clone)]
pub struct BackgroundTasks {
    /// Session-level cancellation token. Each task derives its token via `child_token()`,
    /// so `cancel_all` cancels all tasks at once, while canceling any single task does
    /// not affect the others.
    cancel: CancellationToken,
    /// Notifies when a task completes. Each time a task result is enqueued, `notify_one`
    /// is called; the session driver waits on this and, when woken, starts an autonomous
    /// turn to continue processing (phase 2). The passive reflow path (phase 1) does not
    /// rely on it.
    completed_notify: Arc<Notify>,
    /// Progress view configuration (default block count / body limit). `peek` renders
    /// based on this.
    progress_config: BackgroundProgressConfig,
    inner: Arc<Mutex<BackgroundInner>>,
}

impl BackgroundTasks {
    /// Constructs a new instance with a session-level cancellation token and a
    /// progress-view configuration. `session_cancel` is owned by the session and is
    /// canceled when the session terminates.
    #[must_use]
    pub fn new(
        session_cancel: CancellationToken,
        progress_config: BackgroundProgressConfig,
    ) -> Self {
        Self {
            cancel: session_cancel,
            completed_notify: Arc::new(Notify::new()),
            progress_config,
            inner: Arc::new(Mutex::new(BackgroundInner {
                next_id: 0,
                next_finished_seq: 0,
                tasks: BTreeMap::new(),
                completed: Vec::new(),
            })),
        }
    }

    /// Waits for a "task completion enqueued" event. The session driver uses this to
    /// drive active continuation (phase 2).
    ///
    /// Backed by `Notify`: each completion calls `notify_one`, which stores a permit when
    /// no waiter is registered, so a completion that fires before this call makes the
    /// next `notified().await` return immediately instead of being lost. `Notify` keeps
    /// at most one permit, so several completions may collapse into a single wakeup;
    /// after waking, the driver should check [`has_completed`](Self::has_completed) and
    /// drain the whole queue.
    pub async fn wait_for_completion(&self) {
        self.completed_notify.notified().await;
    }

    /// Whether there are completed results waiting to be collected. The driver checks
    /// this after waking up to decide whether to start a turn.
    #[must_use]
    pub fn has_completed(&self) -> bool {
        !self
            .inner
            .lock()
            .expect("BackgroundTasks mutex poisoned")
            .completed
            .is_empty()
    }

    /// Spawns a background task and returns its ID **immediately**.
    ///
    /// `make_fut` receives two handles: a [`CancellationToken`] specific to this task (a
    /// child of the session-level token, which the task body should use to observe
    /// cancellation) and a [`TaskHandle`] (the task body shares its history `Arc` into
    /// the table via [`TaskHandle::attach_history`], allowing the control plane to peek
    /// at the **message blocks submitted to the LLM**). On completion, the result is
    /// placed in the `completed` queue and the corresponding entry in the `tasks` table
    /// is marked as terminal (the entry is retained for later inspection).
    ///
    /// The closure form that "receives token/handle and then creates the future" is used
    /// because both must be minted inside `spawn`, and the future needs to capture them;
    /// accepting a future directly would not allow obtaining a token whose lifetime is
    /// independent of the turn.
    ///
    /// `make_fut` itself runs synchronously while the task table lock is held, so the
    /// closure must not call [`TaskHandle::attach_history`] (or any other method that
    /// locks the table) directly; do that inside the returned future, i.e. the task body.
    pub fn spawn<F, Fut>(&self, label: String, make_fut: F) -> String
    where
        F: FnOnce(CancellationToken, TaskHandle) -> Fut,
        Fut: Future<Output = BackgroundResult> + Send + 'static,
    {
        let mut inner = self.inner.lock().expect("BackgroundTasks mutex poisoned");
        let id = format!("bg-{}", inner.next_id);
        inner.next_id += 1;

        let task_cancel = self.cancel.child_token();
        let handle = TaskHandle {
            inner: self.inner.clone(),
            task_id: id.clone(),
        };
        // Keep a clone of the task token so the completion wrapper below can tell an
        // explicit cancellation (`Canceled`) apart from an ordinary failure (`Failed`).
        let cancel_for_task = task_cancel.clone();
        let fut = make_fut(task_cancel.clone(), handle);

        let inner_arc = self.inner.clone();
        let notify = self.completed_notify.clone();
        let id_for_task = id.clone();
        let label_for_task = label.clone();
        let join = tokio::spawn(async move {
            let result = fut.await;
            // Distinguish between a task error and an explicit cancellation: the latter
            // records the status as `Canceled`, the former as `Failed`.
            let status = if cancel_for_task.is_cancelled() {
                TaskStatus::Canceled
            } else if result.is_error() {
                TaskStatus::Failed
            } else {
                TaskStatus::Completed
            };
            if let Ok(mut inner) = inner_arc.lock() {
                inner.finish(&id_for_task, status);
                inner.completed.push(BackgroundOutcome {
                    task_id: id_for_task,
                    label: label_for_task,
                    result,
                });
            }
            // Wake the session driver waiting on `wait_for_completion` (active
            // continuation). `notify_one` is used instead of `notify_waiters` because it
            // **stores a permit** when no waiter exists, so the next `notified().await`
            // returns immediately; this avoids a lost wakeup when a task completes before
            // the driver parks. There is exactly one consumer (the driver), so
            // `notify_one` semantics are correct. The notification is sent after the
            // lock guard has been dropped.
            notify.notify_one();
        });

        inner.tasks.insert(
            id.clone(),
            TaskEntry {
                label,
                status: TaskStatus::Running,
                cancel: task_cancel,
                history: None,
                handle: Some(join),
                finished_seq: None,
            },
        );
        id
    }

    /// Drain all completed results (clears the queue). Called by `run_turn` before
    /// starting a turn to passively collect results.
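    ///
    /// A minimal sketch of the passive path on the caller's side, assuming the drained
    /// outcomes have already been formatted into strings. The helper
    /// `build_prompt_blocks` is hypothetical; the real `run_turn` is not shown in this
    /// file:
    ///
    /// ```rust
    /// // Hypothetical helper: place formatted background results in front of the
    /// // user's prompt so the model reads them together with the new input.
    /// fn build_prompt_blocks(drained: Vec<String>, user_prompt: &str) -> Vec<String> {
    ///     let mut blocks = drained; // results first, in completion order
    ///     blocks.push(user_prompt.to_string()); // the user's own text goes last
    ///     blocks
    /// }
    /// ```
    ///
    /// When nothing has completed, `drained` is empty and the prompt is passed through
    /// as the only block.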
    pub fn drain_completed(&self) -> Vec<BackgroundOutcome> {
        let mut inner = self.inner.lock().expect("BackgroundTasks mutex poisoned");
        std::mem::take(&mut inner.completed)
    }

    /// Number of currently running tasks. Used for diagnostics / control plane.
    #[must_use]
    pub fn running_count(&self) -> usize {
        self.inner
            .lock()
            .expect("BackgroundTasks mutex poisoned")
            .tasks
            .values()
            .filter(|e| e.status == TaskStatus::Running)
            .count()
    }

    /// Returns a snapshot of all tasks (running + recently finished), **without reading
    /// history** (`recent` is empty, `block_count` is 0). Sorted by task ID in ascending
    /// order. Used by `inspect_background_task` when called without arguments.
    #[must_use]
    pub fn list(&self) -> Vec<TaskSnapshot> {
        let inner = self.inner.lock().expect("BackgroundTasks mutex poisoned");
        inner
            .tasks
            .iter()
            .map(|(id, e)| TaskSnapshot {
                task_id: id.clone(),
                label: e.label.clone(),
                status: e.status,
                block_count: 0,
                recent: Vec::new(),
            })
            .collect()
    }

    /// Take a snapshot of a single task, including the most recent `recent_blocks`
    /// message blocks submitted to the LLM (`None` uses the config default). Returns
    /// `None` if the task does not exist (never spawned or already evicted); blocks are
    /// empty if the task does not expose history.
    ///
    /// Implementation: clone the task's history `Arc` while holding the table lock, then
    /// release the table lock before snapshotting (snapshotting uses the history's own
    /// lock). This avoids performing a potentially expensive deep copy of history while
    /// holding the table lock, which would block spawn/finish.
    #[must_use]
    pub fn peek(&self, id: &str, recent_blocks: Option<usize>) -> Option<TaskSnapshot> {
        let n = self.progress_config.resolve_recent(recent_blocks);
        let limit = self.progress_config.block_text_limit;
        let (label, status, history) = {
            let inner = self.inner.lock().expect("BackgroundTasks mutex poisoned");
            let entry = inner.tasks.get(id)?;
            (entry.label.clone(), entry.status, entry.history.clone())
        };
        let (block_count, recent) = match history {
            Some(h) => recent_blocks_of(&h.snapshot(), n, limit),
            None => (0, Vec::new()),
        };
        Some(TaskSnapshot {
            task_id: id.to_string(),
            label,
            status,
            block_count,
            recent,
        })
    }

    /// Cancel a single task early: cancels only its dedicated child token, without
    /// affecting other tasks.
    ///
    /// Returns `Some(true)` if a running task was found and cancellation was requested;
    /// `Some(false)` if the task exists but is already in a terminal state (no-op);
    /// `None` if no such id exists. Cancellation is **cooperative**: the task body must
    /// observe its cancel token and exit, and the status transitions to `Canceled` only
    /// when the task actually finishes.
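    ///
    /// A minimal sketch of how a caller might report the three outcomes. The helper
    /// `describe_cancel` and its message wording are hypothetical:
    ///
    /// ```rust
    /// // Hypothetical helper mapping the `Option<bool>` result to a message.
    /// fn describe_cancel(id: &str, outcome: Option<bool>) -> String {
    ///     match outcome {
    ///         Some(true) => format!("cancellation requested for {id}"),
    ///         Some(false) => format!("{id} has already finished"),
    ///         None => format!("no such task: {id}"),
    ///     }
    /// }
    /// ```
    ///
    /// Because cancellation is cooperative, the `Some(true)` message deliberately says
    /// "requested" rather than "canceled".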
    pub fn cancel_task(&self, id: &str) -> Option<bool> {
        let inner = self.inner.lock().expect("BackgroundTasks mutex poisoned");
        let entry = inner.tasks.get(id)?;
        if entry.status.is_terminal() {
            return Some(false);
        }
        entry.cancel.cancel();
        Some(true)
    }

    /// Cancels all background tasks (called when the session ends). Idempotent.
    pub fn cancel_all(&self) {
        self.cancel.cancel();
    }
}

/// A handle given to a background task, allowing it to share its history `Arc` into the
/// task table so the control plane can peek at the message blocks it submits to the LLM.
/// `Clone` is cheap (inner `Arc` + small string).
#[derive(Clone)]
pub struct TaskHandle {
    inner: Arc<Mutex<BackgroundInner>>,
    task_id: String,
}

impl TaskHandle {
    /// Shares this task's history handle into the task table. Called by the `spawn_agent`
    /// background path before constructing a child turn, passing the child turn's history
    /// `Arc`; afterwards `peek` can snapshot the message blocks the child agent has
    /// committed. The task entry may have already been evicted (in extreme cases the task
    /// finishes instantly and is dropped by FIFO eviction), in which case the operation
    /// is silently ignored.
    pub fn attach_history(&self, history: Arc<dyn History>) {
        if let Ok(mut inner) = self.inner.lock()
            && let Some(entry) = inner.tasks.get_mut(&self.task_id)
        {
            entry.history = Some(history);
        }
    }
}

/// Formats a background task outcome into a text block that is fed back into the
/// conversation.
///
/// The wording is structured as a "deferred tool result return", clearly marking the
/// source (task id + label) and success/failure, to prevent the model from
/// misinterpreting it as user speech.
/// Phase 2 will replace this with the proper ingest path using `IngestSource::Background`,
/// at which point this function will be superseded by the corresponding payload.
#[must_use]
pub fn format_background_outcome(outcome: &BackgroundOutcome) -> String {
    let status = if outcome.result.is_error() {
        "failed"
    } else {
        "completed"
    };
    format!(
        "⟨background task {} ({}) {}⟩\n{}",
        outcome.task_id,
        outcome.label,
        status,
        outcome.result.text()
    )
}

#[cfg(test)]
mod tests;