1use apalis_sql::from_row::TaskRow;
4use chrono::{TimeZone, Utc};
5use libsql::Row;
6
7use crate::LibsqlError;
8
9#[derive(Debug)]
11pub struct LibsqlTaskRow {
12 pub job: Vec<u8>,
13 pub id: Option<String>,
14 pub job_type: Option<String>,
15 pub status: Option<String>,
16 pub attempts: Option<i64>,
17 pub max_attempts: Option<i64>,
18 pub run_at: Option<i64>,
19 pub last_error: Option<String>,
20 pub lock_at: Option<i64>,
21 pub lock_by: Option<String>,
22 pub done_at: Option<i64>,
23 pub priority: Option<i64>,
24 pub metadata: Option<String>,
25}
26
27impl LibsqlTaskRow {
28 pub fn from_row(row: &Row) -> Result<Self, LibsqlError> {
33 Ok(Self {
34 job: row.get::<Vec<u8>>(0).map_err(LibsqlError::Database)?,
35 id: row
36 .get::<Option<String>>(1)
37 .map_err(LibsqlError::Database)?,
38 job_type: row
39 .get::<Option<String>>(2)
40 .map_err(LibsqlError::Database)?,
41 status: row
42 .get::<Option<String>>(3)
43 .map_err(LibsqlError::Database)?,
44 attempts: row.get::<Option<i64>>(4).map_err(LibsqlError::Database)?,
45 max_attempts: row.get::<Option<i64>>(5).map_err(LibsqlError::Database)?,
46 run_at: row.get::<Option<i64>>(6).map_err(LibsqlError::Database)?,
47 last_error: row
48 .get::<Option<String>>(7)
49 .map_err(LibsqlError::Database)?,
50 lock_at: row.get::<Option<i64>>(8).map_err(LibsqlError::Database)?,
51 lock_by: row
52 .get::<Option<String>>(9)
53 .map_err(LibsqlError::Database)?,
54 done_at: row.get::<Option<i64>>(10).map_err(LibsqlError::Database)?,
55 priority: row.get::<Option<i64>>(11).map_err(LibsqlError::Database)?,
56 metadata: row
57 .get::<Option<String>>(12)
58 .map_err(LibsqlError::Database)?,
59 })
60 }
61}
62
63impl TryFrom<LibsqlTaskRow> for TaskRow {
64 type Error = LibsqlError;
65
66 fn try_from(row: LibsqlTaskRow) -> Result<Self, Self::Error> {
67 Ok(TaskRow {
68 job: row.job,
69 id: row
70 .id
71 .ok_or_else(|| LibsqlError::Other("Missing id".into()))?,
72 job_type: row
73 .job_type
74 .ok_or_else(|| LibsqlError::Other("Missing job_type".into()))?,
75 status: row
76 .status
77 .ok_or_else(|| LibsqlError::Other("Missing status".into()))?,
78 attempts: row
79 .attempts
80 .ok_or_else(|| LibsqlError::Other("Missing attempts".into()))?
81 .try_into()
82 .map_err(|e| LibsqlError::Other(format!("Attempts overflow: {}", e)))?,
83 max_attempts: row
84 .max_attempts
85 .map(|v| v.try_into())
86 .transpose()
87 .map_err(|e| LibsqlError::Other(format!("Max attempts overflow: {}", e)))?,
88 run_at: row.run_at.and_then(|ts| Utc.timestamp_opt(ts, 0).single()),
89 last_result: row
90 .last_error
91 .as_ref()
92 .and_then(|res| match serde_json::from_str(res) {
93 Ok(result) => Some(result),
94 Err(e) => {
95 log::warn!("Failed to parse last_error JSON: {}", e);
96 None
97 }
98 }),
99 lock_at: row.lock_at.and_then(|ts| Utc.timestamp_opt(ts, 0).single()),
100 lock_by: row.lock_by,
101 done_at: row.done_at.and_then(|ts| Utc.timestamp_opt(ts, 0).single()),
102 priority: row
103 .priority
104 .map(|v| v.try_into())
105 .transpose()
106 .map_err(|e| LibsqlError::Other(format!("Priority overflow: {}", e)))?,
107 metadata: row
108 .metadata
109 .as_ref()
110 .and_then(|m| match serde_json::from_str(m) {
111 Ok(meta) => Some(meta),
112 Err(e) => {
113 log::warn!("Failed to parse metadata JSON: {}", e);
114 None
115 }
116 }),
117 })
118 }
119}