// celery_rs_core/message_protocol/headers.rs

/*
Headers for the message

Author Andrew Evans
*/
6
7use std::collections::BTreeMap;
8use std::process;
9
10use amiquip::AmqpValue;
11use amq_protocol::uri::AMQPScheme::AMQP;
12use serde_json::{Map, Number, to_string, Value};
13use uuid::Uuid;
14
15use crate::argparse::{args::Args, kwargs::KwArgs};
16use crate::nodename::anon_name;
17
18
/// Soft and hard time limits
///
/// When attached to `Headers`, the pair is serialized as a two-element
/// `[soft, hard]` array. Values compare field-by-field.
// NOTE(review): units are presumably seconds, per Celery convention — confirm against the consumer.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TimeLimit {
    /// Soft limit; first element of the serialized pair.
    pub soft: i64,
    /// Hard limit; second element of the serialized pair.
    pub hard: i64,
}
25
26
27/// Stored headers
28#[derive(Clone, Debug)]
29pub struct Headers{
30    pub lang: String,
31    pub task: String,
32    pub id: String,
33    pub root_id: String,
34    pub parent_id: Option<String>,
35    pub group: Option<String>,
36    pub meth: Option<String>,
37    pub shadow: Option<String>,
38    pub eta: Option<String>,
39    pub expires: Option<String>,
40    pub retries: Option<i8>,
41    pub timelimit: Option<TimeLimit>,
42    pub argsrepr: Option<Args>,
43    pub kwargsrepr: Option<KwArgs>,
44    pub origin: Option<String>,
45}
46
47
48/// Headers implementation
49impl Headers{
50
51    /// convert headers to a btree map
52    pub fn convert_to_btree_map(&self) -> BTreeMap<String, AmqpValue>{
53        let mut jmap = BTreeMap::<String, AmqpValue>::new();
54        jmap.insert(String::from("lang"), AmqpValue::LongString(self.lang.clone()));
55        jmap.insert(String::from("task"), AmqpValue::LongString(self.task.clone()));
56        jmap.insert(String::from("id"), AmqpValue::LongString(self.id.clone()));
57        jmap.insert(String::from("root_id"), AmqpValue::LongString(self.root_id.clone()));
58
59        if self.parent_id.is_some() {
60            jmap.insert(String::from("parent_id"), AmqpValue::LongString(self.parent_id.clone().unwrap()));
61        }else {
62            jmap.insert("parent_id".to_string(), AmqpValue::Void);
63        }
64
65        if self.group.is_some() {
66            jmap.insert(String::from("group"), AmqpValue::LongString(self.group.clone().unwrap()));
67        }else{
68            jmap.insert("group".to_string(), AmqpValue::Void);
69        }
70
71        if self.meth.is_some() {
72            let v = self.meth.clone().unwrap();
73            jmap.insert(String::from("meth"), AmqpValue::LongString(v));
74        }
75
76        if self.shadow.is_some(){
77            let v = self.shadow.clone().unwrap();
78            jmap.insert(String::from("shadow"), AmqpValue::LongString(v));
79        }else{
80            jmap.insert("shadow".to_string(), AmqpValue::Void);
81        }
82
83        if self.eta.is_some(){
84            let v = self.eta.clone().unwrap();
85            jmap.insert(String::from("eta"), AmqpValue::LongString(v));
86        }else{
87            jmap.insert("eta".to_string(), AmqpValue::Void);
88        }
89
90        if self.expires.is_some(){
91            let v = self.expires.clone().unwrap();
92            jmap.insert(String::from("expires"), AmqpValue::LongString(v));
93        }else{
94            jmap.insert("expires".to_string(), AmqpValue::Void);
95        }
96
97        if self.retries.is_some(){
98            let v = self.retries.clone().unwrap();
99            jmap.insert(String::from("retries"), AmqpValue::ShortShortInt(v));
100        }else{
101            jmap.insert("retries".to_string(), AmqpValue::Void);
102        }
103
104        if self.timelimit.is_some(){
105            let v = self.timelimit.clone().unwrap();
106            let vtup = vec![AmqpValue::LongLongInt(v.soft), AmqpValue::LongLongInt(v.hard)];
107            jmap.insert(String::from("timelimit"), AmqpValue::FieldArray(vtup));
108        }else{
109            let vtup = vec![AmqpValue::Void, AmqpValue::Void];
110            jmap.insert("timelimit".to_string(), AmqpValue::FieldArray(vtup));
111        }
112
113        if self.argsrepr.is_some(){
114            let v = self.argsrepr.clone().unwrap();
115            let val = v.args_to_vec();
116            let jstr = to_string(&Value::Array(val));
117            jmap.insert("argsrepr".to_string(), AmqpValue::LongString(jstr.unwrap()));
118        }else{
119            let v = Value::from(Vec::<Value>::new());
120            let vm = to_string(&v);
121            jmap.insert("argsrepr".to_string(), AmqpValue::LongString(vm.unwrap()));
122        }
123
124        if self.kwargsrepr.is_some(){
125            let v = self.kwargsrepr.clone().unwrap();
126            let vm = to_string(&Value::from(v.convert_to_map()));
127            if vm.is_ok() {
128                let jstr = vm.unwrap();
129                jmap.insert(String::from("kwargsrepr"), AmqpValue::LongString(jstr));
130            }
131        }else{
132            let v = Map::<String, Value>::new();
133            let vm = to_string(&Value::from(v));
134            if vm.is_ok() {
135                let jstr = vm.unwrap();
136                jmap.insert("kwargsrepr".to_string(), AmqpValue::LongString(jstr));
137            }
138        }
139
140
141        if self.origin.is_some(){
142            let v = self.origin.clone().unwrap();
143            jmap.insert(String::from("origin"), AmqpValue::LongString(v));
144        }else{
145            let nodename = anon_name::get_anon_nodename(None, None);
146            jmap.insert("origin".to_string(), AmqpValue::LongString(nodename));
147        }
148        jmap
149    }
150
151    /// convert to a json capable map
152    pub fn convert_to_json_map(&self) -> Map<String, Value>{
153        let mut jmap = Map::new();
154        jmap.insert(String::from("lang"), Value::String(self.lang.clone()));
155        jmap.insert(String::from("task"), Value::String(self.task.clone()));
156        jmap.insert(String::from("id"), Value::String(self.id.clone()));
157        jmap.insert(String::from("root_id"), Value::String(self.root_id.clone()));
158
159        if self.parent_id.is_some() {
160            let parent_id = self.parent_id.clone().unwrap();
161            jmap.insert(String::from("parent_id"), Value::String(parent_id));
162        }
163
164        if self.group.is_some() {
165            let group = self.group.clone().unwrap();
166            jmap.insert(String::from("group"), Value::String(group));
167        }
168
169        if self.meth.is_some() {
170            let v = self.meth.clone().unwrap();
171            jmap.insert(String::from("meth"), Value::from(v));
172        }
173
174        if self.shadow.is_some(){
175            let v = self.shadow.clone().unwrap();
176            jmap.insert(String::from("shadow"), Value::from(v));
177        }
178
179        if self.eta.is_some(){
180            let v = self.eta.clone().unwrap();
181            jmap.insert(String::from("eta"), Value::from(v));
182        }
183
184        if self.expires.is_some(){
185            let v = self.expires.clone().unwrap();
186            jmap.insert(String::from("expires"), Value::from(v));
187        }
188
189        if self.retries.is_some(){
190            let v = self.retries.clone().unwrap();
191            jmap.insert(String::from("retries"), Value::from(v));
192        }
193
194        if self.timelimit.is_some(){
195            let v = self.timelimit.clone().unwrap();
196            let vtup = vec![Value::from(v.soft), Value::from(v.hard)];
197            jmap.insert(String::from("timelimit"), Value::Array(vtup));
198        }
199
200        if self.argsrepr.is_some(){
201            let v = self.argsrepr.clone().unwrap();
202            let argsrepr = v.args_to_vec();
203            jmap.insert(String::from("args"), Value::Array(argsrepr));
204        }
205
206        if self.kwargsrepr.is_some(){
207            let v = self.kwargsrepr.clone().unwrap();
208            let vm = v.convert_to_map();
209            jmap.insert(String::from("kwargsrepr"), Value::Object(vm));
210        }
211
212
213        if self.origin.is_some(){
214            let v = self.origin.clone().unwrap();
215            jmap.insert(String::from("origin"), Value::from(v));
216        }
217        jmap
218    }
219
220    /// create new headers
221    pub fn new(lang: String, task: String, id: String, root_id: String) -> Headers{
222        Headers{
223            lang: lang,
224            task: task,
225            id: id,
226            root_id: root_id,
227            parent_id: None,
228            group: None,
229            meth: None,
230            shadow: None,
231            eta: None,
232            expires: None,
233            retries: None,
234            timelimit: None,
235            argsrepr: None,
236            kwargsrepr: None,
237            origin: None,
238        }
239    }
240}
241
242
#[cfg(test)]
mod tests {
    use std::vec::Vec;

    use crate::argparse::args::{Arg, Args};
    use crate::message_protocol::headers::Headers;

    /// Headers for a dummy task with an empty positional-argument list attached.
    fn sample_headers() -> Headers {
        let mut headers = Headers::new(
            String::from("rs"),
            String::from("test_task"),
            String::from("id"),
            String::from("test_root"),
        );
        headers.argsrepr = Some(Args {
            args: Vec::<Arg>::new(),
        });
        headers
    }

    /// The JSON map exposes the language under the `lang` key.
    #[test]
    fn should_convert_to_json_map() {
        let json = sample_headers().convert_to_json_map();
        let lang = json.get("lang").and_then(|value| value.as_str());
        assert_eq!(lang, Some("rs"));
    }

    /// Freshly built headers keep the constructor values and assigned fields.
    #[test]
    fn should_create_new_headers() {
        let Headers { lang, argsrepr, .. } = sample_headers();
        assert_eq!(lang, "rs");
        assert!(argsrepr.unwrap().args.is_empty());
    }
}
274}