#![deny(missing_docs, missing_debug_implementations, missing_copy_implementations,
trivial_numeric_casts, unstable_features, unused_import_braces, unused_qualifications)]
extern crate bincode;
extern crate byteorder;
extern crate flate2;
extern crate serde;
mod receiver;
mod sender;
mod private;
mod deque;
pub use self::receiver::Receiver;
pub use self::sender::Sender;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::{fs, io, mem, sync};
use std::sync::atomic::AtomicUsize;
use std::path::Path;
/// Errors surfaced by channel construction and by operations on the channel
/// endpoints.
#[derive(Debug)]
pub enum Error {
    /// NOTE(review): not constructed anywhere in this file; presumably signals
    /// that an expected data directory is absent -- confirm against the
    /// sender/receiver modules.
    NoSuchDirectory,
    /// An underlying I/O operation failed. `channel_with_explicit_capacity`
    /// returns this when the channel directory cannot be created or cleared.
    IoError(io::Error),
    /// NOTE(review): not constructed anywhere in this file; presumably
    /// reported when a flush could not be carried out -- confirm against the
    /// sender module.
    NoFlush,
    /// A send was rejected. The tests in this file treat this variant as the
    /// signal that a value was shed rather than queued.
    Full,
}
/// Create a `(Sender, Receiver)` pair using the crate's default limits.
///
/// This is a convenience wrapper around `channel_with_explicit_capacity`
/// that passes `0x100_000` as `max_memory_bytes`, `0x10_000_000` as
/// `max_disk_bytes` and `usize::max_value()` as `max_disk_files`.
///
/// # Errors
///
/// Returns whatever `channel_with_explicit_capacity` returns for the same
/// `name` and `data_dir`.
pub fn channel<T>(name: &str, data_dir: &Path) -> Result<(Sender<T>, Receiver<T>), Error>
where
    T: Serialize + DeserializeOwned,
{
    let default_memory_bytes: usize = 0x100_000;
    let default_disk_bytes: usize = 0x10_000_000;
    let default_disk_files: usize = usize::max_value();
    channel_with_explicit_capacity(
        name,
        data_dir,
        default_memory_bytes,
        default_disk_bytes,
        default_disk_files,
    )
}
pub fn channel_with_explicit_capacity<T>(
name: &str,
data_dir: &Path,
max_memory_bytes: usize,
max_disk_bytes: usize,
max_disk_files: usize,
) -> Result<(Sender<T>, Receiver<T>), Error>
where
T: Serialize + DeserializeOwned,
{
let root = data_dir.join(name);
if !root.is_dir() {
match fs::create_dir_all(root.clone()) {
Ok(()) => {}
Err(e) => {
return Err(Error::IoError(e));
}
}
}
let sz = mem::size_of::<T>();
let max_disk_bytes = ::std::cmp::max(0x100_000, max_disk_bytes);
let total_memory_limit: usize = ::std::cmp::max(1, max_memory_bytes / sz);
let q: private::Queue<T> = deque::Queue::with_capacity(total_memory_limit);
if let Err(e) = private::clear_directory(&root) {
return Err(Error::IoError(e));
}
let max_disk_files = sync::Arc::new(AtomicUsize::new(max_disk_files));
let sender = Sender::new(
name,
&root,
max_disk_bytes,
q.clone(),
sync::Arc::clone(&max_disk_files),
)?;
let receiver = Receiver::new(&root, q, sync::Arc::clone(&max_disk_files))?;
Ok((sender, receiver))
}
#[cfg(test)]
mod test {
extern crate quickcheck;
extern crate tempdir;
use self::quickcheck::{QuickCheck, TestResult};
use super::channel_with_explicit_capacity;
use std::{mem, thread};
/// Sends `5 * 131082` `u64` values through a channel built with limits
/// `(8, 32, 2)`, counts the sends rejected with `Error::Full`, asserts that
/// count equals `383981`, and then checks that every accepted value is
/// received in the order it was sent.
#[test]
fn ingress_shedding() {
    // A failure to set up the directory or the channel ends the test without
    // failing it.
    let dir = match tempdir::TempDir::new("hopper") {
        Ok(dir) => dir,
        Err(_) => return,
    };
    let (mut snd, mut rcv) = match channel_with_explicit_capacity::<u64>(
        "round_trip_order_preserved",
        dir.path(),
        8,
        32,
        2,
    ) {
        Ok(endpoints) => endpoints,
        Err(_) => return,
    };

    let total_elems = 5 * 131082;
    let expected_shed_sends = 383981;
    let mut shed_sends = 0;
    let mut accepted = Vec::new();
    for value in 0..total_elems {
        loop {
            match snd.send(value) {
                Ok(()) => {
                    accepted.push(value);
                    break;
                }
                // A full channel sheds the value: count it and move on.
                Err((returned, super::Error::Full)) => {
                    assert_eq!(returned, value);
                    shed_sends += 1;
                    break;
                }
                // Any other error: the value is handed back, so retry it.
                Err((returned, _)) => {
                    assert_eq!(returned, value);
                }
            }
        }
    }
    assert_eq!(shed_sends, expected_shed_sends);

    // The first value is read before the sender is flushed.
    let mut received_elements = 0;
    let mut misses = 0;
    loop {
        if let Some(first) = rcv.iter().next() {
            received_elements += 1;
            assert_eq!(first, 0);
            break;
        }
        misses += 1;
        assert!(misses < 10_000);
    }

    // Spin until the flush is accepted.
    while snd.flush().is_err() {}

    // The miss budget below is shared across all remaining values.
    let mut misses = 0;
    for expected in &accepted[1..] {
        loop {
            if let Some(got) = rcv.iter().next() {
                received_elements += 1;
                assert_eq!(*expected, got);
                break;
            }
            misses += 1;
            assert!(misses < 10_000);
        }
    }
    assert_eq!(received_elements, accepted.len());
}
/// Single-threaded round trip: sends `0..total_elems` through a fresh
/// channel, retrying each send until it succeeds, then asserts the values
/// come back in the same order.
///
/// Always returns `true`; a mismatch or an exhausted retry budget panics
/// instead. If the temporary directory or the channel cannot be created the
/// function returns `true` without exercising anything.
fn round_trip_exp(
    in_memory_limit: usize,
    max_bytes: usize,
    max_disk_files: usize,
    total_elems: usize,
) -> bool {
    let dir = match tempdir::TempDir::new("hopper") {
        Ok(dir) => dir,
        Err(_) => return true,
    };
    let (mut snd, mut rcv) = match channel_with_explicit_capacity(
        "round_trip_order_preserved",
        dir.path(),
        in_memory_limit,
        max_bytes,
        max_disk_files,
    ) {
        Ok(endpoints) => endpoints,
        Err(_) => return true,
    };

    for value in 0..total_elems {
        // Spin until this value is accepted.
        while snd.send(value).is_err() {}
    }

    // The first value is read before the sender is flushed.
    let mut misses = 0;
    loop {
        if let Some(first) = rcv.iter().next() {
            assert_eq!(first, 0);
            break;
        }
        misses += 1;
        assert!(misses < 10_000);
    }

    // Spin until the flush is accepted.
    while snd.flush().is_err() {}

    for expected in 1..total_elems {
        // Each remaining value gets its own miss budget.
        let mut misses = 0;
        loop {
            if let Some(got) = rcv.iter().next() {
                assert_eq!(got, expected);
                break;
            }
            misses += 1;
            assert!(misses < 10_000);
        }
    }
    true
}
/// QuickCheck driver for `round_trip_exp` with `max_disk_files` fixed at
/// `usize::max_value()`.
#[test]
fn round_trip() {
    // Inputs whose byte budgets are smaller than one `u64`, or that send
    // nothing, are discarded rather than tested.
    fn inner(in_memory_limit: usize, max_bytes: usize, total_elems: usize) -> TestResult {
        let sz = mem::size_of::<u64>();
        let memory_too_small = (in_memory_limit / sz) == 0;
        let disk_too_small = (max_bytes / sz) == 0;
        if memory_too_small || disk_too_small || total_elems == 0 {
            TestResult::discard()
        } else {
            let passed =
                round_trip_exp(in_memory_limit, max_bytes, usize::max_value(), total_elems);
            TestResult::from_bool(passed)
        }
    }
    QuickCheck::new().quickcheck(inner as fn(usize, usize, usize) -> TestResult);
}
/// Multi-threaded round trip: splits `vals` into chunks of
/// `vals.len() / total_senders` elements, sends each chunk from its own
/// thread through a cloned sender while one receiver thread collects
/// `vals.len()` values, then asserts the sorted sent and received values are
/// equal.
///
/// Always returns `true`; failures panic. If the temporary directory or the
/// channel cannot be created the function returns `true` without exercising
/// anything. Callers must ensure `total_senders` is non-zero and no larger
/// than `vals.len()`, otherwise the chunk size computation or `chunks` panics.
fn multi_thread_concurrent_snd_and_rcv_round_trip_exp(
    total_senders: usize,
    in_memory_bytes: usize,
    disk_bytes: usize,
    max_disk_files: usize,
    vals: Vec<u64>,
) -> bool {
    let dir = match tempdir::TempDir::new("hopper") {
        Ok(dir) => dir,
        Err(_) => return true,
    };
    let (snd, mut rcv) = match channel_with_explicit_capacity(
        "tst",
        dir.path(),
        in_memory_bytes,
        disk_bytes,
        max_disk_files,
    ) {
        Ok(endpoints) => endpoints,
        Err(_) => return true,
    };

    let expected_total_vals = vals.len();
    let per_sender = expected_total_vals / total_senders;

    // One sender thread per chunk; each returns the values it queued.
    let sender_handles: Vec<_> = vals
        .chunks(per_sender)
        .map(|slice| {
            let mut thr_snd = snd.clone();
            let batch = slice.to_vec();
            thread::spawn(move || {
                let mut queued = Vec::new();
                for item in batch {
                    let mut pending = item;
                    loop {
                        match thr_snd.send(pending) {
                            Ok(()) => break,
                            // A rejected send hands the value back; retry it.
                            Err(rejected) => pending = rejected.0,
                        }
                    }
                    queued.push(pending);
                }
                // Retry the flush with a 100ms pause between tries.
                let mut flush_failures = 0;
                while thr_snd.flush().is_err() {
                    thread::sleep(::std::time::Duration::from_millis(100));
                    flush_failures += 1;
                    assert!(flush_failures < 10);
                }
                queued
            })
        })
        .collect();

    // The receiver polls until it has seen as many values as were sent.
    let receiver_handle = thread::spawn(move || {
        let mut collected = Vec::new();
        let mut rcv_iter = rcv.iter();
        while collected.len() < expected_total_vals {
            let mut misses = 0;
            loop {
                if let Some(item) = rcv_iter.next() {
                    collected.push(item);
                    break;
                }
                misses += 1;
                assert!(misses < 10_000);
            }
        }
        collected
    });

    let mut sent: Vec<u64> = Vec::new();
    for handle in sender_handles {
        sent.extend(handle.join().expect("snd join failed"));
    }
    let mut received = receiver_handle.join().expect("rcv join failed");

    // Only the set of values is compared, so sort both sides first.
    received.sort();
    sent.sort();
    assert_eq!(received, sent);
    true
}
/// Runs the multi-threaded round trip 2,501 times with fixed parameters
/// (ten senders, the values `0..10`), sleeping one millisecond between runs.
#[test]
fn explicit_multi_thread_concurrent_snd_and_rcv_round_trip() {
    let total_senders = 10;
    let in_memory_bytes = 50;
    let disk_bytes = 10;
    let max_disk_files = 100;
    let vals: Vec<u64> = (0..10).collect();

    let total_passes = 2_501;
    for pass in 0..total_passes {
        let passed = multi_thread_concurrent_snd_and_rcv_round_trip_exp(
            total_senders,
            in_memory_bytes,
            disk_bytes,
            max_disk_files,
            vals.clone(),
        );
        assert!(passed);
        // No pause is needed after the final pass.
        if pass + 1 < total_passes {
            thread::sleep(::std::time::Duration::from_millis(1));
        }
    }
}
/// QuickCheck driver for `multi_thread_concurrent_snd_and_rcv_round_trip_exp`.
#[test]
fn multi_thread_concurrent_snd_and_rcv_round_trip() {
    // Discards inputs with zero or more than ten senders, with fewer values
    // than senders, or with byte budgets smaller than one `u64`.
    fn inner(
        total_senders: usize,
        in_memory_bytes: usize,
        disk_bytes: usize,
        max_disk_files: usize,
        vals: Vec<u64>,
    ) -> TestResult {
        let sz = mem::size_of::<u64>();
        let bad_sender_count = total_senders == 0 || total_senders > 10;
        let too_few_vals = vals.is_empty() || vals.len() < total_senders;
        let budgets_too_small = (in_memory_bytes / sz) == 0 || (disk_bytes / sz) == 0;
        if bad_sender_count || too_few_vals || budgets_too_small {
            return TestResult::discard();
        }
        let passed = multi_thread_concurrent_snd_and_rcv_round_trip_exp(
            total_senders,
            in_memory_bytes,
            disk_bytes,
            max_disk_files,
            vals,
        );
        TestResult::from_bool(passed)
    }
    QuickCheck::new()
        .quickcheck(inner as fn(usize, usize, usize, usize, Vec<u64>) -> TestResult);
}
/// One sender thread and one receiver thread: the sender pushes
/// `0..total_vals` (retrying each send until it succeeds) and then flushes,
/// while the receiver asserts that it observes exactly those values in order.
///
/// Always returns `true`; failures panic in the worker threads and surface
/// through the `join` expectations. If the temporary directory, the channel,
/// or either thread cannot be created the function returns `true` without
/// completing the check.
fn single_sender_single_rcv_round_trip_exp(
    in_memory_bytes: usize,
    disk_bytes: usize,
    max_disk_files: usize,
    total_vals: usize,
) -> bool {
    if let Ok(dir) = tempdir::TempDir::new("hopper") {
        if let Ok((mut snd, mut rcv)) = channel_with_explicit_capacity(
            "tst",
            dir.path(),
            in_memory_bytes,
            disk_bytes,
            max_disk_files,
        ) {
            let builder = thread::Builder::new();
            if let Ok(snd_jh) = builder.spawn(move || {
                for i in 0..total_vals {
                    // Spin until this value is accepted.
                    while snd.send(i).is_err() {}
                }
                // Retry the flush with a 100ms pause between tries.
                let mut attempts = 0;
                while snd.flush().is_err() {
                    thread::sleep(::std::time::Duration::from_millis(100));
                    attempts += 1;
                    assert!(attempts < 10);
                }
            }) {
                let builder = thread::Builder::new();
                if let Ok(rcv_jh) = builder.spawn(move || {
                    let mut rcv_iter = rcv.iter();
                    let mut cur = 0;
                    while cur != total_vals {
                        let mut attempts = 0;
                        loop {
                            if let Some(rcvd) = rcv_iter.next() {
                                // A hard assert, so the ordering check also
                                // runs when debug assertions are disabled
                                // (e.g. `cargo test --release`).
                                assert_eq!(
                                    cur, rcvd,
                                    "FAILED TO GET ALL IN ORDER: {:?}",
                                    rcvd
                                );
                                cur += 1;
                                break;
                            } else {
                                attempts += 1;
                                assert!(attempts < 10_000);
                            }
                        }
                    }
                }) {
                    snd_jh.join().expect("snd join failed");
                    rcv_jh.join().expect("rcv join failed");
                }
            }
        }
    }
    true
}
/// Runs the single-sender/single-receiver round trip 2,501 times with the
/// fixed arguments `(8, 8, 5, 10)`, sleeping one millisecond between runs.
#[test]
fn explicit_single_sender_single_rcv_round_trip() {
    let total_passes = 2_501;
    for pass in 0..total_passes {
        let passed = single_sender_single_rcv_round_trip_exp(8, 8, 5, 10);
        assert!(passed);
        // No pause is needed after the final pass.
        if pass + 1 < total_passes {
            thread::sleep(::std::time::Duration::from_millis(1));
        }
    }
}
/// QuickCheck driver for `single_sender_single_rcv_round_trip_exp`.
#[test]
fn single_sender_single_rcv_round_trip() {
    // Discards inputs that send nothing or whose byte budgets are smaller
    // than one `u64`.
    fn inner(
        in_memory_bytes: usize,
        disk_bytes: usize,
        max_disk_files: usize,
        total_vals: usize,
    ) -> TestResult {
        let sz = mem::size_of::<u64>();
        let budgets_too_small = (in_memory_bytes / sz) == 0 || (disk_bytes / sz) == 0;
        if total_vals == 0 || budgets_too_small {
            return TestResult::discard();
        }
        let passed = single_sender_single_rcv_round_trip_exp(
            in_memory_bytes,
            disk_bytes,
            max_disk_files,
            total_vals,
        );
        TestResult::from_bool(passed)
    }
    QuickCheck::new().quickcheck(inner as fn(usize, usize, usize, usize) -> TestResult);
}
}