use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
/// Identifies one quarantine entry: a conversation processed under a specific
/// schema version.
///
/// Field order is significant: the derived `PartialOrd`/`Ord` compare
/// `conversation_id` first and `schema_version` second.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct QuarantineKey {
    /// Identifier of the quarantined conversation.
    pub conversation_id: String,
    /// Schema version the conversation was quarantined under; a different
    /// version produces a distinct key.
    pub schema_version: u32,
}

impl QuarantineKey {
    /// Builds a key from any string-like conversation id and a schema version.
    #[must_use]
    pub fn new(conversation_id: impl Into<String>, schema_version: u32) -> Self {
        let conversation_id = conversation_id.into();
        Self {
            conversation_id,
            schema_version,
        }
    }

    /// Encodes the key as the map key used for persistence:
    /// `"<conversation_id>::v<schema_version>"`.
    fn storage_key(&self) -> String {
        let Self {
            conversation_id,
            schema_version,
        } = self;
        format!("{conversation_id}::v{schema_version}")
    }

    /// Decodes a string produced by [`Self::storage_key`].
    ///
    /// Splits on the *last* `"::v"`, so conversation ids that themselves
    /// contain `"::v"` are preserved. Returns `None` when the separator is
    /// missing or the suffix does not parse as a `u32`.
    fn parse_storage_key(key: &str) -> Option<Self> {
        let (id, version) = key.rsplit_once("::v")?;
        version.parse::<u32>().ok().map(|schema_version| Self {
            conversation_id: id.to_string(),
            schema_version,
        })
    }
}
/// Bookkeeping for one quarantined key: when it failed, how often, why, and
/// which crate version recorded it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct QuarantineRecord {
    /// Timestamp of the first recorded attempt.
    pub first_attempt_at: DateTime<Utc>,
    /// Timestamp of the most recent recorded attempt.
    pub last_attempt_at: DateTime<Utc>,
    /// Number of recorded attempts.
    pub attempt_count: u64,
    /// Reason string from the most recent attempt.
    pub last_reason: String,
    /// Crate version that last recorded this entry. Absent from older files,
    /// in which case it deserialises as `None`; omitted from output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub cass_version_at_quarantine: Option<String>,
}

impl QuarantineRecord {
    /// Returns `true` unless this record was stamped with exactly
    /// `current_version`.
    ///
    /// Records with no stamp (`None`) are therefore always considered stale,
    /// i.e. eligible for retry.
    #[must_use]
    pub fn is_version_stale_for_retry(&self, current_version: &str) -> bool {
        let stamped = self.cass_version_at_quarantine.as_deref();
        stamped != Some(current_version)
    }
}
/// Persisted set of quarantine records, stored as JSON in the data directory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuarantineState {
    /// Format version of the persisted document; `1` when the field is absent.
    #[serde(default = "default_storage_version")]
    pub storage_version: u32,
    /// Records keyed by the encoded `QuarantineKey` string
    /// (`"<conversation_id>::v<schema_version>"`). A document that omits this
    /// field deserialises as an empty map rather than failing, mirroring the
    /// default on `storage_version`.
    #[serde(default)]
    pub entries: BTreeMap<String, QuarantineRecord>,
}

/// Storage format version used for new states and for documents that omit
/// `storage_version`.
fn default_storage_version() -> u32 {
    1
}

