1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
use std::sync::{Arc, Mutex};
use std::thread::{JoinHandle, spawn};

use log::{error, info};

use crate::core::job::{now_time, JobStatus};
use crate::core::step::StepStatus;
use crate::sync::step::{Decider, Runner, SyncStep};

pub mod job_builder;

/// Represents a synchronous job.
pub struct Job {
    /// The name of the job.
    pub name: String,
    /// The start time of the job execution.
    pub start_time: Option<u64>,
    /// The end time of the job execution.
    pub end_time: Option<u64>,
    /// The list of steps to be executed in the job.
    pub steps: Vec<SyncStep>,
    /// Indicates whether the job is to be executed in multithreaded mode.
    pub multi_threaded: Option<bool>,
    /// The maximum number of threads allowed for multithreaded execution.
    pub max_threads: Option<usize>,
}

impl Runner for Job {
    /// The output type of the job execution.
    type Output = JobStatus;

    /// Executes the synchronous job and returns its status.
    fn run(self) -> Self::Output {
        let start_time = now_time();
        let multi_threaded = self.multi_threaded.unwrap_or(false);
        let steps = self.steps;
        if multi_threaded {
            info!("Running job {} with multi-threaded mode", self.name)
        } else {
            info!("Running job {} with single-threaded mode", self.name)
        }

        fn log_step(result: Result<String, String>) {
            match result {
                Ok(success_message) => {
                    info!("{}", success_message);
                }
                Err(error_message) => {
                    error!("{}", error_message);
                }
            }
        }

        return if !multi_threaded {
            let mut steps_status_vec: Vec<StepStatus> = Vec::new();
            for step in steps {
                if step.is_run().clone() {
                    info!("Step {} is skipped", &step.name);
                    continue;
                }

                let throw_tolerant = step.throw_tolerant.unwrap_or(false).clone();
                let step_name = step.name.clone();

                info!("Running step {}", &step_name);

                let step_result = step.run();
                steps_status_vec.push(step_result.clone());

                match step_result.status {
                    Ok(success_message) => {
                        info!("{}", success_message);
                    }
                    Err(error_message) => {
                        if throw_tolerant {
                            error!("Error occurred in step {} but it is throw tolerant", &step_name);
                            panic!("{}", error_message);
                        } else {
                            error!("{}", error_message);
                        }
                    }
                }
            }

            let name = format!("Job {} completed", self.name);
            info!("{}", name);
            let end_time = now_time();
            JobStatus {
                name: self.name,
                status: Ok(name),
                end_time: Some(end_time),
                start_time: Some(start_time),
                steps_status: steps_status_vec,
            }
        } else {
            let max_threads = self.max_threads.unwrap_or(1);
            let threads: Arc<Mutex<Vec<JoinHandle<StepStatus>>>> = Arc::new(Mutex::new(Vec::new()));
            let mut steps_status_vec: Vec<StepStatus> = Vec::new();

            for step in steps {
                let threads = Arc::clone(&threads);
                {
                    let mut threads = threads.lock().unwrap();
                    threads.push(spawn(move || {
                        step.run()
                    }));
                }
                {
                    let mut threads = threads.lock().unwrap();
                    let threads_len = threads.len().clone();

                    if threads_len >= max_threads {
                        while let Some(join_handler) = threads.pop() {
                            let step_result = join_handler.join().unwrap();

                            log_step(step_result.status.clone());

                            steps_status_vec.push(step_result);
                        }
                    }
                }
            }

            let threads = Arc::clone(&threads);
            let mut threads = threads.lock().unwrap();

            if !threads.is_empty() {
                while let Some(join_handler) = threads.pop() {
                    let step_result = join_handler.join().unwrap();

                    log_step(step_result.status.clone());

                    steps_status_vec.push(step_result);
                }
            }

            JobStatus {
                name: self.name.clone(),
                status: Ok(format!("Job {} completed", self.name)),
                start_time: Some(start_time),
                end_time: Some(now_time()),
                steps_status: steps_status_vec,
            }
        };
    }
}

// Allows `Job` to be sent between threads safely.
unsafe impl Send for Job {}