apalis_postgres/queries/
list_tasks.rs

1use apalis_core::{
2    backend::{BackendExt, Filter, ListAllTasks, ListTasks, codec::Codec},
3    task::{Task, status::Status},
4};
5use apalis_sql::{
6    context::SqlContext,
7    from_row::{FromRowError, TaskRow},
8};
9use ulid::Ulid;
10
11use crate::{CompactType, PgTask, PostgresStorage, from_row::PgTaskRow};
12
13impl<Args, D, F> ListTasks<Args> for PostgresStorage<Args, CompactType, D, F>
14where
15    PostgresStorage<Args, CompactType, D, F>:
16        BackendExt<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
17    D: Codec<Args, Compact = CompactType>,
18    D::Error: std::error::Error + Send + Sync + 'static,
19    Args: 'static,
20{
21    fn list_tasks(
22        &self,
23        queue: &str,
24        filter: &Filter,
25    ) -> impl Future<Output = Result<Vec<PgTask<Args>>, Self::Error>> + Send {
26        let queue = queue.to_string();
27        let pool = self.pool.clone();
28        let limit = filter.limit() as i64;
29        let offset = filter.offset() as i64;
30        let status = filter
31            .status
32            .as_ref()
33            .unwrap_or(&Status::Pending)
34            .to_string();
35        async move {
36            let tasks = sqlx::query_file_as!(
37                PgTaskRow,
38                "queries/backend/list_jobs.sql",
39                status,
40                queue,
41                limit,
42                offset
43            )
44            .fetch_all(&pool)
45            .await?
46            .into_iter()
47            .map(|r| {
48                let row: TaskRow = r.try_into()?;
49                row.try_into_task_compact()
50                    .and_then(|a| {
51                        a.try_map(|t| D::decode(&t))
52                            .map_err(|e| FromRowError::DecodeError(e.into()))
53                    })
54                    .map_err(|e| sqlx::Error::Protocol(e.to_string()))
55            })
56            .collect::<Result<Vec<_>, _>>()?;
57            Ok(tasks)
58        }
59    }
60}
61
62impl<Args, D, F> ListAllTasks for PostgresStorage<Args, CompactType, D, F>
63where
64    PostgresStorage<Args, CompactType, D, F>:
65        BackendExt<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
66{
67    fn list_all_tasks(
68        &self,
69        filter: &Filter,
70    ) -> impl Future<
71        Output = Result<Vec<Task<Self::Compact, Self::Context, Self::IdType>>, Self::Error>,
72    > + Send {
73        let status = filter
74            .status
75            .as_ref()
76            .map(|s| s.to_string())
77            .unwrap_or(Status::Pending.to_string());
78        let pool = self.pool.clone();
79        let limit = filter.limit() as i64;
80        let offset = filter.offset() as i64;
81        async move {
82            let tasks = sqlx::query_file_as!(
83                PgTaskRow,
84                "queries/backend/list_all_jobs.sql",
85                status,
86                limit,
87                offset
88            )
89            .fetch_all(&pool)
90            .await?
91            .into_iter()
92            .map(|r| {
93                let row: TaskRow = r.try_into()?;
94                row.try_into_task_compact()
95                    .map_err(|e| sqlx::Error::Protocol(e.to_string()))
96            })
97            .collect::<Result<Vec<_>, _>>()?;
98            Ok(tasks)
99        }
100    }
101}