extern crate chrono;
extern crate closer;
extern crate env_logger;
extern crate r2d2;
extern crate r2d2_redis;
extern crate resque;
use std::thread;
use std::sync::Arc;
use std::time::Duration;
use std::sync::atomic::{AtomicUsize, Ordering};
use r2d2::{Config, Pool, PooledConnection};
use r2d2_redis::RedisConnectionManager;
use resque::{JobService, Result, ResultExt, WithRedis};
type RedisPool = Pool<RedisConnectionManager>;
type RedisConn = PooledConnection<RedisConnectionManager>;
fn main() {
env_logger::init().unwrap();
let closer = closer::Closer::init();
let with_redis = HelloWithRedis::new("redis://127.0.0.1:6379/0");
start_enqueue_thread(with_redis.0.clone());
let mut config = resque::Config::new("local", 2, with_redis, closer.closed());
config.set_watch_dead_thread(10);
let job_service = Box::new(HelloJobService::new());
config.reg_job_service(job_service).unwrap();
resque::start(config).unwrap();
}
fn start_enqueue_thread(redis_pool: RedisPool) {
thread::spawn(move || {
let redis_conn = redis_pool.get().unwrap();
loop {
let now_i = chrono::Utc::now().timestamp();
let args = vec![];
resque::enqueue(&redis_conn, "hello", "HelloJob", &args).unwrap();
let args = vec!["Arg1".to_string()];
resque::enqueue(&redis_conn, "hello", "HelloJob", &args).unwrap();
let args = vec!["Arg1".to_string(), "Arg2".to_string()];
resque::enqueue(&redis_conn, "hello", "HelloJob", &args).unwrap();
let args = vec!["ScheduleArg1".to_string()];
resque::enqueue_at(&redis_conn, now_i + 1, "hello", "HelloJob", &args).unwrap();
let args = vec!["ScheduleArg1".to_string(), "ScheduleArg2".to_string()];
resque::enqueue_at(&redis_conn, now_i + 2, "hello", "HelloJob", &args).unwrap();
thread::sleep(Duration::from_secs(5));
}
});
}
#[derive(Clone)]
struct HelloWithRedis(RedisPool);
impl HelloWithRedis {
fn new(redis_url: &str) -> HelloWithRedis {
let manager = RedisConnectionManager::new(redis_url).unwrap();
let pool = Pool::new(Config::default(), manager).unwrap();
HelloWithRedis(pool)
}
}
impl WithRedis for HelloWithRedis {
type Conn = RedisConn;
fn with_redis<F, R>(&self, f: F) -> Result<R>
where
F: FnOnce(&Self::Conn) -> Result<R>,
{
let redis_conn = self.0.get().chain_err(|| "could not get redis conn")?;
f(&redis_conn)
}
}
#[derive(Clone)]
struct HelloJobService(Arc<AtomicUsize>);
impl HelloJobService {
fn new() -> HelloJobService {
HelloJobService(Arc::new(AtomicUsize::new(1)))
}
}
impl JobService for HelloJobService {
fn run(&self, args: &[String]) -> Result<()> {
self.0.fetch_add(1, Ordering::Relaxed);
thread::sleep(Duration::from_secs(2));
let i = self.0.load(Ordering::Relaxed);
println!("hello: {}, args: {:?}", i, args);
Ok(())
}
fn queue(&self) -> &str {
"hello"
}
fn job_type(&self) -> &str {
"HelloJob"
}
fn box_clone(&self) -> Box<JobService> {
Box::new((*self).clone())
}
}