Skip to main content

rumqttc/mqttbytes/v5/
codec.rs

1use bytes::BytesMut;
2use tokio_util::codec::{Decoder, Encoder};
3
4use super::{Error, Packet};
5
/// MQTT v5 codec.
///
/// Carries the two packet-size limits that the `Decoder` and `Encoder`
/// implementations in this file forward to `Packet::read` and `Packet::write`.
#[derive(Debug, Clone)]
pub struct Codec {
    /// Maximum packet size allowed by client.
    ///
    /// Forwarded to `Packet::read` on every `decode` call.
    /// NOTE(review): `None` presumably means "no limit" — confirm against `Packet::read`.
    pub max_incoming_size: Option<u32>,
    /// Maximum packet size allowed by broker.
    ///
    /// Forwarded to `Packet::write` on every `encode` call.
    /// NOTE(review): `None` presumably means "no limit" — confirm against `Packet::write`.
    pub max_outgoing_size: Option<u32>,
}
14
15impl Decoder for Codec {
16    type Item = Packet;
17    type Error = Error;
18
19    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
20        match Packet::read(src, self.max_incoming_size) {
21            Ok(packet) => Ok(Some(packet)),
22            Err(Error::InsufficientBytes(b)) => {
23                // Get more packets to construct the incomplete packet
24                src.reserve(b);
25                Ok(None)
26            }
27            Err(e) => Err(e),
28        }
29    }
30}
31
32impl Encoder<Packet> for Codec {
33    type Error = Error;
34
35    fn encode(&mut self, item: Packet, dst: &mut BytesMut) -> Result<(), Self::Error> {
36        item.write(dst, self.max_outgoing_size)?;
37
38        Ok(())
39    }
40}
41
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use tokio_util::codec::{Decoder, Encoder};

    use super::Codec;
    use crate::{
        Packet, Publish,
        mqttbytes::{Error, QoS},
    };

    /// A publish under `max_outgoing_size` encodes; one above it is rejected
    /// with `Error::OutgoingPacketTooLarge` carrying both sizes.
    #[test]
    fn outgoing_max_packet_size_check() {
        let mut buf = BytesMut::new();
        let mut codec = Codec {
            max_incoming_size: Some(100),
            max_outgoing_size: Some(200),
        };

        let mut small_publish = Publish::new("hello/world", QoS::AtLeastOnce, vec![1; 100], None);
        small_publish.pkid = 1;
        codec
            .encode(Packet::Publish(small_publish), &mut buf)
            .expect("a publish within the outgoing limit should encode");

        let large_publish = Publish::new("hello/world", QoS::AtLeastOnce, vec![1; 265], None);
        let result = codec.encode(Packet::Publish(large_publish), &mut buf);
        // Report the actual result on failure instead of a bare `unreachable!()`.
        assert!(
            matches!(
                result,
                Err(Error::OutgoingPacketTooLarge {
                    pkt_size: 282,
                    max: 200,
                })
            ),
            "expected OutgoingPacketTooLarge {{ pkt_size: 282, max: 200 }}, got {:?}",
            result
        );
    }

    /// The incoming size limit is enforced from the header alone: the buffer
    /// holds only two bytes (the second, 0x14, is the 20 reported as
    /// `pkt_size`), yet decoding must fail rather than wait for more data.
    #[test]
    fn incoming_max_packet_size_check_happens_on_partial_frame() {
        let mut buf = BytesMut::from(&[0x30, 0x14][..]);
        let mut codec = Codec {
            max_incoming_size: Some(10),
            max_outgoing_size: Some(200),
        };

        match codec.decode(&mut buf) {
            Err(Error::PayloadSizeLimitExceeded {
                pkt_size: 20,
                max: 10,
            }) => {}
            Err(other) => panic!(
                "expected PayloadSizeLimitExceeded {{ pkt_size: 20, max: 10 }}, got error {:?}",
                other
            ),
            // `Ok(None)` would mean the size check was deferred until the
            // whole frame arrived, which is exactly what this test guards.
            Ok(None) => panic!("expected PayloadSizeLimitExceeded, but the codec asked for more bytes"),
            Ok(Some(_)) => panic!("expected PayloadSizeLimitExceeded, but a packet was decoded"),
        }
    }
}