apalis_sqlite/queries/
fetch_by_id.rs

1use apalis_core::{
2    backend::{BackendExt, FetchById, codec::Codec},
3    task::task_id::TaskId,
4};
5use apalis_sql::from_row::{FromRowError, TaskRow};
6use ulid::Ulid;
7
8use crate::{CompactType, SqliteContext, SqliteStorage, SqliteTask, from_row::SqliteTaskRow};
9
10impl<Args, D, F> FetchById<Args> for SqliteStorage<Args, D, F>
11where
12    Self: BackendExt<
13            Context = SqliteContext,
14            Compact = CompactType,
15            IdType = Ulid,
16            Error = sqlx::Error,
17        >,
18    D: Codec<Args, Compact = CompactType>,
19    D::Error: std::error::Error + Send + Sync + 'static,
20    Args: 'static,
21{
22    fn fetch_by_id(
23        &mut self,
24        id: &TaskId<Ulid>,
25    ) -> impl Future<Output = Result<Option<SqliteTask<Args>>, Self::Error>> + Send {
26        let pool = self.pool.clone();
27        let id = id.to_string();
28        async move {
29            let task = sqlx::query_file_as!(SqliteTaskRow, "queries/task/find_by_id.sql", id)
30                .fetch_optional(&pool)
31                .await?
32                .map(|r| {
33                    let row: TaskRow = r
34                        .try_into()
35                        .map_err(|e: sqlx::Error| FromRowError::DecodeError(e.into()))?;
36                    row.try_into_task_compact().and_then(|t| {
37                        t.try_map(|a| {
38                            D::decode(&a).map_err(|e| FromRowError::DecodeError(e.into()))
39                        })
40                    })
41                })
42                .transpose()
43                .map_err(|e| sqlx::Error::Protocol(e.to_string()))?;
44            Ok(task)
45        }
46    }
47}