apalis_redis/queries/
list_tasks.rs

1use apalis_core::backend::{BackendExt, Filter, ListAllTasks, ListTasks, codec::Codec};
2use redis::{Script, Value};
3use ulid::Ulid;
4
5use crate::{RedisContext, RedisStorage, RedisTask, fetcher::deserialize_with_meta};
6
7impl<Args, Conn, C> ListTasks<Args> for RedisStorage<Args, Conn, C>
8where
9    RedisStorage<Args, Conn, C>: BackendExt<
10            Context = RedisContext,
11            Compact = Vec<u8>,
12            IdType = Ulid,
13            Error = redis::RedisError,
14        >,
15    C: Codec<Args, Compact = Vec<u8>> + Send + Sync,
16    C::Error: std::error::Error + Send + Sync + 'static,
17    Args: 'static + Send + Sync,
18    Conn: redis::aio::ConnectionLike + Send + Clone + Sync,
19{
20    async fn list_tasks(
21        &self,
22        queue: &str,
23        filter: &Filter,
24    ) -> Result<Vec<RedisTask<Args>>, Self::Error> {
25        let script = Script::new(include_str!("../../lua/list_tasks.lua"));
26        let mut conn = self.conn.clone();
27        let status_str = filter
28            .status
29            .as_ref()
30            .map(|s| s.to_string())
31            .unwrap_or_default();
32        let page = filter.page;
33        let page_size = filter.page_size.unwrap_or(10);
34
35        let result: Value = script
36            .key(self.config.job_data_hash())
37            .key(self.config.job_meta_hash())
38            .key(queue)
39            .arg(status_str)
40            .arg(page.to_string())
41            .arg(page_size.to_string())
42            .invoke_async(&mut conn)
43            .await?;
44
45        if let Value::Array(arr) = &result {
46            deserialize_with_meta(arr)
47                .map(|tasks| {
48                    tasks
49                        .into_iter()
50                        .map(|t| t.into_full_task::<Args, C>())
51                        .collect::<Result<Vec<RedisTask<Args>>, _>>()
52                })
53                .and_then(|s| s)
54        } else {
55            Ok(vec![])
56        }
57    }
58}
59
60impl<Args, Conn, C> ListAllTasks for RedisStorage<Args, Conn, C>
61where
62    RedisStorage<Args, Conn, C>: BackendExt<
63            Context = RedisContext,
64            Compact = Vec<u8>,
65            IdType = Ulid,
66            Error = redis::RedisError,
67        >,
68    C: Codec<Args, Compact = Vec<u8>> + Send + Sync,
69    C::Error: std::error::Error + Send + Sync + 'static,
70    Args: 'static + Send + Sync,
71    Conn: redis::aio::ConnectionLike + Send + Sync + Clone,
72{
73    async fn list_all_tasks(
74        &self,
75        filter: &Filter,
76    ) -> Result<Vec<RedisTask<Vec<u8>>>, Self::Error> {
77        let mut conn = self.conn.clone();
78        let script = Script::new(include_str!("../../lua/list_all_tasks.lua"));
79        let status_str = filter
80            .status
81            .as_ref()
82            .map(|s| s.to_string())
83            .unwrap_or_default();
84        let page = filter.page;
85        let page_size = filter.page_size.unwrap_or(10);
86
87        let result: Value = script
88            .key(self.config.job_data_hash())
89            .key(self.config.job_meta_hash())
90            .arg(status_str)
91            .arg(page.to_string())
92            .arg(page_size.to_string())
93            .invoke_async(&mut conn)
94            .await?;
95
96        if let Value::Array(arr) = result {
97            deserialize_with_meta(&arr)
98                .map(|tasks| {
99                    tasks
100                        .into_iter()
101                        .map(|t| t.into_full_compact())
102                        .collect::<Result<Vec<RedisTask<Vec<u8>>>, _>>()
103                })
104                .and_then(|s| s)
105        } else {
106            Ok(vec![])
107        }
108    }
109}