mqtt_protocol_core/mqtt/connection/
core.rs

1/**
2 * MIT License
3 *
4 * Copyright (c) 2025 Takatoshi Kondo
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to deal
8 * in the Software without restriction, including without limitation the rights
9 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 * copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in all
14 * copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 * SOFTWARE.
23 */
24use alloc::{
25    string::{String, ToString},
26    vec::Vec,
27};
28use core::marker::PhantomData;
29
30use crate::mqtt::common::Cursor;
31use crate::mqtt::common::HashSet;
32use crate::mqtt::connection::event::{GenericEvent, TimerKind};
33use crate::mqtt::connection::GenericStore;
34
35use serde::Serialize;
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
38enum ConnectionStatus {
39    #[serde(rename = "disconnected")]
40    Disconnected,
41    #[serde(rename = "connecting")]
42    Connecting,
43    #[serde(rename = "connected")]
44    Connected,
45}
46use crate::mqtt::connection::packet_builder::{
47    PacketBuildResult, PacketBuilder, PacketData, RawPacket,
48};
49use crate::mqtt::connection::packet_id_manager::PacketIdManager;
50use crate::mqtt::connection::role;
51use crate::mqtt::connection::role::RoleType;
52use crate::mqtt::connection::sendable::Sendable;
53use crate::mqtt::connection::version::*;
54use crate::mqtt::packet::v3_1_1;
55use crate::mqtt::packet::v5_0;
56use crate::mqtt::packet::GenericPacket;
57use crate::mqtt::packet::GenericStorePacket;
58use crate::mqtt::packet::IsPacketId;
59use crate::mqtt::packet::Qos;
60use crate::mqtt::packet::ResponsePacket;
61use crate::mqtt::packet::{Property, TopicAliasRecv, TopicAliasSend};
62use crate::mqtt::prelude::GenericPacketTrait;
63use crate::mqtt::result_code::{
64    ConnectReasonCode, ConnectReturnCode, DisconnectReasonCode, MqttError, PubrecReasonCode,
65};
66
/// Packet size value used when no maximum packet size is in effect
///
/// Built from 1 (fixed header byte) + 4 (longest remaining length encoding)
/// + 128^4, where 128^4 is written as `1 << 28`. The result is 268_435_461.
const MQTT_PACKET_SIZE_NO_LIMIT: u32 = 1 + 4 + (1 << 28);
70
/// Convert a remaining length into the size of the whole packet
///
/// The whole packet is made of:
/// - the fixed header byte (1 byte)
/// - the encoded remaining length field (1 to 4 bytes, depending on the value)
/// - `remaining_length` bytes of content
fn remaining_length_to_total_size(remaining_length: u32) -> u32 {
    // The length field grows by one byte at each power of 128 (7 bits per byte).
    let length_field_size: u32 = match remaining_length {
        0..=127 => 1,
        128..=16_383 => 2,
        16_384..=2_097_151 => 3,
        _ => 4,
    };

    1 + length_field_size + remaining_length
}
90
/// Type alias for [`GenericEvent`] with `u16` packet IDs (most common case)
///
/// This is a convenience type alias that most applications will use.
/// It fixes the packet ID type parameter to `u16`, the standard MQTT packet ID type.
pub type Event = GenericEvent<u16>;
96
/// Generic MQTT Connection - Core Sans-I/O MQTT protocol implementation
///
/// This struct represents the core MQTT protocol logic in a Sans-I/O (synchronous I/O-independent) design.
/// It handles MQTT packet processing, state management, and protocol compliance without performing
/// any actual network I/O operations. Instead, it returns events that the application must handle.
///
/// # Type Parameters
///
/// * `Role` - The connection role (Client or Server), determining allowed operations
/// * `PacketIdType` - The type used for packet IDs (typically `u16`, but can be `u32` for extended scenarios)
///
/// # Key Features
///
/// - **Sans-I/O Design**: No network I/O is performed directly; events are returned for external handling
/// - **Protocol Compliance**: Implements MQTT v3.1.1 and v5.0 specifications
/// - **State Management**: Tracks connection state, packet IDs, and protocol flows
/// - **Configurable Behavior**: Supports various configuration options for different use cases
/// - **Generic Packet ID Support**: Can use u16 or u32 packet IDs for different deployment scenarios
///
/// # Usage
///
/// ```ignore
/// use mqtt_protocol_core::mqtt;
///
/// let mut connection = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
///
/// // Send a packet
/// let events = connection.send(publish_packet);
/// for event in events {
///     match event {
///         mqtt::connection::Event::RequestSendPacket { packet, .. } => {
///             // Send packet over network
///         },
///         // Handle other events...
///     }
/// }
///
/// // Receive data
/// let mut cursor = std::io::Cursor::new(received_data);
/// let events = connection.recv(&mut cursor);
/// // Process events...
/// ```
pub struct GenericConnection<Role, PacketIdType>
where
    Role: RoleType,
    PacketIdType: IsPacketId,
{
    // Zero-sized marker that ties the connection to its `Role` type parameter
    _marker: PhantomData<Role>,

    // Protocol version of this connection; `send()` rejects packets of another version
    protocol_version: Version,

    // Tracks which packet IDs are in use (queried and released in `notify_closed()`)
    pid_man: PacketIdManager<PacketIdType>,
    // Packet ID sets, one per acknowledgement packet type.
    // NOTE(review): presumably the IDs still waiting for that acknowledgement - confirm.
    // SUBACK/UNSUBACK IDs are always released on close; PUBACK/PUBREC/PUBCOMP IDs
    // are released on close only when `need_store` is false.
    pid_suback: HashSet<PacketIdType>,
    pid_unsuback: HashSet<PacketIdType>,
    pid_puback: HashSet<PacketIdType>,
    pid_pubrec: HashSet<PacketIdType>,
    pid_pubcomp: HashSet<PacketIdType>,

    // Whether session state is kept when the connection closes
    need_store: bool,
    // Store for retransmission packets
    store: GenericStore<PacketIdType>,

    // Behavior switches; all of them start as `false` in `new()`
    offline_publish: bool,
    auto_pub_response: bool,
    auto_ping_response: bool,

    // Auto map topic alias for sending
    auto_map_topic_alias_send: bool,
    // Auto replace topic alias for sending
    auto_replace_topic_alias_send: bool,
    // Topic alias management for receiving
    topic_alias_recv: Option<TopicAliasRecv>,
    // Topic alias management for sending
    topic_alias_send: Option<TopicAliasSend>,

    // Maximum number of concurrent PUBLISH packets for sending
    publish_send_max: Option<u16>,
    // Maximum number of concurrent PUBLISH packets for receiving
    publish_recv_max: Option<u16>,
    // Current count of PUBLISH packets being sent
    publish_send_count: u16,

    // Set of received PUBLISH packets (for flow control)
    publish_recv: HashSet<PacketIdType>,

    // Maximum packet size for sending
    maximum_packet_size_send: u32,
    // Maximum packet size for receiving
    maximum_packet_size_recv: u32,

    // Connection state
    status: ConnectionStatus,

    // PINGREQ send interval in milliseconds
    pingreq_send_interval_ms: Option<u64>,
    // PINGREQ receive timeout in milliseconds
    pingreq_recv_timeout_ms: Option<u64>,
    // PINGRESP receive timeout in milliseconds
    pingresp_recv_timeout_ms: Option<u64>,

    // QoS2 PUBLISH packet handling state (for duplicate detection)
    qos2_publish_handled: HashSet<PacketIdType>,
    // QoS2 PUBLISH packet processing state
    qos2_publish_processing: HashSet<PacketIdType>,

    // Timer state flags; each one is cleared when the matching timer fires
    pingreq_send_set: bool,
    pingreq_recv_set: bool,
    pingresp_recv_set: bool,

    // Assembles raw packets from the bytes passed to `recv()`
    packet_builder: PacketBuilder,
    // Client/Server mode flag (initialized to `false` by `new()`)
    is_client: bool,
}
211
/// Type alias for [`GenericConnection`] with `u16` packet IDs (standard case)
///
/// This is the standard Connection type that most applications will use.
/// It uses `u16` for packet IDs, which is the standard MQTT packet ID type
/// supporting values from 1 to 65535.
///
/// For extended scenarios where larger packet ID ranges are needed
/// (such as broker clusters), use `GenericConnection<Role, u32>` directly.
///
/// # Type Parameters
///
/// * `Role` - The connection role (typically `role::Client` or `role::Server`)
pub type Connection<Role> = GenericConnection<Role, u16>;
225
226impl<Role, PacketIdType> GenericConnection<Role, PacketIdType>
227where
228    Role: RoleType,
229    PacketIdType: IsPacketId,
230{
231    /// Create a new MQTT connection instance
232    ///
233    /// Initializes a new MQTT connection with the specified protocol version.
234    /// The connection starts in a disconnected state and must be activated through
235    /// the connection handshake process (CONNECT/CONNACK).
236    ///
237    /// # Parameters
238    ///
239    /// * `version` - The MQTT protocol version to use (V3_1_1 or V5_0)
240    ///
241    /// # Returns
242    ///
243    /// A new `GenericConnection` instance ready for use
244    ///
245    /// # Examples
246    ///
247    /// ```ignore
248    /// use mqtt_protocol_core::mqtt;
249    ///
250    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
251    /// let mut server = mqtt::connection::Connection::<mqtt::connection::role::Server>::new(mqtt::version::Version::V3_1_1);
252    /// ```
    pub fn new(version: Version) -> Self {
        Self {
            _marker: PhantomData,
            protocol_version: version,
            // Packet ID bookkeeping starts empty
            pid_man: PacketIdManager::new(),
            pid_suback: HashSet::new(),
            pid_unsuback: HashSet::new(),
            pid_puback: HashSet::new(),
            pid_pubrec: HashSet::new(),
            pid_pubcomp: HashSet::new(),
            // Session storage is off and the store is empty
            need_store: false,
            store: GenericStore::new(),
            // Every optional behavior is disabled by default
            offline_publish: false,
            auto_pub_response: false,
            auto_ping_response: false,
            auto_map_topic_alias_send: false,
            auto_replace_topic_alias_send: false,
            // No topic alias tables until they are set up elsewhere
            topic_alias_recv: None,
            topic_alias_send: None,
            // No PUBLISH flow-control limits and nothing in flight
            publish_send_max: None,
            publish_recv_max: None,
            publish_send_count: 0,
            publish_recv: HashSet::new(),
            // Packet sizes are unrestricted in both directions
            maximum_packet_size_send: MQTT_PACKET_SIZE_NO_LIMIT,
            maximum_packet_size_recv: MQTT_PACKET_SIZE_NO_LIMIT,
            status: ConnectionStatus::Disconnected,
            // No keep-alive intervals or timeouts configured
            pingreq_send_interval_ms: None,
            pingreq_recv_timeout_ms: None,
            pingresp_recv_timeout_ms: None,
            qos2_publish_handled: HashSet::new(),
            qos2_publish_processing: HashSet::new(),
            // No timer is marked as set
            pingreq_send_set: false,
            pingreq_recv_set: false,
            pingresp_recv_set: false,
            packet_builder: PacketBuilder::new(),
            // NOTE(review): always `false` here regardless of `Role`; presumably
            // updated elsewhere once the actual role is known - confirm.
            is_client: false,
        }
    }
291
292    // public
293
294    /// Send MQTT packet with compile-time role checking (experimental)
295    ///
296    /// This method provides compile-time verification that the packet being sent
297    /// is allowed for the current connection role (Client or Server). It only works
298    /// when the `Role` type parameter is concrete (not generic).
299    ///
300    /// This is an experimental API that may be subject to change. For general use,
301    /// consider using the `send()` method instead.
302    ///
303    /// # Type Parameters
304    ///
305    /// * `T` - The packet type that must implement `Sendable<Role, PacketIdType>`
306    ///
307    /// # Parameters
308    ///
309    /// * `packet` - The MQTT packet to send
310    ///
311    /// # Returns
312    ///
313    /// A vector of events that the application must process
314    ///
315    /// # Compile-time Safety
316    ///
317    /// If the packet type is not allowed for the current role, this will result
318    /// in a compile-time error, preventing protocol violations at development time.
319    ///
320    /// # Examples
321    ///
322    /// ```ignore
323    /// use mqtt_protocol_core::mqtt;
324    ///
325    /// // This works for concrete roles
326    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
327    /// let events = client.checked_send(connect_packet); // OK - clients can send CONNECT
328    ///
329    /// // This would cause a compile error
330    /// // let events = client.checked_send(connack_packet); // Error - clients cannot send CONNACK
331    /// ```
    pub fn checked_send<T>(&mut self, packet: T) -> Vec<GenericEvent<PacketIdType>>
    where
        T: Sendable<Role, PacketIdType>,
    {
        // All work is delegated to the packet's `Sendable::dispatch_send`
        // implementation (concrete packet or generic packet). The
        // `T: Sendable<Role, PacketIdType>` bound above is what restricts the
        // accepted packet types at compile time.
        packet.dispatch_send(self)
    }
339
340    /// Send MQTT packet with runtime role validation
341    ///
342    /// This is the primary method for sending MQTT packets. It accepts any `GenericPacket`
343    /// and performs runtime validation to ensure the packet is allowed for the current
344    /// connection role. This provides flexibility when the exact packet type is not known
345    /// at compile time.
346    ///
347    /// # Parameters
348    ///
349    /// * `packet` - The MQTT packet to send
350    ///
351    /// # Returns
352    ///
353    /// A vector of events that the application must process. If the packet is not allowed
354    /// for the current role, a `NotifyError` event will be included.
355    ///
356    /// # Validation
357    ///
358    /// The method validates that:
359    /// - The packet type is allowed for the connection role (Client vs Server)
360    /// - Protocol version compatibility
361    /// - Connection state requirements
362    /// - Packet ID management for QoS > 0 packets
363    ///
364    /// # Examples
365    ///
366    /// ```ignore
367    /// use mqtt_protocol_core::mqtt;
368    ///
369    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
370    /// let events = client.send(mqtt::packet::GenericPacket::V5_0(mqtt::packet::v5_0::Packet::Connect(connect_packet)));
371    ///
372    /// for event in events {
373    ///     match event {
374    ///         mqtt::connection::Event::RequestSendPacket { packet, .. } => {
375    ///             // Send packet over network
376    ///         },
377    ///         mqtt::connection::Event::NotifyError(error) => {
378    ///             // Handle validation errors
379    ///         },
380    ///         // Handle other events...
381    ///     }
382    /// }
383    /// ```
384    pub fn send(&mut self, packet: GenericPacket<PacketIdType>) -> Vec<GenericEvent<PacketIdType>> {
385        use core::any::TypeId;
386
387        let role_id = TypeId::of::<Role>();
388        let client_id = TypeId::of::<role::Client>();
389        let server_id = TypeId::of::<role::Server>();
390        let any_id = TypeId::of::<role::Any>();
391
392        // Check version compatibility between connection and packet
393        let packet_version = packet.protocol_version();
394
395        // Return error if versions don't match
396        if self.protocol_version != packet_version {
397            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
398        }
399
400        match packet {
401            // CONNECT - Client/Any can send
402            GenericPacket::V3_1_1Connect(p) => {
403                if role_id == client_id || role_id == any_id {
404                    self.process_send_v3_1_1_connect(p)
405                } else {
406                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
407                }
408            }
409            GenericPacket::V5_0Connect(p) => {
410                if role_id == client_id || role_id == any_id {
411                    self.process_send_v5_0_connect(p)
412                } else {
413                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
414                }
415            }
416            // CONNACK - Server/Any can send
417            GenericPacket::V3_1_1Connack(p) => {
418                if role_id == server_id || role_id == any_id {
419                    self.process_send_v3_1_1_connack(p)
420                } else {
421                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
422                }
423            }
424            GenericPacket::V5_0Connack(p) => {
425                if role_id == server_id || role_id == any_id {
426                    self.process_send_v5_0_connack(p)
427                } else {
428                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
429                }
430            }
431            // PUBLISH - Any role can send
432            GenericPacket::V3_1_1Publish(p) => self.process_send_v3_1_1_publish(p),
433            GenericPacket::V5_0Publish(p) => self.process_send_v5_0_publish(p),
434            // PUBACK/PUBREC/PUBREL/PUBCOMP - Any role can send
435            GenericPacket::V3_1_1Puback(p) => self.process_send_v3_1_1_puback(p),
436            GenericPacket::V5_0Puback(p) => self.process_send_v5_0_puback(p),
437            GenericPacket::V3_1_1Pubrec(p) => self.process_send_v3_1_1_pubrec(p),
438            GenericPacket::V5_0Pubrec(p) => self.process_send_v5_0_pubrec(p),
439            GenericPacket::V3_1_1Pubrel(p) => self.process_send_v3_1_1_pubrel(p),
440            GenericPacket::V5_0Pubrel(p) => self.process_send_v5_0_pubrel(p),
441            GenericPacket::V3_1_1Pubcomp(p) => self.process_send_v3_1_1_pubcomp(p),
442            GenericPacket::V5_0Pubcomp(p) => self.process_send_v5_0_pubcomp(p),
443            // SUBSCRIBE/UNSUBSCRIBE - Client/Any can send
444            GenericPacket::V3_1_1Subscribe(p) => {
445                if role_id == client_id || role_id == any_id {
446                    self.process_send_v3_1_1_subscribe(p)
447                } else {
448                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
449                }
450            }
451            GenericPacket::V5_0Subscribe(p) => {
452                if role_id == client_id || role_id == any_id {
453                    self.process_send_v5_0_subscribe(p)
454                } else {
455                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
456                }
457            }
458            GenericPacket::V3_1_1Unsubscribe(p) => {
459                if role_id == client_id || role_id == any_id {
460                    self.process_send_v3_1_1_unsubscribe(p)
461                } else {
462                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
463                }
464            }
465            GenericPacket::V5_0Unsubscribe(p) => {
466                if role_id == client_id || role_id == any_id {
467                    self.process_send_v5_0_unsubscribe(p)
468                } else {
469                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
470                }
471            }
472            // SUBACK/UNSUBACK - Server/Any can send
473            GenericPacket::V3_1_1Suback(p) => {
474                if role_id == server_id || role_id == any_id {
475                    self.process_send_v3_1_1_suback(p)
476                } else {
477                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
478                }
479            }
480            GenericPacket::V5_0Suback(p) => {
481                if role_id == server_id || role_id == any_id {
482                    self.process_send_v5_0_suback(p)
483                } else {
484                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
485                }
486            }
487            GenericPacket::V3_1_1Unsuback(p) => {
488                if role_id == server_id || role_id == any_id {
489                    self.process_send_v3_1_1_unsuback(p)
490                } else {
491                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
492                }
493            }
494            GenericPacket::V5_0Unsuback(p) => {
495                if role_id == server_id || role_id == any_id {
496                    self.process_send_v5_0_unsuback(p)
497                } else {
498                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
499                }
500            }
501            // PINGREQ - Client/Any can send
502            GenericPacket::V3_1_1Pingreq(p) => {
503                if role_id == client_id || role_id == any_id {
504                    self.process_send_v3_1_1_pingreq(p)
505                } else {
506                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
507                }
508            }
509            GenericPacket::V5_0Pingreq(p) => {
510                if role_id == client_id || role_id == any_id {
511                    self.process_send_v5_0_pingreq(p)
512                } else {
513                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
514                }
515            }
516            // PINGRESP - Server/Any can send
517            GenericPacket::V3_1_1Pingresp(p) => {
518                if role_id == server_id || role_id == any_id {
519                    self.process_send_v3_1_1_pingresp(p)
520                } else {
521                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
522                }
523            }
524            GenericPacket::V5_0Pingresp(p) => {
525                if role_id == server_id || role_id == any_id {
526                    self.process_send_v5_0_pingresp(p)
527                } else {
528                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
529                }
530            }
531            // DISCONNECT(v3.1.1) - Client/Any role can send
532            GenericPacket::V3_1_1Disconnect(p) => {
533                if role_id == client_id || role_id == any_id {
534                    self.process_send_v3_1_1_disconnect(p)
535                } else {
536                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
537                }
538            }
539            // DISCONNECT(v5.0) - Any role can send
540            GenericPacket::V5_0Disconnect(p) => self.process_send_v5_0_disconnect(p),
541            // AUTH - Any role can send (v5.0 only)
542            GenericPacket::V5_0Auth(p) => self.process_send_v5_0_auth(p),
543        }
544    }
545
546    /// Receive and process incoming MQTT data
547    ///
548    /// This method processes raw bytes received from the network and attempts to
549    /// parse them into MQTT packets. It handles packet fragmentation and can
550    /// process multiple complete packets from a single data buffer.
551    ///
552    /// # Parameters
553    ///
554    /// * `data` - A cursor over the received data bytes. The cursor position will
555    ///   be advanced as data is consumed.
556    ///
557    /// # Returns
558    ///
559    /// A vector of events generated from processing the received data:
560    /// - `NotifyPacketReceived` for successfully parsed packets
561    /// - `NotifyError` for parsing errors or protocol violations
562    /// - Additional events based on packet processing (timers, responses, etc.)
563    ///
564    /// # Behavior
565    ///
566    /// - Handles partial packets (data will be buffered until complete)
567    /// - Processes multiple complete packets in sequence
568    /// - Validates packet structure and protocol compliance
569    /// - Updates internal connection state based on received packets
570    /// - Generates appropriate response events (ACKs, etc.)
571    ///
572    /// # Examples
573    ///
574    /// ```ignore
575    /// use mqtt_protocol_core::mqtt;
576    ///
577    /// let mut connection = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
578    /// let received_data = [/* network data */];
579    /// let mut cursor = std::io::Cursor::new(&received_data[..]);
580    ///
581    /// let events = connection.recv(&mut cursor);
582    /// for event in events {
583    ///     match event {
584    ///         mqtt::connection::Event::NotifyPacketReceived(packet) => {
585    ///             // Process received packet
586    ///         },
587    ///         mqtt::connection::Event::NotifyError(error) => {
588    ///             // Handle parsing/protocol errors
589    ///         },
590    ///         // Handle other events...
591    ///     }
592    /// }
593    /// ```
594    pub fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
595        let mut events = Vec::new();
596
597        match self.packet_builder.feed(data) {
598            PacketBuildResult::Complete(raw_packet) => {
599                events.extend(self.process_recv_packet(raw_packet));
600            }
601            PacketBuildResult::Incomplete => {}
602            PacketBuildResult::Error(e) => {
603                self.cancel_timers(&mut events);
604                events.push(GenericEvent::RequestClose);
605                events.push(GenericEvent::NotifyError(e));
606            }
607        }
608
609        events
610    }
611
612    /// Notify that a timer has fired (Event-based API)
613    ///
614    /// This method should be called when the I/O layer detects that a timer has expired.
615    /// It handles the timer event appropriately and returns events that need to be processed.
616    /// Notify that a timer has fired
617    ///
618    /// This method should be called by the application when a previously requested
619    /// timer expires. The connection will take appropriate action based on the timer type.
620    ///
621    /// # Parameters
622    ///
623    /// * `kind` - The type of timer that fired
624    ///
625    /// # Returns
626    ///
627    /// Events generated from timer processing (e.g., sending PINGREQ, connection timeouts)
628    pub fn notify_timer_fired(&mut self, kind: TimerKind) -> Vec<GenericEvent<PacketIdType>> {
629        let mut events = Vec::new();
630
631        match kind {
632            TimerKind::PingreqSend => {
633                // Reset timer flag
634                self.pingreq_send_set = false;
635
636                // Send PINGREQ if connected
637                if self.status == ConnectionStatus::Connected {
638                    match self.protocol_version {
639                        Version::V3_1_1 => {
640                            if let Ok(pingreq) = v3_1_1::Pingreq::builder().build() {
641                                events.extend(self.process_send_v3_1_1_pingreq(pingreq));
642                            }
643                        }
644                        Version::V5_0 => {
645                            if let Ok(pingreq) = v5_0::Pingreq::builder().build() {
646                                events.extend(self.process_send_v5_0_pingreq(pingreq));
647                            }
648                        }
649                        Version::Undetermined => {
650                            unreachable!("Protocol version should be set before sending PINGREQ");
651                        }
652                    }
653                }
654            }
655            TimerKind::PingreqRecv => {
656                // Reset timer flag
657                self.pingreq_recv_set = false;
658
659                match self.protocol_version {
660                    Version::V3_1_1 => {
661                        // V3.1.1: Close connection
662                        events.push(GenericEvent::RequestClose);
663                    }
664                    Version::V5_0 => {
665                        // V5.0: Send DISCONNECT with keep_alive_timeout if connected
666                        if self.status == ConnectionStatus::Connected {
667                            if let Ok(disconnect) = v5_0::Disconnect::builder()
668                                .reason_code(DisconnectReasonCode::KeepAliveTimeout)
669                                .build()
670                            {
671                                events.extend(self.process_send_v5_0_disconnect(disconnect));
672                            }
673                        }
674                    }
675                    Version::Undetermined => {
676                        unreachable!("Protocol version should be set before receiving PINGREQ");
677                    }
678                }
679            }
680            TimerKind::PingrespRecv => {
681                // Reset timer flag
682                self.pingresp_recv_set = false;
683
684                match self.protocol_version {
685                    Version::V3_1_1 => {
686                        // V3.1.1: Close connection
687                        events.push(GenericEvent::RequestClose);
688                    }
689                    Version::V5_0 => {
690                        // V5.0: Send DISCONNECT with keep_alive_timeout if connected
691                        if self.status == ConnectionStatus::Connected {
692                            if let Ok(disconnect) = v5_0::Disconnect::builder()
693                                .reason_code(DisconnectReasonCode::KeepAliveTimeout)
694                                .build()
695                            {
696                                events.extend(self.process_send_v5_0_disconnect(disconnect));
697                            }
698                        }
699                    }
700                    Version::Undetermined => {
701                        unreachable!("Protocol version should be set before receiving PINGRESP");
702                    }
703                }
704            }
705        }
706
707        events
708    }
709
710    /// Notify that the connection has been closed by the I/O layer (Event-based API)
711    ///
712    /// This method should be called when the I/O layer detects that the socket has been closed.
713    /// It updates the internal state appropriately and returns events that need to be processed.
714    /// Notify that the underlying connection has been closed
715    ///
716    /// This method should be called when the network connection is closed,
717    /// either intentionally or due to network issues.
718    ///
719    /// # Returns
720    ///
721    /// Events generated from connection closure processing
722    pub fn notify_closed(&mut self) -> Vec<GenericEvent<PacketIdType>> {
723        let mut events = Vec::new();
724
725        // Reset packet size limits to MQTT protocol maximum
726        self.maximum_packet_size_send = MQTT_PACKET_SIZE_NO_LIMIT;
727        self.maximum_packet_size_recv = MQTT_PACKET_SIZE_NO_LIMIT;
728
729        // Set status to disconnected
730        self.status = ConnectionStatus::Disconnected;
731
732        // Clear topic alias management
733        self.topic_alias_send = None;
734        self.topic_alias_recv = None;
735
736        // Release packet IDs for SUBACK
737        for packet_id in self.pid_suback.drain() {
738            if self.pid_man.is_used_id(packet_id) {
739                self.pid_man.release_id(packet_id);
740                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
741            }
742        }
743
744        // Release packet IDs for UNSUBACK
745        for packet_id in self.pid_unsuback.drain() {
746            if self.pid_man.is_used_id(packet_id) {
747                self.pid_man.release_id(packet_id);
748                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
749            }
750        }
751
752        // If not storing session state, clear QoS2 states and release publish-related packet IDs
753        if !self.need_store {
754            self.qos2_publish_processing.clear();
755            self.qos2_publish_handled.clear();
756
757            // Release packet IDs for PUBACK
758            for packet_id in self.pid_puback.drain() {
759                if self.pid_man.is_used_id(packet_id) {
760                    self.pid_man.release_id(packet_id);
761                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
762                }
763            }
764
765            // Release packet IDs for PUBREC
766            for packet_id in self.pid_pubrec.drain() {
767                if self.pid_man.is_used_id(packet_id) {
768                    self.pid_man.release_id(packet_id);
769                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
770                }
771            }
772
773            // Release packet IDs for PUBCOMP
774            for packet_id in self.pid_pubcomp.drain() {
775                if self.pid_man.is_used_id(packet_id) {
776                    self.pid_man.release_id(packet_id);
777                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
778                }
779            }
780        }
781
782        // Cancel all timers
783        self.cancel_timers(&mut events);
784
785        events
786    }
787
788    /// Set the PINGREQ send interval
789    ///
790    /// Sets the interval for sending PINGREQ packets to maintain the connection alive.
791    /// When changed, this may generate timer-related events to update the ping schedule.
792    ///
793    /// # Parameters
794    ///
795    /// * `duration_ms` - The interval in milliseconds between PINGREQ packets
796    ///
797    /// # Returns
798    ///
799    /// Events generated from updating the ping interval
800    pub fn set_pingreq_send_interval(
801        &mut self,
802        duration_ms: u64,
803    ) -> Vec<GenericEvent<PacketIdType>> {
804        let mut events = Vec::new();
805
806        if duration_ms == 0 {
807            self.pingreq_send_interval_ms = None;
808            self.pingreq_send_set = false;
809            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
810        } else {
811            self.pingreq_send_interval_ms = Some(duration_ms);
812            self.pingreq_send_set = true;
813            events.push(GenericEvent::RequestTimerReset {
814                kind: TimerKind::PingreqSend,
815                duration_ms,
816            });
817        }
818
819        events
820    }
821
822    /// Get the remaining capacity for sending PUBLISH packets
823    ///
824    /// Returns the number of additional PUBLISH packets that can be sent
825    /// without exceeding the receive maximum limit.
826    ///
827    /// # Returns
828    ///
829    /// The remaining capacity for outgoing PUBLISH packets, or `None` if no limit is set
830    pub fn get_receive_maximum_vacancy_for_send(&self) -> Option<u16> {
831        // If publish_recv_max is set, return the remaining capacity
832        self.publish_send_max
833            .map(|max| max.saturating_sub(self.publish_send_count))
834    }
835
836    /// Enable or disable offline publishing
837    ///
838    /// When enabled, PUBLISH packets can be sent even when disconnected.
839    /// They will be queued and sent once the connection is established.
840    ///
841    /// # Parameters
842    ///
843    /// * `enable` - Whether to enable offline publishing
844    pub fn set_offline_publish(&mut self, enable: bool) {
845        self.offline_publish = enable;
846        if self.offline_publish {
847            self.need_store = true;
848        }
849    }
850
851    /// Enable or disable automatic PUBLISH response generation
852    ///
853    /// When enabled, appropriate response packets (PUBACK, PUBREC, PUBREL, and PUBCOMP.)
854    /// are automatically generated for received PUBLISH packets.
855    ///
856    /// # Parameters
857    ///
858    /// * `enable` - Whether to enable automatic responses
859    pub fn set_auto_pub_response(&mut self, enable: bool) {
860        self.auto_pub_response = enable;
861    }
862
863    /// Enable or disable automatic PING response generation
864    ///
865    /// When enabled, PINGRESP packets are automatically sent in response to PINGREQ.
866    ///
867    /// # Parameters
868    ///
869    /// * `enable` - Whether to enable automatic PING responses
870    pub fn set_auto_ping_response(&mut self, enable: bool) {
871        self.auto_ping_response = enable;
872    }
873
874    /// Enable or disable automatic topic alias mapping for outgoing packets
875    ///
876    /// When enabled, the connection will automatically map topics to aliases
877    /// for outgoing PUBLISH packets to reduce bandwidth usage. This includes:
878    /// - Applying existing registered topic aliases when available
879    /// - Allocating new topic aliases for unregistered topics
880    /// - Using LRU algorithm to overwrite the least recently used alias when all aliases are in use
881    ///
882    /// # Parameters
883    ///
884    /// * `enable` - Whether to enable automatic topic alias mapping
885    pub fn set_auto_map_topic_alias_send(&mut self, enable: bool) {
886        self.auto_map_topic_alias_send = enable;
887    }
888
889    /// Enable or disable automatic topic alias replacement for outgoing packets
890    ///
891    /// When enabled, the connection will automatically apply existing registered
892    /// topic aliases to outgoing PUBLISH packets when aliases are available.
893    /// This only uses previously registered aliases and does not allocate new ones.
894    ///
895    /// # Parameters
896    ///
897    /// * `enable` - Whether to enable automatic topic alias replacement
898    pub fn set_auto_replace_topic_alias_send(&mut self, enable: bool) {
899        self.auto_replace_topic_alias_send = enable;
900    }
901
902    /// Set PINGREQ receive timeout
903    pub fn set_pingresp_recv_timeout(&mut self, timeout_ms: Option<u64>) {
904        self.pingresp_recv_timeout_ms = timeout_ms;
905    }
906
907    /// Acquire a new packet ID for outgoing packets
908    ///
909    /// # Returns
910    ///
911    /// A unique packet ID, or an error if none are available
912    pub fn acquire_packet_id(&mut self) -> Result<PacketIdType, MqttError> {
913        self.pid_man.acquire_unique_id()
914    }
915
916    /// Register a packet ID as in use
917    ///
918    /// Manually registers a specific packet ID as being in use, preventing
919    /// it from being allocated by `acquire_packet_id()`.
920    ///
921    /// # Parameters
922    ///
923    /// * `packet_id` - The packet ID to register as in use
924    ///
925    /// # Returns
926    ///
927    /// `Ok(())` if successful, or an error if the packet ID is already in use
928    pub fn register_packet_id(&mut self, packet_id: PacketIdType) -> Result<(), MqttError> {
929        self.pid_man.register_id(packet_id)
930    }
931
932    /// Release packet ID (Event-based API)
933    /// Release a packet ID for reuse
934    ///
935    /// This method releases a packet ID, making it available for future use.
936    /// It also generates a `NotifyPacketIdReleased` event.
937    ///
938    /// # Parameters
939    ///
940    /// * `packet_id` - The packet ID to release
941    ///
942    /// # Returns
943    ///
944    /// Events generated from releasing the packet ID
945    pub fn release_packet_id(
946        &mut self,
947        packet_id: PacketIdType,
948    ) -> Vec<GenericEvent<PacketIdType>> {
949        let mut events = Vec::new();
950
951        if self.pid_man.is_used_id(packet_id) {
952            self.pid_man.release_id(packet_id);
953            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
954        }
955
956        events
957    }
958
959    /// Get the set of QoS 2 PUBLISH packet IDs that have been handled
960    ///
961    /// Returns a copy of the set containing packet IDs of QoS 2 PUBLISH packets
962    /// that have been successfully processed and handled.
963    ///
964    /// # Returns
965    ///
966    /// A `HashSet` containing packet IDs of handled QoS 2 PUBLISH packets
967    pub fn get_qos2_publish_handled(&self) -> HashSet<PacketIdType> {
968        self.qos2_publish_handled.clone()
969    }
970
971    /// Restore the set of QoS 2 PUBLISH packet IDs that have been handled
972    ///
973    /// Restores the internal state of handled QoS 2 PUBLISH packet IDs,
974    /// typically used when resuming a connection from persistent storage.
975    ///
976    /// # Parameters
977    ///
978    /// * `pids` - A `HashSet` containing packet IDs of previously handled QoS 2 PUBLISH packets
979    pub fn restore_qos2_publish_handled(&mut self, pids: HashSet<PacketIdType>) {
980        self.qos2_publish_handled = pids;
981    }
982
983    /// Restore previously stored packets
984    ///
985    /// This method restores packets that were previously stored for persistence,
986    /// typically called when resuming a session.
987    ///
988    /// # Parameters
989    ///
990    /// * `packets` - Vector of packets to restore
991    pub fn restore_packets(&mut self, packets: Vec<GenericStorePacket<PacketIdType>>) {
992        for packet in packets {
993            match &packet {
994                GenericStorePacket::V3_1_1Publish(p) => {
995                    // Add to appropriate QoS tracking set
996                    match p.qos() {
997                        Qos::AtLeastOnce => {
998                            self.pid_puback.insert(p.packet_id().unwrap());
999                        }
1000                        Qos::ExactlyOnce => {
1001                            self.pid_pubrec.insert(p.packet_id().unwrap());
1002                        }
1003                        _ => {
1004                            // QoS 0 shouldn't be in store, but handle gracefully
1005                            tracing::warn!("QoS 0 packet found in store, skipping");
1006                            continue;
1007                        }
1008                    }
1009                    // Register packet ID and add to store
1010                    let packet_id = p.packet_id().unwrap();
1011                    if self.pid_man.register_id(packet_id).is_ok() {
1012                        if let Err(e) = self.store.add(packet) {
1013                            tracing::error!("Failed to add packet to store: {:?}", e);
1014                        }
1015                    } else {
1016                        tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1017                    }
1018                }
1019                GenericStorePacket::V5_0Publish(p) => {
1020                    // Add to appropriate QoS tracking set
1021                    match p.qos() {
1022                        Qos::AtLeastOnce => {
1023                            self.pid_puback.insert(p.packet_id().unwrap());
1024                        }
1025                        Qos::ExactlyOnce => {
1026                            self.pid_pubrec.insert(p.packet_id().unwrap());
1027                        }
1028                        _ => {
1029                            // QoS 0 shouldn't be in store, but handle gracefully
1030                            tracing::warn!("QoS 0 packet found in store, skipping");
1031                            continue;
1032                        }
1033                    }
1034                    // Register packet ID and add to store
1035                    let packet_id = p.packet_id().unwrap();
1036                    if self.pid_man.register_id(packet_id).is_ok() {
1037                        if let Err(e) = self.store.add(packet) {
1038                            tracing::error!("Failed to add packet to store: {:?}", e);
1039                        }
1040                    } else {
1041                        tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1042                    }
1043                }
1044                GenericStorePacket::V3_1_1Pubrel(p) => {
1045                    // Pubrel packets expect PUBCOMP response
1046                    self.pid_pubcomp.insert(p.packet_id());
1047                    // Register packet ID and add to store
1048                    let packet_id = p.packet_id();
1049                    if self.pid_man.register_id(packet_id).is_ok() {
1050                        if let Err(e) = self.store.add(packet) {
1051                            tracing::error!("Failed to add packet to store: {:?}", e);
1052                        }
1053                    } else {
1054                        tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1055                    }
1056                }
1057                GenericStorePacket::V5_0Pubrel(p) => {
1058                    // Pubrel packets expect PUBCOMP response
1059                    self.pid_pubcomp.insert(p.packet_id());
1060                    // Register packet ID and add to store
1061                    let packet_id = p.packet_id();
1062                    if self.pid_man.register_id(packet_id).is_ok() {
1063                        if let Err(e) = self.store.add(packet) {
1064                            tracing::error!("Failed to add packet to store: {:?}", e);
1065                        }
1066                    } else {
1067                        tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1068                    }
1069                }
1070            }
1071        }
1072    }
1073
1074    /// Get stored packets for persistence
1075    ///
1076    /// Returns packets that need to be stored for potential retransmission.
1077    /// This is useful for implementing persistent sessions.
1078    ///
1079    /// # Returns
1080    ///
1081    /// Vector of packets that should be persisted
1082    pub fn get_stored_packets(&self) -> Vec<GenericStorePacket<PacketIdType>> {
1083        self.store.get_stored()
1084    }
1085
1086    /// Get the MQTT protocol version being used
1087    ///
1088    /// # Returns
1089    ///
1090    /// The protocol version (V3_1_1 or V5_0)
1091    pub fn get_protocol_version(&self) -> Version {
1092        self.protocol_version
1093    }
1094
1095    /// Check if a PUBLISH packet is currently being processed
1096    ///
1097    /// # Parameters
1098    ///
1099    /// * `packet_id` - The packet ID to check
1100    ///
1101    /// # Returns
1102    ///
1103    /// True if the packet ID is in use for PUBLISH processing
1104    pub fn is_publish_processing(&self, packet_id: PacketIdType) -> bool {
1105        self.qos2_publish_processing.contains(&packet_id)
1106    }
1107
1108    /// Regulate packet for store (remove/resolve topic alias)
1109    ///
1110    /// This method prepares a V5.0 publish packet for storage by resolving topic aliases
1111    /// and removing TopicAlias properties to ensure the packet can be retransmitted correctly.
1112    pub fn regulate_for_store(
1113        &self,
1114        mut packet: v5_0::GenericPublish<PacketIdType>,
1115    ) -> Result<v5_0::GenericPublish<PacketIdType>, MqttError> {
1116        if packet.topic_name().is_empty() {
1117            // Topic is empty, need to resolve from topic alias
1118            if let Some(props) = packet.props() {
1119                if let Some(topic_alias) = Self::get_topic_alias_from_props(props) {
1120                    if let Some(ref topic_alias_send) = self.topic_alias_send {
1121                        if let Some(topic) = topic_alias_send.peek(topic_alias) {
1122                            // Found topic for alias, add topic and remove alias property
1123                            packet = packet.remove_topic_alias_add_topic(topic.to_string())?;
1124                        } else {
1125                            return Err(MqttError::PacketNotRegulated);
1126                        }
1127                    } else {
1128                        return Err(MqttError::PacketNotRegulated);
1129                    }
1130                } else {
1131                    return Err(MqttError::PacketNotRegulated);
1132                }
1133            } else {
1134                return Err(MqttError::PacketNotRegulated);
1135            }
1136        } else {
1137            // Topic is not empty, just remove TopicAlias property if present
1138            packet = packet.remove_topic_alias();
1139        }
1140
1141        Ok(packet)
1142    }
1143
1144    // private
1145
1146    /// Initialize connection state based on client/server role
1147    ///
1148    /// Resets all connection-specific state including:
1149    /// - Publish flow control counters and limits
1150    /// - Topic alias management
1151    /// - QoS2 processing state
1152    /// - Packet ID tracking sets
1153    /// - Store requirement flag
1154    ///
1155    /// # Parameters
1156    /// * `is_client` - true for client mode, false for server mode
1157    fn initialize(&mut self, is_client: bool) {
1158        self.publish_send_max = None;
1159        self.publish_recv_max = None;
1160        self.publish_send_count = 0;
1161        self.topic_alias_send = None;
1162        self.topic_alias_recv = None;
1163        self.publish_recv.clear();
1164        self.qos2_publish_processing.clear();
1165        self.need_store = false;
1166        self.pid_suback.clear();
1167        self.pid_unsuback.clear();
1168        self.is_client = is_client;
1169    }
1170
1171    fn clear_store_related(&mut self) {
1172        self.pid_man.clear();
1173        self.pid_puback.clear();
1174        self.pid_pubrec.clear();
1175        self.pid_pubcomp.clear();
1176        self.store.clear();
1177    }
1178
1179    /// Send all stored packets for retransmission
1180    fn send_stored(&mut self) -> Vec<GenericEvent<PacketIdType>> {
1181        let mut events = Vec::new();
1182        self.store.for_each(|packet| {
1183            if packet.size() > self.maximum_packet_size_send as usize {
1184                let packet_id = packet.packet_id();
1185                self.pid_man.release_id(packet_id);
1186                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1187                return false; // Remove from store
1188            }
1189            events.push(GenericEvent::RequestSendPacket {
1190                packet: packet.clone().into(),
1191                release_packet_id_if_send_error: None,
1192            });
1193            true // Keep in store
1194        });
1195
1196        events
1197    }
1198
1199    /// Validate topic alias and return the associated topic name
1200    ///
1201    /// Checks if the topic alias is valid and retrieves the corresponding topic name
1202    /// from the topic alias send manager.
1203    ///
1204    /// # Parameters
1205    /// * `topic_alias_opt` - Optional topic alias value
1206    ///
1207    /// # Returns
1208    /// * `Some(topic_name)` if the topic alias is valid and found
1209    /// * `None` if the topic alias is invalid, not provided, or not found
1210    fn validate_topic_alias(&mut self, topic_alias_opt: Option<u16>) -> Option<String> {
1211        let topic_alias = topic_alias_opt?;
1212
1213        if !self.validate_topic_alias_range(topic_alias) {
1214            return None;
1215        }
1216
1217        let topic_alias_send = self.topic_alias_send.as_mut()?;
1218        // LRU updated here
1219        let topic = topic_alias_send.get(topic_alias)?;
1220
1221        Some(topic.to_string())
1222    }
1223
1224    /// Validate that topic alias is within the allowed range
1225    ///
1226    /// Checks if the topic alias value is valid according to the configured
1227    /// topic alias maximum for sending.
1228    ///
1229    /// # Parameters
1230    /// * `topic_alias` - Topic alias value to validate
1231    ///
1232    /// # Returns
1233    /// * `true` if the topic alias is within valid range
1234    /// * `false` if invalid or topic alias sending is not configured
1235    fn validate_topic_alias_range(&self, topic_alias: u16) -> bool {
1236        let topic_alias_send = match &self.topic_alias_send {
1237            Some(tas) => tas,
1238            None => {
1239                tracing::error!("topic_alias is set but topic_alias_maximum is 0");
1240                return false;
1241            }
1242        };
1243
1244        if topic_alias == 0 || topic_alias > topic_alias_send.max() {
1245            tracing::error!("topic_alias is set but out of range");
1246            return false;
1247        }
1248
1249        true
1250    }
1251
1252    /// Process v3.1.1 CONNECT packet - C++ constexpr if implementation
1253    pub(crate) fn process_send_v3_1_1_connect(
1254        &mut self,
1255        packet: v3_1_1::Connect,
1256    ) -> Vec<GenericEvent<PacketIdType>> {
1257        if self.status != ConnectionStatus::Disconnected {
1258            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1259        }
1260        if !self.validate_maximum_packet_size_send(packet.size()) {
1261            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1262        }
1263
1264        let mut events = Vec::new();
1265        self.initialize(true);
1266        self.status = ConnectionStatus::Connecting;
1267
1268        // Extract keep_alive and set pingreq_send_interval_ms if != 0
1269        let keep_alive = packet.keep_alive();
1270        if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1271            self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1272        }
1273
1274        // Handle clean_session flag
1275        if packet.clean_start() {
1276            self.clear_store_related();
1277        } else {
1278            self.need_store = true;
1279        }
1280
1281        // Clear topic alias for sending
1282        self.topic_alias_send = None;
1283
1284        events.push(GenericEvent::RequestSendPacket {
1285            packet: packet.into(),
1286            release_packet_id_if_send_error: None,
1287        });
1288        self.send_post_process(&mut events);
1289
1290        events
1291    }
1292
1293    /// Process v5.0 CONNECT packet - C++ constexpr if implementation
1294    pub(crate) fn process_send_v5_0_connect(
1295        &mut self,
1296        packet: v5_0::Connect,
1297    ) -> Vec<GenericEvent<PacketIdType>> {
1298        if !self.validate_maximum_packet_size_send(packet.size()) {
1299            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1300        }
1301        if self.status != ConnectionStatus::Disconnected {
1302            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1303        }
1304
1305        let mut events = Vec::new();
1306        self.initialize(true);
1307        self.status = ConnectionStatus::Connecting;
1308
1309        // Extract keep_alive and set pingreq_send_interval_ms if != 0
1310        let keep_alive = packet.keep_alive();
1311        if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1312            self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1313        }
1314
1315        // Handle clean_start flag
1316        if packet.clean_start() {
1317            self.clear_store_related();
1318        }
1319
1320        // Process properties
1321        for prop in packet.props() {
1322            match prop {
1323                Property::TopicAliasMaximum(val) => {
1324                    if val.val() != 0 {
1325                        self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1326                    }
1327                }
1328                Property::ReceiveMaximum(val) => {
1329                    debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1330                    self.publish_recv_max = Some(val.val());
1331                }
1332                Property::MaximumPacketSize(val) => {
1333                    debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1334                    self.maximum_packet_size_recv = val.val();
1335                }
1336                Property::SessionExpiryInterval(val) => {
1337                    if val.val() != 0 {
1338                        self.need_store = true;
1339                    }
1340                }
1341                _ => {
1342                    // Ignore other properties (equivalent to [](auto const&){} in C++)
1343                }
1344            }
1345        }
1346
1347        events.push(GenericEvent::RequestSendPacket {
1348            packet: packet.into(),
1349            release_packet_id_if_send_error: None,
1350        });
1351        self.send_post_process(&mut events);
1352
1353        events
1354    }
1355
1356    pub(crate) fn process_send_v3_1_1_connack(
1357        &mut self,
1358        packet: v3_1_1::Connack,
1359    ) -> Vec<GenericEvent<PacketIdType>> {
1360        if self.status != ConnectionStatus::Connecting {
1361            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1362        }
1363        let mut events = Vec::new();
1364        if packet.return_code() == ConnectReturnCode::Accepted {
1365            self.status = ConnectionStatus::Connected;
1366        } else {
1367            self.status = ConnectionStatus::Disconnected;
1368        }
1369
1370        events.push(GenericEvent::RequestSendPacket {
1371            packet: packet.into(),
1372            release_packet_id_if_send_error: None,
1373        });
1374        events.extend(self.send_stored());
1375        self.send_post_process(&mut events);
1376
1377        events
1378    }
1379
1380    pub(crate) fn process_send_v5_0_connack(
1381        &mut self,
1382        packet: v5_0::Connack,
1383    ) -> Vec<GenericEvent<PacketIdType>> {
1384        if !self.validate_maximum_packet_size_send(packet.size()) {
1385            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1386        }
1387        if self.status != ConnectionStatus::Connecting {
1388            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1389        }
1390
1391        let mut events = Vec::new();
1392
1393        if packet.reason_code() == ConnectReasonCode::Success {
1394            self.status = ConnectionStatus::Connected;
1395
1396            // Process properties
1397            for prop in packet.props() {
1398                match prop {
1399                    Property::TopicAliasMaximum(val) => {
1400                        if val.val() != 0 {
1401                            self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1402                        }
1403                    }
1404                    Property::ReceiveMaximum(val) => {
1405                        debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1406                        self.publish_recv_max = Some(val.val());
1407                    }
1408                    Property::MaximumPacketSize(val) => {
1409                        debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1410                        self.maximum_packet_size_recv = val.val();
1411                    }
1412                    _ => {
1413                        // Ignore other properties
1414                    }
1415                }
1416            }
1417        } else {
1418            self.status = ConnectionStatus::Disconnected;
1419            self.cancel_timers(&mut events);
1420        }
1421
1422        events.push(GenericEvent::RequestSendPacket {
1423            packet: packet.into(),
1424            release_packet_id_if_send_error: None,
1425        });
1426        events.extend(self.send_stored());
1427        self.send_post_process(&mut events);
1428
1429        events
1430    }
1431
1432    pub(crate) fn process_send_v3_1_1_publish(
1433        &mut self,
1434        packet: v3_1_1::GenericPublish<PacketIdType>,
1435    ) -> Vec<GenericEvent<PacketIdType>> {
1436        let mut events = Vec::new();
1437        let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1438
1439        if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1440            // Register packet ID for QoS 1 or 2
1441            let packet_id = packet.packet_id().unwrap();
1442            if self.status != ConnectionStatus::Connected
1443                && !self.need_store
1444                && !self.offline_publish
1445            {
1446                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1447                if self.pid_man.is_used_id(packet_id) {
1448                    self.pid_man.release_id(packet_id);
1449                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1450                }
1451                return events;
1452            }
1453            if !self.pid_man.is_used_id(packet_id) {
1454                tracing::error!("packet_id {packet_id} must be acquired or registered");
1455                events.push(GenericEvent::NotifyError(
1456                    MqttError::PacketIdentifierInvalid,
1457                ));
1458                return events;
1459            }
1460            if self.need_store
1461                && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1462            {
1463                let store_packet = packet.clone().set_dup(true);
1464                self.store.add(store_packet.try_into().unwrap()).unwrap();
1465            } else {
1466                release_packet_id_if_send_error = Some(packet_id);
1467            }
1468            if packet.qos() == Qos::ExactlyOnce {
1469                self.qos2_publish_processing.insert(packet_id);
1470                self.pid_pubrec.insert(packet_id);
1471            } else {
1472                self.pid_puback.insert(packet_id);
1473            }
1474        } else if self.status != ConnectionStatus::Connected {
1475            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1476            return events;
1477        }
1478
1479        if self.status == ConnectionStatus::Connected {
1480            events.push(GenericEvent::RequestSendPacket {
1481                packet: packet.into(),
1482                release_packet_id_if_send_error,
1483            });
1484        }
1485        self.send_post_process(&mut events);
1486
1487        events
1488    }
1489
/// Processes a request to send an MQTT v5.0 PUBLISH packet and returns the events to act on.
///
/// Processing order:
/// 1. Size check: returns `NotifyError(PacketTooLarge)` if
///    `validate_maximum_packet_size_send` rejects `packet.size()`.
/// 2. QoS 1/2 bookkeeping: checks that sending is permitted in the current state, that the
///    packet identifier is in use in `pid_man`, optionally adds a copy of the packet to
///    `store`, and records the identifier in `pid_puback` (QoS 1) or in both
///    `qos2_publish_processing` and `pid_pubrec` (QoS 2). QoS 0 is rejected unless connected.
/// 3. Topic alias handling: validates a caller-supplied alias, registers a topic/alias pair,
///    or applies an alias automatically according to `auto_map_topic_alias_send` /
///    `auto_replace_topic_alias_send`.
/// 4. Flow control: for QoS 1/2, rejects the packet when `publish_send_count` has reached
///    `publish_send_max`; otherwise increments the counter.
/// 5. Emits `RequestSendPacket` when connected and finally calls `send_post_process`.
///
/// The `PacketTooLarge`, `PacketIdentifierInvalid` and QoS 0 "not connected" rejections
/// return only a `NotifyError`. The other error returns additionally release the packet
/// identifier (if the packet has one and it is in use) and push `NotifyPacketIdReleased`.
pub(crate) fn process_send_v5_0_publish(
    &mut self,
    mut packet: v5_0::GenericPublish<PacketIdType>,
) -> Vec<GenericEvent<PacketIdType>> {
    if !self.validate_maximum_packet_size_send(packet.size()) {
        return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
    }

    let mut events = Vec::new();
    // Stays `None` when the packet is stored; otherwise carries the packet id so that it is
    // handed over together with `RequestSendPacket`.
    let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
    // Set once the alias of an empty-topic packet has been resolved in the store path, so
    // the validation further down is not repeated.
    let mut topic_alias_validated = false;
    if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
        // NOTE(review): this panics if a QoS 1/2 packet has no packet id; presumably the
        // packet builder guarantees one, confirm.
        let packet_id = packet.packet_id().unwrap();
        // Not connected, and neither storing nor offline publishing is enabled:
        // the packet can be neither sent nor kept.
        if self.status != ConnectionStatus::Connected
            && !self.need_store
            && !self.offline_publish
        {
            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
            if self.pid_man.is_used_id(packet_id) {
                self.pid_man.release_id(packet_id);
                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
            }
            return events;
        }

        // The packet identifier must already be acquired or registered in `pid_man`.
        if !self.pid_man.is_used_id(packet_id) {
            tracing::error!("packet_id {packet_id} must be acquired or registered");
            events.push(GenericEvent::NotifyError(
                MqttError::PacketIdentifierInvalid,
            ));
            return events;
        }

        // Store a copy of the packet with the TopicAlias property removed (and the topic
        // name restored from the alias when the topic name is empty) and DUP set.
        if self.need_store
            && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
        {
            let ta_opt = Self::get_topic_alias_from_props_opt(packet.props());
            if packet.topic_name().is_empty() {
                // Topic name is empty, must validate topic alias
                let topic_opt = self.validate_topic_alias(ta_opt);
                if topic_opt.is_none() {
                    events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
                    if self.pid_man.is_used_id(packet_id) {
                        self.pid_man.release_id(packet_id);
                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
                    }
                    return events;
                }
                topic_alias_validated = true;
                // `topic_opt` is known to be `Some` here, so the first `unwrap` cannot fail.
                let store_packet = packet
                    .clone()
                    .remove_topic_alias_add_topic(topic_opt.unwrap())
                    .unwrap()
                    .set_dup(true);
                // TBD validate_maximum_packet_size(store_packet.size());
                self.store.add(store_packet.try_into().unwrap()).unwrap();
            } else {
                // Topic name is not empty, remove topic alias if present
                let store_packet = packet.clone().remove_topic_alias().set_dup(true);
                self.store.add(store_packet.try_into().unwrap()).unwrap();
            }
        } else {
            // Not stored: remember the id so it accompanies the send request.
            release_packet_id_if_send_error = Some(packet_id);
        }
        // Record which acknowledgement is expected for this packet id.
        if packet.qos() == Qos::ExactlyOnce {
            self.qos2_publish_processing.insert(packet_id);
            self.pid_pubrec.insert(packet_id);
        } else {
            self.pid_puback.insert(packet_id);
        }
    } else if self.status != ConnectionStatus::Connected {
        // QoS 0 packets are never stored, so they require an established connection.
        events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
        return events;
    }

    // NOTE(review): the error returns below release the packet id but do not undo the
    // `store.add` / `pid_puback` / `pid_pubrec` / `qos2_publish_processing` updates made
    // above for QoS 1/2 packets. Confirm whether that leftover state is intended.
    let packet_id_opt = packet.packet_id();
    let ta_opt = Self::get_topic_alias_from_props_opt(packet.props());
    if packet.topic_name().is_empty() {
        // process manually provided TopicAlias
        if !topic_alias_validated && self.validate_topic_alias(ta_opt).is_none() {
            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
            if let Some(packet_id) = packet_id_opt {
                if self.pid_man.is_used_id(packet_id) {
                    self.pid_man.release_id(packet_id);
                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
                }
            }
            return events;
        }
    } else if let Some(ta) = ta_opt {
        // Topic alias is provided
        if self.validate_topic_alias_range(ta) {
            tracing::trace!(
                "topic alias: {} - {} is registered.",
                packet.topic_name(),
                ta
            );
            // Register (or overwrite) the topic name / alias pair for later sends.
            if let Some(ref mut topic_alias_send) = self.topic_alias_send {
                topic_alias_send.insert_or_update(packet.topic_name(), ta);
            }
        } else {
            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
            if let Some(packet_id) = packet_id_opt {
                if self.pid_man.is_used_id(packet_id) {
                    self.pid_man.release_id(packet_id);
                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
                }
            }
            return events;
        }
    } else if self.status == ConnectionStatus::Connected {
        // process auto applying TopicAlias if the option is enabled
        if self.auto_map_topic_alias_send {
            if let Some(ref mut topic_alias_send) = self.topic_alias_send {
                if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
                    // Known topic: send the alias instead of the topic name.
                    tracing::trace!(
                        "topic alias: {} - {} is found.",
                        packet.topic_name(),
                        found_ta
                    );
                    packet = packet.remove_topic_add_topic_alias(found_ta);
                } else {
                    // Unknown topic: map it to the alias returned by `get_lru_alias`,
                    // then send the alias instead of the topic name.
                    let lru_ta = topic_alias_send.get_lru_alias();
                    topic_alias_send.insert_or_update(packet.topic_name(), lru_ta);
                    packet = packet.remove_topic_add_topic_alias(lru_ta);
                }
            }
        } else if self.auto_replace_topic_alias_send {
            // Replace only: use an existing mapping, never create a new one.
            if let Some(ref topic_alias_send) = self.topic_alias_send {
                if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
                    tracing::trace!(
                        "topic alias: {} - {} is found.",
                        packet.topic_name(),
                        found_ta
                    );
                    packet = packet.remove_topic_add_topic_alias(found_ta);
                }
            }
        }
    }

    // Check receive_maximum for sending (QoS 1 and 2 packets)
    if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
        if let Some(max) = self.publish_send_max {
            if self.publish_send_count == max {
                events.push(GenericEvent::NotifyError(MqttError::ReceiveMaximumExceeded));
                if let Some(packet_id) = packet_id_opt {
                    if self.pid_man.is_used_id(packet_id) {
                        self.pid_man.release_id(packet_id);
                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
                    }
                }
                return events;
            }
            self.publish_send_count += 1;
        }
    }

    // When not connected nothing is sent now; a QoS 1/2 packet may have been stored above.
    if self.status == ConnectionStatus::Connected {
        events.push(GenericEvent::RequestSendPacket {
            packet: packet.into(),
            release_packet_id_if_send_error,
        });
    }
    self.send_post_process(&mut events);

    events
}
1659
1660    pub(crate) fn process_send_v3_1_1_puback(
1661        &mut self,
1662        packet: v3_1_1::GenericPuback<PacketIdType>,
1663    ) -> Vec<GenericEvent<PacketIdType>> {
1664        if self.status != ConnectionStatus::Connected {
1665            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1666        }
1667        let mut events = Vec::new();
1668
1669        events.push(GenericEvent::RequestSendPacket {
1670            packet: packet.into(),
1671            release_packet_id_if_send_error: None,
1672        });
1673        self.send_post_process(&mut events);
1674
1675        events
1676    }
1677
1678    pub(crate) fn process_send_v5_0_puback(
1679        &mut self,
1680        packet: v5_0::GenericPuback<PacketIdType>,
1681    ) -> Vec<GenericEvent<PacketIdType>> {
1682        if !self.validate_maximum_packet_size_send(packet.size()) {
1683            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1684        }
1685        if self.status != ConnectionStatus::Connected {
1686            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1687        }
1688
1689        let mut events = Vec::new();
1690        self.publish_recv.remove(&packet.packet_id());
1691
1692        events.push(GenericEvent::RequestSendPacket {
1693            packet: packet.into(),
1694            release_packet_id_if_send_error: None,
1695        });
1696        self.send_post_process(&mut events);
1697
1698        events
1699    }
1700
1701    pub(crate) fn process_send_v3_1_1_pubrec(
1702        &mut self,
1703        packet: v3_1_1::GenericPubrec<PacketIdType>,
1704    ) -> Vec<GenericEvent<PacketIdType>> {
1705        if self.status != ConnectionStatus::Connected {
1706            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1707        }
1708        let mut events = Vec::new();
1709
1710        events.push(GenericEvent::RequestSendPacket {
1711            packet: packet.into(),
1712            release_packet_id_if_send_error: None,
1713        });
1714        self.send_post_process(&mut events);
1715
1716        events
1717    }
1718
1719    pub(crate) fn process_send_v5_0_pubrec(
1720        &mut self,
1721        packet: v5_0::GenericPubrec<PacketIdType>,
1722    ) -> Vec<GenericEvent<PacketIdType>> {
1723        if !self.validate_maximum_packet_size_send(packet.size()) {
1724            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1725        }
1726        if self.status != ConnectionStatus::Connected {
1727            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1728        }
1729
1730        let mut events = Vec::new();
1731        let packet_id = packet.packet_id();
1732
1733        if let Some(rc) = packet.reason_code() {
1734            if rc.is_failure() {
1735                self.publish_recv.remove(&packet_id);
1736                self.qos2_publish_handled.remove(&packet_id);
1737            }
1738        }
1739
1740        events.push(GenericEvent::RequestSendPacket {
1741            packet: packet.into(),
1742            release_packet_id_if_send_error: None,
1743        });
1744        self.send_post_process(&mut events);
1745
1746        events
1747    }
1748
1749    pub(crate) fn process_send_v3_1_1_pubrel(
1750        &mut self,
1751        packet: v3_1_1::GenericPubrel<PacketIdType>,
1752    ) -> Vec<GenericEvent<PacketIdType>> {
1753        if self.status != ConnectionStatus::Connected && !self.need_store {
1754            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1755        }
1756        let mut events = Vec::new();
1757        let packet_id = packet.packet_id();
1758        if !self.pid_man.is_used_id(packet_id) {
1759            tracing::error!("packet_id {packet_id} must be acquired or registered");
1760            events.push(GenericEvent::NotifyError(
1761                MqttError::PacketIdentifierInvalid,
1762            ));
1763            return events;
1764        }
1765        if self.need_store {
1766            self.store.add(packet.clone().try_into().unwrap()).unwrap();
1767        }
1768
1769        if self.status == ConnectionStatus::Connected {
1770            self.pid_pubcomp.insert(packet_id);
1771            events.push(GenericEvent::RequestSendPacket {
1772                packet: packet.into(),
1773                release_packet_id_if_send_error: None,
1774            });
1775        }
1776        self.send_post_process(&mut events);
1777
1778        events
1779    }
1780
1781    pub(crate) fn process_send_v5_0_pubrel(
1782        &mut self,
1783        packet: v5_0::GenericPubrel<PacketIdType>,
1784    ) -> Vec<GenericEvent<PacketIdType>> {
1785        if !self.validate_maximum_packet_size_send(packet.size()) {
1786            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1787        }
1788        if self.status != ConnectionStatus::Connected && !self.need_store {
1789            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1790        }
1791
1792        let mut events = Vec::new();
1793        let packet_id = packet.packet_id();
1794        if !self.pid_man.is_used_id(packet_id) {
1795            tracing::error!("packet_id {packet_id} must be acquired or registered");
1796            events.push(GenericEvent::NotifyError(
1797                MqttError::PacketIdentifierInvalid,
1798            ));
1799            return events;
1800        }
1801        if self.need_store {
1802            self.store.add(packet.clone().try_into().unwrap()).unwrap();
1803        }
1804
1805        if self.status == ConnectionStatus::Connected {
1806            self.pid_pubcomp.insert(packet_id);
1807            events.push(GenericEvent::RequestSendPacket {
1808                packet: packet.into(),
1809                release_packet_id_if_send_error: None,
1810            });
1811        }
1812        self.send_post_process(&mut events);
1813
1814        events
1815    }
1816
1817    pub(crate) fn process_send_v3_1_1_pubcomp(
1818        &mut self,
1819        packet: v3_1_1::GenericPubcomp<PacketIdType>,
1820    ) -> Vec<GenericEvent<PacketIdType>> {
1821        if self.status != ConnectionStatus::Connected {
1822            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1823        }
1824        let mut events = Vec::new();
1825
1826        events.push(GenericEvent::RequestSendPacket {
1827            packet: packet.into(),
1828            release_packet_id_if_send_error: None,
1829        });
1830        self.send_post_process(&mut events);
1831
1832        events
1833    }
1834
1835    pub(crate) fn process_send_v5_0_pubcomp(
1836        &mut self,
1837        packet: v5_0::GenericPubcomp<PacketIdType>,
1838    ) -> Vec<GenericEvent<PacketIdType>> {
1839        if !self.validate_maximum_packet_size_send(packet.size()) {
1840            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1841        }
1842        if self.status != ConnectionStatus::Connected {
1843            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1844        }
1845
1846        let mut events = Vec::new();
1847        self.publish_recv.remove(&packet.packet_id());
1848
1849        events.push(GenericEvent::RequestSendPacket {
1850            packet: packet.into(),
1851            release_packet_id_if_send_error: None,
1852        });
1853        self.send_post_process(&mut events);
1854
1855        events
1856    }
1857
1858    pub(crate) fn process_send_v3_1_1_subscribe(
1859        &mut self,
1860        packet: v3_1_1::GenericSubscribe<PacketIdType>,
1861    ) -> Vec<GenericEvent<PacketIdType>> {
1862        let mut events = Vec::new();
1863        let packet_id = packet.packet_id();
1864        if self.status != ConnectionStatus::Connected {
1865            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1866            if self.pid_man.is_used_id(packet_id) {
1867                self.pid_man.release_id(packet_id);
1868                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1869            }
1870            return events;
1871        }
1872        if !self.pid_man.is_used_id(packet_id) {
1873            tracing::error!("packet_id {packet_id} must be acquired or registered");
1874            events.push(GenericEvent::NotifyError(
1875                MqttError::PacketIdentifierInvalid,
1876            ));
1877            return events;
1878        }
1879        self.pid_suback.insert(packet_id);
1880
1881        events.push(GenericEvent::RequestSendPacket {
1882            packet: packet.into(),
1883            release_packet_id_if_send_error: Some(packet_id),
1884        });
1885        self.send_post_process(&mut events);
1886
1887        events
1888    }
1889
1890    pub(crate) fn process_send_v5_0_subscribe(
1891        &mut self,
1892        packet: v5_0::GenericSubscribe<PacketIdType>,
1893    ) -> Vec<GenericEvent<PacketIdType>> {
1894        if !self.validate_maximum_packet_size_send(packet.size()) {
1895            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1896        }
1897
1898        let mut events = Vec::new();
1899        let packet_id = packet.packet_id();
1900        if self.status != ConnectionStatus::Connected {
1901            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1902            if self.pid_man.is_used_id(packet_id) {
1903                self.pid_man.release_id(packet_id);
1904                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1905            }
1906            return events;
1907        }
1908        if !self.pid_man.is_used_id(packet_id) {
1909            tracing::error!("packet_id {packet_id} must be acquired or registered");
1910            events.push(GenericEvent::NotifyError(
1911                MqttError::PacketIdentifierInvalid,
1912            ));
1913            return events;
1914        }
1915        self.pid_suback.insert(packet_id);
1916
1917        events.push(GenericEvent::RequestSendPacket {
1918            packet: packet.into(),
1919            release_packet_id_if_send_error: Some(packet_id),
1920        });
1921        self.send_post_process(&mut events);
1922
1923        events
1924    }
1925
1926    pub(crate) fn process_send_v3_1_1_suback(
1927        &mut self,
1928        packet: v3_1_1::GenericSuback<PacketIdType>,
1929    ) -> Vec<GenericEvent<PacketIdType>> {
1930        if self.status != ConnectionStatus::Connected {
1931            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1932        }
1933        let mut events = Vec::new();
1934        events.push(GenericEvent::RequestSendPacket {
1935            packet: packet.into(),
1936            release_packet_id_if_send_error: None,
1937        });
1938        self.send_post_process(&mut events);
1939
1940        events
1941    }
1942
1943    pub(crate) fn process_send_v5_0_suback(
1944        &mut self,
1945        packet: v5_0::GenericSuback<PacketIdType>,
1946    ) -> Vec<GenericEvent<PacketIdType>> {
1947        if !self.validate_maximum_packet_size_send(packet.size()) {
1948            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1949        }
1950        if self.status != ConnectionStatus::Connected {
1951            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1952        }
1953
1954        let mut events = Vec::new();
1955        events.push(GenericEvent::RequestSendPacket {
1956            packet: packet.into(),
1957            release_packet_id_if_send_error: None,
1958        });
1959        self.send_post_process(&mut events);
1960
1961        events
1962    }
1963
1964    pub(crate) fn process_send_v3_1_1_unsubscribe(
1965        &mut self,
1966        packet: v3_1_1::GenericUnsubscribe<PacketIdType>,
1967    ) -> Vec<GenericEvent<PacketIdType>> {
1968        let mut events = Vec::new();
1969        let packet_id = packet.packet_id();
1970        if self.status != ConnectionStatus::Connected {
1971            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1972            if self.pid_man.is_used_id(packet_id) {
1973                self.pid_man.release_id(packet_id);
1974                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1975            }
1976            return events;
1977        }
1978        if !self.pid_man.is_used_id(packet_id) {
1979            tracing::error!("packet_id {packet_id} must be acquired or registered");
1980            events.push(GenericEvent::NotifyError(
1981                MqttError::PacketIdentifierInvalid,
1982            ));
1983            return events;
1984        }
1985        self.pid_unsuback.insert(packet_id);
1986
1987        events.push(GenericEvent::RequestSendPacket {
1988            packet: packet.into(),
1989            release_packet_id_if_send_error: Some(packet_id),
1990        });
1991        self.send_post_process(&mut events);
1992
1993        events
1994    }
1995
1996    pub(crate) fn process_send_v5_0_unsubscribe(
1997        &mut self,
1998        packet: v5_0::GenericUnsubscribe<PacketIdType>,
1999    ) -> Vec<GenericEvent<PacketIdType>> {
2000        if !self.validate_maximum_packet_size_send(packet.size()) {
2001            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2002        }
2003
2004        let mut events = Vec::new();
2005        let packet_id = packet.packet_id();
2006        if self.status != ConnectionStatus::Connected {
2007            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
2008            if self.pid_man.is_used_id(packet_id) {
2009                self.pid_man.release_id(packet_id);
2010                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2011            }
2012            return events;
2013        }
2014        if !self.pid_man.is_used_id(packet_id) {
2015            tracing::error!("packet_id {packet_id} must be acquired or registered");
2016            events.push(GenericEvent::NotifyError(
2017                MqttError::PacketIdentifierInvalid,
2018            ));
2019            return events;
2020        }
2021        self.pid_unsuback.insert(packet_id);
2022
2023        events.push(GenericEvent::RequestSendPacket {
2024            packet: packet.into(),
2025            release_packet_id_if_send_error: Some(packet_id),
2026        });
2027        self.send_post_process(&mut events);
2028
2029        events
2030    }
2031
2032    pub(crate) fn process_send_v3_1_1_unsuback(
2033        &mut self,
2034        packet: v3_1_1::GenericUnsuback<PacketIdType>,
2035    ) -> Vec<GenericEvent<PacketIdType>> {
2036        if self.status != ConnectionStatus::Connected {
2037            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2038        }
2039        let mut events = Vec::new();
2040        events.push(GenericEvent::RequestSendPacket {
2041            packet: packet.into(),
2042            release_packet_id_if_send_error: None,
2043        });
2044        self.send_post_process(&mut events);
2045
2046        events
2047    }
2048
2049    pub(crate) fn process_send_v5_0_unsuback(
2050        &mut self,
2051        packet: v5_0::GenericUnsuback<PacketIdType>,
2052    ) -> Vec<GenericEvent<PacketIdType>> {
2053        if !self.validate_maximum_packet_size_send(packet.size()) {
2054            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2055        }
2056        if self.status != ConnectionStatus::Connected {
2057            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2058        }
2059
2060        let mut events = Vec::new();
2061        events.push(GenericEvent::RequestSendPacket {
2062            packet: packet.into(),
2063            release_packet_id_if_send_error: None,
2064        });
2065        self.send_post_process(&mut events);
2066
2067        events
2068    }
2069
2070    pub(crate) fn process_send_v3_1_1_pingreq(
2071        &mut self,
2072        packet: v3_1_1::Pingreq,
2073    ) -> Vec<GenericEvent<PacketIdType>> {
2074        if self.status != ConnectionStatus::Connected {
2075            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2076        }
2077        let mut events = Vec::new();
2078        events.push(GenericEvent::RequestSendPacket {
2079            packet: packet.into(),
2080            release_packet_id_if_send_error: None,
2081        });
2082        if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2083            self.pingreq_send_set = true;
2084            events.push(GenericEvent::RequestTimerReset {
2085                kind: TimerKind::PingrespRecv,
2086                duration_ms: timeout_ms,
2087            });
2088        }
2089        self.send_post_process(&mut events);
2090
2091        events
2092    }
2093
2094    pub(crate) fn process_send_v5_0_pingreq(
2095        &mut self,
2096        packet: v5_0::Pingreq,
2097    ) -> Vec<GenericEvent<PacketIdType>> {
2098        if !self.validate_maximum_packet_size_send(packet.size()) {
2099            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2100        }
2101        if self.status != ConnectionStatus::Connected {
2102            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2103        }
2104
2105        let mut events = Vec::new();
2106        events.push(GenericEvent::RequestSendPacket {
2107            packet: packet.into(),
2108            release_packet_id_if_send_error: None,
2109        });
2110        if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2111            self.pingreq_send_set = true;
2112            events.push(GenericEvent::RequestTimerReset {
2113                kind: TimerKind::PingrespRecv,
2114                duration_ms: timeout_ms,
2115            });
2116        }
2117        self.send_post_process(&mut events);
2118
2119        events
2120    }
2121
2122    pub(crate) fn process_send_v3_1_1_pingresp(
2123        &mut self,
2124        packet: v3_1_1::Pingresp,
2125    ) -> Vec<GenericEvent<PacketIdType>> {
2126        if self.status != ConnectionStatus::Connected {
2127            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2128        }
2129        let mut events = Vec::new();
2130        events.push(GenericEvent::RequestSendPacket {
2131            packet: packet.into(),
2132            release_packet_id_if_send_error: None,
2133        });
2134        self.send_post_process(&mut events);
2135
2136        events
2137    }
2138
2139    pub(crate) fn process_send_v5_0_pingresp(
2140        &mut self,
2141        packet: v5_0::Pingresp,
2142    ) -> Vec<GenericEvent<PacketIdType>> {
2143        if !self.validate_maximum_packet_size_send(packet.size()) {
2144            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2145        }
2146        if self.status != ConnectionStatus::Connected {
2147            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2148        }
2149
2150        let mut events = Vec::new();
2151        events.push(GenericEvent::RequestSendPacket {
2152            packet: packet.into(),
2153            release_packet_id_if_send_error: None,
2154        });
2155        self.send_post_process(&mut events);
2156
2157        events
2158    }
2159
2160    pub(crate) fn process_send_v3_1_1_disconnect(
2161        &mut self,
2162        packet: v3_1_1::Disconnect,
2163    ) -> Vec<GenericEvent<PacketIdType>> {
2164        if self.status != ConnectionStatus::Connected {
2165            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2166        }
2167        let mut events = Vec::new();
2168        self.status = ConnectionStatus::Disconnected;
2169        self.cancel_timers(&mut events);
2170        events.push(GenericEvent::RequestSendPacket {
2171            packet: packet.into(),
2172            release_packet_id_if_send_error: None,
2173        });
2174        events.push(GenericEvent::RequestClose);
2175
2176        events
2177    }
2178
2179    pub(crate) fn process_send_v5_0_disconnect(
2180        &mut self,
2181        packet: v5_0::Disconnect,
2182    ) -> Vec<GenericEvent<PacketIdType>> {
2183        if !self.validate_maximum_packet_size_send(packet.size()) {
2184            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2185        }
2186        if self.status != ConnectionStatus::Connected {
2187            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2188        }
2189
2190        let mut events = Vec::new();
2191        self.status = ConnectionStatus::Disconnected;
2192        self.cancel_timers(&mut events);
2193        events.push(GenericEvent::RequestSendPacket {
2194            packet: packet.into(),
2195            release_packet_id_if_send_error: None,
2196        });
2197        events.push(GenericEvent::RequestClose);
2198
2199        events
2200    }
2201
2202    pub(crate) fn process_send_v5_0_auth(
2203        &mut self,
2204        packet: v5_0::Auth,
2205    ) -> Vec<GenericEvent<PacketIdType>> {
2206        if !self.validate_maximum_packet_size_send(packet.size()) {
2207            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2208        }
2209        if self.status == ConnectionStatus::Disconnected {
2210            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2211        }
2212
2213        let mut events = Vec::new();
2214        events.push(GenericEvent::RequestSendPacket {
2215            packet: packet.into(),
2216            release_packet_id_if_send_error: None,
2217        });
2218        self.send_post_process(&mut events);
2219
2220        events
2221    }
2222
2223    fn send_post_process(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
2224        if self.is_client {
2225            if let Some(timeout_ms) = self.pingreq_send_interval_ms {
2226                self.pingreq_send_set = true;
2227                events.push(GenericEvent::RequestTimerReset {
2228                    kind: TimerKind::PingreqSend,
2229                    duration_ms: timeout_ms,
2230                });
2231            }
2232        }
2233    }
2234
2235    fn validate_maximum_packet_size_send(&self, size: usize) -> bool {
2236        if size > self.maximum_packet_size_send as usize {
2237            tracing::error!("packet size over maximum_packet_size for sending");
2238            return false;
2239        }
2240        true
2241    }
2242
2243    fn process_recv_packet(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
2244        let mut events = Vec::new();
2245
2246        // packet size limit validation (v3.1.1 is always satisfied)
2247        let total_size = remaining_length_to_total_size(raw_packet.remaining_length());
2248        if total_size > self.maximum_packet_size_recv {
2249            // This happens only when protocol version is V5.0.
2250            // On v3.1.1, the maximum packet size is always 268435455 (2^28 - 1).
2251            // If the packet size is over 268435455, feed() returns an error.
2252            // maximum_packet_size_recv is set by sending CONNECT or CONNACK packet.
2253            // So DISCONNECT packet is the right choice to notify the error.
2254            let disconnect_packet = v5_0::Disconnect::builder()
2255                .reason_code(DisconnectReasonCode::PacketTooLarge)
2256                .build()
2257                .unwrap();
2258            // Send disconnect packet directly without generic constraints
2259            events.extend(self.process_send_v5_0_disconnect(disconnect_packet));
2260            events.push(GenericEvent::NotifyError(MqttError::PacketTooLarge));
2261            return events;
2262        }
2263
2264        let packet_type = raw_packet.packet_type();
2265        let _flags = raw_packet.flags();
2266        match self.protocol_version {
2267            Version::V3_1_1 => {
2268                match packet_type {
2269                    1 => {
2270                        // CONNECT
2271                        events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2272                    }
2273                    2 => {
2274                        // CONNACK
2275                        events.extend(self.process_recv_v3_1_1_connack(raw_packet));
2276                    }
2277                    3 => {
2278                        // PUBLISH
2279                        events.extend(self.process_recv_v3_1_1_publish(raw_packet));
2280                    }
2281                    4 => {
2282                        // PUBACK
2283                        events.extend(self.process_recv_v3_1_1_puback(raw_packet));
2284                    }
2285                    5 => {
2286                        // PUBREC
2287                        events.extend(self.process_recv_v3_1_1_pubrec(raw_packet));
2288                    }
2289                    6 => {
2290                        // PUBREL
2291                        events.extend(self.process_recv_v3_1_1_pubrel(raw_packet));
2292                    }
2293                    7 => {
2294                        // PUBCOMP
2295                        events.extend(self.process_recv_v3_1_1_pubcomp(raw_packet));
2296                    }
2297                    8 => {
2298                        // SUBSCRIBE
2299                        events.extend(self.process_recv_v3_1_1_subscribe(raw_packet));
2300                    }
2301                    9 => {
2302                        // SUBACK
2303                        events.extend(self.process_recv_v3_1_1_suback(raw_packet));
2304                    }
2305                    10 => {
2306                        // UNSUBSCRIBE
2307                        events.extend(self.process_recv_v3_1_1_unsubscribe(raw_packet));
2308                    }
2309                    11 => {
2310                        // UNSUBACK
2311                        events.extend(self.process_recv_v3_1_1_unsuback(raw_packet));
2312                    }
2313                    12 => {
2314                        // PINGREQ
2315                        events.extend(self.process_recv_v3_1_1_pingreq(raw_packet));
2316                    }
2317                    13 => {
2318                        // PINGRESP
2319                        events.extend(self.process_recv_v3_1_1_pingresp(raw_packet));
2320                    }
2321                    14 => {
2322                        // DISCONNECT
2323                        events.extend(self.process_recv_v3_1_1_disconnect(raw_packet));
2324                    }
2325                    // invalid packet type
2326                    _ => {
2327                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2328                    }
2329                }
2330            }
2331            Version::V5_0 => {
2332                match packet_type {
2333                    1 => {
2334                        // CONNECT
2335                        events.extend(self.process_recv_v5_0_connect(raw_packet));
2336                    }
2337                    2 => {
2338                        // CONNACK
2339                        events.extend(self.process_recv_v5_0_connack(raw_packet));
2340                    }
2341                    3 => {
2342                        // PUBLISH
2343                        events.extend(self.process_recv_v5_0_publish(raw_packet));
2344                    }
2345                    4 => {
2346                        // PUBACK
2347                        events.extend(self.process_recv_v5_0_puback(raw_packet));
2348                    }
2349                    5 => {
2350                        // PUBREC
2351                        events.extend(self.process_recv_v5_0_pubrec(raw_packet));
2352                    }
2353                    6 => {
2354                        // PUBREL
2355                        events.extend(self.process_recv_v5_0_pubrel(raw_packet));
2356                    }
2357                    7 => {
2358                        // PUBCOMP
2359                        events.extend(self.process_recv_v5_0_pubcomp(raw_packet));
2360                    }
2361                    8 => {
2362                        // SUBSCRIBE
2363                        events.extend(self.process_recv_v5_0_subscribe(raw_packet));
2364                    }
2365                    9 => {
2366                        // SUBACK
2367                        events.extend(self.process_recv_v5_0_suback(raw_packet));
2368                    }
2369                    10 => {
2370                        // UNSUBSCRIBE
2371                        events.extend(self.process_recv_v5_0_unsubscribe(raw_packet));
2372                    }
2373                    11 => {
2374                        // UNSUBACK
2375                        events.extend(self.process_recv_v5_0_unsuback(raw_packet));
2376                    }
2377                    12 => {
2378                        // PINGREQ
2379                        events.extend(self.process_recv_v5_0_pingreq(raw_packet));
2380                    }
2381                    13 => {
2382                        // PINGRESP
2383                        events.extend(self.process_recv_v5_0_pingresp(raw_packet));
2384                    }
2385                    14 => {
2386                        // DISCONNECT
2387                        events.extend(self.process_recv_v5_0_disconnect(raw_packet));
2388                    }
2389                    15 => {
2390                        // AUTH
2391                        events.extend(self.process_recv_v5_0_auth(raw_packet));
2392                    }
2393                    // invalid packet type
2394                    _ => {
2395                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2396                    }
2397                }
2398            }
2399            Version::Undetermined => {
2400                match packet_type {
2401                    1 => {
2402                        // CONNECT
2403                        if raw_packet.remaining_length() < 7 {
2404                            events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2405                            return events;
2406                        }
2407                        match raw_packet.data_as_slice()[6] {
2408                            // Protocol Version
2409                            4 => {
2410                                self.protocol_version = Version::V3_1_1;
2411                                events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2412                            }
2413                            5 => {
2414                                self.protocol_version = Version::V5_0;
2415                                events.extend(self.process_recv_v5_0_connect(raw_packet));
2416                            }
2417                            _ => {
2418                                events.push(GenericEvent::NotifyError(
2419                                    MqttError::UnsupportedProtocolVersion,
2420                                ));
2421                            }
2422                        }
2423                    }
2424                    _ => {
2425                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2426                    }
2427                }
2428            }
2429        }
2430
2431        events
2432    }
2433
2434    fn process_recv_v3_1_1_connect(
2435        &mut self,
2436        raw_packet: RawPacket,
2437    ) -> Vec<GenericEvent<PacketIdType>> {
2438        let mut events = Vec::new();
2439        match v3_1_1::Connect::parse(raw_packet.data_as_slice()) {
2440            Ok((packet, _)) => {
2441                if self.status != ConnectionStatus::Disconnected {
2442                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2443                    return events;
2444                }
2445                self.initialize(false);
2446                self.status = ConnectionStatus::Connecting;
2447                if packet.keep_alive() > 0 {
2448                    self.pingreq_recv_timeout_ms =
2449                        Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2450                }
2451                if packet.clean_session() {
2452                    self.clear_store_related();
2453                } else {
2454                    self.need_store = true;
2455                }
2456                events.extend(self.refresh_pingreq_recv());
2457                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2458            }
2459            Err(e) => {
2460                if self.status == ConnectionStatus::Disconnected {
2461                    self.status = ConnectionStatus::Connecting;
2462                    let rc = match e {
2463                        MqttError::ClientIdentifierNotValid => {
2464                            ConnectReturnCode::IdentifierRejected
2465                        }
2466                        MqttError::BadUserNameOrPassword => {
2467                            ConnectReturnCode::BadUserNameOrPassword
2468                        }
2469                        MqttError::UnsupportedProtocolVersion => {
2470                            ConnectReturnCode::UnacceptableProtocolVersion
2471                        }
2472                        _ => ConnectReturnCode::NotAuthorized, // TBD close could be better
2473                    };
2474                    let connack = v3_1_1::Connack::builder().return_code(rc).build().unwrap();
2475                    let connack_events = self.process_send_v3_1_1_connack(connack);
2476                    events.extend(connack_events);
2477                } else {
2478                    events.push(GenericEvent::RequestClose);
2479                }
2480                events.push(GenericEvent::NotifyError(e));
2481            }
2482        }
2483
2484        events
2485    }
2486
2487    fn process_recv_v5_0_connect(
2488        &mut self,
2489        raw_packet: RawPacket,
2490    ) -> Vec<GenericEvent<PacketIdType>> {
2491        let mut events = Vec::new();
2492        match v5_0::Connect::parse(raw_packet.data_as_slice()) {
2493            Ok((packet, _)) => {
2494                if self.status != ConnectionStatus::Disconnected {
2495                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2496                    return events;
2497                }
2498                self.initialize(false);
2499                self.status = ConnectionStatus::Connecting;
2500                if packet.keep_alive() > 0 {
2501                    self.pingreq_recv_timeout_ms =
2502                        Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2503                }
2504                if packet.clean_start() {
2505                    self.clear_store_related();
2506                }
2507                packet.props().iter().for_each(|prop| match prop {
2508                    Property::TopicAliasMaximum(p) => {
2509                        self.topic_alias_send = Some(TopicAliasSend::new(p.val()));
2510                    }
2511                    Property::ReceiveMaximum(p) => {
2512                        self.publish_send_max = Some(p.val());
2513                    }
2514                    Property::MaximumPacketSize(p) => {
2515                        self.maximum_packet_size_send = p.val();
2516                    }
2517                    Property::SessionExpiryInterval(p) => {
2518                        if p.val() != 0 {
2519                            self.need_store = true;
2520                        }
2521                    }
2522                    _ => {}
2523                });
2524                events.extend(self.refresh_pingreq_recv());
2525                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2526            }
2527            Err(e) => {
2528                if self.status == ConnectionStatus::Disconnected {
2529                    self.status = ConnectionStatus::Connecting;
2530                    let rc = match e {
2531                        MqttError::ClientIdentifierNotValid => {
2532                            ConnectReasonCode::ClientIdentifierNotValid
2533                        }
2534                        MqttError::BadUserNameOrPassword => {
2535                            ConnectReasonCode::BadAuthenticationMethod
2536                        }
2537                        MqttError::UnsupportedProtocolVersion => {
2538                            ConnectReasonCode::UnsupportedProtocolVersion
2539                        }
2540                        _ => ConnectReasonCode::UnspecifiedError,
2541                    };
2542                    let connack = v5_0::Connack::builder().reason_code(rc).build().unwrap();
2543                    let connack_events = self.process_send_v5_0_connack(connack);
2544                    events.extend(connack_events);
2545                } else {
2546                    let disconnect = v5_0::Disconnect::builder()
2547                        .reason_code(DisconnectReasonCode::ProtocolError)
2548                        .build()
2549                        .unwrap();
2550                    let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2551                    events.extend(disconnect_events);
2552                }
2553                events.push(GenericEvent::NotifyError(e));
2554            }
2555        }
2556
2557        events
2558    }
2559
2560    fn process_recv_v3_1_1_connack(
2561        &mut self,
2562        raw_packet: RawPacket,
2563    ) -> Vec<GenericEvent<PacketIdType>> {
2564        let mut events = Vec::new();
2565
2566        match v3_1_1::Connack::parse(raw_packet.data_as_slice()) {
2567            Ok((packet, _consumed)) => {
2568                if packet.return_code() == ConnectReturnCode::Accepted {
2569                    self.status = ConnectionStatus::Connected;
2570                    if packet.session_present() {
2571                        events.extend(self.send_stored());
2572                    } else {
2573                        self.clear_store_related();
2574                    }
2575                }
2576                events.push(GenericEvent::NotifyPacketReceived(
2577                    GenericPacket::V3_1_1Connack(packet),
2578                ));
2579            }
2580            Err(e) => {
2581                events.push(GenericEvent::RequestClose);
2582                events.push(GenericEvent::NotifyError(e));
2583            }
2584        }
2585
2586        events
2587    }
2588
2589    fn process_recv_v5_0_connack(
2590        &mut self,
2591        raw_packet: RawPacket,
2592    ) -> Vec<GenericEvent<PacketIdType>> {
2593        let mut events = Vec::new();
2594
2595        match v5_0::Connack::parse(raw_packet.data_as_slice()) {
2596            Ok((packet, _consumed)) => {
2597                if packet.reason_code() == ConnectReasonCode::Success {
2598                    self.status = ConnectionStatus::Connected;
2599
2600                    // Process properties
2601                    for prop in packet.props() {
2602                        match prop {
2603                            Property::TopicAliasMaximum(val) => {
2604                                if val.val() > 0 {
2605                                    self.topic_alias_send = Some(TopicAliasSend::new(val.val()));
2606                                }
2607                            }
2608                            Property::ReceiveMaximum(val) => {
2609                                assert!(val.val() != 0);
2610                                self.publish_send_max = Some(val.val());
2611                            }
2612                            Property::MaximumPacketSize(val) => {
2613                                assert!(val.val() != 0);
2614                                self.maximum_packet_size_send = val.val();
2615                            }
2616                            Property::ServerKeepAlive(val) => {
2617                                // Set PINGREQ send interval if this is a client
2618                                let timeout_ms = (val.val() as u64) * 1000;
2619                                self.pingreq_send_interval_ms = Some(timeout_ms);
2620                            }
2621                            _ => {
2622                                // Ignore other properties
2623                            }
2624                        }
2625                    }
2626
2627                    if packet.session_present() {
2628                        events.extend(self.send_stored());
2629                    } else {
2630                        self.clear_store_related();
2631                    }
2632                }
2633                events.push(GenericEvent::NotifyPacketReceived(
2634                    GenericPacket::V5_0Connack(packet),
2635                ));
2636            }
2637            Err(e) => {
2638                if self.status == ConnectionStatus::Connected {
2639                    let disconnect = v5_0::Disconnect::builder()
2640                        .reason_code(e.into())
2641                        .build()
2642                        .unwrap();
2643                    let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2644                    events.extend(disconnect_events);
2645                }
2646                events.push(GenericEvent::NotifyError(e));
2647            }
2648        }
2649
2650        events
2651    }
2652
2653    fn process_recv_v3_1_1_publish(
2654        &mut self,
2655        raw_packet: RawPacket,
2656    ) -> Vec<GenericEvent<PacketIdType>> {
2657        let mut events = Vec::new();
2658
2659        let flags = raw_packet.flags();
2660        match &raw_packet.data {
2661            PacketData::Publish(arc) => {
2662                match v3_1_1::GenericPublish::parse(flags, arc.clone()) {
2663                    Ok((packet, _consumed)) => {
2664                        match packet.qos() {
2665                            Qos::AtMostOnce => {
2666                                events.extend(self.refresh_pingreq_recv());
2667                                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2668                            }
2669                            Qos::AtLeastOnce => {
2670                                let packet_id = packet.packet_id().unwrap();
2671                                if self.status == ConnectionStatus::Connected
2672                                    && self.auto_pub_response
2673                                {
2674                                    // Send PUBACK automatically
2675                                    let puback = v3_1_1::GenericPuback::builder()
2676                                        .packet_id(packet_id)
2677                                        .build()
2678                                        .unwrap();
2679                                    events.extend(self.process_send_v3_1_1_puback(puback));
2680                                }
2681                                events.extend(self.refresh_pingreq_recv());
2682                                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2683                            }
2684                            Qos::ExactlyOnce => {
2685                                let packet_id = packet.packet_id().unwrap();
2686                                let already_handled = !self.qos2_publish_handled.insert(packet_id);
2687
2688                                if self.status == ConnectionStatus::Connected
2689                                    && (self.auto_pub_response || already_handled)
2690                                {
2691                                    let pubrec = v3_1_1::GenericPubrec::builder()
2692                                        .packet_id(packet_id)
2693                                        .build()
2694                                        .unwrap();
2695                                    events.extend(self.process_send_v3_1_1_pubrec(pubrec));
2696                                }
2697                                events.extend(self.refresh_pingreq_recv());
2698                                if !already_handled {
2699                                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2700                                }
2701                            }
2702                        }
2703                    }
2704                    Err(e) => {
2705                        events.push(GenericEvent::RequestClose);
2706                        events.push(GenericEvent::NotifyError(e));
2707                    }
2708                }
2709            }
2710            PacketData::Normal(_) => {
2711                unreachable!("PUBLISH packet must use PacketData::Publish variant");
2712            }
2713        }
2714
2715        events
2716    }
2717
2718    fn process_recv_v5_0_publish(
2719        &mut self,
2720        raw_packet: RawPacket,
2721    ) -> Vec<GenericEvent<PacketIdType>> {
2722        let mut events = Vec::new();
2723
2724        let flags = raw_packet.flags();
2725        match &raw_packet.data {
2726            PacketData::Publish(arc) => {
2727                match v5_0::GenericPublish::parse(flags, arc.clone()) {
2728                    Ok((packet, _consumed)) => {
2729                        let mut already_handled = false;
2730                        let mut puback_send = false;
2731                        let mut pubrec_send = false;
2732
2733                        match packet.qos() {
2734                            Qos::AtLeastOnce => {
2735                                let packet_id = packet.packet_id().unwrap();
2736                                if let Some(max) = self.publish_recv_max {
2737                                    if self.publish_recv.len() >= max as usize {
2738                                        let disconnect = v5_0::Disconnect::builder()
2739                                            .reason_code(
2740                                                DisconnectReasonCode::ReceiveMaximumExceeded,
2741                                            )
2742                                            .build()
2743                                            .unwrap();
2744                                        events
2745                                            .extend(self.process_send_v5_0_disconnect(disconnect));
2746                                        events.push(GenericEvent::NotifyError(
2747                                            MqttError::ReceiveMaximumExceeded,
2748                                        ));
2749                                        return events;
2750                                    }
2751                                }
2752                                self.publish_recv.insert(packet_id);
2753                                if self.auto_pub_response
2754                                    && self.status == ConnectionStatus::Connected
2755                                {
2756                                    puback_send = true;
2757                                }
2758                            }
2759                            Qos::ExactlyOnce => {
2760                                let packet_id = packet.packet_id().unwrap();
2761                                if let Some(max) = self.publish_recv_max {
2762                                    if self.publish_recv.len() >= max as usize {
2763                                        let disconnect = v5_0::Disconnect::builder()
2764                                            .reason_code(
2765                                                DisconnectReasonCode::ReceiveMaximumExceeded,
2766                                            )
2767                                            .build()
2768                                            .unwrap();
2769                                        events
2770                                            .extend(self.process_send_v5_0_disconnect(disconnect));
2771                                        events.push(GenericEvent::NotifyError(
2772                                            MqttError::ReceiveMaximumExceeded,
2773                                        ));
2774                                        return events;
2775                                    }
2776                                }
2777                                self.publish_recv.insert(packet_id);
2778
2779                                if !self.qos2_publish_handled.insert(packet_id) {
2780                                    already_handled = true;
2781                                }
2782                                if self.status == ConnectionStatus::Connected
2783                                    && (self.auto_pub_response || already_handled)
2784                                {
2785                                    pubrec_send = true;
2786                                }
2787                            }
2788                            Qos::AtMostOnce => {
2789                                // No packet ID handling for QoS 0
2790                            }
2791                        }
2792
2793                        // Topic Alias handling
2794                        if packet.topic_name().is_empty() {
2795                            // Extract topic from topic_alias
2796                            if let Some(ta) = Self::get_topic_alias_from_props_opt(packet.props()) {
2797                                if ta == 0
2798                                    || self.topic_alias_recv.is_none()
2799                                    || ta > self.topic_alias_recv.as_ref().unwrap().max()
2800                                {
2801                                    let disconnect = v5_0::Disconnect::builder()
2802                                        .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2803                                        .build()
2804                                        .unwrap();
2805                                    events.extend(self.process_send_v5_0_disconnect(disconnect));
2806                                    events.push(GenericEvent::NotifyError(
2807                                        MqttError::TopicAliasInvalid,
2808                                    ));
2809                                    return events;
2810                                }
2811
2812                                if let Some(ref topic_alias_recv) = self.topic_alias_recv {
2813                                    if topic_alias_recv.get(ta).is_none() {
2814                                        let disconnect = v5_0::Disconnect::builder()
2815                                            .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2816                                            .build()
2817                                            .unwrap();
2818                                        events
2819                                            .extend(self.process_send_v5_0_disconnect(disconnect));
2820                                        events.push(GenericEvent::NotifyError(
2821                                            MqttError::TopicAliasInvalid,
2822                                        ));
2823                                        return events;
2824                                    }
2825                                    // Note: In a complete implementation, we would modify the packet
2826                                    // to add the resolved topic. For now, we'll proceed.
2827                                }
2828                            } else {
2829                                let disconnect = v5_0::Disconnect::builder()
2830                                    .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2831                                    .build()
2832                                    .unwrap();
2833                                events.extend(self.process_send_v5_0_disconnect(disconnect));
2834                                events
2835                                    .push(GenericEvent::NotifyError(MqttError::TopicAliasInvalid));
2836                                return events;
2837                            }
2838                        } else {
2839                            // Topic is not empty, check if topic alias needs to be registered
2840                            if let Some(ta) = Self::get_topic_alias_from_props_opt(packet.props()) {
2841                                if ta == 0
2842                                    || self.topic_alias_recv.is_none()
2843                                    || ta > self.topic_alias_recv.as_ref().unwrap().max()
2844                                {
2845                                    let disconnect = v5_0::Disconnect::builder()
2846                                        .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2847                                        .build()
2848                                        .unwrap();
2849                                    events.extend(self.process_send_v5_0_disconnect(disconnect));
2850                                    events.push(GenericEvent::NotifyError(
2851                                        MqttError::TopicAliasInvalid,
2852                                    ));
2853                                    return events;
2854                                }
2855                                if let Some(ref mut topic_alias_recv) = self.topic_alias_recv {
2856                                    topic_alias_recv.insert_or_update(packet.topic_name(), ta);
2857                                }
2858                            }
2859                        }
2860
2861                        // Send response packets
2862                        if puback_send {
2863                            let puback = v5_0::GenericPuback::builder()
2864                                .packet_id(packet.packet_id().unwrap())
2865                                .build()
2866                                .unwrap();
2867                            events.extend(self.process_send_v5_0_puback(puback));
2868                        }
2869                        if pubrec_send {
2870                            let pubrec = v5_0::GenericPubrec::builder()
2871                                .packet_id(packet.packet_id().unwrap())
2872                                .build()
2873                                .unwrap();
2874                            events.extend(self.process_send_v5_0_pubrec(pubrec));
2875                        }
2876
2877                        // Refresh PINGREQ receive timer
2878                        events.extend(self.refresh_pingreq_recv());
2879
2880                        // Notify packet received (only if not already handled)
2881                        if !already_handled {
2882                            events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2883                        }
2884                    }
2885                    Err(e) => {
2886                        if self.status == ConnectionStatus::Connected {
2887                            let disconnect = v5_0::Disconnect::builder()
2888                                .reason_code(e.into())
2889                                .build()
2890                                .unwrap();
2891                            let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2892                            events.extend(disconnect_events);
2893                        }
2894                        events.push(GenericEvent::NotifyError(e));
2895                    }
2896                }
2897            }
2898            PacketData::Normal(_) => {
2899                unreachable!("PUBLISH packet must use PacketData::Publish variant");
2900            }
2901        }
2902
2903        events
2904    }
2905
2906    fn process_recv_v3_1_1_puback(
2907        &mut self,
2908        raw_packet: RawPacket,
2909    ) -> Vec<GenericEvent<PacketIdType>> {
2910        let mut events = Vec::new();
2911
2912        match v3_1_1::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2913            Ok((packet, _)) => {
2914                let packet_id = packet.packet_id();
2915                if self.pid_puback.remove(&packet_id) {
2916                    self.store.erase(ResponsePacket::V3_1_1Puback, packet_id);
2917                    if self.pid_man.is_used_id(packet_id) {
2918                        self.pid_man.release_id(packet_id);
2919                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2920                    }
2921                    events.extend(self.refresh_pingreq_recv());
2922                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2923                } else {
2924                    events.push(GenericEvent::RequestClose);
2925                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2926                }
2927            }
2928            Err(e) => {
2929                events.push(GenericEvent::RequestClose);
2930                events.push(GenericEvent::NotifyError(e));
2931            }
2932        }
2933
2934        events
2935    }
2936
2937    fn process_recv_v5_0_puback(
2938        &mut self,
2939        raw_packet: RawPacket,
2940    ) -> Vec<GenericEvent<PacketIdType>> {
2941        let mut events = Vec::new();
2942
2943        match v5_0::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2944            Ok((packet, _)) => {
2945                let packet_id = packet.packet_id();
2946                if self.pid_puback.remove(&packet_id) {
2947                    self.store.erase(ResponsePacket::V5_0Puback, packet_id);
2948                    if self.pid_man.is_used_id(packet_id) {
2949                        self.pid_man.release_id(packet_id);
2950                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2951                    }
2952                    if self.publish_send_max.is_some() {
2953                        self.publish_send_count -= 1;
2954                    }
2955                    events.extend(self.refresh_pingreq_recv());
2956                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2957                } else {
2958                    let disconnect = v5_0::Disconnect::builder()
2959                        .reason_code(DisconnectReasonCode::ProtocolError)
2960                        .build()
2961                        .unwrap();
2962                    events.extend(self.process_send_v5_0_disconnect(disconnect));
2963                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2964                }
2965            }
2966            Err(e) => {
2967                let disconnect = v5_0::Disconnect::builder()
2968                    .reason_code(DisconnectReasonCode::ProtocolError)
2969                    .build()
2970                    .unwrap();
2971                events.extend(self.process_send_v5_0_disconnect(disconnect));
2972                events.push(GenericEvent::NotifyError(e));
2973            }
2974        }
2975
2976        events
2977    }
2978
2979    fn process_recv_v3_1_1_pubrec(
2980        &mut self,
2981        raw_packet: RawPacket,
2982    ) -> Vec<GenericEvent<PacketIdType>> {
2983        let mut events = Vec::new();
2984
2985        match v3_1_1::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2986            Ok((packet, _)) => {
2987                let packet_id = packet.packet_id();
2988                if self.pid_pubrec.remove(&packet_id) {
2989                    self.store.erase(ResponsePacket::V3_1_1Pubrec, packet_id);
2990                    if self.auto_pub_response && self.status == ConnectionStatus::Connected {
2991                        let pubrel = v3_1_1::GenericPubrel::<PacketIdType>::builder()
2992                            .packet_id(packet_id)
2993                            .build()
2994                            .unwrap();
2995                        events.extend(self.process_send_v3_1_1_pubrel(pubrel));
2996                    }
2997                    events.extend(self.refresh_pingreq_recv());
2998                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2999                } else {
3000                    events.push(GenericEvent::RequestClose);
3001                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3002                }
3003            }
3004            Err(e) => {
3005                events.push(GenericEvent::RequestClose);
3006                events.push(GenericEvent::NotifyError(e));
3007            }
3008        }
3009
3010        events
3011    }
3012
3013    fn process_recv_v5_0_pubrec(
3014        &mut self,
3015        raw_packet: RawPacket,
3016    ) -> Vec<GenericEvent<PacketIdType>> {
3017        let mut events = Vec::new();
3018
3019        match v5_0::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3020            Ok((packet, _)) => {
3021                let packet_id = packet.packet_id();
3022                if self.pid_pubrec.remove(&packet_id) {
3023                    self.store.erase(ResponsePacket::V5_0Pubrec, packet_id);
3024                    if let Some(reason_code) = packet.reason_code() {
3025                        if reason_code != PubrecReasonCode::Success {
3026                            if self.pid_man.is_used_id(packet_id) {
3027                                self.pid_man.release_id(packet_id);
3028                                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3029                            }
3030                            self.qos2_publish_processing.remove(&packet_id);
3031                            if self.publish_send_max.is_some() {
3032                                self.publish_send_count -= 1;
3033                            }
3034                        } else if self.auto_pub_response
3035                            && self.status == ConnectionStatus::Connected
3036                        {
3037                            let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3038                                .packet_id(packet_id)
3039                                .build()
3040                                .unwrap();
3041                            events.extend(self.process_send_v5_0_pubrel(pubrel));
3042                        }
3043                    } else if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3044                        let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3045                            .packet_id(packet_id)
3046                            .build()
3047                            .unwrap();
3048                        events.extend(self.process_send_v5_0_pubrel(pubrel));
3049                    }
3050                    events.extend(self.refresh_pingreq_recv());
3051                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3052                } else {
3053                    let disconnect = v5_0::Disconnect::builder()
3054                        .reason_code(DisconnectReasonCode::ProtocolError)
3055                        .build()
3056                        .unwrap();
3057                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3058                    events.push(GenericEvent::NotifyError(MqttError::from(
3059                        DisconnectReasonCode::ProtocolError,
3060                    )));
3061                }
3062            }
3063            Err(e) => {
3064                let disconnect = v5_0::Disconnect::builder()
3065                    .reason_code(DisconnectReasonCode::ProtocolError)
3066                    .build()
3067                    .unwrap();
3068                events.extend(self.process_send_v5_0_disconnect(disconnect));
3069                events.push(GenericEvent::NotifyError(e));
3070            }
3071        }
3072
3073        events
3074    }
3075
3076    fn process_recv_v3_1_1_pubrel(
3077        &mut self,
3078        raw_packet: RawPacket,
3079    ) -> Vec<GenericEvent<PacketIdType>> {
3080        let mut events = Vec::new();
3081
3082        match v3_1_1::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3083            Ok((packet, _)) => {
3084                let packet_id = packet.packet_id();
3085                self.qos2_publish_handled.remove(&packet_id);
3086                if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3087                    let pubcomp = v3_1_1::GenericPubcomp::<PacketIdType>::builder()
3088                        .packet_id(packet_id)
3089                        .build()
3090                        .unwrap();
3091                    events.extend(self.process_send_v3_1_1_pubcomp(pubcomp));
3092                }
3093                events.extend(self.refresh_pingreq_recv());
3094                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3095            }
3096            Err(e) => {
3097                events.push(GenericEvent::RequestClose);
3098                events.push(GenericEvent::NotifyError(e));
3099            }
3100        }
3101
3102        events
3103    }
3104
3105    fn process_recv_v5_0_pubrel(
3106        &mut self,
3107        raw_packet: RawPacket,
3108    ) -> Vec<GenericEvent<PacketIdType>> {
3109        let mut events = Vec::new();
3110
3111        match v5_0::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3112            Ok((packet, _)) => {
3113                let packet_id = packet.packet_id();
3114                self.qos2_publish_handled.remove(&packet_id);
3115                if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3116                    let pubcomp = v5_0::GenericPubcomp::<PacketIdType>::builder()
3117                        .packet_id(packet_id)
3118                        .build()
3119                        .unwrap();
3120                    events.extend(self.process_send_v5_0_pubcomp(pubcomp));
3121                }
3122                events.extend(self.refresh_pingreq_recv());
3123                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3124            }
3125            Err(e) => {
3126                let disconnect = v5_0::Disconnect::builder()
3127                    .reason_code(DisconnectReasonCode::ProtocolError)
3128                    .build()
3129                    .unwrap();
3130                events.extend(self.process_send_v5_0_disconnect(disconnect));
3131                events.push(GenericEvent::NotifyError(e));
3132            }
3133        }
3134
3135        events
3136    }
3137
3138    fn process_recv_v3_1_1_pubcomp(
3139        &mut self,
3140        raw_packet: RawPacket,
3141    ) -> Vec<GenericEvent<PacketIdType>> {
3142        let mut events = Vec::new();
3143
3144        match v3_1_1::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3145            Ok((packet, _)) => {
3146                let packet_id = packet.packet_id();
3147                if self.pid_pubcomp.remove(&packet_id) {
3148                    self.store.erase(ResponsePacket::V3_1_1Pubcomp, packet_id);
3149                    if self.pid_man.is_used_id(packet_id) {
3150                        self.pid_man.release_id(packet_id);
3151                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3152                    }
3153                    self.qos2_publish_processing.remove(&packet_id);
3154                    events.extend(self.refresh_pingreq_recv());
3155                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3156                } else {
3157                    events.push(GenericEvent::RequestClose);
3158                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3159                }
3160            }
3161            Err(e) => {
3162                events.push(GenericEvent::RequestClose);
3163                events.push(GenericEvent::NotifyError(e));
3164            }
3165        }
3166
3167        events
3168    }
3169
3170    fn process_recv_v5_0_pubcomp(
3171        &mut self,
3172        raw_packet: RawPacket,
3173    ) -> Vec<GenericEvent<PacketIdType>> {
3174        let mut events = Vec::new();
3175
3176        match v5_0::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3177            Ok((packet, _)) => {
3178                let packet_id = packet.packet_id();
3179                if self.pid_pubcomp.remove(&packet_id) {
3180                    self.store.erase(ResponsePacket::V5_0Pubcomp, packet_id);
3181                    if self.pid_man.is_used_id(packet_id) {
3182                        self.pid_man.release_id(packet_id);
3183                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3184                    }
3185                    self.qos2_publish_processing.remove(&packet_id);
3186                    if self.publish_send_max.is_some() {
3187                        self.publish_send_count -= 1;
3188                    }
3189                    events.extend(self.refresh_pingreq_recv());
3190                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3191                } else {
3192                    let disconnect = v5_0::Disconnect::builder()
3193                        .reason_code(DisconnectReasonCode::ProtocolError)
3194                        .build()
3195                        .unwrap();
3196                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3197                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3198                }
3199            }
3200            Err(e) => {
3201                let disconnect = v5_0::Disconnect::builder()
3202                    .reason_code(DisconnectReasonCode::ProtocolError)
3203                    .build()
3204                    .unwrap();
3205                events.extend(self.process_send_v5_0_disconnect(disconnect));
3206                events.push(GenericEvent::NotifyError(e));
3207            }
3208        }
3209
3210        events
3211    }
3212
3213    fn process_recv_v3_1_1_subscribe(
3214        &mut self,
3215        raw_packet: RawPacket,
3216    ) -> Vec<GenericEvent<PacketIdType>> {
3217        let mut events = Vec::new();
3218
3219        match v3_1_1::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3220            Ok((packet, _)) => {
3221                events.extend(self.refresh_pingreq_recv());
3222                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3223            }
3224            Err(e) => {
3225                events.push(GenericEvent::RequestClose);
3226                events.push(GenericEvent::NotifyError(e));
3227            }
3228        }
3229
3230        events
3231    }
3232
3233    fn process_recv_v5_0_subscribe(
3234        &mut self,
3235        raw_packet: RawPacket,
3236    ) -> Vec<GenericEvent<PacketIdType>> {
3237        let mut events = Vec::new();
3238
3239        match v5_0::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3240            Ok((packet, _)) => {
3241                events.extend(self.refresh_pingreq_recv());
3242                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3243            }
3244            Err(e) => {
3245                let disconnect = v5_0::Disconnect::builder()
3246                    .reason_code(DisconnectReasonCode::ProtocolError)
3247                    .build()
3248                    .unwrap();
3249                events.extend(self.process_send_v5_0_disconnect(disconnect));
3250                events.push(GenericEvent::NotifyError(e));
3251            }
3252        }
3253
3254        events
3255    }
3256
3257    fn process_recv_v3_1_1_suback(
3258        &mut self,
3259        raw_packet: RawPacket,
3260    ) -> Vec<GenericEvent<PacketIdType>> {
3261        let mut events = Vec::new();
3262
3263        match v3_1_1::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3264            Ok((packet, _)) => {
3265                let packet_id = packet.packet_id();
3266                if self.pid_suback.remove(&packet_id) {
3267                    if self.pid_man.is_used_id(packet_id) {
3268                        self.pid_man.release_id(packet_id);
3269                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3270                    }
3271                    events.extend(self.refresh_pingreq_recv());
3272                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3273                } else {
3274                    events.push(GenericEvent::RequestClose);
3275                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3276                }
3277            }
3278            Err(e) => {
3279                events.push(GenericEvent::RequestClose);
3280                events.push(GenericEvent::NotifyError(e));
3281            }
3282        }
3283
3284        events
3285    }
3286
3287    fn process_recv_v5_0_suback(
3288        &mut self,
3289        raw_packet: RawPacket,
3290    ) -> Vec<GenericEvent<PacketIdType>> {
3291        let mut events = Vec::new();
3292
3293        match v5_0::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3294            Ok((packet, _)) => {
3295                let packet_id = packet.packet_id();
3296                if self.pid_suback.remove(&packet_id) {
3297                    if self.pid_man.is_used_id(packet_id) {
3298                        self.pid_man.release_id(packet_id);
3299                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3300                    }
3301                    events.extend(self.refresh_pingreq_recv());
3302                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3303                } else {
3304                    let disconnect = v5_0::Disconnect::builder()
3305                        .reason_code(DisconnectReasonCode::ProtocolError)
3306                        .build()
3307                        .unwrap();
3308                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3309                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3310                }
3311            }
3312            Err(e) => {
3313                let disconnect = v5_0::Disconnect::builder()
3314                    .reason_code(DisconnectReasonCode::ProtocolError)
3315                    .build()
3316                    .unwrap();
3317                events.extend(self.process_send_v5_0_disconnect(disconnect));
3318                events.push(GenericEvent::NotifyError(e));
3319            }
3320        }
3321
3322        events
3323    }
3324
3325    fn process_recv_v3_1_1_unsubscribe(
3326        &mut self,
3327        raw_packet: RawPacket,
3328    ) -> Vec<GenericEvent<PacketIdType>> {
3329        let mut events = Vec::new();
3330
3331        match v3_1_1::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3332            Ok((packet, _)) => {
3333                events.extend(self.refresh_pingreq_recv());
3334                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3335            }
3336            Err(e) => {
3337                events.push(GenericEvent::RequestClose);
3338                events.push(GenericEvent::NotifyError(e));
3339            }
3340        }
3341
3342        events
3343    }
3344
3345    fn process_recv_v5_0_unsubscribe(
3346        &mut self,
3347        raw_packet: RawPacket,
3348    ) -> Vec<GenericEvent<PacketIdType>> {
3349        let mut events = Vec::new();
3350
3351        match v5_0::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3352            Ok((packet, _)) => {
3353                events.extend(self.refresh_pingreq_recv());
3354                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3355            }
3356            Err(e) => {
3357                let disconnect = v5_0::Disconnect::builder()
3358                    .reason_code(DisconnectReasonCode::ProtocolError)
3359                    .build()
3360                    .unwrap();
3361                events.extend(self.process_send_v5_0_disconnect(disconnect));
3362                events.push(GenericEvent::NotifyError(e));
3363            }
3364        }
3365
3366        events
3367    }
3368
3369    fn process_recv_v3_1_1_unsuback(
3370        &mut self,
3371        raw_packet: RawPacket,
3372    ) -> Vec<GenericEvent<PacketIdType>> {
3373        let mut events = Vec::new();
3374
3375        match v3_1_1::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3376            Ok((packet, _)) => {
3377                let packet_id = packet.packet_id();
3378                if self.pid_unsuback.remove(&packet_id) {
3379                    if self.pid_man.is_used_id(packet_id) {
3380                        self.pid_man.release_id(packet_id);
3381                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3382                    }
3383                    events.extend(self.refresh_pingreq_recv());
3384                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3385                } else {
3386                    events.push(GenericEvent::RequestClose);
3387                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3388                }
3389            }
3390            Err(e) => {
3391                events.push(GenericEvent::RequestClose);
3392                events.push(GenericEvent::NotifyError(e));
3393            }
3394        }
3395
3396        events
3397    }
3398
3399    fn process_recv_v5_0_unsuback(
3400        &mut self,
3401        raw_packet: RawPacket,
3402    ) -> Vec<GenericEvent<PacketIdType>> {
3403        let mut events = Vec::new();
3404
3405        match v5_0::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3406            Ok((packet, _)) => {
3407                let packet_id = packet.packet_id();
3408                if self.pid_unsuback.remove(&packet_id) {
3409                    if self.pid_man.is_used_id(packet_id) {
3410                        self.pid_man.release_id(packet_id);
3411                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3412                    }
3413                    events.extend(self.refresh_pingreq_recv());
3414                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3415                } else {
3416                    let disconnect = v5_0::Disconnect::builder()
3417                        .reason_code(DisconnectReasonCode::ProtocolError)
3418                        .build()
3419                        .unwrap();
3420                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3421                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3422                }
3423            }
3424            Err(e) => {
3425                let disconnect = v5_0::Disconnect::builder()
3426                    .reason_code(DisconnectReasonCode::ProtocolError)
3427                    .build()
3428                    .unwrap();
3429                events.extend(self.process_send_v5_0_disconnect(disconnect));
3430                events.push(GenericEvent::NotifyError(e));
3431            }
3432        }
3433
3434        events
3435    }
3436
3437    fn process_recv_v3_1_1_pingreq(
3438        &mut self,
3439        raw_packet: RawPacket,
3440    ) -> Vec<GenericEvent<PacketIdType>> {
3441        let mut events = Vec::new();
3442
3443        match v3_1_1::Pingreq::parse(raw_packet.data_as_slice()) {
3444            Ok((packet, _)) => {
3445                if (Role::IS_SERVER || Role::IS_ANY)
3446                    && !self.is_client
3447                    && self.auto_ping_response
3448                    && self.status == ConnectionStatus::Connected
3449                {
3450                    let pingresp = v3_1_1::Pingresp::new();
3451                    events.extend(self.process_send_v3_1_1_pingresp(pingresp));
3452                }
3453                events.extend(self.refresh_pingreq_recv());
3454                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3455            }
3456            Err(e) => {
3457                events.push(GenericEvent::RequestClose);
3458                events.push(GenericEvent::NotifyError(e));
3459            }
3460        }
3461
3462        events
3463    }
3464
3465    fn process_recv_v5_0_pingreq(
3466        &mut self,
3467        raw_packet: RawPacket,
3468    ) -> Vec<GenericEvent<PacketIdType>> {
3469        let mut events = Vec::new();
3470
3471        match v5_0::Pingreq::parse(raw_packet.data_as_slice()) {
3472            Ok((packet, _)) => {
3473                if (Role::IS_SERVER || Role::IS_ANY)
3474                    && !self.is_client
3475                    && self.auto_ping_response
3476                    && self.status == ConnectionStatus::Connected
3477                {
3478                    let pingresp = v5_0::Pingresp::new();
3479                    events.extend(self.process_send_v5_0_pingresp(pingresp));
3480                }
3481                events.extend(self.refresh_pingreq_recv());
3482                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3483            }
3484            Err(e) => {
3485                let disconnect = v5_0::Disconnect::builder()
3486                    .reason_code(DisconnectReasonCode::ProtocolError)
3487                    .build()
3488                    .unwrap();
3489                events.extend(self.process_send_v5_0_disconnect(disconnect));
3490                events.push(GenericEvent::NotifyError(e));
3491            }
3492        }
3493
3494        events
3495    }
3496
3497    fn process_recv_v3_1_1_pingresp(
3498        &mut self,
3499        raw_packet: RawPacket,
3500    ) -> Vec<GenericEvent<PacketIdType>> {
3501        let mut events = Vec::new();
3502
3503        match v3_1_1::Pingresp::parse(raw_packet.data_as_slice()) {
3504            Ok((packet, _)) => {
3505                self.pingresp_recv_set = false;
3506                events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3507                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3508            }
3509            Err(e) => {
3510                events.push(GenericEvent::RequestClose);
3511                events.push(GenericEvent::NotifyError(e));
3512            }
3513        }
3514
3515        events
3516    }
3517
3518    fn process_recv_v5_0_pingresp(
3519        &mut self,
3520        raw_packet: RawPacket,
3521    ) -> Vec<GenericEvent<PacketIdType>> {
3522        let mut events = Vec::new();
3523
3524        match v5_0::Pingresp::parse(raw_packet.data_as_slice()) {
3525            Ok((packet, _)) => {
3526                self.pingresp_recv_set = false;
3527                events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3528                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3529            }
3530            Err(e) => {
3531                let disconnect = v5_0::Disconnect::builder()
3532                    .reason_code(DisconnectReasonCode::ProtocolError)
3533                    .build()
3534                    .unwrap();
3535                events.extend(self.process_send_v5_0_disconnect(disconnect));
3536                events.push(GenericEvent::NotifyError(e));
3537            }
3538        }
3539
3540        events
3541    }
3542
3543    fn process_recv_v3_1_1_disconnect(
3544        &mut self,
3545        raw_packet: RawPacket,
3546    ) -> Vec<GenericEvent<PacketIdType>> {
3547        let mut events = Vec::new();
3548
3549        match v3_1_1::Disconnect::parse(raw_packet.data_as_slice()) {
3550            Ok((packet, _)) => {
3551                self.cancel_timers(&mut events);
3552                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3553            }
3554            Err(e) => {
3555                events.push(GenericEvent::RequestClose);
3556                events.push(GenericEvent::NotifyError(e));
3557            }
3558        }
3559
3560        events
3561    }
3562
3563    fn process_recv_v5_0_disconnect(
3564        &mut self,
3565        raw_packet: RawPacket,
3566    ) -> Vec<GenericEvent<PacketIdType>> {
3567        let mut events = Vec::new();
3568
3569        match v5_0::Disconnect::parse(raw_packet.data_as_slice()) {
3570            Ok((packet, _)) => {
3571                self.cancel_timers(&mut events);
3572                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3573            }
3574            Err(e) => {
3575                let disconnect = v5_0::Disconnect::builder()
3576                    .reason_code(DisconnectReasonCode::ProtocolError)
3577                    .build()
3578                    .unwrap();
3579                events.extend(self.process_send_v5_0_disconnect(disconnect));
3580                events.push(GenericEvent::NotifyError(e));
3581            }
3582        }
3583
3584        events
3585    }
3586
3587    fn process_recv_v5_0_auth(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
3588        let mut events = Vec::new();
3589
3590        match v5_0::Auth::parse(raw_packet.data_as_slice()) {
3591            Ok((packet, _)) => {
3592                events.extend(self.refresh_pingreq_recv());
3593                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3594            }
3595            Err(e) => {
3596                let disconnect = v5_0::Disconnect::builder()
3597                    .reason_code(DisconnectReasonCode::ProtocolError)
3598                    .build()
3599                    .unwrap();
3600                events.extend(self.process_send_v5_0_disconnect(disconnect));
3601                events.push(GenericEvent::NotifyError(e));
3602            }
3603        }
3604
3605        events
3606    }
3607
3608    fn get_topic_alias_from_props_opt(props: &Option<Vec<Property>>) -> Option<u16> {
3609        if let Some(props) = props {
3610            Self::get_topic_alias_from_props(props.as_slice())
3611        } else {
3612            None
3613        }
3614    }
3615
3616    fn refresh_pingreq_recv(&mut self) -> Vec<GenericEvent<PacketIdType>> {
3617        let mut events = Vec::new();
3618        if let Some(timeout_ms) = self.pingreq_recv_timeout_ms {
3619            if self.status == ConnectionStatus::Connecting
3620                || self.status == ConnectionStatus::Connected
3621            {
3622                self.pingreq_recv_set = true;
3623                events.push(GenericEvent::RequestTimerReset {
3624                    kind: TimerKind::PingreqRecv,
3625                    duration_ms: timeout_ms,
3626                });
3627            } else {
3628                self.pingreq_recv_set = false;
3629                events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3630            }
3631        }
3632
3633        events
3634    }
3635
3636    /// Cancel timers and collect events instead of calling handlers
3637    fn cancel_timers(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
3638        if self.pingreq_send_set {
3639            self.pingreq_send_set = false;
3640            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
3641        }
3642        if self.pingreq_recv_set {
3643            self.pingreq_recv_set = false;
3644            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3645        }
3646        if self.pingresp_recv_set {
3647            self.pingresp_recv_set = false;
3648            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3649        }
3650    }
3651
3652    /// Helper function to extract TopicAlias from properties
3653    fn get_topic_alias_from_props(props: &[Property]) -> Option<u16> {
3654        for prop in props {
3655            if let Property::TopicAlias(ta) = prop {
3656                return Some(ta.val());
3657            }
3658        }
3659        None
3660    }
3661
3662    #[allow(dead_code)]
3663    fn is_packet_id_used(&self, packet_id: PacketIdType) -> bool {
3664        self.pid_man.is_used_id(packet_id)
3665    }
3666}
3667
3668// traits
3669
3670pub trait RecvBehavior<Role, PacketIdType>
3671where
3672    PacketIdType: IsPacketId,
3673{
3674    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>>;
3675}
3676
3677// RecvBehavior implementations
3678impl<PacketIdType> RecvBehavior<role::Client, PacketIdType>
3679    for GenericConnection<role::Client, PacketIdType>
3680where
3681    PacketIdType: IsPacketId,
3682{
3683    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3684        self.recv(data)
3685    }
3686}
3687
3688impl<PacketIdType> RecvBehavior<role::Server, PacketIdType>
3689    for GenericConnection<role::Server, PacketIdType>
3690where
3691    PacketIdType: IsPacketId,
3692{
3693    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3694        self.recv(data)
3695    }
3696}
3697
3698impl<PacketIdType> RecvBehavior<role::Any, PacketIdType>
3699    for GenericConnection<role::Any, PacketIdType>
3700where
3701    PacketIdType: IsPacketId,
3702{
3703    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3704        self.recv(data)
3705    }
3706}
3707
3708// tests
3709
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mqtt::connection::version::Version;
    use crate::mqtt::packet::TopicAliasSend;
    use crate::mqtt::role;

    /// Builds a fresh MQTT v5.0 client connection with `u16` packet ids.
    fn v5_client() -> GenericConnection<role::Client, u16> {
        GenericConnection::<role::Client, u16>::new(Version::V5_0)
    }

    /// Builds a v5.0 client whose send-side topic alias table is `aliases`.
    fn v5_client_with_aliases(aliases: TopicAliasSend) -> GenericConnection<role::Client, u16> {
        let mut connection = v5_client();
        connection.topic_alias_send = Some(aliases);
        connection
    }

    /// Builds a v5.0 client whose outgoing maximum packet size is `limit`.
    fn v5_client_with_send_limit(limit: u32) -> GenericConnection<role::Client, u16> {
        let mut connection = v5_client();
        connection.maximum_packet_size_send = limit;
        connection
    }

    // ---- initialize ----

    #[test]
    fn test_initialize_client_mode() {
        let mut connection = v5_client();

        connection.initialize(true);

        // Client mode is recorded and the publish/store state starts out empty.
        assert!(connection.is_client);
        assert_eq!(connection.publish_send_count, 0);
        assert!(connection.publish_send_max.is_none());
        assert!(connection.publish_recv_max.is_none());
        assert!(!connection.need_store);
    }

    #[test]
    fn test_initialize_server_mode() {
        let mut connection = GenericConnection::<role::Server, u32>::new(Version::V3_1_1);

        connection.initialize(false);

        // Server mode is recorded and the publish/store state starts out empty.
        assert!(!connection.is_client);
        assert_eq!(connection.publish_send_count, 0);
        assert!(connection.publish_send_max.is_none());
        assert!(connection.publish_recv_max.is_none());
        assert!(!connection.need_store);
    }

    #[test]
    fn test_initialize_clears_state() {
        let mut connection = v5_client();

        // Dirty the state that `initialize` is expected to reset.
        connection.publish_send_count = 5;
        connection.need_store = true;
        connection.pid_suback.insert(123);
        connection.pid_unsuback.insert(456);

        connection.initialize(true);

        assert_eq!(connection.publish_send_count, 0);
        assert!(!connection.need_store);
        assert!(connection.pid_suback.is_empty());
        assert!(connection.pid_unsuback.is_empty());
        assert!(connection.is_client);
    }

    // ---- validate_topic_alias ----

    #[test]
    fn test_validate_topic_alias_no_topic_alias_send() {
        let mut connection = v5_client();

        // Without a configured `topic_alias_send` nothing can be resolved.
        assert!(connection.validate_topic_alias(Some(1)).is_none());
    }

    #[test]
    fn test_validate_topic_alias_none_input() {
        let mut connection = v5_client();

        // No alias supplied means no topic name.
        assert!(connection.validate_topic_alias(None).is_none());
    }

    #[test]
    fn test_validate_topic_alias_with_registered_alias() {
        // Alias table with max 10 and alias 5 registered.
        let mut aliases = TopicAliasSend::new(10);
        aliases.insert_or_update("test/topic", 5);
        let mut connection = v5_client_with_aliases(aliases);

        // A registered alias resolves to its topic name.
        let resolved = connection.validate_topic_alias(Some(5));
        assert_eq!(resolved.as_deref(), Some("test/topic"));
    }

    #[test]
    fn test_validate_topic_alias_unregistered_alias() {
        // Alias table with max 10 and nothing registered.
        let mut connection = v5_client_with_aliases(TopicAliasSend::new(10));

        // An in-range but unregistered alias resolves to nothing.
        assert!(connection.validate_topic_alias(Some(5)).is_none());
    }

    // ---- validate_topic_alias_range ----

    #[test]
    fn test_validate_topic_alias_range_no_topic_alias_send() {
        let connection = v5_client();

        // Without a configured `topic_alias_send` every alias is out of range.
        assert!(!connection.validate_topic_alias_range(1));
    }

    #[test]
    fn test_validate_topic_alias_range_zero() {
        let connection = v5_client_with_aliases(TopicAliasSend::new(10));

        // Alias 0 is rejected.
        assert!(!connection.validate_topic_alias_range(0));
    }

    #[test]
    fn test_validate_topic_alias_range_over_max() {
        let connection = v5_client_with_aliases(TopicAliasSend::new(5));

        // An alias above the configured maximum is rejected.
        assert!(!connection.validate_topic_alias_range(6));
    }

    #[test]
    fn test_validate_topic_alias_range_valid() {
        let connection = v5_client_with_aliases(TopicAliasSend::new(10));

        // Lower bound, middle and upper bound are all accepted.
        for alias in [1, 5, 10] {
            assert!(connection.validate_topic_alias_range(alias));
        }
    }

    // ---- validate_maximum_packet_size_send ----

    #[test]
    fn test_validate_maximum_packet_size_within_limit() {
        // The default send limit is left untouched here.
        let connection = v5_client();

        assert!(connection.validate_maximum_packet_size_send(1000));
    }

    #[test]
    fn test_validate_maximum_packet_size_at_limit() {
        let connection = v5_client_with_send_limit(1000);

        // A size equal to the limit is accepted.
        assert!(connection.validate_maximum_packet_size_send(1000));
    }

    #[test]
    fn test_validate_maximum_packet_size_over_limit() {
        let connection = v5_client_with_send_limit(1000);

        // One byte over the limit is rejected.
        assert!(!connection.validate_maximum_packet_size_send(1001));
    }

    #[test]
    fn test_validate_maximum_packet_size_zero_limit() {
        let connection = v5_client_with_send_limit(0);

        // With a zero limit only a zero size passes.
        assert!(!connection.validate_maximum_packet_size_send(1));
        assert!(connection.validate_maximum_packet_size_send(0));
    }

    // ---- remaining_length_to_total_size ----

    #[test]
    fn test_remaining_length_to_total_size() {
        // (remaining length, expected total) where
        // total = 1 fixed-header byte + length-field bytes + remaining length.
        let cases = [
            // 1-byte remaining length encoding (0-127)
            (0, 2),
            (127, 129),
            // 2-byte remaining length encoding (128-16383)
            (128, 131),
            (16383, 16386),
            // 3-byte remaining length encoding (16384-2097151)
            (16384, 16388),
            (2097151, 2097155),
            // 4-byte remaining length encoding (2097152-268435455)
            (2097152, 2097157),
            (268435455, 268435460),
        ];

        for (remaining_length, expected_total) in cases {
            assert_eq!(
                remaining_length_to_total_size(remaining_length),
                expected_total
            );
        }
    }
}