coding_agent_search/indexer/
quarantine.rs1use std::collections::BTreeMap;
23use std::path::{Path, PathBuf};
24
25use chrono::{DateTime, Utc};
26use serde::{Deserialize, Serialize};
27
28#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
35pub struct QuarantineKey {
36 pub conversation_id: String,
37 pub schema_version: u32,
38}
39
40impl QuarantineKey {
41 #[must_use]
42 pub fn new(conversation_id: impl Into<String>, schema_version: u32) -> Self {
43 Self {
44 conversation_id: conversation_id.into(),
45 schema_version,
46 }
47 }
48
49 fn storage_key(&self) -> String {
50 format!("{}::v{}", self.conversation_id, self.schema_version)
51 }
52
53 fn parse_storage_key(key: &str) -> Option<Self> {
54 let (conversation_id, version_part) = key.rsplit_once("::v")?;
55 let schema_version: u32 = version_part.parse().ok()?;
56 Some(Self {
57 conversation_id: conversation_id.to_string(),
58 schema_version,
59 })
60 }
61}
62
63#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
71pub struct QuarantineRecord {
72 pub first_attempt_at: DateTime<Utc>,
73 pub last_attempt_at: DateTime<Utc>,
74 pub attempt_count: u64,
75 pub last_reason: String,
76 #[serde(default, skip_serializing_if = "Option::is_none")]
77 pub cass_version_at_quarantine: Option<String>,
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct QuarantineState {
85 #[serde(default = "default_storage_version")]
88 pub storage_version: u32,
89 pub entries: BTreeMap<String, QuarantineRecord>,
91}
92
/// Storage layout version assumed for fresh state and for serialized data
/// that does not carry a `storage_version` field.
fn default_storage_version() -> u32 {
    const INITIAL_STORAGE_VERSION: u32 = 1;
    INITIAL_STORAGE_VERSION
}
96
97impl Default for QuarantineState {
98 fn default() -> Self {
99 Self {
100 storage_version: default_storage_version(),
101 entries: BTreeMap::new(),
102 }
103 }
104}
105
106impl QuarantineState {
107 pub const FILENAME: &'static str = "quarantine_state.json";
110
111 #[must_use]
112 pub fn path(data_dir: &Path) -> PathBuf {
113 data_dir.join(Self::FILENAME)
114 }
115
116 #[must_use]
121 pub fn load(data_dir: &Path) -> Self {
122 let path = Self::path(data_dir);
123 let Ok(text) = std::fs::read_to_string(&path) else {
124 return Self {
125 storage_version: 1,
126 entries: BTreeMap::new(),
127 };
128 };
129 match serde_json::from_str::<Self>(&text) {
130 Ok(state) => state,
131 Err(_) => Self {
132 storage_version: 1,
133 entries: BTreeMap::new(),
134 },
135 }
136 }
137
138 pub fn save(&self, data_dir: &Path) -> std::io::Result<()> {
141 std::fs::create_dir_all(data_dir)?;
142 let final_path = Self::path(data_dir);
143 let tmp_path = data_dir.join(format!("{}.tmp", Self::FILENAME));
144 let json = serde_json::to_string_pretty(self).map_err(std::io::Error::other)?;
145 std::fs::write(&tmp_path, json)?;
146 std::fs::rename(&tmp_path, &final_path)?;
147 Ok(())
148 }
149
150 pub fn record_attempt(
155 &mut self,
156 key: &QuarantineKey,
157 reason: impl Into<String>,
158 now: DateTime<Utc>,
159 ) {
160 let reason = reason.into();
161 let storage_key = key.storage_key();
162 if let Some(record) = self.entries.get_mut(&storage_key) {
163 record.last_attempt_at = now;
164 record.attempt_count = record.attempt_count.saturating_add(1);
165 record.last_reason = reason;
166 record.cass_version_at_quarantine = Some(current_cass_version().to_string());
167 } else {
168 self.entries.insert(
169 storage_key,
170 QuarantineRecord {
171 first_attempt_at: now,
172 last_attempt_at: now,
173 attempt_count: 1,
174 last_reason: reason,
175 cass_version_at_quarantine: Some(current_cass_version().to_string()),
176 },
177 );
178 }
179 }
180
181 pub fn clear(&mut self, key: &QuarantineKey) -> bool {
186 self.entries.remove(&key.storage_key()).is_some()
187 }
188
189 #[must_use]
193 pub fn len(&self) -> usize {
194 self.entries.len()
195 }
196
197 #[must_use]
198 pub fn is_empty(&self) -> bool {
199 self.entries.is_empty()
200 }
201
202 pub fn iter(&self) -> impl Iterator<Item = (QuarantineKey, &QuarantineRecord)> + '_ {
204 self.entries.iter().filter_map(|(storage_key, record)| {
205 QuarantineKey::parse_storage_key(storage_key).map(|k| (k, record))
206 })
207 }
208}
209
210fn current_cass_version() -> &'static str {
211 env!("CARGO_PKG_VERSION")
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds a UTC timestamp from whole seconds since the Unix epoch.
    fn ts(seconds: i64) -> DateTime<Utc> {
        DateTime::<Utc>::from_timestamp(seconds, 0).expect("valid timestamp")
    }

    #[test]
    fn record_attempt_dedups_by_conversation_and_schema_version() {
        let mut state = QuarantineState::default();
        assert_eq!(state.storage_version, 1);

        // Three attempts against the same key, in chronological order.
        let key = QuarantineKey::new("conv-a", 3);
        let attempts = [
            ("streaming-oom: 4.2 GB", 1_700_000_000),
            ("streaming-oom: 4.3 GB", 1_700_001_000),
            ("streaming-oom: 4.1 GB", 1_700_002_000),
        ];
        for (reason, at) in attempts {
            state.record_attempt(&key, reason, ts(at));
        }

        assert_eq!(state.len(), 1, "same key must dedup, not append");
        let record = state
            .entries
            .get(&key.storage_key())
            .expect("entry present");
        assert_eq!(
            record.first_attempt_at,
            ts(1_700_000_000),
            "first attempt preserved"
        );
        assert_eq!(
            record.last_attempt_at,
            ts(1_700_002_000),
            "last attempt advances"
        );
        assert_eq!(record.attempt_count, 3);
        assert_eq!(record.last_reason, "streaming-oom: 4.1 GB");
    }

    #[test]
    fn record_attempt_treats_different_schema_versions_as_distinct_keys() {
        let mut state = QuarantineState::default();
        let older = QuarantineKey::new("conv-a", 3);
        let newer = QuarantineKey::new("conv-a", 4);
        state.record_attempt(&older, "oom v3", ts(1));
        state.record_attempt(&newer, "oom v4", ts(2));
        assert_eq!(state.len(), 2, "schema bump must produce a fresh entry");
    }

    #[test]
    fn save_and_load_roundtrips_quarantine_state() {
        let dir = tempdir().unwrap();
        let first_key = QuarantineKey::new("c1", 1);
        let second_key = QuarantineKey::new("c2", 1);

        let mut state = QuarantineState::default();
        state.record_attempt(&first_key, "r1", ts(100));
        state.record_attempt(&second_key, "r2", ts(200));
        state.save(dir.path()).expect("save");

        let loaded = QuarantineState::load(dir.path());
        assert_eq!(loaded.len(), 2);
        let first_record = loaded.entries.get(&first_key.storage_key()).unwrap();
        assert_eq!(first_record.last_reason, "r1");
    }

    #[test]
    fn load_returns_empty_for_missing_or_malformed_file() {
        let dir = tempdir().unwrap();

        // No file on disk yet.
        let from_missing = QuarantineState::load(dir.path());
        assert!(from_missing.is_empty());

        // File exists but is not valid JSON.
        let state_file = dir.path().join(QuarantineState::FILENAME);
        std::fs::write(state_file, "not json").expect("write malformed");
        let from_malformed = QuarantineState::load(dir.path());
        assert!(
            from_malformed.is_empty(),
            "malformed file must not block indexing"
        );
    }

    #[test]
    fn clear_removes_entry() {
        let mut state = QuarantineState::default();
        let key = QuarantineKey::new("c", 1);
        state.record_attempt(&key, "r", ts(1));

        let removed = state.clear(&key);
        assert!(removed);
        assert!(state.is_empty());

        let removed_again = state.clear(&key);
        assert!(!removed_again, "clearing absent key returns false");
    }

    #[test]
    fn save_uses_atomic_rename_via_tmp_file() {
        let dir = tempdir().unwrap();
        let mut state = QuarantineState::default();
        state.record_attempt(&QuarantineKey::new("c", 1), "r", ts(1));
        state.save(dir.path()).expect("save");

        let tmp_name = format!("{}.tmp", QuarantineState::FILENAME);
        let tmp_path = dir.path().join(tmp_name);
        assert!(
            !tmp_path.exists(),
            "tmp file must be renamed away on success"
        );
        assert!(QuarantineState::path(dir.path()).exists());
    }

    #[test]
    fn iter_yields_keys_in_deterministic_order() {
        let mut state = QuarantineState::default();
        // Insert out of order; iteration must come back sorted.
        state.record_attempt(&QuarantineKey::new("c2", 1), "r2", ts(2));
        state.record_attempt(&QuarantineKey::new("c1", 1), "r1", ts(1));

        let mut ids: Vec<String> = Vec::new();
        for (key, _record) in state.iter() {
            ids.push(key.conversation_id);
        }
        assert_eq!(ids, vec!["c1".to_string(), "c2".to_string()]);
    }
}