1use std::collections::BTreeMap;
2use std::iter::FromIterator;
3
4use serde_json::{Value as JValue, Map as JMap};
5use serde::{Serialize, Deserialize, Serializer, Deserializer};
6use serde::de::Error;
7use serde::ser::SerializeMap;
8
9use chrono::{DateTime, UTC, NaiveDateTime};
10
/// Value of a job's `retry` setting, which is either a boolean flag or a
/// non-negative integer.
///
/// Both payloads are plain `Copy` values, so the enum itself is `Copy` and
/// comparable; this lets callers compare and assert on retry settings
/// without matching by hand.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BoolOrUSize {
    /// `retry` was given as `true` / `false`.
    Bool(bool),
    /// `retry` was given as an unsigned integer.
    USize(usize),
}
16
17#[derive(Debug, Clone)]
18pub struct Job {
19 pub class: String,
20 pub jid: String,
21 pub args: Vec<JValue>,
22 pub created_at: Option<DateTime<UTC>>,
23 pub enqueued_at: DateTime<UTC>,
24 pub queue: String,
25 pub retry: BoolOrUSize,
26 pub at: Option<DateTime<UTC>>, pub namespace: String,
28 pub retry_info: Option<RetryInfo>,
29 pub extra: BTreeMap<String, JValue>,
30}
31
32impl Job {
33 fn with_namespace(&self, snippet: &str) -> String {
34 if self.namespace == "" {
35 snippet.into()
36 } else {
37 self.namespace.clone() + ":" + snippet
38 }
39 }
40
41 pub fn queue_name(&self) -> String {
42 self.with_namespace(&("queue:".to_string() + &self.queue))
43 }
44}
45
46impl Deserialize for Job {
47 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
48 where D: Deserializer
49 {
50 let j = <JValue as Deserialize>::deserialize(deserializer)?;
51 if let JValue::Object(mut obj) = j {
52 let class = JMapExt::<D>::remove_string(&mut obj, "class")?;
53 let args = JMapExt::<D>::remove_vec(&mut obj, "args")?;
54 let queue = JMapExt::<D>::remove_string(&mut obj, "queue")?;
55 let jid = JMapExt::<D>::remove_string(&mut obj, "jid")?;
56 let retry = obj.remove("retry")
57 .and_then(|r| if r.is_u64() {
58 Some(BoolOrUSize::USize(r.as_u64().unwrap() as usize))
59 } else if r.is_boolean() {
60 Some(BoolOrUSize::Bool(r.as_bool().unwrap()))
61 } else {
62 None
63 })
64 .ok_or(D::Error::custom("no member 'retry'"))?;
65 let created_at = JMapExt::<D>::remove_datetime(&mut obj, "created_at").ok();
66 let enqueued_at = JMapExt::<D>::remove_datetime(&mut obj, "enqueued_at")?;
67 let at = JMapExt::<D>::remove_datetime(&mut obj, "at").ok();
68
69 let retry_info = hado! {
70 retry_count <- JMapExt::<D>::remove_usize(&mut obj, "retry_count").ok();
71 error_message <- JMapExt::<D>::remove_string(&mut obj, "error_message").ok();
72 error_class <- JMapExt::<D>::remove_string(&mut obj, "error_class").ok();
73 error_backtrace <- JMapExt::<D>::remove_svec(&mut obj, "error_backtrace").ok();
74 failed_at <- JMapExt::<D>::remove_datetime(&mut obj, "failed_at").ok();
75 retried_at <- Some(JMapExt::<D>::remove_datetime(&mut obj, "retried_at").ok());
76
77 Some(RetryInfo {
78 retry_count: retry_count,
79 error_message: error_message.clone(),
80 error_class: error_class.clone(),
81 error_backtrace: error_backtrace.clone(),
82 failed_at: failed_at,
83 retried_at: retried_at,
84 })
85 };
86
87 Ok(Job {
88 class: class,
89 args: args,
90 queue: queue,
91 jid: jid,
92 retry: retry,
93 created_at: created_at,
94 enqueued_at: enqueued_at,
95 at: at,
96 extra: BTreeMap::from_iter(obj),
97 retry_info: retry_info,
98 namespace: "".into(), })
100 } else {
101 Err(D::Error::custom("not an object"))
102 }
103 }
104}
105
106impl Serialize for Job {
107 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
108 where S: Serializer
109 {
110 let mut map_serializer = serializer.serialize_map(None)?;
111
112 map_serializer.serialize_entry("class", &self.class)?;
113 map_serializer.serialize_entry("args", &self.args)?;
114 map_serializer.serialize_entry("queue", &self.queue)?;
115 map_serializer.serialize_entry("jid", &self.jid)?;
116
117 self.created_at
118 .map(|created_at| {
119 let timestamp = created_at.timestamp() as f64 +
120 created_at.timestamp_subsec_nanos() as f64 / 1e9;
121 map_serializer.serialize_entry("created_at", ×tamp)
122 })
123 .unwrap_or(Ok(()))?;
124
125 let enqueued_at = self.enqueued_at.timestamp() as f64 +
126 self.enqueued_at.timestamp_subsec_nanos() as f64 / 1e9;
127 map_serializer.serialize_entry("enqueued_at", &enqueued_at)?;
128
129 self.at
130 .map(|at| {
131 let timestamp = at.timestamp() as f64 + at.timestamp_subsec_nanos() as f64 / 1e9;
132 map_serializer.serialize_entry("at", ×tamp)
133 })
134 .unwrap_or(Ok(()))?;
135
136 match self.retry {
137 BoolOrUSize::Bool(x) => {
138 map_serializer.serialize_entry("retry", &x)?;
139 }
140 BoolOrUSize::USize(x) => {
141 map_serializer.serialize_entry("retry", &x)?;
142 }
143 };
144
145 if let Some(ref retry_info) = self.retry_info {
146 map_serializer.serialize_entry("error_backtrace", &retry_info.error_backtrace)?;
147 map_serializer.serialize_entry("error_class", &retry_info.error_class)?;
148 map_serializer.serialize_entry("error_message", &retry_info.error_message)?;
149 let failed_at = retry_info.failed_at.timestamp() as f64 +
150 retry_info.failed_at.timestamp_subsec_nanos() as f64 / 1e9;
151 map_serializer.serialize_entry("failed_at", &failed_at)?;
152 retry_info.retried_at.map(|retried_at| {
153 let retried_at = retried_at.timestamp() as f64 +
154 retried_at.timestamp_subsec_nanos() as f64 / 1e9;
155 map_serializer.serialize_entry("retried_at", &retried_at)
156 });
157 map_serializer.serialize_entry("retry_count", &retry_info.retry_count)?;
158 }
159 for (k, v) in &self.extra {
160 map_serializer.serialize_entry(k, v)?;
161 }
162
163 map_serializer.end()
164
165 }
166}
167
168#[derive(Clone, Debug)]
169pub struct RetryInfo {
170 pub retry_count: usize,
171 pub error_message: String,
172 pub error_class: String,
173 pub error_backtrace: Vec<String>,
174 pub failed_at: DateTime<UTC>,
175 pub retried_at: Option<DateTime<UTC>>,
176}
177
178trait JMapExt<D>
179 where D: Deserializer
180{
181 fn remove_datetime(&mut self, key: &str) -> Result<DateTime<UTC>, D::Error>;
182 fn remove_string(&mut self, key: &str) -> Result<String, D::Error>;
183 fn remove_vec(&mut self, key: &str) -> Result<Vec<JValue>, D::Error>;
184 fn remove_svec(&mut self, key: &str) -> Result<Vec<String>, D::Error>;
185 fn remove_usize(&mut self, key: &str) -> Result<usize, D::Error>;
186}
187
188impl<D> JMapExt<D> for JMap<String, JValue>
189 where D: Deserializer
190{
191 fn remove_datetime(&mut self, key: &str) -> Result<DateTime<UTC>, D::Error> {
192 self.remove(key)
193 .and_then(|v| v.as_f64())
194 .map(|f| NaiveDateTime::from_timestamp(f as i64, ((f - f.floor()) * 1e9) as u32))
195 .map(|t| DateTime::from_utc(t, UTC))
196 .ok_or(D::Error::custom(format!("no member '{}'", key)))
197 }
198
199 fn remove_vec(&mut self, key: &str) -> Result<Vec<JValue>, D::Error> {
200 let value = self.remove(key);
201 match value {
202 Some(JValue::Array(v)) => Ok(v),
203 Some(_) => Err(D::Error::custom(format!("'{}' not a array", key))),
204 None => Err(D::Error::custom(format!("no member '{}'", key))),
205 }
206 }
207
208 fn remove_svec(&mut self, key: &str) -> Result<Vec<String>, D::Error> {
209 let value = self.remove(key);
210 match value {
211 Some(JValue::Array(v)) => {
212 v.into_iter()
213 .map(|e| match e {
214 JValue::String(s) => Ok(s),
215 _ => Err(D::Error::custom(format!("'{}' contains non string", key))),
216 })
217 .collect()
218 }
219 Some(_) => Err(D::Error::custom(format!("'{}' not a array", key))),
220 None => Err(D::Error::custom(format!("no member '{}'", key))),
221 }
222 }
223
224 fn remove_string(&mut self, key: &str) -> Result<String, D::Error> {
225 self.remove(key)
226 .and_then(|v| v.as_str().map(|s| s.to_string()))
227 .ok_or(D::Error::custom(format!("no member '{}'", key)))
228 }
229
230 fn remove_usize(&mut self, key: &str) -> Result<usize, D::Error> {
231 match self.remove(key) {
232 Some(JValue::Number(number)) => Ok(number.as_f64().unwrap_or(0f64) as usize),
233 Some(_) => Err(D::Error::custom(format!("'{}' not a usize", key))),
234 None => Err(D::Error::custom(format!("no member '{}'", key))),
235 }
236 }
237}