use crate::error::Result;
use postgres::{transaction::Transaction, GenericConnection};
const DEFAULT_TABLE_PREFIX: &str = "dbq";
#[derive(Clone, Debug)]
pub struct SchemaConfig {
schema: Option<String>,
table_prefix: String,
queue_table: String,
dead_letters_table: String,
}
impl SchemaConfig {
pub fn new(table_prefix: String) -> Self {
let mut config = SchemaConfig {
schema: None,
table_prefix,
queue_table: "".to_string(),
dead_letters_table: "".to_string(),
};
config.update_table_ids();
config
}
#[inline]
pub fn schema(&self) -> &Option<String> {
&self.schema
}
#[inline]
pub fn set_schema(&mut self, schema: Option<String>) {
self.schema = schema;
self.update_table_ids();
}
#[inline]
pub fn table_prefix(&self) -> &str {
&self.table_prefix
}
#[inline]
pub fn queue_table(&self) -> &str {
&self.queue_table
}
#[inline]
pub fn dead_letters_table(&self) -> &str {
&self.dead_letters_table
}
fn update_table_ids(&mut self) {
let full_table_prefix =
build_full_table_prefix(&self.schema, self.table_prefix());
self.queue_table = format!("{}job_queue", &full_table_prefix);
self.dead_letters_table = format!("{}job_dead_letters", full_table_prefix);
}
}
impl Default for SchemaConfig {
fn default() -> Self {
let table_prefix = DEFAULT_TABLE_PREFIX.to_string();
Self::new(table_prefix)
}
}
pub fn run_migrations<C: GenericConnection>(
config: &SchemaConfig,
conn: &C,
advisory_lock_key: Option<i64>,
) -> Result<()> {
let tx = conn.transaction()?;
if let Some(alk) = advisory_lock_key {
tx.execute("select pg_advisory_xact_lock($1)", &[&alk])?;
}
table_job_queue(&config, &tx)?;
table_job_dead_letters(&config, &tx)?;
tx.commit()?;
Ok(())
}
fn table_job_queue<'a>(config: &SchemaConfig, conn: &'a Transaction<'a>) -> Result<()> {
let ddl = &format!(
r#"create table if not exists {} (
id bigserial not null primary key,
class text not null,
args json not null,
queue text not null,
max_attempts integer not null,
created_at timestamp with time zone default now() not null,
run_next_at timestamp with time zone default now() not null,
last_failed_at timestamp with time zone null,
last_error text null,
error_count integer default 0 not null
)"#,
config.queue_table(),
);
let _ = conn.execute(ddl, &[])?;
Ok(())
}
fn table_job_dead_letters<'a>(
config: &SchemaConfig,
conn: &'a Transaction<'a>,
) -> Result<()> {
let ddl = &format!(
r#"create table if not exists {} (
id bigint not null primary key,
class text not null,
args json not null,
queue text not null,
max_attempts integer not null,
created_at timestamp with time zone not null,
last_failed_at timestamp with time zone not null,
last_error text not null,
error_count integer not null
)"#,
config.dead_letters_table(),
);
let _ = conn.execute(ddl, &[])?;
Ok(())
}
fn build_full_table_prefix(schema: &Option<String>, table_prefix: &str) -> String {
let prefix_len =
schema.as_ref().map(|s| s.len()).unwrap_or(0) + table_prefix.len() + 2;
let mut prefix = String::with_capacity(prefix_len);
if let Some(s) = schema {
prefix.push_str(&s[..]);
prefix.push_str(".");
}
if table_prefix.is_empty() {
return prefix;
}
prefix.push_str(&table_prefix[..]);
prefix.push_str("_");
prefix
}