1use crate::types::{
2 properties::*, AuthenticatePacket, ConnectAckPacket, ConnectPacket, DisconnectPacket, Encode,
3 Packet, PropertySize, ProtocolVersion, PublishAckPacket, PublishCompletePacket, PublishPacket,
4 PublishReceivedPacket, PublishReleasePacket, SubscribeAckPacket, SubscribePacket,
5 UnsubscribeAckPacket, UnsubscribePacket, VariableByteInt,
6};
7use bytes::{BufMut, BytesMut};
8
9fn encode_variable_int(value: u32, bytes: &mut BytesMut) -> usize {
10 let mut x = value;
11 let mut byte_counter = 0;
12
13 loop {
14 let mut encoded_byte: u8 = (x % 128) as u8;
15 x /= 128;
16
17 if x > 0 {
18 encoded_byte |= 128;
19 }
20
21 bytes.put_u8(encoded_byte);
22
23 byte_counter += 1;
24
25 if x == 0 {
26 break;
27 }
28 }
29
30 byte_counter
31}
32
33fn encode_string(value: &str, bytes: &mut BytesMut) {
34 bytes.put_u16(value.len() as u16);
35 bytes.put_slice(value.as_bytes());
36}
37
38fn encode_binary_data(value: &[u8], bytes: &mut BytesMut) {
39 bytes.put_u16(value.len() as u16);
40 bytes.put_slice(value);
41}
42
43impl Encode for PayloadFormatIndicator {
44 fn encode(&self, bytes: &mut BytesMut) {
45 bytes.put_u8(PropertyType::PayloadFormatIndicator as u8);
46 bytes.put_u8(self.0);
47 }
48}
49impl Encode for MessageExpiryInterval {
50 fn encode(&self, bytes: &mut BytesMut) {
51 bytes.put_u8(PropertyType::MessageExpiryInterval as u8);
52 bytes.put_u32(self.0);
53 }
54}
55impl Encode for ContentType {
56 fn encode(&self, bytes: &mut BytesMut) {
57 bytes.put_u8(PropertyType::ContentType as u8);
58 encode_string(&self.0, bytes);
59 }
60}
61impl Encode for ResponseTopic {
62 fn encode(&self, bytes: &mut BytesMut) {
63 bytes.put_u8(PropertyType::ResponseTopic as u8);
64 encode_string(&self.0, bytes);
65 }
66}
67impl Encode for CorrelationData {
68 fn encode(&self, bytes: &mut BytesMut) {
69 bytes.put_u8(PropertyType::CorrelationData as u8);
70 encode_binary_data(&self.0, bytes);
71 }
72}
73impl Encode for SubscriptionIdentifier {
74 fn encode(&self, bytes: &mut BytesMut) {
75 bytes.put_u8(PropertyType::SubscriptionIdentifier as u8);
76 encode_variable_int((self.0).0, bytes);
77 }
78}
79impl Encode for SessionExpiryInterval {
80 fn encode(&self, bytes: &mut BytesMut) {
81 bytes.put_u8(PropertyType::SessionExpiryInterval as u8);
82 bytes.put_u32(self.0);
83 }
84}
85impl Encode for AssignedClientIdentifier {
86 fn encode(&self, bytes: &mut BytesMut) {
87 bytes.put_u8(PropertyType::AssignedClientIdentifier as u8);
88 encode_string(&self.0, bytes);
89 }
90}
91impl Encode for ServerKeepAlive {
92 fn encode(&self, bytes: &mut BytesMut) {
93 bytes.put_u8(PropertyType::ServerKeepAlive as u8);
94 bytes.put_u16(self.0)
95 }
96}
97impl Encode for AuthenticationMethod {
98 fn encode(&self, bytes: &mut BytesMut) {
99 bytes.put_u8(PropertyType::AuthenticationMethod as u8);
100 encode_string(&self.0, bytes);
101 }
102}
103impl Encode for AuthenticationData {
104 fn encode(&self, bytes: &mut BytesMut) {
105 bytes.put_u8(PropertyType::AuthenticationData as u8);
106 encode_binary_data(&self.0, bytes);
107 }
108}
109impl Encode for RequestProblemInformation {
110 fn encode(&self, bytes: &mut BytesMut) {
111 bytes.put_u8(PropertyType::RequestProblemInformation as u8);
112 bytes.put_u8(self.0);
113 }
114}
115impl Encode for WillDelayInterval {
116 fn encode(&self, bytes: &mut BytesMut) {
117 bytes.put_u8(PropertyType::WillDelayInterval as u8);
118 bytes.put_u32(self.0);
119 }
120}
121impl Encode for RequestResponseInformation {
122 fn encode(&self, bytes: &mut BytesMut) {
123 bytes.put_u8(PropertyType::RequestResponseInformation as u8);
124 bytes.put_u8(self.0);
125 }
126}
127impl Encode for ResponseInformation {
128 fn encode(&self, bytes: &mut BytesMut) {
129 bytes.put_u8(PropertyType::ResponseInformation as u8);
130 encode_string(&self.0, bytes);
131 }
132}
133impl Encode for ServerReference {
134 fn encode(&self, bytes: &mut BytesMut) {
135 bytes.put_u8(PropertyType::ServerReference as u8);
136 encode_string(&self.0, bytes);
137 }
138}
139impl Encode for ReasonString {
140 fn encode(&self, bytes: &mut BytesMut) {
141 bytes.put_u8(PropertyType::ReasonString as u8);
142 encode_string(&self.0, bytes);
143 }
144}
145impl Encode for ReceiveMaximum {
146 fn encode(&self, bytes: &mut BytesMut) {
147 bytes.put_u8(PropertyType::ReceiveMaximum as u8);
148 bytes.put_u16(self.0);
149 }
150}
151impl Encode for TopicAliasMaximum {
152 fn encode(&self, bytes: &mut BytesMut) {
153 bytes.put_u8(PropertyType::TopicAliasMaximum as u8);
154 bytes.put_u16(self.0);
155 }
156}
157impl Encode for TopicAlias {
158 fn encode(&self, bytes: &mut BytesMut) {
159 bytes.put_u8(PropertyType::TopicAlias as u8);
160 bytes.put_u16(self.0);
161 }
162}
163impl Encode for MaximumQos {
164 fn encode(&self, bytes: &mut BytesMut) {
165 bytes.put_u8(PropertyType::MaximumQos as u8);
166 bytes.put_u8(self.0 as u8);
167 }
168}
169impl Encode for RetainAvailable {
170 fn encode(&self, bytes: &mut BytesMut) {
171 bytes.put_u8(PropertyType::RetainAvailable as u8);
172 bytes.put_u8(self.0);
173 }
174}
175impl Encode for UserProperty {
176 fn encode(&self, bytes: &mut BytesMut) {
177 bytes.put_u8(PropertyType::UserProperty as u8);
178 encode_string(&self.0, bytes);
179 encode_string(&self.1, bytes);
180 }
181}
182impl Encode for MaximumPacketSize {
183 fn encode(&self, bytes: &mut BytesMut) {
184 bytes.put_u8(PropertyType::MaximumPacketSize as u8);
185 bytes.put_u32(self.0);
186 }
187}
188impl Encode for WildcardSubscriptionAvailable {
189 fn encode(&self, bytes: &mut BytesMut) {
190 bytes.put_u8(PropertyType::WildcardSubscriptionAvailable as u8);
191 bytes.put_u8(self.0);
192 }
193}
194impl Encode for SubscriptionIdentifierAvailable {
195 fn encode(&self, bytes: &mut BytesMut) {
196 bytes.put_u8(PropertyType::SubscriptionIdentifierAvailable as u8);
197 bytes.put_u8(self.0);
198 }
199}
200impl Encode for SharedSubscriptionAvailable {
201 fn encode(&self, bytes: &mut BytesMut) {
202 bytes.put_u8(PropertyType::SharedSubscriptionAvailable as u8);
203 bytes.put_u8(self.0);
204 }
205}
206
207fn encode_connect(packet: &ConnectPacket, bytes: &mut BytesMut, protocol_version: ProtocolVersion) {
208 encode_string(&packet.protocol_name, bytes);
209 bytes.put_u8(packet.protocol_version as u8);
210
211 let mut connect_flags: u8 = 0b0000_0000;
212
213 if packet.user_name.is_some() {
214 connect_flags |= 0b1000_0000;
215 }
216
217 if packet.password.is_some() {
218 connect_flags |= 0b0100_0000;
219 }
220
221 if let Some(will) = &packet.will {
222 if will.should_retain {
223 connect_flags |= 0b0100_0000;
224 }
225
226 let qos_byte: u8 = will.qos as u8;
227 connect_flags |= (qos_byte & 0b0000_0011) << 3;
228 connect_flags |= 0b0000_0100;
229 }
230
231 if packet.clean_start {
232 connect_flags |= 0b0000_0010;
233 }
234
235 bytes.put_u8(connect_flags);
236 bytes.put_u16(packet.keep_alive);
237
238 if protocol_version == ProtocolVersion::V500 {
239 let property_length = packet.property_size(protocol_version);
240 encode_variable_int(property_length, bytes);
241
242 packet.session_expiry_interval.encode(bytes);
243 packet.receive_maximum.encode(bytes);
244 packet.maximum_packet_size.encode(bytes);
245 packet.topic_alias_maximum.encode(bytes);
246 packet.request_response_information.encode(bytes);
247 packet.request_problem_information.encode(bytes);
248 packet.user_properties.encode(bytes);
249 packet.authentication_method.encode(bytes);
250 packet.authentication_data.encode(bytes);
251 }
252
253 encode_string(&packet.client_id, bytes);
254
255 if let Some(will) = &packet.will {
256 if protocol_version == ProtocolVersion::V500 {
257 let property_length = will.property_size(protocol_version);
258 encode_variable_int(property_length, bytes);
259
260 will.will_delay_interval.encode(bytes);
261 will.payload_format_indicator.encode(bytes);
262 will.message_expiry_interval.encode(bytes);
263 will.content_type.encode(bytes);
264 will.response_topic.encode(bytes);
265 will.correlation_data.encode(bytes);
266 will.user_properties.encode(bytes);
267 }
268
269 encode_string(will.topic.topic_name(), bytes);
270 encode_binary_data(&will.payload, bytes);
271 }
272
273 if let Some(user_name) = &packet.user_name {
274 encode_string(user_name, bytes);
275 }
276
277 if let Some(password) = &packet.password {
278 encode_string(password, bytes);
279 }
280}
281
282fn encode_connect_ack(
283 packet: &ConnectAckPacket,
284 bytes: &mut BytesMut,
285 protocol_version: ProtocolVersion,
286) {
287 let mut connect_ack_flags: u8 = 0b0000_0000;
288 if packet.session_present {
289 connect_ack_flags |= 0b0000_0001;
290 }
291
292 bytes.put_u8(connect_ack_flags);
293 bytes.put_u8(packet.reason_code as u8);
294
295 if protocol_version == ProtocolVersion::V500 {
296 let property_length = packet.property_size(protocol_version);
297 encode_variable_int(property_length, bytes);
298
299 packet.session_expiry_interval.encode(bytes);
300 packet.receive_maximum.encode(bytes);
301 packet.maximum_qos.encode(bytes);
302 packet.retain_available.encode(bytes);
303 packet.maximum_packet_size.encode(bytes);
304 packet.assigned_client_identifier.encode(bytes);
305 packet.topic_alias_maximum.encode(bytes);
306 packet.reason_string.encode(bytes);
307 packet.user_properties.encode(bytes);
308 packet.wildcard_subscription_available.encode(bytes);
309 packet.subscription_identifiers_available.encode(bytes);
310 packet.shared_subscription_available.encode(bytes);
311 packet.server_keep_alive.encode(bytes);
312 packet.response_information.encode(bytes);
313 packet.server_reference.encode(bytes);
314 packet.authentication_method.encode(bytes);
315 packet.authentication_data.encode(bytes);
316 }
317}
318
319fn encode_publish(packet: &PublishPacket, bytes: &mut BytesMut, protocol_version: ProtocolVersion) {
320 encode_string(&packet.topic.to_string(), bytes);
321
322 if let Some(packet_id) = packet.packet_id {
323 bytes.put_u16(packet_id);
324 }
325
326 if protocol_version == ProtocolVersion::V500 {
327 let property_length = packet.property_size(protocol_version);
328 encode_variable_int(property_length, bytes);
329
330 packet.payload_format_indicator.encode(bytes);
331 packet.message_expiry_interval.encode(bytes);
332 packet.topic_alias.encode(bytes);
333 packet.response_topic.encode(bytes);
334 packet.correlation_data.encode(bytes);
335 packet.user_properties.encode(bytes);
336 packet.subscription_identifiers.encode(bytes);
337 packet.content_type.encode(bytes);
338 }
339
340 bytes.put_slice(&packet.payload);
341}
342
343fn encode_publish_ack(
344 packet: &PublishAckPacket,
345 bytes: &mut BytesMut,
346 protocol_version: ProtocolVersion,
347) {
348 bytes.put_u16(packet.packet_id);
349
350 if protocol_version == ProtocolVersion::V500 {
351 bytes.put_u8(packet.reason_code as u8);
352
353 let property_length = packet.property_size(protocol_version);
354 encode_variable_int(property_length, bytes);
355
356 packet.reason_string.encode(bytes);
357 packet.user_properties.encode(bytes);
358 }
359}
360
361fn encode_publish_received(
362 packet: &PublishReceivedPacket,
363 bytes: &mut BytesMut,
364 protocol_version: ProtocolVersion,
365) {
366 bytes.put_u16(packet.packet_id);
367
368 if protocol_version == ProtocolVersion::V500 {
369 bytes.put_u8(packet.reason_code as u8);
370
371 let property_length = packet.property_size(protocol_version);
372 encode_variable_int(property_length, bytes);
373
374 packet.reason_string.encode(bytes);
375 packet.user_properties.encode(bytes);
376 }
377}
378
379fn encode_publish_release(
380 packet: &PublishReleasePacket,
381 bytes: &mut BytesMut,
382 protocol_version: ProtocolVersion,
383) {
384 bytes.put_u16(packet.packet_id);
385
386 if protocol_version == ProtocolVersion::V500 {
387 bytes.put_u8(packet.reason_code as u8);
388
389 let property_length = packet.property_size(protocol_version);
390 encode_variable_int(property_length, bytes);
391
392 packet.reason_string.encode(bytes);
393 packet.user_properties.encode(bytes);
394 }
395}
396
397fn encode_publish_complete(
398 packet: &PublishCompletePacket,
399 bytes: &mut BytesMut,
400 protocol_version: ProtocolVersion,
401) {
402 bytes.put_u16(packet.packet_id);
403
404 if protocol_version == ProtocolVersion::V500 {
405 bytes.put_u8(packet.reason_code as u8);
406
407 let property_length = packet.property_size(protocol_version);
408 encode_variable_int(property_length, bytes);
409
410 packet.reason_string.encode(bytes);
411 packet.user_properties.encode(bytes);
412 }
413}
414
415fn encode_subscribe(
416 packet: &SubscribePacket,
417 bytes: &mut BytesMut,
418 protocol_version: ProtocolVersion,
419) {
420 bytes.put_u16(packet.packet_id);
421
422 if protocol_version == ProtocolVersion::V500 {
423 let property_length = packet.property_size(protocol_version);
424 encode_variable_int(property_length, bytes);
425
426 packet.subscription_identifier.encode(bytes);
427 packet.user_properties.encode(bytes);
428 }
429
430 for topic in &packet.subscription_topics {
431 encode_string(&topic.topic_filter.to_string(), bytes);
432
433 let mut options_byte = 0b0000_0000;
434 let retain_handling_byte = topic.retain_handling as u8;
435 options_byte |= (retain_handling_byte & 0b0000_0011) << 4;
436
437 if topic.retain_as_published {
438 options_byte |= 0b0000_1000;
439 }
440
441 if topic.no_local {
442 options_byte |= 0b0000_0100;
443 }
444
445 let qos_byte = topic.maximum_qos as u8;
446 options_byte |= qos_byte & 0b0000_0011;
447
448 bytes.put_u8(options_byte);
449 }
450}
451
452fn encode_subscribe_ack(
453 packet: &SubscribeAckPacket,
454 bytes: &mut BytesMut,
455 protocol_version: ProtocolVersion,
456) {
457 bytes.put_u16(packet.packet_id);
458
459 if protocol_version == ProtocolVersion::V500 {
460 let property_length = packet.property_size(protocol_version);
461 encode_variable_int(property_length, bytes);
462
463 packet.reason_string.encode(bytes);
464 packet.user_properties.encode(bytes);
465 }
466
467 for code in &packet.reason_codes {
468 bytes.put_u8((*code) as u8);
469 }
470}
471
472fn encode_unsubscribe(
473 packet: &UnsubscribePacket,
474 bytes: &mut BytesMut,
475 protocol_version: ProtocolVersion,
476) {
477 bytes.put_u16(packet.packet_id);
478
479 if protocol_version == ProtocolVersion::V500 {
480 let property_length = packet.property_size(protocol_version);
481 encode_variable_int(property_length, bytes);
482
483 packet.user_properties.encode(bytes);
484 }
485
486 for topic_filter in &packet.topic_filters {
487 encode_string(&topic_filter.to_string(), bytes);
488 }
489}
490
491fn encode_unsubscribe_ack(
492 packet: &UnsubscribeAckPacket,
493 bytes: &mut BytesMut,
494 protocol_version: ProtocolVersion,
495) {
496 bytes.put_u16(packet.packet_id);
497
498 if protocol_version == ProtocolVersion::V500 {
499 let property_length = packet.property_size(protocol_version);
500 encode_variable_int(property_length, bytes);
501
502 packet.reason_string.encode(bytes);
503 packet.user_properties.encode(bytes);
504 }
505
506 for code in &packet.reason_codes {
507 bytes.put_u8((*code) as u8);
508 }
509}
510
511fn encode_disconnect(
512 packet: &DisconnectPacket,
513 bytes: &mut BytesMut,
514 protocol_version: ProtocolVersion,
515) {
516 if protocol_version == ProtocolVersion::V500 {
517 bytes.put_u8(packet.reason_code as u8);
518
519 let property_length = packet.property_size(protocol_version);
520 encode_variable_int(property_length, bytes);
521
522 packet.session_expiry_interval.encode(bytes);
523 packet.reason_string.encode(bytes);
524 packet.user_properties.encode(bytes);
525 packet.server_reference.encode(bytes);
526 }
527}
528
529fn encode_authenticate(
530 packet: &AuthenticatePacket,
531 bytes: &mut BytesMut,
532 protocol_version: ProtocolVersion,
533) {
534 bytes.put_u8(packet.reason_code as u8);
535
536 if protocol_version == ProtocolVersion::V500 {
537 let property_length = packet.property_size(protocol_version);
538 encode_variable_int(property_length, bytes);
539
540 packet.authentication_method.encode(bytes);
541 packet.authentication_data.encode(bytes);
542 packet.reason_string.encode(bytes);
543 packet.user_properties.encode(bytes);
544 }
545}
546
547pub fn encode_mqtt(packet: &Packet, bytes: &mut BytesMut, protocol_version: ProtocolVersion) {
548 let remaining_length = packet.calculate_size(protocol_version);
549 let packet_size =
550 1 + VariableByteInt(remaining_length).calculate_size(protocol_version) + remaining_length;
551 bytes.reserve(packet_size as usize);
552
553 let first_byte = packet.to_byte();
554 let mut first_byte_val = (first_byte << 4) & 0b1111_0000;
555 first_byte_val |= packet.fixed_header_flags();
556
557 bytes.put_u8(first_byte_val);
558 encode_variable_int(remaining_length, bytes);
559
560 match packet {
561 Packet::Connect(p) => encode_connect(p, bytes, protocol_version),
562 Packet::ConnectAck(p) => encode_connect_ack(p, bytes, protocol_version),
563 Packet::Publish(p) => encode_publish(p, bytes, protocol_version),
564 Packet::PublishAck(p) => encode_publish_ack(p, bytes, protocol_version),
565 Packet::PublishReceived(p) => encode_publish_received(p, bytes, protocol_version),
566 Packet::PublishRelease(p) => encode_publish_release(p, bytes, protocol_version),
567 Packet::PublishComplete(p) => encode_publish_complete(p, bytes, protocol_version),
568 Packet::Subscribe(p) => encode_subscribe(p, bytes, protocol_version),
569 Packet::SubscribeAck(p) => encode_subscribe_ack(p, bytes, protocol_version),
570 Packet::Unsubscribe(p) => encode_unsubscribe(p, bytes, protocol_version),
571 Packet::UnsubscribeAck(p) => encode_unsubscribe_ack(p, bytes, protocol_version),
572 Packet::PingRequest => {},
573 Packet::PingResponse => {},
574 Packet::Disconnect(p) => encode_disconnect(p, bytes, protocol_version),
575 Packet::Authenticate(p) => encode_authenticate(p, bytes, protocol_version),
576 }
577}
578
#[cfg(test)]
mod tests {
    use crate::{decoder::*, encoder::*, types::*};
    use bytes::BytesMut;

