mqttrs/
decoder.rs

1use crate::*;
2
3pub fn clone_packet(input: &[u8], output: &mut [u8]) -> Result<usize, Error> {
4    if input.is_empty() {
5        return Ok(0);
6    }
7
8    let mut offset = 0;
9    // while Header::new(input[offset]).is_err() {
10    //     offset += 1;
11    //     if input[offset..].is_empty() {
12    //         return Ok(0);
13    //     }
14    // }
15
16    let start = offset;
17    if let Some((_, remaining_len)) = read_header(input, &mut offset)? {
18        let end = offset + remaining_len;
19        let len = end - start;
20        output[..len].copy_from_slice(&input[start..end]);
21        Ok(len)
22    } else {
23        // Don't have a full packet
24        Ok(0)
25    }
26}
27
28/// Decode bytes from a [BytesMut] buffer as a [Packet] enum.
29///
30/// The buf is never actually written to, it only takes a `BytesMut` instead of a `Bytes` to
31/// allow using the same buffer to read bytes from network.
32///
33/// ```
34/// # use mqttrs::*;
35/// # use bytes::*;
36/// // Fill a buffer with encoded data (probably from a `TcpStream`).
37/// let mut buf = BytesMut::from(&[0b00110000, 11,
38///                                0, 4, 't' as u8, 'e' as u8, 's' as u8, 't' as u8,
39///                                'h' as u8, 'e' as u8, 'l' as u8, 'l' as u8, 'o' as u8] as &[u8]);
40///
41/// // Parse the bytes and check the result.
42/// match decode_slice(&mut buf) {
43///     Ok(Some(Packet::Publish(p))) => {
44///         assert_eq!(p.payload, b"hello");
45///     },
46///     // In real code you probably don't want to panic like that ;)
47///     Ok(None) => panic!("not enough data"),
48///     other => panic!("unexpected {:?}", other),
49/// }
50/// ```
51///
52/// [Packet]: ../enum.Packet.html
53/// [BytesMut]: https://docs.rs/bytes/1.0.0/bytes/struct.BytesMut.html
54pub fn decode_slice<'a>(buf: &'a [u8]) -> Result<Option<Packet<'a>>, Error> {
55    let mut offset = 0;
56    if let Some((header, remaining_len)) = read_header(buf, &mut offset)? {
57        let r = read_packet(header, remaining_len, buf, &mut offset)?;
58        Ok(Some(r))
59    } else {
60        // Don't have a full packet
61        Ok(None)
62    }
63}
64
65fn read_packet<'a>(
66    header: Header,
67    remaining_len: usize,
68    buf: &'a [u8],
69    offset: &mut usize,
70) -> Result<Packet<'a>, Error> {
71    Ok(match header.typ {
72        PacketType::Pingreq => Packet::Pingreq,
73        PacketType::Pingresp => Packet::Pingresp,
74        PacketType::Disconnect => Packet::Disconnect,
75        PacketType::Connect => Connect::from_buffer(buf, offset)?.into(),
76        PacketType::Connack => Connack::from_buffer(buf, offset)?.into(),
77        PacketType::Publish => Publish::from_buffer(&header, remaining_len, buf, offset)?.into(),
78        PacketType::Puback => Packet::Puback(Pid::from_buffer(buf, offset)?),
79        PacketType::Pubrec => Packet::Pubrec(Pid::from_buffer(buf, offset)?),
80        PacketType::Pubrel => Packet::Pubrel(Pid::from_buffer(buf, offset)?),
81        PacketType::Pubcomp => Packet::Pubcomp(Pid::from_buffer(buf, offset)?),
82        PacketType::Subscribe => Subscribe::from_buffer(remaining_len, buf, offset)?.into(),
83        PacketType::Suback => Suback::from_buffer(remaining_len, buf, offset)?.into(),
84        PacketType::Unsubscribe => Unsubscribe::from_buffer(remaining_len, buf, offset)?.into(),
85        PacketType::Unsuback => Packet::Unsuback(Pid::from_buffer(buf, offset)?),
86    })
87}
88
89/// Read the parsed header and remaining_len from the buffer. Only return Some() and advance the
90/// buffer position if there is enough data in the buffer to read the full packet.
91pub(crate) fn read_header<'a>(
92    buf: &'a [u8],
93    offset: &mut usize,
94) -> Result<Option<(Header, usize)>, Error> {
95    let mut len: usize = 0;
96    for pos in 0..=3 {
97        if buf.len() > *offset + pos + 1 {
98            let byte = buf[*offset + pos + 1];
99            len += (byte as usize & 0x7F) << (pos * 7);
100            if (byte & 0x80) == 0 {
101                // Continuation bit == 0, length is parsed
102                if buf.len() < *offset + 2 + pos + len {
103                    // Won't be able to read full packet
104                    return Ok(None);
105                }
106                // Parse header byte, skip past the header, and return
107                let header = Header::new(buf[*offset])?;
108                *offset += pos + 2;
109                return Ok(Some((header, len)));
110            }
111        } else {
112            // Couldn't read full length
113            return Ok(None);
114        }
115    }
116    // Continuation byte == 1 four times, that's illegal.
117    Err(Error::InvalidHeader)
118}
119
120#[derive(Debug, Clone, Copy, PartialEq, Eq)]
121pub(crate) struct Header {
122    pub typ: PacketType,
123    pub dup: bool,
124    pub qos: QoS,
125    pub retain: bool,
126}
127impl Header {
128    pub fn new(hd: u8) -> Result<Header, Error> {
129        let (typ, flags_ok) = match hd >> 4 {
130            1 => (PacketType::Connect, hd & 0b1111 == 0),
131            2 => (PacketType::Connack, hd & 0b1111 == 0),
132            3 => (PacketType::Publish, true),
133            4 => (PacketType::Puback, hd & 0b1111 == 0),
134            5 => (PacketType::Pubrec, hd & 0b1111 == 0),
135            6 => (PacketType::Pubrel, hd & 0b1111 == 0b0010),
136            7 => (PacketType::Pubcomp, hd & 0b1111 == 0),
137            8 => (PacketType::Subscribe, hd & 0b1111 == 0b0010),
138            9 => (PacketType::Suback, hd & 0b1111 == 0),
139            10 => (PacketType::Unsubscribe, hd & 0b1111 == 0b0010),
140            11 => (PacketType::Unsuback, hd & 0b1111 == 0),
141            12 => (PacketType::Pingreq, hd & 0b1111 == 0),
142            13 => (PacketType::Pingresp, hd & 0b1111 == 0),
143            14 => (PacketType::Disconnect, hd & 0b1111 == 0),
144            _ => (PacketType::Connect, false),
145        };
146        if !flags_ok {
147            return Err(Error::InvalidHeader);
148        }
149        Ok(Header {
150            typ,
151            dup: hd & 0b1000 != 0,
152            qos: QoS::from_u8((hd & 0b110) >> 1)?,
153            retain: hd & 1 == 1,
154        })
155    }
156}
157
158pub(crate) fn read_str<'a>(buf: &'a [u8], offset: &mut usize) -> Result<&'a str, Error> {
159    core::str::from_utf8(read_bytes(buf, offset)?).map_err(|e| Error::InvalidString(e))
160}
161
162pub(crate) fn read_bytes<'a>(buf: &'a [u8], offset: &mut usize) -> Result<&'a [u8], Error> {
163    if buf[*offset..].len() < 2 {
164        return Err(Error::InvalidLength);
165    }
166    let len = ((buf[*offset] as usize) << 8) | buf[*offset + 1] as usize;
167    *offset += 2;
168    if len > buf[*offset..].len() {
169        Err(Error::InvalidLength)
170    } else {
171        let bytes = &buf[*offset..*offset + len];
172        *offset += len;
173        Ok(bytes)
174    }
175}