#![cfg(all(feature = "parallel", not(target_arch = "wasm32")))]
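//! Lock-free concurrency primitives: cache-padded counters and statistics,
//! object and vector pools, a bounded work queue, and a simple batch
//! processor built on `crossbeam` queues.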

use crossbeam::queue::{ArrayQueue, SegQueue};
use crossbeam::utils::CachePadded;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

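/// A cache-line-padded atomic counter for low-overhead, relaxed-ordering counting.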
#[repr(align(64))]
pub struct LockFreeCounter {
    value: CachePadded<AtomicU64>,
}

impl LockFreeCounter {
    pub fn new(initial: u64) -> Self {
        Self {
            value: CachePadded::new(AtomicU64::new(initial)),
        }
    }

    #[inline]
    pub fn increment(&self) -> u64 {
        self.value.fetch_add(1, Ordering::Relaxed)
    }

    #[inline]
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }

    #[inline]
    pub fn add(&self, delta: u64) -> u64 {
        self.value.fetch_add(delta, Ordering::Relaxed)
    }
}

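/// Aggregated operation counters, each padded onto its own cache line to
/// avoid false sharing between concurrent writers.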
pub struct LockFreeStats {
    queries: CachePadded<AtomicU64>,
    inserts: CachePadded<AtomicU64>,
    deletes: CachePadded<AtomicU64>,
    total_latency_ns: CachePadded<AtomicU64>,
}

impl LockFreeStats {
    pub fn new() -> Self {
        Self {
            queries: CachePadded::new(AtomicU64::new(0)),
            inserts: CachePadded::new(AtomicU64::new(0)),
            deletes: CachePadded::new(AtomicU64::new(0)),
            total_latency_ns: CachePadded::new(AtomicU64::new(0)),
        }
    }

    #[inline]
    pub fn record_query(&self, latency_ns: u64) {
        self.queries.fetch_add(1, Ordering::Relaxed);
        self.total_latency_ns
            .fetch_add(latency_ns, Ordering::Relaxed);
    }

    #[inline]
    pub fn record_insert(&self) {
        self.inserts.fetch_add(1, Ordering::Relaxed);
    }

    #[inline]
    pub fn record_delete(&self) {
        self.deletes.fetch_add(1, Ordering::Relaxed);
    }

    pub fn snapshot(&self) -> StatsSnapshot {
        let queries = self.queries.load(Ordering::Relaxed);
        let total_latency = self.total_latency_ns.load(Ordering::Relaxed);

        StatsSnapshot {
            queries,
            inserts: self.inserts.load(Ordering::Relaxed),
            deletes: self.deletes.load(Ordering::Relaxed),
            avg_latency_ns: if queries > 0 {
                total_latency / queries
            } else {
                0
            },
        }
    }
}

impl Default for LockFreeStats {
    fn default() -> Self {
        Self::new()
    }
}

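/// Point-in-time view of the counters in [`LockFreeStats`].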
#[derive(Debug, Clone)]
pub struct StatsSnapshot {
    pub queries: u64,
    pub inserts: u64,
    pub deletes: u64,
    pub avg_latency_ns: u64,
}

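/// A lock-free pool of reusable objects backed by a `SegQueue`. At most
/// `capacity` objects are ever created; once that cap is reached, `acquire`
/// spin-waits until another thread returns an object.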
pub struct ObjectPool<T> {
    queue: Arc<SegQueue<T>>,
    factory: Arc<dyn Fn() -> T + Send + Sync>,
    capacity: usize,
    allocated: AtomicUsize,
}

impl<T> ObjectPool<T> {
    pub fn new<F>(capacity: usize, factory: F) -> Self
    where
        F: Fn() -> T + Send + Sync + 'static,
    {
        Self {
            queue: Arc::new(SegQueue::new()),
            factory: Arc::new(factory),
            capacity,
            allocated: AtomicUsize::new(0),
        }
    }

    pub fn acquire(&self) -> PooledObject<T> {
        let object = self.queue.pop().unwrap_or_else(|| {
            // Pool is empty: create a new object unless the allocation cap
            // has already been reached.
            let current = self.allocated.fetch_add(1, Ordering::Relaxed);
            if current < self.capacity {
                (self.factory)()
            } else {
                // Cap reached: undo the speculative increment and spin until
                // another thread returns an object to the pool.
                self.allocated.fetch_sub(1, Ordering::Relaxed);
                loop {
                    if let Some(obj) = self.queue.pop() {
                        break obj;
                    }
                    std::hint::spin_loop();
                }
            }
        });

        PooledObject {
            object: Some(object),
            pool: Arc::clone(&self.queue),
        }
    }
}

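/// RAII handle to an object borrowed from an [`ObjectPool`]; the object is
/// pushed back onto the pool's queue when the handle is dropped.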
pub struct PooledObject<T> {
    object: Option<T>,
    pool: Arc<SegQueue<T>>,
}

impl<T> PooledObject<T> {
    pub fn get(&self) -> &T {
        self.object.as_ref().unwrap()
    }

    pub fn get_mut(&mut self) -> &mut T {
        self.object.as_mut().unwrap()
    }
}

impl<T> Drop for PooledObject<T> {
    fn drop(&mut self) {
        if let Some(object) = self.object.take() {
            self.pool.push(object);
        }
    }
}

impl<T> std::ops::Deref for PooledObject<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        self.object.as_ref().unwrap()
    }
}

impl<T> std::ops::DerefMut for PooledObject<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.object.as_mut().unwrap()
    }
}

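/// Bounded multi-producer multi-consumer work queue; `try_push` hands the
/// item back to the caller when the queue is full.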
pub struct LockFreeWorkQueue<T> {
    queue: ArrayQueue<T>,
}

impl<T> LockFreeWorkQueue<T> {
    pub fn new(capacity: usize) -> Self {
        Self {
            queue: ArrayQueue::new(capacity),
        }
    }

    #[inline]
    pub fn try_push(&self, item: T) -> Result<(), T> {
        self.queue.push(item)
    }

    #[inline]
    pub fn try_pop(&self) -> Option<T> {
        self.queue.pop()
    }

    #[inline]
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }
}

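/// Pool of fixed-dimension `Vec<f32>` buffers that recycles allocations
/// across operations and tracks hit-rate statistics.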
pub struct AtomicVectorPool {
    /// Recycled buffers ready for reuse.
    pool: SegQueue<Vec<f32>>,
    /// Dimensionality of every vector handed out by this pool.
    dimensions: usize,
    /// Upper bound on how many buffers the pool will retain.
    max_size: usize,
    /// Number of buffers currently sitting in the pool.
    size: AtomicUsize,
    /// Total number of `acquire` calls.
    total_allocations: AtomicU64,
    /// `acquire` calls satisfied by a recycled buffer.
    pool_hits: AtomicU64,
}

impl AtomicVectorPool {
    pub fn new(dimensions: usize, initial_size: usize, max_size: usize) -> Self {
        let pool = SegQueue::new();

        for _ in 0..initial_size {
            pool.push(vec![0.0; dimensions]);
        }

        Self {
            pool,
            dimensions,
            max_size,
            size: AtomicUsize::new(initial_size),
            total_allocations: AtomicU64::new(0),
            pool_hits: AtomicU64::new(0),
        }
    }

    pub fn acquire(&self) -> PooledVector {
        self.total_allocations.fetch_add(1, Ordering::Relaxed);

        let vec = if let Some(mut v) = self.pool.pop() {
            // Reuse a pooled buffer: account for its removal so `size` keeps
            // matching the pool contents, then zero it out for the caller.
            self.size.fetch_sub(1, Ordering::Relaxed);
            self.pool_hits.fetch_add(1, Ordering::Relaxed);
            v.fill(0.0);
            v
        } else {
            vec![0.0; self.dimensions]
        };

        PooledVector {
            vec: Some(vec),
            pool: self,
        }
    }

    fn return_to_pool(&self, vec: Vec<f32>) {
        // Best-effort cap: the check and the push are not atomic together,
        // so the pool may briefly exceed `max_size` under contention.
        let current_size = self.size.load(Ordering::Relaxed);
        if current_size < self.max_size {
            self.pool.push(vec);
            self.size.fetch_add(1, Ordering::Relaxed);
        }
    }

    pub fn stats(&self) -> VectorPoolStats {
        let total = self.total_allocations.load(Ordering::Relaxed);
        let hits = self.pool_hits.load(Ordering::Relaxed);
        let hit_rate = if total > 0 {
            hits as f64 / total as f64
        } else {
            0.0
        };

        VectorPoolStats {
            total_allocations: total,
            pool_hits: hits,
            hit_rate,
            current_size: self.size.load(Ordering::Relaxed),
            max_size: self.max_size,
        }
    }

    pub fn dimensions(&self) -> usize {
        self.dimensions
    }
}

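/// Usage statistics reported by [`AtomicVectorPool::stats`].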
#[derive(Debug, Clone)]
pub struct VectorPoolStats {
    pub total_allocations: u64,
    pub pool_hits: u64,
    pub hit_rate: f64,
    pub current_size: usize,
    pub max_size: usize,
}

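/// RAII guard over a `Vec<f32>` borrowed from an [`AtomicVectorPool`]; the
/// buffer returns to the pool on drop unless [`PooledVector::detach`] has
/// taken ownership of it.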
pub struct PooledVector<'a> {
    vec: Option<Vec<f32>>,
    pool: &'a AtomicVectorPool,
}

impl<'a> PooledVector<'a> {
    pub fn as_slice(&self) -> &[f32] {
        self.vec.as_ref().unwrap()
    }

    pub fn as_mut_slice(&mut self) -> &mut [f32] {
        self.vec.as_mut().unwrap()
    }

    pub fn copy_from(&mut self, src: &[f32]) {
        let vec = self.vec.as_mut().unwrap();
        assert_eq!(vec.len(), src.len(), "Dimension mismatch");
        vec.copy_from_slice(src);
    }

    pub fn detach(mut self) -> Vec<f32> {
        self.vec.take().unwrap()
    }
}

impl<'a> Drop for PooledVector<'a> {
    fn drop(&mut self) {
        if let Some(vec) = self.vec.take() {
            self.pool.return_to_pool(vec);
        }
    }
}

impl<'a> std::ops::Deref for PooledVector<'a> {
    type Target = [f32];

    fn deref(&self) -> &[f32] {
        self.as_slice()
    }
}

impl<'a> std::ops::DerefMut for PooledVector<'a> {
    fn deref_mut(&mut self) -> &mut [f32] {
        self.as_mut_slice()
    }
}

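/// Fan-out/fan-in batch pipeline: producers submit `BatchItem`s into a
/// bounded work queue, workers pull items and push `BatchResult`s, and a
/// collector drains the results once `is_done` reports completion.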
pub struct LockFreeBatchProcessor {
    work_queue: ArrayQueue<BatchItem>,
    results_queue: SegQueue<BatchResult>,
    pending: AtomicUsize,
    completed: AtomicUsize,
}

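/// A unit of work: an identifier plus the vector payload to process.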
#[derive(Debug)]
pub struct BatchItem {
    pub id: u64,
    pub data: Vec<f32>,
}

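/// The processed output for the [`BatchItem`] with the matching `id`.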
pub struct BatchResult {
    pub id: u64,
    pub result: Vec<f32>,
}

impl LockFreeBatchProcessor {
    pub fn new(capacity: usize) -> Self {
        Self {
            work_queue: ArrayQueue::new(capacity),
            results_queue: SegQueue::new(),
            pending: AtomicUsize::new(0),
            completed: AtomicUsize::new(0),
        }
    }

    pub fn submit(&self, item: BatchItem) -> Result<(), BatchItem> {
        // Only count the item as pending if it actually made it into the
        // bounded queue; a full queue hands the item back to the caller.
        match self.work_queue.push(item) {
            Ok(()) => {
                self.pending.fetch_add(1, Ordering::Relaxed);
                Ok(())
            }
            Err(item) => Err(item),
        }
    }

    pub fn try_get_work(&self) -> Option<BatchItem> {
        self.work_queue.pop()
    }

    pub fn submit_result(&self, result: BatchResult) {
        self.completed.fetch_add(1, Ordering::Relaxed);
        self.results_queue.push(result);
    }

    pub fn collect_results(&self) -> Vec<BatchResult> {
        let mut results = Vec::new();
        while let Some(result) = self.results_queue.pop() {
            results.push(result);
        }
        results
    }

    pub fn pending(&self) -> usize {
        self.pending.load(Ordering::Relaxed)
    }

    pub fn completed(&self) -> usize {
        self.completed.load(Ordering::Relaxed)
    }

    pub fn is_done(&self) -> bool {
        self.pending() == self.completed()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_lockfree_counter() {
        let counter = Arc::new(LockFreeCounter::new(0));
        let mut handles = vec![];

        for _ in 0..10 {
            let counter_clone = Arc::clone(&counter);
            handles.push(thread::spawn(move || {
                for _ in 0..1000 {
                    counter_clone.increment();
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(counter.get(), 10000);
    }

    #[test]
    fn test_object_pool() {
        let pool = ObjectPool::new(4, || Vec::<u8>::with_capacity(1024));

        let mut obj1 = pool.acquire();
        obj1.push(1);
        assert_eq!(obj1.len(), 1);

        drop(obj1);

        let obj2 = pool.acquire();
        assert!(obj2.capacity() >= 1024);
    }

    #[test]
    fn test_stats_collector() {
        let stats = LockFreeStats::new();

        stats.record_query(1000);
        stats.record_query(2000);
        stats.record_insert();

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.queries, 2);
        assert_eq!(snapshot.inserts, 1);
        assert_eq!(snapshot.avg_latency_ns, 1500);
    }

    #[test]
    fn test_atomic_vector_pool() {
        let pool = AtomicVectorPool::new(4, 2, 10);

        let mut v1 = pool.acquire();
        v1.copy_from(&[1.0, 2.0, 3.0, 4.0]);
        assert_eq!(v1.as_slice(), &[1.0, 2.0, 3.0, 4.0]);

        let mut v2 = pool.acquire();
        v2.copy_from(&[5.0, 6.0, 7.0, 8.0]);

        let stats = pool.stats();
        assert_eq!(stats.total_allocations, 2);
    }

    #[test]
    fn test_vector_pool_reuse() {
        let pool = AtomicVectorPool::new(3, 1, 5);

        {
            let mut v = pool.acquire();
            v.copy_from(&[1.0, 2.0, 3.0]);
        }

        let _v2 = pool.acquire();

        let stats = pool.stats();
        assert_eq!(stats.total_allocations, 2);
        assert!(stats.pool_hits >= 1, "Should have at least one pool hit");
    }

    #[test]
    fn test_batch_processor() {
        let processor = LockFreeBatchProcessor::new(10);

        processor
            .submit(BatchItem {
                id: 1,
                data: vec![1.0, 2.0],
            })
            .unwrap();
        processor
            .submit(BatchItem {
                id: 2,
                data: vec![3.0, 4.0],
            })
            .unwrap();

        assert_eq!(processor.pending(), 2);

        while let Some(item) = processor.try_get_work() {
            let result = BatchResult {
                id: item.id,
                result: item.data.iter().map(|x| x * 2.0).collect(),
            };
            processor.submit_result(result);
        }

        assert!(processor.is_done());
        assert_eq!(processor.completed(), 2);

        let results = processor.collect_results();
        assert_eq!(results.len(), 2);
    }
}