apalis_redis/queries/
list_workers.rs1use apalis_core::backend::{BackendExt, ListWorkers, RunningWorker, codec::Codec};
2use redis::Script;
3use ulid::Ulid;
4
5use crate::{RedisContext, RedisStorage};
6
7impl<Args: Sync, Conn, C> ListWorkers for RedisStorage<Args, Conn, C>
8where
9 RedisStorage<Args, Conn, C>: BackendExt<
10 Context = RedisContext,
11 Compact = Vec<u8>,
12 IdType = Ulid,
13 Error = redis::RedisError,
14 >,
15 C: Codec<Args, Compact = Vec<u8>> + Send,
16 C::Error: std::error::Error + Send + Sync + 'static,
17 Args: 'static + Send,
18 Conn: redis::aio::ConnectionLike + Send + Clone,
19{
20 fn list_workers(
21 &self,
22 queue: &str,
23 ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
24 let queue = queue.to_string();
25 let mut conn = self.conn.clone();
26 async move {
27 let json: String = Script::new(include_str!("../../lua/list_workers.lua"))
28 .key(format!("{}:workers", queue))
29 .key("core::apalis::workers:metadata::")
30 .invoke_async(&mut conn)
31 .await?;
32 dbg!(&json);
33 let workers: Vec<RunningWorker> = serde_json::from_str(&json).map_err(|e| {
34 redis::RedisError::from((
35 redis::ErrorKind::TypeError,
36 "invalid JSON",
37 e.to_string(),
38 ))
39 })?;
40
41 Ok(workers)
42 }
43 }
44
45 fn list_all_workers(
46 &self,
47 ) -> impl Future<Output = Result<Vec<RunningWorker>, Self::Error>> + Send {
48 let mut conn = self.conn.clone();
49 async move {
50 let queues = redis::cmd("ZRANGE")
51 .arg("core::apalis::queues::list")
52 .arg(0)
53 .arg(-1)
54 .query_async::<Vec<String>>(&mut conn)
55 .await?;
56
57 let script = Script::new(include_str!("../../lua/list_all_workers.lua"));
58 let mut script = script.key(queues);
59 let script = script.arg("core::apalis::workers:metadata::");
60
61 let json: String = script.invoke_async(&mut conn).await?;
62
63 let workers: Vec<RunningWorker> = serde_json::from_str(&json).map_err(|e| {
64 redis::RedisError::from((
65 redis::ErrorKind::TypeError,
66 "invalid JSON",
67 e.to_string(),
68 ))
69 })?;
70
71 Ok(workers)
72 }
73 }
74}