oxygengine_core/
jobs.rs

1use std::iter::FromIterator;
2#[cfg(feature = "parallel")]
3use std::sync::mpsc::{channel, Receiver, TryRecvError};
4
5pub enum JobResult<T> {
6    Running(Job<T>),
7    Complete(T),
8    Dead,
9}
10
11impl<T> JobResult<T> {
12    pub fn unwrap(self) -> T {
13        match self {
14            Self::Complete(data) => data,
15            _ => panic!("Trying to unwrap incomplete job!"),
16        }
17    }
18
19    pub fn expect(self, message: &str) -> T {
20        match self {
21            Self::Complete(data) => data,
22            _ => panic!("{}", message),
23        }
24    }
25
26    pub fn is_running(&self) -> bool {
27        matches!(self, Self::Running(_))
28    }
29
30    pub fn is_complete(&self) -> bool {
31        matches!(self, Self::Complete(_))
32    }
33
34    pub fn is_dead(&self) -> bool {
35        matches!(self, Self::Dead)
36    }
37}
38
39#[cfg(not(feature = "parallel"))]
40pub struct Job<T>(T);
41
42#[cfg(feature = "parallel")]
43pub struct Job<T>(Receiver<T>);
44
45impl<T> Job<T> {
46    pub fn new<F>(f: F) -> Job<T>
47    where
48        T: Send + 'static,
49        F: FnOnce() -> T + Send + 'static,
50    {
51        #[cfg(not(feature = "parallel"))]
52        {
53            Job(f())
54        }
55        #[cfg(feature = "parallel")]
56        {
57            let (sender, receiver) = channel();
58            rayon::spawn_fifo(move || {
59                let _ = sender.send(f());
60            });
61            Job(receiver)
62        }
63    }
64}
65
66impl<T> Job<T> {
67    #[cfg(not(feature = "parallel"))]
68    pub fn try_consume(self) -> JobResult<T> {
69        JobResult::Complete(self.0)
70    }
71
72    #[cfg(feature = "parallel")]
73    pub fn try_consume(self) -> JobResult<T> {
74        let receiver = self.0;
75        match receiver.try_recv() {
76            Ok(data) => JobResult::Complete(data),
77            Err(error) => match error {
78                TryRecvError::Empty => JobResult::Running(Self(receiver)),
79                TryRecvError::Disconnected => JobResult::Dead,
80            },
81        }
82    }
83
84    #[cfg(not(feature = "parallel"))]
85    pub fn consume(self) -> JobResult<T> {
86        self.try_consume()
87    }
88
89    #[cfg(feature = "parallel")]
90    pub fn consume(self) -> JobResult<T> {
91        match self.0.recv() {
92            Ok(data) => JobResult::Complete(data),
93            Err(_) => JobResult::Dead,
94        }
95    }
96}
97
98pub struct JobsGroup<T>(Vec<JobResult<T>>);
99
100impl<T> JobsGroup<T> {
101    pub fn new<I>(iter: I) -> Self
102    where
103        I: IntoIterator<Item = Job<T>>,
104    {
105        Self(iter.into_iter().map(JobResult::Running).collect())
106    }
107
108    pub fn try_consume(self) -> Result<Vec<T>, Self> {
109        let result = self
110            .0
111            .into_iter()
112            .map(|result| match result {
113                JobResult::Running(job) => job.try_consume(),
114                _ => result,
115            })
116            .collect::<Vec<_>>();
117        if result.iter().all(|result| result.is_complete()) {
118            Ok(result.into_iter().map(|result| result.unwrap()).collect())
119        } else {
120            Err(Self(result))
121        }
122    }
123
124    pub fn consume(self) -> Vec<T> {
125        self.0
126            .into_iter()
127            .filter_map(|result| match result {
128                JobResult::Running(job) => match job.consume() {
129                    JobResult::Complete(data) => Some(data),
130                    _ => None,
131                },
132                JobResult::Complete(data) => Some(data),
133                JobResult::Dead => None,
134            })
135            .collect()
136    }
137
138    pub fn is_all_complete(&self) -> bool {
139        self.0.iter().all(|result| result.is_complete())
140    }
141
142    pub fn is_any_dead(&self) -> bool {
143        self.0.iter().any(|result| result.is_dead())
144    }
145
146    pub fn is_any_running(&self) -> bool {
147        self.0.iter().any(|result| result.is_running())
148    }
149}
150
151impl<T> FromIterator<Job<T>> for JobsGroup<T> {
152    fn from_iter<I>(iter: I) -> Self
153    where
154        I: IntoIterator<Item = Job<T>>,
155    {
156        Self::new(iter)
157    }
158}
159
160#[cfg(test)]
161mod tests {
162    use super::*;
163    use std::time::{Duration, Instant};
164
165    #[test]
166    fn test_jobs() {
167        fn fib(n: usize) -> (usize, Duration) {
168            let timer = Instant::now();
169            let mut x = (1, 1);
170            for _ in 0..n {
171                x = (x.1, x.0 + x.1)
172            }
173            (x.0, timer.elapsed())
174        }
175
176        let single = Job::new(|| fib(50));
177        let group = (0..20)
178            .into_iter()
179            .map(|n| Job::new(move || fib(n)))
180            .collect::<JobsGroup<_>>();
181        println!("* Single result: {:?}", single.consume().unwrap());
182        println!("* Group results: {:?}", group.consume());
183    }
184}