batch_processing/sync/step/
mod.rs

1use std::thread;
2use log::info;
3use crate::core::job::now_time;
4use crate::core::step::{mount_step_status, StepStatus, throw_tolerant_exception};
5
6pub mod complex_step;
7pub mod simple_step;
8pub mod step_builder;
9
/// Something that can be executed to produce a result.
///
/// `run` takes `self` by value, so an implementor is consumed by running it.
/// The `Sized` supertrait restricts the trait to sized implementors.
pub trait Runner: Sized {
    /// The value produced by a run.
    type Output;

    /// Consumes the implementor, executes it, and returns the output.
    fn run(self) -> Self::Output;
}
21
/// A trait for objects that can decide whether they should be executed.
///
/// The decision is queried through a shared reference, so asking does not
/// consume the implementor.
pub trait Decider {
    /// Returns `true` if the object should be executed, `false` otherwise.
    fn is_run(&self) -> bool;
}
27
/// Boxed one-shot closure executed as the body of a step.
///
/// The `Send` bound allows the closure to be moved onto another thread.
pub type StepCallback = Box<dyn FnOnce() + Send>;

/// Boxed predicate consulted to decide whether a step should run.
///
/// NOTE(review): unlike [`StepCallback`], this alias carries no `Send` bound.
pub type DeciderCallback = Box<dyn Fn() -> bool>;

/// Represents a synchronous step in a job.
pub struct SyncStep {
    /// The start time of the step execution.
    // Not read anywhere in this file, hence the lint allowance.
    #[allow(dead_code)]
    pub(crate) start_time: Option<u64>,
    /// The end time of the step execution.
    // Not read anywhere in this file, hence the lint allowance.
    #[allow(dead_code)]
    pub(crate) end_time: Option<u64>,
    /// The name of the step; it appears in the step's log messages.
    pub name: String,
    /// Indicates whether the step is tolerant to thrown exceptions.
    ///
    /// `None` is treated as `false` when the step runs without a callback.
    pub throw_tolerant: Option<bool>,
    /// The decider callback for the step. `None` means the step always runs.
    pub(crate) decider: Option<DeciderCallback>,
    /// The callback function to be executed as the step.
    pub(crate) callback: Option<StepCallback>,
}
49
50impl Runner for SyncStep {
51    /// The output type of the step execution.
52    type Output = StepStatus;
53
54    /// Executes the step and returns its status.
55    fn run(self) -> Self::Output {
56        return match self.callback {
57            None => {
58                throw_tolerant_exception(self.throw_tolerant.unwrap_or(false), self.name)
59            }
60            Some(callback) => {
61                info!("Step {} is running", self.name);
62                let task = thread::spawn(move || {
63                    callback();
64                });
65                let task_result = task.join();
66
67                let start_time = now_time();
68                return match task_result {
69                    Ok(_) => {
70                        let message = format!("Step {} executed successfully", self.name);
71                        info!("{}", message);
72                        mount_step_status(self.name, Ok(message), start_time)
73                    }
74                    Err(_) => {
75                        let message = format!("Step {} failed to execute", self.name);
76                        info!("{}", message);
77                        mount_step_status(self.name, Err(message), start_time)
78                    }
79                };
80            }
81        };
82    }
83}
84
85impl Decider for SyncStep {
86    /// Checks if the step should be executed based on the decider callback.
87    fn is_run(&self) -> bool {
88        return match &self.decider {
89            None => true,
90            Some(decider) => decider(),
91        };
92    }
93}
94
// Marks `SyncStep` as sendable between threads.
//
// SAFETY / NOTE(review): the `decider` field is a `DeciderCallback`
// (`Box<dyn Fn() -> bool>`), which has no `Send` bound, so the compiler does
// not derive `Send` for `SyncStep` on its own. This impl asserts it manually
// and is only sound if every decider stored in a step is itself safe to move
// to another thread — TODO confirm at the construction sites, or add `+ Send`
// to `DeciderCallback` so that this `unsafe impl` becomes unnecessary.
unsafe impl Send for SyncStep {}