apalis_postgres/queries/
fetch_by_id.rs1use apalis_core::{
2 backend::{BackendExt, FetchById, codec::Codec},
3 task::task_id::TaskId,
4};
5
6use apalis_sql::from_row::{FromRowError, TaskRow};
7use ulid::Ulid;
8
9use crate::{CompactType, PgContext, PgTask, PostgresStorage, from_row::PgTaskRow};
10
11impl<Args, D, F> FetchById<Args> for PostgresStorage<Args, CompactType, D, F>
12where
13 PostgresStorage<Args, CompactType, D, F>:
14 BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
15 D: Codec<Args, Compact = CompactType>,
16 D::Error: std::error::Error + Send + Sync + 'static,
17 Args: 'static,
18{
19 fn fetch_by_id(
20 &mut self,
21 id: &TaskId<Ulid>,
22 ) -> impl Future<Output = Result<Option<PgTask<Args>>, Self::Error>> + Send {
23 let pool = self.pool.clone();
24 let id = id.to_string();
25 async move {
26 let task = sqlx::query_file_as!(PgTaskRow, "queries/task/find_by_id.sql", id)
27 .fetch_optional(&pool)
28 .await?
29 .map(|r: PgTaskRow| {
30 let row: TaskRow = r.try_into()?;
31 row.try_into_task_compact()
32 .and_then(|a| {
33 a.try_map(|t| D::decode(&t))
34 .map_err(|e| FromRowError::DecodeError(e.into()))
35 })
36 .map_err(|e| sqlx::Error::Protocol(e.to_string()))
37 })
38 .transpose()?;
39 Ok(task)
40 }
41 }
42}