1use std::str::FromStr;
2
3use apalis_core::{
4 backend::codec::Codec,
5 error::BoxDynError,
6 task::{Task, attempt::Attempt, builder::TaskBuilder, status::Status, task_id::TaskId},
7};
8
9use crate::context::SqlContext;
10use crate::datetime::{DateTime, DateTimeExt};
11
12#[derive(Debug, thiserror::Error)]
14pub enum FromRowError {
15 #[error("Column not found: {0}")]
17 ColumnNotFound(String),
18
19 #[error("Decode error: {0}")]
21 DecodeError(#[from] BoxDynError),
22}
23
24#[derive(Debug)]
25pub struct TaskRow {
31 pub job: Vec<u8>,
33 pub id: String,
35 pub job_type: String,
37 pub status: String,
39 pub attempts: usize,
41 pub max_attempts: Option<usize>,
43 pub run_at: Option<DateTime>,
45 pub last_result: Option<serde_json::Value>,
47 pub lock_at: Option<DateTime>,
49 pub lock_by: Option<String>,
51 pub done_at: Option<DateTime>,
53 pub priority: Option<usize>,
55 pub metadata: Option<serde_json::Value>,
57 pub idempotency_key: Option<String>,
59}
60
61impl TaskRow {
62 pub fn try_into_task<D, Args, IdType, Pool>(
64 self,
65 ) -> Result<Task<Args, SqlContext<Pool>, IdType>, FromRowError>
66 where
67 D::Error: Into<BoxDynError> + Send + Sync + 'static,
68 IdType: FromStr,
69 <IdType as FromStr>::Err: std::error::Error + Send + Sync + 'static,
70 D: Codec<Args, Compact = Vec<u8>>,
71 Args: 'static,
72 {
73 let ctx = SqlContext::default()
74 .with_done_at(self.done_at.map(|dt| dt.to_unix_timestamp()))
75 .with_lock_by(self.lock_by)
76 .with_max_attempts(self.max_attempts.unwrap_or(25) as i32)
77 .with_last_result(self.last_result)
78 .with_priority(self.priority.unwrap_or(0) as i32)
79 .with_meta(
80 self.metadata
81 .map(|m| {
82 serde_json::to_value(&m)
83 .unwrap_or_default()
84 .as_object()
85 .cloned()
86 .unwrap_or_default()
87 })
88 .unwrap_or_default(),
89 )
90 .with_queue(self.job_type)
91 .with_lock_at(self.lock_at.map(|dt| dt.to_unix_timestamp()));
92
93 let args = D::decode(&self.job).map_err(|e| FromRowError::DecodeError(e.into()))?;
94 let mut task = TaskBuilder::new(args)
95 .with_ctx(ctx)
96 .with_attempt(Attempt::new_with_value(self.attempts))
97 .with_status(
98 Status::from_str(&self.status).map_err(|e| FromRowError::DecodeError(e.into()))?,
99 )
100 .with_task_id(
101 TaskId::from_str(&self.id).map_err(|e| FromRowError::DecodeError(e.into()))?,
102 )
103 .run_at_timestamp(
104 self.run_at
105 .ok_or(FromRowError::ColumnNotFound("run_at".to_owned()))?
106 .to_unix_timestamp() as u64,
107 );
108
109 if let Some(key) = self.idempotency_key {
110 task = task.with_idempotency_key(key);
111 }
112 Ok(task.build())
113 }
114
115 pub fn try_into_task_compact<IdType, Pool>(
117 self,
118 ) -> Result<Task<Vec<u8>, SqlContext<Pool>, IdType>, FromRowError>
119 where
120 IdType: FromStr,
121 <IdType as FromStr>::Err: std::error::Error + Send + Sync + 'static,
122 {
123 let ctx = SqlContext::default()
124 .with_done_at(self.done_at.map(|dt| dt.to_unix_timestamp()))
125 .with_lock_by(self.lock_by)
126 .with_max_attempts(self.max_attempts.unwrap_or(25) as i32)
127 .with_last_result(self.last_result)
128 .with_priority(self.priority.unwrap_or(0) as i32)
129 .with_meta(
130 self.metadata
131 .map(|m| m.as_object().cloned().unwrap())
132 .unwrap_or_default(),
133 )
134 .with_queue(self.job_type)
135 .with_lock_at(self.lock_at.map(|dt| dt.to_unix_timestamp()));
136
137 let mut task = TaskBuilder::new(self.job)
138 .with_ctx(ctx)
139 .with_attempt(Attempt::new_with_value(self.attempts))
140 .with_status(
141 Status::from_str(&self.status).map_err(|e| FromRowError::DecodeError(e.into()))?,
142 )
143 .with_task_id(
144 TaskId::from_str(&self.id).map_err(|e| FromRowError::DecodeError(e.into()))?,
145 )
146 .run_at_timestamp(
147 self.run_at
148 .ok_or(FromRowError::ColumnNotFound("run_at".to_owned()))?
149 .to_unix_timestamp() as u64,
150 );
151
152 if let Some(key) = self.idempotency_key {
153 task = task.with_idempotency_key(key);
154 }
155 Ok(task.build())
156 }
157}