coding_agent_search/indexer/
quarantine.rs1use std::collections::BTreeMap;
23use std::path::{Path, PathBuf};
24
25use chrono::{DateTime, Utc};
26use serde::{Deserialize, Serialize};
27
/// Identifies one quarantined conversation at one index schema version.
///
/// The same conversation quarantined under two schema versions yields two
/// distinct keys. The derived ordering compares `conversation_id` first and
/// then `schema_version`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct QuarantineKey {
    /// Identifier of the conversation that failed to index.
    pub conversation_id: String,
    /// Schema version the conversation was being indexed under.
    pub schema_version: u32,
}
39
40impl QuarantineKey {
41 #[must_use]
42 pub fn new(conversation_id: impl Into<String>, schema_version: u32) -> Self {
43 Self {
44 conversation_id: conversation_id.into(),
45 schema_version,
46 }
47 }
48
49 fn storage_key(&self) -> String {
50 format!("{}::v{}", self.conversation_id, self.schema_version)
51 }
52
53 fn parse_storage_key(key: &str) -> Option<Self> {
54 let (conversation_id, version_part) = key.rsplit_once("::v")?;
55 let schema_version: u32 = version_part.parse().ok()?;
56 Some(Self {
57 conversation_id: conversation_id.to_string(),
58 schema_version,
59 })
60 }
61}
62
/// Bookkeeping for one quarantined key: when it failed, how often, why, and
/// under which cass version it was last recorded.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct QuarantineRecord {
    /// Time of the first recorded attempt; kept unchanged by later attempts.
    pub first_attempt_at: DateTime<Utc>,
    /// Time of the most recent recorded attempt.
    pub last_attempt_at: DateTime<Utc>,
    /// Number of recorded attempts (incremented with saturation).
    pub attempt_count: u64,
    /// Failure reason given for the most recent attempt.
    pub last_reason: String,
    /// cass version that last recorded an attempt for this key.
    ///
    /// Absent in older state files, in which case it deserialises as `None`.
    /// A `None` value is always treated as stale by
    /// `is_version_stale_for_retry`, so legacy entries get retried.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cass_version_at_quarantine: Option<String>,
}
86
87impl QuarantineRecord {
88 #[must_use]
102 pub fn is_version_stale_for_retry(&self, current_version: &str) -> bool {
103 !matches!(&self.cass_version_at_quarantine, Some(v) if v == current_version)
104 }
105}
106
/// Persisted set of quarantined conversations, stored as JSON in the data directory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuarantineState {
    /// Format version of the persisted file.
    ///
    /// Defaults to 1 when the field is missing from the JSON.
    #[serde(default = "default_storage_version")]
    pub storage_version: u32,
    /// Records keyed by the encoded key string `"<conversation_id>::v<schema_version>"`.
    pub entries: BTreeMap<String, QuarantineRecord>,
}
119
/// Storage-format version used for fresh state and for files that omit
/// `storage_version`.
fn default_storage_version() -> u32 {
    const STORAGE_VERSION_V1: u32 = 1;
    STORAGE_VERSION_V1
}
123
124impl Default for QuarantineState {
125 fn default() -> Self {
126 Self {
127 storage_version: default_storage_version(),
128 entries: BTreeMap::new(),
129 }
130 }
131}
132
133impl QuarantineState {
134 pub const FILENAME: &'static str = "quarantine_state.json";
137
138 #[must_use]
139 pub fn path(data_dir: &Path) -> PathBuf {
140 data_dir.join(Self::FILENAME)
141 }
142
143 #[must_use]
148 pub fn load(data_dir: &Path) -> Self {
149 let path = Self::path(data_dir);
150 let Ok(text) = std::fs::read_to_string(&path) else {
151 return Self {
152 storage_version: 1,
153 entries: BTreeMap::new(),
154 };
155 };
156 match serde_json::from_str::<Self>(&text) {
157 Ok(state) => state,
158 Err(_) => Self {
159 storage_version: 1,
160 entries: BTreeMap::new(),
161 },
162 }
163 }
164
165 pub fn save(&self, data_dir: &Path) -> std::io::Result<()> {
168 std::fs::create_dir_all(data_dir)?;
169 let final_path = Self::path(data_dir);
170 let tmp_path = data_dir.join(format!("{}.tmp", Self::FILENAME));
171 let json = serde_json::to_string_pretty(self).map_err(std::io::Error::other)?;
172 std::fs::write(&tmp_path, json)?;
173 std::fs::rename(&tmp_path, &final_path)?;
174 Ok(())
175 }
176
177 pub fn record_attempt(
182 &mut self,
183 key: &QuarantineKey,
184 reason: impl Into<String>,
185 now: DateTime<Utc>,
186 ) {
187 let reason = reason.into();
188 let storage_key = key.storage_key();
189 if let Some(record) = self.entries.get_mut(&storage_key) {
190 record.last_attempt_at = now;
191 record.attempt_count = record.attempt_count.saturating_add(1);
192 record.last_reason = reason;
193 record.cass_version_at_quarantine = Some(current_cass_version().to_string());
194 } else {
195 self.entries.insert(
196 storage_key,
197 QuarantineRecord {
198 first_attempt_at: now,
199 last_attempt_at: now,
200 attempt_count: 1,
201 last_reason: reason,
202 cass_version_at_quarantine: Some(current_cass_version().to_string()),
203 },
204 );
205 }
206 }
207
208 pub fn clear(&mut self, key: &QuarantineKey) -> bool {
213 self.entries.remove(&key.storage_key()).is_some()
214 }
215
216 #[must_use]
220 pub fn len(&self) -> usize {
221 self.entries.len()
222 }
223
224 #[must_use]
225 pub fn is_empty(&self) -> bool {
226 self.entries.is_empty()
227 }
228
229 pub fn iter(&self) -> impl Iterator<Item = (QuarantineKey, &QuarantineRecord)> + '_ {
231 self.entries.iter().filter_map(|(storage_key, record)| {
232 QuarantineKey::parse_storage_key(storage_key).map(|k| (k, record))
233 })
234 }
235}
236
237fn current_cass_version() -> &'static str {
238 env!("CARGO_PKG_VERSION")
239}
240
#[cfg(test)]
mod tests {
    use super::*;
    use std::error::Error;
    use tempfile::tempdir;

