apalis_redis/
fetcher.rs

1use std::str::FromStr;
2
3use apalis_core::{
4    backend::codec::Codec,
5    error::BoxDynError,
6    task::{Task, attempt::Attempt, status::Status, task_id::TaskId},
7    worker::context::WorkerContext,
8};
9use redis::{RedisError, Value, aio::ConnectionLike};
10use ulid::Ulid;
11
12use crate::{RedisStorage, build_error, config::RedisConfig, context::RedisContext};
13
14impl<Args, Conn, C> RedisStorage<Args, Conn, C>
15where
16    Args: Unpin + Send + Sync + 'static,
17    Conn: ConnectionLike + Send + Sync + 'static,
18    C: Codec<Args, Compact = Vec<u8>>,
19    C::Error: Into<BoxDynError>,
20{
21    /// Fetches the next batch of tasks for the given worker.
22    pub async fn fetch_next(
23        worker: &WorkerContext,
24        config: &RedisConfig,
25        conn: &mut Conn,
26    ) -> Result<Vec<Task<Vec<u8>, RedisContext, Ulid>>, RedisError> {
27        let fetch_jobs = redis::Script::new(include_str!("../lua/get_jobs.lua"));
28        let workers_set = config.workers_set();
29        let active_jobs_list = config.active_jobs_list();
30        let job_data_hash = config.job_data_hash();
31        let inflight_set = format!("{}:{}", config.inflight_jobs_set(), worker.name());
32        let signal_list = config.signal_list();
33
34        let result = fetch_jobs
35            .key(&workers_set)
36            .key(&active_jobs_list)
37            .key(&inflight_set)
38            .key(&job_data_hash)
39            .key(&signal_list)
40            .key(config.job_meta_hash())
41            .arg(config.get_buffer_size()) // No of jobs to fetch
42            .arg(&inflight_set)
43            .invoke_async::<Vec<Value>>(&mut *conn)
44            .await;
45        match result {
46            Ok(jobs) => {
47                let mut processed = vec![];
48                let tasks = deserialize_with_meta(&jobs)?;
49                for unprocessed in tasks {
50                    let mut task = unprocessed.into_full_compact()?;
51                    task.parts.ctx.lock_by = Some(worker.name().to_string());
52                    processed.push(task)
53                }
54                Ok(processed)
55            }
56            Err(e) => Err(e),
57        }
58    }
59}
60
61/// A task structure that includes metadata.
62#[derive(Debug, Clone)]
63pub struct CompactTask<'a> {
64    /// The task data in its compact form.
65    pub data: &'a Vec<u8>,
66    /// The number of attempts made for this task.
67    pub attempts: u32,
68    /// The maximum number of attempts allowed for this task.
69    pub max_attempts: u32,
70    /// The current status of the task.
71    pub status: Status,
72    /// The unique identifier for the task.
73    pub task_id: TaskId<Ulid>,
74    /// Metadata associated with the task.
75    pub meta: serde_json::Map<String, serde_json::Value>,
76}
77
78impl CompactTask<'_> {
79    /// Converts the task data into a full Task with compact arguments.
80    pub fn into_full_compact(self) -> Result<Task<Vec<u8>, RedisContext, Ulid>, RedisError> {
81        let context = RedisContext {
82            max_attempts: self.max_attempts,
83            lock_by: None,
84            meta: self.meta,
85        };
86        let task = Task::builder(self.data.clone())
87            .with_task_id(self.task_id)
88            .with_status(self.status)
89            .with_attempt(Attempt::new_with_value(self.attempts as usize))
90            .with_ctx(context)
91            .build();
92        Ok(task)
93    }
94
95    /// Converts the task data into a full Task with decoded arguments.
96    pub fn into_full_task<Args: 'static, C>(
97        self,
98    ) -> Result<Task<Args, RedisContext, Ulid>, RedisError>
99    where
100        C: Codec<Args, Compact = Vec<u8>>,
101        C::Error: Into<BoxDynError>,
102    {
103        let args: Args = C::decode(self.data).map_err(|e| build_error(&e.into().to_string()))?;
104        let context = RedisContext {
105            max_attempts: self.max_attempts,
106            lock_by: None,
107            meta: self.meta,
108        };
109        let task = Task::builder(args)
110            .with_task_id(self.task_id)
111            .with_status(self.status)
112            .with_attempt(Attempt::new_with_value(self.attempts as usize))
113            .with_ctx(context)
114            .build();
115        Ok(task)
116    }
117}
118
119/// Extracts a &str from a redis::Value, returning an error if the value is not a bulk string.
120pub fn str_from_val<'a>(val: &'a redis::Value, field: &'a str) -> Result<&'a str, RedisError> {
121    match val {
122        redis::Value::BulkString(bytes) => {
123            str::from_utf8(bytes).map_err(|_| build_error(&format!("{field} not UTF-8")))
124        }
125        _ => Err(build_error(&format!("{field} not bulk string"))),
126    }
127}
128
129/// Parses a u32 from a redis::Value
130pub fn parse_u32(value: &Value, field: &str) -> Result<u32, RedisError> {
131    match value {
132        Value::BulkString(bytes) => {
133            let s = std::str::from_utf8(bytes)
134                .map_err(|_| build_error(&format!("{field} not UTF-8")))?;
135            s.parse::<u32>()
136                .map_err(|_| build_error(&format!("{field} not u32")))
137        }
138        _ => Err(build_error(&format!("{field} not bulk string"))),
139    }
140}
141
142/// Deserializes task data and metadata from Redis values.
143pub fn deserialize_with_meta<'a>(
144    data: &'a [redis::Value],
145) -> Result<Vec<CompactTask<'a>>, RedisError> {
146    if data.len() != 2 {
147        return Err(build_error("Expected two elements: job_data and metadata"));
148    }
149    let job_data_list = match &data[0] {
150        redis::Value::Array(vals) => vals,
151        _ => return Err(build_error("Expected job_data to be array")),
152    };
153
154    let meta_list = match &data[1] {
155        redis::Value::Array(vals) => vals,
156        _ => return Err(build_error("Expected metadata to be array")),
157    };
158
159    if job_data_list.len() != meta_list.len() {
160        return Err(build_error("Job data and metadata length mismatch"));
161    }
162
163    let mut result = Vec::with_capacity(job_data_list.len());
164
165    for (data_val, meta_val) in job_data_list.iter().zip(meta_list.iter()) {
166        let data = match data_val {
167            redis::Value::BulkString(bytes) => bytes,
168            _ => return Err(build_error("Invalid job data format")),
169        };
170
171        let meta_fields = match meta_val {
172            redis::Value::Array(fields) => fields,
173            _ => return Err(build_error("Invalid metadata format")),
174        };
175
176        let task_id = TaskId::from_str(str_from_val(&meta_fields[0], "task_id")?)
177            .map_err(|e| build_error(&e.to_string()))?;
178        let attempts = parse_u32(&meta_fields[2], "attempts")?;
179        let max_attempts = parse_u32(&meta_fields[4], "max_attempts")?;
180        let status = Status::from_str(str_from_val(&meta_fields[6], "status")?)
181            .map_err(|e| build_error(&e.to_string()))?;
182
183        let meta = meta_fields[7..]
184            .chunks(2)
185            .filter_map(|chunk| {
186                if chunk.len() == 2 {
187                    Some((
188                        str_from_val(&chunk[0], "meta key").ok()?,
189                        str_from_val(&chunk[1], "meta value").ok()?,
190                    ))
191                } else {
192                    None
193                }
194            })
195            .try_fold(serde_json::Map::new(), |mut acc, (key, val)| {
196                acc.insert(
197                    key.to_owned(),
198                    serde_json::from_str(val).unwrap_or_default(),
199                );
200                Ok::<_, RedisError>(acc)
201            })?;
202
203        result.push(CompactTask {
204            task_id,
205            data,
206            attempts,
207            max_attempts,
208            status,
209            meta,
210        });
211    }
212
213    Ok(result)
214}