mqtt_protocol_core/mqtt/connection/
core.rs

1// MIT License
2//
3// Copyright (c) 2025 Takatoshi Kondo
4//
5// Permission is hereby granted, free of charge, to any person obtaining a copy
6// of this software and associated documentation files (the "Software"), to deal
7// in the Software without restriction, including without limitation the rights
8// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9// copies of the Software, and to permit persons to whom the Software is
10// furnished to do so, subject to the following conditions:
11//
12// The above copyright notice and this permission notice shall be included in all
13// copies or substantial portions of the Software.
14//
15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21// SOFTWARE.
22use alloc::{
23    string::{String, ToString},
24    vec::Vec,
25};
26use core::marker::PhantomData;
27
28use crate::mqtt::common::tracing::{error, info, trace, warn};
29use crate::mqtt::common::Cursor;
30use crate::mqtt::common::HashSet;
31use crate::mqtt::connection::event::{GenericEvent, TimerKind};
32use crate::mqtt::connection::GenericStore;
33
34use serde::Serialize;
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
37enum ConnectionStatus {
38    #[serde(rename = "disconnected")]
39    Disconnected,
40    #[serde(rename = "connecting")]
41    Connecting,
42    #[serde(rename = "connected")]
43    Connected,
44}
45use crate::mqtt::connection::packet_builder::{
46    PacketBuildResult, PacketBuilder, PacketData, RawPacket,
47};
48use crate::mqtt::connection::packet_id_manager::PacketIdManager;
49use crate::mqtt::connection::role;
50use crate::mqtt::connection::role::RoleType;
51use crate::mqtt::connection::sendable::Sendable;
52use crate::mqtt::connection::version::*;
53use crate::mqtt::packet::v3_1_1;
54use crate::mqtt::packet::v5_0;
55use crate::mqtt::packet::GenericPacket;
56use crate::mqtt::packet::GenericStorePacket;
57use crate::mqtt::packet::IsPacketId;
58use crate::mqtt::packet::Qos;
59use crate::mqtt::packet::ResponsePacket;
60use crate::mqtt::packet::{Property, TopicAliasRecv, TopicAliasSend};
61use crate::mqtt::prelude::GenericPacketTrait;
62use crate::mqtt::result_code::{
63    ConnectReasonCode, ConnectReturnCode, DisconnectReasonCode, MqttError, PubrecReasonCode,
64};
65
/// Packet size value used to mean "no limit has been negotiated".
///
/// Computed as 1 (fixed header byte) + 4 (longest Remaining Length encoding)
/// + 128^4. Note that 128^4 is one more than the largest Remaining Length the
/// four-byte encoding can express (128^4 - 1), so this constant is strictly
/// greater than the size of any encodable packet.
const MQTT_PACKET_SIZE_NO_LIMIT: u32 = 1 + 4 + 128u32.pow(4);
69
/// Convert a Remaining Length value into the size of the whole packet.
///
/// The result is the sum of:
/// - 1 byte of fixed header,
/// - the number of bytes (1 to 4) needed to encode `remaining_length`,
/// - `remaining_length` itself.
///
/// Values of 2_097_152 and above are all counted as needing 4 length bytes.
fn remaining_length_to_total_size(remaining_length: u32) -> u32 {
    // Each additional length byte extends the encodable range by a factor of 128.
    let length_field_size = match remaining_length {
        0..=127 => 1,
        128..=16_383 => 2,
        16_384..=2_097_151 => 3,
        _ => 4,
    };

    1 + length_field_size + remaining_length
}
89
/// Type alias for [`GenericEvent`] with `u16` packet IDs (most common case)
///
/// This is a convenience type alias that most applications will use.
/// It fixes the packet ID type to `u16`, which is the standard MQTT packet ID type.
/// Code that is generic over the packet ID type should use
/// `GenericEvent<PacketIdType>` directly instead.
pub type Event = GenericEvent<u16>;
95
/// Generic MQTT Connection - Core Sans-I/O MQTT protocol implementation
///
/// This struct represents the core MQTT protocol logic in a Sans-I/O (synchronous I/O-independent) design.
/// It handles MQTT packet processing, state management, and protocol compliance without performing
/// any actual network I/O operations. Instead, it returns events that the application must handle.
///
/// # Type Parameters
///
/// * `Role` - The connection role (Client or Server), determining allowed operations
/// * `PacketIdType` - The type used for packet IDs (typically `u16`, but can be `u32` for extended scenarios)
///
/// # Key Features
///
/// - **Sans-I/O Design**: No network I/O is performed directly; events are returned for external handling
/// - **Protocol Compliance**: Implements MQTT v3.1.1 and v5.0 specifications
/// - **State Management**: Tracks connection state, packet IDs, and protocol flows
/// - **Configurable Behavior**: Supports various configuration options for different use cases
/// - **Generic Packet ID Support**: Can use u16 or u32 packet IDs for different deployment scenarios
///
/// # Usage
///
/// ```ignore
/// use mqtt_protocol_core::mqtt;
///
/// let mut connection = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
///
/// // Send a packet
/// let events = connection.send(publish_packet);
/// for event in events {
///     match event {
///         mqtt::connection::Event::RequestSendPacket { packet, .. } => {
///             // Send packet over network
///         },
///         // Handle other events...
///     }
/// }
///
/// // Receive data
/// let mut cursor = std::io::Cursor::new(received_data);
/// let events = connection.recv(&mut cursor);
/// // Process events...
/// ```
pub struct GenericConnection<Role, PacketIdType>
where
    Role: RoleType,
    PacketIdType: IsPacketId,
{
    // Zero-sized marker that ties the connection to its `Role` type parameter
    _marker: PhantomData<Role>,

    // Protocol version; initialized from the argument passed to `new`
    protocol_version: Version,

    // Packet ID bookkeeping (queried with `is_used_id`, released with `release_id`)
    pid_man: PacketIdManager<PacketIdType>,
    // Packet IDs tracked for SUBACK; always released in `notify_closed`
    pid_suback: HashSet<PacketIdType>,
    // Packet IDs tracked for UNSUBACK; always released in `notify_closed`
    pid_unsuback: HashSet<PacketIdType>,
    // Packet IDs tracked for PUBACK / PUBREC / PUBCOMP; released in
    // `notify_closed` only when `need_store` is false
    pid_puback: HashSet<PacketIdType>,
    pid_pubrec: HashSet<PacketIdType>,
    pid_pubcomp: HashSet<PacketIdType>,

    // When false, `notify_closed` clears QoS2 state and releases
    // publish-related packet IDs
    need_store: bool,
    // Store for retransmission packets
    store: GenericStore<PacketIdType>,

    // Behavior switches; all start disabled in `new`
    offline_publish: bool,
    auto_pub_response: bool,
    auto_ping_response: bool,

    // Auto map topic alias for sending
    auto_map_topic_alias_send: bool,
    // Auto replace topic alias for sending
    auto_replace_topic_alias_send: bool,
    // Topic alias management for receiving (reset to `None` in `notify_closed`)
    topic_alias_recv: Option<TopicAliasRecv>,
    // Topic alias management for sending (reset to `None` in `notify_closed`)
    topic_alias_send: Option<TopicAliasSend>,

    // Maximum number of concurrent PUBLISH packets for sending
    publish_send_max: Option<u16>,
    // Maximum number of concurrent PUBLISH packets for receiving
    publish_recv_max: Option<u16>,
    // Current count of PUBLISH packets being sent
    publish_send_count: u16,

    // Set of received PUBLISH packets (for flow control)
    publish_recv: HashSet<PacketIdType>,

    // Maximum packet size for sending (MQTT_PACKET_SIZE_NO_LIMIT when unrestricted)
    maximum_packet_size_send: u32,
    // Maximum packet size for receiving (MQTT_PACKET_SIZE_NO_LIMIT when unrestricted)
    maximum_packet_size_recv: u32,

    // Connection state
    status: ConnectionStatus,

    // PINGREQ send interval in milliseconds
    pingreq_send_interval_ms: Option<u64>,
    // PINGREQ receive timeout in milliseconds
    pingreq_recv_timeout_ms: Option<u64>,
    // PINGRESP receive timeout in milliseconds
    pingresp_recv_timeout_ms: Option<u64>,

    // QoS2 PUBLISH packet handling state (for duplicate detection)
    qos2_publish_handled: HashSet<PacketIdType>,
    // QoS2 PUBLISH packet processing state
    qos2_publish_processing: HashSet<PacketIdType>,

    // Timer state flags; each is cleared in `notify_timer_fired` when the
    // corresponding timer fires
    pingreq_send_set: bool,
    pingreq_recv_set: bool,
    pingresp_recv_set: bool,

    // Incremental builder fed with received bytes by `recv`
    packet_builder: PacketBuilder,
    // Client/Server mode flag
    is_client: bool,
}
210
/// Type alias for [`GenericConnection`] with `u16` packet IDs (standard case)
///
/// This is the standard Connection type that most applications will use.
/// It uses `u16` for packet IDs, which is the standard MQTT packet ID type
/// supporting values from 1 to 65535.
///
/// For extended scenarios where larger packet ID ranges are needed
/// (such as broker clusters), use `GenericConnection<Role, u32>` directly.
///
/// # Type Parameters
///
/// * `Role` - The connection role (typically `role::Client` or `role::Server`;
///   `role::Any` is also accepted by the runtime checks in `send`)
pub type Connection<Role> = GenericConnection<Role, u16>;
224
225impl<Role, PacketIdType> GenericConnection<Role, PacketIdType>
226where
227    Role: RoleType,
228    PacketIdType: IsPacketId,
229{
230    /// Create a new MQTT connection instance
231    ///
232    /// Initializes a new MQTT connection with the specified protocol version.
233    /// The connection starts in a disconnected state and must be activated through
234    /// the connection handshake process (CONNECT/CONNACK).
235    ///
236    /// # Parameters
237    ///
238    /// * `version` - The MQTT protocol version to use (V3_1_1 or V5_0)
239    ///
240    /// # Returns
241    ///
242    /// A new `GenericConnection` instance ready for use
243    ///
244    /// # Examples
245    ///
246    /// ```ignore
247    /// use mqtt_protocol_core::mqtt;
248    ///
249    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
250    /// let mut server = mqtt::connection::Connection::<mqtt::connection::role::Server>::new(mqtt::version::Version::V3_1_1);
251    /// ```
252    pub fn new(version: Version) -> Self {
253        Self {
254            _marker: PhantomData,
255            protocol_version: version,
256            pid_man: PacketIdManager::new(),
257            pid_suback: HashSet::default(),
258            pid_unsuback: HashSet::default(),
259            pid_puback: HashSet::default(),
260            pid_pubrec: HashSet::default(),
261            pid_pubcomp: HashSet::default(),
262            need_store: false,
263            store: GenericStore::new(),
264            offline_publish: false,
265            auto_pub_response: false,
266            auto_ping_response: false,
267            auto_map_topic_alias_send: false,
268            auto_replace_topic_alias_send: false,
269            topic_alias_recv: None,
270            topic_alias_send: None,
271            publish_send_max: None,
272            publish_recv_max: None,
273            publish_send_count: 0,
274            publish_recv: HashSet::default(),
275            maximum_packet_size_send: MQTT_PACKET_SIZE_NO_LIMIT,
276            maximum_packet_size_recv: MQTT_PACKET_SIZE_NO_LIMIT,
277            status: ConnectionStatus::Disconnected,
278            pingreq_send_interval_ms: None,
279            pingreq_recv_timeout_ms: None,
280            pingresp_recv_timeout_ms: None,
281            qos2_publish_handled: HashSet::default(),
282            qos2_publish_processing: HashSet::default(),
283            pingreq_send_set: false,
284            pingreq_recv_set: false,
285            pingresp_recv_set: false,
286            packet_builder: PacketBuilder::new(),
287            is_client: false,
288        }
289    }
290
291    // public
292
    /// Send MQTT packet with compile-time role checking (experimental)
    ///
    /// This method provides compile-time verification that the packet being sent
    /// is allowed for the current connection role (Client or Server). It only works
    /// when the `Role` type parameter is concrete (not generic).
    ///
    /// This is an experimental API that may be subject to change. For general use,
    /// consider using the `send()` method instead.
    ///
    /// # Type Parameters
    ///
    /// * `T` - The packet type that must implement `Sendable<Role, PacketIdType>`
    ///
    /// # Parameters
    ///
    /// * `packet` - The MQTT packet to send
    ///
    /// # Returns
    ///
    /// A vector of events that the application must process. The vector is
    /// whatever `packet.dispatch_send(self)` returns; this method adds nothing to it.
    ///
    /// # Compile-time Safety
    ///
    /// The `T: Sendable<Role, PacketIdType>` bound means a packet type without
    /// such an implementation for this connection's `Role` is rejected by the
    /// compiler, preventing protocol violations at development time.
    ///
    /// # Examples
    ///
    /// ```ignore
    /// use mqtt_protocol_core::mqtt;
    ///
    /// // This works for concrete roles
    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
    /// let events = client.checked_send(connect_packet); // OK - clients can send CONNECT
    ///
    /// // This would cause a compile error
    /// // let events = client.checked_send(connack_packet); // Error - clients cannot send CONNACK
    /// ```
    pub fn checked_send<T>(&mut self, packet: T) -> Vec<GenericEvent<PacketIdType>>
    where
        T: Sendable<Role, PacketIdType>,
    {
        // The packet picks the matching send routine on this connection itself
        // (concrete packet or generic packet).
        packet.dispatch_send(self)
    }
338
339    /// Send MQTT packet with runtime role validation
340    ///
341    /// This is the primary method for sending MQTT packets. It accepts any `GenericPacket`
342    /// and performs runtime validation to ensure the packet is allowed for the current
343    /// connection role. This provides flexibility when the exact packet type is not known
344    /// at compile time.
345    ///
346    /// # Parameters
347    ///
348    /// * `packet` - The MQTT packet to send
349    ///
350    /// # Returns
351    ///
352    /// A vector of events that the application must process. If the packet is not allowed
353    /// for the current role, a `NotifyError` event will be included.
354    ///
355    /// # Validation
356    ///
357    /// The method validates that:
358    /// - The packet type is allowed for the connection role (Client vs Server)
359    /// - Protocol version compatibility
360    /// - Connection state requirements
361    /// - Packet ID management for QoS > 0 packets
362    ///
363    /// # Examples
364    ///
365    /// ```ignore
366    /// use mqtt_protocol_core::mqtt;
367    ///
368    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
369    /// let events = client.send(mqtt::packet::GenericPacket::V5_0(mqtt::packet::v5_0::Packet::Connect(connect_packet)));
370    ///
371    /// for event in events {
372    ///     match event {
373    ///         mqtt::connection::Event::RequestSendPacket { packet, .. } => {
374    ///             // Send packet over network
375    ///         },
376    ///         mqtt::connection::Event::NotifyError(error) => {
377    ///             // Handle validation errors
378    ///         },
379    ///         // Handle other events...
380    ///     }
381    /// }
382    /// ```
383    pub fn send(&mut self, packet: GenericPacket<PacketIdType>) -> Vec<GenericEvent<PacketIdType>> {
384        use core::any::TypeId;
385
386        let role_id = TypeId::of::<Role>();
387        let client_id = TypeId::of::<role::Client>();
388        let server_id = TypeId::of::<role::Server>();
389        let any_id = TypeId::of::<role::Any>();
390
391        // Check version compatibility between connection and packet
392        let packet_version = packet.protocol_version();
393
394        // Return error if versions don't match
395        if self.protocol_version != packet_version {
396            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
397        }
398
399        match packet {
400            // CONNECT - Client/Any can send
401            GenericPacket::V3_1_1Connect(p) => {
402                if role_id == client_id || role_id == any_id {
403                    self.process_send_v3_1_1_connect(p)
404                } else {
405                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
406                }
407            }
408            GenericPacket::V5_0Connect(p) => {
409                if role_id == client_id || role_id == any_id {
410                    self.process_send_v5_0_connect(p)
411                } else {
412                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
413                }
414            }
415            // CONNACK - Server/Any can send
416            GenericPacket::V3_1_1Connack(p) => {
417                if role_id == server_id || role_id == any_id {
418                    self.process_send_v3_1_1_connack(p)
419                } else {
420                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
421                }
422            }
423            GenericPacket::V5_0Connack(p) => {
424                if role_id == server_id || role_id == any_id {
425                    self.process_send_v5_0_connack(p)
426                } else {
427                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
428                }
429            }
430            // PUBLISH - Any role can send
431            GenericPacket::V3_1_1Publish(p) => self.process_send_v3_1_1_publish(p),
432            GenericPacket::V5_0Publish(p) => self.process_send_v5_0_publish(p),
433            // PUBACK/PUBREC/PUBREL/PUBCOMP - Any role can send
434            GenericPacket::V3_1_1Puback(p) => self.process_send_v3_1_1_puback(p),
435            GenericPacket::V5_0Puback(p) => self.process_send_v5_0_puback(p),
436            GenericPacket::V3_1_1Pubrec(p) => self.process_send_v3_1_1_pubrec(p),
437            GenericPacket::V5_0Pubrec(p) => self.process_send_v5_0_pubrec(p),
438            GenericPacket::V3_1_1Pubrel(p) => self.process_send_v3_1_1_pubrel(p),
439            GenericPacket::V5_0Pubrel(p) => self.process_send_v5_0_pubrel(p),
440            GenericPacket::V3_1_1Pubcomp(p) => self.process_send_v3_1_1_pubcomp(p),
441            GenericPacket::V5_0Pubcomp(p) => self.process_send_v5_0_pubcomp(p),
442            // SUBSCRIBE/UNSUBSCRIBE - Client/Any can send
443            GenericPacket::V3_1_1Subscribe(p) => {
444                if role_id == client_id || role_id == any_id {
445                    self.process_send_v3_1_1_subscribe(p)
446                } else {
447                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
448                }
449            }
450            GenericPacket::V5_0Subscribe(p) => {
451                if role_id == client_id || role_id == any_id {
452                    self.process_send_v5_0_subscribe(p)
453                } else {
454                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
455                }
456            }
457            GenericPacket::V3_1_1Unsubscribe(p) => {
458                if role_id == client_id || role_id == any_id {
459                    self.process_send_v3_1_1_unsubscribe(p)
460                } else {
461                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
462                }
463            }
464            GenericPacket::V5_0Unsubscribe(p) => {
465                if role_id == client_id || role_id == any_id {
466                    self.process_send_v5_0_unsubscribe(p)
467                } else {
468                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
469                }
470            }
471            // SUBACK/UNSUBACK - Server/Any can send
472            GenericPacket::V3_1_1Suback(p) => {
473                if role_id == server_id || role_id == any_id {
474                    self.process_send_v3_1_1_suback(p)
475                } else {
476                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
477                }
478            }
479            GenericPacket::V5_0Suback(p) => {
480                if role_id == server_id || role_id == any_id {
481                    self.process_send_v5_0_suback(p)
482                } else {
483                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
484                }
485            }
486            GenericPacket::V3_1_1Unsuback(p) => {
487                if role_id == server_id || role_id == any_id {
488                    self.process_send_v3_1_1_unsuback(p)
489                } else {
490                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
491                }
492            }
493            GenericPacket::V5_0Unsuback(p) => {
494                if role_id == server_id || role_id == any_id {
495                    self.process_send_v5_0_unsuback(p)
496                } else {
497                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
498                }
499            }
500            // PINGREQ - Client/Any can send
501            GenericPacket::V3_1_1Pingreq(p) => {
502                if role_id == client_id || role_id == any_id {
503                    self.process_send_v3_1_1_pingreq(p)
504                } else {
505                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
506                }
507            }
508            GenericPacket::V5_0Pingreq(p) => {
509                if role_id == client_id || role_id == any_id {
510                    self.process_send_v5_0_pingreq(p)
511                } else {
512                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
513                }
514            }
515            // PINGRESP - Server/Any can send
516            GenericPacket::V3_1_1Pingresp(p) => {
517                if role_id == server_id || role_id == any_id {
518                    self.process_send_v3_1_1_pingresp(p)
519                } else {
520                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
521                }
522            }
523            GenericPacket::V5_0Pingresp(p) => {
524                if role_id == server_id || role_id == any_id {
525                    self.process_send_v5_0_pingresp(p)
526                } else {
527                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
528                }
529            }
530            // DISCONNECT(v3.1.1) - Client/Any role can send
531            GenericPacket::V3_1_1Disconnect(p) => {
532                if role_id == client_id || role_id == any_id {
533                    self.process_send_v3_1_1_disconnect(p)
534                } else {
535                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
536                }
537            }
538            // DISCONNECT(v5.0) - Any role can send
539            GenericPacket::V5_0Disconnect(p) => self.process_send_v5_0_disconnect(p),
540            // AUTH - Any role can send (v5.0 only)
541            GenericPacket::V5_0Auth(p) => self.process_send_v5_0_auth(p),
542        }
543    }
544
545    /// Receive and process incoming MQTT data
546    ///
547    /// This method processes raw bytes received from the network and attempts to
548    /// parse them into MQTT packets. It handles packet fragmentation and can
549    /// process multiple complete packets from a single data buffer.
550    ///
551    /// # Parameters
552    ///
553    /// * `data` - A cursor over the received data bytes. The cursor position will
554    ///   be advanced as data is consumed.
555    ///
556    /// # Returns
557    ///
558    /// A vector of events generated from processing the received data:
559    /// - `NotifyPacketReceived` for successfully parsed packets
560    /// - `NotifyError` for parsing errors or protocol violations
561    /// - Additional events based on packet processing (timers, responses, etc.)
562    ///
563    /// # Behavior
564    ///
565    /// - Handles partial packets (data will be buffered until complete)
566    /// - Processes multiple complete packets in sequence
567    /// - Validates packet structure and protocol compliance
568    /// - Updates internal connection state based on received packets
569    /// - Generates appropriate response events (ACKs, etc.)
570    ///
571    /// # Examples
572    ///
573    /// ```ignore
574    /// use mqtt_protocol_core::mqtt;
575    ///
576    /// let mut connection = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
577    /// let received_data = [/* network data */];
578    /// let mut cursor = std::io::Cursor::new(&received_data[..]);
579    ///
580    /// let events = connection.recv(&mut cursor);
581    /// for event in events {
582    ///     match event {
583    ///         mqtt::connection::Event::NotifyPacketReceived(packet) => {
584    ///             // Process received packet
585    ///         },
586    ///         mqtt::connection::Event::NotifyError(error) => {
587    ///             // Handle parsing/protocol errors
588    ///         },
589    ///         // Handle other events...
590    ///     }
591    /// }
592    /// ```
593    pub fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
594        let mut events = Vec::new();
595
596        match self.packet_builder.feed(data) {
597            PacketBuildResult::Complete(raw_packet) => {
598                events.extend(self.process_recv_packet(raw_packet));
599            }
600            PacketBuildResult::Incomplete => {}
601            PacketBuildResult::Error(e) => {
602                self.cancel_timers(&mut events);
603                events.push(GenericEvent::RequestClose);
604                events.push(GenericEvent::NotifyError(e));
605            }
606        }
607
608        events
609    }
610
611    /// Notify that a timer has fired (Event-based API)
612    ///
613    /// This method should be called when the I/O layer detects that a timer has expired.
614    /// It handles the timer event appropriately and returns events that need to be processed.
615    /// Notify that a timer has fired
616    ///
617    /// This method should be called by the application when a previously requested
618    /// timer expires. The connection will take appropriate action based on the timer type.
619    ///
620    /// # Parameters
621    ///
622    /// * `kind` - The type of timer that fired
623    ///
624    /// # Returns
625    ///
626    /// Events generated from timer processing (e.g., sending PINGREQ, connection timeouts)
627    pub fn notify_timer_fired(&mut self, kind: TimerKind) -> Vec<GenericEvent<PacketIdType>> {
628        let mut events = Vec::new();
629
630        match kind {
631            TimerKind::PingreqSend => {
632                // Reset timer flag
633                self.pingreq_send_set = false;
634
635                // Send PINGREQ if connected
636                if self.status == ConnectionStatus::Connected {
637                    match self.protocol_version {
638                        Version::V3_1_1 => {
639                            if let Ok(pingreq) = v3_1_1::Pingreq::builder().build() {
640                                events.extend(self.process_send_v3_1_1_pingreq(pingreq));
641                            }
642                        }
643                        Version::V5_0 => {
644                            if let Ok(pingreq) = v5_0::Pingreq::builder().build() {
645                                events.extend(self.process_send_v5_0_pingreq(pingreq));
646                            }
647                        }
648                        Version::Undetermined => {
649                            unreachable!("Protocol version should be set before sending PINGREQ");
650                        }
651                    }
652                }
653            }
654            TimerKind::PingreqRecv => {
655                // Reset timer flag
656                self.pingreq_recv_set = false;
657
658                match self.protocol_version {
659                    Version::V3_1_1 => {
660                        // V3.1.1: Close connection
661                        events.push(GenericEvent::RequestClose);
662                    }
663                    Version::V5_0 => {
664                        // V5.0: Send DISCONNECT with keep_alive_timeout if connected
665                        if self.status == ConnectionStatus::Connected {
666                            if let Ok(disconnect) = v5_0::Disconnect::builder()
667                                .reason_code(DisconnectReasonCode::KeepAliveTimeout)
668                                .build()
669                            {
670                                events.extend(self.process_send_v5_0_disconnect(disconnect));
671                            }
672                        }
673                    }
674                    Version::Undetermined => {
675                        unreachable!("Protocol version should be set before receiving PINGREQ");
676                    }
677                }
678            }
679            TimerKind::PingrespRecv => {
680                // Reset timer flag
681                self.pingresp_recv_set = false;
682
683                match self.protocol_version {
684                    Version::V3_1_1 => {
685                        // V3.1.1: Close connection
686                        events.push(GenericEvent::RequestClose);
687                    }
688                    Version::V5_0 => {
689                        // V5.0: Send DISCONNECT with keep_alive_timeout if connected
690                        if self.status == ConnectionStatus::Connected {
691                            if let Ok(disconnect) = v5_0::Disconnect::builder()
692                                .reason_code(DisconnectReasonCode::KeepAliveTimeout)
693                                .build()
694                            {
695                                events.extend(self.process_send_v5_0_disconnect(disconnect));
696                            }
697                        }
698                    }
699                    Version::Undetermined => {
700                        unreachable!("Protocol version should be set before receiving PINGRESP");
701                    }
702                }
703            }
704        }
705
706        events
707    }
708
709    /// Notify that the connection has been closed by the I/O layer (Event-based API)
710    ///
711    /// This method should be called when the I/O layer detects that the socket has been closed.
712    /// It updates the internal state appropriately and returns events that need to be processed.
713    /// Notify that the underlying connection has been closed
714    ///
715    /// This method should be called when the network connection is closed,
716    /// either intentionally or due to network issues.
717    ///
718    /// # Returns
719    ///
720    /// Events generated from connection closure processing
721    pub fn notify_closed(&mut self) -> Vec<GenericEvent<PacketIdType>> {
722        let mut events = Vec::new();
723
724        // Reset packet size limits to MQTT protocol maximum
725        self.maximum_packet_size_send = MQTT_PACKET_SIZE_NO_LIMIT;
726        self.maximum_packet_size_recv = MQTT_PACKET_SIZE_NO_LIMIT;
727
728        // Set status to disconnected
729        self.status = ConnectionStatus::Disconnected;
730
731        // Clear topic alias management
732        self.topic_alias_send = None;
733        self.topic_alias_recv = None;
734
735        // Release packet IDs for SUBACK
736        for packet_id in self.pid_suback.drain() {
737            if self.pid_man.is_used_id(packet_id) {
738                self.pid_man.release_id(packet_id);
739                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
740            }
741        }
742
743        // Release packet IDs for UNSUBACK
744        for packet_id in self.pid_unsuback.drain() {
745            if self.pid_man.is_used_id(packet_id) {
746                self.pid_man.release_id(packet_id);
747                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
748            }
749        }
750
751        // If not storing session state, clear QoS2 states and release publish-related packet IDs
752        if !self.need_store {
753            self.qos2_publish_processing.clear();
754            self.qos2_publish_handled.clear();
755
756            // Release packet IDs for PUBACK
757            for packet_id in self.pid_puback.drain() {
758                if self.pid_man.is_used_id(packet_id) {
759                    self.pid_man.release_id(packet_id);
760                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
761                }
762            }
763
764            // Release packet IDs for PUBREC
765            for packet_id in self.pid_pubrec.drain() {
766                if self.pid_man.is_used_id(packet_id) {
767                    self.pid_man.release_id(packet_id);
768                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
769                }
770            }
771
772            // Release packet IDs for PUBCOMP
773            for packet_id in self.pid_pubcomp.drain() {
774                if self.pid_man.is_used_id(packet_id) {
775                    self.pid_man.release_id(packet_id);
776                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
777                }
778            }
779        }
780
781        // Cancel all timers
782        self.cancel_timers(&mut events);
783
784        events
785    }
786
787    /// Set the PINGREQ send interval
788    ///
789    /// Sets the interval for sending PINGREQ packets to maintain the connection alive.
790    /// When changed, this may generate timer-related events to update the ping schedule.
791    ///
792    /// # Parameters
793    ///
794    /// * `duration_ms` - The interval in milliseconds between PINGREQ packets
795    ///
796    /// # Returns
797    ///
798    /// Events generated from updating the ping interval
799    pub fn set_pingreq_send_interval(
800        &mut self,
801        duration_ms: u64,
802    ) -> Vec<GenericEvent<PacketIdType>> {
803        let mut events = Vec::new();
804
805        if duration_ms == 0 {
806            self.pingreq_send_interval_ms = None;
807            self.pingreq_send_set = false;
808            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
809        } else {
810            self.pingreq_send_interval_ms = Some(duration_ms);
811            self.pingreq_send_set = true;
812            events.push(GenericEvent::RequestTimerReset {
813                kind: TimerKind::PingreqSend,
814                duration_ms,
815            });
816        }
817
818        events
819    }
820
821    /// Get the remaining capacity for sending PUBLISH packets
822    ///
823    /// Returns the number of additional PUBLISH packets that can be sent
824    /// without exceeding the receive maximum limit.
825    ///
826    /// # Returns
827    ///
828    /// The remaining capacity for outgoing PUBLISH packets, or `None` if no limit is set
829    pub fn get_receive_maximum_vacancy_for_send(&self) -> Option<u16> {
830        // If publish_recv_max is set, return the remaining capacity
831        self.publish_send_max
832            .map(|max| max.saturating_sub(self.publish_send_count))
833    }
834
835    /// Enable or disable offline publishing
836    ///
837    /// When enabled, PUBLISH packets can be sent even when disconnected.
838    /// They will be queued and sent once the connection is established.
839    ///
840    /// # Parameters
841    ///
842    /// * `enable` - Whether to enable offline publishing
843    pub fn set_offline_publish(&mut self, enable: bool) {
844        self.offline_publish = enable;
845        if self.offline_publish {
846            self.need_store = true;
847        }
848    }
849
850    /// Enable or disable automatic PUBLISH response generation
851    ///
852    /// When enabled, appropriate response packets (PUBACK, PUBREC, PUBREL, and PUBCOMP.)
853    /// are automatically generated for received PUBLISH packets.
854    ///
855    /// # Parameters
856    ///
857    /// * `enable` - Whether to enable automatic responses
858    pub fn set_auto_pub_response(&mut self, enable: bool) {
859        self.auto_pub_response = enable;
860    }
861
862    /// Enable or disable automatic PING response generation
863    ///
864    /// When enabled, PINGRESP packets are automatically sent in response to PINGREQ.
865    ///
866    /// # Parameters
867    ///
868    /// * `enable` - Whether to enable automatic PING responses
869    pub fn set_auto_ping_response(&mut self, enable: bool) {
870        self.auto_ping_response = enable;
871    }
872
873    /// Enable or disable automatic topic alias mapping for outgoing packets
874    ///
875    /// When enabled, the connection will automatically map topics to aliases
876    /// for outgoing PUBLISH packets to reduce bandwidth usage. This includes:
877    /// - Applying existing registered topic aliases when available
878    /// - Allocating new topic aliases for unregistered topics
879    /// - Using LRU algorithm to overwrite the least recently used alias when all aliases are in use
880    ///
881    /// # Parameters
882    ///
883    /// * `enable` - Whether to enable automatic topic alias mapping
884    pub fn set_auto_map_topic_alias_send(&mut self, enable: bool) {
885        self.auto_map_topic_alias_send = enable;
886    }
887
888    /// Enable or disable automatic topic alias replacement for outgoing packets
889    ///
890    /// When enabled, the connection will automatically apply existing registered
891    /// topic aliases to outgoing PUBLISH packets when aliases are available.
892    /// This only uses previously registered aliases and does not allocate new ones.
893    ///
894    /// # Parameters
895    ///
896    /// * `enable` - Whether to enable automatic topic alias replacement
897    pub fn set_auto_replace_topic_alias_send(&mut self, enable: bool) {
898        self.auto_replace_topic_alias_send = enable;
899    }
900
901    /// Set PINGREQ receive timeout
902    pub fn set_pingresp_recv_timeout(&mut self, timeout_ms: Option<u64>) {
903        self.pingresp_recv_timeout_ms = timeout_ms;
904    }
905
906    /// Acquire a new packet ID for outgoing packets
907    ///
908    /// # Returns
909    ///
910    /// A unique packet ID, or an error if none are available
911    pub fn acquire_packet_id(&mut self) -> Result<PacketIdType, MqttError> {
912        self.pid_man.acquire_unique_id()
913    }
914
915    /// Register a packet ID as in use
916    ///
917    /// Manually registers a specific packet ID as being in use, preventing
918    /// it from being allocated by `acquire_packet_id()`.
919    ///
920    /// # Parameters
921    ///
922    /// * `packet_id` - The packet ID to register as in use
923    ///
924    /// # Returns
925    ///
926    /// `Ok(())` if successful, or an error if the packet ID is already in use
927    pub fn register_packet_id(&mut self, packet_id: PacketIdType) -> Result<(), MqttError> {
928        self.pid_man.register_id(packet_id)
929    }
930
931    /// Release packet ID (Event-based API)
932    /// Release a packet ID for reuse
933    ///
934    /// This method releases a packet ID, making it available for future use.
935    /// It also generates a `NotifyPacketIdReleased` event.
936    ///
937    /// # Parameters
938    ///
939    /// * `packet_id` - The packet ID to release
940    ///
941    /// # Returns
942    ///
943    /// Events generated from releasing the packet ID
944    pub fn release_packet_id(
945        &mut self,
946        packet_id: PacketIdType,
947    ) -> Vec<GenericEvent<PacketIdType>> {
948        let mut events = Vec::new();
949
950        if self.pid_man.is_used_id(packet_id) {
951            self.pid_man.release_id(packet_id);
952            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
953        }
954
955        events
956    }
957
958    /// Get the set of QoS 2 PUBLISH packet IDs that have been handled
959    ///
960    /// Returns a copy of the set containing packet IDs of QoS 2 PUBLISH packets
961    /// that have been successfully processed and handled.
962    ///
963    /// # Returns
964    ///
965    /// A `HashSet` containing packet IDs of handled QoS 2 PUBLISH packets
966    pub fn get_qos2_publish_handled(&self) -> HashSet<PacketIdType> {
967        self.qos2_publish_handled.clone()
968    }
969
970    /// Restore the set of QoS 2 PUBLISH packet IDs that have been handled
971    ///
972    /// Restores the internal state of handled QoS 2 PUBLISH packet IDs,
973    /// typically used when resuming a connection from persistent storage.
974    ///
975    /// # Parameters
976    ///
977    /// * `pids` - A `HashSet` containing packet IDs of previously handled QoS 2 PUBLISH packets
978    pub fn restore_qos2_publish_handled(&mut self, pids: HashSet<PacketIdType>) {
979        self.qos2_publish_handled = pids;
980    }
981
982    /// Restore previously stored packets
983    ///
984    /// This method restores packets that were previously stored for persistence,
985    /// typically called when resuming a session.
986    ///
987    /// # Parameters
988    ///
989    /// * `packets` - Vector of packets to restore
990    pub fn restore_packets(&mut self, packets: Vec<GenericStorePacket<PacketIdType>>) {
991        for packet in packets {
992            match &packet {
993                GenericStorePacket::V3_1_1Publish(p) => {
994                    // Add to appropriate QoS tracking set
995                    match p.qos() {
996                        Qos::AtLeastOnce => {
997                            self.pid_puback.insert(p.packet_id().unwrap());
998                        }
999                        Qos::ExactlyOnce => {
1000                            self.pid_pubrec.insert(p.packet_id().unwrap());
1001                        }
1002                        _ => {
1003                            // QoS 0 shouldn't be in store, but handle gracefully
1004                            warn!("QoS 0 packet found in store, skipping");
1005                            continue;
1006                        }
1007                    }
1008                    // Register packet ID and add to store
1009                    let packet_id = p.packet_id().unwrap();
1010                    if self.pid_man.register_id(packet_id).is_ok() {
1011                        if let Err(_e) = self.store.add(packet) {
1012                            error!("Failed to add packet to store: {:?}", _e);
1013                        }
1014                    } else {
1015                        error!("Packet ID {} has already been used. Skip it", packet_id);
1016                    }
1017                }
1018                GenericStorePacket::V5_0Publish(p) => {
1019                    // Add to appropriate QoS tracking set
1020                    match p.qos() {
1021                        Qos::AtLeastOnce => {
1022                            self.pid_puback.insert(p.packet_id().unwrap());
1023                        }
1024                        Qos::ExactlyOnce => {
1025                            self.pid_pubrec.insert(p.packet_id().unwrap());
1026                        }
1027                        _ => {
1028                            // QoS 0 shouldn't be in store, but handle gracefully
1029                            warn!("QoS 0 packet found in store, skipping");
1030                            continue;
1031                        }
1032                    }
1033                    // Register packet ID and add to store
1034                    let packet_id = p.packet_id().unwrap();
1035                    if self.pid_man.register_id(packet_id).is_ok() {
1036                        if let Err(_e) = self.store.add(packet) {
1037                            error!("Failed to add packet to store: {:?}", _e);
1038                        }
1039                    } else {
1040                        error!("Packet ID {} has already been used. Skip it", packet_id);
1041                    }
1042                }
1043                GenericStorePacket::V3_1_1Pubrel(p) => {
1044                    // Pubrel packets expect PUBCOMP response
1045                    self.pid_pubcomp.insert(p.packet_id());
1046                    // Register packet ID and add to store
1047                    let packet_id = p.packet_id();
1048                    if self.pid_man.register_id(packet_id).is_ok() {
1049                        if let Err(_e) = self.store.add(packet) {
1050                            error!("Failed to add packet to store: {:?}", _e);
1051                        }
1052                    } else {
1053                        error!("Packet ID {} has already been used. Skip it", packet_id);
1054                    }
1055                }
1056                GenericStorePacket::V5_0Pubrel(p) => {
1057                    // Pubrel packets expect PUBCOMP response
1058                    self.pid_pubcomp.insert(p.packet_id());
1059                    // Register packet ID and add to store
1060                    let packet_id = p.packet_id();
1061                    if self.pid_man.register_id(packet_id).is_ok() {
1062                        if let Err(_e) = self.store.add(packet) {
1063                            error!("Failed to add packet to store: {:?}", _e);
1064                        }
1065                    } else {
1066                        error!("Packet ID {} has already been used. Skip it", packet_id);
1067                    }
1068                }
1069            }
1070        }
1071    }
1072
1073    /// Get stored packets for persistence
1074    ///
1075    /// Returns packets that need to be stored for potential retransmission.
1076    /// This is useful for implementing persistent sessions.
1077    ///
1078    /// # Returns
1079    ///
1080    /// Vector of packets that should be persisted
1081    pub fn get_stored_packets(&self) -> Vec<GenericStorePacket<PacketIdType>> {
1082        self.store.get_stored()
1083    }
1084
1085    /// Get the MQTT protocol version being used
1086    ///
1087    /// # Returns
1088    ///
1089    /// The protocol version (V3_1_1 or V5_0)
1090    pub fn get_protocol_version(&self) -> Version {
1091        self.protocol_version
1092    }
1093
1094    /// Check if a PUBLISH packet is currently being processed
1095    ///
1096    /// # Parameters
1097    ///
1098    /// * `packet_id` - The packet ID to check
1099    ///
1100    /// # Returns
1101    ///
1102    /// True if the packet ID is in use for PUBLISH processing
1103    pub fn is_publish_processing(&self, packet_id: PacketIdType) -> bool {
1104        self.qos2_publish_processing.contains(&packet_id)
1105    }
1106
1107    /// Regulate packet for store (remove/resolve topic alias)
1108    ///
1109    /// This method prepares a V5.0 publish packet for storage by resolving topic aliases
1110    /// and removing TopicAlias properties to ensure the packet can be retransmitted correctly.
1111    pub fn regulate_for_store(
1112        &self,
1113        mut packet: v5_0::GenericPublish<PacketIdType>,
1114    ) -> Result<v5_0::GenericPublish<PacketIdType>, MqttError> {
1115        if packet.topic_name().is_empty() {
1116            // Topic is empty, need to resolve from topic alias
1117            if let Some(topic_alias) = Self::get_topic_alias_from_props(packet.props()) {
1118                if let Some(ref topic_alias_send) = self.topic_alias_send {
1119                    if let Some(topic) = topic_alias_send.peek(topic_alias) {
1120                        // Found topic for alias, add topic and remove alias property
1121                        packet = packet.remove_topic_alias_add_topic(topic.to_string())?;
1122                    } else {
1123                        return Err(MqttError::PacketNotRegulated);
1124                    }
1125                } else {
1126                    return Err(MqttError::PacketNotRegulated);
1127                }
1128            } else {
1129                return Err(MqttError::PacketNotRegulated);
1130            }
1131        } else {
1132            // Topic is not empty, just remove TopicAlias property if present
1133            packet = packet.remove_topic_alias();
1134        }
1135
1136        Ok(packet)
1137    }
1138
1139    // private
1140
1141    /// Initialize connection state based on client/server role
1142    ///
1143    /// Resets all connection-specific state including:
1144    /// - Publish flow control counters and limits
1145    /// - Topic alias management
1146    /// - QoS2 processing state
1147    /// - Packet ID tracking sets
1148    /// - Store requirement flag
1149    ///
1150    /// # Parameters
1151    /// * `is_client` - true for client mode, false for server mode
1152    fn initialize(&mut self, is_client: bool) {
1153        self.publish_send_max = None;
1154        self.publish_recv_max = None;
1155        self.publish_send_count = 0;
1156        self.topic_alias_send = None;
1157        self.topic_alias_recv = None;
1158        self.publish_recv.clear();
1159        self.qos2_publish_processing.clear();
1160        self.need_store = false;
1161        self.pid_suback.clear();
1162        self.pid_unsuback.clear();
1163        self.is_client = is_client;
1164    }
1165
1166    fn clear_store_related(&mut self) {
1167        self.pid_man.clear();
1168        self.pid_puback.clear();
1169        self.pid_pubrec.clear();
1170        self.pid_pubcomp.clear();
1171        self.store.clear();
1172    }
1173
1174    /// Send all stored packets for retransmission
1175    fn send_stored(&mut self) -> Vec<GenericEvent<PacketIdType>> {
1176        let mut events = Vec::new();
1177        self.store.for_each(|packet| {
1178            if packet.size() > self.maximum_packet_size_send as usize {
1179                let packet_id = packet.packet_id();
1180                self.pid_man.release_id(packet_id);
1181                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1182                return false; // Remove from store
1183            }
1184            events.push(GenericEvent::RequestSendPacket {
1185                packet: packet.clone().into(),
1186                release_packet_id_if_send_error: None,
1187            });
1188            true // Keep in store
1189        });
1190
1191        events
1192    }
1193
1194    /// Validate topic alias and return the associated topic name
1195    ///
1196    /// Checks if the topic alias is valid and retrieves the corresponding topic name
1197    /// from the topic alias send manager.
1198    ///
1199    /// # Parameters
1200    /// * `topic_alias_opt` - Optional topic alias value
1201    ///
1202    /// # Returns
1203    /// * `Some(topic_name)` if the topic alias is valid and found
1204    /// * `None` if the topic alias is invalid, not provided, or not found
1205    fn validate_topic_alias(&mut self, topic_alias_opt: Option<u16>) -> Option<String> {
1206        let topic_alias = topic_alias_opt?;
1207
1208        if !self.validate_topic_alias_range(topic_alias) {
1209            return None;
1210        }
1211
1212        let topic_alias_send = self.topic_alias_send.as_mut()?;
1213        // LRU updated here
1214        let topic = topic_alias_send.get(topic_alias)?;
1215
1216        Some(topic.to_string())
1217    }
1218
1219    /// Validate that topic alias is within the allowed range
1220    ///
1221    /// Checks if the topic alias value is valid according to the configured
1222    /// topic alias maximum for sending.
1223    ///
1224    /// # Parameters
1225    /// * `topic_alias` - Topic alias value to validate
1226    ///
1227    /// # Returns
1228    /// * `true` if the topic alias is within valid range
1229    /// * `false` if invalid or topic alias sending is not configured
1230    fn validate_topic_alias_range(&self, topic_alias: u16) -> bool {
1231        let topic_alias_send = match &self.topic_alias_send {
1232            Some(tas) => tas,
1233            None => {
1234                error!("topic_alias is set but topic_alias_maximum is 0");
1235                return false;
1236            }
1237        };
1238
1239        if topic_alias == 0 || topic_alias > topic_alias_send.max() {
1240            error!("topic_alias is set but out of range");
1241            return false;
1242        }
1243
1244        true
1245    }
1246
1247    /// Process v3.1.1 CONNECT packet - C++ constexpr if implementation
1248    pub(crate) fn process_send_v3_1_1_connect(
1249        &mut self,
1250        packet: v3_1_1::Connect,
1251    ) -> Vec<GenericEvent<PacketIdType>> {
1252        info!("send connect v3.1.1: {packet}");
1253
1254        if self.status != ConnectionStatus::Disconnected {
1255            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1256        }
1257        if !self.validate_maximum_packet_size_send(packet.size()) {
1258            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1259        }
1260
1261        let mut events = Vec::new();
1262        self.initialize(true);
1263        self.status = ConnectionStatus::Connecting;
1264
1265        // Extract keep_alive and set pingreq_send_interval_ms if != 0
1266        let keep_alive = packet.keep_alive();
1267        if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1268            self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1269        }
1270
1271        // Handle clean_session flag
1272        if packet.clean_start() {
1273            self.clear_store_related();
1274        } else {
1275            self.need_store = true;
1276        }
1277
1278        // Clear topic alias for sending
1279        self.topic_alias_send = None;
1280
1281        events.push(GenericEvent::RequestSendPacket {
1282            packet: packet.into(),
1283            release_packet_id_if_send_error: None,
1284        });
1285        self.send_post_process(&mut events);
1286
1287        events
1288    }
1289
1290    /// Process v5.0 CONNECT packet - C++ constexpr if implementation
1291    pub(crate) fn process_send_v5_0_connect(
1292        &mut self,
1293        packet: v5_0::Connect,
1294    ) -> Vec<GenericEvent<PacketIdType>> {
1295        info!("send connect v5.0: {packet}");
1296        if !self.validate_maximum_packet_size_send(packet.size()) {
1297            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1298        }
1299        if self.status != ConnectionStatus::Disconnected {
1300            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1301        }
1302
1303        let mut events = Vec::new();
1304        self.initialize(true);
1305        self.status = ConnectionStatus::Connecting;
1306
1307        // Extract keep_alive and set pingreq_send_interval_ms if != 0
1308        let keep_alive = packet.keep_alive();
1309        if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1310            self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1311        }
1312
1313        // Handle clean_start flag
1314        if packet.clean_start() {
1315            self.clear_store_related();
1316        }
1317
1318        // Process properties
1319        for prop in packet.props() {
1320            match prop {
1321                Property::TopicAliasMaximum(val) => {
1322                    if val.val() != 0 {
1323                        self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1324                    }
1325                }
1326                Property::ReceiveMaximum(val) => {
1327                    debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1328                    self.publish_recv_max = Some(val.val());
1329                }
1330                Property::MaximumPacketSize(val) => {
1331                    debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1332                    self.maximum_packet_size_recv = val.val();
1333                }
1334                Property::SessionExpiryInterval(val) => {
1335                    if val.val() != 0 {
1336                        self.need_store = true;
1337                    }
1338                }
1339                _ => {
1340                    // Ignore other properties (equivalent to [](auto const&){} in C++)
1341                }
1342            }
1343        }
1344
1345        events.push(GenericEvent::RequestSendPacket {
1346            packet: packet.into(),
1347            release_packet_id_if_send_error: None,
1348        });
1349        self.send_post_process(&mut events);
1350
1351        events
1352    }
1353
1354    pub(crate) fn process_send_v3_1_1_connack(
1355        &mut self,
1356        packet: v3_1_1::Connack,
1357    ) -> Vec<GenericEvent<PacketIdType>> {
1358        info!("send connack v3.1.1: {packet}");
1359        if self.status != ConnectionStatus::Connecting {
1360            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1361        }
1362        let mut events = Vec::new();
1363        if packet.return_code() == ConnectReturnCode::Accepted {
1364            self.status = ConnectionStatus::Connected;
1365        } else {
1366            self.status = ConnectionStatus::Disconnected;
1367        }
1368
1369        events.push(GenericEvent::RequestSendPacket {
1370            packet: packet.into(),
1371            release_packet_id_if_send_error: None,
1372        });
1373        events.extend(self.send_stored());
1374        self.send_post_process(&mut events);
1375
1376        events
1377    }
1378
1379    pub(crate) fn process_send_v5_0_connack(
1380        &mut self,
1381        packet: v5_0::Connack,
1382    ) -> Vec<GenericEvent<PacketIdType>> {
1383        info!("send connack v5.0: {packet}");
1384        if !self.validate_maximum_packet_size_send(packet.size()) {
1385            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1386        }
1387        if self.status != ConnectionStatus::Connecting {
1388            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1389        }
1390
1391        let mut events = Vec::new();
1392
1393        if packet.reason_code() == ConnectReasonCode::Success {
1394            self.status = ConnectionStatus::Connected;
1395
1396            // Process properties
1397            for prop in packet.props() {
1398                match prop {
1399                    Property::TopicAliasMaximum(val) => {
1400                        if val.val() != 0 {
1401                            self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1402                        }
1403                    }
1404                    Property::ReceiveMaximum(val) => {
1405                        debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1406                        self.publish_recv_max = Some(val.val());
1407                    }
1408                    Property::MaximumPacketSize(val) => {
1409                        debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1410                        self.maximum_packet_size_recv = val.val();
1411                    }
1412                    _ => {
1413                        // Ignore other properties
1414                    }
1415                }
1416            }
1417        } else {
1418            self.status = ConnectionStatus::Disconnected;
1419            self.cancel_timers(&mut events);
1420        }
1421
1422        events.push(GenericEvent::RequestSendPacket {
1423            packet: packet.into(),
1424            release_packet_id_if_send_error: None,
1425        });
1426        events.extend(self.send_stored());
1427        self.send_post_process(&mut events);
1428
1429        events
1430    }
1431
1432    pub(crate) fn process_send_v3_1_1_publish(
1433        &mut self,
1434        packet: v3_1_1::GenericPublish<PacketIdType>,
1435    ) -> Vec<GenericEvent<PacketIdType>> {
1436        let mut events = Vec::new();
1437        let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1438
1439        if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1440            // Register packet ID for QoS 1 or 2
1441            let packet_id = packet.packet_id().unwrap();
1442            if self.status != ConnectionStatus::Connected
1443                && !self.need_store
1444                && !self.offline_publish
1445            {
1446                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1447                if self.pid_man.is_used_id(packet_id) {
1448                    self.pid_man.release_id(packet_id);
1449                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1450                }
1451                return events;
1452            }
1453            if !self.pid_man.is_used_id(packet_id) {
1454                error!("packet_id {packet_id} must be acquired or registered");
1455                events.push(GenericEvent::NotifyError(
1456                    MqttError::PacketIdentifierInvalid,
1457                ));
1458                return events;
1459            }
1460            if self.need_store
1461                && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1462            {
1463                let store_packet = packet.clone().set_dup(true);
1464                self.store.add(store_packet.try_into().unwrap()).unwrap();
1465            } else {
1466                release_packet_id_if_send_error = Some(packet_id);
1467            }
1468            if packet.qos() == Qos::ExactlyOnce {
1469                self.qos2_publish_processing.insert(packet_id);
1470                self.pid_pubrec.insert(packet_id);
1471            } else {
1472                self.pid_puback.insert(packet_id);
1473            }
1474        } else if self.status != ConnectionStatus::Connected {
1475            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1476            return events;
1477        }
1478
1479        if self.status == ConnectionStatus::Connected {
1480            events.push(GenericEvent::RequestSendPacket {
1481                packet: packet.into(),
1482                release_packet_id_if_send_error,
1483            });
1484        }
1485        self.send_post_process(&mut events);
1486
1487        events
1488    }
1489
1490    pub(crate) fn process_send_v5_0_publish(
1491        &mut self,
1492        mut packet: v5_0::GenericPublish<PacketIdType>,
1493    ) -> Vec<GenericEvent<PacketIdType>> {
1494        if !self.validate_maximum_packet_size_send(packet.size()) {
1495            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1496        }
1497
1498        let mut events = Vec::new();
1499        let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1500        let mut topic_alias_validated = false;
1501        if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1502            let packet_id = packet.packet_id().unwrap();
1503            if self.status != ConnectionStatus::Connected
1504                && !self.need_store
1505                && !self.offline_publish
1506            {
1507                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1508                if self.pid_man.is_used_id(packet_id) {
1509                    self.pid_man.release_id(packet_id);
1510                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1511                }
1512                return events;
1513            }
1514
1515            // Extract topic_name from TopicAlias and remove TopicAlias property, then store it
1516            if !self.pid_man.is_used_id(packet_id) {
1517                error!("packet_id {packet_id} must be acquired or registered");
1518                events.push(GenericEvent::NotifyError(
1519                    MqttError::PacketIdentifierInvalid,
1520                ));
1521                return events;
1522            }
1523
1524            if self.need_store
1525                && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1526            {
1527                let ta_opt = Self::get_topic_alias_from_props(packet.props());
1528                if packet.topic_name().is_empty() {
1529                    // Topic name is empty, must validate topic alias
1530                    let topic_opt = self.validate_topic_alias(ta_opt);
1531                    if topic_opt.is_none() {
1532                        events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1533                        if self.pid_man.is_used_id(packet_id) {
1534                            self.pid_man.release_id(packet_id);
1535                            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1536                        }
1537                        return events;
1538                    }
1539                    topic_alias_validated = true;
1540                    let store_packet = packet
1541                        .clone()
1542                        .remove_topic_alias_add_topic(topic_opt.unwrap())
1543                        .unwrap()
1544                        .set_dup(true);
1545                    // TBD validate_maximum_packet_size(store_packet.size());
1546                    self.store.add(store_packet.try_into().unwrap()).unwrap();
1547                } else {
1548                    // Topic name is not empty, remove topic alias if present
1549                    let store_packet = packet.clone().remove_topic_alias().set_dup(true);
1550                    self.store.add(store_packet.try_into().unwrap()).unwrap();
1551                }
1552            } else {
1553                release_packet_id_if_send_error = Some(packet_id);
1554            }
1555            if packet.qos() == Qos::ExactlyOnce {
1556                self.qos2_publish_processing.insert(packet_id);
1557                self.pid_pubrec.insert(packet_id);
1558            } else {
1559                self.pid_puback.insert(packet_id);
1560            }
1561        } else if self.status != ConnectionStatus::Connected {
1562            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1563            return events;
1564        }
1565
1566        let packet_id_opt = packet.packet_id();
1567        let ta_opt = Self::get_topic_alias_from_props(packet.props());
1568        if packet.topic_name().is_empty() {
1569            // process manually provided TopicAlias
1570            if !topic_alias_validated && self.validate_topic_alias(ta_opt).is_none() {
1571                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1572                if let Some(packet_id) = packet_id_opt {
1573                    if self.pid_man.is_used_id(packet_id) {
1574                        self.pid_man.release_id(packet_id);
1575                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1576                    }
1577                }
1578                return events;
1579            }
1580        } else if let Some(ta) = ta_opt {
1581            // Topic alias is provided
1582            if self.validate_topic_alias_range(ta) {
1583                trace!(
1584                    "topic alias: {} - {} is registered.",
1585                    packet.topic_name(),
1586                    ta
1587                );
1588                if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1589                    topic_alias_send.insert_or_update(packet.topic_name(), ta);
1590                }
1591            } else {
1592                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1593                if let Some(packet_id) = packet_id_opt {
1594                    if self.pid_man.is_used_id(packet_id) {
1595                        self.pid_man.release_id(packet_id);
1596                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1597                    }
1598                }
1599                return events;
1600            }
1601        } else if self.status == ConnectionStatus::Connected {
1602            // process auto applying TopicAlias if the option is enabled
1603            if self.auto_map_topic_alias_send {
1604                if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1605                    if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1606                        trace!(
1607                            "topic alias: {} - {} is found.",
1608                            packet.topic_name(),
1609                            found_ta
1610                        );
1611                        packet = packet.remove_topic_add_topic_alias(found_ta);
1612                    } else {
1613                        let lru_ta = topic_alias_send.get_lru_alias();
1614                        topic_alias_send.insert_or_update(packet.topic_name(), lru_ta);
1615                        packet = packet.remove_topic_add_topic_alias(lru_ta);
1616                    }
1617                }
1618            } else if self.auto_replace_topic_alias_send {
1619                if let Some(ref topic_alias_send) = self.topic_alias_send {
1620                    if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1621                        trace!(
1622                            "topic alias: {} - {} is found.",
1623                            packet.topic_name(),
1624                            found_ta
1625                        );
1626                        packet = packet.remove_topic_add_topic_alias(found_ta);
1627                    }
1628                }
1629            }
1630        }
1631
1632        // Check receive_maximum for sending (QoS 1 and 2 packets)
1633        if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1634            if let Some(max) = self.publish_send_max {
1635                if self.publish_send_count == max {
1636                    events.push(GenericEvent::NotifyError(MqttError::ReceiveMaximumExceeded));
1637                    if let Some(packet_id) = packet_id_opt {
1638                        if self.pid_man.is_used_id(packet_id) {
1639                            self.pid_man.release_id(packet_id);
1640                            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1641                        }
1642                    }
1643                    return events;
1644                }
1645                self.publish_send_count += 1;
1646            }
1647        }
1648
1649        if self.status == ConnectionStatus::Connected {
1650            events.push(GenericEvent::RequestSendPacket {
1651                packet: packet.into(),
1652                release_packet_id_if_send_error,
1653            });
1654        }
1655        self.send_post_process(&mut events);
1656
1657        events
1658    }
1659
1660    pub(crate) fn process_send_v3_1_1_puback(
1661        &mut self,
1662        packet: v3_1_1::GenericPuback<PacketIdType>,
1663    ) -> Vec<GenericEvent<PacketIdType>> {
1664        if self.status != ConnectionStatus::Connected {
1665            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1666        }
1667        let mut events = Vec::new();
1668
1669        events.push(GenericEvent::RequestSendPacket {
1670            packet: packet.into(),
1671            release_packet_id_if_send_error: None,
1672        });
1673        self.send_post_process(&mut events);
1674
1675        events
1676    }
1677
1678    pub(crate) fn process_send_v5_0_puback(
1679        &mut self,
1680        packet: v5_0::GenericPuback<PacketIdType>,
1681    ) -> Vec<GenericEvent<PacketIdType>> {
1682        if !self.validate_maximum_packet_size_send(packet.size()) {
1683            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1684        }
1685        if self.status != ConnectionStatus::Connected {
1686            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1687        }
1688
1689        let mut events = Vec::new();
1690        self.publish_recv.remove(&packet.packet_id());
1691
1692        events.push(GenericEvent::RequestSendPacket {
1693            packet: packet.into(),
1694            release_packet_id_if_send_error: None,
1695        });
1696        self.send_post_process(&mut events);
1697
1698        events
1699    }
1700
1701    pub(crate) fn process_send_v3_1_1_pubrec(
1702        &mut self,
1703        packet: v3_1_1::GenericPubrec<PacketIdType>,
1704    ) -> Vec<GenericEvent<PacketIdType>> {
1705        if self.status != ConnectionStatus::Connected {
1706            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1707        }
1708        let mut events = Vec::new();
1709
1710        events.push(GenericEvent::RequestSendPacket {
1711            packet: packet.into(),
1712            release_packet_id_if_send_error: None,
1713        });
1714        self.send_post_process(&mut events);
1715
1716        events
1717    }
1718
1719    pub(crate) fn process_send_v5_0_pubrec(
1720        &mut self,
1721        packet: v5_0::GenericPubrec<PacketIdType>,
1722    ) -> Vec<GenericEvent<PacketIdType>> {
1723        if !self.validate_maximum_packet_size_send(packet.size()) {
1724            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1725        }
1726        if self.status != ConnectionStatus::Connected {
1727            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1728        }
1729
1730        let mut events = Vec::new();
1731        let packet_id = packet.packet_id();
1732
1733        if let Some(rc) = packet.reason_code() {
1734            if rc.is_failure() {
1735                self.publish_recv.remove(&packet_id);
1736                self.qos2_publish_handled.remove(&packet_id);
1737            }
1738        }
1739
1740        events.push(GenericEvent::RequestSendPacket {
1741            packet: packet.into(),
1742            release_packet_id_if_send_error: None,
1743        });
1744        self.send_post_process(&mut events);
1745
1746        events
1747    }
1748
1749    pub(crate) fn process_send_v3_1_1_pubrel(
1750        &mut self,
1751        packet: v3_1_1::GenericPubrel<PacketIdType>,
1752    ) -> Vec<GenericEvent<PacketIdType>> {
1753        if self.status != ConnectionStatus::Connected && !self.need_store {
1754            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1755        }
1756        let mut events = Vec::new();
1757        let packet_id = packet.packet_id();
1758        if !self.pid_man.is_used_id(packet_id) {
1759            error!("packet_id {packet_id} must be acquired or registered");
1760            events.push(GenericEvent::NotifyError(
1761                MqttError::PacketIdentifierInvalid,
1762            ));
1763            return events;
1764        }
1765        if self.need_store {
1766            self.store.add(packet.clone().try_into().unwrap()).unwrap();
1767        }
1768
1769        if self.status == ConnectionStatus::Connected {
1770            self.pid_pubcomp.insert(packet_id);
1771            events.push(GenericEvent::RequestSendPacket {
1772                packet: packet.into(),
1773                release_packet_id_if_send_error: None,
1774            });
1775        }
1776        self.send_post_process(&mut events);
1777
1778        events
1779    }
1780
1781    pub(crate) fn process_send_v5_0_pubrel(
1782        &mut self,
1783        packet: v5_0::GenericPubrel<PacketIdType>,
1784    ) -> Vec<GenericEvent<PacketIdType>> {
1785        if !self.validate_maximum_packet_size_send(packet.size()) {
1786            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1787        }
1788        if self.status != ConnectionStatus::Connected && !self.need_store {
1789            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1790        }
1791
1792        let mut events = Vec::new();
1793        let packet_id = packet.packet_id();
1794        if !self.pid_man.is_used_id(packet_id) {
1795            error!("packet_id {packet_id} must be acquired or registered");
1796            events.push(GenericEvent::NotifyError(
1797                MqttError::PacketIdentifierInvalid,
1798            ));
1799            return events;
1800        }
1801        if self.need_store {
1802            self.store.add(packet.clone().try_into().unwrap()).unwrap();
1803        }
1804
1805        if self.status == ConnectionStatus::Connected {
1806            self.pid_pubcomp.insert(packet_id);
1807            events.push(GenericEvent::RequestSendPacket {
1808                packet: packet.into(),
1809                release_packet_id_if_send_error: None,
1810            });
1811        }
1812        self.send_post_process(&mut events);
1813
1814        events
1815    }
1816
1817    pub(crate) fn process_send_v3_1_1_pubcomp(
1818        &mut self,
1819        packet: v3_1_1::GenericPubcomp<PacketIdType>,
1820    ) -> Vec<GenericEvent<PacketIdType>> {
1821        if self.status != ConnectionStatus::Connected {
1822            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1823        }
1824        let mut events = Vec::new();
1825
1826        events.push(GenericEvent::RequestSendPacket {
1827            packet: packet.into(),
1828            release_packet_id_if_send_error: None,
1829        });
1830        self.send_post_process(&mut events);
1831
1832        events
1833    }
1834
1835    pub(crate) fn process_send_v5_0_pubcomp(
1836        &mut self,
1837        packet: v5_0::GenericPubcomp<PacketIdType>,
1838    ) -> Vec<GenericEvent<PacketIdType>> {
1839        if !self.validate_maximum_packet_size_send(packet.size()) {
1840            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1841        }
1842        if self.status != ConnectionStatus::Connected {
1843            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1844        }
1845
1846        let mut events = Vec::new();
1847        self.publish_recv.remove(&packet.packet_id());
1848
1849        events.push(GenericEvent::RequestSendPacket {
1850            packet: packet.into(),
1851            release_packet_id_if_send_error: None,
1852        });
1853        self.send_post_process(&mut events);
1854
1855        events
1856    }
1857
1858    pub(crate) fn process_send_v3_1_1_subscribe(
1859        &mut self,
1860        packet: v3_1_1::GenericSubscribe<PacketIdType>,
1861    ) -> Vec<GenericEvent<PacketIdType>> {
1862        let mut events = Vec::new();
1863        let packet_id = packet.packet_id();
1864        if self.status != ConnectionStatus::Connected {
1865            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1866            if self.pid_man.is_used_id(packet_id) {
1867                self.pid_man.release_id(packet_id);
1868                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1869            }
1870            return events;
1871        }
1872        if !self.pid_man.is_used_id(packet_id) {
1873            error!("packet_id {packet_id} must be acquired or registered");
1874            events.push(GenericEvent::NotifyError(
1875                MqttError::PacketIdentifierInvalid,
1876            ));
1877            return events;
1878        }
1879        self.pid_suback.insert(packet_id);
1880
1881        events.push(GenericEvent::RequestSendPacket {
1882            packet: packet.into(),
1883            release_packet_id_if_send_error: Some(packet_id),
1884        });
1885        self.send_post_process(&mut events);
1886
1887        events
1888    }
1889
1890    pub(crate) fn process_send_v5_0_subscribe(
1891        &mut self,
1892        packet: v5_0::GenericSubscribe<PacketIdType>,
1893    ) -> Vec<GenericEvent<PacketIdType>> {
1894        if !self.validate_maximum_packet_size_send(packet.size()) {
1895            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1896        }
1897
1898        let mut events = Vec::new();
1899        let packet_id = packet.packet_id();
1900        if self.status != ConnectionStatus::Connected {
1901            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1902            if self.pid_man.is_used_id(packet_id) {
1903                self.pid_man.release_id(packet_id);
1904                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1905            }
1906            return events;
1907        }
1908        if !self.pid_man.is_used_id(packet_id) {
1909            error!("packet_id {packet_id} must be acquired or registered");
1910            events.push(GenericEvent::NotifyError(
1911                MqttError::PacketIdentifierInvalid,
1912            ));
1913            return events;
1914        }
1915        self.pid_suback.insert(packet_id);
1916
1917        events.push(GenericEvent::RequestSendPacket {
1918            packet: packet.into(),
1919            release_packet_id_if_send_error: Some(packet_id),
1920        });
1921        self.send_post_process(&mut events);
1922
1923        events
1924    }
1925
1926    pub(crate) fn process_send_v3_1_1_suback(
1927        &mut self,
1928        packet: v3_1_1::GenericSuback<PacketIdType>,
1929    ) -> Vec<GenericEvent<PacketIdType>> {
1930        if self.status != ConnectionStatus::Connected {
1931            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1932        }
1933        let mut events = Vec::new();
1934        events.push(GenericEvent::RequestSendPacket {
1935            packet: packet.into(),
1936            release_packet_id_if_send_error: None,
1937        });
1938        self.send_post_process(&mut events);
1939
1940        events
1941    }
1942
1943    pub(crate) fn process_send_v5_0_suback(
1944        &mut self,
1945        packet: v5_0::GenericSuback<PacketIdType>,
1946    ) -> Vec<GenericEvent<PacketIdType>> {
1947        if !self.validate_maximum_packet_size_send(packet.size()) {
1948            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1949        }
1950        if self.status != ConnectionStatus::Connected {
1951            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1952        }
1953
1954        let mut events = Vec::new();
1955        events.push(GenericEvent::RequestSendPacket {
1956            packet: packet.into(),
1957            release_packet_id_if_send_error: None,
1958        });
1959        self.send_post_process(&mut events);
1960
1961        events
1962    }
1963
1964    pub(crate) fn process_send_v3_1_1_unsubscribe(
1965        &mut self,
1966        packet: v3_1_1::GenericUnsubscribe<PacketIdType>,
1967    ) -> Vec<GenericEvent<PacketIdType>> {
1968        let mut events = Vec::new();
1969        let packet_id = packet.packet_id();
1970        if self.status != ConnectionStatus::Connected {
1971            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1972            if self.pid_man.is_used_id(packet_id) {
1973                self.pid_man.release_id(packet_id);
1974                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1975            }
1976            return events;
1977        }
1978        if !self.pid_man.is_used_id(packet_id) {
1979            error!("packet_id {packet_id} must be acquired or registered");
1980            events.push(GenericEvent::NotifyError(
1981                MqttError::PacketIdentifierInvalid,
1982            ));
1983            return events;
1984        }
1985        self.pid_unsuback.insert(packet_id);
1986
1987        events.push(GenericEvent::RequestSendPacket {
1988            packet: packet.into(),
1989            release_packet_id_if_send_error: Some(packet_id),
1990        });
1991        self.send_post_process(&mut events);
1992
1993        events
1994    }
1995
1996    pub(crate) fn process_send_v5_0_unsubscribe(
1997        &mut self,
1998        packet: v5_0::GenericUnsubscribe<PacketIdType>,
1999    ) -> Vec<GenericEvent<PacketIdType>> {
2000        if !self.validate_maximum_packet_size_send(packet.size()) {
2001            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2002        }
2003
2004        let mut events = Vec::new();
2005        let packet_id = packet.packet_id();
2006        if self.status != ConnectionStatus::Connected {
2007            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
2008            if self.pid_man.is_used_id(packet_id) {
2009                self.pid_man.release_id(packet_id);
2010                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2011            }
2012            return events;
2013        }
2014        if !self.pid_man.is_used_id(packet_id) {
2015            error!("packet_id {packet_id} must be acquired or registered");
2016            events.push(GenericEvent::NotifyError(
2017                MqttError::PacketIdentifierInvalid,
2018            ));
2019            return events;
2020        }
2021        self.pid_unsuback.insert(packet_id);
2022
2023        events.push(GenericEvent::RequestSendPacket {
2024            packet: packet.into(),
2025            release_packet_id_if_send_error: Some(packet_id),
2026        });
2027        self.send_post_process(&mut events);
2028
2029        events
2030    }
2031
2032    pub(crate) fn process_send_v3_1_1_unsuback(
2033        &mut self,
2034        packet: v3_1_1::GenericUnsuback<PacketIdType>,
2035    ) -> Vec<GenericEvent<PacketIdType>> {
2036        if self.status != ConnectionStatus::Connected {
2037            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2038        }
2039        let mut events = Vec::new();
2040        events.push(GenericEvent::RequestSendPacket {
2041            packet: packet.into(),
2042            release_packet_id_if_send_error: None,
2043        });
2044        self.send_post_process(&mut events);
2045
2046        events
2047    }
2048
2049    pub(crate) fn process_send_v5_0_unsuback(
2050        &mut self,
2051        packet: v5_0::GenericUnsuback<PacketIdType>,
2052    ) -> Vec<GenericEvent<PacketIdType>> {
2053        if !self.validate_maximum_packet_size_send(packet.size()) {
2054            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2055        }
2056        if self.status != ConnectionStatus::Connected {
2057            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2058        }
2059
2060        let mut events = Vec::new();
2061        events.push(GenericEvent::RequestSendPacket {
2062            packet: packet.into(),
2063            release_packet_id_if_send_error: None,
2064        });
2065        self.send_post_process(&mut events);
2066
2067        events
2068    }
2069
2070    pub(crate) fn process_send_v3_1_1_pingreq(
2071        &mut self,
2072        packet: v3_1_1::Pingreq,
2073    ) -> Vec<GenericEvent<PacketIdType>> {
2074        if self.status != ConnectionStatus::Connected {
2075            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2076        }
2077        let mut events = Vec::new();
2078        events.push(GenericEvent::RequestSendPacket {
2079            packet: packet.into(),
2080            release_packet_id_if_send_error: None,
2081        });
2082        if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2083            self.pingreq_send_set = true;
2084            events.push(GenericEvent::RequestTimerReset {
2085                kind: TimerKind::PingrespRecv,
2086                duration_ms: timeout_ms,
2087            });
2088        }
2089        self.send_post_process(&mut events);
2090
2091        events
2092    }
2093
2094    pub(crate) fn process_send_v5_0_pingreq(
2095        &mut self,
2096        packet: v5_0::Pingreq,
2097    ) -> Vec<GenericEvent<PacketIdType>> {
2098        if !self.validate_maximum_packet_size_send(packet.size()) {
2099            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2100        }
2101        if self.status != ConnectionStatus::Connected {
2102            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2103        }
2104
2105        let mut events = Vec::new();
2106        events.push(GenericEvent::RequestSendPacket {
2107            packet: packet.into(),
2108            release_packet_id_if_send_error: None,
2109        });
2110        if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2111            self.pingreq_send_set = true;
2112            events.push(GenericEvent::RequestTimerReset {
2113                kind: TimerKind::PingrespRecv,
2114                duration_ms: timeout_ms,
2115            });
2116        }
2117        self.send_post_process(&mut events);
2118
2119        events
2120    }
2121
2122    pub(crate) fn process_send_v3_1_1_pingresp(
2123        &mut self,
2124        packet: v3_1_1::Pingresp,
2125    ) -> Vec<GenericEvent<PacketIdType>> {
2126        if self.status != ConnectionStatus::Connected {
2127            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2128        }
2129        let mut events = Vec::new();
2130        events.push(GenericEvent::RequestSendPacket {
2131            packet: packet.into(),
2132            release_packet_id_if_send_error: None,
2133        });
2134        self.send_post_process(&mut events);
2135
2136        events
2137    }
2138
2139    pub(crate) fn process_send_v5_0_pingresp(
2140        &mut self,
2141        packet: v5_0::Pingresp,
2142    ) -> Vec<GenericEvent<PacketIdType>> {
2143        if !self.validate_maximum_packet_size_send(packet.size()) {
2144            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2145        }
2146        if self.status != ConnectionStatus::Connected {
2147            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2148        }
2149
2150        let mut events = Vec::new();
2151        events.push(GenericEvent::RequestSendPacket {
2152            packet: packet.into(),
2153            release_packet_id_if_send_error: None,
2154        });
2155        self.send_post_process(&mut events);
2156
2157        events
2158    }
2159
2160    pub(crate) fn process_send_v3_1_1_disconnect(
2161        &mut self,
2162        packet: v3_1_1::Disconnect,
2163    ) -> Vec<GenericEvent<PacketIdType>> {
2164        if self.status != ConnectionStatus::Connected {
2165            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2166        }
2167        let mut events = Vec::new();
2168        self.status = ConnectionStatus::Disconnected;
2169        self.cancel_timers(&mut events);
2170        events.push(GenericEvent::RequestSendPacket {
2171            packet: packet.into(),
2172            release_packet_id_if_send_error: None,
2173        });
2174        events.push(GenericEvent::RequestClose);
2175
2176        events
2177    }
2178
2179    pub(crate) fn process_send_v5_0_disconnect(
2180        &mut self,
2181        packet: v5_0::Disconnect,
2182    ) -> Vec<GenericEvent<PacketIdType>> {
2183        if !self.validate_maximum_packet_size_send(packet.size()) {
2184            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2185        }
2186        if self.status != ConnectionStatus::Connected {
2187            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2188        }
2189
2190        let mut events = Vec::new();
2191        self.status = ConnectionStatus::Disconnected;
2192        self.cancel_timers(&mut events);
2193        events.push(GenericEvent::RequestSendPacket {
2194            packet: packet.into(),
2195            release_packet_id_if_send_error: None,
2196        });
2197        events.push(GenericEvent::RequestClose);
2198
2199        events
2200    }
2201
2202    pub(crate) fn process_send_v5_0_auth(
2203        &mut self,
2204        packet: v5_0::Auth,
2205    ) -> Vec<GenericEvent<PacketIdType>> {
2206        if !self.validate_maximum_packet_size_send(packet.size()) {
2207            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2208        }
2209        if self.status == ConnectionStatus::Disconnected {
2210            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2211        }
2212
2213        let mut events = Vec::new();
2214        events.push(GenericEvent::RequestSendPacket {
2215            packet: packet.into(),
2216            release_packet_id_if_send_error: None,
2217        });
2218        self.send_post_process(&mut events);
2219
2220        events
2221    }
2222
2223    fn send_post_process(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
2224        if self.is_client {
2225            if let Some(timeout_ms) = self.pingreq_send_interval_ms {
2226                self.pingreq_send_set = true;
2227                events.push(GenericEvent::RequestTimerReset {
2228                    kind: TimerKind::PingreqSend,
2229                    duration_ms: timeout_ms,
2230                });
2231            }
2232        }
2233    }
2234
2235    fn validate_maximum_packet_size_send(&self, size: usize) -> bool {
2236        if size > self.maximum_packet_size_send as usize {
2237            error!("packet size over maximum_packet_size for sending");
2238            return false;
2239        }
2240        true
2241    }
2242
2243    fn process_recv_packet(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
2244        let mut events = Vec::new();
2245
2246        // packet size limit validation (v3.1.1 is always satisfied)
2247        let total_size = remaining_length_to_total_size(raw_packet.remaining_length());
2248        if total_size > self.maximum_packet_size_recv {
2249            // This happens only when protocol version is V5.0.
            // On v3.1.1, the maximum packet size is always 268435455 (2^28 - 1).
            // If the packet size is over 268435455, feed() returns an error.
2252            // maximum_packet_size_recv is set by sending CONNECT or CONNACK packet.
2253            // So DISCONNECT packet is the right choice to notify the error.
2254            let disconnect_packet = v5_0::Disconnect::builder()
2255                .reason_code(DisconnectReasonCode::PacketTooLarge)
2256                .build()
2257                .unwrap();
2258            // Send disconnect packet directly without generic constraints
2259            events.extend(self.process_send_v5_0_disconnect(disconnect_packet));
2260            events.push(GenericEvent::NotifyError(MqttError::PacketTooLarge));
2261            return events;
2262        }
2263
2264        let packet_type = raw_packet.packet_type();
2265        let _flags = raw_packet.flags();
2266        match self.protocol_version {
2267            Version::V3_1_1 => {
2268                match packet_type {
2269                    1 => {
2270                        // CONNECT
2271                        events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2272                    }
2273                    2 => {
2274                        // CONNACK
2275                        events.extend(self.process_recv_v3_1_1_connack(raw_packet));
2276                    }
2277                    3 => {
2278                        // PUBLISH
2279                        events.extend(self.process_recv_v3_1_1_publish(raw_packet));
2280                    }
2281                    4 => {
2282                        // PUBACK
2283                        events.extend(self.process_recv_v3_1_1_puback(raw_packet));
2284                    }
2285                    5 => {
2286                        // PUBREC
2287                        events.extend(self.process_recv_v3_1_1_pubrec(raw_packet));
2288                    }
2289                    6 => {
2290                        // PUBREL
2291                        events.extend(self.process_recv_v3_1_1_pubrel(raw_packet));
2292                    }
2293                    7 => {
2294                        // PUBCOMP
2295                        events.extend(self.process_recv_v3_1_1_pubcomp(raw_packet));
2296                    }
2297                    8 => {
2298                        // SUBSCRIBE
2299                        events.extend(self.process_recv_v3_1_1_subscribe(raw_packet));
2300                    }
2301                    9 => {
2302                        // SUBACK
2303                        events.extend(self.process_recv_v3_1_1_suback(raw_packet));
2304                    }
2305                    10 => {
2306                        // UNSUBSCRIBE
2307                        events.extend(self.process_recv_v3_1_1_unsubscribe(raw_packet));
2308                    }
2309                    11 => {
2310                        // UNSUBACK
2311                        events.extend(self.process_recv_v3_1_1_unsuback(raw_packet));
2312                    }
2313                    12 => {
2314                        // PINGREQ
2315                        events.extend(self.process_recv_v3_1_1_pingreq(raw_packet));
2316                    }
2317                    13 => {
2318                        // PINGRESP
2319                        events.extend(self.process_recv_v3_1_1_pingresp(raw_packet));
2320                    }
2321                    14 => {
2322                        // DISCONNECT
2323                        events.extend(self.process_recv_v3_1_1_disconnect(raw_packet));
2324                    }
2325                    // invalid packet type
2326                    _ => {
2327                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2328                    }
2329                }
2330            }
2331            Version::V5_0 => {
2332                match packet_type {
2333                    1 => {
2334                        // CONNECT
2335                        events.extend(self.process_recv_v5_0_connect(raw_packet));
2336                    }
2337                    2 => {
2338                        // CONNACK
2339                        events.extend(self.process_recv_v5_0_connack(raw_packet));
2340                    }
2341                    3 => {
2342                        // PUBLISH
2343                        events.extend(self.process_recv_v5_0_publish(raw_packet));
2344                    }
2345                    4 => {
2346                        // PUBACK
2347                        events.extend(self.process_recv_v5_0_puback(raw_packet));
2348                    }
2349                    5 => {
2350                        // PUBREC
2351                        events.extend(self.process_recv_v5_0_pubrec(raw_packet));
2352                    }
2353                    6 => {
2354                        // PUBREL
2355                        events.extend(self.process_recv_v5_0_pubrel(raw_packet));
2356                    }
2357                    7 => {
2358                        // PUBCOMP
2359                        events.extend(self.process_recv_v5_0_pubcomp(raw_packet));
2360                    }
2361                    8 => {
2362                        // SUBSCRIBE
2363                        events.extend(self.process_recv_v5_0_subscribe(raw_packet));
2364                    }
2365                    9 => {
2366                        // SUBACK
2367                        events.extend(self.process_recv_v5_0_suback(raw_packet));
2368                    }
2369                    10 => {
2370                        // UNSUBSCRIBE
2371                        events.extend(self.process_recv_v5_0_unsubscribe(raw_packet));
2372                    }
2373                    11 => {
2374                        // UNSUBACK
2375                        events.extend(self.process_recv_v5_0_unsuback(raw_packet));
2376                    }
2377                    12 => {
2378                        // PINGREQ
2379                        events.extend(self.process_recv_v5_0_pingreq(raw_packet));
2380                    }
2381                    13 => {
2382                        // PINGRESP
2383                        events.extend(self.process_recv_v5_0_pingresp(raw_packet));
2384                    }
2385                    14 => {
2386                        // DISCONNECT
2387                        events.extend(self.process_recv_v5_0_disconnect(raw_packet));
2388                    }
2389                    15 => {
2390                        // AUTH
2391                        events.extend(self.process_recv_v5_0_auth(raw_packet));
2392                    }
2393                    // invalid packet type
2394                    _ => {
2395                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2396                    }
2397                }
2398            }
2399            Version::Undetermined => {
2400                match packet_type {
2401                    1 => {
2402                        // CONNECT
2403                        if raw_packet.remaining_length() < 7 {
2404                            events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2405                            return events;
2406                        }
2407                        match raw_packet.data_as_slice()[6] {
2408                            // Protocol Version
2409                            4 => {
2410                                self.protocol_version = Version::V3_1_1;
2411                                events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2412                            }
2413                            5 => {
2414                                self.protocol_version = Version::V5_0;
2415                                events.extend(self.process_recv_v5_0_connect(raw_packet));
2416                            }
2417                            _ => {
2418                                events.push(GenericEvent::NotifyError(
2419                                    MqttError::UnsupportedProtocolVersion,
2420                                ));
2421                            }
2422                        }
2423                    }
2424                    _ => {
2425                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2426                    }
2427                }
2428            }
2429        }
2430
2431        events
2432    }
2433
2434    fn process_recv_v3_1_1_connect(
2435        &mut self,
2436        raw_packet: RawPacket,
2437    ) -> Vec<GenericEvent<PacketIdType>> {
2438        let mut events = Vec::new();
2439        match v3_1_1::Connect::parse(raw_packet.data_as_slice()) {
2440            Ok((packet, _)) => {
2441                if self.status != ConnectionStatus::Disconnected {
2442                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2443                    return events;
2444                }
2445                self.initialize(false);
2446                self.status = ConnectionStatus::Connecting;
2447                if packet.keep_alive() > 0 {
2448                    self.pingreq_recv_timeout_ms =
2449                        Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2450                }
2451                if packet.clean_session() {
2452                    self.clear_store_related();
2453                } else {
2454                    self.need_store = true;
2455                }
2456                events.extend(self.refresh_pingreq_recv());
2457                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2458            }
2459            Err(e) => {
2460                if self.status == ConnectionStatus::Disconnected {
2461                    self.status = ConnectionStatus::Connecting;
2462                    let rc = match e {
2463                        MqttError::ClientIdentifierNotValid => {
2464                            ConnectReturnCode::IdentifierRejected
2465                        }
2466                        MqttError::BadUserNameOrPassword => {
2467                            ConnectReturnCode::BadUserNameOrPassword
2468                        }
2469                        MqttError::UnsupportedProtocolVersion => {
2470                            ConnectReturnCode::UnacceptableProtocolVersion
2471                        }
2472                        _ => ConnectReturnCode::NotAuthorized, // TBD close could be better
2473                    };
2474                    let connack = v3_1_1::Connack::builder().return_code(rc).build().unwrap();
2475                    let connack_events = self.process_send_v3_1_1_connack(connack);
2476                    events.extend(connack_events);
2477                } else {
2478                    events.push(GenericEvent::RequestClose);
2479                }
2480                events.push(GenericEvent::NotifyError(e));
2481            }
2482        }
2483
2484        events
2485    }
2486
2487    fn process_recv_v5_0_connect(
2488        &mut self,
2489        raw_packet: RawPacket,
2490    ) -> Vec<GenericEvent<PacketIdType>> {
2491        let mut events = Vec::new();
2492        match v5_0::Connect::parse(raw_packet.data_as_slice()) {
2493            Ok((packet, _)) => {
2494                if self.status != ConnectionStatus::Disconnected {
2495                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2496                    return events;
2497                }
2498                self.initialize(false);
2499                self.status = ConnectionStatus::Connecting;
2500                if packet.keep_alive() > 0 {
2501                    self.pingreq_recv_timeout_ms =
2502                        Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2503                }
2504                if packet.clean_start() {
2505                    self.clear_store_related();
2506                }
2507                packet.props().iter().for_each(|prop| match prop {
2508                    Property::TopicAliasMaximum(p) => {
2509                        self.topic_alias_send = Some(TopicAliasSend::new(p.val()));
2510                    }
2511                    Property::ReceiveMaximum(p) => {
2512                        self.publish_send_max = Some(p.val());
2513                    }
2514                    Property::MaximumPacketSize(p) => {
2515                        self.maximum_packet_size_send = p.val();
2516                    }
2517                    Property::SessionExpiryInterval(p) => {
2518                        if p.val() != 0 {
2519                            self.need_store = true;
2520                        }
2521                    }
2522                    _ => {}
2523                });
2524                events.extend(self.refresh_pingreq_recv());
2525                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2526            }
2527            Err(e) => {
2528                if self.status == ConnectionStatus::Disconnected {
2529                    self.status = ConnectionStatus::Connecting;
2530                    let rc = match e {
2531                        MqttError::ClientIdentifierNotValid => {
2532                            ConnectReasonCode::ClientIdentifierNotValid
2533                        }
2534                        MqttError::BadUserNameOrPassword => {
2535                            ConnectReasonCode::BadAuthenticationMethod
2536                        }
2537                        MqttError::UnsupportedProtocolVersion => {
2538                            ConnectReasonCode::UnsupportedProtocolVersion
2539                        }
2540                        _ => ConnectReasonCode::UnspecifiedError,
2541                    };
2542                    let connack = v5_0::Connack::builder().reason_code(rc).build().unwrap();
2543                    let connack_events = self.process_send_v5_0_connack(connack);
2544                    events.extend(connack_events);
2545                } else {
2546                    let disconnect = v5_0::Disconnect::builder()
2547                        .reason_code(DisconnectReasonCode::ProtocolError)
2548                        .build()
2549                        .unwrap();
2550                    let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2551                    events.extend(disconnect_events);
2552                }
2553                events.push(GenericEvent::NotifyError(e));
2554            }
2555        }
2556
2557        events
2558    }
2559
2560    fn process_recv_v3_1_1_connack(
2561        &mut self,
2562        raw_packet: RawPacket,
2563    ) -> Vec<GenericEvent<PacketIdType>> {
2564        let mut events = Vec::new();
2565
2566        match v3_1_1::Connack::parse(raw_packet.data_as_slice()) {
2567            Ok((packet, _consumed)) => {
2568                if packet.return_code() == ConnectReturnCode::Accepted {
2569                    self.status = ConnectionStatus::Connected;
2570                    if packet.session_present() {
2571                        events.extend(self.send_stored());
2572                    } else {
2573                        self.clear_store_related();
2574                    }
2575                }
2576                events.push(GenericEvent::NotifyPacketReceived(
2577                    GenericPacket::V3_1_1Connack(packet),
2578                ));
2579            }
2580            Err(e) => {
2581                events.push(GenericEvent::RequestClose);
2582                events.push(GenericEvent::NotifyError(e));
2583            }
2584        }
2585
2586        events
2587    }
2588
2589    fn process_recv_v5_0_connack(
2590        &mut self,
2591        raw_packet: RawPacket,
2592    ) -> Vec<GenericEvent<PacketIdType>> {
2593        let mut events = Vec::new();
2594
2595        match v5_0::Connack::parse(raw_packet.data_as_slice()) {
2596            Ok((packet, _consumed)) => {
2597                if packet.reason_code() == ConnectReasonCode::Success {
2598                    self.status = ConnectionStatus::Connected;
2599
2600                    // Process properties
2601                    for prop in packet.props() {
2602                        match prop {
2603                            Property::TopicAliasMaximum(val) => {
2604                                if val.val() > 0 {
2605                                    self.topic_alias_send = Some(TopicAliasSend::new(val.val()));
2606                                }
2607                            }
2608                            Property::ReceiveMaximum(val) => {
2609                                assert!(val.val() != 0);
2610                                self.publish_send_max = Some(val.val());
2611                            }
2612                            Property::MaximumPacketSize(val) => {
2613                                assert!(val.val() != 0);
2614                                self.maximum_packet_size_send = val.val();
2615                            }
2616                            Property::ServerKeepAlive(val) => {
2617                                // Set PINGREQ send interval if this is a client
2618                                let timeout_ms = (val.val() as u64) * 1000;
2619                                self.pingreq_send_interval_ms = Some(timeout_ms);
2620                            }
2621                            _ => {
2622                                // Ignore other properties
2623                            }
2624                        }
2625                    }
2626
2627                    if packet.session_present() {
2628                        events.extend(self.send_stored());
2629                    } else {
2630                        self.clear_store_related();
2631                    }
2632                }
2633                events.push(GenericEvent::NotifyPacketReceived(
2634                    GenericPacket::V5_0Connack(packet),
2635                ));
2636            }
2637            Err(e) => {
2638                if self.status == ConnectionStatus::Connected {
2639                    let disconnect = v5_0::Disconnect::builder()
2640                        .reason_code(e.into())
2641                        .build()
2642                        .unwrap();
2643                    let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2644                    events.extend(disconnect_events);
2645                }
2646                events.push(GenericEvent::NotifyError(e));
2647            }
2648        }
2649
2650        events
2651    }
2652
2653    fn process_recv_v3_1_1_publish(
2654        &mut self,
2655        raw_packet: RawPacket,
2656    ) -> Vec<GenericEvent<PacketIdType>> {
2657        let mut events = Vec::new();
2658
2659        let flags = raw_packet.flags();
2660        match &raw_packet.data {
2661            PacketData::Publish(arc) => {
2662                match v3_1_1::GenericPublish::parse(flags, arc.clone()) {
2663                    Ok((packet, _consumed)) => {
2664                        match packet.qos() {
2665                            Qos::AtMostOnce => {
2666                                events.extend(self.refresh_pingreq_recv());
2667                                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2668                            }
2669                            Qos::AtLeastOnce => {
2670                                let packet_id = packet.packet_id().unwrap();
2671                                if self.status == ConnectionStatus::Connected
2672                                    && self.auto_pub_response
2673                                {
2674                                    // Send PUBACK automatically
2675                                    let puback = v3_1_1::GenericPuback::builder()
2676                                        .packet_id(packet_id)
2677                                        .build()
2678                                        .unwrap();
2679                                    events.extend(self.process_send_v3_1_1_puback(puback));
2680                                }
2681                                events.extend(self.refresh_pingreq_recv());
2682                                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2683                            }
2684                            Qos::ExactlyOnce => {
2685                                let packet_id = packet.packet_id().unwrap();
2686                                let already_handled = !self.qos2_publish_handled.insert(packet_id);
2687
2688                                if self.status == ConnectionStatus::Connected
2689                                    && (self.auto_pub_response || already_handled)
2690                                {
2691                                    let pubrec = v3_1_1::GenericPubrec::builder()
2692                                        .packet_id(packet_id)
2693                                        .build()
2694                                        .unwrap();
2695                                    events.extend(self.process_send_v3_1_1_pubrec(pubrec));
2696                                }
2697                                events.extend(self.refresh_pingreq_recv());
2698                                if !already_handled {
2699                                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2700                                }
2701                            }
2702                        }
2703                    }
2704                    Err(e) => {
2705                        events.push(GenericEvent::RequestClose);
2706                        events.push(GenericEvent::NotifyError(e));
2707                    }
2708                }
2709            }
2710            PacketData::Normal(_) => {
2711                unreachable!("PUBLISH packet must use PacketData::Publish variant");
2712            }
2713        }
2714
2715        events
2716    }
2717
2718    fn process_recv_v5_0_publish(
2719        &mut self,
2720        raw_packet: RawPacket,
2721    ) -> Vec<GenericEvent<PacketIdType>> {
2722        let mut events = Vec::new();
2723
2724        let flags = raw_packet.flags();
2725        match &raw_packet.data {
2726            PacketData::Publish(arc) => {
2727                match v5_0::GenericPublish::parse(flags, arc.clone()) {
2728                    Ok((packet, _consumed)) => {
2729                        let mut already_handled = false;
2730                        let mut puback_send = false;
2731                        let mut pubrec_send = false;
2732
2733                        match packet.qos() {
2734                            Qos::AtLeastOnce => {
2735                                let packet_id = packet.packet_id().unwrap();
2736                                if let Some(max) = self.publish_recv_max {
2737                                    if self.publish_recv.len() >= max as usize {
2738                                        let disconnect = v5_0::Disconnect::builder()
2739                                            .reason_code(
2740                                                DisconnectReasonCode::ReceiveMaximumExceeded,
2741                                            )
2742                                            .build()
2743                                            .unwrap();
2744                                        events
2745                                            .extend(self.process_send_v5_0_disconnect(disconnect));
2746                                        events.push(GenericEvent::NotifyError(
2747                                            MqttError::ReceiveMaximumExceeded,
2748                                        ));
2749                                        return events;
2750                                    }
2751                                }
2752                                self.publish_recv.insert(packet_id);
2753                                if self.auto_pub_response
2754                                    && self.status == ConnectionStatus::Connected
2755                                {
2756                                    puback_send = true;
2757                                }
2758                            }
2759                            Qos::ExactlyOnce => {
2760                                let packet_id = packet.packet_id().unwrap();
2761                                if let Some(max) = self.publish_recv_max {
2762                                    if self.publish_recv.len() >= max as usize {
2763                                        let disconnect = v5_0::Disconnect::builder()
2764                                            .reason_code(
2765                                                DisconnectReasonCode::ReceiveMaximumExceeded,
2766                                            )
2767                                            .build()
2768                                            .unwrap();
2769                                        events
2770                                            .extend(self.process_send_v5_0_disconnect(disconnect));
2771                                        events.push(GenericEvent::NotifyError(
2772                                            MqttError::ReceiveMaximumExceeded,
2773                                        ));
2774                                        return events;
2775                                    }
2776                                }
2777                                self.publish_recv.insert(packet_id);
2778
2779                                if !self.qos2_publish_handled.insert(packet_id) {
2780                                    already_handled = true;
2781                                }
2782                                if self.status == ConnectionStatus::Connected
2783                                    && (self.auto_pub_response || already_handled)
2784                                {
2785                                    pubrec_send = true;
2786                                }
2787                            }
2788                            Qos::AtMostOnce => {
2789                                // No packet ID handling for QoS 0
2790                            }
2791                        }
2792
2793                        // Topic Alias handling
2794                        if packet.topic_name().is_empty() {
2795                            // Extract topic from topic_alias
2796                            if let Some(ta) = Self::get_topic_alias_from_props(packet.props()) {
2797                                if ta == 0
2798                                    || self.topic_alias_recv.is_none()
2799                                    || ta > self.topic_alias_recv.as_ref().unwrap().max()
2800                                {
2801                                    let disconnect = v5_0::Disconnect::builder()
2802                                        .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2803                                        .build()
2804                                        .unwrap();
2805                                    events.extend(self.process_send_v5_0_disconnect(disconnect));
2806                                    events.push(GenericEvent::NotifyError(
2807                                        MqttError::TopicAliasInvalid,
2808                                    ));
2809                                    return events;
2810                                }
2811
2812                                if let Some(ref topic_alias_recv) = self.topic_alias_recv {
2813                                    if topic_alias_recv.get(ta).is_none() {
2814                                        let disconnect = v5_0::Disconnect::builder()
2815                                            .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2816                                            .build()
2817                                            .unwrap();
2818                                        events
2819                                            .extend(self.process_send_v5_0_disconnect(disconnect));
2820                                        events.push(GenericEvent::NotifyError(
2821                                            MqttError::TopicAliasInvalid,
2822                                        ));
2823                                        return events;
2824                                    }
2825                                    // Note: In a complete implementation, we would modify the packet
2826                                    // to add the resolved topic. For now, we'll proceed.
2827                                }
2828                            } else {
2829                                let disconnect = v5_0::Disconnect::builder()
2830                                    .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2831                                    .build()
2832                                    .unwrap();
2833                                events.extend(self.process_send_v5_0_disconnect(disconnect));
2834                                events
2835                                    .push(GenericEvent::NotifyError(MqttError::TopicAliasInvalid));
2836                                return events;
2837                            }
2838                        } else {
2839                            // Topic is not empty, check if topic alias needs to be registered
2840                            if let Some(ta) = Self::get_topic_alias_from_props(packet.props()) {
2841                                if ta == 0
2842                                    || self.topic_alias_recv.is_none()
2843                                    || ta > self.topic_alias_recv.as_ref().unwrap().max()
2844                                {
2845                                    let disconnect = v5_0::Disconnect::builder()
2846                                        .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2847                                        .build()
2848                                        .unwrap();
2849                                    events.extend(self.process_send_v5_0_disconnect(disconnect));
2850                                    events.push(GenericEvent::NotifyError(
2851                                        MqttError::TopicAliasInvalid,
2852                                    ));
2853                                    return events;
2854                                }
2855                                if let Some(ref mut topic_alias_recv) = self.topic_alias_recv {
2856                                    topic_alias_recv.insert_or_update(packet.topic_name(), ta);
2857                                }
2858                            }
2859                        }
2860
2861                        // Send response packets
2862                        if puback_send {
2863                            let puback = v5_0::GenericPuback::builder()
2864                                .packet_id(packet.packet_id().unwrap())
2865                                .build()
2866                                .unwrap();
2867                            events.extend(self.process_send_v5_0_puback(puback));
2868                        }
2869                        if pubrec_send {
2870                            let pubrec = v5_0::GenericPubrec::builder()
2871                                .packet_id(packet.packet_id().unwrap())
2872                                .build()
2873                                .unwrap();
2874                            events.extend(self.process_send_v5_0_pubrec(pubrec));
2875                        }
2876
2877                        // Refresh PINGREQ receive timer
2878                        events.extend(self.refresh_pingreq_recv());
2879
2880                        // Notify packet received (only if not already handled)
2881                        if !already_handled {
2882                            events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2883                        }
2884                    }
2885                    Err(e) => {
2886                        if self.status == ConnectionStatus::Connected {
2887                            let disconnect = v5_0::Disconnect::builder()
2888                                .reason_code(e.into())
2889                                .build()
2890                                .unwrap();
2891                            let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2892                            events.extend(disconnect_events);
2893                        }
2894                        events.push(GenericEvent::NotifyError(e));
2895                    }
2896                }
2897            }
2898            PacketData::Normal(_) => {
2899                unreachable!("PUBLISH packet must use PacketData::Publish variant");
2900            }
2901        }
2902
2903        events
2904    }
2905
2906    fn process_recv_v3_1_1_puback(
2907        &mut self,
2908        raw_packet: RawPacket,
2909    ) -> Vec<GenericEvent<PacketIdType>> {
2910        let mut events = Vec::new();
2911
2912        match v3_1_1::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2913            Ok((packet, _)) => {
2914                let packet_id = packet.packet_id();
2915                if self.pid_puback.remove(&packet_id) {
2916                    self.store.erase(ResponsePacket::V3_1_1Puback, packet_id);
2917                    if self.pid_man.is_used_id(packet_id) {
2918                        self.pid_man.release_id(packet_id);
2919                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2920                    }
2921                    events.extend(self.refresh_pingreq_recv());
2922                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2923                } else {
2924                    events.push(GenericEvent::RequestClose);
2925                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2926                }
2927            }
2928            Err(e) => {
2929                events.push(GenericEvent::RequestClose);
2930                events.push(GenericEvent::NotifyError(e));
2931            }
2932        }
2933
2934        events
2935    }
2936
2937    fn process_recv_v5_0_puback(
2938        &mut self,
2939        raw_packet: RawPacket,
2940    ) -> Vec<GenericEvent<PacketIdType>> {
2941        let mut events = Vec::new();
2942
2943        match v5_0::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2944            Ok((packet, _)) => {
2945                let packet_id = packet.packet_id();
2946                if self.pid_puback.remove(&packet_id) {
2947                    self.store.erase(ResponsePacket::V5_0Puback, packet_id);
2948                    if self.pid_man.is_used_id(packet_id) {
2949                        self.pid_man.release_id(packet_id);
2950                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2951                    }
2952                    if self.publish_send_max.is_some() {
2953                        self.publish_send_count -= 1;
2954                    }
2955                    events.extend(self.refresh_pingreq_recv());
2956                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2957                } else {
2958                    let disconnect = v5_0::Disconnect::builder()
2959                        .reason_code(DisconnectReasonCode::ProtocolError)
2960                        .build()
2961                        .unwrap();
2962                    events.extend(self.process_send_v5_0_disconnect(disconnect));
2963                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2964                }
2965            }
2966            Err(e) => {
2967                let disconnect = v5_0::Disconnect::builder()
2968                    .reason_code(DisconnectReasonCode::ProtocolError)
2969                    .build()
2970                    .unwrap();
2971                events.extend(self.process_send_v5_0_disconnect(disconnect));
2972                events.push(GenericEvent::NotifyError(e));
2973            }
2974        }
2975
2976        events
2977    }
2978
2979    fn process_recv_v3_1_1_pubrec(
2980        &mut self,
2981        raw_packet: RawPacket,
2982    ) -> Vec<GenericEvent<PacketIdType>> {
2983        let mut events = Vec::new();
2984
2985        match v3_1_1::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2986            Ok((packet, _)) => {
2987                let packet_id = packet.packet_id();
2988                if self.pid_pubrec.remove(&packet_id) {
2989                    self.store.erase(ResponsePacket::V3_1_1Pubrec, packet_id);
2990                    if self.auto_pub_response && self.status == ConnectionStatus::Connected {
2991                        let pubrel = v3_1_1::GenericPubrel::<PacketIdType>::builder()
2992                            .packet_id(packet_id)
2993                            .build()
2994                            .unwrap();
2995                        events.extend(self.process_send_v3_1_1_pubrel(pubrel));
2996                    }
2997                    events.extend(self.refresh_pingreq_recv());
2998                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2999                } else {
3000                    events.push(GenericEvent::RequestClose);
3001                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3002                }
3003            }
3004            Err(e) => {
3005                events.push(GenericEvent::RequestClose);
3006                events.push(GenericEvent::NotifyError(e));
3007            }
3008        }
3009
3010        events
3011    }
3012
3013    fn process_recv_v5_0_pubrec(
3014        &mut self,
3015        raw_packet: RawPacket,
3016    ) -> Vec<GenericEvent<PacketIdType>> {
3017        let mut events = Vec::new();
3018
3019        match v5_0::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3020            Ok((packet, _)) => {
3021                let packet_id = packet.packet_id();
3022                if self.pid_pubrec.remove(&packet_id) {
3023                    self.store.erase(ResponsePacket::V5_0Pubrec, packet_id);
3024                    if let Some(reason_code) = packet.reason_code() {
3025                        if reason_code != PubrecReasonCode::Success {
3026                            if self.pid_man.is_used_id(packet_id) {
3027                                self.pid_man.release_id(packet_id);
3028                                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3029                            }
3030                            self.qos2_publish_processing.remove(&packet_id);
3031                            if self.publish_send_max.is_some() {
3032                                self.publish_send_count -= 1;
3033                            }
3034                        } else if self.auto_pub_response
3035                            && self.status == ConnectionStatus::Connected
3036                        {
3037                            let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3038                                .packet_id(packet_id)
3039                                .build()
3040                                .unwrap();
3041                            events.extend(self.process_send_v5_0_pubrel(pubrel));
3042                        }
3043                    } else if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3044                        let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3045                            .packet_id(packet_id)
3046                            .build()
3047                            .unwrap();
3048                        events.extend(self.process_send_v5_0_pubrel(pubrel));
3049                    }
3050                    events.extend(self.refresh_pingreq_recv());
3051                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3052                } else {
3053                    let disconnect = v5_0::Disconnect::builder()
3054                        .reason_code(DisconnectReasonCode::ProtocolError)
3055                        .build()
3056                        .unwrap();
3057                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3058                    events.push(GenericEvent::NotifyError(MqttError::from(
3059                        DisconnectReasonCode::ProtocolError,
3060                    )));
3061                }
3062            }
3063            Err(e) => {
3064                let disconnect = v5_0::Disconnect::builder()
3065                    .reason_code(DisconnectReasonCode::ProtocolError)
3066                    .build()
3067                    .unwrap();
3068                events.extend(self.process_send_v5_0_disconnect(disconnect));
3069                events.push(GenericEvent::NotifyError(e));
3070            }
3071        }
3072
3073        events
3074    }
3075
3076    fn process_recv_v3_1_1_pubrel(
3077        &mut self,
3078        raw_packet: RawPacket,
3079    ) -> Vec<GenericEvent<PacketIdType>> {
3080        let mut events = Vec::new();
3081
3082        match v3_1_1::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3083            Ok((packet, _)) => {
3084                let packet_id = packet.packet_id();
3085                self.qos2_publish_handled.remove(&packet_id);
3086                if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3087                    let pubcomp = v3_1_1::GenericPubcomp::<PacketIdType>::builder()
3088                        .packet_id(packet_id)
3089                        .build()
3090                        .unwrap();
3091                    events.extend(self.process_send_v3_1_1_pubcomp(pubcomp));
3092                }
3093                events.extend(self.refresh_pingreq_recv());
3094                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3095            }
3096            Err(e) => {
3097                events.push(GenericEvent::RequestClose);
3098                events.push(GenericEvent::NotifyError(e));
3099            }
3100        }
3101
3102        events
3103    }
3104
3105    fn process_recv_v5_0_pubrel(
3106        &mut self,
3107        raw_packet: RawPacket,
3108    ) -> Vec<GenericEvent<PacketIdType>> {
3109        let mut events = Vec::new();
3110
3111        match v5_0::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3112            Ok((packet, _)) => {
3113                let packet_id = packet.packet_id();
3114                self.qos2_publish_handled.remove(&packet_id);
3115                if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3116                    let pubcomp = v5_0::GenericPubcomp::<PacketIdType>::builder()
3117                        .packet_id(packet_id)
3118                        .build()
3119                        .unwrap();
3120                    events.extend(self.process_send_v5_0_pubcomp(pubcomp));
3121                }
3122                events.extend(self.refresh_pingreq_recv());
3123                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3124            }
3125            Err(e) => {
3126                let disconnect = v5_0::Disconnect::builder()
3127                    .reason_code(DisconnectReasonCode::ProtocolError)
3128                    .build()
3129                    .unwrap();
3130                events.extend(self.process_send_v5_0_disconnect(disconnect));
3131                events.push(GenericEvent::NotifyError(e));
3132            }
3133        }
3134
3135        events
3136    }
3137
3138    fn process_recv_v3_1_1_pubcomp(
3139        &mut self,
3140        raw_packet: RawPacket,
3141    ) -> Vec<GenericEvent<PacketIdType>> {
3142        let mut events = Vec::new();
3143
3144        match v3_1_1::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3145            Ok((packet, _)) => {
3146                let packet_id = packet.packet_id();
3147                if self.pid_pubcomp.remove(&packet_id) {
3148                    self.store.erase(ResponsePacket::V3_1_1Pubcomp, packet_id);
3149                    if self.pid_man.is_used_id(packet_id) {
3150                        self.pid_man.release_id(packet_id);
3151                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3152                    }
3153                    self.qos2_publish_processing.remove(&packet_id);
3154                    events.extend(self.refresh_pingreq_recv());
3155                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3156                } else {
3157                    events.push(GenericEvent::RequestClose);
3158                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3159                }
3160            }
3161            Err(e) => {
3162                events.push(GenericEvent::RequestClose);
3163                events.push(GenericEvent::NotifyError(e));
3164            }
3165        }
3166
3167        events
3168    }
3169
3170    fn process_recv_v5_0_pubcomp(
3171        &mut self,
3172        raw_packet: RawPacket,
3173    ) -> Vec<GenericEvent<PacketIdType>> {
3174        let mut events = Vec::new();
3175
3176        match v5_0::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3177            Ok((packet, _)) => {
3178                let packet_id = packet.packet_id();
3179                if self.pid_pubcomp.remove(&packet_id) {
3180                    self.store.erase(ResponsePacket::V5_0Pubcomp, packet_id);
3181                    if self.pid_man.is_used_id(packet_id) {
3182                        self.pid_man.release_id(packet_id);
3183                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3184                    }
3185                    self.qos2_publish_processing.remove(&packet_id);
3186                    if self.publish_send_max.is_some() {
3187                        self.publish_send_count -= 1;
3188                    }
3189                    events.extend(self.refresh_pingreq_recv());
3190                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3191                } else {
3192                    let disconnect = v5_0::Disconnect::builder()
3193                        .reason_code(DisconnectReasonCode::ProtocolError)
3194                        .build()
3195                        .unwrap();
3196                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3197                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3198                }
3199            }
3200            Err(e) => {
3201                let disconnect = v5_0::Disconnect::builder()
3202                    .reason_code(DisconnectReasonCode::ProtocolError)
3203                    .build()
3204                    .unwrap();
3205                events.extend(self.process_send_v5_0_disconnect(disconnect));
3206                events.push(GenericEvent::NotifyError(e));
3207            }
3208        }
3209
3210        events
3211    }
3212
3213    fn process_recv_v3_1_1_subscribe(
3214        &mut self,
3215        raw_packet: RawPacket,
3216    ) -> Vec<GenericEvent<PacketIdType>> {
3217        let mut events = Vec::new();
3218
3219        match v3_1_1::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3220            Ok((packet, _)) => {
3221                events.extend(self.refresh_pingreq_recv());
3222                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3223            }
3224            Err(e) => {
3225                events.push(GenericEvent::RequestClose);
3226                events.push(GenericEvent::NotifyError(e));
3227            }
3228        }
3229
3230        events
3231    }
3232
3233    fn process_recv_v5_0_subscribe(
3234        &mut self,
3235        raw_packet: RawPacket,
3236    ) -> Vec<GenericEvent<PacketIdType>> {
3237        let mut events = Vec::new();
3238
3239        match v5_0::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3240            Ok((packet, _)) => {
3241                events.extend(self.refresh_pingreq_recv());
3242                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3243            }
3244            Err(e) => {
3245                let disconnect = v5_0::Disconnect::builder()
3246                    .reason_code(DisconnectReasonCode::ProtocolError)
3247                    .build()
3248                    .unwrap();
3249                events.extend(self.process_send_v5_0_disconnect(disconnect));
3250                events.push(GenericEvent::NotifyError(e));
3251            }
3252        }
3253
3254        events
3255    }
3256
3257    fn process_recv_v3_1_1_suback(
3258        &mut self,
3259        raw_packet: RawPacket,
3260    ) -> Vec<GenericEvent<PacketIdType>> {
3261        let mut events = Vec::new();
3262
3263        match v3_1_1::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3264            Ok((packet, _)) => {
3265                let packet_id = packet.packet_id();
3266                if self.pid_suback.remove(&packet_id) {
3267                    if self.pid_man.is_used_id(packet_id) {
3268                        self.pid_man.release_id(packet_id);
3269                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3270                    }
3271                    events.extend(self.refresh_pingreq_recv());
3272                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3273                } else {
3274                    events.push(GenericEvent::RequestClose);
3275                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3276                }
3277            }
3278            Err(e) => {
3279                events.push(GenericEvent::RequestClose);
3280                events.push(GenericEvent::NotifyError(e));
3281            }
3282        }
3283
3284        events
3285    }
3286
3287    fn process_recv_v5_0_suback(
3288        &mut self,
3289        raw_packet: RawPacket,
3290    ) -> Vec<GenericEvent<PacketIdType>> {
3291        let mut events = Vec::new();
3292
3293        match v5_0::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3294            Ok((packet, _)) => {
3295                let packet_id = packet.packet_id();
3296                if self.pid_suback.remove(&packet_id) {
3297                    if self.pid_man.is_used_id(packet_id) {
3298                        self.pid_man.release_id(packet_id);
3299                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3300                    }
3301                    events.extend(self.refresh_pingreq_recv());
3302                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3303                } else {
3304                    let disconnect = v5_0::Disconnect::builder()
3305                        .reason_code(DisconnectReasonCode::ProtocolError)
3306                        .build()
3307                        .unwrap();
3308                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3309                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3310                }
3311            }
3312            Err(e) => {
3313                let disconnect = v5_0::Disconnect::builder()
3314                    .reason_code(DisconnectReasonCode::ProtocolError)
3315                    .build()
3316                    .unwrap();
3317                events.extend(self.process_send_v5_0_disconnect(disconnect));
3318                events.push(GenericEvent::NotifyError(e));
3319            }
3320        }
3321
3322        events
3323    }
3324
3325    fn process_recv_v3_1_1_unsubscribe(
3326        &mut self,
3327        raw_packet: RawPacket,
3328    ) -> Vec<GenericEvent<PacketIdType>> {
3329        let mut events = Vec::new();
3330
3331        match v3_1_1::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3332            Ok((packet, _)) => {
3333                events.extend(self.refresh_pingreq_recv());
3334                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3335            }
3336            Err(e) => {
3337                events.push(GenericEvent::RequestClose);
3338                events.push(GenericEvent::NotifyError(e));
3339            }
3340        }
3341
3342        events
3343    }
3344
3345    fn process_recv_v5_0_unsubscribe(
3346        &mut self,
3347        raw_packet: RawPacket,
3348    ) -> Vec<GenericEvent<PacketIdType>> {
3349        let mut events = Vec::new();
3350
3351        match v5_0::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3352            Ok((packet, _)) => {
3353                events.extend(self.refresh_pingreq_recv());
3354                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3355            }
3356            Err(e) => {
3357                let disconnect = v5_0::Disconnect::builder()
3358                    .reason_code(DisconnectReasonCode::ProtocolError)
3359                    .build()
3360                    .unwrap();
3361                events.extend(self.process_send_v5_0_disconnect(disconnect));
3362                events.push(GenericEvent::NotifyError(e));
3363            }
3364        }
3365
3366        events
3367    }
3368
3369    fn process_recv_v3_1_1_unsuback(
3370        &mut self,
3371        raw_packet: RawPacket,
3372    ) -> Vec<GenericEvent<PacketIdType>> {
3373        let mut events = Vec::new();
3374
3375        match v3_1_1::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3376            Ok((packet, _)) => {
3377                let packet_id = packet.packet_id();
3378                if self.pid_unsuback.remove(&packet_id) {
3379                    if self.pid_man.is_used_id(packet_id) {
3380                        self.pid_man.release_id(packet_id);
3381                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3382                    }
3383                    events.extend(self.refresh_pingreq_recv());
3384                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3385                } else {
3386                    events.push(GenericEvent::RequestClose);
3387                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3388                }
3389            }
3390            Err(e) => {
3391                events.push(GenericEvent::RequestClose);
3392                events.push(GenericEvent::NotifyError(e));
3393            }
3394        }
3395
3396        events
3397    }
3398
3399    fn process_recv_v5_0_unsuback(
3400        &mut self,
3401        raw_packet: RawPacket,
3402    ) -> Vec<GenericEvent<PacketIdType>> {
3403        let mut events = Vec::new();
3404
3405        match v5_0::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3406            Ok((packet, _)) => {
3407                let packet_id = packet.packet_id();
3408                if self.pid_unsuback.remove(&packet_id) {
3409                    if self.pid_man.is_used_id(packet_id) {
3410                        self.pid_man.release_id(packet_id);
3411                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3412                    }
3413                    events.extend(self.refresh_pingreq_recv());
3414                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3415                } else {
3416                    let disconnect = v5_0::Disconnect::builder()
3417                        .reason_code(DisconnectReasonCode::ProtocolError)
3418                        .build()
3419                        .unwrap();
3420                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3421                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3422                }
3423            }
3424            Err(e) => {
3425                let disconnect = v5_0::Disconnect::builder()
3426                    .reason_code(DisconnectReasonCode::ProtocolError)
3427                    .build()
3428                    .unwrap();
3429                events.extend(self.process_send_v5_0_disconnect(disconnect));
3430                events.push(GenericEvent::NotifyError(e));
3431            }
3432        }
3433
3434        events
3435    }
3436
3437    fn process_recv_v3_1_1_pingreq(
3438        &mut self,
3439        raw_packet: RawPacket,
3440    ) -> Vec<GenericEvent<PacketIdType>> {
3441        let mut events = Vec::new();
3442
3443        match v3_1_1::Pingreq::parse(raw_packet.data_as_slice()) {
3444            Ok((packet, _)) => {
3445                if (Role::IS_SERVER || Role::IS_ANY)
3446                    && !self.is_client
3447                    && self.auto_ping_response
3448                    && self.status == ConnectionStatus::Connected
3449                {
3450                    let pingresp = v3_1_1::Pingresp::new();
3451                    events.extend(self.process_send_v3_1_1_pingresp(pingresp));
3452                }
3453                events.extend(self.refresh_pingreq_recv());
3454                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3455            }
3456            Err(e) => {
3457                events.push(GenericEvent::RequestClose);
3458                events.push(GenericEvent::NotifyError(e));
3459            }
3460        }
3461
3462        events
3463    }
3464
3465    fn process_recv_v5_0_pingreq(
3466        &mut self,
3467        raw_packet: RawPacket,
3468    ) -> Vec<GenericEvent<PacketIdType>> {
3469        let mut events = Vec::new();
3470
3471        match v5_0::Pingreq::parse(raw_packet.data_as_slice()) {
3472            Ok((packet, _)) => {
3473                if (Role::IS_SERVER || Role::IS_ANY)
3474                    && !self.is_client
3475                    && self.auto_ping_response
3476                    && self.status == ConnectionStatus::Connected
3477                {
3478                    let pingresp = v5_0::Pingresp::new();
3479                    events.extend(self.process_send_v5_0_pingresp(pingresp));
3480                }
3481                events.extend(self.refresh_pingreq_recv());
3482                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3483            }
3484            Err(e) => {
3485                let disconnect = v5_0::Disconnect::builder()
3486                    .reason_code(DisconnectReasonCode::ProtocolError)
3487                    .build()
3488                    .unwrap();
3489                events.extend(self.process_send_v5_0_disconnect(disconnect));
3490                events.push(GenericEvent::NotifyError(e));
3491            }
3492        }
3493
3494        events
3495    }
3496
3497    fn process_recv_v3_1_1_pingresp(
3498        &mut self,
3499        raw_packet: RawPacket,
3500    ) -> Vec<GenericEvent<PacketIdType>> {
3501        let mut events = Vec::new();
3502
3503        match v3_1_1::Pingresp::parse(raw_packet.data_as_slice()) {
3504            Ok((packet, _)) => {
3505                self.pingresp_recv_set = false;
3506                events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3507                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3508            }
3509            Err(e) => {
3510                events.push(GenericEvent::RequestClose);
3511                events.push(GenericEvent::NotifyError(e));
3512            }
3513        }
3514
3515        events
3516    }
3517
3518    fn process_recv_v5_0_pingresp(
3519        &mut self,
3520        raw_packet: RawPacket,
3521    ) -> Vec<GenericEvent<PacketIdType>> {
3522        let mut events = Vec::new();
3523
3524        match v5_0::Pingresp::parse(raw_packet.data_as_slice()) {
3525            Ok((packet, _)) => {
3526                self.pingresp_recv_set = false;
3527                events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3528                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3529            }
3530            Err(e) => {
3531                let disconnect = v5_0::Disconnect::builder()
3532                    .reason_code(DisconnectReasonCode::ProtocolError)
3533                    .build()
3534                    .unwrap();
3535                events.extend(self.process_send_v5_0_disconnect(disconnect));
3536                events.push(GenericEvent::NotifyError(e));
3537            }
3538        }
3539
3540        events
3541    }
3542
3543    fn process_recv_v3_1_1_disconnect(
3544        &mut self,
3545        raw_packet: RawPacket,
3546    ) -> Vec<GenericEvent<PacketIdType>> {
3547        let mut events = Vec::new();
3548
3549        match v3_1_1::Disconnect::parse(raw_packet.data_as_slice()) {
3550            Ok((packet, _)) => {
3551                self.cancel_timers(&mut events);
3552                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3553            }
3554            Err(e) => {
3555                events.push(GenericEvent::RequestClose);
3556                events.push(GenericEvent::NotifyError(e));
3557            }
3558        }
3559
3560        events
3561    }
3562
3563    fn process_recv_v5_0_disconnect(
3564        &mut self,
3565        raw_packet: RawPacket,
3566    ) -> Vec<GenericEvent<PacketIdType>> {
3567        let mut events = Vec::new();
3568
3569        match v5_0::Disconnect::parse(raw_packet.data_as_slice()) {
3570            Ok((packet, _)) => {
3571                self.cancel_timers(&mut events);
3572                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3573            }
3574            Err(e) => {
3575                let disconnect = v5_0::Disconnect::builder()
3576                    .reason_code(DisconnectReasonCode::ProtocolError)
3577                    .build()
3578                    .unwrap();
3579                events.extend(self.process_send_v5_0_disconnect(disconnect));
3580                events.push(GenericEvent::NotifyError(e));
3581            }
3582        }
3583
3584        events
3585    }
3586
3587    fn process_recv_v5_0_auth(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
3588        let mut events = Vec::new();
3589
3590        match v5_0::Auth::parse(raw_packet.data_as_slice()) {
3591            Ok((packet, _)) => {
3592                events.extend(self.refresh_pingreq_recv());
3593                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3594            }
3595            Err(e) => {
3596                let disconnect = v5_0::Disconnect::builder()
3597                    .reason_code(DisconnectReasonCode::ProtocolError)
3598                    .build()
3599                    .unwrap();
3600                events.extend(self.process_send_v5_0_disconnect(disconnect));
3601                events.push(GenericEvent::NotifyError(e));
3602            }
3603        }
3604
3605        events
3606    }
3607
3608    fn refresh_pingreq_recv(&mut self) -> Vec<GenericEvent<PacketIdType>> {
3609        let mut events = Vec::new();
3610        if let Some(timeout_ms) = self.pingreq_recv_timeout_ms {
3611            if self.status == ConnectionStatus::Connecting
3612                || self.status == ConnectionStatus::Connected
3613            {
3614                self.pingreq_recv_set = true;
3615                events.push(GenericEvent::RequestTimerReset {
3616                    kind: TimerKind::PingreqRecv,
3617                    duration_ms: timeout_ms,
3618                });
3619            } else {
3620                self.pingreq_recv_set = false;
3621                events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3622            }
3623        }
3624
3625        events
3626    }
3627
3628    /// Cancel timers and collect events instead of calling handlers
3629    fn cancel_timers(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
3630        if self.pingreq_send_set {
3631            self.pingreq_send_set = false;
3632            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
3633        }
3634        if self.pingreq_recv_set {
3635            self.pingreq_recv_set = false;
3636            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3637        }
3638        if self.pingresp_recv_set {
3639            self.pingresp_recv_set = false;
3640            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3641        }
3642    }
3643
3644    /// Helper function to extract TopicAlias from properties
3645    fn get_topic_alias_from_props(props: &[Property]) -> Option<u16> {
3646        for prop in props {
3647            if let Property::TopicAlias(ta) = prop {
3648                return Some(ta.val());
3649            }
3650        }
3651        None
3652    }
3653
3654    #[allow(dead_code)]
3655    fn is_packet_id_used(&self, packet_id: PacketIdType) -> bool {
3656        self.pid_man.is_used_id(packet_id)
3657    }
3658}
3659
3660// traits
3661
3662pub trait RecvBehavior<Role, PacketIdType>
3663where
3664    PacketIdType: IsPacketId,
3665{
3666    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>>;
3667}
3668
3669// RecvBehavior implementations
3670impl<PacketIdType> RecvBehavior<role::Client, PacketIdType>
3671    for GenericConnection<role::Client, PacketIdType>
3672where
3673    PacketIdType: IsPacketId,
3674{
3675    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3676        self.recv(data)
3677    }
3678}
3679
3680impl<PacketIdType> RecvBehavior<role::Server, PacketIdType>
3681    for GenericConnection<role::Server, PacketIdType>
3682where
3683    PacketIdType: IsPacketId,
3684{
3685    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3686        self.recv(data)
3687    }
3688}
3689
3690impl<PacketIdType> RecvBehavior<role::Any, PacketIdType>
3691    for GenericConnection<role::Any, PacketIdType>
3692where
3693    PacketIdType: IsPacketId,
3694{
3695    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3696        self.recv(data)
3697    }
3698}
3699
3700// tests
3701
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mqtt::connection::version::Version;
    use crate::mqtt::packet::TopicAliasSend;
    use crate::mqtt::role;

    /// Builds a fresh MQTT v5.0 client-role connection with `u16` packet IDs,
    /// the configuration shared by most tests in this module.
    fn v5_client() -> GenericConnection<role::Client, u16> {
        GenericConnection::<role::Client, u16>::new(Version::V5_0)
    }

    // ---- initialize ------------------------------------------------------

    #[test]
    fn test_initialize_client_mode() {
        let mut conn = v5_client();

        conn.initialize(true);

        // Client flag is set and the publish/store state is at its defaults.
        assert!(conn.is_client);
        assert_eq!(conn.publish_send_count, 0);
        assert!(conn.publish_send_max.is_none());
        assert!(conn.publish_recv_max.is_none());
        assert!(!conn.need_store);
    }

    #[test]
    fn test_initialize_server_mode() {
        let mut conn = GenericConnection::<role::Server, u32>::new(Version::V3_1_1);

        conn.initialize(false);

        // Client flag is cleared and the publish/store state is at its defaults.
        assert!(!conn.is_client);
        assert_eq!(conn.publish_send_count, 0);
        assert!(conn.publish_send_max.is_none());
        assert!(conn.publish_recv_max.is_none());
        assert!(!conn.need_store);
    }

    // ---- validate_topic_alias / validate_topic_alias_range ---------------

    #[test]
    fn test_validate_topic_alias_no_topic_alias_send() {
        let mut conn = v5_client();

        // No `topic_alias_send` configured: nothing can be resolved.
        assert!(conn.validate_topic_alias(Some(1)).is_none());
    }

    #[test]
    fn test_validate_topic_alias_none_input() {
        let mut conn = v5_client();

        // No alias supplied: nothing to resolve.
        assert!(conn.validate_topic_alias(None).is_none());
    }

    #[test]
    fn test_validate_topic_alias_range_no_topic_alias_send() {
        let conn = v5_client();

        // No `topic_alias_send` configured: every alias is out of range.
        assert!(!conn.validate_topic_alias_range(1));
    }

    #[test]
    fn test_validate_topic_alias_range_zero() {
        let mut conn = v5_client();
        conn.topic_alias_send = Some(TopicAliasSend::new(10));

        // Alias 0 is rejected even though a maximum of 10 is configured.
        assert!(!conn.validate_topic_alias_range(0));
    }

    #[test]
    fn test_validate_topic_alias_range_over_max() {
        let mut conn = v5_client();
        conn.topic_alias_send = Some(TopicAliasSend::new(5));

        // One past the configured maximum of 5 is rejected.
        assert!(!conn.validate_topic_alias_range(6));
    }

    #[test]
    fn test_validate_topic_alias_range_valid() {
        let mut conn = v5_client();
        conn.topic_alias_send = Some(TopicAliasSend::new(10));

        // Lower bound, a middle value and the configured maximum are accepted.
        assert!(conn.validate_topic_alias_range(1));
        assert!(conn.validate_topic_alias_range(5));
        assert!(conn.validate_topic_alias_range(10));
    }

    #[test]
    fn test_validate_topic_alias_with_registered_alias() {
        let mut conn = v5_client();

        // Register alias 5 for "test/topic" before attaching the table.
        let mut aliases = TopicAliasSend::new(10);
        aliases.insert_or_update("test/topic", 5);
        conn.topic_alias_send = Some(aliases);

        // A registered alias resolves to its topic name.
        assert_eq!(
            conn.validate_topic_alias(Some(5)),
            Some("test/topic".to_string())
        );
    }

    #[test]
    fn test_validate_topic_alias_unregistered_alias() {
        let mut conn = v5_client();

        // Alias table exists (max 10) but holds no registrations.
        conn.topic_alias_send = Some(TopicAliasSend::new(10));

        // An in-range but unregistered alias resolves to nothing.
        assert!(conn.validate_topic_alias(Some(5)).is_none());
    }

    // ---- validate_maximum_packet_size_send -------------------------------

    #[test]
    fn test_validate_maximum_packet_size_within_limit() {
        let conn = v5_client();

        // With the default maximum_packet_size_send (u32::MAX) a small packet fits.
        assert!(conn.validate_maximum_packet_size_send(1000));
    }

    #[test]
    fn test_validate_maximum_packet_size_at_limit() {
        let mut conn = v5_client();
        conn.maximum_packet_size_send = 1000;

        // A size exactly equal to the limit is accepted.
        assert!(conn.validate_maximum_packet_size_send(1000));
    }

    #[test]
    fn test_validate_maximum_packet_size_over_limit() {
        let mut conn = v5_client();
        conn.maximum_packet_size_send = 1000;

        // One byte over the limit is rejected.
        assert!(!conn.validate_maximum_packet_size_send(1001));
    }

    #[test]
    fn test_validate_maximum_packet_size_zero_limit() {
        let mut conn = v5_client();
        conn.maximum_packet_size_send = 0;

        // With a zero limit any non-zero size is rejected ...
        assert!(!conn.validate_maximum_packet_size_send(1));

        // ... while a zero size is still accepted.
        assert!(conn.validate_maximum_packet_size_send(0));
    }

    // ---- initialize resets state -----------------------------------------

    #[test]
    fn test_initialize_clears_state() {
        let mut conn = v5_client();

        // Dirty the state that `initialize` is expected to reset.
        conn.publish_send_count = 5;
        conn.need_store = true;
        conn.pid_suback.insert(123);
        conn.pid_unsuback.insert(456);

        conn.initialize(true);

        // Everything dirtied above is back to its cleared value.
        assert_eq!(conn.publish_send_count, 0);
        assert!(!conn.need_store);
        assert!(conn.pid_suback.is_empty());
        assert!(conn.pid_unsuback.is_empty());
        assert!(conn.is_client);
    }

    // ---- remaining_length_to_total_size ----------------------------------

    #[test]
    fn test_remaining_length_to_total_size() {
        // Each case is (remaining length, expected total size), where the
        // expected total is 1 + <remaining-length field bytes> + remaining length.
        let cases = [
            // 1-byte remaining length encoding (0-127)
            (0, 2),
            (127, 129),
            // 2-byte remaining length encoding (128-16383)
            (128, 131),
            (16383, 16386),
            // 3-byte remaining length encoding (16384-2097151)
            (16384, 16388),
            (2097151, 2097155),
            // 4-byte remaining length encoding (2097152-268435455)
            (2097152, 2097157),
            (268435455, 268435460),
        ];

        for (remaining_length, expected_total) in cases {
            assert_eq!(
                remaining_length_to_total_size(remaining_length),
                expected_total
            );
        }
    }
}