#[cfg(all(feature = "async", test))]
mod async_simple_step_test {
use std::sync::Arc;
use batch_processing::tokio::step::{AsyncStepRunner, AsyncStep, simple_step};
use batch_processing::tokio::step::simple_step::{AsyncSimpleStepBuilder, AsyncSimpleStepBuilderTrait};
use batch_processing::tokio::step::step_builder::AsyncStepBuilderTrait;
use tokio::join;
use tokio::sync::Mutex;
#[tokio::test]
async fn test_simple_step() {
let vector = Arc::new(Mutex::new(vec![1, 2, 3]));
let shared_vector = Arc::clone(&vector);
let step = AsyncSimpleStepBuilder::get("test".to_string())
.tasklet(Box::new(
move || {
let vector = shared_vector.clone();
return Box::pin(async move {
let mut vector = vector.lock().await;
vector.push(4);
println!("{}", format!("Step {}", String::new()));
});
}
)).build();
step.run().await;
let vector = Arc::clone(&vector);
let shared_vector = vector.lock().await;
println!("{:?}", shared_vector);
}
#[tokio::test]
async fn test_simple_step_with_decider() {
fn generate_step_with_sleep(name: String, millis_time: u64) -> AsyncStep {
return simple_step::get(name.clone())
.tasklet(Box::new(
move || {
let name = name.clone();
return Box::pin(async move {
tokio::time::sleep(std::time::Duration::from_millis(millis_time)).await;
println!("{}", format!("Step {}", name));
});
}
))
.decider(Box::new(
move || {
return Box::pin(async move {
return true;
});
}
))
.build();
}
let step1 = generate_step_with_sleep("step1".to_string(), 100);
let step2 = generate_step_with_sleep("step2".to_string(), 200);
let (step1, step2) = join!(step1.run(), step2.run());
assert!(step1.status.is_ok(), "The step 1 should be successful");
assert!(step2.status.is_ok(), "The step 2 should be successful");
}
}