backie/
task.rs

1#[cfg(feature = "async_postgres")]
2use crate::schema::backie_tasks;
3use crate::BackoffMode;
4use chrono::DateTime;
5use chrono::Utc;
6use diesel_derive_newtype::DieselNewType;
7use serde::Serialize;
8use std::borrow::Cow;
9use std::fmt;
10use std::fmt::Display;
11use uuid::Uuid;
12
/// Lifecycle state of a task.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TaskState {
    /// The task has not started yet and is ready to be executed.
    Ready,

    /// The task is currently being executed.
    Running,

    /// The task finished with an error; the string carries the failure reason.
    Failed(String),

    /// The task finished without an error.
    Done,
}
28
29#[derive(Clone, Copy, Debug, Ord, PartialOrd, Hash, PartialEq, Eq, DieselNewType, Serialize)]
30pub struct TaskId(Uuid);
31
32impl Display for TaskId {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
34        write!(f, "{}", self.0)
35    }
36}
37
38impl From<Uuid> for TaskId {
39    fn from(value: Uuid) -> Self {
40        Self(value)
41    }
42}
43
44impl From<TaskId> for Uuid {
45    fn from(value: TaskId) -> Self {
46        value.0
47    }
48}
49
50#[derive(Clone, Debug, Hash, PartialEq, Eq, DieselNewType, Serialize)]
51pub struct TaskHash(Cow<'static, str>);
52
53impl TaskHash {
54    pub fn new<T: Into<String>>(hash: T) -> Self {
55        TaskHash(Cow::Owned(hash.into()))
56    }
57}
58
59impl<'a> From<&'a TaskHash> for &'a str {
60    fn from(value: &'a TaskHash) -> Self {
61        &value.0
62    }
63}
64
65#[derive(Debug, Eq, PartialEq, Clone)]
66#[cfg_attr(
67    feature = "async_postgres",
68    derive(diesel::Queryable, diesel::Identifiable)
69)]
70#[cfg_attr(feature = "async_postgres", diesel(table_name = backie_tasks))]
71pub struct Task {
72    /// Unique identifier of the task.
73    pub id: TaskId,
74
75    /// Name of the type of task.
76    pub task_name: String,
77
78    /// Queue name that the task belongs to.
79    pub queue_name: String,
80
81    /// Unique hash is used to identify and avoid duplicate tasks.
82    pub uniq_hash: Option<TaskHash>,
83
84    /// Representation of the task.
85    pub payload: serde_json::Value,
86
87    /// Max timeout that the task can run for.
88    pub timeout_msecs: i64,
89
90    /// Creation time of the task.
91    pub created_at: DateTime<Utc>,
92
93    /// Date time when the task is scheduled to run.
94    pub scheduled_at: DateTime<Utc>,
95
96    /// Date time when the task is started to run.
97    pub running_at: Option<DateTime<Utc>>,
98
99    /// Date time when the task is finished.
100    pub done_at: Option<DateTime<Utc>>,
101
102    /// Failure reason, when the task is failed.
103    pub error_info: Option<serde_json::Value>,
104
105    /// Number of times a task was retried.
106    pub retries: i32,
107
108    /// Maximum number of retries allow for this task before it is maked as failure.
109    pub max_retries: i32,
110
111    /// Backoff mode for this task.
112    pub backoff_mode: serde_json::Value, // We use JSON to allow the backoff mode to be changed in the future and maybe hold configuration values
113}
114
115impl Task {
116    pub fn state(&self) -> TaskState {
117        if self.done_at.is_some() {
118            if self.error_info.is_some() {
119                // TODO: use a proper error type
120                TaskState::Failed(self.error_info.clone().unwrap().to_string())
121            } else {
122                TaskState::Done
123            }
124        } else if self.running_at.is_some() {
125            TaskState::Running
126        } else {
127            TaskState::Ready
128        }
129    }
130
131    pub fn backoff_mode(&self) -> BackoffMode {
132        serde_json::from_value(self.backoff_mode.clone()).expect("Invalid backoff mode")
133    }
134}
135
136#[derive(Debug, Eq, PartialEq, Clone)]
137#[cfg_attr(feature = "async_postgres", derive(diesel::Insertable))]
138#[cfg_attr(feature = "async_postgres", diesel(table_name = backie_tasks))]
139pub struct NewTask {
140    task_name: String,
141    queue_name: String,
142    uniq_hash: Option<TaskHash>,
143    payload: serde_json::Value,
144    timeout_msecs: i64,
145    max_retries: i32,
146    backoff_mode: serde_json::Value,
147}
148
149impl NewTask {
150    #[cfg(feature = "async_postgres")]
151    pub(crate) fn with_timeout<T>(
152        background_task: T,
153        timeout: std::time::Duration,
154    ) -> Result<Self, serde_json::Error>
155    where
156        T: crate::BackgroundTask,
157    {
158        let uniq_hash = background_task.uniq();
159        let payload = serde_json::to_value(background_task)?;
160
161        Ok(Self {
162            task_name: T::TASK_NAME.to_string(),
163            queue_name: T::QUEUE.to_string(),
164            uniq_hash,
165            payload,
166            timeout_msecs: timeout.as_millis() as i64,
167            max_retries: T::MAX_RETRIES,
168            backoff_mode: serde_json::to_value(T::BACKOFF_MODE)?,
169        })
170    }
171
172    #[cfg(feature = "async_postgres")]
173    pub(crate) fn new<T>(background_task: T) -> Result<Self, serde_json::Error>
174    where
175        T: crate::BackgroundTask,
176    {
177        use std::time::Duration;
178
179        Self::with_timeout(background_task, Duration::from_secs(120))
180    }
181}
182
/// Test-only conversion that turns a [`NewTask`] into a [`Task`].
///
/// The result gets a freshly generated id, the current time as both creation
/// and schedule time, zero retries, and no running/done timestamps or error.
#[cfg(test)]
impl From<NewTask> for Task {
    fn from(new_task: NewTask) -> Self {
        let NewTask {
            task_name,
            queue_name,
            uniq_hash,
            payload,
            timeout_msecs,
            max_retries,
            backoff_mode,
        } = new_task;

        Task {
            id: TaskId(Uuid::new_v4()),
            task_name,
            queue_name,
            uniq_hash,
            payload,
            timeout_msecs,
            created_at: Utc::now(),
            scheduled_at: Utc::now(),
            running_at: None,
            done_at: None,
            error_info: None,
            retries: 0,
            max_retries,
            backoff_mode,
        }
    }
}
204
205pub struct CurrentTask {
206    id: TaskId,
207    retries: i32,
208    created_at: DateTime<Utc>,
209}
210
211impl CurrentTask {
212    pub(crate) fn new(task: &Task) -> Self {
213        Self {
214            id: task.id,
215            retries: task.retries,
216            created_at: task.created_at,
217        }
218    }
219
220    pub fn id(&self) -> TaskId {
221        self.id
222    }
223
224    pub fn retry_count(&self) -> i32 {
225        self.retries
226    }
227
228    pub fn created_at(&self) -> DateTime<Utc> {
229        self.created_at
230    }
231}