allsource_core/infrastructure/persistence/lock_free/
queue.rs1use crate::{
2 domain::entities::Event,
3 error::{AllSourceError, Result},
4};
5use crossbeam::queue::ArrayQueue;
6use std::sync::Arc;
7
8#[derive(Clone)]
39pub struct LockFreeEventQueue {
40 queue: Arc<ArrayQueue<Event>>,
41 capacity: usize,
42}
43
44impl LockFreeEventQueue {
45 pub fn new(capacity: usize) -> Self {
55 Self {
56 queue: Arc::new(ArrayQueue::new(capacity)),
57 capacity,
58 }
59 }
60
61 pub fn try_push(&self, event: Event) -> Result<()> {
72 self.queue.push(event).map_err(|_| {
73 AllSourceError::QueueFull(format!("Event queue at capacity ({})", self.capacity))
74 })
75 }
76
77 pub fn try_pop(&self) -> Option<Event> {
86 self.queue.pop()
87 }
88
89 pub fn len(&self) -> usize {
94 self.queue.len()
95 }
96
97 pub fn is_empty(&self) -> bool {
101 self.queue.is_empty()
102 }
103
104 pub fn is_full(&self) -> bool {
108 self.queue.len() == self.capacity
109 }
110
111 pub fn capacity(&self) -> usize {
113 self.capacity
114 }
115
116 pub fn fill_ratio(&self) -> f64 {
118 self.len() as f64 / self.capacity as f64
119 }
120}
121
122#[cfg(test)]
123mod tests {
124 use super::*;
125 use serde_json::json;
126 use std::{
127 sync::atomic::{AtomicUsize, Ordering},
128 thread,
129 };
130
131 fn create_test_event(id: u32) -> Event {
132 Event::from_strings(
133 "test.event".to_string(),
134 format!("entity-{}", id),
135 "default".to_string(),
136 json!({"id": id}),
137 None,
138 )
139 .unwrap()
140 }
141
142 #[test]
143 fn test_create_queue() {
144 let queue = LockFreeEventQueue::new(100);
145 assert_eq!(queue.capacity(), 100);
146 assert_eq!(queue.len(), 0);
147 assert!(queue.is_empty());
148 assert!(!queue.is_full());
149 }
150
151 #[test]
152 fn test_push_and_pop() {
153 let queue = LockFreeEventQueue::new(10);
154
155 let event = create_test_event(1);
156 queue.try_push(event.clone()).unwrap();
157
158 assert_eq!(queue.len(), 1);
159 assert!(!queue.is_empty());
160
161 let popped = queue.try_pop().unwrap();
162 assert_eq!(popped.entity_id(), event.entity_id());
163 assert!(queue.is_empty());
164 }
165
166 #[test]
167 fn test_queue_full() {
168 let queue = LockFreeEventQueue::new(3);
169
170 queue.try_push(create_test_event(1)).unwrap();
172 queue.try_push(create_test_event(2)).unwrap();
173 queue.try_push(create_test_event(3)).unwrap();
174
175 assert!(queue.is_full());
176
177 let result = queue.try_push(create_test_event(4));
179 assert!(result.is_err());
180 assert!(matches!(result, Err(AllSourceError::QueueFull(_))));
181 }
182
183 #[test]
184 fn test_pop_empty_queue() {
185 let queue = LockFreeEventQueue::new(10);
186 assert!(queue.try_pop().is_none());
187 }
188
189 #[test]
190 fn test_multiple_push_pop() {
191 let queue = LockFreeEventQueue::new(100);
192
193 for i in 0..10 {
195 queue.try_push(create_test_event(i)).unwrap();
196 }
197
198 assert_eq!(queue.len(), 10);
199
200 let mut count = 0;
202 while queue.try_pop().is_some() {
203 count += 1;
204 }
205
206 assert_eq!(count, 10);
207 assert!(queue.is_empty());
208 }
209
210 #[test]
211 fn test_fill_ratio() {
212 let queue = LockFreeEventQueue::new(100);
213
214 assert_eq!(queue.fill_ratio(), 0.0);
215
216 for i in 0..50 {
217 queue.try_push(create_test_event(i)).unwrap();
218 }
219
220 assert_eq!(queue.fill_ratio(), 0.5);
221
222 for i in 50..100 {
223 queue.try_push(create_test_event(i)).unwrap();
224 }
225
226 assert_eq!(queue.fill_ratio(), 1.0);
227 }
228
229 #[test]
230 fn test_concurrent_producers() {
231 let queue = LockFreeEventQueue::new(10000);
232 let queue_clone1 = queue.clone();
233 let queue_clone2 = queue.clone();
234
235 let handle1 = thread::spawn(move || {
236 for i in 0..1000 {
237 let _ = queue_clone1.try_push(create_test_event(i));
238 }
239 });
240
241 let handle2 = thread::spawn(move || {
242 for i in 1000..2000 {
243 let _ = queue_clone2.try_push(create_test_event(i));
244 }
245 });
246
247 handle1.join().unwrap();
248 handle2.join().unwrap();
249
250 let final_len = queue.len();
252 assert!((1900..=2000).contains(&final_len));
253 }
254
255 #[test]
256 fn test_concurrent_producers_and_consumers() {
257 let queue = LockFreeEventQueue::new(1000);
258 let produced = Arc::new(AtomicUsize::new(0));
259 let consumed = Arc::new(AtomicUsize::new(0));
260
261 let queue_prod = queue.clone();
263 let produced_clone = produced.clone();
264 let producer = thread::spawn(move || {
265 for i in 0..500 {
266 while queue_prod.try_push(create_test_event(i)).is_err() {
267 thread::yield_now();
269 }
270 produced_clone.fetch_add(1, Ordering::Relaxed);
271 }
272 });
273
274 let queue_cons = queue.clone();
276 let consumed_clone = consumed.clone();
277 let consumer = thread::spawn(move || {
278 let mut count = 0;
279 while count < 500 {
280 if queue_cons.try_pop().is_some() {
281 count += 1;
282 consumed_clone.fetch_add(1, Ordering::Relaxed);
283 } else {
284 thread::yield_now();
285 }
286 }
287 });
288
289 producer.join().unwrap();
290 consumer.join().unwrap();
291
292 assert_eq!(produced.load(Ordering::Relaxed), 500);
293 assert_eq!(consumed.load(Ordering::Relaxed), 500);
294 assert!(queue.is_empty());
295 }
296}