disqrust 0.1.0

A high-level library to implement Disque workers
Documentation
#![cfg_attr(feature = "nightly", feature(duration_span))]
extern crate disqrust;
extern crate disque;
extern crate redis;

use std::time::Duration;
use std::sync::mpsc::{channel, Sender};

use disque::Disque;
use disqrust::{EventLoop, Handler, JobStatus};
use redis::Value;

enum HandlerCall {
    Job(Vec<u8>, String, Vec<u8>),
    Error(Vec<u8>, String, u32, u32),
}

impl HandlerCall {
    fn body(&self) -> Vec<u8> {
        match *self {
            HandlerCall::Job(_, _, ref body) => body.clone(),
            HandlerCall::Error(_, _, _, _) => panic!("getting body for error"),
        }
    }

    fn nack_additional_deliveries(&self) -> (u32, u32) {
        match *self {
            HandlerCall::Job(_, _, _) => panic!("getting nack for job"),
            HandlerCall::Error(_, _, a, b) => (a, b),
        }
    }
}

#[derive(Clone)]
struct MyHandler {
    sender: Sender<HandlerCall>,
    process_job_ret: JobStatus,
    process_error_ret: bool,
    sleep: Option<u32>,
    panic: bool,
}

impl MyHandler {
    fn new(sender: Sender<HandlerCall>, process_job_ret: JobStatus,
            process_error_ret: bool) -> MyHandler {
        MyHandler {
            sender: sender,
            process_job_ret: process_job_ret,
            process_error_ret: process_error_ret,
            sleep: None,
            panic: false,
        }
    }

    #[allow(dead_code)]
    fn set_sleep(&mut self, sleep: Option<u32>) {
        self.sleep = sleep;
    }

    #[allow(dead_code)]
    fn should_panic(&mut self, panic: bool) {
        self.panic = panic;
    }
}

impl Handler for MyHandler {
    fn process_job(&self, queue_name: &[u8], jobid: &String, body: Vec<u8>
            ) -> JobStatus {
        if let Some(sleep) = self.sleep {
            std::thread::sleep_ms(sleep);
        }

        if self.panic {
            panic!("panic on demand");
        }

        self.sender.send(HandlerCall::Job(queue_name.to_vec(), jobid.clone(),
                    body)).unwrap();
        self.process_job_ret.clone()
    }

    fn process_error(&self, queue_name: &[u8], jobid: &String, nack: u32,
            additional_deliveries: u32) -> bool {
        self.sender.send(HandlerCall::Error(queue_name.to_vec(), jobid.clone(),
                    nack, additional_deliveries)).unwrap();
        self.process_error_ret
    }
}

#[cfg(test)]
fn create_job(queue: &[u8], job: &[u8], nack: bool
        ) -> (Disque, Vec<u8>, Vec<u8>, String) {
    let disque = Disque::open("redis://127.0.0.1:7711/").unwrap();
    let jobid = disque.addjob(queue, job, Duration::from_secs(10), None,
            None, Some(Duration::from_secs(1)), None, None, false).unwrap();
    if nack {
        disque.getjob(true, None, &[queue]).unwrap();
        disque.nackjob(jobid.as_bytes()).unwrap();
    }
    (disque, queue.to_vec(), job.to_vec(), jobid)
}

#[test]
fn basic() {
    let (disque, queue, job, _) = create_job(b"basic", b"job67", false);

    let (tx, rx) = channel();
    let handler = MyHandler::new(tx, JobStatus::AckJob, true);
    let mut el = EventLoop::new(disque, 1, handler);
    el.watch_queue(queue.to_vec());
    el.run_times(1);
    el.stop();
    assert_eq!(rx.try_recv().unwrap().body(), job);
    assert!(rx.try_recv().is_err());
}

#[test]
fn error() {
    let (disque, queue, _, jobid) = create_job(b"error", b"job456", true);

    let (tx, rx) = channel();
    let handler = MyHandler::new(tx, JobStatus::AckJob, false);
    let mut el = EventLoop::new(disque, 1, handler);
    el.watch_queue(queue.to_vec());
    el.run_times(1);
    el.stop();
    assert_eq!(rx.try_recv().unwrap().nack_additional_deliveries(), (1, 0));
    assert!(rx.try_recv().is_err());

    let disque = Disque::open("redis://127.0.0.1:7711/").unwrap();
    disque.getjob(true, None, &[&*queue]).unwrap();
    disque.ackjob(jobid.as_bytes()).unwrap();
}

#[test]
fn error_and_job() {
    let (disque, queue, job, _) = create_job(b"errorandjob", b"job123", true);

    let (tx, rx) = channel();
    let handler = MyHandler::new(tx, JobStatus::AckJob, true);
    let mut el = EventLoop::new(disque, 1, handler);
    el.watch_queue(queue.to_vec());
    el.run_times(1);
    el.stop();
    assert_eq!(rx.try_recv().unwrap().nack_additional_deliveries(), (1, 0));
    assert_eq!(rx.try_recv().unwrap().body(), job);
    assert!(rx.try_recv().is_err());
}

#[test]
fn fastack() {
    let (disque, queue, _, jobid) = create_job(b"fastack", b"job123", false);

    assert!(disque.show(jobid.as_bytes()).unwrap().is_some());

    let (tx, rx) = channel();
    let handler = MyHandler::new(tx, JobStatus::FastAck, true);
    let mut el = EventLoop::new(disque, 1, handler);
    el.watch_queue(queue.to_vec());
    el.run_times(1);
    el.stop();
    rx.try_recv().unwrap();

    let disque = Disque::open("redis://127.0.0.1:7711/").unwrap();
    assert!(disque.show(jobid.as_bytes()).unwrap().is_none());
}

