celery_rs_core/message_protocol/
message.rs

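/*
A message bundling AMQP properties, headers, a body, and optional args/kwargs,
with conversion into a JSON body string plus AMQP properties.

Author Andrew Evans
*/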
6
7use amiquip::{AmqpProperties, AmqpValue};
8use amq_protocol::uri::AMQPScheme::AMQP;
9use serde_json::{Map, to_string, Value};
10
11use crate::argparse::args::Args;
12use crate::argparse::kwargs::KwArgs;
13use crate::message_protocol::{headers::Headers, message_body::MessageBody, properties::Properties};
14
15/// Message objects to be packaged when ready
16pub struct Message{
17    pub properties: Properties,
18    pub headers: Headers,
19    pub body: MessageBody,
20    pub args: Option<Args>,
21    pub kwargs: Option<KwArgs>,
22}
23
24
25/// functions for converting message to string
26impl Message{
27
28    /// Get message parts
29    pub fn get_message_parts(&self) -> (String, AmqpProperties){
30        let mut props = self.properties.convert_to_amqp_properties();
31
32        /// get extra properties
33        let jheaders = self.headers.convert_to_btree_map();
34        props = props.with_headers(jheaders);
35
36        /// get the message body string
37        let mut body_vec = Vec::<Value>::new();
38
39        if self.args.is_some() {
40            let args = self.args.clone().unwrap().args_to_vec();
41            body_vec.push(Value::Array(args));
42        }else{
43            body_vec.push(Value::Null);
44        }
45
46        if self.kwargs.is_some(){
47            let kwargs = self.kwargs.clone().unwrap().convert_to_map();
48            body_vec.push(Value::Object(kwargs));
49        }else{
50            body_vec.push(Value::Null);
51        }
52
53        let message_map = self.body.convert_to_json_map();
54        let mbody_val = Value::Object(message_map);
55        body_vec.push(mbody_val);
56        let bv = Value::Array(body_vec);
57        let body_str = to_string(&bv).ok().unwrap();
58        (body_str, props)
59    }
60
61    /// convert the body to json
62    pub fn new(properties: Properties, headers: Headers, body: MessageBody, args: Option<Args>, kwargs: Option<KwArgs>) -> Message{
63        Message{
64            properties: properties,
65            headers: headers,
66            body: body,
67            args: args,
68            kwargs: kwargs,
69        }
70    }
71}
72
73
#[cfg(test)]
mod tests {
    use amq_protocol::types::AMQPType;
    use serde_json::{from_str, Value};

    use crate::argparse::args::{Arg, Args};
    use crate::message_protocol::headers::Headers;
    use crate::message_protocol::message_body::MessageBody;
    use crate::message_protocol::properties::Properties;

    use super::*;

    /// Builds the fixture shared by the tests: a message carrying a single
    /// string argument (`"test"`) and no keyword arguments.
    fn build_test_message() -> Message {
        let props = Properties::new(
            String::from("test_correlation"),
            String::from("test_content"),
            String::from("test_encoding"),
            None,
        );

        let mut headers = Headers::new(
            String::from("rs"),
            String::from("test_task"),
            String::from("id"),
            String::from("test_root"),
        );
        headers.argsrepr = Some(Args {
            args: Vec::<Arg>::new(),
        });

        let body = MessageBody::new(Some(String::from("chord")), None, None, None);
        // The body map is expected to expose the "chord" entry set above.
        assert!(body.convert_to_json_map().get("chord").is_some());

        let arg = Arg::new(Value::from(String::from("test")), AMQPType::LongString);
        assert_eq!(arg.arg.as_str().unwrap(), "test");
        let mut args = Args::new();
        args.args.push(arg);

        Message::new(props, headers, body, Some(args), None)
    }

    /// `Message::new` stores the supplied args and kwargs unchanged.
    #[test]
    fn create_new_message() {
        let message = build_test_message();
        assert_eq!(message.args.as_ref().map(|a| a.args.len()), Some(1));
        assert!(message.kwargs.is_none());
    }

    /// The serialized body is a three-element JSON array: args, kwargs, body.
    #[test]
    fn test_serialize_body() {
        let message = build_test_message();
        let (body, _props) = message.get_message_parts();

        let parsed: Value = from_str(body.as_str()).expect("body should be valid JSON");
        let parts = parsed.as_array().expect("body should be a JSON array");
        assert_eq!(parts.len(), 3);

        let json_args = parts[0].as_array().expect("args should be a JSON array");
        assert_eq!(json_args.len(), 1);
        assert_eq!(json_args[0], "test");

        // No kwargs were supplied, so the second slot is null.
        assert_eq!(parts[1], Value::Null);
        // The body map is always serialized as an object.
        assert!(parts[2].is_object());
    }
}