use std::sync::Arc;
use std::time::{Duration, Instant};
use dashmap::DashMap;
use crate::core::registry::IndexId;
/// Reads `key` from the environment and parses it as `T`.
///
/// Surrounding whitespace (e.g. a trailing newline) is ignored. Returns `default` when the
/// variable is unset, not valid Unicode, unparsable, or below `min`.
fn env_at_least<T>(key: &str, min: T, default: T) -> T
where
    T: std::str::FromStr + PartialOrd,
{
    std::env::var(key)
        .ok()
        .and_then(|raw| raw.trim().parse::<T>().ok())
        .filter(|value| *value >= min)
        .unwrap_or(default)
}

/// Number of consecutive reindex failures tolerated before an index is quarantined.
///
/// Read from `TRUSTY_REINDEX_MAX_FAILURES`; values below 1 or unparsable values are
/// ignored. Defaults to 3.
pub fn max_consecutive_failures() -> u32 {
    env_at_least("TRUSTY_REINDEX_MAX_FAILURES", 1, 3)
}

/// Upper bound, in seconds, on a single quarantine window.
///
/// Read from `TRUSTY_REINDEX_QUARANTINE_MAX_SECS`; values below 1 or unparsable values are
/// ignored. Defaults to 3600.
pub fn max_quarantine_secs() -> u64 {
    env_at_least("TRUSTY_REINDEX_QUARANTINE_MAX_SECS", 1, 3600)
}

/// Length in seconds of the first quarantine window; each further failure doubles it
/// (up to the configured maximum).
const BASE_QUARANTINE_SECS: u64 = 60;
/// Per-index bookkeeping held by the quarantine map.
#[derive(Debug)]
struct QuarantineEntry {
    /// Failures recorded since the last success (saturating at `u32::MAX`).
    consecutive_failures: u32,
    /// End of the current quarantine window, or `None` if none is set (or it was cleared
    /// after expiring).
    quarantine_until: Option<Instant>,
}

/// Tracks consecutive reindex failures per index and withholds retries with a growing
/// backoff once the failure threshold is reached.
///
/// Cloning is cheap: every clone shares the same underlying map through the `Arc`.
#[derive(Clone)]
pub struct ReindexQuarantine {
    /// Shared per-index state.
    entries: Arc<DashMap<IndexId, QuarantineEntry>>,
    /// Failure threshold, sampled from the environment when the value is constructed.
    max_failures: u32,
    /// Backoff ceiling in seconds, sampled from the environment when the value is constructed.
    cached_max_quarantine_secs: u64,
}

impl Default for ReindexQuarantine {
    /// Same as [`ReindexQuarantine::new`].
    fn default() -> Self {
        ReindexQuarantine::new()
    }
}
impl ReindexQuarantine {
    /// Creates an empty tracker.
    ///
    /// The failure threshold ([`max_consecutive_failures`]) and backoff ceiling
    /// ([`max_quarantine_secs`]) are read from the environment once, here, and cached;
    /// later environment changes are not observed by this value.
    pub fn new() -> Self {
        Self {
            entries: Arc::new(DashMap::new()),
            max_failures: max_consecutive_failures(),
            cached_max_quarantine_secs: max_quarantine_secs(),
        }
    }

    /// Returns `true` while `id` is inside an active quarantine window.
    ///
    /// An elapsed deadline is cleared when observed. The failure count is kept, so the
    /// next failure re-quarantines the index straight away with a longer backoff.
    pub fn is_quarantined(&self, id: &IndexId) -> bool {
        let Some(mut entry) = self.entries.get_mut(id) else {
            return false;
        };
        let Some(until) = entry.quarantine_until else {
            return false;
        };
        if Instant::now() < until {
            return true;
        }
        entry.quarantine_until = None;
        false
    }

    /// Records one more consecutive failure for `id`.
    ///
    /// Once the count reaches the threshold, the index is quarantined. The window is
    /// `BASE_QUARANTINE_SECS`, doubled for every failure past the threshold, and capped at
    /// the configured maximum. Quarantining logs a warning; earlier failures log at debug
    /// level.
    pub fn record_failure(&self, id: &IndexId) {
        let max_failures = self.max_failures;

        // Mutate the entry in its own scope so the DashMap shard write guard is dropped
        // before logging. Tracing subscribers may do I/O, and holding the guard across it
        // would block every other index that hashes to the same shard.
        let (failures, backoff) = {
            let mut entry = self
                .entries
                .entry(id.clone())
                .or_insert_with(|| QuarantineEntry {
                    consecutive_failures: 0,
                    quarantine_until: None,
                });
            entry.consecutive_failures = entry.consecutive_failures.saturating_add(1);
            let failures = entry.consecutive_failures;
            let backoff = self.backoff_for(failures);
            if let Some(secs) = backoff {
                entry.quarantine_until = Some(Instant::now() + Duration::from_secs(secs));
            }
            (failures, backoff)
        };

        if let Some(backoff_secs) = backoff {
            tracing::warn!(
                index_id = %id.0,
                consecutive_failures = failures,
                backoff_secs,
                "reindex quarantine: index quarantined after {} consecutive failure(s) \
                 — next retry in {}s (issue #764). \
                 Resolve the root cause (missing root? corrupt corpus? sidecar crash?) \
                 and issue a manual `POST /indexes/{}/reindex` to clear.",
                failures,
                backoff_secs,
                id.0,
            );
        } else {
            tracing::debug!(
                index_id = %id.0,
                consecutive_failures = failures,
                remaining = max_failures.saturating_sub(failures),
                "reindex quarantine: failure recorded ({}/{} before quarantine)",
                failures,
                max_failures,
            );
        }
    }

