1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use super::*;

/// An iterator that yields already-sent messages up to a fixed cut-off instant.
///
/// Messages are pulled from `receiver`; a message whose timestamp is later than `start`
/// is not yielded but parked in `buffer_prev`, and the iterator reports `None` instead.
pub struct RecvBurstIterator<'a, Up>
where
    Up: Send + 'static,
{
    /// Receiving end the messages are pulled from.
    receiver: &'a Receiver<UpMsg<Up>>,
    /// Caller-owned slot holding a message that was received but not yielded yet.
    buffer_prev: &'a mut Option<UpMsg<Up>>,
    /// Cut-off instant: only messages whose `time()` is not after it are yielded.
    start: Instant,
}

impl<'a, Up: Send + 'static> RecvBurstIterator<'a, Up> {
    #[inline]
    pub(crate) fn new(
        receiver: &'a Receiver<UpMsg<Up>>,
        buffer_prev: &'a mut Option<UpMsg<Up>>,
        start: Instant
    ) -> Self {
        Self {
            receiver,
            buffer_prev,
            start
        }
    }
}

// This iterator *should* be fused, although the following scenario is still possible
// Worker sends a message
// RecvBurstIterator is created
// RecvBurstIterator::next() is called ~> yields None
// The message is received ~> RecvBurstIterator::next() will now yield Some
impl<'a, Up: Send + 'static> Iterator for RecvBurstIterator<'a, Up> {
    type Item = Up;

    #[inline]
    fn next(&mut self) -> Option<Up> {
        if let Some(ref prev) = self.buffer_prev {
            if prev.time() > self.start {
                return None
            } else {
                // std::mem::drop(prev); // Drop the immutable reference

                return std::mem::replace(self.buffer_prev, None).map(|m| m.get());
            }
        }

        match self.receiver.try_recv() {
            Ok(msg) => {
                if msg.time() > self.start {
                    *self.buffer_prev = Some(msg);
                    None
                } else {
                    Some(msg.get())
                }
            }
            Err(_) => None
        }
    }
}

/// An iterator that yields every remaining message from the workers and joins the worker
/// threads once the receiver reports that no further message can arrive.
///
/// NOTE(review): that presumably happens when every worker has dropped its sending half of
/// the channel — confirm against the `Receiver` type in use.
pub struct RecvAllIterator<Up>
where
    Up: Send + 'static,
{
    /// Receiving end, owned by the iterator, that the messages are pulled from.
    receiver: Receiver<UpMsg<Up>>,
    /// Message that was received earlier but not yielded; it is yielded first.
    buffer_prev: Option<UpMsg<Up>>,
    /// Handles of the worker threads, joined when the message stream ends.
    workers: Vec<JoinHandle<()>>,
}

impl<Up: Send + 'static> RecvAllIterator<Up> {
    pub(crate) fn new(
        receiver: Receiver<UpMsg<Up>>,
        buffer_prev: Option<UpMsg<Up>>,
        workers: Vec<JoinHandle<()>>,
    ) -> Self {
        Self {
            receiver: receiver,
            buffer_prev,
            workers,
        }
    }
}

impl<Up> Iterator for RecvAllIterator<Up>
where
    Up: Send + 'static,
{
    type Item = Up;

    /// Yields the parked message first, then blocks on `recv` for each further message.
    ///
    /// When `recv` fails, every worker handle is joined and `None` is returned. If a worker
    /// panicked, its panic is resumed on the calling thread.
    #[inline]
    fn next(&mut self) -> Option<Up> {
        // The message handed over at construction goes out before the channel is read.
        if let Some(parked) = self.buffer_prev.take() {
            return Some(parked.get());
        }

        match self.receiver.recv() {
            Ok(msg) => Some(msg.get()),
            Err(_) => {
                // Empty the handle list so a later call has nothing left to join.
                for handle in std::mem::take(&mut self.workers) {
                    if let Err(payload) = handle.join() {
                        // Propagate the worker's panic instead of swallowing it.
                        std::panic::resume_unwind(payload);
                    }
                }

                None
            }
        }
    }
}

/// Marks `RecvAllIterator` as fused: callers may treat the first `None` as final.
///
/// NOTE(review): this relies on `recv` continuing to fail once it has failed — confirm
/// for the `Receiver` type in use.
impl<Up> std::iter::FusedIterator for RecvAllIterator<Up> where Up: Send + 'static {}