1use alloc::{
23 string::{String, ToString},
24 vec::Vec,
25};
26use core::marker::PhantomData;
27
28use crate::mqtt::common::tracing::{error, info, trace, warn};
29use crate::mqtt::common::Cursor;
30use crate::mqtt::common::HashSet;
31use crate::mqtt::connection::event::{GenericEvent, TimerKind};
32use crate::mqtt::connection::GenericStore;
33
34use serde::Serialize;
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
37enum ConnectionStatus {
38 #[serde(rename = "disconnected")]
39 Disconnected,
40 #[serde(rename = "connecting")]
41 Connecting,
42 #[serde(rename = "connected")]
43 Connected,
44}
45use crate::mqtt::connection::packet_builder::{
46 PacketBuildResult, PacketBuilder, PacketData, RawPacket,
47};
48use crate::mqtt::connection::packet_id_manager::PacketIdManager;
49use crate::mqtt::connection::role;
50use crate::mqtt::connection::role::RoleType;
51use crate::mqtt::connection::sendable::Sendable;
52use crate::mqtt::connection::version::*;
53use crate::mqtt::packet::v3_1_1;
54use crate::mqtt::packet::v5_0;
55use crate::mqtt::packet::GenericPacket;
56use crate::mqtt::packet::GenericStorePacket;
57use crate::mqtt::packet::IsPacketId;
58use crate::mqtt::packet::Qos;
59use crate::mqtt::packet::ResponsePacket;
60use crate::mqtt::packet::{Property, TopicAliasRecv, TopicAliasSend};
61use crate::mqtt::prelude::GenericPacketTrait;
62use crate::mqtt::result_code::{
63 ConnectReasonCode, ConnectReturnCode, DisconnectReasonCode, MqttError, PubrecReasonCode,
64};
65
66const MQTT_PACKET_SIZE_NO_LIMIT: u32 = 1 + 4 + 128 * 128 * 128 * 128;
69
70fn remaining_length_to_total_size(remaining_length: u32) -> u32 {
77 let remaining_length_bytes = if remaining_length < 128 {
78 1
79 } else if remaining_length < 16384 {
80 2
81 } else if remaining_length < 2097152 {
82 3
83 } else {
84 4
85 };
86
87 1 + remaining_length_bytes + remaining_length
88}
89
90pub type Event = GenericEvent<u16>;
95
96pub struct GenericConnection<Role, PacketIdType>
139where
140 Role: RoleType,
141 PacketIdType: IsPacketId,
142{
143 _marker: PhantomData<Role>,
144
145 protocol_version: Version,
146
147 pid_man: PacketIdManager<PacketIdType>,
148 pid_suback: HashSet<PacketIdType>,
149 pid_unsuback: HashSet<PacketIdType>,
150 pid_puback: HashSet<PacketIdType>,
151 pid_pubrec: HashSet<PacketIdType>,
152 pid_pubcomp: HashSet<PacketIdType>,
153
154 need_store: bool,
155 store: GenericStore<PacketIdType>,
157
158 offline_publish: bool,
159 auto_pub_response: bool,
160 auto_ping_response: bool,
161
162 auto_map_topic_alias_send: bool,
164 auto_replace_topic_alias_send: bool,
166 topic_alias_recv: Option<TopicAliasRecv>,
168 topic_alias_send: Option<TopicAliasSend>,
170
171 publish_send_max: Option<u16>,
172 publish_recv_max: Option<u16>,
174 publish_send_count: u16,
177
178 publish_recv: HashSet<PacketIdType>,
180
181 maximum_packet_size_send: u32,
183 maximum_packet_size_recv: u32,
185
186 status: ConnectionStatus,
188
189 pingreq_send_interval_ms: Option<u64>,
191 pingreq_recv_timeout_ms: Option<u64>,
193 pingresp_recv_timeout_ms: Option<u64>,
195
196 qos2_publish_handled: HashSet<PacketIdType>,
198 qos2_publish_processing: HashSet<PacketIdType>,
200
201 pingreq_send_set: bool,
203 pingreq_recv_set: bool,
204 pingresp_recv_set: bool,
205
206 packet_builder: PacketBuilder,
207 is_client: bool,
209}
210
211pub type Connection<Role> = GenericConnection<Role, u16>;
224
225impl<Role, PacketIdType> GenericConnection<Role, PacketIdType>
226where
227 Role: RoleType,
228 PacketIdType: IsPacketId,
229{
230 pub fn new(version: Version) -> Self {
253 Self {
254 _marker: PhantomData,
255 protocol_version: version,
256 pid_man: PacketIdManager::new(),
257 pid_suback: HashSet::default(),
258 pid_unsuback: HashSet::default(),
259 pid_puback: HashSet::default(),
260 pid_pubrec: HashSet::default(),
261 pid_pubcomp: HashSet::default(),
262 need_store: false,
263 store: GenericStore::new(),
264 offline_publish: false,
265 auto_pub_response: false,
266 auto_ping_response: false,
267 auto_map_topic_alias_send: false,
268 auto_replace_topic_alias_send: false,
269 topic_alias_recv: None,
270 topic_alias_send: None,
271 publish_send_max: None,
272 publish_recv_max: None,
273 publish_send_count: 0,
274 publish_recv: HashSet::default(),
275 maximum_packet_size_send: MQTT_PACKET_SIZE_NO_LIMIT,
276 maximum_packet_size_recv: MQTT_PACKET_SIZE_NO_LIMIT,
277 status: ConnectionStatus::Disconnected,
278 pingreq_send_interval_ms: None,
279 pingreq_recv_timeout_ms: None,
280 pingresp_recv_timeout_ms: None,
281 qos2_publish_handled: HashSet::default(),
282 qos2_publish_processing: HashSet::default(),
283 pingreq_send_set: false,
284 pingreq_recv_set: false,
285 pingresp_recv_set: false,
286 packet_builder: PacketBuilder::new(),
287 is_client: false,
288 }
289 }
290
291 pub fn checked_send<T>(&mut self, packet: T) -> Vec<GenericEvent<PacketIdType>>
332 where
333 T: Sendable<Role, PacketIdType>,
334 {
335 packet.dispatch_send(self)
337 }
338
339 pub fn send(&mut self, packet: GenericPacket<PacketIdType>) -> Vec<GenericEvent<PacketIdType>> {
384 use core::any::TypeId;
385
386 let role_id = TypeId::of::<Role>();
387 let client_id = TypeId::of::<role::Client>();
388 let server_id = TypeId::of::<role::Server>();
389 let any_id = TypeId::of::<role::Any>();
390
391 let packet_version = packet.protocol_version();
393
394 if self.protocol_version != packet_version {
396 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
397 }
398
399 match packet {
400 GenericPacket::V3_1_1Connect(p) => {
402 if role_id == client_id || role_id == any_id {
403 self.process_send_v3_1_1_connect(p)
404 } else {
405 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
406 }
407 }
408 GenericPacket::V5_0Connect(p) => {
409 if role_id == client_id || role_id == any_id {
410 self.process_send_v5_0_connect(p)
411 } else {
412 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
413 }
414 }
415 GenericPacket::V3_1_1Connack(p) => {
417 if role_id == server_id || role_id == any_id {
418 self.process_send_v3_1_1_connack(p)
419 } else {
420 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
421 }
422 }
423 GenericPacket::V5_0Connack(p) => {
424 if role_id == server_id || role_id == any_id {
425 self.process_send_v5_0_connack(p)
426 } else {
427 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
428 }
429 }
430 GenericPacket::V3_1_1Publish(p) => self.process_send_v3_1_1_publish(p),
432 GenericPacket::V5_0Publish(p) => self.process_send_v5_0_publish(p),
433 GenericPacket::V3_1_1Puback(p) => self.process_send_v3_1_1_puback(p),
435 GenericPacket::V5_0Puback(p) => self.process_send_v5_0_puback(p),
436 GenericPacket::V3_1_1Pubrec(p) => self.process_send_v3_1_1_pubrec(p),
437 GenericPacket::V5_0Pubrec(p) => self.process_send_v5_0_pubrec(p),
438 GenericPacket::V3_1_1Pubrel(p) => self.process_send_v3_1_1_pubrel(p),
439 GenericPacket::V5_0Pubrel(p) => self.process_send_v5_0_pubrel(p),
440 GenericPacket::V3_1_1Pubcomp(p) => self.process_send_v3_1_1_pubcomp(p),
441 GenericPacket::V5_0Pubcomp(p) => self.process_send_v5_0_pubcomp(p),
442 GenericPacket::V3_1_1Subscribe(p) => {
444 if role_id == client_id || role_id == any_id {
445 self.process_send_v3_1_1_subscribe(p)
446 } else {
447 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
448 }
449 }
450 GenericPacket::V5_0Subscribe(p) => {
451 if role_id == client_id || role_id == any_id {
452 self.process_send_v5_0_subscribe(p)
453 } else {
454 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
455 }
456 }
457 GenericPacket::V3_1_1Unsubscribe(p) => {
458 if role_id == client_id || role_id == any_id {
459 self.process_send_v3_1_1_unsubscribe(p)
460 } else {
461 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
462 }
463 }
464 GenericPacket::V5_0Unsubscribe(p) => {
465 if role_id == client_id || role_id == any_id {
466 self.process_send_v5_0_unsubscribe(p)
467 } else {
468 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
469 }
470 }
471 GenericPacket::V3_1_1Suback(p) => {
473 if role_id == server_id || role_id == any_id {
474 self.process_send_v3_1_1_suback(p)
475 } else {
476 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
477 }
478 }
479 GenericPacket::V5_0Suback(p) => {
480 if role_id == server_id || role_id == any_id {
481 self.process_send_v5_0_suback(p)
482 } else {
483 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
484 }
485 }
486 GenericPacket::V3_1_1Unsuback(p) => {
487 if role_id == server_id || role_id == any_id {
488 self.process_send_v3_1_1_unsuback(p)
489 } else {
490 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
491 }
492 }
493 GenericPacket::V5_0Unsuback(p) => {
494 if role_id == server_id || role_id == any_id {
495 self.process_send_v5_0_unsuback(p)
496 } else {
497 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
498 }
499 }
500 GenericPacket::V3_1_1Pingreq(p) => {
502 if role_id == client_id || role_id == any_id {
503 self.process_send_v3_1_1_pingreq(p)
504 } else {
505 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
506 }
507 }
508 GenericPacket::V5_0Pingreq(p) => {
509 if role_id == client_id || role_id == any_id {
510 self.process_send_v5_0_pingreq(p)
511 } else {
512 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
513 }
514 }
515 GenericPacket::V3_1_1Pingresp(p) => {
517 if role_id == server_id || role_id == any_id {
518 self.process_send_v3_1_1_pingresp(p)
519 } else {
520 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
521 }
522 }
523 GenericPacket::V5_0Pingresp(p) => {
524 if role_id == server_id || role_id == any_id {
525 self.process_send_v5_0_pingresp(p)
526 } else {
527 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
528 }
529 }
530 GenericPacket::V3_1_1Disconnect(p) => {
532 if role_id == client_id || role_id == any_id {
533 self.process_send_v3_1_1_disconnect(p)
534 } else {
535 vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
536 }
537 }
538 GenericPacket::V5_0Disconnect(p) => self.process_send_v5_0_disconnect(p),
540 GenericPacket::V5_0Auth(p) => self.process_send_v5_0_auth(p),
542 }
543 }
544
545 pub fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
594 let mut events = Vec::new();
595
596 match self.packet_builder.feed(data) {
597 PacketBuildResult::Complete(raw_packet) => {
598 events.extend(self.process_recv_packet(raw_packet));
599 }
600 PacketBuildResult::Incomplete => {}
601 PacketBuildResult::Error(e) => {
602 self.cancel_timers(&mut events);
603 events.push(GenericEvent::RequestClose);
604 events.push(GenericEvent::NotifyError(e));
605 }
606 }
607
608 events
609 }
610
611 pub fn notify_timer_fired(&mut self, kind: TimerKind) -> Vec<GenericEvent<PacketIdType>> {
628 let mut events = Vec::new();
629
630 match kind {
631 TimerKind::PingreqSend => {
632 self.pingreq_send_set = false;
634
635 if self.status == ConnectionStatus::Connected {
637 match self.protocol_version {
638 Version::V3_1_1 => {
639 if let Ok(pingreq) = v3_1_1::Pingreq::builder().build() {
640 events.extend(self.process_send_v3_1_1_pingreq(pingreq));
641 }
642 }
643 Version::V5_0 => {
644 if let Ok(pingreq) = v5_0::Pingreq::builder().build() {
645 events.extend(self.process_send_v5_0_pingreq(pingreq));
646 }
647 }
648 Version::Undetermined => {
649 unreachable!("Protocol version should be set before sending PINGREQ");
650 }
651 }
652 }
653 }
654 TimerKind::PingreqRecv => {
655 self.pingreq_recv_set = false;
657
658 match self.protocol_version {
659 Version::V3_1_1 => {
660 events.push(GenericEvent::RequestClose);
662 }
663 Version::V5_0 => {
664 if self.status == ConnectionStatus::Connected {
666 if let Ok(disconnect) = v5_0::Disconnect::builder()
667 .reason_code(DisconnectReasonCode::KeepAliveTimeout)
668 .build()
669 {
670 events.extend(self.process_send_v5_0_disconnect(disconnect));
671 }
672 }
673 }
674 Version::Undetermined => {
675 unreachable!("Protocol version should be set before receiving PINGREQ");
676 }
677 }
678 }
679 TimerKind::PingrespRecv => {
680 self.pingresp_recv_set = false;
682
683 match self.protocol_version {
684 Version::V3_1_1 => {
685 events.push(GenericEvent::RequestClose);
687 }
688 Version::V5_0 => {
689 if self.status == ConnectionStatus::Connected {
691 if let Ok(disconnect) = v5_0::Disconnect::builder()
692 .reason_code(DisconnectReasonCode::KeepAliveTimeout)
693 .build()
694 {
695 events.extend(self.process_send_v5_0_disconnect(disconnect));
696 }
697 }
698 }
699 Version::Undetermined => {
700 unreachable!("Protocol version should be set before receiving PINGRESP");
701 }
702 }
703 }
704 }
705
706 events
707 }
708
709 pub fn notify_closed(&mut self) -> Vec<GenericEvent<PacketIdType>> {
722 let mut events = Vec::new();
723
724 self.maximum_packet_size_send = MQTT_PACKET_SIZE_NO_LIMIT;
726 self.maximum_packet_size_recv = MQTT_PACKET_SIZE_NO_LIMIT;
727
728 self.status = ConnectionStatus::Disconnected;
730
731 self.topic_alias_send = None;
733 self.topic_alias_recv = None;
734
735 for packet_id in self.pid_suback.drain() {
737 if self.pid_man.is_used_id(packet_id) {
738 self.pid_man.release_id(packet_id);
739 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
740 }
741 }
742
743 for packet_id in self.pid_unsuback.drain() {
745 if self.pid_man.is_used_id(packet_id) {
746 self.pid_man.release_id(packet_id);
747 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
748 }
749 }
750
751 if !self.need_store {
753 self.qos2_publish_processing.clear();
754 self.qos2_publish_handled.clear();
755
756 for packet_id in self.pid_puback.drain() {
758 if self.pid_man.is_used_id(packet_id) {
759 self.pid_man.release_id(packet_id);
760 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
761 }
762 }
763
764 for packet_id in self.pid_pubrec.drain() {
766 if self.pid_man.is_used_id(packet_id) {
767 self.pid_man.release_id(packet_id);
768 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
769 }
770 }
771
772 for packet_id in self.pid_pubcomp.drain() {
774 if self.pid_man.is_used_id(packet_id) {
775 self.pid_man.release_id(packet_id);
776 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
777 }
778 }
779 }
780
781 self.cancel_timers(&mut events);
783
784 events
785 }
786
787 pub fn set_pingreq_send_interval(
800 &mut self,
801 duration_ms: u64,
802 ) -> Vec<GenericEvent<PacketIdType>> {
803 let mut events = Vec::new();
804
805 if duration_ms == 0 {
806 self.pingreq_send_interval_ms = None;
807 self.pingreq_send_set = false;
808 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
809 } else {
810 self.pingreq_send_interval_ms = Some(duration_ms);
811 self.pingreq_send_set = true;
812 events.push(GenericEvent::RequestTimerReset {
813 kind: TimerKind::PingreqSend,
814 duration_ms,
815 });
816 }
817
818 events
819 }
820
821 pub fn get_receive_maximum_vacancy_for_send(&self) -> Option<u16> {
830 self.publish_send_max
832 .map(|max| max.saturating_sub(self.publish_send_count))
833 }
834
835 pub fn set_offline_publish(&mut self, enable: bool) {
844 self.offline_publish = enable;
845 if self.offline_publish {
846 self.need_store = true;
847 }
848 }
849
850 pub fn set_auto_pub_response(&mut self, enable: bool) {
859 self.auto_pub_response = enable;
860 }
861
862 pub fn set_auto_ping_response(&mut self, enable: bool) {
870 self.auto_ping_response = enable;
871 }
872
873 pub fn set_auto_map_topic_alias_send(&mut self, enable: bool) {
885 self.auto_map_topic_alias_send = enable;
886 }
887
888 pub fn set_auto_replace_topic_alias_send(&mut self, enable: bool) {
898 self.auto_replace_topic_alias_send = enable;
899 }
900
901 pub fn set_pingresp_recv_timeout(&mut self, timeout_ms: Option<u64>) {
903 self.pingresp_recv_timeout_ms = timeout_ms;
904 }
905
906 pub fn acquire_packet_id(&mut self) -> Result<PacketIdType, MqttError> {
912 self.pid_man.acquire_unique_id()
913 }
914
915 pub fn register_packet_id(&mut self, packet_id: PacketIdType) -> Result<(), MqttError> {
928 self.pid_man.register_id(packet_id)
929 }
930
931 pub fn release_packet_id(
945 &mut self,
946 packet_id: PacketIdType,
947 ) -> Vec<GenericEvent<PacketIdType>> {
948 let mut events = Vec::new();
949
950 if self.pid_man.is_used_id(packet_id) {
951 self.pid_man.release_id(packet_id);
952 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
953 }
954
955 events
956 }
957
958 pub fn get_qos2_publish_handled(&self) -> HashSet<PacketIdType> {
967 self.qos2_publish_handled.clone()
968 }
969
970 pub fn restore_qos2_publish_handled(&mut self, pids: HashSet<PacketIdType>) {
979 self.qos2_publish_handled = pids;
980 }
981
982 pub fn restore_packets(&mut self, packets: Vec<GenericStorePacket<PacketIdType>>) {
991 for packet in packets {
992 match &packet {
993 GenericStorePacket::V3_1_1Publish(p) => {
994 match p.qos() {
996 Qos::AtLeastOnce => {
997 self.pid_puback.insert(p.packet_id().unwrap());
998 }
999 Qos::ExactlyOnce => {
1000 self.pid_pubrec.insert(p.packet_id().unwrap());
1001 }
1002 _ => {
1003 warn!("QoS 0 packet found in store, skipping");
1005 continue;
1006 }
1007 }
1008 let packet_id = p.packet_id().unwrap();
1010 if self.pid_man.register_id(packet_id).is_ok() {
1011 if let Err(_e) = self.store.add(packet) {
1012 error!("Failed to add packet to store: {:?}", _e);
1013 }
1014 } else {
1015 error!("Packet ID {} has already been used. Skip it", packet_id);
1016 }
1017 }
1018 GenericStorePacket::V5_0Publish(p) => {
1019 match p.qos() {
1021 Qos::AtLeastOnce => {
1022 self.pid_puback.insert(p.packet_id().unwrap());
1023 }
1024 Qos::ExactlyOnce => {
1025 self.pid_pubrec.insert(p.packet_id().unwrap());
1026 }
1027 _ => {
1028 warn!("QoS 0 packet found in store, skipping");
1030 continue;
1031 }
1032 }
1033 let packet_id = p.packet_id().unwrap();
1035 if self.pid_man.register_id(packet_id).is_ok() {
1036 if let Err(_e) = self.store.add(packet) {
1037 error!("Failed to add packet to store: {:?}", _e);
1038 }
1039 } else {
1040 error!("Packet ID {} has already been used. Skip it", packet_id);
1041 }
1042 }
1043 GenericStorePacket::V3_1_1Pubrel(p) => {
1044 self.pid_pubcomp.insert(p.packet_id());
1046 let packet_id = p.packet_id();
1048 if self.pid_man.register_id(packet_id).is_ok() {
1049 if let Err(_e) = self.store.add(packet) {
1050 error!("Failed to add packet to store: {:?}", _e);
1051 }
1052 } else {
1053 error!("Packet ID {} has already been used. Skip it", packet_id);
1054 }
1055 }
1056 GenericStorePacket::V5_0Pubrel(p) => {
1057 self.pid_pubcomp.insert(p.packet_id());
1059 let packet_id = p.packet_id();
1061 if self.pid_man.register_id(packet_id).is_ok() {
1062 if let Err(_e) = self.store.add(packet) {
1063 error!("Failed to add packet to store: {:?}", _e);
1064 }
1065 } else {
1066 error!("Packet ID {} has already been used. Skip it", packet_id);
1067 }
1068 }
1069 }
1070 }
1071 }
1072
1073 pub fn get_stored_packets(&self) -> Vec<GenericStorePacket<PacketIdType>> {
1082 self.store.get_stored()
1083 }
1084
1085 pub fn get_protocol_version(&self) -> Version {
1091 self.protocol_version
1092 }
1093
1094 pub fn is_publish_processing(&self, packet_id: PacketIdType) -> bool {
1104 self.qos2_publish_processing.contains(&packet_id)
1105 }
1106
1107 pub fn regulate_for_store(
1112 &self,
1113 mut packet: v5_0::GenericPublish<PacketIdType>,
1114 ) -> Result<v5_0::GenericPublish<PacketIdType>, MqttError> {
1115 if packet.topic_name().is_empty() {
1116 if let Some(topic_alias) = Self::get_topic_alias_from_props(packet.props()) {
1118 if let Some(ref topic_alias_send) = self.topic_alias_send {
1119 if let Some(topic) = topic_alias_send.peek(topic_alias) {
1120 packet = packet.remove_topic_alias_add_topic(topic.to_string())?;
1122 } else {
1123 return Err(MqttError::PacketNotRegulated);
1124 }
1125 } else {
1126 return Err(MqttError::PacketNotRegulated);
1127 }
1128 } else {
1129 return Err(MqttError::PacketNotRegulated);
1130 }
1131 } else {
1132 packet = packet.remove_topic_alias();
1134 }
1135
1136 Ok(packet)
1137 }
1138
1139 fn initialize(&mut self, is_client: bool) {
1153 self.publish_send_max = None;
1154 self.publish_recv_max = None;
1155 self.publish_send_count = 0;
1156 self.topic_alias_send = None;
1157 self.topic_alias_recv = None;
1158 self.publish_recv.clear();
1159 self.qos2_publish_processing.clear();
1160 self.need_store = false;
1161 self.pid_suback.clear();
1162 self.pid_unsuback.clear();
1163 self.is_client = is_client;
1164 }
1165
1166 fn clear_store_related(&mut self) {
1167 self.pid_man.clear();
1168 self.pid_puback.clear();
1169 self.pid_pubrec.clear();
1170 self.pid_pubcomp.clear();
1171 self.store.clear();
1172 }
1173
1174 fn send_stored(&mut self) -> Vec<GenericEvent<PacketIdType>> {
1176 let mut events = Vec::new();
1177 self.store.for_each(|packet| {
1178 if packet.size() > self.maximum_packet_size_send as usize {
1179 let packet_id = packet.packet_id();
1180 self.pid_man.release_id(packet_id);
1181 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1182 return false; }
1184 events.push(GenericEvent::RequestSendPacket {
1185 packet: packet.clone().into(),
1186 release_packet_id_if_send_error: None,
1187 });
1188 true });
1190
1191 events
1192 }
1193
1194 fn validate_topic_alias(&mut self, topic_alias_opt: Option<u16>) -> Option<String> {
1206 let topic_alias = topic_alias_opt?;
1207
1208 if !self.validate_topic_alias_range(topic_alias) {
1209 return None;
1210 }
1211
1212 let topic_alias_send = self.topic_alias_send.as_mut()?;
1213 let topic = topic_alias_send.get(topic_alias)?;
1215
1216 Some(topic.to_string())
1217 }
1218
1219 fn validate_topic_alias_range(&self, topic_alias: u16) -> bool {
1231 let topic_alias_send = match &self.topic_alias_send {
1232 Some(tas) => tas,
1233 None => {
1234 error!("topic_alias is set but topic_alias_maximum is 0");
1235 return false;
1236 }
1237 };
1238
1239 if topic_alias == 0 || topic_alias > topic_alias_send.max() {
1240 error!("topic_alias is set but out of range");
1241 return false;
1242 }
1243
1244 true
1245 }
1246
1247 pub(crate) fn process_send_v3_1_1_connect(
1249 &mut self,
1250 packet: v3_1_1::Connect,
1251 ) -> Vec<GenericEvent<PacketIdType>> {
1252 info!("send connect v3.1.1: {packet}");
1253
1254 if self.status != ConnectionStatus::Disconnected {
1255 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1256 }
1257 if !self.validate_maximum_packet_size_send(packet.size()) {
1258 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1259 }
1260
1261 let mut events = Vec::new();
1262 self.initialize(true);
1263 self.status = ConnectionStatus::Connecting;
1264
1265 let keep_alive = packet.keep_alive();
1267 if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1268 self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1269 }
1270
1271 if packet.clean_start() {
1273 self.clear_store_related();
1274 } else {
1275 self.need_store = true;
1276 }
1277
1278 self.topic_alias_send = None;
1280
1281 events.push(GenericEvent::RequestSendPacket {
1282 packet: packet.into(),
1283 release_packet_id_if_send_error: None,
1284 });
1285 self.send_post_process(&mut events);
1286
1287 events
1288 }
1289
1290 pub(crate) fn process_send_v5_0_connect(
1292 &mut self,
1293 packet: v5_0::Connect,
1294 ) -> Vec<GenericEvent<PacketIdType>> {
1295 info!("send connect v5.0: {packet}");
1296 if !self.validate_maximum_packet_size_send(packet.size()) {
1297 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1298 }
1299 if self.status != ConnectionStatus::Disconnected {
1300 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1301 }
1302
1303 let mut events = Vec::new();
1304 self.initialize(true);
1305 self.status = ConnectionStatus::Connecting;
1306
1307 let keep_alive = packet.keep_alive();
1309 if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1310 self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1311 }
1312
1313 if packet.clean_start() {
1315 self.clear_store_related();
1316 }
1317
1318 for prop in packet.props() {
1320 match prop {
1321 Property::TopicAliasMaximum(val) => {
1322 if val.val() != 0 {
1323 self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1324 }
1325 }
1326 Property::ReceiveMaximum(val) => {
1327 debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1328 self.publish_recv_max = Some(val.val());
1329 }
1330 Property::MaximumPacketSize(val) => {
1331 debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1332 self.maximum_packet_size_recv = val.val();
1333 }
1334 Property::SessionExpiryInterval(val) => {
1335 if val.val() != 0 {
1336 self.need_store = true;
1337 }
1338 }
1339 _ => {
1340 }
1342 }
1343 }
1344
1345 events.push(GenericEvent::RequestSendPacket {
1346 packet: packet.into(),
1347 release_packet_id_if_send_error: None,
1348 });
1349 self.send_post_process(&mut events);
1350
1351 events
1352 }
1353
1354 pub(crate) fn process_send_v3_1_1_connack(
1355 &mut self,
1356 packet: v3_1_1::Connack,
1357 ) -> Vec<GenericEvent<PacketIdType>> {
1358 info!("send connack v3.1.1: {packet}");
1359 if self.status != ConnectionStatus::Connecting {
1360 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1361 }
1362 let mut events = Vec::new();
1363 if packet.return_code() == ConnectReturnCode::Accepted {
1364 self.status = ConnectionStatus::Connected;
1365 } else {
1366 self.status = ConnectionStatus::Disconnected;
1367 }
1368
1369 events.push(GenericEvent::RequestSendPacket {
1370 packet: packet.into(),
1371 release_packet_id_if_send_error: None,
1372 });
1373 events.extend(self.send_stored());
1374 self.send_post_process(&mut events);
1375
1376 events
1377 }
1378
1379 pub(crate) fn process_send_v5_0_connack(
1380 &mut self,
1381 packet: v5_0::Connack,
1382 ) -> Vec<GenericEvent<PacketIdType>> {
1383 info!("send connack v5.0: {packet}");
1384 if !self.validate_maximum_packet_size_send(packet.size()) {
1385 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1386 }
1387 if self.status != ConnectionStatus::Connecting {
1388 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1389 }
1390
1391 let mut events = Vec::new();
1392
1393 if packet.reason_code() == ConnectReasonCode::Success {
1394 self.status = ConnectionStatus::Connected;
1395
1396 for prop in packet.props() {
1398 match prop {
1399 Property::TopicAliasMaximum(val) => {
1400 if val.val() != 0 {
1401 self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1402 }
1403 }
1404 Property::ReceiveMaximum(val) => {
1405 debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1406 self.publish_recv_max = Some(val.val());
1407 }
1408 Property::MaximumPacketSize(val) => {
1409 debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1410 self.maximum_packet_size_recv = val.val();
1411 }
1412 _ => {
1413 }
1415 }
1416 }
1417 } else {
1418 self.status = ConnectionStatus::Disconnected;
1419 self.cancel_timers(&mut events);
1420 }
1421
1422 events.push(GenericEvent::RequestSendPacket {
1423 packet: packet.into(),
1424 release_packet_id_if_send_error: None,
1425 });
1426 events.extend(self.send_stored());
1427 self.send_post_process(&mut events);
1428
1429 events
1430 }
1431
1432 pub(crate) fn process_send_v3_1_1_publish(
1433 &mut self,
1434 packet: v3_1_1::GenericPublish<PacketIdType>,
1435 ) -> Vec<GenericEvent<PacketIdType>> {
1436 let mut events = Vec::new();
1437 let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1438
1439 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1440 let packet_id = packet.packet_id().unwrap();
1442 if self.status != ConnectionStatus::Connected
1443 && !self.need_store
1444 && !self.offline_publish
1445 {
1446 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1447 if self.pid_man.is_used_id(packet_id) {
1448 self.pid_man.release_id(packet_id);
1449 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1450 }
1451 return events;
1452 }
1453 if !self.pid_man.is_used_id(packet_id) {
1454 error!("packet_id {packet_id} must be acquired or registered");
1455 events.push(GenericEvent::NotifyError(
1456 MqttError::PacketIdentifierInvalid,
1457 ));
1458 return events;
1459 }
1460 if self.need_store
1461 && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1462 {
1463 let store_packet = packet.clone().set_dup(true);
1464 self.store.add(store_packet.try_into().unwrap()).unwrap();
1465 } else {
1466 release_packet_id_if_send_error = Some(packet_id);
1467 }
1468 if packet.qos() == Qos::ExactlyOnce {
1469 self.qos2_publish_processing.insert(packet_id);
1470 self.pid_pubrec.insert(packet_id);
1471 } else {
1472 self.pid_puback.insert(packet_id);
1473 }
1474 } else if self.status != ConnectionStatus::Connected {
1475 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1476 return events;
1477 }
1478
1479 if self.status == ConnectionStatus::Connected {
1480 events.push(GenericEvent::RequestSendPacket {
1481 packet: packet.into(),
1482 release_packet_id_if_send_error,
1483 });
1484 }
1485 self.send_post_process(&mut events);
1486
1487 events
1488 }
1489
1490 pub(crate) fn process_send_v5_0_publish(
1491 &mut self,
1492 mut packet: v5_0::GenericPublish<PacketIdType>,
1493 ) -> Vec<GenericEvent<PacketIdType>> {
1494 if !self.validate_maximum_packet_size_send(packet.size()) {
1495 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1496 }
1497
1498 let mut events = Vec::new();
1499 let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1500 let mut topic_alias_validated = false;
1501 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1502 let packet_id = packet.packet_id().unwrap();
1503 if self.status != ConnectionStatus::Connected
1504 && !self.need_store
1505 && !self.offline_publish
1506 {
1507 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1508 if self.pid_man.is_used_id(packet_id) {
1509 self.pid_man.release_id(packet_id);
1510 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1511 }
1512 return events;
1513 }
1514
1515 if !self.pid_man.is_used_id(packet_id) {
1517 error!("packet_id {packet_id} must be acquired or registered");
1518 events.push(GenericEvent::NotifyError(
1519 MqttError::PacketIdentifierInvalid,
1520 ));
1521 return events;
1522 }
1523
1524 if self.need_store
1525 && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1526 {
1527 let ta_opt = Self::get_topic_alias_from_props(packet.props());
1528 if packet.topic_name().is_empty() {
1529 let topic_opt = self.validate_topic_alias(ta_opt);
1531 if topic_opt.is_none() {
1532 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1533 if self.pid_man.is_used_id(packet_id) {
1534 self.pid_man.release_id(packet_id);
1535 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1536 }
1537 return events;
1538 }
1539 topic_alias_validated = true;
1540 let store_packet = packet
1541 .clone()
1542 .remove_topic_alias_add_topic(topic_opt.unwrap())
1543 .unwrap()
1544 .set_dup(true);
1545 self.store.add(store_packet.try_into().unwrap()).unwrap();
1547 } else {
1548 let store_packet = packet.clone().remove_topic_alias().set_dup(true);
1550 self.store.add(store_packet.try_into().unwrap()).unwrap();
1551 }
1552 } else {
1553 release_packet_id_if_send_error = Some(packet_id);
1554 }
1555 if packet.qos() == Qos::ExactlyOnce {
1556 self.qos2_publish_processing.insert(packet_id);
1557 self.pid_pubrec.insert(packet_id);
1558 } else {
1559 self.pid_puback.insert(packet_id);
1560 }
1561 } else if self.status != ConnectionStatus::Connected {
1562 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1563 return events;
1564 }
1565
1566 let packet_id_opt = packet.packet_id();
1567 let ta_opt = Self::get_topic_alias_from_props(packet.props());
1568 if packet.topic_name().is_empty() {
1569 if !topic_alias_validated && self.validate_topic_alias(ta_opt).is_none() {
1571 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1572 if let Some(packet_id) = packet_id_opt {
1573 if self.pid_man.is_used_id(packet_id) {
1574 self.pid_man.release_id(packet_id);
1575 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1576 }
1577 }
1578 return events;
1579 }
1580 } else if let Some(ta) = ta_opt {
1581 if self.validate_topic_alias_range(ta) {
1583 trace!(
1584 "topic alias: {} - {} is registered.",
1585 packet.topic_name(),
1586 ta
1587 );
1588 if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1589 topic_alias_send.insert_or_update(packet.topic_name(), ta);
1590 }
1591 } else {
1592 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1593 if let Some(packet_id) = packet_id_opt {
1594 if self.pid_man.is_used_id(packet_id) {
1595 self.pid_man.release_id(packet_id);
1596 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1597 }
1598 }
1599 return events;
1600 }
1601 } else if self.status == ConnectionStatus::Connected {
1602 if self.auto_map_topic_alias_send {
1604 if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1605 if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1606 trace!(
1607 "topic alias: {} - {} is found.",
1608 packet.topic_name(),
1609 found_ta
1610 );
1611 packet = packet.remove_topic_add_topic_alias(found_ta);
1612 } else {
1613 let lru_ta = topic_alias_send.get_lru_alias();
1614 topic_alias_send.insert_or_update(packet.topic_name(), lru_ta);
1615 packet = packet.remove_topic_add_topic_alias(lru_ta);
1616 }
1617 }
1618 } else if self.auto_replace_topic_alias_send {
1619 if let Some(ref topic_alias_send) = self.topic_alias_send {
1620 if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1621 trace!(
1622 "topic alias: {} - {} is found.",
1623 packet.topic_name(),
1624 found_ta
1625 );
1626 packet = packet.remove_topic_add_topic_alias(found_ta);
1627 }
1628 }
1629 }
1630 }
1631
1632 if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1634 if let Some(max) = self.publish_send_max {
1635 if self.publish_send_count == max {
1636 events.push(GenericEvent::NotifyError(MqttError::ReceiveMaximumExceeded));
1637 if let Some(packet_id) = packet_id_opt {
1638 if self.pid_man.is_used_id(packet_id) {
1639 self.pid_man.release_id(packet_id);
1640 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1641 }
1642 }
1643 return events;
1644 }
1645 self.publish_send_count += 1;
1646 }
1647 }
1648
1649 if self.status == ConnectionStatus::Connected {
1650 events.push(GenericEvent::RequestSendPacket {
1651 packet: packet.into(),
1652 release_packet_id_if_send_error,
1653 });
1654 }
1655 self.send_post_process(&mut events);
1656
1657 events
1658 }
1659
1660 pub(crate) fn process_send_v3_1_1_puback(
1661 &mut self,
1662 packet: v3_1_1::GenericPuback<PacketIdType>,
1663 ) -> Vec<GenericEvent<PacketIdType>> {
1664 if self.status != ConnectionStatus::Connected {
1665 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1666 }
1667 let mut events = Vec::new();
1668
1669 events.push(GenericEvent::RequestSendPacket {
1670 packet: packet.into(),
1671 release_packet_id_if_send_error: None,
1672 });
1673 self.send_post_process(&mut events);
1674
1675 events
1676 }
1677
1678 pub(crate) fn process_send_v5_0_puback(
1679 &mut self,
1680 packet: v5_0::GenericPuback<PacketIdType>,
1681 ) -> Vec<GenericEvent<PacketIdType>> {
1682 if !self.validate_maximum_packet_size_send(packet.size()) {
1683 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1684 }
1685 if self.status != ConnectionStatus::Connected {
1686 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1687 }
1688
1689 let mut events = Vec::new();
1690 self.publish_recv.remove(&packet.packet_id());
1691
1692 events.push(GenericEvent::RequestSendPacket {
1693 packet: packet.into(),
1694 release_packet_id_if_send_error: None,
1695 });
1696 self.send_post_process(&mut events);
1697
1698 events
1699 }
1700
1701 pub(crate) fn process_send_v3_1_1_pubrec(
1702 &mut self,
1703 packet: v3_1_1::GenericPubrec<PacketIdType>,
1704 ) -> Vec<GenericEvent<PacketIdType>> {
1705 if self.status != ConnectionStatus::Connected {
1706 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1707 }
1708 let mut events = Vec::new();
1709
1710 events.push(GenericEvent::RequestSendPacket {
1711 packet: packet.into(),
1712 release_packet_id_if_send_error: None,
1713 });
1714 self.send_post_process(&mut events);
1715
1716 events
1717 }
1718
1719 pub(crate) fn process_send_v5_0_pubrec(
1720 &mut self,
1721 packet: v5_0::GenericPubrec<PacketIdType>,
1722 ) -> Vec<GenericEvent<PacketIdType>> {
1723 if !self.validate_maximum_packet_size_send(packet.size()) {
1724 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1725 }
1726 if self.status != ConnectionStatus::Connected {
1727 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1728 }
1729
1730 let mut events = Vec::new();
1731 let packet_id = packet.packet_id();
1732
1733 if let Some(rc) = packet.reason_code() {
1734 if rc.is_failure() {
1735 self.publish_recv.remove(&packet_id);
1736 self.qos2_publish_handled.remove(&packet_id);
1737 }
1738 }
1739
1740 events.push(GenericEvent::RequestSendPacket {
1741 packet: packet.into(),
1742 release_packet_id_if_send_error: None,
1743 });
1744 self.send_post_process(&mut events);
1745
1746 events
1747 }
1748
1749 pub(crate) fn process_send_v3_1_1_pubrel(
1750 &mut self,
1751 packet: v3_1_1::GenericPubrel<PacketIdType>,
1752 ) -> Vec<GenericEvent<PacketIdType>> {
1753 if self.status != ConnectionStatus::Connected && !self.need_store {
1754 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1755 }
1756 let mut events = Vec::new();
1757 let packet_id = packet.packet_id();
1758 if !self.pid_man.is_used_id(packet_id) {
1759 error!("packet_id {packet_id} must be acquired or registered");
1760 events.push(GenericEvent::NotifyError(
1761 MqttError::PacketIdentifierInvalid,
1762 ));
1763 return events;
1764 }
1765 if self.need_store {
1766 self.store.add(packet.clone().try_into().unwrap()).unwrap();
1767 }
1768
1769 if self.status == ConnectionStatus::Connected {
1770 self.pid_pubcomp.insert(packet_id);
1771 events.push(GenericEvent::RequestSendPacket {
1772 packet: packet.into(),
1773 release_packet_id_if_send_error: None,
1774 });
1775 }
1776 self.send_post_process(&mut events);
1777
1778 events
1779 }
1780
1781 pub(crate) fn process_send_v5_0_pubrel(
1782 &mut self,
1783 packet: v5_0::GenericPubrel<PacketIdType>,
1784 ) -> Vec<GenericEvent<PacketIdType>> {
1785 if !self.validate_maximum_packet_size_send(packet.size()) {
1786 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1787 }
1788 if self.status != ConnectionStatus::Connected && !self.need_store {
1789 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1790 }
1791
1792 let mut events = Vec::new();
1793 let packet_id = packet.packet_id();
1794 if !self.pid_man.is_used_id(packet_id) {
1795 error!("packet_id {packet_id} must be acquired or registered");
1796 events.push(GenericEvent::NotifyError(
1797 MqttError::PacketIdentifierInvalid,
1798 ));
1799 return events;
1800 }
1801 if self.need_store {
1802 self.store.add(packet.clone().try_into().unwrap()).unwrap();
1803 }
1804
1805 if self.status == ConnectionStatus::Connected {
1806 self.pid_pubcomp.insert(packet_id);
1807 events.push(GenericEvent::RequestSendPacket {
1808 packet: packet.into(),
1809 release_packet_id_if_send_error: None,
1810 });
1811 }
1812 self.send_post_process(&mut events);
1813
1814 events
1815 }
1816
1817 pub(crate) fn process_send_v3_1_1_pubcomp(
1818 &mut self,
1819 packet: v3_1_1::GenericPubcomp<PacketIdType>,
1820 ) -> Vec<GenericEvent<PacketIdType>> {
1821 if self.status != ConnectionStatus::Connected {
1822 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1823 }
1824 let mut events = Vec::new();
1825
1826 events.push(GenericEvent::RequestSendPacket {
1827 packet: packet.into(),
1828 release_packet_id_if_send_error: None,
1829 });
1830 self.send_post_process(&mut events);
1831
1832 events
1833 }
1834
1835 pub(crate) fn process_send_v5_0_pubcomp(
1836 &mut self,
1837 packet: v5_0::GenericPubcomp<PacketIdType>,
1838 ) -> Vec<GenericEvent<PacketIdType>> {
1839 if !self.validate_maximum_packet_size_send(packet.size()) {
1840 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1841 }
1842 if self.status != ConnectionStatus::Connected {
1843 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1844 }
1845
1846 let mut events = Vec::new();
1847 self.publish_recv.remove(&packet.packet_id());
1848
1849 events.push(GenericEvent::RequestSendPacket {
1850 packet: packet.into(),
1851 release_packet_id_if_send_error: None,
1852 });
1853 self.send_post_process(&mut events);
1854
1855 events
1856 }
1857
1858 pub(crate) fn process_send_v3_1_1_subscribe(
1859 &mut self,
1860 packet: v3_1_1::GenericSubscribe<PacketIdType>,
1861 ) -> Vec<GenericEvent<PacketIdType>> {
1862 let mut events = Vec::new();
1863 let packet_id = packet.packet_id();
1864 if self.status != ConnectionStatus::Connected {
1865 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1866 if self.pid_man.is_used_id(packet_id) {
1867 self.pid_man.release_id(packet_id);
1868 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1869 }
1870 return events;
1871 }
1872 if !self.pid_man.is_used_id(packet_id) {
1873 error!("packet_id {packet_id} must be acquired or registered");
1874 events.push(GenericEvent::NotifyError(
1875 MqttError::PacketIdentifierInvalid,
1876 ));
1877 return events;
1878 }
1879 self.pid_suback.insert(packet_id);
1880
1881 events.push(GenericEvent::RequestSendPacket {
1882 packet: packet.into(),
1883 release_packet_id_if_send_error: Some(packet_id),
1884 });
1885 self.send_post_process(&mut events);
1886
1887 events
1888 }
1889
1890 pub(crate) fn process_send_v5_0_subscribe(
1891 &mut self,
1892 packet: v5_0::GenericSubscribe<PacketIdType>,
1893 ) -> Vec<GenericEvent<PacketIdType>> {
1894 if !self.validate_maximum_packet_size_send(packet.size()) {
1895 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1896 }
1897
1898 let mut events = Vec::new();
1899 let packet_id = packet.packet_id();
1900 if self.status != ConnectionStatus::Connected {
1901 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1902 if self.pid_man.is_used_id(packet_id) {
1903 self.pid_man.release_id(packet_id);
1904 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1905 }
1906 return events;
1907 }
1908 if !self.pid_man.is_used_id(packet_id) {
1909 error!("packet_id {packet_id} must be acquired or registered");
1910 events.push(GenericEvent::NotifyError(
1911 MqttError::PacketIdentifierInvalid,
1912 ));
1913 return events;
1914 }
1915 self.pid_suback.insert(packet_id);
1916
1917 events.push(GenericEvent::RequestSendPacket {
1918 packet: packet.into(),
1919 release_packet_id_if_send_error: Some(packet_id),
1920 });
1921 self.send_post_process(&mut events);
1922
1923 events
1924 }
1925
1926 pub(crate) fn process_send_v3_1_1_suback(
1927 &mut self,
1928 packet: v3_1_1::GenericSuback<PacketIdType>,
1929 ) -> Vec<GenericEvent<PacketIdType>> {
1930 if self.status != ConnectionStatus::Connected {
1931 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1932 }
1933 let mut events = Vec::new();
1934 events.push(GenericEvent::RequestSendPacket {
1935 packet: packet.into(),
1936 release_packet_id_if_send_error: None,
1937 });
1938 self.send_post_process(&mut events);
1939
1940 events
1941 }
1942
1943 pub(crate) fn process_send_v5_0_suback(
1944 &mut self,
1945 packet: v5_0::GenericSuback<PacketIdType>,
1946 ) -> Vec<GenericEvent<PacketIdType>> {
1947 if !self.validate_maximum_packet_size_send(packet.size()) {
1948 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1949 }
1950 if self.status != ConnectionStatus::Connected {
1951 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1952 }
1953
1954 let mut events = Vec::new();
1955 events.push(GenericEvent::RequestSendPacket {
1956 packet: packet.into(),
1957 release_packet_id_if_send_error: None,
1958 });
1959 self.send_post_process(&mut events);
1960
1961 events
1962 }
1963
1964 pub(crate) fn process_send_v3_1_1_unsubscribe(
1965 &mut self,
1966 packet: v3_1_1::GenericUnsubscribe<PacketIdType>,
1967 ) -> Vec<GenericEvent<PacketIdType>> {
1968 let mut events = Vec::new();
1969 let packet_id = packet.packet_id();
1970 if self.status != ConnectionStatus::Connected {
1971 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1972 if self.pid_man.is_used_id(packet_id) {
1973 self.pid_man.release_id(packet_id);
1974 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1975 }
1976 return events;
1977 }
1978 if !self.pid_man.is_used_id(packet_id) {
1979 error!("packet_id {packet_id} must be acquired or registered");
1980 events.push(GenericEvent::NotifyError(
1981 MqttError::PacketIdentifierInvalid,
1982 ));
1983 return events;
1984 }
1985 self.pid_unsuback.insert(packet_id);
1986
1987 events.push(GenericEvent::RequestSendPacket {
1988 packet: packet.into(),
1989 release_packet_id_if_send_error: Some(packet_id),
1990 });
1991 self.send_post_process(&mut events);
1992
1993 events
1994 }
1995
1996 pub(crate) fn process_send_v5_0_unsubscribe(
1997 &mut self,
1998 packet: v5_0::GenericUnsubscribe<PacketIdType>,
1999 ) -> Vec<GenericEvent<PacketIdType>> {
2000 if !self.validate_maximum_packet_size_send(packet.size()) {
2001 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2002 }
2003
2004 let mut events = Vec::new();
2005 let packet_id = packet.packet_id();
2006 if self.status != ConnectionStatus::Connected {
2007 events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
2008 if self.pid_man.is_used_id(packet_id) {
2009 self.pid_man.release_id(packet_id);
2010 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2011 }
2012 return events;
2013 }
2014 if !self.pid_man.is_used_id(packet_id) {
2015 error!("packet_id {packet_id} must be acquired or registered");
2016 events.push(GenericEvent::NotifyError(
2017 MqttError::PacketIdentifierInvalid,
2018 ));
2019 return events;
2020 }
2021 self.pid_unsuback.insert(packet_id);
2022
2023 events.push(GenericEvent::RequestSendPacket {
2024 packet: packet.into(),
2025 release_packet_id_if_send_error: Some(packet_id),
2026 });
2027 self.send_post_process(&mut events);
2028
2029 events
2030 }
2031
2032 pub(crate) fn process_send_v3_1_1_unsuback(
2033 &mut self,
2034 packet: v3_1_1::GenericUnsuback<PacketIdType>,
2035 ) -> Vec<GenericEvent<PacketIdType>> {
2036 if self.status != ConnectionStatus::Connected {
2037 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2038 }
2039 let mut events = Vec::new();
2040 events.push(GenericEvent::RequestSendPacket {
2041 packet: packet.into(),
2042 release_packet_id_if_send_error: None,
2043 });
2044 self.send_post_process(&mut events);
2045
2046 events
2047 }
2048
2049 pub(crate) fn process_send_v5_0_unsuback(
2050 &mut self,
2051 packet: v5_0::GenericUnsuback<PacketIdType>,
2052 ) -> Vec<GenericEvent<PacketIdType>> {
2053 if !self.validate_maximum_packet_size_send(packet.size()) {
2054 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2055 }
2056 if self.status != ConnectionStatus::Connected {
2057 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2058 }
2059
2060 let mut events = Vec::new();
2061 events.push(GenericEvent::RequestSendPacket {
2062 packet: packet.into(),
2063 release_packet_id_if_send_error: None,
2064 });
2065 self.send_post_process(&mut events);
2066
2067 events
2068 }
2069
2070 pub(crate) fn process_send_v3_1_1_pingreq(
2071 &mut self,
2072 packet: v3_1_1::Pingreq,
2073 ) -> Vec<GenericEvent<PacketIdType>> {
2074 if self.status != ConnectionStatus::Connected {
2075 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2076 }
2077 let mut events = Vec::new();
2078 events.push(GenericEvent::RequestSendPacket {
2079 packet: packet.into(),
2080 release_packet_id_if_send_error: None,
2081 });
2082 if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2083 self.pingreq_send_set = true;
2084 events.push(GenericEvent::RequestTimerReset {
2085 kind: TimerKind::PingrespRecv,
2086 duration_ms: timeout_ms,
2087 });
2088 }
2089 self.send_post_process(&mut events);
2090
2091 events
2092 }
2093
2094 pub(crate) fn process_send_v5_0_pingreq(
2095 &mut self,
2096 packet: v5_0::Pingreq,
2097 ) -> Vec<GenericEvent<PacketIdType>> {
2098 if !self.validate_maximum_packet_size_send(packet.size()) {
2099 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2100 }
2101 if self.status != ConnectionStatus::Connected {
2102 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2103 }
2104
2105 let mut events = Vec::new();
2106 events.push(GenericEvent::RequestSendPacket {
2107 packet: packet.into(),
2108 release_packet_id_if_send_error: None,
2109 });
2110 if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2111 self.pingreq_send_set = true;
2112 events.push(GenericEvent::RequestTimerReset {
2113 kind: TimerKind::PingrespRecv,
2114 duration_ms: timeout_ms,
2115 });
2116 }
2117 self.send_post_process(&mut events);
2118
2119 events
2120 }
2121
2122 pub(crate) fn process_send_v3_1_1_pingresp(
2123 &mut self,
2124 packet: v3_1_1::Pingresp,
2125 ) -> Vec<GenericEvent<PacketIdType>> {
2126 if self.status != ConnectionStatus::Connected {
2127 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2128 }
2129 let mut events = Vec::new();
2130 events.push(GenericEvent::RequestSendPacket {
2131 packet: packet.into(),
2132 release_packet_id_if_send_error: None,
2133 });
2134 self.send_post_process(&mut events);
2135
2136 events
2137 }
2138
2139 pub(crate) fn process_send_v5_0_pingresp(
2140 &mut self,
2141 packet: v5_0::Pingresp,
2142 ) -> Vec<GenericEvent<PacketIdType>> {
2143 if !self.validate_maximum_packet_size_send(packet.size()) {
2144 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2145 }
2146 if self.status != ConnectionStatus::Connected {
2147 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2148 }
2149
2150 let mut events = Vec::new();
2151 events.push(GenericEvent::RequestSendPacket {
2152 packet: packet.into(),
2153 release_packet_id_if_send_error: None,
2154 });
2155 self.send_post_process(&mut events);
2156
2157 events
2158 }
2159
2160 pub(crate) fn process_send_v3_1_1_disconnect(
2161 &mut self,
2162 packet: v3_1_1::Disconnect,
2163 ) -> Vec<GenericEvent<PacketIdType>> {
2164 if self.status != ConnectionStatus::Connected {
2165 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2166 }
2167 let mut events = Vec::new();
2168 self.status = ConnectionStatus::Disconnected;
2169 self.cancel_timers(&mut events);
2170 events.push(GenericEvent::RequestSendPacket {
2171 packet: packet.into(),
2172 release_packet_id_if_send_error: None,
2173 });
2174 events.push(GenericEvent::RequestClose);
2175
2176 events
2177 }
2178
2179 pub(crate) fn process_send_v5_0_disconnect(
2180 &mut self,
2181 packet: v5_0::Disconnect,
2182 ) -> Vec<GenericEvent<PacketIdType>> {
2183 if !self.validate_maximum_packet_size_send(packet.size()) {
2184 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2185 }
2186 if self.status != ConnectionStatus::Connected {
2187 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2188 }
2189
2190 let mut events = Vec::new();
2191 self.status = ConnectionStatus::Disconnected;
2192 self.cancel_timers(&mut events);
2193 events.push(GenericEvent::RequestSendPacket {
2194 packet: packet.into(),
2195 release_packet_id_if_send_error: None,
2196 });
2197 events.push(GenericEvent::RequestClose);
2198
2199 events
2200 }
2201
2202 pub(crate) fn process_send_v5_0_auth(
2203 &mut self,
2204 packet: v5_0::Auth,
2205 ) -> Vec<GenericEvent<PacketIdType>> {
2206 if !self.validate_maximum_packet_size_send(packet.size()) {
2207 return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2208 }
2209 if self.status == ConnectionStatus::Disconnected {
2210 return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2211 }
2212
2213 let mut events = Vec::new();
2214 events.push(GenericEvent::RequestSendPacket {
2215 packet: packet.into(),
2216 release_packet_id_if_send_error: None,
2217 });
2218 self.send_post_process(&mut events);
2219
2220 events
2221 }
2222
2223 fn send_post_process(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
2224 if self.is_client {
2225 if let Some(timeout_ms) = self.pingreq_send_interval_ms {
2226 self.pingreq_send_set = true;
2227 events.push(GenericEvent::RequestTimerReset {
2228 kind: TimerKind::PingreqSend,
2229 duration_ms: timeout_ms,
2230 });
2231 }
2232 }
2233 }
2234
2235 fn validate_maximum_packet_size_send(&self, size: usize) -> bool {
2236 if size > self.maximum_packet_size_send as usize {
2237 error!("packet size over maximum_packet_size for sending");
2238 return false;
2239 }
2240 true
2241 }
2242
2243 fn process_recv_packet(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
2244 let mut events = Vec::new();
2245
2246 let total_size = remaining_length_to_total_size(raw_packet.remaining_length());
2248 if total_size > self.maximum_packet_size_recv {
2249 let disconnect_packet = v5_0::Disconnect::builder()
2255 .reason_code(DisconnectReasonCode::PacketTooLarge)
2256 .build()
2257 .unwrap();
2258 events.extend(self.process_send_v5_0_disconnect(disconnect_packet));
2260 events.push(GenericEvent::NotifyError(MqttError::PacketTooLarge));
2261 return events;
2262 }
2263
2264 let packet_type = raw_packet.packet_type();
2265 let _flags = raw_packet.flags();
2266 match self.protocol_version {
2267 Version::V3_1_1 => {
2268 match packet_type {
2269 1 => {
2270 events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2272 }
2273 2 => {
2274 events.extend(self.process_recv_v3_1_1_connack(raw_packet));
2276 }
2277 3 => {
2278 events.extend(self.process_recv_v3_1_1_publish(raw_packet));
2280 }
2281 4 => {
2282 events.extend(self.process_recv_v3_1_1_puback(raw_packet));
2284 }
2285 5 => {
2286 events.extend(self.process_recv_v3_1_1_pubrec(raw_packet));
2288 }
2289 6 => {
2290 events.extend(self.process_recv_v3_1_1_pubrel(raw_packet));
2292 }
2293 7 => {
2294 events.extend(self.process_recv_v3_1_1_pubcomp(raw_packet));
2296 }
2297 8 => {
2298 events.extend(self.process_recv_v3_1_1_subscribe(raw_packet));
2300 }
2301 9 => {
2302 events.extend(self.process_recv_v3_1_1_suback(raw_packet));
2304 }
2305 10 => {
2306 events.extend(self.process_recv_v3_1_1_unsubscribe(raw_packet));
2308 }
2309 11 => {
2310 events.extend(self.process_recv_v3_1_1_unsuback(raw_packet));
2312 }
2313 12 => {
2314 events.extend(self.process_recv_v3_1_1_pingreq(raw_packet));
2316 }
2317 13 => {
2318 events.extend(self.process_recv_v3_1_1_pingresp(raw_packet));
2320 }
2321 14 => {
2322 events.extend(self.process_recv_v3_1_1_disconnect(raw_packet));
2324 }
2325 _ => {
2327 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2328 }
2329 }
2330 }
2331 Version::V5_0 => {
2332 match packet_type {
2333 1 => {
2334 events.extend(self.process_recv_v5_0_connect(raw_packet));
2336 }
2337 2 => {
2338 events.extend(self.process_recv_v5_0_connack(raw_packet));
2340 }
2341 3 => {
2342 events.extend(self.process_recv_v5_0_publish(raw_packet));
2344 }
2345 4 => {
2346 events.extend(self.process_recv_v5_0_puback(raw_packet));
2348 }
2349 5 => {
2350 events.extend(self.process_recv_v5_0_pubrec(raw_packet));
2352 }
2353 6 => {
2354 events.extend(self.process_recv_v5_0_pubrel(raw_packet));
2356 }
2357 7 => {
2358 events.extend(self.process_recv_v5_0_pubcomp(raw_packet));
2360 }
2361 8 => {
2362 events.extend(self.process_recv_v5_0_subscribe(raw_packet));
2364 }
2365 9 => {
2366 events.extend(self.process_recv_v5_0_suback(raw_packet));
2368 }
2369 10 => {
2370 events.extend(self.process_recv_v5_0_unsubscribe(raw_packet));
2372 }
2373 11 => {
2374 events.extend(self.process_recv_v5_0_unsuback(raw_packet));
2376 }
2377 12 => {
2378 events.extend(self.process_recv_v5_0_pingreq(raw_packet));
2380 }
2381 13 => {
2382 events.extend(self.process_recv_v5_0_pingresp(raw_packet));
2384 }
2385 14 => {
2386 events.extend(self.process_recv_v5_0_disconnect(raw_packet));
2388 }
2389 15 => {
2390 events.extend(self.process_recv_v5_0_auth(raw_packet));
2392 }
2393 _ => {
2395 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2396 }
2397 }
2398 }
2399 Version::Undetermined => {
2400 match packet_type {
2401 1 => {
2402 if raw_packet.remaining_length() < 7 {
2404 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2405 return events;
2406 }
2407 match raw_packet.data_as_slice()[6] {
2408 4 => {
2410 self.protocol_version = Version::V3_1_1;
2411 events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2412 }
2413 5 => {
2414 self.protocol_version = Version::V5_0;
2415 events.extend(self.process_recv_v5_0_connect(raw_packet));
2416 }
2417 _ => {
2418 events.push(GenericEvent::NotifyError(
2419 MqttError::UnsupportedProtocolVersion,
2420 ));
2421 }
2422 }
2423 }
2424 _ => {
2425 events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2426 }
2427 }
2428 }
2429 }
2430
2431 events
2432 }
2433
2434 fn process_recv_v3_1_1_connect(
2435 &mut self,
2436 raw_packet: RawPacket,
2437 ) -> Vec<GenericEvent<PacketIdType>> {
2438 let mut events = Vec::new();
2439 match v3_1_1::Connect::parse(raw_packet.data_as_slice()) {
2440 Ok((packet, _)) => {
2441 if self.status != ConnectionStatus::Disconnected {
2442 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2443 return events;
2444 }
2445 self.initialize(false);
2446 self.status = ConnectionStatus::Connecting;
2447 if packet.keep_alive() > 0 {
2448 self.pingreq_recv_timeout_ms =
2449 Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2450 }
2451 if packet.clean_session() {
2452 self.clear_store_related();
2453 } else {
2454 self.need_store = true;
2455 }
2456 events.extend(self.refresh_pingreq_recv());
2457 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2458 }
2459 Err(e) => {
2460 if self.status == ConnectionStatus::Disconnected {
2461 self.status = ConnectionStatus::Connecting;
2462 let rc = match e {
2463 MqttError::ClientIdentifierNotValid => {
2464 ConnectReturnCode::IdentifierRejected
2465 }
2466 MqttError::BadUserNameOrPassword => {
2467 ConnectReturnCode::BadUserNameOrPassword
2468 }
2469 MqttError::UnsupportedProtocolVersion => {
2470 ConnectReturnCode::UnacceptableProtocolVersion
2471 }
2472 _ => ConnectReturnCode::NotAuthorized, };
2474 let connack = v3_1_1::Connack::builder().return_code(rc).build().unwrap();
2475 let connack_events = self.process_send_v3_1_1_connack(connack);
2476 events.extend(connack_events);
2477 } else {
2478 events.push(GenericEvent::RequestClose);
2479 }
2480 events.push(GenericEvent::NotifyError(e));
2481 }
2482 }
2483
2484 events
2485 }
2486
2487 fn process_recv_v5_0_connect(
2488 &mut self,
2489 raw_packet: RawPacket,
2490 ) -> Vec<GenericEvent<PacketIdType>> {
2491 let mut events = Vec::new();
2492 match v5_0::Connect::parse(raw_packet.data_as_slice()) {
2493 Ok((packet, _)) => {
2494 if self.status != ConnectionStatus::Disconnected {
2495 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2496 return events;
2497 }
2498 self.initialize(false);
2499 self.status = ConnectionStatus::Connecting;
2500 if packet.keep_alive() > 0 {
2501 self.pingreq_recv_timeout_ms =
2502 Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2503 }
2504 if packet.clean_start() {
2505 self.clear_store_related();
2506 }
2507 packet.props().iter().for_each(|prop| match prop {
2508 Property::TopicAliasMaximum(p) => {
2509 self.topic_alias_send = Some(TopicAliasSend::new(p.val()));
2510 }
2511 Property::ReceiveMaximum(p) => {
2512 self.publish_send_max = Some(p.val());
2513 }
2514 Property::MaximumPacketSize(p) => {
2515 self.maximum_packet_size_send = p.val();
2516 }
2517 Property::SessionExpiryInterval(p) => {
2518 if p.val() != 0 {
2519 self.need_store = true;
2520 }
2521 }
2522 _ => {}
2523 });
2524 events.extend(self.refresh_pingreq_recv());
2525 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2526 }
2527 Err(e) => {
2528 if self.status == ConnectionStatus::Disconnected {
2529 self.status = ConnectionStatus::Connecting;
2530 let rc = match e {
2531 MqttError::ClientIdentifierNotValid => {
2532 ConnectReasonCode::ClientIdentifierNotValid
2533 }
2534 MqttError::BadUserNameOrPassword => {
2535 ConnectReasonCode::BadAuthenticationMethod
2536 }
2537 MqttError::UnsupportedProtocolVersion => {
2538 ConnectReasonCode::UnsupportedProtocolVersion
2539 }
2540 _ => ConnectReasonCode::UnspecifiedError,
2541 };
2542 let connack = v5_0::Connack::builder().reason_code(rc).build().unwrap();
2543 let connack_events = self.process_send_v5_0_connack(connack);
2544 events.extend(connack_events);
2545 } else {
2546 let disconnect = v5_0::Disconnect::builder()
2547 .reason_code(DisconnectReasonCode::ProtocolError)
2548 .build()
2549 .unwrap();
2550 let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2551 events.extend(disconnect_events);
2552 }
2553 events.push(GenericEvent::NotifyError(e));
2554 }
2555 }
2556
2557 events
2558 }
2559
2560 fn process_recv_v3_1_1_connack(
2561 &mut self,
2562 raw_packet: RawPacket,
2563 ) -> Vec<GenericEvent<PacketIdType>> {
2564 let mut events = Vec::new();
2565
2566 match v3_1_1::Connack::parse(raw_packet.data_as_slice()) {
2567 Ok((packet, _consumed)) => {
2568 if packet.return_code() == ConnectReturnCode::Accepted {
2569 self.status = ConnectionStatus::Connected;
2570 if packet.session_present() {
2571 events.extend(self.send_stored());
2572 } else {
2573 self.clear_store_related();
2574 }
2575 }
2576 events.push(GenericEvent::NotifyPacketReceived(
2577 GenericPacket::V3_1_1Connack(packet),
2578 ));
2579 }
2580 Err(e) => {
2581 events.push(GenericEvent::RequestClose);
2582 events.push(GenericEvent::NotifyError(e));
2583 }
2584 }
2585
2586 events
2587 }
2588
2589 fn process_recv_v5_0_connack(
2590 &mut self,
2591 raw_packet: RawPacket,
2592 ) -> Vec<GenericEvent<PacketIdType>> {
2593 let mut events = Vec::new();
2594
2595 match v5_0::Connack::parse(raw_packet.data_as_slice()) {
2596 Ok((packet, _consumed)) => {
2597 if packet.reason_code() == ConnectReasonCode::Success {
2598 self.status = ConnectionStatus::Connected;
2599
2600 for prop in packet.props() {
2602 match prop {
2603 Property::TopicAliasMaximum(val) => {
2604 if val.val() > 0 {
2605 self.topic_alias_send = Some(TopicAliasSend::new(val.val()));
2606 }
2607 }
2608 Property::ReceiveMaximum(val) => {
2609 assert!(val.val() != 0);
2610 self.publish_send_max = Some(val.val());
2611 }
2612 Property::MaximumPacketSize(val) => {
2613 assert!(val.val() != 0);
2614 self.maximum_packet_size_send = val.val();
2615 }
2616 Property::ServerKeepAlive(val) => {
2617 let timeout_ms = (val.val() as u64) * 1000;
2619 self.pingreq_send_interval_ms = Some(timeout_ms);
2620 }
2621 _ => {
2622 }
2624 }
2625 }
2626
2627 if packet.session_present() {
2628 events.extend(self.send_stored());
2629 } else {
2630 self.clear_store_related();
2631 }
2632 }
2633 events.push(GenericEvent::NotifyPacketReceived(
2634 GenericPacket::V5_0Connack(packet),
2635 ));
2636 }
2637 Err(e) => {
2638 if self.status == ConnectionStatus::Connected {
2639 let disconnect = v5_0::Disconnect::builder()
2640 .reason_code(e.into())
2641 .build()
2642 .unwrap();
2643 let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2644 events.extend(disconnect_events);
2645 }
2646 events.push(GenericEvent::NotifyError(e));
2647 }
2648 }
2649
2650 events
2651 }
2652
2653 fn process_recv_v3_1_1_publish(
2654 &mut self,
2655 raw_packet: RawPacket,
2656 ) -> Vec<GenericEvent<PacketIdType>> {
2657 let mut events = Vec::new();
2658
2659 let flags = raw_packet.flags();
2660 match &raw_packet.data {
2661 PacketData::Publish(arc) => {
2662 match v3_1_1::GenericPublish::parse(flags, arc.clone()) {
2663 Ok((packet, _consumed)) => {
2664 match packet.qos() {
2665 Qos::AtMostOnce => {
2666 events.extend(self.refresh_pingreq_recv());
2667 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2668 }
2669 Qos::AtLeastOnce => {
2670 let packet_id = packet.packet_id().unwrap();
2671 if self.status == ConnectionStatus::Connected
2672 && self.auto_pub_response
2673 {
2674 let puback = v3_1_1::GenericPuback::builder()
2676 .packet_id(packet_id)
2677 .build()
2678 .unwrap();
2679 events.extend(self.process_send_v3_1_1_puback(puback));
2680 }
2681 events.extend(self.refresh_pingreq_recv());
2682 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2683 }
2684 Qos::ExactlyOnce => {
2685 let packet_id = packet.packet_id().unwrap();
2686 let already_handled = !self.qos2_publish_handled.insert(packet_id);
2687
2688 if self.status == ConnectionStatus::Connected
2689 && (self.auto_pub_response || already_handled)
2690 {
2691 let pubrec = v3_1_1::GenericPubrec::builder()
2692 .packet_id(packet_id)
2693 .build()
2694 .unwrap();
2695 events.extend(self.process_send_v3_1_1_pubrec(pubrec));
2696 }
2697 events.extend(self.refresh_pingreq_recv());
2698 if !already_handled {
2699 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2700 }
2701 }
2702 }
2703 }
2704 Err(e) => {
2705 events.push(GenericEvent::RequestClose);
2706 events.push(GenericEvent::NotifyError(e));
2707 }
2708 }
2709 }
2710 PacketData::Normal(_) => {
2711 unreachable!("PUBLISH packet must use PacketData::Publish variant");
2712 }
2713 }
2714
2715 events
2716 }
2717
2718 fn process_recv_v5_0_publish(
2719 &mut self,
2720 raw_packet: RawPacket,
2721 ) -> Vec<GenericEvent<PacketIdType>> {
2722 let mut events = Vec::new();
2723
2724 let flags = raw_packet.flags();
2725 match &raw_packet.data {
2726 PacketData::Publish(arc) => {
2727 match v5_0::GenericPublish::parse(flags, arc.clone()) {
2728 Ok((packet, _consumed)) => {
2729 let mut already_handled = false;
2730 let mut puback_send = false;
2731 let mut pubrec_send = false;
2732
2733 match packet.qos() {
2734 Qos::AtLeastOnce => {
2735 let packet_id = packet.packet_id().unwrap();
2736 if let Some(max) = self.publish_recv_max {
2737 if self.publish_recv.len() >= max as usize {
2738 let disconnect = v5_0::Disconnect::builder()
2739 .reason_code(
2740 DisconnectReasonCode::ReceiveMaximumExceeded,
2741 )
2742 .build()
2743 .unwrap();
2744 events
2745 .extend(self.process_send_v5_0_disconnect(disconnect));
2746 events.push(GenericEvent::NotifyError(
2747 MqttError::ReceiveMaximumExceeded,
2748 ));
2749 return events;
2750 }
2751 }
2752 self.publish_recv.insert(packet_id);
2753 if self.auto_pub_response
2754 && self.status == ConnectionStatus::Connected
2755 {
2756 puback_send = true;
2757 }
2758 }
2759 Qos::ExactlyOnce => {
2760 let packet_id = packet.packet_id().unwrap();
2761 if let Some(max) = self.publish_recv_max {
2762 if self.publish_recv.len() >= max as usize {
2763 let disconnect = v5_0::Disconnect::builder()
2764 .reason_code(
2765 DisconnectReasonCode::ReceiveMaximumExceeded,
2766 )
2767 .build()
2768 .unwrap();
2769 events
2770 .extend(self.process_send_v5_0_disconnect(disconnect));
2771 events.push(GenericEvent::NotifyError(
2772 MqttError::ReceiveMaximumExceeded,
2773 ));
2774 return events;
2775 }
2776 }
2777 self.publish_recv.insert(packet_id);
2778
2779 if !self.qos2_publish_handled.insert(packet_id) {
2780 already_handled = true;
2781 }
2782 if self.status == ConnectionStatus::Connected
2783 && (self.auto_pub_response || already_handled)
2784 {
2785 pubrec_send = true;
2786 }
2787 }
2788 Qos::AtMostOnce => {
2789 }
2791 }
2792
2793 if packet.topic_name().is_empty() {
2795 if let Some(ta) = Self::get_topic_alias_from_props(packet.props()) {
2797 if ta == 0
2798 || self.topic_alias_recv.is_none()
2799 || ta > self.topic_alias_recv.as_ref().unwrap().max()
2800 {
2801 let disconnect = v5_0::Disconnect::builder()
2802 .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2803 .build()
2804 .unwrap();
2805 events.extend(self.process_send_v5_0_disconnect(disconnect));
2806 events.push(GenericEvent::NotifyError(
2807 MqttError::TopicAliasInvalid,
2808 ));
2809 return events;
2810 }
2811
2812 if let Some(ref topic_alias_recv) = self.topic_alias_recv {
2813 if topic_alias_recv.get(ta).is_none() {
2814 let disconnect = v5_0::Disconnect::builder()
2815 .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2816 .build()
2817 .unwrap();
2818 events
2819 .extend(self.process_send_v5_0_disconnect(disconnect));
2820 events.push(GenericEvent::NotifyError(
2821 MqttError::TopicAliasInvalid,
2822 ));
2823 return events;
2824 }
2825 }
2828 } else {
2829 let disconnect = v5_0::Disconnect::builder()
2830 .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2831 .build()
2832 .unwrap();
2833 events.extend(self.process_send_v5_0_disconnect(disconnect));
2834 events
2835 .push(GenericEvent::NotifyError(MqttError::TopicAliasInvalid));
2836 return events;
2837 }
2838 } else {
2839 if let Some(ta) = Self::get_topic_alias_from_props(packet.props()) {
2841 if ta == 0
2842 || self.topic_alias_recv.is_none()
2843 || ta > self.topic_alias_recv.as_ref().unwrap().max()
2844 {
2845 let disconnect = v5_0::Disconnect::builder()
2846 .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2847 .build()
2848 .unwrap();
2849 events.extend(self.process_send_v5_0_disconnect(disconnect));
2850 events.push(GenericEvent::NotifyError(
2851 MqttError::TopicAliasInvalid,
2852 ));
2853 return events;
2854 }
2855 if let Some(ref mut topic_alias_recv) = self.topic_alias_recv {
2856 topic_alias_recv.insert_or_update(packet.topic_name(), ta);
2857 }
2858 }
2859 }
2860
2861 if puback_send {
2863 let puback = v5_0::GenericPuback::builder()
2864 .packet_id(packet.packet_id().unwrap())
2865 .build()
2866 .unwrap();
2867 events.extend(self.process_send_v5_0_puback(puback));
2868 }
2869 if pubrec_send {
2870 let pubrec = v5_0::GenericPubrec::builder()
2871 .packet_id(packet.packet_id().unwrap())
2872 .build()
2873 .unwrap();
2874 events.extend(self.process_send_v5_0_pubrec(pubrec));
2875 }
2876
2877 events.extend(self.refresh_pingreq_recv());
2879
2880 if !already_handled {
2882 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2883 }
2884 }
2885 Err(e) => {
2886 if self.status == ConnectionStatus::Connected {
2887 let disconnect = v5_0::Disconnect::builder()
2888 .reason_code(e.into())
2889 .build()
2890 .unwrap();
2891 let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2892 events.extend(disconnect_events);
2893 }
2894 events.push(GenericEvent::NotifyError(e));
2895 }
2896 }
2897 }
2898 PacketData::Normal(_) => {
2899 unreachable!("PUBLISH packet must use PacketData::Publish variant");
2900 }
2901 }
2902
2903 events
2904 }
2905
2906 fn process_recv_v3_1_1_puback(
2907 &mut self,
2908 raw_packet: RawPacket,
2909 ) -> Vec<GenericEvent<PacketIdType>> {
2910 let mut events = Vec::new();
2911
2912 match v3_1_1::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2913 Ok((packet, _)) => {
2914 let packet_id = packet.packet_id();
2915 if self.pid_puback.remove(&packet_id) {
2916 self.store.erase(ResponsePacket::V3_1_1Puback, packet_id);
2917 if self.pid_man.is_used_id(packet_id) {
2918 self.pid_man.release_id(packet_id);
2919 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2920 }
2921 events.extend(self.refresh_pingreq_recv());
2922 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2923 } else {
2924 events.push(GenericEvent::RequestClose);
2925 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2926 }
2927 }
2928 Err(e) => {
2929 events.push(GenericEvent::RequestClose);
2930 events.push(GenericEvent::NotifyError(e));
2931 }
2932 }
2933
2934 events
2935 }
2936
2937 fn process_recv_v5_0_puback(
2938 &mut self,
2939 raw_packet: RawPacket,
2940 ) -> Vec<GenericEvent<PacketIdType>> {
2941 let mut events = Vec::new();
2942
2943 match v5_0::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2944 Ok((packet, _)) => {
2945 let packet_id = packet.packet_id();
2946 if self.pid_puback.remove(&packet_id) {
2947 self.store.erase(ResponsePacket::V5_0Puback, packet_id);
2948 if self.pid_man.is_used_id(packet_id) {
2949 self.pid_man.release_id(packet_id);
2950 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2951 }
2952 if self.publish_send_max.is_some() {
2953 self.publish_send_count -= 1;
2954 }
2955 events.extend(self.refresh_pingreq_recv());
2956 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2957 } else {
2958 let disconnect = v5_0::Disconnect::builder()
2959 .reason_code(DisconnectReasonCode::ProtocolError)
2960 .build()
2961 .unwrap();
2962 events.extend(self.process_send_v5_0_disconnect(disconnect));
2963 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2964 }
2965 }
2966 Err(e) => {
2967 let disconnect = v5_0::Disconnect::builder()
2968 .reason_code(DisconnectReasonCode::ProtocolError)
2969 .build()
2970 .unwrap();
2971 events.extend(self.process_send_v5_0_disconnect(disconnect));
2972 events.push(GenericEvent::NotifyError(e));
2973 }
2974 }
2975
2976 events
2977 }
2978
2979 fn process_recv_v3_1_1_pubrec(
2980 &mut self,
2981 raw_packet: RawPacket,
2982 ) -> Vec<GenericEvent<PacketIdType>> {
2983 let mut events = Vec::new();
2984
2985 match v3_1_1::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2986 Ok((packet, _)) => {
2987 let packet_id = packet.packet_id();
2988 if self.pid_pubrec.remove(&packet_id) {
2989 self.store.erase(ResponsePacket::V3_1_1Pubrec, packet_id);
2990 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
2991 let pubrel = v3_1_1::GenericPubrel::<PacketIdType>::builder()
2992 .packet_id(packet_id)
2993 .build()
2994 .unwrap();
2995 events.extend(self.process_send_v3_1_1_pubrel(pubrel));
2996 }
2997 events.extend(self.refresh_pingreq_recv());
2998 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2999 } else {
3000 events.push(GenericEvent::RequestClose);
3001 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3002 }
3003 }
3004 Err(e) => {
3005 events.push(GenericEvent::RequestClose);
3006 events.push(GenericEvent::NotifyError(e));
3007 }
3008 }
3009
3010 events
3011 }
3012
3013 fn process_recv_v5_0_pubrec(
3014 &mut self,
3015 raw_packet: RawPacket,
3016 ) -> Vec<GenericEvent<PacketIdType>> {
3017 let mut events = Vec::new();
3018
3019 match v5_0::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3020 Ok((packet, _)) => {
3021 let packet_id = packet.packet_id();
3022 if self.pid_pubrec.remove(&packet_id) {
3023 self.store.erase(ResponsePacket::V5_0Pubrec, packet_id);
3024 if let Some(reason_code) = packet.reason_code() {
3025 if reason_code != PubrecReasonCode::Success {
3026 if self.pid_man.is_used_id(packet_id) {
3027 self.pid_man.release_id(packet_id);
3028 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3029 }
3030 self.qos2_publish_processing.remove(&packet_id);
3031 if self.publish_send_max.is_some() {
3032 self.publish_send_count -= 1;
3033 }
3034 } else if self.auto_pub_response
3035 && self.status == ConnectionStatus::Connected
3036 {
3037 let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3038 .packet_id(packet_id)
3039 .build()
3040 .unwrap();
3041 events.extend(self.process_send_v5_0_pubrel(pubrel));
3042 }
3043 } else if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3044 let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3045 .packet_id(packet_id)
3046 .build()
3047 .unwrap();
3048 events.extend(self.process_send_v5_0_pubrel(pubrel));
3049 }
3050 events.extend(self.refresh_pingreq_recv());
3051 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3052 } else {
3053 let disconnect = v5_0::Disconnect::builder()
3054 .reason_code(DisconnectReasonCode::ProtocolError)
3055 .build()
3056 .unwrap();
3057 events.extend(self.process_send_v5_0_disconnect(disconnect));
3058 events.push(GenericEvent::NotifyError(MqttError::from(
3059 DisconnectReasonCode::ProtocolError,
3060 )));
3061 }
3062 }
3063 Err(e) => {
3064 let disconnect = v5_0::Disconnect::builder()
3065 .reason_code(DisconnectReasonCode::ProtocolError)
3066 .build()
3067 .unwrap();
3068 events.extend(self.process_send_v5_0_disconnect(disconnect));
3069 events.push(GenericEvent::NotifyError(e));
3070 }
3071 }
3072
3073 events
3074 }
3075
3076 fn process_recv_v3_1_1_pubrel(
3077 &mut self,
3078 raw_packet: RawPacket,
3079 ) -> Vec<GenericEvent<PacketIdType>> {
3080 let mut events = Vec::new();
3081
3082 match v3_1_1::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3083 Ok((packet, _)) => {
3084 let packet_id = packet.packet_id();
3085 self.qos2_publish_handled.remove(&packet_id);
3086 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3087 let pubcomp = v3_1_1::GenericPubcomp::<PacketIdType>::builder()
3088 .packet_id(packet_id)
3089 .build()
3090 .unwrap();
3091 events.extend(self.process_send_v3_1_1_pubcomp(pubcomp));
3092 }
3093 events.extend(self.refresh_pingreq_recv());
3094 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3095 }
3096 Err(e) => {
3097 events.push(GenericEvent::RequestClose);
3098 events.push(GenericEvent::NotifyError(e));
3099 }
3100 }
3101
3102 events
3103 }
3104
3105 fn process_recv_v5_0_pubrel(
3106 &mut self,
3107 raw_packet: RawPacket,
3108 ) -> Vec<GenericEvent<PacketIdType>> {
3109 let mut events = Vec::new();
3110
3111 match v5_0::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3112 Ok((packet, _)) => {
3113 let packet_id = packet.packet_id();
3114 self.qos2_publish_handled.remove(&packet_id);
3115 if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3116 let pubcomp = v5_0::GenericPubcomp::<PacketIdType>::builder()
3117 .packet_id(packet_id)
3118 .build()
3119 .unwrap();
3120 events.extend(self.process_send_v5_0_pubcomp(pubcomp));
3121 }
3122 events.extend(self.refresh_pingreq_recv());
3123 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3124 }
3125 Err(e) => {
3126 let disconnect = v5_0::Disconnect::builder()
3127 .reason_code(DisconnectReasonCode::ProtocolError)
3128 .build()
3129 .unwrap();
3130 events.extend(self.process_send_v5_0_disconnect(disconnect));
3131 events.push(GenericEvent::NotifyError(e));
3132 }
3133 }
3134
3135 events
3136 }
3137
3138 fn process_recv_v3_1_1_pubcomp(
3139 &mut self,
3140 raw_packet: RawPacket,
3141 ) -> Vec<GenericEvent<PacketIdType>> {
3142 let mut events = Vec::new();
3143
3144 match v3_1_1::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3145 Ok((packet, _)) => {
3146 let packet_id = packet.packet_id();
3147 if self.pid_pubcomp.remove(&packet_id) {
3148 self.store.erase(ResponsePacket::V3_1_1Pubcomp, packet_id);
3149 if self.pid_man.is_used_id(packet_id) {
3150 self.pid_man.release_id(packet_id);
3151 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3152 }
3153 self.qos2_publish_processing.remove(&packet_id);
3154 events.extend(self.refresh_pingreq_recv());
3155 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3156 } else {
3157 events.push(GenericEvent::RequestClose);
3158 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3159 }
3160 }
3161 Err(e) => {
3162 events.push(GenericEvent::RequestClose);
3163 events.push(GenericEvent::NotifyError(e));
3164 }
3165 }
3166
3167 events
3168 }
3169
3170 fn process_recv_v5_0_pubcomp(
3171 &mut self,
3172 raw_packet: RawPacket,
3173 ) -> Vec<GenericEvent<PacketIdType>> {
3174 let mut events = Vec::new();
3175
3176 match v5_0::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3177 Ok((packet, _)) => {
3178 let packet_id = packet.packet_id();
3179 if self.pid_pubcomp.remove(&packet_id) {
3180 self.store.erase(ResponsePacket::V5_0Pubcomp, packet_id);
3181 if self.pid_man.is_used_id(packet_id) {
3182 self.pid_man.release_id(packet_id);
3183 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3184 }
3185 self.qos2_publish_processing.remove(&packet_id);
3186 if self.publish_send_max.is_some() {
3187 self.publish_send_count -= 1;
3188 }
3189 events.extend(self.refresh_pingreq_recv());
3190 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3191 } else {
3192 let disconnect = v5_0::Disconnect::builder()
3193 .reason_code(DisconnectReasonCode::ProtocolError)
3194 .build()
3195 .unwrap();
3196 events.extend(self.process_send_v5_0_disconnect(disconnect));
3197 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3198 }
3199 }
3200 Err(e) => {
3201 let disconnect = v5_0::Disconnect::builder()
3202 .reason_code(DisconnectReasonCode::ProtocolError)
3203 .build()
3204 .unwrap();
3205 events.extend(self.process_send_v5_0_disconnect(disconnect));
3206 events.push(GenericEvent::NotifyError(e));
3207 }
3208 }
3209
3210 events
3211 }
3212
3213 fn process_recv_v3_1_1_subscribe(
3214 &mut self,
3215 raw_packet: RawPacket,
3216 ) -> Vec<GenericEvent<PacketIdType>> {
3217 let mut events = Vec::new();
3218
3219 match v3_1_1::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3220 Ok((packet, _)) => {
3221 events.extend(self.refresh_pingreq_recv());
3222 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3223 }
3224 Err(e) => {
3225 events.push(GenericEvent::RequestClose);
3226 events.push(GenericEvent::NotifyError(e));
3227 }
3228 }
3229
3230 events
3231 }
3232
3233 fn process_recv_v5_0_subscribe(
3234 &mut self,
3235 raw_packet: RawPacket,
3236 ) -> Vec<GenericEvent<PacketIdType>> {
3237 let mut events = Vec::new();
3238
3239 match v5_0::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3240 Ok((packet, _)) => {
3241 events.extend(self.refresh_pingreq_recv());
3242 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3243 }
3244 Err(e) => {
3245 let disconnect = v5_0::Disconnect::builder()
3246 .reason_code(DisconnectReasonCode::ProtocolError)
3247 .build()
3248 .unwrap();
3249 events.extend(self.process_send_v5_0_disconnect(disconnect));
3250 events.push(GenericEvent::NotifyError(e));
3251 }
3252 }
3253
3254 events
3255 }
3256
3257 fn process_recv_v3_1_1_suback(
3258 &mut self,
3259 raw_packet: RawPacket,
3260 ) -> Vec<GenericEvent<PacketIdType>> {
3261 let mut events = Vec::new();
3262
3263 match v3_1_1::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3264 Ok((packet, _)) => {
3265 let packet_id = packet.packet_id();
3266 if self.pid_suback.remove(&packet_id) {
3267 if self.pid_man.is_used_id(packet_id) {
3268 self.pid_man.release_id(packet_id);
3269 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3270 }
3271 events.extend(self.refresh_pingreq_recv());
3272 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3273 } else {
3274 events.push(GenericEvent::RequestClose);
3275 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3276 }
3277 }
3278 Err(e) => {
3279 events.push(GenericEvent::RequestClose);
3280 events.push(GenericEvent::NotifyError(e));
3281 }
3282 }
3283
3284 events
3285 }
3286
3287 fn process_recv_v5_0_suback(
3288 &mut self,
3289 raw_packet: RawPacket,
3290 ) -> Vec<GenericEvent<PacketIdType>> {
3291 let mut events = Vec::new();
3292
3293 match v5_0::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3294 Ok((packet, _)) => {
3295 let packet_id = packet.packet_id();
3296 if self.pid_suback.remove(&packet_id) {
3297 if self.pid_man.is_used_id(packet_id) {
3298 self.pid_man.release_id(packet_id);
3299 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3300 }
3301 events.extend(self.refresh_pingreq_recv());
3302 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3303 } else {
3304 let disconnect = v5_0::Disconnect::builder()
3305 .reason_code(DisconnectReasonCode::ProtocolError)
3306 .build()
3307 .unwrap();
3308 events.extend(self.process_send_v5_0_disconnect(disconnect));
3309 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3310 }
3311 }
3312 Err(e) => {
3313 let disconnect = v5_0::Disconnect::builder()
3314 .reason_code(DisconnectReasonCode::ProtocolError)
3315 .build()
3316 .unwrap();
3317 events.extend(self.process_send_v5_0_disconnect(disconnect));
3318 events.push(GenericEvent::NotifyError(e));
3319 }
3320 }
3321
3322 events
3323 }
3324
3325 fn process_recv_v3_1_1_unsubscribe(
3326 &mut self,
3327 raw_packet: RawPacket,
3328 ) -> Vec<GenericEvent<PacketIdType>> {
3329 let mut events = Vec::new();
3330
3331 match v3_1_1::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3332 Ok((packet, _)) => {
3333 events.extend(self.refresh_pingreq_recv());
3334 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3335 }
3336 Err(e) => {
3337 events.push(GenericEvent::RequestClose);
3338 events.push(GenericEvent::NotifyError(e));
3339 }
3340 }
3341
3342 events
3343 }
3344
3345 fn process_recv_v5_0_unsubscribe(
3346 &mut self,
3347 raw_packet: RawPacket,
3348 ) -> Vec<GenericEvent<PacketIdType>> {
3349 let mut events = Vec::new();
3350
3351 match v5_0::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3352 Ok((packet, _)) => {
3353 events.extend(self.refresh_pingreq_recv());
3354 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3355 }
3356 Err(e) => {
3357 let disconnect = v5_0::Disconnect::builder()
3358 .reason_code(DisconnectReasonCode::ProtocolError)
3359 .build()
3360 .unwrap();
3361 events.extend(self.process_send_v5_0_disconnect(disconnect));
3362 events.push(GenericEvent::NotifyError(e));
3363 }
3364 }
3365
3366 events
3367 }
3368
3369 fn process_recv_v3_1_1_unsuback(
3370 &mut self,
3371 raw_packet: RawPacket,
3372 ) -> Vec<GenericEvent<PacketIdType>> {
3373 let mut events = Vec::new();
3374
3375 match v3_1_1::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3376 Ok((packet, _)) => {
3377 let packet_id = packet.packet_id();
3378 if self.pid_unsuback.remove(&packet_id) {
3379 if self.pid_man.is_used_id(packet_id) {
3380 self.pid_man.release_id(packet_id);
3381 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3382 }
3383 events.extend(self.refresh_pingreq_recv());
3384 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3385 } else {
3386 events.push(GenericEvent::RequestClose);
3387 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3388 }
3389 }
3390 Err(e) => {
3391 events.push(GenericEvent::RequestClose);
3392 events.push(GenericEvent::NotifyError(e));
3393 }
3394 }
3395
3396 events
3397 }
3398
3399 fn process_recv_v5_0_unsuback(
3400 &mut self,
3401 raw_packet: RawPacket,
3402 ) -> Vec<GenericEvent<PacketIdType>> {
3403 let mut events = Vec::new();
3404
3405 match v5_0::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3406 Ok((packet, _)) => {
3407 let packet_id = packet.packet_id();
3408 if self.pid_unsuback.remove(&packet_id) {
3409 if self.pid_man.is_used_id(packet_id) {
3410 self.pid_man.release_id(packet_id);
3411 events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3412 }
3413 events.extend(self.refresh_pingreq_recv());
3414 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3415 } else {
3416 let disconnect = v5_0::Disconnect::builder()
3417 .reason_code(DisconnectReasonCode::ProtocolError)
3418 .build()
3419 .unwrap();
3420 events.extend(self.process_send_v5_0_disconnect(disconnect));
3421 events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3422 }
3423 }
3424 Err(e) => {
3425 let disconnect = v5_0::Disconnect::builder()
3426 .reason_code(DisconnectReasonCode::ProtocolError)
3427 .build()
3428 .unwrap();
3429 events.extend(self.process_send_v5_0_disconnect(disconnect));
3430 events.push(GenericEvent::NotifyError(e));
3431 }
3432 }
3433
3434 events
3435 }
3436
3437 fn process_recv_v3_1_1_pingreq(
3438 &mut self,
3439 raw_packet: RawPacket,
3440 ) -> Vec<GenericEvent<PacketIdType>> {
3441 let mut events = Vec::new();
3442
3443 match v3_1_1::Pingreq::parse(raw_packet.data_as_slice()) {
3444 Ok((packet, _)) => {
3445 if (Role::IS_SERVER || Role::IS_ANY)
3446 && !self.is_client
3447 && self.auto_ping_response
3448 && self.status == ConnectionStatus::Connected
3449 {
3450 let pingresp = v3_1_1::Pingresp::new();
3451 events.extend(self.process_send_v3_1_1_pingresp(pingresp));
3452 }
3453 events.extend(self.refresh_pingreq_recv());
3454 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3455 }
3456 Err(e) => {
3457 events.push(GenericEvent::RequestClose);
3458 events.push(GenericEvent::NotifyError(e));
3459 }
3460 }
3461
3462 events
3463 }
3464
3465 fn process_recv_v5_0_pingreq(
3466 &mut self,
3467 raw_packet: RawPacket,
3468 ) -> Vec<GenericEvent<PacketIdType>> {
3469 let mut events = Vec::new();
3470
3471 match v5_0::Pingreq::parse(raw_packet.data_as_slice()) {
3472 Ok((packet, _)) => {
3473 if (Role::IS_SERVER || Role::IS_ANY)
3474 && !self.is_client
3475 && self.auto_ping_response
3476 && self.status == ConnectionStatus::Connected
3477 {
3478 let pingresp = v5_0::Pingresp::new();
3479 events.extend(self.process_send_v5_0_pingresp(pingresp));
3480 }
3481 events.extend(self.refresh_pingreq_recv());
3482 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3483 }
3484 Err(e) => {
3485 let disconnect = v5_0::Disconnect::builder()
3486 .reason_code(DisconnectReasonCode::ProtocolError)
3487 .build()
3488 .unwrap();
3489 events.extend(self.process_send_v5_0_disconnect(disconnect));
3490 events.push(GenericEvent::NotifyError(e));
3491 }
3492 }
3493
3494 events
3495 }
3496
3497 fn process_recv_v3_1_1_pingresp(
3498 &mut self,
3499 raw_packet: RawPacket,
3500 ) -> Vec<GenericEvent<PacketIdType>> {
3501 let mut events = Vec::new();
3502
3503 match v3_1_1::Pingresp::parse(raw_packet.data_as_slice()) {
3504 Ok((packet, _)) => {
3505 self.pingresp_recv_set = false;
3506 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3507 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3508 }
3509 Err(e) => {
3510 events.push(GenericEvent::RequestClose);
3511 events.push(GenericEvent::NotifyError(e));
3512 }
3513 }
3514
3515 events
3516 }
3517
3518 fn process_recv_v5_0_pingresp(
3519 &mut self,
3520 raw_packet: RawPacket,
3521 ) -> Vec<GenericEvent<PacketIdType>> {
3522 let mut events = Vec::new();
3523
3524 match v5_0::Pingresp::parse(raw_packet.data_as_slice()) {
3525 Ok((packet, _)) => {
3526 self.pingresp_recv_set = false;
3527 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3528 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3529 }
3530 Err(e) => {
3531 let disconnect = v5_0::Disconnect::builder()
3532 .reason_code(DisconnectReasonCode::ProtocolError)
3533 .build()
3534 .unwrap();
3535 events.extend(self.process_send_v5_0_disconnect(disconnect));
3536 events.push(GenericEvent::NotifyError(e));
3537 }
3538 }
3539
3540 events
3541 }
3542
3543 fn process_recv_v3_1_1_disconnect(
3544 &mut self,
3545 raw_packet: RawPacket,
3546 ) -> Vec<GenericEvent<PacketIdType>> {
3547 let mut events = Vec::new();
3548
3549 match v3_1_1::Disconnect::parse(raw_packet.data_as_slice()) {
3550 Ok((packet, _)) => {
3551 self.cancel_timers(&mut events);
3552 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3553 }
3554 Err(e) => {
3555 events.push(GenericEvent::RequestClose);
3556 events.push(GenericEvent::NotifyError(e));
3557 }
3558 }
3559
3560 events
3561 }
3562
3563 fn process_recv_v5_0_disconnect(
3564 &mut self,
3565 raw_packet: RawPacket,
3566 ) -> Vec<GenericEvent<PacketIdType>> {
3567 let mut events = Vec::new();
3568
3569 match v5_0::Disconnect::parse(raw_packet.data_as_slice()) {
3570 Ok((packet, _)) => {
3571 self.cancel_timers(&mut events);
3572 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3573 }
3574 Err(e) => {
3575 let disconnect = v5_0::Disconnect::builder()
3576 .reason_code(DisconnectReasonCode::ProtocolError)
3577 .build()
3578 .unwrap();
3579 events.extend(self.process_send_v5_0_disconnect(disconnect));
3580 events.push(GenericEvent::NotifyError(e));
3581 }
3582 }
3583
3584 events
3585 }
3586
3587 fn process_recv_v5_0_auth(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
3588 let mut events = Vec::new();
3589
3590 match v5_0::Auth::parse(raw_packet.data_as_slice()) {
3591 Ok((packet, _)) => {
3592 events.extend(self.refresh_pingreq_recv());
3593 events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3594 }
3595 Err(e) => {
3596 let disconnect = v5_0::Disconnect::builder()
3597 .reason_code(DisconnectReasonCode::ProtocolError)
3598 .build()
3599 .unwrap();
3600 events.extend(self.process_send_v5_0_disconnect(disconnect));
3601 events.push(GenericEvent::NotifyError(e));
3602 }
3603 }
3604
3605 events
3606 }
3607
3608 fn refresh_pingreq_recv(&mut self) -> Vec<GenericEvent<PacketIdType>> {
3609 let mut events = Vec::new();
3610 if let Some(timeout_ms) = self.pingreq_recv_timeout_ms {
3611 if self.status == ConnectionStatus::Connecting
3612 || self.status == ConnectionStatus::Connected
3613 {
3614 self.pingreq_recv_set = true;
3615 events.push(GenericEvent::RequestTimerReset {
3616 kind: TimerKind::PingreqRecv,
3617 duration_ms: timeout_ms,
3618 });
3619 } else {
3620 self.pingreq_recv_set = false;
3621 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3622 }
3623 }
3624
3625 events
3626 }
3627
3628 fn cancel_timers(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
3630 if self.pingreq_send_set {
3631 self.pingreq_send_set = false;
3632 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
3633 }
3634 if self.pingreq_recv_set {
3635 self.pingreq_recv_set = false;
3636 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3637 }
3638 if self.pingresp_recv_set {
3639 self.pingresp_recv_set = false;
3640 events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3641 }
3642 }
3643
3644 fn get_topic_alias_from_props(props: &[Property]) -> Option<u16> {
3646 for prop in props {
3647 if let Property::TopicAlias(ta) = prop {
3648 return Some(ta.val());
3649 }
3650 }
3651 None
3652 }
3653
3654 #[allow(dead_code)]
3655 fn is_packet_id_used(&self, packet_id: PacketIdType) -> bool {
3656 self.pid_man.is_used_id(packet_id)
3657 }
3658}
3659
3660pub trait RecvBehavior<Role, PacketIdType>
3663where
3664 PacketIdType: IsPacketId,
3665{
3666 fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>>;
3667}
3668
3669impl<PacketIdType> RecvBehavior<role::Client, PacketIdType>
3671 for GenericConnection<role::Client, PacketIdType>
3672where
3673 PacketIdType: IsPacketId,
3674{
3675 fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3676 self.recv(data)
3677 }
3678}
3679
3680impl<PacketIdType> RecvBehavior<role::Server, PacketIdType>
3681 for GenericConnection<role::Server, PacketIdType>
3682where
3683 PacketIdType: IsPacketId,
3684{
3685 fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3686 self.recv(data)
3687 }
3688}
3689
3690impl<PacketIdType> RecvBehavior<role::Any, PacketIdType>
3691 for GenericConnection<role::Any, PacketIdType>
3692where
3693 PacketIdType: IsPacketId,
3694{
3695 fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3696 self.recv(data)
3697 }
3698}
3699
3700#[cfg(test)]
3703mod tests {
3704 use super::*;
3705 use crate::mqtt::connection::version::Version;
3706 use crate::mqtt::packet::TopicAliasSend;
3707 use crate::mqtt::role;
3708
3709 #[test]
3710 fn test_initialize_client_mode() {
3711 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3712
3713 connection.initialize(true);
3715
3716 assert!(connection.is_client);
3718 assert_eq!(connection.publish_send_count, 0);
3719 assert!(connection.publish_send_max.is_none());
3720 assert!(connection.publish_recv_max.is_none());
3721 assert!(!connection.need_store);
3722 }
3723
3724 #[test]
3725 fn test_initialize_server_mode() {
3726 let mut connection = GenericConnection::<role::Server, u32>::new(Version::V3_1_1);
3727
3728 connection.initialize(false);
3730
3731 assert!(!connection.is_client);
3733 assert_eq!(connection.publish_send_count, 0);
3734 assert!(connection.publish_send_max.is_none());
3735 assert!(connection.publish_recv_max.is_none());
3736 assert!(!connection.need_store);
3737 }
3738
3739 #[test]
3740 fn test_validate_topic_alias_no_topic_alias_send() {
3741 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3742
3743 let result = connection.validate_topic_alias(Some(1));
3745 assert!(result.is_none());
3746 }
3747
3748 #[test]
3749 fn test_validate_topic_alias_none_input() {
3750 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3751
3752 let result = connection.validate_topic_alias(None);
3754 assert!(result.is_none());
3755 }
3756
3757 #[test]
3758 fn test_validate_topic_alias_range_no_topic_alias_send() {
3759 let connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3760
3761 let result = connection.validate_topic_alias_range(1);
3763 assert!(!result);
3764 }
3765
3766 #[test]
3767 fn test_validate_topic_alias_range_zero() {
3768 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3769
3770 let topic_alias_send = TopicAliasSend::new(10);
3772 connection.topic_alias_send = Some(topic_alias_send);
3773
3774 let result = connection.validate_topic_alias_range(0);
3776 assert!(!result);
3777 }
3778
3779 #[test]
3780 fn test_validate_topic_alias_range_over_max() {
3781 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3782
3783 let topic_alias_send = TopicAliasSend::new(5);
3785 connection.topic_alias_send = Some(topic_alias_send);
3786
3787 let result = connection.validate_topic_alias_range(6);
3789 assert!(!result);
3790 }
3791
3792 #[test]
3793 fn test_validate_topic_alias_range_valid() {
3794 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3795
3796 let topic_alias_send = TopicAliasSend::new(10);
3798 connection.topic_alias_send = Some(topic_alias_send);
3799
3800 assert!(connection.validate_topic_alias_range(1));
3802 assert!(connection.validate_topic_alias_range(5));
3803 assert!(connection.validate_topic_alias_range(10));
3804 }
3805
3806 #[test]
3807 fn test_validate_topic_alias_with_registered_alias() {
3808 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3809
3810 let mut topic_alias_send = TopicAliasSend::new(10);
3812 topic_alias_send.insert_or_update("test/topic", 5);
3813 connection.topic_alias_send = Some(topic_alias_send);
3814
3815 let result = connection.validate_topic_alias(Some(5));
3817 assert_eq!(result, Some("test/topic".to_string()));
3818 }
3819
3820 #[test]
3821 fn test_validate_topic_alias_unregistered_alias() {
3822 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3823
3824 let topic_alias_send = TopicAliasSend::new(10);
3826 connection.topic_alias_send = Some(topic_alias_send);
3827
3828 let result = connection.validate_topic_alias(Some(5));
3830 assert!(result.is_none());
3831 }
3832
3833 #[test]
3834 fn test_validate_maximum_packet_size_within_limit() {
3835 let connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3836
3837 let result = connection.validate_maximum_packet_size_send(1000);
3839 assert!(result);
3840 }
3841
3842 #[test]
3843 fn test_validate_maximum_packet_size_at_limit() {
3844 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3845
3846 connection.maximum_packet_size_send = 1000;
3848
3849 let result = connection.validate_maximum_packet_size_send(1000);
3851 assert!(result);
3852 }
3853
3854 #[test]
3855 fn test_validate_maximum_packet_size_over_limit() {
3856 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3857
3858 connection.maximum_packet_size_send = 1000;
3860
3861 let result = connection.validate_maximum_packet_size_send(1001);
3863 assert!(!result);
3864 }
3865
3866 #[test]
3867 fn test_validate_maximum_packet_size_zero_limit() {
3868 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3869
3870 connection.maximum_packet_size_send = 0;
3872
3873 let result = connection.validate_maximum_packet_size_send(1);
3875 assert!(!result);
3876
3877 let result = connection.validate_maximum_packet_size_send(0);
3879 assert!(result);
3880 }
3881
3882 #[test]
3883 fn test_initialize_clears_state() {
3884 let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3885
3886 connection.publish_send_count = 5;
3888 connection.need_store = true;
3889 connection.pid_suback.insert(123);
3890 connection.pid_unsuback.insert(456);
3891
3892 connection.initialize(true);
3894
3895 assert_eq!(connection.publish_send_count, 0);
3897 assert!(!connection.need_store);
3898 assert!(connection.pid_suback.is_empty());
3899 assert!(connection.pid_unsuback.is_empty());
3900 assert!(connection.is_client);
3901 }
3902
3903 #[test]
3904 fn test_remaining_length_to_total_size() {
3905 assert_eq!(remaining_length_to_total_size(0), 2); assert_eq!(remaining_length_to_total_size(127), 129); assert_eq!(remaining_length_to_total_size(128), 131); assert_eq!(remaining_length_to_total_size(16383), 16386); assert_eq!(remaining_length_to_total_size(16384), 16388); assert_eq!(remaining_length_to_total_size(2097151), 2097155); assert_eq!(remaining_length_to_total_size(2097152), 2097157); assert_eq!(remaining_length_to_total_size(268435455), 268435460); }
3921}