apalis_redis/queries/
list_tasks.rs1use apalis_core::backend::{Backend, Filter, ListAllTasks, ListTasks, codec::Codec};
2use redis::{Script, Value};
3use ulid::Ulid;
4
5use crate::{RedisContext, RedisStorage, RedisTask, fetcher::deserialize_with_meta};
6
7impl<Args, Conn, C> ListTasks<Args> for RedisStorage<Args, Conn, C>
8where
9 RedisStorage<Args, Conn, C>: Backend<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = redis::RedisError>,
10 C: Codec<Args, Compact = Vec<u8>> + Send + Sync,
11 C::Error: std::error::Error + Send + Sync + 'static,
12 Args: 'static + Send + Sync,
13 Conn: redis::aio::ConnectionLike + Send + Clone + Sync,
14{
15 async fn list_tasks(
16 &self,
17 queue: &str,
18 filter: &Filter,
19 ) -> Result<Vec<RedisTask<Args>>, Self::Error> {
20 let script = Script::new(include_str!("../../lua/list_tasks.lua"));
21 let mut conn = self.conn.clone();
22 let status_str = filter
23 .status
24 .as_ref()
25 .map(|s| s.to_string())
26 .unwrap_or_default();
27 let page = filter.page;
28 let page_size = filter.page_size.unwrap_or(10);
29
30 let result: Value = script
31 .key(self.config.job_data_hash())
32 .key(self.config.job_meta_hash())
33 .key(queue)
34 .arg(status_str)
35 .arg(page.to_string())
36 .arg(page_size.to_string())
37 .invoke_async(&mut conn)
38 .await?;
39
40 if let Value::Array(arr) = &result {
41 deserialize_with_meta(&arr)
42 .map(|tasks| {
43 tasks
44 .into_iter()
45 .map(|t| t.into_full_task::<Args, C>())
46 .collect::<Result<Vec<RedisTask<Args>>, _>>()
47 })
48 .and_then(|s| Ok(s?))
49 } else {
50 Ok(vec![])
51 }
52 }
53}
54
55impl<Args, Conn, C> ListAllTasks for RedisStorage<Args, Conn, C>
56where
57 RedisStorage<Args, Conn, C>: Backend<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = redis::RedisError>,
58 C: Codec<Args, Compact = Vec<u8>> + Send + Sync,
59 C::Error: std::error::Error + Send + Sync + 'static,
60 Args: 'static + Send + Sync,
61 Conn: redis::aio::ConnectionLike + Send + Sync + Clone,
62{
63 async fn list_all_tasks(
64 &self,
65 filter: &Filter,
66 ) -> Result<Vec<RedisTask<Vec<u8>>>, Self::Error> {
67 let mut conn = self.conn.clone();
68 let script = Script::new(include_str!("../../lua/list_all_tasks.lua"));
69 let status_str = filter
70 .status
71 .as_ref()
72 .map(|s| s.to_string())
73 .unwrap_or_default();
74 let page = filter.page;
75 let page_size = filter.page_size.unwrap_or(10);
76
77 let result: Value = script
78 .key(self.config.job_data_hash())
79 .key(self.config.job_meta_hash())
80 .arg(status_str)
81 .arg(page.to_string())
82 .arg(page_size.to_string())
83 .invoke_async(&mut conn)
84 .await?;
85
86 if let Value::Array(arr) = result {
87 deserialize_with_meta(&arr)
88 .map(|tasks| {
89 tasks
90 .into_iter()
91 .map(|t| t.into_full_compact())
92 .collect::<Result<Vec<RedisTask<Vec<u8>>>, _>>()
93 })
94 .and_then(|s| Ok(s?))
95 } else {
96 Ok(vec![])
97 }
98 }
99}