Skip to main content

coding_agent_search/indexer/
quarantine.rs

//! Single-session streaming-OOM quarantine state (#243).
//!
//! When `cass index` (non-watch) encounters an irreducible streaming
//! OOM on a single conversation after the deferred-lexical retry, the
//! policy is **quarantine-and-continue**: record the poison session,
//! advance the refresh for the rest of the corpus, and surface
//! `quarantined_conversations=N` so operators can see it.
//!
//! The critical correctness property is **deduplication by conversation
//! identity**, not by occurrence: every refresh tick that hits the same
//! poison session must update the existing record's `last_attempt_at`
//! and `attempt_count` rather than append a fresh entry. Without that,
//! repeated refreshes on a hot poison session would grow the quarantine
//! state file without bound and obscure which sessions are genuinely new
//! failures.
//!
//! Storage format: `<data_dir>/quarantine_state.json`, a JSON object with a
//! `storage_version` field and an `entries` map keyed by
//! `"<conversation_id>::v<schema_version>"`. Folding the schema version into
//! the key means a schema bump that might make a previously poisoned session
//! indexable again produces a fresh quarantine record rather than coalescing
//! with the stale one.
21
22use std::collections::BTreeMap;
23use std::path::{Path, PathBuf};
24
25use chrono::{DateTime, Utc};
26use serde::{Deserialize, Serialize};
27
28/// Identity of a quarantined conversation, used as the dedup key.
29///
30/// `schema_version` is folded in so a future schema bump that changes
31/// streaming-consumer memory pressure (e.g. a new message-format
32/// encoding that no longer OOMs on the same conversation) produces a
33/// fresh attempt rather than perpetually inheriting the prior verdict.
34#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
35pub struct QuarantineKey {
36    pub conversation_id: String,
37    pub schema_version: u32,
38}
39
40impl QuarantineKey {
41    #[must_use]
42    pub fn new(conversation_id: impl Into<String>, schema_version: u32) -> Self {
43        Self {
44            conversation_id: conversation_id.into(),
45            schema_version,
46        }
47    }
48
49    fn storage_key(&self) -> String {
50        format!("{}::v{}", self.conversation_id, self.schema_version)
51    }
52
53    fn parse_storage_key(key: &str) -> Option<Self> {
54        let (conversation_id, version_part) = key.rsplit_once("::v")?;
55        let schema_version: u32 = version_part.parse().ok()?;
56        Some(Self {
57            conversation_id: conversation_id.to_string(),
58            schema_version,
59        })
60    }
61}
62
63/// One quarantine record, identified by [`QuarantineKey`].
64///
65/// `first_attempt_at` is preserved across repeated refreshes; only
66/// `last_attempt_at`, `attempt_count`, and `last_reason` advance on
67/// each occurrence. This is the contract that prevents the
68/// "appending duplicate quarantine records" anti-pattern flagged on
69/// #243.
70#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
71pub struct QuarantineRecord {
72    pub first_attempt_at: DateTime<Utc>,
73    pub last_attempt_at: DateTime<Utc>,
74    pub attempt_count: u64,
75    pub last_reason: String,
76    #[serde(default, skip_serializing_if = "Option::is_none")]
77    pub cass_version_at_quarantine: Option<String>,
78}
79
80/// In-memory view of the quarantine state file. Use [`QuarantineState::load`]
81/// to read, [`QuarantineState::record_attempt`] / [`QuarantineState::clear`]
82/// to mutate, and [`QuarantineState::save`] to atomically persist.
83#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct QuarantineState {
85    /// Storage version of the file format itself (not the schema_version
86    /// inside the keys). Bumped when the on-disk shape changes.
87    #[serde(default = "default_storage_version")]
88    pub storage_version: u32,
89    /// Keyed by `QuarantineKey::storage_key()` for stable JSON ordering.
90    pub entries: BTreeMap<String, QuarantineRecord>,
91}
92
/// Storage-format version used for new states and for files that omit the
/// `storage_version` field.
fn default_storage_version() -> u32 {
    const INITIAL_STORAGE_VERSION: u32 = 1;
    INITIAL_STORAGE_VERSION
}
96
97impl Default for QuarantineState {
98    fn default() -> Self {
99        Self {
100            storage_version: default_storage_version(),
101            entries: BTreeMap::new(),
102        }
103    }
104}
105
106impl QuarantineState {
107    /// Filename used inside `data_dir`. Stable on disk so external
108    /// tooling can locate the state.
109    pub const FILENAME: &'static str = "quarantine_state.json";
110
111    #[must_use]
112    pub fn path(data_dir: &Path) -> PathBuf {
113        data_dir.join(Self::FILENAME)
114    }
115
116    /// Load the quarantine state from `<data_dir>/quarantine_state.json`.
117    /// Returns an empty state when the file is missing or malformed —
118    /// quarantine is best-effort metadata and an unreadable state file
119    /// must not block indexing.
120    #[must_use]
121    pub fn load(data_dir: &Path) -> Self {
122        let path = Self::path(data_dir);
123        let Ok(text) = std::fs::read_to_string(&path) else {
124            return Self {
125                storage_version: 1,
126                entries: BTreeMap::new(),
127            };
128        };
129        match serde_json::from_str::<Self>(&text) {
130            Ok(state) => state,
131            Err(_) => Self {
132                storage_version: 1,
133                entries: BTreeMap::new(),
134            },
135        }
136    }
137
138    /// Atomically write the quarantine state to disk via temp file + rename,
139    /// so partial writes can never produce a corrupt quarantine_state.json.
140    pub fn save(&self, data_dir: &Path) -> std::io::Result<()> {
141        std::fs::create_dir_all(data_dir)?;
142        let final_path = Self::path(data_dir);
143        let tmp_path = data_dir.join(format!("{}.tmp", Self::FILENAME));
144        let json = serde_json::to_string_pretty(self).map_err(std::io::Error::other)?;
145        std::fs::write(&tmp_path, json)?;
146        std::fs::rename(&tmp_path, &final_path)?;
147        Ok(())
148    }
149
150    /// Record an attempt that failed irreducibly on `key`. If the key
151    /// already exists, the existing record is **updated in place**
152    /// (`last_attempt_at`, `attempt_count`, `last_reason`) rather than
153    /// appended — this is the dedup contract from #243.
154    pub fn record_attempt(
155        &mut self,
156        key: &QuarantineKey,
157        reason: impl Into<String>,
158        now: DateTime<Utc>,
159    ) {
160        let reason = reason.into();
161        let storage_key = key.storage_key();
162        if let Some(record) = self.entries.get_mut(&storage_key) {
163            record.last_attempt_at = now;
164            record.attempt_count = record.attempt_count.saturating_add(1);
165            record.last_reason = reason;
166            record.cass_version_at_quarantine = Some(current_cass_version().to_string());
167        } else {
168            self.entries.insert(
169                storage_key,
170                QuarantineRecord {
171                    first_attempt_at: now,
172                    last_attempt_at: now,
173                    attempt_count: 1,
174                    last_reason: reason,
175                    cass_version_at_quarantine: Some(current_cass_version().to_string()),
176                },
177            );
178        }
179    }
180
181    /// Remove a quarantine entry. Called by `cass quarantine clear`
182    /// after the operator has confirmed the underlying issue is
183    /// resolved (e.g. a memory bump on the streaming consumer, a
184    /// schema fix, or accepting the loss).
185    pub fn clear(&mut self, key: &QuarantineKey) -> bool {
186        self.entries.remove(&key.storage_key()).is_some()
187    }
188
189    /// Number of currently-quarantined `(conversation_id, schema_version)`
190    /// keys. This is what `cass health` and the indexer JSON envelope
191    /// surface as `quarantined_conversations`.
192    #[must_use]
193    pub fn len(&self) -> usize {
194        self.entries.len()
195    }
196
197    #[must_use]
198    pub fn is_empty(&self) -> bool {
199        self.entries.is_empty()
200    }
201
202    /// Iterate over `(key, record)` pairs in deterministic order.
203    pub fn iter(&self) -> impl Iterator<Item = (QuarantineKey, &QuarantineRecord)> + '_ {
204        self.entries.iter().filter_map(|(storage_key, record)| {
205            QuarantineKey::parse_storage_key(storage_key).map(|k| (k, record))
206        })
207    }
208}
209
210fn current_cass_version() -> &'static str {
211    env!("CARGO_PKG_VERSION")
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Build a UTC timestamp from whole seconds since the Unix epoch.
    fn ts(seconds: i64) -> DateTime<Utc> {
        DateTime::<Utc>::from_timestamp(seconds, 0).expect("valid timestamp")
    }

    #[test]
    fn record_attempt_dedups_by_conversation_and_schema_version() {
        let mut state = QuarantineState::default();
        assert_eq!(state.storage_version, 1);
        let key = QuarantineKey::new("conv-a", 3);
        state.record_attempt(&key, "streaming-oom: 4.2 GB", ts(1_700_000_000));
        state.record_attempt(&key, "streaming-oom: 4.3 GB", ts(1_700_001_000));
        state.record_attempt(&key, "streaming-oom: 4.1 GB", ts(1_700_002_000));

        assert_eq!(state.len(), 1, "same key must dedup, not append");
        let record = state
            .entries
            .get(&key.storage_key())
            .expect("entry present");
        assert_eq!(
            record.first_attempt_at,
            ts(1_700_000_000),
            "first attempt preserved"
        );
        assert_eq!(
            record.last_attempt_at,
            ts(1_700_002_000),
            "last attempt advances"
        );
        assert_eq!(record.attempt_count, 3);
        assert_eq!(record.last_reason, "streaming-oom: 4.1 GB");
    }

    #[test]
    fn record_attempt_treats_different_schema_versions_as_distinct_keys() {
        let mut state = QuarantineState::default();
        let v3 = QuarantineKey::new("conv-a", 3);
        let v4 = QuarantineKey::new("conv-a", 4);
        state.record_attempt(&v3, "oom v3", ts(1));
        state.record_attempt(&v4, "oom v4", ts(2));
        assert_eq!(state.len(), 2, "schema bump must produce a fresh entry");
    }

    #[test]
    fn storage_key_roundtrips_conversation_ids_containing_separator() {
        // The version separator is split from the right, so ids that contain
        // "::v" themselves must still round-trip exactly.
        let key = QuarantineKey::new("weird::vid::v7", 12);
        assert_eq!(
            QuarantineKey::parse_storage_key(&key.storage_key()),
            Some(key)
        );
    }

    #[test]
    fn parse_storage_key_rejects_keys_without_numeric_version() {
        assert_eq!(QuarantineKey::parse_storage_key("no-version"), None);
        assert_eq!(QuarantineKey::parse_storage_key("conv::vabc"), None);
        assert_eq!(QuarantineKey::parse_storage_key("conv::v"), None);
    }

    #[test]
    fn save_and_load_roundtrips_quarantine_state() {
        let dir = tempdir().unwrap();
        let mut state = QuarantineState::default();
        state.record_attempt(&QuarantineKey::new("c1", 1), "r1", ts(100));
        state.record_attempt(&QuarantineKey::new("c2", 1), "r2", ts(200));
        state.save(dir.path()).expect("save");

        let loaded = QuarantineState::load(dir.path());
        assert_eq!(loaded.len(), 2);
        let r1 = loaded
            .entries
            .get(&QuarantineKey::new("c1", 1).storage_key())
            .unwrap();
        assert_eq!(r1.last_reason, "r1");
    }

    #[test]
    fn save_overwrites_previous_state() {
        let dir = tempdir().unwrap();
        let key = QuarantineKey::new("c", 1);
        let mut state = QuarantineState::default();
        state.record_attempt(&key, "first", ts(1));
        state.save(dir.path()).expect("first save");

        state.record_attempt(&key, "second", ts(2));
        state.save(dir.path()).expect("second save");

        let loaded = QuarantineState::load(dir.path());
        assert_eq!(loaded.len(), 1);
        let record = loaded
            .entries
            .get(&key.storage_key())
            .expect("entry present");
        assert_eq!(record.attempt_count, 2);
        assert_eq!(record.last_reason, "second");
    }

    #[test]
    fn load_returns_empty_for_missing_or_malformed_file() {
        let dir = tempdir().unwrap();
        let loaded = QuarantineState::load(dir.path());
        assert!(loaded.is_empty());

        std::fs::write(dir.path().join(QuarantineState::FILENAME), "not json")
            .expect("write malformed");
        let loaded = QuarantineState::load(dir.path());
        assert!(loaded.is_empty(), "malformed file must not block indexing");
    }

    #[test]
    fn clear_removes_entry() {
        let mut state = QuarantineState::default();
        let key = QuarantineKey::new("c", 1);
        state.record_attempt(&key, "r", ts(1));
        assert!(state.clear(&key));
        assert!(state.is_empty());
        assert!(!state.clear(&key), "clearing absent key returns false");
    }

    #[test]
    fn save_uses_atomic_rename_via_tmp_file() {
        let dir = tempdir().unwrap();
        let mut state = QuarantineState::default();
        state.record_attempt(&QuarantineKey::new("c", 1), "r", ts(1));
        state.save(dir.path()).expect("save");

        // The tmp file must not be left behind after a successful save.
        let tmp_path = dir
            .path()
            .join(format!("{}.tmp", QuarantineState::FILENAME));
        assert!(
            !tmp_path.exists(),
            "tmp file must be renamed away on success"
        );
        assert!(QuarantineState::path(dir.path()).exists());
    }

    #[test]
    fn iter_yields_keys_in_deterministic_order() {
        let mut state = QuarantineState::default();
        state.record_attempt(&QuarantineKey::new("c2", 1), "r2", ts(2));
        state.record_attempt(&QuarantineKey::new("c1", 1), "r1", ts(1));
        let ids: Vec<String> = state.iter().map(|(k, _)| k.conversation_id).collect();
        // BTreeMap-backed: ordering is by storage_key, which sorts c1 before c2.
        assert_eq!(ids, vec!["c1".to_string(), "c2".to_string()]);
    }

    #[test]
    fn iter_skips_entries_with_unparseable_keys() {
        let mut state = QuarantineState::default();
        let key = QuarantineKey::new("c", 1);
        state.record_attempt(&key, "r", ts(1));
        let record = state.entries[&key.storage_key()].clone();
        state.entries.insert("garbage".to_string(), record);

        // `len` counts raw entries; `iter` only yields well-formed keys.
        assert_eq!(state.len(), 2);
        let keys: Vec<QuarantineKey> = state.iter().map(|(k, _)| k).collect();
        assert_eq!(keys, vec![key]);
    }
}