apalis_redis/queries/
fetch_by_id.rs

1use apalis_core::{
2    backend::{Backend, FetchById, codec::Codec},
3    task::task_id::TaskId,
4};
5use redis::{Script, Value};
6use ulid::Ulid;
7
8use crate::{RedisContext, RedisStorage, RedisTask, fetcher::deserialize_with_meta};
9
10impl<Args, Conn, C> FetchById<Args> for RedisStorage<Args, Conn, C>
11where
12    RedisStorage<Args, Conn, C>: Backend<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = redis::RedisError>,
13    C: Codec<Args, Compact = Vec<u8>> + Send,
14    C::Error: std::error::Error + Send + Sync + 'static,
15    Args: 'static + Send,
16    Conn: redis::aio::ConnectionLike + Send,
17{
18    async fn fetch_by_id(
19        &mut self,
20        task_id: &TaskId<Self::IdType>,
21    ) -> Result<Option<RedisTask<Args>>, Self::Error> {
22        let fetch_by_id_script = Script::new(include_str!("../../lua/fetch_by_id.lua"));
23        let result: Value = fetch_by_id_script
24            .key(self.config.job_data_hash())
25            .key(self.config.job_meta_hash())
26            .arg(task_id.to_string())
27            .invoke_async(&mut self.conn)
28            .await?;
29
30        match result {
31            Value::ServerError(s) => Err(s.into()),
32            Value::Array(ref data) => {
33                // Reuse your existing parser
34                let tasks = deserialize_with_meta(&data).expect("Failed to deserialize");
35
36                if let Some(task) = tasks.into_iter().take(1).next() {
37                    let task = task.into_full_task::<Args, C>()?;
38                    Ok(Some(task))
39                } else {
40                    Ok(None)
41                }
42            }
43            _ => Ok(None),
44        }
45    }
46}