#[cfg(feature = "async_postgres")]
use crate::schema::backie_tasks;
use crate::BackoffMode;
use chrono::DateTime;
use chrono::Utc;
use diesel_derive_newtype::DieselNewType;
use serde::Serialize;
use std::borrow::Cow;
use std::fmt;
use std::fmt::Display;
use uuid::Uuid;

/// The lifecycle state of a task, derived from its timestamps and error information.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TaskState {
    /// The task is waiting to be picked up for execution.
    Ready,

    /// The task is currently being executed.
    Running,

    /// The task finished with an error, described by the contained message.
    Failed(String),

    /// The task finished successfully.
    Done,
}
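// Illustrative sketch, not part of the original source: `TaskState` is a plain
// enum, so callers can pattern match on the value reported for a task. The
// function below is compiled only for tests and exists purely as an example.
#[cfg(test)]
#[allow(dead_code)]
fn task_state_matching_sketch() {
    let state = TaskState::Failed("timeout".to_string());
    if let TaskState::Failed(reason) = &state {
        assert_eq!(reason, "timeout");
    }
}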

#[derive(Clone, Copy, Debug, Ord, PartialOrd, Hash, PartialEq, Eq, DieselNewType, Serialize)]
pub struct TaskId(Uuid);

impl Display for TaskId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl From<Uuid> for TaskId {
    fn from(value: Uuid) -> Self {
        Self(value)
    }
}

impl From<TaskId> for Uuid {
    fn from(value: TaskId) -> Self {
        value.0
    }
}
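// Illustrative sketch, not part of the original source: `TaskId` converts to
// and from `uuid::Uuid` and renders as the underlying UUID via `Display`.
// Compiled only for tests, purely as an example.
#[cfg(test)]
#[allow(dead_code)]
fn task_id_conversion_sketch() {
    let uuid = Uuid::new_v4();
    let id = TaskId::from(uuid);
    assert_eq!(Uuid::from(id), uuid);
    assert_eq!(id.to_string(), uuid.to_string());
}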

#[derive(Clone, Debug, Hash, PartialEq, Eq, DieselNewType, Serialize)]
pub struct TaskHash(Cow<'static, str>);

impl TaskHash {
    pub fn new<T: Into<String>>(hash: T) -> Self {
        TaskHash(Cow::Owned(hash.into()))
    }
}

impl<'a> From<&'a TaskHash> for &'a str {
    fn from(value: &'a TaskHash) -> Self {
        &value.0
    }
}
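// Illustrative sketch, not part of the original source: any value convertible
// into a `String` can build a `TaskHash`, and a `&TaskHash` converts back to
// `&str`. Compiled only for tests, purely as an example.
#[cfg(test)]
#[allow(dead_code)]
fn task_hash_usage_sketch() {
    let hash = TaskHash::new(format!("report-{}", 42));
    let as_str: &str = (&hash).into();
    assert_eq!(as_str, "report-42");
}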

/// A task as stored in the database.
#[derive(Debug, Eq, PartialEq, Clone)]
#[cfg_attr(
    feature = "async_postgres",
    derive(diesel::Queryable, diesel::Identifiable)
)]
#[cfg_attr(feature = "async_postgres", diesel(table_name = backie_tasks))]
pub struct Task {
    /// Unique identifier of the task.
    pub id: TaskId,

    /// Name of the task type.
    pub task_name: String,

    /// Name of the queue the task belongs to.
    pub queue_name: String,

    /// Hash identifying a unique task, if one was provided.
    pub uniq_hash: Option<TaskHash>,

    /// Serialized payload of the task.
    pub payload: serde_json::Value,

    /// Execution timeout in milliseconds.
    pub timeout_msecs: i64,

    /// When the task was created.
    pub created_at: DateTime<Utc>,

    /// When the task is scheduled to run.
    pub scheduled_at: DateTime<Utc>,

    /// When the task last started running, if it has started.
    pub running_at: Option<DateTime<Utc>>,

    /// When the task finished, if it has finished.
    pub done_at: Option<DateTime<Utc>>,

    /// Error information recorded when the task fails.
    pub error_info: Option<serde_json::Value>,

    /// Number of retries attempted so far.
    pub retries: i32,

    /// Maximum number of retries allowed.
    pub max_retries: i32,

    /// Retry backoff mode, stored as JSON.
    pub backoff_mode: serde_json::Value,
}

impl Task {
    /// Derive the current [`TaskState`] from the task's timestamps and error information.
    pub fn state(&self) -> TaskState {
        if self.done_at.is_some() {
            if let Some(error) = &self.error_info {
                TaskState::Failed(error.to_string())
            } else {
                TaskState::Done
            }
        } else if self.running_at.is_some() {
            TaskState::Running
        } else {
            TaskState::Ready
        }
    }

    /// Deserialize the stored backoff mode.
    ///
    /// Panics if the stored value is not a valid [`BackoffMode`].
    pub fn backoff_mode(&self) -> BackoffMode {
        serde_json::from_value(self.backoff_mode.clone()).expect("Invalid backoff mode")
    }
}
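// Illustrative sketch, not part of the original source: `state()` is derived
// purely from `done_at`, `running_at`, and `error_info`. Field values below are
// placeholders; `backoff_mode()` is not exercised. Compiled only for tests.
#[cfg(test)]
#[allow(dead_code)]
fn task_state_derivation_sketch() {
    let mut task = Task {
        id: TaskId(Uuid::new_v4()),
        task_name: "example_task".to_string(),
        queue_name: "default".to_string(),
        uniq_hash: None,
        payload: serde_json::Value::Null,
        timeout_msecs: 1_000,
        created_at: Utc::now(),
        scheduled_at: Utc::now(),
        running_at: None,
        done_at: None,
        error_info: None,
        retries: 0,
        max_retries: 3,
        // Placeholder value; a real row stores a serialized `BackoffMode`.
        backoff_mode: serde_json::Value::Null,
    };
    assert_eq!(task.state(), TaskState::Ready);

    task.running_at = Some(Utc::now());
    assert_eq!(task.state(), TaskState::Running);

    task.done_at = Some(Utc::now());
    assert_eq!(task.state(), TaskState::Done);
}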

/// A task row ready to be inserted into the database.
#[derive(Debug, Eq, PartialEq, Clone)]
#[cfg_attr(feature = "async_postgres", derive(diesel::Insertable))]
#[cfg_attr(feature = "async_postgres", diesel(table_name = backie_tasks))]
pub struct NewTask {
    task_name: String,
    queue_name: String,
    uniq_hash: Option<TaskHash>,
    payload: serde_json::Value,
    timeout_msecs: i64,
    max_retries: i32,
    backoff_mode: serde_json::Value,
}

impl NewTask {
    /// Build a new task row from a background task, using the given execution timeout.
    #[cfg(feature = "async_postgres")]
    pub(crate) fn with_timeout<T>(
        background_task: T,
        timeout: std::time::Duration,
    ) -> Result<Self, serde_json::Error>
    where
        T: crate::BackgroundTask,
    {
        let uniq_hash = background_task.uniq();
        let payload = serde_json::to_value(background_task)?;

        Ok(Self {
            task_name: T::TASK_NAME.to_string(),
            queue_name: T::QUEUE.to_string(),
            uniq_hash,
            payload,
            timeout_msecs: timeout.as_millis() as i64,
            max_retries: T::MAX_RETRIES,
            backoff_mode: serde_json::to_value(T::BACKOFF_MODE)?,
        })
    }

    /// Build a new task row with the default execution timeout of 120 seconds.
    #[cfg(feature = "async_postgres")]
    pub(crate) fn new<T>(background_task: T) -> Result<Self, serde_json::Error>
    where
        T: crate::BackgroundTask,
    {
        use std::time::Duration;

        Self::with_timeout(background_task, Duration::from_secs(120))
    }
}
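// A hedged usage sketch, not from the original source. It assumes some type
// `MyTask` implementing `crate::BackgroundTask` (hypothetical here); since both
// constructors are `pub(crate)`, this only applies to code inside the crate:
//
//     let queued = NewTask::new(MyTask { user_id: 42 })?;
//     let slow = NewTask::with_timeout(MyTask { user_id: 42 }, Duration::from_secs(600))?;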

#[cfg(test)]
impl From<NewTask> for Task {
    fn from(new_task: NewTask) -> Self {
        Self {
            id: TaskId(Uuid::new_v4()),
            task_name: new_task.task_name,
            queue_name: new_task.queue_name,
            uniq_hash: new_task.uniq_hash,
            payload: new_task.payload,
            timeout_msecs: new_task.timeout_msecs,
            created_at: Utc::now(),
            scheduled_at: Utc::now(),
            running_at: None,
            done_at: None,
            error_info: None,
            retries: 0,
            max_retries: new_task.max_retries,
            backoff_mode: new_task.backoff_mode,
        }
    }
}

/// A read-only view of the task currently being executed, exposing its id,
/// retry count, and creation time.
pub struct CurrentTask {
    id: TaskId,
    retries: i32,
    created_at: DateTime<Utc>,
}

impl CurrentTask {
    pub(crate) fn new(task: &Task) -> Self {
        Self {
            id: task.id,
            retries: task.retries,
            created_at: task.created_at,
        }
    }

    pub fn id(&self) -> TaskId {
        self.id
    }

    pub fn retry_count(&self) -> i32 {
        self.retries
    }

    pub fn created_at(&self) -> DateTime<Utc> {
        self.created_at
    }
}
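// Illustrative sketch, not part of the original source: how a task handler that
// has been given a `CurrentTask` (how it is obtained is assumed here) might use
// the accessors. Compiled only for tests, purely as an example.
#[cfg(test)]
#[allow(dead_code)]
fn current_task_accessors_sketch(task: &CurrentTask) {
    println!(
        "task {} (attempt {}) created at {}",
        task.id(),
        task.retry_count() + 1,
        task.created_at(),
    );
}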