apalis_redis/queries/
metrics.rs

1use apalis_core::backend::{Backend, Metrics, Statistic, codec::Codec};
2use redis::Script;
3use ulid::Ulid;
4
5use crate::{RedisContext, RedisStorage, build_error};
6
7impl<Args, Conn, C> Metrics for RedisStorage<Args, Conn, C>
8where
9    RedisStorage<Args, Conn, C>: Backend<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = redis::RedisError>,
10    C: Codec<Args, Compact = Vec<u8>> + Send + Sync,
11    C::Error: std::error::Error + Send + Sync + 'static,
12    Args: 'static + Send + Sync,
13    Conn: redis::aio::ConnectionLike + Send + Clone + Sync,
14{
15    fn global(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
16        let mut conn = self.conn.clone();
17
18        async move {
19            let queues = redis::cmd("ZRANGE")
20                .arg("core::apalis::queues")
21                .arg(0)
22                .arg(-1)
23                .query_async::<Vec<String>>(&mut conn)
24                .await?;
25            let lua = include_str!("../../lua/overview.lua");
26            let script = Script::new(lua);
27            let now = chrono::Utc::now().timestamp();
28            let mut script = &mut script.arg(now);
29            for queue in queues {
30                script = script.key(queue);
31            }
32            let res = script
33                .invoke_async::<String>(&mut conn)
34                .await
35                .and_then(|json| {
36                    let stats: Vec<Statistic> =
37                        serde_json::from_str(&json).map_err(|e| build_error(&e.to_string()))?;
38                    Ok(stats)
39                })?;
40
41            Ok(res)
42        }
43    }
44    fn fetch_by_queue(
45        &self,
46        queue_id: &str,
47    ) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
48        let mut conn = self.conn.clone();
49
50        let queue_name = queue_id.to_string();
51        async move {
52            let lua = include_str!("../../lua/overview_by_queue.lua");
53            let script = Script::new(lua);
54
55            let active = format!("{}:active", queue_name);
56            let done = format!("{}:done", queue_name);
57            let dead = format!("{}:dead", queue_name);
58            let inflight = format!("{}:inflight", queue_name);
59
60            // Execute the Lua script with 4 keys
61            let json: String = script
62                .key(active)
63                .key(done)
64                .key(dead)
65                .key(inflight)
66                .invoke_async(&mut conn)
67                .await?;
68
69            let stats: Vec<Statistic> =
70                serde_json::from_str(&json).map_err(|e| build_error(&e.to_string()))?;
71
72            Ok(stats)
73        }
74    }
75}