// dbq 0.1.0
//
// Job queueing and processing library with queues stored in Postgres 9.5+
// Documentation
use crate::error::Result;
use crate::{Job, SchemaConfig};
use chrono::{DateTime, Utc};
use postgres::{self, transaction::Transaction, GenericConnection};
use serde::Serialize;
use serde_json::value;

/// A handle to a named job queue whose jobs live in the database.
///
/// The handle itself only carries the schema configuration and the queue
/// name; it does not own a database connection. Every operation takes the
/// connection (or transaction) to run on as an argument. Because all state is
/// kept in the database, methods that take `&self` can still change the
/// queue's contents, so the queue should be treated as having internal
/// mutability.
///
/// # Examples
///
/// ```no_run
/// use dbq::{Queue, SchemaConfig};
/// use postgres::{Connection, TlsMode};
/// use serde_json::value::Value;
///
/// let conn = Connection::connect("postgres://user@localhost".to_string(), TlsMode::None)
///     .unwrap();
///
/// let q = Queue::new(SchemaConfig::default(), "de_lancie_q".to_string());
///
/// // Each call inserts one job that may be attempted up to 3 times.
/// q.enqueue("send_email", Value::String("example@example.com".to_string()), 3, &conn);
/// q.enqueue("send_sms", Value::String("+1-555-555-5555".to_string()), 3, &conn);
/// ```
#[derive(Debug, Clone)]
pub struct Queue {
    /// Supplies the names of the queue and dead-letters tables.
    schema_config: SchemaConfig,
    /// Value matched against the `queue` column to select this queue's jobs.
    name: String,
}

impl Queue {
    /// Creates a handle for the queue called `name`, using the tables named by
    /// `schema_config`. No database access happens here.
    #[inline]
    pub fn new(schema_config: SchemaConfig, name: String) -> Queue {
        Queue {
            schema_config,
            name,
        }
    }

    /// Add a new job to the queue
    ///
    /// `args` is serialized to a JSON value and stored with the job together
    /// with its `class` and `max_attempts`. Returns the `id` the database
    /// assigned to the inserted row.
    ///
    /// # Errors
    ///
    /// Returns an error if `args` cannot be serialized to JSON or if the
    /// insert fails.
    pub fn enqueue<A: Serialize, C: GenericConnection>(
        &self,
        class: &str,
        args: A,
        max_attempts: u32,
        conn: &C,
    ) -> Result<u64> {
        let args_json = value::to_value(args)?;
        let rows = conn.query(
            &format!(
                "insert into {} (class, args, queue, max_attempts)
                values ($1, $2, $3, $4) returning id",
                self.schema_config.queue_table()
            ),
            // `max_attempts` is bound as a signed 32-bit integer; the cast is
            // bit-preserving and is reversed when jobs are read back.
            &[&class, &args_json, &self.name, &(max_attempts as i32)],
        )?;
        // `insert ... returning id` yields exactly one row on success.
        let id: i64 = rows.get(0).get(0);
        Ok(id as u64)
    }

