persistent_scheduler/core/
model.rs1use crate::core::task_kind::TaskKind;
2use crate::{
3 core::retry::{RetryPolicy, RetryStrategy},
4 generate_token, utc_now,
5};
6use serde::{Deserialize, Serialize};
7use std::fmt;
8
9type LinearInterval = u32;
10type ExponentialBase = u32;
11
12#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
13pub struct TaskMeta {
14 pub id: String, pub task_key: String, pub task_params: String, pub queue_name: String, pub updated_at: i64, pub status: TaskStatus, pub stopped_reason: Option<String>, pub last_error: Option<String>, pub last_run: i64, pub next_run: i64, pub kind: TaskKind, pub success_count: u32, pub failure_count: u32, pub runner_id: Option<String>, pub retry_strategy: Retry, pub retry_interval: u32, pub base_interval: u32, pub delay_seconds: u32, pub max_retries: Option<u32>, pub is_repeating: bool, pub heartbeat_at: i64, pub created_at: i64, }
37
38#[derive(Clone, Debug, Eq, Default, PartialEq, Serialize, Deserialize, Hash)]
39pub enum TaskStatus {
40 #[default]
42 Scheduled,
43
44 Running,
46
47 Success,
49
50 Failed,
52
53 Removed,
55
56 Stopped,
58}
59
60impl fmt::Display for TaskStatus {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 let status_str = match self {
63 TaskStatus::Scheduled => "Scheduled",
64 TaskStatus::Running => "Running",
65 TaskStatus::Success => "Success",
66 TaskStatus::Failed => "Failed",
67 TaskStatus::Removed => "Removed",
68 TaskStatus::Stopped => "Stopped",
69 };
70 write!(f, "{}", status_str)
71 }
72}
73
74#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
75pub enum Retry {
76 #[default]
77 Linear,
78 Exponential,
79}
80
81impl fmt::Display for Retry {
82 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
83 match self {
84 Retry::Linear => write!(f, "Linear"),
85 Retry::Exponential => write!(f, "Exponential"),
86 }
87 }
88}
89
90fn to_retry(retry_policy: RetryPolicy) -> (Retry, LinearInterval, ExponentialBase) {
91 match retry_policy.strategy {
92 RetryStrategy::Linear { interval } => (Retry::Linear, interval, Default::default()),
93 RetryStrategy::Exponential { base } => (Retry::Exponential, Default::default(), base),
94 }
95}
96
97impl TaskMeta {
98 pub fn new(
99 task_key: String,
100 task_params: String,
101 queue_name: String,
102 kind: TaskKind,
103 retry_policy: RetryPolicy,
104 is_repeating: bool,
105 delay_seconds: u32,
106 ) -> Self {
107 let (retry_strategy, retry_interval, base_interval) = to_retry(retry_policy);
109 Self {
110 id: generate_token!(),
111 task_key,
112 task_params,
113 queue_name,
114 updated_at: utc_now!(),
115 status: TaskStatus::Scheduled,
116 last_error: Default::default(),
117 last_run: Default::default(),
118 next_run: Default::default(),
119 kind,
120 stopped_reason: Default::default(),
121 success_count: Default::default(),
122 failure_count: Default::default(),
123 runner_id: Default::default(),
124 retry_strategy,
125 retry_interval,
126 base_interval,
127 max_retries: retry_policy.max_retries,
128 is_repeating,
129 heartbeat_at: Default::default(),
130 delay_seconds,
131 created_at: utc_now!(),
132 }
133 }
134
135 pub fn retry_policy(&self) -> RetryPolicy {
136 let strategy = match self.retry_strategy {
137 Retry::Linear => RetryStrategy::Linear {
138 interval: self.retry_interval,
139 },
140 Retry::Exponential => RetryStrategy::Exponential {
141 base: self.base_interval,
142 },
143 };
144
145 RetryPolicy {
146 strategy,
147 max_retries: self.max_retries,
148 }
149 }
150}