allsource_core/infrastructure/persistence/lock_free/
queue.rs1use crate::domain::entities::Event;
2use crate::error::{AllSourceError, Result};
3use crossbeam::queue::ArrayQueue;
4use std::sync::Arc;
5
6#[derive(Clone)]
37pub struct LockFreeEventQueue {
38 queue: Arc<ArrayQueue<Event>>,
39 capacity: usize,
40}
41
42impl LockFreeEventQueue {
43 pub fn new(capacity: usize) -> Self {
53 Self {
54 queue: Arc::new(ArrayQueue::new(capacity)),
55 capacity,
56 }
57 }
58
59 pub fn try_push(&self, event: Event) -> Result<()> {
70 self.queue.push(event).map_err(|_| {
71 AllSourceError::QueueFull(format!("Event queue at capacity ({})", self.capacity))
72 })
73 }
74
75 pub fn try_pop(&self) -> Option<Event> {
84 self.queue.pop()
85 }
86
87 pub fn len(&self) -> usize {
92 self.queue.len()
93 }
94
95 pub fn is_empty(&self) -> bool {
99 self.queue.is_empty()
100 }
101
102 pub fn is_full(&self) -> bool {
106 self.queue.len() == self.capacity
107 }
108
109 pub fn capacity(&self) -> usize {
111 self.capacity
112 }
113
114 pub fn fill_ratio(&self) -> f64 {
116 self.len() as f64 / self.capacity as f64
117 }
118}
119
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;

    /// Builds a minimal event whose entity id and payload are derived from `id`.
    fn create_test_event(id: u32) -> Event {
        Event::from_strings(
            "test.event".to_string(),
            format!("entity-{}", id),
            "default".to_string(),
            json!({"id": id}),
            None,
        )
        .unwrap()
    }

    /// A freshly created queue reports its capacity and is empty.
    #[test]
    fn test_create_queue() {
        let queue = LockFreeEventQueue::new(100);
        assert_eq!(queue.capacity(), 100);
        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
        assert!(!queue.is_full());
    }

    /// A pushed event comes back out of the queue.
    #[test]
    fn test_push_and_pop() {
        let queue = LockFreeEventQueue::new(10);

        let event = create_test_event(1);
        queue.try_push(event.clone()).unwrap();

        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());

        let popped = queue.try_pop().unwrap();
        assert_eq!(popped.entity_id(), event.entity_id());
        assert!(queue.is_empty());
    }

    /// Pushing into a full queue yields `QueueFull`.
    #[test]
    fn test_queue_full() {
        let queue = LockFreeEventQueue::new(3);

        queue.try_push(create_test_event(1)).unwrap();
        queue.try_push(create_test_event(2)).unwrap();
        queue.try_push(create_test_event(3)).unwrap();

        assert!(queue.is_full());

        let result = queue.try_push(create_test_event(4));
        assert!(result.is_err());
        assert!(matches!(result, Err(AllSourceError::QueueFull(_))));
    }

    /// Popping from an empty queue returns `None`.
    #[test]
    fn test_pop_empty_queue() {
        let queue = LockFreeEventQueue::new(10);
        assert!(queue.try_pop().is_none());
    }

    /// Everything that was pushed can be drained again.
    #[test]
    fn test_multiple_push_pop() {
        let queue = LockFreeEventQueue::new(100);

        for i in 0..10 {
            queue.try_push(create_test_event(i)).unwrap();
        }

        assert_eq!(queue.len(), 10);

        let mut count = 0;
        while queue.try_pop().is_some() {
            count += 1;
        }

        assert_eq!(count, 10);
        assert!(queue.is_empty());
    }

    /// The fill ratio tracks occupancy at empty, half-full and full.
    #[test]
    fn test_fill_ratio() {
        let queue = LockFreeEventQueue::new(100);

        // 0/100, 50/100 and 100/100 are exactly representable in `f64`, so exact
        // equality is safe here.
        assert_eq!(queue.fill_ratio(), 0.0);

        for i in 0..50 {
            queue.try_push(create_test_event(i)).unwrap();
        }

        assert_eq!(queue.fill_ratio(), 0.5);

        for i in 50..100 {
            queue.try_push(create_test_event(i)).unwrap();
        }

        assert_eq!(queue.fill_ratio(), 1.0);
    }

    /// Two producers pushing through cloned handles must not lose any event.
    #[test]
    fn test_concurrent_producers() {
        let queue = LockFreeEventQueue::new(10000);
        let queue_clone1 = queue.clone();
        let queue_clone2 = queue.clone();

        // Capacity (10000) far exceeds the 2000 events pushed, so the queue can never be
        // full and every push has to succeed. A failing push panics the worker thread,
        // which `join().unwrap()` below turns into a test failure.
        let handle1 = thread::spawn(move || {
            for i in 0..1000 {
                queue_clone1
                    .try_push(create_test_event(i))
                    .expect("queue has spare capacity");
            }
        });

        let handle2 = thread::spawn(move || {
            for i in 1000..2000 {
                queue_clone2
                    .try_push(create_test_event(i))
                    .expect("queue has spare capacity");
            }
        });

        handle1.join().unwrap();
        handle2.join().unwrap();

        assert_eq!(queue.len(), 2000);
    }

    /// One producer and one consumer move 500 events through a shared queue.
    #[test]
    fn test_concurrent_producers_and_consumers() {
        let queue = LockFreeEventQueue::new(1000);
        let produced = Arc::new(AtomicUsize::new(0));
        let consumed = Arc::new(AtomicUsize::new(0));

        let queue_prod = queue.clone();
        let produced_clone = produced.clone();
        let producer = thread::spawn(move || {
            for i in 0..500 {
                // Retry until the push is accepted.
                while queue_prod.try_push(create_test_event(i)).is_err() {
                    thread::yield_now();
                }
                produced_clone.fetch_add(1, Ordering::Relaxed);
            }
        });

        let queue_cons = queue.clone();
        let consumed_clone = consumed.clone();
        let consumer = thread::spawn(move || {
            let mut count = 0;
            while count < 500 {
                if queue_cons.try_pop().is_some() {
                    count += 1;
                    consumed_clone.fetch_add(1, Ordering::Relaxed);
                } else {
                    // Nothing available yet; let the producer make progress.
                    thread::yield_now();
                }
            }
        });

        producer.join().unwrap();
        consumer.join().unwrap();

        // The counters are only read after both threads were joined.
        assert_eq!(produced.load(Ordering::Relaxed), 500);
        assert_eq!(consumed.load(Ordering::Relaxed), 500);
        assert!(queue.is_empty());
    }
}