use crate::pipeline::node::InOut;
use std::marker::PhantomData;
use crate::thread_pool::ThreadPool;
/// Pipeline node that maps a function over the elements of every input
/// collection it receives.
///
/// The mapping itself is delegated to `ThreadPool::par_map` on the node's own
/// thread pool.
#[derive(Clone)]
pub struct Map<TIn, TOut, F>
where
    TIn: Send,
    TOut: Send,
    F: FnOnce(TIn) -> TOut + Send + Copy,
{
    /// Thread pool the mapping work is handed to.
    threadpool: ThreadPool,
    /// Value reported by `number_of_replicas`.
    replicas: usize,
    /// Function applied to each input element.
    f: F,
    /// Ties the otherwise unused `TIn` / `TOut` parameters to the struct.
    phantom: PhantomData<(TIn, TOut)>,
}
impl<TIn, TOut, F> Map<TIn, TOut, F>
where
TIn: Send + Clone,
TOut: Send + Clone + 'static,
F: FnOnce(TIn) -> TOut + Send + Copy,
{
pub fn build<TInIter, TOutIter>(n_worker: usize, f: F) -> impl InOut<TInIter, TOutIter>
where
TInIter: IntoIterator<Item = TIn>,
TOutIter: FromIterator<TOut>,
{
Self {
threadpool: ThreadPool::with_capacity(n_worker),
replicas: 1,
f,
phantom: PhantomData,
}
}
pub fn build_with_replicas<TInIter, TOutIter>(
n_worker: usize,
n_replicas: usize,
f: F,
) -> impl InOut<TInIter, TOutIter>
where
TInIter: IntoIterator<Item = TIn>,
TOutIter: FromIterator<TOut>,
{
assert!(n_replicas > 0);
Self {
threadpool: ThreadPool::with_capacity(n_worker),
replicas: n_replicas,
f,
phantom: PhantomData,
}
}
}
impl<TIn, TInIter, TOut, TOutIter, F> InOut<TInIter, TOutIter> for Map<TIn, TOut, F>
where
    TIn: Send + Clone,
    TInIter: IntoIterator<Item = TIn>,
    TOut: Send + Clone + 'static,
    TOutIter: FromIterator<TOut>,
    F: FnOnce(TIn) -> TOut + Send + Copy,
{
    /// Maps `self.f` over `input` through `ThreadPool::par_map` and collects
    /// the results into a `TOutIter`.
    ///
    /// Always returns `Some`.
    fn run(&mut self, input: TInIter) -> Option<TOutIter> {
        let mapped = self.threadpool.par_map(input, self.f);
        Some(mapped.collect::<TOutIter>())
    }

    /// Returns the replica count chosen when the node was built.
    fn number_of_replicas(&self) -> usize {
        self.replicas
    }
}
/// Variant of the map node whose `InOut` implementation reports itself as
/// ordered (`is_ordered` returns `true`).
///
/// The mapping itself is delegated to `ThreadPool::par_map` on the node's own
/// thread pool.
#[derive(Clone)]
pub struct OrderedMap<TIn, TOut, F>
where
    TIn: Send,
    TOut: Send,
    F: FnOnce(TIn) -> TOut + Send + Copy,
{
    /// Thread pool the mapping work is handed to.
    threadpool: ThreadPool,
    /// Value reported by `number_of_replicas`.
    replicas: usize,
    /// Function applied to each input element.
    f: F,
    /// Ties the otherwise unused `TIn` / `TOut` parameters to the struct.
    phantom: PhantomData<(TIn, TOut)>,
}
impl<TIn, TOut, F> OrderedMap<TIn, TOut, F>
where
TIn: Send + Clone,
TOut: Send + Clone + 'static,
F: FnOnce(TIn) -> TOut + Send + Copy,
{
pub fn build<TInIter, TOutIter>(n_worker: usize, f: F) -> impl InOut<TInIter, TOutIter>
where
TInIter: IntoIterator<Item = TIn>,
TOutIter: FromIterator<TOut>,
{
Self {
threadpool: ThreadPool::with_capacity(n_worker),
replicas: 1,
f,
phantom: PhantomData,
}
}
pub fn build_with_replicas<TInIter, TOutIter>(
n_worker: usize,
n_replicas: usize,
f: F,
) -> impl InOut<TInIter, TOutIter>
where
TInIter: IntoIterator<Item = TIn>,
TOutIter: FromIterator<TOut>,
{
assert!(n_replicas > 0);
Self {
threadpool: ThreadPool::with_capacity(n_worker),
replicas: n_replicas,
f,
phantom: PhantomData,
}
}
}
impl<TIn, TInIter, TOut, TOutIter, F> InOut<TInIter, TOutIter> for OrderedMap<TIn, TOut, F>
where
    TIn: Send + Clone,
    TInIter: IntoIterator<Item = TIn>,
    TOut: Send + Clone + 'static,
    TOutIter: FromIterator<TOut>,
    F: FnOnce(TIn) -> TOut + Send + Copy,
{
    /// Maps `self.f` over `input` through `ThreadPool::par_map` and collects
    /// the results into a `TOutIter`.
    ///
    /// Always returns `Some`.
    fn run(&mut self, input: TInIter) -> Option<TOutIter> {
        let mapped = self.threadpool.par_map(input, self.f);
        Some(mapped.collect::<TOutIter>())
    }

    /// Returns the replica count chosen when the node was built.
    fn number_of_replicas(&self) -> usize {
        self.replicas
    }

    /// Marks this node as ordered.
    fn is_ordered(&self) -> bool {
        true
    }
}
/// Pipeline node that maps each element of an input collection to an
/// iterator and flattens all of those iterators into one output collection.
///
/// The mapping is delegated to `ThreadPool::par_map` on the node's own thread
/// pool; the flattening happens on the iterator it returns.
#[derive(Clone)]
pub struct FlatMap<TIn, TOut, F>
where
    TIn: Send + IntoIterator,
    TOut: Send + Iterator,
    F: FnOnce(TIn) -> TOut + Send + Copy,
{
    /// Thread pool the mapping work is handed to.
    threadpool: ThreadPool,
    /// Value reported by `number_of_replicas`.
    replicas: usize,
    /// Function turning one input element into an iterator of outputs.
    f: F,
    /// Ties the otherwise unused `TIn` / `TOut` parameters to the struct.
    phantom: PhantomData<(TIn, TOut)>,
}
impl<TIn, TOut, F> FlatMap<TIn, TOut, F>
where
TIn: Send + Clone + IntoIterator,
TOut: Send + Clone + Iterator + 'static,
F: FnOnce(TIn) -> TOut + Send + Copy,
{
pub fn build<TInIter, TOutIter>(n_worker: usize, f: F) -> impl InOut<TInIter, TOutIter>
where
TInIter: IntoIterator<Item = TIn>,
TOutIter: FromIterator<<TOut as Iterator>::Item>,
{
Self {
threadpool: ThreadPool::with_capacity(n_worker),
replicas: 1,
f,
phantom: PhantomData,
}
}
pub fn build_with_replicas<TInIter, TOutIter>(
n_worker: usize,
n_replicas: usize,
f: F,
) -> impl InOut<TInIter, TOutIter>
where
TInIter: IntoIterator<Item = TIn>,
TOutIter: FromIterator<<TOut as Iterator>::Item>,
{
assert!(n_replicas > 0);
Self {
threadpool: ThreadPool::with_capacity(n_worker),
replicas: n_replicas,
f,
phantom: PhantomData,
}
}
}
impl<TIn, TInIter, TOut, TOutIter, F> InOut<TInIter, TOutIter> for FlatMap<TIn, TOut, F>
where
    TIn: Send + Clone + IntoIterator,
    TInIter: IntoIterator<Item = TIn>,
    TOut: Send + Clone + Iterator + 'static,
    TOutIter: FromIterator<<TOut as Iterator>::Item>,
    F: FnOnce(TIn) -> TOut + Send + Copy,
{
    /// Maps `self.f` over `input` through `ThreadPool::par_map`, flattens the
    /// produced iterators and collects their items into a `TOutIter`.
    ///
    /// Always returns `Some`.
    fn run(&mut self, input: TInIter) -> Option<TOutIter> {
        let nested = self.threadpool.par_map(input, self.f);
        Some(nested.flatten().collect::<TOutIter>())
    }

    /// Returns the replica count chosen when the node was built.
    fn number_of_replicas(&self) -> usize {
        self.replicas
    }
}
/// Variant of the flat-map node whose `InOut` implementation reports itself
/// as ordered (`is_ordered` returns `true`).
///
/// The mapping is delegated to `ThreadPool::par_map` on the node's own thread
/// pool; the flattening happens on the iterator it returns.
#[derive(Clone)]
pub struct OrderedFlatMap<TIn, TOut, F>
where
    TIn: Send + IntoIterator,
    TOut: Send + Iterator,
    F: FnOnce(TIn) -> TOut + Send + Copy,
{
    /// Thread pool the mapping work is handed to.
    threadpool: ThreadPool,
    /// Value reported by `number_of_replicas`.
    replicas: usize,
    /// Function turning one input element into an iterator of outputs.
    f: F,
    /// Ties the otherwise unused `TIn` / `TOut` parameters to the struct.
    phantom: PhantomData<(TIn, TOut)>,
}
impl<TIn, TOut, F> OrderedFlatMap<TIn, TOut, F>
where
TIn: Send + Clone + IntoIterator,
TOut: Send + Clone + Iterator + 'static,
F: FnOnce(TIn) -> TOut + Send + Copy,
{
pub fn build<TInIter, TOutIter>(n_worker: usize, f: F) -> impl InOut<TInIter, TOutIter>
where
TInIter: IntoIterator<Item = TIn>,
TOutIter: FromIterator<<TOut as Iterator>::Item>,
{
Self {
threadpool: ThreadPool::with_capacity(n_worker),
replicas: 1,
f,
phantom: PhantomData,
}
}
pub fn build_with_replicas<TInIter, TOutIter>(
n_worker: usize,
n_replicas: usize,
f: F,
) -> impl InOut<TInIter, TOutIter>
where
TInIter: IntoIterator<Item = TIn>,
TOutIter: FromIterator<<TOut as Iterator>::Item>,
{
assert!(n_replicas > 0);
Self {
threadpool: ThreadPool::with_capacity(n_worker),
replicas: n_replicas,
f,
phantom: PhantomData,
}
}
}
impl<TIn, TInIter, TOut, TOutIter, F> InOut<TInIter, TOutIter> for OrderedFlatMap<TIn, TOut, F>
where
    TIn: Send + Clone + IntoIterator,
    TInIter: IntoIterator<Item = TIn>,
    TOut: Send + Clone + Iterator + 'static,
    TOutIter: FromIterator<<TOut as Iterator>::Item>,
    F: FnOnce(TIn) -> TOut + Send + Copy,
{
    /// Maps `self.f` over `input` through `ThreadPool::par_map`, flattens the
    /// produced iterators and collects their items into a `TOutIter`.
    ///
    /// Always returns `Some`.
    fn run(&mut self, input: TInIter) -> Option<TOutIter> {
        let nested = self.threadpool.par_map(input, self.f);
        Some(nested.flatten().collect::<TOutIter>())
    }

    /// Returns the replica count chosen when the node was built.
    fn number_of_replicas(&self) -> usize {
        self.replicas
    }

    /// Marks this node as ordered.
    fn is_ordered(&self) -> bool {
        true
    }
}
/// Pipeline node that reduces every input collection to a single value with
/// a binary function.
///
/// The reduction is delegated to `ThreadPool::par_reduce` on the node's own
/// thread pool.
#[derive(Clone)]
pub struct Reduce<TIn, F>
where
    TIn: Send,
    F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync,
{
    /// Thread pool the reduction is handed to.
    threadpool: ThreadPool,
    /// Value reported by `number_of_replicas`.
    replicas: usize,
    /// Binary function combining two values into one.
    f: F,
    /// Ties the otherwise unused `TIn` parameter to the struct.
    phantom: PhantomData<TIn>,
}
impl<TIn, F> Reduce<TIn, F>
where
TIn: Send + Clone + 'static,
F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync,
{
pub fn build<TInIter>(n_worker: usize, f: F) -> impl InOut<TInIter, TIn>
where
TInIter: IntoIterator<Item = TIn>,
{
Self {
threadpool: ThreadPool::with_capacity(n_worker),
replicas: 1,
f,
phantom: PhantomData,
}
}
pub fn build_with_replicas<TInIter>(
n_worker: usize,
n_replicas: usize,
f: F,
) -> impl InOut<TInIter, TIn>
where
TInIter: IntoIterator<Item = TIn>,
{
assert!(n_replicas > 0);
Self {
threadpool: ThreadPool::with_capacity(n_worker),
replicas: n_replicas,
f,
phantom: PhantomData,
}
}
}
impl<TIn, TInIter, F> InOut<TInIter, TIn> for Reduce<TIn, F>
where
    TIn: Send + Clone + 'static,
    TInIter: IntoIterator<Item = TIn>,
    F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync,
{
    /// Reduces `input` to a single value by passing it and `self.f` to
    /// `ThreadPool::par_reduce`.
    ///
    /// Always returns `Some`.
    fn run(&mut self, input: TInIter) -> Option<TIn> {
        let reduced: TIn = self.threadpool.par_reduce(input, self.f);
        Some(reduced)
    }

    /// Returns the replica count chosen when the node was built.
    fn number_of_replicas(&self) -> usize {
        self.replicas
    }
}
/// Variant of the reduce node whose `InOut` implementation reports itself as
/// ordered (`is_ordered` returns `true`).
///
/// The reduction is delegated to `ThreadPool::par_reduce` on the node's own
/// thread pool.
#[derive(Clone)]
pub struct OrderedReduce<TIn, F>
where
    TIn: Send,
    F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync,
{
    /// Thread pool the reduction is handed to.
    threadpool: ThreadPool,
    /// Value reported by `number_of_replicas`.
    replicas: usize,
    /// Binary function combining two values into one.
    f: F,
    /// Ties the otherwise unused `TIn` parameter to the struct.
    phantom: PhantomData<TIn>,
}
impl<TIn, F> OrderedReduce<TIn, F>
where
TIn: Send + Clone + 'static,
F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync,
{
pub fn build<TInIter>(n_worker: usize, f: F) -> impl InOut<TInIter, TIn>
where
TInIter: IntoIterator<Item = TIn>,
{
Self {
threadpool: ThreadPool::with_capacity(n_worker),
replicas: 1,
f,
phantom: PhantomData,
}
}
pub fn build_with_replicas<TInIter>(
n_worker: usize,
n_replicas: usize,
f: F,
) -> impl InOut<TInIter, TIn>
where
TInIter: IntoIterator<Item = TIn>,
{
assert!(n_replicas > 0);
Self {
threadpool: ThreadPool::with_capacity(n_worker),
replicas: n_replicas,
f,
phantom: PhantomData,
}
}
}
impl<TIn, TInIter, F> InOut<TInIter, TIn> for OrderedReduce<TIn, F>
where
    TIn: Send + Clone + 'static,
    TInIter: IntoIterator<Item = TIn>,
    F: FnOnce(TIn, TIn) -> TIn + Send + Copy + Sync,
{
    /// Reduces `input` to a single value by passing it and `self.f` to
    /// `ThreadPool::par_reduce`.
    ///
    /// Always returns `Some`.
    fn run(&mut self, input: TInIter) -> Option<TIn> {
        let reduced: TIn = self.threadpool.par_reduce(input, self.f);
        Some(reduced)
    }

    /// Returns the replica count chosen when the node was built.
    fn number_of_replicas(&self) -> usize {
        self.replicas
    }

    /// Marks this node as ordered.
    fn is_ordered(&self) -> bool {
        true
    }
}
/// Pipeline node combining a keyed map step with a reduce step.
///
/// `f_map` turns each input element into a `(key, value)` pair and
/// `f_reduce` combines two values into one. Both are handed to
/// `ThreadPool::par_map_reduce`, whose `(TKey, TMapOut)` items form the
/// output collection.
#[derive(Clone)]
pub struct MapReduce<TIn, TMapOut, TKey, FMap, FReduce>
where
    TIn: Send,
    TMapOut: Send,
    TKey: Send,
    FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy,
    FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut,
{
    /// Thread pool the map-reduce work is handed to.
    threadpool: ThreadPool,
    /// Value reported by `number_of_replicas`.
    replicas: usize,
    /// Function producing a `(key, value)` pair from an input element.
    f_map: FMap,
    /// Binary function combining two mapped values into one.
    f_reduce: FReduce,
    /// Ties the otherwise unused type parameters to the struct.
    phantom: PhantomData<(TIn, TMapOut, TKey)>,
}
impl<TIn, TMapOut, TKey, FMap, FReduce> MapReduce<TIn, TMapOut, TKey, FMap, FReduce>
where
TIn: Send + Clone + 'static,
TMapOut: Send + Clone + 'static,
TKey: Send + Clone + 'static + Ord,
FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy,
FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy + Sync,
{
pub fn build<TInIter, TOutIter>(
n_worker: usize,
f_map: FMap,
f_reduce: FReduce,
) -> impl InOut<TInIter, TOutIter>
where
TInIter: IntoIterator<Item = TIn>,
TOutIter: FromIterator<(TKey, TMapOut)>,
{
Self {
threadpool: ThreadPool::with_capacity(n_worker),
replicas: 1,
f_map,
f_reduce,
phantom: PhantomData,
}
}
pub fn build_with_replicas<TInIter, TOutIter>(
n_worker: usize,
f_map: FMap,
f_reduce: FReduce,
n_replicas: usize,
) -> impl InOut<TInIter, TOutIter>
where
TInIter: IntoIterator<Item = TIn>,
TOutIter: FromIterator<(TKey, TMapOut)>,
{
assert!(n_replicas > 0);
Self {
threadpool: ThreadPool::with_capacity(n_worker),
replicas: n_replicas,
f_map,
f_reduce,
phantom: PhantomData,
}
}
}
impl<TIn, TMapOut, TInIter, TKey, TOutIter, FMap, FReduce> InOut<TInIter, TOutIter>
    for MapReduce<TIn, TMapOut, TKey, FMap, FReduce>
