// ppl/templates/misc.rs
1use std::{collections::VecDeque, marker::PhantomData};
2
3use crate::pipeline::node::{In, InOut, Out};
4
5/// SourceIter.
6///
7/// This source node produces data from a iterator.
8pub struct SourceIter<I, T>
9where
10    I: Iterator<Item = T>,
11{
12    iterator: I,
13    phantom: PhantomData<T>,
14}
15impl<I, T> SourceIter<I, T>
16where
17    I: Iterator<Item = T>,
18    T: Send + 'static,
19{
20    /// Creates a new source from any type that implements the `Iterator` trait.
21    /// The source will terminate when the iterator is exhausted.
22    ///
23    /// # Arguments
24    /// * `iterator` - Type that implements the [`Iterator`] trait
25    /// and represents the stream of data we want emit.
26    ///
27    /// # Examples
28    ///
29    /// In this example we create a source node using a [`SourceIter`]
30    /// template that emits numbers from 1 to 21.
31    ///
32    /// ```
33    /// use ppl::{prelude::*, templates::misc::{SourceIter, Sequential, SinkVec}};
34    ///
35    /// let p = pipeline![
36    ///     SourceIter::build(1..21),
37    ///     Sequential::build(|el| { el }),
38    ///     SinkVec::build()
39    /// ];
40    /// let res = p.start_and_wait_end().unwrap();
41    /// ```
42    pub fn build(iterator: I) -> impl Out<T> {
43        Self {
44            iterator,
45            phantom: PhantomData,
46        }
47    }
48}
49impl<I, T> Out<T> for SourceIter<I, T>
50where
51    I: Iterator<Item = T>,
52    T: Send + 'static,
53{
54    fn run(&mut self) -> Option<T> {
55        self.iterator.next()
56    }
57}
58
59/// SinkVec.
60///
61/// Sink node that accumulates data into a vector.
62pub struct SinkVec<T> {
63    data: Vec<T>,
64}
65impl<T> SinkVec<T>
66where
67    T: Send + 'static,
68{
69    /// Creates a new sink that accumulates data into a vector.
70    /// The sink will terminate when the stream terminates, producing
71    /// a vector containing all the data received.
72    ///    
73    /// # Examples
74    ///
75    /// In this example we send element by element the content of a vector
76    /// through a pipeline.
77    /// Using the [`SinkVec`] template, we create a sink node that collects
78    /// the data received.
79    ///
80    /// ```
81    /// use ppl::{prelude::*, templates::misc::{SourceIter, Sequential, SinkVec}};
82    ///
83    /// let data = vec![1, 2, 3, 4, 5];
84    /// let p = pipeline![
85    ///     SourceIter::build(data.into_iter()),
86    ///     Sequential::build(|el| { el }),
87    ///     SinkVec::build()
88    /// ];
89    /// let res = p.start_and_wait_end().unwrap();
90    /// assert_eq!(res,  vec![1, 2, 3, 4, 5])
91    /// ```
92    pub fn build() -> impl In<T, Vec<T>> {
93        Self { data: Vec::new() }
94    }
95}
96impl<T> In<T, Vec<T>> for SinkVec<T>
97where
98    T: Send + 'static,
99{
100    fn run(&mut self, input: T) {
101        self.data.push(input);
102    }
103    fn finalize(self) -> Option<Vec<T>> {
104        Some(self.data)
105    }
106}
107
108/// Splitter.
109///
110/// This node receives a vector, split it into chunks of size `chunk_size`
111/// and send each chunk to the next node.
112#[derive(Clone)]
113pub struct Splitter<T> {
114    chunk_size: usize,
115    n_replicas: usize,
116    data: VecDeque<T>,
117}
118impl<T> Splitter<T>
119where
120    T: Send + 'static + Clone,
121{
122    /// Creates a new splitter node.
123    ///
124    /// # Arguments
125    /// * `chunk_size` - Number of elements for each chunk.
126    ///
127    /// # Examples
128    /// Given a stream of numbers, we create a pipeline with a splitter that
129    /// create vectors of two elements each.
130    ///
131    /// ```
132    /// use ppl::{prelude::*, templates::misc::{SourceIter, Splitter, SinkVec, Aggregator}};
133    ///
134    /// let vec = vec![1, 2, 3, 4, 5, 6, 7, 8];
135    /// let p = pipeline![
136    ///     SourceIter::build(vec.into_iter()),
137    ///     Aggregator::build(8), // We aggregate each element in a vector.
138    ///     Splitter::build(2), // We split the received vector in 4 sub-vector of size 2.
139    ///     SinkVec::build()
140    /// ];
141    /// let mut res = p.start_and_wait_end().unwrap();
142    /// assert_eq!(res.len(), 4)
143    /// ```
144    pub fn build(chunk_size: usize) -> impl InOut<Vec<T>, Vec<T>> {
145        Self {
146            chunk_size,
147            n_replicas: 1,
148            data: VecDeque::new(),
149        }
150    }
151
152    /// Creates a new splitter node with 'n_replicas' replicas of the same node.
153    ///     
154    /// # Arguments
155    /// * `n_replicas` - Number of replicas.
156    /// * `chunk_size` - Number of elements for each chunk.
157    pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut<Vec<T>, Vec<T>> {
158        Self {
159            chunk_size,
160            n_replicas,
161            data: VecDeque::new(),
162        }
163    }
164}
165impl<T> InOut<Vec<T>, Vec<T>> for Splitter<T>
166where
167    T: Send + 'static + Clone,
168{
169    fn run(&mut self, input: Vec<T>) -> Option<Vec<T>> {
170        self.data.extend(input);
171        None
172    }
173    fn number_of_replicas(&self) -> usize {
174        self.n_replicas
175    }
176    fn is_producer(&self) -> bool {
177        true
178    }
179    fn produce(&mut self) -> Option<Vec<T>> {
180        if self.data.len() >= self.chunk_size {
181            let mut chunk = Vec::new();
182            for _i in 0..self.chunk_size {
183                chunk.push(self.data.pop_front().unwrap())
184            }
185            Some(chunk)
186        } else {
187            None
188        }
189    }
190}
191
192/// Aggregator.
193///
194/// This node receives elements and accumulates them into a vector.
195/// When the vector reaches the size `chunk_size` it send the vector with the elements accumulated to the next node.
196#[derive(Clone)]
197pub struct Aggregator<T> {
198    chunk_size: usize,
199    n_replicas: usize,
200    data: VecDeque<T>,
201}
202impl<T> Aggregator<T>
203where
204    T: Send + 'static + Clone,
205{
206    /// Creates a new aggregator node.
207    ///
208    /// # Arguments
209    /// * `chunk_size` - Number of elements for each chunk.
210    ///
211    /// # Examples
212    /// Given a stream of numbers, we use an [`Aggregator`] template to
213    /// group the elements of this stream in vectors of size 100.
214    ///
215    /// ```
216    /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec, Aggregator}};
217    ///
218    /// let p = pipeline![
219    ///     SourceIter::build(0..2000),
220    ///     Aggregator::build(100),
221    ///     SinkVec::build()
222    /// ];
223    /// let res = p.start_and_wait_end().unwrap();
224    /// assert_eq!(res.len(), 20);
225    /// ```
226    pub fn build(chunk_size: usize) -> impl InOut<T, Vec<T>> {
227        Self {
228            chunk_size,
229            n_replicas: 1,
230            data: VecDeque::new(),
231        }
232    }
233
234    /// Creates a new aggregator node with 'n_replicas' replicas of the same node.
235    ///
236    /// # Arguments
237    /// * `n_replicas` - Number of replicas.
238    /// * `chunk_size` - Number of elements for each chunk.
239    pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut<T, Vec<T>> {
240        Self {
241            chunk_size,
242            n_replicas,
243            data: VecDeque::new(),
244        }
245    }
246}
247impl<T> InOut<T, Vec<T>> for Aggregator<T>
248where
249    T: Send + 'static + Clone,
250{
251    fn run(&mut self, input: T) -> Option<Vec<T>> {
252        self.data.push_back(input);
253        None
254    }
255    fn number_of_replicas(&self) -> usize {
256        self.n_replicas
257    }
258    fn is_producer(&self) -> bool {
259        true
260    }
261    fn produce(&mut self) -> Option<Vec<T>> {
262        if self.data.len() >= self.chunk_size {
263            let mut chunk = Vec::new();
264            for _i in 0..self.chunk_size {
265                chunk.push(self.data.pop_front().unwrap())
266            }
267            Some(chunk)
268        } else {
269            None
270        }
271    }
272}
273
274/// Sequential node.
275///
276/// Given a function that defines the logic of the node, this method will create a node with one replica.
277#[derive(Clone)]
278pub struct Sequential<T, U, F>
279where
280    T: Send + 'static,
281    U: Send + 'static,
282    F: FnMut(T) -> U + Send + 'static,
283{
284    f: F,
285    phantom: PhantomData<T>,
286}
287impl<T, U, F> Sequential<T, U, F>
288where
289    T: Send + 'static + Clone,
290    U: Send + 'static + Clone,
291    F: FnMut(T) -> U + Send + 'static + Clone,
292{
293    /// Creates a new sequential node.
294    ///
295    /// # Arguments
296    /// * `f` - Function name or lambda function that specify the logic
297    /// of this node.
298    pub fn build(f: F) -> impl InOut<T, U> {
299        Self {
300            f,
301            phantom: PhantomData,
302        }
303    }
304}
305impl<T, U, F> InOut<T, U> for Sequential<T, U, F>
306where
307    T: Send + 'static + Clone,
308    U: Send + 'static + Clone,
309    F: FnMut(T) -> U + Send + 'static + Clone,
310{
311    fn run(&mut self, input: T) -> Option<U> {
312        Some((self.f)(input))
313    }
314}
315
316/// Parallel node.
317///
318/// Given a function that defines the logic of the node, this method will create 'n_replicas' replicas of that node.
319#[derive(Clone)]
320pub struct Parallel<T, U, F>
321where
322    T: Send + 'static,
323    U: Send + 'static,
324    F: FnMut(T) -> U + Send + 'static,
325{
326    n_replicas: usize,
327    f: F,
328    phantom: PhantomData<T>,
329}
330impl<T, U, F> Parallel<T, U, F>
331where
332    T: Send + 'static + Clone,
333    U: Send + 'static + Clone,
334    F: FnMut(T) -> U + Send + 'static + Clone,
335{
336    /// Creates a new parallel node.
337    ///
338    /// # Arguments
339    /// * `n_replicas` - Number of replicas.
340    /// * `f` - Function name or lambda function that specify the logic
341    /// of this node.
342    pub fn build(n_replicas: usize, f: F) -> impl InOut<T, U> {
343        Self {
344            n_replicas,
345            f,
346            phantom: PhantomData,
347        }
348    }
349}
350impl<T, U, F> InOut<T, U> for Parallel<T, U, F>
351where
352    T: Send + 'static + Clone,
353    U: Send + 'static + Clone,
354    F: FnMut(T) -> U + Send + 'static + Clone,
355{
356    fn run(&mut self, input: T) -> Option<U> {
357        Some((self.f)(input))
358    }
359    fn number_of_replicas(&self) -> usize {
360        self.n_replicas
361    }
362}
363
364/// Filter.
365///
366/// This node receives elements and filters them according to the given predicate.
367#[derive(Clone)]
368pub struct Filter<T, F>
369where
370    T: Send + 'static,
371    F: FnMut(&T) -> bool + Send + 'static,
372{
373    f: F,
374    n_replicas: usize,
375    phantom: PhantomData<T>,
376}
377impl<T, F> Filter<T, F>
378where
379    T: Send + 'static + Clone,
380    F: FnMut(&T) -> bool + Send + 'static + Clone,
381{
382    /// Creates a new filter node.
383    ///
384    /// # Arguments
385    /// * `f` - Function name or lambda function that represent the predicate
386    /// function we want to apply.
387    ///
388    /// # Examples
389    /// Given a set of numbers from 0 to 199, we use a [`Filter`]
390    /// template to filter the even numbers.
391    /// ```
392    /// use ppl::{prelude::*, templates::misc::{SourceIter, SinkVec, Filter}};
393    ///
394    /// let p = pipeline![
395    ///     SourceIter::build(0..200),
396    ///     Filter::build(|el| { el % 2 == 0 }),
397    ///     SinkVec::build()
398    /// ];
399    ///
400    /// let res = p.start_and_wait_end().unwrap();
401    ///  assert_eq!(res.len(), 100)
402    /// ```
403    pub fn build(f: F) -> impl InOut<T, T> {
404        Self {
405            f,
406            n_replicas: 1,
407            phantom: PhantomData,
408        }
409    }
410    /// Creates a new filter node with 'n_replicas' replicas of the same node.
411    ///     
412    /// # Arguments
413    /// * `n_replicas` - Number of replicas.
414    /// * `f` - Function name or lambda function that represent the predicate
415    /// function we want to apply.
416    pub fn build_with_replicas(n_replicas: usize, f: F) -> impl InOut<T, T> {
417        Self {
418            f,
419            n_replicas,
420            phantom: PhantomData,
421        }
422    }
423}
424impl<T, F> InOut<T, T> for Filter<T, F>
425where
426    T: Send + 'static + Clone,
427    F: FnMut(&T) -> bool + Send + 'static + Clone,
428{
429    fn run(&mut self, input: T) -> Option<T> {
430        if (self.f)(&input) {
431            Some(input)
432        } else {
433            None
434        }
435    }
436    fn number_of_replicas(&self) -> usize {
437        self.n_replicas
438    }
439}
440
441// Ordered versions of the above
442
443/// OrderedSinkVec.
444///
445/// Sink node that accumulates data into a vector.
446/// This is an ordered version of [`SinkVec`].
447/// The sink will produce a vector containing all the data received in the same order
448/// as it was received.
449pub struct OrderedSinkVec<T> {
450    data: Vec<T>,
451}
452impl<T> OrderedSinkVec<T>
453where
454    T: Send + 'static,
455{
456    /// Creates a new sink that accumulates data into a vector.
457    /// The sink will terminate when the stream terminates, producing
458    /// a vector containing all the data received.
459    pub fn build() -> impl In<T, Vec<T>> {
460        Self { data: Vec::new() }
461    }
462}
463impl<T> In<T, Vec<T>> for OrderedSinkVec<T>
464where
465    T: Send + 'static,
466{
467    fn run(&mut self, input: T) {
468        self.data.push(input);
469    }
470    fn is_ordered(&self) -> bool {
471        true
472    }
473    fn finalize(self) -> Option<Vec<T>> {
474        Some(self.data)
475    }
476}
477
478/// OrderedSplitter.
479///
480/// This node receives a vector, split it into chunks of size `chunk_size`
481/// and send each chunk to the next node.
482/// This is an ordered versione of [`Splitter`].
483/// This node mantains the order of the input in the output.
484#[derive(Clone)]
485pub struct OrderedSplitter<T> {
486    chunk_size: usize,
487    n_replicas: usize,
488    data: VecDeque<T>,
489}
490impl<T> OrderedSplitter<T>
491where
492    T: Send + 'static + Clone,
493{
494    /// Creates a new ordered splitter node.
495    /// # Arguments
496    /// * `chunk_size` - Number of elements for each chunk.
497    pub fn build(chunk_size: usize) -> impl InOut<Vec<T>, Vec<T>> {
498        Self {
499            chunk_size,
500            n_replicas: 1,
501            data: VecDeque::new(),
502        }
503    }
504
505    /// Creates a new ordered splitter node with 'n_replicas' replicas of the same node.
506    /// # Arguments
507    /// * `n_replicas` - Number of replicas.
508    /// * `chunk_size` - Number of elements for each chunk.
509    pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut<Vec<T>, Vec<T>> {
510        Self {
511            chunk_size,
512            n_replicas,
513            data: VecDeque::new(),
514        }
515    }
516}
517impl<T> InOut<Vec<T>, Vec<T>> for OrderedSplitter<T>
518where
519    T: Send + 'static + Clone,
520{
521    fn run(&mut self, input: Vec<T>) -> Option<Vec<T>> {
522        self.data.extend(input);
523        None
524    }
525    fn number_of_replicas(&self) -> usize {
526        self.n_replicas
527    }
528    fn is_producer(&self) -> bool {
529        true
530    }
531    fn produce(&mut self) -> Option<Vec<T>> {
532        if self.data.len() >= self.chunk_size {
533            let mut chunk = Vec::new();
534            for _i in 0..self.chunk_size {
535                chunk.push(self.data.pop_front().unwrap())
536            }
537            Some(chunk)
538        } else {
539            None
540        }
541    }
542    fn is_ordered(&self) -> bool {
543        true
544    }
545}
546
547/// OrderedAggregator.
548///
549/// This node receives elements and accumulates them into a vector.
550/// When the vector reaches the size `chunk_size` it send the vector with the elements accumulated to the next node.
551/// This is an ordered version of [`Aggregator`].
552/// This node mantains the order of the input in the output.
553#[derive(Clone)]
554pub struct OrderedAggregator<T> {
555    chunk_size: usize,
556    n_replicas: usize,
557    data: VecDeque<T>,
558}
559impl<T> OrderedAggregator<T>
560where
561    T: Send + 'static + Clone,
562{
563    /// Creates a new ordered aggregator node
564    ///
565    /// # Arguments
566    /// * `chunk_size` - Number of elements for each chunk.
567    pub fn build(chunk_size: usize) -> impl InOut<T, Vec<T>> {
568        Self {
569            chunk_size,
570            n_replicas: 1,
571            data: VecDeque::new(),
572        }
573    }
574
575    /// Creates a new ordered aggregator nod with 'n_replicas' replicas of the same node.
576    /// # Arguments
577    /// * `n_replicas` - Number of replicas.
578    /// * `chunk_size` - Number of elements for each chunk.
579    pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut<T, Vec<T>> {
580        Self {
581            chunk_size,
582            n_replicas,
583            data: VecDeque::new(),
584        }
585    }
586}
587impl<T> InOut<T, Vec<T>> for OrderedAggregator<T>
588where
589    T: Send + 'static + Clone,
590{
591    fn run(&mut self, input: T) -> Option<Vec<T>> {
592        self.data.push_back(input);
593        None
594    }
595    fn number_of_replicas(&self) -> usize {
596        self.n_replicas
597    }
598    fn is_producer(&self) -> bool {
599        true
600    }
601    fn produce(&mut self) -> Option<Vec<T>> {
602        if self.data.len() >= self.chunk_size {
603            let mut chunk = Vec::new();
604            for _i in 0..self.chunk_size {
605                chunk.push(self.data.pop_front().unwrap())
606            }
607            Some(chunk)
608        } else {
609            None
610        }
611    }
612    fn is_ordered(&self) -> bool {
613        true
614    }
615}
616
617/// OrderedSequential.
618///
619/// This node receives elements and applies a function to each element.
620/// This is an ordered version of [`Sequential`].
621/// The node will produce data in the same order as it is received from the upstream.
622#[derive(Clone)]
623pub struct OrderedSequential<T, U, F> {
624    f: F,
625    phantom: PhantomData<(T, U)>,
626}
627impl<T, U, F> OrderedSequential<T, U, F>
628where
629    T: Send + 'static + Clone,
630    U: Send + 'static + Clone,
631    F: FnMut(T) -> U + Send + 'static + Clone,
632{
633    /// Creates a new sequential node.
634    /// # Arguments
635    /// * `f` - Function name or lambda function that specify the logic
636    /// of this node.
637    pub fn build(f: F) -> impl InOut<T, U> {
638        Self {
639            f,
640            phantom: PhantomData,
641        }
642    }
643}
644impl<T, U, F> InOut<T, U> for OrderedSequential<T, U, F>
645where
646    T: Send + 'static + Clone,
647    U: Send + 'static + Clone,
648    F: FnMut(T) -> U + Send + 'static + Clone,
649{
650    fn run(&mut self, input: T) -> Option<U> {
651        Some((self.f)(input))
652    }
653    fn is_ordered(&self) -> bool {
654        true
655    }
656}
657
658/// OrderedParallel.
659///
660/// This node receives elements and applies a function to each element.
661/// This is an ordered version of [`Parallel`].
662/// The node will produce data in the same order as it is received from the upstream.
663#[derive(Clone)]
664pub struct OrderedParallel<T, U, F> {
665    f: F,
666    n_replicas: usize,
667    phantom: PhantomData<(T, U)>,
668}
669impl<T, U, F> OrderedParallel<T, U, F>
670where
671    T: Send + 'static + Clone,
672    U: Send + 'static + Clone,
673    F: FnMut(T) -> U + Send + 'static + Clone,
674{
675    /// Creates a new parallel node.
676    /// # Arguments
677    /// * `n_replicas` - Number of replicas.
678    /// * `f` - Function name or lambda function that specify the logic
679    /// of this node.
680    pub fn build(n_replicas: usize, f: F) -> impl InOut<T, U> {
681        Self {
682            f,
683            n_replicas,
684            phantom: PhantomData,
685        }
686    }
687}
688impl<T, U, F> InOut<T, U> for OrderedParallel<T, U, F>
689where
690    T: Send + 'static + Clone,
691    U: Send + 'static + Clone,
692    F: FnMut(T) -> U + Send + 'static + Clone,
693{
694    fn run(&mut self, input: T) -> Option<U> {
695        Some((self.f)(input))
696    }
697    fn is_ordered(&self) -> bool {
698        true
699    }
700    fn number_of_replicas(&self) -> usize {
701        self.n_replicas
702    }
703}
704
705/// OrderedFilter.
706///
707/// This node receives elements and filters them according to a predicate.
708/// This is an ordered version of [`Filter`].
709#[derive(Clone)]
710pub struct OrderedFilter<T, F> {
711    f: F,
712    n_replicas: usize,
713    phantom: PhantomData<T>,
714}
715impl<T, F> OrderedFilter<T, F>
716where
717    T: Send + 'static + Clone,
718    F: FnMut(&T) -> bool + Send + 'static + Clone,
719{
720    /// Creates a new filter node.
721    /// # Arguments
722    /// * `f` - Function name or lambda function that represent the predicate
723    /// function we want to apply.
724    pub fn build(f: F) -> impl InOut<T, T> {
725        Self {
726            f,
727            n_replicas: 1,
728            phantom: PhantomData,
729        }
730    }
731    /// Creates a new filter node.
732    /// # Arguments
733    /// * `n_replicas` - Number of replicas.
734    /// * `f` - Function name or lambda function that represent the predicate
735    /// function we want to apply.
736    pub fn build_with_replicas(n_replicas: usize, f: F) -> impl InOut<T, T> {
737        Self {
738            f,
739            n_replicas,
740            phantom: PhantomData,
741        }
742    }
743}
744impl<T, F> InOut<T, T> for OrderedFilter<T, F>
745where
746    T: Send + 'static + Clone,
747    F: FnMut(&T) -> bool + Send + 'static + Clone,
748{
749    fn run(&mut self, input: T) -> Option<T> {
750        if (self.f)(&input) {
751            Some(input)
752        } else {
753            None
754        }
755    }
756    fn is_ordered(&self) -> bool {
757        true
758    }
759    fn number_of_replicas(&self) -> usize {
760        self.n_replicas
761    }
762}
763
#[cfg(test)]
mod test {
    // Integration-style tests for the template nodes. Each test builds a full
    // pipeline via the project `pipeline!` macro and waits for completion.
    // `#[serial]` (from the `serial_test` crate) forces the tests to run one
    // at a time — presumably to avoid thread-pool contention; confirm against
    // the runtime's requirements.
    use crate::{
        prelude::*,
        templates::misc::{
            Filter, OrderedAggregator, OrderedFilter, OrderedParallel, OrderedSequential,
            OrderedSinkVec, OrderedSplitter, Parallel, Sequential, SinkVec, SourceIter,
        },
    };
    use serial_test::serial;

