Skip to main content

apalis_sql/
from_row.rs

1use std::str::FromStr;
2
3use apalis_core::{
4    backend::codec::Codec,
5    error::BoxDynError,
6    task::{Task, attempt::Attempt, builder::TaskBuilder, status::Status, task_id::TaskId},
7};
8
9use crate::context::SqlContext;
10use crate::datetime::{DateTime, DateTimeExt};
11
12/// Errors that can occur when converting a database row into a Task
13#[derive(Debug, thiserror::Error)]
14pub enum FromRowError {
15    /// Column not found in the row
16    #[error("Column not found: {0}")]
17    ColumnNotFound(String),
18
19    /// Error decoding the job data
20    #[error("Decode error: {0}")]
21    DecodeError(#[from] BoxDynError),
22}
23
24#[derive(Debug)]
25/// Represents a row from the tasks table in the database.
26///
27/// This struct contains all the fields necessary to represent a task/job
28/// stored in the SQL database, including its execution state, metadata,
29/// and scheduling information.
30pub struct TaskRow {
31    /// The serialized job data as bytes
32    pub job: Vec<u8>,
33    /// Unique identifier for the task
34    pub id: String,
35    /// The type/name of the job being executed
36    pub job_type: String,
37    /// Current status of the task (e.g., "pending", "running", "completed", "failed")
38    pub status: String,
39    /// Number of times this task has been attempted
40    pub attempts: usize,
41    /// Maximum number of attempts allowed for this task before giving up
42    pub max_attempts: Option<usize>,
43    /// When the task should be executed (for scheduled tasks)
44    pub run_at: Option<DateTime>,
45    /// The result of the last execution attempt, stored as JSON
46    pub last_result: Option<serde_json::Value>,
47    /// Timestamp when the task was locked for execution
48    pub lock_at: Option<DateTime>,
49    /// Identifier of the worker/process that has locked this task
50    pub lock_by: Option<String>,
51    /// Timestamp when the task was completed
52    pub done_at: Option<DateTime>,
53    /// Priority level of the task (higher values indicate higher priority)
54    pub priority: Option<usize>,
55    /// Additional metadata associated with the task, stored as JSON
56    pub metadata: Option<serde_json::Value>,
57    /// Idempotency key for enforcing uniqueness
58    pub idempotency_key: Option<String>,
59}
60
61impl TaskRow {
62    /// Convert the TaskRow into a Task with decoded arguments
63    pub fn try_into_task<D, Args, IdType, Pool>(
64        self,
65    ) -> Result<Task<Args, SqlContext<Pool>, IdType>, FromRowError>
66    where
67        D::Error: Into<BoxDynError> + Send + Sync + 'static,
68        IdType: FromStr,
69        <IdType as FromStr>::Err: std::error::Error + Send + Sync + 'static,
70        D: Codec<Args, Compact = Vec<u8>>,
71        Args: 'static,
72    {
73        let ctx = SqlContext::default()
74            .with_done_at(self.done_at.map(|dt| dt.to_unix_timestamp()))
75            .with_lock_by(self.lock_by)
76            .with_max_attempts(self.max_attempts.unwrap_or(25) as i32)
77            .with_last_result(self.last_result)
78            .with_priority(self.priority.unwrap_or(0) as i32)
79            .with_meta(
80                self.metadata
81                    .map(|m| {
82                        serde_json::to_value(&m)
83                            .unwrap_or_default()
84                            .as_object()
85                            .cloned()
86                            .unwrap_or_default()
87                    })
88                    .unwrap_or_default(),
89            )
90            .with_queue(self.job_type)
91            .with_lock_at(self.lock_at.map(|dt| dt.to_unix_timestamp()));
92
93        let args = D::decode(&self.job).map_err(|e| FromRowError::DecodeError(e.into()))?;
94        let mut task = TaskBuilder::new(args)
95            .with_ctx(ctx)
96            .with_attempt(Attempt::new_with_value(self.attempts))
97            .with_status(
98                Status::from_str(&self.status).map_err(|e| FromRowError::DecodeError(e.into()))?,
99            )
100            .with_task_id(
101                TaskId::from_str(&self.id).map_err(|e| FromRowError::DecodeError(e.into()))?,
102            )
103            .run_at_timestamp(
104                self.run_at
105                    .ok_or(FromRowError::ColumnNotFound("run_at".to_owned()))?
106                    .to_unix_timestamp() as u64,
107            );
108
109        if let Some(key) = self.idempotency_key {
110            task = task.with_idempotency_key(key);
111        }
112        Ok(task.build())
113    }
114
115    /// Convert the TaskRow into a Task with compacted arguments
116    pub fn try_into_task_compact<IdType, Pool>(
117        self,
118    ) -> Result<Task<Vec<u8>, SqlContext<Pool>, IdType>, FromRowError>
119    where
120        IdType: FromStr,
121        <IdType as FromStr>::Err: std::error::Error + Send + Sync + 'static,
122    {
123        let ctx = SqlContext::default()
124            .with_done_at(self.done_at.map(|dt| dt.to_unix_timestamp()))
125            .with_lock_by(self.lock_by)
126            .with_max_attempts(self.max_attempts.unwrap_or(25) as i32)
127            .with_last_result(self.last_result)
128            .with_priority(self.priority.unwrap_or(0) as i32)
129            .with_meta(
130                self.metadata
131                    .map(|m| m.as_object().cloned().unwrap())
132                    .unwrap_or_default(),
133            )
134            .with_queue(self.job_type)
135            .with_lock_at(self.lock_at.map(|dt| dt.to_unix_timestamp()));
136
137        let mut task = TaskBuilder::new(self.job)
138            .with_ctx(ctx)
139            .with_attempt(Attempt::new_with_value(self.attempts))
140            .with_status(
141                Status::from_str(&self.status).map_err(|e| FromRowError::DecodeError(e.into()))?,
142            )
143            .with_task_id(
144                TaskId::from_str(&self.id).map_err(|e| FromRowError::DecodeError(e.into()))?,
145            )
146            .run_at_timestamp(
147                self.run_at
148                    .ok_or(FromRowError::ColumnNotFound("run_at".to_owned()))?
149                    .to_unix_timestamp() as u64,
150            );
151
152        if let Some(key) = self.idempotency_key {
153            task = task.with_idempotency_key(key);
154        }
155        Ok(task.build())
156    }
157}