1use std::collections::HashSet;
25use std::io::Cursor;
26use std::marker::PhantomData;
27
28use crate::mqtt::connection::GenericStore;
29use crate::mqtt::connection::event::{GenericEvent, TimerKind};
30
31use serde::Serialize;
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
34enum ConnectionStatus {
35 #[serde(rename = "disconnected")]
36 Disconnected,
37 #[serde(rename = "connecting")]
38 Connecting,
39 #[serde(rename = "connected")]
40 Connected,
41}
42use crate::mqtt::connection::packet_builder::{
43 PacketBuildResult, PacketBuilder, PacketData, RawPacket,
44};
45use crate::mqtt::connection::role;
46use crate::mqtt::connection::role::RoleType;
47use crate::mqtt::connection::sendable::Sendable;
48use crate::mqtt::packet::GenericPacket;
49use crate::mqtt::packet::GenericStorePacket;
50use crate::mqtt::packet::Qos;
51use crate::mqtt::packet::ResponsePacket;
52use crate::mqtt::packet::v3_1_1;
53use crate::mqtt::packet::v5_0;
54use crate::mqtt::packet::{Property, TopicAliasRecv, TopicAliasSend};
55use crate::mqtt::packet_id_manager::PacketIdManager;
56use crate::mqtt::prelude::GenericPacketTrait;
57use crate::mqtt::result_code::{
58 ConnectReasonCode, ConnectReturnCode, DisconnectReasonCode, MqttError, PubrecReasonCode,
59};
60use crate::mqtt::packet::IsPacketId;
61use crate::mqtt::version::*;
62
/// Packet size used when no maximum packet size has been negotiated.
///
/// Computed as 1 + 4 + 128^4 (128^4 == 1 << 28), i.e. 268_435_461.
const MQTT_PACKET_SIZE_NO_LIMIT: u32 = 1 + 4 + (1 << 28);
66
/// Converts a remaining-length value into the total packet size.
///
/// The total is one leading byte, plus the number of bytes needed to encode
/// `remaining_length` (1 to 4, stepping up at 128, 16_384 and 2_097_152),
/// plus `remaining_length` itself.
fn remaining_length_to_total_size(remaining_length: u32) -> u32 {
    let length_field_bytes = match remaining_length {
        0..=127 => 1,
        128..=16_383 => 2,
        16_384..=2_097_151 => 3,
        _ => 4,
    };

    1 + length_field_bytes + remaining_length
}
86
87pub type Event = GenericEvent<u16>;
92
93pub struct GenericConnection<Role, PacketIdType>
136where
137 Role: RoleType,
138 PacketIdType: IsPacketId,
139{
140 _marker: PhantomData<Role>,
141
142 protocol_version: Version,
143
144 pid_man: PacketIdManager<PacketIdType>,
145 pid_suback: HashSet<PacketIdType>,
146 pid_unsuback: HashSet<PacketIdType>,
147 pid_puback: HashSet<PacketIdType>,
148 pid_pubrec: HashSet<PacketIdType>,
149 pid_pubcomp: HashSet<PacketIdType>,
150
151 need_store: bool,
152 store: GenericStore<PacketIdType>,
154
155 offline_publish: bool,
156 auto_pub_response: bool,
157 auto_ping_response: bool,
158
159 auto_map_topic_alias_send: bool,
161 auto_replace_topic_alias_send: bool,
163 topic_alias_recv: Option<TopicAliasRecv>,
165 topic_alias_send: Option<TopicAliasSend>,
167
168 publish_send_max: Option<u16>,
169 publish_recv_max: Option<u16>,
171 publish_send_count: u16,
174
175 publish_recv: HashSet<PacketIdType>,
177
178 maximum_packet_size_send: u32,
180 maximum_packet_size_recv: u32,
182
183 status: ConnectionStatus,
185
186 pingreq_send_interval_ms: Option<u64>,
188 pingreq_recv_timeout_ms: Option<u64>,
190 pingresp_recv_timeout_ms: Option<u64>,
192
193 qos2_publish_handled: HashSet<PacketIdType>,
195 qos2_publish_processing: HashSet<PacketIdType>,
197
198 pingreq_send_set: bool,
200 pingreq_recv_set: bool,
201 pingresp_recv_set: bool,
202
203 packet_builder: PacketBuilder,
204 is_client: bool,
206}
207
208pub type Connection<Role> = GenericConnection<Role, u16>;
221
222impl<Role, PacketIdType> GenericConnection<Role, PacketIdType>
223where
224 Role: RoleType,
225 PacketIdType: IsPacketId,
226{
227 pub fn new(version: Version) -> Self {
250 Self {
251 _marker: PhantomData,
252 protocol_version: version,
253 pid_man: PacketIdManager::new(),
254 pid_suback: HashSet::new(),
255 pid_unsuback: HashSet::new(),
256 pid_puback: HashSet::new(),
257 pid_pubrec: HashSet::new(),
258 pid_pubcomp: HashSet::new(),
259 need_store: false,
260 store: GenericStore::new(),
261 offline_publish: false,
262 auto_pub_response: false,
263 auto_ping_response: false,
264 auto_map_topic_alias_send: false,
265 auto_replace_topic_alias_send: false,
266 topic_alias_recv: None,
267 topic_alias_send: None,
268 publish_send_max: None,
269 publish_recv_max: None,
270 publish_send_count: 0,
271 publish_recv: HashSet::new(),
272 maximum_packet_size_send: MQTT_PACKET_SIZE_NO_LIMIT,
273 maximum_packet_size_recv: MQTT_PACKET_SIZE_NO_LIMIT,
274 status: ConnectionStatus::Disconnected,
275 pingreq_send_interval_ms: None,
276 pingreq_recv_timeout_ms: None,
277 pingresp_recv_timeout_ms: None,
278 qos2_publish_handled: HashSet::new(),
279 qos2_publish_processing: HashSet::new(),
280 pingreq_send_set: false,
281 pingreq_recv_set: false,
282 pingresp_recv_set: false,
283 packet_builder: PacketBuilder::new(),
284 is_client: false,
285 }
286 }
287
288 pub fn checked_send<T>(&mut self, packet: T) -> Vec<GenericEvent<PacketIdType>>
329 where
330 T: Sendable<Role, PacketIdType>,
331 {
332 packet.dispatch_send(self)
334 }
335
336 pub fn send(&mut self, packet: GenericPacket<PacketIdType>) -> Vec<GenericEvent<PacketIdType>> {
381 use std::any::TypeId;
382
383 let role_id = TypeId::of::<Role>();
384 let client_id = TypeId::of::<role::Client>();
385 let server_id = TypeId::of::<role::Server>();
386 let any_id = TypeId::of::<role::Any>();
387
388 let packet_version = packet.protocol_version();
390
391 if self.protocol_version != packet_version {
393 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
394 }
395
396 match packet {
397 GenericPacket::V3_1_1Connect(p) => {
399 if role_id == client_id || role_id == any_id {
400 self.process_send_v3_1_1_connect(p)
401 } else {
402 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
403 }
404 }
405 GenericPacket::V5_0Connect(p) => {
406 if role_id == client_id || role_id == any_id {
407 self.process_send_v5_0_connect(p)
408 } else {
409 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
410 }
411 }
412 GenericPacket::V3_1_1Connack(p) => {
414 if role_id == server_id || role_id == any_id {
415 self.process_send_v3_1_1_connack(p)
416 } else {
417 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
418 }
419 }
420 GenericPacket::V5_0Connack(p) => {
421 if role_id == server_id || role_id == any_id {
422 self.process_send_v5_0_connack(p)
423 } else {
424 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
425 }
426 }
427 GenericPacket::V3_1_1Publish(p) => self.process_send_v3_1_1_publish(p),
429 GenericPacket::V5_0Publish(p) => self.process_send_v5_0_publish(p),
430 GenericPacket::V3_1_1Puback(p) => self.process_send_v3_1_1_puback(p),
432 GenericPacket::V5_0Puback(p) => self.process_send_v5_0_puback(p),
433 GenericPacket::V3_1_1Pubrec(p) => self.process_send_v3_1_1_pubrec(p),
434 GenericPacket::V5_0Pubrec(p) => self.process_send_v5_0_pubrec(p),
435 GenericPacket::V3_1_1Pubrel(p) => self.process_send_v3_1_1_pubrel(p),
436 GenericPacket::V5_0Pubrel(p) => self.process_send_v5_0_pubrel(p),
437 GenericPacket::V3_1_1Pubcomp(p) => self.process_send_v3_1_1_pubcomp(p),
438 GenericPacket::V5_0Pubcomp(p) => self.process_send_v5_0_pubcomp(p),
439 GenericPacket::V3_1_1Subscribe(p) => {
441 if role_id == client_id || role_id == any_id {
442 self.process_send_v3_1_1_subscribe(p)
443 } else {
444 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
445 }
446 }
447 GenericPacket::V5_0Subscribe(p) => {
448 if role_id == client_id || role_id == any_id {
449 self.process_send_v5_0_subscribe(p)
450 } else {
451 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
452 }
453 }
454 GenericPacket::V3_1_1Unsubscribe(p) => {
455 if role_id == client_id || role_id == any_id {
456 self.process_send_v3_1_1_unsubscribe(p)
457 } else {
458 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
459 }
460 }
461 GenericPacket::V5_0Unsubscribe(p) => {
462 if role_id == client_id || role_id == any_id {
463 self.process_send_v5_0_unsubscribe(p)
464 } else {
465 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
466 }
467 }
468 GenericPacket::V3_1_1Suback(p) => {
470 if role_id == server_id || role_id == any_id {
471 self.process_send_v3_1_1_suback(p)
472 } else {
473 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
474 }
475 }
476 GenericPacket::V5_0Suback(p) => {
477 if role_id == server_id || role_id == any_id {
478 self.process_send_v5_0_suback(p)
479 } else {
480 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
481 }
482 }
483 GenericPacket::V3_1_1Unsuback(p) => {
484 if role_id == server_id || role_id == any_id {
485 self.process_send_v3_1_1_unsuback(p)
486 } else {
487 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
488 }
489 }
490 GenericPacket::V5_0Unsuback(p) => {
491 if role_id == server_id || role_id == any_id {
492 self.process_send_v5_0_unsuback(p)
493 } else {
494 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
495 }
496 }
497 GenericPacket::V3_1_1Pingreq(p) => {
499 if role_id == client_id || role_id == any_id {
500 self.process_send_v3_1_1_pingreq(p)
501 } else {
502 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
503 }
504 }
505 GenericPacket::V5_0Pingreq(p) => {
506 if role_id == client_id || role_id == any_id {
507 self.process_send_v5_0_pingreq(p)
508 } else {
509 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
510 }
511 }
512 GenericPacket::V3_1_1Pingresp(p) => {
514 if role_id == server_id || role_id == any_id {
515 self.process_send_v3_1_1_pingresp(p)
516 } else {
517 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
518 }
519 }
520 GenericPacket::V5_0Pingresp(p) => {
521 if role_id == server_id || role_id == any_id {
522 self.process_send_v5_0_pingresp(p)
523 } else {
524 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
525 }
526 }
527 GenericPacket::V3_1_1Disconnect(p) => {
529 if role_id == client_id || role_id == any_id {
530 self.process_send_v3_1_1_disconnect(p)
531 } else {
532 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
533 }
534 }
535 GenericPacket::V5_0Disconnect(p) => self.process_send_v5_0_disconnect(p),
537 GenericPacket::V5_0Auth(p) => self.process_send_v5_0_auth(p),
539 }
540 }
541
542 pub fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
591 let mut events = Vec::new();
592
593 match self.packet_builder.feed(data) {
594 PacketBuildResult::Complete(raw_packet) => {
595 events.extend(self.process_recv_packet(raw_packet));
596 }
597 PacketBuildResult::Incomplete => {}
598 PacketBuildResult::Error(e) => {
599 self.cancel_timers(&mut events);
600 events.push(GenericEvent::RequestClose);
601 events.push(GenericEvent::NotifyError(e));
602 }
603 }
604
605 events
606 }
607
608 pub fn notify_timer_fired(&mut self, kind: TimerKind) -> Vec<GenericEvent<PacketIdType>> {
625 let mut events = Vec::new();
626
627 match kind {
628 TimerKind::PingreqSend => {
629 self.pingreq_send_set = false;
631
632 if self.status == ConnectionStatus::Connected {
634 match self.protocol_version {
635 Version::V3_1_1 => {
636 if let Ok(pingreq) = v3_1_1::Pingreq::builder().build() {
637 events.extend(self.process_send_v3_1_1_pingreq(pingreq));
638 }
639 }
640 Version::V5_0 => {
641 if let Ok(pingreq) = v5_0::Pingreq::builder().build() {
642 events.extend(self.process_send_v5_0_pingreq(pingreq));
643 }
644 }
645 Version::Undetermined => {
646 unreachable!("Protocol version should be set before sending PINGREQ");
647 }
648 }
649 }
650 }
651 TimerKind::PingreqRecv => {
652 self.pingreq_recv_set = false;
654
655 match self.protocol_version {
656 Version::V3_1_1 => {
657 events.push(GenericEvent::RequestClose);
659 }
660 Version::V5_0 => {
661 if self.status == ConnectionStatus::Connected {
663 if let Ok(disconnect) = v5_0::Disconnect::builder()
664 .reason_code(DisconnectReasonCode::KeepAliveTimeout)
665 .build()
666 {
667 events.extend(self.process_send_v5_0_disconnect(disconnect));
668 }
669 }
670 }
671 Version::Undetermined => {
672 unreachable!("Protocol version should be set before receiving PINGREQ");
673 }
674 }
675 }
676 TimerKind::PingrespRecv => {
677 self.pingresp_recv_set = false;
679
680 match self.protocol_version {
681 Version::V3_1_1 => {
682 events.push(GenericEvent::RequestClose);
684 }
685 Version::V5_0 => {
686 if self.status == ConnectionStatus::Connected {
688 if let Ok(disconnect) = v5_0::Disconnect::builder()
689 .reason_code(DisconnectReasonCode::KeepAliveTimeout)
690 .build()
691 {
692 events.extend(self.process_send_v5_0_disconnect(disconnect));
693 }
694 }
695 }
696 Version::Undetermined => {
697 unreachable!("Protocol version should be set before receiving PINGRESP");
698 }
699 }
700 }
701 }
702
703 events
704 }
705
706 pub fn notify_closed(&mut self) -> Vec<GenericEvent<PacketIdType>> {
719 let mut events = Vec::new();
720
721 self.maximum_packet_size_send = MQTT_PACKET_SIZE_NO_LIMIT;
723 self.maximum_packet_size_recv = MQTT_PACKET_SIZE_NO_LIMIT;
724
725 self.status = ConnectionStatus::Disconnected;
727
728 self.topic_alias_send = None;
730 self.topic_alias_recv = None;
731
732 for packet_id in self.pid_suback.drain() {
734 if self.pid_man.is_used_id(packet_id) {
735 self.pid_man.release_id(packet_id);
736 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
737 }
738 }
739
740 for packet_id in self.pid_unsuback.drain() {
742 if self.pid_man.is_used_id(packet_id) {
743 self.pid_man.release_id(packet_id);
744 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
745 }
746 }
747
748 if !self.need_store {
750 self.qos2_publish_processing.clear();
751 self.qos2_publish_handled.clear();
752
753 for packet_id in self.pid_puback.drain() {
755 if self.pid_man.is_used_id(packet_id) {
756 self.pid_man.release_id(packet_id);
757 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
758 }
759 }
760
761 for packet_id in self.pid_pubrec.drain() {
763 if self.pid_man.is_used_id(packet_id) {
764 self.pid_man.release_id(packet_id);
765 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
766 }
767 }
768
769 for packet_id in self.pid_pubcomp.drain() {
771 if self.pid_man.is_used_id(packet_id) {
772 self.pid_man.release_id(packet_id);
773 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
774 }
775 }
776 }
777
778 self.cancel_timers(&mut events);
780
781 events
782 }
783
784 pub fn set_pingreq_send_interval(
797 &mut self,
798 duration_ms: u64,
799 ) -> Vec<GenericEvent<PacketIdType>> {
800 let mut events = Vec::new();
801
802 if duration_ms == 0 {
803 self.pingreq_send_interval_ms = None;
804 self.pingreq_send_set = false;
805 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
806 } else {
807 self.pingreq_send_interval_ms = Some(duration_ms);
808 self.pingreq_send_set = true;
809 events.push(GenericEvent::RequestTimerReset {
810 kind: TimerKind::PingreqSend,
811 duration_ms,
812 });
813 }
814
815 events
816 }
817
818 pub fn get_receive_maximum_vacancy_for_send(&self) -> Option<u16> {
827 if let Some(max) = self.publish_send_max {
829 Some(max.saturating_sub(self.publish_send_count))
830 } else {
831 None }
833 }
834
835 pub fn set_offline_publish(&mut self, enable: bool) {
844 self.offline_publish = enable;
845 if self.offline_publish {
846 self.need_store = true;
847 }
848 }
849
850 pub fn set_auto_pub_response(&mut self, enable: bool) {
859 self.auto_pub_response = enable;
860 }
861
862 pub fn set_auto_ping_response(&mut self, enable: bool) {
870 self.auto_ping_response = enable;
871 }
872
873 pub fn set_auto_map_topic_alias_send(&mut self, enable: bool) {
885 self.auto_map_topic_alias_send = enable;
886 }
887
888 pub fn set_auto_replace_topic_alias_send(&mut self, enable: bool) {
898 self.auto_replace_topic_alias_send = enable;
899 }
900
901 pub fn set_pingresp_recv_timeout(&mut self, timeout_ms: Option<u64>) {
903 self.pingresp_recv_timeout_ms = timeout_ms;
904 }
905
906 pub fn acquire_packet_id(&mut self) -> Result<PacketIdType, MqttError> {
912 self.pid_man.acquire_unique_id()
913 }
914
915 pub fn register_packet_id(&mut self, packet_id: PacketIdType) -> Result<(), MqttError> {
928 self.pid_man.register_id(packet_id)
929 }
930
931 pub fn release_packet_id(
945 &mut self,
946 packet_id: PacketIdType,
947 ) -> Vec<GenericEvent<PacketIdType>> {
948 let mut events = Vec::new();
949
950 if self.pid_man.is_used_id(packet_id) {
951 self.pid_man.release_id(packet_id);
952 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
953 }
954
955 events
956 }
957
958 pub fn get_qos2_publish_handled(&self) -> HashSet<PacketIdType> {
967 self.qos2_publish_handled.clone()
968 }
969
970 pub fn restore_qos2_publish_handled(&mut self, pids: HashSet<PacketIdType>) {
979 self.qos2_publish_handled = pids;
980 }
981
982 pub fn restore_packets(&mut self, packets: Vec<GenericStorePacket<PacketIdType>>) {
991 for packet in packets {
992 match &packet {
993 GenericStorePacket::V3_1_1Publish(p) => {
994 match p.qos() {
996 Qos::AtLeastOnce => {
997 self.pid_puback.insert(p.packet_id().unwrap());
998 }
999 Qos::ExactlyOnce => {
1000 self.pid_pubrec.insert(p.packet_id().unwrap());
1001 }
1002 _ => {
1003 tracing::warn!("QoS 0 packet found in store, skipping");
1005 continue;
1006 }
1007 }
1008 let packet_id = p.packet_id().unwrap();
1010 if self.pid_man.register_id(packet_id).is_ok() {
1011 if let Err(e) = self.store.add(packet) {
1012 tracing::error!("Failed to add packet to store: {:?}", e);
1013 }
1014 } else {
1015 tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1016 }
1017 }
1018 GenericStorePacket::V5_0Publish(p) => {
1019 match p.qos() {
1021 Qos::AtLeastOnce => {
1022 self.pid_puback.insert(p.packet_id().unwrap());
1023 }
1024 Qos::ExactlyOnce => {
1025 self.pid_pubrec.insert(p.packet_id().unwrap());
1026 }
1027 _ => {
1028 tracing::warn!("QoS 0 packet found in store, skipping");
1030 continue;
1031 }
1032 }
1033 let packet_id = p.packet_id().unwrap();
1035 if self.pid_man.register_id(packet_id).is_ok() {
1036 if let Err(e) = self.store.add(packet) {
1037 tracing::error!("Failed to add packet to store: {:?}", e);
1038 }
1039 } else {
1040 tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1041 }
1042 }
1043 GenericStorePacket::V3_1_1Pubrel(p) => {
1044 self.pid_pubcomp.insert(p.packet_id());
1046 let packet_id = p.packet_id();
1048 if self.pid_man.register_id(packet_id).is_ok() {
1049 if let Err(e) = self.store.add(packet) {
1050 tracing::error!("Failed to add packet to store: {:?}", e);
1051 }
1052 } else {
1053 tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1054 }
1055 }
1056 GenericStorePacket::V5_0Pubrel(p) => {
1057 self.pid_pubcomp.insert(p.packet_id());
1059 let packet_id = p.packet_id();
1061 if self.pid_man.register_id(packet_id).is_ok() {
1062 if let Err(e) = self.store.add(packet) {
1063 tracing::error!("Failed to add packet to store: {:?}", e);
1064 }
1065 } else {
1066 tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1067 }
1068 }
1069 }
1070 }
1071 }
1072
1073 pub fn get_stored_packets(&self) -> Vec<GenericStorePacket<PacketIdType>> {
1082 self.store.get_stored()
1083 }
1084
1085 pub fn get_protocol_version(&self) -> Version {
1091 self.protocol_version
1092 }
1093
1094 pub fn is_publish_processing(&self, packet_id: PacketIdType) -> bool {
1104 self.qos2_publish_processing.contains(&packet_id)
1105 }
1106
1107 pub fn regulate_for_store(
1112 &self,
1113 mut packet: v5_0::GenericPublish<PacketIdType>,
1114 ) -> Result<v5_0::GenericPublish<PacketIdType>, MqttError> {
1115 if packet.topic_name().is_empty() {
1116 if let Some(props) = packet.props() {
1118 if let Some(topic_alias) = Self::get_topic_alias_from_props(props) {
1119 if let Some(ref topic_alias_send) = self.topic_alias_send {
1120 if let Some(topic) = topic_alias_send.peek(topic_alias) {
1121 packet = packet.remove_topic_alias_add_topic(topic.to_string())?;
1123 } else {
1124 return Err(MqttError::PacketNotRegulated);
1125 }
1126 } else {
1127 return Err(MqttError::PacketNotRegulated);
1128 }
1129 } else {
1130 return Err(MqttError::PacketNotRegulated);
1131 }
1132 } else {
1133 return Err(MqttError::PacketNotRegulated);
1134 }
1135 } else {
1136 packet = packet.remove_topic_alias();
1138 }
1139
1140 Ok(packet)
1141 }
1142
1143 fn initialize(&mut self, is_client: bool) {
1157 self.publish_send_max = None;
1158 self.publish_recv_max = None;
1159 self.publish_send_count = 0;
1160 self.topic_alias_send = None;
1161 self.topic_alias_recv = None;
1162 self.publish_recv.clear();
1163 self.qos2_publish_processing.clear();
1164 self.need_store = false;
1165 self.pid_suback.clear();
1166 self.pid_unsuback.clear();
1167 self.is_client = is_client;
1168 }
1169
1170 fn clear_store_related(&mut self) {
1171 self.pid_man.clear();
1172 self.pid_puback.clear();
1173 self.pid_pubrec.clear();
1174 self.pid_pubcomp.clear();
1175 self.store.clear();
1176 }
1177
1178 fn send_stored(&mut self) -> Vec<GenericEvent<PacketIdType>> {
1180 let mut events = Vec::new();
1181 self.store.for_each(|packet| {
1182 if packet.size() > self.maximum_packet_size_send as usize {
1183 let packet_id = packet.packet_id();
1184 self.pid_man.release_id(packet_id);
1185 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1186 return false; }
1188 events.push(GenericEvent::RequestSendPacket {
1189 packet: packet.clone().into(),
1190 release_packet_id_if_send_error: None,
1191 });
1192 return true; });
1194
1195 events
1196 }
1197
1198 fn validate_topic_alias(&mut self, topic_alias_opt: Option<u16>) -> Option<String> {
1210 let topic_alias = topic_alias_opt?;
1211
1212 if !self.validate_topic_alias_range(topic_alias) {
1213 return None;
1214 }
1215
1216 let topic_alias_send = self.topic_alias_send.as_mut()?;
1217 let topic = topic_alias_send.get(topic_alias)?;
1219
1220 Some(topic.to_string())
1221 }
1222
1223 fn validate_topic_alias_range(&self, topic_alias: u16) -> bool {
1235 let topic_alias_send = match &self.topic_alias_send {
1236 Some(tas) => tas,
1237 None => {
1238 tracing::error!("topic_alias is set but topic_alias_maximum is 0");
1239 return false;
1240 }
1241 };
1242
1243 if topic_alias == 0 || topic_alias > topic_alias_send.max() {
1244 tracing::error!("topic_alias is set but out of range");
1245 return false;
1246 }
1247
1248 true
1249 }
1250
1251 pub(crate) fn process_send_v3_1_1_connect(
1253 &mut self,
1254 packet: v3_1_1::Connect,
1255 ) -> Vec<GenericEvent<PacketIdType>> {
1256 if self.status != ConnectionStatus::Disconnected {
1257 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1258 }
1259 if !self.validate_maximum_packet_size_send(packet.size()) {
1260 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1261 }
1262
1263 let mut events = Vec::new();
1264 self.initialize(true);
1265 self.status = ConnectionStatus::Connecting;
1266
1267 let keep_alive = packet.keep_alive();
1269 if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1270 self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1271 }
1272
1273 if packet.clean_start() {
1275 self.clear_store_related();
1276 } else {
1277 self.need_store = true;
1278 }
1279
1280 self.topic_alias_send = None;
1282
1283 events.push(GenericEvent::RequestSendPacket {
1284 packet: packet.into(),
1285 release_packet_id_if_send_error: None,
1286 });
1287 self.send_post_process(&mut events);
1288
1289 events
1290 }
1291
1292 pub(crate) fn process_send_v5_0_connect(
1294 &mut self,
1295 packet: v5_0::Connect,
1296 ) -> Vec<GenericEvent<PacketIdType>> {
1297 if !self.validate_maximum_packet_size_send(packet.size()) {
1298 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1299 }
1300 if self.status != ConnectionStatus::Disconnected {
1301 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1302 }
1303
1304 let mut events = Vec::new();
1305 self.initialize(true);
1306 self.status = ConnectionStatus::Connecting;
1307
1308 let keep_alive = packet.keep_alive();
1310 if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1311 self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1312 }
1313
1314 if packet.clean_start() {
1316 self.clear_store_related();
1317 }
1318
1319 for prop in packet.props() {
1321 match prop {
1322 Property::TopicAliasMaximum(val) => {
1323 if val.val() != 0 {
1324 self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1325 }
1326 }
1327 Property::ReceiveMaximum(val) => {
1328 debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1329 self.publish_recv_max = Some(val.val());
1330 }
1331 Property::MaximumPacketSize(val) => {
1332 debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1333 self.maximum_packet_size_recv = val.val();
1334 }
1335 Property::SessionExpiryInterval(val) => {
1336 if val.val() != 0 {
1337 self.need_store = true;
1338 }
1339 }
1340 _ => {
1341 }
1343 }
1344 }
1345
1346 events.push(GenericEvent::RequestSendPacket {
1347 packet: packet.into(),
1348 release_packet_id_if_send_error: None,
1349 });
1350 self.send_post_process(&mut events);
1351
1352 events
1353 }
1354
1355 pub(crate) fn process_send_v3_1_1_connack(
1356 &mut self,
1357 packet: v3_1_1::Connack,
1358 ) -> Vec<GenericEvent<PacketIdType>> {
1359 if self.status != ConnectionStatus::Connecting {
1360 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1361 }
1362 let mut events = Vec::new();
1363 if packet.return_code() == ConnectReturnCode::Accepted {
1364 self.status = ConnectionStatus::Connected;
1365 } else {
1366 self.status = ConnectionStatus::Disconnected;
1367 }
1368
1369 events.push(GenericEvent::RequestSendPacket {
1370 packet: packet.into(),
1371 release_packet_id_if_send_error: None,
1372 });
1373 events.extend(self.send_stored());
1374 self.send_post_process(&mut events);
1375
1376 events
1377 }
1378
1379 pub(crate) fn process_send_v5_0_connack(
1380 &mut self,
1381 packet: v5_0::Connack,
1382 ) -> Vec<GenericEvent<PacketIdType>> {
1383 if !self.validate_maximum_packet_size_send(packet.size()) {
1384 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1385 }
1386 if self.status != ConnectionStatus::Connecting {
1387 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1388 }
1389
1390 let mut events = Vec::new();
1391
1392 if packet.reason_code() == ConnectReasonCode::Success {
1393 self.status = ConnectionStatus::Connected;
1394
1395 for prop in packet.props() {
1397 match prop {
1398 Property::TopicAliasMaximum(val) => {
1399 if val.val() != 0 {
1400 self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1401 }
1402 }
1403 Property::ReceiveMaximum(val) => {
1404 debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1405 self.publish_recv_max = Some(val.val());
1406 }
1407 Property::MaximumPacketSize(val) => {
1408 debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1409 self.maximum_packet_size_recv = val.val();
1410 }
1411 _ => {
1412 }
1414 }
1415 }
1416 } else {
1417 self.status = ConnectionStatus::Disconnected;
1418 self.cancel_timers(&mut events);
1419 }
1420
1421 events.push(GenericEvent::RequestSendPacket {
1422 packet: packet.into(),
1423 release_packet_id_if_send_error: None,
1424 });
1425 events.extend(self.send_stored());
1426 self.send_post_process(&mut events);
1427
1428 events
1429 }
1430
1431 pub(crate) fn process_send_v3_1_1_publish(
1432 &mut self,
1433 packet: v3_1_1::GenericPublish<PacketIdType>,
1434 ) -> Vec<GenericEvent<PacketIdType>> {
1435 let mut events = Vec::new();
1436 let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1437
1438 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1439 let packet_id = packet.packet_id().unwrap();
1441 if self.status != ConnectionStatus::Connected
1442 && !self.need_store
1443 && !self.offline_publish
1444 {
1445 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1446 if self.pid_man.is_used_id(packet_id) {
1447 self.pid_man.release_id(packet_id);
1448 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1449 }
1450 return events;
1451 }
1452 if !self.pid_man.is_used_id(packet_id) {
1453 tracing::error!("packet_id {packet_id} must be acquired or registered");
1454 events.push(GenericEvent::NotifyError(
1455 MqttError::PacketIdentifierInvalid,
1456 ));
1457 return events;
1458 }
1459 if self.need_store
1460 && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1461 {
1462 let store_packet = packet.clone().set_dup(true);
1463 self.store.add(store_packet.try_into().unwrap()).unwrap();
1464 } else {
1465 release_packet_id_if_send_error = Some(packet_id);
1466 }
1467 if packet.qos() == Qos::ExactlyOnce {
1468 self.qos2_publish_processing.insert(packet_id);
1469 self.pid_pubrec.insert(packet_id);
1470 } else {
1471 self.pid_puback.insert(packet_id);
1472 }
1473 } else if self.status != ConnectionStatus::Connected {
1474 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1475 return events;
1476 }
1477
1478 if self.status == ConnectionStatus::Connected {
1479 events.push(GenericEvent::RequestSendPacket {
1480 packet: packet.into(),
1481 release_packet_id_if_send_error,
1482 });
1483 }
1484 self.send_post_process(&mut events);
1485
1486 events
1487 }
1488
1489 pub(crate) fn process_send_v5_0_publish(
1490 &mut self,
1491 mut packet: v5_0::GenericPublish<PacketIdType>,
1492 ) -> Vec<GenericEvent<PacketIdType>> {
1493 if !self.validate_maximum_packet_size_send(packet.size()) {
1494 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1495 }
1496
1497 let mut events = Vec::new();
1498 let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1499 let mut topic_alias_validated = false;
1500 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1501 let packet_id = packet.packet_id().unwrap();
1502 if self.status != ConnectionStatus::Connected
1503 && !self.need_store
1504 && !self.offline_publish
1505 {
1506 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1507 if self.pid_man.is_used_id(packet_id) {
1508 self.pid_man.release_id(packet_id);
1509 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1510 }
1511 return events;
1512 }
1513
1514 if !self.pid_man.is_used_id(packet_id) {
1516 tracing::error!("packet_id {packet_id} must be acquired or registered");
1517 events.push(GenericEvent::NotifyError(
1518 MqttError::PacketIdentifierInvalid,
1519 ));
1520 return events;
1521 }
1522
1523 if self.need_store
1524 && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1525 {
1526 let ta_opt = Self::get_topic_alias_from_props_opt(packet.props());
1527 if packet.topic_name().is_empty() {
1528 let topic_opt = self.validate_topic_alias(ta_opt);
1530 if topic_opt.is_none() {
1531 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1532 if self.pid_man.is_used_id(packet_id) {
1533 self.pid_man.release_id(packet_id);
1534 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1535 }
1536 return events;
1537 }
1538 topic_alias_validated = true;
1539 let store_packet = packet
1540 .clone()
1541 .remove_topic_alias_add_topic(topic_opt.unwrap())
1542 .unwrap()
1543 .set_dup(true);
1544 self.store.add(store_packet.try_into().unwrap()).unwrap();
1546 } else {
1547 let store_packet = packet.clone().remove_topic_alias().set_dup(true);
1549 self.store.add(store_packet.try_into().unwrap()).unwrap();
1550 }
1551 } else {
1552 release_packet_id_if_send_error = Some(packet_id);
1553 }
1554 if packet.qos() == Qos::ExactlyOnce {
1555 self.qos2_publish_processing.insert(packet_id);
1556 self.pid_pubrec.insert(packet_id);
1557 } else {
1558 self.pid_puback.insert(packet_id);
1559 }
1560 } else if self.status != ConnectionStatus::Connected {
1561 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1562 return events;
1563 }
1564
1565 let packet_id_opt = packet.packet_id();
1566 let ta_opt = Self::get_topic_alias_from_props_opt(packet.props());
1567 if packet.topic_name().is_empty() {
1568 if !topic_alias_validated && self.validate_topic_alias(ta_opt).is_none() {
1570 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1571 if let Some(packet_id) = packet_id_opt {
1572 if self.pid_man.is_used_id(packet_id) {
1573 self.pid_man.release_id(packet_id);
1574 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1575 }
1576 }
1577 return events;
1578 }
1579 } else {
1580 if let Some(ta) = ta_opt {
1581 if self.validate_topic_alias_range(ta) {
1583 tracing::trace!(
1584 "topic alias: {} - {} is registered.",
1585 packet.topic_name(),
1586 ta
1587 );
1588 if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1589 topic_alias_send.insert_or_update(packet.topic_name(), ta);
1590 }
1591 } else {
1592 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1593 if let Some(packet_id) = packet_id_opt {
1594 if self.pid_man.is_used_id(packet_id) {
1595 self.pid_man.release_id(packet_id);
1596 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1597 }
1598 }
1599 return events;
1600 }
1601 } else if self.status == ConnectionStatus::Connected {
1602 if self.auto_map_topic_alias_send {
1604 if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1605 if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name())
1606 {
1607 tracing::trace!(
1608 "topic alias: {} - {} is found.",
1609 packet.topic_name(),
1610 found_ta
1611 );
1612 packet = packet.remove_topic_add_topic_alias(found_ta);
1613 } else {
1614 let lru_ta = topic_alias_send.get_lru_alias();
1615 topic_alias_send.insert_or_update(packet.topic_name(), lru_ta);
1616 packet = packet.remove_topic_add_topic_alias(lru_ta);
1617 }
1618 }
1619 } else if self.auto_replace_topic_alias_send {
1620 if let Some(ref topic_alias_send) = self.topic_alias_send {
1621 if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name())
1622 {
1623 tracing::trace!(
1624 "topic alias: {} - {} is found.",
1625 packet.topic_name(),
1626 found_ta
1627 );
1628 packet = packet.remove_topic_add_topic_alias(found_ta);
1629 }
1630 }
1631 }
1632 }
1633 }
1634
1635 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1637 if let Some(max) = self.publish_send_max {
1638 if self.publish_send_count == max {
1639 events.push(GenericEvent::NotifyError(MqttError::ReceiveMaximumExceeded));
1640 if let Some(packet_id) = packet_id_opt {
1641 if self.pid_man.is_used_id(packet_id) {
1642 self.pid_man.release_id(packet_id);
1643 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1644 }
1645 }
1646 return events;
1647 }
1648 self.publish_send_count += 1;
1649 }
1650 }
1651
1652 if self.status == ConnectionStatus::Connected {
1653 events.push(GenericEvent::RequestSendPacket {
1654 packet: packet.into(),
1655 release_packet_id_if_send_error,
1656 });
1657 }
1658 self.send_post_process(&mut events);
1659
1660 events
1661 }
1662
1663 pub(crate) fn process_send_v3_1_1_puback(
1664 &mut self,
1665 packet: v3_1_1::GenericPuback<PacketIdType>,
1666 ) -> Vec<GenericEvent<PacketIdType>> {
1667 if self.status != ConnectionStatus::Connected {
1668 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1669 }
1670 let mut events = Vec::new();
1671
1672 events.push(GenericEvent::RequestSendPacket {
1673 packet: packet.into(),
1674 release_packet_id_if_send_error: None,
1675 });
1676 self.send_post_process(&mut events);
1677
1678 events
1679 }
1680
1681 pub(crate) fn process_send_v5_0_puback(
1682 &mut self,
1683 packet: v5_0::GenericPuback<PacketIdType>,
1684 ) -> Vec<GenericEvent<PacketIdType>> {
1685 if !self.validate_maximum_packet_size_send(packet.size()) {
1686 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1687 }
1688 if self.status != ConnectionStatus::Connected {
1689 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1690 }
1691
1692 let mut events = Vec::new();
1693 self.publish_recv.remove(&packet.packet_id());
1694
1695 events.push(GenericEvent::RequestSendPacket {
1696 packet: packet.into(),
1697 release_packet_id_if_send_error: None,
1698 });
1699 self.send_post_process(&mut events);
1700
1701 events
1702 }
1703
1704 pub(crate) fn process_send_v3_1_1_pubrec(
1705 &mut self,
1706 packet: v3_1_1::GenericPubrec<PacketIdType>,
1707 ) -> Vec<GenericEvent<PacketIdType>> {
1708 if self.status != ConnectionStatus::Connected {
1709 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1710 }
1711 let mut events = Vec::new();
1712
1713 events.push(GenericEvent::RequestSendPacket {
1714 packet: packet.into(),
1715 release_packet_id_if_send_error: None,
1716 });
1717 self.send_post_process(&mut events);
1718
1719 events
1720 }
1721
1722 pub(crate) fn process_send_v5_0_pubrec(
1723 &mut self,
1724 packet: v5_0::GenericPubrec<PacketIdType>,
1725 ) -> Vec<GenericEvent<PacketIdType>> {
1726 if !self.validate_maximum_packet_size_send(packet.size()) {
1727 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1728 }
1729 if self.status != ConnectionStatus::Connected {
1730 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1731 }
1732
1733 let mut events = Vec::new();
1734 let packet_id = packet.packet_id();
1735
1736 if let Some(rc) = packet.reason_code() {
1737 if rc.is_failure() {
1738 self.publish_recv.remove(&packet_id);
1739 self.qos2_publish_handled.remove(&packet_id);
1740 }
1741 }
1742
1743 events.push(GenericEvent::RequestSendPacket {
1744 packet: packet.into(),
1745 release_packet_id_if_send_error: None,
1746 });
1747 self.send_post_process(&mut events);
1748
1749 events
1750 }
1751
1752 pub(crate) fn process_send_v3_1_1_pubrel(
1753 &mut self,
1754 packet: v3_1_1::GenericPubrel<PacketIdType>,
1755 ) -> Vec<GenericEvent<PacketIdType>> {
1756 if self.status != ConnectionStatus::Connected && !self.need_store {
1757 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1758 }
1759 let mut events = Vec::new();
1760 let packet_id = packet.packet_id();
1761 if !self.pid_man.is_used_id(packet_id) {
1762 tracing::error!("packet_id {packet_id} must be acquired or registered");
1763 events.push(GenericEvent::NotifyError(
1764 MqttError::PacketIdentifierInvalid,
1765 ));
1766 return events;
1767 }
1768 if self.need_store {
1769 self.store.add(packet.clone().try_into().unwrap()).unwrap();
1770 }
1771
1772 if self.status == ConnectionStatus::Connected {
1773 self.pid_pubcomp.insert(packet_id);
1774 events.push(GenericEvent::RequestSendPacket {
1775 packet: packet.into(),
1776 release_packet_id_if_send_error: None,
1777 });
1778 }
1779 self.send_post_process(&mut events);
1780
1781 events
1782 }
1783
1784 pub(crate) fn process_send_v5_0_pubrel(
1785 &mut self,
1786 packet: v5_0::GenericPubrel<PacketIdType>,
1787 ) -> Vec<GenericEvent<PacketIdType>> {
1788 if !self.validate_maximum_packet_size_send(packet.size()) {
1789 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1790 }
1791 if self.status != ConnectionStatus::Connected && !self.need_store {
1792 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1793 }
1794
1795 let mut events = Vec::new();
1796 let packet_id = packet.packet_id();
1797 if !self.pid_man.is_used_id(packet_id) {
1798 tracing::error!("packet_id {packet_id} must be acquired or registered");
1799 events.push(GenericEvent::NotifyError(
1800 MqttError::PacketIdentifierInvalid,
1801 ));
1802 return events;
1803 }
1804 if self.need_store {
1805 self.store.add(packet.clone().try_into().unwrap()).unwrap();
1806 }
1807
1808 if self.status == ConnectionStatus::Connected {
1809 self.pid_pubcomp.insert(packet_id);
1810 events.push(GenericEvent::RequestSendPacket {
1811 packet: packet.into(),
1812 release_packet_id_if_send_error: None,
1813 });
1814 }
1815 self.send_post_process(&mut events);
1816
1817 events
1818 }
1819
1820 pub(crate) fn process_send_v3_1_1_pubcomp(
1821 &mut self,
1822 packet: v3_1_1::GenericPubcomp<PacketIdType>,
1823 ) -> Vec<GenericEvent<PacketIdType>> {
1824 if self.status != ConnectionStatus::Connected {
1825 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1826 }
1827 let mut events = Vec::new();
1828
1829 events.push(GenericEvent::RequestSendPacket {
1830 packet: packet.into(),
1831 release_packet_id_if_send_error: None,
1832 });
1833 self.send_post_process(&mut events);
1834
1835 events
1836 }
1837
1838 pub(crate) fn process_send_v5_0_pubcomp(
1839 &mut self,
1840 packet: v5_0::GenericPubcomp<PacketIdType>,
1841 ) -> Vec<GenericEvent<PacketIdType>> {
1842 if !self.validate_maximum_packet_size_send(packet.size()) {
1843 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1844 }
1845 if self.status != ConnectionStatus::Connected {
1846 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1847 }
1848
1849 let mut events = Vec::new();
1850 self.publish_recv.remove(&packet.packet_id());
1851
1852 events.push(GenericEvent::RequestSendPacket {
1853 packet: packet.into(),
1854 release_packet_id_if_send_error: None,
1855 });
1856 self.send_post_process(&mut events);
1857
1858 events
1859 }
1860
1861 pub(crate) fn process_send_v3_1_1_subscribe(
1862 &mut self,
1863 packet: v3_1_1::GenericSubscribe<PacketIdType>,
1864 ) -> Vec<GenericEvent<PacketIdType>> {
1865 let mut events = Vec::new();
1866 let packet_id = packet.packet_id();
1867 if self.status != ConnectionStatus::Connected {
1868 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1869 if self.pid_man.is_used_id(packet_id) {
1870 self.pid_man.release_id(packet_id);
1871 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1872 }
1873 return events;
1874 }
1875 if !self.pid_man.is_used_id(packet_id) {
1876 tracing::error!("packet_id {packet_id} must be acquired or registered");
1877 events.push(GenericEvent::NotifyError(
1878 MqttError::PacketIdentifierInvalid,
1879 ));
1880 return events;
1881 }
1882 self.pid_suback.insert(packet_id);
1883
1884 events.push(GenericEvent::RequestSendPacket {
1885 packet: packet.into(),
1886 release_packet_id_if_send_error: Some(packet_id),
1887 });
1888 self.send_post_process(&mut events);
1889
1890 events
1891 }
1892
1893 pub(crate) fn process_send_v5_0_subscribe(
1894 &mut self,
1895 packet: v5_0::GenericSubscribe<PacketIdType>,
1896 ) -> Vec<GenericEvent<PacketIdType>> {
1897 if !self.validate_maximum_packet_size_send(packet.size()) {
1898 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1899 }
1900
1901 let mut events = Vec::new();
1902 let packet_id = packet.packet_id();
1903 if self.status != ConnectionStatus::Connected {
1904 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1905 if self.pid_man.is_used_id(packet_id) {
1906 self.pid_man.release_id(packet_id);
1907 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1908 }
1909 return events;
1910 }
1911 if !self.pid_man.is_used_id(packet_id) {
1912 tracing::error!("packet_id {packet_id} must be acquired or registered");
1913 events.push(GenericEvent::NotifyError(
1914 MqttError::PacketIdentifierInvalid,
1915 ));
1916 return events;
1917 }
1918 self.pid_suback.insert(packet_id);
1919
1920 events.push(GenericEvent::RequestSendPacket {
1921 packet: packet.into(),
1922 release_packet_id_if_send_error: Some(packet_id),
1923 });
1924 self.send_post_process(&mut events);
1925
1926 events
1927 }
1928
1929 pub(crate) fn process_send_v3_1_1_suback(
1930 &mut self,
1931 packet: v3_1_1::GenericSuback<PacketIdType>,
1932 ) -> Vec<GenericEvent<PacketIdType>> {
1933 if self.status != ConnectionStatus::Connected {
1934 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1935 }
1936 let mut events = Vec::new();
1937 events.push(GenericEvent::RequestSendPacket {
1938 packet: packet.into(),
1939 release_packet_id_if_send_error: None,
1940 });
1941 self.send_post_process(&mut events);
1942
1943 events
1944 }
1945
1946 pub(crate) fn process_send_v5_0_suback(
1947 &mut self,
1948 packet: v5_0::GenericSuback<PacketIdType>,
1949 ) -> Vec<GenericEvent<PacketIdType>> {
1950 if !self.validate_maximum_packet_size_send(packet.size()) {
1951 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1952 }
1953 if self.status != ConnectionStatus::Connected {
1954 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1955 }
1956
1957 let mut events = Vec::new();
1958 events.push(GenericEvent::RequestSendPacket {
1959 packet: packet.into(),
1960 release_packet_id_if_send_error: None,
1961 });
1962 self.send_post_process(&mut events);
1963
1964 events
1965 }
1966
1967 pub(crate) fn process_send_v3_1_1_unsubscribe(
1968 &mut self,
1969 packet: v3_1_1::GenericUnsubscribe<PacketIdType>,
1970 ) -> Vec<GenericEvent<PacketIdType>> {
1971 let mut events = Vec::new();
1972 let packet_id = packet.packet_id();
1973 if self.status != ConnectionStatus::Connected {
1974 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1975 if self.pid_man.is_used_id(packet_id) {
1976 self.pid_man.release_id(packet_id);
1977 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1978 }
1979 return events;
1980 }
1981 if !self.pid_man.is_used_id(packet_id) {
1982 tracing::error!("packet_id {packet_id} must be acquired or registered");
1983 events.push(GenericEvent::NotifyError(
1984 MqttError::PacketIdentifierInvalid,
1985 ));
1986 return events;
1987 }
1988 self.pid_unsuback.insert(packet_id);
1989
1990 events.push(GenericEvent::RequestSendPacket {
1991 packet: packet.into(),
1992 release_packet_id_if_send_error: Some(packet_id),
1993 });
1994 self.send_post_process(&mut events);
1995
1996 events
1997 }
1998
1999 pub(crate) fn process_send_v5_0_unsubscribe(
2000 &mut self,
2001 packet: v5_0::GenericUnsubscribe<PacketIdType>,
2002 ) -> Vec<GenericEvent<PacketIdType>> {
2003 if !self.validate_maximum_packet_size_send(packet.size()) {
2004 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2005 }
2006
2007 let mut events = Vec::new();
2008 let packet_id = packet.packet_id();
2009 if self.status != ConnectionStatus::Connected {
2010 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
2011 if self.pid_man.is_used_id(packet_id) {
2012 self.pid_man.release_id(packet_id);
2013 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2014 }
2015 return events;
2016 }
2017 if !self.pid_man.is_used_id(packet_id) {
2018 tracing::error!("packet_id {packet_id} must be acquired or registered");
2019 events.push(GenericEvent::NotifyError(
2020 MqttError::PacketIdentifierInvalid,
2021 ));
2022 return events;
2023 }
2024 self.pid_unsuback.insert(packet_id);
2025
2026 events.push(GenericEvent::RequestSendPacket {
2027 packet: packet.into(),
2028 release_packet_id_if_send_error: Some(packet_id),
2029 });
2030 self.send_post_process(&mut events);
2031
2032 events
2033 }
2034
2035 pub(crate) fn process_send_v3_1_1_unsuback(
2036 &mut self,
2037 packet: v3_1_1::GenericUnsuback<PacketIdType>,
2038 ) -> Vec<GenericEvent<PacketIdType>> {
2039 if self.status != ConnectionStatus::Connected {
2040 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2041 }
2042 let mut events = Vec::new();
2043 events.push(GenericEvent::RequestSendPacket {
2044 packet: packet.into(),
2045 release_packet_id_if_send_error: None,
2046 });
2047 self.send_post_process(&mut events);
2048
2049 events
2050 }
2051
2052 pub(crate) fn process_send_v5_0_unsuback(
2053 &mut self,
2054 packet: v5_0::GenericUnsuback<PacketIdType>,
2055 ) -> Vec<GenericEvent<PacketIdType>> {
2056 if !self.validate_maximum_packet_size_send(packet.size()) {
2057 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2058 }
2059 if self.status != ConnectionStatus::Connected {
2060 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2061 }
2062
2063 let mut events = Vec::new();
2064 events.push(GenericEvent::RequestSendPacket {
2065 packet: packet.into(),
2066 release_packet_id_if_send_error: None,
2067 });
2068 self.send_post_process(&mut events);
2069
2070 events
2071 }
2072
2073 pub(crate) fn process_send_v3_1_1_pingreq(
2074 &mut self,
2075 packet: v3_1_1::Pingreq,
2076 ) -> Vec<GenericEvent<PacketIdType>> {
2077 if self.status != ConnectionStatus::Connected {
2078 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2079 }
2080 let mut events = Vec::new();
2081 events.push(GenericEvent::RequestSendPacket {
2082 packet: packet.into(),
2083 release_packet_id_if_send_error: None,
2084 });
2085 if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2086 self.pingreq_send_set = true;
2087 events.push(GenericEvent::RequestTimerReset {
2088 kind: TimerKind::PingrespRecv,
2089 duration_ms: timeout_ms,
2090 });
2091 }
2092 self.send_post_process(&mut events);
2093
2094 events
2095 }
2096
2097 pub(crate) fn process_send_v5_0_pingreq(
2098 &mut self,
2099 packet: v5_0::Pingreq,
2100 ) -> Vec<GenericEvent<PacketIdType>> {
2101 if !self.validate_maximum_packet_size_send(packet.size()) {
2102 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2103 }
2104 if self.status != ConnectionStatus::Connected {
2105 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2106 }
2107
2108 let mut events = Vec::new();
2109 events.push(GenericEvent::RequestSendPacket {
2110 packet: packet.into(),
2111 release_packet_id_if_send_error: None,
2112 });
2113 if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2114 self.pingreq_send_set = true;
2115 events.push(GenericEvent::RequestTimerReset {
2116 kind: TimerKind::PingrespRecv,
2117 duration_ms: timeout_ms,
2118 });
2119 }
2120 self.send_post_process(&mut events);
2121
2122 events
2123 }
2124
2125 pub(crate) fn process_send_v3_1_1_pingresp(
2126 &mut self,
2127 packet: v3_1_1::Pingresp,
2128 ) -> Vec<GenericEvent<PacketIdType>> {
2129 if self.status != ConnectionStatus::Connected {
2130 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2131 }
2132 let mut events = Vec::new();
2133 events.push(GenericEvent::RequestSendPacket {
2134 packet: packet.into(),
2135 release_packet_id_if_send_error: None,
2136 });
2137 self.send_post_process(&mut events);
2138
2139 events
2140 }
2141
2142 pub(crate) fn process_send_v5_0_pingresp(
2143 &mut self,
2144 packet: v5_0::Pingresp,
2145 ) -> Vec<GenericEvent<PacketIdType>> {
2146 if !self.validate_maximum_packet_size_send(packet.size()) {
2147 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2148 }
2149 if self.status != ConnectionStatus::Connected {
2150 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2151 }
2152
2153 let mut events = Vec::new();
2154 events.push(GenericEvent::RequestSendPacket {
2155 packet: packet.into(),
2156 release_packet_id_if_send_error: None,
2157 });
2158 self.send_post_process(&mut events);
2159
2160 events
2161 }
2162
2163 pub(crate) fn process_send_v3_1_1_disconnect(
2164 &mut self,
2165 packet: v3_1_1::Disconnect,
2166 ) -> Vec<GenericEvent<PacketIdType>> {
2167 if self.status != ConnectionStatus::Connected {
2168 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2169 }
2170 let mut events = Vec::new();
2171 self.status = ConnectionStatus::Disconnected;
2172 self.cancel_timers(&mut events);
2173 events.push(GenericEvent::RequestSendPacket {
2174 packet: packet.into(),
2175 release_packet_id_if_send_error: None,
2176 });
2177 events.push(GenericEvent::RequestClose);
2178
2179 events
2180 }
2181
2182 pub(crate) fn process_send_v5_0_disconnect(
2183 &mut self,
2184 packet: v5_0::Disconnect,
2185 ) -> Vec<GenericEvent<PacketIdType>> {
2186 if !self.validate_maximum_packet_size_send(packet.size()) {
2187 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2188 }
2189 if self.status != ConnectionStatus::Connected {
2190 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2191 }
2192
2193 let mut events = Vec::new();
2194 self.status = ConnectionStatus::Disconnected;
2195 self.cancel_timers(&mut events);
2196 events.push(GenericEvent::RequestSendPacket {
2197 packet: packet.into(),
2198 release_packet_id_if_send_error: None,
2199 });
2200 events.push(GenericEvent::RequestClose);
2201
2202 events
2203 }
2204
2205 pub(crate) fn process_send_v5_0_auth(
2206 &mut self,
2207 packet: v5_0::Auth,
2208 ) -> Vec<GenericEvent<PacketIdType>> {
2209 if !self.validate_maximum_packet_size_send(packet.size()) {
2210 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2211 }
2212 if self.status == ConnectionStatus::Disconnected {
2213 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2214 }
2215
2216 let mut events = Vec::new();
2217 events.push(GenericEvent::RequestSendPacket {
2218 packet: packet.into(),
2219 release_packet_id_if_send_error: None,
2220 });
2221 self.send_post_process(&mut events);
2222
2223 events
2224 }
2225
2226 fn send_post_process(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
2227 if self.is_client {
2228 if let Some(timeout_ms) = self.pingreq_send_interval_ms {
2229 self.pingreq_send_set = true;
2230 events.push(GenericEvent::RequestTimerReset {
2231 kind: TimerKind::PingreqSend,
2232 duration_ms: timeout_ms,
2233 });
2234 }
2235 }
2236 }
2237
2238 fn validate_maximum_packet_size_send(&self, size: usize) -> bool {
2239 if size > self.maximum_packet_size_send as usize {
2240 tracing::error!("packet size over maximum_packet_size for sending");
2241 return false;
2242 }
2243 true
2244 }
2245
2246 fn process_recv_packet(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
2247 let mut events = Vec::new();
2248
2249 let total_size = remaining_length_to_total_size(raw_packet.remaining_length());
2251 if total_size > self.maximum_packet_size_recv {
2252 let disconnect_packet = v5_0::Disconnect::builder()
2258 .reason_code(DisconnectReasonCode::PacketTooLarge)
2259 .build()
2260 .unwrap();
2261 events.extend(self.process_send_v5_0_disconnect(disconnect_packet));
2263 events.push(GenericEvent::NotifyError(MqttError::PacketTooLarge));
2264 return events;
2265 }
2266
2267 let packet_type = raw_packet.packet_type();
2268 let _flags = raw_packet.flags();
2269 match self.protocol_version {
2270 Version::V3_1_1 => {
2271 match packet_type {
2272 1 => {
2273 events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2275 }
2276 2 => {
2277 events.extend(self.process_recv_v3_1_1_connack(raw_packet));
2279 }
2280 3 => {
2281 events.extend(self.process_recv_v3_1_1_publish(raw_packet));
2283 }
2284 4 => {
2285 events.extend(self.process_recv_v3_1_1_puback(raw_packet));
2287 }
2288 5 => {
2289 events.extend(self.process_recv_v3_1_1_pubrec(raw_packet));
2291 }
2292 6 => {
2293 events.extend(self.process_recv_v3_1_1_pubrel(raw_packet));
2295 }
2296 7 => {
2297 events.extend(self.process_recv_v3_1_1_pubcomp(raw_packet));
2299 }
2300 8 => {
2301 events.extend(self.process_recv_v3_1_1_subscribe(raw_packet));
2303 }
2304 9 => {
2305 events.extend(self.process_recv_v3_1_1_suback(raw_packet));
2307 }
2308 10 => {
2309 events.extend(self.process_recv_v3_1_1_unsubscribe(raw_packet));
2311 }
2312 11 => {
2313 events.extend(self.process_recv_v3_1_1_unsuback(raw_packet));
2315 }
2316 12 => {
2317 events.extend(self.process_recv_v3_1_1_pingreq(raw_packet));
2319 }
2320 13 => {
2321 events.extend(self.process_recv_v3_1_1_pingresp(raw_packet));
2323 }
2324 14 => {
2325 events.extend(self.process_recv_v3_1_1_disconnect(raw_packet));
2327 }
2328 _ => {
2330 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2331 }
2332 }
2333 }
2334 Version::V5_0 => {
2335 match packet_type {
2336 1 => {
2337 events.extend(self.process_recv_v5_0_connect(raw_packet));
2339 }
2340 2 => {
2341 events.extend(self.process_recv_v5_0_connack(raw_packet));
2343 }
2344 3 => {
2345 events.extend(self.process_recv_v5_0_publish(raw_packet));
2347 }
2348 4 => {
2349 events.extend(self.process_recv_v5_0_puback(raw_packet));
2351 }
2352 5 => {
2353 events.extend(self.process_recv_v5_0_pubrec(raw_packet));
2355 }
2356 6 => {
2357 events.extend(self.process_recv_v5_0_pubrel(raw_packet));
2359 }
2360 7 => {
2361 events.extend(self.process_recv_v5_0_pubcomp(raw_packet));
2363 }
2364 8 => {
2365 events.extend(self.process_recv_v5_0_subscribe(raw_packet));
2367 }
2368 9 => {
2369 events.extend(self.process_recv_v5_0_suback(raw_packet));
2371 }
2372 10 => {
2373 events.extend(self.process_recv_v5_0_unsubscribe(raw_packet));
2375 }
2376 11 => {
2377 events.extend(self.process_recv_v5_0_unsuback(raw_packet));
2379 }
2380 12 => {
2381 events.extend(self.process_recv_v5_0_pingreq(raw_packet));
2383 }
2384 13 => {
2385 events.extend(self.process_recv_v5_0_pingresp(raw_packet));
2387 }
2388 14 => {
2389 events.extend(self.process_recv_v5_0_disconnect(raw_packet));
2391 }
2392 15 => {
2393 events.extend(self.process_recv_v5_0_auth(raw_packet));
2395 }
2396 _ => {
2398 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2399 }
2400 }
2401 }
2402 Version::Undetermined => {
2403 match packet_type {
2404 1 => {
2405 if raw_packet.remaining_length() < 7 {
2407 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2408 return events;
2409 }
2410 match raw_packet.data_as_slice()[6] {
2411 4 => {
2413 self.protocol_version = Version::V3_1_1;
2414 events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2415 }
2416 5 => {
2417 self.protocol_version = Version::V5_0;
2418 events.extend(self.process_recv_v5_0_connect(raw_packet));
2419 }
2420 _ => {
2421 events.push(GenericEvent::NotifyError(
2422 MqttError::UnsupportedProtocolVersion,
2423 ));
2424 }
2425 }
2426 }
2427 _ => {
2428 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2429 }
2430 }
2431 }
2432 }
2433
2434 events
2435 }
2436
2437 fn process_recv_v3_1_1_connect(
2438 &mut self,
2439 raw_packet: RawPacket,
2440 ) -> Vec<GenericEvent<PacketIdType>> {
2441 let mut events = Vec::new();
2442 match v3_1_1::Connect::parse(raw_packet.data_as_slice()) {
2443 Ok((packet, _)) => {
2444 if self.status != ConnectionStatus::Disconnected {
2445 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2446 return events;
2447 }
2448 self.initialize(false);
2449 self.status = ConnectionStatus::Connecting;
2450 if packet.keep_alive() > 0 {
2451 self.pingreq_recv_timeout_ms =
2452 Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2453 }
2454 if packet.clean_session() {
2455 self.clear_store_related();
2456 } else {
2457 self.need_store = true;
2458 }
2459 events.extend(self.refresh_pingreq_recv());
2460 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2461 }
2462 Err(e) => {
2463 if self.status == ConnectionStatus::Disconnected {
2464 self.status = ConnectionStatus::Connecting;
2465 let rc = match e {
2466 MqttError::ClientIdentifierNotValid => {
2467 ConnectReturnCode::IdentifierRejected
2468 }
2469 MqttError::BadUserNameOrPassword => {
2470 ConnectReturnCode::BadUserNameOrPassword
2471 }
2472 MqttError::UnsupportedProtocolVersion => {
2473 ConnectReturnCode::UnacceptableProtocolVersion
2474 }
2475 _ => ConnectReturnCode::NotAuthorized, };
2477 let connack = v3_1_1::Connack::builder().return_code(rc).build().unwrap();
2478 let connack_events = self.process_send_v3_1_1_connack(connack);
2479 events.extend(connack_events);
2480 } else {
2481 events.push(GenericEvent::RequestClose);
2482 }
2483 events.push(GenericEvent::NotifyError(e));
2484 }
2485 }
2486
2487 events
2488 }
2489
2490 fn process_recv_v5_0_connect(
2491 &mut self,
2492 raw_packet: RawPacket,
2493 ) -> Vec<GenericEvent<PacketIdType>> {
2494 let mut events = Vec::new();
2495 match v5_0::Connect::parse(raw_packet.data_as_slice()) {
2496 Ok((packet, _)) => {
2497 if self.status != ConnectionStatus::Disconnected {
2498 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2499 return events;
2500 }
2501 self.initialize(false);
2502 self.status = ConnectionStatus::Connecting;
2503 if packet.keep_alive() > 0 {
2504 self.pingreq_recv_timeout_ms =
2505 Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2506 }
2507 if packet.clean_start() {
2508 self.clear_store_related();
2509 }
2510 packet.props().iter().for_each(|prop| match prop {
2511 Property::TopicAliasMaximum(p) => {
2512 self.topic_alias_send = Some(TopicAliasSend::new(p.val()));
2513 }
2514 Property::ReceiveMaximum(p) => {
2515 self.publish_send_max = Some(p.val());
2516 }
2517 Property::MaximumPacketSize(p) => {
2518 self.maximum_packet_size_send = p.val();
2519 }
2520 Property::SessionExpiryInterval(p) => {
2521 if p.val() != 0 {
2522 self.need_store = true;
2523 }
2524 }
2525 _ => {}
2526 });
2527 events.extend(self.refresh_pingreq_recv());
2528 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2529 }
2530 Err(e) => {
2531 if self.status == ConnectionStatus::Disconnected {
2532 self.status = ConnectionStatus::Connecting;
2533 let rc = match e {
2534 MqttError::ClientIdentifierNotValid => {
2535 ConnectReasonCode::ClientIdentifierNotValid
2536 }
2537 MqttError::BadUserNameOrPassword => {
2538 ConnectReasonCode::BadAuthenticationMethod
2539 }
2540 MqttError::UnsupportedProtocolVersion => {
2541 ConnectReasonCode::UnsupportedProtocolVersion
2542 }
2543 _ => ConnectReasonCode::UnspecifiedError,
2544 };
2545 let connack = v5_0::Connack::builder().reason_code(rc).build().unwrap();
2546 let connack_events = self.process_send_v5_0_connack(connack);
2547 events.extend(connack_events);
2548 } else {
2549 let disconnect = v5_0::Disconnect::builder()
2550 .reason_code(DisconnectReasonCode::ProtocolError)
2551 .build()
2552 .unwrap();
2553 let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2554 events.extend(disconnect_events);
2555 }
2556 events.push(GenericEvent::NotifyError(e));
2557 }
2558 }
2559
2560 events
2561 }
2562
2563 fn process_recv_v3_1_1_connack(
2564 &mut self,
2565 raw_packet: RawPacket,
2566 ) -> Vec<GenericEvent<PacketIdType>> {
2567 let mut events = Vec::new();
2568
2569 match v3_1_1::Connack::parse(raw_packet.data_as_slice()) {
2570 Ok((packet, _consumed)) => {
2571 if packet.return_code() == ConnectReturnCode::Accepted {
2572 self.status = ConnectionStatus::Connected;
2573 if packet.session_present() {
2574 events.extend(self.send_stored());
2575 } else {
2576 self.clear_store_related();
2577 }
2578 }
2579 events.push(GenericEvent::NotifyPacketReceived(
2580 GenericPacket::V3_1_1Connack(packet),
2581 ));
2582 }
2583 Err(e) => {
2584 events.push(GenericEvent::RequestClose);
2585 events.push(GenericEvent::NotifyError(e));
2586 }
2587 }
2588
2589 events
2590 }
2591
    /// Handles a received MQTT v5.0 CONNACK.
    ///
    /// On successful parse with reason code `Success`, the connection moves
    /// to `Connected`, the CONNACK properties are applied to the send-side
    /// settings, and then either the events from `send_stored` are emitted
    /// (session present) or stored session state is cleared. The parsed
    /// packet is always reported last via `NotifyPacketReceived`, whatever
    /// its reason code.
    ///
    /// On parse failure, a DISCONNECT carrying the error converted into a
    /// reason code is routed through `process_send_v5_0_disconnect`, but
    /// only if the status is already `Connected`; `NotifyError` follows in
    /// every case.
    ///
    /// # Panics
    ///
    /// Panics if a `ReceiveMaximum` or `MaximumPacketSize` property holds 0,
    /// or if building the DISCONNECT packet fails.
    fn process_recv_v5_0_connack(
        &mut self,
        raw_packet: RawPacket,
    ) -> Vec<GenericEvent<PacketIdType>> {
        let mut events = Vec::new();

        match v5_0::Connack::parse(raw_packet.data_as_slice()) {
            Ok((packet, _consumed)) => {
                if packet.reason_code() == ConnectReasonCode::Success {
                    self.status = ConnectionStatus::Connected;

                    // Apply the limits announced in the CONNACK to what this
                    // side may send.
                    for prop in packet.props() {
                        match prop {
                            Property::TopicAliasMaximum(val) => {
                                // A maximum of 0 leaves topic_alias_send untouched.
                                if val.val() > 0 {
                                    self.topic_alias_send = Some(TopicAliasSend::new(val.val()));
                                }
                            }
                            Property::ReceiveMaximum(val) => {
                                // NOTE(review): asserts on a peer-supplied value;
                                // presumably the parser already rejects 0 - confirm.
                                assert!(val.val() != 0);
                                self.publish_send_max = Some(val.val());
                            }
                            Property::MaximumPacketSize(val) => {
                                // NOTE(review): same assumption as ReceiveMaximum above.
                                assert!(val.val() != 0);
                                self.maximum_packet_size_send = val.val();
                            }
                            Property::ServerKeepAlive(val) => {
                                // The property value is multiplied by 1000 to get
                                // the PINGREQ send interval in milliseconds.
                                let timeout_ms = (val.val() as u64) * 1000;
                                self.pingreq_send_interval_ms = Some(timeout_ms);
                            }
                            _ => {
                                // Other properties do not affect connection state here.
                            }
                        }
                    }

                    if packet.session_present() {
                        events.extend(self.send_stored());
                    } else {
                        self.clear_store_related();
                    }
                }
                events.push(GenericEvent::NotifyPacketReceived(
                    GenericPacket::V5_0Connack(packet),
                ));
            }
            Err(e) => {
                // process_send_v5_0_disconnect only accepts a DISCONNECT while
                // Connected, so it is attempted in that state only.
                if self.status == ConnectionStatus::Connected {
                    let disconnect = v5_0::Disconnect::builder()
                        .reason_code(e.into())
                        .build()
                        .unwrap();
                    let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
                    events.extend(disconnect_events);
                }
                events.push(GenericEvent::NotifyError(e));
            }
        }

        events
    }
