batch_processing/sync/job/
mod.rs

1use std::sync::{Arc, Mutex};
2use std::thread::{JoinHandle, spawn};
3
4use log::{error, info};
5
6use crate::core::job::{now_time, JobStatus};
7use crate::core::step::StepStatus;
8use crate::sync::step::{Decider, Runner, SyncStep};
9
10pub mod job_builder;
11
12/// Represents a synchronous job.
13pub struct Job {
14    /// The name of the job.
15    pub name: String,
16    /// The start time of the job execution.
17    pub start_time: Option<u64>,
18    /// The end time of the job execution.
19    pub end_time: Option<u64>,
20    /// The list of steps to be executed in the job.
21    pub steps: Vec<SyncStep>,
22    /// Indicates whether the job is to be executed in multithreaded mode.
23    pub multi_threaded: Option<bool>,
24    /// The maximum number of threads allowed for multithreaded execution.
25    pub max_threads: Option<usize>,
26}
27
28impl Runner for Job {
29    /// The output type of the job execution.
30    type Output = JobStatus;
31
32    /// Executes the synchronous job and returns its status.
33    fn run(self) -> Self::Output {
34        let start_time = now_time();
35        let multi_threaded = self.multi_threaded.unwrap_or(false);
36        let steps = self.steps;
37        if multi_threaded {
38            info!("Running job {} with multi-threaded mode", self.name)
39        } else {
40            info!("Running job {} with single-threaded mode", self.name)
41        }
42
43        fn log_step(result: Result<String, String>) {
44            match result {
45                Ok(success_message) => {
46                    info!("{}", success_message);
47                }
48                Err(error_message) => {
49                    error!("{}", error_message);
50                }
51            }
52        }
53
54        return if !multi_threaded {
55            let mut steps_status_vec: Vec<StepStatus> = Vec::new();
56            for step in steps {
57                if step.is_run().clone() {
58                    info!("Step {} is skipped", &step.name);
59                    continue;
60                }
61
62                let throw_tolerant = step.throw_tolerant.unwrap_or(false).clone();
63                let step_name = step.name.clone();
64
65                info!("Running step {}", &step_name);
66
67                let step_result = step.run();
68                steps_status_vec.push(step_result.clone());
69
70                match step_result.status {
71                    Ok(success_message) => {
72                        info!("{}", success_message);
73                    }
74                    Err(error_message) => {
75                        if throw_tolerant {
76                            error!("Error occurred in step {} but it is throw tolerant", &step_name);
77                            panic!("{}", error_message);
78                        } else {
79                            error!("{}", error_message);
80                        }
81                    }
82                }
83            }
84
85            let name = format!("Job {} completed", self.name);
86            info!("{}", name);
87            let end_time = now_time();
88            JobStatus {
89                name: self.name,
90                status: Ok(name),
91                end_time: Some(end_time),
92                start_time: Some(start_time),
93                steps_status: steps_status_vec,
94            }
95        } else {
96            let max_threads = self.max_threads.unwrap_or(1);
97            let threads: Arc<Mutex<Vec<JoinHandle<StepStatus>>>> = Arc::new(Mutex::new(Vec::new()));
98            let mut steps_status_vec: Vec<StepStatus> = Vec::new();
99
100            for step in steps {
101                let threads = Arc::clone(&threads);
102                {
103                    let mut threads = threads.lock().unwrap();
104                    threads.push(spawn(move || {
105                        step.run()
106                    }));
107                }
108                {
109                    let mut threads = threads.lock().unwrap();
110                    let threads_len = threads.len().clone();
111
112                    if threads_len >= max_threads {
113                        while let Some(join_handler) = threads.pop() {
114                            let step_result = join_handler.join().unwrap();
115
116                            log_step(step_result.status.clone());
117
118                            steps_status_vec.push(step_result);
119                        }
120                    }
121                }
122            }
123
124            let threads = Arc::clone(&threads);
125            let mut threads = threads.lock().unwrap();
126
127            if !threads.is_empty() {
128                while let Some(join_handler) = threads.pop() {
129                    let step_result = join_handler.join().unwrap();
130
131                    log_step(step_result.status.clone());
132
133                    steps_status_vec.push(step_result);
134                }
135            }
136
137            JobStatus {
138                name: self.name.clone(),
139                status: Ok(format!("Job {} completed", self.name)),
140                start_time: Some(start_time),
141                end_time: Some(now_time()),
142                steps_status: steps_status_vec,
143            }
144        };
145    }
146}
147
148// Allows `Job` to be sent between threads safely.
149unsafe impl Send for Job {}