ruvector_core/lockfree.rs

#![cfg(all(feature = "parallel", not(target_arch = "wasm32")))]
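
//! Lock-free concurrency primitives built on crossbeam: a cache-padded
//! atomic counter, an aggregated stats collector, an object pool, and a
//! bounded work queue.
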
use crossbeam::queue::{ArrayQueue, SegQueue};
use crossbeam::utils::CachePadded;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

#[repr(align(64))]
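/// A lock-free counter padded to its own cache line to avoid false sharing.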
pub struct LockFreeCounter {
    value: CachePadded<AtomicU64>,
}

impl LockFreeCounter {
    pub fn new(initial: u64) -> Self {
        Self {
            value: CachePadded::new(AtomicU64::new(initial)),
        }
    }

    #[inline]
    pub fn increment(&self) -> u64 {
        self.value.fetch_add(1, Ordering::Relaxed)
    }

    #[inline]
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }

    #[inline]
    pub fn add(&self, delta: u64) -> u64 {
        self.value.fetch_add(delta, Ordering::Relaxed)
    }
}

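/// Aggregated operation and latency statistics.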
pub struct LockFreeStats {
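    // Each counter is cache-padded so concurrent writers do not false-share.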
    queries: CachePadded<AtomicU64>,
    inserts: CachePadded<AtomicU64>,
    deletes: CachePadded<AtomicU64>,
    total_latency_ns: CachePadded<AtomicU64>,
}

impl LockFreeStats {
    pub fn new() -> Self {
        Self {
            queries: CachePadded::new(AtomicU64::new(0)),
            inserts: CachePadded::new(AtomicU64::new(0)),
            deletes: CachePadded::new(AtomicU64::new(0)),
            total_latency_ns: CachePadded::new(AtomicU64::new(0)),
        }
    }

    #[inline]
    pub fn record_query(&self, latency_ns: u64) {
        self.queries.fetch_add(1, Ordering::Relaxed);
        self.total_latency_ns
            .fetch_add(latency_ns, Ordering::Relaxed);
    }

    #[inline]
    pub fn record_insert(&self) {
        self.inserts.fetch_add(1, Ordering::Relaxed);
    }

    #[inline]
    pub fn record_delete(&self) {
        self.deletes.fetch_add(1, Ordering::Relaxed);
    }

    pub fn snapshot(&self) -> StatsSnapshot {
        let queries = self.queries.load(Ordering::Relaxed);
        let total_latency = self.total_latency_ns.load(Ordering::Relaxed);

        StatsSnapshot {
            queries,
            inserts: self.inserts.load(Ordering::Relaxed),
            deletes: self.deletes.load(Ordering::Relaxed),
            avg_latency_ns: if queries > 0 {
                total_latency / queries
            } else {
                0
            },
        }
    }
}

impl Default for LockFreeStats {
    fn default() -> Self {
        Self::new()
    }
}

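/// A point-in-time copy of the collected statistics.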
#[derive(Debug, Clone)]
pub struct StatsSnapshot {
    pub queries: u64,
    pub inserts: u64,
    pub deletes: u64,
    pub avg_latency_ns: u64,
}

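/// Lock-free pool that lazily allocates up to `capacity` objects via
/// `factory` and recycles them through a `SegQueue`.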
pub struct ObjectPool<T> {
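    /// Objects currently idle and available for reuse.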
    queue: Arc<SegQueue<T>>,
    factory: Arc<dyn Fn() -> T + Send + Sync>,
    capacity: usize,
    allocated: AtomicUsize,
}

impl<T> ObjectPool<T> {
    pub fn new<F>(capacity: usize, factory: F) -> Self
    where
        F: Fn() -> T + Send + Sync + 'static,
    {
        Self {
            queue: Arc::new(SegQueue::new()),
            factory: Arc::new(factory),
            capacity,
            allocated: AtomicUsize::new(0),
        }
    }

    pub fn acquire(&self) -> PooledObject<T> {
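        // Fast path: reuse an idle object; otherwise allocate while under capacity.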
        let object = self.queue.pop().unwrap_or_else(|| {
            let current = self.allocated.fetch_add(1, Ordering::Relaxed);
            if current < self.capacity {
                (self.factory)()
            } else {
                self.allocated.fetch_sub(1, Ordering::Relaxed);
                loop {
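                    // Pool exhausted: spin until another thread returns an object.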
                    if let Some(obj) = self.queue.pop() {
                        break obj;
                    }
                    std::hint::spin_loop();
                }
            }
        });

        PooledObject {
            object: Some(object),
            pool: Arc::clone(&self.queue),
        }
    }
}

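/// RAII guard that hands its object back to the pool on drop.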
pub struct PooledObject<T> {
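    /// `Some` until the destructor returns the object to the pool.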
    object: Option<T>,
    pool: Arc<SegQueue<T>>,
}

impl<T> PooledObject<T> {
    pub fn get(&self) -> &T {
        self.object.as_ref().unwrap()
    }

    pub fn get_mut(&mut self) -> &mut T {
        self.object.as_mut().unwrap()
    }
}

impl<T> Drop for PooledObject<T> {
    fn drop(&mut self) {
        if let Some(object) = self.object.take() {
            self.pool.push(object);
        }
    }
}

impl<T> std::ops::Deref for PooledObject<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        self.object.as_ref().unwrap()
    }
}

impl<T> std::ops::DerefMut for PooledObject<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.object.as_mut().unwrap()
    }
}

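/// Bounded, lock-free multi-producer multi-consumer work queue.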
pub struct LockFreeWorkQueue<T> {
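    /// Fixed-capacity ring buffer; `try_push` fails once it is full.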
    queue: ArrayQueue<T>,
}

impl<T> LockFreeWorkQueue<T> {
    pub fn new(capacity: usize) -> Self {
        Self {
            queue: ArrayQueue::new(capacity),
        }
    }

    #[inline]
    pub fn try_push(&self, item: T) -> Result<(), T> {
        self.queue.push(item)
    }

    #[inline]
    pub fn try_pop(&self) -> Option<T> {
        self.queue.pop()
    }

    #[inline]
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_lockfree_counter() {
        let counter = Arc::new(LockFreeCounter::new(0));
        let mut handles = vec![];

        for _ in 0..10 {
            let counter_clone = Arc::clone(&counter);
            handles.push(thread::spawn(move || {
                for _ in 0..1000 {
                    counter_clone.increment();
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(counter.get(), 10000);
    }

    #[test]
    fn test_object_pool() {
        let pool = ObjectPool::new(4, || Vec::<u8>::with_capacity(1024));

        let mut obj1 = pool.acquire();
        obj1.push(1);
        assert_eq!(obj1.len(), 1);

        drop(obj1);

        let obj2 = pool.acquire();
        assert!(obj2.capacity() >= 1024);
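        // The reused buffer keeps its previous allocation (and contents).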
    }

    #[test]
    fn test_stats_collector() {
        let stats = LockFreeStats::new();

        stats.record_query(1000);
        stats.record_query(2000);
        stats.record_insert();

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.queries, 2);
        assert_eq!(snapshot.inserts, 1);
        assert_eq!(snapshot.avg_latency_ns, 1500);
    }
}