apalis_redis/queries/
list_queues.rs

1use apalis_core::backend::{Backend, ListQueues, QueueInfo, codec::Codec};
2use ulid::Ulid;
3
4use crate::{RedisContext, RedisStorage};
5
6impl<Args, Conn, C> ListQueues for RedisStorage<Args, Conn, C>
7where
8    RedisStorage<Args, Conn, C>: Backend<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = redis::RedisError>,
9    C: Codec<Args, Compact = Vec<u8>> + Send,
10    C::Error: std::error::Error + Send + Sync + 'static,
11    Args: 'static + Send,
12    Conn: redis::aio::ConnectionLike + Send + Clone,
13{
14    fn list_queues(&self) -> impl Future<Output = Result<Vec<QueueInfo>, Self::Error>> + Send {
15        let mut conn = self.conn.clone();
16
17        async move {
18            let queues = redis::cmd("ZRANGE")
19                .arg("core::apalis::queues::list")
20                .arg(0)
21                .arg(-1)
22                .query_async::<Vec<String>>(&mut conn)
23                .await?
24                .into_iter()
25                .map(|name| QueueInfo {
26                    name,
27                    activity: Vec::new(),
28                    stats: Vec::new(),
29                    workers: Vec::new(),
30                })
31                .collect::<Vec<_>>();
32            // let lua = include_str!("../../lua/overview_by_queue.lua");
33            // let script = Script::new(lua);
34            // let now = chrono::Utc::now().timestamp();
35            // let res = script
36            //     .arg(now)
37            //     .key(queues)
38            //     .invoke_async::<String>(&mut conn)
39            //     .await
40            //     .and_then(|json| {
41            //         dbg!(&json);
42            //         let stats =
43            //             serde_json::from_str(&json).map_err(|e| build_error(&e.to_string()))?;
44            //         Ok(stats)
45            //     })?;
46            Ok(queues)
47        }
48    }
49}