1use std::str::FromStr;
2
3use apalis_core::{
4 backend::codec::Codec,
5 error::BoxDynError,
6 task::{Task, attempt::Attempt, status::Status, task_id::TaskId},
7 worker::context::WorkerContext,
8};
9use redis::{RedisError, Value, aio::ConnectionLike};
10use ulid::Ulid;
11
12use crate::{RedisStorage, build_error, config::RedisConfig, context::RedisContext};
13
14impl<Args, Conn, C> RedisStorage<Args, Conn, C>
15where
16 Args: Unpin + Send + Sync + 'static,
17 Conn: ConnectionLike + Send + Sync + 'static,
18 C: Codec<Args, Compact = Vec<u8>>,
19 C::Error: Into<BoxDynError>,
20{
21 pub async fn fetch_next(
23 worker: &WorkerContext,
24 config: &RedisConfig,
25 conn: &mut Conn,
26 ) -> Result<Vec<Task<Args, RedisContext, Ulid>>, RedisError> {
27 let fetch_jobs = redis::Script::new(include_str!("../lua/get_jobs.lua"));
28 let workers_set = config.workers_set();
29 let active_jobs_list = config.active_jobs_list();
30 let job_data_hash = config.job_data_hash();
31 let inflight_set = format!("{}:{}", config.inflight_jobs_set(), worker.name());
32 let signal_list = config.signal_list();
33
34 let result = fetch_jobs
35 .key(&workers_set)
36 .key(&active_jobs_list)
37 .key(&inflight_set)
38 .key(&job_data_hash)
39 .key(&signal_list)
40 .key(config.job_meta_hash())
41 .arg(config.get_buffer_size()) .arg(&inflight_set)
43 .invoke_async::<Vec<Value>>(&mut *conn)
44 .await;
45 match result {
46 Ok(jobs) => {
47 let mut processed = vec![];
48 let tasks = deserialize_with_meta(&jobs)?;
49 for unprocessed in tasks {
50 let mut task = unprocessed.into_full_task::<Args, C>()?;
51 task.parts.ctx.lock_by = Some(worker.name().to_string());
52 processed.push(task)
53 }
54 Ok(processed)
55 }
56 Err(e) => Err(e),
57 }
58 }
59}
60
61#[derive(Debug, Clone)]
63pub struct CompactTask<'a> {
64 pub data: &'a Vec<u8>,
66 pub attempts: u32,
68 pub max_attempts: u32,
70 pub status: Status,
72 pub task_id: TaskId<Ulid>,
74 pub meta: serde_json::Map<String, serde_json::Value>,
76}
77
78impl CompactTask<'_> {
79 pub fn into_full_compact(self) -> Result<Task<Vec<u8>, RedisContext, Ulid>, RedisError> {
81 let context = RedisContext {
82 max_attempts: self.max_attempts,
83 lock_by: None,
84 meta: self.meta,
85 };
86 let task = Task::builder(self.data.clone())
87 .with_task_id(self.task_id)
88 .with_status(self.status)
89 .with_attempt(Attempt::new_with_value(self.attempts as usize))
90 .with_ctx(context)
91 .build();
92 Ok(task)
93 }
94
95 pub fn into_full_task<Args: 'static, C>(
97 self,
98 ) -> Result<Task<Args, RedisContext, Ulid>, RedisError>
99 where
100 C: Codec<Args, Compact = Vec<u8>>,
101 C::Error: Into<BoxDynError>,
102 {
103 let args = if std::any::TypeId::of::<Args>() == std::any::TypeId::of::<Vec<u8>>() {
104 let ready = self.data.to_vec();
109 unsafe {
110 let job_ptr = &ready as *const Vec<u8> as *const Args;
111 let args = std::ptr::read(job_ptr);
112 std::mem::forget(ready);
113 args
114 }
115 } else {
116 let args: Args =
117 C::decode(self.data).map_err(|e| build_error(&e.into().to_string()))?;
118 args
119 };
120 let context = RedisContext {
121 max_attempts: self.max_attempts,
122 lock_by: None,
123 meta: self.meta,
124 };
125 let task = Task::builder(args)
126 .with_task_id(self.task_id)
127 .with_status(self.status)
128 .with_attempt(Attempt::new_with_value(self.attempts as usize))
129 .with_ctx(context)
130 .build();
131 Ok(task)
132 }
133}
134
135pub fn str_from_val<'a>(val: &'a redis::Value, field: &'a str) -> Result<&'a str, RedisError> {
137 match val {
138 redis::Value::BulkString(bytes) => {
139 str::from_utf8(bytes).map_err(|_| build_error(&format!("{field} not UTF-8")))
140 }
141 _ => Err(build_error(&format!("{field} not bulk string"))),
142 }
143}
144
145pub fn parse_u32(value: &Value, field: &str) -> Result<u32, RedisError> {
147 match value {
148 Value::BulkString(bytes) => {
149 let s = std::str::from_utf8(bytes)
150 .map_err(|_| build_error(&format!("{field} not UTF-8")))?;
151 s.parse::<u32>()
152 .map_err(|_| build_error(&format!("{field} not u32")))
153 }
154 _ => Err(build_error(&format!("{field} not bulk string"))),
155 }
156}
157
158pub fn deserialize_with_meta<'a>(
160 data: &'a Vec<redis::Value>,
161) -> Result<Vec<CompactTask<'a>>, RedisError> {
162 if data.len() != 2 {
163 return Err(build_error("Expected two elements: job_data and metadata"));
164 }
165 let job_data_list = match &data[0] {
166 redis::Value::Array(vals) => vals,
167 _ => return Err(build_error("Expected job_data to be array")),
168 };
169
170 let meta_list = match &data[1] {
171 redis::Value::Array(vals) => vals,
172 _ => return Err(build_error("Expected metadata to be array")),
173 };
174
175 if job_data_list.len() != meta_list.len() {
176 return Err(build_error("Job data and metadata length mismatch"));
177 }
178
179 let mut result = Vec::with_capacity(job_data_list.len());
180
181 for (data_val, meta_val) in job_data_list.into_iter().zip(meta_list.into_iter()) {
182 let data = match data_val {
183 redis::Value::BulkString(bytes) => bytes,
184 _ => return Err(build_error("Invalid job data format")),
185 };
186
187 let meta_fields = match meta_val {
188 redis::Value::Array(fields) => fields,
189 _ => return Err(build_error("Invalid metadata format")),
190 };
191
192 let task_id = TaskId::from_str(str_from_val(&meta_fields[0], "task_id")?)
193 .map_err(|e| build_error(&e.to_string()))?;
194 let attempts = parse_u32(&meta_fields[2], "attempts")?;
195 let max_attempts = parse_u32(&meta_fields[4], "max_attempts")?;
196 let status = Status::from_str(str_from_val(&meta_fields[6], "status")?)
197 .map_err(|e| build_error(&e.to_string()))?;
198
199 let meta = meta_fields[7..]
200 .chunks(2)
201 .filter_map(|chunk| {
202 if chunk.len() == 2 {
203 Some((
204 str_from_val(&chunk[0], "meta key").ok()?,
205 str_from_val(&chunk[1], "meta value").ok()?,
206 ))
207 } else {
208 None
209 }
210 })
211 .try_fold(serde_json::Map::new(), |mut acc, (key, val)| {
212 acc.insert(
213 key.to_owned(),
214 serde_json::from_str(val).unwrap_or_default(),
215 );
216 Ok::<_, RedisError>(acc)
217 })?;
218
219 result.push(CompactTask {
220 task_id,
221 data,
222 attempts,
223 max_attempts,
224 status,
225 meta,
226 });
227 }
228
229 Ok(result)
230}