use crate::store::{
DbStateTable, MessageTable, QueueTable, RunRecordTable, StepRecordTable, Store,
Worker as WorkerTrait, WorkerTable, WorkflowTable,
};
use async_trait::async_trait;
use sqlx::PgPool;
use std::sync::Arc;
pub(crate) mod dialect;
pub mod tables;
use self::tables::db_state::DbState as PostgresDbState;
use self::tables::pgqrs_messages::Messages as PostgresMessageTable;
use self::tables::pgqrs_queues::Queues as PostgresQueueTable;
use self::tables::pgqrs_workers::Workers as PostgresWorkerTable;
use self::tables::pgqrs_workflow_runs::RunRecords as PostgresRunRecordTable;
use self::tables::pgqrs_workflow_steps::StepRecords as PostgresStepRecordTable;
use self::tables::pgqrs_workflows::Workflows as PostgresWorkflowTable;
use crate::config::Config;
pub static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!("migrations/postgres");
#[derive(Debug, Clone)]
pub struct PostgresStore {
pool: PgPool,
config: Config,
queues: Arc<PostgresQueueTable>,
messages: Arc<PostgresMessageTable>,
workers: Arc<PostgresWorkerTable>,
db_state: Arc<PostgresDbState>,
workflows: Arc<PostgresWorkflowTable>,
workflow_runs: Arc<PostgresRunRecordTable>,
workflow_steps: Arc<PostgresStepRecordTable>,
}
impl PostgresStore {
pub fn new(pool: PgPool, config: &Config) -> Self {
Self {
pool: pool.clone(),
config: config.clone(),
queues: Arc::new(PostgresQueueTable::new(pool.clone())),
messages: Arc::new(PostgresMessageTable::new(pool.clone())),
workers: Arc::new(PostgresWorkerTable::new(pool.clone())),
db_state: Arc::new(PostgresDbState::new(pool.clone())),
workflows: Arc::new(PostgresWorkflowTable::new(pool.clone())),
workflow_runs: Arc::new(PostgresRunRecordTable::new(pool.clone())),
workflow_steps: Arc::new(PostgresStepRecordTable::new(pool)),
}
}
pub fn pool(&self) -> &PgPool {
&self.pool
}
}
#[async_trait]
impl Store for PostgresStore {
async fn execute_raw(&self, sql: &str) -> crate::error::Result<()> {
sqlx::raw_sql(sql).execute(&self.pool).await?;
Ok(())
}
async fn execute_raw_with_i64(&self, sql: &str, param: i64) -> crate::error::Result<()> {
sqlx::query(sql).bind(param).execute(&self.pool).await?;
Ok(())
}
async fn execute_raw_with_two_i64(
&self,
sql: &str,
param1: i64,
param2: i64,
) -> crate::error::Result<()> {
sqlx::query(sql)
.bind(param1)
.bind(param2)
.execute(&self.pool)
.await?;
Ok(())
}
async fn query_int(&self, sql: &str) -> crate::error::Result<i64> {
use sqlx::Row;
let row = sqlx::raw_sql(sql).fetch_one(&self.pool).await?;
Ok(row.try_get(0)?)
}
async fn query_string(&self, sql: &str) -> crate::error::Result<String> {
use sqlx::Row;
let row = sqlx::raw_sql(sql).fetch_one(&self.pool).await?;
Ok(row.try_get(0)?)
}
async fn query_bool(&self, sql: &str) -> crate::error::Result<bool> {
use sqlx::Row;
let row = sqlx::raw_sql(sql).fetch_one(&self.pool).await?;
Ok(row.try_get(0)?)
}
fn config(&self) -> &Config {
&self.config
}
fn queues(&self) -> &dyn QueueTable {
self.queues.as_ref()
}
fn messages(&self) -> &dyn MessageTable {
self.messages.as_ref()
}
fn workers(&self) -> &dyn WorkerTable {
self.workers.as_ref()
}
fn db_state(&self) -> &dyn DbStateTable {
self.db_state.as_ref()
}
fn workflows(&self) -> &dyn WorkflowTable {
self.workflows.as_ref()
}
fn workflow_runs(&self) -> &dyn RunRecordTable {
self.workflow_runs.as_ref()
}
fn workflow_steps(&self) -> &dyn StepRecordTable {
self.workflow_steps.as_ref()
}
async fn bootstrap(&self) -> crate::error::Result<()> {
MIGRATOR.run(&self.pool).await?;
Ok(())
}
async fn admin(
&self,
name: &str,
config: &Config,
) -> crate::error::Result<crate::workers::Admin> {
let _ = config;
crate::workers::Admin::new(crate::store::AnyStore::Postgres(self.clone()), name).await
}
async fn admin_ephemeral(
&self,
config: &Config,
) -> crate::error::Result<crate::workers::Admin> {
let _ = config;
crate::workers::Admin::new_ephemeral(crate::store::AnyStore::Postgres(self.clone())).await
}
async fn producer(
&self,
queue: &str,
name: &str,
_config: &Config,
) -> crate::error::Result<crate::workers::Producer> {
let queue_info = self.queues.get_by_name(queue).await?;
let worker_record = self.workers.register(Some(queue_info.id), name).await?;
Ok(crate::workers::Producer::new(
crate::store::AnyStore::Postgres(self.clone()),
queue_info,
worker_record,
_config.validation_config.clone(),
))
}
async fn consumer(
&self,
queue: &str,
name: &str,
_config: &Config,
) -> crate::error::Result<crate::workers::Consumer> {
let queue_info = self.queues.get_by_name(queue).await?;
let worker_record = self.workers.register(Some(queue_info.id), name).await?;
Ok(crate::workers::Consumer::new(
crate::store::AnyStore::Postgres(self.clone()),
queue_info,
worker_record,
))
}
async fn queue(&self, name: &str) -> crate::error::Result<crate::types::QueueRecord> {
let queue_exists = self.queues.exists(name).await?;
if queue_exists {
return Err(crate::error::Error::QueueAlreadyExists {
name: name.to_string(),
});
}
self.queues
.insert(crate::types::NewQueueRecord {
queue_name: name.to_string(),
})
.await
}
async fn workflow(&self, name: &str) -> crate::error::Result<crate::types::WorkflowRecord> {
let queue_exists = self.queues.exists(name).await?;
if !queue_exists {
let _queue = self
.queues
.insert(crate::types::NewQueueRecord {
queue_name: name.to_string(),
})
.await?;
}
let queue = self.queues.get_by_name(name).await?;
let workflow_record = self
.workflows
.insert(crate::types::NewWorkflowRecord {
name: name.to_string(),
queue_id: queue.id,
})
.await
.map_err(|e| {
if let crate::error::Error::QueryFailed { source, .. } = &e {
if let Some(sqlx::Error::Database(db_err)) =
source.downcast_ref::<sqlx::Error>()
{
if db_err.code().as_deref() == Some("23505") {
return crate::error::Error::WorkflowAlreadyExists {
name: name.to_string(),
};
}
}
}
e
})?;
Ok(workflow_record)
}
async fn run(
&self,
message: crate::types::QueueMessage,
) -> crate::error::Result<crate::workers::Run> {
match self.workflow_runs.get_by_message_id(message.id).await {
Ok(record) => {
return Ok(crate::workers::Run::new(
crate::store::AnyStore::Postgres(self.clone()),
record,
));
}
Err(crate::error::Error::NotFound { .. }) => {
}
Err(e) => return Err(e),
}
let queue = self.queues.get(message.queue_id).await?;
let workflow = self.workflows.get_by_name(&queue.queue_name).await?;
let run_rec = self
.workflow_runs
.insert(crate::types::NewRunRecord {
workflow_id: workflow.id,
message_id: message.id,
input: Some(message.payload.clone()),
})
.await?;
Ok(crate::workers::Run::new(
crate::store::AnyStore::Postgres(self.clone()),
run_rec,
))
}
async fn worker(&self, id: i64) -> crate::error::Result<Box<dyn WorkerTrait>> {
let worker_record = self.workers.get(id).await?;
Ok(Box::new(crate::workers::WorkerHandle::new(
crate::store::AnyStore::Postgres(self.clone()),
worker_record,
)))
}
fn concurrency_model(&self) -> crate::store::ConcurrencyModel {
crate::store::ConcurrencyModel::MultiProcess
}
fn backend_name(&self) -> &'static str {
"postgres"
}
async fn producer_ephemeral(
&self,
queue: &str,
_config: &Config,
) -> crate::error::Result<crate::workers::Producer> {
let queue_info = self.queues.get_by_name(queue).await?;
let worker_record = self.workers.register_ephemeral(Some(queue_info.id)).await?;
Ok(crate::workers::Producer::new(
crate::store::AnyStore::Postgres(self.clone()),
queue_info,
worker_record,
_config.validation_config.clone(),
))
}
async fn consumer_ephemeral(
&self,
queue: &str,
_config: &Config,
) -> crate::error::Result<crate::workers::Consumer> {
let queue_info = self.queues.get_by_name(queue).await?;
let worker_record = self.workers.register_ephemeral(Some(queue_info.id)).await?;
Ok(crate::workers::Consumer::new(
crate::store::AnyStore::Postgres(self.clone()),
queue_info,
worker_record,
))
}
}