
ruvector_core/lockfree.rs

//! Lock-free data structures for high-concurrency operations
//!
//! This module provides lock-free implementations of common data structures
//! to minimize contention and improve scalability.
//!
//! Note: This module requires the `parallel` feature and is not available on WASM.

#![cfg(all(feature = "parallel", not(target_arch = "wasm32")))]

use crossbeam::queue::{ArrayQueue, SegQueue};
use crossbeam::utils::CachePadded;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

/// Lock-free counter with cache padding to prevent false sharing
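///
/// # Example
///
/// A minimal usage sketch (marked `ignore` because the crate's public import
/// path for this type is assumed, not confirmed here):
///
/// ```ignore
/// let counter = LockFreeCounter::new(0);
/// counter.increment();          // fetch_add returns the previous value
/// counter.add(5);
/// assert_eq!(counter.get(), 6);
/// ```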
#[repr(align(64))]
pub struct LockFreeCounter {
    value: CachePadded<AtomicU64>,
}

impl LockFreeCounter {
    /// Create a counter starting at `initial`.
    pub fn new(initial: u64) -> Self {
        Self {
            value: CachePadded::new(AtomicU64::new(initial)),
        }
    }

    /// Increment by one, returning the previous value.
    #[inline]
    pub fn increment(&self) -> u64 {
        self.value.fetch_add(1, Ordering::Relaxed)
    }

    /// Read the current value.
    #[inline]
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }

    /// Add `delta`, returning the previous value.
    #[inline]
    pub fn add(&self, delta: u64) -> u64 {
        self.value.fetch_add(delta, Ordering::Relaxed)
    }
}

/// Lock-free statistics collector
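///
/// # Example
///
/// A minimal usage sketch (marked `ignore`; the import path is assumed):
///
/// ```ignore
/// let stats = LockFreeStats::new();
/// stats.record_query(1_200);
/// stats.record_insert();
/// let snap = stats.snapshot();
/// assert_eq!(snap.queries, 1);
/// assert_eq!(snap.avg_latency_ns, 1_200);
/// ```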
pub struct LockFreeStats {
    queries: CachePadded<AtomicU64>,
    inserts: CachePadded<AtomicU64>,
    deletes: CachePadded<AtomicU64>,
    total_latency_ns: CachePadded<AtomicU64>,
}

impl LockFreeStats {
    pub fn new() -> Self {
        Self {
            queries: CachePadded::new(AtomicU64::new(0)),
            inserts: CachePadded::new(AtomicU64::new(0)),
            deletes: CachePadded::new(AtomicU64::new(0)),
            total_latency_ns: CachePadded::new(AtomicU64::new(0)),
        }
    }

    #[inline]
    pub fn record_query(&self, latency_ns: u64) {
        self.queries.fetch_add(1, Ordering::Relaxed);
        self.total_latency_ns
            .fetch_add(latency_ns, Ordering::Relaxed);
    }

    #[inline]
    pub fn record_insert(&self) {
        self.inserts.fetch_add(1, Ordering::Relaxed);
    }

    #[inline]
    pub fn record_delete(&self) {
        self.deletes.fetch_add(1, Ordering::Relaxed);
    }

    pub fn snapshot(&self) -> StatsSnapshot {
        let queries = self.queries.load(Ordering::Relaxed);
        let total_latency = self.total_latency_ns.load(Ordering::Relaxed);

        StatsSnapshot {
            queries,
            inserts: self.inserts.load(Ordering::Relaxed),
            deletes: self.deletes.load(Ordering::Relaxed),
            avg_latency_ns: if queries > 0 {
                total_latency / queries
            } else {
                0
            },
        }
    }
}

impl Default for LockFreeStats {
    fn default() -> Self {
        Self::new()
    }
}

#[derive(Debug, Clone)]
pub struct StatsSnapshot {
    pub queries: u64,
    pub inserts: u64,
    pub deletes: u64,
    pub avg_latency_ns: u64,
}

/// Lock-free object pool for reducing allocations
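///
/// # Example
///
/// A minimal usage sketch (marked `ignore`; the import path is assumed):
///
/// ```ignore
/// let pool = ObjectPool::new(8, || Vec::<u8>::with_capacity(1024));
/// let mut buf = pool.acquire();
/// buf.extend_from_slice(b"payload");   // DerefMut exposes the underlying Vec
/// drop(buf);                           // object goes back to the pool on drop
/// ```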
pub struct ObjectPool<T> {
    queue: Arc<SegQueue<T>>,
    factory: Arc<dyn Fn() -> T + Send + Sync>,
    capacity: usize,
    allocated: AtomicUsize,
}

impl<T> ObjectPool<T> {
    pub fn new<F>(capacity: usize, factory: F) -> Self
    where
        F: Fn() -> T + Send + Sync + 'static,
    {
        Self {
            queue: Arc::new(SegQueue::new()),
            factory: Arc::new(factory),
            capacity,
            allocated: AtomicUsize::new(0),
        }
    }

    /// Get an object from the pool or create a new one.
    ///
    /// If the pool is at capacity and currently empty, this spin-waits until
    /// another thread returns an object.
    pub fn acquire(&self) -> PooledObject<T> {
        let object = self.queue.pop().unwrap_or_else(|| {
            let current = self.allocated.fetch_add(1, Ordering::Relaxed);
            if current < self.capacity {
                (self.factory)()
            } else {
                self.allocated.fetch_sub(1, Ordering::Relaxed);
                // Wait for an object to be returned
                loop {
                    if let Some(obj) = self.queue.pop() {
                        break obj;
                    }
                    std::hint::spin_loop();
                }
            }
        });

        PooledObject {
            object: Some(object),
            pool: Arc::clone(&self.queue),
        }
    }
}

/// RAII wrapper for pooled objects
pub struct PooledObject<T> {
    object: Option<T>,
    pool: Arc<SegQueue<T>>,
}

impl<T> PooledObject<T> {
    pub fn get(&self) -> &T {
        self.object.as_ref().unwrap()
    }

    pub fn get_mut(&mut self) -> &mut T {
        self.object.as_mut().unwrap()
    }
}

impl<T> Drop for PooledObject<T> {
    fn drop(&mut self) {
        if let Some(object) = self.object.take() {
            self.pool.push(object);
        }
    }
}

impl<T> std::ops::Deref for PooledObject<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        self.object.as_ref().unwrap()
    }
}

impl<T> std::ops::DerefMut for PooledObject<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.object.as_mut().unwrap()
    }
}

/// Lock-free ring buffer for work distribution
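///
/// # Example
///
/// A minimal usage sketch (marked `ignore`; the import path is assumed):
///
/// ```ignore
/// let queue = LockFreeWorkQueue::new(16);
/// queue.try_push(42u64).unwrap();
/// assert_eq!(queue.try_pop(), Some(42));
/// assert!(queue.is_empty());
/// ```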
pub struct LockFreeWorkQueue<T> {
    queue: ArrayQueue<T>,
}

impl<T> LockFreeWorkQueue<T> {
    pub fn new(capacity: usize) -> Self {
        Self {
            queue: ArrayQueue::new(capacity),
        }
    }

    #[inline]
    pub fn try_push(&self, item: T) -> Result<(), T> {
        self.queue.push(item)
    }

    #[inline]
    pub fn try_pop(&self) -> Option<T> {
        self.queue.pop()
    }

    #[inline]
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }
}

/// Atomic vector pool for lock-free vector operations (ADR-001)
///
/// Provides a pool of pre-allocated vectors that can be acquired and released
/// without locking, ideal for high-throughput batch operations.
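///
/// # Example
///
/// A minimal usage sketch (marked `ignore`; the import path is assumed):
///
/// ```ignore
/// let pool = AtomicVectorPool::new(4, 2, 16);   // dimensions, initial size, max size
/// let mut v = pool.acquire();
/// v.copy_from(&[1.0, 2.0, 3.0, 4.0]);
/// assert_eq!(v.as_slice(), &[1.0, 2.0, 3.0, 4.0]);
/// drop(v);                                      // vector returns to the pool
/// ```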
pub struct AtomicVectorPool {
    /// Pool of available vectors
    pool: SegQueue<Vec<f32>>,
    /// Dimensions per vector
    dimensions: usize,
    /// Maximum pool size
    max_size: usize,
    /// Current pool size
    size: AtomicUsize,
    /// Total allocations
    total_allocations: AtomicU64,
    /// Pool hits (reused vectors)
    pool_hits: AtomicU64,
}

impl AtomicVectorPool {
    /// Create a new atomic vector pool
    pub fn new(dimensions: usize, initial_size: usize, max_size: usize) -> Self {
        let pool = SegQueue::new();

        // Pre-allocate vectors
        for _ in 0..initial_size {
            pool.push(vec![0.0; dimensions]);
        }

        Self {
            pool,
            dimensions,
            max_size,
            size: AtomicUsize::new(initial_size),
            total_allocations: AtomicU64::new(0),
            pool_hits: AtomicU64::new(0),
        }
    }

    /// Acquire a vector from the pool (or allocate new one)
    pub fn acquire(&self) -> PooledVector {
        self.total_allocations.fetch_add(1, Ordering::Relaxed);

        let vec = if let Some(mut v) = self.pool.pop() {
            self.pool_hits.fetch_add(1, Ordering::Relaxed);
            // Keep the size counter in sync with the queue: one vector left the
            // pool. checked_sub guards against a transient underflow when a
            // return races with this pop.
            let _ = self
                .size
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |s| s.checked_sub(1));
            // Clear the vector for reuse
            v.fill(0.0);
            v
        } else {
            // Allocate new vector
            vec![0.0; self.dimensions]
        };

        PooledVector {
            vec: Some(vec),
            pool: self,
        }
    }

    /// Return a vector to the pool
    fn return_to_pool(&self, vec: Vec<f32>) {
        let current_size = self.size.load(Ordering::Relaxed);
        if current_size < self.max_size {
            self.pool.push(vec);
            self.size.fetch_add(1, Ordering::Relaxed);
        }
        // If pool is full, vector is dropped
    }

    /// Get pool statistics
    pub fn stats(&self) -> VectorPoolStats {
        let total = self.total_allocations.load(Ordering::Relaxed);
        let hits = self.pool_hits.load(Ordering::Relaxed);
        let hit_rate = if total > 0 {
            hits as f64 / total as f64
        } else {
            0.0
        };

        VectorPoolStats {
            total_allocations: total,
            pool_hits: hits,
            hit_rate,
            current_size: self.size.load(Ordering::Relaxed),
            max_size: self.max_size,
        }
    }

    /// Get dimensions
    pub fn dimensions(&self) -> usize {
        self.dimensions
    }
}

/// Statistics for the vector pool
#[derive(Debug, Clone)]
pub struct VectorPoolStats {
    pub total_allocations: u64,
    pub pool_hits: u64,
    pub hit_rate: f64,
    pub current_size: usize,
    pub max_size: usize,
}

/// RAII wrapper for pooled vectors
pub struct PooledVector<'a> {
    vec: Option<Vec<f32>>,
    pool: &'a AtomicVectorPool,
}

impl<'a> PooledVector<'a> {
    /// Get as slice
    pub fn as_slice(&self) -> &[f32] {
        self.vec.as_ref().unwrap()
    }

    /// Get as mutable slice
    pub fn as_mut_slice(&mut self) -> &mut [f32] {
        self.vec.as_mut().unwrap()
    }

    /// Copy from source slice
    pub fn copy_from(&mut self, src: &[f32]) {
        let vec = self.vec.as_mut().unwrap();
        assert_eq!(vec.len(), src.len(), "Dimension mismatch");
        vec.copy_from_slice(src);
    }

    /// Detach the vector from the pool (it won't be returned)
    pub fn detach(mut self) -> Vec<f32> {
        self.vec.take().unwrap()
    }
}

impl<'a> Drop for PooledVector<'a> {
    fn drop(&mut self) {
        if let Some(vec) = self.vec.take() {
            self.pool.return_to_pool(vec);
        }
    }
}

impl<'a> std::ops::Deref for PooledVector<'a> {
    type Target = [f32];

    fn deref(&self) -> &[f32] {
        self.as_slice()
    }
}

impl<'a> std::ops::DerefMut for PooledVector<'a> {
    fn deref_mut(&mut self) -> &mut [f32] {
        self.as_mut_slice()
    }
}

/// Lock-free batch processor for parallel vector operations (ADR-001)
///
/// Distributes work across multiple workers without contention.
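///
/// # Example
///
/// A minimal single-threaded usage sketch (marked `ignore`; the import path
/// is assumed):
///
/// ```ignore
/// let processor = LockFreeBatchProcessor::new(64);
/// processor.submit(BatchItem { id: 1, data: vec![1.0, 2.0] }).unwrap();
/// while let Some(item) = processor.try_get_work() {
///     processor.submit_result(BatchResult { id: item.id, result: item.data });
/// }
/// assert!(processor.is_done());
/// assert_eq!(processor.collect_results().len(), 1);
/// ```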
pub struct LockFreeBatchProcessor {
    /// Work queue for pending items
    work_queue: ArrayQueue<BatchItem>,
    /// Results queue
    results_queue: SegQueue<BatchResult>,
    /// Pending count
    pending: AtomicUsize,
    /// Completed count
    completed: AtomicUsize,
}

/// Item in the batch work queue
#[derive(Debug)]
pub struct BatchItem {
    pub id: u64,
    pub data: Vec<f32>,
}

/// Result from batch processing
pub struct BatchResult {
    pub id: u64,
    pub result: Vec<f32>,
}

impl LockFreeBatchProcessor {
    /// Create a new batch processor with given capacity
    pub fn new(capacity: usize) -> Self {
        Self {
            work_queue: ArrayQueue::new(capacity),
            results_queue: SegQueue::new(),
            pending: AtomicUsize::new(0),
            completed: AtomicUsize::new(0),
        }
    }

    /// Submit a batch item for processing
    pub fn submit(&self, item: BatchItem) -> Result<(), BatchItem> {
        // Only count the item as pending once it has actually been enqueued;
        // a full queue hands the item back to the caller unchanged.
        match self.work_queue.push(item) {
            Ok(()) => {
                self.pending.fetch_add(1, Ordering::Relaxed);
                Ok(())
            }
            Err(item) => Err(item),
        }
    }

    /// Try to get a work item (for workers)
    pub fn try_get_work(&self) -> Option<BatchItem> {
        self.work_queue.pop()
    }

    /// Submit a result (from workers)
    pub fn submit_result(&self, result: BatchResult) {
        self.completed.fetch_add(1, Ordering::Relaxed);
        self.results_queue.push(result);
    }

    /// Collect all available results
    pub fn collect_results(&self) -> Vec<BatchResult> {
        let mut results = Vec::new();
        while let Some(result) = self.results_queue.pop() {
            results.push(result);
        }
        results
    }

    /// Get pending count
    pub fn pending(&self) -> usize {
        self.pending.load(Ordering::Relaxed)
    }

    /// Get completed count
    pub fn completed(&self) -> usize {
        self.completed.load(Ordering::Relaxed)
    }

    /// Check if all work is done
    pub fn is_done(&self) -> bool {
        self.pending() == self.completed()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_lockfree_counter() {
        let counter = Arc::new(LockFreeCounter::new(0));
        let mut handles = vec![];

        for _ in 0..10 {
            let counter_clone = Arc::clone(&counter);
            handles.push(thread::spawn(move || {
                for _ in 0..1000 {
                    counter_clone.increment();
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(counter.get(), 10000);
    }

    #[test]
    fn test_object_pool() {
        let pool = ObjectPool::new(4, || Vec::<u8>::with_capacity(1024));

        let mut obj1 = pool.acquire();
        obj1.push(1);
        assert_eq!(obj1.len(), 1);

        drop(obj1);

        let obj2 = pool.acquire();
        // Object should be reused (but cleared state is not guaranteed)
        assert!(obj2.capacity() >= 1024);
    }

    #[test]
    fn test_stats_collector() {
        let stats = LockFreeStats::new();

        stats.record_query(1000);
        stats.record_query(2000);
        stats.record_insert();

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.queries, 2);
        assert_eq!(snapshot.inserts, 1);
        assert_eq!(snapshot.avg_latency_ns, 1500);
    }

    #[test]
    fn test_atomic_vector_pool() {
        let pool = AtomicVectorPool::new(4, 2, 10);

        // Acquire first vector
        let mut v1 = pool.acquire();
        v1.copy_from(&[1.0, 2.0, 3.0, 4.0]);
        assert_eq!(v1.as_slice(), &[1.0, 2.0, 3.0, 4.0]);

        // Acquire second vector
        let mut v2 = pool.acquire();
        v2.copy_from(&[5.0, 6.0, 7.0, 8.0]);

        // Stats should show allocations
        let stats = pool.stats();
        assert_eq!(stats.total_allocations, 2);
    }

    #[test]
    fn test_vector_pool_reuse() {
        let pool = AtomicVectorPool::new(3, 1, 5);

        // Acquire and release
        {
            let mut v = pool.acquire();
            v.copy_from(&[1.0, 2.0, 3.0]);
        } // v is returned to pool here

        // Acquire again - should be a pool hit
        let _v2 = pool.acquire();

        let stats = pool.stats();
        assert_eq!(stats.total_allocations, 2);
        assert!(stats.pool_hits >= 1, "Should have at least one pool hit");
    }

    #[test]
    fn test_batch_processor() {
        let processor = LockFreeBatchProcessor::new(10);

        // Submit work items
        processor
            .submit(BatchItem {
                id: 1,
                data: vec![1.0, 2.0],
            })
            .unwrap();
        processor
            .submit(BatchItem {
                id: 2,
                data: vec![3.0, 4.0],
            })
            .unwrap();

        assert_eq!(processor.pending(), 2);

        // Process work
        while let Some(item) = processor.try_get_work() {
            let result = BatchResult {
                id: item.id,
                result: item.data.iter().map(|x| x * 2.0).collect(),
            };
            processor.submit_result(result);
        }

        assert!(processor.is_done());
        assert_eq!(processor.completed(), 2);

        // Collect results
        let results = processor.collect_results();
        assert_eq!(results.len(), 2);
    }
}