1use std::str::FromStr;
2
3use apalis_core::{
4 backend::codec::Codec,
5 error::BoxDynError,
6 task::{Task, attempt::Attempt, builder::TaskBuilder, status::Status, task_id::TaskId},
7};
8use chrono::{DateTime, Utc};
9
10use crate::context::SqlContext;
11
12#[derive(Debug, thiserror::Error)]
14pub enum FromRowError {
15 #[error("Column not found: {0}")]
17 ColumnNotFound(String),
18
19 #[error("Decode error: {0}")]
21 DecodeError(#[from] BoxDynError),
22}
23
24#[derive(Debug)]
25pub struct TaskRow {
31 pub job: Vec<u8>,
33 pub id: String,
35 pub job_type: String,
37 pub status: String,
39 pub attempts: usize,
41 pub max_attempts: Option<usize>,
43 pub run_at: Option<DateTime<Utc>>,
45 pub last_result: Option<serde_json::Value>,
47 pub lock_at: Option<DateTime<Utc>>,
49 pub lock_by: Option<String>,
51 pub done_at: Option<DateTime<Utc>>,
53 pub priority: Option<usize>,
55 pub metadata: Option<serde_json::Value>,
57}
58
59impl TaskRow {
60 pub fn try_into_task<D, Args, IdType>(
62 self,
63 ) -> Result<Task<Args, SqlContext, IdType>, FromRowError>
64 where
65 D::Error: Into<BoxDynError> + Send + Sync + 'static,
66 IdType: FromStr,
67 <IdType as FromStr>::Err: std::error::Error + Send + Sync + 'static,
68 D: Codec<Args, Compact = Vec<u8>>,
69 Args: 'static,
70 {
71 let ctx = SqlContext::default()
72 .with_done_at(self.done_at.map(|dt| dt.timestamp()))
73 .with_lock_by(self.lock_by)
74 .with_max_attempts(self.max_attempts.unwrap_or(25) as i32)
75 .with_last_result(self.last_result)
76 .with_priority(self.priority.unwrap_or(0) as i32)
77 .with_meta(
78 self.metadata
79 .map(|m| {
80 serde_json::to_value(&m)
81 .unwrap_or_default()
82 .as_object()
83 .cloned()
84 .unwrap_or_default()
85 })
86 .unwrap_or_default(),
87 )
88 .with_queue(self.job_type)
89 .with_lock_at(self.lock_at.map(|dt| dt.timestamp()));
90
91 let args = D::decode(&self.job).map_err(|e| FromRowError::DecodeError(e.into()))?;
92 let task = TaskBuilder::new(args)
93 .with_ctx(ctx)
94 .with_attempt(Attempt::new_with_value(self.attempts))
95 .with_status(
96 Status::from_str(&self.status).map_err(|e| FromRowError::DecodeError(e.into()))?,
97 )
98 .with_task_id(
99 TaskId::from_str(&self.id).map_err(|e| FromRowError::DecodeError(e.into()))?,
100 )
101 .run_at_timestamp(
102 self.run_at
103 .ok_or(FromRowError::ColumnNotFound("run_at".to_owned()))?
104 .timestamp() as u64,
105 );
106 Ok(task.build())
107 }
108
109 pub fn try_into_task_compact<IdType>(
111 self,
112 ) -> Result<Task<Vec<u8>, SqlContext, IdType>, FromRowError>
113 where
114 IdType: FromStr,
115 <IdType as FromStr>::Err: std::error::Error + Send + Sync + 'static,
116 {
117 let ctx = SqlContext::default()
118 .with_done_at(self.done_at.map(|dt| dt.timestamp()))
119 .with_lock_by(self.lock_by)
120 .with_max_attempts(self.max_attempts.unwrap_or(25) as i32)
121 .with_last_result(self.last_result)
122 .with_priority(self.priority.unwrap_or(0) as i32)
123 .with_meta(
124 self.metadata
125 .map(|m| m.as_object().cloned().unwrap())
126 .unwrap_or_default(),
127 )
128 .with_queue(self.job_type)
129 .with_lock_at(self.lock_at.map(|dt| dt.timestamp()));
130
131 let task = TaskBuilder::new(self.job)
132 .with_ctx(ctx)
133 .with_attempt(Attempt::new_with_value(self.attempts))
134 .with_status(
135 Status::from_str(&self.status).map_err(|e| FromRowError::DecodeError(e.into()))?,
136 )
137 .with_task_id(
138 TaskId::from_str(&self.id).map_err(|e| FromRowError::DecodeError(e.into()))?,
139 )
140 .run_at_timestamp(
141 self.run_at
142 .ok_or(FromRowError::ColumnNotFound("run_at".to_owned()))?
143 .timestamp() as u64,
144 );
145 Ok(task.build())
146 }
147}