apalis_sqlite/queries/
fetch_by_id.rs1use apalis_core::{
2 backend::{BackendExt, FetchById, codec::Codec},
3 task::task_id::TaskId,
4};
5use apalis_sql::from_row::{FromRowError, TaskRow};
6use ulid::Ulid;
7
8use crate::{CompactType, SqlContext, SqliteStorage, SqliteTask, from_row::SqliteTaskRow};
9
10impl<Args, D, F> FetchById<Args> for SqliteStorage<Args, D, F>
11where
12 Self:
13 BackendExt<Context = SqlContext, Compact = CompactType, IdType = Ulid, Error = sqlx::Error>,
14 D: Codec<Args, Compact = CompactType>,
15 D::Error: std::error::Error + Send + Sync + 'static,
16 Args: 'static,
17{
18 fn fetch_by_id(
19 &mut self,
20 id: &TaskId<Ulid>,
21 ) -> impl Future<Output = Result<Option<SqliteTask<Args>>, Self::Error>> + Send {
22 let pool = self.pool.clone();
23 let id = id.to_string();
24 async move {
25 let task = sqlx::query_file_as!(SqliteTaskRow, "queries/task/find_by_id.sql", id)
26 .fetch_optional(&pool)
27 .await?
28 .map(|r| {
29 let row: TaskRow = r
30 .try_into()
31 .map_err(|e: sqlx::Error| FromRowError::DecodeError(e.into()))?;
32 row.try_into_task_compact().and_then(|t| {
33 t.try_map(|a| {
34 D::decode(&a).map_err(|e| FromRowError::DecodeError(e.into()))
35 })
36 })
37 })
38 .transpose()
39 .map_err(|e| sqlx::Error::Protocol(e.to_string()))?;
40 Ok(task)
41 }
42 }
43}