use std::collections::HashSet;
use std::collections::VecDeque;
use std::sync::Arc;
/// Bounded, insertion-ordered set of recently observed dedup IDs.
///
/// At most [`capacity`](Self::capacity) IDs are remembered. When a new ID
/// arrives while the set is full, the ID that was *first inserted longest
/// ago* is forgotten. Eviction is FIFO by first observation: reporting an ID
/// as a duplicate does not move it to the back of the queue.
///
/// Each ID is stored once as an `Arc<str>` shared between the ordering queue
/// and the membership set, so its bytes are allocated a single time.
pub struct RedisStreamDedup {
    /// IDs in first-observation order; the front is evicted first.
    order: VecDeque<Arc<str>>,
    /// Membership index over exactly the IDs held in `order`.
    seen: HashSet<Arc<str>>,
    /// Maximum number of IDs retained, always in `1..=MAX_CAPACITY`.
    capacity: usize,
}

impl RedisStreamDedup {
    /// Upper bound applied to any requested capacity (2^24 IDs).
    pub const MAX_CAPACITY: usize = 1 << 24;

    /// Largest number of slots reserved up front by
    /// [`with_capacity`](Self::with_capacity).
    ///
    /// Reserving the full clamped capacity eagerly would, for
    /// `MAX_CAPACITY`, allocate on the order of hundreds of MiB on 64-bit
    /// targets before a single ID is seen. Past this limit the collections
    /// grow on demand, still bounded by `capacity`.
    const PREALLOC_LIMIT: usize = 4096;

    /// Creates a deduplicator that remembers at most `capacity` IDs.
    ///
    /// `capacity` is clamped into `1..=MAX_CAPACITY`: `0` behaves like `1`,
    /// and oversized requests are capped rather than rejected.
    pub fn with_capacity(capacity: usize) -> Self {
        let capacity = capacity.clamp(1, Self::MAX_CAPACITY);
        // Only reserve a modest amount now; `capacity` still bounds growth.
        let reserve = capacity.min(Self::PREALLOC_LIMIT);
        Self {
            order: VecDeque::with_capacity(reserve),
            seen: HashSet::with_capacity(reserve),
            capacity,
        }
    }

    /// Creates a deduplicator with a capacity of 4096 IDs.
    pub fn new() -> Self {
        Self::with_capacity(4096)
    }

    /// Number of IDs currently tracked (never more than `capacity()`).
    #[inline]
    pub fn len(&self) -> usize {
        self.seen.len()
    }

