Skip to main content

coding_agent_search/indexer/
quarantine.rs

1//! Single-session streaming-OOM quarantine state (#243).
2//!
3//! When `cass index` (non-watch) encounters an irreducible streaming
4//! OOM on a single conversation after deferred-lexical retry, the
5//! policy is **quarantine-and-continue**: record the poison session,
6//! advance the refresh for the rest of the corpus, and surface
7//! `quarantined_conversations=N` so operators see it.
8//!
//! The critical correctness property is **deduplication by conversation
//! identity**, not by occurrence: every refresh tick that hits the same
//! poison session must update the existing record's `last_attempt_at`
//! and `attempt_count` rather than append a fresh entry. Without that,
//! repeated refreshes on a hot poison session would grow the quarantine
//! state file without bound and obscure which sessions are genuinely
//! new failures.
16//!
//! Storage format: `<data_dir>/quarantine_state.json`, a JSON object whose
//! `entries` map is keyed by `<conversation_id>::v<schema_version>`, so a
//! schema bump that might make a previously-poison session indexable again
//! produces a fresh quarantine record rather than coalescing with the
//! stale one.
21
22use std::collections::BTreeMap;
23use std::path::{Path, PathBuf};
24
25use chrono::{DateTime, Utc};
26use serde::{Deserialize, Serialize};
27
28/// Identity of a quarantined conversation, used as the dedup key.
29///
30/// `schema_version` is folded in so a future schema bump that changes
31/// streaming-consumer memory pressure (e.g. a new message-format
32/// encoding that no longer OOMs on the same conversation) produces a
33/// fresh attempt rather than perpetually inheriting the prior verdict.
34#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
35pub struct QuarantineKey {
36    pub conversation_id: String,
37    pub schema_version: u32,
38}
39
40impl QuarantineKey {
41    #[must_use]
42    pub fn new(conversation_id: impl Into<String>, schema_version: u32) -> Self {
43        Self {
44            conversation_id: conversation_id.into(),
45            schema_version,
46        }
47    }
48
49    fn storage_key(&self) -> String {
50        format!("{}::v{}", self.conversation_id, self.schema_version)
51    }
52
53    fn parse_storage_key(key: &str) -> Option<Self> {
54        let (conversation_id, version_part) = key.rsplit_once("::v")?;
55        let schema_version: u32 = version_part.parse().ok()?;
56        Some(Self {
57            conversation_id: conversation_id.to_string(),
58            schema_version,
59        })
60    }
61}
62
63/// One quarantine record, identified by [`QuarantineKey`].
64///
65/// `first_attempt_at` is preserved across repeated refreshes; only
66/// `last_attempt_at`, `attempt_count`, and `last_reason` advance on
67/// each occurrence. This is the contract that prevents the
68/// "appending duplicate quarantine records" anti-pattern flagged on
69/// #243.
70#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
71pub struct QuarantineRecord {
72    pub first_attempt_at: DateTime<Utc>,
73    pub last_attempt_at: DateTime<Utc>,
74    pub attempt_count: u64,
75    pub last_reason: String,
76    /// Cass version that produced the quarantine entry.
77    ///
78    /// `None` means the record pre-dates v0.6.x (written by v0.5.x or
79    /// earlier), which lacked this field entirely.  The field is
80    /// `#[serde(default)]` so missing-on-disk JSON is silently treated as
81    /// `None` rather than a deserialisation error — critical for the legacy
82    /// carry-over path described in cass#258.
83    #[serde(default, skip_serializing_if = "Option::is_none")]
84    pub cass_version_at_quarantine: Option<String>,
85}
86
87impl QuarantineRecord {
88    /// Returns `true` when this record should be re-attempted on the next
89    /// broad scan under `current_version`.
90    ///
91    /// The rule is:
92    /// - `None`  → **legacy entry** (pre-v0.6.x, no version recorded) — always
93    ///   retry-eligible, because the bug that originally caused the OOM may
94    ///   already be fixed in the current binary.  Treating `None` as "same
95    ///   version" would silently orphan every v0.5.x quarantine entry forever,
96    ///   which is the cass#258 carry-over bug this method closes.
97    /// - `Some(v)` where `v == current_version` → already retried under this
98    ///   binary; **not** eligible (avoid infinite retry storms).
99    /// - `Some(v)` where `v != current_version` → **stale entry**; a version
100    ///   bump may have fixed the underlying bug, so retry once.
101    #[must_use]
102    pub fn is_version_stale_for_retry(&self, current_version: &str) -> bool {
103        !matches!(&self.cass_version_at_quarantine, Some(v) if v == current_version)
104    }
105}
106
107/// In-memory view of the quarantine state file. Use [`QuarantineState::load`]
108/// to read, [`QuarantineState::record_attempt`] / [`QuarantineState::clear`]
109/// to mutate, and [`QuarantineState::save`] to atomically persist.
110#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct QuarantineState {
112    /// Storage version of the file format itself (not the schema_version
113    /// inside the keys). Bumped when the on-disk shape changes.
114    #[serde(default = "default_storage_version")]
115    pub storage_version: u32,
116    /// Keyed by `QuarantineKey::storage_key()` for stable JSON ordering.
117    pub entries: BTreeMap<String, QuarantineRecord>,
118}
119
/// Storage-format version used when a state file does not specify one
/// and when a fresh state is created.
fn default_storage_version() -> u32 {
    const CURRENT_STORAGE_VERSION: u32 = 1;
    CURRENT_STORAGE_VERSION
}
123
124impl Default for QuarantineState {
125    fn default() -> Self {
126        Self {
127            storage_version: default_storage_version(),
128            entries: BTreeMap::new(),
129        }
130    }
131}
132
133impl QuarantineState {
134    /// Filename used inside `data_dir`. Stable on disk so external
135    /// tooling can locate the state.
136    pub const FILENAME: &'static str = "quarantine_state.json";
137
138    #[must_use]
139    pub fn path(data_dir: &Path) -> PathBuf {
140        data_dir.join(Self::FILENAME)
141    }
142
143    /// Load the quarantine state from `<data_dir>/quarantine_state.json`.
144    /// Returns an empty state when the file is missing or malformed —
145    /// quarantine is best-effort metadata and an unreadable state file
146    /// must not block indexing.
147    #[must_use]
148    pub fn load(data_dir: &Path) -> Self {
149        let path = Self::path(data_dir);
150        let Ok(text) = std::fs::read_to_string(&path) else {
151            return Self {
152                storage_version: 1,
153                entries: BTreeMap::new(),
154            };
155        };
156        match serde_json::from_str::<Self>(&text) {
157            Ok(state) => state,
158            Err(_) => Self {
159                storage_version: 1,
160                entries: BTreeMap::new(),
161            },
162        }
163    }
164
165    /// Atomically write the quarantine state to disk via temp file + rename,
166    /// so partial writes can never produce a corrupt quarantine_state.json.
167    pub fn save(&self, data_dir: &Path) -> std::io::Result<()> {
168        std::fs::create_dir_all(data_dir)?;
169        let final_path = Self::path(data_dir);
170        let tmp_path = data_dir.join(format!("{}.tmp", Self::FILENAME));
171        let json = serde_json::to_string_pretty(self).map_err(std::io::Error::other)?;
172        std::fs::write(&tmp_path, json)?;
173        std::fs::rename(&tmp_path, &final_path)?;
174        Ok(())
175    }
176
177    /// Record an attempt that failed irreducibly on `key`. If the key
178    /// already exists, the existing record is **updated in place**
179    /// (`last_attempt_at`, `attempt_count`, `last_reason`) rather than
180    /// appended — this is the dedup contract from #243.
181    pub fn record_attempt(
182        &mut self,
183        key: &QuarantineKey,
184        reason: impl Into<String>,
185        now: DateTime<Utc>,
186    ) {
187        let reason = reason.into();
188        let storage_key = key.storage_key();
189        if let Some(record) = self.entries.get_mut(&storage_key) {
190            record.last_attempt_at = now;
191            record.attempt_count = record.attempt_count.saturating_add(1);
192            record.last_reason = reason;
193            record.cass_version_at_quarantine = Some(current_cass_version().to_string());
194        } else {
195            self.entries.insert(
196                storage_key,
197                QuarantineRecord {
198                    first_attempt_at: now,
199                    last_attempt_at: now,
200                    attempt_count: 1,
201                    last_reason: reason,
202                    cass_version_at_quarantine: Some(current_cass_version().to_string()),
203                },
204            );
205        }
206    }
207
208    /// Remove a quarantine entry. Called by `cass quarantine clear`
209    /// after the operator has confirmed the underlying issue is
210    /// resolved (e.g. a memory bump on the streaming consumer, a
211    /// schema fix, or accepting the loss).
212    pub fn clear(&mut self, key: &QuarantineKey) -> bool {
213        self.entries.remove(&key.storage_key()).is_some()
214    }
215
216    /// Number of currently-quarantined `(conversation_id, schema_version)`
217    /// keys. This is what `cass health` and the indexer JSON envelope
218    /// surface as `quarantined_conversations`.
219    #[must_use]
220    pub fn len(&self) -> usize {
221        self.entries.len()
222    }
223
224    #[must_use]
225    pub fn is_empty(&self) -> bool {
226        self.entries.is_empty()
227    }
228
229    /// Iterate over `(key, record)` pairs in deterministic order.
230    pub fn iter(&self) -> impl Iterator<Item = (QuarantineKey, &QuarantineRecord)> + '_ {
231        self.entries.iter().filter_map(|(storage_key, record)| {
232            QuarantineKey::parse_storage_key(storage_key).map(|k| (k, record))
233        })
234    }
235}
236
237fn current_cass_version() -> &'static str {
238    env!("CARGO_PKG_VERSION")
239}
240
#[cfg(test)]
mod tests {
    use super::*;
    use std::error::Error;
    use tempfile::tempdir;

