sync_results/
sync-results.rs1use std::{
2 hint::spin_loop,
3 sync::{
4 atomic::{AtomicUsize, Ordering},
5 Arc,
6 },
7 thread,
8};
9
10use option_lock::OptionLock;
11
12#[derive(Debug)]
13struct Results<T> {
14 data: Vec<OptionLock<T>>,
15 completed: AtomicUsize,
16 iter_index: usize,
17}
18
19impl<T> Results<T> {
20 pub fn new(size: usize) -> Self {
21 let mut data = Vec::with_capacity(size);
22 data.resize_with(size, OptionLock::default);
23 Self {
24 data,
25 completed: AtomicUsize::default(),
26 iter_index: 0,
27 }
28 }
29
30 pub fn completed(&self) -> bool {
31 self.completed.load(Ordering::Relaxed) == self.data.len()
32 }
33
34 pub fn return_result(&self, index: usize, value: T) {
35 if let Ok(()) = self.data[index].try_fill(value) {
36 self.completed.fetch_add(1, Ordering::Release);
37 } else {
38 panic!("Update conflict");
39 }
40 }
41}
42
43impl<T> Iterator for Results<T> {
44 type Item = T;
45
46 fn next(&mut self) -> Option<Self::Item> {
47 let idx = self.iter_index;
48 if idx < self.data.len() {
49 self.iter_index += 1;
50 Some(self.data[idx].take().unwrap())
51 } else {
52 None
53 }
54 }
55}
56
57fn main() {
58 let count = 10;
59 let res = Arc::new(Results::new(count));
60 for index in 0..count {
61 let res = res.clone();
62 thread::spawn(move || {
63 res.return_result(index, index * 2);
64 });
65 }
66 loop {
67 if res.completed() {
68 break;
69 }
70 spin_loop();
71 }
72 let res = Arc::try_unwrap(res).expect("Error unwrapping arc");
73 let mut total = 0;
74 for item in res {
75 total += item;
76 }
77 assert_eq!(total, 90);
78 println!("Completed");
79}