#[test]
fn ackjob() {
    let (disque, queue, _, jobid) = create_job(b"ackjob", b"job123", false);

    assert!(disque.show(jobid.as_bytes()).unwrap().is_some());

    let (tx, rx) = channel();
    let handler = MyHandler::new(tx, JobStatus::AckJob, true);
    let mut el = EventLoop::new(disque, 1, handler);
    el.watch_queue(queue.to_vec());
    el.run_times(1);
    el.stop();
    rx.try_recv().unwrap();

    let disque = Disque::open("redis://127.0.0.1:7711/").unwrap();
    assert!(disque.show(jobid.as_bytes()).unwrap().is_none());
}

#[test]
fn nack() {
    let (disque, queue, _, jobid) = create_job(b"nack", b"job000", false);

    let (tx, rx) = channel();
    let handler = MyHandler::new(tx, JobStatus::NAck, true);
    let mut el = EventLoop::new(disque, 1, handler);
    el.watch_queue(queue.to_vec());

    el.run_times(3);
    el.stop();
    rx.try_recv().unwrap();
    rx.try_recv().unwrap();
    rx.try_recv().unwrap();

    let disque = Disque::open("redis://127.0.0.1:7711/").unwrap();
    assert_eq!(*disque.show(jobid.as_bytes()).unwrap().unwrap().get(
                "nacks").unwrap(),
            Value::Int(3));
    disque.ackjob(jobid.as_bytes()).unwrap();
}

#[test]
fn jobcount_current_node() {
    let (disque, queue, _, _) = create_job(b"ackjob", b"job123", false);

    let (tx, rx) = channel();
    let handler = MyHandler::new(tx, JobStatus::AckJob, true);
    let mut el = EventLoop::new(disque, 1, handler);
    el.watch_queue(queue.to_vec());
    el.run_times(1);

    assert!(el.jobcount_current_node() >= 1);
    el.stop();
    rx.try_recv().unwrap();
}

#[test]
fn change_servers() {
    let disque = Disque::open("redis://127.0.0.1:7711/").unwrap();
    // This test does not apply when there is only one server
    let hello = disque.hello().unwrap();
    if hello.2.len() == 1 {
        return;
    }
    let disque2 = Disque::open(&*format!("redis://{}:{}/",
                hello.2[1].1, hello.2[1].2)).unwrap();

    let (tx, rx) = channel();

    let handler = MyHandler::new(tx, JobStatus::AckJob, true);
    let mut el = EventLoop::new(disque2, 1, handler);
    let oldid = el.current_node_id();
    let queue = b"change_servers";
    let job = b"job";

    el.watch_queue(queue.to_vec());

    let att = 20;
    for _ in 0..att {
        disque.addjob(queue, job, Duration::from_secs(10), None,
                None, None, None, None, false).unwrap();
        el.run_times(1);
        el.do_cycle();
        let newid = el.current_node_id();
        if newid != oldid {
            rx.try_recv().unwrap();
            return;
        }
    }

    panic!("After {} attempts it did not change node", att);
}

#[cfg(feature = "nightly")]
#[test]
fn queueing() {
    let disque = Disque::open("redis://127.0.0.1:7711/").unwrap();
    // This test does not apply when there is only one server
    let hello = disque.hello().unwrap();
    if hello.2.len() == 1 {
        return;
    }
    let disque2 = Disque::open(&*format!("redis://{}:{}/",
                hello.2[1].1, hello.2[1].2)).unwrap();

    let (tx, rx) = channel();

    let mut handler = MyHandler::new(tx, JobStatus::AckJob, true);
    handler.set_sleep(Some(200));
    let mut el = EventLoop::new(disque2, 1, handler);
    let queue = b"queueing";
    el.watch_queue(queue.to_vec());

    let d = Duration::span(|| {
        disque.addjob(queue, b"job1", Duration::from_secs(10), None,
                None, None, None, None, false).unwrap();
        disque.addjob(queue, b"job2", Duration::from_secs(10), None,
                None, None, None, None, false).unwrap();
        disque.addjob(queue, b"job3", Duration::from_secs(10), None,
                None, None, None, None, false).unwrap();
        el.run_times(3);
        el.stop();
    });
    assert_eq!(rx.try_recv().unwrap().body(), b"job1");
    assert_eq!(rx.try_recv().unwrap().body(), b"job2");
    assert_eq!(rx.try_recv().unwrap().body(), b"job3");
    assert_eq!(d.as_secs(), 0);
    assert!(d.subsec_nanos() >= 600_000_000);
    assert!(d.subsec_nanos() <= 999_999_999);
}

#[cfg(feature = "nightly")]
#[test]
fn panic_recover() {
    let (disque, queue, _, jobid) = create_job(b"panic_recover", b"job451", false);

    let (tx, rx) = channel();
    let mut handler = MyHandler::new(tx, JobStatus::AckJob, true);
    handler.should_panic(true);
    let mut el = EventLoop::new(disque, 1, handler);
    el.watch_queue(queue.to_vec());
    el.run_times(3);
    el.stop();
    assert_eq!(rx.try_recv().unwrap().nack_additional_deliveries(), (0, 1));
    assert_eq!(rx.try_recv().unwrap().nack_additional_deliveries(), (0, 2));
    assert!(rx.try_recv().is_err());

    let disque = Disque::open("redis://127.0.0.1:7711/").unwrap();
    disque.ackjob(jobid.as_bytes()).unwrap();
}