mqtt_v5_fork/
encoder.rs

1use crate::types::{
2    properties::*, AuthenticatePacket, ConnectAckPacket, ConnectPacket, DisconnectPacket, Encode,
3    Packet, PropertySize, ProtocolVersion, PublishAckPacket, PublishCompletePacket, PublishPacket,
4    PublishReceivedPacket, PublishReleasePacket, SubscribeAckPacket, SubscribePacket,
5    UnsubscribeAckPacket, UnsubscribePacket, VariableByteInt,
6};
7use bytes::{BufMut, BytesMut};
8
9fn encode_variable_int(value: u32, bytes: &mut BytesMut) -> usize {
10    let mut x = value;
11    let mut byte_counter = 0;
12
13    loop {
14        let mut encoded_byte: u8 = (x % 128) as u8;
15        x /= 128;
16
17        if x > 0 {
18            encoded_byte |= 128;
19        }
20
21        bytes.put_u8(encoded_byte);
22
23        byte_counter += 1;
24
25        if x == 0 {
26            break;
27        }
28    }
29
30    byte_counter
31}
32
33fn encode_string(value: &str, bytes: &mut BytesMut) {
34    bytes.put_u16(value.len() as u16);
35    bytes.put_slice(value.as_bytes());
36}
37
38fn encode_binary_data(value: &[u8], bytes: &mut BytesMut) {
39    bytes.put_u16(value.len() as u16);
40    bytes.put_slice(value);
41}
42
43impl Encode for PayloadFormatIndicator {
44    fn encode(&self, bytes: &mut BytesMut) {
45        bytes.put_u8(PropertyType::PayloadFormatIndicator as u8);
46        bytes.put_u8(self.0);
47    }
48}
49impl Encode for MessageExpiryInterval {
50    fn encode(&self, bytes: &mut BytesMut) {
51        bytes.put_u8(PropertyType::MessageExpiryInterval as u8);
52        bytes.put_u32(self.0);
53    }
54}
55impl Encode for ContentType {
56    fn encode(&self, bytes: &mut BytesMut) {
57        bytes.put_u8(PropertyType::ContentType as u8);
58        encode_string(&self.0, bytes);
59    }
60}
61impl Encode for ResponseTopic {
62    fn encode(&self, bytes: &mut BytesMut) {
63        bytes.put_u8(PropertyType::ResponseTopic as u8);
64        encode_string(&self.0, bytes);
65    }
66}
67impl Encode for CorrelationData {
68    fn encode(&self, bytes: &mut BytesMut) {
69        bytes.put_u8(PropertyType::CorrelationData as u8);
70        encode_binary_data(&self.0, bytes);
71    }
72}
73impl Encode for SubscriptionIdentifier {
74    fn encode(&self, bytes: &mut BytesMut) {
75        bytes.put_u8(PropertyType::SubscriptionIdentifier as u8);
76        encode_variable_int((self.0).0, bytes);
77    }
78}
79impl Encode for SessionExpiryInterval {
80    fn encode(&self, bytes: &mut BytesMut) {
81        bytes.put_u8(PropertyType::SessionExpiryInterval as u8);
82        bytes.put_u32(self.0);
83    }
84}
85impl Encode for AssignedClientIdentifier {
86    fn encode(&self, bytes: &mut BytesMut) {
87        bytes.put_u8(PropertyType::AssignedClientIdentifier as u8);
88        encode_string(&self.0, bytes);
89    }
90}
91impl Encode for ServerKeepAlive {
92    fn encode(&self, bytes: &mut BytesMut) {
93        bytes.put_u8(PropertyType::ServerKeepAlive as u8);
94        bytes.put_u16(self.0)
95    }
96}
97impl Encode for AuthenticationMethod {
98    fn encode(&self, bytes: &mut BytesMut) {
99        bytes.put_u8(PropertyType::AuthenticationMethod as u8);
100        encode_string(&self.0, bytes);
101    }
102}
103impl Encode for AuthenticationData {
104    fn encode(&self, bytes: &mut BytesMut) {
105        bytes.put_u8(PropertyType::AuthenticationData as u8);
106        encode_binary_data(&self.0, bytes);
107    }
108}
109impl Encode for RequestProblemInformation {
110    fn encode(&self, bytes: &mut BytesMut) {
111        bytes.put_u8(PropertyType::RequestProblemInformation as u8);
112        bytes.put_u8(self.0);
113    }
114}
115impl Encode for WillDelayInterval {
116    fn encode(&self, bytes: &mut BytesMut) {
117        bytes.put_u8(PropertyType::WillDelayInterval as u8);
118        bytes.put_u32(self.0);
119    }
120}
121impl Encode for RequestResponseInformation {
122    fn encode(&self, bytes: &mut BytesMut) {
123        bytes.put_u8(PropertyType::RequestResponseInformation as u8);
124        bytes.put_u8(self.0);
125    }
126}
127impl Encode for ResponseInformation {
128    fn encode(&self, bytes: &mut BytesMut) {
129        bytes.put_u8(PropertyType::ResponseInformation as u8);
130        encode_string(&self.0, bytes);
131    }
132}
133impl Encode for ServerReference {
134    fn encode(&self, bytes: &mut BytesMut) {
135        bytes.put_u8(PropertyType::ServerReference as u8);
136        encode_string(&self.0, bytes);
137    }
138}
139impl Encode for ReasonString {
140    fn encode(&self, bytes: &mut BytesMut) {
141        bytes.put_u8(PropertyType::ReasonString as u8);
142        encode_string(&self.0, bytes);
143    }
144}
145impl Encode for ReceiveMaximum {
146    fn encode(&self, bytes: &mut BytesMut) {
147        bytes.put_u8(PropertyType::ReceiveMaximum as u8);
148        bytes.put_u16(self.0);
149    }
150}
151impl Encode for TopicAliasMaximum {
152    fn encode(&self, bytes: &mut BytesMut) {
153        bytes.put_u8(PropertyType::TopicAliasMaximum as u8);
154        bytes.put_u16(self.0);
155    }
156}
157impl Encode for TopicAlias {
158    fn encode(&self, bytes: &mut BytesMut) {
159        bytes.put_u8(PropertyType::TopicAlias as u8);
160        bytes.put_u16(self.0);
161    }
162}
163impl Encode for MaximumQos {
164    fn encode(&self, bytes: &mut BytesMut) {
165        bytes.put_u8(PropertyType::MaximumQos as u8);
166        bytes.put_u8(self.0 as u8);
167    }
168}
169impl Encode for RetainAvailable {
170    fn encode(&self, bytes: &mut BytesMut) {
171        bytes.put_u8(PropertyType::RetainAvailable as u8);
172        bytes.put_u8(self.0);
173    }
174}
175impl Encode for UserProperty {
176    fn encode(&self, bytes: &mut BytesMut) {
177        bytes.put_u8(PropertyType::UserProperty as u8);
178        encode_string(&self.0, bytes);
179        encode_string(&self.1, bytes);
180    }
181}
182impl Encode for MaximumPacketSize {
183    fn encode(&self, bytes: &mut BytesMut) {
184        bytes.put_u8(PropertyType::MaximumPacketSize as u8);
185        bytes.put_u32(self.0);
186    }
187}
188impl Encode for WildcardSubscriptionAvailable {
189    fn encode(&self, bytes: &mut BytesMut) {
190        bytes.put_u8(PropertyType::WildcardSubscriptionAvailable as u8);
191        bytes.put_u8(self.0);
192    }
193}
194impl Encode for SubscriptionIdentifierAvailable {
195    fn encode(&self, bytes: &mut BytesMut) {
196        bytes.put_u8(PropertyType::SubscriptionIdentifierAvailable as u8);
197        bytes.put_u8(self.0);
198    }
199}
200impl Encode for SharedSubscriptionAvailable {
201    fn encode(&self, bytes: &mut BytesMut) {
202        bytes.put_u8(PropertyType::SharedSubscriptionAvailable as u8);
203        bytes.put_u8(self.0);
204    }
205}
206
207fn encode_connect(packet: &ConnectPacket, bytes: &mut BytesMut, protocol_version: ProtocolVersion) {
208    encode_string(&packet.protocol_name, bytes);
209    bytes.put_u8(packet.protocol_version as u8);
210
211    let mut connect_flags: u8 = 0b0000_0000;
212
213    if packet.user_name.is_some() {
214        connect_flags |= 0b1000_0000;
215    }
216
217    if packet.password.is_some() {
218        connect_flags |= 0b0100_0000;
219    }
220
221    if let Some(will) = &packet.will {
222        if will.should_retain {
223            connect_flags |= 0b0100_0000;
224        }
225
226        let qos_byte: u8 = will.qos as u8;
227        connect_flags |= (qos_byte & 0b0000_0011) << 3;
228        connect_flags |= 0b0000_0100;
229    }
230
231    if packet.clean_start {
232        connect_flags |= 0b0000_0010;
233    }
234
235    bytes.put_u8(connect_flags);
236    bytes.put_u16(packet.keep_alive);
237
238    if protocol_version == ProtocolVersion::V500 {
239        let property_length = packet.property_size(protocol_version);
240        encode_variable_int(property_length, bytes);
241
242        packet.session_expiry_interval.encode(bytes);
243        packet.receive_maximum.encode(bytes);
244        packet.maximum_packet_size.encode(bytes);
245        packet.topic_alias_maximum.encode(bytes);
246        packet.request_response_information.encode(bytes);
247        packet.request_problem_information.encode(bytes);
248        packet.user_properties.encode(bytes);
249        packet.authentication_method.encode(bytes);
250        packet.authentication_data.encode(bytes);
251    }
252
253    encode_string(&packet.client_id, bytes);
254
255    if let Some(will) = &packet.will {
256        if protocol_version == ProtocolVersion::V500 {
257            let property_length = will.property_size(protocol_version);
258            encode_variable_int(property_length, bytes);
259
260            will.will_delay_interval.encode(bytes);
261            will.payload_format_indicator.encode(bytes);
262            will.message_expiry_interval.encode(bytes);
263            will.content_type.encode(bytes);
264            will.response_topic.encode(bytes);
265            will.correlation_data.encode(bytes);
266            will.user_properties.encode(bytes);
267        }
268
269        encode_string(will.topic.topic_name(), bytes);
270        encode_binary_data(&will.payload, bytes);
271    }
272
273    if let Some(user_name) = &packet.user_name {
274        encode_string(user_name, bytes);
275    }
276
277    if let Some(password) = &packet.password {
278        encode_string(password, bytes);
279    }
280}
281
282fn encode_connect_ack(
283    packet: &ConnectAckPacket,
284    bytes: &mut BytesMut,
285    protocol_version: ProtocolVersion,
286) {
287    let mut connect_ack_flags: u8 = 0b0000_0000;
288    if packet.session_present {
289        connect_ack_flags |= 0b0000_0001;
290    }
291
292    bytes.put_u8(connect_ack_flags);
293    bytes.put_u8(packet.reason_code as u8);
294
295    if protocol_version == ProtocolVersion::V500 {
296        let property_length = packet.property_size(protocol_version);
297        encode_variable_int(property_length, bytes);
298
299        packet.session_expiry_interval.encode(bytes);
300        packet.receive_maximum.encode(bytes);
301        packet.maximum_qos.encode(bytes);
302        packet.retain_available.encode(bytes);
303        packet.maximum_packet_size.encode(bytes);
304        packet.assigned_client_identifier.encode(bytes);
305        packet.topic_alias_maximum.encode(bytes);
306        packet.reason_string.encode(bytes);
307        packet.user_properties.encode(bytes);
308        packet.wildcard_subscription_available.encode(bytes);
309        packet.subscription_identifiers_available.encode(bytes);
310        packet.shared_subscription_available.encode(bytes);
311        packet.server_keep_alive.encode(bytes);
312        packet.response_information.encode(bytes);
313        packet.server_reference.encode(bytes);
314        packet.authentication_method.encode(bytes);
315        packet.authentication_data.encode(bytes);
316    }
317}
318
319fn encode_publish(packet: &PublishPacket, bytes: &mut BytesMut, protocol_version: ProtocolVersion) {
320    encode_string(&packet.topic.to_string(), bytes);
321
322    if let Some(packet_id) = packet.packet_id {
323        bytes.put_u16(packet_id);
324    }
325
326    if protocol_version == ProtocolVersion::V500 {
327        let property_length = packet.property_size(protocol_version);
328        encode_variable_int(property_length, bytes);
329
330        packet.payload_format_indicator.encode(bytes);
331        packet.message_expiry_interval.encode(bytes);
332        packet.topic_alias.encode(bytes);
333        packet.response_topic.encode(bytes);
334        packet.correlation_data.encode(bytes);
335        packet.user_properties.encode(bytes);
336        packet.subscription_identifiers.encode(bytes);
337        packet.content_type.encode(bytes);
338    }
339
340    bytes.put_slice(&packet.payload);
341}
342
343fn encode_publish_ack(
344    packet: &PublishAckPacket,
345    bytes: &mut BytesMut,
346    protocol_version: ProtocolVersion,
347) {
348    bytes.put_u16(packet.packet_id);
349
350    if protocol_version == ProtocolVersion::V500 {
351        bytes.put_u8(packet.reason_code as u8);
352
353        let property_length = packet.property_size(protocol_version);
354        encode_variable_int(property_length, bytes);
355
356        packet.reason_string.encode(bytes);
357        packet.user_properties.encode(bytes);
358    }
359}
360
361fn encode_publish_received(
362    packet: &PublishReceivedPacket,
363    bytes: &mut BytesMut,
364    protocol_version: ProtocolVersion,
365) {
366    bytes.put_u16(packet.packet_id);
367
368    if protocol_version == ProtocolVersion::V500 {
369        bytes.put_u8(packet.reason_code as u8);
370
371        let property_length = packet.property_size(protocol_version);
372        encode_variable_int(property_length, bytes);
373
374        packet.reason_string.encode(bytes);
375        packet.user_properties.encode(bytes);
376    }
377}
378
379fn encode_publish_release(
380    packet: &PublishReleasePacket,
381    bytes: &mut BytesMut,
382    protocol_version: ProtocolVersion,
383) {
384    bytes.put_u16(packet.packet_id);
385
386    if protocol_version == ProtocolVersion::V500 {
387        bytes.put_u8(packet.reason_code as u8);
388
389        let property_length = packet.property_size(protocol_version);
390        encode_variable_int(property_length, bytes);
391
392        packet.reason_string.encode(bytes);
393        packet.user_properties.encode(bytes);
394    }
395}
396
397fn encode_publish_complete(
398    packet: &PublishCompletePacket,
399    bytes: &mut BytesMut,
400    protocol_version: ProtocolVersion,
401) {
402    bytes.put_u16(packet.packet_id);
403
404    if protocol_version == ProtocolVersion::V500 {
405        bytes.put_u8(packet.reason_code as u8);
406
407        let property_length = packet.property_size(protocol_version);
408        encode_variable_int(property_length, bytes);
409
410        packet.reason_string.encode(bytes);
411        packet.user_properties.encode(bytes);
412    }
413}
414
415fn encode_subscribe(
416    packet: &SubscribePacket,
417    bytes: &mut BytesMut,
418    protocol_version: ProtocolVersion,
419) {
420    bytes.put_u16(packet.packet_id);
421
422    if protocol_version == ProtocolVersion::V500 {
423        let property_length = packet.property_size(protocol_version);
424        encode_variable_int(property_length, bytes);
425
426        packet.subscription_identifier.encode(bytes);
427        packet.user_properties.encode(bytes);
428    }
429
430    for topic in &packet.subscription_topics {
431        encode_string(&topic.topic_filter.to_string(), bytes);
432
433        let mut options_byte = 0b0000_0000;
434        let retain_handling_byte = topic.retain_handling as u8;
435        options_byte |= (retain_handling_byte & 0b0000_0011) << 4;
436
437        if topic.retain_as_published {
438            options_byte |= 0b0000_1000;
439        }
440
441        if topic.no_local {
442            options_byte |= 0b0000_0100;
443        }
444
445        let qos_byte = topic.maximum_qos as u8;
446        options_byte |= qos_byte & 0b0000_0011;
447
448        bytes.put_u8(options_byte);
449    }
450}
451
452fn encode_subscribe_ack(
453    packet: &SubscribeAckPacket,
454    bytes: &mut BytesMut,
455    protocol_version: ProtocolVersion,
456) {
457    bytes.put_u16(packet.packet_id);
458
459    if protocol_version == ProtocolVersion::V500 {
460        let property_length = packet.property_size(protocol_version);
461        encode_variable_int(property_length, bytes);
462
463        packet.reason_string.encode(bytes);
464        packet.user_properties.encode(bytes);
465    }
466
467    for code in &packet.reason_codes {
468        bytes.put_u8((*code) as u8);
469    }
470}
471
472fn encode_unsubscribe(
473    packet: &UnsubscribePacket,
474    bytes: &mut BytesMut,
475    protocol_version: ProtocolVersion,
476) {
477    bytes.put_u16(packet.packet_id);
478
479    if protocol_version == ProtocolVersion::V500 {
480        let property_length = packet.property_size(protocol_version);
481        encode_variable_int(property_length, bytes);
482
483        packet.user_properties.encode(bytes);
484    }
485
486    for topic_filter in &packet.topic_filters {
487        encode_string(&topic_filter.to_string(), bytes);
488    }
489}
490
491fn encode_unsubscribe_ack(
492    packet: &UnsubscribeAckPacket,
493    bytes: &mut BytesMut,
494    protocol_version: ProtocolVersion,
495) {
496    bytes.put_u16(packet.packet_id);
497
498    if protocol_version == ProtocolVersion::V500 {
499        let property_length = packet.property_size(protocol_version);
500        encode_variable_int(property_length, bytes);
501
502        packet.reason_string.encode(bytes);
503        packet.user_properties.encode(bytes);
504    }
505
506    for code in &packet.reason_codes {
507        bytes.put_u8((*code) as u8);
508    }
509}
510
511fn encode_disconnect(
512    packet: &DisconnectPacket,
513    bytes: &mut BytesMut,
514    protocol_version: ProtocolVersion,
515) {
516    if protocol_version == ProtocolVersion::V500 {
517        bytes.put_u8(packet.reason_code as u8);
518
519        let property_length = packet.property_size(protocol_version);
520        encode_variable_int(property_length, bytes);
521
522        packet.session_expiry_interval.encode(bytes);
523        packet.reason_string.encode(bytes);
524        packet.user_properties.encode(bytes);
525        packet.server_reference.encode(bytes);
526    }
527}
528
529fn encode_authenticate(
530    packet: &AuthenticatePacket,
531    bytes: &mut BytesMut,
532    protocol_version: ProtocolVersion,
533) {
534    bytes.put_u8(packet.reason_code as u8);
535
536    if protocol_version == ProtocolVersion::V500 {
537        let property_length = packet.property_size(protocol_version);
538        encode_variable_int(property_length, bytes);
539
540        packet.authentication_method.encode(bytes);
541        packet.authentication_data.encode(bytes);
542        packet.reason_string.encode(bytes);
543        packet.user_properties.encode(bytes);
544    }
545}
546
547pub fn encode_mqtt(packet: &Packet, bytes: &mut BytesMut, protocol_version: ProtocolVersion) {
548    let remaining_length = packet.calculate_size(protocol_version);
549    let packet_size =
550        1 + VariableByteInt(remaining_length).calculate_size(protocol_version) + remaining_length;
551    bytes.reserve(packet_size as usize);
552
553    let first_byte = packet.to_byte();
554    let mut first_byte_val = (first_byte << 4) & 0b1111_0000;
555    first_byte_val |= packet.fixed_header_flags();
556
557    bytes.put_u8(first_byte_val);
558    encode_variable_int(remaining_length, bytes);
559
560    match packet {
561        Packet::Connect(p) => encode_connect(p, bytes, protocol_version),
562        Packet::ConnectAck(p) => encode_connect_ack(p, bytes, protocol_version),
563        Packet::Publish(p) => encode_publish(p, bytes, protocol_version),
564        Packet::PublishAck(p) => encode_publish_ack(p, bytes, protocol_version),
565        Packet::PublishReceived(p) => encode_publish_received(p, bytes, protocol_version),
566        Packet::PublishRelease(p) => encode_publish_release(p, bytes, protocol_version),
567        Packet::PublishComplete(p) => encode_publish_complete(p, bytes, protocol_version),
568        Packet::Subscribe(p) => encode_subscribe(p, bytes, protocol_version),
569        Packet::SubscribeAck(p) => encode_subscribe_ack(p, bytes, protocol_version),
570        Packet::Unsubscribe(p) => encode_unsubscribe(p, bytes, protocol_version),
571        Packet::UnsubscribeAck(p) => encode_unsubscribe_ack(p, bytes, protocol_version),
572        Packet::PingRequest => {},
573        Packet::PingResponse => {},
574        Packet::Disconnect(p) => encode_disconnect(p, bytes, protocol_version),
575        Packet::Authenticate(p) => encode_authenticate(p, bytes, protocol_version),
576    }
577}
578
#[cfg(test)]
mod tests {
    use crate::{decoder::*, encoder::*, types::*};
    use bytes::BytesMut;

