bchan 1.0.0

A buffered channel similar to std::sync::mpsc::channel, but faster.
Documentation
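//! A buffered channel similar to [`std::sync::mpsc::channel`], but faster.
//!
//! Items are batched on the sending side: a batch is handed to the underlying
//! `mpsc` channel once `n` items have accumulated, on an explicit `flush`, or
//! when the sender is dropped.
//!
//! # Examples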
//! ```
//!    let (mut tx, mut rx) = bchan::channel::<u64>(4096);
//!    std::thread::scope(|s| {
//!        s.spawn(move || {
//!            for i in 0..100000 {
//!                tx.send(i);
//!            }
//!        });
//!        s.spawn(move || {
//!            let mut count = 0;
//!            while let Some(x) = rx.recv() {
//!                assert!( x == count );
//!                count += 1;
//!            }
//!        });
//!    });
//! ```

use std::collections::VecDeque;
use std::sync::mpsc;

/// Sending half of a buffered channel, created by [`channel`].
///
/// Items are collected in a local buffer and handed to the receiver one
/// whole batch at a time.
pub struct Sender<T> {
    // Items accepted so far that have not been handed to the receiver yet.
    v: VecDeque<T>,
    // Underlying mpsc sender; each message it carries is one whole batch.
    s: mpsc::Sender<VecDeque<T>>,
    // Batch size: the buffer is handed over once it holds this many items.
    n: usize,
}

/// Receiving half of a buffered channel, created by [`channel`].
///
/// Batches arrive from the sender and are handed out one item at a time.
pub struct Receiver<T> {
    // Batch currently being drained; items are taken from the front.
    v: VecDeque<T>,
    // Underlying mpsc receiver; each message it yields is one whole batch.
    r: mpsc::Receiver<VecDeque<T>>,
}

impl<T> Sender<T> {
    /// Queues `t` for delivery and forwards the buffered batch to the
    /// receiver once it holds at least `n` items.
    ///
    /// With a batch size of 0 every item is forwarded immediately, so the
    /// receiver never waits on an item that could not trigger a flush.
    ///
    /// # Panics
    ///
    /// Panics if a batch has to be forwarded and the receiver has already
    /// been dropped.
    pub fn send(&mut self, t: T) {
        self.v.push_back(t);
        // `>=` rather than `==`: the length is already 1 after the push, so
        // an equality test could never fire for a batch size of 0.
        if self.v.len() >= self.n {
            self.flush();
        }
    }

    /// Forwards every buffered item to the receiver as a single batch.
    ///
    /// Does nothing when the buffer is empty, which avoids sending an empty
    /// message and allocating a replacement buffer for no reason.
    ///
    /// # Panics
    ///
    /// Panics if there are buffered items and the receiver has already been
    /// dropped.
    pub fn flush(&mut self) {
        if self.v.is_empty() {
            return;
        }
        let batch = std::mem::take(&mut self.v);
        self.s
            .send(batch)
            .expect("bchan: receiver was dropped before all items were delivered");
        // `take` left an unallocated buffer behind; size it for the next batch.
        self.v.reserve(self.n);
    }
}

impl<T> Drop for Sender<T> {
    /// Hands any still-buffered items to the receiver before the sender
    /// goes away.
    ///
    /// Never panics: if the receiver has already been dropped the remaining
    /// items are discarded, since there is nobody left to deliver them to.
    fn drop(&mut self) {
        if self.v.is_empty() {
            // Nothing pending, so there is no reason to send an empty batch.
            return;
        }
        let remaining = std::mem::take(&mut self.v);
        // A destructor must not panic (a panic while already unwinding
        // aborts the process), so a disconnected receiver is ignored here.
        let _ = self.s.send(remaining);
    }
}

impl<T> Receiver<T> {
    /// Returns the next item, blocking until one is available.
    ///
    /// Returns `None` once the current batch is drained and the underlying
    /// channel reports that no further batch can arrive.
    pub fn recv(&mut self) -> Option<T> {
        // Keep pulling batches until one actually contains an item; an
        // empty batch simply causes another wait.
        while self.v.is_empty() {
            self.v = self.r.recv().ok()?;
        }
        self.v.pop_front()
    }
}

/// Creates a buffered channel and returns its two halves.
///
/// The sender starts with room for `n` items and uses `n` as its batch
/// size; the receiver starts with an empty batch. Batches travel over a
/// standard `mpsc` channel.
pub fn channel<T>(n: usize) -> (Sender<T>, Receiver<T>) {
    let (batch_tx, batch_rx) = mpsc::channel();
    (
        Sender {
            v: VecDeque::with_capacity(n),
            s: batch_tx,
            n,
        },
        Receiver {
            v: VecDeque::new(),
            r: batch_rx,
        },
    )
}

// Number of items each timing test below pushes through its channel.
#[cfg(test)]
const TSIZE: u64 = 10_000_000;

#[test]
fn test1() // Uses buffered mpsc channel
{
    // The clock starts before the channel is created, so setup is timed too.
    let started = std::time::Instant::now();

    let (mut producer, mut consumer) = channel::<u64>(4096);
    std::thread::scope(|scope| {
        // Producer: sends 0..TSIZE, then drops its end when the closure ends.
        scope.spawn(move || {
            (0..TSIZE).for_each(|i| producer.send(i));
        });
        // Consumer: counts items until the channel reports it is finished.
        scope.spawn(move || {
            let mut received = 0;
            while consumer.recv().is_some() {
                received += 1;
            }
            assert!(received == TSIZE);
        });
    });

    println!("test1 Time elapsed ={}", started.elapsed().as_millis());
}

#[test]
fn test2() // Uses standard mpsc channel
{
    // The clock starts before the channel is created, so setup is timed too.
    let started = std::time::Instant::now();

    let (producer, consumer) = mpsc::channel::<u64>();
    std::thread::scope(|scope| {
        // Producer: one mpsc message per item.
        scope.spawn(move || {
            (0..TSIZE).for_each(|i| producer.send(i).unwrap());
        });
        // Consumer: iteration ends once the producer is dropped and the
        // queue has been drained.
        scope.spawn(move || {
            let mut received = 0;
            for _ in consumer {
                received += 1;
            }
            assert!(received == TSIZE);
        });
    });

    println!("test2 Time elapsed ={}", started.elapsed().as_millis());
}

#[test]
fn test3() // Uses bufchan channel
{
    // The clock starts before the channel is created, so setup is timed too.
    let started = std::time::Instant::now();

    let (mut producer, mut consumer) = bufchan::unbounded();
    std::thread::scope(|scope| {
        // Producer: sends 0..TSIZE through the bufchan sender.
        scope.spawn(move || {
            (0..TSIZE).for_each(|i| {
                producer.send(i);
            });
        });
        // Consumer: counts items until `recv` stops yielding `Some`.
        scope.spawn(move || {
            let mut received = 0;
            while let Some(_) = consumer.recv() {
                received += 1;
            }
            assert!(received == TSIZE);
        });
    });

    println!("test3 Time elapsed ={}", started.elapsed().as_millis());
}