#[cfg(feature = "chan")]
extern crate chan;
use std::collections::HashMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hash;
use std::hash::Hasher;
use std::sync::Arc;
use std::thread;
pub use filter::Filter;
pub use map::Mapper;
pub use multiplex::Multiplex;
pub use comms::{LockedReceiver, Receiver, ReceiverIntoIterator, Sender};
mod comms {
use std::cell::RefCell;
use std::collections::VecDeque;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use super::PipelineConfig;
/// Sending half of a batched pipeline channel.
///
/// Items are collected in a local `buffer` and shipped over the bounded
/// `tx` channel as a single `VecDeque` once `config.batch_size` items have
/// accumulated, when [`Sender::flush`] is called, or when the sender is
/// dropped.
#[derive(Debug)]
pub struct Sender<Out> {
    tx: mpsc::SyncSender<VecDeque<Out>>,
    config: PipelineConfig,
    buffer: RefCell<VecDeque<Out>>,
}
impl<Out> Sender<Out> {
    /// Queues `out`, flushing the batch once it holds `batch_size` items.
    ///
    /// # Panics
    ///
    /// Panics if a flush is triggered after the receiving side has been
    /// dropped.
    pub fn send(&self, out: Out) -> () {
        let new_len = {
            let mut buff = self.buffer.borrow_mut();
            buff.push_back(out);
            buff.len()
        };
        if new_len >= self.config.batch_size {
            self.flush()
        }
    }
    /// Ships every buffered item downstream as one batch.
    ///
    /// An empty buffer is never sent, so receivers only ever see non-empty
    /// batches. This may block while the bounded channel is full.
    ///
    /// # Panics
    ///
    /// Panics if the receiving side has been dropped.
    pub fn flush(&self) {
        self.try_flush().expect("failed send")
    }
    /// Sends the buffered batch, if any, returning the channel error instead
    /// of panicking when the receiver is gone.
    fn try_flush(&self) -> Result<(), mpsc::SendError<VecDeque<Out>>> {
        // Nothing to ship: keep the current (empty) buffer rather than
        // allocating a replacement.
        if self.buffer.borrow().is_empty() {
            return Ok(());
        }
        let batch = self
            .buffer
            .replace(VecDeque::with_capacity(self.config.batch_size));
        self.tx.send(batch)
    }
    /// Creates a connected sender/receiver pair whose channel holds up to
    /// `config.buff_size` batches.
    pub(super) fn pair(config: PipelineConfig) -> (Self, Receiver<Out>) {
        let (tx, rx) = mpsc::sync_channel(config.buff_size);
        let tx_buffer = VecDeque::with_capacity(config.batch_size);
        let rx_buffer = VecDeque::with_capacity(config.batch_size);
        (
            Self {
                tx,
                config,
                buffer: RefCell::new(tx_buffer),
            },
            Receiver {
                rx,
                buffer: RefCell::new(rx_buffer),
            },
        )
    }
}
impl<Out> Drop for Sender<Out> {
    fn drop(&mut self) {
        // Deliver any partial batch. A send error is deliberately ignored:
        // the receiver is gone so the items have nowhere to go, and a panic
        // inside `drop` would abort the process if this sender is being
        // dropped while its thread is already unwinding from a panic.
        let _ = self.try_flush();
    }
}
impl<Out> Clone for Sender<Out> {
    /// Returns another sender on the same channel with its own, initially
    /// empty, batch buffer.
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
            config: self.config,
            // Sized by `batch_size` like the buffers built in `pair`; the
            // channel depth (`buff_size`) says nothing about batch length.
            buffer: RefCell::new(VecDeque::with_capacity(self.config.batch_size)),
        }
    }
}
/// Receiving half of a batched pipeline channel.
///
/// Batches sent by the paired `Sender`s arrive over `rx`; the batch
/// currently being consumed is kept in `buffer` and handed out one item at
/// a time.
#[derive(Debug)]
pub struct Receiver<In> {
    rx: mpsc::Receiver<VecDeque<In>>,
    buffer: RefCell<VecDeque<In>>,
}
impl<In> Receiver<In> {
    /// Blocks until the next item is available.
    ///
    /// Returns `None` once every sender is gone and all received items have
    /// been handed out. Empty batches are skipped rather than mistaken for
    /// the end of the stream.
    pub fn recv(&mut self) -> Option<In> {
        // `&mut self` already guarantees exclusive access, so bypass the
        // RefCell's runtime borrow tracking.
        let buffer = self.buffer.get_mut();
        loop {
            if let Some(item) = buffer.pop_front() {
                return Some(item);
            }
            *buffer = self.rx.recv().ok()?;
        }
    }
    /// Blocks until a non-empty batch is available and returns it whole.
    ///
    /// Items left over from earlier `recv` calls are returned first.
    /// Returns `None` once every sender is gone.
    fn recv_buff(&mut self) -> Option<VecDeque<In>> {
        if !self.buffer.get_mut().is_empty() {
            return Some(self.buffer.replace(VecDeque::new()));
        }
        self.rx.iter().find(|batch| !batch.is_empty())
    }
}
impl<In> IntoIterator for Receiver<In> {
type Item = In;
type IntoIter = ReceiverIntoIterator<In>;
fn into_iter(self) -> Self::IntoIter {
ReceiverIntoIterator {
iter: self.rx.into_iter(),
buffer: self.buffer.into_inner(),
}
}
}
/// Owning, blocking iterator over a `Receiver`'s items.
///
/// Yields the items already held in `buffer` first, then the contents of
/// each batch taken from `iter`, until the channel closes.
pub struct ReceiverIntoIterator<In> {
    iter: mpsc::IntoIter<VecDeque<In>>,
    buffer: VecDeque<In>,
}
impl<In> Iterator for ReceiverIntoIterator<In> {
    type Item = In;
    fn next(&mut self) -> Option<In> {
        // Loop so that an empty batch is skipped instead of ending the
        // iteration while the channel is still open.
        loop {
            if let Some(item) = self.buffer.pop_front() {
                return Some(item);
            }
            self.buffer = self.iter.next()?;
        }
    }
}
/// A `Receiver` shared between several worker threads.
///
/// Clones share one underlying receiver behind a mutex. Each handle takes a
/// whole batch under the lock and then hands items out from its own local
/// `buffer`, so workers only touch the lock when their buffer runs dry.
#[derive(Debug)]
pub struct LockedReceiver<T>
where
    T: Send + 'static,
{
    lockbox: Arc<Mutex<Receiver<T>>>,
    buffer: VecDeque<T>,
}
impl<T> LockedReceiver<T>
where
    T: Send,
{
    /// Wraps `recv` so it can be shared by cloning the returned handle.
    pub fn new(recv: Receiver<T>) -> Self {
        Self {
            lockbox: Arc::new(Mutex::new(recv)),
            buffer: VecDeque::new(),
        }
    }
}
impl<T> Clone for LockedReceiver<T>
where
    T: Send,
{
    /// Shares the same underlying receiver. The new handle starts with an
    /// empty local buffer; items already buffered by `self` stay with `self`.
    fn clone(&self) -> Self {
        Self {
            lockbox: Arc::clone(&self.lockbox),
            buffer: VecDeque::new(),
        }
    }
}
impl<T> Iterator for LockedReceiver<T>
where
    T: Send,
{
    type Item = T;
    /// Returns the next item, blocking (under the shared lock) for a new
    /// batch when the local buffer is empty.
    ///
    /// # Panics
    ///
    /// Panics if the mutex has been poisoned.
    fn next(&mut self) -> Option<T> {
        // Loop so an empty batch cannot be mistaken for end-of-stream.
        loop {
            if let Some(item) = self.buffer.pop_front() {
                return Some(item);
            }
            // The guard is a temporary, so the lock is released as soon as
            // this statement has taken a batch (or seen the channel close).
            self.buffer = self
                .lockbox
                .lock()
                .expect("failed unwrap mutex")
                .recv_buff()?;
        }
    }
}
}
/// Channel settings used for each pipeline stage created after they are set.
///
/// * `buff_size` — how many batches the bounded channel between two stages
///   can hold.
/// * `batch_size` — how many items a sender accumulates before shipping
///   them as one batch.
#[derive(Debug, Copy, Clone)]
pub struct PipelineConfig {
    buff_size: usize,
    batch_size: usize,
}
impl PipelineConfig {
    /// Returns this configuration with the channel depth set to `buff_size`.
    pub fn buff_size(mut self, buff_size: usize) -> Self {
        self.buff_size = buff_size;
        self
    }
    /// Returns this configuration with the batch length set to `batch_size`.
    pub fn batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }
}
impl Default for PipelineConfig {
    /// Ten batches of ten items each.
    fn default() -> Self {
        PipelineConfig {
            batch_size: 10,
            buff_size: 10,
        }
    }
}
/// A chain of processing stages, each running on its own thread(s).
///
/// `rx` yields the output of the most recently attached stage, and `config`
/// is the [`PipelineConfig`] used for the channel of the next stage that is
/// attached.
#[derive(Debug)]
pub struct Pipeline<Output>
where
    Output: Send + 'static,
{
    rx: Receiver<Output>,
    config: PipelineConfig,
}
impl<Output> Pipeline<Output>
where
    Output: Send,
{
    /// Starts a pipeline whose first stage is `func`, run on a new thread.
    ///
    /// `func` pushes items into the [`Sender`] it is given; the stream ends
    /// when `func` returns and the sender is dropped. The first stage always
    /// uses `PipelineConfig::default()`.
    pub fn new<F>(func: F) -> Self
    where
        F: FnOnce(Sender<Output>) -> () + Send + 'static,
    {
        let config = PipelineConfig::default();
        let (tx, rx) = Sender::pair(config);
        thread::spawn(move || func(tx));
        Pipeline { rx, config }
    }
    /// Starts a pipeline that yields every item of `source`, in order.
    pub fn from<I>(source: I) -> Pipeline<Output>
    where
        I: IntoIterator<Item = Output> + Send + 'static,
    {
        Self::new(move |tx| {
            for item in source {
                tx.send(item);
            }
        })
    }
    /// Replaces the configuration used by stages attached after this call.
    ///
    /// Stages that already exist keep the settings they were built with.
    pub fn configure(self, config: PipelineConfig) -> Self {
        Pipeline {
            rx: self.rx,
            config,
        }
    }
    /// Attaches a [`PipelineEntry`] as the next stage.
    pub fn then<EntryOut, Entry>(self, next: Entry) -> Pipeline<EntryOut>
    where
        Entry: PipelineEntry<Output, EntryOut> + Send + 'static,
        EntryOut: Send,
    {
        self.pipe(move |tx, rx| next.process(tx, rx))
    }
    /// Runs `func` on a new thread as the next stage, giving it this
    /// stage's receiver and a sender for its own output.
    pub fn pipe<EntryOut, Func>(self, func: Func) -> Pipeline<EntryOut>
    where
        Func: FnOnce(Sender<EntryOut>, Receiver<Output>) -> () + Send + 'static,
        EntryOut: Send,
    {
        // `PipelineConfig` is `Copy`, so no clones are needed.
        let config = self.config;
        let (tx, rx) = Sender::pair(config);
        let upstream = self.rx;
        thread::spawn(move || func(tx, upstream));
        Pipeline { rx, config }
    }
    /// Runs `workers` copies of `func` in parallel as the next stage.
    ///
    /// A feeder thread forwards this stage's output into a channel that
    /// all workers share through a [`LockedReceiver`], each taking whole
    /// batches. Output order across workers is unspecified.
    ///
    /// # Panics
    ///
    /// Panics if `workers` is zero.
    pub fn ppipe<EntryOut, Func>(
        self,
        workers: usize,
        func: Func,
    ) -> Pipeline<EntryOut>
    where
        Func: Fn(Sender<EntryOut>, LockedReceiver<Output>) -> ()
            + Send
            + Sync
            + 'static,
        Output: Send,
        EntryOut: Send,
    {
        // With no workers nothing drains the shared channel: the output
        // would silently be empty and the feeder thread would later panic.
        assert!(workers > 0, "ppipe requires at least one worker");
        let config = self.config;
        let (master_tx, master_rx) = Sender::pair(config);
        let (chan_tx, chan_rx) = Sender::pair(config);
        let chan_rx = LockedReceiver::new(chan_rx);
        let func = Arc::new(func);
        for _ in 0..workers {
            let entry_rx = chan_rx.clone();
            let entry_tx = master_tx.clone();
            let func = func.clone();
            thread::spawn(move || {
                func(entry_tx, entry_rx);
            });
        }
        let rx = self.rx;
        thread::spawn(move || {
            for item in rx {
                chan_tx.send(item);
            }
        });
        // `master_tx` and `chan_rx` drop here, so the output closes once
        // every worker has finished with its clones.
        Pipeline {
            rx: master_rx,
            config,
        }
    }
    /// Applies `func` to every item on a single thread, preserving order.
    pub fn map<EntryOut, Func>(self, func: Func) -> Pipeline<EntryOut>
    where
        Func: Fn(Output) -> EntryOut + Send + 'static,
        EntryOut: Send,
    {
        self.pipe(move |tx, rx| {
            for entry in rx {
                tx.send(func(entry));
            }
        })
    }
    /// Applies `func` to every item using `workers` threads.
    ///
    /// With one worker this is exactly [`Pipeline::map`]; otherwise output
    /// order is unspecified.
    ///
    /// # Panics
    ///
    /// Panics if `workers` is zero.
    pub fn pmap<EntryOut, Func>(
        self,
        workers: usize,
        func: Func,
    ) -> Pipeline<EntryOut>
    where
        Func: Fn(Output) -> EntryOut + Send + Sync + 'static,
        EntryOut: Send,
    {
        if workers == 1 {
            return self.map(func);
        }
        self.ppipe(workers, move |tx, rx| {
            for item in rx {
                tx.send(func(item))
            }
        })
    }
    /// Keeps only the items for which `pred` returns `true`, preserving order.
    pub fn filter<Func>(self, pred: Func) -> Pipeline<Output>
    where
        Func: Fn(&Output) -> bool + Send + 'static,
    {
        self.pipe(move |tx, rx| {
            for entry in rx {
                if pred(&entry) {
                    tx.send(entry);
                }
            }
        })
    }
    /// Consumes the pipeline, blocking until every item has been produced
    /// and discarded.
    pub fn drain(self) {
        for _ in self {}
    }
}
impl<OutKey, OutValue> Pipeline<(OutKey, OutValue)>
where
    OutKey: Hash + Eq + Send,
    OutValue: Send,
{
    /// Groups every value by key on a single thread, then emits
    /// `func(key, values)` once per distinct key.
    ///
    /// Nothing is emitted until the upstream stage has finished. Output
    /// order follows `HashMap` iteration order and is unspecified; within a
    /// key, values keep their arrival order.
    pub fn reduce<EntryOut, Func>(self, func: Func) -> Pipeline<EntryOut>
    where
        Func: Fn(OutKey, Vec<OutValue>) -> EntryOut + Send + 'static,
        EntryOut: Send,
    {
        self.pipe(move |tx, rx| {
            let mut by_key: HashMap<OutKey, Vec<OutValue>> = HashMap::new();
            for (key, value) in rx {
                by_key.entry(key).or_insert_with(Vec::new).push(value);
            }
            for (key, values) in by_key {
                tx.send(func(key, values));
            }
        })
    }
    /// Spreads `(key, value)` pairs over `workers` threads, each running
    /// `func` with its own [`Receiver`] and a shared output [`Sender`].
    ///
    /// The worker is chosen by hashing the key, so all pairs with equal
    /// keys are routed to the same worker.
    ///
    /// # Panics
    ///
    /// Panics if `workers` is zero.
    pub fn distribute<EntryOut, Func>(
        self,
        workers: usize,
        func: Func,
    ) -> Pipeline<EntryOut>
    where
        Func: Fn(Sender<EntryOut>, Receiver<(OutKey, OutValue)>)
            + Send
            + Sync
            + 'static,
        EntryOut: Send,
    {
        // Checked in the caller's thread: with zero workers the `%` below
        // would divide by zero inside a detached thread and the pipeline
        // would silently produce nothing.
        assert!(workers > 0, "distribute requires at least one worker");
        let func = Arc::new(func);
        let pl_config = self.config;
        self.pipe(move |tx, rx| {
            let mut txs = Vec::with_capacity(workers);
            for _ in 0..workers {
                let func = func.clone();
                let (entry_tx, entry_rx) = Sender::pair(pl_config);
                let tx = tx.clone();
                thread::spawn(move || func(tx, entry_rx));
                txs.push(entry_tx);
            }
            for (key, value) in rx {
                let mut hasher = DefaultHasher::new();
                key.hash(&mut hasher);
                // Reduce in u64 first so no hash bits are dropped on 32-bit
                // targets; the result is < workers, so narrowing is lossless.
                let which = (hasher.finish() % workers as u64) as usize;
                txs[which].send((key, value));
            }
        })
    }
    /// Like [`reduce`](#method.reduce), but keys are partitioned across
    /// `workers` threads via `distribute`, each grouping its own keys.
    ///
    /// Output order is unspecified.
    ///
    /// # Panics
    ///
    /// Panics if `workers` is zero.
    pub fn preduce<EntryOut, Func>(
        self,
        workers: usize,
        func: Func,
    ) -> Pipeline<EntryOut>
    where
        Func: Fn(OutKey, Vec<OutValue>) -> EntryOut + Send + Sync + 'static,
        OutKey: Send,
        OutValue: Send,
        EntryOut: Send,
    {
        if workers == 1 {
            return self.reduce(func);
        }
        self.distribute(workers, move |tx, rx| {
            let mut hm = HashMap::new();
            for (k, v) in rx {
                hm.entry(k).or_insert_with(Vec::new).push(v);
            }
            for (k, vs) in hm {
                tx.send(func(k, vs));
            }
        })
    }
}
impl<Output> IntoIterator for Pipeline<Output>
where
Output: Send,
{
type Item = Output;
type IntoIter = ReceiverIntoIterator<Output>;
fn into_iter(self) -> ReceiverIntoIterator<Output> {
self.rx.into_iter()
}
}
/// A self-contained pipeline stage that can be attached with
/// `Pipeline::then`.
///
/// `process` takes `self` by value, so a given stage value runs once.
pub trait PipelineEntry<In, Out> {
    /// Reads `In` items from `rx` and sends resulting `Out` items into `tx`.
    fn process<I: IntoIterator<Item = In>>(self, tx: Sender<Out>, rx: I);
}
mod map {
    use std::marker::PhantomData;
    use super::{PipelineEntry, Sender};

