1use std::{collections::VecDeque, marker::PhantomData};
2
3use crate::pipeline::node::{In, InOut, Out};
4
5pub struct SourceIter<I, T>
9where
10 I: Iterator<Item = T>,
11{
12 iterator: I,
13 phantom: PhantomData<T>,
14}
15impl<I, T> SourceIter<I, T>
16where
17 I: Iterator<Item = T>,
18 T: Send + 'static,
19{
20 pub fn build(iterator: I) -> impl Out<T> {
43 Self {
44 iterator,
45 phantom: PhantomData,
46 }
47 }
48}
49impl<I, T> Out<T> for SourceIter<I, T>
50where
51 I: Iterator<Item = T>,
52 T: Send + 'static,
53{
54 fn run(&mut self) -> Option<T> {
55 self.iterator.next()
56 }
57}
58
59pub struct SinkVec<T> {
63 data: Vec<T>,
64}
65impl<T> SinkVec<T>
66where
67 T: Send + 'static,
68{
69 pub fn build() -> impl In<T, Vec<T>> {
93 Self { data: Vec::new() }
94 }
95}
96impl<T> In<T, Vec<T>> for SinkVec<T>
97where
98 T: Send + 'static,
99{
100 fn run(&mut self, input: T) {
101 self.data.push(input);
102 }
103 fn finalize(self) -> Option<Vec<T>> {
104 Some(self.data)
105 }
106}
107
108#[derive(Clone)]
113pub struct Splitter<T> {
114 chunk_size: usize,
115 n_replicas: usize,
116 data: VecDeque<T>,
117}
118impl<T> Splitter<T>
119where
120 T: Send + 'static + Clone,
121{
122 pub fn build(chunk_size: usize) -> impl InOut<Vec<T>, Vec<T>> {
145 Self {
146 chunk_size,
147 n_replicas: 1,
148 data: VecDeque::new(),
149 }
150 }
151
152 pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut<Vec<T>, Vec<T>> {
158 Self {
159 chunk_size,
160 n_replicas,
161 data: VecDeque::new(),
162 }
163 }
164}
165impl<T> InOut<Vec<T>, Vec<T>> for Splitter<T>
166where
167 T: Send + 'static + Clone,
168{
169 fn run(&mut self, input: Vec<T>) -> Option<Vec<T>> {
170 self.data.extend(input);
171 None
172 }
173 fn number_of_replicas(&self) -> usize {
174 self.n_replicas
175 }
176 fn is_producer(&self) -> bool {
177 true
178 }
179 fn produce(&mut self) -> Option<Vec<T>> {
180 if self.data.len() >= self.chunk_size {
181 let mut chunk = Vec::new();
182 for _i in 0..self.chunk_size {
183 chunk.push(self.data.pop_front().unwrap())
184 }
185 Some(chunk)
186 } else {
187 None
188 }
189 }
190}
191
192#[derive(Clone)]
197pub struct Aggregator<T> {
198 chunk_size: usize,
199 n_replicas: usize,
200 data: VecDeque<T>,
201}
202impl<T> Aggregator<T>
203where
204 T: Send + 'static + Clone,
205{
206 pub fn build(chunk_size: usize) -> impl InOut<T, Vec<T>> {
227 Self {
228 chunk_size,
229 n_replicas: 1,
230 data: VecDeque::new(),
231 }
232 }
233
234 pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut<T, Vec<T>> {
240 Self {
241 chunk_size,
242 n_replicas,
243 data: VecDeque::new(),
244 }
245 }
246}
247impl<T> InOut<T, Vec<T>> for Aggregator<T>
248where
249 T: Send + 'static + Clone,
250{
251 fn run(&mut self, input: T) -> Option<Vec<T>> {
252 self.data.push_back(input);
253 None
254 }
255 fn number_of_replicas(&self) -> usize {
256 self.n_replicas
257 }
258 fn is_producer(&self) -> bool {
259 true
260 }
261 fn produce(&mut self) -> Option<Vec<T>> {
262 if self.data.len() >= self.chunk_size {
263 let mut chunk = Vec::new();
264 for _i in 0..self.chunk_size {
265 chunk.push(self.data.pop_front().unwrap())
266 }
267 Some(chunk)
268 } else {
269 None
270 }
271 }
272}
273
274#[derive(Clone)]
278pub struct Sequential<T, U, F>
279where
280 T: Send + 'static,
281 U: Send + 'static,
282 F: FnMut(T) -> U + Send + 'static,
283{
284 f: F,
285 phantom: PhantomData<T>,
286}
287impl<T, U, F> Sequential<T, U, F>
288where
289 T: Send + 'static + Clone,
290 U: Send + 'static + Clone,
291 F: FnMut(T) -> U + Send + 'static + Clone,
292{
293 pub fn build(f: F) -> impl InOut<T, U> {
299 Self {
300 f,
301 phantom: PhantomData,
302 }
303 }
304}
305impl<T, U, F> InOut<T, U> for Sequential<T, U, F>
306where
307 T: Send + 'static + Clone,
308 U: Send + 'static + Clone,
309 F: FnMut(T) -> U + Send + 'static + Clone,
310{
311 fn run(&mut self, input: T) -> Option<U> {
312 Some((self.f)(input))
313 }
314}
315
316#[derive(Clone)]
320pub struct Parallel<T, U, F>
321where
322 T: Send + 'static,
323 U: Send + 'static,
324 F: FnMut(T) -> U + Send + 'static,
325{
326 n_replicas: usize,
327 f: F,
328 phantom: PhantomData<T>,
329}
330impl<T, U, F> Parallel<T, U, F>
331where
332 T: Send + 'static + Clone,
333 U: Send + 'static + Clone,
334 F: FnMut(T) -> U + Send + 'static + Clone,
335{
336 pub fn build(n_replicas: usize, f: F) -> impl InOut<T, U> {
343 Self {
344 n_replicas,
345 f,
346 phantom: PhantomData,
347 }
348 }
349}
350impl<T, U, F> InOut<T, U> for Parallel<T, U, F>
351where
352 T: Send + 'static + Clone,
353 U: Send + 'static + Clone,
354 F: FnMut(T) -> U + Send + 'static + Clone,
355{
356 fn run(&mut self, input: T) -> Option<U> {
357 Some((self.f)(input))
358 }
359 fn number_of_replicas(&self) -> usize {
360 self.n_replicas
361 }
362}
363
364#[derive(Clone)]
368pub struct Filter<T, F>
369where
370 T: Send + 'static,
371 F: FnMut(&T) -> bool + Send + 'static,
372{
373 f: F,
374 n_replicas: usize,
375 phantom: PhantomData<T>,
376}
377impl<T, F> Filter<T, F>
378where
379 T: Send + 'static + Clone,
380 F: FnMut(&T) -> bool + Send + 'static + Clone,
381{
382 pub fn build(f: F) -> impl InOut<T, T> {
404 Self {
405 f,
406 n_replicas: 1,
407 phantom: PhantomData,
408 }
409 }
410 pub fn build_with_replicas(n_replicas: usize, f: F) -> impl InOut<T, T> {
417 Self {
418 f,
419 n_replicas,
420 phantom: PhantomData,
421 }
422 }
423}
424impl<T, F> InOut<T, T> for Filter<T, F>
425where
426 T: Send + 'static + Clone,
427 F: FnMut(&T) -> bool + Send + 'static + Clone,
428{
429 fn run(&mut self, input: T) -> Option<T> {
430 if (self.f)(&input) {
431 Some(input)
432 } else {
433 None
434 }
435 }
436 fn number_of_replicas(&self) -> usize {
437 self.n_replicas
438 }
439}
440
441pub struct OrderedSinkVec<T> {
450 data: Vec<T>,
451}
452impl<T> OrderedSinkVec<T>
453where
454 T: Send + 'static,
455{
456 pub fn build() -> impl In<T, Vec<T>> {
460 Self { data: Vec::new() }
461 }
462}
463impl<T> In<T, Vec<T>> for OrderedSinkVec<T>
464where
465 T: Send + 'static,
466{
467 fn run(&mut self, input: T) {
468 self.data.push(input);
469 }
470 fn is_ordered(&self) -> bool {
471 true
472 }
473 fn finalize(self) -> Option<Vec<T>> {
474 Some(self.data)
475 }
476}
477
478#[derive(Clone)]
485pub struct OrderedSplitter<T> {
486 chunk_size: usize,
487 n_replicas: usize,
488 data: VecDeque<T>,
489}
490impl<T> OrderedSplitter<T>
491where
492 T: Send + 'static + Clone,
493{
494 pub fn build(chunk_size: usize) -> impl InOut<Vec<T>, Vec<T>> {
498 Self {
499 chunk_size,
500 n_replicas: 1,
501 data: VecDeque::new(),
502 }
503 }
504
505 pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut<Vec<T>, Vec<T>> {
510 Self {
511 chunk_size,
512 n_replicas,
513 data: VecDeque::new(),
514 }
515 }
516}
517impl<T> InOut<Vec<T>, Vec<T>> for OrderedSplitter<T>
518where
519 T: Send + 'static + Clone,
520{
521 fn run(&mut self, input: Vec<T>) -> Option<Vec<T>> {
522 self.data.extend(input);
523 None
524 }
525 fn number_of_replicas(&self) -> usize {
526 self.n_replicas
527 }
528 fn is_producer(&self) -> bool {
529 true
530 }
531 fn produce(&mut self) -> Option<Vec<T>> {
532 if self.data.len() >= self.chunk_size {
533 let mut chunk = Vec::new();
534 for _i in 0..self.chunk_size {
535 chunk.push(self.data.pop_front().unwrap())
536 }
537 Some(chunk)
538 } else {
539 None
540 }
541 }
542 fn is_ordered(&self) -> bool {
543 true
544 }
545}
546
547#[derive(Clone)]
554pub struct OrderedAggregator<T> {
555 chunk_size: usize,
556 n_replicas: usize,
557 data: VecDeque<T>,
558}
559impl<T> OrderedAggregator<T>
560where
561 T: Send + 'static + Clone,
562{
563 pub fn build(chunk_size: usize) -> impl InOut<T, Vec<T>> {
568 Self {
569 chunk_size,
570 n_replicas: 1,
571 data: VecDeque::new(),
572 }
573 }
574
575 pub fn build_with_replicas(n_replicas: usize, chunk_size: usize) -> impl InOut<T, Vec<T>> {
580 Self {
581 chunk_size,
582 n_replicas,
583 data: VecDeque::new(),
584 }
585 }
586}
587impl<T> InOut<T, Vec<T>> for OrderedAggregator<T>
588where
589 T: Send + 'static + Clone,
590{
591 fn run(&mut self, input: T) -> Option<Vec<T>> {
592 self.data.push_back(input);
593 None
594 }
595 fn number_of_replicas(&self) -> usize {
596 self.n_replicas
597 }
598 fn is_producer(&self) -> bool {
599 true
600 }
601 fn produce(&mut self) -> Option<Vec<T>> {
602 if self.data.len() >= self.chunk_size {
603 let mut chunk = Vec::new();
604 for _i in 0..self.chunk_size {
605 chunk.push(self.data.pop_front().unwrap())
606 }
607 Some(chunk)
608 } else {
609 None
610 }
611 }
612 fn is_ordered(&self) -> bool {
613 true
614 }
615}
616
617#[derive(Clone)]
623pub struct OrderedSequential<T, U, F> {
624 f: F,
625 phantom: PhantomData<(T, U)>,
626}
627impl<T, U, F> OrderedSequential<T, U, F>
628where
629 T: Send + 'static + Clone,
630 U: Send + 'static + Clone,
631 F: FnMut(T) -> U + Send + 'static + Clone,
632{
633 pub fn build(f: F) -> impl InOut<T, U> {
638 Self {
639 f,
640 phantom: PhantomData,
641 }
642 }
643}
644impl<T, U, F> InOut<T, U> for OrderedSequential<T, U, F>
645where
646 T: Send + 'static + Clone,
647 U: Send + 'static + Clone,
648 F: FnMut(T) -> U + Send + 'static + Clone,
649{
650 fn run(&mut self, input: T) -> Option<U> {
651 Some((self.f)(input))
652 }
653 fn is_ordered(&self) -> bool {
654 true
655 }
656}
657
658#[derive(Clone)]
664pub struct OrderedParallel<T, U, F> {
665 f: F,
666 n_replicas: usize,
667 phantom: PhantomData<(T, U)>,
668}
669impl<T, U, F> OrderedParallel<T, U, F>
670where
671 T: Send + 'static + Clone,
672 U: Send + 'static + Clone,
673 F: FnMut(T) -> U + Send + 'static + Clone,
674{
675 pub fn build(n_replicas: usize, f: F) -> impl InOut<T, U> {
681 Self {
682 f,
683 n_replicas,
684 phantom: PhantomData,
685 }
686 }
687}
688impl<T, U, F> InOut<T, U> for OrderedParallel<T, U, F>
689where
690 T: Send + 'static + Clone,
691 U: Send + 'static + Clone,
692 F: FnMut(T) -> U + Send + 'static + Clone,
693{
694 fn run(&mut self, input: T) -> Option<U> {
695 Some((self.f)(input))
696 }
697 fn is_ordered(&self) -> bool {
698 true
699 }
700 fn number_of_replicas(&self) -> usize {
701 self.n_replicas
702 }
703}
704
705#[derive(Clone)]
710pub struct OrderedFilter<T, F> {
711 f: F,
712 n_replicas: usize,
713 phantom: PhantomData<T>,
714}
715impl<T, F> OrderedFilter<T, F>
716where
717 T: Send + 'static + Clone,
718 F: FnMut(&T) -> bool + Send + 'static + Clone,
719{
720 pub fn build(f: F) -> impl InOut<T, T> {
725 Self {
726 f,
727 n_replicas: 1,
728 phantom: PhantomData,
729 }
730 }
731 pub fn build_with_replicas(n_replicas: usize, f: F) -> impl InOut<T, T> {
737 Self {
738 f,
739 n_replicas,
740 phantom: PhantomData,
741 }
742 }
743}
744impl<T, F> InOut<T, T> for OrderedFilter<T, F>
745where
746 T: Send + 'static + Clone,
747 F: FnMut(&T) -> bool + Send + 'static + Clone,
748{
749 fn run(&mut self, input: T) -> Option<T> {
750 if (self.f)(&input) {
751 Some(input)
752 } else {
753 None
754 }
755 }
756 fn is_ordered(&self) -> bool {
757 true
758 }
759 fn number_of_replicas(&self) -> usize {
760 self.n_replicas
761 }
762}
763
#[cfg(test)]
mod test {
    use crate::{
        prelude::*,
        templates::misc::{
            Filter, OrderedAggregator, OrderedFilter, OrderedParallel, OrderedSequential,
            OrderedSinkVec, OrderedSplitter, Parallel, Sequential, SinkVec, SourceIter,
        },
    };
    use serial_test::serial;

