// vertx_eventbus_bridge/message.rs
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::io::ErrorKind;
use std::sync::mpsc::Receiver;

/// Value handed to a message consumer: `Ok` carries a payload of type `T`,
/// `Err` carries a [`std::io::ErrorKind`].
pub type UserMessage<T> = Result<T, ErrorKind>;
8
9pub struct MessageConsumer<T> {
10 pub msg_queue: Receiver<UserMessage<T>>,
11}
12
13impl<T> Iterator for MessageConsumer<T> {
14 type Item = UserMessage<T>;
15
16 fn next(&mut self) -> Option<Self::Item> {
17 self.msg_queue.try_recv().ok()
18 }
19}
20
21#[derive(Debug, Deserialize, PartialEq)]
22#[serde(rename_all(serialize = "lowercase", deserialize = "lowercase"))]
23#[serde(tag = "type")]
24pub enum InMessage {
25 Pong,
26 Err(ErrorMessage),
28 Message(Message),
29}
30
31#[derive(Debug, Serialize, PartialEq)]
32#[serde(rename_all(serialize = "lowercase", deserialize = "lowercase"))]
33#[serde(tag = "type")]
34pub enum OutMessage {
35 Ping, Register(RegisterMessage),
39 Unregister(RegisterMessage),
40 Send(SendMessage),
42 Publish(Message),
43}
44
45#[derive(Debug, Serialize, Deserialize, PartialEq)]
46pub struct Message {
47 pub address: String,
48 #[serde(skip_serializing_if = "Option::is_none")]
49 pub body: Option<Value>,
50 #[serde(skip_serializing_if = "Option::is_none")]
51 pub headers: Option<HashMap<String, String>>,
52}
53
54#[derive(Debug, Serialize, Deserialize, PartialEq)]
55pub struct ErrorMessage {
56 pub message: String,
57}
58
59#[derive(Debug, Serialize, Deserialize, PartialEq)]
60#[serde(rename_all = "camelCase")]
61pub struct SendMessage {
62 pub address: String,
63 #[serde(skip_serializing_if = "Option::is_none")]
64 pub reply_address: Option<String>,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 pub body: Option<Value>,
67 #[serde(skip_serializing_if = "Option::is_none")]
68 pub headers: Option<HashMap<String, String>>,
69}
70
71#[derive(Debug, Serialize, Deserialize, PartialEq)]
72pub struct RegisterMessage {
73 pub address: String,
74}
75
#[cfg(test)]
mod tests {
    // `super::` keeps the tests valid wherever this module is mounted in the crate.
    use super::{InMessage, Message, OutMessage, SendMessage};
    use serde_json::json;

    const JSON_PING: &str = r#"{"type":"ping"}"#;
    const JSON_PONG: &str = r#"{"type":"pong"}"#;

    const JSON_SEND: &str =
        r#"{"type":"send","address":"the-address","replyAddress":"the-reply-address","body":{}}"#;
    const JSON_RECEIVED: &str = r#"{"type":"message","address":"the-address","body":{}}"#;

    /// Incoming JSON frames decode into the expected `InMessage` variants.
    #[test]
    fn unmarshall_messages() {
        assert_eq!(
            InMessage::Pong,
            serde_json::from_str::<InMessage>(JSON_PONG).unwrap()
        );

        // Comparing the whole enum value reports an unexpected variant through
        // `assert_eq!` itself, so no hand-written `panic!` is needed.
        assert_eq!(
            InMessage::Message(Message {
                address: "the-address".to_string(),
                body: Some(json!({})),
                headers: None,
            }),
            serde_json::from_str::<InMessage>(JSON_RECEIVED).unwrap()
        );
    }

    /// Outgoing frames encode to the exact JSON text expected on the wire.
    #[test]
    fn marshall_messages() {
        let ping = serde_json::to_string(&OutMessage::Ping).unwrap();
        assert_eq!(JSON_PING, ping);

        let send = serde_json::to_string(&OutMessage::Send(SendMessage {
            address: "the-address".to_string(),
            reply_address: Some("the-reply-address".to_string()),
            body: Some(json!({})),
            headers: None,
        }))
        .unwrap();
        assert_eq!(JSON_SEND, send);
    }
}