apalis_redis/queries/
metrics.rs1use apalis_core::backend::{BackendExt, Metrics, Statistic, codec::Codec};
2use redis::Script;
3use ulid::Ulid;
4
5use crate::{RedisContext, RedisStorage, build_error};
6
7impl<Args, Conn, C> Metrics for RedisStorage<Args, Conn, C>
8where
9 RedisStorage<Args, Conn, C>: BackendExt<
10 Context = RedisContext,
11 Compact = Vec<u8>,
12 IdType = Ulid,
13 Error = redis::RedisError,
14 >,
15 C: Codec<Args, Compact = Vec<u8>> + Send + Sync,
16 C::Error: std::error::Error + Send + Sync + 'static,
17 Args: 'static + Send + Sync,
18 Conn: redis::aio::ConnectionLike + Send + Clone + Sync,
19{
20 fn global(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
21 let mut conn = self.conn.clone();
22
23 async move {
24 let queues = redis::cmd("ZRANGE")
25 .arg("core::apalis::queues")
26 .arg(0)
27 .arg(-1)
28 .query_async::<Vec<String>>(&mut conn)
29 .await?;
30 let lua = include_str!("../../lua/overview.lua");
31 let script = Script::new(lua);
32 let now = chrono::Utc::now().timestamp();
33 let mut script = &mut script.arg(now);
34 for queue in queues {
35 script = script.key(queue);
36 }
37 let res = script
38 .invoke_async::<String>(&mut conn)
39 .await
40 .and_then(|json| {
41 let stats: Vec<Statistic> =
42 serde_json::from_str(&json).map_err(|e| build_error(&e.to_string()))?;
43 Ok(stats)
44 })?;
45
46 Ok(res)
47 }
48 }
49 fn fetch_by_queue(
50 &self,
51 queue_id: &str,
52 ) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
53 let mut conn = self.conn.clone();
54
55 let queue_name = queue_id.to_string();
56 async move {
57 let lua = include_str!("../../lua/overview_by_queue.lua");
58 let script = Script::new(lua);
59
60 let active = format!("{}:active", queue_name);
61 let done = format!("{}:done", queue_name);
62 let dead = format!("{}:dead", queue_name);
63 let inflight = format!("{}:inflight", queue_name);
64
65 let json: String = script
67 .key(active)
68 .key(done)
69 .key(dead)
70 .key(inflight)
71 .invoke_async(&mut conn)
72 .await?;
73
74 let stats: Vec<Statistic> =
75 serde_json::from_str(&json).map_err(|e| build_error(&e.to_string()))?;
76
77 Ok(stats)
78 }
79 }
80}