    type TestResult = Result<(), Box<dyn Error>>;

    /// Wraps `message` in a boxed error so test helpers can fail with `?`.
    fn test_error(message: impl Into<String>) -> Box<dyn Error> {
        std::io::Error::other(message.into()).into()
    }

    /// Fails with `message` unless `condition` holds.
    fn ensure(condition: bool, message: impl Into<String>) -> TestResult {
        if condition {
            Ok(())
        } else {
            Err(test_error(message))
        }
    }

    /// UTC instant `seconds` after the Unix epoch.
    ///
    /// All test inputs are in range, so the `expect` cannot fire.
    fn ts(seconds: i64) -> DateTime<Utc> {
        DateTime::<Utc>::from_timestamp(seconds, 0).expect("valid timestamp")
    }

    /// Looks up the record stored for `key`, failing the test if it is absent.
    fn record_for<'a>(
        state: &'a QuarantineState,
        key: &QuarantineKey,
    ) -> Result<&'a QuarantineRecord, Box<dyn Error>> {
        state
            .entries
            .get(&key.storage_key())
            .ok_or_else(|| test_error(format!("no record for {key:?}")))
    }

    #[test]
    fn storage_key_roundtrips_even_when_id_contains_separator() -> TestResult {
        for key in [
            QuarantineKey::new("plain", 0),
            QuarantineKey::new("has::v1-inside", 7),
            QuarantineKey::new("", u32::MAX),
        ] {
            let encoded = key.storage_key();
            let decoded = QuarantineKey::parse_storage_key(&encoded)
                .ok_or_else(|| test_error(format!("failed to parse {encoded}")))?;
            assert_eq!(decoded, key);
        }
        Ok(())
    }

    #[test]
    fn parse_storage_key_rejects_malformed_keys() {
        assert!(QuarantineKey::parse_storage_key("no-separator").is_none());
        assert!(QuarantineKey::parse_storage_key("conv::vabc").is_none());
        assert!(QuarantineKey::parse_storage_key("conv::v").is_none());
    }

    #[test]
    fn record_attempt_dedups_by_conversation_and_schema_version() -> TestResult {
        let mut state = QuarantineState::default();
        assert_eq!(state.storage_version, 1);
        let key = QuarantineKey::new("conv-a", 3);
        state.record_attempt(&key, "streaming-oom: 4.2 GB", ts(1_700_000_000));
        state.record_attempt(&key, "streaming-oom: 4.3 GB", ts(1_700_001_000));
        state.record_attempt(&key, "streaming-oom: 4.1 GB", ts(1_700_002_000));

        assert_eq!(state.len(), 1, "same key must dedup, not append");
        let record = record_for(&state, &key)?;
        assert_eq!(
            record.first_attempt_at,
            ts(1_700_000_000),
            "first attempt preserved"
        );
        assert_eq!(
            record.last_attempt_at,
            ts(1_700_002_000),
            "last attempt advances"
        );
        assert_eq!(record.attempt_count, 3);
        assert_eq!(record.last_reason, "streaming-oom: 4.1 GB");
        Ok(())
    }

    #[test]
    fn record_attempt_treats_different_schema_versions_as_distinct_keys() {
        let mut state = QuarantineState::default();
        let v3 = QuarantineKey::new("conv-a", 3);
        let v4 = QuarantineKey::new("conv-a", 4);
        state.record_attempt(&v3, "oom v3", ts(1));
        state.record_attempt(&v4, "oom v4", ts(2));
        assert_eq!(state.len(), 2, "schema bump must produce a fresh entry");
    }

    #[test]
    fn repeat_attempt_restamps_record_with_current_version() -> TestResult {
        let mut state = QuarantineState::default();
        let key = QuarantineKey::new("conv-r", 1);
        state.record_attempt(&key, "first", ts(1));
        state
            .entries
            .get_mut(&key.storage_key())
            .ok_or_else(|| test_error("record exists after first attempt"))?
            .cass_version_at_quarantine = Some("0.0.0-old".to_string());

        state.record_attempt(&key, "second", ts(2));
        let record = record_for(&state, &key)?;
        assert_eq!(
            record.cass_version_at_quarantine.as_deref(),
            Some(current_cass_version())
        );
        assert!(!record.is_version_stale_for_retry(current_cass_version()));
        assert_eq!(record.attempt_count, 2);
        Ok(())
    }

    #[test]
    fn save_and_load_roundtrips_quarantine_state() -> TestResult {
        let dir = tempdir()?;
        let mut state = QuarantineState::default();
        state.record_attempt(&QuarantineKey::new("c1", 1), "r1", ts(100));
        state.record_attempt(&QuarantineKey::new("c2", 1), "r2", ts(200));
        state.save(dir.path())?;

        let loaded = QuarantineState::load(dir.path());
        assert_eq!(loaded.len(), 2);
        assert_eq!(
            loaded.entries, state.entries,
            "every record must survive the roundtrip unchanged"
        );
        assert_eq!(
            record_for(&loaded, &QuarantineKey::new("c1", 1))?.last_reason,
            "r1"
        );
        Ok(())
    }

    #[test]
    fn load_returns_empty_for_missing_or_malformed_file() -> TestResult {
        let dir = tempdir()?;
        assert!(QuarantineState::load(dir.path()).is_empty());

        std::fs::write(dir.path().join(QuarantineState::FILENAME), "not json")?;
        let loaded = QuarantineState::load(dir.path());
        assert!(loaded.is_empty(), "malformed file must not block indexing");
        assert_eq!(loaded.storage_version, 1);
        Ok(())
    }

    #[test]
    fn clear_removes_entry() {
        let mut state = QuarantineState::default();
        let key = QuarantineKey::new("c", 1);
        state.record_attempt(&key, "r", ts(1));
        assert!(state.clear(&key));
        assert!(state.is_empty());
        assert!(!state.clear(&key), "clearing absent key returns false");
    }

    #[test]
    fn save_uses_atomic_rename_via_tmp_file() -> TestResult {
        let dir = tempdir()?;
        let mut state = QuarantineState::default();
        state.record_attempt(&QuarantineKey::new("c", 1), "r", ts(1));
        state.save(dir.path())?;

        let tmp_path = dir
            .path()
            .join(format!("{}.tmp", QuarantineState::FILENAME));
        assert!(
            !tmp_path.exists(),
            "tmp file must be renamed away on success"
        );
        assert!(QuarantineState::path(dir.path()).exists());
        Ok(())
    }

    #[test]
    fn save_overwrites_previously_saved_state() -> TestResult {
        let dir = tempdir()?;
        let mut state = QuarantineState::default();
        let key = QuarantineKey::new("c", 1);
        state.record_attempt(&key, "r", ts(1));
        state.save(dir.path())?;

        assert!(state.clear(&key));
        state.save(dir.path())?;
        assert!(
            QuarantineState::load(dir.path()).is_empty(),
            "second save must replace the first"
        );
        Ok(())
    }

    #[test]
    fn iter_yields_keys_in_deterministic_order() {
        let mut state = QuarantineState::default();
        state.record_attempt(&QuarantineKey::new("c2", 1), "r2", ts(2));
        state.record_attempt(&QuarantineKey::new("c1", 1), "r1", ts(1));
        let ids: Vec<String> = state.iter().map(|(k, _)| k.conversation_id).collect();
        assert_eq!(ids, vec!["c1".to_string(), "c2".to_string()]);
    }

    #[test]
    fn iter_skips_entries_whose_storage_key_does_not_parse() -> TestResult {
        let mut state = QuarantineState::default();
        let good = QuarantineKey::new("good", 2);
        state.record_attempt(&good, "r", ts(1));
        let copy = record_for(&state, &good)?.clone();
        state
            .entries
            .insert("garbage-without-version".to_string(), copy);

        assert_eq!(state.len(), 2, "raw map holds both entries");
        let keys: Vec<QuarantineKey> = state.iter().map(|(key, _)| key).collect();
        assert_eq!(keys, vec![good]);
        Ok(())
    }

    #[test]
    fn legacy_entry_missing_cass_version_deserialises_and_is_retry_eligible() -> TestResult {
        let dir = tempdir()?;

        let legacy_json = serde_json::json!({
            "storage_version": 1,
            "entries": {
                "conv-legacy::v1": {
                    "first_attempt_at": "2025-11-01T00:00:00Z",
                    "last_attempt_at": "2025-11-01T00:00:00Z",
                    "attempt_count": 1,
                    "last_reason": "index-ingest-out-of-memory: out of memory"
                }
            }
        });
        std::fs::write(
            dir.path().join(QuarantineState::FILENAME),
            serde_json::to_string_pretty(&legacy_json)?,
        )?;

        let state = QuarantineState::load(dir.path());
        ensure(
            state.len() == 1,
            format!(
                "legacy entry must load without error; loaded {} entries",
                state.len()
            ),
        )?;

        let record = state
            .entries
            .values()
            .next()
            .ok_or_else(|| test_error("entry present after loading legacy fixture"))?;

        ensure(
            record.cass_version_at_quarantine.is_none(),
            "missing field must deserialise as None, not cause an error",
        )?;

        ensure(
            record.is_version_stale_for_retry("0.6.6"),
            "legacy entry with cass_version_at_quarantine=None must be retry-eligible \
             (cass#258 carry-over: v0.5.x entries were silently orphaned)",
        )?;
        ensure(
            record.is_version_stale_for_retry("0.5.1"),
            "legacy entry must be retry-eligible even when version string matches a v0.5.x tag",
        )?;
        ensure(
            record.is_version_stale_for_retry("99.0.0"),
            "legacy entry must be retry-eligible for any future version string",
        )?;
        Ok(())
    }

    #[test]
    fn versioned_entry_retry_eligibility_gates_correctly() -> TestResult {
        let current = current_cass_version();

        let mut state = QuarantineState::default();
        state.record_attempt(
            &QuarantineKey::new("conv-v", 1),
            "index-ingest-out-of-memory",
            ts(1),
        );
        let record = state
            .entries
            .values()
            .next()
            .ok_or_else(|| test_error("same-version quarantine record exists"))?;
        ensure(
            !record.is_version_stale_for_retry(current),
            "record stamped with current version must not be retry-eligible",
        )?;

        let mut state2 = QuarantineState::default();
        state2.record_attempt(
            &QuarantineKey::new("conv-old", 1),
            "index-ingest-out-of-memory",
            ts(2),
        );
        state2
            .entries
            .values_mut()
            .next()
            .ok_or_else(|| test_error("old-version quarantine record exists"))?
            .cass_version_at_quarantine = Some("0.5.1".to_string());
        let old_record = state2
            .entries
            .values()
            .next()
            .ok_or_else(|| test_error("old-version quarantine record still exists"))?;
        ensure(
            old_record.is_version_stale_for_retry(current),
            "record stamped with older version must be retry-eligible after a version bump",
        )?;
        Ok(())
    }
}