use async_trait::async_trait;
use futures::future::BoxFuture;
use log::info;
use tokio::task::JoinError;
use crate::core::job::now_time;
use crate::core::step::{mount_step_status, StepStatus, throw_tolerant_exception};
pub mod simple_step;
pub mod step_builder;
pub mod complex_step;
pub mod parallel_step_builder;
/// A one-shot asynchronous runner.
///
/// `R` is the type of value the run resolves to. Implementors must be `Sized`
/// and `Send`; running takes ownership of the implementor.
#[async_trait]
pub trait AsyncStepRunner<R>: Sized + Send {
    /// Consumes the runner and resolves to its result.
    async fn run(self) -> R;
}
/// An asynchronous yes/no predicate.
///
/// NOTE(review): presumably consulted before a step is run to decide whether it
/// should execute at all; confirm against the callers of `decide`.
#[async_trait]
pub trait Decider {
/// Resolves to the decision as a `bool`.
async fn decide(&self) -> bool;
}
/// Unsized, thread-safe, zero-argument callable that produces a boxed `'static`
/// future resolving to `O`. Meant to be used behind a pointer such as `Box`.
pub type DynAsyncCallback<O> = dyn Fn() -> BoxFuture<'static, O> + Send + Sync;
/// Boxed asynchronous predicate: a [`DynAsyncCallback`] whose future resolves to `bool`.
pub type DeciderCallback = Box<DynAsyncCallback<bool>>;
/// Outcome of a step callback: `Ok(())` on success, or the `JoinError` it reported.
type StepResult = Result<(), JoinError>;
/// A named unit of asynchronous work made of an optional guard (`decider`) and
/// an optional body (`callback`).
pub struct AsyncStep {
/// Step name; interpolated into the log and status messages produced by `run`.
pub name: String,
/// Forwarded to `throw_tolerant_exception` (as `false` when `None`) if the step
/// is run without a callback.
pub throw_tolerant: Option<bool>,
/// Optional predicate awaited by `decide`; when absent, `decide` resolves to `true`.
decider: Option<DeciderCallback>,
/// Optional body of the step; `run` invokes it inside a spawned Tokio task.
callback: Option<Box<DynAsyncCallback<StepResult>>>,
}
#[async_trait]
impl AsyncStepRunner<StepStatus> for AsyncStep {
async fn run(self) -> StepStatus {
return match self.callback {
None => {
throw_tolerant_exception(self.throw_tolerant.unwrap_or(false), self.name)
}
Some(callback) => {
let start_time = now_time();
info!("Step {} is running", self.name);
let callback_result = tokio::spawn(async move {
return callback().await;
}).await;
return match callback_result {
Ok(step_result) => {
if let Err(error) = step_result {
let message = format!("Step {} completed with error: {}", self.name, error.to_string());
info!("{}", message);
return mount_step_status(self.name, Err(message), start_time)
}
let message = format!("Step {} executed successfully", self.name);
info!("{}", message);
mount_step_status(self.name, Ok(message), start_time)
}
Err(error) => {
let message = format!("Step {} failed to execute: {}", self.name, error.to_string());
info!("{}", message);
mount_step_status(self.name, Err(message), start_time)
},
};
}
};
}
}
#[async_trait]
impl Decider for AsyncStep {
    /// Awaits the configured decider and returns its answer.
    ///
    /// A step without a decider always resolves to `true`.
    async fn decide(&self) -> bool {
        if let Some(decider) = self.decider.as_ref() {
            decider().await
        } else {
            true
        }
    }
}
// `AsyncStep` needs no hand-written `unsafe impl Send`: every field (`String`,
// `Option<bool>` and the boxed `dyn Send + Sync` callbacks) is already `Send`, so
// the compiler provides the auto trait on its own. This compile-time check keeps
// the guarantee explicit and fails the build if a later field makes the type
// non-`Send`, which an `unsafe impl` would silently paper over.
const _: fn() = || {
    fn assert_send<T: Send>() {}
    assert_send::<AsyncStep>();
};