    /// Return type for tests that report failures through `?`.
    type TestResult = Result<(), Box<dyn Error>>;

    /// Wrap a plain message in a boxed error.
    fn test_error(message: impl Into<String>) -> Box<dyn Error> {
        Box::new(std::io::Error::other(message.into()))
    }

    /// Fail with `message` unless `condition` holds.
    fn ensure(condition: bool, message: impl Into<String>) -> TestResult {
        if !condition {
            return Err(test_error(message));
        }
        Ok(())
    }

    /// UTC timestamp at `seconds` past the Unix epoch.
    fn ts(seconds: i64) -> DateTime<Utc> {
        let parsed = DateTime::<Utc>::from_timestamp(seconds, 0);
        parsed.expect("valid timestamp")
    }

    #[test]
    fn record_attempt_dedups_by_conversation_and_schema_version() {
        let mut state = QuarantineState::default();
        assert_eq!(state.storage_version, 1);
        let key = QuarantineKey::new("conv-a", 3);
        let attempts = [
            ("streaming-oom: 4.2 GB", 1_700_000_000),
            ("streaming-oom: 4.3 GB", 1_700_001_000),
            ("streaming-oom: 4.1 GB", 1_700_002_000),
        ];
        for (reason, seconds) in attempts {
            state.record_attempt(&key, reason, ts(seconds));
        }

        assert_eq!(state.len(), 1, "same key must dedup, not append");
        let stored_under = key.storage_key();
        let record = state.entries.get(&stored_under).expect("entry present");
        assert_eq!(
            record.first_attempt_at,
            ts(1_700_000_000),
            "first attempt preserved"
        );
        assert_eq!(
            record.last_attempt_at,
            ts(1_700_002_000),
            "last attempt advances"
        );
        assert_eq!(record.attempt_count, 3);
        assert_eq!(record.last_reason, "streaming-oom: 4.1 GB");
    }

