1use alloc::{
23 string::{String, ToString},
24 vec::Vec,
25};
26use core::marker::PhantomData;
27
28use crate::mqtt::common::tracing::{error, info, trace, warn};
29use crate::mqtt::common::Cursor;
30use crate::mqtt::common::HashSet;
31use crate::mqtt::connection::event::{GenericEvent, TimerKind};
32use crate::mqtt::connection::GenericStore;
33
34use serde::Serialize;
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
37enum ConnectionStatus {
38 #[serde(rename = "disconnected")]
39 Disconnected,
40 #[serde(rename = "connecting")]
41 Connecting,
42 #[serde(rename = "connected")]
43 Connected,
44}
45use crate::mqtt::connection::packet_builder::{
46 PacketBuildResult, PacketBuilder, PacketData, RawPacket,
47};
48use crate::mqtt::connection::packet_id_manager::PacketIdManager;
49use crate::mqtt::connection::role;
50use crate::mqtt::connection::role::RoleType;
51use crate::mqtt::connection::sendable::Sendable;
52use crate::mqtt::connection::version::*;
53use crate::mqtt::packet::v3_1_1;
54use crate::mqtt::packet::v5_0;
55use crate::mqtt::packet::GenericPacket;
56use crate::mqtt::packet::GenericStorePacket;
57use crate::mqtt::packet::IsPacketId;
58use crate::mqtt::packet::Qos;
59use crate::mqtt::packet::ResponsePacket;
60use crate::mqtt::packet::{Property, TopicAliasRecv, TopicAliasSend};
61use crate::mqtt::prelude::GenericPacketTrait;
62use crate::mqtt::result_code::{
63 ConnectReasonCode, ConnectReturnCode, DisconnectReasonCode, MqttError, PubrecReasonCode,
64};
65
66const MQTT_PACKET_SIZE_NO_LIMIT: u32 = 1 + 4 + 128 * 128 * 128 * 128;
69
70fn remaining_length_to_total_size(remaining_length: u32) -> u32 {
77 let remaining_length_bytes = if remaining_length < 128 {
78 1
79 } else if remaining_length < 16384 {
80 2
81 } else if remaining_length < 2097152 {
82 3
83 } else {
84 4
85 };
86
87 1 + remaining_length_bytes + remaining_length
88}
89
90pub type Event = GenericEvent<u16>;
95
96pub struct GenericConnection<Role, PacketIdType>
139where
140 Role: RoleType,
141 PacketIdType: IsPacketId,
142{
143 _marker: PhantomData<Role>,
144
145 protocol_version: Version,
146
147 pid_man: PacketIdManager<PacketIdType>,
148 pid_suback: HashSet<PacketIdType>,
149 pid_unsuback: HashSet<PacketIdType>,
150 pid_puback: HashSet<PacketIdType>,
151 pid_pubrec: HashSet<PacketIdType>,
152 pid_pubcomp: HashSet<PacketIdType>,
153
154 need_store: bool,
155 store: GenericStore<PacketIdType>,
157
158 offline_publish: bool,
159 auto_pub_response: bool,
160 auto_ping_response: bool,
161
162 auto_map_topic_alias_send: bool,
164 auto_replace_topic_alias_send: bool,
166 topic_alias_recv: Option<TopicAliasRecv>,
168 topic_alias_send: Option<TopicAliasSend>,
170
171 publish_send_max: Option<u16>,
172 publish_recv_max: Option<u16>,
174 publish_send_count: u16,
177
178 publish_recv: HashSet<PacketIdType>,
180
181 maximum_packet_size_send: u32,
183 maximum_packet_size_recv: u32,
185
186 status: ConnectionStatus,
188
189 pingreq_user_send_interval_ms: Option<u64>,
191 pingreq_keep_alive_ms: u64,
192 pingreq_server_keep_alive_ms: Option<u64>,
193
194 pingreq_recv_timeout_ms: u64,
196 pingresp_recv_timeout_ms: u64,
198
199 qos2_publish_handled: HashSet<PacketIdType>,
201 qos2_publish_processing: HashSet<PacketIdType>,
203
204 pingreq_send_set: bool,
206 pingreq_recv_set: bool,
207 pingresp_recv_set: bool,
208
209 packet_builder: PacketBuilder,
210 is_client: bool,
212}
213
214pub type Connection<Role> = GenericConnection<Role, u16>;
227
228impl<Role, PacketIdType> GenericConnection<Role, PacketIdType>
229where
230 Role: RoleType,
231 PacketIdType: IsPacketId,
232{
233 pub fn new(version: Version) -> Self {
256 Self {
257 _marker: PhantomData,
258 protocol_version: version,
259 pid_man: PacketIdManager::new(),
260 pid_suback: HashSet::default(),
261 pid_unsuback: HashSet::default(),
262 pid_puback: HashSet::default(),
263 pid_pubrec: HashSet::default(),
264 pid_pubcomp: HashSet::default(),
265 need_store: false,
266 store: GenericStore::new(),
267 offline_publish: false,
268 auto_pub_response: false,
269 auto_ping_response: false,
270 auto_map_topic_alias_send: false,
271 auto_replace_topic_alias_send: false,
272 topic_alias_recv: None,
273 topic_alias_send: None,
274 publish_send_max: None,
275 publish_recv_max: None,
276 publish_send_count: 0,
277 publish_recv: HashSet::default(),
278 maximum_packet_size_send: MQTT_PACKET_SIZE_NO_LIMIT,
279 maximum_packet_size_recv: MQTT_PACKET_SIZE_NO_LIMIT,
280 status: ConnectionStatus::Disconnected,
281 pingreq_user_send_interval_ms: None,
282 pingreq_keep_alive_ms: 0,
283 pingreq_server_keep_alive_ms: None,
284 pingreq_recv_timeout_ms: 0,
285 pingresp_recv_timeout_ms: 0,
286 qos2_publish_handled: HashSet::default(),
287 qos2_publish_processing: HashSet::default(),
288 pingreq_send_set: false,
289 pingreq_recv_set: false,
290 pingresp_recv_set: false,
291 packet_builder: PacketBuilder::new(),
292 is_client: false,
293 }
294 }
295
296 pub fn checked_send<T>(&mut self, packet: T) -> Vec<GenericEvent<PacketIdType>>
337 where
338 T: Sendable<Role, PacketIdType>,
339 {
340 packet.dispatch_send(self)
342 }
343
344 pub fn send(&mut self, packet: GenericPacket<PacketIdType>) -> Vec<GenericEvent<PacketIdType>> {
389 use core::any::TypeId;
390
391 let role_id = TypeId::of::<Role>();
392 let client_id = TypeId::of::<role::Client>();
393 let server_id = TypeId::of::<role::Server>();
394 let any_id = TypeId::of::<role::Any>();
395
396 let packet_version = packet.protocol_version();
398
399 if self.protocol_version != packet_version {
401 return vec![GenericEvent::NotifyError(MqttError::VersionMismatch)];
402 }
403
404 match packet {
405 GenericPacket::V3_1_1Connect(p) => {
407 if role_id == client_id || role_id == any_id {
408 self.process_send_v3_1_1_connect(p)
409 } else {
410 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
411 }
412 }
413 GenericPacket::V5_0Connect(p) => {
414 if role_id == client_id || role_id == any_id {
415 self.process_send_v5_0_connect(p)
416 } else {
417 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
418 }
419 }
420 GenericPacket::V3_1_1Connack(p) => {
422 if role_id == server_id || role_id == any_id {
423 self.process_send_v3_1_1_connack(p)
424 } else {
425 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
426 }
427 }
428 GenericPacket::V5_0Connack(p) => {
429 if role_id == server_id || role_id == any_id {
430 self.process_send_v5_0_connack(p)
431 } else {
432 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
433 }
434 }
435 GenericPacket::V3_1_1Publish(p) => self.process_send_v3_1_1_publish(p),
437 GenericPacket::V5_0Publish(p) => self.process_send_v5_0_publish(p),
438 GenericPacket::V3_1_1Puback(p) => self.process_send_v3_1_1_puback(p),
440 GenericPacket::V5_0Puback(p) => self.process_send_v5_0_puback(p),
441 GenericPacket::V3_1_1Pubrec(p) => self.process_send_v3_1_1_pubrec(p),
442 GenericPacket::V5_0Pubrec(p) => self.process_send_v5_0_pubrec(p),
443 GenericPacket::V3_1_1Pubrel(p) => self.process_send_v3_1_1_pubrel(p),
444 GenericPacket::V5_0Pubrel(p) => self.process_send_v5_0_pubrel(p),
445 GenericPacket::V3_1_1Pubcomp(p) => self.process_send_v3_1_1_pubcomp(p),
446 GenericPacket::V5_0Pubcomp(p) => self.process_send_v5_0_pubcomp(p),
447 GenericPacket::V3_1_1Subscribe(p) => {
449 if role_id == client_id || role_id == any_id {
450 self.process_send_v3_1_1_subscribe(p)
451 } else {
452 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
453 }
454 }
455 GenericPacket::V5_0Subscribe(p) => {
456 if role_id == client_id || role_id == any_id {
457 self.process_send_v5_0_subscribe(p)
458 } else {
459 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
460 }
461 }
462 GenericPacket::V3_1_1Unsubscribe(p) => {
463 if role_id == client_id || role_id == any_id {
464 self.process_send_v3_1_1_unsubscribe(p)
465 } else {
466 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
467 }
468 }
469 GenericPacket::V5_0Unsubscribe(p) => {
470 if role_id == client_id || role_id == any_id {
471 self.process_send_v5_0_unsubscribe(p)
472 } else {
473 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
474 }
475 }
476 GenericPacket::V3_1_1Suback(p) => {
478 if role_id == server_id || role_id == any_id {
479 self.process_send_v3_1_1_suback(p)
480 } else {
481 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
482 }
483 }
484 GenericPacket::V5_0Suback(p) => {
485 if role_id == server_id || role_id == any_id {
486 self.process_send_v5_0_suback(p)
487 } else {
488 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
489 }
490 }
491 GenericPacket::V3_1_1Unsuback(p) => {
492 if role_id == server_id || role_id == any_id {
493 self.process_send_v3_1_1_unsuback(p)
494 } else {
495 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
496 }
497 }
498 GenericPacket::V5_0Unsuback(p) => {
499 if role_id == server_id || role_id == any_id {
500 self.process_send_v5_0_unsuback(p)
501 } else {
502 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
503 }
504 }
505 GenericPacket::V3_1_1Pingreq(p) => {
507 if role_id == client_id || role_id == any_id {
508 self.process_send_v3_1_1_pingreq(p)
509 } else {
510 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
511 }
512 }
513 GenericPacket::V5_0Pingreq(p) => {
514 if role_id == client_id || role_id == any_id {
515 self.process_send_v5_0_pingreq(p)
516 } else {
517 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
518 }
519 }
520 GenericPacket::V3_1_1Pingresp(p) => {
522 if role_id == server_id || role_id == any_id {
523 self.process_send_v3_1_1_pingresp(p)
524 } else {
525 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
526 }
527 }
528 GenericPacket::V5_0Pingresp(p) => {
529 if role_id == server_id || role_id == any_id {
530 self.process_send_v5_0_pingresp(p)
531 } else {
532 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
533 }
534 }
535 GenericPacket::V3_1_1Disconnect(p) => {
537 if role_id == client_id || role_id == any_id {
538 self.process_send_v3_1_1_disconnect(p)
539 } else {
540 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
541 }
542 }
543 GenericPacket::V5_0Disconnect(p) => self.process_send_v5_0_disconnect(p),
545 GenericPacket::V5_0Auth(p) => self.process_send_v5_0_auth(p),
547 }
548 }
549
550 pub fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
599 let mut events = Vec::new();
600
601 match self.packet_builder.feed(data) {
602 PacketBuildResult::Complete(raw_packet) => {
603 events.extend(self.process_recv_packet(raw_packet));
604 }
605 PacketBuildResult::Incomplete => {}
606 PacketBuildResult::Error(e) => {
607 self.cancel_timers(&mut events);
608 events.push(GenericEvent::RequestClose);
609 events.push(GenericEvent::NotifyError(e));
610 }
611 }
612
613 events
614 }
615
616 pub fn notify_timer_fired(&mut self, kind: TimerKind) -> Vec<GenericEvent<PacketIdType>> {
633 let mut events = Vec::new();
634
635 match kind {
636 TimerKind::PingreqSend => {
637 self.pingreq_send_set = false;
639
640 if self.status == ConnectionStatus::Connected {
642 match self.protocol_version {
643 Version::V3_1_1 => {
644 if let Ok(pingreq) = v3_1_1::Pingreq::builder().build() {
645 events.extend(self.process_send_v3_1_1_pingreq(pingreq));
646 }
647 }
648 Version::V5_0 => {
649 if let Ok(pingreq) = v5_0::Pingreq::builder().build() {
650 events.extend(self.process_send_v5_0_pingreq(pingreq));
651 }
652 }
653 Version::Undetermined => {
654 unreachable!("Protocol version should be set before sending PINGREQ");
655 }
656 }
657 }
658 }
659 TimerKind::PingreqRecv => {
660 self.pingreq_recv_set = false;
662
663 match self.protocol_version {
664 Version::V3_1_1 => {
665 events.push(GenericEvent::RequestClose);
667 }
668 Version::V5_0 => {
669 if self.status == ConnectionStatus::Connected {
671 if let Ok(disconnect) = v5_0::Disconnect::builder()
672 .reason_code(DisconnectReasonCode::KeepAliveTimeout)
673 .build()
674 {
675 events.extend(self.process_send_v5_0_disconnect(disconnect));
676 }
677 }
678 }
679 Version::Undetermined => {
680 unreachable!("Protocol version should be set before receiving PINGREQ");
681 }
682 }
683 }
684 TimerKind::PingrespRecv => {
685 self.pingresp_recv_set = false;
687
688 match self.protocol_version {
689 Version::V3_1_1 => {
690 events.push(GenericEvent::RequestClose);
692 }
693 Version::V5_0 => {
694 if self.status == ConnectionStatus::Connected {
696 if let Ok(disconnect) = v5_0::Disconnect::builder()
697 .reason_code(DisconnectReasonCode::KeepAliveTimeout)
698 .build()
699 {
700 events.extend(self.process_send_v5_0_disconnect(disconnect));
701 }
702 }
703 }
704 Version::Undetermined => {
705 unreachable!("Protocol version should be set before receiving PINGRESP");
706 }
707 }
708 }
709 }
710
711 events
712 }
713
714 pub fn notify_closed(&mut self) -> Vec<GenericEvent<PacketIdType>> {
727 let mut events = Vec::new();
728
729 self.maximum_packet_size_send = MQTT_PACKET_SIZE_NO_LIMIT;
731 self.maximum_packet_size_recv = MQTT_PACKET_SIZE_NO_LIMIT;
732
733 self.status = ConnectionStatus::Disconnected;
735
736 self.topic_alias_send = None;
738 self.topic_alias_recv = None;
739
740 for packet_id in self.pid_suback.drain() {
742 if self.pid_man.is_used_id(packet_id) {
743 self.pid_man.release_id(packet_id);
744 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
745 }
746 }
747
748 for packet_id in self.pid_unsuback.drain() {
750 if self.pid_man.is_used_id(packet_id) {
751 self.pid_man.release_id(packet_id);
752 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
753 }
754 }
755
756 if !self.need_store {
758 self.qos2_publish_processing.clear();
759 self.qos2_publish_handled.clear();
760
761 for packet_id in self.pid_puback.drain() {
763 if self.pid_man.is_used_id(packet_id) {
764 self.pid_man.release_id(packet_id);
765 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
766 }
767 }
768
769 for packet_id in self.pid_pubrec.drain() {
771 if self.pid_man.is_used_id(packet_id) {
772 self.pid_man.release_id(packet_id);
773 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
774 }
775 }
776
777 for packet_id in self.pid_pubcomp.drain() {
779 if self.pid_man.is_used_id(packet_id) {
780 self.pid_man.release_id(packet_id);
781 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
782 }
783 }
784 }
785
786 self.cancel_timers(&mut events);
788
789 events
790 }
791
792 pub fn set_pingreq_send_interval(
814 &mut self,
815 duration_ms: Option<u64>,
816 ) -> Vec<GenericEvent<PacketIdType>> {
817 let mut events = Vec::new();
818 self.pingreq_user_send_interval_ms = duration_ms;
819 if let Some(ms) = duration_ms {
820 if ms == 0 {
821 if self.pingreq_send_set {
822 self.pingreq_send_set = false;
823 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
824 }
825 } else if self.status == ConnectionStatus::Connected {
826 self.pingreq_send_set = true;
827 events.push(GenericEvent::RequestTimerReset {
828 kind: TimerKind::PingreqSend,
829 duration_ms: ms,
830 });
831 }
832 }
833 events
834 }
835
836 pub fn get_receive_maximum_vacancy_for_send(&self) -> Option<u16> {
845 self.publish_send_max
847 .map(|max| max.saturating_sub(self.publish_send_count))
848 }
849
850 pub fn set_offline_publish(&mut self, enable: bool) {
859 self.offline_publish = enable;
860 if self.offline_publish {
861 self.need_store = true;
862 }
863 }
864
865 pub fn set_auto_pub_response(&mut self, enable: bool) {
874 self.auto_pub_response = enable;
875 }
876
877 pub fn set_auto_ping_response(&mut self, enable: bool) {
885 self.auto_ping_response = enable;
886 }
887
888 pub fn set_auto_map_topic_alias_send(&mut self, enable: bool) {
900 self.auto_map_topic_alias_send = enable;
901 }
902
903 pub fn set_auto_replace_topic_alias_send(&mut self, enable: bool) {
913 self.auto_replace_topic_alias_send = enable;
914 }
915
916 pub fn set_pingresp_recv_timeout(&mut self, timeout_ms: u64) {
925 self.pingresp_recv_timeout_ms = timeout_ms;
926 }
927
928 pub fn acquire_packet_id(&mut self) -> Result<PacketIdType, MqttError> {
934 self.pid_man.acquire_unique_id()
935 }
936
937 pub fn register_packet_id(&mut self, packet_id: PacketIdType) -> Result<(), MqttError> {
950 self.pid_man.register_id(packet_id)
951 }
952
953 pub fn release_packet_id(
967 &mut self,
968 packet_id: PacketIdType,
969 ) -> Vec<GenericEvent<PacketIdType>> {
970 let mut events = Vec::new();
971
972 if self.pid_man.is_used_id(packet_id) {
973 self.pid_man.release_id(packet_id);
974 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
975 }
976
977 events
978 }
979
980 pub fn get_qos2_publish_handled(&self) -> HashSet<PacketIdType> {
989 self.qos2_publish_handled.clone()
990 }
991
992 pub fn restore_qos2_publish_handled(&mut self, pids: HashSet<PacketIdType>) {
1001 self.qos2_publish_handled = pids;
1002 }
1003
1004 pub fn restore_packets(&mut self, packets: Vec<GenericStorePacket<PacketIdType>>) {
1013 for packet in packets {
1014 match &packet {
1015 GenericStorePacket::V3_1_1Publish(p) => {
1016 match p.qos() {
1018 Qos::AtLeastOnce => {
1019 self.pid_puback.insert(p.packet_id().unwrap());
1020 }
1021 Qos::ExactlyOnce => {
1022 self.pid_pubrec.insert(p.packet_id().unwrap());
1023 }
1024 _ => {
1025 warn!("QoS 0 packet found in store, skipping");
1027 continue;
1028 }
1029 }
1030 let packet_id = p.packet_id().unwrap();
1032 if self.pid_man.register_id(packet_id).is_ok() {
1033 if let Err(_e) = self.store.add(packet) {
1034 error!("Failed to add packet to store: {:?}", _e);
1035 }
1036 } else {
1037 error!("Packet ID {} has already been used. Skip it", packet_id);
1038 }
1039 }
1040 GenericStorePacket::V5_0Publish(p) => {
1041 match p.qos() {
1043 Qos::AtLeastOnce => {
1044 self.pid_puback.insert(p.packet_id().unwrap());
1045 }
1046 Qos::ExactlyOnce => {
1047 self.pid_pubrec.insert(p.packet_id().unwrap());
1048 }
1049 _ => {
1050 warn!("QoS 0 packet found in store, skipping");
1052 continue;
1053 }
1054 }
1055 let packet_id = p.packet_id().unwrap();
1057 if self.pid_man.register_id(packet_id).is_ok() {
1058 if let Err(_e) = self.store.add(packet) {
1059 error!("Failed to add packet to store: {:?}", _e);
1060 }
1061 } else {
1062 error!("Packet ID {} has already been used. Skip it", packet_id);
1063 }
1064 }
1065 GenericStorePacket::V3_1_1Pubrel(p) => {
1066 self.pid_pubcomp.insert(p.packet_id());
1068 let packet_id = p.packet_id();
1070 if self.pid_man.register_id(packet_id).is_ok() {
1071 if let Err(_e) = self.store.add(packet) {
1072 error!("Failed to add packet to store: {:?}", _e);
1073 }
1074 } else {
1075 error!("Packet ID {} has already been used. Skip it", packet_id);
1076 }
1077 }
1078 GenericStorePacket::V5_0Pubrel(p) => {
1079 self.pid_pubcomp.insert(p.packet_id());
1081 let packet_id = p.packet_id();
1083 if self.pid_man.register_id(packet_id).is_ok() {
1084 if let Err(_e) = self.store.add(packet) {
1085 error!("Failed to add packet to store: {:?}", _e);
1086 }
1087 } else {
1088 error!("Packet ID {} has already been used. Skip it", packet_id);
1089 }
1090 }
1091 }
1092 }
1093 }
1094
1095 pub fn get_stored_packets(&self) -> Vec<GenericStorePacket<PacketIdType>> {
1104 self.store.get_stored()
1105 }
1106
1107 pub fn get_protocol_version(&self) -> Version {
1113 self.protocol_version
1114 }
1115
1116 pub fn is_publish_processing(&self, packet_id: PacketIdType) -> bool {
1126 self.qos2_publish_processing.contains(&packet_id)
1127 }
1128
1129 pub fn regulate_for_store(
1134 &self,
1135 mut packet: v5_0::GenericPublish<PacketIdType>,
1136 ) -> Result<v5_0::GenericPublish<PacketIdType>, MqttError> {
1137 if packet.topic_name().is_empty() {
1138 if let Some(topic_alias) = Self::get_topic_alias_from_props(packet.props()) {
1140 if let Some(ref topic_alias_send) = self.topic_alias_send {
1141 if let Some(topic) = topic_alias_send.peek(topic_alias) {
1142 packet = packet.remove_topic_alias_add_topic(topic.to_string())?;
1144 } else {
1145 return Err(MqttError::PacketNotRegulated);
1146 }
1147 } else {
1148 return Err(MqttError::PacketNotRegulated);
1149 }
1150 } else {
1151 return Err(MqttError::PacketNotRegulated);
1152 }
1153 } else {
1154 packet = packet.remove_topic_alias();
1156 }
1157
1158 Ok(packet)
1159 }
1160
1161 fn initialize(&mut self, is_client: bool) {
1175 self.publish_send_max = None;
1176 self.publish_recv_max = None;
1177 self.publish_send_count = 0;
1178 self.topic_alias_send = None;
1179 self.topic_alias_recv = None;
1180 self.publish_recv.clear();
1181 self.qos2_publish_processing.clear();
1182 self.need_store = false;
1183 self.pid_suback.clear();
1184 self.pid_unsuback.clear();
1185 self.is_client = is_client;
1186 self.pingreq_keep_alive_ms = 0;
1187 self.pingreq_server_keep_alive_ms = None;
1188 }
1189
1190 fn clear_store_related(&mut self) {
1191 self.pid_man.clear();
1192 self.pid_puback.clear();
1193 self.pid_pubrec.clear();
1194 self.pid_pubcomp.clear();
1195 self.store.clear();
1196 }
1197
1198 fn send_stored(&mut self) -> Vec<GenericEvent<PacketIdType>> {
1200 let mut events = Vec::new();
1201 self.store.for_each(|packet| {
1202 if packet.size() > self.maximum_packet_size_send as usize {
1203 let packet_id = packet.packet_id();
1204 self.pid_man.release_id(packet_id);
1205 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1206 return false; }
1208 events.push(GenericEvent::RequestSendPacket {
1209 packet: packet.clone().into(),
1210 release_packet_id_if_send_error: None,
1211 });
1212 true });
1214
1215 events
1216 }
1217
1218 fn validate_topic_alias(&mut self, topic_alias_opt: Option<u16>) -> Option<String> {
1230 let topic_alias = topic_alias_opt?;
1231
1232 if !self.validate_topic_alias_range(topic_alias) {
1233 return None;
1234 }
1235
1236 let topic_alias_send = self.topic_alias_send.as_mut()?;
1237 let topic = topic_alias_send.get(topic_alias)?;
1239
1240 Some(topic.to_string())
1241 }
1242
1243 fn validate_topic_alias_range(&self, topic_alias: u16) -> bool {
1255 let topic_alias_send = match &self.topic_alias_send {
1256 Some(tas) => tas,
1257 None => {
1258 error!("topic_alias is set but topic_alias_maximum is 0");
1259 return false;
1260 }
1261 };
1262
1263 if topic_alias == 0 || topic_alias > topic_alias_send.max() {
1264 error!("topic_alias is set but out of range");
1265 return false;
1266 }
1267
1268 true
1269 }
1270
1271 pub(crate) fn process_send_v3_1_1_connect(
1273 &mut self,
1274 packet: v3_1_1::Connect,
1275 ) -> Vec<GenericEvent<PacketIdType>> {
1276 info!("send connect v3.1.1: {packet}");
1277
1278 if self.status != ConnectionStatus::Disconnected {
1279 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1280 }
1281
1282 let mut events = Vec::new();
1283 self.initialize(true);
1284 self.status = ConnectionStatus::Connecting;
1285
1286 self.pingreq_keep_alive_ms = packet.keep_alive() as u64 * 1000;
1287
1288 if packet.clean_start() {
1290 self.clear_store_related();
1291 } else {
1292 self.need_store = true;
1293 }
1294
1295 self.topic_alias_send = None;
1297
1298 events.push(GenericEvent::RequestSendPacket {
1299 packet: packet.into(),
1300 release_packet_id_if_send_error: None,
1301 });
1302 self.send_post_process(&mut events);
1303
1304 events
1305 }
1306
1307 pub(crate) fn process_send_v5_0_connect(
1309 &mut self,
1310 packet: v5_0::Connect,
1311 ) -> Vec<GenericEvent<PacketIdType>> {
1312 info!("send connect v5.0: {packet}");
1313 if !self.validate_maximum_packet_size_send(packet.size()) {
1314 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1315 }
1316 if self.status != ConnectionStatus::Disconnected {
1317 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1318 }
1319
1320 let mut events = Vec::new();
1321 self.initialize(true);
1322 self.status = ConnectionStatus::Connecting;
1323
1324 self.pingreq_keep_alive_ms = packet.keep_alive() as u64 * 1000;
1325
1326 if packet.clean_start() {
1328 self.clear_store_related();
1329 }
1330
1331 for prop in packet.props() {
1333 match prop {
1334 Property::TopicAliasMaximum(val) => {
1335 if val.val() != 0 {
1336 self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1337 }
1338 }
1339 Property::ReceiveMaximum(val) => {
1340 debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1341 self.publish_recv_max = Some(val.val());
1342 }
1343 Property::MaximumPacketSize(val) => {
1344 debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1345 self.maximum_packet_size_recv = val.val();
1346 }
1347 Property::SessionExpiryInterval(val) => {
1348 if val.val() != 0 {
1349 self.need_store = true;
1350 }
1351 }
1352 _ => {
1353 }
1355 }
1356 }
1357
1358 events.push(GenericEvent::RequestSendPacket {
1359 packet: packet.into(),
1360 release_packet_id_if_send_error: None,
1361 });
1362 self.send_post_process(&mut events);
1363
1364 events
1365 }
1366
1367 pub(crate) fn process_send_v3_1_1_connack(
1368 &mut self,
1369 packet: v3_1_1::Connack,
1370 ) -> Vec<GenericEvent<PacketIdType>> {
1371 info!("send connack v3.1.1: {packet}");
1372 if self.status != ConnectionStatus::Connecting {
1373 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1374 }
1375 let mut events = Vec::new();
1376 let rc = packet.return_code();
1377 events.push(GenericEvent::RequestSendPacket {
1378 packet: packet.into(),
1379 release_packet_id_if_send_error: None,
1380 });
1381 if rc != ConnectReturnCode::Accepted {
1382 self.status = ConnectionStatus::Disconnected;
1383 self.cancel_timers(&mut events);
1384 events.push(GenericEvent::RequestClose);
1385 return events;
1386 }
1387
1388 self.status = ConnectionStatus::Connected;
1389 events.extend(self.send_stored());
1390 self.send_post_process(&mut events);
1391
1392 events
1393 }
1394
1395 pub(crate) fn process_send_v5_0_connack(
1396 &mut self,
1397 packet: v5_0::Connack,
1398 ) -> Vec<GenericEvent<PacketIdType>> {
1399 info!("send connack v5.0: {packet}");
1400 if !self.validate_maximum_packet_size_send(packet.size()) {
1401 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1402 }
1403 if self.status != ConnectionStatus::Connecting {
1404 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1405 }
1406
1407 let mut events = Vec::new();
1408 let rc = packet.reason_code();
1409 if rc == ConnectReasonCode::Success {
1410 for prop in packet.props() {
1412 match prop {
1413 Property::TopicAliasMaximum(val) => {
1414 if val.val() != 0 {
1415 self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1416 }
1417 }
1418 Property::ReceiveMaximum(val) => {
1419 debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1420 self.publish_recv_max = Some(val.val());
1421 }
1422 Property::MaximumPacketSize(val) => {
1423 debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1424 self.maximum_packet_size_recv = val.val();
1425 }
1426 Property::ServerKeepAlive(val) => {
1427 let val = val.val();
1428 if val == 0 {
1429 if self.pingreq_recv_set {
1430 self.pingreq_recv_set = false;
1431 events
1432 .push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
1433 }
1434 self.pingreq_recv_timeout_ms = 0;
1435 } else {
1436 self.pingreq_recv_timeout_ms = val as u64 * 1000 * 3 / 2;
1437 self.pingreq_recv_set = true;
1438 events.push(GenericEvent::RequestTimerReset {
1439 kind: TimerKind::PingreqRecv,
1440 duration_ms: self.pingreq_recv_timeout_ms,
1441 });
1442 }
1443 }
1444 _ => {
1445 }
1447 }
1448 }
1449 }
1450 events.push(GenericEvent::RequestSendPacket {
1451 packet: packet.into(),
1452 release_packet_id_if_send_error: None,
1453 });
1454
1455 if rc != ConnectReasonCode::Success {
1456 self.status = ConnectionStatus::Disconnected;
1457 self.cancel_timers(&mut events);
1458 events.push(GenericEvent::RequestClose);
1459 return events;
1460 }
1461
1462 self.status = ConnectionStatus::Connected;
1463
1464 events.extend(self.send_stored());
1465 self.send_post_process(&mut events);
1466
1467 events
1468 }
1469
1470 pub(crate) fn process_send_v3_1_1_publish(
1471 &mut self,
1472 packet: v3_1_1::GenericPublish<PacketIdType>,
1473 ) -> Vec<GenericEvent<PacketIdType>> {
1474 let mut events = Vec::new();
1475 let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1476
1477 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1478 let packet_id = packet.packet_id().unwrap();
1480 if self.status != ConnectionStatus::Connected
1481 && !self.need_store
1482 && !self.offline_publish
1483 {
1484 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1485 if self.pid_man.is_used_id(packet_id) {
1486 self.pid_man.release_id(packet_id);
1487 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1488 }
1489 return events;
1490 }
1491 if !self.pid_man.is_used_id(packet_id) {
1492 error!("packet_id {packet_id} must be acquired or registered");
1493 events.push(GenericEvent::NotifyError(
1494 MqttError::PacketIdentifierInvalid,
1495 ));
1496 return events;
1497 }
1498 if self.need_store
1499 && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1500 {
1501 let store_packet = packet.clone().set_dup(true);
1502 self.store.add(store_packet.try_into().unwrap()).unwrap();
1503 } else {
1504 release_packet_id_if_send_error = Some(packet_id);
1505 }
1506 if packet.qos() == Qos::ExactlyOnce {
1507 self.qos2_publish_processing.insert(packet_id);
1508 self.pid_pubrec.insert(packet_id);
1509 } else {
1510 self.pid_puback.insert(packet_id);
1511 }
1512 } else if self.status != ConnectionStatus::Connected {
1513 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1514 return events;
1515 }
1516
1517 if self.status == ConnectionStatus::Connected {
1518 events.push(GenericEvent::RequestSendPacket {
1519 packet: packet.into(),
1520 release_packet_id_if_send_error,
1521 });
1522 }
1523 self.send_post_process(&mut events);
1524
1525 events
1526 }
1527
1528 pub(crate) fn process_send_v5_0_publish(
1529 &mut self,
1530 mut packet: v5_0::GenericPublish<PacketIdType>,
1531 ) -> Vec<GenericEvent<PacketIdType>> {
1532 if !self.validate_maximum_packet_size_send(packet.size()) {
1533 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1534 }
1535
1536 let mut events = Vec::new();
1537 let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1538 let mut topic_alias_validated = false;
1539 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1540 let packet_id = packet.packet_id().unwrap();
1541 if self.status != ConnectionStatus::Connected
1542 && !self.need_store
1543 && !self.offline_publish
1544 {
1545 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1546 if self.pid_man.is_used_id(packet_id) {
1547 self.pid_man.release_id(packet_id);
1548 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1549 }
1550 return events;
1551 }
1552
1553 if !self.pid_man.is_used_id(packet_id) {
1555 error!("packet_id {packet_id} must be acquired or registered");
1556 events.push(GenericEvent::NotifyError(
1557 MqttError::PacketIdentifierInvalid,
1558 ));
1559 return events;
1560 }
1561
1562 if self.need_store
1563 && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1564 {
1565 let ta_opt = Self::get_topic_alias_from_props(packet.props());
1566 if packet.topic_name().is_empty() {
1567 let topic_opt = self.validate_topic_alias(ta_opt);
1569 if topic_opt.is_none() {
1570 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1571 if self.pid_man.is_used_id(packet_id) {
1572 self.pid_man.release_id(packet_id);
1573 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1574 }
1575 return events;
1576 }
1577 topic_alias_validated = true;
1578 let store_packet = packet
1579 .clone()
1580 .remove_topic_alias_add_topic(topic_opt.unwrap())
1581 .unwrap()
1582 .set_dup(true);
1583 self.store.add(store_packet.try_into().unwrap()).unwrap();
1584 } else {
1585 let store_packet = packet.clone().remove_topic_alias().set_dup(true);
1587 self.store.add(store_packet.try_into().unwrap()).unwrap();
1588 }
1589 } else {
1590 release_packet_id_if_send_error = Some(packet_id);
1591 }
1592 if packet.qos() == Qos::ExactlyOnce {
1593 self.qos2_publish_processing.insert(packet_id);
1594 self.pid_pubrec.insert(packet_id);
1595 } else {
1596 self.pid_puback.insert(packet_id);
1597 }
1598 } else if self.status != ConnectionStatus::Connected {
1599 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1600 return events;
1601 }
1602
1603 let packet_id_opt = packet.packet_id();
1604 let ta_opt = Self::get_topic_alias_from_props(packet.props());
1605 if packet.topic_name().is_empty() {
1606 if !topic_alias_validated && self.validate_topic_alias(ta_opt).is_none() {
1608 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1609 if let Some(packet_id) = packet_id_opt {
1610 if self.pid_man.is_used_id(packet_id) {
1611 self.pid_man.release_id(packet_id);
1612 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1613 }
1614 }
1615 return events;
1616 }
1617 } else if let Some(ta) = ta_opt {
1618 if self.validate_topic_alias_range(ta) {
1620 trace!(
1621 "topic alias: {} - {} is registered.",
1622 packet.topic_name(),
1623 ta
1624 );
1625 if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1626 topic_alias_send.insert_or_update(packet.topic_name(), ta);
1627 }
1628 } else {
1629 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1630 if let Some(packet_id) = packet_id_opt {
1631 if self.pid_man.is_used_id(packet_id) {
1632 self.pid_man.release_id(packet_id);
1633 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1634 }
1635 }
1636 return events;
1637 }
1638 } else if self.status == ConnectionStatus::Connected {
1639 if self.auto_map_topic_alias_send {
1641 if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1642 if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1643 trace!(
1644 "topic alias: {} - {} is found.",
1645 packet.topic_name(),
1646 found_ta
1647 );
1648 packet = packet.remove_topic_add_topic_alias(found_ta);
1649 } else {
1650 let lru_ta = topic_alias_send.get_lru_alias();
1651 topic_alias_send.insert_or_update(packet.topic_name(), lru_ta);
1652 packet = packet.remove_topic_add_topic_alias(lru_ta);
1653 }
1654 }
1655 } else if self.auto_replace_topic_alias_send {
1656 if let Some(ref topic_alias_send) = self.topic_alias_send {
1657 if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1658 trace!(
1659 "topic alias: {} - {} is found.",
1660 packet.topic_name(),
1661 found_ta
1662 );
1663 packet = packet.remove_topic_add_topic_alias(found_ta);
1664 }
1665 }
1666 }
1667 }
1668
1669 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1671 if let Some(max) = self.publish_send_max {
1672 if self.publish_send_count == max {
1673 events.push(GenericEvent::NotifyError(MqttError::ReceiveMaximumExceeded));
1674 if let Some(packet_id) = packet_id_opt {
1675 if self.pid_man.is_used_id(packet_id) {
1676 self.pid_man.release_id(packet_id);
1677 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1678 }
1679 }
1680 return events;
1681 }
1682 self.publish_send_count += 1;
1683 }
1684 }
1685
1686 if self.status == ConnectionStatus::Connected {
1687 events.push(GenericEvent::RequestSendPacket {
1688 packet: packet.into(),
1689 release_packet_id_if_send_error,
1690 });
1691 }
1692 self.send_post_process(&mut events);
1693
1694 events
1695 }
1696
1697 pub(crate) fn process_send_v3_1_1_puback(
1698 &mut self,
1699 packet: v3_1_1::GenericPuback<PacketIdType>,
1700 ) -> Vec<GenericEvent<PacketIdType>> {
1701 if self.status != ConnectionStatus::Connected {
1702 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1703 }
1704 let mut events = Vec::new();
1705
1706 events.push(GenericEvent::RequestSendPacket {
1707 packet: packet.into(),
1708 release_packet_id_if_send_error: None,
1709 });
1710 self.send_post_process(&mut events);
1711
1712 events
1713 }
1714
1715 pub(crate) fn process_send_v5_0_puback(
1716 &mut self,
1717 packet: v5_0::GenericPuback<PacketIdType>,
1718 ) -> Vec<GenericEvent<PacketIdType>> {
1719 if !self.validate_maximum_packet_size_send(packet.size()) {
1720 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1721 }
1722 if self.status != ConnectionStatus::Connected {
1723 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1724 }
1725
1726 let mut events = Vec::new();
1727 self.publish_recv.remove(&packet.packet_id());
1728
1729 events.push(GenericEvent::RequestSendPacket {
1730 packet: packet.into(),
1731 release_packet_id_if_send_error: None,
1732 });
1733 self.send_post_process(&mut events);
1734
1735 events
1736 }
1737
1738 pub(crate) fn process_send_v3_1_1_pubrec(
1739 &mut self,
1740 packet: v3_1_1::GenericPubrec<PacketIdType>,
1741 ) -> Vec<GenericEvent<PacketIdType>> {
1742 if self.status != ConnectionStatus::Connected {
1743 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1744 }
1745 let mut events = Vec::new();
1746
1747 events.push(GenericEvent::RequestSendPacket {
1748 packet: packet.into(),
1749 release_packet_id_if_send_error: None,
1750 });
1751 self.send_post_process(&mut events);
1752
1753 events
1754 }
1755
1756 pub(crate) fn process_send_v5_0_pubrec(
1757 &mut self,
1758 packet: v5_0::GenericPubrec<PacketIdType>,
1759 ) -> Vec<GenericEvent<PacketIdType>> {
1760 if !self.validate_maximum_packet_size_send(packet.size()) {
1761 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1762 }
1763 if self.status != ConnectionStatus::Connected {
1764 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1765 }
1766
1767 let mut events = Vec::new();
1768 let packet_id = packet.packet_id();
1769
1770 if let Some(rc) = packet.reason_code() {
1771 if rc.is_failure() {
1772 self.publish_recv.remove(&packet_id);
1773 self.qos2_publish_handled.remove(&packet_id);
1774 }
1775 }
1776
1777 events.push(GenericEvent::RequestSendPacket {
1778 packet: packet.into(),
1779 release_packet_id_if_send_error: None,
1780 });
1781 self.send_post_process(&mut events);
1782
1783 events
1784 }
1785
1786 pub(crate) fn process_send_v3_1_1_pubrel(
1787 &mut self,
1788 packet: v3_1_1::GenericPubrel<PacketIdType>,
1789 ) -> Vec<GenericEvent<PacketIdType>> {
1790 if self.status != ConnectionStatus::Connected && !self.need_store {
1791 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1792 }
1793 let mut events = Vec::new();
1794 let packet_id = packet.packet_id();
1795 if !self.pid_man.is_used_id(packet_id) {
1796 error!("packet_id {packet_id} must be acquired or registered");
1797 events.push(GenericEvent::NotifyError(
1798 MqttError::PacketIdentifierInvalid,
1799 ));
1800 return events;
1801 }
1802 if self.need_store {
1803 self.store.add(packet.clone().try_into().unwrap()).unwrap();
1804 }
1805
1806 if self.status == ConnectionStatus::Connected {
1807 self.pid_pubcomp.insert(packet_id);
1808 events.push(GenericEvent::RequestSendPacket {
1809 packet: packet.into(),
1810 release_packet_id_if_send_error: None,
1811 });
1812 }
1813 self.send_post_process(&mut events);
1814
1815 events
1816 }
1817
1818 pub(crate) fn process_send_v5_0_pubrel(
1819 &mut self,
1820 packet: v5_0::GenericPubrel<PacketIdType>,
1821 ) -> Vec<GenericEvent<PacketIdType>> {
1822 if !self.validate_maximum_packet_size_send(packet.size()) {
1823 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1824 }
1825 if self.status != ConnectionStatus::Connected && !self.need_store {
1826 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1827 }
1828
1829 let mut events = Vec::new();
1830 let packet_id = packet.packet_id();
1831 if !self.pid_man.is_used_id(packet_id) {
1832 error!("packet_id {packet_id} must be acquired or registered");
1833 events.push(GenericEvent::NotifyError(
1834 MqttError::PacketIdentifierInvalid,
1835 ));
1836 return events;
1837 }
1838 if self.need_store {
1839 self.store.add(packet.clone().try_into().unwrap()).unwrap();
1840 }
1841
1842 if self.status == ConnectionStatus::Connected {
1843 self.pid_pubcomp.insert(packet_id);
1844 events.push(GenericEvent::RequestSendPacket {
1845 packet: packet.into(),
1846 release_packet_id_if_send_error: None,
1847 });
1848 }
1849 self.send_post_process(&mut events);
1850
1851 events
1852 }
1853
1854 pub(crate) fn process_send_v3_1_1_pubcomp(
1855 &mut self,
1856 packet: v3_1_1::GenericPubcomp<PacketIdType>,
1857 ) -> Vec<GenericEvent<PacketIdType>> {
1858 if self.status != ConnectionStatus::Connected {
1859 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1860 }
1861 let mut events = Vec::new();
1862
1863 events.push(GenericEvent::RequestSendPacket {
1864 packet: packet.into(),
1865 release_packet_id_if_send_error: None,
1866 });
1867 self.send_post_process(&mut events);
1868
1869 events
1870 }
1871
1872 pub(crate) fn process_send_v5_0_pubcomp(
1873 &mut self,
1874 packet: v5_0::GenericPubcomp<PacketIdType>,
1875 ) -> Vec<GenericEvent<PacketIdType>> {
1876 if !self.validate_maximum_packet_size_send(packet.size()) {
1877 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1878 }
1879 if self.status != ConnectionStatus::Connected {
1880 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1881 }
1882
1883 let mut events = Vec::new();
1884 self.publish_recv.remove(&packet.packet_id());
1885
1886 events.push(GenericEvent::RequestSendPacket {
1887 packet: packet.into(),
1888 release_packet_id_if_send_error: None,
1889 });
1890 self.send_post_process(&mut events);
1891
1892 events
1893 }
1894
1895 pub(crate) fn process_send_v3_1_1_subscribe(
1896 &mut self,
1897 packet: v3_1_1::GenericSubscribe<PacketIdType>,
1898 ) -> Vec<GenericEvent<PacketIdType>> {
1899 let mut events = Vec::new();
1900 let packet_id = packet.packet_id();
1901 if self.status != ConnectionStatus::Connected {
1902 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1903 if self.pid_man.is_used_id(packet_id) {
1904 self.pid_man.release_id(packet_id);
1905 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1906 }
1907 return events;
1908 }
1909 if !self.pid_man.is_used_id(packet_id) {
1910 error!("packet_id {packet_id} must be acquired or registered");
1911 events.push(GenericEvent::NotifyError(
1912 MqttError::PacketIdentifierInvalid,
1913 ));
1914 return events;
1915 }
1916 self.pid_suback.insert(packet_id);
1917
1918 events.push(GenericEvent::RequestSendPacket {
1919 packet: packet.into(),
1920 release_packet_id_if_send_error: Some(packet_id),
1921 });
1922 self.send_post_process(&mut events);
1923
1924 events
1925 }
1926
1927 pub(crate) fn process_send_v5_0_subscribe(
1928 &mut self,
1929 packet: v5_0::GenericSubscribe<PacketIdType>,
1930 ) -> Vec<GenericEvent<PacketIdType>> {
1931 if !self.validate_maximum_packet_size_send(packet.size()) {
1932 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1933 }
1934
1935 let mut events = Vec::new();
1936 let packet_id = packet.packet_id();
1937 if self.status != ConnectionStatus::Connected {
1938 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1939 if self.pid_man.is_used_id(packet_id) {
1940 self.pid_man.release_id(packet_id);
1941 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1942 }
1943 return events;
1944 }
1945 if !self.pid_man.is_used_id(packet_id) {
1946 error!("packet_id {packet_id} must be acquired or registered");
1947 events.push(GenericEvent::NotifyError(
1948 MqttError::PacketIdentifierInvalid,
1949 ));
1950 return events;
1951 }
1952 self.pid_suback.insert(packet_id);
1953
1954 events.push(GenericEvent::RequestSendPacket {
1955 packet: packet.into(),
1956 release_packet_id_if_send_error: Some(packet_id),
1957 });
1958 self.send_post_process(&mut events);
1959
1960 events
1961 }
1962
1963 pub(crate) fn process_send_v3_1_1_suback(
1964 &mut self,
1965 packet: v3_1_1::GenericSuback<PacketIdType>,
1966 ) -> Vec<GenericEvent<PacketIdType>> {
1967 if self.status != ConnectionStatus::Connected {
1968 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1969 }
1970 let mut events = Vec::new();
1971 events.push(GenericEvent::RequestSendPacket {
1972 packet: packet.into(),
1973 release_packet_id_if_send_error: None,
1974 });
1975 self.send_post_process(&mut events);
1976
1977 events
1978 }
1979
1980 pub(crate) fn process_send_v5_0_suback(
1981 &mut self,
1982 packet: v5_0::GenericSuback<PacketIdType>,
1983 ) -> Vec<GenericEvent<PacketIdType>> {
1984 if !self.validate_maximum_packet_size_send(packet.size()) {
1985 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1986 }
1987 if self.status != ConnectionStatus::Connected {
1988 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1989 }
1990
1991 let mut events = Vec::new();
1992 events.push(GenericEvent::RequestSendPacket {
1993 packet: packet.into(),
1994 release_packet_id_if_send_error: None,
1995 });
1996 self.send_post_process(&mut events);
1997
1998 events
1999 }
2000
2001 pub(crate) fn process_send_v3_1_1_unsubscribe(
2002 &mut self,
2003 packet: v3_1_1::GenericUnsubscribe<PacketIdType>,
2004 ) -> Vec<GenericEvent<PacketIdType>> {
2005 let mut events = Vec::new();
2006 let packet_id = packet.packet_id();
2007 if self.status != ConnectionStatus::Connected {
2008 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
2009 if self.pid_man.is_used_id(packet_id) {
2010 self.pid_man.release_id(packet_id);
2011 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2012 }
2013 return events;
2014 }
2015 if !self.pid_man.is_used_id(packet_id) {
2016 error!("packet_id {packet_id} must be acquired or registered");
2017 events.push(GenericEvent::NotifyError(
2018 MqttError::PacketIdentifierInvalid,
2019 ));
2020 return events;
2021 }
2022 self.pid_unsuback.insert(packet_id);
2023
2024 events.push(GenericEvent::RequestSendPacket {
2025 packet: packet.into(),
2026 release_packet_id_if_send_error: Some(packet_id),
2027 });
2028 self.send_post_process(&mut events);
2029
2030 events
2031 }
2032
2033 pub(crate) fn process_send_v5_0_unsubscribe(
2034 &mut self,
2035 packet: v5_0::GenericUnsubscribe<PacketIdType>,
2036 ) -> Vec<GenericEvent<PacketIdType>> {
2037 if !self.validate_maximum_packet_size_send(packet.size()) {
2038 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2039 }
2040
2041 let mut events = Vec::new();
2042 let packet_id = packet.packet_id();
2043 if self.status != ConnectionStatus::Connected {
2044 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
2045 if self.pid_man.is_used_id(packet_id) {
2046 self.pid_man.release_id(packet_id);
2047 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2048 }
2049 return events;
2050 }
2051 if !self.pid_man.is_used_id(packet_id) {
2052 error!("packet_id {packet_id} must be acquired or registered");
2053 events.push(GenericEvent::NotifyError(
2054 MqttError::PacketIdentifierInvalid,
2055 ));
2056 return events;
2057 }
2058 self.pid_unsuback.insert(packet_id);
2059
2060 events.push(GenericEvent::RequestSendPacket {
2061 packet: packet.into(),
2062 release_packet_id_if_send_error: Some(packet_id),
2063 });
2064 self.send_post_process(&mut events);
2065
2066 events
2067 }
2068
2069 pub(crate) fn process_send_v3_1_1_unsuback(
2070 &mut self,
2071 packet: v3_1_1::GenericUnsuback<PacketIdType>,
2072 ) -> Vec<GenericEvent<PacketIdType>> {
2073 if self.status != ConnectionStatus::Connected {
2074 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2075 }
2076 let mut events = Vec::new();
2077 events.push(GenericEvent::RequestSendPacket {
2078 packet: packet.into(),
2079 release_packet_id_if_send_error: None,
2080 });
2081 self.send_post_process(&mut events);
2082
2083 events
2084 }
2085
2086 pub(crate) fn process_send_v5_0_unsuback(
2087 &mut self,
2088 packet: v5_0::GenericUnsuback<PacketIdType>,
2089 ) -> Vec<GenericEvent<PacketIdType>> {
2090 if !self.validate_maximum_packet_size_send(packet.size()) {
2091 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2092 }
2093 if self.status != ConnectionStatus::Connected {
2094 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2095 }
2096
2097 let mut events = Vec::new();
2098 events.push(GenericEvent::RequestSendPacket {
2099 packet: packet.into(),
2100 release_packet_id_if_send_error: None,
2101 });
2102 self.send_post_process(&mut events);
2103
2104 events
2105 }
2106
2107 pub(crate) fn process_send_v3_1_1_pingreq(
2108 &mut self,
2109 packet: v3_1_1::Pingreq,
2110 ) -> Vec<GenericEvent<PacketIdType>> {
2111 if self.status != ConnectionStatus::Connected {
2112 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2113 }
2114 let mut events = Vec::new();
2115 events.push(GenericEvent::RequestSendPacket {
2116 packet: packet.into(),
2117 release_packet_id_if_send_error: None,
2118 });
2119 if self.pingresp_recv_timeout_ms != 0 {
2120 self.pingresp_recv_set = true;
2121 events.push(GenericEvent::RequestTimerReset {
2122 kind: TimerKind::PingrespRecv,
2123 duration_ms: self.pingresp_recv_timeout_ms,
2124 });
2125 }
2126 self.send_post_process(&mut events);
2127
2128 events
2129 }
2130
2131 pub(crate) fn process_send_v5_0_pingreq(
2132 &mut self,
2133 packet: v5_0::Pingreq,
2134 ) -> Vec<GenericEvent<PacketIdType>> {
2135 if !self.validate_maximum_packet_size_send(packet.size()) {
2136 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2137 }
2138 if self.status != ConnectionStatus::Connected {
2139 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2140 }
2141
2142 let mut events = Vec::new();
2143 events.push(GenericEvent::RequestSendPacket {
2144 packet: packet.into(),
2145 release_packet_id_if_send_error: None,
2146 });
2147 if self.pingresp_recv_timeout_ms != 0 {
2148 self.pingresp_recv_set = true;
2149 events.push(GenericEvent::RequestTimerReset {
2150 kind: TimerKind::PingrespRecv,
2151 duration_ms: self.pingresp_recv_timeout_ms,
2152 });
2153 }
2154 self.send_post_process(&mut events);
2155
2156 events
2157 }
2158
2159 pub(crate) fn process_send_v3_1_1_pingresp(
2160 &mut self,
2161 packet: v3_1_1::Pingresp,
2162 ) -> Vec<GenericEvent<PacketIdType>> {
2163 if self.status != ConnectionStatus::Connected {
2164 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2165 }
2166 let mut events = Vec::new();
2167 events.push(GenericEvent::RequestSendPacket {
2168 packet: packet.into(),
2169 release_packet_id_if_send_error: None,
2170 });
2171 self.send_post_process(&mut events);
2172
2173 events
2174 }
2175
2176 pub(crate) fn process_send_v5_0_pingresp(
2177 &mut self,
2178 packet: v5_0::Pingresp,
2179 ) -> Vec<GenericEvent<PacketIdType>> {
2180 if !self.validate_maximum_packet_size_send(packet.size()) {
2181 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2182 }
2183 if self.status != ConnectionStatus::Connected {
2184 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2185 }
2186
2187 let mut events = Vec::new();
2188 events.push(GenericEvent::RequestSendPacket {
2189 packet: packet.into(),
2190 release_packet_id_if_send_error: None,
2191 });
2192 self.send_post_process(&mut events);
2193
2194 events
2195 }
2196
2197 pub(crate) fn process_send_v3_1_1_disconnect(
2198 &mut self,
2199 packet: v3_1_1::Disconnect,
2200 ) -> Vec<GenericEvent<PacketIdType>> {
2201 if self.status != ConnectionStatus::Connected {
2202 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2203 }
2204 let mut events = Vec::new();
2205 self.status = ConnectionStatus::Disconnected;
2206 self.cancel_timers(&mut events);
2207 events.push(GenericEvent::RequestSendPacket {
2208 packet: packet.into(),
2209 release_packet_id_if_send_error: None,
2210 });
2211 events.push(GenericEvent::RequestClose);
2212
2213 events
2214 }
2215
2216 pub(crate) fn process_send_v5_0_disconnect(
2217 &mut self,
2218 packet: v5_0::Disconnect,
2219 ) -> Vec<GenericEvent<PacketIdType>> {
2220 if !self.validate_maximum_packet_size_send(packet.size()) {
2221 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2222 }
2223 if self.status != ConnectionStatus::Connected {
2224 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2225 }
2226
2227 let mut events = Vec::new();
2228 self.status = ConnectionStatus::Disconnected;
2229 self.cancel_timers(&mut events);
2230 events.push(GenericEvent::RequestSendPacket {
2231 packet: packet.into(),
2232 release_packet_id_if_send_error: None,
2233 });
2234 events.push(GenericEvent::RequestClose);
2235
2236 events
2237 }
2238
2239 pub(crate) fn process_send_v5_0_auth(
2240 &mut self,
2241 packet: v5_0::Auth,
2242 ) -> Vec<GenericEvent<PacketIdType>> {
2243 if !self.validate_maximum_packet_size_send(packet.size()) {
2244 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2245 }
2246 if self.status == ConnectionStatus::Disconnected {
2247 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2248 }
2249
2250 let mut events = Vec::new();
2251 events.push(GenericEvent::RequestSendPacket {
2252 packet: packet.into(),
2253 release_packet_id_if_send_error: None,
2254 });
2255 self.send_post_process(&mut events);
2256
2257 events
2258 }
2259
2260 fn send_post_process(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
2261 if self.is_client {
2262 let mut ms: u64 = self.pingreq_keep_alive_ms;
2264 if let Some(timeout_ms) = self.pingreq_user_send_interval_ms {
2265 ms = timeout_ms;
2267 } else if let Some(timeout_ms) = self.pingreq_server_keep_alive_ms {
2268 ms = timeout_ms;
2270 }
2271 if ms > 0 {
2272 self.pingreq_send_set = true;
2273 events.push(GenericEvent::RequestTimerReset {
2274 kind: TimerKind::PingreqSend,
2275 duration_ms: ms,
2276 });
2277 }
2278 }
2279 }
2280
2281 fn validate_maximum_packet_size_send(&self, size: usize) -> bool {
2282 if size > self.maximum_packet_size_send as usize {
2283 error!("packet size over maximum_packet_size for sending");
2284 return false;
2285 }
2286 true
2287 }
2288
2289 #[rustfmt::skip]
2290 fn can_receive(&self, packet_type: u8) -> bool {
2291 !((Role::IS_CLIENT &&
2292 (
2293 packet_type == 1 || packet_type == 8 || packet_type == 10 || packet_type == 12 || (packet_type == 14 && self.protocol_version == Version::V3_1_1) || (packet_type == 15 && self.protocol_version == Version::V3_1_1) )
2300 ) || (Role::IS_SERVER &&
2301 (
2302 packet_type == 2 || packet_type == 9 || packet_type == 11 || packet_type == 13 || (packet_type == 15 && self.protocol_version == Version::V3_1_1) )
2308 ))
2309 }
2310
2311 fn process_recv_packet(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
2312 let mut events = Vec::new();
2313
2314 let total_size = remaining_length_to_total_size(raw_packet.remaining_length());
2316 if total_size > self.maximum_packet_size_recv {
2317 let disconnect_packet = v5_0::Disconnect::builder()
2323 .reason_code(DisconnectReasonCode::PacketTooLarge)
2324 .build()
2325 .unwrap();
2326 events.extend(self.process_send_v5_0_disconnect(disconnect_packet));
2328 events.push(GenericEvent::NotifyError(MqttError::PacketTooLarge));
2329 return events;
2330 }
2331
2332 let packet_type = raw_packet.packet_type();
2333 if !self.can_receive(packet_type) {
2334 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2335 return events;
2336 }
2337
2338 let _flags = raw_packet.flags();
2339 match self.protocol_version {
2340 Version::V3_1_1 => {
2341 match packet_type {
2342 1 => {
2343 events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2345 }
2346 2 => {
2347 events.extend(self.process_recv_v3_1_1_connack(raw_packet));
2349 }
2350 3 => {
2351 events.extend(self.process_recv_v3_1_1_publish(raw_packet));
2353 }
2354 4 => {
2355 events.extend(self.process_recv_v3_1_1_puback(raw_packet));
2357 }
2358 5 => {
2359 events.extend(self.process_recv_v3_1_1_pubrec(raw_packet));
2361 }
2362 6 => {
2363 events.extend(self.process_recv_v3_1_1_pubrel(raw_packet));
2365 }
2366 7 => {
2367 events.extend(self.process_recv_v3_1_1_pubcomp(raw_packet));
2369 }
2370 8 => {
2371 events.extend(self.process_recv_v3_1_1_subscribe(raw_packet));
2373 }
2374 9 => {
2375 events.extend(self.process_recv_v3_1_1_suback(raw_packet));
2377 }
2378 10 => {
2379 events.extend(self.process_recv_v3_1_1_unsubscribe(raw_packet));
2381 }
2382 11 => {
2383 events.extend(self.process_recv_v3_1_1_unsuback(raw_packet));
2385 }
2386 12 => {
2387 events.extend(self.process_recv_v3_1_1_pingreq(raw_packet));
2389 }
2390 13 => {
2391 events.extend(self.process_recv_v3_1_1_pingresp(raw_packet));
2393 }
2394 14 => {
2395 events.extend(self.process_recv_v3_1_1_disconnect(raw_packet));
2397 }
2398 _ => {
2400 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2401 }
2402 }
2403 }
2404 Version::V5_0 => {
2405 match packet_type {
2406 1 => {
2407 events.extend(self.process_recv_v5_0_connect(raw_packet));
2409 }
2410 2 => {
2411 events.extend(self.process_recv_v5_0_connack(raw_packet));
2413 }
2414 3 => {
2415 events.extend(self.process_recv_v5_0_publish(raw_packet));
2417 }
2418 4 => {
2419 events.extend(self.process_recv_v5_0_puback(raw_packet));
2421 }
2422 5 => {
2423 events.extend(self.process_recv_v5_0_pubrec(raw_packet));
2425 }
2426 6 => {
2427 events.extend(self.process_recv_v5_0_pubrel(raw_packet));
2429 }
2430 7 => {
2431 events.extend(self.process_recv_v5_0_pubcomp(raw_packet));
2433 }
2434 8 => {
2435 events.extend(self.process_recv_v5_0_subscribe(raw_packet));
2437 }
2438 9 => {
2439 events.extend(self.process_recv_v5_0_suback(raw_packet));
2441 }
2442 10 => {
2443 events.extend(self.process_recv_v5_0_unsubscribe(raw_packet));
2445 }
2446 11 => {
2447 events.extend(self.process_recv_v5_0_unsuback(raw_packet));
2449 }
2450 12 => {
2451 events.extend(self.process_recv_v5_0_pingreq(raw_packet));
2453 }
2454 13 => {
2455 events.extend(self.process_recv_v5_0_pingresp(raw_packet));
2457 }
2458 14 => {
2459 events.extend(self.process_recv_v5_0_disconnect(raw_packet));
2461 }
2462 15 => {
2463 events.extend(self.process_recv_v5_0_auth(raw_packet));
2465 }
2466 _ => {
2468 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2469 }
2470 }
2471 }
2472 Version::Undetermined => {
2473 match packet_type {
2474 1 => {
2475 if raw_packet.remaining_length() < 7 {
2477 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2478 return events;
2479 }
2480 match raw_packet.data_as_slice()[6] {
2481 4 => {
2483 self.protocol_version = Version::V3_1_1;
2484 events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2485 }
2486 5 => {
2487 self.protocol_version = Version::V5_0;
2488 events.extend(self.process_recv_v5_0_connect(raw_packet));
2489 }
2490 _ => {
2491 events.push(GenericEvent::NotifyError(
2492 MqttError::UnsupportedProtocolVersion,
2493 ));
2494 }
2495 }
2496 }
2497 _ => {
2498 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2499 }
2500 }
2501 }
2502 }
2503
2504 events
2505 }
2506
2507 fn process_recv_v3_1_1_connect(
2508 &mut self,
2509 raw_packet: RawPacket,
2510 ) -> Vec<GenericEvent<PacketIdType>> {
2511 let mut events = Vec::new();
2512 if self.status != ConnectionStatus::Disconnected {
2513 Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
2514 return events;
2515 }
2516 self.status = ConnectionStatus::Connecting;
2517 match v3_1_1::Connect::parse(raw_packet.data_as_slice()) {
2518 Ok((packet, _)) => {
2519 self.initialize(false);
2520 if packet.keep_alive() > 0 {
2521 self.pingreq_recv_timeout_ms = (packet.keep_alive() as u64) * 1000 * 3 / 2;
2522 }
2523 if packet.clean_session() {
2524 self.clear_store_related();
2525 } else {
2526 self.need_store = true;
2527 }
2528 events.extend(self.refresh_pingreq_recv());
2529 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2530 }
2531 Err(e) => {
2532 let rc = match e {
2533 MqttError::ClientIdentifierNotValid => ConnectReturnCode::IdentifierRejected,
2534 MqttError::BadUserNameOrPassword => ConnectReturnCode::BadUserNameOrPassword,
2535 MqttError::UnsupportedProtocolVersion => {
2536 ConnectReturnCode::UnacceptableProtocolVersion
2537 }
2538 _ => ConnectReturnCode::NotAuthorized, };
2540 let connack = v3_1_1::Connack::builder()
2541 .return_code(rc)
2542 .session_present(false)
2543 .build()
2544 .unwrap();
2545 let connack_events = self.process_send_v3_1_1_connack(connack);
2546 events.extend(connack_events);
2547 events.push(GenericEvent::NotifyError(e));
2548 }
2549 }
2550
2551 events
2552 }
2553
2554 fn process_recv_v5_0_connect(
2555 &mut self,
2556 raw_packet: RawPacket,
2557 ) -> Vec<GenericEvent<PacketIdType>> {
2558 let mut events = Vec::new();
2559 if self.status != ConnectionStatus::Disconnected {
2560 self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
2561 return events;
2562 }
2563 self.status = ConnectionStatus::Connecting;
2564 match v5_0::Connect::parse(raw_packet.data_as_slice()) {
2565 Ok((packet, _)) => {
2566 self.initialize(false);
2567 if packet.keep_alive() > 0 {
2568 self.pingreq_recv_timeout_ms = (packet.keep_alive() as u64) * 1000 * 3 / 2;
2569 }
2570 if packet.clean_start() {
2571 self.clear_store_related();
2572 }
2573 packet.props().iter().for_each(|prop| match prop {
2574 Property::TopicAliasMaximum(p) => {
2575 self.topic_alias_send = Some(TopicAliasSend::new(p.val()));
2576 }
2577 Property::ReceiveMaximum(p) => {
2578 self.publish_send_max = Some(p.val());
2579 }
2580 Property::MaximumPacketSize(p) => {
2581 self.maximum_packet_size_send = p.val();
2582 }
2583 Property::SessionExpiryInterval(p) => {
2584 if p.val() != 0 {
2585 self.need_store = true;
2586 }
2587 }
2588 _ => {}
2589 });
2590 events.extend(self.refresh_pingreq_recv());
2591 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2592 }
2593 Err(e) => {
2594 let rc = match e {
2595 MqttError::ClientIdentifierNotValid => {
2596 ConnectReasonCode::ClientIdentifierNotValid
2597 }
2598 MqttError::BadUserNameOrPassword => ConnectReasonCode::BadUserNameOrPassword,
2599 MqttError::UnsupportedProtocolVersion => {
2600 ConnectReasonCode::UnsupportedProtocolVersion
2601 }
2602 _ => ConnectReasonCode::UnspecifiedError,
2603 };
2604 let connack = v5_0::Connack::builder()
2605 .reason_code(rc)
2606 .session_present(false)
2607 .build()
2608 .unwrap();
2609 let connack_events = self.process_send_v5_0_connack(connack);
2610 events.extend(connack_events);
2611 events.push(GenericEvent::NotifyError(e));
2612 }
2613 }
2614
2615 events
2616 }
2617
2618 fn process_recv_v3_1_1_connack(
2619 &mut self,
2620 raw_packet: RawPacket,
2621 ) -> Vec<GenericEvent<PacketIdType>> {
2622 let mut events = Vec::new();
2623
2624 match v3_1_1::Connack::parse(raw_packet.data_as_slice()) {
2625 Ok((packet, _consumed)) => {
2626 if packet.return_code() == ConnectReturnCode::Accepted {
2627 self.status = ConnectionStatus::Connected;
2628 if packet.session_present() {
2629 events.extend(self.send_stored());
2630 } else {
2631 self.clear_store_related();
2632 }
2633 }
2634 events.push(GenericEvent::NotifyPacketReceived(
2635 GenericPacket::V3_1_1Connack(packet),
2636 ));
2637 }
2638 Err(e) => {
2639 Self::handle_v3_1_1_error(e, &mut events);
2640 }
2641 }
2642
2643 events
2644 }
2645
2646 fn process_recv_v5_0_connack(
2647 &mut self,
2648 raw_packet: RawPacket,
2649 ) -> Vec<GenericEvent<PacketIdType>> {
2650 let mut events = Vec::new();
2651
2652 match v5_0::Connack::parse(raw_packet.data_as_slice()) {
2653 Ok((packet, _consumed)) => {
2654 if packet.reason_code() == ConnectReasonCode::Success {
2655 self.status = ConnectionStatus::Connected;
2656
2657 for prop in packet.props() {
2659 match prop {
2660 Property::TopicAliasMaximum(val) => {
2661 if val.val() > 0 {
2662 self.topic_alias_send = Some(TopicAliasSend::new(val.val()));
2663 }
2664 }
2665 Property::ReceiveMaximum(val) => {
2666 assert!(val.val() != 0);
2667 self.publish_send_max = Some(val.val());
2668 }
2669 Property::MaximumPacketSize(val) => {
2670 assert!(val.val() != 0);
2671 self.maximum_packet_size_send = val.val();
2672 }
2673 Property::ServerKeepAlive(val) => {
2674 let val = val.val() as u64 * 1000;
2675 self.pingreq_server_keep_alive_ms = Some(val);
2676 if self.pingreq_user_send_interval_ms.is_none() {
2677 if val == 0 {
2678 if self.pingreq_send_set {
2679 self.pingreq_send_set = false;
2680 events.push(GenericEvent::RequestTimerCancel(
2681 TimerKind::PingreqSend,
2682 ));
2683 }
2684 self.pingreq_user_send_interval_ms = None;
2685 } else {
2686 self.pingreq_send_set = true;
2687 events.push(GenericEvent::RequestTimerReset {
2688 kind: TimerKind::PingreqSend,
2689 duration_ms: val,
2690 });
2691 }
2692 }
2693 }
2694 _ => {
2695 }
2697 }
2698 }
2699
2700 if packet.session_present() {
2701 events.extend(self.send_stored());
2702 } else {
2703 self.clear_store_related();
2704 }
2705 }
2706 events.push(GenericEvent::NotifyPacketReceived(
2707 GenericPacket::V5_0Connack(packet),
2708 ));
2709 }
2710 Err(e) => {
2711 if self.status == ConnectionStatus::Connected {
2712 self.handle_v5_0_error(e, &mut events);
2713 } else {
2714 events.push(GenericEvent::NotifyError(e));
2715 }
2716 }
2717 }
2718
2719 events
2720 }
2721
2722 fn process_recv_v3_1_1_publish(
2723 &mut self,
2724 raw_packet: RawPacket,
2725 ) -> Vec<GenericEvent<PacketIdType>> {
2726 let mut events = Vec::new();
2727
2728 let flags = raw_packet.flags();
2729 match &raw_packet.data {
2730 PacketData::Publish(arc) => {
2731 match v3_1_1::GenericPublish::parse(flags, arc.clone()) {
2732 Ok((packet, _consumed)) => {
2733 match packet.qos() {
2734 Qos::AtMostOnce => {
2735 events.extend(self.refresh_pingreq_recv());
2736 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2737 }
2738 Qos::AtLeastOnce => {
2739 let packet_id = packet.packet_id().unwrap();
2740 if self.status == ConnectionStatus::Connected
2741 && self.auto_pub_response
2742 {
2743 let puback = v3_1_1::GenericPuback::builder()
2745 .packet_id(packet_id)
2746 .build()
2747 .unwrap();
2748 events.extend(self.process_send_v3_1_1_puback(puback));
2749 }
2750 events.extend(self.refresh_pingreq_recv());
2751 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2752 }
2753 Qos::ExactlyOnce => {
2754 let packet_id = packet.packet_id().unwrap();
2755 let already_handled = !self.qos2_publish_handled.insert(packet_id);
2756
2757 if self.status == ConnectionStatus::Connected
2758 && (self.auto_pub_response || already_handled)
2759 {
2760 let pubrec = v3_1_1::GenericPubrec::builder()
2761 .packet_id(packet_id)
2762 .build()
2763 .unwrap();
2764 events.extend(self.process_send_v3_1_1_pubrec(pubrec));
2765 }
2766 events.extend(self.refresh_pingreq_recv());
2767 if !already_handled {
2768 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2769 }
2770 }
2771 }
2772 }
2773 Err(e) => {
2774 Self::handle_v3_1_1_error(e, &mut events);
2775 }
2776 }
2777 }
2778 PacketData::Normal(_) => {
2779 unreachable!("PUBLISH packet must use PacketData::Publish variant");
2780 }
2781 }
2782
2783 events
2784 }
2785
2786 fn process_recv_v5_0_publish(
2787 &mut self,
2788 raw_packet: RawPacket,
2789 ) -> Vec<GenericEvent<PacketIdType>> {
2790 let mut events = Vec::new();
2791
2792 let flags = raw_packet.flags();
2793 match &raw_packet.data {
2794 PacketData::Publish(arc) => {
2795 match v5_0::GenericPublish::parse(flags, arc.clone()) {
2796 Ok((mut packet, _consumed)) => {
2797 let mut already_handled = false;
2798 let mut puback_send = false;
2799 let mut pubrec_send = false;
2800
2801 let mut check_receive_maximum =
2802 |events: &mut Vec<GenericEvent<PacketIdType>>| {
2803 if let Some(max) = self.publish_recv_max {
2804 if self.publish_recv.len() >= max as usize {
2805 self.handle_v5_0_error(
2806 MqttError::ReceiveMaximumExceeded,
2807 events,
2808 );
2809 return false;
2810 }
2811 }
2812 true
2813 };
2814
2815 match packet.qos() {
2816 Qos::AtLeastOnce => {
2817 let packet_id = packet.packet_id().unwrap();
2818 if !check_receive_maximum(&mut events) {
2819 return events;
2820 }
2821 self.publish_recv.insert(packet_id);
2822 if self.auto_pub_response
2823 && self.status == ConnectionStatus::Connected
2824 {
2825 puback_send = true;
2826 }
2827 }
2828 Qos::ExactlyOnce => {
2829 let packet_id = packet.packet_id().unwrap();
2830 if !check_receive_maximum(&mut events) {
2831 return events;
2832 }
2833 self.publish_recv.insert(packet_id);
2834
2835 if !self.qos2_publish_handled.insert(packet_id) {
2836 already_handled = true;
2837 }
2838 if self.status == ConnectionStatus::Connected
2839 && (self.auto_pub_response || already_handled)
2840 {
2841 pubrec_send = true;
2842 }
2843 }
2844 Qos::AtMostOnce => {
2845 }
2847 }
2848
2849 if packet.topic_name().is_empty() {
2851 if let Some(ta) = Self::get_topic_alias_from_props(packet.props()) {
2853 if ta == 0
2854 || self.topic_alias_recv.is_none()
2855 || ta > self.topic_alias_recv.as_ref().unwrap().max()
2856 {
2857 self.handle_v5_0_error(
2858 MqttError::TopicAliasInvalid,
2859 &mut events,
2860 );
2861 return events;
2862 }
2863
2864 if let Some(ref topic_alias_recv) = self.topic_alias_recv {
2865 if let Some(topic_name) = topic_alias_recv.get(ta) {
2866 match packet.add_extracted_topic_name(topic_name) {
2867 Ok(extracted) => {
2868 packet = extracted;
2869 }
2870 Err(_e) => {
2871 error!("topic alias extract failed: {_e}");
2872 self.handle_v5_0_error(
2873 MqttError::TopicAliasInvalid,
2874 &mut events,
2875 );
2876 return events;
2877 }
2878 }
2879 } else {
2880 self.handle_v5_0_error(
2881 MqttError::TopicAliasInvalid,
2882 &mut events,
2883 );
2884 return events;
2885 }
2886 }
2887 } else {
2888 self.handle_v5_0_error(MqttError::TopicAliasInvalid, &mut events);
2889 return events;
2890 }
2891 } else {
2892 if let Some(ta) = Self::get_topic_alias_from_props(packet.props()) {
2894 if ta == 0
2895 || self.topic_alias_recv.is_none()
2896 || ta > self.topic_alias_recv.as_ref().unwrap().max()
2897 {
2898 self.handle_v5_0_error(
2899 MqttError::TopicAliasInvalid,
2900 &mut events,
2901 );
2902 return events;
2903 }
2904 if let Some(ref mut topic_alias_recv) = self.topic_alias_recv {
2905 topic_alias_recv.insert_or_update(packet.topic_name(), ta);
2906 }
2907 }
2908 }
2909
2910 if puback_send {
2912 let puback = v5_0::GenericPuback::builder()
2913 .packet_id(packet.packet_id().unwrap())
2914 .build()
2915 .unwrap();
2916 events.extend(self.process_send_v5_0_puback(puback));
2917 }
2918 if pubrec_send {
2919 let pubrec = v5_0::GenericPubrec::builder()
2920 .packet_id(packet.packet_id().unwrap())
2921 .build()
2922 .unwrap();
2923 events.extend(self.process_send_v5_0_pubrec(pubrec));
2924 }
2925
2926 events.extend(self.refresh_pingreq_recv());
2928
2929 if !already_handled {
2931 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2932 }
2933 }
2934 Err(e) => {
2935 if self.status == ConnectionStatus::Connected {
2936 self.handle_v5_0_error(e, &mut events);
2937 } else {
2938 events.push(GenericEvent::NotifyError(e));
2939 }
2940 }
2941 }
2942 }
2943 PacketData::Normal(_) => {
2944 unreachable!("PUBLISH packet must use PacketData::Publish variant");
2945 }
2946 }
2947
2948 events
2949 }
2950
2951 fn process_recv_v3_1_1_puback(
2952 &mut self,
2953 raw_packet: RawPacket,
2954 ) -> Vec<GenericEvent<PacketIdType>> {
2955 let mut events = Vec::new();
2956
2957 match v3_1_1::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2958 Ok((packet, _)) => {
2959 let packet_id = packet.packet_id();
2960 if self.pid_puback.remove(&packet_id) {
2961 self.store.erase(ResponsePacket::V3_1_1Puback, packet_id);
2962 if self.pid_man.is_used_id(packet_id) {
2963 self.pid_man.release_id(packet_id);
2964 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2965 }
2966 events.extend(self.refresh_pingreq_recv());
2967 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2968 } else {
2969 Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
2970 }
2971 }
2972 Err(e) => {
2973 Self::handle_v3_1_1_error(e, &mut events);
2974 }
2975 }
2976
2977 events
2978 }
2979
2980 fn process_recv_v5_0_puback(
2981 &mut self,
2982 raw_packet: RawPacket,
2983 ) -> Vec<GenericEvent<PacketIdType>> {
2984 let mut events = Vec::new();
2985
2986 match v5_0::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2987 Ok((packet, _)) => {
2988 let packet_id = packet.packet_id();
2989 if self.pid_puback.remove(&packet_id) {
2990 self.store.erase(ResponsePacket::V5_0Puback, packet_id);
2991 if self.pid_man.is_used_id(packet_id) {
2992 self.pid_man.release_id(packet_id);
2993 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2994 }
2995 if self.publish_send_max.is_some() {
2996 self.publish_send_count -= 1;
2997 }
2998 events.extend(self.refresh_pingreq_recv());
2999 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3000 } else {
3001 self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
3002 }
3003 }
3004 Err(e) => {
3005 self.handle_v5_0_error(e, &mut events);
3006 }
3007 }
3008
3009 events
3010 }
3011
3012 fn process_recv_v3_1_1_pubrec(
3013 &mut self,
3014 raw_packet: RawPacket,
3015 ) -> Vec<GenericEvent<PacketIdType>> {
3016 let mut events = Vec::new();
3017
3018 match v3_1_1::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3019 Ok((packet, _)) => {
3020 let packet_id = packet.packet_id();
3021 if self.pid_pubrec.remove(&packet_id) {
3022 self.store.erase(ResponsePacket::V3_1_1Pubrec, packet_id);
3023 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3024 let pubrel = v3_1_1::GenericPubrel::<PacketIdType>::builder()
3025 .packet_id(packet_id)
3026 .build()
3027 .unwrap();
3028 events.extend(self.process_send_v3_1_1_pubrel(pubrel));
3029 }
3030 events.extend(self.refresh_pingreq_recv());
3031 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3032 } else {
3033 Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
3034 }
3035 }
3036 Err(e) => {
3037 Self::handle_v3_1_1_error(e, &mut events);
3038 }
3039 }
3040
3041 events
3042 }
3043
3044 fn process_recv_v5_0_pubrec(
3045 &mut self,
3046 raw_packet: RawPacket,
3047 ) -> Vec<GenericEvent<PacketIdType>> {
3048 let mut events = Vec::new();
3049
3050 match v5_0::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3051 Ok((packet, _)) => {
3052 let packet_id = packet.packet_id();
3053 if self.pid_pubrec.remove(&packet_id) {
3054 self.store.erase(ResponsePacket::V5_0Pubrec, packet_id);
3055 let reason_code = packet.reason_code();
3056 if reason_code.is_none() || reason_code.unwrap() == PubrecReasonCode::Success {
3057 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3058 let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3059 .packet_id(packet_id)
3060 .build()
3061 .unwrap();
3062 events.extend(self.process_send_v5_0_pubrel(pubrel));
3063 }
3064 } else {
3065 if self.pid_man.is_used_id(packet_id) {
3066 self.pid_man.release_id(packet_id);
3067 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3068 }
3069 self.qos2_publish_processing.remove(&packet_id);
3070 if self.publish_send_max.is_some() {
3071 self.publish_send_count -= 1;
3072 }
3073 }
3074 events.extend(self.refresh_pingreq_recv());
3075 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3076 } else {
3077 self.handle_v5_0_error(
3078 MqttError::from(DisconnectReasonCode::ProtocolError),
3079 &mut events,
3080 );
3081 }
3082 }
3083 Err(e) => {
3084 self.handle_v5_0_error(e, &mut events);
3085 }
3086 }
3087
3088 events
3089 }
3090
3091 fn process_recv_v3_1_1_pubrel(
3092 &mut self,
3093 raw_packet: RawPacket,
3094 ) -> Vec<GenericEvent<PacketIdType>> {
3095 let mut events = Vec::new();
3096
3097 match v3_1_1::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3098 Ok((packet, _)) => {
3099 let packet_id = packet.packet_id();
3100 self.qos2_publish_handled.remove(&packet_id);
3101 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3102 let pubcomp = v3_1_1::GenericPubcomp::<PacketIdType>::builder()
3103 .packet_id(packet_id)
3104 .build()
3105 .unwrap();
3106 events.extend(self.process_send_v3_1_1_pubcomp(pubcomp));
3107 }
3108 events.extend(self.refresh_pingreq_recv());
3109 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3110 }
3111 Err(e) => {
3112 Self::handle_v3_1_1_error(e, &mut events);
3113 }
3114 }
3115
3116 events
3117 }
3118
3119 fn process_recv_v5_0_pubrel(
3120 &mut self,
3121 raw_packet: RawPacket,
3122 ) -> Vec<GenericEvent<PacketIdType>> {
3123 let mut events = Vec::new();
3124
3125 match v5_0::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3126 Ok((packet, _)) => {
3127 let packet_id = packet.packet_id();
3128 self.qos2_publish_handled.remove(&packet_id);
3129 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3130 let pubcomp = v5_0::GenericPubcomp::<PacketIdType>::builder()
3131 .packet_id(packet_id)
3132 .build()
3133 .unwrap();
3134 events.extend(self.process_send_v5_0_pubcomp(pubcomp));
3135 }
3136 events.extend(self.refresh_pingreq_recv());
3137 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3138 }
3139 Err(e) => {
3140 self.handle_v5_0_error(e, &mut events);
3141 }
3142 }
3143
3144 events
3145 }
3146
3147 fn process_recv_v3_1_1_pubcomp(
3148 &mut self,
3149 raw_packet: RawPacket,
3150 ) -> Vec<GenericEvent<PacketIdType>> {
3151 let mut events = Vec::new();
3152
3153 match v3_1_1::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3154 Ok((packet, _)) => {
3155 let packet_id = packet.packet_id();
3156 if self.pid_pubcomp.remove(&packet_id) {
3157 self.store.erase(ResponsePacket::V3_1_1Pubcomp, packet_id);
3158 if self.pid_man.is_used_id(packet_id) {
3159 self.pid_man.release_id(packet_id);
3160 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3161 }
3162 self.qos2_publish_processing.remove(&packet_id);
3163 events.extend(self.refresh_pingreq_recv());
3164 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3165 } else {
3166 Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
3167 }
3168 }
3169 Err(e) => {
3170 Self::handle_v3_1_1_error(e, &mut events);
3171 }
3172 }
3173
3174 events
3175 }
3176
3177 fn process_recv_v5_0_pubcomp(
3178 &mut self,
3179 raw_packet: RawPacket,
3180 ) -> Vec<GenericEvent<PacketIdType>> {
3181 let mut events = Vec::new();
3182
3183 match v5_0::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3184 Ok((packet, _)) => {
3185 let packet_id = packet.packet_id();
3186 if self.pid_pubcomp.remove(&packet_id) {
3187 self.store.erase(ResponsePacket::V5_0Pubcomp, packet_id);
3188 if self.pid_man.is_used_id(packet_id) {
3189 self.pid_man.release_id(packet_id);
3190 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3191 }
3192 self.qos2_publish_processing.remove(&packet_id);
3193 if self.publish_send_max.is_some() {
3194 self.publish_send_count -= 1;
3195 }
3196 events.extend(self.refresh_pingreq_recv());
3197 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3198 } else {
3199 self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
3200 }
3201 }
3202 Err(e) => {
3203 self.handle_v5_0_error(e, &mut events);
3204 }
3205 }
3206
3207 events
3208 }
3209
3210 fn process_recv_v3_1_1_subscribe(
3211 &mut self,
3212 raw_packet: RawPacket,
3213 ) -> Vec<GenericEvent<PacketIdType>> {
3214 let mut events = Vec::new();
3215
3216 match v3_1_1::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3217 Ok((packet, _)) => {
3218 events.extend(self.refresh_pingreq_recv());
3219 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3220 }
3221 Err(e) => {
3222 Self::handle_v3_1_1_error(e, &mut events);
3223 }
3224 }
3225
3226 events
3227 }
3228
3229 fn process_recv_v5_0_subscribe(
3230 &mut self,
3231 raw_packet: RawPacket,
3232 ) -> Vec<GenericEvent<PacketIdType>> {
3233 let mut events = Vec::new();
3234
3235 match v5_0::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3236 Ok((packet, _)) => {
3237 events.extend(self.refresh_pingreq_recv());
3238 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3239 }
3240 Err(e) => {
3241 self.handle_v5_0_error(e, &mut events);
3242 }
3243 }
3244
3245 events
3246 }
3247
3248 fn process_recv_v3_1_1_suback(
3249 &mut self,
3250 raw_packet: RawPacket,
3251 ) -> Vec<GenericEvent<PacketIdType>> {
3252 let mut events = Vec::new();
3253
3254 match v3_1_1::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3255 Ok((packet, _)) => {
3256 let packet_id = packet.packet_id();
3257 if self.pid_suback.remove(&packet_id) {
3258 if self.pid_man.is_used_id(packet_id) {
3259 self.pid_man.release_id(packet_id);
3260 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3261 }
3262 events.extend(self.refresh_pingreq_recv());
3263 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3264 } else {
3265 Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
3266 }
3267 }
3268 Err(e) => {
3269 Self::handle_v3_1_1_error(e, &mut events);
3270 }
3271 }
3272
3273 events
3274 }
3275
3276 fn process_recv_v5_0_suback(
3277 &mut self,
3278 raw_packet: RawPacket,
3279 ) -> Vec<GenericEvent<PacketIdType>> {
3280 let mut events = Vec::new();
3281
3282 match v5_0::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3283 Ok((packet, _)) => {
3284 let packet_id = packet.packet_id();
3285 if self.pid_suback.remove(&packet_id) {
3286 if self.pid_man.is_used_id(packet_id) {
3287 self.pid_man.release_id(packet_id);
3288 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3289 }
3290 events.extend(self.refresh_pingreq_recv());
3291 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3292 } else {
3293 self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
3294 }
3295 }
3296 Err(e) => {
3297 self.handle_v5_0_error(e, &mut events);
3298 }
3299 }
3300
3301 events
3302 }
3303
3304 fn process_recv_v3_1_1_unsubscribe(
3305 &mut self,
3306 raw_packet: RawPacket,
3307 ) -> Vec<GenericEvent<PacketIdType>> {
3308 let mut events = Vec::new();
3309
3310 match v3_1_1::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3311 Ok((packet, _)) => {
3312 events.extend(self.refresh_pingreq_recv());
3313 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3314 }
3315 Err(e) => {
3316 Self::handle_v3_1_1_error(e, &mut events);
3317 }
3318 }
3319
3320 events
3321 }
3322
3323 fn process_recv_v5_0_unsubscribe(
3324 &mut self,
3325 raw_packet: RawPacket,
3326 ) -> Vec<GenericEvent<PacketIdType>> {
3327 let mut events = Vec::new();
3328
3329 match v5_0::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3330 Ok((packet, _)) => {
3331 events.extend(self.refresh_pingreq_recv());
3332 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3333 }
3334 Err(e) => {
3335 self.handle_v5_0_error(e, &mut events);
3336 }
3337 }
3338
3339 events
3340 }
3341
3342 fn process_recv_v3_1_1_unsuback(
3343 &mut self,
3344 raw_packet: RawPacket,
3345 ) -> Vec<GenericEvent<PacketIdType>> {
3346 let mut events = Vec::new();
3347
3348 match v3_1_1::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3349 Ok((packet, _)) => {
3350 let packet_id = packet.packet_id();
3351 if self.pid_unsuback.remove(&packet_id) {
3352 if self.pid_man.is_used_id(packet_id) {
3353 self.pid_man.release_id(packet_id);
3354 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3355 }
3356 events.extend(self.refresh_pingreq_recv());
3357 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3358 } else {
3359 Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
3360 }
3361 }
3362 Err(e) => {
3363 Self::handle_v3_1_1_error(e, &mut events);
3364 }
3365 }
3366
3367 events
3368 }
3369
3370 fn process_recv_v5_0_unsuback(
3371 &mut self,
3372 raw_packet: RawPacket,
3373 ) -> Vec<GenericEvent<PacketIdType>> {
3374 let mut events = Vec::new();
3375
3376 match v5_0::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3377 Ok((packet, _)) => {
3378 let packet_id = packet.packet_id();
3379 if self.pid_unsuback.remove(&packet_id) {
3380 if self.pid_man.is_used_id(packet_id) {
3381 self.pid_man.release_id(packet_id);
3382 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3383 }
3384 events.extend(self.refresh_pingreq_recv());
3385 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3386 } else {
3387 self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
3388 }
3389 }
3390 Err(e) => {
3391 self.handle_v5_0_error(e, &mut events);
3392 }
3393 }
3394
3395 events
3396 }
3397
3398 fn process_recv_v3_1_1_pingreq(
3399 &mut self,
3400 raw_packet: RawPacket,
3401 ) -> Vec<GenericEvent<PacketIdType>> {
3402 let mut events = Vec::new();
3403
3404 match v3_1_1::Pingreq::parse(raw_packet.data_as_slice()) {
3405 Ok((packet, _)) => {
3406 if (Role::IS_SERVER || Role::IS_ANY)
3407 && !self.is_client
3408 && self.auto_ping_response
3409 && self.status == ConnectionStatus::Connected
3410 {
3411 let pingresp = v3_1_1::Pingresp::new();
3412 events.extend(self.process_send_v3_1_1_pingresp(pingresp));
3413 }
3414 events.extend(self.refresh_pingreq_recv());
3415 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3416 }
3417 Err(e) => {
3418 Self::handle_v3_1_1_error(e, &mut events);
3419 }
3420 }
3421
3422 events
3423 }
3424
3425 fn process_recv_v5_0_pingreq(
3426 &mut self,
3427 raw_packet: RawPacket,
3428 ) -> Vec<GenericEvent<PacketIdType>> {
3429 let mut events = Vec::new();
3430
3431 match v5_0::Pingreq::parse(raw_packet.data_as_slice()) {
3432 Ok((packet, _)) => {
3433 if (Role::IS_SERVER || Role::IS_ANY)
3434 && !self.is_client
3435 && self.auto_ping_response
3436 && self.status == ConnectionStatus::Connected
3437 {
3438 let pingresp = v5_0::Pingresp::new();
3439 events.extend(self.process_send_v5_0_pingresp(pingresp));
3440 }
3441 events.extend(self.refresh_pingreq_recv());
3442 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3443 }
3444 Err(e) => {
3445 self.handle_v5_0_error(e, &mut events);
3446 }
3447 }
3448
3449 events
3450 }
3451
3452 fn process_recv_v3_1_1_pingresp(
3453 &mut self,
3454 raw_packet: RawPacket,
3455 ) -> Vec<GenericEvent<PacketIdType>> {
3456 let mut events = Vec::new();
3457
3458 match v3_1_1::Pingresp::parse(raw_packet.data_as_slice()) {
3459 Ok((packet, _)) => {
3460 if self.pingresp_recv_set {
3461 self.pingresp_recv_set = false;
3462 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3463 }
3464 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3465 }
3466 Err(e) => {
3467 Self::handle_v3_1_1_error(e, &mut events);
3468 }
3469 }
3470
3471 events
3472 }
3473
3474 fn process_recv_v5_0_pingresp(
3475 &mut self,
3476 raw_packet: RawPacket,
3477 ) -> Vec<GenericEvent<PacketIdType>> {
3478 let mut events = Vec::new();
3479
3480 match v5_0::Pingresp::parse(raw_packet.data_as_slice()) {
3481 Ok((packet, _)) => {
3482 if self.pingresp_recv_set {
3483 self.pingresp_recv_set = false;
3484 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3485 }
3486 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3487 }
3488 Err(e) => {
3489 self.handle_v5_0_error(e, &mut events);
3490 }
3491 }
3492
3493 events
3494 }
3495
3496 fn process_recv_v3_1_1_disconnect(
3497 &mut self,
3498 raw_packet: RawPacket,
3499 ) -> Vec<GenericEvent<PacketIdType>> {
3500 let mut events = Vec::new();
3501
3502 match v3_1_1::Disconnect::parse(raw_packet.data_as_slice()) {
3503 Ok((packet, _)) => {
3504 self.cancel_timers(&mut events);
3505 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3506 }
3507 Err(e) => {
3508 Self::handle_v3_1_1_error(e, &mut events);
3509 }
3510 }
3511
3512 events
3513 }
3514
3515 fn process_recv_v5_0_disconnect(
3516 &mut self,
3517 raw_packet: RawPacket,
3518 ) -> Vec<GenericEvent<PacketIdType>> {
3519 let mut events = Vec::new();
3520
3521 match v5_0::Disconnect::parse(raw_packet.data_as_slice()) {
3522 Ok((packet, _)) => {
3523 self.cancel_timers(&mut events);
3524 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3525 }
3526 Err(e) => {
3527 self.handle_v5_0_error(e, &mut events);
3528 }
3529 }
3530
3531 events
3532 }
3533
3534 fn process_recv_v5_0_auth(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
3535 let mut events = Vec::new();
3536
3537 match v5_0::Auth::parse(raw_packet.data_as_slice()) {
3538 Ok((packet, _)) => {
3539 events.extend(self.refresh_pingreq_recv());
3540 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3541 }
3542 Err(e) => {
3543 self.handle_v5_0_error(e, &mut events);
3544 }
3545 }
3546
3547 events
3548 }
3549
3550 fn handle_v3_1_1_error(e: MqttError, events: &mut Vec<GenericEvent<PacketIdType>>) {
3551 events.push(GenericEvent::RequestClose);
3552 events.push(GenericEvent::NotifyError(e));
3553 }
3554
3555 fn handle_v5_0_error(&mut self, e: MqttError, events: &mut Vec<GenericEvent<PacketIdType>>) {
3556 let disconnect = v5_0::Disconnect::builder()
3557 .reason_code(e.into())
3558 .build()
3559 .unwrap();
3560 events.extend(self.process_send_v5_0_disconnect(disconnect));
3561 events.push(GenericEvent::NotifyError(e));
3562 }
3563
3564 fn refresh_pingreq_recv(&mut self) -> Vec<GenericEvent<PacketIdType>> {
3565 let mut events = Vec::new();
3566 if self.pingreq_recv_timeout_ms != 0 {
3567 self.pingreq_recv_set = true;
3568 events.push(GenericEvent::RequestTimerReset {
3569 kind: TimerKind::PingreqRecv,
3570 duration_ms: self.pingreq_recv_timeout_ms,
3571 });
3572 }
3573
3574 events
3575 }
3576
3577 fn cancel_timers(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
3579 if self.pingreq_send_set {
3580 self.pingreq_send_set = false;
3581 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
3582 }
3583 if self.pingreq_recv_set {
3584 self.pingreq_recv_set = false;
3585 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3586 }
3587 if self.pingresp_recv_set {
3588 self.pingresp_recv_set = false;
3589 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3590 }
3591 }
3592
3593 fn get_topic_alias_from_props(props: &[Property]) -> Option<u16> {
3595 for prop in props {
3596 if let Property::TopicAlias(ta) = prop {
3597 return Some(ta.val());
3598 }
3599 }
3600 None
3601 }
3602}
3603
3604#[cfg(test)]
3607mod tests {
3608 use super::*;
3609 use crate::mqtt::connection::version::Version;
3610 use crate::mqtt::packet::TopicAliasSend;
3611 use crate::mqtt::role;
3612
3613 #[test]
3614 fn test_initialize_client_mode() {
3615 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3616
3617 connection.initialize(true);
3619
3620 assert!(connection.is_client);
3622 assert_eq!(connection.publish_send_count, 0);
3623 assert!(connection.publish_send_max.is_none());
3624 assert!(connection.publish_recv_max.is_none());
3625 assert!(!connection.need_store);
3626 }
3627
3628 #[test]
3629 fn test_initialize_server_mode() {
3630 let mut connection = GenericConnection::<role::Server, u32>::new(Version::V3_1_1);
3631
3632 connection.initialize(false);
3634
3635 assert!(!connection.is_client);
3637 assert_eq!(connection.publish_send_count, 0);
3638 assert!(connection.publish_send_max.is_none());
3639 assert!(connection.publish_recv_max.is_none());
3640 assert!(!connection.need_store);
3641 }
3642
3643 #[test]
3644 fn test_validate_topic_alias_no_topic_alias_send() {
3645 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3646
3647 let result = connection.validate_topic_alias(Some(1));
3649 assert!(result.is_none());
3650 }
3651
3652 #[test]
3653 fn test_validate_topic_alias_none_input() {
3654 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3655
3656 let result = connection.validate_topic_alias(None);
3658 assert!(result.is_none());
3659 }
3660
3661 #[test]
3662 fn test_validate_topic_alias_range_no_topic_alias_send() {
3663 let connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3664
3665 let result = connection.validate_topic_alias_range(1);
3667 assert!(!result);
3668 }
3669
3670 #[test]
3671 fn test_validate_topic_alias_range_zero() {
3672 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3673
3674 let topic_alias_send = TopicAliasSend::new(10);
3676 connection.topic_alias_send = Some(topic_alias_send);
3677
3678 let result = connection.validate_topic_alias_range(0);
3680 assert!(!result);
3681 }
3682
3683 #[test]
3684 fn test_validate_topic_alias_range_over_max() {
3685 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3686
3687 let topic_alias_send = TopicAliasSend::new(5);
3689 connection.topic_alias_send = Some(topic_alias_send);
3690
3691 let result = connection.validate_topic_alias_range(6);
3693 assert!(!result);
3694 }
3695
3696 #[test]
3697 fn test_validate_topic_alias_range_valid() {
3698 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3699
3700 let topic_alias_send = TopicAliasSend::new(10);
3702 connection.topic_alias_send = Some(topic_alias_send);
3703
3704 assert!(connection.validate_topic_alias_range(1));
3706 assert!(connection.validate_topic_alias_range(5));
3707 assert!(connection.validate_topic_alias_range(10));
3708 }
3709
3710 #[test]
3711 fn test_validate_topic_alias_with_registered_alias() {
3712 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3713
3714 let mut topic_alias_send = TopicAliasSend::new(10);
3716 topic_alias_send.insert_or_update("test/topic", 5);
3717 connection.topic_alias_send = Some(topic_alias_send);
3718
3719 let result = connection.validate_topic_alias(Some(5));
3721 assert_eq!(result, Some("test/topic".to_string()));
3722 }
3723
3724 #[test]
3725 fn test_validate_topic_alias_unregistered_alias() {
3726 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3727
3728 let topic_alias_send = TopicAliasSend::new(10);
3730 connection.topic_alias_send = Some(topic_alias_send);
3731
3732 let result = connection.validate_topic_alias(Some(5));
3734 assert!(result.is_none());
3735 }
3736
3737 #[test]
3738 fn test_validate_maximum_packet_size_within_limit() {
3739 let connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3740
3741 let result = connection.validate_maximum_packet_size_send(1000);
3743 assert!(result);
3744 }
3745
3746 #[test]
3747 fn test_validate_maximum_packet_size_at_limit() {
3748 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3749
3750 connection.maximum_packet_size_send = 1000;
3752
3753 let result = connection.validate_maximum_packet_size_send(1000);
3755 assert!(result);
3756 }
3757
3758 #[test]
3759 fn test_validate_maximum_packet_size_over_limit() {
3760 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3761
3762 connection.maximum_packet_size_send = 1000;
3764
3765 let result = connection.validate_maximum_packet_size_send(1001);
3767 assert!(!result);
3768 }
3769
3770 #[test]
3771 fn test_validate_maximum_packet_size_zero_limit() {
3772 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3773
3774 connection.maximum_packet_size_send = 0;
3776
3777 let result = connection.validate_maximum_packet_size_send(1);
3779 assert!(!result);
3780
3781 let result = connection.validate_maximum_packet_size_send(0);
3783 assert!(result);
3784 }
3785
3786 #[test]
3787 fn test_initialize_clears_state() {
3788 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3789
3790 connection.publish_send_count = 5;
3792 connection.need_store = true;
3793 connection.pid_suback.insert(123);
3794 connection.pid_unsuback.insert(456);
3795
3796 connection.initialize(true);
3798
3799 assert_eq!(connection.publish_send_count, 0);
3801 assert!(!connection.need_store);
3802 assert!(connection.pid_suback.is_empty());
3803 assert!(connection.pid_unsuback.is_empty());
3804 assert!(connection.is_client);
3805 }
3806
3807 #[test]
3808 fn test_remaining_length_to_total_size() {
3809 assert_eq!(remaining_length_to_total_size(0), 2); assert_eq!(remaining_length_to_total_size(127), 129); assert_eq!(remaining_length_to_total_size(128), 131); assert_eq!(remaining_length_to_total_size(16383), 16386); assert_eq!(remaining_length_to_total_size(16384), 16388); assert_eq!(remaining_length_to_total_size(2097151), 2097155); assert_eq!(remaining_length_to_total_size(2097152), 2097157); assert_eq!(remaining_length_to_total_size(268435455), 268435460); }
3825}