where
    TIn: Send + Clone + 'static,
    TMapOut: Send + Clone + 'static,
    TInIter: IntoIterator<Item = TIn>,
    TKey: Send + Clone + 'static + Ord,
    TOutIter: FromIterator<(TKey, TMapOut)>,
    FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy,
    FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy + Sync,
{
    /// Passes `input`, `self.f_map` and `self.f_reduce` to
    /// `ThreadPool::par_map_reduce` and collects the resulting
    /// `(key, value)` pairs into a `TOutIter`.
    ///
    /// Always returns `Some`.
    fn run(&mut self, input: TInIter) -> Option<TOutIter> {
        let pairs = self
            .threadpool
            .par_map_reduce(input, self.f_map, self.f_reduce);
        Some(pairs.collect::<TOutIter>())
    }

    /// Returns the replica count chosen when the node was built.
    fn number_of_replicas(&self) -> usize {
        self.replicas
    }
}
/// Variant of the map-reduce node whose `InOut` implementation reports
/// itself as ordered (`is_ordered` returns `true`).
///
/// `f_map` turns each input element into a `(key, value)` pair and
/// `f_reduce` combines two values into one. Both are handed to
/// `ThreadPool::par_map_reduce`, whose `(TKey, TMapOut)` items form the
/// output collection.
#[derive(Clone)]
pub struct OrderedMapReduce<TIn, TMapOut, TKey, FMap, FReduce>
where
    TIn: Send,
    TMapOut: Send,
    TKey: Send,
    FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy,
    FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy,
{
    /// Thread pool the map-reduce work is handed to.
    threadpool: ThreadPool,
    /// Value reported by `number_of_replicas`.
    replicas: usize,
    /// Function producing a `(key, value)` pair from an input element.
    f_map: FMap,
    /// Binary function combining two mapped values into one.
    f_reduce: FReduce,
    /// Ties the otherwise unused type parameters to the struct.
    phantom: PhantomData<(TIn, TMapOut, TKey)>,
}
impl<TIn, TMapOut, TKey, FMap, FReduce> OrderedMapReduce<TIn, TMapOut, TKey, FMap, FReduce>
where
TIn: Send + Clone + 'static,
TMapOut: Send + Clone + 'static,
TKey: Send + Clone + 'static + Ord,
FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy,
FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy + Sync,
{
pub fn build<TInIter, TOutIter>(
n_worker: usize,
f_map: FMap,
f_reduce: FReduce,
) -> impl InOut<TInIter, TOutIter>
where
TInIter: IntoIterator<Item = TIn>,
TOutIter: FromIterator<(TKey, TMapOut)>,
{
Self {
threadpool: ThreadPool::with_capacity(n_worker),
replicas: 1,
f_map,
f_reduce,
phantom: PhantomData,
}
}
pub fn build_with_replicas<TInIter, TOutIter>(
n_worker: usize,
n_replicas: usize,
f_map: FMap,
f_reduce: FReduce,
) -> impl InOut<TInIter, TOutIter>
where
TInIter: IntoIterator<Item = TIn>,
TOutIter: FromIterator<(TKey, TMapOut)>,
{
assert!(n_replicas > 0);
Self {
threadpool: ThreadPool::with_capacity(n_worker),
replicas: n_replicas,
f_map,
f_reduce,
phantom: PhantomData,
}
}
}
impl<TIn, TMapOut, TInIter, TKey, TOutIter, FMap, FReduce> InOut<TInIter, TOutIter>
    for OrderedMapReduce<TIn, TMapOut, TKey, FMap, FReduce>