    use super::{Aggregator, Splitter};

    /// An identity stage between an iterator source and a vector sink must
    /// deliver all 20 items.
    #[test]
    #[serial]
    fn simple_pipeline() {
        let pipe = pipeline![
            SourceIter::build(1..21),
            Sequential::build(|el| el),
            SinkVec::build()
        ];

        let collected = pipe.start_and_wait_end().unwrap();

        assert_eq!(collected.len(), 20);
    }

    /// Same as `simple_pipeline`, but the ordered variants must also deliver
    /// the items in source order.
    #[test]
    #[serial]
    fn simple_pipeline_ordered() {
        let pipe = pipeline![
            SourceIter::build(1..21),
            OrderedSequential::build(|el| el),
            OrderedSinkVec::build()
        ];

        let collected = pipe.start_and_wait_end().unwrap();

        assert_eq!(collected.len(), 20);

        // The source emitted 1, 2, 3, ... so the sink must hold the same sequence.
        for (el, expected) in collected.into_iter().zip(1..) {
            assert_eq!(el, expected);
        }
    }

    /// An identity stage with 8 replicas must still deliver all 20 items.
    #[test]
    #[serial]
    fn simple_farm() {
        let pipe = pipeline![
            SourceIter::build(1..21),
            Parallel::build(8, |el| el),
            SinkVec::build()
        ];

        let collected = pipe.start_and_wait_end().unwrap();

        assert_eq!(collected.len(), 20);
    }

