pgqrs 0.15.2

A high-performance PostgreSQL-backed job queue for Rust applications
Documentation
//! Postgres implementation of the Store trait.

use crate::store::{
    DbStateTable, MessageTable, QueueTable, RunRecordTable, StepRecordTable, Store,
    Worker as WorkerTrait, WorkerTable, WorkflowTable,
};
use async_trait::async_trait;
use sqlx::PgPool;
use std::sync::Arc;

pub(crate) mod dialect;
pub mod tables;

use self::tables::db_state::DbState as PostgresDbState;
use self::tables::pgqrs_messages::Messages as PostgresMessageTable;
use self::tables::pgqrs_queues::Queues as PostgresQueueTable;
use self::tables::pgqrs_workers::Workers as PostgresWorkerTable;
use self::tables::pgqrs_workflow_runs::RunRecords as PostgresRunRecordTable;
use self::tables::pgqrs_workflow_steps::StepRecords as PostgresStepRecordTable;
use self::tables::pgqrs_workflows::Workflows as PostgresWorkflowTable;

use crate::config::Config;

pub static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("migrations/postgres");

#[derive(Debug, Clone)]
pub struct PostgresStore {
    pool: PgPool,
    config: Config,
    queues: Arc<PostgresQueueTable>,
    messages: Arc<PostgresMessageTable>,
    workers: Arc<PostgresWorkerTable>,
    db_state: Arc<PostgresDbState>,
    workflows: Arc<PostgresWorkflowTable>,
    workflow_runs: Arc<PostgresRunRecordTable>,
    workflow_steps: Arc<PostgresStepRecordTable>,
}

impl PostgresStore {
    pub fn new(pool: PgPool, config: &Config) -> Self {
        Self {
            pool: pool.clone(),
            config: config.clone(),
            queues: Arc::new(PostgresQueueTable::new(pool.clone())),
            messages: Arc::new(PostgresMessageTable::new(pool.clone())),
            workers: Arc::new(PostgresWorkerTable::new(pool.clone())),
            db_state: Arc::new(PostgresDbState::new(pool.clone())),
            workflows: Arc::new(PostgresWorkflowTable::new(pool.clone())),
            workflow_runs: Arc::new(PostgresRunRecordTable::new(pool.clone())),
            workflow_steps: Arc::new(PostgresStepRecordTable::new(pool)),
        }
    }

    /// Get access to the underlying PgPool.
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }
}

#[async_trait]
impl Store for PostgresStore {
    async fn execute_raw(&self, sql: &str) -> crate::error::Result<()> {
        sqlx::raw_sql(sql).execute(&self.pool).await?;
        Ok(())
    }

    async fn execute_raw_with_i64(&self, sql: &str, param: i64) -> crate::error::Result<()> {
        sqlx::query(sql).bind(param).execute(&self.pool).await?;
        Ok(())
    }

