1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use std::thread;
use std::time::SystemTime;

use crate::core::step::{mount_step_status, StepStatus, throw_tolerant_exception};

pub mod complex_step;
pub mod simple_step;
pub mod step_builder;

/// Something that can be executed once to produce a result.
///
/// `run` takes `self` by value, so the implementor is consumed by the call.
/// Because of the `Sized` supertrait this trait cannot be used as `dyn Runner`.
pub trait Runner: Sized {
    /// Value handed back when execution finishes.
    type Output;

    /// Consumes the object, executes it, and returns its output.
    fn run(self) -> Self::Output;
}

/// A trait for objects that can decide whether they should be executed.
///
/// The decision is queried through a shared reference, so asking does not
/// consume the object.
pub trait Decider {
    /// Returns `true` if the object should be executed, `false` otherwise.
    fn is_run(&self) -> bool;
}

/// Boxed, one-shot closure that takes no arguments and returns nothing.
///
/// The `Send` bound allows the closure to be moved to another thread. This is
/// the same type that `SyncStep::callback` wraps in an `Option`.
pub type StepCallback = Box<dyn FnOnce() + Send>;

/// Boxed predicate that takes no arguments and returns a `bool`.
///
/// It is callable any number of times through a shared reference (`Fn`).
/// Note that, unlike `StepCallback`, this alias carries no `Send` bound, so
/// the compiler does not treat a value of this type as safe to move across
/// threads.
pub type DeciderCallback = Box<dyn Fn() -> bool>;

/// A single synchronous unit of work inside a job.
///
/// A step carries an optional work callback, an optional decider that gates
/// whether the step should run, and a flag controlling how a missing callback
/// is treated.
pub struct SyncStep {
    /// When execution of the step began, if recorded.
    // NOTE(review): unit is not visible here — presumably epoch milliseconds; confirm.
    #[allow(dead_code)]
    pub(crate) start_time: Option<u64>,
    /// When execution of the step finished, if recorded.
    // NOTE(review): unit is not visible here — presumably epoch milliseconds; confirm.
    #[allow(dead_code)]
    pub(crate) end_time: Option<u64>,
    /// Human-readable step name; it is embedded in the status messages.
    pub name: String,
    /// Whether the step tolerates thrown exceptions. `None` is treated as
    /// `false` when the step is run without a callback.
    pub throw_tolerant: Option<bool>,
    /// Optional predicate consulted to decide whether the step should run.
    pub(crate) decider: Option<DeciderCallback>,
    /// The work to perform. It is one-shot and `Send` so it can be moved onto
    /// a worker thread.
    pub(crate) callback: Option<Box<dyn FnOnce() + Send>>,
}

impl Runner for SyncStep {
    /// Running a step yields its resulting status.
    type Output = StepStatus;

    /// Consumes the step, runs its callback, and reports the outcome.
    ///
    /// * With no callback, the result comes from `throw_tolerant_exception`,
    ///   called with the step's tolerance flag (defaulting to `false`) and
    ///   its name.
    /// * Otherwise the callback runs on a freshly spawned thread that is
    ///   joined immediately, so this call still blocks until the work is done.
    ///   A panic inside the callback surfaces as a failed status rather than
    ///   unwinding through `run`.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the Unix epoch.
    fn run(self) -> Self::Output {
        // Captured before any work so the status can be built relative to it.
        let start_time = SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis();

        // Guard clause: a step without a callback never reaches the worker thread.
        let callback = match self.callback {
            Some(callback) => callback,
            None => {
                return throw_tolerant_exception(self.throw_tolerant.unwrap_or(false), self.name);
            }
        };

        // `join` returns `Err` when the spawned thread panicked; the panic
        // payload itself is discarded.
        match thread::spawn(callback).join() {
            Ok(()) => mount_step_status(
                Ok(format!("Step {} executed successfully", self.name)),
                start_time,
            ),
            Err(_panic_payload) => mount_step_status(
                Err(format!("Step {} failed to execute", self.name)),
                start_time,
            ),
        }
    }
}

impl Decider for SyncStep {
    /// Reports whether this step should be executed.
    ///
    /// A step with no decider always runs; otherwise the decider callback is
    /// invoked and its answer is returned unchanged.
    fn is_run(&self) -> bool {
        self.decider.as_ref().map_or(true, |decide| decide())
    }
}

// Manually marks `SyncStep` as transferable between threads.
//
// SAFETY: the `decider` field holds a `Box<dyn Fn() -> bool>` with no `Send`
// bound, so the compiler does not derive `Send` for `SyncStep` on its own.
// This impl is only sound if every decider closure stored in a step captures
// nothing but `Send` data.
// NOTE(review): nothing in the type enforces that invariant — a decider
// capturing e.g. an `Rc` would make moving the step to another thread
// undefined behaviour. Confirm all construction sites uphold it, or consider
// adding `+ Send` to the decider's closure type so this impl becomes unnecessary.
unsafe impl Send for SyncStep {}