use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::RwLock;
use object_store::ObjectStore;
use super::error::QuarantineError;
pub use super::registry_store::{QuarantineStorageConfig, build_quarantine_store};
/// Engine that owns a segment tracked by the quarantine registry.
///
/// The variant is part of [`SegmentKey`], so the same collection/segment id
/// pair is tracked independently per engine.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum QuarantineEngine {
    /// Labelled `"columnar"`.
    Columnar,
    /// Labelled `"fts"`.
    Fts,
    /// Labelled `"vector"`.
    Vector,
    /// Labelled `"raft"`.
    Raft,
}

impl QuarantineEngine {
    /// Returns the fixed lowercase label for this engine.
    ///
    /// This is the string used for the `engine` field in log events and the
    /// one printed by the [`Display`](std::fmt::Display) implementation.
    pub fn as_str(self) -> &'static str {
        match self {
            QuarantineEngine::Columnar => "columnar",
            QuarantineEngine::Fts => "fts",
            QuarantineEngine::Vector => "vector",
            QuarantineEngine::Raft => "raft",
        }
    }
}

impl std::fmt::Display for QuarantineEngine {
    /// Writes the [`as_str`](QuarantineEngine::as_str) label.
    ///
    /// Width, fill, alignment and precision flags are honored exactly as they
    /// are for `&str`; with a plain `{}` the output is just the label.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `pad` (not `write_str`) so that `{:>10}` / `{:<8}` are not silently ignored.
        f.pad(self.as_str())
    }
}
/// Identity of one segment in the quarantine registry.
///
/// Used as the key of the registry's internal map; two keys are equal only
/// when engine, collection and segment id all match.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SegmentKey {
/// Engine the segment belongs to.
pub engine: QuarantineEngine,
/// Collection name.
pub collection: String,
/// Segment identifier.
pub segment_id: String,
}
/// Per-segment state kept by `QuarantineRegistry`.
#[derive(Debug, Clone)]
pub struct QuarantineRecord {
/// Failure counter. `record_failure` increments it and `record_success`
/// resets it to 0 while the segment is not quarantined; records restored by
/// the `rebuild_from_*` methods are inserted with a value of 2.
pub strikes: u32,
/// Unix time in milliseconds at which `record_failure` created the record.
/// For records restored at startup this is the timestamp parsed from the
/// quarantined file name (0 when it does not parse).
pub first_seen_ms: u64,
/// Error summary from the most recent `record_failure` call that was not
/// rejected as already quarantined, or a fixed "rebuilt from ..." note for
/// records restored at startup.
pub last_error: String,
/// `Some` once the segment has been quarantined, `None` while it is only
/// accumulating strikes.
pub quarantine_info: Option<QuarantineInfo>,
}
/// Details recorded at the moment a segment is quarantined.
#[derive(Debug, Clone)]
pub struct QuarantineInfo {
/// Where the quarantined data lives. Set by `record_failure` to the renamed
/// path when the rename succeeded, to the original path when the rename
/// failed, or to an empty path when no segment path was supplied. For records
/// restored at startup it is the discovered file path or object location.
pub quarantined_path: PathBuf,
/// Unix time in milliseconds of the quarantine. For records restored at
/// startup it is parsed from the trailing component of the file name and is 0
/// when that component is not a valid `u64`.
pub quarantined_at_ms: u64,
}
/// In-memory registry of per-segment failure strikes and quarantines.
///
/// All state lives in a single map behind a `std::sync::RwLock`, so the
/// registry's methods take `&self`.
#[derive(Debug, Default)]
pub struct QuarantineRegistry {
/// Strike / quarantine state keyed by segment identity.
entries: RwLock<HashMap<SegmentKey, QuarantineRecord>>,
}
impl QuarantineRegistry {
pub fn new() -> Self {
Self {
entries: RwLock::new(HashMap::new()),
}
}
/// Records one failed read of the segment identified by `key`.
///
/// The first strike only bumps the counter, stores `error_summary` as the
/// record's last error, and returns `Ok(())`. The second strike quarantines
/// the segment:
///
/// * With `segment_path`, the file is renamed to
///   `<path-without-ext>.<ext>.quarantined.<unix_ms>`, where `seg` stands in
///   for a missing or non-UTF-8 extension. If the rename fails the failure is
///   logged and the segment is still quarantined in memory, with the original
///   path recorded.
/// * Without `segment_path`, the segment is quarantined with an empty path.
///
/// Once a segment is quarantined, later calls are rejected without touching
/// its strike count or last error.
///
/// # Errors
///
/// Returns [`QuarantineError::SegmentQuarantined`] when the segment was already
/// quarantined, and also from the call that performs the quarantine.
pub fn record_failure(
    &self,
    key: SegmentKey,
    error_summary: &str,
    segment_path: Option<&Path>,
) -> Result<(), QuarantineError> {
    // Every rejection path reports the same error shape; build it in one place.
    let quarantined_err = |quarantined_at_unix_ms: u64| QuarantineError::SegmentQuarantined {
        engine: key.engine.to_string(),
        collection: key.collection.clone(),
        segment_id: key.segment_id.clone(),
        quarantined_at_unix_ms,
    };
    let now_ms = unix_ms_now();

    // Fast path: an already-quarantined segment is rejected under the shared
    // lock, without ever taking the exclusive one.
    {
        let read = self.entries.read().unwrap_or_else(|p| p.into_inner());
        if let Some(qi) = read.get(&key).and_then(|rec| rec.quarantine_info.as_ref()) {
            return Err(quarantined_err(qi.quarantined_at_ms));
        }
    }

    let mut write = self.entries.write().unwrap_or_else(|p| p.into_inner());
    // `or_insert_with` builds the placeholder only for segments not seen
    // before; `last_error` is filled in below once the strike is accepted.
    let rec = write.entry(key.clone()).or_insert_with(|| QuarantineRecord {
        strikes: 0,
        first_seen_ms: now_ms,
        last_error: String::new(),
        quarantine_info: None,
    });
    // Re-check under the exclusive lock: another caller may have quarantined
    // the segment between the read lock being released and this one acquired.
    if let Some(qi) = rec.quarantine_info.as_ref() {
        return Err(quarantined_err(qi.quarantined_at_ms));
    }

    rec.strikes += 1;
    // Reuse the existing buffer instead of allocating a fresh `String`.
    rec.last_error.clear();
    rec.last_error.push_str(error_summary);
    if rec.strikes < 2 {
        return Ok(());
    }

    // Second strike: quarantine. The rename happens while the write lock is
    // held, so only one caller can ever perform it for a given segment.
    let quarantined_at_ms = now_ms;
    let quarantined_path = if let Some(path) = segment_path {
        let new_path = path.with_extension(format!(
            "{}.quarantined.{quarantined_at_ms}",
            path.extension().and_then(|s| s.to_str()).unwrap_or("seg")
        ));
        if let Err(e) = std::fs::rename(path, &new_path) {
            tracing::error!(
                engine = key.engine.as_str(),
                collection = %key.collection,
                segment_id = %key.segment_id,
                path = %path.display(),
                error = %e,
                "failed to rename corrupt segment file; segment will still be quarantined in memory"
            );
            path.to_path_buf()
        } else {
            tracing::error!(
                engine = key.engine.as_str(),
                collection = %key.collection,
                segment_id = %key.segment_id,
                quarantined_path = %new_path.display(),
                "segment quarantined after two consecutive CRC failures; \
                 file renamed, collection degraded but other segments remain readable"
            );
            new_path
        }
    } else {
        tracing::error!(
            engine = key.engine.as_str(),
            collection = %key.collection,
            segment_id = %key.segment_id,
            "segment quarantined after two consecutive CRC failures (no file path)"
        );
        PathBuf::new()
    };
    rec.quarantine_info = Some(QuarantineInfo {
        quarantined_path,
        quarantined_at_ms,
    });
    Err(quarantined_err(quarantined_at_ms))
}
/// Reports whether the segment identified by `key` is currently quarantined.
///
/// Returns `false` both for unknown keys and for segments that only have
/// strikes recorded against them.
pub fn is_quarantined(&self, key: &SegmentKey) -> bool {
    let entries = self
        .entries
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    match entries.get(key) {
        Some(record) => record.quarantine_info.is_some(),
        None => false,
    }
}
pub fn record_success(&self, key: &SegmentKey) {
let mut write = self.entries.write().unwrap_or_else(|p| p.into_inner());
if let Some(rec) = write.get_mut(key)
&& rec.quarantine_info.is_none()
{
rec.strikes = 0;
}
}
/// Restores quarantine entries for `engine` from the objects in `store`.
///
/// Lists every object in the store and looks at the last `/`-separated
/// component of each location. Names without `.quarantined.` are ignored. For
/// the rest, `engine_key_from_filename` maps the name to a
/// `(collection, segment_id)` pair; names it rejects are logged and skipped.
/// The quarantine timestamp is the text after the final `.` parsed as `u64`,
/// falling back to 0. Each match is inserted as a quarantined record with two
/// strikes, replacing any record already stored under the same key.
///
/// If the listing fails a warning is logged and the registry is left as is.
pub async fn rebuild_from_store(
    &self,
    engine: QuarantineEngine,
    store: &Arc<dyn ObjectStore>,
    engine_key_from_filename: &(dyn Fn(&str) -> Option<(String, String)> + Send + Sync),
) {
    use futures::TryStreamExt;

    let listing: Result<Vec<_>, _> = store.list(None).try_collect().await;
    let objects = match listing {
        Ok(found) => found,
        Err(e) => {
            tracing::warn!(error = %e, "quarantine rebuild: cannot list object store");
            return;
        }
    };

    for object in objects {
        let location = object.location.as_ref();
        // Only the final path component carries the quarantine marker.
        let file_name = location.rsplit_once('/').map_or(location, |(_, tail)| tail);
        if !file_name.contains(".quarantined.") {
            continue;
        }

        let Some((collection, segment_id)) = engine_key_from_filename(file_name) else {
            tracing::warn!(
                file = file_name,
                engine = engine.as_str(),
                "quarantine rebuild: cannot parse filename, skipping"
            );
            continue;
        };

        // The timestamp is the trailing dot-separated component; 0 when it is not a number.
        let quarantined_at_ms = match file_name.rsplit('.').next() {
            Some(suffix) => suffix.parse::<u64>().unwrap_or(0),
            None => 0,
        };

        let record = QuarantineRecord {
            strikes: 2,
            first_seen_ms: quarantined_at_ms,
            last_error: "rebuilt from object store on startup".to_string(),
            quarantine_info: Some(QuarantineInfo {
                quarantined_path: PathBuf::from(location),
                quarantined_at_ms,
            }),
        };
        let segment = SegmentKey {
            engine,
            collection,
            segment_id,
        };

        // The guard is taken after the only `.await` above and dropped at the
        // end of each iteration.
        let mut entries = self
            .entries
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        entries.insert(segment, record);
    }
}
/// Restores quarantine entries for `engine` from files found directly in `dir`.
///
/// Directory entries that cannot be read, or whose names are not valid UTF-8,
/// are skipped silently, as are names without `.quarantined.`. For the rest,
/// `engine_key_from_filename` maps the name to a `(collection, segment_id)`
/// pair; names it rejects are logged and skipped. The quarantine timestamp is
/// the text after the final `.` parsed as `u64`, falling back to 0. Each match
/// is inserted as a quarantined record with two strikes, replacing any record
/// already stored under the same key.
///
/// If `dir` cannot be read a warning is logged and the registry is left as is.
pub fn rebuild_from_dir(
    &self,
    engine: QuarantineEngine,
    dir: &Path,
    engine_key_from_filename: &dyn Fn(&str) -> Option<(String, String)>,
) {
    let listing = match std::fs::read_dir(dir) {
        Ok(iter) => iter,
        Err(e) => {
            tracing::warn!(dir = %dir.display(), error = %e, "quarantine rebuild: cannot read dir");
            return;
        }
    };

    for entry in listing.filter_map(Result::ok) {
        let os_name = entry.file_name();
        let Some(file_name) = os_name.to_str() else {
            continue;
        };
        if !file_name.contains(".quarantined.") {
            continue;
        }

        let Some((collection, segment_id)) = engine_key_from_filename(file_name) else {
            tracing::warn!(
                file = file_name,
                engine = engine.as_str(),
                "quarantine rebuild: cannot parse filename, skipping"
            );
            continue;
        };

        // The timestamp is the trailing dot-separated component; 0 when it is not a number.
        let quarantined_at_ms = match file_name.rsplit('.').next() {
            Some(suffix) => suffix.parse::<u64>().unwrap_or(0),
            None => 0,
        };

        let record = QuarantineRecord {
            strikes: 2,
            first_seen_ms: quarantined_at_ms,
            last_error: "rebuilt from disk on startup".to_string(),
            quarantine_info: Some(QuarantineInfo {
                quarantined_path: dir.join(file_name),
                quarantined_at_ms,
            }),
        };
        let segment = SegmentKey {
            engine,
            collection,
            segment_id,
        };

        let mut entries = self
            .entries
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        entries.insert(segment, record);
    }
}
/// Returns one [`QuarantineSnapshot`] per currently quarantined segment.
///
/// Segments that only have strikes recorded are omitted. The order follows the
/// internal map's iteration order and is therefore unspecified.
pub fn quarantined_snapshot(&self) -> Vec<QuarantineSnapshot> {
    let entries = self
        .entries
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    let mut snapshots = Vec::new();
    for (segment, record) in entries.iter() {
        let Some(info) = record.quarantine_info.as_ref() else {
            continue;
        };
        snapshots.push(QuarantineSnapshot {
            engine: segment.engine.to_string(),
            collection: segment.collection.clone(),
            segment_id: segment.segment_id.clone(),
            quarantined_at_unix_ms: info.quarantined_at_ms,
            last_error_summary: record.last_error.clone(),
            strikes: record.strikes,
        });
    }
    snapshots
}
/// Counts quarantined segments per `(engine label, collection)` pair.
///
/// Pairs with no quarantined segment do not appear in the returned map.
pub fn active_counts(&self) -> HashMap<(String, String), u64> {
    let entries = self
        .entries
        .read()
        .unwrap_or_else(|poisoned| poisoned.into_inner());

    entries
        .iter()
        .filter(|(_, record)| record.quarantine_info.is_some())
        .fold(
            HashMap::<(String, String), u64>::new(),
            |mut tally, (segment, _)| {
                let bucket = (segment.engine.to_string(), segment.collection.clone());
                *tally.entry(bucket).or_default() += 1;
                tally
            },
        )
}
/// Returns quarantine totals per `(engine label, collection)` pair.
///
/// Currently a straight delegation to `active_counts`, so the two maps are
/// always equal.
// NOTE(review): no method in this impl block ever releases a segment from
// quarantine, which would make "total" and "active" coincide — confirm the
// alias is intentional and not a placeholder for a cumulative counter.
pub fn total_counts(&self) -> HashMap<(String, String), u64> {
self.active_counts()
}
}
/// Owned, lock-free copy of one quarantined segment's state, as produced by
/// `QuarantineRegistry::quarantined_snapshot`.
#[derive(Debug, Clone)]
pub struct QuarantineSnapshot {
/// Engine label (the `QuarantineEngine` rendered as a string).
pub engine: String,
/// Collection name copied from the segment key.
pub collection: String,
/// Segment identifier copied from the segment key.
pub segment_id: String,
/// Unix time in milliseconds recorded for the quarantine.
pub quarantined_at_unix_ms: u64,
/// Copy of the record's `last_error` text.
pub last_error_summary: String,
/// Copy of the record's strike count.
pub strikes: u32,
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields 0 rather than an error.
fn unix_ms_now() -> u64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::ZERO,
    };
    since_epoch.as_millis() as u64
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Builds a `SegmentKey` from borrowed strings.
    fn key(engine: QuarantineEngine, collection: &str, segment_id: &str) -> SegmentKey {
        SegmentKey {
            engine,
            collection: collection.to_string(),
            segment_id: segment_id.to_string(),
        }
    }

    /// A single failure is tolerated.
    #[test]
    fn first_strike_returns_ok() {
        let reg = QuarantineRegistry::new();
        let k = key(QuarantineEngine::Columnar, "coll", "seg1");
        assert!(reg.record_failure(k, "crc error", None).is_ok());
    }

    /// The second consecutive failure quarantines the segment.
    #[test]
    fn second_strike_returns_quarantined() {
        let reg = QuarantineRegistry::new();
        let k = key(QuarantineEngine::Columnar, "coll", "seg1");
        reg.record_failure(k.clone(), "crc error", None).unwrap();
        let err = reg.record_failure(k, "crc error", None).unwrap_err();
        assert!(matches!(err, QuarantineError::SegmentQuarantined { .. }));
    }

    /// A success between two failures resets the strike counter.
    #[test]
    fn success_clears_strikes() {
        let reg = QuarantineRegistry::new();
        let k = key(QuarantineEngine::Fts, "coll", "seg2");
        reg.record_failure(k.clone(), "crc error", None).unwrap();
        reg.record_success(&k);
        assert!(reg.record_failure(k, "crc error", None).is_ok());
    }

    /// A success reported after quarantine does not lift the quarantine.
    #[test]
    fn quarantined_stays_quarantined_after_success_attempt() {
        let reg = QuarantineRegistry::new();
        let k = key(QuarantineEngine::Vector, "coll", "seg3");
        reg.record_failure(k.clone(), "crc", None).unwrap();
        reg.record_failure(k.clone(), "crc", None).unwrap_err();
        reg.record_success(&k);
        let err = reg.record_failure(k, "crc", None).unwrap_err();
        assert!(matches!(err, QuarantineError::SegmentQuarantined { .. }));
    }

    /// Two threads racing on the second strike both get the error, and the
    /// segment shows up exactly once in the snapshot.
    #[test]
    fn concurrent_claims_only_one_quarantine() {
        let reg = Arc::new(QuarantineRegistry::new());
        let k = key(QuarantineEngine::Raft, "coll", "seg4");
        reg.record_failure(k.clone(), "first", None).unwrap();
        let reg1 = Arc::clone(&reg);
        let k1 = k.clone();
        let reg2 = Arc::clone(&reg);
        let k2 = k.clone();
        let t1 = std::thread::spawn(move || reg1.record_failure(k1, "second", None));
        let t2 = std::thread::spawn(move || reg2.record_failure(k2, "second", None));
        let r1 = t1.join().unwrap();
        let r2 = t2.join().unwrap();
        assert!(r1.is_err() && r2.is_err());
        let snap = reg.quarantined_snapshot();
        assert_eq!(snap.len(), 1);
    }

    /// A `*.quarantined.<ts>` file on disk is restored with its timestamp.
    #[test]
    fn rebuild_from_dir_restores_quarantine() {
        let tmp = tempfile::tempdir().unwrap();
        let ts = 1_700_000_000_000u64;
        std::fs::write(tmp.path().join(format!("seg5.quarantined.{ts}")), b"").unwrap();
        let reg = QuarantineRegistry::new();
        reg.rebuild_from_dir(QuarantineEngine::Columnar, tmp.path(), &|fname| {
            let stem = fname.split(".quarantined.").next()?;
            Some(("default".to_string(), stem.to_string()))
        });
        let k = key(QuarantineEngine::Columnar, "default", "seg5");
        let err = reg.record_failure(k, "crc", None).unwrap_err();
        assert!(
            matches!(err, QuarantineError::SegmentQuarantined { quarantined_at_unix_ms, .. } if quarantined_at_unix_ms == ts)
        );
    }

    /// A `*.quarantined.<ts>` object in the store is restored with its timestamp.
    #[tokio::test]
    async fn rebuild_from_store_restores_quarantine() {
        use object_store::ObjectStoreExt;
        use object_store::memory::InMemory;
        use object_store::path::Path as ObjectPath;
        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let ts = 1_700_000_000_111u64;
        let key_path = ObjectPath::from(format!("seg6.quarantined.{ts}"));
        store
            .put(&key_path, object_store::PutPayload::from(b"".as_ref()))
            .await
            .unwrap();
        let reg = QuarantineRegistry::new();
        reg.rebuild_from_store(QuarantineEngine::Columnar, &store, &|fname| {
            let stem = fname.split(".quarantined.").next()?;
            Some(("default".to_string(), stem.to_string()))
        })
        .await;
        let k = key(QuarantineEngine::Columnar, "default", "seg6");
        let err = reg.record_failure(k, "crc", None).unwrap_err();
        assert!(
            matches!(err, QuarantineError::SegmentQuarantined { quarantined_at_unix_ms, .. } if quarantined_at_unix_ms == ts),
            "expected ts={ts} but got: {err}"
        );
    }
}