apalis_redis/queries/
list_workers.rs1use apalis_core::backend::{Backend, ListWorkers, RunningWorker, codec::Codec};
2use redis::Script;
3use ulid::Ulid;
4
5use crate::{RedisContext, RedisStorage};
6
7impl<Args: Sync, Conn, C> ListWorkers for RedisStorage<Args, Conn, C>
8where
9 RedisStorage<Args, Conn, C>: Backend<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = redis::RedisError>,
10 C: Codec<Args, Compact = Vec<u8>> + Send,
11 C::Error: std::error::Error + Send + Sync + 'static,
12 Args: 'static + Send,
13 Conn: redis::aio::ConnectionLike + Send + Clone,
14{
15 fn list_workers(
16 &self,
17 queue: &str,
18 ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
19 let queue = queue.to_string();
20 let mut conn = self.conn.clone();
21 async move {
22 let json: String = Script::new(include_str!("../../lua/list_workers.lua"))
23 .key(format!("{}:workers", queue))
24 .key("core::apalis::workers:metadata::")
25 .invoke_async(&mut conn)
26 .await?;
27 dbg!(&json);
28 let workers: Vec<RunningWorker> = serde_json::from_str(&json).map_err(|e| {
29 redis::RedisError::from((
30 redis::ErrorKind::TypeError,
31 "invalid JSON",
32 e.to_string(),
33 ))
34 })?;
35
36 Ok(workers)
37 }
38 }
39
40 fn list_all_workers(
41 &self,
42 ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
43 let mut conn = self.conn.clone();
44 async move {
45 let queues = redis::cmd("ZRANGE")
46 .arg("core::apalis::queues::list")
47 .arg(0)
48 .arg(-1)
49 .query_async::<Vec<String>>(&mut conn)
50 .await?;
51
52 let script = Script::new(include_str!("../../lua/list_all_workers.lua"));
53 let mut script = script.key(queues);
54 let script = script.arg("core::apalis::workers:metadata::");
55
56 let json: String = script.invoke_async(&mut conn).await?;
57
58 let workers: Vec<RunningWorker> = serde_json::from_str(&json).map_err(|e| {
59 redis::RedisError::from((
60 redis::ErrorKind::TypeError,
61 "invalid JSON",
62 e.to_string(),
63 ))
64 })?;
65
66 Ok(workers)
67 }
68 }
69}