avila_parallel/
lockfree.rs

1//! Lock-free parallel operations using atomic operations
2//!
3//! This module provides lock-free implementations for maximum performance
4
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
8
/// Lock-free counter for parallel counting operations.
///
/// Wraps an `AtomicUsize` so the tally can be updated through a shared
/// reference (`&self`) without taking a lock.
#[derive(Debug, Default)]
pub struct AtomicCounter {
    count: AtomicUsize,
}

impl AtomicCounter {
    /// Create a new atomic counter starting at zero.
    ///
    /// Equivalent to `AtomicCounter::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Increment the counter by one.
    #[inline]
    pub fn increment(&self) {
        self.add(1);
    }

    /// Add `value` to the counter.
    ///
    /// The addition wraps around on overflow, as `AtomicUsize::fetch_add` does.
    #[inline]
    pub fn add(&self, value: usize) {
        // Relaxed: the read-modify-write itself is atomic; no ordering with
        // respect to other memory is requested here.
        self.count.fetch_add(value, Ordering::Relaxed);
    }

    /// Get the current count.
    #[inline]
    pub fn get(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }
}
40
/// Lock-free parallel count with atomic operations.
///
/// Returns how many elements of `items` satisfy `predicate`.
///
/// The slice is cut into contiguous chunks of `ceil(len / threads)` elements,
/// where `threads` comes from [`thread::available_parallelism`] (1 if it cannot
/// be determined). Each chunk is counted on its own scoped thread and the
/// per-chunk totals are merged with a single atomic add per thread. When one
/// chunk would cover the whole slice the count is done on the calling thread.
///
/// # Panics
///
/// Per the `std::thread::scope` contract, a panic in `predicate` on a worker
/// thread is propagated to the caller once all workers have been joined.
pub fn lockfree_count<T, F>(items: &[T], predicate: F) -> usize
where
    T: Sync,
    F: Fn(&T) -> bool + Send + Sync,
{
    let len = items.len();
    if len == 0 {
        return 0;
    }

    let num_threads = thread::available_parallelism().map_or(1, |n| n.get());
    let chunk_size = (len + num_threads - 1) / num_threads;

    // A single chunk means there is nothing to parallelise.
    if chunk_size >= len {
        return items.iter().filter(|item| predicate(item)).count();
    }

    // Scoped threads may borrow from this stack frame, so neither the tally nor
    // the predicate needs to be reference-counted. Shared references are `Copy`,
    // which lets every `move` closure below take its own copy of them.
    let total = AtomicUsize::new(0);
    let total_ref = &total;
    let predicate = &predicate;

    thread::scope(|s| {
        for chunk in items.chunks(chunk_size) {
            s.spawn(move || {
                let local_count = chunk.iter().filter(|item| predicate(item)).count();
                // One atomic add per worker keeps contention negligible.
                total_ref.fetch_add(local_count, Ordering::Relaxed);
            });
        }
    });

    // `thread::scope` has joined every worker, so all adds are visible and no
    // borrow of `total` is left alive.
    total.into_inner()
}
77
/// Lock-free parallel sum using atomic operations.
///
/// Returns the sum of `items` using wrapping (two's-complement) arithmetic, so
/// the result is the mathematical sum reduced modulo 2^32. Wrapping is used on
/// every path so that the result never depends on how the slice happens to be
/// split across threads or on whether overflow checks are enabled.
///
/// The slice is cut into contiguous chunks of `ceil(len / threads)` elements,
/// where `threads` comes from [`thread::available_parallelism`] (1 if it cannot
/// be determined). Each chunk is summed on its own scoped thread and merged
/// into a shared signed atomic. When one chunk would cover the whole slice the
/// sum is computed on the calling thread.
pub fn lockfree_sum_i32(items: &[i32]) -> i32 {
    /// Sequential wrapping sum shared by the fallback path and the workers.
    fn wrapping_sum(values: &[i32]) -> i32 {
        values.iter().fold(0, |acc, &v| acc.wrapping_add(v))
    }

    let len = items.len();
    if len == 0 {
        return 0;
    }

    let num_threads = thread::available_parallelism().map_or(1, |n| n.get());
    let chunk_size = (len + num_threads - 1) / num_threads;

    // A single chunk means there is nothing to parallelise.
    if chunk_size >= len {
        return wrapping_sum(items);
    }

    // A signed atomic holds the running total directly; no round trip through
    // an unsigned counter is needed. Scoped threads can borrow it, so no `Arc`.
    let total = AtomicI32::new(0);
    let total_ref = &total;

    thread::scope(|s| {
        for chunk in items.chunks(chunk_size) {
            s.spawn(move || {
                let partial = wrapping_sum(chunk);
                // `fetch_add` wraps on overflow, matching `wrapping_sum`.
                total_ref.fetch_add(partial, Ordering::Relaxed);
            });
        }
    });

    // `thread::scope` has joined every worker, so all adds are visible.
    total.into_inner()
}
109
/// Lock-free parallel any operation.
///
/// Returns `true` if `predicate` holds for at least one element of `items`,
/// and `false` otherwise (including for an empty slice).
///
/// The slice is cut into contiguous chunks of `ceil(len / threads)` elements,
/// where `threads` comes from [`thread::available_parallelism`] (1 if it cannot
/// be determined), and each chunk is scanned on its own scoped thread. Workers
/// share one atomic flag and re-check it every `POLL_INTERVAL` items, so once
/// any worker finds a match the others stop after at most one more block
/// instead of finishing their whole chunk. When one chunk would cover the whole
/// slice the scan runs on the calling thread.
///
/// `predicate` may therefore be invoked on fewer elements than `items.len()`,
/// and the set of elements it sees is not deterministic.
pub fn lockfree_any<T, F>(items: &[T], predicate: F) -> bool
where
    T: Sync,
    F: Fn(&T) -> bool + Send + Sync,
{
    // Number of items a worker scans between two looks at the shared flag.
    // Polling per block rather than per item keeps the inner loop tight.
    const POLL_INTERVAL: usize = 1024;

    let len = items.len();
    if len == 0 {
        return false;
    }

    let num_threads = thread::available_parallelism().map_or(1, |n| n.get());
    let chunk_size = (len + num_threads - 1) / num_threads;

    // A single chunk means there is nothing to parallelise.
    if chunk_size >= len {
        return items.iter().any(|item| predicate(item));
    }

    // Scoped threads may borrow from this stack frame, so neither the flag nor
    // the predicate needs to be reference-counted.
    let found = AtomicBool::new(false);
    let found_ref = &found;
    let predicate = &predicate;

    thread::scope(|s| {
        for chunk in items.chunks(chunk_size) {
            s.spawn(move || {
                for block in chunk.chunks(POLL_INTERVAL) {
                    if found_ref.load(Ordering::Relaxed) {
                        return; // Another worker already found a match.
                    }
                    if block.iter().any(|item| predicate(item)) {
                        found_ref.store(true, Ordering::Release);
                        return;
                    }
                }
            });
        }
    });

    // `thread::scope` has joined every worker, so the final value is settled.
    found.into_inner()
}
150
/// Lock-free parallel all operation.
///
/// Returns `true` if `predicate` holds for every element of `items`
/// (vacuously `true` for an empty slice), and `false` otherwise.
///
/// The slice is cut into contiguous chunks of `ceil(len / threads)` elements,
/// where `threads` comes from [`thread::available_parallelism`] (1 if it cannot
/// be determined), and each chunk is checked on its own scoped thread. Workers
/// share one atomic flag and re-check it every `POLL_INTERVAL` items, so once
/// any worker finds a counter-example the others stop after at most one more
/// block instead of finishing their whole chunk. When one chunk would cover the
/// whole slice the check runs on the calling thread.
///
/// `predicate` may therefore be invoked on fewer elements than `items.len()`,
/// and the set of elements it sees is not deterministic.
pub fn lockfree_all<T, F>(items: &[T], predicate: F) -> bool
where
    T: Sync,
    F: Fn(&T) -> bool + Send + Sync,
{
    // Number of items a worker checks between two looks at the shared flag.
    // Polling per block rather than per item keeps the inner loop tight.
    const POLL_INTERVAL: usize = 1024;

    let len = items.len();
    if len == 0 {
        return true;
    }

    let num_threads = thread::available_parallelism().map_or(1, |n| n.get());
    let chunk_size = (len + num_threads - 1) / num_threads;

    // A single chunk means there is nothing to parallelise.
    if chunk_size >= len {
        return items.iter().all(|item| predicate(item));
    }

    // Scoped threads may borrow from this stack frame, so neither the flag nor
    // the predicate needs to be reference-counted.
    let all_true = AtomicBool::new(true);
    let all_true_ref = &all_true;
    let predicate = &predicate;

    thread::scope(|s| {
        for chunk in items.chunks(chunk_size) {
            s.spawn(move || {
                for block in chunk.chunks(POLL_INTERVAL) {
                    if !all_true_ref.load(Ordering::Relaxed) {
                        return; // Another worker already found a counter-example.
                    }
                    if !block.iter().all(|item| predicate(item)) {
                        all_true_ref.store(false, Ordering::Release);
                        return;
                    }
                }
            });
        }
    });

    // `thread::scope` has joined every worker, so the final value is settled.
    all_true.into_inner()
}
191
#[cfg(test)]
mod tests {
    //! Unit tests for the counter and for every parallel helper, including the
    //! empty-slice fast paths.

