Skip to main content

apalis_postgres/queries/
list_tasks.rs

1use apalis_core::{
2    backend::{BackendExt, Filter, ListAllTasks, ListTasks, codec::Codec},
3    task::{Task, status::Status},
4};
5use apalis_sql::from_row::{FromRowError, TaskRow};
6use ulid::Ulid;
7
8use crate::{CompactType, PgContext, PgTask, PostgresStorage, from_row::PgTaskRow};
9
10impl<Args, D, F> ListTasks<Args> for PostgresStorage<Args, CompactType, D, F>
11where
12    PostgresStorage<Args, CompactType, D, F>:
13        BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
14    D: Codec<Args, Compact = CompactType>,
15    D::Error: std::error::Error + Send + Sync + 'static,
16    Args: 'static,
17{
18    fn list_tasks(
19        &self,
20        filter: &Filter,
21    ) -> impl Future<Output = Result<Vec<PgTask<Args>>, Self::Error>> + Send {
22        let queue = self.config.queue().to_string();
23        let pool = self.pool.clone();
24        let limit = filter.limit() as i64;
25        let offset = filter.offset() as i64;
26        let status = filter
27            .status
28            .as_ref()
29            .unwrap_or(&Status::Pending)
30            .to_string();
31        async move {
32            let tasks = sqlx::query_file_as!(
33                PgTaskRow,
34                "queries/backend/list_jobs.sql",
35                status,
36                queue,
37                limit,
38                offset
39            )
40            .fetch_all(&pool)
41            .await?
42            .into_iter()
43            .map(|r| {
44                let row: TaskRow = r.try_into()?;
45                row.try_into_task_compact()
46                    .and_then(|a| {
47                        a.try_map(|t| D::decode(&t))
48                            .map_err(|e| FromRowError::DecodeError(e.into()))
49                    })
50                    .map_err(|e| sqlx::Error::Protocol(e.to_string()))
51            })
52            .collect::<Result<Vec<_>, _>>()?;
53            Ok(tasks)
54        }
55    }
56}
57
58impl<Args, D, F> ListAllTasks for PostgresStorage<Args, CompactType, D, F>
59where
60    PostgresStorage<Args, CompactType, D, F>:
61        BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
62{
63    fn list_all_tasks(
64        &self,
65        filter: &Filter,
66    ) -> impl Future<
67        Output = Result<Vec<Task<Self::Compact, Self::Context, Self::IdType>>, Self::Error>,
68    > + Send {
69        let status = filter
70            .status
71            .as_ref()
72            .map(|s| s.to_string())
73            .unwrap_or(Status::Pending.to_string());
74        let pool = self.pool.clone();
75        let limit = filter.limit() as i64;
76        let offset = filter.offset() as i64;
77        async move {
78            let tasks = sqlx::query_file_as!(
79                PgTaskRow,
80                "queries/backend/list_all_jobs.sql",
81                status,
82                limit,
83                offset
84            )
85            .fetch_all(&pool)
86            .await?
87            .into_iter()
88            .map(|r| {
89                let row: TaskRow = r.try_into()?;
90                row.try_into_task_compact()
91                    .map_err(|e| sqlx::Error::Protocol(e.to_string()))
92            })
93            .collect::<Result<Vec<_>, _>>()?;
94            Ok(tasks)
95        }
96    }
97}