2655
2656 fn process_recv_v3_1_1_publish(
2657 &mut self,
2658 raw_packet: RawPacket,
2659 ) -> Vec<GenericEvent<PacketIdType>> {
2660 let mut events = Vec::new();
2661
2662 let flags = raw_packet.flags();
2663 match &raw_packet.data {
2664 PacketData::Publish(arc) => {
2665 match v3_1_1::GenericPublish::parse(flags, arc.clone()) {
2666 Ok((packet, _consumed)) => {
2667 match packet.qos() {
2668 Qos::AtMostOnce => {
2669 events.extend(self.refresh_pingreq_recv());
2670 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2671 }
2672 Qos::AtLeastOnce => {
2673 let packet_id = packet.packet_id().unwrap();
2674 if self.status == ConnectionStatus::Connected
2675 && self.auto_pub_response
2676 {
2677 let puback = v3_1_1::GenericPuback::builder()
2679 .packet_id(packet_id)
2680 .build()
2681 .unwrap();
2682 events.extend(self.process_send_v3_1_1_puback(puback));
2683 }
2684 events.extend(self.refresh_pingreq_recv());
2685 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2686 }
2687 Qos::ExactlyOnce => {
2688 let packet_id = packet.packet_id().unwrap();
2689 let already_handled = !self.qos2_publish_handled.insert(packet_id);
2690
2691 if self.status == ConnectionStatus::Connected
2692 && (self.auto_pub_response || already_handled)
2693 {
2694 let pubrec = v3_1_1::GenericPubrec::builder()
2695 .packet_id(packet_id)
2696 .build()
2697 .unwrap();
2698 events.extend(self.process_send_v3_1_1_pubrec(pubrec));
2699 }
2700 events.extend(self.refresh_pingreq_recv());
2701 if !already_handled {
2702 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2703 }
2704 }
2705 }
2706 }
2707 Err(e) => {
2708 events.push(GenericEvent::RequestClose);
2709 events.push(GenericEvent::NotifyError(e));
2710 }
2711 }
2712 }
2713 PacketData::Normal(_) => {
2714 unreachable!("PUBLISH packet must use PacketData::Publish variant");
2715 }
2716 }
2717
2718 events
2719 }
2720
    /// Handles a received MQTT v5.0 PUBLISH packet.
    ///
    /// Processing happens in this order:
    /// 1. Parse the packet. On failure a DISCONNECT built from the error is
    ///    sent (only while `Connected`) and the error is reported.
    /// 2. For QoS 1/2, compare the size of `publish_recv` against
    ///    `publish_recv_max`; on overflow send DISCONNECT
    ///    (`ReceiveMaximumExceeded`), report the error and return. Otherwise
    ///    record the packet id and decide whether a PUBACK/PUBREC is due.
    /// 3. Validate the topic alias (see inline comments); any violation sends
    ///    DISCONNECT (`TopicAliasInvalid`), reports the error and returns.
    /// 4. Send the pending PUBACK/PUBREC, call `refresh_pingreq_recv`, and
    ///    report the packet unless it was a QoS 2 packet whose id was already
    ///    in `qos2_publish_handled`.
    ///
    /// # Panics
    ///
    /// Panics if `raw_packet` does not hold `PacketData::Publish`, or if a
    /// QoS 1/2 packet carries no packet id.
    fn process_recv_v5_0_publish(
        &mut self,
        raw_packet: RawPacket,
    ) -> Vec<GenericEvent<PacketIdType>> {
        let mut events = Vec::new();

        let flags = raw_packet.flags();
        match &raw_packet.data {
            PacketData::Publish(arc) => {
                match v5_0::GenericPublish::parse(flags, arc.clone()) {
                    Ok((packet, _consumed)) => {
                        // Responses are only decided here; they are sent after
                        // the topic alias checks below have passed.
                        let mut already_handled = false;
                        let mut puback_send = false;
                        let mut pubrec_send = false;

                        match packet.qos() {
                            Qos::AtLeastOnce => {
                                let packet_id = packet.packet_id().unwrap();
                                // Enforce the receive maximum, if one is set.
                                if let Some(max) = self.publish_recv_max {
                                    if self.publish_recv.len() >= max as usize {
                                        let disconnect = v5_0::Disconnect::builder()
                                            .reason_code(
                                                DisconnectReasonCode::ReceiveMaximumExceeded,
                                            )
                                            .build()
                                            .unwrap();
                                        events
                                            .extend(self.process_send_v5_0_disconnect(disconnect));
                                        events.push(GenericEvent::NotifyError(
                                            MqttError::ReceiveMaximumExceeded,
                                        ));
                                        return events;
                                    }
                                }
                                self.publish_recv.insert(packet_id);
                                if self.auto_pub_response
                                    && self.status == ConnectionStatus::Connected
                                {
                                    puback_send = true;
                                }
                            }
                            Qos::ExactlyOnce => {
                                let packet_id = packet.packet_id().unwrap();
                                // Enforce the receive maximum, if one is set.
                                if let Some(max) = self.publish_recv_max {
                                    if self.publish_recv.len() >= max as usize {
                                        let disconnect = v5_0::Disconnect::builder()
                                            .reason_code(
                                                DisconnectReasonCode::ReceiveMaximumExceeded,
                                            )
                                            .build()
                                            .unwrap();
                                        events
                                            .extend(self.process_send_v5_0_disconnect(disconnect));
                                        events.push(GenericEvent::NotifyError(
                                            MqttError::ReceiveMaximumExceeded,
                                        ));
                                        return events;
                                    }
                                }
                                self.publish_recv.insert(packet_id);

                                // `insert` returns false when the id was already
                                // present, i.e. this PUBLISH was seen before.
                                if !self.qos2_publish_handled.insert(packet_id) {
                                    already_handled = true;
                                }
                                // A repeated packet is answered with PUBREC even
                                // when automatic responses are disabled.
                                if self.status == ConnectionStatus::Connected
                                    && (self.auto_pub_response || already_handled)
                                {
                                    pubrec_send = true;
                                }
                            }
                            Qos::AtMostOnce => {
                                // Nothing to track or acknowledge for QoS 0.
                            }
                        }

                        if packet.topic_name().is_empty() {
                            // Empty topic name: a topic alias property is
                            // mandatory and must refer to a registered alias.
                            if let Some(ta) = Self::get_topic_alias_from_props_opt(packet.props()) {
                                // Reject alias 0, aliases when no receive-side
                                // alias table exists, and aliases above its max.
                                if ta == 0
                                    || self.topic_alias_recv.is_none()
                                    || ta > self.topic_alias_recv.as_ref().unwrap().max()
                                {
                                    let disconnect = v5_0::Disconnect::builder()
                                        .reason_code(DisconnectReasonCode::TopicAliasInvalid)
                                        .build()
                                        .unwrap();
                                    events.extend(self.process_send_v5_0_disconnect(disconnect));
                                    events.push(GenericEvent::NotifyError(
                                        MqttError::TopicAliasInvalid,
                                    ));
                                    return events;
                                }

                                // The alias must already be registered; the
                                // looked-up value itself is not used here.
                                if let Some(ref topic_alias_recv) = self.topic_alias_recv {
                                    if topic_alias_recv.get(ta).is_none() {
                                        let disconnect = v5_0::Disconnect::builder()
                                            .reason_code(DisconnectReasonCode::TopicAliasInvalid)
                                            .build()
                                            .unwrap();
                                        events
                                            .extend(self.process_send_v5_0_disconnect(disconnect));
                                        events.push(GenericEvent::NotifyError(
                                            MqttError::TopicAliasInvalid,
                                        ));
                                        return events;
                                    }
                                }
                            } else {
                                // Neither a topic name nor a topic alias.
                                let disconnect = v5_0::Disconnect::builder()
                                    .reason_code(DisconnectReasonCode::TopicAliasInvalid)
                                    .build()
                                    .unwrap();
                                events.extend(self.process_send_v5_0_disconnect(disconnect));
                                events
                                    .push(GenericEvent::NotifyError(MqttError::TopicAliasInvalid));
                                return events;
                            }
                        } else {
                            // Topic name present: an accompanying alias is
                            // validated and then registered or updated.
                            if let Some(ta) = Self::get_topic_alias_from_props_opt(packet.props()) {
                                if ta == 0
                                    || self.topic_alias_recv.is_none()
                                    || ta > self.topic_alias_recv.as_ref().unwrap().max()
                                {
                                    let disconnect = v5_0::Disconnect::builder()
                                        .reason_code(DisconnectReasonCode::TopicAliasInvalid)
                                        .build()
                                        .unwrap();
                                    events.extend(self.process_send_v5_0_disconnect(disconnect));
                                    events.push(GenericEvent::NotifyError(
                                        MqttError::TopicAliasInvalid,
                                    ));
                                    return events;
                                }
                                if let Some(ref mut topic_alias_recv) = self.topic_alias_recv {
                                    topic_alias_recv.insert_or_update(packet.topic_name(), ta);
                                }
                            }
                        }

                        // All checks passed: emit the response decided above.
                        if puback_send {
                            let puback = v5_0::GenericPuback::builder()
                                .packet_id(packet.packet_id().unwrap())
                                .build()
                                .unwrap();
                            events.extend(self.process_send_v5_0_puback(puback));
                        }
                        if pubrec_send {
                            let pubrec = v5_0::GenericPubrec::builder()
                                .packet_id(packet.packet_id().unwrap())
                                .build()
                                .unwrap();
                            events.extend(self.process_send_v5_0_pubrec(pubrec));
                        }

                        events.extend(self.refresh_pingreq_recv());

                        // A repeated QoS 2 PUBLISH is not reported again.
                        if !already_handled {
                            events.push(GenericEvent::NotifyPacketReceived(packet.into()));
                        }
                    }
                    Err(e) => {
                        // Only a connected peer is sent a DISCONNECT.
                        if self.status == ConnectionStatus::Connected {
                            let disconnect = v5_0::Disconnect::builder()
                                .reason_code(e.into())
                                .build()
                                .unwrap();
                            let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
                            events.extend(disconnect_events);
                        }
                        events.push(GenericEvent::NotifyError(e));
                    }
                }
            }
            PacketData::Normal(_) => {
                unreachable!("PUBLISH packet must use PacketData::Publish variant");
            }
        }

        events
    }
