rexecutor-sqlx 0.1.1

A robust job processing library
Documentation
use std::pin::Pin;

use async_stream::stream;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::Stream;
use rexecutor::{
    backend::{Backend, BackendError, EnqueuableJob, ExecutionError, Job, Query},
    executor::ExecutorIdentifier,
    job::JobId,
    pruner::PruneSpec,
};
use tokio::sync::mpsc;
use tracing::instrument;

use crate::{RexecutorPgBackend, map_err, stream::ReadyJobStream};

impl RexecutorPgBackend {
    fn handle_update(result: sqlx::Result<u64>, job_id: JobId) -> Result<(), BackendError> {
        match result {
            Ok(0) => Err(BackendError::JobNotFound(job_id)),
            Ok(1) => Ok(()),
            Ok(_) => Err(BackendError::BadState),
            Err(error) => Err(map_err(error)),
        }
    }
}

#[async_trait]
impl Backend for RexecutorPgBackend {
    /// Registers a subscriber for `executor_identifier` and returns its job stream.
    ///
    /// A fresh unbounded channel is created. Its sending half is appended to the
    /// list kept in `self.subscribers` under the executor's identifier, and its
    /// receiving half is moved into a [`ReadyJobStream`] together with a clone of
    /// this backend. The returned stream never terminates: it yields the result of
    /// each successive `next` call on that `ReadyJobStream`.
    #[instrument(skip(self))]
    async fn subscribe_ready_jobs(
        &self,
        executor_identifier: ExecutorIdentifier,
    ) -> Pin<Box<dyn Stream<Item = Result<Job, BackendError>> + Send>> {
        let (sender, receiver) = mpsc::unbounded_channel();
        {
            // Keep the write guard confined to this scope.
            let mut subscribers = self.subscribers.write().await;
            subscribers
                .entry(executor_identifier.as_str())
                .or_default()
                .push(sender);
        }

        let mut ready_jobs: ReadyJobStream = ReadyJobStream {
            receiver,
            backend: self.clone(),
            executor_identifier,
        };
        Box::pin(stream! {
            loop {
                yield ready_jobs.next().await;
            }
        })
    }

    /// Inserts `job`, returning the id produced by the insert.
    ///
    /// Jobs that carry uniqueness criteria go through `insert_unique_job`; all
    /// others go through `insert_job`. Either failure is converted with `map_err`.
    async fn enqueue<'a>(&self, job: EnqueuableJob<'a>) -> Result<JobId, BackendError> {
        let has_uniqueness_criteria = job.uniqueness_criteria.is_some();
        let inserted = if has_uniqueness_criteria {
            self.insert_unique_job(job).await
        } else {
            self.insert_job(job).await
        };
        inserted.map_err(map_err)
    }

    /// Runs `_mark_job_complete` for `id` and interprets the outcome with
    /// `handle_update`.
    async fn mark_job_complete(&self, id: JobId) -> Result<(), BackendError> {
        Self::handle_update(self._mark_job_complete(id).await, id)
    }

    /// Runs `_mark_job_retryable` for `id` with the next scheduled time and the
    /// execution error, then interprets the outcome with `handle_update`.
    async fn mark_job_retryable(
        &self,
        id: JobId,
        next_scheduled_at: DateTime<Utc>,
        error: ExecutionError,
    ) -> Result<(), BackendError> {
        let outcome = self._mark_job_retryable(id, next_scheduled_at, error).await;
        Self::handle_update(outcome, id)
    }

    /// Runs `_mark_job_discarded` for `id` with the execution error, then
    /// interprets the outcome with `handle_update`.
    async fn mark_job_discarded(
        &self,
        id: JobId,
        error: ExecutionError,
    ) -> Result<(), BackendError> {
        Self::handle_update(self._mark_job_discarded(id, error).await, id)
    }

    /// Runs `_mark_job_cancelled` for `id` with the execution error, then
    /// interprets the outcome with `handle_update`.
    async fn mark_job_cancelled(
        &self,
        id: JobId,
        error: ExecutionError,
    ) -> Result<(), BackendError> {
        Self::handle_update(self._mark_job_cancelled(id, error).await, id)
    }

    /// Runs `_mark_job_snoozed` for `id` with the next scheduled time, then
    /// interprets the outcome with `handle_update`.
    async fn mark_job_snoozed(
        &self,
        id: JobId,
        next_scheduled_at: DateTime<Utc>,
    ) -> Result<(), BackendError> {
        let outcome = self._mark_job_snoozed(id, next_scheduled_at).await;
        Self::handle_update(outcome, id)
    }

    /// Hands `spec` to `delete_from_spec`, converting a failure with `map_err`.
    async fn prune_jobs(&self, spec: &PruneSpec) -> Result<(), BackendError> {
        let outcome = self.delete_from_spec(spec).await;
        outcome.map_err(map_err)
    }

    /// Runs `rerun` for `id` and interprets the outcome with `handle_update`.
    async fn rerun_job(&self, id: JobId) -> Result<(), BackendError> {
        Self::handle_update(self.rerun(id).await, id)
    }

    /// Passes `job` to `update` and interprets the outcome with `handle_update`.
    async fn update_job(&self, job: Job) -> Result<(), BackendError> {
        // Capture the id before `job` is moved into `update`.
        let id: JobId = job.id.into();
        Self::handle_update(self.update(job).await, id)
    }

    /// Executes `query` through `run_query` and converts every returned row with
    /// `TryFrom`.
    ///
    /// A query failure is converted with `map_err`; the first row that fails to
    /// convert aborts the collection and its error is returned.
    async fn query<'a>(&self, query: Query<'a>) -> Result<Vec<Job>, BackendError> {
        let rows = self.run_query(query).await.map_err(map_err)?;
        rows.into_iter().map(TryFrom::try_from).collect()
    }
}

