use crate::RedisContext;
use crate::RedisStorage;
use apalis_core::backend::BackendExpose;
use apalis_core::backend::Stat;
use apalis_core::backend::WorkerState;
use apalis_core::codec::json::JsonCodec;
use apalis_core::codec::Codec;
use apalis_core::request::Request;
use apalis_core::request::State;
use apalis_core::worker::Worker;
use apalis_core::worker::WorkerId;
use redis::{ErrorKind, Value};
use serde::{de::DeserializeOwned, Serialize};

type RedisCodec = JsonCodec<Vec<u8>>;

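// Expose read-only introspection over this Redis-backed storage: queue-wide
// counters, paginated job listings per state, and the list of registered
// workers.
//
// A rough usage sketch (hypothetical `Email` job type; assumes the crate's
// `connect` helper and `RedisStorage::new`, with `BackendExpose` in scope):
//
//     let conn = apalis_redis::connect("redis://127.0.0.1/").await?;
//     let storage: RedisStorage<Email> = RedisStorage::new(conn);
//     let stats = storage.stats().await?;
//     println!("pending: {}, running: {}", stats.pending, stats.running);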
impl<T> BackendExpose<T> for RedisStorage<T>
where
    T: 'static + Serialize + DeserializeOwned + Send + Unpin + Sync,
{
    type Request = Request<T, RedisContext>;
    type Error = redis::RedisError;
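    // Collect queue-wide counters in a single round trip: the `stats` Lua
    // script reads the active jobs list plus the consumer, dead, failed and
    // done sets, and returns the counts in the same order as the `Stat`
    // fields below.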
    async fn stats(&self) -> Result<Stat, redis::RedisError> {
        let mut conn = self.get_connection().clone();
        let queue = self.get_config();

        let stats_script = self.scripts.stats.clone();

        let results: Vec<usize> = stats_script
            .key(queue.active_jobs_list())
            .key(queue.consumers_set())
            .key(queue.dead_jobs_set())
            .key(queue.failed_jobs_set())
            .key(queue.done_jobs_set())
            .invoke_async(&mut conn)
            .await?;

        Ok(Stat {
            pending: results[0],
            running: results[1],
            dead: results[2],
            failed: results[3],
            success: results[4],
        })
    }
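    // Page through jobs in a given state, ten per page. Each branch fetches a
    // page of job ids from the relevant list or sorted set, then loads the
    // serialized payloads from the job data hash with HMGET.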
    async fn list_jobs(
        &self,
        status: &State,
        page: i32,
    ) -> Result<Vec<Self::Request>, redis::RedisError> {
        let mut conn = self.get_connection().clone();
        let queue = self.get_config();
        match status {
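            // Pending and scheduled jobs share the active jobs list; their ids
            // index into the job data hash.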
            State::Pending | State::Scheduled => {
                let active_jobs_list = &queue.active_jobs_list();
                let job_data_hash = &queue.job_data_hash();
                let ids: Vec<String> = redis::cmd("LRANGE")
                    .arg(active_jobs_list)
                    .arg(((page - 1) * 10).to_string())
                    .arg((page * 10).to_string())
                    .query_async(&mut conn)
                    .await?;

                if ids.is_empty() {
                    return Ok(Vec::new());
                }
                let data: Option<Value> = redis::cmd("HMGET")
                    .arg(job_data_hash)
                    .arg(&ids)
                    .query_async(&mut conn)
                    .await?;

                let jobs: Vec<Request<T, RedisContext>> =
                    deserialize_multiple_jobs::<_, RedisCodec>(data.as_ref()).unwrap();
                Ok(jobs)
            }
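            // Running jobs are tracked per worker: each member of the
            // consumers set names a worker's inflight set, whose members are
            // the ids of jobs that worker is currently processing.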
            State::Running => {
                let consumers_set = &queue.consumers_set();
                let job_data_hash = &queue.job_data_hash();
                let workers: Vec<String> = redis::cmd("ZRANGE")
                    .arg(consumers_set)
                    .arg("0")
                    .arg("-1")
                    .query_async(&mut conn)
                    .await?;

                if workers.is_empty() {
                    return Ok(Vec::new());
                }
                let mut all_jobs = Vec::new();
                for worker in workers {
                    let ids: Vec<String> = redis::cmd("SMEMBERS")
                        .arg(&worker)
                        .query_async(&mut conn)
                        .await?;

                    if ids.is_empty() {
                        continue;
                    }
                    let data: Option<Value> = redis::cmd("HMGET")
                        .arg(job_data_hash)
                        .arg(&ids)
                        .query_async(&mut conn)
                        .await?;

                    let jobs: Vec<Request<T, RedisContext>> =
                        deserialize_multiple_jobs::<_, RedisCodec>(data.as_ref()).unwrap();
                    all_jobs.extend(jobs);
                }

                Ok(all_jobs)
            }
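            // Completed, failed and killed jobs live in separate sorted sets;
            // the branches below differ only in which set is paged through.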
            State::Done => {
                let done_jobs_set = &queue.done_jobs_set();
                let job_data_hash = &queue.job_data_hash();
                let ids: Vec<String> = redis::cmd("ZRANGE")
                    .arg(done_jobs_set)
                    .arg(((page - 1) * 10).to_string())
                    .arg((page * 10).to_string())
                    .query_async(&mut conn)
                    .await?;

                if ids.is_empty() {
                    return Ok(Vec::new());
                }
                let data: Option<Value> = redis::cmd("HMGET")
                    .arg(job_data_hash)
                    .arg(&ids)
                    .query_async(&mut conn)
                    .await?;

                let jobs: Vec<Request<T, RedisContext>> =
                    deserialize_multiple_jobs::<_, RedisCodec>(data.as_ref()).unwrap();
                Ok(jobs)
            }
            State::Failed => {
                let failed_jobs_set = &queue.failed_jobs_set();
                let job_data_hash = &queue.job_data_hash();
                let ids: Vec<String> = redis::cmd("ZRANGE")
                    .arg(failed_jobs_set)
                    .arg(((page - 1) * 10).to_string())
                    .arg((page * 10).to_string())
                    .query_async(&mut conn)
                    .await?;
                if ids.is_empty() {
                    return Ok(Vec::new());
                }
                let data: Option<Value> = redis::cmd("HMGET")
                    .arg(job_data_hash)
                    .arg(&ids)
                    .query_async(&mut conn)
                    .await?;
                let jobs: Vec<Request<T, RedisContext>> =
                    deserialize_multiple_jobs::<_, RedisCodec>(data.as_ref()).unwrap();

                Ok(jobs)
            }
            State::Killed => {
                let dead_jobs_set = &queue.dead_jobs_set();
                let job_data_hash = &queue.job_data_hash();
                let ids: Vec<String> = redis::cmd("ZRANGE")
                    .arg(dead_jobs_set)
                    .arg(((page - 1) * 10).to_string())
                    .arg((page * 10).to_string())
                    .query_async(&mut conn)
                    .await?;

                if ids.is_empty() {
                    return Ok(Vec::new());
                }
                let data: Option<Value> = redis::cmd("HMGET")
                    .arg(job_data_hash)
                    .arg(&ids)
                    .query_async(&mut conn)
                    .await?;

                let jobs: Vec<Request<T, RedisContext>> =
                    deserialize_multiple_jobs::<_, RedisCodec>(data.as_ref()).unwrap();

                Ok(jobs)
            }
        }
    }
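    // List registered workers from the consumers sorted set. Members carry an
    // "<inflight jobs set>:" prefix, which is stripped to recover the worker id.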
    async fn list_workers(&self) -> Result<Vec<Worker<WorkerState>>, redis::RedisError> {
        let queue = self.get_config();
        let consumers_set = &queue.consumers_set();
        let mut conn = self.get_connection().clone();
        let workers: Vec<String> = redis::cmd("ZRANGE")
            .arg(consumers_set)
            .arg("0")
            .arg("-1")
            .query_async(&mut conn)
            .await?;
        Ok(workers
            .into_iter()
            .map(|w| {
                Worker::new(
                    WorkerId::new(w.replace(&format!("{}:", &queue.inflight_jobs_set()), "")),
                    WorkerState::new::<Self>(queue.get_namespace().to_owned()),
                )
            })
            .collect())
    }
}

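// Decode an HMGET reply into requests. A missing or non-array reply yields
// `None`; entries that are not bulk strings (e.g. nil for unknown ids) are
// skipped.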
fn deserialize_multiple_jobs<T, C: Codec<Compact = Vec<u8>>>(
    jobs: Option<&Value>,
) -> Option<Vec<Request<T, RedisContext>>>
where
    T: DeserializeOwned,
    // Assumed bound: the codec's error type must be printable so decode
    // failures can be reported in the panic message below.
    C::Error: std::fmt::Display,
{
    let jobs = match jobs {
        None => None,
        Some(Value::Array(val)) => Some(val),
        _ => None,
    };

    jobs.map(|values| {
        values
            .iter()
            .filter_map(|v| match v {
                Value::BulkString(data) => {
                    let inner = C::decode(data.to_vec())
                        .map_err(|e| (ErrorKind::IoError, "Decode error", e.to_string()))
                        .unwrap();
                    Some(inner)
                }
                _ => None,
            })
            .collect()
    })
}