where
    TIn: Send + Clone + 'static,
    TMapOut: Send + Clone + 'static,
    TInIter: IntoIterator<Item = TIn>,
    TKey: Send + Clone + 'static + Ord,
    TOutIter: FromIterator<(TKey, TMapOut)>,
    FMap: FnOnce(TIn) -> (TKey, TMapOut) + Send + Copy,
    FReduce: FnOnce(TMapOut, TMapOut) -> TMapOut + Send + Copy + Sync,
{
    /// Passes `input`, `self.f_map` and `self.f_reduce` to
    /// `ThreadPool::par_map_reduce` and collects the resulting
    /// `(key, value)` pairs into a `TOutIter`.
    ///
    /// Always returns `Some`.
    fn run(&mut self, input: TInIter) -> Option<TOutIter> {
        let pairs = self
            .threadpool
            .par_map_reduce(input, self.f_map, self.f_reduce);
        Some(pairs.collect::<TOutIter>())
    }

    /// Returns the replica count chosen when the node was built.
    fn number_of_replicas(&self) -> usize {
        self.replicas
    }

    /// Marks this node as ordered.
    fn is_ordered(&self) -> bool {
        true
    }
}
#[cfg(test)]
mod test {
    use serial_test::serial;

    use crate::{
        prelude::*,
        templates::{
            map::{
                FlatMap, Map, MapReduce, OrderedFlatMap, OrderedMap, OrderedMapReduce,
                OrderedReduce, Reduce,
            },
            misc::{OrderedSinkVec, SinkVec, SourceIter},
        },
    };

