1use alloc::{
23 string::{String, ToString},
24 vec::Vec,
25};
26use core::marker::PhantomData;
27
28use crate::mqtt::common::tracing::{error, info, trace, warn};
29use crate::mqtt::common::Cursor;
30use crate::mqtt::common::HashSet;
31use crate::mqtt::connection::event::{GenericEvent, TimerKind};
32use crate::mqtt::connection::GenericStore;
33use crate::mqtt::result_code;
34
35use serde::Serialize;
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
38enum ConnectionStatus {
39 #[serde(rename = "disconnected")]
40 Disconnected,
41 #[serde(rename = "connecting")]
42 Connecting,
43 #[serde(rename = "connected")]
44 Connected,
45}
46use crate::mqtt::connection::packet_builder::{
47 PacketBuildResult, PacketBuilder, PacketData, RawPacket,
48};
49use crate::mqtt::connection::packet_id_manager::PacketIdManager;
50use crate::mqtt::connection::role;
51use crate::mqtt::connection::role::RoleType;
52use crate::mqtt::connection::sendable::Sendable;
53use crate::mqtt::connection::version::*;
54use crate::mqtt::packet::v3_1_1;
55use crate::mqtt::packet::v5_0;
56use crate::mqtt::packet::GenericPacket;
57use crate::mqtt::packet::GenericStorePacket;
58use crate::mqtt::packet::IsPacketId;
59use crate::mqtt::packet::Qos;
60use crate::mqtt::packet::ResponsePacket;
61use crate::mqtt::packet::{Property, TopicAliasRecv, TopicAliasSend};
62use crate::mqtt::prelude::GenericPacketTrait;
63use crate::mqtt::result_code::{
64 ConnectReasonCode, ConnectReturnCode, DisconnectReasonCode, MqttError, PubrecReasonCode,
65};
66
67const MQTT_PACKET_SIZE_NO_LIMIT: u32 = 1 + 4 + 128 * 128 * 128 * 128;
70
71fn remaining_length_to_total_size(remaining_length: u32) -> u32 {
78 let remaining_length_bytes = if remaining_length < 128 {
79 1
80 } else if remaining_length < 16384 {
81 2
82 } else if remaining_length < 2097152 {
83 3
84 } else {
85 4
86 };
87
88 1 + remaining_length_bytes + remaining_length
89}
90
91pub type Event = GenericEvent<u16>;
96
97pub struct GenericConnection<Role, PacketIdType>
140where
141 Role: RoleType,
142 PacketIdType: IsPacketId,
143{
144 _marker: PhantomData<Role>,
145
146 protocol_version: Version,
147
148 pid_man: PacketIdManager<PacketIdType>,
149 pid_suback: HashSet<PacketIdType>,
150 pid_unsuback: HashSet<PacketIdType>,
151 pid_puback: HashSet<PacketIdType>,
152 pid_pubrec: HashSet<PacketIdType>,
153 pid_pubcomp: HashSet<PacketIdType>,
154
155 need_store: bool,
156 store: GenericStore<PacketIdType>,
158
159 offline_publish: bool,
160 auto_pub_response: bool,
161 auto_ping_response: bool,
162
163 auto_map_topic_alias_send: bool,
165 auto_replace_topic_alias_send: bool,
167 topic_alias_recv: Option<TopicAliasRecv>,
169 topic_alias_send: Option<TopicAliasSend>,
171
172 publish_send_max: Option<u16>,
173 publish_recv_max: Option<u16>,
175 publish_send_count: u16,
178
179 publish_recv: HashSet<PacketIdType>,
181
182 maximum_packet_size_send: u32,
184 maximum_packet_size_recv: u32,
186
187 status: ConnectionStatus,
189
190 pingreq_user_send_interval_ms: Option<u64>,
192 pingreq_keep_alive_ms: u64,
193 pingreq_server_keep_alive_ms: Option<u64>,
194
195 pingreq_recv_timeout_ms: u64,
197 pingresp_recv_timeout_ms: u64,
199
200 qos2_publish_handled: HashSet<PacketIdType>,
202
203 pingreq_send_set: bool,
205 pingreq_recv_set: bool,
206 pingresp_recv_set: bool,
207
208 packet_builder: PacketBuilder,
209 is_client: bool,
211}
212
213pub type Connection<Role> = GenericConnection<Role, u16>;
226
227impl<Role, PacketIdType> GenericConnection<Role, PacketIdType>
228where
229 Role: RoleType,
230 PacketIdType: IsPacketId,
231{
232 pub fn new(version: Version) -> Self {
255 Self {
256 _marker: PhantomData,
257 protocol_version: version,
258 pid_man: PacketIdManager::new(),
259 pid_suback: HashSet::default(),
260 pid_unsuback: HashSet::default(),
261 pid_puback: HashSet::default(),
262 pid_pubrec: HashSet::default(),
263 pid_pubcomp: HashSet::default(),
264 need_store: false,
265 store: GenericStore::new(),
266 offline_publish: false,
267 auto_pub_response: false,
268 auto_ping_response: false,
269 auto_map_topic_alias_send: false,
270 auto_replace_topic_alias_send: false,
271 topic_alias_recv: None,
272 topic_alias_send: None,
273 publish_send_max: None,
274 publish_recv_max: None,
275 publish_send_count: 0,
276 publish_recv: HashSet::default(),
277 maximum_packet_size_send: MQTT_PACKET_SIZE_NO_LIMIT,
278 maximum_packet_size_recv: MQTT_PACKET_SIZE_NO_LIMIT,
279 status: ConnectionStatus::Disconnected,
280 pingreq_user_send_interval_ms: None,
281 pingreq_keep_alive_ms: 0,
282 pingreq_server_keep_alive_ms: None,
283 pingreq_recv_timeout_ms: 0,
284 pingresp_recv_timeout_ms: 0,
285 qos2_publish_handled: HashSet::default(),
286 pingreq_send_set: false,
287 pingreq_recv_set: false,
288 pingresp_recv_set: false,
289 packet_builder: PacketBuilder::new(),
290 is_client: false,
291 }
292 }
293
294 pub fn checked_send<T>(&mut self, packet: T) -> Vec<GenericEvent<PacketIdType>>
335 where
336 T: Sendable<Role, PacketIdType>,
337 {
338 packet.dispatch_send(self)
340 }
341
342 pub fn send(&mut self, packet: GenericPacket<PacketIdType>) -> Vec<GenericEvent<PacketIdType>> {
387 use core::any::TypeId;
388
389 let role_id = TypeId::of::<Role>();
390 let client_id = TypeId::of::<role::Client>();
391 let server_id = TypeId::of::<role::Server>();
392 let any_id = TypeId::of::<role::Any>();
393
394 let packet_version = packet.protocol_version();
396
397 if self.protocol_version != packet_version {
399 return vec![GenericEvent::NotifyError(MqttError::VersionMismatch)];
400 }
401
402 match packet {
403 GenericPacket::V3_1_1Connect(p) => {
405 if role_id == client_id || role_id == any_id {
406 self.process_send_v3_1_1_connect(p)
407 } else {
408 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
409 }
410 }
411 GenericPacket::V5_0Connect(p) => {
412 if role_id == client_id || role_id == any_id {
413 self.process_send_v5_0_connect(p)
414 } else {
415 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
416 }
417 }
418 GenericPacket::V3_1_1Connack(p) => {
420 if role_id == server_id || role_id == any_id {
421 self.process_send_v3_1_1_connack(p)
422 } else {
423 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
424 }
425 }
426 GenericPacket::V5_0Connack(p) => {
427 if role_id == server_id || role_id == any_id {
428 self.process_send_v5_0_connack(p)
429 } else {
430 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
431 }
432 }
433 GenericPacket::V3_1_1Publish(p) => self.process_send_v3_1_1_publish(p),
435 GenericPacket::V5_0Publish(p) => self.process_send_v5_0_publish(p),
436 GenericPacket::V3_1_1Puback(p) => self.process_send_v3_1_1_puback(p),
438 GenericPacket::V5_0Puback(p) => self.process_send_v5_0_puback(p),
439 GenericPacket::V3_1_1Pubrec(p) => self.process_send_v3_1_1_pubrec(p),
440 GenericPacket::V5_0Pubrec(p) => self.process_send_v5_0_pubrec(p),
441 GenericPacket::V3_1_1Pubrel(p) => self.process_send_v3_1_1_pubrel(p),
442 GenericPacket::V5_0Pubrel(p) => self.process_send_v5_0_pubrel(p),
443 GenericPacket::V3_1_1Pubcomp(p) => self.process_send_v3_1_1_pubcomp(p),
444 GenericPacket::V5_0Pubcomp(p) => self.process_send_v5_0_pubcomp(p),
445 GenericPacket::V3_1_1Subscribe(p) => {
447 if role_id == client_id || role_id == any_id {
448 self.process_send_v3_1_1_subscribe(p)
449 } else {
450 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
451 }
452 }
453 GenericPacket::V5_0Subscribe(p) => {
454 if role_id == client_id || role_id == any_id {
455 self.process_send_v5_0_subscribe(p)
456 } else {
457 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
458 }
459 }
460 GenericPacket::V3_1_1Unsubscribe(p) => {
461 if role_id == client_id || role_id == any_id {
462 self.process_send_v3_1_1_unsubscribe(p)
463 } else {
464 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
465 }
466 }
467 GenericPacket::V5_0Unsubscribe(p) => {
468 if role_id == client_id || role_id == any_id {
469 self.process_send_v5_0_unsubscribe(p)
470 } else {
471 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
472 }
473 }
474 GenericPacket::V3_1_1Suback(p) => {
476 if role_id == server_id || role_id == any_id {
477 self.process_send_v3_1_1_suback(p)
478 } else {
479 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
480 }
481 }
482 GenericPacket::V5_0Suback(p) => {
483 if role_id == server_id || role_id == any_id {
484 self.process_send_v5_0_suback(p)
485 } else {
486 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
487 }
488 }
489 GenericPacket::V3_1_1Unsuback(p) => {
490 if role_id == server_id || role_id == any_id {
491 self.process_send_v3_1_1_unsuback(p)
492 } else {
493 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
494 }
495 }
496 GenericPacket::V5_0Unsuback(p) => {
497 if role_id == server_id || role_id == any_id {
498 self.process_send_v5_0_unsuback(p)
499 } else {
500 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
501 }
502 }
503 GenericPacket::V3_1_1Pingreq(p) => {
505 if role_id == client_id || role_id == any_id {
506 self.process_send_v3_1_1_pingreq(p)
507 } else {
508 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
509 }
510 }
511 GenericPacket::V5_0Pingreq(p) => {
512 if role_id == client_id || role_id == any_id {
513 self.process_send_v5_0_pingreq(p)
514 } else {
515 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
516 }
517 }
518 GenericPacket::V3_1_1Pingresp(p) => {
520 if role_id == server_id || role_id == any_id {
521 self.process_send_v3_1_1_pingresp(p)
522 } else {
523 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
524 }
525 }
526 GenericPacket::V5_0Pingresp(p) => {
527 if role_id == server_id || role_id == any_id {
528 self.process_send_v5_0_pingresp(p)
529 } else {
530 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
531 }
532 }
533 GenericPacket::V3_1_1Disconnect(p) => {
535 if role_id == client_id || role_id == any_id {
536 self.process_send_v3_1_1_disconnect(p)
537 } else {
538 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
539 }
540 }
541 GenericPacket::V5_0Disconnect(p) => self.process_send_v5_0_disconnect(p),
543 GenericPacket::V5_0Auth(p) => self.process_send_v5_0_auth(p),
545 }
546 }
547
548 pub fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
597 let mut events = Vec::new();
598
599 match self.packet_builder.feed(data) {
600 PacketBuildResult::Complete(raw_packet) => {
601 events.extend(self.process_recv_packet(raw_packet));
602 }
603 PacketBuildResult::Incomplete => {}
604 PacketBuildResult::Error(e) => {
605 self.cancel_timers(&mut events);
606 events.push(GenericEvent::RequestClose);
607 events.push(GenericEvent::NotifyError(e));
608 }
609 }
610
611 events
612 }
613
614 pub fn notify_timer_fired(&mut self, kind: TimerKind) -> Vec<GenericEvent<PacketIdType>> {
631 let mut events = Vec::new();
632
633 match kind {
634 TimerKind::PingreqSend => {
635 self.pingreq_send_set = false;
637
638 if self.status == ConnectionStatus::Connected {
640 match self.protocol_version {
641 Version::V3_1_1 => {
642 if let Ok(pingreq) = v3_1_1::Pingreq::builder().build() {
643 events.extend(self.process_send_v3_1_1_pingreq(pingreq));
644 }
645 }
646 Version::V5_0 => {
647 if let Ok(pingreq) = v5_0::Pingreq::builder().build() {
648 events.extend(self.process_send_v5_0_pingreq(pingreq));
649 }
650 }
651 Version::Undetermined => {
652 unreachable!("Protocol version should be set before sending PINGREQ");
653 }
654 }
655 }
656 }
657 TimerKind::PingreqRecv => {
658 self.pingreq_recv_set = false;
660
661 match self.protocol_version {
662 Version::V3_1_1 => {
663 events.push(GenericEvent::RequestClose);
665 }
666 Version::V5_0 => {
667 if self.status == ConnectionStatus::Connected {
669 if let Ok(disconnect) = v5_0::Disconnect::builder()
670 .reason_code(DisconnectReasonCode::KeepAliveTimeout)
671 .build()
672 {
673 events.extend(self.process_send_v5_0_disconnect(disconnect));
674 }
675 }
676 }
677 Version::Undetermined => {
678 unreachable!("Protocol version should be set before receiving PINGREQ");
679 }
680 }
681 }
682 TimerKind::PingrespRecv => {
683 self.pingresp_recv_set = false;
685
686 match self.protocol_version {
687 Version::V3_1_1 => {
688 events.push(GenericEvent::RequestClose);
690 }
691 Version::V5_0 => {
692 if self.status == ConnectionStatus::Connected {
694 if let Ok(disconnect) = v5_0::Disconnect::builder()
695 .reason_code(DisconnectReasonCode::KeepAliveTimeout)
696 .build()
697 {
698 events.extend(self.process_send_v5_0_disconnect(disconnect));
699 }
700 }
701 }
702 Version::Undetermined => {
703 unreachable!("Protocol version should be set before receiving PINGRESP");
704 }
705 }
706 }
707 }
708
709 events
710 }
711
712 pub fn notify_closed(&mut self) -> Vec<GenericEvent<PacketIdType>> {
725 let mut events = Vec::new();
726
727 self.maximum_packet_size_send = MQTT_PACKET_SIZE_NO_LIMIT;
729 self.maximum_packet_size_recv = MQTT_PACKET_SIZE_NO_LIMIT;
730
731 self.status = ConnectionStatus::Disconnected;
733
734 self.topic_alias_send = None;
736 self.topic_alias_recv = None;
737
738 for packet_id in self.pid_suback.drain() {
740 if self.pid_man.is_used_id(packet_id) {
741 self.pid_man.release_id(packet_id);
742 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
743 }
744 }
745
746 for packet_id in self.pid_unsuback.drain() {
748 if self.pid_man.is_used_id(packet_id) {
749 self.pid_man.release_id(packet_id);
750 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
751 }
752 }
753
754 if !self.need_store {
756 self.qos2_publish_handled.clear();
757
758 for packet_id in self.pid_puback.drain() {
760 if self.pid_man.is_used_id(packet_id) {
761 self.pid_man.release_id(packet_id);
762 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
763 }
764 }
765
766 for packet_id in self.pid_pubrec.drain() {
768 if self.pid_man.is_used_id(packet_id) {
769 self.pid_man.release_id(packet_id);
770 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
771 }
772 }
773
774 for packet_id in self.pid_pubcomp.drain() {
776 if self.pid_man.is_used_id(packet_id) {
777 self.pid_man.release_id(packet_id);
778 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
779 }
780 }
781 }
782
783 self.cancel_timers(&mut events);
785
786 events
787 }
788
789 pub fn set_pingreq_send_interval(
811 &mut self,
812 duration_ms: Option<u64>,
813 ) -> Vec<GenericEvent<PacketIdType>> {
814 let mut events = Vec::new();
815 self.pingreq_user_send_interval_ms = duration_ms;
816 if let Some(ms) = duration_ms {
817 if ms == 0 {
818 if self.pingreq_send_set {
819 self.pingreq_send_set = false;
820 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
821 }
822 } else if self.status == ConnectionStatus::Connected {
823 self.pingreq_send_set = true;
824 events.push(GenericEvent::RequestTimerReset {
825 kind: TimerKind::PingreqSend,
826 duration_ms: ms,
827 });
828 }
829 }
830 events
831 }
832
833 pub fn get_receive_maximum_vacancy_for_send(&self) -> Option<u16> {
842 self.publish_send_max
844 .map(|max| max.saturating_sub(self.publish_send_count))
845 }
846
847 pub fn set_offline_publish(&mut self, enable: bool) {
856 self.offline_publish = enable;
857 if self.offline_publish {
858 self.need_store = true;
859 }
860 }
861
862 pub fn set_auto_pub_response(&mut self, enable: bool) {
871 self.auto_pub_response = enable;
872 }
873
874 pub fn set_auto_ping_response(&mut self, enable: bool) {
882 self.auto_ping_response = enable;
883 }
884
885 pub fn set_auto_map_topic_alias_send(&mut self, enable: bool) {
897 self.auto_map_topic_alias_send = enable;
898 }
899
900 pub fn set_auto_replace_topic_alias_send(&mut self, enable: bool) {
910 self.auto_replace_topic_alias_send = enable;
911 }
912
913 pub fn set_pingresp_recv_timeout(&mut self, timeout_ms: u64) {
922 self.pingresp_recv_timeout_ms = timeout_ms;
923 }
924
925 pub fn acquire_packet_id(&mut self) -> Result<PacketIdType, MqttError> {
931 self.pid_man.acquire_unique_id()
932 }
933
934 pub fn register_packet_id(&mut self, packet_id: PacketIdType) -> Result<(), MqttError> {
947 self.pid_man.register_id(packet_id)
948 }
949
950 pub fn release_packet_id(
964 &mut self,
965 packet_id: PacketIdType,
966 ) -> Vec<GenericEvent<PacketIdType>> {
967 let mut events = Vec::new();
968
969 if self.pid_man.is_used_id(packet_id) {
970 self.pid_man.release_id(packet_id);
971 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
972 }
973
974 events
975 }
976
977 pub fn get_qos2_publish_handled(&self) -> HashSet<PacketIdType> {
986 self.qos2_publish_handled.clone()
987 }
988
989 pub fn restore_qos2_publish_handled(&mut self, pids: HashSet<PacketIdType>) {
998 self.qos2_publish_handled = pids;
999 }
1000
1001 pub fn restore_packets(&mut self, packets: Vec<GenericStorePacket<PacketIdType>>) {
1010 for packet in packets {
1011 match &packet {
1012 GenericStorePacket::V3_1_1Publish(p) => {
1013 match p.qos() {
1015 Qos::AtLeastOnce => {
1016 self.pid_puback.insert(p.packet_id().unwrap());
1017 }
1018 Qos::ExactlyOnce => {
1019 self.pid_pubrec.insert(p.packet_id().unwrap());
1020 }
1021 _ => {
1022 warn!("QoS 0 packet found in store, skipping");
1024 continue;
1025 }
1026 }
1027 let packet_id = p.packet_id().unwrap();
1029 if self.pid_man.register_id(packet_id).is_ok() {
1030 if let Err(_e) = self.store.add(packet) {
1031 error!("Failed to add packet to store: {:?}", _e);
1032 }
1033 } else {
1034 error!("Packet ID {} has already been used. Skip it", packet_id);
1035 }
1036 }
1037 GenericStorePacket::V5_0Publish(p) => {
1038 match p.qos() {
1040 Qos::AtLeastOnce => {
1041 self.pid_puback.insert(p.packet_id().unwrap());
1042 }
1043 Qos::ExactlyOnce => {
1044 self.pid_pubrec.insert(p.packet_id().unwrap());
1045 }
1046 _ => {
1047 warn!("QoS 0 packet found in store, skipping");
1049 continue;
1050 }
1051 }
1052 let packet_id = p.packet_id().unwrap();
1054 if self.pid_man.register_id(packet_id).is_ok() {
1055 if let Err(_e) = self.store.add(packet) {
1056 error!("Failed to add packet to store: {:?}", _e);
1057 }
1058 } else {
1059 error!("Packet ID {} has already been used. Skip it", packet_id);
1060 }
1061 }
1062 GenericStorePacket::V3_1_1Pubrel(p) => {
1063 self.pid_pubcomp.insert(p.packet_id());
1065 let packet_id = p.packet_id();
1067 if self.pid_man.register_id(packet_id).is_ok() {
1068 if let Err(_e) = self.store.add(packet) {
1069 error!("Failed to add packet to store: {:?}", _e);
1070 }
1071 } else {
1072 error!("Packet ID {} has already been used. Skip it", packet_id);
1073 }
1074 }
1075 GenericStorePacket::V5_0Pubrel(p) => {
1076 self.pid_pubcomp.insert(p.packet_id());
1078 let packet_id = p.packet_id();
1080 if self.pid_man.register_id(packet_id).is_ok() {
1081 if let Err(_e) = self.store.add(packet) {
1082 error!("Failed to add packet to store: {:?}", _e);
1083 }
1084 } else {
1085 error!("Packet ID {} has already been used. Skip it", packet_id);
1086 }
1087 }
1088 }
1089 }
1090 }
1091
1092 pub fn get_stored_packets(&self) -> Vec<GenericStorePacket<PacketIdType>> {
1101 self.store.get_stored()
1102 }
1103
1104 pub fn get_protocol_version(&self) -> Version {
1110 self.protocol_version
1111 }
1112
1113 pub fn regulate_for_store(
1118 &self,
1119 mut packet: v5_0::GenericPublish<PacketIdType>,
1120 ) -> Result<v5_0::GenericPublish<PacketIdType>, MqttError> {
1121 if packet.topic_name().is_empty() {
1122 if let Some(topic_alias) = Self::get_topic_alias_from_props(packet.props()) {
1124 if let Some(ref topic_alias_send) = self.topic_alias_send {
1125 if let Some(topic) = topic_alias_send.peek(topic_alias) {
1126 packet = packet.remove_topic_alias_add_topic(topic.to_string())?;
1128 } else {
1129 return Err(MqttError::PacketNotRegulated);
1130 }
1131 } else {
1132 return Err(MqttError::PacketNotRegulated);
1133 }
1134 } else {
1135 return Err(MqttError::PacketNotRegulated);
1136 }
1137 } else {
1138 packet = packet.remove_topic_alias();
1140 }
1141
1142 Ok(packet)
1143 }
1144
1145 fn initialize(&mut self, is_client: bool) {
1159 self.publish_send_max = None;
1160 self.publish_recv_max = None;
1161 self.publish_send_count = 0;
1162 self.topic_alias_send = None;
1163 self.topic_alias_recv = None;
1164 self.publish_recv.clear();
1165 self.need_store = false;
1166 self.pid_suback.clear();
1167 self.pid_unsuback.clear();
1168 self.is_client = is_client;
1169 self.pingreq_keep_alive_ms = 0;
1170 self.pingreq_server_keep_alive_ms = None;
1171 }
1172
1173 fn clear_store_related(&mut self) {
1174 self.pid_man.clear();
1175 self.pid_puback.clear();
1176 self.pid_pubrec.clear();
1177 self.pid_pubcomp.clear();
1178 self.store.clear();
1179 }
1180
1181 fn send_stored(&mut self) -> Vec<GenericEvent<PacketIdType>> {
1183 let mut events = Vec::new();
1184 self.store.for_each(|packet| {
1185 if packet.size() > self.maximum_packet_size_send as usize {
1186 let packet_id = packet.packet_id();
1187 self.pid_man.release_id(packet_id);
1188 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1189 return false; }
1191 events.push(GenericEvent::RequestSendPacket {
1192 packet: packet.clone().into(),
1193 release_packet_id_if_send_error: None,
1194 });
1195 true });
1197
1198 events
1199 }
1200
1201 fn validate_topic_alias(&mut self, topic_alias_opt: Option<u16>) -> Option<String> {
1213 let topic_alias = topic_alias_opt?;
1214
1215 if !self.validate_topic_alias_range(topic_alias) {
1216 return None;
1217 }
1218
1219 let topic_alias_send = self.topic_alias_send.as_mut()?;
1220 let topic = topic_alias_send.get(topic_alias)?;
1222
1223 Some(topic.to_string())
1224 }
1225
1226 fn validate_topic_alias_range(&self, topic_alias: u16) -> bool {
1238 let topic_alias_send = match &self.topic_alias_send {
1239 Some(tas) => tas,
1240 None => {
1241 error!("topic_alias is set but topic_alias_maximum is 0");
1242 return false;
1243 }
1244 };
1245
1246 if topic_alias == 0 || topic_alias > topic_alias_send.max() {
1247 error!("topic_alias is set but out of range");
1248 return false;
1249 }
1250
1251 true
1252 }
1253
1254 pub(crate) fn process_send_v3_1_1_connect(
1256 &mut self,
1257 packet: v3_1_1::Connect,
1258 ) -> Vec<GenericEvent<PacketIdType>> {
1259 info!("send connect v3.1.1: {packet}");
1260
1261 if self.status != ConnectionStatus::Disconnected {
1262 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1263 }
1264
1265 let mut events = Vec::new();
1266 self.initialize(true);
1267 self.status = ConnectionStatus::Connecting;
1268
1269 self.pingreq_keep_alive_ms = packet.keep_alive() as u64 * 1000;
1270
1271 if packet.clean_start() {
1273 self.clear_store_related();
1274 } else {
1275 self.need_store = true;
1276 }
1277
1278 self.topic_alias_send = None;
1280
1281 events.push(GenericEvent::RequestSendPacket {
1282 packet: packet.into(),
1283 release_packet_id_if_send_error: None,
1284 });
1285 self.send_post_process(&mut events);
1286
1287 events
1288 }
1289
1290 pub(crate) fn process_send_v5_0_connect(
1292 &mut self,
1293 packet: v5_0::Connect,
1294 ) -> Vec<GenericEvent<PacketIdType>> {
1295 info!("send connect v5.0: {packet}");
1296 if !self.validate_maximum_packet_size_send(packet.size()) {
1297 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1298 }
1299 if self.status != ConnectionStatus::Disconnected {
1300 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1301 }
1302
1303 let mut events = Vec::new();
1304 self.initialize(true);
1305 self.status = ConnectionStatus::Connecting;
1306
1307 self.pingreq_keep_alive_ms = packet.keep_alive() as u64 * 1000;
1308
1309 if packet.clean_start() {
1311 self.clear_store_related();
1312 }
1313
1314 for prop in packet.props() {
1316 match prop {
1317 Property::TopicAliasMaximum(val) => {
1318 if val.val() != 0 {
1319 self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1320 }
1321 }
1322 Property::ReceiveMaximum(val) => {
1323 debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1324 self.publish_recv_max = Some(val.val());
1325 }
1326 Property::MaximumPacketSize(val) => {
1327 debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1328 self.maximum_packet_size_recv = val.val();
1329 }
1330 Property::SessionExpiryInterval(val) => {
1331 if val.val() != 0 {
1332 self.need_store = true;
1333 }
1334 }
1335 _ => {
1336 }
1338 }
1339 }
1340
1341 events.push(GenericEvent::RequestSendPacket {
1342 packet: packet.into(),
1343 release_packet_id_if_send_error: None,
1344 });
1345 self.send_post_process(&mut events);
1346
1347 events
1348 }
1349
1350 pub(crate) fn process_send_v3_1_1_connack(
1351 &mut self,
1352 packet: v3_1_1::Connack,
1353 ) -> Vec<GenericEvent<PacketIdType>> {
1354 info!("send connack v3.1.1: {packet}");
1355 if self.status != ConnectionStatus::Connecting {
1356 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1357 }
1358 let mut events = Vec::new();
1359 let rc = packet.return_code();
1360 events.push(GenericEvent::RequestSendPacket {
1361 packet: packet.into(),
1362 release_packet_id_if_send_error: None,
1363 });
1364 if rc != ConnectReturnCode::Accepted {
1365 self.status = ConnectionStatus::Disconnected;
1366 self.cancel_timers(&mut events);
1367 events.push(GenericEvent::RequestClose);
1368 return events;
1369 }
1370
1371 self.status = ConnectionStatus::Connected;
1372 events.extend(self.send_stored());
1373 self.send_post_process(&mut events);
1374
1375 events
1376 }
1377
1378 pub(crate) fn process_send_v5_0_connack(
1379 &mut self,
1380 packet: v5_0::Connack,
1381 ) -> Vec<GenericEvent<PacketIdType>> {
1382 info!("send connack v5.0: {packet}");
1383 if !self.validate_maximum_packet_size_send(packet.size()) {
1384 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1385 }
1386 if self.status != ConnectionStatus::Connecting {
1387 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1388 }
1389
1390 let mut events = Vec::new();
1391 let rc = packet.reason_code();
1392 if rc == ConnectReasonCode::Success {
1393 for prop in packet.props() {
1395 match prop {
1396 Property::TopicAliasMaximum(val) => {
1397 if val.val() != 0 {
1398 self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1399 }
1400 }
1401 Property::ReceiveMaximum(val) => {
1402 debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1403 self.publish_recv_max = Some(val.val());
1404 }
1405 Property::MaximumPacketSize(val) => {
1406 debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1407 self.maximum_packet_size_recv = val.val();
1408 }
1409 Property::ServerKeepAlive(val) => {
1410 let val = val.val();
1411 if val == 0 {
1412 if self.pingreq_recv_set {
1413 self.pingreq_recv_set = false;
1414 events
1415 .push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
1416 }
1417 self.pingreq_recv_timeout_ms = 0;
1418 } else {
1419 self.pingreq_recv_timeout_ms = val as u64 * 1000 * 3 / 2;
1420 self.pingreq_recv_set = true;
1421 events.push(GenericEvent::RequestTimerReset {
1422 kind: TimerKind::PingreqRecv,
1423 duration_ms: self.pingreq_recv_timeout_ms,
1424 });
1425 }
1426 }
1427 _ => {
1428 }
1430 }
1431 }
1432 }
1433 events.push(GenericEvent::RequestSendPacket {
1434 packet: packet.into(),
1435 release_packet_id_if_send_error: None,
1436 });
1437
1438 if rc != ConnectReasonCode::Success {
1439 self.status = ConnectionStatus::Disconnected;
1440 self.cancel_timers(&mut events);
1441 events.push(GenericEvent::RequestClose);
1442 return events;
1443 }
1444
1445 self.status = ConnectionStatus::Connected;
1446
1447 events.extend(self.send_stored());
1448 self.send_post_process(&mut events);
1449
1450 events
1451 }
1452
1453 pub(crate) fn process_send_v3_1_1_publish(
1454 &mut self,
1455 packet: v3_1_1::GenericPublish<PacketIdType>,
1456 ) -> Vec<GenericEvent<PacketIdType>> {
1457 let mut events = Vec::new();
1458 let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1459
1460 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1461 let packet_id = packet.packet_id().unwrap();
1463 if self.status != ConnectionStatus::Connected
1464 && !self.need_store
1465 && !self.offline_publish
1466 {
1467 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1468 if self.pid_man.is_used_id(packet_id) {
1469 self.pid_man.release_id(packet_id);
1470 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1471 }
1472 return events;
1473 }
1474 if !self.pid_man.is_used_id(packet_id) {
1475 error!("packet_id {packet_id} must be acquired or registered");
1476 events.push(GenericEvent::NotifyError(
1477 MqttError::PacketIdentifierInvalid,
1478 ));
1479 return events;
1480 }
1481 if self.need_store
1482 && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1483 {
1484 let store_packet = packet.clone().set_dup(true);
1485 self.store.add(store_packet.try_into().unwrap()).unwrap();
1486 } else {
1487 release_packet_id_if_send_error = Some(packet_id);
1488 }
1489 if packet.qos() == Qos::ExactlyOnce {
1490 self.pid_pubrec.insert(packet_id);
1491 } else {
1492 self.pid_puback.insert(packet_id);
1493 }
1494 } else if self.status != ConnectionStatus::Connected {
1495 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1496 return events;
1497 }
1498
1499 if self.status == ConnectionStatus::Connected {
1500 events.push(GenericEvent::RequestSendPacket {
1501 packet: packet.into(),
1502 release_packet_id_if_send_error,
1503 });
1504 self.send_post_process(&mut events);
1505 }
1506
1507 events
1508 }
1509
1510 pub(crate) fn process_send_v5_0_publish(
1511 &mut self,
1512 mut packet: v5_0::GenericPublish<PacketIdType>,
1513 ) -> Vec<GenericEvent<PacketIdType>> {
1514 if !self.validate_maximum_packet_size_send(packet.size()) {
1515 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1516 }
1517
1518 let mut events = Vec::new();
1519 let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1520 let mut topic_alias_validated = false;
1521 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1522 let packet_id = packet.packet_id().unwrap();
1523 if self.status != ConnectionStatus::Connected
1524 && !self.need_store
1525 && !self.offline_publish
1526 {
1527 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1528 if self.pid_man.is_used_id(packet_id) {
1529 self.pid_man.release_id(packet_id);
1530 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1531 }
1532 return events;
1533 }
1534
1535 if !self.pid_man.is_used_id(packet_id) {
1537 error!("packet_id {packet_id} must be acquired or registered");
1538 events.push(GenericEvent::NotifyError(
1539 MqttError::PacketIdentifierInvalid,
1540 ));
1541 return events;
1542 }
1543
1544 if self.need_store
1545 && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1546 {
1547 let ta_opt = Self::get_topic_alias_from_props(packet.props());
1548 if packet.topic_name().is_empty() {
1549 let topic_opt = self.validate_topic_alias(ta_opt);
1551 if topic_opt.is_none() {
1552 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1553 if self.pid_man.is_used_id(packet_id) {
1554 self.pid_man.release_id(packet_id);
1555 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1556 }
1557 return events;
1558 }
1559 topic_alias_validated = true;
1560 let store_packet = packet
1561 .clone()
1562 .remove_topic_alias_add_topic(topic_opt.unwrap())
1563 .unwrap()
1564 .set_dup(true);
1565 self.store.add(store_packet.try_into().unwrap()).unwrap();
1566 } else {
1567 let store_packet = packet.clone().remove_topic_alias().set_dup(true);
1569 self.store.add(store_packet.try_into().unwrap()).unwrap();
1570 }
1571 } else {
1572 release_packet_id_if_send_error = Some(packet_id);
1573 }
1574 if packet.qos() == Qos::ExactlyOnce {
1575 self.pid_pubrec.insert(packet_id);
1576 } else {
1577 self.pid_puback.insert(packet_id);
1578 }
1579 } else if self.status != ConnectionStatus::Connected {
1580 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1581 return events;
1582 }
1583
1584 let packet_id_opt = packet.packet_id();
1585 let ta_opt = Self::get_topic_alias_from_props(packet.props());
1586 if packet.topic_name().is_empty() {
1587 if !topic_alias_validated && self.validate_topic_alias(ta_opt).is_none() {
1589 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1590 if let Some(packet_id) = packet_id_opt {
1591 if self.pid_man.is_used_id(packet_id) {
1592 self.pid_man.release_id(packet_id);
1593 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1594 }
1595 }
1596 return events;
1597 }
1598 } else if let Some(ta) = ta_opt {
1599 if self.validate_topic_alias_range(ta) {
1601 trace!(
1602 "topic alias: {} - {} is registered.",
1603 packet.topic_name(),
1604 ta
1605 );
1606 if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1607 topic_alias_send.insert_or_update(packet.topic_name(), ta);
1608 }
1609 } else {
1610 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1611 if let Some(packet_id) = packet_id_opt {
1612 if self.pid_man.is_used_id(packet_id) {
1613 self.pid_man.release_id(packet_id);
1614 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1615 }
1616 }
1617 return events;
1618 }
1619 } else if self.status == ConnectionStatus::Connected {
1620 if self.auto_map_topic_alias_send {
1622 if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1623 if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1624 trace!(
1625 "topic alias: {} - {} is found.",
1626 packet.topic_name(),
1627 found_ta
1628 );
1629 packet = packet.remove_topic_add_topic_alias(found_ta);
1630 } else {
1631 let lru_ta = topic_alias_send.get_lru_alias();
1632 topic_alias_send.insert_or_update(packet.topic_name(), lru_ta);
1633 packet = packet.remove_topic_add_topic_alias(lru_ta);
1634 }
1635 }
1636 } else if self.auto_replace_topic_alias_send {
1637 if let Some(ref topic_alias_send) = self.topic_alias_send {
1638 if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1639 trace!(
1640 "topic alias: {} - {} is found.",
1641 packet.topic_name(),
1642 found_ta
1643 );
1644 packet = packet.remove_topic_add_topic_alias(found_ta);
1645 }
1646 }
1647 }
1648 }
1649
1650 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1652 if let Some(max) = self.publish_send_max {
1653 if self.publish_send_count == max {
1654 events.push(GenericEvent::NotifyError(MqttError::ReceiveMaximumExceeded));
1655 if let Some(packet_id) = packet_id_opt {
1656 if self.pid_man.is_used_id(packet_id) {
1657 self.pid_man.release_id(packet_id);
1658 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1659 }
1660 }
1661 return events;
1662 }
1663 self.publish_send_count += 1;
1664 }
1665 }
1666
1667 if self.status == ConnectionStatus::Connected {
1668 events.push(GenericEvent::RequestSendPacket {
1669 packet: packet.into(),
1670 release_packet_id_if_send_error,
1671 });
1672 self.send_post_process(&mut events);
1673 }
1674
1675 events
1676 }
1677
1678 pub(crate) fn process_send_v3_1_1_puback(
1679 &mut self,
1680 packet: v3_1_1::GenericPuback<PacketIdType>,
1681 ) -> Vec<GenericEvent<PacketIdType>> {
1682 if self.status != ConnectionStatus::Connected {
1683 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1684 }
1685 let mut events = Vec::new();
1686
1687 events.push(GenericEvent::RequestSendPacket {
1688 packet: packet.into(),
1689 release_packet_id_if_send_error: None,
1690 });
1691 self.send_post_process(&mut events);
1692
1693 events
1694 }
1695
1696 pub(crate) fn process_send_v5_0_puback(
1697 &mut self,
1698 packet: v5_0::GenericPuback<PacketIdType>,
1699 ) -> Vec<GenericEvent<PacketIdType>> {
1700 if !self.validate_maximum_packet_size_send(packet.size()) {
1701 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1702 }
1703 if self.status != ConnectionStatus::Connected {
1704 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1705 }
1706
1707 let mut events = Vec::new();
1708 self.publish_recv.remove(&packet.packet_id());
1709
1710 events.push(GenericEvent::RequestSendPacket {
1711 packet: packet.into(),
1712 release_packet_id_if_send_error: None,
1713 });
1714 self.send_post_process(&mut events);
1715
1716 events
1717 }
1718
1719 pub(crate) fn process_send_v3_1_1_pubrec(
1720 &mut self,
1721 packet: v3_1_1::GenericPubrec<PacketIdType>,
1722 ) -> Vec<GenericEvent<PacketIdType>> {
1723 if self.status != ConnectionStatus::Connected {
1724 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1725 }
1726 let mut events = Vec::new();
1727
1728 events.push(GenericEvent::RequestSendPacket {
1729 packet: packet.into(),
1730 release_packet_id_if_send_error: None,
1731 });
1732 self.send_post_process(&mut events);
1733
1734 events
1735 }
1736
1737 pub(crate) fn process_send_v5_0_pubrec(
1738 &mut self,
1739 packet: v5_0::GenericPubrec<PacketIdType>,
1740 ) -> Vec<GenericEvent<PacketIdType>> {
1741 if !self.validate_maximum_packet_size_send(packet.size()) {
1742 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1743 }
1744 if self.status != ConnectionStatus::Connected {
1745 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1746 }
1747
1748 let mut events = Vec::new();
1749 let packet_id = packet.packet_id();
1750
1751 if let Some(rc) = packet.reason_code() {
1752 if rc.is_failure() {
1753 self.publish_recv.remove(&packet_id);
1754 self.qos2_publish_handled.remove(&packet_id);
1755 }
1756 }
1757
1758 events.push(GenericEvent::RequestSendPacket {
1759 packet: packet.into(),
1760 release_packet_id_if_send_error: None,
1761 });
1762 self.send_post_process(&mut events);
1763
1764 events
1765 }
1766
1767 pub(crate) fn process_send_v3_1_1_pubrel(
1768 &mut self,
1769 packet: v3_1_1::GenericPubrel<PacketIdType>,
1770 ) -> Vec<GenericEvent<PacketIdType>> {
1771 if self.status != ConnectionStatus::Connected && !self.need_store {
1772 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1773 }
1774 let mut events = Vec::new();
1775 let packet_id = packet.packet_id();
1776 if !self.pid_man.is_used_id(packet_id) {
1777 error!("packet_id {packet_id} must be acquired or registered");
1778 events.push(GenericEvent::NotifyError(
1779 MqttError::PacketIdentifierInvalid,
1780 ));
1781 return events;
1782 }
1783 if self.need_store {
1784 self.store.add(packet.clone().try_into().unwrap()).unwrap();
1785 }
1786
1787 if self.status == ConnectionStatus::Connected {
1788 self.pid_pubcomp.insert(packet_id);
1789 events.push(GenericEvent::RequestSendPacket {
1790 packet: packet.into(),
1791 release_packet_id_if_send_error: None,
1792 });
1793 }
1794 self.send_post_process(&mut events);
1795
1796 events
1797 }
1798
1799 pub(crate) fn process_send_v5_0_pubrel(
1800 &mut self,
1801 packet: v5_0::GenericPubrel<PacketIdType>,
1802 ) -> Vec<GenericEvent<PacketIdType>> {
1803 if !self.validate_maximum_packet_size_send(packet.size()) {
1804 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1805 }
1806 if self.status != ConnectionStatus::Connected && !self.need_store {
1807 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1808 }
1809
1810 let mut events = Vec::new();
1811 let packet_id = packet.packet_id();
1812 if !self.pid_man.is_used_id(packet_id) {
1813 error!("packet_id {packet_id} must be acquired or registered");
1814 events.push(GenericEvent::NotifyError(
1815 MqttError::PacketIdentifierInvalid,
1816 ));
1817 return events;
1818 }
1819 if self.need_store {
1820 self.store.add(packet.clone().try_into().unwrap()).unwrap();
1821 }
1822
1823 if self.status == ConnectionStatus::Connected {
1824 self.pid_pubcomp.insert(packet_id);
1825 events.push(GenericEvent::RequestSendPacket {
1826 packet: packet.into(),
1827 release_packet_id_if_send_error: None,
1828 });
1829 }
1830 self.send_post_process(&mut events);
1831
1832 events
1833 }
1834
1835 pub(crate) fn process_send_v3_1_1_pubcomp(
1836 &mut self,
1837 packet: v3_1_1::GenericPubcomp<PacketIdType>,
1838 ) -> Vec<GenericEvent<PacketIdType>> {
1839 if self.status != ConnectionStatus::Connected {
1840 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1841 }
1842 let mut events = Vec::new();
1843
1844 events.push(GenericEvent::RequestSendPacket {
1845 packet: packet.into(),
1846 release_packet_id_if_send_error: None,
1847 });
1848 self.send_post_process(&mut events);
1849
1850 events
1851 }
1852
1853 pub(crate) fn process_send_v5_0_pubcomp(
1854 &mut self,
1855 packet: v5_0::GenericPubcomp<PacketIdType>,
1856 ) -> Vec<GenericEvent<PacketIdType>> {
1857 if !self.validate_maximum_packet_size_send(packet.size()) {
1858 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1859 }
1860 if self.status != ConnectionStatus::Connected {
1861 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1862 }
1863
1864 let mut events = Vec::new();
1865 self.publish_recv.remove(&packet.packet_id());
1866
1867 events.push(GenericEvent::RequestSendPacket {
1868 packet: packet.into(),
1869 release_packet_id_if_send_error: None,
1870 });
1871 self.send_post_process(&mut events);
1872
1873 events
1874 }
1875
1876 pub(crate) fn process_send_v3_1_1_subscribe(
1877 &mut self,
1878 packet: v3_1_1::GenericSubscribe<PacketIdType>,
1879 ) -> Vec<GenericEvent<PacketIdType>> {
1880 let mut events = Vec::new();
1881 let packet_id = packet.packet_id();
1882 if self.status != ConnectionStatus::Connected {
1883 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1884 if self.pid_man.is_used_id(packet_id) {
1885 self.pid_man.release_id(packet_id);
1886 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1887 }
1888 return events;
1889 }
1890 if !self.pid_man.is_used_id(packet_id) {
1891 error!("packet_id {packet_id} must be acquired or registered");
1892 events.push(GenericEvent::NotifyError(
1893 MqttError::PacketIdentifierInvalid,
1894 ));
1895 return events;
1896 }
1897 self.pid_suback.insert(packet_id);
1898
1899 events.push(GenericEvent::RequestSendPacket {
1900 packet: packet.into(),
1901 release_packet_id_if_send_error: Some(packet_id),
1902 });
1903 self.send_post_process(&mut events);
1904
1905 events
1906 }
1907
1908 pub(crate) fn process_send_v5_0_subscribe(
1909 &mut self,
1910 packet: v5_0::GenericSubscribe<PacketIdType>,
1911 ) -> Vec<GenericEvent<PacketIdType>> {
1912 if !self.validate_maximum_packet_size_send(packet.size()) {
1913 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1914 }
1915
1916 let mut events = Vec::new();
1917 let packet_id = packet.packet_id();
1918 if self.status != ConnectionStatus::Connected {
1919 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1920 if self.pid_man.is_used_id(packet_id) {
1921 self.pid_man.release_id(packet_id);
1922 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1923 }
1924 return events;
1925 }
1926 if !self.pid_man.is_used_id(packet_id) {
1927 error!("packet_id {packet_id} must be acquired or registered");
1928 events.push(GenericEvent::NotifyError(
1929 MqttError::PacketIdentifierInvalid,
1930 ));
1931 return events;
1932 }
1933 self.pid_suback.insert(packet_id);
1934
1935 events.push(GenericEvent::RequestSendPacket {
1936 packet: packet.into(),
1937 release_packet_id_if_send_error: Some(packet_id),
1938 });
1939 self.send_post_process(&mut events);
1940
1941 events
1942 }
1943
1944 pub(crate) fn process_send_v3_1_1_suback(
1945 &mut self,
1946 packet: v3_1_1::GenericSuback<PacketIdType>,
1947 ) -> Vec<GenericEvent<PacketIdType>> {
1948 if self.status != ConnectionStatus::Connected {
1949 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1950 }
1951 let mut events = Vec::new();
1952 events.push(GenericEvent::RequestSendPacket {
1953 packet: packet.into(),
1954 release_packet_id_if_send_error: None,
1955 });
1956 self.send_post_process(&mut events);
1957
1958 events
1959 }
1960
1961 pub(crate) fn process_send_v5_0_suback(
1962 &mut self,
1963 packet: v5_0::GenericSuback<PacketIdType>,
1964 ) -> Vec<GenericEvent<PacketIdType>> {
1965 if !self.validate_maximum_packet_size_send(packet.size()) {
1966 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1967 }
1968 if self.status != ConnectionStatus::Connected {
1969 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1970 }
1971
1972 let mut events = Vec::new();
1973 events.push(GenericEvent::RequestSendPacket {
1974 packet: packet.into(),
1975 release_packet_id_if_send_error: None,
1976 });
1977 self.send_post_process(&mut events);
1978
1979 events
1980 }
1981
1982 pub(crate) fn process_send_v3_1_1_unsubscribe(
1983 &mut self,
1984 packet: v3_1_1::GenericUnsubscribe<PacketIdType>,
1985 ) -> Vec<GenericEvent<PacketIdType>> {
1986 let mut events = Vec::new();
1987 let packet_id = packet.packet_id();
1988 if self.status != ConnectionStatus::Connected {
1989 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1990 if self.pid_man.is_used_id(packet_id) {
1991 self.pid_man.release_id(packet_id);
1992 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1993 }
1994 return events;
1995 }
1996 if !self.pid_man.is_used_id(packet_id) {
1997 error!("packet_id {packet_id} must be acquired or registered");
1998 events.push(GenericEvent::NotifyError(
1999 MqttError::PacketIdentifierInvalid,
2000 ));
2001 return events;
2002 }
2003 self.pid_unsuback.insert(packet_id);
2004
2005 events.push(GenericEvent::RequestSendPacket {
2006 packet: packet.into(),
2007 release_packet_id_if_send_error: Some(packet_id),
2008 });
2009 self.send_post_process(&mut events);
2010
2011 events
2012 }
2013
2014 pub(crate) fn process_send_v5_0_unsubscribe(
2015 &mut self,
2016 packet: v5_0::GenericUnsubscribe<PacketIdType>,
2017 ) -> Vec<GenericEvent<PacketIdType>> {
2018 if !self.validate_maximum_packet_size_send(packet.size()) {
2019 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2020 }
2021
2022 let mut events = Vec::new();
2023 let packet_id = packet.packet_id();
2024 if self.status != ConnectionStatus::Connected {
2025 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
2026 if self.pid_man.is_used_id(packet_id) {
2027 self.pid_man.release_id(packet_id);
2028 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2029 }
2030 return events;
2031 }
2032 if !self.pid_man.is_used_id(packet_id) {
2033 error!("packet_id {packet_id} must be acquired or registered");
2034 events.push(GenericEvent::NotifyError(
2035 MqttError::PacketIdentifierInvalid,
2036 ));
2037 return events;
2038 }
2039 self.pid_unsuback.insert(packet_id);
2040
2041 events.push(GenericEvent::RequestSendPacket {
2042 packet: packet.into(),
2043 release_packet_id_if_send_error: Some(packet_id),
2044 });
2045 self.send_post_process(&mut events);
2046
2047 events
2048 }
2049
2050 pub(crate) fn process_send_v3_1_1_unsuback(
2051 &mut self,
2052 packet: v3_1_1::GenericUnsuback<PacketIdType>,
2053 ) -> Vec<GenericEvent<PacketIdType>> {
2054 if self.status != ConnectionStatus::Connected {
2055 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2056 }
2057 let mut events = Vec::new();
2058 events.push(GenericEvent::RequestSendPacket {
2059 packet: packet.into(),
2060 release_packet_id_if_send_error: None,
2061 });
2062 self.send_post_process(&mut events);
2063
2064 events
2065 }
2066
2067 pub(crate) fn process_send_v5_0_unsuback(
2068 &mut self,
2069 packet: v5_0::GenericUnsuback<PacketIdType>,
2070 ) -> Vec<GenericEvent<PacketIdType>> {
2071 if !self.validate_maximum_packet_size_send(packet.size()) {
2072 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2073 }
2074 if self.status != ConnectionStatus::Connected {
2075 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2076 }
2077
2078 let mut events = Vec::new();
2079 events.push(GenericEvent::RequestSendPacket {
2080 packet: packet.into(),
2081 release_packet_id_if_send_error: None,
2082 });
2083 self.send_post_process(&mut events);
2084
2085 events
2086 }
2087
2088 pub(crate) fn process_send_v3_1_1_pingreq(
2089 &mut self,
2090 packet: v3_1_1::Pingreq,
2091 ) -> Vec<GenericEvent<PacketIdType>> {
2092 if self.status != ConnectionStatus::Connected {
2093 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2094 }
2095 let mut events = Vec::new();
2096 events.push(GenericEvent::RequestSendPacket {
2097 packet: packet.into(),
2098 release_packet_id_if_send_error: None,
2099 });
2100 if self.pingresp_recv_timeout_ms != 0 {
2101 self.pingresp_recv_set = true;
2102 events.push(GenericEvent::RequestTimerReset {
2103 kind: TimerKind::PingrespRecv,
2104 duration_ms: self.pingresp_recv_timeout_ms,
2105 });
2106 }
2107 self.send_post_process(&mut events);
2108
2109 events
2110 }
2111
2112 pub(crate) fn process_send_v5_0_pingreq(
2113 &mut self,
2114 packet: v5_0::Pingreq,
2115 ) -> Vec<GenericEvent<PacketIdType>> {
2116 if !self.validate_maximum_packet_size_send(packet.size()) {
2117 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2118 }
2119 if self.status != ConnectionStatus::Connected {
2120 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2121 }
2122
2123 let mut events = Vec::new();
2124 events.push(GenericEvent::RequestSendPacket {
2125 packet: packet.into(),
2126 release_packet_id_if_send_error: None,
2127 });
2128 if self.pingresp_recv_timeout_ms != 0 {
2129 self.pingresp_recv_set = true;
2130 events.push(GenericEvent::RequestTimerReset {
2131 kind: TimerKind::PingrespRecv,
2132 duration_ms: self.pingresp_recv_timeout_ms,
2133 });
2134 }
2135 self.send_post_process(&mut events);
2136
2137 events
2138 }
2139
2140 pub(crate) fn process_send_v3_1_1_pingresp(
2141 &mut self,
2142 packet: v3_1_1::Pingresp,
2143 ) -> Vec<GenericEvent<PacketIdType>> {
2144 if self.status != ConnectionStatus::Connected {
2145 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2146 }
2147 let mut events = Vec::new();
2148 events.push(GenericEvent::RequestSendPacket {
2149 packet: packet.into(),
2150 release_packet_id_if_send_error: None,
2151 });
2152 self.send_post_process(&mut events);
2153
2154 events
2155 }
2156
2157 pub(crate) fn process_send_v5_0_pingresp(
2158 &mut self,
2159 packet: v5_0::Pingresp,
2160 ) -> Vec<GenericEvent<PacketIdType>> {
2161 if !self.validate_maximum_packet_size_send(packet.size()) {
2162 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2163 }
2164 if self.status != ConnectionStatus::Connected {
2165 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2166 }
2167
2168 let mut events = Vec::new();
2169 events.push(GenericEvent::RequestSendPacket {
2170 packet: packet.into(),
2171 release_packet_id_if_send_error: None,
2172 });
2173 self.send_post_process(&mut events);
2174
2175 events
2176 }
2177
2178 pub(crate) fn process_send_v3_1_1_disconnect(
2179 &mut self,
2180 packet: v3_1_1::Disconnect,
2181 ) -> Vec<GenericEvent<PacketIdType>> {
2182 if self.status != ConnectionStatus::Connected {
2183 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2184 }
2185 let mut events = Vec::new();
2186 self.status = ConnectionStatus::Disconnected;
2187 self.cancel_timers(&mut events);
2188 events.push(GenericEvent::RequestSendPacket {
2189 packet: packet.into(),
2190 release_packet_id_if_send_error: None,
2191 });
2192 events.push(GenericEvent::RequestClose);
2193
2194 events
2195 }
2196
2197 pub(crate) fn process_send_v5_0_disconnect(
2198 &mut self,
2199 packet: v5_0::Disconnect,
2200 ) -> Vec<GenericEvent<PacketIdType>> {
2201 if !self.validate_maximum_packet_size_send(packet.size()) {
2202 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2203 }
2204 if self.status != ConnectionStatus::Connected {
2205 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2206 }
2207
2208 let mut events = Vec::new();
2209 self.status = ConnectionStatus::Disconnected;
2210 self.cancel_timers(&mut events);
2211 events.push(GenericEvent::RequestSendPacket {
2212 packet: packet.into(),
2213 release_packet_id_if_send_error: None,
2214 });
2215 events.push(GenericEvent::RequestClose);
2216
2217 events
2218 }
2219
2220 pub(crate) fn process_send_v5_0_auth(
2221 &mut self,
2222 packet: v5_0::Auth,
2223 ) -> Vec<GenericEvent<PacketIdType>> {
2224 if !self.validate_maximum_packet_size_send(packet.size()) {
2225 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2226 }
2227 if self.status == ConnectionStatus::Disconnected {
2228 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2229 }
2230
2231 let mut events = Vec::new();
2232 events.push(GenericEvent::RequestSendPacket {
2233 packet: packet.into(),
2234 release_packet_id_if_send_error: None,
2235 });
2236 self.send_post_process(&mut events);
2237
2238 events
2239 }
2240
2241 fn send_post_process(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
2242 if self.is_client {
2243 let mut ms: u64 = self.pingreq_keep_alive_ms;
2245 if let Some(timeout_ms) = self.pingreq_user_send_interval_ms {
2246 ms = timeout_ms;
2248 } else if let Some(timeout_ms) = self.pingreq_server_keep_alive_ms {
2249 ms = timeout_ms;
2251 }
2252 if ms > 0 {
2253 self.pingreq_send_set = true;
2254 events.push(GenericEvent::RequestTimerReset {
2255 kind: TimerKind::PingreqSend,
2256 duration_ms: ms,
2257 });
2258 }
2259 }
2260 }
2261
2262 fn validate_maximum_packet_size_send(&self, size: usize) -> bool {
2263 if size > self.maximum_packet_size_send as usize {
2264 error!("packet size over maximum_packet_size for sending");
2265 return false;
2266 }
2267 true
2268 }
2269
2270 #[rustfmt::skip]
2271 fn can_receive(&self, packet_type: u8) -> bool {
2272 !((Role::IS_CLIENT &&
2273 (
2274 packet_type == 1 || packet_type == 8 || packet_type == 10 || packet_type == 12 || (packet_type == 14 && self.protocol_version == Version::V3_1_1) || (packet_type == 15 && self.protocol_version == Version::V3_1_1) )
2281 ) || (Role::IS_SERVER &&
2282 (
2283 packet_type == 2 || packet_type == 9 || packet_type == 11 || packet_type == 13 || (packet_type == 15 && self.protocol_version == Version::V3_1_1) )
2289 ))
2290 }
2291
2292 fn process_recv_packet(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
2293 let mut events = Vec::new();
2294
2295 let total_size = remaining_length_to_total_size(raw_packet.remaining_length());
2297 if total_size > self.maximum_packet_size_recv {
2298 let disconnect_packet = v5_0::Disconnect::builder()
2304 .reason_code(DisconnectReasonCode::PacketTooLarge)
2305 .build()
2306 .unwrap();
2307 events.extend(self.process_send_v5_0_disconnect(disconnect_packet));
2309 events.push(GenericEvent::NotifyError(MqttError::PacketTooLarge));
2310 return events;
2311 }
2312
2313 let packet_type = raw_packet.packet_type();
2314 if !self.can_receive(packet_type) {
2315 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2316 return events;
2317 }
2318
2319 let _flags = raw_packet.flags();
2320 match self.protocol_version {
2321 Version::V3_1_1 => {
2322 match packet_type {
2323 1 => {
2324 events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2326 }
2327 2 => {
2328 events.extend(self.process_recv_v3_1_1_connack(raw_packet));
2330 }
2331 3 => {
2332 events.extend(self.process_recv_v3_1_1_publish(raw_packet));
2334 }
2335 4 => {
2336 events.extend(self.process_recv_v3_1_1_puback(raw_packet));
2338 }
2339 5 => {
2340 events.extend(self.process_recv_v3_1_1_pubrec(raw_packet));
2342 }
2343 6 => {
2344 events.extend(self.process_recv_v3_1_1_pubrel(raw_packet));
2346 }
2347 7 => {
2348 events.extend(self.process_recv_v3_1_1_pubcomp(raw_packet));
2350 }
2351 8 => {
2352 events.extend(self.process_recv_v3_1_1_subscribe(raw_packet));
2354 }
2355 9 => {
2356 events.extend(self.process_recv_v3_1_1_suback(raw_packet));
2358 }
2359 10 => {
2360 events.extend(self.process_recv_v3_1_1_unsubscribe(raw_packet));
2362 }
2363 11 => {
2364 events.extend(self.process_recv_v3_1_1_unsuback(raw_packet));
2366 }
2367 12 => {
2368 events.extend(self.process_recv_v3_1_1_pingreq(raw_packet));
2370 }
2371 13 => {
2372 events.extend(self.process_recv_v3_1_1_pingresp(raw_packet));
2374 }
2375 14 => {
2376 events.extend(self.process_recv_v3_1_1_disconnect(raw_packet));
2378 }
2379 _ => {
2381 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2382 }
2383 }
2384 }
2385 Version::V5_0 => {
2386 match packet_type {
2387 1 => {
2388 events.extend(self.process_recv_v5_0_connect(raw_packet));
2390 }
2391 2 => {
2392 events.extend(self.process_recv_v5_0_connack(raw_packet));
2394 }
2395 3 => {
2396 events.extend(self.process_recv_v5_0_publish(raw_packet));
2398 }
2399 4 => {
2400 events.extend(self.process_recv_v5_0_puback(raw_packet));
2402 }
2403 5 => {
2404 events.extend(self.process_recv_v5_0_pubrec(raw_packet));
2406 }
2407 6 => {
2408 events.extend(self.process_recv_v5_0_pubrel(raw_packet));
2410 }
2411 7 => {
2412 events.extend(self.process_recv_v5_0_pubcomp(raw_packet));
2414 }
2415 8 => {
2416 events.extend(self.process_recv_v5_0_subscribe(raw_packet));
2418 }
2419 9 => {
2420 events.extend(self.process_recv_v5_0_suback(raw_packet));
2422 }
2423 10 => {
2424 events.extend(self.process_recv_v5_0_unsubscribe(raw_packet));
2426 }
2427 11 => {
2428 events.extend(self.process_recv_v5_0_unsuback(raw_packet));
2430 }
2431 12 => {
2432 events.extend(self.process_recv_v5_0_pingreq(raw_packet));
2434 }
2435 13 => {
2436 events.extend(self.process_recv_v5_0_pingresp(raw_packet));
2438 }
2439 14 => {
2440 events.extend(self.process_recv_v5_0_disconnect(raw_packet));
2442 }
2443 15 => {
2444 events.extend(self.process_recv_v5_0_auth(raw_packet));
2446 }
2447 _ => {
2449 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2450 }
2451 }
2452 }
2453 Version::Undetermined => {
2454 match packet_type {
2455 1 => {
2456 if raw_packet.remaining_length() < 7 {
2458 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2459 return events;
2460 }
2461 match raw_packet.data_as_slice()[6] {
2462 4 => {
2464 self.protocol_version = Version::V3_1_1;
2465 events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2466 }
2467 5 => {
2468 self.protocol_version = Version::V5_0;
2469 events.extend(self.process_recv_v5_0_connect(raw_packet));
2470 }
2471 _ => {
2472 events.push(GenericEvent::NotifyError(
2473 MqttError::UnsupportedProtocolVersion,
2474 ));
2475 }
2476 }
2477 }
2478 _ => {
2479 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2480 }
2481 }
2482 }
2483 }
2484
2485 events
2486 }
2487
2488 fn process_recv_v3_1_1_connect(
2489 &mut self,
2490 raw_packet: RawPacket,
2491 ) -> Vec<GenericEvent<PacketIdType>> {
2492 let mut events = Vec::new();
2493 if self.status != ConnectionStatus::Disconnected {
2494 Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
2495 return events;
2496 }
2497 self.status = ConnectionStatus::Connecting;
2498 match v3_1_1::Connect::parse(raw_packet.data_as_slice()) {
2499 Ok((packet, _)) => {
2500 self.initialize(false);
2501 if packet.keep_alive() > 0 {
2502 self.pingreq_recv_timeout_ms = (packet.keep_alive() as u64) * 1000 * 3 / 2;
2503 }
2504 if packet.clean_session() {
2505 self.clear_store_related();
2506 } else {
2507 self.need_store = true;
2508 }
2509 events.extend(self.refresh_pingreq_recv());
2510 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2511 }
2512 Err(e) => {
2513 let rc = match e {
2514 MqttError::ClientIdentifierNotValid => ConnectReturnCode::IdentifierRejected,
2515 MqttError::BadUserNameOrPassword => ConnectReturnCode::BadUserNameOrPassword,
2516 MqttError::UnsupportedProtocolVersion => {
2517 ConnectReturnCode::UnacceptableProtocolVersion
2518 }
2519 _ => ConnectReturnCode::NotAuthorized, };
2521 let connack = v3_1_1::Connack::builder()
2522 .return_code(rc)
2523 .session_present(false)
2524 .build()
2525 .unwrap();
2526 let connack_events = self.process_send_v3_1_1_connack(connack);
2527 events.extend(connack_events);
2528 events.push(GenericEvent::NotifyError(e));
2529 }
2530 }
2531
2532 events
2533 }
2534
2535 fn process_recv_v5_0_connect(
2536 &mut self,
2537 raw_packet: RawPacket,
2538 ) -> Vec<GenericEvent<PacketIdType>> {
2539 let mut events = Vec::new();
2540 if self.status != ConnectionStatus::Disconnected {
2541 self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
2542 return events;
2543 }
2544 self.status = ConnectionStatus::Connecting;
2545 match v5_0::Connect::parse(raw_packet.data_as_slice()) {
2546 Ok((packet, _)) => {
2547 self.initialize(false);
2548 if packet.keep_alive() > 0 {
2549 self.pingreq_recv_timeout_ms = (packet.keep_alive() as u64) * 1000 * 3 / 2;
2550 }
2551 if packet.clean_start() {
2552 self.clear_store_related();
2553 }
2554 packet.props().iter().for_each(|prop| match prop {
2555 Property::TopicAliasMaximum(p) => {
2556 self.topic_alias_send = Some(TopicAliasSend::new(p.val()));
2557 }
2558 Property::ReceiveMaximum(p) => {
2559 self.publish_send_max = Some(p.val());
2560 }
2561 Property::MaximumPacketSize(p) => {
2562 self.maximum_packet_size_send = p.val();
2563 }
2564 Property::SessionExpiryInterval(p) => {
2565 if p.val() != 0 {
2566 self.need_store = true;
2567 }
2568 }
2569 _ => {}
2570 });
2571 events.extend(self.refresh_pingreq_recv());
2572 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2573 }
2574 Err(e) => {
2575 let rc = match e {
2576 MqttError::ClientIdentifierNotValid => {
2577 ConnectReasonCode::ClientIdentifierNotValid
2578 }
2579 MqttError::BadUserNameOrPassword => ConnectReasonCode::BadUserNameOrPassword,
2580 MqttError::UnsupportedProtocolVersion => {
2581 ConnectReasonCode::UnsupportedProtocolVersion
2582 }
2583 _ => ConnectReasonCode::UnspecifiedError,
2584 };
2585 let connack = v5_0::Connack::builder()
2586 .reason_code(rc)
2587 .session_present(false)
2588 .build()
2589 .unwrap();
2590 let connack_events = self.process_send_v5_0_connack(connack);
2591 events.extend(connack_events);
2592 events.push(GenericEvent::NotifyError(e));
2593 }
2594 }
2595
2596 events
2597 }
2598
2599 fn process_recv_v3_1_1_connack(
2600 &mut self,
2601 raw_packet: RawPacket,
2602 ) -> Vec<GenericEvent<PacketIdType>> {
2603 let mut events = Vec::new();
2604
2605 match v3_1_1::Connack::parse(raw_packet.data_as_slice()) {
2606 Ok((packet, _consumed)) => {
2607 if packet.return_code() == ConnectReturnCode::Accepted {
2608 self.status = ConnectionStatus::Connected;
2609 if packet.session_present() {
2610 events.extend(self.send_stored());
2611 } else {
2612 self.clear_store_related();
2613 }
2614 }
2615 events.push(GenericEvent::NotifyPacketReceived(
2616 GenericPacket::V3_1_1Connack(packet),
2617 ));
2618 }
2619 Err(e) => {
2620 Self::handle_v3_1_1_error(e, &mut events);
2621 }
2622 }
2623
2624 events
2625 }
2626
2627 fn process_recv_v5_0_connack(
2628 &mut self,
2629 raw_packet: RawPacket,
2630 ) -> Vec<GenericEvent<PacketIdType>> {
2631 let mut events = Vec::new();
2632
2633 match v5_0::Connack::parse(raw_packet.data_as_slice()) {
2634 Ok((packet, _consumed)) => {
2635 if packet.reason_code() == ConnectReasonCode::Success {
2636 self.status = ConnectionStatus::Connected;
2637
2638 for prop in packet.props() {
2640 match prop {
2641 Property::TopicAliasMaximum(val) => {
2642 if val.val() > 0 {
2643 self.topic_alias_send = Some(TopicAliasSend::new(val.val()));
2644 }
2645 }
2646 Property::ReceiveMaximum(val) => {
2647 assert!(val.val() != 0);
2648 self.publish_send_max = Some(val.val());
2649 }
2650 Property::MaximumPacketSize(val) => {
2651 assert!(val.val() != 0);
2652 self.maximum_packet_size_send = val.val();
2653 }
2654 Property::ServerKeepAlive(val) => {
2655 let val = val.val() as u64 * 1000;
2656 self.pingreq_server_keep_alive_ms = Some(val);
2657 if self.pingreq_user_send_interval_ms.is_none() {
2658 if val == 0 {
2659 if self.pingreq_send_set {
2660 self.pingreq_send_set = false;
2661 events.push(GenericEvent::RequestTimerCancel(
2662 TimerKind::PingreqSend,
2663 ));
2664 }
2665 self.pingreq_user_send_interval_ms = None;
2666 } else {
2667 self.pingreq_send_set = true;
2668 events.push(GenericEvent::RequestTimerReset {
2669 kind: TimerKind::PingreqSend,
2670 duration_ms: val,
2671 });
2672 }
2673 }
2674 }
2675 _ => {
2676 }
2678 }
2679 }
2680
2681 if packet.session_present() {
2682 events.extend(self.send_stored());
2683 } else {
2684 self.clear_store_related();
2685 }
2686 }
2687 events.push(GenericEvent::NotifyPacketReceived(
2688 GenericPacket::V5_0Connack(packet),
2689 ));
2690 }
2691 Err(e) => {
2692 if self.status == ConnectionStatus::Connected {
2693 self.handle_v5_0_error(e, &mut events);
2694 } else {
2695 events.push(GenericEvent::NotifyError(e));
2696 }
2697 }
2698 }
2699
2700 events
2701 }
2702
2703 fn process_recv_v3_1_1_publish(
2704 &mut self,
2705 raw_packet: RawPacket,
2706 ) -> Vec<GenericEvent<PacketIdType>> {
2707 let mut events = Vec::new();
2708
2709 let flags = raw_packet.flags();
2710 match &raw_packet.data {
2711 PacketData::Publish(arc) => {
2712 match v3_1_1::GenericPublish::parse(flags, arc.clone()) {
2713 Ok((packet, _consumed)) => {
2714 match packet.qos() {
2715 Qos::AtMostOnce => {
2716 events.extend(self.refresh_pingreq_recv());
2717 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2718 }
2719 Qos::AtLeastOnce => {
2720 let packet_id = packet.packet_id().unwrap();
2721 if self.status == ConnectionStatus::Connected
2722 && self.auto_pub_response
2723 {
2724 let puback = v3_1_1::GenericPuback::builder()
2726 .packet_id(packet_id)
2727 .build()
2728 .unwrap();
2729 events.extend(self.process_send_v3_1_1_puback(puback));
2730 }
2731 events.extend(self.refresh_pingreq_recv());
2732 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2733 }
2734 Qos::ExactlyOnce => {
2735 let packet_id = packet.packet_id().unwrap();
2736 let already_handled = !self.qos2_publish_handled.insert(packet_id);
2737
2738 if self.status == ConnectionStatus::Connected
2739 && (self.auto_pub_response || already_handled)
2740 {
2741 let pubrec = v3_1_1::GenericPubrec::builder()
2742 .packet_id(packet_id)
2743 .build()
2744 .unwrap();
2745 events.extend(self.process_send_v3_1_1_pubrec(pubrec));
2746 }
2747 events.extend(self.refresh_pingreq_recv());
2748 if !already_handled {
2749 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2750 }
2751 }
2752 }
2753 }
2754 Err(e) => {
2755 Self::handle_v3_1_1_error(e, &mut events);
2756 }
2757 }
2758 }
2759 PacketData::Normal(_) => {
2760 unreachable!("PUBLISH packet must use PacketData::Publish variant");
2761 }
2762 }
2763
2764 events
2765 }
2766
2767 fn process_recv_v5_0_publish(
2768 &mut self,
2769 raw_packet: RawPacket,
2770 ) -> Vec<GenericEvent<PacketIdType>> {
2771 let mut events = Vec::new();
2772
2773 let flags = raw_packet.flags();
2774 match &raw_packet.data {
2775 PacketData::Publish(arc) => {
2776 match v5_0::GenericPublish::parse(flags, arc.clone()) {
2777 Ok((mut packet, _consumed)) => {
2778 let mut already_handled = false;
2779 let mut puback_send = false;
2780 let mut pubrec_send = false;
2781
2782 let mut check_receive_maximum =
2783 |events: &mut Vec<GenericEvent<PacketIdType>>| {
2784 if let Some(max) = self.publish_recv_max {
2785 if self.publish_recv.len() >= max as usize {
2786 self.handle_v5_0_error(
2787 MqttError::ReceiveMaximumExceeded,
2788 events,
2789 );
2790 return false;
2791 }
2792 }
2793 true
2794 };
2795
2796 match packet.qos() {
2797 Qos::AtLeastOnce => {
2798 let packet_id = packet.packet_id().unwrap();
2799 if !check_receive_maximum(&mut events) {
2800 return events;
2801 }
2802 self.publish_recv.insert(packet_id);
2803 if self.auto_pub_response
2804 && self.status == ConnectionStatus::Connected
2805 {
2806 puback_send = true;
2807 }
2808 }
2809 Qos::ExactlyOnce => {
2810 let packet_id = packet.packet_id().unwrap();
2811 if !check_receive_maximum(&mut events) {
2812 return events;
2813 }
2814 self.publish_recv.insert(packet_id);
2815
2816 if !self.qos2_publish_handled.insert(packet_id) {
2817 already_handled = true;
2818 }
2819 if self.status == ConnectionStatus::Connected
2820 && (self.auto_pub_response || already_handled)
2821 {
2822 pubrec_send = true;
2823 }
2824 }
2825 Qos::AtMostOnce => {
2826 }
2828 }
2829
2830 if packet.topic_name().is_empty() {
2832 if let Some(ta) = Self::get_topic_alias_from_props(packet.props()) {
2834 if ta == 0
2835 || self.topic_alias_recv.is_none()
2836 || ta > self.topic_alias_recv.as_ref().unwrap().max()
2837 {
2838 self.handle_v5_0_error(
2839 MqttError::TopicAliasInvalid,
2840 &mut events,
2841 );
2842 return events;
2843 }
2844
2845 if let Some(ref topic_alias_recv) = self.topic_alias_recv {
2846 if let Some(topic_name) = topic_alias_recv.get(ta) {
2847 match packet.add_extracted_topic_name(topic_name) {
2848 Ok(extracted) => {
2849 packet = extracted;
2850 }
2851 Err(_e) => {
2852 error!("topic alias extract failed: {_e}");
2853 self.handle_v5_0_error(
2854 MqttError::TopicAliasInvalid,
2855 &mut events,
2856 );
2857 return events;
2858 }
2859 }
2860 } else {
2861 self.handle_v5_0_error(
2862 MqttError::TopicAliasInvalid,
2863 &mut events,
2864 );
2865 return events;
2866 }
2867 }
2868 } else {
2869 self.handle_v5_0_error(MqttError::TopicAliasInvalid, &mut events);
2870 return events;
2871 }
2872 } else {
2873 if let Some(ta) = Self::get_topic_alias_from_props(packet.props()) {
2875 if ta == 0
2876 || self.topic_alias_recv.is_none()
2877 || ta > self.topic_alias_recv.as_ref().unwrap().max()
2878 {
2879 self.handle_v5_0_error(
2880 MqttError::TopicAliasInvalid,
2881 &mut events,
2882 );
2883 return events;
2884 }
2885 if let Some(ref mut topic_alias_recv) = self.topic_alias_recv {
2886 topic_alias_recv.insert_or_update(packet.topic_name(), ta);
2887 }
2888 }
2889 }
2890
2891 if puback_send {
2893 let puback = v5_0::GenericPuback::builder()
2894 .packet_id(packet.packet_id().unwrap())
2895 .build()
2896 .unwrap();
2897 events.extend(self.process_send_v5_0_puback(puback));
2898 }
2899 if pubrec_send {
2900 let pubrec = v5_0::GenericPubrec::builder()
2901 .packet_id(packet.packet_id().unwrap())
2902 .build()
2903 .unwrap();
2904 events.extend(self.process_send_v5_0_pubrec(pubrec));
2905 }
2906
2907 events.extend(self.refresh_pingreq_recv());
2909
2910 if !already_handled {
2912 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2913 }
2914 }
2915 Err(e) => {
2916 if self.status == ConnectionStatus::Connected {
2917 self.handle_v5_0_error(e, &mut events);
2918 } else {
2919 events.push(GenericEvent::NotifyError(e));
2920 }
2921 }
2922 }
2923 }
2924 PacketData::Normal(_) => {
2925 unreachable!("PUBLISH packet must use PacketData::Publish variant");
2926 }
2927 }
2928
2929 events
2930 }
2931
2932 fn process_recv_v3_1_1_puback(
2933 &mut self,
2934 raw_packet: RawPacket,
2935 ) -> Vec<GenericEvent<PacketIdType>> {
2936 let mut events = Vec::new();
2937
2938 match v3_1_1::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2939 Ok((packet, _)) => {
2940 let packet_id = packet.packet_id();
2941 if self.pid_puback.remove(&packet_id) {
2942 self.store.erase(ResponsePacket::V3_1_1Puback, packet_id);
2943 if self.pid_man.is_used_id(packet_id) {
2944 self.pid_man.release_id(packet_id);
2945 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2946 }
2947 events.extend(self.refresh_pingreq_recv());
2948 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2949 } else {
2950 Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
2951 }
2952 }
2953 Err(e) => {
2954 Self::handle_v3_1_1_error(e, &mut events);
2955 }
2956 }
2957
2958 events
2959 }
2960
2961 fn process_recv_v5_0_puback(
2962 &mut self,
2963 raw_packet: RawPacket,
2964 ) -> Vec<GenericEvent<PacketIdType>> {
2965 let mut events = Vec::new();
2966
2967 match v5_0::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2968 Ok((packet, _)) => {
2969 let packet_id = packet.packet_id();
2970 if self.pid_puback.remove(&packet_id) {
2971 self.store.erase(ResponsePacket::V5_0Puback, packet_id);
2972 if self.pid_man.is_used_id(packet_id) {
2973 self.pid_man.release_id(packet_id);
2974 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2975 }
2976 if self.publish_send_max.is_some() {
2977 self.publish_send_count -= 1;
2978 }
2979 events.extend(self.refresh_pingreq_recv());
2980 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2981 } else {
2982 self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
2983 }
2984 }
2985 Err(e) => {
2986 self.handle_v5_0_error(e, &mut events);
2987 }
2988 }
2989
2990 events
2991 }
2992
2993 fn process_recv_v3_1_1_pubrec(
2994 &mut self,
2995 raw_packet: RawPacket,
2996 ) -> Vec<GenericEvent<PacketIdType>> {
2997 let mut events = Vec::new();
2998
2999 match v3_1_1::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3000 Ok((packet, _)) => {
3001 let packet_id = packet.packet_id();
3002 if self.pid_pubrec.remove(&packet_id) {
3003 self.store.erase(ResponsePacket::V3_1_1Pubrec, packet_id);
3004 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3005 let pubrel = v3_1_1::GenericPubrel::<PacketIdType>::builder()
3006 .packet_id(packet_id)
3007 .build()
3008 .unwrap();
3009 events.extend(self.process_send_v3_1_1_pubrel(pubrel));
3010 }
3011 events.extend(self.refresh_pingreq_recv());
3012 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3013 } else {
3014 Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
3015 }
3016 }
3017 Err(e) => {
3018 Self::handle_v3_1_1_error(e, &mut events);
3019 }
3020 }
3021
3022 events
3023 }
3024
3025 fn process_recv_v5_0_pubrec(
3026 &mut self,
3027 raw_packet: RawPacket,
3028 ) -> Vec<GenericEvent<PacketIdType>> {
3029 let mut events = Vec::new();
3030
3031 match v5_0::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3032 Ok((packet, _)) => {
3033 let packet_id = packet.packet_id();
3034 if self.pid_pubrec.remove(&packet_id) {
3035 self.store.erase(ResponsePacket::V5_0Pubrec, packet_id);
3036 let reason_code = packet.reason_code();
3037 if reason_code.is_none() || reason_code.unwrap() == PubrecReasonCode::Success {
3038 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3039 let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3040 .packet_id(packet_id)
3041 .build()
3042 .unwrap();
3043 events.extend(self.process_send_v5_0_pubrel(pubrel));
3044 }
3045 } else {
3046 if self.pid_man.is_used_id(packet_id) {
3047 self.pid_man.release_id(packet_id);
3048 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3049 }
3050 if self.publish_send_max.is_some() {
3051 self.publish_send_count -= 1;
3052 }
3053 }
3054 events.extend(self.refresh_pingreq_recv());
3055 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3056 } else {
3057 self.handle_v5_0_error(
3058 MqttError::from(DisconnectReasonCode::ProtocolError),
3059 &mut events,
3060 );
3061 }
3062 }
3063 Err(e) => {
3064 self.handle_v5_0_error(e, &mut events);
3065 }
3066 }
3067
3068 events
3069 }
3070
3071 fn process_recv_v3_1_1_pubrel(
3072 &mut self,
3073 raw_packet: RawPacket,
3074 ) -> Vec<GenericEvent<PacketIdType>> {
3075 let mut events = Vec::new();
3076
3077 match v3_1_1::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3078 Ok((packet, _)) => {
3079 let packet_id = packet.packet_id();
3080 self.qos2_publish_handled.remove(&packet_id);
3081 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3082 let pubcomp = v3_1_1::GenericPubcomp::<PacketIdType>::builder()
3083 .packet_id(packet_id)
3084 .build()
3085 .unwrap();
3086 events.extend(self.process_send_v3_1_1_pubcomp(pubcomp));
3087 }
3088 events.extend(self.refresh_pingreq_recv());
3089 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3090 }
3091 Err(e) => {
3092 Self::handle_v3_1_1_error(e, &mut events);
3093 }
3094 }
3095
3096 events
3097 }
3098
3099 fn process_recv_v5_0_pubrel(
3100 &mut self,
3101 raw_packet: RawPacket,
3102 ) -> Vec<GenericEvent<PacketIdType>> {
3103 let mut events = Vec::new();
3104
3105 match v5_0::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3106 Ok((packet, _)) => {
3107 let packet_id = packet.packet_id();
3108 let removed = self.qos2_publish_handled.remove(&packet_id);
3109 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3110 if removed {
3111 let pubcomp = v5_0::GenericPubcomp::<PacketIdType>::builder()
3112 .packet_id(packet_id)
3113 .build()
3114 .unwrap();
3115 events.extend(self.process_send_v5_0_pubcomp(pubcomp));
3116 } else {
3117 let pubcomp = v5_0::GenericPubcomp::<PacketIdType>::builder()
3118 .packet_id(packet_id)
3119 .reason_code(result_code::PubcompReasonCode::PacketIdentifierNotFound)
3120 .build()
3121 .unwrap();
3122 events.extend(self.process_send_v5_0_pubcomp(pubcomp));
3123 }
3124 }
3125 events.extend(self.refresh_pingreq_recv());
3126 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3127 }
3128 Err(e) => {
3129 self.handle_v5_0_error(e, &mut events);
3130 }
3131 }
3132
3133 events
3134 }
3135
3136 fn process_recv_v3_1_1_pubcomp(
3137 &mut self,
3138 raw_packet: RawPacket,
3139 ) -> Vec<GenericEvent<PacketIdType>> {
3140 let mut events = Vec::new();
3141
3142 match v3_1_1::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3143 Ok((packet, _)) => {
3144 let packet_id = packet.packet_id();
3145 if self.pid_pubcomp.remove(&packet_id) {
3146 self.store.erase(ResponsePacket::V3_1_1Pubcomp, packet_id);
3147 if self.pid_man.is_used_id(packet_id) {
3148 self.pid_man.release_id(packet_id);
3149 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3150 }
3151 events.extend(self.refresh_pingreq_recv());
3152 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3153 } else {
3154 Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
3155 }
3156 }
3157 Err(e) => {
3158 Self::handle_v3_1_1_error(e, &mut events);
3159 }
3160 }
3161
3162 events
3163 }
3164
3165 fn process_recv_v5_0_pubcomp(
3166 &mut self,
3167 raw_packet: RawPacket,
3168 ) -> Vec<GenericEvent<PacketIdType>> {
3169 let mut events = Vec::new();
3170
3171 match v5_0::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3172 Ok((packet, _)) => {
3173 let packet_id = packet.packet_id();
3174 if self.pid_pubcomp.remove(&packet_id) {
3175 self.store.erase(ResponsePacket::V5_0Pubcomp, packet_id);
3176 if self.pid_man.is_used_id(packet_id) {
3177 self.pid_man.release_id(packet_id);
3178 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3179 }
3180 if self.publish_send_max.is_some() {
3181 self.publish_send_count -= 1;
3182 }
3183 events.extend(self.refresh_pingreq_recv());
3184 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3185 } else {
3186 self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
3187 }
3188 }
3189 Err(e) => {
3190 self.handle_v5_0_error(e, &mut events);
3191 }
3192 }
3193
3194 events
3195 }
3196
3197 fn process_recv_v3_1_1_subscribe(
3198 &mut self,
3199 raw_packet: RawPacket,
3200 ) -> Vec<GenericEvent<PacketIdType>> {
3201 let mut events = Vec::new();
3202
3203 match v3_1_1::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3204 Ok((packet, _)) => {
3205 events.extend(self.refresh_pingreq_recv());
3206 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3207 }
3208 Err(e) => {
3209 Self::handle_v3_1_1_error(e, &mut events);
3210 }
3211 }
3212
3213 events
3214 }
3215
3216 fn process_recv_v5_0_subscribe(
3217 &mut self,
3218 raw_packet: RawPacket,
3219 ) -> Vec<GenericEvent<PacketIdType>> {
3220 let mut events = Vec::new();
3221
3222 match v5_0::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3223 Ok((packet, _)) => {
3224 events.extend(self.refresh_pingreq_recv());
3225 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3226 }
3227 Err(e) => {
3228 self.handle_v5_0_error(e, &mut events);
3229 }
3230 }
3231
3232 events
3233 }
3234
3235 fn process_recv_v3_1_1_suback(
3236 &mut self,
3237 raw_packet: RawPacket,
3238 ) -> Vec<GenericEvent<PacketIdType>> {
3239 let mut events = Vec::new();
3240
3241 match v3_1_1::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3242 Ok((packet, _)) => {
3243 let packet_id = packet.packet_id();
3244 if self.pid_suback.remove(&packet_id) {
3245 if self.pid_man.is_used_id(packet_id) {
3246 self.pid_man.release_id(packet_id);
3247 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3248 }
3249 events.extend(self.refresh_pingreq_recv());
3250 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3251 } else {
3252 Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
3253 }
3254 }
3255 Err(e) => {
3256 Self::handle_v3_1_1_error(e, &mut events);
3257 }
3258 }
3259
3260 events
3261 }
3262
3263 fn process_recv_v5_0_suback(
3264 &mut self,
3265 raw_packet: RawPacket,
3266 ) -> Vec<GenericEvent<PacketIdType>> {
3267 let mut events = Vec::new();
3268
3269 match v5_0::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3270 Ok((packet, _)) => {
3271 let packet_id = packet.packet_id();
3272 if self.pid_suback.remove(&packet_id) {
3273 if self.pid_man.is_used_id(packet_id) {
3274 self.pid_man.release_id(packet_id);
3275 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3276 }
3277 events.extend(self.refresh_pingreq_recv());
3278 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3279 } else {
3280 self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
3281 }
3282 }
3283 Err(e) => {
3284 self.handle_v5_0_error(e, &mut events);
3285 }
3286 }
3287
3288 events
3289 }
3290
3291 fn process_recv_v3_1_1_unsubscribe(
3292 &mut self,
3293 raw_packet: RawPacket,
3294 ) -> Vec<GenericEvent<PacketIdType>> {
3295 let mut events = Vec::new();
3296
3297 match v3_1_1::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3298 Ok((packet, _)) => {
3299 events.extend(self.refresh_pingreq_recv());
3300 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3301 }
3302 Err(e) => {
3303 Self::handle_v3_1_1_error(e, &mut events);
3304 }
3305 }
3306
3307 events
3308 }
3309
3310 fn process_recv_v5_0_unsubscribe(
3311 &mut self,
3312 raw_packet: RawPacket,
3313 ) -> Vec<GenericEvent<PacketIdType>> {
3314 let mut events = Vec::new();
3315
3316 match v5_0::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3317 Ok((packet, _)) => {
3318 events.extend(self.refresh_pingreq_recv());
3319 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3320 }
3321 Err(e) => {
3322 self.handle_v5_0_error(e, &mut events);
3323 }
3324 }
3325
3326 events
3327 }
3328
3329 fn process_recv_v3_1_1_unsuback(
3330 &mut self,
3331 raw_packet: RawPacket,
3332 ) -> Vec<GenericEvent<PacketIdType>> {
3333 let mut events = Vec::new();
3334
3335 match v3_1_1::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3336 Ok((packet, _)) => {
3337 let packet_id = packet.packet_id();
3338 if self.pid_unsuback.remove(&packet_id) {
3339 if self.pid_man.is_used_id(packet_id) {
3340 self.pid_man.release_id(packet_id);
3341 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3342 }
3343 events.extend(self.refresh_pingreq_recv());
3344 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3345 } else {
3346 Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
3347 }
3348 }
3349 Err(e) => {
3350 Self::handle_v3_1_1_error(e, &mut events);
3351 }
3352 }
3353
3354 events
3355 }
3356
3357 fn process_recv_v5_0_unsuback(
3358 &mut self,
3359 raw_packet: RawPacket,
3360 ) -> Vec<GenericEvent<PacketIdType>> {
3361 let mut events = Vec::new();
3362
3363 match v5_0::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3364 Ok((packet, _)) => {
3365 let packet_id = packet.packet_id();
3366 if self.pid_unsuback.remove(&packet_id) {
3367 if self.pid_man.is_used_id(packet_id) {
3368 self.pid_man.release_id(packet_id);
3369 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3370 }
3371 events.extend(self.refresh_pingreq_recv());
3372 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3373 } else {
3374 self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
3375 }
3376 }
3377 Err(e) => {
3378 self.handle_v5_0_error(e, &mut events);
3379 }
3380 }
3381
3382 events
3383 }
3384
3385 fn process_recv_v3_1_1_pingreq(
3386 &mut self,
3387 raw_packet: RawPacket,
3388 ) -> Vec<GenericEvent<PacketIdType>> {
3389 let mut events = Vec::new();
3390
3391 match v3_1_1::Pingreq::parse(raw_packet.data_as_slice()) {
3392 Ok((packet, _)) => {
3393 if (Role::IS_SERVER || Role::IS_ANY)
3394 && !self.is_client
3395 && self.auto_ping_response
3396 && self.status == ConnectionStatus::Connected
3397 {
3398 let pingresp = v3_1_1::Pingresp::new();
3399 events.extend(self.process_send_v3_1_1_pingresp(pingresp));
3400 }
3401 events.extend(self.refresh_pingreq_recv());
3402 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3403 }
3404 Err(e) => {
3405 Self::handle_v3_1_1_error(e, &mut events);
3406 }
3407 }
3408
3409 events
3410 }
3411
3412 fn process_recv_v5_0_pingreq(
3413 &mut self,
3414 raw_packet: RawPacket,
3415 ) -> Vec<GenericEvent<PacketIdType>> {
3416 let mut events = Vec::new();
3417
3418 match v5_0::Pingreq::parse(raw_packet.data_as_slice()) {
3419 Ok((packet, _)) => {
3420 if (Role::IS_SERVER || Role::IS_ANY)
3421 && !self.is_client
3422 && self.auto_ping_response
3423 && self.status == ConnectionStatus::Connected
3424 {
3425 let pingresp = v5_0::Pingresp::new();
3426 events.extend(self.process_send_v5_0_pingresp(pingresp));
3427 }
3428 events.extend(self.refresh_pingreq_recv());
3429 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3430 }
3431 Err(e) => {
3432 self.handle_v5_0_error(e, &mut events);
3433 }
3434 }
3435
3436 events
3437 }
3438
3439 fn process_recv_v3_1_1_pingresp(
3440 &mut self,
3441 raw_packet: RawPacket,
3442 ) -> Vec<GenericEvent<PacketIdType>> {
3443 let mut events = Vec::new();
3444
3445 match v3_1_1::Pingresp::parse(raw_packet.data_as_slice()) {
3446 Ok((packet, _)) => {
3447 if self.pingresp_recv_set {
3448 self.pingresp_recv_set = false;
3449 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3450 }
3451 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3452 }
3453 Err(e) => {
3454 Self::handle_v3_1_1_error(e, &mut events);
3455 }
3456 }
3457
3458 events
3459 }
3460
3461 fn process_recv_v5_0_pingresp(
3462 &mut self,
3463 raw_packet: RawPacket,
3464 ) -> Vec<GenericEvent<PacketIdType>> {
3465 let mut events = Vec::new();
3466
3467 match v5_0::Pingresp::parse(raw_packet.data_as_slice()) {
3468 Ok((packet, _)) => {
3469 if self.pingresp_recv_set {
3470 self.pingresp_recv_set = false;
3471 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3472 }
3473 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3474 }
3475 Err(e) => {
3476 self.handle_v5_0_error(e, &mut events);
3477 }
3478 }
3479
3480 events
3481 }
3482
3483 fn process_recv_v3_1_1_disconnect(
3484 &mut self,
3485 raw_packet: RawPacket,
3486 ) -> Vec<GenericEvent<PacketIdType>> {
3487 let mut events = Vec::new();
3488
3489 match v3_1_1::Disconnect::parse(raw_packet.data_as_slice()) {
3490 Ok((packet, _)) => {
3491 self.cancel_timers(&mut events);
3492 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3493 }
3494 Err(e) => {
3495 Self::handle_v3_1_1_error(e, &mut events);
3496 }
3497 }
3498
3499 events
3500 }
3501
3502 fn process_recv_v5_0_disconnect(
3503 &mut self,
3504 raw_packet: RawPacket,
3505 ) -> Vec<GenericEvent<PacketIdType>> {
3506 let mut events = Vec::new();
3507
3508 match v5_0::Disconnect::parse(raw_packet.data_as_slice()) {
3509 Ok((packet, _)) => {
3510 self.cancel_timers(&mut events);
3511 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3512 }
3513 Err(e) => {
3514 self.handle_v5_0_error(e, &mut events);
3515 }
3516 }
3517
3518 events
3519 }
3520
3521 fn process_recv_v5_0_auth(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
3522 let mut events = Vec::new();
3523
3524 match v5_0::Auth::parse(raw_packet.data_as_slice()) {
3525 Ok((packet, _)) => {
3526 events.extend(self.refresh_pingreq_recv());
3527 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3528 }
3529 Err(e) => {
3530 self.handle_v5_0_error(e, &mut events);
3531 }
3532 }
3533
3534 events
3535 }
3536
3537 fn handle_v3_1_1_error(e: MqttError, events: &mut Vec<GenericEvent<PacketIdType>>) {
3538 events.push(GenericEvent::RequestClose);
3539 events.push(GenericEvent::NotifyError(e));
3540 }
3541
3542 fn handle_v5_0_error(&mut self, e: MqttError, events: &mut Vec<GenericEvent<PacketIdType>>) {
3543 let disconnect = v5_0::Disconnect::builder()
3544 .reason_code(e.into())
3545 .build()
3546 .unwrap();
3547 events.extend(self.process_send_v5_0_disconnect(disconnect));
3548 events.push(GenericEvent::NotifyError(e));
3549 }
3550
3551 fn refresh_pingreq_recv(&mut self) -> Vec<GenericEvent<PacketIdType>> {
3552 let mut events = Vec::new();
3553 if self.pingreq_recv_timeout_ms != 0 {
3554 self.pingreq_recv_set = true;
3555 events.push(GenericEvent::RequestTimerReset {
3556 kind: TimerKind::PingreqRecv,
3557 duration_ms: self.pingreq_recv_timeout_ms,
3558 });
3559 }
3560
3561 events
3562 }
3563
3564 fn cancel_timers(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
3566 if self.pingreq_send_set {
3567 self.pingreq_send_set = false;
3568 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
3569 }
3570 if self.pingreq_recv_set {
3571 self.pingreq_recv_set = false;
3572 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3573 }
3574 if self.pingresp_recv_set {
3575 self.pingresp_recv_set = false;
3576 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3577 }
3578 }
3579
3580 fn get_topic_alias_from_props(props: &[Property]) -> Option<u16> {
3582 for prop in props {
3583 if let Property::TopicAlias(ta) = prop {
3584 return Some(ta.val());
3585 }
3586 }
3587 None
3588 }
3589}
3590
3591#[cfg(test)]
3594mod tests {
3595 use super::*;
3596 use crate::mqtt::connection::version::Version;
3597 use crate::mqtt::packet::TopicAliasSend;
3598 use crate::mqtt::role;
3599
3600 #[test]
3601 fn test_initialize_client_mode() {
3602 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3603
3604 connection.initialize(true);
3606
3607 assert!(connection.is_client);
3609 assert_eq!(connection.publish_send_count, 0);
3610 assert!(connection.publish_send_max.is_none());
3611 assert!(connection.publish_recv_max.is_none());
3612 assert!(!connection.need_store);
3613 }
3614
3615 #[test]
3616 fn test_initialize_server_mode() {
3617 let mut connection = GenericConnection::<role::Server, u32>::new(Version::V3_1_1);
3618
3619 connection.initialize(false);
3621
3622 assert!(!connection.is_client);
3624 assert_eq!(connection.publish_send_count, 0);
3625 assert!(connection.publish_send_max.is_none());
3626 assert!(connection.publish_recv_max.is_none());
3627 assert!(!connection.need_store);
3628 }
3629
3630 #[test]
3631 fn test_validate_topic_alias_no_topic_alias_send() {
3632 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3633
3634 let result = connection.validate_topic_alias(Some(1));
3636 assert!(result.is_none());
3637 }
3638
3639 #[test]
3640 fn test_validate_topic_alias_none_input() {
3641 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3642
3643 let result = connection.validate_topic_alias(None);
3645 assert!(result.is_none());
3646 }
3647
3648 #[test]
3649 fn test_validate_topic_alias_range_no_topic_alias_send() {
3650 let connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3651
3652 let result = connection.validate_topic_alias_range(1);
3654 assert!(!result);
3655 }
3656
3657 #[test]
3658 fn test_validate_topic_alias_range_zero() {
3659 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3660
3661 let topic_alias_send = TopicAliasSend::new(10);
3663 connection.topic_alias_send = Some(topic_alias_send);
3664
3665 let result = connection.validate_topic_alias_range(0);
3667 assert!(!result);
3668 }
3669
3670 #[test]
3671 fn test_validate_topic_alias_range_over_max() {
3672 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3673
3674 let topic_alias_send = TopicAliasSend::new(5);
3676 connection.topic_alias_send = Some(topic_alias_send);
3677
3678 let result = connection.validate_topic_alias_range(6);
3680 assert!(!result);
3681 }
3682
3683 #[test]
3684 fn test_validate_topic_alias_range_valid() {
3685 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3686
3687 let topic_alias_send = TopicAliasSend::new(10);
3689 connection.topic_alias_send = Some(topic_alias_send);
3690
3691 assert!(connection.validate_topic_alias_range(1));
3693 assert!(connection.validate_topic_alias_range(5));
3694 assert!(connection.validate_topic_alias_range(10));
3695 }
3696
3697 #[test]
3698 fn test_validate_topic_alias_with_registered_alias() {
3699 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3700
3701 let mut topic_alias_send = TopicAliasSend::new(10);
3703 topic_alias_send.insert_or_update("test/topic", 5);
3704 connection.topic_alias_send = Some(topic_alias_send);
3705
3706 let result = connection.validate_topic_alias(Some(5));
3708 assert_eq!(result, Some("test/topic".to_string()));
3709 }
3710
3711 #[test]
3712 fn test_validate_topic_alias_unregistered_alias() {
3713 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3714
3715 let topic_alias_send = TopicAliasSend::new(10);
3717 connection.topic_alias_send = Some(topic_alias_send);
3718
3719 let result = connection.validate_topic_alias(Some(5));
3721 assert!(result.is_none());
3722 }
3723
3724 #[test]
3725 fn test_validate_maximum_packet_size_within_limit() {
3726 let connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3727
3728 let result = connection.validate_maximum_packet_size_send(1000);
3730 assert!(result);
3731 }
3732
3733 #[test]
3734 fn test_validate_maximum_packet_size_at_limit() {
3735 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3736
3737 connection.maximum_packet_size_send = 1000;
3739
3740 let result = connection.validate_maximum_packet_size_send(1000);
3742 assert!(result);
3743 }
3744
3745 #[test]
3746 fn test_validate_maximum_packet_size_over_limit() {
3747 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3748
3749 connection.maximum_packet_size_send = 1000;
3751
3752 let result = connection.validate_maximum_packet_size_send(1001);
3754 assert!(!result);
3755 }
3756
3757 #[test]
3758 fn test_validate_maximum_packet_size_zero_limit() {
3759 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3760
3761 connection.maximum_packet_size_send = 0;
3763
3764 let result = connection.validate_maximum_packet_size_send(1);
3766 assert!(!result);
3767
3768 let result = connection.validate_maximum_packet_size_send(0);
3770 assert!(result);
3771 }
3772
3773 #[test]
3774 fn test_initialize_clears_state() {
3775 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3776
3777 connection.publish_send_count = 5;
3779 connection.need_store = true;
3780 connection.pid_suback.insert(123);
3781 connection.pid_unsuback.insert(456);
3782
3783 connection.initialize(true);
3785
3786 assert_eq!(connection.publish_send_count, 0);
3788 assert!(!connection.need_store);
3789 assert!(connection.pid_suback.is_empty());
3790 assert!(connection.pid_unsuback.is_empty());
3791 assert!(connection.is_client);
3792 }
3793
3794 #[test]
3795 fn test_remaining_length_to_total_size() {
3796 assert_eq!(remaining_length_to_total_size(0), 2); assert_eq!(remaining_length_to_total_size(127), 129); assert_eq!(remaining_length_to_total_size(128), 131); assert_eq!(remaining_length_to_total_size(16383), 16386); assert_eq!(remaining_length_to_total_size(16384), 16388); assert_eq!(remaining_length_to_total_size(2097151), 2097155); assert_eq!(remaining_length_to_total_size(2097152), 2097157); assert_eq!(remaining_length_to_total_size(268435455), 268435460); }
3812}