apalis_redis/queries/
metrics.rs1use apalis_core::backend::{Backend, Metrics, Statistic, codec::Codec};
2use redis::Script;
3use ulid::Ulid;
4
5use crate::{RedisContext, RedisStorage, build_error};
6
7impl<Args, Conn, C> Metrics for RedisStorage<Args, Conn, C>
8where
9 RedisStorage<Args, Conn, C>: Backend<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = redis::RedisError>,
10 C: Codec<Args, Compact = Vec<u8>> + Send + Sync,
11 C::Error: std::error::Error + Send + Sync + 'static,
12 Args: 'static + Send + Sync,
13 Conn: redis::aio::ConnectionLike + Send + Clone + Sync,
14{
15 fn global(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
16 let mut conn = self.conn.clone();
17
18 async move {
19 let queues = redis::cmd("ZRANGE")
20 .arg("core::apalis::queues")
21 .arg(0)
22 .arg(-1)
23 .query_async::<Vec<String>>(&mut conn)
24 .await?;
25 let lua = include_str!("../../lua/overview.lua");
26 let script = Script::new(lua);
27 let now = chrono::Utc::now().timestamp();
28 let mut script = &mut script.arg(now);
29 for queue in queues {
30 script = script.key(queue);
31 }
32 let res = script
33 .invoke_async::<String>(&mut conn)
34 .await
35 .and_then(|json| {
36 let stats: Vec<Statistic> =
37 serde_json::from_str(&json).map_err(|e| build_error(&e.to_string()))?;
38 Ok(stats)
39 })?;
40
41 Ok(res)
42 }
43 }
44 fn fetch_by_queue(
45 &self,
46 queue_id: &str,
47 ) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
48 let mut conn = self.conn.clone();
49
50 let queue_name = queue_id.to_string();
51 async move {
52 let lua = include_str!("../../lua/overview_by_queue.lua");
53 let script = Script::new(lua);
54
55 let active = format!("{}:active", queue_name);
56 let done = format!("{}:done", queue_name);
57 let dead = format!("{}:dead", queue_name);
58 let inflight = format!("{}:inflight", queue_name);
59
60 let json: String = script
62 .key(active)
63 .key(done)
64 .key(dead)
65 .key(inflight)
66 .invoke_async(&mut conn)
67 .await?;
68
69 let stats: Vec<Statistic> =
70 serde_json::from_str(&json).map_err(|e| build_error(&e.to_string()))?;
71
72 Ok(stats)
73 }
74 }
75}