    use super::{Aggregator, Splitter};

    // Source -> identity sequential stage -> sink; checks only element count.
    #[test]
    #[serial]
    fn simple_pipeline() {
        let p = pipeline![
            SourceIter::build(1..21),
            Sequential::build(|el| {
                el /*println!("Hello, received: {}", el); */
            }),
            SinkVec::build()
        ];

        let res = p.start_and_wait_end().unwrap();

        assert_eq!(res.len(), 20)
    }

    // Ordered variant: additionally checks elements arrive in source order.
    #[test]
    #[serial]
    fn simple_pipeline_ordered() {
        let p = pipeline![
            SourceIter::build(1..21),
            OrderedSequential::build(|el| {
                el /*println!("Hello, received: {}", el); */
            }),
            OrderedSinkVec::build()
        ];

        let res = p.start_and_wait_end().unwrap();

        assert_eq!(res.len(), 20);

        let mut counter = 1;
        for el in res {
            assert_eq!(el, counter);
            counter += 1;
        }
    }

    // Farm with 8 replicas; unordered, so only the count is checked.
    #[test]
    #[serial]
    fn simple_farm() {
        let p = pipeline![
            SourceIter::build(1..21),
            Parallel::build(8, |el| {
                el /*println!("Hello, received: {}", el); */
            }),
            SinkVec::build()
        ];

        let res = p.start_and_wait_end().unwrap();

        assert_eq!(res.len(), 20)
    }

