1use std::iter::FromIterator;
2#[cfg(feature = "parallel")]
3use std::sync::mpsc::{channel, Receiver, TryRecvError};
4
5pub enum JobResult<T> {
6 Running(Job<T>),
7 Complete(T),
8 Dead,
9}
10
11impl<T> JobResult<T> {
12 pub fn unwrap(self) -> T {
13 match self {
14 Self::Complete(data) => data,
15 _ => panic!("Trying to unwrap incomplete job!"),
16 }
17 }
18
19 pub fn expect(self, message: &str) -> T {
20 match self {
21 Self::Complete(data) => data,
22 _ => panic!("{}", message),
23 }
24 }
25
26 pub fn is_running(&self) -> bool {
27 matches!(self, Self::Running(_))
28 }
29
30 pub fn is_complete(&self) -> bool {
31 matches!(self, Self::Complete(_))
32 }
33
34 pub fn is_dead(&self) -> bool {
35 matches!(self, Self::Dead)
36 }
37}
38
39#[cfg(not(feature = "parallel"))]
40pub struct Job<T>(T);
41
42#[cfg(feature = "parallel")]
43pub struct Job<T>(Receiver<T>);
44
45impl<T> Job<T> {
46 pub fn new<F>(f: F) -> Job<T>
47 where
48 T: Send + 'static,
49 F: FnOnce() -> T + Send + 'static,
50 {
51 #[cfg(not(feature = "parallel"))]
52 {
53 Job(f())
54 }
55 #[cfg(feature = "parallel")]
56 {
57 let (sender, receiver) = channel();
58 rayon::spawn_fifo(move || {
59 let _ = sender.send(f());
60 });
61 Job(receiver)
62 }
63 }
64}
65
66impl<T> Job<T> {
67 #[cfg(not(feature = "parallel"))]
68 pub fn try_consume(self) -> JobResult<T> {
69 JobResult::Complete(self.0)
70 }
71
72 #[cfg(feature = "parallel")]
73 pub fn try_consume(self) -> JobResult<T> {
74 let receiver = self.0;
75 match receiver.try_recv() {
76 Ok(data) => JobResult::Complete(data),
77 Err(error) => match error {
78 TryRecvError::Empty => JobResult::Running(Self(receiver)),
79 TryRecvError::Disconnected => JobResult::Dead,
80 },
81 }
82 }
83
84 #[cfg(not(feature = "parallel"))]
85 pub fn consume(self) -> JobResult<T> {
86 self.try_consume()
87 }
88
89 #[cfg(feature = "parallel")]
90 pub fn consume(self) -> JobResult<T> {
91 match self.0.recv() {
92 Ok(data) => JobResult::Complete(data),
93 Err(_) => JobResult::Dead,
94 }
95 }
96}
97
98pub struct JobsGroup<T>(Vec<JobResult<T>>);
99
100impl<T> JobsGroup<T> {
101 pub fn new<I>(iter: I) -> Self
102 where
103 I: IntoIterator<Item = Job<T>>,
104 {
105 Self(iter.into_iter().map(JobResult::Running).collect())
106 }
107
108 pub fn try_consume(self) -> Result<Vec<T>, Self> {
109 let result = self
110 .0
111 .into_iter()
112 .map(|result| match result {
113 JobResult::Running(job) => job.try_consume(),
114 _ => result,
115 })
116 .collect::<Vec<_>>();
117 if result.iter().all(|result| result.is_complete()) {
118 Ok(result.into_iter().map(|result| result.unwrap()).collect())
119 } else {
120 Err(Self(result))
121 }
122 }
123
124 pub fn consume(self) -> Vec<T> {
125 self.0
126 .into_iter()
127 .filter_map(|result| match result {
128 JobResult::Running(job) => match job.consume() {
129 JobResult::Complete(data) => Some(data),
130 _ => None,
131 },
132 JobResult::Complete(data) => Some(data),
133 JobResult::Dead => None,
134 })
135 .collect()
136 }
137
138 pub fn is_all_complete(&self) -> bool {
139 self.0.iter().all(|result| result.is_complete())
140 }
141
142 pub fn is_any_dead(&self) -> bool {
143 self.0.iter().any(|result| result.is_dead())
144 }
145
146 pub fn is_any_running(&self) -> bool {
147 self.0.iter().any(|result| result.is_running())
148 }
149}
150
151impl<T> FromIterator<Job<T>> for JobsGroup<T> {
152 fn from_iter<I>(iter: I) -> Self
153 where
154 I: IntoIterator<Item = Job<T>>,
155 {
156 Self::new(iter)
157 }
158}
159
160#[cfg(test)]
161mod tests {
162 use super::*;
163 use std::time::{Duration, Instant};
164
165 #[test]
166 fn test_jobs() {
167 fn fib(n: usize) -> (usize, Duration) {
168 let timer = Instant::now();
169 let mut x = (1, 1);
170 for _ in 0..n {
171 x = (x.1, x.0 + x.1)
172 }
173 (x.0, timer.elapsed())
174 }
175
176 let single = Job::new(|| fib(50));
177 let group = (0..20)
178 .into_iter()
179 .map(|n| Job::new(move || fib(n)))
180 .collect::<JobsGroup<_>>();
181 println!("* Single result: {:?}", single.consume().unwrap());
182 println!("* Group results: {:?}", group.consume());
183 }
184}