mqtt_protocol_core/mqtt/connection/
core.rs

1// MIT License
2//
3// Copyright (c) 2025 Takatoshi Kondo
4//
5// Permission is hereby granted, free of charge, to any person obtaining a copy
6// of this software and associated documentation files (the "Software"), to deal
7// in the Software without restriction, including without limitation the rights
8// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9// copies of the Software, and to permit persons to whom the Software is
10// furnished to do so, subject to the following conditions:
11//
12// The above copyright notice and this permission notice shall be included in all
13// copies or substantial portions of the Software.
14//
15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21// SOFTWARE.
22use alloc::{
23    string::{String, ToString},
24    vec::Vec,
25};
26use core::marker::PhantomData;
27
28use crate::mqtt::common::tracing::{error, info, trace, warn};
29use crate::mqtt::common::Cursor;
30use crate::mqtt::common::HashSet;
31use crate::mqtt::connection::event::{GenericEvent, TimerKind};
32use crate::mqtt::connection::GenericStore;
33use crate::mqtt::result_code;
34
35use serde::Serialize;
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
38enum ConnectionStatus {
39    #[serde(rename = "disconnected")]
40    Disconnected,
41    #[serde(rename = "connecting")]
42    Connecting,
43    #[serde(rename = "connected")]
44    Connected,
45}
46use crate::mqtt::connection::packet_builder::{
47    PacketBuildResult, PacketBuilder, PacketData, RawPacket,
48};
49use crate::mqtt::connection::packet_id_manager::PacketIdManager;
50use crate::mqtt::connection::role;
51use crate::mqtt::connection::role::RoleType;
52use crate::mqtt::connection::sendable::Sendable;
53use crate::mqtt::connection::version::*;
54use crate::mqtt::packet::v3_1_1;
55use crate::mqtt::packet::v5_0;
56use crate::mqtt::packet::GenericPacket;
57use crate::mqtt::packet::GenericStorePacket;
58use crate::mqtt::packet::IsPacketId;
59use crate::mqtt::packet::Qos;
60use crate::mqtt::packet::ResponsePacket;
61use crate::mqtt::packet::{Property, TopicAliasRecv, TopicAliasSend};
62use crate::mqtt::prelude::GenericPacketTrait;
63use crate::mqtt::result_code::{
64    ConnectReasonCode, ConnectReturnCode, DisconnectReasonCode, MqttError, PubrecReasonCode,
65};
66
/// Sentinel meaning "no packet size limit has been negotiated".
///
/// Computed as 1 (fixed header byte) + 4 (longest remaining-length encoding)
/// + 128^4 (written here as `1 << 28`). Because 128^4 is one more than the
/// largest remaining length that four encoding bytes can carry, the constant
/// is strictly greater than the size of any well-formed MQTT packet.
const MQTT_PACKET_SIZE_NO_LIMIT: u32 = 1 + 4 + (1 << 28);
70
/// Convert a packet's remaining length into its total on-the-wire size.
///
/// The total is the sum of:
/// - 1 byte of fixed header,
/// - the 1 to 4 bytes needed to encode `remaining_length` (7 payload bits per
///   byte, hence the thresholds 128, 128^2 and 128^3),
/// - `remaining_length` itself.
///
/// # Parameters
///
/// * `remaining_length` - The remaining length value of the packet
///
/// # Returns
///
/// The full packet size in bytes
fn remaining_length_to_total_size(remaining_length: u32) -> u32 {
    let length_field_size = match remaining_length {
        0..=127 => 1,
        128..=16_383 => 2,
        16_384..=2_097_151 => 3,
        _ => 4,
    };

    1 + length_field_size + remaining_length
}
90
/// Type alias for `GenericEvent` with `u16` packet IDs (the most common case)
///
/// This is the event type returned by `Connection<Role>`, i.e. by a
/// `GenericConnection` whose packet ID type is `u16`, the standard MQTT
/// packet ID width. Code that uses `GenericConnection` with another packet ID
/// type receives `GenericEvent<PacketIdType>` instead.
pub type Event = GenericEvent<u16>;
96
/// Generic MQTT Connection - Core Sans-I/O MQTT protocol implementation
///
/// This struct represents the core MQTT protocol logic in a Sans-I/O (synchronous I/O-independent) design.
/// It handles MQTT packet processing, state management, and protocol compliance without performing
/// any actual network I/O operations. Instead, it returns events that the application must handle.
///
/// # Type Parameters
///
/// * `Role` - The connection role (Client or Server), determining allowed operations
/// * `PacketIdType` - The type used for packet IDs (typically `u16`, but can be `u32` for extended scenarios)
///
/// # Key Features
///
/// - **Sans-I/O Design**: No network I/O is performed directly; events are returned for external handling
/// - **Protocol Compliance**: Implements MQTT v3.1.1 and v5.0 specifications
/// - **State Management**: Tracks connection state, packet IDs, and protocol flows
/// - **Configurable Behavior**: Supports various configuration options for different use cases
/// - **Generic Packet ID Support**: Can use u16 or u32 packet IDs for different deployment scenarios
///
/// # Usage
///
/// ```ignore
/// use mqtt_protocol_core::mqtt;
///
/// let mut connection = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
///
/// // Send a packet
/// let events = connection.send(publish_packet);
/// for event in events {
///     match event {
///         mqtt::connection::Event::RequestSendPacket { packet, .. } => {
///             // Send packet over network
///         },
///         // Handle other events...
///     }
/// }
///
/// // Receive data
/// let mut cursor = std::io::Cursor::new(received_data);
/// let events = connection.recv(&mut cursor);
/// // Process events...
/// ```
pub struct GenericConnection<Role, PacketIdType>
where
    Role: RoleType,
    PacketIdType: IsPacketId,
{
    // Zero-sized marker that ties the connection to its `Role` type parameter
    _marker: PhantomData<Role>,

    // Protocol version of this connection; `send` rejects packets of another version
    protocol_version: Version,

    // Packet ID bookkeeping (queried and released in `notify_closed`)
    pid_man: PacketIdManager<PacketIdType>,
    // Packet IDs tied to an expected SUBACK; always released on close
    pid_suback: HashSet<PacketIdType>,
    // Packet IDs tied to an expected UNSUBACK; always released on close
    pid_unsuback: HashSet<PacketIdType>,
    // Packet IDs tied to an expected PUBACK; released on close only when `need_store` is false
    pid_puback: HashSet<PacketIdType>,
    // Packet IDs tied to an expected PUBREC; released on close only when `need_store` is false
    pid_pubrec: HashSet<PacketIdType>,
    // Packet IDs tied to an expected PUBCOMP; released on close only when `need_store` is false
    pid_pubcomp: HashSet<PacketIdType>,

    // Whether session state is kept across a close; when false, `notify_closed`
    // clears QoS2 state and releases the publish-related packet IDs
    need_store: bool,
    // Store for retransmission packets
    store: GenericStore<PacketIdType>,

    // Behaviour switches, all `false` after `new`.
    // NOTE(review): their exact semantics are defined where they are read, outside this excerpt.
    offline_publish: bool,
    auto_pub_response: bool,
    auto_ping_response: bool,

    // Auto map topic alias for sending
    auto_map_topic_alias_send: bool,
    // Auto replace topic alias for sending
    auto_replace_topic_alias_send: bool,
    // Topic alias management for receiving (dropped on close)
    topic_alias_recv: Option<TopicAliasRecv>,
    // Topic alias management for sending (dropped on close)
    topic_alias_send: Option<TopicAliasSend>,

    // Maximum number of concurrent PUBLISH packets for sending
    publish_send_max: Option<u16>,
    // Maximum number of concurrent PUBLISH packets for receiving
    publish_recv_max: Option<u16>,
    // Current count of PUBLISH packets being sent
    publish_send_count: u16,

    // Set of received PUBLISH packets (for flow control)
    publish_recv: HashSet<PacketIdType>,

    // Maximum packet size for sending (reset to MQTT_PACKET_SIZE_NO_LIMIT on close)
    maximum_packet_size_send: u32,
    // Maximum packet size for receiving (reset to MQTT_PACKET_SIZE_NO_LIMIT on close)
    maximum_packet_size_recv: u32,

    // Connection state
    status: ConnectionStatus,

    // PINGREQ send interval in milliseconds set by user
    pingreq_user_send_interval_ms: Option<u64>,
    // NOTE(review): presumably the keep-alive from CONNECT, in milliseconds - confirm where it is assigned
    pingreq_keep_alive_ms: u64,
    // NOTE(review): presumably the server-assigned keep-alive, in milliseconds - confirm where it is assigned
    pingreq_server_keep_alive_ms: Option<u64>,

    // PINGREQ receive timeout in milliseconds
    pingreq_recv_timeout_ms: u64,
    // PINGRESP receive timeout in milliseconds
    pingresp_recv_timeout_ms: u64,

    // QoS2 PUBLISH packet handling state (for duplicate detection)
    qos2_publish_handled: HashSet<PacketIdType>,

    // Timer state flags; each one is cleared by `notify_timer_fired` when its timer fires
    pingreq_send_set: bool,
    pingreq_recv_set: bool,
    pingresp_recv_set: bool,

    // Incremental packet parser fed by `recv`
    packet_builder: PacketBuilder,
    // Client/Server mode flag (starts as `false` in `new`)
    is_client: bool,
}
212
/// Type alias for `GenericConnection` with `u16` packet IDs (standard case)
///
/// This is the standard connection type that most applications will use.
/// It uses `u16` for packet IDs, which is the standard MQTT packet ID type
/// supporting values from 1 to 65535.
///
/// For extended scenarios where larger packet ID ranges are needed
/// (such as broker clusters), use `GenericConnection<Role, u32>` directly.
///
/// # Type Parameters
///
/// * `Role` - The connection role (typically `role::Client` or `role::Server`;
///   `role::Any` is also recognised by `send`)
pub type Connection<Role> = GenericConnection<Role, u16>;
226
227impl<Role, PacketIdType> GenericConnection<Role, PacketIdType>
228where
229    Role: RoleType,
230    PacketIdType: IsPacketId,
231{
232    /// Create a new MQTT connection instance
233    ///
234    /// Initializes a new MQTT connection with the specified protocol version.
235    /// The connection starts in a disconnected state and must be activated through
236    /// the connection handshake process (CONNECT/CONNACK).
237    ///
238    /// # Parameters
239    ///
240    /// * `version` - The MQTT protocol version to use (V3_1_1 or V5_0)
241    ///
242    /// # Returns
243    ///
244    /// A new `GenericConnection` instance ready for use
245    ///
246    /// # Examples
247    ///
248    /// ```ignore
249    /// use mqtt_protocol_core::mqtt;
250    ///
251    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
252    /// let mut server = mqtt::connection::Connection::<mqtt::connection::role::Server>::new(mqtt::version::Version::V3_1_1);
253    /// ```
254    pub fn new(version: Version) -> Self {
255        Self {
256            _marker: PhantomData,
257            protocol_version: version,
258            pid_man: PacketIdManager::new(),
259            pid_suback: HashSet::default(),
260            pid_unsuback: HashSet::default(),
261            pid_puback: HashSet::default(),
262            pid_pubrec: HashSet::default(),
263            pid_pubcomp: HashSet::default(),
264            need_store: false,
265            store: GenericStore::new(),
266            offline_publish: false,
267            auto_pub_response: false,
268            auto_ping_response: false,
269            auto_map_topic_alias_send: false,
270            auto_replace_topic_alias_send: false,
271            topic_alias_recv: None,
272            topic_alias_send: None,
273            publish_send_max: None,
274            publish_recv_max: None,
275            publish_send_count: 0,
276            publish_recv: HashSet::default(),
277            maximum_packet_size_send: MQTT_PACKET_SIZE_NO_LIMIT,
278            maximum_packet_size_recv: MQTT_PACKET_SIZE_NO_LIMIT,
279            status: ConnectionStatus::Disconnected,
280            pingreq_user_send_interval_ms: None,
281            pingreq_keep_alive_ms: 0,
282            pingreq_server_keep_alive_ms: None,
283            pingreq_recv_timeout_ms: 0,
284            pingresp_recv_timeout_ms: 0,
285            qos2_publish_handled: HashSet::default(),
286            pingreq_send_set: false,
287            pingreq_recv_set: false,
288            pingresp_recv_set: false,
289            packet_builder: PacketBuilder::new(),
290            is_client: false,
291        }
292    }
293
294    // public
295
    /// Send MQTT packet with compile-time role checking (experimental)
    ///
    /// This method provides compile-time verification that the packet being sent
    /// is allowed for the current connection role (Client or Server). It only works
    /// when the `Role` type parameter is concrete (not generic).
    ///
    /// This is an experimental API that may be subject to change. For general use,
    /// consider using the `send()` method instead.
    ///
    /// # Type Parameters
    ///
    /// * `T` - The packet type that must implement `Sendable<Role, PacketIdType>`
    ///
    /// # Parameters
    ///
    /// * `packet` - The MQTT packet to send
    ///
    /// # Returns
    ///
    /// A vector of events that the application must process
    ///
    /// # Compile-time Safety
    ///
    /// The role check is expressed by the `T: Sendable<Role, PacketIdType>` bound:
    /// if the packet type is not allowed for the current role, the call does not
    /// compile, preventing protocol violations at development time.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use mqtt_protocol_core::mqtt;
    ///
    /// // This works for concrete roles
    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
    /// let events = client.checked_send(connect_packet); // OK - clients can send CONNECT
    ///
    /// // This would cause a compile error
    /// // let events = client.checked_send(connack_packet); // Error - clients cannot send CONNACK
    /// ```
    pub fn checked_send<T>(&mut self, packet: T) -> Vec<GenericEvent<PacketIdType>>
    where
        T: Sendable<Role, PacketIdType>,
    {
        // All work is delegated to the packet's `dispatch_send`, which receives this
        // connection and covers both concrete packets and generic packets.
        packet.dispatch_send(self)
    }
341
342    /// Send MQTT packet with runtime role validation
343    ///
344    /// This is the primary method for sending MQTT packets. It accepts any `GenericPacket`
345    /// and performs runtime validation to ensure the packet is allowed for the current
346    /// connection role. This provides flexibility when the exact packet type is not known
347    /// at compile time.
348    ///
349    /// # Parameters
350    ///
351    /// * `packet` - The MQTT packet to send
352    ///
353    /// # Returns
354    ///
355    /// A vector of events that the application must process. If the packet is not allowed
356    /// for the current role, a `NotifyError` event will be included.
357    ///
358    /// # Validation
359    ///
360    /// The method validates that:
361    /// - The packet type is allowed for the connection role (Client vs Server)
362    /// - Protocol version compatibility
363    /// - Connection state requirements
364    /// - Packet ID management for QoS > 0 packets
365    ///
366    /// # Examples
367    ///
368    /// ```ignore
369    /// use mqtt_protocol_core::mqtt;
370    ///
371    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
372    /// let events = client.send(mqtt::packet::GenericPacket::V5_0(mqtt::packet::v5_0::Packet::Connect(connect_packet)));
373    ///
374    /// for event in events {
375    ///     match event {
376    ///         mqtt::connection::Event::RequestSendPacket { packet, .. } => {
377    ///             // Send packet over network
378    ///         },
379    ///         mqtt::connection::Event::NotifyError(error) => {
380    ///             // Handle validation errors
381    ///         },
382    ///         // Handle other events...
383    ///     }
384    /// }
385    /// ```
386    pub fn send(&mut self, packet: GenericPacket<PacketIdType>) -> Vec<GenericEvent<PacketIdType>> {
387        use core::any::TypeId;
388
389        let role_id = TypeId::of::<Role>();
390        let client_id = TypeId::of::<role::Client>();
391        let server_id = TypeId::of::<role::Server>();
392        let any_id = TypeId::of::<role::Any>();
393
394        // Check version compatibility between connection and packet
395        let packet_version = packet.protocol_version();
396
397        // Return error if versions don't match
398        if self.protocol_version != packet_version {
399            return vec![GenericEvent::NotifyError(MqttError::VersionMismatch)];
400        }
401
402        match packet {
403            // CONNECT - Client/Any can send
404            GenericPacket::V3_1_1Connect(p) => {
405                if role_id == client_id || role_id == any_id {
406                    self.process_send_v3_1_1_connect(p)
407                } else {
408                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
409                }
410            }
411            GenericPacket::V5_0Connect(p) => {
412                if role_id == client_id || role_id == any_id {
413                    self.process_send_v5_0_connect(p)
414                } else {
415                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
416                }
417            }
418            // CONNACK - Server/Any can send
419            GenericPacket::V3_1_1Connack(p) => {
420                if role_id == server_id || role_id == any_id {
421                    self.process_send_v3_1_1_connack(p)
422                } else {
423                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
424                }
425            }
426            GenericPacket::V5_0Connack(p) => {
427                if role_id == server_id || role_id == any_id {
428                    self.process_send_v5_0_connack(p)
429                } else {
430                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
431                }
432            }
433            // PUBLISH - Any role can send
434            GenericPacket::V3_1_1Publish(p) => self.process_send_v3_1_1_publish(p),
435            GenericPacket::V5_0Publish(p) => self.process_send_v5_0_publish(p),
436            // PUBACK/PUBREC/PUBREL/PUBCOMP - Any role can send
437            GenericPacket::V3_1_1Puback(p) => self.process_send_v3_1_1_puback(p),
438            GenericPacket::V5_0Puback(p) => self.process_send_v5_0_puback(p),
439            GenericPacket::V3_1_1Pubrec(p) => self.process_send_v3_1_1_pubrec(p),
440            GenericPacket::V5_0Pubrec(p) => self.process_send_v5_0_pubrec(p),
441            GenericPacket::V3_1_1Pubrel(p) => self.process_send_v3_1_1_pubrel(p),
442            GenericPacket::V5_0Pubrel(p) => self.process_send_v5_0_pubrel(p),
443            GenericPacket::V3_1_1Pubcomp(p) => self.process_send_v3_1_1_pubcomp(p),
444            GenericPacket::V5_0Pubcomp(p) => self.process_send_v5_0_pubcomp(p),
445            // SUBSCRIBE/UNSUBSCRIBE - Client/Any can send
446            GenericPacket::V3_1_1Subscribe(p) => {
447                if role_id == client_id || role_id == any_id {
448                    self.process_send_v3_1_1_subscribe(p)
449                } else {
450                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
451                }
452            }
453            GenericPacket::V5_0Subscribe(p) => {
454                if role_id == client_id || role_id == any_id {
455                    self.process_send_v5_0_subscribe(p)
456                } else {
457                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
458                }
459            }
460            GenericPacket::V3_1_1Unsubscribe(p) => {
461                if role_id == client_id || role_id == any_id {
462                    self.process_send_v3_1_1_unsubscribe(p)
463                } else {
464                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
465                }
466            }
467            GenericPacket::V5_0Unsubscribe(p) => {
468                if role_id == client_id || role_id == any_id {
469                    self.process_send_v5_0_unsubscribe(p)
470                } else {
471                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
472                }
473            }
474            // SUBACK/UNSUBACK - Server/Any can send
475            GenericPacket::V3_1_1Suback(p) => {
476                if role_id == server_id || role_id == any_id {
477                    self.process_send_v3_1_1_suback(p)
478                } else {
479                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
480                }
481            }
482            GenericPacket::V5_0Suback(p) => {
483                if role_id == server_id || role_id == any_id {
484                    self.process_send_v5_0_suback(p)
485                } else {
486                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
487                }
488            }
489            GenericPacket::V3_1_1Unsuback(p) => {
490                if role_id == server_id || role_id == any_id {
491                    self.process_send_v3_1_1_unsuback(p)
492                } else {
493                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
494                }
495            }
496            GenericPacket::V5_0Unsuback(p) => {
497                if role_id == server_id || role_id == any_id {
498                    self.process_send_v5_0_unsuback(p)
499                } else {
500                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
501                }
502            }
503            // PINGREQ - Client/Any can send
504            GenericPacket::V3_1_1Pingreq(p) => {
505                if role_id == client_id || role_id == any_id {
506                    self.process_send_v3_1_1_pingreq(p)
507                } else {
508                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
509                }
510            }
511            GenericPacket::V5_0Pingreq(p) => {
512                if role_id == client_id || role_id == any_id {
513                    self.process_send_v5_0_pingreq(p)
514                } else {
515                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
516                }
517            }
518            // PINGRESP - Server/Any can send
519            GenericPacket::V3_1_1Pingresp(p) => {
520                if role_id == server_id || role_id == any_id {
521                    self.process_send_v3_1_1_pingresp(p)
522                } else {
523                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
524                }
525            }
526            GenericPacket::V5_0Pingresp(p) => {
527                if role_id == server_id || role_id == any_id {
528                    self.process_send_v5_0_pingresp(p)
529                } else {
530                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
531                }
532            }
533            // DISCONNECT(v3.1.1) - Client/Any role can send
534            GenericPacket::V3_1_1Disconnect(p) => {
535                if role_id == client_id || role_id == any_id {
536                    self.process_send_v3_1_1_disconnect(p)
537                } else {
538                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
539                }
540            }
541            // DISCONNECT(v5.0) - Any role can send
542            GenericPacket::V5_0Disconnect(p) => self.process_send_v5_0_disconnect(p),
543            // AUTH - Any role can send (v5.0 only)
544            GenericPacket::V5_0Auth(p) => self.process_send_v5_0_auth(p),
545        }
546    }
547
548    /// Receive and process incoming MQTT data
549    ///
550    /// This method processes raw bytes received from the network and attempts to
551    /// parse them into MQTT packets. It handles packet fragmentation and can
552    /// process multiple complete packets from a single data buffer.
553    ///
554    /// # Parameters
555    ///
556    /// * `data` - A cursor over the received data bytes. The cursor position will
557    ///   be advanced as data is consumed.
558    ///
559    /// # Returns
560    ///
561    /// A vector of events generated from processing the received data:
562    /// - `NotifyPacketReceived` for successfully parsed packets
563    /// - `NotifyError` for parsing errors or protocol violations
564    /// - Additional events based on packet processing (timers, responses, etc.)
565    ///
566    /// # Behavior
567    ///
568    /// - Handles partial packets (data will be buffered until complete)
569    /// - Processes multiple complete packets in sequence
570    /// - Validates packet structure and protocol compliance
571    /// - Updates internal connection state based on received packets
572    /// - Generates appropriate response events (ACKs, etc.)
573    ///
574    /// # Examples
575    ///
576    /// ```ignore
577    /// use mqtt_protocol_core::mqtt;
578    ///
579    /// let mut connection = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
580    /// let received_data = [/* network data */];
581    /// let mut cursor = std::io::Cursor::new(&received_data[..]);
582    ///
583    /// let events = connection.recv(&mut cursor);
584    /// for event in events {
585    ///     match event {
586    ///         mqtt::connection::Event::NotifyPacketReceived(packet) => {
587    ///             // Process received packet
588    ///         },
589    ///         mqtt::connection::Event::NotifyError(error) => {
590    ///             // Handle parsing/protocol errors
591    ///         },
592    ///         // Handle other events...
593    ///     }
594    /// }
595    /// ```
596    pub fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
597        let mut events = Vec::new();
598
599        match self.packet_builder.feed(data) {
600            PacketBuildResult::Complete(raw_packet) => {
601                events.extend(self.process_recv_packet(raw_packet));
602            }
603            PacketBuildResult::Incomplete => {}
604            PacketBuildResult::Error(e) => {
605                self.cancel_timers(&mut events);
606                events.push(GenericEvent::RequestClose);
607                events.push(GenericEvent::NotifyError(e));
608            }
609        }
610
611        events
612    }
613
614    /// Notify that a timer has fired (Event-based API)
615    ///
616    /// This method should be called when the I/O layer detects that a timer has expired.
617    /// It handles the timer event appropriately and returns events that need to be processed.
618    /// Notify that a timer has fired
619    ///
620    /// This method should be called by the application when a previously requested
621    /// timer expires. The connection will take appropriate action based on the timer type.
622    ///
623    /// # Parameters
624    ///
625    /// * `kind` - The type of timer that fired
626    ///
627    /// # Returns
628    ///
629    /// Events generated from timer processing (e.g., sending PINGREQ, connection timeouts)
630    pub fn notify_timer_fired(&mut self, kind: TimerKind) -> Vec<GenericEvent<PacketIdType>> {
631        let mut events = Vec::new();
632
633        match kind {
634            TimerKind::PingreqSend => {
635                // Reset timer flag
636                self.pingreq_send_set = false;
637
638                // Send PINGREQ if connected
639                if self.status == ConnectionStatus::Connected {
640                    match self.protocol_version {
641                        Version::V3_1_1 => {
642                            if let Ok(pingreq) = v3_1_1::Pingreq::builder().build() {
643                                events.extend(self.process_send_v3_1_1_pingreq(pingreq));
644                            }
645                        }
646                        Version::V5_0 => {
647                            if let Ok(pingreq) = v5_0::Pingreq::builder().build() {
648                                events.extend(self.process_send_v5_0_pingreq(pingreq));
649                            }
650                        }
651                        Version::Undetermined => {
652                            unreachable!("Protocol version should be set before sending PINGREQ");
653                        }
654                    }
655                }
656            }
657            TimerKind::PingreqRecv => {
658                // Reset timer flag
659                self.pingreq_recv_set = false;
660
661                match self.protocol_version {
662                    Version::V3_1_1 => {
663                        // V3.1.1: Close connection
664                        events.push(GenericEvent::RequestClose);
665                    }
666                    Version::V5_0 => {
667                        // V5.0: Send DISCONNECT with keep_alive_timeout if connected
668                        if self.status == ConnectionStatus::Connected {
669                            if let Ok(disconnect) = v5_0::Disconnect::builder()
670                                .reason_code(DisconnectReasonCode::KeepAliveTimeout)
671                                .build()
672                            {
673                                events.extend(self.process_send_v5_0_disconnect(disconnect));
674                            }
675                        }
676                    }
677                    Version::Undetermined => {
678                        unreachable!("Protocol version should be set before receiving PINGREQ");
679                    }
680                }
681            }
682            TimerKind::PingrespRecv => {
683                // Reset timer flag
684                self.pingresp_recv_set = false;
685
686                match self.protocol_version {
687                    Version::V3_1_1 => {
688                        // V3.1.1: Close connection
689                        events.push(GenericEvent::RequestClose);
690                    }
691                    Version::V5_0 => {
692                        // V5.0: Send DISCONNECT with keep_alive_timeout if connected
693                        if self.status == ConnectionStatus::Connected {
694                            if let Ok(disconnect) = v5_0::Disconnect::builder()
695                                .reason_code(DisconnectReasonCode::KeepAliveTimeout)
696                                .build()
697                            {
698                                events.extend(self.process_send_v5_0_disconnect(disconnect));
699                            }
700                        }
701                    }
702                    Version::Undetermined => {
703                        unreachable!("Protocol version should be set before receiving PINGRESP");
704                    }
705                }
706            }
707        }
708
709        events
710    }
711
712    /// Notify that the connection has been closed by the I/O layer (Event-based API)
713    ///
714    /// This method should be called when the I/O layer detects that the socket has been closed.
715    /// It updates the internal state appropriately and returns events that need to be processed.
716    /// Notify that the underlying connection has been closed
717    ///
718    /// This method should be called when the network connection is closed,
719    /// either intentionally or due to network issues.
720    ///
721    /// # Returns
722    ///
723    /// Events generated from connection closure processing
724    pub fn notify_closed(&mut self) -> Vec<GenericEvent<PacketIdType>> {
725        let mut events = Vec::new();
726
727        // Reset packet size limits to MQTT protocol maximum
728        self.maximum_packet_size_send = MQTT_PACKET_SIZE_NO_LIMIT;
729        self.maximum_packet_size_recv = MQTT_PACKET_SIZE_NO_LIMIT;
730
731        // Set status to disconnected
732        self.status = ConnectionStatus::Disconnected;
733
734        // Clear topic alias management
735        self.topic_alias_send = None;
736        self.topic_alias_recv = None;
737
738        // Release packet IDs for SUBACK
739        for packet_id in self.pid_suback.drain() {
740            if self.pid_man.is_used_id(packet_id) {
741                self.pid_man.release_id(packet_id);
742                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
743            }
744        }
745
746        // Release packet IDs for UNSUBACK
747        for packet_id in self.pid_unsuback.drain() {
748            if self.pid_man.is_used_id(packet_id) {
749                self.pid_man.release_id(packet_id);
750                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
751            }
752        }
753
754        // If not storing session state, clear QoS2 states and release publish-related packet IDs
755        if !self.need_store {
756            self.qos2_publish_handled.clear();
757
758            // Release packet IDs for PUBACK
759            for packet_id in self.pid_puback.drain() {
760                if self.pid_man.is_used_id(packet_id) {
761                    self.pid_man.release_id(packet_id);
762                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
763                }
764            }
765
766            // Release packet IDs for PUBREC
767            for packet_id in self.pid_pubrec.drain() {
768                if self.pid_man.is_used_id(packet_id) {
769                    self.pid_man.release_id(packet_id);
770                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
771                }
772            }
773
774            // Release packet IDs for PUBCOMP
775            for packet_id in self.pid_pubcomp.drain() {
776                if self.pid_man.is_used_id(packet_id) {
777                    self.pid_man.release_id(packet_id);
778                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
779                }
780            }
781        }
782
783        // Cancel all timers
784        self.cancel_timers(&mut events);
785
786        events
787    }
788
789    /// Set the PINGREQ send interval
790    ///
791    /// Overrides the interval for sending PINGREQ packets to maintain the connection alive.
792    /// When changed, this may generate timer-related events to update the ping schedule.
793    ///
794    /// PINGREQ is sent after the interval has elapsed since the last packet of any type was sent.
795    ///
796    /// The send interval is determined by the following priority order:
797    /// 1. User setting by this function
798    /// 2. ServerKeepAlive property value in received CONNACK packet
799    /// 3. KeepAlive value in sent CONNECT packet
800    ///
801    /// # Parameters
802    ///
803    /// * `duration_ms` - The interval override setting:
804    ///   - `Some(value)` - Override with specified milliseconds. If 0, no PINGREQ is sent.
805    ///   - `None` - Do not override, use CONNACK ServerKeepAlive or CONNECT KeepAlive instead.
806    ///
807    /// # Returns
808    ///
809    /// Events generated from updating the ping interval
810    pub fn set_pingreq_send_interval(
811        &mut self,
812        duration_ms: Option<u64>,
813    ) -> Vec<GenericEvent<PacketIdType>> {
814        let mut events = Vec::new();
815        self.pingreq_user_send_interval_ms = duration_ms;
816        if let Some(ms) = duration_ms {
817            if ms == 0 {
818                if self.pingreq_send_set {
819                    self.pingreq_send_set = false;
820                    events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
821                }
822            } else if self.status == ConnectionStatus::Connected {
823                self.pingreq_send_set = true;
824                events.push(GenericEvent::RequestTimerReset {
825                    kind: TimerKind::PingreqSend,
826                    duration_ms: ms,
827                });
828            }
829        }
830        events
831    }
832
833    /// Get the remaining capacity for sending PUBLISH packets
834    ///
835    /// Returns the number of additional PUBLISH packets that can be sent
836    /// without exceeding the receive maximum limit.
837    ///
838    /// # Returns
839    ///
840    /// The remaining capacity for outgoing PUBLISH packets, or `None` if no limit is set
841    pub fn get_receive_maximum_vacancy_for_send(&self) -> Option<u16> {
842        // If publish_recv_max is set, return the remaining capacity
843        self.publish_send_max
844            .map(|max| max.saturating_sub(self.publish_send_count))
845    }
846
847    /// Enable or disable offline publishing
848    ///
849    /// When enabled, PUBLISH packets can be sent even when disconnected.
850    /// They will be queued and sent once the connection is established.
851    ///
852    /// # Parameters
853    ///
854    /// * `enable` - Whether to enable offline publishing
855    pub fn set_offline_publish(&mut self, enable: bool) {
856        self.offline_publish = enable;
857        if self.offline_publish {
858            self.need_store = true;
859        }
860    }
861
862    /// Enable or disable automatic PUBLISH response generation
863    ///
864    /// When enabled, appropriate response packets (PUBACK, PUBREC, PUBREL, and PUBCOMP.)
865    /// are automatically generated for received PUBLISH packets.
866    ///
867    /// # Parameters
868    ///
869    /// * `enable` - Whether to enable automatic responses
870    pub fn set_auto_pub_response(&mut self, enable: bool) {
871        self.auto_pub_response = enable;
872    }
873
874    /// Enable or disable automatic PING response generation
875    ///
876    /// When enabled, PINGRESP packets are automatically sent in response to PINGREQ.
877    ///
878    /// # Parameters
879    ///
880    /// * `enable` - Whether to enable automatic PING responses
881    pub fn set_auto_ping_response(&mut self, enable: bool) {
882        self.auto_ping_response = enable;
883    }
884
885    /// Enable or disable automatic topic alias mapping for outgoing packets
886    ///
887    /// When enabled, the connection will automatically map topics to aliases
888    /// for outgoing PUBLISH packets to reduce bandwidth usage. This includes:
889    /// - Applying existing registered topic aliases when available
890    /// - Allocating new topic aliases for unregistered topics
891    /// - Using LRU algorithm to overwrite the least recently used alias when all aliases are in use
892    ///
893    /// # Parameters
894    ///
895    /// * `enable` - Whether to enable automatic topic alias mapping
896    pub fn set_auto_map_topic_alias_send(&mut self, enable: bool) {
897        self.auto_map_topic_alias_send = enable;
898    }
899
900    /// Enable or disable automatic topic alias replacement for outgoing packets
901    ///
902    /// When enabled, the connection will automatically apply existing registered
903    /// topic aliases to outgoing PUBLISH packets when aliases are available.
904    /// This only uses previously registered aliases and does not allocate new ones.
905    ///
906    /// # Parameters
907    ///
908    /// * `enable` - Whether to enable automatic topic alias replacement
909    pub fn set_auto_replace_topic_alias_send(&mut self, enable: bool) {
910        self.auto_replace_topic_alias_send = enable;
911    }
912
913    /// Set the PINGRESP receive timeout
914    ///
915    /// Sets the timeout for receiving PINGRESP packets after sending PINGREQ packets.
916    /// If PINGRESP is not received within this timeout, the connection is considered disconnected.
917    ///
918    /// # Parameters
919    ///
920    /// * `timeout_ms` - The timeout in milliseconds. Set to 0 to disable timeout.
921    pub fn set_pingresp_recv_timeout(&mut self, timeout_ms: u64) {
922        self.pingresp_recv_timeout_ms = timeout_ms;
923    }
924
925    /// Acquire a new packet ID for outgoing packets
926    ///
927    /// # Returns
928    ///
929    /// A unique packet ID, or an error if none are available
930    pub fn acquire_packet_id(&mut self) -> Result<PacketIdType, MqttError> {
931        self.pid_man.acquire_unique_id()
932    }
933
934    /// Register a packet ID as in use
935    ///
936    /// Manually registers a specific packet ID as being in use, preventing
937    /// it from being allocated by `acquire_packet_id()`.
938    ///
939    /// # Parameters
940    ///
941    /// * `packet_id` - The packet ID to register as in use
942    ///
943    /// # Returns
944    ///
945    /// `Ok(())` if successful, or an error if the packet ID is already in use
946    pub fn register_packet_id(&mut self, packet_id: PacketIdType) -> Result<(), MqttError> {
947        self.pid_man.register_id(packet_id)
948    }
949
950    /// Release packet ID (Event-based API)
951    /// Release a packet ID for reuse
952    ///
953    /// This method releases a packet ID, making it available for future use.
954    /// It also generates a `NotifyPacketIdReleased` event.
955    ///
956    /// # Parameters
957    ///
958    /// * `packet_id` - The packet ID to release
959    ///
960    /// # Returns
961    ///
962    /// Events generated from releasing the packet ID
963    pub fn release_packet_id(
964        &mut self,
965        packet_id: PacketIdType,
966    ) -> Vec<GenericEvent<PacketIdType>> {
967        let mut events = Vec::new();
968
969        if self.pid_man.is_used_id(packet_id) {
970            self.pid_man.release_id(packet_id);
971            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
972        }
973
974        events
975    }
976
977    /// Get the set of QoS 2 PUBLISH packet IDs that have been handled
978    ///
979    /// Returns a copy of the set containing packet IDs of QoS 2 PUBLISH packets
980    /// that have been successfully processed and handled.
981    ///
982    /// # Returns
983    ///
984    /// A `HashSet` containing packet IDs of handled QoS 2 PUBLISH packets
985    pub fn get_qos2_publish_handled(&self) -> HashSet<PacketIdType> {
986        self.qos2_publish_handled.clone()
987    }
988
989    /// Restore the set of QoS 2 PUBLISH packet IDs that have been handled
990    ///
991    /// Restores the internal state of handled QoS 2 PUBLISH packet IDs,
992    /// typically used when resuming a connection from persistent storage.
993    ///
994    /// # Parameters
995    ///
996    /// * `pids` - A `HashSet` containing packet IDs of previously handled QoS 2 PUBLISH packets
997    pub fn restore_qos2_publish_handled(&mut self, pids: HashSet<PacketIdType>) {
998        self.qos2_publish_handled = pids;
999    }
1000
1001    /// Restore previously stored packets
1002    ///
1003    /// This method restores packets that were previously stored for persistence,
1004    /// typically called when resuming a session.
1005    ///
1006    /// # Parameters
1007    ///
1008    /// * `packets` - Vector of packets to restore
1009    pub fn restore_packets(&mut self, packets: Vec<GenericStorePacket<PacketIdType>>) {
1010        for packet in packets {
1011            match &packet {
1012                GenericStorePacket::V3_1_1Publish(p) => {
1013                    // Add to appropriate QoS tracking set
1014                    match p.qos() {
1015                        Qos::AtLeastOnce => {
1016                            self.pid_puback.insert(p.packet_id().unwrap());
1017                        }
1018                        Qos::ExactlyOnce => {
1019                            self.pid_pubrec.insert(p.packet_id().unwrap());
1020                        }
1021                        _ => {
1022                            // QoS 0 shouldn't be in store, but handle gracefully
1023                            warn!("QoS 0 packet found in store, skipping");
1024                            continue;
1025                        }
1026                    }
1027                    // Register packet ID and add to store
1028                    let packet_id = p.packet_id().unwrap();
1029                    if self.pid_man.register_id(packet_id).is_ok() {
1030                        if let Err(_e) = self.store.add(packet) {
1031                            error!("Failed to add packet to store: {:?}", _e);
1032                        }
1033                    } else {
1034                        error!("Packet ID {} has already been used. Skip it", packet_id);
1035                    }
1036                }
1037                GenericStorePacket::V5_0Publish(p) => {
1038                    // Add to appropriate QoS tracking set
1039                    match p.qos() {
1040                        Qos::AtLeastOnce => {
1041                            self.pid_puback.insert(p.packet_id().unwrap());
1042                        }
1043                        Qos::ExactlyOnce => {
1044                            self.pid_pubrec.insert(p.packet_id().unwrap());
1045                        }
1046                        _ => {
1047                            // QoS 0 shouldn't be in store, but handle gracefully
1048                            warn!("QoS 0 packet found in store, skipping");
1049                            continue;
1050                        }
1051                    }
1052                    // Register packet ID and add to store
1053                    let packet_id = p.packet_id().unwrap();
1054                    if self.pid_man.register_id(packet_id).is_ok() {
1055                        if let Err(_e) = self.store.add(packet) {
1056                            error!("Failed to add packet to store: {:?}", _e);
1057                        }
1058                    } else {
1059                        error!("Packet ID {} has already been used. Skip it", packet_id);
1060                    }
1061                }
1062                GenericStorePacket::V3_1_1Pubrel(p) => {
1063                    // Pubrel packets expect PUBCOMP response
1064                    self.pid_pubcomp.insert(p.packet_id());
1065                    // Register packet ID and add to store
1066                    let packet_id = p.packet_id();
1067                    if self.pid_man.register_id(packet_id).is_ok() {
1068                        if let Err(_e) = self.store.add(packet) {
1069                            error!("Failed to add packet to store: {:?}", _e);
1070                        }
1071                    } else {
1072                        error!("Packet ID {} has already been used. Skip it", packet_id);
1073                    }
1074                }
1075                GenericStorePacket::V5_0Pubrel(p) => {
1076                    // Pubrel packets expect PUBCOMP response
1077                    self.pid_pubcomp.insert(p.packet_id());
1078                    // Register packet ID and add to store
1079                    let packet_id = p.packet_id();
1080                    if self.pid_man.register_id(packet_id).is_ok() {
1081                        if let Err(_e) = self.store.add(packet) {
1082                            error!("Failed to add packet to store: {:?}", _e);
1083                        }
1084                    } else {
1085                        error!("Packet ID {} has already been used. Skip it", packet_id);
1086                    }
1087                }
1088            }
1089        }
1090    }
1091
1092    /// Get stored packets for persistence
1093    ///
1094    /// Returns packets that need to be stored for potential retransmission.
1095    /// This is useful for implementing persistent sessions.
1096    ///
1097    /// # Returns
1098    ///
1099    /// Vector of packets that should be persisted
1100    pub fn get_stored_packets(&self) -> Vec<GenericStorePacket<PacketIdType>> {
1101        self.store.get_stored()
1102    }
1103
1104    /// Get the MQTT protocol version being used
1105    ///
1106    /// # Returns
1107    ///
1108    /// The protocol version (V3_1_1 or V5_0)
1109    pub fn get_protocol_version(&self) -> Version {
1110        self.protocol_version
1111    }
1112
1113    /// Regulate packet for store (remove/resolve topic alias)
1114    ///
1115    /// This method prepares a V5.0 publish packet for storage by resolving topic aliases
1116    /// and removing TopicAlias properties to ensure the packet can be retransmitted correctly.
1117    pub fn regulate_for_store(
1118        &self,
1119        mut packet: v5_0::GenericPublish<PacketIdType>,
1120    ) -> Result<v5_0::GenericPublish<PacketIdType>, MqttError> {
1121        if packet.topic_name().is_empty() {
1122            // Topic is empty, need to resolve from topic alias
1123            if let Some(topic_alias) = Self::get_topic_alias_from_props(packet.props()) {
1124                if let Some(ref topic_alias_send) = self.topic_alias_send {
1125                    if let Some(topic) = topic_alias_send.peek(topic_alias) {
1126                        // Found topic for alias, add topic and remove alias property
1127                        packet = packet.remove_topic_alias_add_topic(topic.to_string())?;
1128                    } else {
1129                        return Err(MqttError::PacketNotRegulated);
1130                    }
1131                } else {
1132                    return Err(MqttError::PacketNotRegulated);
1133                }
1134            } else {
1135                return Err(MqttError::PacketNotRegulated);
1136            }
1137        } else {
1138            // Topic is not empty, just remove TopicAlias property if present
1139            packet = packet.remove_topic_alias();
1140        }
1141
1142        Ok(packet)
1143    }
1144
1145    // private
1146
1147    /// Initialize connection state based on client/server role
1148    ///
1149    /// Resets all connection-specific state including:
1150    /// - Publish flow control counters and limits
1151    /// - Topic alias management
1152    /// - QoS2 processing state
1153    /// - Packet ID tracking sets
1154    /// - Store requirement flag
1155    ///
1156    /// # Parameters
1157    /// * `is_client` - true for client mode, false for server mode
1158    fn initialize(&mut self, is_client: bool) {
1159        self.publish_send_max = None;
1160        self.publish_recv_max = None;
1161        self.publish_send_count = 0;
1162        self.topic_alias_send = None;
1163        self.topic_alias_recv = None;
1164        self.publish_recv.clear();
1165        self.need_store = false;
1166        self.pid_suback.clear();
1167        self.pid_unsuback.clear();
1168        self.is_client = is_client;
1169        self.pingreq_keep_alive_ms = 0;
1170        self.pingreq_server_keep_alive_ms = None;
1171    }
1172
1173    fn clear_store_related(&mut self) {
1174        self.pid_man.clear();
1175        self.pid_puback.clear();
1176        self.pid_pubrec.clear();
1177        self.pid_pubcomp.clear();
1178        self.store.clear();
1179    }
1180
1181    /// Send all stored packets for retransmission
1182    fn send_stored(&mut self) -> Vec<GenericEvent<PacketIdType>> {
1183        let mut events = Vec::new();
1184        self.store.for_each(|packet| {
1185            if packet.size() > self.maximum_packet_size_send as usize {
1186                let packet_id = packet.packet_id();
1187                self.pid_man.release_id(packet_id);
1188                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1189                return false; // Remove from store
1190            }
1191            events.push(GenericEvent::RequestSendPacket {
1192                packet: packet.clone().into(),
1193                release_packet_id_if_send_error: None,
1194            });
1195            true // Keep in store
1196        });
1197
1198        events
1199    }
1200
1201    /// Validate topic alias and return the associated topic name
1202    ///
1203    /// Checks if the topic alias is valid and retrieves the corresponding topic name
1204    /// from the topic alias send manager.
1205    ///
1206    /// # Parameters
1207    /// * `topic_alias_opt` - Optional topic alias value
1208    ///
1209    /// # Returns
1210    /// * `Some(topic_name)` if the topic alias is valid and found
1211    /// * `None` if the topic alias is invalid, not provided, or not found
1212    fn validate_topic_alias(&mut self, topic_alias_opt: Option<u16>) -> Option<String> {
1213        let topic_alias = topic_alias_opt?;
1214
1215        if !self.validate_topic_alias_range(topic_alias) {
1216            return None;
1217        }
1218
1219        let topic_alias_send = self.topic_alias_send.as_mut()?;
1220        // LRU updated here
1221        let topic = topic_alias_send.get(topic_alias)?;
1222
1223        Some(topic.to_string())
1224    }
1225
1226    /// Validate that topic alias is within the allowed range
1227    ///
1228    /// Checks if the topic alias value is valid according to the configured
1229    /// topic alias maximum for sending.
1230    ///
1231    /// # Parameters
1232    /// * `topic_alias` - Topic alias value to validate
1233    ///
1234    /// # Returns
1235    /// * `true` if the topic alias is within valid range
1236    /// * `false` if invalid or topic alias sending is not configured
1237    fn validate_topic_alias_range(&self, topic_alias: u16) -> bool {
1238        let topic_alias_send = match &self.topic_alias_send {
1239            Some(tas) => tas,
1240            None => {
1241                error!("topic_alias is set but topic_alias_maximum is 0");
1242                return false;
1243            }
1244        };
1245
1246        if topic_alias == 0 || topic_alias > topic_alias_send.max() {
1247            error!("topic_alias is set but out of range");
1248            return false;
1249        }
1250
1251        true
1252    }
1253
1254    /// Process v3.1.1 CONNECT packet - C++ constexpr if implementation
1255    pub(crate) fn process_send_v3_1_1_connect(
1256        &mut self,
1257        packet: v3_1_1::Connect,
1258    ) -> Vec<GenericEvent<PacketIdType>> {
1259        info!("send connect v3.1.1: {packet}");
1260
1261        if self.status != ConnectionStatus::Disconnected {
1262            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1263        }
1264
1265        let mut events = Vec::new();
1266        self.initialize(true);
1267        self.status = ConnectionStatus::Connecting;
1268
1269        self.pingreq_keep_alive_ms = packet.keep_alive() as u64 * 1000;
1270
1271        // Handle clean_session flag
1272        if packet.clean_start() {
1273            self.clear_store_related();
1274        } else {
1275            self.need_store = true;
1276        }
1277
1278        // Clear topic alias for sending
1279        self.topic_alias_send = None;
1280
1281        events.push(GenericEvent::RequestSendPacket {
1282            packet: packet.into(),
1283            release_packet_id_if_send_error: None,
1284        });
1285        self.send_post_process(&mut events);
1286
1287        events
1288    }
1289
1290    /// Process v5.0 CONNECT packet - C++ constexpr if implementation
1291    pub(crate) fn process_send_v5_0_connect(
1292        &mut self,
1293        packet: v5_0::Connect,
1294    ) -> Vec<GenericEvent<PacketIdType>> {
1295        info!("send connect v5.0: {packet}");
1296        if !self.validate_maximum_packet_size_send(packet.size()) {
1297            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1298        }
1299        if self.status != ConnectionStatus::Disconnected {
1300            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1301        }
1302
1303        let mut events = Vec::new();
1304        self.initialize(true);
1305        self.status = ConnectionStatus::Connecting;
1306
1307        self.pingreq_keep_alive_ms = packet.keep_alive() as u64 * 1000;
1308
1309        // Handle clean_start flag
1310        if packet.clean_start() {
1311            self.clear_store_related();
1312        }
1313
1314        // Process properties
1315        for prop in packet.props() {
1316            match prop {
1317                Property::TopicAliasMaximum(val) => {
1318                    if val.val() != 0 {
1319                        self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1320                    }
1321                }
1322                Property::ReceiveMaximum(val) => {
1323                    debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1324                    self.publish_recv_max = Some(val.val());
1325                }
1326                Property::MaximumPacketSize(val) => {
1327                    debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1328                    self.maximum_packet_size_recv = val.val();
1329                }
1330                Property::SessionExpiryInterval(val) => {
1331                    if val.val() != 0 {
1332                        self.need_store = true;
1333                    }
1334                }
1335                _ => {
1336                    // Ignore other properties (equivalent to [](auto const&){} in C++)
1337                }
1338            }
1339        }
1340
1341        events.push(GenericEvent::RequestSendPacket {
1342            packet: packet.into(),
1343            release_packet_id_if_send_error: None,
1344        });
1345        self.send_post_process(&mut events);
1346
1347        events
1348    }
1349
1350    pub(crate) fn process_send_v3_1_1_connack(
1351        &mut self,
1352        packet: v3_1_1::Connack,
1353    ) -> Vec<GenericEvent<PacketIdType>> {
1354        info!("send connack v3.1.1: {packet}");
1355        if self.status != ConnectionStatus::Connecting {
1356            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1357        }
1358        let mut events = Vec::new();
1359        let rc = packet.return_code();
1360        events.push(GenericEvent::RequestSendPacket {
1361            packet: packet.into(),
1362            release_packet_id_if_send_error: None,
1363        });
1364        if rc != ConnectReturnCode::Accepted {
1365            self.status = ConnectionStatus::Disconnected;
1366            self.cancel_timers(&mut events);
1367            events.push(GenericEvent::RequestClose);
1368            return events;
1369        }
1370
1371        self.status = ConnectionStatus::Connected;
1372        events.extend(self.send_stored());
1373        self.send_post_process(&mut events);
1374
1375        events
1376    }
1377
1378    pub(crate) fn process_send_v5_0_connack(
1379        &mut self,
1380        packet: v5_0::Connack,
1381    ) -> Vec<GenericEvent<PacketIdType>> {
1382        info!("send connack v5.0: {packet}");
1383        if !self.validate_maximum_packet_size_send(packet.size()) {
1384            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1385        }
1386        if self.status != ConnectionStatus::Connecting {
1387            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1388        }
1389
1390        let mut events = Vec::new();
1391        let rc = packet.reason_code();
1392        if rc == ConnectReasonCode::Success {
1393            // Process properties
1394            for prop in packet.props() {
1395                match prop {
1396                    Property::TopicAliasMaximum(val) => {
1397                        if val.val() != 0 {
1398                            self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1399                        }
1400                    }
1401                    Property::ReceiveMaximum(val) => {
1402                        debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1403                        self.publish_recv_max = Some(val.val());
1404                    }
1405                    Property::MaximumPacketSize(val) => {
1406                        debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1407                        self.maximum_packet_size_recv = val.val();
1408                    }
1409                    Property::ServerKeepAlive(val) => {
1410                        let val = val.val();
1411                        if val == 0 {
1412                            if self.pingreq_recv_set {
1413                                self.pingreq_recv_set = false;
1414                                events
1415                                    .push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
1416                            }
1417                            self.pingreq_recv_timeout_ms = 0;
1418                        } else {
1419                            self.pingreq_recv_timeout_ms = val as u64 * 1000 * 3 / 2;
1420                            self.pingreq_recv_set = true;
1421                            events.push(GenericEvent::RequestTimerReset {
1422                                kind: TimerKind::PingreqRecv,
1423                                duration_ms: self.pingreq_recv_timeout_ms,
1424                            });
1425                        }
1426                    }
1427                    _ => {
1428                        // Ignore other properties
1429                    }
1430                }
1431            }
1432        }
1433        events.push(GenericEvent::RequestSendPacket {
1434            packet: packet.into(),
1435            release_packet_id_if_send_error: None,
1436        });
1437
1438        if rc != ConnectReasonCode::Success {
1439            self.status = ConnectionStatus::Disconnected;
1440            self.cancel_timers(&mut events);
1441            events.push(GenericEvent::RequestClose);
1442            return events;
1443        }
1444
1445        self.status = ConnectionStatus::Connected;
1446
1447        events.extend(self.send_stored());
1448        self.send_post_process(&mut events);
1449
1450        events
1451    }
1452
1453    pub(crate) fn process_send_v3_1_1_publish(
1454        &mut self,
1455        packet: v3_1_1::GenericPublish<PacketIdType>,
1456    ) -> Vec<GenericEvent<PacketIdType>> {
1457        let mut events = Vec::new();
1458        let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1459
1460        if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1461            // Register packet ID for QoS 1 or 2
1462            let packet_id = packet.packet_id().unwrap();
1463            if self.status != ConnectionStatus::Connected
1464                && !self.need_store
1465                && !self.offline_publish
1466            {
1467                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1468                if self.pid_man.is_used_id(packet_id) {
1469                    self.pid_man.release_id(packet_id);
1470                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1471                }
1472                return events;
1473            }
1474            if !self.pid_man.is_used_id(packet_id) {
1475                error!("packet_id {packet_id} must be acquired or registered");
1476                events.push(GenericEvent::NotifyError(
1477                    MqttError::PacketIdentifierInvalid,
1478                ));
1479                return events;
1480            }
1481            if self.need_store
1482                && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1483            {
1484                let store_packet = packet.clone().set_dup(true);
1485                self.store.add(store_packet.try_into().unwrap()).unwrap();
1486            } else {
1487                release_packet_id_if_send_error = Some(packet_id);
1488            }
1489            if packet.qos() == Qos::ExactlyOnce {
1490                self.pid_pubrec.insert(packet_id);
1491            } else {
1492                self.pid_puback.insert(packet_id);
1493            }
1494        } else if self.status != ConnectionStatus::Connected {
1495            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1496            return events;
1497        }
1498
1499        if self.status == ConnectionStatus::Connected {
1500            events.push(GenericEvent::RequestSendPacket {
1501                packet: packet.into(),
1502                release_packet_id_if_send_error,
1503            });
1504            self.send_post_process(&mut events);
1505        }
1506
1507        events
1508    }
1509
1510    pub(crate) fn process_send_v5_0_publish(
1511        &mut self,
1512        mut packet: v5_0::GenericPublish<PacketIdType>,
1513    ) -> Vec<GenericEvent<PacketIdType>> {
1514        if !self.validate_maximum_packet_size_send(packet.size()) {
1515            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1516        }
1517
1518        let mut events = Vec::new();
1519        let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1520        let mut topic_alias_validated = false;
1521        if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1522            let packet_id = packet.packet_id().unwrap();
1523            if self.status != ConnectionStatus::Connected
1524                && !self.need_store
1525                && !self.offline_publish
1526            {
1527                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1528                if self.pid_man.is_used_id(packet_id) {
1529                    self.pid_man.release_id(packet_id);
1530                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1531                }
1532                return events;
1533            }
1534
1535            // Extract topic_name from TopicAlias and remove TopicAlias property, then store it
1536            if !self.pid_man.is_used_id(packet_id) {
1537                error!("packet_id {packet_id} must be acquired or registered");
1538                events.push(GenericEvent::NotifyError(
1539                    MqttError::PacketIdentifierInvalid,
1540                ));
1541                return events;
1542            }
1543
1544            if self.need_store
1545                && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1546            {
1547                let ta_opt = Self::get_topic_alias_from_props(packet.props());
1548                if packet.topic_name().is_empty() {
1549                    // Topic name is empty, must validate topic alias
1550                    let topic_opt = self.validate_topic_alias(ta_opt);
1551                    if topic_opt.is_none() {
1552                        events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1553                        if self.pid_man.is_used_id(packet_id) {
1554                            self.pid_man.release_id(packet_id);
1555                            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1556                        }
1557                        return events;
1558                    }
1559                    topic_alias_validated = true;
1560                    let store_packet = packet
1561                        .clone()
1562                        .remove_topic_alias_add_topic(topic_opt.unwrap())
1563                        .unwrap()
1564                        .set_dup(true);
1565                    self.store.add(store_packet.try_into().unwrap()).unwrap();
1566                } else {
1567                    // Topic name is not empty, remove topic alias if present
1568                    let store_packet = packet.clone().remove_topic_alias().set_dup(true);
1569                    self.store.add(store_packet.try_into().unwrap()).unwrap();
1570                }
1571            } else {
1572                release_packet_id_if_send_error = Some(packet_id);
1573            }
1574            if packet.qos() == Qos::ExactlyOnce {
1575                self.pid_pubrec.insert(packet_id);
1576            } else {
1577                self.pid_puback.insert(packet_id);
1578            }
1579        } else if self.status != ConnectionStatus::Connected {
1580            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1581            return events;
1582        }
1583
1584        let packet_id_opt = packet.packet_id();
1585        let ta_opt = Self::get_topic_alias_from_props(packet.props());
1586        if packet.topic_name().is_empty() {
1587            // process manually provided TopicAlias
1588            if !topic_alias_validated && self.validate_topic_alias(ta_opt).is_none() {
1589                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1590                if let Some(packet_id) = packet_id_opt {
1591                    if self.pid_man.is_used_id(packet_id) {
1592                        self.pid_man.release_id(packet_id);
1593                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1594                    }
1595                }
1596                return events;
1597            }
1598        } else if let Some(ta) = ta_opt {
1599            // Topic alias is provided
1600            if self.validate_topic_alias_range(ta) {
1601                trace!(
1602                    "topic alias: {} - {} is registered.",
1603                    packet.topic_name(),
1604                    ta
1605                );
1606                if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1607                    topic_alias_send.insert_or_update(packet.topic_name(), ta);
1608                }
1609            } else {
1610                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1611                if let Some(packet_id) = packet_id_opt {
1612                    if self.pid_man.is_used_id(packet_id) {
1613                        self.pid_man.release_id(packet_id);
1614                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1615                    }
1616                }
1617                return events;
1618            }
1619        } else if self.status == ConnectionStatus::Connected {
1620            // process auto applying TopicAlias if the option is enabled
1621            if self.auto_map_topic_alias_send {
1622                if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1623                    if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1624                        trace!(
1625                            "topic alias: {} - {} is found.",
1626                            packet.topic_name(),
1627                            found_ta
1628                        );
1629                        packet = packet.remove_topic_add_topic_alias(found_ta);
1630                    } else {
1631                        let lru_ta = topic_alias_send.get_lru_alias();
1632                        topic_alias_send.insert_or_update(packet.topic_name(), lru_ta);
1633                        packet = packet.remove_topic_add_topic_alias(lru_ta);
1634                    }
1635                }
1636            } else if self.auto_replace_topic_alias_send {
1637                if let Some(ref topic_alias_send) = self.topic_alias_send {
1638                    if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1639                        trace!(
1640                            "topic alias: {} - {} is found.",
1641                            packet.topic_name(),
1642                            found_ta
1643                        );
1644                        packet = packet.remove_topic_add_topic_alias(found_ta);
1645                    }
1646                }
1647            }
1648        }
1649
1650        // Check receive_maximum for sending (QoS 1 and 2 packets)
1651        if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1652            if let Some(max) = self.publish_send_max {
1653                if self.publish_send_count == max {
1654                    events.push(GenericEvent::NotifyError(MqttError::ReceiveMaximumExceeded));
1655                    if let Some(packet_id) = packet_id_opt {
1656                        if self.pid_man.is_used_id(packet_id) {
1657                            self.pid_man.release_id(packet_id);
1658                            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1659                        }
1660                    }
1661                    return events;
1662                }
1663                self.publish_send_count += 1;
1664            }
1665        }
1666
1667        if self.status == ConnectionStatus::Connected {
1668            events.push(GenericEvent::RequestSendPacket {
1669                packet: packet.into(),
1670                release_packet_id_if_send_error,
1671            });
1672            self.send_post_process(&mut events);
1673        }
1674
1675        events
1676    }
1677
1678    pub(crate) fn process_send_v3_1_1_puback(
1679        &mut self,
1680        packet: v3_1_1::GenericPuback<PacketIdType>,
1681    ) -> Vec<GenericEvent<PacketIdType>> {
1682        if self.status != ConnectionStatus::Connected {
1683            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1684        }
1685        let mut events = Vec::new();
1686
1687        events.push(GenericEvent::RequestSendPacket {
1688            packet: packet.into(),
1689            release_packet_id_if_send_error: None,
1690        });
1691        self.send_post_process(&mut events);
1692
1693        events
1694    }
1695
1696    pub(crate) fn process_send_v5_0_puback(
1697        &mut self,
1698        packet: v5_0::GenericPuback<PacketIdType>,
1699    ) -> Vec<GenericEvent<PacketIdType>> {
1700        if !self.validate_maximum_packet_size_send(packet.size()) {
1701            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1702        }
1703        if self.status != ConnectionStatus::Connected {
1704            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1705        }
1706
1707        let mut events = Vec::new();
1708        self.publish_recv.remove(&packet.packet_id());
1709
1710        events.push(GenericEvent::RequestSendPacket {
1711            packet: packet.into(),
1712            release_packet_id_if_send_error: None,
1713        });
1714        self.send_post_process(&mut events);
1715
1716        events
1717    }
1718
1719    pub(crate) fn process_send_v3_1_1_pubrec(
1720        &mut self,
1721        packet: v3_1_1::GenericPubrec<PacketIdType>,
1722    ) -> Vec<GenericEvent<PacketIdType>> {
1723        if self.status != ConnectionStatus::Connected {
1724            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1725        }
1726        let mut events = Vec::new();
1727
1728        events.push(GenericEvent::RequestSendPacket {
1729            packet: packet.into(),
1730            release_packet_id_if_send_error: None,
1731        });
1732        self.send_post_process(&mut events);
1733
1734        events
1735    }
1736
1737    pub(crate) fn process_send_v5_0_pubrec(
1738        &mut self,
1739        packet: v5_0::GenericPubrec<PacketIdType>,
1740    ) -> Vec<GenericEvent<PacketIdType>> {
1741        if !self.validate_maximum_packet_size_send(packet.size()) {
1742            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1743        }
1744        if self.status != ConnectionStatus::Connected {
1745            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1746        }
1747
1748        let mut events = Vec::new();
1749        let packet_id = packet.packet_id();
1750
1751        if let Some(rc) = packet.reason_code() {
1752            if rc.is_failure() {
1753                self.publish_recv.remove(&packet_id);
1754                self.qos2_publish_handled.remove(&packet_id);
1755            }
1756        }
1757
1758        events.push(GenericEvent::RequestSendPacket {
1759            packet: packet.into(),
1760            release_packet_id_if_send_error: None,
1761        });
1762        self.send_post_process(&mut events);
1763
1764        events
1765    }
1766
1767    pub(crate) fn process_send_v3_1_1_pubrel(
1768        &mut self,
1769        packet: v3_1_1::GenericPubrel<PacketIdType>,
1770    ) -> Vec<GenericEvent<PacketIdType>> {
1771        if self.status != ConnectionStatus::Connected && !self.need_store {
1772            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1773        }
1774        let mut events = Vec::new();
1775        let packet_id = packet.packet_id();
1776        if !self.pid_man.is_used_id(packet_id) {
1777            error!("packet_id {packet_id} must be acquired or registered");
1778            events.push(GenericEvent::NotifyError(
1779                MqttError::PacketIdentifierInvalid,
1780            ));
1781            return events;
1782        }
1783        if self.need_store {
1784            self.store.add(packet.clone().try_into().unwrap()).unwrap();
1785        }
1786
1787        if self.status == ConnectionStatus::Connected {
1788            self.pid_pubcomp.insert(packet_id);
1789            events.push(GenericEvent::RequestSendPacket {
1790                packet: packet.into(),
1791                release_packet_id_if_send_error: None,
1792            });
1793        }
1794        self.send_post_process(&mut events);
1795
1796        events
1797    }
1798
1799    pub(crate) fn process_send_v5_0_pubrel(
1800        &mut self,
1801        packet: v5_0::GenericPubrel<PacketIdType>,
1802    ) -> Vec<GenericEvent<PacketIdType>> {
1803        if !self.validate_maximum_packet_size_send(packet.size()) {
1804            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1805        }
1806        if self.status != ConnectionStatus::Connected && !self.need_store {
1807            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1808        }
1809
1810        let mut events = Vec::new();
1811        let packet_id = packet.packet_id();
1812        if !self.pid_man.is_used_id(packet_id) {
1813            error!("packet_id {packet_id} must be acquired or registered");
1814            events.push(GenericEvent::NotifyError(
1815                MqttError::PacketIdentifierInvalid,
1816            ));
1817            return events;
1818        }
1819        if self.need_store {
1820            self.store.add(packet.clone().try_into().unwrap()).unwrap();
1821        }
1822
1823        if self.status == ConnectionStatus::Connected {
1824            self.pid_pubcomp.insert(packet_id);
1825            events.push(GenericEvent::RequestSendPacket {
1826                packet: packet.into(),
1827                release_packet_id_if_send_error: None,
1828            });
1829        }
1830        self.send_post_process(&mut events);
1831
1832        events
1833    }
1834
1835    pub(crate) fn process_send_v3_1_1_pubcomp(
1836        &mut self,
1837        packet: v3_1_1::GenericPubcomp<PacketIdType>,
1838    ) -> Vec<GenericEvent<PacketIdType>> {
1839        if self.status != ConnectionStatus::Connected {
1840            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1841        }
1842        let mut events = Vec::new();
1843
1844        events.push(GenericEvent::RequestSendPacket {
1845            packet: packet.into(),
1846            release_packet_id_if_send_error: None,
1847        });
1848        self.send_post_process(&mut events);
1849
1850        events
1851    }
1852
1853    pub(crate) fn process_send_v5_0_pubcomp(
1854        &mut self,
1855        packet: v5_0::GenericPubcomp<PacketIdType>,
1856    ) -> Vec<GenericEvent<PacketIdType>> {
1857        if !self.validate_maximum_packet_size_send(packet.size()) {
1858            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1859        }
1860        if self.status != ConnectionStatus::Connected {
1861            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1862        }
1863
1864        let mut events = Vec::new();
1865        self.publish_recv.remove(&packet.packet_id());
1866
1867        events.push(GenericEvent::RequestSendPacket {
1868            packet: packet.into(),
1869            release_packet_id_if_send_error: None,
1870        });
1871        self.send_post_process(&mut events);
1872
1873        events
1874    }
1875
1876    pub(crate) fn process_send_v3_1_1_subscribe(
1877        &mut self,
1878        packet: v3_1_1::GenericSubscribe<PacketIdType>,
1879    ) -> Vec<GenericEvent<PacketIdType>> {
1880        let mut events = Vec::new();
1881        let packet_id = packet.packet_id();
1882        if self.status != ConnectionStatus::Connected {
1883            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1884            if self.pid_man.is_used_id(packet_id) {
1885                self.pid_man.release_id(packet_id);
1886                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1887            }
1888            return events;
1889        }
1890        if !self.pid_man.is_used_id(packet_id) {
1891            error!("packet_id {packet_id} must be acquired or registered");
1892            events.push(GenericEvent::NotifyError(
1893                MqttError::PacketIdentifierInvalid,
1894            ));
1895            return events;
1896        }
1897        self.pid_suback.insert(packet_id);
1898
1899        events.push(GenericEvent::RequestSendPacket {
1900            packet: packet.into(),
1901            release_packet_id_if_send_error: Some(packet_id),
1902        });
1903        self.send_post_process(&mut events);
1904
1905        events
1906    }
1907
1908    pub(crate) fn process_send_v5_0_subscribe(
1909        &mut self,
1910        packet: v5_0::GenericSubscribe<PacketIdType>,
1911    ) -> Vec<GenericEvent<PacketIdType>> {
1912        if !self.validate_maximum_packet_size_send(packet.size()) {
1913            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1914        }
1915
1916        let mut events = Vec::new();
1917        let packet_id = packet.packet_id();
1918        if self.status != ConnectionStatus::Connected {
1919            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1920            if self.pid_man.is_used_id(packet_id) {
1921                self.pid_man.release_id(packet_id);
1922                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1923            }
1924            return events;
1925        }
1926        if !self.pid_man.is_used_id(packet_id) {
1927            error!("packet_id {packet_id} must be acquired or registered");
1928            events.push(GenericEvent::NotifyError(
1929                MqttError::PacketIdentifierInvalid,
1930            ));
1931            return events;
1932        }
1933        self.pid_suback.insert(packet_id);
1934
1935        events.push(GenericEvent::RequestSendPacket {
1936            packet: packet.into(),
1937            release_packet_id_if_send_error: Some(packet_id),
1938        });
1939        self.send_post_process(&mut events);
1940
1941        events
1942    }
1943
1944    pub(crate) fn process_send_v3_1_1_suback(
1945        &mut self,
1946        packet: v3_1_1::GenericSuback<PacketIdType>,
1947    ) -> Vec<GenericEvent<PacketIdType>> {
1948        if self.status != ConnectionStatus::Connected {
1949            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1950        }
1951        let mut events = Vec::new();
1952        events.push(GenericEvent::RequestSendPacket {
1953            packet: packet.into(),
1954            release_packet_id_if_send_error: None,
1955        });
1956        self.send_post_process(&mut events);
1957
1958        events
1959    }
1960
1961    pub(crate) fn process_send_v5_0_suback(
1962        &mut self,
1963        packet: v5_0::GenericSuback<PacketIdType>,
1964    ) -> Vec<GenericEvent<PacketIdType>> {
1965        if !self.validate_maximum_packet_size_send(packet.size()) {
1966            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1967        }
1968        if self.status != ConnectionStatus::Connected {
1969            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1970        }
1971
1972        let mut events = Vec::new();
1973        events.push(GenericEvent::RequestSendPacket {
1974            packet: packet.into(),
1975            release_packet_id_if_send_error: None,
1976        });
1977        self.send_post_process(&mut events);
1978
1979        events
1980    }
1981
1982    pub(crate) fn process_send_v3_1_1_unsubscribe(
1983        &mut self,
1984        packet: v3_1_1::GenericUnsubscribe<PacketIdType>,
1985    ) -> Vec<GenericEvent<PacketIdType>> {
1986        let mut events = Vec::new();
1987        let packet_id = packet.packet_id();
1988        if self.status != ConnectionStatus::Connected {
1989            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1990            if self.pid_man.is_used_id(packet_id) {
1991                self.pid_man.release_id(packet_id);
1992                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1993            }
1994            return events;
1995        }
1996        if !self.pid_man.is_used_id(packet_id) {
1997            error!("packet_id {packet_id} must be acquired or registered");
1998            events.push(GenericEvent::NotifyError(
1999                MqttError::PacketIdentifierInvalid,
2000            ));
2001            return events;
2002        }
2003        self.pid_unsuback.insert(packet_id);
2004
2005        events.push(GenericEvent::RequestSendPacket {
2006            packet: packet.into(),
2007            release_packet_id_if_send_error: Some(packet_id),
2008        });
2009        self.send_post_process(&mut events);
2010
2011        events
2012    }
2013
2014    pub(crate) fn process_send_v5_0_unsubscribe(
2015        &mut self,
2016        packet: v5_0::GenericUnsubscribe<PacketIdType>,
2017    ) -> Vec<GenericEvent<PacketIdType>> {
2018        if !self.validate_maximum_packet_size_send(packet.size()) {
2019            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2020        }
2021
2022        let mut events = Vec::new();
2023        let packet_id = packet.packet_id();
2024        if self.status != ConnectionStatus::Connected {
2025            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
2026            if self.pid_man.is_used_id(packet_id) {
2027                self.pid_man.release_id(packet_id);
2028                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2029            }
2030            return events;
2031        }
2032        if !self.pid_man.is_used_id(packet_id) {
2033            error!("packet_id {packet_id} must be acquired or registered");
2034            events.push(GenericEvent::NotifyError(
2035                MqttError::PacketIdentifierInvalid,
2036            ));
2037            return events;
2038        }
2039        self.pid_unsuback.insert(packet_id);
2040
2041        events.push(GenericEvent::RequestSendPacket {
2042            packet: packet.into(),
2043            release_packet_id_if_send_error: Some(packet_id),
2044        });
2045        self.send_post_process(&mut events);
2046
2047        events
2048    }
2049
2050    pub(crate) fn process_send_v3_1_1_unsuback(
2051        &mut self,
2052        packet: v3_1_1::GenericUnsuback<PacketIdType>,
2053    ) -> Vec<GenericEvent<PacketIdType>> {
2054        if self.status != ConnectionStatus::Connected {
2055            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2056        }
2057        let mut events = Vec::new();
2058        events.push(GenericEvent::RequestSendPacket {
2059            packet: packet.into(),
2060            release_packet_id_if_send_error: None,
2061        });
2062        self.send_post_process(&mut events);
2063
2064        events
2065    }
2066
2067    pub(crate) fn process_send_v5_0_unsuback(
2068        &mut self,
2069        packet: v5_0::GenericUnsuback<PacketIdType>,
2070    ) -> Vec<GenericEvent<PacketIdType>> {
2071        if !self.validate_maximum_packet_size_send(packet.size()) {
2072            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2073        }
2074        if self.status != ConnectionStatus::Connected {
2075            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2076        }
2077
2078        let mut events = Vec::new();
2079        events.push(GenericEvent::RequestSendPacket {
2080            packet: packet.into(),
2081            release_packet_id_if_send_error: None,
2082        });
2083        self.send_post_process(&mut events);
2084
2085        events
2086    }
2087
2088    pub(crate) fn process_send_v3_1_1_pingreq(
2089        &mut self,
2090        packet: v3_1_1::Pingreq,
2091    ) -> Vec<GenericEvent<PacketIdType>> {
2092        if self.status != ConnectionStatus::Connected {
2093            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2094        }
2095        let mut events = Vec::new();
2096        events.push(GenericEvent::RequestSendPacket {
2097            packet: packet.into(),
2098            release_packet_id_if_send_error: None,
2099        });
2100        if self.pingresp_recv_timeout_ms != 0 {
2101            self.pingresp_recv_set = true;
2102            events.push(GenericEvent::RequestTimerReset {
2103                kind: TimerKind::PingrespRecv,
2104                duration_ms: self.pingresp_recv_timeout_ms,
2105            });
2106        }
2107        self.send_post_process(&mut events);
2108
2109        events
2110    }
2111
2112    pub(crate) fn process_send_v5_0_pingreq(
2113        &mut self,
2114        packet: v5_0::Pingreq,
2115    ) -> Vec<GenericEvent<PacketIdType>> {
2116        if !self.validate_maximum_packet_size_send(packet.size()) {
2117            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2118        }
2119        if self.status != ConnectionStatus::Connected {
2120            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2121        }
2122
2123        let mut events = Vec::new();
2124        events.push(GenericEvent::RequestSendPacket {
2125            packet: packet.into(),
2126            release_packet_id_if_send_error: None,
2127        });
2128        if self.pingresp_recv_timeout_ms != 0 {
2129            self.pingresp_recv_set = true;
2130            events.push(GenericEvent::RequestTimerReset {
2131                kind: TimerKind::PingrespRecv,
2132                duration_ms: self.pingresp_recv_timeout_ms,
2133            });
2134        }
2135        self.send_post_process(&mut events);
2136
2137        events
2138    }
2139
2140    pub(crate) fn process_send_v3_1_1_pingresp(
2141        &mut self,
2142        packet: v3_1_1::Pingresp,
2143    ) -> Vec<GenericEvent<PacketIdType>> {
2144        if self.status != ConnectionStatus::Connected {
2145            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2146        }
2147        let mut events = Vec::new();
2148        events.push(GenericEvent::RequestSendPacket {
2149            packet: packet.into(),
2150            release_packet_id_if_send_error: None,
2151        });
2152        self.send_post_process(&mut events);
2153
2154        events
2155    }
2156
2157    pub(crate) fn process_send_v5_0_pingresp(
2158        &mut self,
2159        packet: v5_0::Pingresp,
2160    ) -> Vec<GenericEvent<PacketIdType>> {
2161        if !self.validate_maximum_packet_size_send(packet.size()) {
2162            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2163        }
2164        if self.status != ConnectionStatus::Connected {
2165            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2166        }
2167
2168        let mut events = Vec::new();
2169        events.push(GenericEvent::RequestSendPacket {
2170            packet: packet.into(),
2171            release_packet_id_if_send_error: None,
2172        });
2173        self.send_post_process(&mut events);
2174
2175        events
2176    }
2177
2178    pub(crate) fn process_send_v3_1_1_disconnect(
2179        &mut self,
2180        packet: v3_1_1::Disconnect,
2181    ) -> Vec<GenericEvent<PacketIdType>> {
2182        if self.status != ConnectionStatus::Connected {
2183            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2184        }
2185        let mut events = Vec::new();
2186        self.status = ConnectionStatus::Disconnected;
2187        self.cancel_timers(&mut events);
2188        events.push(GenericEvent::RequestSendPacket {
2189            packet: packet.into(),
2190            release_packet_id_if_send_error: None,
2191        });
2192        events.push(GenericEvent::RequestClose);
2193
2194        events
2195    }
2196
2197    pub(crate) fn process_send_v5_0_disconnect(
2198        &mut self,
2199        packet: v5_0::Disconnect,
2200    ) -> Vec<GenericEvent<PacketIdType>> {
2201        if !self.validate_maximum_packet_size_send(packet.size()) {
2202            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2203        }
2204        if self.status != ConnectionStatus::Connected {
2205            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2206        }
2207
2208        let mut events = Vec::new();
2209        self.status = ConnectionStatus::Disconnected;
2210        self.cancel_timers(&mut events);
2211        events.push(GenericEvent::RequestSendPacket {
2212            packet: packet.into(),
2213            release_packet_id_if_send_error: None,
2214        });
2215        events.push(GenericEvent::RequestClose);
2216
2217        events
2218    }
2219
2220    pub(crate) fn process_send_v5_0_auth(
2221        &mut self,
2222        packet: v5_0::Auth,
2223    ) -> Vec<GenericEvent<PacketIdType>> {
2224        if !self.validate_maximum_packet_size_send(packet.size()) {
2225            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2226        }
2227        if self.status == ConnectionStatus::Disconnected {
2228            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2229        }
2230
2231        let mut events = Vec::new();
2232        events.push(GenericEvent::RequestSendPacket {
2233            packet: packet.into(),
2234            release_packet_id_if_send_error: None,
2235        });
2236        self.send_post_process(&mut events);
2237
2238        events
2239    }
2240
2241    fn send_post_process(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
2242        if self.is_client {
2243            // Priority 3
2244            let mut ms: u64 = self.pingreq_keep_alive_ms;
2245            if let Some(timeout_ms) = self.pingreq_user_send_interval_ms {
2246                // Priority 1
2247                ms = timeout_ms;
2248            } else if let Some(timeout_ms) = self.pingreq_server_keep_alive_ms {
2249                // Priority 2
2250                ms = timeout_ms;
2251            }
2252            if ms > 0 {
2253                self.pingreq_send_set = true;
2254                events.push(GenericEvent::RequestTimerReset {
2255                    kind: TimerKind::PingreqSend,
2256                    duration_ms: ms,
2257                });
2258            }
2259        }
2260    }
2261
2262    fn validate_maximum_packet_size_send(&self, size: usize) -> bool {
2263        if size > self.maximum_packet_size_send as usize {
2264            error!("packet size over maximum_packet_size for sending");
2265            return false;
2266        }
2267        true
2268    }
2269
2270    #[rustfmt::skip]
2271    fn can_receive(&self, packet_type: u8) -> bool {
2272        !((Role::IS_CLIENT &&
2273            (
2274                packet_type == 1 || // CONNECT
2275                packet_type == 8 || // SUBSCRIBE
2276                packet_type == 10 || // UNSUBSCRIBE
2277                packet_type == 12 || // PINGREQ
2278                (packet_type == 14 && self.protocol_version == Version::V3_1_1) || // DISCONNECT
2279                (packet_type == 15 && self.protocol_version == Version::V3_1_1) // AUTH
2280            )
2281        ) || (Role::IS_SERVER &&
2282            (
2283                packet_type == 2 || // CONNACK
2284                packet_type == 9 || // SUBACK
2285                packet_type == 11 || // UNSUBACK
2286                packet_type == 13 || // PINGRESP
2287                (packet_type == 15 && self.protocol_version == Version::V3_1_1) // AUTH
2288            )
2289        ))
2290    }
2291
2292    fn process_recv_packet(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
2293        let mut events = Vec::new();
2294
2295        // packet size limit validation (v3.1.1 is always satisfied)
2296        let total_size = remaining_length_to_total_size(raw_packet.remaining_length());
2297        if total_size > self.maximum_packet_size_recv {
2298            // This happens only when protocol version is V5.0.
2299            // On v3.1.1, the maximum packet size is always 268435455 (2^32 - 1).
2300            // If the packet size is over 268434555, feed() return an error.
2301            // maximum_packet_size_recv is set by sending CONNECT or CONNACK packet.
2302            // So DISCONNECT packet is the right choice to notify the error.
2303            let disconnect_packet = v5_0::Disconnect::builder()
2304                .reason_code(DisconnectReasonCode::PacketTooLarge)
2305                .build()
2306                .unwrap();
2307            // Send disconnect packet directly without generic constraints
2308            events.extend(self.process_send_v5_0_disconnect(disconnect_packet));
2309            events.push(GenericEvent::NotifyError(MqttError::PacketTooLarge));
2310            return events;
2311        }
2312
2313        let packet_type = raw_packet.packet_type();
2314        if !self.can_receive(packet_type) {
2315            events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2316            return events;
2317        }
2318
2319        let _flags = raw_packet.flags();
2320        match self.protocol_version {
2321            Version::V3_1_1 => {
2322                match packet_type {
2323                    1 => {
2324                        // CONNECT
2325                        events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2326                    }
2327                    2 => {
2328                        // CONNACK
2329                        events.extend(self.process_recv_v3_1_1_connack(raw_packet));
2330                    }
2331                    3 => {
2332                        // PUBLISH
2333                        events.extend(self.process_recv_v3_1_1_publish(raw_packet));
2334                    }
2335                    4 => {
2336                        // PUBACK
2337                        events.extend(self.process_recv_v3_1_1_puback(raw_packet));
2338                    }
2339                    5 => {
2340                        // PUBREC
2341                        events.extend(self.process_recv_v3_1_1_pubrec(raw_packet));
2342                    }
2343                    6 => {
2344                        // PUBREL
2345                        events.extend(self.process_recv_v3_1_1_pubrel(raw_packet));
2346                    }
2347                    7 => {
2348                        // PUBCOMP
2349                        events.extend(self.process_recv_v3_1_1_pubcomp(raw_packet));
2350                    }
2351                    8 => {
2352                        // SUBSCRIBE
2353                        events.extend(self.process_recv_v3_1_1_subscribe(raw_packet));
2354                    }
2355                    9 => {
2356                        // SUBACK
2357                        events.extend(self.process_recv_v3_1_1_suback(raw_packet));
2358                    }
2359                    10 => {
2360                        // UNSUBSCRIBE
2361                        events.extend(self.process_recv_v3_1_1_unsubscribe(raw_packet));
2362                    }
2363                    11 => {
2364                        // UNSUBACK
2365                        events.extend(self.process_recv_v3_1_1_unsuback(raw_packet));
2366                    }
2367                    12 => {
2368                        // PINGREQ
2369                        events.extend(self.process_recv_v3_1_1_pingreq(raw_packet));
2370                    }
2371                    13 => {
2372                        // PINGRESP
2373                        events.extend(self.process_recv_v3_1_1_pingresp(raw_packet));
2374                    }
2375                    14 => {
2376                        // DISCONNECT
2377                        events.extend(self.process_recv_v3_1_1_disconnect(raw_packet));
2378                    }
2379                    // invalid packet type
2380                    _ => {
2381                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2382                    }
2383                }
2384            }
2385            Version::V5_0 => {
2386                match packet_type {
2387                    1 => {
2388                        // CONNECT
2389                        events.extend(self.process_recv_v5_0_connect(raw_packet));
2390                    }
2391                    2 => {
2392                        // CONNACK
2393                        events.extend(self.process_recv_v5_0_connack(raw_packet));
2394                    }
2395                    3 => {
2396                        // PUBLISH
2397                        events.extend(self.process_recv_v5_0_publish(raw_packet));
2398                    }
2399                    4 => {
2400                        // PUBACK
2401                        events.extend(self.process_recv_v5_0_puback(raw_packet));
2402                    }
2403                    5 => {
2404                        // PUBREC
2405                        events.extend(self.process_recv_v5_0_pubrec(raw_packet));
2406                    }
2407                    6 => {
2408                        // PUBREL
2409                        events.extend(self.process_recv_v5_0_pubrel(raw_packet));
2410                    }
2411                    7 => {
2412                        // PUBCOMP
2413                        events.extend(self.process_recv_v5_0_pubcomp(raw_packet));
2414                    }
2415                    8 => {
2416                        // SUBSCRIBE
2417                        events.extend(self.process_recv_v5_0_subscribe(raw_packet));
2418                    }
2419                    9 => {
2420                        // SUBACK
2421                        events.extend(self.process_recv_v5_0_suback(raw_packet));
2422                    }
2423                    10 => {
2424                        // UNSUBSCRIBE
2425                        events.extend(self.process_recv_v5_0_unsubscribe(raw_packet));
2426                    }
2427                    11 => {
2428                        // UNSUBACK
2429                        events.extend(self.process_recv_v5_0_unsuback(raw_packet));
2430                    }
2431                    12 => {
2432                        // PINGREQ
2433                        events.extend(self.process_recv_v5_0_pingreq(raw_packet));
2434                    }
2435                    13 => {
2436                        // PINGRESP
2437                        events.extend(self.process_recv_v5_0_pingresp(raw_packet));
2438                    }
2439                    14 => {
2440                        // DISCONNECT
2441                        events.extend(self.process_recv_v5_0_disconnect(raw_packet));
2442                    }
2443                    15 => {
2444                        // AUTH
2445                        events.extend(self.process_recv_v5_0_auth(raw_packet));
2446                    }
2447                    // invalid packet type
2448                    _ => {
2449                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2450                    }
2451                }
2452            }
2453            Version::Undetermined => {
2454                match packet_type {
2455                    1 => {
2456                        // CONNECT
2457                        if raw_packet.remaining_length() < 7 {
2458                            events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2459                            return events;
2460                        }
2461                        match raw_packet.data_as_slice()[6] {
2462                            // Protocol Version
2463                            4 => {
2464                                self.protocol_version = Version::V3_1_1;
2465                                events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2466                            }
2467                            5 => {
2468                                self.protocol_version = Version::V5_0;
2469                                events.extend(self.process_recv_v5_0_connect(raw_packet));
2470                            }
2471                            _ => {
2472                                events.push(GenericEvent::NotifyError(
2473                                    MqttError::UnsupportedProtocolVersion,
2474                                ));
2475                            }
2476                        }
2477                    }
2478                    _ => {
2479                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2480                    }
2481                }
2482            }
2483        }
2484
2485        events
2486    }
2487
2488    fn process_recv_v3_1_1_connect(
2489        &mut self,
2490        raw_packet: RawPacket,
2491    ) -> Vec<GenericEvent<PacketIdType>> {
2492        let mut events = Vec::new();
2493        if self.status != ConnectionStatus::Disconnected {
2494            Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
2495            return events;
2496        }
2497        self.status = ConnectionStatus::Connecting;
2498        match v3_1_1::Connect::parse(raw_packet.data_as_slice()) {
2499            Ok((packet, _)) => {
2500                self.initialize(false);
2501                if packet.keep_alive() > 0 {
2502                    self.pingreq_recv_timeout_ms = (packet.keep_alive() as u64) * 1000 * 3 / 2;
2503                }
2504                if packet.clean_session() {
2505                    self.clear_store_related();
2506                } else {
2507                    self.need_store = true;
2508                }
2509                events.extend(self.refresh_pingreq_recv());
2510                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2511            }
2512            Err(e) => {
2513                let rc = match e {
2514                    MqttError::ClientIdentifierNotValid => ConnectReturnCode::IdentifierRejected,
2515                    MqttError::BadUserNameOrPassword => ConnectReturnCode::BadUserNameOrPassword,
2516                    MqttError::UnsupportedProtocolVersion => {
2517                        ConnectReturnCode::UnacceptableProtocolVersion
2518                    }
2519                    _ => ConnectReturnCode::NotAuthorized, // TBD close could be better
2520                };
2521                let connack = v3_1_1::Connack::builder()
2522                    .return_code(rc)
2523                    .session_present(false)
2524                    .build()
2525                    .unwrap();
2526                let connack_events = self.process_send_v3_1_1_connack(connack);
2527                events.extend(connack_events);
2528                events.push(GenericEvent::NotifyError(e));
2529            }
2530        }
2531
2532        events
2533    }
2534
2535    fn process_recv_v5_0_connect(
2536        &mut self,
2537        raw_packet: RawPacket,
2538    ) -> Vec<GenericEvent<PacketIdType>> {
2539        let mut events = Vec::new();
2540        if self.status != ConnectionStatus::Disconnected {
2541            self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
2542            return events;
2543        }
2544        self.status = ConnectionStatus::Connecting;
2545        match v5_0::Connect::parse(raw_packet.data_as_slice()) {
2546            Ok((packet, _)) => {
2547                self.initialize(false);
2548                if packet.keep_alive() > 0 {
2549                    self.pingreq_recv_timeout_ms = (packet.keep_alive() as u64) * 1000 * 3 / 2;
2550                }
2551                if packet.clean_start() {
2552                    self.clear_store_related();
2553                }
2554                packet.props().iter().for_each(|prop| match prop {
2555                    Property::TopicAliasMaximum(p) => {
2556                        self.topic_alias_send = Some(TopicAliasSend::new(p.val()));
2557                    }
2558                    Property::ReceiveMaximum(p) => {
2559                        self.publish_send_max = Some(p.val());
2560                    }
2561                    Property::MaximumPacketSize(p) => {
2562                        self.maximum_packet_size_send = p.val();
2563                    }
2564                    Property::SessionExpiryInterval(p) => {
2565                        if p.val() != 0 {
2566                            self.need_store = true;
2567                        }
2568                    }
2569                    _ => {}
2570                });
2571                events.extend(self.refresh_pingreq_recv());
2572                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2573            }
2574            Err(e) => {
2575                let rc = match e {
2576                    MqttError::ClientIdentifierNotValid => {
2577                        ConnectReasonCode::ClientIdentifierNotValid
2578                    }
2579                    MqttError::BadUserNameOrPassword => ConnectReasonCode::BadUserNameOrPassword,
2580                    MqttError::UnsupportedProtocolVersion => {
2581                        ConnectReasonCode::UnsupportedProtocolVersion
2582                    }
2583                    _ => ConnectReasonCode::UnspecifiedError,
2584                };
2585                let connack = v5_0::Connack::builder()
2586                    .reason_code(rc)
2587                    .session_present(false)
2588                    .build()
2589                    .unwrap();
2590                let connack_events = self.process_send_v5_0_connack(connack);
2591                events.extend(connack_events);
2592                events.push(GenericEvent::NotifyError(e));
2593            }
2594        }
2595
2596        events
2597    }
2598
2599    fn process_recv_v3_1_1_connack(
2600        &mut self,
2601        raw_packet: RawPacket,
2602    ) -> Vec<GenericEvent<PacketIdType>> {
2603        let mut events = Vec::new();
2604
2605        match v3_1_1::Connack::parse(raw_packet.data_as_slice()) {
2606            Ok((packet, _consumed)) => {
2607                if packet.return_code() == ConnectReturnCode::Accepted {
2608                    self.status = ConnectionStatus::Connected;
2609                    if packet.session_present() {
2610                        events.extend(self.send_stored());
2611                    } else {
2612                        self.clear_store_related();
2613                    }
2614                }
2615                events.push(GenericEvent::NotifyPacketReceived(
2616                    GenericPacket::V3_1_1Connack(packet),
2617                ));
2618            }
2619            Err(e) => {
2620                Self::handle_v3_1_1_error(e, &mut events);
2621            }
2622        }
2623
2624        events
2625    }
2626
2627    fn process_recv_v5_0_connack(
2628        &mut self,
2629        raw_packet: RawPacket,
2630    ) -> Vec<GenericEvent<PacketIdType>> {
2631        let mut events = Vec::new();
2632
2633        match v5_0::Connack::parse(raw_packet.data_as_slice()) {
2634            Ok((packet, _consumed)) => {
2635                if packet.reason_code() == ConnectReasonCode::Success {
2636                    self.status = ConnectionStatus::Connected;
2637
2638                    // Process properties
2639                    for prop in packet.props() {
2640                        match prop {
2641                            Property::TopicAliasMaximum(val) => {
2642                                if val.val() > 0 {
2643                                    self.topic_alias_send = Some(TopicAliasSend::new(val.val()));
2644                                }
2645                            }
2646                            Property::ReceiveMaximum(val) => {
2647                                assert!(val.val() != 0);
2648                                self.publish_send_max = Some(val.val());
2649                            }
2650                            Property::MaximumPacketSize(val) => {
2651                                assert!(val.val() != 0);
2652                                self.maximum_packet_size_send = val.val();
2653                            }
2654                            Property::ServerKeepAlive(val) => {
2655                                let val = val.val() as u64 * 1000;
2656                                self.pingreq_server_keep_alive_ms = Some(val);
2657                                if self.pingreq_user_send_interval_ms.is_none() {
2658                                    if val == 0 {
2659                                        if self.pingreq_send_set {
2660                                            self.pingreq_send_set = false;
2661                                            events.push(GenericEvent::RequestTimerCancel(
2662                                                TimerKind::PingreqSend,
2663                                            ));
2664                                        }
2665                                        self.pingreq_user_send_interval_ms = None;
2666                                    } else {
2667                                        self.pingreq_send_set = true;
2668                                        events.push(GenericEvent::RequestTimerReset {
2669                                            kind: TimerKind::PingreqSend,
2670                                            duration_ms: val,
2671                                        });
2672                                    }
2673                                }
2674                            }
2675                            _ => {
2676                                // Ignore other properties
2677                            }
2678                        }
2679                    }
2680
2681                    if packet.session_present() {
2682                        events.extend(self.send_stored());
2683                    } else {
2684                        self.clear_store_related();
2685                    }
2686                }
2687                events.push(GenericEvent::NotifyPacketReceived(
2688                    GenericPacket::V5_0Connack(packet),
2689                ));
2690            }
2691            Err(e) => {
2692                if self.status == ConnectionStatus::Connected {
2693                    self.handle_v5_0_error(e, &mut events);
2694                } else {
2695                    events.push(GenericEvent::NotifyError(e));
2696                }
2697            }
2698        }
2699
2700        events
2701    }
2702
2703    fn process_recv_v3_1_1_publish(
2704        &mut self,
2705        raw_packet: RawPacket,
2706    ) -> Vec<GenericEvent<PacketIdType>> {
2707        let mut events = Vec::new();
2708
2709        let flags = raw_packet.flags();
2710        match &raw_packet.data {
2711            PacketData::Publish(arc) => {
2712                match v3_1_1::GenericPublish::parse(flags, arc.clone()) {
2713                    Ok((packet, _consumed)) => {
2714                        match packet.qos() {
2715                            Qos::AtMostOnce => {
2716                                events.extend(self.refresh_pingreq_recv());
2717                                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2718                            }
2719                            Qos::AtLeastOnce => {
2720                                let packet_id = packet.packet_id().unwrap();
2721                                if self.status == ConnectionStatus::Connected
2722                                    && self.auto_pub_response
2723                                {
2724                                    // Send PUBACK automatically
2725                                    let puback = v3_1_1::GenericPuback::builder()
2726                                        .packet_id(packet_id)
2727                                        .build()
2728                                        .unwrap();
2729                                    events.extend(self.process_send_v3_1_1_puback(puback));
2730                                }
2731                                events.extend(self.refresh_pingreq_recv());
2732                                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2733                            }
2734                            Qos::ExactlyOnce => {
2735                                let packet_id = packet.packet_id().unwrap();
2736                                let already_handled = !self.qos2_publish_handled.insert(packet_id);
2737
2738                                if self.status == ConnectionStatus::Connected
2739                                    && (self.auto_pub_response || already_handled)
2740                                {
2741                                    let pubrec = v3_1_1::GenericPubrec::builder()
2742                                        .packet_id(packet_id)
2743                                        .build()
2744                                        .unwrap();
2745                                    events.extend(self.process_send_v3_1_1_pubrec(pubrec));
2746                                }
2747                                events.extend(self.refresh_pingreq_recv());
2748                                if !already_handled {
2749                                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2750                                }
2751                            }
2752                        }
2753                    }
2754                    Err(e) => {
2755                        Self::handle_v3_1_1_error(e, &mut events);
2756                    }
2757                }
2758            }
2759            PacketData::Normal(_) => {
2760                unreachable!("PUBLISH packet must use PacketData::Publish variant");
2761            }
2762        }
2763
2764        events
2765    }
2766
2767    fn process_recv_v5_0_publish(
2768        &mut self,
2769        raw_packet: RawPacket,
2770    ) -> Vec<GenericEvent<PacketIdType>> {
2771        let mut events = Vec::new();
2772
2773        let flags = raw_packet.flags();
2774        match &raw_packet.data {
2775            PacketData::Publish(arc) => {
2776                match v5_0::GenericPublish::parse(flags, arc.clone()) {
2777                    Ok((mut packet, _consumed)) => {
2778                        let mut already_handled = false;
2779                        let mut puback_send = false;
2780                        let mut pubrec_send = false;
2781
2782                        let mut check_receive_maximum =
2783                            |events: &mut Vec<GenericEvent<PacketIdType>>| {
2784                                if let Some(max) = self.publish_recv_max {
2785                                    if self.publish_recv.len() >= max as usize {
2786                                        self.handle_v5_0_error(
2787                                            MqttError::ReceiveMaximumExceeded,
2788                                            events,
2789                                        );
2790                                        return false;
2791                                    }
2792                                }
2793                                true
2794                            };
2795
2796                        match packet.qos() {
2797                            Qos::AtLeastOnce => {
2798                                let packet_id = packet.packet_id().unwrap();
2799                                if !check_receive_maximum(&mut events) {
2800                                    return events;
2801                                }
2802                                self.publish_recv.insert(packet_id);
2803                                if self.auto_pub_response
2804                                    && self.status == ConnectionStatus::Connected
2805                                {
2806                                    puback_send = true;
2807                                }
2808                            }
2809                            Qos::ExactlyOnce => {
2810                                let packet_id = packet.packet_id().unwrap();
2811                                if !check_receive_maximum(&mut events) {
2812                                    return events;
2813                                }
2814                                self.publish_recv.insert(packet_id);
2815
2816                                if !self.qos2_publish_handled.insert(packet_id) {
2817                                    already_handled = true;
2818                                }
2819                                if self.status == ConnectionStatus::Connected
2820                                    && (self.auto_pub_response || already_handled)
2821                                {
2822                                    pubrec_send = true;
2823                                }
2824                            }
2825                            Qos::AtMostOnce => {
2826                                // No packet ID handling for QoS 0
2827                            }
2828                        }
2829
2830                        // Topic Alias handling
2831                        if packet.topic_name().is_empty() {
2832                            // Extract topic from topic_alias
2833                            if let Some(ta) = Self::get_topic_alias_from_props(packet.props()) {
2834                                if ta == 0
2835                                    || self.topic_alias_recv.is_none()
2836                                    || ta > self.topic_alias_recv.as_ref().unwrap().max()
2837                                {
2838                                    self.handle_v5_0_error(
2839                                        MqttError::TopicAliasInvalid,
2840                                        &mut events,
2841                                    );
2842                                    return events;
2843                                }
2844
2845                                if let Some(ref topic_alias_recv) = self.topic_alias_recv {
2846                                    if let Some(topic_name) = topic_alias_recv.get(ta) {
2847                                        match packet.add_extracted_topic_name(topic_name) {
2848                                            Ok(extracted) => {
2849                                                packet = extracted;
2850                                            }
2851                                            Err(_e) => {
2852                                                error!("topic alias extract failed: {_e}");
2853                                                self.handle_v5_0_error(
2854                                                    MqttError::TopicAliasInvalid,
2855                                                    &mut events,
2856                                                );
2857                                                return events;
2858                                            }
2859                                        }
2860                                    } else {
2861                                        self.handle_v5_0_error(
2862                                            MqttError::TopicAliasInvalid,
2863                                            &mut events,
2864                                        );
2865                                        return events;
2866                                    }
2867                                }
2868                            } else {
2869                                self.handle_v5_0_error(MqttError::TopicAliasInvalid, &mut events);
2870                                return events;
2871                            }
2872                        } else {
2873                            // Topic is not empty, check if topic alias needs to be registered
2874                            if let Some(ta) = Self::get_topic_alias_from_props(packet.props()) {
2875                                if ta == 0
2876                                    || self.topic_alias_recv.is_none()
2877                                    || ta > self.topic_alias_recv.as_ref().unwrap().max()
2878                                {
2879                                    self.handle_v5_0_error(
2880                                        MqttError::TopicAliasInvalid,
2881                                        &mut events,
2882                                    );
2883                                    return events;
2884                                }
2885                                if let Some(ref mut topic_alias_recv) = self.topic_alias_recv {
2886                                    topic_alias_recv.insert_or_update(packet.topic_name(), ta);
2887                                }
2888                            }
2889                        }
2890
2891                        // Send response packets
2892                        if puback_send {
2893                            let puback = v5_0::GenericPuback::builder()
2894                                .packet_id(packet.packet_id().unwrap())
2895                                .build()
2896                                .unwrap();
2897                            events.extend(self.process_send_v5_0_puback(puback));
2898                        }
2899                        if pubrec_send {
2900                            let pubrec = v5_0::GenericPubrec::builder()
2901                                .packet_id(packet.packet_id().unwrap())
2902                                .build()
2903                                .unwrap();
2904                            events.extend(self.process_send_v5_0_pubrec(pubrec));
2905                        }
2906
2907                        // Refresh PINGREQ receive timer
2908                        events.extend(self.refresh_pingreq_recv());
2909
2910                        // Notify packet received (only if not already handled)
2911                        if !already_handled {
2912                            events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2913                        }
2914                    }
2915                    Err(e) => {
2916                        if self.status == ConnectionStatus::Connected {
2917                            self.handle_v5_0_error(e, &mut events);
2918                        } else {
2919                            events.push(GenericEvent::NotifyError(e));
2920                        }
2921                    }
2922                }
2923            }
2924            PacketData::Normal(_) => {
2925                unreachable!("PUBLISH packet must use PacketData::Publish variant");
2926            }
2927        }
2928
2929        events
2930    }
2931
2932    fn process_recv_v3_1_1_puback(
2933        &mut self,
2934        raw_packet: RawPacket,
2935    ) -> Vec<GenericEvent<PacketIdType>> {
2936        let mut events = Vec::new();
2937
2938        match v3_1_1::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2939            Ok((packet, _)) => {
2940                let packet_id = packet.packet_id();
2941                if self.pid_puback.remove(&packet_id) {
2942                    self.store.erase(ResponsePacket::V3_1_1Puback, packet_id);
2943                    if self.pid_man.is_used_id(packet_id) {
2944                        self.pid_man.release_id(packet_id);
2945                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2946                    }
2947                    events.extend(self.refresh_pingreq_recv());
2948                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2949                } else {
2950                    Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
2951                }
2952            }
2953            Err(e) => {
2954                Self::handle_v3_1_1_error(e, &mut events);
2955            }
2956        }
2957
2958        events
2959    }
2960
2961    fn process_recv_v5_0_puback(
2962        &mut self,
2963        raw_packet: RawPacket,
2964    ) -> Vec<GenericEvent<PacketIdType>> {
2965        let mut events = Vec::new();
2966
2967        match v5_0::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2968            Ok((packet, _)) => {
2969                let packet_id = packet.packet_id();
2970                if self.pid_puback.remove(&packet_id) {
2971                    self.store.erase(ResponsePacket::V5_0Puback, packet_id);
2972                    if self.pid_man.is_used_id(packet_id) {
2973                        self.pid_man.release_id(packet_id);
2974                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2975                    }
2976                    if self.publish_send_max.is_some() {
2977                        self.publish_send_count -= 1;
2978                    }
2979                    events.extend(self.refresh_pingreq_recv());
2980                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2981                } else {
2982                    self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
2983                }
2984            }
2985            Err(e) => {
2986                self.handle_v5_0_error(e, &mut events);
2987            }
2988        }
2989
2990        events
2991    }
2992
2993    fn process_recv_v3_1_1_pubrec(
2994        &mut self,
2995        raw_packet: RawPacket,
2996    ) -> Vec<GenericEvent<PacketIdType>> {
2997        let mut events = Vec::new();
2998
2999        match v3_1_1::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3000            Ok((packet, _)) => {
3001                let packet_id = packet.packet_id();
3002                if self.pid_pubrec.remove(&packet_id) {
3003                    self.store.erase(ResponsePacket::V3_1_1Pubrec, packet_id);
3004                    if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3005                        let pubrel = v3_1_1::GenericPubrel::<PacketIdType>::builder()
3006                            .packet_id(packet_id)
3007                            .build()
3008                            .unwrap();
3009                        events.extend(self.process_send_v3_1_1_pubrel(pubrel));
3010                    }
3011                    events.extend(self.refresh_pingreq_recv());
3012                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3013                } else {
3014                    Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
3015                }
3016            }
3017            Err(e) => {
3018                Self::handle_v3_1_1_error(e, &mut events);
3019            }
3020        }
3021
3022        events
3023    }
3024
3025    fn process_recv_v5_0_pubrec(
3026        &mut self,
3027        raw_packet: RawPacket,
3028    ) -> Vec<GenericEvent<PacketIdType>> {
3029        let mut events = Vec::new();
3030
3031        match v5_0::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3032            Ok((packet, _)) => {
3033                let packet_id = packet.packet_id();
3034                if self.pid_pubrec.remove(&packet_id) {
3035                    self.store.erase(ResponsePacket::V5_0Pubrec, packet_id);
3036                    let reason_code = packet.reason_code();
3037                    if reason_code.is_none() || reason_code.unwrap() == PubrecReasonCode::Success {
3038                        if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3039                            let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3040                                .packet_id(packet_id)
3041                                .build()
3042                                .unwrap();
3043                            events.extend(self.process_send_v5_0_pubrel(pubrel));
3044                        }
3045                    } else {
3046                        if self.pid_man.is_used_id(packet_id) {
3047                            self.pid_man.release_id(packet_id);
3048                            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3049                        }
3050                        if self.publish_send_max.is_some() {
3051                            self.publish_send_count -= 1;
3052                        }
3053                    }
3054                    events.extend(self.refresh_pingreq_recv());
3055                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3056                } else {
3057                    self.handle_v5_0_error(
3058                        MqttError::from(DisconnectReasonCode::ProtocolError),
3059                        &mut events,
3060                    );
3061                }
3062            }
3063            Err(e) => {
3064                self.handle_v5_0_error(e, &mut events);
3065            }
3066        }
3067
3068        events
3069    }
3070
3071    fn process_recv_v3_1_1_pubrel(
3072        &mut self,
3073        raw_packet: RawPacket,
3074    ) -> Vec<GenericEvent<PacketIdType>> {
3075        let mut events = Vec::new();
3076
3077        match v3_1_1::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3078            Ok((packet, _)) => {
3079                let packet_id = packet.packet_id();
3080                self.qos2_publish_handled.remove(&packet_id);
3081                if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3082                    let pubcomp = v3_1_1::GenericPubcomp::<PacketIdType>::builder()
3083                        .packet_id(packet_id)
3084                        .build()
3085                        .unwrap();
3086                    events.extend(self.process_send_v3_1_1_pubcomp(pubcomp));
3087                }
3088                events.extend(self.refresh_pingreq_recv());
3089                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3090            }
3091            Err(e) => {
3092                Self::handle_v3_1_1_error(e, &mut events);
3093            }
3094        }
3095
3096        events
3097    }
3098
3099    fn process_recv_v5_0_pubrel(
3100        &mut self,
3101        raw_packet: RawPacket,
3102    ) -> Vec<GenericEvent<PacketIdType>> {
3103        let mut events = Vec::new();
3104
3105        match v5_0::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3106            Ok((packet, _)) => {
3107                let packet_id = packet.packet_id();
3108                let removed = self.qos2_publish_handled.remove(&packet_id);
3109                if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3110                    if removed {
3111                        let pubcomp = v5_0::GenericPubcomp::<PacketIdType>::builder()
3112                            .packet_id(packet_id)
3113                            .build()
3114                            .unwrap();
3115                        events.extend(self.process_send_v5_0_pubcomp(pubcomp));
3116                    } else {
3117                        let pubcomp = v5_0::GenericPubcomp::<PacketIdType>::builder()
3118                            .packet_id(packet_id)
3119                            .reason_code(result_code::PubcompReasonCode::PacketIdentifierNotFound)
3120                            .build()
3121                            .unwrap();
3122                        events.extend(self.process_send_v5_0_pubcomp(pubcomp));
3123                    }
3124                }
3125                events.extend(self.refresh_pingreq_recv());
3126                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3127            }
3128            Err(e) => {
3129                self.handle_v5_0_error(e, &mut events);
3130            }
3131        }
3132
3133        events
3134    }
3135
3136    fn process_recv_v3_1_1_pubcomp(
3137        &mut self,
3138        raw_packet: RawPacket,
3139    ) -> Vec<GenericEvent<PacketIdType>> {
3140        let mut events = Vec::new();
3141
3142        match v3_1_1::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3143            Ok((packet, _)) => {
3144                let packet_id = packet.packet_id();
3145                if self.pid_pubcomp.remove(&packet_id) {
3146                    self.store.erase(ResponsePacket::V3_1_1Pubcomp, packet_id);
3147                    if self.pid_man.is_used_id(packet_id) {
3148                        self.pid_man.release_id(packet_id);
3149                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3150                    }
3151                    events.extend(self.refresh_pingreq_recv());
3152                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3153                } else {
3154                    Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
3155                }
3156            }
3157            Err(e) => {
3158                Self::handle_v3_1_1_error(e, &mut events);
3159            }
3160        }
3161
3162        events
3163    }
3164
3165    fn process_recv_v5_0_pubcomp(
3166        &mut self,
3167        raw_packet: RawPacket,
3168    ) -> Vec<GenericEvent<PacketIdType>> {
3169        let mut events = Vec::new();
3170
3171        match v5_0::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3172            Ok((packet, _)) => {
3173                let packet_id = packet.packet_id();
3174                if self.pid_pubcomp.remove(&packet_id) {
3175                    self.store.erase(ResponsePacket::V5_0Pubcomp, packet_id);
3176                    if self.pid_man.is_used_id(packet_id) {
3177                        self.pid_man.release_id(packet_id);
3178                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3179                    }
3180                    if self.publish_send_max.is_some() {
3181                        self.publish_send_count -= 1;
3182                    }
3183                    events.extend(self.refresh_pingreq_recv());
3184                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3185                } else {
3186                    self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
3187                }
3188            }
3189            Err(e) => {
3190                self.handle_v5_0_error(e, &mut events);
3191            }
3192        }
3193
3194        events
3195    }
3196
3197    fn process_recv_v3_1_1_subscribe(
3198        &mut self,
3199        raw_packet: RawPacket,
3200    ) -> Vec<GenericEvent<PacketIdType>> {
3201        let mut events = Vec::new();
3202
3203        match v3_1_1::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3204            Ok((packet, _)) => {
3205                events.extend(self.refresh_pingreq_recv());
3206                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3207            }
3208            Err(e) => {
3209                Self::handle_v3_1_1_error(e, &mut events);
3210            }
3211        }
3212
3213        events
3214    }
3215
3216    fn process_recv_v5_0_subscribe(
3217        &mut self,
3218        raw_packet: RawPacket,
3219    ) -> Vec<GenericEvent<PacketIdType>> {
3220        let mut events = Vec::new();
3221
3222        match v5_0::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3223            Ok((packet, _)) => {
3224                events.extend(self.refresh_pingreq_recv());
3225                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3226            }
3227            Err(e) => {
3228                self.handle_v5_0_error(e, &mut events);
3229            }
3230        }
3231
3232        events
3233    }
3234
3235    fn process_recv_v3_1_1_suback(
3236        &mut self,
3237        raw_packet: RawPacket,
3238    ) -> Vec<GenericEvent<PacketIdType>> {
3239        let mut events = Vec::new();
3240
3241        match v3_1_1::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3242            Ok((packet, _)) => {
3243                let packet_id = packet.packet_id();
3244                if self.pid_suback.remove(&packet_id) {
3245                    if self.pid_man.is_used_id(packet_id) {
3246                        self.pid_man.release_id(packet_id);
3247                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3248                    }
3249                    events.extend(self.refresh_pingreq_recv());
3250                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3251                } else {
3252                    Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
3253                }
3254            }
3255            Err(e) => {
3256                Self::handle_v3_1_1_error(e, &mut events);
3257            }
3258        }
3259
3260        events
3261    }
3262
3263    fn process_recv_v5_0_suback(
3264        &mut self,
3265        raw_packet: RawPacket,
3266    ) -> Vec<GenericEvent<PacketIdType>> {
3267        let mut events = Vec::new();
3268
3269        match v5_0::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3270            Ok((packet, _)) => {
3271                let packet_id = packet.packet_id();
3272                if self.pid_suback.remove(&packet_id) {
3273                    if self.pid_man.is_used_id(packet_id) {
3274                        self.pid_man.release_id(packet_id);
3275                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3276                    }
3277                    events.extend(self.refresh_pingreq_recv());
3278                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3279                } else {
3280                    self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
3281                }
3282            }
3283            Err(e) => {
3284                self.handle_v5_0_error(e, &mut events);
3285            }
3286        }
3287
3288        events
3289    }
3290
3291    fn process_recv_v3_1_1_unsubscribe(
3292        &mut self,
3293        raw_packet: RawPacket,
3294    ) -> Vec<GenericEvent<PacketIdType>> {
3295        let mut events = Vec::new();
3296
3297        match v3_1_1::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3298            Ok((packet, _)) => {
3299                events.extend(self.refresh_pingreq_recv());
3300                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3301            }
3302            Err(e) => {
3303                Self::handle_v3_1_1_error(e, &mut events);
3304            }
3305        }
3306
3307        events
3308    }
3309
3310    fn process_recv_v5_0_unsubscribe(
3311        &mut self,
3312        raw_packet: RawPacket,
3313    ) -> Vec<GenericEvent<PacketIdType>> {
3314        let mut events = Vec::new();
3315
3316        match v5_0::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3317            Ok((packet, _)) => {
3318                events.extend(self.refresh_pingreq_recv());
3319                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3320            }
3321            Err(e) => {
3322                self.handle_v5_0_error(e, &mut events);
3323            }
3324        }
3325
3326        events
3327    }
3328
3329    fn process_recv_v3_1_1_unsuback(
3330        &mut self,
3331        raw_packet: RawPacket,
3332    ) -> Vec<GenericEvent<PacketIdType>> {
3333        let mut events = Vec::new();
3334
3335        match v3_1_1::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3336            Ok((packet, _)) => {
3337                let packet_id = packet.packet_id();
3338                if self.pid_unsuback.remove(&packet_id) {
3339                    if self.pid_man.is_used_id(packet_id) {
3340                        self.pid_man.release_id(packet_id);
3341                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3342                    }
3343                    events.extend(self.refresh_pingreq_recv());
3344                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3345                } else {
3346                    Self::handle_v3_1_1_error(MqttError::ProtocolError, &mut events);
3347                }
3348            }
3349            Err(e) => {
3350                Self::handle_v3_1_1_error(e, &mut events);
3351            }
3352        }
3353
3354        events
3355    }
3356
3357    fn process_recv_v5_0_unsuback(
3358        &mut self,
3359        raw_packet: RawPacket,
3360    ) -> Vec<GenericEvent<PacketIdType>> {
3361        let mut events = Vec::new();
3362
3363        match v5_0::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3364            Ok((packet, _)) => {
3365                let packet_id = packet.packet_id();
3366                if self.pid_unsuback.remove(&packet_id) {
3367                    if self.pid_man.is_used_id(packet_id) {
3368                        self.pid_man.release_id(packet_id);
3369                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3370                    }
3371                    events.extend(self.refresh_pingreq_recv());
3372                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3373                } else {
3374                    self.handle_v5_0_error(MqttError::ProtocolError, &mut events);
3375                }
3376            }
3377            Err(e) => {
3378                self.handle_v5_0_error(e, &mut events);
3379            }
3380        }
3381
3382        events
3383    }
3384
3385    fn process_recv_v3_1_1_pingreq(
3386        &mut self,
3387        raw_packet: RawPacket,
3388    ) -> Vec<GenericEvent<PacketIdType>> {
3389        let mut events = Vec::new();
3390
3391        match v3_1_1::Pingreq::parse(raw_packet.data_as_slice()) {
3392            Ok((packet, _)) => {
3393                if (Role::IS_SERVER || Role::IS_ANY)
3394                    && !self.is_client
3395                    && self.auto_ping_response
3396                    && self.status == ConnectionStatus::Connected
3397                {
3398                    let pingresp = v3_1_1::Pingresp::new();
3399                    events.extend(self.process_send_v3_1_1_pingresp(pingresp));
3400                }
3401                events.extend(self.refresh_pingreq_recv());
3402                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3403            }
3404            Err(e) => {
3405                Self::handle_v3_1_1_error(e, &mut events);
3406            }
3407        }
3408
3409        events
3410    }
3411
3412    fn process_recv_v5_0_pingreq(
3413        &mut self,
3414        raw_packet: RawPacket,
3415    ) -> Vec<GenericEvent<PacketIdType>> {
3416        let mut events = Vec::new();
3417
3418        match v5_0::Pingreq::parse(raw_packet.data_as_slice()) {
3419            Ok((packet, _)) => {
3420                if (Role::IS_SERVER || Role::IS_ANY)
3421                    && !self.is_client
3422                    && self.auto_ping_response
3423                    && self.status == ConnectionStatus::Connected
3424                {
3425                    let pingresp = v5_0::Pingresp::new();
3426                    events.extend(self.process_send_v5_0_pingresp(pingresp));
3427                }
3428                events.extend(self.refresh_pingreq_recv());
3429                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3430            }
3431            Err(e) => {
3432                self.handle_v5_0_error(e, &mut events);
3433            }
3434        }
3435
3436        events
3437    }
3438
3439    fn process_recv_v3_1_1_pingresp(
3440        &mut self,
3441        raw_packet: RawPacket,
3442    ) -> Vec<GenericEvent<PacketIdType>> {
3443        let mut events = Vec::new();
3444
3445        match v3_1_1::Pingresp::parse(raw_packet.data_as_slice()) {
3446            Ok((packet, _)) => {
3447                if self.pingresp_recv_set {
3448                    self.pingresp_recv_set = false;
3449                    events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3450                }
3451                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3452            }
3453            Err(e) => {
3454                Self::handle_v3_1_1_error(e, &mut events);
3455            }
3456        }
3457
3458        events
3459    }
3460
3461    fn process_recv_v5_0_pingresp(
3462        &mut self,
3463        raw_packet: RawPacket,
3464    ) -> Vec<GenericEvent<PacketIdType>> {
3465        let mut events = Vec::new();
3466
3467        match v5_0::Pingresp::parse(raw_packet.data_as_slice()) {
3468            Ok((packet, _)) => {
3469                if self.pingresp_recv_set {
3470                    self.pingresp_recv_set = false;
3471                    events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3472                }
3473                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3474            }
3475            Err(e) => {
3476                self.handle_v5_0_error(e, &mut events);
3477            }
3478        }
3479
3480        events
3481    }
3482
3483    fn process_recv_v3_1_1_disconnect(
3484        &mut self,
3485        raw_packet: RawPacket,
3486    ) -> Vec<GenericEvent<PacketIdType>> {
3487        let mut events = Vec::new();
3488
3489        match v3_1_1::Disconnect::parse(raw_packet.data_as_slice()) {
3490            Ok((packet, _)) => {
3491                self.cancel_timers(&mut events);
3492                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3493            }
3494            Err(e) => {
3495                Self::handle_v3_1_1_error(e, &mut events);
3496            }
3497        }
3498
3499        events
3500    }
3501
3502    fn process_recv_v5_0_disconnect(
3503        &mut self,
3504        raw_packet: RawPacket,
3505    ) -> Vec<GenericEvent<PacketIdType>> {
3506        let mut events = Vec::new();
3507
3508        match v5_0::Disconnect::parse(raw_packet.data_as_slice()) {
3509            Ok((packet, _)) => {
3510                self.cancel_timers(&mut events);
3511                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3512            }
3513            Err(e) => {
3514                self.handle_v5_0_error(e, &mut events);
3515            }
3516        }
3517
3518        events
3519    }
3520
3521    fn process_recv_v5_0_auth(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
3522        let mut events = Vec::new();
3523
3524        match v5_0::Auth::parse(raw_packet.data_as_slice()) {
3525            Ok((packet, _)) => {
3526                events.extend(self.refresh_pingreq_recv());
3527                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3528            }
3529            Err(e) => {
3530                self.handle_v5_0_error(e, &mut events);
3531            }
3532        }
3533
3534        events
3535    }
3536
3537    fn handle_v3_1_1_error(e: MqttError, events: &mut Vec<GenericEvent<PacketIdType>>) {
3538        events.push(GenericEvent::RequestClose);
3539        events.push(GenericEvent::NotifyError(e));
3540    }
3541
3542    fn handle_v5_0_error(&mut self, e: MqttError, events: &mut Vec<GenericEvent<PacketIdType>>) {
3543        let disconnect = v5_0::Disconnect::builder()
3544            .reason_code(e.into())
3545            .build()
3546            .unwrap();
3547        events.extend(self.process_send_v5_0_disconnect(disconnect));
3548        events.push(GenericEvent::NotifyError(e));
3549    }
3550
3551    fn refresh_pingreq_recv(&mut self) -> Vec<GenericEvent<PacketIdType>> {
3552        let mut events = Vec::new();
3553        if self.pingreq_recv_timeout_ms != 0 {
3554            self.pingreq_recv_set = true;
3555            events.push(GenericEvent::RequestTimerReset {
3556                kind: TimerKind::PingreqRecv,
3557                duration_ms: self.pingreq_recv_timeout_ms,
3558            });
3559        }
3560
3561        events
3562    }
3563
3564    /// Cancel timers and collect events instead of calling handlers
3565    fn cancel_timers(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
3566        if self.pingreq_send_set {
3567            self.pingreq_send_set = false;
3568            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
3569        }
3570        if self.pingreq_recv_set {
3571            self.pingreq_recv_set = false;
3572            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3573        }
3574        if self.pingresp_recv_set {
3575            self.pingresp_recv_set = false;
3576            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3577        }
3578    }
3579
3580    /// Helper function to extract TopicAlias from properties
3581    fn get_topic_alias_from_props(props: &[Property]) -> Option<u16> {
3582        for prop in props {
3583            if let Property::TopicAlias(ta) = prop {
3584                return Some(ta.val());
3585            }
3586        }
3587        None
3588    }
3589}
3590
3591// tests
3592
3593#[cfg(test)]
3594mod tests {
3595    use super::*;
3596    use crate::mqtt::connection::version::Version;
3597    use crate::mqtt::packet::TopicAliasSend;
3598    use crate::mqtt::role;
3599
3600    #[test]
3601    fn test_initialize_client_mode() {
3602        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3603
3604        // Initialize in client mode
3605        connection.initialize(true);
3606
3607        // Verify client mode is set
3608        assert!(connection.is_client);
3609        assert_eq!(connection.publish_send_count, 0);
3610        assert!(connection.publish_send_max.is_none());
3611        assert!(connection.publish_recv_max.is_none());
3612        assert!(!connection.need_store);
3613    }
3614
3615    #[test]
3616    fn test_initialize_server_mode() {
3617        let mut connection = GenericConnection::<role::Server, u32>::new(Version::V3_1_1);
3618
3619        // Initialize in server mode
3620        connection.initialize(false);
3621
3622        // Verify server mode is set
3623        assert!(!connection.is_client);
3624        assert_eq!(connection.publish_send_count, 0);
3625        assert!(connection.publish_send_max.is_none());
3626        assert!(connection.publish_recv_max.is_none());
3627        assert!(!connection.need_store);
3628    }
3629
3630    #[test]
3631    fn test_validate_topic_alias_no_topic_alias_send() {
3632        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3633
3634        // Should return None when topic_alias_send is not configured
3635        let result = connection.validate_topic_alias(Some(1));
3636        assert!(result.is_none());
3637    }
3638
3639    #[test]
3640    fn test_validate_topic_alias_none_input() {
3641        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3642
3643        // Should return None when no topic alias is provided
3644        let result = connection.validate_topic_alias(None);
3645        assert!(result.is_none());
3646    }
3647
3648    #[test]
3649    fn test_validate_topic_alias_range_no_topic_alias_send() {
3650        let connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3651
3652        // Should return false when topic_alias_send is not configured
3653        let result = connection.validate_topic_alias_range(1);
3654        assert!(!result);
3655    }
3656
3657    #[test]
3658    fn test_validate_topic_alias_range_zero() {
3659        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3660
3661        // Set up topic alias send with max 10
3662        let topic_alias_send = TopicAliasSend::new(10);
3663        connection.topic_alias_send = Some(topic_alias_send);
3664
3665        // Should return false for alias 0
3666        let result = connection.validate_topic_alias_range(0);
3667        assert!(!result);
3668    }
3669
3670    #[test]
3671    fn test_validate_topic_alias_range_over_max() {
3672        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3673
3674        // Set up topic alias send with max 5
3675        let topic_alias_send = TopicAliasSend::new(5);
3676        connection.topic_alias_send = Some(topic_alias_send);
3677
3678        // Should return false for alias > max
3679        let result = connection.validate_topic_alias_range(6);
3680        assert!(!result);
3681    }
3682
3683    #[test]
3684    fn test_validate_topic_alias_range_valid() {
3685        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3686
3687        // Set up topic alias send with max 10
3688        let topic_alias_send = TopicAliasSend::new(10);
3689        connection.topic_alias_send = Some(topic_alias_send);
3690
3691        // Should return true for valid aliases
3692        assert!(connection.validate_topic_alias_range(1));
3693        assert!(connection.validate_topic_alias_range(5));
3694        assert!(connection.validate_topic_alias_range(10));
3695    }
3696
3697    #[test]
3698    fn test_validate_topic_alias_with_registered_alias() {
3699        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3700
3701        // Set up topic alias send with max 10
3702        let mut topic_alias_send = TopicAliasSend::new(10);
3703        topic_alias_send.insert_or_update("test/topic", 5);
3704        connection.topic_alias_send = Some(topic_alias_send);
3705
3706        // Should return the topic name for registered alias
3707        let result = connection.validate_topic_alias(Some(5));
3708        assert_eq!(result, Some("test/topic".to_string()));
3709    }
3710
3711    #[test]
3712    fn test_validate_topic_alias_unregistered_alias() {
3713        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3714
3715        // Set up topic alias send with max 10 but don't register any aliases
3716        let topic_alias_send = TopicAliasSend::new(10);
3717        connection.topic_alias_send = Some(topic_alias_send);
3718
3719        // Should return None for unregistered alias
3720        let result = connection.validate_topic_alias(Some(5));
3721        assert!(result.is_none());
3722    }
3723
3724    #[test]
3725    fn test_validate_maximum_packet_size_within_limit() {
3726        let connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3727
3728        // Default maximum_packet_size_send is u32::MAX
3729        let result = connection.validate_maximum_packet_size_send(1000);
3730        assert!(result);
3731    }
3732
/// A packet whose size equals the configured send limit is accepted.
#[test]
fn test_validate_maximum_packet_size_at_limit() {
    let mut conn = GenericConnection::<role::Client, u16>::new(Version::V5_0);

    // Pin the send limit to an explicit value.
    conn.maximum_packet_size_send = 1000;

    // The boundary itself is inclusive: size == limit validates.
    assert!(conn.validate_maximum_packet_size_send(1000));
}
3744
/// A packet one byte larger than the configured send limit is rejected.
#[test]
fn test_validate_maximum_packet_size_over_limit() {
    let mut conn = GenericConnection::<role::Client, u16>::new(Version::V5_0);

    // Pin the send limit to an explicit value.
    conn.maximum_packet_size_send = 1000;

    // One past the limit must fail validation.
    let accepted = conn.validate_maximum_packet_size_send(1001);
    assert!(!accepted);
}
3756
/// With the send limit set to zero, only a zero-sized packet validates.
#[test]
fn test_validate_maximum_packet_size_zero_limit() {
    let mut conn = GenericConnection::<role::Client, u16>::new(Version::V5_0);

    // Degenerate configuration: nothing larger than zero bytes may be sent.
    conn.maximum_packet_size_send = 0;

    // (packet size, expected validation outcome), checked in this order:
    // a non-zero size is rejected, a zero size is accepted.
    let expectations = [(1, false), (0, true)];
    for &(size, expected) in expectations.iter() {
        assert_eq!(conn.validate_maximum_packet_size_send(size), expected);
    }
}
3772
/// `initialize(true)` resets per-connection bookkeeping and marks the
/// connection as a client.
#[test]
fn test_initialize_clears_state() {
    let mut conn = GenericConnection::<role::Client, u16>::new(Version::V5_0);

    // Dirty every piece of state this test later expects to be reset.
    conn.pid_unsuback.insert(456);
    conn.pid_suback.insert(123);
    conn.need_store = true;
    conn.publish_send_count = 5;

    // Re-initialize, passing `true` as the argument.
    conn.initialize(true);

    // Counters, flags and pending packet-id sets are back to their empty state.
    assert_eq!(conn.publish_send_count, 0);
    assert!(!conn.need_store);
    assert!(conn.pid_suback.is_empty());
    assert!(conn.pid_unsuback.is_empty());
    // The connection reports itself as a client after `initialize(true)`.
    assert!(conn.is_client);
}
3793
/// Checks `remaining_length_to_total_size` at both edges of every
/// remaining-length encoding width (1 to 4 bytes).
#[test]
fn test_remaining_length_to_total_size() {
    // Each row is (remaining length, expected total size), where the total is
    // 1 + <bytes used to encode the remaining length> + remaining length.
    let cases = [
        // 1-byte encoding: 0..=127
        (0, 2),
        (127, 129),
        // 2-byte encoding: 128..=16383
        (128, 131),
        (16383, 16386),
        // 3-byte encoding: 16384..=2097151
        (16384, 16388),
        (2097151, 2097155),
        // 4-byte encoding: 2097152..=268435455
        (2097152, 2097157),
        (268435455, 268435460),
    ];

    for &(remaining_length, expected_total) in cases.iter() {
        assert_eq!(
            remaining_length_to_total_size(remaining_length),
            expected_total
        );
    }
}
3812}