use crate::{
domain::entities::Event,
error::{AllSourceError, Result},
};
use crossbeam::queue::ArrayQueue;
use std::{
hash::{Hash, Hasher},
sync::{
Arc,
atomic::{AtomicU64, AtomicUsize, Ordering},
},
};
const DEFAULT_SHARD_COUNT: usize = 16;
pub struct ShardedEventQueue {
shards: Vec<Arc<ArrayQueue<Event>>>,
shard_count: usize,
capacity_per_shard: usize,
pop_index: AtomicUsize,
total_pushed: AtomicU64,
total_popped: AtomicU64,
push_failures: AtomicU64,
}
impl ShardedEventQueue {
pub fn new(total_capacity: usize) -> Self {
Self::with_shards(total_capacity, DEFAULT_SHARD_COUNT)
}
pub fn with_shards(total_capacity: usize, shard_count: usize) -> Self {
let capacity_per_shard = (total_capacity / shard_count).max(1);
let shards = (0..shard_count)
.map(|_| Arc::new(ArrayQueue::new(capacity_per_shard)))
.collect();
Self {
shards,
shard_count,
capacity_per_shard,
pop_index: AtomicUsize::new(0),
total_pushed: AtomicU64::new(0),
total_popped: AtomicU64::new(0),
push_failures: AtomicU64::new(0),
}
}
#[inline]
fn shard_index(&self, entity_id: &str) -> usize {
let mut hasher = std::collections::hash_map::DefaultHasher::new();
entity_id.hash(&mut hasher);
(hasher.finish() as usize) & (self.shard_count - 1)
}
pub fn try_push(&self, event: Event) -> Result<()> {
let shard_idx = self.shard_index(event.entity_id().as_str());
let shard = &self.shards[shard_idx];
if let Ok(()) = shard.push(event) {
self.total_pushed.fetch_add(1, Ordering::Relaxed);
Ok(())
} else {
self.push_failures.fetch_add(1, Ordering::Relaxed);
Err(AllSourceError::QueueFull(format!(
"Shard {} at capacity ({})",
shard_idx, self.capacity_per_shard
)))
}
}
pub fn try_push_batch(&self, events: Vec<Event>) -> usize {
let mut success_count = 0;
for event in events {
if self.try_push(event).is_ok() {
success_count += 1;
}
}
success_count
}
pub fn try_pop(&self) -> Option<Event> {
let start_idx = self.pop_index.fetch_add(1, Ordering::Relaxed) % self.shard_count;
let shard = &self.shards[start_idx];
if let Some(event) = shard.pop() {
self.total_popped.fetch_add(1, Ordering::Relaxed);
Some(event)
} else {
None
}
}
pub fn try_pop_any(&self) -> Option<Event> {
let start_idx = self.pop_index.fetch_add(1, Ordering::Relaxed) % self.shard_count;
for i in 0..self.shard_count {
let idx = (start_idx + i) % self.shard_count;
if let Some(event) = self.shards[idx].pop() {
self.total_popped.fetch_add(1, Ordering::Relaxed);
return Some(event);
}
}
None
}
pub fn try_pop_batch(&self, max_count: usize) -> Vec<Event> {
let mut events = Vec::with_capacity(max_count);
let mut popped = 0;
while popped < max_count {
if let Some(event) = self.try_pop_any() {
events.push(event);
popped += 1;
} else {
break;
}
}
events
}
pub fn len(&self) -> usize {
self.shards.iter().map(|s| s.len()).sum()
}
pub fn is_empty(&self) -> bool {
self.shards.iter().all(|s| s.is_empty())
}
pub fn total_capacity(&self) -> usize {
self.shard_count * self.capacity_per_shard
}
pub fn shard_count(&self) -> usize {
self.shard_count
}
pub fn stats(&self) -> ShardedQueueStats {
ShardedQueueStats {
total_pushed: self.total_pushed.load(Ordering::Relaxed),
total_popped: self.total_popped.load(Ordering::Relaxed),
push_failures: self.push_failures.load(Ordering::Relaxed),
current_length: self.len(),
shard_lengths: self.shards.iter().map(|s| s.len()).collect(),
}
}
pub fn fill_ratio(&self) -> f64 {
self.len() as f64 / self.total_capacity() as f64
}
}
impl Clone for ShardedEventQueue {
fn clone(&self) -> Self {
Self {
shards: self.shards.clone(),
shard_count: self.shard_count,
capacity_per_shard: self.capacity_per_shard,
pop_index: AtomicUsize::new(0),
total_pushed: AtomicU64::new(self.total_pushed.load(Ordering::Relaxed)),
total_popped: AtomicU64::new(self.total_popped.load(Ordering::Relaxed)),
push_failures: AtomicU64::new(self.push_failures.load(Ordering::Relaxed)),
}
}
}
#[derive(Debug, Clone)]
pub struct ShardedQueueStats {
pub total_pushed: u64,
pub total_popped: u64,
pub push_failures: u64,
pub current_length: usize,
pub shard_lengths: Vec<usize>,
}
impl ShardedQueueStats {
pub fn balance_ratio(&self) -> f64 {
if self.shard_lengths.is_empty() || self.current_length == 0 {
return 1.0;
}
let avg = self.current_length as f64 / self.shard_lengths.len() as f64;
let variance: f64 = self
.shard_lengths
.iter()
.map(|&len| (len as f64 - avg).powi(2))
.sum::<f64>()
/ self.shard_lengths.len() as f64;
let std_dev = variance.sqrt();
if avg > 0.0 {
1.0 - (std_dev / avg).min(1.0)
} else {
1.0
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
use std::thread;
fn create_test_event(id: u32) -> Event {
Event::from_strings(
"test.event".to_string(),
format!("entity-{id}"),
"default".to_string(),
json!({"id": id}),
None,
)
.unwrap()
}
#[test]
fn test_create_sharded_queue() {
let queue = ShardedEventQueue::new(10000);
assert_eq!(queue.shard_count(), DEFAULT_SHARD_COUNT);
assert!(queue.total_capacity() >= 10000 / DEFAULT_SHARD_COUNT * DEFAULT_SHARD_COUNT);
assert!(queue.is_empty());
}
#[test]
fn test_push_pop() {
let queue = ShardedEventQueue::new(1000);
let event = create_test_event(1);
queue.try_push(event.clone()).unwrap();
assert!(!queue.is_empty());
let popped = queue.try_pop_any().unwrap();
assert_eq!(popped.entity_id().as_str(), "entity-1");
assert!(queue.is_empty());
}
#[test]
fn test_batch_operations() {
let queue = ShardedEventQueue::new(1000);
let events: Vec<Event> = (0..100).map(create_test_event).collect();
let pushed = queue.try_push_batch(events);
assert_eq!(pushed, 100);
assert_eq!(queue.len(), 100);
let popped = queue.try_pop_batch(50);
assert_eq!(popped.len(), 50);
assert_eq!(queue.len(), 50);
}
#[test]
fn test_sharding_distribution() {
let queue = ShardedEventQueue::with_shards(10000, 4);
for i in 0..1000 {
let event = create_test_event(i);
queue.try_push(event).unwrap();
}
let stats = queue.stats();
assert_eq!(stats.current_length, 1000);
let non_empty_shards = stats.shard_lengths.iter().filter(|&&len| len > 0).count();
assert!(non_empty_shards >= 3);
}
#[test]
fn test_concurrent_push() {
let queue = ShardedEventQueue::new(100000);
let queue_ref = &queue;
std::thread::scope(|s| {
for t in 0..8 {
s.spawn(move || {
for i in 0..1000 {
let event = create_test_event(t * 1000 + i);
let _ = queue_ref.try_push(event);
}
});
}
});
assert_eq!(queue.len(), 8000);
}
#[test]
fn test_concurrent_push_pop() {
let queue = ShardedEventQueue::new(10000);
let queue_ref = &queue;
std::thread::scope(|s| {
for t in 0..4 {
s.spawn(move || {
for i in 0..500 {
let event = create_test_event(t * 500 + i);
while queue_ref.try_push(event.clone()).is_err() {
thread::yield_now();
}
}
});
}
for _ in 0..4 {
s.spawn(move || {
let mut consumed = 0;
while consumed < 500 {
if queue_ref.try_pop_any().is_some() {
consumed += 1;
} else {
thread::yield_now();
}
}
});
}
});
assert!(queue.is_empty());
}
#[test]
fn test_stats() {
let queue = ShardedEventQueue::new(1000);
for i in 0..100 {
queue.try_push(create_test_event(i)).unwrap();
}
for _ in 0..30 {
queue.try_pop_any();
}
let stats = queue.stats();
assert_eq!(stats.total_pushed, 100);
assert_eq!(stats.total_popped, 30);
assert_eq!(stats.current_length, 70);
}
#[test]
fn test_balance_ratio() {
let stats = ShardedQueueStats {
total_pushed: 100,
total_popped: 0,
push_failures: 0,
current_length: 100,
shard_lengths: vec![25, 25, 25, 25], };
assert!((stats.balance_ratio() - 1.0).abs() < 0.001);
let unbalanced_stats = ShardedQueueStats {
total_pushed: 100,
total_popped: 0,
push_failures: 0,
current_length: 100,
shard_lengths: vec![100, 0, 0, 0], };
assert!(unbalanced_stats.balance_ratio() < 0.5);
}
}