    async fn execute_raw_with_two_i64(
        &self,
        sql: &str,
        param1: i64,
        param2: i64,
    ) -> crate::error::Result<()> {
        sqlx::query(sql)
            .bind(param1)
            .bind(param2)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    async fn query_int(&self, sql: &str) -> crate::error::Result<i64> {
        use sqlx::Row;
        let row = sqlx::raw_sql(sql).fetch_one(&self.pool).await?;
        Ok(row.try_get(0)?)
    }

    async fn query_string(&self, sql: &str) -> crate::error::Result<String> {
        use sqlx::Row;
        let row = sqlx::raw_sql(sql).fetch_one(&self.pool).await?;
        Ok(row.try_get(0)?)
    }

    async fn query_bool(&self, sql: &str) -> crate::error::Result<bool> {
        use sqlx::Row;
        let row = sqlx::raw_sql(sql).fetch_one(&self.pool).await?;
        Ok(row.try_get(0)?)
    }

    fn config(&self) -> &Config {
        &self.config
    }

    fn queues(&self) -> &dyn QueueTable {
        self.queues.as_ref()
    }

    fn messages(&self) -> &dyn MessageTable {
        self.messages.as_ref()
    }

    fn workers(&self) -> &dyn WorkerTable {
        self.workers.as_ref()
    }

    fn db_state(&self) -> &dyn DbStateTable {
        self.db_state.as_ref()
    }

    fn workflows(&self) -> &dyn WorkflowTable {
        self.workflows.as_ref()
    }

    fn workflow_runs(&self) -> &dyn RunRecordTable {
        self.workflow_runs.as_ref()
    }

    fn workflow_steps(&self) -> &dyn StepRecordTable {
        self.workflow_steps.as_ref()
    }

    async fn bootstrap(&self) -> crate::error::Result<()> {
        MIGRATOR.run(&self.pool).await?;
        Ok(())
    }

    async fn admin(
        &self,
        name: &str,
        config: &Config,
    ) -> crate::error::Result<crate::workers::Admin> {
        let _ = config;
        crate::workers::Admin::new(crate::store::AnyStore::Postgres(self.clone()), name).await
    }

    async fn admin_ephemeral(
        &self,
        config: &Config,
    ) -> crate::error::Result<crate::workers::Admin> {
        let _ = config;
        crate::workers::Admin::new_ephemeral(crate::store::AnyStore::Postgres(self.clone())).await
    }

    async fn producer(
        &self,
        queue: &str,
        name: &str,
        _config: &Config,
    ) -> crate::error::Result<crate::workers::Producer> {
        let queue_info = self.queues.get_by_name(queue).await?;
        let worker_record = self.workers.register(Some(queue_info.id), name).await?;

        Ok(crate::workers::Producer::new(
            crate::store::AnyStore::Postgres(self.clone()),
            queue_info,
            worker_record,
            _config.validation_config.clone(),
        ))
    }

    async fn consumer(
        &self,
        queue: &str,
        name: &str,
        _config: &Config,
    ) -> crate::error::Result<crate::workers::Consumer> {
        let queue_info = self.queues.get_by_name(queue).await?;
        let worker_record = self.workers.register(Some(queue_info.id), name).await?;

        Ok(crate::workers::Consumer::new(
            crate::store::AnyStore::Postgres(self.clone()),
            queue_info,
            worker_record,
        ))
    }

    async fn queue(&self, name: &str) -> crate::error::Result<crate::types::QueueRecord> {
        let queue_exists = self.queues.exists(name).await?;
        if queue_exists {
            return Err(crate::error::Error::QueueAlreadyExists {
                name: name.to_string(),
            });
        }

        self.queues
            .insert(crate::types::NewQueueRecord {
                queue_name: name.to_string(),
            })
            .await
    }

    async fn workflow(&self, name: &str) -> crate::error::Result<crate::types::WorkflowRecord> {
        // Ensure backing queue exists.
        let queue_exists = self.queues.exists(name).await?;
        if !queue_exists {
            let _queue = self
                .queues
                .insert(crate::types::NewQueueRecord {
                    queue_name: name.to_string(),
                })
                .await?;
        }

        let queue = self.queues.get_by_name(name).await?;

        // Create workflow definition (template).
        let workflow_record = self
            .workflows
            .insert(crate::types::NewWorkflowRecord {
                name: name.to_string(),
                queue_id: queue.id,
            })
            .await
            .map_err(|e| {
                if let crate::error::Error::QueryFailed { source, .. } = &e {
                    if let Some(sqlx::Error::Database(db_err)) =
                        source.downcast_ref::<sqlx::Error>()
                    {
                        if db_err.code().as_deref() == Some("23505") {
                            return crate::error::Error::WorkflowAlreadyExists {
                                name: name.to_string(),
                            };
                        }
                    }
                }
                e
            })?;

        Ok(workflow_record)
    }

    async fn run(
        &self,
        message: crate::types::QueueMessage,
    ) -> crate::error::Result<crate::workers::Run> {
        // Try to find existing run by message_id
        match self.workflow_runs.get_by_message_id(message.id).await {
            Ok(record) => {
                return Ok(crate::workers::Run::new(
                    crate::store::AnyStore::Postgres(self.clone()),
                    record,
                ));
            }
            Err(crate::error::Error::NotFound { .. }) => {
                // Not found, continue to create new run
            }
            Err(e) => return Err(e),
        }

        // Otherwise, it's a new trigger. Create run record.
        let queue = self.queues.get(message.queue_id).await?;
        let workflow = self.workflows.get_by_name(&queue.queue_name).await?;

        let run_rec = self
            .workflow_runs
            .insert(crate::types::NewRunRecord {
                workflow_id: workflow.id,
                message_id: message.id,
                input: Some(message.payload.clone()),
            })
            .await?;

        Ok(crate::workers::Run::new(
            crate::store::AnyStore::Postgres(self.clone()),
            run_rec,
        ))
    }

    async fn worker(&self, id: i64) -> crate::error::Result<Box<dyn WorkerTrait>> {
        let worker_record = self.workers.get(id).await?;
        Ok(Box::new(crate::workers::WorkerHandle::new(
            crate::store::AnyStore::Postgres(self.clone()),
            worker_record,
        )))
    }

    fn concurrency_model(&self) -> crate::store::ConcurrencyModel {
        crate::store::ConcurrencyModel::MultiProcess
    }

    fn backend_name(&self) -> &'static str {
        "postgres"
    }

    async fn producer_ephemeral(
        &self,
        queue: &str,
        _config: &Config,
    ) -> crate::error::Result<crate::workers::Producer> {
        let queue_info = self.queues.get_by_name(queue).await?;
        let worker_record = self.workers.register_ephemeral(Some(queue_info.id)).await?;

        Ok(crate::workers::Producer::new(
            crate::store::AnyStore::Postgres(self.clone()),
            queue_info,
            worker_record,
            _config.validation_config.clone(),
        ))
    }

    async fn consumer_ephemeral(
        &self,
        queue: &str,
        _config: &Config,
    ) -> crate::error::Result<crate::workers::Consumer> {
        let queue_info = self.queues.get_by_name(queue).await?;
        let worker_record = self.workers.register_ephemeral(Some(queue_info.id)).await?;

        Ok(crate::workers::Consumer::new(
            crate::store::AnyStore::Postgres(self.clone()),
            queue_info,
            worker_record,
        ))
    }
}