persistent_scheduler/core/
model.rs

1use crate::core::task_kind::TaskKind;
2use crate::{
3    core::retry::{RetryPolicy, RetryStrategy},
4    generate_token, utc_now,
5};
6use serde::{Deserialize, Serialize};
7use std::fmt;
8
/// Interval value carried by `RetryStrategy::Linear`, as returned by `to_retry`.
type LinearInterval = u32;
/// Base value carried by `RetryStrategy::Exponential`, as returned by `to_retry`.
type ExponentialBase = u32;
11
/// Serializable record describing a single task: its identity, queue, scheduling state,
/// run statistics and retry configuration.
///
/// The retry configuration is stored flattened (`retry_strategy`, `retry_interval`,
/// `base_interval`, `max_retries`); `TaskMeta::retry_policy` rebuilds a `RetryPolicy` from it.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct TaskMeta {
    /// Unique identifier for the task.
    pub id: String,
    /// Key to identify the specific task.
    pub task_key: String,
    /// Arguments for the task, stored as a JSON object.
    pub task_params: String,
    /// Name of the queue for the task.
    pub queue_name: String,
    /// Timestamp of the last update.
    pub updated_at: i64,
    /// Current status of the task.
    pub status: TaskStatus,
    /// Optional reason for why the task was stopped.
    pub stopped_reason: Option<String>,
    /// Error message from the last execution, if any.
    pub last_error: Option<String>,
    /// Timestamp of the last run.
    pub last_run: i64,
    /// Timestamp of the next scheduled run.
    pub next_run: i64,
    /// Type of the task.
    pub kind: TaskKind,
    /// Count of successful runs.
    pub success_count: u32,
    /// Count of failed runs.
    pub failure_count: u32,
    /// The ID of the current task runner, may be `None`.
    pub runner_id: Option<String>,
    /// Retry strategy for handling failures.
    pub retry_strategy: Retry,
    /// Interval for retrying the task; `TaskMeta::retry_policy` reads it for `Retry::Linear`.
    pub retry_interval: u32,
    /// Base interval for exponential backoff; `TaskMeta::retry_policy` reads it for
    /// `Retry::Exponential`.
    pub base_interval: u32,
    /// Delay before executing a Once task, specified in seconds.
    pub delay_seconds: u32,
    /// Maximum number of retries allowed.
    pub max_retries: Option<u32>,
    /// Indicates if the task is repeating.
    pub is_repeating: bool,
    /// Timestamp of the last heartbeat in milliseconds.
    pub heartbeat_at: i64,
    /// Timestamp of the task creation.
    pub created_at: i64,
}
37
/// Status values a task can carry in `TaskMeta::status`.
///
/// `Scheduled` is the `Default` variant and the status `TaskMeta::new` assigns.
#[derive(Clone, Debug, Eq, Default, PartialEq, Serialize, Deserialize, Hash)]
pub enum TaskStatus {
    /// Task has been scheduled but has not started executing yet.
    #[default]
    Scheduled,

    /// Task is currently running.
    Running,

    /// Task has completed successfully.
    Success,

    /// Task has failed.
    Failed,

    /// Task has been marked for removal and will be cleaned up by a dedicated thread.
    Removed,

    /// Task has been stopped (applicable to Repeat and Cron type tasks).
    Stopped,
}
59
60impl fmt::Display for TaskStatus {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        let status_str = match self {
63            TaskStatus::Scheduled => "Scheduled",
64            TaskStatus::Running => "Running",
65            TaskStatus::Success => "Success",
66            TaskStatus::Failed => "Failed",
67            TaskStatus::Removed => "Removed",
68            TaskStatus::Stopped => "Stopped",
69        };
70        write!(f, "{}", status_str)
71    }
72}
73
/// Tag recording which retry strategy a task uses, without the strategy's numeric parameter.
///
/// The parameter is stored separately on `TaskMeta` (`retry_interval` / `base_interval`).
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum Retry {
    /// Counterpart of `RetryStrategy::Linear`; this is the `Default` variant.
    #[default]
    Linear,
    /// Counterpart of `RetryStrategy::Exponential`.
    Exponential,
}
80
81impl fmt::Display for Retry {
82    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
83        match self {
84            Retry::Linear => write!(f, "Linear"),
85            Retry::Exponential => write!(f, "Exponential"),
86        }
87    }
88}
89
90fn to_retry(retry_policy: RetryPolicy) -> (Retry, LinearInterval, ExponentialBase) {
91    match retry_policy.strategy {
92        RetryStrategy::Linear { interval } => (Retry::Linear, interval, Default::default()),
93        RetryStrategy::Exponential { base } => (Retry::Exponential, Default::default(), base),
94    }
95}
96
97impl TaskMeta {
98    pub fn new(
99        task_key: String,
100        task_params: String,
101        queue_name: String,
102        kind: TaskKind,
103        retry_policy: RetryPolicy,
104        is_repeating: bool,
105        delay_seconds: u32,
106    ) -> Self {
107        // Extract retry strategy and intervals from the given retry policy.
108        let (retry_strategy, retry_interval, base_interval) = to_retry(retry_policy);
109        Self {
110            id: generate_token!(),
111            task_key,
112            task_params,
113            queue_name,
114            updated_at: utc_now!(),
115            status: TaskStatus::Scheduled,
116            last_error: Default::default(),
117            last_run: Default::default(),
118            next_run: Default::default(),
119            kind,
120            stopped_reason: Default::default(),
121            success_count: Default::default(),
122            failure_count: Default::default(),
123            runner_id: Default::default(),
124            retry_strategy,
125            retry_interval,
126            base_interval,
127            max_retries: retry_policy.max_retries,
128            is_repeating,
129            heartbeat_at: Default::default(),
130            delay_seconds,
131            created_at: utc_now!(),
132        }
133    }
134
135    pub fn retry_policy(&self) -> RetryPolicy {
136        let strategy = match self.retry_strategy {
137            Retry::Linear => RetryStrategy::Linear {
138                interval: self.retry_interval,
139            },
140            Retry::Exponential => RetryStrategy::Exponential {
141                base: self.base_interval,
142            },
143        };
144
145        RetryPolicy {
146            strategy,
147            max_retries: self.max_retries,
148        }
149    }
150}