apalis_libsql/
row.rs

1//! Row mapping for libSQL database rows to TaskRow
2
3use apalis_sql::from_row::TaskRow;
4use chrono::{TimeZone, Utc};
5use libsql::Row;
6
7use crate::LibsqlError;
8
/// A task row as read from the libSQL database, before conversion to `TaskRow`.
///
/// Every column except `job` is read as nullable. Timestamp columns are
/// interpreted as whole seconds since the Unix epoch, and `last_error` /
/// `metadata` are parsed as JSON text when the row is converted to `TaskRow`.
///
/// `Clone`, `PartialEq` and `Eq` are derived (all fields are plain owned
/// values) so rows can be duplicated and compared, e.g. in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LibsqlTaskRow {
    /// Job payload bytes (column 0); the only column read as non-nullable.
    pub job: Vec<u8>,
    /// Task id; must be present to convert into `TaskRow`.
    pub id: Option<String>,
    /// Job type; must be present to convert into `TaskRow`.
    pub job_type: Option<String>,
    /// Task status; must be present to convert into `TaskRow`.
    pub status: Option<String>,
    /// Attempt count; must be present and fit the `TaskRow` field type.
    pub attempts: Option<i64>,
    /// Optional attempt limit; must fit the `TaskRow` field type when present.
    pub max_attempts: Option<i64>,
    /// Optional scheduled run time, in seconds since the Unix epoch.
    pub run_at: Option<i64>,
    /// Optional JSON text; becomes `TaskRow::last_result` on conversion.
    pub last_error: Option<String>,
    /// Optional lock time, in seconds since the Unix epoch.
    pub lock_at: Option<i64>,
    /// Optional lock holder, passed through unchanged on conversion.
    pub lock_by: Option<String>,
    /// Optional completion time, in seconds since the Unix epoch.
    pub done_at: Option<i64>,
    /// Optional priority; must fit the `TaskRow` field type when present.
    pub priority: Option<i64>,
    /// Optional JSON text holding task metadata.
    pub metadata: Option<String>,
}
26
27impl LibsqlTaskRow {
28    /// Create a LibsqlTaskRow from a libsql::Row
29    ///
30    /// Expects columns in the order returned by the fetch_next query:
31    /// job, id, job_type, status, attempts, max_attempts, run_at, last_error, lock_at, lock_by, done_at, priority, metadata
32    pub fn from_row(row: &Row) -> Result<Self, LibsqlError> {
33        Ok(Self {
34            job: row.get::<Vec<u8>>(0).map_err(LibsqlError::Database)?,
35            id: row
36                .get::<Option<String>>(1)
37                .map_err(LibsqlError::Database)?,
38            job_type: row
39                .get::<Option<String>>(2)
40                .map_err(LibsqlError::Database)?,
41            status: row
42                .get::<Option<String>>(3)
43                .map_err(LibsqlError::Database)?,
44            attempts: row.get::<Option<i64>>(4).map_err(LibsqlError::Database)?,
45            max_attempts: row.get::<Option<i64>>(5).map_err(LibsqlError::Database)?,
46            run_at: row.get::<Option<i64>>(6).map_err(LibsqlError::Database)?,
47            last_error: row
48                .get::<Option<String>>(7)
49                .map_err(LibsqlError::Database)?,
50            lock_at: row.get::<Option<i64>>(8).map_err(LibsqlError::Database)?,
51            lock_by: row
52                .get::<Option<String>>(9)
53                .map_err(LibsqlError::Database)?,
54            done_at: row.get::<Option<i64>>(10).map_err(LibsqlError::Database)?,
55            priority: row.get::<Option<i64>>(11).map_err(LibsqlError::Database)?,
56            metadata: row
57                .get::<Option<String>>(12)
58                .map_err(LibsqlError::Database)?,
59        })
60    }
61}
62
63impl TryFrom<LibsqlTaskRow> for TaskRow {
64    type Error = LibsqlError;
65
66    fn try_from(row: LibsqlTaskRow) -> Result<Self, Self::Error> {
67        Ok(TaskRow {
68            job: row.job,
69            id: row
70                .id
71                .ok_or_else(|| LibsqlError::Other("Missing id".into()))?,
72            job_type: row
73                .job_type
74                .ok_or_else(|| LibsqlError::Other("Missing job_type".into()))?,
75            status: row
76                .status
77                .ok_or_else(|| LibsqlError::Other("Missing status".into()))?,
78            attempts: row
79                .attempts
80                .ok_or_else(|| LibsqlError::Other("Missing attempts".into()))?
81                .try_into()
82                .map_err(|e| LibsqlError::Other(format!("Attempts overflow: {}", e)))?,
83            max_attempts: row
84                .max_attempts
85                .map(|v| v.try_into())
86                .transpose()
87                .map_err(|e| LibsqlError::Other(format!("Max attempts overflow: {}", e)))?,
88            run_at: row.run_at.and_then(|ts| Utc.timestamp_opt(ts, 0).single()),
89            last_result: row
90                .last_error
91                .as_ref()
92                .and_then(|res| match serde_json::from_str(res) {
93                    Ok(result) => Some(result),
94                    Err(e) => {
95                        log::warn!("Failed to parse last_error JSON: {}", e);
96                        None
97                    }
98                }),
99            lock_at: row.lock_at.and_then(|ts| Utc.timestamp_opt(ts, 0).single()),
100            lock_by: row.lock_by,
101            done_at: row.done_at.and_then(|ts| Utc.timestamp_opt(ts, 0).single()),
102            priority: row
103                .priority
104                .map(|v| v.try_into())
105                .transpose()
106                .map_err(|e| LibsqlError::Other(format!("Priority overflow: {}", e)))?,
107            metadata: row
108                .metadata
109                .as_ref()
110                .and_then(|m| match serde_json::from_str(m) {
111                    Ok(meta) => Some(meta),
112                    Err(e) => {
113                        log::warn!("Failed to parse metadata JSON: {}", e);
114                        None
115                    }
116                }),
117        })
118    }
119}