coding_agent_search/indexer/
quarantine.rs1use std::collections::BTreeMap;
23use std::path::{Path, PathBuf};
24
25use chrono::{DateTime, Utc};
26use serde::{Deserialize, Serialize};
27
28#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
35pub struct QuarantineKey {
36 pub conversation_id: String,
37 pub schema_version: u32,
38}
39
40impl QuarantineKey {
41 #[must_use]
42 pub fn new(conversation_id: impl Into<String>, schema_version: u32) -> Self {
43 Self {
44 conversation_id: conversation_id.into(),
45 schema_version,
46 }
47 }
48
49 fn storage_key(&self) -> String {
50 format!("{}::v{}", self.conversation_id, self.schema_version)
51 }
52
53 fn parse_storage_key(key: &str) -> Option<Self> {
54 let (conversation_id, version_part) = key.rsplit_once("::v")?;
55 let schema_version: u32 = version_part.parse().ok()?;
56 Some(Self {
57 conversation_id: conversation_id.to_string(),
58 schema_version,
59 })
60 }
61}
62
63#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
71pub struct QuarantineRecord {
72 pub first_attempt_at: DateTime<Utc>,
73 pub last_attempt_at: DateTime<Utc>,
74 pub attempt_count: u64,
75 pub last_reason: String,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct QuarantineState {
83 #[serde(default = "default_storage_version")]
86 pub storage_version: u32,
87 pub entries: BTreeMap<String, QuarantineRecord>,
89}
90
/// Storage-layout version used for freshly created state and as the serde
/// fallback when a persisted document omits `storage_version`.
fn default_storage_version() -> u32 {
    const INITIAL_STORAGE_VERSION: u32 = 1;
    INITIAL_STORAGE_VERSION
}
94
95impl Default for QuarantineState {
96 fn default() -> Self {
97 Self {
98 storage_version: default_storage_version(),
99 entries: BTreeMap::new(),
100 }
101 }
102}
103
104impl QuarantineState {
105 pub const FILENAME: &'static str = "quarantine_state.json";
108
109 #[must_use]
110 pub fn path(data_dir: &Path) -> PathBuf {
111 data_dir.join(Self::FILENAME)
112 }
113
114 #[must_use]
119 pub fn load(data_dir: &Path) -> Self {
120 let path = Self::path(data_dir);
121 let Ok(text) = std::fs::read_to_string(&path) else {
122 return Self {
123 storage_version: 1,
124 entries: BTreeMap::new(),
125 };
126 };
127 match serde_json::from_str::<Self>(&text) {
128 Ok(state) => state,
129 Err(_) => Self {
130 storage_version: 1,
131 entries: BTreeMap::new(),
132 },
133 }
134 }
135
136 pub fn save(&self, data_dir: &Path) -> std::io::Result<()> {
139 std::fs::create_dir_all(data_dir)?;
140 let final_path = Self::path(data_dir);
141 let tmp_path = data_dir.join(format!("{}.tmp", Self::FILENAME));
142 let json = serde_json::to_string_pretty(self).map_err(std::io::Error::other)?;
143 std::fs::write(&tmp_path, json)?;
144 std::fs::rename(&tmp_path, &final_path)?;
145 Ok(())
146 }
147
148 pub fn record_attempt(
153 &mut self,
154 key: &QuarantineKey,
155 reason: impl Into<String>,
156 now: DateTime<Utc>,
157 ) {
158 let reason = reason.into();
159 let storage_key = key.storage_key();
160 if let Some(record) = self.entries.get_mut(&storage_key) {
161 record.last_attempt_at = now;
162 record.attempt_count = record.attempt_count.saturating_add(1);
163 record.last_reason = reason;
164 } else {
165 self.entries.insert(
166 storage_key,
167 QuarantineRecord {
168 first_attempt_at: now,
169 last_attempt_at: now,
170 attempt_count: 1,
171 last_reason: reason,
172 },
173 );
174 }
175 }
176
177 pub fn clear(&mut self, key: &QuarantineKey) -> bool {
182 self.entries.remove(&key.storage_key()).is_some()
183 }
184
185 #[must_use]
189 pub fn len(&self) -> usize {
190 self.entries.len()
191 }
192
193 #[must_use]
194 pub fn is_empty(&self) -> bool {
195 self.entries.is_empty()
196 }
197
198 pub fn iter(&self) -> impl Iterator<Item = (QuarantineKey, &QuarantineRecord)> + '_ {
200 self.entries.iter().filter_map(|(storage_key, record)| {
201 QuarantineKey::parse_storage_key(storage_key).map(|k| (k, record))
202 })
203 }
204}
205
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds a UTC timestamp from whole seconds since the Unix epoch.
    fn ts(seconds: i64) -> DateTime<Utc> {
        let nanos = 0;
        DateTime::<Utc>::from_timestamp(seconds, nanos).expect("valid timestamp")
    }

    /// Repeated attempts for one key update a single record in place.
    #[test]
    fn record_attempt_dedups_by_conversation_and_schema_version() {
        let mut state = QuarantineState::default();
        assert_eq!(state.storage_version, 1);

        let key = QuarantineKey::new("conv-a", 3);
        let attempts = [
            ("streaming-oom: 4.2 GB", 1_700_000_000),
            ("streaming-oom: 4.3 GB", 1_700_001_000),
            ("streaming-oom: 4.1 GB", 1_700_002_000),
        ];
        for (reason, at) in attempts {
            state.record_attempt(&key, reason, ts(at));
        }

        assert_eq!(state.len(), 1, "same key must dedup, not append");

        let stored_under = key.storage_key();
        let record = state.entries.get(&stored_under).expect("entry present");
        assert_eq!(
            record.first_attempt_at,
            ts(1_700_000_000),
            "first attempt preserved"
        );
        assert_eq!(
            record.last_attempt_at,
            ts(1_700_002_000),
            "last attempt advances"
        );
        assert_eq!(record.attempt_count, 3);
        assert_eq!(record.last_reason, "streaming-oom: 4.1 GB");
    }

    /// The schema version is part of the key, so a bump creates a new record.
    #[test]
    fn record_attempt_treats_different_schema_versions_as_distinct_keys() {
        let mut state = QuarantineState::default();
        let older = QuarantineKey::new("conv-a", 3);
        let newer = QuarantineKey::new("conv-a", 4);

        state.record_attempt(&older, "oom v3", ts(1));
        state.record_attempt(&newer, "oom v4", ts(2));

        assert_eq!(state.len(), 2, "schema bump must produce a fresh entry");
    }

    /// A saved state can be read back with its records intact.
    #[test]
    fn save_and_load_roundtrips_quarantine_state() {
        let dir = tempdir().unwrap();
        let first = QuarantineKey::new("c1", 1);
        let second = QuarantineKey::new("c2", 1);

        let mut state = QuarantineState::default();
        state.record_attempt(&first, "r1", ts(100));
        state.record_attempt(&second, "r2", ts(200));
        state.save(dir.path()).expect("save");

        let loaded = QuarantineState::load(dir.path());
        assert_eq!(loaded.len(), 2);

        let r1 = loaded.entries.get(&first.storage_key()).unwrap();
        assert_eq!(r1.last_reason, "r1");
    }

    /// Both a missing file and an unparsable file load as an empty state.
    #[test]
    fn load_returns_empty_for_missing_or_malformed_file() {
        let dir = tempdir().unwrap();
        assert!(QuarantineState::load(dir.path()).is_empty());

        let state_file = dir.path().join(QuarantineState::FILENAME);
        std::fs::write(state_file, "not json").expect("write malformed");

        let reloaded = QuarantineState::load(dir.path());
        assert!(reloaded.is_empty(), "malformed file must not block indexing");
    }

    /// `clear` reports whether it actually removed something.
    #[test]
    fn clear_removes_entry() {
        let key = QuarantineKey::new("c", 1);
        let mut state = QuarantineState::default();
        state.record_attempt(&key, "r", ts(1));

        let removed = state.clear(&key);
        assert!(removed);
        assert!(state.is_empty());

        let removed_again = state.clear(&key);
        assert!(!removed_again, "clearing absent key returns false");
    }

    /// After a successful save only the final file remains on disk.
    #[test]
    fn save_uses_atomic_rename_via_tmp_file() {
        let dir = tempdir().unwrap();
        let mut state = QuarantineState::default();
        state.record_attempt(&QuarantineKey::new("c", 1), "r", ts(1));
        state.save(dir.path()).expect("save");

        let tmp_name = format!("{}.tmp", QuarantineState::FILENAME);
        let tmp_path = dir.path().join(tmp_name);
        assert!(
            !tmp_path.exists(),
            "tmp file must be renamed away on success"
        );

        let final_path = QuarantineState::path(dir.path());
        assert!(final_path.exists());
    }

    /// Iteration order is independent of insertion order.
    #[test]
    fn iter_yields_keys_in_deterministic_order() {
        let mut state = QuarantineState::default();
        for (id, reason, at) in [("c2", "r2", 2), ("c1", "r1", 1)] {
            state.record_attempt(&QuarantineKey::new(id, 1), reason, ts(at));
        }

        let mut ids: Vec<String> = Vec::new();
        for (key, _record) in state.iter() {
            ids.push(key.conversation_id);
        }
        assert_eq!(ids, vec!["c1".to_string(), "c2".to_string()]);
    }
}