sync_results/
sync-results.rs

1use std::{
2    hint::spin_loop,
3    sync::{
4        atomic::{AtomicUsize, Ordering},
5        Arc,
6    },
7    thread,
8};
9
10use option_lock::OptionLock;
11
12#[derive(Debug)]
13struct Results<T> {
14    data: Vec<OptionLock<T>>,
15    completed: AtomicUsize,
16    iter_index: usize,
17}
18
19impl<T> Results<T> {
20    pub fn new(size: usize) -> Self {
21        let mut data = Vec::with_capacity(size);
22        data.resize_with(size, OptionLock::default);
23        Self {
24            data,
25            completed: AtomicUsize::default(),
26            iter_index: 0,
27        }
28    }
29
30    pub fn completed(&self) -> bool {
31        self.completed.load(Ordering::Relaxed) == self.data.len()
32    }
33
34    pub fn return_result(&self, index: usize, value: T) {
35        if let Ok(()) = self.data[index].try_fill(value) {
36            self.completed.fetch_add(1, Ordering::Release);
37        } else {
38            panic!("Update conflict");
39        }
40    }
41}
42
43impl<T> Iterator for Results<T> {
44    type Item = T;
45
46    fn next(&mut self) -> Option<Self::Item> {
47        let idx = self.iter_index;
48        if idx < self.data.len() {
49            self.iter_index += 1;
50            Some(self.data[idx].take().unwrap())
51        } else {
52            None
53        }
54    }
55}
56
57fn main() {
58    let count = 10;
59    let res = Arc::new(Results::new(count));
60    for index in 0..count {
61        let res = res.clone();
62        thread::spawn(move || {
63            res.return_result(index, index * 2);
64        });
65    }
66    loop {
67        if res.completed() {
68            break;
69        }
70        spin_loop();
71    }
72    let res = Arc::try_unwrap(res).expect("Error unwrapping arc");
73    let mut total = 0;
74    for item in res {
75        total += item;
76    }
77    assert_eq!(total, 90);
78    println!("Completed");
79}