Skip to main content

pi/
compaction_worker.rs

1//! Background compaction worker with basic quota controls.
2//!
3//! This keeps LLM compaction off the foreground turn path by running compaction
4//! on a dedicated thread and applying results on subsequent turns.
5
6use crate::compaction::{self, CompactionPreparation, CompactionResult};
7use crate::error::{Error, Result};
8use crate::provider::Provider;
9use std::sync::{Arc, Mutex as StdMutex, mpsc};
10use std::time::{Duration, Instant};
11
/// Limits that keep background compaction from consuming unbounded resources.
#[derive(Debug, Clone)]
pub struct CompactionQuota {
    /// Shortest allowed gap between the starts of two compactions.
    pub cooldown: Duration,
    /// Longest wall-clock time to wait for a background compaction result.
    pub timeout: Duration,
    /// Upper bound on compaction attempts within a single session.
    pub max_attempts_per_session: u32,
}

impl Default for CompactionQuota {
    /// Returns a quota with a 60-second cooldown, a 120-second timeout, and
    /// at most 100 attempts per session.
    fn default() -> Self {
        let cooldown = Duration::from_secs(60);
        let timeout = Duration::from_secs(120);
        Self {
            cooldown,
            timeout,
            max_attempts_per_session: 100,
        }
    }
}
32
/// What the background thread sends back: the compaction result on success,
/// or the error that ended the attempt.
type CompactionOutcome = Result<CompactionResult>;