    /// An ordered identity stage with 8 replicas must deliver all 20 items in
    /// source order.
    #[test]
    #[serial]
    fn simple_farm_ordered() {
        let pipe = pipeline![
            SourceIter::build(1..21),
            OrderedParallel::build(8, |el| el),
            OrderedSinkVec::build()
        ];

        let collected = pipe.start_and_wait_end().unwrap();

        assert_eq!(collected.len(), 20);

        for (el, expected) in collected.into_iter().zip(1..) {
            assert_eq!(el, expected);
        }
    }

    /// 1000 vectors of 20 elements are split into pairs and then regrouped
    /// into one vector holding all 20000 elements.
    #[test]
    #[serial]
    fn splitter() {
        // Vector `i` holds the pairs (i, 1), (i, 2), ..., (i, 20).
        let set = (0..1000)
            .map(|i| {
                (1..=20)
                    .map(|position| (i, position))
                    .collect::<Vec<_>>()
            })
            .collect::<Vec<_>>();

        let pipe = pipeline![
            SourceIter::build(set.into_iter()),
            Splitter::build_with_replicas(2, 2),
            Splitter::build(20000),
            SinkVec::build()
        ];

        let mut chunks = pipe.start_and_wait_end().unwrap();

        assert_eq!(chunks.len(), 1);

        let only_chunk = chunks.pop().unwrap();

        assert_eq!(only_chunk.len(), 20000);
    }