    #[test]
    fn record_attempt_treats_different_schema_versions_as_distinct_keys() {
        let mut state = QuarantineState::default();
        for (version, reason, seconds) in [(3, "oom v3", 1), (4, "oom v4", 2)] {
            let key = QuarantineKey::new("conv-a", version);
            state.record_attempt(&key, reason, ts(seconds));
        }
        assert_eq!(state.len(), 2, "schema bump must produce a fresh entry");
    }

    #[test]
    fn save_and_load_roundtrips_quarantine_state() {
        let dir = tempdir().unwrap();
        let mut state = QuarantineState::default();
        for (id, reason, seconds) in [("c1", "r1", 100), ("c2", "r2", 200)] {
            state.record_attempt(&QuarantineKey::new(id, 1), reason, ts(seconds));
        }
        state.save(dir.path()).expect("save");

        let loaded = QuarantineState::load(dir.path());
        assert_eq!(loaded.len(), 2);
        let c1_storage_key = QuarantineKey::new("c1", 1).storage_key();
        let r1 = loaded.entries.get(&c1_storage_key).unwrap();
        assert_eq!(r1.last_reason, "r1");
    }

    #[test]
    fn load_returns_empty_for_missing_or_malformed_file() {
        let dir = tempdir().unwrap();

        // No file on disk yet.
        assert!(QuarantineState::load(dir.path()).is_empty());

        // A file that is not JSON at all.
        let state_file = dir.path().join(QuarantineState::FILENAME);
        std::fs::write(state_file, "not json").expect("write malformed");
        assert!(
            QuarantineState::load(dir.path()).is_empty(),
            "malformed file must not block indexing"
        );
    }

    #[test]
    fn clear_removes_entry() {
        let key = QuarantineKey::new("c", 1);
        let mut state = QuarantineState::default();
        state.record_attempt(&key, "r", ts(1));

        let removed_first_time = state.clear(&key);
        assert!(removed_first_time);
        assert!(state.is_empty());

        let removed_second_time = state.clear(&key);
        assert!(!removed_second_time, "clearing absent key returns false");
    }

    #[test]
    fn save_uses_atomic_rename_via_tmp_file() {
        let dir = tempdir().unwrap();
        let mut state = QuarantineState::default();
        state.record_attempt(&QuarantineKey::new("c", 1), "r", ts(1));
        state.save(dir.path()).expect("save");

        // The tmp file must not be left behind after a successful save.
        let tmp_name = format!("{}.tmp", QuarantineState::FILENAME);
        let tmp_path = dir.path().join(tmp_name);
        assert!(
            !tmp_path.exists(),
            "tmp file must be renamed away on success"
        );
        assert!(QuarantineState::path(dir.path()).exists());
    }

