nitox/protocol/server/
message.rs

1use bytes::{BufMut, Bytes, BytesMut};
2use protocol::{Command, CommandError};
3
4/// The MSG protocol message is used to deliver an application message to the client.
5#[derive(Debug, Clone, PartialEq, Builder)]
6#[builder(build_fn(validate = "Self::validate"))]
7pub struct Message {
8    /// Subject name this message was received on
9    #[builder(setter(into))]
10    pub subject: String,
11    /// The unique alphanumeric subscription ID of the subject
12    #[builder(setter(into))]
13    pub sid: String,
14    /// The inbox subject on which the publisher is listening for responses
15    #[builder(default)]
16    pub reply_to: Option<String>,
17    /// The message payload data
18    #[builder(setter(into))]
19    pub payload: Bytes,
20}
21
22impl Message {
23    pub fn builder() -> MessageBuilder {
24        MessageBuilder::default()
25    }
26}
27
28impl Command for Message {
29    const CMD_NAME: &'static [u8] = b"MSG";
30
31    fn into_vec(self) -> Result<Bytes, CommandError> {
32        let rt = if let Some(reply_to) = self.reply_to {
33            format!("\t{}", reply_to)
34        } else {
35            "".into()
36        };
37
38        let cmd_str = format!("MSG\t{}\t{}{}\t{}\r\n", self.subject, self.sid, rt, self.payload.len());
39        let mut bytes = BytesMut::with_capacity(cmd_str.len() + self.payload.len() + 2);
40        bytes.put(cmd_str.as_bytes());
41        bytes.put(self.payload);
42        bytes.put("\r\n");
43
44        Ok(bytes.freeze())
45    }
46
47    fn try_parse(buf: &[u8]) -> Result<Self, CommandError> {
48        let len = buf.len();
49
50        if buf[len - 2..] != [b'\r', b'\n'] {
51            return Err(CommandError::IncompleteCommandError);
52        }
53
54        if let Some(payload_start) = buf[..len - 2].iter().position(|b| *b == b'\r') {
55            if buf[payload_start + 1] != b'\n' {
56                return Err(CommandError::CommandMalformed);
57            }
58
59            let payload: Bytes = buf[payload_start + 2..len - 2].into();
60
61            let whole_command = ::std::str::from_utf8(&buf[..payload_start])?;
62            let mut split = whole_command.split_whitespace();
63            let cmd = split.next().ok_or_else(|| CommandError::CommandMalformed)?;
64            // Check if we're still on the right command
65            if cmd.as_bytes() != Self::CMD_NAME {
66                return Err(CommandError::CommandMalformed);
67            }
68
69            let payload_len: usize = split
70                .next_back()
71                .ok_or_else(|| CommandError::CommandMalformed)?
72                .parse()?;
73
74            if payload.len() != payload_len {
75                return Err(CommandError::CommandMalformed);
76            }
77
78            // Extract subject
79            let subject: String = split.next().ok_or_else(|| CommandError::CommandMalformed)?.into();
80
81            let sid: String = split.next().ok_or_else(|| CommandError::CommandMalformed)?.into();
82
83            let reply_to: Option<String> = split.next().map(|v| v.into());
84
85            Ok(Message {
86                subject,
87                sid,
88                payload,
89                reply_to,
90            })
91        } else {
92            Err(CommandError::CommandMalformed)
93        }
94    }
95}
96
97impl MessageBuilder {
98    fn validate(&self) -> Result<(), String> {
99        if let Some(ref subj) = self.subject {
100            check_cmd_arg!(subj, "subject");
101        }
102
103        if let Some(ref reply_to_maybe) = self.reply_to {
104            if let Some(ref reply_to) = reply_to_maybe {
105                check_cmd_arg!(reply_to, "inbox");
106            }
107        }
108
109        Ok(())
110    }
111}
112
113#[cfg(test)]
114mod tests {
115    use super::{Message, MessageBuilder};
116    use protocol::Command;
117
118    static DEFAULT_MSG: &'static str = "MSG\tFOO\tpouet\t4\r\ntoto\r\n";
119
120    #[test]
121    fn it_parses() {
122        let parse_res = Message::try_parse(DEFAULT_MSG.as_bytes());
123        assert!(parse_res.is_ok());
124        let cmd = parse_res.unwrap();
125        assert!(cmd.reply_to.is_none());
126        assert_eq!(&cmd.subject, "FOO");
127        assert_eq!(&cmd.sid, "pouet");
128        assert_eq!(cmd.payload, "toto");
129    }
130
131    #[test]
132    fn it_stringifies() {
133        let cmd = MessageBuilder::default()
134            .subject("FOO")
135            .sid("pouet")
136            .payload("toto")
137            .build()
138            .unwrap();
139
140        let cmd_bytes_res = cmd.into_vec();
141        assert!(cmd_bytes_res.is_ok());
142        let cmd_bytes = cmd_bytes_res.unwrap();
143
144        assert_eq!(DEFAULT_MSG, cmd_bytes);
145    }
146}