// dbq 0.1.0
//
// Job queueing and processing library with queues stored in Postgres 9.5+
// Documentation
use super::*;

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use std::thread;
use std::time::Duration;

#[derive(Clone)]
struct UnitHandler {}

impl dbq::Handler for UnitHandler {
    type Error = std::io::Error;

    fn handle(&self, _ctx: dbq::JobContext) -> std::result::Result<(), Self::Error> {
        Ok(())
    }
}

#[derive(Clone)]
struct ErrHandler {}

impl dbq::Handler for ErrHandler {
    type Error = std::io::Error;

    fn handle(&self, _ctx: dbq::JobContext) -> std::result::Result<(), Self::Error> {
        Err(std::io::Error::new(std::io::ErrorKind::Other, "job error"))
    }
}

#[derive(Clone)]
struct PanicHandler {}

impl dbq::Handler for PanicHandler {
    type Error = std::io::Error;

    fn handle(&self, _ctx: dbq::JobContext) -> std::result::Result<(), Self::Error> {
        panic!("panic!");
    }
}

/// Test handler state: a counter shared between all clones of the handler.
///
/// The counter sits behind an `Arc`, so a clone handed to a worker pool and the
/// copy kept by the test observe the same total.
#[derive(Clone)]
struct CountHandler {
    total: Arc<AtomicUsize>,
}

impl CountHandler {
    /// Creates a handler whose counter starts at zero.
    fn new() -> Self {
        let total = Arc::new(AtomicUsize::new(0));
        Self { total }
    }

    /// Returns the current value of the shared counter.
    fn total(&self) -> usize {
        let counter: &AtomicUsize = &self.total;
        counter.load(Ordering::SeqCst)
    }
}

impl dbq::Handler for CountHandler {
    type Error = std::io::Error;

    fn handle(&self, _ctx: dbq::JobContext) -> std::result::Result<(), Self::Error> {
        self.total.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }
}

/// Enqueues one job, runs a pool with a single worker, and checks that the
/// counting handler was invoked exactly once.
#[test]
fn test_single_worker() {
    let schema_config = init();
    let queue_name = "test_single_worker";
    let queue = dbq::Queue::new(schema_config, queue_name.to_string());

    let conn = db_conn();
    queue.clear(&conn).unwrap();
    let _job_id = enqueue_test_job(&queue, 1, &conn);
    // The setup connection is not used again; drop it like the other tests in this file do.
    drop(conn);

    let connect_params = db_connect_params();
    let handler = CountHandler::new();
    let mut config =
        dbq::WorkerPoolConfig::new(queue, connect_params, handler.clone()).unwrap();
    config.set_num_workers(1);

    let pool = dbq::WorkerPool::start(config);
    // Always give the worker the original 20ms head start, then keep polling the
    // counter up to a deadline so a slow machine or database does not fail the
    // test spuriously. The final assertion still requires exactly one invocation.
    thread::sleep(Duration::from_millis(20));
    let deadline = std::time::Instant::now() + Duration::from_secs(5);
    while handler.total() == 0 && std::time::Instant::now() < deadline {
        thread::sleep(Duration::from_millis(20));
    }
    pool.join();
    assert_eq!(1, handler.total());
}

/// Enqueues one job and runs a pool without overriding the worker count.
///
/// There is no explicit assertion; the test passes as long as starting the pool,
/// waiting briefly, and joining it does not panic.
#[test]
fn test_multi_workers() {
    let name = "test_multi_workers";
    let queue = dbq::Queue::new(init(), name.to_string());

    // Scope the setup connection so it is closed before the pool starts.
    {
        let setup_conn = db_conn();
        queue.clear(&setup_conn).unwrap();
        let _job_id = enqueue_test_job(&queue, 1, &setup_conn);
    }

    let params = db_connect_params();
    let pool_config = dbq::WorkerPoolConfig::new(queue, params, UnitHandler {}).unwrap();

    let workers = dbq::WorkerPool::start(pool_config);
    thread::sleep(Duration::from_millis(20));
    workers.join();
}

/// Enqueues a job with three allowed attempts, runs it against a handler that
/// always fails, and checks that the job can be looked up in the dead letters
/// with an error count equal to the attempt limit.
#[test]
fn test_job_failure() {
    let max_attempts = 3;
    let queue = dbq::Queue::new(init(), "test_job_failure".to_string());

    let setup_conn = db_conn();
    queue.clear(&setup_conn).unwrap();
    let job_id = enqueue_test_job(&queue, max_attempts, &setup_conn);
    drop(setup_conn);

    let params = db_connect_params();
    let mut pool_config =
        dbq::WorkerPoolConfig::new(queue.clone(), params, ErrHandler {}).unwrap();
    pool_config.set_num_workers(1);

    let workers = dbq::WorkerPool::start(pool_config);
    // sleep longer than the other tests because of backoff between attempts
    thread::sleep(Duration::from_secs(2));
    workers.join();

    let verify_conn = db_conn();
    let lookup = queue.lookup_in_dead_letters(job_id, &verify_conn);
    let dead_job = lookup.unwrap().unwrap();
    assert_eq!(max_attempts, dead_job.error_count);
}

/// Enqueues one job, runs it against a handler that panics, and checks that the
/// job is still found in the queue with an error count of zero.
#[test]
fn test_panic() {
    let queue = dbq::Queue::new(init(), "test_panic".to_string());

    let setup_conn = db_conn();
    queue.clear(&setup_conn).unwrap();
    let job_id = enqueue_test_job(&queue, 1, &setup_conn);
    drop(setup_conn);

    let params = db_connect_params();
    let mut pool_config =
        dbq::WorkerPoolConfig::new(queue.clone(), params, PanicHandler {}).unwrap();
    pool_config.set_num_workers(1);

    let workers = dbq::WorkerPool::start(pool_config);
    thread::sleep(Duration::from_millis(20));
    workers.join();

    let verify_conn = db_conn();
    let lookup = queue.lookup_in_queue(job_id, &verify_conn);
    let queued_job = lookup.unwrap().unwrap();
    // the error was not recorded due to the panic
    assert_eq!(0, queued_job.error_count);
}