rumqttc/mqttbytes/v5/
codec.rs1use bytes::BytesMut;
2use tokio_util::codec::{Decoder, Encoder};
3
4use super::{Error, Packet};
5
6#[derive(Debug, Clone)]
8pub struct Codec {
9 pub max_incoming_size: Option<u32>,
11 pub max_outgoing_size: Option<u32>,
13}
14
15impl Decoder for Codec {
16 type Item = Packet;
17 type Error = Error;
18
19 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
20 match Packet::read(src, self.max_incoming_size) {
21 Ok(packet) => Ok(Some(packet)),
22 Err(Error::InsufficientBytes(b)) => {
23 src.reserve(b);
25 Ok(None)
26 }
27 Err(e) => Err(e),
28 }
29 }
30}
31
32impl Encoder<Packet> for Codec {
33 type Error = Error;
34
35 fn encode(&mut self, item: Packet, dst: &mut BytesMut) -> Result<(), Self::Error> {
36 item.write(dst, self.max_outgoing_size)?;
37
38 Ok(())
39 }
40}
41
42#[cfg(test)]
43mod tests {
44 use bytes::BytesMut;
45 use tokio_util::codec::{Decoder, Encoder};
46
47 use super::Codec;
48 use crate::{
49 Packet, Publish,
50 mqttbytes::{Error, QoS},
51 };
52
53 #[test]
54 fn outgoing_max_packet_size_check() {
55 let mut buf = BytesMut::new();
56 let mut codec = Codec {
57 max_incoming_size: Some(100),
58 max_outgoing_size: Some(200),
59 };
60
61 let mut small_publish = Publish::new("hello/world", QoS::AtLeastOnce, vec![1; 100], None);
62 small_publish.pkid = 1;
63 codec
64 .encode(Packet::Publish(small_publish), &mut buf)
65 .unwrap();
66
67 let large_publish = Publish::new("hello/world", QoS::AtLeastOnce, vec![1; 265], None);
68 match codec.encode(Packet::Publish(large_publish), &mut buf) {
69 Err(Error::OutgoingPacketTooLarge {
70 pkt_size: 282,
71 max: 200,
72 }) => {}
73 _ => unreachable!(),
74 }
75 }
76
77 #[test]
78 fn incoming_max_packet_size_check_happens_on_partial_frame() {
79 let mut buf = BytesMut::from(&[0x30, 0x14][..]);
80 let mut codec = Codec {
81 max_incoming_size: Some(10),
82 max_outgoing_size: Some(200),
83 };
84
85 match codec.decode(&mut buf) {
86 Err(Error::PayloadSizeLimitExceeded {
87 pkt_size: 20,
88 max: 10,
89 }) => {}
90 _ => unreachable!(),
91 }
92 }
93}