impl Default for QuarantineState {
    /// An empty state at the default storage version.
    fn default() -> Self {
        Self {
            storage_version: default_storage_version(),
            entries: BTreeMap::new(),
        }
    }
}
impl QuarantineState {
pub const FILENAME: &'static str = "quarantine_state.json";
#[must_use]
pub fn path(data_dir: &Path) -> PathBuf {
data_dir.join(Self::FILENAME)
}
#[must_use]
pub fn load(data_dir: &Path) -> Self {
let path = Self::path(data_dir);
let Ok(text) = std::fs::read_to_string(&path) else {
return Self {
storage_version: 1,
entries: BTreeMap::new(),
};
};
match serde_json::from_str::<Self>(&text) {
Ok(state) => state,
Err(_) => Self {
storage_version: 1,
entries: BTreeMap::new(),
},
}
}
pub fn save(&self, data_dir: &Path) -> std::io::Result<()> {
std::fs::create_dir_all(data_dir)?;
let final_path = Self::path(data_dir);
let tmp_path = data_dir.join(format!("{}.tmp", Self::FILENAME));
let json = serde_json::to_string_pretty(self).map_err(std::io::Error::other)?;
std::fs::write(&tmp_path, json)?;
std::fs::rename(&tmp_path, &final_path)?;
Ok(())
}
pub fn record_attempt(
&mut self,
key: &QuarantineKey,
reason: impl Into<String>,
now: DateTime<Utc>,
) {
let reason = reason.into();
let storage_key = key.storage_key();
if let Some(record) = self.entries.get_mut(&storage_key) {
record.last_attempt_at = now;
record.attempt_count = record.attempt_count.saturating_add(1);
record.last_reason = reason;
record.cass_version_at_quarantine = Some(current_cass_version().to_string());
} else {
self.entries.insert(
storage_key,
QuarantineRecord {
first_attempt_at: now,
last_attempt_at: now,
attempt_count: 1,
last_reason: reason,
cass_version_at_quarantine: Some(current_cass_version().to_string()),
},
);
}
}
pub fn clear(&mut self, key: &QuarantineKey) -> bool {
self.entries.remove(&key.storage_key()).is_some()
}
#[must_use]
pub fn len(&self) -> usize {
self.entries.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
pub fn iter(&self) -> impl Iterator<Item = (QuarantineKey, &QuarantineRecord)> + '_ {
self.entries.iter().filter_map(|(storage_key, record)| {
QuarantineKey::parse_storage_key(storage_key).map(|k| (k, record))
})
}
}
/// Version string of this crate, captured from Cargo's `CARGO_PKG_VERSION`
/// at compile time. Stamped into records by `QuarantineState::record_attempt`.
fn current_cass_version() -> &'static str {
    const VERSION: &str = env!("CARGO_PKG_VERSION");
    VERSION
}
// Unit tests for key encoding, attempt dedup, persistence (save/load), and
// version-based retry eligibility of quarantine records.
#[cfg(test)]
mod tests {
use super::*;
use std::error::Error;
use tempfile::tempdir;
// Result type for tests that report failures through `?` instead of panicking.
type TestResult = Result<(), Box<dyn Error>>;
/// Wraps a message in a boxed `std::io::Error` so it can be returned as a test failure.
fn test_error(message: impl Into<String>) -> Box<dyn Error> {
std::io::Error::other(message.into()).into()
}
/// Returns `Err(message)` when `condition` is false; an `assert!` substitute for `TestResult` tests.
fn ensure(condition: bool, message: impl Into<String>) -> TestResult {
if condition {
Ok(())
} else {
Err(test_error(message))
}
}
/// Builds a UTC timestamp from whole Unix seconds (panics if chrono rejects the value).
fn ts(seconds: i64) -> DateTime<Utc> {
DateTime::<Utc>::from_timestamp(seconds, 0).expect("valid timestamp")
}
// Repeated attempts on one key update a single record: first timestamp kept,
// last timestamp advances, count increments, reason is replaced.
#[test]
fn record_attempt_dedups_by_conversation_and_schema_version() {
let mut state = QuarantineState::default();
assert_eq!(state.storage_version, 1);
let key = QuarantineKey::new("conv-a", 3);
state.record_attempt(&key, "streaming-oom: 4.2 GB", ts(1_700_000_000));
state.record_attempt(&key, "streaming-oom: 4.3 GB", ts(1_700_001_000));
state.record_attempt(&key, "streaming-oom: 4.1 GB", ts(1_700_002_000));
assert_eq!(state.len(), 1, "same key must dedup, not append");
let record = state
.entries
.get(&key.storage_key())
.expect("entry present");
assert_eq!(
record.first_attempt_at,
ts(1_700_000_000),
"first attempt preserved"
);
assert_eq!(
record.last_attempt_at,
ts(1_700_002_000),
"last attempt advances"
);
assert_eq!(record.attempt_count, 3);
assert_eq!(record.last_reason, "streaming-oom: 4.1 GB");
}
// The same conversation under two schema versions yields two separate entries.
#[test]
fn record_attempt_treats_different_schema_versions_as_distinct_keys() {
let mut state = QuarantineState::default();
let v3 = QuarantineKey::new("conv-a", 3);
let v4 = QuarantineKey::new("conv-a", 4);
state.record_attempt(&v3, "oom v3", ts(1));
state.record_attempt(&v4, "oom v4", ts(2));
assert_eq!(state.len(), 2, "schema bump must produce a fresh entry");
}
// State written by `save` is read back intact by `load`.
#[test]
fn save_and_load_roundtrips_quarantine_state() {
let dir = tempdir().unwrap();
let mut state = QuarantineState::default();
state.record_attempt(&QuarantineKey::new("c1", 1), "r1", ts(100));
state.record_attempt(&QuarantineKey::new("c2", 1), "r2", ts(200));
state.save(dir.path()).expect("save");
let loaded = QuarantineState::load(dir.path());
assert_eq!(loaded.len(), 2);
let r1 = loaded
.entries
.get(&QuarantineKey::new("c1", 1).storage_key())
.unwrap();
assert_eq!(r1.last_reason, "r1");
}
// `load` falls back to an empty state both when the file is absent and when it is not valid JSON.
#[test]
fn load_returns_empty_for_missing_or_malformed_file() {
let dir = tempdir().unwrap();
let loaded = QuarantineState::load(dir.path());
assert!(loaded.is_empty());
std::fs::write(dir.path().join(QuarantineState::FILENAME), "not json")
.expect("write malformed");
let loaded = QuarantineState::load(dir.path());
assert!(loaded.is_empty(), "malformed file must not block indexing");
}
// `clear` reports `true` only when an entry was actually removed.
#[test]
fn clear_removes_entry() {
let mut state = QuarantineState::default();
let key = QuarantineKey::new("c", 1);
state.record_attempt(&key, "r", ts(1));
assert!(state.clear(&key));
assert!(state.is_empty());
assert!(!state.clear(&key), "clearing absent key returns false");
}
// After a successful `save`, only the final file remains; the `.tmp` staging file is gone.
#[test]
fn save_uses_atomic_rename_via_tmp_file() {
let dir = tempdir().unwrap();
let mut state = QuarantineState::default();
state.record_attempt(&QuarantineKey::new("c", 1), "r", ts(1));
state.save(dir.path()).expect("save");
let tmp_path = dir
.path()
.join(format!("{}.tmp", QuarantineState::FILENAME));
assert!(
!tmp_path.exists(),
"tmp file must be renamed away on success"
);
assert!(QuarantineState::path(dir.path()).exists());
}
// `iter` follows sorted storage-key order (BTreeMap), not insertion order.
#[test]
fn iter_yields_keys_in_deterministic_order() {
let mut state = QuarantineState::default();
state.record_attempt(&QuarantineKey::new("c2", 1), "r2", ts(2));
state.record_attempt(&QuarantineKey::new("c1", 1), "r1", ts(1));
let ids: Vec<String> = state.iter().map(|(k, _)| k.conversation_id).collect();
assert_eq!(ids, vec!["c1".to_string(), "c2".to_string()]);
}
// A record written before `cass_version_at_quarantine` existed must load with
// that field as `None` and be retry-eligible for every version string.
#[test]
fn legacy_entry_missing_cass_version_deserialises_and_is_retry_eligible() -> TestResult {
let dir = tempdir()?;
// Fixture deliberately omits `cass_version_at_quarantine`.
let legacy_json = serde_json::json!({
"storage_version": 1,
"entries": {
"conv-legacy::v1": {
"first_attempt_at": "2025-11-01T00:00:00Z",
"last_attempt_at": "2025-11-01T00:00:00Z",
"attempt_count": 1,
"last_reason": "index-ingest-out-of-memory: out of memory"
}
}
});
std::fs::write(
dir.path().join(QuarantineState::FILENAME),
serde_json::to_string_pretty(&legacy_json)?,
)?;
let state = QuarantineState::load(dir.path());
ensure(
state.len() == 1,
format!(
"legacy entry must load without error; loaded {} entries",
state.len()
),
)?;
let record = state
.entries
.values()
.next()
.ok_or_else(|| test_error("entry present after loading legacy fixture"))?;
ensure(
record.cass_version_at_quarantine.is_none(),
"missing field must deserialise as None, not cause an error",
)?;
ensure(
record.is_version_stale_for_retry("0.6.6"),
"legacy entry with cass_version_at_quarantine=None must be retry-eligible \
(cass#258 carry-over: v0.5.x entries were silently orphaned)",
)?;
ensure(
record.is_version_stale_for_retry("0.5.1"),
"legacy entry must be retry-eligible even when version string matches a v0.5.x tag",
)?;
ensure(
record.is_version_stale_for_retry("99.0.0"),
"legacy entry must be retry-eligible for any future version string",
)?;
Ok(())
}
// A record stamped with the current crate version is not retry-eligible;
// the same record re-stamped with an older version ("0.5.1") is.
#[test]
fn versioned_entry_retry_eligibility_gates_correctly() -> TestResult {
let current = current_cass_version();
let mut state = QuarantineState::default();
state.record_attempt(
&QuarantineKey::new("conv-v", 1),
"index-ingest-out-of-memory",
ts(1),
);
let record = state
.entries
.values()
.next()
.ok_or_else(|| test_error("same-version quarantine record exists"))?;
ensure(
!record.is_version_stale_for_retry(current),
"record stamped with current version must not be retry-eligible",
)?;
let mut state2 = QuarantineState::default();
state2.record_attempt(
&QuarantineKey::new("conv-old", 1),
"index-ingest-out-of-memory",
ts(2),
);
// Overwrite the stamp to simulate a record written by an older release.
state2
.entries
.values_mut()
.next()
.ok_or_else(|| test_error("old-version quarantine record exists"))?
.cass_version_at_quarantine = Some("0.5.1".to_string());
let old_record = state2
.entries
.values()
.next()
.ok_or_else(|| test_error("old-version quarantine record still exists"))?;
ensure(
old_record.is_version_stale_for_retry(current),
"record stamped with older version must be retry-eligible after a version bump",
)?;
Ok(())
}
}