allsource_core/infrastructure/persistence/lock_free/
queue.rs

1use crate::domain::entities::Event;
2use crate::error::{AllSourceError, Result};
3use crossbeam::queue::ArrayQueue;
4use std::sync::Arc;
5
6/// Lock-free bounded event queue for high-throughput ingestion
7///
8/// # Design Pattern
9/// Uses crossbeam's lock-free MPMC (Multi-Producer, Multi-Consumer) queue
10/// to eliminate contention in the hot path of event ingestion.
11///
12/// # Benefits
13/// - **Zero Lock Contention**: Multiple threads can push/pop concurrently
14/// - **Predictable Latency**: No waiting for locks
15/// - **High Throughput**: Optimized for concurrent access
16/// - **Backpressure Handling**: Returns error when full
17///
18/// # Performance
19/// - Push: ~10-20ns (lock-free)
20/// - Pop: ~10-20ns (lock-free)
21/// - vs RwLock: 100-500ns (with contention)
22///
23/// # Example
24/// ```ignore
25/// let queue = LockFreeEventQueue::new(10000);
26///
27/// // Multiple producers can push concurrently
28/// let event = Event::from_strings(...)?;
29/// queue.try_push(event)?;
30///
31/// // Multiple consumers can pop concurrently
32/// if let Some(event) = queue.try_pop() {
33///     // Process event
34/// }
35/// ```
36#[derive(Clone)]
37pub struct LockFreeEventQueue {
38    queue: Arc<ArrayQueue<Event>>,
39    capacity: usize,
40}
41
42impl LockFreeEventQueue {
43    /// Create new lock-free event queue with fixed capacity
44    ///
45    /// # Arguments
46    /// - `capacity`: Maximum number of events the queue can hold
47    ///
48    /// # Capacity Guidelines
49    /// - Small: 1,000-10,000 events (low memory, fast overflow)
50    /// - Medium: 10,000-100,000 events (balanced)
51    /// - Large: 100,000-1,000,000 events (high memory, slow overflow)
52    pub fn new(capacity: usize) -> Self {
53        Self {
54            queue: Arc::new(ArrayQueue::new(capacity)),
55            capacity,
56        }
57    }
58
59    /// Try to push an event to the queue (non-blocking)
60    ///
61    /// Returns an error if the queue is full. Callers should implement
62    /// backpressure handling (e.g., retry with exponential backoff, or
63    /// return HTTP 503 Service Unavailable).
64    ///
65    /// # Performance
66    /// - Lock-free operation (~10-20ns)
67    /// - No waiting on contention
68    /// - Constant time O(1)
69    pub fn try_push(&self, event: Event) -> Result<()> {
70        self.queue.push(event).map_err(|_| {
71            AllSourceError::QueueFull(format!("Event queue at capacity ({})", self.capacity))
72        })
73    }
74
75    /// Try to pop an event from the queue (non-blocking)
76    ///
77    /// Returns `None` if the queue is empty.
78    ///
79    /// # Performance
80    /// - Lock-free operation (~10-20ns)
81    /// - No waiting on contention
82    /// - Constant time O(1)
83    pub fn try_pop(&self) -> Option<Event> {
84        self.queue.pop()
85    }
86
87    /// Get current queue length
88    ///
89    /// Note: This is approximate in concurrent scenarios due to race
90    /// conditions between length check and actual push/pop operations.
91    pub fn len(&self) -> usize {
92        self.queue.len()
93    }
94
95    /// Check if queue is empty
96    ///
97    /// Note: Result may be stale in concurrent scenarios.
98    pub fn is_empty(&self) -> bool {
99        self.queue.is_empty()
100    }
101
102    /// Check if queue is full
103    ///
104    /// Note: Result may be stale in concurrent scenarios.
105    pub fn is_full(&self) -> bool {
106        self.queue.len() == self.capacity
107    }
108
109    /// Get queue capacity
110    pub fn capacity(&self) -> usize {
111        self.capacity
112    }
113
114    /// Get approximate fill percentage (0.0 to 1.0)
115    pub fn fill_ratio(&self) -> f64 {
116        self.len() as f64 / self.capacity as f64
117    }
118}
119
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;

    /// Build a minimal valid event whose entity id embeds `id` for identification.
    fn create_test_event(id: u32) -> Event {
        Event::from_strings(
            "test.event".to_string(),
            format!("entity-{}", id),
            "default".to_string(),
            json!({"id": id}),
            None,
        )
        .unwrap()
    }

    #[test]
    fn test_create_queue() {
        let queue = LockFreeEventQueue::new(100);
        assert_eq!(queue.capacity(), 100);
        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
        assert!(!queue.is_full());
    }

    #[test]
    fn test_push_and_pop() {
        let queue = LockFreeEventQueue::new(10);

        let event = create_test_event(1);
        queue.try_push(event.clone()).unwrap();

        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());

        let popped = queue.try_pop().unwrap();
        assert_eq!(popped.entity_id(), event.entity_id());
        assert!(queue.is_empty());
    }

    #[test]
    fn test_queue_full() {
        let queue = LockFreeEventQueue::new(3);

        // Fill queue
        queue.try_push(create_test_event(1)).unwrap();
        queue.try_push(create_test_event(2)).unwrap();
        queue.try_push(create_test_event(3)).unwrap();

        assert!(queue.is_full());

        // Push when full must fail with the dedicated error variant
        let result = queue.try_push(create_test_event(4));
        assert!(result.is_err());
        assert!(matches!(result, Err(AllSourceError::QueueFull(_))));
    }

    #[test]
    fn test_pop_empty_queue() {
        let queue = LockFreeEventQueue::new(10);
        assert!(queue.try_pop().is_none());
    }

    #[test]
    fn test_multiple_push_pop() {
        let queue = LockFreeEventQueue::new(100);

        // Push 10 events
        for i in 0..10 {
            queue.try_push(create_test_event(i)).unwrap();
        }

        assert_eq!(queue.len(), 10);

        // Pop all events
        let mut count = 0;
        while queue.try_pop().is_some() {
            count += 1;
        }

        assert_eq!(count, 10);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_fill_ratio() {
        let queue = LockFreeEventQueue::new(100);

        assert_eq!(queue.fill_ratio(), 0.0);

        for i in 0..50 {
            queue.try_push(create_test_event(i)).unwrap();
        }

        assert_eq!(queue.fill_ratio(), 0.5);

        for i in 50..100 {
            queue.try_push(create_test_event(i)).unwrap();
        }

        assert_eq!(queue.fill_ratio(), 1.0);
    }

    #[test]
    fn test_concurrent_producers() {
        // Capacity (10,000) exceeds the total number of pushes (2,000),
        // so no push can ever fail: unwrap each push and assert the exact
        // count. (The previous version ignored push results and accepted
        // any length in 1900..=2000, which would have masked event loss.)
        let queue = LockFreeEventQueue::new(10000);
        let queue_clone1 = queue.clone();
        let queue_clone2 = queue.clone();

        let handle1 = thread::spawn(move || {
            for i in 0..1000 {
                queue_clone1
                    .try_push(create_test_event(i))
                    .expect("queue cannot be full");
            }
        });

        let handle2 = thread::spawn(move || {
            for i in 1000..2000 {
                queue_clone2
                    .try_push(create_test_event(i))
                    .expect("queue cannot be full");
            }
        });

        handle1.join().unwrap();
        handle2.join().unwrap();

        assert_eq!(queue.len(), 2000);
    }

    #[test]
    fn test_concurrent_producers_and_consumers() {
        let queue = LockFreeEventQueue::new(1000);
        let produced = Arc::new(AtomicUsize::new(0));
        let consumed = Arc::new(AtomicUsize::new(0));

        // Producer thread: retries on backpressure until every event lands.
        let queue_prod = queue.clone();
        let produced_clone = produced.clone();
        let producer = thread::spawn(move || {
            for i in 0..500 {
                while queue_prod.try_push(create_test_event(i)).is_err() {
                    // Retry if queue is full
                    thread::yield_now();
                }
                produced_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        // Consumer thread: spins until it has drained exactly 500 events.
        let queue_cons = queue.clone();
        let consumed_clone = consumed.clone();
        let consumer = thread::spawn(move || {
            let mut count = 0;
            while count < 500 {
                if queue_cons.try_pop().is_some() {
                    count += 1;
                    consumed_clone.fetch_add(1, Ordering::Relaxed);
                } else {
                    thread::yield_now();
                }
            }
        });

        producer.join().unwrap();
        consumer.join().unwrap();

        assert_eq!(produced.load(Ordering::Relaxed), 500);
        assert_eq!(consumed.load(Ordering::Relaxed), 500);
        assert!(queue.is_empty());
    }
}
292}