apalis-sqlite 1.0.0-rc.7

Background task processing for rust using apalis and sqlite
Documentation
use apalis_core::{
    backend::{BackendExt, Filter, ListAllTasks, ListTasks, codec::Codec},
    task::{Task, status::Status},
};
use apalis_sql::{TaskRow, from_row::FromRowError};
use ulid::Ulid;

use crate::{CompactType, SqliteContext, SqliteStorage, SqliteTask, from_row::SqliteTaskRow};

impl<Args, D, F> ListTasks<Args> for SqliteStorage<Args, D, F>
where
    Self: BackendExt<
            Context = SqliteContext,
            Compact = CompactType,
            IdType = Ulid,
            Error = sqlx::Error,
        >,
    D: Codec<Args, Compact = CompactType>,
    D::Error: std::error::Error + Send + Sync + 'static,
    Args: 'static,
{
    fn list_tasks(
        &self,
        queue: &str,
        filter: &Filter,
    ) -> impl Future<Output = Result<Vec<SqliteTask<Args>>, Self::Error>> + Send {
        let queue = queue.to_owned();
        let pool = self.pool.clone();
        let limit = filter.limit() as i32;
        let offset = filter.offset() as i32;
        let status = filter
            .status
            .as_ref()
            .unwrap_or(&Status::Pending)
            .to_string();
        async move {
            let tasks = sqlx::query_file_as!(
                SqliteTaskRow,
                "queries/backend/list_jobs.sql",
                status,
                queue,
                limit,
                offset
            )
            .fetch_all(&pool)
            .await?
            .into_iter()
            .map(|r| {
                let row: TaskRow = r
                    .try_into()
                    .map_err(|e: sqlx::Error| FromRowError::DecodeError(e.into()))?;
                row.try_into_task_compact().and_then(|t| {
                    t.try_map(|a| D::decode(&a).map_err(|e| FromRowError::DecodeError(e.into())))
                })
            })
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| sqlx::Error::Decode(e.into()))?;
            Ok(tasks)
        }
    }
}

impl<Args, D, F> ListAllTasks for SqliteStorage<Args, D, F>
where
    Self: BackendExt<
            Context = SqliteContext,
            Compact = CompactType,
            IdType = Ulid,
            Error = sqlx::Error,
        >,
{
    fn list_all_tasks(
        &self,
        filter: &Filter,
    ) -> impl Future<
        Output = Result<Vec<Task<Self::Compact, Self::Context, Self::IdType>>, Self::Error>,
    > + Send {
        let status = filter
            .status
            .as_ref()
            .map(|s| s.to_string())
            .unwrap_or(Status::Pending.to_string());
        let pool = self.pool.clone();
        let limit = filter.limit() as i32;
        let offset = filter.offset() as i32;
        async move {
            let tasks = sqlx::query_file_as!(
                SqliteTaskRow,
                "queries/backend/list_all_jobs.sql",
                status,
                limit,
                offset
            )
            .fetch_all(&pool)
            .await?
            .into_iter()
            .map(|r| {
                let row: TaskRow = r.try_into()?;
                row.try_into_task_compact()
                    .map_err(|e| sqlx::Error::Protocol(e.to_string()))
            })
            .collect::<Result<Vec<_>, _>>()?;
            Ok(tasks)
        }
    }
}