mqtt_protocol_core/mqtt/connection/
core.rs

1// MIT License
2//
3// Copyright (c) 2025 Takatoshi Kondo
4//
5// Permission is hereby granted, free of charge, to any person obtaining a copy
6// of this software and associated documentation files (the "Software"), to deal
7// in the Software without restriction, including without limitation the rights
8// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9// copies of the Software, and to permit persons to whom the Software is
10// furnished to do so, subject to the following conditions:
11//
12// The above copyright notice and this permission notice shall be included in all
13// copies or substantial portions of the Software.
14//
15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21// SOFTWARE.
22use alloc::{
23    string::{String, ToString},
24    vec::Vec,
25};
26use core::marker::PhantomData;
27
28use crate::mqtt::common::tracing::{error, info, trace, warn};
29use crate::mqtt::common::Cursor;
30use crate::mqtt::common::HashSet;
31use crate::mqtt::connection::event::{GenericEvent, TimerKind};
32use crate::mqtt::connection::GenericStore;
33
34use serde::Serialize;
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
37enum ConnectionStatus {
38    #[serde(rename = "disconnected")]
39    Disconnected,
40    #[serde(rename = "connecting")]
41    Connecting,
42    #[serde(rename = "connected")]
43    Connected,
44}
45use crate::mqtt::connection::packet_builder::{
46    PacketBuildResult, PacketBuilder, PacketData, RawPacket,
47};
48use crate::mqtt::connection::packet_id_manager::PacketIdManager;
49use crate::mqtt::connection::role;
50use crate::mqtt::connection::role::RoleType;
51use crate::mqtt::connection::sendable::Sendable;
52use crate::mqtt::connection::version::*;
53use crate::mqtt::packet::v3_1_1;
54use crate::mqtt::packet::v5_0;
55use crate::mqtt::packet::GenericPacket;
56use crate::mqtt::packet::GenericStorePacket;
57use crate::mqtt::packet::IsPacketId;
58use crate::mqtt::packet::Qos;
59use crate::mqtt::packet::ResponsePacket;
60use crate::mqtt::packet::{Property, TopicAliasRecv, TopicAliasSend};
61use crate::mqtt::prelude::GenericPacketTrait;
62use crate::mqtt::result_code::{
63    ConnectReasonCode, ConnectReturnCode, DisconnectReasonCode, MqttError, PubrecReasonCode,
64};
65
/// Packet size value used when no maximum packet size is in effect.
///
/// It is the initial value of both the send and receive limits and the value they
/// are reset to when the connection is closed.
///
/// Computed as 1 (fixed header byte) + 4 (longest remaining-length encoding) + 128^4.
/// Note that 128^4 (268,435,456) is one more than the largest remaining length a
/// four-byte variable byte integer can express (268,435,455), so this constant sits
/// just above the size of any encodable packet.
const MQTT_PACKET_SIZE_NO_LIMIT: u32 = 1 + 4 + 128_u32.pow(4);
69
/// Convert a packet's remaining length into its total on-the-wire size.
///
/// The total is the sum of:
/// - 1 byte of fixed header,
/// - the 1 to 4 bytes needed to encode `remaining_length` as a variable byte integer,
/// - `remaining_length` itself.
///
/// # Parameters
///
/// * `remaining_length` - The value carried in the packet's remaining-length field
///
/// # Returns
///
/// The total packet size in bytes
fn remaining_length_to_total_size(remaining_length: u32) -> u32 {
    // Each encoded byte carries 7 bits of the value, hence the 128^k boundaries.
    let length_field_size = match remaining_length {
        0..=127 => 1,
        128..=16_383 => 2,
        16_384..=2_097_151 => 3,
        _ => 4,
    };

    1 + length_field_size + remaining_length
}
89
/// Type alias for [`GenericEvent`] with `u16` packet IDs (most common case)
///
/// This is a convenience type alias that most applications will use.
/// It uses `u16` for packet IDs, which is the standard MQTT packet ID type.
/// It is the event type returned by the methods of `Connection<Role>`.
pub type Event = GenericEvent<u16>;
95
/// Generic MQTT Connection - Core Sans-I/O MQTT protocol implementation
///
/// This struct represents the core MQTT protocol logic in a Sans-I/O (synchronous I/O-independent) design.
/// It handles MQTT packet processing, state management, and protocol compliance without performing
/// any actual network I/O operations. Instead, it returns events that the application must handle.
///
/// # Type Parameters
///
/// * `Role` - The connection role (Client or Server), determining allowed operations
/// * `PacketIdType` - The type used for packet IDs (typically `u16`, but can be `u32` for extended scenarios)
///
/// # Key Features
///
/// - **Sans-I/O Design**: No network I/O is performed directly; events are returned for external handling
/// - **Protocol Compliance**: Implements MQTT v3.1.1 and v5.0 specifications
/// - **State Management**: Tracks connection state, packet IDs, and protocol flows
/// - **Configurable Behavior**: Supports various configuration options for different use cases
/// - **Generic Packet ID Support**: Can use u16 or u32 packet IDs for different deployment scenarios
///
/// # Usage
///
/// ```ignore
/// use mqtt_protocol_core::mqtt;
///
/// let mut connection = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
///
/// // Send a packet
/// let events = connection.send(publish_packet);
/// for event in events {
///     match event {
///         mqtt::connection::Event::RequestSendPacket { packet, .. } => {
///             // Send packet over network
///         },
///         // Handle other events...
///     }
/// }
///
/// // Receive data
/// // NOTE(review): `recv` is declared with `mqtt::common::Cursor<&[u8]>`; confirm that
/// // `std::io::Cursor` is the right type to construct here.
/// let mut cursor = std::io::Cursor::new(received_data);
/// let events = connection.recv(&mut cursor);
/// // Process events...
/// ```
pub struct GenericConnection<Role, PacketIdType>
where
    Role: RoleType,
    PacketIdType: IsPacketId,
{
    // Zero-sized marker carrying the `Role` type parameter
    _marker: PhantomData<Role>,

    // Protocol version given to `new`; outgoing packets must match it
    protocol_version: Version,

    // Packet ID bookkeeping (which IDs are currently in use)
    pid_man: PacketIdManager<PacketIdType>,
    // Packet IDs tracked per acknowledgement type. `notify_closed` drains these
    // sets and releases the IDs (the PUBACK/PUBREC/PUBCOMP sets only when
    // `need_store` is false).
    pid_suback: HashSet<PacketIdType>,
    pid_unsuback: HashSet<PacketIdType>,
    pid_puback: HashSet<PacketIdType>,
    pid_pubrec: HashSet<PacketIdType>,
    pid_pubcomp: HashSet<PacketIdType>,

    // Whether session state is kept when the connection closes
    need_store: bool,
    // Store for retransmission packets
    store: GenericStore<PacketIdType>,

    // Behavior switches, all initialized to false by `new`.
    // NOTE(review): exact semantics are defined where these are read; presumably
    // they enable publishing while offline and automatic PUBLISH/PINGREQ responses.
    offline_publish: bool,
    auto_pub_response: bool,
    auto_ping_response: bool,

    // Auto map topic alias for sending
    auto_map_topic_alias_send: bool,
    // Auto replace topic alias for sending
    auto_replace_topic_alias_send: bool,
    // Topic alias management for receiving
    topic_alias_recv: Option<TopicAliasRecv>,
    // Topic alias management for sending
    topic_alias_send: Option<TopicAliasSend>,

    // Maximum number of concurrent PUBLISH packets for sending
    publish_send_max: Option<u16>,
    // Maximum number of concurrent PUBLISH packets for receiving
    publish_recv_max: Option<u16>,
    // Current count of PUBLISH packets being sent
    publish_send_count: u16,

    // Set of received PUBLISH packets (for flow control)
    publish_recv: HashSet<PacketIdType>,

    // Maximum packet size for sending
    maximum_packet_size_send: u32,
    // Maximum packet size for receiving
    maximum_packet_size_recv: u32,

    // Connection state
    status: ConnectionStatus,

    // PINGREQ send interval in milliseconds set by user
    pingreq_user_send_interval_ms: Option<u64>,
    // Keep-alive values in milliseconds.
    // NOTE(review): presumably the client-requested and server-assigned keep alive; confirm.
    pingreq_keep_alive_ms: u64,
    pingreq_server_keep_alive_ms: Option<u64>,

    // PINGREQ receive timeout in milliseconds
    pingreq_recv_timeout_ms: u64,
    // PINGRESP receive timeout in milliseconds
    pingresp_recv_timeout_ms: u64,

    // QoS2 PUBLISH packet handling state (for duplicate detection)
    qos2_publish_handled: HashSet<PacketIdType>,
    // QoS2 PUBLISH packet processing state
    qos2_publish_processing: HashSet<PacketIdType>,

    // Timer state flags; each is cleared when the corresponding timer fires
    pingreq_send_set: bool,
    pingreq_recv_set: bool,
    pingresp_recv_set: bool,

    // Incremental builder that assembles raw packets from received bytes
    packet_builder: PacketBuilder,
    // Client/Server mode flag (initialized to false by `new`)
    is_client: bool,
}
213
/// Type alias for [`GenericConnection`] with `u16` packet IDs (standard case)
///
/// This is the standard Connection type that most applications will use.
/// It uses `u16` for packet IDs, which is the standard MQTT packet ID type
/// supporting values from 1 to 65535.
///
/// For extended scenarios where larger packet ID ranges are needed
/// (such as broker clusters), use `GenericConnection<Role, u32>` directly.
///
/// # Type Parameters
///
/// * `Role` - The connection role (typically `role::Client` or `role::Server`)
pub type Connection<Role> = GenericConnection<Role, u16>;
227
228impl<Role, PacketIdType> GenericConnection<Role, PacketIdType>
229where
230    Role: RoleType,
231    PacketIdType: IsPacketId,
232{
    /// Create a new MQTT connection instance
    ///
    /// Initializes a new MQTT connection with the specified protocol version.
    /// The connection starts in a disconnected state and must be activated through
    /// the connection handshake process (CONNECT/CONNACK).
    ///
    /// # Initial State
    ///
    /// - Status is `Disconnected` and every packet ID set is empty
    /// - Session storing, offline publish, and all automatic behaviors are disabled
    /// - No topic alias state and no PUBLISH concurrency limits are set
    /// - Send and receive packet size limits are `MQTT_PACKET_SIZE_NO_LIMIT`
    /// - All keep-alive intervals/timeouts are zero or unset, and no timer is marked as set
    ///
    /// # Parameters
    ///
    /// * `version` - The MQTT protocol version to use (V3_1_1 or V5_0)
    ///
    /// # Returns
    ///
    /// A new `GenericConnection` instance ready for use
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use mqtt_protocol_core::mqtt;
    ///
    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
    /// let mut server = mqtt::connection::Connection::<mqtt::connection::role::Server>::new(mqtt::version::Version::V3_1_1);
    /// ```
    pub fn new(version: Version) -> Self {
        Self {
            _marker: PhantomData,
            protocol_version: version,
            // Packet ID tracking starts empty
            pid_man: PacketIdManager::new(),
            pid_suback: HashSet::default(),
            pid_unsuback: HashSet::default(),
            pid_puback: HashSet::default(),
            pid_pubrec: HashSet::default(),
            pid_pubcomp: HashSet::default(),
            // Session storing and optional behaviors are off until configured
            need_store: false,
            store: GenericStore::new(),
            offline_publish: false,
            auto_pub_response: false,
            auto_ping_response: false,
            auto_map_topic_alias_send: false,
            auto_replace_topic_alias_send: false,
            topic_alias_recv: None,
            topic_alias_send: None,
            // No PUBLISH flow-control limits yet
            publish_send_max: None,
            publish_recv_max: None,
            publish_send_count: 0,
            publish_recv: HashSet::default(),
            // No packet size restriction in either direction
            maximum_packet_size_send: MQTT_PACKET_SIZE_NO_LIMIT,
            maximum_packet_size_recv: MQTT_PACKET_SIZE_NO_LIMIT,
            status: ConnectionStatus::Disconnected,
            // Keep-alive configuration is unset
            pingreq_user_send_interval_ms: None,
            pingreq_keep_alive_ms: 0,
            pingreq_server_keep_alive_ms: None,
            pingreq_recv_timeout_ms: 0,
            pingresp_recv_timeout_ms: 0,
            qos2_publish_handled: HashSet::default(),
            qos2_publish_processing: HashSet::default(),
            // No timers are pending
            pingreq_send_set: false,
            pingreq_recv_set: false,
            pingresp_recv_set: false,
            packet_builder: PacketBuilder::new(),
            // Starts as false regardless of `Role`
            is_client: false,
        }
    }
295
296    // public
297
    /// Send MQTT packet with compile-time role checking (experimental)
    ///
    /// This method provides compile-time verification that the packet being sent
    /// is allowed for the current connection role (Client or Server). It only works
    /// when the `Role` type parameter is concrete (not generic).
    ///
    /// The role check is expressed entirely through the `T: Sendable<Role, PacketIdType>`
    /// bound; the method body simply forwards to `Sendable::dispatch_send`.
    ///
    /// This is an experimental API that may be subject to change. For general use,
    /// consider using the `send()` method instead.
    ///
    /// # Type Parameters
    ///
    /// * `T` - The packet type that must implement `Sendable<Role, PacketIdType>`
    ///
    /// # Parameters
    ///
    /// * `packet` - The MQTT packet to send
    ///
    /// # Returns
    ///
    /// A vector of events that the application must process
    ///
    /// # Compile-time Safety
    ///
    /// If the packet type is not allowed for the current role, this will result
    /// in a compile-time error, preventing protocol violations at development time.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use mqtt_protocol_core::mqtt;
    ///
    /// // This works for concrete roles
    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
    /// let events = client.checked_send(connect_packet); // OK - clients can send CONNECT
    ///
    /// // This would cause a compile error
    /// // let events = client.checked_send(connack_packet); // Error - clients cannot send CONNACK
    /// ```
    pub fn checked_send<T>(&mut self, packet: T) -> Vec<GenericEvent<PacketIdType>>
    where
        T: Sendable<Role, PacketIdType>,
    {
        // The packet type itself decides how it is sent (concrete packet or
        // generic packet); this connection is handed over for that purpose.
        packet.dispatch_send(self)
    }
343
344    /// Send MQTT packet with runtime role validation
345    ///
346    /// This is the primary method for sending MQTT packets. It accepts any `GenericPacket`
347    /// and performs runtime validation to ensure the packet is allowed for the current
348    /// connection role. This provides flexibility when the exact packet type is not known
349    /// at compile time.
350    ///
351    /// # Parameters
352    ///
353    /// * `packet` - The MQTT packet to send
354    ///
355    /// # Returns
356    ///
357    /// A vector of events that the application must process. If the packet is not allowed
358    /// for the current role, a `NotifyError` event will be included.
359    ///
360    /// # Validation
361    ///
362    /// The method validates that:
363    /// - The packet type is allowed for the connection role (Client vs Server)
364    /// - Protocol version compatibility
365    /// - Connection state requirements
366    /// - Packet ID management for QoS > 0 packets
367    ///
368    /// # Examples
369    ///
370    /// ```ignore
371    /// use mqtt_protocol_core::mqtt;
372    ///
373    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
374    /// let events = client.send(mqtt::packet::GenericPacket::V5_0(mqtt::packet::v5_0::Packet::Connect(connect_packet)));
375    ///
376    /// for event in events {
377    ///     match event {
378    ///         mqtt::connection::Event::RequestSendPacket { packet, .. } => {
379    ///             // Send packet over network
380    ///         },
381    ///         mqtt::connection::Event::NotifyError(error) => {
382    ///             // Handle validation errors
383    ///         },
384    ///         // Handle other events...
385    ///     }
386    /// }
387    /// ```
388    pub fn send(&mut self, packet: GenericPacket<PacketIdType>) -> Vec<GenericEvent<PacketIdType>> {
389        use core::any::TypeId;
390
391        let role_id = TypeId::of::<Role>();
392        let client_id = TypeId::of::<role::Client>();
393        let server_id = TypeId::of::<role::Server>();
394        let any_id = TypeId::of::<role::Any>();
395
396        // Check version compatibility between connection and packet
397        let packet_version = packet.protocol_version();
398
399        // Return error if versions don't match
400        if self.protocol_version != packet_version {
401            return vec![GenericEvent::NotifyError(MqttError::VersionMismatch)];
402        }
403
404        match packet {
405            // CONNECT - Client/Any can send
406            GenericPacket::V3_1_1Connect(p) => {
407                if role_id == client_id || role_id == any_id {
408                    self.process_send_v3_1_1_connect(p)
409                } else {
410                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
411                }
412            }
413            GenericPacket::V5_0Connect(p) => {
414                if role_id == client_id || role_id == any_id {
415                    self.process_send_v5_0_connect(p)
416                } else {
417                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
418                }
419            }
420            // CONNACK - Server/Any can send
421            GenericPacket::V3_1_1Connack(p) => {
422                if role_id == server_id || role_id == any_id {
423                    self.process_send_v3_1_1_connack(p)
424                } else {
425                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
426                }
427            }
428            GenericPacket::V5_0Connack(p) => {
429                if role_id == server_id || role_id == any_id {
430                    self.process_send_v5_0_connack(p)
431                } else {
432                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
433                }
434            }
435            // PUBLISH - Any role can send
436            GenericPacket::V3_1_1Publish(p) => self.process_send_v3_1_1_publish(p),
437            GenericPacket::V5_0Publish(p) => self.process_send_v5_0_publish(p),
438            // PUBACK/PUBREC/PUBREL/PUBCOMP - Any role can send
439            GenericPacket::V3_1_1Puback(p) => self.process_send_v3_1_1_puback(p),
440            GenericPacket::V5_0Puback(p) => self.process_send_v5_0_puback(p),
441            GenericPacket::V3_1_1Pubrec(p) => self.process_send_v3_1_1_pubrec(p),
442            GenericPacket::V5_0Pubrec(p) => self.process_send_v5_0_pubrec(p),
443            GenericPacket::V3_1_1Pubrel(p) => self.process_send_v3_1_1_pubrel(p),
444            GenericPacket::V5_0Pubrel(p) => self.process_send_v5_0_pubrel(p),
445            GenericPacket::V3_1_1Pubcomp(p) => self.process_send_v3_1_1_pubcomp(p),
446            GenericPacket::V5_0Pubcomp(p) => self.process_send_v5_0_pubcomp(p),
447            // SUBSCRIBE/UNSUBSCRIBE - Client/Any can send
448            GenericPacket::V3_1_1Subscribe(p) => {
449                if role_id == client_id || role_id == any_id {
450                    self.process_send_v3_1_1_subscribe(p)
451                } else {
452                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
453                }
454            }
455            GenericPacket::V5_0Subscribe(p) => {
456                if role_id == client_id || role_id == any_id {
457                    self.process_send_v5_0_subscribe(p)
458                } else {
459                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
460                }
461            }
462            GenericPacket::V3_1_1Unsubscribe(p) => {
463                if role_id == client_id || role_id == any_id {
464                    self.process_send_v3_1_1_unsubscribe(p)
465                } else {
466                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
467                }
468            }
469            GenericPacket::V5_0Unsubscribe(p) => {
470                if role_id == client_id || role_id == any_id {
471                    self.process_send_v5_0_unsubscribe(p)
472                } else {
473                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
474                }
475            }
476            // SUBACK/UNSUBACK - Server/Any can send
477            GenericPacket::V3_1_1Suback(p) => {
478                if role_id == server_id || role_id == any_id {
479                    self.process_send_v3_1_1_suback(p)
480                } else {
481                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
482                }
483            }
484            GenericPacket::V5_0Suback(p) => {
485                if role_id == server_id || role_id == any_id {
486                    self.process_send_v5_0_suback(p)
487                } else {
488                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
489                }
490            }
491            GenericPacket::V3_1_1Unsuback(p) => {
492                if role_id == server_id || role_id == any_id {
493                    self.process_send_v3_1_1_unsuback(p)
494                } else {
495                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
496                }
497            }
498            GenericPacket::V5_0Unsuback(p) => {
499                if role_id == server_id || role_id == any_id {
500                    self.process_send_v5_0_unsuback(p)
501                } else {
502                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
503                }
504            }
505            // PINGREQ - Client/Any can send
506            GenericPacket::V3_1_1Pingreq(p) => {
507                if role_id == client_id || role_id == any_id {
508                    self.process_send_v3_1_1_pingreq(p)
509                } else {
510                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
511                }
512            }
513            GenericPacket::V5_0Pingreq(p) => {
514                if role_id == client_id || role_id == any_id {
515                    self.process_send_v5_0_pingreq(p)
516                } else {
517                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
518                }
519            }
520            // PINGRESP - Server/Any can send
521            GenericPacket::V3_1_1Pingresp(p) => {
522                if role_id == server_id || role_id == any_id {
523                    self.process_send_v3_1_1_pingresp(p)
524                } else {
525                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
526                }
527            }
528            GenericPacket::V5_0Pingresp(p) => {
529                if role_id == server_id || role_id == any_id {
530                    self.process_send_v5_0_pingresp(p)
531                } else {
532                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
533                }
534            }
535            // DISCONNECT(v3.1.1) - Client/Any role can send
536            GenericPacket::V3_1_1Disconnect(p) => {
537                if role_id == client_id || role_id == any_id {
538                    self.process_send_v3_1_1_disconnect(p)
539                } else {
540                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
541                }
542            }
543            // DISCONNECT(v5.0) - Any role can send
544            GenericPacket::V5_0Disconnect(p) => self.process_send_v5_0_disconnect(p),
545            // AUTH - Any role can send (v5.0 only)
546            GenericPacket::V5_0Auth(p) => self.process_send_v5_0_auth(p),
547        }
548    }
549
550    /// Receive and process incoming MQTT data
551    ///
552    /// This method processes raw bytes received from the network and attempts to
553    /// parse them into MQTT packets. It handles packet fragmentation and can
554    /// process multiple complete packets from a single data buffer.
555    ///
556    /// # Parameters
557    ///
558    /// * `data` - A cursor over the received data bytes. The cursor position will
559    ///   be advanced as data is consumed.
560    ///
561    /// # Returns
562    ///
563    /// A vector of events generated from processing the received data:
564    /// - `NotifyPacketReceived` for successfully parsed packets
565    /// - `NotifyError` for parsing errors or protocol violations
566    /// - Additional events based on packet processing (timers, responses, etc.)
567    ///
568    /// # Behavior
569    ///
570    /// - Handles partial packets (data will be buffered until complete)
571    /// - Processes multiple complete packets in sequence
572    /// - Validates packet structure and protocol compliance
573    /// - Updates internal connection state based on received packets
574    /// - Generates appropriate response events (ACKs, etc.)
575    ///
576    /// # Examples
577    ///
578    /// ```ignore
579    /// use mqtt_protocol_core::mqtt;
580    ///
581    /// let mut connection = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
582    /// let received_data = [/* network data */];
583    /// let mut cursor = std::io::Cursor::new(&received_data[..]);
584    ///
585    /// let events = connection.recv(&mut cursor);
586    /// for event in events {
587    ///     match event {
588    ///         mqtt::connection::Event::NotifyPacketReceived(packet) => {
589    ///             // Process received packet
590    ///         },
591    ///         mqtt::connection::Event::NotifyError(error) => {
592    ///             // Handle parsing/protocol errors
593    ///         },
594    ///         // Handle other events...
595    ///     }
596    /// }
597    /// ```
598    pub fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
599        let mut events = Vec::new();
600
601        match self.packet_builder.feed(data) {
602            PacketBuildResult::Complete(raw_packet) => {
603                events.extend(self.process_recv_packet(raw_packet));
604            }
605            PacketBuildResult::Incomplete => {}
606            PacketBuildResult::Error(e) => {
607                self.cancel_timers(&mut events);
608                events.push(GenericEvent::RequestClose);
609                events.push(GenericEvent::NotifyError(e));
610            }
611        }
612
613        events
614    }
615
616    /// Notify that a timer has fired (Event-based API)
617    ///
618    /// This method should be called when the I/O layer detects that a timer has expired.
619    /// It handles the timer event appropriately and returns events that need to be processed.
620    /// Notify that a timer has fired
621    ///
622    /// This method should be called by the application when a previously requested
623    /// timer expires. The connection will take appropriate action based on the timer type.
624    ///
625    /// # Parameters
626    ///
627    /// * `kind` - The type of timer that fired
628    ///
629    /// # Returns
630    ///
631    /// Events generated from timer processing (e.g., sending PINGREQ, connection timeouts)
632    pub fn notify_timer_fired(&mut self, kind: TimerKind) -> Vec<GenericEvent<PacketIdType>> {
633        let mut events = Vec::new();
634
635        match kind {
636            TimerKind::PingreqSend => {
637                // Reset timer flag
638                self.pingreq_send_set = false;
639
640                // Send PINGREQ if connected
641                if self.status == ConnectionStatus::Connected {
642                    match self.protocol_version {
643                        Version::V3_1_1 => {
644                            if let Ok(pingreq) = v3_1_1::Pingreq::builder().build() {
645                                events.extend(self.process_send_v3_1_1_pingreq(pingreq));
646                            }
647                        }
648                        Version::V5_0 => {
649                            if let Ok(pingreq) = v5_0::Pingreq::builder().build() {
650                                events.extend(self.process_send_v5_0_pingreq(pingreq));
651                            }
652                        }
653                        Version::Undetermined => {
654                            unreachable!("Protocol version should be set before sending PINGREQ");
655                        }
656                    }
657                }
658            }
659            TimerKind::PingreqRecv => {
660                // Reset timer flag
661                self.pingreq_recv_set = false;
662
663                match self.protocol_version {
664                    Version::V3_1_1 => {
665                        // V3.1.1: Close connection
666                        events.push(GenericEvent::RequestClose);
667                    }
668                    Version::V5_0 => {
669                        // V5.0: Send DISCONNECT with keep_alive_timeout if connected
670                        if self.status == ConnectionStatus::Connected {
671                            if let Ok(disconnect) = v5_0::Disconnect::builder()
672                                .reason_code(DisconnectReasonCode::KeepAliveTimeout)
673                                .build()
674                            {
675                                events.extend(self.process_send_v5_0_disconnect(disconnect));
676                            }
677                        }
678                    }
679                    Version::Undetermined => {
680                        unreachable!("Protocol version should be set before receiving PINGREQ");
681                    }
682                }
683            }
684            TimerKind::PingrespRecv => {
685                // Reset timer flag
686                self.pingresp_recv_set = false;
687
688                match self.protocol_version {
689                    Version::V3_1_1 => {
690                        // V3.1.1: Close connection
691                        events.push(GenericEvent::RequestClose);
692                    }
693                    Version::V5_0 => {
694                        // V5.0: Send DISCONNECT with keep_alive_timeout if connected
695                        if self.status == ConnectionStatus::Connected {
696                            if let Ok(disconnect) = v5_0::Disconnect::builder()
697                                .reason_code(DisconnectReasonCode::KeepAliveTimeout)
698                                .build()
699                            {
700                                events.extend(self.process_send_v5_0_disconnect(disconnect));
701                            }
702                        }
703                    }
704                    Version::Undetermined => {
705                        unreachable!("Protocol version should be set before receiving PINGRESP");
706                    }
707                }
708            }
709        }
710
711        events
712    }
713
714    /// Notify that the connection has been closed by the I/O layer (Event-based API)
715    ///
716    /// This method should be called when the I/O layer detects that the socket has been closed.
717    /// It updates the internal state appropriately and returns events that need to be processed.
718    /// Notify that the underlying connection has been closed
719    ///
720    /// This method should be called when the network connection is closed,
721    /// either intentionally or due to network issues.
722    ///
723    /// # Returns
724    ///
725    /// Events generated from connection closure processing
726    pub fn notify_closed(&mut self) -> Vec<GenericEvent<PacketIdType>> {
727        let mut events = Vec::new();
728
729        // Reset packet size limits to MQTT protocol maximum
730        self.maximum_packet_size_send = MQTT_PACKET_SIZE_NO_LIMIT;
731        self.maximum_packet_size_recv = MQTT_PACKET_SIZE_NO_LIMIT;
732
733        // Set status to disconnected
734        self.status = ConnectionStatus::Disconnected;
735
736        // Clear topic alias management
737        self.topic_alias_send = None;
738        self.topic_alias_recv = None;
739
740        // Release packet IDs for SUBACK
741        for packet_id in self.pid_suback.drain() {
742            if self.pid_man.is_used_id(packet_id) {
743                self.pid_man.release_id(packet_id);
744                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
745            }
746        }
747
748        // Release packet IDs for UNSUBACK
749        for packet_id in self.pid_unsuback.drain() {
750            if self.pid_man.is_used_id(packet_id) {
751                self.pid_man.release_id(packet_id);
752                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
753            }
754        }
755
756        // If not storing session state, clear QoS2 states and release publish-related packet IDs
757        if !self.need_store {
758            self.qos2_publish_processing.clear();
759            self.qos2_publish_handled.clear();
760
761            // Release packet IDs for PUBACK
762            for packet_id in self.pid_puback.drain() {
763                if self.pid_man.is_used_id(packet_id) {
764                    self.pid_man.release_id(packet_id);
765                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
766                }
767            }
768
769            // Release packet IDs for PUBREC
770            for packet_id in self.pid_pubrec.drain() {
771                if self.pid_man.is_used_id(packet_id) {
772                    self.pid_man.release_id(packet_id);
773                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
774                }
775            }
776
777            // Release packet IDs for PUBCOMP
778            for packet_id in self.pid_pubcomp.drain() {
779                if self.pid_man.is_used_id(packet_id) {
780                    self.pid_man.release_id(packet_id);
781                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
782                }
783            }
784        }
785
786        // Cancel all timers
787        self.cancel_timers(&mut events);
788
789        events
790    }
791
792    /// Set the PINGREQ send interval
793    ///
794    /// Overrides the interval for sending PINGREQ packets to maintain the connection alive.
795    /// When changed, this may generate timer-related events to update the ping schedule.
796    ///
797    /// PINGREQ is sent after the interval has elapsed since the last packet of any type was sent.
798    ///
799    /// The send interval is determined by the following priority order:
800    /// 1. User setting by this function
801    /// 2. ServerKeepAlive property value in received CONNACK packet
802    /// 3. KeepAlive value in sent CONNECT packet
803    ///
804    /// # Parameters
805    ///
806    /// * `duration_ms` - The interval override setting:
807    ///   - `Some(value)` - Override with specified milliseconds. If 0, no PINGREQ is sent.
808    ///   - `None` - Do not override, use CONNACK ServerKeepAlive or CONNECT KeepAlive instead.
809    ///
810    /// # Returns
811    ///
812    /// Events generated from updating the ping interval
813    pub fn set_pingreq_send_interval(
814        &mut self,
815        duration_ms: Option<u64>,
816    ) -> Vec<GenericEvent<PacketIdType>> {
817        let mut events = Vec::new();
818        self.pingreq_user_send_interval_ms = duration_ms;
819        if let Some(ms) = duration_ms {
820            if ms == 0 {
821                if self.pingreq_send_set {
822                    self.pingreq_send_set = false;
823                    events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
824                }
825            } else if self.status == ConnectionStatus::Connected {
826                self.pingreq_send_set = true;
827                events.push(GenericEvent::RequestTimerReset {
828                    kind: TimerKind::PingreqSend,
829                    duration_ms: ms,
830                });
831            }
832        }
833        events
834    }
835
836    /// Get the remaining capacity for sending PUBLISH packets
837    ///
838    /// Returns the number of additional PUBLISH packets that can be sent
839    /// without exceeding the receive maximum limit.
840    ///
841    /// # Returns
842    ///
843    /// The remaining capacity for outgoing PUBLISH packets, or `None` if no limit is set
844    pub fn get_receive_maximum_vacancy_for_send(&self) -> Option<u16> {
845        // If publish_recv_max is set, return the remaining capacity
846        self.publish_send_max
847            .map(|max| max.saturating_sub(self.publish_send_count))
848    }
849
850    /// Enable or disable offline publishing
851    ///
852    /// When enabled, PUBLISH packets can be sent even when disconnected.
853    /// They will be queued and sent once the connection is established.
854    ///
855    /// # Parameters
856    ///
857    /// * `enable` - Whether to enable offline publishing
858    pub fn set_offline_publish(&mut self, enable: bool) {
859        self.offline_publish = enable;
860        if self.offline_publish {
861            self.need_store = true;
862        }
863    }
864
865    /// Enable or disable automatic PUBLISH response generation
866    ///
867    /// When enabled, appropriate response packets (PUBACK, PUBREC, PUBREL, and PUBCOMP.)
868    /// are automatically generated for received PUBLISH packets.
869    ///
870    /// # Parameters
871    ///
872    /// * `enable` - Whether to enable automatic responses
873    pub fn set_auto_pub_response(&mut self, enable: bool) {
874        self.auto_pub_response = enable;
875    }
876
877    /// Enable or disable automatic PING response generation
878    ///
879    /// When enabled, PINGRESP packets are automatically sent in response to PINGREQ.
880    ///
881    /// # Parameters
882    ///
883    /// * `enable` - Whether to enable automatic PING responses
884    pub fn set_auto_ping_response(&mut self, enable: bool) {
885        self.auto_ping_response = enable;
886    }
887
888    /// Enable or disable automatic topic alias mapping for outgoing packets
889    ///
890    /// When enabled, the connection will automatically map topics to aliases
891    /// for outgoing PUBLISH packets to reduce bandwidth usage. This includes:
892    /// - Applying existing registered topic aliases when available
893    /// - Allocating new topic aliases for unregistered topics
894    /// - Using LRU algorithm to overwrite the least recently used alias when all aliases are in use
895    ///
896    /// # Parameters
897    ///
898    /// * `enable` - Whether to enable automatic topic alias mapping
899    pub fn set_auto_map_topic_alias_send(&mut self, enable: bool) {
900        self.auto_map_topic_alias_send = enable;
901    }
902
903    /// Enable or disable automatic topic alias replacement for outgoing packets
904    ///
905    /// When enabled, the connection will automatically apply existing registered
906    /// topic aliases to outgoing PUBLISH packets when aliases are available.
907    /// This only uses previously registered aliases and does not allocate new ones.
908    ///
909    /// # Parameters
910    ///
911    /// * `enable` - Whether to enable automatic topic alias replacement
912    pub fn set_auto_replace_topic_alias_send(&mut self, enable: bool) {
913        self.auto_replace_topic_alias_send = enable;
914    }
915
916    /// Set the PINGRESP receive timeout
917    ///
918    /// Sets the timeout for receiving PINGRESP packets after sending PINGREQ packets.
919    /// If PINGRESP is not received within this timeout, the connection is considered disconnected.
920    ///
921    /// # Parameters
922    ///
923    /// * `timeout_ms` - The timeout in milliseconds. Set to 0 to disable timeout.
924    pub fn set_pingresp_recv_timeout(&mut self, timeout_ms: u64) {
925        self.pingresp_recv_timeout_ms = timeout_ms;
926    }
927
928    /// Acquire a new packet ID for outgoing packets
929    ///
930    /// # Returns
931    ///
932    /// A unique packet ID, or an error if none are available
933    pub fn acquire_packet_id(&mut self) -> Result<PacketIdType, MqttError> {
934        self.pid_man.acquire_unique_id()
935    }
936
937    /// Register a packet ID as in use
938    ///
939    /// Manually registers a specific packet ID as being in use, preventing
940    /// it from being allocated by `acquire_packet_id()`.
941    ///
942    /// # Parameters
943    ///
944    /// * `packet_id` - The packet ID to register as in use
945    ///
946    /// # Returns
947    ///
948    /// `Ok(())` if successful, or an error if the packet ID is already in use
949    pub fn register_packet_id(&mut self, packet_id: PacketIdType) -> Result<(), MqttError> {
950        self.pid_man.register_id(packet_id)
951    }
952
953    /// Release packet ID (Event-based API)
954    /// Release a packet ID for reuse
955    ///
956    /// This method releases a packet ID, making it available for future use.
957    /// It also generates a `NotifyPacketIdReleased` event.
958    ///
959    /// # Parameters
960    ///
961    /// * `packet_id` - The packet ID to release
962    ///
963    /// # Returns
964    ///
965    /// Events generated from releasing the packet ID
966    pub fn release_packet_id(
967        &mut self,
968        packet_id: PacketIdType,
969    ) -> Vec<GenericEvent<PacketIdType>> {
970        let mut events = Vec::new();
971
972        if self.pid_man.is_used_id(packet_id) {
973            self.pid_man.release_id(packet_id);
974            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
975        }
976
977        events
978    }
979
980    /// Get the set of QoS 2 PUBLISH packet IDs that have been handled
981    ///
982    /// Returns a copy of the set containing packet IDs of QoS 2 PUBLISH packets
983    /// that have been successfully processed and handled.
984    ///
985    /// # Returns
986    ///
987    /// A `HashSet` containing packet IDs of handled QoS 2 PUBLISH packets
988    pub fn get_qos2_publish_handled(&self) -> HashSet<PacketIdType> {
989        self.qos2_publish_handled.clone()
990    }
991
992    /// Restore the set of QoS 2 PUBLISH packet IDs that have been handled
993    ///
994    /// Restores the internal state of handled QoS 2 PUBLISH packet IDs,
995    /// typically used when resuming a connection from persistent storage.
996    ///
997    /// # Parameters
998    ///
999    /// * `pids` - A `HashSet` containing packet IDs of previously handled QoS 2 PUBLISH packets
1000    pub fn restore_qos2_publish_handled(&mut self, pids: HashSet<PacketIdType>) {
1001        self.qos2_publish_handled = pids;
1002    }
1003
1004    /// Restore previously stored packets
1005    ///
1006    /// This method restores packets that were previously stored for persistence,
1007    /// typically called when resuming a session.
1008    ///
1009    /// # Parameters
1010    ///
1011    /// * `packets` - Vector of packets to restore
1012    pub fn restore_packets(&mut self, packets: Vec<GenericStorePacket<PacketIdType>>) {
1013        for packet in packets {
1014            match &packet {
1015                GenericStorePacket::V3_1_1Publish(p) => {
1016                    // Add to appropriate QoS tracking set
1017                    match p.qos() {
1018                        Qos::AtLeastOnce => {
1019                            self.pid_puback.insert(p.packet_id().unwrap());
1020                        }
1021                        Qos::ExactlyOnce => {
1022                            self.pid_pubrec.insert(p.packet_id().unwrap());
1023                        }
1024                        _ => {
1025                            // QoS 0 shouldn't be in store, but handle gracefully
1026                            warn!("QoS 0 packet found in store, skipping");
1027                            continue;
1028                        }
1029                    }
1030                    // Register packet ID and add to store
1031                    let packet_id = p.packet_id().unwrap();
1032                    if self.pid_man.register_id(packet_id).is_ok() {
1033                        if let Err(_e) = self.store.add(packet) {
1034                            error!("Failed to add packet to store: {:?}", _e);
1035                        }
1036                    } else {
1037                        error!("Packet ID {} has already been used. Skip it", packet_id);
1038                    }
1039                }
1040                GenericStorePacket::V5_0Publish(p) => {
1041                    // Add to appropriate QoS tracking set
1042                    match p.qos() {
1043                        Qos::AtLeastOnce => {
1044                            self.pid_puback.insert(p.packet_id().unwrap());
1045                        }
1046                        Qos::ExactlyOnce => {
1047                            self.pid_pubrec.insert(p.packet_id().unwrap());
1048                        }
1049                        _ => {
1050                            // QoS 0 shouldn't be in store, but handle gracefully
1051                            warn!("QoS 0 packet found in store, skipping");
1052                            continue;
1053                        }
1054                    }
1055                    // Register packet ID and add to store
1056                    let packet_id = p.packet_id().unwrap();
1057                    if self.pid_man.register_id(packet_id).is_ok() {
1058                        if let Err(_e) = self.store.add(packet) {
1059                            error!("Failed to add packet to store: {:?}", _e);
1060                        }
1061                    } else {
1062                        error!("Packet ID {} has already been used. Skip it", packet_id);
1063                    }
1064                }
1065                GenericStorePacket::V3_1_1Pubrel(p) => {
1066                    // Pubrel packets expect PUBCOMP response
1067                    self.pid_pubcomp.insert(p.packet_id());
1068                    // Register packet ID and add to store
1069                    let packet_id = p.packet_id();
1070                    if self.pid_man.register_id(packet_id).is_ok() {
1071                        if let Err(_e) = self.store.add(packet) {
1072                            error!("Failed to add packet to store: {:?}", _e);
1073                        }
1074                    } else {
1075                        error!("Packet ID {} has already been used. Skip it", packet_id);
1076                    }
1077                }
1078                GenericStorePacket::V5_0Pubrel(p) => {
1079                    // Pubrel packets expect PUBCOMP response
1080                    self.pid_pubcomp.insert(p.packet_id());
1081                    // Register packet ID and add to store
1082                    let packet_id = p.packet_id();
1083                    if self.pid_man.register_id(packet_id).is_ok() {
1084                        if let Err(_e) = self.store.add(packet) {
1085                            error!("Failed to add packet to store: {:?}", _e);
1086                        }
1087                    } else {
1088                        error!("Packet ID {} has already been used. Skip it", packet_id);
1089                    }
1090                }
1091            }
1092        }
1093    }
1094
1095    /// Get stored packets for persistence
1096    ///
1097    /// Returns packets that need to be stored for potential retransmission.
1098    /// This is useful for implementing persistent sessions.
1099    ///
1100    /// # Returns
1101    ///
1102    /// Vector of packets that should be persisted
1103    pub fn get_stored_packets(&self) -> Vec<GenericStorePacket<PacketIdType>> {
1104        self.store.get_stored()
1105    }
1106
1107    /// Get the MQTT protocol version being used
1108    ///
1109    /// # Returns
1110    ///
1111    /// The protocol version (V3_1_1 or V5_0)
1112    pub fn get_protocol_version(&self) -> Version {
1113        self.protocol_version
1114    }
1115
1116    /// Check if a PUBLISH packet is currently being processed
1117    ///
1118    /// # Parameters
1119    ///
1120    /// * `packet_id` - The packet ID to check
1121    ///
1122    /// # Returns
1123    ///
1124    /// True if the packet ID is in use for PUBLISH processing
1125    pub fn is_publish_processing(&self, packet_id: PacketIdType) -> bool {
1126        self.qos2_publish_processing.contains(&packet_id)
1127    }
1128
1129    /// Regulate packet for store (remove/resolve topic alias)
1130    ///
1131    /// This method prepares a V5.0 publish packet for storage by resolving topic aliases
1132    /// and removing TopicAlias properties to ensure the packet can be retransmitted correctly.
1133    pub fn regulate_for_store(
1134        &self,
1135        mut packet: v5_0::GenericPublish<PacketIdType>,
1136    ) -> Result<v5_0::GenericPublish<PacketIdType>, MqttError> {
1137        if packet.topic_name().is_empty() {
1138            // Topic is empty, need to resolve from topic alias
1139            if let Some(topic_alias) = Self::get_topic_alias_from_props(packet.props()) {
1140                if let Some(ref topic_alias_send) = self.topic_alias_send {
1141                    if let Some(topic) = topic_alias_send.peek(topic_alias) {
1142                        // Found topic for alias, add topic and remove alias property
1143                        packet = packet.remove_topic_alias_add_topic(topic.to_string())?;
1144                    } else {
1145                        return Err(MqttError::PacketNotRegulated);
1146                    }
1147                } else {
1148                    return Err(MqttError::PacketNotRegulated);
1149                }
1150            } else {
1151                return Err(MqttError::PacketNotRegulated);
1152            }
1153        } else {
1154            // Topic is not empty, just remove TopicAlias property if present
1155            packet = packet.remove_topic_alias();
1156        }
1157
1158        Ok(packet)
1159    }
1160
1161    // private
1162
1163    /// Initialize connection state based on client/server role
1164    ///
1165    /// Resets all connection-specific state including:
1166    /// - Publish flow control counters and limits
1167    /// - Topic alias management
1168    /// - QoS2 processing state
1169    /// - Packet ID tracking sets
1170    /// - Store requirement flag
1171    ///
1172    /// # Parameters
1173    /// * `is_client` - true for client mode, false for server mode
1174    fn initialize(&mut self, is_client: bool) {
1175        self.publish_send_max = None;
1176        self.publish_recv_max = None;
1177        self.publish_send_count = 0;
1178        self.topic_alias_send = None;
1179        self.topic_alias_recv = None;
1180        self.publish_recv.clear();
1181        self.qos2_publish_processing.clear();
1182        self.need_store = false;
1183        self.pid_suback.clear();
1184        self.pid_unsuback.clear();
1185        self.is_client = is_client;
1186        self.pingreq_keep_alive_ms = 0;
1187        self.pingreq_server_keep_alive_ms = None;
1188    }
1189
1190    fn clear_store_related(&mut self) {
1191        self.pid_man.clear();
1192        self.pid_puback.clear();
1193        self.pid_pubrec.clear();
1194        self.pid_pubcomp.clear();
1195        self.store.clear();
1196    }
1197
1198    /// Send all stored packets for retransmission
1199    fn send_stored(&mut self) -> Vec<GenericEvent<PacketIdType>> {
1200        let mut events = Vec::new();
1201        self.store.for_each(|packet| {
1202            if packet.size() > self.maximum_packet_size_send as usize {
1203                let packet_id = packet.packet_id();
1204                self.pid_man.release_id(packet_id);
1205                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1206                return false; // Remove from store
1207            }
1208            events.push(GenericEvent::RequestSendPacket {
1209                packet: packet.clone().into(),
1210                release_packet_id_if_send_error: None,
1211            });
1212            true // Keep in store
1213        });
1214
1215        events
1216    }
1217
1218    /// Validate topic alias and return the associated topic name
1219    ///
1220    /// Checks if the topic alias is valid and retrieves the corresponding topic name
1221    /// from the topic alias send manager.
1222    ///
1223    /// # Parameters
1224    /// * `topic_alias_opt` - Optional topic alias value
1225    ///
1226    /// # Returns
1227    /// * `Some(topic_name)` if the topic alias is valid and found
1228    /// * `None` if the topic alias is invalid, not provided, or not found
1229    fn validate_topic_alias(&mut self, topic_alias_opt: Option<u16>) -> Option<String> {
1230        let topic_alias = topic_alias_opt?;
1231
1232        if !self.validate_topic_alias_range(topic_alias) {
1233            return None;
1234        }
1235
1236        let topic_alias_send = self.topic_alias_send.as_mut()?;
1237        // LRU updated here
1238        let topic = topic_alias_send.get(topic_alias)?;
1239
1240        Some(topic.to_string())
1241    }
1242
1243    /// Validate that topic alias is within the allowed range
1244    ///
1245    /// Checks if the topic alias value is valid according to the configured
1246    /// topic alias maximum for sending.
1247    ///
1248    /// # Parameters
1249    /// * `topic_alias` - Topic alias value to validate
1250    ///
1251    /// # Returns
1252    /// * `true` if the topic alias is within valid range
1253    /// * `false` if invalid or topic alias sending is not configured
1254    fn validate_topic_alias_range(&self, topic_alias: u16) -> bool {
1255        let topic_alias_send = match &self.topic_alias_send {
1256            Some(tas) => tas,
1257            None => {
1258                error!("topic_alias is set but topic_alias_maximum is 0");
1259                return false;
1260            }
1261        };
1262
1263        if topic_alias == 0 || topic_alias > topic_alias_send.max() {
1264            error!("topic_alias is set but out of range");
1265            return false;
1266        }
1267
1268        true
1269    }
1270
1271    /// Process v3.1.1 CONNECT packet - C++ constexpr if implementation
1272    pub(crate) fn process_send_v3_1_1_connect(
1273        &mut self,
1274        packet: v3_1_1::Connect,
1275    ) -> Vec<GenericEvent<PacketIdType>> {
1276        info!("send connect v3.1.1: {packet}");
1277
1278        if self.status != ConnectionStatus::Disconnected {
1279            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1280        }
1281
1282        let mut events = Vec::new();
1283        self.initialize(true);
1284        self.status = ConnectionStatus::Connecting;
1285
1286        self.pingreq_keep_alive_ms = packet.keep_alive() as u64 * 1000;
1287
1288        // Handle clean_session flag
1289        if packet.clean_start() {
1290            self.clear_store_related();
1291        } else {
1292            self.need_store = true;
1293        }
1294
1295        // Clear topic alias for sending
1296        self.topic_alias_send = None;
1297
1298        events.push(GenericEvent::RequestSendPacket {
1299            packet: packet.into(),
1300            release_packet_id_if_send_error: None,
1301        });
1302        self.send_post_process(&mut events);
1303
1304        events
1305    }
1306
1307    /// Process v5.0 CONNECT packet - C++ constexpr if implementation
1308    pub(crate) fn process_send_v5_0_connect(
1309        &mut self,
1310        packet: v5_0::Connect,
1311    ) -> Vec<GenericEvent<PacketIdType>> {
1312        info!("send connect v5.0: {packet}");
1313        if !self.validate_maximum_packet_size_send(packet.size()) {
1314            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1315        }
1316        if self.status != ConnectionStatus::Disconnected {
1317            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1318        }
1319
1320        let mut events = Vec::new();
1321        self.initialize(true);
1322        self.status = ConnectionStatus::Connecting;
1323
1324        self.pingreq_keep_alive_ms = packet.keep_alive() as u64 * 1000;
1325
1326        // Handle clean_start flag
1327        if packet.clean_start() {
1328            self.clear_store_related();
1329        }
1330
1331        // Process properties
1332        for prop in packet.props() {
1333            match prop {
1334                Property::TopicAliasMaximum(val) => {
1335                    if val.val() != 0 {
1336                        self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1337                    }
1338                }
1339                Property::ReceiveMaximum(val) => {
1340                    debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1341                    self.publish_recv_max = Some(val.val());
1342                }
1343                Property::MaximumPacketSize(val) => {
1344                    debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1345                    self.maximum_packet_size_recv = val.val();
1346                }
1347                Property::SessionExpiryInterval(val) => {
1348                    if val.val() != 0 {
1349                        self.need_store = true;
1350                    }
1351                }
1352                _ => {
1353                    // Ignore other properties (equivalent to [](auto const&){} in C++)
1354                }
1355            }
1356        }
1357
1358        events.push(GenericEvent::RequestSendPacket {
1359            packet: packet.into(),
1360            release_packet_id_if_send_error: None,
1361        });
1362        self.send_post_process(&mut events);
1363
1364        events
1365    }
1366
1367    pub(crate) fn process_send_v3_1_1_connack(
1368        &mut self,
1369        packet: v3_1_1::Connack,
1370    ) -> Vec<GenericEvent<PacketIdType>> {
1371        info!("send connack v3.1.1: {packet}");
1372        if self.status != ConnectionStatus::Connecting {
1373            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1374        }
1375        let mut events = Vec::new();
1376        let rc = packet.return_code();
1377        events.push(GenericEvent::RequestSendPacket {
1378            packet: packet.into(),
1379            release_packet_id_if_send_error: None,
1380        });
1381        if rc != ConnectReturnCode::Accepted {
1382            self.status = ConnectionStatus::Disconnected;
1383            self.cancel_timers(&mut events);
1384            events.push(GenericEvent::RequestClose);
1385            return events;
1386        }
1387
1388        self.status = ConnectionStatus::Connected;
1389        events.extend(self.send_stored());
1390        self.send_post_process(&mut events);
1391
1392        events
1393    }
1394
1395    pub(crate) fn process_send_v5_0_connack(
1396        &mut self,
1397        packet: v5_0::Connack,
1398    ) -> Vec<GenericEvent<PacketIdType>> {
1399        info!("send connack v5.0: {packet}");
1400        if !self.validate_maximum_packet_size_send(packet.size()) {
1401            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1402        }
1403        if self.status != ConnectionStatus::Connecting {
1404            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1405        }
1406
1407        let mut events = Vec::new();
1408        let rc = packet.reason_code();
1409        if rc == ConnectReasonCode::Success {
1410            // Process properties
1411            for prop in packet.props() {
1412                match prop {
1413                    Property::TopicAliasMaximum(val) => {
1414                        if val.val() != 0 {
1415                            self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1416                        }
1417                    }
1418                    Property::ReceiveMaximum(val) => {
1419                        debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1420                        self.publish_recv_max = Some(val.val());
1421                    }
1422                    Property::MaximumPacketSize(val) => {
1423                        debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1424                        self.maximum_packet_size_recv = val.val();
1425                    }
1426                    Property::ServerKeepAlive(val) => {
1427                        let val = val.val();
1428                        if val == 0 {
1429                            if self.pingreq_recv_set {
1430                                self.pingreq_recv_set = false;
1431                                events
1432                                    .push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
1433                            }
1434                            self.pingreq_recv_timeout_ms = 0;
1435                        } else {
1436                            self.pingreq_recv_timeout_ms = val as u64 * 1000 * 3 / 2;
1437                            self.pingreq_recv_set = true;
1438                            events.push(GenericEvent::RequestTimerReset {
1439                                kind: TimerKind::PingreqRecv,
1440                                duration_ms: self.pingreq_recv_timeout_ms,
1441                            });
1442                        }
1443                    }
1444                    _ => {
1445                        // Ignore other properties
1446                    }
1447                }
1448            }
1449        }
1450        events.push(GenericEvent::RequestSendPacket {
1451            packet: packet.into(),
1452            release_packet_id_if_send_error: None,
1453        });
1454
1455        if rc != ConnectReasonCode::Success {
1456            self.status = ConnectionStatus::Disconnected;
1457            self.cancel_timers(&mut events);
1458            events.push(GenericEvent::RequestClose);
1459            return events;
1460        }
1461
1462        self.status = ConnectionStatus::Connected;
1463
1464        events.extend(self.send_stored());
1465        self.send_post_process(&mut events);
1466
1467        events
1468    }
1469
1470    pub(crate) fn process_send_v3_1_1_publish(
1471        &mut self,
1472        packet: v3_1_1::GenericPublish<PacketIdType>,
1473    ) -> Vec<GenericEvent<PacketIdType>> {
1474        let mut events = Vec::new();
1475        let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1476
1477        if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1478            // Register packet ID for QoS 1 or 2
1479            let packet_id = packet.packet_id().unwrap();
1480            if self.status != ConnectionStatus::Connected
1481                && !self.need_store
1482                && !self.offline_publish
1483            {
1484                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1485                if self.pid_man.is_used_id(packet_id) {
1486                    self.pid_man.release_id(packet_id);
1487                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1488                }
1489                return events;
1490            }
1491            if !self.pid_man.is_used_id(packet_id) {
1492                error!("packet_id {packet_id} must be acquired or registered");
1493                events.push(GenericEvent::NotifyError(
1494                    MqttError::PacketIdentifierInvalid,
1495                ));
1496                return events;
1497            }
1498            if self.need_store
1499                && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1500            {
1501                let store_packet = packet.clone().set_dup(true);
1502                self.store.add(store_packet.try_into().unwrap()).unwrap();
1503            } else {
1504                release_packet_id_if_send_error = Some(packet_id);
1505            }
1506            if packet.qos() == Qos::ExactlyOnce {
1507                self.qos2_publish_processing.insert(packet_id);
1508                self.pid_pubrec.insert(packet_id);
1509            } else {
1510                self.pid_puback.insert(packet_id);
1511            }
1512        } else if self.status != ConnectionStatus::Connected {
1513            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1514            return events;
1515        }
1516
1517        if self.status == ConnectionStatus::Connected {
1518            events.push(GenericEvent::RequestSendPacket {
1519                packet: packet.into(),
1520                release_packet_id_if_send_error,
1521            });
1522        }
1523        self.send_post_process(&mut events);
1524
1525        events
1526    }
1527
1528    pub(crate) fn process_send_v5_0_publish(
1529        &mut self,
1530        mut packet: v5_0::GenericPublish<PacketIdType>,
1531    ) -> Vec<GenericEvent<PacketIdType>> {
1532        if !self.validate_maximum_packet_size_send(packet.size()) {
1533            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1534        }
1535
1536        let mut events = Vec::new();
1537        let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1538        let mut topic_alias_validated = false;
1539        if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1540            let packet_id = packet.packet_id().unwrap();
1541            if self.status != ConnectionStatus::Connected
1542                && !self.need_store
1543                && !self.offline_publish
1544            {
1545                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1546                if self.pid_man.is_used_id(packet_id) {
1547                    self.pid_man.release_id(packet_id);
1548                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1549                }
1550                return events;
1551            }
1552
1553            // Extract topic_name from TopicAlias and remove TopicAlias property, then store it
1554            if !self.pid_man.is_used_id(packet_id) {
1555                error!("packet_id {packet_id} must be acquired or registered");
1556                events.push(GenericEvent::NotifyError(
1557                    MqttError::PacketIdentifierInvalid,
1558                ));
1559                return events;
1560            }
1561
1562            if self.need_store
1563                && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1564            {
1565                let ta_opt = Self::get_topic_alias_from_props(packet.props());
1566                if packet.topic_name().is_empty() {
1567                    // Topic name is empty, must validate topic alias
1568                    let topic_opt = self.validate_topic_alias(ta_opt);
1569                    if topic_opt.is_none() {
1570                        events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1571                        if self.pid_man.is_used_id(packet_id) {
1572                            self.pid_man.release_id(packet_id);
1573                            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1574                        }
1575                        return events;
1576                    }
1577                    topic_alias_validated = true;
1578                    let store_packet = packet
1579                        .clone()
1580                        .remove_topic_alias_add_topic(topic_opt.unwrap())
1581                        .unwrap()
1582                        .set_dup(true);
1583                    self.store.add(store_packet.try_into().unwrap()).unwrap();
1584                } else {
1585                    // Topic name is not empty, remove topic alias if present
1586                    let store_packet = packet.clone().remove_topic_alias().set_dup(true);
1587                    self.store.add(store_packet.try_into().unwrap()).unwrap();
1588                }
1589            } else {
1590                release_packet_id_if_send_error = Some(packet_id);
1591            }
1592            if packet.qos() == Qos::ExactlyOnce {
1593                self.qos2_publish_processing.insert(packet_id);
1594                self.pid_pubrec.insert(packet_id);
1595            } else {
1596                self.pid_puback.insert(packet_id);
1597            }
1598        } else if self.status != ConnectionStatus::Connected {
1599            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1600            return events;
1601        }
1602
1603        let packet_id_opt = packet.packet_id();
1604        let ta_opt = Self::get_topic_alias_from_props(packet.props());
1605        if packet.topic_name().is_empty() {
1606            // process manually provided TopicAlias
1607            if !topic_alias_validated && self.validate_topic_alias(ta_opt).is_none() {
1608                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1609                if let Some(packet_id) = packet_id_opt {
1610                    if self.pid_man.is_used_id(packet_id) {
1611                        self.pid_man.release_id(packet_id);
1612                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1613                    }
1614                }
1615                return events;
1616            }
1617        } else if let Some(ta) = ta_opt {
1618            // Topic alias is provided
1619            if self.validate_topic_alias_range(ta) {
1620                trace!(
1621                    "topic alias: {} - {} is registered.",
1622                    packet.topic_name(),
1623                    ta
1624                );
1625                if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1626                    topic_alias_send.insert_or_update(packet.topic_name(), ta);
1627                }
1628            } else {
1629                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1630                if let Some(packet_id) = packet_id_opt {
1631                    if self.pid_man.is_used_id(packet_id) {
1632                        self.pid_man.release_id(packet_id);
1633                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1634                    }
1635                }
1636                return events;
1637            }
1638        } else if self.status == ConnectionStatus::Connected {
1639            // process auto applying TopicAlias if the option is enabled
1640            if self.auto_map_topic_alias_send {
1641                if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1642                    if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1643                        trace!(
1644                            "topic alias: {} - {} is found.",
1645                            packet.topic_name(),
1646                            found_ta
1647                        );
1648                        packet = packet.remove_topic_add_topic_alias(found_ta);
1649                    } else {
1650                        let lru_ta = topic_alias_send.get_lru_alias();
1651                        topic_alias_send.insert_or_update(packet.topic_name(), lru_ta);
1652                        packet = packet.remove_topic_add_topic_alias(lru_ta);
1653                    }
1654                }
1655            } else if self.auto_replace_topic_alias_send {
1656                if let Some(ref topic_alias_send) = self.topic_alias_send {
1657                    if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1658                        trace!(
1659                            "topic alias: {} - {} is found.",
1660                            packet.topic_name(),
1661                            found_ta
1662                        );
1663                        packet = packet.remove_topic_add_topic_alias(found_ta);
1664                    }
1665                }
1666            }
1667        }
1668
1669        // Check receive_maximum for sending (QoS 1 and 2 packets)
1670        if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1671            if let Some(max) = self.publish_send_max {
1672                if self.publish_send_count == max {
1673                    events.push(GenericEvent::NotifyError(MqttError::ReceiveMaximumExceeded));
1674                    if let Some(packet_id) = packet_id_opt {
1675                        if self.pid_man.is_used_id(packet_id) {
1676                            self.pid_man.release_id(packet_id);
1677                            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1678                        }
1679                    }
1680                    return events;
1681                }
1682                self.publish_send_count += 1;
1683            }
1684        }
1685
1686        if self.status == ConnectionStatus::Connected {
1687            events.push(GenericEvent::RequestSendPacket {
1688                packet: packet.into(),
1689                release_packet_id_if_send_error,
1690            });
1691        }
1692        self.send_post_process(&mut events);
1693
1694        events
1695    }
1696
1697    pub(crate) fn process_send_v3_1_1_puback(
1698        &mut self,
1699        packet: v3_1_1::GenericPuback<PacketIdType>,
1700    ) -> Vec<GenericEvent<PacketIdType>> {
1701        if self.status != ConnectionStatus::Connected {
1702            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1703        }
1704        let mut events = Vec::new();
1705
1706        events.push(GenericEvent::RequestSendPacket {
1707            packet: packet.into(),
1708            release_packet_id_if_send_error: None,
1709        });
1710        self.send_post_process(&mut events);
1711
1712        events
1713    }
1714
1715    pub(crate) fn process_send_v5_0_puback(
1716        &mut self,
1717        packet: v5_0::GenericPuback<PacketIdType>,
1718    ) -> Vec<GenericEvent<PacketIdType>> {
1719        if !self.validate_maximum_packet_size_send(packet.size()) {
1720            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1721        }
1722        if self.status != ConnectionStatus::Connected {
1723            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1724        }
1725
1726        let mut events = Vec::new();
1727        self.publish_recv.remove(&packet.packet_id());
1728
1729        events.push(GenericEvent::RequestSendPacket {
1730            packet: packet.into(),
1731            release_packet_id_if_send_error: None,
1732        });
1733        self.send_post_process(&mut events);
1734
1735        events
1736    }
1737
1738    pub(crate) fn process_send_v3_1_1_pubrec(
1739        &mut self,
1740        packet: v3_1_1::GenericPubrec<PacketIdType>,
1741    ) -> Vec<GenericEvent<PacketIdType>> {
1742        if self.status != ConnectionStatus::Connected {
1743            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1744        }
1745        let mut events = Vec::new();
1746
1747        events.push(GenericEvent::RequestSendPacket {
1748            packet: packet.into(),
1749            release_packet_id_if_send_error: None,
1750        });
1751        self.send_post_process(&mut events);
1752
1753        events
1754    }
1755
1756    pub(crate) fn process_send_v5_0_pubrec(
1757        &mut self,
1758        packet: v5_0::GenericPubrec<PacketIdType>,
1759    ) -> Vec<GenericEvent<PacketIdType>> {
1760        if !self.validate_maximum_packet_size_send(packet.size()) {
1761            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1762        }
1763        if self.status != ConnectionStatus::Connected {
1764            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1765        }
1766
1767        let mut events = Vec::new();
1768        let packet_id = packet.packet_id();
1769
1770        if let Some(rc) = packet.reason_code() {
1771            if rc.is_failure() {
1772                self.publish_recv.remove(&packet_id);
1773                self.qos2_publish_handled.remove(&packet_id);
1774            }
1775        }
1776
1777        events.push(GenericEvent::RequestSendPacket {
1778            packet: packet.into(),
1779            release_packet_id_if_send_error: None,
1780        });
1781        self.send_post_process(&mut events);
1782
1783        events
1784    }
1785
1786    pub(crate) fn process_send_v3_1_1_pubrel(
1787        &mut self,
1788        packet: v3_1_1::GenericPubrel<PacketIdType>,
1789    ) -> Vec<GenericEvent<PacketIdType>> {
1790        if self.status != ConnectionStatus::Connected && !self.need_store {
1791            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1792        }
1793        let mut events = Vec::new();
1794        let packet_id = packet.packet_id();
1795        if !self.pid_man.is_used_id(packet_id) {
1796            error!("packet_id {packet_id} must be acquired or registered");
1797            events.push(GenericEvent::NotifyError(
1798                MqttError::PacketIdentifierInvalid,
1799            ));
1800            return events;
1801        }
1802        if self.need_store {
1803            self.store.add(packet.clone().try_into().unwrap()).unwrap();
1804        }
1805
1806        if self.status == ConnectionStatus::Connected {
1807            self.pid_pubcomp.insert(packet_id);
1808            events.push(GenericEvent::RequestSendPacket {
1809                packet: packet.into(),
1810                release_packet_id_if_send_error: None,
1811            });
1812        }
1813        self.send_post_process(&mut events);
1814
1815        events
1816    }
1817
1818    pub(crate) fn process_send_v5_0_pubrel(
1819        &mut self,
1820        packet: v5_0::GenericPubrel<PacketIdType>,
1821    ) -> Vec<GenericEvent<PacketIdType>> {
1822        if !self.validate_maximum_packet_size_send(packet.size()) {
1823            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1824        }
1825        if self.status != ConnectionStatus::Connected && !self.need_store {
1826            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1827        }
1828
1829        let mut events = Vec::new();
1830        let packet_id = packet.packet_id();
1831        if !self.pid_man.is_used_id(packet_id) {
1832            error!("packet_id {packet_id} must be acquired or registered");
1833            events.push(GenericEvent::NotifyError(
1834                MqttError::PacketIdentifierInvalid,
1835            ));
1836            return events;
1837        }
1838        if self.need_store {
1839            self.store.add(packet.clone().try_into().unwrap()).unwrap();
1840        }
1841
1842        if self.status == ConnectionStatus::Connected {
1843            self.pid_pubcomp.insert(packet_id);
1844            events.push(GenericEvent::RequestSendPacket {
1845                packet: packet.into(),
1846                release_packet_id_if_send_error: None,
1847            });
1848        }
1849        self.send_post_process(&mut events);
1850
1851        events
1852    }
1853
1854    pub(crate) fn process_send_v3_1_1_pubcomp(
1855        &mut self,
1856        packet: v3_1_1::GenericPubcomp<PacketIdType>,
1857    ) -> Vec<GenericEvent<PacketIdType>> {
1858        if self.status != ConnectionStatus::Connected {
1859            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1860        }
1861        let mut events = Vec::new();
1862
1863        events.push(GenericEvent::RequestSendPacket {
1864            packet: packet.into(),
1865            release_packet_id_if_send_error: None,
1866        });
1867        self.send_post_process(&mut events);
1868
1869        events
1870    }
1871
1872    pub(crate) fn process_send_v5_0_pubcomp(
1873        &mut self,
1874        packet: v5_0::GenericPubcomp<PacketIdType>,
1875    ) -> Vec<GenericEvent<PacketIdType>> {
1876        if !self.validate_maximum_packet_size_send(packet.size()) {
1877            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1878        }
1879        if self.status != ConnectionStatus::Connected {
1880            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1881        }
1882
1883        let mut events = Vec::new();
1884        self.publish_recv.remove(&packet.packet_id());
1885
1886        events.push(GenericEvent::RequestSendPacket {
1887            packet: packet.into(),
1888            release_packet_id_if_send_error: None,
1889        });
1890        self.send_post_process(&mut events);
1891
1892        events
1893    }
1894
1895    pub(crate) fn process_send_v3_1_1_subscribe(
1896        &mut self,
1897        packet: v3_1_1::GenericSubscribe<PacketIdType>,
1898    ) -> Vec<GenericEvent<PacketIdType>> {
1899        let mut events = Vec::new();
1900        let packet_id = packet.packet_id();
1901        if self.status != ConnectionStatus::Connected {
1902            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1903            if self.pid_man.is_used_id(packet_id) {
1904                self.pid_man.release_id(packet_id);
1905                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1906            }
1907            return events;
1908        }
1909        if !self.pid_man.is_used_id(packet_id) {
1910            error!("packet_id {packet_id} must be acquired or registered");
1911            events.push(GenericEvent::NotifyError(
1912                MqttError::PacketIdentifierInvalid,
1913            ));
1914            return events;
1915        }
1916        self.pid_suback.insert(packet_id);
1917
1918        events.push(GenericEvent::RequestSendPacket {
1919            packet: packet.into(),
1920            release_packet_id_if_send_error: Some(packet_id),
1921        });
1922        self.send_post_process(&mut events);
1923
1924        events
1925    }
1926
1927    pub(crate) fn process_send_v5_0_subscribe(
1928        &mut self,
1929        packet: v5_0::GenericSubscribe<PacketIdType>,
1930    ) -> Vec<GenericEvent<PacketIdType>> {
1931        if !self.validate_maximum_packet_size_send(packet.size()) {
1932            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1933        }
1934
1935        let mut events = Vec::new();
1936        let packet_id = packet.packet_id();
1937        if self.status != ConnectionStatus::Connected {
1938            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1939            if self.pid_man.is_used_id(packet_id) {
1940                self.pid_man.release_id(packet_id);
1941                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1942            }
1943            return events;
1944        }
1945        if !self.pid_man.is_used_id(packet_id) {
1946            error!("packet_id {packet_id} must be acquired or registered");
1947            events.push(GenericEvent::NotifyError(
1948                MqttError::PacketIdentifierInvalid,
1949            ));
1950            return events;
1951        }
1952        self.pid_suback.insert(packet_id);
1953
1954        events.push(GenericEvent::RequestSendPacket {
1955            packet: packet.into(),
1956            release_packet_id_if_send_error: Some(packet_id),
1957        });
1958        self.send_post_process(&mut events);
1959
1960        events
1961    }
1962
1963    pub(crate) fn process_send_v3_1_1_suback(
1964        &mut self,
1965        packet: v3_1_1::GenericSuback<PacketIdType>,
1966    ) -> Vec<GenericEvent<PacketIdType>> {
1967        if self.status != ConnectionStatus::Connected {
1968            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1969        }
1970        let mut events = Vec::new();
1971        events.push(GenericEvent::RequestSendPacket {
1972            packet: packet.into(),
1973            release_packet_id_if_send_error: None,
1974        });
1975        self.send_post_process(&mut events);
1976
1977        events
1978    }
1979
1980    pub(crate) fn process_send_v5_0_suback(
1981        &mut self,
1982        packet: v5_0::GenericSuback<PacketIdType>,
1983    ) -> Vec<GenericEvent<PacketIdType>> {
1984        if !self.validate_maximum_packet_size_send(packet.size()) {
1985            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1986        }
1987        if self.status != ConnectionStatus::Connected {
1988            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1989        }
1990
1991        let mut events = Vec::new();
1992        events.push(GenericEvent::RequestSendPacket {
1993            packet: packet.into(),
1994            release_packet_id_if_send_error: None,
1995        });
1996        self.send_post_process(&mut events);
1997
1998        events
1999    }
2000
2001    pub(crate) fn process_send_v3_1_1_unsubscribe(
2002        &mut self,
2003        packet: v3_1_1::GenericUnsubscribe<PacketIdType>,
2004    ) -> Vec<GenericEvent<PacketIdType>> {
2005        let mut events = Vec::new();
2006        let packet_id = packet.packet_id();
2007        if self.status != ConnectionStatus::Connected {
2008            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
2009            if self.pid_man.is_used_id(packet_id) {
2010                self.pid_man.release_id(packet_id);
2011                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2012            }
2013            return events;
2014        }
2015        if !self.pid_man.is_used_id(packet_id) {
2016            error!("packet_id {packet_id} must be acquired or registered");
2017            events.push(GenericEvent::NotifyError(
2018                MqttError::PacketIdentifierInvalid,
2019            ));
2020            return events;
2021        }
2022        self.pid_unsuback.insert(packet_id);
2023
2024        events.push(GenericEvent::RequestSendPacket {
2025            packet: packet.into(),
2026            release_packet_id_if_send_error: Some(packet_id),
2027        });
2028        self.send_post_process(&mut events);
2029
2030        events
2031    }
2032
2033    pub(crate) fn process_send_v5_0_unsubscribe(
2034        &mut self,
2035        packet: v5_0::GenericUnsubscribe<PacketIdType>,
2036    ) -> Vec<GenericEvent<PacketIdType>> {
2037        if !self.validate_maximum_packet_size_send(packet.size()) {
2038            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2039        }
2040
2041        let mut events = Vec::new();
2042        let packet_id = packet.packet_id();
2043        if self.status != ConnectionStatus::Connected {
2044            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
2045            if self.pid_man.is_used_id(packet_id) {
2046                self.pid_man.release_id(packet_id);
2047                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2048            }
2049            return events;
2050        }
2051        if !self.pid_man.is_used_id(packet_id) {
2052            error!("packet_id {packet_id} must be acquired or registered");
2053            events.push(GenericEvent::NotifyError(
2054                MqttError::PacketIdentifierInvalid,
2055            ));
2056            return events;
2057        }
2058        self.pid_unsuback.insert(packet_id);
2059
2060        events.push(GenericEvent::RequestSendPacket {
2061            packet: packet.into(),
2062            release_packet_id_if_send_error: Some(packet_id),
2063        });
2064        self.send_post_process(&mut events);
2065
2066        events
2067    }
2068
2069    pub(crate) fn process_send_v3_1_1_unsuback(
2070        &mut self,
2071        packet: v3_1_1::GenericUnsuback<PacketIdType>,
2072    ) -> Vec<GenericEvent<PacketIdType>> {
2073        if self.status != ConnectionStatus::Connected {
2074            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2075        }
2076        let mut events = Vec::new();
2077        events.push(GenericEvent::RequestSendPacket {
2078            packet: packet.into(),
2079            release_packet_id_if_send_error: None,
2080        });
2081        self.send_post_process(&mut events);
2082
2083        events
2084    }
2085
2086    pub(crate) fn process_send_v5_0_unsuback(
2087        &mut self,
2088        packet: v5_0::GenericUnsuback<PacketIdType>,
2089    ) -> Vec<GenericEvent<PacketIdType>> {
2090        if !self.validate_maximum_packet_size_send(packet.size()) {
2091            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2092        }
2093        if self.status != ConnectionStatus::Connected {
2094            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2095        }
2096
2097        let mut events = Vec::new();
2098        events.push(GenericEvent::RequestSendPacket {
2099            packet: packet.into(),
2100            release_packet_id_if_send_error: None,
2101        });
2102        self.send_post_process(&mut events);
2103
2104        events
2105    }
2106
2107    pub(crate) fn process_send_v3_1_1_pingreq(
2108        &mut self,
2109        packet: v3_1_1::Pingreq,
2110    ) -> Vec<GenericEvent<PacketIdType>> {
2111        if self.status != ConnectionStatus::Connected {
2112            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2113        }
2114        let mut events = Vec::new();
2115        events.push(GenericEvent::RequestSendPacket {
2116            packet: packet.into(),
2117            release_packet_id_if_send_error: None,
2118        });
2119        if self.pingresp_recv_timeout_ms != 0 {
2120            self.pingresp_recv_set = true;
2121            events.push(GenericEvent::RequestTimerReset {
2122                kind: TimerKind::PingrespRecv,
2123                duration_ms: self.pingresp_recv_timeout_ms,
2124            });
2125        }
2126        self.send_post_process(&mut events);
2127
2128        events
2129    }
2130
2131    pub(crate) fn process_send_v5_0_pingreq(
2132        &mut self,
2133        packet: v5_0::Pingreq,
2134    ) -> Vec<GenericEvent<PacketIdType>> {
2135        if !self.validate_maximum_packet_size_send(packet.size()) {
2136            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2137        }
2138        if self.status != ConnectionStatus::Connected {
2139            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2140        }
2141
2142        let mut events = Vec::new();
2143        events.push(GenericEvent::RequestSendPacket {
2144            packet: packet.into(),
2145            release_packet_id_if_send_error: None,
2146        });
2147        if self.pingresp_recv_timeout_ms != 0 {
2148            self.pingresp_recv_set = true;
2149            events.push(GenericEvent::RequestTimerReset {
2150                kind: TimerKind::PingrespRecv,
2151                duration_ms: self.pingresp_recv_timeout_ms,
2152            });
2153        }
2154        self.send_post_process(&mut events);
2155
2156        events
2157    }
2158
2159    pub(crate) fn process_send_v3_1_1_pingresp(
2160        &mut self,
2161        packet: v3_1_1::Pingresp,
2162    ) -> Vec<GenericEvent<PacketIdType>> {
2163        if self.status != ConnectionStatus::Connected {
2164            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2165        }
2166        let mut events = Vec::new();
2167        events.push(GenericEvent::RequestSendPacket {
2168            packet: packet.into(),
2169            release_packet_id_if_send_error: None,
2170        });
2171        self.send_post_process(&mut events);
2172
2173        events
2174    }
2175
2176    pub(crate) fn process_send_v5_0_pingresp(
2177        &mut self,
2178        packet: v5_0::Pingresp,
2179    ) -> Vec<GenericEvent<PacketIdType>> {
2180        if !self.validate_maximum_packet_size_send(packet.size()) {
2181            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2182        }
2183        if self.status != ConnectionStatus::Connected {
2184            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2185        }
2186
2187        let mut events = Vec::new();
2188        events.push(GenericEvent::RequestSendPacket {
2189            packet: packet.into(),
2190            release_packet_id_if_send_error: None,
2191        });
2192        self.send_post_process(&mut events);
2193
2194        events
2195    }
2196
2197    pub(crate) fn process_send_v3_1_1_disconnect(
2198        &mut self,
2199        packet: v3_1_1::Disconnect,
2200    ) -> Vec<GenericEvent<PacketIdType>> {
2201        if self.status != ConnectionStatus::Connected {
2202            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2203        }
2204        let mut events = Vec::new();
2205        self.status = ConnectionStatus::Disconnected;
2206        self.cancel_timers(&mut events);
2207        events.push(GenericEvent::RequestSendPacket {
2208            packet: packet.into(),
2209            release_packet_id_if_send_error: None,
2210        });
2211        events.push(GenericEvent::RequestClose);
2212
2213        events
2214    }
2215
2216    pub(crate) fn process_send_v5_0_disconnect(
2217        &mut self,
2218        packet: v5_0::Disconnect,
2219    ) -> Vec<GenericEvent<PacketIdType>> {
2220        if !self.validate_maximum_packet_size_send(packet.size()) {
2221            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2222        }
2223        if self.status != ConnectionStatus::Connected {
2224            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2225        }
2226
2227        let mut events = Vec::new();
2228        self.status = ConnectionStatus::Disconnected;
2229        self.cancel_timers(&mut events);
2230        events.push(GenericEvent::RequestSendPacket {
2231            packet: packet.into(),
2232            release_packet_id_if_send_error: None,
2233        });
2234        events.push(GenericEvent::RequestClose);
2235
2236        events
2237    }
2238
2239    pub(crate) fn process_send_v5_0_auth(
2240        &mut self,
2241        packet: v5_0::Auth,
2242    ) -> Vec<GenericEvent<PacketIdType>> {
2243        if !self.validate_maximum_packet_size_send(packet.size()) {
2244            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2245        }
2246        if self.status == ConnectionStatus::Disconnected {
2247            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2248        }
2249
2250        let mut events = Vec::new();
2251        events.push(GenericEvent::RequestSendPacket {
2252            packet: packet.into(),
2253            release_packet_id_if_send_error: None,
2254        });
2255        self.send_post_process(&mut events);
2256
2257        events
2258    }
2259
2260    fn send_post_process(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
2261        if self.is_client {
2262            // Priority 3
2263            let mut ms: u64 = self.pingreq_keep_alive_ms;
2264            if let Some(timeout_ms) = self.pingreq_user_send_interval_ms {
2265                // Priority 1
2266                ms = timeout_ms;
2267            } else if let Some(timeout_ms) = self.pingreq_server_keep_alive_ms {
2268                // Priority 2
2269                ms = timeout_ms;
2270            }
2271            if ms > 0 {
2272                self.pingreq_send_set = true;
2273                events.push(GenericEvent::RequestTimerReset {
2274                    kind: TimerKind::PingreqSend,
2275                    duration_ms: ms,
2276                });
2277            }
2278        }
2279    }
2280
2281    fn validate_maximum_packet_size_send(&self, size: usize) -> bool {
2282        if size > self.maximum_packet_size_send as usize {
2283            error!("packet size over maximum_packet_size for sending");
2284            return false;
2285        }
2286        true
2287    }
2288
2289    #[rustfmt::skip]
2290    fn can_receive(&self, packet_type: u8) -> bool {
2291        !((Role::IS_CLIENT &&
2292            (
2293                packet_type == 1 || // CONNECT
2294                packet_type == 8 || // SUBSCRIBE
2295                packet_type == 10 || // UNSUBSCRIBE
2296                packet_type == 12 || // PINGREQ
2297                (packet_type == 14 && self.protocol_version == Version::V3_1_1) || // DISCONNECT
2298                (packet_type == 15 && self.protocol_version == Version::V3_1_1) // AUTH
2299            )
2300        ) || (Role::IS_SERVER &&
2301            (
2302                packet_type == 2 || // CONNACK
2303                packet_type == 9 || // SUBACK
2304                packet_type == 11 || // UNSUBACK
2305                packet_type == 13 || // PINGRESP
2306                (packet_type == 15 && self.protocol_version == Version::V3_1_1) // AUTH
2307            )
2308        ))
2309    }
2310
2311    fn process_recv_packet(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
2312        let mut events = Vec::new();
2313
2314        // packet size limit validation (v3.1.1 is always satisfied)
2315        let total_size = remaining_length_to_total_size(raw_packet.remaining_length());
2316        if total_size > self.maximum_packet_size_recv {
2317            // This happens only when protocol version is V5.0.
2318            // On v3.1.1, the maximum packet size is always 268435455 (2^32 - 1).
2319            // If the packet size is over 268434555, feed() return an error.
2320            // maximum_packet_size_recv is set by sending CONNECT or CONNACK packet.
2321            // So DISCONNECT packet is the right choice to notify the error.
2322            let disconnect_packet = v5_0::Disconnect::builder()
2323                .reason_code(DisconnectReasonCode::PacketTooLarge)
2324                .build()
2325                .unwrap();
2326            // Send disconnect packet directly without generic constraints
2327            events.extend(self.process_send_v5_0_disconnect(disconnect_packet));
2328            events.push(GenericEvent::NotifyError(MqttError::PacketTooLarge));
2329            return events;
2330        }
2331
2332        let packet_type = raw_packet.packet_type();
2333        if !self.can_receive(packet_type) {
2334            events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2335            return events;
2336        }
2337
2338        let _flags = raw_packet.flags();
2339        match self.protocol_version {
2340            Version::V3_1_1 => {
2341                match packet_type {
2342                    1 => {
2343                        // CONNECT
2344                        events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2345                    }
2346                    2 => {
2347                        // CONNACK
2348                        events.extend(self.process_recv_v3_1_1_connack(raw_packet));
2349                    }
2350                    3 => {
2351                        // PUBLISH
2352                        events.extend(self.process_recv_v3_1_1_publish(raw_packet));
2353                    }
2354                    4 => {
2355                        // PUBACK
2356                        events.extend(self.process_recv_v3_1_1_puback(raw_packet));
2357                    }
2358                    5 => {
2359                        // PUBREC
2360                        events.extend(self.process_recv_v3_1_1_pubrec(raw_packet));
2361                    }
2362                    6 => {
2363                        // PUBREL
2364                        events.extend(self.process_recv_v3_1_1_pubrel(raw_packet));
2365                    }
2366                    7 => {
2367                        // PUBCOMP
2368                        events.extend(self.process_recv_v3_1_1_pubcomp(raw_packet));
2369                    }
2370                    8 => {
2371                        // SUBSCRIBE
2372                        events.extend(self.process_recv_v3_1_1_subscribe(raw_packet));
2373                    }
2374                    9 => {
2375                        // SUBACK
2376                        events.extend(self.process_recv_v3_1_1_suback(raw_packet));
2377                    }
2378                    10 => {
2379                        // UNSUBSCRIBE
2380                        events.extend(self.process_recv_v3_1_1_unsubscribe(raw_packet));
2381                    }
2382                    11 => {
2383                        // UNSUBACK
2384                        events.extend(self.process_recv_v3_1_1_unsuback(raw_packet));
2385                    }
2386                    12 => {
2387                        // PINGREQ
2388                        events.extend(self.process_recv_v3_1_1_pingreq(raw_packet));
2389                    }
2390                    13 => {
2391                        // PINGRESP
2392                        events.extend(self.process_recv_v3_1_1_pingresp(raw_packet));
2393                    }
2394                    14 => {
2395                        // DISCONNECT
2396                        events.extend(self.process_recv_v3_1_1_disconnect(raw_packet));
2397                    }
2398                    // invalid packet type
2399                    _ => {
2400                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2401                    }
2402                }
2403            }
2404            Version::V5_0 => {
2405                match packet_type {
2406                    1 => {
2407                        // CONNECT
2408                        events.extend(self.process_recv_v5_0_connect(raw_packet));
2409                    }
2410                    2 => {
2411                        // CONNACK
2412                        events.extend(self.process_recv_v5_0_connack(raw_packet));
2413                    }
2414                    3 => {
2415                        // PUBLISH
2416                        events.extend(self.process_recv_v5_0_publish(raw_packet));
2417                    }
2418                    4 => {
2419                        // PUBACK
2420                        events.extend(self.process_recv_v5_0_puback(raw_packet));
2421                    }
2422                    5 => {
2423                        // PUBREC
2424                        events.extend(self.process_recv_v5_0_pubrec(raw_packet));
2425                    }
2426                    6 => {
2427                        // PUBREL
2428                        events.extend(self.process_recv_v5_0_pubrel(raw_packet));
2429                    }
2430                    7 => {
2431                        // PUBCOMP
2432                        events.extend(self.process_recv_v5_0_pubcomp(raw_packet));
2433                    }
2434                    8 => {
2435                        // SUBSCRIBE
2436                        events.extend(self.process_recv_v5_0_subscribe(raw_packet));
2437                    }
2438                    9 => {
2439                        // SUBACK
2440                        events.extend(self.process_recv_v5_0_suback(raw_packet));
2441                    }
2442                    10 => {
2443                        // UNSUBSCRIBE
2444                        events.extend(self.process_recv_v5_0_unsubscribe(raw_packet));
2445                    }
2446                    11 => {
2447                        // UNSUBACK
2448                        events.extend(self.process_recv_v5_0_unsuback(raw_packet));
2449                    }
2450                    12 => {
2451                        // PINGREQ
2452                        events.extend(self.process_recv_v5_0_pingreq(raw_packet));
2453                    }
2454                    13 => {
2455                        // PINGRESP
2456                        events.extend(self.process_recv_v5_0_pingresp(raw_packet));
2457                    }
2458                    14 => {
2459                        // DISCONNECT
2460                        events.extend(self.process_recv_v5_0_disconnect(raw_packet));
2461                    }
2462                    15 => {
2463                        // AUTH
2464                        events.extend(self.process_recv_v5_0_auth(raw_packet));
2465                    }
2466                    // invalid packet type
2467                    _ => {
2468                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2469                    }
2470                }
2471            }
2472            Version::Undetermined => {
2473                match packet_type {
2474                    1 => {
2475                        // CONNECT
2476                        if raw_packet.remaining_length() < 7 {
2477                            events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2478                            return events;
2479                        }
2480                        match raw_packet.data_as_slice()[6] {
2481                            // Protocol Version
2482                            4 => {
2483                                self.protocol_version = Version::V3_1_1;
2484                                events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2485                            }
2486                            5 => {
2487                                self.protocol_version = Version::V5_0;
2488                                events.extend(self.process_recv_v5_0_connect(raw_packet));
2489                            }
2490                            _ => {
2491                                events.push(GenericEvent::NotifyError(
2492                                    MqttError::UnsupportedProtocolVersion,
2493                                ));
2494                            }
2495                        }
2496                    }
2497                    _ => {
2498                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2499                    }
2500                }
2501            }
2502        }
2503
2504        events
2505    }
2506
2507    fn process_recv_v3_1_1_connect(
2508        &mut self,
2509        raw_packet: RawPacket,
2510    ) -> Vec<GenericEvent<PacketIdType>> {
2511        let mut events = Vec::new();
2512        if self.status != ConnectionStatus::Disconnected {
2513            Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
2514            return events;
2515        }
2516        self.status = ConnectionStatus::Connecting;
2517        match v3_1_1::Connect::parse(raw_packet.data_as_slice()) {
2518            Ok((packet, _)) => {
2519                self.initialize(false);
2520                if packet.keep_alive() > 0 {
2521                    self.pingreq_recv_timeout_ms = (packet.keep_alive() as u64) * 1000 * 3 / 2;
2522                }
2523                if packet.clean_session() {
2524                    self.clear_store_related();
2525                } else {
2526                    self.need_store = true;
2527                }
2528                events.extend(self.refresh_pingreq_recv());
2529                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2530            }
2531            Err(e) => {
2532                let rc = match e {
2533                    MqttError::ClientIdentifierNotValid => ConnectReturnCode::IdentifierRejected,
2534                    MqttError::BadUserNameOrPassword => ConnectReturnCode::BadUserNameOrPassword,
2535                    MqttError::UnsupportedProtocolVersion => {
2536                        ConnectReturnCode::UnacceptableProtocolVersion
2537                    }
2538                    _ => ConnectReturnCode::NotAuthorized, // TBD close could be better
2539                };
2540                let connack = v3_1_1::Connack::builder()
2541                    .return_code(rc)
2542                    .session_present(false)
2543                    .build()
2544                    .unwrap();
2545                let connack_events = self.process_send_v3_1_1_connack(connack);
2546                events.extend(connack_events);
2547                events.push(GenericEvent::NotifyError(e));
2548            }
2549        }
2550
2551        events
2552    }
2553
2554    fn process_recv_v5_0_connect(
2555        &mut self,
2556        raw_packet: RawPacket,
2557    ) -> Vec<GenericEvent<PacketIdType>> {
2558        let mut events = Vec::new();
2559        if self.status != ConnectionStatus::Disconnected {
2560            self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
2561            return events;
2562        }
2563        self.status = ConnectionStatus::Connecting;
2564        match v5_0::Connect::parse(raw_packet.data_as_slice()) {
2565            Ok((packet, _)) => {
2566                self.initialize(false);
2567                if packet.keep_alive() > 0 {
2568                    self.pingreq_recv_timeout_ms = (packet.keep_alive() as u64) * 1000 * 3 / 2;
2569                }
2570                if packet.clean_start() {
2571                    self.clear_store_related();
2572                }
2573                packet.props().iter().for_each(|prop| match prop {
2574                    Property::TopicAliasMaximum(p) => {
2575                        self.topic_alias_send = Some(TopicAliasSend::new(p.val()));
2576                    }
2577                    Property::ReceiveMaximum(p) => {
2578                        self.publish_send_max = Some(p.val());
2579                    }
2580                    Property::MaximumPacketSize(p) => {
2581                        self.maximum_packet_size_send = p.val();
2582                    }
2583                    Property::SessionExpiryInterval(p) => {
2584                        if p.val() != 0 {
2585                            self.need_store = true;
2586                        }
2587                    }
2588                    _ => {}
2589                });
2590                events.extend(self.refresh_pingreq_recv());
2591                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2592            }
2593            Err(e) => {
2594                let rc = match e {
2595                    MqttError::ClientIdentifierNotValid => {
2596                        ConnectReasonCode::ClientIdentifierNotValid
2597                    }
2598                    MqttError::BadUserNameOrPassword => ConnectReasonCode::BadUserNameOrPassword,
2599                    MqttError::UnsupportedProtocolVersion => {
2600                        ConnectReasonCode::UnsupportedProtocolVersion
2601                    }
2602                    _ => ConnectReasonCode::UnspecifiedError,
2603                };
2604                let connack = v5_0::Connack::builder()
2605                    .reason_code(rc)
2606                    .session_present(false)
2607                    .build()
2608                    .unwrap();
2609                let connack_events = self.process_send_v5_0_connack(connack);
2610                events.extend(connack_events);
2611                events.push(GenericEvent::NotifyError(e));
2612            }
2613        }
2614
2615        events
2616    }
2617
2618    fn process_recv_v3_1_1_connack(
2619        &mut self,
2620        raw_packet: RawPacket,
2621    ) -> Vec<GenericEvent<PacketIdType>> {
2622        let mut events = Vec::new();
2623
2624        match v3_1_1::Connack::parse(raw_packet.data_as_slice()) {
2625            Ok((packet, _consumed)) => {
2626                if packet.return_code() == ConnectReturnCode::Accepted {
2627                    self.status = ConnectionStatus::Connected;
2628                    if packet.session_present() {
2629                        events.extend(self.send_stored());
2630                    } else {
2631                        self.clear_store_related();
2632                    }
2633                }
2634                events.push(GenericEvent::NotifyPacketReceived(
2635                    GenericPacket::V3_1_1Connack(packet),
2636                ));
2637            }
2638            Err(e) => {
2639                Self::handle_v3_1_1_error(e, &mut events);
2640            }
2641        }
2642
2643        events
2644    }
2645
2646    fn process_recv_v5_0_connack(
2647        &mut self,
2648        raw_packet: RawPacket,
2649    ) -> Vec<GenericEvent<PacketIdType>> {
2650        let mut events = Vec::new();
2651
2652        match v5_0::Connack::parse(raw_packet.data_as_slice()) {
2653            Ok((packet, _consumed)) => {
2654                if packet.reason_code() == ConnectReasonCode::Success {
2655                    self.status = ConnectionStatus::Connected;
2656
2657                    // Process properties
2658                    for prop in packet.props() {
2659                        match prop {
2660                            Property::TopicAliasMaximum(val) => {
2661                                if val.val() > 0 {
2662                                    self.topic_alias_send = Some(TopicAliasSend::new(val.val()));
2663                                }
2664                            }
2665                            Property::ReceiveMaximum(val) => {
2666                                assert!(val.val() != 0);
2667                                self.publish_send_max = Some(val.val());
2668                            }
2669                            Property::MaximumPacketSize(val) => {
2670                                assert!(val.val() != 0);
2671                                self.maximum_packet_size_send = val.val();
2672                            }
2673                            Property::ServerKeepAlive(val) => {
2674                                let val = val.val() as u64 * 1000;
2675                                self.pingreq_server_keep_alive_ms = Some(val);
2676                                if self.pingreq_user_send_interval_ms.is_none() {
2677                                    if val == 0 {
2678                                        if self.pingreq_send_set {
2679                                            self.pingreq_send_set = false;
2680                                            events.push(GenericEvent::RequestTimerCancel(
2681                                                TimerKind::PingreqSend,
2682                                            ));
2683                                        }
2684                                        self.pingreq_user_send_interval_ms = None;
2685                                    } else {
2686                                        self.pingreq_send_set = true;
2687                                        events.push(GenericEvent::RequestTimerReset {
2688                                            kind: TimerKind::PingreqSend,
2689                                            duration_ms: val,
2690                                        });
2691                                    }
2692                                }
2693                            }
2694                            _ => {
2695                                // Ignore other properties
2696                            }
2697                        }
2698                    }
2699
2700                    if packet.session_present() {
2701                        events.extend(self.send_stored());
2702                    } else {
2703                        self.clear_store_related();
2704                    }
2705                }
2706                events.push(GenericEvent::NotifyPacketReceived(
2707                    GenericPacket::V5_0Connack(packet),
2708                ));
2709            }
2710            Err(e) => {
2711                if self.status == ConnectionStatus::Connected {
2712                    self.handle_v5_0_error(e, &mut events);
2713                } else {
2714                    events.push(GenericEvent::NotifyError(e));
2715                }
2716            }
2717        }
2718
2719        events
2720    }
2721
2722    fn process_recv_v3_1_1_publish(
2723        &mut self,
2724        raw_packet: RawPacket,
2725    ) -> Vec<GenericEvent<PacketIdType>> {
2726        let mut events = Vec::new();
2727
2728        let flags = raw_packet.flags();
2729        match &raw_packet.data {
2730            PacketData::Publish(arc) => {
2731                match v3_1_1::GenericPublish::parse(flags, arc.clone()) {
2732                    Ok((packet, _consumed)) => {
2733                        match packet.qos() {
2734                            Qos::AtMostOnce => {
2735                                events.extend(self.refresh_pingreq_recv());
2736                                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2737                            }
2738                            Qos::AtLeastOnce => {
2739                                let packet_id = packet.packet_id().unwrap();
2740                                if self.status == ConnectionStatus::Connected
2741                                    && self.auto_pub_response
2742                                {
2743                                    // Send PUBACK automatically
2744                                    let puback = v3_1_1::GenericPuback::builder()
2745                                        .packet_id(packet_id)
2746                                        .build()
2747                                        .unwrap();
2748                                    events.extend(self.process_send_v3_1_1_puback(puback));
2749                                }
2750                                events.extend(self.refresh_pingreq_recv());
2751                                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2752                            }
2753                            Qos::ExactlyOnce => {
2754                                let packet_id = packet.packet_id().unwrap();
2755                                let already_handled = !self.qos2_publish_handled.insert(packet_id);
2756
2757                                if self.status == ConnectionStatus::Connected
2758                                    && (self.auto_pub_response || already_handled)
2759                                {
2760                                    let pubrec = v3_1_1::GenericPubrec::builder()
2761                                        .packet_id(packet_id)
2762                                        .build()
2763                                        .unwrap();
2764                                    events.extend(self.process_send_v3_1_1_pubrec(pubrec));
2765                                }
2766                                events.extend(self.refresh_pingreq_recv());
2767                                if !already_handled {
2768                                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2769                                }
2770                            }
2771                        }
2772                    }
2773                    Err(e) => {
2774                        Self::handle_v3_1_1_error(e, &mut events);
2775                    }
2776                }
2777            }
2778            PacketData::Normal(_) => {
2779                unreachable!("PUBLISH packet must use PacketData::Publish variant");
2780            }
2781        }
2782
2783        events
2784    }
2785
    /// Handles a received v5.0 PUBLISH packet.
    ///
    /// Steps, in order:
    /// 1. Parse the packet from the `PacketData::Publish` payload.
    /// 2. For QoS 1 and QoS 2, enforce `publish_recv_max` against the number of
    ///    ids in `publish_recv`, then record the packet id. For QoS 2 the id is
    ///    also recorded in `qos2_publish_handled`; an id that was already
    ///    there marks the packet as already handled.
    /// 3. Resolve or register the topic alias. An empty topic name requires a
    ///    known alias; a non-empty topic name with an alias registers or
    ///    updates the mapping. Any alias problem is reported through
    ///    `handle_v5_0_error` with `MqttError::TopicAliasInvalid`.
    /// 4. Send PUBACK / PUBREC when decided in step 2.
    /// 5. Refresh the PINGREQ receive timer.
    /// 6. Notify the packet unless it was already handled.
    ///
    /// A parse error is routed through `handle_v5_0_error` when connected, and
    /// is only notified otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `raw_packet` does not carry `PacketData::Publish`.
    fn process_recv_v5_0_publish(
        &mut self,
        raw_packet: RawPacket,
    ) -> Vec<GenericEvent<PacketIdType>> {
        let mut events = Vec::new();

        let flags = raw_packet.flags();
        match &raw_packet.data {
            PacketData::Publish(arc) => {
                match v5_0::GenericPublish::parse(flags, arc.clone()) {
                    Ok((mut packet, _consumed)) => {
                        // Decisions made while inspecting the QoS; the actual
                        // responses are sent after topic alias handling.
                        let mut already_handled = false;
                        let mut puback_send = false;
                        let mut pubrec_send = false;

                        // Returns false (after reporting ReceiveMaximumExceeded)
                        // when the number of recorded ids has already reached
                        // the configured receive maximum.
                        let mut check_receive_maximum =
                            |events: &mut Vec<GenericEvent<PacketIdType>>| {
                                if let Some(max) = self.publish_recv_max {
                                    if self.publish_recv.len() >= max as usize {
                                        self.handle_v5_0_error(
                                            MqttError::ReceiveMaximumExceeded,
                                            events,
                                        );
                                        return false;
                                    }
                                }
                                true
                            };

                        // NOTE(review): packet ids are recorded below before the
                        // topic alias is validated, so they stay recorded when an
                        // alias error is reported — presumably fine if
                        // handle_v5_0_error ends the connection; confirm.
                        match packet.qos() {
                            Qos::AtLeastOnce => {
                                let packet_id = packet.packet_id().unwrap();
                                if !check_receive_maximum(&mut events) {
                                    return events;
                                }
                                self.publish_recv.insert(packet_id);
                                if self.auto_pub_response
                                    && self.status == ConnectionStatus::Connected
                                {
                                    puback_send = true;
                                }
                            }
                            Qos::ExactlyOnce => {
                                let packet_id = packet.packet_id().unwrap();
                                if !check_receive_maximum(&mut events) {
                                    return events;
                                }
                                self.publish_recv.insert(packet_id);

                                // `insert` returning false means this id was
                                // already recorded as handled.
                                if !self.qos2_publish_handled.insert(packet_id) {
                                    already_handled = true;
                                }
                                // An already handled packet is answered with
                                // PUBREC even when automatic responses are off.
                                if self.status == ConnectionStatus::Connected
                                    && (self.auto_pub_response || already_handled)
                                {
                                    pubrec_send = true;
                                }
                            }
                            Qos::AtMostOnce => {
                                // No packet ID handling for QoS 0
                            }
                        }

                        // Topic Alias handling
                        if packet.topic_name().is_empty() {
                            // Extract topic from topic_alias
                            if let Some(ta) = Self::get_topic_alias_from_props(packet.props()) {
                                // The alias must be non-zero and within the
                                // maximum of the receive-side alias table.
                                if ta == 0
                                    || self.topic_alias_recv.is_none()
                                    || ta > self.topic_alias_recv.as_ref().unwrap().max()
                                {
                                    self.handle_v5_0_error(
                                        MqttError::TopicAliasInvalid,
                                        &mut events,
                                    );
                                    return events;
                                }

                                if let Some(ref topic_alias_recv) = self.topic_alias_recv {
                                    if let Some(topic_name) = topic_alias_recv.get(ta) {
                                        // Replace the packet with one carrying
                                        // the resolved topic name.
                                        match packet.add_extracted_topic_name(topic_name) {
                                            Ok(extracted) => {
                                                packet = extracted;
                                            }
                                            Err(_e) => {
                                                error!("topic alias extract failed: {_e}");
                                                self.handle_v5_0_error(
                                                    MqttError::TopicAliasInvalid,
                                                    &mut events,
                                                );
                                                return events;
                                            }
                                        }
                                    } else {
                                        // The alias has no registered topic name.
                                        self.handle_v5_0_error(
                                            MqttError::TopicAliasInvalid,
                                            &mut events,
                                        );
                                        return events;
                                    }
                                }
                            } else {
                                // Empty topic name without an alias property.
                                self.handle_v5_0_error(MqttError::TopicAliasInvalid, &mut events);
                                return events;
                            }
                        } else {
                            // Topic is not empty, check if topic alias needs to be registered
                            if let Some(ta) = Self::get_topic_alias_from_props(packet.props()) {
                                if ta == 0
                                    || self.topic_alias_recv.is_none()
                                    || ta > self.topic_alias_recv.as_ref().unwrap().max()
                                {
                                    self.handle_v5_0_error(
                                        MqttError::TopicAliasInvalid,
                                        &mut events,
                                    );
                                    return events;
                                }
                                if let Some(ref mut topic_alias_recv) = self.topic_alias_recv {
                                    topic_alias_recv.insert_or_update(packet.topic_name(), ta);
                                }
                            }
                        }

                        // Send response packets
                        if puback_send {
                            let puback = v5_0::GenericPuback::builder()
                                .packet_id(packet.packet_id().unwrap())
                                .build()
                                .unwrap();
                            events.extend(self.process_send_v5_0_puback(puback));
                        }
                        if pubrec_send {
                            let pubrec = v5_0::GenericPubrec::builder()
                                .packet_id(packet.packet_id().unwrap())
                                .build()
                                .unwrap();
                            events.extend(self.process_send_v5_0_pubrec(pubrec));
                        }

                        // Refresh PINGREQ receive timer
                        events.extend(self.refresh_pingreq_recv());

                        // Notify packet received (only if not already handled)
                        if !already_handled {
                            events.push(GenericEvent::NotifyPacketReceived(packet.into()));
                        }
                    }
                    Err(e) => {
                        // Only a connected endpoint goes through the full v5.0
                        // error handling; otherwise the error is just reported.
                        if self.status == ConnectionStatus::Connected {
                            self.handle_v5_0_error(e, &mut events);
                        } else {
                            events.push(GenericEvent::NotifyError(e));
                        }
                    }
                }
            }
            PacketData::Normal(_) => {
                unreachable!("PUBLISH packet must use PacketData::Publish variant");
            }
        }

        events
    }
