1use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
6use std::sync::Arc;
7use std::thread;
8
/// A shareable counter backed by a single [`AtomicUsize`].
///
/// Additions use `Ordering::Relaxed`: concurrent updates are never lost, but
/// the counter does not order any *other* memory accesses. Callers that need
/// a happens-before relationship (for example, before reading a final total)
/// must obtain it some other way, such as joining the writer threads first.
#[derive(Debug, Default)]
pub struct AtomicCounter {
    count: AtomicUsize,
}

impl AtomicCounter {
    /// Creates a counter that starts at zero.
    ///
    /// Equivalent to [`AtomicCounter::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds one to the counter, wrapping around on overflow.
    #[inline]
    pub fn increment(&self) {
        self.count.fetch_add(1, Ordering::Relaxed);
    }

    /// Adds `value` to the counter, wrapping around on overflow.
    #[inline]
    pub fn add(&self, value: usize) {
        self.count.fetch_add(value, Ordering::Relaxed);
    }

    /// Returns the value currently stored in the counter.
    #[inline]
    pub fn get(&self) -> usize {
        self.count.load(Ordering::Acquire)
    }
}
40
41pub fn lockfree_count<T, F>(items: &[T], predicate: F) -> usize
43where
44 T: Sync,
45 F: Fn(&T) -> bool + Send + Sync,
46{
47 let len = items.len();
48 if len == 0 {
49 return 0;
50 }
51
52 let num_threads = thread::available_parallelism()
53 .map(|n| n.get())
54 .unwrap_or(1);
55 let chunk_size = (len + num_threads - 1) / num_threads;
56
57 if chunk_size >= len {
58 return items.iter().filter(|item| predicate(item)).count();
59 }
60
61 let counter = Arc::new(AtomicCounter::new());
62 let predicate = Arc::new(predicate);
63
64 thread::scope(|s| {
65 for chunk in items.chunks(chunk_size) {
66 let counter = Arc::clone(&counter);
67 let predicate = Arc::clone(&predicate);
68 s.spawn(move || {
69 let local_count = chunk.iter().filter(|item| predicate(item)).count();
70 counter.add(local_count);
71 });
72 }
73 });
74
75 counter.get()
76}
77
/// Returns the sum of `items`, computed in parallel across scoped threads.
///
/// The slice is split into at most `available_parallelism()` contiguous
/// chunks. Each chunk is summed on its own thread and the partial sums are
/// merged into a shared signed atomic accumulator.
///
/// Overflow wraps around (two's complement) at every step. Because wrapping
/// addition is associative and commutative, the result is the same no matter
/// how the slice is chunked, in which order workers finish, or whether the
/// build has overflow checks enabled.
///
/// The sum is computed on the calling thread when `items` is empty or when a
/// single chunk would cover the whole slice.
pub fn lockfree_sum_i32(items: &[i32]) -> i32 {
    // Wrapping sum of one slice; shared by the sequential and parallel paths
    // so that both follow the same overflow rule.
    fn wrapping_sum(values: &[i32]) -> i32 {
        values.iter().fold(0i32, |acc, &v| acc.wrapping_add(v))
    }

    let len = items.len();
    if len == 0 {
        return 0;
    }

    let num_threads = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    // Ceiling division, so no more than `num_threads` chunks are produced.
    let chunk_size = (len + num_threads - 1) / num_threads;

    if chunk_size >= len {
        return wrapping_sum(items);
    }

    // A signed atomic keeps negative partial sums exact without casting them
    // through `usize`. Scoped threads borrow it directly, so no `Arc`.
    let total = std::sync::atomic::AtomicI32::new(0);
    let total_ref = &total;

    thread::scope(|s| {
        for chunk in items.chunks(chunk_size) {
            s.spawn(move || {
                // `fetch_add` wraps on overflow, matching `wrapping_sum`.
                total_ref.fetch_add(wrapping_sum(chunk), Ordering::Relaxed);
            });
        }
    });

    total.load(Ordering::Acquire)
}
109
/// Returns `true` if `predicate` holds for at least one element of `items`.
///
/// The slice is split into at most `available_parallelism()` contiguous
/// chunks, each scanned on its own scoped thread. Workers share a flag that
/// is set as soon as any of them finds a match; every worker re-checks that
/// flag between fixed-size batches and stops scanning once it is set.
///
/// The scan runs on the calling thread when `items` is empty (returning
/// `false`) or when a single chunk would cover the whole slice.
///
/// `predicate` may be called on elements in any order, concurrently, and not
/// necessarily on every element.
///
/// # Panics
///
/// Panics if `predicate` panics; a panic on a worker thread is re-raised by
/// `thread::scope` once all workers have finished.
pub fn lockfree_any<T, F>(items: &[T], predicate: F) -> bool
where
    T: Sync,
    F: Fn(&T) -> bool + Send + Sync,
{
    // Number of items a worker scans between polls of the shared flag. Polling
    // per batch rather than per item keeps the inner loop free of atomics.
    const POLL_INTERVAL: usize = 1024;

    let len = items.len();
    if len == 0 {
        return false;
    }

    let num_threads = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    // Ceiling division, so no more than `num_threads` chunks are produced.
    let chunk_size = (len + num_threads - 1) / num_threads;

    if chunk_size >= len {
        return items.iter().any(|item| predicate(item));
    }

    // Scoped threads may borrow locals, so no `Arc` is needed.
    let found = AtomicBool::new(false);
    let found_ref = &found;
    let predicate = &predicate;

    thread::scope(|s| {
        for chunk in items.chunks(chunk_size) {
            s.spawn(move || {
                for batch in chunk.chunks(POLL_INTERVAL) {
                    // Another worker already found a match: stop early.
                    if found_ref.load(Ordering::Relaxed) {
                        return;
                    }
                    if batch.iter().any(|item| predicate(item)) {
                        found_ref.store(true, Ordering::Release);
                        return;
                    }
                }
            });
        }
    });

    found.load(Ordering::Acquire)
}
150
/// Returns `true` if `predicate` holds for every element of `items`.
///
/// The slice is split into at most `available_parallelism()` contiguous
/// chunks, each scanned on its own scoped thread. Workers share a flag that
/// is cleared as soon as any of them finds a counter-example; every worker
/// re-checks that flag between fixed-size batches and stops scanning once it
/// is cleared.
///
/// The scan runs on the calling thread when `items` is empty (returning
/// `true`) or when a single chunk would cover the whole slice.
///
/// `predicate` may be called on elements in any order, concurrently, and not
/// necessarily on every element.
///
/// # Panics
///
/// Panics if `predicate` panics; a panic on a worker thread is re-raised by
/// `thread::scope` once all workers have finished.
pub fn lockfree_all<T, F>(items: &[T], predicate: F) -> bool
where
    T: Sync,
    F: Fn(&T) -> bool + Send + Sync,
{
    // Number of items a worker scans between polls of the shared flag. Polling
    // per batch rather than per item keeps the inner loop free of atomics.
    const POLL_INTERVAL: usize = 1024;

    let len = items.len();
    if len == 0 {
        return true;
    }

    let num_threads = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    // Ceiling division, so no more than `num_threads` chunks are produced.
    let chunk_size = (len + num_threads - 1) / num_threads;

    if chunk_size >= len {
        return items.iter().all(|item| predicate(item));
    }

    // Scoped threads may borrow locals, so no `Arc` is needed.
    let all_true = AtomicBool::new(true);
    let all_true_ref = &all_true;
    let predicate = &predicate;

    thread::scope(|s| {
        for chunk in items.chunks(chunk_size) {
            s.spawn(move || {
                for batch in chunk.chunks(POLL_INTERVAL) {
                    // Another worker already found a counter-example: stop early.
                    if !all_true_ref.load(Ordering::Relaxed) {
                        return;
                    }
                    if !batch.iter().all(|item| predicate(item)) {
                        all_true_ref.store(false, Ordering::Release);
                        return;
                    }
                }
            });
        }
    });

    all_true.load(Ordering::Acquire)
}
191
#[cfg(test)]
mod tests {
    use super::*;

