// ruvector_core/lockfree.rs

use crossbeam::queue::{ArrayQueue, SegQueue};
use crossbeam::utils::CachePadded;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

/// Cache-line-padded atomic counter for contended hot paths.
///
/// `CachePadded` already aligns the value to a cache line, so the wrapper
/// avoids false sharing without a separate `repr(align)` attribute.
pub struct LockFreeCounter {
    value: CachePadded<AtomicU64>,
}

impl LockFreeCounter {
    pub fn new(initial: u64) -> Self {
        Self {
            value: CachePadded::new(AtomicU64::new(initial)),
        }
    }

    /// Atomically adds 1 and returns the value *before* the increment.
    #[inline]
    pub fn increment(&self) -> u64 {
        self.value.fetch_add(1, Ordering::Relaxed)
    }

    #[inline]
    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }

    /// Atomically adds `delta` and returns the value *before* the addition.
    #[inline]
    pub fn add(&self, delta: u64) -> u64 {
        self.value.fetch_add(delta, Ordering::Relaxed)
    }
}

/// Operation counters for the query/insert/delete paths.
///
/// Each counter sits on its own cache line so writers updating different
/// counters do not contend with one another.
pub struct LockFreeStats {
    queries: CachePadded<AtomicU64>,
    inserts: CachePadded<AtomicU64>,
    deletes: CachePadded<AtomicU64>,
    total_latency_ns: CachePadded<AtomicU64>,
}

impl LockFreeStats {
    pub fn new() -> Self {
        Self {
            queries: CachePadded::new(AtomicU64::new(0)),
            inserts: CachePadded::new(AtomicU64::new(0)),
            deletes: CachePadded::new(AtomicU64::new(0)),
            total_latency_ns: CachePadded::new(AtomicU64::new(0)),
        }
    }

    #[inline]
    pub fn record_query(&self, latency_ns: u64) {
        self.queries.fetch_add(1, Ordering::Relaxed);
        self.total_latency_ns
            .fetch_add(latency_ns, Ordering::Relaxed);
    }

    #[inline]
    pub fn record_insert(&self) {
        self.inserts.fetch_add(1, Ordering::Relaxed);
    }

    #[inline]
    pub fn record_delete(&self) {
        self.deletes.fetch_add(1, Ordering::Relaxed);
    }

    /// Reads each counter with a relaxed load. The fields are not read as
    /// one atomic unit, so a snapshot taken under concurrent writes may mix
    /// values from slightly different moments; this is the usual trade-off
    /// for lock-free statistics.
    pub fn snapshot(&self) -> StatsSnapshot {
        let queries = self.queries.load(Ordering::Relaxed);
        let total_latency = self.total_latency_ns.load(Ordering::Relaxed);

        StatsSnapshot {
            queries,
            inserts: self.inserts.load(Ordering::Relaxed),
            deletes: self.deletes.load(Ordering::Relaxed),
            avg_latency_ns: if queries > 0 {
                total_latency / queries
            } else {
                0
            },
        }
    }
}

impl Default for LockFreeStats {
    fn default() -> Self {
        Self::new()
    }
}

/// Point-in-time view of the collected statistics.
#[derive(Debug, Clone)]
pub struct StatsSnapshot {
    pub queries: u64,
    pub inserts: u64,
    pub deletes: u64,
    pub avg_latency_ns: u64,
}

/// A bounded pool of reusable objects backed by a lock-free `SegQueue`.
///
/// `capacity` caps how many objects the pool will ever allocate; once the
/// cap is reached, `acquire` waits for an object to be returned.
pub struct ObjectPool<T> {
    queue: Arc<SegQueue<T>>,
    factory: Arc<dyn Fn() -> T + Send + Sync>,
    capacity: usize,
    allocated: AtomicUsize,
}

impl<T> ObjectPool<T> {
    pub fn new<F>(capacity: usize, factory: F) -> Self
    where
        F: Fn() -> T + Send + Sync + 'static,
    {
        Self {
            queue: Arc::new(SegQueue::new()),
            factory: Arc::new(factory),
            capacity,
            allocated: AtomicUsize::new(0),
        }
    }

    /// Takes an object from the pool, allocating a new one if the pool is
    /// empty and under capacity. If the pool is empty *and* at capacity,
    /// this spin-waits until another thread returns an object, so a single
    /// thread must not hold every pooled object at once.
    pub fn acquire(&self) -> PooledObject<T> {
        let object = self.queue.pop().unwrap_or_else(|| {
            // Optimistically reserve an allocation slot; back it out if the
            // pool turns out to be at capacity already.
            let current = self.allocated.fetch_add(1, Ordering::Relaxed);
            if current < self.capacity {
                (self.factory)()
            } else {
                self.allocated.fetch_sub(1, Ordering::Relaxed);
                // At capacity: spin until an object is returned.
                loop {
                    if let Some(obj) = self.queue.pop() {
                        break obj;
                    }
                    std::hint::spin_loop();
                }
            }
        });

        PooledObject {
            object: Some(object),
            pool: Arc::clone(&self.queue),
        }
    }
}
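
// Illustrative usage of `ObjectPool` (a sketch, not part of the original
// file): the guard derefs to the pooled value and returns it on drop.
//
//     let pool = ObjectPool::new(8, || Vec::<u8>::with_capacity(256));
//     let mut buf = pool.acquire();
//     buf.extend_from_slice(b"scratch"); // DerefMut into the Vec<u8>
//     drop(buf);                         // buffer goes back to the pool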

/// RAII guard over a pooled object; returns the object to its pool on drop.
pub struct PooledObject<T> {
    object: Option<T>,
    pool: Arc<SegQueue<T>>,
}

impl<T> PooledObject<T> {
    pub fn get(&self) -> &T {
        self.object.as_ref().unwrap()
    }

    pub fn get_mut(&mut self) -> &mut T {
        self.object.as_mut().unwrap()
    }
}

impl<T> Drop for PooledObject<T> {
    fn drop(&mut self) {
        // Hand the object back to the pool instead of dropping it.
        if let Some(object) = self.object.take() {
            self.pool.push(object);
        }
    }
}

impl<T> std::ops::Deref for PooledObject<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        self.object.as_ref().unwrap()
    }
}

impl<T> std::ops::DerefMut for PooledObject<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.object.as_mut().unwrap()
    }
}

/// Bounded MPMC work queue backed by crossbeam's `ArrayQueue`.
pub struct LockFreeWorkQueue<T> {
    queue: ArrayQueue<T>,
}

impl<T> LockFreeWorkQueue<T> {
    pub fn new(capacity: usize) -> Self {
        Self {
            queue: ArrayQueue::new(capacity),
        }
    }

    /// Attempts to enqueue `item`; returns it back as `Err(item)` if the
    /// queue is full.
    #[inline]
    pub fn try_push(&self, item: T) -> Result<(), T> {
        self.queue.push(item)
    }

    #[inline]
    pub fn try_pop(&self) -> Option<T> {
        self.queue.pop()
    }

    #[inline]
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_lockfree_counter() {
        let counter = Arc::new(LockFreeCounter::new(0));
        let mut handles = vec![];

        for _ in 0..10 {
            let counter_clone = Arc::clone(&counter);
            handles.push(thread::spawn(move || {
                for _ in 0..1000 {
                    counter_clone.increment();
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(counter.get(), 10_000);
    }

    #[test]
    fn test_object_pool() {
        let pool = ObjectPool::new(4, || Vec::<u8>::with_capacity(1024));

        let mut obj1 = pool.acquire();
        obj1.push(1);
        assert_eq!(obj1.len(), 1);

        drop(obj1);

        let obj2 = pool.acquire();
        assert!(obj2.capacity() >= 1024);
    }
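
    // Extra coverage (a sketch beyond the original suite): a returned
    // object is reused rather than re-allocated, so `allocated` stays at 1.
    #[test]
    fn test_object_pool_reuses_returned_objects() {
        let pool = ObjectPool::new(2, || Vec::<u8>::new());

        let obj = pool.acquire();
        assert_eq!(pool.allocated.load(Ordering::Relaxed), 1);
        drop(obj);

        let _obj = pool.acquire();
        assert_eq!(pool.allocated.load(Ordering::Relaxed), 1);
    }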

    #[test]
    fn test_stats_collector() {
        let stats = LockFreeStats::new();

        stats.record_query(1000);
        stats.record_query(2000);
        stats.record_insert();

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.queries, 2);
        assert_eq!(snapshot.inserts, 1);
        assert_eq!(snapshot.avg_latency_ns, 1500);
    }
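
    // Extra coverage (a sketch beyond the original suite): pushing into a
    // full queue fails and hands the item back; pops drain in FIFO order.
    #[test]
    fn test_work_queue_capacity_and_order() {
        let queue = LockFreeWorkQueue::new(2);

        assert!(queue.try_push(1).is_ok());
        assert!(queue.try_push(2).is_ok());
        assert_eq!(queue.try_push(3), Err(3)); // full: item returned

        assert_eq!(queue.try_pop(), Some(1));
        assert_eq!(queue.try_pop(), Some(2));
        assert!(queue.is_empty());
    }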
}