vertx_eventbus_bridge/
message.rs

1use serde::{Deserialize, Serialize};
2use serde_json::Value;
3use std::collections::HashMap;
4use std::io::ErrorKind;
5use std::sync::mpsc::Receiver;
6
7pub type UserMessage<T> = Result<T, ErrorKind>;
8
9pub struct MessageConsumer<T> {
10    pub msg_queue: Receiver<UserMessage<T>>,
11}
12
13impl<T> Iterator for MessageConsumer<T> {
14    type Item = UserMessage<T>;
15
16    fn next(&mut self) -> Option<Self::Item> {
17        self.msg_queue.try_recv().ok()
18    }
19}
20
21#[derive(Debug, Deserialize, PartialEq)]
22#[serde(rename_all(serialize = "lowercase", deserialize = "lowercase"))]
23#[serde(tag = "type")]
24pub enum InMessage {
25    Pong,
26    // user, incoming messages
27    Err(ErrorMessage),
28    Message(Message),
29}
30
31#[derive(Debug, Serialize, PartialEq)]
32#[serde(rename_all(serialize = "lowercase", deserialize = "lowercase"))]
33#[serde(tag = "type")]
34pub enum OutMessage {
35    // internal, control
36    Ping, // outgoing
37    // internal, primitives associated to user actions, outgoing
38    Register(RegisterMessage),
39    Unregister(RegisterMessage),
40    // user, outgoing message
41    Send(SendMessage),
42    Publish(Message),
43}
44
45#[derive(Debug, Serialize, Deserialize, PartialEq)]
46pub struct Message {
47    pub address: String,
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub body: Option<Value>,
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub headers: Option<HashMap<String, String>>,
52}
53
54#[derive(Debug, Serialize, Deserialize, PartialEq)]
55pub struct ErrorMessage {
56    pub message: String,
57}
58
59#[derive(Debug, Serialize, Deserialize, PartialEq)]
60#[serde(rename_all = "camelCase")]
61pub struct SendMessage {
62    pub address: String,
63    #[serde(skip_serializing_if = "Option::is_none")]
64    pub reply_address: Option<String>,
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub body: Option<Value>,
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub headers: Option<HashMap<String, String>>,
69}
70
71#[derive(Debug, Serialize, Deserialize, PartialEq)]
72pub struct RegisterMessage {
73    pub address: String,
74}
75
76#[cfg(test)]
77mod tests {
78    use crate::message::{InMessage, Message, OutMessage, SendMessage};
79    use serde_json::json;
80
81    const JSON_PING: &str = r#"{"type":"ping"}"#;
82    const JSON_PONG: &str = r#"{"type":"pong"}"#;
83
84    const JSON_SEND: &str =
85        r#"{"type":"send","address":"the-address","replyAddress":"the-reply-address","body":{}}"#;
86    const JSON_RECEIVED: &str = r#"{"type":"message","address":"the-address","body":{}}"#;
87
88    #[test]
89    fn unmarshall_messages() {
90        assert_eq!(InMessage::Pong, serde_json::from_str(JSON_PONG).unwrap());
91
92        match serde_json::from_str(JSON_RECEIVED).unwrap() {
93            InMessage::Message(msg) => assert_eq!(
94                Message {
95                    address: "the-address".to_string(),
96                    body: Some(json!({})),
97                    headers: None
98                },
99                msg
100            ),
101            other => panic!(format!("Expecting a message, not {:?}", other)),
102        };
103    }
104
105    #[test]
106    fn marshall_messages() {
107        let msg: String = serde_json::to_string(&OutMessage::Ping).unwrap();
108        assert_eq!(JSON_PING, msg);
109
110        let msg: String = serde_json::to_string(&OutMessage::Send(SendMessage {
111            address: "the-address".to_string(),
112            body: Some(json!({})),
113            reply_address: Some("the-reply-address".to_string()),
114            headers: None,
115        }))
116        .unwrap();
117        assert_eq!(JSON_SEND, msg);
118    }
119}