    /// Returns `x` squared.
    fn square(x: f64) -> f64 {
        x * x
    }

    /// Drops the global orchestrator at the end of a test.
    ///
    /// Every test calls this once its pipeline has finished, so that all
    /// tests in this module clean up in the same way.
    fn reset_orchestrator() {
        // SAFETY: NOTE(review) — callers invoke this only after
        // `start_and_wait_end` has returned; confirm that this is what the
        // safety contract of `delete_global_orchestrator` requires.
        unsafe {
            Orchestrator::delete_global_orchestrator();
        }
    }

    /// Squares 1000 copies of `[1.0, ..., 10.0]` with `Map` and checks that
    /// the square roots of every result vector are 1.0 through 10.0.
    #[test]
    #[serial]
    fn simple_map() {
        let mut counter = 1.0;
        let numbers: Vec<f64> = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0];
        let mut vector = Vec::new();

        for _i in 0..1000 {
            vector.push(numbers.clone());
        }

        let pipe = pipeline![
            SourceIter::build(vector.into_iter()),
            Map::build(4, |el: f64| square(el)),
            SinkVec::build()
        ];

        let res: Vec<Vec<f64>> = pipe.start_and_wait_end().unwrap();

        for vec in res {
            for el in vec {
                assert_eq!(el.sqrt(), counter);
                counter += 1.0;
            }
            // Every result vector starts again from 1.0.
            counter = 1.0;
        }

