apalis_postgres/queries/
list_tasks.rs1use apalis_core::{
2 backend::{BackendExt, Filter, ListAllTasks, ListTasks, codec::Codec},
3 task::{Task, status::Status},
4};
5use apalis_sql::from_row::{FromRowError, TaskRow};
6use ulid::Ulid;
7
8use crate::{CompactType, PgContext, PgTask, PostgresStorage, from_row::PgTaskRow};
9
10impl<Args, D, F> ListTasks<Args> for PostgresStorage<Args, CompactType, D, F>
11where
12 PostgresStorage<Args, CompactType, D, F>:
13 BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
14 D: Codec<Args, Compact = CompactType>,
15 D::Error: std::error::Error + Send + Sync + 'static,
16 Args: 'static,
17{
18 fn list_tasks(
19 &self,
20 filter: &Filter,
21 ) -> impl Future<Output = Result<Vec<PgTask<Args>>, Self::Error>> + Send {
22 let queue = self.config.queue().to_string();
23 let pool = self.pool.clone();
24 let limit = filter.limit() as i64;
25 let offset = filter.offset() as i64;
26 let status = filter
27 .status
28 .as_ref()
29 .unwrap_or(&Status::Pending)
30 .to_string();
31 async move {
32 let tasks = sqlx::query_file_as!(
33 PgTaskRow,
34 "queries/backend/list_jobs.sql",
35 status,
36 queue,
37 limit,
38 offset
39 )
40 .fetch_all(&pool)
41 .await?
42 .into_iter()
43 .map(|r| {
44 let row: TaskRow = r.try_into()?;
45 row.try_into_task_compact()
46 .and_then(|a| {
47 a.try_map(|t| D::decode(&t))
48 .map_err(|e| FromRowError::DecodeError(e.into()))
49 })
50 .map_err(|e| sqlx::Error::Protocol(e.to_string()))
51 })
52 .collect::<Result<Vec<_>, _>>()?;
53 Ok(tasks)
54 }
55 }
56}
57
58impl<Args, D, F> ListAllTasks for PostgresStorage<Args, CompactType, D, F>
59where
60 PostgresStorage<Args, CompactType, D, F>:
61 BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
62{
63 fn list_all_tasks(
64 &self,
65 filter: &Filter,
66 ) -> impl Future<
67 Output = Result<Vec<Task<Self::Compact, Self::Context, Self::IdType>>, Self::Error>,
68 > + Send {
69 let status = filter
70 .status
71 .as_ref()
72 .map(|s| s.to_string())
73 .unwrap_or(Status::Pending.to_string());
74 let pool = self.pool.clone();
75 let limit = filter.limit() as i64;
76 let offset = filter.offset() as i64;
77 async move {
78 let tasks = sqlx::query_file_as!(
79 PgTaskRow,
80 "queries/backend/list_all_jobs.sql",
81 status,
82 limit,
83 offset
84 )
85 .fetch_all(&pool)
86 .await?
87 .into_iter()
88 .map(|r| {
89 let row: TaskRow = r.try_into()?;
90 row.try_into_task_compact()
91 .map_err(|e| sqlx::Error::Protocol(e.to_string()))
92 })
93 .collect::<Result<Vec<_>, _>>()?;
94 Ok(tasks)
95 }
96 }
97}