apalis_redis/queries/
fetch_by_id.rs

1use apalis_core::{
2    backend::{BackendExt, FetchById, codec::Codec},
3    task::task_id::TaskId,
4};
5use redis::{Script, Value};
6use ulid::Ulid;
7
8use crate::{RedisContext, RedisStorage, RedisTask, fetcher::deserialize_with_meta};
9
10impl<Args, Conn, C> FetchById<Args> for RedisStorage<Args, Conn, C>
11where
12    RedisStorage<Args, Conn, C>: BackendExt<
13            Context = RedisContext,
14            Compact = Vec<u8>,
15            IdType = Ulid,
16            Error = redis::RedisError,
17        >,
18    C: Codec<Args, Compact = Vec<u8>> + Send,
19    C::Error: std::error::Error + Send + Sync + 'static,
20    Args: 'static + Send,
21    Conn: redis::aio::ConnectionLike + Send,
22{
23    async fn fetch_by_id(
24        &mut self,
25        task_id: &TaskId<Self::IdType>,
26    ) -> Result<Option<RedisTask<Args>>, Self::Error> {
27        let fetch_by_id_script = Script::new(include_str!("../../lua/fetch_by_id.lua"));
28        let result: Value = fetch_by_id_script
29            .key(self.config.job_data_hash())
30            .key(self.config.job_meta_hash())
31            .arg(task_id.to_string())
32            .invoke_async(&mut self.conn)
33            .await?;
34
35        match result {
36            Value::ServerError(s) => Err(s.into()),
37            Value::Array(ref data) => {
38                // Reuse your existing parser
39                let tasks = deserialize_with_meta(data).expect("Failed to deserialize");
40
41                if let Some(task) = tasks.into_iter().take(1).next() {
42                    let task = task.into_full_task::<Args, C>()?;
43                    Ok(Some(task))
44                } else {
45                    Ok(None)
46                }
47            }
48            _ => Ok(None),
49        }
50    }
51}