    /// A reusable pipeline stage that applies `func` to every input item
    /// and forwards each result in input order.
    #[derive(Debug)]
    pub struct Mapper<In, Out, Func>
    where
        Func: Fn(In) -> Out,
    {
        func: Func,
        in_: PhantomData<In>,
        out_: PhantomData<Out>,
    }
    impl<In, Out, Func> Mapper<In, Out, Func>
    where
        Func: Fn(In) -> Out,
    {
        /// Wraps `func` as a stage.
        pub fn new(func: Func) -> Self {
            Self {
                func,
                in_: PhantomData,
                out_: PhantomData,
            }
        }
    }
    impl<In, Out, Func> PipelineEntry<In, Out> for Mapper<In, Out, Func>
    where
        Func: Fn(In) -> Out,
    {
        fn process<I: IntoIterator<Item = In>>(self, tx: Sender<Out>, rx: I) {
            rx.into_iter()
                .map(self.func)
                .for_each(|mapped| tx.send(mapped));
        }
    }
    // Written by hand instead of derived: a derive would also demand
    // `In: Clone` / `Out: Clone`, while only the wrapped function needs to
    // be copyable.
    impl<In, Out, Func> Clone for Mapper<In, Out, Func>
    where
        Func: Fn(In) -> Out + Copy,
    {
        fn clone(&self) -> Self {
            *self
        }
    }
    impl<In, Out, Func> Copy for Mapper<In, Out, Func>
    where
        Func: Fn(In) -> Out + Copy,
    {
    }
}
mod filter {
    use std::marker::PhantomData;
    use super::{PipelineEntry, Sender};