/// A compaction that has been started and whose outcome has not been consumed yet.
struct PendingCompaction {
    /// Receiving end of the channel on which the outcome is delivered.
    // NOTE(review): the mutex presumably exists so this type is `Sync`
    // (`mpsc::Receiver` is not) — confirm against how the state is shared.
    rx: StdMutex<mpsc::Receiver<CompactionOutcome>>,
    /// When the compaction was started; compared against the quota timeout.
    started_at: Instant,
}
39
40/// Per-session background compaction state.
41pub(crate) struct CompactionWorkerState {
42    pending: Option<PendingCompaction>,
43    last_start: Option<Instant>,
44    attempt_count: u32,
45    quota: CompactionQuota,
46}
47
48impl CompactionWorkerState {
49    pub const fn new(quota: CompactionQuota) -> Self {
50        Self {
51            pending: None,
52            last_start: None,
53            attempt_count: 0,
54            quota,
55        }
56    }
57
58    /// Whether a new background compaction is allowed to start now.
59    pub fn can_start(&self) -> bool {
60        if self.pending.is_some() {
61            return false;
62        }
63        if self.attempt_count >= self.quota.max_attempts_per_session {
64            return false;
65        }
66        if let Some(last) = self.last_start {
67            if last.elapsed() < self.quota.cooldown {
68                return false;
69            }
70        }
71        true
72    }
73
74    /// Non-blocking check for a completed compaction result.
75    pub fn try_recv(&mut self) -> Option<CompactionOutcome> {
76        // Check timeout first (read-only borrow, then drop before mutation).
77        let timed_out = self
78            .pending
79            .as_ref()
80            .is_some_and(|p| p.started_at.elapsed() > self.quota.timeout);
81
82        if timed_out {
83            self.pending = None;
84            return Some(Err(Error::session(
85                "Background compaction timed out".to_string(),
86            )));
87        }
88
89        // Try to receive — take() moves ownership so no outstanding borrow.
90        let pending = self.pending.take()?;
91        let recv_result = match pending.rx.lock() {
92            Ok(rx) => rx.try_recv(),
93            Err(_) => {
94                return Some(Err(Error::session(
95                    "Background compaction receiver mutex poisoned".to_string(),
96                )));
97            }
98        };
99
100        match recv_result {
101            Ok(outcome) => Some(outcome),
102            Err(mpsc::TryRecvError::Empty) => {
103                // Not done yet — put it back.
104                self.pending = Some(pending);
105                None
106            }
107            Err(mpsc::TryRecvError::Disconnected) => Some(Err(Error::session(
108                "Background compaction worker disconnected".to_string(),
109            ))),
110        }
111    }
112
113    /// Spawn a background compaction in a dedicated thread.
114    pub fn start(
115        &mut self,
116        preparation: CompactionPreparation,
117        provider: Arc<dyn Provider>,
118        api_key: String,
119        custom_instructions: Option<String>,
120    ) {
121        debug_assert!(
122            self.can_start(),
123            "start() called while can_start() is false"
124        );
125
126        let (tx, rx) = mpsc::channel();
127        let now = Instant::now();
128
129        std::thread::Builder::new()
130            .name("pi-compaction-bg".to_string())
131            .spawn(move || {
132                run_compaction_thread(preparation, provider, api_key, custom_instructions, tx);
133            })
134            .expect("spawn background compaction thread");
135
136        self.pending = Some(PendingCompaction {
137            rx: StdMutex::new(rx),
138            started_at: now,
139        });
140        self.last_start = Some(now);
141        self.attempt_count = self.attempt_count.saturating_add(1);
142    }
143}
144
145#[allow(clippy::needless_pass_by_value)]
146fn run_compaction_thread(
147    preparation: CompactionPreparation,
148    provider: Arc<dyn Provider>,
149    api_key: String,
150    custom_instructions: Option<String>,
151    tx: mpsc::Sender<CompactionOutcome>,
152) {
153    let runtime = asupersync::runtime::RuntimeBuilder::current_thread()
154        .build()
155        .expect("build runtime for background compaction");
156
157    let result = runtime.block_on(async {
158        compaction::compact(
159            preparation,
160            provider,
161            &api_key,
162            custom_instructions.as_deref(),
163        )
164        .await
165    });
166
167    let _ = tx.send(result);
168}
169
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a worker governed by the given quota.
    fn make_worker(quota: CompactionQuota) -> CompactionWorkerState {
        CompactionWorkerState::new(quota)
    }

    /// Builds a worker governed by the default quota.
    fn default_worker() -> CompactionWorkerState {
        make_worker(CompactionQuota::default())
    }

    /// Marks `worker` as having a compaction in flight on `rx` without
    /// spawning a thread: sets the pending slot and the last start time and
    /// bumps the attempt counter.
    fn inject_pending(worker: &mut CompactionWorkerState, rx: mpsc::Receiver<CompactionOutcome>) {
        worker.attempt_count += 1;
        worker.last_start = Some(Instant::now());
        let in_flight = PendingCompaction {
            rx: StdMutex::new(rx),
            started_at: Instant::now(),
        };
        worker.pending = Some(in_flight);
    }

    #[test]
    fn fresh_worker_can_start() {
        let worker = default_worker();
        assert!(worker.can_start());
    }

    #[test]
    fn cannot_start_while_pending() {
        let mut worker = default_worker();
        // The sender is kept alive so the channel stays connected.
        let (_sender, receiver) = mpsc::channel();
        inject_pending(&mut worker, receiver);
        assert!(!worker.can_start());
    }

    #[test]
    fn cannot_start_during_cooldown() {
        let quota = CompactionQuota {
            cooldown: Duration::from_secs(3600),
            ..CompactionQuota::default()
        };
        let mut worker = make_worker(quota);
        worker.attempt_count = 1;
        worker.last_start = Some(Instant::now());
        assert!(!worker.can_start());
    }

    #[test]
    fn can_start_after_cooldown() {
        let quota = CompactionQuota {
            cooldown: Duration::from_millis(0),
            ..CompactionQuota::default()
        };
        let mut worker = make_worker(quota);
        worker.attempt_count = 1;
        worker.last_start = Some(Instant::now().checked_sub(Duration::from_secs(1)).unwrap());
        assert!(worker.can_start());
    }

    #[test]
    fn max_attempts_blocks_start() {
        let quota = CompactionQuota {
            max_attempts_per_session: 2,
            cooldown: Duration::from_millis(0),
            ..CompactionQuota::default()
        };
        let mut worker = make_worker(quota);
        worker.attempt_count = 2;
        assert!(!worker.can_start());
    }

    #[test]
    fn try_recv_none_when_no_pending() {
        let mut worker = default_worker();
        assert!(worker.try_recv().is_none());
    }

    #[test]
    fn try_recv_none_when_not_ready() {
        let mut worker = default_worker();
        let (_sender, receiver) = mpsc::channel::<CompactionOutcome>();
        inject_pending(&mut worker, receiver);
        // Nothing has been sent, so there is no outcome to report ...
        assert!(worker.try_recv().is_none());
        // ... and the in-flight compaction must still be tracked.
        assert!(worker.pending.is_some());
    }

    #[test]
    fn try_recv_returns_disconnected_when_sender_dropped() {
        let mut worker = default_worker();
        let (sender, receiver) = mpsc::channel::<CompactionOutcome>();
        // Dropping the only sender disconnects the channel.
        drop(sender);
        inject_pending(&mut worker, receiver);
        let outcome = worker.try_recv().expect("should return disconnected error");
        assert!(outcome.is_err());
        assert!(worker.pending.is_none());
    }

    #[test]
    fn try_recv_timeout() {
        let quota = CompactionQuota {
            timeout: Duration::from_millis(0),
            ..CompactionQuota::default()
        };
        let mut worker = make_worker(quota);
        let (_sender, receiver) = mpsc::channel::<CompactionOutcome>();
        // Back-date the start so the zero-length timeout has certainly elapsed.
        let started_at = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
        worker.pending = Some(PendingCompaction {
            rx: StdMutex::new(receiver),
            started_at,
        });
        let outcome = worker.try_recv().expect("should return timeout error");
        assert!(outcome.is_err());
        let err_msg = outcome.unwrap_err().to_string();
        assert!(err_msg.contains("timed out"), "got: {err_msg}");
    }

    #[test]
    fn try_recv_success() {
        let mut worker = default_worker();
        let (sender, receiver) = mpsc::channel::<CompactionOutcome>();
        inject_pending(&mut worker, receiver);

        // Stand in for the background thread delivering a successful result.
        let details = compaction::CompactionDetails {
            read_files: Vec::new(),
            modified_files: Vec::new(),
        };
        let sent = CompactionResult {
            summary: "test summary".to_string(),
            first_kept_entry_id: "entry-1".to_string(),
            tokens_before: 1000,
            details,
        };
        sender.send(Ok(sent)).unwrap();

        let outcome = worker.try_recv().expect("should have result");
        let received = outcome.expect("should be Ok");
        assert_eq!(received.summary, "test summary");
        assert!(worker.pending.is_none());
    }
}