    /// Encodes `packet` as MQTT v5, decodes the resulting bytes again and asserts that the
    /// decoded packet equals the input.
    fn assert_roundtrip(packet: Packet) {
        let mut bytes = BytesMut::new();
        encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
        let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();

        assert_eq!(packet, decoded);
    }

    #[test]
    fn connect_roundtrip() {
        assert_roundtrip(Packet::Connect(ConnectPacket {
            protocol_name: "MQTT".to_string(),
            protocol_version: ProtocolVersion::V500,
            clean_start: true,
            keep_alive: 200,

            session_expiry_interval: None,
            receive_maximum: None,
            maximum_packet_size: None,
            topic_alias_maximum: None,
            request_response_information: None,
            request_problem_information: None,
            user_properties: vec![],
            authentication_method: None,
            authentication_data: None,

            client_id: "test_client".to_string(),
            will: None,
            user_name: None,
            password: None,
        }));
    }

    #[test]
    fn connect_ack_roundtrip() {
        assert_roundtrip(Packet::ConnectAck(ConnectAckPacket {
            session_present: false,
            reason_code: ConnectReason::Success,

            session_expiry_interval: None,
            receive_maximum: None,
            maximum_qos: None,
            retain_available: None,
            maximum_packet_size: None,
            assigned_client_identifier: None,
            topic_alias_maximum: None,
            reason_string: None,
            user_properties: vec![],
            wildcard_subscription_available: None,
            subscription_identifiers_available: None,
            shared_subscription_available: None,
            server_keep_alive: None,
            response_information: None,
            server_reference: None,
            authentication_method: None,
            authentication_data: None,
        }));
    }

