apalis_redis/
expose.rs

1use crate::RedisContext;
2use crate::RedisStorage;
3use apalis_core::backend::BackendExpose;
4use apalis_core::backend::Stat;
5use apalis_core::backend::WorkerState;
6use apalis_core::codec::json::JsonCodec;
7use apalis_core::codec::Codec;
8use apalis_core::request::Request;
9use apalis_core::request::State;
10use apalis_core::worker::Worker;
11use apalis_core::worker::WorkerId;
12use redis::{ErrorKind, Value};
13use serde::{de::DeserializeOwned, Serialize};
14
15type RedisCodec = JsonCodec<Vec<u8>>;
16
17impl<T> BackendExpose<T> for RedisStorage<T>
18where
19    T: 'static + Serialize + DeserializeOwned + Send + Unpin + Sync,
20{
21    type Request = Request<T, RedisContext>;
22    type Error = redis::RedisError;
23    async fn stats(&self) -> Result<Stat, redis::RedisError> {
24        let mut conn = self.get_connection().clone();
25        let queue = self.get_config();
26
27        let stats_script = self.scripts.stats.clone();
28
29        let results: Vec<usize> = stats_script
30            .key(queue.active_jobs_list())
31            .key(queue.consumers_set())
32            .key(queue.dead_jobs_set())
33            .key(queue.failed_jobs_set())
34            .key(queue.done_jobs_set())
35            .invoke_async(&mut conn)
36            .await?;
37
38        Ok(Stat {
39            pending: results[0],
40            running: results[1],
41            dead: results[2],
42            failed: results[3],
43            success: results[4],
44        })
45    }
46    async fn list_jobs(
47        &self,
48        status: &State,
49        page: i32,
50    ) -> Result<Vec<Self::Request>, redis::RedisError> {
51        let mut conn = self.get_connection().clone();
52        let queue = self.get_config();
53        match status {
54            State::Pending | State::Scheduled => {
55                let active_jobs_list = &queue.active_jobs_list();
56                let job_data_hash = &queue.job_data_hash();
57                let ids: Vec<String> = redis::cmd("LRANGE")
58                    .arg(active_jobs_list)
59                    .arg(((page - 1) * 10).to_string())
60                    .arg((page * 10).to_string())
61                    .query_async(&mut conn)
62                    .await?;
63
64                if ids.is_empty() {
65                    return Ok(Vec::new());
66                }
67                let data: Option<Value> = redis::cmd("HMGET")
68                    .arg(job_data_hash)
69                    .arg(&ids)
70                    .query_async(&mut conn)
71                    .await?;
72
73                let jobs: Vec<Request<T, RedisContext>> =
74                    deserialize_multiple_jobs::<_, RedisCodec>(data.as_ref()).unwrap();
75                Ok(jobs)
76            }
77            State::Running => {
78                let consumers_set = &queue.consumers_set();
79                let job_data_hash = &queue.job_data_hash();
80                let workers: Vec<String> = redis::cmd("ZRANGE")
81                    .arg(consumers_set)
82                    .arg("0")
83                    .arg("-1")
84                    .query_async(&mut conn)
85                    .await?;
86
87                if workers.is_empty() {
88                    return Ok(Vec::new());
89                }
90                let mut all_jobs = Vec::new();
91                for worker in workers {
92                    let ids: Vec<String> = redis::cmd("SMEMBERS")
93                        .arg(&worker)
94                        .query_async(&mut conn)
95                        .await?;
96
97                    if ids.is_empty() {
98                        continue;
99                    };
100                    let data: Option<Value> = redis::cmd("HMGET")
101                        .arg(job_data_hash.clone())
102                        .arg(&ids)
103                        .query_async(&mut conn)
104                        .await?;
105
106                    let jobs: Vec<Request<T, RedisContext>> =
107                        deserialize_multiple_jobs::<_, RedisCodec>(data.as_ref()).unwrap();
108                    all_jobs.extend(jobs);
109                }
110
111                Ok(all_jobs)
112            }
113            State::Done => {
114                let done_jobs_set = &queue.done_jobs_set();
115                let job_data_hash = &queue.job_data_hash();
116                let ids: Vec<String> = redis::cmd("ZRANGE")
117                    .arg(done_jobs_set)
118                    .arg(((page - 1) * 10).to_string())
119                    .arg((page * 10).to_string())
120                    .query_async(&mut conn)
121                    .await?;
122
123                if ids.is_empty() {
124                    return Ok(Vec::new());
125                }
126                let data: Option<Value> = redis::cmd("HMGET")
127                    .arg(job_data_hash)
128                    .arg(&ids)
129                    .query_async(&mut conn)
130                    .await?;
131
132                let jobs: Vec<Request<T, RedisContext>> =
133                    deserialize_multiple_jobs::<_, RedisCodec>(data.as_ref()).unwrap();
134                Ok(jobs)
135            }
136            // State::Retry => Ok(Vec::new()),
137            State::Failed => {
138                let failed_jobs_set = &queue.failed_jobs_set();
139                let job_data_hash = &queue.job_data_hash();
140                let ids: Vec<String> = redis::cmd("ZRANGE")
141                    .arg(failed_jobs_set)
142                    .arg(((page - 1) * 10).to_string())
143                    .arg((page * 10).to_string())
144                    .query_async(&mut conn)
145                    .await?;
146                if ids.is_empty() {
147                    return Ok(Vec::new());
148                }
149                let data: Option<Value> = redis::cmd("HMGET")
150                    .arg(job_data_hash)
151                    .arg(&ids)
152                    .query_async(&mut conn)
153                    .await?;
154                let jobs: Vec<Request<T, RedisContext>> =
155                    deserialize_multiple_jobs::<_, RedisCodec>(data.as_ref()).unwrap();
156
157                Ok(jobs)
158            }
159            State::Killed => {
160                let dead_jobs_set = &queue.dead_jobs_set();
161                let job_data_hash = &queue.job_data_hash();
162                let ids: Vec<String> = redis::cmd("ZRANGE")
163                    .arg(dead_jobs_set)
164                    .arg(((page - 1) * 10).to_string())
165                    .arg((page * 10).to_string())
166                    .query_async(&mut conn)
167                    .await?;
168
169                if ids.is_empty() {
170                    return Ok(Vec::new());
171                }
172                let data: Option<Value> = redis::cmd("HMGET")
173                    .arg(job_data_hash)
174                    .arg(&ids)
175                    .query_async(&mut conn)
176                    .await?;
177
178                let jobs: Vec<Request<T, RedisContext>> =
179                    deserialize_multiple_jobs::<_, RedisCodec>(data.as_ref()).unwrap();
180
181                Ok(jobs)
182            }
183        }
184    }
185    async fn list_workers(&self) -> Result<Vec<Worker<WorkerState>>, redis::RedisError> {
186        let queue = self.get_config();
187        let consumers_set = &queue.consumers_set();
188        let mut conn = self.get_connection().clone();
189        let workers: Vec<String> = redis::cmd("ZRANGE")
190            .arg(consumers_set)
191            .arg("0")
192            .arg("-1")
193            .query_async(&mut conn)
194            .await?;
195        Ok(workers
196            .into_iter()
197            .map(|w| {
198                Worker::new(
199                    WorkerId::new(w.replace(&format!("{}:", &queue.inflight_jobs_set()), "")),
200                    WorkerState::new::<Self>(queue.get_namespace().to_owned()),
201                )
202            })
203            .collect())
204    }
205}
206
207fn deserialize_multiple_jobs<T, C: Codec<Compact = Vec<u8>>>(
208    jobs: Option<&Value>,
209) -> Option<Vec<Request<T, RedisContext>>>
210where
211    T: DeserializeOwned,
212{
213    let jobs = match jobs {
214        None => None,
215        Some(Value::Array(val)) => Some(val),
216        _ => {
217            // error!(
218            //     "Decoding Message Failed: {:?}",
219            //     "unknown result type for next message"
220            // );
221            None
222        }
223    };
224
225    jobs.map(|values| {
226        values
227            .iter()
228            .filter_map(|v| match v {
229                Value::BulkString(data) => {
230                    let inner = C::decode(data.to_vec())
231                        .map_err(|e| (ErrorKind::IoError, "Decode error", e.into().to_string()))
232                        .unwrap();
233                    Some(inner)
234                }
235                _ => None,
236            })
237            .collect()
238    })
239}