mqtt_protocol_core/mqtt/connection/
core.rs

1// MIT License
2//
3// Copyright (c) 2025 Takatoshi Kondo
4//
5// Permission is hereby granted, free of charge, to any person obtaining a copy
6// of this software and associated documentation files (the "Software"), to deal
7// in the Software without restriction, including without limitation the rights
8// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9// copies of the Software, and to permit persons to whom the Software is
10// furnished to do so, subject to the following conditions:
11//
12// The above copyright notice and this permission notice shall be included in all
13// copies or substantial portions of the Software.
14//
15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21// SOFTWARE.
22use alloc::{
23    string::{String, ToString},
24    vec::Vec,
25};
26use core::marker::PhantomData;
27
28use crate::mqtt::common::tracing::{error, info, trace, warn};
29use crate::mqtt::common::Cursor;
30use crate::mqtt::common::HashSet;
31use crate::mqtt::connection::event::{GenericEvent, TimerKind};
32use crate::mqtt::connection::GenericStore;
33
34use serde::Serialize;
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
37enum ConnectionStatus {
38    #[serde(rename = "disconnected")]
39    Disconnected,
40    #[serde(rename = "connecting")]
41    Connecting,
42    #[serde(rename = "connected")]
43    Connected,
44}
45use crate::mqtt::connection::packet_builder::{
46    PacketBuildResult, PacketBuilder, PacketData, RawPacket,
47};
48use crate::mqtt::connection::packet_id_manager::PacketIdManager;
49use crate::mqtt::connection::role;
50use crate::mqtt::connection::role::RoleType;
51use crate::mqtt::connection::sendable::Sendable;
52use crate::mqtt::connection::version::*;
53use crate::mqtt::packet::v3_1_1;
54use crate::mqtt::packet::v5_0;
55use crate::mqtt::packet::GenericPacket;
56use crate::mqtt::packet::GenericStorePacket;
57use crate::mqtt::packet::IsPacketId;
58use crate::mqtt::packet::Qos;
59use crate::mqtt::packet::ResponsePacket;
60use crate::mqtt::packet::{Property, TopicAliasRecv, TopicAliasSend};
61use crate::mqtt::prelude::GenericPacketTrait;
62use crate::mqtt::result_code::{
63    ConnectReasonCode, ConnectReturnCode, DisconnectReasonCode, MqttError, PubrecReasonCode,
64};
65
/// Packet size value used to mean "no limit negotiated".
///
/// Computed as 1 (fixed header byte) + 4 (longest Remaining Length encoding) + 128^4.
/// The largest Remaining Length the 4-byte variable-length encoding can express is
/// 128^4 - 1, so this value is at least as large as any encodable packet.
const MQTT_PACKET_SIZE_NO_LIMIT: u32 = 1 + 4 + 128_u32.pow(4);
69
/// Convert a Remaining Length value into the size of the whole packet.
///
/// The total is the sum of:
/// - 1 byte of fixed header,
/// - the 1 to 4 bytes needed to encode `remaining_length` itself
///   (thresholds at 128, 16384 and 2097152),
/// - `remaining_length` bytes of variable header and payload.
///
/// Values of 2097152 and above are all counted as needing 4 length bytes.
fn remaining_length_to_total_size(remaining_length: u32) -> u32 {
    let length_field_size = match remaining_length {
        0..=127 => 1,
        128..=16_383 => 2,
        16_384..=2_097_151 => 3,
        _ => 4,
    };

    1 + length_field_size + remaining_length
}
89
/// Type alias for [`GenericEvent`] with `u16` packet IDs (most common case)
///
/// This is a convenience alias that most applications will use. `u16` is the
/// standard MQTT packet ID type, so this pairs with [`Connection`].
pub type Event = GenericEvent<u16>;
95
/// Generic MQTT Connection - Core Sans-I/O MQTT protocol implementation
///
/// This struct represents the core MQTT protocol logic in a Sans-I/O (synchronous I/O-independent) design.
/// It handles MQTT packet processing, state management, and protocol compliance without performing
/// any actual network I/O operations. Instead, it returns events that the application must handle.
///
/// # Type Parameters
///
/// * `Role` - The connection role (Client or Server), determining allowed operations
/// * `PacketIdType` - The type used for packet IDs (typically `u16`, but can be `u32` for extended scenarios)
///
/// # Key Features
///
/// - **Sans-I/O Design**: No network I/O is performed directly; events are returned for external handling
/// - **Protocol Compliance**: Implements MQTT v3.1.1 and v5.0 specifications
/// - **State Management**: Tracks connection state, packet IDs, and protocol flows
/// - **Configurable Behavior**: Supports various configuration options for different use cases
/// - **Generic Packet ID Support**: Can use u16 or u32 packet IDs for different deployment scenarios
///
/// # Usage
///
/// ```ignore
/// use mqtt_protocol_core::mqtt;
///
/// let mut connection = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
///
/// // Send a packet
/// let events = connection.send(publish_packet);
/// for event in events {
///     match event {
///         mqtt::connection::Event::RequestSendPacket { packet, .. } => {
///             // Send packet over network
///         },
///         // Handle other events...
///     }
/// }
///
/// // Receive data
/// let mut cursor = std::io::Cursor::new(received_data);
/// let events = connection.recv(&mut cursor);
/// // Process events...
/// ```
pub struct GenericConnection<Role, PacketIdType>
where
    Role: RoleType,
    PacketIdType: IsPacketId,
{
    // Zero-sized marker that ties the connection to its `Role` type parameter
    _marker: PhantomData<Role>,

    // MQTT protocol version of this connection; packets of another version are rejected by `send`
    protocol_version: Version,

    // Tracks which packet IDs are in use (queried and released in `notify_closed`)
    pid_man: PacketIdManager<PacketIdType>,
    // Packet IDs grouped by the acknowledgement they are associated with.
    // `notify_closed` drains these sets and releases every ID still marked as used.
    pid_suback: HashSet<PacketIdType>,
    pid_unsuback: HashSet<PacketIdType>,
    pid_puback: HashSet<PacketIdType>,
    pid_pubrec: HashSet<PacketIdType>,
    pid_pubcomp: HashSet<PacketIdType>,

    // Whether session state is kept across a close. When false, `notify_closed`
    // clears the QoS2 state and releases the publish-related packet IDs.
    need_store: bool,
    // Store for retransmission packets
    store: GenericStore<PacketIdType>,

    // Behavior switches, all disabled by `new`.
    // NOTE(review): their exact effect is implemented outside this part of the file.
    offline_publish: bool,
    auto_pub_response: bool,
    auto_ping_response: bool,

    // Auto map topic alias for sending
    auto_map_topic_alias_send: bool,
    // Auto replace topic alias for sending
    auto_replace_topic_alias_send: bool,
    // Topic alias management for receiving (cleared by `notify_closed`)
    topic_alias_recv: Option<TopicAliasRecv>,
    // Topic alias management for sending (cleared by `notify_closed`)
    topic_alias_send: Option<TopicAliasSend>,

    // Maximum number of concurrent PUBLISH packets for sending
    publish_send_max: Option<u16>,
    // Maximum number of concurrent PUBLISH packets for receiving
    publish_recv_max: Option<u16>,
    // Current count of PUBLISH packets being sent
    publish_send_count: u16,

    // Set of received PUBLISH packets (for flow control)
    publish_recv: HashSet<PacketIdType>,

    // Maximum packet size for sending (reset to `MQTT_PACKET_SIZE_NO_LIMIT` on close)
    maximum_packet_size_send: u32,
    // Maximum packet size for receiving (reset to `MQTT_PACKET_SIZE_NO_LIMIT` on close)
    maximum_packet_size_recv: u32,

    // Connection state
    status: ConnectionStatus,

    // PINGREQ send interval in milliseconds
    pingreq_send_interval_ms: Option<u64>,
    // PINGREQ receive timeout in milliseconds
    pingreq_recv_timeout_ms: Option<u64>,
    // PINGRESP receive timeout in milliseconds
    pingresp_recv_timeout_ms: Option<u64>,

    // QoS2 PUBLISH packet handling state (for duplicate detection)
    qos2_publish_handled: HashSet<PacketIdType>,
    // QoS2 PUBLISH packet processing state
    qos2_publish_processing: HashSet<PacketIdType>,

    // Timer state flags; each one is cleared in `notify_timer_fired` when the
    // corresponding timer fires
    pingreq_send_set: bool,
    pingreq_recv_set: bool,
    pingresp_recv_set: bool,

    // Incremental packet parser fed by `recv`
    packet_builder: PacketBuilder,
    // Client/Server mode flag
    is_client: bool,
}
210
/// Type alias for [`GenericConnection`] with `u16` packet IDs (standard case)
///
/// This is the standard connection type that most applications will use.
/// It uses `u16` for packet IDs, which is the standard MQTT packet ID type
/// supporting values from 1 to 65535.
///
/// For extended scenarios where larger packet ID ranges are needed
/// (such as broker clusters), use `GenericConnection<Role, u32>` directly.
///
/// # Type Parameters
///
/// * `Role` - The connection role (typically `role::Client` or `role::Server`)
pub type Connection<Role> = GenericConnection<Role, u16>;
224
225impl<Role, PacketIdType> GenericConnection<Role, PacketIdType>
226where
227    Role: RoleType,
228    PacketIdType: IsPacketId,
229{
    /// Create a new MQTT connection instance
    ///
    /// Initializes a new MQTT connection with the specified protocol version.
    /// The connection starts in a disconnected state and must be activated through
    /// the connection handshake process (CONNECT/CONNACK).
    ///
    /// # Initial state
    ///
    /// - Status is `Disconnected`
    /// - All packet ID sets and QoS2 tracking sets are empty
    /// - Every optional behavior (`offline_publish`, automatic responses, topic alias
    ///   automation, session store) is disabled
    /// - No topic alias management, no PUBLISH flow-control limits, and no ping
    ///   intervals/timeouts are configured
    /// - Both packet size limits are `MQTT_PACKET_SIZE_NO_LIMIT`
    /// - No timer is marked as set
    ///
    /// # Parameters
    ///
    /// * `version` - The MQTT protocol version to use (V3_1_1 or V5_0)
    ///
    /// # Returns
    ///
    /// A new `GenericConnection` instance ready for use
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use mqtt_protocol_core::mqtt;
    ///
    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
    /// let mut server = mqtt::connection::Connection::<mqtt::connection::role::Server>::new(mqtt::version::Version::V3_1_1);
    /// ```
    pub fn new(version: Version) -> Self {
        Self {
            _marker: PhantomData,
            protocol_version: version,
            pid_man: PacketIdManager::new(),
            pid_suback: HashSet::default(),
            pid_unsuback: HashSet::default(),
            pid_puback: HashSet::default(),
            pid_pubrec: HashSet::default(),
            pid_pubcomp: HashSet::default(),
            need_store: false,
            store: GenericStore::new(),
            offline_publish: false,
            auto_pub_response: false,
            auto_ping_response: false,
            auto_map_topic_alias_send: false,
            auto_replace_topic_alias_send: false,
            topic_alias_recv: None,
            topic_alias_send: None,
            publish_send_max: None,
            publish_recv_max: None,
            publish_send_count: 0,
            publish_recv: HashSet::default(),
            maximum_packet_size_send: MQTT_PACKET_SIZE_NO_LIMIT,
            maximum_packet_size_recv: MQTT_PACKET_SIZE_NO_LIMIT,
            status: ConnectionStatus::Disconnected,
            pingreq_send_interval_ms: None,
            pingreq_recv_timeout_ms: None,
            pingresp_recv_timeout_ms: None,
            qos2_publish_handled: HashSet::default(),
            qos2_publish_processing: HashSet::default(),
            pingreq_send_set: false,
            pingreq_recv_set: false,
            pingresp_recv_set: false,
            packet_builder: PacketBuilder::new(),
            // Starts as false regardless of `Role`.
            // NOTE(review): presumably updated during the CONNECT handshake - confirm.
            is_client: false,
        }
    }
290
291    // public
292
    /// Send MQTT packet with compile-time role checking (experimental)
    ///
    /// The `T: Sendable<Role, PacketIdType>` bound restricts this method to packet
    /// types that are allowed for the connection's role (Client or Server), so a
    /// disallowed packet is rejected by the compiler instead of at runtime. It only
    /// works when the `Role` type parameter is concrete (not generic).
    ///
    /// This is an experimental API that may be subject to change. For general use,
    /// consider using the `send()` method instead.
    ///
    /// # Type Parameters
    ///
    /// * `T` - The packet type that must implement `Sendable<Role, PacketIdType>`
    ///
    /// # Parameters
    ///
    /// * `packet` - The MQTT packet to send
    ///
    /// # Returns
    ///
    /// A vector of events that the application must process
    ///
    /// # Compile-time Safety
    ///
    /// If the packet type is not allowed for the current role, this will result
    /// in a compile-time error, preventing protocol violations at development time.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use mqtt_protocol_core::mqtt;
    ///
    /// // This works for concrete roles
    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
    /// let events = client.checked_send(connect_packet); // OK - clients can send CONNECT
    ///
    /// // This would cause a compile error
    /// // let events = client.checked_send(connack_packet); // Error - clients cannot send CONNACK
    /// ```
    pub fn checked_send<T>(&mut self, packet: T) -> Vec<GenericEvent<PacketIdType>>
    where
        T: Sendable<Role, PacketIdType>,
    {
        // The packet's `Sendable` implementation picks the matching send handler
        // (concrete packet or generic packet) on this connection.
        packet.dispatch_send(self)
    }
338
339    /// Send MQTT packet with runtime role validation
340    ///
341    /// This is the primary method for sending MQTT packets. It accepts any `GenericPacket`
342    /// and performs runtime validation to ensure the packet is allowed for the current
343    /// connection role. This provides flexibility when the exact packet type is not known
344    /// at compile time.
345    ///
346    /// # Parameters
347    ///
348    /// * `packet` - The MQTT packet to send
349    ///
350    /// # Returns
351    ///
352    /// A vector of events that the application must process. If the packet is not allowed
353    /// for the current role, a `NotifyError` event will be included.
354    ///
355    /// # Validation
356    ///
357    /// The method validates that:
358    /// - The packet type is allowed for the connection role (Client vs Server)
359    /// - Protocol version compatibility
360    /// - Connection state requirements
361    /// - Packet ID management for QoS > 0 packets
362    ///
363    /// # Examples
364    ///
365    /// ```ignore
366    /// use mqtt_protocol_core::mqtt;
367    ///
368    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
369    /// let events = client.send(mqtt::packet::GenericPacket::V5_0(mqtt::packet::v5_0::Packet::Connect(connect_packet)));
370    ///
371    /// for event in events {
372    ///     match event {
373    ///         mqtt::connection::Event::RequestSendPacket { packet, .. } => {
374    ///             // Send packet over network
375    ///         },
376    ///         mqtt::connection::Event::NotifyError(error) => {
377    ///             // Handle validation errors
378    ///         },
379    ///         // Handle other events...
380    ///     }
381    /// }
382    /// ```
383    pub fn send(&mut self, packet: GenericPacket<PacketIdType>) -> Vec<GenericEvent<PacketIdType>> {
384        use core::any::TypeId;
385
386        let role_id = TypeId::of::<Role>();
387        let client_id = TypeId::of::<role::Client>();
388        let server_id = TypeId::of::<role::Server>();
389        let any_id = TypeId::of::<role::Any>();
390
391        // Check version compatibility between connection and packet
392        let packet_version = packet.protocol_version();
393
394        // Return error if versions don't match
395        if self.protocol_version != packet_version {
396            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
397        }
398
399        match packet {
400            // CONNECT - Client/Any can send
401            GenericPacket::V3_1_1Connect(p) => {
402                if role_id == client_id || role_id == any_id {
403                    self.process_send_v3_1_1_connect(p)
404                } else {
405                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
406                }
407            }
408            GenericPacket::V5_0Connect(p) => {
409                if role_id == client_id || role_id == any_id {
410                    self.process_send_v5_0_connect(p)
411                } else {
412                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
413                }
414            }
415            // CONNACK - Server/Any can send
416            GenericPacket::V3_1_1Connack(p) => {
417                if role_id == server_id || role_id == any_id {
418                    self.process_send_v3_1_1_connack(p)
419                } else {
420                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
421                }
422            }
423            GenericPacket::V5_0Connack(p) => {
424                if role_id == server_id || role_id == any_id {
425                    self.process_send_v5_0_connack(p)
426                } else {
427                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
428                }
429            }
430            // PUBLISH - Any role can send
431            GenericPacket::V3_1_1Publish(p) => self.process_send_v3_1_1_publish(p),
432            GenericPacket::V5_0Publish(p) => self.process_send_v5_0_publish(p),
433            // PUBACK/PUBREC/PUBREL/PUBCOMP - Any role can send
434            GenericPacket::V3_1_1Puback(p) => self.process_send_v3_1_1_puback(p),
435            GenericPacket::V5_0Puback(p) => self.process_send_v5_0_puback(p),
436            GenericPacket::V3_1_1Pubrec(p) => self.process_send_v3_1_1_pubrec(p),
437            GenericPacket::V5_0Pubrec(p) => self.process_send_v5_0_pubrec(p),
438            GenericPacket::V3_1_1Pubrel(p) => self.process_send_v3_1_1_pubrel(p),
439            GenericPacket::V5_0Pubrel(p) => self.process_send_v5_0_pubrel(p),
440            GenericPacket::V3_1_1Pubcomp(p) => self.process_send_v3_1_1_pubcomp(p),
441            GenericPacket::V5_0Pubcomp(p) => self.process_send_v5_0_pubcomp(p),
442            // SUBSCRIBE/UNSUBSCRIBE - Client/Any can send
443            GenericPacket::V3_1_1Subscribe(p) => {
444                if role_id == client_id || role_id == any_id {
445                    self.process_send_v3_1_1_subscribe(p)
446                } else {
447                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
448                }
449            }
450            GenericPacket::V5_0Subscribe(p) => {
451                if role_id == client_id || role_id == any_id {
452                    self.process_send_v5_0_subscribe(p)
453                } else {
454                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
455                }
456            }
457            GenericPacket::V3_1_1Unsubscribe(p) => {
458                if role_id == client_id || role_id == any_id {
459                    self.process_send_v3_1_1_unsubscribe(p)
460                } else {
461                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
462                }
463            }
464            GenericPacket::V5_0Unsubscribe(p) => {
465                if role_id == client_id || role_id == any_id {
466                    self.process_send_v5_0_unsubscribe(p)
467                } else {
468                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
469                }
470            }
471            // SUBACK/UNSUBACK - Server/Any can send
472            GenericPacket::V3_1_1Suback(p) => {
473                if role_id == server_id || role_id == any_id {
474                    self.process_send_v3_1_1_suback(p)
475                } else {
476                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
477                }
478            }
479            GenericPacket::V5_0Suback(p) => {
480                if role_id == server_id || role_id == any_id {
481                    self.process_send_v5_0_suback(p)
482                } else {
483                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
484                }
485            }
486            GenericPacket::V3_1_1Unsuback(p) => {
487                if role_id == server_id || role_id == any_id {
488                    self.process_send_v3_1_1_unsuback(p)
489                } else {
490                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
491                }
492            }
493            GenericPacket::V5_0Unsuback(p) => {
494                if role_id == server_id || role_id == any_id {
495                    self.process_send_v5_0_unsuback(p)
496                } else {
497                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
498                }
499            }
500            // PINGREQ - Client/Any can send
501            GenericPacket::V3_1_1Pingreq(p) => {
502                if role_id == client_id || role_id == any_id {
503                    self.process_send_v3_1_1_pingreq(p)
504                } else {
505                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
506                }
507            }
508            GenericPacket::V5_0Pingreq(p) => {
509                if role_id == client_id || role_id == any_id {
510                    self.process_send_v5_0_pingreq(p)
511                } else {
512                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
513                }
514            }
515            // PINGRESP - Server/Any can send
516            GenericPacket::V3_1_1Pingresp(p) => {
517                if role_id == server_id || role_id == any_id {
518                    self.process_send_v3_1_1_pingresp(p)
519                } else {
520                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
521                }
522            }
523            GenericPacket::V5_0Pingresp(p) => {
524                if role_id == server_id || role_id == any_id {
525                    self.process_send_v5_0_pingresp(p)
526                } else {
527                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
528                }
529            }
530            // DISCONNECT(v3.1.1) - Client/Any role can send
531            GenericPacket::V3_1_1Disconnect(p) => {
532                if role_id == client_id || role_id == any_id {
533                    self.process_send_v3_1_1_disconnect(p)
534                } else {
535                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
536                }
537            }
538            // DISCONNECT(v5.0) - Any role can send
539            GenericPacket::V5_0Disconnect(p) => self.process_send_v5_0_disconnect(p),
540            // AUTH - Any role can send (v5.0 only)
541            GenericPacket::V5_0Auth(p) => self.process_send_v5_0_auth(p),
542        }
543    }
544
545    /// Receive and process incoming MQTT data
546    ///
547    /// This method processes raw bytes received from the network and attempts to
548    /// parse them into MQTT packets. It handles packet fragmentation and can
549    /// process multiple complete packets from a single data buffer.
550    ///
551    /// # Parameters
552    ///
553    /// * `data` - A cursor over the received data bytes. The cursor position will
554    ///   be advanced as data is consumed.
555    ///
556    /// # Returns
557    ///
558    /// A vector of events generated from processing the received data:
559    /// - `NotifyPacketReceived` for successfully parsed packets
560    /// - `NotifyError` for parsing errors or protocol violations
561    /// - Additional events based on packet processing (timers, responses, etc.)
562    ///
563    /// # Behavior
564    ///
565    /// - Handles partial packets (data will be buffered until complete)
566    /// - Processes multiple complete packets in sequence
567    /// - Validates packet structure and protocol compliance
568    /// - Updates internal connection state based on received packets
569    /// - Generates appropriate response events (ACKs, etc.)
570    ///
571    /// # Examples
572    ///
573    /// ```ignore
574    /// use mqtt_protocol_core::mqtt;
575    ///
576    /// let mut connection = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
577    /// let received_data = [/* network data */];
578    /// let mut cursor = std::io::Cursor::new(&received_data[..]);
579    ///
580    /// let events = connection.recv(&mut cursor);
581    /// for event in events {
582    ///     match event {
583    ///         mqtt::connection::Event::NotifyPacketReceived(packet) => {
584    ///             // Process received packet
585    ///         },
586    ///         mqtt::connection::Event::NotifyError(error) => {
587    ///             // Handle parsing/protocol errors
588    ///         },
589    ///         // Handle other events...
590    ///     }
591    /// }
592    /// ```
593    pub fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
594        let mut events = Vec::new();
595
596        match self.packet_builder.feed(data) {
597            PacketBuildResult::Complete(raw_packet) => {
598                events.extend(self.process_recv_packet(raw_packet));
599            }
600            PacketBuildResult::Incomplete => {}
601            PacketBuildResult::Error(e) => {
602                self.cancel_timers(&mut events);
603                events.push(GenericEvent::RequestClose);
604                events.push(GenericEvent::NotifyError(e));
605            }
606        }
607
608        events
609    }
610
611    /// Notify that a timer has fired (Event-based API)
612    ///
613    /// This method should be called when the I/O layer detects that a timer has expired.
614    /// It handles the timer event appropriately and returns events that need to be processed.
615    /// Notify that a timer has fired
616    ///
617    /// This method should be called by the application when a previously requested
618    /// timer expires. The connection will take appropriate action based on the timer type.
619    ///
620    /// # Parameters
621    ///
622    /// * `kind` - The type of timer that fired
623    ///
624    /// # Returns
625    ///
626    /// Events generated from timer processing (e.g., sending PINGREQ, connection timeouts)
627    pub fn notify_timer_fired(&mut self, kind: TimerKind) -> Vec<GenericEvent<PacketIdType>> {
628        let mut events = Vec::new();
629
630        match kind {
631            TimerKind::PingreqSend => {
632                // Reset timer flag
633                self.pingreq_send_set = false;
634
635                // Send PINGREQ if connected
636                if self.status == ConnectionStatus::Connected {
637                    match self.protocol_version {
638                        Version::V3_1_1 => {
639                            if let Ok(pingreq) = v3_1_1::Pingreq::builder().build() {
640                                events.extend(self.process_send_v3_1_1_pingreq(pingreq));
641                            }
642                        }
643                        Version::V5_0 => {
644                            if let Ok(pingreq) = v5_0::Pingreq::builder().build() {
645                                events.extend(self.process_send_v5_0_pingreq(pingreq));
646                            }
647                        }
648                        Version::Undetermined => {
649                            unreachable!("Protocol version should be set before sending PINGREQ");
650                        }
651                    }
652                }
653            }
654            TimerKind::PingreqRecv => {
655                // Reset timer flag
656                self.pingreq_recv_set = false;
657
658                match self.protocol_version {
659                    Version::V3_1_1 => {
660                        // V3.1.1: Close connection
661                        events.push(GenericEvent::RequestClose);
662                    }
663                    Version::V5_0 => {
664                        // V5.0: Send DISCONNECT with keep_alive_timeout if connected
665                        if self.status == ConnectionStatus::Connected {
666                            if let Ok(disconnect) = v5_0::Disconnect::builder()
667                                .reason_code(DisconnectReasonCode::KeepAliveTimeout)
668                                .build()
669                            {
670                                events.extend(self.process_send_v5_0_disconnect(disconnect));
671                            }
672                        }
673                    }
674                    Version::Undetermined => {
675                        unreachable!("Protocol version should be set before receiving PINGREQ");
676                    }
677                }
678            }
679            TimerKind::PingrespRecv => {
680                // Reset timer flag
681                self.pingresp_recv_set = false;
682
683                match self.protocol_version {
684                    Version::V3_1_1 => {
685                        // V3.1.1: Close connection
686                        events.push(GenericEvent::RequestClose);
687                    }
688                    Version::V5_0 => {
689                        // V5.0: Send DISCONNECT with keep_alive_timeout if connected
690                        if self.status == ConnectionStatus::Connected {
691                            if let Ok(disconnect) = v5_0::Disconnect::builder()
692                                .reason_code(DisconnectReasonCode::KeepAliveTimeout)
693                                .build()
694                            {
695                                events.extend(self.process_send_v5_0_disconnect(disconnect));
696                            }
697                        }
698                    }
699                    Version::Undetermined => {
700                        unreachable!("Protocol version should be set before receiving PINGRESP");
701                    }
702                }
703            }
704        }
705
706        events
707    }
708
709    /// Notify that the connection has been closed by the I/O layer (Event-based API)
710    ///
711    /// This method should be called when the I/O layer detects that the socket has been closed.
712    /// It updates the internal state appropriately and returns events that need to be processed.
713    /// Notify that the underlying connection has been closed
714    ///
715    /// This method should be called when the network connection is closed,
716    /// either intentionally or due to network issues.
717    ///
718    /// # Returns
719    ///
720    /// Events generated from connection closure processing
721    pub fn notify_closed(&mut self) -> Vec<GenericEvent<PacketIdType>> {
722        let mut events = Vec::new();
723
724        // Reset packet size limits to MQTT protocol maximum
725        self.maximum_packet_size_send = MQTT_PACKET_SIZE_NO_LIMIT;
726        self.maximum_packet_size_recv = MQTT_PACKET_SIZE_NO_LIMIT;
727
728        // Set status to disconnected
729        self.status = ConnectionStatus::Disconnected;
730
731        // Clear topic alias management
732        self.topic_alias_send = None;
733        self.topic_alias_recv = None;
734
735        // Release packet IDs for SUBACK
736        for packet_id in self.pid_suback.drain() {
737            if self.pid_man.is_used_id(packet_id) {
738                self.pid_man.release_id(packet_id);
739                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
740            }
741        }
742
743        // Release packet IDs for UNSUBACK
744        for packet_id in self.pid_unsuback.drain() {
745            if self.pid_man.is_used_id(packet_id) {
746                self.pid_man.release_id(packet_id);
747                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
748            }
749        }
750
751        // If not storing session state, clear QoS2 states and release publish-related packet IDs
752        if !self.need_store {
753            self.qos2_publish_processing.clear();
754            self.qos2_publish_handled.clear();
755
756            // Release packet IDs for PUBACK
757            for packet_id in self.pid_puback.drain() {
758                if self.pid_man.is_used_id(packet_id) {
759                    self.pid_man.release_id(packet_id);
760                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
761                }
762            }
763
764            // Release packet IDs for PUBREC
765            for packet_id in self.pid_pubrec.drain() {
766                if self.pid_man.is_used_id(packet_id) {
767                    self.pid_man.release_id(packet_id);
768                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
769                }
770            }
771
772            // Release packet IDs for PUBCOMP
773            for packet_id in self.pid_pubcomp.drain() {
774                if self.pid_man.is_used_id(packet_id) {
775                    self.pid_man.release_id(packet_id);
776                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
777                }
778            }
779        }
780
781        // Cancel all timers
782        self.cancel_timers(&mut events);
783
784        events
785    }
786
787    /// Set the PINGREQ send interval
788    ///
789    /// Sets the interval for sending PINGREQ packets to maintain the connection alive.
790    /// When changed, this may generate timer-related events to update the ping schedule.
791    ///
792    /// # Parameters
793    ///
794    /// * `duration_ms` - The interval in milliseconds between PINGREQ packets
795    ///
796    /// # Returns
797    ///
798    /// Events generated from updating the ping interval
799    pub fn set_pingreq_send_interval(
800        &mut self,
801        duration_ms: u64,
802    ) -> Vec<GenericEvent<PacketIdType>> {
803        let mut events = Vec::new();
804
805        if duration_ms == 0 {
806            self.pingreq_send_interval_ms = None;
807            self.pingreq_send_set = false;
808            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
809        } else {
810            self.pingreq_send_interval_ms = Some(duration_ms);
811            self.pingreq_send_set = true;
812            events.push(GenericEvent::RequestTimerReset {
813                kind: TimerKind::PingreqSend,
814                duration_ms,
815            });
816        }
817
818        events
819    }
820
821    /// Get the remaining capacity for sending PUBLISH packets
822    ///
823    /// Returns the number of additional PUBLISH packets that can be sent
824    /// without exceeding the receive maximum limit.
825    ///
826    /// # Returns
827    ///
828    /// The remaining capacity for outgoing PUBLISH packets, or `None` if no limit is set
829    pub fn get_receive_maximum_vacancy_for_send(&self) -> Option<u16> {
830        // If publish_recv_max is set, return the remaining capacity
831        self.publish_send_max
832            .map(|max| max.saturating_sub(self.publish_send_count))
833    }
834
835    /// Enable or disable offline publishing
836    ///
837    /// When enabled, PUBLISH packets can be sent even when disconnected.
838    /// They will be queued and sent once the connection is established.
839    ///
840    /// # Parameters
841    ///
842    /// * `enable` - Whether to enable offline publishing
843    pub fn set_offline_publish(&mut self, enable: bool) {
844        self.offline_publish = enable;
845        if self.offline_publish {
846            self.need_store = true;
847        }
848    }
849
850    /// Enable or disable automatic PUBLISH response generation
851    ///
852    /// When enabled, appropriate response packets (PUBACK, PUBREC, PUBREL, and PUBCOMP.)
853    /// are automatically generated for received PUBLISH packets.
854    ///
855    /// # Parameters
856    ///
857    /// * `enable` - Whether to enable automatic responses
858    pub fn set_auto_pub_response(&mut self, enable: bool) {
859        self.auto_pub_response = enable;
860    }
861
862    /// Enable or disable automatic PING response generation
863    ///
864    /// When enabled, PINGRESP packets are automatically sent in response to PINGREQ.
865    ///
866    /// # Parameters
867    ///
868    /// * `enable` - Whether to enable automatic PING responses
869    pub fn set_auto_ping_response(&mut self, enable: bool) {
870        self.auto_ping_response = enable;
871    }
872
873    /// Enable or disable automatic topic alias mapping for outgoing packets
874    ///
875    /// When enabled, the connection will automatically map topics to aliases
876    /// for outgoing PUBLISH packets to reduce bandwidth usage. This includes:
877    /// - Applying existing registered topic aliases when available
878    /// - Allocating new topic aliases for unregistered topics
879    /// - Using LRU algorithm to overwrite the least recently used alias when all aliases are in use
880    ///
881    /// # Parameters
882    ///
883    /// * `enable` - Whether to enable automatic topic alias mapping
884    pub fn set_auto_map_topic_alias_send(&mut self, enable: bool) {
885        self.auto_map_topic_alias_send = enable;
886    }
887
888    /// Enable or disable automatic topic alias replacement for outgoing packets
889    ///
890    /// When enabled, the connection will automatically apply existing registered
891    /// topic aliases to outgoing PUBLISH packets when aliases are available.
892    /// This only uses previously registered aliases and does not allocate new ones.
893    ///
894    /// # Parameters
895    ///
896    /// * `enable` - Whether to enable automatic topic alias replacement
897    pub fn set_auto_replace_topic_alias_send(&mut self, enable: bool) {
898        self.auto_replace_topic_alias_send = enable;
899    }
900
901    /// Set PINGREQ receive timeout
902    pub fn set_pingresp_recv_timeout(&mut self, timeout_ms: Option<u64>) {
903        self.pingresp_recv_timeout_ms = timeout_ms;
904    }
905
906    /// Acquire a new packet ID for outgoing packets
907    ///
908    /// # Returns
909    ///
910    /// A unique packet ID, or an error if none are available
911    pub fn acquire_packet_id(&mut self) -> Result<PacketIdType, MqttError> {
912        self.pid_man.acquire_unique_id()
913    }
914
915    /// Register a packet ID as in use
916    ///
917    /// Manually registers a specific packet ID as being in use, preventing
918    /// it from being allocated by `acquire_packet_id()`.
919    ///
920    /// # Parameters
921    ///
922    /// * `packet_id` - The packet ID to register as in use
923    ///
924    /// # Returns
925    ///
926    /// `Ok(())` if successful, or an error if the packet ID is already in use
927    pub fn register_packet_id(&mut self, packet_id: PacketIdType) -> Result<(), MqttError> {
928        self.pid_man.register_id(packet_id)
929    }
930
931    /// Release packet ID (Event-based API)
932    /// Release a packet ID for reuse
933    ///
934    /// This method releases a packet ID, making it available for future use.
935    /// It also generates a `NotifyPacketIdReleased` event.
936    ///
937    /// # Parameters
938    ///
939    /// * `packet_id` - The packet ID to release
940    ///
941    /// # Returns
942    ///
943    /// Events generated from releasing the packet ID
944    pub fn release_packet_id(
945        &mut self,
946        packet_id: PacketIdType,
947    ) -> Vec<GenericEvent<PacketIdType>> {
948        let mut events = Vec::new();
949
950        if self.pid_man.is_used_id(packet_id) {
951            self.pid_man.release_id(packet_id);
952            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
953        }
954
955        events
956    }
957
958    /// Get the set of QoS 2 PUBLISH packet IDs that have been handled
959    ///
960    /// Returns a copy of the set containing packet IDs of QoS 2 PUBLISH packets
961    /// that have been successfully processed and handled.
962    ///
963    /// # Returns
964    ///
965    /// A `HashSet` containing packet IDs of handled QoS 2 PUBLISH packets
966    pub fn get_qos2_publish_handled(&self) -> HashSet<PacketIdType> {
967        self.qos2_publish_handled.clone()
968    }
969
970    /// Restore the set of QoS 2 PUBLISH packet IDs that have been handled
971    ///
972    /// Restores the internal state of handled QoS 2 PUBLISH packet IDs,
973    /// typically used when resuming a connection from persistent storage.
974    ///
975    /// # Parameters
976    ///
977    /// * `pids` - A `HashSet` containing packet IDs of previously handled QoS 2 PUBLISH packets
978    pub fn restore_qos2_publish_handled(&mut self, pids: HashSet<PacketIdType>) {
979        self.qos2_publish_handled = pids;
980    }
981
982    /// Restore previously stored packets
983    ///
984    /// This method restores packets that were previously stored for persistence,
985    /// typically called when resuming a session.
986    ///
987    /// # Parameters
988    ///
989    /// * `packets` - Vector of packets to restore
990    pub fn restore_packets(&mut self, packets: Vec<GenericStorePacket<PacketIdType>>) {
991        for packet in packets {
992            match &packet {
993                GenericStorePacket::V3_1_1Publish(p) => {
994                    // Add to appropriate QoS tracking set
995                    match p.qos() {
996                        Qos::AtLeastOnce => {
997                            self.pid_puback.insert(p.packet_id().unwrap());
998                        }
999                        Qos::ExactlyOnce => {
1000                            self.pid_pubrec.insert(p.packet_id().unwrap());
1001                        }
1002                        _ => {
1003                            // QoS 0 shouldn't be in store, but handle gracefully
1004                            warn!("QoS 0 packet found in store, skipping");
1005                            continue;
1006                        }
1007                    }
1008                    // Register packet ID and add to store
1009                    let packet_id = p.packet_id().unwrap();
1010                    if self.pid_man.register_id(packet_id).is_ok() {
1011                        if let Err(_e) = self.store.add(packet) {
1012                            error!("Failed to add packet to store: {:?}", _e);
1013                        }
1014                    } else {
1015                        error!("Packet ID {} has already been used. Skip it", packet_id);
1016                    }
1017                }
1018                GenericStorePacket::V5_0Publish(p) => {
1019                    // Add to appropriate QoS tracking set
1020                    match p.qos() {
1021                        Qos::AtLeastOnce => {
1022                            self.pid_puback.insert(p.packet_id().unwrap());
1023                        }
1024                        Qos::ExactlyOnce => {
1025                            self.pid_pubrec.insert(p.packet_id().unwrap());
1026                        }
1027                        _ => {
1028                            // QoS 0 shouldn't be in store, but handle gracefully
1029                            warn!("QoS 0 packet found in store, skipping");
1030                            continue;
1031                        }
1032                    }
1033                    // Register packet ID and add to store
1034                    let packet_id = p.packet_id().unwrap();
1035                    if self.pid_man.register_id(packet_id).is_ok() {
1036                        if let Err(_e) = self.store.add(packet) {
1037                            error!("Failed to add packet to store: {:?}", _e);
1038                        }
1039                    } else {
1040                        error!("Packet ID {} has already been used. Skip it", packet_id);
1041                    }
1042                }
1043                GenericStorePacket::V3_1_1Pubrel(p) => {
1044                    // Pubrel packets expect PUBCOMP response
1045                    self.pid_pubcomp.insert(p.packet_id());
1046                    // Register packet ID and add to store
1047                    let packet_id = p.packet_id();
1048                    if self.pid_man.register_id(packet_id).is_ok() {
1049                        if let Err(_e) = self.store.add(packet) {
1050                            error!("Failed to add packet to store: {:?}", _e);
1051                        }
1052                    } else {
1053                        error!("Packet ID {} has already been used. Skip it", packet_id);
1054                    }
1055                }
1056                GenericStorePacket::V5_0Pubrel(p) => {
1057                    // Pubrel packets expect PUBCOMP response
1058                    self.pid_pubcomp.insert(p.packet_id());
1059                    // Register packet ID and add to store
1060                    let packet_id = p.packet_id();
1061                    if self.pid_man.register_id(packet_id).is_ok() {
1062                        if let Err(_e) = self.store.add(packet) {
1063                            error!("Failed to add packet to store: {:?}", _e);
1064                        }
1065                    } else {
1066                        error!("Packet ID {} has already been used. Skip it", packet_id);
1067                    }
1068                }
1069            }
1070        }
1071    }
1072
1073    /// Get stored packets for persistence
1074    ///
1075    /// Returns packets that need to be stored for potential retransmission.
1076    /// This is useful for implementing persistent sessions.
1077    ///
1078    /// # Returns
1079    ///
1080    /// Vector of packets that should be persisted
1081    pub fn get_stored_packets(&self) -> Vec<GenericStorePacket<PacketIdType>> {
1082        self.store.get_stored()
1083    }
1084
1085    /// Get the MQTT protocol version being used
1086    ///
1087    /// # Returns
1088    ///
1089    /// The protocol version (V3_1_1 or V5_0)
1090    pub fn get_protocol_version(&self) -> Version {
1091        self.protocol_version
1092    }
1093
1094    /// Check if a PUBLISH packet is currently being processed
1095    ///
1096    /// # Parameters
1097    ///
1098    /// * `packet_id` - The packet ID to check
1099    ///
1100    /// # Returns
1101    ///
1102    /// True if the packet ID is in use for PUBLISH processing
1103    pub fn is_publish_processing(&self, packet_id: PacketIdType) -> bool {
1104        self.qos2_publish_processing.contains(&packet_id)
1105    }
1106
1107    /// Regulate packet for store (remove/resolve topic alias)
1108    ///
1109    /// This method prepares a V5.0 publish packet for storage by resolving topic aliases
1110    /// and removing TopicAlias properties to ensure the packet can be retransmitted correctly.
1111    pub fn regulate_for_store(
1112        &self,
1113        mut packet: v5_0::GenericPublish<PacketIdType>,
1114    ) -> Result<v5_0::GenericPublish<PacketIdType>, MqttError> {
1115        if packet.topic_name().is_empty() {
1116            // Topic is empty, need to resolve from topic alias
1117            if let Some(topic_alias) = Self::get_topic_alias_from_props(packet.props()) {
1118                if let Some(ref topic_alias_send) = self.topic_alias_send {
1119                    if let Some(topic) = topic_alias_send.peek(topic_alias) {
1120                        // Found topic for alias, add topic and remove alias property
1121                        packet = packet.remove_topic_alias_add_topic(topic.to_string())?;
1122                    } else {
1123                        return Err(MqttError::PacketNotRegulated);
1124                    }
1125                } else {
1126                    return Err(MqttError::PacketNotRegulated);
1127                }
1128            } else {
1129                return Err(MqttError::PacketNotRegulated);
1130            }
1131        } else {
1132            // Topic is not empty, just remove TopicAlias property if present
1133            packet = packet.remove_topic_alias();
1134        }
1135
1136        Ok(packet)
1137    }
1138
1139    // private
1140
1141    /// Initialize connection state based on client/server role
1142    ///
1143    /// Resets all connection-specific state including:
1144    /// - Publish flow control counters and limits
1145    /// - Topic alias management
1146    /// - QoS2 processing state
1147    /// - Packet ID tracking sets
1148    /// - Store requirement flag
1149    ///
1150    /// # Parameters
1151    /// * `is_client` - true for client mode, false for server mode
1152    fn initialize(&mut self, is_client: bool) {
1153        self.publish_send_max = None;
1154        self.publish_recv_max = None;
1155        self.publish_send_count = 0;
1156        self.topic_alias_send = None;
1157        self.topic_alias_recv = None;
1158        self.publish_recv.clear();
1159        self.qos2_publish_processing.clear();
1160        self.need_store = false;
1161        self.pid_suback.clear();
1162        self.pid_unsuback.clear();
1163        self.is_client = is_client;
1164    }
1165
1166    fn clear_store_related(&mut self) {
1167        self.pid_man.clear();
1168        self.pid_puback.clear();
1169        self.pid_pubrec.clear();
1170        self.pid_pubcomp.clear();
1171        self.store.clear();
1172    }
1173
1174    /// Send all stored packets for retransmission
1175    fn send_stored(&mut self) -> Vec<GenericEvent<PacketIdType>> {
1176        let mut events = Vec::new();
1177        self.store.for_each(|packet| {
1178            if packet.size() > self.maximum_packet_size_send as usize {
1179                let packet_id = packet.packet_id();
1180                self.pid_man.release_id(packet_id);
1181                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1182                return false; // Remove from store
1183            }
1184            events.push(GenericEvent::RequestSendPacket {
1185                packet: packet.clone().into(),
1186                release_packet_id_if_send_error: None,
1187            });
1188            true // Keep in store
1189        });
1190
1191        events
1192    }
1193
1194    /// Validate topic alias and return the associated topic name
1195    ///
1196    /// Checks if the topic alias is valid and retrieves the corresponding topic name
1197    /// from the topic alias send manager.
1198    ///
1199    /// # Parameters
1200    /// * `topic_alias_opt` - Optional topic alias value
1201    ///
1202    /// # Returns
1203    /// * `Some(topic_name)` if the topic alias is valid and found
1204    /// * `None` if the topic alias is invalid, not provided, or not found
1205    fn validate_topic_alias(&mut self, topic_alias_opt: Option<u16>) -> Option<String> {
1206        let topic_alias = topic_alias_opt?;
1207
1208        if !self.validate_topic_alias_range(topic_alias) {
1209            return None;
1210        }
1211
1212        let topic_alias_send = self.topic_alias_send.as_mut()?;
1213        // LRU updated here
1214        let topic = topic_alias_send.get(topic_alias)?;
1215
1216        Some(topic.to_string())
1217    }
1218
1219    /// Validate that topic alias is within the allowed range
1220    ///
1221    /// Checks if the topic alias value is valid according to the configured
1222    /// topic alias maximum for sending.
1223    ///
1224    /// # Parameters
1225    /// * `topic_alias` - Topic alias value to validate
1226    ///
1227    /// # Returns
1228    /// * `true` if the topic alias is within valid range
1229    /// * `false` if invalid or topic alias sending is not configured
1230    fn validate_topic_alias_range(&self, topic_alias: u16) -> bool {
1231        let topic_alias_send = match &self.topic_alias_send {
1232            Some(tas) => tas,
1233            None => {
1234                error!("topic_alias is set but topic_alias_maximum is 0");
1235                return false;
1236            }
1237        };
1238
1239        if topic_alias == 0 || topic_alias > topic_alias_send.max() {
1240            error!("topic_alias is set but out of range");
1241            return false;
1242        }
1243
1244        true
1245    }
1246
1247    /// Process v3.1.1 CONNECT packet - C++ constexpr if implementation
1248    pub(crate) fn process_send_v3_1_1_connect(
1249        &mut self,
1250        packet: v3_1_1::Connect,
1251    ) -> Vec<GenericEvent<PacketIdType>> {
1252        info!("send connect v3.1.1: {packet}");
1253
1254        if self.status != ConnectionStatus::Disconnected {
1255            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1256        }
1257        if !self.validate_maximum_packet_size_send(packet.size()) {
1258            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1259        }
1260
1261        let mut events = Vec::new();
1262        self.initialize(true);
1263        self.status = ConnectionStatus::Connecting;
1264
1265        // Extract keep_alive and set pingreq_send_interval_ms if != 0
1266        let keep_alive = packet.keep_alive();
1267        if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1268            self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1269        }
1270
1271        // Handle clean_session flag
1272        if packet.clean_start() {
1273            self.clear_store_related();
1274        } else {
1275            self.need_store = true;
1276        }
1277
1278        // Clear topic alias for sending
1279        self.topic_alias_send = None;
1280
1281        events.push(GenericEvent::RequestSendPacket {
1282            packet: packet.into(),
1283            release_packet_id_if_send_error: None,
1284        });
1285        self.send_post_process(&mut events);
1286
1287        events
1288    }
1289
1290    /// Process v5.0 CONNECT packet - C++ constexpr if implementation
1291    pub(crate) fn process_send_v5_0_connect(
1292        &mut self,
1293        packet: v5_0::Connect,
1294    ) -> Vec<GenericEvent<PacketIdType>> {
1295        info!("send connect v5.0: {packet}");
1296        if !self.validate_maximum_packet_size_send(packet.size()) {
1297            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1298        }
1299        if self.status != ConnectionStatus::Disconnected {
1300            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1301        }
1302
1303        let mut events = Vec::new();
1304        self.initialize(true);
1305        self.status = ConnectionStatus::Connecting;
1306
1307        // Extract keep_alive and set pingreq_send_interval_ms if != 0
1308        let keep_alive = packet.keep_alive();
1309        if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1310            self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1311        }
1312
1313        // Handle clean_start flag
1314        if packet.clean_start() {
1315            self.clear_store_related();
1316        }
1317
1318        // Process properties
1319        for prop in packet.props() {
1320            match prop {
1321                Property::TopicAliasMaximum(val) => {
1322                    if val.val() != 0 {
1323                        self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1324                    }
1325                }
1326                Property::ReceiveMaximum(val) => {
1327                    debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1328                    self.publish_recv_max = Some(val.val());
1329                }
1330                Property::MaximumPacketSize(val) => {
1331                    debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1332                    self.maximum_packet_size_recv = val.val();
1333                }
1334                Property::SessionExpiryInterval(val) => {
1335                    if val.val() != 0 {
1336                        self.need_store = true;
1337                    }
1338                }
1339                _ => {
1340                    // Ignore other properties (equivalent to [](auto const&){} in C++)
1341                }
1342            }
1343        }
1344
1345        events.push(GenericEvent::RequestSendPacket {
1346            packet: packet.into(),
1347            release_packet_id_if_send_error: None,
1348        });
1349        self.send_post_process(&mut events);
1350
1351        events
1352    }
1353
1354    pub(crate) fn process_send_v3_1_1_connack(
1355        &mut self,
1356        packet: v3_1_1::Connack,
1357    ) -> Vec<GenericEvent<PacketIdType>> {
1358        info!("send connack v3.1.1: {packet}");
1359        if self.status != ConnectionStatus::Connecting {
1360            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1361        }
1362        let mut events = Vec::new();
1363        if packet.return_code() == ConnectReturnCode::Accepted {
1364            self.status = ConnectionStatus::Connected;
1365        } else {
1366            self.status = ConnectionStatus::Disconnected;
1367        }
1368
1369        events.push(GenericEvent::RequestSendPacket {
1370            packet: packet.into(),
1371            release_packet_id_if_send_error: None,
1372        });
1373        events.extend(self.send_stored());
1374        self.send_post_process(&mut events);
1375
1376        events
1377    }
1378
1379    pub(crate) fn process_send_v5_0_connack(
1380        &mut self,
1381        packet: v5_0::Connack,
1382    ) -> Vec<GenericEvent<PacketIdType>> {
1383        info!("send connack v5.0: {packet}");
1384        if !self.validate_maximum_packet_size_send(packet.size()) {
1385            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1386        }
1387        if self.status != ConnectionStatus::Connecting {
1388            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1389        }
1390
1391        let mut events = Vec::new();
1392
1393        if packet.reason_code() == ConnectReasonCode::Success {
1394            self.status = ConnectionStatus::Connected;
1395
1396            // Process properties
1397            for prop in packet.props() {
1398                match prop {
1399                    Property::TopicAliasMaximum(val) => {
1400                        if val.val() != 0 {
1401                            self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1402                        }
1403                    }
1404                    Property::ReceiveMaximum(val) => {
1405                        debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1406                        self.publish_recv_max = Some(val.val());
1407                    }
1408                    Property::MaximumPacketSize(val) => {
1409                        debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1410                        self.maximum_packet_size_recv = val.val();
1411                    }
1412                    _ => {
1413                        // Ignore other properties
1414                    }
1415                }
1416            }
1417        } else {
1418            self.status = ConnectionStatus::Disconnected;
1419            self.cancel_timers(&mut events);
1420        }
1421
1422        events.push(GenericEvent::RequestSendPacket {
1423            packet: packet.into(),
1424            release_packet_id_if_send_error: None,
1425        });
1426        events.extend(self.send_stored());
1427        self.send_post_process(&mut events);
1428
1429        events
1430    }
1431
1432    pub(crate) fn process_send_v3_1_1_publish(
1433        &mut self,
1434        packet: v3_1_1::GenericPublish<PacketIdType>,
1435    ) -> Vec<GenericEvent<PacketIdType>> {
1436        let mut events = Vec::new();
1437        let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1438
1439        if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1440            // Register packet ID for QoS 1 or 2
1441            let packet_id = packet.packet_id().unwrap();
1442            if self.status != ConnectionStatus::Connected
1443                && !self.need_store
1444                && !self.offline_publish
1445            {
1446                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1447                if self.pid_man.is_used_id(packet_id) {
1448                    self.pid_man.release_id(packet_id);
1449                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1450                }
1451                return events;
1452            }
1453            if !self.pid_man.is_used_id(packet_id) {
1454                error!("packet_id {packet_id} must be acquired or registered");
1455                events.push(GenericEvent::NotifyError(
1456                    MqttError::PacketIdentifierInvalid,
1457                ));
1458                return events;
1459            }
1460            if self.need_store
1461                && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1462            {
1463                let store_packet = packet.clone().set_dup(true);
1464                self.store.add(store_packet.try_into().unwrap()).unwrap();
1465            } else {
1466                release_packet_id_if_send_error = Some(packet_id);
1467            }
1468            if packet.qos() == Qos::ExactlyOnce {
1469                self.qos2_publish_processing.insert(packet_id);
1470                self.pid_pubrec.insert(packet_id);
1471            } else {
1472                self.pid_puback.insert(packet_id);
1473            }
1474        } else if self.status != ConnectionStatus::Connected {
1475            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1476            return events;
1477        }
1478
1479        if self.status == ConnectionStatus::Connected {
1480            events.push(GenericEvent::RequestSendPacket {
1481                packet: packet.into(),
1482                release_packet_id_if_send_error,
1483            });
1484        }
1485        self.send_post_process(&mut events);
1486
1487        events
1488    }
1489
1490    pub(crate) fn process_send_v5_0_publish(
1491        &mut self,
1492        mut packet: v5_0::GenericPublish<PacketIdType>,
1493    ) -> Vec<GenericEvent<PacketIdType>> {
1494        if !self.validate_maximum_packet_size_send(packet.size()) {
1495            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1496        }
1497
1498        let mut events = Vec::new();
1499        let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1500        let mut topic_alias_validated = false;
1501        if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1502            let packet_id = packet.packet_id().unwrap();
1503            if self.status != ConnectionStatus::Connected
1504                && !self.need_store
1505                && !self.offline_publish
1506            {
1507                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1508                if self.pid_man.is_used_id(packet_id) {
1509                    self.pid_man.release_id(packet_id);
1510                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1511                }
1512                return events;
1513            }
1514
1515            // Extract topic_name from TopicAlias and remove TopicAlias property, then store it
1516            if !self.pid_man.is_used_id(packet_id) {
1517                error!("packet_id {packet_id} must be acquired or registered");
1518                events.push(GenericEvent::NotifyError(
1519                    MqttError::PacketIdentifierInvalid,
1520                ));
1521                return events;
1522            }
1523
1524            if self.need_store
1525                && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1526            {
1527                let ta_opt = Self::get_topic_alias_from_props(packet.props());
1528                if packet.topic_name().is_empty() {
1529                    // Topic name is empty, must validate topic alias
1530                    let topic_opt = self.validate_topic_alias(ta_opt);
1531                    if topic_opt.is_none() {
1532                        events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1533                        if self.pid_man.is_used_id(packet_id) {
1534                            self.pid_man.release_id(packet_id);
1535                            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1536                        }
1537                        return events;
1538                    }
1539                    topic_alias_validated = true;
1540                    let store_packet = packet
1541                        .clone()
1542                        .remove_topic_alias_add_topic(topic_opt.unwrap())
1543                        .unwrap()
1544                        .set_dup(true);
1545                    // TBD validate_maximum_packet_size(store_packet.size());
1546                    self.store.add(store_packet.try_into().unwrap()).unwrap();
1547                } else {
1548                    // Topic name is not empty, remove topic alias if present
1549                    let store_packet = packet.clone().remove_topic_alias().set_dup(true);
1550                    self.store.add(store_packet.try_into().unwrap()).unwrap();
1551                }
1552            } else {
1553                release_packet_id_if_send_error = Some(packet_id);
1554            }
1555            if packet.qos() == Qos::ExactlyOnce {
1556                self.qos2_publish_processing.insert(packet_id);
1557                self.pid_pubrec.insert(packet_id);
1558            } else {
1559                self.pid_puback.insert(packet_id);
1560            }
1561        } else if self.status != ConnectionStatus::Connected {
1562            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1563            return events;
1564        }
1565
1566        let packet_id_opt = packet.packet_id();
1567        let ta_opt = Self::get_topic_alias_from_props(packet.props());
1568        if packet.topic_name().is_empty() {
1569            // process manually provided TopicAlias
1570            if !topic_alias_validated && self.validate_topic_alias(ta_opt).is_none() {
1571                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1572                if let Some(packet_id) = packet_id_opt {
1573                    if self.pid_man.is_used_id(packet_id) {
1574                        self.pid_man.release_id(packet_id);
1575                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1576                    }
1577                }
1578                return events;
1579            }
1580        } else if let Some(ta) = ta_opt {
1581            // Topic alias is provided
1582            if self.validate_topic_alias_range(ta) {
1583                trace!(
1584                    "topic alias: {} - {} is registered.",
1585                    packet.topic_name(),
1586                    ta
1587                );
1588                if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1589                    topic_alias_send.insert_or_update(packet.topic_name(), ta);
1590                }
1591            } else {
1592                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1593                if let Some(packet_id) = packet_id_opt {
1594                    if self.pid_man.is_used_id(packet_id) {
1595                        self.pid_man.release_id(packet_id);
1596                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1597                    }
1598                }
1599                return events;
1600            }
1601        } else if self.status == ConnectionStatus::Connected {
1602            // process auto applying TopicAlias if the option is enabled
1603            if self.auto_map_topic_alias_send {
1604                if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1605                    if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1606                        trace!(
1607                            "topic alias: {} - {} is found.",
1608                            packet.topic_name(),
1609                            found_ta
1610                        );
1611                        packet = packet.remove_topic_add_topic_alias(found_ta);
1612                    } else {
1613                        let lru_ta = topic_alias_send.get_lru_alias();
1614                        topic_alias_send.insert_or_update(packet.topic_name(), lru_ta);
1615                        packet = packet.remove_topic_add_topic_alias(lru_ta);
1616                    }
1617                }
1618            } else if self.auto_replace_topic_alias_send {
1619                if let Some(ref topic_alias_send) = self.topic_alias_send {
1620                    if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1621                        trace!(
1622                            "topic alias: {} - {} is found.",
1623                            packet.topic_name(),
1624                            found_ta
1625                        );
1626                        packet = packet.remove_topic_add_topic_alias(found_ta);
1627                    }
1628                }
1629            }
1630        }
1631
1632        // Check receive_maximum for sending (QoS 1 and 2 packets)
1633        if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1634            if let Some(max) = self.publish_send_max {
1635                if self.publish_send_count == max {
1636                    events.push(GenericEvent::NotifyError(MqttError::ReceiveMaximumExceeded));
1637                    if let Some(packet_id) = packet_id_opt {
1638                        if self.pid_man.is_used_id(packet_id) {
1639                            self.pid_man.release_id(packet_id);
1640                            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1641                        }
1642                    }
1643                    return events;
1644                }
1645                self.publish_send_count += 1;
1646            }
1647        }
1648
1649        if self.status == ConnectionStatus::Connected {
1650            events.push(GenericEvent::RequestSendPacket {
1651                packet: packet.into(),
1652                release_packet_id_if_send_error,
1653            });
1654        }
1655        self.send_post_process(&mut events);
1656
1657        events
1658    }
1659
1660    pub(crate) fn process_send_v3_1_1_puback(
1661        &mut self,
1662        packet: v3_1_1::GenericPuback<PacketIdType>,
1663    ) -> Vec<GenericEvent<PacketIdType>> {
1664        if self.status != ConnectionStatus::Connected {
1665            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1666        }
1667        let mut events = Vec::new();
1668
1669        events.push(GenericEvent::RequestSendPacket {
1670            packet: packet.into(),
1671            release_packet_id_if_send_error: None,
1672        });
1673        self.send_post_process(&mut events);
1674
1675        events
1676    }
1677
1678    pub(crate) fn process_send_v5_0_puback(
1679        &mut self,
1680        packet: v5_0::GenericPuback<PacketIdType>,
1681    ) -> Vec<GenericEvent<PacketIdType>> {
1682        if !self.validate_maximum_packet_size_send(packet.size()) {
1683            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1684        }
1685        if self.status != ConnectionStatus::Connected {
1686            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1687        }
1688
1689        let mut events = Vec::new();
1690        self.publish_recv.remove(&packet.packet_id());
1691
1692        events.push(GenericEvent::RequestSendPacket {
1693            packet: packet.into(),
1694            release_packet_id_if_send_error: None,
1695        });
1696        self.send_post_process(&mut events);
1697
1698        events
1699    }
1700
1701    pub(crate) fn process_send_v3_1_1_pubrec(
1702        &mut self,
1703        packet: v3_1_1::GenericPubrec<PacketIdType>,
1704    ) -> Vec<GenericEvent<PacketIdType>> {
1705        if self.status != ConnectionStatus::Connected {
1706            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1707        }
1708        let mut events = Vec::new();
1709
1710        events.push(GenericEvent::RequestSendPacket {
1711            packet: packet.into(),
1712            release_packet_id_if_send_error: None,
1713        });
1714        self.send_post_process(&mut events);
1715
1716        events
1717    }
1718
1719    pub(crate) fn process_send_v5_0_pubrec(
1720        &mut self,
1721        packet: v5_0::GenericPubrec<PacketIdType>,
1722    ) -> Vec<GenericEvent<PacketIdType>> {
1723        if !self.validate_maximum_packet_size_send(packet.size()) {
1724            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1725        }
1726        if self.status != ConnectionStatus::Connected {
1727            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1728        }
1729
1730        let mut events = Vec::new();
1731        let packet_id = packet.packet_id();
1732
1733        if let Some(rc) = packet.reason_code() {
1734            if rc.is_failure() {
1735                self.publish_recv.remove(&packet_id);
1736                self.qos2_publish_handled.remove(&packet_id);
1737            }
1738        }
1739
1740        events.push(GenericEvent::RequestSendPacket {
1741            packet: packet.into(),
1742            release_packet_id_if_send_error: None,
1743        });
1744        self.send_post_process(&mut events);
1745
1746        events
1747    }
1748
1749    pub(crate) fn process_send_v3_1_1_pubrel(
1750        &mut self,
1751        packet: v3_1_1::GenericPubrel<PacketIdType>,
1752    ) -> Vec<GenericEvent<PacketIdType>> {
1753        if self.status != ConnectionStatus::Connected && !self.need_store {
1754            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1755        }
1756        let mut events = Vec::new();
1757        let packet_id = packet.packet_id();
1758        if !self.pid_man.is_used_id(packet_id) {
1759            error!("packet_id {packet_id} must be acquired or registered");
1760            events.push(GenericEvent::NotifyError(
1761                MqttError::PacketIdentifierInvalid,
1762            ));
1763            return events;
1764        }
1765        if self.need_store {
1766            self.store.add(packet.clone().try_into().unwrap()).unwrap();
1767        }
1768
1769        if self.status == ConnectionStatus::Connected {
1770            self.pid_pubcomp.insert(packet_id);
1771            events.push(GenericEvent::RequestSendPacket {
1772                packet: packet.into(),
1773                release_packet_id_if_send_error: None,
1774            });
1775        }
1776        self.send_post_process(&mut events);
1777
1778        events
1779    }
1780
1781    pub(crate) fn process_send_v5_0_pubrel(
1782        &mut self,
1783        packet: v5_0::GenericPubrel<PacketIdType>,
1784    ) -> Vec<GenericEvent<PacketIdType>> {
1785        if !self.validate_maximum_packet_size_send(packet.size()) {
1786            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1787        }
1788        if self.status != ConnectionStatus::Connected && !self.need_store {
1789            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1790        }
1791
1792        let mut events = Vec::new();
1793        let packet_id = packet.packet_id();
1794        if !self.pid_man.is_used_id(packet_id) {
1795            error!("packet_id {packet_id} must be acquired or registered");
1796            events.push(GenericEvent::NotifyError(
1797                MqttError::PacketIdentifierInvalid,
1798            ));
1799            return events;
1800        }
1801        if self.need_store {
1802            self.store.add(packet.clone().try_into().unwrap()).unwrap();
1803        }
1804
1805        if self.status == ConnectionStatus::Connected {
1806            self.pid_pubcomp.insert(packet_id);
1807            events.push(GenericEvent::RequestSendPacket {
1808                packet: packet.into(),
1809                release_packet_id_if_send_error: None,
1810            });
1811        }
1812        self.send_post_process(&mut events);
1813
1814        events
1815    }
1816
1817    pub(crate) fn process_send_v3_1_1_pubcomp(
1818        &mut self,
1819        packet: v3_1_1::GenericPubcomp<PacketIdType>,
1820    ) -> Vec<GenericEvent<PacketIdType>> {
1821        if self.status != ConnectionStatus::Connected {
1822            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1823        }
1824        let mut events = Vec::new();
1825
1826        events.push(GenericEvent::RequestSendPacket {
1827            packet: packet.into(),
1828            release_packet_id_if_send_error: None,
1829        });
1830        self.send_post_process(&mut events);
1831
1832        events
1833    }
1834
1835    pub(crate) fn process_send_v5_0_pubcomp(
1836        &mut self,
1837        packet: v5_0::GenericPubcomp<PacketIdType>,
1838    ) -> Vec<GenericEvent<PacketIdType>> {
1839        if !self.validate_maximum_packet_size_send(packet.size()) {
1840            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1841        }
1842        if self.status != ConnectionStatus::Connected {
1843            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1844        }
1845
1846        let mut events = Vec::new();
1847        self.publish_recv.remove(&packet.packet_id());
1848
1849        events.push(GenericEvent::RequestSendPacket {
1850            packet: packet.into(),
1851            release_packet_id_if_send_error: None,
1852        });
1853        self.send_post_process(&mut events);
1854
1855        events
1856    }
1857
1858    pub(crate) fn process_send_v3_1_1_subscribe(
1859        &mut self,
1860        packet: v3_1_1::GenericSubscribe<PacketIdType>,
1861    ) -> Vec<GenericEvent<PacketIdType>> {
1862        let mut events = Vec::new();
1863        let packet_id = packet.packet_id();
1864        if self.status != ConnectionStatus::Connected {
1865            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1866            if self.pid_man.is_used_id(packet_id) {
1867                self.pid_man.release_id(packet_id);
1868                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1869            }
1870            return events;
1871        }
1872        if !self.pid_man.is_used_id(packet_id) {
1873            error!("packet_id {packet_id} must be acquired or registered");
1874            events.push(GenericEvent::NotifyError(
1875                MqttError::PacketIdentifierInvalid,
1876            ));
1877            return events;
1878        }
1879        self.pid_suback.insert(packet_id);
1880
1881        events.push(GenericEvent::RequestSendPacket {
1882            packet: packet.into(),
1883            release_packet_id_if_send_error: Some(packet_id),
1884        });
1885        self.send_post_process(&mut events);
1886
1887        events
1888    }
1889
1890    pub(crate) fn process_send_v5_0_subscribe(
1891        &mut self,
1892        packet: v5_0::GenericSubscribe<PacketIdType>,
1893    ) -> Vec<GenericEvent<PacketIdType>> {
1894        if !self.validate_maximum_packet_size_send(packet.size()) {
1895            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1896        }
1897
1898        let mut events = Vec::new();
1899        let packet_id = packet.packet_id();
1900        if self.status != ConnectionStatus::Connected {
1901            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1902            if self.pid_man.is_used_id(packet_id) {
1903                self.pid_man.release_id(packet_id);
1904                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1905            }
1906            return events;
1907        }
1908        if !self.pid_man.is_used_id(packet_id) {
1909            error!("packet_id {packet_id} must be acquired or registered");
1910            events.push(GenericEvent::NotifyError(
1911                MqttError::PacketIdentifierInvalid,
1912            ));
1913            return events;
1914        }
1915        self.pid_suback.insert(packet_id);
1916
1917        events.push(GenericEvent::RequestSendPacket {
1918            packet: packet.into(),
1919            release_packet_id_if_send_error: Some(packet_id),
1920        });
1921        self.send_post_process(&mut events);
1922
1923        events
1924    }
1925
1926    pub(crate) fn process_send_v3_1_1_suback(
1927        &mut self,
1928        packet: v3_1_1::GenericSuback<PacketIdType>,
1929    ) -> Vec<GenericEvent<PacketIdType>> {
1930        if self.status != ConnectionStatus::Connected {
1931            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1932        }
1933        let mut events = Vec::new();
1934        events.push(GenericEvent::RequestSendPacket {
1935            packet: packet.into(),
1936            release_packet_id_if_send_error: None,
1937        });
1938        self.send_post_process(&mut events);
1939
1940        events
1941    }
1942
1943    pub(crate) fn process_send_v5_0_suback(
1944        &mut self,
1945        packet: v5_0::GenericSuback<PacketIdType>,
1946    ) -> Vec<GenericEvent<PacketIdType>> {
1947        if !self.validate_maximum_packet_size_send(packet.size()) {
1948            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1949        }
1950        if self.status != ConnectionStatus::Connected {
1951            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1952        }
1953
1954        let mut events = Vec::new();
1955        events.push(GenericEvent::RequestSendPacket {
1956            packet: packet.into(),
1957            release_packet_id_if_send_error: None,
1958        });
1959        self.send_post_process(&mut events);
1960
1961        events
1962    }
1963
1964    pub(crate) fn process_send_v3_1_1_unsubscribe(
1965        &mut self,
1966        packet: v3_1_1::GenericUnsubscribe<PacketIdType>,
1967    ) -> Vec<GenericEvent<PacketIdType>> {
1968        let mut events = Vec::new();
1969        let packet_id = packet.packet_id();
1970        if self.status != ConnectionStatus::Connected {
1971            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1972            if self.pid_man.is_used_id(packet_id) {
1973                self.pid_man.release_id(packet_id);
1974                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1975            }
1976            return events;
1977        }
1978        if !self.pid_man.is_used_id(packet_id) {
1979            error!("packet_id {packet_id} must be acquired or registered");
1980            events.push(GenericEvent::NotifyError(
1981                MqttError::PacketIdentifierInvalid,
1982            ));
1983            return events;
1984        }
1985        self.pid_unsuback.insert(packet_id);
1986
1987        events.push(GenericEvent::RequestSendPacket {
1988            packet: packet.into(),
1989            release_packet_id_if_send_error: Some(packet_id),
1990        });
1991        self.send_post_process(&mut events);
1992
1993        events
1994    }
1995
1996    pub(crate) fn process_send_v5_0_unsubscribe(
1997        &mut self,
1998        packet: v5_0::GenericUnsubscribe<PacketIdType>,
1999    ) -> Vec<GenericEvent<PacketIdType>> {
2000        if !self.validate_maximum_packet_size_send(packet.size()) {
2001            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2002        }
2003
2004        let mut events = Vec::new();
2005        let packet_id = packet.packet_id();
2006        if self.status != ConnectionStatus::Connected {
2007            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
2008            if self.pid_man.is_used_id(packet_id) {
2009                self.pid_man.release_id(packet_id);
2010                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2011            }
2012            return events;
2013        }
2014        if !self.pid_man.is_used_id(packet_id) {
2015            error!("packet_id {packet_id} must be acquired or registered");
2016            events.push(GenericEvent::NotifyError(
2017                MqttError::PacketIdentifierInvalid,
2018            ));
2019            return events;
2020        }
2021        self.pid_unsuback.insert(packet_id);
2022
2023        events.push(GenericEvent::RequestSendPacket {
2024            packet: packet.into(),
2025            release_packet_id_if_send_error: Some(packet_id),
2026        });
2027        self.send_post_process(&mut events);
2028
2029        events
2030    }
2031
2032    pub(crate) fn process_send_v3_1_1_unsuback(
2033        &mut self,
2034        packet: v3_1_1::GenericUnsuback<PacketIdType>,
2035    ) -> Vec<GenericEvent<PacketIdType>> {
2036        if self.status != ConnectionStatus::Connected {
2037            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2038        }
2039        let mut events = Vec::new();
2040        events.push(GenericEvent::RequestSendPacket {
2041            packet: packet.into(),
2042            release_packet_id_if_send_error: None,
2043        });
2044        self.send_post_process(&mut events);
2045
2046        events
2047    }
2048
2049    pub(crate) fn process_send_v5_0_unsuback(
2050        &mut self,
2051        packet: v5_0::GenericUnsuback<PacketIdType>,
2052    ) -> Vec<GenericEvent<PacketIdType>> {
2053        if !self.validate_maximum_packet_size_send(packet.size()) {
2054            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2055        }
2056        if self.status != ConnectionStatus::Connected {
2057            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2058        }
2059
2060        let mut events = Vec::new();
2061        events.push(GenericEvent::RequestSendPacket {
2062            packet: packet.into(),
2063            release_packet_id_if_send_error: None,
2064        });
2065        self.send_post_process(&mut events);
2066
2067        events
2068    }
2069
2070    pub(crate) fn process_send_v3_1_1_pingreq(
2071        &mut self,
2072        packet: v3_1_1::Pingreq,
2073    ) -> Vec<GenericEvent<PacketIdType>> {
2074        if self.status != ConnectionStatus::Connected {
2075            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2076        }
2077        let mut events = Vec::new();
2078        events.push(GenericEvent::RequestSendPacket {
2079            packet: packet.into(),
2080            release_packet_id_if_send_error: None,
2081        });
2082        if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2083            self.pingreq_send_set = true;
2084            events.push(GenericEvent::RequestTimerReset {
2085                kind: TimerKind::PingrespRecv,
2086                duration_ms: timeout_ms,
2087            });
2088        }
2089        self.send_post_process(&mut events);
2090
2091        events
2092    }
2093
2094    pub(crate) fn process_send_v5_0_pingreq(
2095        &mut self,
2096        packet: v5_0::Pingreq,
2097    ) -> Vec<GenericEvent<PacketIdType>> {
2098        if !self.validate_maximum_packet_size_send(packet.size()) {
2099            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2100        }
2101        if self.status != ConnectionStatus::Connected {
2102            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2103        }
2104
2105        let mut events = Vec::new();
2106        events.push(GenericEvent::RequestSendPacket {
2107            packet: packet.into(),
2108            release_packet_id_if_send_error: None,
2109        });
2110        if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2111            self.pingreq_send_set = true;
2112            events.push(GenericEvent::RequestTimerReset {
2113                kind: TimerKind::PingrespRecv,
2114                duration_ms: timeout_ms,
2115            });
2116        }
2117        self.send_post_process(&mut events);
2118
2119        events
2120    }
2121
2122    pub(crate) fn process_send_v3_1_1_pingresp(
2123        &mut self,
2124        packet: v3_1_1::Pingresp,
2125    ) -> Vec<GenericEvent<PacketIdType>> {
2126        if self.status != ConnectionStatus::Connected {
2127            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2128        }
2129        let mut events = Vec::new();
2130        events.push(GenericEvent::RequestSendPacket {
2131            packet: packet.into(),
2132            release_packet_id_if_send_error: None,
2133        });
2134        self.send_post_process(&mut events);
2135
2136        events
2137    }
2138
2139    pub(crate) fn process_send_v5_0_pingresp(
2140        &mut self,
2141        packet: v5_0::Pingresp,
2142    ) -> Vec<GenericEvent<PacketIdType>> {
2143        if !self.validate_maximum_packet_size_send(packet.size()) {
2144            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2145        }
2146        if self.status != ConnectionStatus::Connected {
2147            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2148        }
2149
2150        let mut events = Vec::new();
2151        events.push(GenericEvent::RequestSendPacket {
2152            packet: packet.into(),
2153            release_packet_id_if_send_error: None,
2154        });
2155        self.send_post_process(&mut events);
2156
2157        events
2158    }
2159
2160    pub(crate) fn process_send_v3_1_1_disconnect(
2161        &mut self,
2162        packet: v3_1_1::Disconnect,
2163    ) -> Vec<GenericEvent<PacketIdType>> {
2164        if self.status != ConnectionStatus::Connected {
2165            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2166        }
2167        let mut events = Vec::new();
2168        self.status = ConnectionStatus::Disconnected;
2169        self.cancel_timers(&mut events);
2170        events.push(GenericEvent::RequestSendPacket {
2171            packet: packet.into(),
2172            release_packet_id_if_send_error: None,
2173        });
2174        events.push(GenericEvent::RequestClose);
2175
2176        events
2177    }
2178
2179    pub(crate) fn process_send_v5_0_disconnect(
2180        &mut self,
2181        packet: v5_0::Disconnect,
2182    ) -> Vec<GenericEvent<PacketIdType>> {
2183        if !self.validate_maximum_packet_size_send(packet.size()) {
2184            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2185        }
2186        if self.status != ConnectionStatus::Connected {
2187            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2188        }
2189
2190        let mut events = Vec::new();
2191        self.status = ConnectionStatus::Disconnected;
2192        self.cancel_timers(&mut events);
2193        events.push(GenericEvent::RequestSendPacket {
2194            packet: packet.into(),
2195            release_packet_id_if_send_error: None,
2196        });
2197        events.push(GenericEvent::RequestClose);
2198
2199        events
2200    }
2201
2202    pub(crate) fn process_send_v5_0_auth(
2203        &mut self,
2204        packet: v5_0::Auth,
2205    ) -> Vec<GenericEvent<PacketIdType>> {
2206        if !self.validate_maximum_packet_size_send(packet.size()) {
2207            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2208        }
2209        if self.status == ConnectionStatus::Disconnected {
2210            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2211        }
2212
2213        let mut events = Vec::new();
2214        events.push(GenericEvent::RequestSendPacket {
2215            packet: packet.into(),
2216            release_packet_id_if_send_error: None,
2217        });
2218        self.send_post_process(&mut events);
2219
2220        events
2221    }
2222
2223    fn send_post_process(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
2224        if self.is_client {
2225            if let Some(timeout_ms) = self.pingreq_send_interval_ms {
2226                self.pingreq_send_set = true;
2227                events.push(GenericEvent::RequestTimerReset {
2228                    kind: TimerKind::PingreqSend,
2229                    duration_ms: timeout_ms,
2230                });
2231            }
2232        }
2233    }
2234
2235    fn validate_maximum_packet_size_send(&self, size: usize) -> bool {
2236        if size > self.maximum_packet_size_send as usize {
2237            error!("packet size over maximum_packet_size for sending");
2238            return false;
2239        }
2240        true
2241    }
2242
2243    fn process_recv_packet(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
2244        let mut events = Vec::new();
2245
2246        // packet size limit validation (v3.1.1 is always satisfied)
2247        let total_size = remaining_length_to_total_size(raw_packet.remaining_length());
2248        if total_size > self.maximum_packet_size_recv {
2249            // This happens only when protocol version is V5.0.
            // On v3.1.1, the maximum packet size is always 268435455 (2^28 - 1).
            // If the packet size is over 268435455, feed() returns an error.
2252            // maximum_packet_size_recv is set by sending CONNECT or CONNACK packet.
2253            // So DISCONNECT packet is the right choice to notify the error.
2254            let disconnect_packet = v5_0::Disconnect::builder()
2255                .reason_code(DisconnectReasonCode::PacketTooLarge)
2256                .build()
2257                .unwrap();
2258            // Send disconnect packet directly without generic constraints
2259            events.extend(self.process_send_v5_0_disconnect(disconnect_packet));
2260            events.push(GenericEvent::NotifyError(MqttError::PacketTooLarge));
2261            return events;
2262        }
2263
2264        let packet_type = raw_packet.packet_type();
2265        let _flags = raw_packet.flags();
2266        match self.protocol_version {
2267            Version::V3_1_1 => {
2268                match packet_type {
2269                    1 => {
2270                        // CONNECT
2271                        events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2272                    }
2273                    2 => {
2274                        // CONNACK
2275                        events.extend(self.process_recv_v3_1_1_connack(raw_packet));
2276                    }
2277                    3 => {
2278                        // PUBLISH
2279                        events.extend(self.process_recv_v3_1_1_publish(raw_packet));
2280                    }
2281                    4 => {
2282                        // PUBACK
2283                        events.extend(self.process_recv_v3_1_1_puback(raw_packet));
2284                    }
2285                    5 => {
2286                        // PUBREC
2287                        events.extend(self.process_recv_v3_1_1_pubrec(raw_packet));
2288                    }
2289                    6 => {
2290                        // PUBREL
2291                        events.extend(self.process_recv_v3_1_1_pubrel(raw_packet));
2292                    }
2293                    7 => {
2294                        // PUBCOMP
2295                        events.extend(self.process_recv_v3_1_1_pubcomp(raw_packet));
2296                    }
2297                    8 => {
2298                        // SUBSCRIBE
2299                        events.extend(self.process_recv_v3_1_1_subscribe(raw_packet));
2300                    }
2301                    9 => {
2302                        // SUBACK
2303                        events.extend(self.process_recv_v3_1_1_suback(raw_packet));
2304                    }
2305                    10 => {
2306                        // UNSUBSCRIBE
2307                        events.extend(self.process_recv_v3_1_1_unsubscribe(raw_packet));
2308                    }
2309                    11 => {
2310                        // UNSUBACK
2311                        events.extend(self.process_recv_v3_1_1_unsuback(raw_packet));
2312                    }
2313                    12 => {
2314                        // PINGREQ
2315                        events.extend(self.process_recv_v3_1_1_pingreq(raw_packet));
2316                    }
2317                    13 => {
2318                        // PINGRESP
2319                        events.extend(self.process_recv_v3_1_1_pingresp(raw_packet));
2320                    }
2321                    14 => {
2322                        // DISCONNECT
2323                        events.extend(self.process_recv_v3_1_1_disconnect(raw_packet));
2324                    }
2325                    // invalid packet type
2326                    _ => {
2327                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2328                    }
2329                }
2330            }
2331            Version::V5_0 => {
2332                match packet_type {
2333                    1 => {
2334                        // CONNECT
2335                        events.extend(self.process_recv_v5_0_connect(raw_packet));
2336                    }
2337                    2 => {
2338                        // CONNACK
2339                        events.extend(self.process_recv_v5_0_connack(raw_packet));
2340                    }
2341                    3 => {
2342                        // PUBLISH
2343                        events.extend(self.process_recv_v5_0_publish(raw_packet));
2344                    }
2345                    4 => {
2346                        // PUBACK
2347                        events.extend(self.process_recv_v5_0_puback(raw_packet));
2348                    }
2349                    5 => {
2350                        // PUBREC
2351                        events.extend(self.process_recv_v5_0_pubrec(raw_packet));
2352                    }
2353                    6 => {
2354                        // PUBREL
2355                        events.extend(self.process_recv_v5_0_pubrel(raw_packet));
2356                    }
2357                    7 => {
2358                        // PUBCOMP
2359                        events.extend(self.process_recv_v5_0_pubcomp(raw_packet));
2360                    }
2361                    8 => {
2362                        // SUBSCRIBE
2363                        events.extend(self.process_recv_v5_0_subscribe(raw_packet));
2364                    }
2365                    9 => {
2366                        // SUBACK
2367                        events.extend(self.process_recv_v5_0_suback(raw_packet));
2368                    }
2369                    10 => {
2370                        // UNSUBSCRIBE
2371                        events.extend(self.process_recv_v5_0_unsubscribe(raw_packet));
2372                    }
2373                    11 => {
2374                        // UNSUBACK
2375                        events.extend(self.process_recv_v5_0_unsuback(raw_packet));
2376                    }
2377                    12 => {
2378                        // PINGREQ
2379                        events.extend(self.process_recv_v5_0_pingreq(raw_packet));
2380                    }
2381                    13 => {
2382                        // PINGRESP
2383                        events.extend(self.process_recv_v5_0_pingresp(raw_packet));
2384                    }
2385                    14 => {
2386                        // DISCONNECT
2387                        events.extend(self.process_recv_v5_0_disconnect(raw_packet));
2388                    }
2389                    15 => {
2390                        // AUTH
2391                        events.extend(self.process_recv_v5_0_auth(raw_packet));
2392                    }
2393                    // invalid packet type
2394                    _ => {
2395                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2396                    }
2397                }
2398            }
2399            Version::Undetermined => {
2400                match packet_type {
2401                    1 => {
2402                        // CONNECT
2403                        if raw_packet.remaining_length() < 7 {
2404                            events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2405                            return events;
2406                        }
2407                        match raw_packet.data_as_slice()[6] {
2408                            // Protocol Version
2409                            4 => {
2410                                self.protocol_version = Version::V3_1_1;
2411                                events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2412                            }
2413                            5 => {
2414                                self.protocol_version = Version::V5_0;
2415                                events.extend(self.process_recv_v5_0_connect(raw_packet));
2416                            }
2417                            _ => {
2418                                events.push(GenericEvent::NotifyError(
2419                                    MqttError::UnsupportedProtocolVersion,
2420                                ));
2421                            }
2422                        }
2423                    }
2424                    _ => {
2425                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2426                    }
2427                }
2428            }
2429        }
2430
2431        events
2432    }
2433
2434    fn process_recv_v3_1_1_connect(
2435        &mut self,
2436        raw_packet: RawPacket,
2437    ) -> Vec<GenericEvent<PacketIdType>> {
2438        let mut events = Vec::new();
2439        match v3_1_1::Connect::parse(raw_packet.data_as_slice()) {
2440            Ok((packet, _)) => {
2441                if self.status != ConnectionStatus::Disconnected {
2442                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2443                    return events;
2444                }
2445                self.initialize(false);
2446                self.status = ConnectionStatus::Connecting;
2447                if packet.keep_alive() > 0 {
2448                    self.pingreq_recv_timeout_ms =
2449                        Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2450                }
2451                if packet.clean_session() {
2452                    self.clear_store_related();
2453                } else {
2454                    self.need_store = true;
2455                }
2456                events.extend(self.refresh_pingreq_recv());
2457                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2458            }
2459            Err(e) => {
2460                if self.status == ConnectionStatus::Disconnected {
2461                    self.status = ConnectionStatus::Connecting;
2462                    let rc = match e {
2463                        MqttError::ClientIdentifierNotValid => {
2464                            ConnectReturnCode::IdentifierRejected
2465                        }
2466                        MqttError::BadUserNameOrPassword => {
2467                            ConnectReturnCode::BadUserNameOrPassword
2468                        }
2469                        MqttError::UnsupportedProtocolVersion => {
2470                            ConnectReturnCode::UnacceptableProtocolVersion
2471                        }
2472                        _ => ConnectReturnCode::NotAuthorized, // TBD close could be better
2473                    };
2474                    let connack = v3_1_1::Connack::builder().return_code(rc).build().unwrap();
2475                    let connack_events = self.process_send_v3_1_1_connack(connack);
2476                    events.extend(connack_events);
2477                } else {
2478                    events.push(GenericEvent::RequestClose);
2479                }
2480                events.push(GenericEvent::NotifyError(e));
2481            }
2482        }
2483
2484        events
2485    }
2486
2487    fn process_recv_v5_0_connect(
2488        &mut self,
2489        raw_packet: RawPacket,
2490    ) -> Vec<GenericEvent<PacketIdType>> {
2491        let mut events = Vec::new();
2492        match v5_0::Connect::parse(raw_packet.data_as_slice()) {
2493            Ok((packet, _)) => {
2494                if self.status != ConnectionStatus::Disconnected {
2495                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2496                    return events;
2497                }
2498                self.initialize(false);
2499                self.status = ConnectionStatus::Connecting;
2500                if packet.keep_alive() > 0 {
2501                    self.pingreq_recv_timeout_ms =
2502                        Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2503                }
2504                if packet.clean_start() {
2505                    self.clear_store_related();
2506                }
2507                packet.props().iter().for_each(|prop| match prop {
2508                    Property::TopicAliasMaximum(p) => {
2509                        self.topic_alias_send = Some(TopicAliasSend::new(p.val()));
2510                    }
2511                    Property::ReceiveMaximum(p) => {
2512                        self.publish_send_max = Some(p.val());
2513                    }
2514                    Property::MaximumPacketSize(p) => {
2515                        self.maximum_packet_size_send = p.val();
2516                    }
2517                    Property::SessionExpiryInterval(p) => {
2518                        if p.val() != 0 {
2519                            self.need_store = true;
2520                        }
2521                    }
2522                    _ => {}
2523                });
2524                events.extend(self.refresh_pingreq_recv());
2525                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2526            }
2527            Err(e) => {
2528                if self.status == ConnectionStatus::Disconnected {
2529                    self.status = ConnectionStatus::Connecting;
2530                    let rc = match e {
2531                        MqttError::ClientIdentifierNotValid => {
2532                            ConnectReasonCode::ClientIdentifierNotValid
2533                        }
2534                        MqttError::BadUserNameOrPassword => {
2535                            ConnectReasonCode::BadAuthenticationMethod
2536                        }
2537                        MqttError::UnsupportedProtocolVersion => {
2538                            ConnectReasonCode::UnsupportedProtocolVersion
2539                        }
2540                        _ => ConnectReasonCode::UnspecifiedError,
2541                    };
2542                    let connack = v5_0::Connack::builder().reason_code(rc).build().unwrap();
2543                    let connack_events = self.process_send_v5_0_connack(connack);
2544                    events.extend(connack_events);
2545                } else {
2546                    let disconnect = v5_0::Disconnect::builder()
2547                        .reason_code(DisconnectReasonCode::ProtocolError)
2548                        .build()
2549                        .unwrap();
2550                    let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2551                    events.extend(disconnect_events);
2552                }
2553                events.push(GenericEvent::NotifyError(e));
2554            }
2555        }
2556
2557        events
2558    }
2559
2560    fn process_recv_v3_1_1_connack(
2561        &mut self,
2562        raw_packet: RawPacket,
2563    ) -> Vec<GenericEvent<PacketIdType>> {
2564        let mut events = Vec::new();
2565
2566        match v3_1_1::Connack::parse(raw_packet.data_as_slice()) {
2567            Ok((packet, _consumed)) => {
2568                if packet.return_code() == ConnectReturnCode::Accepted {
2569                    self.status = ConnectionStatus::Connected;
2570                    if packet.session_present() {
2571                        events.extend(self.send_stored());
2572                    } else {
2573                        self.clear_store_related();
2574                    }
2575                }
2576                events.push(GenericEvent::NotifyPacketReceived(
2577                    GenericPacket::V3_1_1Connack(packet),
2578                ));
2579            }
2580            Err(e) => {
2581                events.push(GenericEvent::RequestClose);
2582                events.push(GenericEvent::NotifyError(e));
2583            }
2584        }
2585
2586        events
2587    }
2588
2589    fn process_recv_v5_0_connack(
2590        &mut self,
2591        raw_packet: RawPacket,
2592    ) -> Vec<GenericEvent<PacketIdType>> {
2593        let mut events = Vec::new();
2594
2595        match v5_0::Connack::parse(raw_packet.data_as_slice()) {
2596            Ok((packet, _consumed)) => {
2597                if packet.reason_code() == ConnectReasonCode::Success {
2598                    self.status = ConnectionStatus::Connected;
2599
2600                    // Process properties
2601                    for prop in packet.props() {
2602                        match prop {
2603                            Property::TopicAliasMaximum(val) => {
2604                                if val.val() > 0 {
2605                                    self.topic_alias_send = Some(TopicAliasSend::new(val.val()));
2606                                }
2607                            }
2608                            Property::ReceiveMaximum(val) => {
2609                                assert!(val.val() != 0);
2610                                self.publish_send_max = Some(val.val());
2611                            }
2612                            Property::MaximumPacketSize(val) => {
2613                                assert!(val.val() != 0);
2614                                self.maximum_packet_size_send = val.val();
2615                            }
2616                            Property::ServerKeepAlive(val) => {
2617                                // Set PINGREQ send interval if this is a client
2618                                let timeout_ms = (val.val() as u64) * 1000;
2619                                self.pingreq_send_interval_ms = Some(timeout_ms);
2620                            }
2621                            _ => {
2622                                // Ignore other properties
2623                            }
2624                        }
2625                    }
2626
2627                    if packet.session_present() {
2628                        events.extend(self.send_stored());
2629                    } else {
2630                        self.clear_store_related();
2631                    }
2632                }
2633                events.push(GenericEvent::NotifyPacketReceived(
2634                    GenericPacket::V5_0Connack(packet),
2635                ));
2636            }
2637            Err(e) => {
2638                if self.status == ConnectionStatus::Connected {
2639                    let disconnect = v5_0::Disconnect::builder()
2640                        .reason_code(e.into())
2641                        .build()
2642                        .unwrap();
2643                    let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2644                    events.extend(disconnect_events);
2645                }
2646                events.push(GenericEvent::NotifyError(e));
2647            }
2648        }
2649
2650        events
2651    }
2652
2653    fn process_recv_v3_1_1_publish(
2654        &mut self,
2655        raw_packet: RawPacket,
2656    ) -> Vec<GenericEvent<PacketIdType>> {
2657        let mut events = Vec::new();
2658
2659        let flags = raw_packet.flags();
2660        match &raw_packet.data {
2661            PacketData::Publish(arc) => {
2662                match v3_1_1::GenericPublish::parse(flags, arc.clone()) {
2663                    Ok((packet, _consumed)) => {
2664                        match packet.qos() {
2665                            Qos::AtMostOnce => {
2666                                events.extend(self.refresh_pingreq_recv());
2667                                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2668                            }
2669                            Qos::AtLeastOnce => {
2670                                let packet_id = packet.packet_id().unwrap();
2671                                if self.status == ConnectionStatus::Connected
2672                                    && self.auto_pub_response
2673                                {
2674                                    // Send PUBACK automatically
2675                                    let puback = v3_1_1::GenericPuback::builder()
2676                                        .packet_id(packet_id)
2677                                        .build()
2678                                        .unwrap();
2679                                    events.extend(self.process_send_v3_1_1_puback(puback));
2680                                }
2681                                events.extend(self.refresh_pingreq_recv());
2682                                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2683                            }
2684                            Qos::ExactlyOnce => {
2685                                let packet_id = packet.packet_id().unwrap();
2686                                let already_handled = !self.qos2_publish_handled.insert(packet_id);
2687
2688                                if self.status == ConnectionStatus::Connected
2689                                    && (self.auto_pub_response || already_handled)
2690                                {
2691                                    let pubrec = v3_1_1::GenericPubrec::builder()
2692                                        .packet_id(packet_id)
2693                                        .build()
2694                                        .unwrap();
2695                                    events.extend(self.process_send_v3_1_1_pubrec(pubrec));
2696                                }
2697                                events.extend(self.refresh_pingreq_recv());
2698                                if !already_handled {
2699                                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2700                                }
2701                            }
2702                        }
2703                    }
2704                    Err(e) => {
2705                        events.push(GenericEvent::RequestClose);
2706                        events.push(GenericEvent::NotifyError(e));
2707                    }
2708                }
2709            }
2710            PacketData::Normal(_) => {
2711                unreachable!("PUBLISH packet must use PacketData::Publish variant");
2712            }
2713        }
2714
2715        events
2716    }
2717
2718    fn process_recv_v5_0_publish(
2719        &mut self,
2720        raw_packet: RawPacket,
2721    ) -> Vec<GenericEvent<PacketIdType>> {
2722        let mut events = Vec::new();
2723
2724        let flags = raw_packet.flags();
2725        match &raw_packet.data {
2726            PacketData::Publish(arc) => {
2727                match v5_0::GenericPublish::parse(flags, arc.clone()) {
2728                    Ok((mut packet, _consumed)) => {
2729                        let mut already_handled = false;
2730                        let mut puback_send = false;
2731                        let mut pubrec_send = false;
2732
2733                        match packet.qos() {
2734                            Qos::AtLeastOnce => {
2735                                let packet_id = packet.packet_id().unwrap();
2736                                if let Some(max) = self.publish_recv_max {
2737                                    if self.publish_recv.len() >= max as usize {
2738                                        let disconnect = v5_0::Disconnect::builder()
2739                                            .reason_code(
2740                                                DisconnectReasonCode::ReceiveMaximumExceeded,
2741                                            )
2742                                            .build()
2743                                            .unwrap();
2744                                        events
2745                                            .extend(self.process_send_v5_0_disconnect(disconnect));
2746                                        events.push(GenericEvent::NotifyError(
2747                                            MqttError::ReceiveMaximumExceeded,
2748                                        ));
2749                                        return events;
2750                                    }
2751                                }
2752                                self.publish_recv.insert(packet_id);
2753                                if self.auto_pub_response
2754                                    && self.status == ConnectionStatus::Connected
2755                                {
2756                                    puback_send = true;
2757                                }
2758                            }
2759                            Qos::ExactlyOnce => {
2760                                let packet_id = packet.packet_id().unwrap();
2761                                if let Some(max) = self.publish_recv_max {
2762                                    if self.publish_recv.len() >= max as usize {
2763                                        let disconnect = v5_0::Disconnect::builder()
2764                                            .reason_code(
2765                                                DisconnectReasonCode::ReceiveMaximumExceeded,
2766                                            )
2767                                            .build()
2768                                            .unwrap();
2769                                        events
2770                                            .extend(self.process_send_v5_0_disconnect(disconnect));
2771                                        events.push(GenericEvent::NotifyError(
2772                                            MqttError::ReceiveMaximumExceeded,
2773                                        ));
2774                                        return events;
2775                                    }
2776                                }
2777                                self.publish_recv.insert(packet_id);
2778
2779                                if !self.qos2_publish_handled.insert(packet_id) {
2780                                    already_handled = true;
2781                                }
2782                                if self.status == ConnectionStatus::Connected
2783                                    && (self.auto_pub_response || already_handled)
2784                                {
2785                                    pubrec_send = true;
2786                                }
2787                            }
2788                            Qos::AtMostOnce => {
2789                                // No packet ID handling for QoS 0
2790                            }
2791                        }
2792
2793                        // Topic Alias handling
2794                        if packet.topic_name().is_empty() {
2795                            // Extract topic from topic_alias
2796                            if let Some(ta) = Self::get_topic_alias_from_props(packet.props()) {
2797                                if ta == 0
2798                                    || self.topic_alias_recv.is_none()
2799                                    || ta > self.topic_alias_recv.as_ref().unwrap().max()
2800                                {
2801                                    let disconnect = v5_0::Disconnect::builder()
2802                                        .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2803                                        .build()
2804                                        .unwrap();
2805                                    events.extend(self.process_send_v5_0_disconnect(disconnect));
2806                                    events.push(GenericEvent::NotifyError(
2807                                        MqttError::TopicAliasInvalid,
2808                                    ));
2809                                    return events;
2810                                }
2811
2812                                if let Some(ref topic_alias_recv) = self.topic_alias_recv {
2813                                    if let Some(topic_name) = topic_alias_recv.get(ta) {
2814                                        match packet.add_extracted_topic_name(topic_name) {
2815                                            Ok(extracted) => {
2816                                                packet = extracted;
2817                                            }
2818                                            Err(_e) => {
2819                                                error!("topic alias extract failed: {_e}");
2820                                                let disconnect = v5_0::Disconnect::builder()
2821                                                    .reason_code(
2822                                                        DisconnectReasonCode::TopicAliasInvalid,
2823                                                    )
2824                                                    .build()
2825                                                    .unwrap();
2826                                                events.extend(
2827                                                    self.process_send_v5_0_disconnect(disconnect),
2828                                                );
2829                                                events.push(GenericEvent::NotifyError(
2830                                                    MqttError::TopicAliasInvalid,
2831                                                ));
2832                                                return events;
2833                                            }
2834                                        }
2835                                    } else {
2836                                        let disconnect = v5_0::Disconnect::builder()
2837                                            .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2838                                            .build()
2839                                            .unwrap();
2840                                        events
2841                                            .extend(self.process_send_v5_0_disconnect(disconnect));
2842                                        events.push(GenericEvent::NotifyError(
2843                                            MqttError::TopicAliasInvalid,
2844                                        ));
2845                                        return events;
2846                                    }
2847                                }
2848                            } else {
2849                                let disconnect = v5_0::Disconnect::builder()
2850                                    .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2851                                    .build()
2852                                    .unwrap();
2853                                events.extend(self.process_send_v5_0_disconnect(disconnect));
2854                                events
2855                                    .push(GenericEvent::NotifyError(MqttError::TopicAliasInvalid));
2856                                return events;
2857                            }
2858                        } else {
2859                            // Topic is not empty, check if topic alias needs to be registered
2860                            if let Some(ta) = Self::get_topic_alias_from_props(packet.props()) {
2861                                if ta == 0
2862                                    || self.topic_alias_recv.is_none()
2863                                    || ta > self.topic_alias_recv.as_ref().unwrap().max()
2864                                {
2865                                    let disconnect = v5_0::Disconnect::builder()
2866                                        .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2867                                        .build()
2868                                        .unwrap();
2869                                    events.extend(self.process_send_v5_0_disconnect(disconnect));
2870                                    events.push(GenericEvent::NotifyError(
2871                                        MqttError::TopicAliasInvalid,
2872                                    ));
2873                                    return events;
2874                                }
2875                                if let Some(ref mut topic_alias_recv) = self.topic_alias_recv {
2876                                    topic_alias_recv.insert_or_update(packet.topic_name(), ta);
2877                                }
2878                            }
2879                        }
2880
2881                        // Send response packets
2882                        if puback_send {
2883                            let puback = v5_0::GenericPuback::builder()
2884                                .packet_id(packet.packet_id().unwrap())
2885                                .build()
2886                                .unwrap();
2887                            events.extend(self.process_send_v5_0_puback(puback));
2888                        }
2889                        if pubrec_send {
2890                            let pubrec = v5_0::GenericPubrec::builder()
2891                                .packet_id(packet.packet_id().unwrap())
2892                                .build()
2893                                .unwrap();
2894                            events.extend(self.process_send_v5_0_pubrec(pubrec));
2895                        }
2896
2897                        // Refresh PINGREQ receive timer
2898                        events.extend(self.refresh_pingreq_recv());
2899
2900                        // Notify packet received (only if not already handled)
2901                        if !already_handled {
2902                            events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2903                        }
2904                    }
2905                    Err(e) => {
2906                        if self.status == ConnectionStatus::Connected {
2907                            let disconnect = v5_0::Disconnect::builder()
2908                                .reason_code(e.into())
2909                                .build()
2910                                .unwrap();
2911                            let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2912                            events.extend(disconnect_events);
2913                        }
2914                        events.push(GenericEvent::NotifyError(e));
2915                    }
2916                }
2917            }
2918            PacketData::Normal(_) => {
2919                unreachable!("PUBLISH packet must use PacketData::Publish variant");
2920            }
2921        }
2922
2923        events
2924    }
2925
2926    fn process_recv_v3_1_1_puback(
2927        &mut self,
2928        raw_packet: RawPacket,
2929    ) -> Vec<GenericEvent<PacketIdType>> {
2930        let mut events = Vec::new();
2931
2932        match v3_1_1::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2933            Ok((packet, _)) => {
2934                let packet_id = packet.packet_id();
2935                if self.pid_puback.remove(&packet_id) {
2936                    self.store.erase(ResponsePacket::V3_1_1Puback, packet_id);
2937                    if self.pid_man.is_used_id(packet_id) {
2938                        self.pid_man.release_id(packet_id);
2939                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2940                    }
2941                    events.extend(self.refresh_pingreq_recv());
2942                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2943                } else {
2944                    events.push(GenericEvent::RequestClose);
2945                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2946                }
2947            }
2948            Err(e) => {
2949                events.push(GenericEvent::RequestClose);
2950                events.push(GenericEvent::NotifyError(e));
2951            }
2952        }
2953
2954        events
2955    }
2956
2957    fn process_recv_v5_0_puback(
2958        &mut self,
2959        raw_packet: RawPacket,
2960    ) -> Vec<GenericEvent<PacketIdType>> {
2961        let mut events = Vec::new();
2962
2963        match v5_0::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2964            Ok((packet, _)) => {
2965                let packet_id = packet.packet_id();
2966                if self.pid_puback.remove(&packet_id) {
2967                    self.store.erase(ResponsePacket::V5_0Puback, packet_id);
2968                    if self.pid_man.is_used_id(packet_id) {
2969                        self.pid_man.release_id(packet_id);
2970                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2971                    }
2972                    if self.publish_send_max.is_some() {
2973                        self.publish_send_count -= 1;
2974                    }
2975                    events.extend(self.refresh_pingreq_recv());
2976                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2977                } else {
2978                    let disconnect = v5_0::Disconnect::builder()
2979                        .reason_code(DisconnectReasonCode::ProtocolError)
2980                        .build()
2981                        .unwrap();
2982                    events.extend(self.process_send_v5_0_disconnect(disconnect));
2983                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2984                }
2985            }
2986            Err(e) => {
2987                let disconnect = v5_0::Disconnect::builder()
2988                    .reason_code(DisconnectReasonCode::ProtocolError)
2989                    .build()
2990                    .unwrap();
2991                events.extend(self.process_send_v5_0_disconnect(disconnect));
2992                events.push(GenericEvent::NotifyError(e));
2993            }
2994        }
2995
2996        events
2997    }
2998
2999    fn process_recv_v3_1_1_pubrec(
3000        &mut self,
3001        raw_packet: RawPacket,
3002    ) -> Vec<GenericEvent<PacketIdType>> {
3003        let mut events = Vec::new();
3004
3005        match v3_1_1::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3006            Ok((packet, _)) => {
3007                let packet_id = packet.packet_id();
3008                if self.pid_pubrec.remove(&packet_id) {
3009                    self.store.erase(ResponsePacket::V3_1_1Pubrec, packet_id);
3010                    if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3011                        let pubrel = v3_1_1::GenericPubrel::<PacketIdType>::builder()
3012                            .packet_id(packet_id)
3013                            .build()
3014                            .unwrap();
3015                        events.extend(self.process_send_v3_1_1_pubrel(pubrel));
3016                    }
3017                    events.extend(self.refresh_pingreq_recv());
3018                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3019                } else {
3020                    events.push(GenericEvent::RequestClose);
3021                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3022                }
3023            }
3024            Err(e) => {
3025                events.push(GenericEvent::RequestClose);
3026                events.push(GenericEvent::NotifyError(e));
3027            }
3028        }
3029
3030        events
3031    }
3032
3033    fn process_recv_v5_0_pubrec(
3034        &mut self,
3035        raw_packet: RawPacket,
3036    ) -> Vec<GenericEvent<PacketIdType>> {
3037        let mut events = Vec::new();
3038
3039        match v5_0::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3040            Ok((packet, _)) => {
3041                let packet_id = packet.packet_id();
3042                if self.pid_pubrec.remove(&packet_id) {
3043                    self.store.erase(ResponsePacket::V5_0Pubrec, packet_id);
3044                    if let Some(reason_code) = packet.reason_code() {
3045                        if reason_code != PubrecReasonCode::Success {
3046                            if self.pid_man.is_used_id(packet_id) {
3047                                self.pid_man.release_id(packet_id);
3048                                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3049                            }
3050                            self.qos2_publish_processing.remove(&packet_id);
3051                            if self.publish_send_max.is_some() {
3052                                self.publish_send_count -= 1;
3053                            }
3054                        } else if self.auto_pub_response
3055                            && self.status == ConnectionStatus::Connected
3056                        {
3057                            let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3058                                .packet_id(packet_id)
3059                                .build()
3060                                .unwrap();
3061                            events.extend(self.process_send_v5_0_pubrel(pubrel));
3062                        }
3063                    } else if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3064                        let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3065                            .packet_id(packet_id)
3066                            .build()
3067                            .unwrap();
3068                        events.extend(self.process_send_v5_0_pubrel(pubrel));
3069                    }
3070                    events.extend(self.refresh_pingreq_recv());
3071                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3072                } else {
3073                    let disconnect = v5_0::Disconnect::builder()
3074                        .reason_code(DisconnectReasonCode::ProtocolError)
3075                        .build()
3076                        .unwrap();
3077                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3078                    events.push(GenericEvent::NotifyError(MqttError::from(
3079                        DisconnectReasonCode::ProtocolError,
3080                    )));
3081                }
3082            }
3083            Err(e) => {
3084                let disconnect = v5_0::Disconnect::builder()
3085                    .reason_code(DisconnectReasonCode::ProtocolError)
3086                    .build()
3087                    .unwrap();
3088                events.extend(self.process_send_v5_0_disconnect(disconnect));
3089                events.push(GenericEvent::NotifyError(e));
3090            }
3091        }
3092
3093        events
3094    }
3095
3096    fn process_recv_v3_1_1_pubrel(
3097        &mut self,
3098        raw_packet: RawPacket,
3099    ) -> Vec<GenericEvent<PacketIdType>> {
3100        let mut events = Vec::new();
3101
3102        match v3_1_1::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3103            Ok((packet, _)) => {
3104                let packet_id = packet.packet_id();
3105                self.qos2_publish_handled.remove(&packet_id);
3106                if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3107                    let pubcomp = v3_1_1::GenericPubcomp::<PacketIdType>::builder()
3108                        .packet_id(packet_id)
3109                        .build()
3110                        .unwrap();
3111                    events.extend(self.process_send_v3_1_1_pubcomp(pubcomp));
3112                }
3113                events.extend(self.refresh_pingreq_recv());
3114                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3115            }
3116            Err(e) => {
3117                events.push(GenericEvent::RequestClose);
3118                events.push(GenericEvent::NotifyError(e));
3119            }
3120        }
3121
3122        events
3123    }
3124
3125    fn process_recv_v5_0_pubrel(
3126        &mut self,
3127        raw_packet: RawPacket,
3128    ) -> Vec<GenericEvent<PacketIdType>> {
3129        let mut events = Vec::new();
3130
3131        match v5_0::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3132            Ok((packet, _)) => {
3133                let packet_id = packet.packet_id();
3134                self.qos2_publish_handled.remove(&packet_id);
3135                if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3136                    let pubcomp = v5_0::GenericPubcomp::<PacketIdType>::builder()
3137                        .packet_id(packet_id)
3138                        .build()
3139                        .unwrap();
3140                    events.extend(self.process_send_v5_0_pubcomp(pubcomp));
3141                }
3142                events.extend(self.refresh_pingreq_recv());
3143                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3144            }
3145            Err(e) => {
3146                let disconnect = v5_0::Disconnect::builder()
3147                    .reason_code(DisconnectReasonCode::ProtocolError)
3148                    .build()
3149                    .unwrap();
3150                events.extend(self.process_send_v5_0_disconnect(disconnect));
3151                events.push(GenericEvent::NotifyError(e));
3152            }
3153        }
3154
3155        events
3156    }
3157
3158    fn process_recv_v3_1_1_pubcomp(
3159        &mut self,
3160        raw_packet: RawPacket,
3161    ) -> Vec<GenericEvent<PacketIdType>> {
3162        let mut events = Vec::new();
3163
3164        match v3_1_1::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3165            Ok((packet, _)) => {
3166                let packet_id = packet.packet_id();
3167                if self.pid_pubcomp.remove(&packet_id) {
3168                    self.store.erase(ResponsePacket::V3_1_1Pubcomp, packet_id);
3169                    if self.pid_man.is_used_id(packet_id) {
3170                        self.pid_man.release_id(packet_id);
3171                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3172                    }
3173                    self.qos2_publish_processing.remove(&packet_id);
3174                    events.extend(self.refresh_pingreq_recv());
3175                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3176                } else {
3177                    events.push(GenericEvent::RequestClose);
3178                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3179                }
3180            }
3181            Err(e) => {
3182                events.push(GenericEvent::RequestClose);
3183                events.push(GenericEvent::NotifyError(e));
3184            }
3185        }
3186
3187        events
3188    }
3189
3190    fn process_recv_v5_0_pubcomp(
3191        &mut self,
3192        raw_packet: RawPacket,
3193    ) -> Vec<GenericEvent<PacketIdType>> {
3194        let mut events = Vec::new();
3195
3196        match v5_0::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3197            Ok((packet, _)) => {
3198                let packet_id = packet.packet_id();
3199                if self.pid_pubcomp.remove(&packet_id) {
3200                    self.store.erase(ResponsePacket::V5_0Pubcomp, packet_id);
3201                    if self.pid_man.is_used_id(packet_id) {
3202                        self.pid_man.release_id(packet_id);
3203                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3204                    }
3205                    self.qos2_publish_processing.remove(&packet_id);
3206                    if self.publish_send_max.is_some() {
3207                        self.publish_send_count -= 1;
3208                    }
3209                    events.extend(self.refresh_pingreq_recv());
3210                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3211                } else {
3212                    let disconnect = v5_0::Disconnect::builder()
3213                        .reason_code(DisconnectReasonCode::ProtocolError)
3214                        .build()
3215                        .unwrap();
3216                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3217                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3218                }
3219            }
3220            Err(e) => {
3221                let disconnect = v5_0::Disconnect::builder()
3222                    .reason_code(DisconnectReasonCode::ProtocolError)
3223                    .build()
3224                    .unwrap();
3225                events.extend(self.process_send_v5_0_disconnect(disconnect));
3226                events.push(GenericEvent::NotifyError(e));
3227            }
3228        }
3229
3230        events
3231    }
3232
3233    fn process_recv_v3_1_1_subscribe(
3234        &mut self,
3235        raw_packet: RawPacket,
3236    ) -> Vec<GenericEvent<PacketIdType>> {
3237        let mut events = Vec::new();
3238
3239        match v3_1_1::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3240            Ok((packet, _)) => {
3241                events.extend(self.refresh_pingreq_recv());
3242                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3243            }
3244            Err(e) => {
3245                events.push(GenericEvent::RequestClose);
3246                events.push(GenericEvent::NotifyError(e));
3247            }
3248        }
3249
3250        events
3251    }
3252
3253    fn process_recv_v5_0_subscribe(
3254        &mut self,
3255        raw_packet: RawPacket,
3256    ) -> Vec<GenericEvent<PacketIdType>> {
3257        let mut events = Vec::new();
3258
3259        match v5_0::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3260            Ok((packet, _)) => {
3261                events.extend(self.refresh_pingreq_recv());
3262                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3263            }
3264            Err(e) => {
3265                let disconnect = v5_0::Disconnect::builder()
3266                    .reason_code(DisconnectReasonCode::ProtocolError)
3267                    .build()
3268                    .unwrap();
3269                events.extend(self.process_send_v5_0_disconnect(disconnect));
3270                events.push(GenericEvent::NotifyError(e));
3271            }
3272        }
3273
3274        events
3275    }
3276
3277    fn process_recv_v3_1_1_suback(
3278        &mut self,
3279        raw_packet: RawPacket,
3280    ) -> Vec<GenericEvent<PacketIdType>> {
3281        let mut events = Vec::new();
3282
3283        match v3_1_1::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3284            Ok((packet, _)) => {
3285                let packet_id = packet.packet_id();
3286                if self.pid_suback.remove(&packet_id) {
3287                    if self.pid_man.is_used_id(packet_id) {
3288                        self.pid_man.release_id(packet_id);
3289                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3290                    }
3291                    events.extend(self.refresh_pingreq_recv());
3292                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3293                } else {
3294                    events.push(GenericEvent::RequestClose);
3295                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3296                }
3297            }
3298            Err(e) => {
3299                events.push(GenericEvent::RequestClose);
3300                events.push(GenericEvent::NotifyError(e));
3301            }
3302        }
3303
3304        events
3305    }
3306
3307    fn process_recv_v5_0_suback(
3308        &mut self,
3309        raw_packet: RawPacket,
3310    ) -> Vec<GenericEvent<PacketIdType>> {
3311        let mut events = Vec::new();
3312
3313        match v5_0::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3314            Ok((packet, _)) => {
3315                let packet_id = packet.packet_id();
3316                if self.pid_suback.remove(&packet_id) {
3317                    if self.pid_man.is_used_id(packet_id) {
3318                        self.pid_man.release_id(packet_id);
3319                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3320                    }
3321                    events.extend(self.refresh_pingreq_recv());
3322                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3323                } else {
3324                    let disconnect = v5_0::Disconnect::builder()
3325                        .reason_code(DisconnectReasonCode::ProtocolError)
3326                        .build()
3327                        .unwrap();
3328                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3329                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3330                }
3331            }
3332            Err(e) => {
3333                let disconnect = v5_0::Disconnect::builder()
3334                    .reason_code(DisconnectReasonCode::ProtocolError)
3335                    .build()
3336                    .unwrap();
3337                events.extend(self.process_send_v5_0_disconnect(disconnect));
3338                events.push(GenericEvent::NotifyError(e));
3339            }
3340        }
3341
3342        events
3343    }
3344
3345    fn process_recv_v3_1_1_unsubscribe(
3346        &mut self,
3347        raw_packet: RawPacket,
3348    ) -> Vec<GenericEvent<PacketIdType>> {
3349        let mut events = Vec::new();
3350
3351        match v3_1_1::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3352            Ok((packet, _)) => {
3353                events.extend(self.refresh_pingreq_recv());
3354                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3355            }
3356            Err(e) => {
3357                events.push(GenericEvent::RequestClose);
3358                events.push(GenericEvent::NotifyError(e));
3359            }
3360        }
3361
3362        events
3363    }
3364
3365    fn process_recv_v5_0_unsubscribe(
3366        &mut self,
3367        raw_packet: RawPacket,
3368    ) -> Vec<GenericEvent<PacketIdType>> {
3369        let mut events = Vec::new();
3370
3371        match v5_0::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3372            Ok((packet, _)) => {
3373                events.extend(self.refresh_pingreq_recv());
3374                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3375            }
3376            Err(e) => {
3377                let disconnect = v5_0::Disconnect::builder()
3378                    .reason_code(DisconnectReasonCode::ProtocolError)
3379                    .build()
3380                    .unwrap();
3381                events.extend(self.process_send_v5_0_disconnect(disconnect));
3382                events.push(GenericEvent::NotifyError(e));
3383            }
3384        }
3385
3386        events
3387    }
3388
3389    fn process_recv_v3_1_1_unsuback(
3390        &mut self,
3391        raw_packet: RawPacket,
3392    ) -> Vec<GenericEvent<PacketIdType>> {
3393        let mut events = Vec::new();
3394
3395        match v3_1_1::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3396            Ok((packet, _)) => {
3397                let packet_id = packet.packet_id();
3398                if self.pid_unsuback.remove(&packet_id) {
3399                    if self.pid_man.is_used_id(packet_id) {
3400                        self.pid_man.release_id(packet_id);
3401                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3402                    }
3403                    events.extend(self.refresh_pingreq_recv());
3404                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3405                } else {
3406                    events.push(GenericEvent::RequestClose);
3407                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3408                }
3409            }
3410            Err(e) => {
3411                events.push(GenericEvent::RequestClose);
3412                events.push(GenericEvent::NotifyError(e));
3413            }
3414        }
3415
3416        events
3417    }
3418
3419    fn process_recv_v5_0_unsuback(
3420        &mut self,
3421        raw_packet: RawPacket,
3422    ) -> Vec<GenericEvent<PacketIdType>> {
3423        let mut events = Vec::new();
3424
3425        match v5_0::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3426            Ok((packet, _)) => {
3427                let packet_id = packet.packet_id();
3428                if self.pid_unsuback.remove(&packet_id) {
3429                    if self.pid_man.is_used_id(packet_id) {
3430                        self.pid_man.release_id(packet_id);
3431                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3432                    }
3433                    events.extend(self.refresh_pingreq_recv());
3434                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3435                } else {
3436                    let disconnect = v5_0::Disconnect::builder()
3437                        .reason_code(DisconnectReasonCode::ProtocolError)
3438                        .build()
3439                        .unwrap();
3440                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3441                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3442                }
3443            }
3444            Err(e) => {
3445                let disconnect = v5_0::Disconnect::builder()
3446                    .reason_code(DisconnectReasonCode::ProtocolError)
3447                    .build()
3448                    .unwrap();
3449                events.extend(self.process_send_v5_0_disconnect(disconnect));
3450                events.push(GenericEvent::NotifyError(e));
3451            }
3452        }
3453
3454        events
3455    }
3456
3457    fn process_recv_v3_1_1_pingreq(
3458        &mut self,
3459        raw_packet: RawPacket,
3460    ) -> Vec<GenericEvent<PacketIdType>> {
3461        let mut events = Vec::new();
3462
3463        match v3_1_1::Pingreq::parse(raw_packet.data_as_slice()) {
3464            Ok((packet, _)) => {
3465                if (Role::IS_SERVER || Role::IS_ANY)
3466                    && !self.is_client
3467                    && self.auto_ping_response
3468                    && self.status == ConnectionStatus::Connected
3469                {
3470                    let pingresp = v3_1_1::Pingresp::new();
3471                    events.extend(self.process_send_v3_1_1_pingresp(pingresp));
3472                }
3473                events.extend(self.refresh_pingreq_recv());
3474                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3475            }
3476            Err(e) => {
3477                events.push(GenericEvent::RequestClose);
3478                events.push(GenericEvent::NotifyError(e));
3479            }
3480        }
3481
3482        events
3483    }
3484
3485    fn process_recv_v5_0_pingreq(
3486        &mut self,
3487        raw_packet: RawPacket,
3488    ) -> Vec<GenericEvent<PacketIdType>> {
3489        let mut events = Vec::new();
3490
3491        match v5_0::Pingreq::parse(raw_packet.data_as_slice()) {
3492            Ok((packet, _)) => {
3493                if (Role::IS_SERVER || Role::IS_ANY)
3494                    && !self.is_client
3495                    && self.auto_ping_response
3496                    && self.status == ConnectionStatus::Connected
3497                {
3498                    let pingresp = v5_0::Pingresp::new();
3499                    events.extend(self.process_send_v5_0_pingresp(pingresp));
3500                }
3501                events.extend(self.refresh_pingreq_recv());
3502                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3503            }
3504            Err(e) => {
3505                let disconnect = v5_0::Disconnect::builder()
3506                    .reason_code(DisconnectReasonCode::ProtocolError)
3507                    .build()
3508                    .unwrap();
3509                events.extend(self.process_send_v5_0_disconnect(disconnect));
3510                events.push(GenericEvent::NotifyError(e));
3511            }
3512        }
3513
3514        events
3515    }
3516
3517    fn process_recv_v3_1_1_pingresp(
3518        &mut self,
3519        raw_packet: RawPacket,
3520    ) -> Vec<GenericEvent<PacketIdType>> {
3521        let mut events = Vec::new();
3522
3523        match v3_1_1::Pingresp::parse(raw_packet.data_as_slice()) {
3524            Ok((packet, _)) => {
3525                self.pingresp_recv_set = false;
3526                events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3527                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3528            }
3529            Err(e) => {
3530                events.push(GenericEvent::RequestClose);
3531                events.push(GenericEvent::NotifyError(e));
3532            }
3533        }
3534
3535        events
3536    }
3537
3538    fn process_recv_v5_0_pingresp(
3539        &mut self,
3540        raw_packet: RawPacket,
3541    ) -> Vec<GenericEvent<PacketIdType>> {
3542        let mut events = Vec::new();
3543
3544        match v5_0::Pingresp::parse(raw_packet.data_as_slice()) {
3545            Ok((packet, _)) => {
3546                self.pingresp_recv_set = false;
3547                events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3548                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3549            }
3550            Err(e) => {
3551                let disconnect = v5_0::Disconnect::builder()
3552                    .reason_code(DisconnectReasonCode::ProtocolError)
3553                    .build()
3554                    .unwrap();
3555                events.extend(self.process_send_v5_0_disconnect(disconnect));
3556                events.push(GenericEvent::NotifyError(e));
3557            }
3558        }
3559
3560        events
3561    }
3562
3563    fn process_recv_v3_1_1_disconnect(
3564        &mut self,
3565        raw_packet: RawPacket,
3566    ) -> Vec<GenericEvent<PacketIdType>> {
3567        let mut events = Vec::new();
3568
3569        match v3_1_1::Disconnect::parse(raw_packet.data_as_slice()) {
3570            Ok((packet, _)) => {
3571                self.cancel_timers(&mut events);
3572                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3573            }
3574            Err(e) => {
3575                events.push(GenericEvent::RequestClose);
3576                events.push(GenericEvent::NotifyError(e));
3577            }
3578        }
3579
3580        events
3581    }
3582
3583    fn process_recv_v5_0_disconnect(
3584        &mut self,
3585        raw_packet: RawPacket,
3586    ) -> Vec<GenericEvent<PacketIdType>> {
3587        let mut events = Vec::new();
3588
3589        match v5_0::Disconnect::parse(raw_packet.data_as_slice()) {
3590            Ok((packet, _)) => {
3591                self.cancel_timers(&mut events);
3592                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3593            }
3594            Err(e) => {
3595                let disconnect = v5_0::Disconnect::builder()
3596                    .reason_code(DisconnectReasonCode::ProtocolError)
3597                    .build()
3598                    .unwrap();
3599                events.extend(self.process_send_v5_0_disconnect(disconnect));
3600                events.push(GenericEvent::NotifyError(e));
3601            }
3602        }
3603
3604        events
3605    }
3606
3607    fn process_recv_v5_0_auth(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
3608        let mut events = Vec::new();
3609
3610        match v5_0::Auth::parse(raw_packet.data_as_slice()) {
3611            Ok((packet, _)) => {
3612                events.extend(self.refresh_pingreq_recv());
3613                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3614            }
3615            Err(e) => {
3616                let disconnect = v5_0::Disconnect::builder()
3617                    .reason_code(DisconnectReasonCode::ProtocolError)
3618                    .build()
3619                    .unwrap();
3620                events.extend(self.process_send_v5_0_disconnect(disconnect));
3621                events.push(GenericEvent::NotifyError(e));
3622            }
3623        }
3624
3625        events
3626    }
3627
3628    fn refresh_pingreq_recv(&mut self) -> Vec<GenericEvent<PacketIdType>> {
3629        let mut events = Vec::new();
3630        if let Some(timeout_ms) = self.pingreq_recv_timeout_ms {
3631            if self.status == ConnectionStatus::Connecting
3632                || self.status == ConnectionStatus::Connected
3633            {
3634                self.pingreq_recv_set = true;
3635                events.push(GenericEvent::RequestTimerReset {
3636                    kind: TimerKind::PingreqRecv,
3637                    duration_ms: timeout_ms,
3638                });
3639            } else {
3640                self.pingreq_recv_set = false;
3641                events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3642            }
3643        }
3644
3645        events
3646    }
3647
3648    /// Cancel timers and collect events instead of calling handlers
3649    fn cancel_timers(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
3650        if self.pingreq_send_set {
3651            self.pingreq_send_set = false;
3652            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
3653        }
3654        if self.pingreq_recv_set {
3655            self.pingreq_recv_set = false;
3656            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3657        }
3658        if self.pingresp_recv_set {
3659            self.pingresp_recv_set = false;
3660            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3661        }
3662    }
3663
3664    /// Helper function to extract TopicAlias from properties
3665    fn get_topic_alias_from_props(props: &[Property]) -> Option<u16> {
3666        for prop in props {
3667            if let Property::TopicAlias(ta) = prop {
3668                return Some(ta.val());
3669            }
3670        }
3671        None
3672    }
3673
3674    #[allow(dead_code)]
3675    fn is_packet_id_used(&self, packet_id: PacketIdType) -> bool {
3676        self.pid_man.is_used_id(packet_id)
3677    }
3678}
3679
3680// traits
3681
3682pub trait RecvBehavior<Role, PacketIdType>
3683where
3684    PacketIdType: IsPacketId,
3685{
3686    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>>;
3687}
3688
3689// RecvBehavior implementations
3690impl<PacketIdType> RecvBehavior<role::Client, PacketIdType>
3691    for GenericConnection<role::Client, PacketIdType>
3692where
3693    PacketIdType: IsPacketId,
3694{
3695    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3696        self.recv(data)
3697    }
3698}
3699
3700impl<PacketIdType> RecvBehavior<role::Server, PacketIdType>
3701    for GenericConnection<role::Server, PacketIdType>
3702where
3703    PacketIdType: IsPacketId,
3704{
3705    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3706        self.recv(data)
3707    }
3708}
3709
3710impl<PacketIdType> RecvBehavior<role::Any, PacketIdType>
3711    for GenericConnection<role::Any, PacketIdType>
3712where
3713    PacketIdType: IsPacketId,
3714{
3715    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3716        self.recv(data)
3717    }
3718}
3719
3720// tests
3721
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mqtt::connection::version::Version;
    use crate::mqtt::packet::TopicAliasSend;
    use crate::mqtt::role;

    /// Connection flavour used by most tests: client role with `u16` packet ids.
    type ClientConnection = GenericConnection<role::Client, u16>;

    /// Create a new MQTT v5.0 client connection.
    fn new_v5_client() -> ClientConnection {
        ClientConnection::new(Version::V5_0)
    }

    /// Create a new MQTT v5.0 client connection whose `topic_alias_send` field
    /// holds the given alias table.
    fn new_v5_client_with_aliases(aliases: TopicAliasSend) -> ClientConnection {
        let mut connection = new_v5_client();
        connection.topic_alias_send = Some(aliases);
        connection
    }

    #[test]
    fn test_initialize_client_mode() {
        let mut connection = new_v5_client();

        // `true` selects client mode.
        connection.initialize(true);

        // The client flag is set and the publish bookkeeping is at its defaults.
        assert!(connection.is_client);
        assert_eq!(connection.publish_send_count, 0);
        assert!(connection.publish_send_max.is_none());
        assert!(connection.publish_recv_max.is_none());
        assert!(!connection.need_store);
    }

    #[test]
    fn test_initialize_server_mode() {
        let mut connection = GenericConnection::<role::Server, u32>::new(Version::V3_1_1);

        // `false` selects server mode.
        connection.initialize(false);

        // The client flag is clear and the publish bookkeeping is at its defaults.
        assert!(!connection.is_client);
        assert_eq!(connection.publish_send_count, 0);
        assert!(connection.publish_send_max.is_none());
        assert!(connection.publish_recv_max.is_none());
        assert!(!connection.need_store);
    }

    #[test]
    fn test_validate_topic_alias_no_topic_alias_send() {
        let mut connection = new_v5_client();

        // Without a configured `topic_alias_send`, no alias can be resolved.
        let resolved = connection.validate_topic_alias(Some(1));
        assert!(resolved.is_none());
    }

    #[test]
    fn test_validate_topic_alias_none_input() {
        let mut connection = new_v5_client();

        // No alias supplied means nothing to resolve.
        let resolved = connection.validate_topic_alias(None);
        assert!(resolved.is_none());
    }

    #[test]
    fn test_validate_topic_alias_range_no_topic_alias_send() {
        let connection = new_v5_client();

        // Without a configured `topic_alias_send`, every alias is out of range.
        let in_range = connection.validate_topic_alias_range(1);
        assert!(!in_range);
    }

    #[test]
    fn test_validate_topic_alias_range_zero() {
        // Alias table with a maximum of 10.
        let connection = new_v5_client_with_aliases(TopicAliasSend::new(10));

        // Alias 0 is never accepted.
        let in_range = connection.validate_topic_alias_range(0);
        assert!(!in_range);
    }

    #[test]
    fn test_validate_topic_alias_range_over_max() {
        // Alias table with a maximum of 5.
        let connection = new_v5_client_with_aliases(TopicAliasSend::new(5));

        // An alias above the maximum is rejected.
        let in_range = connection.validate_topic_alias_range(6);
        assert!(!in_range);
    }

    #[test]
    fn test_validate_topic_alias_range_valid() {
        // Alias table with a maximum of 10.
        let connection = new_v5_client_with_aliases(TopicAliasSend::new(10));

        // Lower bound, an interior value, and the maximum are all accepted.
        assert!(connection.validate_topic_alias_range(1));
        assert!(connection.validate_topic_alias_range(5));
        assert!(connection.validate_topic_alias_range(10));
    }

    #[test]
    fn test_validate_topic_alias_with_registered_alias() {
        // Alias table with a maximum of 10 and one registered mapping.
        let mut aliases = TopicAliasSend::new(10);
        aliases.insert_or_update("test/topic", 5);
        let mut connection = new_v5_client_with_aliases(aliases);

        // A registered alias resolves to its topic name.
        let resolved = connection.validate_topic_alias(Some(5));
        assert_eq!(resolved, Some("test/topic".to_string()));
    }

    #[test]
    fn test_validate_topic_alias_unregistered_alias() {
        // Alias table with a maximum of 10 and no registered mappings.
        let mut connection = new_v5_client_with_aliases(TopicAliasSend::new(10));

        // An in-range alias that was never registered resolves to nothing.
        let resolved = connection.validate_topic_alias(Some(5));
        assert!(resolved.is_none());
    }

    #[test]
    fn test_validate_maximum_packet_size_within_limit() {
        let connection = new_v5_client();

        // No limit is configured here; the default is expected to admit a
        // 1000-byte packet.
        let accepted = connection.validate_maximum_packet_size_send(1000);
        assert!(accepted);
    }

    #[test]
    fn test_validate_maximum_packet_size_at_limit() {
        let mut connection = new_v5_client();
        connection.maximum_packet_size_send = 1000;

        // A size exactly equal to the limit is accepted.
        let accepted = connection.validate_maximum_packet_size_send(1000);
        assert!(accepted);
    }

    #[test]
    fn test_validate_maximum_packet_size_over_limit() {
        let mut connection = new_v5_client();
        connection.maximum_packet_size_send = 1000;

        // One byte over the limit is rejected.
        let accepted = connection.validate_maximum_packet_size_send(1001);
        assert!(!accepted);
    }

    #[test]
    fn test_validate_maximum_packet_size_zero_limit() {
        let mut connection = new_v5_client();
        connection.maximum_packet_size_send = 0;

        // With a zero limit, any non-zero size is rejected...
        let accepted = connection.validate_maximum_packet_size_send(1);
        assert!(!accepted);

        // ...while a zero size is still accepted.
        let accepted = connection.validate_maximum_packet_size_send(0);
        assert!(accepted);
    }

    #[test]
    fn test_initialize_clears_state() {
        let mut connection = new_v5_client();

        // Dirty the state that `initialize` is expected to reset.
        connection.publish_send_count = 5;
        connection.need_store = true;
        connection.pid_suback.insert(123);
        connection.pid_unsuback.insert(456);

        connection.initialize(true);

        // Everything set above is back to its initial value.
        assert_eq!(connection.publish_send_count, 0);
        assert!(!connection.need_store);
        assert!(connection.pid_suback.is_empty());
        assert!(connection.pid_unsuback.is_empty());
        assert!(connection.is_client);
    }

    #[test]
    fn test_remaining_length_to_total_size() {
        // (remaining length, expected total size). The total is one fixed
        // header byte, plus the bytes needed to encode the remaining length,
        // plus the remaining length itself.
        let cases = [
            // 1-byte remaining length encoding (0-127)
            (0, 2),     // 1 + 1 + 0
            (127, 129), // 1 + 1 + 127
            // 2-byte remaining length encoding (128-16383)
            (128, 131),     // 1 + 2 + 128
            (16383, 16386), // 1 + 2 + 16383
            // 3-byte remaining length encoding (16384-2097151)
            (16384, 16388),     // 1 + 3 + 16384
            (2097151, 2097155), // 1 + 3 + 2097151
            // 4-byte remaining length encoding (2097152-268435455)
            (2097152, 2097157),     // 1 + 4 + 2097152
            (268435455, 268435460), // 1 + 4 + 268435455
        ];

        for &(remaining_length, expected_total) in cases.iter() {
            assert_eq!(
                remaining_length_to_total_size(remaining_length),
                expected_total
            );
        }
    }
}