1use std::str::FromStr;
2
3use apalis_core::{
4 backend::codec::Codec,
5 error::BoxDynError,
6 task::{Task, attempt::Attempt, status::Status, task_id::TaskId},
7 worker::context::WorkerContext,
8};
9use redis::{RedisError, Value, aio::ConnectionLike};
10use ulid::Ulid;
11
12use crate::{RedisStorage, build_error, config::RedisConfig, context::RedisContext};
13
14impl<Args, Conn, C> RedisStorage<Args, Conn, C>
15where
16 Args: Unpin + Send + Sync + 'static,
17 Conn: ConnectionLike + Send + Sync + 'static,
18 C: Codec<Args, Compact = Vec<u8>>,
19 C::Error: Into<BoxDynError>,
20{
21 pub async fn fetch_next(
23 worker: &WorkerContext,
24 config: &RedisConfig,
25 conn: &mut Conn,
26 ) -> Result<Vec<Task<Vec<u8>, RedisContext, Ulid>>, RedisError> {
27 let fetch_jobs = redis::Script::new(include_str!("../lua/get_jobs.lua"));
28 let workers_set = config.workers_set();
29 let active_jobs_list = config.active_jobs_list();
30 let job_data_hash = config.job_data_hash();
31 let inflight_set = format!("{}:{}", config.inflight_jobs_set(), worker.name());
32 let signal_list = config.signal_list();
33
34 let result = fetch_jobs
35 .key(&workers_set)
36 .key(&active_jobs_list)
37 .key(&inflight_set)
38 .key(&job_data_hash)
39 .key(&signal_list)
40 .key(config.job_meta_hash())
41 .arg(config.get_buffer_size()) .arg(&inflight_set)
43 .invoke_async::<Vec<Value>>(&mut *conn)
44 .await;
45 match result {
46 Ok(jobs) => {
47 let mut processed = vec![];
48 let tasks = deserialize_with_meta(&jobs)?;
49 for unprocessed in tasks {
50 let mut task = unprocessed.into_full_compact()?;
51 task.parts.ctx.lock_by = Some(worker.name().to_string());
52 processed.push(task)
53 }
54 Ok(processed)
55 }
56 Err(e) => Err(e),
57 }
58 }
59}
60
/// A job pulled from Redis in its "compact" form: the payload is still the
/// codec-encoded byte buffer borrowed from the Redis reply, together with the
/// bookkeeping fields parsed from the job's metadata reply.
#[derive(Debug, Clone)]
pub struct CompactTask<'a> {
    /// Raw, still-encoded job payload borrowed from the Redis reply.
    pub data: &'a Vec<u8>,
    /// Number of attempts already made on this task.
    pub attempts: u32,
    /// Maximum number of attempts allowed for this task.
    pub max_attempts: u32,
    /// Current task status as parsed from the metadata.
    pub status: Status,
    /// Unique task identifier (ULID-based).
    pub task_id: TaskId<Ulid>,
    /// Extra free-form metadata key/value pairs (JSON values; non-JSON
    /// values become `Null` during deserialization).
    pub meta: serde_json::Map<String, serde_json::Value>,
}
77
78impl CompactTask<'_> {
79 pub fn into_full_compact(self) -> Result<Task<Vec<u8>, RedisContext, Ulid>, RedisError> {
81 let context = RedisContext {
82 max_attempts: self.max_attempts,
83 lock_by: None,
84 meta: self.meta,
85 };
86 let task = Task::builder(self.data.clone())
87 .with_task_id(self.task_id)
88 .with_status(self.status)
89 .with_attempt(Attempt::new_with_value(self.attempts as usize))
90 .with_ctx(context)
91 .build();
92 Ok(task)
93 }
94
95 pub fn into_full_task<Args: 'static, C>(
97 self,
98 ) -> Result<Task<Args, RedisContext, Ulid>, RedisError>
99 where
100 C: Codec<Args, Compact = Vec<u8>>,
101 C::Error: Into<BoxDynError>,
102 {
103 let args: Args = C::decode(self.data).map_err(|e| build_error(&e.into().to_string()))?;
104 let context = RedisContext {
105 max_attempts: self.max_attempts,
106 lock_by: None,
107 meta: self.meta,
108 };
109 let task = Task::builder(args)
110 .with_task_id(self.task_id)
111 .with_status(self.status)
112 .with_attempt(Attempt::new_with_value(self.attempts as usize))
113 .with_ctx(context)
114 .build();
115 Ok(task)
116 }
117}
118
119pub fn str_from_val<'a>(val: &'a redis::Value, field: &'a str) -> Result<&'a str, RedisError> {
121 match val {
122 redis::Value::BulkString(bytes) => {
123 str::from_utf8(bytes).map_err(|_| build_error(&format!("{field} not UTF-8")))
124 }
125 _ => Err(build_error(&format!("{field} not bulk string"))),
126 }
127}
128
129pub fn parse_u32(value: &Value, field: &str) -> Result<u32, RedisError> {
131 match value {
132 Value::BulkString(bytes) => {
133 let s = std::str::from_utf8(bytes)
134 .map_err(|_| build_error(&format!("{field} not UTF-8")))?;
135 s.parse::<u32>()
136 .map_err(|_| build_error(&format!("{field} not u32")))
137 }
138 _ => Err(build_error(&format!("{field} not bulk string"))),
139 }
140}
141
142pub fn deserialize_with_meta<'a>(
144 data: &'a [redis::Value],
145) -> Result<Vec<CompactTask<'a>>, RedisError> {
146 if data.len() != 2 {
147 return Err(build_error("Expected two elements: job_data and metadata"));
148 }
149 let job_data_list = match &data[0] {
150 redis::Value::Array(vals) => vals,
151 _ => return Err(build_error("Expected job_data to be array")),
152 };
153
154 let meta_list = match &data[1] {
155 redis::Value::Array(vals) => vals,
156 _ => return Err(build_error("Expected metadata to be array")),
157 };
158
159 if job_data_list.len() != meta_list.len() {
160 return Err(build_error("Job data and metadata length mismatch"));
161 }
162
163 let mut result = Vec::with_capacity(job_data_list.len());
164
165 for (data_val, meta_val) in job_data_list.iter().zip(meta_list.iter()) {
166 let data = match data_val {
167 redis::Value::BulkString(bytes) => bytes,
168 _ => return Err(build_error("Invalid job data format")),
169 };
170
171 let meta_fields = match meta_val {
172 redis::Value::Array(fields) => fields,
173 _ => return Err(build_error("Invalid metadata format")),
174 };
175
176 let task_id = TaskId::from_str(str_from_val(&meta_fields[0], "task_id")?)
177 .map_err(|e| build_error(&e.to_string()))?;
178 let attempts = parse_u32(&meta_fields[2], "attempts")?;
179 let max_attempts = parse_u32(&meta_fields[4], "max_attempts")?;
180 let status = Status::from_str(str_from_val(&meta_fields[6], "status")?)
181 .map_err(|e| build_error(&e.to_string()))?;
182
183 let meta = meta_fields[7..]
184 .chunks(2)
185 .filter_map(|chunk| {
186 if chunk.len() == 2 {
187 Some((
188 str_from_val(&chunk[0], "meta key").ok()?,
189 str_from_val(&chunk[1], "meta value").ok()?,
190 ))
191 } else {
192 None
193 }
194 })
195 .try_fold(serde_json::Map::new(), |mut acc, (key, val)| {
196 acc.insert(
197 key.to_owned(),
198 serde_json::from_str(val).unwrap_or_default(),
199 );
200 Ok::<_, RedisError>(acc)
201 })?;
202
203 result.push(CompactTask {
204 task_id,
205 data,
206 attempts,
207 max_attempts,
208 status,
209 meta,
210 });
211 }
212
213 Ok(result)
214}