#[deny(missing_docs)]
extern crate futures;
extern crate futures_cpupool;
extern crate num_cpus;
#[macro_use]
extern crate pub_iterator_type;
use futures::Future;
use futures_cpupool::{CpuFuture, CpuPool};
use std::collections::VecDeque;
use std::sync::Arc;
pub trait ParMap: Iterator + Sized {
fn par_map<B, F>(self, f: F) -> Map<Self, B, F>
where
F: Sync + Send + 'static + Fn(Self::Item) -> B,
B: Send + 'static,
Self::Item: Send + 'static,
{
self.with_nb_threads(num_cpus::get()).par_map(f)
}
fn par_flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
where
F: Sync + Send + 'static + Fn(Self::Item) -> U,
U: IntoIterator,
U::Item: Send + 'static,
Self::Item: Send + 'static,
{
self.with_nb_threads(num_cpus::get()).par_flat_map(f)
}
fn pack(self, nb: usize) -> Pack<Self> {
Pack { iter: self, nb: nb }
}
fn par_packed_map<'a, B, F>(self, nb: usize, f: F) -> PackedMap<'a, B>
where
F: Sync + Send + 'static + Fn(Self::Item) -> B,
B: Send + 'static,
Self::Item: Send + 'static,
Self: 'a,
{
self.with_nb_threads(num_cpus::get()).par_packed_map(nb, f)
}
fn par_packed_flat_map<'a, U, F>(self, nb: usize, f: F) -> PackedFlatMap<'a, U::Item>
where
F: Sync + Send + 'static + Fn(Self::Item) -> U,
U: IntoIterator + 'a,
U::Item: Send + 'static,
Self::Item: Send + 'static,
Self: 'a,
{
self.with_nb_threads(num_cpus::get()).par_packed_flat_map(nb, f)
}
fn with_nb_threads(self, nb: usize) -> ParMapBuilder<Self> {
ParMapBuilder {
iter: self,
nb_threads: nb,
}
}
}
impl<I: Iterator> ParMap for I {}
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
pub struct Map<I, B, F> {
pool: CpuPool,
queue: VecDeque<CpuFuture<B, ()>>,
iter: I,
f: Arc<F>,
}
impl<I: Iterator, B: Send + 'static, F> Map<I, B, F>
where
F: Sync + Send + 'static + Fn(I::Item) -> B,
I::Item: Send + 'static,
{
fn spawn(&mut self) {
let future = match self.iter.next() {
None => return,
Some(item) => {
let f = self.f.clone();
self.pool.spawn_fn(move || Ok(f(item)))
}
};
self.queue.push_back(future);
}
fn with_nb_threads(iter: I, f: F, nb_thread: usize) -> Self {
let mut res = Map {
pool: CpuPool::new(nb_thread),
queue: VecDeque::new(),
iter: iter,
f: Arc::new(f),
};
for _ in 0..nb_thread * 2 {
res.spawn();
}
res
}
}
impl<I: Iterator, B: Send + 'static, F> Iterator for Map<I, B, F>
where
F: Sync + Send + 'static + Fn(I::Item) -> B,
I::Item: Send + 'static,
{
type Item = B;
fn next(&mut self) -> Option<Self::Item> {
self.queue.pop_front().map(|future| {
let i = future.wait().unwrap();
self.spawn();
i
})
}
}
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
pub struct FlatMap<I: Iterator, U: IntoIterator, F> {
pool: CpuPool,
queue: VecDeque<CpuFuture<Vec<U::Item>, ()>>,
iter: I,
f: Arc<F>,
cur_iter: ::std::vec::IntoIter<U::Item>,
}
impl<I: Iterator, U: IntoIterator, F> FlatMap<I, U, F>
where
F: Sync + Send + 'static + Fn(I::Item) -> U,
U::Item: Send + 'static,
I::Item: Send + 'static,
{
fn spawn(&mut self) {
let future = match self.iter.next() {
None => return,
Some(item) => {
let f = self.f.clone();
self.pool
.spawn_fn(move || Ok(f(item).into_iter().collect()))
}
};
self.queue.push_back(future);
}
fn with_nb_threads(iter: I, f: F, nb_thread: usize) -> Self {
let mut res = FlatMap {
pool: CpuPool::new(nb_thread),
queue: VecDeque::new(),
iter: iter,
f: Arc::new(f),
cur_iter: vec![].into_iter(),
};
for _ in 0..nb_thread * 2 {
res.spawn();
}
res
}
}
impl<I: Iterator, U: IntoIterator, F> Iterator for FlatMap<I, U, F>
where
F: Sync + Send + 'static + Fn(I::Item) -> U,
U::Item: Send + 'static,
I::Item: Send + 'static,
{
type Item = U::Item;
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(item) = self.cur_iter.next() {
return Some(item);
}
let v = match self.queue.pop_front() {
Some(future) => future.wait().unwrap(),
None => return None,
};
self.cur_iter = v.into_iter();
self.spawn();
}
}
}
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
pub struct Pack<I> {
iter: I,
nb: usize,
}
impl<I: Iterator> Iterator for Pack<I> {
type Item = Vec<I::Item>;
fn next(&mut self) -> Option<Self::Item> {
let item: Vec<_> = self.iter.by_ref().take(self.nb).collect();
if item.is_empty() {
None
} else {
Some(item)
}
}
}
pub_iterator_type! {
#[doc="As `Map` but packed."]
PackedMap['a, B] = Box<Iterator<Item = B> + 'a>
}
pub_iterator_type! {
#[doc="As `FlatMap` but packed."]
PackedFlatMap['a, T] = Box<Iterator<Item = T> + 'a>
}
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
pub struct ParMapBuilder<I> {
iter: I,
nb_threads: usize,
}
impl<I: Iterator> ParMapBuilder<I> {
pub fn par_map<B, F>(self, f: F) -> Map<I, B, F>
where
F: Sync + Send + 'static + Fn(I::Item) -> B,
B: Send + 'static,
I::Item: Send + 'static,
{
Map::with_nb_threads(self.iter, f, self.nb_threads)
}
pub fn par_flat_map<U, F>(self, f: F) -> FlatMap<I, U, F>
where
F: Sync + Send + 'static + Fn(I::Item) -> U,
U: IntoIterator,
U::Item: Send + 'static,
I::Item: Send + 'static,
{
FlatMap::with_nb_threads(self.iter, f, self.nb_threads)
}
pub fn par_packed_map<'a, B, F>(self, nb: usize, f: F) -> PackedMap<'a, B>
where
F: Sync + Send + 'static + Fn(I::Item) -> B,
B: Send + 'static,
I::Item: Send + 'static,
Self: 'a,
{
let f = Arc::new(f);
let f = move |iter: Vec<I::Item>| {
let f = f.clone();
iter.into_iter().map(move |i| f(i))
};
PackedMap(Box::new(
self.iter
.pack(nb)
.with_nb_threads(self.nb_threads)
.par_flat_map(f),
))
}
pub fn par_packed_flat_map<'a, U, F>(self, nb: usize, f: F) -> PackedFlatMap<'a, U::Item>
where
F: Sync + Send + 'static + Fn(I::Item) -> U,
U: IntoIterator + 'a,
U::Item: Send + 'static,
I::Item: Send + 'static,
Self: 'a,
{
let f = Arc::new(f);
let f = move |iter: Vec<I::Item>| {
let f = f.clone();
iter.into_iter().flat_map(move |i| f(i))
};
PackedFlatMap(Box::new(
self.iter
.pack(nb)
.with_nb_threads(self.nb_threads)
.par_flat_map(f),
))
}
}