sidekiq/
job.rs

1use std::collections::BTreeMap;
2use std::iter::FromIterator;
3
4use serde_json::{Value as JValue, Map as JMap};
5use serde::{Serialize, Deserialize, Serializer, Deserializer};
6use serde::de::Error;
7use serde::ser::SerializeMap;
8
9use chrono::{DateTime, UTC, NaiveDateTime};
10
/// Value of a job's `retry` member, which is read from either a JSON boolean
/// or a non-negative JSON integer.
///
/// NOTE(review): presumably `Bool` toggles retrying and `USize` caps the number
/// of attempts — confirm against the consumer of `Job::retry`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BoolOrUSize {
    /// The member was a JSON boolean.
    Bool(bool),
    /// The member was a non-negative JSON integer.
    USize(usize),
}
16
/// A job payload, (de)serialized as a JSON object by the hand-written
/// `Serialize`/`Deserialize` impls in this file.
///
/// All timestamps travel as JSON numbers holding fractional seconds.
#[derive(Debug, Clone)]
pub struct Job {
    /// Value of the `class` member (required when deserializing).
    pub class: String,
    /// Value of the `jid` member (required when deserializing).
    pub jid: String,
    /// Value of the `args` member; must be a JSON array, elements are kept as-is.
    pub args: Vec<JValue>,
    /// Value of the `created_at` member; `None` when absent or not a number.
    pub created_at: Option<DateTime<UTC>>,
    /// Value of the `enqueued_at` member (required when deserializing).
    pub enqueued_at: DateTime<UTC>,
    /// Value of the `queue` member; used by `queue_name` to build the queue key.
    pub queue: String,
    /// Value of the `retry` member: a boolean or an unsigned integer.
    pub retry: BoolOrUSize,
    /// Value of the `at` member (when scheduled); `None` when absent or not a number.
    pub at: Option<DateTime<UTC>>,
    /// Key prefix applied by `with_namespace`. Not part of the JSON payload:
    /// deserialization leaves it empty and it is expected to be set afterwards.
    pub namespace: String,
    /// Retry/failure details, present only when deserialization found the
    /// retry-related members in the payload.
    pub retry_info: Option<RetryInfo>,
    /// Every remaining member of the JSON object that is not mapped to a field
    /// above; written back verbatim on serialization.
    pub extra: BTreeMap<String, JValue>,
}
31
32impl Job {
33    fn with_namespace(&self, snippet: &str) -> String {
34        if self.namespace == "" {
35            snippet.into()
36        } else {
37            self.namespace.clone() + ":" + snippet
38        }
39    }
40
41    pub fn queue_name(&self) -> String {
42        self.with_namespace(&("queue:".to_string() + &self.queue))
43    }
44}
45
46impl Deserialize for Job {
47    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
48        where D: Deserializer
49    {
50        let j = <JValue as Deserialize>::deserialize(deserializer)?;
51        if let JValue::Object(mut obj) = j {
52            let class = JMapExt::<D>::remove_string(&mut obj, "class")?;
53            let args = JMapExt::<D>::remove_vec(&mut obj, "args")?;
54            let queue = JMapExt::<D>::remove_string(&mut obj, "queue")?;
55            let jid = JMapExt::<D>::remove_string(&mut obj, "jid")?;
56            let retry = obj.remove("retry")
57                .and_then(|r| if r.is_u64() {
58                    Some(BoolOrUSize::USize(r.as_u64().unwrap() as usize))
59                } else if r.is_boolean() {
60                    Some(BoolOrUSize::Bool(r.as_bool().unwrap()))
61                } else {
62                    None
63                })
64                .ok_or(D::Error::custom("no member 'retry'"))?;
65            let created_at = JMapExt::<D>::remove_datetime(&mut obj, "created_at").ok();
66            let enqueued_at = JMapExt::<D>::remove_datetime(&mut obj, "enqueued_at")?;
67            let at = JMapExt::<D>::remove_datetime(&mut obj, "at").ok();
68
69            let retry_info = hado! {
70                    retry_count <- JMapExt::<D>::remove_usize(&mut obj, "retry_count").ok();
71                    error_message <- JMapExt::<D>::remove_string(&mut obj, "error_message").ok();
72                    error_class <- JMapExt::<D>::remove_string(&mut obj, "error_class").ok();
73                    error_backtrace <- JMapExt::<D>::remove_svec(&mut obj, "error_backtrace").ok();
74                    failed_at <- JMapExt::<D>::remove_datetime(&mut obj, "failed_at").ok();
75                    retried_at <- Some(JMapExt::<D>::remove_datetime(&mut obj, "retried_at").ok());
76
77                    Some(RetryInfo {
78                        retry_count: retry_count,
79                        error_message: error_message.clone(),
80                        error_class: error_class.clone(),
81                        error_backtrace: error_backtrace.clone(),
82                        failed_at: failed_at,
83                        retried_at: retried_at,
84                    })
85                };
86
87            Ok(Job {
88                class: class,
89                args: args,
90                queue: queue,
91                jid: jid,
92                retry: retry,
93                created_at: created_at,
94                enqueued_at: enqueued_at,
95                at: at,
96                extra: BTreeMap::from_iter(obj),
97                retry_info: retry_info,
98                namespace: "".into(), // it will be set later on
99            })
100        } else {
101            Err(D::Error::custom("not an object"))
102        }
103    }
104}
105
106impl Serialize for Job {
107    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
108        where S: Serializer
109    {
110        let mut map_serializer = serializer.serialize_map(None)?;
111
112        map_serializer.serialize_entry("class", &self.class)?;
113        map_serializer.serialize_entry("args", &self.args)?;
114        map_serializer.serialize_entry("queue", &self.queue)?;
115        map_serializer.serialize_entry("jid", &self.jid)?;
116
117        self.created_at
118            .map(|created_at| {
119                let timestamp = created_at.timestamp() as f64 +
120                                created_at.timestamp_subsec_nanos() as f64 / 1e9;
121                map_serializer.serialize_entry("created_at", &timestamp)
122            })
123            .unwrap_or(Ok(()))?;
124
125        let enqueued_at = self.enqueued_at.timestamp() as f64 +
126                          self.enqueued_at.timestamp_subsec_nanos() as f64 / 1e9;
127        map_serializer.serialize_entry("enqueued_at", &enqueued_at)?;
128
129        self.at
130            .map(|at| {
131                let timestamp = at.timestamp() as f64 + at.timestamp_subsec_nanos() as f64 / 1e9;
132                map_serializer.serialize_entry("at", &timestamp)
133            })
134            .unwrap_or(Ok(()))?;
135
136        match self.retry {
137            BoolOrUSize::Bool(x) => {
138                map_serializer.serialize_entry("retry", &x)?;
139            }
140            BoolOrUSize::USize(x) => {
141                map_serializer.serialize_entry("retry", &x)?;
142            }
143        };
144
145        if let Some(ref retry_info) = self.retry_info {
146            map_serializer.serialize_entry("error_backtrace", &retry_info.error_backtrace)?;
147            map_serializer.serialize_entry("error_class", &retry_info.error_class)?;
148            map_serializer.serialize_entry("error_message", &retry_info.error_message)?;
149            let failed_at = retry_info.failed_at.timestamp() as f64 +
150                            retry_info.failed_at.timestamp_subsec_nanos() as f64 / 1e9;
151            map_serializer.serialize_entry("failed_at", &failed_at)?;
152            retry_info.retried_at.map(|retried_at| {
153                let retried_at = retried_at.timestamp() as f64 +
154                                 retried_at.timestamp_subsec_nanos() as f64 / 1e9;
155                map_serializer.serialize_entry("retried_at", &retried_at)
156            });
157            map_serializer.serialize_entry("retry_count", &retry_info.retry_count)?;
158        }
159        for (k, v) in &self.extra {
160            map_serializer.serialize_entry(k, v)?;
161        }
162
163        map_serializer.end()
164
165    }
166}
167
/// Retry/failure details attached to a job.
///
/// Each field is (de)serialized under the JSON member of the same name, at the
/// top level of the job object.
#[derive(Clone, Debug)]
pub struct RetryInfo {
    /// Value of the `retry_count` member.
    pub retry_count: usize,
    /// Value of the `error_message` member.
    pub error_message: String,
    /// Value of the `error_class` member.
    pub error_class: String,
    /// Value of the `error_backtrace` member; an array of strings.
    pub error_backtrace: Vec<String>,
    /// Value of the `failed_at` member, a timestamp in fractional seconds.
    pub failed_at: DateTime<UTC>,
    /// Value of the `retried_at` member; optional, omitted from the output when `None`.
    pub retried_at: Option<DateTime<UTC>>,
}
177
/// Typed "take" helpers for a JSON object map.
///
/// Each method removes `key` from the map and converts the removed value to
/// the requested type. The trait is generic over the deserializer `D` only so
/// that failures can be reported as `D::Error`.
trait JMapExt<D>
    where D: Deserializer
{
    /// Removes `key` and interprets its numeric value as a timestamp in
    /// (fractional) seconds. Fails when the key is absent or not a number.
    fn remove_datetime(&mut self, key: &str) -> Result<DateTime<UTC>, D::Error>;
    /// Removes `key` and returns its string value. Fails when the key is
    /// absent or not a string.
    fn remove_string(&mut self, key: &str) -> Result<String, D::Error>;
    /// Removes `key` and returns its array elements untouched. Fails when the
    /// key is absent or not an array.
    fn remove_vec(&mut self, key: &str) -> Result<Vec<JValue>, D::Error>;
    /// Removes `key` and returns its array elements as strings. Fails when the
    /// key is absent, not an array, or holds a non-string element.
    fn remove_svec(&mut self, key: &str) -> Result<Vec<String>, D::Error>;
    /// Removes `key` and returns its numeric value as a `usize`. Fails when the
    /// key is absent or not a number.
    fn remove_usize(&mut self, key: &str) -> Result<usize, D::Error>;
}
187
188impl<D> JMapExt<D> for JMap<String, JValue>
189    where D: Deserializer
190{
191    fn remove_datetime(&mut self, key: &str) -> Result<DateTime<UTC>, D::Error> {
192        self.remove(key)
193            .and_then(|v| v.as_f64())
194            .map(|f| NaiveDateTime::from_timestamp(f as i64, ((f - f.floor()) * 1e9) as u32))
195            .map(|t| DateTime::from_utc(t, UTC))
196            .ok_or(D::Error::custom(format!("no member '{}'", key)))
197    }
198
199    fn remove_vec(&mut self, key: &str) -> Result<Vec<JValue>, D::Error> {
200        let value = self.remove(key);
201        match value {
202            Some(JValue::Array(v)) => Ok(v),
203            Some(_) => Err(D::Error::custom(format!("'{}' not a array", key))),
204            None => Err(D::Error::custom(format!("no member '{}'", key))),
205        }
206    }
207
208    fn remove_svec(&mut self, key: &str) -> Result<Vec<String>, D::Error> {
209        let value = self.remove(key);
210        match value {
211            Some(JValue::Array(v)) => {
212                v.into_iter()
213                    .map(|e| match e {
214                        JValue::String(s) => Ok(s),
215                        _ => Err(D::Error::custom(format!("'{}' contains non string", key))),
216                    })
217                    .collect()
218            }
219            Some(_) => Err(D::Error::custom(format!("'{}' not a array", key))),
220            None => Err(D::Error::custom(format!("no member '{}'", key))),
221        }
222    }
223
224    fn remove_string(&mut self, key: &str) -> Result<String, D::Error> {
225        self.remove(key)
226            .and_then(|v| v.as_str().map(|s| s.to_string()))
227            .ok_or(D::Error::custom(format!("no member '{}'", key)))
228    }
229
230    fn remove_usize(&mut self, key: &str) -> Result<usize, D::Error> {
231        match self.remove(key) {
232            Some(JValue::Number(number)) => Ok(number.as_f64().unwrap_or(0f64) as usize),
233            Some(_) => Err(D::Error::custom(format!("'{}' not a usize", key))),
234            None => Err(D::Error::custom(format!("no member '{}'", key))),
235        }
236    }
237}