Skip to main content

defect_agent/session/turn/
compaction_slot.rs

//! Background compression with single-flight semantics.
//!
//! Full compression (summarization) calls an LLM and incurs several seconds of latency.
//! Running it synchronously on the turn's critical path would make the user whose turn
//! triggered the overflow wait for it. Background compression moves the summarization call
//! into a `tokio::spawn` task, so the turn does not block: history is compacted quietly
//! before it reaches the hard watermark.
//!
//! ## Why session-level and single-flight
//!
//! - **Single-flight**: At most one compression is in flight at a time. This is a
//!   prerequisite for the concurrency invariant of `History::splice_prefix` — "no
//!   mid-section messages are added or removed while a compression is in flight" — and
//!   the only operation that removes mid-section messages is compression itself. Two
//!   concurrent compressions would invalidate each other's computed `drop_count`.
//! - **Session-level**: The compression task must outlive the turn that spawned it (the
//!   turn may end before the summary returns), so the task's handle is owned by the
//!   session and shares the lifetime of
//!   [`BackgroundTasks`](crate::session::BackgroundTasks).
//!
//! ## Write-back
//!
//! Once the summary is ready, the task calls `history.splice_prefix(drop_count, summary)`
//! directly to **silently rewrite history**. Note the difference from
//! [`BackgroundTasks`](crate::session::BackgroundTasks): that path injects its result back
//! into the conversation as a user message, whereas compression must be silent. Afterwards,
//! a callback emits a `ContextCompressed` event for observability consumers.
29
use std::sync::{Arc, Mutex, PoisonError};

use futures::future::{BoxFuture, FutureExt, Shared};
use tokio::task::JoinHandle;

use super::compact::{self, CompactionCtx};
use crate::session::{CompactionReport, History};
37
38/// Completion callback: emits an event after receiving a compaction report. Returns a
39/// `BoxFuture` so the callback can be `await`ed (since `emit` is async) — the compaction
40/// task body directly `.await`s it, so event sending completes inside the compaction task
41/// under the same cancel/track constraints, without spawning a detached task (following
42/// the workspace `BoxFuture` convention).
43type OnDone = Arc<dyn Fn(CompactionReport) -> BoxFuture<'static, ()> + Send + Sync>;
44
45struct SlotInner {
46    /// The in-flight compaction task. `Some` means one is running; the task clears this
47    /// field on completion.
48    flight: Option<JoinHandle<()>>,
49}
50
51/// A session-level background compaction slot. `Clone` is cheap (internally an `Arc`).
52#[derive(Clone)]
53pub struct CompactionSlot {
54    inner: Arc<Mutex<SlotInner>>,
55}
56
57impl Default for CompactionSlot {
58    fn default() -> Self {
59        Self::new()
60    }
61}
62
63impl CompactionSlot {
64    #[must_use]
65    pub fn new() -> Self {
66        Self {
67            inner: Arc::new(Mutex::new(SlotInner { flight: None })),
68        }
69    }
70
71    /// Whether a compaction is currently in flight.
72    #[must_use]
73    pub(crate) fn is_in_flight(&self) -> bool {
74        let mut inner = self.inner.lock().expect("CompactionSlot mutex poisoned");
75        // Clean up any finished handle (the task body has already written itself back;
76        // this only clears the bookkeeping).
77        if let Some(h) = &inner.flight
78            && h.is_finished()
79        {
80            inner.flight = None;
81        }
82        inner.flight.is_some()
83    }
84
85    /// Attempts to start a background compaction. If one is already in flight, returns
86    /// `false` (single-flight, no duplicate spawns).
87    ///
88    /// `history` is `Arc<dyn History>` so the `'static` task can hold it across turns.
89    /// `ctx` carries provider / model / tools / cancellation token. `on_done` is called
90    /// after a successful compaction (to emit an event).
91    pub(crate) fn try_spawn(
92        &self,
93        history: Arc<dyn History>,
94        ctx: CompactionCtx,
95        threshold: u64,
96        on_done: OnDone,
97    ) -> bool {
98        let mut inner = self.inner.lock().expect("CompactionSlot mutex poisoned");
99        // Clear any finished handles to prevent a stale completed handle from blocking
100        // new ones.
101        if let Some(h) = &inner.flight
102            && h.is_finished()
103        {
104            inner.flight = None;
105        }
106        if inner.flight.is_some() {
107            return false;
108        }
109
110        let slot = self.inner.clone();
111        let handle = tokio::spawn(async move {
112            run_once(history.as_ref(), &ctx, threshold, &on_done).await;
113            // Cleanup: remove this task's handle placeholder (only if it still points to
114            // us — i.e., the task has finished).
115            if let Ok(mut inner) = slot.lock()
116                && let Some(h) = &inner.flight
117                && h.is_finished()
118            {
119                inner.flight = None;
120            }
121        });
122        inner.flight = Some(handle);
123        true
124    }
125
126    /// Waits for an in-flight compaction to complete, if any. Used as a fallback for the
127    /// hard watermark: if a background compaction is already in progress when a new
128    /// request is about to be constructed, it is better to wait for it to finish than to
129    /// start another one synchronously. Returns immediately if no task is in flight.
130    pub(crate) async fn await_in_flight(&self) {
131        let handle = {
132            let mut inner = self.inner.lock().expect("CompactionSlot mutex poisoned");
133            inner.flight.take()
134        };
135        if let Some(handle) = handle {
136            // The task already wrote back its history; we only wait for it to finish
137            // here. Ignore `JoinError` (panic/abort).
138            let _ = handle.await;
139        }
140    }
141}
142
143/// Run one background compaction cycle: snapshot → plan → summarize → write back via
144/// `splice_prefix` → emit event.
145/// Any best-effort skip (no boundary / summary failure) returns silently.
146async fn run_once(history: &dyn History, ctx: &CompactionCtx, threshold: u64, on_done: &OnDone) {
147    let messages = history.snapshot();
148    let Some(plan) = compact::plan(&messages, threshold) else {
149        return;
150    };
151    let Some(summary) = compact::summarize(ctx, &plan.head, plan.prev_summary.as_deref()).await
152    else {
153        return;
154    };
155    let summary_msg = compact::summary_message(&summary);
156
157    // Key: `splice_prefix` drops only the first `drop_count` items from the current list,
158    // keeping everything after (including tail messages appended by the foreground while
159    // the summary was being computed). Single-flight ensures `drop_count` is still valid
160    // for the current list.
161    history.splice_prefix(plan.drop_count, summary_msg);
162    let tokens_after = history.token_estimate().unwrap_or(plan.tokens_before);
163
164    tracing::info!(
165        drop_count = plan.drop_count,
166        tokens_before = plan.tokens_before,
167        tokens_after,
168        "context compacted (background)"
169    );
170    on_done(CompactionReport {
171        tokens_before: plan.tokens_before,
172        tokens_after,
173    })
174    .await;
175}