batch_processing/sync/step/
mod.rs1use std::thread;
2use log::info;
3use crate::core::job::now_time;
4use crate::core::step::{mount_step_status, StepStatus, throw_tolerant_exception};
5
6pub mod complex_step;
7pub mod simple_step;
8pub mod step_builder;
9
/// A unit of work that can be executed exactly once.
///
/// `run` takes `self` by value, so a runner is consumed by its own execution
/// and cannot be started a second time. The `Sized` bound is what allows the
/// by-value receiver; it also means the trait cannot be used as `dyn Runner`.
pub trait Runner: Sized {
    /// The value produced by a finished run.
    type Output;

    /// Consumes the runner, executes it, and returns its output.
    fn run(self) -> Self::Output;
}
21
/// Decides whether something should be executed.
pub trait Decider {
    /// Returns `true` when the implementor should be run, `false` when it
    /// should be skipped.
    #[must_use]
    fn is_run(&self) -> bool;
}
27
/// Boxed body of a step.
///
/// It is `FnOnce`, so it can be invoked at most once, and `Send`, so it can be
/// moved onto another thread before being called.
pub type StepCallback = Box<dyn FnOnce() + Send>;

/// Boxed predicate that reports whether a step should run.
///
/// Note that, unlike [`StepCallback`], this alias carries no `Send` bound.
pub type DeciderCallback = Box<dyn Fn() -> bool>;

/// A named step made of an optional body and an optional run/skip predicate.
pub struct SyncStep {
    /// Start timestamp slot.
    // Not read anywhere in this module, hence the lint suppression.
    // NOTE(review): unit/epoch is not visible from here - confirm against `now_time`.
    #[allow(dead_code)]
    pub(crate) start_time: Option<u64>,
    /// End timestamp slot.
    // Not read anywhere in this module, hence the lint suppression.
    #[allow(dead_code)]
    pub(crate) end_time: Option<u64>,
    /// Step name, used in log and status messages.
    pub name: String,
    /// Tolerance flag forwarded to `throw_tolerant_exception` when the step
    /// has no callback; `None` is treated as `false` there.
    pub throw_tolerant: Option<bool>,
    /// Predicate consulted by `Decider::is_run`; `None` means "always run".
    pub(crate) decider: Option<DeciderCallback>,
    /// The work this step performs; `None` means the step has no body.
    pub(crate) callback: Option<StepCallback>,
}
49
50impl Runner for SyncStep {
51 type Output = StepStatus;
53
54 fn run(self) -> Self::Output {
56 return match self.callback {
57 None => {
58 throw_tolerant_exception(self.throw_tolerant.unwrap_or(false), self.name)
59 }
60 Some(callback) => {
61 info!("Step {} is running", self.name);
62 let task = thread::spawn(move || {
63 callback();
64 });
65 let task_result = task.join();
66
67 let start_time = now_time();
68 return match task_result {
69 Ok(_) => {
70 let message = format!("Step {} executed successfully", self.name);
71 info!("{}", message);
72 mount_step_status(self.name, Ok(message), start_time)
73 }
74 Err(_) => {
75 let message = format!("Step {} failed to execute", self.name);
76 info!("{}", message);
77 mount_step_status(self.name, Err(message), start_time)
78 }
79 };
80 }
81 };
82 }
83}
84
85impl Decider for SyncStep {
86 fn is_run(&self) -> bool {
88 return match &self.decider {
89 None => true,
90 Some(decider) => decider(),
91 };
92 }
93}
94
95unsafe impl Send for SyncStep {}