2908
2909 fn process_recv_v3_1_1_puback(
2910 &mut self,
2911 raw_packet: RawPacket,
2912 ) -> Vec<GenericEvent<PacketIdType>> {
2913 let mut events = Vec::new();
2914
2915 match v3_1_1::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2916 Ok((packet, _)) => {
2917 let packet_id = packet.packet_id();
2918 if self.pid_puback.remove(&packet_id) {
2919 self.store.erase(ResponsePacket::V3_1_1Puback, packet_id);
2920 if self.pid_man.is_used_id(packet_id) {
2921 self.pid_man.release_id(packet_id);
2922 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2923 }
2924 events.extend(self.refresh_pingreq_recv());
2925 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2926 } else {
2927 events.push(GenericEvent::RequestClose);
2928 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2929 }
2930 }
2931 Err(e) => {
2932 events.push(GenericEvent::RequestClose);
2933 events.push(GenericEvent::NotifyError(e));
2934 }
2935 }
2936
2937 events
2938 }
2939
2940 fn process_recv_v5_0_puback(
2941 &mut self,
2942 raw_packet: RawPacket,
2943 ) -> Vec<GenericEvent<PacketIdType>> {
2944 let mut events = Vec::new();
2945
2946 match v5_0::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2947 Ok((packet, _)) => {
2948 let packet_id = packet.packet_id();
2949 if self.pid_puback.remove(&packet_id) {
2950 self.store.erase(ResponsePacket::V5_0Puback, packet_id);
2951 if self.pid_man.is_used_id(packet_id) {
2952 self.pid_man.release_id(packet_id);
2953 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2954 }
2955 if self.publish_send_max.is_some() {
2956 self.publish_send_count -= 1;
2957 }
2958 events.extend(self.refresh_pingreq_recv());
2959 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2960 } else {
2961 let disconnect = v5_0::Disconnect::builder()
2962 .reason_code(DisconnectReasonCode::ProtocolError)
2963 .build()
2964 .unwrap();
2965 events.extend(self.process_send_v5_0_disconnect(disconnect));
2966 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2967 }
2968 }
2969 Err(e) => {
2970 let disconnect = v5_0::Disconnect::builder()
2971 .reason_code(DisconnectReasonCode::ProtocolError)
2972 .build()
2973 .unwrap();
2974 events.extend(self.process_send_v5_0_disconnect(disconnect));
2975 events.push(GenericEvent::NotifyError(e));
2976 }
2977 }
2978
2979 events
2980 }
2981
2982 fn process_recv_v3_1_1_pubrec(
2983 &mut self,
2984 raw_packet: RawPacket,
2985 ) -> Vec<GenericEvent<PacketIdType>> {
2986 let mut events = Vec::new();
2987
2988 match v3_1_1::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2989 Ok((packet, _)) => {
2990 let packet_id = packet.packet_id();
2991 if self.pid_pubrec.remove(&packet_id) {
2992 self.store.erase(ResponsePacket::V3_1_1Pubrec, packet_id);
2993 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
2994 let pubrel = v3_1_1::GenericPubrel::<PacketIdType>::builder()
2995 .packet_id(packet_id)
2996 .build()
2997 .unwrap();
2998 events.extend(self.process_send_v3_1_1_pubrel(pubrel));
2999 }
3000 events.extend(self.refresh_pingreq_recv());
3001 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3002 } else {
3003 events.push(GenericEvent::RequestClose);
3004 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3005 }
3006 }
3007 Err(e) => {
3008 events.push(GenericEvent::RequestClose);
3009 events.push(GenericEvent::NotifyError(e));
3010 }
3011 }
3012
3013 events
3014 }
3015
3016 fn process_recv_v5_0_pubrec(
3017 &mut self,
3018 raw_packet: RawPacket,
3019 ) -> Vec<GenericEvent<PacketIdType>> {
3020 let mut events = Vec::new();
3021
3022 match v5_0::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3023 Ok((packet, _)) => {
3024 let packet_id = packet.packet_id();
3025 if self.pid_pubrec.remove(&packet_id) {
3026 self.store.erase(ResponsePacket::V5_0Pubrec, packet_id);
3027 if let Some(reason_code) = packet.reason_code() {
3028 if reason_code != PubrecReasonCode::Success {
3029 if self.pid_man.is_used_id(packet_id) {
3030 self.pid_man.release_id(packet_id);
3031 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3032 }
3033 self.qos2_publish_processing.remove(&packet_id);
3034 if self.publish_send_max.is_some() {
3035 self.publish_send_count -= 1;
3036 }
3037 } else if self.auto_pub_response
3038 && self.status == ConnectionStatus::Connected
3039 {
3040 let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3041 .packet_id(packet_id)
3042 .build()
3043 .unwrap();
3044 events.extend(self.process_send_v5_0_pubrel(pubrel));
3045 }
3046 } else if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3047 let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3048 .packet_id(packet_id)
3049 .build()
3050 .unwrap();
3051 events.extend(self.process_send_v5_0_pubrel(pubrel));
3052 }
3053 events.extend(self.refresh_pingreq_recv());
3054 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3055 } else {
3056 let disconnect = v5_0::Disconnect::builder()
3057 .reason_code(DisconnectReasonCode::ProtocolError)
3058 .build()
3059 .unwrap();
3060 events.extend(self.process_send_v5_0_disconnect(disconnect));
3061 events.push(GenericEvent::NotifyError(MqttError::from(
3062 DisconnectReasonCode::ProtocolError,
3063 )));
3064 }
3065 }
3066 Err(e) => {
3067 let disconnect = v5_0::Disconnect::builder()
3068 .reason_code(DisconnectReasonCode::ProtocolError)
3069 .build()
3070 .unwrap();
3071 events.extend(self.process_send_v5_0_disconnect(disconnect));
3072 events.push(GenericEvent::NotifyError(e));
3073 }
3074 }
3075
3076 events
3077 }
3078
3079 fn process_recv_v3_1_1_pubrel(
3080 &mut self,
3081 raw_packet: RawPacket,
3082 ) -> Vec<GenericEvent<PacketIdType>> {
3083 let mut events = Vec::new();
3084
3085 match v3_1_1::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3086 Ok((packet, _)) => {
3087 let packet_id = packet.packet_id();
3088 self.qos2_publish_handled.remove(&packet_id);
3089 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3090 let pubcomp = v3_1_1::GenericPubcomp::<PacketIdType>::builder()
3091 .packet_id(packet_id)
3092 .build()
3093 .unwrap();
3094 events.extend(self.process_send_v3_1_1_pubcomp(pubcomp));
3095 }
3096 events.extend(self.refresh_pingreq_recv());
3097 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3098 }
3099 Err(e) => {
3100 events.push(GenericEvent::RequestClose);
3101 events.push(GenericEvent::NotifyError(e));
3102 }
3103 }
3104
3105 events
3106 }
3107
3108 fn process_recv_v5_0_pubrel(
3109 &mut self,
3110 raw_packet: RawPacket,
3111 ) -> Vec<GenericEvent<PacketIdType>> {
3112 let mut events = Vec::new();
3113
3114 match v5_0::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3115 Ok((packet, _)) => {
3116 let packet_id = packet.packet_id();
3117 self.qos2_publish_handled.remove(&packet_id);
3118 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3119 let pubcomp = v5_0::GenericPubcomp::<PacketIdType>::builder()
3120 .packet_id(packet_id)
3121 .build()
3122 .unwrap();
3123 events.extend(self.process_send_v5_0_pubcomp(pubcomp));
3124 }
3125 events.extend(self.refresh_pingreq_recv());
3126 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3127 }
3128 Err(e) => {
3129 let disconnect = v5_0::Disconnect::builder()
3130 .reason_code(DisconnectReasonCode::ProtocolError)
3131 .build()
3132 .unwrap();
3133 events.extend(self.process_send_v5_0_disconnect(disconnect));
3134 events.push(GenericEvent::NotifyError(e));
3135 }
3136 }
3137
3138 events
3139 }
3140
3141 fn process_recv_v3_1_1_pubcomp(
3142 &mut self,
3143 raw_packet: RawPacket,
3144 ) -> Vec<GenericEvent<PacketIdType>> {
3145 let mut events = Vec::new();
3146
3147 match v3_1_1::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3148 Ok((packet, _)) => {
3149 let packet_id = packet.packet_id();
3150 if self.pid_pubcomp.remove(&packet_id) {
3151 self.store.erase(ResponsePacket::V3_1_1Pubcomp, packet_id);
3152 if self.pid_man.is_used_id(packet_id) {
3153 self.pid_man.release_id(packet_id);
3154 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3155 }
3156 self.qos2_publish_processing.remove(&packet_id);
3157 events.extend(self.refresh_pingreq_recv());
3158 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3159 } else {
3160 events.push(GenericEvent::RequestClose);
3161 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3162 }
3163 }
3164 Err(e) => {
3165 events.push(GenericEvent::RequestClose);
3166 events.push(GenericEvent::NotifyError(e));
3167 }
3168 }
3169
3170 events
3171 }
3172
3173 fn process_recv_v5_0_pubcomp(
3174 &mut self,
3175 raw_packet: RawPacket,
3176 ) -> Vec<GenericEvent<PacketIdType>> {
3177 let mut events = Vec::new();
3178
3179 match v5_0::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3180 Ok((packet, _)) => {
3181 let packet_id = packet.packet_id();
3182 if self.pid_pubcomp.remove(&packet_id) {
3183 self.store.erase(ResponsePacket::V5_0Pubcomp, packet_id);
3184 if self.pid_man.is_used_id(packet_id) {
3185 self.pid_man.release_id(packet_id);
3186 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3187 }
3188 self.qos2_publish_processing.remove(&packet_id);
3189 if self.publish_send_max.is_some() {
3190 self.publish_send_count -= 1;
3191 }
3192 events.extend(self.refresh_pingreq_recv());
3193 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3194 } else {
3195 let disconnect = v5_0::Disconnect::builder()
3196 .reason_code(DisconnectReasonCode::ProtocolError)
3197 .build()
3198 .unwrap();
3199 events.extend(self.process_send_v5_0_disconnect(disconnect));
3200 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3201 }
3202 }
3203 Err(e) => {
3204 let disconnect = v5_0::Disconnect::builder()
3205 .reason_code(DisconnectReasonCode::ProtocolError)
3206 .build()
3207 .unwrap();
3208 events.extend(self.process_send_v5_0_disconnect(disconnect));
3209 events.push(GenericEvent::NotifyError(e));
3210 }
3211 }
3212
3213 events
3214 }
3215
3216 fn process_recv_v3_1_1_subscribe(
3217 &mut self,
3218 raw_packet: RawPacket,
3219 ) -> Vec<GenericEvent<PacketIdType>> {
3220 let mut events = Vec::new();
3221
3222 match v3_1_1::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3223 Ok((packet, _)) => {
3224 events.extend(self.refresh_pingreq_recv());
3225 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3226 }
3227 Err(e) => {
3228 events.push(GenericEvent::RequestClose);
3229 events.push(GenericEvent::NotifyError(e));
3230 }
3231 }
3232
3233 events
3234 }
3235
3236 fn process_recv_v5_0_subscribe(
3237 &mut self,
3238 raw_packet: RawPacket,
3239 ) -> Vec<GenericEvent<PacketIdType>> {
3240 let mut events = Vec::new();
3241
3242 match v5_0::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3243 Ok((packet, _)) => {
3244 events.extend(self.refresh_pingreq_recv());
3245 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3246 }
3247 Err(e) => {
3248 let disconnect = v5_0::Disconnect::builder()
3249 .reason_code(DisconnectReasonCode::ProtocolError)
3250 .build()
3251 .unwrap();
3252 events.extend(self.process_send_v5_0_disconnect(disconnect));
3253 events.push(GenericEvent::NotifyError(e));
3254 }
3255 }
3256
3257 events
3258 }
3259
3260 fn process_recv_v3_1_1_suback(
3261 &mut self,
3262 raw_packet: RawPacket,
3263 ) -> Vec<GenericEvent<PacketIdType>> {
3264 let mut events = Vec::new();
3265
3266 match v3_1_1::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3267 Ok((packet, _)) => {
3268 let packet_id = packet.packet_id();
3269 if self.pid_suback.remove(&packet_id) {
3270 if self.pid_man.is_used_id(packet_id) {
3271 self.pid_man.release_id(packet_id);
3272 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3273 }
3274 events.extend(self.refresh_pingreq_recv());
3275 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3276 } else {
3277 events.push(GenericEvent::RequestClose);
3278 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3279 }
3280 }
3281 Err(e) => {
3282 events.push(GenericEvent::RequestClose);
3283 events.push(GenericEvent::NotifyError(e));
3284 }
3285 }
3286
3287 events
3288 }
3289
3290 fn process_recv_v5_0_suback(
3291 &mut self,
3292 raw_packet: RawPacket,
3293 ) -> Vec<GenericEvent<PacketIdType>> {
3294 let mut events = Vec::new();
3295
3296 match v5_0::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3297 Ok((packet, _)) => {
3298 let packet_id = packet.packet_id();
3299 if self.pid_suback.remove(&packet_id) {
3300 if self.pid_man.is_used_id(packet_id) {
3301 self.pid_man.release_id(packet_id);
3302 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3303 }
3304 events.extend(self.refresh_pingreq_recv());
3305 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3306 } else {
3307 let disconnect = v5_0::Disconnect::builder()
3308 .reason_code(DisconnectReasonCode::ProtocolError)
3309 .build()
3310 .unwrap();
3311 events.extend(self.process_send_v5_0_disconnect(disconnect));
3312 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3313 }
3314 }
3315 Err(e) => {
3316 let disconnect = v5_0::Disconnect::builder()
3317 .reason_code(DisconnectReasonCode::ProtocolError)
3318 .build()
3319 .unwrap();
3320 events.extend(self.process_send_v5_0_disconnect(disconnect));
3321 events.push(GenericEvent::NotifyError(e));
3322 }
3323 }
3324
3325 events
3326 }
3327
3328 fn process_recv_v3_1_1_unsubscribe(
3329 &mut self,
3330 raw_packet: RawPacket,
3331 ) -> Vec<GenericEvent<PacketIdType>> {
3332 let mut events = Vec::new();
3333
3334 match v3_1_1::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3335 Ok((packet, _)) => {
3336 events.extend(self.refresh_pingreq_recv());
3337 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3338 }
3339 Err(e) => {
3340 events.push(GenericEvent::RequestClose);
3341 events.push(GenericEvent::NotifyError(e));
3342 }
3343 }
3344
3345 events
3346 }
3347
3348 fn process_recv_v5_0_unsubscribe(
3349 &mut self,
3350 raw_packet: RawPacket,
3351 ) -> Vec<GenericEvent<PacketIdType>> {
3352 let mut events = Vec::new();
3353
3354 match v5_0::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3355 Ok((packet, _)) => {
3356 events.extend(self.refresh_pingreq_recv());
3357 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3358 }
3359 Err(e) => {
3360 let disconnect = v5_0::Disconnect::builder()
3361 .reason_code(DisconnectReasonCode::ProtocolError)
3362 .build()
3363 .unwrap();
3364 events.extend(self.process_send_v5_0_disconnect(disconnect));
3365 events.push(GenericEvent::NotifyError(e));
3366 }
3367 }
3368
3369 events
3370 }
3371
3372 fn process_recv_v3_1_1_unsuback(
3373 &mut self,
3374 raw_packet: RawPacket,
3375 ) -> Vec<GenericEvent<PacketIdType>> {
3376 let mut events = Vec::new();
3377
3378 match v3_1_1::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3379 Ok((packet, _)) => {
3380 let packet_id = packet.packet_id();
3381 if self.pid_unsuback.remove(&packet_id) {
3382 if self.pid_man.is_used_id(packet_id) {
3383 self.pid_man.release_id(packet_id);
3384 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3385 }
3386 events.extend(self.refresh_pingreq_recv());
3387 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3388 } else {
3389 events.push(GenericEvent::RequestClose);
3390 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3391 }
3392 }
3393 Err(e) => {
3394 events.push(GenericEvent::RequestClose);
3395 events.push(GenericEvent::NotifyError(e));
3396 }
3397 }
3398
3399 events
3400 }
3401
3402 fn process_recv_v5_0_unsuback(
3403 &mut self,
3404 raw_packet: RawPacket,
3405 ) -> Vec<GenericEvent<PacketIdType>> {
3406 let mut events = Vec::new();
3407
3408 match v5_0::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3409 Ok((packet, _)) => {
3410 let packet_id = packet.packet_id();
3411 if self.pid_unsuback.remove(&packet_id) {
3412 if self.pid_man.is_used_id(packet_id) {
3413 self.pid_man.release_id(packet_id);
3414 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3415 }
3416 events.extend(self.refresh_pingreq_recv());
3417 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3418 } else {
3419 let disconnect = v5_0::Disconnect::builder()
3420 .reason_code(DisconnectReasonCode::ProtocolError)
3421 .build()
3422 .unwrap();
3423 events.extend(self.process_send_v5_0_disconnect(disconnect));
3424 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3425 }
3426 }
3427 Err(e) => {
3428 let disconnect = v5_0::Disconnect::builder()
3429 .reason_code(DisconnectReasonCode::ProtocolError)
3430 .build()
3431 .unwrap();
3432 events.extend(self.process_send_v5_0_disconnect(disconnect));
3433 events.push(GenericEvent::NotifyError(e));
3434 }
3435 }
3436
3437 events
3438 }
3439
3440 fn process_recv_v3_1_1_pingreq(
3441 &mut self,
3442 raw_packet: RawPacket,
3443 ) -> Vec<GenericEvent<PacketIdType>> {
3444 let mut events = Vec::new();
3445
3446 match v3_1_1::Pingreq::parse(raw_packet.data_as_slice()) {
3447 Ok((packet, _)) => {
3448 if (Role::IS_SERVER || Role::IS_ANY) && !self.is_client {
3449 if self.auto_ping_response && self.status == ConnectionStatus::Connected {
3450 let pingresp = v3_1_1::Pingresp::new();
3451 events.extend(self.process_send_v3_1_1_pingresp(pingresp));
3452 }
3453 }
3454 events.extend(self.refresh_pingreq_recv());
3455 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3456 }
3457 Err(e) => {
3458 events.push(GenericEvent::RequestClose);
3459 events.push(GenericEvent::NotifyError(e));
3460 }
3461 }
3462
3463 events
3464 }
3465
3466 fn process_recv_v5_0_pingreq(
3467 &mut self,
3468 raw_packet: RawPacket,
3469 ) -> Vec<GenericEvent<PacketIdType>> {
3470 let mut events = Vec::new();
3471
3472 match v5_0::Pingreq::parse(raw_packet.data_as_slice()) {
3473 Ok((packet, _)) => {
3474 if (Role::IS_SERVER || Role::IS_ANY) && !self.is_client {
3475 if self.auto_ping_response && self.status == ConnectionStatus::Connected {
3476 let pingresp = v5_0::Pingresp::new();
3477 events.extend(self.process_send_v5_0_pingresp(pingresp));
3478 }
3479 }
3480 events.extend(self.refresh_pingreq_recv());
3481 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3482 }
3483 Err(e) => {
3484 let disconnect = v5_0::Disconnect::builder()
3485 .reason_code(DisconnectReasonCode::ProtocolError)
3486 .build()
3487 .unwrap();
3488 events.extend(self.process_send_v5_0_disconnect(disconnect));
3489 events.push(GenericEvent::NotifyError(e));
3490 }
3491 }
3492
3493 events
3494 }
3495
3496 fn process_recv_v3_1_1_pingresp(
3497 &mut self,
3498 raw_packet: RawPacket,
3499 ) -> Vec<GenericEvent<PacketIdType>> {
3500 let mut events = Vec::new();
3501
3502 match v3_1_1::Pingresp::parse(raw_packet.data_as_slice()) {
3503 Ok((packet, _)) => {
3504 self.pingresp_recv_set = false;
3505 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3506 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3507 }
3508 Err(e) => {
3509 events.push(GenericEvent::RequestClose);
3510 events.push(GenericEvent::NotifyError(e));
3511 }
3512 }
3513
3514 events
3515 }
3516
3517 fn process_recv_v5_0_pingresp(
3518 &mut self,
3519 raw_packet: RawPacket,
3520 ) -> Vec<GenericEvent<PacketIdType>> {
3521 let mut events = Vec::new();
3522
3523 match v5_0::Pingresp::parse(raw_packet.data_as_slice()) {
3524 Ok((packet, _)) => {
3525 self.pingresp_recv_set = false;
3526 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3527 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3528 }
3529 Err(e) => {
3530 let disconnect = v5_0::Disconnect::builder()
3531 .reason_code(DisconnectReasonCode::ProtocolError)
3532 .build()
3533 .unwrap();
3534 events.extend(self.process_send_v5_0_disconnect(disconnect));
3535 events.push(GenericEvent::NotifyError(e));
3536 }
3537 }
3538
3539 events
3540 }
3541
3542 fn process_recv_v3_1_1_disconnect(
3543 &mut self,
3544 raw_packet: RawPacket,
3545 ) -> Vec<GenericEvent<PacketIdType>> {
3546 let mut events = Vec::new();
3547
3548 match v3_1_1::Disconnect::parse(raw_packet.data_as_slice()) {
3549 Ok((packet, _)) => {
3550 self.cancel_timers(&mut events);
3551 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3552 }
3553 Err(e) => {
3554 events.push(GenericEvent::RequestClose);
3555 events.push(GenericEvent::NotifyError(e));
3556 }
3557 }
3558
3559 events
3560 }
3561
3562 fn process_recv_v5_0_disconnect(
3563 &mut self,
3564 raw_packet: RawPacket,
3565 ) -> Vec<GenericEvent<PacketIdType>> {
3566 let mut events = Vec::new();
3567
3568 match v5_0::Disconnect::parse(raw_packet.data_as_slice()) {
3569 Ok((packet, _)) => {
3570 self.cancel_timers(&mut events);
3571 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3572 }
3573 Err(e) => {
3574 let disconnect = v5_0::Disconnect::builder()
3575 .reason_code(DisconnectReasonCode::ProtocolError)
3576 .build()
3577 .unwrap();
3578 events.extend(self.process_send_v5_0_disconnect(disconnect));
3579 events.push(GenericEvent::NotifyError(e));
3580 }
3581 }
3582
3583 events
3584 }
3585
3586 fn process_recv_v5_0_auth(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
3587 let mut events = Vec::new();
3588
3589 match v5_0::Auth::parse(raw_packet.data_as_slice()) {
3590 Ok((packet, _)) => {
3591 events.extend(self.refresh_pingreq_recv());
3592 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3593 }
3594 Err(e) => {
3595 let disconnect = v5_0::Disconnect::builder()
3596 .reason_code(DisconnectReasonCode::ProtocolError)
3597 .build()
3598 .unwrap();
3599 events.extend(self.process_send_v5_0_disconnect(disconnect));
3600 events.push(GenericEvent::NotifyError(e));
3601 }
3602 }
3603
3604 events
3605 }
3606
3607 fn get_topic_alias_from_props_opt(props: &Option<Vec<Property>>) -> Option<u16> {
3608 if let Some(props) = props {
3609 Self::get_topic_alias_from_props(props.as_slice())
3610 } else {
3611 None
3612 }
3613 }
3614
3615 fn refresh_pingreq_recv(&mut self) -> Vec<GenericEvent<PacketIdType>> {
3616 let mut events = Vec::new();
3617 if let Some(timeout_ms) = self.pingreq_recv_timeout_ms {
3618 if self.status == ConnectionStatus::Connecting
3619 || self.status == ConnectionStatus::Connected
3620 {
3621 self.pingreq_recv_set = true;
3622 events.push(GenericEvent::RequestTimerReset {
3623 kind: TimerKind::PingreqRecv,
3624 duration_ms: timeout_ms,
3625 });
3626 } else {
3627 self.pingreq_recv_set = false;
3628 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3629 }
3630 }
3631
3632 events
3633 }
3634
3635 fn cancel_timers(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
3637 if self.pingreq_send_set {
3638 self.pingreq_send_set = false;
3639 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
3640 }
3641 if self.pingreq_recv_set {
3642 self.pingreq_recv_set = false;
3643 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3644 }
3645 if self.pingresp_recv_set {
3646 self.pingresp_recv_set = false;
3647 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3648 }
3649 }
3650
3651 fn get_topic_alias_from_props(props: &[Property]) -> Option<u16> {
3653 for prop in props {
3654 if let Property::TopicAlias(ta) = prop {
3655 return Some(ta.val());
3656 }
3657 }
3658 None
3659 }
3660
3661 #[allow(dead_code)]
3662 fn is_packet_id_used(&self, packet_id: PacketIdType) -> bool {
3663 self.pid_man.is_used_id(packet_id)
3664 }
3665}
3666
3667pub trait RecvBehavior<Role, PacketIdType>
3670where
3671 PacketIdType: IsPacketId,
3672{
3673 fn recv(&mut self, data: &mut std::io::Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>>;
3674}
3675
/// `RecvBehavior` for client-role connections.
impl<PacketIdType> RecvBehavior<role::Client, PacketIdType>
    for GenericConnection<role::Client, PacketIdType>
where
    PacketIdType: IsPacketId,
{
    /// Forwards to `self.recv(data)`.
    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
        // NOTE(review): presumably this resolves to an inherent
        // `GenericConnection::recv` defined outside this view; without one the
        // call would recurse into this trait method. Keep the method-call
        // form as is.
        self.recv(data)
    }
}
3686
/// `RecvBehavior` for server-role connections.
impl<PacketIdType> RecvBehavior<role::Server, PacketIdType>
    for GenericConnection<role::Server, PacketIdType>