    // Ordered farm: 8 replicas but output order must match input order.
    #[test]
    #[serial]
    fn simple_farm_ordered() {
        let p = pipeline![
            SourceIter::build(1..21),
            OrderedParallel::build(8, |el| {
                el /*println!("Hello, received: {}", el); */
            }),
            OrderedSinkVec::build()
        ];

        let res = p.start_and_wait_end().unwrap();

        assert_eq!(res.len(), 20);

        let mut counter = 1;
        for el in res {
            assert_eq!(el, counter);
            counter += 1;
        }
    }

    // 1000 vectors of 20 pairs are re-split into chunks of 2, then re-merged
    // into one chunk of 20000 by a second splitter (20000 = 1000 * 20).
    #[test]
    #[serial]
    fn splitter() {
        let mut counter = 1;
        let mut set = Vec::new();

        for i in 0..1000 {
            let mut vector = Vec::new();
            for _i in 0..20 {
                vector.push((i, counter));
                counter += 1;
            }
            counter = 1;
            set.push(vector);
        }

        let p = pipeline![
            SourceIter::build(set.into_iter()),
            Splitter::build_with_replicas(2, 2),
            Splitter::build(20000),
            SinkVec::build()
        ];

        let mut res = p.start_and_wait_end().unwrap();

        assert_eq!(res.len(), 1);

        let vec = res.pop().unwrap();

        assert_eq!(vec.len(), 20000)
    }

