1use crate::types::{
2 properties::*, AuthenticatePacket, ConnectAckPacket, ConnectPacket, DisconnectPacket, Encode,
3 Packet, PropertySize, ProtocolVersion, PublishAckPacket, PublishCompletePacket, PublishPacket,
4 PublishReceivedPacket, PublishReleasePacket, SubscribeAckPacket, SubscribePacket,
5 UnsubscribeAckPacket, UnsubscribePacket, VariableByteInt,
6};
7use bytes::{BufMut, BytesMut};
8
9fn encode_variable_int(value: u32, bytes: &mut BytesMut) -> usize {
10 let mut x = value;
11 let mut byte_counter = 0;
12
13 loop {
14 let mut encoded_byte: u8 = (x % 128) as u8;
15 x /= 128;
16
17 if x > 0 {
18 encoded_byte |= 128;
19 }
20
21 bytes.put_u8(encoded_byte);
22
23 byte_counter += 1;
24
25 if x == 0 {
26 break;
27 }
28 }
29
30 byte_counter
31}
32
33fn encode_string(value: &str, bytes: &mut BytesMut) {
34 bytes.put_u16(value.len() as u16);
35 bytes.put_slice(value.as_bytes());
36}
37
38fn encode_binary_data(value: &[u8], bytes: &mut BytesMut) {
39 bytes.put_u16(value.len() as u16);
40 bytes.put_slice(value);
41}
42
43impl Encode for PayloadFormatIndicator {
44 fn encode(&self, bytes: &mut BytesMut) {
45 bytes.put_u8(PropertyType::PayloadFormatIndicator as u8);
46 bytes.put_u8(self.0);
47 }
48}
49impl Encode for MessageExpiryInterval {
50 fn encode(&self, bytes: &mut BytesMut) {
51 bytes.put_u8(PropertyType::MessageExpiryInterval as u8);
52 bytes.put_u32(self.0);
53 }
54}
55impl Encode for ContentType {
56 fn encode(&self, bytes: &mut BytesMut) {
57 bytes.put_u8(PropertyType::ContentType as u8);
58 encode_string(&self.0, bytes);
59 }
60}
61impl Encode for ResponseTopic {
62 fn encode(&self, bytes: &mut BytesMut) {
63 bytes.put_u8(PropertyType::ResponseTopic as u8);
64 encode_string(&self.0, bytes);
65 }
66}
67impl Encode for CorrelationData {
68 fn encode(&self, bytes: &mut BytesMut) {
69 bytes.put_u8(PropertyType::CorrelationData as u8);
70 encode_binary_data(&self.0, bytes);
71 }
72}
73impl Encode for SubscriptionIdentifier {
74 fn encode(&self, bytes: &mut BytesMut) {
75 bytes.put_u8(PropertyType::SubscriptionIdentifier as u8);
76 encode_variable_int((self.0).0, bytes);
77 }
78}
79impl Encode for SessionExpiryInterval {
80 fn encode(&self, bytes: &mut BytesMut) {
81 bytes.put_u8(PropertyType::SessionExpiryInterval as u8);
82 bytes.put_u32(self.0);
83 }
84}
85impl Encode for AssignedClientIdentifier {
86 fn encode(&self, bytes: &mut BytesMut) {
87 bytes.put_u8(PropertyType::AssignedClientIdentifier as u8);
88 encode_string(&self.0, bytes);
89 }
90}
91impl Encode for ServerKeepAlive {
92 fn encode(&self, bytes: &mut BytesMut) {
93 bytes.put_u8(PropertyType::ServerKeepAlive as u8);
94 bytes.put_u16(self.0)
95 }
96}
97impl Encode for AuthenticationMethod {
98 fn encode(&self, bytes: &mut BytesMut) {
99 bytes.put_u8(PropertyType::AuthenticationMethod as u8);
100 encode_string(&self.0, bytes);
101 }
102}
103impl Encode for AuthenticationData {
104 fn encode(&self, bytes: &mut BytesMut) {
105 bytes.put_u8(PropertyType::AuthenticationData as u8);
106 encode_binary_data(&self.0, bytes);
107 }
108}
109impl Encode for RequestProblemInformation {
110 fn encode(&self, bytes: &mut BytesMut) {
111 bytes.put_u8(PropertyType::RequestProblemInformation as u8);
112 bytes.put_u8(self.0);
113 }
114}
115impl Encode for WillDelayInterval {
116 fn encode(&self, bytes: &mut BytesMut) {
117 bytes.put_u8(PropertyType::WillDelayInterval as u8);
118 bytes.put_u32(self.0);
119 }
120}
121impl Encode for RequestResponseInformation {
122 fn encode(&self, bytes: &mut BytesMut) {
123 bytes.put_u8(PropertyType::RequestResponseInformation as u8);
124 bytes.put_u8(self.0);
125 }
126}
127impl Encode for ResponseInformation {
128 fn encode(&self, bytes: &mut BytesMut) {
129 bytes.put_u8(PropertyType::ResponseInformation as u8);
130 encode_string(&self.0, bytes);
131 }
132}
133impl Encode for ServerReference {
134 fn encode(&self, bytes: &mut BytesMut) {
135 bytes.put_u8(PropertyType::ServerReference as u8);
136 encode_string(&self.0, bytes);
137 }
138}
139impl Encode for ReasonString {
140 fn encode(&self, bytes: &mut BytesMut) {
141 bytes.put_u8(PropertyType::ReasonString as u8);
142 encode_string(&self.0, bytes);
143 }
144}
145impl Encode for ReceiveMaximum {
146 fn encode(&self, bytes: &mut BytesMut) {
147 bytes.put_u8(PropertyType::ReceiveMaximum as u8);
148 bytes.put_u16(self.0);
149 }
150}
151impl Encode for TopicAliasMaximum {
152 fn encode(&self, bytes: &mut BytesMut) {
153 bytes.put_u8(PropertyType::TopicAliasMaximum as u8);
154 bytes.put_u16(self.0);
155 }
156}
157impl Encode for TopicAlias {
158 fn encode(&self, bytes: &mut BytesMut) {
159 bytes.put_u8(PropertyType::TopicAlias as u8);
160 bytes.put_u16(self.0);
161 }
162}
163impl Encode for MaximumQos {
164 fn encode(&self, bytes: &mut BytesMut) {
165 bytes.put_u8(PropertyType::MaximumQos as u8);
166 bytes.put_u8(self.0 as u8);
167 }
168}
169impl Encode for RetainAvailable {
170 fn encode(&self, bytes: &mut BytesMut) {
171 bytes.put_u8(PropertyType::RetainAvailable as u8);
172 bytes.put_u8(self.0);
173 }
174}
175impl Encode for UserProperty {
176 fn encode(&self, bytes: &mut BytesMut) {
177 bytes.put_u8(PropertyType::UserProperty as u8);
178 encode_string(&self.0, bytes);
179 encode_string(&self.1, bytes);
180 }
181}
182impl Encode for MaximumPacketSize {
183 fn encode(&self, bytes: &mut BytesMut) {
184 bytes.put_u8(PropertyType::MaximumPacketSize as u8);
185 bytes.put_u32(self.0);
186 }
187}
188impl Encode for WildcardSubscriptionAvailable {
189 fn encode(&self, bytes: &mut BytesMut) {
190 bytes.put_u8(PropertyType::WildcardSubscriptionAvailable as u8);
191 bytes.put_u8(self.0);
192 }
193}
194impl Encode for SubscriptionIdentifierAvailable {
195 fn encode(&self, bytes: &mut BytesMut) {
196 bytes.put_u8(PropertyType::SubscriptionIdentifierAvailable as u8);
197 bytes.put_u8(self.0);
198 }
199}
200impl Encode for SharedSubscriptionAvailable {
201 fn encode(&self, bytes: &mut BytesMut) {
202 bytes.put_u8(PropertyType::SharedSubscriptionAvailable as u8);
203 bytes.put_u8(self.0);
204 }
205}
206
207fn encode_connect(packet: &ConnectPacket, bytes: &mut BytesMut, protocol_version: ProtocolVersion) {
208 encode_string(&packet.protocol_name, bytes);
209 bytes.put_u8(packet.protocol_version as u8);
210
211 let mut connect_flags: u8 = 0b0000_0000;
212
213 if packet.user_name.is_some() {
214 connect_flags |= 0b1000_0000;
215 }
216
217 if packet.password.is_some() {
218 connect_flags |= 0b0100_0000;
219 }
220
221 if let Some(will) = &packet.will {
222 if will.should_retain {
223 connect_flags |= 0b0100_0000;
224 }
225
226 let qos_byte: u8 = will.qos as u8;
227 connect_flags |= (qos_byte & 0b0000_0011) << 3;
228 connect_flags |= 0b0000_0100;
229 }
230
231 if packet.clean_start {
232 connect_flags |= 0b0000_0010;
233 }
234
235 bytes.put_u8(connect_flags);
236 bytes.put_u16(packet.keep_alive);
237
238 if protocol_version == ProtocolVersion::V500 {
239 let property_length = packet.property_size(protocol_version);
240 encode_variable_int(property_length, bytes);
241
242 packet.session_expiry_interval.encode(bytes);
243 packet.receive_maximum.encode(bytes);
244 packet.maximum_packet_size.encode(bytes);
245 packet.topic_alias_maximum.encode(bytes);
246 packet.request_response_information.encode(bytes);
247 packet.request_problem_information.encode(bytes);
248 packet.user_properties.encode(bytes);
249 packet.authentication_method.encode(bytes);
250 packet.authentication_data.encode(bytes);
251 }
252
253 encode_string(&packet.client_id, bytes);
254
255 if let Some(will) = &packet.will {
256 if protocol_version == ProtocolVersion::V500 {
257 let property_length = will.property_size(protocol_version);
258 encode_variable_int(property_length, bytes);
259
260 will.will_delay_interval.encode(bytes);
261 will.payload_format_indicator.encode(bytes);
262 will.message_expiry_interval.encode(bytes);
263 will.content_type.encode(bytes);
264 will.response_topic.encode(bytes);
265 will.correlation_data.encode(bytes);
266 will.user_properties.encode(bytes);
267 }
268
269 encode_string(&will.topic, bytes);
270 encode_binary_data(&will.payload, bytes);
271 }
272
273 if let Some(user_name) = &packet.user_name {
274 encode_string(&user_name, bytes);
275 }
276
277 if let Some(password) = &packet.password {
278 encode_string(&password, bytes);
279 }
280}
281
282fn encode_connect_ack(
283 packet: &ConnectAckPacket,
284 bytes: &mut BytesMut,
285 protocol_version: ProtocolVersion,
286) {
287 let mut connect_ack_flags: u8 = 0b0000_0000;
288 if packet.session_present {
289 connect_ack_flags |= 0b0000_0001;
290 }
291
292 bytes.put_u8(connect_ack_flags);
293 bytes.put_u8(packet.reason_code as u8);
294
295 if protocol_version == ProtocolVersion::V500 {
296 let property_length = packet.property_size(protocol_version);
297 encode_variable_int(property_length, bytes);
298
299 packet.session_expiry_interval.encode(bytes);
300 packet.receive_maximum.encode(bytes);
301 packet.maximum_qos.encode(bytes);
302 packet.retain_available.encode(bytes);
303 packet.maximum_packet_size.encode(bytes);
304 packet.assigned_client_identifier.encode(bytes);
305 packet.topic_alias_maximum.encode(bytes);
306 packet.reason_string.encode(bytes);
307 packet.user_properties.encode(bytes);
308 packet.wildcard_subscription_available.encode(bytes);
309 packet.subscription_identifiers_available.encode(bytes);
310 packet.shared_subscription_available.encode(bytes);
311 packet.server_keep_alive.encode(bytes);
312 packet.response_information.encode(bytes);
313 packet.server_reference.encode(bytes);
314 packet.authentication_method.encode(bytes);
315 packet.authentication_data.encode(bytes);
316 }
317}
318
319fn encode_publish(packet: &PublishPacket, bytes: &mut BytesMut, protocol_version: ProtocolVersion) {
320 encode_string(&packet.topic.to_string(), bytes);
321
322 if let Some(packet_id) = packet.packet_id {
323 bytes.put_u16(packet_id);
324 }
325
326 if protocol_version == ProtocolVersion::V500 {
327 let property_length = packet.property_size(protocol_version);
328 encode_variable_int(property_length, bytes);
329
330 packet.payload_format_indicator.encode(bytes);
331 packet.message_expiry_interval.encode(bytes);
332 packet.topic_alias.encode(bytes);
333 packet.response_topic.encode(bytes);
334 packet.correlation_data.encode(bytes);
335 packet.user_properties.encode(bytes);
336 packet.subscription_identifier.encode(bytes);
337 packet.content_type.encode(bytes);
338 }
339
340 bytes.put_slice(&packet.payload);
341}
342
343fn encode_publish_ack(
344 packet: &PublishAckPacket,
345 bytes: &mut BytesMut,
346 protocol_version: ProtocolVersion,
347) {
348 bytes.put_u16(packet.packet_id);
349
350 if protocol_version == ProtocolVersion::V500 {
351 bytes.put_u8(packet.reason_code as u8);
352
353 let property_length = packet.property_size(protocol_version);
354 encode_variable_int(property_length, bytes);
355
356 packet.reason_string.encode(bytes);
357 packet.user_properties.encode(bytes);
358 }
359}
360
361fn encode_publish_received(
362 packet: &PublishReceivedPacket,
363 bytes: &mut BytesMut,
364 protocol_version: ProtocolVersion,
365) {
366 bytes.put_u16(packet.packet_id);
367 bytes.put_u8(packet.reason_code as u8);
368
369 if protocol_version == ProtocolVersion::V500 {
370 let property_length = packet.property_size(protocol_version);
371 encode_variable_int(property_length, bytes);
372
373 packet.reason_string.encode(bytes);
374 packet.user_properties.encode(bytes);
375 }
376}
377
378fn encode_publish_release(
379 packet: &PublishReleasePacket,
380 bytes: &mut BytesMut,
381 protocol_version: ProtocolVersion,
382) {
383 bytes.put_u16(packet.packet_id);
384 bytes.put_u8(packet.reason_code as u8);
385
386 if protocol_version == ProtocolVersion::V500 {
387 let property_length = packet.property_size(protocol_version);
388 encode_variable_int(property_length, bytes);
389
390 packet.reason_string.encode(bytes);
391 packet.user_properties.encode(bytes);
392 }
393}
394
395fn encode_publish_complete(
396 packet: &PublishCompletePacket,
397 bytes: &mut BytesMut,
398 protocol_version: ProtocolVersion,
399) {
400 bytes.put_u16(packet.packet_id);
401 bytes.put_u8(packet.reason_code as u8);
402
403 if protocol_version == ProtocolVersion::V500 {
404 let property_length = packet.property_size(protocol_version);
405 encode_variable_int(property_length, bytes);
406
407 packet.reason_string.encode(bytes);
408 packet.user_properties.encode(bytes);
409 }
410}
411
412fn encode_subscribe(
413 packet: &SubscribePacket,
414 bytes: &mut BytesMut,
415 protocol_version: ProtocolVersion,
416) {
417 bytes.put_u16(packet.packet_id);
418
419 if protocol_version == ProtocolVersion::V500 {
420 let property_length = packet.property_size(protocol_version);
421 encode_variable_int(property_length, bytes);
422
423 packet.subscription_identifier.encode(bytes);
424 packet.user_properties.encode(bytes);
425 }
426
427 for topic in &packet.subscription_topics {
428 encode_string(&topic.topic_filter.to_string(), bytes);
429
430 let mut options_byte = 0b0000_0000;
431 let retain_handling_byte = topic.retain_handling as u8;
432 options_byte |= (retain_handling_byte & 0b0000_0011) << 4;
433
434 if topic.retain_as_published {
435 options_byte |= 0b0000_1000;
436 }
437
438 if topic.no_local {
439 options_byte |= 0b0000_0100;
440 }
441
442 let qos_byte = topic.maximum_qos as u8;
443 options_byte |= qos_byte & 0b0000_0011;
444
445 bytes.put_u8(options_byte);
446 }
447}
448
449fn encode_subscribe_ack(
450 packet: &SubscribeAckPacket,
451 bytes: &mut BytesMut,
452 protocol_version: ProtocolVersion,
453) {
454 bytes.put_u16(packet.packet_id);
455
456 if protocol_version == ProtocolVersion::V500 {
457 let property_length = packet.property_size(protocol_version);
458 encode_variable_int(property_length, bytes);
459
460 packet.reason_string.encode(bytes);
461 packet.user_properties.encode(bytes);
462 }
463
464 for code in &packet.reason_codes {
465 bytes.put_u8((*code) as u8);
466 }
467}
468
469fn encode_unsubscribe(
470 packet: &UnsubscribePacket,
471 bytes: &mut BytesMut,
472 protocol_version: ProtocolVersion,
473) {
474 bytes.put_u16(packet.packet_id);
475
476 if protocol_version == ProtocolVersion::V500 {
477 let property_length = packet.property_size(protocol_version);
478 encode_variable_int(property_length, bytes);
479
480 packet.user_properties.encode(bytes);
481 }
482
483 for topic_filter in &packet.topic_filters {
484 encode_string(&topic_filter.to_string(), bytes);
485 }
486}
487
488fn encode_unsubscribe_ack(
489 packet: &UnsubscribeAckPacket,
490 bytes: &mut BytesMut,
491 protocol_version: ProtocolVersion,
492) {
493 bytes.put_u16(packet.packet_id);
494
495 if protocol_version == ProtocolVersion::V500 {
496 let property_length = packet.property_size(protocol_version);
497 encode_variable_int(property_length, bytes);
498
499 packet.reason_string.encode(bytes);
500 packet.user_properties.encode(bytes);
501 }
502
503 for code in &packet.reason_codes {
504 bytes.put_u8((*code) as u8);
505 }
506}
507
508fn encode_disconnect(
509 packet: &DisconnectPacket,
510 bytes: &mut BytesMut,
511 protocol_version: ProtocolVersion,
512) {
513 bytes.put_u8(packet.reason_code as u8);
514
515 if protocol_version == ProtocolVersion::V500 {
516 let property_length = packet.property_size(protocol_version);
517 encode_variable_int(property_length, bytes);
518
519 packet.session_expiry_interval.encode(bytes);
520 packet.reason_string.encode(bytes);
521 packet.user_properties.encode(bytes);
522 packet.server_reference.encode(bytes);
523 }
524}
525
526fn encode_authenticate(
527 packet: &AuthenticatePacket,
528 bytes: &mut BytesMut,
529 protocol_version: ProtocolVersion,
530) {
531 bytes.put_u8(packet.reason_code as u8);
532
533 if protocol_version == ProtocolVersion::V500 {
534 let property_length = packet.property_size(protocol_version);
535 encode_variable_int(property_length, bytes);
536
537 packet.authentication_method.encode(bytes);
538 packet.authentication_data.encode(bytes);
539 packet.reason_string.encode(bytes);
540 packet.user_properties.encode(bytes);
541 }
542}
543
544pub fn encode_mqtt(packet: &Packet, bytes: &mut BytesMut, protocol_version: ProtocolVersion) {
545 let remaining_length = packet.calculate_size(protocol_version);
546 let packet_size =
547 1 + VariableByteInt(remaining_length).calculate_size(protocol_version) + remaining_length;
548 bytes.reserve(packet_size as usize);
549
550 let first_byte = packet.to_byte();
551 let mut first_byte_val = (first_byte << 4) & 0b1111_0000;
552 first_byte_val |= packet.fixed_header_flags();
553
554 bytes.put_u8(first_byte_val);
555 encode_variable_int(remaining_length as u32, bytes);
556
557 match packet {
558 Packet::Connect(p) => encode_connect(p, bytes, protocol_version),
559 Packet::ConnectAck(p) => encode_connect_ack(p, bytes, protocol_version),
560 Packet::Publish(p) => encode_publish(p, bytes, protocol_version),
561 Packet::PublishAck(p) => encode_publish_ack(p, bytes, protocol_version),
562 Packet::PublishReceived(p) => encode_publish_received(p, bytes, protocol_version),
563 Packet::PublishRelease(p) => encode_publish_release(p, bytes, protocol_version),
564 Packet::PublishComplete(p) => encode_publish_complete(p, bytes, protocol_version),
565 Packet::Subscribe(p) => encode_subscribe(p, bytes, protocol_version),
566 Packet::SubscribeAck(p) => encode_subscribe_ack(p, bytes, protocol_version),
567 Packet::Unsubscribe(p) => encode_unsubscribe(p, bytes, protocol_version),
568 Packet::UnsubscribeAck(p) => encode_unsubscribe_ack(p, bytes, protocol_version),
569 Packet::PingRequest => {},
570 Packet::PingResponse => {},
571 Packet::Disconnect(p) => encode_disconnect(p, bytes, protocol_version),
572 Packet::Authenticate(p) => encode_authenticate(p, bytes, protocol_version),
573 }
574}
575
576#[cfg(test)]
577mod tests {
578 use crate::{decoder::*, encoder::*, types::*};
579 use bytes::BytesMut;
580
581 #[test]
582 fn connect_roundtrip() {
583 let packet = Packet::Connect(ConnectPacket {
584 protocol_name: "MQTT".to_string(),
585 protocol_version: ProtocolVersion::V500,
586 clean_start: true,
587 keep_alive: 200,
588
589 session_expiry_interval: None,
590 receive_maximum: None,
591 maximum_packet_size: None,
592 topic_alias_maximum: None,
593 request_response_information: None,
594 request_problem_information: None,
595 user_properties: vec![],
596 authentication_method: None,
597 authentication_data: None,
598
599 client_id: "test_client".to_string(),
600 will: None,
601 user_name: None,
602 password: None,
603 });
604
605 let mut bytes = BytesMut::new();
606 encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
607 let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
608
609 assert_eq!(packet, decoded);
610 }
611
612 #[test]
613 fn connect_ack_roundtrip() {
614 let packet = Packet::ConnectAck(ConnectAckPacket {
615 session_present: false,
616 reason_code: ConnectReason::Success,
617
618 session_expiry_interval: None,
619 receive_maximum: None,
620 maximum_qos: None,
621 retain_available: None,
622 maximum_packet_size: None,
623 assigned_client_identifier: None,
624 topic_alias_maximum: None,
625 reason_string: None,
626 user_properties: vec![],
627 wildcard_subscription_available: None,
628 subscription_identifiers_available: None,
629 shared_subscription_available: None,
630 server_keep_alive: None,
631 response_information: None,
632 server_reference: None,
633 authentication_method: None,
634 authentication_data: None,
635 });
636
637 let mut bytes = BytesMut::new();
638 encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
639 let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
640
641 assert_eq!(packet, decoded);
642 }
643
644 #[test]
645 fn publish_roundtrip() {
646 let packet = Packet::Publish(PublishPacket {
647 is_duplicate: false,
648 qos: QoS::AtLeastOnce,
649 retain: false,
650
651 topic: "test_topic".parse().unwrap(),
652 packet_id: Some(42),
653
654 payload_format_indicator: None,
655 message_expiry_interval: None,
656 topic_alias: None,
657 response_topic: None,
658 correlation_data: None,
659 user_properties: vec![],
660 subscription_identifier: None,
661 content_type: None,
662
663 payload: vec![22; 100].into(),
664 });
665
666 let mut bytes = BytesMut::new();
667 encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
668 let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
669
670 assert_eq!(packet, decoded);
671 }
672
673 #[test]
674 fn publish_ack_roundtrip() {
675 let packet = Packet::PublishAck(PublishAckPacket {
676 packet_id: 1500,
677 reason_code: PublishAckReason::Success,
678
679 reason_string: None,
680 user_properties: vec![],
681 });
682
683 let mut bytes = BytesMut::new();
684 encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
685 let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
686
687 assert_eq!(packet, decoded);
688 }
689
690 #[test]
691 fn publish_received_roundtrip() {
692 let packet = Packet::PublishReceived(PublishReceivedPacket {
693 packet_id: 1500,
694 reason_code: PublishReceivedReason::Success,
695
696 reason_string: None,
697 user_properties: vec![],
698 });
699
700 let mut bytes = BytesMut::new();
701 encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
702 let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
703
704 assert_eq!(packet, decoded);
705 }
706
707 #[test]
708 fn publish_release_roundtrip() {
709 let packet = Packet::PublishRelease(PublishReleasePacket {
710 packet_id: 1500,
711 reason_code: PublishReleaseReason::Success,
712
713 reason_string: None,
714 user_properties: vec![],
715 });
716
717 let mut bytes = BytesMut::new();
718 encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
719 let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
720
721 assert_eq!(packet, decoded);
722 }
723
724 #[test]
725 fn publish_complete_roundtrip() {
726 let packet = Packet::PublishComplete(PublishCompletePacket {
727 packet_id: 1500,
728 reason_code: PublishCompleteReason::Success,
729
730 reason_string: None,
731 user_properties: vec![],
732 });
733
734 let mut bytes = BytesMut::new();
735 encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
736 let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
737
738 assert_eq!(packet, decoded);
739 }
740
741 #[test]
742 fn subscribe_roundtrip() {
743 let packet = Packet::Subscribe(SubscribePacket {
744 packet_id: 4500,
745
746 subscription_identifier: None,
747 user_properties: vec![],
748
749 subscription_topics: vec![SubscriptionTopic {
750 topic_filter: "test_topic".parse().unwrap(),
751 maximum_qos: QoS::AtLeastOnce,
752 no_local: false,
753 retain_as_published: false,
754 retain_handling: RetainHandling::SendAtSubscribeTime,
755 }],
756 });
757
758 let mut bytes = BytesMut::new();
759 encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
760 let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
761
762 assert_eq!(packet, decoded);
763 }
764
765 #[test]
766 fn subscribe_ack_roundtrip() {
767 let packet = Packet::SubscribeAck(SubscribeAckPacket {
768 packet_id: 1234,
769
770 reason_string: None,
771 user_properties: vec![],
772
773 reason_codes: vec![SubscribeAckReason::GrantedQoSZero],
774 });
775
776 let mut bytes = BytesMut::new();
777 encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
778 let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
779
780 assert_eq!(packet, decoded);
781 }
782
783 #[test]
784 fn unsubscribe_roundtrip() {
785 let packet = Packet::Unsubscribe(UnsubscribePacket {
786 packet_id: 1234,
787
788 user_properties: vec![],
789
790 topic_filters: vec!["test_topic".parse().unwrap()],
791 });
792
793 let mut bytes = BytesMut::new();
794 encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
795 let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
796
797 assert_eq!(packet, decoded);
798 }
799
800 #[test]
801 fn unsubscribe_ack_roundtrip() {
802 let packet = Packet::UnsubscribeAck(UnsubscribeAckPacket {
803 packet_id: 4321,
804
805 reason_string: None,
806 user_properties: vec![],
807
808 reason_codes: vec![UnsubscribeAckReason::Success],
809 });
810
811 let mut bytes = BytesMut::new();
812 encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
813 let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
814
815 assert_eq!(packet, decoded);
816 }
817
818 #[test]
819 fn ping_request_roundtrip() {
820 let packet = Packet::PingRequest;
821 let mut bytes = BytesMut::new();
822 encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
823 let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
824
825 assert_eq!(packet, decoded);
826 }
827
828 #[test]
829 fn ping_response_roundtrip() {
830 let packet = Packet::PingResponse;
831 let mut bytes = BytesMut::new();
832 encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
833 let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
834
835 assert_eq!(packet, decoded);
836 }
837
838 #[test]
839 fn disconnect_roundtrip() {
840 let packet = Packet::Disconnect(DisconnectPacket {
841 reason_code: DisconnectReason::NormalDisconnection,
842
843 session_expiry_interval: None,
844 reason_string: None,
845 user_properties: vec![],
846 server_reference: None,
847 });
848 let mut bytes = BytesMut::new();
849 encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
850 let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
851
852 assert_eq!(packet, decoded);
853 }
854
855 #[test]
856 fn authenticate_roundtrip() {
857 let packet = Packet::Authenticate(AuthenticatePacket {
858 reason_code: AuthenticateReason::Success,
859
860 authentication_method: None,
861 authentication_data: None,
862 reason_string: None,
863 user_properties: vec![],
864 });
865 let mut bytes = BytesMut::new();
866 encode_mqtt(&packet, &mut bytes, ProtocolVersion::V500);
867 let decoded = decode_mqtt(&mut bytes, ProtocolVersion::V500).unwrap().unwrap();
868
869 assert_eq!(packet, decoded);
870 }
871}