apalis_postgres/queries/
list_tasks.rs

1use apalis_core::{
2    backend::{Backend, Filter, ListAllTasks, ListTasks, codec::Codec},
3    task::{Task, status::Status},
4};
5use apalis_sql::{context::SqlContext, from_row::TaskRow};
6use ulid::Ulid;
7
8use crate::{CompactType, PgTask, PostgresStorage, from_row::PgTaskRow};
9
10impl<Args, D, F> ListTasks<Args> for PostgresStorage<Args, CompactType, D, F>
11where
12    PostgresStorage<Args, CompactType, D, F>:
13        Backend<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
14    D: Codec<Args, Compact = CompactType>,
15    D::Error: std::error::Error + Send + Sync + 'static,
16    Args: 'static,
17{
18    fn list_tasks(
19        &self,
20        queue: &str,
21        filter: &Filter,
22    ) -> impl Future<Output = Result<Vec<PgTask<Args>>, Self::Error>> + Send {
23        let queue = queue.to_string();
24        let pool = self.pool.clone();
25        let limit = filter.limit() as i64;
26        let offset = filter.offset() as i64;
27        let status = filter
28            .status
29            .as_ref()
30            .unwrap_or(&Status::Pending)
31            .to_string();
32        async move {
33            let tasks = sqlx::query_file_as!(
34                PgTaskRow,
35                "queries/backend/list_jobs.sql",
36                status,
37                queue,
38                limit,
39                offset
40            )
41            .fetch_all(&pool)
42            .await?
43            .into_iter()
44            .map(|r| {
45                let row: TaskRow = r.try_into()?;
46                row.try_into_task::<D, _, Ulid>()
47                    .map_err(|e| sqlx::Error::Protocol(e.to_string()))
48            })
49            .collect::<Result<Vec<_>, _>>()?;
50            Ok(tasks)
51        }
52    }
53}
54
55impl<Args, D, F> ListAllTasks for PostgresStorage<Args, CompactType, D, F>
56where
57    PostgresStorage<Args, CompactType, D, F>:
58        Backend<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
59{
60    fn list_all_tasks(
61        &self,
62        filter: &Filter,
63    ) -> impl Future<
64        Output = Result<Vec<Task<Self::Compact, Self::Context, Self::IdType>>, Self::Error>,
65    > + Send {
66        let status = filter
67            .status
68            .as_ref()
69            .map(|s| s.to_string())
70            .unwrap_or(Status::Pending.to_string());
71        let pool = self.pool.clone();
72        let limit = filter.limit() as i64;
73        let offset = filter.offset() as i64;
74        async move {
75            let tasks = sqlx::query_file_as!(
76                PgTaskRow,
77                "queries/backend/list_all_jobs.sql",
78                status,
79                limit,
80                offset
81            )
82            .fetch_all(&pool)
83            .await?
84            .into_iter()
85            .map(|r| {
86                let row: TaskRow = r.try_into()?;
87                row.try_into_task_compact()
88                    .map_err(|e| sqlx::Error::Protocol(e.to_string()))
89            })
90            .collect::<Result<Vec<_>, _>>()?;
91            Ok(tasks)
92        }
93    }
94}