apalis_postgres/queries/
fetch_by_id.rs

1use apalis_core::backend::{BackendExt, FetchById, codec::Codec};
2
3use apalis_sql::from_row::{FromRowError, TaskRow};
4use ulid::Ulid;
5
6use crate::{CompactType, PgContext, PgTask, PgTaskId, PostgresStorage, from_row::PgTaskRow};
7
8impl<Args, D, F> FetchById<Args> for PostgresStorage<Args, CompactType, D, F>
9where
10    PostgresStorage<Args, CompactType, D, F>:
11        BackendExt<Context = PgContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
12    D: Codec<Args, Compact = CompactType>,
13    D::Error: std::error::Error + Send + Sync + 'static,
14    Args: 'static,
15{
16    fn fetch_by_id(
17        &mut self,
18        id: &PgTaskId,
19    ) -> impl Future<Output = Result<Option<PgTask<Args>>, Self::Error>> + Send {
20        let pool = self.pool.clone();
21        let id = id.to_string();
22        async move {
23            let task = sqlx::query_file_as!(PgTaskRow, "queries/task/find_by_id.sql", id)
24                .fetch_optional(&pool)
25                .await?
26                .map(|r: PgTaskRow| {
27                    let row: TaskRow = r.try_into()?;
28                    row.try_into_task_compact()
29                        .and_then(|a| {
30                            a.try_map(|t| D::decode(&t))
31                                .map_err(|e| FromRowError::DecodeError(e.into()))
32                        })
33                        .map_err(|e| sqlx::Error::Protocol(e.to_string()))
34                })
35                .transpose()?;
36            Ok(task)
37        }
38    }
39}