1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
use serde_json::to_string; use chrono::UTC; use ::RedisPool; use ::JobSuccessType; use errors::Result; use job::Job; pub type MiddleWareResult = Result<JobSuccessType>; pub type NextFunc<'a> = &'a mut (FnMut(&mut Job, RedisPool) -> MiddleWareResult + 'a); pub trait MiddleWare: Send { fn handle(&mut self, job: &mut Job, redis: RedisPool, next: NextFunc) -> MiddleWareResult; fn cloned(&mut self) -> Box<MiddleWare>; } impl<F> MiddleWare for F where F: FnMut(&mut Job, RedisPool, NextFunc) -> MiddleWareResult + Copy + Send + 'static { fn handle(&mut self, job: &mut Job, redis: RedisPool, next: NextFunc) -> MiddleWareResult { self(job, redis, next) } fn cloned(&mut self) -> Box<MiddleWare> { Box::new(*self) } } pub fn peek_middleware(job: &mut Job, redis: RedisPool, mut next: NextFunc) -> MiddleWareResult { println!("Before Call {:?}", job); let r = next(job, redis); println!("After Call {:?}", job); r } pub fn retry_middleware(job: &mut Job, redis: RedisPool, mut next: NextFunc) -> MiddleWareResult { use redis::Commands; use job::BoolOrUSize::*; let conn = redis.get().unwrap(); let r = next(job, redis); match r { Err(e) => { match job.retry { Bool(true) => { warn!("Job '{:?}' failed with '{}', retrying", job, e); job.retry = Bool(false); try!(conn.lpush(job.queue_name(), to_string(job).unwrap())); Ok(JobSuccessType::Ignore) } USize(u) if u > 0 => { warn!("'{:?}' failed with '{}', retrying", job, e); job.retry = USize(u - 1); try!(conn.lpush(job.queue_name(), to_string(job).unwrap())); Ok(JobSuccessType::Ignore) } _ => Err(e), } } Ok(o) => Ok(o), } } pub fn time_elapse_middleware(job: &mut Job, redis: RedisPool, mut next: NextFunc) -> MiddleWareResult { let j = job.clone(); let now = UTC::now(); let r = next(job, redis); let that = UTC::now(); info!("'{:?}' takes {}", j, that - now); r }