    /// Encodes `packet` as MQTT v5, decodes the resulting bytes again and
    /// asserts that the decoded packet equals the input.
    fn assert_roundtrip(packet: Packet) {
        let mut bytes = BytesMut::new();
        encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
        let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();

        assert_eq!(packet, decoded);
    }

    #[test]
    fn connect_roundtrip() {
        assert_roundtrip(Packet::Connect(ConnectPacket {
            protocol_name: "MQTT".to_string(),
            protocol_version: ProtocolVersion::V500,
            clean_start: true,
            keep_alive: 200,

            session_expiry_interval: None,
            receive_maximum: None,
            maximum_packet_size: None,
            topic_alias_maximum: None,
            request_response_information: None,
            request_problem_information: None,
            user_properties: vec![],
            authentication_method: None,
            authentication_data: None,

            client_id: "test_client".to_string(),
            will: None,
            user_name: None,
            password: None,
        }));
    }

    #[test]
    fn connect_ack_roundtrip() {
        assert_roundtrip(Packet::ConnectAck(ConnectAckPacket {
            session_present: false,
            reason_code: ConnectReason::Success,

            session_expiry_interval: None,
            receive_maximum: None,
            maximum_qos: None,
            retain_available: None,
            maximum_packet_size: None,
            assigned_client_identifier: None,
            topic_alias_maximum: None,
            reason_string: None,
            user_properties: vec![],
            wildcard_subscription_available: None,
            subscription_identifiers_available: None,
            shared_subscription_available: None,
            server_keep_alive: None,
            response_information: None,
            server_reference: None,
            authentication_method: None,
            authentication_data: None,
        }));
    }

