use std::collections::HashSet;
use std::hash::Hash;
use std::time::{Duration, Instant};
use dashmap::DashMap;
/// A multimap from keys to timestamped values, backed by a [`DashMap`], in
/// which queries only see entries that are newer than a sliding time window.
///
/// Every stored value carries an [`Instant`]. Read methods compare that
/// instant against "now minus `window`" and skip anything that is not strictly
/// newer, so expired entries are invisible even while they are still stored.
pub struct TimeWindowedIndex<K, V>
where
K: Eq + Hash + Clone + Send + Sync,
V: Clone + Send + Sync,
{
/// Per-key buckets of `(value, timestamp)` pairs.
entries: DashMap<K, Vec<(V, Instant)>>,
/// How long after its timestamp an entry stays visible to queries.
window: Duration,
/// Upper bound on the bucket size enforced by inserts, which keep the
/// newest entries when it is exceeded; `0` disables the bound.
max_entries_per_key: usize,
}
impl<K, V> TimeWindowedIndex<K, V>
where
    K: Eq + Hash + Clone + Send + Sync,
    V: Clone + Send + Sync,
{
    /// Creates an empty index.
    ///
    /// `window` is how long an entry stays visible to queries after its
    /// timestamp. `max_entries_per_key` caps how many entries are kept under a
    /// single key; `0` disables the cap.
    pub fn new(window: Duration, max_entries_per_key: usize) -> Self {
        Self {
            entries: DashMap::new(),
            window,
            max_entries_per_key,
        }
    }

    /// Returns the expiry threshold relative to `now`.
    ///
    /// `None` means `now - window` cannot be represented as an `Instant`
    /// (the window reaches back further than the clock does). Subtracting with
    /// `-` would panic in that case; here it simply means nothing has expired.
    fn cutoff_from(&self, now: Instant) -> Option<Instant> {
        now.checked_sub(self.window)
    }

    /// Tells whether an entry stamped `timestamp` is still inside the window.
    ///
    /// The comparison is strict: an entry exactly at the cutoff is expired.
    fn is_live(timestamp: Instant, cutoff: Option<Instant>) -> bool {
        match cutoff {
            Some(limit) => timestamp > limit,
            None => true,
        }
    }

    /// Shared implementation of both insert methods.
    ///
    /// Prunes expired entries from the key's bucket (relative to `now`),
    /// appends `(value, timestamp)`, and then enforces the per-key cap.
    fn store(&self, key: K, value: V, timestamp: Instant, now: Instant) {
        let cutoff = self.cutoff_from(now);
        let cap = self.max_entries_per_key;

        // Taking the bucket via the entry API keeps the whole update under one
        // shard lock and lets `value` be moved in without a clone.
        let mut bucket = self.entries.entry(key).or_insert_with(Vec::new);
        bucket.retain(|(_, ts)| Self::is_live(*ts, cutoff));
        bucket.push((value, timestamp));

        if cap > 0 && bucket.len() > cap {
            // Stable ascending sort: entries with equal timestamps keep their
            // insertion order, so dropping from the front always discards the
            // oldest entries and never the one that was just pushed.
            bucket.sort_by_key(|(_, ts)| *ts);
            let excess = bucket.len() - cap;
            bucket.drain(..excess);
        }
    }

    /// Records `value` under `key`, stamped with the current time.
    ///
    /// Expired entries already stored under `key` are pruned, and if the
    /// per-key cap is exceeded only the newest entries are kept.
    pub fn insert(&self, key: K, value: V) {
        let now = Instant::now();
        self.store(key, value, now, now);
    }

    /// Records `value` under `key` with a caller-supplied `timestamp`.
    ///
    /// Pruning of existing entries is still relative to the current time. The
    /// new entry is stored even if `timestamp` is already outside the window;
    /// queries will simply not return it.
    pub fn insert_with_timestamp(&self, key: K, value: V, timestamp: Instant) {
        self.store(key, value, timestamp, Instant::now());
    }

    /// Returns clones of all in-window values stored under `key`, in storage
    /// order. Returns an empty vector for an unknown key.
    pub fn get(&self, key: &K) -> Vec<V> {
        let cutoff = self.cutoff_from(Instant::now());
        self.entries
            .get(key)
            .map(|bucket| {
                bucket
                    .iter()
                    .filter(|(_, ts)| Self::is_live(*ts, cutoff))
                    .map(|(value, _)| value.clone())
                    .collect()
            })
            .unwrap_or_default()
    }

    /// Like [`get`](Self::get), but each value is paired with its timestamp.
    pub fn get_with_timestamps(&self, key: &K) -> Vec<(V, Instant)> {
        let cutoff = self.cutoff_from(Instant::now());
        self.entries
            .get(key)
            .map(|bucket| {
                bucket
                    .iter()
                    .filter(|(_, ts)| Self::is_live(*ts, cutoff))
                    .cloned()
                    .collect()
            })
            .unwrap_or_default()
    }

    /// Returns how many in-window entries are stored under `key`
    /// (duplicates included); `0` for an unknown key.
    pub fn count(&self, key: &K) -> usize {
        let cutoff = self.cutoff_from(Instant::now());
        self.entries
            .get(key)
            .map(|bucket| {
                bucket
                    .iter()
                    .filter(|(_, ts)| Self::is_live(*ts, cutoff))
                    .count()
            })
            .unwrap_or(0)
    }

    /// Returns every key that has at least `min_count` in-window entries.
    pub fn get_keys_with_min_count(&self, min_count: usize) -> Vec<K> {
        let cutoff = self.cutoff_from(Instant::now());
        self.entries
            .iter()
            .filter(|entry| {
                let live = entry
                    .value()
                    .iter()
                    .filter(|(_, ts)| Self::is_live(*ts, cutoff))
                    .count();
                live >= min_count
            })
            .map(|entry| entry.key().clone())
            .collect()
    }

    /// Drops all expired entries and removes keys whose bucket ends up empty.
    pub fn cleanup(&self) {
        let cutoff = self.cutoff_from(Instant::now());
        // One pass: prune each bucket, then let the map drop it if nothing
        // survived.
        self.entries.retain(|_, bucket| {
            bucket.retain(|(_, ts)| Self::is_live(*ts, cutoff));
            !bucket.is_empty()
        });
    }

    /// Removes every key and entry.
    pub fn clear(&self) {
        self.entries.clear();
    }

    /// Returns the number of stored keys.
    ///
    /// This counts keys whose entries have all expired but have not been
    /// removed yet (see [`cleanup`](Self::cleanup)).
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns `true` when no key is stored; expired-but-unpruned keys count
    /// as stored.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Returns the configured window length.
    pub fn window(&self) -> Duration {
        self.window
    }

    /// Returns `true` if `key` has at least one in-window entry.
    pub fn contains_key(&self, key: &K) -> bool {
        self.count(key) > 0
    }

    /// Returns a snapshot of all stored keys, including keys that currently
    /// hold only expired entries.
    pub fn keys(&self) -> Vec<K> {
        self.entries
            .iter()
            .map(|entry| entry.key().clone())
            .collect()
    }

    /// Returns `true` if some key has at least `min_count` in-window entries
    /// *and* at least one of those entries satisfies `predicate`.
    ///
    /// `min_count` applies to all in-window entries of the key, not only to
    /// the ones matching `predicate`.
    pub fn any_key_has_value_with_min_count<F>(&self, predicate: F, min_count: usize) -> bool
    where
        F: Fn(&V) -> bool,
    {
        let cutoff = self.cutoff_from(Instant::now());
        self.entries.iter().any(|entry| {
            let mut live = 0usize;
            let mut matched = false;
            for (value, ts) in entry.value() {
                if !Self::is_live(*ts, cutoff) {
                    continue;
                }
                live += 1;
                // Short-circuit: stop calling `predicate` after the first hit.
                matched = matched || predicate(value);
            }
            matched && live >= min_count
        })
    }
}
impl<K, V> TimeWindowedIndex<K, V>
where
    K: Eq + Hash + Clone + Send + Sync,
    V: Clone + Eq + Hash + Send + Sync,
{
    /// Iterates over the distinct in-window values of `bucket`, yielding each
    /// value once, at the position of its first occurrence.
    ///
    /// `cutoff` is the expiry threshold; `None` means nothing has expired.
    /// Entries must be strictly newer than the cutoff to be yielded.
    fn distinct_live<'a>(
        bucket: &'a [(V, Instant)],
        cutoff: Option<Instant>,
    ) -> impl Iterator<Item = &'a V> {
        let mut seen = HashSet::new();
        bucket
            .iter()
            .filter(move |(_, ts)| match cutoff {
                Some(limit) => *ts > limit,
                None => true,
            })
            .map(|(value, _)| value)
            // `insert` returns false for a value that was already seen.
            .filter(move |value| seen.insert(*value))
    }

    /// Returns the distinct in-window values stored under `key`, in
    /// first-seen order. Returns an empty vector for an unknown key.
    pub fn get_unique(&self, key: &K) -> Vec<V> {
        // `checked_sub` instead of `-`: the subtraction panics when the window
        // reaches back further than `Instant` can represent.
        let cutoff = Instant::now().checked_sub(self.window);
        self.entries
            .get(key)
            .map(|bucket| {
                Self::distinct_live(bucket.value(), cutoff)
                    .cloned()
                    .collect()
            })
            .unwrap_or_default()
    }

    /// Returns how many distinct in-window values are stored under `key`;
    /// `0` for an unknown key.
    pub fn count_unique(&self, key: &K) -> usize {
        let cutoff = Instant::now().checked_sub(self.window);
        self.entries
            .get(key)
            .map(|bucket| Self::distinct_live(bucket.value(), cutoff).count())
            .unwrap_or(0)
    }

    /// Returns every key that has at least `min_count` distinct in-window
    /// values.
    pub fn get_keys_with_min_unique_count(&self, min_count: usize) -> Vec<K> {
        let cutoff = Instant::now().checked_sub(self.window);
        self.entries
            .iter()
            .filter(|entry| Self::distinct_live(entry.value(), cutoff).count() >= min_count)
            .map(|entry| entry.key().clone())
            .collect()
    }

    /// Returns `(key, distinct values)` for every key that has at least
    /// `min_count` distinct in-window values. Values within a group are in
    /// first-seen order.
    pub fn get_groups_with_min_unique_count(&self, min_count: usize) -> Vec<(K, Vec<V>)> {
        let cutoff = Instant::now().checked_sub(self.window);
        self.entries
            .iter()
            .filter_map(|entry| {
                // Dedup by reference first so values are only cloned for
                // groups that actually meet the threshold.
                let distinct: Vec<&V> = Self::distinct_live(entry.value(), cutoff).collect();
                if distinct.len() < min_count {
                    return None;
                }
                let values: Vec<V> = distinct.into_iter().cloned().collect();
                Some((entry.key().clone(), values))
            })
            .collect()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::IpAddr;
    use std::thread;

    /// Window long enough that nothing expires while a test is running.
    const LONG_WINDOW: Duration = Duration::from_secs(300);

    /// Parses an IP literal used as a test value.
    fn addr(text: &str) -> IpAddr {
        text.parse().unwrap()
    }

    /// Builds an owned key from a literal.
    fn key(name: &str) -> String {
        name.to_string()
    }

    /// Builds an IP-valued index with a per-key cap of 100.
    fn ip_index(window: Duration) -> TimeWindowedIndex<String, IpAddr> {
        TimeWindowedIndex::new(window, 100)
    }

    #[test]
    fn test_new() {
        let index = ip_index(LONG_WINDOW);

        assert!(index.is_empty());
        assert_eq!(index.len(), 0);
        assert_eq!(index.window(), Duration::from_secs(300));
    }

    #[test]
    fn test_insert_and_get() {
        let index = ip_index(LONG_WINDOW);
        let first = addr("192.168.1.1");
        let second = addr("192.168.1.2");

        for (name, value) in [("key1", first), ("key1", second), ("key2", first)] {
            index.insert(key(name), value);
        }

        let under_key1 = index.get(&key("key1"));
        assert_eq!(under_key1.len(), 2);
        assert!(under_key1.contains(&first));
        assert!(under_key1.contains(&second));

        let under_key2 = index.get(&key("key2"));
        assert_eq!(under_key2.len(), 1);
        assert!(under_key2.contains(&first));

        assert_eq!(index.len(), 2);
    }

    #[test]
    fn test_get_nonexistent_key() {
        let index = ip_index(LONG_WINDOW);

        assert!(index.get(&key("nonexistent")).is_empty());
    }

    #[test]
    fn test_expiration() {
        let index = ip_index(Duration::from_millis(50));
        let name = key("key");

        index.insert(name.clone(), addr("192.168.1.1"));
        assert_eq!(index.get(&name).len(), 1);

        // Sleep past the 50 ms window so the entry expires.
        thread::sleep(Duration::from_millis(60));
        assert_eq!(index.get(&name).len(), 0);
    }

    #[test]
    fn test_count() {
        let index = ip_index(LONG_WINDOW);

        for literal in ["192.168.1.1", "192.168.1.2"] {
            index.insert(key("key"), addr(literal));
        }

        assert_eq!(index.count(&key("key")), 2);
        assert_eq!(index.count(&key("nonexistent")), 0);
    }

    #[test]
    fn test_get_unique() {
        let index = ip_index(LONG_WINDOW);
        let name = key("key");
        let repeated = addr("192.168.1.1");

        for _ in 0..3 {
            index.insert(name.clone(), repeated);
        }

        assert_eq!(index.count(&name), 3);
        assert_eq!(index.get_unique(&name).len(), 1);
        assert_eq!(index.count_unique(&name), 1);
    }

    #[test]
    fn test_get_keys_with_min_count() {
        let index = ip_index(LONG_WINDOW);

        for literal in ["192.168.1.1", "192.168.1.2", "192.168.1.3"] {
            index.insert(key("key1"), addr(literal));
        }
        index.insert(key("key2"), addr("192.168.1.1"));

        let at_least_two = index.get_keys_with_min_count(2);
        assert_eq!(at_least_two.len(), 1);
        assert!(at_least_two.contains(&key("key1")));

        let at_least_one = index.get_keys_with_min_count(1);
        assert_eq!(at_least_one.len(), 2);
    }

    #[test]
    fn test_get_keys_with_min_unique_count() {
        let index = ip_index(LONG_WINDOW);
        let first = addr("192.168.1.1");
        let second = addr("192.168.1.2");

        // key1 holds two distinct values, key2 the same value twice.
        index.insert(key("key1"), first);
        index.insert(key("key1"), second);
        index.insert(key("key2"), first);
        index.insert(key("key2"), first);

        let matching = index.get_keys_with_min_unique_count(2);
        assert_eq!(matching.len(), 1);
        assert!(matching.contains(&key("key1")));
    }

    #[test]
    fn test_get_groups_with_min_unique_count() {
        let index = ip_index(LONG_WINDOW);

        for literal in ["192.168.1.1", "192.168.1.2", "192.168.1.3"] {
            index.insert(key("key1"), addr(literal));
        }
        index.insert(key("key2"), addr("192.168.1.1"));

        let groups = index.get_groups_with_min_unique_count(2);
        assert_eq!(groups.len(), 1);
        assert_eq!(groups[0].0, "key1");
        assert_eq!(groups[0].1.len(), 3);
    }

    #[test]
    fn test_cleanup() {
        let index = ip_index(Duration::from_millis(50));

        index.insert(key("key"), addr("192.168.1.1"));
        assert_eq!(index.len(), 1);

        // Let the only entry expire, then prune it.
        thread::sleep(Duration::from_millis(60));
        index.cleanup();

        assert_eq!(index.len(), 0);
        assert!(index.is_empty());
    }

    #[test]
    fn test_clear() {
        let index = ip_index(LONG_WINDOW);
        let value = addr("192.168.1.1");

        for name in ["key1", "key2"] {
            index.insert(key(name), value);
        }
        assert_eq!(index.len(), 2);

        index.clear();

        assert_eq!(index.len(), 0);
        assert!(index.is_empty());
    }

    #[test]
    fn test_contains_key() {
        let index = ip_index(LONG_WINDOW);

        index.insert(key("key"), addr("192.168.1.1"));

        assert!(index.contains_key(&key("key")));
        assert!(!index.contains_key(&key("nonexistent")));
    }

    #[test]
    fn test_max_entries_limit() {
        let index: TimeWindowedIndex<String, u32> = TimeWindowedIndex::new(LONG_WINDOW, 3);

        (0..5).for_each(|n| index.insert(key("key"), n));

        // Only the three most recent inserts survive the cap.
        let kept = index.get(&key("key"));
        assert_eq!(kept.len(), 3);
        for expected in 2..=4 {
            assert!(kept.contains(&expected));
        }
    }

    #[test]
    fn test_keys() {
        let index = ip_index(LONG_WINDOW);
        let value = addr("192.168.1.1");
        let names = ["key1", "key2", "key3"];

        for name in names {
            index.insert(key(name), value);
        }

        let stored = index.keys();
        assert_eq!(stored.len(), 3);
        for name in names {
            assert!(stored.contains(&key(name)));
        }
    }

    #[test]
    fn test_get_with_timestamps() {
        let index = ip_index(LONG_WINDOW);
        let value = addr("192.168.1.1");

        index.insert(key("key"), value);

        let stamped = index.get_with_timestamps(&key("key"));
        assert_eq!(stamped.len(), 1);
        assert_eq!(stamped[0].0, value);
    }

    #[test]
    fn test_insert_with_timestamp() {
        let index = ip_index(LONG_WINDOW);
        let backdated = Instant::now() - Duration::from_secs(10);

        index.insert_with_timestamp(key("key"), addr("192.168.1.1"), backdated);

        let stamped = index.get_with_timestamps(&key("key"));
        assert_eq!(stamped.len(), 1);
        assert!(stamped[0].1 <= Instant::now() - Duration::from_secs(9));
    }

    #[test]
    fn test_thread_safety() {
        use std::sync::Arc;

        let index = Arc::new(TimeWindowedIndex::<String, u32>::new(LONG_WINDOW, 1000));

        // Ten writers, each filling its own key.
        let mut workers = Vec::new();
        for worker in 0..10 {
            let shared = Arc::clone(&index);
            workers.push(thread::spawn(move || {
                for n in 0..100 {
                    shared.insert(format!("key{}", worker), n);
                }
            }));
        }
        for handle in workers {
            handle.join().unwrap();
        }

        assert_eq!(index.len(), 10);
        for worker in 0..10 {
            assert!(index.count(&format!("key{}", worker)) > 0);
        }
    }

    #[test]
    fn test_any_key_has_value_with_min_count() {
        let index = ip_index(LONG_WINDOW);
        let first = addr("192.168.1.1");
        let second = addr("192.168.1.2");
        let third = addr("192.168.1.3");

        for value in [first, second, third] {
            index.insert(key("key1"), value);
        }
        index.insert(key("key2"), second);

        // key1 contains `first` and has three entries in total.
        assert!(index.any_key_has_value_with_min_count(|ip| *ip == first, 2));
        assert!(index.any_key_has_value_with_min_count(|ip| *ip == first, 3));
        assert!(!index.any_key_has_value_with_min_count(|ip| *ip == first, 4));

        assert!(index.any_key_has_value_with_min_count(|ip| *ip == second, 2));

        let absent = addr("10.0.0.1");
        assert!(!index.any_key_has_value_with_min_count(|ip| *ip == absent, 1));
    }
}