apalis_postgres/queries/
list_tasks.rs

1use apalis_core::{
2    backend::{BackendExt, Filter, ListAllTasks, ListTasks, codec::Codec},
3    task::{Task, status::Status},
4};
5use apalis_sql::from_row::{FromRowError, TaskRow};
6use ulid::Ulid;
7
8use crate::{CompactType, PgContext, PgTask, PostgresStorage, from_row::PgTaskRow};
9
10impl<Args, D, F> ListTasks<Args> for PostgresStorage<Args, CompactType, D, F>
11where
12    PostgresStorage<Args, CompactType, D, F>:
13        BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
14    D: Codec<Args, Compact = CompactType>,
15    D::Error: std::error::Error + Send + Sync + 'static,
16    Args: 'static,
17{
18    fn list_tasks(
19        &self,
20        queue: &str,
21        filter: &Filter,
22    ) -> impl Future<Output = Result<Vec<PgTask<Args>>, Self::Error>> + Send {
23        let queue = queue.to_string();
24        let pool = self.pool.clone();
25        let limit = filter.limit() as i64;
26        let offset = filter.offset() as i64;
27        let status = filter
28            .status
29            .as_ref()
30            .unwrap_or(&Status::Pending)
31            .to_string();
32        async move {
33            let tasks = sqlx::query_file_as!(
34                PgTaskRow,
35                "queries/backend/list_jobs.sql",
36                status,
37                queue,
38                limit,
39                offset
40            )
41            .fetch_all(&pool)
42            .await?
43            .into_iter()
44            .map(|r| {
45                let row: TaskRow = r.try_into()?;
46                row.try_into_task_compact()
47                    .and_then(|a| {
48                        a.try_map(|t| D::decode(&t))
49                            .map_err(|e| FromRowError::DecodeError(e.into()))
50                    })
51                    .map_err(|e| sqlx::Error::Protocol(e.to_string()))
52            })
53            .collect::<Result<Vec<_>, _>>()?;
54            Ok(tasks)
55        }
56    }
57}
58
59impl<Args, D, F> ListAllTasks for PostgresStorage<Args, CompactType, D, F>
60where
61    PostgresStorage<Args, CompactType, D, F>:
62        BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
63{
64    fn list_all_tasks(
65        &self,
66        filter: &Filter,
67    ) -> impl Future<
68        Output = Result<Vec<Task<Self::Compact, Self::Context, Self::IdType>>, Self::Error>,
69    > + Send {
70        let status = filter
71            .status
72            .as_ref()
73            .map(|s| s.to_string())
74            .unwrap_or(Status::Pending.to_string());
75        let pool = self.pool.clone();
76        let limit = filter.limit() as i64;
77        let offset = filter.offset() as i64;
78        async move {
79            let tasks = sqlx::query_file_as!(
80                PgTaskRow,
81                "queries/backend/list_all_jobs.sql",
82                status,
83                limit,
84                offset
85            )
86            .fetch_all(&pool)
87            .await?
88            .into_iter()
89            .map(|r| {
90                let row: TaskRow = r.try_into()?;
91                row.try_into_task_compact()
92                    .map_err(|e| sqlx::Error::Protocol(e.to_string()))
93            })
94            .collect::<Result<Vec<_>, _>>()?;
95            Ok(tasks)
96        }
97    }
98}