mqtt_v5/
encoder.rs

1use crate::types::{
2    properties::*, AuthenticatePacket, ConnectAckPacket, ConnectPacket, DisconnectPacket, Encode,
3    Packet, PropertySize, ProtocolVersion, PublishAckPacket, PublishCompletePacket, PublishPacket,
4    PublishReceivedPacket, PublishReleasePacket, SubscribeAckPacket, SubscribePacket,
5    UnsubscribeAckPacket, UnsubscribePacket, VariableByteInt,
6};
7use bytes::{BufMut, BytesMut};
8
9fn encode_variable_int(value: u32, bytes: &mut BytesMut) -> usize {
10    let mut x = value;
11    let mut byte_counter = 0;
12
13    loop {
14        let mut encoded_byte: u8 = (x % 128) as u8;
15        x /= 128;
16
17        if x > 0 {
18            encoded_byte |= 128;
19        }
20
21        bytes.put_u8(encoded_byte);
22
23        byte_counter += 1;
24
25        if x == 0 {
26            break;
27        }
28    }
29
30    byte_counter
31}
32
33fn encode_string(value: &str, bytes: &mut BytesMut) {
34    bytes.put_u16(value.len() as u16);
35    bytes.put_slice(value.as_bytes());
36}
37
38fn encode_binary_data(value: &[u8], bytes: &mut BytesMut) {
39    bytes.put_u16(value.len() as u16);
40    bytes.put_slice(value);
41}
42
43impl Encode for PayloadFormatIndicator {
44    fn encode(&self, bytes: &mut BytesMut) {
45        bytes.put_u8(PropertyType::PayloadFormatIndicator as u8);
46        bytes.put_u8(self.0);
47    }
48}
49impl Encode for MessageExpiryInterval {
50    fn encode(&self, bytes: &mut BytesMut) {
51        bytes.put_u8(PropertyType::MessageExpiryInterval as u8);
52        bytes.put_u32(self.0);
53    }
54}
55impl Encode for ContentType {
56    fn encode(&self, bytes: &mut BytesMut) {
57        bytes.put_u8(PropertyType::ContentType as u8);
58        encode_string(&self.0, bytes);
59    }
60}
61impl Encode for ResponseTopic {
62    fn encode(&self, bytes: &mut BytesMut) {
63        bytes.put_u8(PropertyType::ResponseTopic as u8);
64        encode_string(&self.0, bytes);
65    }
66}
67impl Encode for CorrelationData {
68    fn encode(&self, bytes: &mut BytesMut) {
69        bytes.put_u8(PropertyType::CorrelationData as u8);
70        encode_binary_data(&self.0, bytes);
71    }
72}
73impl Encode for SubscriptionIdentifier {
74    fn encode(&self, bytes: &mut BytesMut) {
75        bytes.put_u8(PropertyType::SubscriptionIdentifier as u8);
76        encode_variable_int((self.0).0, bytes);
77    }
78}
79impl Encode for SessionExpiryInterval {
80    fn encode(&self, bytes: &mut BytesMut) {
81        bytes.put_u8(PropertyType::SessionExpiryInterval as u8);
82        bytes.put_u32(self.0);
83    }
84}
85impl Encode for AssignedClientIdentifier {
86    fn encode(&self, bytes: &mut BytesMut) {
87        bytes.put_u8(PropertyType::AssignedClientIdentifier as u8);
88        encode_string(&self.0, bytes);
89    }
90}
91impl Encode for ServerKeepAlive {
92    fn encode(&self, bytes: &mut BytesMut) {
93        bytes.put_u8(PropertyType::ServerKeepAlive as u8);
94        bytes.put_u16(self.0)
95    }
96}
97impl Encode for AuthenticationMethod {
98    fn encode(&self, bytes: &mut BytesMut) {
99        bytes.put_u8(PropertyType::AuthenticationMethod as u8);
100        encode_string(&self.0, bytes);
101    }
102}
103impl Encode for AuthenticationData {
104    fn encode(&self, bytes: &mut BytesMut) {
105        bytes.put_u8(PropertyType::AuthenticationData as u8);
106        encode_binary_data(&self.0, bytes);
107    }
108}
109impl Encode for RequestProblemInformation {
110    fn encode(&self, bytes: &mut BytesMut) {
111        bytes.put_u8(PropertyType::RequestProblemInformation as u8);
112        bytes.put_u8(self.0);
113    }
114}
115impl Encode for WillDelayInterval {
116    fn encode(&self, bytes: &mut BytesMut) {
117        bytes.put_u8(PropertyType::WillDelayInterval as u8);
118        bytes.put_u32(self.0);
119    }
120}
121impl Encode for RequestResponseInformation {
122    fn encode(&self, bytes: &mut BytesMut) {
123        bytes.put_u8(PropertyType::RequestResponseInformation as u8);
124        bytes.put_u8(self.0);
125    }
126}
127impl Encode for ResponseInformation {
128    fn encode(&self, bytes: &mut BytesMut) {
129        bytes.put_u8(PropertyType::ResponseInformation as u8);
130        encode_string(&self.0, bytes);
131    }
132}
133impl Encode for ServerReference {
134    fn encode(&self, bytes: &mut BytesMut) {
135        bytes.put_u8(PropertyType::ServerReference as u8);
136        encode_string(&self.0, bytes);
137    }
138}
139impl Encode for ReasonString {
140    fn encode(&self, bytes: &mut BytesMut) {
141        bytes.put_u8(PropertyType::ReasonString as u8);
142        encode_string(&self.0, bytes);
143    }
144}
145impl Encode for ReceiveMaximum {
146    fn encode(&self, bytes: &mut BytesMut) {
147        bytes.put_u8(PropertyType::ReceiveMaximum as u8);
148        bytes.put_u16(self.0);
149    }
150}
151impl Encode for TopicAliasMaximum {
152    fn encode(&self, bytes: &mut BytesMut) {
153        bytes.put_u8(PropertyType::TopicAliasMaximum as u8);
154        bytes.put_u16(self.0);
155    }
156}
157impl Encode for TopicAlias {
158    fn encode(&self, bytes: &mut BytesMut) {
159        bytes.put_u8(PropertyType::TopicAlias as u8);
160        bytes.put_u16(self.0);
161    }
162}
163impl Encode for MaximumQos {
164    fn encode(&self, bytes: &mut BytesMut) {
165        bytes.put_u8(PropertyType::MaximumQos as u8);
166        bytes.put_u8(self.0 as u8);
167    }
168}
169impl Encode for RetainAvailable {
170    fn encode(&self, bytes: &mut BytesMut) {
171        bytes.put_u8(PropertyType::RetainAvailable as u8);
172        bytes.put_u8(self.0);
173    }
174}
175impl Encode for UserProperty {
176    fn encode(&self, bytes: &mut BytesMut) {
177        bytes.put_u8(PropertyType::UserProperty as u8);
178        encode_string(&self.0, bytes);
179        encode_string(&self.1, bytes);
180    }
181}
182impl Encode for MaximumPacketSize {
183    fn encode(&self, bytes: &mut BytesMut) {
184        bytes.put_u8(PropertyType::MaximumPacketSize as u8);
185        bytes.put_u32(self.0);
186    }
187}
188impl Encode for WildcardSubscriptionAvailable {
189    fn encode(&self, bytes: &mut BytesMut) {
190        bytes.put_u8(PropertyType::WildcardSubscriptionAvailable as u8);
191        bytes.put_u8(self.0);
192    }
193}
194impl Encode for SubscriptionIdentifierAvailable {
195    fn encode(&self, bytes: &mut BytesMut) {
196        bytes.put_u8(PropertyType::SubscriptionIdentifierAvailable as u8);
197        bytes.put_u8(self.0);
198    }
199}
200impl Encode for SharedSubscriptionAvailable {
201    fn encode(&self, bytes: &mut BytesMut) {
202        bytes.put_u8(PropertyType::SharedSubscriptionAvailable as u8);
203        bytes.put_u8(self.0);
204    }
205}
206
207fn encode_connect(packet: &ConnectPacket, bytes: &mut BytesMut, protocol_version: ProtocolVersion) {
208    encode_string(&packet.protocol_name, bytes);
209    bytes.put_u8(packet.protocol_version as u8);
210
211    let mut connect_flags: u8 = 0b0000_0000;
212
213    if packet.user_name.is_some() {
214        connect_flags |= 0b1000_0000;
215    }
216
217    if packet.password.is_some() {
218        connect_flags |= 0b0100_0000;
219    }
220
221    if let Some(will) = &packet.will {
222        if will.should_retain {
223            connect_flags |= 0b0100_0000;
224        }
225
226        let qos_byte: u8 = will.qos as u8;
227        connect_flags |= (qos_byte & 0b0000_0011) << 3;
228        connect_flags |= 0b0000_0100;
229    }
230
231    if packet.clean_start {
232        connect_flags |= 0b0000_0010;
233    }
234
235    bytes.put_u8(connect_flags);
236    bytes.put_u16(packet.keep_alive);
237
238    if protocol_version == ProtocolVersion::V500 {
239        let property_length = packet.property_size(protocol_version);
240        encode_variable_int(property_length, bytes);
241
242        packet.session_expiry_interval.encode(bytes);
243        packet.receive_maximum.encode(bytes);
244        packet.maximum_packet_size.encode(bytes);
245        packet.topic_alias_maximum.encode(bytes);
246        packet.request_response_information.encode(bytes);
247        packet.request_problem_information.encode(bytes);
248        packet.user_properties.encode(bytes);
249        packet.authentication_method.encode(bytes);
250        packet.authentication_data.encode(bytes);
251    }
252
253    encode_string(&packet.client_id, bytes);
254
255    if let Some(will) = &packet.will {
256        if protocol_version == ProtocolVersion::V500 {
257            let property_length = will.property_size(protocol_version);
258            encode_variable_int(property_length, bytes);
259
260            will.will_delay_interval.encode(bytes);
261            will.payload_format_indicator.encode(bytes);
262            will.message_expiry_interval.encode(bytes);
263            will.content_type.encode(bytes);
264            will.response_topic.encode(bytes);
265            will.correlation_data.encode(bytes);
266            will.user_properties.encode(bytes);
267        }
268
269        encode_string(&will.topic, bytes);
270        encode_binary_data(&will.payload, bytes);
271    }
272
273    if let Some(user_name) = &packet.user_name {
274        encode_string(&user_name, bytes);
275    }
276
277    if let Some(password) = &packet.password {
278        encode_string(&password, bytes);
279    }
280}
281
282fn encode_connect_ack(
283    packet: &ConnectAckPacket,
284    bytes: &mut BytesMut,
285    protocol_version: ProtocolVersion,
286) {
287    let mut connect_ack_flags: u8 = 0b0000_0000;
288    if packet.session_present {
289        connect_ack_flags |= 0b0000_0001;
290    }
291
292    bytes.put_u8(connect_ack_flags);
293    bytes.put_u8(packet.reason_code as u8);
294
295    if protocol_version == ProtocolVersion::V500 {
296        let property_length = packet.property_size(protocol_version);
297        encode_variable_int(property_length, bytes);
298
299        packet.session_expiry_interval.encode(bytes);
300        packet.receive_maximum.encode(bytes);
301        packet.maximum_qos.encode(bytes);
302        packet.retain_available.encode(bytes);
303        packet.maximum_packet_size.encode(bytes);
304        packet.assigned_client_identifier.encode(bytes);
305        packet.topic_alias_maximum.encode(bytes);
306        packet.reason_string.encode(bytes);
307        packet.user_properties.encode(bytes);
308        packet.wildcard_subscription_available.encode(bytes);
309        packet.subscription_identifiers_available.encode(bytes);
310        packet.shared_subscription_available.encode(bytes);
311        packet.server_keep_alive.encode(bytes);
312        packet.response_information.encode(bytes);
313        packet.server_reference.encode(bytes);
314        packet.authentication_method.encode(bytes);
315        packet.authentication_data.encode(bytes);
316    }
317}
318
319fn encode_publish(packet: &PublishPacket, bytes: &mut BytesMut, protocol_version: ProtocolVersion) {
320    encode_string(&packet.topic.to_string(), bytes);
321
322    if let Some(packet_id) = packet.packet_id {
323        bytes.put_u16(packet_id);
324    }
325
326    if protocol_version == ProtocolVersion::V500 {
327        let property_length = packet.property_size(protocol_version);
328        encode_variable_int(property_length, bytes);
329
330        packet.payload_format_indicator.encode(bytes);
331        packet.message_expiry_interval.encode(bytes);
332        packet.topic_alias.encode(bytes);
333        packet.response_topic.encode(bytes);
334        packet.correlation_data.encode(bytes);
335        packet.user_properties.encode(bytes);
336        packet.subscription_identifier.encode(bytes);
337        packet.content_type.encode(bytes);
338    }
339
340    bytes.put_slice(&packet.payload);
341}
342
343fn encode_publish_ack(
344    packet: &PublishAckPacket,
345    bytes: &mut BytesMut,
346    protocol_version: ProtocolVersion,
347) {
348    bytes.put_u16(packet.packet_id);
349
350    if protocol_version == ProtocolVersion::V500 {
351        bytes.put_u8(packet.reason_code as u8);
352
353        let property_length = packet.property_size(protocol_version);
354        encode_variable_int(property_length, bytes);
355
356        packet.reason_string.encode(bytes);
357        packet.user_properties.encode(bytes);
358    }
359}
360
361fn encode_publish_received(
362    packet: &PublishReceivedPacket,
363    bytes: &mut BytesMut,
364    protocol_version: ProtocolVersion,
365) {
366    bytes.put_u16(packet.packet_id);
367    bytes.put_u8(packet.reason_code as u8);
368
369    if protocol_version == ProtocolVersion::V500 {
370        let property_length = packet.property_size(protocol_version);
371        encode_variable_int(property_length, bytes);
372
373        packet.reason_string.encode(bytes);
374        packet.user_properties.encode(bytes);
375    }
376}
377
378fn encode_publish_release(
379    packet: &PublishReleasePacket,
380    bytes: &mut BytesMut,
381    protocol_version: ProtocolVersion,
382) {
383    bytes.put_u16(packet.packet_id);
384    bytes.put_u8(packet.reason_code as u8);
385
386    if protocol_version == ProtocolVersion::V500 {
387        let property_length = packet.property_size(protocol_version);
388        encode_variable_int(property_length, bytes);
389
390        packet.reason_string.encode(bytes);
391        packet.user_properties.encode(bytes);
392    }
393}
394
395fn encode_publish_complete(
396    packet: &PublishCompletePacket,
397    bytes: &mut BytesMut,
398    protocol_version: ProtocolVersion,
399) {
400    bytes.put_u16(packet.packet_id);
401    bytes.put_u8(packet.reason_code as u8);
402
403    if protocol_version == ProtocolVersion::V500 {
404        let property_length = packet.property_size(protocol_version);
405        encode_variable_int(property_length, bytes);
406
407        packet.reason_string.encode(bytes);
408        packet.user_properties.encode(bytes);
409    }
410}
411
412fn encode_subscribe(
413    packet: &SubscribePacket,
414    bytes: &mut BytesMut,
415    protocol_version: ProtocolVersion,
416) {
417    bytes.put_u16(packet.packet_id);
418
419    if protocol_version == ProtocolVersion::V500 {
420        let property_length = packet.property_size(protocol_version);
421        encode_variable_int(property_length, bytes);
422
423        packet.subscription_identifier.encode(bytes);
424        packet.user_properties.encode(bytes);
425    }
426
427    for topic in &packet.subscription_topics {
428        encode_string(&topic.topic_filter.to_string(), bytes);
429
430        let mut options_byte = 0b0000_0000;
431        let retain_handling_byte = topic.retain_handling as u8;
432        options_byte |= (retain_handling_byte & 0b0000_0011) << 4;
433
434        if topic.retain_as_published {
435            options_byte |= 0b0000_1000;
436        }
437
438        if topic.no_local {
439            options_byte |= 0b0000_0100;
440        }
441
442        let qos_byte = topic.maximum_qos as u8;
443        options_byte |= qos_byte & 0b0000_0011;
444
445        bytes.put_u8(options_byte);
446    }
447}
448
449fn encode_subscribe_ack(
450    packet: &SubscribeAckPacket,
451    bytes: &mut BytesMut,
452    protocol_version: ProtocolVersion,
453) {
454    bytes.put_u16(packet.packet_id);
455
456    if protocol_version == ProtocolVersion::V500 {
457        let property_length = packet.property_size(protocol_version);
458        encode_variable_int(property_length, bytes);
459
460        packet.reason_string.encode(bytes);
461        packet.user_properties.encode(bytes);
462    }
463
464    for code in &packet.reason_codes {
465        bytes.put_u8((*code) as u8);
466    }
467}
468
469fn encode_unsubscribe(
470    packet: &UnsubscribePacket,
471    bytes: &mut BytesMut,
472    protocol_version: ProtocolVersion,
473) {
474    bytes.put_u16(packet.packet_id);
475
476    if protocol_version == ProtocolVersion::V500 {
477        let property_length = packet.property_size(protocol_version);
478        encode_variable_int(property_length, bytes);
479
480        packet.user_properties.encode(bytes);
481    }
482
483    for topic_filter in &packet.topic_filters {
484        encode_string(&topic_filter.to_string(), bytes);
485    }
486}
487
488fn encode_unsubscribe_ack(
489    packet: &UnsubscribeAckPacket,
490    bytes: &mut BytesMut,
491    protocol_version: ProtocolVersion,
492) {
493    bytes.put_u16(packet.packet_id);
494
495    if protocol_version == ProtocolVersion::V500 {
496        let property_length = packet.property_size(protocol_version);
497        encode_variable_int(property_length, bytes);
498
499        packet.reason_string.encode(bytes);
500        packet.user_properties.encode(bytes);
501    }
502
503    for code in &packet.reason_codes {
504        bytes.put_u8((*code) as u8);
505    }
506}
507
508fn encode_disconnect(
509    packet: &DisconnectPacket,
510    bytes: &mut BytesMut,
511    protocol_version: ProtocolVersion,
512) {
513    bytes.put_u8(packet.reason_code as u8);
514
515    if protocol_version == ProtocolVersion::V500 {
516        let property_length = packet.property_size(protocol_version);
517        encode_variable_int(property_length, bytes);
518
519        packet.session_expiry_interval.encode(bytes);
520        packet.reason_string.encode(bytes);
521        packet.user_properties.encode(bytes);
522        packet.server_reference.encode(bytes);
523    }
524}
525
526fn encode_authenticate(
527    packet: &AuthenticatePacket,
528    bytes: &mut BytesMut,
529    protocol_version: ProtocolVersion,
530) {
531    bytes.put_u8(packet.reason_code as u8);
532
533    if protocol_version == ProtocolVersion::V500 {
534        let property_length = packet.property_size(protocol_version);
535        encode_variable_int(property_length, bytes);
536
537        packet.authentication_method.encode(bytes);
538        packet.authentication_data.encode(bytes);
539        packet.reason_string.encode(bytes);
540        packet.user_properties.encode(bytes);
541    }
542}
543
544pub fn encode_mqtt(packet: &Packet, bytes: &mut BytesMut, protocol_version: ProtocolVersion) {
545    let remaining_length = packet.calculate_size(protocol_version);
546    let packet_size =
547        1 + VariableByteInt(remaining_length).calculate_size(protocol_version) + remaining_length;
548    bytes.reserve(packet_size as usize);
549
550    let first_byte = packet.to_byte();
551    let mut first_byte_val = (first_byte << 4) & 0b1111_0000;
552    first_byte_val |= packet.fixed_header_flags();
553
554    bytes.put_u8(first_byte_val);
555    encode_variable_int(remaining_length as u32, bytes);
556
557    match packet {
558        Packet::Connect(p) => encode_connect(p, bytes, protocol_version),
559        Packet::ConnectAck(p) => encode_connect_ack(p, bytes, protocol_version),
560        Packet::Publish(p) => encode_publish(p, bytes, protocol_version),
561        Packet::PublishAck(p) => encode_publish_ack(p, bytes, protocol_version),
562        Packet::PublishReceived(p) => encode_publish_received(p, bytes, protocol_version),
563        Packet::PublishRelease(p) => encode_publish_release(p, bytes, protocol_version),
564        Packet::PublishComplete(p) => encode_publish_complete(p, bytes, protocol_version),
565        Packet::Subscribe(p) => encode_subscribe(p, bytes, protocol_version),
566        Packet::SubscribeAck(p) => encode_subscribe_ack(p, bytes, protocol_version),
567        Packet::Unsubscribe(p) => encode_unsubscribe(p, bytes, protocol_version),
568        Packet::UnsubscribeAck(p) => encode_unsubscribe_ack(p, bytes, protocol_version),
569        Packet::PingRequest => {},
570        Packet::PingResponse => {},
571        Packet::Disconnect(p) => encode_disconnect(p, bytes, protocol_version),
572        Packet::Authenticate(p) => encode_authenticate(p, bytes, protocol_version),
573    }
574}
575
576#[cfg(test)]
577mod tests {
578    use crate::{decoder::*, encoder::*, types::*};
579    use bytes::BytesMut;
580
581    #[test]
582    fn connect_roundtrip() {
583        let packet = Packet::Connect(ConnectPacket {
584            protocol_name: "MQTT".to_string(),
585            protocol_version: ProtocolVersion::V500,
586            clean_start: true,
587            keep_alive: 200,
588
589            session_expiry_interval: None,
590            receive_maximum: None,
591            maximum_packet_size: None,
592            topic_alias_maximum: None,
593            request_response_information: None,
594            request_problem_information: None,
595            user_properties: vec![],
596            authentication_method: None,
597            authentication_data: None,
598
599            client_id: "test_client".to_string(),
600            will: None,
601            user_name: None,
602            password: None,
603        });
604
605        let mut bytes = BytesMut::new();
606        encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
607        let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
608
609        assert_eq!(packet, decoded);
610    }
611
612    #[test]
613    fn connect_ack_roundtrip() {
614        let packet = Packet::ConnectAck(ConnectAckPacket {
615            session_present: false,
616            reason_code: ConnectReason::Success,
617
618            session_expiry_interval: None,
619            receive_maximum: None,
620            maximum_qos: None,
621            retain_available: None,
622            maximum_packet_size: None,
623            assigned_client_identifier: None,
624            topic_alias_maximum: None,
625            reason_string: None,
626            user_properties: vec![],
627            wildcard_subscription_available: None,
628            subscription_identifiers_available: None,
629            shared_subscription_available: None,
630            server_keep_alive: None,
631            response_information: None,
632            server_reference: None,
633            authentication_method: None,
634            authentication_data: None,
635        });
636
637        let mut bytes = BytesMut::new();
638        encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
639        let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
640
641        assert_eq!(packet, decoded);
642    }
643
644    #[test]
645    fn publish_roundtrip() {
646        let packet = Packet::Publish(PublishPacket {
647            is_duplicate: false,
648            qos: QoS::AtLeastOnce,
649            retain: false,
650
651            topic: "test_topic".parse().unwrap(),
652            packet_id: Some(42),
653
654            payload_format_indicator: None,
655            message_expiry_interval: None,
656            topic_alias: None,
657            response_topic: None,
658            correlation_data: None,
659            user_properties: vec![],
660            subscription_identifier: None,
661            content_type: None,
662
663            payload: vec![22; 100].into(),
664        });
665
666        let mut bytes = BytesMut::new();
667        encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
668        let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
669
670        assert_eq!(packet, decoded);
671    }
672
673    #[test]
674    fn publish_ack_roundtrip() {
675        let packet = Packet::PublishAck(PublishAckPacket {
676            packet_id: 1500,
677            reason_code: PublishAckReason::Success,
678
679            reason_string: None,
680            user_properties: vec![],
681        });
682
683        let mut bytes = BytesMut::new();
684        encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
685        let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
686
687        assert_eq!(packet, decoded);
688    }
689
690    #[test]
691    fn publish_received_roundtrip() {
692        let packet = Packet::PublishReceived(PublishReceivedPacket {
693            packet_id: 1500,
694            reason_code: PublishReceivedReason::Success,
695
696            reason_string: None,
697            user_properties: vec![],
698        });
699
700        let mut bytes = BytesMut::new();
701        encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
702        let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
703
704        assert_eq!(packet, decoded);
705    }
706
707    #[test]
708    fn publish_release_roundtrip() {
709        let packet = Packet::PublishRelease(PublishReleasePacket {
710            packet_id: 1500,
711            reason_code: PublishReleaseReason::Success,
712
713            reason_string: None,
714            user_properties: vec![],
715        });
716
717        let mut bytes = BytesMut::new();
718        encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
719        let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
720
721        assert_eq!(packet, decoded);
722    }
723
724    #[test]
725    fn publish_complete_roundtrip() {
726        let packet = Packet::PublishComplete(PublishCompletePacket {
727            packet_id: 1500,
728            reason_code: PublishCompleteReason::Success,
729
730            reason_string: None,
731            user_properties: vec![],
732        });
733
734        let mut bytes = BytesMut::new();
735        encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
736        let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
737
738        assert_eq!(packet, decoded);
739    }
740
741    #[test]
742    fn subscribe_roundtrip() {
743        let packet = Packet::Subscribe(SubscribePacket {
744            packet_id: 4500,
745
746            subscription_identifier: None,
747            user_properties: vec![],
748
749            subscription_topics: vec![SubscriptionTopic {
750                topic_filter: "test_topic".parse().unwrap(),
751                maximum_qos: QoS::AtLeastOnce,
752                no_local: false,
753                retain_as_published: false,
754                retain_handling: RetainHandling::SendAtSubscribeTime,
755            }],
756        });
757
758        let mut bytes = BytesMut::new();
759        encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
760        let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
761
762        assert_eq!(packet, decoded);
763    }
764
765    #[test]
766    fn subscribe_ack_roundtrip() {
767        let packet = Packet::SubscribeAck(SubscribeAckPacket {
768            packet_id: 1234,
769
770            reason_string: None,
771            user_properties: vec![],
772
773            reason_codes: vec![SubscribeAckReason::GrantedQoSZero],
774        });
775
776        let mut bytes = BytesMut::new();
777        encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
778        let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
779
780        assert_eq!(packet, decoded);
781    }
782
783    #[test]
784    fn unsubscribe_roundtrip() {
785        let packet = Packet::Unsubscribe(UnsubscribePacket {
786            packet_id: 1234,
787
788            user_properties: vec![],
789
790            topic_filters: vec!["test_topic".parse().unwrap()],
791        });
792
793        let mut bytes = BytesMut::new();
794        encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
795        let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
796
797        assert_eq!(packet, decoded);
798    }
799
800    #[test]
801    fn unsubscribe_ack_roundtrip() {
802        let packet = Packet::UnsubscribeAck(UnsubscribeAckPacket {
803            packet_id: 4321,
804
805            reason_string: None,
806            user_properties: vec![],
807
808            reason_codes: vec![UnsubscribeAckReason::Success],
809        });
810
811        let mut bytes = BytesMut::new();
812        encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
813        let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
814
815        assert_eq!(packet, decoded);
816    }
817
818    #[test]
819    fn ping_request_roundtrip() {
820        let packet = Packet::PingRequest;
821        let mut bytes = BytesMut::new();
822        encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
823        let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
824
825        assert_eq!(packet, decoded);
826    }
827
828    #[test]
829    fn ping_response_roundtrip() {
830        let packet = Packet::PingResponse;
831        let mut bytes = BytesMut::new();
832        encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
833        let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
834
835        assert_eq!(packet, decoded);
836    }
837
838    #[test]
839    fn disconnect_roundtrip() {
840        let packet = Packet::Disconnect(DisconnectPacket {
841            reason_code: DisconnectReason::NormalDisconnection,
842
843            session_expiry_interval: None,
844            reason_string: None,
845            user_properties: vec![],
846            server_reference: None,
847        });
848        let mut bytes = BytesMut::new();
849        encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
850        let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
851
852        assert_eq!(packet, decoded);
853    }
854
855    #[test]
856    fn authenticate_roundtrip() {
857        let packet = Packet::Authenticate(AuthenticatePacket {
858            reason_code: AuthenticateReason::Success,
859
860            authentication_method: None,
861            authentication_data: None,
862            reason_string: None,
863            user_properties: vec![],
864        });
865        let mut bytes = BytesMut::new();
866        encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
867        let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
868
869        assert_eq!(packet, decoded);
870    }
871}