dbq 0.1.0

Job queueing and processing library with queues stored in Postgres 9.5+
Documentation
use crate::error::Result;
use postgres::{transaction::Transaction, GenericConnection};

/// Table prefix used by `SchemaConfig::default()`.
const DEFAULT_TABLE_PREFIX: &str = "dbq";

/// Configures the tables for storing the job queue and dead letters. The
/// default `SchemaConfig` uses the default schema and prefixes the tables with
/// `dbq_`.
///
/// The table names are derived from the optional schema and the table prefix,
/// and are recomputed whenever the schema is changed with `set_schema`.
///
/// # Examples
///
/// Default configuration
///
/// ```
/// use dbq::*;
/// let config = SchemaConfig::default();
/// assert_eq!("dbq_job_queue", config.queue_table());
/// assert_eq!("dbq_job_dead_letters", config.dead_letters_table());
/// ```
///
/// Change the table prefix
///
/// ```
/// use dbq::*;
/// let config = SchemaConfig::new("my".to_string());
/// assert_eq!("my_job_queue", config.queue_table());
/// assert_eq!("my_job_dead_letters", config.dead_letters_table());
/// ```
///
/// Remove the table prefix
///
/// ```
/// use dbq::*;
/// let config = SchemaConfig::new("".to_string());
/// assert_eq!("job_queue", config.queue_table());
/// assert_eq!("job_dead_letters", config.dead_letters_table());
/// ```
///
/// Use a custom schema
///
/// ```
/// use dbq::*;
/// let mut config = SchemaConfig::new("".to_string());
/// config.set_schema(Some("jobs".to_string()));
/// assert_eq!("jobs.job_queue", config.queue_table());
/// assert_eq!("jobs.job_dead_letters", config.dead_letters_table());
/// ```
///
#[derive(Clone, Debug)]
pub struct SchemaConfig {
    /// Optional schema used to qualify the table names; `None` leaves the
    /// names unqualified.
    schema: Option<String>,
    /// Prefix placed in front of each table name; an empty prefix is omitted.
    table_prefix: String,
    /// Cached name of the job queue table, including schema and prefix.
    queue_table: String,
    /// Cached name of the dead letters table, including schema and prefix.
    dead_letters_table: String,
}

impl SchemaConfig {
    /// Creates a configuration that uses `table_prefix` and no explicit schema.
    ///
    /// The queue and dead letters table names are derived immediately, so the
    /// returned value is ready to use.
    pub fn new(table_prefix: String) -> Self {
        let mut fresh = Self {
            schema: None,
            table_prefix,
            // Placeholders only; filled in by `update_table_ids` below.
            queue_table: String::new(),
            dead_letters_table: String::new(),
        };
        fresh.update_table_ids();
        fresh
    }

    /// Returns the schema that qualifies the table names, if one is set.
    #[inline]
    pub fn schema(&self) -> &Option<String> {
        &self.schema
    }

    /// Replaces the schema and recomputes both table names to match.
    #[inline]
    pub fn set_schema(&mut self, schema: Option<String>) {
        self.schema = schema;
        self.update_table_ids();
    }

    /// Returns the table prefix this configuration was created with.
    #[inline]
    pub fn table_prefix(&self) -> &str {
        self.table_prefix.as_str()
    }

    /// Returns the name of the job queue table, including schema and prefix.
    #[inline]
    pub fn queue_table(&self) -> &str {
        self.queue_table.as_str()
    }

    /// Returns the name of the dead letters table, including schema and prefix.
    #[inline]
    pub fn dead_letters_table(&self) -> &str {
        self.dead_letters_table.as_str()
    }

    /// Rebuilds the cached table names from the current schema and prefix.
    fn update_table_ids(&mut self) {
        let qualifier = build_full_table_prefix(&self.schema, &self.table_prefix);
        self.dead_letters_table = format!("{}job_dead_letters", qualifier);
        self.queue_table = format!("{}job_queue", qualifier);
    }
}

impl Default for SchemaConfig {
    fn default() -> Self {
        let table_prefix = DEFAULT_TABLE_PREFIX.to_string();
        Self::new(table_prefix)
    }
}