        reset_orchestrator();
    }

    /// Same as `simple_map`, but the `Map` node is built with two replicas.
    #[test]
    #[serial]
    fn simple_map_replicated() {
        let mut counter = 1.0;
        let numbers: Vec<f64> = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0];
        let mut vector = Vec::new();

        for _i in 0..1000 {
            vector.push(numbers.clone());
        }

        let pipe = pipeline![
            SourceIter::build(vector.into_iter()),
            Map::build_with_replicas(4, 2, |el: f64| square(el)),
            SinkVec::build()
        ];

        let res: Vec<Vec<f64>> = pipe.start_and_wait_end().unwrap();

        for vec in res {
            for el in vec {
                assert_eq!(el.sqrt(), counter);
                counter += 1.0;
            }
            // Every result vector starts again from 1.0.
            counter = 1.0;
        }

        reset_orchestrator();
    }

    /// Squares the numbers 1.0..=10000.0 (in 1000 vectors of 10) with
    /// `OrderedMap` and checks that the results come back in input order.
    #[test]
    #[serial]
    fn simple_ordered_map() {
        let mut counter = 1.0;
        let mut vector = Vec::new();

        for _i in 0..1000 {
            let mut numbers = Vec::new();
            for _i in 0..10 {
                numbers.push(counter);
                counter += 1.0;
            }
            vector.push(numbers);
        }

        let pipe = pipeline![
            SourceIter::build(vector.into_iter()),
            OrderedMap::build(4, |el: f64| square(el)),
            OrderedSinkVec::build()
        ];

        let res: Vec<Vec<f64>> = pipe.start_and_wait_end().unwrap();

        // The counter runs across all vectors: order must be global.
        counter = 1.0;
        for vec in res {
            for el in vec {
                assert_eq!(el.sqrt(), counter);
                counter += 1.0;
            }
        }

        reset_orchestrator();
    }

    /// Same as `simple_ordered_map`, but the node is built with two replicas.
    #[test]
    #[serial]
    fn simple_ordered_map_replicated() {
        let mut counter = 1.0;
        let mut vector = Vec::new();

        for _i in 0..1000 {
            let mut numbers = Vec::new();
            for _i in 0..10 {
                numbers.push(counter);
                counter += 1.0;
            }
            vector.push(numbers);
        }

        let pipe = pipeline![
            SourceIter::build(vector.into_iter()),
            OrderedMap::build_with_replicas(4, 2, |el: f64| square(el)),
            OrderedSinkVec::build()
        ];

        let res: Vec<Vec<f64>> = pipe.start_and_wait_end().unwrap();

        // The counter runs across all vectors: order must be global.
        counter = 1.0;
        for vec in res {
            for el in vec {
                assert_eq!(el.sqrt(), counter);
                counter += 1.0;
            }
        }

        reset_orchestrator();
    }

    /// Sums 1000 vectors of ten ones with `Reduce`; every sum must be 10.
    #[test]
    #[serial]
    fn summation() {
        let mut set = Vec::new();

        for _i in 0..1000 {
            let mut vector = Vec::new();
            for _i in 0..10 {
                vector.push(1);
            }
            set.push(vector);
        }

        let pipe = pipeline![
            SourceIter::build(set.into_iter()),
            Reduce::build(4, |a, b| -> i32 { a + b }),
            SinkVec::build()
        ];

        let res: Vec<i32> = pipe.start_and_wait_end().unwrap();

        assert_eq!(res.len(), 1000);
        for el in res {
            assert_eq!(el, 10)
        }

        reset_orchestrator();
    }

    /// Sums vectors of 10, 100 and 1000 ones with `OrderedReduce` and checks
    /// that the sums arrive in input order.
    #[test]
    #[serial]
    fn summation_ordered() {
        let set = vec![vec![1; 10], vec![1; 100], vec![1; 1000]];

        let pipe = pipeline![
            SourceIter::build(set.into_iter()),
            OrderedReduce::build_with_replicas(2, 4, |a, b| -> i32 { a + b }),
            OrderedSinkVec::build()
        ];

        let res: Vec<i32> = pipe.start_and_wait_end().unwrap();

        assert_eq!(res.len(), 3);
        assert_eq!(res[0], 10);
        assert_eq!(res[1], 100);
        assert_eq!(res[2], 1000);

        reset_orchestrator();
    }

    /// For each of 100000 keys, squares 1.0..=10.0 and sums the squares with
    /// `MapReduce`; every key must end up with a single pair holding 385.
    #[test]
    #[serial]
    fn summation_of_squares() {
        let mut counter = 1.0;
        let mut set = Vec::new();

        for i in 0..100000 {
            let mut vector = Vec::new();
            for _i in 0..10 {
                vector.push((i, counter));
                counter += 1.0;
            }
            counter = 1.0;
            set.push(vector);
        }

        let pipe = pipeline![
            SourceIter::build(set.into_iter()),
            MapReduce::build(
                8,
                |el: (usize, f64)| -> (usize, f64) { (el.0, el.1 * el.1) },
                |a, b| { a + b }
            ),
            SinkVec::build()
        ];

        let res: Vec<Vec<(usize, f64)>> = pipe.start_and_wait_end().unwrap();

        assert_eq!(res.len(), 100000);
        for vec in res {
            assert_eq!(vec.len(), 1);
            for el in vec {
                assert_eq!(el.1, 385.00);
            }
        }

        reset_orchestrator();
    }

    /// Same computation as `summation_of_squares`, using a replicated
    /// `OrderedMapReduce`; the keys must come back in input order.
    #[test]
    #[serial]
    fn summation_of_squares_ordered() {
        let mut counter = 1.0;
        let mut set = Vec::new();

        for i in 0..100000 {
            let mut vector = Vec::new();
            for _i in 0..10 {
                vector.push((i, counter));
                counter += 1.0;
            }
            counter = 1.0;
            set.push(vector);
        }

        let pipe = pipeline![
            SourceIter::build(set.into_iter()),
            OrderedMapReduce::build_with_replicas(
                2,
                4,
                |el: (usize, f64)| -> (usize, f64) { (el.0, el.1 * el.1) },
                |a, b| { a + b }
            ),
            OrderedSinkVec::build()
        ];

        let res: Vec<Vec<(usize, f64)>> = pipe.start_and_wait_end().unwrap();

        assert_eq!(res.len(), 100000);
        for (check, vec) in res.into_iter().enumerate() {
            assert_eq!(vec.len(), 1);
            for el in vec {
                assert_eq!(el, (check, 385.00));
            }
        }

        reset_orchestrator();
    }

    /// Flattens `[[1, 2], [3, 4], [5, 6], [7, 8]]` while adding one to each
    /// number and checks the last collected result.
    #[test]
    #[serial]
    fn flat_map() {
        let a: Vec<Vec<u64>> = vec![vec![1, 2], vec![3, 4], vec![5, 6], vec![7, 8]];
        let mut vector = Vec::new();

        for _i in 0..1000 {
            vector.push(a.clone());
        }

        let pipe = pipeline![
            SourceIter::build(vector.into_iter()),
            FlatMap::build(4, |x: Vec<u64>| x.into_iter().map(|i| i + 1)),
            SinkVec::build()
        ];

        let mut res: Vec<Vec<u64>> = pipe.start_and_wait_end().unwrap();
        let check = res.pop().unwrap();
        assert_eq!(&check, &[2, 3, 4, 5, 6, 7, 8, 9]);

        reset_orchestrator();
    }

    /// Same as `flat_map`, with the node built through `build_with_replicas`.
    #[test]
    #[serial]
    fn flat_map_replicated() {
        let a: Vec<Vec<u64>> = vec![vec![1, 2], vec![3, 4], vec![5, 6], vec![7, 8]];
        let mut vector = Vec::new();

        for _i in 0..1000 {
            vector.push(a.clone());
        }

        let pipe = pipeline![
            SourceIter::build(vector.into_iter()),
            FlatMap::build_with_replicas(2, 4, |x: Vec<u64>| x.into_iter().map(|i| i + 1)),
            SinkVec::build()
        ];

        let mut res: Vec<Vec<u64>> = pipe.start_and_wait_end().unwrap();
        let check = res.pop().unwrap();
        assert_eq!(&check, &[2, 3, 4, 5, 6, 7, 8, 9]);

        reset_orchestrator();
    }

    /// Feeds alternating ascending / descending nested vectors through
    /// `OrderedFlatMap` and checks that the flattened results alternate in
    /// the same order.
    #[test]
    #[serial]
    fn ordered_flat_map() {
        let a: Vec<Vec<u64>> = vec![vec![1, 2], vec![3, 4], vec![5, 6], vec![7, 8]];
        let b: Vec<Vec<u64>> = vec![vec![8, 7], vec![6, 5], vec![4, 3], vec![2, 1]];
        let mut vector = Vec::new();

        for _i in 0..1000 {
            vector.push(a.clone());
            vector.push(b.clone());
        }

        let pipe = pipeline![
            SourceIter::build(vector.into_iter()),
            OrderedFlatMap::build(4, |x: Vec<u64>| x.into_iter()),
            OrderedSinkVec::build()
        ];

        let res: Vec<Vec<u64>> = pipe.start_and_wait_end().unwrap();

        // One result per input; without this check an empty result would
        // let the loop below pass vacuously.
        assert_eq!(res.len(), 2000);
        for pair in res.chunks_exact(2) {
            assert_eq!(&pair[0], &[1, 2, 3, 4, 5, 6, 7, 8]);
            assert_eq!(&pair[1], &[8, 7, 6, 5, 4, 3, 2, 1]);
        }

        reset_orchestrator();
    }

    /// Same as `ordered_flat_map`, with the node built through
    /// `build_with_replicas`.
    #[test]
    #[serial]
    fn ordered_flat_map_replicated() {
        let a: Vec<Vec<u64>> = vec![vec![1, 2], vec![3, 4], vec![5, 6], vec![7, 8]];
        let b: Vec<Vec<u64>> = vec![vec![8, 7], vec![6, 5], vec![4, 3], vec![2, 1]];
        let mut vector = Vec::new();

        for _i in 0..1000 {
            vector.push(a.clone());
            vector.push(b.clone());
        }

        let pipe = pipeline![
            SourceIter::build(vector.into_iter()),
            OrderedFlatMap::build_with_replicas(2, 4, |x: Vec<u64>| x.into_iter()),
            OrderedSinkVec::build()
        ];

        let res: Vec<Vec<u64>> = pipe.start_and_wait_end().unwrap();

        // One result per input; without this check an empty result would
        // let the loop below pass vacuously.
        assert_eq!(res.len(), 2000);
        for pair in res.chunks_exact(2) {
            assert_eq!(&pair[0], &[1, 2, 3, 4, 5, 6, 7, 8]);
            assert_eq!(&pair[1], &[8, 7, 6, 5, 4, 3, 2, 1]);
        }

        reset_orchestrator();
    }
}