    #[test]
    fn publish_roundtrip() {
        assert_roundtrip(Packet::Publish(PublishPacket {
            is_duplicate: false,
            qos: QoS::AtLeastOnce,
            retain: false,

            topic: "test_topic".parse().unwrap(),
            packet_id: Some(42),

            payload_format_indicator: None,
            message_expiry_interval: None,
            topic_alias: None,
            response_topic: None,
            correlation_data: None,
            user_properties: vec![],
            subscription_identifiers: Vec::with_capacity(0),
            content_type: None,

            payload: vec![22; 100].into(),
        }));
    }

    #[test]
    fn publish_ack_roundtrip() {
        assert_roundtrip(Packet::PublishAck(PublishAckPacket {
            packet_id: 1500,
            reason_code: PublishAckReason::Success,

            reason_string: None,
            user_properties: vec![],
        }));
    }

    #[test]
    fn publish_received_roundtrip() {
        assert_roundtrip(Packet::PublishReceived(PublishReceivedPacket {
            packet_id: 1500,
            reason_code: PublishReceivedReason::Success,

            reason_string: None,
            user_properties: vec![],
        }));
    }

    #[test]
    fn publish_release_roundtrip() {
        assert_roundtrip(Packet::PublishRelease(PublishReleasePacket {
            packet_id: 1500,
            reason_code: PublishReleaseReason::Success,

            reason_string: None,
            user_properties: vec![],
        }));
    }