#[cfg(test)]
mod test {
    use crate::JobStatus;
    use crate::types::Job;

    use super::*;
    use chrono::TimeDelta;
    use rexecutor::job::ErrorType;
    use serde_json::Value;
    use sqlx::PgPool;

    /// Test-only conversion: wraps `pool` in a backend whose subscriber registry
    /// is default-initialised.
    impl From<PgPool> for RexecutorPgBackend {
        fn from(pool: PgPool) -> Self {
            let subscribers = Default::default();
            Self { pool, subscribers }
        }
    }

    /// Newtype wrapper around [`EnqueuableJob`] used by the tests in this module.
    struct MockJob<'a>(EnqueuableJob<'a>);

    impl<'a> From<MockJob<'a>> for EnqueuableJob<'a> {
        fn from(value: MockJob<'a>) -> Self {
            value.0
        }
    }

    impl<'a> Default for MockJob<'a> {
        /// Builds a job for the `"executor"` executor, scheduled at the current
        /// time, with string `data`/`metadata` payloads, five maximum attempts,
        /// priority zero, default tags and no uniqueness criteria.
        fn default() -> Self {
            let job = EnqueuableJob {
                executor: String::from("executor"),
                data: Value::String(String::from("data")),
                metadata: Value::String(String::from("metadata")),
                max_attempts: 5,
                scheduled_at: Utc::now(),
                tags: Default::default(),
                priority: 0,
                uniqueness_criteria: None,
            };
            Self(job)
        }
    }

    impl<'a> MockJob<'a> {
        /// Executor name passed to the backend when the tests load jobs.
        const EXECUTOR: &'static str = "executor";

        /// Enqueues the wrapped job on `backend` and returns the new job's id.
        ///
        /// Panics if the backend reports an error.
        async fn enqueue(self, backend: impl Backend) -> JobId {
            let Self(job) = self;
            backend.enqueue(job).await.unwrap()
        }

        /// Returns the same job with its `scheduled_at` replaced.
        fn with_scheduled_at(self, scheduled_at: DateTime<Utc>) -> Self {
            let Self(mut job) = self;
            job.scheduled_at = scheduled_at;
            Self(job)
        }
    }

    impl RexecutorPgBackend {
        /// Test helper that fetches every row of the `rexecutor_jobs` table as a
        /// [`Job`] record.
        ///
        /// The query has no filter and lists its columns explicitly; the `status`
        /// column is read as a [`JobStatus`] through the `"status: JobStatus"`
        /// column alias.
        async fn all_jobs(&self) -> sqlx::Result<Vec<Job>> {
            sqlx::query_as!(
                Job,
                r#"SELECT
                    id,
                    status AS "status: JobStatus",
                    executor,
                    data,
                    metadata,
                    attempt,
                    max_attempts,
                    priority,
                    tags,
                    errors,
                    inserted_at,
                    scheduled_at,
                    attempted_at,
                    completed_at,
                    cancelled_at,
                    discarded_at
                FROM rexecutor_jobs
                "#
            )
            .fetch_all(&self.pool)
            .await
        }
    }

    // Expands the backend test suite macro exported by `rexecutor`. The arguments
    // name `sqlx::test` as the test attribute, `pool: PgPool` as the test argument
    // and a backend built with `from_pool` — presumably each generated test is
    // assembled from these; see the macro definition for the exact expansion.
    rexecutor::backend::testing::test_suite!(
        attr: sqlx::test,
        args: (pool: PgPool),
        backend: RexecutorPgBackend::from_pool(pool).await.unwrap()
    );

    #[sqlx::test]
    async fn load_job_mark_as_executing_for_executor_returns_none_when_db_empty(pool: PgPool) {
        let backend: RexecutorPgBackend = pool.into();

        let job = backend
            .load_job_mark_as_executing_for_executor(MockJob::EXECUTOR)
            .await
            .unwrap();

        assert!(job.is_none());
    }

    #[sqlx::test]
    async fn load_job_mark_as_executing_for_executor_returns_job_when_ready_for_execution(
        pool: PgPool,
    ) {
        let backend: RexecutorPgBackend = pool.into();

        let job_id = MockJob::default().enqueue(&backend).await;

        let job = backend
            .load_job_mark_as_executing_for_executor(MockJob::EXECUTOR)
            .await
            .unwrap()
            .expect("Should return a job");

        assert_eq!(job.id, job_id);
        assert_eq!(job.status, JobStatus::Executing);
    }

    #[sqlx::test]
    async fn load_job_mark_as_executing_for_executor_does_not_return_executing_jobs(pool: PgPool) {
        let backend: RexecutorPgBackend = pool.into();

        MockJob::default().enqueue(&backend).await;

        let _ = backend
            .load_job_mark_as_executing_for_executor(MockJob::EXECUTOR)
            .await
            .unwrap()
            .expect("Should return a job");

        let job = backend
            .load_job_mark_as_executing_for_executor(MockJob::EXECUTOR)
            .await
            .unwrap();

        assert!(job.is_none());
    }

    #[sqlx::test]
    async fn load_job_mark_as_executing_for_executor_returns_retryable_jobs(pool: PgPool) {
        let backend: RexecutorPgBackend = pool.into();

        let job_id = MockJob::default().enqueue(&backend).await;
        backend
            .mark_job_retryable(
                job_id,
                Utc::now(),
                ExecutionError {
                    error_type: ErrorType::Panic,
                    message: "Oh dear".to_owned(),
                },
            )
            .await
            .unwrap();

        let job = backend
            .load_job_mark_as_executing_for_executor(MockJob::EXECUTOR)
            .await
            .unwrap()
            .expect("Should return a job");

        assert_eq!(job.id, job_id);
        assert_eq!(job.status, JobStatus::Executing);
    }

    #[sqlx::test]
    async fn load_job_mark_as_executing_for_executor_returns_job_when_job_scheduled_in_past(
        pool: PgPool,
    ) {
        let backend: RexecutorPgBackend = pool.into();
        let job_id = MockJob::default()
            .with_scheduled_at(Utc::now() - TimeDelta::hours(3))
            .enqueue(&backend)
            .await;

        let job = backend
            .load_job_mark_as_executing_for_executor(MockJob::EXECUTOR)
            .await
            .unwrap()
            .expect("Should return a job");

        assert_eq!(job.id, job_id);
        assert_eq!(job.status, JobStatus::Executing);
    }

    #[sqlx::test]
    async fn load_job_mark_as_executing_for_executor_returns_oldest_scheduled_at_executable_job(
        pool: PgPool,
    ) {
        let backend: RexecutorPgBackend = pool.into();
        let expected_job_id = MockJob::default()
            .with_scheduled_at(Utc::now() - TimeDelta::hours(3))
            .enqueue(&backend)
            .await;

        let _ = MockJob::default().enqueue(&backend).await;

        let job_id = MockJob::default().enqueue(&backend).await;
        backend.mark_job_complete(job_id).await.unwrap();

        let job_id = MockJob::default().enqueue(&backend).await;
        backend
            .mark_job_discarded(
                job_id,
                ExecutionError {
                    error_type: ErrorType::Panic,
                    message: "Oh dear".to_owned(),
                },
            )
            .await
            .unwrap();

        let job_id = MockJob::default().enqueue(&backend).await;
        backend
            .mark_job_cancelled(
                job_id,
                ExecutionError {
                    error_type: ErrorType::Cancelled,
                    message: "Not needed".to_owned(),
                },
            )
            .await
            .unwrap();

        let job = backend
            .load_job_mark_as_executing_for_executor(MockJob::EXECUTOR)
            .await
            .unwrap()
            .expect("Should return a job");

        assert_eq!(job.id, expected_job_id);
        assert_eq!(job.status, JobStatus::Executing);
    }

    #[sqlx::test]
    async fn enqueue_test(pool: PgPool) {
        let backend: RexecutorPgBackend = pool.into();
        let job = MockJob::default();

        let result = backend.enqueue(job.into()).await;

        assert!(result.is_ok());

        let all_jobs = backend.all_jobs().await.unwrap();

        assert_eq!(all_jobs.len(), 1);
    }
}