1use serde_json::to_string;
2use chrono::UTC;
3
4use RedisPool;
5use JobSuccessType;
6use errors::Result;
7use job::{Job, RetryInfo};
8
9pub type MiddleWareResult = Result<JobSuccessType>;
10pub type NextFunc<'a> = &'a mut (FnMut(&mut Job, RedisPool) -> MiddleWareResult + 'a);
11
12pub trait MiddleWare: Send {
13 fn handle(&mut self, job: &mut Job, redis: RedisPool, next: NextFunc) -> MiddleWareResult;
14 fn cloned(&mut self) -> Box<MiddleWare>;
15}
16
17impl<F> MiddleWare for F
18 where F: FnMut(&mut Job, RedisPool, NextFunc) -> MiddleWareResult + Copy + Send + 'static
19{
20 fn handle(&mut self, job: &mut Job, redis: RedisPool, next: NextFunc) -> MiddleWareResult {
21 self(job, redis, next)
22 }
23 fn cloned(&mut self) -> Box<MiddleWare> {
24 Box::new(*self)
25 }
26}
27
28pub fn peek_middleware(job: &mut Job, redis: RedisPool, mut next: NextFunc) -> MiddleWareResult {
29 println!("Before Call {:?}", job);
30 let r = next(job, redis);
31 println!("After Call {:?}", job);
32 r
33}
34
35pub fn retry_middleware(job: &mut Job, redis: RedisPool, mut next: NextFunc) -> MiddleWareResult {
36 use redis::Commands;
37 use job::BoolOrUSize::*;
38 let conn = redis.get().unwrap();
39 let r = next(job, redis);
40 match r {
41 Err(e) => {
42 let retry_count = job.retry_info.as_ref().map(|i| i.retry_count).unwrap_or(0);
43 match (&job.retry, usize::max_value()) {
44 (&Bool(true), u) | (&USize(u), _) if retry_count < u => {
45 warn!("Job '{:?}' failed with '{}', retrying", job, e);
46 job.retry_info = Some(RetryInfo {
47 retry_count: retry_count + 1,
48 error_message: format!("{}", e),
49 error_class: "dummy".to_string(),
50 error_backtrace: e.backtrace()
51 .map(|bt| {
52 let s = format!("{:?}", bt);
53 s.split('\n').map(|s| s.to_string()).collect()
54 })
55 .unwrap_or(vec![]),
56 failed_at: UTC::now(),
57 retried_at: None,
58 });
59 let _: () = conn.lpush(job.queue_name(), to_string(job).unwrap())?;
60 Ok(JobSuccessType::Ignore)
61 }
62 _ => Err(e),
63 }
64 }
65 Ok(o) => Ok(o),
66 }
67}
68
69pub fn time_elapse_middleware(job: &mut Job,
70 redis: RedisPool,
71 mut next: NextFunc)
72 -> MiddleWareResult {
73 let j = job.clone();
74 let now = UTC::now();
75 let r = next(job, redis);
76 let that = UTC::now();
77 info!("'{:?}' takes {}", j, that.signed_duration_since(now));
78 r
79}