apalis_sqlite/queries/
list_tasks.rs1use apalis_core::{
2 backend::{BackendExt, Filter, ListAllTasks, ListTasks, codec::Codec},
3 task::{Task, status::Status},
4};
5use apalis_sql::from_row::{FromRowError, TaskRow};
6use ulid::Ulid;
7
8use crate::{CompactType, SqliteContext, SqliteStorage, SqliteTask, from_row::SqliteTaskRow};
9
10impl<Args, D, F> ListTasks<Args> for SqliteStorage<Args, D, F>
11where
12 Self: BackendExt<
13 Context = SqliteContext,
14 Compact = CompactType,
15 IdType = Ulid,
16 Error = sqlx::Error,
17 >,
18 D: Codec<Args, Compact = CompactType>,
19 D::Error: std::error::Error + Send + Sync + 'static,
20 Args: 'static,
21{
22 fn list_tasks(
23 &self,
24 queue: &str,
25 filter: &Filter,
26 ) -> impl Future<Output = Result<Vec<SqliteTask<Args>>, Self::Error>> + Send {
27 let queue = queue.to_owned();
28 let pool = self.pool.clone();
29 let limit = filter.limit() as i32;
30 let offset = filter.offset() as i32;
31 let status = filter
32 .status
33 .as_ref()
34 .unwrap_or(&Status::Pending)
35 .to_string();
36 async move {
37 let tasks = sqlx::query_file_as!(
38 SqliteTaskRow,
39 "queries/backend/list_jobs.sql",
40 status,
41 queue,
42 limit,
43 offset
44 )
45 .fetch_all(&pool)
46 .await?
47 .into_iter()
48 .map(|r| {
49 let row: TaskRow = r
50 .try_into()
51 .map_err(|e: sqlx::Error| FromRowError::DecodeError(e.into()))?;
52 row.try_into_task_compact().and_then(|t| {
53 t.try_map(|a| D::decode(&a).map_err(|e| FromRowError::DecodeError(e.into())))
54 })
55 })
56 .collect::<Result<Vec<_>, _>>()
57 .map_err(|e| sqlx::Error::Decode(e.into()))?;
58 Ok(tasks)
59 }
60 }
61}
62
63impl<Args, D, F> ListAllTasks for SqliteStorage<Args, D, F>
64where
65 Self: BackendExt<
66 Context = SqliteContext,
67 Compact = CompactType,
68 IdType = Ulid,
69 Error = sqlx::Error,
70 >,
71{
72 fn list_all_tasks(
73 &self,
74 filter: &Filter,
75 ) -> impl Future<
76 Output = Result<Vec<Task<Self::Compact, Self::Context, Self::IdType>>, Self::Error>,
77 > + Send {
78 let status = filter
79 .status
80 .as_ref()
81 .map(|s| s.to_string())
82 .unwrap_or(Status::Pending.to_string());
83 let pool = self.pool.clone();
84 let limit = filter.limit() as i32;
85 let offset = filter.offset() as i32;
86 async move {
87 let tasks = sqlx::query_file_as!(
88 SqliteTaskRow,
89 "queries/backend/list_all_jobs.sql",
90 status,
91 limit,
92 offset
93 )
94 .fetch_all(&pool)
95 .await?
96 .into_iter()
97 .map(|r| {
98 let row: TaskRow = r.try_into()?;
99 row.try_into_task_compact()
100 .map_err(|e| sqlx::Error::Protocol(e.to_string()))
101 })
102 .collect::<Result<Vec<_>, _>>()?;
103 Ok(tasks)
104 }
105 }
106}