    // Ordered splitters chained at different chunk sizes; the final single
    // chunk must contain 1..=20000 in exact order.
    #[test]
    #[serial]
    fn splitter_ordered() {
        let mut counter = 1;
        let mut set = Vec::new();

        for _i in 0..1000 {
            let mut vector = Vec::new();
            for _i in 0..20 {
                vector.push(counter);
                counter += 1;
            }
            set.push(vector);
        }

        let p = pipeline![
            SourceIter::build(set.into_iter()),
            OrderedSplitter::build_with_replicas(2, 10),
            OrderedSplitter::build_with_replicas(4, 1),
            OrderedSplitter::build(20000),
            OrderedSinkVec::build()
        ];

        let mut res = p.start_and_wait_end().unwrap();

        assert_eq!(res.len(), 1);

        let vec = res.pop().unwrap();

        assert_eq!(vec.len(), 20000);

        counter = 1;
        for el in vec {
            assert_eq!(el, counter);
            counter += 1;
        }
    }

    // 2000 elements grouped into 20 chunks of exactly 100.
    #[test]
    #[serial]
    fn aggregator() {
        let p = pipeline![
            SourceIter::build(0..2000),
            Aggregator::build(100),
            SinkVec::build()
        ];

        let res = p.start_and_wait_end().unwrap();

        assert_eq!(res.len(), 20);

        for vec in res {
            assert_eq!(vec.len(), 100);
        }
    }

