simple_pub_sub_message/
message.rs

1use crate::{header::Header, PktType};
2use anyhow::{bail, Result};
3use log::trace;
4use tokio::sync::broadcast::Sender;
5
6/// structure containing the complete information about a message.
7#[derive(Debug, Clone)]
8pub struct Msg {
9    /// `Header`: the header of the message.
10    pub header: Header,
11    /// The topic for the message.
12    pub topic: String,
13    /// the actual message, bytes.
14    pub message: Vec<u8>,
15    /// `tokio::broadcast::sync::Sender` the channel for passing the messages across.
16    pub channel: Option<Sender<Msg>>,
17    /// client_id: to identify each socket connection/client.
18    pub client_id: Option<String>,
19}
20
21impl Msg {
22    /// Creates a new `Msg` with the given data.
23    /// ```
24    /// use simple_pub_sub_message::message::Msg;
25    /// use simple_pub_sub_message::PktType;
26    /// let msg = Msg::new(PktType::PUBLISH, "Test".to_string(), Some(b"The message".to_vec()));
27    /// ```
28    pub fn new(pkt_type: PktType, topic: String, message: Option<Vec<u8>>) -> Msg {
29        let msg: Vec<u8> = message.unwrap_or_default();
30
31        Msg {
32            header: Header::new(pkt_type, topic.len() as u8, msg.len() as u16),
33            topic,
34            message: msg,
35            channel: None,
36            client_id: None,
37        }
38    }
39
40    /// adds the given channel to the message.
41    /// ```
42    /// use simple_pub_sub_message::message::Msg;
43    /// use simple_pub_sub_message::PktType;
44    /// use tokio::sync::broadcast::Sender;
45    /// let mut msg = Msg::new(PktType::PUBLISH, "Test".to_string(), Some(b"The message".to_vec()));
46    /// let chan: tokio::sync::broadcast::Sender<Msg> =
47    ///   tokio::sync::broadcast::Sender::new(1);
48    /// msg.channel(chan)
49    /// ```
50    pub fn channel(&mut self, chan: Sender<Msg>) {
51        self.channel = Some(chan);
52    }
53
54    ///returns the client id for the message.
55    /// ```
56    /// use simple_pub_sub_message::message::Msg;
57    /// use simple_pub_sub_message::PktType;
58    /// use uuid;
59    /// let mut msg = Msg::new(PktType::PUBLISH, "Test".to_string(), Some(b"The message".to_vec()));
60    /// let client_id = uuid::Uuid::new_v4().to_string();
61    /// msg.client_id(client_id)
62    /// ```
63    pub fn client_id(&mut self, client_id: String) {
64        self.client_id = Some(client_id);
65    }
66
67    /// generates the response `Msg` with the given data.
68    /// ```
69    /// use simple_pub_sub_message::message::Msg;
70    /// use simple_pub_sub_message::PktType;
71    /// let mut msg = Msg::new(PktType::PUBLISH, "Test".to_string(), Some(b"The message".to_vec()));
72    /// let response_msg = msg.response_msg(vec![]);
73    /// ```
74    pub fn response_msg(&self, message: Vec<u8>) -> Result<Msg> {
75        let mut header: Header = self.header.response_header()?;
76        header.message_length = message.len() as u16;
77        Ok(Msg {
78            header,
79            topic: self.topic.clone(),
80            message,
81            channel: None,
82            client_id: None,
83        })
84    }
85
86    /// returns bytes for the `Msg` that can be sent to the stream.
87    ///```
88    /// use simple_pub_sub_message::message::Msg;
89    /// use simple_pub_sub_message::PktType;
90    /// let mut msg = Msg::new(PktType::PUBLISH, "Test".to_string(), Some(b"The message".to_vec()));
91    /// let bytes = msg.bytes();
92    /// ```
93    pub fn bytes(&self) -> Vec<u8> {
94        let mut buffer: Vec<u8> = self.header.bytes().to_vec();
95        buffer.extend(self.topic.as_bytes().to_vec());
96        buffer.extend(self.message.clone());
97        trace!("The generated buffer is: {:?}", buffer);
98        buffer
99    }
100}
101
102/// returns a response `Msg`.
103/// ```
104/// use simple_pub_sub_message::message::Msg;
105/// use simple_pub_sub_message::PktType;
106/// use simple_pub_sub_message::message::get_msg_response;
107/// let mut msg = Msg::new(PktType::PUBLISHACK, "Test".to_string(), Some(b"".to_vec()));
108/// let response_msg = get_msg_response(msg);
109/// ```
110pub fn get_msg_response(msg: Msg) -> Result<Vec<u8>> {
111    let mut resp: Vec<u8> = msg.response_msg(msg.message.clone())?.bytes();
112    resp.extend(msg.topic.bytes());
113    Ok(resp)
114}
115
116impl PartialEq for Msg {
117    fn eq(&self, other: &Self) -> bool {
118        if self.header == other.header && self.topic == other.topic && self.message == other.message
119        {
120            return true;
121        }
122        false
123    }
124}
125impl TryFrom<&[u8]> for Msg {
126    type Error = anyhow::Error;
127
128    /// Parses a `Msg` from a `Vec<u8>`.
129    /// ```
130    /// use simple_pub_sub_message::message::Msg;
131    /// let buf = [15, 0, 1, 2, 3, 0, 12, 0, 97, 98,
132    ///   99, 116, 101, 115, 116, 32, 109, 101, 115,
133    ///   115, 97, 103, 101];
134    /// let msg = Msg::try_from(buf.as_ref()).unwrap();
135    /// println!("{:?}", msg);
136    /// ```
137    fn try_from(bytes: &[u8]) -> Result<Msg> {
138        let header = Header::try_from(bytes[..8].as_ref())?;
139        let topic: String = String::from_utf8(bytes[8..(8 + header.topic_length).into()].to_vec())?;
140        let message_end: usize = ((8 + header.topic_length) as u16 + header.message_length).into();
141
142        if bytes.len() < message_end {
143            bail!("Invalid Msg length");
144        }
145        let message = bytes[(8 + header.topic_length).into()..message_end].to_vec();
146        Ok(Msg {
147            header,
148            topic,
149            message,
150            channel: None,
151            client_id: None,
152        })
153    }
154}
155impl TryFrom<Vec<u8>> for Msg {
156    type Error = anyhow::Error;
157
158    /// Parses a `Msg` from a `Vec<u8>`.
159    /// ```
160    /// use simple_pub_sub_message::message::Msg;
161    /// let buf = vec![15, 0, 1, 2, 3, 0, 12, 0, 97, 98,
162    ///   99, 116, 101, 115, 116, 32, 109, 101, 115,
163    ///   115, 97, 103, 101];
164    /// let msg = Msg::try_from(buf).unwrap();
165    /// ```
166    fn try_from(bytes: Vec<u8>) -> Result<Msg> {
167        Msg::try_from(bytes.as_ref())
168    }
169}