mqttrust/encoding/v4/
decoder.rs

1use super::*;
2
3pub fn decode_slice(buf: &[u8]) -> Result<Option<Packet<'_>>, Error> {
4    let mut offset = 0;
5    if let Some((header, remaining_len)) = read_header(buf, &mut offset)? {
6        let r = read_packet(header, remaining_len, buf, &mut offset)?;
7        Ok(Some(r))
8    } else {
9        // Don't have a full packet
10        Ok(None)
11    }
12}
13
14fn read_packet<'a>(
15    header: Header,
16    remaining_len: usize,
17    buf: &'a [u8],
18    offset: &mut usize,
19) -> Result<Packet<'a>, Error> {
20    Ok(match header.typ {
21        PacketType::Pingreq => Packet::Pingreq,
22        PacketType::Pingresp => Packet::Pingresp,
23        PacketType::Disconnect => Packet::Disconnect,
24        PacketType::Connect => Connect::from_buffer(buf, offset)?.into(),
25        PacketType::Connack => Connack::from_buffer(buf, offset)?.into(),
26        PacketType::Publish => Publish::from_buffer(&header, remaining_len, buf, offset)?.into(),
27        PacketType::Puback => Packet::Puback(Pid::from_buffer(buf, offset)?),
28        PacketType::Pubrec => Packet::Pubrec(Pid::from_buffer(buf, offset)?),
29        PacketType::Pubrel => Packet::Pubrel(Pid::from_buffer(buf, offset)?),
30        PacketType::Pubcomp => Packet::Pubcomp(Pid::from_buffer(buf, offset)?),
31        PacketType::Subscribe => Subscribe::from_buffer(remaining_len, buf, offset)?.into(),
32        PacketType::Suback => Suback::from_buffer(remaining_len, buf, offset)?.into(),
33        PacketType::Unsubscribe => Unsubscribe::from_buffer(remaining_len, buf, offset)?.into(),
34        PacketType::Unsuback => Packet::Unsuback(Pid::from_buffer(buf, offset)?),
35    })
36}
37
38/// Read the parsed header and remaining_len from the buffer. Only return Some() and advance the
39/// buffer position if there is enough data in the buffer to read the full packet.
40pub fn read_header(buf: &[u8], offset: &mut usize) -> Result<Option<(Header, usize)>, Error> {
41    let mut len: usize = 0;
42    for pos in 0..=3 {
43        if buf.len() > *offset + pos + 1 {
44            let byte = buf[*offset + pos + 1];
45            len += (byte as usize & 0x7F) << (pos * 7);
46            if (byte & 0x80) == 0 {
47                // Continuation bit == 0, length is parsed
48                if buf.len() < *offset + 2 + pos + len {
49                    // Won't be able to read full packet
50                    return Ok(None);
51                }
52                // Parse header byte, skip past the header, and return
53                let header = Header::new(buf[*offset])?;
54                *offset += pos + 2;
55                return Ok(Some((header, len)));
56            }
57        } else {
58            // Couldn't read full length
59            return Ok(None);
60        }
61    }
62    // Continuation byte == 1 four times, that's illegal.
63    Err(Error::InvalidHeader)
64}
65
66#[derive(Debug, Clone, Copy, PartialEq, Eq)]
67pub struct Header {
68    pub typ: PacketType,
69    pub dup: bool,
70    pub qos: QoS,
71    pub retain: bool,
72}
73impl Header {
74    pub fn new(hd: u8) -> Result<Header, Error> {
75        let (typ, flags_ok) = match hd >> 4 {
76            1 => (PacketType::Connect, hd & 0b1111 == 0),
77            2 => (PacketType::Connack, hd & 0b1111 == 0),
78            3 => (PacketType::Publish, true),
79            4 => (PacketType::Puback, hd & 0b1111 == 0),
80            5 => (PacketType::Pubrec, hd & 0b1111 == 0),
81            6 => (PacketType::Pubrel, hd & 0b1111 == 0b0010),
82            7 => (PacketType::Pubcomp, hd & 0b1111 == 0),
83            8 => (PacketType::Subscribe, hd & 0b1111 == 0b0010),
84            9 => (PacketType::Suback, hd & 0b1111 == 0),
85            10 => (PacketType::Unsubscribe, hd & 0b1111 == 0b0010),
86            11 => (PacketType::Unsuback, hd & 0b1111 == 0),
87            12 => (PacketType::Pingreq, hd & 0b1111 == 0),
88            13 => (PacketType::Pingresp, hd & 0b1111 == 0),
89            14 => (PacketType::Disconnect, hd & 0b1111 == 0),
90            _ => (PacketType::Connect, false),
91        };
92        if !flags_ok {
93            return Err(Error::InvalidHeader);
94        }
95        Ok(Header {
96            typ,
97            dup: hd & 0b1000 != 0,
98            qos: QoS::from_u8((hd & 0b110) >> 1)?,
99            retain: hd & 1 == 1,
100        })
101    }
102}
103
104pub(crate) fn read_str<'a>(buf: &'a [u8], offset: &mut usize) -> Result<&'a str, Error> {
105    core::str::from_utf8(read_bytes(buf, offset)?).map_err(|_| Error::InvalidString)
106}
107
108pub(crate) fn read_bytes<'a>(buf: &'a [u8], offset: &mut usize) -> Result<&'a [u8], Error> {
109    if buf[*offset..].len() < 2 {
110        return Err(Error::InvalidLength);
111    }
112    let len = ((buf[*offset] as usize) << 8) | buf[*offset + 1] as usize;
113
114    *offset += 2;
115    if len > buf[*offset..].len() {
116        Err(Error::InvalidLength)
117    } else {
118        let bytes = &buf[*offset..*offset + len];
119        *offset += len;
120        Ok(bytes)
121    }
122}