    /// Backoff in seconds for a given consecutive-failure count. Returns `None` while the
    /// count is still below the quarantine threshold.
    ///
    /// Result: `BASE_QUARANTINE_SECS * 2^excess`, where `excess` is the number of failures
    /// past the threshold, capped at 30 so the shift cannot overflow. It is then clamped
    /// to the cached maximum.
    fn backoff_for(&self, failures: u32) -> Option<u64> {
        if failures < self.max_failures {
            return None;
        }
        let excess = (failures - self.max_failures).min(30);
        let multiplier = 1u64 << excess;
        Some(
            BASE_QUARANTINE_SECS
                .saturating_mul(multiplier)
                .min(self.cached_max_quarantine_secs),
        )
    }

    /// Forgets all failure history for `id`. Subsequent failures count from zero, and any
    /// active quarantine is lifted.
    pub fn record_success(&self, id: &IndexId) {
        self.entries.remove(id);
    }

    /// Consecutive failures currently recorded for `id` (0 if none).
    pub fn failure_count(&self, id: &IndexId) -> u32 {
        self.entries.get(id).map_or(0, |e| e.consecutive_failures)
    }

    /// Number of indexes whose quarantine deadline is still in the future.
    pub fn quarantined_count(&self) -> usize {
        let now = Instant::now();
        self.entries
            .iter()
            .filter(|e| e.quarantine_until.is_some_and(|t| now < t))
            .count()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    fn id(s: &str) -> IndexId {
        IndexId(s.to_string())
    }

    /// Reads the stored quarantine deadline for `key` directly from the map.
    /// The map guard is released before returning.
    fn deadline(q: &ReindexQuarantine, key: &str) -> Option<Instant> {
        q.entries.get(&id(key)).and_then(|e| e.quarantine_until)
    }

    #[test]
    fn quarantine_new_is_empty() {
        let q = ReindexQuarantine::new();
        assert!(!q.is_quarantined(&id("x")));
        assert_eq!(q.failure_count(&id("x")), 0);
        assert_eq!(q.quarantined_count(), 0);
    }

    #[test]
    fn quarantine_failure_count_increments() {
        let q = ReindexQuarantine::new();
        assert_eq!(q.failure_count(&id("a")), 0);
        q.record_failure(&id("a"));
        assert_eq!(q.failure_count(&id("a")), 1);
        q.record_failure(&id("a"));
        assert_eq!(q.failure_count(&id("a")), 2);
        assert_eq!(q.failure_count(&id("b")), 0);
    }

    #[test]
    fn quarantine_triggers_after_threshold() {
        let q = ReindexQuarantine::new();
        let max = max_consecutive_failures();
        for _ in 0..max.saturating_sub(1) {
            q.record_failure(&id("idx"));
            assert!(!q.is_quarantined(&id("idx")));
        }
        q.record_failure(&id("idx"));
        assert!(q.is_quarantined(&id("idx")));
        assert_eq!(q.quarantined_count(), 1);
    }

    #[test]
    fn quarantine_success_clears_failures() {
        let q = ReindexQuarantine::new();
        let max = max_consecutive_failures();
        for _ in 0..max {
            q.record_failure(&id("z"));
        }
        assert!(q.is_quarantined(&id("z")));
        q.record_success(&id("z"));
        assert!(!q.is_quarantined(&id("z")));
        assert_eq!(q.failure_count(&id("z")), 0);
    }

    #[test]
    fn quarantine_backoff_grows_exponentially() {
        let q = ReindexQuarantine::new();
        let max = max_consecutive_failures();
        let cap = max_quarantine_secs();
        for _ in 0..max {
            q.record_failure(&id("w"));
        }
        let until1 = deadline(&q, "w").expect("must be quarantined");
        q.record_failure(&id("w"));
        let until2 = deadline(&q, "w").expect("must still be quarantined");

        // First window is BASE, second is 2 * BASE (both clamped to the cap). The second
        // deadline was computed no earlier than the first, so the gap between deadlines is
        // at least the difference between the two windows.
        let first = BASE_QUARANTINE_SECS.min(cap);
        let second = BASE_QUARANTINE_SECS.saturating_mul(2).min(cap);
        assert!(
            until2.duration_since(until1) >= Duration::from_secs(second - first),
            "second quarantine window must double the first (subject to the cap)"
        );
    }

    #[test]
    fn quarantine_backoff_is_capped() {
        let q = ReindexQuarantine::new();
        let max = max_consecutive_failures();
        for _ in 0..max.saturating_add(40) {
            q.record_failure(&id("c"));
        }
        let until = deadline(&q, "c").expect("must be quarantined");
        let cap = Duration::from_secs(max_quarantine_secs());
        assert!(
            until <= Instant::now() + cap,
            "quarantine window must never exceed the configured maximum"
        );
    }

    #[test]
    fn quarantine_clones_share_state() {
        let q = ReindexQuarantine::new();
        let q2 = q.clone();
        q.record_failure(&id("s"));
        assert_eq!(q2.failure_count(&id("s")), 1);
        q2.record_success(&id("s"));
        assert_eq!(q.failure_count(&id("s")), 0);
    }
}