1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
use std::thread;
use log::info;
use crate::core::job::now_time;
use crate::core::step::{mount_step_status, StepStatus, throw_tolerant_exception};
pub mod complex_step;
pub mod simple_step;
pub mod step_builder;
/// A trait for objects that can be executed.
pub trait Runner
where
Self: Sized,
{
/// The type of output produced by the execution.
type Output;
/// Executes the object and returns the output.
fn run(self) -> Self::Output;
}
/// A trait for objects that can make decisions.
pub trait Decider {
/// Checks if the object should be executed.
fn is_run(&self) -> bool;
}
pub type StepCallback = Box<dyn FnOnce() + Send>;
pub type DeciderCallback = Box<dyn Fn() -> bool>;
/// Represents a synchronous step in a job.
pub struct SyncStep {
/// The start time of the step execution.
#[allow(dead_code)]
pub(crate) start_time: Option<u64>,
/// The end time of the step execution.
#[allow(dead_code)]
pub(crate) end_time: Option<u64>,
/// The name of the step.
pub name: String,
/// Indicates whether the step is tolerant to thrown exceptions.
pub throw_tolerant: Option<bool>,
/// The decider callback for the step.
pub(crate) decider: Option<DeciderCallback>,
/// The callback function to be executed as the step.
pub(crate) callback: Option<Box<dyn FnOnce() -> () + Send>>,
}
impl Runner for SyncStep {
/// The output type of the step execution.
type Output = StepStatus;
/// Executes the step and returns its status.
fn run(self) -> Self::Output {
return match self.callback {
None => {
throw_tolerant_exception(self.throw_tolerant.unwrap_or(false), self.name)
}
Some(callback) => {
info!("Step {} is running", self.name);
let task = thread::spawn(move || {
callback();
});
let task_result = task.join();
let start_time = now_time();
return match task_result {
Ok(_) => {
let message = format!("Step {} executed successfully", self.name);
info!("{}", message);
mount_step_status(self.name, Ok(message), start_time)
}
Err(_) => {
let message = format!("Step {} failed to execute", self.name);
info!("{}", message);
mount_step_status(self.name, Err(message), start_time)
}
};
}
};
}
}
impl Decider for SyncStep {
/// Checks if the step should be executed based on the decider callback.
fn is_run(&self) -> bool {
return match &self.decider {
None => true,
Some(decider) => decider(),
};
}
}
// Allows `SyncStep` to be sent between threads safely.
unsafe impl Send for SyncStep {}