apalis_postgres/queries/
list_tasks.rs1use apalis_core::{
2 backend::{BackendExt, Filter, ListAllTasks, ListTasks, codec::Codec},
3 task::{Task, status::Status},
4};
5use apalis_sql::{
6 context::SqlContext,
7 from_row::{FromRowError, TaskRow},
8};
9use ulid::Ulid;
10
11use crate::{CompactType, PgTask, PostgresStorage, from_row::PgTaskRow};
12
13impl<Args, D, F> ListTasks<Args> for PostgresStorage<Args, CompactType, D, F>
14where
15 PostgresStorage<Args, CompactType, D, F>:
16 BackendExt<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
17 D: Codec<Args, Compact = CompactType>,
18 D::Error: std::error::Error + Send + Sync + 'static,
19 Args: 'static,
20{
21 fn list_tasks(
22 &self,
23 queue: &str,
24 filter: &Filter,
25 ) -> impl Future<Output = Result<Vec<PgTask<Args>>, Self::Error>> + Send {
26 let queue = queue.to_string();
27 let pool = self.pool.clone();
28 let limit = filter.limit() as i64;
29 let offset = filter.offset() as i64;
30 let status = filter
31 .status
32 .as_ref()
33 .unwrap_or(&Status::Pending)
34 .to_string();
35 async move {
36 let tasks = sqlx::query_file_as!(
37 PgTaskRow,
38 "queries/backend/list_jobs.sql",
39 status,
40 queue,
41 limit,
42 offset
43 )
44 .fetch_all(&pool)
45 .await?
46 .into_iter()
47 .map(|r| {
48 let row: TaskRow = r.try_into()?;
49 row.try_into_task_compact()
50 .and_then(|a| {
51 a.try_map(|t| D::decode(&t))
52 .map_err(|e| FromRowError::DecodeError(e.into()))
53 })
54 .map_err(|e| sqlx::Error::Protocol(e.to_string()))
55 })
56 .collect::<Result<Vec<_>, _>>()?;
57 Ok(tasks)
58 }
59 }
60}
61
62impl<Args, D, F> ListAllTasks for PostgresStorage<Args, CompactType, D, F>
63where
64 PostgresStorage<Args, CompactType, D, F>:
65 BackendExt<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
66{
67 fn list_all_tasks(
68 &self,
69 filter: &Filter,
70 ) -> impl Future<
71 Output = Result<Vec<Task<Self::Compact, Self::Context, Self::IdType>>, Self::Error>,
72 > + Send {
73 let status = filter
74 .status
75 .as_ref()
76 .map(|s| s.to_string())
77 .unwrap_or(Status::Pending.to_string());
78 let pool = self.pool.clone();
79 let limit = filter.limit() as i64;
80 let offset = filter.offset() as i64;
81 async move {
82 let tasks = sqlx::query_file_as!(
83 PgTaskRow,
84 "queries/backend/list_all_jobs.sql",
85 status,
86 limit,
87 offset
88 )
89 .fetch_all(&pool)
90 .await?
91 .into_iter()
92 .map(|r| {
93 let row: TaskRow = r.try_into()?;
94 row.try_into_task_compact()
95 .map_err(|e| sqlx::Error::Protocol(e.to_string()))
96 })
97 .collect::<Result<Vec<_>, _>>()?;
98 Ok(tasks)
99 }
100 }
101}