use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
/// Identity of a quarantined item: a conversation id paired with a schema version.
///
/// Keys that share a `conversation_id` but differ in `schema_version` compare
/// unequal. The derived ordering compares fields in declaration order, so
/// `conversation_id` is compared before `schema_version`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct QuarantineKey {
/// Identifier of the conversation this key refers to.
pub conversation_id: String,
/// Schema version component; rendered as the `::v<N>` suffix by `storage_key`.
pub schema_version: u32,
}
impl QuarantineKey {
    /// Builds a key from anything convertible into an owned conversation id.
    #[must_use]
    pub fn new(conversation_id: impl Into<String>, schema_version: u32) -> Self {
        let conversation_id = conversation_id.into();
        Self {
            conversation_id,
            schema_version,
        }
    }

    /// Renders the key as `<conversation_id>::v<schema_version>`.
    fn storage_key(&self) -> String {
        let Self {
            conversation_id,
            schema_version,
        } = self;
        format!("{conversation_id}::v{schema_version}")
    }

    /// Inverse of [`Self::storage_key`].
    ///
    /// The split happens on the *last* `::v`, so a conversation id that itself
    /// contains `::v` is recovered intact. Returns `None` when the separator is
    /// missing or the text after it does not parse as a `u32`.
    fn parse_storage_key(key: &str) -> Option<Self> {
        key.rsplit_once("::v").and_then(|(id_part, version_part)| {
            let schema_version = version_part.parse::<u32>().ok()?;
            Some(Self {
                conversation_id: id_part.to_owned(),
                schema_version,
            })
        })
    }
}
/// Bookkeeping stored for one quarantined key.
///
/// `QuarantineState::record_attempt` creates a record on the first attempt for
/// a key and updates it in place on every later attempt.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct QuarantineRecord {
/// Timestamp passed with the first recorded attempt; `record_attempt` never rewrites it.
pub first_attempt_at: DateTime<Utc>,
/// Timestamp passed with the most recent recorded attempt.
pub last_attempt_at: DateTime<Utc>,
/// Number of recorded attempts; starts at 1 and is bumped with a saturating add.
pub attempt_count: u64,
/// Reason text passed with the most recent recorded attempt.
pub last_reason: String,
}
/// Quarantine table that is serialized to and from JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuarantineState {
/// Version of the stored layout; filled from `default_storage_version()` when
/// the field is absent from the deserialized input.
#[serde(default = "default_storage_version")]
pub storage_version: u32,
/// Records indexed by the string form of a `QuarantineKey`
/// (`<conversation_id>::v<schema_version>`). Being a `BTreeMap<String, _>`,
/// iteration is ordered by that string, i.e. lexicographically, not by
/// numeric schema version.
pub entries: BTreeMap<String, QuarantineRecord>,
}
/// Storage-layout version used when none is specified.
///
/// Referenced by the serde `default` attribute on
/// `QuarantineState::storage_version` and by `QuarantineState::default`.
fn default_storage_version() -> u32 {
    const INITIAL_STORAGE_VERSION: u32 = 1;
    INITIAL_STORAGE_VERSION
}
impl Default for QuarantineState {
    /// An empty table stamped with the default storage version.
    fn default() -> Self {
        let storage_version = default_storage_version();
        let entries = BTreeMap::new();
        Self {
            storage_version,
            entries,
        }
    }
}
impl QuarantineState {
pub const FILENAME: &'static str = "quarantine_state.json";
#[must_use]
pub fn path(data_dir: &Path) -> PathBuf {
data_dir.join(Self::FILENAME)
}
#[must_use]
pub fn load(data_dir: &Path) -> Self {
let path = Self::path(data_dir);
let Ok(text) = std::fs::read_to_string(&path) else {
return Self {
storage_version: 1,
entries: BTreeMap::new(),
};
};
match serde_json::from_str::<Self>(&text) {
Ok(state) => state,
Err(_) => Self {
storage_version: 1,
entries: BTreeMap::new(),
},
}
}
pub fn save(&self, data_dir: &Path) -> std::io::Result<()> {
std::fs::create_dir_all(data_dir)?;
let final_path = Self::path(data_dir);
let tmp_path = data_dir.join(format!("{}.tmp", Self::FILENAME));
let json = serde_json::to_string_pretty(self).map_err(std::io::Error::other)?;
std::fs::write(&tmp_path, json)?;
std::fs::rename(&tmp_path, &final_path)?;
Ok(())
}
pub fn record_attempt(
&mut self,
key: &QuarantineKey,
reason: impl Into<String>,
now: DateTime<Utc>,
) {
let reason = reason.into();
let storage_key = key.storage_key();
if let Some(record) = self.entries.get_mut(&storage_key) {
record.last_attempt_at = now;
record.attempt_count = record.attempt_count.saturating_add(1);
record.last_reason = reason;
} else {
self.entries.insert(
storage_key,
QuarantineRecord {
first_attempt_at: now,
last_attempt_at: now,
attempt_count: 1,
last_reason: reason,
},
);
}
}
pub fn clear(&mut self, key: &QuarantineKey) -> bool {
self.entries.remove(&key.storage_key()).is_some()
}
#[must_use]
pub fn len(&self) -> usize {
self.entries.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
pub fn iter(&self) -> impl Iterator<Item = (QuarantineKey, &QuarantineRecord)> + '_ {
self.entries.iter().filter_map(|(storage_key, record)| {
QuarantineKey::parse_storage_key(storage_key).map(|k| (k, record))
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds a UTC timestamp at whole `seconds` since the Unix epoch.
    fn ts(seconds: i64) -> DateTime<Utc> {
        DateTime::<Utc>::from_timestamp(seconds, 0).expect("valid timestamp")
    }

    /// Repeated attempts for one key collapse into a single, updated record.
    #[test]
    fn record_attempt_dedups_by_conversation_and_schema_version() {
        let mut state = QuarantineState::default();
        assert_eq!(state.storage_version, 1);

        let key = QuarantineKey::new("conv-a", 3);
        let attempts = [
            ("streaming-oom: 4.2 GB", 1_700_000_000),
            ("streaming-oom: 4.3 GB", 1_700_001_000),
            ("streaming-oom: 4.1 GB", 1_700_002_000),
        ];
        for (reason, at) in attempts {
            state.record_attempt(&key, reason, ts(at));
        }

        assert_eq!(state.len(), 1, "same key must dedup, not append");
        let stored = state
            .entries
            .get(&key.storage_key())
            .expect("entry present");
        assert_eq!(
            stored.first_attempt_at,
            ts(1_700_000_000),
            "first attempt preserved"
        );
        assert_eq!(
            stored.last_attempt_at,
            ts(1_700_002_000),
            "last attempt advances"
        );
        assert_eq!(stored.attempt_count, 3);
        assert_eq!(stored.last_reason, "streaming-oom: 4.1 GB");
    }

    /// The schema version is part of the key, so a bump starts a new record.
    #[test]
    fn record_attempt_treats_different_schema_versions_as_distinct_keys() {
        let mut state = QuarantineState::default();
        let older = QuarantineKey::new("conv-a", 3);
        let newer = QuarantineKey::new("conv-a", 4);

        state.record_attempt(&older, "oom v3", ts(1));
        state.record_attempt(&newer, "oom v4", ts(2));

        assert_eq!(state.len(), 2, "schema bump must produce a fresh entry");
    }

    /// What `save` writes, `load` reads back.
    #[test]
    fn save_and_load_roundtrips_quarantine_state() {
        let dir = tempdir().unwrap();
        let mut original = QuarantineState::default();
        for (id, reason, at) in [("c1", "r1", 100), ("c2", "r2", 200)] {
            original.record_attempt(&QuarantineKey::new(id, 1), reason, ts(at));
        }
        original.save(dir.path()).expect("save");

        let reloaded = QuarantineState::load(dir.path());
        assert_eq!(reloaded.len(), 2);

        let c1_key = QuarantineKey::new("c1", 1).storage_key();
        let c1_record = reloaded.entries.get(&c1_key).unwrap();
        assert_eq!(c1_record.last_reason, "r1");
    }

    /// Neither an absent nor a corrupt state file surfaces as an error.
    #[test]
    fn load_returns_empty_for_missing_or_malformed_file() {
        let dir = tempdir().unwrap();

        let from_missing = QuarantineState::load(dir.path());
        assert!(from_missing.is_empty());

        let state_file = dir.path().join(QuarantineState::FILENAME);
        std::fs::write(state_file, "not json").expect("write malformed");

        let from_malformed = QuarantineState::load(dir.path());
        assert!(
            from_malformed.is_empty(),
            "malformed file must not block indexing"
        );
    }

    /// `clear` reports whether it actually removed something.
    #[test]
    fn clear_removes_entry() {
        let key = QuarantineKey::new("c", 1);
        let mut state = QuarantineState::default();
        state.record_attempt(&key, "r", ts(1));

        let removed_first = state.clear(&key);
        assert!(removed_first);
        assert!(state.is_empty());

        let removed_again = state.clear(&key);
        assert!(!removed_again, "clearing absent key returns false");
    }

    /// After a successful save only the final file remains on disk.
    #[test]
    fn save_uses_atomic_rename_via_tmp_file() {
        let dir = tempdir().unwrap();
        let mut state = QuarantineState::default();
        state.record_attempt(&QuarantineKey::new("c", 1), "r", ts(1));
        state.save(dir.path()).expect("save");

        let tmp_name = format!("{}.tmp", QuarantineState::FILENAME);
        let tmp_path = dir.path().join(tmp_name);
        assert!(
            !tmp_path.exists(),
            "tmp file must be renamed away on success"
        );

        let final_path = QuarantineState::path(dir.path());
        assert!(final_path.exists());
    }

    /// Iteration order follows the sorted storage keys, not insertion order.
    #[test]
    fn iter_yields_keys_in_deterministic_order() {
        let mut state = QuarantineState::default();
        for (id, reason, at) in [("c2", "r2", 2), ("c1", "r1", 1)] {
            state.record_attempt(&QuarantineKey::new(id, 1), reason, ts(at));
        }

        let mut ids: Vec<String> = Vec::new();
        for (key, _) in state.iter() {
            ids.push(key.conversation_id);
        }
        assert_eq!(ids, ["c1", "c2"]);
    }
}