mqtt/
encode.rs

1use std::io::{self, Result, Error, ErrorKind};
2
3use byteorder::{BigEndian, WriteBytesExt};
4
5use proto::*;
6use packet::*;
7
/// Largest value the variable-length encoding can carry: four bytes of seven
/// payload bits each, i.e. the wire image 0xFF,0xFF,0xFF,0x7F.
pub const MAX_VARIABLE_LENGTH: usize = 0x0FFF_FFFF;
9
10pub trait WritePacketHelper: io::Write {
11    #[inline]
12    fn write_fixed_header(&mut self, packet: &Packet) -> Result<usize> {
13        let content_size = self.calc_content_size(packet);
14
15        debug!(
16            "write FixedHeader {{ type={}, flags={}, remaining_length={} }} ",
17            packet.packet_type(),
18            packet.packet_flags(),
19            content_size
20        );
21
22        Ok(
23            self.write(
24                &[(packet.packet_type() << 4) | packet.packet_flags()],
25            )? + self.write_variable_length(content_size)?,
26        )
27    }
28
29    fn calc_content_size(&mut self, packet: &Packet) -> usize {
30        match *packet {
31            Packet::Connect {
32                ref last_will,
33                client_id,
34                username,
35                password,
36                ..
37            } => {
38                // Protocol Name + Protocol Level + Connect Flags + Keep Alive
39                let mut n = 2 + 4 + 1 + 1 + 2;
40
41                // Client Id
42                n += 2 + client_id.len();
43
44                // Will Topic + Will Message
45                if let &Some(LastWill { topic, message, .. }) = last_will {
46                    n += 2 + topic.len() + 2 + message.len();
47                }
48
49                if let Some(s) = username {
50                    n += 2 + s.len();
51                }
52
53                if let Some(s) = password {
54                    n += 2 + s.len();
55                }
56
57                n
58            }
59
60            Packet::ConnectAck { .. } => 2, // Flags + Return Code
61
62            Packet::Publish {
63                topic,
64                packet_id,
65                payload,
66                ..
67            } => {
68                // Topic + Packet Id + Payload
69                2 + topic.len() + packet_id.map_or(0, |_| 2) + payload.len()
70            }
71
72            Packet::PublishAck { .. } |
73            Packet::PublishReceived { .. } |
74            Packet::PublishRelease { .. } |
75            Packet::PublishComplete { .. } |
76            Packet::UnsubscribeAck { .. } => 2, // Packet Id
77
78            Packet::Subscribe { ref topic_filters, .. } => {
79                2 +
80                    topic_filters.iter().fold(0, |acc, &(filter, _)| {
81                        acc + 2 + filter.len() + 1
82                    })
83            }
84
85            Packet::SubscribeAck { ref status, .. } => 2 + status.len(),
86
87            Packet::Unsubscribe { ref topic_filters, .. } => {
88                2 +
89                    topic_filters.iter().fold(
90                        0,
91                        |acc, &filter| acc + 2 + filter.len(),
92                    )
93            }
94
95            Packet::PingRequest | Packet::PingResponse | Packet::Disconnect => 0,
96        }
97    }
98
99    fn write_content(&mut self, packet: &Packet) -> Result<usize> {
100        let mut n = 0;
101
102        match *packet {
103            Packet::Connect {
104                protocol,
105                clean_session,
106                keep_alive,
107                ref last_will,
108                client_id,
109                username,
110                password,
111            } => {
112                n += self.write_utf8_str(protocol.name())?;
113
114                let mut flags = ConnectFlags::empty();
115
116                if username.is_some() {
117                    flags |= ConnectFlags::USERNAME;
118                }
119                if password.is_some() {
120                    flags |= ConnectFlags::PASSWORD;
121                }
122
123                if let &Some(LastWill { qos, retain, .. }) = last_will {
124                    flags |= ConnectFlags::WILL;
125
126                    if retain {
127                        flags |= ConnectFlags::WILL_RETAIN;
128                    }
129
130                    let b: u8 = qos.into();
131
132                    flags |= ConnectFlags::from_bits_truncate(b << WILL_QOS_SHIFT);
133                }
134
135                if clean_session {
136                    flags |= ConnectFlags::CLEAN_SESSION;
137                }
138
139                n += self.write(&[protocol.level(), flags.bits()])?;
140
141                self.write_u16::<BigEndian>(keep_alive)?;
142                n += 2;
143
144                n += self.write_utf8_str(client_id)?;
145
146                if let &Some(LastWill { topic, message, .. }) = last_will {
147                    n += self.write_utf8_str(topic)?;
148                    n += self.write_fixed_length_bytes(message)?;
149                }
150
151                if let Some(s) = username {
152                    n += self.write_utf8_str(s)?;
153                }
154
155                if let Some(s) = password {
156                    n += self.write_fixed_length_bytes(s)?;
157                }
158            }
159
160            Packet::ConnectAck {
161                session_present,
162                return_code,
163            } => {
164                n += self.write(
165                    &[
166                        if session_present { 0x01 } else { 0x00 },
167                        return_code.into(),
168                    ],
169                )?;
170            }
171
172            Packet::Publish {
173                qos,
174                topic,
175                packet_id,
176                payload,
177                ..
178            } => {
179                n += self.write_utf8_str(topic)?;
180
181                if qos == QoS::AtLeastOnce || qos == QoS::ExactlyOnce {
182                    self.write_u16::<BigEndian>(packet_id.unwrap())?;
183
184                    n += 2;
185                }
186
187                n += self.write(payload)?;
188            }
189
190            Packet::PublishAck { packet_id } |
191            Packet::PublishReceived { packet_id } |
192            Packet::PublishRelease { packet_id } |
193            Packet::PublishComplete { packet_id } |
194            Packet::UnsubscribeAck { packet_id } => {
195                self.write_u16::<BigEndian>(packet_id)?;
196
197                n += 2;
198            }
199
200            Packet::Subscribe {
201                packet_id,
202                ref topic_filters,
203            } => {
204                self.write_u16::<BigEndian>(packet_id)?;
205
206                n += 2;
207
208                for &(filter, qos) in topic_filters {
209                    n += self.write_utf8_str(filter)? + self.write(&[qos.into()])?;
210                }
211            }
212
213            Packet::SubscribeAck {
214                packet_id,
215                ref status,
216            } => {
217                self.write_u16::<BigEndian>(packet_id)?;
218
219                n += 2;
220
221                let buf: Vec<u8> = status
222                    .iter()
223                    .map(|s| if let SubscribeReturnCode::Success(qos) = *s {
224                        qos.into()
225                    } else {
226                        0x80
227                    })
228                    .collect();
229
230                n += self.write(&buf)?;
231            }
232
233            Packet::Unsubscribe {
234                packet_id,
235                ref topic_filters,
236            } => {
237                self.write_u16::<BigEndian>(packet_id)?;
238
239                n += 2;
240
241                for filter in topic_filters {
242                    n += self.write_utf8_str(filter)?;
243                }
244            }
245
246            Packet::PingRequest | Packet::PingResponse | Packet::Disconnect => {}
247        }
248
249        Ok(n)
250    }
251
252    #[inline]
253    fn write_utf8_str(&mut self, s: &str) -> Result<usize> {
254        self.write_u16::<BigEndian>(s.len() as u16)?;
255
256        Ok(2 + self.write(s.as_bytes())?)
257    }
258
259    #[inline]
260    fn write_fixed_length_bytes(&mut self, s: &[u8]) -> Result<usize> {
261        self.write_u16::<BigEndian>(s.len() as u16)?;
262
263        Ok(2 + self.write(s)?)
264    }
265
266    #[inline]
267    fn write_variable_length(&mut self, size: usize) -> Result<usize> {
268        if size > MAX_VARIABLE_LENGTH {
269            Err(Error::new(ErrorKind::Other, "out of range"))
270        } else if size < 128 {
271            self.write(&[size as u8])
272        } else {
273            let mut v = Vec::new();
274            let mut s = size;
275
276            while s > 0 {
277                let mut b = (s % 128) as u8;
278
279                s >>= 7;
280
281                if s > 0 {
282                    b |= 0x80;
283                }
284
285                v.push(b);
286            }
287
288            debug!("write variable length {} in {} bytes", size, v.len());
289
290            self.write(&v)
291        }
292    }
293}
294
295/// Extends `Write` with methods for writing packet.
296///
297/// ```
298/// use mqtt::{WritePacketExt, Packet};
299///
300/// let mut v = Vec::new();
301/// let p = Packet::PingResponse;
302///
303/// assert_eq!(v.write_packet(&p).unwrap(), 2);
304/// assert_eq!(v, b"\xd0\x00");
305/// ```
306pub trait WritePacketExt: io::Write {
307    #[inline]
308    /// Writes packet to the underlying writer.
309    fn write_packet(&mut self, packet: &Packet) -> Result<usize> {
310        Ok(
311            self.write_fixed_header(packet)? + self.write_content(packet)?,
312        )
313    }
314}
315
316impl<W: io::Write + ?Sized> WritePacketHelper for W {}
317impl<W: io::Write + ?Sized> WritePacketExt for W {}
318
319#[cfg(test)]
320mod tests {
321    extern crate env_logger;
322
323    use decode::*;
324    use super::*;
325
326    #[test]
327    fn test_encode_variable_length() {
328        let mut v = Vec::new();
329
330        assert_eq!(v.write_variable_length(123).unwrap(), 1);
331        assert_eq!(v, &[123]);
332
333        v.clear();
334
335        assert_eq!(v.write_variable_length(129).unwrap(), 2);
336        assert_eq!(v, b"\x81\x01");
337
338        v.clear();
339
340        assert_eq!(v.write_variable_length(16383).unwrap(), 2);
341        assert_eq!(v, b"\xff\x7f");
342
343        v.clear();
344
345        assert_eq!(v.write_variable_length(2097151).unwrap(), 3);
346        assert_eq!(v, b"\xff\xff\x7f");
347
348        v.clear();
349
350        assert_eq!(v.write_variable_length(268435455).unwrap(), 4);
351        assert_eq!(v, b"\xff\xff\xff\x7f");
352
353        assert!(v.write_variable_length(MAX_VARIABLE_LENGTH + 1).is_err())
354    }
355
356    #[test]
357    fn test_encode_fixed_header() {
358        let mut v = Vec::new();
359        let p = Packet::PingRequest;
360
361        assert_eq!(v.calc_content_size(&p), 0);
362        assert_eq!(v.write_fixed_header(&p).unwrap(), 2);
363        assert_eq!(v, b"\xc0\x00");
364
365        v.clear();
366
367        let p = Packet::Publish {
368            dup: true,
369            retain: true,
370            qos: QoS::ExactlyOnce,
371            topic: "topic",
372            packet_id: Some(0x4321),
373            payload: &(0..255).map(|b| b).collect::<Vec<u8>>(),
374        };
375
376        assert_eq!(v.calc_content_size(&p), 264);
377        assert_eq!(v.write_fixed_header(&p).unwrap(), 3);
378        assert_eq!(v, b"\x3d\x88\x02");
379    }
380
    // Round-trip check for a single packet. `$p` is the packet expression and
    // `$data` the expected encoded bytes. Note that `$p` is expanded twice,
    // so the packet expression is evaluated once per use.
    macro_rules! assert_packet {
        ($p:expr, $data:expr) => {
            let mut v = Vec::new();
            // The reported byte count must match the expected image length ...
            assert_eq!(v.write_packet(&$p).unwrap(), $data.len());
            // ... and the bytes written must match the image itself.
            assert_eq!(v, $data);
            // Reading the image back must give the same packet with an empty
            // remainder.
            assert_eq!(read_packet($data).unwrap(), (&b""[..], $p));
        }
    }
389
    /// Round-trips CONNECT packets (with credentials, with a last will) and a
    /// DISCONNECT packet through `assert_packet!`.
    #[test]
    fn test_encode_connect_packets() {
        // CONNECT with user name and password; expected flags byte is 0xC0.
        assert_packet!(
            Packet::Connect {
                protocol: Protocol::MQTT(4),
                clean_session: false,
                keep_alive: 60,
                client_id: "12345",
                last_will: None,
                username: Some("user"),
                password: Some(b"pass"),
            },
            &b"\x10\x1D\x00\x04MQTT\x04\xC0\x00\x3C\x00\
\x0512345\x00\x04user\x00\x04pass"[..]
        );

        // CONNECT with a QoS 2, non-retained last will; expected flags byte is 0x14.
        assert_packet!(
            Packet::Connect {
                protocol: Protocol::MQTT(4),
                clean_session: false,
                keep_alive: 60,
                client_id: "12345",
                last_will: Some(LastWill {
                    qos: QoS::ExactlyOnce,
                    retain: false,
                    topic: "topic",
                    message: b"message",
                }),
                username: None,
                password: None,
            },
            &b"\x10\x21\x00\x04MQTT\x04\x14\x00\x3C\x00\
\x0512345\x00\x05topic\x00\x07message"[..]
        );

        // DISCONNECT has no content: type byte plus a zero remaining length.
        assert_packet!(Packet::Disconnect, b"\xe0\x00");
    }
427
428    #[test]
429    fn test_encode_publish_packets() {
430        assert_packet!(
431            Packet::Publish {
432                dup: true,
433                retain: true,
434                qos: QoS::ExactlyOnce,
435                topic: "topic",
436                packet_id: Some(0x4321),
437                payload: b"data",
438            },
439            b"\x3d\x0D\x00\x05topic\x43\x21data"
440        );
441
442        assert_packet!(
443            Packet::Publish {
444                dup: false,
445                retain: false,
446                qos: QoS::AtMostOnce,
447                topic: "topic",
448                packet_id: None,
449                payload: b"data",
450            },
451            b"\x30\x0b\x00\x05topicdata"
452        );
453    }
454
455    #[test]
456    fn test_encode_subscribe_packets() {
457        assert_packet!(
458            Packet::Subscribe {
459                packet_id: 0x1234,
460                topic_filters: vec![("test", QoS::AtLeastOnce), ("filter", QoS::ExactlyOnce)],
461            },
462            b"\x82\x12\x12\x34\x00\x04test\x01\x00\x06filter\x02"
463        );
464
465        assert_packet!(
466            Packet::SubscribeAck {
467                packet_id: 0x1234,
468                status: vec![
469                    SubscribeReturnCode::Success(QoS::AtLeastOnce),
470                    SubscribeReturnCode::Failure,
471                    SubscribeReturnCode::Success(QoS::ExactlyOnce),
472                ],
473            },
474            b"\x90\x05\x12\x34\x01\x80\x02"
475        );
476
477        assert_packet!(
478            Packet::Unsubscribe {
479                packet_id: 0x1234,
480                topic_filters: vec!["test", "filter"],
481            },
482            b"\xa2\x10\x12\x34\x00\x04test\x00\x06filter"
483        );
484
485        assert_packet!(
486            Packet::UnsubscribeAck { packet_id: 0x4321 },
487            b"\xb0\x02\x43\x21"
488        );
489    }
490
491    #[test]
492    fn test_encode_ping_packets() {
493        assert_packet!(Packet::PingRequest, b"\xc0\x00");
494        assert_packet!(Packet::PingResponse, b"\xd0\x00");
495    }
496}