    #[test]
    fn publish_roundtrip() {
        assert_roundtrip(Packet::Publish(PublishPacket {
            is_duplicate: false,
            qos: QoS::AtLeastOnce,
            retain: false,

            topic: "test_topic".parse().unwrap(),
            packet_id: Some(42),

            payload_format_indicator: None,
            message_expiry_interval: None,
            topic_alias: None,
            response_topic: None,
            correlation_data: None,
            user_properties: vec![],
            subscription_identifiers: Vec::new(),
            content_type: None,

            payload: vec![22; 100].into(),
        }));
    }

    #[test]
    fn publish_ack_roundtrip() {
        assert_roundtrip(Packet::PublishAck(PublishAckPacket {
            packet_id: 1500,
            reason_code: PublishAckReason::Success,

            reason_string: None,
            user_properties: vec![],
        }));
    }

    #[test]
    fn publish_received_roundtrip() {
        assert_roundtrip(Packet::PublishReceived(PublishReceivedPacket {
            packet_id: 1500,
            reason_code: PublishReceivedReason::Success,

            reason_string: None,
            user_properties: vec![],
        }));
    }

    #[test]
    fn publish_release_roundtrip() {
        assert_roundtrip(Packet::PublishRelease(PublishReleasePacket {
            packet_id: 1500,
            reason_code: PublishReleaseReason::Success,

            reason_string: None,
            user_properties: vec![],
        }));
    }

