1use super::*;
2
3pub struct RecvBurstIterator<'a, Up: Send + 'static> {
5 receiver: &'a Receiver<UpMsg<Up>>,
6 buffer_prev: &'a mut Option<UpMsg<Up>>,
7 start: Instant
8}
9
10impl<'a, Up: Send + 'static> RecvBurstIterator<'a, Up> {
11 #[inline]
12 pub(crate) fn new(
13 receiver: &'a Receiver<UpMsg<Up>>,
14 buffer_prev: &'a mut Option<UpMsg<Up>>,
15 start: Instant
16 ) -> Self {
17 Self {
18 receiver,
19 buffer_prev,
20 start
21 }
22 }
23}
24
25impl<'a, Up: Send + 'static> Iterator for RecvBurstIterator<'a, Up> {
31 type Item = Up;
32
33 #[inline]
34 fn next(&mut self) -> Option<Up> {
35 if let Some(ref prev) = self.buffer_prev {
36 if prev.time() > self.start {
37 return None
38 } else {
39 return std::mem::replace(self.buffer_prev, None).map(|m| m.get());
42 }
43 }
44
45 match self.receiver.try_recv() {
46 Ok(msg) => {
47 if msg.time() > self.start {
48 *self.buffer_prev = Some(msg);
49 None
50 } else {
51 Some(msg.get())
52 }
53 }
54 Err(_) => None
55 }
56 }
57}
58
59pub struct RecvAllIterator<Up: Send + 'static> {
61 receiver: Receiver<UpMsg<Up>>,
62 buffer_prev: Option<UpMsg<Up>>,
63 workers: Vec<JoinHandle<()>>,
64}
65
66impl<Up: Send + 'static> RecvAllIterator<Up> {
67 pub(crate) fn new(
68 receiver: Receiver<UpMsg<Up>>,
69 buffer_prev: Option<UpMsg<Up>>,
70 workers: Vec<JoinHandle<()>>,
71 ) -> Self {
72 Self {
73 receiver: receiver,
74 buffer_prev,
75 workers,
76 }
77 }
78}
79
80impl<Up: Send + 'static> Iterator for RecvAllIterator<Up> {
81 type Item = Up;
82
83 #[inline]
84 fn next(&mut self) -> Option<Up> {
85 if self.buffer_prev.is_some() {
86 let buffer_prev = std::mem::replace(&mut self.buffer_prev, None);
87 return Some(buffer_prev.unwrap().get());
88 }
89
90 if let Ok(msg) = self.receiver.recv() {
91 Some(msg.get())
92 } else {
93 for worker in std::mem::replace(&mut self.workers, Vec::new()) {
94 match worker.join() {
95 Ok(_) => {}
96 Err(e) => std::panic::resume_unwind(e)
97 }
98 }
99
100 None
101 }
102 }
103}
104
105impl<Up: Send + 'static> std::iter::FusedIterator for RecvAllIterator<Up> {}