    /// A reusable pipeline stage that forwards only the items for which
    /// `func` returns `true`, preserving their order.
    #[derive(Debug)]
    pub struct Filter<In, Func>
    where
        Func: Fn(&In) -> bool,
    {
        func: Func,
        in_: PhantomData<In>,
    }
    impl<In, Func> Filter<In, Func>
    where
        Func: Fn(&In) -> bool,
    {
        /// Wraps the predicate `func` as a stage.
        pub fn new(func: Func) -> Self {
            Self {
                func,
                in_: PhantomData,
            }
        }
    }
    impl<In, Func> PipelineEntry<In, In> for Filter<In, Func>
    where
        Func: Fn(&In) -> bool,
    {
        fn process<I: IntoIterator<Item = In>>(self, tx: Sender<In>, rx: I) {
            let keep = self.func;
            for item in rx.into_iter().filter(|candidate| keep(candidate)) {
                tx.send(item);
            }
        }
    }
}
mod multiplex {
#![cfg_attr(feature = "cargo-clippy", allow(expl_impl_clone_on_copy))]
use std::marker::PhantomData;
use std::thread;
#[cfg(feature = "chan")]
use chan;
use super::{LockedReceiver, PipelineConfig, PipelineEntry, Sender};
#[derive(Debug)]
pub struct Multiplex<In, Out, Entry>
where
Entry: PipelineEntry<In, Out> + Send,
{
entries: Vec<Entry>,
in_: PhantomData<In>,
out_: PhantomData<Out>,
}
impl<In, Out, Entry> Multiplex<In, Out, Entry>
where
Entry: PipelineEntry<In, Out> + Send + Copy,
{
pub fn from(entry: Entry, workers: usize) -> Self {
Self::new((0..workers).map(|_| entry).collect())
}
}
impl<In, Out, Entry> Multiplex<In, Out, Entry>
where
Entry: PipelineEntry<In, Out> + Send,
{
pub fn new(entries: Vec<Entry>) -> Self {
Multiplex {
entries,
in_: PhantomData,
out_: PhantomData,
}
}
}
impl<In, Out, Entry> PipelineEntry<In, Out> for Multiplex<In, Out, Entry>
where
Entry: PipelineEntry<In, Out> + Send + 'static,
In: Send + 'static,
Out: Send + 'static,
{
fn process<I: IntoIterator<Item = In>>(
mut self,
tx: Sender<Out>,
rx: I,
) {
if self.entries.len() == 1 {
let entry = self.entries.pop().expect("len 1 but no entries?");
return entry.process(tx, rx);
}
if cfg!(feature = "chan") {
let (chan_tx, chan_rx) =
chan::sync(PipelineConfig::default().buff_size);
for entry in self.entries {
let entry_rx = chan_rx.clone();
let entry_tx = tx.clone();
thread::spawn(move || {
entry.process(entry_tx, entry_rx);
});
}
for item in rx {
chan_tx.send(item);
}
} else {
let (master_tx, chan_rx) =
Sender::pair(PipelineConfig::default());
let chan_rx = LockedReceiver::new(chan_rx);
for entry in self.entries {
let entry_rx = chan_rx.clone();
let entry_tx = tx.clone();
thread::spawn(move || {
entry.process(entry_tx, entry_rx);
});
}
for item in rx {
master_tx.send(item);
}
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn simple() {
        let source: Vec<i32> = vec![1, 2, 3];
        let pbb: Pipeline<i32> = Pipeline::from(source);
        let produced: Vec<i32> = pbb.into_iter().collect();
        assert_eq!(produced, vec![1, 2, 3]);
    }

    #[test]
    fn empty_source() {
        let produced: Vec<i32> = Pipeline::from(Vec::<i32>::new())
            .map(|i| i * 2)
            .into_iter()
            .collect();
        assert!(produced.is_empty());
    }

    #[test]
    fn map() {
        let source: Vec<i32> = (1..100).collect();
        let expect: Vec<i32> = source.iter().map(|x| x * 2).collect();
        let pbb: Pipeline<i32> = Pipeline::from(source).map(|i| i * 2);
        let produced: Vec<i32> = pbb.into_iter().collect();
        assert_eq!(produced, expect);
    }

    #[test]
    fn multiple_map() {
        let source: Vec<i32> = vec![1, 2, 3];
        let expect: Vec<i32> =
            source.iter().map(|x| (x * 2) * (x * 2)).collect();
        let pbb: Pipeline<i32> =
            Pipeline::from(source).map(|i| i * 2).map(|i| i * i);
        let produced: Vec<i32> = pbb.into_iter().collect();
        assert_eq!(produced, expect);
    }

    /// Adds a fixed chunk of CPU work (`fib(10)`) to `n`, so parallel
    /// stages have something to overlap.
    fn fib_work(n: u64) -> u64 {
        const WORK_FACTOR: u64 = 10;
        fib(WORK_FACTOR) + n
    }

    fn fib(n: u64) -> u64 {
        if n == 0 || n == 1 {
            1
        } else {
            fib(n - 1) + fib(n - 2)
        }
    }

    #[test]
    fn multiplex_map_function() {
        let workers: usize = 10;
        let source: Vec<u64> = (1..1000).collect();
        let expect: Vec<u64> =
            source.clone().into_iter().map(fib_work).collect();
        let pbb: Pipeline<u64> = Pipeline::from(source).then(
            multiplex::Multiplex::from(map::Mapper::new(fib_work), workers),
        );
        let mut produced: Vec<u64> = pbb.into_iter().collect();
        produced.sort();
        assert_eq!(produced, expect);
    }

    #[test]
    fn multiplex_map_closure() {
        let workers: usize = 10;
        let source: Vec<i32> = (1..1000).collect();
        let expect: Vec<i32> = source.iter().map(|x| x * 2).collect();
        let pbb: Pipeline<i32> =
            Pipeline::from(source).then(multiplex::Multiplex::new(
                (0..workers).map(|_| map::Mapper::new(|i| i * 2)).collect(),
            ));
        let mut produced: Vec<i32> = pbb.into_iter().collect();
        produced.sort();
        assert_eq!(produced, expect);
    }

    #[test]
    fn filter() {
        let source: Vec<i32> = (1..100).collect();
        let expect: Vec<i32> = source
            .iter()
            .map(|x| x + 1)
            .filter(|x| x % 2 == 0)
            .collect();
        let pbb: Pipeline<i32> =
            Pipeline::from(source).map(|i| i + 1).filter(|i| i % 2 == 0);
        let produced: Vec<i32> = pbb.into_iter().collect();
        assert_eq!(produced, expect);
    }

    #[test]
    fn simple_closure() {
        let source: Vec<i32> = (1..100).collect();
        let expect: Vec<i32> = source
            .iter()
            .map(|x| x + 1)
            .filter(|x| x % 2 == 0)
            .collect();
        let pbb: Pipeline<i32> = Pipeline::from(source).pipe(|tx, rx| {
            for item in rx {
                let item = item + 1;
                if item % 2 == 0 {
                    tx.send(item);
                }
            }
        });
        let produced: Vec<i32> = pbb.into_iter().collect();
        assert_eq!(produced, expect);
    }

    #[test]
    fn pmap() {
        let source: Vec<i32> = (1..100).collect();
        let expect: Vec<i32> = source.iter().map(|x| x * 2).collect();
        let workers: usize = 2;
        let pbb: Pipeline<i32> =
            Pipeline::from(source).pmap(workers, |i| i * 2);
        let mut produced: Vec<i32> = pbb.into_iter().collect();
        produced.sort();
        assert_eq!(produced, expect);
    }

    #[test]
    fn preduce() {
        let source: Vec<i32> = (1..1000).collect();
        let workers: usize = 2;
        let expect = vec![(false, 1996), (true, 1998)];
        let mut produced: Vec<(bool, i32)> = Pipeline::from(source)
            .map(|x| (x % 3 == 0, x * 2))
            .preduce(workers, |threevenness, nums| {
                (threevenness, *nums.iter().max().unwrap())
            })
            .into_iter()
            .collect();
        produced.sort();
        assert_eq!(produced, expect);
    }

    #[test]
    fn mapreduce() {
        let source: Vec<i32> = (1..1000).collect();
        let workers: usize = 1;
        let expect = vec![(false, 1996), (true, 1998)];
        let mut produced: Vec<(bool, i32)> = Pipeline::from(source)
            .pmap(workers, |x| x * 2)
            .pmap(workers, |x| (x % 3 == 0, x))
            .preduce(workers, |threevenness, nums| {
                (threevenness, *nums.iter().max().unwrap())
            })
            .into_iter()
            .collect();
        produced.sort();
        assert_eq!(produced, expect);
    }

    #[test]
    fn config() {
        let source: Vec<i32> = (1..100).collect();
        let expect: Vec<i32> = source
            .iter()
            .map(|x| x * 2)
            .filter(|x| x % 3 == 0)
            .collect();
        let produced: Vec<i32> = Pipeline::from(source)
            .configure(PipelineConfig::default().buff_size(10))
            .map(|x| x * 2)
            .configure(PipelineConfig::default().buff_size(10))
            .filter(|x| x % 3 == 0)
            .into_iter()
            .collect();
        assert_eq!(produced, expect);
    }

    #[test]
    fn config_batch_size() {
        // Batch sizes below, near and above the input length; 64 leaves a
        // partial final batch that must still be delivered on drop.
        for &batch in &[1, 3, 64] {
            let source: Vec<i32> = (1..100).collect();
            let expect: Vec<i32> = source.iter().map(|x| x + 1).collect();
            let produced: Vec<i32> = Pipeline::from(source)
                .configure(PipelineConfig::default().batch_size(batch).buff_size(1))
                .map(|x| x + 1)
                .into_iter()
                .collect();
            assert_eq!(produced, expect);
        }
    }
}