resque 0.3.0

rust resque clone
Documentation
extern crate chrono;
extern crate closer;
extern crate env_logger;
extern crate r2d2;
extern crate r2d2_redis;
extern crate resque;

use std::thread;
use std::sync::Arc;
use std::time::Duration;
use std::sync::atomic::{AtomicUsize, Ordering};
use r2d2::{Config, Pool, PooledConnection};
use r2d2_redis::RedisConnectionManager;
use resque::{JobService, Result, ResultExt, WithRedis};

type RedisPool = Pool<RedisConnectionManager>;
type RedisConn = PooledConnection<RedisConnectionManager>;

// RUST_LOG=debug cargo run --example hello
fn main() {
    env_logger::init().unwrap();

    let closer = closer::Closer::init();
    let with_redis = HelloWithRedis::new("redis://127.0.0.1:6379/0");

    start_enqueue_thread(with_redis.0.clone());

    let mut config = resque::Config::new("local", 2, with_redis, closer.closed());
    config.set_watch_dead_thread(10);

    let job_service = Box::new(HelloJobService::new());
    config.reg_job_service(job_service).unwrap();

    resque::start(config).unwrap();
}

fn start_enqueue_thread(redis_pool: RedisPool) {
    thread::spawn(move || {
        let redis_conn = redis_pool.get().unwrap();
        loop {
            let now_i = chrono::Utc::now().timestamp();

            let args = vec![];
            resque::enqueue(&redis_conn, "hello", "HelloJob", &args).unwrap();

            let args = vec!["Arg1".to_string()];
            resque::enqueue(&redis_conn, "hello", "HelloJob", &args).unwrap();

            let args = vec!["Arg1".to_string(), "Arg2".to_string()];
            resque::enqueue(&redis_conn, "hello", "HelloJob", &args).unwrap();

            let args = vec!["ScheduleArg1".to_string()];
            resque::enqueue_at(&redis_conn, now_i + 1, "hello", "HelloJob", &args).unwrap();

            let args = vec!["ScheduleArg1".to_string(), "ScheduleArg2".to_string()];
            resque::enqueue_at(&redis_conn, now_i + 2, "hello", "HelloJob", &args).unwrap();

            thread::sleep(Duration::from_secs(5));
        }
    });
}

#[derive(Clone)]
struct HelloWithRedis(RedisPool);

impl HelloWithRedis {
    fn new(redis_url: &str) -> HelloWithRedis {
        let manager = RedisConnectionManager::new(redis_url).unwrap();

        let pool = Pool::new(Config::default(), manager).unwrap();
        HelloWithRedis(pool)
    }
}

impl WithRedis for HelloWithRedis {
    type Conn = RedisConn;

    fn with_redis<F, R>(&self, f: F) -> Result<R>
    where
        F: FnOnce(&Self::Conn) -> Result<R>,
    {
        let redis_conn = self.0.get().chain_err(|| "could not get redis conn")?;
        f(&redis_conn)
    }
}

#[derive(Clone)]
struct HelloJobService(Arc<AtomicUsize>);

impl HelloJobService {
    fn new() -> HelloJobService {
        HelloJobService(Arc::new(AtomicUsize::new(1)))
    }
}

impl JobService for HelloJobService {
    fn run(&self, args: &[String]) -> Result<()> {
        self.0.fetch_add(1, Ordering::Relaxed);

        thread::sleep(Duration::from_secs(2));

        let i = self.0.load(Ordering::Relaxed);
        println!("hello: {}, args: {:?}", i, args);
        Ok(())
    }

    fn queue(&self) -> &str {
        "hello"
    }

    fn job_type(&self) -> &str {
        "HelloJob"
    }

    fn box_clone(&self) -> Box<JobService> {
        Box::new((*self).clone())
    }
}