use crate::{
domain::entities::Event,
error::{AllSourceError, Result},
};
use crossbeam::queue::ArrayQueue;
use std::sync::Arc;
/// Bounded, shareable queue of [`Event`]s backed by crossbeam's [`ArrayQueue`].
///
/// The underlying buffer lives behind an [`Arc`], so cloning this handle is cheap and
/// every clone operates on the same queue: events pushed through one handle can be
/// popped through another, including from other threads.
#[derive(Clone, Debug)]
pub struct LockFreeEventQueue {
    /// Shared fixed-size buffer holding the queued events.
    queue: Arc<ArrayQueue<Event>>,
    /// Capacity the queue was created with; reported by `capacity()` and used for the
    /// "queue full" error message and the fill-ratio calculation.
    capacity: usize,
}
impl LockFreeEventQueue {
    /// Creates an empty queue that can hold at most `capacity` events.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero, because [`ArrayQueue::new`] rejects a zero capacity.
    pub fn new(capacity: usize) -> Self {
        Self {
            queue: Arc::new(ArrayQueue::new(capacity)),
            capacity,
        }
    }

    /// Attempts to enqueue `event` without blocking.
    ///
    /// # Errors
    ///
    /// Returns [`AllSourceError::QueueFull`] when there is no free slot. The rejected
    /// event is dropped, so callers that want to retry must keep their own copy.
    pub fn try_push(&self, event: Event) -> Result<()> {
        // `ArrayQueue::push` hands the event back on failure; it is intentionally
        // discarded here because the error type only carries a message.
        self.queue.push(event).map_err(|_rejected| {
            AllSourceError::QueueFull(format!("Event queue at capacity ({})", self.capacity))
        })
    }

    /// Removes and returns the event at the front of the queue, or `None` if it is empty.
    pub fn try_pop(&self) -> Option<Event> {
        self.queue.pop()
    }

    /// Returns the number of events currently queued.
    ///
    /// When other threads are pushing or popping, the value is only a snapshot.
    #[must_use]
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Returns `true` if the queue currently holds no events.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Returns `true` if the queue currently has no free slot.
    #[must_use]
    pub fn is_full(&self) -> bool {
        // Delegate to the queue's own check instead of re-deriving it from `len()`
        // and the cached capacity.
        self.queue.is_full()
    }

    /// Returns the maximum number of events the queue can hold.
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns the fraction of the capacity currently in use (`0.0` empty, `1.0` full).
    #[must_use]
    pub fn fill_ratio(&self) -> f64 {
        // `capacity` is never zero here: `new` would have panicked.
        self.len() as f64 / self.capacity as f64
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::{
        sync::atomic::{AtomicUsize, Ordering},
        thread,
    };

    /// Builds a test event whose entity id and payload both embed `id`.
    fn create_test_event(id: u32) -> Event {
        Event::from_strings(
            "test.event".to_string(),
            format!("entity-{id}"),
            "default".to_string(),
            json!({"id": id}),
            None,
        )
        .expect("hard-coded test event fields should be accepted")
    }

    /// A freshly created queue reports its capacity and is empty, not full.
    #[test]
    fn test_create_queue() {
        let queue = LockFreeEventQueue::new(100);

        assert_eq!(queue.capacity(), 100);
        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
        assert!(!queue.is_full());
    }

    /// A pushed event comes back out unchanged and leaves the queue empty.
    #[test]
    fn test_push_and_pop() {
        let queue = LockFreeEventQueue::new(10);
        let event = create_test_event(1);

        queue.try_push(event.clone()).unwrap();
        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());

        let popped = queue.try_pop().unwrap();
        assert_eq!(popped.entity_id(), event.entity_id());
        assert!(queue.is_empty());
    }

