persistent_scheduler/core/
model.rs

1use crate::core::task_kind::TaskKind;
2use crate::{
3    core::retry::{RetryPolicy, RetryStrategy},
4    generate_token, utc_now,
5};
6use serde::{Deserialize, Serialize};
7use std::fmt;
8
9type LinearInterval = u32;
10type ExponentialBase = u32;
11
12#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
13pub struct TaskMeta {
14    pub id: String,                     // Unique identifier for the task
15    pub task_key: String,               // Key to identify the specific task
16    pub task_params: String,            // Arguments for the task, stored as a JSON object
17    pub queue_name: String,             // Name of the queue for the task
18    pub updated_at: i64,                // Timestamp of the last update
19    pub status: TaskStatus,             // Current status of the task
20    pub stopped_reason: Option<String>, // Optional reason for why the task was stopped
21    pub last_error: Option<String>,     // Error message from the last execution, if any
22    /// Duration of last run in milliseconds, including retries
23    pub last_duration_ms: Option<usize>,
24    /// Number of retries before last completion (success or final failure)
25    pub last_retry_count: Option<usize>,
26    pub last_run: i64,             // Timestamp of the last run
27    pub next_run: i64,             // Timestamp of the next scheduled run
28    pub kind: TaskKind,            // Type of the task
29    pub success_count: u32,        // Count of successful runs
30    pub failure_count: u32,        // Count of failed runs
31    pub runner_id: Option<String>, // The ID of the current task runner, may be None
32    pub retry_strategy: Retry,     // Retry strategy for handling failures
33    pub retry_interval: u32,       // Interval for retrying the task
34    pub base_interval: u32,        // Base interval for exponential backoff
35    pub delay_seconds: u32,        //Delay before executing a Once task, specified in seconds
36    pub max_retries: Option<u32>,  // Maximum number of retries allowed
37    pub is_repeating: bool,        // Indicates if the task is repeating
38    pub heartbeat_at: i64,         // Timestamp of the last heartbeat in milliseconds
39    pub created_at: i64,           // Timestamp of the task creation
40}
41
42#[derive(Clone, Debug, Eq, Default, PartialEq, Serialize, Deserialize, Hash)]
43pub enum TaskStatus {
44    // Task has been scheduled but has not started executing yet.
45    #[default]
46    Scheduled,
47
48    // Task is currently running.
49    Running,
50
51    // Task has completed successfully.
52    Success,
53
54    // Task has failed.
55    Failed,
56
57    // Task has been marked for removal and will be cleaned up by a dedicated thread.
58    Removed,
59
60    // Task has been stopped (applicable to Repeat and Cron type tasks).
61    Stopped,
62}
63
64impl fmt::Display for TaskStatus {
65    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66        let status_str = match self {
67            TaskStatus::Scheduled => "Scheduled",
68            TaskStatus::Running => "Running",
69            TaskStatus::Success => "Success",
70            TaskStatus::Failed => "Failed",
71            TaskStatus::Removed => "Removed",
72            TaskStatus::Stopped => "Stopped",
73        };
74        write!(f, "{}", status_str)
75    }
76}
77
78#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
79pub enum Retry {
80    #[default]
81    Linear,
82    Exponential,
83}
84
85impl fmt::Display for Retry {
86    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87        match self {
88            Retry::Linear => write!(f, "Linear"),
89            Retry::Exponential => write!(f, "Exponential"),
90        }
91    }
92}
93
94fn to_retry(retry_policy: RetryPolicy) -> (Retry, LinearInterval, ExponentialBase) {
95    match retry_policy.strategy {
96        RetryStrategy::Linear { interval } => (Retry::Linear, interval, Default::default()),
97        RetryStrategy::Exponential { base } => (Retry::Exponential, Default::default(), base),
98    }
99}
100
101impl TaskMeta {
102    pub fn new(
103        task_key: String,
104        task_params: String,
105        queue_name: String,
106        kind: TaskKind,
107        retry_policy: RetryPolicy,
108        is_repeating: bool,
109        delay_seconds: u32,
110    ) -> Self {
111        // Extract retry strategy and intervals from the given retry policy.
112        let (retry_strategy, retry_interval, base_interval) = to_retry(retry_policy);
113        Self {
114            id: generate_token!(),
115            task_key,
116            task_params,
117            queue_name,
118            updated_at: utc_now!(),
119            status: TaskStatus::Scheduled,
120            last_error: Default::default(),
121            last_duration_ms: Default::default(),
122            last_retry_count: Default::default(),
123            last_run: Default::default(),
124            next_run: Default::default(),
125            kind,
126            stopped_reason: Default::default(),
127            success_count: Default::default(),
128            failure_count: Default::default(),
129            runner_id: Default::default(),
130            retry_strategy,
131            retry_interval,
132            base_interval,
133            max_retries: retry_policy.max_retries,
134            is_repeating,
135            heartbeat_at: Default::default(),
136            delay_seconds,
137            created_at: utc_now!(),
138        }
139    }
140
141    pub fn retry_policy(&self) -> RetryPolicy {
142        let strategy = match self.retry_strategy {
143            Retry::Linear => RetryStrategy::Linear {
144                interval: self.retry_interval,
145            },
146            Retry::Exponential => RetryStrategy::Exponential {
147                base: self.base_interval,
148            },
149        };
150
151        RetryPolicy {
152            strategy,
153            max_retries: self.max_retries,
154        }
155    }
156}