    #[test]
    fn iter_yields_keys_in_deterministic_order() {
        let mut state = QuarantineState::default();
        // Inserted out of order on purpose.
        for (id, reason, seconds) in [("c2", "r2", 2), ("c1", "r1", 1)] {
            state.record_attempt(&QuarantineKey::new(id, 1), reason, ts(seconds));
        }

        let mut ids: Vec<String> = Vec::new();
        for (key, _record) in state.iter() {
            ids.push(key.conversation_id);
        }
        // BTreeMap-backed: ordering is by storage_key, which sorts c1 before c2.
        assert_eq!(ids, vec!["c1".to_string(), "c2".to_string()]);
    }

    /// Regression for the cass#258 carry-over: entries written by v0.5.x
    /// lack `cass_version_at_quarantine`. They must still deserialise (the
    /// field becomes `None` through `#[serde(default)]`) and must count as
    /// retry-eligible so they are not silently orphaned forever.
    ///
    /// The fixture JSON leaves the field out on purpose, mimicking a real
    /// `quarantine_state.json` produced by cass ≤ 0.5.1.
    #[test]
    fn legacy_entry_missing_cass_version_deserialises_and_is_retry_eligible() -> TestResult {
        let dir = tempdir()?;

        // Minimal v0.5.x-shaped state file: no `cass_version_at_quarantine`
        // key anywhere.
        let legacy_json = serde_json::json!({
            "storage_version": 1,
            "entries": {
                "conv-legacy::v1": {
                    "first_attempt_at": "2025-11-01T00:00:00Z",
                    "last_attempt_at": "2025-11-01T00:00:00Z",
                    "attempt_count": 1,
                    "last_reason": "index-ingest-out-of-memory: out of memory"
                    // cass_version_at_quarantine intentionally absent
                }
            }
        });
        let fixture_path = dir.path().join(QuarantineState::FILENAME);
        let fixture_text = serde_json::to_string_pretty(&legacy_json)?;
        std::fs::write(fixture_path, fixture_text)?;

        let state = QuarantineState::load(dir.path());
        let loaded_count = state.len();
        ensure(
            loaded_count == 1,
            format!(
                "legacy entry must load without error; loaded {} entries",
                loaded_count
            ),
        )?;

        let Some(record) = state.entries.values().next() else {
            return Err(test_error("entry present after loading legacy fixture"));
        };

        ensure(
            record.cass_version_at_quarantine.is_none(),
            "missing field must deserialise as None, not cause an error",
        )?;

        // The property under test: a legacy entry (no stamped version) is
        // retry-eligible no matter which version the running binary reports.
        ensure(
            record.is_version_stale_for_retry("0.6.6"),
            "legacy entry with cass_version_at_quarantine=None must be retry-eligible \
             (cass#258 carry-over: v0.5.x entries were silently orphaned)",
        )?;
        ensure(
            record.is_version_stale_for_retry("0.5.1"),
            "legacy entry must be retry-eligible even when version string matches a v0.5.x tag",
        )?;
        ensure(
            record.is_version_stale_for_retry("99.0.0"),
            "legacy entry must be retry-eligible for any future version string",
        )?;
        Ok(())
    }

    #[test]
    fn versioned_entry_retry_eligibility_gates_correctly() -> TestResult {
        let current = current_cass_version();

        // `record_attempt` stamps the running binary's version, so an
        // untouched record represents the "same version" case.
        let mut state = QuarantineState::default();
        state.record_attempt(
            &QuarantineKey::new("conv-v", 1),
            "index-ingest-out-of-memory",
            ts(1),
        );
        let Some(record) = state.entries.values().next() else {
            return Err(test_error("same-version quarantine record exists"));
        };
        // Already tried under this binary, so it must NOT be retried.
        ensure(
            !record.is_version_stale_for_retry(current),
            "record stamped with current version must not be retry-eligible",
        )?;

        // Now the "written by an older version" case: create a record and
        // overwrite its stamp.
        let mut state2 = QuarantineState::default();
        state2.record_attempt(
            &QuarantineKey::new("conv-old", 1),
            "index-ingest-out-of-memory",
            ts(2),
        );
        let Some(to_backdate) = state2.entries.values_mut().next() else {
            return Err(test_error("old-version quarantine record exists"));
        };
        to_backdate.cass_version_at_quarantine = Some("0.5.1".to_string());

        let Some(old_record) = state2.entries.values().next() else {
            return Err(test_error("old-version quarantine record still exists"));
        };
        ensure(
            old_record.is_version_stale_for_retry(current),
            "record stamped with older version must be retry-eligible after a version bump",
        )?;
        Ok(())
    }
}