apalis_sql/
from_row.rs

1use std::str::FromStr;
2
3use apalis_core::{
4    backend::codec::Codec,
5    error::BoxDynError,
6    task::{Task, attempt::Attempt, builder::TaskBuilder, status::Status, task_id::TaskId},
7};
8use chrono::{DateTime, Utc};
9
10use crate::context::SqlContext;
11
12/// Errors that can occur when converting a database row into a Task
13#[derive(Debug, thiserror::Error)]
14pub enum FromRowError {
15    /// Column not found in the row
16    #[error("Column not found: {0}")]
17    ColumnNotFound(String),
18
19    /// Error decoding the job data
20    #[error("Decode error: {0}")]
21    DecodeError(#[from] BoxDynError),
22}
23
24#[derive(Debug)]
25/// Represents a row from the tasks table in the database.
26///
27/// This struct contains all the fields necessary to represent a task/job
28/// stored in the SQL database, including its execution state, metadata,
29/// and scheduling information.
30pub struct TaskRow {
31    /// The serialized job data as bytes
32    pub job: Vec<u8>,
33    /// Unique identifier for the task
34    pub id: String,
35    /// The type/name of the job being executed
36    pub job_type: String,
37    /// Current status of the task (e.g., "pending", "running", "completed", "failed")
38    pub status: String,
39    /// Number of times this task has been attempted
40    pub attempts: usize,
41    /// Maximum number of attempts allowed for this task before giving up
42    pub max_attempts: Option<usize>,
43    /// When the task should be executed (for scheduled tasks)
44    pub run_at: Option<DateTime<Utc>>,
45    /// The result of the last execution attempt, stored as JSON
46    pub last_result: Option<serde_json::Value>,
47    /// Timestamp when the task was locked for execution
48    pub lock_at: Option<DateTime<Utc>>,
49    /// Identifier of the worker/process that has locked this task
50    pub lock_by: Option<String>,
51    /// Timestamp when the task was completed
52    pub done_at: Option<DateTime<Utc>>,
53    /// Priority level of the task (higher values indicate higher priority)
54    pub priority: Option<usize>,
55    /// Additional metadata associated with the task, stored as JSON
56    pub metadata: Option<serde_json::Value>,
57}
58
59impl TaskRow {
60    /// Convert the TaskRow into a Task with decoded arguments
61    pub fn try_into_task<D, Args, IdType>(
62        self,
63    ) -> Result<Task<Args, SqlContext, IdType>, FromRowError>
64    where
65        D::Error: Into<BoxDynError> + Send + Sync + 'static,
66        IdType: FromStr,
67        <IdType as FromStr>::Err: std::error::Error + Send + Sync + 'static,
68        D: Codec<Args, Compact = Vec<u8>>,
69        Args: 'static,
70    {
71        let ctx = SqlContext::default()
72            .with_done_at(self.done_at.map(|dt| dt.timestamp()))
73            .with_lock_by(self.lock_by)
74            .with_max_attempts(self.max_attempts.unwrap_or(25) as i32)
75            .with_last_result(self.last_result)
76            .with_priority(self.priority.unwrap_or(0) as i32)
77            .with_meta(
78                self.metadata
79                    .map(|m| {
80                        serde_json::to_value(&m)
81                            .unwrap_or_default()
82                            .as_object()
83                            .cloned()
84                            .unwrap_or_default()
85                    })
86                    .unwrap_or_default(),
87            )
88            .with_queue(self.job_type)
89            .with_lock_at(self.lock_at.map(|dt| dt.timestamp()));
90
91        let args = D::decode(&self.job).map_err(|e| FromRowError::DecodeError(e.into()))?;
92        let task = TaskBuilder::new(args)
93            .with_ctx(ctx)
94            .with_attempt(Attempt::new_with_value(self.attempts))
95            .with_status(
96                Status::from_str(&self.status).map_err(|e| FromRowError::DecodeError(e.into()))?,
97            )
98            .with_task_id(
99                TaskId::from_str(&self.id).map_err(|e| FromRowError::DecodeError(e.into()))?,
100            )
101            .run_at_timestamp(
102                self.run_at
103                    .ok_or(FromRowError::ColumnNotFound("run_at".to_owned()))?
104                    .timestamp() as u64,
105            );
106        Ok(task.build())
107    }
108
109    /// Convert the TaskRow into a Task with compacted arguments
110    pub fn try_into_task_compact<IdType>(
111        self,
112    ) -> Result<Task<Vec<u8>, SqlContext, IdType>, FromRowError>
113    where
114        IdType: FromStr,
115        <IdType as FromStr>::Err: std::error::Error + Send + Sync + 'static,
116    {
117        let ctx = SqlContext::default()
118            .with_done_at(self.done_at.map(|dt| dt.timestamp()))
119            .with_lock_by(self.lock_by)
120            .with_max_attempts(self.max_attempts.unwrap_or(25) as i32)
121            .with_last_result(self.last_result)
122            .with_priority(self.priority.unwrap_or(0) as i32)
123            .with_meta(
124                self.metadata
125                    .map(|m| m.as_object().cloned().unwrap())
126                    .unwrap_or_default(),
127            )
128            .with_queue(self.job_type)
129            .with_lock_at(self.lock_at.map(|dt| dt.timestamp()));
130
131        let task = TaskBuilder::new(self.job)
132            .with_ctx(ctx)
133            .with_attempt(Attempt::new_with_value(self.attempts))
134            .with_status(
135                Status::from_str(&self.status).map_err(|e| FromRowError::DecodeError(e.into()))?,
136            )
137            .with_task_id(
138                TaskId::from_str(&self.id).map_err(|e| FromRowError::DecodeError(e.into()))?,
139            )
140            .run_at_timestamp(
141                self.run_at
142                    .ok_or(FromRowError::ColumnNotFound("run_at".to_owned()))?
143                    .timestamp() as u64,
144            );
145        Ok(task.build())
146    }
147}