sidekiq/
middleware.rs

1use serde_json::to_string;
2use chrono::UTC;
3
4use RedisPool;
5use JobSuccessType;
6use errors::Result;
7use job::{Job, RetryInfo};
8
9pub type MiddleWareResult = Result<JobSuccessType>;
10pub type NextFunc<'a> = &'a mut (FnMut(&mut Job, RedisPool) -> MiddleWareResult + 'a);
11
12pub trait MiddleWare: Send {
13    fn handle(&mut self, job: &mut Job, redis: RedisPool, next: NextFunc) -> MiddleWareResult;
14    fn cloned(&mut self) -> Box<MiddleWare>;
15}
16
17impl<F> MiddleWare for F
18    where F: FnMut(&mut Job, RedisPool, NextFunc) -> MiddleWareResult + Copy + Send + 'static
19{
20    fn handle(&mut self, job: &mut Job, redis: RedisPool, next: NextFunc) -> MiddleWareResult {
21        self(job, redis, next)
22    }
23    fn cloned(&mut self) -> Box<MiddleWare> {
24        Box::new(*self)
25    }
26}
27
28pub fn peek_middleware(job: &mut Job, redis: RedisPool, mut next: NextFunc) -> MiddleWareResult {
29    println!("Before Call {:?}", job);
30    let r = next(job, redis);
31    println!("After Call {:?}", job);
32    r
33}
34
35pub fn retry_middleware(job: &mut Job, redis: RedisPool, mut next: NextFunc) -> MiddleWareResult {
36    use redis::Commands;
37    use job::BoolOrUSize::*;
38    let conn = redis.get().unwrap();
39    let r = next(job, redis);
40    match r {
41        Err(e) => {
42            let retry_count = job.retry_info.as_ref().map(|i| i.retry_count).unwrap_or(0);
43            match (&job.retry, usize::max_value()) {
44                (&Bool(true), u) | (&USize(u), _) if retry_count < u => {
45                    warn!("Job '{:?}' failed with '{}', retrying", job, e);
46                    job.retry_info = Some(RetryInfo {
47                        retry_count: retry_count + 1,
48                        error_message: format!("{}", e),
49                        error_class: "dummy".to_string(),
50                        error_backtrace: e.backtrace()
51                            .map(|bt| {
52                                let s = format!("{:?}", bt);
53                                s.split('\n').map(|s| s.to_string()).collect()
54                            })
55                            .unwrap_or(vec![]),
56                        failed_at: UTC::now(),
57                        retried_at: None,
58                    });
59                    let _: () = conn.lpush(job.queue_name(), to_string(job).unwrap())?;
60                    Ok(JobSuccessType::Ignore)
61                }
62                _ => Err(e),
63            }
64        }
65        Ok(o) => Ok(o),
66    }
67}
68
69pub fn time_elapse_middleware(job: &mut Job,
70                              redis: RedisPool,
71                              mut next: NextFunc)
72                              -> MiddleWareResult {
73    let j = job.clone();
74    let now = UTC::now();
75    let r = next(job, redis);
76    let that = UTC::now();
77    info!("'{:?}' takes {}", j, that.signed_duration_since(now));
78    r
79}