1use crate::compaction::{self, CompactionPreparation, CompactionResult};
7use crate::error::{Error, Result};
8use crate::provider::Provider;
9use std::sync::{Arc, Mutex as StdMutex, mpsc};
10use std::time::{Duration, Instant};
11
/// Limits that govern how often, and for how long, background compaction may run.
///
/// The worker state consults these values when deciding whether a new run may
/// start and when an in-flight run should be declared timed out.
#[derive(Debug, Clone)]
pub struct CompactionQuota {
    /// Minimum time that must elapse after one run starts before another may start.
    pub cooldown: Duration,
    /// How long a started run may remain pending before polling reports a timeout.
    pub timeout: Duration,
    /// Cap on started runs; the worker's attempt counter is compared against this.
    pub max_attempts_per_session: u32,
}
22
23impl Default for CompactionQuota {
24 fn default() -> Self {
25 Self {
26 cooldown: Duration::from_secs(60),
27 timeout: Duration::from_secs(120),
28 max_attempts_per_session: 100,
29 }
30 }
31}
32
33type CompactionOutcome = Result<CompactionResult>;
34
35struct PendingCompaction {
36 rx: StdMutex<mpsc::Receiver<CompactionOutcome>>,
37 started_at: Instant,
38}
39
40pub(crate) struct CompactionWorkerState {
42 pending: Option<PendingCompaction>,
43 last_start: Option<Instant>,
44 attempt_count: u32,
45 quota: CompactionQuota,
46}
47
48impl CompactionWorkerState {
49 pub const fn new(quota: CompactionQuota) -> Self {
50 Self {
51 pending: None,
52 last_start: None,
53 attempt_count: 0,
54 quota,
55 }
56 }
57
58 pub fn can_start(&self) -> bool {
60 if self.pending.is_some() {
61 return false;
62 }
63 if self.attempt_count >= self.quota.max_attempts_per_session {
64 return false;
65 }
66 if let Some(last) = self.last_start {
67 if last.elapsed() < self.quota.cooldown {
68 return false;
69 }
70 }
71 true
72 }
73
74 pub fn try_recv(&mut self) -> Option<CompactionOutcome> {
76 let timed_out = self
78 .pending
79 .as_ref()
80 .is_some_and(|p| p.started_at.elapsed() > self.quota.timeout);
81
82 if timed_out {
83 self.pending = None;
84 return Some(Err(Error::session(
85 "Background compaction timed out".to_string(),
86 )));
87 }
88
89 let pending = self.pending.take()?;
91 let recv_result = match pending.rx.lock() {
92 Ok(rx) => rx.try_recv(),
93 Err(_) => {
94 return Some(Err(Error::session(
95 "Background compaction receiver mutex poisoned".to_string(),
96 )));
97 }
98 };
99
100 match recv_result {
101 Ok(outcome) => Some(outcome),
102 Err(mpsc::TryRecvError::Empty) => {
103 self.pending = Some(pending);
105 None
106 }
107 Err(mpsc::TryRecvError::Disconnected) => Some(Err(Error::session(
108 "Background compaction worker disconnected".to_string(),
109 ))),
110 }
111 }
112
113 pub fn start(
115 &mut self,
116 preparation: CompactionPreparation,
117 provider: Arc<dyn Provider>,
118 api_key: String,
119 custom_instructions: Option<String>,
120 ) {
121 debug_assert!(
122 self.can_start(),
123 "start() called while can_start() is false"
124 );
125
126 let (tx, rx) = mpsc::channel();
127 let now = Instant::now();
128
129 std::thread::Builder::new()
130 .name("pi-compaction-bg".to_string())
131 .spawn(move || {
132 run_compaction_thread(preparation, provider, api_key, custom_instructions, tx);
133 })
134 .expect("spawn background compaction thread");
135
136 self.pending = Some(PendingCompaction {
137 rx: StdMutex::new(rx),
138 started_at: now,
139 });
140 self.last_start = Some(now);
141 self.attempt_count = self.attempt_count.saturating_add(1);
142 }
143}
144
145#[allow(clippy::needless_pass_by_value)]
146fn run_compaction_thread(
147 preparation: CompactionPreparation,
148 provider: Arc<dyn Provider>,
149 api_key: String,
150 custom_instructions: Option<String>,
151 tx: mpsc::Sender<CompactionOutcome>,
152) {
153 let runtime = asupersync::runtime::RuntimeBuilder::current_thread()
154 .build()
155 .expect("build runtime for background compaction");
156
157 let result = runtime.block_on(async {
158 compaction::compact(
159 preparation,
160 provider,
161 &api_key,
162 custom_instructions.as_deref(),
163 )
164 .await
165 });
166
167 let _ = tx.send(result);
168}
169
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a worker that enforces the given quota.
    fn worker_with(quota: CompactionQuota) -> CompactionWorkerState {
        CompactionWorkerState::new(quota)
    }

    /// Builds a worker that enforces the default quota.
    fn stock_worker() -> CompactionWorkerState {
        worker_with(CompactionQuota::default())
    }

    /// An instant one second in the past, for simulating elapsed time.
    fn one_second_ago() -> Instant {
        Instant::now().checked_sub(Duration::from_secs(1)).unwrap()
    }

    /// Puts the worker into the "run in flight" state without spawning a thread.
    fn mark_started(worker: &mut CompactionWorkerState, rx: mpsc::Receiver<CompactionOutcome>) {
        let in_flight = PendingCompaction {
            rx: StdMutex::new(rx),
            started_at: Instant::now(),
        };
        worker.pending = Some(in_flight);
        worker.last_start = Some(Instant::now());
        worker.attempt_count += 1;
    }

    #[test]
    fn fresh_worker_can_start() {
        assert!(stock_worker().can_start());
    }

    #[test]
    fn cannot_start_while_pending() {
        let (_sender, receiver) = mpsc::channel();
        let mut worker = stock_worker();
        mark_started(&mut worker, receiver);
        assert!(!worker.can_start());
    }

    #[test]
    fn cannot_start_during_cooldown() {
        let quota = CompactionQuota {
            cooldown: Duration::from_secs(3600),
            ..CompactionQuota::default()
        };
        let mut worker = worker_with(quota);
        worker.last_start = Some(Instant::now());
        worker.attempt_count = 1;
        assert!(!worker.can_start());
    }

    #[test]
    fn can_start_after_cooldown() {
        let quota = CompactionQuota {
            cooldown: Duration::from_millis(0),
            ..CompactionQuota::default()
        };
        let mut worker = worker_with(quota);
        worker.last_start = Some(one_second_ago());
        worker.attempt_count = 1;
        assert!(worker.can_start());
    }

    #[test]
    fn max_attempts_blocks_start() {
        let quota = CompactionQuota {
            max_attempts_per_session: 2,
            cooldown: Duration::from_millis(0),
            ..CompactionQuota::default()
        };
        let mut worker = worker_with(quota);
        worker.attempt_count = 2;
        assert!(!worker.can_start());
    }

    #[test]
    fn try_recv_none_when_no_pending() {
        let mut worker = stock_worker();
        assert!(worker.try_recv().is_none());
    }

    #[test]
    fn try_recv_none_when_not_ready() {
        let (_sender, receiver) = mpsc::channel::<CompactionOutcome>();
        let mut worker = stock_worker();
        mark_started(&mut worker, receiver);
        assert!(worker.try_recv().is_none());
        // An unfinished run must stay registered for the next poll.
        assert!(worker.pending.is_some());
    }

    #[test]
    fn try_recv_returns_disconnected_when_sender_dropped() {
        let (sender, receiver) = mpsc::channel::<CompactionOutcome>();
        let mut worker = stock_worker();
        mark_started(&mut worker, receiver);
        drop(sender);

        let outcome = worker.try_recv().expect("should return disconnected error");
        assert!(outcome.is_err());
        assert!(worker.pending.is_none());
    }

    #[test]
    fn try_recv_timeout() {
        let quota = CompactionQuota {
            timeout: Duration::from_millis(0),
            ..CompactionQuota::default()
        };
        let mut worker = worker_with(quota);
        let (_sender, receiver) = mpsc::channel::<CompactionOutcome>();
        worker.pending = Some(PendingCompaction {
            rx: StdMutex::new(receiver),
            started_at: one_second_ago(),
        });

        let outcome = worker.try_recv().expect("should return timeout error");
        assert!(outcome.is_err());
        let err_msg = outcome.unwrap_err().to_string();
        assert!(err_msg.contains("timed out"), "got: {err_msg}");
    }

    #[test]
    fn try_recv_success() {
        let (sender, receiver) = mpsc::channel::<CompactionOutcome>();
        let mut worker = stock_worker();
        mark_started(&mut worker, receiver);

        let details = compaction::CompactionDetails {
            read_files: vec![],
            modified_files: vec![],
        };
        let finished = CompactionResult {
            summary: "test summary".to_string(),
            first_kept_entry_id: "entry-1".to_string(),
            tokens_before: 1000,
            details,
        };
        sender.send(Ok(finished)).unwrap();

        let outcome = worker.try_recv().expect("should have result");
        let received = outcome.expect("should be Ok");
        assert_eq!(received.summary, "test summary");
        assert!(worker.pending.is_none());
    }
}