    /// Ordered variant of `splitter`: the single regrouped vector must hold
    /// 1..=20000 in ascending order.
    #[test]
    #[serial]
    fn splitter_ordered() {
        // Vector `i` holds the 20 consecutive integers starting at 20 * i + 1,
        // so together the vectors cover 1..=20000.
        let set = (0..1000)
            .map(|i| (i * 20 + 1..=i * 20 + 20).collect::<Vec<_>>())
            .collect::<Vec<_>>();

        let pipe = pipeline![
            SourceIter::build(set.into_iter()),
            OrderedSplitter::build_with_replicas(2, 10),
            OrderedSplitter::build_with_replicas(4, 1),
            OrderedSplitter::build(20000),
            OrderedSinkVec::build()
        ];

        let mut chunks = pipe.start_and_wait_end().unwrap();

        assert_eq!(chunks.len(), 1);

        let only_chunk = chunks.pop().unwrap();

        assert_eq!(only_chunk.len(), 20000);

        for (el, expected) in only_chunk.into_iter().zip(1..) {
            assert_eq!(el, expected);
        }
    }

    /// 2000 items grouped by 100 must arrive as 20 vectors of 100 items each.
    #[test]
    #[serial]
    fn aggregator() {
        let pipe = pipeline![
            SourceIter::build(0..2000),
            Aggregator::build(100),
            SinkVec::build()
        ];

        let chunks = pipe.start_and_wait_end().unwrap();

        assert_eq!(chunks.len(), 20);

        for chunk in &chunks {
            assert_eq!(chunk.len(), 100);
        }
    }

