//! Lock-free bounded event queue.
//!
//! Path: allsource_core/infrastructure/persistence/lock_free/queue.rs

1use crate::{
2    domain::entities::Event,
3    error::{AllSourceError, Result},
4};
5use crossbeam::queue::ArrayQueue;
6use std::sync::Arc;
7
/// Lock-free bounded event queue for high-throughput ingestion
///
/// # Design Pattern
/// Uses crossbeam's lock-free MPMC (Multi-Producer, Multi-Consumer) queue
/// to eliminate contention in the hot path of event ingestion.
///
/// # Benefits
/// - **Zero Lock Contention**: Multiple threads can push/pop concurrently
/// - **Predictable Latency**: No waiting for locks
/// - **High Throughput**: Optimized for concurrent access
/// - **Backpressure Handling**: Returns error when full
///
/// # Performance
/// - Push: ~10-20ns (lock-free)
/// - Pop: ~10-20ns (lock-free)
/// - vs RwLock: 100-500ns (with contention)
///
/// # Example
/// ```ignore
/// let queue = LockFreeEventQueue::new(10000);
///
/// // Multiple producers can push concurrently
/// let event = Event::from_strings(...)?;
/// queue.try_push(event)?;
///
/// // Multiple consumers can pop concurrently
/// if let Some(event) = queue.try_pop() {
///     // Process event
/// }
/// ```
#[derive(Clone)]
pub struct LockFreeEventQueue {
    // Shared lock-free MPMC ring buffer. `Clone` on this struct is cheap:
    // it only bumps the `Arc` refcount, so all clones share the same queue.
    queue: Arc<ArrayQueue<Event>>,
    // Capacity cached at construction so accessors don't need to go
    // through the inner queue.
    capacity: usize,
}
43
44impl LockFreeEventQueue {
45    /// Create new lock-free event queue with fixed capacity
46    ///
47    /// # Arguments
48    /// - `capacity`: Maximum number of events the queue can hold
49    ///
50    /// # Capacity Guidelines
51    /// - Small: 1,000-10,000 events (low memory, fast overflow)
52    /// - Medium: 10,000-100,000 events (balanced)
53    /// - Large: 100,000-1,000,000 events (high memory, slow overflow)
54    pub fn new(capacity: usize) -> Self {
55        Self {
56            queue: Arc::new(ArrayQueue::new(capacity)),
57            capacity,
58        }
59    }
60
61    /// Try to push an event to the queue (non-blocking)
62    ///
63    /// Returns an error if the queue is full. Callers should implement
64    /// backpressure handling (e.g., retry with exponential backoff, or
65    /// return HTTP 503 Service Unavailable).
66    ///
67    /// # Performance
68    /// - Lock-free operation (~10-20ns)
69    /// - No waiting on contention
70    /// - Constant time O(1)
71    pub fn try_push(&self, event: Event) -> Result<()> {
72        self.queue.push(event).map_err(|_| {
73            AllSourceError::QueueFull(format!("Event queue at capacity ({})", self.capacity))
74        })
75    }
76
77    /// Try to pop an event from the queue (non-blocking)
78    ///
79    /// Returns `None` if the queue is empty.
80    ///
81    /// # Performance
82    /// - Lock-free operation (~10-20ns)
83    /// - No waiting on contention
84    /// - Constant time O(1)
85    pub fn try_pop(&self) -> Option<Event> {
86        self.queue.pop()
87    }
88
89    /// Get current queue length
90    ///
91    /// Note: This is approximate in concurrent scenarios due to race
92    /// conditions between length check and actual push/pop operations.
93    pub fn len(&self) -> usize {
94        self.queue.len()
95    }
96
97    /// Check if queue is empty
98    ///
99    /// Note: Result may be stale in concurrent scenarios.
100    pub fn is_empty(&self) -> bool {
101        self.queue.is_empty()
102    }
103
104    /// Check if queue is full
105    ///
106    /// Note: Result may be stale in concurrent scenarios.
107    pub fn is_full(&self) -> bool {
108        self.queue.len() == self.capacity
109    }
110
111    /// Get queue capacity
112    pub fn capacity(&self) -> usize {
113        self.capacity
114    }
115
116    /// Get approximate fill percentage (0.0 to 1.0)
117    pub fn fill_ratio(&self) -> f64 {
118        self.len() as f64 / self.capacity as f64
119    }
120}
121
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::{
        sync::atomic::{AtomicUsize, Ordering},
        thread,
    };

    /// Build a minimal valid event with a unique entity id for test use.
    fn create_test_event(id: u32) -> Event {
        Event::from_strings(
            "test.event".to_string(),
            format!("entity-{}", id),
            "default".to_string(),
            json!({"id": id}),
            None,
        )
        .unwrap()
    }

    #[test]
    fn test_create_queue() {
        let queue = LockFreeEventQueue::new(100);
        assert_eq!(queue.capacity(), 100);
        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
        assert!(!queue.is_full());
    }

    #[test]
    fn test_push_and_pop() {
        let queue = LockFreeEventQueue::new(10);

        let event = create_test_event(1);
        queue.try_push(event.clone()).unwrap();

        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());

        let popped = queue.try_pop().unwrap();
        assert_eq!(popped.entity_id(), event.entity_id());
        assert!(queue.is_empty());
    }

    #[test]
    fn test_queue_full() {
        let queue = LockFreeEventQueue::new(3);

        // Fill queue
        queue.try_push(create_test_event(1)).unwrap();
        queue.try_push(create_test_event(2)).unwrap();
        queue.try_push(create_test_event(3)).unwrap();

        assert!(queue.is_full());

        // Try to push when full
        let result = queue.try_push(create_test_event(4));
        assert!(result.is_err());
        assert!(matches!(result, Err(AllSourceError::QueueFull(_))));
    }

    #[test]
    fn test_pop_empty_queue() {
        let queue = LockFreeEventQueue::new(10);
        assert!(queue.try_pop().is_none());
    }

    #[test]
    fn test_multiple_push_pop() {
        let queue = LockFreeEventQueue::new(100);

        // Push 10 events
        for i in 0..10 {
            queue.try_push(create_test_event(i)).unwrap();
        }

        assert_eq!(queue.len(), 10);

        // Pop all events
        let mut count = 0;
        while queue.try_pop().is_some() {
            count += 1;
        }

        assert_eq!(count, 10);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_fill_ratio() {
        let queue = LockFreeEventQueue::new(100);

        assert_eq!(queue.fill_ratio(), 0.0);

        for i in 0..50 {
            queue.try_push(create_test_event(i)).unwrap();
        }

        assert_eq!(queue.fill_ratio(), 0.5);

        for i in 50..100 {
            queue.try_push(create_test_event(i)).unwrap();
        }

        assert_eq!(queue.fill_ratio(), 1.0);
    }

    #[test]
    fn test_concurrent_producers() {
        // Capacity (10,000) far exceeds the 2,000 events pushed, so no
        // try_push can ever fail here — assert an exact count and unwrap
        // the pushes, so an unexpected QueueFull fails the test instead of
        // being silently swallowed.
        let queue = LockFreeEventQueue::new(10000);
        let queue_clone1 = queue.clone();
        let queue_clone2 = queue.clone();

        let handle1 = thread::spawn(move || {
            for i in 0..1000 {
                queue_clone1.try_push(create_test_event(i)).unwrap();
            }
        });

        let handle2 = thread::spawn(move || {
            for i in 1000..2000 {
                queue_clone2.try_push(create_test_event(i)).unwrap();
            }
        });

        handle1.join().unwrap();
        handle2.join().unwrap();

        // Every push must have landed: the queue was never near capacity.
        assert_eq!(queue.len(), 2000);
    }

    #[test]
    fn test_concurrent_producers_and_consumers() {
        let queue = LockFreeEventQueue::new(1000);
        let produced = Arc::new(AtomicUsize::new(0));
        let consumed = Arc::new(AtomicUsize::new(0));

        // Producer thread: retries on QueueFull (backpressure loop).
        let queue_prod = queue.clone();
        let produced_clone = produced.clone();
        let producer = thread::spawn(move || {
            for i in 0..500 {
                while queue_prod.try_push(create_test_event(i)).is_err() {
                    // Retry if queue is full
                    thread::yield_now();
                }
                produced_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        // Consumer thread: spins until it has drained exactly 500 events.
        let queue_cons = queue.clone();
        let consumed_clone = consumed.clone();
        let consumer = thread::spawn(move || {
            let mut count = 0;
            while count < 500 {
                if queue_cons.try_pop().is_some() {
                    count += 1;
                    consumed_clone.fetch_add(1, Ordering::Relaxed);
                } else {
                    thread::yield_now();
                }
            }
        });

        producer.join().unwrap();
        consumer.join().unwrap();

        assert_eq!(produced.load(Ordering::Relaxed), 500);
        assert_eq!(consumed.load(Ordering::Relaxed), 500);
        assert!(queue.is_empty());
    }
}