    #[test]
    fn publish_complete_roundtrip() {
        assert_roundtrip(Packet::PublishComplete(PublishCompletePacket {
            packet_id: 1500,
            reason_code: PublishCompleteReason::Success,

            reason_string: None,
            user_properties: vec![],
        }));
    }

    #[test]
    fn subscribe_roundtrip() {
        assert_roundtrip(Packet::Subscribe(SubscribePacket {
            packet_id: 4500,

            subscription_identifier: None,
            user_properties: vec![],

            subscription_topics: vec![SubscriptionTopic {
                topic_filter: "test_topic".parse().unwrap(),
                maximum_qos: QoS::AtLeastOnce,
                no_local: false,
                retain_as_published: false,
                retain_handling: RetainHandling::SendAtSubscribeTime,
            }],
        }));
    }

    #[test]
    fn subscribe_ack_roundtrip() {
        assert_roundtrip(Packet::SubscribeAck(SubscribeAckPacket {
            packet_id: 1234,

            reason_string: None,
            user_properties: vec![],

            reason_codes: vec![SubscribeAckReason::GrantedQoSZero],
        }));
    }

    #[test]
    fn unsubscribe_roundtrip() {
        assert_roundtrip(Packet::Unsubscribe(UnsubscribePacket {
            packet_id: 1234,

            user_properties: vec![],

            topic_filters: vec!["test_topic".parse().unwrap()],
        }));
    }

    #[test]
    fn unsubscribe_ack_roundtrip() {
        assert_roundtrip(Packet::UnsubscribeAck(UnsubscribeAckPacket {
            packet_id: 4321,

            reason_string: None,
            user_properties: vec![],

            reason_codes: vec![UnsubscribeAckReason::Success],
        }));
    }

    #[test]
    fn ping_request_roundtrip() {
        assert_roundtrip(Packet::PingRequest);
    }

    #[test]
    fn ping_response_roundtrip() {
        assert_roundtrip(Packet::PingResponse);
    }

    #[test]
    fn disconnect_roundtrip() {
        assert_roundtrip(Packet::Disconnect(DisconnectPacket {
            reason_code: DisconnectReason::NormalDisconnection,

            session_expiry_interval: None,
            reason_string: None,
            user_properties: vec![],
            server_reference: None,
        }));
    }

    #[test]
    fn authenticate_roundtrip() {
        assert_roundtrip(Packet::Authenticate(AuthenticatePacket {
            reason_code: AuthenticateReason::Success,

            authentication_method: None,
            authentication_data: None,
            reason_string: None,
            user_properties: vec![],
        }));
    }
}