batch_recv/
lib.rs

1//! Batched receive from queues
2//!
3//! To use the [`batch_recv` method](./trait.BatchRecv.html#method.batch_recv), import the [`BatchRecv` trait](./trait.BatchRecv.html):
4//!
5//! ```
6//! use batch_recv::BatchRecv;
7//! ```
8//!
9//! ## Crate Features
10//! - `crossbeam`
11//!   - Disabled by default
12//!   - Provied [`TryIterRecv` trait](./trait.TryIterRecv.html) implementation for crossbeam_channel::Receiver
13
14#[cfg(feature = "crossbeam")]
15extern crate crossbeam_channel;
16
17#[cfg(feature = "crossbeam")]
18mod crossbeam;
19
20use std::iter::{self, Iterator};
21use std::vec;
22use std::sync::mpsc;
23
24/// Trait for queues which have batch-able receiving method.
25///
26/// This is mainly used by [`BatchRecv` trait](./trait.BatchRecv.html).
27///
28/// At first, [`BatchRecv` trait](./trait.BatchRecv.html) calls `recv`
29/// which blocks the thread until the first value comes.
30///
31/// And then, it calls `try_iter` to retrieve the following values.
32pub trait TryIterRecv<'a> {
33    type Iter: Iterator + 'a;
34    type Error;
35    fn try_iter(&'a self) -> Self::Iter;
36    fn recv(&self) -> Result<<Self::Iter as Iterator>::Item, Self::Error>;
37}
38
39/// Trait which provides batched receiving method.
40pub trait BatchRecv<'a>: TryIterRecv<'a> {
41    /// Creates an iterator yields its first `n` values.
42    /// It blocks until the first value comes if the queue is empty.
43    fn batch_recv(&'a self, n: usize) -> Result<BatchIter<Self::Iter>, Self::Error> {
44        let first = self.recv()?;
45        let rest = self.try_iter().take(n - 1);
46        let inner = vec![first].into_iter().chain(rest);
47        Ok(inner)
48    }
49}
50impl<'a, T> BatchRecv<'a> for T
51where
52    T: TryIterRecv<'a>,
53{
54}
55
56pub type BatchIter<I> = iter::Chain<vec::IntoIter<<I as Iterator>::Item>, iter::Take<I>>;
57
58impl<'a, T: 'a> TryIterRecv<'a> for mpsc::Receiver<T> {
59    type Iter = mpsc::TryIter<'a, T>;
60    type Error = mpsc::RecvError;
61    fn try_iter(&'a self) -> mpsc::TryIter<'a, T> {
62        mpsc::Receiver::try_iter(&self)
63    }
64    fn recv(&self) -> Result<T, Self::Error> {
65        mpsc::Receiver::recv(&self)
66    }
67}
68
69#[cfg(test)]
70mod tests {
71    use std::sync::mpsc;
72    use std::thread;
73    use super::BatchRecv;
74    #[test]
75    fn take3of4() {
76        let (tx, rx) = mpsc::sync_channel(10);
77        tx.send(1).unwrap();
78        tx.send(2).unwrap();
79        tx.send(3).unwrap();
80        tx.send(4).unwrap();
81        let first3: Vec<_> = rx.batch_recv(3).unwrap().collect();
82        assert_eq!(first3, vec![1, 2, 3]);
83    }
84
85    #[test]
86    fn take2of2() {
87        let (tx, rx) = mpsc::sync_channel(10);
88        tx.send(1).unwrap();
89        tx.send(2).unwrap();
90        let first2: Vec<_> = rx.batch_recv(3).unwrap().collect();
91        assert_eq!(first2, vec![1, 2]);
92    }
93
94    #[test]
95    fn take_first1() {
96        let (tx, rx) = mpsc::sync_channel(10);
97        thread::spawn(move || {
98            thread::yield_now();
99            tx.send(1).unwrap();
100        });
101        let first1: Vec<_> = rx.batch_recv(3).unwrap().collect();
102        assert_eq!(first1, vec![1]);
103    }
104}