simple_pub_sub_message/message.rs
1use crate::{header::Header, PktType};
2use anyhow::{bail, Result};
3use log::trace;
4use tokio::sync::broadcast::Sender;
5
6/// structure containing the complete information about a message.
7#[derive(Debug, Clone)]
8pub struct Msg {
9 /// `Header`: the header of the message.
10 pub header: Header,
11 /// The topic for the message.
12 pub topic: String,
13 /// the actual message, bytes.
14 pub message: Vec<u8>,
15 /// `tokio::broadcast::sync::Sender` the channel for passing the messages across.
16 pub channel: Option<Sender<Msg>>,
17 /// client_id: to identify each socket connection/client.
18 pub client_id: Option<String>,
19}
20
21impl Msg {
22 /// Creates a new `Msg` with the given data.
23 /// ```
24 /// use simple_pub_sub_message::message::Msg;
25 /// use simple_pub_sub_message::PktType;
26 /// let msg = Msg::new(PktType::PUBLISH, "Test".to_string(), Some(b"The message".to_vec()));
27 /// ```
28 pub fn new(pkt_type: PktType, topic: String, message: Option<Vec<u8>>) -> Msg {
29 let msg: Vec<u8> = message.unwrap_or_default();
30
31 Msg {
32 header: Header::new(pkt_type, topic.len() as u8, msg.len() as u16),
33 topic,
34 message: msg,
35 channel: None,
36 client_id: None,
37 }
38 }
39
40 /// adds the given channel to the message.
41 /// ```
42 /// use simple_pub_sub_message::message::Msg;
43 /// use simple_pub_sub_message::PktType;
44 /// use tokio::sync::broadcast::Sender;
45 /// let mut msg = Msg::new(PktType::PUBLISH, "Test".to_string(), Some(b"The message".to_vec()));
46 /// let chan: tokio::sync::broadcast::Sender<Msg> =
47 /// tokio::sync::broadcast::Sender::new(1);
48 /// msg.channel(chan)
49 /// ```
50 pub fn channel(&mut self, chan: Sender<Msg>) {
51 self.channel = Some(chan);
52 }
53
54 ///returns the client id for the message.
55 /// ```
56 /// use simple_pub_sub_message::message::Msg;
57 /// use simple_pub_sub_message::PktType;
58 /// use uuid;
59 /// let mut msg = Msg::new(PktType::PUBLISH, "Test".to_string(), Some(b"The message".to_vec()));
60 /// let client_id = uuid::Uuid::new_v4().to_string();
61 /// msg.client_id(client_id)
62 /// ```
63 pub fn client_id(&mut self, client_id: String) {
64 self.client_id = Some(client_id);
65 }
66
67 /// generates the response `Msg` with the given data.
68 /// ```
69 /// use simple_pub_sub_message::message::Msg;
70 /// use simple_pub_sub_message::PktType;
71 /// let mut msg = Msg::new(PktType::PUBLISH, "Test".to_string(), Some(b"The message".to_vec()));
72 /// let response_msg = msg.response_msg(vec![]);
73 /// ```
74 pub fn response_msg(&self, message: Vec<u8>) -> Result<Msg> {
75 let mut header: Header = self.header.response_header()?;
76 header.message_length = message.len() as u16;
77 Ok(Msg {
78 header,
79 topic: self.topic.clone(),
80 message,
81 channel: None,
82 client_id: None,
83 })
84 }
85
86 /// returns bytes for the `Msg` that can be sent to the stream.
87 ///```
88 /// use simple_pub_sub_message::message::Msg;
89 /// use simple_pub_sub_message::PktType;
90 /// let mut msg = Msg::new(PktType::PUBLISH, "Test".to_string(), Some(b"The message".to_vec()));
91 /// let bytes = msg.bytes();
92 /// ```
93 pub fn bytes(&self) -> Vec<u8> {
94 let mut buffer: Vec<u8> = self.header.bytes().to_vec();
95 buffer.extend(self.topic.as_bytes().to_vec());
96 buffer.extend(self.message.clone());
97 trace!("The generated buffer is: {:?}", buffer);
98 buffer
99 }
100}
101
102/// returns a response `Msg`.
103/// ```
104/// use simple_pub_sub_message::message::Msg;
105/// use simple_pub_sub_message::PktType;
106/// use simple_pub_sub_message::message::get_msg_response;
107/// let mut msg = Msg::new(PktType::PUBLISHACK, "Test".to_string(), Some(b"".to_vec()));
108/// let response_msg = get_msg_response(msg);
109/// ```
110pub fn get_msg_response(msg: Msg) -> Result<Vec<u8>> {
111 let mut resp: Vec<u8> = msg.response_msg(msg.message.clone())?.bytes();
112 resp.extend(msg.topic.bytes());
113 Ok(resp)
114}
115
116impl PartialEq for Msg {
117 fn eq(&self, other: &Self) -> bool {
118 if self.header == other.header && self.topic == other.topic && self.message == other.message
119 {
120 return true;
121 }
122 false
123 }
124}
125impl TryFrom<&[u8]> for Msg {
126 type Error = anyhow::Error;
127
128 /// Parses a `Msg` from a `Vec<u8>`.
129 /// ```
130 /// use simple_pub_sub_message::message::Msg;
131 /// let buf = [15, 0, 1, 2, 3, 0, 12, 0, 97, 98,
132 /// 99, 116, 101, 115, 116, 32, 109, 101, 115,
133 /// 115, 97, 103, 101];
134 /// let msg = Msg::try_from(buf.as_ref()).unwrap();
135 /// println!("{:?}", msg);
136 /// ```
137 fn try_from(bytes: &[u8]) -> Result<Msg> {
138 let header = Header::try_from(bytes[..8].as_ref())?;
139 let topic: String = String::from_utf8(bytes[8..(8 + header.topic_length).into()].to_vec())?;
140 let message_end: usize = ((8 + header.topic_length) as u16 + header.message_length).into();
141
142 if bytes.len() < message_end {
143 bail!("Invalid Msg length");
144 }
145 let message = bytes[(8 + header.topic_length).into()..message_end].to_vec();
146 Ok(Msg {
147 header,
148 topic,
149 message,
150 channel: None,
151 client_id: None,
152 })
153 }
154}
155impl TryFrom<Vec<u8>> for Msg {
156 type Error = anyhow::Error;
157
158 /// Parses a `Msg` from a `Vec<u8>`.
159 /// ```
160 /// use simple_pub_sub_message::message::Msg;
161 /// let buf = vec![15, 0, 1, 2, 3, 0, 12, 0, 97, 98,
162 /// 99, 116, 101, 115, 116, 32, 109, 101, 115,
163 /// 115, 97, 103, 101];
164 /// let msg = Msg::try_from(buf).unwrap();
165 /// ```
166 fn try_from(bytes: Vec<u8>) -> Result<Msg> {
167 Msg::try_from(bytes.as_ref())
168 }
169}