apalis_redis/queries/
list_tasks.rs

1use apalis_core::backend::{Backend, Filter, ListAllTasks, ListTasks, codec::Codec};
2use redis::{Script, Value};
3use ulid::Ulid;
4
5use crate::{RedisContext, RedisStorage, RedisTask, fetcher::deserialize_with_meta};
6
7impl<Args, Conn, C> ListTasks<Args> for RedisStorage<Args, Conn, C>
8where
9    RedisStorage<Args, Conn, C>: Backend<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = redis::RedisError>,
10    C: Codec<Args, Compact = Vec<u8>> + Send + Sync,
11    C::Error: std::error::Error + Send + Sync + 'static,
12    Args: 'static + Send + Sync,
13    Conn: redis::aio::ConnectionLike + Send + Clone + Sync,
14{
15    async fn list_tasks(
16        &self,
17        queue: &str,
18        filter: &Filter,
19    ) -> Result<Vec<RedisTask<Args>>, Self::Error> {
20        let script = Script::new(include_str!("../../lua/list_tasks.lua"));
21        let mut conn = self.conn.clone();
22        let status_str = filter
23            .status
24            .as_ref()
25            .map(|s| s.to_string())
26            .unwrap_or_default();
27        let page = filter.page;
28        let page_size = filter.page_size.unwrap_or(10);
29
30        let result: Value = script
31            .key(self.config.job_data_hash())
32            .key(self.config.job_meta_hash())
33            .key(queue)
34            .arg(status_str)
35            .arg(page.to_string())
36            .arg(page_size.to_string())
37            .invoke_async(&mut conn)
38            .await?;
39
40        if let Value::Array(arr) = &result {
41            deserialize_with_meta(&arr)
42                .map(|tasks| {
43                    tasks
44                        .into_iter()
45                        .map(|t| t.into_full_task::<Args, C>())
46                        .collect::<Result<Vec<RedisTask<Args>>, _>>()
47                })
48                .and_then(|s| Ok(s?))
49        } else {
50            Ok(vec![])
51        }
52    }
53}
54
55impl<Args, Conn, C> ListAllTasks for RedisStorage<Args, Conn, C>
56where
57    RedisStorage<Args, Conn, C>: Backend<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = redis::RedisError>,
58    C: Codec<Args, Compact = Vec<u8>> + Send + Sync,
59    C::Error: std::error::Error + Send + Sync + 'static,
60    Args: 'static + Send + Sync,
61    Conn: redis::aio::ConnectionLike + Send + Sync + Clone,
62{
63    async fn list_all_tasks(
64        &self,
65        filter: &Filter,
66    ) -> Result<Vec<RedisTask<Vec<u8>>>, Self::Error> {
67        let mut conn = self.conn.clone();
68        let script = Script::new(include_str!("../../lua/list_all_tasks.lua"));
69        let status_str = filter
70            .status
71            .as_ref()
72            .map(|s| s.to_string())
73            .unwrap_or_default();
74        let page = filter.page;
75        let page_size = filter.page_size.unwrap_or(10);
76
77        let result: Value = script
78            .key(self.config.job_data_hash())
79            .key(self.config.job_meta_hash())
80            .arg(status_str)
81            .arg(page.to_string())
82            .arg(page_size.to_string())
83            .invoke_async(&mut conn)
84            .await?;
85
86        if let Value::Array(arr) = result {
87            deserialize_with_meta(&arr)
88                .map(|tasks| {
89                    tasks
90                        .into_iter()
91                        .map(|t| t.into_full_compact())
92                        .collect::<Result<Vec<RedisTask<Vec<u8>>>, _>>()
93                })
94                .and_then(|s| Ok(s?))
95        } else {
96            Ok(vec![])
97        }
98    }
99}