2950
2951    fn process_recv_v3_1_1_puback(
2952        &mut self,
2953        raw_packet: RawPacket,
2954    ) -> Vec<GenericEvent<PacketIdType>> {
2955        let mut events = Vec::new();
2956
2957        match v3_1_1::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2958            Ok((packet, _)) => {
2959                let packet_id = packet.packet_id();
2960                if self.pid_puback.remove(&packet_id) {
2961                    self.store.erase(ResponsePacket::V3_1_1Puback, packet_id);
2962                    if self.pid_man.is_used_id(packet_id) {
2963                        self.pid_man.release_id(packet_id);
2964                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2965                    }
2966                    events.extend(self.refresh_pingreq_recv());
2967                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2968                } else {
2969                    Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
2970                }
2971            }
2972            Err(e) => {
2973                Self::handle_v3_1_1_error(e, &mut events);
2974            }
2975        }
2976
2977        events
2978    }
2979
2980    fn process_recv_v5_0_puback(
2981        &mut self,
2982        raw_packet: RawPacket,
2983    ) -> Vec<GenericEvent<PacketIdType>> {
2984        let mut events = Vec::new();
2985
2986        match v5_0::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2987            Ok((packet, _)) => {
2988                let packet_id = packet.packet_id();
2989                if self.pid_puback.remove(&packet_id) {
2990                    self.store.erase(ResponsePacket::V5_0Puback, packet_id);
2991                    if self.pid_man.is_used_id(packet_id) {
2992                        self.pid_man.release_id(packet_id);
2993                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2994                    }
2995                    if self.publish_send_max.is_some() {
2996                        self.publish_send_count -= 1;
2997                    }
2998                    events.extend(self.refresh_pingreq_recv());
2999                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3000                } else {
3001                    self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
3002                }
3003            }
3004            Err(e) => {
3005                self.handle_v5_0_error(e, &mut events);
3006            }
3007        }
3008
3009        events
3010    }
3011
3012    fn process_recv_v3_1_1_pubrec(
3013        &mut self,
3014        raw_packet: RawPacket,
3015    ) -> Vec<GenericEvent<PacketIdType>> {
3016        let mut events = Vec::new();
3017
3018        match v3_1_1::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3019            Ok((packet, _)) => {
3020                let packet_id = packet.packet_id();
3021                if self.pid_pubrec.remove(&packet_id) {
3022                    self.store.erase(ResponsePacket::V3_1_1Pubrec, packet_id);
3023                    if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3024                        let pubrel = v3_1_1::GenericPubrel::<PacketIdType>::builder()
3025                            .packet_id(packet_id)
3026                            .build()
3027                            .unwrap();
3028                        events.extend(self.process_send_v3_1_1_pubrel(pubrel));
3029                    }
3030                    events.extend(self.refresh_pingreq_recv());
3031                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3032                } else {
3033                    Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
3034                }
3035            }
3036            Err(e) => {
3037                Self::handle_v3_1_1_error(e, &mut events);
3038            }
3039        }
3040
3041        events
3042    }
3043
3044    fn process_recv_v5_0_pubrec(
3045        &mut self,
3046        raw_packet: RawPacket,
3047    ) -> Vec<GenericEvent<PacketIdType>> {
3048        let mut events = Vec::new();
3049
3050        match v5_0::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3051            Ok((packet, _)) => {
3052                let packet_id = packet.packet_id();
3053                if self.pid_pubrec.remove(&packet_id) {
3054                    self.store.erase(ResponsePacket::V5_0Pubrec, packet_id);
3055                    let reason_code = packet.reason_code();
3056                    if reason_code.is_none() || reason_code.unwrap() == PubrecReasonCode::Success {
3057                        if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3058                            let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3059                                .packet_id(packet_id)
3060                                .build()
3061                                .unwrap();
3062                            events.extend(self.process_send_v5_0_pubrel(pubrel));
3063                        }
3064                    } else {
3065                        if self.pid_man.is_used_id(packet_id) {
3066                            self.pid_man.release_id(packet_id);
3067                            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3068                        }
3069                        self.qos2_publish_processing.remove(&packet_id);
3070                        if self.publish_send_max.is_some() {
3071                            self.publish_send_count -= 1;
3072                        }
3073                    }
3074                    events.extend(self.refresh_pingreq_recv());
3075                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3076                } else {
3077                    self.handle_v5_0_error(
3078                        MqttError::from(DisconnectReasonCode::ProtocolError),
3079                        &mut events,
3080                    );
3081                }
3082            }
3083            Err(e) => {
3084                self.handle_v5_0_error(e, &mut events);
3085            }
3086        }
3087
3088        events
3089    }
3090
3091    fn process_recv_v3_1_1_pubrel(
3092        &mut self,
3093        raw_packet: RawPacket,
3094    ) -> Vec<GenericEvent<PacketIdType>> {
3095        let mut events = Vec::new();
3096
3097        match v3_1_1::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3098            Ok((packet, _)) => {
3099                let packet_id = packet.packet_id();
3100                self.qos2_publish_handled.remove(&packet_id);
3101                if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3102                    let pubcomp = v3_1_1::GenericPubcomp::<PacketIdType>::builder()
3103                        .packet_id(packet_id)
3104                        .build()
3105                        .unwrap();
3106                    events.extend(self.process_send_v3_1_1_pubcomp(pubcomp));
3107                }
3108                events.extend(self.refresh_pingreq_recv());
3109                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3110            }
3111            Err(e) => {
3112                Self::handle_v3_1_1_error(e, &mut events);
3113            }
3114        }
3115
3116        events
3117    }
3118
3119    fn process_recv_v5_0_pubrel(
3120        &mut self,
3121        raw_packet: RawPacket,
3122    ) -> Vec<GenericEvent<PacketIdType>> {
3123        let mut events = Vec::new();
3124
3125        match v5_0::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3126            Ok((packet, _)) => {
3127                let packet_id = packet.packet_id();
3128                self.qos2_publish_handled.remove(&packet_id);
3129                if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3130                    let pubcomp = v5_0::GenericPubcomp::<PacketIdType>::builder()
3131                        .packet_id(packet_id)
3132                        .build()
3133                        .unwrap();
3134                    events.extend(self.process_send_v5_0_pubcomp(pubcomp));
3135                }
3136                events.extend(self.refresh_pingreq_recv());
3137                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3138            }
3139            Err(e) => {
3140                self.handle_v5_0_error(e, &mut events);
3141            }
3142        }
3143
3144        events
3145    }
3146
3147    fn process_recv_v3_1_1_pubcomp(
3148        &mut self,
3149        raw_packet: RawPacket,
3150    ) -> Vec<GenericEvent<PacketIdType>> {
3151        let mut events = Vec::new();
3152
3153        match v3_1_1::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3154            Ok((packet, _)) => {
3155                let packet_id = packet.packet_id();
3156                if self.pid_pubcomp.remove(&packet_id) {
3157                    self.store.erase(ResponsePacket::V3_1_1Pubcomp, packet_id);
3158                    if self.pid_man.is_used_id(packet_id) {
3159                        self.pid_man.release_id(packet_id);
3160                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3161                    }
3162                    self.qos2_publish_processing.remove(&packet_id);
3163                    events.extend(self.refresh_pingreq_recv());
3164                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3165                } else {
3166                    Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
3167                }
3168            }
3169            Err(e) => {
3170                Self::handle_v3_1_1_error(e, &mut events);
3171            }
3172        }
3173
3174        events
3175    }
3176
3177    fn process_recv_v5_0_pubcomp(
3178        &mut self,
3179        raw_packet: RawPacket,
3180    ) -> Vec<GenericEvent<PacketIdType>> {
3181        let mut events = Vec::new();
3182
3183        match v5_0::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3184            Ok((packet, _)) => {
3185                let packet_id = packet.packet_id();
3186                if self.pid_pubcomp.remove(&packet_id) {
3187                    self.store.erase(ResponsePacket::V5_0Pubcomp, packet_id);
3188                    if self.pid_man.is_used_id(packet_id) {
3189                        self.pid_man.release_id(packet_id);
3190                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3191                    }
3192                    self.qos2_publish_processing.remove(&packet_id);
3193                    if self.publish_send_max.is_some() {
3194                        self.publish_send_count -= 1;
3195                    }
3196                    events.extend(self.refresh_pingreq_recv());
3197                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3198                } else {
3199                    self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
3200                }
3201            }
3202            Err(e) => {
3203                self.handle_v5_0_error(e, &mut events);
3204            }
3205        }
3206
3207        events
3208    }
3209
3210    fn process_recv_v3_1_1_subscribe(
3211        &mut self,
3212        raw_packet: RawPacket,
3213    ) -> Vec<GenericEvent<PacketIdType>> {
3214        let mut events = Vec::new();
3215
3216        match v3_1_1::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3217            Ok((packet, _)) => {
3218                events.extend(self.refresh_pingreq_recv());
3219                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3220            }
3221            Err(e) => {
3222                Self::handle_v3_1_1_error(e, &mut events);
3223            }
3224        }
3225
3226        events
3227    }
3228
3229    fn process_recv_v5_0_subscribe(
3230        &mut self,
3231        raw_packet: RawPacket,
3232    ) -> Vec<GenericEvent<PacketIdType>> {
3233        let mut events = Vec::new();
3234
3235        match v5_0::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3236            Ok((packet, _)) => {
3237                events.extend(self.refresh_pingreq_recv());
3238                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3239            }
3240            Err(e) => {
3241                self.handle_v5_0_error(e, &mut events);
3242            }
3243        }
3244
3245        events
3246    }
3247
3248    fn process_recv_v3_1_1_suback(
3249        &mut self,
3250        raw_packet: RawPacket,
3251    ) -> Vec<GenericEvent<PacketIdType>> {
3252        let mut events = Vec::new();
3253
3254        match v3_1_1::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3255            Ok((packet, _)) => {
3256                let packet_id = packet.packet_id();
3257                if self.pid_suback.remove(&packet_id) {
3258                    if self.pid_man.is_used_id(packet_id) {
3259                        self.pid_man.release_id(packet_id);
3260                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3261                    }
3262                    events.extend(self.refresh_pingreq_recv());
3263                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3264                } else {
3265                    Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
3266                }
3267            }
3268            Err(e) => {
3269                Self::handle_v3_1_1_error(e, &mut events);
3270            }
3271        }
3272
3273        events
3274    }
3275
3276    fn process_recv_v5_0_suback(
3277        &mut self,
3278        raw_packet: RawPacket,
3279    ) -> Vec<GenericEvent<PacketIdType>> {
3280        let mut events = Vec::new();
3281
3282        match v5_0::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3283            Ok((packet, _)) => {
3284                let packet_id = packet.packet_id();
3285                if self.pid_suback.remove(&packet_id) {
3286                    if self.pid_man.is_used_id(packet_id) {
3287                        self.pid_man.release_id(packet_id);
3288                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3289                    }
3290                    events.extend(self.refresh_pingreq_recv());
3291                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3292                } else {
3293                    self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
3294                }
3295            }
3296            Err(e) => {
3297                self.handle_v5_0_error(e, &mut events);
3298            }
3299        }
3300
3301        events
3302    }
3303
3304    fn process_recv_v3_1_1_unsubscribe(
3305        &mut self,
3306        raw_packet: RawPacket,
3307    ) -> Vec<GenericEvent<PacketIdType>> {
3308        let mut events = Vec::new();
3309
3310        match v3_1_1::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3311            Ok((packet, _)) => {
3312                events.extend(self.refresh_pingreq_recv());
3313                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3314            }
3315            Err(e) => {
3316                Self::handle_v3_1_1_error(e, &mut events);
3317            }
3318        }
3319
3320        events
3321    }
3322
3323    fn process_recv_v5_0_unsubscribe(
3324        &mut self,
3325        raw_packet: RawPacket,
3326    ) -> Vec<GenericEvent<PacketIdType>> {
3327        let mut events = Vec::new();
3328
3329        match v5_0::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3330            Ok((packet, _)) => {
3331                events.extend(self.refresh_pingreq_recv());
3332                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3333            }
3334            Err(e) => {
3335                self.handle_v5_0_error(e, &mut events);
3336            }
3337        }
3338
3339        events
3340    }
3341
3342    fn process_recv_v3_1_1_unsuback(
3343        &mut self,
3344        raw_packet: RawPacket,
3345    ) -> Vec<GenericEvent<PacketIdType>> {
3346        let mut events = Vec::new();
3347
3348        match v3_1_1::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3349            Ok((packet, _)) => {
3350                let packet_id = packet.packet_id();
3351                if self.pid_unsuback.remove(&packet_id) {
3352                    if self.pid_man.is_used_id(packet_id) {
3353                        self.pid_man.release_id(packet_id);
3354                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3355                    }
3356                    events.extend(self.refresh_pingreq_recv());
3357                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3358                } else {
3359                    Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
3360                }
3361            }
3362            Err(e) => {
3363                Self::handle_v3_1_1_error(e, &mut events);
3364            }
3365        }
3366
3367        events
3368    }
3369
3370    fn process_recv_v5_0_unsuback(
3371        &mut self,
3372        raw_packet: RawPacket,
3373    ) -> Vec<GenericEvent<PacketIdType>> {
3374        let mut events = Vec::new();
3375
3376        match v5_0::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3377            Ok((packet, _)) => {
3378                let packet_id = packet.packet_id();
3379                if self.pid_unsuback.remove(&packet_id) {
3380                    if self.pid_man.is_used_id(packet_id) {
3381                        self.pid_man.release_id(packet_id);
3382                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3383                    }
3384                    events.extend(self.refresh_pingreq_recv());
3385                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3386                } else {
3387                    self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
3388                }
3389            }
3390            Err(e) => {
3391                self.handle_v5_0_error(e, &mut events);
3392            }
3393        }
3394
3395        events
3396    }
3397
3398    fn process_recv_v3_1_1_pingreq(
3399        &mut self,
3400        raw_packet: RawPacket,
3401    ) -> Vec<GenericEvent<PacketIdType>> {
3402        let mut events = Vec::new();
3403
3404        match v3_1_1::Pingreq::parse(raw_packet.data_as_slice()) {
3405            Ok((packet, _)) => {
3406                if (Role::IS_SERVER || Role::IS_ANY)
3407                    && !self.is_client
3408                    && self.auto_ping_response
3409                    && self.status == ConnectionStatus::Connected
3410                {
3411                    let pingresp = v3_1_1::Pingresp::new();
3412                    events.extend(self.process_send_v3_1_1_pingresp(pingresp));
3413                }
3414                events.extend(self.refresh_pingreq_recv());
3415                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3416            }
3417            Err(e) => {
3418                Self::handle_v3_1_1_error(e, &mut events);
3419            }
3420        }
3421
3422        events
3423    }
3424
3425    fn process_recv_v5_0_pingreq(
3426        &mut self,
3427        raw_packet: RawPacket,
3428    ) -> Vec<GenericEvent<PacketIdType>> {
3429        let mut events = Vec::new();
3430
3431        match v5_0::Pingreq::parse(raw_packet.data_as_slice()) {
3432            Ok((packet, _)) => {
3433                if (Role::IS_SERVER || Role::IS_ANY)
3434                    && !self.is_client
3435                    && self.auto_ping_response
3436                    && self.status == ConnectionStatus::Connected
3437                {
3438                    let pingresp = v5_0::Pingresp::new();
3439                    events.extend(self.process_send_v5_0_pingresp(pingresp));
3440                }
3441                events.extend(self.refresh_pingreq_recv());
3442                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3443            }
3444            Err(e) => {
3445                self.handle_v5_0_error(e, &mut events);
3446            }
3447        }
3448
3449        events
3450    }
3451
3452    fn process_recv_v3_1_1_pingresp(
3453        &mut self,
3454        raw_packet: RawPacket,
3455    ) -> Vec<GenericEvent<PacketIdType>> {
3456        let mut events = Vec::new();
3457
3458        match v3_1_1::Pingresp::parse(raw_packet.data_as_slice()) {
3459            Ok((packet, _)) => {
3460                if self.pingresp_recv_set {
3461                    self.pingresp_recv_set = false;
3462                    events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3463                }
3464                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3465            }
3466            Err(e) => {
3467                Self::handle_v3_1_1_error(e, &mut events);
3468            }
3469        }
3470
3471        events
3472    }
3473
3474    fn process_recv_v5_0_pingresp(
3475        &mut self,
3476        raw_packet: RawPacket,
3477    ) -> Vec<GenericEvent<PacketIdType>> {
3478        let mut events = Vec::new();
3479
3480        match v5_0::Pingresp::parse(raw_packet.data_as_slice()) {
3481            Ok((packet, _)) => {
3482                if self.pingresp_recv_set {
3483                    self.pingresp_recv_set = false;
3484                    events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3485                }
3486                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3487            }
3488            Err(e) => {
3489                self.handle_v5_0_error(e, &mut events);
3490            }
3491        }
3492
3493        events
3494    }
3495
3496    fn process_recv_v3_1_1_disconnect(
3497        &mut self,
3498        raw_packet: RawPacket,
3499    ) -> Vec<GenericEvent<PacketIdType>> {
3500        let mut events = Vec::new();
3501
3502        match v3_1_1::Disconnect::parse(raw_packet.data_as_slice()) {
3503            Ok((packet, _)) => {
3504                self.cancel_timers(&mut events);
3505                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3506            }
3507            Err(e) => {
3508                Self::handle_v3_1_1_error(e, &mut events);
3509            }
3510        }
3511
3512        events
3513    }
3514
3515    fn process_recv_v5_0_disconnect(
3516        &mut self,
3517        raw_packet: RawPacket,
3518    ) -> Vec<GenericEvent<PacketIdType>> {
3519        let mut events = Vec::new();
3520
3521        match v5_0::Disconnect::parse(raw_packet.data_as_slice()) {
3522            Ok((packet, _)) => {
3523                self.cancel_timers(&mut events);
3524                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3525            }
3526            Err(e) => {
3527                self.handle_v5_0_error(e, &mut events);
3528            }
3529        }
3530
3531        events
3532    }
3533
3534    fn process_recv_v5_0_auth(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
3535        let mut events = Vec::new();
3536
3537        match v5_0::Auth::parse(raw_packet.data_as_slice()) {
3538            Ok((packet, _)) => {
3539                events.extend(self.refresh_pingreq_recv());
3540                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3541            }
3542            Err(e) => {
3543                self.handle_v5_0_error(e, &mut events);
3544            }
3545        }
3546
3547        events
3548    }
3549
3550    fn handle_v3_1_1_error(e: MqttError, events: &mut Vec<GenericEvent<PacketIdType>>) {
3551        events.push(GenericEvent::RequestClose);
3552        events.push(GenericEvent::NotifyError(e));
3553    }
3554
3555    fn handle_v5_0_error(&mut self, e: MqttError, events: &mut Vec<GenericEvent<PacketIdType>>) {
3556        let disconnect = v5_0::Disconnect::builder()
3557            .reason_code(e.into())
3558            .build()
3559            .unwrap();
3560        events.extend(self.process_send_v5_0_disconnect(disconnect));
3561        events.push(GenericEvent::NotifyError(e));
3562    }
3563
3564    fn refresh_pingreq_recv(&mut self) -> Vec<GenericEvent<PacketIdType>> {
3565        let mut events = Vec::new();
3566        if self.pingreq_recv_timeout_ms != 0 {
3567            self.pingreq_recv_set = true;
3568            events.push(GenericEvent::RequestTimerReset {
3569                kind: TimerKind::PingreqRecv,
3570                duration_ms: self.pingreq_recv_timeout_ms,
3571            });
3572        }
3573
3574        events
3575    }
3576
3577    /// Cancel timers and collect events instead of calling handlers
3578    fn cancel_timers(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
3579        if self.pingreq_send_set {
3580            self.pingreq_send_set = false;
3581            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
3582        }
3583        if self.pingreq_recv_set {
3584            self.pingreq_recv_set = false;
3585            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3586        }
3587        if self.pingresp_recv_set {
3588            self.pingresp_recv_set = false;
3589            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3590        }
3591    }
3592
3593    /// Helper function to extract TopicAlias from properties
3594    fn get_topic_alias_from_props(props: &[Property]) -> Option<u16> {
3595        for prop in props {
3596            if let Property::TopicAlias(ta) = prop {
3597                return Some(ta.val());
3598            }
3599        }
3600        None
3601    }
3602}
3603
3604// tests
3605
3606#[cfg(test)]
3607mod tests {
3608    use super::*;
3609    use crate::mqtt::connection::version::Version;
3610    use crate::mqtt::packet::TopicAliasSend;
3611    use crate::mqtt::role;
3612
3613    #[test]
3614    fn test_initialize_client_mode() {
3615        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3616
3617        // Initialize in client mode
3618        connection.initialize(true);
3619
3620        // Verify client mode is set
3621        assert!(connection.is_client);
3622        assert_eq!(connection.publish_send_count, 0);
3623        assert!(connection.publish_send_max.is_none());
3624        assert!(connection.publish_recv_max.is_none());
3625        assert!(!connection.need_store);
3626    }
3627
3628    #[test]
3629    fn test_initialize_server_mode() {
3630        let mut connection = GenericConnection::<role::Server, u32>::new(Version::V3_1_1);
3631
3632        // Initialize in server mode
3633        connection.initialize(false);
3634
3635        // Verify server mode is set
3636        assert!(!connection.is_client);
3637        assert_eq!(connection.publish_send_count, 0);
3638        assert!(connection.publish_send_max.is_none());
3639        assert!(connection.publish_recv_max.is_none());
3640        assert!(!connection.need_store);
3641    }
3642
3643    #[test]
3644    fn test_validate_topic_alias_no_topic_alias_send() {
3645        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3646
3647        // Should return None when topic_alias_send is not configured
3648        let result = connection.validate_topic_alias(Some(1));
3649        assert!(result.is_none());
3650    }
3651
3652    #[test]
3653    fn test_validate_topic_alias_none_input() {
3654        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3655
3656        // Should return None when no topic alias is provided
3657        let result = connection.validate_topic_alias(None);
3658        assert!(result.is_none());
3659    }
3660
3661    #[test]
3662    fn test_validate_topic_alias_range_no_topic_alias_send() {
3663        let connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3664
3665        // Should return false when topic_alias_send is not configured
3666        let result = connection.validate_topic_alias_range(1);
3667        assert!(!result);
3668    }
3669
3670    #[test]
3671    fn test_validate_topic_alias_range_zero() {
3672        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3673
3674        // Set up topic alias send with max 10
3675        let topic_alias_send = TopicAliasSend::new(10);
3676        connection.topic_alias_send = Some(topic_alias_send);
3677
3678        // Should return false for alias 0
3679        let result = connection.validate_topic_alias_range(0);
3680        assert!(!result);
3681    }
3682
3683    #[test]
3684    fn test_validate_topic_alias_range_over_max() {
3685        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3686
3687        // Set up topic alias send with max 5
3688        let topic_alias_send = TopicAliasSend::new(5);
3689        connection.topic_alias_send = Some(topic_alias_send);
3690
3691        // Should return false for alias > max
3692        let result = connection.validate_topic_alias_range(6);
3693        assert!(!result);
3694    }
3695
3696    #[test]
3697    fn test_validate_topic_alias_range_valid() {
3698        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3699
3700        // Set up topic alias send with max 10
3701        let topic_alias_send = TopicAliasSend::new(10);
3702        connection.topic_alias_send = Some(topic_alias_send);
3703
3704        // Should return true for valid aliases
3705        assert!(connection.validate_topic_alias_range(1));
3706        assert!(connection.validate_topic_alias_range(5));
3707        assert!(connection.validate_topic_alias_range(10));
3708    }
3709
3710    #[test]
3711    fn test_validate_topic_alias_with_registered_alias() {
3712        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3713
3714        // Set up topic alias send with max 10
3715        let mut topic_alias_send = TopicAliasSend::new(10);
3716        topic_alias_send.insert_or_update("test/topic", 5);
3717        connection.topic_alias_send = Some(topic_alias_send);
3718
3719        // Should return the topic name for registered alias
3720        let result = connection.validate_topic_alias(Some(5));
3721        assert_eq!(result, Some("test/topic".to_string()));
3722    }
3723
3724    #[test]
3725    fn test_validate_topic_alias_unregistered_alias() {
3726        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3727
3728        // Set up topic alias send with max 10 but don't register any aliases
3729        let topic_alias_send = TopicAliasSend::new(10);
3730        connection.topic_alias_send = Some(topic_alias_send);
3731
3732        // Should return None for unregistered alias
3733        let result = connection.validate_topic_alias(Some(5));
3734        assert!(result.is_none());
3735    }
3736
3737    #[test]
3738    fn test_validate_maximum_packet_size_within_limit() {
3739        let connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3740
3741        // Default maximum_packet_size_send is u32::MAX
3742        let result = connection.validate_maximum_packet_size_send(1000);
3743        assert!(result);
3744    }
3745
3746    #[test]
3747    fn test_validate_maximum_packet_size_at_limit() {
3748        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3749
3750        // Set a specific limit
3751        connection.maximum_packet_size_send = 1000;
3752
3753        // Should return true for size equal to limit
3754        let result = connection.validate_maximum_packet_size_send(1000);
3755        assert!(result);
3756    }
3757
3758    #[test]
3759    fn test_validate_maximum_packet_size_over_limit() {
3760        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3761
3762        // Set a specific limit
3763        connection.maximum_packet_size_send = 1000;
3764
3765        // Should return false for size over limit
3766        let result = connection.validate_maximum_packet_size_send(1001);
3767        assert!(!result);
3768    }
3769
3770    #[test]
3771    fn test_validate_maximum_packet_size_zero_limit() {
3772        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3773
3774        // Set limit to 0
3775        connection.maximum_packet_size_send = 0;
3776
3777        // Should return false for any non-zero size
3778        let result = connection.validate_maximum_packet_size_send(1);
3779        assert!(!result);
3780
3781        // Should return true for zero size
3782        let result = connection.validate_maximum_packet_size_send(0);
3783        assert!(result);
3784    }
3785
3786    #[test]
3787    fn test_initialize_clears_state() {
3788        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3789
3790        // Set up some state that should be cleared
3791        connection.publish_send_count = 5;
3792        connection.need_store = true;
3793        connection.pid_suback.insert(123);
3794        connection.pid_unsuback.insert(456);
3795
3796        // Initialize should clear state
3797        connection.initialize(true);
3798
3799        // Verify state is cleared
3800        assert_eq!(connection.publish_send_count, 0);
3801        assert!(!connection.need_store);
3802        assert!(connection.pid_suback.is_empty());
3803        assert!(connection.pid_unsuback.is_empty());
3804        assert!(connection.is_client);
3805    }
3806
3807    #[test]
3808    fn test_remaining_length_to_total_size() {
3809        // Test 1-byte remaining length encoding (0-127)
3810        assert_eq!(remaining_length_to_total_size(0), 2); // 1 + 1 + 0
3811        assert_eq!(remaining_length_to_total_size(127), 129); // 1 + 1 + 127
3812
3813        // Test 2-byte remaining length encoding (128-16383)
3814        assert_eq!(remaining_length_to_total_size(128), 131); // 1 + 2 + 128
3815        assert_eq!(remaining_length_to_total_size(16383), 16386); // 1 + 2 + 16383
3816
3817        // Test 3-byte remaining length encoding (16384-2097151)
3818        assert_eq!(remaining_length_to_total_size(16384), 16388); // 1 + 3 + 16384
3819        assert_eq!(remaining_length_to_total_size(2097151), 2097155); // 1 + 3 + 2097151
3820
3821        // Test 4-byte remaining length encoding (2097152-268435455)
3822        assert_eq!(remaining_length_to_total_size(2097152), 2097157); // 1 + 4 + 2097152
3823        assert_eq!(remaining_length_to_total_size(268435455), 268435460); // 1 + 4 + 268435455
3824    }
3825}