apalis_redis/queries/
list_workers.rs

1use apalis_core::backend::{Backend, ListWorkers, RunningWorker, codec::Codec};
2use redis::Script;
3use ulid::Ulid;
4
5use crate::{RedisContext, RedisStorage};
6
7impl<Args: Sync, Conn, C> ListWorkers for RedisStorage<Args, Conn, C>
8where
9    RedisStorage<Args, Conn, C>: Backend<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = redis::RedisError>,
10    C: Codec<Args, Compact = Vec<u8>> + Send,
11    C::Error: std::error::Error + Send + Sync + 'static,
12    Args: 'static + Send,
13    Conn: redis::aio::ConnectionLike + Send + Clone,
14{
15    fn list_workers(
16        &self,
17        queue: &str,
18    ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
19        let queue = queue.to_string();
20        let mut conn = self.conn.clone();
21        async move {
22            let json: String = Script::new(include_str!("../../lua/list_workers.lua"))
23                .key(format!("{}:workers", queue))
24                .key("core::apalis::workers:metadata::")
25                .invoke_async(&mut conn)
26                .await?;
27            dbg!(&json);
28            let workers: Vec<RunningWorker> = serde_json::from_str(&json).map_err(|e| {
29                redis::RedisError::from((
30                    redis::ErrorKind::TypeError,
31                    "invalid JSON",
32                    e.to_string(),
33                ))
34            })?;
35
36            Ok(workers)
37        }
38    }
39
40    fn list_all_workers(
41        &self,
42    ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
43        let mut conn = self.conn.clone();
44        async move {
45            let queues = redis::cmd("ZRANGE")
46                .arg("core::apalis::queues::list")
47                .arg(0)
48                .arg(-1)
49                .query_async::<Vec<String>>(&mut conn)
50                .await?;
51
52            let script = Script::new(include_str!("../../lua/list_all_workers.lua"));
53            let mut script = script.key(queues);
54            let script = script.arg("core::apalis::workers:metadata::");
55
56            let json: String = script.invoke_async(&mut conn).await?;
57
58            let workers: Vec<RunningWorker> = serde_json::from_str(&json).map_err(|e| {
59                redis::RedisError::from((
60                    redis::ErrorKind::TypeError,
61                    "invalid JSON",
62                    e.to_string(),
63                ))
64            })?;
65
66            Ok(workers)
67        }
68    }
69}