apalis_redis/queries/
fetch_by_id.rs1use apalis_core::{
2 backend::{Backend, FetchById, codec::Codec},
3 task::task_id::TaskId,
4};
5use redis::{Script, Value};
6use ulid::Ulid;
7
8use crate::{RedisContext, RedisStorage, RedisTask, fetcher::deserialize_with_meta};
9
10impl<Args, Conn, C> FetchById<Args> for RedisStorage<Args, Conn, C>
11where
12 RedisStorage<Args, Conn, C>: Backend<Context = RedisContext, Compact = Vec<u8>, IdType = Ulid, Error = redis::RedisError>,
13 C: Codec<Args, Compact = Vec<u8>> + Send,
14 C::Error: std::error::Error + Send + Sync + 'static,
15 Args: 'static + Send,
16 Conn: redis::aio::ConnectionLike + Send,
17{
18 async fn fetch_by_id(
19 &mut self,
20 task_id: &TaskId<Self::IdType>,
21 ) -> Result<Option<RedisTask<Args>>, Self::Error> {
22 let fetch_by_id_script = Script::new(include_str!("../../lua/fetch_by_id.lua"));
23 let result: Value = fetch_by_id_script
24 .key(self.config.job_data_hash())
25 .key(self.config.job_meta_hash())
26 .arg(task_id.to_string())
27 .invoke_async(&mut self.conn)
28 .await?;
29
30 match result {
31 Value::ServerError(s) => Err(s.into()),
32 Value::Array(ref data) => {
33 let tasks = deserialize_with_meta(&data).expect("Failed to deserialize");
35
36 if let Some(task) = tasks.into_iter().take(1).next() {
37 let task = task.into_full_task::<Args, C>()?;
38 Ok(Some(task))
39 } else {
40 Ok(None)
41 }
42 }
43 _ => Ok(None),
44 }
45 }
46}