persistent_scheduler/core/
model.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
use crate::{
    core::{
        retry::{RetryPolicy, RetryStrategy},
        task_kind::TaskKind,
    },
    generate_token, utc_now,
};
use serde::{Deserialize, Serialize};
use std::fmt;
type LinearInterval = u32;
type ExponentialBase = u32;

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct TaskMeta {
    pub id: String,                     // Unique identifier for the task
    pub task_key: String,               // Key to identify the specific task
    pub task_params: String,            // Arguments for the task, stored as a JSON object
    pub queue_name: String,             // Name of the queue for the task
    pub updated_at: i64,                // Timestamp of the last update
    pub status: TaskStatus,             // Current status of the task
    pub stopped_reason: Option<String>, // Optional reason for why the task was stopped
    pub last_error: Option<String>,     // Error message from the last execution, if any
    pub last_run: i64,                  // Timestamp of the last run
    pub next_run: i64,                  // Timestamp of the next scheduled run
    pub kind: TaskKind,                 // Type of the task
    pub success_count: u32,             // Count of successful runs
    pub failure_count: u32,             // Count of failed runs
    pub runner_id: Option<String>,      // The ID of the current task runner, may be None
    pub retry_strategy: Retry,          // Retry strategy for handling failures
    pub retry_interval: u32,            // Interval for retrying the task
    pub base_interval: u32,             // Base interval for exponential backoff
    pub delay_seconds: u32,             //Delay before executing a Once task, specified in seconds
    pub max_retries: Option<u32>,       // Maximum number of retries allowed
    pub cron_schedule: Option<String>,  // Cron expression for scheduling
    pub cron_timezone: Option<String>,  // Timezone for the cron schedule (stored as a string)
    pub is_repeating: bool,             // Indicates if the task is repeating
    pub repeat_interval: u32,           // Interval for repeating task
    pub heartbeat_at: i64,              // Timestamp of the last heartbeat in milliseconds
}

#[derive(Clone, Debug, Eq, Default, PartialEq, Serialize, Deserialize, Hash)]
pub enum TaskStatus {
    // Task has been scheduled but has not started executing yet.
    #[default]
    Scheduled,

    // Task is currently running.
    Running,

    // Task has completed successfully.
    Success,

    // Task has failed.
    Failed,

    // Task has been marked for removal and will be cleaned up by a dedicated thread.
    Removed,

    // Task has been stopped (applicable to Repeat and Cron type tasks).
    Stopped,
}

impl fmt::Display for TaskStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let status_str = match self {
            TaskStatus::Scheduled => "Scheduled",
            TaskStatus::Running => "Running",
            TaskStatus::Success => "Success",
            TaskStatus::Failed => "Failed",
            TaskStatus::Removed => "Removed",
            TaskStatus::Stopped => "Stopped",
        };
        write!(f, "{}", status_str)
    }
}

#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum Retry {
    #[default]
    Linear,
    Exponential,
}

impl fmt::Display for Retry {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Retry::Linear => write!(f, "Linear"),
            Retry::Exponential => write!(f, "Exponential"),
        }
    }
}

fn to_retry(retry_policy: RetryPolicy) -> (Retry, LinearInterval, ExponentialBase) {
    match retry_policy.strategy {
        RetryStrategy::Linear { interval } => (Retry::Linear, interval, Default::default()),
        RetryStrategy::Exponential { base } => (Retry::Exponential, Default::default(), base),
    }
}

impl TaskMeta {
    pub fn new(
        task_key: String,
        task_params: String,
        queue_name: String,
        kind: TaskKind,
        retry_policy: RetryPolicy,
        cron_schedule: Option<String>,
        cron_timezone: Option<String>,
        is_repeating: bool,
        repeat_interval: Option<u32>,
        delay_seconds: u32,
    ) -> Self {
        // Extract retry strategy and intervals from the given retry policy.
        let (retry_strategy, retry_interval, base_interval) = to_retry(retry_policy);
        let repeat_interval = match repeat_interval {
            Some(v) => v,
            None => Default::default(),
        };
        Self {
            id: generate_token!(),
            task_key,
            task_params,
            queue_name,
            updated_at: utc_now!(),
            status: TaskStatus::Scheduled,
            last_error: Default::default(),
            last_run: Default::default(),
            next_run: Default::default(),
            kind,
            stopped_reason: Default::default(),
            success_count: Default::default(),
            failure_count: Default::default(),
            runner_id: Default::default(),
            retry_strategy,
            retry_interval,
            base_interval,
            max_retries: retry_policy.max_retries,
            cron_schedule,
            cron_timezone,
            is_repeating,
            repeat_interval,
            heartbeat_at: Default::default(),
            delay_seconds,
        }
    }

    pub fn retry_policy(&self) -> RetryPolicy {
        let strategy = match self.retry_strategy {
            Retry::Linear => RetryStrategy::Linear {
                interval: self.retry_interval,
            },
            Retry::Exponential => RetryStrategy::Exponential {
                base: self.base_interval,
            },
        };

        RetryPolicy {
            strategy,
            max_retries: self.max_retries,
        }
    }
}