ruvector_core/
lockfree.rs

1//! Lock-free data structures for high-concurrency operations
2//!
3//! This module provides lock-free implementations of common data structures
4//! to minimize contention and improve scalability.
5//!
6//! Note: This module requires the `parallel` feature and is not available on WASM.
7
8#![cfg(all(feature = "parallel", not(target_arch = "wasm32")))]
9
10use crossbeam::queue::{ArrayQueue, SegQueue};
11use crossbeam::utils::CachePadded;
12use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
13use std::sync::Arc;
14
15/// Lock-free counter with cache padding to prevent false sharing
16#[repr(align(64))]
17pub struct LockFreeCounter {
18    value: CachePadded<AtomicU64>,
19}
20
21impl LockFreeCounter {
22    pub fn new(initial: u64) -> Self {
23        Self {
24            value: CachePadded::new(AtomicU64::new(initial)),
25        }
26    }
27
28    #[inline]
29    pub fn increment(&self) -> u64 {
30        self.value.fetch_add(1, Ordering::Relaxed)
31    }
32
33    #[inline]
34    pub fn get(&self) -> u64 {
35        self.value.load(Ordering::Relaxed)
36    }
37
38    #[inline]
39    pub fn add(&self, delta: u64) -> u64 {
40        self.value.fetch_add(delta, Ordering::Relaxed)
41    }
42}
43
44/// Lock-free statistics collector
45pub struct LockFreeStats {
46    queries: CachePadded<AtomicU64>,
47    inserts: CachePadded<AtomicU64>,
48    deletes: CachePadded<AtomicU64>,
49    total_latency_ns: CachePadded<AtomicU64>,
50}
51
52impl LockFreeStats {
53    pub fn new() -> Self {
54        Self {
55            queries: CachePadded::new(AtomicU64::new(0)),
56            inserts: CachePadded::new(AtomicU64::new(0)),
57            deletes: CachePadded::new(AtomicU64::new(0)),
58            total_latency_ns: CachePadded::new(AtomicU64::new(0)),
59        }
60    }
61
62    #[inline]
63    pub fn record_query(&self, latency_ns: u64) {
64        self.queries.fetch_add(1, Ordering::Relaxed);
65        self.total_latency_ns
66            .fetch_add(latency_ns, Ordering::Relaxed);
67    }
68
69    #[inline]
70    pub fn record_insert(&self) {
71        self.inserts.fetch_add(1, Ordering::Relaxed);
72    }
73
74    #[inline]
75    pub fn record_delete(&self) {
76        self.deletes.fetch_add(1, Ordering::Relaxed);
77    }
78
79    pub fn snapshot(&self) -> StatsSnapshot {
80        let queries = self.queries.load(Ordering::Relaxed);
81        let total_latency = self.total_latency_ns.load(Ordering::Relaxed);
82
83        StatsSnapshot {
84            queries,
85            inserts: self.inserts.load(Ordering::Relaxed),
86            deletes: self.deletes.load(Ordering::Relaxed),
87            avg_latency_ns: if queries > 0 {
88                total_latency / queries
89            } else {
90                0
91            },
92        }
93    }
94}
95
96impl Default for LockFreeStats {
97    fn default() -> Self {
98        Self::new()
99    }
100}
101
102#[derive(Debug, Clone)]
103pub struct StatsSnapshot {
104    pub queries: u64,
105    pub inserts: u64,
106    pub deletes: u64,
107    pub avg_latency_ns: u64,
108}
109
110/// Lock-free object pool for reducing allocations
111pub struct ObjectPool<T> {
112    queue: Arc<SegQueue<T>>,
113    factory: Arc<dyn Fn() -> T + Send + Sync>,
114    capacity: usize,
115    allocated: AtomicUsize,
116}
117
118impl<T> ObjectPool<T> {
119    pub fn new<F>(capacity: usize, factory: F) -> Self
120    where
121        F: Fn() -> T + Send + Sync + 'static,
122    {
123        Self {
124            queue: Arc::new(SegQueue::new()),
125            factory: Arc::new(factory),
126            capacity,
127            allocated: AtomicUsize::new(0),
128        }
129    }
130
131    /// Get an object from the pool or create a new one
132    pub fn acquire(&self) -> PooledObject<T> {
133        let object = self.queue.pop().unwrap_or_else(|| {
134            let current = self.allocated.fetch_add(1, Ordering::Relaxed);
135            if current < self.capacity {
136                (self.factory)()
137            } else {
138                self.allocated.fetch_sub(1, Ordering::Relaxed);
139                // Wait for an object to be returned
140                loop {
141                    if let Some(obj) = self.queue.pop() {
142                        break obj;
143                    }
144                    std::hint::spin_loop();
145                }
146            }
147        });
148
149        PooledObject {
150            object: Some(object),
151            pool: Arc::clone(&self.queue),
152        }
153    }
154}
155
156/// RAII wrapper for pooled objects
157pub struct PooledObject<T> {
158    object: Option<T>,
159    pool: Arc<SegQueue<T>>,
160}
161
162impl<T> PooledObject<T> {
163    pub fn get(&self) -> &T {
164        self.object.as_ref().unwrap()
165    }
166
167    pub fn get_mut(&mut self) -> &mut T {
168        self.object.as_mut().unwrap()
169    }
170}
171
172impl<T> Drop for PooledObject<T> {
173    fn drop(&mut self) {
174        if let Some(object) = self.object.take() {
175            self.pool.push(object);
176        }
177    }
178}
179
180impl<T> std::ops::Deref for PooledObject<T> {
181    type Target = T;
182
183    fn deref(&self) -> &Self::Target {
184        self.object.as_ref().unwrap()
185    }
186}
187
188impl<T> std::ops::DerefMut for PooledObject<T> {
189    fn deref_mut(&mut self) -> &mut Self::Target {
190        self.object.as_mut().unwrap()
191    }
192}
193
194/// Lock-free ring buffer for work distribution
195pub struct LockFreeWorkQueue<T> {
196    queue: ArrayQueue<T>,
197}
198
199impl<T> LockFreeWorkQueue<T> {
200    pub fn new(capacity: usize) -> Self {
201        Self {
202            queue: ArrayQueue::new(capacity),
203        }
204    }
205
206    #[inline]
207    pub fn try_push(&self, item: T) -> Result<(), T> {
208        self.queue.push(item)
209    }
210
211    #[inline]
212    pub fn try_pop(&self) -> Option<T> {
213        self.queue.pop()
214    }
215
216    #[inline]
217    pub fn len(&self) -> usize {
218        self.queue.len()
219    }
220
221    #[inline]
222    pub fn is_empty(&self) -> bool {
223        self.queue.is_empty()
224    }
225}
226
227#[cfg(test)]
228mod tests {
229    use super::*;
230    use std::thread;
231
232    #[test]
233    fn test_lockfree_counter() {
234        let counter = Arc::new(LockFreeCounter::new(0));
235        let mut handles = vec![];
236
237        for _ in 0..10 {
238            let counter_clone = Arc::clone(&counter);
239            handles.push(thread::spawn(move || {
240                for _ in 0..1000 {
241                    counter_clone.increment();
242                }
243            }));
244        }
245
246        for handle in handles {
247            handle.join().unwrap();
248        }
249
250        assert_eq!(counter.get(), 10000);
251    }
252
253    #[test]
254    fn test_object_pool() {
255        let pool = ObjectPool::new(4, || Vec::<u8>::with_capacity(1024));
256
257        let mut obj1 = pool.acquire();
258        obj1.push(1);
259        assert_eq!(obj1.len(), 1);
260
261        drop(obj1);
262
263        let obj2 = pool.acquire();
264        // Object should be reused (but cleared state is not guaranteed)
265        assert!(obj2.capacity() >= 1024);
266    }
267
268    #[test]
269    fn test_stats_collector() {
270        let stats = LockFreeStats::new();
271
272        stats.record_query(1000);
273        stats.record_query(2000);
274        stats.record_insert();
275
276        let snapshot = stats.snapshot();
277        assert_eq!(snapshot.queries, 2);
278        assert_eq!(snapshot.inserts, 1);
279        assert_eq!(snapshot.avg_latency_ns, 1500);
280    }
281}