use super::*;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::thread;
use std::time::Duration;
#[derive(Clone)]
struct UnitHandler {}
impl dbq::Handler for UnitHandler {
type Error = std::io::Error;
fn handle(&self, _ctx: dbq::JobContext) -> std::result::Result<(), Self::Error> {
Ok(())
}
}
#[derive(Clone)]
struct ErrHandler {}
impl dbq::Handler for ErrHandler {
type Error = std::io::Error;
fn handle(&self, _ctx: dbq::JobContext) -> std::result::Result<(), Self::Error> {
Err(std::io::Error::new(std::io::ErrorKind::Other, "job error"))
}
}
#[derive(Clone)]
struct PanicHandler {}
impl dbq::Handler for PanicHandler {
type Error = std::io::Error;
fn handle(&self, _ctx: dbq::JobContext) -> std::result::Result<(), Self::Error> {
panic!("panic!");
}
}
/// Job handler state that counts invocations.
///
/// The counter lives behind an `Arc`, so every clone of a `CountHandler`
/// observes and updates the same total.
#[derive(Clone)]
struct CountHandler {
    /// Shared invocation counter.
    total: Arc<AtomicUsize>,
}

impl CountHandler {
    /// Creates a handler whose counter starts at zero.
    fn new() -> CountHandler {
        let counter = AtomicUsize::new(0);
        Self {
            total: Arc::new(counter),
        }
    }

    /// Returns the current value of the shared counter.
    fn total(&self) -> usize {
        let counter: &AtomicUsize = &self.total;
        counter.load(Ordering::SeqCst)
    }
}
impl dbq::Handler for CountHandler {
type Error = std::io::Error;
fn handle(&self, _ctx: dbq::JobContext) -> std::result::Result<(), Self::Error> {
self.total.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
/// Runs a pool configured with exactly one worker against a freshly cleared
/// queue holding a single job, then checks that the counting handler was
/// invoked exactly once.
#[test]
fn test_single_worker() {
    let schema_config = init();
    let queue_name = "test_single_worker";
    let queue = dbq::Queue::new(schema_config, queue_name.to_string());

    // Start from an empty queue and enqueue the one job under test.
    let conn = db_conn();
    queue.clear(&conn).unwrap();
    let _job_id = enqueue_test_job(&queue, 1, &conn);
    // Release the setup connection before the workers start, matching the
    // other tests in this file; it is not used again in this test.
    drop(conn);

    let connect_params = db_connect_params();
    let handler = CountHandler::new();
    // The pool receives a clone; `handler` is kept so the shared counter can
    // be inspected after the pool has finished.
    let mut config =
        dbq::WorkerPoolConfig::new(queue, connect_params, handler.clone()).unwrap();
    config.set_num_workers(1);

    let pool = dbq::WorkerPool::start(config);
    // Presumably gives the worker time to pick up the job before the pool is
    // joined (the semantics of `join` are not visible from this file).
    thread::sleep(Duration::from_millis(20));
    pool.join();

    assert_eq!(1, handler.total());
}
/// Starts a pool with the configuration's default worker count against a
/// queue holding a single job and joins it after a short pause.
///
/// The test makes no explicit assertion; it fails only if one of the setup
/// `unwrap` calls or a call into the pool panics.
#[test]
fn test_multi_workers() {
    let schema = init();
    let queue = dbq::Queue::new(schema, String::from("test_multi_workers"));

    // Prepare the queue on a short-lived connection.
    let setup_conn = db_conn();
    queue.clear(&setup_conn).unwrap();
    let _job_id = enqueue_test_job(&queue, 1, &setup_conn);
    drop(setup_conn);

    let params = db_connect_params();
    let pool_config = dbq::WorkerPoolConfig::new(queue, params, UnitHandler {}).unwrap();
    let worker_pool = dbq::WorkerPool::start(pool_config);

    // 20 ms pause before joining the pool.
    let pause = Duration::from_millis(20);
    thread::sleep(pause);
    worker_pool.join();
}
/// Enqueues a job allowed three attempts, runs a single worker whose handler
/// always returns an error, and then checks that the job can be found in the
/// dead letters with an error count equal to the attempt limit.
#[test]
fn test_job_failure() {
    let max_attempts = 3;
    let schema = init();
    let queue = dbq::Queue::new(schema, String::from("test_job_failure"));

    // Prepare the queue on a short-lived connection.
    let setup_conn = db_conn();
    queue.clear(&setup_conn).unwrap();
    let job_id = enqueue_test_job(&queue, max_attempts, &setup_conn);
    drop(setup_conn);

    // The queue is cloned because it is needed again for the lookup below.
    let params = db_connect_params();
    let mut pool_config =
        dbq::WorkerPoolConfig::new(queue.clone(), params, ErrHandler {}).unwrap();
    pool_config.set_num_workers(1);

    let worker_pool = dbq::WorkerPool::start(pool_config);
    // Two-second pause before joining; presumably long enough for every
    // attempt to run (retry timing is not visible from this file).
    let pause = Duration::from_secs(2);
    thread::sleep(pause);
    worker_pool.join();

    let check_conn = db_conn();
    let lookup = queue.lookup_in_dead_letters(job_id, &check_conn).unwrap();
    let job = lookup.unwrap();
    assert_eq!(max_attempts, job.error_count)
}
/// Runs a single worker whose handler panics on the one enqueued job, then
/// checks that the job is still found by `lookup_in_queue` and that its
/// error count is zero.
#[test]
fn test_panic() {
    let schema = init();
    let queue = dbq::Queue::new(schema, String::from("test_panic"));

    // Prepare the queue on a short-lived connection.
    let setup_conn = db_conn();
    queue.clear(&setup_conn).unwrap();
    let job_id = enqueue_test_job(&queue, 1, &setup_conn);
    drop(setup_conn);

    // The queue is cloned because it is needed again for the lookup below.
    let params = db_connect_params();
    let mut pool_config =
        dbq::WorkerPoolConfig::new(queue.clone(), params, PanicHandler {}).unwrap();
    pool_config.set_num_workers(1);

    let worker_pool = dbq::WorkerPool::start(pool_config);
    // 20 ms pause before joining the pool.
    let pause = Duration::from_millis(20);
    thread::sleep(pause);
    worker_pool.join();

    let check_conn = db_conn();
    let lookup = queue.lookup_in_queue(job_id, &check_conn).unwrap();
    let job = lookup.unwrap();
    assert_eq!(0, job.error_count)
}