// apalis_postgres/queries/list_tasks.rs
use apalis_core::{
    backend::{Backend, Filter, ListAllTasks, ListTasks, codec::Codec},
    task::{Task, status::Status},
};
use apalis_sql::{context::SqlContext, from_row::TaskRow};
use ulid::Ulid;

use crate::{CompactType, PgTask, PostgresStorage, from_row::PgTaskRow};

10impl<Args, D, F> ListTasks<Args> for PostgresStorage<Args, CompactType, D, F>
11where
12 PostgresStorage<Args, CompactType, D, F>:
13 Backend<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
14 D: Codec<Args, Compact = CompactType>,
15 D::Error: std::error::Error + Send + Sync + 'static,
16 Args: 'static,
17{
18 fn list_tasks(
19 &self,
20 queue: &str,
21 filter: &Filter,
22 ) -> impl Future<Output = Result<Vec<PgTask<Args>>, Self::Error>> + Send {
23 let queue = queue.to_string();
24 let pool = self.pool.clone();
25 let limit = filter.limit() as i64;
26 let offset = filter.offset() as i64;
27 let status = filter
28 .status
29 .as_ref()
30 .unwrap_or(&Status::Pending)
31 .to_string();
32 async move {
33 let tasks = sqlx::query_file_as!(
34 PgTaskRow,
35 "queries/backend/list_jobs.sql",
36 status,
37 queue,
38 limit,
39 offset
40 )
41 .fetch_all(&pool)
42 .await?
43 .into_iter()
44 .map(|r| {
45 let row: TaskRow = r.try_into()?;
46 row.try_into_task::<D, _, Ulid>()
47 .map_err(|e| sqlx::Error::Protocol(e.to_string()))
48 })
49 .collect::<Result<Vec<_>, _>>()?;
50 Ok(tasks)
51 }
52 }
53}
54
55impl<Args, D, F> ListAllTasks for PostgresStorage<Args, CompactType, D, F>
56where
57 PostgresStorage<Args, CompactType, D, F>:
58 Backend<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
59{
60 fn list_all_tasks(
61 &self,
62 filter: &Filter,
63 ) -> impl Future<
64 Output = Result<Vec<Task<Self::Compact, Self::Context, Self::IdType>>, Self::Error>,
65 > + Send {
66 let status = filter
67 .status
68 .as_ref()
69 .map(|s| s.to_string())
70 .unwrap_or(Status::Pending.to_string());
71 let pool = self.pool.clone();
72 let limit = filter.limit() as i64;
73 let offset = filter.offset() as i64;
74 async move {
75 let tasks = sqlx::query_file_as!(
76 PgTaskRow,
77 "queries/backend/list_all_jobs.sql",
78 status,
79 limit,
80 offset
81 )
82 .fetch_all(&pool)
83 .await?
84 .into_iter()
85 .map(|r| {
86 let row: TaskRow = r.try_into()?;
87 row.try_into_task_compact()
88 .map_err(|e| sqlx::Error::Protocol(e.to_string()))
89 })
90 .collect::<Result<Vec<_>, _>>()?;
91 Ok(tasks)
92 }
93 }
94}