1use alloc::{
23 string::{String, ToString},
24 vec::Vec,
25};
26use core::marker::PhantomData;
27
28use crate::mqtt::common::tracing::{error, info, trace, warn};
29use crate::mqtt::common::Cursor;
30use crate::mqtt::common::HashSet;
31use crate::mqtt::connection::event::{GenericEvent, TimerKind};
32use crate::mqtt::connection::GenericStore;
33
34use serde::Serialize;
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
37enum ConnectionStatus {
38 #[serde(rename = "disconnected")]
39 Disconnected,
40 #[serde(rename = "connecting")]
41 Connecting,
42 #[serde(rename = "connected")]
43 Connected,
44}
45use crate::mqtt::connection::packet_builder::{
46 PacketBuildResult, PacketBuilder, PacketData, RawPacket,
47};
48use crate::mqtt::connection::packet_id_manager::PacketIdManager;
49use crate::mqtt::connection::role;
50use crate::mqtt::connection::role::RoleType;
51use crate::mqtt::connection::sendable::Sendable;
52use crate::mqtt::connection::version::*;
53use crate::mqtt::packet::v3_1_1;
54use crate::mqtt::packet::v5_0;
55use crate::mqtt::packet::GenericPacket;
56use crate::mqtt::packet::GenericStorePacket;
57use crate::mqtt::packet::IsPacketId;
58use crate::mqtt::packet::Qos;
59use crate::mqtt::packet::ResponsePacket;
60use crate::mqtt::packet::{Property, TopicAliasRecv, TopicAliasSend};
61use crate::mqtt::prelude::GenericPacketTrait;
62use crate::mqtt::result_code::{
63 ConnectReasonCode, ConnectReturnCode, DisconnectReasonCode, MqttError, PubrecReasonCode,
64};
65
/// Packet-size value stored in the send/receive limit fields when no limit is in effect.
///
/// Equals `1 + 4 + 128^4` (268_435_461).
// NOTE(review): presumably 1 fixed-header byte + 4 length bytes + the remaining-length bound — confirm.
const MQTT_PACKET_SIZE_NO_LIMIT: u32 = 1 + 4 + 128u32.pow(4);
69
/// Converts a remaining-length value into a total size.
///
/// The result is `1 + w + remaining_length`, where `w` is 1, 2, 3 or 4 depending on
/// whether `remaining_length` is below 128, 16_384, 2_097_152, or not.
fn remaining_length_to_total_size(remaining_length: u32) -> u32 {
    let length_field_width: u32 = match remaining_length {
        0..=127 => 1,
        128..=16_383 => 2,
        16_384..=2_097_151 => 3,
        _ => 4,
    };

    1 + length_field_width + remaining_length
}
89
/// [`GenericEvent`] specialized to `u16` packet identifiers.
pub type Event = GenericEvent<u16>;
95
/// State for one MQTT connection endpoint.
///
/// `Role` is a compile-time marker (held only through `PhantomData`) that `send` inspects
/// to decide which packet types may be sent. `PacketIdType` is the packet-identifier type.
pub struct GenericConnection<Role, PacketIdType>
where
    Role: RoleType,
    PacketIdType: IsPacketId,
{
    /// Zero-sized marker tying the value to its `Role` type parameter.
    _marker: PhantomData<Role>,

    /// Version given to `new`; `send` rejects packets reporting a different version.
    protocol_version: Version,

    /// Registry of packet identifiers currently in use.
    pid_man: PacketIdManager<PacketIdType>,
    /// Ids released by `notify_closed` and cleared by `initialize`.
    // NOTE(review): presumably ids of SUBSCRIBE packets awaiting SUBACK — confirm.
    pid_suback: HashSet<PacketIdType>,
    /// Ids released by `notify_closed` and cleared by `initialize`.
    // NOTE(review): presumably ids of UNSUBSCRIBE packets awaiting UNSUBACK — confirm.
    pid_unsuback: HashSet<PacketIdType>,
    /// Ids of QoS 1 PUBLISH packets that were sent or restored.
    pid_puback: HashSet<PacketIdType>,
    /// Ids of QoS 2 PUBLISH packets that were sent or restored.
    pid_pubrec: HashSet<PacketIdType>,
    /// Ids of restored PUBREL packets.
    pid_pubcomp: HashSet<PacketIdType>,

    /// When set, `notify_closed` keeps publish-related ids and QoS 2 state, and
    /// QoS 1/2 publishes may be copied into `store` when sent.
    need_store: bool,
    /// Packets kept for (re)sending; see `send_stored` and `restore_packets`.
    store: GenericStore<PacketIdType>,

    /// Set via `set_offline_publish`; allows QoS 1/2 publishes to be accepted while not connected.
    offline_publish: bool,
    /// Set via `set_auto_pub_response`.
    auto_pub_response: bool,
    /// Set via `set_auto_ping_response`.
    auto_ping_response: bool,

    /// Set via `set_auto_map_topic_alias_send`.
    auto_map_topic_alias_send: bool,
    /// Set via `set_auto_replace_topic_alias_send`.
    auto_replace_topic_alias_send: bool,
    /// Created from a non-zero `TopicAliasMaximum` property in a CONNECT/CONNACK this side sends.
    topic_alias_recv: Option<TopicAliasRecv>,
    /// Alias table consulted when validating or resolving topic aliases on outgoing publishes.
    topic_alias_send: Option<TopicAliasSend>,

    /// Upper bound used by `get_receive_maximum_vacancy_for_send`; `None` means no bound known.
    publish_send_max: Option<u16>,
    /// Taken from the `ReceiveMaximum` property of a CONNECT/CONNACK this side sends.
    publish_recv_max: Option<u16>,
    /// Counter subtracted from `publish_send_max` to compute the remaining vacancy.
    publish_send_count: u16,

    /// Cleared by `initialize`.
    // NOTE(review): presumably ids of received publishes still awaiting our response — confirm.
    publish_recv: HashSet<PacketIdType>,

    /// Largest packet size this side may send; `MQTT_PACKET_SIZE_NO_LIMIT` when unrestricted.
    maximum_packet_size_send: u32,
    /// Taken from the `MaximumPacketSize` property of a CONNECT/CONNACK this side sends.
    maximum_packet_size_recv: u32,

    /// Current connection lifecycle state.
    status: ConnectionStatus,

    /// Interval for the PINGREQ send timer, in milliseconds; `None` when disabled.
    pingreq_send_interval_ms: Option<u64>,
    /// Timeout for receiving PINGREQ, in milliseconds.
    pingreq_recv_timeout_ms: Option<u64>,
    /// Timeout for receiving PINGRESP, in milliseconds; set via `set_pingresp_recv_timeout`.
    pingresp_recv_timeout_ms: Option<u64>,

    /// Returned by `get_qos2_publish_handled` and replaced by `restore_qos2_publish_handled`.
    qos2_publish_handled: HashSet<PacketIdType>,
    /// Ids of QoS 2 publishes sent by this side; queried by `is_publish_processing`.
    qos2_publish_processing: HashSet<PacketIdType>,

    /// Whether the PINGREQ send timer is considered armed.
    pingreq_send_set: bool,
    /// Whether the PINGREQ receive timer is considered armed.
    pingreq_recv_set: bool,
    /// Whether the PINGRESP receive timer is considered armed.
    pingresp_recv_set: bool,

    /// Incremental parser fed by `recv`.
    packet_builder: PacketBuilder,
    /// Recorded by `initialize`; `true` after this side sent a CONNECT.
    is_client: bool,
}
210
/// [`GenericConnection`] specialized to `u16` packet identifiers.
pub type Connection<Role> = GenericConnection<Role, u16>;
224
225impl<Role, PacketIdType> GenericConnection<Role, PacketIdType>
226where
227 Role: RoleType,
228 PacketIdType: IsPacketId,
229{
230 pub fn new(version: Version) -> Self {
253 Self {
254 _marker: PhantomData,
255 protocol_version: version,
256 pid_man: PacketIdManager::new(),
257 pid_suback: HashSet::default(),
258 pid_unsuback: HashSet::default(),
259 pid_puback: HashSet::default(),
260 pid_pubrec: HashSet::default(),
261 pid_pubcomp: HashSet::default(),
262 need_store: false,
263 store: GenericStore::new(),
264 offline_publish: false,
265 auto_pub_response: false,
266 auto_ping_response: false,
267 auto_map_topic_alias_send: false,
268 auto_replace_topic_alias_send: false,
269 topic_alias_recv: None,
270 topic_alias_send: None,
271 publish_send_max: None,
272 publish_recv_max: None,
273 publish_send_count: 0,
274 publish_recv: HashSet::default(),
275 maximum_packet_size_send: MQTT_PACKET_SIZE_NO_LIMIT,
276 maximum_packet_size_recv: MQTT_PACKET_SIZE_NO_LIMIT,
277 status: ConnectionStatus::Disconnected,
278 pingreq_send_interval_ms: None,
279 pingreq_recv_timeout_ms: None,
280 pingresp_recv_timeout_ms: None,
281 qos2_publish_handled: HashSet::default(),
282 qos2_publish_processing: HashSet::default(),
283 pingreq_send_set: false,
284 pingreq_recv_set: false,
285 pingresp_recv_set: false,
286 packet_builder: PacketBuilder::new(),
287 is_client: false,
288 }
289 }
290
/// Sends `packet` by delegating to its [`Sendable::dispatch_send`] implementation.
///
/// Only packet types that implement `Sendable<Role, PacketIdType>` are accepted, so the
/// role restriction is enforced by the trait bound rather than at run time.
/// Returns the events produced by the dispatch.
pub fn checked_send<T>(&mut self, packet: T) -> Vec<GenericEvent<PacketIdType>>
where
    T: Sendable<Role, PacketIdType>,
{
    packet.dispatch_send(self)
}
338
339 pub fn send(&mut self, packet: GenericPacket<PacketIdType>) -> Vec<GenericEvent<PacketIdType>> {
384 use core::any::TypeId;
385
386 let role_id = TypeId::of::<Role>();
387 let client_id = TypeId::of::<role::Client>();
388 let server_id = TypeId::of::<role::Server>();
389 let any_id = TypeId::of::<role::Any>();
390
391 let packet_version = packet.protocol_version();
393
394 if self.protocol_version != packet_version {
396 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
397 }
398
399 match packet {
400 GenericPacket::V3_1_1Connect(p) => {
402 if role_id == client_id || role_id == any_id {
403 self.process_send_v3_1_1_connect(p)
404 } else {
405 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
406 }
407 }
408 GenericPacket::V5_0Connect(p) => {
409 if role_id == client_id || role_id == any_id {
410 self.process_send_v5_0_connect(p)
411 } else {
412 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
413 }
414 }
415 GenericPacket::V3_1_1Connack(p) => {
417 if role_id == server_id || role_id == any_id {
418 self.process_send_v3_1_1_connack(p)
419 } else {
420 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
421 }
422 }
423 GenericPacket::V5_0Connack(p) => {
424 if role_id == server_id || role_id == any_id {
425 self.process_send_v5_0_connack(p)
426 } else {
427 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
428 }
429 }
430 GenericPacket::V3_1_1Publish(p) => self.process_send_v3_1_1_publish(p),
432 GenericPacket::V5_0Publish(p) => self.process_send_v5_0_publish(p),
433 GenericPacket::V3_1_1Puback(p) => self.process_send_v3_1_1_puback(p),
435 GenericPacket::V5_0Puback(p) => self.process_send_v5_0_puback(p),
436 GenericPacket::V3_1_1Pubrec(p) => self.process_send_v3_1_1_pubrec(p),
437 GenericPacket::V5_0Pubrec(p) => self.process_send_v5_0_pubrec(p),
438 GenericPacket::V3_1_1Pubrel(p) => self.process_send_v3_1_1_pubrel(p),
439 GenericPacket::V5_0Pubrel(p) => self.process_send_v5_0_pubrel(p),
440 GenericPacket::V3_1_1Pubcomp(p) => self.process_send_v3_1_1_pubcomp(p),
441 GenericPacket::V5_0Pubcomp(p) => self.process_send_v5_0_pubcomp(p),
442 GenericPacket::V3_1_1Subscribe(p) => {
444 if role_id == client_id || role_id == any_id {
445 self.process_send_v3_1_1_subscribe(p)
446 } else {
447 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
448 }
449 }
450 GenericPacket::V5_0Subscribe(p) => {
451 if role_id == client_id || role_id == any_id {
452 self.process_send_v5_0_subscribe(p)
453 } else {
454 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
455 }
456 }
457 GenericPacket::V3_1_1Unsubscribe(p) => {
458 if role_id == client_id || role_id == any_id {
459 self.process_send_v3_1_1_unsubscribe(p)
460 } else {
461 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
462 }
463 }
464 GenericPacket::V5_0Unsubscribe(p) => {
465 if role_id == client_id || role_id == any_id {
466 self.process_send_v5_0_unsubscribe(p)
467 } else {
468 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
469 }
470 }
471 GenericPacket::V3_1_1Suback(p) => {
473 if role_id == server_id || role_id == any_id {
474 self.process_send_v3_1_1_suback(p)
475 } else {
476 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
477 }
478 }
479 GenericPacket::V5_0Suback(p) => {
480 if role_id == server_id || role_id == any_id {
481 self.process_send_v5_0_suback(p)
482 } else {
483 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
484 }
485 }
486 GenericPacket::V3_1_1Unsuback(p) => {
487 if role_id == server_id || role_id == any_id {
488 self.process_send_v3_1_1_unsuback(p)
489 } else {
490 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
491 }
492 }
493 GenericPacket::V5_0Unsuback(p) => {
494 if role_id == server_id || role_id == any_id {
495 self.process_send_v5_0_unsuback(p)
496 } else {
497 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
498 }
499 }
500 GenericPacket::V3_1_1Pingreq(p) => {
502 if role_id == client_id || role_id == any_id {
503 self.process_send_v3_1_1_pingreq(p)
504 } else {
505 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
506 }
507 }
508 GenericPacket::V5_0Pingreq(p) => {
509 if role_id == client_id || role_id == any_id {
510 self.process_send_v5_0_pingreq(p)
511 } else {
512 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
513 }
514 }
515 GenericPacket::V3_1_1Pingresp(p) => {
517 if role_id == server_id || role_id == any_id {
518 self.process_send_v3_1_1_pingresp(p)
519 } else {
520 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
521 }
522 }
523 GenericPacket::V5_0Pingresp(p) => {
524 if role_id == server_id || role_id == any_id {
525 self.process_send_v5_0_pingresp(p)
526 } else {
527 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
528 }
529 }
530 GenericPacket::V3_1_1Disconnect(p) => {
532 if role_id == client_id || role_id == any_id {
533 self.process_send_v3_1_1_disconnect(p)
534 } else {
535 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
536 }
537 }
538 GenericPacket::V5_0Disconnect(p) => self.process_send_v5_0_disconnect(p),
540 GenericPacket::V5_0Auth(p) => self.process_send_v5_0_auth(p),
542 }
543 }
544
545 pub fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
594 let mut events = Vec::new();
595
596 match self.packet_builder.feed(data) {
597 PacketBuildResult::Complete(raw_packet) => {
598 events.extend(self.process_recv_packet(raw_packet));
599 }
600 PacketBuildResult::Incomplete => {}
601 PacketBuildResult::Error(e) => {
602 self.cancel_timers(&mut events);
603 events.push(GenericEvent::RequestClose);
604 events.push(GenericEvent::NotifyError(e));
605 }
606 }
607
608 events
609 }
610
611 pub fn notify_timer_fired(&mut self, kind: TimerKind) -> Vec<GenericEvent<PacketIdType>> {
628 let mut events = Vec::new();
629
630 match kind {
631 TimerKind::PingreqSend => {
632 self.pingreq_send_set = false;
634
635 if self.status == ConnectionStatus::Connected {
637 match self.protocol_version {
638 Version::V3_1_1 => {
639 if let Ok(pingreq) = v3_1_1::Pingreq::builder().build() {
640 events.extend(self.process_send_v3_1_1_pingreq(pingreq));
641 }
642 }
643 Version::V5_0 => {
644 if let Ok(pingreq) = v5_0::Pingreq::builder().build() {
645 events.extend(self.process_send_v5_0_pingreq(pingreq));
646 }
647 }
648 Version::Undetermined => {
649 unreachable!("Protocol version should be set before sending PINGREQ");
650 }
651 }
652 }
653 }
654 TimerKind::PingreqRecv => {
655 self.pingreq_recv_set = false;
657
658 match self.protocol_version {
659 Version::V3_1_1 => {
660 events.push(GenericEvent::RequestClose);
662 }
663 Version::V5_0 => {
664 if self.status == ConnectionStatus::Connected {
666 if let Ok(disconnect) = v5_0::Disconnect::builder()
667 .reason_code(DisconnectReasonCode::KeepAliveTimeout)
668 .build()
669 {
670 events.extend(self.process_send_v5_0_disconnect(disconnect));
671 }
672 }
673 }
674 Version::Undetermined => {
675 unreachable!("Protocol version should be set before receiving PINGREQ");
676 }
677 }
678 }
679 TimerKind::PingrespRecv => {
680 self.pingresp_recv_set = false;
682
683 match self.protocol_version {
684 Version::V3_1_1 => {
685 events.push(GenericEvent::RequestClose);
687 }
688 Version::V5_0 => {
689 if self.status == ConnectionStatus::Connected {
691 if let Ok(disconnect) = v5_0::Disconnect::builder()
692 .reason_code(DisconnectReasonCode::KeepAliveTimeout)
693 .build()
694 {
695 events.extend(self.process_send_v5_0_disconnect(disconnect));
696 }
697 }
698 }
699 Version::Undetermined => {
700 unreachable!("Protocol version should be set before receiving PINGRESP");
701 }
702 }
703 }
704 }
705
706 events
707 }
708
709 pub fn notify_closed(&mut self) -> Vec<GenericEvent<PacketIdType>> {
722 let mut events = Vec::new();
723
724 self.maximum_packet_size_send = MQTT_PACKET_SIZE_NO_LIMIT;
726 self.maximum_packet_size_recv = MQTT_PACKET_SIZE_NO_LIMIT;
727
728 self.status = ConnectionStatus::Disconnected;
730
731 self.topic_alias_send = None;
733 self.topic_alias_recv = None;
734
735 for packet_id in self.pid_suback.drain() {
737 if self.pid_man.is_used_id(packet_id) {
738 self.pid_man.release_id(packet_id);
739 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
740 }
741 }
742
743 for packet_id in self.pid_unsuback.drain() {
745 if self.pid_man.is_used_id(packet_id) {
746 self.pid_man.release_id(packet_id);
747 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
748 }
749 }
750
751 if !self.need_store {
753 self.qos2_publish_processing.clear();
754 self.qos2_publish_handled.clear();
755
756 for packet_id in self.pid_puback.drain() {
758 if self.pid_man.is_used_id(packet_id) {
759 self.pid_man.release_id(packet_id);
760 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
761 }
762 }
763
764 for packet_id in self.pid_pubrec.drain() {
766 if self.pid_man.is_used_id(packet_id) {
767 self.pid_man.release_id(packet_id);
768 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
769 }
770 }
771
772 for packet_id in self.pid_pubcomp.drain() {
774 if self.pid_man.is_used_id(packet_id) {
775 self.pid_man.release_id(packet_id);
776 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
777 }
778 }
779 }
780
781 self.cancel_timers(&mut events);
783
784 events
785 }
786
787 pub fn set_pingreq_send_interval(
800 &mut self,
801 duration_ms: u64,
802 ) -> Vec<GenericEvent<PacketIdType>> {
803 let mut events = Vec::new();
804
805 if duration_ms == 0 {
806 self.pingreq_send_interval_ms = None;
807 self.pingreq_send_set = false;
808 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
809 } else {
810 self.pingreq_send_interval_ms = Some(duration_ms);
811 self.pingreq_send_set = true;
812 events.push(GenericEvent::RequestTimerReset {
813 kind: TimerKind::PingreqSend,
814 duration_ms,
815 });
816 }
817
818 events
819 }
820
821 pub fn get_receive_maximum_vacancy_for_send(&self) -> Option<u16> {
830 self.publish_send_max
832 .map(|max| max.saturating_sub(self.publish_send_count))
833 }
834
835 pub fn set_offline_publish(&mut self, enable: bool) {
844 self.offline_publish = enable;
845 if self.offline_publish {
846 self.need_store = true;
847 }
848 }
849
/// Stores the `auto_pub_response` flag.
// NOTE(review): presumably enables automatic PUBACK/PUBREC/PUBREL/PUBCOMP replies — confirm in the receive path.
pub fn set_auto_pub_response(&mut self, enable: bool) {
    self.auto_pub_response = enable;
}
861
/// Stores the `auto_ping_response` flag.
// NOTE(review): presumably enables automatic PINGRESP replies to PINGREQ — confirm in the receive path.
pub fn set_auto_ping_response(&mut self, enable: bool) {
    self.auto_ping_response = enable;
}
872
/// Stores the `auto_map_topic_alias_send` flag.
///
/// The v5.0 publish send path consults this flag before `auto_replace_topic_alias_send`.
pub fn set_auto_map_topic_alias_send(&mut self, enable: bool) {
    self.auto_map_topic_alias_send = enable;
}
887
/// Stores the `auto_replace_topic_alias_send` flag.
///
/// The v5.0 publish send path consults this flag only when `auto_map_topic_alias_send` is off.
pub fn set_auto_replace_topic_alias_send(&mut self, enable: bool) {
    self.auto_replace_topic_alias_send = enable;
}
900
/// Stores the PINGRESP receive timeout in milliseconds; `None` clears it.
///
/// Only the stored value changes; no timer event is produced here.
pub fn set_pingresp_recv_timeout(&mut self, timeout_ms: Option<u64>) {
    self.pingresp_recv_timeout_ms = timeout_ms;
}
905
/// Asks the packet-id manager for an unused packet identifier.
///
/// # Errors
///
/// Forwards whatever error `PacketIdManager::acquire_unique_id` returns.
pub fn acquire_packet_id(&mut self) -> Result<PacketIdType, MqttError> {
    self.pid_man.acquire_unique_id()
}
914
/// Registers a caller-chosen `packet_id` with the packet-id manager.
///
/// # Errors
///
/// Forwards whatever error `PacketIdManager::register_id` returns.
// NOTE(review): `restore_packets` treats a failure here as "already been used" — confirm.
pub fn register_packet_id(&mut self, packet_id: PacketIdType) -> Result<(), MqttError> {
    self.pid_man.register_id(packet_id)
}
930
931 pub fn release_packet_id(
945 &mut self,
946 packet_id: PacketIdType,
947 ) -> Vec<GenericEvent<PacketIdType>> {
948 let mut events = Vec::new();
949
950 if self.pid_man.is_used_id(packet_id) {
951 self.pid_man.release_id(packet_id);
952 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
953 }
954
955 events
956 }
957
/// Returns a copy of the `qos2_publish_handled` set.
///
/// The result can later be passed to `restore_qos2_publish_handled`.
pub fn get_qos2_publish_handled(&self) -> HashSet<PacketIdType> {
    self.qos2_publish_handled.clone()
}
969
/// Replaces the `qos2_publish_handled` set with `pids`, discarding the previous contents.
pub fn restore_qos2_publish_handled(&mut self, pids: HashSet<PacketIdType>) {
    self.qos2_publish_handled = pids;
}
981
982 pub fn restore_packets(&mut self, packets: Vec<GenericStorePacket<PacketIdType>>) {
991 for packet in packets {
992 match &packet {
993 GenericStorePacket::V3_1_1Publish(p) => {
994 match p.qos() {
996 Qos::AtLeastOnce => {
997 self.pid_puback.insert(p.packet_id().unwrap());
998 }
999 Qos::ExactlyOnce => {
1000 self.pid_pubrec.insert(p.packet_id().unwrap());
1001 }
1002 _ => {
1003 warn!("QoS 0 packet found in store, skipping");
1005 continue;
1006 }
1007 }
1008 let packet_id = p.packet_id().unwrap();
1010 if self.pid_man.register_id(packet_id).is_ok() {
1011 if let Err(_e) = self.store.add(packet) {
1012 error!("Failed to add packet to store: {:?}", _e);
1013 }
1014 } else {
1015 error!("Packet ID {} has already been used. Skip it", packet_id);
1016 }
1017 }
1018 GenericStorePacket::V5_0Publish(p) => {
1019 match p.qos() {
1021 Qos::AtLeastOnce => {
1022 self.pid_puback.insert(p.packet_id().unwrap());
1023 }
1024 Qos::ExactlyOnce => {
1025 self.pid_pubrec.insert(p.packet_id().unwrap());
1026 }
1027 _ => {
1028 warn!("QoS 0 packet found in store, skipping");
1030 continue;
1031 }
1032 }
1033 let packet_id = p.packet_id().unwrap();
1035 if self.pid_man.register_id(packet_id).is_ok() {
1036 if let Err(_e) = self.store.add(packet) {
1037 error!("Failed to add packet to store: {:?}", _e);
1038 }
1039 } else {
1040 error!("Packet ID {} has already been used. Skip it", packet_id);
1041 }
1042 }
1043 GenericStorePacket::V3_1_1Pubrel(p) => {
1044 self.pid_pubcomp.insert(p.packet_id());
1046 let packet_id = p.packet_id();
1048 if self.pid_man.register_id(packet_id).is_ok() {
1049 if let Err(_e) = self.store.add(packet) {
1050 error!("Failed to add packet to store: {:?}", _e);
1051 }
1052 } else {
1053 error!("Packet ID {} has already been used. Skip it", packet_id);
1054 }
1055 }
1056 GenericStorePacket::V5_0Pubrel(p) => {
1057 self.pid_pubcomp.insert(p.packet_id());
1059 let packet_id = p.packet_id();
1061 if self.pid_man.register_id(packet_id).is_ok() {
1062 if let Err(_e) = self.store.add(packet) {
1063 error!("Failed to add packet to store: {:?}", _e);
1064 }
1065 } else {
1066 error!("Packet ID {} has already been used. Skip it", packet_id);
1067 }
1068 }
1069 }
1070 }
1071 }
1072
/// Returns the packets currently held in the store, as reported by `GenericStore::get_stored`.
pub fn get_stored_packets(&self) -> Vec<GenericStorePacket<PacketIdType>> {
    self.store.get_stored()
}
1084
/// Returns the protocol version held by this connection.
pub fn get_protocol_version(&self) -> Version {
    self.protocol_version
}
1093
/// Returns `true` if `packet_id` is in the `qos2_publish_processing` set.
///
/// Ids enter that set when this side sends a QoS 2 PUBLISH.
pub fn is_publish_processing(&self, packet_id: PacketIdType) -> bool {
    self.qos2_publish_processing.contains(&packet_id)
}
1106
1107 pub fn regulate_for_store(
1112 &self,
1113 mut packet: v5_0::GenericPublish<PacketIdType>,
1114 ) -> Result<v5_0::GenericPublish<PacketIdType>, MqttError> {
1115 if packet.topic_name().is_empty() {
1116 if let Some(topic_alias) = Self::get_topic_alias_from_props(packet.props()) {
1118 if let Some(ref topic_alias_send) = self.topic_alias_send {
1119 if let Some(topic) = topic_alias_send.peek(topic_alias) {
1120 packet = packet.remove_topic_alias_add_topic(topic.to_string())?;
1122 } else {
1123 return Err(MqttError::PacketNotRegulated);
1124 }
1125 } else {
1126 return Err(MqttError::PacketNotRegulated);
1127 }
1128 } else {
1129 return Err(MqttError::PacketNotRegulated);
1130 }
1131 } else {
1132 packet = packet.remove_topic_alias();
1134 }
1135
1136 Ok(packet)
1137 }
1138
1139 fn initialize(&mut self, is_client: bool) {
1153 self.publish_send_max = None;
1154 self.publish_recv_max = None;
1155 self.publish_send_count = 0;
1156 self.topic_alias_send = None;
1157 self.topic_alias_recv = None;
1158 self.publish_recv.clear();
1159 self.qos2_publish_processing.clear();
1160 self.need_store = false;
1161 self.pid_suback.clear();
1162 self.pid_unsuback.clear();
1163 self.is_client = is_client;
1164 }
1165
/// Drops all stored-session state: every packet id held by the packet-id manager, the
/// PUBACK/PUBREC/PUBCOMP tracking sets, and the packet store.
///
/// Called from the CONNECT send handlers when the packet requests a clean start.
fn clear_store_related(&mut self) {
    self.pid_man.clear();
    self.pid_puback.clear();
    self.pid_pubrec.clear();
    self.pid_pubcomp.clear();
    self.store.clear();
}
1173
1174 fn send_stored(&mut self) -> Vec<GenericEvent<PacketIdType>> {
1176 let mut events = Vec::new();
1177 self.store.for_each(|packet| {
1178 if packet.size() > self.maximum_packet_size_send as usize {
1179 let packet_id = packet.packet_id();
1180 self.pid_man.release_id(packet_id);
1181 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1182 return false; }
1184 events.push(GenericEvent::RequestSendPacket {
1185 packet: packet.clone().into(),
1186 release_packet_id_if_send_error: None,
1187 });
1188 true });
1190
1191 events
1192 }
1193
1194 fn validate_topic_alias(&mut self, topic_alias_opt: Option<u16>) -> Option<String> {
1206 let topic_alias = topic_alias_opt?;
1207
1208 if !self.validate_topic_alias_range(topic_alias) {
1209 return None;
1210 }
1211
1212 let topic_alias_send = self.topic_alias_send.as_mut()?;
1213 let topic = topic_alias_send.get(topic_alias)?;
1215
1216 Some(topic.to_string())
1217 }
1218
1219 fn validate_topic_alias_range(&self, topic_alias: u16) -> bool {
1231 let topic_alias_send = match &self.topic_alias_send {
1232 Some(tas) => tas,
1233 None => {
1234 error!("topic_alias is set but topic_alias_maximum is 0");
1235 return false;
1236 }
1237 };
1238
1239 if topic_alias == 0 || topic_alias > topic_alias_send.max() {
1240 error!("topic_alias is set but out of range");
1241 return false;
1242 }
1243
1244 true
1245 }
1246
1247 pub(crate) fn process_send_v3_1_1_connect(
1249 &mut self,
1250 packet: v3_1_1::Connect,
1251 ) -> Vec<GenericEvent<PacketIdType>> {
1252 info!("send connect v3.1.1: {packet}");
1253
1254 if self.status != ConnectionStatus::Disconnected {
1255 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1256 }
1257 if !self.validate_maximum_packet_size_send(packet.size()) {
1258 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1259 }
1260
1261 let mut events = Vec::new();
1262 self.initialize(true);
1263 self.status = ConnectionStatus::Connecting;
1264
1265 let keep_alive = packet.keep_alive();
1267 if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1268 self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1269 }
1270
1271 if packet.clean_start() {
1273 self.clear_store_related();
1274 } else {
1275 self.need_store = true;
1276 }
1277
1278 self.topic_alias_send = None;
1280
1281 events.push(GenericEvent::RequestSendPacket {
1282 packet: packet.into(),
1283 release_packet_id_if_send_error: None,
1284 });
1285 self.send_post_process(&mut events);
1286
1287 events
1288 }
1289
1290 pub(crate) fn process_send_v5_0_connect(
1292 &mut self,
1293 packet: v5_0::Connect,
1294 ) -> Vec<GenericEvent<PacketIdType>> {
1295 info!("send connect v5.0: {packet}");
1296 if !self.validate_maximum_packet_size_send(packet.size()) {
1297 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1298 }
1299 if self.status != ConnectionStatus::Disconnected {
1300 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1301 }
1302
1303 let mut events = Vec::new();
1304 self.initialize(true);
1305 self.status = ConnectionStatus::Connecting;
1306
1307 let keep_alive = packet.keep_alive();
1309 if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1310 self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1311 }
1312
1313 if packet.clean_start() {
1315 self.clear_store_related();
1316 }
1317
1318 for prop in packet.props() {
1320 match prop {
1321 Property::TopicAliasMaximum(val) => {
1322 if val.val() != 0 {
1323 self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1324 }
1325 }
1326 Property::ReceiveMaximum(val) => {
1327 debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1328 self.publish_recv_max = Some(val.val());
1329 }
1330 Property::MaximumPacketSize(val) => {
1331 debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1332 self.maximum_packet_size_recv = val.val();
1333 }
1334 Property::SessionExpiryInterval(val) => {
1335 if val.val() != 0 {
1336 self.need_store = true;
1337 }
1338 }
1339 _ => {
1340 }
1342 }
1343 }
1344
1345 events.push(GenericEvent::RequestSendPacket {
1346 packet: packet.into(),
1347 release_packet_id_if_send_error: None,
1348 });
1349 self.send_post_process(&mut events);
1350
1351 events
1352 }
1353
1354 pub(crate) fn process_send_v3_1_1_connack(
1355 &mut self,
1356 packet: v3_1_1::Connack,
1357 ) -> Vec<GenericEvent<PacketIdType>> {
1358 info!("send connack v3.1.1: {packet}");
1359 if self.status != ConnectionStatus::Connecting {
1360 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1361 }
1362 let mut events = Vec::new();
1363 if packet.return_code() == ConnectReturnCode::Accepted {
1364 self.status = ConnectionStatus::Connected;
1365 } else {
1366 self.status = ConnectionStatus::Disconnected;
1367 }
1368
1369 events.push(GenericEvent::RequestSendPacket {
1370 packet: packet.into(),
1371 release_packet_id_if_send_error: None,
1372 });
1373 events.extend(self.send_stored());
1374 self.send_post_process(&mut events);
1375
1376 events
1377 }
1378
1379 pub(crate) fn process_send_v5_0_connack(
1380 &mut self,
1381 packet: v5_0::Connack,
1382 ) -> Vec<GenericEvent<PacketIdType>> {
1383 info!("send connack v5.0: {packet}");
1384 if !self.validate_maximum_packet_size_send(packet.size()) {
1385 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1386 }
1387 if self.status != ConnectionStatus::Connecting {
1388 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1389 }
1390
1391 let mut events = Vec::new();
1392
1393 if packet.reason_code() == ConnectReasonCode::Success {
1394 self.status = ConnectionStatus::Connected;
1395
1396 for prop in packet.props() {
1398 match prop {
1399 Property::TopicAliasMaximum(val) => {
1400 if val.val() != 0 {
1401 self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1402 }
1403 }
1404 Property::ReceiveMaximum(val) => {
1405 debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1406 self.publish_recv_max = Some(val.val());
1407 }
1408 Property::MaximumPacketSize(val) => {
1409 debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1410 self.maximum_packet_size_recv = val.val();
1411 }
1412 _ => {
1413 }
1415 }
1416 }
1417 } else {
1418 self.status = ConnectionStatus::Disconnected;
1419 self.cancel_timers(&mut events);
1420 }
1421
1422 events.push(GenericEvent::RequestSendPacket {
1423 packet: packet.into(),
1424 release_packet_id_if_send_error: None,
1425 });
1426 events.extend(self.send_stored());
1427 self.send_post_process(&mut events);
1428
1429 events
1430 }
1431
1432 pub(crate) fn process_send_v3_1_1_publish(
1433 &mut self,
1434 packet: v3_1_1::GenericPublish<PacketIdType>,
1435 ) -> Vec<GenericEvent<PacketIdType>> {
1436 let mut events = Vec::new();
1437 let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1438
1439 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1440 let packet_id = packet.packet_id().unwrap();
1442 if self.status != ConnectionStatus::Connected
1443 && !self.need_store
1444 && !self.offline_publish
1445 {
1446 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1447 if self.pid_man.is_used_id(packet_id) {
1448 self.pid_man.release_id(packet_id);
1449 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1450 }
1451 return events;
1452 }
1453 if !self.pid_man.is_used_id(packet_id) {
1454 error!("packet_id {packet_id} must be acquired or registered");
1455 events.push(GenericEvent::NotifyError(
1456 MqttError::PacketIdentifierInvalid,
1457 ));
1458 return events;
1459 }
1460 if self.need_store
1461 && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1462 {
1463 let store_packet = packet.clone().set_dup(true);
1464 self.store.add(store_packet.try_into().unwrap()).unwrap();
1465 } else {
1466 release_packet_id_if_send_error = Some(packet_id);
1467 }
1468 if packet.qos() == Qos::ExactlyOnce {
1469 self.qos2_publish_processing.insert(packet_id);
1470 self.pid_pubrec.insert(packet_id);
1471 } else {
1472 self.pid_puback.insert(packet_id);
1473 }
1474 } else if self.status != ConnectionStatus::Connected {
1475 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1476 return events;
1477 }
1478
1479 if self.status == ConnectionStatus::Connected {
1480 events.push(GenericEvent::RequestSendPacket {
1481 packet: packet.into(),
1482 release_packet_id_if_send_error,
1483 });
1484 }
1485 self.send_post_process(&mut events);
1486
1487 events
1488 }
1489
1490 pub(crate) fn process_send_v5_0_publish(
1491 &mut self,
1492 mut packet: v5_0::GenericPublish<PacketIdType>,
1493 ) -> Vec<GenericEvent<PacketIdType>> {
1494 if !self.validate_maximum_packet_size_send(packet.size()) {
1495 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1496 }
1497
1498 let mut events = Vec::new();
1499 let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1500 let mut topic_alias_validated = false;
1501 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1502 let packet_id = packet.packet_id().unwrap();
1503 if self.status != ConnectionStatus::Connected
1504 && !self.need_store
1505 && !self.offline_publish
1506 {
1507 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1508 if self.pid_man.is_used_id(packet_id) {
1509 self.pid_man.release_id(packet_id);
1510 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1511 }
1512 return events;
1513 }
1514
1515 if !self.pid_man.is_used_id(packet_id) {
1517 error!("packet_id {packet_id} must be acquired or registered");
1518 events.push(GenericEvent::NotifyError(
1519 MqttError::PacketIdentifierInvalid,
1520 ));
1521 return events;
1522 }
1523
1524 if self.need_store
1525 && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1526 {
1527 let ta_opt = Self::get_topic_alias_from_props(packet.props());
1528 if packet.topic_name().is_empty() {
1529 let topic_opt = self.validate_topic_alias(ta_opt);
1531 if topic_opt.is_none() {
1532 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1533 if self.pid_man.is_used_id(packet_id) {
1534 self.pid_man.release_id(packet_id);
1535 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1536 }
1537 return events;
1538 }
1539 topic_alias_validated = true;
1540 let store_packet = packet
1541 .clone()
1542 .remove_topic_alias_add_topic(topic_opt.unwrap())
1543 .unwrap()
1544 .set_dup(true);
1545 self.store.add(store_packet.try_into().unwrap()).unwrap();
1547 } else {
1548 let store_packet = packet.clone().remove_topic_alias().set_dup(true);
1550 self.store.add(store_packet.try_into().unwrap()).unwrap();
1551 }
1552 } else {
1553 release_packet_id_if_send_error = Some(packet_id);
1554 }
1555 if packet.qos() == Qos::ExactlyOnce {
1556 self.qos2_publish_processing.insert(packet_id);
1557 self.pid_pubrec.insert(packet_id);
1558 } else {
1559 self.pid_puback.insert(packet_id);
1560 }
1561 } else if self.status != ConnectionStatus::Connected {
1562 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1563 return events;
1564 }
1565
1566 let packet_id_opt = packet.packet_id();
1567 let ta_opt = Self::get_topic_alias_from_props(packet.props());
1568 if packet.topic_name().is_empty() {
1569 if !topic_alias_validated && self.validate_topic_alias(ta_opt).is_none() {
1571 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1572 if let Some(packet_id) = packet_id_opt {
1573 if self.pid_man.is_used_id(packet_id) {
1574 self.pid_man.release_id(packet_id);
1575 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1576 }
1577 }
1578 return events;
1579 }
1580 } else if let Some(ta) = ta_opt {
1581 if self.validate_topic_alias_range(ta) {
1583 trace!(
1584 "topic alias: {} - {} is registered.",
1585 packet.topic_name(),
1586 ta
1587 );
1588 if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1589 topic_alias_send.insert_or_update(packet.topic_name(), ta);
1590 }
1591 } else {
1592 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1593 if let Some(packet_id) = packet_id_opt {
1594 if self.pid_man.is_used_id(packet_id) {
1595 self.pid_man.release_id(packet_id);
1596 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1597 }
1598 }
1599 return events;
1600 }
1601 } else if self.status == ConnectionStatus::Connected {
1602 if self.auto_map_topic_alias_send {
1604 if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1605 if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1606 trace!(
1607 "topic alias: {} - {} is found.",
1608 packet.topic_name(),
1609 found_ta
1610 );
1611 packet = packet.remove_topic_add_topic_alias(found_ta);
1612 } else {
1613 let lru_ta = topic_alias_send.get_lru_alias();
1614 topic_alias_send.insert_or_update(packet.topic_name(), lru_ta);
1615 packet = packet.remove_topic_add_topic_alias(lru_ta);
1616 }
1617 }
1618 } else if self.auto_replace_topic_alias_send {
1619 if let Some(ref topic_alias_send) = self.topic_alias_send {
1620 if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1621 trace!(
1622 "topic alias: {} - {} is found.",
1623 packet.topic_name(),
1624 found_ta
1625 );
1626 packet = packet.remove_topic_add_topic_alias(found_ta);
1627 }
1628 }
1629 }
1630 }
1631
1632 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1634 if let Some(max) = self.publish_send_max {
1635 if self.publish_send_count == max {
1636 events.push(GenericEvent::NotifyError(MqttError::ReceiveMaximumExceeded));
1637 if let Some(packet_id) = packet_id_opt {
1638 if self.pid_man.is_used_id(packet_id) {
1639 self.pid_man.release_id(packet_id);
1640 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1641 }
1642 }
1643 return events;
1644 }
1645 self.publish_send_count += 1;
1646 }
1647 }
1648
1649 if self.status == ConnectionStatus::Connected {
1650 events.push(GenericEvent::RequestSendPacket {
1651 packet: packet.into(),
1652 release_packet_id_if_send_error,
1653 });
1654 }
1655 self.send_post_process(&mut events);
1656
1657 events
1658 }
1659
1660 pub(crate) fn process_send_v3_1_1_puback(
1661 &mut self,
1662 packet: v3_1_1::GenericPuback<PacketIdType>,
1663 ) -> Vec<GenericEvent<PacketIdType>> {
1664 if self.status != ConnectionStatus::Connected {
1665 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1666 }
1667 let mut events = Vec::new();
1668
1669 events.push(GenericEvent::RequestSendPacket {
1670 packet: packet.into(),
1671 release_packet_id_if_send_error: None,
1672 });
1673 self.send_post_process(&mut events);
1674
1675 events
1676 }
1677
1678 pub(crate) fn process_send_v5_0_puback(
1679 &mut self,
1680 packet: v5_0::GenericPuback<PacketIdType>,
1681 ) -> Vec<GenericEvent<PacketIdType>> {
1682 if !self.validate_maximum_packet_size_send(packet.size()) {
1683 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1684 }
1685 if self.status != ConnectionStatus::Connected {
1686 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1687 }
1688
1689 let mut events = Vec::new();
1690 self.publish_recv.remove(&packet.packet_id());
1691
1692 events.push(GenericEvent::RequestSendPacket {
1693 packet: packet.into(),
1694 release_packet_id_if_send_error: None,
1695 });
1696 self.send_post_process(&mut events);
1697
1698 events
1699 }
1700
1701 pub(crate) fn process_send_v3_1_1_pubrec(
1702 &mut self,
1703 packet: v3_1_1::GenericPubrec<PacketIdType>,
1704 ) -> Vec<GenericEvent<PacketIdType>> {
1705 if self.status != ConnectionStatus::Connected {
1706 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1707 }
1708 let mut events = Vec::new();
1709
1710 events.push(GenericEvent::RequestSendPacket {
1711 packet: packet.into(),
1712 release_packet_id_if_send_error: None,
1713 });
1714 self.send_post_process(&mut events);
1715
1716 events
1717 }
1718
1719 pub(crate) fn process_send_v5_0_pubrec(
1720 &mut self,
1721 packet: v5_0::GenericPubrec<PacketIdType>,
1722 ) -> Vec<GenericEvent<PacketIdType>> {
1723 if !self.validate_maximum_packet_size_send(packet.size()) {
1724 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1725 }
1726 if self.status != ConnectionStatus::Connected {
1727 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1728 }
1729
1730 let mut events = Vec::new();
1731 let packet_id = packet.packet_id();
1732
1733 if let Some(rc) = packet.reason_code() {
1734 if rc.is_failure() {
1735 self.publish_recv.remove(&packet_id);
1736 self.qos2_publish_handled.remove(&packet_id);
1737 }
1738 }
1739
1740 events.push(GenericEvent::RequestSendPacket {
1741 packet: packet.into(),
1742 release_packet_id_if_send_error: None,
1743 });
1744 self.send_post_process(&mut events);
1745
1746 events
1747 }
1748
1749 pub(crate) fn process_send_v3_1_1_pubrel(
1750 &mut self,
1751 packet: v3_1_1::GenericPubrel<PacketIdType>,
1752 ) -> Vec<GenericEvent<PacketIdType>> {
1753 if self.status != ConnectionStatus::Connected && !self.need_store {
1754 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1755 }
1756 let mut events = Vec::new();
1757 let packet_id = packet.packet_id();
1758 if !self.pid_man.is_used_id(packet_id) {
1759 error!("packet_id {packet_id} must be acquired or registered");
1760 events.push(GenericEvent::NotifyError(
1761 MqttError::PacketIdentifierInvalid,
1762 ));
1763 return events;
1764 }
1765 if self.need_store {
1766 self.store.add(packet.clone().try_into().unwrap()).unwrap();
1767 }
1768
1769 if self.status == ConnectionStatus::Connected {
1770 self.pid_pubcomp.insert(packet_id);
1771 events.push(GenericEvent::RequestSendPacket {
1772 packet: packet.into(),
1773 release_packet_id_if_send_error: None,
1774 });
1775 }
1776 self.send_post_process(&mut events);
1777
1778 events
1779 }
1780
1781 pub(crate) fn process_send_v5_0_pubrel(
1782 &mut self,
1783 packet: v5_0::GenericPubrel<PacketIdType>,
1784 ) -> Vec<GenericEvent<PacketIdType>> {
1785 if !self.validate_maximum_packet_size_send(packet.size()) {
1786 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1787 }
1788 if self.status != ConnectionStatus::Connected && !self.need_store {
1789 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1790 }
1791
1792 let mut events = Vec::new();
1793 let packet_id = packet.packet_id();
1794 if !self.pid_man.is_used_id(packet_id) {
1795 error!("packet_id {packet_id} must be acquired or registered");
1796 events.push(GenericEvent::NotifyError(
1797 MqttError::PacketIdentifierInvalid,
1798 ));
1799 return events;
1800 }
1801 if self.need_store {
1802 self.store.add(packet.clone().try_into().unwrap()).unwrap();
1803 }
1804
1805 if self.status == ConnectionStatus::Connected {
1806 self.pid_pubcomp.insert(packet_id);
1807 events.push(GenericEvent::RequestSendPacket {
1808 packet: packet.into(),
1809 release_packet_id_if_send_error: None,
1810 });
1811 }
1812 self.send_post_process(&mut events);
1813
1814 events
1815 }
1816
1817 pub(crate) fn process_send_v3_1_1_pubcomp(
1818 &mut self,
1819 packet: v3_1_1::GenericPubcomp<PacketIdType>,
1820 ) -> Vec<GenericEvent<PacketIdType>> {
1821 if self.status != ConnectionStatus::Connected {
1822 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1823 }
1824 let mut events = Vec::new();
1825
1826 events.push(GenericEvent::RequestSendPacket {
1827 packet: packet.into(),
1828 release_packet_id_if_send_error: None,
1829 });
1830 self.send_post_process(&mut events);
1831
1832 events
1833 }
1834
1835 pub(crate) fn process_send_v5_0_pubcomp(
1836 &mut self,
1837 packet: v5_0::GenericPubcomp<PacketIdType>,
1838 ) -> Vec<GenericEvent<PacketIdType>> {
1839 if !self.validate_maximum_packet_size_send(packet.size()) {
1840 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1841 }
1842 if self.status != ConnectionStatus::Connected {
1843 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1844 }
1845
1846 let mut events = Vec::new();
1847 self.publish_recv.remove(&packet.packet_id());
1848
1849 events.push(GenericEvent::RequestSendPacket {
1850 packet: packet.into(),
1851 release_packet_id_if_send_error: None,
1852 });
1853 self.send_post_process(&mut events);
1854
1855 events
1856 }
1857
1858 pub(crate) fn process_send_v3_1_1_subscribe(
1859 &mut self,
1860 packet: v3_1_1::GenericSubscribe<PacketIdType>,
1861 ) -> Vec<GenericEvent<PacketIdType>> {
1862 let mut events = Vec::new();
1863 let packet_id = packet.packet_id();
1864 if self.status != ConnectionStatus::Connected {
1865 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1866 if self.pid_man.is_used_id(packet_id) {
1867 self.pid_man.release_id(packet_id);
1868 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1869 }
1870 return events;
1871 }
1872 if !self.pid_man.is_used_id(packet_id) {
1873 error!("packet_id {packet_id} must be acquired or registered");
1874 events.push(GenericEvent::NotifyError(
1875 MqttError::PacketIdentifierInvalid,
1876 ));
1877 return events;
1878 }
1879 self.pid_suback.insert(packet_id);
1880
1881 events.push(GenericEvent::RequestSendPacket {
1882 packet: packet.into(),
1883 release_packet_id_if_send_error: Some(packet_id),
1884 });
1885 self.send_post_process(&mut events);
1886
1887 events
1888 }
1889
1890 pub(crate) fn process_send_v5_0_subscribe(
1891 &mut self,
1892 packet: v5_0::GenericSubscribe<PacketIdType>,
1893 ) -> Vec<GenericEvent<PacketIdType>> {
1894 if !self.validate_maximum_packet_size_send(packet.size()) {
1895 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1896 }
1897
1898 let mut events = Vec::new();
1899 let packet_id = packet.packet_id();
1900 if self.status != ConnectionStatus::Connected {
1901 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1902 if self.pid_man.is_used_id(packet_id) {
1903 self.pid_man.release_id(packet_id);
1904 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1905 }
1906 return events;
1907 }
1908 if !self.pid_man.is_used_id(packet_id) {
1909 error!("packet_id {packet_id} must be acquired or registered");
1910 events.push(GenericEvent::NotifyError(
1911 MqttError::PacketIdentifierInvalid,
1912 ));
1913 return events;
1914 }
1915 self.pid_suback.insert(packet_id);
1916
1917 events.push(GenericEvent::RequestSendPacket {
1918 packet: packet.into(),
1919 release_packet_id_if_send_error: Some(packet_id),
1920 });
1921 self.send_post_process(&mut events);
1922
1923 events
1924 }
1925
1926 pub(crate) fn process_send_v3_1_1_suback(
1927 &mut self,
1928 packet: v3_1_1::GenericSuback<PacketIdType>,
1929 ) -> Vec<GenericEvent<PacketIdType>> {
1930 if self.status != ConnectionStatus::Connected {
1931 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1932 }
1933 let mut events = Vec::new();
1934 events.push(GenericEvent::RequestSendPacket {
1935 packet: packet.into(),
1936 release_packet_id_if_send_error: None,
1937 });
1938 self.send_post_process(&mut events);
1939
1940 events
1941 }
1942
1943 pub(crate) fn process_send_v5_0_suback(
1944 &mut self,
1945 packet: v5_0::GenericSuback<PacketIdType>,
1946 ) -> Vec<GenericEvent<PacketIdType>> {
1947 if !self.validate_maximum_packet_size_send(packet.size()) {
1948 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1949 }
1950 if self.status != ConnectionStatus::Connected {
1951 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1952 }
1953
1954 let mut events = Vec::new();
1955 events.push(GenericEvent::RequestSendPacket {
1956 packet: packet.into(),
1957 release_packet_id_if_send_error: None,
1958 });
1959 self.send_post_process(&mut events);
1960
1961 events
1962 }
1963
1964 pub(crate) fn process_send_v3_1_1_unsubscribe(
1965 &mut self,
1966 packet: v3_1_1::GenericUnsubscribe<PacketIdType>,
1967 ) -> Vec<GenericEvent<PacketIdType>> {
1968 let mut events = Vec::new();
1969 let packet_id = packet.packet_id();
1970 if self.status != ConnectionStatus::Connected {
1971 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1972 if self.pid_man.is_used_id(packet_id) {
1973 self.pid_man.release_id(packet_id);
1974 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1975 }
1976 return events;
1977 }
1978 if !self.pid_man.is_used_id(packet_id) {
1979 error!("packet_id {packet_id} must be acquired or registered");
1980 events.push(GenericEvent::NotifyError(
1981 MqttError::PacketIdentifierInvalid,
1982 ));
1983 return events;
1984 }
1985 self.pid_unsuback.insert(packet_id);
1986
1987 events.push(GenericEvent::RequestSendPacket {
1988 packet: packet.into(),
1989 release_packet_id_if_send_error: Some(packet_id),
1990 });
1991 self.send_post_process(&mut events);
1992
1993 events
1994 }
1995
1996 pub(crate) fn process_send_v5_0_unsubscribe(
1997 &mut self,
1998 packet: v5_0::GenericUnsubscribe<PacketIdType>,
1999 ) -> Vec<GenericEvent<PacketIdType>> {
2000 if !self.validate_maximum_packet_size_send(packet.size()) {
2001 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2002 }
2003
2004 let mut events = Vec::new();
2005 let packet_id = packet.packet_id();
2006 if self.status != ConnectionStatus::Connected {
2007 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
2008 if self.pid_man.is_used_id(packet_id) {
2009 self.pid_man.release_id(packet_id);
2010 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2011 }
2012 return events;
2013 }
2014 if !self.pid_man.is_used_id(packet_id) {
2015 error!("packet_id {packet_id} must be acquired or registered");
2016 events.push(GenericEvent::NotifyError(
2017 MqttError::PacketIdentifierInvalid,
2018 ));
2019 return events;
2020 }
2021 self.pid_unsuback.insert(packet_id);
2022
2023 events.push(GenericEvent::RequestSendPacket {
2024 packet: packet.into(),
2025 release_packet_id_if_send_error: Some(packet_id),
2026 });
2027 self.send_post_process(&mut events);
2028
2029 events
2030 }
2031
2032 pub(crate) fn process_send_v3_1_1_unsuback(
2033 &mut self,
2034 packet: v3_1_1::GenericUnsuback<PacketIdType>,
2035 ) -> Vec<GenericEvent<PacketIdType>> {
2036 if self.status != ConnectionStatus::Connected {
2037 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2038 }
2039 let mut events = Vec::new();
2040 events.push(GenericEvent::RequestSendPacket {
2041 packet: packet.into(),
2042 release_packet_id_if_send_error: None,
2043 });
2044 self.send_post_process(&mut events);
2045
2046 events
2047 }
2048
2049 pub(crate) fn process_send_v5_0_unsuback(
2050 &mut self,
2051 packet: v5_0::GenericUnsuback<PacketIdType>,
2052 ) -> Vec<GenericEvent<PacketIdType>> {
2053 if !self.validate_maximum_packet_size_send(packet.size()) {
2054 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2055 }
2056 if self.status != ConnectionStatus::Connected {
2057 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2058 }
2059
2060 let mut events = Vec::new();
2061 events.push(GenericEvent::RequestSendPacket {
2062 packet: packet.into(),
2063 release_packet_id_if_send_error: None,
2064 });
2065 self.send_post_process(&mut events);
2066
2067 events
2068 }
2069
2070 pub(crate) fn process_send_v3_1_1_pingreq(
2071 &mut self,
2072 packet: v3_1_1::Pingreq,
2073 ) -> Vec<GenericEvent<PacketIdType>> {
2074 if self.status != ConnectionStatus::Connected {
2075 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2076 }
2077 let mut events = Vec::new();
2078 events.push(GenericEvent::RequestSendPacket {
2079 packet: packet.into(),
2080 release_packet_id_if_send_error: None,
2081 });
2082 if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2083 self.pingreq_send_set = true;
2084 events.push(GenericEvent::RequestTimerReset {
2085 kind: TimerKind::PingrespRecv,
2086 duration_ms: timeout_ms,
2087 });
2088 }
2089 self.send_post_process(&mut events);
2090
2091 events
2092 }
2093
2094 pub(crate) fn process_send_v5_0_pingreq(
2095 &mut self,
2096 packet: v5_0::Pingreq,
2097 ) -> Vec<GenericEvent<PacketIdType>> {
2098 if !self.validate_maximum_packet_size_send(packet.size()) {
2099 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2100 }
2101 if self.status != ConnectionStatus::Connected {
2102 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2103 }
2104
2105 let mut events = Vec::new();
2106 events.push(GenericEvent::RequestSendPacket {
2107 packet: packet.into(),
2108 release_packet_id_if_send_error: None,
2109 });
2110 if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2111 self.pingreq_send_set = true;
2112 events.push(GenericEvent::RequestTimerReset {
2113 kind: TimerKind::PingrespRecv,
2114 duration_ms: timeout_ms,
2115 });
2116 }
2117 self.send_post_process(&mut events);
2118
2119 events
2120 }
2121
2122 pub(crate) fn process_send_v3_1_1_pingresp(
2123 &mut self,
2124 packet: v3_1_1::Pingresp,
2125 ) -> Vec<GenericEvent<PacketIdType>> {
2126 if self.status != ConnectionStatus::Connected {
2127 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2128 }
2129 let mut events = Vec::new();
2130 events.push(GenericEvent::RequestSendPacket {
2131 packet: packet.into(),
2132 release_packet_id_if_send_error: None,
2133 });
2134 self.send_post_process(&mut events);
2135
2136 events
2137 }
2138
2139 pub(crate) fn process_send_v5_0_pingresp(
2140 &mut self,
2141 packet: v5_0::Pingresp,
2142 ) -> Vec<GenericEvent<PacketIdType>> {
2143 if !self.validate_maximum_packet_size_send(packet.size()) {
2144 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2145 }
2146 if self.status != ConnectionStatus::Connected {
2147 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2148 }
2149
2150 let mut events = Vec::new();
2151 events.push(GenericEvent::RequestSendPacket {
2152 packet: packet.into(),
2153 release_packet_id_if_send_error: None,
2154 });
2155 self.send_post_process(&mut events);
2156
2157 events
2158 }
2159
2160 pub(crate) fn process_send_v3_1_1_disconnect(
2161 &mut self,
2162 packet: v3_1_1::Disconnect,
2163 ) -> Vec<GenericEvent<PacketIdType>> {
2164 if self.status != ConnectionStatus::Connected {
2165 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2166 }
2167 let mut events = Vec::new();
2168 self.status = ConnectionStatus::Disconnected;
2169 self.cancel_timers(&mut events);
2170 events.push(GenericEvent::RequestSendPacket {
2171 packet: packet.into(),
2172 release_packet_id_if_send_error: None,
2173 });
2174 events.push(GenericEvent::RequestClose);
2175
2176 events
2177 }
2178
2179 pub(crate) fn process_send_v5_0_disconnect(
2180 &mut self,
2181 packet: v5_0::Disconnect,
2182 ) -> Vec<GenericEvent<PacketIdType>> {
2183 if !self.validate_maximum_packet_size_send(packet.size()) {
2184 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2185 }
2186 if self.status != ConnectionStatus::Connected {
2187 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2188 }
2189
2190 let mut events = Vec::new();
2191 self.status = ConnectionStatus::Disconnected;
2192 self.cancel_timers(&mut events);
2193 events.push(GenericEvent::RequestSendPacket {
2194 packet: packet.into(),
2195 release_packet_id_if_send_error: None,
2196 });
2197 events.push(GenericEvent::RequestClose);
2198
2199 events
2200 }
2201
2202 pub(crate) fn process_send_v5_0_auth(
2203 &mut self,
2204 packet: v5_0::Auth,
2205 ) -> Vec<GenericEvent<PacketIdType>> {
2206 if !self.validate_maximum_packet_size_send(packet.size()) {
2207 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2208 }
2209 if self.status == ConnectionStatus::Disconnected {
2210 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2211 }
2212
2213 let mut events = Vec::new();
2214 events.push(GenericEvent::RequestSendPacket {
2215 packet: packet.into(),
2216 release_packet_id_if_send_error: None,
2217 });
2218 self.send_post_process(&mut events);
2219
2220 events
2221 }
2222
2223 fn send_post_process(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
2224 if self.is_client {
2225 if let Some(timeout_ms) = self.pingreq_send_interval_ms {
2226 self.pingreq_send_set = true;
2227 events.push(GenericEvent::RequestTimerReset {
2228 kind: TimerKind::PingreqSend,
2229 duration_ms: timeout_ms,
2230 });
2231 }
2232 }
2233 }
2234
2235 fn validate_maximum_packet_size_send(&self, size: usize) -> bool {
2236 if size > self.maximum_packet_size_send as usize {
2237 error!("packet size over maximum_packet_size for sending");
2238 return false;
2239 }
2240 true
2241 }
2242
2243 fn process_recv_packet(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
2244 let mut events = Vec::new();
2245
2246 let total_size = remaining_length_to_total_size(raw_packet.remaining_length());
2248 if total_size > self.maximum_packet_size_recv {
2249 let disconnect_packet = v5_0::Disconnect::builder()
2255 .reason_code(DisconnectReasonCode::PacketTooLarge)
2256 .build()
2257 .unwrap();
2258 events.extend(self.process_send_v5_0_disconnect(disconnect_packet));
2260 events.push(GenericEvent::NotifyError(MqttError::PacketTooLarge));
2261 return events;
2262 }
2263
2264 let packet_type = raw_packet.packet_type();
2265 let _flags = raw_packet.flags();
2266 match self.protocol_version {
2267 Version::V3_1_1 => {
2268 match packet_type {
2269 1 => {
2270 events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2272 }
2273 2 => {
2274 events.extend(self.process_recv_v3_1_1_connack(raw_packet));
2276 }
2277 3 => {
2278 events.extend(self.process_recv_v3_1_1_publish(raw_packet));
2280 }
2281 4 => {
2282 events.extend(self.process_recv_v3_1_1_puback(raw_packet));
2284 }
2285 5 => {
2286 events.extend(self.process_recv_v3_1_1_pubrec(raw_packet));
2288 }
2289 6 => {
2290 events.extend(self.process_recv_v3_1_1_pubrel(raw_packet));
2292 }
2293 7 => {
2294 events.extend(self.process_recv_v3_1_1_pubcomp(raw_packet));
2296 }
2297 8 => {
2298 events.extend(self.process_recv_v3_1_1_subscribe(raw_packet));
2300 }
2301 9 => {
2302 events.extend(self.process_recv_v3_1_1_suback(raw_packet));
2304 }
2305 10 => {
2306 events.extend(self.process_recv_v3_1_1_unsubscribe(raw_packet));
2308 }
2309 11 => {
2310 events.extend(self.process_recv_v3_1_1_unsuback(raw_packet));
2312 }
2313 12 => {
2314 events.extend(self.process_recv_v3_1_1_pingreq(raw_packet));
2316 }
2317 13 => {
2318 events.extend(self.process_recv_v3_1_1_pingresp(raw_packet));
2320 }
2321 14 => {
2322 events.extend(self.process_recv_v3_1_1_disconnect(raw_packet));
2324 }
2325 _ => {
2327 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2328 }
2329 }
2330 }
2331 Version::V5_0 => {
2332 match packet_type {
2333 1 => {
2334 events.extend(self.process_recv_v5_0_connect(raw_packet));
2336 }
2337 2 => {
2338 events.extend(self.process_recv_v5_0_connack(raw_packet));
2340 }
2341 3 => {
2342 events.extend(self.process_recv_v5_0_publish(raw_packet));
2344 }
2345 4 => {
2346 events.extend(self.process_recv_v5_0_puback(raw_packet));
2348 }
2349 5 => {
2350 events.extend(self.process_recv_v5_0_pubrec(raw_packet));
2352 }
2353 6 => {
2354 events.extend(self.process_recv_v5_0_pubrel(raw_packet));
2356 }
2357 7 => {
2358 events.extend(self.process_recv_v5_0_pubcomp(raw_packet));
2360 }
2361 8 => {
2362 events.extend(self.process_recv_v5_0_subscribe(raw_packet));
2364 }
2365 9 => {
2366 events.extend(self.process_recv_v5_0_suback(raw_packet));
2368 }
2369 10 => {
2370 events.extend(self.process_recv_v5_0_unsubscribe(raw_packet));
2372 }
2373 11 => {
2374 events.extend(self.process_recv_v5_0_unsuback(raw_packet));
2376 }
2377 12 => {
2378 events.extend(self.process_recv_v5_0_pingreq(raw_packet));
2380 }
2381 13 => {
2382 events.extend(self.process_recv_v5_0_pingresp(raw_packet));
2384 }
2385 14 => {
2386 events.extend(self.process_recv_v5_0_disconnect(raw_packet));
2388 }
2389 15 => {
2390 events.extend(self.process_recv_v5_0_auth(raw_packet));
2392 }
2393 _ => {
2395 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2396 }
2397 }
2398 }
2399 Version::Undetermined => {
2400 match packet_type {
2401 1 => {
2402 if raw_packet.remaining_length() < 7 {
2404 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2405 return events;
2406 }
2407 match raw_packet.data_as_slice()[6] {
2408 4 => {
2410 self.protocol_version = Version::V3_1_1;
2411 events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2412 }
2413 5 => {
2414 self.protocol_version = Version::V5_0;
2415 events.extend(self.process_recv_v5_0_connect(raw_packet));
2416 }
2417 _ => {
2418 events.push(GenericEvent::NotifyError(
2419 MqttError::UnsupportedProtocolVersion,
2420 ));
2421 }
2422 }
2423 }
2424 _ => {
2425 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2426 }
2427 }
2428 }
2429 }
2430
2431 events
2432 }
2433
2434 fn process_recv_v3_1_1_connect(
2435 &mut self,
2436 raw_packet: RawPacket,
2437 ) -> Vec<GenericEvent<PacketIdType>> {
2438 let mut events = Vec::new();
2439 match v3_1_1::Connect::parse(raw_packet.data_as_slice()) {
2440 Ok((packet, _)) => {
2441 if self.status != ConnectionStatus::Disconnected {
2442 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2443 return events;
2444 }
2445 self.initialize(false);
2446 self.status = ConnectionStatus::Connecting;
2447 if packet.keep_alive() > 0 {
2448 self.pingreq_recv_timeout_ms =
2449 Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2450 }
2451 if packet.clean_session() {
2452 self.clear_store_related();
2453 } else {
2454 self.need_store = true;
2455 }
2456 events.extend(self.refresh_pingreq_recv());
2457 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2458 }
2459 Err(e) => {
2460 if self.status == ConnectionStatus::Disconnected {
2461 self.status = ConnectionStatus::Connecting;
2462 let rc = match e {
2463 MqttError::ClientIdentifierNotValid => {
2464 ConnectReturnCode::IdentifierRejected
2465 }
2466 MqttError::BadUserNameOrPassword => {
2467 ConnectReturnCode::BadUserNameOrPassword
2468 }
2469 MqttError::UnsupportedProtocolVersion => {
2470 ConnectReturnCode::UnacceptableProtocolVersion
2471 }
2472 _ => ConnectReturnCode::NotAuthorized, };
2474 let connack = v3_1_1::Connack::builder().return_code(rc).build().unwrap();
2475 let connack_events = self.process_send_v3_1_1_connack(connack);
2476 events.extend(connack_events);
2477 } else {
2478 events.push(GenericEvent::RequestClose);
2479 }
2480 events.push(GenericEvent::NotifyError(e));
2481 }
2482 }
2483
2484 events
2485 }
2486
2487 fn process_recv_v5_0_connect(
2488 &mut self,
2489 raw_packet: RawPacket,
2490 ) -> Vec<GenericEvent<PacketIdType>> {
2491 let mut events = Vec::new();
2492 match v5_0::Connect::parse(raw_packet.data_as_slice()) {
2493 Ok((packet, _)) => {
2494 if self.status != ConnectionStatus::Disconnected {
2495 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2496 return events;
2497 }
2498 self.initialize(false);
2499 self.status = ConnectionStatus::Connecting;
2500 if packet.keep_alive() > 0 {
2501 self.pingreq_recv_timeout_ms =
2502 Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2503 }
2504 if packet.clean_start() {
2505 self.clear_store_related();
2506 }
2507 packet.props().iter().for_each(|prop| match prop {
2508 Property::TopicAliasMaximum(p) => {
2509 self.topic_alias_send = Some(TopicAliasSend::new(p.val()));
2510 }
2511 Property::ReceiveMaximum(p) => {
2512 self.publish_send_max = Some(p.val());
2513 }
2514 Property::MaximumPacketSize(p) => {
2515 self.maximum_packet_size_send = p.val();
2516 }
2517 Property::SessionExpiryInterval(p) => {
2518 if p.val() != 0 {
2519 self.need_store = true;
2520 }
2521 }
2522 _ => {}
2523 });
2524 events.extend(self.refresh_pingreq_recv());
2525 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2526 }
2527 Err(e) => {
2528 if self.status == ConnectionStatus::Disconnected {
2529 self.status = ConnectionStatus::Connecting;
2530 let rc = match e {
2531 MqttError::ClientIdentifierNotValid => {
2532 ConnectReasonCode::ClientIdentifierNotValid
2533 }
2534 MqttError::BadUserNameOrPassword => {
2535 ConnectReasonCode::BadAuthenticationMethod
2536 }
2537 MqttError::UnsupportedProtocolVersion => {
2538 ConnectReasonCode::UnsupportedProtocolVersion
2539 }
2540 _ => ConnectReasonCode::UnspecifiedError,
2541 };
2542 let connack = v5_0::Connack::builder().reason_code(rc).build().unwrap();
2543 let connack_events = self.process_send_v5_0_connack(connack);
2544 events.extend(connack_events);
2545 } else {
2546 let disconnect = v5_0::Disconnect::builder()
2547 .reason_code(DisconnectReasonCode::ProtocolError)
2548 .build()
2549 .unwrap();
2550 let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2551 events.extend(disconnect_events);
2552 }
2553 events.push(GenericEvent::NotifyError(e));
2554 }
2555 }
2556
2557 events
2558 }
2559
2560 fn process_recv_v3_1_1_connack(
2561 &mut self,
2562 raw_packet: RawPacket,
2563 ) -> Vec<GenericEvent<PacketIdType>> {
2564 let mut events = Vec::new();
2565
2566 match v3_1_1::Connack::parse(raw_packet.data_as_slice()) {
2567 Ok((packet, _consumed)) => {
2568 if packet.return_code() == ConnectReturnCode::Accepted {
2569 self.status = ConnectionStatus::Connected;
2570 if packet.session_present() {
2571 events.extend(self.send_stored());
2572 } else {
2573 self.clear_store_related();
2574 }
2575 }
2576 events.push(GenericEvent::NotifyPacketReceived(
2577 GenericPacket::V3_1_1Connack(packet),
2578 ));
2579 }
2580 Err(e) => {
2581 events.push(GenericEvent::RequestClose);
2582 events.push(GenericEvent::NotifyError(e));
2583 }
2584 }
2585
2586 events
2587 }
2588
2589 fn process_recv_v5_0_connack(
2590 &mut self,
2591 raw_packet: RawPacket,
2592 ) -> Vec<GenericEvent<PacketIdType>> {
2593 let mut events = Vec::new();
2594
2595 match v5_0::Connack::parse(raw_packet.data_as_slice()) {
2596 Ok((packet, _consumed)) => {
2597 if packet.reason_code() == ConnectReasonCode::Success {
2598 self.status = ConnectionStatus::Connected;
2599
2600 for prop in packet.props() {
2602 match prop {
2603 Property::TopicAliasMaximum(val) => {
2604 if val.val() > 0 {
2605 self.topic_alias_send = Some(TopicAliasSend::new(val.val()));
2606 }
2607 }
2608 Property::ReceiveMaximum(val) => {
2609 assert!(val.val() != 0);
2610 self.publish_send_max = Some(val.val());
2611 }
2612 Property::MaximumPacketSize(val) => {
2613 assert!(val.val() != 0);
2614 self.maximum_packet_size_send = val.val();
2615 }
2616 Property::ServerKeepAlive(val) => {
2617 let timeout_ms = (val.val() as u64) * 1000;
2619 self.pingreq_send_interval_ms = Some(timeout_ms);
2620 }
2621 _ => {
2622 }
2624 }
2625 }
2626
2627 if packet.session_present() {
2628 events.extend(self.send_stored());
2629 } else {
2630 self.clear_store_related();
2631 }
2632 }
2633 events.push(GenericEvent::NotifyPacketReceived(
2634 GenericPacket::V5_0Connack(packet),
2635 ));
2636 }
2637 Err(e) => {
2638 if self.status == ConnectionStatus::Connected {
2639 let disconnect = v5_0::Disconnect::builder()
2640 .reason_code(e.into())
2641 .build()
2642 .unwrap();
2643 let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2644 events.extend(disconnect_events);
2645 }
2646 events.push(GenericEvent::NotifyError(e));
2647 }
2648 }
2649
2650 events
2651 }
2652
2653 fn process_recv_v3_1_1_publish(
2654 &mut self,
2655 raw_packet: RawPacket,
2656 ) -> Vec<GenericEvent<PacketIdType>> {
2657 let mut events = Vec::new();
2658
2659 let flags = raw_packet.flags();
2660 match &raw_packet.data {
2661 PacketData::Publish(arc) => {
2662 match v3_1_1::GenericPublish::parse(flags, arc.clone()) {
2663 Ok((packet, _consumed)) => {
2664 match packet.qos() {
2665 Qos::AtMostOnce => {
2666 events.extend(self.refresh_pingreq_recv());
2667 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2668 }
2669 Qos::AtLeastOnce => {
2670 let packet_id = packet.packet_id().unwrap();
2671 if self.status == ConnectionStatus::Connected
2672 && self.auto_pub_response
2673 {
2674 let puback = v3_1_1::GenericPuback::builder()
2676 .packet_id(packet_id)
2677 .build()
2678 .unwrap();
2679 events.extend(self.process_send_v3_1_1_puback(puback));
2680 }
2681 events.extend(self.refresh_pingreq_recv());
2682 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2683 }
2684 Qos::ExactlyOnce => {
2685 let packet_id = packet.packet_id().unwrap();
2686 let already_handled = !self.qos2_publish_handled.insert(packet_id);
2687
2688 if self.status == ConnectionStatus::Connected
2689 && (self.auto_pub_response || already_handled)
2690 {
2691 let pubrec = v3_1_1::GenericPubrec::builder()
2692 .packet_id(packet_id)
2693 .build()
2694 .unwrap();
2695 events.extend(self.process_send_v3_1_1_pubrec(pubrec));
2696 }
2697 events.extend(self.refresh_pingreq_recv());
2698 if !already_handled {
2699 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2700 }
2701 }
2702 }
2703 }
2704 Err(e) => {
2705 events.push(GenericEvent::RequestClose);
2706 events.push(GenericEvent::NotifyError(e));
2707 }
2708 }
2709 }
2710 PacketData::Normal(_) => {
2711 unreachable!("PUBLISH packet must use PacketData::Publish variant");
2712 }
2713 }
2714
2715 events
2716 }
2717
2718 fn process_recv_v5_0_publish(
2719 &mut self,
2720 raw_packet: RawPacket,
2721 ) -> Vec<GenericEvent<PacketIdType>> {
2722 let mut events = Vec::new();
2723
2724 let flags = raw_packet.flags();
2725 match &raw_packet.data {
2726 PacketData::Publish(arc) => {
2727 match v5_0::GenericPublish::parse(flags, arc.clone()) {
2728 Ok((mut packet, _consumed)) => {
2729 let mut already_handled = false;
2730 let mut puback_send = false;
2731 let mut pubrec_send = false;
2732
2733 match packet.qos() {
2734 Qos::AtLeastOnce => {
2735 let packet_id = packet.packet_id().unwrap();
2736 if let Some(max) = self.publish_recv_max {
2737 if self.publish_recv.len() >= max as usize {
2738 let disconnect = v5_0::Disconnect::builder()
2739 .reason_code(
2740 DisconnectReasonCode::ReceiveMaximumExceeded,
2741 )
2742 .build()
2743 .unwrap();
2744 events
2745 .extend(self.process_send_v5_0_disconnect(disconnect));
2746 events.push(GenericEvent::NotifyError(
2747 MqttError::ReceiveMaximumExceeded,
2748 ));
2749 return events;
2750 }
2751 }
2752 self.publish_recv.insert(packet_id);
2753 if self.auto_pub_response
2754 && self.status == ConnectionStatus::Connected
2755 {
2756 puback_send = true;
2757 }
2758 }
2759 Qos::ExactlyOnce => {
2760 let packet_id = packet.packet_id().unwrap();
2761 if let Some(max) = self.publish_recv_max {
2762 if self.publish_recv.len() >= max as usize {
2763 let disconnect = v5_0::Disconnect::builder()
2764 .reason_code(
2765 DisconnectReasonCode::ReceiveMaximumExceeded,
2766 )
2767 .build()
2768 .unwrap();
2769 events
2770 .extend(self.process_send_v5_0_disconnect(disconnect));
2771 events.push(GenericEvent::NotifyError(
2772 MqttError::ReceiveMaximumExceeded,
2773 ));
2774 return events;
2775 }
2776 }
2777 self.publish_recv.insert(packet_id);
2778
2779 if !self.qos2_publish_handled.insert(packet_id) {
2780 already_handled = true;
2781 }
2782 if self.status == ConnectionStatus::Connected
2783 && (self.auto_pub_response || already_handled)
2784 {
2785 pubrec_send = true;
2786 }
2787 }
2788 Qos::AtMostOnce => {
2789 }
2791 }
2792
2793 if packet.topic_name().is_empty() {
2795 if let Some(ta) = Self::get_topic_alias_from_props(packet.props()) {
2797 if ta == 0
2798 || self.topic_alias_recv.is_none()
2799 || ta > self.topic_alias_recv.as_ref().unwrap().max()
2800 {
2801 let disconnect = v5_0::Disconnect::builder()
2802 .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2803 .build()
2804 .unwrap();
2805 events.extend(self.process_send_v5_0_disconnect(disconnect));
2806 events.push(GenericEvent::NotifyError(
2807 MqttError::TopicAliasInvalid,
2808 ));
2809 return events;
2810 }
2811
2812 if let Some(ref topic_alias_recv) = self.topic_alias_recv {
2813 if let Some(topic_name) = topic_alias_recv.get(ta) {
2814 match packet.add_extracted_topic_name(topic_name) {
2815 Ok(extracted) => {
2816 packet = extracted;
2817 }
2818 Err(_e) => {
2819 error!("topic alias extract failed: {_e}");
2820 let disconnect = v5_0::Disconnect::builder()
2821 .reason_code(
2822 DisconnectReasonCode::TopicAliasInvalid,
2823 )
2824 .build()
2825 .unwrap();
2826 events.extend(
2827 self.process_send_v5_0_disconnect(disconnect),
2828 );
2829 events.push(GenericEvent::NotifyError(
2830 MqttError::TopicAliasInvalid,
2831 ));
2832 return events;
2833 }
2834 }
2835 } else {
2836 let disconnect = v5_0::Disconnect::builder()
2837 .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2838 .build()
2839 .unwrap();
2840 events
2841 .extend(self.process_send_v5_0_disconnect(disconnect));
2842 events.push(GenericEvent::NotifyError(
2843 MqttError::TopicAliasInvalid,
2844 ));
2845 return events;
2846 }
2847 }
2848 } else {
2849 let disconnect = v5_0::Disconnect::builder()
2850 .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2851 .build()
2852 .unwrap();
2853 events.extend(self.process_send_v5_0_disconnect(disconnect));
2854 events
2855 .push(GenericEvent::NotifyError(MqttError::TopicAliasInvalid));
2856 return events;
2857 }
2858 } else {
2859 if let Some(ta) = Self::get_topic_alias_from_props(packet.props()) {
2861 if ta == 0
2862 || self.topic_alias_recv.is_none()
2863 || ta > self.topic_alias_recv.as_ref().unwrap().max()
2864 {
2865 let disconnect = v5_0::Disconnect::builder()
2866 .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2867 .build()
2868 .unwrap();
2869 events.extend(self.process_send_v5_0_disconnect(disconnect));
2870 events.push(GenericEvent::NotifyError(
2871 MqttError::TopicAliasInvalid,
2872 ));
2873 return events;
2874 }
2875 if let Some(ref mut topic_alias_recv) = self.topic_alias_recv {
2876 topic_alias_recv.insert_or_update(packet.topic_name(), ta);
2877 }
2878 }
2879 }
2880
2881 if puback_send {
2883 let puback = v5_0::GenericPuback::builder()
2884 .packet_id(packet.packet_id().unwrap())
2885 .build()
2886 .unwrap();
2887 events.extend(self.process_send_v5_0_puback(puback));
2888 }
2889 if pubrec_send {
2890 let pubrec = v5_0::GenericPubrec::builder()
2891 .packet_id(packet.packet_id().unwrap())
2892 .build()
2893 .unwrap();
2894 events.extend(self.process_send_v5_0_pubrec(pubrec));
2895 }
2896
2897 events.extend(self.refresh_pingreq_recv());
2899
2900 if !already_handled {
2902 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2903 }
2904 }
2905 Err(e) => {
2906 if self.status == ConnectionStatus::Connected {
2907 let disconnect = v5_0::Disconnect::builder()
2908 .reason_code(e.into())
2909 .build()
2910 .unwrap();
2911 let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2912 events.extend(disconnect_events);
2913 }
2914 events.push(GenericEvent::NotifyError(e));
2915 }
2916 }
2917 }
2918 PacketData::Normal(_) => {
2919 unreachable!("PUBLISH packet must use PacketData::Publish variant");
2920 }
2921 }
2922
2923 events
2924 }
2925
2926 fn process_recv_v3_1_1_puback(
2927 &mut self,
2928 raw_packet: RawPacket,
2929 ) -> Vec<GenericEvent<PacketIdType>> {
2930 let mut events = Vec::new();
2931
2932 match v3_1_1::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2933 Ok((packet, _)) => {
2934 let packet_id = packet.packet_id();
2935 if self.pid_puback.remove(&packet_id) {
2936 self.store.erase(ResponsePacket::V3_1_1Puback, packet_id);
2937 if self.pid_man.is_used_id(packet_id) {
2938 self.pid_man.release_id(packet_id);
2939 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2940 }
2941 events.extend(self.refresh_pingreq_recv());
2942 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2943 } else {
2944 events.push(GenericEvent::RequestClose);
2945 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2946 }
2947 }
2948 Err(e) => {
2949 events.push(GenericEvent::RequestClose);
2950 events.push(GenericEvent::NotifyError(e));
2951 }
2952 }
2953
2954 events
2955 }
2956
2957 fn process_recv_v5_0_puback(
2958 &mut self,
2959 raw_packet: RawPacket,
2960 ) -> Vec<GenericEvent<PacketIdType>> {
2961 let mut events = Vec::new();
2962
2963 match v5_0::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2964 Ok((packet, _)) => {
2965 let packet_id = packet.packet_id();
2966 if self.pid_puback.remove(&packet_id) {
2967 self.store.erase(ResponsePacket::V5_0Puback, packet_id);
2968 if self.pid_man.is_used_id(packet_id) {
2969 self.pid_man.release_id(packet_id);
2970 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2971 }
2972 if self.publish_send_max.is_some() {
2973 self.publish_send_count -= 1;
2974 }
2975 events.extend(self.refresh_pingreq_recv());
2976 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2977 } else {
2978 let disconnect = v5_0::Disconnect::builder()
2979 .reason_code(DisconnectReasonCode::ProtocolError)
2980 .build()
2981 .unwrap();
2982 events.extend(self.process_send_v5_0_disconnect(disconnect));
2983 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2984 }
2985 }
2986 Err(e) => {
2987 let disconnect = v5_0::Disconnect::builder()
2988 .reason_code(DisconnectReasonCode::ProtocolError)
2989 .build()
2990 .unwrap();
2991 events.extend(self.process_send_v5_0_disconnect(disconnect));
2992 events.push(GenericEvent::NotifyError(e));
2993 }
2994 }
2995
2996 events
2997 }
2998
2999 fn process_recv_v3_1_1_pubrec(
3000 &mut self,
3001 raw_packet: RawPacket,
3002 ) -> Vec<GenericEvent<PacketIdType>> {
3003 let mut events = Vec::new();
3004
3005 match v3_1_1::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3006 Ok((packet, _)) => {
3007 let packet_id = packet.packet_id();
3008 if self.pid_pubrec.remove(&packet_id) {
3009 self.store.erase(ResponsePacket::V3_1_1Pubrec, packet_id);
3010 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3011 let pubrel = v3_1_1::GenericPubrel::<PacketIdType>::builder()
3012 .packet_id(packet_id)
3013 .build()
3014 .unwrap();
3015 events.extend(self.process_send_v3_1_1_pubrel(pubrel));
3016 }
3017 events.extend(self.refresh_pingreq_recv());
3018 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3019 } else {
3020 events.push(GenericEvent::RequestClose);
3021 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3022 }
3023 }
3024 Err(e) => {
3025 events.push(GenericEvent::RequestClose);
3026 events.push(GenericEvent::NotifyError(e));
3027 }
3028 }
3029
3030 events
3031 }
3032
3033 fn process_recv_v5_0_pubrec(
3034 &mut self,
3035 raw_packet: RawPacket,
3036 ) -> Vec<GenericEvent<PacketIdType>> {
3037 let mut events = Vec::new();
3038
3039 match v5_0::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3040 Ok((packet, _)) => {
3041 let packet_id = packet.packet_id();
3042 if self.pid_pubrec.remove(&packet_id) {
3043 self.store.erase(ResponsePacket::V5_0Pubrec, packet_id);
3044 if let Some(reason_code) = packet.reason_code() {
3045 if reason_code != PubrecReasonCode::Success {
3046 if self.pid_man.is_used_id(packet_id) {
3047 self.pid_man.release_id(packet_id);
3048 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3049 }
3050 self.qos2_publish_processing.remove(&packet_id);
3051 if self.publish_send_max.is_some() {
3052 self.publish_send_count -= 1;
3053 }
3054 } else if self.auto_pub_response
3055 && self.status == ConnectionStatus::Connected
3056 {
3057 let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3058 .packet_id(packet_id)
3059 .build()
3060 .unwrap();
3061 events.extend(self.process_send_v5_0_pubrel(pubrel));
3062 }
3063 } else if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3064 let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3065 .packet_id(packet_id)
3066 .build()
3067 .unwrap();
3068 events.extend(self.process_send_v5_0_pubrel(pubrel));
3069 }
3070 events.extend(self.refresh_pingreq_recv());
3071 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3072 } else {
3073 let disconnect = v5_0::Disconnect::builder()
3074 .reason_code(DisconnectReasonCode::ProtocolError)
3075 .build()
3076 .unwrap();
3077 events.extend(self.process_send_v5_0_disconnect(disconnect));
3078 events.push(GenericEvent::NotifyError(MqttError::from(
3079 DisconnectReasonCode::ProtocolError,
3080 )));
3081 }
3082 }
3083 Err(e) => {
3084 let disconnect = v5_0::Disconnect::builder()
3085 .reason_code(DisconnectReasonCode::ProtocolError)
3086 .build()
3087 .unwrap();
3088 events.extend(self.process_send_v5_0_disconnect(disconnect));
3089 events.push(GenericEvent::NotifyError(e));
3090 }
3091 }
3092
3093 events
3094 }
3095
3096 fn process_recv_v3_1_1_pubrel(
3097 &mut self,
3098 raw_packet: RawPacket,
3099 ) -> Vec<GenericEvent<PacketIdType>> {
3100 let mut events = Vec::new();
3101
3102 match v3_1_1::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3103 Ok((packet, _)) => {
3104 let packet_id = packet.packet_id();
3105 self.qos2_publish_handled.remove(&packet_id);
3106 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3107 let pubcomp = v3_1_1::GenericPubcomp::<PacketIdType>::builder()
3108 .packet_id(packet_id)
3109 .build()
3110 .unwrap();
3111 events.extend(self.process_send_v3_1_1_pubcomp(pubcomp));
3112 }
3113 events.extend(self.refresh_pingreq_recv());
3114 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3115 }
3116 Err(e) => {
3117 events.push(GenericEvent::RequestClose);
3118 events.push(GenericEvent::NotifyError(e));
3119 }
3120 }
3121
3122 events
3123 }
3124
3125 fn process_recv_v5_0_pubrel(
3126 &mut self,
3127 raw_packet: RawPacket,
3128 ) -> Vec<GenericEvent<PacketIdType>> {
3129 let mut events = Vec::new();
3130
3131 match v5_0::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3132 Ok((packet, _)) => {
3133 let packet_id = packet.packet_id();
3134 self.qos2_publish_handled.remove(&packet_id);
3135 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3136 let pubcomp = v5_0::GenericPubcomp::<PacketIdType>::builder()
3137 .packet_id(packet_id)
3138 .build()
3139 .unwrap();
3140 events.extend(self.process_send_v5_0_pubcomp(pubcomp));
3141 }
3142 events.extend(self.refresh_pingreq_recv());
3143 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3144 }
3145 Err(e) => {
3146 let disconnect = v5_0::Disconnect::builder()
3147 .reason_code(DisconnectReasonCode::ProtocolError)
3148 .build()
3149 .unwrap();
3150 events.extend(self.process_send_v5_0_disconnect(disconnect));
3151 events.push(GenericEvent::NotifyError(e));
3152 }
3153 }
3154
3155 events
3156 }
3157
3158 fn process_recv_v3_1_1_pubcomp(
3159 &mut self,
3160 raw_packet: RawPacket,
3161 ) -> Vec<GenericEvent<PacketIdType>> {
3162 let mut events = Vec::new();
3163
3164 match v3_1_1::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3165 Ok((packet, _)) => {
3166 let packet_id = packet.packet_id();
3167 if self.pid_pubcomp.remove(&packet_id) {
3168 self.store.erase(ResponsePacket::V3_1_1Pubcomp, packet_id);
3169 if self.pid_man.is_used_id(packet_id) {
3170 self.pid_man.release_id(packet_id);
3171 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3172 }
3173 self.qos2_publish_processing.remove(&packet_id);
3174 events.extend(self.refresh_pingreq_recv());
3175 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3176 } else {
3177 events.push(GenericEvent::RequestClose);
3178 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3179 }
3180 }
3181 Err(e) => {
3182 events.push(GenericEvent::RequestClose);
3183 events.push(GenericEvent::NotifyError(e));
3184 }
3185 }
3186
3187 events
3188 }
3189
3190 fn process_recv_v5_0_pubcomp(
3191 &mut self,
3192 raw_packet: RawPacket,
3193 ) -> Vec<GenericEvent<PacketIdType>> {
3194 let mut events = Vec::new();
3195
3196 match v5_0::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3197 Ok((packet, _)) => {
3198 let packet_id = packet.packet_id();
3199 if self.pid_pubcomp.remove(&packet_id) {
3200 self.store.erase(ResponsePacket::V5_0Pubcomp, packet_id);
3201 if self.pid_man.is_used_id(packet_id) {
3202 self.pid_man.release_id(packet_id);
3203 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3204 }
3205 self.qos2_publish_processing.remove(&packet_id);
3206 if self.publish_send_max.is_some() {
3207 self.publish_send_count -= 1;
3208 }
3209 events.extend(self.refresh_pingreq_recv());
3210 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3211 } else {
3212 let disconnect = v5_0::Disconnect::builder()
3213 .reason_code(DisconnectReasonCode::ProtocolError)
3214 .build()
3215 .unwrap();
3216 events.extend(self.process_send_v5_0_disconnect(disconnect));
3217 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3218 }
3219 }
3220 Err(e) => {
3221 let disconnect = v5_0::Disconnect::builder()
3222 .reason_code(DisconnectReasonCode::ProtocolError)
3223 .build()
3224 .unwrap();
3225 events.extend(self.process_send_v5_0_disconnect(disconnect));
3226 events.push(GenericEvent::NotifyError(e));
3227 }
3228 }
3229
3230 events
3231 }
3232
3233 fn process_recv_v3_1_1_subscribe(
3234 &mut self,
3235 raw_packet: RawPacket,
3236 ) -> Vec<GenericEvent<PacketIdType>> {
3237 let mut events = Vec::new();
3238
3239 match v3_1_1::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3240 Ok((packet, _)) => {
3241 events.extend(self.refresh_pingreq_recv());
3242 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3243 }
3244 Err(e) => {
3245 events.push(GenericEvent::RequestClose);
3246 events.push(GenericEvent::NotifyError(e));
3247 }
3248 }
3249
3250 events
3251 }
3252
3253 fn process_recv_v5_0_subscribe(
3254 &mut self,
3255 raw_packet: RawPacket,
3256 ) -> Vec<GenericEvent<PacketIdType>> {
3257 let mut events = Vec::new();
3258
3259 match v5_0::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3260 Ok((packet, _)) => {
3261 events.extend(self.refresh_pingreq_recv());
3262 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3263 }
3264 Err(e) => {
3265 let disconnect = v5_0::Disconnect::builder()
3266 .reason_code(DisconnectReasonCode::ProtocolError)
3267 .build()
3268 .unwrap();
3269 events.extend(self.process_send_v5_0_disconnect(disconnect));
3270 events.push(GenericEvent::NotifyError(e));
3271 }
3272 }
3273
3274 events
3275 }
3276
3277 fn process_recv_v3_1_1_suback(
3278 &mut self,
3279 raw_packet: RawPacket,
3280 ) -> Vec<GenericEvent<PacketIdType>> {
3281 let mut events = Vec::new();
3282
3283 match v3_1_1::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3284 Ok((packet, _)) => {
3285 let packet_id = packet.packet_id();
3286 if self.pid_suback.remove(&packet_id) {
3287 if self.pid_man.is_used_id(packet_id) {
3288 self.pid_man.release_id(packet_id);
3289 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3290 }
3291 events.extend(self.refresh_pingreq_recv());
3292 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3293 } else {
3294 events.push(GenericEvent::RequestClose);
3295 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3296 }
3297 }
3298 Err(e) => {
3299 events.push(GenericEvent::RequestClose);
3300 events.push(GenericEvent::NotifyError(e));
3301 }
3302 }
3303
3304 events
3305 }
3306
3307 fn process_recv_v5_0_suback(
3308 &mut self,
3309 raw_packet: RawPacket,
3310 ) -> Vec<GenericEvent<PacketIdType>> {
3311 let mut events = Vec::new();
3312
3313 match v5_0::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3314 Ok((packet, _)) => {
3315 let packet_id = packet.packet_id();
3316 if self.pid_suback.remove(&packet_id) {
3317 if self.pid_man.is_used_id(packet_id) {
3318 self.pid_man.release_id(packet_id);
3319 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3320 }
3321 events.extend(self.refresh_pingreq_recv());
3322 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3323 } else {
3324 let disconnect = v5_0::Disconnect::builder()
3325 .reason_code(DisconnectReasonCode::ProtocolError)
3326 .build()
3327 .unwrap();
3328 events.extend(self.process_send_v5_0_disconnect(disconnect));
3329 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3330 }
3331 }
3332 Err(e) => {
3333 let disconnect = v5_0::Disconnect::builder()
3334 .reason_code(DisconnectReasonCode::ProtocolError)
3335 .build()
3336 .unwrap();
3337 events.extend(self.process_send_v5_0_disconnect(disconnect));
3338 events.push(GenericEvent::NotifyError(e));
3339 }
3340 }
3341
3342 events
3343 }
3344
3345 fn process_recv_v3_1_1_unsubscribe(
3346 &mut self,
3347 raw_packet: RawPacket,
3348 ) -> Vec<GenericEvent<PacketIdType>> {
3349 let mut events = Vec::new();
3350
3351 match v3_1_1::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3352 Ok((packet, _)) => {
3353 events.extend(self.refresh_pingreq_recv());
3354 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3355 }
3356 Err(e) => {
3357 events.push(GenericEvent::RequestClose);
3358 events.push(GenericEvent::NotifyError(e));
3359 }
3360 }
3361
3362 events
3363 }
3364
3365 fn process_recv_v5_0_unsubscribe(
3366 &mut self,
3367 raw_packet: RawPacket,
3368 ) -> Vec<GenericEvent<PacketIdType>> {
3369 let mut events = Vec::new();
3370
3371 match v5_0::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3372 Ok((packet, _)) => {
3373 events.extend(self.refresh_pingreq_recv());
3374 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3375 }
3376 Err(e) => {
3377 let disconnect = v5_0::Disconnect::builder()
3378 .reason_code(DisconnectReasonCode::ProtocolError)
3379 .build()
3380 .unwrap();
3381 events.extend(self.process_send_v5_0_disconnect(disconnect));
3382 events.push(GenericEvent::NotifyError(e));
3383 }
3384 }
3385
3386 events
3387 }
3388
3389 fn process_recv_v3_1_1_unsuback(
3390 &mut self,
3391 raw_packet: RawPacket,
3392 ) -> Vec<GenericEvent<PacketIdType>> {
3393 let mut events = Vec::new();
3394
3395 match v3_1_1::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3396 Ok((packet, _)) => {
3397 let packet_id = packet.packet_id();
3398 if self.pid_unsuback.remove(&packet_id) {
3399 if self.pid_man.is_used_id(packet_id) {
3400 self.pid_man.release_id(packet_id);
3401 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3402 }
3403 events.extend(self.refresh_pingreq_recv());
3404 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3405 } else {
3406 events.push(GenericEvent::RequestClose);
3407 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3408 }
3409 }
3410 Err(e) => {
3411 events.push(GenericEvent::RequestClose);
3412 events.push(GenericEvent::NotifyError(e));
3413 }
3414 }
3415
3416 events
3417 }
3418
3419 fn process_recv_v5_0_unsuback(
3420 &mut self,
3421 raw_packet: RawPacket,
3422 ) -> Vec<GenericEvent<PacketIdType>> {
3423 let mut events = Vec::new();
3424
3425 match v5_0::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3426 Ok((packet, _)) => {
3427 let packet_id = packet.packet_id();
3428 if self.pid_unsuback.remove(&packet_id) {
3429 if self.pid_man.is_used_id(packet_id) {
3430 self.pid_man.release_id(packet_id);
3431 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3432 }
3433 events.extend(self.refresh_pingreq_recv());
3434 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3435 } else {
3436 let disconnect = v5_0::Disconnect::builder()
3437 .reason_code(DisconnectReasonCode::ProtocolError)
3438 .build()
3439 .unwrap();
3440 events.extend(self.process_send_v5_0_disconnect(disconnect));
3441 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3442 }
3443 }
3444 Err(e) => {
3445 let disconnect = v5_0::Disconnect::builder()
3446 .reason_code(DisconnectReasonCode::ProtocolError)
3447 .build()
3448 .unwrap();
3449 events.extend(self.process_send_v5_0_disconnect(disconnect));
3450 events.push(GenericEvent::NotifyError(e));
3451 }
3452 }
3453
3454 events
3455 }
3456
3457 fn process_recv_v3_1_1_pingreq(
3458 &mut self,
3459 raw_packet: RawPacket,
3460 ) -> Vec<GenericEvent<PacketIdType>> {
3461 let mut events = Vec::new();
3462
3463 match v3_1_1::Pingreq::parse(raw_packet.data_as_slice()) {
3464 Ok((packet, _)) => {
3465 if (Role::IS_SERVER || Role::IS_ANY)
3466 && !self.is_client
3467 && self.auto_ping_response
3468 && self.status == ConnectionStatus::Connected
3469 {
3470 let pingresp = v3_1_1::Pingresp::new();
3471 events.extend(self.process_send_v3_1_1_pingresp(pingresp));
3472 }
3473 events.extend(self.refresh_pingreq_recv());
3474 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3475 }
3476 Err(e) => {
3477 events.push(GenericEvent::RequestClose);
3478 events.push(GenericEvent::NotifyError(e));
3479 }
3480 }
3481
3482 events
3483 }
3484
3485 fn process_recv_v5_0_pingreq(
3486 &mut self,
3487 raw_packet: RawPacket,
3488 ) -> Vec<GenericEvent<PacketIdType>> {
3489 let mut events = Vec::new();
3490
3491 match v5_0::Pingreq::parse(raw_packet.data_as_slice()) {
3492 Ok((packet, _)) => {
3493 if (Role::IS_SERVER || Role::IS_ANY)
3494 && !self.is_client
3495 && self.auto_ping_response
3496 && self.status == ConnectionStatus::Connected
3497 {
3498 let pingresp = v5_0::Pingresp::new();
3499 events.extend(self.process_send_v5_0_pingresp(pingresp));
3500 }
3501 events.extend(self.refresh_pingreq_recv());
3502 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3503 }
3504 Err(e) => {
3505 let disconnect = v5_0::Disconnect::builder()
3506 .reason_code(DisconnectReasonCode::ProtocolError)
3507 .build()
3508 .unwrap();
3509 events.extend(self.process_send_v5_0_disconnect(disconnect));
3510 events.push(GenericEvent::NotifyError(e));
3511 }
3512 }
3513
3514 events
3515 }
3516
3517 fn process_recv_v3_1_1_pingresp(
3518 &mut self,
3519 raw_packet: RawPacket,
3520 ) -> Vec<GenericEvent<PacketIdType>> {
3521 let mut events = Vec::new();
3522
3523 match v3_1_1::Pingresp::parse(raw_packet.data_as_slice()) {
3524 Ok((packet, _)) => {
3525 self.pingresp_recv_set = false;
3526 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3527 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3528 }
3529 Err(e) => {
3530 events.push(GenericEvent::RequestClose);
3531 events.push(GenericEvent::NotifyError(e));
3532 }
3533 }
3534
3535 events
3536 }
3537
3538 fn process_recv_v5_0_pingresp(
3539 &mut self,
3540 raw_packet: RawPacket,
3541 ) -> Vec<GenericEvent<PacketIdType>> {
3542 let mut events = Vec::new();
3543
3544 match v5_0::Pingresp::parse(raw_packet.data_as_slice()) {
3545 Ok((packet, _)) => {
3546 self.pingresp_recv_set = false;
3547 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3548 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3549 }
3550 Err(e) => {
3551 let disconnect = v5_0::Disconnect::builder()
3552 .reason_code(DisconnectReasonCode::ProtocolError)
3553 .build()
3554 .unwrap();
3555 events.extend(self.process_send_v5_0_disconnect(disconnect));
3556 events.push(GenericEvent::NotifyError(e));
3557 }
3558 }
3559
3560 events
3561 }
3562
3563 fn process_recv_v3_1_1_disconnect(
3564 &mut self,
3565 raw_packet: RawPacket,
3566 ) -> Vec<GenericEvent<PacketIdType>> {
3567 let mut events = Vec::new();
3568
3569 match v3_1_1::Disconnect::parse(raw_packet.data_as_slice()) {
3570 Ok((packet, _)) => {
3571 self.cancel_timers(&mut events);
3572 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3573 }
3574 Err(e) => {
3575 events.push(GenericEvent::RequestClose);
3576 events.push(GenericEvent::NotifyError(e));
3577 }
3578 }
3579
3580 events
3581 }
3582
3583 fn process_recv_v5_0_disconnect(
3584 &mut self,
3585 raw_packet: RawPacket,
3586 ) -> Vec<GenericEvent<PacketIdType>> {
3587 let mut events = Vec::new();
3588
3589 match v5_0::Disconnect::parse(raw_packet.data_as_slice()) {
3590 Ok((packet, _)) => {
3591 self.cancel_timers(&mut events);
3592 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3593 }
3594 Err(e) => {
3595 let disconnect = v5_0::Disconnect::builder()
3596 .reason_code(DisconnectReasonCode::ProtocolError)
3597 .build()
3598 .unwrap();
3599 events.extend(self.process_send_v5_0_disconnect(disconnect));
3600 events.push(GenericEvent::NotifyError(e));
3601 }
3602 }
3603
3604 events
3605 }
3606
3607 fn process_recv_v5_0_auth(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
3608 let mut events = Vec::new();
3609
3610 match v5_0::Auth::parse(raw_packet.data_as_slice()) {
3611 Ok((packet, _)) => {
3612 events.extend(self.refresh_pingreq_recv());
3613 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3614 }
3615 Err(e) => {
3616 let disconnect = v5_0::Disconnect::builder()
3617 .reason_code(DisconnectReasonCode::ProtocolError)
3618 .build()
3619 .unwrap();
3620 events.extend(self.process_send_v5_0_disconnect(disconnect));
3621 events.push(GenericEvent::NotifyError(e));
3622 }
3623 }
3624
3625 events
3626 }
3627
3628 fn refresh_pingreq_recv(&mut self) -> Vec<GenericEvent<PacketIdType>> {
3629 let mut events = Vec::new();
3630 if let Some(timeout_ms) = self.pingreq_recv_timeout_ms {
3631 if self.status == ConnectionStatus::Connecting
3632 || self.status == ConnectionStatus::Connected
3633 {
3634 self.pingreq_recv_set = true;
3635 events.push(GenericEvent::RequestTimerReset {
3636 kind: TimerKind::PingreqRecv,
3637 duration_ms: timeout_ms,
3638 });
3639 } else {
3640 self.pingreq_recv_set = false;
3641 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3642 }
3643 }
3644
3645 events
3646 }
3647
3648 fn cancel_timers(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
3650 if self.pingreq_send_set {
3651 self.pingreq_send_set = false;
3652 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
3653 }
3654 if self.pingreq_recv_set {
3655 self.pingreq_recv_set = false;
3656 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3657 }
3658 if self.pingresp_recv_set {
3659 self.pingresp_recv_set = false;
3660 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3661 }
3662 }
3663
3664 fn get_topic_alias_from_props(props: &[Property]) -> Option<u16> {
3666 for prop in props {
3667 if let Property::TopicAlias(ta) = prop {
3668 return Some(ta.val());
3669 }
3670 }
3671 None
3672 }
3673
3674 #[allow(dead_code)]
3675 fn is_packet_id_used(&self, packet_id: PacketIdType) -> bool {
3676 self.pid_man.is_used_id(packet_id)
3677 }
3678}
3679
3680pub trait RecvBehavior<Role, PacketIdType>
3683where
3684 PacketIdType: IsPacketId,
3685{
3686 fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>>;
3687}
3688
3689impl<PacketIdType> RecvBehavior<role::Client, PacketIdType>
3691 for GenericConnection<role::Client, PacketIdType>
3692where
3693 PacketIdType: IsPacketId,
3694{
3695 fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3696 self.recv(data)
3697 }
3698}
3699
/// `RecvBehavior` for server-role connections.
impl<PacketIdType> RecvBehavior<role::Server, PacketIdType>
    for GenericConnection<role::Server, PacketIdType>
