ruvector_core/
lockfree.rs

1//! Lock-free data structures for high-concurrency operations
2//!
3//! This module provides lock-free implementations of common data structures
4//! to minimize contention and improve scalability.
5
6use crossbeam::queue::{ArrayQueue, SegQueue};
7use crossbeam::utils::CachePadded;
8use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
9use std::sync::Arc;
10
11/// Lock-free counter with cache padding to prevent false sharing
12#[repr(align(64))]
13pub struct LockFreeCounter {
14    value: CachePadded<AtomicU64>,
15}
16
17impl LockFreeCounter {
18    pub fn new(initial: u64) -> Self {
19        Self {
20            value: CachePadded::new(AtomicU64::new(initial)),
21        }
22    }
23
24    #[inline]
25    pub fn increment(&self) -> u64 {
26        self.value.fetch_add(1, Ordering::Relaxed)
27    }
28
29    #[inline]
30    pub fn get(&self) -> u64 {
31        self.value.load(Ordering::Relaxed)
32    }
33
34    #[inline]
35    pub fn add(&self, delta: u64) -> u64 {
36        self.value.fetch_add(delta, Ordering::Relaxed)
37    }
38}
39
40/// Lock-free statistics collector
41pub struct LockFreeStats {
42    queries: CachePadded<AtomicU64>,
43    inserts: CachePadded<AtomicU64>,
44    deletes: CachePadded<AtomicU64>,
45    total_latency_ns: CachePadded<AtomicU64>,
46}
47
48impl LockFreeStats {
49    pub fn new() -> Self {
50        Self {
51            queries: CachePadded::new(AtomicU64::new(0)),
52            inserts: CachePadded::new(AtomicU64::new(0)),
53            deletes: CachePadded::new(AtomicU64::new(0)),
54            total_latency_ns: CachePadded::new(AtomicU64::new(0)),
55        }
56    }
57
58    #[inline]
59    pub fn record_query(&self, latency_ns: u64) {
60        self.queries.fetch_add(1, Ordering::Relaxed);
61        self.total_latency_ns
62            .fetch_add(latency_ns, Ordering::Relaxed);
63    }
64
65    #[inline]
66    pub fn record_insert(&self) {
67        self.inserts.fetch_add(1, Ordering::Relaxed);
68    }
69
70    #[inline]
71    pub fn record_delete(&self) {
72        self.deletes.fetch_add(1, Ordering::Relaxed);
73    }
74
75    pub fn snapshot(&self) -> StatsSnapshot {
76        let queries = self.queries.load(Ordering::Relaxed);
77        let total_latency = self.total_latency_ns.load(Ordering::Relaxed);
78
79        StatsSnapshot {
80            queries,
81            inserts: self.inserts.load(Ordering::Relaxed),
82            deletes: self.deletes.load(Ordering::Relaxed),
83            avg_latency_ns: if queries > 0 {
84                total_latency / queries
85            } else {
86                0
87            },
88        }
89    }
90}
91
92impl Default for LockFreeStats {
93    fn default() -> Self {
94        Self::new()
95    }
96}
97
98#[derive(Debug, Clone)]
99pub struct StatsSnapshot {
100    pub queries: u64,
101    pub inserts: u64,
102    pub deletes: u64,
103    pub avg_latency_ns: u64,
104}
105
106/// Lock-free object pool for reducing allocations
107pub struct ObjectPool<T> {
108    queue: Arc<SegQueue<T>>,
109    factory: Arc<dyn Fn() -> T + Send + Sync>,
110    capacity: usize,
111    allocated: AtomicUsize,
112}
113
114impl<T> ObjectPool<T> {
115    pub fn new<F>(capacity: usize, factory: F) -> Self
116    where
117        F: Fn() -> T + Send + Sync + 'static,
118    {
119        Self {
120            queue: Arc::new(SegQueue::new()),
121            factory: Arc::new(factory),
122            capacity,
123            allocated: AtomicUsize::new(0),
124        }
125    }
126
127    /// Get an object from the pool or create a new one
128    pub fn acquire(&self) -> PooledObject<T> {
129        let object = self.queue.pop().unwrap_or_else(|| {
130            let current = self.allocated.fetch_add(1, Ordering::Relaxed);
131            if current < self.capacity {
132                (self.factory)()
133            } else {
134                self.allocated.fetch_sub(1, Ordering::Relaxed);
135                // Wait for an object to be returned
136                loop {
137                    if let Some(obj) = self.queue.pop() {
138                        break obj;
139                    }
140                    std::hint::spin_loop();
141                }
142            }
143        });
144
145        PooledObject {
146            object: Some(object),
147            pool: Arc::clone(&self.queue),
148        }
149    }
150}
151
152/// RAII wrapper for pooled objects
153pub struct PooledObject<T> {
154    object: Option<T>,
155    pool: Arc<SegQueue<T>>,
156}
157
158impl<T> PooledObject<T> {
159    pub fn get(&self) -> &T {
160        self.object.as_ref().unwrap()
161    }
162
163    pub fn get_mut(&mut self) -> &mut T {
164        self.object.as_mut().unwrap()
165    }
166}
167
168impl<T> Drop for PooledObject<T> {
169    fn drop(&mut self) {
170        if let Some(object) = self.object.take() {
171            self.pool.push(object);
172        }
173    }
174}
175
176impl<T> std::ops::Deref for PooledObject<T> {
177    type Target = T;
178
179    fn deref(&self) -> &Self::Target {
180        self.object.as_ref().unwrap()
181    }
182}
183
184impl<T> std::ops::DerefMut for PooledObject<T> {
185    fn deref_mut(&mut self) -> &mut Self::Target {
186        self.object.as_mut().unwrap()
187    }
188}
189
190/// Lock-free ring buffer for work distribution
191pub struct LockFreeWorkQueue<T> {
192    queue: ArrayQueue<T>,
193}
194
195impl<T> LockFreeWorkQueue<T> {
196    pub fn new(capacity: usize) -> Self {
197        Self {
198            queue: ArrayQueue::new(capacity),
199        }
200    }
201
202    #[inline]
203    pub fn try_push(&self, item: T) -> Result<(), T> {
204        self.queue.push(item)
205    }
206
207    #[inline]
208    pub fn try_pop(&self) -> Option<T> {
209        self.queue.pop()
210    }
211
212    #[inline]
213    pub fn len(&self) -> usize {
214        self.queue.len()
215    }
216
217    #[inline]
218    pub fn is_empty(&self) -> bool {
219        self.queue.is_empty()
220    }
221}
222
223#[cfg(test)]
224mod tests {
225    use super::*;
226    use std::thread;
227
228    #[test]
229    fn test_lockfree_counter() {
230        let counter = Arc::new(LockFreeCounter::new(0));
231        let mut handles = vec![];
232
233        for _ in 0..10 {
234            let counter_clone = Arc::clone(&counter);
235            handles.push(thread::spawn(move || {
236                for _ in 0..1000 {
237                    counter_clone.increment();
238                }
239            }));
240        }
241
242        for handle in handles {
243            handle.join().unwrap();
244        }
245
246        assert_eq!(counter.get(), 10000);
247    }
248
249    #[test]
250    fn test_object_pool() {
251        let pool = ObjectPool::new(4, || Vec::<u8>::with_capacity(1024));
252
253        let mut obj1 = pool.acquire();
254        obj1.push(1);
255        assert_eq!(obj1.len(), 1);
256
257        drop(obj1);
258
259        let obj2 = pool.acquire();
260        // Object should be reused (but cleared state is not guaranteed)
261        assert!(obj2.capacity() >= 1024);
262    }
263
264    #[test]
265    fn test_stats_collector() {
266        let stats = LockFreeStats::new();
267
268        stats.record_query(1000);
269        stats.record_query(2000);
270        stats.record_insert();
271
272        let snapshot = stats.snapshot();
273        assert_eq!(snapshot.queries, 2);
274        assert_eq!(snapshot.inserts, 1);
275        assert_eq!(snapshot.avg_latency_ns, 1500);
276    }
277}