use std::collections::VecDeque;
use std::sync::mpsc;
pub struct Sender<T> {
v: VecDeque<T>,
s: mpsc::Sender<VecDeque<T>>,
n: usize,
}
pub struct Receiver<T> {
v: VecDeque<T>,
r: mpsc::Receiver<VecDeque<T>>,
}
impl<T> Sender<T> {
pub fn send(&mut self, t: T) {
self.v.push_back(t);
if self.v.len() == self.n {
self.flush();
}
}
pub fn flush(&mut self) {
let v = std::mem::take(&mut self.v);
self.s.send(v).unwrap();
self.v.reserve(self.n);
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
self.flush();
}
}
impl<T> Receiver<T> {
pub fn recv(&mut self) -> Option<T> {
loop {
if let Some(t) = self.v.pop_front() {
return Some(t);
}
if let Ok(v) = self.r.recv() {
self.v = v;
} else {
return None;
}
}
}
}
pub fn channel<T>(n: usize) -> (Sender<T>, Receiver<T>) {
let (s, r) = mpsc::channel::<VecDeque<T>>();
let s = Sender {
v: VecDeque::with_capacity(n),
s,
n,
};
let r = Receiver {
v: VecDeque::new(),
r,
};
(s, r)
}
#[cfg(test)]
const TSIZE: u64 = 10_000_000;
#[test]
fn test1() {
use std::thread;
let start = std::time::Instant::now();
let (mut tx, mut rx) = channel::<u64>(4096);
thread::scope(|s| {
s.spawn(move || {
for i in 0..TSIZE {
tx.send(i);
}
});
s.spawn(move || {
let mut total = 0;
while let Some(_x) = rx.recv() {
total += 1;
}
assert!(total == TSIZE);
});
});
let t = start.elapsed().as_millis() as u64;
println!("test1 Time elapsed ={}", t);
}
#[test]
fn test2() {
use std::thread;
let start = std::time::Instant::now();
let (tx, rx) = mpsc::channel::<u64>();
thread::scope(|s| {
s.spawn(move || {
for i in 0..TSIZE {
tx.send(i).unwrap();
}
});
s.spawn(move || {
let mut total = 0;
while let Ok(_x) = rx.recv() {
total += 1;
}
assert!(total == TSIZE);
});
});
let t = start.elapsed().as_millis() as u64;
println!("test2 Time elapsed ={}", t);
}
#[test]
fn test3() {
use std::thread;
let start = std::time::Instant::now();
let (mut tx, mut rx) = bufchan::unbounded();
thread::scope(|s| {
s.spawn(move || {
for i in 0..TSIZE {
tx.send(i);
}
});
s.spawn(move || {
let mut total = 0;
while let Some(_x) = rx.recv() {
total += 1;
}
assert!(total == TSIZE);
});
});
let t = start.elapsed().as_millis() as u64;
println!("test3 Time elapsed ={}", t);
}