/// Create the database objects used by dbq. This function is idempotent and
/// should be run on every startup to ensure that the database objects are
/// up to date.
///
/// All statements run inside a single transaction opened on `conn`, which is
/// committed once both tables have been created.
///
/// Pass an advisory lock key to wrap migrations in an advisory
/// lock to prevent errors from multiple migrations being run at once (for
/// instance, when multiple programs start and run migrations simultaneously).
/// The advisory lock key must be the same for every call to `run_migrations`
/// to prevent conflicts.
///
/// # Errors
///
/// Returns an error if the transaction cannot be started, the advisory lock
/// statement fails, either `create table` statement fails, or the commit
/// fails.
pub fn run_migrations<C: GenericConnection>(
    config: &SchemaConfig,
    conn: &C,
    advisory_lock_key: Option<i64>,
) -> Result<()> {
    let migration_tx = conn.transaction()?;

    // The lock is transaction-scoped (`pg_advisory_xact_lock`), so no explicit
    // unlock statement is issued here.
    if let Some(lock_key) = advisory_lock_key {
        migration_tx.execute("select pg_advisory_xact_lock($1)", &[&lock_key])?;
    }

    table_job_queue(config, &migration_tx)?;
    table_job_dead_letters(config, &migration_tx)?;

    migration_tx.commit()?;
    Ok(())
}

/// Creates the job queue table named by `config.queue_table()` if it does not
/// already exist, executing the statement on the given transaction.
fn table_job_queue<'a>(config: &SchemaConfig, conn: &'a Transaction<'a>) -> Result<()> {
    let table_name = config.queue_table();
    let statement = format!(
        r#"create table if not exists {} (
            id bigserial not null primary key,
            class text not null,
            args json not null,
            queue text not null,
            max_attempts integer not null,
            created_at timestamp with time zone default now() not null,
            run_next_at timestamp with time zone default now() not null,
            last_failed_at timestamp with time zone null,
            last_error text null,
            error_count integer default 0 not null
        )"#,
        table_name,
    );
    // The affected-row count of a DDL statement carries no information.
    conn.execute(&statement, &[])?;
    Ok(())
}

/// Creates the dead letters table named by `config.dead_letters_table()` if it
/// does not already exist, executing the statement on the given transaction.
fn table_job_dead_letters<'a>(
    config: &SchemaConfig,
    conn: &'a Transaction<'a>,
) -> Result<()> {
    let table_name = config.dead_letters_table();
    let statement = format!(
        r#"create table if not exists {} (
            id bigint not null primary key,
            class text not null,
            args json not null,
            queue text not null,
            max_attempts integer not null,
            created_at timestamp with time zone not null,
            last_failed_at timestamp with time zone not null,
            last_error text not null,
            error_count integer not null
        )"#,
        table_name,
    );
    // The affected-row count of a DDL statement carries no information.
    conn.execute(&statement, &[])?;
    Ok(())
}

/// Builds the text placed in front of every table name.
///
/// The result is the schema followed by `.` (when a schema is given) and then
/// the table prefix followed by `_` (when a prefix is given). Empty parts are
/// skipped together with their separator: an empty `table_prefix` adds no
/// `_`, and an empty schema is treated like `None` so the result never starts
/// with a dangling `.`.
///
/// # Examples (informal)
///
/// * `(None, "dbq")` gives `"dbq_"`
/// * `(Some("jobs"), "")` gives `"jobs."`
/// * `(Some("jobs"), "my")` gives `"jobs.my_"`
/// * `(Some(""), "dbq")` gives `"dbq_"`
fn build_full_table_prefix(schema: &Option<String>, table_prefix: &str) -> String {
    // Normalise an empty schema to "no schema", mirroring the handling of an
    // empty table prefix below.
    let schema = schema
        .as_ref()
        .map(String::as_str)
        .filter(|name| !name.is_empty());

    // Room for both parts plus one separator character each.
    let capacity = schema.map_or(0, str::len) + table_prefix.len() + 2;
    let mut prefix = String::with_capacity(capacity);

    if let Some(name) = schema {
        prefix.push_str(name);
        prefix.push('.');
    }
    if !table_prefix.is_empty() {
        prefix.push_str(table_prefix);
        prefix.push('_');
    }
    prefix
}