pgqrs 0.15.3

A high-performance PostgreSQL-backed job queue for Rust applications
Documentation
use crate::error::Result;
use crate::store::dialect::SqlDialect;
use crate::store::query::QueryBuilder;
use crate::types::WorkerStatus;
use async_trait::async_trait;
use chrono::Utc;

#[async_trait]
pub(crate) trait DialectWorkerTable: crate::store::WorkerTable + Sync {
    type Dialect: SqlDialect;

    async fn execute_worker_update(&self, query: QueryBuilder) -> Result<u64>;
    async fn query_worker_status(&self, worker_id: i64) -> Result<WorkerStatus>;

    async fn ensure_shutdown_allowed(&self, _worker_id: i64) -> Result<()> {
        Ok(())
    }

    async fn dialect_heartbeat(&self, worker_id: i64) -> Result<()> {
        let now = Utc::now();

        self.ensure_transition(
            QueryBuilder::new(Self::Dialect::WORKER.heartbeat)
                .bind_datetime(Some(now))
                .bind_i64(worker_id),
            worker_id,
            None,
        )
        .await
    }

    async fn dialect_suspend(&self, worker_id: i64) -> Result<()> {
        self.ensure_transition(
            QueryBuilder::new(Self::Dialect::WORKER.suspend).bind_i64(worker_id),
            worker_id,
            Some((
                "suspended",
                "Worker must be Ready, Polling, or Interrupted to suspend",
            )),
        )
        .await
    }

    async fn dialect_complete_poll(&self, worker_id: i64) -> Result<()> {
        self.ensure_transition(
            QueryBuilder::new(Self::Dialect::WORKER.complete_poll).bind_i64(worker_id),
            worker_id,
            Some((
                "ready",
                "Worker must be in Polling state to complete polling",
            )),
        )
        .await
    }

    async fn dialect_resume(&self, worker_id: i64) -> Result<()> {
        self.ensure_transition(
            QueryBuilder::new(Self::Dialect::WORKER.resume).bind_i64(worker_id),
            worker_id,
            Some(("ready", "Worker must be in Suspended state to resume")),
        )
        .await
    }

    async fn dialect_poll(&self, worker_id: i64) -> Result<()> {
        self.ensure_transition(
            QueryBuilder::new(Self::Dialect::WORKER.poll).bind_i64(worker_id),
            worker_id,
            Some(("polling", "Worker must be Ready to start polling")),
        )
        .await
    }

    async fn dialect_interrupt(&self, worker_id: i64) -> Result<()> {
        self.ensure_transition(
            QueryBuilder::new(Self::Dialect::WORKER.interrupt).bind_i64(worker_id),
            worker_id,
            Some((
                "interrupted",
                "Worker must be in Polling state to be interrupted",
            )),
        )
        .await
    }

    async fn dialect_shutdown(&self, worker_id: i64) -> Result<()> {
        self.ensure_shutdown_allowed(worker_id).await?;
        let now = Utc::now();

        self.ensure_transition(
            QueryBuilder::new(Self::Dialect::WORKER.shutdown)
                .bind_i64(worker_id)
                .bind_datetime(Some(now)),
            worker_id,
            Some(("stopped", "Worker must be in Suspended state to shutdown")),
        )
        .await
    }

    async fn ensure_transition(
        &self,
        query: QueryBuilder,
        worker_id: i64,
        invalid_transition: Option<(&'static str, &'static str)>,
    ) -> Result<()> {
        let count = self.execute_worker_update(query).await?;

        if count > 0 {
            return Ok(());
        }

        if let Some((to, reason)) = invalid_transition {
            let current_status = self.query_worker_status(worker_id).await?;
            return Err(crate::error::Error::InvalidStateTransition {
                from: current_status.to_string(),
                to: to.to_string(),
                reason: reason.to_string(),
            });
        }

        Err(crate::error::Error::WorkerNotFound { id: worker_id })
    }
}