batch-processing 0.1.17

A batch library for processing a list of items in parallel
Documentation
#[cfg(all(feature = "async", test))]
mod async_complex_step_test {
    use batch_processing::tokio::step::complex_step::{AsyncComplexStepBuilder, ComplexStepBuilderTrait};
    use batch_processing::tokio::step::step_builder::AsyncStepBuilderTrait;
    use batch_processing::tokio::step::AsyncStepRunner;
    use futures::{stream, Stream};
    use std::pin::Pin;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    #[tokio::test]
    async fn test_build() {
        let step_builder: AsyncComplexStepBuilder<String, String> = AsyncComplexStepBuilder::get("test".to_string())
            .reader(Box::new(move ||
            {
                return Box::pin(async move {
                    let stream: Pin<Box<dyn Stream<Item=String> + Send>> =
                        Box::pin(stream::iter(vec![String::new()]));
                    stream
                }
                );
            }))
            .processor(
                Box::new(
                    move |item: String| Box::pin(
                        async move {
                            item.to_uppercase()
                        }
                    )
                )
            )
            .writer(
                Box::new(
                    move |items: Vec<String>| Box::pin(
                        async move {
                            println!("{:?}", items);
                        }
                    )
                )
            );

        let step = step_builder.build();

        step.run().await;
    }

    #[tokio::test]
    async fn test_fault_tolerant_is_false() {
        let vec = Arc::new(Mutex::new(Vec::new()));
        let vec_write = vec.clone();
        let step_builder: AsyncComplexStepBuilder<String, String> = AsyncComplexStepBuilder::get("test".to_string())
            .chunk_size(1)
            .reader(Box::new(move ||
                {
                    return Box::pin(async move {
                        let stream: Pin<Box<dyn Stream<Item=String> + Send>> =
                            Box::pin(stream::iter(vec!["test-failed".to_string(), "test".to_string()]));
                        stream
                    }
                    );
                }))
            .processor(
                Box::new(
                    move |item: String| Box::pin(
                        async move {
                            if item == "test-failed".to_string() {
                                panic!("test failed");
                            }
                            return item.to_uppercase();
                        }
                    )
                )
            )
            .writer(
                Box::new(
                    move |items: Vec<String>| {
                        let vec_write = vec_write.clone();
                        Box::pin(
                            async move {
                                let mut vec = vec_write.lock().await;
                                for item in items {
                                    vec.push(item);
                                }
                            }
                        )
                    }
                )
            );

        let step = step_builder.build();
        let step_result = step.run().await;
        let vec = vec.lock().await;
        assert_eq!(step_result.status.is_err(), true);
        assert_eq!(vec.len(), 0);
    }

    #[tokio::test]
    async fn test_fault_tolerant_is_true() {
        let vec = Arc::new(Mutex::new(Vec::new()));
        let vec_write = vec.clone();
        let step_builder: AsyncComplexStepBuilder<String, String> = AsyncComplexStepBuilder::get("test".to_string())
            .chunk_size(1)
            .throw_tolerant()
            .reader(Box::new(move ||
                {
                    return Box::pin(async move {
                        let stream: Pin<Box<dyn Stream<Item=String> + Send>> =
                            Box::pin(stream::iter(vec!["test-failed".to_string(), "test".to_string()]));
                        stream
                    }
                    );
                }))
            .processor(
                Box::new(
                    move |item: String| Box::pin(
                        async move {
                            if item == "test-failed".to_string() {
                                panic!("test failed");
                            }
                            return item.to_uppercase();
                        }
                    )
                )
            )
            .writer(
                Box::new(
                    move |items: Vec<String>| {
                        let vec_write = vec_write.clone();
                        Box::pin(
                            async move {
                                let mut vec = vec_write.lock().await;
                                for item in items {
                                    vec.push(item);
                                }
                            }
                        )
                    }
                )
            );

        let step = step_builder.build();
        let step_result = step.run().await;
        let vec = vec.lock().await;
        assert_eq!(step_result.status.is_err(), true);
        assert_eq!(vec.len(), 1);
    }
}