apalis_sqlite/queries/
list_tasks.rs

1use apalis_core::{
2    backend::{BackendExt, Filter, ListAllTasks, ListTasks, codec::Codec},
3    task::{Task, status::Status},
4};
5use apalis_sql::from_row::{FromRowError, TaskRow};
6use ulid::Ulid;
7
8use crate::{CompactType, SqliteContext, SqliteStorage, SqliteTask, from_row::SqliteTaskRow};
9
10impl<Args, D, F> ListTasks<Args> for SqliteStorage<Args, D, F>
11where
12    Self: BackendExt<
13            Context = SqliteContext,
14            Compact = CompactType,
15            IdType = Ulid,
16            Error = sqlx::Error,
17        >,
18    D: Codec<Args, Compact = CompactType>,
19    D::Error: std::error::Error + Send + Sync + 'static,
20    Args: 'static,
21{
22    fn list_tasks(
23        &self,
24        queue: &str,
25        filter: &Filter,
26    ) -> impl Future<Output = Result<Vec<SqliteTask<Args>>, Self::Error>> + Send {
27        let queue = queue.to_owned();
28        let pool = self.pool.clone();
29        let limit = filter.limit() as i32;
30        let offset = filter.offset() as i32;
31        let status = filter
32            .status
33            .as_ref()
34            .unwrap_or(&Status::Pending)
35            .to_string();
36        async move {
37            let tasks = sqlx::query_file_as!(
38                SqliteTaskRow,
39                "queries/backend/list_jobs.sql",
40                status,
41                queue,
42                limit,
43                offset
44            )
45            .fetch_all(&pool)
46            .await?
47            .into_iter()
48            .map(|r| {
49                let row: TaskRow = r
50                    .try_into()
51                    .map_err(|e: sqlx::Error| FromRowError::DecodeError(e.into()))?;
52                row.try_into_task_compact().and_then(|t| {
53                    t.try_map(|a| D::decode(&a).map_err(|e| FromRowError::DecodeError(e.into())))
54                })
55            })
56            .collect::<Result<Vec<_>, _>>()
57            .map_err(|e| sqlx::Error::Decode(e.into()))?;
58            Ok(tasks)
59        }
60    }
61}
62
63impl<Args, D, F> ListAllTasks for SqliteStorage<Args, D, F>
64where
65    Self: BackendExt<
66            Context = SqliteContext,
67            Compact = CompactType,
68            IdType = Ulid,
69            Error = sqlx::Error,
70        >,
71{
72    fn list_all_tasks(
73        &self,
74        filter: &Filter,
75    ) -> impl Future<
76        Output = Result<Vec<Task<Self::Compact, Self::Context, Self::IdType>>, Self::Error>,
77    > + Send {
78        let status = filter
79            .status
80            .as_ref()
81            .map(|s| s.to_string())
82            .unwrap_or(Status::Pending.to_string());
83        let pool = self.pool.clone();
84        let limit = filter.limit() as i32;
85        let offset = filter.offset() as i32;
86        async move {
87            let tasks = sqlx::query_file_as!(
88                SqliteTaskRow,
89                "queries/backend/list_all_jobs.sql",
90                status,
91                limit,
92                offset
93            )
94            .fetch_all(&pool)
95            .await?
96            .into_iter()
97            .map(|r| {
98                let row: TaskRow = r.try_into()?;
99                row.try_into_task_compact()
100                    .map_err(|e| sqlx::Error::Protocol(e.to_string()))
101            })
102            .collect::<Result<Vec<_>, _>>()?;
103            Ok(tasks)
104        }
105    }
106}