apalis-sqlite 1.0.0-rc.7

Background task processing for Rust using apalis and SQLite
Documentation
use apalis_core::{
    backend::{BackendExt, FetchById, codec::Codec},
    task::task_id::TaskId,
};
use apalis_sql::{TaskRow, from_row::FromRowError};
use ulid::Ulid;

use crate::{CompactType, SqliteContext, SqliteStorage, SqliteTask, from_row::SqliteTaskRow};

impl<Args, D, F> FetchById<Args> for SqliteStorage<Args, D, F>
where
    Self: BackendExt<
            Context = SqliteContext,
            Compact = CompactType,
            IdType = Ulid,
            Error = sqlx::Error,
        >,
    D: Codec<Args, Compact = CompactType>,
    D::Error: std::error::Error + Send + Sync + 'static,
    Args: 'static,
{
    fn fetch_by_id(
        &mut self,
        id: &TaskId<Ulid>,
    ) -> impl Future<Output = Result<Option<SqliteTask<Args>>, Self::Error>> + Send {
        let pool = self.pool.clone();
        let id = id.to_string();
        async move {
            let task = sqlx::query_file_as!(SqliteTaskRow, "queries/task/find_by_id.sql", id)
                .fetch_optional(&pool)
                .await?
                .map(|r| {
                    let row: TaskRow = r
                        .try_into()
                        .map_err(|e: sqlx::Error| FromRowError::DecodeError(e.into()))?;
                    row.try_into_task_compact().and_then(|t| {
                        t.try_map(|a| {
                            D::decode(&a).map_err(|e| FromRowError::DecodeError(e.into()))
                        })
                    })
                })
                .transpose()
                .map_err(|e| sqlx::Error::Protocol(e.to_string()))?;
            Ok(task)
        }
    }
}