persistent_scheduler/core/
model.rs1use crate::core::task_kind::TaskKind;
2use crate::{
3 core::retry::{RetryPolicy, RetryStrategy},
4 generate_token, utc_now,
5};
6use serde::{Deserialize, Serialize};
7use std::fmt;
8
/// Fixed delay carried by a linear retry strategy (first numeric slot returned by `to_retry`).
type LinearInterval = u32;
/// Base value carried by an exponential retry strategy (second numeric slot returned by
/// `to_retry`).
type ExponentialBase = u32;
11
12#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
13pub struct TaskMeta {
14 pub id: String, pub task_key: String, pub task_params: String, pub queue_name: String, pub updated_at: i64, pub status: TaskStatus, pub stopped_reason: Option<String>, pub last_error: Option<String>, pub last_duration_ms: Option<usize>,
24 pub last_retry_count: Option<usize>,
26 pub last_run: i64, pub next_run: i64, pub kind: TaskKind, pub success_count: u32, pub failure_count: u32, pub runner_id: Option<String>, pub retry_strategy: Retry, pub retry_interval: u32, pub base_interval: u32, pub delay_seconds: u32, pub max_retries: Option<u32>, pub is_repeating: bool, pub heartbeat_at: i64, pub created_at: i64, }
41
42#[derive(Clone, Debug, Eq, Default, PartialEq, Serialize, Deserialize, Hash)]
43pub enum TaskStatus {
44 #[default]
46 Scheduled,
47
48 Running,
50
51 Success,
53
54 Failed,
56
57 Removed,
59
60 Stopped,
62}
63
64impl fmt::Display for TaskStatus {
65 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66 let status_str = match self {
67 TaskStatus::Scheduled => "Scheduled",
68 TaskStatus::Running => "Running",
69 TaskStatus::Success => "Success",
70 TaskStatus::Failed => "Failed",
71 TaskStatus::Removed => "Removed",
72 TaskStatus::Stopped => "Stopped",
73 };
74 write!(f, "{}", status_str)
75 }
76}
77
78#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
79pub enum Retry {
80 #[default]
81 Linear,
82 Exponential,
83}
84
85impl fmt::Display for Retry {
86 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87 match self {
88 Retry::Linear => write!(f, "Linear"),
89 Retry::Exponential => write!(f, "Exponential"),
90 }
91 }
92}
93
94fn to_retry(retry_policy: RetryPolicy) -> (Retry, LinearInterval, ExponentialBase) {
95 match retry_policy.strategy {
96 RetryStrategy::Linear { interval } => (Retry::Linear, interval, Default::default()),
97 RetryStrategy::Exponential { base } => (Retry::Exponential, Default::default(), base),
98 }
99}
100
101impl TaskMeta {
102 pub fn new(
103 task_key: String,
104 task_params: String,
105 queue_name: String,
106 kind: TaskKind,
107 retry_policy: RetryPolicy,
108 is_repeating: bool,
109 delay_seconds: u32,
110 ) -> Self {
111 let (retry_strategy, retry_interval, base_interval) = to_retry(retry_policy);
113 Self {
114 id: generate_token!(),
115 task_key,
116 task_params,
117 queue_name,
118 updated_at: utc_now!(),
119 status: TaskStatus::Scheduled,
120 last_error: Default::default(),
121 last_duration_ms: Default::default(),
122 last_retry_count: Default::default(),
123 last_run: Default::default(),
124 next_run: Default::default(),
125 kind,
126 stopped_reason: Default::default(),
127 success_count: Default::default(),
128 failure_count: Default::default(),
129 runner_id: Default::default(),
130 retry_strategy,
131 retry_interval,
132 base_interval,
133 max_retries: retry_policy.max_retries,
134 is_repeating,
135 heartbeat_at: Default::default(),
136 delay_seconds,
137 created_at: utc_now!(),
138 }
139 }
140
141 pub fn retry_policy(&self) -> RetryPolicy {
142 let strategy = match self.retry_strategy {
143 Retry::Linear => RetryStrategy::Linear {
144 interval: self.retry_interval,
145 },
146 Retry::Exponential => RetryStrategy::Exponential {
147 base: self.base_interval,
148 },
149 };
150
151 RetryPolicy {
152 strategy,
153 max_retries: self.max_retries,
154 }
155 }
156}