where
    PacketIdType: IsPacketId,
{
    /// Forwards `data` to `self.recv` and returns its events unchanged.
    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
        // NOTE(review): presumably resolves to an inherent `GenericConnection::recv`
        // (inherent methods take priority over trait methods in method lookup); that
        // method is defined outside this view — confirm, since without it this call
        // would recurse into itself.
        self.recv(data)
    }
}
3709
/// `RecvBehavior` for connections whose role is `role::Any`.
impl<PacketIdType> RecvBehavior<role::Any, PacketIdType>
    for GenericConnection<role::Any, PacketIdType>
where
    PacketIdType: IsPacketId,
{
    /// Forwards `data` to `self.recv` and returns its events unchanged.
    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
        // NOTE(review): presumably resolves to an inherent `GenericConnection::recv`
        // (inherent methods take priority over trait methods in method lookup); that
        // method is defined outside this view — confirm, since without it this call
        // would recurse into itself.
        self.recv(data)
    }
}
3719
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mqtt::connection::version::Version;
    use crate::mqtt::packet::TopicAliasSend;
    use crate::mqtt::role;

    /// Builds the MQTT v5.0 client connection with `u16` packet ids that most tests use.
    fn v5_client() -> GenericConnection<role::Client, u16> {
        GenericConnection::<role::Client, u16>::new(Version::V5_0)
    }

    /// `initialize(true)` marks the connection as a client and leaves counters and
    /// limits in their cleared state.
    #[test]
    fn test_initialize_client_mode() {
        let mut conn = v5_client();

        conn.initialize(true);

        assert!(conn.is_client);
        assert_eq!(conn.publish_send_count, 0);
        assert!(conn.publish_send_max.is_none());
        assert!(conn.publish_recv_max.is_none());
        assert!(!conn.need_store);
    }

    /// `initialize(false)` marks the connection as a non-client and leaves counters and
    /// limits in their cleared state.
    #[test]
    fn test_initialize_server_mode() {
        let mut conn = GenericConnection::<role::Server, u32>::new(Version::V3_1_1);

        conn.initialize(false);

        assert!(!conn.is_client);
        assert_eq!(conn.publish_send_count, 0);
        assert!(conn.publish_send_max.is_none());
        assert!(conn.publish_recv_max.is_none());
        assert!(!conn.need_store);
    }

    /// Without a `topic_alias_send` table, an alias resolves to nothing.
    #[test]
    fn test_validate_topic_alias_no_topic_alias_send() {
        let mut conn = v5_client();

        assert!(conn.validate_topic_alias(Some(1)).is_none());
    }

    /// A `None` alias resolves to nothing.
    #[test]
    fn test_validate_topic_alias_none_input() {
        let mut conn = v5_client();

        assert!(conn.validate_topic_alias(None).is_none());
    }

    /// Without a `topic_alias_send` table, no alias is in range.
    #[test]
    fn test_validate_topic_alias_range_no_topic_alias_send() {
        let conn = v5_client();

        assert!(!conn.validate_topic_alias_range(1));
    }

    /// Alias 0 is rejected even when a table exists.
    #[test]
    fn test_validate_topic_alias_range_zero() {
        let mut conn = v5_client();
        conn.topic_alias_send = Some(TopicAliasSend::new(10));

        assert!(!conn.validate_topic_alias_range(0));
    }

    /// An alias above the table's maximum is rejected.
    #[test]
    fn test_validate_topic_alias_range_over_max() {
        let mut conn = v5_client();
        conn.topic_alias_send = Some(TopicAliasSend::new(5));

        assert!(!conn.validate_topic_alias_range(6));
    }

    /// Aliases from 1 up to and including the maximum are accepted.
    #[test]
    fn test_validate_topic_alias_range_valid() {
        let mut conn = v5_client();
        conn.topic_alias_send = Some(TopicAliasSend::new(10));

        assert!(conn.validate_topic_alias_range(1));
        assert!(conn.validate_topic_alias_range(5));
        assert!(conn.validate_topic_alias_range(10));
    }

    /// A registered alias resolves to the topic it was registered with.
    #[test]
    fn test_validate_topic_alias_with_registered_alias() {
        let mut conn = v5_client();

        let mut aliases = TopicAliasSend::new(10);
        aliases.insert_or_update("test/topic", 5);
        conn.topic_alias_send = Some(aliases);

        assert_eq!(
            conn.validate_topic_alias(Some(5)),
            Some("test/topic".to_string())
        );
    }

    /// An in-range alias that was never registered resolves to nothing.
    #[test]
    fn test_validate_topic_alias_unregistered_alias() {
        let mut conn = v5_client();
        conn.topic_alias_send = Some(TopicAliasSend::new(10));

        assert!(conn.validate_topic_alias(Some(5)).is_none());
    }

    /// A freshly constructed connection accepts a 1000-byte packet.
    #[test]
    fn test_validate_maximum_packet_size_within_limit() {
        let conn = v5_client();

        assert!(conn.validate_maximum_packet_size_send(1000));
    }

    /// A packet exactly as large as the limit is accepted.
    #[test]
    fn test_validate_maximum_packet_size_at_limit() {
        let mut conn = v5_client();
        conn.maximum_packet_size_send = 1000;

        assert!(conn.validate_maximum_packet_size_send(1000));
    }

    /// A packet one byte over the limit is rejected.
    #[test]
    fn test_validate_maximum_packet_size_over_limit() {
        let mut conn = v5_client();
        conn.maximum_packet_size_send = 1000;

        assert!(!conn.validate_maximum_packet_size_send(1001));
    }

    /// With a limit of zero, size 1 is rejected and size 0 is accepted.
    #[test]
    fn test_validate_maximum_packet_size_zero_limit() {
        let mut conn = v5_client();
        conn.maximum_packet_size_send = 0;

        assert!(!conn.validate_maximum_packet_size_send(1));
        assert!(conn.validate_maximum_packet_size_send(0));
    }

    /// `initialize` resets state that was dirtied beforehand.
    #[test]
    fn test_initialize_clears_state() {
        let mut conn = v5_client();

        // Dirty the state that `initialize` is expected to reset.
        conn.publish_send_count = 5;
        conn.need_store = true;
        conn.pid_suback.insert(123);
        conn.pid_unsuback.insert(456);

        conn.initialize(true);

        assert_eq!(conn.publish_send_count, 0);
        assert!(!conn.need_store);
        assert!(conn.pid_suback.is_empty());
        assert!(conn.pid_unsuback.is_empty());
        assert!(conn.is_client);
    }

    /// Total size is 1 (fixed header byte) + length-field bytes + remaining length,
    /// checked on both sides of every length-field boundary.
    #[test]
    fn test_remaining_length_to_total_size() {
        // (remaining length, expected total packet size)
        let cases: [(u32, u32); 8] = [
            (0, 2),
            (127, 129),
            (128, 131),
            (16383, 16386),
            (16384, 16388),
            (2097151, 2097155),
            (2097152, 2097157),
            (268435455, 268435460),
        ];

        for &(remaining_length, expected_total) in &cases {
            assert_eq!(
                remaining_length_to_total_size(remaining_length),
                expected_total
            );
        }
    }
}