apalis_sqlite/queries/
list_tasks.rs1use apalis_core::{
2 backend::{BackendExt, Filter, ListAllTasks, ListTasks, codec::Codec},
3 task::{Task, status::Status},
4};
5use apalis_sql::from_row::{FromRowError, TaskRow};
6use ulid::Ulid;
7
8use crate::{CompactType, SqlContext, SqliteStorage, SqliteTask, from_row::SqliteTaskRow};
9
10impl<Args, D, F> ListTasks<Args> for SqliteStorage<Args, D, F>
11where
12 Self:
13 BackendExt<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
14 D: Codec<Args, Compact = CompactType>,
15 D::Error: std::error::Error + Send + Sync + 'static,
16 Args: 'static,
17{
18 fn list_tasks(
19 &self,
20 queue: &str,
21 filter: &Filter,
22 ) -> impl Future<Output = Result<Vec<SqliteTask<Args>>, Self::Error>> + Send {
23 let queue = queue.to_owned();
24 let pool = self.pool.clone();
25 let limit = filter.limit() as i32;
26 let offset = filter.offset() as i32;
27 let status = filter
28 .status
29 .as_ref()
30 .unwrap_or(&Status::Pending)
31 .to_string();
32 async move {
33 let tasks = sqlx::query_file_as!(
34 SqliteTaskRow,
35 "queries/backend/list_jobs.sql",
36 status,
37 queue,
38 limit,
39 offset
40 )
41 .fetch_all(&pool)
42 .await?
43 .into_iter()
44 .map(|r| {
45 let row: TaskRow = r
46 .try_into()
47 .map_err(|e: sqlx::Error| FromRowError::DecodeError(e.into()))?;
48 row.try_into_task_compact().and_then(|t| {
49 t.try_map(|a| D::decode(&a).map_err(|e| FromRowError::DecodeError(e.into())))
50 })
51 })
52 .collect::<Result<Vec<_>, _>>()
53 .map_err(|e| sqlx::Error::Decode(e.into()))?;
54 Ok(tasks)
55 }
56 }
57}
58
59impl<Args, D, F> ListAllTasks for SqliteStorage<Args, D, F>
60where
61 Self:
62 BackendExt<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
63{
64 fn list_all_tasks(
65 &self,
66 filter: &Filter,
67 ) -> impl Future<
68 Output = Result<Vec<Task<Self::Compact, Self::Context, Self::IdType>>, Self::Error>,
69 > + Send {
70 let status = filter
71 .status
72 .as_ref()
73 .map(|s| s.to_string())
74 .unwrap_or(Status::Pending.to_string());
75 let pool = self.pool.clone();
76 let limit = filter.limit() as i32;
77 let offset = filter.offset() as i32;
78 async move {
79 let tasks = sqlx::query_file_as!(
80 SqliteTaskRow,
81 "queries/backend/list_all_jobs.sql",
82 status,
83 limit,
84 offset
85 )
86 .fetch_all(&pool)
87 .await?
88 .into_iter()
89 .map(|r| {
90 let row: TaskRow = r.try_into()?;
91 row.try_into_task_compact()
92 .map_err(|e| sqlx::Error::Protocol(e.to_string()))
93 })
94 .collect::<Result<Vec<_>, _>>()?;
95 Ok(tasks)
96 }
97 }
98}