    // One increment followed by an add of five must read back as six.
    #[test]
    fn test_atomic_counter() {
        let tally = AtomicCounter::new();
        tally.increment();
        tally.add(5);

        let observed = tally.get();
        assert_eq!(observed, 6);
    }

    // Exactly half of the integers 1..=10000 are even.
    #[test]
    fn test_lockfree_count() {
        let numbers = (1..=10000).collect::<Vec<i32>>();

        let evens = lockfree_count(&numbers, |x| x % 2 == 0);
        assert_eq!(evens, 5000);
    }

    // A value inside the range is found; a value beyond it is not.
    #[test]
    fn test_lockfree_any() {
        let numbers = (1..=1000).collect::<Vec<i32>>();

        let has_500 = lockfree_any(&numbers, |x| *x == 500);
        let has_above_1000 = lockfree_any(&numbers, |x| *x > 1000);

        assert!(has_500);
        assert!(!has_above_1000);
    }

    // Every element is even, but not every element exceeds five.
    #[test]
    fn test_lockfree_all() {
        let evens: Vec<i32> = vec![2, 4, 6, 8, 10];

        let every_even = lockfree_all(&evens, |x| x % 2 == 0);
        let every_above_5 = lockfree_all(&evens, |x| *x > 5);

        assert!(every_even);
        assert!(!every_above_5);
    }
}
225
226
227
228
229