batch-processing 0.1.17

A batch library for processing a list of items in parallel
Documentation
#[cfg(all(feature = "async", test))]
mod job_test {
    use std::pin::Pin;
    use futures::{Stream, stream};
    use batch_processing::tokio::job::job_builder::{AsyncJobBuilder, AsyncJobBuilderTrait};
    use batch_processing::tokio::step::{AsyncStepRunner, AsyncStep};
    use batch_processing::tokio::step::complex_step::{AsyncComplexStepBuilder, ComplexStepBuilderTrait};
    use batch_processing::tokio::step::simple_step::{AsyncSimpleStepBuilder, AsyncSimpleStepBuilderTrait};
    use batch_processing::tokio::step::step_builder::AsyncStepBuilderTrait;

    #[tokio::test]
    async fn job_simple_step() {
        fn generate_step(step_count: i8) -> AsyncStep {
            return AsyncSimpleStepBuilder::get(String::from(format!("step{}", step_count)))
                .tasklet(Box::new(move || {
                    return Box::pin(async move {
                        println!("{}", format!("Step {}", step_count));
                    });
                }))
                .build();
        }

        let mut job_builder = AsyncJobBuilder::get(String::from("sync-data"))
            .step(generate_step(1))
            .step(generate_step(2));

        for i in 3..=10 {
            job_builder = job_builder.step(generate_step(i));
        }

        let job = job_builder.build();
        let job_status = job.run().await;
        println!("{:?}", job_status);
    }


    #[tokio::test]
    async fn job_complex_step() {
        fn generate_step(
            step_count: i32
        ) -> AsyncStep {
            let step_builder: AsyncComplexStepBuilder<i32, i32> = AsyncComplexStepBuilder::get("test".to_string())
                .reader(Box::new(move ||
                    {
                        let step_count = step_count;
                        let mut vec: Vec<i32> = Vec::new();

                        for n in 0..=step_count {
                            vec.push(n);
                        }

                        let stream: Pin<Box<dyn Stream<Item=i32> + Send>> = Box::pin(Box::new(stream::iter(vec)));
                        Box::pin(async move {
                                stream
                            }
                        )
                    }))
                .processor(
                    Box::new(
                        move |item: i32| Box::pin(
                            async move {
                                item * 2
                            }
                        )
                    )
                )
                .writer(
                    Box::new(
                        move |items: Vec<i32>| Box::pin(
                            async move {
                                println!("{:?}", items);
                            }
                        )
                    )
                );

            return step_builder.build();
        }

        let mut job_builder = AsyncJobBuilder::get(String::from("job_complex_step"))
            .multi_tasks(4);

        for i in 1..=4 {
            job_builder = job_builder.step(generate_step(i));
        }

        let job = job_builder.build();

        job.run().await;
    }
}