apalis_redis/queries/
metrics.rs

1use apalis_core::backend::{BackendExt, Metrics, Statistic, codec::Codec};
2use redis::Script;
3use ulid::Ulid;
4
5use crate::{RedisContext, RedisStorage, build_error};
6
7impl<Args, Conn, C> Metrics for RedisStorage<Args, Conn, C>
8where
9    RedisStorage<Args, Conn, C>: BackendExt<
10            Context = RedisContext,
11            Compact = Vec<u8>,
12            IdType = Ulid,
13            Error = redis::RedisError,
14        >,
15    C: Codec<Args, Compact = Vec<u8>> + Send + Sync,
16    C::Error: std::error::Error + Send + Sync + 'static,
17    Args: 'static + Send + Sync,
18    Conn: redis::aio::ConnectionLike + Send + Clone + Sync,
19{
20    fn global(&self) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
21        let mut conn = self.conn.clone();
22
23        async move {
24            let queues = redis::cmd("ZRANGE")
25                .arg("core::apalis::queues")
26                .arg(0)
27                .arg(-1)
28                .query_async::<Vec<String>>(&mut conn)
29                .await?;
30            let lua = include_str!("../../lua/overview.lua");
31            let script = Script::new(lua);
32            let now = chrono::Utc::now().timestamp();
33            let mut script = &mut script.arg(now);
34            for queue in queues {
35                script = script.key(queue);
36            }
37            let res = script
38                .invoke_async::<String>(&mut conn)
39                .await
40                .and_then(|json| {
41                    let stats: Vec<Statistic> =
42                        serde_json::from_str(&json).map_err(|e| build_error(&e.to_string()))?;
43                    Ok(stats)
44                })?;
45
46            Ok(res)
47        }
48    }
49    fn fetch_by_queue(
50        &self,
51        queue_id: &str,
52    ) -> impl Future<Output = Result<Vec<Statistic>, Self::Error>> + Send {
53        let mut conn = self.conn.clone();
54
55        let queue_name = queue_id.to_string();
56        async move {
57            let lua = include_str!("../../lua/overview_by_queue.lua");
58            let script = Script::new(lua);
59
60            let active = format!("{}:active", queue_name);
61            let done = format!("{}:done", queue_name);
62            let dead = format!("{}:dead", queue_name);
63            let inflight = format!("{}:inflight", queue_name);
64
65            // Execute the Lua script with 4 keys
66            let json: String = script
67                .key(active)
68                .key(done)
69                .key(dead)
70                .key(inflight)
71                .invoke_async(&mut conn)
72                .await?;
73
74            let stats: Vec<Statistic> =
75                serde_json::from_str(&json).map_err(|e| build_error(&e.to_string()))?;
76
77            Ok(stats)
78        }
79    }
80}