apalis_redis/queries/
fetch_by_id.rs1use apalis_core::{
2 backend::{BackendExt, FetchById, codec::Codec},
3 task::task_id::TaskId,
4};
5use redis::{Script, Value};
6use ulid::Ulid;
7
8use crate::{RedisContext, RedisStorage, RedisTask, fetcher::deserialize_with_meta};
9
10impl<Args, Conn, C> FetchById<Args> for RedisStorage<Args, Conn, C>
11where
12 RedisStorage<Args, Conn, C>: BackendExt<
13 Context = RedisContext,
14 Compact = Vec<u8>,
15 IdType = Ulid,
16 Error = redis::RedisError,
17 >,
18 C: Codec<Args, Compact = Vec<u8>> + Send,
19 C::Error: std::error::Error + Send + Sync + 'static,
20 Args: 'static + Send,
21 Conn: redis::aio::ConnectionLike + Send,
22{
23 async fn fetch_by_id(
24 &mut self,
25 task_id: &TaskId<Self::IdType>,
26 ) -> Result<Option<RedisTask<Args>>, Self::Error> {
27 let fetch_by_id_script = Script::new(include_str!("../../lua/fetch_by_id.lua"));
28 let result: Value = fetch_by_id_script
29 .key(self.config.job_data_hash())
30 .key(self.config.job_meta_hash())
31 .arg(task_id.to_string())
32 .invoke_async(&mut self.conn)
33 .await?;
34
35 match result {
36 Value::ServerError(s) => Err(s.into()),
37 Value::Array(ref data) => {
38 let tasks = deserialize_with_meta(data).expect("Failed to deserialize");
40
41 if let Some(task) = tasks.into_iter().take(1).next() {
42 let task = task.into_full_task::<Args, C>()?;
43 Ok(Some(task))
44 } else {
45 Ok(None)
46 }
47 }
48 _ => Ok(None),
49 }
50 }
51}