apalis_sqlite/queries/
list_tasks.rs

1use apalis_core::{
2    backend::{BackendExt, Filter, ListAllTasks, ListTasks, codec::Codec},
3    task::{Task, status::Status},
4};
5use apalis_sql::from_row::{FromRowError, TaskRow};
6use ulid::Ulid;
7
8use crate::{CompactType, SqlContext, SqliteStorage, SqliteTask, from_row::SqliteTaskRow};
9
10impl<Args, D, F> ListTasks<Args> for SqliteStorage<Args, D, F>
11where
12    Self:
13        BackendExt<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
14    D: Codec<Args, Compact = CompactType>,
15    D::Error: std::error::Error + Send + Sync + 'static,
16    Args: 'static,
17{
18    fn list_tasks(
19        &self,
20        queue: &str,
21        filter: &Filter,
22    ) -> impl Future<Output = Result<Vec<SqliteTask<Args>>, Self::Error>> + Send {
23        let queue = queue.to_owned();
24        let pool = self.pool.clone();
25        let limit = filter.limit() as i32;
26        let offset = filter.offset() as i32;
27        let status = filter
28            .status
29            .as_ref()
30            .unwrap_or(&Status::Pending)
31            .to_string();
32        async move {
33            let tasks = sqlx::query_file_as!(
34                SqliteTaskRow,
35                "queries/backend/list_jobs.sql",
36                status,
37                queue,
38                limit,
39                offset
40            )
41            .fetch_all(&pool)
42            .await?
43            .into_iter()
44            .map(|r| {
45                let row: TaskRow = r
46                    .try_into()
47                    .map_err(|e: sqlx::Error| FromRowError::DecodeError(e.into()))?;
48                row.try_into_task_compact().and_then(|t| {
49                    t.try_map(|a| D::decode(&a).map_err(|e| FromRowError::DecodeError(e.into())))
50                })
51            })
52            .collect::<Result<Vec<_>, _>>()
53            .map_err(|e| sqlx::Error::Decode(e.into()))?;
54            Ok(tasks)
55        }
56    }
57}
58
59impl<Args, D, F> ListAllTasks for SqliteStorage<Args, D, F>
60where
61    Self:
62        BackendExt<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
63{
64    fn list_all_tasks(
65        &self,
66        filter: &Filter,
67    ) -> impl Future<
68        Output = Result<Vec<Task<Self::Compact, Self::Context, Self::IdType>>, Self::Error>,
69    > + Send {
70        let status = filter
71            .status
72            .as_ref()
73            .map(|s| s.to_string())
74            .unwrap_or(Status::Pending.to_string());
75        let pool = self.pool.clone();
76        let limit = filter.limit() as i32;
77        let offset = filter.offset() as i32;
78        async move {
79            let tasks = sqlx::query_file_as!(
80                SqliteTaskRow,
81                "queries/backend/list_all_jobs.sql",
82                status,
83                limit,
84                offset
85            )
86            .fetch_all(&pool)
87            .await?
88            .into_iter()
89            .map(|r| {
90                let row: TaskRow = r.try_into()?;
91                row.try_into_task_compact()
92                    .map_err(|e| sqlx::Error::Protocol(e.to_string()))
93            })
94            .collect::<Result<Vec<_>, _>>()?;
95            Ok(tasks)
96        }
97    }
98}