use std::collections::HashSet;
use std::hash::Hash;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use dashmap::DashMap;
use futures::{Stream, StreamExt};
use serde::de::DeserializeOwned;
use tokio::sync::Notify;
use super::error::RedexError;
use super::event::RedexEvent;
use super::file::RedexFile;
/// A single mutation emitted by an index projection for one decoded event.
///
/// `Insert(k, v)` adds `v` to the value set stored under `k`; `Remove(k, v)` takes it
/// out again, and the key is dropped from the index once its set becomes empty.
///
/// Derives `PartialEq`/`Eq` (like [`IndexStart`]) so projection closures can be
/// checked with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IndexOp<K, V> {
    /// Add the value to the set stored under the key.
    Insert(K, V),
    /// Remove the value from the set stored under the key.
    Remove(K, V),
}
/// Where a [`RedexIndex`] begins tailing its file.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IndexStart {
    /// Replay the file from sequence number 0.
    FromBeginning,
    /// Start at the given sequence number; entries before it are not indexed.
    FromSeq(u64),
}
/// An in-memory secondary index `K -> {V}` kept up to date by a background task that
/// tails a [`RedexFile`].
///
/// Created with [`RedexIndex::open`]. Dropping the index signals the task to stop.
pub struct RedexIndex<K, V>
where
    K: Hash + Eq + Clone + Send + Sync + 'static,
    V: Hash + Eq + Clone + Send + Sync + 'static,
{
    /// Fired from `Drop` to tell the background task to exit.
    shutdown: Arc<Notify>,
    /// How many times the index was cleared and re-subscribed after its tail lagged or
    /// ended on a still-open file.
    lag_resets: Arc<AtomicU64>,
    /// Key -> set of values; shared with the background task, which performs all
    /// mutations.
    inner: Arc<DashMap<K, HashSet<V>>>,
}
impl<K, V> RedexIndex<K, V>
where
K: Hash + Eq + Clone + Send + Sync + 'static,
V: Hash + Eq + Clone + Send + Sync + 'static,
{
pub fn open<T, F>(file: &RedexFile, start: IndexStart, project: F) -> Self
where
T: DeserializeOwned + Send + 'static,
F: Fn(&T) -> Vec<IndexOp<K, V>> + Send + Sync + 'static,
{
let inner: Arc<DashMap<K, HashSet<V>>> = Arc::new(DashMap::new());
let shutdown = Arc::new(Notify::new());
let lag_resets = Arc::new(AtomicU64::new(0));
let from_seq = match start {
IndexStart::FromBeginning => 0,
IndexStart::FromSeq(n) => n,
};
let task_inner = inner.clone();
let task_shutdown = shutdown.clone();
let task_file = file.clone();
let task_lag_resets = lag_resets.clone();
tokio::spawn(async move {
let mut tail: Pin<Box<dyn Stream<Item = Result<RedexEvent, RedexError>> + Send>> =
Box::pin(task_file.tail(from_seq));
let mut consecutive_resets: u32 = 0;
loop {
tokio::select! {
_ = task_shutdown.notified() => return,
next = tail.next() => {
match next {
Some(Ok(event)) => {
apply_event(&task_inner, &project, &event);
consecutive_resets = 0;
}
Some(Err(RedexError::Lagged)) => {
let resume_seq = task_file.next_seq();
task_inner.clear();
let total_resets =
task_lag_resets.fetch_add(1, Ordering::Relaxed) + 1;
tail = Box::pin(task_file.tail(resume_seq));
consecutive_resets = consecutive_resets.saturating_add(1);
rate_limited_lag_warn(
"RedexIndex: tail lagged; cleared index, resumed live-only",
resume_seq,
total_resets,
consecutive_resets,
);
if let Some(d) = backoff_for(consecutive_resets) {
tokio::time::sleep(d).await;
}
}
Some(Err(e)) => {
tracing::debug!(error = %e, "RedexIndex tail error; continuing");
}
None => {
if task_file.is_closed() {
return;
}
let resume_seq = task_file.next_seq();
task_inner.clear();
let total_resets =
task_lag_resets.fetch_add(1, Ordering::Relaxed) + 1;
tail = Box::pin(task_file.tail(resume_seq));
consecutive_resets = consecutive_resets.saturating_add(1);
rate_limited_lag_warn(
"RedexIndex: tail stream ended on a still-open file \
(saturation-induced watcher drop); cleared index, \
resumed live-only",
resume_seq,
total_resets,
consecutive_resets,
);
if let Some(d) = backoff_for(consecutive_resets) {
tokio::time::sleep(d).await;
}
}
}
}
}
}
});
Self {
inner,
shutdown,
lag_resets,
}
}
pub fn lag_resets(&self) -> u64 {
self.lag_resets.load(Ordering::Relaxed)
}
pub fn get(&self, key: &K) -> Option<HashSet<V>> {
self.inner.get(key).map(|e| e.value().clone())
}
pub fn contains(&self, key: &K, value: &V) -> bool {
self.inner
.get(key)
.is_some_and(|e| e.value().contains(value))
}
pub fn keys(&self) -> Vec<K> {
self.inner.iter().map(|e| e.key().clone()).collect()
}
pub fn len(&self) -> usize {
self.inner.len()
}
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}
impl<K, V> Drop for RedexIndex<K, V>
where
K: Hash + Eq + Clone + Send + Sync + 'static,
V: Hash + Eq + Clone + Send + Sync + 'static,
{
fn drop(&mut self) {
self.shutdown.notify_one();
}
}
impl<K, V> std::fmt::Debug for RedexIndex<K, V>
where
    K: Hash + Eq + Clone + Send + Sync + 'static,
    V: Hash + Eq + Clone + Send + Sync + 'static,
{
    /// Formats as `RedexIndex { keys: <key count> }`; the index contents are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let key_count = self.inner.len();
        let mut out = f.debug_struct("RedexIndex");
        out.field("keys", &key_count);
        out.finish()
    }
}
/// Returns how long to pause after the `consecutive_resets`-th back-to-back reset.
///
/// The first reset of a streak (and a count of 0) gets no delay. After that the delay
/// steps 5ms -> 20ms -> 60ms and then stays capped at 250ms.
fn backoff_for(consecutive_resets: u32) -> Option<Duration> {
    let millis: u64 = match consecutive_resets {
        0..=1 => return None,
        2 => 5,
        3 => 20,
        4 => 60,
        _ => 250,
    };
    Some(Duration::from_millis(millis))
}
/// Emits a `warn`-level log for an index reset, but only on a sampled subset of resets.
///
/// A reset is logged when it is the first of a streak (`consecutive == 1`). It is also
/// logged when the running total of resets is a power of two (1, 2, 4, 8, ...), so a
/// long run of resets produces logarithmically many lines.
fn rate_limited_lag_warn(msg: &'static str, resume_seq: u64, total: u64, consecutive: u32) {
    let starts_streak = consecutive == 1;
    if !(starts_streak || total.is_power_of_two()) {
        return;
    }
    tracing::warn!(resume_seq, total, consecutive, "{msg}");
}
fn apply_event<T, K, V, F>(inner: &DashMap<K, HashSet<V>>, project: &F, event: &RedexEvent)
where
T: DeserializeOwned,
K: Hash + Eq + Clone,
V: Hash + Eq + Clone,
F: Fn(&T) -> Vec<IndexOp<K, V>>,
{
let decoded: T = match postcard::from_bytes(&event.payload) {
Ok(t) => t,
Err(e) => {
tracing::warn!(
error = %e,
seq = event.entry.seq,
"RedexIndex: failed to decode event; skipping",
);
return;
}
};
for op in project(&decoded) {
match op {
IndexOp::Insert(k, v) => {
inner.entry(k).or_default().insert(v);
}
IndexOp::Remove(k, v) => {
let mut drop_key = false;
if let Some(mut set) = inner.get_mut(&k) {
set.remove(&v);
drop_key = set.is_empty();
}
if drop_key {
inner.remove(&k);
}
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::net::channel::ChannelName;
    use crate::adapter::net::redex::config::RedexFileConfig;
    use crate::adapter::net::redex::manager::Redex;
    use serde::{Deserialize, Serialize};

    /// Test payload: an id tagged with zero or more string tags.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct Tagged {
        id: u64,
        tags: Vec<String>,
    }

    /// Opens a fresh file with the default config under `name`.
    fn open_file(name: &str) -> super::RedexFile {
        let r = Redex::new();
        r.open_file(&ChannelName::new(name).unwrap(), RedexFileConfig::default())
            .unwrap()
    }

    /// Yields to the scheduler a few times so the background index task can catch up.
    async fn yield_a_few() {
        for _ in 0..10 {
            tokio::task::yield_now().await;
        }
    }

    #[tokio::test]
    async fn test_index_populates_from_existing_entries() {
        let f = open_file("idx/basic");
        for id in 0..5u64 {
            let ev = Tagged {
                id,
                tags: vec!["even".into()],
            };
            f.append(&postcard::to_allocvec(&ev).unwrap()).unwrap();
        }
        let idx: RedexIndex<String, u64> =
            RedexIndex::open::<Tagged, _>(&f, IndexStart::FromBeginning, |t| {
                t.tags
                    .iter()
                    .map(|tag| IndexOp::Insert(tag.clone(), t.id))
                    .collect()
            });
        yield_a_few().await;
        let even = idx.get(&"even".to_string()).expect("even bucket populated");
        assert_eq!(even.len(), 5);
        for i in 0..5u64 {
            assert!(even.contains(&i));
        }
        assert_eq!(idx.keys().len(), 1);
    }

    #[tokio::test]
    async fn test_index_insert_remove_symmetry() {
        let f = open_file("idx/insert_remove");
        let idx: RedexIndex<String, u64> =
            RedexIndex::open::<Tagged, _>(&f, IndexStart::FromBeginning, |t| {
                t.tags
                    .iter()
                    .map(|tag| {
                        if let Some(stripped) = tag.strip_prefix('-') {
                            IndexOp::Remove(stripped.to_string(), t.id)
                        } else {
                            IndexOp::Insert(tag.clone(), t.id)
                        }
                    })
                    .collect()
            });
        let add = Tagged {
            id: 1,
            tags: vec!["k".into()],
        };
        let remove = Tagged {
            id: 1,
            tags: vec!["-k".into()],
        };
        f.append(&postcard::to_allocvec(&add).unwrap()).unwrap();
        yield_a_few().await;
        assert!(idx.contains(&"k".to_string(), &1u64));
        f.append(&postcard::to_allocvec(&remove).unwrap()).unwrap();
        yield_a_few().await;
        assert!(idx.get(&"k".to_string()).is_none());
        assert_eq!(idx.len(), 0);
    }

    #[tokio::test]
    async fn test_index_multiple_ops_per_event() {
        let f = open_file("idx/multiop");
        let idx: RedexIndex<String, u64> =
            RedexIndex::open::<Tagged, _>(&f, IndexStart::FromBeginning, |t| {
                t.tags
                    .iter()
                    .map(|tag| IndexOp::Insert(tag.clone(), t.id))
                    .collect()
            });
        let ev = Tagged {
            id: 42,
            tags: vec!["alpha".into(), "beta".into(), "gamma".into()],
        };
        f.append(&postcard::to_allocvec(&ev).unwrap()).unwrap();
        yield_a_few().await;
        for tag in ["alpha", "beta", "gamma"] {
            assert!(idx.contains(&tag.to_string(), &42u64));
        }
        assert_eq!(idx.len(), 3);
    }

    #[tokio::test]
    async fn test_index_from_seq_skips_earlier_events() {
        let f = open_file("idx/fromseq");
        for id in 0..4u64 {
            let ev = Tagged {
                id,
                tags: vec!["t".into()],
            };
            f.append(&postcard::to_allocvec(&ev).unwrap()).unwrap();
        }
        let idx: RedexIndex<String, u64> =
            RedexIndex::open::<Tagged, _>(&f, IndexStart::FromSeq(2), |t| {
                t.tags
                    .iter()
                    .map(|tag| IndexOp::Insert(tag.clone(), t.id))
                    .collect()
            });
        yield_a_few().await;
        let bucket = idx.get(&"t".to_string()).unwrap();
        assert_eq!(bucket.len(), 2);
        assert!(bucket.contains(&2));
        assert!(bucket.contains(&3));
        assert!(!bucket.contains(&0));
        assert!(!bucket.contains(&1));
    }

    #[tokio::test]
    async fn test_index_decode_error_skips_entry() {
        let f = open_file("idx/decode_err");
        f.append(b"\xFF\xFF\xFF\xFF").unwrap();
        let idx: RedexIndex<String, u64> =
            RedexIndex::open::<Tagged, _>(&f, IndexStart::FromBeginning, |t| {
                t.tags
                    .iter()
                    .map(|tag| IndexOp::Insert(tag.clone(), t.id))
                    .collect()
            });
        let good = Tagged {
            id: 7,
            tags: vec!["x".into()],
        };
        f.append(&postcard::to_allocvec(&good).unwrap()).unwrap();
        yield_a_few().await;
        assert!(idx.contains(&"x".to_string(), &7u64));
        assert_eq!(idx.len(), 1);
    }

    #[tokio::test]
    async fn index_recovers_from_tail_lag_and_continues_indexing() {
        let r = Redex::new();
        let f = r
            .open_file(
                &ChannelName::new("idx/lag-recovery").unwrap(),
                RedexFileConfig::default().with_tail_buffer_size(2),
            )
            .unwrap();
        for id in 0..10u64 {
            let ev = Tagged {
                id,
                tags: vec!["pre".into()],
            };
            f.append(&postcard::to_allocvec(&ev).unwrap()).unwrap();
        }
        let idx: RedexIndex<String, u64> =
            RedexIndex::open::<Tagged, _>(&f, IndexStart::FromBeginning, |t| {
                t.tags
                    .iter()
                    .map(|tag| IndexOp::Insert(tag.clone(), t.id))
                    .collect()
            });
        yield_a_few().await;
        for id in 100..105u64 {
            let ev = Tagged {
                id,
                tags: vec!["post".into()],
            };
            f.append(&postcard::to_allocvec(&ev).unwrap()).unwrap();
            yield_a_few().await;
        }
        let post_keys = idx.get(&"post".to_string()).expect(
            "post-lag bucket missing — pre-fix the index task halted \
             permanently after Lagged and never observed these events",
        );
        assert_eq!(
            post_keys.len(),
            5,
            "every post-lag event must be indexed; recovered set was {:?}",
            post_keys
        );
        for id in 100..105u64 {
            assert!(
                post_keys.contains(&id),
                "post-lag id {} missing from recovered index",
                id
            );
        }
    }

    #[tokio::test]
    async fn index_terminates_when_file_closes() {
        let r = Redex::new();
        let f = r
            .open_file(
                &ChannelName::new("idx/close-terminates").unwrap(),
                RedexFileConfig::default(),
            )
            .unwrap();
        let idx: RedexIndex<String, u64> =
            RedexIndex::open::<Tagged, _>(&f, IndexStart::FromBeginning, |t| {
                t.tags
                    .iter()
                    .map(|tag| IndexOp::Insert(tag.clone(), t.id))
                    .collect()
            });
        let ev = Tagged {
            id: 1,
            tags: vec!["a".into()],
        };
        f.append(&postcard::to_allocvec(&ev).unwrap()).unwrap();
        yield_a_few().await;
        assert!(idx.contains(&"a".to_string(), &1));
        f.close().unwrap();
        yield_a_few().await;
        assert!(f.is_closed());
    }

    #[tokio::test]
    async fn lag_resets_counter_increments_on_saturation_reset() {
        let r = Redex::new();
        let f = r
            .open_file(
                &ChannelName::new("idx/lag-resets-counter").unwrap(),
                RedexFileConfig::default().with_tail_buffer_size(2),
            )
            .unwrap();
        for id in 0..10u64 {
            let ev = Tagged {
                id,
                tags: vec!["pre".into()],
            };
            f.append(&postcard::to_allocvec(&ev).unwrap()).unwrap();
        }
        let idx: RedexIndex<String, u64> =
            RedexIndex::open::<Tagged, _>(&f, IndexStart::FromBeginning, |t| {
                t.tags
                    .iter()
                    .map(|tag| IndexOp::Insert(tag.clone(), t.id))
                    .collect()
            });
        assert_eq!(idx.lag_resets(), 0, "fresh index has not reset yet");
        for _ in 0..50 {
            tokio::task::yield_now().await;
            if idx.lag_resets() > 0 {
                break;
            }
        }
        assert!(
            idx.lag_resets() >= 1,
            "saturation must bump lag_resets, got {}",
            idx.lag_resets()
        );
    }

    #[test]
    fn backoff_schedule_is_monotonic_and_bounded() {
        assert_eq!(backoff_for(0), None);
        assert_eq!(backoff_for(1), None);
        // `prev` must advance each iteration. Otherwise every step is only compared
        // against backoff_for(2), and a dip mid-schedule would go unnoticed.
        let mut prev = backoff_for(2).unwrap();
        for n in 3..=10 {
            let cur = backoff_for(n).unwrap();
            assert!(
                cur >= prev,
                "backoff must be non-decreasing in consecutive resets; \
                 backoff_for({n}) = {:?} < backoff_for({}) = {:?}",
                cur,
                n - 1,
                prev,
            );
            assert!(
                cur <= Duration::from_millis(250),
                "backoff must be bounded at 250ms; backoff_for({n}) = {:?}",
                cur,
            );
            prev = cur;
        }
        assert_eq!(backoff_for(100).unwrap(), Duration::from_millis(250));
        assert_eq!(backoff_for(u32::MAX).unwrap(), Duration::from_millis(250));
    }
}