batch_processing/sync/job/
mod.rs1use std::sync::{Arc, Mutex};
2use std::thread::{JoinHandle, spawn};
3
4use log::{error, info};
5
6use crate::core::job::{now_time, JobStatus};
7use crate::core::step::StepStatus;
8use crate::sync::step::{Decider, Runner, SyncStep};
9
10pub mod job_builder;
11
12pub struct Job {
14 pub name: String,
16 pub start_time: Option<u64>,
18 pub end_time: Option<u64>,
20 pub steps: Vec<SyncStep>,
22 pub multi_threaded: Option<bool>,
24 pub max_threads: Option<usize>,
26}
27
28impl Runner for Job {
29 type Output = JobStatus;
31
32 fn run(self) -> Self::Output {
34 let start_time = now_time();
35 let multi_threaded = self.multi_threaded.unwrap_or(false);
36 let steps = self.steps;
37 if multi_threaded {
38 info!("Running job {} with multi-threaded mode", self.name)
39 } else {
40 info!("Running job {} with single-threaded mode", self.name)
41 }
42
43 fn log_step(result: Result<String, String>) {
44 match result {
45 Ok(success_message) => {
46 info!("{}", success_message);
47 }
48 Err(error_message) => {
49 error!("{}", error_message);
50 }
51 }
52 }
53
54 return if !multi_threaded {
55 let mut steps_status_vec: Vec<StepStatus> = Vec::new();
56 for step in steps {
57 if step.is_run().clone() {
58 info!("Step {} is skipped", &step.name);
59 continue;
60 }
61
62 let throw_tolerant = step.throw_tolerant.unwrap_or(false).clone();
63 let step_name = step.name.clone();
64
65 info!("Running step {}", &step_name);
66
67 let step_result = step.run();
68 steps_status_vec.push(step_result.clone());
69
70 match step_result.status {
71 Ok(success_message) => {
72 info!("{}", success_message);
73 }
74 Err(error_message) => {
75 if throw_tolerant {
76 error!("Error occurred in step {} but it is throw tolerant", &step_name);
77 panic!("{}", error_message);
78 } else {
79 error!("{}", error_message);
80 }
81 }
82 }
83 }
84
85 let name = format!("Job {} completed", self.name);
86 info!("{}", name);
87 let end_time = now_time();
88 JobStatus {
89 name: self.name,
90 status: Ok(name),
91 end_time: Some(end_time),
92 start_time: Some(start_time),
93 steps_status: steps_status_vec,
94 }
95 } else {
96 let max_threads = self.max_threads.unwrap_or(1);
97 let threads: Arc<Mutex<Vec<JoinHandle<StepStatus>>>> = Arc::new(Mutex::new(Vec::new()));
98 let mut steps_status_vec: Vec<StepStatus> = Vec::new();
99
100 for step in steps {
101 let threads = Arc::clone(&threads);
102 {
103 let mut threads = threads.lock().unwrap();
104 threads.push(spawn(move || {
105 step.run()
106 }));
107 }
108 {
109 let mut threads = threads.lock().unwrap();
110 let threads_len = threads.len().clone();
111
112 if threads_len >= max_threads {
113 while let Some(join_handler) = threads.pop() {
114 let step_result = join_handler.join().unwrap();
115
116 log_step(step_result.status.clone());
117
118 steps_status_vec.push(step_result);
119 }
120 }
121 }
122 }
123
124 let threads = Arc::clone(&threads);
125 let mut threads = threads.lock().unwrap();
126
127 if !threads.is_empty() {
128 while let Some(join_handler) = threads.pop() {
129 let step_result = join_handler.join().unwrap();
130
131 log_step(step_result.status.clone());
132
133 steps_status_vec.push(step_result);
134 }
135 }
136
137 JobStatus {
138 name: self.name.clone(),
139 status: Ok(format!("Job {} completed", self.name)),
140 start_time: Some(start_time),
141 end_time: Some(now_time()),
142 steps_status: steps_status_vec,
143 }
144 };
145 }
146}
147
148unsafe impl Send for Job {}