thread_manager/
iterator.rs

1use std::sync::Arc;
2
3use crossbeam_channel::RecvError;
4
5use crate::channel::ResultChannel;
6use crate::worker::ThreadWorker;
7
8pub struct ResultIter<'a, T>
9where
10    T: Send + 'static,
11{
12    result_channel: &'a Arc<ResultChannel<T>>,
13}
14
15impl<'a, T> ResultIter<'a, T>
16where
17    T: Send + 'static,
18{
19    pub fn new(result_channel: &'a Arc<ResultChannel<T>>) -> Self {
20        Self { result_channel }
21    }
22
23    /// Checks if results are available.
24    pub fn has_results(&self) -> bool {
25        !self.result_channel.is_finished()
26    }
27}
28
29impl<'a, T> Iterator for ResultIter<'a, T>
30where
31    T: Send + 'static,
32{
33    type Item = T;
34
35    fn next(&mut self) -> Option<Self::Item> {
36        if self.has_results() {
37            let result: Result<T, RecvError> = self.result_channel.recv();
38            self.result_channel.status().set_concluded(true);
39            if let Ok(result) = result {
40                return Some(result);
41            }
42        }
43        None
44    }
45}
46
47pub struct YieldResultIter<'a, F, T>
48where
49    F: Fn() -> T + Send + 'static,
50    T: Send + 'static,
51{
52    workers: &'a Vec<ThreadWorker<F, T>>,
53    result_channel: &'a Arc<ResultChannel<T>>,
54}
55
56impl<'a, F, T> YieldResultIter<'a, F, T>
57where
58    F: Fn() -> T + Send + 'static,
59    T: Send + 'static,
60{
61    pub fn new(
62        workers: &'a Vec<ThreadWorker<F, T>>,
63        result_channel: &'a Arc<ResultChannel<T>>,
64    ) -> Self {
65        Self {
66            workers,
67            result_channel,
68        }
69    }
70
71    /// Checks if jobs are pending in the queue.
72    pub fn has_jobs(&self) -> bool {
73        for worker in self.workers.iter() {
74            if !worker.job_channel().is_finished() {
75                return true;
76            }
77        }
78        false
79    }
80
81    /// Checks if results are available.
82    pub fn has_results(&self) -> bool {
83        !self.result_channel.is_finished()
84    }
85}
86
87impl<'a, F, T> Iterator for YieldResultIter<'a, F, T>
88where
89    F: Fn() -> T + Send + 'static,
90    T: Send + 'static,
91{
92    type Item = T;
93
94    fn next(&mut self) -> Option<Self::Item> {
95        if self.has_jobs() || self.has_results() {
96            let result: Result<T, RecvError> = self.result_channel.recv();
97            self.result_channel.status().set_concluded(true);
98            if let Ok(result) = result {
99                return Some(result);
100            }
101        }
102        None
103    }
104}