// defect_agent/session/background.rs

//! Session-level background task table.
//!
//! ## Problem
//!
//! Tools (primarily `spawn_agent { run_in_background: true }`) want to fire-and-forget
//! a task without blocking the initiating turn. However, the turn main loop's
//! `run_tools_concurrently` holds tool tasks in a function-local `JoinSet`: when the
//! function returns, the `JoinSet` is dropped and its tasks are aborted, so no task can
//! outlive the turn that created it.
//!
//! [`BackgroundTasks`] moves task `JoinHandle`s to the **session level** (same lifetime
//! as `events` / `history`), allowing tasks to outlive their initiating turn. It also
//! uses a **session-level [`CancellationToken`]** (not a turn child token) to mint
//! per-task child tokens, making the cancellation lifecycle independent of the
//! initiating turn.
//!
//! ## Result backflow (phase 1: passive)
//!
//! When a task completes, it pushes a [`BackgroundOutcome`] into the `completed` queue.
//! `DefaultSession::run_turn` calls [`drain_completed`](BackgroundTasks::drain_completed)
//! before each turn, bringing pending results into history as **prefix blocks** of the
//! current user prompt, so the LLM sees the results alongside the next user input.
//! Phase 2 (active continuation) is handled by the session input loop, which competes
//! for a new turn when a background task completes.
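/*!

A minimal, std-only sketch of the passive path described above. The names
`CompletedQueue` and `build_turn_input` are hypothetical stand-ins, not part of this
crate, and outcomes are reduced to plain strings:

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for the `completed` queue.
pub struct CompletedQueue {
    items: Mutex<Vec<String>>,
}

impl CompletedQueue {
    pub fn new() -> Self {
        Self { items: Mutex::new(Vec::new()) }
    }

    /// A finished task enqueues its (already formatted) outcome.
    pub fn push(&self, outcome: &str) {
        self.items.lock().unwrap().push(outcome.to_string());
    }

    /// Take everything queued so far, leaving the queue empty.
    pub fn drain(&self) -> Vec<String> {
        let mut items = self.items.lock().unwrap();
        std::mem::take(&mut *items)
    }
}

/// Pending outcomes become prefix blocks, followed by the user prompt.
pub fn build_turn_input(queue: &CompletedQueue, user_prompt: &str) -> Vec<String> {
    let mut blocks = queue.drain();
    blocks.push(user_prompt.to_string());
    blocks
}
```

Because the drain empties the queue, each outcome is delivered to at most one turn.
*/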
//!
//! ## Introspection and single-point cancellation (control plane)
//!
//! Tasks **do not disappear immediately after completion**: each task retains a
//! [`TaskEntry`] in the `tasks` table, recording status (running / completed / failed /
//! canceled) and a **shared handle to the task's history**.
//!
//! The progress "block" granularity is deliberately set to **message blocks submitted to
//! the LLM** ([`crate::llm::Message`]), not streaming deltas. Streaming
//! `AssistantText` / `AssistantThought` chunks carry only a few words each (mapping
//! to ACP `AgentMessageChunk`), which is unhelpful for understanding "what is this
//! subagent doing now". The meaningful granularity is the turn / tool-call boundary.
//! The main loop drains the entire batch, coalesces the chunks into a single assistant
//! `Message`, and appends it to history; that is the moment a "block" is sent to the
//! LLM. Therefore, `spawn_agent` shares the sub-turn history `Arc` into this table
//! (the sub-turn appends to it), and `peek` snapshots that history directly, taking
//! the **most recent N message blocks**. This is a single source of truth (identical to
//! what is fed to the LLM), so no replay or coalescing of streaming deltas is needed
//! elsewhere.
//!
//! This allows the main agent to inspect a background sub-agent's progress with
//! `inspect_background_task`, or cancel a single task early via
//! [`cancel_task`](BackgroundTasks::cancel_task) without affecting other tasks
//! (each task has its own child token). Finished task entries are evicted in FIFO
//! order once their number exceeds a cap, to prevent unbounded growth in long sessions.
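/*!

The eviction rule above can be sketched without any async machinery. This is a
simplified illustration, not the module's implementation: the `Table` alias and this
`prune_finished` signature are assumptions, and an entry is reduced to its finish
sequence number (`None` while the task is still running):

```rust
use std::collections::BTreeMap;

/// Task id -> finish sequence number; `None` means the task is still running.
pub type Table = BTreeMap<String, Option<u64>>;

/// Evict the oldest finished entries until at most `cap` remain.
/// Running entries are never evicted.
pub fn prune_finished(tasks: &mut Table, cap: usize) {
    let mut finished: Vec<(u64, String)> = tasks
        .iter()
        .filter_map(|(id, seq)| seq.map(|s| (s, id.clone())))
        .collect();
    if finished.len() <= cap {
        return;
    }
    // Oldest finish first.
    finished.sort_by_key(|(seq, _)| *seq);
    let excess = finished.len() - cap;
    for (_, id) in finished.into_iter().take(excess) {
        tasks.remove(&id);
    }
}
```

Eviction follows finish order, not task id order, so an early task that finishes late
can outlive later tasks that finished before it.
*/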

use std::collections::BTreeMap;
use std::future::Future;
use std::sync::{Arc, Mutex};

use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

use crate::llm::{Message, MessageContent, Role};
use crate::session::History;

/// Default number of recent message blocks returned by `inspect_background_task` when
/// `recent_blocks` is not specified.
const DEFAULT_RECENT_BLOCKS: usize = 10;

/// Default for [`BackgroundProgressConfig::finished_tasks_cap`].
const DEFAULT_FINISHED_TASKS_CAP: usize = 64;

/// Configuration for the background task **progress view**.
///
/// The goal is to give the main agent a **bird's-eye** view of what a subagent is
/// currently doing, **not** to flood the main agent's context with the full text of
/// sub-turns. Therefore the defaults are conservative: assistant/thinking text is
/// **omitted** by default (`block_text_limit = 0`, reporting only metadata like "there is
/// an assistant text / thinking"); tool calls, which are naturally short, are kept as-is.
/// Users can increase `block_text_limit` when more detail is needed.
///
/// The source of truth for configuration lives on the agent side (here).
/// `defect-config`'s `ToolsConfig.background` reuses this struct directly (same
/// cross-crate reuse pattern as `TurnConfig` / `SessionCapabilitiesConfig`: config
/// depends on agent, not the other way around).
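/**

A small sketch of how the "at least one block" rule plays out. The free function
`resolve_recent` below is a hypothetical standalone mirror written for illustration; it
takes the configured default as a plain argument instead of reading it from `self`:

```rust
/// Pick the caller's value if given, else the configured default; never below 1.
pub fn resolve_recent(requested: Option<usize>, configured_default: usize) -> usize {
    requested.unwrap_or(configured_default).max(1)
}
```

With a configured default of 10, `None` resolves to 10, `Some(3)` to 3, and `Some(0)`
to 1; a configured default of 0 also resolves to 1.
*/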
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BackgroundProgressConfig {
    /// How many recent message blocks `inspect_background_task` returns by default when
    /// called without the `recent_blocks` argument.
    /// `0` is treated as `1` (at least one block is always returned, otherwise peek would
    /// always be empty).
    pub default_recent_blocks: usize,
    /// Maximum number of Unicode scalar values for the **body** of a single block,
    /// applied to free-form text such as assistant messages, thinking blocks, and tool
    /// results. Text exceeding this limit is truncated at the limit, with an ellipsis
    /// marker appended. `0` means no body text is kept (only the block's type and
    /// metadata, e.g. tool name); this is the default, and it minimizes pollution of the
    /// main agent's context.
    pub block_text_limit: usize,
    /// How many **finished** task entries to keep in the `tasks` table. Running entries
    /// don't count toward the cap: they must remain so they stay cancelable/peekable.
    /// When the cap is exceeded, the oldest finished entry is evicted. Bounds the memory
    /// footprint of long-lived sessions that spawn many background tasks.
    pub finished_tasks_cap: usize,
}

impl Default for BackgroundProgressConfig {
    fn default() -> Self {
        Self {
            default_recent_blocks: DEFAULT_RECENT_BLOCKS,
            // By default, only summary/metadata is provided, not the full body: the goal
            // is an overview, not context transfer.
            block_text_limit: 0,
            finished_tasks_cap: DEFAULT_FINISHED_TASKS_CAP,
        }
    }
}

impl BackgroundProgressConfig {
    /// Normalize `recent_blocks`: if the caller passes `Some(n)`, use `n` (at least 1);
    /// if `None`, use the config default (at least 1).
    fn resolve_recent(&self, requested: Option<usize>) -> usize {
        requested.unwrap_or(self.default_recent_blocks).max(1)
    }
}

/// The outcome produced after a background task completes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BackgroundOutcome {
    /// The task ID (same as returned by `spawn`), used to annotate the backflow message
    /// and for external diagnostics.
    pub task_id: String,
    /// Task label (primarily from the `spawn_agent` profile name), included in the
    /// backflow message so the model or user can identify the source.
    pub label: String,
    /// The result of the background task.
    pub result: BackgroundResult,
}

/// The final result of a background task.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BackgroundResult {
    /// Completed successfully, containing the task's final text output.
    Completed(String),
    /// Failure (including cancellation), with an error description.
    Failed(String),
}

impl BackgroundResult {
    fn is_error(&self) -> bool {
        matches!(self, BackgroundResult::Failed(_))
    }

    fn text(&self) -> &str {
        match self {
            BackgroundResult::Completed(t) | BackgroundResult::Failed(t) => t,
        }
    }
}

/// Lifecycle status of a background task, exposed via the `inspect_background_task`
/// control plane.
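/**

How the terminal status is chosen when a task finishes can be sketched as a pure
function. The `Status` enum and `final_status` are hypothetical stand-ins; the
precedence (cancellation first, then failure) follows the completion logic in `spawn`:

```rust
#[derive(Debug, PartialEq, Eq)]
pub enum Status {
    Completed,
    Failed,
    Canceled,
}

/// `cancelled`: whether the task's token was cancelled by the time it finished.
/// `is_error`: whether the task body returned a failure.
pub fn final_status(cancelled: bool, is_error: bool) -> Status {
    if cancelled {
        Status::Canceled
    } else if is_error {
        Status::Failed
    } else {
        Status::Completed
    }
}
```

Note that a cancelled token wins even if the task body managed to return a successful
result.
*/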
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskStatus {
    /// Still running.
    Running,
    /// The task completed successfully.
    Completed,
    /// The task failed.
    Failed,
    /// Canceled by [`cancel_task`](BackgroundTasks::cancel_task) /
    /// [`cancel_all`](BackgroundTasks::cancel_all).
    Canceled,
}

impl TaskStatus {
    /// Stable lowercase string name for control-plane tool output.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match self {
            TaskStatus::Running => "running",
            TaskStatus::Completed => "completed",
            TaskStatus::Failed => "failed",
            TaskStatus::Canceled => "canceled",
        }
    }

    fn is_terminal(&self) -> bool {
        !matches!(self, TaskStatus::Running)
    }
}

/// The role/category of a progress block. Directly corresponds to the content of a
/// [`crate::llm::Message`] submitted to the LLM.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockKind {
    /// User/task input message (including backflow of background results, tool result
    /// re-injection, etc.).
    User,
    /// Text produced by the assistant.
    AssistantText,
    /// The assistant's chain of thought.
    Thought,
    /// A tool call initiated by the assistant.
    ToolUse,
    /// Tool result (fed back to the model).
    ToolResult,
    /// Other (multimodal / provider activity, etc.), normalized for display.
    Other,
}

impl BlockKind {
    /// Stable lowercase string name for control-plane tool output.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match self {
            BlockKind::User => "user",
            BlockKind::AssistantText => "assistant",
            BlockKind::Thought => "thought",
            BlockKind::ToolUse => "tool_use",
            BlockKind::ToolResult => "tool_result",
            BlockKind::Other => "other",
        }
    }

    /// Whether this kind of block's text is "free-form body", i.e. subject to the limit
    /// in [`BackgroundProgressConfig::block_text_limit`]. Tool call names are inherently
    /// one-line summaries, not body text, and are not subject to the limit.
    fn is_free_form_body(&self) -> bool {
        matches!(
            self,
            BlockKind::User | BlockKind::AssistantText | BlockKind::Thought | BlockKind::ToolResult
        )
    }
}

/// A single progress block returned by `peek`: kind + text summary (truncated per
/// configuration).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProgressBlock {
    pub kind: BlockKind,
    pub text: String,
}

/// A snapshot of a task in the control plane (returned by `list` / `peek`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TaskSnapshot {
    pub task_id: String,
    pub label: String,
    pub status: TaskStatus,
    /// Total number of progress blocks currently in this task's history (only populated
    /// by `peek`; `list` returns `0` because it does not read history).
    pub block_count: usize,
    /// Recent blocks (empty for `list`; contains the latest N blocks for `peek`).
    pub recent: Vec<ProgressBlock>,
}

/// Truncate free-form text to a character limit (splits on Unicode scalar boundaries,
/// never breaking a character). `limit == 0` returns an empty string (metadata only).
/// Appends ` …(+N more chars)` to indicate truncation.
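/**

The sketch below is a standalone mirror of this function (renamed `truncate_chars`, a
hypothetical name, so it can run outside the crate). It shows why the limit counts
Unicode scalar values rather than bytes:

```rust
pub fn truncate_chars(text: &str, limit: usize) -> String {
    if limit == 0 {
        // Metadata-only mode: keep no body text at all.
        return String::new();
    }
    let total = text.chars().count();
    if total <= limit {
        return text.to_string();
    }
    // `chars()` walks scalar values, so a multi-byte character is never split.
    let kept: String = text.chars().take(limit).collect();
    format!("{kept} …(+{} more chars)", total - limit)
}
```

For instance, `truncate_chars("héllo wörld", 5)` yields `"héllo …(+6 more chars)"`,
whereas a byte slice such as `&text[..2]` would panic because it lands inside `é`.
*/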
fn truncate_body(text: &str, limit: usize) -> String {
    if limit == 0 {
        return String::new();
    }
    let total = text.chars().count();
    if total <= limit {
        return text.to_string();
    }
    let kept: String = text.chars().take(limit).collect();
    format!("{kept} …(+{} more chars)", total - limit)
}

/// Extract a human-readable text snippet from a
/// [`ToolResultBody`](crate::llm::ToolResultBody) (for a bird's-eye summary only).
fn tool_result_text(body: &crate::llm::ToolResultBody) -> String {
    use crate::llm::{ToolResultBody, ToolResultContent};
    match body {
        ToolResultBody::Text { text } => text.clone(),
        ToolResultBody::Json { value } => value.to_string(),
        ToolResultBody::Content { blocks } => blocks
            .iter()
            .map(|b| match b {
                ToolResultContent::Text { text } => text.clone(),
                ToolResultContent::Image { .. } => "<image>".to_string(),
            })
            .collect::<Vec<_>>()
            .join("\n"),
    }
}

/// Maps a [`MessageContent`] to a progress block, truncating the body to `limit` (free
/// text only).
fn block_of_content(content: &MessageContent, role: Role, limit: usize) -> ProgressBlock {
    let (kind, raw): (BlockKind, String) = match content {
        MessageContent::Text { text } => {
            let kind = if role == Role::Assistant {
                BlockKind::AssistantText
            } else {
                BlockKind::User
            };
            (kind, text.clone())
        }
        MessageContent::Thinking { text, .. } => (BlockKind::Thought, text.clone()),
        // The tool name is a one-line summary and should not be truncated as body text;
        // parameters are excluded from the bird's-eye view (see the Langfuse trace for
        // details).
        MessageContent::ToolUse { name, .. } => (BlockKind::ToolUse, name.clone()),
        MessageContent::ToolResult { output, .. } => {
            (BlockKind::ToolResult, tool_result_text(output))
        }
        MessageContent::Image { .. } => (BlockKind::Other, "<image>".to_string()),
        MessageContent::ProviderActivity { kind, .. } => {
            (BlockKind::Other, format!("provider activity: {kind:?}"))
        }
    };
    let text = if kind.is_free_form_body() {
        truncate_body(&raw, limit)
    } else {
        raw
    };
    ProgressBlock { kind, text }
}

/// Extracts the **most recent `n`** message blocks from a history snapshot (flattens each
/// [`Message`]'s content fragments into individual blocks while preserving chronological
/// order), truncating each block's body to `limit`. Returns `(total_blocks,
/// last_n_blocks)`.
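/**

The "keep only the last `n`" step is a common idiom; here it is in isolation. `last_n`
is a hypothetical generic helper, not a function in this module:

```rust
/// Returns `(total, tail)` where `tail` holds the final `n` items in original order.
pub fn last_n<T>(items: Vec<T>, n: usize) -> (usize, Vec<T>) {
    let total = items.len();
    // `saturating_sub` clamps at zero, so asking for more than exists keeps everything.
    let skip = total.saturating_sub(n);
    (total, items.into_iter().skip(skip).collect())
}
```

`last_n(vec![1, 2, 3, 4, 5], 2)` gives `(5, vec![4, 5])`, and `last_n(vec![1], 3)`
gives `(1, vec![1])`.
*/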
fn recent_blocks_of(messages: &[Message], n: usize, limit: usize) -> (usize, Vec<ProgressBlock>) {
    let mut all: Vec<ProgressBlock> = Vec::new();
    for m in messages {
        for c in m.content.iter() {
            all.push(block_of_content(c, m.role, limit));
        }
    }
    let total = all.len();
    let skip = total.saturating_sub(n);
    (total, all.into_iter().skip(skip).collect())
}

/// An entry in the `tasks` table.
struct TaskEntry {
    label: String,
    status: TaskStatus,
    /// Cancellation token specific to this task (child of the session-level token).
    /// `cancel_task` calls `cancel` on it individually.
    cancel: CancellationToken,
    /// Shared handle to this task's history. `peek` uses it to snapshot the message
    /// blocks submitted to the LLM.
    ///
    /// - `Some`: the tool that spawned the task (`spawn_agent`) shared the child turn's
    ///   history via `Arc`.
    /// - `None`: the task does not expose history (no progress to query; `peek` only
    ///   returns status).
    history: Option<Arc<dyn History>>,
    /// The `JoinHandle` that keeps the task alive past the turn that spawned it. Set to
    /// `None` after completion.
    handle: Option<JoinHandle<()>>,
    /// Sequence number for termination order (only present on finished entries), used for
    /// FIFO eviction.
    finished_seq: Option<u64>,
}

struct BackgroundInner {
    /// Monotonically increasing task ID counter.
    next_id: u64,
    /// Monotonically increasing "finish order" counter for FIFO eviction of finished
    /// entries.
    next_finished_seq: u64,
    /// All tasks (running + recently finished). When finished entries exceed
    /// [`finished_tasks_cap`](Self::finished_tasks_cap), the oldest are evicted.
    tasks: BTreeMap<String, TaskEntry>,
    /// Completed results pending drain (FIFO). Emptied by `drain_completed`. Orthogonal
    /// to the `tasks` table: this queue drives passive backflow, while `tasks` supports
    /// control-plane queries and cancellation.
    completed: Vec<BackgroundOutcome>,
    /// Cap on retained finished entries (from
    /// [`BackgroundProgressConfig::finished_tasks_cap`]).
    finished_tasks_cap: usize,
}

impl BackgroundInner {
    /// Marks a task as finished, records its finish sequence number, and evicts the
    /// oldest finished entries up to the capacity limit.
    fn finish(&mut self, id: &str, status: TaskStatus) {
        let seq = self.next_finished_seq;
        self.next_finished_seq += 1;
        if let Some(entry) = self.tasks.get_mut(id) {
            entry.status = status;
            entry.handle = None;
            entry.finished_seq = Some(seq);
        }
        self.prune_finished();
    }

    /// When finished entries exceed the cap, evict the oldest ones by finish sequence.
    /// Running entries are never evicted.
    fn prune_finished(&mut self) {
        let mut finished: Vec<(u64, String)> = self
            .tasks
            .iter()
            .filter_map(|(id, e)| e.finished_seq.map(|seq| (seq, id.clone())))
            .collect();
        if finished.len() <= self.finished_tasks_cap {
            return;
        }
        finished.sort_by_key(|(seq, _)| *seq);
        let drop_count = finished.len() - self.finished_tasks_cap;
        for (_, id) in finished.into_iter().take(drop_count) {
            self.tasks.remove(&id);
        }
    }
}

/// Session-level background task table. `Clone` is cheap (inner `Arc`): `DefaultSession`
/// holds one copy, cloned to tools via `ToolContext`.
#[derive(Clone)]
pub struct BackgroundTasks {
    /// Session-level cancellation token. Each task derives its token via `child_token()`,
    /// so `cancel_all` cancels all tasks at once, while cancelling any single task does
    /// not affect the others.
    cancel: CancellationToken,
    /// Notifies when a task completes. Each time a task result is enqueued, `notify_one`
    /// is called; the session driver waits on this and, when woken, starts an autonomous
    /// turn to continue processing (phase 2). Passive backflow does not rely on it.
    completed_notify: Arc<Notify>,
    /// Progress view configuration (default block count / body limit). `peek` renders
    /// based on this.
    progress_config: BackgroundProgressConfig,
    inner: Arc<Mutex<BackgroundInner>>,
}

impl BackgroundTasks {
    /// Constructs a new instance with a session-level cancellation token and a
    /// progress-view configuration. `session_cancel` is owned by the session and is
    /// cancelled when the session terminates.
    #[must_use]
    pub fn new(
        session_cancel: CancellationToken,
        progress_config: BackgroundProgressConfig,
    ) -> Self {
        Self {
            cancel: session_cancel,
            completed_notify: Arc::new(Notify::new()),
            progress_config,
            inner: Arc::new(Mutex::new(BackgroundInner {
                next_id: 0,
                next_finished_seq: 0,
                tasks: BTreeMap::new(),
                completed: Vec::new(),
                finished_tasks_cap: progress_config.finished_tasks_cap,
            })),
        }
    }

    /// Wait for a "task completion enqueued" event. The session driver uses this to drive
    /// active continuation.
    ///
    /// Uses `Notify`: the driver first calls `notified()` to obtain a future, then checks
    /// the queue, then awaits. This avoids missing a notification that arrives between
    /// the check and the await (`notify_one` stores a permit when no waiter is
    /// registered, so an already-fired notification is not lost).
    pub async fn wait_for_completion(&self) {
        self.completed_notify.notified().await;
    }

    /// Whether there are completed results waiting to be collected. The driver checks
    /// this after waking up to decide whether to start a turn.
    #[must_use]
    pub fn has_completed(&self) -> bool {
        !self
            .inner
            .lock()
            .expect("BackgroundTasks mutex poisoned")
            .completed
            .is_empty()
    }

    /// Spawns a background task and returns its ID **immediately**.
    ///
    /// `make_fut` receives two handles: a [`CancellationToken`] specific to this task (a
    /// child of the session-level token, which the task body should use to observe
    /// cancellation) and a [`TaskHandle`] (the task body shares its history `Arc` into
    /// the table via [`TaskHandle::attach_history`], allowing the control plane to peek
    /// at the **message blocks submitted to the LLM**). On completion, the result is
    /// placed in the `completed` queue and the corresponding entry in the `tasks` table
    /// is marked as terminal (the entry is retained for later inspection).
    ///
    /// The closure form that "receives token/handle and then creates the future" is used
    /// because both must be minted inside `spawn`, and the future needs to capture them;
    /// accepting a future directly would not allow obtaining a token whose lifetime is
    /// independent of the turn.
    pub fn spawn<F, Fut>(&self, label: String, make_fut: F) -> String
    where
        F: FnOnce(CancellationToken, TaskHandle) -> Fut,
        Fut: Future<Output = BackgroundResult> + Send + 'static,
    {
        let mut inner = self.inner.lock().expect("BackgroundTasks mutex poisoned");
        let id = format!("bg-{}", inner.next_id);
        inner.next_id += 1;

        let task_cancel = self.cancel.child_token();
        let handle = TaskHandle {
            inner: self.inner.clone(),
            task_id: id.clone(),
        };
        // Keep a clone of the task's token so the completion path below can tell an
        // explicit cancellation (`Canceled`) apart from a task error (`Failed`).
        let cancel_for_task = task_cancel.clone();
        let fut = make_fut(task_cancel.clone(), handle);

        let inner_arc = self.inner.clone();
        let notify = self.completed_notify.clone();
        let id_for_task = id.clone();
        let label_for_task = label.clone();
        let join = tokio::spawn(async move {
            let result = fut.await;
            // Distinguish between a task error and an explicit cancellation: the latter
            // records the status as `Canceled`, the former as `Failed`.
            let status = if cancel_for_task.is_cancelled() {
                TaskStatus::Canceled
            } else if result.is_error() {
                TaskStatus::Failed
            } else {
                TaskStatus::Completed
            };
            if let Ok(mut inner) = inner_arc.lock() {
                inner.finish(&id_for_task, status);
                inner.completed.push(BackgroundOutcome {
                    task_id: id_for_task,
                    label: label_for_task,
                    result,
                });
            }
            // Wake the session driver waiting on `wait_for_completion` (active
            // continuation). Uses `notify_one` rather than `notify_waiters`: the former
            // **stores a permit** when no waiter exists, so the next `notified().await`
            // returns immediately, avoiding a lost wakeup when a task completes before
            // the driver parks. There is a single consumer (exactly one driver), so
            // `notify_one` semantics are correct. Notify outside the lock.
            notify.notify_one();
        });

        inner.tasks.insert(
            id.clone(),
            TaskEntry {
                label,
                status: TaskStatus::Running,
                cancel: task_cancel,
                history: None,
                handle: Some(join),
                finished_seq: None,
            },
        );
        id
    }

    /// Drain all completed results (clears the queue). Called by `run_turn` before
    /// starting a turn to passively collect results.
    pub fn drain_completed(&self) -> Vec<BackgroundOutcome> {
        let mut inner = self.inner.lock().expect("BackgroundTasks mutex poisoned");
        std::mem::take(&mut inner.completed)
    }

    /// Number of currently running tasks. Used for diagnostics / control plane.
    #[must_use]
    pub fn running_count(&self) -> usize {
        self.inner
            .lock()
            .expect("BackgroundTasks mutex poisoned")
            .tasks
            .values()
            .filter(|e| e.status == TaskStatus::Running)
            .count()
    }

    /// Returns a snapshot of all tasks (running + recently finished), **without reading
    /// history** (`recent` is empty, `block_count` is 0). Sorted by task ID in ascending
    /// order. Used by `inspect_background_task` when called without arguments.
    #[must_use]
    pub fn list(&self) -> Vec<TaskSnapshot> {
        let inner = self.inner.lock().expect("BackgroundTasks mutex poisoned");
        inner
            .tasks
            .iter()
            .map(|(id, e)| TaskSnapshot {
                task_id: id.clone(),
                label: e.label.clone(),
                status: e.status,
                block_count: 0,
                recent: Vec::new(),
            })
            .collect()
    }

    /// Take a snapshot of a single task, including the most recent `recent_blocks`
    /// message blocks submitted to the LLM (`None` uses the config default). Returns
    /// `None` if the task does not exist (never spawned or already evicted); blocks are
    /// empty if the task does not expose history.
    ///
    /// Implementation: clone the task's history `Arc` while holding the table lock, then
    /// release the table lock before snapshotting (snapshotting uses the history's own
    /// lock). This avoids performing a potentially expensive deep copy of history while
    /// holding the table lock, which would block spawn/finish.
    #[must_use]
    pub fn peek(&self, id: &str, recent_blocks: Option<usize>) -> Option<TaskSnapshot> {
        let n = self.progress_config.resolve_recent(recent_blocks);
        let limit = self.progress_config.block_text_limit;
        let (label, status, history) = {
            let inner = self.inner.lock().expect("BackgroundTasks mutex poisoned");
            let entry = inner.tasks.get(id)?;
            (entry.label.clone(), entry.status, entry.history.clone())
        };
        let (block_count, recent) = match history {
            Some(h) => recent_blocks_of(&h.snapshot(), n, limit),
            None => (0, Vec::new()),
        };
        Some(TaskSnapshot {
            task_id: id.to_string(),
            label,
            status,
            block_count,
            recent,
        })
    }

    /// Cancel a single task early: cancels only its dedicated child token, without
    /// affecting other tasks.
    ///
    /// Returns `Some(true)` if a running task was found and cancellation was requested;
    /// `Some(false)` if the task exists but is already in a terminal state (no-op);
    /// `None` if no such id exists. Cancellation is **cooperative**: the task body must
    /// observe its cancel token and exit; the status transitions to `Canceled` only when
    /// the task actually finishes.
    pub fn cancel_task(&self, id: &str) -> Option<bool> {
        let inner = self.inner.lock().expect("BackgroundTasks mutex poisoned");
        let entry = inner.tasks.get(id)?;
        if entry.status.is_terminal() {
            return Some(false);
        }
        entry.cancel.cancel();
        Some(true)
    }

    /// Cancels all background tasks (called when the session ends). Idempotent.
    pub fn cancel_all(&self) {
        self.cancel.cancel();
    }
}

/// A handle given to a background task, allowing it to share its history `Arc` into the
/// task table so the control plane can peek at the message blocks it submits to the LLM.
/// `Clone` is cheap (inner `Arc` + small string).
#[derive(Clone)]
pub struct TaskHandle {
    inner: Arc<Mutex<BackgroundInner>>,
    task_id: String,
}

impl TaskHandle {
    /// Shares this task's history handle into the task table. Called by the `spawn_agent`
    /// background path before constructing a child turn, passing the child turn's history
    /// `Arc`; afterwards `peek` can snapshot the message blocks the child agent has
    /// committed. The task entry may have already been evicted (in extreme cases the task
    /// finishes instantly and is dropped by FIFO), in which case the operation is
    /// silently ignored.
    ///
    /// Call this from inside the task's future, not synchronously in the `make_fut`
    /// closure: `spawn` holds the table lock while it invokes the closure, so taking the
    /// same lock there would deadlock or panic.
    pub fn attach_history(&self, history: Arc<dyn History>) {
        if let Ok(mut inner) = self.inner.lock()
            && let Some(entry) = inner.tasks.get_mut(&self.task_id)
        {
            entry.history = Some(history);
        }
    }
}

/// Formats a background task outcome into a text block that is fed back into the
/// conversation.
///
/// The wording is structured as a "deferred tool result return", clearly marking the
/// source (task id + label) and success/failure, to prevent the model from
/// misinterpreting it as user speech.
/// Phase 2 will replace this with the proper ingest path using `IngestSource::Background`,
/// at which point this function will be superseded by the corresponding payload.
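/**

To make the output shape concrete, here is a standalone sketch. `render_outcome` is a
hypothetical helper taking plain values in place of [`BackgroundOutcome`], and the label
`researcher` in the usage note is made up:

```rust
pub fn render_outcome(task_id: &str, label: &str, failed: bool, text: &str) -> String {
    let status = if failed { "failed" } else { "completed" };
    // Bracketed header line first, then the task's text on the next line.
    format!("⟨background task {task_id} ({label}) {status}⟩\n{text}")
}
```

`render_outcome("bg-0", "researcher", false, "done")` produces the header
`⟨background task bg-0 (researcher) completed⟩` followed by `done` on its own line.
*/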
#[must_use]
pub fn format_background_outcome(outcome: &BackgroundOutcome) -> String {
    let status = if outcome.result.is_error() {
        "failed"
    } else {
        "completed"
    };
    format!(
        "⟨background task {} ({}) {}⟩\n{}",
        outcome.task_id,
        outcome.label,
        status,
        outcome.result.text()
    )
}

#[cfg(test)]
mod tests;