celery_rs_core/message_protocol/
headers.rs1use std::collections::BTreeMap;
8use std::process;
9
10use amiquip::AmqpValue;
11use amq_protocol::uri::AMQPScheme::AMQP;
12use serde_json::{Map, Number, to_string, Value};
13use uuid::Uuid;
14
15use crate::argparse::{args::Args, kwargs::KwArgs};
16use crate::nodename::anon_name;
17
18
/// Soft and hard time limits attached to a task's headers.
///
/// [`Headers`] serializes this as a two-element array in the order
/// `[soft, hard]`.
///
/// NOTE(review): the unit is not established anywhere in this file
/// (presumably seconds, as in Celery) — confirm against the consumer.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct TimeLimit {
    /// Soft limit; first element of the serialized pair.
    pub soft: i64,
    /// Hard limit; second element of the serialized pair.
    pub hard: i64,
}
25
26
27#[derive(Clone, Debug)]
29pub struct Headers{
30 pub lang: String,
31 pub task: String,
32 pub id: String,
33 pub root_id: String,
34 pub parent_id: Option<String>,
35 pub group: Option<String>,
36 pub meth: Option<String>,
37 pub shadow: Option<String>,
38 pub eta: Option<String>,
39 pub expires: Option<String>,
40 pub retries: Option<i8>,
41 pub timelimit: Option<TimeLimit>,
42 pub argsrepr: Option<Args>,
43 pub kwargsrepr: Option<KwArgs>,
44 pub origin: Option<String>,
45}
46
47
48impl Headers{
50
51 pub fn convert_to_btree_map(&self) -> BTreeMap<String, AmqpValue>{
53 let mut jmap = BTreeMap::<String, AmqpValue>::new();
54 jmap.insert(String::from("lang"), AmqpValue::LongString(self.lang.clone()));
55 jmap.insert(String::from("task"), AmqpValue::LongString(self.task.clone()));
56 jmap.insert(String::from("id"), AmqpValue::LongString(self.id.clone()));
57 jmap.insert(String::from("root_id"), AmqpValue::LongString(self.root_id.clone()));
58
59 if self.parent_id.is_some() {
60 jmap.insert(String::from("parent_id"), AmqpValue::LongString(self.parent_id.clone().unwrap()));
61 }else {
62 jmap.insert("parent_id".to_string(), AmqpValue::Void);
63 }
64
65 if self.group.is_some() {
66 jmap.insert(String::from("group"), AmqpValue::LongString(self.group.clone().unwrap()));
67 }else{
68 jmap.insert("group".to_string(), AmqpValue::Void);
69 }
70
71 if self.meth.is_some() {
72 let v = self.meth.clone().unwrap();
73 jmap.insert(String::from("meth"), AmqpValue::LongString(v));
74 }
75
76 if self.shadow.is_some(){
77 let v = self.shadow.clone().unwrap();
78 jmap.insert(String::from("shadow"), AmqpValue::LongString(v));
79 }else{
80 jmap.insert("shadow".to_string(), AmqpValue::Void);
81 }
82
83 if self.eta.is_some(){
84 let v = self.eta.clone().unwrap();
85 jmap.insert(String::from("eta"), AmqpValue::LongString(v));
86 }else{
87 jmap.insert("eta".to_string(), AmqpValue::Void);
88 }
89
90 if self.expires.is_some(){
91 let v = self.expires.clone().unwrap();
92 jmap.insert(String::from("expires"), AmqpValue::LongString(v));
93 }else{
94 jmap.insert("expires".to_string(), AmqpValue::Void);
95 }
96
97 if self.retries.is_some(){
98 let v = self.retries.clone().unwrap();
99 jmap.insert(String::from("retries"), AmqpValue::ShortShortInt(v));
100 }else{
101 jmap.insert("retries".to_string(), AmqpValue::Void);
102 }
103
104 if self.timelimit.is_some(){
105 let v = self.timelimit.clone().unwrap();
106 let vtup = vec![AmqpValue::LongLongInt(v.soft), AmqpValue::LongLongInt(v.hard)];
107 jmap.insert(String::from("timelimit"), AmqpValue::FieldArray(vtup));
108 }else{
109 let vtup = vec![AmqpValue::Void, AmqpValue::Void];
110 jmap.insert("timelimit".to_string(), AmqpValue::FieldArray(vtup));
111 }
112
113 if self.argsrepr.is_some(){
114 let v = self.argsrepr.clone().unwrap();
115 let val = v.args_to_vec();
116 let jstr = to_string(&Value::Array(val));
117 jmap.insert("argsrepr".to_string(), AmqpValue::LongString(jstr.unwrap()));
118 }else{
119 let v = Value::from(Vec::<Value>::new());
120 let vm = to_string(&v);
121 jmap.insert("argsrepr".to_string(), AmqpValue::LongString(vm.unwrap()));
122 }
123
124 if self.kwargsrepr.is_some(){
125 let v = self.kwargsrepr.clone().unwrap();
126 let vm = to_string(&Value::from(v.convert_to_map()));
127 if vm.is_ok() {
128 let jstr = vm.unwrap();
129 jmap.insert(String::from("kwargsrepr"), AmqpValue::LongString(jstr));
130 }
131 }else{
132 let v = Map::<String, Value>::new();
133 let vm = to_string(&Value::from(v));
134 if vm.is_ok() {
135 let jstr = vm.unwrap();
136 jmap.insert("kwargsrepr".to_string(), AmqpValue::LongString(jstr));
137 }
138 }
139
140
141 if self.origin.is_some(){
142 let v = self.origin.clone().unwrap();
143 jmap.insert(String::from("origin"), AmqpValue::LongString(v));
144 }else{
145 let nodename = anon_name::get_anon_nodename(None, None);
146 jmap.insert("origin".to_string(), AmqpValue::LongString(nodename));
147 }
148 jmap
149 }
150
151 pub fn convert_to_json_map(&self) -> Map<String, Value>{
153 let mut jmap = Map::new();
154 jmap.insert(String::from("lang"), Value::String(self.lang.clone()));
155 jmap.insert(String::from("task"), Value::String(self.task.clone()));
156 jmap.insert(String::from("id"), Value::String(self.id.clone()));
157 jmap.insert(String::from("root_id"), Value::String(self.root_id.clone()));
158
159 if self.parent_id.is_some() {
160 let parent_id = self.parent_id.clone().unwrap();
161 jmap.insert(String::from("parent_id"), Value::String(parent_id));
162 }
163
164 if self.group.is_some() {
165 let group = self.group.clone().unwrap();
166 jmap.insert(String::from("group"), Value::String(group));
167 }
168
169 if self.meth.is_some() {
170 let v = self.meth.clone().unwrap();
171 jmap.insert(String::from("meth"), Value::from(v));
172 }
173
174 if self.shadow.is_some(){
175 let v = self.shadow.clone().unwrap();
176 jmap.insert(String::from("shadow"), Value::from(v));
177 }
178
179 if self.eta.is_some(){
180 let v = self.eta.clone().unwrap();
181 jmap.insert(String::from("eta"), Value::from(v));
182 }
183
184 if self.expires.is_some(){
185 let v = self.expires.clone().unwrap();
186 jmap.insert(String::from("expires"), Value::from(v));
187 }
188
189 if self.retries.is_some(){
190 let v = self.retries.clone().unwrap();
191 jmap.insert(String::from("retries"), Value::from(v));
192 }
193
194 if self.timelimit.is_some(){
195 let v = self.timelimit.clone().unwrap();
196 let vtup = vec![Value::from(v.soft), Value::from(v.hard)];
197 jmap.insert(String::from("timelimit"), Value::Array(vtup));
198 }
199
200 if self.argsrepr.is_some(){
201 let v = self.argsrepr.clone().unwrap();
202 let argsrepr = v.args_to_vec();
203 jmap.insert(String::from("args"), Value::Array(argsrepr));
204 }
205
206 if self.kwargsrepr.is_some(){
207 let v = self.kwargsrepr.clone().unwrap();
208 let vm = v.convert_to_map();
209 jmap.insert(String::from("kwargsrepr"), Value::Object(vm));
210 }
211
212
213 if self.origin.is_some(){
214 let v = self.origin.clone().unwrap();
215 jmap.insert(String::from("origin"), Value::from(v));
216 }
217 jmap
218 }
219
220 pub fn new(lang: String, task: String, id: String, root_id: String) -> Headers{
222 Headers{
223 lang: lang,
224 task: task,
225 id: id,
226 root_id: root_id,
227 parent_id: None,
228 group: None,
229 meth: None,
230 shadow: None,
231 eta: None,
232 expires: None,
233 retries: None,
234 timelimit: None,
235 argsrepr: None,
236 kwargsrepr: None,
237 origin: None,
238 }
239 }
240}
241
242
#[cfg(test)]
mod tests {
    use std::vec::Vec;

    use crate::argparse::args::{Arg, Args};
    use crate::message_protocol::headers::Headers;

    /// Fixture shared by both tests: the four required fields plus an empty
    /// positional-argument list assigned to `argsrepr`.
    fn sample_headers() -> Headers {
        let mut headers = Headers::new(
            String::from("rs"),
            String::from("test_task"),
            String::from("id"),
            String::from("test_root"),
        );
        headers.argsrepr = Some(Args {
            args: Vec::<Arg>::new(),
        });
        headers
    }

    /// The JSON map must expose the `lang` header as a string.
    #[test]
    fn should_convert_to_json_map() {
        let json = sample_headers().convert_to_json_map();
        let lang = json.get("lang").and_then(|value| value.as_str());
        assert_eq!(lang, Some("rs"));
    }

    /// Required fields and an assigned `argsrepr` must be readable back.
    #[test]
    fn should_create_new_headers() {
        let headers = sample_headers();
        assert_eq!(headers.lang, "rs");
        let args = headers.argsrepr.expect("argsrepr was set by the fixture");
        assert!(args.args.is_empty());
    }
}