celery_rs_core/message_protocol/
message.rs1use amiquip::{AmqpProperties, AmqpValue};
8use amq_protocol::uri::AMQPScheme::AMQP;
9use serde_json::{Map, to_string, Value};
10
11use crate::argparse::args::Args;
12use crate::argparse::kwargs::KwArgs;
13use crate::message_protocol::{headers::Headers, message_body::MessageBody, properties::Properties};
14
15pub struct Message{
17 pub properties: Properties,
18 pub headers: Headers,
19 pub body: MessageBody,
20 pub args: Option<Args>,
21 pub kwargs: Option<KwArgs>,
22}
23
24
25impl Message{
27
28 pub fn get_message_parts(&self) -> (String, AmqpProperties){
30 let mut props = self.properties.convert_to_amqp_properties();
31
32 let jheaders = self.headers.convert_to_btree_map();
34 props = props.with_headers(jheaders);
35
36 let mut body_vec = Vec::<Value>::new();
38
39 if self.args.is_some() {
40 let args = self.args.clone().unwrap().args_to_vec();
41 body_vec.push(Value::Array(args));
42 }else{
43 body_vec.push(Value::Null);
44 }
45
46 if self.kwargs.is_some(){
47 let kwargs = self.kwargs.clone().unwrap().convert_to_map();
48 body_vec.push(Value::Object(kwargs));
49 }else{
50 body_vec.push(Value::Null);
51 }
52
53 let message_map = self.body.convert_to_json_map();
54 let mbody_val = Value::Object(message_map);
55 body_vec.push(mbody_val);
56 let bv = Value::Array(body_vec);
57 let body_str = to_string(&bv).ok().unwrap();
58 (body_str, props)
59 }
60
61 pub fn new(properties: Properties, headers: Headers, body: MessageBody, args: Option<Args>, kwargs: Option<KwArgs>) -> Message{
63 Message{
64 properties: properties,
65 headers: headers,
66 body: body,
67 args: args,
68 kwargs: kwargs,
69 }
70 }
71}
72
73
#[cfg(test)]
mod tests {
    use amq_protocol::types::AMQPType;
    use serde_json::{from_str, Value};

    use crate::argparse::args::{Arg, Args};
    use crate::message_protocol::headers::Headers;
    use crate::message_protocol::message_body::MessageBody;
    use crate::message_protocol::properties::Properties;

    use super::*;

    /// Builds the fixture shared by every test in this module: a message
    /// with one positional string argument (`"test"`), no keyword arguments,
    /// and a body constructed with `"chord"` as its first argument.
    fn build_test_message() -> Message {
        let props = Properties::new(
            String::from("test_correlation"),
            String::from("test_content"),
            String::from("test_encoding"),
            None,
        );

        let mut headers = Headers::new(
            String::from("rs"),
            String::from("test_task"),
            String::from("id"),
            String::from("test_root"),
        );
        headers.argsrepr = Some(Args {
            args: Vec::<Arg>::new(),
        });

        let body = MessageBody::new(Some(String::from("chord")), None, None, None);

        let mut args = Args::new();
        args.args
            .push(Arg::new(Value::from(String::from("test")), AMQPType::LongString));

        Message::new(props, headers, body, Some(args), None)
    }

    /// `Message::new` must store the parts it was given.
    #[test]
    fn create_new_message() {
        let m = build_test_message();

        // The body's JSON map must expose the "chord" entry.
        assert!(m.body.convert_to_json_map().get("chord").is_some());

        let args = m.args.as_ref().expect("args were supplied");
        assert_eq!(args.args.len(), 1);
        assert_eq!(args.args[0].arg.as_str(), Some("test"));
        assert!(m.kwargs.is_none());
    }

    /// The serialized body must be the JSON array `[args, kwargs, body]`.
    #[test]
    fn test_serialize_body() {
        let m = build_test_message();
        let (body, _props) = m.get_message_parts();

        let parsed: Value = from_str(&body).expect("body must be valid JSON");
        let parts = parsed.as_array().expect("body must be a JSON array");
        assert_eq!(parts.len(), 3);

        // Slot 0: positional arguments.
        let jargs = parts[0].as_array().expect("args slot must be a JSON array");
        assert_eq!(jargs.len(), 1);
        assert_eq!(jargs[0].as_str(), Some("test"));

        // Slot 1: no kwargs were supplied, so it must be null.
        assert_eq!(parts[1], Value::Null);

        // Slot 2: the message body is always emitted as an object.
        assert!(parts[2].is_object());
    }
}