    #[test]
    fn publish_complete_roundtrip() {
        assert_roundtrip(Packet::PublishComplete(PublishCompletePacket {
            packet_id: 1500,
            reason_code: PublishCompleteReason::Success,

            reason_string: None,
            user_properties: vec![],
        }));
    }

    #[test]
    fn subscribe_roundtrip() {
        assert_roundtrip(Packet::Subscribe(SubscribePacket {
            packet_id: 4500,

            subscription_identifier: None,
            user_properties: vec![],

            subscription_topics: vec![SubscriptionTopic {
                topic_filter: "test_topic".parse().unwrap(),
                maximum_qos: QoS::AtLeastOnce,
                no_local: false,
                retain_as_published: false,
                retain_handling: RetainHandling::SendAtSubscribeTime,
            }],
        }));
    }

    #[test]
    fn subscribe_ack_roundtrip() {
        assert_roundtrip(Packet::SubscribeAck(SubscribeAckPacket {
            packet_id: 1234,

            reason_string: None,
            user_properties: vec![],

            reason_codes: vec![SubscribeAckReason::GrantedQoSZero],
        }));
    }

    #[test]
    fn unsubscribe_roundtrip() {
        assert_roundtrip(Packet::Unsubscribe(UnsubscribePacket {
            packet_id: 1234,

            user_properties: vec![],

            topic_filters: vec!["test_topic".parse().unwrap()],
        }));
    }

    #[test]
    fn unsubscribe_ack_roundtrip() {
        assert_roundtrip(Packet::UnsubscribeAck(UnsubscribeAckPacket {
            packet_id: 4321,

            reason_string: None,
            user_properties: vec![],

            reason_codes: vec![UnsubscribeAckReason::Success],
        }));
    }

    #[test]
    fn ping_request_roundtrip() {
        assert_roundtrip(Packet::PingRequest);
    }

    #[test]
    fn ping_response_roundtrip() {
        assert_roundtrip(Packet::PingResponse);
    }

    #[test]
    fn disconnect_roundtrip() {
        assert_roundtrip(Packet::Disconnect(DisconnectPacket {
            reason_code: DisconnectReason::NormalDisconnection,

            session_expiry_interval: None,
            reason_string: None,
            user_properties: vec![],
            server_reference: None,
        }));
    }

    #[test]
    fn authenticate_roundtrip() {
        assert_roundtrip(Packet::Authenticate(AuthenticatePacket {
            reason_code: AuthenticateReason::Success,

            authentication_method: None,
            authentication_data: None,
            reason_string: None,
            user_properties: vec![],
        }));
    }
}