    // Aggregate into chunks of 100 then split back into singletons:
    // 2000 one-element vectors expected.
    #[test]
    #[serial]
    fn aggregator_ordered() {
        let p = pipeline![
            SourceIter::build(0..2000),
            OrderedAggregator::build_with_replicas(4, 100),
            OrderedSplitter::build(1),
            OrderedSinkVec::build()
        ];

        let res = p.start_and_wait_end().unwrap();

        assert_eq!(res.len(), 2000);

        for vec in res {
            assert_eq!(vec.len(), 1);
        }
    }

    // Keep even numbers from 0..200 -> 100 survivors.
    #[test]
    #[serial]
    fn filter() {
        let p = pipeline![
            SourceIter::build(0..200),
            Filter::build(|el| { el % 2 == 0 }),
            SinkVec::build()
        ];

        let res = p.start_and_wait_end().unwrap();

        assert_eq!(res.len(), 100)
    }

    // Ordered filter with 4 replicas: survivors must be 0, 2, 4, ... in order.
    #[test]
    #[serial]
    fn filter_ordered() {
        let p = pipeline![
            SourceIter::build(0..200),
            OrderedFilter::build_with_replicas(4, |el| { el % 2 == 0 }),
            OrderedSinkVec::build()
        ];

        let res = p.start_and_wait_end().unwrap();

        assert_eq!(res.len(), 100);

        let mut counter = 0;
        for el in res {
            assert_eq!(el, counter);
            counter += 2;
        }
    }
}
993}