worker_pool/
iterator.rs

1use super::*;
2
3/// An iterator that will yield received messages until the message queue has been caught up to when the iterator was created.
4pub struct RecvBurstIterator<'a, Up: Send + 'static> {
5    receiver: &'a Receiver<UpMsg<Up>>,
6    buffer_prev: &'a mut Option<UpMsg<Up>>,
7    start: Instant
8}
9
10impl<'a, Up: Send + 'static> RecvBurstIterator<'a, Up> {
11    #[inline]
12    pub(crate) fn new(
13        receiver: &'a Receiver<UpMsg<Up>>,
14        buffer_prev: &'a mut Option<UpMsg<Up>>,
15        start: Instant
16    ) -> Self {
17        Self {
18            receiver,
19            buffer_prev,
20            start
21        }
22    }
23}
24
25// This iterator *should* be fused, although the following scenario is still possible
26// Worker sends a message
27// RecvBurstIterator is created
28// RecvBurstIterator::next() is called ~> yields None
29// The message is received ~> RecvBurstIterator::next() will now yield Some
30impl<'a, Up: Send + 'static> Iterator for RecvBurstIterator<'a, Up> {
31    type Item = Up;
32
33    #[inline]
34    fn next(&mut self) -> Option<Up> {
35        if let Some(ref prev) = self.buffer_prev {
36            if prev.time() > self.start {
37                return None
38            } else {
39                // std::mem::drop(prev); // Drop the immutable reference
40
41                return std::mem::replace(self.buffer_prev, None).map(|m| m.get());
42            }
43        }
44
45        match self.receiver.try_recv() {
46            Ok(msg) => {
47                if msg.time() > self.start {
48                    *self.buffer_prev = Some(msg);
49                    None
50                } else {
51                    Some(msg.get())
52                }
53            }
54            Err(_) => None
55        }
56    }
57}
58
59/// An iterator that will yield all the remaining messages from the workers, and join them once they have all dropped their receiver.
60pub struct RecvAllIterator<Up: Send + 'static> {
61    receiver: Receiver<UpMsg<Up>>,
62    buffer_prev: Option<UpMsg<Up>>,
63    workers: Vec<JoinHandle<()>>,
64}
65
66impl<Up: Send + 'static> RecvAllIterator<Up> {
67    pub(crate) fn new(
68        receiver: Receiver<UpMsg<Up>>,
69        buffer_prev: Option<UpMsg<Up>>,
70        workers: Vec<JoinHandle<()>>,
71    ) -> Self {
72        Self {
73            receiver: receiver,
74            buffer_prev,
75            workers,
76        }
77    }
78}
79
80impl<Up: Send + 'static> Iterator for RecvAllIterator<Up> {
81    type Item = Up;
82
83    #[inline]
84    fn next(&mut self) -> Option<Up> {
85        if self.buffer_prev.is_some() {
86            let buffer_prev = std::mem::replace(&mut self.buffer_prev, None);
87            return Some(buffer_prev.unwrap().get());
88        }
89
90        if let Ok(msg) = self.receiver.recv() {
91            Some(msg.get())
92        } else {
93            for worker in std::mem::replace(&mut self.workers, Vec::new()) {
94                match worker.join() {
95                    Ok(_) => {}
96                    Err(e) => std::panic::resume_unwind(e)
97                }
98            }
99
100            None
101        }
102    }
103}
104
105impl<Up: Send + 'static> std::iter::FusedIterator for RecvAllIterator<Up> {}