apalis_redis/queries/
list_queues.rs

1use apalis_core::backend::{BackendExt, ListQueues, QueueInfo, codec::Codec};
2use ulid::Ulid;
3
4use crate::{RedisContext, RedisStorage};
5
6impl<Args, Conn, C> ListQueues for RedisStorage<Args, Conn, C>
7where
8    RedisStorage<Args, Conn, C>: BackendExt<
9            Context = RedisContext,
10            Compact = Vec<u8>,
11            IdType = Ulid,
12            Error = redis::RedisError,
13        >,
14    C: Codec<Args, Compact = Vec<u8>> + Send,
15    C::Error: std::error::Error + Send + Sync + 'static,
16    Args: 'static + Send,
17    Conn: redis::aio::ConnectionLike + Send + Clone,
18{
19    fn list_queues(&self) -> impl Future<Output = Result<Vec<QueueInfo>, Self::Error>> + Send {
20        let mut conn = self.conn.clone();
21
22        async move {
23            let queues = redis::cmd("ZRANGE")
24                .arg("core::apalis::queues::list")
25                .arg(0)
26                .arg(-1)
27                .query_async::<Vec<String>>(&mut conn)
28                .await?
29                .into_iter()
30                .map(|name| QueueInfo {
31                    name,
32                    activity: Vec::new(),
33                    stats: Vec::new(),
34                    workers: Vec::new(),
35                })
36                .collect::<Vec<_>>();
37            // let lua = include_str!("../../lua/overview_by_queue.lua");
38            // let script = Script::new(lua);
39            // let now = chrono::Utc::now().timestamp();
40            // let res = script
41            //     .arg(now)
42            //     .key(queues)
43            //     .invoke_async::<String>(&mut conn)
44            //     .await
45            //     .and_then(|json| {
46            //         dbg!(&json);
47            //         let stats =
48            //             serde_json::from_str(&json).map_err(|e| build_error(&e.to_string()))?;
49            //         Ok(stats)
50            //     })?;
51            Ok(queues)
52        }
53    }
54}