    /// Pushing into a full queue fails with `QueueFull`.
    #[test]
    fn test_queue_full() {
        let queue = LockFreeEventQueue::new(3);
        for id in 1..=3 {
            queue.try_push(create_test_event(id)).unwrap();
        }
        assert!(queue.is_full());

        let result = queue.try_push(create_test_event(4));
        assert!(matches!(result, Err(AllSourceError::QueueFull(_))));
    }

    /// Popping from an empty queue yields `None`.
    #[test]
    fn test_pop_empty_queue() {
        let queue = LockFreeEventQueue::new(10);
        assert!(queue.try_pop().is_none());
    }

    /// Every pushed event can be popped again, after which the queue is empty.
    #[test]
    fn test_multiple_push_pop() {
        let queue = LockFreeEventQueue::new(100);
        for id in 0..10 {
            queue.try_push(create_test_event(id)).unwrap();
        }
        assert_eq!(queue.len(), 10);

        let mut popped = 0;
        while queue.try_pop().is_some() {
            popped += 1;
        }
        assert_eq!(popped, 10);
        assert!(queue.is_empty());
    }

    /// The fill ratio tracks the number of queued events relative to the capacity.
    #[test]
    fn test_fill_ratio() {
        let queue = LockFreeEventQueue::new(100);
        // 0.0, 0.5 and 1.0 are exactly representable, so exact comparison is fine.
        assert_eq!(queue.fill_ratio(), 0.0);

        for id in 0..50 {
            queue.try_push(create_test_event(id)).unwrap();
        }
        assert_eq!(queue.fill_ratio(), 0.5);

        for id in 50..100 {
            queue.try_push(create_test_event(id)).unwrap();
        }
        assert_eq!(queue.fill_ratio(), 1.0);
    }

    /// Two producers pushing through cloned handles must not lose any event.
    #[test]
    fn test_concurrent_producers() {
        let queue = LockFreeEventQueue::new(10000);

        let first_handle = queue.clone();
        let first = thread::spawn(move || {
            for id in 0..1000 {
                // Capacity (10000) far exceeds the 2000 events pushed and nothing
                // consumes, so a failed push would be a real bug.
                first_handle
                    .try_push(create_test_event(id))
                    .expect("queue has spare capacity");
            }
        });

        let second_handle = queue.clone();
        let second = thread::spawn(move || {
            for id in 1000..2000 {
                second_handle
                    .try_push(create_test_event(id))
                    .expect("queue has spare capacity");
            }
        });

        first.join().unwrap();
        second.join().unwrap();

        // Both producers have finished, so the length is exact.
        assert_eq!(queue.len(), 2000);
    }

    /// One producer and one consumer exchange 500 events through a shared queue.
    #[test]
    fn test_concurrent_producers_and_consumers() {
        let queue = LockFreeEventQueue::new(1000);
        let produced = Arc::new(AtomicUsize::new(0));
        let consumed = Arc::new(AtomicUsize::new(0));

        let producer = {
            let queue = queue.clone();
            let produced = produced.clone();
            thread::spawn(move || {
                for id in 0..500 {
                    // Retry until a slot is free; each attempt builds a fresh event
                    // because a rejected one is dropped by `try_push`.
                    while queue.try_push(create_test_event(id)).is_err() {
                        thread::yield_now();
                    }
                    produced.fetch_add(1, Ordering::Relaxed);
                }
            })
        };

        let consumer = {
            let queue = queue.clone();
            let consumed = consumed.clone();
            thread::spawn(move || {
                let mut received = 0;
                while received < 500 {
                    if queue.try_pop().is_some() {
                        received += 1;
                        consumed.fetch_add(1, Ordering::Relaxed);
                    } else {
                        thread::yield_now();
                    }
                }
            })
        };

        producer.join().unwrap();
        consumer.join().unwrap();

        // The counters are read only after both threads have been joined.
        assert_eq!(produced.load(Ordering::Relaxed), 500);
        assert_eq!(consumed.load(Ordering::Relaxed), 500);
        assert!(queue.is_empty());
    }
}