Skip to main content

coding_agent_search/indexer/
quarantine.rs

1//! Single-session streaming-OOM quarantine state (#243).
2//!
3//! When `cass index` (non-watch) encounters an irreducible streaming
4//! OOM on a single conversation after deferred-lexical retry, the
5//! policy is **quarantine-and-continue**: record the poison session,
6//! advance the refresh for the rest of the corpus, and surface
7//! `quarantined_conversations=N` so operators see it.
8//!
9//! The critical correctness property is **deduplication by conversation
10//! identity**, not by occurrence: every refresh tick that hits the same
11//! poison session must update the existing record's `last_attempt_at`
//! and `attempt_count`, not append a fresh entry. Without that, repeated
//! refreshes on a hot poison session would grow the quarantine state
//! file without bound and obscure which sessions are genuinely new
//! failures.
16//!
//! Storage format: `<data_dir>/quarantine_state.json`, a JSON object whose
//! `entries` map is keyed by `"<conversation_id>::v<schema_version>"`, so a
//! schema bump that might make a previously-poison session indexable again
//! produces a fresh quarantine record rather than coalescing with the
//! stale one.
21
22use std::collections::BTreeMap;
23use std::path::{Path, PathBuf};
24
25use chrono::{DateTime, Utc};
26use serde::{Deserialize, Serialize};
27
28/// Identity of a quarantined conversation, used as the dedup key.
29///
30/// `schema_version` is folded in so a future schema bump that changes
31/// streaming-consumer memory pressure (e.g. a new message-format
32/// encoding that no longer OOMs on the same conversation) produces a
33/// fresh attempt rather than perpetually inheriting the prior verdict.
34#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
35pub struct QuarantineKey {
36    pub conversation_id: String,
37    pub schema_version: u32,
38}
39
40impl QuarantineKey {
41    #[must_use]
42    pub fn new(conversation_id: impl Into<String>, schema_version: u32) -> Self {
43        Self {
44            conversation_id: conversation_id.into(),
45            schema_version,
46        }
47    }
48
49    fn storage_key(&self) -> String {
50        format!("{}::v{}", self.conversation_id, self.schema_version)
51    }
52
53    fn parse_storage_key(key: &str) -> Option<Self> {
54        let (conversation_id, version_part) = key.rsplit_once("::v")?;
55        let schema_version: u32 = version_part.parse().ok()?;
56        Some(Self {
57            conversation_id: conversation_id.to_string(),
58            schema_version,
59        })
60    }
61}
62
63/// One quarantine record, identified by [`QuarantineKey`].
64///
65/// `first_attempt_at` is preserved across repeated refreshes; only
66/// `last_attempt_at`, `attempt_count`, and `last_reason` advance on
67/// each occurrence. This is the contract that prevents the
68/// "appending duplicate quarantine records" anti-pattern flagged on
69/// #243.
70#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
71pub struct QuarantineRecord {
72    pub first_attempt_at: DateTime<Utc>,
73    pub last_attempt_at: DateTime<Utc>,
74    pub attempt_count: u64,
75    pub last_reason: String,
76}
77
78/// In-memory view of the quarantine state file. Use [`QuarantineState::load`]
79/// to read, [`QuarantineState::record_attempt`] / [`QuarantineState::clear`]
80/// to mutate, and [`QuarantineState::save`] to atomically persist.
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct QuarantineState {
83    /// Storage version of the file format itself (not the schema_version
84    /// inside the keys). Bumped when the on-disk shape changes.
85    #[serde(default = "default_storage_version")]
86    pub storage_version: u32,
87    /// Keyed by `QuarantineKey::storage_key()` for stable JSON ordering.
88    pub entries: BTreeMap<String, QuarantineRecord>,
89}
90
/// Storage-format version used when a state file omits `storage_version`
/// and when a fresh, empty state is created.
fn default_storage_version() -> u32 {
    const CURRENT_STORAGE_VERSION: u32 = 1;
    CURRENT_STORAGE_VERSION
}
94
95impl Default for QuarantineState {
96    fn default() -> Self {
97        Self {
98            storage_version: default_storage_version(),
99            entries: BTreeMap::new(),
100        }
101    }
102}
103
104impl QuarantineState {
105    /// Filename used inside `data_dir`. Stable on disk so external
106    /// tooling can locate the state.
107    pub const FILENAME: &'static str = "quarantine_state.json";
108
109    #[must_use]
110    pub fn path(data_dir: &Path) -> PathBuf {
111        data_dir.join(Self::FILENAME)
112    }
113
114    /// Load the quarantine state from `<data_dir>/quarantine_state.json`.
115    /// Returns an empty state when the file is missing or malformed —
116    /// quarantine is best-effort metadata and an unreadable state file
117    /// must not block indexing.
118    #[must_use]
119    pub fn load(data_dir: &Path) -> Self {
120        let path = Self::path(data_dir);
121        let Ok(text) = std::fs::read_to_string(&path) else {
122            return Self {
123                storage_version: 1,
124                entries: BTreeMap::new(),
125            };
126        };
127        match serde_json::from_str::<Self>(&text) {
128            Ok(state) => state,
129            Err(_) => Self {
130                storage_version: 1,
131                entries: BTreeMap::new(),
132            },
133        }
134    }
135
136    /// Atomically write the quarantine state to disk via temp file + rename,
137    /// so partial writes can never produce a corrupt quarantine_state.json.
138    pub fn save(&self, data_dir: &Path) -> std::io::Result<()> {
139        std::fs::create_dir_all(data_dir)?;
140        let final_path = Self::path(data_dir);
141        let tmp_path = data_dir.join(format!("{}.tmp", Self::FILENAME));
142        let json = serde_json::to_string_pretty(self).map_err(std::io::Error::other)?;
143        std::fs::write(&tmp_path, json)?;
144        std::fs::rename(&tmp_path, &final_path)?;
145        Ok(())
146    }
147
148    /// Record an attempt that failed irreducibly on `key`. If the key
149    /// already exists, the existing record is **updated in place**
150    /// (`last_attempt_at`, `attempt_count`, `last_reason`) rather than
151    /// appended — this is the dedup contract from #243.
152    pub fn record_attempt(
153        &mut self,
154        key: &QuarantineKey,
155        reason: impl Into<String>,
156        now: DateTime<Utc>,
157    ) {
158        let reason = reason.into();
159        let storage_key = key.storage_key();
160        if let Some(record) = self.entries.get_mut(&storage_key) {
161            record.last_attempt_at = now;
162            record.attempt_count = record.attempt_count.saturating_add(1);
163            record.last_reason = reason;
164        } else {
165            self.entries.insert(
166                storage_key,
167                QuarantineRecord {
168                    first_attempt_at: now,
169                    last_attempt_at: now,
170                    attempt_count: 1,
171                    last_reason: reason,
172                },
173            );
174        }
175    }
176
177    /// Remove a quarantine entry. Called by `cass quarantine clear`
178    /// after the operator has confirmed the underlying issue is
179    /// resolved (e.g. a memory bump on the streaming consumer, a
180    /// schema fix, or accepting the loss).
181    pub fn clear(&mut self, key: &QuarantineKey) -> bool {
182        self.entries.remove(&key.storage_key()).is_some()
183    }
184
185    /// Number of currently-quarantined `(conversation_id, schema_version)`
186    /// keys. This is what `cass health` and the indexer JSON envelope
187    /// surface as `quarantined_conversations`.
188    #[must_use]
189    pub fn len(&self) -> usize {
190        self.entries.len()
191    }
192
193    #[must_use]
194    pub fn is_empty(&self) -> bool {
195        self.entries.is_empty()
196    }
197
198    /// Iterate over `(key, record)` pairs in deterministic order.
199    pub fn iter(&self) -> impl Iterator<Item = (QuarantineKey, &QuarantineRecord)> + '_ {
200        self.entries.iter().filter_map(|(storage_key, record)| {
201            QuarantineKey::parse_storage_key(storage_key).map(|k| (k, record))
202        })
203    }
204}
205
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Build a UTC timestamp from whole seconds since the Unix epoch.
    fn ts(seconds: i64) -> DateTime<Utc> {
        DateTime::<Utc>::from_timestamp(seconds, 0).expect("valid timestamp")
    }

    /// Repeated failures on one key collapse into a single record whose
    /// first timestamp is kept and whose other fields track the latest attempt.
    #[test]
    fn record_attempt_dedups_by_conversation_and_schema_version() {
        let mut state = QuarantineState::default();
        assert_eq!(state.storage_version, 1);

        let key = QuarantineKey::new("conv-a", 3);
        let attempts = [
            ("streaming-oom: 4.2 GB", 1_700_000_000),
            ("streaming-oom: 4.3 GB", 1_700_001_000),
            ("streaming-oom: 4.1 GB", 1_700_002_000),
        ];
        for (reason, seconds) in attempts {
            state.record_attempt(&key, reason, ts(seconds));
        }

        assert_eq!(state.len(), 1, "same key must dedup, not append");
        let stored = state
            .entries
            .get(&key.storage_key())
            .expect("entry present");
        assert_eq!(
            stored.first_attempt_at,
            ts(1_700_000_000),
            "first attempt preserved"
        );
        assert_eq!(
            stored.last_attempt_at,
            ts(1_700_002_000),
            "last attempt advances"
        );
        assert_eq!(stored.attempt_count, 3);
        assert_eq!(stored.last_reason, "streaming-oom: 4.1 GB");
    }

    /// The same conversation under two schema versions is two entries.
    #[test]
    fn record_attempt_treats_different_schema_versions_as_distinct_keys() {
        let mut state = QuarantineState::default();
        let older = QuarantineKey::new("conv-a", 3);
        let newer = QuarantineKey::new("conv-a", 4);
        state.record_attempt(&older, "oom v3", ts(1));
        state.record_attempt(&newer, "oom v4", ts(2));
        assert_eq!(state.len(), 2, "schema bump must produce a fresh entry");
    }

    /// State written by `save` comes back intact from `load`.
    #[test]
    fn save_and_load_roundtrips_quarantine_state() {
        let dir = tempdir().unwrap();
        let first = QuarantineKey::new("c1", 1);
        let second = QuarantineKey::new("c2", 1);

        let mut state = QuarantineState::default();
        state.record_attempt(&first, "r1", ts(100));
        state.record_attempt(&second, "r2", ts(200));
        state.save(dir.path()).expect("save");

        let loaded = QuarantineState::load(dir.path());
        assert_eq!(loaded.len(), 2);
        let r1 = loaded.entries.get(&first.storage_key()).unwrap();
        assert_eq!(r1.last_reason, "r1");
    }

    /// Neither a missing file nor unparsable contents may fail `load`.
    #[test]
    fn load_returns_empty_for_missing_or_malformed_file() {
        let dir = tempdir().unwrap();

        // Nothing on disk yet.
        assert!(QuarantineState::load(dir.path()).is_empty());

        // Garbage on disk.
        let state_file = dir.path().join(QuarantineState::FILENAME);
        std::fs::write(state_file, "not json").expect("write malformed");
        let loaded = QuarantineState::load(dir.path());
        assert!(loaded.is_empty(), "malformed file must not block indexing");
    }

    /// `clear` reports whether it actually removed something.
    #[test]
    fn clear_removes_entry() {
        let key = QuarantineKey::new("c", 1);
        let mut state = QuarantineState::default();
        state.record_attempt(&key, "r", ts(1));

        assert!(state.clear(&key));
        assert!(state.is_empty());
        assert!(!state.clear(&key), "clearing absent key returns false");
    }

    /// After a successful save only the final file exists, not the temp file.
    #[test]
    fn save_uses_atomic_rename_via_tmp_file() {
        let dir = tempdir().unwrap();
        let mut state = QuarantineState::default();
        state.record_attempt(&QuarantineKey::new("c", 1), "r", ts(1));
        state.save(dir.path()).expect("save");

        let tmp_name = format!("{}.tmp", QuarantineState::FILENAME);
        let tmp_path = dir.path().join(tmp_name);
        assert!(
            !tmp_path.exists(),
            "tmp file must be renamed away on success"
        );
        assert!(QuarantineState::path(dir.path()).exists());
    }

    /// Iteration order follows the storage key, not insertion order.
    #[test]
    fn iter_yields_keys_in_deterministic_order() {
        let mut state = QuarantineState::default();
        // Insert out of order on purpose.
        state.record_attempt(&QuarantineKey::new("c2", 1), "r2", ts(2));
        state.record_attempt(&QuarantineKey::new("c1", 1), "r1", ts(1));

        let ids: Vec<String> = state
            .iter()
            .map(|(key, _record)| key.conversation_id)
            .collect();
        // BTreeMap-backed: ordering is by storage_key, which sorts c1 before c2.
        assert_eq!(ids, ["c1", "c2"]);
    }
}