mqtt_protocol_core/mqtt/connection/
core.rs

1/**
2 * MIT License
3 *
4 * Copyright (c) 2025 Takatoshi Kondo
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to deal
8 * in the Software without restriction, including without limitation the rights
9 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 * copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in all
14 * copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 * SOFTWARE.
23 */
24use std::collections::HashSet;
25use std::io::Cursor;
26use std::marker::PhantomData;
27
28use crate::mqtt::connection::GenericStore;
29use crate::mqtt::connection::event::{GenericEvent, TimerKind};
30
31use serde::Serialize;
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
34enum ConnectionStatus {
35    #[serde(rename = "disconnected")]
36    Disconnected,
37    #[serde(rename = "connecting")]
38    Connecting,
39    #[serde(rename = "connected")]
40    Connected,
41}
42use crate::mqtt::connection::packet_builder::{
43    PacketBuildResult, PacketBuilder, PacketData, RawPacket,
44};
45use crate::mqtt::connection::role;
46use crate::mqtt::connection::role::RoleType;
47use crate::mqtt::connection::sendable::Sendable;
48use crate::mqtt::packet::GenericPacket;
49use crate::mqtt::packet::GenericStorePacket;
50use crate::mqtt::packet::Qos;
51use crate::mqtt::packet::ResponsePacket;
52use crate::mqtt::packet::v3_1_1;
53use crate::mqtt::packet::v5_0;
54use crate::mqtt::packet::{Property, TopicAliasRecv, TopicAliasSend};
55use crate::mqtt::packet_id_manager::PacketIdManager;
56use crate::mqtt::prelude::GenericPacketTrait;
57use crate::mqtt::result_code::{
58    ConnectReasonCode, ConnectReturnCode, DisconnectReasonCode, MqttError, PubrecReasonCode,
59};
60use crate::mqtt::packet::IsPacketId;
61use crate::mqtt::version::*;
62
/// Sentinel packet-size limit meaning "no limit has been negotiated".
///
/// Computed as 1 (fixed header byte) + 4 (longest remaining-length encoding) + 128^4.
/// Because 128^4 is one more than the largest remaining length a four-byte variable
/// byte integer can encode (268,435,455), no real packet ever reaches this size.
const MQTT_PACKET_SIZE_NO_LIMIT: u32 = 1 + 4 + 128u32.pow(4);
66
/// Convert a remaining-length value into the size of the whole packet on the wire.
///
/// The result is the sum of:
/// - 1 byte for the fixed header,
/// - 1 to 4 bytes for the variable-length encoding of `remaining_length`
///   (7 payload bits per byte, so the thresholds are 128, 128^2 and 128^3),
/// - `remaining_length` itself.
fn remaining_length_to_total_size(remaining_length: u32) -> u32 {
    let length_field_size: u32 = match remaining_length {
        0..=127 => 1,
        128..=16_383 => 2,
        16_384..=2_097_151 => 3,
        _ => 4,
    };

    1 + length_field_size + remaining_length
}
86
/// Type alias for [`GenericEvent`] with `u16` packet IDs (most common case)
///
/// This is a convenience type alias that most applications will use.
/// It uses `u16` for packet IDs, which is the standard MQTT packet ID type.
/// Connections that use a different packet ID type produce
/// `GenericEvent<PacketIdType>` values instead.
pub type Event = GenericEvent<u16>;
92
/// Generic MQTT Connection - Core Sans-I/O MQTT protocol implementation
///
/// This struct represents the core MQTT protocol logic in a Sans-I/O (synchronous I/O-independent) design.
/// It handles MQTT packet processing, state management, and protocol compliance without performing
/// any actual network I/O operations. Instead, it returns events that the application must handle.
///
/// # Type Parameters
///
/// * `Role` - The connection role (Client or Server), determining allowed operations
/// * `PacketIdType` - The type used for packet IDs (typically `u16`, but can be `u32` for extended scenarios)
///
/// # Key Features
///
/// - **Sans-I/O Design**: No network I/O is performed directly; events are returned for external handling
/// - **Protocol Compliance**: Implements MQTT v3.1.1 and v5.0 specifications
/// - **State Management**: Tracks connection state, packet IDs, and protocol flows
/// - **Configurable Behavior**: Supports various configuration options for different use cases
/// - **Generic Packet ID Support**: Can use u16 or u32 packet IDs for different deployment scenarios
///
/// # Usage
///
/// ```ignore
/// use mqtt_protocol_core::mqtt;
///
/// let mut connection = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
///
/// // Send a packet
/// let events = connection.send(publish_packet);
/// for event in events {
///     match event {
///         mqtt::connection::Event::RequestSendPacket { packet, .. } => {
///             // Send packet over network
///         },
///         // Handle other events...
///     }
/// }
///
/// // Receive data
/// let mut cursor = std::io::Cursor::new(received_data);
/// let events = connection.recv(&mut cursor);
/// // Process events...
/// ```
pub struct GenericConnection<Role, PacketIdType>
where
    Role: RoleType,
    PacketIdType: IsPacketId,
{
    // Zero-sized marker that ties the connection to its `Role` type parameter
    _marker: PhantomData<Role>,

    // MQTT protocol version, initialised from the argument of `new`
    protocol_version: Version,

    // Packet ID bookkeeping (queried and released in `notify_closed`)
    pid_man: PacketIdManager<PacketIdType>,
    // Packet IDs tracked per acknowledgement type. `notify_closed` always releases the
    // SUBACK/UNSUBACK sets; the PUBACK/PUBREC/PUBCOMP sets are released only when
    // `need_store` is false.
    pid_suback: HashSet<PacketIdType>,
    pid_unsuback: HashSet<PacketIdType>,
    pid_puback: HashSet<PacketIdType>,
    pid_pubrec: HashSet<PacketIdType>,
    pid_pubcomp: HashSet<PacketIdType>,

    // Whether session state is kept across a close; when false, `notify_closed` clears
    // the QoS2 state and releases publish-related packet IDs
    need_store: bool,
    // Store for retransmission packets
    store: GenericStore<PacketIdType>,

    // Behaviour switches; all of them start as `false` in `new`
    offline_publish: bool,
    auto_pub_response: bool,
    auto_ping_response: bool,

    // Auto map topic alias for sending
    auto_map_topic_alias_send: bool,
    // Auto replace topic alias for sending
    auto_replace_topic_alias_send: bool,
    // Topic alias management for receiving (reset to `None` in `notify_closed`)
    topic_alias_recv: Option<TopicAliasRecv>,
    // Topic alias management for sending (reset to `None` in `notify_closed`)
    topic_alias_send: Option<TopicAliasSend>,

    // Maximum number of concurrent PUBLISH packets for sending
    publish_send_max: Option<u16>,
    // Maximum number of concurrent PUBLISH packets for receiving
    publish_recv_max: Option<u16>,
    // Current count of PUBLISH packets being sent
    publish_send_count: u16,

    // Set of received PUBLISH packets (for flow control)
    publish_recv: HashSet<PacketIdType>,

    // Maximum packet size for sending (`MQTT_PACKET_SIZE_NO_LIMIT` when unrestricted)
    maximum_packet_size_send: u32,
    // Maximum packet size for receiving (`MQTT_PACKET_SIZE_NO_LIMIT` when unrestricted)
    maximum_packet_size_recv: u32,

    // Connection state
    status: ConnectionStatus,

    // PINGREQ send interval in milliseconds
    pingreq_send_interval_ms: Option<u64>,
    // PINGREQ receive timeout in milliseconds
    pingreq_recv_timeout_ms: Option<u64>,
    // PINGRESP receive timeout in milliseconds
    pingresp_recv_timeout_ms: Option<u64>,

    // QoS2 PUBLISH packet handling state (for duplicate detection)
    qos2_publish_handled: HashSet<PacketIdType>,
    // QoS2 PUBLISH packet processing state
    qos2_publish_processing: HashSet<PacketIdType>,

    // Timer state flags; each one is cleared when the matching timer is reported
    // through `notify_timer_fired`
    pingreq_send_set: bool,
    pingreq_recv_set: bool,
    pingresp_recv_set: bool,

    // Incremental parser that turns bytes passed to `recv` into raw packets
    packet_builder: PacketBuilder,
    // Client/Server mode flag (starts as `false` in `new`)
    is_client: bool,
}
207
/// Type alias for Connection with u16 packet ID (standard case)
///
/// This is the standard Connection type that most applications will use.
/// It uses `u16` for packet IDs, which is the standard MQTT packet ID type
/// supporting values from 1 to 65535. Methods of this type return events of
/// type `GenericEvent<u16>`, i.e. [`Event`].
///
/// For extended scenarios where larger packet ID ranges are needed
/// (such as broker clusters), use `GenericConnection<Role, u32>` directly.
///
/// # Type Parameters
///
/// * `Role` - The connection role (typically `role::Client` or `role::Server`)
pub type Connection<Role> = GenericConnection<Role, u16>;
221
222impl<Role, PacketIdType> GenericConnection<Role, PacketIdType>
223where
224    Role: RoleType,
225    PacketIdType: IsPacketId,
226{
227    /// Create a new MQTT connection instance
228    ///
229    /// Initializes a new MQTT connection with the specified protocol version.
230    /// The connection starts in a disconnected state and must be activated through
231    /// the connection handshake process (CONNECT/CONNACK).
232    ///
233    /// # Parameters
234    ///
235    /// * `version` - The MQTT protocol version to use (V3_1_1 or V5_0)
236    ///
237    /// # Returns
238    ///
239    /// A new `GenericConnection` instance ready for use
240    ///
241    /// # Examples
242    ///
243    /// ```ignore
244    /// use mqtt_protocol_core::mqtt;
245    ///
246    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
247    /// let mut server = mqtt::connection::Connection::<mqtt::connection::role::Server>::new(mqtt::version::Version::V3_1_1);
248    /// ```
249    pub fn new(version: Version) -> Self {
250        Self {
251            _marker: PhantomData,
252            protocol_version: version,
253            pid_man: PacketIdManager::new(),
254            pid_suback: HashSet::new(),
255            pid_unsuback: HashSet::new(),
256            pid_puback: HashSet::new(),
257            pid_pubrec: HashSet::new(),
258            pid_pubcomp: HashSet::new(),
259            need_store: false,
260            store: GenericStore::new(),
261            offline_publish: false,
262            auto_pub_response: false,
263            auto_ping_response: false,
264            auto_map_topic_alias_send: false,
265            auto_replace_topic_alias_send: false,
266            topic_alias_recv: None,
267            topic_alias_send: None,
268            publish_send_max: None,
269            publish_recv_max: None,
270            publish_send_count: 0,
271            publish_recv: HashSet::new(),
272            maximum_packet_size_send: MQTT_PACKET_SIZE_NO_LIMIT,
273            maximum_packet_size_recv: MQTT_PACKET_SIZE_NO_LIMIT,
274            status: ConnectionStatus::Disconnected,
275            pingreq_send_interval_ms: None,
276            pingreq_recv_timeout_ms: None,
277            pingresp_recv_timeout_ms: None,
278            qos2_publish_handled: HashSet::new(),
279            qos2_publish_processing: HashSet::new(),
280            pingreq_send_set: false,
281            pingreq_recv_set: false,
282            pingresp_recv_set: false,
283            packet_builder: PacketBuilder::new(),
284            is_client: false,
285        }
286    }
287
288    // public
289
    /// Send MQTT packet with compile-time role checking (experimental)
    ///
    /// The packet type `T` must implement `Sendable<Role, PacketIdType>` for this
    /// connection's `Role`. A packet type without such an implementation is rejected
    /// by the compiler, so the check only works when `Role` is a concrete type
    /// (not a generic parameter).
    ///
    /// This is an experimental API that may be subject to change. For general use,
    /// consider using the `send()` method instead.
    ///
    /// # Type Parameters
    ///
    /// * `T` - The packet type that must implement `Sendable<Role, PacketIdType>`
    ///
    /// # Parameters
    ///
    /// * `packet` - The MQTT packet to send
    ///
    /// # Returns
    ///
    /// The vector of events produced by `Sendable::dispatch_send`, which the
    /// application must process
    ///
    /// # Compile-time Safety
    ///
    /// If the packet type is not allowed for the current role, this will result
    /// in a compile-time error, preventing protocol violations at development time.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use mqtt_protocol_core::mqtt;
    ///
    /// // This works for concrete roles
    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
    /// let events = client.checked_send(connect_packet); // OK - clients can send CONNECT
    ///
    /// // This would cause a compile error
    /// // let events = client.checked_send(connack_packet); // Error - clients cannot send CONNACK
    /// ```
    pub fn checked_send<T>(&mut self, packet: T) -> Vec<GenericEvent<PacketIdType>>
    where
        T: Sendable<Role, PacketIdType>,
    {
        // The packet decides how it is sent: all processing is delegated to the
        // `Sendable` implementation of the concrete (or generic) packet type.
        packet.dispatch_send(self)
    }
335
336    /// Send MQTT packet with runtime role validation
337    ///
338    /// This is the primary method for sending MQTT packets. It accepts any `GenericPacket`
339    /// and performs runtime validation to ensure the packet is allowed for the current
340    /// connection role. This provides flexibility when the exact packet type is not known
341    /// at compile time.
342    ///
343    /// # Parameters
344    ///
345    /// * `packet` - The MQTT packet to send
346    ///
347    /// # Returns
348    ///
349    /// A vector of events that the application must process. If the packet is not allowed
350    /// for the current role, a `NotifyError` event will be included.
351    ///
352    /// # Validation
353    ///
354    /// The method validates that:
355    /// - The packet type is allowed for the connection role (Client vs Server)
356    /// - Protocol version compatibility
357    /// - Connection state requirements
358    /// - Packet ID management for QoS > 0 packets
359    ///
360    /// # Examples
361    ///
362    /// ```ignore
363    /// use mqtt_protocol_core::mqtt;
364    ///
365    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
366    /// let events = client.send(mqtt::packet::GenericPacket::V5_0(mqtt::packet::v5_0::Packet::Connect(connect_packet)));
367    ///
368    /// for event in events {
369    ///     match event {
370    ///         mqtt::connection::Event::RequestSendPacket { packet, .. } => {
371    ///             // Send packet over network
372    ///         },
373    ///         mqtt::connection::Event::NotifyError(error) => {
374    ///             // Handle validation errors
375    ///         },
376    ///         // Handle other events...
377    ///     }
378    /// }
379    /// ```
380    pub fn send(&mut self, packet: GenericPacket<PacketIdType>) -> Vec<GenericEvent<PacketIdType>> {
381        use std::any::TypeId;
382
383        let role_id = TypeId::of::<Role>();
384        let client_id = TypeId::of::<role::Client>();
385        let server_id = TypeId::of::<role::Server>();
386        let any_id = TypeId::of::<role::Any>();
387
388        // Check version compatibility between connection and packet
389        let packet_version = packet.protocol_version();
390
391        // Return error if versions don't match
392        if self.protocol_version != packet_version {
393            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
394        }
395
396        match packet {
397            // CONNECT - Client/Any can send
398            GenericPacket::V3_1_1Connect(p) => {
399                if role_id == client_id || role_id == any_id {
400                    self.process_send_v3_1_1_connect(p)
401                } else {
402                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
403                }
404            }
405            GenericPacket::V5_0Connect(p) => {
406                if role_id == client_id || role_id == any_id {
407                    self.process_send_v5_0_connect(p)
408                } else {
409                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
410                }
411            }
412            // CONNACK - Server/Any can send
413            GenericPacket::V3_1_1Connack(p) => {
414                if role_id == server_id || role_id == any_id {
415                    self.process_send_v3_1_1_connack(p)
416                } else {
417                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
418                }
419            }
420            GenericPacket::V5_0Connack(p) => {
421                if role_id == server_id || role_id == any_id {
422                    self.process_send_v5_0_connack(p)
423                } else {
424                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
425                }
426            }
427            // PUBLISH - Any role can send
428            GenericPacket::V3_1_1Publish(p) => self.process_send_v3_1_1_publish(p),
429            GenericPacket::V5_0Publish(p) => self.process_send_v5_0_publish(p),
430            // PUBACK/PUBREC/PUBREL/PUBCOMP - Any role can send
431            GenericPacket::V3_1_1Puback(p) => self.process_send_v3_1_1_puback(p),
432            GenericPacket::V5_0Puback(p) => self.process_send_v5_0_puback(p),
433            GenericPacket::V3_1_1Pubrec(p) => self.process_send_v3_1_1_pubrec(p),
434            GenericPacket::V5_0Pubrec(p) => self.process_send_v5_0_pubrec(p),
435            GenericPacket::V3_1_1Pubrel(p) => self.process_send_v3_1_1_pubrel(p),
436            GenericPacket::V5_0Pubrel(p) => self.process_send_v5_0_pubrel(p),
437            GenericPacket::V3_1_1Pubcomp(p) => self.process_send_v3_1_1_pubcomp(p),
438            GenericPacket::V5_0Pubcomp(p) => self.process_send_v5_0_pubcomp(p),
439            // SUBSCRIBE/UNSUBSCRIBE - Client/Any can send
440            GenericPacket::V3_1_1Subscribe(p) => {
441                if role_id == client_id || role_id == any_id {
442                    self.process_send_v3_1_1_subscribe(p)
443                } else {
444                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
445                }
446            }
447            GenericPacket::V5_0Subscribe(p) => {
448                if role_id == client_id || role_id == any_id {
449                    self.process_send_v5_0_subscribe(p)
450                } else {
451                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
452                }
453            }
454            GenericPacket::V3_1_1Unsubscribe(p) => {
455                if role_id == client_id || role_id == any_id {
456                    self.process_send_v3_1_1_unsubscribe(p)
457                } else {
458                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
459                }
460            }
461            GenericPacket::V5_0Unsubscribe(p) => {
462                if role_id == client_id || role_id == any_id {
463                    self.process_send_v5_0_unsubscribe(p)
464                } else {
465                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
466                }
467            }
468            // SUBACK/UNSUBACK - Server/Any can send
469            GenericPacket::V3_1_1Suback(p) => {
470                if role_id == server_id || role_id == any_id {
471                    self.process_send_v3_1_1_suback(p)
472                } else {
473                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
474                }
475            }
476            GenericPacket::V5_0Suback(p) => {
477                if role_id == server_id || role_id == any_id {
478                    self.process_send_v5_0_suback(p)
479                } else {
480                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
481                }
482            }
483            GenericPacket::V3_1_1Unsuback(p) => {
484                if role_id == server_id || role_id == any_id {
485                    self.process_send_v3_1_1_unsuback(p)
486                } else {
487                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
488                }
489            }
490            GenericPacket::V5_0Unsuback(p) => {
491                if role_id == server_id || role_id == any_id {
492                    self.process_send_v5_0_unsuback(p)
493                } else {
494                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
495                }
496            }
497            // PINGREQ - Client/Any can send
498            GenericPacket::V3_1_1Pingreq(p) => {
499                if role_id == client_id || role_id == any_id {
500                    self.process_send_v3_1_1_pingreq(p)
501                } else {
502                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
503                }
504            }
505            GenericPacket::V5_0Pingreq(p) => {
506                if role_id == client_id || role_id == any_id {
507                    self.process_send_v5_0_pingreq(p)
508                } else {
509                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
510                }
511            }
512            // PINGRESP - Server/Any can send
513            GenericPacket::V3_1_1Pingresp(p) => {
514                if role_id == server_id || role_id == any_id {
515                    self.process_send_v3_1_1_pingresp(p)
516                } else {
517                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
518                }
519            }
520            GenericPacket::V5_0Pingresp(p) => {
521                if role_id == server_id || role_id == any_id {
522                    self.process_send_v5_0_pingresp(p)
523                } else {
524                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
525                }
526            }
527            // DISCONNECT(v3.1.1) - Client/Any role can send
528            GenericPacket::V3_1_1Disconnect(p) => {
529                if role_id == client_id || role_id == any_id {
530                    self.process_send_v3_1_1_disconnect(p)
531                } else {
532                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
533                }
534            }
535            // DISCONNECT(v5.0) - Any role can send
536            GenericPacket::V5_0Disconnect(p) => self.process_send_v5_0_disconnect(p),
537            // AUTH - Any role can send (v5.0 only)
538            GenericPacket::V5_0Auth(p) => self.process_send_v5_0_auth(p),
539        }
540    }
541
542    /// Receive and process incoming MQTT data
543    ///
544    /// This method processes raw bytes received from the network and attempts to
545    /// parse them into MQTT packets. It handles packet fragmentation and can
546    /// process multiple complete packets from a single data buffer.
547    ///
548    /// # Parameters
549    ///
550    /// * `data` - A cursor over the received data bytes. The cursor position will
551    ///           be advanced as data is consumed.
552    ///
553    /// # Returns
554    ///
555    /// A vector of events generated from processing the received data:
556    /// - `NotifyPacketReceived` for successfully parsed packets
557    /// - `NotifyError` for parsing errors or protocol violations
558    /// - Additional events based on packet processing (timers, responses, etc.)
559    ///
560    /// # Behavior
561    ///
562    /// - Handles partial packets (data will be buffered until complete)
563    /// - Processes multiple complete packets in sequence
564    /// - Validates packet structure and protocol compliance
565    /// - Updates internal connection state based on received packets
566    /// - Generates appropriate response events (ACKs, etc.)
567    ///
568    /// # Examples
569    ///
570    /// ```ignore
571    /// use mqtt_protocol_core::mqtt;
572    ///
573    /// let mut connection = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
574    /// let received_data = [/* network data */];
575    /// let mut cursor = std::io::Cursor::new(&received_data[..]);
576    ///
577    /// let events = connection.recv(&mut cursor);
578    /// for event in events {
579    ///     match event {
580    ///         mqtt::connection::Event::NotifyPacketReceived(packet) => {
581    ///             // Process received packet
582    ///         },
583    ///         mqtt::connection::Event::NotifyError(error) => {
584    ///             // Handle parsing/protocol errors
585    ///         },
586    ///         // Handle other events...
587    ///     }
588    /// }
589    /// ```
590    pub fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
591        let mut events = Vec::new();
592
593        match self.packet_builder.feed(data) {
594            PacketBuildResult::Complete(raw_packet) => {
595                events.extend(self.process_recv_packet(raw_packet));
596            }
597            PacketBuildResult::Incomplete => {}
598            PacketBuildResult::Error(e) => {
599                self.cancel_timers(&mut events);
600                events.push(GenericEvent::RequestClose);
601                events.push(GenericEvent::NotifyError(e));
602            }
603        }
604
605        events
606    }
607
608    /// Notify that a timer has fired (Event-based API)
609    ///
610    /// This method should be called when the I/O layer detects that a timer has expired.
611    /// It handles the timer event appropriately and returns events that need to be processed.
612    /// Notify that a timer has fired
613    ///
614    /// This method should be called by the application when a previously requested
615    /// timer expires. The connection will take appropriate action based on the timer type.
616    ///
617    /// # Parameters
618    ///
619    /// * `kind` - The type of timer that fired
620    ///
621    /// # Returns
622    ///
623    /// Events generated from timer processing (e.g., sending PINGREQ, connection timeouts)
624    pub fn notify_timer_fired(&mut self, kind: TimerKind) -> Vec<GenericEvent<PacketIdType>> {
625        let mut events = Vec::new();
626
627        match kind {
628            TimerKind::PingreqSend => {
629                // Reset timer flag
630                self.pingreq_send_set = false;
631
632                // Send PINGREQ if connected
633                if self.status == ConnectionStatus::Connected {
634                    match self.protocol_version {
635                        Version::V3_1_1 => {
636                            if let Ok(pingreq) = v3_1_1::Pingreq::builder().build() {
637                                events.extend(self.process_send_v3_1_1_pingreq(pingreq));
638                            }
639                        }
640                        Version::V5_0 => {
641                            if let Ok(pingreq) = v5_0::Pingreq::builder().build() {
642                                events.extend(self.process_send_v5_0_pingreq(pingreq));
643                            }
644                        }
645                        Version::Undetermined => {
646                            unreachable!("Protocol version should be set before sending PINGREQ");
647                        }
648                    }
649                }
650            }
651            TimerKind::PingreqRecv => {
652                // Reset timer flag
653                self.pingreq_recv_set = false;
654
655                match self.protocol_version {
656                    Version::V3_1_1 => {
657                        // V3.1.1: Close connection
658                        events.push(GenericEvent::RequestClose);
659                    }
660                    Version::V5_0 => {
661                        // V5.0: Send DISCONNECT with keep_alive_timeout if connected
662                        if self.status == ConnectionStatus::Connected {
663                            if let Ok(disconnect) = v5_0::Disconnect::builder()
664                                .reason_code(DisconnectReasonCode::KeepAliveTimeout)
665                                .build()
666                            {
667                                events.extend(self.process_send_v5_0_disconnect(disconnect));
668                            }
669                        }
670                    }
671                    Version::Undetermined => {
672                        unreachable!("Protocol version should be set before receiving PINGREQ");
673                    }
674                }
675            }
676            TimerKind::PingrespRecv => {
677                // Reset timer flag
678                self.pingresp_recv_set = false;
679
680                match self.protocol_version {
681                    Version::V3_1_1 => {
682                        // V3.1.1: Close connection
683                        events.push(GenericEvent::RequestClose);
684                    }
685                    Version::V5_0 => {
686                        // V5.0: Send DISCONNECT with keep_alive_timeout if connected
687                        if self.status == ConnectionStatus::Connected {
688                            if let Ok(disconnect) = v5_0::Disconnect::builder()
689                                .reason_code(DisconnectReasonCode::KeepAliveTimeout)
690                                .build()
691                            {
692                                events.extend(self.process_send_v5_0_disconnect(disconnect));
693                            }
694                        }
695                    }
696                    Version::Undetermined => {
697                        unreachable!("Protocol version should be set before receiving PINGRESP");
698                    }
699                }
700            }
701        }
702
703        events
704    }
705
706    /// Notify that the connection has been closed by the I/O layer (Event-based API)
707    ///
708    /// This method should be called when the I/O layer detects that the socket has been closed.
709    /// It updates the internal state appropriately and returns events that need to be processed.
710    /// Notify that the underlying connection has been closed
711    ///
712    /// This method should be called when the network connection is closed,
713    /// either intentionally or due to network issues.
714    ///
715    /// # Returns
716    ///
717    /// Events generated from connection closure processing
718    pub fn notify_closed(&mut self) -> Vec<GenericEvent<PacketIdType>> {
719        let mut events = Vec::new();
720
721        // Reset packet size limits to MQTT protocol maximum
722        self.maximum_packet_size_send = MQTT_PACKET_SIZE_NO_LIMIT;
723        self.maximum_packet_size_recv = MQTT_PACKET_SIZE_NO_LIMIT;
724
725        // Set status to disconnected
726        self.status = ConnectionStatus::Disconnected;
727
728        // Clear topic alias management
729        self.topic_alias_send = None;
730        self.topic_alias_recv = None;
731
732        // Release packet IDs for SUBACK
733        for packet_id in self.pid_suback.drain() {
734            if self.pid_man.is_used_id(packet_id) {
735                self.pid_man.release_id(packet_id);
736                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
737            }
738        }
739
740        // Release packet IDs for UNSUBACK
741        for packet_id in self.pid_unsuback.drain() {
742            if self.pid_man.is_used_id(packet_id) {
743                self.pid_man.release_id(packet_id);
744                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
745            }
746        }
747
748        // If not storing session state, clear QoS2 states and release publish-related packet IDs
749        if !self.need_store {
750            self.qos2_publish_processing.clear();
751            self.qos2_publish_handled.clear();
752
753            // Release packet IDs for PUBACK
754            for packet_id in self.pid_puback.drain() {
755                if self.pid_man.is_used_id(packet_id) {
756                    self.pid_man.release_id(packet_id);
757                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
758                }
759            }
760
761            // Release packet IDs for PUBREC
762            for packet_id in self.pid_pubrec.drain() {
763                if self.pid_man.is_used_id(packet_id) {
764                    self.pid_man.release_id(packet_id);
765                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
766                }
767            }
768
769            // Release packet IDs for PUBCOMP
770            for packet_id in self.pid_pubcomp.drain() {
771                if self.pid_man.is_used_id(packet_id) {
772                    self.pid_man.release_id(packet_id);
773                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
774                }
775            }
776        }
777
778        // Cancel all timers
779        self.cancel_timers(&mut events);
780
781        events
782    }
783
784    /// Set the PINGREQ send interval
785    ///
786    /// Sets the interval for sending PINGREQ packets to maintain the connection alive.
787    /// When changed, this may generate timer-related events to update the ping schedule.
788    ///
789    /// # Parameters
790    ///
791    /// * `duration_ms` - The interval in milliseconds between PINGREQ packets
792    ///
793    /// # Returns
794    ///
795    /// Events generated from updating the ping interval
796    pub fn set_pingreq_send_interval(
797        &mut self,
798        duration_ms: u64,
799    ) -> Vec<GenericEvent<PacketIdType>> {
800        let mut events = Vec::new();
801
802        if duration_ms == 0 {
803            self.pingreq_send_interval_ms = None;
804            self.pingreq_send_set = false;
805            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
806        } else {
807            self.pingreq_send_interval_ms = Some(duration_ms);
808            self.pingreq_send_set = true;
809            events.push(GenericEvent::RequestTimerReset {
810                kind: TimerKind::PingreqSend,
811                duration_ms,
812            });
813        }
814
815        events
816    }
817
818    /// Get the remaining capacity for sending PUBLISH packets
819    ///
820    /// Returns the number of additional PUBLISH packets that can be sent
821    /// without exceeding the receive maximum limit.
822    ///
823    /// # Returns
824    ///
825    /// The remaining capacity for outgoing PUBLISH packets, or `None` if no limit is set
826    pub fn get_receive_maximum_vacancy_for_send(&self) -> Option<u16> {
827        // If publish_recv_max is set, return the remaining capacity
828        if let Some(max) = self.publish_send_max {
829            Some(max.saturating_sub(self.publish_send_count))
830        } else {
831            None // No limit set
832        }
833    }
834
835    /// Enable or disable offline publishing
836    ///
837    /// When enabled, PUBLISH packets can be sent even when disconnected.
838    /// They will be queued and sent once the connection is established.
839    ///
840    /// # Parameters
841    ///
842    /// * `enable` - Whether to enable offline publishing
843    pub fn set_offline_publish(&mut self, enable: bool) {
844        self.offline_publish = enable;
845        if self.offline_publish {
846            self.need_store = true;
847        }
848    }
849
850    /// Enable or disable automatic PUBLISH response generation
851    ///
852    /// When enabled, appropriate response packets (PUBACK, PUBREC, PUBREL, and PUBCOMP.)
853    /// are automatically generated for received PUBLISH packets.
854    ///
855    /// # Parameters
856    ///
857    /// * `enable` - Whether to enable automatic responses
858    pub fn set_auto_pub_response(&mut self, enable: bool) {
859        self.auto_pub_response = enable;
860    }
861
862    /// Enable or disable automatic PING response generation
863    ///
864    /// When enabled, PINGRESP packets are automatically sent in response to PINGREQ.
865    ///
866    /// # Parameters
867    ///
868    /// * `enable` - Whether to enable automatic PING responses
869    pub fn set_auto_ping_response(&mut self, enable: bool) {
870        self.auto_ping_response = enable;
871    }
872
873    /// Enable or disable automatic topic alias mapping for outgoing packets
874    ///
875    /// When enabled, the connection will automatically map topics to aliases
876    /// for outgoing PUBLISH packets to reduce bandwidth usage. This includes:
877    /// - Applying existing registered topic aliases when available
878    /// - Allocating new topic aliases for unregistered topics
879    /// - Using LRU algorithm to overwrite the least recently used alias when all aliases are in use
880    ///
881    /// # Parameters
882    ///
883    /// * `enable` - Whether to enable automatic topic alias mapping
884    pub fn set_auto_map_topic_alias_send(&mut self, enable: bool) {
885        self.auto_map_topic_alias_send = enable;
886    }
887
888    /// Enable or disable automatic topic alias replacement for outgoing packets
889    ///
890    /// When enabled, the connection will automatically apply existing registered
891    /// topic aliases to outgoing PUBLISH packets when aliases are available.
892    /// This only uses previously registered aliases and does not allocate new ones.
893    ///
894    /// # Parameters
895    ///
896    /// * `enable` - Whether to enable automatic topic alias replacement
897    pub fn set_auto_replace_topic_alias_send(&mut self, enable: bool) {
898        self.auto_replace_topic_alias_send = enable;
899    }
900
901    /// Set PINGREQ receive timeout
902    pub fn set_pingresp_recv_timeout(&mut self, timeout_ms: Option<u64>) {
903        self.pingresp_recv_timeout_ms = timeout_ms;
904    }
905
906    /// Acquire a new packet ID for outgoing packets
907    ///
908    /// # Returns
909    ///
910    /// A unique packet ID, or an error if none are available
911    pub fn acquire_packet_id(&mut self) -> Result<PacketIdType, MqttError> {
912        self.pid_man.acquire_unique_id()
913    }
914
915    /// Register a packet ID as in use
916    ///
917    /// Manually registers a specific packet ID as being in use, preventing
918    /// it from being allocated by `acquire_packet_id()`.
919    ///
920    /// # Parameters
921    ///
922    /// * `packet_id` - The packet ID to register as in use
923    ///
924    /// # Returns
925    ///
926    /// `Ok(())` if successful, or an error if the packet ID is already in use
927    pub fn register_packet_id(&mut self, packet_id: PacketIdType) -> Result<(), MqttError> {
928        self.pid_man.register_id(packet_id)
929    }
930
931    /// Release packet ID (Event-based API)
932    /// Release a packet ID for reuse
933    ///
934    /// This method releases a packet ID, making it available for future use.
935    /// It also generates a `NotifyPacketIdReleased` event.
936    ///
937    /// # Parameters
938    ///
939    /// * `packet_id` - The packet ID to release
940    ///
941    /// # Returns
942    ///
943    /// Events generated from releasing the packet ID
944    pub fn release_packet_id(
945        &mut self,
946        packet_id: PacketIdType,
947    ) -> Vec<GenericEvent<PacketIdType>> {
948        let mut events = Vec::new();
949
950        if self.pid_man.is_used_id(packet_id) {
951            self.pid_man.release_id(packet_id);
952            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
953        }
954
955        events
956    }
957
958    /// Get the set of QoS 2 PUBLISH packet IDs that have been handled
959    ///
960    /// Returns a copy of the set containing packet IDs of QoS 2 PUBLISH packets
961    /// that have been successfully processed and handled.
962    ///
963    /// # Returns
964    ///
965    /// A `HashSet` containing packet IDs of handled QoS 2 PUBLISH packets
966    pub fn get_qos2_publish_handled(&self) -> HashSet<PacketIdType> {
967        self.qos2_publish_handled.clone()
968    }
969
970    /// Restore the set of QoS 2 PUBLISH packet IDs that have been handled
971    ///
972    /// Restores the internal state of handled QoS 2 PUBLISH packet IDs,
973    /// typically used when resuming a connection from persistent storage.
974    ///
975    /// # Parameters
976    ///
977    /// * `pids` - A `HashSet` containing packet IDs of previously handled QoS 2 PUBLISH packets
978    pub fn restore_qos2_publish_handled(&mut self, pids: HashSet<PacketIdType>) {
979        self.qos2_publish_handled = pids;
980    }
981
982    /// Restore previously stored packets
983    ///
984    /// This method restores packets that were previously stored for persistence,
985    /// typically called when resuming a session.
986    ///
987    /// # Parameters
988    ///
989    /// * `packets` - Vector of packets to restore
990    pub fn restore_packets(&mut self, packets: Vec<GenericStorePacket<PacketIdType>>) {
991        for packet in packets {
992            match &packet {
993                GenericStorePacket::V3_1_1Publish(p) => {
994                    // Add to appropriate QoS tracking set
995                    match p.qos() {
996                        Qos::AtLeastOnce => {
997                            self.pid_puback.insert(p.packet_id().unwrap());
998                        }
999                        Qos::ExactlyOnce => {
1000                            self.pid_pubrec.insert(p.packet_id().unwrap());
1001                        }
1002                        _ => {
1003                            // QoS 0 shouldn't be in store, but handle gracefully
1004                            tracing::warn!("QoS 0 packet found in store, skipping");
1005                            continue;
1006                        }
1007                    }
1008                    // Register packet ID and add to store
1009                    let packet_id = p.packet_id().unwrap();
1010                    if self.pid_man.register_id(packet_id).is_ok() {
1011                        if let Err(e) = self.store.add(packet) {
1012                            tracing::error!("Failed to add packet to store: {:?}", e);
1013                        }
1014                    } else {
1015                        tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1016                    }
1017                }
1018                GenericStorePacket::V5_0Publish(p) => {
1019                    // Add to appropriate QoS tracking set
1020                    match p.qos() {
1021                        Qos::AtLeastOnce => {
1022                            self.pid_puback.insert(p.packet_id().unwrap());
1023                        }
1024                        Qos::ExactlyOnce => {
1025                            self.pid_pubrec.insert(p.packet_id().unwrap());
1026                        }
1027                        _ => {
1028                            // QoS 0 shouldn't be in store, but handle gracefully
1029                            tracing::warn!("QoS 0 packet found in store, skipping");
1030                            continue;
1031                        }
1032                    }
1033                    // Register packet ID and add to store
1034                    let packet_id = p.packet_id().unwrap();
1035                    if self.pid_man.register_id(packet_id).is_ok() {
1036                        if let Err(e) = self.store.add(packet) {
1037                            tracing::error!("Failed to add packet to store: {:?}", e);
1038                        }
1039                    } else {
1040                        tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1041                    }
1042                }
1043                GenericStorePacket::V3_1_1Pubrel(p) => {
1044                    // Pubrel packets expect PUBCOMP response
1045                    self.pid_pubcomp.insert(p.packet_id());
1046                    // Register packet ID and add to store
1047                    let packet_id = p.packet_id();
1048                    if self.pid_man.register_id(packet_id).is_ok() {
1049                        if let Err(e) = self.store.add(packet) {
1050                            tracing::error!("Failed to add packet to store: {:?}", e);
1051                        }
1052                    } else {
1053                        tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1054                    }
1055                }
1056                GenericStorePacket::V5_0Pubrel(p) => {
1057                    // Pubrel packets expect PUBCOMP response
1058                    self.pid_pubcomp.insert(p.packet_id());
1059                    // Register packet ID and add to store
1060                    let packet_id = p.packet_id();
1061                    if self.pid_man.register_id(packet_id).is_ok() {
1062                        if let Err(e) = self.store.add(packet) {
1063                            tracing::error!("Failed to add packet to store: {:?}", e);
1064                        }
1065                    } else {
1066                        tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1067                    }
1068                }
1069            }
1070        }
1071    }
1072
1073    /// Get stored packets for persistence
1074    ///
1075    /// Returns packets that need to be stored for potential retransmission.
1076    /// This is useful for implementing persistent sessions.
1077    ///
1078    /// # Returns
1079    ///
1080    /// Vector of packets that should be persisted
1081    pub fn get_stored_packets(&self) -> Vec<GenericStorePacket<PacketIdType>> {
1082        self.store.get_stored()
1083    }
1084
1085    /// Get the MQTT protocol version being used
1086    ///
1087    /// # Returns
1088    ///
1089    /// The protocol version (V3_1_1 or V5_0)
1090    pub fn get_protocol_version(&self) -> Version {
1091        self.protocol_version
1092    }
1093
1094    /// Check if a PUBLISH packet is currently being processed
1095    ///
1096    /// # Parameters
1097    ///
1098    /// * `packet_id` - The packet ID to check
1099    ///
1100    /// # Returns
1101    ///
1102    /// True if the packet ID is in use for PUBLISH processing
1103    pub fn is_publish_processing(&self, packet_id: PacketIdType) -> bool {
1104        self.qos2_publish_processing.contains(&packet_id)
1105    }
1106
1107    /// Regulate packet for store (remove/resolve topic alias)
1108    ///
1109    /// This method prepares a V5.0 publish packet for storage by resolving topic aliases
1110    /// and removing TopicAlias properties to ensure the packet can be retransmitted correctly.
1111    pub fn regulate_for_store(
1112        &self,
1113        mut packet: v5_0::GenericPublish<PacketIdType>,
1114    ) -> Result<v5_0::GenericPublish<PacketIdType>, MqttError> {
1115        if packet.topic_name().is_empty() {
1116            // Topic is empty, need to resolve from topic alias
1117            if let Some(props) = packet.props() {
1118                if let Some(topic_alias) = Self::get_topic_alias_from_props(props) {
1119                    if let Some(ref topic_alias_send) = self.topic_alias_send {
1120                        if let Some(topic) = topic_alias_send.peek(topic_alias) {
1121                            // Found topic for alias, add topic and remove alias property
1122                            packet = packet.remove_topic_alias_add_topic(topic.to_string())?;
1123                        } else {
1124                            return Err(MqttError::PacketNotRegulated);
1125                        }
1126                    } else {
1127                        return Err(MqttError::PacketNotRegulated);
1128                    }
1129                } else {
1130                    return Err(MqttError::PacketNotRegulated);
1131                }
1132            } else {
1133                return Err(MqttError::PacketNotRegulated);
1134            }
1135        } else {
1136            // Topic is not empty, just remove TopicAlias property if present
1137            packet = packet.remove_topic_alias();
1138        }
1139
1140        Ok(packet)
1141    }
1142
1143    // private
1144
1145    /// Initialize connection state based on client/server role
1146    ///
1147    /// Resets all connection-specific state including:
1148    /// - Publish flow control counters and limits
1149    /// - Topic alias management
1150    /// - QoS2 processing state
1151    /// - Packet ID tracking sets
1152    /// - Store requirement flag
1153    ///
1154    /// # Arguments
1155    /// * `is_client` - true for client mode, false for server mode
1156    fn initialize(&mut self, is_client: bool) {
1157        self.publish_send_max = None;
1158        self.publish_recv_max = None;
1159        self.publish_send_count = 0;
1160        self.topic_alias_send = None;
1161        self.topic_alias_recv = None;
1162        self.publish_recv.clear();
1163        self.qos2_publish_processing.clear();
1164        self.need_store = false;
1165        self.pid_suback.clear();
1166        self.pid_unsuback.clear();
1167        self.is_client = is_client;
1168    }
1169
1170    fn clear_store_related(&mut self) {
1171        self.pid_man.clear();
1172        self.pid_puback.clear();
1173        self.pid_pubrec.clear();
1174        self.pid_pubcomp.clear();
1175        self.store.clear();
1176    }
1177
1178    /// Send all stored packets for retransmission
1179    fn send_stored(&mut self) -> Vec<GenericEvent<PacketIdType>> {
1180        let mut events = Vec::new();
1181        self.store.for_each(|packet| {
1182            if packet.size() > self.maximum_packet_size_send as usize {
1183                let packet_id = packet.packet_id();
1184                self.pid_man.release_id(packet_id);
1185                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1186                return false; // Remove from store
1187            }
1188            events.push(GenericEvent::RequestSendPacket {
1189                packet: packet.clone().into(),
1190                release_packet_id_if_send_error: None,
1191            });
1192            return true; // Keep in store
1193        });
1194
1195        events
1196    }
1197
1198    /// Validate topic alias and return the associated topic name
1199    ///
1200    /// Checks if the topic alias is valid and retrieves the corresponding topic name
1201    /// from the topic alias send manager.
1202    ///
1203    /// # Arguments
1204    /// * `topic_alias_opt` - Optional topic alias value
1205    ///
1206    /// # Returns
1207    /// * `Some(topic_name)` if the topic alias is valid and found
1208    /// * `None` if the topic alias is invalid, not provided, or not found
1209    fn validate_topic_alias(&mut self, topic_alias_opt: Option<u16>) -> Option<String> {
1210        let topic_alias = topic_alias_opt?;
1211
1212        if !self.validate_topic_alias_range(topic_alias) {
1213            return None;
1214        }
1215
1216        let topic_alias_send = self.topic_alias_send.as_mut()?;
1217        // LRU updated here
1218        let topic = topic_alias_send.get(topic_alias)?;
1219
1220        Some(topic.to_string())
1221    }
1222
1223    /// Validate that topic alias is within the allowed range
1224    ///
1225    /// Checks if the topic alias value is valid according to the configured
1226    /// topic alias maximum for sending.
1227    ///
1228    /// # Arguments
1229    /// * `topic_alias` - Topic alias value to validate
1230    ///
1231    /// # Returns
1232    /// * `true` if the topic alias is within valid range
1233    /// * `false` if invalid or topic alias sending is not configured
1234    fn validate_topic_alias_range(&self, topic_alias: u16) -> bool {
1235        let topic_alias_send = match &self.topic_alias_send {
1236            Some(tas) => tas,
1237            None => {
1238                tracing::error!("topic_alias is set but topic_alias_maximum is 0");
1239                return false;
1240            }
1241        };
1242
1243        if topic_alias == 0 || topic_alias > topic_alias_send.max() {
1244            tracing::error!("topic_alias is set but out of range");
1245            return false;
1246        }
1247
1248        true
1249    }
1250
1251    /// Process v3.1.1 CONNECT packet - C++ constexpr if implementation
1252    pub(crate) fn process_send_v3_1_1_connect(
1253        &mut self,
1254        packet: v3_1_1::Connect,
1255    ) -> Vec<GenericEvent<PacketIdType>> {
1256        if self.status != ConnectionStatus::Disconnected {
1257            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1258        }
1259        if !self.validate_maximum_packet_size_send(packet.size()) {
1260            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1261        }
1262
1263        let mut events = Vec::new();
1264        self.initialize(true);
1265        self.status = ConnectionStatus::Connecting;
1266
1267        // Extract keep_alive and set pingreq_send_interval_ms if != 0
1268        let keep_alive = packet.keep_alive();
1269        if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1270            self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1271        }
1272
1273        // Handle clean_session flag
1274        if packet.clean_start() {
1275            self.clear_store_related();
1276        } else {
1277            self.need_store = true;
1278        }
1279
1280        // Clear topic alias for sending
1281        self.topic_alias_send = None;
1282
1283        events.push(GenericEvent::RequestSendPacket {
1284            packet: packet.into(),
1285            release_packet_id_if_send_error: None,
1286        });
1287        self.send_post_process(&mut events);
1288
1289        events
1290    }
1291
1292    /// Process v5.0 CONNECT packet - C++ constexpr if implementation
1293    pub(crate) fn process_send_v5_0_connect(
1294        &mut self,
1295        packet: v5_0::Connect,
1296    ) -> Vec<GenericEvent<PacketIdType>> {
1297        if !self.validate_maximum_packet_size_send(packet.size()) {
1298            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1299        }
1300        if self.status != ConnectionStatus::Disconnected {
1301            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1302        }
1303
1304        let mut events = Vec::new();
1305        self.initialize(true);
1306        self.status = ConnectionStatus::Connecting;
1307
1308        // Extract keep_alive and set pingreq_send_interval_ms if != 0
1309        let keep_alive = packet.keep_alive();
1310        if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1311            self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1312        }
1313
1314        // Handle clean_start flag
1315        if packet.clean_start() {
1316            self.clear_store_related();
1317        }
1318
1319        // Process properties
1320        for prop in packet.props() {
1321            match prop {
1322                Property::TopicAliasMaximum(val) => {
1323                    if val.val() != 0 {
1324                        self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1325                    }
1326                }
1327                Property::ReceiveMaximum(val) => {
1328                    debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1329                    self.publish_recv_max = Some(val.val());
1330                }
1331                Property::MaximumPacketSize(val) => {
1332                    debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1333                    self.maximum_packet_size_recv = val.val();
1334                }
1335                Property::SessionExpiryInterval(val) => {
1336                    if val.val() != 0 {
1337                        self.need_store = true;
1338                    }
1339                }
1340                _ => {
1341                    // Ignore other properties (equivalent to [](auto const&){} in C++)
1342                }
1343            }
1344        }
1345
1346        events.push(GenericEvent::RequestSendPacket {
1347            packet: packet.into(),
1348            release_packet_id_if_send_error: None,
1349        });
1350        self.send_post_process(&mut events);
1351
1352        events
1353    }
1354
1355    pub(crate) fn process_send_v3_1_1_connack(
1356        &mut self,
1357        packet: v3_1_1::Connack,
1358    ) -> Vec<GenericEvent<PacketIdType>> {
1359        if self.status != ConnectionStatus::Connecting {
1360            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1361        }
1362        let mut events = Vec::new();
1363        if packet.return_code() == ConnectReturnCode::Accepted {
1364            self.status = ConnectionStatus::Connected;
1365        } else {
1366            self.status = ConnectionStatus::Disconnected;
1367        }
1368
1369        events.push(GenericEvent::RequestSendPacket {
1370            packet: packet.into(),
1371            release_packet_id_if_send_error: None,
1372        });
1373        events.extend(self.send_stored());
1374        self.send_post_process(&mut events);
1375
1376        events
1377    }
1378
1379    pub(crate) fn process_send_v5_0_connack(
1380        &mut self,
1381        packet: v5_0::Connack,
1382    ) -> Vec<GenericEvent<PacketIdType>> {
1383        if !self.validate_maximum_packet_size_send(packet.size()) {
1384            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1385        }
1386        if self.status != ConnectionStatus::Connecting {
1387            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1388        }
1389
1390        let mut events = Vec::new();
1391
1392        if packet.reason_code() == ConnectReasonCode::Success {
1393            self.status = ConnectionStatus::Connected;
1394
1395            // Process properties
1396            for prop in packet.props() {
1397                match prop {
1398                    Property::TopicAliasMaximum(val) => {
1399                        if val.val() != 0 {
1400                            self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1401                        }
1402                    }
1403                    Property::ReceiveMaximum(val) => {
1404                        debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1405                        self.publish_recv_max = Some(val.val());
1406                    }
1407                    Property::MaximumPacketSize(val) => {
1408                        debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1409                        self.maximum_packet_size_recv = val.val();
1410                    }
1411                    _ => {
1412                        // Ignore other properties
1413                    }
1414                }
1415            }
1416        } else {
1417            self.status = ConnectionStatus::Disconnected;
1418            self.cancel_timers(&mut events);
1419        }
1420
1421        events.push(GenericEvent::RequestSendPacket {
1422            packet: packet.into(),
1423            release_packet_id_if_send_error: None,
1424        });
1425        events.extend(self.send_stored());
1426        self.send_post_process(&mut events);
1427
1428        events
1429    }
1430
1431    pub(crate) fn process_send_v3_1_1_publish(
1432        &mut self,
1433        packet: v3_1_1::GenericPublish<PacketIdType>,
1434    ) -> Vec<GenericEvent<PacketIdType>> {
1435        let mut events = Vec::new();
1436        let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1437
1438        if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1439            // Register packet ID for QoS 1 or 2
1440            let packet_id = packet.packet_id().unwrap();
1441            if self.status != ConnectionStatus::Connected
1442                && !self.need_store
1443                && !self.offline_publish
1444            {
1445                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1446                if self.pid_man.is_used_id(packet_id) {
1447                    self.pid_man.release_id(packet_id);
1448                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1449                }
1450                return events;
1451            }
1452            if !self.pid_man.is_used_id(packet_id) {
1453                tracing::error!("packet_id {packet_id} must be acquired or registered");
1454                events.push(GenericEvent::NotifyError(
1455                    MqttError::PacketIdentifierInvalid,
1456                ));
1457                return events;
1458            }
1459            if self.need_store
1460                && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1461            {
1462                let store_packet = packet.clone().set_dup(true);
1463                self.store.add(store_packet.try_into().unwrap()).unwrap();
1464            } else {
1465                release_packet_id_if_send_error = Some(packet_id);
1466            }
1467            if packet.qos() == Qos::ExactlyOnce {
1468                self.qos2_publish_processing.insert(packet_id);
1469                self.pid_pubrec.insert(packet_id);
1470            } else {
1471                self.pid_puback.insert(packet_id);
1472            }
1473        } else if self.status != ConnectionStatus::Connected {
1474            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1475            return events;
1476        }
1477
1478        if self.status == ConnectionStatus::Connected {
1479            events.push(GenericEvent::RequestSendPacket {
1480                packet: packet.into(),
1481                release_packet_id_if_send_error,
1482            });
1483        }
1484        self.send_post_process(&mut events);
1485
1486        events
1487    }
1488
1489    pub(crate) fn process_send_v5_0_publish(
1490        &mut self,
1491        mut packet: v5_0::GenericPublish<PacketIdType>,
1492    ) -> Vec<GenericEvent<PacketIdType>> {
1493        if !self.validate_maximum_packet_size_send(packet.size()) {
1494            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1495        }
1496
1497        let mut events = Vec::new();
1498        let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1499        let mut topic_alias_validated = false;
1500        if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1501            let packet_id = packet.packet_id().unwrap();
1502            if self.status != ConnectionStatus::Connected
1503                && !self.need_store
1504                && !self.offline_publish
1505            {
1506                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1507                if self.pid_man.is_used_id(packet_id) {
1508                    self.pid_man.release_id(packet_id);
1509                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1510                }
1511                return events;
1512            }
1513
1514            // Extract topic_name from TopicAlias and remove TopicAlias property, then store it
1515            if !self.pid_man.is_used_id(packet_id) {
1516                tracing::error!("packet_id {packet_id} must be acquired or registered");
1517                events.push(GenericEvent::NotifyError(
1518                    MqttError::PacketIdentifierInvalid,
1519                ));
1520                return events;
1521            }
1522
1523            if self.need_store
1524                && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1525            {
1526                let ta_opt = Self::get_topic_alias_from_props_opt(packet.props());
1527                if packet.topic_name().is_empty() {
1528                    // Topic name is empty, must validate topic alias
1529                    let topic_opt = self.validate_topic_alias(ta_opt);
1530                    if topic_opt.is_none() {
1531                        events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1532                        if self.pid_man.is_used_id(packet_id) {
1533                            self.pid_man.release_id(packet_id);
1534                            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1535                        }
1536                        return events;
1537                    }
1538                    topic_alias_validated = true;
1539                    let store_packet = packet
1540                        .clone()
1541                        .remove_topic_alias_add_topic(topic_opt.unwrap())
1542                        .unwrap()
1543                        .set_dup(true);
1544                    // TBD validate_maximum_packet_size(store_packet.size());
1545                    self.store.add(store_packet.try_into().unwrap()).unwrap();
1546                } else {
1547                    // Topic name is not empty, remove topic alias if present
1548                    let store_packet = packet.clone().remove_topic_alias().set_dup(true);
1549                    self.store.add(store_packet.try_into().unwrap()).unwrap();
1550                }
1551            } else {
1552                release_packet_id_if_send_error = Some(packet_id);
1553            }
1554            if packet.qos() == Qos::ExactlyOnce {
1555                self.qos2_publish_processing.insert(packet_id);
1556                self.pid_pubrec.insert(packet_id);
1557            } else {
1558                self.pid_puback.insert(packet_id);
1559            }
1560        } else if self.status != ConnectionStatus::Connected {
1561            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1562            return events;
1563        }
1564
1565        let packet_id_opt = packet.packet_id();
1566        let ta_opt = Self::get_topic_alias_from_props_opt(packet.props());
1567        if packet.topic_name().is_empty() {
1568            // process manually provided TopicAlias
1569            if !topic_alias_validated && self.validate_topic_alias(ta_opt).is_none() {
1570                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1571                if let Some(packet_id) = packet_id_opt {
1572                    if self.pid_man.is_used_id(packet_id) {
1573                        self.pid_man.release_id(packet_id);
1574                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1575                    }
1576                }
1577                return events;
1578            }
1579        } else {
1580            if let Some(ta) = ta_opt {
1581                // Topic alias is provided
1582                if self.validate_topic_alias_range(ta) {
1583                    tracing::trace!(
1584                        "topic alias: {} - {} is registered.",
1585                        packet.topic_name(),
1586                        ta
1587                    );
1588                    if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1589                        topic_alias_send.insert_or_update(packet.topic_name(), ta);
1590                    }
1591                } else {
1592                    events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1593                    if let Some(packet_id) = packet_id_opt {
1594                        if self.pid_man.is_used_id(packet_id) {
1595                            self.pid_man.release_id(packet_id);
1596                            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1597                        }
1598                    }
1599                    return events;
1600                }
1601            } else if self.status == ConnectionStatus::Connected {
1602                // process auto applying TopicAlias if the option is enabled
1603                if self.auto_map_topic_alias_send {
1604                    if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1605                        if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name())
1606                        {
1607                            tracing::trace!(
1608                                "topic alias: {} - {} is found.",
1609                                packet.topic_name(),
1610                                found_ta
1611                            );
1612                            packet = packet.remove_topic_add_topic_alias(found_ta);
1613                        } else {
1614                            let lru_ta = topic_alias_send.get_lru_alias();
1615                            topic_alias_send.insert_or_update(packet.topic_name(), lru_ta);
1616                            packet = packet.remove_topic_add_topic_alias(lru_ta);
1617                        }
1618                    }
1619                } else if self.auto_replace_topic_alias_send {
1620                    if let Some(ref topic_alias_send) = self.topic_alias_send {
1621                        if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name())
1622                        {
1623                            tracing::trace!(
1624                                "topic alias: {} - {} is found.",
1625                                packet.topic_name(),
1626                                found_ta
1627                            );
1628                            packet = packet.remove_topic_add_topic_alias(found_ta);
1629                        }
1630                    }
1631                }
1632            }
1633        }
1634
1635        // Check receive_maximum for sending (QoS 1 and 2 packets)
1636        if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1637            if let Some(max) = self.publish_send_max {
1638                if self.publish_send_count == max {
1639                    events.push(GenericEvent::NotifyError(MqttError::ReceiveMaximumExceeded));
1640                    if let Some(packet_id) = packet_id_opt {
1641                        if self.pid_man.is_used_id(packet_id) {
1642                            self.pid_man.release_id(packet_id);
1643                            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1644                        }
1645                    }
1646                    return events;
1647                }
1648                self.publish_send_count += 1;
1649            }
1650        }
1651
1652        if self.status == ConnectionStatus::Connected {
1653            events.push(GenericEvent::RequestSendPacket {
1654                packet: packet.into(),
1655                release_packet_id_if_send_error,
1656            });
1657        }
1658        self.send_post_process(&mut events);
1659
1660        events
1661    }
1662
1663    pub(crate) fn process_send_v3_1_1_puback(
1664        &mut self,
1665        packet: v3_1_1::GenericPuback<PacketIdType>,
1666    ) -> Vec<GenericEvent<PacketIdType>> {
1667        if self.status != ConnectionStatus::Connected {
1668            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1669        }
1670        let mut events = Vec::new();
1671
1672        events.push(GenericEvent::RequestSendPacket {
1673            packet: packet.into(),
1674            release_packet_id_if_send_error: None,
1675        });
1676        self.send_post_process(&mut events);
1677
1678        events
1679    }
1680
1681    pub(crate) fn process_send_v5_0_puback(
1682        &mut self,
1683        packet: v5_0::GenericPuback<PacketIdType>,
1684    ) -> Vec<GenericEvent<PacketIdType>> {
1685        if !self.validate_maximum_packet_size_send(packet.size()) {
1686            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1687        }
1688        if self.status != ConnectionStatus::Connected {
1689            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1690        }
1691
1692        let mut events = Vec::new();
1693        self.publish_recv.remove(&packet.packet_id());
1694
1695        events.push(GenericEvent::RequestSendPacket {
1696            packet: packet.into(),
1697            release_packet_id_if_send_error: None,
1698        });
1699        self.send_post_process(&mut events);
1700
1701        events
1702    }
1703
1704    pub(crate) fn process_send_v3_1_1_pubrec(
1705        &mut self,
1706        packet: v3_1_1::GenericPubrec<PacketIdType>,
1707    ) -> Vec<GenericEvent<PacketIdType>> {
1708        if self.status != ConnectionStatus::Connected {
1709            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1710        }
1711        let mut events = Vec::new();
1712
1713        events.push(GenericEvent::RequestSendPacket {
1714            packet: packet.into(),
1715            release_packet_id_if_send_error: None,
1716        });
1717        self.send_post_process(&mut events);
1718
1719        events
1720    }
1721
1722    pub(crate) fn process_send_v5_0_pubrec(
1723        &mut self,
1724        packet: v5_0::GenericPubrec<PacketIdType>,
1725    ) -> Vec<GenericEvent<PacketIdType>> {
1726        if !self.validate_maximum_packet_size_send(packet.size()) {
1727            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1728        }
1729        if self.status != ConnectionStatus::Connected {
1730            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1731        }
1732
1733        let mut events = Vec::new();
1734        let packet_id = packet.packet_id();
1735
1736        if let Some(rc) = packet.reason_code() {
1737            if rc.is_failure() {
1738                self.publish_recv.remove(&packet_id);
1739                self.qos2_publish_handled.remove(&packet_id);
1740            }
1741        }
1742
1743        events.push(GenericEvent::RequestSendPacket {
1744            packet: packet.into(),
1745            release_packet_id_if_send_error: None,
1746        });
1747        self.send_post_process(&mut events);
1748
1749        events
1750    }
1751
1752    pub(crate) fn process_send_v3_1_1_pubrel(
1753        &mut self,
1754        packet: v3_1_1::GenericPubrel<PacketIdType>,
1755    ) -> Vec<GenericEvent<PacketIdType>> {
1756        if self.status != ConnectionStatus::Connected && !self.need_store {
1757            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1758        }
1759        let mut events = Vec::new();
1760        let packet_id = packet.packet_id();
1761        if !self.pid_man.is_used_id(packet_id) {
1762            tracing::error!("packet_id {packet_id} must be acquired or registered");
1763            events.push(GenericEvent::NotifyError(
1764                MqttError::PacketIdentifierInvalid,
1765            ));
1766            return events;
1767        }
1768        if self.need_store {
1769            self.store.add(packet.clone().try_into().unwrap()).unwrap();
1770        }
1771
1772        if self.status == ConnectionStatus::Connected {
1773            self.pid_pubcomp.insert(packet_id);
1774            events.push(GenericEvent::RequestSendPacket {
1775                packet: packet.into(),
1776                release_packet_id_if_send_error: None,
1777            });
1778        }
1779        self.send_post_process(&mut events);
1780
1781        events
1782    }
1783
1784    pub(crate) fn process_send_v5_0_pubrel(
1785        &mut self,
1786        packet: v5_0::GenericPubrel<PacketIdType>,
1787    ) -> Vec<GenericEvent<PacketIdType>> {
1788        if !self.validate_maximum_packet_size_send(packet.size()) {
1789            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1790        }
1791        if self.status != ConnectionStatus::Connected && !self.need_store {
1792            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1793        }
1794
1795        let mut events = Vec::new();
1796        let packet_id = packet.packet_id();
1797        if !self.pid_man.is_used_id(packet_id) {
1798            tracing::error!("packet_id {packet_id} must be acquired or registered");
1799            events.push(GenericEvent::NotifyError(
1800                MqttError::PacketIdentifierInvalid,
1801            ));
1802            return events;
1803        }
1804        if self.need_store {
1805            self.store.add(packet.clone().try_into().unwrap()).unwrap();
1806        }
1807
1808        if self.status == ConnectionStatus::Connected {
1809            self.pid_pubcomp.insert(packet_id);
1810            events.push(GenericEvent::RequestSendPacket {
1811                packet: packet.into(),
1812                release_packet_id_if_send_error: None,
1813            });
1814        }
1815        self.send_post_process(&mut events);
1816
1817        events
1818    }
1819
1820    pub(crate) fn process_send_v3_1_1_pubcomp(
1821        &mut self,
1822        packet: v3_1_1::GenericPubcomp<PacketIdType>,
1823    ) -> Vec<GenericEvent<PacketIdType>> {
1824        if self.status != ConnectionStatus::Connected {
1825            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1826        }
1827        let mut events = Vec::new();
1828
1829        events.push(GenericEvent::RequestSendPacket {
1830            packet: packet.into(),
1831            release_packet_id_if_send_error: None,
1832        });
1833        self.send_post_process(&mut events);
1834
1835        events
1836    }
1837
1838    pub(crate) fn process_send_v5_0_pubcomp(
1839        &mut self,
1840        packet: v5_0::GenericPubcomp<PacketIdType>,
1841    ) -> Vec<GenericEvent<PacketIdType>> {
1842        if !self.validate_maximum_packet_size_send(packet.size()) {
1843            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1844        }
1845        if self.status != ConnectionStatus::Connected {
1846            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1847        }
1848
1849        let mut events = Vec::new();
1850        self.publish_recv.remove(&packet.packet_id());
1851
1852        events.push(GenericEvent::RequestSendPacket {
1853            packet: packet.into(),
1854            release_packet_id_if_send_error: None,
1855        });
1856        self.send_post_process(&mut events);
1857
1858        events
1859    }
1860
1861    pub(crate) fn process_send_v3_1_1_subscribe(
1862        &mut self,
1863        packet: v3_1_1::GenericSubscribe<PacketIdType>,
1864    ) -> Vec<GenericEvent<PacketIdType>> {
1865        let mut events = Vec::new();
1866        let packet_id = packet.packet_id();
1867        if self.status != ConnectionStatus::Connected {
1868            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1869            if self.pid_man.is_used_id(packet_id) {
1870                self.pid_man.release_id(packet_id);
1871                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1872            }
1873            return events;
1874        }
1875        if !self.pid_man.is_used_id(packet_id) {
1876            tracing::error!("packet_id {packet_id} must be acquired or registered");
1877            events.push(GenericEvent::NotifyError(
1878                MqttError::PacketIdentifierInvalid,
1879            ));
1880            return events;
1881        }
1882        self.pid_suback.insert(packet_id);
1883
1884        events.push(GenericEvent::RequestSendPacket {
1885            packet: packet.into(),
1886            release_packet_id_if_send_error: Some(packet_id),
1887        });
1888        self.send_post_process(&mut events);
1889
1890        events
1891    }
1892
1893    pub(crate) fn process_send_v5_0_subscribe(
1894        &mut self,
1895        packet: v5_0::GenericSubscribe<PacketIdType>,
1896    ) -> Vec<GenericEvent<PacketIdType>> {
1897        if !self.validate_maximum_packet_size_send(packet.size()) {
1898            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1899        }
1900
1901        let mut events = Vec::new();
1902        let packet_id = packet.packet_id();
1903        if self.status != ConnectionStatus::Connected {
1904            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1905            if self.pid_man.is_used_id(packet_id) {
1906                self.pid_man.release_id(packet_id);
1907                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1908            }
1909            return events;
1910        }
1911        if !self.pid_man.is_used_id(packet_id) {
1912            tracing::error!("packet_id {packet_id} must be acquired or registered");
1913            events.push(GenericEvent::NotifyError(
1914                MqttError::PacketIdentifierInvalid,
1915            ));
1916            return events;
1917        }
1918        self.pid_suback.insert(packet_id);
1919
1920        events.push(GenericEvent::RequestSendPacket {
1921            packet: packet.into(),
1922            release_packet_id_if_send_error: Some(packet_id),
1923        });
1924        self.send_post_process(&mut events);
1925
1926        events
1927    }
1928
1929    pub(crate) fn process_send_v3_1_1_suback(
1930        &mut self,
1931        packet: v3_1_1::GenericSuback<PacketIdType>,
1932    ) -> Vec<GenericEvent<PacketIdType>> {
1933        if self.status != ConnectionStatus::Connected {
1934            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1935        }
1936        let mut events = Vec::new();
1937        events.push(GenericEvent::RequestSendPacket {
1938            packet: packet.into(),
1939            release_packet_id_if_send_error: None,
1940        });
1941        self.send_post_process(&mut events);
1942
1943        events
1944    }
1945
1946    pub(crate) fn process_send_v5_0_suback(
1947        &mut self,
1948        packet: v5_0::GenericSuback<PacketIdType>,
1949    ) -> Vec<GenericEvent<PacketIdType>> {
1950        if !self.validate_maximum_packet_size_send(packet.size()) {
1951            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1952        }
1953        if self.status != ConnectionStatus::Connected {
1954            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1955        }
1956
1957        let mut events = Vec::new();
1958        events.push(GenericEvent::RequestSendPacket {
1959            packet: packet.into(),
1960            release_packet_id_if_send_error: None,
1961        });
1962        self.send_post_process(&mut events);
1963
1964        events
1965    }
1966
1967    pub(crate) fn process_send_v3_1_1_unsubscribe(
1968        &mut self,
1969        packet: v3_1_1::GenericUnsubscribe<PacketIdType>,
1970    ) -> Vec<GenericEvent<PacketIdType>> {
1971        let mut events = Vec::new();
1972        let packet_id = packet.packet_id();
1973        if self.status != ConnectionStatus::Connected {
1974            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1975            if self.pid_man.is_used_id(packet_id) {
1976                self.pid_man.release_id(packet_id);
1977                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1978            }
1979            return events;
1980        }
1981        if !self.pid_man.is_used_id(packet_id) {
1982            tracing::error!("packet_id {packet_id} must be acquired or registered");
1983            events.push(GenericEvent::NotifyError(
1984                MqttError::PacketIdentifierInvalid,
1985            ));
1986            return events;
1987        }
1988        self.pid_unsuback.insert(packet_id);
1989
1990        events.push(GenericEvent::RequestSendPacket {
1991            packet: packet.into(),
1992            release_packet_id_if_send_error: Some(packet_id),
1993        });
1994        self.send_post_process(&mut events);
1995
1996        events
1997    }
1998
1999    pub(crate) fn process_send_v5_0_unsubscribe(
2000        &mut self,
2001        packet: v5_0::GenericUnsubscribe<PacketIdType>,
2002    ) -> Vec<GenericEvent<PacketIdType>> {
2003        if !self.validate_maximum_packet_size_send(packet.size()) {
2004            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2005        }
2006
2007        let mut events = Vec::new();
2008        let packet_id = packet.packet_id();
2009        if self.status != ConnectionStatus::Connected {
2010            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
2011            if self.pid_man.is_used_id(packet_id) {
2012                self.pid_man.release_id(packet_id);
2013                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2014            }
2015            return events;
2016        }
2017        if !self.pid_man.is_used_id(packet_id) {
2018            tracing::error!("packet_id {packet_id} must be acquired or registered");
2019            events.push(GenericEvent::NotifyError(
2020                MqttError::PacketIdentifierInvalid,
2021            ));
2022            return events;
2023        }
2024        self.pid_unsuback.insert(packet_id);
2025
2026        events.push(GenericEvent::RequestSendPacket {
2027            packet: packet.into(),
2028            release_packet_id_if_send_error: Some(packet_id),
2029        });
2030        self.send_post_process(&mut events);
2031
2032        events
2033    }
2034
2035    pub(crate) fn process_send_v3_1_1_unsuback(
2036        &mut self,
2037        packet: v3_1_1::GenericUnsuback<PacketIdType>,
2038    ) -> Vec<GenericEvent<PacketIdType>> {
2039        if self.status != ConnectionStatus::Connected {
2040            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2041        }
2042        let mut events = Vec::new();
2043        events.push(GenericEvent::RequestSendPacket {
2044            packet: packet.into(),
2045            release_packet_id_if_send_error: None,
2046        });
2047        self.send_post_process(&mut events);
2048
2049        events
2050    }
2051
2052    pub(crate) fn process_send_v5_0_unsuback(
2053        &mut self,
2054        packet: v5_0::GenericUnsuback<PacketIdType>,
2055    ) -> Vec<GenericEvent<PacketIdType>> {
2056        if !self.validate_maximum_packet_size_send(packet.size()) {
2057            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2058        }
2059        if self.status != ConnectionStatus::Connected {
2060            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2061        }
2062
2063        let mut events = Vec::new();
2064        events.push(GenericEvent::RequestSendPacket {
2065            packet: packet.into(),
2066            release_packet_id_if_send_error: None,
2067        });
2068        self.send_post_process(&mut events);
2069
2070        events
2071    }
2072
2073    pub(crate) fn process_send_v3_1_1_pingreq(
2074        &mut self,
2075        packet: v3_1_1::Pingreq,
2076    ) -> Vec<GenericEvent<PacketIdType>> {
2077        if self.status != ConnectionStatus::Connected {
2078            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2079        }
2080        let mut events = Vec::new();
2081        events.push(GenericEvent::RequestSendPacket {
2082            packet: packet.into(),
2083            release_packet_id_if_send_error: None,
2084        });
2085        if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2086            self.pingreq_send_set = true;
2087            events.push(GenericEvent::RequestTimerReset {
2088                kind: TimerKind::PingrespRecv,
2089                duration_ms: timeout_ms,
2090            });
2091        }
2092        self.send_post_process(&mut events);
2093
2094        events
2095    }
2096
2097    pub(crate) fn process_send_v5_0_pingreq(
2098        &mut self,
2099        packet: v5_0::Pingreq,
2100    ) -> Vec<GenericEvent<PacketIdType>> {
2101        if !self.validate_maximum_packet_size_send(packet.size()) {
2102            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2103        }
2104        if self.status != ConnectionStatus::Connected {
2105            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2106        }
2107
2108        let mut events = Vec::new();
2109        events.push(GenericEvent::RequestSendPacket {
2110            packet: packet.into(),
2111            release_packet_id_if_send_error: None,
2112        });
2113        if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2114            self.pingreq_send_set = true;
2115            events.push(GenericEvent::RequestTimerReset {
2116                kind: TimerKind::PingrespRecv,
2117                duration_ms: timeout_ms,
2118            });
2119        }
2120        self.send_post_process(&mut events);
2121
2122        events
2123    }
2124
2125    pub(crate) fn process_send_v3_1_1_pingresp(
2126        &mut self,
2127        packet: v3_1_1::Pingresp,
2128    ) -> Vec<GenericEvent<PacketIdType>> {
2129        if self.status != ConnectionStatus::Connected {
2130            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2131        }
2132        let mut events = Vec::new();
2133        events.push(GenericEvent::RequestSendPacket {
2134            packet: packet.into(),
2135            release_packet_id_if_send_error: None,
2136        });
2137        self.send_post_process(&mut events);
2138
2139        events
2140    }
2141
2142    pub(crate) fn process_send_v5_0_pingresp(
2143        &mut self,
2144        packet: v5_0::Pingresp,
2145    ) -> Vec<GenericEvent<PacketIdType>> {
2146        if !self.validate_maximum_packet_size_send(packet.size()) {
2147            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2148        }
2149        if self.status != ConnectionStatus::Connected {
2150            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2151        }
2152
2153        let mut events = Vec::new();
2154        events.push(GenericEvent::RequestSendPacket {
2155            packet: packet.into(),
2156            release_packet_id_if_send_error: None,
2157        });
2158        self.send_post_process(&mut events);
2159
2160        events
2161    }
2162
2163    pub(crate) fn process_send_v3_1_1_disconnect(
2164        &mut self,
2165        packet: v3_1_1::Disconnect,
2166    ) -> Vec<GenericEvent<PacketIdType>> {
2167        if self.status != ConnectionStatus::Connected {
2168            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2169        }
2170        let mut events = Vec::new();
2171        self.status = ConnectionStatus::Disconnected;
2172        self.cancel_timers(&mut events);
2173        events.push(GenericEvent::RequestSendPacket {
2174            packet: packet.into(),
2175            release_packet_id_if_send_error: None,
2176        });
2177        events.push(GenericEvent::RequestClose);
2178
2179        events
2180    }
2181
2182    pub(crate) fn process_send_v5_0_disconnect(
2183        &mut self,
2184        packet: v5_0::Disconnect,
2185    ) -> Vec<GenericEvent<PacketIdType>> {
2186        if !self.validate_maximum_packet_size_send(packet.size()) {
2187            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2188        }
2189        if self.status != ConnectionStatus::Connected {
2190            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2191        }
2192
2193        let mut events = Vec::new();
2194        self.status = ConnectionStatus::Disconnected;
2195        self.cancel_timers(&mut events);
2196        events.push(GenericEvent::RequestSendPacket {
2197            packet: packet.into(),
2198            release_packet_id_if_send_error: None,
2199        });
2200        events.push(GenericEvent::RequestClose);
2201
2202        events
2203    }
2204
2205    pub(crate) fn process_send_v5_0_auth(
2206        &mut self,
2207        packet: v5_0::Auth,
2208    ) -> Vec<GenericEvent<PacketIdType>> {
2209        if !self.validate_maximum_packet_size_send(packet.size()) {
2210            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2211        }
2212        if self.status == ConnectionStatus::Disconnected {
2213            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2214        }
2215
2216        let mut events = Vec::new();
2217        events.push(GenericEvent::RequestSendPacket {
2218            packet: packet.into(),
2219            release_packet_id_if_send_error: None,
2220        });
2221        self.send_post_process(&mut events);
2222
2223        events
2224    }
2225
2226    fn send_post_process(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
2227        if self.is_client {
2228            if let Some(timeout_ms) = self.pingreq_send_interval_ms {
2229                self.pingreq_send_set = true;
2230                events.push(GenericEvent::RequestTimerReset {
2231                    kind: TimerKind::PingreqSend,
2232                    duration_ms: timeout_ms,
2233                });
2234            }
2235        }
2236    }
2237
2238    fn validate_maximum_packet_size_send(&self, size: usize) -> bool {
2239        if size > self.maximum_packet_size_send as usize {
2240            tracing::error!("packet size over maximum_packet_size for sending");
2241            return false;
2242        }
2243        true
2244    }
2245
2246    fn process_recv_packet(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
2247        let mut events = Vec::new();
2248
2249        // packet size limit validation (v3.1.1 is always satisfied)
2250        let total_size = remaining_length_to_total_size(raw_packet.remaining_length());
2251        if total_size > self.maximum_packet_size_recv {
2252            // This happens only when protocol version is V5.0.
2253            // On v3.1.1, the maximum packet size is always 268435455 (2^28 - 1).
2254            // If the packet size is over 268435455, feed() returns an error.
2255            // maximum_packet_size_recv is set by sending CONNECT or CONNACK packet.
2256            // So DISCONNECT packet is the right choice to notify the error.
2257            let disconnect_packet = v5_0::Disconnect::builder()
2258                .reason_code(DisconnectReasonCode::PacketTooLarge)
2259                .build()
2260                .unwrap();
2261            // Send disconnect packet directly without generic constraints
2262            events.extend(self.process_send_v5_0_disconnect(disconnect_packet));
2263            events.push(GenericEvent::NotifyError(MqttError::PacketTooLarge));
2264            return events;
2265        }
2266
2267        let packet_type = raw_packet.packet_type();
2268        let _flags = raw_packet.flags();
2269        match self.protocol_version {
2270            Version::V3_1_1 => {
2271                match packet_type {
2272                    1 => {
2273                        // CONNECT
2274                        events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2275                    }
2276                    2 => {
2277                        // CONNACK
2278                        events.extend(self.process_recv_v3_1_1_connack(raw_packet));
2279                    }
2280                    3 => {
2281                        // PUBLISH
2282                        events.extend(self.process_recv_v3_1_1_publish(raw_packet));
2283                    }
2284                    4 => {
2285                        // PUBACK
2286                        events.extend(self.process_recv_v3_1_1_puback(raw_packet));
2287                    }
2288                    5 => {
2289                        // PUBREC
2290                        events.extend(self.process_recv_v3_1_1_pubrec(raw_packet));
2291                    }
2292                    6 => {
2293                        // PUBREL
2294                        events.extend(self.process_recv_v3_1_1_pubrel(raw_packet));
2295                    }
2296                    7 => {
2297                        // PUBCOMP
2298                        events.extend(self.process_recv_v3_1_1_pubcomp(raw_packet));
2299                    }
2300                    8 => {
2301                        // SUBSCRIBE
2302                        events.extend(self.process_recv_v3_1_1_subscribe(raw_packet));
2303                    }
2304                    9 => {
2305                        // SUBACK
2306                        events.extend(self.process_recv_v3_1_1_suback(raw_packet));
2307                    }
2308                    10 => {
2309                        // UNSUBSCRIBE
2310                        events.extend(self.process_recv_v3_1_1_unsubscribe(raw_packet));
2311                    }
2312                    11 => {
2313                        // UNSUBACK
2314                        events.extend(self.process_recv_v3_1_1_unsuback(raw_packet));
2315                    }
2316                    12 => {
2317                        // PINGREQ
2318                        events.extend(self.process_recv_v3_1_1_pingreq(raw_packet));
2319                    }
2320                    13 => {
2321                        // PINGRESP
2322                        events.extend(self.process_recv_v3_1_1_pingresp(raw_packet));
2323                    }
2324                    14 => {
2325                        // DISCONNECT
2326                        events.extend(self.process_recv_v3_1_1_disconnect(raw_packet));
2327                    }
2328                    // invalid packet type
2329                    _ => {
2330                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2331                    }
2332                }
2333            }
2334            Version::V5_0 => {
2335                match packet_type {
2336                    1 => {
2337                        // CONNECT
2338                        events.extend(self.process_recv_v5_0_connect(raw_packet));
2339                    }
2340                    2 => {
2341                        // CONNACK
2342                        events.extend(self.process_recv_v5_0_connack(raw_packet));
2343                    }
2344                    3 => {
2345                        // PUBLISH
2346                        events.extend(self.process_recv_v5_0_publish(raw_packet));
2347                    }
2348                    4 => {
2349                        // PUBACK
2350                        events.extend(self.process_recv_v5_0_puback(raw_packet));
2351                    }
2352                    5 => {
2353                        // PUBREC
2354                        events.extend(self.process_recv_v5_0_pubrec(raw_packet));
2355                    }
2356                    6 => {
2357                        // PUBREL
2358                        events.extend(self.process_recv_v5_0_pubrel(raw_packet));
2359                    }
2360                    7 => {
2361                        // PUBCOMP
2362                        events.extend(self.process_recv_v5_0_pubcomp(raw_packet));
2363                    }
2364                    8 => {
2365                        // SUBSCRIBE
2366                        events.extend(self.process_recv_v5_0_subscribe(raw_packet));
2367                    }
2368                    9 => {
2369                        // SUBACK
2370                        events.extend(self.process_recv_v5_0_suback(raw_packet));
2371                    }
2372                    10 => {
2373                        // UNSUBSCRIBE
2374                        events.extend(self.process_recv_v5_0_unsubscribe(raw_packet));
2375                    }
2376                    11 => {
2377                        // UNSUBACK
2378                        events.extend(self.process_recv_v5_0_unsuback(raw_packet));
2379                    }
2380                    12 => {
2381                        // PINGREQ
2382                        events.extend(self.process_recv_v5_0_pingreq(raw_packet));
2383                    }
2384                    13 => {
2385                        // PINGRESP
2386                        events.extend(self.process_recv_v5_0_pingresp(raw_packet));
2387                    }
2388                    14 => {
2389                        // DISCONNECT
2390                        events.extend(self.process_recv_v5_0_disconnect(raw_packet));
2391                    }
2392                    15 => {
2393                        // AUTH
2394                        events.extend(self.process_recv_v5_0_auth(raw_packet));
2395                    }
2396                    // invalid packet type
2397                    _ => {
2398                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2399                    }
2400                }
2401            }
2402            Version::Undetermined => {
2403                match packet_type {
2404                    1 => {
2405                        // CONNECT
2406                        if raw_packet.remaining_length() < 7 {
2407                            events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2408                            return events;
2409                        }
2410                        match raw_packet.data_as_slice()[6] {
2411                            // Protocol Version
2412                            4 => {
2413                                self.protocol_version = Version::V3_1_1;
2414                                events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2415                            }
2416                            5 => {
2417                                self.protocol_version = Version::V5_0;
2418                                events.extend(self.process_recv_v5_0_connect(raw_packet));
2419                            }
2420                            _ => {
2421                                events.push(GenericEvent::NotifyError(
2422                                    MqttError::UnsupportedProtocolVersion,
2423                                ));
2424                            }
2425                        }
2426                    }
2427                    _ => {
2428                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2429                    }
2430                }
2431            }
2432        }
2433
2434        events
2435    }
2436
2437    fn process_recv_v3_1_1_connect(
2438        &mut self,
2439        raw_packet: RawPacket,
2440    ) -> Vec<GenericEvent<PacketIdType>> {
2441        let mut events = Vec::new();
2442        match v3_1_1::Connect::parse(raw_packet.data_as_slice()) {
2443            Ok((packet, _)) => {
2444                if self.status != ConnectionStatus::Disconnected {
2445                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2446                    return events;
2447                }
2448                self.initialize(false);
2449                self.status = ConnectionStatus::Connecting;
2450                if packet.keep_alive() > 0 {
2451                    self.pingreq_recv_timeout_ms =
2452                        Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2453                }
2454                if packet.clean_session() {
2455                    self.clear_store_related();
2456                } else {
2457                    self.need_store = true;
2458                }
2459                events.extend(self.refresh_pingreq_recv());
2460                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2461            }
2462            Err(e) => {
2463                if self.status == ConnectionStatus::Disconnected {
2464                    self.status = ConnectionStatus::Connecting;
2465                    let rc = match e {
2466                        MqttError::ClientIdentifierNotValid => {
2467                            ConnectReturnCode::IdentifierRejected
2468                        }
2469                        MqttError::BadUserNameOrPassword => {
2470                            ConnectReturnCode::BadUserNameOrPassword
2471                        }
2472                        MqttError::UnsupportedProtocolVersion => {
2473                            ConnectReturnCode::UnacceptableProtocolVersion
2474                        }
2475                        _ => ConnectReturnCode::NotAuthorized, // TBD close could be better
2476                    };
2477                    let connack = v3_1_1::Connack::builder().return_code(rc).build().unwrap();
2478                    let connack_events = self.process_send_v3_1_1_connack(connack);
2479                    events.extend(connack_events);
2480                } else {
2481                    events.push(GenericEvent::RequestClose);
2482                }
2483                events.push(GenericEvent::NotifyError(e));
2484            }
2485        }
2486
2487        events
2488    }
2489
2490    fn process_recv_v5_0_connect(
2491        &mut self,
2492        raw_packet: RawPacket,
2493    ) -> Vec<GenericEvent<PacketIdType>> {
2494        let mut events = Vec::new();
2495        match v5_0::Connect::parse(raw_packet.data_as_slice()) {
2496            Ok((packet, _)) => {
2497                if self.status != ConnectionStatus::Disconnected {
2498                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2499                    return events;
2500                }
2501                self.initialize(false);
2502                self.status = ConnectionStatus::Connecting;
2503                if packet.keep_alive() > 0 {
2504                    self.pingreq_recv_timeout_ms =
2505                        Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2506                }
2507                if packet.clean_start() {
2508                    self.clear_store_related();
2509                }
2510                packet.props().iter().for_each(|prop| match prop {
2511                    Property::TopicAliasMaximum(p) => {
2512                        self.topic_alias_send = Some(TopicAliasSend::new(p.val()));
2513                    }
2514                    Property::ReceiveMaximum(p) => {
2515                        self.publish_send_max = Some(p.val());
2516                    }
2517                    Property::MaximumPacketSize(p) => {
2518                        self.maximum_packet_size_send = p.val();
2519                    }
2520                    Property::SessionExpiryInterval(p) => {
2521                        if p.val() != 0 {
2522                            self.need_store = true;
2523                        }
2524                    }
2525                    _ => {}
2526                });
2527                events.extend(self.refresh_pingreq_recv());
2528                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2529            }
2530            Err(e) => {
2531                if self.status == ConnectionStatus::Disconnected {
2532                    self.status = ConnectionStatus::Connecting;
2533                    let rc = match e {
2534                        MqttError::ClientIdentifierNotValid => {
2535                            ConnectReasonCode::ClientIdentifierNotValid
2536                        }
2537                        MqttError::BadUserNameOrPassword => {
2538                            ConnectReasonCode::BadAuthenticationMethod
2539                        }
2540                        MqttError::UnsupportedProtocolVersion => {
2541                            ConnectReasonCode::UnsupportedProtocolVersion
2542                        }
2543                        _ => ConnectReasonCode::UnspecifiedError,
2544                    };
2545                    let connack = v5_0::Connack::builder().reason_code(rc).build().unwrap();
2546                    let connack_events = self.process_send_v5_0_connack(connack);
2547                    events.extend(connack_events);
2548                } else {
2549                    let disconnect = v5_0::Disconnect::builder()
2550                        .reason_code(DisconnectReasonCode::ProtocolError)
2551                        .build()
2552                        .unwrap();
2553                    let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2554                    events.extend(disconnect_events);
2555                }
2556                events.push(GenericEvent::NotifyError(e));
2557            }
2558        }
2559
2560        events
2561    }
2562
2563    fn process_recv_v3_1_1_connack(
2564        &mut self,
2565        raw_packet: RawPacket,
2566    ) -> Vec<GenericEvent<PacketIdType>> {
2567        let mut events = Vec::new();
2568
2569        match v3_1_1::Connack::parse(raw_packet.data_as_slice()) {
2570            Ok((packet, _consumed)) => {
2571                if packet.return_code() == ConnectReturnCode::Accepted {
2572                    self.status = ConnectionStatus::Connected;
2573                    if packet.session_present() {
2574                        events.extend(self.send_stored());
2575                    } else {
2576                        self.clear_store_related();
2577                    }
2578                }
2579                events.push(GenericEvent::NotifyPacketReceived(
2580                    GenericPacket::V3_1_1Connack(packet),
2581                ));
2582            }
2583            Err(e) => {
2584                events.push(GenericEvent::RequestClose);
2585                events.push(GenericEvent::NotifyError(e));
2586            }
2587        }
2588
2589        events
2590    }
2591
2592    fn process_recv_v5_0_connack(
2593        &mut self,
2594        raw_packet: RawPacket,
2595    ) -> Vec<GenericEvent<PacketIdType>> {
2596        let mut events = Vec::new();
2597
2598        match v5_0::Connack::parse(raw_packet.data_as_slice()) {
2599            Ok((packet, _consumed)) => {
2600                if packet.reason_code() == ConnectReasonCode::Success {
2601                    self.status = ConnectionStatus::Connected;
2602
2603                    // Process properties
2604                    for prop in packet.props() {
2605                        match prop {
2606                            Property::TopicAliasMaximum(val) => {
2607                                if val.val() > 0 {
2608                                    self.topic_alias_send = Some(TopicAliasSend::new(val.val()));
2609                                }
2610                            }
2611                            Property::ReceiveMaximum(val) => {
2612                                assert!(val.val() != 0);
2613                                self.publish_send_max = Some(val.val());
2614                            }
2615                            Property::MaximumPacketSize(val) => {
2616                                assert!(val.val() != 0);
2617                                self.maximum_packet_size_send = val.val();
2618                            }
2619                            Property::ServerKeepAlive(val) => {
2620                                // Set PINGREQ send interval if this is a client
2621                                let timeout_ms = (val.val() as u64) * 1000;
2622                                self.pingreq_send_interval_ms = Some(timeout_ms);
2623                            }
2624                            _ => {
2625                                // Ignore other properties
2626                            }
2627                        }
2628                    }
2629
2630                    if packet.session_present() {
2631                        events.extend(self.send_stored());
2632                    } else {
2633                        self.clear_store_related();
2634                    }
2635                }
2636                events.push(GenericEvent::NotifyPacketReceived(
2637                    GenericPacket::V5_0Connack(packet),
2638                ));
2639            }
2640            Err(e) => {
2641                if self.status == ConnectionStatus::Connected {
2642                    let disconnect = v5_0::Disconnect::builder()
2643                        .reason_code(e.into())
2644                        .build()
2645                        .unwrap();
2646                    let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2647                    events.extend(disconnect_events);
2648                }
2649                events.push(GenericEvent::NotifyError(e));
2650            }
2651        }
2652
2653        events
2654    }
2655
2656    fn process_recv_v3_1_1_publish(
2657        &mut self,
2658        raw_packet: RawPacket,
2659    ) -> Vec<GenericEvent<PacketIdType>> {
2660        let mut events = Vec::new();
2661
2662        let flags = raw_packet.flags();
2663        match &raw_packet.data {
2664            PacketData::Publish(arc) => {
2665                match v3_1_1::GenericPublish::parse(flags, arc.clone()) {
2666                    Ok((packet, _consumed)) => {
2667                        match packet.qos() {
2668                            Qos::AtMostOnce => {
2669                                events.extend(self.refresh_pingreq_recv());
2670                                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2671                            }
2672                            Qos::AtLeastOnce => {
2673                                let packet_id = packet.packet_id().unwrap();
2674                                if self.status == ConnectionStatus::Connected
2675                                    && self.auto_pub_response
2676                                {
2677                                    // Send PUBACK automatically
2678                                    let puback = v3_1_1::GenericPuback::builder()
2679                                        .packet_id(packet_id)
2680                                        .build()
2681                                        .unwrap();
2682                                    events.extend(self.process_send_v3_1_1_puback(puback));
2683                                }
2684                                events.extend(self.refresh_pingreq_recv());
2685                                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2686                            }
2687                            Qos::ExactlyOnce => {
2688                                let packet_id = packet.packet_id().unwrap();
2689                                let already_handled = !self.qos2_publish_handled.insert(packet_id);
2690
2691                                if self.status == ConnectionStatus::Connected
2692                                    && (self.auto_pub_response || already_handled)
2693                                {
2694                                    let pubrec = v3_1_1::GenericPubrec::builder()
2695                                        .packet_id(packet_id)
2696                                        .build()
2697                                        .unwrap();
2698                                    events.extend(self.process_send_v3_1_1_pubrec(pubrec));
2699                                }
2700                                events.extend(self.refresh_pingreq_recv());
2701                                if !already_handled {
2702                                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2703                                }
2704                            }
2705                        }
2706                    }
2707                    Err(e) => {
2708                        events.push(GenericEvent::RequestClose);
2709                        events.push(GenericEvent::NotifyError(e));
2710                    }
2711                }
2712            }
2713            PacketData::Normal(_) => {
2714                unreachable!("PUBLISH packet must use PacketData::Publish variant");
2715            }
2716        }
2717
2718        events
2719    }
2720
2721    fn process_recv_v5_0_publish(
2722        &mut self,
2723        raw_packet: RawPacket,
2724    ) -> Vec<GenericEvent<PacketIdType>> {
2725        let mut events = Vec::new();
2726
2727        let flags = raw_packet.flags();
2728        match &raw_packet.data {
2729            PacketData::Publish(arc) => {
2730                match v5_0::GenericPublish::parse(flags, arc.clone()) {
2731                    Ok((packet, _consumed)) => {
2732                        let mut already_handled = false;
2733                        let mut puback_send = false;
2734                        let mut pubrec_send = false;
2735
2736                        match packet.qos() {
2737                            Qos::AtLeastOnce => {
2738                                let packet_id = packet.packet_id().unwrap();
2739                                if let Some(max) = self.publish_recv_max {
2740                                    if self.publish_recv.len() >= max as usize {
2741                                        let disconnect = v5_0::Disconnect::builder()
2742                                            .reason_code(
2743                                                DisconnectReasonCode::ReceiveMaximumExceeded,
2744                                            )
2745                                            .build()
2746                                            .unwrap();
2747                                        events
2748                                            .extend(self.process_send_v5_0_disconnect(disconnect));
2749                                        events.push(GenericEvent::NotifyError(
2750                                            MqttError::ReceiveMaximumExceeded,
2751                                        ));
2752                                        return events;
2753                                    }
2754                                }
2755                                self.publish_recv.insert(packet_id);
2756                                if self.auto_pub_response
2757                                    && self.status == ConnectionStatus::Connected
2758                                {
2759                                    puback_send = true;
2760                                }
2761                            }
2762                            Qos::ExactlyOnce => {
2763                                let packet_id = packet.packet_id().unwrap();
2764                                if let Some(max) = self.publish_recv_max {
2765                                    if self.publish_recv.len() >= max as usize {
2766                                        let disconnect = v5_0::Disconnect::builder()
2767                                            .reason_code(
2768                                                DisconnectReasonCode::ReceiveMaximumExceeded,
2769                                            )
2770                                            .build()
2771                                            .unwrap();
2772                                        events
2773                                            .extend(self.process_send_v5_0_disconnect(disconnect));
2774                                        events.push(GenericEvent::NotifyError(
2775                                            MqttError::ReceiveMaximumExceeded,
2776                                        ));
2777                                        return events;
2778                                    }
2779                                }
2780                                self.publish_recv.insert(packet_id);
2781
2782                                if !self.qos2_publish_handled.insert(packet_id) {
2783                                    already_handled = true;
2784                                }
2785                                if self.status == ConnectionStatus::Connected
2786                                    && (self.auto_pub_response || already_handled)
2787                                {
2788                                    pubrec_send = true;
2789                                }
2790                            }
2791                            Qos::AtMostOnce => {
2792                                // No packet ID handling for QoS 0
2793                            }
2794                        }
2795
2796                        // Topic Alias handling
2797                        if packet.topic_name().is_empty() {
2798                            // Extract topic from topic_alias
2799                            if let Some(ta) = Self::get_topic_alias_from_props_opt(packet.props()) {
2800                                if ta == 0
2801                                    || self.topic_alias_recv.is_none()
2802                                    || ta > self.topic_alias_recv.as_ref().unwrap().max()
2803                                {
2804                                    let disconnect = v5_0::Disconnect::builder()
2805                                        .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2806                                        .build()
2807                                        .unwrap();
2808                                    events.extend(self.process_send_v5_0_disconnect(disconnect));
2809                                    events.push(GenericEvent::NotifyError(
2810                                        MqttError::TopicAliasInvalid,
2811                                    ));
2812                                    return events;
2813                                }
2814
2815                                if let Some(ref topic_alias_recv) = self.topic_alias_recv {
2816                                    if topic_alias_recv.get(ta).is_none() {
2817                                        let disconnect = v5_0::Disconnect::builder()
2818                                            .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2819                                            .build()
2820                                            .unwrap();
2821                                        events
2822                                            .extend(self.process_send_v5_0_disconnect(disconnect));
2823                                        events.push(GenericEvent::NotifyError(
2824                                            MqttError::TopicAliasInvalid,
2825                                        ));
2826                                        return events;
2827                                    }
2828                                    // Note: In a complete implementation, we would modify the packet
2829                                    // to add the resolved topic. For now, we'll proceed.
2830                                }
2831                            } else {
2832                                let disconnect = v5_0::Disconnect::builder()
2833                                    .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2834                                    .build()
2835                                    .unwrap();
2836                                events.extend(self.process_send_v5_0_disconnect(disconnect));
2837                                events
2838                                    .push(GenericEvent::NotifyError(MqttError::TopicAliasInvalid));
2839                                return events;
2840                            }
2841                        } else {
2842                            // Topic is not empty, check if topic alias needs to be registered
2843                            if let Some(ta) = Self::get_topic_alias_from_props_opt(packet.props()) {
2844                                if ta == 0
2845                                    || self.topic_alias_recv.is_none()
2846                                    || ta > self.topic_alias_recv.as_ref().unwrap().max()
2847                                {
2848                                    let disconnect = v5_0::Disconnect::builder()
2849                                        .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2850                                        .build()
2851                                        .unwrap();
2852                                    events.extend(self.process_send_v5_0_disconnect(disconnect));
2853                                    events.push(GenericEvent::NotifyError(
2854                                        MqttError::TopicAliasInvalid,
2855                                    ));
2856                                    return events;
2857                                }
2858                                if let Some(ref mut topic_alias_recv) = self.topic_alias_recv {
2859                                    topic_alias_recv.insert_or_update(packet.topic_name(), ta);
2860                                }
2861                            }
2862                        }
2863
2864                        // Send response packets
2865                        if puback_send {
2866                            let puback = v5_0::GenericPuback::builder()
2867                                .packet_id(packet.packet_id().unwrap())
2868                                .build()
2869                                .unwrap();
2870                            events.extend(self.process_send_v5_0_puback(puback));
2871                        }
2872                        if pubrec_send {
2873                            let pubrec = v5_0::GenericPubrec::builder()
2874                                .packet_id(packet.packet_id().unwrap())
2875                                .build()
2876                                .unwrap();
2877                            events.extend(self.process_send_v5_0_pubrec(pubrec));
2878                        }
2879
2880                        // Refresh PINGREQ receive timer
2881                        events.extend(self.refresh_pingreq_recv());
2882
2883                        // Notify packet received (only if not already handled)
2884                        if !already_handled {
2885                            events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2886                        }
2887                    }
2888                    Err(e) => {
2889                        if self.status == ConnectionStatus::Connected {
2890                            let disconnect = v5_0::Disconnect::builder()
2891                                .reason_code(e.into())
2892                                .build()
2893                                .unwrap();
2894                            let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2895                            events.extend(disconnect_events);
2896                        }
2897                        events.push(GenericEvent::NotifyError(e));
2898                    }
2899                }
2900            }
2901            PacketData::Normal(_) => {
2902                unreachable!("PUBLISH packet must use PacketData::Publish variant");
2903            }
2904        }
2905
2906        events
2907    }
2908
2909    fn process_recv_v3_1_1_puback(
2910        &mut self,
2911        raw_packet: RawPacket,
2912    ) -> Vec<GenericEvent<PacketIdType>> {
2913        let mut events = Vec::new();
2914
2915        match v3_1_1::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2916            Ok((packet, _)) => {
2917                let packet_id = packet.packet_id();
2918                if self.pid_puback.remove(&packet_id) {
2919                    self.store.erase(ResponsePacket::V3_1_1Puback, packet_id);
2920                    if self.pid_man.is_used_id(packet_id) {
2921                        self.pid_man.release_id(packet_id);
2922                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2923                    }
2924                    events.extend(self.refresh_pingreq_recv());
2925                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2926                } else {
2927                    events.push(GenericEvent::RequestClose);
2928                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2929                }
2930            }
2931            Err(e) => {
2932                events.push(GenericEvent::RequestClose);
2933                events.push(GenericEvent::NotifyError(e));
2934            }
2935        }
2936
2937        events
2938    }
2939
2940    fn process_recv_v5_0_puback(
2941        &mut self,
2942        raw_packet: RawPacket,
2943    ) -> Vec<GenericEvent<PacketIdType>> {
2944        let mut events = Vec::new();
2945
2946        match v5_0::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2947            Ok((packet, _)) => {
2948                let packet_id = packet.packet_id();
2949                if self.pid_puback.remove(&packet_id) {
2950                    self.store.erase(ResponsePacket::V5_0Puback, packet_id);
2951                    if self.pid_man.is_used_id(packet_id) {
2952                        self.pid_man.release_id(packet_id);
2953                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2954                    }
2955                    if self.publish_send_max.is_some() {
2956                        self.publish_send_count -= 1;
2957                    }
2958                    events.extend(self.refresh_pingreq_recv());
2959                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2960                } else {
2961                    let disconnect = v5_0::Disconnect::builder()
2962                        .reason_code(DisconnectReasonCode::ProtocolError)
2963                        .build()
2964                        .unwrap();
2965                    events.extend(self.process_send_v5_0_disconnect(disconnect));
2966                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2967                }
2968            }
2969            Err(e) => {
2970                let disconnect = v5_0::Disconnect::builder()
2971                    .reason_code(DisconnectReasonCode::ProtocolError)
2972                    .build()
2973                    .unwrap();
2974                events.extend(self.process_send_v5_0_disconnect(disconnect));
2975                events.push(GenericEvent::NotifyError(e));
2976            }
2977        }
2978
2979        events
2980    }
2981
2982    fn process_recv_v3_1_1_pubrec(
2983        &mut self,
2984        raw_packet: RawPacket,
2985    ) -> Vec<GenericEvent<PacketIdType>> {
2986        let mut events = Vec::new();
2987
2988        match v3_1_1::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2989            Ok((packet, _)) => {
2990                let packet_id = packet.packet_id();
2991                if self.pid_pubrec.remove(&packet_id) {
2992                    self.store.erase(ResponsePacket::V3_1_1Pubrec, packet_id);
2993                    if self.auto_pub_response && self.status == ConnectionStatus::Connected {
2994                        let pubrel = v3_1_1::GenericPubrel::<PacketIdType>::builder()
2995                            .packet_id(packet_id)
2996                            .build()
2997                            .unwrap();
2998                        events.extend(self.process_send_v3_1_1_pubrel(pubrel));
2999                    }
3000                    events.extend(self.refresh_pingreq_recv());
3001                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3002                } else {
3003                    events.push(GenericEvent::RequestClose);
3004                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3005                }
3006            }
3007            Err(e) => {
3008                events.push(GenericEvent::RequestClose);
3009                events.push(GenericEvent::NotifyError(e));
3010            }
3011        }
3012
3013        events
3014    }
3015
3016    fn process_recv_v5_0_pubrec(
3017        &mut self,
3018        raw_packet: RawPacket,
3019    ) -> Vec<GenericEvent<PacketIdType>> {
3020        let mut events = Vec::new();
3021
3022        match v5_0::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3023            Ok((packet, _)) => {
3024                let packet_id = packet.packet_id();
3025                if self.pid_pubrec.remove(&packet_id) {
3026                    self.store.erase(ResponsePacket::V5_0Pubrec, packet_id);
3027                    if let Some(reason_code) = packet.reason_code() {
3028                        if reason_code != PubrecReasonCode::Success {
3029                            if self.pid_man.is_used_id(packet_id) {
3030                                self.pid_man.release_id(packet_id);
3031                                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3032                            }
3033                            self.qos2_publish_processing.remove(&packet_id);
3034                            if self.publish_send_max.is_some() {
3035                                self.publish_send_count -= 1;
3036                            }
3037                        } else if self.auto_pub_response
3038                            && self.status == ConnectionStatus::Connected
3039                        {
3040                            let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3041                                .packet_id(packet_id)
3042                                .build()
3043                                .unwrap();
3044                            events.extend(self.process_send_v5_0_pubrel(pubrel));
3045                        }
3046                    } else if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3047                        let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3048                            .packet_id(packet_id)
3049                            .build()
3050                            .unwrap();
3051                        events.extend(self.process_send_v5_0_pubrel(pubrel));
3052                    }
3053                    events.extend(self.refresh_pingreq_recv());
3054                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3055                } else {
3056                    let disconnect = v5_0::Disconnect::builder()
3057                        .reason_code(DisconnectReasonCode::ProtocolError)
3058                        .build()
3059                        .unwrap();
3060                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3061                    events.push(GenericEvent::NotifyError(MqttError::from(
3062                        DisconnectReasonCode::ProtocolError,
3063                    )));
3064                }
3065            }
3066            Err(e) => {
3067                let disconnect = v5_0::Disconnect::builder()
3068                    .reason_code(DisconnectReasonCode::ProtocolError)
3069                    .build()
3070                    .unwrap();
3071                events.extend(self.process_send_v5_0_disconnect(disconnect));
3072                events.push(GenericEvent::NotifyError(e));
3073            }
3074        }
3075
3076        events
3077    }
3078
3079    fn process_recv_v3_1_1_pubrel(
3080        &mut self,
3081        raw_packet: RawPacket,
3082    ) -> Vec<GenericEvent<PacketIdType>> {
3083        let mut events = Vec::new();
3084
3085        match v3_1_1::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3086            Ok((packet, _)) => {
3087                let packet_id = packet.packet_id();
3088                self.qos2_publish_handled.remove(&packet_id);
3089                if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3090                    let pubcomp = v3_1_1::GenericPubcomp::<PacketIdType>::builder()
3091                        .packet_id(packet_id)
3092                        .build()
3093                        .unwrap();
3094                    events.extend(self.process_send_v3_1_1_pubcomp(pubcomp));
3095                }
3096                events.extend(self.refresh_pingreq_recv());
3097                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3098            }
3099            Err(e) => {
3100                events.push(GenericEvent::RequestClose);
3101                events.push(GenericEvent::NotifyError(e));
3102            }
3103        }
3104
3105        events
3106    }
3107
3108    fn process_recv_v5_0_pubrel(
3109        &mut self,
3110        raw_packet: RawPacket,
3111    ) -> Vec<GenericEvent<PacketIdType>> {
3112        let mut events = Vec::new();
3113
3114        match v5_0::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3115            Ok((packet, _)) => {
3116                let packet_id = packet.packet_id();
3117                self.qos2_publish_handled.remove(&packet_id);
3118                if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3119                    let pubcomp = v5_0::GenericPubcomp::<PacketIdType>::builder()
3120                        .packet_id(packet_id)
3121                        .build()
3122                        .unwrap();
3123                    events.extend(self.process_send_v5_0_pubcomp(pubcomp));
3124                }
3125                events.extend(self.refresh_pingreq_recv());
3126                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3127            }
3128            Err(e) => {
3129                let disconnect = v5_0::Disconnect::builder()
3130                    .reason_code(DisconnectReasonCode::ProtocolError)
3131                    .build()
3132                    .unwrap();
3133                events.extend(self.process_send_v5_0_disconnect(disconnect));
3134                events.push(GenericEvent::NotifyError(e));
3135            }
3136        }
3137
3138        events
3139    }
3140
3141    fn process_recv_v3_1_1_pubcomp(
3142        &mut self,
3143        raw_packet: RawPacket,
3144    ) -> Vec<GenericEvent<PacketIdType>> {
3145        let mut events = Vec::new();
3146
3147        match v3_1_1::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3148            Ok((packet, _)) => {
3149                let packet_id = packet.packet_id();
3150                if self.pid_pubcomp.remove(&packet_id) {
3151                    self.store.erase(ResponsePacket::V3_1_1Pubcomp, packet_id);
3152                    if self.pid_man.is_used_id(packet_id) {
3153                        self.pid_man.release_id(packet_id);
3154                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3155                    }
3156                    self.qos2_publish_processing.remove(&packet_id);
3157                    events.extend(self.refresh_pingreq_recv());
3158                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3159                } else {
3160                    events.push(GenericEvent::RequestClose);
3161                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3162                }
3163            }
3164            Err(e) => {
3165                events.push(GenericEvent::RequestClose);
3166                events.push(GenericEvent::NotifyError(e));
3167            }
3168        }
3169
3170        events
3171    }
3172
3173    fn process_recv_v5_0_pubcomp(
3174        &mut self,
3175        raw_packet: RawPacket,
3176    ) -> Vec<GenericEvent<PacketIdType>> {
3177        let mut events = Vec::new();
3178
3179        match v5_0::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3180            Ok((packet, _)) => {
3181                let packet_id = packet.packet_id();
3182                if self.pid_pubcomp.remove(&packet_id) {
3183                    self.store.erase(ResponsePacket::V5_0Pubcomp, packet_id);
3184                    if self.pid_man.is_used_id(packet_id) {
3185                        self.pid_man.release_id(packet_id);
3186                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3187                    }
3188                    self.qos2_publish_processing.remove(&packet_id);
3189                    if self.publish_send_max.is_some() {
3190                        self.publish_send_count -= 1;
3191                    }
3192                    events.extend(self.refresh_pingreq_recv());
3193                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3194                } else {
3195                    let disconnect = v5_0::Disconnect::builder()
3196                        .reason_code(DisconnectReasonCode::ProtocolError)
3197                        .build()
3198                        .unwrap();
3199                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3200                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3201                }
3202            }
3203            Err(e) => {
3204                let disconnect = v5_0::Disconnect::builder()
3205                    .reason_code(DisconnectReasonCode::ProtocolError)
3206                    .build()
3207                    .unwrap();
3208                events.extend(self.process_send_v5_0_disconnect(disconnect));
3209                events.push(GenericEvent::NotifyError(e));
3210            }
3211        }
3212
3213        events
3214    }
3215
3216    fn process_recv_v3_1_1_subscribe(
3217        &mut self,
3218        raw_packet: RawPacket,
3219    ) -> Vec<GenericEvent<PacketIdType>> {
3220        let mut events = Vec::new();
3221
3222        match v3_1_1::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3223            Ok((packet, _)) => {
3224                events.extend(self.refresh_pingreq_recv());
3225                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3226            }
3227            Err(e) => {
3228                events.push(GenericEvent::RequestClose);
3229                events.push(GenericEvent::NotifyError(e));
3230            }
3231        }
3232
3233        events
3234    }
3235
3236    fn process_recv_v5_0_subscribe(
3237        &mut self,
3238        raw_packet: RawPacket,
3239    ) -> Vec<GenericEvent<PacketIdType>> {
3240        let mut events = Vec::new();
3241
3242        match v5_0::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3243            Ok((packet, _)) => {
3244                events.extend(self.refresh_pingreq_recv());
3245                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3246            }
3247            Err(e) => {
3248                let disconnect = v5_0::Disconnect::builder()
3249                    .reason_code(DisconnectReasonCode::ProtocolError)
3250                    .build()
3251                    .unwrap();
3252                events.extend(self.process_send_v5_0_disconnect(disconnect));
3253                events.push(GenericEvent::NotifyError(e));
3254            }
3255        }
3256
3257        events
3258    }
3259
3260    fn process_recv_v3_1_1_suback(
3261        &mut self,
3262        raw_packet: RawPacket,
3263    ) -> Vec<GenericEvent<PacketIdType>> {
3264        let mut events = Vec::new();
3265
3266        match v3_1_1::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3267            Ok((packet, _)) => {
3268                let packet_id = packet.packet_id();
3269                if self.pid_suback.remove(&packet_id) {
3270                    if self.pid_man.is_used_id(packet_id) {
3271                        self.pid_man.release_id(packet_id);
3272                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3273                    }
3274                    events.extend(self.refresh_pingreq_recv());
3275                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3276                } else {
3277                    events.push(GenericEvent::RequestClose);
3278                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3279                }
3280            }
3281            Err(e) => {
3282                events.push(GenericEvent::RequestClose);
3283                events.push(GenericEvent::NotifyError(e));
3284            }
3285        }
3286
3287        events
3288    }
3289
3290    fn process_recv_v5_0_suback(
3291        &mut self,
3292        raw_packet: RawPacket,
3293    ) -> Vec<GenericEvent<PacketIdType>> {
3294        let mut events = Vec::new();
3295
3296        match v5_0::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3297            Ok((packet, _)) => {
3298                let packet_id = packet.packet_id();
3299                if self.pid_suback.remove(&packet_id) {
3300                    if self.pid_man.is_used_id(packet_id) {
3301                        self.pid_man.release_id(packet_id);
3302                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3303                    }
3304                    events.extend(self.refresh_pingreq_recv());
3305                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3306                } else {
3307                    let disconnect = v5_0::Disconnect::builder()
3308                        .reason_code(DisconnectReasonCode::ProtocolError)
3309                        .build()
3310                        .unwrap();
3311                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3312                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3313                }
3314            }
3315            Err(e) => {
3316                let disconnect = v5_0::Disconnect::builder()
3317                    .reason_code(DisconnectReasonCode::ProtocolError)
3318                    .build()
3319                    .unwrap();
3320                events.extend(self.process_send_v5_0_disconnect(disconnect));
3321                events.push(GenericEvent::NotifyError(e));
3322            }
3323        }
3324
3325        events
3326    }
3327
3328    fn process_recv_v3_1_1_unsubscribe(
3329        &mut self,
3330        raw_packet: RawPacket,
3331    ) -> Vec<GenericEvent<PacketIdType>> {
3332        let mut events = Vec::new();
3333
3334        match v3_1_1::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3335            Ok((packet, _)) => {
3336                events.extend(self.refresh_pingreq_recv());
3337                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3338            }
3339            Err(e) => {
3340                events.push(GenericEvent::RequestClose);
3341                events.push(GenericEvent::NotifyError(e));
3342            }
3343        }
3344
3345        events
3346    }
3347
3348    fn process_recv_v5_0_unsubscribe(
3349        &mut self,
3350        raw_packet: RawPacket,
3351    ) -> Vec<GenericEvent<PacketIdType>> {
3352        let mut events = Vec::new();
3353
3354        match v5_0::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3355            Ok((packet, _)) => {
3356                events.extend(self.refresh_pingreq_recv());
3357                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3358            }
3359            Err(e) => {
3360                let disconnect = v5_0::Disconnect::builder()
3361                    .reason_code(DisconnectReasonCode::ProtocolError)
3362                    .build()
3363                    .unwrap();
3364                events.extend(self.process_send_v5_0_disconnect(disconnect));
3365                events.push(GenericEvent::NotifyError(e));
3366            }
3367        }
3368
3369        events
3370    }
3371
3372    fn process_recv_v3_1_1_unsuback(
3373        &mut self,
3374        raw_packet: RawPacket,
3375    ) -> Vec<GenericEvent<PacketIdType>> {
3376        let mut events = Vec::new();
3377
3378        match v3_1_1::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3379            Ok((packet, _)) => {
3380                let packet_id = packet.packet_id();
3381                if self.pid_unsuback.remove(&packet_id) {
3382                    if self.pid_man.is_used_id(packet_id) {
3383                        self.pid_man.release_id(packet_id);
3384                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3385                    }
3386                    events.extend(self.refresh_pingreq_recv());
3387                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3388                } else {
3389                    events.push(GenericEvent::RequestClose);
3390                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3391                }
3392            }
3393            Err(e) => {
3394                events.push(GenericEvent::RequestClose);
3395                events.push(GenericEvent::NotifyError(e));
3396            }
3397        }
3398
3399        events
3400    }
3401
3402    fn process_recv_v5_0_unsuback(
3403        &mut self,
3404        raw_packet: RawPacket,
3405    ) -> Vec<GenericEvent<PacketIdType>> {
3406        let mut events = Vec::new();
3407
3408        match v5_0::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3409            Ok((packet, _)) => {
3410                let packet_id = packet.packet_id();
3411                if self.pid_unsuback.remove(&packet_id) {
3412                    if self.pid_man.is_used_id(packet_id) {
3413                        self.pid_man.release_id(packet_id);
3414                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3415                    }
3416                    events.extend(self.refresh_pingreq_recv());
3417                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3418                } else {
3419                    let disconnect = v5_0::Disconnect::builder()
3420                        .reason_code(DisconnectReasonCode::ProtocolError)
3421                        .build()
3422                        .unwrap();
3423                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3424                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3425                }
3426            }
3427            Err(e) => {
3428                let disconnect = v5_0::Disconnect::builder()
3429                    .reason_code(DisconnectReasonCode::ProtocolError)
3430                    .build()
3431                    .unwrap();
3432                events.extend(self.process_send_v5_0_disconnect(disconnect));
3433                events.push(GenericEvent::NotifyError(e));
3434            }
3435        }
3436
3437        events
3438    }
3439
3440    fn process_recv_v3_1_1_pingreq(
3441        &mut self,
3442        raw_packet: RawPacket,
3443    ) -> Vec<GenericEvent<PacketIdType>> {
3444        let mut events = Vec::new();
3445
3446        match v3_1_1::Pingreq::parse(raw_packet.data_as_slice()) {
3447            Ok((packet, _)) => {
3448                if (Role::IS_SERVER || Role::IS_ANY) && !self.is_client {
3449                    if self.auto_ping_response && self.status == ConnectionStatus::Connected {
3450                        let pingresp = v3_1_1::Pingresp::new();
3451                        events.extend(self.process_send_v3_1_1_pingresp(pingresp));
3452                    }
3453                }
3454                events.extend(self.refresh_pingreq_recv());
3455                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3456            }
3457            Err(e) => {
3458                events.push(GenericEvent::RequestClose);
3459                events.push(GenericEvent::NotifyError(e));
3460            }
3461        }
3462
3463        events
3464    }
3465
3466    fn process_recv_v5_0_pingreq(
3467        &mut self,
3468        raw_packet: RawPacket,
3469    ) -> Vec<GenericEvent<PacketIdType>> {
3470        let mut events = Vec::new();
3471
3472        match v5_0::Pingreq::parse(raw_packet.data_as_slice()) {
3473            Ok((packet, _)) => {
3474                if (Role::IS_SERVER || Role::IS_ANY) && !self.is_client {
3475                    if self.auto_ping_response && self.status == ConnectionStatus::Connected {
3476                        let pingresp = v5_0::Pingresp::new();
3477                        events.extend(self.process_send_v5_0_pingresp(pingresp));
3478                    }
3479                }
3480                events.extend(self.refresh_pingreq_recv());
3481                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3482            }
3483            Err(e) => {
3484                let disconnect = v5_0::Disconnect::builder()
3485                    .reason_code(DisconnectReasonCode::ProtocolError)
3486                    .build()
3487                    .unwrap();
3488                events.extend(self.process_send_v5_0_disconnect(disconnect));
3489                events.push(GenericEvent::NotifyError(e));
3490            }
3491        }
3492
3493        events
3494    }
3495
3496    fn process_recv_v3_1_1_pingresp(
3497        &mut self,
3498        raw_packet: RawPacket,
3499    ) -> Vec<GenericEvent<PacketIdType>> {
3500        let mut events = Vec::new();
3501
3502        match v3_1_1::Pingresp::parse(raw_packet.data_as_slice()) {
3503            Ok((packet, _)) => {
3504                self.pingresp_recv_set = false;
3505                events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3506                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3507            }
3508            Err(e) => {
3509                events.push(GenericEvent::RequestClose);
3510                events.push(GenericEvent::NotifyError(e));
3511            }
3512        }
3513
3514        events
3515    }
3516
3517    fn process_recv_v5_0_pingresp(
3518        &mut self,
3519        raw_packet: RawPacket,
3520    ) -> Vec<GenericEvent<PacketIdType>> {
3521        let mut events = Vec::new();
3522
3523        match v5_0::Pingresp::parse(raw_packet.data_as_slice()) {
3524            Ok((packet, _)) => {
3525                self.pingresp_recv_set = false;
3526                events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3527                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3528            }
3529            Err(e) => {
3530                let disconnect = v5_0::Disconnect::builder()
3531                    .reason_code(DisconnectReasonCode::ProtocolError)
3532                    .build()
3533                    .unwrap();
3534                events.extend(self.process_send_v5_0_disconnect(disconnect));
3535                events.push(GenericEvent::NotifyError(e));
3536            }
3537        }
3538
3539        events
3540    }
3541
3542    fn process_recv_v3_1_1_disconnect(
3543        &mut self,
3544        raw_packet: RawPacket,
3545    ) -> Vec<GenericEvent<PacketIdType>> {
3546        let mut events = Vec::new();
3547
3548        match v3_1_1::Disconnect::parse(raw_packet.data_as_slice()) {
3549            Ok((packet, _)) => {
3550                self.cancel_timers(&mut events);
3551                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3552            }
3553            Err(e) => {
3554                events.push(GenericEvent::RequestClose);
3555                events.push(GenericEvent::NotifyError(e));
3556            }
3557        }
3558
3559        events
3560    }
3561
3562    fn process_recv_v5_0_disconnect(
3563        &mut self,
3564        raw_packet: RawPacket,
3565    ) -> Vec<GenericEvent<PacketIdType>> {
3566        let mut events = Vec::new();
3567
3568        match v5_0::Disconnect::parse(raw_packet.data_as_slice()) {
3569            Ok((packet, _)) => {
3570                self.cancel_timers(&mut events);
3571                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3572            }
3573            Err(e) => {
3574                let disconnect = v5_0::Disconnect::builder()
3575                    .reason_code(DisconnectReasonCode::ProtocolError)
3576                    .build()
3577                    .unwrap();
3578                events.extend(self.process_send_v5_0_disconnect(disconnect));
3579                events.push(GenericEvent::NotifyError(e));
3580            }
3581        }
3582
3583        events
3584    }
3585
3586    fn process_recv_v5_0_auth(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
3587        let mut events = Vec::new();
3588
3589        match v5_0::Auth::parse(raw_packet.data_as_slice()) {
3590            Ok((packet, _)) => {
3591                events.extend(self.refresh_pingreq_recv());
3592                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3593            }
3594            Err(e) => {
3595                let disconnect = v5_0::Disconnect::builder()
3596                    .reason_code(DisconnectReasonCode::ProtocolError)
3597                    .build()
3598                    .unwrap();
3599                events.extend(self.process_send_v5_0_disconnect(disconnect));
3600                events.push(GenericEvent::NotifyError(e));
3601            }
3602        }
3603
3604        events
3605    }
3606
3607    fn get_topic_alias_from_props_opt(props: &Option<Vec<Property>>) -> Option<u16> {
3608        if let Some(props) = props {
3609            Self::get_topic_alias_from_props(props.as_slice())
3610        } else {
3611            None
3612        }
3613    }
3614
3615    fn refresh_pingreq_recv(&mut self) -> Vec<GenericEvent<PacketIdType>> {
3616        let mut events = Vec::new();
3617        if let Some(timeout_ms) = self.pingreq_recv_timeout_ms {
3618            if self.status == ConnectionStatus::Connecting
3619                || self.status == ConnectionStatus::Connected
3620            {
3621                self.pingreq_recv_set = true;
3622                events.push(GenericEvent::RequestTimerReset {
3623                    kind: TimerKind::PingreqRecv,
3624                    duration_ms: timeout_ms,
3625                });
3626            } else {
3627                self.pingreq_recv_set = false;
3628                events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3629            }
3630        }
3631
3632        events
3633    }
3634
3635    /// Cancel timers and collect events instead of calling handlers
3636    fn cancel_timers(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
3637        if self.pingreq_send_set {
3638            self.pingreq_send_set = false;
3639            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
3640        }
3641        if self.pingreq_recv_set {
3642            self.pingreq_recv_set = false;
3643            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3644        }
3645        if self.pingresp_recv_set {
3646            self.pingresp_recv_set = false;
3647            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3648        }
3649    }
3650
3651    /// Helper function to extract TopicAlias from properties
3652    fn get_topic_alias_from_props(props: &[Property]) -> Option<u16> {
3653        for prop in props {
3654            if let Property::TopicAlias(ta) = prop {
3655                return Some(ta.val());
3656            }
3657        }
3658        None
3659    }
3660
3661    #[allow(dead_code)]
3662    fn is_packet_id_used(&self, packet_id: PacketIdType) -> bool {
3663        self.pid_man.is_used_id(packet_id)
3664    }
3665}
3666
3667// traits
3668
3669pub trait RecvBehavior<Role, PacketIdType>
3670where
3671    PacketIdType: IsPacketId,
3672{
3673    fn recv(&mut self, data: &mut std::io::Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>>;
3674}
3675
3676// RecvBehavior implementations
3677impl<PacketIdType> RecvBehavior<role::Client, PacketIdType>
3678    for GenericConnection<role::Client, PacketIdType>
3679where
3680    PacketIdType: IsPacketId,
3681{
3682    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3683        self.recv(data)
3684    }
3685}
3686
3687impl<PacketIdType> RecvBehavior<role::Server, PacketIdType>
3688    for GenericConnection<role::Server, PacketIdType>
3689where
3690    PacketIdType: IsPacketId,
3691{
3692    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3693        self.recv(data)
3694    }
3695}
3696
3697impl<PacketIdType> RecvBehavior<role::Any, PacketIdType>
3698    for GenericConnection<role::Any, PacketIdType>
3699where
3700    PacketIdType: IsPacketId,
3701{
3702    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3703        self.recv(data)
3704    }
3705}
3706
3707// tests
3708
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mqtt::packet::TopicAliasSend;
    use crate::mqtt::role;
    use crate::mqtt::version::Version;

    /// Builds a fresh MQTT v5.0 client-role connection with `u16` packet ids,
    /// the fixture shared by most tests in this module.
    fn v5_client() -> GenericConnection<role::Client, u16> {
        GenericConnection::<role::Client, u16>::new(Version::V5_0)
    }

    #[test]
    fn test_initialize_client_mode() {
        let mut conn = v5_client();

        conn.initialize(true);

        // Client flag is set and the publish bookkeeping starts out empty.
        assert!(conn.is_client);
        assert_eq!(conn.publish_send_count, 0);
        assert!(conn.publish_send_max.is_none());
        assert!(conn.publish_recv_max.is_none());
        assert!(!conn.need_store);
    }

    #[test]
    fn test_initialize_server_mode() {
        let mut conn = GenericConnection::<role::Server, u32>::new(Version::V3_1_1);

        conn.initialize(false);

        // Client flag is cleared and the publish bookkeeping starts out empty.
        assert!(!conn.is_client);
        assert_eq!(conn.publish_send_count, 0);
        assert!(conn.publish_send_max.is_none());
        assert!(conn.publish_recv_max.is_none());
        assert!(!conn.need_store);
    }

    #[test]
    fn test_validate_topic_alias_no_topic_alias_send() {
        let mut conn = v5_client();

        // Without a configured topic_alias_send nothing can be resolved.
        assert!(conn.validate_topic_alias(Some(1)).is_none());
    }

    #[test]
    fn test_validate_topic_alias_none_input() {
        let mut conn = v5_client();

        // No alias supplied means no topic name comes back.
        assert!(conn.validate_topic_alias(None).is_none());
    }

    #[test]
    fn test_validate_topic_alias_range_no_topic_alias_send() {
        let conn = v5_client();

        // Range validation fails when topic_alias_send is not configured.
        assert!(!conn.validate_topic_alias_range(1));
    }

    #[test]
    fn test_validate_topic_alias_range_zero() {
        let mut conn = v5_client();
        conn.topic_alias_send = Some(TopicAliasSend::new(10));

        // Alias 0 is rejected even though the maximum is 10.
        assert!(!conn.validate_topic_alias_range(0));
    }

    #[test]
    fn test_validate_topic_alias_range_over_max() {
        let mut conn = v5_client();
        conn.topic_alias_send = Some(TopicAliasSend::new(5));

        // One past the configured maximum of 5 is rejected.
        assert!(!conn.validate_topic_alias_range(6));
    }

    #[test]
    fn test_validate_topic_alias_range_valid() {
        let mut conn = v5_client();
        conn.topic_alias_send = Some(TopicAliasSend::new(10));

        // Lower bound, a middle value, and the upper bound are all accepted.
        assert!(conn.validate_topic_alias_range(1));
        assert!(conn.validate_topic_alias_range(5));
        assert!(conn.validate_topic_alias_range(10));
    }

    #[test]
    fn test_validate_topic_alias_with_registered_alias() {
        // Maximum of 10 with alias 5 mapped to a topic.
        let mut aliases = TopicAliasSend::new(10);
        aliases.insert_or_update("test/topic", 5);

        let mut conn = v5_client();
        conn.topic_alias_send = Some(aliases);

        // A registered alias resolves to its topic name.
        assert_eq!(
            conn.validate_topic_alias(Some(5)),
            Some("test/topic".to_string())
        );
    }

    #[test]
    fn test_validate_topic_alias_unregistered_alias() {
        let mut conn = v5_client();
        // Maximum of 10, but no alias has been registered.
        conn.topic_alias_send = Some(TopicAliasSend::new(10));

        // An in-range alias with no mapping resolves to nothing.
        assert!(conn.validate_topic_alias(Some(5)).is_none());
    }

    #[test]
    fn test_validate_maximum_packet_size_within_limit() {
        let conn = v5_client();

        // The default send limit is expected to be u32::MAX, so 1000 fits.
        assert!(conn.validate_maximum_packet_size_send(1000));
    }

    #[test]
    fn test_validate_maximum_packet_size_at_limit() {
        let mut conn = v5_client();
        conn.maximum_packet_size_send = 1000;

        // A size exactly equal to the limit is accepted.
        assert!(conn.validate_maximum_packet_size_send(1000));
    }

    #[test]
    fn test_validate_maximum_packet_size_over_limit() {
        let mut conn = v5_client();
        conn.maximum_packet_size_send = 1000;

        // One byte over the limit is rejected.
        assert!(!conn.validate_maximum_packet_size_send(1001));
    }

    #[test]
    fn test_validate_maximum_packet_size_zero_limit() {
        let mut conn = v5_client();
        conn.maximum_packet_size_send = 0;

        // With a zero limit any non-zero size is rejected...
        assert!(!conn.validate_maximum_packet_size_send(1));

        // ...while a zero size still passes.
        assert!(conn.validate_maximum_packet_size_send(0));
    }

    #[test]
    fn test_initialize_clears_state() {
        let mut conn = v5_client();

        // Dirty the state that initialize() is expected to reset.
        conn.publish_send_count = 5;
        conn.need_store = true;
        conn.pid_suback.insert(123);
        conn.pid_unsuback.insert(456);

        conn.initialize(true);

        assert_eq!(conn.publish_send_count, 0);
        assert!(!conn.need_store);
        assert!(conn.pid_suback.is_empty());
        assert!(conn.pid_unsuback.is_empty());
        assert!(conn.is_client);
    }

    #[test]
    fn test_remaining_length_to_total_size() {
        // (remaining length, expected total size); the total is
        // 1 header byte + the encoded-length bytes + the remaining length.
        let cases = [
            // 1-byte length encoding (0-127)
            (0, 2),
            (127, 129),
            // 2-byte length encoding (128-16383)
            (128, 131),
            (16383, 16386),
            // 3-byte length encoding (16384-2097151)
            (16384, 16388),
            (2097151, 2097155),
            // 4-byte length encoding (2097152-268435455)
            (2097152, 2097157),
            (268435455, 268435460),
        ];

        for &(remaining_length, expected_total) in &cases {
            assert_eq!(
                remaining_length_to_total_size(remaining_length),
                expected_total
            );
        }
    }
}