    /// Returns `true` when no IDs are tracked.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.seen.is_empty()
    }

    /// Maximum number of IDs this deduplicator retains (after clamping).
    #[inline]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Checks `dedup_id` against the tracked set and records it if new.
    ///
    /// Returns `true` if `dedup_id` is currently tracked. In that case
    /// nothing changes, and in particular its eviction position is not
    /// refreshed.
    ///
    /// Otherwise the ID is recorded and `false` is returned. If the set is
    /// already full, the oldest inserted ID is evicted first.
    pub fn is_duplicate(&mut self, dedup_id: &str) -> bool {
        if self.seen.contains(dedup_id) {
            return true;
        }
        if self.seen.len() >= self.capacity {
            // `order` and `seen` always hold the same IDs, so the front of the
            // queue is the oldest tracked ID.
            if let Some(evicted) = self.order.pop_front() {
                self.seen.remove(&evicted);
            }
        }
        let id: Arc<str> = Arc::from(dedup_id);
        self.order.push_back(Arc::clone(&id));
        self.seen.insert(id);
        false
    }

    /// Forgets every tracked ID; the configured capacity is unchanged.
    pub fn clear(&mut self) {
        self.order.clear();
        self.seen.clear();
    }
}
impl Default for RedisStreamDedup {
fn default() -> Self {
Self::new()
}
}
/// Debug output reports only the number of tracked IDs and the capacity;
/// the IDs themselves are not printed.
impl std::fmt::Debug for RedisStreamDedup {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let tracked = self.len();
        let mut out = f.debug_struct("RedisStreamDedup");
        out.field("len", &tracked);
        out.field("capacity", &self.capacity);
        out.finish()
    }
}
#[cfg(test)]
mod tests {
    #![allow(
        clippy::disallowed_methods,
        reason = "test code legitimately uses std::sync::{Mutex,RwLock} for SUT setup; tests have no real poison concern"
    )]
    use super::*;

    #[test]
    fn first_observation_is_not_a_duplicate() {
        let mut d = RedisStreamDedup::with_capacity(8);
        assert!(!d.is_duplicate("a"));
        assert_eq!(d.len(), 1);
    }

    #[test]
    fn repeat_observation_is_a_duplicate() {
        let mut d = RedisStreamDedup::with_capacity(8);
        assert!(!d.is_duplicate("a"));
        assert!(d.is_duplicate("a"));
        assert_eq!(d.len(), 1);
    }

    #[test]
    fn distinct_ids_do_not_collide() {
        let mut d = RedisStreamDedup::with_capacity(8);
        assert!(!d.is_duplicate("a"));
        assert!(!d.is_duplicate("b"));
        assert!(!d.is_duplicate("c"));
        assert_eq!(d.len(), 3);
        assert!(d.is_duplicate("a"));
        assert!(d.is_duplicate("b"));
        assert!(d.is_duplicate("c"));
    }

    // Eviction is FIFO by first observation (not LRU): the oldest inserted
    // ID is dropped when a new ID arrives at capacity.
    #[test]
    fn fifo_keeps_non_evicted_ids_tracked() {
        let mut d = RedisStreamDedup::with_capacity(2);
        assert!(!d.is_duplicate("a"));
        assert!(!d.is_duplicate("b"));
        // `c` evicts the oldest entry, `a`.
        assert!(!d.is_duplicate("c"));
        assert_eq!(d.len(), 2);
        assert!(d.is_duplicate("b"));
        assert!(d.is_duplicate("c"));
    }

    #[test]
    fn fifo_evicted_id_is_reported_as_new() {
        let mut d = RedisStreamDedup::with_capacity(2);
        assert!(!d.is_duplicate("a"));
        assert!(!d.is_duplicate("b"));
        assert!(!d.is_duplicate("c"));
        // `a` was evicted by `c`, so it is no longer recognised.
        assert!(!d.is_duplicate("a"));
    }

    #[test]
    fn duplicate_observation_does_not_refresh_fifo_position() {
        let mut d = RedisStreamDedup::with_capacity(2);
        assert!(!d.is_duplicate("a"));
        assert!(!d.is_duplicate("b"));
        // Seeing `a` again must not move it behind `b`.
        assert!(d.is_duplicate("a"));
        assert!(!d.is_duplicate("c"));
        assert!(
            d.is_duplicate("b"),
            "duplicate observation must NOT refresh FIFO position — \
             expected `b` to still be tracked after `c` evicted the \
             front, but `b` was evicted instead",
        );
    }

    #[test]
    fn len_never_exceeds_capacity() {
        let mut d = RedisStreamDedup::with_capacity(3);
        for i in 0..10 {
            assert!(!d.is_duplicate(&format!("id{i}")));
            assert!(d.len() <= d.capacity());
        }
        assert_eq!(d.len(), 3);
    }

    #[test]
    fn capacity_zero_is_clamped_to_one() {
        let mut d = RedisStreamDedup::with_capacity(0);
        assert_eq!(d.capacity(), 1);
        assert!(!d.is_duplicate("a"));
        // With capacity 1, each new ID evicts the previous one.
        assert!(!d.is_duplicate("b"));
        assert!(!d.is_duplicate("a"));
    }

    #[test]
    fn clear_resets_state() {
        let mut d = RedisStreamDedup::with_capacity(8);
        d.is_duplicate("a");
        d.is_duplicate("b");
        assert_eq!(d.len(), 2);
        d.clear();
        assert_eq!(d.len(), 0);
        assert!(d.is_empty());
        assert!(!d.is_duplicate("a"));
    }

    #[test]
    fn redis_stream_dedup_is_send_and_sync() {
        fn assert_send<T: Send>() {}
        fn assert_sync<T: Sync>() {}
        assert_send::<RedisStreamDedup>();
        assert_sync::<RedisStreamDedup>();
    }

    #[test]
    fn redis_stream_dedup_works_under_mutex_across_threads() {
        use std::sync::Arc;
        use std::sync::Mutex;
        use std::thread;

        let dedup = Arc::new(Mutex::new(RedisStreamDedup::with_capacity(128)));
        let handles: Vec<_> = (0..4)
            .map(|t| {
                let d = Arc::clone(&dedup);
                thread::spawn(move || {
                    for i in 0..16 {
                        let id = format!("t{}:{}", t, i);
                        let mut g = d.lock().unwrap();
                        let was_dup = g.is_duplicate(&id);
                        assert!(
                            !was_dup,
                            "thread {} id {} should be new on first insert",
                            t, i
                        );
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
        let g = dedup.lock().unwrap();
        assert_eq!(g.len(), 4 * 16);
    }

    #[test]
    fn filters_redis_streams_producer_duplicates_by_dedup_id() {
        let mut d = RedisStreamDedup::with_capacity(64);
        let dedup_ids = ["deadbeef:0:0:0", "deadbeef:0:0:1", "deadbeef:0:0:2"];
        for id in &dedup_ids {
            assert!(
                !d.is_duplicate(id),
                "first observation of {id} should not be a duplicate",
            );
        }
        for id in &dedup_ids {
            assert!(
                d.is_duplicate(id),
                "retry-path observation of {id} should be filtered as a duplicate",
            );
        }
    }

    #[test]
    fn with_capacity_clamps_usize_max() {
        let d = RedisStreamDedup::with_capacity(usize::MAX);
        assert_eq!(
            d.capacity,
            RedisStreamDedup::MAX_CAPACITY,
            "capacity must be clamped at MAX_CAPACITY",
        );
        const _: () = assert!(
            RedisStreamDedup::MAX_CAPACITY < usize::MAX,
            "MAX_CAPACITY must strictly bound usize::MAX",
        );
    }

    #[test]
    fn with_capacity_preserves_in_range_values() {
        let d = RedisStreamDedup::with_capacity(1024);
        assert_eq!(d.capacity, 1024);
        let d_zero = RedisStreamDedup::with_capacity(0);
        assert_eq!(d_zero.capacity, 1, "0 must clamp UP to 1, not down");
    }
}