where
    PacketIdType: IsPacketId,
{
    /// Forwards to `self.recv(data)`.
    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
        // NOTE(review): presumably this resolves to an inherent
        // `GenericConnection::recv` defined outside this view; without one the
        // call would recurse into this trait method. Keep the method-call
        // form as is.
        self.recv(data)
    }
}
3696
/// `RecvBehavior` for connections that may act in any role.
impl<PacketIdType> RecvBehavior<role::Any, PacketIdType>
    for GenericConnection<role::Any, PacketIdType>
where
    PacketIdType: IsPacketId,
{
    /// Forwards to `self.recv(data)`.
    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
        // NOTE(review): presumably this resolves to an inherent
        // `GenericConnection::recv` defined outside this view; without one the
        // call would recurse into this trait method. Keep the method-call
        // form as is.
        self.recv(data)
    }
}
3706
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mqtt::packet::TopicAliasSend;
    use crate::mqtt::role;
    use crate::mqtt::version::Version;

    /// Connection type exercised by most of the tests below.
    type ClientConnection = GenericConnection<role::Client, u16>;

    /// Builds a fresh client-role connection for protocol version 5.0.
    fn new_v5_client() -> ClientConnection {
        ClientConnection::new(Version::V5_0)
    }

    /// Builds a v5.0 client connection whose `topic_alias_send` is `aliases`.
    fn new_v5_client_with_aliases(aliases: TopicAliasSend) -> ClientConnection {
        let mut connection = new_v5_client();
        connection.topic_alias_send = Some(aliases);
        connection
    }

    #[test]
    fn test_initialize_client_mode() {
        let mut client = new_v5_client();

        client.initialize(true);

        // Client flag is set and the publish bookkeeping starts out empty.
        assert!(client.is_client);
        assert_eq!(client.publish_send_count, 0);
        assert!(client.publish_send_max.is_none());
        assert!(client.publish_recv_max.is_none());
        assert!(!client.need_store);
    }

    #[test]
    fn test_initialize_server_mode() {
        let mut server = GenericConnection::<role::Server, u32>::new(Version::V3_1_1);

        server.initialize(false);

        // Client flag is cleared and the publish bookkeeping starts out empty.
        assert!(!server.is_client);
        assert_eq!(server.publish_send_count, 0);
        assert!(server.publish_send_max.is_none());
        assert!(server.publish_recv_max.is_none());
        assert!(!server.need_store);
    }

    #[test]
    fn test_validate_topic_alias_no_topic_alias_send() {
        let mut connection = new_v5_client();

        // Without a `topic_alias_send` there is nothing to resolve against.
        assert!(connection.validate_topic_alias(Some(1)).is_none());
    }

    #[test]
    fn test_validate_topic_alias_none_input() {
        let mut connection = new_v5_client();

        // No alias supplied, so no topic comes back.
        assert!(connection.validate_topic_alias(None).is_none());
    }

    #[test]
    fn test_validate_topic_alias_range_no_topic_alias_send() {
        let connection = new_v5_client();

        // Without a `topic_alias_send` no alias is in range.
        assert!(!connection.validate_topic_alias_range(1));
    }

    #[test]
    fn test_validate_topic_alias_range_zero() {
        let connection = new_v5_client_with_aliases(TopicAliasSend::new(10));

        // Zero is rejected.
        assert!(!connection.validate_topic_alias_range(0));
    }

    #[test]
    fn test_validate_topic_alias_range_over_max() {
        let connection = new_v5_client_with_aliases(TopicAliasSend::new(5));

        // One past the configured maximum of 5 is rejected.
        assert!(!connection.validate_topic_alias_range(6));
    }

    #[test]
    fn test_validate_topic_alias_range_valid() {
        let connection = new_v5_client_with_aliases(TopicAliasSend::new(10));

        // Lower bound, a middle value and the configured maximum are accepted.
        assert!(connection.validate_topic_alias_range(1));
        assert!(connection.validate_topic_alias_range(5));
        assert!(connection.validate_topic_alias_range(10));
    }

    #[test]
    fn test_validate_topic_alias_with_registered_alias() {
        let mut aliases = TopicAliasSend::new(10);
        aliases.insert_or_update("test/topic", 5);
        let mut connection = new_v5_client_with_aliases(aliases);

        // A registered alias resolves to the topic it was registered with.
        assert_eq!(
            connection.validate_topic_alias(Some(5)),
            Some("test/topic".to_string())
        );
    }

    #[test]
    fn test_validate_topic_alias_unregistered_alias() {
        let mut connection = new_v5_client_with_aliases(TopicAliasSend::new(10));

        // In range, but never registered.
        assert!(connection.validate_topic_alias(Some(5)).is_none());
    }

    #[test]
    fn test_validate_maximum_packet_size_within_limit() {
        let connection = new_v5_client();

        // A freshly constructed connection accepts a 1000-byte packet.
        assert!(connection.validate_maximum_packet_size_send(1000));
    }

    #[test]
    fn test_validate_maximum_packet_size_at_limit() {
        let mut connection = new_v5_client();
        connection.maximum_packet_size_send = 1000;

        // Exactly at the limit is accepted.
        assert!(connection.validate_maximum_packet_size_send(1000));
    }

    #[test]
    fn test_validate_maximum_packet_size_over_limit() {
        let mut connection = new_v5_client();
        connection.maximum_packet_size_send = 1000;

        // One byte over the limit is rejected.
        assert!(!connection.validate_maximum_packet_size_send(1001));
    }

    #[test]
    fn test_validate_maximum_packet_size_zero_limit() {
        let mut connection = new_v5_client();
        connection.maximum_packet_size_send = 0;

        // With a zero limit only a zero-sized packet passes.
        assert!(!connection.validate_maximum_packet_size_send(1));
        assert!(connection.validate_maximum_packet_size_send(0));
    }

    #[test]
    fn test_initialize_clears_state() {
        let mut connection = new_v5_client();

        // Dirty the state that `initialize` is expected to reset.
        connection.publish_send_count = 5;
        connection.need_store = true;
        connection.pid_suback.insert(123);
        connection.pid_unsuback.insert(456);

        connection.initialize(true);

        assert_eq!(connection.publish_send_count, 0);
        assert!(!connection.need_store);
        assert!(connection.pid_suback.is_empty());
        assert!(connection.pid_unsuback.is_empty());
        assert!(connection.is_client);
    }

    #[test]
    fn test_remaining_length_to_total_size() {
        // (remaining length, expected total size). The total is
        // 1 + <bytes used for the remaining-length field> + remaining length;
        // each pair of rows straddles a boundary where that field grows.
        let cases: [(u32, u32); 8] = [
            (0, 2),                     // 1 + 1 + 0
            (127, 129),                 // 1 + 1 + 127
            (128, 131),                 // 1 + 2 + 128
            (16383, 16386),             // 1 + 2 + 16383
            (16384, 16388),             // 1 + 3 + 16384
            (2097151, 2097155),         // 1 + 3 + 2097151
            (2097152, 2097157),         // 1 + 4 + 2097152
            (268435455, 268435460),     // 1 + 4 + 268435455
        ];

        for &(remaining_length, expected_total) in cases.iter() {
            assert_eq!(
                remaining_length_to_total_size(remaining_length),
                expected_total
            );
        }
    }
}