apalis_postgres/queries/
fetch_by_id.rs

1use apalis_core::{
2    backend::{Backend, FetchById, codec::Codec},
3    task::task_id::TaskId,
4};
5
6use apalis_sql::from_row::TaskRow;
7use ulid::Ulid;
8
9use crate::{CompactType, PgTask, PostgresStorage, context::PgContext, from_row::PgTaskRow};
10
11impl<Args, D, F> FetchById<Args> for PostgresStorage<Args, CompactType, D, F>
12where
13    PostgresStorage<Args, CompactType, D, F>:
14        Backend<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
15    D: Codec<Args, Compact = CompactType>,
16    D::Error: std::error::Error + Send + Sync + 'static,
17    Args: 'static,
18{
19    fn fetch_by_id(
20        &mut self,
21        id: &TaskId<Ulid>,
22    ) -> impl Future<Output = Result<Option<PgTask<Args>>, Self::Error>> + Send {
23        let pool = self.pool.clone();
24        let id = id.to_string();
25        async move {
26            let task = sqlx::query_file_as!(PgTaskRow, "queries/task/find_by_id.sql", id)
27                .fetch_optional(&pool)
28                .await?
29                .map(|r: PgTaskRow| {
30                    let row: TaskRow = r.try_into()?;
31                    row.try_into_task::<D, _, Ulid>()
32                        .map_err(|e| sqlx::Error::Protocol(e.to_string()))
33                })
34                .transpose()?;
35            Ok(task)
36        }
37    }
38}