1#[cfg(feature = "crossbeam")]
15extern crate crossbeam_channel;
16
17#[cfg(feature = "crossbeam")]
18mod crossbeam;
19
20use std::iter::{self, Iterator};
21use std::vec;
22use std::sync::mpsc;
23
/// A receiving end of a channel that can hand out one item at a time and
/// also expose an iterator over further items.
///
/// `BatchRecv::batch_recv` is built on exactly these two operations: it calls
/// [`recv`](TryIterRecv::recv) once and then pulls the remaining items from
/// [`try_iter`](TryIterRecv::try_iter).
///
/// The lifetime `'a` is the borrow of the receiver that the returned iterator
/// is allowed to hold on to.
pub trait TryIterRecv<'a> {
    /// Iterator type produced by [`try_iter`](TryIterRecv::try_iter); it may
    /// borrow the receiver for `'a`.
    type Iter: Iterator + 'a;

    /// Error type produced by a failed [`recv`](TryIterRecv::recv).
    type Error;

    /// Returns an iterator over items of the receiver, borrowing `self` for
    /// `'a`.
    fn try_iter(&'a self) -> Self::Iter;

    /// Receives a single item.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the implementation cannot deliver an item.
    fn recv(&self) -> Result<<Self::Iter as Iterator>::Item, Self::Error>;
}
38
39pub trait BatchRecv<'a>: TryIterRecv<'a> {
41 fn batch_recv(&'a self, n: usize) -> Result<BatchIter<Self::Iter>, Self::Error> {
44 let first = self.recv()?;
45 let rest = self.try_iter().take(n - 1);
46 let inner = vec![first].into_iter().chain(rest);
47 Ok(inner)
48 }
49}
50impl<'a, T> BatchRecv<'a> for T
51where
52 T: TryIterRecv<'a>,
53{
54}
55
/// Iterator type returned by `BatchRecv::batch_recv`.
///
/// It is a `Vec` iterator holding the item obtained up front, chained with a
/// `Take`-limited view of the underlying iterator `I` for the remaining items.
pub type BatchIter<I> = iter::Chain<vec::IntoIter<<I as Iterator>::Item>, iter::Take<I>>;
57
58impl<'a, T: 'a> TryIterRecv<'a> for mpsc::Receiver<T> {
59 type Iter = mpsc::TryIter<'a, T>;
60 type Error = mpsc::RecvError;
61 fn try_iter(&'a self) -> mpsc::TryIter<'a, T> {
62 mpsc::Receiver::try_iter(&self)
63 }
64 fn recv(&self) -> Result<T, Self::Error> {
65 mpsc::Receiver::recv(&self)
66 }
67}
68
#[cfg(test)]
mod tests {
    use super::BatchRecv;
    use std::sync::mpsc;
    use std::thread;

    /// Creates a bounded channel (capacity 10) pre-loaded with `1..=count`.
    ///
    /// The sender is returned as well so the channel stays connected for the
    /// duration of the test.
    fn preloaded(count: i32) -> (mpsc::SyncSender<i32>, mpsc::Receiver<i32>) {
        let (tx, rx) = mpsc::sync_channel(10);
        for value in 1..=count {
            tx.send(value).unwrap();
        }
        (tx, rx)
    }

    /// More items are queued than requested: the batch is capped at `n`.
    #[test]
    fn take3of4() {
        let (_tx, rx) = preloaded(4);
        let first3: Vec<_> = rx.batch_recv(3).unwrap().collect();
        assert_eq!(first3, vec![1, 2, 3]);
    }

    /// Fewer items are queued than requested: the batch holds what is there.
    #[test]
    fn take2of2() {
        let (_tx, rx) = preloaded(2);
        let first2: Vec<_> = rx.batch_recv(3).unwrap().collect();
        assert_eq!(first2, vec![1, 2]);
    }

    /// The only item is sent from another thread after the receiver starts
    /// waiting (or shortly before); the batch contains just that item.
    #[test]
    fn take_first1() {
        let (tx, rx) = mpsc::sync_channel(10);
        let sender = thread::spawn(move || {
            thread::yield_now();
            tx.send(1).unwrap();
        });
        let first1: Vec<_> = rx.batch_recv(3).unwrap().collect();
        assert_eq!(first1, vec![1]);
        // Join so a panic in the sending thread fails the test instead of
        // being silently dropped with the detached handle.
        sender.join().unwrap();
    }
}