apalis_redis/queries/
list_workers.rs

1use apalis_core::backend::{BackendExt, ListWorkers, RunningWorker, codec::Codec};
2use redis::Script;
3use ulid::Ulid;
4
5use crate::{RedisContext, RedisStorage};
6
7impl<Args: Sync, Conn, C> ListWorkers for RedisStorage<Args, Conn, C>
8where
9    RedisStorage<Args, Conn, C>: BackendExt<
10            Context = RedisContext,
11            Compact = Vec<u8>,
12            IdType = Ulid,
13            Error = redis::RedisError,
14        >,
15    C: Codec<Args, Compact = Vec<u8>> + Send,
16    C::Error: std::error::Error + Send + Sync + 'static,
17    Args: 'static + Send,
18    Conn: redis::aio::ConnectionLike + Send + Clone,
19{
20    fn list_workers(
21        &self,
22        queue: &str,
23    ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
24        let queue = queue.to_string();
25        let mut conn = self.conn.clone();
26        async move {
27            let json: String = Script::new(include_str!("../../lua/list_workers.lua"))
28                .key(format!("{}:workers", queue))
29                .key("core::apalis::workers:metadata::")
30                .invoke_async(&mut conn)
31                .await?;
32            dbg!(&json);
33            let workers: Vec<RunningWorker> = serde_json::from_str(&json).map_err(|e| {
34                redis::RedisError::from((
35                    redis::ErrorKind::TypeError,
36                    "invalid JSON",
37                    e.to_string(),
38                ))
39            })?;
40
41            Ok(workers)
42        }
43    }
44
45    fn list_all_workers(
46        &self,
47    ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
48        let mut conn = self.conn.clone();
49        async move {
50            let queues = redis::cmd("ZRANGE")
51                .arg("core::apalis::queues::list")
52                .arg(0)
53                .arg(-1)
54                .query_async::<Vec<String>>(&mut conn)
55                .await?;
56
57            let script = Script::new(include_str!("../../lua/list_all_workers.lua"));
58            let mut script = script.key(queues);
59            let script = script.arg("core::apalis::workers:metadata::");
60
61            let json: String = script.invoke_async(&mut conn).await?;
62
63            let workers: Vec<RunningWorker> = serde_json::from_str(&json).map_err(|e| {
64                redis::RedisError::from((
65                    redis::ErrorKind::TypeError,
66                    "invalid JSON",
67                    e.to_string(),
68                ))
69            })?;
70
71            Ok(workers)
72        }
73    }
74}