    /// 2000 items grouped by 100 on 4 replicas and then split back into
    /// single-element vectors must arrive as 2000 vectors of length 1.
    #[test]
    #[serial]
    fn aggregator_ordered() {
        let pipe = pipeline![
            SourceIter::build(0..2000),
            OrderedAggregator::build_with_replicas(4, 100),
            OrderedSplitter::build(1),
            OrderedSinkVec::build()
        ];

        let chunks = pipe.start_and_wait_end().unwrap();

        assert_eq!(chunks.len(), 2000);

        for chunk in &chunks {
            assert_eq!(chunk.len(), 1);
        }
    }

    /// Keeping the even numbers of 0..200 must leave 100 items.
    #[test]
    #[serial]
    fn filter() {
        let pipe = pipeline![
            SourceIter::build(0..200),
            Filter::build(|el| el % 2 == 0),
            SinkVec::build()
        ];

        let kept = pipe.start_and_wait_end().unwrap();

        assert_eq!(kept.len(), 100);
    }

    /// Ordered variant of `filter` on 4 replicas: the 100 surviving items must
    /// be 0, 2, 4, ... in that order.
    #[test]
    #[serial]
    fn filter_ordered() {
        let pipe = pipeline![
            SourceIter::build(0..200),
            OrderedFilter::build_with_replicas(4, |el| el % 2 == 0),
            OrderedSinkVec::build()
        ];

        let kept = pipe.start_and_wait_end().unwrap();

        assert_eq!(kept.len(), 100);

        for (el, expected) in kept.into_iter().zip((0..).step_by(2)) {
            assert_eq!(el, expected);
        }
    }
}