    /// Claim a job so that it can be run
    ///
    /// Selects the job with the lowest `id` in this queue whose `run_next_at`
    /// is earlier than the database's `now()`, locking the row with
    /// `for update skip locked` so that rows already locked by other
    /// transactions are passed over. The row lock is held by `tx`.
    ///
    /// Returns `Ok(None)` when no such job is available.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub(crate) fn reserve(&self, tx: &Transaction) -> Result<Option<Job>> {
        let query = &format!(
            r#"select
                id, queue, args, class, max_attempts, created_at, run_next_at,
                last_failed_at, last_error, error_count
            from {}
            where queue = $1 and run_next_at < now()
            order by id
            limit 1
            for update skip locked"#,
            self.schema_config.queue_table()
        );
        let rows = tx.query(query, &[&self.name])?;
        if rows.is_empty() {
            return Ok(None);
        }
        let row = rows.get(0);
        // Integer columns are read with their signed SQL types and converted
        // to the unsigned types used by `Job`.
        let id: i64 = row.get(0);
        let max_attempts: i32 = row.get(4);
        let error_count: i32 = row.get(9);
        Ok(Some(Job {
            id: id as u64,
            queue: row.get(1),
            args: row.get(2),
            class: row.get(3),
            max_attempts: max_attempts as u32,
            created_at: row.get(5),
            run_next_at: row.get(6),
            last_failed_at: row.get(7),
            last_error: row.get(8),
            error_count: error_count as u32,
        }))
    }

    /// Complete a job successfully
    ///
    /// Deletes the job's row from this queue. Returns `Ok(true)` if a row was
    /// deleted and `Ok(false)` if no row with that id exists in this queue.
    ///
    /// # Errors
    ///
    /// Returns an error if the delete fails.
    pub(crate) fn complete<C: GenericConnection>(
        &self,
        job: &Job,
        conn: &C,
    ) -> Result<bool> {
        let id = job.id as i64;
        let res = conn.execute(
            &format!(
                "delete from {} where id = $1 and queue = $2",
                self.schema_config.queue_table()
            ),
            &[&id, &self.name],
        )?;
        Ok(res > 0)
    }

    /// Record the failed run attempt
    ///
    /// Stores `err_msg` as the job's last error, sets `last_failed_at` to the
    /// database's `now()`, writes `job.error_count + 1` as the new error
    /// count, and reschedules the job for `run_next_at`. Returns `Ok(true)`
    /// if a row was updated and `Ok(false)` if the job is not in this queue.
    ///
    /// # Errors
    ///
    /// Returns an error if the update fails.
    pub(crate) fn mark_error<C: GenericConnection>(
        &self,
        job: &Job,
        err_msg: &str,
        run_next_at: DateTime<Utc>,
        conn: &C,
    ) -> Result<bool> {
        let id = job.id as i64;
        let error_count = (job.error_count + 1) as i32;
        let res = conn.execute(
            &format!(
                r#"update {}
                set
                    last_failed_at = now(),
                    last_error = $3,
                    error_count = $4,
                    run_next_at = $5
                where id = $1 and queue = $2"#,
                self.schema_config.queue_table()
            ),
            &[&id, &self.name, &err_msg, &error_count, &run_next_at],
        )?;
        Ok(res > 0)
    }

    /// Move a job to "dead letters" storage so it is not retried
    ///
    /// The job's row is deleted from the queue table and inserted into the
    /// dead-letters table with `err_msg` as its last error, the database's
    /// `now()` as `last_failed_at`, and `job.error_count + 1` as its error
    /// count. Returns `Ok(true)` if the job was moved and `Ok(false)` if no
    /// row with that id exists in this queue.
    ///
    /// # Errors
    ///
    /// Returns an error if the statement fails; in that case the job is left
    /// in the queue table.
    pub(crate) fn fail<C: GenericConnection>(
        &self,
        job: &Job,
        err_msg: &str,
        conn: &C,
    ) -> Result<bool> {
        let id = job.id as i64;
        let error_count = (job.error_count + 1) as i32;
        // The delete and the insert are issued as ONE statement (a
        // data-modifying CTE) so the move is atomic even when `conn` is a
        // plain connection rather than a transaction: the job can never end
        // up in both tables or in neither.
        let res = conn.execute(
            &format!(
                r#"with moved as (
                    delete from {}
                    where id = $1 and queue = $2
                    returning id, class, args, queue, max_attempts, created_at
                )
                insert into {} (
                    id, class, args, queue, max_attempts, created_at, last_failed_at, last_error,
                    error_count
                )
                select id, class, args, queue, max_attempts, created_at, now(), $3, $4
                from moved"#,
                self.schema_config.queue_table(),
                self.schema_config.dead_letters_table()
            ),
            &[&id, &self.name, &err_msg, &error_count],
        )?;
        // The reported row count is the number of rows inserted, which equals
        // the number of rows removed from the queue.
        Ok(res > 0)
    }

    /// Length of the queue
    ///
    /// Counts every row in the queue table that belongs to this queue.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub fn len<C: GenericConnection>(&self, conn: &C) -> Result<usize> {
        let res = conn.query(
            &format!(
                "select count(*) from {} where queue = $1",
                self.schema_config.queue_table()
            ),
            &[&self.name],
        )?;
        // An aggregate without `group by` always yields exactly one row.
        let count: i64 = res.get(0).get(0);
        Ok(count as usize)
    }

    /// Find a job in the queue by its job ID
    ///
    /// Returns `Ok(None)` if this queue has no job with that id. The row is
    /// only read; it is not locked or claimed.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub fn lookup_in_queue<C: GenericConnection>(
        &self,
        job_id: u64,
        conn: &C,
    ) -> Result<Option<Job>> {
        let query = &format!(
            r#"select
                id, queue, args, class, max_attempts, created_at, run_next_at,
                last_failed_at, last_error, error_count
            from {}
            where id = $1 and queue = $2"#,
            self.schema_config.queue_table()
        );
        let rows = conn.query(query, &[&(job_id as i64), &self.name])?;
        if rows.is_empty() {
            return Ok(None);
        }
        let row = rows.get(0);
        let id: i64 = row.get(0);
        let max_attempts: i32 = row.get(4);
        let error_count: i32 = row.get(9);
        Ok(Some(Job {
            id: id as u64,
            queue: row.get(1),
            args: row.get(2),
            class: row.get(3),
            max_attempts: max_attempts as u32,
            created_at: row.get(5),
            // Read straight into the optional field, exactly as `reserve`
            // does, so a NULL column becomes `None` instead of a panic.
            run_next_at: row.get(6),
            last_failed_at: row.get(7),
            last_error: row.get(8),
            error_count: error_count as u32,
        }))
    }

    /// Find a job in the queue's "dead letter" storage by its job ID
    ///
    /// Returns `Ok(None)` if the dead-letters table has no job with that id
    /// for this queue. The returned job's `run_next_at` is always `None`,
    /// because that column is not selected from the dead-letters table.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub fn lookup_in_dead_letters<C: GenericConnection>(
        &self,
        job_id: u64,
        conn: &C,
    ) -> Result<Option<Job>> {
        let query = &format!(
            r#"select
                id, queue, args, class, max_attempts, created_at,
                last_failed_at, last_error, error_count
            from {}
            where id = $1 and queue = $2"#,
            self.schema_config.dead_letters_table()
        );
        let rows = conn.query(query, &[&(job_id as i64), &self.name])?;
        if rows.is_empty() {
            return Ok(None);
        }
        let row = rows.get(0);
        // Column indexes after `created_at` are one lower than in the queue
        // table queries because `run_next_at` is not part of this select.
        let id: i64 = row.get(0);
        let max_attempts: i32 = row.get(4);
        let error_count: i32 = row.get(8);
        Ok(Some(Job {
            id: id as u64,
            queue: row.get(1),
            args: row.get(2),
            class: row.get(3),
            max_attempts: max_attempts as u32,
            created_at: row.get(5),
            run_next_at: None,
            last_failed_at: row.get(6),
            last_error: row.get(7),
            error_count: error_count as u32,
        }))
    }

    /// Delete all jobs from the queue. Calling this function may block until
    /// all running jobs in the queue finish
    ///
    /// Only rows in the queue table are removed; the dead-letters table is
    /// not touched.
    ///
    /// # Errors
    ///
    /// Returns an error if the delete fails.
    pub fn clear<C: GenericConnection>(&self, conn: &C) -> Result<()> {
        conn.execute(
            &format!(
                "delete from {} where queue = $1",
                self.schema_config.queue_table()
            ),
            &[&self.name],
        )?;
        Ok(())
    }
}