apalis_redis/queries/
list_tasks.rs1use apalis_core::backend::{BackendExt, Filter, ListAllTasks, ListTasks, codec::Codec};
2use redis::{Script, Value};
3use ulid::Ulid;
4
5use crate::{RedisContext, RedisStorage, RedisTask, fetcher::deserialize_with_meta};
6
7impl<Args, Conn, C> ListTasks<Args> for RedisStorage<Args, Conn, C>
8where
9 RedisStorage<Args, Conn, C>: BackendExt<
10 Context = RedisContext,
11 Compact = Vec<u8>,
12 IdType = Ulid,
13 Error = redis::RedisError,
14 >,
15 C: Codec<Args, Compact = Vec<u8>> + Send + Sync,
16 C::Error: std::error::Error + Send + Sync + 'static,
17 Args: 'static + Send + Sync,
18 Conn: redis::aio::ConnectionLike + Send + Clone + Sync,
19{
20 async fn list_tasks(
21 &self,
22 queue: &str,
23 filter: &Filter,
24 ) -> Result<Vec<RedisTask<Args>>, Self::Error> {
25 let script = Script::new(include_str!("../../lua/list_tasks.lua"));
26 let mut conn = self.conn.clone();
27 let status_str = filter
28 .status
29 .as_ref()
30 .map(|s| s.to_string())
31 .unwrap_or_default();
32 let page = filter.page;
33 let page_size = filter.page_size.unwrap_or(10);
34
35 let result: Value = script
36 .key(self.config.job_data_hash())
37 .key(self.config.job_meta_hash())
38 .key(queue)
39 .arg(status_str)
40 .arg(page.to_string())
41 .arg(page_size.to_string())
42 .invoke_async(&mut conn)
43 .await?;
44
45 if let Value::Array(arr) = &result {
46 deserialize_with_meta(arr)
47 .map(|tasks| {
48 tasks
49 .into_iter()
50 .map(|t| t.into_full_task::<Args, C>())
51 .collect::<Result<Vec<RedisTask<Args>>, _>>()
52 })
53 .and_then(|s| s)
54 } else {
55 Ok(vec![])
56 }
57 }
58}
59
60impl<Args, Conn, C> ListAllTasks for RedisStorage<Args, Conn, C>
61where
62 RedisStorage<Args, Conn, C>: BackendExt<
63 Context = RedisContext,
64 Compact = Vec<u8>,
65 IdType = Ulid,
66 Error = redis::RedisError,
67 >,
68 C: Codec<Args, Compact = Vec<u8>> + Send + Sync,
69 C::Error: std::error::Error + Send + Sync + 'static,
70 Args: 'static + Send + Sync,
71 Conn: redis::aio::ConnectionLike + Send + Sync + Clone,
72{
73 async fn list_all_tasks(
74 &self,
75 filter: &Filter,
76 ) -> Result<Vec<RedisTask<Vec<u8>>>, Self::Error> {
77 let mut conn = self.conn.clone();
78 let script = Script::new(include_str!("../../lua/list_all_tasks.lua"));
79 let status_str = filter
80 .status
81 .as_ref()
82 .map(|s| s.to_string())
83 .unwrap_or_default();
84 let page = filter.page;
85 let page_size = filter.page_size.unwrap_or(10);
86
87 let result: Value = script
88 .key(self.config.job_data_hash())
89 .key(self.config.job_meta_hash())
90 .arg(status_str)
91 .arg(page.to_string())
92 .arg(page_size.to_string())
93 .invoke_async(&mut conn)
94 .await?;
95
96 if let Value::Array(arr) = result {
97 deserialize_with_meta(&arr)
98 .map(|tasks| {
99 tasks
100 .into_iter()
101 .map(|t| t.into_full_compact())
102 .collect::<Result<Vec<RedisTask<Vec<u8>>>, _>>()
103 })
104 .and_then(|s| s)
105 } else {
106 Ok(vec![])
107 }
108 }
109}