persistent_scheduler/core/
model.rs

1use crate::{
2    core::{
3        retry::{RetryPolicy, RetryStrategy},
4    },
5    generate_token, utc_now,
6};
7use serde::{Deserialize, Serialize};
8use std::fmt;
9use crate::core::task_kind::TaskKind;
10
11type LinearInterval = u32;
12type ExponentialBase = u32;
13
14#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
15pub struct TaskMeta {
16    pub id: String,                     // Unique identifier for the task
17    pub task_key: String,               // Key to identify the specific task
18    pub task_params: String,            // Arguments for the task, stored as a JSON object
19    pub queue_name: String,             // Name of the queue for the task
20    pub updated_at: i64,                // Timestamp of the last update
21    pub status: TaskStatus,             // Current status of the task
22    pub stopped_reason: Option<String>, // Optional reason for why the task was stopped
23    pub last_error: Option<String>,     // Error message from the last execution, if any
24    pub last_run: i64,                  // Timestamp of the last run
25    pub next_run: i64,                  // Timestamp of the next scheduled run
26    pub kind: TaskKind,                 // Type of the task
27    pub success_count: u32,             // Count of successful runs
28    pub failure_count: u32,             // Count of failed runs
29    pub runner_id: Option<String>,      // The ID of the current task runner, may be None
30    pub retry_strategy: Retry,          // Retry strategy for handling failures
31    pub retry_interval: u32,            // Interval for retrying the task
32    pub base_interval: u32,             // Base interval for exponential backoff
33    pub delay_seconds: u32,             //Delay before executing a Once task, specified in seconds
34    pub max_retries: Option<u32>,       // Maximum number of retries allowed
35    pub is_repeating: bool,             // Indicates if the task is repeating
36    pub heartbeat_at: i64,              // Timestamp of the last heartbeat in milliseconds
37}
38
39#[derive(Clone, Debug, Eq, Default, PartialEq, Serialize, Deserialize, Hash)]
40pub enum TaskStatus {
41    // Task has been scheduled but has not started executing yet.
42    #[default]
43    Scheduled,
44
45    // Task is currently running.
46    Running,
47
48    // Task has completed successfully.
49    Success,
50
51    // Task has failed.
52    Failed,
53
54    // Task has been marked for removal and will be cleaned up by a dedicated thread.
55    Removed,
56
57    // Task has been stopped (applicable to Repeat and Cron type tasks).
58    Stopped,
59}
60
61impl fmt::Display for TaskStatus {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        let status_str = match self {
64            TaskStatus::Scheduled => "Scheduled",
65            TaskStatus::Running => "Running",
66            TaskStatus::Success => "Success",
67            TaskStatus::Failed => "Failed",
68            TaskStatus::Removed => "Removed",
69            TaskStatus::Stopped => "Stopped",
70        };
71        write!(f, "{}", status_str)
72    }
73}
74
75#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
76pub enum Retry {
77    #[default]
78    Linear,
79    Exponential,
80}
81
82impl fmt::Display for Retry {
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        match self {
85            Retry::Linear => write!(f, "Linear"),
86            Retry::Exponential => write!(f, "Exponential"),
87        }
88    }
89}
90
91fn to_retry(retry_policy: RetryPolicy) -> (Retry, LinearInterval, ExponentialBase) {
92    match retry_policy.strategy {
93        RetryStrategy::Linear { interval } => (Retry::Linear, interval, Default::default()),
94        RetryStrategy::Exponential { base } => (Retry::Exponential, Default::default(), base),
95    }
96}
97
98impl TaskMeta {
99    pub fn new(
100        task_key: String,
101        task_params: String,
102        queue_name: String,
103        kind: TaskKind,
104        retry_policy: RetryPolicy,
105        is_repeating: bool,
106        delay_seconds: u32,
107    ) -> Self {
108        // Extract retry strategy and intervals from the given retry policy.
109        let (retry_strategy, retry_interval, base_interval) = to_retry(retry_policy);
110        Self {
111            id: generate_token!(),
112            task_key,
113            task_params,
114            queue_name,
115            updated_at: utc_now!(),
116            status: TaskStatus::Scheduled,
117            last_error: Default::default(),
118            last_run: Default::default(),
119            next_run: Default::default(),
120            kind,
121            stopped_reason: Default::default(),
122            success_count: Default::default(),
123            failure_count: Default::default(),
124            runner_id: Default::default(),
125            retry_strategy,
126            retry_interval,
127            base_interval,
128            max_retries: retry_policy.max_retries,
129            is_repeating,
130            heartbeat_at: Default::default(),
131            delay_seconds,
132        }
133    }
134
135    pub fn retry_policy(&self) -> RetryPolicy {
136        let strategy = match self.retry_strategy {
137            Retry::Linear => RetryStrategy::Linear {
138                interval: self.retry_interval,
139            },
140            Retry::Exponential => RetryStrategy::Exponential {
141                base: self.base_interval,
142            },
143        };
144
145        RetryPolicy {
146            strategy,
147            max_retries: self.max_retries,
148        }
149    }
150}