// allsource_core/infrastructure/persistence/lock_free/queue.rs
use crossbeam::queue::ArrayQueue;
use crate::domain::entities::Event;
use crate::error::{AllSourceError, Result};
use std::sync::Arc;
5
6#[derive(Clone)]
37pub struct LockFreeEventQueue {
38 queue: Arc<ArrayQueue<Event>>,
39 capacity: usize,
40}
41
42impl LockFreeEventQueue {
43 pub fn new(capacity: usize) -> Self {
53 Self {
54 queue: Arc::new(ArrayQueue::new(capacity)),
55 capacity,
56 }
57 }
58
59 pub fn try_push(&self, event: Event) -> Result<()> {
70 self.queue
71 .push(event)
72 .map_err(|_| AllSourceError::QueueFull(
73 format!("Event queue at capacity ({})", self.capacity)
74 ))
75 }
76
77 pub fn try_pop(&self) -> Option<Event> {
86 self.queue.pop()
87 }
88
89 pub fn len(&self) -> usize {
94 self.queue.len()
95 }
96
97 pub fn is_empty(&self) -> bool {
101 self.queue.is_empty()
102 }
103
104 pub fn is_full(&self) -> bool {
108 self.queue.len() == self.capacity
109 }
110
111 pub fn capacity(&self) -> usize {
113 self.capacity
114 }
115
116 pub fn fill_ratio(&self) -> f64 {
118 self.len() as f64 / self.capacity as f64
119 }
120}
121
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;

    /// Builds an `Event` through `Event::from_strings`, deriving the
    /// `entity-{id}` string and the JSON payload from `id`.
    fn create_test_event(id: u32) -> Event {
        Event::from_strings(
            "test.event".to_string(),
            format!("entity-{}", id),
            "default".to_string(),
            json!({"id": id}),
            None,
        )
        .unwrap()
    }

    /// A fresh queue reports its capacity and is neither populated nor full.
    #[test]
    fn test_create_queue() {
        let queue = LockFreeEventQueue::new(100);
        assert_eq!(queue.capacity(), 100);
        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
        assert!(!queue.is_full());
    }

    /// A pushed event comes back out of `try_pop` with the same entity id.
    #[test]
    fn test_push_and_pop() {
        let queue = LockFreeEventQueue::new(10);

        let event = create_test_event(1);
        queue.try_push(event.clone()).unwrap();

        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());

        let popped = queue.try_pop().unwrap();
        assert_eq!(popped.entity_id(), event.entity_id());
        assert!(queue.is_empty());
    }

    /// Pushing beyond capacity fails with `AllSourceError::QueueFull`.
    #[test]
    fn test_queue_full() {
        let queue = LockFreeEventQueue::new(3);

        for id in 1..=3 {
            queue.try_push(create_test_event(id)).unwrap();
        }

        assert!(queue.is_full());

        let result = queue.try_push(create_test_event(4));
        assert!(matches!(result, Err(AllSourceError::QueueFull(_))));
    }

    /// Popping from an empty queue yields `None`.
    #[test]
    fn test_pop_empty_queue() {
        let queue = LockFreeEventQueue::new(10);
        assert!(queue.try_pop().is_none());
    }

    /// Every pushed event can be drained again, leaving the queue empty.
    #[test]
    fn test_multiple_push_pop() {
        let queue = LockFreeEventQueue::new(100);

        for i in 0..10 {
            queue.try_push(create_test_event(i)).unwrap();
        }

        assert_eq!(queue.len(), 10);

        // Drain until `try_pop` reports empty and count what came out.
        let count = std::iter::from_fn(|| queue.try_pop()).count();

        assert_eq!(count, 10);
        assert!(queue.is_empty());
    }

    /// `fill_ratio` tracks length / capacity at empty, half and full.
    #[test]
    fn test_fill_ratio() {
        let queue = LockFreeEventQueue::new(100);

        // 0/100, 50/100 and 100/100 are exactly representable in f64, so
        // exact float comparison is safe here.
        assert_eq!(queue.fill_ratio(), 0.0);

        for i in 0..50 {
            queue.try_push(create_test_event(i)).unwrap();
        }

        assert_eq!(queue.fill_ratio(), 0.5);

        for i in 50..100 {
            queue.try_push(create_test_event(i)).unwrap();
        }

        assert_eq!(queue.fill_ratio(), 1.0);
    }

    /// Two producer threads pushing through cloned handles lose no events.
    #[test]
    fn test_concurrent_producers() {
        let queue = LockFreeEventQueue::new(10000);
        let queue_clone1 = queue.clone();
        let queue_clone2 = queue.clone();

        let handle1 = thread::spawn(move || {
            for i in 0..1000 {
                queue_clone1
                    .try_push(create_test_event(i))
                    .expect("queue capacity exceeds the total number of pushes");
            }
        });

        let handle2 = thread::spawn(move || {
            for i in 1000..2000 {
                queue_clone2
                    .try_push(create_test_event(i))
                    .expect("queue capacity exceeds the total number of pushes");
            }
        });

        handle1.join().unwrap();
        handle2.join().unwrap();

        // 2000 pushes into 10000 slots with no consumer: none may be rejected,
        // so anything other than exactly 2000 means an event was lost.
        assert_eq!(queue.len(), 2000);
    }

    /// One producer and one consumer exchange 500 events through a shared queue.
    #[test]
    fn test_concurrent_producers_and_consumers() {
        let queue = LockFreeEventQueue::new(1000);
        let produced = Arc::new(AtomicUsize::new(0));
        let consumed = Arc::new(AtomicUsize::new(0));

        let queue_prod = queue.clone();
        let produced_clone = produced.clone();
        let producer = thread::spawn(move || {
            for i in 0..500 {
                // Retry until the push is accepted.
                while queue_prod.try_push(create_test_event(i)).is_err() {
                    thread::yield_now();
                }
                produced_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        let queue_cons = queue.clone();
        let consumed_clone = consumed.clone();
        let consumer = thread::spawn(move || {
            let mut count = 0;
            while count < 500 {
                if queue_cons.try_pop().is_some() {
                    count += 1;
                    consumed_clone.fetch_add(1, Ordering::Relaxed);
                } else {
                    // Nothing available yet; let the producer run.
                    thread::yield_now();
                }
            }
        });

        producer.join().unwrap();
        consumer.join().unwrap();

        assert_eq!(produced.load(Ordering::Relaxed), 500);
        assert_eq!(consumed.load(Ordering::Relaxed), 500);
        assert!(queue.is_empty());
    }
}