1use std::collections::HashSet;
25use std::io::Cursor;
26use std::marker::PhantomData;
27
28use crate::mqtt::connection::event::{GenericEvent, TimerKind};
29use crate::mqtt::connection::GenericStore;
30
31use serde::Serialize;
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
34enum ConnectionStatus {
35 #[serde(rename = "disconnected")]
36 Disconnected,
37 #[serde(rename = "connecting")]
38 Connecting,
39 #[serde(rename = "connected")]
40 Connected,
41}
42use crate::mqtt::connection::packet_builder::{
43 PacketBuildResult, PacketBuilder, PacketData, RawPacket,
44};
45use crate::mqtt::connection::packet_id_manager::PacketIdManager;
46use crate::mqtt::connection::role;
47use crate::mqtt::connection::role::RoleType;
48use crate::mqtt::connection::sendable::Sendable;
49use crate::mqtt::connection::version::*;
50use crate::mqtt::packet::v3_1_1;
51use crate::mqtt::packet::v5_0;
52use crate::mqtt::packet::GenericPacket;
53use crate::mqtt::packet::GenericStorePacket;
54use crate::mqtt::packet::IsPacketId;
55use crate::mqtt::packet::Qos;
56use crate::mqtt::packet::ResponsePacket;
57use crate::mqtt::packet::{Property, TopicAliasRecv, TopicAliasSend};
58use crate::mqtt::prelude::GenericPacketTrait;
59use crate::mqtt::result_code::{
60 ConnectReasonCode, ConnectReturnCode, DisconnectReasonCode, MqttError, PubrecReasonCode,
61};
62
/// Sentinel packet size meaning "no limit has been negotiated".
///
/// Evaluates to `1 + 4 + 128^4` (268_435_461). It is the initial value of both
/// the send and the receive packet size limits, and the value they are reset
/// to when the connection is closed.
const MQTT_PACKET_SIZE_NO_LIMIT: u32 = 1 + 4 + 128_u32.pow(4);
66
/// Converts an MQTT "remaining length" into the total size of the packet.
///
/// The total is one byte, plus the number of bytes needed to encode
/// `remaining_length` (1 to 4, switching at 128, 16_384 and 2_097_152), plus
/// `remaining_length` itself.
fn remaining_length_to_total_size(remaining_length: u32) -> u32 {
    let length_field_size = match remaining_length {
        0..=127 => 1,
        128..=16_383 => 2,
        16_384..=2_097_151 => 3,
        _ => 4,
    };

    1 + length_field_size + remaining_length
}
86
87pub type Event = GenericEvent<u16>;
92
93pub struct GenericConnection<Role, PacketIdType>
136where
137 Role: RoleType,
138 PacketIdType: IsPacketId,
139{
140 _marker: PhantomData<Role>,
141
142 protocol_version: Version,
143
144 pid_man: PacketIdManager<PacketIdType>,
145 pid_suback: HashSet<PacketIdType>,
146 pid_unsuback: HashSet<PacketIdType>,
147 pid_puback: HashSet<PacketIdType>,
148 pid_pubrec: HashSet<PacketIdType>,
149 pid_pubcomp: HashSet<PacketIdType>,
150
151 need_store: bool,
152 store: GenericStore<PacketIdType>,
154
155 offline_publish: bool,
156 auto_pub_response: bool,
157 auto_ping_response: bool,
158
159 auto_map_topic_alias_send: bool,
161 auto_replace_topic_alias_send: bool,
163 topic_alias_recv: Option<TopicAliasRecv>,
165 topic_alias_send: Option<TopicAliasSend>,
167
168 publish_send_max: Option<u16>,
169 publish_recv_max: Option<u16>,
171 publish_send_count: u16,
174
175 publish_recv: HashSet<PacketIdType>,
177
178 maximum_packet_size_send: u32,
180 maximum_packet_size_recv: u32,
182
183 status: ConnectionStatus,
185
186 pingreq_send_interval_ms: Option<u64>,
188 pingreq_recv_timeout_ms: Option<u64>,
190 pingresp_recv_timeout_ms: Option<u64>,
192
193 qos2_publish_handled: HashSet<PacketIdType>,
195 qos2_publish_processing: HashSet<PacketIdType>,
197
198 pingreq_send_set: bool,
200 pingreq_recv_set: bool,
201 pingresp_recv_set: bool,
202
203 packet_builder: PacketBuilder,
204 is_client: bool,
206}
207
208pub type Connection<Role> = GenericConnection<Role, u16>;
221
222impl<Role, PacketIdType> GenericConnection<Role, PacketIdType>
223where
224 Role: RoleType,
225 PacketIdType: IsPacketId,
226{
227 pub fn new(version: Version) -> Self {
250 Self {
251 _marker: PhantomData,
252 protocol_version: version,
253 pid_man: PacketIdManager::new(),
254 pid_suback: HashSet::new(),
255 pid_unsuback: HashSet::new(),
256 pid_puback: HashSet::new(),
257 pid_pubrec: HashSet::new(),
258 pid_pubcomp: HashSet::new(),
259 need_store: false,
260 store: GenericStore::new(),
261 offline_publish: false,
262 auto_pub_response: false,
263 auto_ping_response: false,
264 auto_map_topic_alias_send: false,
265 auto_replace_topic_alias_send: false,
266 topic_alias_recv: None,
267 topic_alias_send: None,
268 publish_send_max: None,
269 publish_recv_max: None,
270 publish_send_count: 0,
271 publish_recv: HashSet::new(),
272 maximum_packet_size_send: MQTT_PACKET_SIZE_NO_LIMIT,
273 maximum_packet_size_recv: MQTT_PACKET_SIZE_NO_LIMIT,
274 status: ConnectionStatus::Disconnected,
275 pingreq_send_interval_ms: None,
276 pingreq_recv_timeout_ms: None,
277 pingresp_recv_timeout_ms: None,
278 qos2_publish_handled: HashSet::new(),
279 qos2_publish_processing: HashSet::new(),
280 pingreq_send_set: false,
281 pingreq_recv_set: false,
282 pingresp_recv_set: false,
283 packet_builder: PacketBuilder::new(),
284 is_client: false,
285 }
286 }
287
288 pub fn checked_send<T>(&mut self, packet: T) -> Vec<GenericEvent<PacketIdType>>
329 where
330 T: Sendable<Role, PacketIdType>,
331 {
332 packet.dispatch_send(self)
334 }
335
336 pub fn send(&mut self, packet: GenericPacket<PacketIdType>) -> Vec<GenericEvent<PacketIdType>> {
381 use std::any::TypeId;
382
383 let role_id = TypeId::of::<Role>();
384 let client_id = TypeId::of::<role::Client>();
385 let server_id = TypeId::of::<role::Server>();
386 let any_id = TypeId::of::<role::Any>();
387
388 let packet_version = packet.protocol_version();
390
391 if self.protocol_version != packet_version {
393 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
394 }
395
396 match packet {
397 GenericPacket::V3_1_1Connect(p) => {
399 if role_id == client_id || role_id == any_id {
400 self.process_send_v3_1_1_connect(p)
401 } else {
402 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
403 }
404 }
405 GenericPacket::V5_0Connect(p) => {
406 if role_id == client_id || role_id == any_id {
407 self.process_send_v5_0_connect(p)
408 } else {
409 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
410 }
411 }
412 GenericPacket::V3_1_1Connack(p) => {
414 if role_id == server_id || role_id == any_id {
415 self.process_send_v3_1_1_connack(p)
416 } else {
417 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
418 }
419 }
420 GenericPacket::V5_0Connack(p) => {
421 if role_id == server_id || role_id == any_id {
422 self.process_send_v5_0_connack(p)
423 } else {
424 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
425 }
426 }
427 GenericPacket::V3_1_1Publish(p) => self.process_send_v3_1_1_publish(p),
429 GenericPacket::V5_0Publish(p) => self.process_send_v5_0_publish(p),
430 GenericPacket::V3_1_1Puback(p) => self.process_send_v3_1_1_puback(p),
432 GenericPacket::V5_0Puback(p) => self.process_send_v5_0_puback(p),
433 GenericPacket::V3_1_1Pubrec(p) => self.process_send_v3_1_1_pubrec(p),
434 GenericPacket::V5_0Pubrec(p) => self.process_send_v5_0_pubrec(p),
435 GenericPacket::V3_1_1Pubrel(p) => self.process_send_v3_1_1_pubrel(p),
436 GenericPacket::V5_0Pubrel(p) => self.process_send_v5_0_pubrel(p),
437 GenericPacket::V3_1_1Pubcomp(p) => self.process_send_v3_1_1_pubcomp(p),
438 GenericPacket::V5_0Pubcomp(p) => self.process_send_v5_0_pubcomp(p),
439 GenericPacket::V3_1_1Subscribe(p) => {
441 if role_id == client_id || role_id == any_id {
442 self.process_send_v3_1_1_subscribe(p)
443 } else {
444 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
445 }
446 }
447 GenericPacket::V5_0Subscribe(p) => {
448 if role_id == client_id || role_id == any_id {
449 self.process_send_v5_0_subscribe(p)
450 } else {
451 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
452 }
453 }
454 GenericPacket::V3_1_1Unsubscribe(p) => {
455 if role_id == client_id || role_id == any_id {
456 self.process_send_v3_1_1_unsubscribe(p)
457 } else {
458 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
459 }
460 }
461 GenericPacket::V5_0Unsubscribe(p) => {
462 if role_id == client_id || role_id == any_id {
463 self.process_send_v5_0_unsubscribe(p)
464 } else {
465 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
466 }
467 }
468 GenericPacket::V3_1_1Suback(p) => {
470 if role_id == server_id || role_id == any_id {
471 self.process_send_v3_1_1_suback(p)
472 } else {
473 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
474 }
475 }
476 GenericPacket::V5_0Suback(p) => {
477 if role_id == server_id || role_id == any_id {
478 self.process_send_v5_0_suback(p)
479 } else {
480 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
481 }
482 }
483 GenericPacket::V3_1_1Unsuback(p) => {
484 if role_id == server_id || role_id == any_id {
485 self.process_send_v3_1_1_unsuback(p)
486 } else {
487 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
488 }
489 }
490 GenericPacket::V5_0Unsuback(p) => {
491 if role_id == server_id || role_id == any_id {
492 self.process_send_v5_0_unsuback(p)
493 } else {
494 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
495 }
496 }
497 GenericPacket::V3_1_1Pingreq(p) => {
499 if role_id == client_id || role_id == any_id {
500 self.process_send_v3_1_1_pingreq(p)
501 } else {
502 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
503 }
504 }
505 GenericPacket::V5_0Pingreq(p) => {
506 if role_id == client_id || role_id == any_id {
507 self.process_send_v5_0_pingreq(p)
508 } else {
509 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
510 }
511 }
512 GenericPacket::V3_1_1Pingresp(p) => {
514 if role_id == server_id || role_id == any_id {
515 self.process_send_v3_1_1_pingresp(p)
516 } else {
517 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
518 }
519 }
520 GenericPacket::V5_0Pingresp(p) => {
521 if role_id == server_id || role_id == any_id {
522 self.process_send_v5_0_pingresp(p)
523 } else {
524 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
525 }
526 }
527 GenericPacket::V3_1_1Disconnect(p) => {
529 if role_id == client_id || role_id == any_id {
530 self.process_send_v3_1_1_disconnect(p)
531 } else {
532 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
533 }
534 }
535 GenericPacket::V5_0Disconnect(p) => self.process_send_v5_0_disconnect(p),
537 GenericPacket::V5_0Auth(p) => self.process_send_v5_0_auth(p),
539 }
540 }
541
542 pub fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
591 let mut events = Vec::new();
592
593 match self.packet_builder.feed(data) {
594 PacketBuildResult::Complete(raw_packet) => {
595 events.extend(self.process_recv_packet(raw_packet));
596 }
597 PacketBuildResult::Incomplete => {}
598 PacketBuildResult::Error(e) => {
599 self.cancel_timers(&mut events);
600 events.push(GenericEvent::RequestClose);
601 events.push(GenericEvent::NotifyError(e));
602 }
603 }
604
605 events
606 }
607
608 pub fn notify_timer_fired(&mut self, kind: TimerKind) -> Vec<GenericEvent<PacketIdType>> {
625 let mut events = Vec::new();
626
627 match kind {
628 TimerKind::PingreqSend => {
629 self.pingreq_send_set = false;
631
632 if self.status == ConnectionStatus::Connected {
634 match self.protocol_version {
635 Version::V3_1_1 => {
636 if let Ok(pingreq) = v3_1_1::Pingreq::builder().build() {
637 events.extend(self.process_send_v3_1_1_pingreq(pingreq));
638 }
639 }
640 Version::V5_0 => {
641 if let Ok(pingreq) = v5_0::Pingreq::builder().build() {
642 events.extend(self.process_send_v5_0_pingreq(pingreq));
643 }
644 }
645 Version::Undetermined => {
646 unreachable!("Protocol version should be set before sending PINGREQ");
647 }
648 }
649 }
650 }
651 TimerKind::PingreqRecv => {
652 self.pingreq_recv_set = false;
654
655 match self.protocol_version {
656 Version::V3_1_1 => {
657 events.push(GenericEvent::RequestClose);
659 }
660 Version::V5_0 => {
661 if self.status == ConnectionStatus::Connected {
663 if let Ok(disconnect) = v5_0::Disconnect::builder()
664 .reason_code(DisconnectReasonCode::KeepAliveTimeout)
665 .build()
666 {
667 events.extend(self.process_send_v5_0_disconnect(disconnect));
668 }
669 }
670 }
671 Version::Undetermined => {
672 unreachable!("Protocol version should be set before receiving PINGREQ");
673 }
674 }
675 }
676 TimerKind::PingrespRecv => {
677 self.pingresp_recv_set = false;
679
680 match self.protocol_version {
681 Version::V3_1_1 => {
682 events.push(GenericEvent::RequestClose);
684 }
685 Version::V5_0 => {
686 if self.status == ConnectionStatus::Connected {
688 if let Ok(disconnect) = v5_0::Disconnect::builder()
689 .reason_code(DisconnectReasonCode::KeepAliveTimeout)
690 .build()
691 {
692 events.extend(self.process_send_v5_0_disconnect(disconnect));
693 }
694 }
695 }
696 Version::Undetermined => {
697 unreachable!("Protocol version should be set before receiving PINGRESP");
698 }
699 }
700 }
701 }
702
703 events
704 }
705
706 pub fn notify_closed(&mut self) -> Vec<GenericEvent<PacketIdType>> {
719 let mut events = Vec::new();
720
721 self.maximum_packet_size_send = MQTT_PACKET_SIZE_NO_LIMIT;
723 self.maximum_packet_size_recv = MQTT_PACKET_SIZE_NO_LIMIT;
724
725 self.status = ConnectionStatus::Disconnected;
727
728 self.topic_alias_send = None;
730 self.topic_alias_recv = None;
731
732 for packet_id in self.pid_suback.drain() {
734 if self.pid_man.is_used_id(packet_id) {
735 self.pid_man.release_id(packet_id);
736 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
737 }
738 }
739
740 for packet_id in self.pid_unsuback.drain() {
742 if self.pid_man.is_used_id(packet_id) {
743 self.pid_man.release_id(packet_id);
744 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
745 }
746 }
747
748 if !self.need_store {
750 self.qos2_publish_processing.clear();
751 self.qos2_publish_handled.clear();
752
753 for packet_id in self.pid_puback.drain() {
755 if self.pid_man.is_used_id(packet_id) {
756 self.pid_man.release_id(packet_id);
757 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
758 }
759 }
760
761 for packet_id in self.pid_pubrec.drain() {
763 if self.pid_man.is_used_id(packet_id) {
764 self.pid_man.release_id(packet_id);
765 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
766 }
767 }
768
769 for packet_id in self.pid_pubcomp.drain() {
771 if self.pid_man.is_used_id(packet_id) {
772 self.pid_man.release_id(packet_id);
773 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
774 }
775 }
776 }
777
778 self.cancel_timers(&mut events);
780
781 events
782 }
783
784 pub fn set_pingreq_send_interval(
797 &mut self,
798 duration_ms: u64,
799 ) -> Vec<GenericEvent<PacketIdType>> {
800 let mut events = Vec::new();
801
802 if duration_ms == 0 {
803 self.pingreq_send_interval_ms = None;
804 self.pingreq_send_set = false;
805 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
806 } else {
807 self.pingreq_send_interval_ms = Some(duration_ms);
808 self.pingreq_send_set = true;
809 events.push(GenericEvent::RequestTimerReset {
810 kind: TimerKind::PingreqSend,
811 duration_ms,
812 });
813 }
814
815 events
816 }
817
818 pub fn get_receive_maximum_vacancy_for_send(&self) -> Option<u16> {
827 self.publish_send_max
829 .map(|max| max.saturating_sub(self.publish_send_count))
830 }
831
832 pub fn set_offline_publish(&mut self, enable: bool) {
841 self.offline_publish = enable;
842 if self.offline_publish {
843 self.need_store = true;
844 }
845 }
846
847 pub fn set_auto_pub_response(&mut self, enable: bool) {
856 self.auto_pub_response = enable;
857 }
858
859 pub fn set_auto_ping_response(&mut self, enable: bool) {
867 self.auto_ping_response = enable;
868 }
869
870 pub fn set_auto_map_topic_alias_send(&mut self, enable: bool) {
882 self.auto_map_topic_alias_send = enable;
883 }
884
885 pub fn set_auto_replace_topic_alias_send(&mut self, enable: bool) {
895 self.auto_replace_topic_alias_send = enable;
896 }
897
898 pub fn set_pingresp_recv_timeout(&mut self, timeout_ms: Option<u64>) {
900 self.pingresp_recv_timeout_ms = timeout_ms;
901 }
902
903 pub fn acquire_packet_id(&mut self) -> Result<PacketIdType, MqttError> {
909 self.pid_man.acquire_unique_id()
910 }
911
912 pub fn register_packet_id(&mut self, packet_id: PacketIdType) -> Result<(), MqttError> {
925 self.pid_man.register_id(packet_id)
926 }
927
928 pub fn release_packet_id(
942 &mut self,
943 packet_id: PacketIdType,
944 ) -> Vec<GenericEvent<PacketIdType>> {
945 let mut events = Vec::new();
946
947 if self.pid_man.is_used_id(packet_id) {
948 self.pid_man.release_id(packet_id);
949 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
950 }
951
952 events
953 }
954
955 pub fn get_qos2_publish_handled(&self) -> HashSet<PacketIdType> {
964 self.qos2_publish_handled.clone()
965 }
966
967 pub fn restore_qos2_publish_handled(&mut self, pids: HashSet<PacketIdType>) {
976 self.qos2_publish_handled = pids;
977 }
978
979 pub fn restore_packets(&mut self, packets: Vec<GenericStorePacket<PacketIdType>>) {
988 for packet in packets {
989 match &packet {
990 GenericStorePacket::V3_1_1Publish(p) => {
991 match p.qos() {
993 Qos::AtLeastOnce => {
994 self.pid_puback.insert(p.packet_id().unwrap());
995 }
996 Qos::ExactlyOnce => {
997 self.pid_pubrec.insert(p.packet_id().unwrap());
998 }
999 _ => {
1000 tracing::warn!("QoS 0 packet found in store, skipping");
1002 continue;
1003 }
1004 }
1005 let packet_id = p.packet_id().unwrap();
1007 if self.pid_man.register_id(packet_id).is_ok() {
1008 if let Err(e) = self.store.add(packet) {
1009 tracing::error!("Failed to add packet to store: {:?}", e);
1010 }
1011 } else {
1012 tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1013 }
1014 }
1015 GenericStorePacket::V5_0Publish(p) => {
1016 match p.qos() {
1018 Qos::AtLeastOnce => {
1019 self.pid_puback.insert(p.packet_id().unwrap());
1020 }
1021 Qos::ExactlyOnce => {
1022 self.pid_pubrec.insert(p.packet_id().unwrap());
1023 }
1024 _ => {
1025 tracing::warn!("QoS 0 packet found in store, skipping");
1027 continue;
1028 }
1029 }
1030 let packet_id = p.packet_id().unwrap();
1032 if self.pid_man.register_id(packet_id).is_ok() {
1033 if let Err(e) = self.store.add(packet) {
1034 tracing::error!("Failed to add packet to store: {:?}", e);
1035 }
1036 } else {
1037 tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1038 }
1039 }
1040 GenericStorePacket::V3_1_1Pubrel(p) => {
1041 self.pid_pubcomp.insert(p.packet_id());
1043 let packet_id = p.packet_id();
1045 if self.pid_man.register_id(packet_id).is_ok() {
1046 if let Err(e) = self.store.add(packet) {
1047 tracing::error!("Failed to add packet to store: {:?}", e);
1048 }
1049 } else {
1050 tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1051 }
1052 }
1053 GenericStorePacket::V5_0Pubrel(p) => {
1054 self.pid_pubcomp.insert(p.packet_id());
1056 let packet_id = p.packet_id();
1058 if self.pid_man.register_id(packet_id).is_ok() {
1059 if let Err(e) = self.store.add(packet) {
1060 tracing::error!("Failed to add packet to store: {:?}", e);
1061 }
1062 } else {
1063 tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1064 }
1065 }
1066 }
1067 }
1068 }
1069
1070 pub fn get_stored_packets(&self) -> Vec<GenericStorePacket<PacketIdType>> {
1079 self.store.get_stored()
1080 }
1081
1082 pub fn get_protocol_version(&self) -> Version {
1088 self.protocol_version
1089 }
1090
1091 pub fn is_publish_processing(&self, packet_id: PacketIdType) -> bool {
1101 self.qos2_publish_processing.contains(&packet_id)
1102 }
1103
1104 pub fn regulate_for_store(
1109 &self,
1110 mut packet: v5_0::GenericPublish<PacketIdType>,
1111 ) -> Result<v5_0::GenericPublish<PacketIdType>, MqttError> {
1112 if packet.topic_name().is_empty() {
1113 if let Some(props) = packet.props() {
1115 if let Some(topic_alias) = Self::get_topic_alias_from_props(props) {
1116 if let Some(ref topic_alias_send) = self.topic_alias_send {
1117 if let Some(topic) = topic_alias_send.peek(topic_alias) {
1118 packet = packet.remove_topic_alias_add_topic(topic.to_string())?;
1120 } else {
1121 return Err(MqttError::PacketNotRegulated);
1122 }
1123 } else {
1124 return Err(MqttError::PacketNotRegulated);
1125 }
1126 } else {
1127 return Err(MqttError::PacketNotRegulated);
1128 }
1129 } else {
1130 return Err(MqttError::PacketNotRegulated);
1131 }
1132 } else {
1133 packet = packet.remove_topic_alias();
1135 }
1136
1137 Ok(packet)
1138 }
1139
1140 fn initialize(&mut self, is_client: bool) {
1154 self.publish_send_max = None;
1155 self.publish_recv_max = None;
1156 self.publish_send_count = 0;
1157 self.topic_alias_send = None;
1158 self.topic_alias_recv = None;
1159 self.publish_recv.clear();
1160 self.qos2_publish_processing.clear();
1161 self.need_store = false;
1162 self.pid_suback.clear();
1163 self.pid_unsuback.clear();
1164 self.is_client = is_client;
1165 }
1166
1167 fn clear_store_related(&mut self) {
1168 self.pid_man.clear();
1169 self.pid_puback.clear();
1170 self.pid_pubrec.clear();
1171 self.pid_pubcomp.clear();
1172 self.store.clear();
1173 }
1174
1175 fn send_stored(&mut self) -> Vec<GenericEvent<PacketIdType>> {
1177 let mut events = Vec::new();
1178 self.store.for_each(|packet| {
1179 if packet.size() > self.maximum_packet_size_send as usize {
1180 let packet_id = packet.packet_id();
1181 self.pid_man.release_id(packet_id);
1182 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1183 return false; }
1185 events.push(GenericEvent::RequestSendPacket {
1186 packet: packet.clone().into(),
1187 release_packet_id_if_send_error: None,
1188 });
1189 true });
1191
1192 events
1193 }
1194
1195 fn validate_topic_alias(&mut self, topic_alias_opt: Option<u16>) -> Option<String> {
1207 let topic_alias = topic_alias_opt?;
1208
1209 if !self.validate_topic_alias_range(topic_alias) {
1210 return None;
1211 }
1212
1213 let topic_alias_send = self.topic_alias_send.as_mut()?;
1214 let topic = topic_alias_send.get(topic_alias)?;
1216
1217 Some(topic.to_string())
1218 }
1219
1220 fn validate_topic_alias_range(&self, topic_alias: u16) -> bool {
1232 let topic_alias_send = match &self.topic_alias_send {
1233 Some(tas) => tas,
1234 None => {
1235 tracing::error!("topic_alias is set but topic_alias_maximum is 0");
1236 return false;
1237 }
1238 };
1239
1240 if topic_alias == 0 || topic_alias > topic_alias_send.max() {
1241 tracing::error!("topic_alias is set but out of range");
1242 return false;
1243 }
1244
1245 true
1246 }
1247
1248 pub(crate) fn process_send_v3_1_1_connect(
1250 &mut self,
1251 packet: v3_1_1::Connect,
1252 ) -> Vec<GenericEvent<PacketIdType>> {
1253 if self.status != ConnectionStatus::Disconnected {
1254 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1255 }
1256 if !self.validate_maximum_packet_size_send(packet.size()) {
1257 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1258 }
1259
1260 let mut events = Vec::new();
1261 self.initialize(true);
1262 self.status = ConnectionStatus::Connecting;
1263
1264 let keep_alive = packet.keep_alive();
1266 if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1267 self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1268 }
1269
1270 if packet.clean_start() {
1272 self.clear_store_related();
1273 } else {
1274 self.need_store = true;
1275 }
1276
1277 self.topic_alias_send = None;
1279
1280 events.push(GenericEvent::RequestSendPacket {
1281 packet: packet.into(),
1282 release_packet_id_if_send_error: None,
1283 });
1284 self.send_post_process(&mut events);
1285
1286 events
1287 }
1288
1289 pub(crate) fn process_send_v5_0_connect(
1291 &mut self,
1292 packet: v5_0::Connect,
1293 ) -> Vec<GenericEvent<PacketIdType>> {
1294 if !self.validate_maximum_packet_size_send(packet.size()) {
1295 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1296 }
1297 if self.status != ConnectionStatus::Disconnected {
1298 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1299 }
1300
1301 let mut events = Vec::new();
1302 self.initialize(true);
1303 self.status = ConnectionStatus::Connecting;
1304
1305 let keep_alive = packet.keep_alive();
1307 if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1308 self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1309 }
1310
1311 if packet.clean_start() {
1313 self.clear_store_related();
1314 }
1315
1316 for prop in packet.props() {
1318 match prop {
1319 Property::TopicAliasMaximum(val) => {
1320 if val.val() != 0 {
1321 self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1322 }
1323 }
1324 Property::ReceiveMaximum(val) => {
1325 debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1326 self.publish_recv_max = Some(val.val());
1327 }
1328 Property::MaximumPacketSize(val) => {
1329 debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1330 self.maximum_packet_size_recv = val.val();
1331 }
1332 Property::SessionExpiryInterval(val) => {
1333 if val.val() != 0 {
1334 self.need_store = true;
1335 }
1336 }
1337 _ => {
1338 }
1340 }
1341 }
1342
1343 events.push(GenericEvent::RequestSendPacket {
1344 packet: packet.into(),
1345 release_packet_id_if_send_error: None,
1346 });
1347 self.send_post_process(&mut events);
1348
1349 events
1350 }
1351
1352 pub(crate) fn process_send_v3_1_1_connack(
1353 &mut self,
1354 packet: v3_1_1::Connack,
1355 ) -> Vec<GenericEvent<PacketIdType>> {
1356 if self.status != ConnectionStatus::Connecting {
1357 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1358 }
1359 let mut events = Vec::new();
1360 if packet.return_code() == ConnectReturnCode::Accepted {
1361 self.status = ConnectionStatus::Connected;
1362 } else {
1363 self.status = ConnectionStatus::Disconnected;
1364 }
1365
1366 events.push(GenericEvent::RequestSendPacket {
1367 packet: packet.into(),
1368 release_packet_id_if_send_error: None,
1369 });
1370 events.extend(self.send_stored());
1371 self.send_post_process(&mut events);
1372
1373 events
1374 }
1375
1376 pub(crate) fn process_send_v5_0_connack(
1377 &mut self,
1378 packet: v5_0::Connack,
1379 ) -> Vec<GenericEvent<PacketIdType>> {
1380 if !self.validate_maximum_packet_size_send(packet.size()) {
1381 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1382 }
1383 if self.status != ConnectionStatus::Connecting {
1384 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1385 }
1386
1387 let mut events = Vec::new();
1388
1389 if packet.reason_code() == ConnectReasonCode::Success {
1390 self.status = ConnectionStatus::Connected;
1391
1392 for prop in packet.props() {
1394 match prop {
1395 Property::TopicAliasMaximum(val) => {
1396 if val.val() != 0 {
1397 self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1398 }
1399 }
1400 Property::ReceiveMaximum(val) => {
1401 debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1402 self.publish_recv_max = Some(val.val());
1403 }
1404 Property::MaximumPacketSize(val) => {
1405 debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1406 self.maximum_packet_size_recv = val.val();
1407 }
1408 _ => {
1409 }
1411 }
1412 }
1413 } else {
1414 self.status = ConnectionStatus::Disconnected;
1415 self.cancel_timers(&mut events);
1416 }
1417
1418 events.push(GenericEvent::RequestSendPacket {
1419 packet: packet.into(),
1420 release_packet_id_if_send_error: None,
1421 });
1422 events.extend(self.send_stored());
1423 self.send_post_process(&mut events);
1424
1425 events
1426 }
1427
1428 pub(crate) fn process_send_v3_1_1_publish(
1429 &mut self,
1430 packet: v3_1_1::GenericPublish<PacketIdType>,
1431 ) -> Vec<GenericEvent<PacketIdType>> {
1432 let mut events = Vec::new();
1433 let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1434
1435 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1436 let packet_id = packet.packet_id().unwrap();
1438 if self.status != ConnectionStatus::Connected
1439 && !self.need_store
1440 && !self.offline_publish
1441 {
1442 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1443 if self.pid_man.is_used_id(packet_id) {
1444 self.pid_man.release_id(packet_id);
1445 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1446 }
1447 return events;
1448 }
1449 if !self.pid_man.is_used_id(packet_id) {
1450 tracing::error!("packet_id {packet_id} must be acquired or registered");
1451 events.push(GenericEvent::NotifyError(
1452 MqttError::PacketIdentifierInvalid,
1453 ));
1454 return events;
1455 }
1456 if self.need_store
1457 && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1458 {
1459 let store_packet = packet.clone().set_dup(true);
1460 self.store.add(store_packet.try_into().unwrap()).unwrap();
1461 } else {
1462 release_packet_id_if_send_error = Some(packet_id);
1463 }
1464 if packet.qos() == Qos::ExactlyOnce {
1465 self.qos2_publish_processing.insert(packet_id);
1466 self.pid_pubrec.insert(packet_id);
1467 } else {
1468 self.pid_puback.insert(packet_id);
1469 }
1470 } else if self.status != ConnectionStatus::Connected {
1471 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1472 return events;
1473 }
1474
1475 if self.status == ConnectionStatus::Connected {
1476 events.push(GenericEvent::RequestSendPacket {
1477 packet: packet.into(),
1478 release_packet_id_if_send_error,
1479 });
1480 }
1481 self.send_post_process(&mut events);
1482
1483 events
1484 }
1485
1486 pub(crate) fn process_send_v5_0_publish(
1487 &mut self,
1488 mut packet: v5_0::GenericPublish<PacketIdType>,
1489 ) -> Vec<GenericEvent<PacketIdType>> {
1490 if !self.validate_maximum_packet_size_send(packet.size()) {
1491 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1492 }
1493
1494 let mut events = Vec::new();
1495 let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1496 let mut topic_alias_validated = false;
1497 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1498 let packet_id = packet.packet_id().unwrap();
1499 if self.status != ConnectionStatus::Connected
1500 && !self.need_store
1501 && !self.offline_publish
1502 {
1503 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1504 if self.pid_man.is_used_id(packet_id) {
1505 self.pid_man.release_id(packet_id);
1506 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1507 }
1508 return events;
1509 }
1510
1511 if !self.pid_man.is_used_id(packet_id) {
1513 tracing::error!("packet_id {packet_id} must be acquired or registered");
1514 events.push(GenericEvent::NotifyError(
1515 MqttError::PacketIdentifierInvalid,
1516 ));
1517 return events;
1518 }
1519
1520 if self.need_store
1521 && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1522 {
1523 let ta_opt = Self::get_topic_alias_from_props_opt(packet.props());
1524 if packet.topic_name().is_empty() {
1525 let topic_opt = self.validate_topic_alias(ta_opt);
1527 if topic_opt.is_none() {
1528 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1529 if self.pid_man.is_used_id(packet_id) {
1530 self.pid_man.release_id(packet_id);
1531 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1532 }
1533 return events;
1534 }
1535 topic_alias_validated = true;
1536 let store_packet = packet
1537 .clone()
1538 .remove_topic_alias_add_topic(topic_opt.unwrap())
1539 .unwrap()
1540 .set_dup(true);
1541 self.store.add(store_packet.try_into().unwrap()).unwrap();
1543 } else {
1544 let store_packet = packet.clone().remove_topic_alias().set_dup(true);
1546 self.store.add(store_packet.try_into().unwrap()).unwrap();
1547 }
1548 } else {
1549 release_packet_id_if_send_error = Some(packet_id);
1550 }
1551 if packet.qos() == Qos::ExactlyOnce {
1552 self.qos2_publish_processing.insert(packet_id);
1553 self.pid_pubrec.insert(packet_id);
1554 } else {
1555 self.pid_puback.insert(packet_id);
1556 }
1557 } else if self.status != ConnectionStatus::Connected {
1558 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1559 return events;
1560 }
1561
1562 let packet_id_opt = packet.packet_id();
1563 let ta_opt = Self::get_topic_alias_from_props_opt(packet.props());
1564 if packet.topic_name().is_empty() {
1565 if !topic_alias_validated && self.validate_topic_alias(ta_opt).is_none() {
1567 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1568 if let Some(packet_id) = packet_id_opt {
1569 if self.pid_man.is_used_id(packet_id) {
1570 self.pid_man.release_id(packet_id);
1571 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1572 }
1573 }
1574 return events;
1575 }
1576 } else if let Some(ta) = ta_opt {
1577 if self.validate_topic_alias_range(ta) {
1579 tracing::trace!(
1580 "topic alias: {} - {} is registered.",
1581 packet.topic_name(),
1582 ta
1583 );
1584 if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1585 topic_alias_send.insert_or_update(packet.topic_name(), ta);
1586 }
1587 } else {
1588 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1589 if let Some(packet_id) = packet_id_opt {
1590 if self.pid_man.is_used_id(packet_id) {
1591 self.pid_man.release_id(packet_id);
1592 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1593 }
1594 }
1595 return events;
1596 }
1597 } else if self.status == ConnectionStatus::Connected {
1598 if self.auto_map_topic_alias_send {
1600 if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1601 if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1602 tracing::trace!(
1603 "topic alias: {} - {} is found.",
1604 packet.topic_name(),
1605 found_ta
1606 );
1607 packet = packet.remove_topic_add_topic_alias(found_ta);
1608 } else {
1609 let lru_ta = topic_alias_send.get_lru_alias();
1610 topic_alias_send.insert_or_update(packet.topic_name(), lru_ta);
1611 packet = packet.remove_topic_add_topic_alias(lru_ta);
1612 }
1613 }
1614 } else if self.auto_replace_topic_alias_send {
1615 if let Some(ref topic_alias_send) = self.topic_alias_send {
1616 if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1617 tracing::trace!(
1618 "topic alias: {} - {} is found.",
1619 packet.topic_name(),
1620 found_ta
1621 );
1622 packet = packet.remove_topic_add_topic_alias(found_ta);
1623 }
1624 }
1625 }
1626 }
1627
1628 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1630 if let Some(max) = self.publish_send_max {
1631 if self.publish_send_count == max {
1632 events.push(GenericEvent::NotifyError(MqttError::ReceiveMaximumExceeded));
1633 if let Some(packet_id) = packet_id_opt {
1634 if self.pid_man.is_used_id(packet_id) {
1635 self.pid_man.release_id(packet_id);
1636 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1637 }
1638 }
1639 return events;
1640 }
1641 self.publish_send_count += 1;
1642 }
1643 }
1644
1645 if self.status == ConnectionStatus::Connected {
1646 events.push(GenericEvent::RequestSendPacket {
1647 packet: packet.into(),
1648 release_packet_id_if_send_error,
1649 });
1650 }
1651 self.send_post_process(&mut events);
1652
1653 events
1654 }
1655
1656 pub(crate) fn process_send_v3_1_1_puback(
1657 &mut self,
1658 packet: v3_1_1::GenericPuback<PacketIdType>,
1659 ) -> Vec<GenericEvent<PacketIdType>> {
1660 if self.status != ConnectionStatus::Connected {
1661 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1662 }
1663 let mut events = Vec::new();
1664
1665 events.push(GenericEvent::RequestSendPacket {
1666 packet: packet.into(),
1667 release_packet_id_if_send_error: None,
1668 });
1669 self.send_post_process(&mut events);
1670
1671 events
1672 }
1673
1674 pub(crate) fn process_send_v5_0_puback(
1675 &mut self,
1676 packet: v5_0::GenericPuback<PacketIdType>,
1677 ) -> Vec<GenericEvent<PacketIdType>> {
1678 if !self.validate_maximum_packet_size_send(packet.size()) {
1679 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1680 }
1681 if self.status != ConnectionStatus::Connected {
1682 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1683 }
1684
1685 let mut events = Vec::new();
1686 self.publish_recv.remove(&packet.packet_id());
1687
1688 events.push(GenericEvent::RequestSendPacket {
1689 packet: packet.into(),
1690 release_packet_id_if_send_error: None,
1691 });
1692 self.send_post_process(&mut events);
1693
1694 events
1695 }
1696
1697 pub(crate) fn process_send_v3_1_1_pubrec(
1698 &mut self,
1699 packet: v3_1_1::GenericPubrec<PacketIdType>,
1700 ) -> Vec<GenericEvent<PacketIdType>> {
1701 if self.status != ConnectionStatus::Connected {
1702 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1703 }
1704 let mut events = Vec::new();
1705
1706 events.push(GenericEvent::RequestSendPacket {
1707 packet: packet.into(),
1708 release_packet_id_if_send_error: None,
1709 });
1710 self.send_post_process(&mut events);
1711
1712 events
1713 }
1714
1715 pub(crate) fn process_send_v5_0_pubrec(
1716 &mut self,
1717 packet: v5_0::GenericPubrec<PacketIdType>,
1718 ) -> Vec<GenericEvent<PacketIdType>> {
1719 if !self.validate_maximum_packet_size_send(packet.size()) {
1720 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1721 }
1722 if self.status != ConnectionStatus::Connected {
1723 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1724 }
1725
1726 let mut events = Vec::new();
1727 let packet_id = packet.packet_id();
1728
1729 if let Some(rc) = packet.reason_code() {
1730 if rc.is_failure() {
1731 self.publish_recv.remove(&packet_id);
1732 self.qos2_publish_handled.remove(&packet_id);
1733 }
1734 }
1735
1736 events.push(GenericEvent::RequestSendPacket {
1737 packet: packet.into(),
1738 release_packet_id_if_send_error: None,
1739 });
1740 self.send_post_process(&mut events);
1741
1742 events
1743 }
1744
1745 pub(crate) fn process_send_v3_1_1_pubrel(
1746 &mut self,
1747 packet: v3_1_1::GenericPubrel<PacketIdType>,
1748 ) -> Vec<GenericEvent<PacketIdType>> {
1749 if self.status != ConnectionStatus::Connected && !self.need_store {
1750 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1751 }
1752 let mut events = Vec::new();
1753 let packet_id = packet.packet_id();
1754 if !self.pid_man.is_used_id(packet_id) {
1755 tracing::error!("packet_id {packet_id} must be acquired or registered");
1756 events.push(GenericEvent::NotifyError(
1757 MqttError::PacketIdentifierInvalid,
1758 ));
1759 return events;
1760 }
1761 if self.need_store {
1762 self.store.add(packet.clone().try_into().unwrap()).unwrap();
1763 }
1764
1765 if self.status == ConnectionStatus::Connected {
1766 self.pid_pubcomp.insert(packet_id);
1767 events.push(GenericEvent::RequestSendPacket {
1768 packet: packet.into(),
1769 release_packet_id_if_send_error: None,
1770 });
1771 }
1772 self.send_post_process(&mut events);
1773
1774 events
1775 }
1776
1777 pub(crate) fn process_send_v5_0_pubrel(
1778 &mut self,
1779 packet: v5_0::GenericPubrel<PacketIdType>,
1780 ) -> Vec<GenericEvent<PacketIdType>> {
1781 if !self.validate_maximum_packet_size_send(packet.size()) {
1782 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1783 }
1784 if self.status != ConnectionStatus::Connected && !self.need_store {
1785 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1786 }
1787
1788 let mut events = Vec::new();
1789 let packet_id = packet.packet_id();
1790 if !self.pid_man.is_used_id(packet_id) {
1791 tracing::error!("packet_id {packet_id} must be acquired or registered");
1792 events.push(GenericEvent::NotifyError(
1793 MqttError::PacketIdentifierInvalid,
1794 ));
1795 return events;
1796 }
1797 if self.need_store {
1798 self.store.add(packet.clone().try_into().unwrap()).unwrap();
1799 }
1800
1801 if self.status == ConnectionStatus::Connected {
1802 self.pid_pubcomp.insert(packet_id);
1803 events.push(GenericEvent::RequestSendPacket {
1804 packet: packet.into(),
1805 release_packet_id_if_send_error: None,
1806 });
1807 }
1808 self.send_post_process(&mut events);
1809
1810 events
1811 }
1812
1813 pub(crate) fn process_send_v3_1_1_pubcomp(
1814 &mut self,
1815 packet: v3_1_1::GenericPubcomp<PacketIdType>,
1816 ) -> Vec<GenericEvent<PacketIdType>> {
1817 if self.status != ConnectionStatus::Connected {
1818 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1819 }
1820 let mut events = Vec::new();
1821
1822 events.push(GenericEvent::RequestSendPacket {
1823 packet: packet.into(),
1824 release_packet_id_if_send_error: None,
1825 });
1826 self.send_post_process(&mut events);
1827
1828 events
1829 }
1830
1831 pub(crate) fn process_send_v5_0_pubcomp(
1832 &mut self,
1833 packet: v5_0::GenericPubcomp<PacketIdType>,
1834 ) -> Vec<GenericEvent<PacketIdType>> {
1835 if !self.validate_maximum_packet_size_send(packet.size()) {
1836 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1837 }
1838 if self.status != ConnectionStatus::Connected {
1839 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1840 }
1841
1842 let mut events = Vec::new();
1843 self.publish_recv.remove(&packet.packet_id());
1844
1845 events.push(GenericEvent::RequestSendPacket {
1846 packet: packet.into(),
1847 release_packet_id_if_send_error: None,
1848 });
1849 self.send_post_process(&mut events);
1850
1851 events
1852 }
1853
1854 pub(crate) fn process_send_v3_1_1_subscribe(
1855 &mut self,
1856 packet: v3_1_1::GenericSubscribe<PacketIdType>,
1857 ) -> Vec<GenericEvent<PacketIdType>> {
1858 let mut events = Vec::new();
1859 let packet_id = packet.packet_id();
1860 if self.status != ConnectionStatus::Connected {
1861 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1862 if self.pid_man.is_used_id(packet_id) {
1863 self.pid_man.release_id(packet_id);
1864 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1865 }
1866 return events;
1867 }
1868 if !self.pid_man.is_used_id(packet_id) {
1869 tracing::error!("packet_id {packet_id} must be acquired or registered");
1870 events.push(GenericEvent::NotifyError(
1871 MqttError::PacketIdentifierInvalid,
1872 ));
1873 return events;
1874 }
1875 self.pid_suback.insert(packet_id);
1876
1877 events.push(GenericEvent::RequestSendPacket {
1878 packet: packet.into(),
1879 release_packet_id_if_send_error: Some(packet_id),
1880 });
1881 self.send_post_process(&mut events);
1882
1883 events
1884 }
1885
1886 pub(crate) fn process_send_v5_0_subscribe(
1887 &mut self,
1888 packet: v5_0::GenericSubscribe<PacketIdType>,
1889 ) -> Vec<GenericEvent<PacketIdType>> {
1890 if !self.validate_maximum_packet_size_send(packet.size()) {
1891 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1892 }
1893
1894 let mut events = Vec::new();
1895 let packet_id = packet.packet_id();
1896 if self.status != ConnectionStatus::Connected {
1897 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1898 if self.pid_man.is_used_id(packet_id) {
1899 self.pid_man.release_id(packet_id);
1900 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1901 }
1902 return events;
1903 }
1904 if !self.pid_man.is_used_id(packet_id) {
1905 tracing::error!("packet_id {packet_id} must be acquired or registered");
1906 events.push(GenericEvent::NotifyError(
1907 MqttError::PacketIdentifierInvalid,
1908 ));
1909 return events;
1910 }
1911 self.pid_suback.insert(packet_id);
1912
1913 events.push(GenericEvent::RequestSendPacket {
1914 packet: packet.into(),
1915 release_packet_id_if_send_error: Some(packet_id),
1916 });
1917 self.send_post_process(&mut events);
1918
1919 events
1920 }
1921
1922 pub(crate) fn process_send_v3_1_1_suback(
1923 &mut self,
1924 packet: v3_1_1::GenericSuback<PacketIdType>,
1925 ) -> Vec<GenericEvent<PacketIdType>> {
1926 if self.status != ConnectionStatus::Connected {
1927 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1928 }
1929 let mut events = Vec::new();
1930 events.push(GenericEvent::RequestSendPacket {
1931 packet: packet.into(),
1932 release_packet_id_if_send_error: None,
1933 });
1934 self.send_post_process(&mut events);
1935
1936 events
1937 }
1938
1939 pub(crate) fn process_send_v5_0_suback(
1940 &mut self,
1941 packet: v5_0::GenericSuback<PacketIdType>,
1942 ) -> Vec<GenericEvent<PacketIdType>> {
1943 if !self.validate_maximum_packet_size_send(packet.size()) {
1944 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1945 }
1946 if self.status != ConnectionStatus::Connected {
1947 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1948 }
1949
1950 let mut events = Vec::new();
1951 events.push(GenericEvent::RequestSendPacket {
1952 packet: packet.into(),
1953 release_packet_id_if_send_error: None,
1954 });
1955 self.send_post_process(&mut events);
1956
1957 events
1958 }
1959
1960 pub(crate) fn process_send_v3_1_1_unsubscribe(
1961 &mut self,
1962 packet: v3_1_1::GenericUnsubscribe<PacketIdType>,
1963 ) -> Vec<GenericEvent<PacketIdType>> {
1964 let mut events = Vec::new();
1965 let packet_id = packet.packet_id();
1966 if self.status != ConnectionStatus::Connected {
1967 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1968 if self.pid_man.is_used_id(packet_id) {
1969 self.pid_man.release_id(packet_id);
1970 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1971 }
1972 return events;
1973 }
1974 if !self.pid_man.is_used_id(packet_id) {
1975 tracing::error!("packet_id {packet_id} must be acquired or registered");
1976 events.push(GenericEvent::NotifyError(
1977 MqttError::PacketIdentifierInvalid,
1978 ));
1979 return events;
1980 }
1981 self.pid_unsuback.insert(packet_id);
1982
1983 events.push(GenericEvent::RequestSendPacket {
1984 packet: packet.into(),
1985 release_packet_id_if_send_error: Some(packet_id),
1986 });
1987 self.send_post_process(&mut events);
1988
1989 events
1990 }
1991
1992 pub(crate) fn process_send_v5_0_unsubscribe(
1993 &mut self,
1994 packet: v5_0::GenericUnsubscribe<PacketIdType>,
1995 ) -> Vec<GenericEvent<PacketIdType>> {
1996 if !self.validate_maximum_packet_size_send(packet.size()) {
1997 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1998 }
1999
2000 let mut events = Vec::new();
2001 let packet_id = packet.packet_id();
2002 if self.status != ConnectionStatus::Connected {
2003 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
2004 if self.pid_man.is_used_id(packet_id) {
2005 self.pid_man.release_id(packet_id);
2006 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2007 }
2008 return events;
2009 }
2010 if !self.pid_man.is_used_id(packet_id) {
2011 tracing::error!("packet_id {packet_id} must be acquired or registered");
2012 events.push(GenericEvent::NotifyError(
2013 MqttError::PacketIdentifierInvalid,
2014 ));
2015 return events;
2016 }
2017 self.pid_unsuback.insert(packet_id);
2018
2019 events.push(GenericEvent::RequestSendPacket {
2020 packet: packet.into(),
2021 release_packet_id_if_send_error: Some(packet_id),
2022 });
2023 self.send_post_process(&mut events);
2024
2025 events
2026 }
2027
2028 pub(crate) fn process_send_v3_1_1_unsuback(
2029 &mut self,
2030 packet: v3_1_1::GenericUnsuback<PacketIdType>,
2031 ) -> Vec<GenericEvent<PacketIdType>> {
2032 if self.status != ConnectionStatus::Connected {
2033 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2034 }
2035 let mut events = Vec::new();
2036 events.push(GenericEvent::RequestSendPacket {
2037 packet: packet.into(),
2038 release_packet_id_if_send_error: None,
2039 });
2040 self.send_post_process(&mut events);
2041
2042 events
2043 }
2044
2045 pub(crate) fn process_send_v5_0_unsuback(
2046 &mut self,
2047 packet: v5_0::GenericUnsuback<PacketIdType>,
2048 ) -> Vec<GenericEvent<PacketIdType>> {
2049 if !self.validate_maximum_packet_size_send(packet.size()) {
2050 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2051 }
2052 if self.status != ConnectionStatus::Connected {
2053 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2054 }
2055
2056 let mut events = Vec::new();
2057 events.push(GenericEvent::RequestSendPacket {
2058 packet: packet.into(),
2059 release_packet_id_if_send_error: None,
2060 });
2061 self.send_post_process(&mut events);
2062
2063 events
2064 }
2065
2066 pub(crate) fn process_send_v3_1_1_pingreq(
2067 &mut self,
2068 packet: v3_1_1::Pingreq,
2069 ) -> Vec<GenericEvent<PacketIdType>> {
2070 if self.status != ConnectionStatus::Connected {
2071 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2072 }
2073 let mut events = Vec::new();
2074 events.push(GenericEvent::RequestSendPacket {
2075 packet: packet.into(),
2076 release_packet_id_if_send_error: None,
2077 });
2078 if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2079 self.pingreq_send_set = true;
2080 events.push(GenericEvent::RequestTimerReset {
2081 kind: TimerKind::PingrespRecv,
2082 duration_ms: timeout_ms,
2083 });
2084 }
2085 self.send_post_process(&mut events);
2086
2087 events
2088 }
2089
2090 pub(crate) fn process_send_v5_0_pingreq(
2091 &mut self,
2092 packet: v5_0::Pingreq,
2093 ) -> Vec<GenericEvent<PacketIdType>> {
2094 if !self.validate_maximum_packet_size_send(packet.size()) {
2095 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2096 }
2097 if self.status != ConnectionStatus::Connected {
2098 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2099 }
2100
2101 let mut events = Vec::new();
2102 events.push(GenericEvent::RequestSendPacket {
2103 packet: packet.into(),
2104 release_packet_id_if_send_error: None,
2105 });
2106 if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2107 self.pingreq_send_set = true;
2108 events.push(GenericEvent::RequestTimerReset {
2109 kind: TimerKind::PingrespRecv,
2110 duration_ms: timeout_ms,
2111 });
2112 }
2113 self.send_post_process(&mut events);
2114
2115 events
2116 }
2117
2118 pub(crate) fn process_send_v3_1_1_pingresp(
2119 &mut self,
2120 packet: v3_1_1::Pingresp,
2121 ) -> Vec<GenericEvent<PacketIdType>> {
2122 if self.status != ConnectionStatus::Connected {
2123 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2124 }
2125 let mut events = Vec::new();
2126 events.push(GenericEvent::RequestSendPacket {
2127 packet: packet.into(),
2128 release_packet_id_if_send_error: None,
2129 });
2130 self.send_post_process(&mut events);
2131
2132 events
2133 }
2134
2135 pub(crate) fn process_send_v5_0_pingresp(
2136 &mut self,
2137 packet: v5_0::Pingresp,
2138 ) -> Vec<GenericEvent<PacketIdType>> {
2139 if !self.validate_maximum_packet_size_send(packet.size()) {
2140 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2141 }
2142 if self.status != ConnectionStatus::Connected {
2143 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2144 }
2145
2146 let mut events = Vec::new();
2147 events.push(GenericEvent::RequestSendPacket {
2148 packet: packet.into(),
2149 release_packet_id_if_send_error: None,
2150 });
2151 self.send_post_process(&mut events);
2152
2153 events
2154 }
2155
2156 pub(crate) fn process_send_v3_1_1_disconnect(
2157 &mut self,
2158 packet: v3_1_1::Disconnect,
2159 ) -> Vec<GenericEvent<PacketIdType>> {
2160 if self.status != ConnectionStatus::Connected {
2161 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2162 }
2163 let mut events = Vec::new();
2164 self.status = ConnectionStatus::Disconnected;
2165 self.cancel_timers(&mut events);
2166 events.push(GenericEvent::RequestSendPacket {
2167 packet: packet.into(),
2168 release_packet_id_if_send_error: None,
2169 });
2170 events.push(GenericEvent::RequestClose);
2171
2172 events
2173 }
2174
2175 pub(crate) fn process_send_v5_0_disconnect(
2176 &mut self,
2177 packet: v5_0::Disconnect,
2178 ) -> Vec<GenericEvent<PacketIdType>> {
2179 if !self.validate_maximum_packet_size_send(packet.size()) {
2180 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2181 }
2182 if self.status != ConnectionStatus::Connected {
2183 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2184 }
2185
2186 let mut events = Vec::new();
2187 self.status = ConnectionStatus::Disconnected;
2188 self.cancel_timers(&mut events);
2189 events.push(GenericEvent::RequestSendPacket {
2190 packet: packet.into(),
2191 release_packet_id_if_send_error: None,
2192 });
2193 events.push(GenericEvent::RequestClose);
2194
2195 events
2196 }
2197
2198 pub(crate) fn process_send_v5_0_auth(
2199 &mut self,
2200 packet: v5_0::Auth,
2201 ) -> Vec<GenericEvent<PacketIdType>> {
2202 if !self.validate_maximum_packet_size_send(packet.size()) {
2203 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2204 }
2205 if self.status == ConnectionStatus::Disconnected {
2206 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2207 }
2208
2209 let mut events = Vec::new();
2210 events.push(GenericEvent::RequestSendPacket {
2211 packet: packet.into(),
2212 release_packet_id_if_send_error: None,
2213 });
2214 self.send_post_process(&mut events);
2215
2216 events
2217 }
2218
2219 fn send_post_process(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
2220 if self.is_client {
2221 if let Some(timeout_ms) = self.pingreq_send_interval_ms {
2222 self.pingreq_send_set = true;
2223 events.push(GenericEvent::RequestTimerReset {
2224 kind: TimerKind::PingreqSend,
2225 duration_ms: timeout_ms,
2226 });
2227 }
2228 }
2229 }
2230
2231 fn validate_maximum_packet_size_send(&self, size: usize) -> bool {
2232 if size > self.maximum_packet_size_send as usize {
2233 tracing::error!("packet size over maximum_packet_size for sending");
2234 return false;
2235 }
2236 true
2237 }
2238
2239 fn process_recv_packet(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
2240 let mut events = Vec::new();
2241
2242 let total_size = remaining_length_to_total_size(raw_packet.remaining_length());
2244 if total_size > self.maximum_packet_size_recv {
2245 let disconnect_packet = v5_0::Disconnect::builder()
2251 .reason_code(DisconnectReasonCode::PacketTooLarge)
2252 .build()
2253 .unwrap();
2254 events.extend(self.process_send_v5_0_disconnect(disconnect_packet));
2256 events.push(GenericEvent::NotifyError(MqttError::PacketTooLarge));
2257 return events;
2258 }
2259
2260 let packet_type = raw_packet.packet_type();
2261 let _flags = raw_packet.flags();
2262 match self.protocol_version {
2263 Version::V3_1_1 => {
2264 match packet_type {
2265 1 => {
2266 events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2268 }
2269 2 => {
2270 events.extend(self.process_recv_v3_1_1_connack(raw_packet));
2272 }
2273 3 => {
2274 events.extend(self.process_recv_v3_1_1_publish(raw_packet));
2276 }
2277 4 => {
2278 events.extend(self.process_recv_v3_1_1_puback(raw_packet));
2280 }
2281 5 => {
2282 events.extend(self.process_recv_v3_1_1_pubrec(raw_packet));
2284 }
2285 6 => {
2286 events.extend(self.process_recv_v3_1_1_pubrel(raw_packet));
2288 }
2289 7 => {
2290 events.extend(self.process_recv_v3_1_1_pubcomp(raw_packet));
2292 }
2293 8 => {
2294 events.extend(self.process_recv_v3_1_1_subscribe(raw_packet));
2296 }
2297 9 => {
2298 events.extend(self.process_recv_v3_1_1_suback(raw_packet));
2300 }
2301 10 => {
2302 events.extend(self.process_recv_v3_1_1_unsubscribe(raw_packet));
2304 }
2305 11 => {
2306 events.extend(self.process_recv_v3_1_1_unsuback(raw_packet));
2308 }
2309 12 => {
2310 events.extend(self.process_recv_v3_1_1_pingreq(raw_packet));
2312 }
2313 13 => {
2314 events.extend(self.process_recv_v3_1_1_pingresp(raw_packet));
2316 }
2317 14 => {
2318 events.extend(self.process_recv_v3_1_1_disconnect(raw_packet));
2320 }
2321 _ => {
2323 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2324 }
2325 }
2326 }
2327 Version::V5_0 => {
2328 match packet_type {
2329 1 => {
2330 events.extend(self.process_recv_v5_0_connect(raw_packet));
2332 }
2333 2 => {
2334 events.extend(self.process_recv_v5_0_connack(raw_packet));
2336 }
2337 3 => {
2338 events.extend(self.process_recv_v5_0_publish(raw_packet));
2340 }
2341 4 => {
2342 events.extend(self.process_recv_v5_0_puback(raw_packet));
2344 }
2345 5 => {
2346 events.extend(self.process_recv_v5_0_pubrec(raw_packet));
2348 }
2349 6 => {
2350 events.extend(self.process_recv_v5_0_pubrel(raw_packet));
2352 }
2353 7 => {
2354 events.extend(self.process_recv_v5_0_pubcomp(raw_packet));
2356 }
2357 8 => {
2358 events.extend(self.process_recv_v5_0_subscribe(raw_packet));
2360 }
2361 9 => {
2362 events.extend(self.process_recv_v5_0_suback(raw_packet));
2364 }
2365 10 => {
2366 events.extend(self.process_recv_v5_0_unsubscribe(raw_packet));
2368 }
2369 11 => {
2370 events.extend(self.process_recv_v5_0_unsuback(raw_packet));
2372 }
2373 12 => {
2374 events.extend(self.process_recv_v5_0_pingreq(raw_packet));
2376 }
2377 13 => {
2378 events.extend(self.process_recv_v5_0_pingresp(raw_packet));
2380 }
2381 14 => {
2382 events.extend(self.process_recv_v5_0_disconnect(raw_packet));
2384 }
2385 15 => {
2386 events.extend(self.process_recv_v5_0_auth(raw_packet));
2388 }
2389 _ => {
2391 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2392 }
2393 }
2394 }
2395 Version::Undetermined => {
2396 match packet_type {
2397 1 => {
2398 if raw_packet.remaining_length() < 7 {
2400 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2401 return events;
2402 }
2403 match raw_packet.data_as_slice()[6] {
2404 4 => {
2406 self.protocol_version = Version::V3_1_1;
2407 events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2408 }
2409 5 => {
2410 self.protocol_version = Version::V5_0;
2411 events.extend(self.process_recv_v5_0_connect(raw_packet));
2412 }
2413 _ => {
2414 events.push(GenericEvent::NotifyError(
2415 MqttError::UnsupportedProtocolVersion,
2416 ));
2417 }
2418 }
2419 }
2420 _ => {
2421 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2422 }
2423 }
2424 }
2425 }
2426
2427 events
2428 }
2429
2430 fn process_recv_v3_1_1_connect(
2431 &mut self,
2432 raw_packet: RawPacket,
2433 ) -> Vec<GenericEvent<PacketIdType>> {
2434 let mut events = Vec::new();
2435 match v3_1_1::Connect::parse(raw_packet.data_as_slice()) {
2436 Ok((packet, _)) => {
2437 if self.status != ConnectionStatus::Disconnected {
2438 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2439 return events;
2440 }
2441 self.initialize(false);
2442 self.status = ConnectionStatus::Connecting;
2443 if packet.keep_alive() > 0 {
2444 self.pingreq_recv_timeout_ms =
2445 Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2446 }
2447 if packet.clean_session() {
2448 self.clear_store_related();
2449 } else {
2450 self.need_store = true;
2451 }
2452 events.extend(self.refresh_pingreq_recv());
2453 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2454 }
2455 Err(e) => {
2456 if self.status == ConnectionStatus::Disconnected {
2457 self.status = ConnectionStatus::Connecting;
2458 let rc = match e {
2459 MqttError::ClientIdentifierNotValid => {
2460 ConnectReturnCode::IdentifierRejected
2461 }
2462 MqttError::BadUserNameOrPassword => {
2463 ConnectReturnCode::BadUserNameOrPassword
2464 }
2465 MqttError::UnsupportedProtocolVersion => {
2466 ConnectReturnCode::UnacceptableProtocolVersion
2467 }
2468 _ => ConnectReturnCode::NotAuthorized, };
2470 let connack = v3_1_1::Connack::builder().return_code(rc).build().unwrap();
2471 let connack_events = self.process_send_v3_1_1_connack(connack);
2472 events.extend(connack_events);
2473 } else {
2474 events.push(GenericEvent::RequestClose);
2475 }
2476 events.push(GenericEvent::NotifyError(e));
2477 }
2478 }
2479
2480 events
2481 }
2482
2483 fn process_recv_v5_0_connect(
2484 &mut self,
2485 raw_packet: RawPacket,
2486 ) -> Vec<GenericEvent<PacketIdType>> {
2487 let mut events = Vec::new();
2488 match v5_0::Connect::parse(raw_packet.data_as_slice()) {
2489 Ok((packet, _)) => {
2490 if self.status != ConnectionStatus::Disconnected {
2491 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2492 return events;
2493 }
2494 self.initialize(false);
2495 self.status = ConnectionStatus::Connecting;
2496 if packet.keep_alive() > 0 {
2497 self.pingreq_recv_timeout_ms =
2498 Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2499 }
2500 if packet.clean_start() {
2501 self.clear_store_related();
2502 }
2503 packet.props().iter().for_each(|prop| match prop {
2504 Property::TopicAliasMaximum(p) => {
2505 self.topic_alias_send = Some(TopicAliasSend::new(p.val()));
2506 }
2507 Property::ReceiveMaximum(p) => {
2508 self.publish_send_max = Some(p.val());
2509 }
2510 Property::MaximumPacketSize(p) => {
2511 self.maximum_packet_size_send = p.val();
2512 }
2513 Property::SessionExpiryInterval(p) => {
2514 if p.val() != 0 {
2515 self.need_store = true;
2516 }
2517 }
2518 _ => {}
2519 });
2520 events.extend(self.refresh_pingreq_recv());
2521 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2522 }
2523 Err(e) => {
2524 if self.status == ConnectionStatus::Disconnected {
2525 self.status = ConnectionStatus::Connecting;
2526 let rc = match e {
2527 MqttError::ClientIdentifierNotValid => {
2528 ConnectReasonCode::ClientIdentifierNotValid
2529 }
2530 MqttError::BadUserNameOrPassword => {
2531 ConnectReasonCode::BadAuthenticationMethod
2532 }
2533 MqttError::UnsupportedProtocolVersion => {
2534 ConnectReasonCode::UnsupportedProtocolVersion
2535 }
2536 _ => ConnectReasonCode::UnspecifiedError,
2537 };
2538 let connack = v5_0::Connack::builder().reason_code(rc).build().unwrap();
2539 let connack_events = self.process_send_v5_0_connack(connack);
2540 events.extend(connack_events);
2541 } else {
2542 let disconnect = v5_0::Disconnect::builder()
2543 .reason_code(DisconnectReasonCode::ProtocolError)
2544 .build()
2545 .unwrap();
2546 let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2547 events.extend(disconnect_events);
2548 }
2549 events.push(GenericEvent::NotifyError(e));
2550 }
2551 }
2552
2553 events
2554 }
2555
2556 fn process_recv_v3_1_1_connack(
2557 &mut self,
2558 raw_packet: RawPacket,
2559 ) -> Vec<GenericEvent<PacketIdType>> {
2560 let mut events = Vec::new();
2561
2562 match v3_1_1::Connack::parse(raw_packet.data_as_slice()) {
2563 Ok((packet, _consumed)) => {
2564 if packet.return_code() == ConnectReturnCode::Accepted {
2565 self.status = ConnectionStatus::Connected;
2566 if packet.session_present() {
2567 events.extend(self.send_stored());
2568 } else {
2569 self.clear_store_related();
2570 }
2571 }
2572 events.push(GenericEvent::NotifyPacketReceived(
2573 GenericPacket::V3_1_1Connack(packet),
2574 ));
2575 }
2576 Err(e) => {
2577 events.push(GenericEvent::RequestClose);
2578 events.push(GenericEvent::NotifyError(e));
2579 }
2580 }
2581
2582 events
2583 }
2584
2585 fn process_recv_v5_0_connack(
2586 &mut self,
2587 raw_packet: RawPacket,
2588 ) -> Vec<GenericEvent<PacketIdType>> {
2589 let mut events = Vec::new();
2590
2591 match v5_0::Connack::parse(raw_packet.data_as_slice()) {
2592 Ok((packet, _consumed)) => {
2593 if packet.reason_code() == ConnectReasonCode::Success {
2594 self.status = ConnectionStatus::Connected;
2595
2596 for prop in packet.props() {
2598 match prop {
2599 Property::TopicAliasMaximum(val) => {
2600 if val.val() > 0 {
2601 self.topic_alias_send = Some(TopicAliasSend::new(val.val()));
2602 }
2603 }
2604 Property::ReceiveMaximum(val) => {
2605 assert!(val.val() != 0);
2606 self.publish_send_max = Some(val.val());
2607 }
2608 Property::MaximumPacketSize(val) => {
2609 assert!(val.val() != 0);
2610 self.maximum_packet_size_send = val.val();
2611 }
2612 Property::ServerKeepAlive(val) => {
2613 let timeout_ms = (val.val() as u64) * 1000;
2615 self.pingreq_send_interval_ms = Some(timeout_ms);
2616 }
2617 _ => {
2618 }
2620 }
2621 }
2622
2623 if packet.session_present() {
2624 events.extend(self.send_stored());
2625 } else {
2626 self.clear_store_related();
2627 }
2628 }
2629 events.push(GenericEvent::NotifyPacketReceived(
2630 GenericPacket::V5_0Connack(packet),
2631 ));
2632 }
2633 Err(e) => {
2634 if self.status == ConnectionStatus::Connected {
2635 let disconnect = v5_0::Disconnect::builder()
2636 .reason_code(e.into())
2637 .build()
2638 .unwrap();
2639 let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2640 events.extend(disconnect_events);
2641 }
2642 events.push(GenericEvent::NotifyError(e));
2643 }
2644 }
2645
2646 events
2647 }
2648
2649 fn process_recv_v3_1_1_publish(
2650 &mut self,
2651 raw_packet: RawPacket,
2652 ) -> Vec<GenericEvent<PacketIdType>> {
2653 let mut events = Vec::new();
2654
2655 let flags = raw_packet.flags();
2656 match &raw_packet.data {
2657 PacketData::Publish(arc) => {
2658 match v3_1_1::GenericPublish::parse(flags, arc.clone()) {
2659 Ok((packet, _consumed)) => {
2660 match packet.qos() {
2661 Qos::AtMostOnce => {
2662 events.extend(self.refresh_pingreq_recv());
2663 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2664 }
2665 Qos::AtLeastOnce => {
2666 let packet_id = packet.packet_id().unwrap();
2667 if self.status == ConnectionStatus::Connected
2668 && self.auto_pub_response
2669 {
2670 let puback = v3_1_1::GenericPuback::builder()
2672 .packet_id(packet_id)
2673 .build()
2674 .unwrap();
2675 events.extend(self.process_send_v3_1_1_puback(puback));
2676 }
2677 events.extend(self.refresh_pingreq_recv());
2678 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2679 }
2680 Qos::ExactlyOnce => {
2681 let packet_id = packet.packet_id().unwrap();
2682 let already_handled = !self.qos2_publish_handled.insert(packet_id);
2683
2684 if self.status == ConnectionStatus::Connected
2685 && (self.auto_pub_response || already_handled)
2686 {
2687 let pubrec = v3_1_1::GenericPubrec::builder()
2688 .packet_id(packet_id)
2689 .build()
2690 .unwrap();
2691 events.extend(self.process_send_v3_1_1_pubrec(pubrec));
2692 }
2693 events.extend(self.refresh_pingreq_recv());
2694 if !already_handled {
2695 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2696 }
2697 }
2698 }
2699 }
2700 Err(e) => {
2701 events.push(GenericEvent::RequestClose);
2702 events.push(GenericEvent::NotifyError(e));
2703 }
2704 }
2705 }
2706 PacketData::Normal(_) => {
2707 unreachable!("PUBLISH packet must use PacketData::Publish variant");
2708 }
2709 }
2710
2711 events
2712 }
2713
2714 fn process_recv_v5_0_publish(
2715 &mut self,
2716 raw_packet: RawPacket,
2717 ) -> Vec<GenericEvent<PacketIdType>> {
2718 let mut events = Vec::new();
2719
2720 let flags = raw_packet.flags();
2721 match &raw_packet.data {
2722 PacketData::Publish(arc) => {
2723 match v5_0::GenericPublish::parse(flags, arc.clone()) {
2724 Ok((packet, _consumed)) => {
2725 let mut already_handled = false;
2726 let mut puback_send = false;
2727 let mut pubrec_send = false;
2728
2729 match packet.qos() {
2730 Qos::AtLeastOnce => {
2731 let packet_id = packet.packet_id().unwrap();
2732 if let Some(max) = self.publish_recv_max {
2733 if self.publish_recv.len() >= max as usize {
2734 let disconnect = v5_0::Disconnect::builder()
2735 .reason_code(
2736 DisconnectReasonCode::ReceiveMaximumExceeded,
2737 )
2738 .build()
2739 .unwrap();
2740 events
2741 .extend(self.process_send_v5_0_disconnect(disconnect));
2742 events.push(GenericEvent::NotifyError(
2743 MqttError::ReceiveMaximumExceeded,
2744 ));
2745 return events;
2746 }
2747 }
2748 self.publish_recv.insert(packet_id);
2749 if self.auto_pub_response
2750 && self.status == ConnectionStatus::Connected
2751 {
2752 puback_send = true;
2753 }
2754 }
2755 Qos::ExactlyOnce => {
2756 let packet_id = packet.packet_id().unwrap();
2757 if let Some(max) = self.publish_recv_max {
2758 if self.publish_recv.len() >= max as usize {
2759 let disconnect = v5_0::Disconnect::builder()
2760 .reason_code(
2761 DisconnectReasonCode::ReceiveMaximumExceeded,
2762 )
2763 .build()
2764 .unwrap();
2765 events
2766 .extend(self.process_send_v5_0_disconnect(disconnect));
2767 events.push(GenericEvent::NotifyError(
2768 MqttError::ReceiveMaximumExceeded,
2769 ));
2770 return events;
2771 }
2772 }
2773 self.publish_recv.insert(packet_id);
2774
2775 if !self.qos2_publish_handled.insert(packet_id) {
2776 already_handled = true;
2777 }
2778 if self.status == ConnectionStatus::Connected
2779 && (self.auto_pub_response || already_handled)
2780 {
2781 pubrec_send = true;
2782 }
2783 }
2784 Qos::AtMostOnce => {
2785 }
2787 }
2788
2789 if packet.topic_name().is_empty() {
2791 if let Some(ta) = Self::get_topic_alias_from_props_opt(packet.props()) {
2793 if ta == 0
2794 || self.topic_alias_recv.is_none()
2795 || ta > self.topic_alias_recv.as_ref().unwrap().max()
2796 {
2797 let disconnect = v5_0::Disconnect::builder()
2798 .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2799 .build()
2800 .unwrap();
2801 events.extend(self.process_send_v5_0_disconnect(disconnect));
2802 events.push(GenericEvent::NotifyError(
2803 MqttError::TopicAliasInvalid,
2804 ));
2805 return events;
2806 }
2807
2808 if let Some(ref topic_alias_recv) = self.topic_alias_recv {
2809 if topic_alias_recv.get(ta).is_none() {
2810 let disconnect = v5_0::Disconnect::builder()
2811 .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2812 .build()
2813 .unwrap();
2814 events
2815 .extend(self.process_send_v5_0_disconnect(disconnect));
2816 events.push(GenericEvent::NotifyError(
2817 MqttError::TopicAliasInvalid,
2818 ));
2819 return events;
2820 }
2821 }
2824 } else {
2825 let disconnect = v5_0::Disconnect::builder()
2826 .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2827 .build()
2828 .unwrap();
2829 events.extend(self.process_send_v5_0_disconnect(disconnect));
2830 events
2831 .push(GenericEvent::NotifyError(MqttError::TopicAliasInvalid));
2832 return events;
2833 }
2834 } else {
2835 if let Some(ta) = Self::get_topic_alias_from_props_opt(packet.props()) {
2837 if ta == 0
2838 || self.topic_alias_recv.is_none()
2839 || ta > self.topic_alias_recv.as_ref().unwrap().max()
2840 {
2841 let disconnect = v5_0::Disconnect::builder()
2842 .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2843 .build()
2844 .unwrap();
2845 events.extend(self.process_send_v5_0_disconnect(disconnect));
2846 events.push(GenericEvent::NotifyError(
2847 MqttError::TopicAliasInvalid,
2848 ));
2849 return events;
2850 }
2851 if let Some(ref mut topic_alias_recv) = self.topic_alias_recv {
2852 topic_alias_recv.insert_or_update(packet.topic_name(), ta);
2853 }
2854 }
2855 }
2856
2857 if puback_send {
2859 let puback = v5_0::GenericPuback::builder()
2860 .packet_id(packet.packet_id().unwrap())
2861 .build()
2862 .unwrap();
2863 events.extend(self.process_send_v5_0_puback(puback));
2864 }
2865 if pubrec_send {
2866 let pubrec = v5_0::GenericPubrec::builder()
2867 .packet_id(packet.packet_id().unwrap())
2868 .build()
2869 .unwrap();
2870 events.extend(self.process_send_v5_0_pubrec(pubrec));
2871 }
2872
2873 events.extend(self.refresh_pingreq_recv());
2875
2876 if !already_handled {
2878 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2879 }
2880 }
2881 Err(e) => {
2882 if self.status == ConnectionStatus::Connected {
2883 let disconnect = v5_0::Disconnect::builder()
2884 .reason_code(e.into())
2885 .build()
2886 .unwrap();
2887 let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2888 events.extend(disconnect_events);
2889 }
2890 events.push(GenericEvent::NotifyError(e));
2891 }
2892 }
2893 }
2894 PacketData::Normal(_) => {
2895 unreachable!("PUBLISH packet must use PacketData::Publish variant");
2896 }
2897 }
2898
2899 events
2900 }
2901
2902 fn process_recv_v3_1_1_puback(
2903 &mut self,
2904 raw_packet: RawPacket,
2905 ) -> Vec<GenericEvent<PacketIdType>> {
2906 let mut events = Vec::new();
2907
2908 match v3_1_1::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2909 Ok((packet, _)) => {
2910 let packet_id = packet.packet_id();
2911 if self.pid_puback.remove(&packet_id) {
2912 self.store.erase(ResponsePacket::V3_1_1Puback, packet_id);
2913 if self.pid_man.is_used_id(packet_id) {
2914 self.pid_man.release_id(packet_id);
2915 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2916 }
2917 events.extend(self.refresh_pingreq_recv());
2918 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2919 } else {
2920 events.push(GenericEvent::RequestClose);
2921 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2922 }
2923 }
2924 Err(e) => {
2925 events.push(GenericEvent::RequestClose);
2926 events.push(GenericEvent::NotifyError(e));
2927 }
2928 }
2929
2930 events
2931 }
2932
2933 fn process_recv_v5_0_puback(
2934 &mut self,
2935 raw_packet: RawPacket,
2936 ) -> Vec<GenericEvent<PacketIdType>> {
2937 let mut events = Vec::new();
2938
2939 match v5_0::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2940 Ok((packet, _)) => {
2941 let packet_id = packet.packet_id();
2942 if self.pid_puback.remove(&packet_id) {
2943 self.store.erase(ResponsePacket::V5_0Puback, packet_id);
2944 if self.pid_man.is_used_id(packet_id) {
2945 self.pid_man.release_id(packet_id);
2946 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2947 }
2948 if self.publish_send_max.is_some() {
2949 self.publish_send_count -= 1;
2950 }
2951 events.extend(self.refresh_pingreq_recv());
2952 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2953 } else {
2954 let disconnect = v5_0::Disconnect::builder()
2955 .reason_code(DisconnectReasonCode::ProtocolError)
2956 .build()
2957 .unwrap();
2958 events.extend(self.process_send_v5_0_disconnect(disconnect));
2959 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2960 }
2961 }
2962 Err(e) => {
2963 let disconnect = v5_0::Disconnect::builder()
2964 .reason_code(DisconnectReasonCode::ProtocolError)
2965 .build()
2966 .unwrap();
2967 events.extend(self.process_send_v5_0_disconnect(disconnect));
2968 events.push(GenericEvent::NotifyError(e));
2969 }
2970 }
2971
2972 events
2973 }
2974
2975 fn process_recv_v3_1_1_pubrec(
2976 &mut self,
2977 raw_packet: RawPacket,
2978 ) -> Vec<GenericEvent<PacketIdType>> {
2979 let mut events = Vec::new();
2980
2981 match v3_1_1::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2982 Ok((packet, _)) => {
2983 let packet_id = packet.packet_id();
2984 if self.pid_pubrec.remove(&packet_id) {
2985 self.store.erase(ResponsePacket::V3_1_1Pubrec, packet_id);
2986 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
2987 let pubrel = v3_1_1::GenericPubrel::<PacketIdType>::builder()
2988 .packet_id(packet_id)
2989 .build()
2990 .unwrap();
2991 events.extend(self.process_send_v3_1_1_pubrel(pubrel));
2992 }
2993 events.extend(self.refresh_pingreq_recv());
2994 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2995 } else {
2996 events.push(GenericEvent::RequestClose);
2997 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2998 }
2999 }
3000 Err(e) => {
3001 events.push(GenericEvent::RequestClose);
3002 events.push(GenericEvent::NotifyError(e));
3003 }
3004 }
3005
3006 events
3007 }
3008
3009 fn process_recv_v5_0_pubrec(
3010 &mut self,
3011 raw_packet: RawPacket,
3012 ) -> Vec<GenericEvent<PacketIdType>> {
3013 let mut events = Vec::new();
3014
3015 match v5_0::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3016 Ok((packet, _)) => {
3017 let packet_id = packet.packet_id();
3018 if self.pid_pubrec.remove(&packet_id) {
3019 self.store.erase(ResponsePacket::V5_0Pubrec, packet_id);
3020 if let Some(reason_code) = packet.reason_code() {
3021 if reason_code != PubrecReasonCode::Success {
3022 if self.pid_man.is_used_id(packet_id) {
3023 self.pid_man.release_id(packet_id);
3024 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3025 }
3026 self.qos2_publish_processing.remove(&packet_id);
3027 if self.publish_send_max.is_some() {
3028 self.publish_send_count -= 1;
3029 }
3030 } else if self.auto_pub_response
3031 && self.status == ConnectionStatus::Connected
3032 {
3033 let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3034 .packet_id(packet_id)
3035 .build()
3036 .unwrap();
3037 events.extend(self.process_send_v5_0_pubrel(pubrel));
3038 }
3039 } else if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3040 let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3041 .packet_id(packet_id)
3042 .build()
3043 .unwrap();
3044 events.extend(self.process_send_v5_0_pubrel(pubrel));
3045 }
3046 events.extend(self.refresh_pingreq_recv());
3047 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3048 } else {
3049 let disconnect = v5_0::Disconnect::builder()
3050 .reason_code(DisconnectReasonCode::ProtocolError)
3051 .build()
3052 .unwrap();
3053 events.extend(self.process_send_v5_0_disconnect(disconnect));
3054 events.push(GenericEvent::NotifyError(MqttError::from(
3055 DisconnectReasonCode::ProtocolError,
3056 )));
3057 }
3058 }
3059 Err(e) => {
3060 let disconnect = v5_0::Disconnect::builder()
3061 .reason_code(DisconnectReasonCode::ProtocolError)
3062 .build()
3063 .unwrap();
3064 events.extend(self.process_send_v5_0_disconnect(disconnect));
3065 events.push(GenericEvent::NotifyError(e));
3066 }
3067 }
3068
3069 events
3070 }
3071
3072 fn process_recv_v3_1_1_pubrel(
3073 &mut self,
3074 raw_packet: RawPacket,
3075 ) -> Vec<GenericEvent<PacketIdType>> {
3076 let mut events = Vec::new();
3077
3078 match v3_1_1::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3079 Ok((packet, _)) => {
3080 let packet_id = packet.packet_id();
3081 self.qos2_publish_handled.remove(&packet_id);
3082 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3083 let pubcomp = v3_1_1::GenericPubcomp::<PacketIdType>::builder()
3084 .packet_id(packet_id)
3085 .build()
3086 .unwrap();
3087 events.extend(self.process_send_v3_1_1_pubcomp(pubcomp));
3088 }
3089 events.extend(self.refresh_pingreq_recv());
3090 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3091 }
3092 Err(e) => {
3093 events.push(GenericEvent::RequestClose);
3094 events.push(GenericEvent::NotifyError(e));
3095 }
3096 }
3097
3098 events
3099 }
3100
3101 fn process_recv_v5_0_pubrel(
3102 &mut self,
3103 raw_packet: RawPacket,
3104 ) -> Vec<GenericEvent<PacketIdType>> {
3105 let mut events = Vec::new();
3106
3107 match v5_0::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3108 Ok((packet, _)) => {
3109 let packet_id = packet.packet_id();
3110 self.qos2_publish_handled.remove(&packet_id);
3111 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3112 let pubcomp = v5_0::GenericPubcomp::<PacketIdType>::builder()
3113 .packet_id(packet_id)
3114 .build()
3115 .unwrap();
3116 events.extend(self.process_send_v5_0_pubcomp(pubcomp));
3117 }
3118 events.extend(self.refresh_pingreq_recv());
3119 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3120 }
3121 Err(e) => {
3122 let disconnect = v5_0::Disconnect::builder()
3123 .reason_code(DisconnectReasonCode::ProtocolError)
3124 .build()
3125 .unwrap();
3126 events.extend(self.process_send_v5_0_disconnect(disconnect));
3127 events.push(GenericEvent::NotifyError(e));
3128 }
3129 }
3130
3131 events
3132 }
3133
3134 fn process_recv_v3_1_1_pubcomp(
3135 &mut self,
3136 raw_packet: RawPacket,
3137 ) -> Vec<GenericEvent<PacketIdType>> {
3138 let mut events = Vec::new();
3139
3140 match v3_1_1::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3141 Ok((packet, _)) => {
3142 let packet_id = packet.packet_id();
3143 if self.pid_pubcomp.remove(&packet_id) {
3144 self.store.erase(ResponsePacket::V3_1_1Pubcomp, packet_id);
3145 if self.pid_man.is_used_id(packet_id) {
3146 self.pid_man.release_id(packet_id);
3147 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3148 }
3149 self.qos2_publish_processing.remove(&packet_id);
3150 events.extend(self.refresh_pingreq_recv());
3151 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3152 } else {
3153 events.push(GenericEvent::RequestClose);
3154 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3155 }
3156 }
3157 Err(e) => {
3158 events.push(GenericEvent::RequestClose);
3159 events.push(GenericEvent::NotifyError(e));
3160 }
3161 }
3162
3163 events
3164 }
3165
3166 fn process_recv_v5_0_pubcomp(
3167 &mut self,
3168 raw_packet: RawPacket,
3169 ) -> Vec<GenericEvent<PacketIdType>> {
3170 let mut events = Vec::new();
3171
3172 match v5_0::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3173 Ok((packet, _)) => {
3174 let packet_id = packet.packet_id();
3175 if self.pid_pubcomp.remove(&packet_id) {
3176 self.store.erase(ResponsePacket::V5_0Pubcomp, packet_id);
3177 if self.pid_man.is_used_id(packet_id) {
3178 self.pid_man.release_id(packet_id);
3179 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3180 }
3181 self.qos2_publish_processing.remove(&packet_id);
3182 if self.publish_send_max.is_some() {
3183 self.publish_send_count -= 1;
3184 }
3185 events.extend(self.refresh_pingreq_recv());
3186 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3187 } else {
3188 let disconnect = v5_0::Disconnect::builder()
3189 .reason_code(DisconnectReasonCode::ProtocolError)
3190 .build()
3191 .unwrap();
3192 events.extend(self.process_send_v5_0_disconnect(disconnect));
3193 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3194 }
3195 }
3196 Err(e) => {
3197 let disconnect = v5_0::Disconnect::builder()
3198 .reason_code(DisconnectReasonCode::ProtocolError)
3199 .build()
3200 .unwrap();
3201 events.extend(self.process_send_v5_0_disconnect(disconnect));
3202 events.push(GenericEvent::NotifyError(e));
3203 }
3204 }
3205
3206 events
3207 }
3208
3209 fn process_recv_v3_1_1_subscribe(
3210 &mut self,
3211 raw_packet: RawPacket,
3212 ) -> Vec<GenericEvent<PacketIdType>> {
3213 let mut events = Vec::new();
3214
3215 match v3_1_1::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3216 Ok((packet, _)) => {
3217 events.extend(self.refresh_pingreq_recv());
3218 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3219 }
3220 Err(e) => {
3221 events.push(GenericEvent::RequestClose);
3222 events.push(GenericEvent::NotifyError(e));
3223 }
3224 }
3225
3226 events
3227 }
3228
3229 fn process_recv_v5_0_subscribe(
3230 &mut self,
3231 raw_packet: RawPacket,
3232 ) -> Vec<GenericEvent<PacketIdType>> {
3233 let mut events = Vec::new();
3234
3235 match v5_0::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3236 Ok((packet, _)) => {
3237 events.extend(self.refresh_pingreq_recv());
3238 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3239 }
3240 Err(e) => {
3241 let disconnect = v5_0::Disconnect::builder()
3242 .reason_code(DisconnectReasonCode::ProtocolError)
3243 .build()
3244 .unwrap();
3245 events.extend(self.process_send_v5_0_disconnect(disconnect));
3246 events.push(GenericEvent::NotifyError(e));
3247 }
3248 }
3249
3250 events
3251 }
3252
3253 fn process_recv_v3_1_1_suback(
3254 &mut self,
3255 raw_packet: RawPacket,
3256 ) -> Vec<GenericEvent<PacketIdType>> {
3257 let mut events = Vec::new();
3258
3259 match v3_1_1::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3260 Ok((packet, _)) => {
3261 let packet_id = packet.packet_id();
3262 if self.pid_suback.remove(&packet_id) {
3263 if self.pid_man.is_used_id(packet_id) {
3264 self.pid_man.release_id(packet_id);
3265 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3266 }
3267 events.extend(self.refresh_pingreq_recv());
3268 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3269 } else {
3270 events.push(GenericEvent::RequestClose);
3271 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3272 }
3273 }
3274 Err(e) => {
3275 events.push(GenericEvent::RequestClose);
3276 events.push(GenericEvent::NotifyError(e));
3277 }
3278 }
3279
3280 events
3281 }
3282
3283 fn process_recv_v5_0_suback(
3284 &mut self,
3285 raw_packet: RawPacket,
3286 ) -> Vec<GenericEvent<PacketIdType>> {
3287 let mut events = Vec::new();
3288
3289 match v5_0::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3290 Ok((packet, _)) => {
3291 let packet_id = packet.packet_id();
3292 if self.pid_suback.remove(&packet_id) {
3293 if self.pid_man.is_used_id(packet_id) {
3294 self.pid_man.release_id(packet_id);
3295 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3296 }
3297 events.extend(self.refresh_pingreq_recv());
3298 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3299 } else {
3300 let disconnect = v5_0::Disconnect::builder()
3301 .reason_code(DisconnectReasonCode::ProtocolError)
3302 .build()
3303 .unwrap();
3304 events.extend(self.process_send_v5_0_disconnect(disconnect));
3305 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3306 }
3307 }
3308 Err(e) => {
3309 let disconnect = v5_0::Disconnect::builder()
3310 .reason_code(DisconnectReasonCode::ProtocolError)
3311 .build()
3312 .unwrap();
3313 events.extend(self.process_send_v5_0_disconnect(disconnect));
3314 events.push(GenericEvent::NotifyError(e));
3315 }
3316 }
3317
3318 events
3319 }
3320
3321 fn process_recv_v3_1_1_unsubscribe(
3322 &mut self,
3323 raw_packet: RawPacket,
3324 ) -> Vec<GenericEvent<PacketIdType>> {
3325 let mut events = Vec::new();
3326
3327 match v3_1_1::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3328 Ok((packet, _)) => {
3329 events.extend(self.refresh_pingreq_recv());
3330 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3331 }
3332 Err(e) => {
3333 events.push(GenericEvent::RequestClose);
3334 events.push(GenericEvent::NotifyError(e));
3335 }
3336 }
3337
3338 events
3339 }
3340
3341 fn process_recv_v5_0_unsubscribe(
3342 &mut self,
3343 raw_packet: RawPacket,
3344 ) -> Vec<GenericEvent<PacketIdType>> {
3345 let mut events = Vec::new();
3346
3347 match v5_0::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3348 Ok((packet, _)) => {
3349 events.extend(self.refresh_pingreq_recv());
3350 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3351 }
3352 Err(e) => {
3353 let disconnect = v5_0::Disconnect::builder()
3354 .reason_code(DisconnectReasonCode::ProtocolError)
3355 .build()
3356 .unwrap();
3357 events.extend(self.process_send_v5_0_disconnect(disconnect));
3358 events.push(GenericEvent::NotifyError(e));
3359 }
3360 }
3361
3362 events
3363 }
3364
3365 fn process_recv_v3_1_1_unsuback(
3366 &mut self,
3367 raw_packet: RawPacket,
3368 ) -> Vec<GenericEvent<PacketIdType>> {
3369 let mut events = Vec::new();
3370
3371 match v3_1_1::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3372 Ok((packet, _)) => {
3373 let packet_id = packet.packet_id();
3374 if self.pid_unsuback.remove(&packet_id) {
3375 if self.pid_man.is_used_id(packet_id) {
3376 self.pid_man.release_id(packet_id);
3377 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3378 }
3379 events.extend(self.refresh_pingreq_recv());
3380 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3381 } else {
3382 events.push(GenericEvent::RequestClose);
3383 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3384 }
3385 }
3386 Err(e) => {
3387 events.push(GenericEvent::RequestClose);
3388 events.push(GenericEvent::NotifyError(e));
3389 }
3390 }
3391
3392 events
3393 }
3394
3395 fn process_recv_v5_0_unsuback(
3396 &mut self,
3397 raw_packet: RawPacket,
3398 ) -> Vec<GenericEvent<PacketIdType>> {
3399 let mut events = Vec::new();
3400
3401 match v5_0::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3402 Ok((packet, _)) => {
3403 let packet_id = packet.packet_id();
3404 if self.pid_unsuback.remove(&packet_id) {
3405 if self.pid_man.is_used_id(packet_id) {
3406 self.pid_man.release_id(packet_id);
3407 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3408 }
3409 events.extend(self.refresh_pingreq_recv());
3410 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3411 } else {
3412 let disconnect = v5_0::Disconnect::builder()
3413 .reason_code(DisconnectReasonCode::ProtocolError)
3414 .build()
3415 .unwrap();
3416 events.extend(self.process_send_v5_0_disconnect(disconnect));
3417 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3418 }
3419 }
3420 Err(e) => {
3421 let disconnect = v5_0::Disconnect::builder()
3422 .reason_code(DisconnectReasonCode::ProtocolError)
3423 .build()
3424 .unwrap();
3425 events.extend(self.process_send_v5_0_disconnect(disconnect));
3426 events.push(GenericEvent::NotifyError(e));
3427 }
3428 }
3429
3430 events
3431 }
3432
3433 fn process_recv_v3_1_1_pingreq(
3434 &mut self,
3435 raw_packet: RawPacket,
3436 ) -> Vec<GenericEvent<PacketIdType>> {
3437 let mut events = Vec::new();
3438
3439 match v3_1_1::Pingreq::parse(raw_packet.data_as_slice()) {
3440 Ok((packet, _)) => {
3441 if (Role::IS_SERVER || Role::IS_ANY)
3442 && !self.is_client
3443 && self.auto_ping_response
3444 && self.status == ConnectionStatus::Connected
3445 {
3446 let pingresp = v3_1_1::Pingresp::new();
3447 events.extend(self.process_send_v3_1_1_pingresp(pingresp));
3448 }
3449 events.extend(self.refresh_pingreq_recv());
3450 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3451 }
3452 Err(e) => {
3453 events.push(GenericEvent::RequestClose);
3454 events.push(GenericEvent::NotifyError(e));
3455 }
3456 }
3457
3458 events
3459 }
3460
3461 fn process_recv_v5_0_pingreq(
3462 &mut self,
3463 raw_packet: RawPacket,
3464 ) -> Vec<GenericEvent<PacketIdType>> {
3465 let mut events = Vec::new();
3466
3467 match v5_0::Pingreq::parse(raw_packet.data_as_slice()) {
3468 Ok((packet, _)) => {
3469 if (Role::IS_SERVER || Role::IS_ANY)
3470 && !self.is_client
3471 && self.auto_ping_response
3472 && self.status == ConnectionStatus::Connected
3473 {
3474 let pingresp = v5_0::Pingresp::new();
3475 events.extend(self.process_send_v5_0_pingresp(pingresp));
3476 }
3477 events.extend(self.refresh_pingreq_recv());
3478 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3479 }
3480 Err(e) => {
3481 let disconnect = v5_0::Disconnect::builder()
3482 .reason_code(DisconnectReasonCode::ProtocolError)
3483 .build()
3484 .unwrap();
3485 events.extend(self.process_send_v5_0_disconnect(disconnect));
3486 events.push(GenericEvent::NotifyError(e));
3487 }
3488 }
3489
3490 events
3491 }
3492
3493 fn process_recv_v3_1_1_pingresp(
3494 &mut self,
3495 raw_packet: RawPacket,
3496 ) -> Vec<GenericEvent<PacketIdType>> {
3497 let mut events = Vec::new();
3498
3499 match v3_1_1::Pingresp::parse(raw_packet.data_as_slice()) {
3500 Ok((packet, _)) => {
3501 self.pingresp_recv_set = false;
3502 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3503 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3504 }
3505 Err(e) => {
3506 events.push(GenericEvent::RequestClose);
3507 events.push(GenericEvent::NotifyError(e));
3508 }
3509 }
3510
3511 events
3512 }
3513
3514 fn process_recv_v5_0_pingresp(
3515 &mut self,
3516 raw_packet: RawPacket,
3517 ) -> Vec<GenericEvent<PacketIdType>> {
3518 let mut events = Vec::new();
3519
3520 match v5_0::Pingresp::parse(raw_packet.data_as_slice()) {
3521 Ok((packet, _)) => {
3522 self.pingresp_recv_set = false;
3523 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3524 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3525 }
3526 Err(e) => {
3527 let disconnect = v5_0::Disconnect::builder()
3528 .reason_code(DisconnectReasonCode::ProtocolError)
3529 .build()
3530 .unwrap();
3531 events.extend(self.process_send_v5_0_disconnect(disconnect));
3532 events.push(GenericEvent::NotifyError(e));
3533 }
3534 }
3535
3536 events
3537 }
3538
3539 fn process_recv_v3_1_1_disconnect(
3540 &mut self,
3541 raw_packet: RawPacket,
3542 ) -> Vec<GenericEvent<PacketIdType>> {
3543 let mut events = Vec::new();
3544
3545 match v3_1_1::Disconnect::parse(raw_packet.data_as_slice()) {
3546 Ok((packet, _)) => {
3547 self.cancel_timers(&mut events);
3548 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3549 }
3550 Err(e) => {
3551 events.push(GenericEvent::RequestClose);
3552 events.push(GenericEvent::NotifyError(e));
3553 }
3554 }
3555
3556 events
3557 }
3558
3559 fn process_recv_v5_0_disconnect(
3560 &mut self,
3561 raw_packet: RawPacket,
3562 ) -> Vec<GenericEvent<PacketIdType>> {
3563 let mut events = Vec::new();
3564
3565 match v5_0::Disconnect::parse(raw_packet.data_as_slice()) {
3566 Ok((packet, _)) => {
3567 self.cancel_timers(&mut events);
3568 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3569 }
3570 Err(e) => {
3571 let disconnect = v5_0::Disconnect::builder()
3572 .reason_code(DisconnectReasonCode::ProtocolError)
3573 .build()
3574 .unwrap();
3575 events.extend(self.process_send_v5_0_disconnect(disconnect));
3576 events.push(GenericEvent::NotifyError(e));
3577 }
3578 }
3579
3580 events
3581 }
3582
3583 fn process_recv_v5_0_auth(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
3584 let mut events = Vec::new();
3585
3586 match v5_0::Auth::parse(raw_packet.data_as_slice()) {
3587 Ok((packet, _)) => {
3588 events.extend(self.refresh_pingreq_recv());
3589 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3590 }
3591 Err(e) => {
3592 let disconnect = v5_0::Disconnect::builder()
3593 .reason_code(DisconnectReasonCode::ProtocolError)
3594 .build()
3595 .unwrap();
3596 events.extend(self.process_send_v5_0_disconnect(disconnect));
3597 events.push(GenericEvent::NotifyError(e));
3598 }
3599 }
3600
3601 events
3602 }
3603
3604 fn get_topic_alias_from_props_opt(props: &Option<Vec<Property>>) -> Option<u16> {
3605 if let Some(props) = props {
3606 Self::get_topic_alias_from_props(props.as_slice())
3607 } else {
3608 None
3609 }
3610 }
3611
3612 fn refresh_pingreq_recv(&mut self) -> Vec<GenericEvent<PacketIdType>> {
3613 let mut events = Vec::new();
3614 if let Some(timeout_ms) = self.pingreq_recv_timeout_ms {
3615 if self.status == ConnectionStatus::Connecting
3616 || self.status == ConnectionStatus::Connected
3617 {
3618 self.pingreq_recv_set = true;
3619 events.push(GenericEvent::RequestTimerReset {
3620 kind: TimerKind::PingreqRecv,
3621 duration_ms: timeout_ms,
3622 });
3623 } else {
3624 self.pingreq_recv_set = false;
3625 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3626 }
3627 }
3628
3629 events
3630 }
3631
3632 fn cancel_timers(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
3634 if self.pingreq_send_set {
3635 self.pingreq_send_set = false;
3636 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
3637 }
3638 if self.pingreq_recv_set {
3639 self.pingreq_recv_set = false;
3640 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3641 }
3642 if self.pingresp_recv_set {
3643 self.pingresp_recv_set = false;
3644 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3645 }
3646 }
3647
3648 fn get_topic_alias_from_props(props: &[Property]) -> Option<u16> {
3650 for prop in props {
3651 if let Property::TopicAlias(ta) = prop {
3652 return Some(ta.val());
3653 }
3654 }
3655 None
3656 }
3657
3658 #[allow(dead_code)]
3659 fn is_packet_id_used(&self, packet_id: PacketIdType) -> bool {
3660 self.pid_man.is_used_id(packet_id)
3661 }
3662}
3663
3664pub trait RecvBehavior<Role, PacketIdType>
3667where
3668 PacketIdType: IsPacketId,
3669{
3670 fn recv(&mut self, data: &mut std::io::Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>>;
3671}
3672
3673impl<PacketIdType> RecvBehavior<role::Client, PacketIdType>
3675 for GenericConnection<role::Client, PacketIdType>
3676where
3677 PacketIdType: IsPacketId,
3678{
3679 fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3680 self.recv(data)
3681 }
3682}
3683
3684impl<PacketIdType> RecvBehavior<role::Server, PacketIdType>
3685 for GenericConnection<role::Server, PacketIdType>
3686where
3687 PacketIdType: IsPacketId,
3688{
3689 fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3690 self.recv(data)
3691 }
3692}
3693
3694impl<PacketIdType> RecvBehavior<role::Any, PacketIdType>
3695 for GenericConnection<role::Any, PacketIdType>
3696where
3697 PacketIdType: IsPacketId,
3698{
3699 fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3700 self.recv(data)
3701 }
3702}
3703
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mqtt::connection::version::Version;
    use crate::mqtt::packet::TopicAliasSend;
    use crate::mqtt::role;

    /// Builds a fresh MQTT v5.0 client-role connection with `u16` packet IDs,
    /// the fixture shared by most tests in this module.
    fn v5_client() -> GenericConnection<role::Client, u16> {
        GenericConnection::<role::Client, u16>::new(Version::V5_0)
    }

    // ---- initialize -------------------------------------------------------

    #[test]
    fn test_initialize_client_mode() {
        let mut connection = v5_client();

        connection.initialize(true);

        assert!(connection.is_client);
        assert_eq!(connection.publish_send_count, 0);
        assert!(connection.publish_send_max.is_none());
        assert!(connection.publish_recv_max.is_none());
        assert!(!connection.need_store);
    }

    #[test]
    fn test_initialize_server_mode() {
        let mut connection = GenericConnection::<role::Server, u32>::new(Version::V3_1_1);

        connection.initialize(false);

        assert!(!connection.is_client);
        assert_eq!(connection.publish_send_count, 0);
        assert!(connection.publish_send_max.is_none());
        assert!(connection.publish_recv_max.is_none());
        assert!(!connection.need_store);
    }

    // ---- validate_topic_alias / validate_topic_alias_range ----------------

    #[test]
    fn test_validate_topic_alias_no_topic_alias_send() {
        // No `topic_alias_send` table has been configured on the connection.
        let mut connection = v5_client();

        assert!(connection.validate_topic_alias(Some(1)).is_none());
    }

    #[test]
    fn test_validate_topic_alias_none_input() {
        let mut connection = v5_client();

        assert!(connection.validate_topic_alias(None).is_none());
    }

    #[test]
    fn test_validate_topic_alias_range_no_topic_alias_send() {
        let connection = v5_client();

        assert!(!connection.validate_topic_alias_range(1));
    }

    #[test]
    fn test_validate_topic_alias_range_zero() {
        let mut connection = v5_client();
        connection.topic_alias_send = Some(TopicAliasSend::new(10));

        // Alias 0 is rejected even though a table is configured.
        assert!(!connection.validate_topic_alias_range(0));
    }

    #[test]
    fn test_validate_topic_alias_range_over_max() {
        let mut connection = v5_client();
        connection.topic_alias_send = Some(TopicAliasSend::new(5));

        // One past the value the table was created with.
        assert!(!connection.validate_topic_alias_range(6));
    }

    #[test]
    fn test_validate_topic_alias_range_valid() {
        let mut connection = v5_client();
        connection.topic_alias_send = Some(TopicAliasSend::new(10));

        // Lower bound, an interior value, and the upper bound.
        assert!(connection.validate_topic_alias_range(1));
        assert!(connection.validate_topic_alias_range(5));
        assert!(connection.validate_topic_alias_range(10));
    }

    #[test]
    fn test_validate_topic_alias_with_registered_alias() {
        let mut connection = v5_client();

        let mut aliases = TopicAliasSend::new(10);
        aliases.insert_or_update("test/topic", 5);
        connection.topic_alias_send = Some(aliases);

        assert_eq!(
            connection.validate_topic_alias(Some(5)),
            Some("test/topic".to_string())
        );
    }

    #[test]
    fn test_validate_topic_alias_unregistered_alias() {
        let mut connection = v5_client();
        connection.topic_alias_send = Some(TopicAliasSend::new(10));

        // Alias 5 is within range but was never registered.
        assert!(connection.validate_topic_alias(Some(5)).is_none());
    }

    // ---- validate_maximum_packet_size_send --------------------------------

    #[test]
    fn test_validate_maximum_packet_size_within_limit() {
        // Uses whatever limit a freshly constructed connection starts with.
        let connection = v5_client();

        assert!(connection.validate_maximum_packet_size_send(1000));
    }

    #[test]
    fn test_validate_maximum_packet_size_at_limit() {
        let mut connection = v5_client();
        connection.maximum_packet_size_send = 1000;

        assert!(connection.validate_maximum_packet_size_send(1000));
    }

    #[test]
    fn test_validate_maximum_packet_size_over_limit() {
        let mut connection = v5_client();
        connection.maximum_packet_size_send = 1000;

        assert!(!connection.validate_maximum_packet_size_send(1001));
    }

    #[test]
    fn test_validate_maximum_packet_size_zero_limit() {
        let mut connection = v5_client();
        connection.maximum_packet_size_send = 0;

        assert!(!connection.validate_maximum_packet_size_send(1));
        assert!(connection.validate_maximum_packet_size_send(0));
    }

    // ---- initialize resets previously modified state ----------------------

    #[test]
    fn test_initialize_clears_state() {
        let mut connection = v5_client();

        // Dirty a handful of fields first.
        connection.publish_send_count = 5;
        connection.need_store = true;
        connection.pid_suback.insert(123);
        connection.pid_unsuback.insert(456);

        connection.initialize(true);

        assert_eq!(connection.publish_send_count, 0);
        assert!(!connection.need_store);
        assert!(connection.pid_suback.is_empty());
        assert!(connection.pid_unsuback.is_empty());
        assert!(connection.is_client);
    }

    // ---- remaining_length_to_total_size -----------------------------------

    #[test]
    fn test_remaining_length_to_total_size() {
        // (remaining length, expected total size) on both sides of each point
        // where the remaining-length field grows by one byte, plus the
        // smallest and largest values checked by the original test.
        const CASES: [(u32, u32); 8] = [
            (0, 2),
            (127, 129),
            (128, 131),
            (16383, 16386),
            (16384, 16388),
            (2097151, 2097155),
            (2097152, 2097157),
            (268435455, 268435460),
        ];

        for &(remaining_length, expected_total) in &CASES {
            assert_eq!(
                remaining_length_to_total_size(remaining_length),
                expected_total
            );
        }
    }
}