    use super::*;

    #[test]
    fn test_atomic_counter() {
        let counter = AtomicCounter::new();
        counter.increment();
        counter.add(5);
        assert_eq!(counter.get(), 6);
    }

    #[test]
    fn test_lockfree_count() {
        let data: Vec<i32> = (1..=10000).collect();
        let count = lockfree_count(&data, |x| x % 2 == 0);
        assert_eq!(count, 5000);
    }

    #[test]
    fn test_lockfree_sum_i32() {
        let data: Vec<i32> = (1..=10000).collect();
        assert_eq!(lockfree_sum_i32(&data), 50_005_000);

        // Negative partial sums must survive the merge step.
        let mixed: Vec<i32> = (-5000..5000).collect();
        assert_eq!(lockfree_sum_i32(&mixed), -5000);
    }

    #[test]
    fn test_lockfree_any() {
        let data: Vec<i32> = (1..=1000).collect();
        assert!(lockfree_any(&data, |x| *x == 500));
        assert!(!lockfree_any(&data, |x| *x > 1000));
    }

    #[test]
    fn test_lockfree_all() {
        let data: Vec<i32> = vec![2, 4, 6, 8, 10];
        assert!(lockfree_all(&data, |x| x % 2 == 0));
        assert!(!lockfree_all(&data, |x| *x > 5));
    }

    #[test]
    fn test_empty_inputs() {
        let empty: &[i32] = &[];
        assert_eq!(lockfree_count(empty, |_| true), 0);
        assert_eq!(lockfree_sum_i32(empty), 0);
        assert!(!lockfree_any(empty, |_| true));
        assert!(lockfree_all(empty, |_| false));
    }
}