1use alloc::{
25 string::{String, ToString},
26 vec::Vec,
27};
28use core::marker::PhantomData;
29
30use crate::mqtt::common::Cursor;
31use crate::mqtt::common::HashSet;
32use crate::mqtt::connection::event::{GenericEvent, TimerKind};
33use crate::mqtt::connection::GenericStore;
34
35use serde::Serialize;
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
38enum ConnectionStatus {
39 #[serde(rename = "disconnected")]
40 Disconnected,
41 #[serde(rename = "connecting")]
42 Connecting,
43 #[serde(rename = "connected")]
44 Connected,
45}
46use crate::mqtt::connection::packet_builder::{
47 PacketBuildResult, PacketBuilder, PacketData, RawPacket,
48};
49use crate::mqtt::connection::packet_id_manager::PacketIdManager;
50use crate::mqtt::connection::role;
51use crate::mqtt::connection::role::RoleType;
52use crate::mqtt::connection::sendable::Sendable;
53use crate::mqtt::connection::version::*;
54use crate::mqtt::packet::v3_1_1;
55use crate::mqtt::packet::v5_0;
56use crate::mqtt::packet::GenericPacket;
57use crate::mqtt::packet::GenericStorePacket;
58use crate::mqtt::packet::IsPacketId;
59use crate::mqtt::packet::Qos;
60use crate::mqtt::packet::ResponsePacket;
61use crate::mqtt::packet::{Property, TopicAliasRecv, TopicAliasSend};
62use crate::mqtt::prelude::GenericPacketTrait;
63use crate::mqtt::result_code::{
64 ConnectReasonCode, ConnectReturnCode, DisconnectReasonCode, MqttError, PubrecReasonCode,
65};
66
67const MQTT_PACKET_SIZE_NO_LIMIT: u32 = 1 + 4 + 128 * 128 * 128 * 128;
70
71fn remaining_length_to_total_size(remaining_length: u32) -> u32 {
78 let remaining_length_bytes = if remaining_length < 128 {
79 1
80 } else if remaining_length < 16384 {
81 2
82 } else if remaining_length < 2097152 {
83 3
84 } else {
85 4
86 };
87
88 1 + remaining_length_bytes + remaining_length
89}
90
91pub type Event = GenericEvent<u16>;
96
97pub struct GenericConnection<Role, PacketIdType>
140where
141 Role: RoleType,
142 PacketIdType: IsPacketId,
143{
144 _marker: PhantomData<Role>,
145
146 protocol_version: Version,
147
148 pid_man: PacketIdManager<PacketIdType>,
149 pid_suback: HashSet<PacketIdType>,
150 pid_unsuback: HashSet<PacketIdType>,
151 pid_puback: HashSet<PacketIdType>,
152 pid_pubrec: HashSet<PacketIdType>,
153 pid_pubcomp: HashSet<PacketIdType>,
154
155 need_store: bool,
156 store: GenericStore<PacketIdType>,
158
159 offline_publish: bool,
160 auto_pub_response: bool,
161 auto_ping_response: bool,
162
163 auto_map_topic_alias_send: bool,
165 auto_replace_topic_alias_send: bool,
167 topic_alias_recv: Option<TopicAliasRecv>,
169 topic_alias_send: Option<TopicAliasSend>,
171
172 publish_send_max: Option<u16>,
173 publish_recv_max: Option<u16>,
175 publish_send_count: u16,
178
179 publish_recv: HashSet<PacketIdType>,
181
182 maximum_packet_size_send: u32,
184 maximum_packet_size_recv: u32,
186
187 status: ConnectionStatus,
189
190 pingreq_send_interval_ms: Option<u64>,
192 pingreq_recv_timeout_ms: Option<u64>,
194 pingresp_recv_timeout_ms: Option<u64>,
196
197 qos2_publish_handled: HashSet<PacketIdType>,
199 qos2_publish_processing: HashSet<PacketIdType>,
201
202 pingreq_send_set: bool,
204 pingreq_recv_set: bool,
205 pingresp_recv_set: bool,
206
207 packet_builder: PacketBuilder,
208 is_client: bool,
210}
211
212pub type Connection<Role> = GenericConnection<Role, u16>;
225
226impl<Role, PacketIdType> GenericConnection<Role, PacketIdType>
227where
228 Role: RoleType,
229 PacketIdType: IsPacketId,
230{
231 pub fn new(version: Version) -> Self {
254 Self {
255 _marker: PhantomData,
256 protocol_version: version,
257 pid_man: PacketIdManager::new(),
258 pid_suback: HashSet::new(),
259 pid_unsuback: HashSet::new(),
260 pid_puback: HashSet::new(),
261 pid_pubrec: HashSet::new(),
262 pid_pubcomp: HashSet::new(),
263 need_store: false,
264 store: GenericStore::new(),
265 offline_publish: false,
266 auto_pub_response: false,
267 auto_ping_response: false,
268 auto_map_topic_alias_send: false,
269 auto_replace_topic_alias_send: false,
270 topic_alias_recv: None,
271 topic_alias_send: None,
272 publish_send_max: None,
273 publish_recv_max: None,
274 publish_send_count: 0,
275 publish_recv: HashSet::new(),
276 maximum_packet_size_send: MQTT_PACKET_SIZE_NO_LIMIT,
277 maximum_packet_size_recv: MQTT_PACKET_SIZE_NO_LIMIT,
278 status: ConnectionStatus::Disconnected,
279 pingreq_send_interval_ms: None,
280 pingreq_recv_timeout_ms: None,
281 pingresp_recv_timeout_ms: None,
282 qos2_publish_handled: HashSet::new(),
283 qos2_publish_processing: HashSet::new(),
284 pingreq_send_set: false,
285 pingreq_recv_set: false,
286 pingresp_recv_set: false,
287 packet_builder: PacketBuilder::new(),
288 is_client: false,
289 }
290 }
291
292 pub fn checked_send<T>(&mut self, packet: T) -> Vec<GenericEvent<PacketIdType>>
333 where
334 T: Sendable<Role, PacketIdType>,
335 {
336 packet.dispatch_send(self)
338 }
339
340 pub fn send(&mut self, packet: GenericPacket<PacketIdType>) -> Vec<GenericEvent<PacketIdType>> {
385 use core::any::TypeId;
386
387 let role_id = TypeId::of::<Role>();
388 let client_id = TypeId::of::<role::Client>();
389 let server_id = TypeId::of::<role::Server>();
390 let any_id = TypeId::of::<role::Any>();
391
392 let packet_version = packet.protocol_version();
394
395 if self.protocol_version != packet_version {
397 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
398 }
399
400 match packet {
401 GenericPacket::V3_1_1Connect(p) => {
403 if role_id == client_id || role_id == any_id {
404 self.process_send_v3_1_1_connect(p)
405 } else {
406 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
407 }
408 }
409 GenericPacket::V5_0Connect(p) => {
410 if role_id == client_id || role_id == any_id {
411 self.process_send_v5_0_connect(p)
412 } else {
413 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
414 }
415 }
416 GenericPacket::V3_1_1Connack(p) => {
418 if role_id == server_id || role_id == any_id {
419 self.process_send_v3_1_1_connack(p)
420 } else {
421 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
422 }
423 }
424 GenericPacket::V5_0Connack(p) => {
425 if role_id == server_id || role_id == any_id {
426 self.process_send_v5_0_connack(p)
427 } else {
428 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
429 }
430 }
431 GenericPacket::V3_1_1Publish(p) => self.process_send_v3_1_1_publish(p),
433 GenericPacket::V5_0Publish(p) => self.process_send_v5_0_publish(p),
434 GenericPacket::V3_1_1Puback(p) => self.process_send_v3_1_1_puback(p),
436 GenericPacket::V5_0Puback(p) => self.process_send_v5_0_puback(p),
437 GenericPacket::V3_1_1Pubrec(p) => self.process_send_v3_1_1_pubrec(p),
438 GenericPacket::V5_0Pubrec(p) => self.process_send_v5_0_pubrec(p),
439 GenericPacket::V3_1_1Pubrel(p) => self.process_send_v3_1_1_pubrel(p),
440 GenericPacket::V5_0Pubrel(p) => self.process_send_v5_0_pubrel(p),
441 GenericPacket::V3_1_1Pubcomp(p) => self.process_send_v3_1_1_pubcomp(p),
442 GenericPacket::V5_0Pubcomp(p) => self.process_send_v5_0_pubcomp(p),
443 GenericPacket::V3_1_1Subscribe(p) => {
445 if role_id == client_id || role_id == any_id {
446 self.process_send_v3_1_1_subscribe(p)
447 } else {
448 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
449 }
450 }
451 GenericPacket::V5_0Subscribe(p) => {
452 if role_id == client_id || role_id == any_id {
453 self.process_send_v5_0_subscribe(p)
454 } else {
455 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
456 }
457 }
458 GenericPacket::V3_1_1Unsubscribe(p) => {
459 if role_id == client_id || role_id == any_id {
460 self.process_send_v3_1_1_unsubscribe(p)
461 } else {
462 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
463 }
464 }
465 GenericPacket::V5_0Unsubscribe(p) => {
466 if role_id == client_id || role_id == any_id {
467 self.process_send_v5_0_unsubscribe(p)
468 } else {
469 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
470 }
471 }
472 GenericPacket::V3_1_1Suback(p) => {
474 if role_id == server_id || role_id == any_id {
475 self.process_send_v3_1_1_suback(p)
476 } else {
477 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
478 }
479 }
480 GenericPacket::V5_0Suback(p) => {
481 if role_id == server_id || role_id == any_id {
482 self.process_send_v5_0_suback(p)
483 } else {
484 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
485 }
486 }
487 GenericPacket::V3_1_1Unsuback(p) => {
488 if role_id == server_id || role_id == any_id {
489 self.process_send_v3_1_1_unsuback(p)
490 } else {
491 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
492 }
493 }
494 GenericPacket::V5_0Unsuback(p) => {
495 if role_id == server_id || role_id == any_id {
496 self.process_send_v5_0_unsuback(p)
497 } else {
498 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
499 }
500 }
501 GenericPacket::V3_1_1Pingreq(p) => {
503 if role_id == client_id || role_id == any_id {
504 self.process_send_v3_1_1_pingreq(p)
505 } else {
506 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
507 }
508 }
509 GenericPacket::V5_0Pingreq(p) => {
510 if role_id == client_id || role_id == any_id {
511 self.process_send_v5_0_pingreq(p)
512 } else {
513 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
514 }
515 }
516 GenericPacket::V3_1_1Pingresp(p) => {
518 if role_id == server_id || role_id == any_id {
519 self.process_send_v3_1_1_pingresp(p)
520 } else {
521 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
522 }
523 }
524 GenericPacket::V5_0Pingresp(p) => {
525 if role_id == server_id || role_id == any_id {
526 self.process_send_v5_0_pingresp(p)
527 } else {
528 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
529 }
530 }
531 GenericPacket::V3_1_1Disconnect(p) => {
533 if role_id == client_id || role_id == any_id {
534 self.process_send_v3_1_1_disconnect(p)
535 } else {
536 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
537 }
538 }
539 GenericPacket::V5_0Disconnect(p) => self.process_send_v5_0_disconnect(p),
541 GenericPacket::V5_0Auth(p) => self.process_send_v5_0_auth(p),
543 }
544 }
545
546 pub fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
595 let mut events = Vec::new();
596
597 match self.packet_builder.feed(data) {
598 PacketBuildResult::Complete(raw_packet) => {
599 events.extend(self.process_recv_packet(raw_packet));
600 }
601 PacketBuildResult::Incomplete => {}
602 PacketBuildResult::Error(e) => {
603 self.cancel_timers(&mut events);
604 events.push(GenericEvent::RequestClose);
605 events.push(GenericEvent::NotifyError(e));
606 }
607 }
608
609 events
610 }
611
612 pub fn notify_timer_fired(&mut self, kind: TimerKind) -> Vec<GenericEvent<PacketIdType>> {
629 let mut events = Vec::new();
630
631 match kind {
632 TimerKind::PingreqSend => {
633 self.pingreq_send_set = false;
635
636 if self.status == ConnectionStatus::Connected {
638 match self.protocol_version {
639 Version::V3_1_1 => {
640 if let Ok(pingreq) = v3_1_1::Pingreq::builder().build() {
641 events.extend(self.process_send_v3_1_1_pingreq(pingreq));
642 }
643 }
644 Version::V5_0 => {
645 if let Ok(pingreq) = v5_0::Pingreq::builder().build() {
646 events.extend(self.process_send_v5_0_pingreq(pingreq));
647 }
648 }
649 Version::Undetermined => {
650 unreachable!("Protocol version should be set before sending PINGREQ");
651 }
652 }
653 }
654 }
655 TimerKind::PingreqRecv => {
656 self.pingreq_recv_set = false;
658
659 match self.protocol_version {
660 Version::V3_1_1 => {
661 events.push(GenericEvent::RequestClose);
663 }
664 Version::V5_0 => {
665 if self.status == ConnectionStatus::Connected {
667 if let Ok(disconnect) = v5_0::Disconnect::builder()
668 .reason_code(DisconnectReasonCode::KeepAliveTimeout)
669 .build()
670 {
671 events.extend(self.process_send_v5_0_disconnect(disconnect));
672 }
673 }
674 }
675 Version::Undetermined => {
676 unreachable!("Protocol version should be set before receiving PINGREQ");
677 }
678 }
679 }
680 TimerKind::PingrespRecv => {
681 self.pingresp_recv_set = false;
683
684 match self.protocol_version {
685 Version::V3_1_1 => {
686 events.push(GenericEvent::RequestClose);
688 }
689 Version::V5_0 => {
690 if self.status == ConnectionStatus::Connected {
692 if let Ok(disconnect) = v5_0::Disconnect::builder()
693 .reason_code(DisconnectReasonCode::KeepAliveTimeout)
694 .build()
695 {
696 events.extend(self.process_send_v5_0_disconnect(disconnect));
697 }
698 }
699 }
700 Version::Undetermined => {
701 unreachable!("Protocol version should be set before receiving PINGRESP");
702 }
703 }
704 }
705 }
706
707 events
708 }
709
710 pub fn notify_closed(&mut self) -> Vec<GenericEvent<PacketIdType>> {
723 let mut events = Vec::new();
724
725 self.maximum_packet_size_send = MQTT_PACKET_SIZE_NO_LIMIT;
727 self.maximum_packet_size_recv = MQTT_PACKET_SIZE_NO_LIMIT;
728
729 self.status = ConnectionStatus::Disconnected;
731
732 self.topic_alias_send = None;
734 self.topic_alias_recv = None;
735
736 for packet_id in self.pid_suback.drain() {
738 if self.pid_man.is_used_id(packet_id) {
739 self.pid_man.release_id(packet_id);
740 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
741 }
742 }
743
744 for packet_id in self.pid_unsuback.drain() {
746 if self.pid_man.is_used_id(packet_id) {
747 self.pid_man.release_id(packet_id);
748 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
749 }
750 }
751
752 if !self.need_store {
754 self.qos2_publish_processing.clear();
755 self.qos2_publish_handled.clear();
756
757 for packet_id in self.pid_puback.drain() {
759 if self.pid_man.is_used_id(packet_id) {
760 self.pid_man.release_id(packet_id);
761 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
762 }
763 }
764
765 for packet_id in self.pid_pubrec.drain() {
767 if self.pid_man.is_used_id(packet_id) {
768 self.pid_man.release_id(packet_id);
769 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
770 }
771 }
772
773 for packet_id in self.pid_pubcomp.drain() {
775 if self.pid_man.is_used_id(packet_id) {
776 self.pid_man.release_id(packet_id);
777 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
778 }
779 }
780 }
781
782 self.cancel_timers(&mut events);
784
785 events
786 }
787
788 pub fn set_pingreq_send_interval(
801 &mut self,
802 duration_ms: u64,
803 ) -> Vec<GenericEvent<PacketIdType>> {
804 let mut events = Vec::new();
805
806 if duration_ms == 0 {
807 self.pingreq_send_interval_ms = None;
808 self.pingreq_send_set = false;
809 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
810 } else {
811 self.pingreq_send_interval_ms = Some(duration_ms);
812 self.pingreq_send_set = true;
813 events.push(GenericEvent::RequestTimerReset {
814 kind: TimerKind::PingreqSend,
815 duration_ms,
816 });
817 }
818
819 events
820 }
821
822 pub fn get_receive_maximum_vacancy_for_send(&self) -> Option<u16> {
831 self.publish_send_max
833 .map(|max| max.saturating_sub(self.publish_send_count))
834 }
835
836 pub fn set_offline_publish(&mut self, enable: bool) {
845 self.offline_publish = enable;
846 if self.offline_publish {
847 self.need_store = true;
848 }
849 }
850
851 pub fn set_auto_pub_response(&mut self, enable: bool) {
860 self.auto_pub_response = enable;
861 }
862
863 pub fn set_auto_ping_response(&mut self, enable: bool) {
871 self.auto_ping_response = enable;
872 }
873
874 pub fn set_auto_map_topic_alias_send(&mut self, enable: bool) {
886 self.auto_map_topic_alias_send = enable;
887 }
888
889 pub fn set_auto_replace_topic_alias_send(&mut self, enable: bool) {
899 self.auto_replace_topic_alias_send = enable;
900 }
901
902 pub fn set_pingresp_recv_timeout(&mut self, timeout_ms: Option<u64>) {
904 self.pingresp_recv_timeout_ms = timeout_ms;
905 }
906
907 pub fn acquire_packet_id(&mut self) -> Result<PacketIdType, MqttError> {
913 self.pid_man.acquire_unique_id()
914 }
915
916 pub fn register_packet_id(&mut self, packet_id: PacketIdType) -> Result<(), MqttError> {
929 self.pid_man.register_id(packet_id)
930 }
931
932 pub fn release_packet_id(
946 &mut self,
947 packet_id: PacketIdType,
948 ) -> Vec<GenericEvent<PacketIdType>> {
949 let mut events = Vec::new();
950
951 if self.pid_man.is_used_id(packet_id) {
952 self.pid_man.release_id(packet_id);
953 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
954 }
955
956 events
957 }
958
959 pub fn get_qos2_publish_handled(&self) -> HashSet<PacketIdType> {
968 self.qos2_publish_handled.clone()
969 }
970
971 pub fn restore_qos2_publish_handled(&mut self, pids: HashSet<PacketIdType>) {
980 self.qos2_publish_handled = pids;
981 }
982
983 pub fn restore_packets(&mut self, packets: Vec<GenericStorePacket<PacketIdType>>) {
992 for packet in packets {
993 match &packet {
994 GenericStorePacket::V3_1_1Publish(p) => {
995 match p.qos() {
997 Qos::AtLeastOnce => {
998 self.pid_puback.insert(p.packet_id().unwrap());
999 }
1000 Qos::ExactlyOnce => {
1001 self.pid_pubrec.insert(p.packet_id().unwrap());
1002 }
1003 _ => {
1004 tracing::warn!("QoS 0 packet found in store, skipping");
1006 continue;
1007 }
1008 }
1009 let packet_id = p.packet_id().unwrap();
1011 if self.pid_man.register_id(packet_id).is_ok() {
1012 if let Err(e) = self.store.add(packet) {
1013 tracing::error!("Failed to add packet to store: {:?}", e);
1014 }
1015 } else {
1016 tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1017 }
1018 }
1019 GenericStorePacket::V5_0Publish(p) => {
1020 match p.qos() {
1022 Qos::AtLeastOnce => {
1023 self.pid_puback.insert(p.packet_id().unwrap());
1024 }
1025 Qos::ExactlyOnce => {
1026 self.pid_pubrec.insert(p.packet_id().unwrap());
1027 }
1028 _ => {
1029 tracing::warn!("QoS 0 packet found in store, skipping");
1031 continue;
1032 }
1033 }
1034 let packet_id = p.packet_id().unwrap();
1036 if self.pid_man.register_id(packet_id).is_ok() {
1037 if let Err(e) = self.store.add(packet) {
1038 tracing::error!("Failed to add packet to store: {:?}", e);
1039 }
1040 } else {
1041 tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1042 }
1043 }
1044 GenericStorePacket::V3_1_1Pubrel(p) => {
1045 self.pid_pubcomp.insert(p.packet_id());
1047 let packet_id = p.packet_id();
1049 if self.pid_man.register_id(packet_id).is_ok() {
1050 if let Err(e) = self.store.add(packet) {
1051 tracing::error!("Failed to add packet to store: {:?}", e);
1052 }
1053 } else {
1054 tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1055 }
1056 }
1057 GenericStorePacket::V5_0Pubrel(p) => {
1058 self.pid_pubcomp.insert(p.packet_id());
1060 let packet_id = p.packet_id();
1062 if self.pid_man.register_id(packet_id).is_ok() {
1063 if let Err(e) = self.store.add(packet) {
1064 tracing::error!("Failed to add packet to store: {:?}", e);
1065 }
1066 } else {
1067 tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1068 }
1069 }
1070 }
1071 }
1072 }
1073
1074 pub fn get_stored_packets(&self) -> Vec<GenericStorePacket<PacketIdType>> {
1083 self.store.get_stored()
1084 }
1085
1086 pub fn get_protocol_version(&self) -> Version {
1092 self.protocol_version
1093 }
1094
1095 pub fn is_publish_processing(&self, packet_id: PacketIdType) -> bool {
1105 self.qos2_publish_processing.contains(&packet_id)
1106 }
1107
1108 pub fn regulate_for_store(
1113 &self,
1114 mut packet: v5_0::GenericPublish<PacketIdType>,
1115 ) -> Result<v5_0::GenericPublish<PacketIdType>, MqttError> {
1116 if packet.topic_name().is_empty() {
1117 if let Some(props) = packet.props() {
1119 if let Some(topic_alias) = Self::get_topic_alias_from_props(props) {
1120 if let Some(ref topic_alias_send) = self.topic_alias_send {
1121 if let Some(topic) = topic_alias_send.peek(topic_alias) {
1122 packet = packet.remove_topic_alias_add_topic(topic.to_string())?;
1124 } else {
1125 return Err(MqttError::PacketNotRegulated);
1126 }
1127 } else {
1128 return Err(MqttError::PacketNotRegulated);
1129 }
1130 } else {
1131 return Err(MqttError::PacketNotRegulated);
1132 }
1133 } else {
1134 return Err(MqttError::PacketNotRegulated);
1135 }
1136 } else {
1137 packet = packet.remove_topic_alias();
1139 }
1140
1141 Ok(packet)
1142 }
1143
1144 fn initialize(&mut self, is_client: bool) {
1158 self.publish_send_max = None;
1159 self.publish_recv_max = None;
1160 self.publish_send_count = 0;
1161 self.topic_alias_send = None;
1162 self.topic_alias_recv = None;
1163 self.publish_recv.clear();
1164 self.qos2_publish_processing.clear();
1165 self.need_store = false;
1166 self.pid_suback.clear();
1167 self.pid_unsuback.clear();
1168 self.is_client = is_client;
1169 }
1170
1171 fn clear_store_related(&mut self) {
1172 self.pid_man.clear();
1173 self.pid_puback.clear();
1174 self.pid_pubrec.clear();
1175 self.pid_pubcomp.clear();
1176 self.store.clear();
1177 }
1178
1179 fn send_stored(&mut self) -> Vec<GenericEvent<PacketIdType>> {
1181 let mut events = Vec::new();
1182 self.store.for_each(|packet| {
1183 if packet.size() > self.maximum_packet_size_send as usize {
1184 let packet_id = packet.packet_id();
1185 self.pid_man.release_id(packet_id);
1186 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1187 return false; }
1189 events.push(GenericEvent::RequestSendPacket {
1190 packet: packet.clone().into(),
1191 release_packet_id_if_send_error: None,
1192 });
1193 true });
1195
1196 events
1197 }
1198
1199 fn validate_topic_alias(&mut self, topic_alias_opt: Option<u16>) -> Option<String> {
1211 let topic_alias = topic_alias_opt?;
1212
1213 if !self.validate_topic_alias_range(topic_alias) {
1214 return None;
1215 }
1216
1217 let topic_alias_send = self.topic_alias_send.as_mut()?;
1218 let topic = topic_alias_send.get(topic_alias)?;
1220
1221 Some(topic.to_string())
1222 }
1223
1224 fn validate_topic_alias_range(&self, topic_alias: u16) -> bool {
1236 let topic_alias_send = match &self.topic_alias_send {
1237 Some(tas) => tas,
1238 None => {
1239 tracing::error!("topic_alias is set but topic_alias_maximum is 0");
1240 return false;
1241 }
1242 };
1243
1244 if topic_alias == 0 || topic_alias > topic_alias_send.max() {
1245 tracing::error!("topic_alias is set but out of range");
1246 return false;
1247 }
1248
1249 true
1250 }
1251
1252 pub(crate) fn process_send_v3_1_1_connect(
1254 &mut self,
1255 packet: v3_1_1::Connect,
1256 ) -> Vec<GenericEvent<PacketIdType>> {
1257 if self.status != ConnectionStatus::Disconnected {
1258 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1259 }
1260 if !self.validate_maximum_packet_size_send(packet.size()) {
1261 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1262 }
1263
1264 let mut events = Vec::new();
1265 self.initialize(true);
1266 self.status = ConnectionStatus::Connecting;
1267
1268 let keep_alive = packet.keep_alive();
1270 if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1271 self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1272 }
1273
1274 if packet.clean_start() {
1276 self.clear_store_related();
1277 } else {
1278 self.need_store = true;
1279 }
1280
1281 self.topic_alias_send = None;
1283
1284 events.push(GenericEvent::RequestSendPacket {
1285 packet: packet.into(),
1286 release_packet_id_if_send_error: None,
1287 });
1288 self.send_post_process(&mut events);
1289
1290 events
1291 }
1292
1293 pub(crate) fn process_send_v5_0_connect(
1295 &mut self,
1296 packet: v5_0::Connect,
1297 ) -> Vec<GenericEvent<PacketIdType>> {
1298 if !self.validate_maximum_packet_size_send(packet.size()) {
1299 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1300 }
1301 if self.status != ConnectionStatus::Disconnected {
1302 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1303 }
1304
1305 let mut events = Vec::new();
1306 self.initialize(true);
1307 self.status = ConnectionStatus::Connecting;
1308
1309 let keep_alive = packet.keep_alive();
1311 if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1312 self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1313 }
1314
1315 if packet.clean_start() {
1317 self.clear_store_related();
1318 }
1319
1320 for prop in packet.props() {
1322 match prop {
1323 Property::TopicAliasMaximum(val) => {
1324 if val.val() != 0 {
1325 self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1326 }
1327 }
1328 Property::ReceiveMaximum(val) => {
1329 debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1330 self.publish_recv_max = Some(val.val());
1331 }
1332 Property::MaximumPacketSize(val) => {
1333 debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1334 self.maximum_packet_size_recv = val.val();
1335 }
1336 Property::SessionExpiryInterval(val) => {
1337 if val.val() != 0 {
1338 self.need_store = true;
1339 }
1340 }
1341 _ => {
1342 }
1344 }
1345 }
1346
1347 events.push(GenericEvent::RequestSendPacket {
1348 packet: packet.into(),
1349 release_packet_id_if_send_error: None,
1350 });
1351 self.send_post_process(&mut events);
1352
1353 events
1354 }
1355
1356 pub(crate) fn process_send_v3_1_1_connack(
1357 &mut self,
1358 packet: v3_1_1::Connack,
1359 ) -> Vec<GenericEvent<PacketIdType>> {
1360 if self.status != ConnectionStatus::Connecting {
1361 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1362 }
1363 let mut events = Vec::new();
1364 if packet.return_code() == ConnectReturnCode::Accepted {
1365 self.status = ConnectionStatus::Connected;
1366 } else {
1367 self.status = ConnectionStatus::Disconnected;
1368 }
1369
1370 events.push(GenericEvent::RequestSendPacket {
1371 packet: packet.into(),
1372 release_packet_id_if_send_error: None,
1373 });
1374 events.extend(self.send_stored());
1375 self.send_post_process(&mut events);
1376
1377 events
1378 }
1379
1380 pub(crate) fn process_send_v5_0_connack(
1381 &mut self,
1382 packet: v5_0::Connack,
1383 ) -> Vec<GenericEvent<PacketIdType>> {
1384 if !self.validate_maximum_packet_size_send(packet.size()) {
1385 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1386 }
1387 if self.status != ConnectionStatus::Connecting {
1388 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1389 }
1390
1391 let mut events = Vec::new();
1392
1393 if packet.reason_code() == ConnectReasonCode::Success {
1394 self.status = ConnectionStatus::Connected;
1395
1396 for prop in packet.props() {
1398 match prop {
1399 Property::TopicAliasMaximum(val) => {
1400 if val.val() != 0 {
1401 self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1402 }
1403 }
1404 Property::ReceiveMaximum(val) => {
1405 debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1406 self.publish_recv_max = Some(val.val());
1407 }
1408 Property::MaximumPacketSize(val) => {
1409 debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1410 self.maximum_packet_size_recv = val.val();
1411 }
1412 _ => {
1413 }
1415 }
1416 }
1417 } else {
1418 self.status = ConnectionStatus::Disconnected;
1419 self.cancel_timers(&mut events);
1420 }
1421
1422 events.push(GenericEvent::RequestSendPacket {
1423 packet: packet.into(),
1424 release_packet_id_if_send_error: None,
1425 });
1426 events.extend(self.send_stored());
1427 self.send_post_process(&mut events);
1428
1429 events
1430 }
1431
1432 pub(crate) fn process_send_v3_1_1_publish(
1433 &mut self,
1434 packet: v3_1_1::GenericPublish<PacketIdType>,
1435 ) -> Vec<GenericEvent<PacketIdType>> {
1436 let mut events = Vec::new();
1437 let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1438
1439 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1440 let packet_id = packet.packet_id().unwrap();
1442 if self.status != ConnectionStatus::Connected
1443 && !self.need_store
1444 && !self.offline_publish
1445 {
1446 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1447 if self.pid_man.is_used_id(packet_id) {
1448 self.pid_man.release_id(packet_id);
1449 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1450 }
1451 return events;
1452 }
1453 if !self.pid_man.is_used_id(packet_id) {
1454 tracing::error!("packet_id {packet_id} must be acquired or registered");
1455 events.push(GenericEvent::NotifyError(
1456 MqttError::PacketIdentifierInvalid,
1457 ));
1458 return events;
1459 }
1460 if self.need_store
1461 && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1462 {
1463 let store_packet = packet.clone().set_dup(true);
1464 self.store.add(store_packet.try_into().unwrap()).unwrap();
1465 } else {
1466 release_packet_id_if_send_error = Some(packet_id);
1467 }
1468 if packet.qos() == Qos::ExactlyOnce {
1469 self.qos2_publish_processing.insert(packet_id);
1470 self.pid_pubrec.insert(packet_id);
1471 } else {
1472 self.pid_puback.insert(packet_id);
1473 }
1474 } else if self.status != ConnectionStatus::Connected {
1475 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1476 return events;
1477 }
1478
1479 if self.status == ConnectionStatus::Connected {
1480 events.push(GenericEvent::RequestSendPacket {
1481 packet: packet.into(),
1482 release_packet_id_if_send_error,
1483 });
1484 }
1485 self.send_post_process(&mut events);
1486
1487 events
1488 }
1489
1490 pub(crate) fn process_send_v5_0_publish(
1491 &mut self,
1492 mut packet: v5_0::GenericPublish<PacketIdType>,
1493 ) -> Vec<GenericEvent<PacketIdType>> {
1494 if !self.validate_maximum_packet_size_send(packet.size()) {
1495 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1496 }
1497
1498 let mut events = Vec::new();
1499 let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1500 let mut topic_alias_validated = false;
1501 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1502 let packet_id = packet.packet_id().unwrap();
1503 if self.status != ConnectionStatus::Connected
1504 && !self.need_store
1505 && !self.offline_publish
1506 {
1507 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1508 if self.pid_man.is_used_id(packet_id) {
1509 self.pid_man.release_id(packet_id);
1510 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1511 }
1512 return events;
1513 }
1514
1515 if !self.pid_man.is_used_id(packet_id) {
1517 tracing::error!("packet_id {packet_id} must be acquired or registered");
1518 events.push(GenericEvent::NotifyError(
1519 MqttError::PacketIdentifierInvalid,
1520 ));
1521 return events;
1522 }
1523
1524 if self.need_store
1525 && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1526 {
1527 let ta_opt = Self::get_topic_alias_from_props_opt(packet.props());
1528 if packet.topic_name().is_empty() {
1529 let topic_opt = self.validate_topic_alias(ta_opt);
1531 if topic_opt.is_none() {
1532 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1533 if self.pid_man.is_used_id(packet_id) {
1534 self.pid_man.release_id(packet_id);
1535 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1536 }
1537 return events;
1538 }
1539 topic_alias_validated = true;
1540 let store_packet = packet
1541 .clone()
1542 .remove_topic_alias_add_topic(topic_opt.unwrap())
1543 .unwrap()
1544 .set_dup(true);
1545 self.store.add(store_packet.try_into().unwrap()).unwrap();
1547 } else {
1548 let store_packet = packet.clone().remove_topic_alias().set_dup(true);
1550 self.store.add(store_packet.try_into().unwrap()).unwrap();
1551 }
1552 } else {
1553 release_packet_id_if_send_error = Some(packet_id);
1554 }
1555 if packet.qos() == Qos::ExactlyOnce {
1556 self.qos2_publish_processing.insert(packet_id);
1557 self.pid_pubrec.insert(packet_id);
1558 } else {
1559 self.pid_puback.insert(packet_id);
1560 }
1561 } else if self.status != ConnectionStatus::Connected {
1562 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1563 return events;
1564 }
1565
1566 let packet_id_opt = packet.packet_id();
1567 let ta_opt = Self::get_topic_alias_from_props_opt(packet.props());
1568 if packet.topic_name().is_empty() {
1569 if !topic_alias_validated && self.validate_topic_alias(ta_opt).is_none() {
1571 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1572 if let Some(packet_id) = packet_id_opt {
1573 if self.pid_man.is_used_id(packet_id) {
1574 self.pid_man.release_id(packet_id);
1575 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1576 }
1577 }
1578 return events;
1579 }
1580 } else if let Some(ta) = ta_opt {
1581 if self.validate_topic_alias_range(ta) {
1583 tracing::trace!(
1584 "topic alias: {} - {} is registered.",
1585 packet.topic_name(),
1586 ta
1587 );
1588 if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1589 topic_alias_send.insert_or_update(packet.topic_name(), ta);
1590 }
1591 } else {
1592 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1593 if let Some(packet_id) = packet_id_opt {
1594 if self.pid_man.is_used_id(packet_id) {
1595 self.pid_man.release_id(packet_id);
1596 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1597 }
1598 }
1599 return events;
1600 }
1601 } else if self.status == ConnectionStatus::Connected {
1602 if self.auto_map_topic_alias_send {
1604 if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1605 if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1606 tracing::trace!(
1607 "topic alias: {} - {} is found.",
1608 packet.topic_name(),
1609 found_ta
1610 );
1611 packet = packet.remove_topic_add_topic_alias(found_ta);
1612 } else {
1613 let lru_ta = topic_alias_send.get_lru_alias();
1614 topic_alias_send.insert_or_update(packet.topic_name(), lru_ta);
1615 packet = packet.remove_topic_add_topic_alias(lru_ta);
1616 }
1617 }
1618 } else if self.auto_replace_topic_alias_send {
1619 if let Some(ref topic_alias_send) = self.topic_alias_send {
1620 if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1621 tracing::trace!(
1622 "topic alias: {} - {} is found.",
1623 packet.topic_name(),
1624 found_ta
1625 );
1626 packet = packet.remove_topic_add_topic_alias(found_ta);
1627 }
1628 }
1629 }
1630 }
1631
1632 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1634 if let Some(max) = self.publish_send_max {
1635 if self.publish_send_count == max {
1636 events.push(GenericEvent::NotifyError(MqttError::ReceiveMaximumExceeded));
1637 if let Some(packet_id) = packet_id_opt {
1638 if self.pid_man.is_used_id(packet_id) {
1639 self.pid_man.release_id(packet_id);
1640 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1641 }
1642 }
1643 return events;
1644 }
1645 self.publish_send_count += 1;
1646 }
1647 }
1648
1649 if self.status == ConnectionStatus::Connected {
1650 events.push(GenericEvent::RequestSendPacket {
1651 packet: packet.into(),
1652 release_packet_id_if_send_error,
1653 });
1654 }
1655 self.send_post_process(&mut events);
1656
1657 events
1658 }
1659
1660 pub(crate) fn process_send_v3_1_1_puback(
1661 &mut self,
1662 packet: v3_1_1::GenericPuback<PacketIdType>,
1663 ) -> Vec<GenericEvent<PacketIdType>> {
1664 if self.status != ConnectionStatus::Connected {
1665 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1666 }
1667 let mut events = Vec::new();
1668
1669 events.push(GenericEvent::RequestSendPacket {
1670 packet: packet.into(),
1671 release_packet_id_if_send_error: None,
1672 });
1673 self.send_post_process(&mut events);
1674
1675 events
1676 }
1677
1678 pub(crate) fn process_send_v5_0_puback(
1679 &mut self,
1680 packet: v5_0::GenericPuback<PacketIdType>,
1681 ) -> Vec<GenericEvent<PacketIdType>> {
1682 if !self.validate_maximum_packet_size_send(packet.size()) {
1683 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1684 }
1685 if self.status != ConnectionStatus::Connected {
1686 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1687 }
1688
1689 let mut events = Vec::new();
1690 self.publish_recv.remove(&packet.packet_id());
1691
1692 events.push(GenericEvent::RequestSendPacket {
1693 packet: packet.into(),
1694 release_packet_id_if_send_error: None,
1695 });
1696 self.send_post_process(&mut events);
1697
1698 events
1699 }
1700
1701 pub(crate) fn process_send_v3_1_1_pubrec(
1702 &mut self,
1703 packet: v3_1_1::GenericPubrec<PacketIdType>,
1704 ) -> Vec<GenericEvent<PacketIdType>> {
1705 if self.status != ConnectionStatus::Connected {
1706 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1707 }
1708 let mut events = Vec::new();
1709
1710 events.push(GenericEvent::RequestSendPacket {
1711 packet: packet.into(),
1712 release_packet_id_if_send_error: None,
1713 });
1714 self.send_post_process(&mut events);
1715
1716 events
1717 }
1718
1719 pub(crate) fn process_send_v5_0_pubrec(
1720 &mut self,
1721 packet: v5_0::GenericPubrec<PacketIdType>,
1722 ) -> Vec<GenericEvent<PacketIdType>> {
1723 if !self.validate_maximum_packet_size_send(packet.size()) {
1724 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1725 }
1726 if self.status != ConnectionStatus::Connected {
1727 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1728 }
1729
1730 let mut events = Vec::new();
1731 let packet_id = packet.packet_id();
1732
1733 if let Some(rc) = packet.reason_code() {
1734 if rc.is_failure() {
1735 self.publish_recv.remove(&packet_id);
1736 self.qos2_publish_handled.remove(&packet_id);
1737 }
1738 }
1739
1740 events.push(GenericEvent::RequestSendPacket {
1741 packet: packet.into(),
1742 release_packet_id_if_send_error: None,
1743 });
1744 self.send_post_process(&mut events);
1745
1746 events
1747 }
1748
1749 pub(crate) fn process_send_v3_1_1_pubrel(
1750 &mut self,
1751 packet: v3_1_1::GenericPubrel<PacketIdType>,
1752 ) -> Vec<GenericEvent<PacketIdType>> {
1753 if self.status != ConnectionStatus::Connected && !self.need_store {
1754 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1755 }
1756 let mut events = Vec::new();
1757 let packet_id = packet.packet_id();
1758 if !self.pid_man.is_used_id(packet_id) {
1759 tracing::error!("packet_id {packet_id} must be acquired or registered");
1760 events.push(GenericEvent::NotifyError(
1761 MqttError::PacketIdentifierInvalid,
1762 ));
1763 return events;
1764 }
1765 if self.need_store {
1766 self.store.add(packet.clone().try_into().unwrap()).unwrap();
1767 }
1768
1769 if self.status == ConnectionStatus::Connected {
1770 self.pid_pubcomp.insert(packet_id);
1771 events.push(GenericEvent::RequestSendPacket {
1772 packet: packet.into(),
1773 release_packet_id_if_send_error: None,
1774 });
1775 }
1776 self.send_post_process(&mut events);
1777
1778 events
1779 }
1780
1781 pub(crate) fn process_send_v5_0_pubrel(
1782 &mut self,
1783 packet: v5_0::GenericPubrel<PacketIdType>,
1784 ) -> Vec<GenericEvent<PacketIdType>> {
1785 if !self.validate_maximum_packet_size_send(packet.size()) {
1786 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1787 }
1788 if self.status != ConnectionStatus::Connected && !self.need_store {
1789 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1790 }
1791
1792 let mut events = Vec::new();
1793 let packet_id = packet.packet_id();
1794 if !self.pid_man.is_used_id(packet_id) {
1795 tracing::error!("packet_id {packet_id} must be acquired or registered");
1796 events.push(GenericEvent::NotifyError(
1797 MqttError::PacketIdentifierInvalid,
1798 ));
1799 return events;
1800 }
1801 if self.need_store {
1802 self.store.add(packet.clone().try_into().unwrap()).unwrap();
1803 }
1804
1805 if self.status == ConnectionStatus::Connected {
1806 self.pid_pubcomp.insert(packet_id);
1807 events.push(GenericEvent::RequestSendPacket {
1808 packet: packet.into(),
1809 release_packet_id_if_send_error: None,
1810 });
1811 }
1812 self.send_post_process(&mut events);
1813
1814 events
1815 }
1816
1817 pub(crate) fn process_send_v3_1_1_pubcomp(
1818 &mut self,
1819 packet: v3_1_1::GenericPubcomp<PacketIdType>,
1820 ) -> Vec<GenericEvent<PacketIdType>> {
1821 if self.status != ConnectionStatus::Connected {
1822 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1823 }
1824 let mut events = Vec::new();
1825
1826 events.push(GenericEvent::RequestSendPacket {
1827 packet: packet.into(),
1828 release_packet_id_if_send_error: None,
1829 });
1830 self.send_post_process(&mut events);
1831
1832 events
1833 }
1834
1835 pub(crate) fn process_send_v5_0_pubcomp(
1836 &mut self,
1837 packet: v5_0::GenericPubcomp<PacketIdType>,
1838 ) -> Vec<GenericEvent<PacketIdType>> {
1839 if !self.validate_maximum_packet_size_send(packet.size()) {
1840 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1841 }
1842 if self.status != ConnectionStatus::Connected {
1843 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1844 }
1845
1846 let mut events = Vec::new();
1847 self.publish_recv.remove(&packet.packet_id());
1848
1849 events.push(GenericEvent::RequestSendPacket {
1850 packet: packet.into(),
1851 release_packet_id_if_send_error: None,
1852 });
1853 self.send_post_process(&mut events);
1854
1855 events
1856 }
1857
1858 pub(crate) fn process_send_v3_1_1_subscribe(
1859 &mut self,
1860 packet: v3_1_1::GenericSubscribe<PacketIdType>,
1861 ) -> Vec<GenericEvent<PacketIdType>> {
1862 let mut events = Vec::new();
1863 let packet_id = packet.packet_id();
1864 if self.status != ConnectionStatus::Connected {
1865 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1866 if self.pid_man.is_used_id(packet_id) {
1867 self.pid_man.release_id(packet_id);
1868 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1869 }
1870 return events;
1871 }
1872 if !self.pid_man.is_used_id(packet_id) {
1873 tracing::error!("packet_id {packet_id} must be acquired or registered");
1874 events.push(GenericEvent::NotifyError(
1875 MqttError::PacketIdentifierInvalid,
1876 ));
1877 return events;
1878 }
1879 self.pid_suback.insert(packet_id);
1880
1881 events.push(GenericEvent::RequestSendPacket {
1882 packet: packet.into(),
1883 release_packet_id_if_send_error: Some(packet_id),
1884 });
1885 self.send_post_process(&mut events);
1886
1887 events
1888 }
1889
1890 pub(crate) fn process_send_v5_0_subscribe(
1891 &mut self,
1892 packet: v5_0::GenericSubscribe<PacketIdType>,
1893 ) -> Vec<GenericEvent<PacketIdType>> {
1894 if !self.validate_maximum_packet_size_send(packet.size()) {
1895 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1896 }
1897
1898 let mut events = Vec::new();
1899 let packet_id = packet.packet_id();
1900 if self.status != ConnectionStatus::Connected {
1901 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1902 if self.pid_man.is_used_id(packet_id) {
1903 self.pid_man.release_id(packet_id);
1904 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1905 }
1906 return events;
1907 }
1908 if !self.pid_man.is_used_id(packet_id) {
1909 tracing::error!("packet_id {packet_id} must be acquired or registered");
1910 events.push(GenericEvent::NotifyError(
1911 MqttError::PacketIdentifierInvalid,
1912 ));
1913 return events;
1914 }
1915 self.pid_suback.insert(packet_id);
1916
1917 events.push(GenericEvent::RequestSendPacket {
1918 packet: packet.into(),
1919 release_packet_id_if_send_error: Some(packet_id),
1920 });
1921 self.send_post_process(&mut events);
1922
1923 events
1924 }
1925
1926 pub(crate) fn process_send_v3_1_1_suback(
1927 &mut self,
1928 packet: v3_1_1::GenericSuback<PacketIdType>,
1929 ) -> Vec<GenericEvent<PacketIdType>> {
1930 if self.status != ConnectionStatus::Connected {
1931 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1932 }
1933 let mut events = Vec::new();
1934 events.push(GenericEvent::RequestSendPacket {
1935 packet: packet.into(),
1936 release_packet_id_if_send_error: None,
1937 });
1938 self.send_post_process(&mut events);
1939
1940 events
1941 }
1942
1943 pub(crate) fn process_send_v5_0_suback(
1944 &mut self,
1945 packet: v5_0::GenericSuback<PacketIdType>,
1946 ) -> Vec<GenericEvent<PacketIdType>> {
1947 if !self.validate_maximum_packet_size_send(packet.size()) {
1948 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1949 }
1950 if self.status != ConnectionStatus::Connected {
1951 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1952 }
1953
1954 let mut events = Vec::new();
1955 events.push(GenericEvent::RequestSendPacket {
1956 packet: packet.into(),
1957 release_packet_id_if_send_error: None,
1958 });
1959 self.send_post_process(&mut events);
1960
1961 events
1962 }
1963
1964 pub(crate) fn process_send_v3_1_1_unsubscribe(
1965 &mut self,
1966 packet: v3_1_1::GenericUnsubscribe<PacketIdType>,
1967 ) -> Vec<GenericEvent<PacketIdType>> {
1968 let mut events = Vec::new();
1969 let packet_id = packet.packet_id();
1970 if self.status != ConnectionStatus::Connected {
1971 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1972 if self.pid_man.is_used_id(packet_id) {
1973 self.pid_man.release_id(packet_id);
1974 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1975 }
1976 return events;
1977 }
1978 if !self.pid_man.is_used_id(packet_id) {
1979 tracing::error!("packet_id {packet_id} must be acquired or registered");
1980 events.push(GenericEvent::NotifyError(
1981 MqttError::PacketIdentifierInvalid,
1982 ));
1983 return events;
1984 }
1985 self.pid_unsuback.insert(packet_id);
1986
1987 events.push(GenericEvent::RequestSendPacket {
1988 packet: packet.into(),
1989 release_packet_id_if_send_error: Some(packet_id),
1990 });
1991 self.send_post_process(&mut events);
1992
1993 events
1994 }
1995
1996 pub(crate) fn process_send_v5_0_unsubscribe(
1997 &mut self,
1998 packet: v5_0::GenericUnsubscribe<PacketIdType>,
1999 ) -> Vec<GenericEvent<PacketIdType>> {
2000 if !self.validate_maximum_packet_size_send(packet.size()) {
2001 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2002 }
2003
2004 let mut events = Vec::new();
2005 let packet_id = packet.packet_id();
2006 if self.status != ConnectionStatus::Connected {
2007 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
2008 if self.pid_man.is_used_id(packet_id) {
2009 self.pid_man.release_id(packet_id);
2010 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2011 }
2012 return events;
2013 }
2014 if !self.pid_man.is_used_id(packet_id) {
2015 tracing::error!("packet_id {packet_id} must be acquired or registered");
2016 events.push(GenericEvent::NotifyError(
2017 MqttError::PacketIdentifierInvalid,
2018 ));
2019 return events;
2020 }
2021 self.pid_unsuback.insert(packet_id);
2022
2023 events.push(GenericEvent::RequestSendPacket {
2024 packet: packet.into(),
2025 release_packet_id_if_send_error: Some(packet_id),
2026 });
2027 self.send_post_process(&mut events);
2028
2029 events
2030 }
2031
2032 pub(crate) fn process_send_v3_1_1_unsuback(
2033 &mut self,
2034 packet: v3_1_1::GenericUnsuback<PacketIdType>,
2035 ) -> Vec<GenericEvent<PacketIdType>> {
2036 if self.status != ConnectionStatus::Connected {
2037 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2038 }
2039 let mut events = Vec::new();
2040 events.push(GenericEvent::RequestSendPacket {
2041 packet: packet.into(),
2042 release_packet_id_if_send_error: None,
2043 });
2044 self.send_post_process(&mut events);
2045
2046 events
2047 }
2048
2049 pub(crate) fn process_send_v5_0_unsuback(
2050 &mut self,
2051 packet: v5_0::GenericUnsuback<PacketIdType>,
2052 ) -> Vec<GenericEvent<PacketIdType>> {
2053 if !self.validate_maximum_packet_size_send(packet.size()) {
2054 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2055 }
2056 if self.status != ConnectionStatus::Connected {
2057 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2058 }
2059
2060 let mut events = Vec::new();
2061 events.push(GenericEvent::RequestSendPacket {
2062 packet: packet.into(),
2063 release_packet_id_if_send_error: None,
2064 });
2065 self.send_post_process(&mut events);
2066
2067 events
2068 }
2069
2070 pub(crate) fn process_send_v3_1_1_pingreq(
2071 &mut self,
2072 packet: v3_1_1::Pingreq,
2073 ) -> Vec<GenericEvent<PacketIdType>> {
2074 if self.status != ConnectionStatus::Connected {
2075 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2076 }
2077 let mut events = Vec::new();
2078 events.push(GenericEvent::RequestSendPacket {
2079 packet: packet.into(),
2080 release_packet_id_if_send_error: None,
2081 });
2082 if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2083 self.pingreq_send_set = true;
2084 events.push(GenericEvent::RequestTimerReset {
2085 kind: TimerKind::PingrespRecv,
2086 duration_ms: timeout_ms,
2087 });
2088 }
2089 self.send_post_process(&mut events);
2090
2091 events
2092 }
2093
2094 pub(crate) fn process_send_v5_0_pingreq(
2095 &mut self,
2096 packet: v5_0::Pingreq,
2097 ) -> Vec<GenericEvent<PacketIdType>> {
2098 if !self.validate_maximum_packet_size_send(packet.size()) {
2099 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2100 }
2101 if self.status != ConnectionStatus::Connected {
2102 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2103 }
2104
2105 let mut events = Vec::new();
2106 events.push(GenericEvent::RequestSendPacket {
2107 packet: packet.into(),
2108 release_packet_id_if_send_error: None,
2109 });
2110 if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2111 self.pingreq_send_set = true;
2112 events.push(GenericEvent::RequestTimerReset {
2113 kind: TimerKind::PingrespRecv,
2114 duration_ms: timeout_ms,
2115 });
2116 }
2117 self.send_post_process(&mut events);
2118
2119 events
2120 }
2121
2122 pub(crate) fn process_send_v3_1_1_pingresp(
2123 &mut self,
2124 packet: v3_1_1::Pingresp,
2125 ) -> Vec<GenericEvent<PacketIdType>> {
2126 if self.status != ConnectionStatus::Connected {
2127 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2128 }
2129 let mut events = Vec::new();
2130 events.push(GenericEvent::RequestSendPacket {
2131 packet: packet.into(),
2132 release_packet_id_if_send_error: None,
2133 });
2134 self.send_post_process(&mut events);
2135
2136 events
2137 }
2138
2139 pub(crate) fn process_send_v5_0_pingresp(
2140 &mut self,
2141 packet: v5_0::Pingresp,
2142 ) -> Vec<GenericEvent<PacketIdType>> {
2143 if !self.validate_maximum_packet_size_send(packet.size()) {
2144 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2145 }
2146 if self.status != ConnectionStatus::Connected {
2147 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2148 }
2149
2150 let mut events = Vec::new();
2151 events.push(GenericEvent::RequestSendPacket {
2152 packet: packet.into(),
2153 release_packet_id_if_send_error: None,
2154 });
2155 self.send_post_process(&mut events);
2156
2157 events
2158 }
2159
2160 pub(crate) fn process_send_v3_1_1_disconnect(
2161 &mut self,
2162 packet: v3_1_1::Disconnect,
2163 ) -> Vec<GenericEvent<PacketIdType>> {
2164 if self.status != ConnectionStatus::Connected {
2165 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2166 }
2167 let mut events = Vec::new();
2168 self.status = ConnectionStatus::Disconnected;
2169 self.cancel_timers(&mut events);
2170 events.push(GenericEvent::RequestSendPacket {
2171 packet: packet.into(),
2172 release_packet_id_if_send_error: None,
2173 });
2174 events.push(GenericEvent::RequestClose);
2175
2176 events
2177 }
2178
2179 pub(crate) fn process_send_v5_0_disconnect(
2180 &mut self,
2181 packet: v5_0::Disconnect,
2182 ) -> Vec<GenericEvent<PacketIdType>> {
2183 if !self.validate_maximum_packet_size_send(packet.size()) {
2184 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2185 }
2186 if self.status != ConnectionStatus::Connected {
2187 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2188 }
2189
2190 let mut events = Vec::new();
2191 self.status = ConnectionStatus::Disconnected;
2192 self.cancel_timers(&mut events);
2193 events.push(GenericEvent::RequestSendPacket {
2194 packet: packet.into(),
2195 release_packet_id_if_send_error: None,
2196 });
2197 events.push(GenericEvent::RequestClose);
2198
2199 events
2200 }
2201
2202 pub(crate) fn process_send_v5_0_auth(
2203 &mut self,
2204 packet: v5_0::Auth,
2205 ) -> Vec<GenericEvent<PacketIdType>> {
2206 if !self.validate_maximum_packet_size_send(packet.size()) {
2207 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2208 }
2209 if self.status == ConnectionStatus::Disconnected {
2210 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2211 }
2212
2213 let mut events = Vec::new();
2214 events.push(GenericEvent::RequestSendPacket {
2215 packet: packet.into(),
2216 release_packet_id_if_send_error: None,
2217 });
2218 self.send_post_process(&mut events);
2219
2220 events
2221 }
2222
2223 fn send_post_process(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
2224 if self.is_client {
2225 if let Some(timeout_ms) = self.pingreq_send_interval_ms {
2226 self.pingreq_send_set = true;
2227 events.push(GenericEvent::RequestTimerReset {
2228 kind: TimerKind::PingreqSend,
2229 duration_ms: timeout_ms,
2230 });
2231 }
2232 }
2233 }
2234
2235 fn validate_maximum_packet_size_send(&self, size: usize) -> bool {
2236 if size > self.maximum_packet_size_send as usize {
2237 tracing::error!("packet size over maximum_packet_size for sending");
2238 return false;
2239 }
2240 true
2241 }
2242
2243 fn process_recv_packet(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
2244 let mut events = Vec::new();
2245
2246 let total_size = remaining_length_to_total_size(raw_packet.remaining_length());
2248 if total_size > self.maximum_packet_size_recv {
2249 let disconnect_packet = v5_0::Disconnect::builder()
2255 .reason_code(DisconnectReasonCode::PacketTooLarge)
2256 .build()
2257 .unwrap();
2258 events.extend(self.process_send_v5_0_disconnect(disconnect_packet));
2260 events.push(GenericEvent::NotifyError(MqttError::PacketTooLarge));
2261 return events;
2262 }
2263
2264 let packet_type = raw_packet.packet_type();
2265 let _flags = raw_packet.flags();
2266 match self.protocol_version {
2267 Version::V3_1_1 => {
2268 match packet_type {
2269 1 => {
2270 events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2272 }
2273 2 => {
2274 events.extend(self.process_recv_v3_1_1_connack(raw_packet));
2276 }
2277 3 => {
2278 events.extend(self.process_recv_v3_1_1_publish(raw_packet));
2280 }
2281 4 => {
2282 events.extend(self.process_recv_v3_1_1_puback(raw_packet));
2284 }
2285 5 => {
2286 events.extend(self.process_recv_v3_1_1_pubrec(raw_packet));
2288 }
2289 6 => {
2290 events.extend(self.process_recv_v3_1_1_pubrel(raw_packet));
2292 }
2293 7 => {
2294 events.extend(self.process_recv_v3_1_1_pubcomp(raw_packet));
2296 }
2297 8 => {
2298 events.extend(self.process_recv_v3_1_1_subscribe(raw_packet));
2300 }
2301 9 => {
2302 events.extend(self.process_recv_v3_1_1_suback(raw_packet));
2304 }
2305 10 => {
2306 events.extend(self.process_recv_v3_1_1_unsubscribe(raw_packet));
2308 }
2309 11 => {
2310 events.extend(self.process_recv_v3_1_1_unsuback(raw_packet));
2312 }
2313 12 => {
2314 events.extend(self.process_recv_v3_1_1_pingreq(raw_packet));
2316 }
2317 13 => {
2318 events.extend(self.process_recv_v3_1_1_pingresp(raw_packet));
2320 }
2321 14 => {
2322 events.extend(self.process_recv_v3_1_1_disconnect(raw_packet));
2324 }
2325 _ => {
2327 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2328 }
2329 }
2330 }
2331 Version::V5_0 => {
2332 match packet_type {
2333 1 => {
2334 events.extend(self.process_recv_v5_0_connect(raw_packet));
2336 }
2337 2 => {
2338 events.extend(self.process_recv_v5_0_connack(raw_packet));
2340 }
2341 3 => {
2342 events.extend(self.process_recv_v5_0_publish(raw_packet));
2344 }
2345 4 => {
2346 events.extend(self.process_recv_v5_0_puback(raw_packet));
2348 }
2349 5 => {
2350 events.extend(self.process_recv_v5_0_pubrec(raw_packet));
2352 }
2353 6 => {
2354 events.extend(self.process_recv_v5_0_pubrel(raw_packet));
2356 }
2357 7 => {
2358 events.extend(self.process_recv_v5_0_pubcomp(raw_packet));
2360 }
2361 8 => {
2362 events.extend(self.process_recv_v5_0_subscribe(raw_packet));
2364 }
2365 9 => {
2366 events.extend(self.process_recv_v5_0_suback(raw_packet));
2368 }
2369 10 => {
2370 events.extend(self.process_recv_v5_0_unsubscribe(raw_packet));
2372 }
2373 11 => {
2374 events.extend(self.process_recv_v5_0_unsuback(raw_packet));
2376 }
2377 12 => {
2378 events.extend(self.process_recv_v5_0_pingreq(raw_packet));
2380 }
2381 13 => {
2382 events.extend(self.process_recv_v5_0_pingresp(raw_packet));
2384 }
2385 14 => {
2386 events.extend(self.process_recv_v5_0_disconnect(raw_packet));
2388 }
2389 15 => {
2390 events.extend(self.process_recv_v5_0_auth(raw_packet));
2392 }
2393 _ => {
2395 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2396 }
2397 }
2398 }
2399 Version::Undetermined => {
2400 match packet_type {
2401 1 => {
2402 if raw_packet.remaining_length() < 7 {
2404 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2405 return events;
2406 }
2407 match raw_packet.data_as_slice()[6] {
2408 4 => {
2410 self.protocol_version = Version::V3_1_1;
2411 events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2412 }
2413 5 => {
2414 self.protocol_version = Version::V5_0;
2415 events.extend(self.process_recv_v5_0_connect(raw_packet));
2416 }
2417 _ => {
2418 events.push(GenericEvent::NotifyError(
2419 MqttError::UnsupportedProtocolVersion,
2420 ));
2421 }
2422 }
2423 }
2424 _ => {
2425 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2426 }
2427 }
2428 }
2429 }
2430
2431 events
2432 }
2433
2434 fn process_recv_v3_1_1_connect(
2435 &mut self,
2436 raw_packet: RawPacket,
2437 ) -> Vec<GenericEvent<PacketIdType>> {
2438 let mut events = Vec::new();
2439 match v3_1_1::Connect::parse(raw_packet.data_as_slice()) {
2440 Ok((packet, _)) => {
2441 if self.status != ConnectionStatus::Disconnected {
2442 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2443 return events;
2444 }
2445 self.initialize(false);
2446 self.status = ConnectionStatus::Connecting;
2447 if packet.keep_alive() > 0 {
2448 self.pingreq_recv_timeout_ms =
2449 Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2450 }
2451 if packet.clean_session() {
2452 self.clear_store_related();
2453 } else {
2454 self.need_store = true;
2455 }
2456 events.extend(self.refresh_pingreq_recv());
2457 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2458 }
2459 Err(e) => {
2460 if self.status == ConnectionStatus::Disconnected {
2461 self.status = ConnectionStatus::Connecting;
2462 let rc = match e {
2463 MqttError::ClientIdentifierNotValid => {
2464 ConnectReturnCode::IdentifierRejected
2465 }
2466 MqttError::BadUserNameOrPassword => {
2467 ConnectReturnCode::BadUserNameOrPassword
2468 }
2469 MqttError::UnsupportedProtocolVersion => {
2470 ConnectReturnCode::UnacceptableProtocolVersion
2471 }
2472 _ => ConnectReturnCode::NotAuthorized, };
2474 let connack = v3_1_1::Connack::builder().return_code(rc).build().unwrap();
2475 let connack_events = self.process_send_v3_1_1_connack(connack);
2476 events.extend(connack_events);
2477 } else {
2478 events.push(GenericEvent::RequestClose);
2479 }
2480 events.push(GenericEvent::NotifyError(e));
2481 }
2482 }
2483
2484 events
2485 }
2486
2487 fn process_recv_v5_0_connect(
2488 &mut self,
2489 raw_packet: RawPacket,
2490 ) -> Vec<GenericEvent<PacketIdType>> {
2491 let mut events = Vec::new();
2492 match v5_0::Connect::parse(raw_packet.data_as_slice()) {
2493 Ok((packet, _)) => {
2494 if self.status != ConnectionStatus::Disconnected {
2495 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2496 return events;
2497 }
2498 self.initialize(false);
2499 self.status = ConnectionStatus::Connecting;
2500 if packet.keep_alive() > 0 {
2501 self.pingreq_recv_timeout_ms =
2502 Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2503 }
2504 if packet.clean_start() {
2505 self.clear_store_related();
2506 }
2507 packet.props().iter().for_each(|prop| match prop {
2508 Property::TopicAliasMaximum(p) => {
2509 self.topic_alias_send = Some(TopicAliasSend::new(p.val()));
2510 }
2511 Property::ReceiveMaximum(p) => {
2512 self.publish_send_max = Some(p.val());
2513 }
2514 Property::MaximumPacketSize(p) => {
2515 self.maximum_packet_size_send = p.val();
2516 }
2517 Property::SessionExpiryInterval(p) => {
2518 if p.val() != 0 {
2519 self.need_store = true;
2520 }
2521 }
2522 _ => {}
2523 });
2524 events.extend(self.refresh_pingreq_recv());
2525 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2526 }
2527 Err(e) => {
2528 if self.status == ConnectionStatus::Disconnected {
2529 self.status = ConnectionStatus::Connecting;
2530 let rc = match e {
2531 MqttError::ClientIdentifierNotValid => {
2532 ConnectReasonCode::ClientIdentifierNotValid
2533 }
2534 MqttError::BadUserNameOrPassword => {
2535 ConnectReasonCode::BadAuthenticationMethod
2536 }
2537 MqttError::UnsupportedProtocolVersion => {
2538 ConnectReasonCode::UnsupportedProtocolVersion
2539 }
2540 _ => ConnectReasonCode::UnspecifiedError,
2541 };
2542 let connack = v5_0::Connack::builder().reason_code(rc).build().unwrap();
2543 let connack_events = self.process_send_v5_0_connack(connack);
2544 events.extend(connack_events);
2545 } else {
2546 let disconnect = v5_0::Disconnect::builder()
2547 .reason_code(DisconnectReasonCode::ProtocolError)
2548 .build()
2549 .unwrap();
2550 let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2551 events.extend(disconnect_events);
2552 }
2553 events.push(GenericEvent::NotifyError(e));
2554 }
2555 }
2556
2557 events
2558 }
2559
2560 fn process_recv_v3_1_1_connack(
2561 &mut self,
2562 raw_packet: RawPacket,
2563 ) -> Vec<GenericEvent<PacketIdType>> {
2564 let mut events = Vec::new();
2565
2566 match v3_1_1::Connack::parse(raw_packet.data_as_slice()) {
2567 Ok((packet, _consumed)) => {
2568 if packet.return_code() == ConnectReturnCode::Accepted {
2569 self.status = ConnectionStatus::Connected;
2570 if packet.session_present() {
2571 events.extend(self.send_stored());
2572 } else {
2573 self.clear_store_related();
2574 }
2575 }
2576 events.push(GenericEvent::NotifyPacketReceived(
2577 GenericPacket::V3_1_1Connack(packet),
2578 ));
2579 }
2580 Err(e) => {
2581 events.push(GenericEvent::RequestClose);
2582 events.push(GenericEvent::NotifyError(e));
2583 }
2584 }
2585
2586 events
2587 }
2588
2589 fn process_recv_v5_0_connack(
2590 &mut self,
2591 raw_packet: RawPacket,
2592 ) -> Vec<GenericEvent<PacketIdType>> {
2593 let mut events = Vec::new();
2594
2595 match v5_0::Connack::parse(raw_packet.data_as_slice()) {
2596 Ok((packet, _consumed)) => {
2597 if packet.reason_code() == ConnectReasonCode::Success {
2598 self.status = ConnectionStatus::Connected;
2599
2600 for prop in packet.props() {
2602 match prop {
2603 Property::TopicAliasMaximum(val) => {
2604 if val.val() > 0 {
2605 self.topic_alias_send = Some(TopicAliasSend::new(val.val()));
2606 }
2607 }
2608 Property::ReceiveMaximum(val) => {
2609 assert!(val.val() != 0);
2610 self.publish_send_max = Some(val.val());
2611 }
2612 Property::MaximumPacketSize(val) => {
2613 assert!(val.val() != 0);
2614 self.maximum_packet_size_send = val.val();
2615 }
2616 Property::ServerKeepAlive(val) => {
2617 let timeout_ms = (val.val() as u64) * 1000;
2619 self.pingreq_send_interval_ms = Some(timeout_ms);
2620 }
2621 _ => {
2622 }
2624 }
2625 }
2626
2627 if packet.session_present() {
2628 events.extend(self.send_stored());
2629 } else {
2630 self.clear_store_related();
2631 }
2632 }
2633 events.push(GenericEvent::NotifyPacketReceived(
2634 GenericPacket::V5_0Connack(packet),
2635 ));
2636 }
2637 Err(e) => {
2638 if self.status == ConnectionStatus::Connected {
2639 let disconnect = v5_0::Disconnect::builder()
2640 .reason_code(e.into())
2641 .build()
2642 .unwrap();
2643 let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2644 events.extend(disconnect_events);
2645 }
2646 events.push(GenericEvent::NotifyError(e));
2647 }
2648 }
2649
2650 events
2651 }
2652
2653 fn process_recv_v3_1_1_publish(
2654 &mut self,
2655 raw_packet: RawPacket,
2656 ) -> Vec<GenericEvent<PacketIdType>> {
2657 let mut events = Vec::new();
2658
2659 let flags = raw_packet.flags();
2660 match &raw_packet.data {
2661 PacketData::Publish(arc) => {
2662 match v3_1_1::GenericPublish::parse(flags, arc.clone()) {
2663 Ok((packet, _consumed)) => {
2664 match packet.qos() {
2665 Qos::AtMostOnce => {
2666 events.extend(self.refresh_pingreq_recv());
2667 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2668 }
2669 Qos::AtLeastOnce => {
2670 let packet_id = packet.packet_id().unwrap();
2671 if self.status == ConnectionStatus::Connected
2672 && self.auto_pub_response
2673 {
2674 let puback = v3_1_1::GenericPuback::builder()
2676 .packet_id(packet_id)
2677 .build()
2678 .unwrap();
2679 events.extend(self.process_send_v3_1_1_puback(puback));
2680 }
2681 events.extend(self.refresh_pingreq_recv());
2682 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2683 }
2684 Qos::ExactlyOnce => {
2685 let packet_id = packet.packet_id().unwrap();
2686 let already_handled = !self.qos2_publish_handled.insert(packet_id);
2687
2688 if self.status == ConnectionStatus::Connected
2689 && (self.auto_pub_response || already_handled)
2690 {
2691 let pubrec = v3_1_1::GenericPubrec::builder()
2692 .packet_id(packet_id)
2693 .build()
2694 .unwrap();
2695 events.extend(self.process_send_v3_1_1_pubrec(pubrec));
2696 }
2697 events.extend(self.refresh_pingreq_recv());
2698 if !already_handled {
2699 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2700 }
2701 }
2702 }
2703 }
2704 Err(e) => {
2705 events.push(GenericEvent::RequestClose);
2706 events.push(GenericEvent::NotifyError(e));
2707 }
2708 }
2709 }
2710 PacketData::Normal(_) => {
2711 unreachable!("PUBLISH packet must use PacketData::Publish variant");
2712 }
2713 }
2714
2715 events
2716 }
2717
2718 fn process_recv_v5_0_publish(
2719 &mut self,
2720 raw_packet: RawPacket,
2721 ) -> Vec<GenericEvent<PacketIdType>> {
2722 let mut events = Vec::new();
2723
2724 let flags = raw_packet.flags();
2725 match &raw_packet.data {
2726 PacketData::Publish(arc) => {
2727 match v5_0::GenericPublish::parse(flags, arc.clone()) {
2728 Ok((packet, _consumed)) => {
2729 let mut already_handled = false;
2730 let mut puback_send = false;
2731 let mut pubrec_send = false;
2732
2733 match packet.qos() {
2734 Qos::AtLeastOnce => {
2735 let packet_id = packet.packet_id().unwrap();
2736 if let Some(max) = self.publish_recv_max {
2737 if self.publish_recv.len() >= max as usize {
2738 let disconnect = v5_0::Disconnect::builder()
2739 .reason_code(
2740 DisconnectReasonCode::ReceiveMaximumExceeded,
2741 )
2742 .build()
2743 .unwrap();
2744 events
2745 .extend(self.process_send_v5_0_disconnect(disconnect));
2746 events.push(GenericEvent::NotifyError(
2747 MqttError::ReceiveMaximumExceeded,
2748 ));
2749 return events;
2750 }
2751 }
2752 self.publish_recv.insert(packet_id);
2753 if self.auto_pub_response
2754 && self.status == ConnectionStatus::Connected
2755 {
2756 puback_send = true;
2757 }
2758 }
2759 Qos::ExactlyOnce => {
2760 let packet_id = packet.packet_id().unwrap();
2761 if let Some(max) = self.publish_recv_max {
2762 if self.publish_recv.len() >= max as usize {
2763 let disconnect = v5_0::Disconnect::builder()
2764 .reason_code(
2765 DisconnectReasonCode::ReceiveMaximumExceeded,
2766 )
2767 .build()
2768 .unwrap();
2769 events
2770 .extend(self.process_send_v5_0_disconnect(disconnect));
2771 events.push(GenericEvent::NotifyError(
2772 MqttError::ReceiveMaximumExceeded,
2773 ));
2774 return events;
2775 }
2776 }
2777 self.publish_recv.insert(packet_id);
2778
2779 if !self.qos2_publish_handled.insert(packet_id) {
2780 already_handled = true;
2781 }
2782 if self.status == ConnectionStatus::Connected
2783 && (self.auto_pub_response || already_handled)
2784 {
2785 pubrec_send = true;
2786 }
2787 }
2788 Qos::AtMostOnce => {
2789 }
2791 }
2792
2793 if packet.topic_name().is_empty() {
2795 if let Some(ta) = Self::get_topic_alias_from_props_opt(packet.props()) {
2797 if ta == 0
2798 || self.topic_alias_recv.is_none()
2799 || ta > self.topic_alias_recv.as_ref().unwrap().max()
2800 {
2801 let disconnect = v5_0::Disconnect::builder()
2802 .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2803 .build()
2804 .unwrap();
2805 events.extend(self.process_send_v5_0_disconnect(disconnect));
2806 events.push(GenericEvent::NotifyError(
2807 MqttError::TopicAliasInvalid,
2808 ));
2809 return events;
2810 }
2811
2812 if let Some(ref topic_alias_recv) = self.topic_alias_recv {
2813 if topic_alias_recv.get(ta).is_none() {
2814 let disconnect = v5_0::Disconnect::builder()
2815 .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2816 .build()
2817 .unwrap();
2818 events
2819 .extend(self.process_send_v5_0_disconnect(disconnect));
2820 events.push(GenericEvent::NotifyError(
2821 MqttError::TopicAliasInvalid,
2822 ));
2823 return events;
2824 }
2825 }
2828 } else {
2829 let disconnect = v5_0::Disconnect::builder()
2830 .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2831 .build()
2832 .unwrap();
2833 events.extend(self.process_send_v5_0_disconnect(disconnect));
2834 events
2835 .push(GenericEvent::NotifyError(MqttError::TopicAliasInvalid));
2836 return events;
2837 }
2838 } else {
2839 if let Some(ta) = Self::get_topic_alias_from_props_opt(packet.props()) {
2841 if ta == 0
2842 || self.topic_alias_recv.is_none()
2843 || ta > self.topic_alias_recv.as_ref().unwrap().max()
2844 {
2845 let disconnect = v5_0::Disconnect::builder()
2846 .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2847 .build()
2848 .unwrap();
2849 events.extend(self.process_send_v5_0_disconnect(disconnect));
2850 events.push(GenericEvent::NotifyError(
2851 MqttError::TopicAliasInvalid,
2852 ));
2853 return events;
2854 }
2855 if let Some(ref mut topic_alias_recv) = self.topic_alias_recv {
2856 topic_alias_recv.insert_or_update(packet.topic_name(), ta);
2857 }
2858 }
2859 }
2860
2861 if puback_send {
2863 let puback = v5_0::GenericPuback::builder()
2864 .packet_id(packet.packet_id().unwrap())
2865 .build()
2866 .unwrap();
2867 events.extend(self.process_send_v5_0_puback(puback));
2868 }
2869 if pubrec_send {
2870 let pubrec = v5_0::GenericPubrec::builder()
2871 .packet_id(packet.packet_id().unwrap())
2872 .build()
2873 .unwrap();
2874 events.extend(self.process_send_v5_0_pubrec(pubrec));
2875 }
2876
2877 events.extend(self.refresh_pingreq_recv());
2879
2880 if !already_handled {
2882 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2883 }
2884 }
2885 Err(e) => {
2886 if self.status == ConnectionStatus::Connected {
2887 let disconnect = v5_0::Disconnect::builder()
2888 .reason_code(e.into())
2889 .build()
2890 .unwrap();
2891 let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2892 events.extend(disconnect_events);
2893 }
2894 events.push(GenericEvent::NotifyError(e));
2895 }
2896 }
2897 }
2898 PacketData::Normal(_) => {
2899 unreachable!("PUBLISH packet must use PacketData::Publish variant");
2900 }
2901 }
2902
2903 events
2904 }
2905
2906 fn process_recv_v3_1_1_puback(
2907 &mut self,
2908 raw_packet: RawPacket,
2909 ) -> Vec<GenericEvent<PacketIdType>> {
2910 let mut events = Vec::new();
2911
2912 match v3_1_1::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2913 Ok((packet, _)) => {
2914 let packet_id = packet.packet_id();
2915 if self.pid_puback.remove(&packet_id) {
2916 self.store.erase(ResponsePacket::V3_1_1Puback, packet_id);
2917 if self.pid_man.is_used_id(packet_id) {
2918 self.pid_man.release_id(packet_id);
2919 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2920 }
2921 events.extend(self.refresh_pingreq_recv());
2922 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2923 } else {
2924 events.push(GenericEvent::RequestClose);
2925 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2926 }
2927 }
2928 Err(e) => {
2929 events.push(GenericEvent::RequestClose);
2930 events.push(GenericEvent::NotifyError(e));
2931 }
2932 }
2933
2934 events
2935 }
2936
2937 fn process_recv_v5_0_puback(
2938 &mut self,
2939 raw_packet: RawPacket,
2940 ) -> Vec<GenericEvent<PacketIdType>> {
2941 let mut events = Vec::new();
2942
2943 match v5_0::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2944 Ok((packet, _)) => {
2945 let packet_id = packet.packet_id();
2946 if self.pid_puback.remove(&packet_id) {
2947 self.store.erase(ResponsePacket::V5_0Puback, packet_id);
2948 if self.pid_man.is_used_id(packet_id) {
2949 self.pid_man.release_id(packet_id);
2950 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2951 }
2952 if self.publish_send_max.is_some() {
2953 self.publish_send_count -= 1;
2954 }
2955 events.extend(self.refresh_pingreq_recv());
2956 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2957 } else {
2958 let disconnect = v5_0::Disconnect::builder()
2959 .reason_code(DisconnectReasonCode::ProtocolError)
2960 .build()
2961 .unwrap();
2962 events.extend(self.process_send_v5_0_disconnect(disconnect));
2963 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2964 }
2965 }
2966 Err(e) => {
2967 let disconnect = v5_0::Disconnect::builder()
2968 .reason_code(DisconnectReasonCode::ProtocolError)
2969 .build()
2970 .unwrap();
2971 events.extend(self.process_send_v5_0_disconnect(disconnect));
2972 events.push(GenericEvent::NotifyError(e));
2973 }
2974 }
2975
2976 events
2977 }
2978
2979 fn process_recv_v3_1_1_pubrec(
2980 &mut self,
2981 raw_packet: RawPacket,
2982 ) -> Vec<GenericEvent<PacketIdType>> {
2983 let mut events = Vec::new();
2984
2985 match v3_1_1::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2986 Ok((packet, _)) => {
2987 let packet_id = packet.packet_id();
2988 if self.pid_pubrec.remove(&packet_id) {
2989 self.store.erase(ResponsePacket::V3_1_1Pubrec, packet_id);
2990 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
2991 let pubrel = v3_1_1::GenericPubrel::<PacketIdType>::builder()
2992 .packet_id(packet_id)
2993 .build()
2994 .unwrap();
2995 events.extend(self.process_send_v3_1_1_pubrel(pubrel));
2996 }
2997 events.extend(self.refresh_pingreq_recv());
2998 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2999 } else {
3000 events.push(GenericEvent::RequestClose);
3001 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3002 }
3003 }
3004 Err(e) => {
3005 events.push(GenericEvent::RequestClose);
3006 events.push(GenericEvent::NotifyError(e));
3007 }
3008 }
3009
3010 events
3011 }
3012
3013 fn process_recv_v5_0_pubrec(
3014 &mut self,
3015 raw_packet: RawPacket,
3016 ) -> Vec<GenericEvent<PacketIdType>> {
3017 let mut events = Vec::new();
3018
3019 match v5_0::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3020 Ok((packet, _)) => {
3021 let packet_id = packet.packet_id();
3022 if self.pid_pubrec.remove(&packet_id) {
3023 self.store.erase(ResponsePacket::V5_0Pubrec, packet_id);
3024 if let Some(reason_code) = packet.reason_code() {
3025 if reason_code != PubrecReasonCode::Success {
3026 if self.pid_man.is_used_id(packet_id) {
3027 self.pid_man.release_id(packet_id);
3028 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3029 }
3030 self.qos2_publish_processing.remove(&packet_id);
3031 if self.publish_send_max.is_some() {
3032 self.publish_send_count -= 1;
3033 }
3034 } else if self.auto_pub_response
3035 && self.status == ConnectionStatus::Connected
3036 {
3037 let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3038 .packet_id(packet_id)
3039 .build()
3040 .unwrap();
3041 events.extend(self.process_send_v5_0_pubrel(pubrel));
3042 }
3043 } else if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3044 let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3045 .packet_id(packet_id)
3046 .build()
3047 .unwrap();
3048 events.extend(self.process_send_v5_0_pubrel(pubrel));
3049 }
3050 events.extend(self.refresh_pingreq_recv());
3051 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3052 } else {
3053 let disconnect = v5_0::Disconnect::builder()
3054 .reason_code(DisconnectReasonCode::ProtocolError)
3055 .build()
3056 .unwrap();
3057 events.extend(self.process_send_v5_0_disconnect(disconnect));
3058 events.push(GenericEvent::NotifyError(MqttError::from(
3059 DisconnectReasonCode::ProtocolError,
3060 )));
3061 }
3062 }
3063 Err(e) => {
3064 let disconnect = v5_0::Disconnect::builder()
3065 .reason_code(DisconnectReasonCode::ProtocolError)
3066 .build()
3067 .unwrap();
3068 events.extend(self.process_send_v5_0_disconnect(disconnect));
3069 events.push(GenericEvent::NotifyError(e));
3070 }
3071 }
3072
3073 events
3074 }
3075
3076 fn process_recv_v3_1_1_pubrel(
3077 &mut self,
3078 raw_packet: RawPacket,
3079 ) -> Vec<GenericEvent<PacketIdType>> {
3080 let mut events = Vec::new();
3081
3082 match v3_1_1::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3083 Ok((packet, _)) => {
3084 let packet_id = packet.packet_id();
3085 self.qos2_publish_handled.remove(&packet_id);
3086 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3087 let pubcomp = v3_1_1::GenericPubcomp::<PacketIdType>::builder()
3088 .packet_id(packet_id)
3089 .build()
3090 .unwrap();
3091 events.extend(self.process_send_v3_1_1_pubcomp(pubcomp));
3092 }
3093 events.extend(self.refresh_pingreq_recv());
3094 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3095 }
3096 Err(e) => {
3097 events.push(GenericEvent::RequestClose);
3098 events.push(GenericEvent::NotifyError(e));
3099 }
3100 }
3101
3102 events
3103 }
3104
3105 fn process_recv_v5_0_pubrel(
3106 &mut self,
3107 raw_packet: RawPacket,
3108 ) -> Vec<GenericEvent<PacketIdType>> {
3109 let mut events = Vec::new();
3110
3111 match v5_0::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3112 Ok((packet, _)) => {
3113 let packet_id = packet.packet_id();
3114 self.qos2_publish_handled.remove(&packet_id);
3115 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3116 let pubcomp = v5_0::GenericPubcomp::<PacketIdType>::builder()
3117 .packet_id(packet_id)
3118 .build()
3119 .unwrap();
3120 events.extend(self.process_send_v5_0_pubcomp(pubcomp));
3121 }
3122 events.extend(self.refresh_pingreq_recv());
3123 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3124 }
3125 Err(e) => {
3126 let disconnect = v5_0::Disconnect::builder()
3127 .reason_code(DisconnectReasonCode::ProtocolError)
3128 .build()
3129 .unwrap();
3130 events.extend(self.process_send_v5_0_disconnect(disconnect));
3131 events.push(GenericEvent::NotifyError(e));
3132 }
3133 }
3134
3135 events
3136 }
3137
3138 fn process_recv_v3_1_1_pubcomp(
3139 &mut self,
3140 raw_packet: RawPacket,
3141 ) -> Vec<GenericEvent<PacketIdType>> {
3142 let mut events = Vec::new();
3143
3144 match v3_1_1::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3145 Ok((packet, _)) => {
3146 let packet_id = packet.packet_id();
3147 if self.pid_pubcomp.remove(&packet_id) {
3148 self.store.erase(ResponsePacket::V3_1_1Pubcomp, packet_id);
3149 if self.pid_man.is_used_id(packet_id) {
3150 self.pid_man.release_id(packet_id);
3151 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3152 }
3153 self.qos2_publish_processing.remove(&packet_id);
3154 events.extend(self.refresh_pingreq_recv());
3155 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3156 } else {
3157 events.push(GenericEvent::RequestClose);
3158 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3159 }
3160 }
3161 Err(e) => {
3162 events.push(GenericEvent::RequestClose);
3163 events.push(GenericEvent::NotifyError(e));
3164 }
3165 }
3166
3167 events
3168 }
3169
3170 fn process_recv_v5_0_pubcomp(
3171 &mut self,
3172 raw_packet: RawPacket,
3173 ) -> Vec<GenericEvent<PacketIdType>> {
3174 let mut events = Vec::new();
3175
3176 match v5_0::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3177 Ok((packet, _)) => {
3178 let packet_id = packet.packet_id();
3179 if self.pid_pubcomp.remove(&packet_id) {
3180 self.store.erase(ResponsePacket::V5_0Pubcomp, packet_id);
3181 if self.pid_man.is_used_id(packet_id) {
3182 self.pid_man.release_id(packet_id);
3183 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3184 }
3185 self.qos2_publish_processing.remove(&packet_id);
3186 if self.publish_send_max.is_some() {
3187 self.publish_send_count -= 1;
3188 }
3189 events.extend(self.refresh_pingreq_recv());
3190 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3191 } else {
3192 let disconnect = v5_0::Disconnect::builder()
3193 .reason_code(DisconnectReasonCode::ProtocolError)
3194 .build()
3195 .unwrap();
3196 events.extend(self.process_send_v5_0_disconnect(disconnect));
3197 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3198 }
3199 }
3200 Err(e) => {
3201 let disconnect = v5_0::Disconnect::builder()
3202 .reason_code(DisconnectReasonCode::ProtocolError)
3203 .build()
3204 .unwrap();
3205 events.extend(self.process_send_v5_0_disconnect(disconnect));
3206 events.push(GenericEvent::NotifyError(e));
3207 }
3208 }
3209
3210 events
3211 }
3212
3213 fn process_recv_v3_1_1_subscribe(
3214 &mut self,
3215 raw_packet: RawPacket,
3216 ) -> Vec<GenericEvent<PacketIdType>> {
3217 let mut events = Vec::new();
3218
3219 match v3_1_1::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3220 Ok((packet, _)) => {
3221 events.extend(self.refresh_pingreq_recv());
3222 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3223 }
3224 Err(e) => {
3225 events.push(GenericEvent::RequestClose);
3226 events.push(GenericEvent::NotifyError(e));
3227 }
3228 }
3229
3230 events
3231 }
3232
3233 fn process_recv_v5_0_subscribe(
3234 &mut self,
3235 raw_packet: RawPacket,
3236 ) -> Vec<GenericEvent<PacketIdType>> {
3237 let mut events = Vec::new();
3238
3239 match v5_0::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3240 Ok((packet, _)) => {
3241 events.extend(self.refresh_pingreq_recv());
3242 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3243 }
3244 Err(e) => {
3245 let disconnect = v5_0::Disconnect::builder()
3246 .reason_code(DisconnectReasonCode::ProtocolError)
3247 .build()
3248 .unwrap();
3249 events.extend(self.process_send_v5_0_disconnect(disconnect));
3250 events.push(GenericEvent::NotifyError(e));
3251 }
3252 }
3253
3254 events
3255 }
3256
3257 fn process_recv_v3_1_1_suback(
3258 &mut self,
3259 raw_packet: RawPacket,
3260 ) -> Vec<GenericEvent<PacketIdType>> {
3261 let mut events = Vec::new();
3262
3263 match v3_1_1::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3264 Ok((packet, _)) => {
3265 let packet_id = packet.packet_id();
3266 if self.pid_suback.remove(&packet_id) {
3267 if self.pid_man.is_used_id(packet_id) {
3268 self.pid_man.release_id(packet_id);
3269 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3270 }
3271 events.extend(self.refresh_pingreq_recv());
3272 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3273 } else {
3274 events.push(GenericEvent::RequestClose);
3275 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3276 }
3277 }
3278 Err(e) => {
3279 events.push(GenericEvent::RequestClose);
3280 events.push(GenericEvent::NotifyError(e));
3281 }
3282 }
3283
3284 events
3285 }
3286
3287 fn process_recv_v5_0_suback(
3288 &mut self,
3289 raw_packet: RawPacket,
3290 ) -> Vec<GenericEvent<PacketIdType>> {
3291 let mut events = Vec::new();
3292
3293 match v5_0::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3294 Ok((packet, _)) => {
3295 let packet_id = packet.packet_id();
3296 if self.pid_suback.remove(&packet_id) {
3297 if self.pid_man.is_used_id(packet_id) {
3298 self.pid_man.release_id(packet_id);
3299 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3300 }
3301 events.extend(self.refresh_pingreq_recv());
3302 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3303 } else {
3304 let disconnect = v5_0::Disconnect::builder()
3305 .reason_code(DisconnectReasonCode::ProtocolError)
3306 .build()
3307 .unwrap();
3308 events.extend(self.process_send_v5_0_disconnect(disconnect));
3309 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3310 }
3311 }
3312 Err(e) => {
3313 let disconnect = v5_0::Disconnect::builder()
3314 .reason_code(DisconnectReasonCode::ProtocolError)
3315 .build()
3316 .unwrap();
3317 events.extend(self.process_send_v5_0_disconnect(disconnect));
3318 events.push(GenericEvent::NotifyError(e));
3319 }
3320 }
3321
3322 events
3323 }
3324
3325 fn process_recv_v3_1_1_unsubscribe(
3326 &mut self,
3327 raw_packet: RawPacket,
3328 ) -> Vec<GenericEvent<PacketIdType>> {
3329 let mut events = Vec::new();
3330
3331 match v3_1_1::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3332 Ok((packet, _)) => {
3333 events.extend(self.refresh_pingreq_recv());
3334 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3335 }
3336 Err(e) => {
3337 events.push(GenericEvent::RequestClose);
3338 events.push(GenericEvent::NotifyError(e));
3339 }
3340 }
3341
3342 events
3343 }
3344
3345 fn process_recv_v5_0_unsubscribe(
3346 &mut self,
3347 raw_packet: RawPacket,
3348 ) -> Vec<GenericEvent<PacketIdType>> {
3349 let mut events = Vec::new();
3350
3351 match v5_0::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3352 Ok((packet, _)) => {
3353 events.extend(self.refresh_pingreq_recv());
3354 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3355 }
3356 Err(e) => {
3357 let disconnect = v5_0::Disconnect::builder()
3358 .reason_code(DisconnectReasonCode::ProtocolError)
3359 .build()
3360 .unwrap();
3361 events.extend(self.process_send_v5_0_disconnect(disconnect));
3362 events.push(GenericEvent::NotifyError(e));
3363 }
3364 }
3365
3366 events
3367 }
3368
3369 fn process_recv_v3_1_1_unsuback(
3370 &mut self,
3371 raw_packet: RawPacket,
3372 ) -> Vec<GenericEvent<PacketIdType>> {
3373 let mut events = Vec::new();
3374
3375 match v3_1_1::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3376 Ok((packet, _)) => {
3377 let packet_id = packet.packet_id();
3378 if self.pid_unsuback.remove(&packet_id) {
3379 if self.pid_man.is_used_id(packet_id) {
3380 self.pid_man.release_id(packet_id);
3381 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3382 }
3383 events.extend(self.refresh_pingreq_recv());
3384 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3385 } else {
3386 events.push(GenericEvent::RequestClose);
3387 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3388 }
3389 }
3390 Err(e) => {
3391 events.push(GenericEvent::RequestClose);
3392 events.push(GenericEvent::NotifyError(e));
3393 }
3394 }
3395
3396 events
3397 }
3398
3399 fn process_recv_v5_0_unsuback(
3400 &mut self,
3401 raw_packet: RawPacket,
3402 ) -> Vec<GenericEvent<PacketIdType>> {
3403 let mut events = Vec::new();
3404
3405 match v5_0::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3406 Ok((packet, _)) => {
3407 let packet_id = packet.packet_id();
3408 if self.pid_unsuback.remove(&packet_id) {
3409 if self.pid_man.is_used_id(packet_id) {
3410 self.pid_man.release_id(packet_id);
3411 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3412 }
3413 events.extend(self.refresh_pingreq_recv());
3414 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3415 } else {
3416 let disconnect = v5_0::Disconnect::builder()
3417 .reason_code(DisconnectReasonCode::ProtocolError)
3418 .build()
3419 .unwrap();
3420 events.extend(self.process_send_v5_0_disconnect(disconnect));
3421 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3422 }
3423 }
3424 Err(e) => {
3425 let disconnect = v5_0::Disconnect::builder()
3426 .reason_code(DisconnectReasonCode::ProtocolError)
3427 .build()
3428 .unwrap();
3429 events.extend(self.process_send_v5_0_disconnect(disconnect));
3430 events.push(GenericEvent::NotifyError(e));
3431 }
3432 }
3433
3434 events
3435 }
3436
3437 fn process_recv_v3_1_1_pingreq(
3438 &mut self,
3439 raw_packet: RawPacket,
3440 ) -> Vec<GenericEvent<PacketIdType>> {
3441 let mut events = Vec::new();
3442
3443 match v3_1_1::Pingreq::parse(raw_packet.data_as_slice()) {
3444 Ok((packet, _)) => {
3445 if (Role::IS_SERVER || Role::IS_ANY)
3446 && !self.is_client
3447 && self.auto_ping_response
3448 && self.status == ConnectionStatus::Connected
3449 {
3450 let pingresp = v3_1_1::Pingresp::new();
3451 events.extend(self.process_send_v3_1_1_pingresp(pingresp));
3452 }
3453 events.extend(self.refresh_pingreq_recv());
3454 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3455 }
3456 Err(e) => {
3457 events.push(GenericEvent::RequestClose);
3458 events.push(GenericEvent::NotifyError(e));
3459 }
3460 }
3461
3462 events
3463 }
3464
3465 fn process_recv_v5_0_pingreq(
3466 &mut self,
3467 raw_packet: RawPacket,
3468 ) -> Vec<GenericEvent<PacketIdType>> {
3469 let mut events = Vec::new();
3470
3471 match v5_0::Pingreq::parse(raw_packet.data_as_slice()) {
3472 Ok((packet, _)) => {
3473 if (Role::IS_SERVER || Role::IS_ANY)
3474 && !self.is_client
3475 && self.auto_ping_response
3476 && self.status == ConnectionStatus::Connected
3477 {
3478 let pingresp = v5_0::Pingresp::new();
3479 events.extend(self.process_send_v5_0_pingresp(pingresp));
3480 }
3481 events.extend(self.refresh_pingreq_recv());
3482 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3483 }
3484 Err(e) => {
3485 let disconnect = v5_0::Disconnect::builder()
3486 .reason_code(DisconnectReasonCode::ProtocolError)
3487 .build()
3488 .unwrap();
3489 events.extend(self.process_send_v5_0_disconnect(disconnect));
3490 events.push(GenericEvent::NotifyError(e));
3491 }
3492 }
3493
3494 events
3495 }
3496
3497 fn process_recv_v3_1_1_pingresp(
3498 &mut self,
3499 raw_packet: RawPacket,
3500 ) -> Vec<GenericEvent<PacketIdType>> {
3501 let mut events = Vec::new();
3502
3503 match v3_1_1::Pingresp::parse(raw_packet.data_as_slice()) {
3504 Ok((packet, _)) => {
3505 self.pingresp_recv_set = false;
3506 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3507 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3508 }
3509 Err(e) => {
3510 events.push(GenericEvent::RequestClose);
3511 events.push(GenericEvent::NotifyError(e));
3512 }
3513 }
3514
3515 events
3516 }
3517
3518 fn process_recv_v5_0_pingresp(
3519 &mut self,
3520 raw_packet: RawPacket,
3521 ) -> Vec<GenericEvent<PacketIdType>> {
3522 let mut events = Vec::new();
3523
3524 match v5_0::Pingresp::parse(raw_packet.data_as_slice()) {
3525 Ok((packet, _)) => {
3526 self.pingresp_recv_set = false;
3527 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3528 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3529 }
3530 Err(e) => {
3531 let disconnect = v5_0::Disconnect::builder()
3532 .reason_code(DisconnectReasonCode::ProtocolError)
3533 .build()
3534 .unwrap();
3535 events.extend(self.process_send_v5_0_disconnect(disconnect));
3536 events.push(GenericEvent::NotifyError(e));
3537 }
3538 }
3539
3540 events
3541 }
3542
3543 fn process_recv_v3_1_1_disconnect(
3544 &mut self,
3545 raw_packet: RawPacket,
3546 ) -> Vec<GenericEvent<PacketIdType>> {
3547 let mut events = Vec::new();
3548
3549 match v3_1_1::Disconnect::parse(raw_packet.data_as_slice()) {
3550 Ok((packet, _)) => {
3551 self.cancel_timers(&mut events);
3552 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3553 }
3554 Err(e) => {
3555 events.push(GenericEvent::RequestClose);
3556 events.push(GenericEvent::NotifyError(e));
3557 }
3558 }
3559
3560 events
3561 }
3562
3563 fn process_recv_v5_0_disconnect(
3564 &mut self,
3565 raw_packet: RawPacket,
3566 ) -> Vec<GenericEvent<PacketIdType>> {
3567 let mut events = Vec::new();
3568
3569 match v5_0::Disconnect::parse(raw_packet.data_as_slice()) {
3570 Ok((packet, _)) => {
3571 self.cancel_timers(&mut events);
3572 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3573 }
3574 Err(e) => {
3575 let disconnect = v5_0::Disconnect::builder()
3576 .reason_code(DisconnectReasonCode::ProtocolError)
3577 .build()
3578 .unwrap();
3579 events.extend(self.process_send_v5_0_disconnect(disconnect));
3580 events.push(GenericEvent::NotifyError(e));
3581 }
3582 }
3583
3584 events
3585 }
3586
3587 fn process_recv_v5_0_auth(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
3588 let mut events = Vec::new();
3589
3590 match v5_0::Auth::parse(raw_packet.data_as_slice()) {
3591 Ok((packet, _)) => {
3592 events.extend(self.refresh_pingreq_recv());
3593 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3594 }
3595 Err(e) => {
3596 let disconnect = v5_0::Disconnect::builder()
3597 .reason_code(DisconnectReasonCode::ProtocolError)
3598 .build()
3599 .unwrap();
3600 events.extend(self.process_send_v5_0_disconnect(disconnect));
3601 events.push(GenericEvent::NotifyError(e));
3602 }
3603 }
3604
3605 events
3606 }
3607
3608 fn get_topic_alias_from_props_opt(props: &Option<Vec<Property>>) -> Option<u16> {
3609 if let Some(props) = props {
3610 Self::get_topic_alias_from_props(props.as_slice())
3611 } else {
3612 None
3613 }
3614 }
3615
3616 fn refresh_pingreq_recv(&mut self) -> Vec<GenericEvent<PacketIdType>> {
3617 let mut events = Vec::new();
3618 if let Some(timeout_ms) = self.pingreq_recv_timeout_ms {
3619 if self.status == ConnectionStatus::Connecting
3620 || self.status == ConnectionStatus::Connected
3621 {
3622 self.pingreq_recv_set = true;
3623 events.push(GenericEvent::RequestTimerReset {
3624 kind: TimerKind::PingreqRecv,
3625 duration_ms: timeout_ms,
3626 });
3627 } else {
3628 self.pingreq_recv_set = false;
3629 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3630 }
3631 }
3632
3633 events
3634 }
3635
3636 fn cancel_timers(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
3638 if self.pingreq_send_set {
3639 self.pingreq_send_set = false;
3640 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
3641 }
3642 if self.pingreq_recv_set {
3643 self.pingreq_recv_set = false;
3644 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3645 }
3646 if self.pingresp_recv_set {
3647 self.pingresp_recv_set = false;
3648 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3649 }
3650 }
3651
3652 fn get_topic_alias_from_props(props: &[Property]) -> Option<u16> {
3654 for prop in props {
3655 if let Property::TopicAlias(ta) = prop {
3656 return Some(ta.val());
3657 }
3658 }
3659 None
3660 }
3661
3662 #[allow(dead_code)]
3663 fn is_packet_id_used(&self, packet_id: PacketIdType) -> bool {
3664 self.pid_man.is_used_id(packet_id)
3665 }
3666}
3667
3668pub trait RecvBehavior<Role, PacketIdType>
3671where
3672 PacketIdType: IsPacketId,
3673{
3674 fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>>;
3675}
3676
3677impl<PacketIdType> RecvBehavior<role::Client, PacketIdType>
3679 for GenericConnection<role::Client, PacketIdType>
3680where
3681 PacketIdType: IsPacketId,
3682{
3683 fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3684 self.recv(data)
3685 }
3686}
3687
3688impl<PacketIdType> RecvBehavior<role::Server, PacketIdType>
3689 for GenericConnection<role::Server, PacketIdType>
3690where
3691 PacketIdType: IsPacketId,
3692{
3693 fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3694 self.recv(data)
3695 }
3696}
3697
3698impl<PacketIdType> RecvBehavior<role::Any, PacketIdType>
3699 for GenericConnection<role::Any, PacketIdType>
3700where
3701 PacketIdType: IsPacketId,
3702{
3703 fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3704 self.recv(data)
3705 }
3706}
3707
3708#[cfg(test)]
3711mod tests {
3712 use super::*;
3713 use crate::mqtt::connection::version::Version;
3714 use crate::mqtt::packet::TopicAliasSend;
3715 use crate::mqtt::role;
3716
3717 #[test]
3718 fn test_initialize_client_mode() {
3719 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3720
3721 connection.initialize(true);
3723
3724 assert!(connection.is_client);
3726 assert_eq!(connection.publish_send_count, 0);
3727 assert!(connection.publish_send_max.is_none());
3728 assert!(connection.publish_recv_max.is_none());
3729 assert!(!connection.need_store);
3730 }
3731
3732 #[test]
3733 fn test_initialize_server_mode() {
3734 let mut connection = GenericConnection::<role::Server, u32>::new(Version::V3_1_1);
3735
3736 connection.initialize(false);
3738
3739 assert!(!connection.is_client);
3741 assert_eq!(connection.publish_send_count, 0);
3742 assert!(connection.publish_send_max.is_none());
3743 assert!(connection.publish_recv_max.is_none());
3744 assert!(!connection.need_store);
3745 }
3746
3747 #[test]
3748 fn test_validate_topic_alias_no_topic_alias_send() {
3749 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3750
3751 let result = connection.validate_topic_alias(Some(1));
3753 assert!(result.is_none());
3754 }
3755
3756 #[test]
3757 fn test_validate_topic_alias_none_input() {
3758 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3759
3760 let result = connection.validate_topic_alias(None);
3762 assert!(result.is_none());
3763 }
3764
3765 #[test]
3766 fn test_validate_topic_alias_range_no_topic_alias_send() {
3767 let connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3768
3769 let result = connection.validate_topic_alias_range(1);
3771 assert!(!result);
3772 }
3773
3774 #[test]
3775 fn test_validate_topic_alias_range_zero() {
3776 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3777
3778 let topic_alias_send = TopicAliasSend::new(10);
3780 connection.topic_alias_send = Some(topic_alias_send);
3781
3782 let result = connection.validate_topic_alias_range(0);
3784 assert!(!result);
3785 }
3786
3787 #[test]
3788 fn test_validate_topic_alias_range_over_max() {
3789 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3790
3791 let topic_alias_send = TopicAliasSend::new(5);
3793 connection.topic_alias_send = Some(topic_alias_send);
3794
3795 let result = connection.validate_topic_alias_range(6);
3797 assert!(!result);
3798 }
3799
3800 #[test]
3801 fn test_validate_topic_alias_range_valid() {
3802 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3803
3804 let topic_alias_send = TopicAliasSend::new(10);
3806 connection.topic_alias_send = Some(topic_alias_send);
3807
3808 assert!(connection.validate_topic_alias_range(1));
3810 assert!(connection.validate_topic_alias_range(5));
3811 assert!(connection.validate_topic_alias_range(10));
3812 }
3813
3814 #[test]
3815 fn test_validate_topic_alias_with_registered_alias() {
3816 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3817
3818 let mut topic_alias_send = TopicAliasSend::new(10);
3820 topic_alias_send.insert_or_update("test/topic", 5);
3821 connection.topic_alias_send = Some(topic_alias_send);
3822
3823 let result = connection.validate_topic_alias(Some(5));
3825 assert_eq!(result, Some("test/topic".to_string()));
3826 }
3827
3828 #[test]
3829 fn test_validate_topic_alias_unregistered_alias() {
3830 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3831
3832 let topic_alias_send = TopicAliasSend::new(10);
3834 connection.topic_alias_send = Some(topic_alias_send);
3835
3836 let result = connection.validate_topic_alias(Some(5));
3838 assert!(result.is_none());
3839 }
3840
3841 #[test]
3842 fn test_validate_maximum_packet_size_within_limit() {
3843 let connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3844
3845 let result = connection.validate_maximum_packet_size_send(1000);
3847 assert!(result);
3848 }
3849
3850 #[test]
3851 fn test_validate_maximum_packet_size_at_limit() {
3852 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3853
3854 connection.maximum_packet_size_send = 1000;
3856
3857 let result = connection.validate_maximum_packet_size_send(1000);
3859 assert!(result);
3860 }
3861
3862 #[test]
3863 fn test_validate_maximum_packet_size_over_limit() {
3864 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3865
3866 connection.maximum_packet_size_send = 1000;
3868
3869 let result = connection.validate_maximum_packet_size_send(1001);
3871 assert!(!result);
3872 }
3873
3874 #[test]
3875 fn test_validate_maximum_packet_size_zero_limit() {
3876 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3877
3878 connection.maximum_packet_size_send = 0;
3880
3881 let result = connection.validate_maximum_packet_size_send(1);
3883 assert!(!result);
3884
3885 let result = connection.validate_maximum_packet_size_send(0);
3887 assert!(result);
3888 }
3889
3890 #[test]
3891 fn test_initialize_clears_state() {
3892 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3893
3894 connection.publish_send_count = 5;
3896 connection.need_store = true;
3897 connection.pid_suback.insert(123);
3898 connection.pid_unsuback.insert(456);
3899
3900 connection.initialize(true);
3902
3903 assert_eq!(connection.publish_send_count, 0);
3905 assert!(!connection.need_store);
3906 assert!(connection.pid_suback.is_empty());
3907 assert!(connection.pid_unsuback.is_empty());
3908 assert!(connection.is_client);
3909 }
3910
3911 #[test]
3912 fn test_remaining_length_to_total_size() {
3913 assert_eq!(remaining_length_to_total_size(0), 2); assert_eq!(remaining_length_to_total_size(127), 129); assert_eq!(remaining_length_to_total_size(128), 131); assert_eq!(remaining_length_to_total_size(16383), 16386); assert_eq!(remaining_length_to_total_size(16384), 16388); assert_eq!(remaining_length_to_total_size(2097151), 2097155); assert_eq!(remaining_length_to_total_size(2097152), 2097157); assert_eq!(remaining_length_to_total_size(268435455), 268435460); }
3929}