nitox/protocol/server/
message.rs1use bytes::{BufMut, Bytes, BytesMut};
2use protocol::{Command, CommandError};
3
/// A `MSG` protocol command: a subject, a `sid`, an optional reply-to and a
/// binary payload.
///
/// Values are created either through the derived `MessageBuilder` (whose
/// `build()` runs `MessageBuilder::validate` first) or by parsing raw bytes
/// with `Command::try_parse`.
#[derive(Debug, Clone, PartialEq, Builder)]
#[builder(build_fn(validate = "Self::validate"))]
pub struct Message {
    /// First argument of the command header.
    #[builder(setter(into))]
    pub subject: String,
    /// Second argument of the command header.
    // NOTE(review): presumably the subscription id this message is delivered for — confirm.
    #[builder(setter(into))]
    pub sid: String,
    /// Optional third argument of the command header; left out of the
    /// serialized form when `None`. The builder defaults it to `None`.
    #[builder(default)]
    pub reply_to: Option<String>,
    /// Raw message body; its byte length is written as the last header argument.
    #[builder(setter(into))]
    pub payload: Bytes,
}
21
22impl Message {
23 pub fn builder() -> MessageBuilder {
24 MessageBuilder::default()
25 }
26}
27
28impl Command for Message {
29 const CMD_NAME: &'static [u8] = b"MSG";
30
31 fn into_vec(self) -> Result<Bytes, CommandError> {
32 let rt = if let Some(reply_to) = self.reply_to {
33 format!("\t{}", reply_to)
34 } else {
35 "".into()
36 };
37
38 let cmd_str = format!("MSG\t{}\t{}{}\t{}\r\n", self.subject, self.sid, rt, self.payload.len());
39 let mut bytes = BytesMut::with_capacity(cmd_str.len() + self.payload.len() + 2);
40 bytes.put(cmd_str.as_bytes());
41 bytes.put(self.payload);
42 bytes.put("\r\n");
43
44 Ok(bytes.freeze())
45 }
46
47 fn try_parse(buf: &[u8]) -> Result<Self, CommandError> {
48 let len = buf.len();
49
50 if buf[len - 2..] != [b'\r', b'\n'] {
51 return Err(CommandError::IncompleteCommandError);
52 }
53
54 if let Some(payload_start) = buf[..len - 2].iter().position(|b| *b == b'\r') {
55 if buf[payload_start + 1] != b'\n' {
56 return Err(CommandError::CommandMalformed);
57 }
58
59 let payload: Bytes = buf[payload_start + 2..len - 2].into();
60
61 let whole_command = ::std::str::from_utf8(&buf[..payload_start])?;
62 let mut split = whole_command.split_whitespace();
63 let cmd = split.next().ok_or_else(|| CommandError::CommandMalformed)?;
64 if cmd.as_bytes() != Self::CMD_NAME {
66 return Err(CommandError::CommandMalformed);
67 }
68
69 let payload_len: usize = split
70 .next_back()
71 .ok_or_else(|| CommandError::CommandMalformed)?
72 .parse()?;
73
74 if payload.len() != payload_len {
75 return Err(CommandError::CommandMalformed);
76 }
77
78 let subject: String = split.next().ok_or_else(|| CommandError::CommandMalformed)?.into();
80
81 let sid: String = split.next().ok_or_else(|| CommandError::CommandMalformed)?.into();
82
83 let reply_to: Option<String> = split.next().map(|v| v.into());
84
85 Ok(Message {
86 subject,
87 sid,
88 payload,
89 reply_to,
90 })
91 } else {
92 Err(CommandError::CommandMalformed)
93 }
94 }
95}
96
97impl MessageBuilder {
98 fn validate(&self) -> Result<(), String> {
99 if let Some(ref subj) = self.subject {
100 check_cmd_arg!(subj, "subject");
101 }
102
103 if let Some(ref reply_to_maybe) = self.reply_to {
104 if let Some(ref reply_to) = reply_to_maybe {
105 check_cmd_arg!(reply_to, "inbox");
106 }
107 }
108
109 Ok(())
110 }
111}
112
#[cfg(test)]
mod tests {
    use super::{Message, MessageBuilder};
    use protocol::Command;

    /// Wire form of a message with subject `FOO`, sid `pouet`, no reply-to and
    /// the four-byte payload `toto`.
    static DEFAULT_MSG: &'static str = "MSG\tFOO\tpouet\t4\r\ntoto\r\n";

    /// Parsing the reference bytes yields the expected fields.
    #[test]
    fn it_parses() {
        let parsed = Message::try_parse(DEFAULT_MSG.as_bytes());
        assert!(parsed.is_ok());

        let msg = parsed.unwrap();
        assert!(msg.reply_to.is_none());
        assert_eq!(&msg.subject, "FOO");
        assert_eq!(&msg.sid, "pouet");
        assert_eq!(msg.payload, "toto");
    }

    /// Serializing a built message reproduces the reference bytes.
    #[test]
    fn it_stringifies() {
        let serialized = MessageBuilder::default()
            .subject("FOO")
            .sid("pouet")
            .payload("toto")
            .build()
            .unwrap()
            .into_vec();
        assert!(serialized.is_ok());

        assert_eq!(DEFAULT_MSG, serialized.unwrap());
    }
}