defect_agent/session/turn/compaction_slot.rs
1//! Background compression with single-flight semantics.
2//!
3//! Full compression (summarization) calls an LLM, incurring several seconds of latency.
4//! Running it synchronously on the turn critical path would force the user whose turn
5//! triggered the overflow to wait. Background compression moves this summarization call
6//! into a `tokio::spawn` task, so the turn does not block — it quietly compacts history
7//! before hitting the hard watermark.
8//!
9//! ## Why session-level and single-flight
10//!
11//! - **Single-flight**: At most one compression is in flight at a time. This is a
12//! prerequisite for `History::splice_prefix`'s concurrency invariant — "no mid-section
13//! messages are added or removed while a compression is in flight" — and the only
14//! operation that removes mid-section messages is compression itself. Two concurrent
15//! compressions would invalidate each other's computed `drop_count`.
16//! - **Session-level**: The compression task must outlive the turn that spawned it (the
17//! turn may end before the summary returns), so the `JoinHandle` is attached to the
18//! session, sharing the same lifetime as
19//! [`BackgroundTasks`](crate::session::BackgroundTasks).
20//!
21//! ## Write-back
22//!
23//! After the task completes, it directly calls `history.splice_prefix(drop_count,
24//! summary)` to **silently rewrite history** — note the difference from
25//! [`BackgroundTasks`](crate::session::BackgroundTasks): that path injects the result
26//! back into the conversation as a user message, whereas compression must be silent. On
27//! completion, a callback emits a `ContextCompressed` event for observability
28//! consumption.
29
30use std::sync::{Arc, Mutex};
31
32use futures::future::BoxFuture;
33use tokio::task::JoinHandle;
34
35use super::compact::{self, CompactionCtx};
36use crate::session::{CompactionReport, History};
37
38/// Completion callback: emits an event after receiving a compaction report. Returns a
39/// `BoxFuture` so the callback can be `await`ed (since `emit` is async) — the compaction
40/// task body directly `.await`s it, so event sending completes inside the compaction task
41/// under the same cancel/track constraints, without spawning a detached task (following
42/// the workspace `BoxFuture` convention).
43type OnDone = Arc<dyn Fn(CompactionReport) -> BoxFuture<'static, ()> + Send + Sync>;
44
45struct SlotInner {
46 /// The in-flight compaction task. `Some` means one is running; the task clears this
47 /// field on completion.
48 flight: Option<JoinHandle<()>>,
49}
50
51/// A session-level background compaction slot. `Clone` is cheap (internally an `Arc`).
52#[derive(Clone)]
53pub struct CompactionSlot {
54 inner: Arc<Mutex<SlotInner>>,
55}
56
57impl Default for CompactionSlot {
58 fn default() -> Self {
59 Self::new()
60 }
61}
62
63impl CompactionSlot {
64 #[must_use]
65 pub fn new() -> Self {
66 Self {
67 inner: Arc::new(Mutex::new(SlotInner { flight: None })),
68 }
69 }
70
71 /// Whether a compaction is currently in flight.
72 #[must_use]
73 pub(crate) fn is_in_flight(&self) -> bool {
74 let mut inner = self.inner.lock().expect("CompactionSlot mutex poisoned");
75 // Clean up any finished handle (the task body has already written itself back;
76 // this only clears the bookkeeping).
77 if let Some(h) = &inner.flight
78 && h.is_finished()
79 {
80 inner.flight = None;
81 }
82 inner.flight.is_some()
83 }
84
85 /// Attempts to start a background compaction. If one is already in flight, returns
86 /// `false` (single-flight, no duplicate spawns).
87 ///
88 /// `history` is `Arc<dyn History>` so the `'static` task can hold it across turns.
89 /// `ctx` carries provider / model / tools / cancellation token. `on_done` is called
90 /// after a successful compaction (to emit an event).
91 pub(crate) fn try_spawn(
92 &self,
93 history: Arc<dyn History>,
94 ctx: CompactionCtx,
95 threshold: u64,
96 on_done: OnDone,
97 ) -> bool {
98 let mut inner = self.inner.lock().expect("CompactionSlot mutex poisoned");
99 // Clear any finished handles to prevent a stale completed handle from blocking
100 // new ones.
101 if let Some(h) = &inner.flight
102 && h.is_finished()
103 {
104 inner.flight = None;
105 }
106 if inner.flight.is_some() {
107 return false;
108 }
109
110 let slot = self.inner.clone();
111 let handle = tokio::spawn(async move {
112 run_once(history.as_ref(), &ctx, threshold, &on_done).await;
113 // Cleanup: remove this task's handle placeholder (only if it still points to
114 // us — i.e., the task has finished).
115 if let Ok(mut inner) = slot.lock()
116 && let Some(h) = &inner.flight
117 && h.is_finished()
118 {
119 inner.flight = None;
120 }
121 });
122 inner.flight = Some(handle);
123 true
124 }
125
126 /// Waits for an in-flight compaction to complete, if any. Used as a fallback for the
127 /// hard watermark: if a background compaction is already in progress when a new
128 /// request is about to be constructed, it is better to wait for it to finish than to
129 /// start another one synchronously. Returns immediately if no task is in flight.
130 pub(crate) async fn await_in_flight(&self) {
131 let handle = {
132 let mut inner = self.inner.lock().expect("CompactionSlot mutex poisoned");
133 inner.flight.take()
134 };
135 if let Some(handle) = handle {
136 // The task already wrote back its history; we only wait for it to finish
137 // here. Ignore `JoinError` (panic/abort).
138 let _ = handle.await;
139 }
140 }
141}
142
143/// Run one background compaction cycle: snapshot → plan → summarize → write back via
144/// `splice_prefix` → emit event.
145/// Any best-effort skip (no boundary / summary failure) returns silently.
146async fn run_once(history: &dyn History, ctx: &CompactionCtx, threshold: u64, on_done: &OnDone) {
147 let messages = history.snapshot();
148 let Some(plan) = compact::plan(&messages, threshold) else {
149 return;
150 };
151 let Some(summary) = compact::summarize(ctx, &plan.head, plan.prev_summary.as_deref()).await
152 else {
153 return;
154 };
155 let summary_msg = compact::summary_message(&summary);
156
157 // Key: `splice_prefix` drops only the first `drop_count` items from the current list,
158 // keeping everything after (including tail messages appended by the foreground while
159 // the summary was being computed). Single-flight ensures `drop_count` is still valid
160 // for the current list.
161 history.splice_prefix(plan.drop_count, summary_msg);
162 let tokens_after = history.token_estimate().unwrap_or(plan.tokens_before);
163
164 tracing::info!(
165 drop_count = plan.drop_count,
166 tokens_before = plan.tokens_before,
167 tokens_after,
168 "context compacted (background)"
169 );
170 on_done(CompactionReport {
171 tokens_before: plan.tokens_before,
172 tokens_after,
173 })
174 .await;
175}