mqtt_protocol_core/mqtt/connection/
core.rs

1/**
2 * MIT License
3 *
4 * Copyright (c) 2025 Takatoshi Kondo
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to deal
8 * in the Software without restriction, including without limitation the rights
9 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 * copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in all
14 * copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 * SOFTWARE.
23 */
24use std::collections::HashSet;
25use std::io::Cursor;
26use std::marker::PhantomData;
27
28use crate::mqtt::connection::event::{GenericEvent, TimerKind};
29use crate::mqtt::connection::GenericStore;
30
31use serde::Serialize;
32
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
34enum ConnectionStatus {
35    #[serde(rename = "disconnected")]
36    Disconnected,
37    #[serde(rename = "connecting")]
38    Connecting,
39    #[serde(rename = "connected")]
40    Connected,
41}
42use crate::mqtt::connection::packet_builder::{
43    PacketBuildResult, PacketBuilder, PacketData, RawPacket,
44};
45use crate::mqtt::connection::packet_id_manager::PacketIdManager;
46use crate::mqtt::connection::role;
47use crate::mqtt::connection::role::RoleType;
48use crate::mqtt::connection::sendable::Sendable;
49use crate::mqtt::connection::version::*;
50use crate::mqtt::packet::v3_1_1;
51use crate::mqtt::packet::v5_0;
52use crate::mqtt::packet::GenericPacket;
53use crate::mqtt::packet::GenericStorePacket;
54use crate::mqtt::packet::IsPacketId;
55use crate::mqtt::packet::Qos;
56use crate::mqtt::packet::ResponsePacket;
57use crate::mqtt::packet::{Property, TopicAliasRecv, TopicAliasSend};
58use crate::mqtt::prelude::GenericPacketTrait;
59use crate::mqtt::result_code::{
60    ConnectReasonCode, ConnectReturnCode, DisconnectReasonCode, MqttError, PubrecReasonCode,
61};
62
63/// MQTT protocol maximum packet size limit
64/// 1 (fixed header) + 4 (remaining length) + 128^4 (maximum remaining length value)
65const MQTT_PACKET_SIZE_NO_LIMIT: u32 = 1 + 4 + 128 * 128 * 128 * 128;
66
67/// Calculate total packet size from remaining length
68///
69/// The total packet size consists of:
70/// - 1 byte for the fixed header
71/// - 1-4 bytes for the remaining length encoding
72/// - The remaining length value itself
73fn remaining_length_to_total_size(remaining_length: u32) -> u32 {
74    let remaining_length_bytes = if remaining_length < 128 {
75        1
76    } else if remaining_length < 16384 {
77        2
78    } else if remaining_length < 2097152 {
79        3
80    } else {
81        4
82    };
83
84    1 + remaining_length_bytes + remaining_length
85}
86
87/// Type alias for Event with u16 packet ID (most common case)
88///
89/// This is a convenience type alias that most applications will use.
90/// It uses `u16` for packet IDs, which is the standard MQTT packet ID type.
91pub type Event = GenericEvent<u16>;
92
93/// Generic MQTT Connection - Core Sans-I/O MQTT protocol implementation
94///
95/// This struct represents the core MQTT protocol logic in a Sans-I/O (synchronous I/O-independent) design.
96/// It handles MQTT packet processing, state management, and protocol compliance without performing
97/// any actual network I/O operations. Instead, it returns events that the application must handle.
98///
99/// # Type Parameters
100///
101/// * `Role` - The connection role (Client or Server), determining allowed operations
102/// * `PacketIdType` - The type used for packet IDs (typically `u16`, but can be `u32` for extended scenarios)
103///
104/// # Key Features
105///
106/// - **Sans-I/O Design**: No network I/O is performed directly; events are returned for external handling
107/// - **Protocol Compliance**: Implements MQTT v3.1.1 and v5.0 specifications
108/// - **State Management**: Tracks connection state, packet IDs, and protocol flows
109/// - **Configurable Behavior**: Supports various configuration options for different use cases
110/// - **Generic Packet ID Support**: Can use u16 or u32 packet IDs for different deployment scenarios
111///
112/// # Usage
113///
114/// ```ignore
115/// use mqtt_protocol_core::mqtt;
116///
117/// let mut connection = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
118///
119/// // Send a packet
120/// let events = connection.send(publish_packet);
121/// for event in events {
122///     match event {
123///         mqtt::connection::Event::RequestSendPacket { packet, .. } => {
124///             // Send packet over network
125///         },
126///         // Handle other events...
127///     }
128/// }
129///
130/// // Receive data
131/// let mut cursor = std::io::Cursor::new(received_data);
132/// let events = connection.recv(&mut cursor);
133/// // Process events...
134/// ```
135pub struct GenericConnection<Role, PacketIdType>
136where
137    Role: RoleType,
138    PacketIdType: IsPacketId,
139{
140    _marker: PhantomData<Role>,
141
142    protocol_version: Version,
143
144    pid_man: PacketIdManager<PacketIdType>,
145    pid_suback: HashSet<PacketIdType>,
146    pid_unsuback: HashSet<PacketIdType>,
147    pid_puback: HashSet<PacketIdType>,
148    pid_pubrec: HashSet<PacketIdType>,
149    pid_pubcomp: HashSet<PacketIdType>,
150
151    need_store: bool,
152    // Store for retransmission packets
153    store: GenericStore<PacketIdType>,
154
155    offline_publish: bool,
156    auto_pub_response: bool,
157    auto_ping_response: bool,
158
159    // Auto map topic alias for sending
160    auto_map_topic_alias_send: bool,
161    // Auto replace topic alias for sending
162    auto_replace_topic_alias_send: bool,
163    // Topic alias management for receiving
164    topic_alias_recv: Option<TopicAliasRecv>,
165    // Topic alias management for sending
166    topic_alias_send: Option<TopicAliasSend>,
167
168    publish_send_max: Option<u16>,
169    // Maximum number of concurrent PUBLISH packets for receiving
170    publish_recv_max: Option<u16>,
171    // Maximum number of concurrent PUBLISH packets for sending
172    // Current count of PUBLISH packets being sent
173    publish_send_count: u16,
174
175    // Set of received PUBLISH packets (for flow control)
176    publish_recv: HashSet<PacketIdType>,
177
178    // Maximum packet size for sending
179    maximum_packet_size_send: u32,
180    // Maximum packet size for receiving
181    maximum_packet_size_recv: u32,
182
183    // Connection state
184    status: ConnectionStatus,
185
186    // PINGREQ send interval in milliseconds
187    pingreq_send_interval_ms: Option<u64>,
188    // PINGREQ receive timeout in milliseconds
189    pingreq_recv_timeout_ms: Option<u64>,
190    // PINGRESP receive timeout in milliseconds
191    pingresp_recv_timeout_ms: Option<u64>,
192
193    // QoS2 PUBLISH packet handling state (for duplicate detection)
194    qos2_publish_handled: HashSet<PacketIdType>,
195    // QoS2 PUBLISH packet processing state
196    qos2_publish_processing: HashSet<PacketIdType>,
197
198    // Timer state flags
199    pingreq_send_set: bool,
200    pingreq_recv_set: bool,
201    pingresp_recv_set: bool,
202
203    packet_builder: PacketBuilder,
204    // Client/Server mode flag
205    is_client: bool,
206}
207
208/// Type alias for Connection with u16 packet ID (standard case)
209///
210/// This is the standard Connection type that most applications will use.
211/// It uses `u16` for packet IDs, which is the standard MQTT packet ID type
212/// supporting values from 1 to 65535.
213///
214/// For extended scenarios where larger packet ID ranges are needed
215/// (such as broker clusters), use `GenericConnection<Role, u32>` directly.
216///
217/// # Type Parameters
218///
219/// * `Role` - The connection role (typically `role::Client` or `role::Server`)
220pub type Connection<Role> = GenericConnection<Role, u16>;
221
222impl<Role, PacketIdType> GenericConnection<Role, PacketIdType>
223where
224    Role: RoleType,
225    PacketIdType: IsPacketId,
226{
227    /// Create a new MQTT connection instance
228    ///
229    /// Initializes a new MQTT connection with the specified protocol version.
230    /// The connection starts in a disconnected state and must be activated through
231    /// the connection handshake process (CONNECT/CONNACK).
232    ///
233    /// # Parameters
234    ///
235    /// * `version` - The MQTT protocol version to use (V3_1_1 or V5_0)
236    ///
237    /// # Returns
238    ///
239    /// A new `GenericConnection` instance ready for use
240    ///
241    /// # Examples
242    ///
243    /// ```ignore
244    /// use mqtt_protocol_core::mqtt;
245    ///
246    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
247    /// let mut server = mqtt::connection::Connection::<mqtt::connection::role::Server>::new(mqtt::version::Version::V3_1_1);
248    /// ```
249    pub fn new(version: Version) -> Self {
250        Self {
251            _marker: PhantomData,
252            protocol_version: version,
253            pid_man: PacketIdManager::new(),
254            pid_suback: HashSet::new(),
255            pid_unsuback: HashSet::new(),
256            pid_puback: HashSet::new(),
257            pid_pubrec: HashSet::new(),
258            pid_pubcomp: HashSet::new(),
259            need_store: false,
260            store: GenericStore::new(),
261            offline_publish: false,
262            auto_pub_response: false,
263            auto_ping_response: false,
264            auto_map_topic_alias_send: false,
265            auto_replace_topic_alias_send: false,
266            topic_alias_recv: None,
267            topic_alias_send: None,
268            publish_send_max: None,
269            publish_recv_max: None,
270            publish_send_count: 0,
271            publish_recv: HashSet::new(),
272            maximum_packet_size_send: MQTT_PACKET_SIZE_NO_LIMIT,
273            maximum_packet_size_recv: MQTT_PACKET_SIZE_NO_LIMIT,
274            status: ConnectionStatus::Disconnected,
275            pingreq_send_interval_ms: None,
276            pingreq_recv_timeout_ms: None,
277            pingresp_recv_timeout_ms: None,
278            qos2_publish_handled: HashSet::new(),
279            qos2_publish_processing: HashSet::new(),
280            pingreq_send_set: false,
281            pingreq_recv_set: false,
282            pingresp_recv_set: false,
283            packet_builder: PacketBuilder::new(),
284            is_client: false,
285        }
286    }
287
288    // public
289
290    /// Send MQTT packet with compile-time role checking (experimental)
291    ///
292    /// This method provides compile-time verification that the packet being sent
293    /// is allowed for the current connection role (Client or Server). It only works
294    /// when the `Role` type parameter is concrete (not generic).
295    ///
296    /// This is an experimental API that may be subject to change. For general use,
297    /// consider using the `send()` method instead.
298    ///
299    /// # Type Parameters
300    ///
301    /// * `T` - The packet type that must implement `Sendable<Role, PacketIdType>`
302    ///
303    /// # Parameters
304    ///
305    /// * `packet` - The MQTT packet to send
306    ///
307    /// # Returns
308    ///
309    /// A vector of events that the application must process
310    ///
311    /// # Compile-time Safety
312    ///
313    /// If the packet type is not allowed for the current role, this will result
314    /// in a compile-time error, preventing protocol violations at development time.
315    ///
316    /// # Examples
317    ///
318    /// ```ignore
319    /// use mqtt_protocol_core::mqtt;
320    ///
321    /// // This works for concrete roles
322    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
323    /// let events = client.checked_send(connect_packet); // OK - clients can send CONNECT
324    ///
325    /// // This would cause a compile error
326    /// // let events = client.checked_send(connack_packet); // Error - clients cannot send CONNACK
327    /// ```
328    pub fn checked_send<T>(&mut self, packet: T) -> Vec<GenericEvent<PacketIdType>>
329    where
330        T: Sendable<Role, PacketIdType>,
331    {
332        // dispatch concrete packet or generic packet
333        packet.dispatch_send(self)
334    }
335
336    /// Send MQTT packet with runtime role validation
337    ///
338    /// This is the primary method for sending MQTT packets. It accepts any `GenericPacket`
339    /// and performs runtime validation to ensure the packet is allowed for the current
340    /// connection role. This provides flexibility when the exact packet type is not known
341    /// at compile time.
342    ///
343    /// # Parameters
344    ///
345    /// * `packet` - The MQTT packet to send
346    ///
347    /// # Returns
348    ///
349    /// A vector of events that the application must process. If the packet is not allowed
350    /// for the current role, a `NotifyError` event will be included.
351    ///
352    /// # Validation
353    ///
354    /// The method validates that:
355    /// - The packet type is allowed for the connection role (Client vs Server)
356    /// - Protocol version compatibility
357    /// - Connection state requirements
358    /// - Packet ID management for QoS > 0 packets
359    ///
360    /// # Examples
361    ///
362    /// ```ignore
363    /// use mqtt_protocol_core::mqtt;
364    ///
365    /// let mut client = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
366    /// let events = client.send(mqtt::packet::GenericPacket::V5_0(mqtt::packet::v5_0::Packet::Connect(connect_packet)));
367    ///
368    /// for event in events {
369    ///     match event {
370    ///         mqtt::connection::Event::RequestSendPacket { packet, .. } => {
371    ///             // Send packet over network
372    ///         },
373    ///         mqtt::connection::Event::NotifyError(error) => {
374    ///             // Handle validation errors
375    ///         },
376    ///         // Handle other events...
377    ///     }
378    /// }
379    /// ```
380    pub fn send(&mut self, packet: GenericPacket<PacketIdType>) -> Vec<GenericEvent<PacketIdType>> {
381        use std::any::TypeId;
382
383        let role_id = TypeId::of::<Role>();
384        let client_id = TypeId::of::<role::Client>();
385        let server_id = TypeId::of::<role::Server>();
386        let any_id = TypeId::of::<role::Any>();
387
388        // Check version compatibility between connection and packet
389        let packet_version = packet.protocol_version();
390
391        // Return error if versions don't match
392        if self.protocol_version != packet_version {
393            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
394        }
395
396        match packet {
397            // CONNECT - Client/Any can send
398            GenericPacket::V3_1_1Connect(p) => {
399                if role_id == client_id || role_id == any_id {
400                    self.process_send_v3_1_1_connect(p)
401                } else {
402                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
403                }
404            }
405            GenericPacket::V5_0Connect(p) => {
406                if role_id == client_id || role_id == any_id {
407                    self.process_send_v5_0_connect(p)
408                } else {
409                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
410                }
411            }
412            // CONNACK - Server/Any can send
413            GenericPacket::V3_1_1Connack(p) => {
414                if role_id == server_id || role_id == any_id {
415                    self.process_send_v3_1_1_connack(p)
416                } else {
417                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
418                }
419            }
420            GenericPacket::V5_0Connack(p) => {
421                if role_id == server_id || role_id == any_id {
422                    self.process_send_v5_0_connack(p)
423                } else {
424                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
425                }
426            }
427            // PUBLISH - Any role can send
428            GenericPacket::V3_1_1Publish(p) => self.process_send_v3_1_1_publish(p),
429            GenericPacket::V5_0Publish(p) => self.process_send_v5_0_publish(p),
430            // PUBACK/PUBREC/PUBREL/PUBCOMP - Any role can send
431            GenericPacket::V3_1_1Puback(p) => self.process_send_v3_1_1_puback(p),
432            GenericPacket::V5_0Puback(p) => self.process_send_v5_0_puback(p),
433            GenericPacket::V3_1_1Pubrec(p) => self.process_send_v3_1_1_pubrec(p),
434            GenericPacket::V5_0Pubrec(p) => self.process_send_v5_0_pubrec(p),
435            GenericPacket::V3_1_1Pubrel(p) => self.process_send_v3_1_1_pubrel(p),
436            GenericPacket::V5_0Pubrel(p) => self.process_send_v5_0_pubrel(p),
437            GenericPacket::V3_1_1Pubcomp(p) => self.process_send_v3_1_1_pubcomp(p),
438            GenericPacket::V5_0Pubcomp(p) => self.process_send_v5_0_pubcomp(p),
439            // SUBSCRIBE/UNSUBSCRIBE - Client/Any can send
440            GenericPacket::V3_1_1Subscribe(p) => {
441                if role_id == client_id || role_id == any_id {
442                    self.process_send_v3_1_1_subscribe(p)
443                } else {
444                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
445                }
446            }
447            GenericPacket::V5_0Subscribe(p) => {
448                if role_id == client_id || role_id == any_id {
449                    self.process_send_v5_0_subscribe(p)
450                } else {
451                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
452                }
453            }
454            GenericPacket::V3_1_1Unsubscribe(p) => {
455                if role_id == client_id || role_id == any_id {
456                    self.process_send_v3_1_1_unsubscribe(p)
457                } else {
458                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
459                }
460            }
461            GenericPacket::V5_0Unsubscribe(p) => {
462                if role_id == client_id || role_id == any_id {
463                    self.process_send_v5_0_unsubscribe(p)
464                } else {
465                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
466                }
467            }
468            // SUBACK/UNSUBACK - Server/Any can send
469            GenericPacket::V3_1_1Suback(p) => {
470                if role_id == server_id || role_id == any_id {
471                    self.process_send_v3_1_1_suback(p)
472                } else {
473                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
474                }
475            }
476            GenericPacket::V5_0Suback(p) => {
477                if role_id == server_id || role_id == any_id {
478                    self.process_send_v5_0_suback(p)
479                } else {
480                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
481                }
482            }
483            GenericPacket::V3_1_1Unsuback(p) => {
484                if role_id == server_id || role_id == any_id {
485                    self.process_send_v3_1_1_unsuback(p)
486                } else {
487                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
488                }
489            }
490            GenericPacket::V5_0Unsuback(p) => {
491                if role_id == server_id || role_id == any_id {
492                    self.process_send_v5_0_unsuback(p)
493                } else {
494                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
495                }
496            }
497            // PINGREQ - Client/Any can send
498            GenericPacket::V3_1_1Pingreq(p) => {
499                if role_id == client_id || role_id == any_id {
500                    self.process_send_v3_1_1_pingreq(p)
501                } else {
502                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
503                }
504            }
505            GenericPacket::V5_0Pingreq(p) => {
506                if role_id == client_id || role_id == any_id {
507                    self.process_send_v5_0_pingreq(p)
508                } else {
509                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
510                }
511            }
512            // PINGRESP - Server/Any can send
513            GenericPacket::V3_1_1Pingresp(p) => {
514                if role_id == server_id || role_id == any_id {
515                    self.process_send_v3_1_1_pingresp(p)
516                } else {
517                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
518                }
519            }
520            GenericPacket::V5_0Pingresp(p) => {
521                if role_id == server_id || role_id == any_id {
522                    self.process_send_v5_0_pingresp(p)
523                } else {
524                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
525                }
526            }
527            // DISCONNECT(v3.1.1) - Client/Any role can send
528            GenericPacket::V3_1_1Disconnect(p) => {
529                if role_id == client_id || role_id == any_id {
530                    self.process_send_v3_1_1_disconnect(p)
531                } else {
532                    vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)]
533                }
534            }
535            // DISCONNECT(v5.0) - Any role can send
536            GenericPacket::V5_0Disconnect(p) => self.process_send_v5_0_disconnect(p),
537            // AUTH - Any role can send (v5.0 only)
538            GenericPacket::V5_0Auth(p) => self.process_send_v5_0_auth(p),
539        }
540    }
541
542    /// Receive and process incoming MQTT data
543    ///
544    /// This method processes raw bytes received from the network and attempts to
545    /// parse them into MQTT packets. It handles packet fragmentation and can
546    /// process multiple complete packets from a single data buffer.
547    ///
548    /// # Parameters
549    ///
550    /// * `data` - A cursor over the received data bytes. The cursor position will
551    ///   be advanced as data is consumed.
552    ///
553    /// # Returns
554    ///
555    /// A vector of events generated from processing the received data:
556    /// - `NotifyPacketReceived` for successfully parsed packets
557    /// - `NotifyError` for parsing errors or protocol violations
558    /// - Additional events based on packet processing (timers, responses, etc.)
559    ///
560    /// # Behavior
561    ///
562    /// - Handles partial packets (data will be buffered until complete)
563    /// - Processes multiple complete packets in sequence
564    /// - Validates packet structure and protocol compliance
565    /// - Updates internal connection state based on received packets
566    /// - Generates appropriate response events (ACKs, etc.)
567    ///
568    /// # Examples
569    ///
570    /// ```ignore
571    /// use mqtt_protocol_core::mqtt;
572    ///
573    /// let mut connection = mqtt::connection::Connection::<mqtt::connection::role::Client>::new(mqtt::version::Version::V5_0);
574    /// let received_data = [/* network data */];
575    /// let mut cursor = std::io::Cursor::new(&received_data[..]);
576    ///
577    /// let events = connection.recv(&mut cursor);
578    /// for event in events {
579    ///     match event {
580    ///         mqtt::connection::Event::NotifyPacketReceived(packet) => {
581    ///             // Process received packet
582    ///         },
583    ///         mqtt::connection::Event::NotifyError(error) => {
584    ///             // Handle parsing/protocol errors
585    ///         },
586    ///         // Handle other events...
587    ///     }
588    /// }
589    /// ```
590    pub fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
591        let mut events = Vec::new();
592
593        match self.packet_builder.feed(data) {
594            PacketBuildResult::Complete(raw_packet) => {
595                events.extend(self.process_recv_packet(raw_packet));
596            }
597            PacketBuildResult::Incomplete => {}
598            PacketBuildResult::Error(e) => {
599                self.cancel_timers(&mut events);
600                events.push(GenericEvent::RequestClose);
601                events.push(GenericEvent::NotifyError(e));
602            }
603        }
604
605        events
606    }
607
608    /// Notify that a timer has fired (Event-based API)
609    ///
610    /// This method should be called when the I/O layer detects that a timer has expired.
611    /// It handles the timer event appropriately and returns events that need to be processed.
612    /// Notify that a timer has fired
613    ///
614    /// This method should be called by the application when a previously requested
615    /// timer expires. The connection will take appropriate action based on the timer type.
616    ///
617    /// # Parameters
618    ///
619    /// * `kind` - The type of timer that fired
620    ///
621    /// # Returns
622    ///
623    /// Events generated from timer processing (e.g., sending PINGREQ, connection timeouts)
624    pub fn notify_timer_fired(&mut self, kind: TimerKind) -> Vec<GenericEvent<PacketIdType>> {
625        let mut events = Vec::new();
626
627        match kind {
628            TimerKind::PingreqSend => {
629                // Reset timer flag
630                self.pingreq_send_set = false;
631
632                // Send PINGREQ if connected
633                if self.status == ConnectionStatus::Connected {
634                    match self.protocol_version {
635                        Version::V3_1_1 => {
636                            if let Ok(pingreq) = v3_1_1::Pingreq::builder().build() {
637                                events.extend(self.process_send_v3_1_1_pingreq(pingreq));
638                            }
639                        }
640                        Version::V5_0 => {
641                            if let Ok(pingreq) = v5_0::Pingreq::builder().build() {
642                                events.extend(self.process_send_v5_0_pingreq(pingreq));
643                            }
644                        }
645                        Version::Undetermined => {
646                            unreachable!("Protocol version should be set before sending PINGREQ");
647                        }
648                    }
649                }
650            }
651            TimerKind::PingreqRecv => {
652                // Reset timer flag
653                self.pingreq_recv_set = false;
654
655                match self.protocol_version {
656                    Version::V3_1_1 => {
657                        // V3.1.1: Close connection
658                        events.push(GenericEvent::RequestClose);
659                    }
660                    Version::V5_0 => {
661                        // V5.0: Send DISCONNECT with keep_alive_timeout if connected
662                        if self.status == ConnectionStatus::Connected {
663                            if let Ok(disconnect) = v5_0::Disconnect::builder()
664                                .reason_code(DisconnectReasonCode::KeepAliveTimeout)
665                                .build()
666                            {
667                                events.extend(self.process_send_v5_0_disconnect(disconnect));
668                            }
669                        }
670                    }
671                    Version::Undetermined => {
672                        unreachable!("Protocol version should be set before receiving PINGREQ");
673                    }
674                }
675            }
676            TimerKind::PingrespRecv => {
677                // Reset timer flag
678                self.pingresp_recv_set = false;
679
680                match self.protocol_version {
681                    Version::V3_1_1 => {
682                        // V3.1.1: Close connection
683                        events.push(GenericEvent::RequestClose);
684                    }
685                    Version::V5_0 => {
686                        // V5.0: Send DISCONNECT with keep_alive_timeout if connected
687                        if self.status == ConnectionStatus::Connected {
688                            if let Ok(disconnect) = v5_0::Disconnect::builder()
689                                .reason_code(DisconnectReasonCode::KeepAliveTimeout)
690                                .build()
691                            {
692                                events.extend(self.process_send_v5_0_disconnect(disconnect));
693                            }
694                        }
695                    }
696                    Version::Undetermined => {
697                        unreachable!("Protocol version should be set before receiving PINGRESP");
698                    }
699                }
700            }
701        }
702
703        events
704    }
705
706    /// Notify that the connection has been closed by the I/O layer (Event-based API)
707    ///
708    /// This method should be called when the I/O layer detects that the socket has been closed.
709    /// It updates the internal state appropriately and returns events that need to be processed.
710    /// Notify that the underlying connection has been closed
711    ///
712    /// This method should be called when the network connection is closed,
713    /// either intentionally or due to network issues.
714    ///
715    /// # Returns
716    ///
717    /// Events generated from connection closure processing
718    pub fn notify_closed(&mut self) -> Vec<GenericEvent<PacketIdType>> {
719        let mut events = Vec::new();
720
721        // Reset packet size limits to MQTT protocol maximum
722        self.maximum_packet_size_send = MQTT_PACKET_SIZE_NO_LIMIT;
723        self.maximum_packet_size_recv = MQTT_PACKET_SIZE_NO_LIMIT;
724
725        // Set status to disconnected
726        self.status = ConnectionStatus::Disconnected;
727
728        // Clear topic alias management
729        self.topic_alias_send = None;
730        self.topic_alias_recv = None;
731
732        // Release packet IDs for SUBACK
733        for packet_id in self.pid_suback.drain() {
734            if self.pid_man.is_used_id(packet_id) {
735                self.pid_man.release_id(packet_id);
736                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
737            }
738        }
739
740        // Release packet IDs for UNSUBACK
741        for packet_id in self.pid_unsuback.drain() {
742            if self.pid_man.is_used_id(packet_id) {
743                self.pid_man.release_id(packet_id);
744                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
745            }
746        }
747
748        // If not storing session state, clear QoS2 states and release publish-related packet IDs
749        if !self.need_store {
750            self.qos2_publish_processing.clear();
751            self.qos2_publish_handled.clear();
752
753            // Release packet IDs for PUBACK
754            for packet_id in self.pid_puback.drain() {
755                if self.pid_man.is_used_id(packet_id) {
756                    self.pid_man.release_id(packet_id);
757                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
758                }
759            }
760
761            // Release packet IDs for PUBREC
762            for packet_id in self.pid_pubrec.drain() {
763                if self.pid_man.is_used_id(packet_id) {
764                    self.pid_man.release_id(packet_id);
765                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
766                }
767            }
768
769            // Release packet IDs for PUBCOMP
770            for packet_id in self.pid_pubcomp.drain() {
771                if self.pid_man.is_used_id(packet_id) {
772                    self.pid_man.release_id(packet_id);
773                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
774                }
775            }
776        }
777
778        // Cancel all timers
779        self.cancel_timers(&mut events);
780
781        events
782    }
783
784    /// Set the PINGREQ send interval
785    ///
786    /// Sets the interval for sending PINGREQ packets to maintain the connection alive.
787    /// When changed, this may generate timer-related events to update the ping schedule.
788    ///
789    /// # Parameters
790    ///
791    /// * `duration_ms` - The interval in milliseconds between PINGREQ packets
792    ///
793    /// # Returns
794    ///
795    /// Events generated from updating the ping interval
796    pub fn set_pingreq_send_interval(
797        &mut self,
798        duration_ms: u64,
799    ) -> Vec<GenericEvent<PacketIdType>> {
800        let mut events = Vec::new();
801
802        if duration_ms == 0 {
803            self.pingreq_send_interval_ms = None;
804            self.pingreq_send_set = false;
805            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
806        } else {
807            self.pingreq_send_interval_ms = Some(duration_ms);
808            self.pingreq_send_set = true;
809            events.push(GenericEvent::RequestTimerReset {
810                kind: TimerKind::PingreqSend,
811                duration_ms,
812            });
813        }
814
815        events
816    }
817
818    /// Get the remaining capacity for sending PUBLISH packets
819    ///
820    /// Returns the number of additional PUBLISH packets that can be sent
821    /// without exceeding the receive maximum limit.
822    ///
823    /// # Returns
824    ///
825    /// The remaining capacity for outgoing PUBLISH packets, or `None` if no limit is set
826    pub fn get_receive_maximum_vacancy_for_send(&self) -> Option<u16> {
827        // If publish_recv_max is set, return the remaining capacity
828        self.publish_send_max
829            .map(|max| max.saturating_sub(self.publish_send_count))
830    }
831
832    /// Enable or disable offline publishing
833    ///
834    /// When enabled, PUBLISH packets can be sent even when disconnected.
835    /// They will be queued and sent once the connection is established.
836    ///
837    /// # Parameters
838    ///
839    /// * `enable` - Whether to enable offline publishing
840    pub fn set_offline_publish(&mut self, enable: bool) {
841        self.offline_publish = enable;
842        if self.offline_publish {
843            self.need_store = true;
844        }
845    }
846
847    /// Enable or disable automatic PUBLISH response generation
848    ///
849    /// When enabled, appropriate response packets (PUBACK, PUBREC, PUBREL, and PUBCOMP.)
850    /// are automatically generated for received PUBLISH packets.
851    ///
852    /// # Parameters
853    ///
854    /// * `enable` - Whether to enable automatic responses
855    pub fn set_auto_pub_response(&mut self, enable: bool) {
856        self.auto_pub_response = enable;
857    }
858
859    /// Enable or disable automatic PING response generation
860    ///
861    /// When enabled, PINGRESP packets are automatically sent in response to PINGREQ.
862    ///
863    /// # Parameters
864    ///
865    /// * `enable` - Whether to enable automatic PING responses
866    pub fn set_auto_ping_response(&mut self, enable: bool) {
867        self.auto_ping_response = enable;
868    }
869
870    /// Enable or disable automatic topic alias mapping for outgoing packets
871    ///
872    /// When enabled, the connection will automatically map topics to aliases
873    /// for outgoing PUBLISH packets to reduce bandwidth usage. This includes:
874    /// - Applying existing registered topic aliases when available
875    /// - Allocating new topic aliases for unregistered topics
876    /// - Using LRU algorithm to overwrite the least recently used alias when all aliases are in use
877    ///
878    /// # Parameters
879    ///
880    /// * `enable` - Whether to enable automatic topic alias mapping
881    pub fn set_auto_map_topic_alias_send(&mut self, enable: bool) {
882        self.auto_map_topic_alias_send = enable;
883    }
884
885    /// Enable or disable automatic topic alias replacement for outgoing packets
886    ///
887    /// When enabled, the connection will automatically apply existing registered
888    /// topic aliases to outgoing PUBLISH packets when aliases are available.
889    /// This only uses previously registered aliases and does not allocate new ones.
890    ///
891    /// # Parameters
892    ///
893    /// * `enable` - Whether to enable automatic topic alias replacement
894    pub fn set_auto_replace_topic_alias_send(&mut self, enable: bool) {
895        self.auto_replace_topic_alias_send = enable;
896    }
897
898    /// Set PINGREQ receive timeout
899    pub fn set_pingresp_recv_timeout(&mut self, timeout_ms: Option<u64>) {
900        self.pingresp_recv_timeout_ms = timeout_ms;
901    }
902
903    /// Acquire a new packet ID for outgoing packets
904    ///
905    /// # Returns
906    ///
907    /// A unique packet ID, or an error if none are available
908    pub fn acquire_packet_id(&mut self) -> Result<PacketIdType, MqttError> {
909        self.pid_man.acquire_unique_id()
910    }
911
912    /// Register a packet ID as in use
913    ///
914    /// Manually registers a specific packet ID as being in use, preventing
915    /// it from being allocated by `acquire_packet_id()`.
916    ///
917    /// # Parameters
918    ///
919    /// * `packet_id` - The packet ID to register as in use
920    ///
921    /// # Returns
922    ///
923    /// `Ok(())` if successful, or an error if the packet ID is already in use
924    pub fn register_packet_id(&mut self, packet_id: PacketIdType) -> Result<(), MqttError> {
925        self.pid_man.register_id(packet_id)
926    }
927
928    /// Release packet ID (Event-based API)
929    /// Release a packet ID for reuse
930    ///
931    /// This method releases a packet ID, making it available for future use.
932    /// It also generates a `NotifyPacketIdReleased` event.
933    ///
934    /// # Parameters
935    ///
936    /// * `packet_id` - The packet ID to release
937    ///
938    /// # Returns
939    ///
940    /// Events generated from releasing the packet ID
941    pub fn release_packet_id(
942        &mut self,
943        packet_id: PacketIdType,
944    ) -> Vec<GenericEvent<PacketIdType>> {
945        let mut events = Vec::new();
946
947        if self.pid_man.is_used_id(packet_id) {
948            self.pid_man.release_id(packet_id);
949            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
950        }
951
952        events
953    }
954
955    /// Get the set of QoS 2 PUBLISH packet IDs that have been handled
956    ///
957    /// Returns a copy of the set containing packet IDs of QoS 2 PUBLISH packets
958    /// that have been successfully processed and handled.
959    ///
960    /// # Returns
961    ///
962    /// A `HashSet` containing packet IDs of handled QoS 2 PUBLISH packets
963    pub fn get_qos2_publish_handled(&self) -> HashSet<PacketIdType> {
964        self.qos2_publish_handled.clone()
965    }
966
967    /// Restore the set of QoS 2 PUBLISH packet IDs that have been handled
968    ///
969    /// Restores the internal state of handled QoS 2 PUBLISH packet IDs,
970    /// typically used when resuming a connection from persistent storage.
971    ///
972    /// # Parameters
973    ///
974    /// * `pids` - A `HashSet` containing packet IDs of previously handled QoS 2 PUBLISH packets
975    pub fn restore_qos2_publish_handled(&mut self, pids: HashSet<PacketIdType>) {
976        self.qos2_publish_handled = pids;
977    }
978
979    /// Restore previously stored packets
980    ///
981    /// This method restores packets that were previously stored for persistence,
982    /// typically called when resuming a session.
983    ///
984    /// # Parameters
985    ///
986    /// * `packets` - Vector of packets to restore
987    pub fn restore_packets(&mut self, packets: Vec<GenericStorePacket<PacketIdType>>) {
988        for packet in packets {
989            match &packet {
990                GenericStorePacket::V3_1_1Publish(p) => {
991                    // Add to appropriate QoS tracking set
992                    match p.qos() {
993                        Qos::AtLeastOnce => {
994                            self.pid_puback.insert(p.packet_id().unwrap());
995                        }
996                        Qos::ExactlyOnce => {
997                            self.pid_pubrec.insert(p.packet_id().unwrap());
998                        }
999                        _ => {
1000                            // QoS 0 shouldn't be in store, but handle gracefully
1001                            tracing::warn!("QoS 0 packet found in store, skipping");
1002                            continue;
1003                        }
1004                    }
1005                    // Register packet ID and add to store
1006                    let packet_id = p.packet_id().unwrap();
1007                    if self.pid_man.register_id(packet_id).is_ok() {
1008                        if let Err(e) = self.store.add(packet) {
1009                            tracing::error!("Failed to add packet to store: {:?}", e);
1010                        }
1011                    } else {
1012                        tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1013                    }
1014                }
1015                GenericStorePacket::V5_0Publish(p) => {
1016                    // Add to appropriate QoS tracking set
1017                    match p.qos() {
1018                        Qos::AtLeastOnce => {
1019                            self.pid_puback.insert(p.packet_id().unwrap());
1020                        }
1021                        Qos::ExactlyOnce => {
1022                            self.pid_pubrec.insert(p.packet_id().unwrap());
1023                        }
1024                        _ => {
1025                            // QoS 0 shouldn't be in store, but handle gracefully
1026                            tracing::warn!("QoS 0 packet found in store, skipping");
1027                            continue;
1028                        }
1029                    }
1030                    // Register packet ID and add to store
1031                    let packet_id = p.packet_id().unwrap();
1032                    if self.pid_man.register_id(packet_id).is_ok() {
1033                        if let Err(e) = self.store.add(packet) {
1034                            tracing::error!("Failed to add packet to store: {:?}", e);
1035                        }
1036                    } else {
1037                        tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1038                    }
1039                }
1040                GenericStorePacket::V3_1_1Pubrel(p) => {
1041                    // Pubrel packets expect PUBCOMP response
1042                    self.pid_pubcomp.insert(p.packet_id());
1043                    // Register packet ID and add to store
1044                    let packet_id = p.packet_id();
1045                    if self.pid_man.register_id(packet_id).is_ok() {
1046                        if let Err(e) = self.store.add(packet) {
1047                            tracing::error!("Failed to add packet to store: {:?}", e);
1048                        }
1049                    } else {
1050                        tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1051                    }
1052                }
1053                GenericStorePacket::V5_0Pubrel(p) => {
1054                    // Pubrel packets expect PUBCOMP response
1055                    self.pid_pubcomp.insert(p.packet_id());
1056                    // Register packet ID and add to store
1057                    let packet_id = p.packet_id();
1058                    if self.pid_man.register_id(packet_id).is_ok() {
1059                        if let Err(e) = self.store.add(packet) {
1060                            tracing::error!("Failed to add packet to store: {:?}", e);
1061                        }
1062                    } else {
1063                        tracing::error!("Packet ID {} has already been used. Skip it", packet_id);
1064                    }
1065                }
1066            }
1067        }
1068    }
1069
1070    /// Get stored packets for persistence
1071    ///
1072    /// Returns packets that need to be stored for potential retransmission.
1073    /// This is useful for implementing persistent sessions.
1074    ///
1075    /// # Returns
1076    ///
1077    /// Vector of packets that should be persisted
1078    pub fn get_stored_packets(&self) -> Vec<GenericStorePacket<PacketIdType>> {
1079        self.store.get_stored()
1080    }
1081
1082    /// Get the MQTT protocol version being used
1083    ///
1084    /// # Returns
1085    ///
1086    /// The protocol version (V3_1_1 or V5_0)
1087    pub fn get_protocol_version(&self) -> Version {
1088        self.protocol_version
1089    }
1090
1091    /// Check if a PUBLISH packet is currently being processed
1092    ///
1093    /// # Parameters
1094    ///
1095    /// * `packet_id` - The packet ID to check
1096    ///
1097    /// # Returns
1098    ///
1099    /// True if the packet ID is in use for PUBLISH processing
1100    pub fn is_publish_processing(&self, packet_id: PacketIdType) -> bool {
1101        self.qos2_publish_processing.contains(&packet_id)
1102    }
1103
1104    /// Regulate packet for store (remove/resolve topic alias)
1105    ///
1106    /// This method prepares a V5.0 publish packet for storage by resolving topic aliases
1107    /// and removing TopicAlias properties to ensure the packet can be retransmitted correctly.
1108    pub fn regulate_for_store(
1109        &self,
1110        mut packet: v5_0::GenericPublish<PacketIdType>,
1111    ) -> Result<v5_0::GenericPublish<PacketIdType>, MqttError> {
1112        if packet.topic_name().is_empty() {
1113            // Topic is empty, need to resolve from topic alias
1114            if let Some(props) = packet.props() {
1115                if let Some(topic_alias) = Self::get_topic_alias_from_props(props) {
1116                    if let Some(ref topic_alias_send) = self.topic_alias_send {
1117                        if let Some(topic) = topic_alias_send.peek(topic_alias) {
1118                            // Found topic for alias, add topic and remove alias property
1119                            packet = packet.remove_topic_alias_add_topic(topic.to_string())?;
1120                        } else {
1121                            return Err(MqttError::PacketNotRegulated);
1122                        }
1123                    } else {
1124                        return Err(MqttError::PacketNotRegulated);
1125                    }
1126                } else {
1127                    return Err(MqttError::PacketNotRegulated);
1128                }
1129            } else {
1130                return Err(MqttError::PacketNotRegulated);
1131            }
1132        } else {
1133            // Topic is not empty, just remove TopicAlias property if present
1134            packet = packet.remove_topic_alias();
1135        }
1136
1137        Ok(packet)
1138    }
1139
1140    // private
1141
1142    /// Initialize connection state based on client/server role
1143    ///
1144    /// Resets all connection-specific state including:
1145    /// - Publish flow control counters and limits
1146    /// - Topic alias management
1147    /// - QoS2 processing state
1148    /// - Packet ID tracking sets
1149    /// - Store requirement flag
1150    ///
1151    /// # Arguments
1152    /// * `is_client` - true for client mode, false for server mode
1153    fn initialize(&mut self, is_client: bool) {
1154        self.publish_send_max = None;
1155        self.publish_recv_max = None;
1156        self.publish_send_count = 0;
1157        self.topic_alias_send = None;
1158        self.topic_alias_recv = None;
1159        self.publish_recv.clear();
1160        self.qos2_publish_processing.clear();
1161        self.need_store = false;
1162        self.pid_suback.clear();
1163        self.pid_unsuback.clear();
1164        self.is_client = is_client;
1165    }
1166
1167    fn clear_store_related(&mut self) {
1168        self.pid_man.clear();
1169        self.pid_puback.clear();
1170        self.pid_pubrec.clear();
1171        self.pid_pubcomp.clear();
1172        self.store.clear();
1173    }
1174
1175    /// Send all stored packets for retransmission
1176    fn send_stored(&mut self) -> Vec<GenericEvent<PacketIdType>> {
1177        let mut events = Vec::new();
1178        self.store.for_each(|packet| {
1179            if packet.size() > self.maximum_packet_size_send as usize {
1180                let packet_id = packet.packet_id();
1181                self.pid_man.release_id(packet_id);
1182                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1183                return false; // Remove from store
1184            }
1185            events.push(GenericEvent::RequestSendPacket {
1186                packet: packet.clone().into(),
1187                release_packet_id_if_send_error: None,
1188            });
1189            true // Keep in store
1190        });
1191
1192        events
1193    }
1194
1195    /// Validate topic alias and return the associated topic name
1196    ///
1197    /// Checks if the topic alias is valid and retrieves the corresponding topic name
1198    /// from the topic alias send manager.
1199    ///
1200    /// # Arguments
1201    /// * `topic_alias_opt` - Optional topic alias value
1202    ///
1203    /// # Returns
1204    /// * `Some(topic_name)` if the topic alias is valid and found
1205    /// * `None` if the topic alias is invalid, not provided, or not found
1206    fn validate_topic_alias(&mut self, topic_alias_opt: Option<u16>) -> Option<String> {
1207        let topic_alias = topic_alias_opt?;
1208
1209        if !self.validate_topic_alias_range(topic_alias) {
1210            return None;
1211        }
1212
1213        let topic_alias_send = self.topic_alias_send.as_mut()?;
1214        // LRU updated here
1215        let topic = topic_alias_send.get(topic_alias)?;
1216
1217        Some(topic.to_string())
1218    }
1219
1220    /// Validate that topic alias is within the allowed range
1221    ///
1222    /// Checks if the topic alias value is valid according to the configured
1223    /// topic alias maximum for sending.
1224    ///
1225    /// # Arguments
1226    /// * `topic_alias` - Topic alias value to validate
1227    ///
1228    /// # Returns
1229    /// * `true` if the topic alias is within valid range
1230    /// * `false` if invalid or topic alias sending is not configured
1231    fn validate_topic_alias_range(&self, topic_alias: u16) -> bool {
1232        let topic_alias_send = match &self.topic_alias_send {
1233            Some(tas) => tas,
1234            None => {
1235                tracing::error!("topic_alias is set but topic_alias_maximum is 0");
1236                return false;
1237            }
1238        };
1239
1240        if topic_alias == 0 || topic_alias > topic_alias_send.max() {
1241            tracing::error!("topic_alias is set but out of range");
1242            return false;
1243        }
1244
1245        true
1246    }
1247
1248    /// Process v3.1.1 CONNECT packet - C++ constexpr if implementation
1249    pub(crate) fn process_send_v3_1_1_connect(
1250        &mut self,
1251        packet: v3_1_1::Connect,
1252    ) -> Vec<GenericEvent<PacketIdType>> {
1253        if self.status != ConnectionStatus::Disconnected {
1254            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1255        }
1256        if !self.validate_maximum_packet_size_send(packet.size()) {
1257            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1258        }
1259
1260        let mut events = Vec::new();
1261        self.initialize(true);
1262        self.status = ConnectionStatus::Connecting;
1263
1264        // Extract keep_alive and set pingreq_send_interval_ms if != 0
1265        let keep_alive = packet.keep_alive();
1266        if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1267            self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1268        }
1269
1270        // Handle clean_session flag
1271        if packet.clean_start() {
1272            self.clear_store_related();
1273        } else {
1274            self.need_store = true;
1275        }
1276
1277        // Clear topic alias for sending
1278        self.topic_alias_send = None;
1279
1280        events.push(GenericEvent::RequestSendPacket {
1281            packet: packet.into(),
1282            release_packet_id_if_send_error: None,
1283        });
1284        self.send_post_process(&mut events);
1285
1286        events
1287    }
1288
1289    /// Process v5.0 CONNECT packet - C++ constexpr if implementation
1290    pub(crate) fn process_send_v5_0_connect(
1291        &mut self,
1292        packet: v5_0::Connect,
1293    ) -> Vec<GenericEvent<PacketIdType>> {
1294        if !self.validate_maximum_packet_size_send(packet.size()) {
1295            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1296        }
1297        if self.status != ConnectionStatus::Disconnected {
1298            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1299        }
1300
1301        let mut events = Vec::new();
1302        self.initialize(true);
1303        self.status = ConnectionStatus::Connecting;
1304
1305        // Extract keep_alive and set pingreq_send_interval_ms if != 0
1306        let keep_alive = packet.keep_alive();
1307        if keep_alive != 0 && self.pingreq_send_interval_ms.is_none() {
1308            self.pingreq_send_interval_ms = Some(keep_alive as u64 * 1000);
1309        }
1310
1311        // Handle clean_start flag
1312        if packet.clean_start() {
1313            self.clear_store_related();
1314        }
1315
1316        // Process properties
1317        for prop in packet.props() {
1318            match prop {
1319                Property::TopicAliasMaximum(val) => {
1320                    if val.val() != 0 {
1321                        self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1322                    }
1323                }
1324                Property::ReceiveMaximum(val) => {
1325                    debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1326                    self.publish_recv_max = Some(val.val());
1327                }
1328                Property::MaximumPacketSize(val) => {
1329                    debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1330                    self.maximum_packet_size_recv = val.val();
1331                }
1332                Property::SessionExpiryInterval(val) => {
1333                    if val.val() != 0 {
1334                        self.need_store = true;
1335                    }
1336                }
1337                _ => {
1338                    // Ignore other properties (equivalent to [](auto const&){} in C++)
1339                }
1340            }
1341        }
1342
1343        events.push(GenericEvent::RequestSendPacket {
1344            packet: packet.into(),
1345            release_packet_id_if_send_error: None,
1346        });
1347        self.send_post_process(&mut events);
1348
1349        events
1350    }
1351
1352    pub(crate) fn process_send_v3_1_1_connack(
1353        &mut self,
1354        packet: v3_1_1::Connack,
1355    ) -> Vec<GenericEvent<PacketIdType>> {
1356        if self.status != ConnectionStatus::Connecting {
1357            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1358        }
1359        let mut events = Vec::new();
1360        if packet.return_code() == ConnectReturnCode::Accepted {
1361            self.status = ConnectionStatus::Connected;
1362        } else {
1363            self.status = ConnectionStatus::Disconnected;
1364        }
1365
1366        events.push(GenericEvent::RequestSendPacket {
1367            packet: packet.into(),
1368            release_packet_id_if_send_error: None,
1369        });
1370        events.extend(self.send_stored());
1371        self.send_post_process(&mut events);
1372
1373        events
1374    }
1375
1376    pub(crate) fn process_send_v5_0_connack(
1377        &mut self,
1378        packet: v5_0::Connack,
1379    ) -> Vec<GenericEvent<PacketIdType>> {
1380        if !self.validate_maximum_packet_size_send(packet.size()) {
1381            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1382        }
1383        if self.status != ConnectionStatus::Connecting {
1384            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1385        }
1386
1387        let mut events = Vec::new();
1388
1389        if packet.reason_code() == ConnectReasonCode::Success {
1390            self.status = ConnectionStatus::Connected;
1391
1392            // Process properties
1393            for prop in packet.props() {
1394                match prop {
1395                    Property::TopicAliasMaximum(val) => {
1396                        if val.val() != 0 {
1397                            self.topic_alias_recv = Some(TopicAliasRecv::new(val.val()));
1398                        }
1399                    }
1400                    Property::ReceiveMaximum(val) => {
1401                        debug_assert!(val.val() != 0, "ReceiveMaximum must not be 0");
1402                        self.publish_recv_max = Some(val.val());
1403                    }
1404                    Property::MaximumPacketSize(val) => {
1405                        debug_assert!(val.val() != 0, "MaximumPacketSize must not be 0");
1406                        self.maximum_packet_size_recv = val.val();
1407                    }
1408                    _ => {
1409                        // Ignore other properties
1410                    }
1411                }
1412            }
1413        } else {
1414            self.status = ConnectionStatus::Disconnected;
1415            self.cancel_timers(&mut events);
1416        }
1417
1418        events.push(GenericEvent::RequestSendPacket {
1419            packet: packet.into(),
1420            release_packet_id_if_send_error: None,
1421        });
1422        events.extend(self.send_stored());
1423        self.send_post_process(&mut events);
1424
1425        events
1426    }
1427
1428    pub(crate) fn process_send_v3_1_1_publish(
1429        &mut self,
1430        packet: v3_1_1::GenericPublish<PacketIdType>,
1431    ) -> Vec<GenericEvent<PacketIdType>> {
1432        let mut events = Vec::new();
1433        let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1434
1435        if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1436            // Register packet ID for QoS 1 or 2
1437            let packet_id = packet.packet_id().unwrap();
1438            if self.status != ConnectionStatus::Connected
1439                && !self.need_store
1440                && !self.offline_publish
1441            {
1442                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1443                if self.pid_man.is_used_id(packet_id) {
1444                    self.pid_man.release_id(packet_id);
1445                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1446                }
1447                return events;
1448            }
1449            if !self.pid_man.is_used_id(packet_id) {
1450                tracing::error!("packet_id {packet_id} must be acquired or registered");
1451                events.push(GenericEvent::NotifyError(
1452                    MqttError::PacketIdentifierInvalid,
1453                ));
1454                return events;
1455            }
1456            if self.need_store
1457                && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1458            {
1459                let store_packet = packet.clone().set_dup(true);
1460                self.store.add(store_packet.try_into().unwrap()).unwrap();
1461            } else {
1462                release_packet_id_if_send_error = Some(packet_id);
1463            }
1464            if packet.qos() == Qos::ExactlyOnce {
1465                self.qos2_publish_processing.insert(packet_id);
1466                self.pid_pubrec.insert(packet_id);
1467            } else {
1468                self.pid_puback.insert(packet_id);
1469            }
1470        } else if self.status != ConnectionStatus::Connected {
1471            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1472            return events;
1473        }
1474
1475        if self.status == ConnectionStatus::Connected {
1476            events.push(GenericEvent::RequestSendPacket {
1477                packet: packet.into(),
1478                release_packet_id_if_send_error,
1479            });
1480        }
1481        self.send_post_process(&mut events);
1482
1483        events
1484    }
1485
1486    pub(crate) fn process_send_v5_0_publish(
1487        &mut self,
1488        mut packet: v5_0::GenericPublish<PacketIdType>,
1489    ) -> Vec<GenericEvent<PacketIdType>> {
1490        if !self.validate_maximum_packet_size_send(packet.size()) {
1491            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1492        }
1493
1494        let mut events = Vec::new();
1495        let mut release_packet_id_if_send_error: Option<PacketIdType> = None;
1496        let mut topic_alias_validated = false;
1497        if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1498            let packet_id = packet.packet_id().unwrap();
1499            if self.status != ConnectionStatus::Connected
1500                && !self.need_store
1501                && !self.offline_publish
1502            {
1503                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1504                if self.pid_man.is_used_id(packet_id) {
1505                    self.pid_man.release_id(packet_id);
1506                    events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1507                }
1508                return events;
1509            }
1510
1511            // Extract topic_name from TopicAlias and remove TopicAlias property, then store it
1512            if !self.pid_man.is_used_id(packet_id) {
1513                tracing::error!("packet_id {packet_id} must be acquired or registered");
1514                events.push(GenericEvent::NotifyError(
1515                    MqttError::PacketIdentifierInvalid,
1516                ));
1517                return events;
1518            }
1519
1520            if self.need_store
1521                && (self.status != ConnectionStatus::Disconnected || self.offline_publish)
1522            {
1523                let ta_opt = Self::get_topic_alias_from_props_opt(packet.props());
1524                if packet.topic_name().is_empty() {
1525                    // Topic name is empty, must validate topic alias
1526                    let topic_opt = self.validate_topic_alias(ta_opt);
1527                    if topic_opt.is_none() {
1528                        events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1529                        if self.pid_man.is_used_id(packet_id) {
1530                            self.pid_man.release_id(packet_id);
1531                            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1532                        }
1533                        return events;
1534                    }
1535                    topic_alias_validated = true;
1536                    let store_packet = packet
1537                        .clone()
1538                        .remove_topic_alias_add_topic(topic_opt.unwrap())
1539                        .unwrap()
1540                        .set_dup(true);
1541                    // TBD validate_maximum_packet_size(store_packet.size());
1542                    self.store.add(store_packet.try_into().unwrap()).unwrap();
1543                } else {
1544                    // Topic name is not empty, remove topic alias if present
1545                    let store_packet = packet.clone().remove_topic_alias().set_dup(true);
1546                    self.store.add(store_packet.try_into().unwrap()).unwrap();
1547                }
1548            } else {
1549                release_packet_id_if_send_error = Some(packet_id);
1550            }
1551            if packet.qos() == Qos::ExactlyOnce {
1552                self.qos2_publish_processing.insert(packet_id);
1553                self.pid_pubrec.insert(packet_id);
1554            } else {
1555                self.pid_puback.insert(packet_id);
1556            }
1557        } else if self.status != ConnectionStatus::Connected {
1558            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1559            return events;
1560        }
1561
1562        let packet_id_opt = packet.packet_id();
1563        let ta_opt = Self::get_topic_alias_from_props_opt(packet.props());
1564        if packet.topic_name().is_empty() {
1565            // process manually provided TopicAlias
1566            if !topic_alias_validated && self.validate_topic_alias(ta_opt).is_none() {
1567                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1568                if let Some(packet_id) = packet_id_opt {
1569                    if self.pid_man.is_used_id(packet_id) {
1570                        self.pid_man.release_id(packet_id);
1571                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1572                    }
1573                }
1574                return events;
1575            }
1576        } else if let Some(ta) = ta_opt {
1577            // Topic alias is provided
1578            if self.validate_topic_alias_range(ta) {
1579                tracing::trace!(
1580                    "topic alias: {} - {} is registered.",
1581                    packet.topic_name(),
1582                    ta
1583                );
1584                if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1585                    topic_alias_send.insert_or_update(packet.topic_name(), ta);
1586                }
1587            } else {
1588                events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1589                if let Some(packet_id) = packet_id_opt {
1590                    if self.pid_man.is_used_id(packet_id) {
1591                        self.pid_man.release_id(packet_id);
1592                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1593                    }
1594                }
1595                return events;
1596            }
1597        } else if self.status == ConnectionStatus::Connected {
1598            // process auto applying TopicAlias if the option is enabled
1599            if self.auto_map_topic_alias_send {
1600                if let Some(ref mut topic_alias_send) = self.topic_alias_send {
1601                    if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1602                        tracing::trace!(
1603                            "topic alias: {} - {} is found.",
1604                            packet.topic_name(),
1605                            found_ta
1606                        );
1607                        packet = packet.remove_topic_add_topic_alias(found_ta);
1608                    } else {
1609                        let lru_ta = topic_alias_send.get_lru_alias();
1610                        topic_alias_send.insert_or_update(packet.topic_name(), lru_ta);
1611                        packet = packet.remove_topic_add_topic_alias(lru_ta);
1612                    }
1613                }
1614            } else if self.auto_replace_topic_alias_send {
1615                if let Some(ref topic_alias_send) = self.topic_alias_send {
1616                    if let Some(found_ta) = topic_alias_send.find_by_topic(packet.topic_name()) {
1617                        tracing::trace!(
1618                            "topic alias: {} - {} is found.",
1619                            packet.topic_name(),
1620                            found_ta
1621                        );
1622                        packet = packet.remove_topic_add_topic_alias(found_ta);
1623                    }
1624                }
1625            }
1626        }
1627
1628        // Check receive_maximum for sending (QoS 1 and 2 packets)
1629        if packet.qos() == Qos::AtLeastOnce || packet.qos() == Qos::ExactlyOnce {
1630            if let Some(max) = self.publish_send_max {
1631                if self.publish_send_count == max {
1632                    events.push(GenericEvent::NotifyError(MqttError::ReceiveMaximumExceeded));
1633                    if let Some(packet_id) = packet_id_opt {
1634                        if self.pid_man.is_used_id(packet_id) {
1635                            self.pid_man.release_id(packet_id);
1636                            events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1637                        }
1638                    }
1639                    return events;
1640                }
1641                self.publish_send_count += 1;
1642            }
1643        }
1644
1645        if self.status == ConnectionStatus::Connected {
1646            events.push(GenericEvent::RequestSendPacket {
1647                packet: packet.into(),
1648                release_packet_id_if_send_error,
1649            });
1650        }
1651        self.send_post_process(&mut events);
1652
1653        events
1654    }
1655
1656    pub(crate) fn process_send_v3_1_1_puback(
1657        &mut self,
1658        packet: v3_1_1::GenericPuback<PacketIdType>,
1659    ) -> Vec<GenericEvent<PacketIdType>> {
1660        if self.status != ConnectionStatus::Connected {
1661            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1662        }
1663        let mut events = Vec::new();
1664
1665        events.push(GenericEvent::RequestSendPacket {
1666            packet: packet.into(),
1667            release_packet_id_if_send_error: None,
1668        });
1669        self.send_post_process(&mut events);
1670
1671        events
1672    }
1673
1674    pub(crate) fn process_send_v5_0_puback(
1675        &mut self,
1676        packet: v5_0::GenericPuback<PacketIdType>,
1677    ) -> Vec<GenericEvent<PacketIdType>> {
1678        if !self.validate_maximum_packet_size_send(packet.size()) {
1679            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1680        }
1681        if self.status != ConnectionStatus::Connected {
1682            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1683        }
1684
1685        let mut events = Vec::new();
1686        self.publish_recv.remove(&packet.packet_id());
1687
1688        events.push(GenericEvent::RequestSendPacket {
1689            packet: packet.into(),
1690            release_packet_id_if_send_error: None,
1691        });
1692        self.send_post_process(&mut events);
1693
1694        events
1695    }
1696
1697    pub(crate) fn process_send_v3_1_1_pubrec(
1698        &mut self,
1699        packet: v3_1_1::GenericPubrec<PacketIdType>,
1700    ) -> Vec<GenericEvent<PacketIdType>> {
1701        if self.status != ConnectionStatus::Connected {
1702            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1703        }
1704        let mut events = Vec::new();
1705
1706        events.push(GenericEvent::RequestSendPacket {
1707            packet: packet.into(),
1708            release_packet_id_if_send_error: None,
1709        });
1710        self.send_post_process(&mut events);
1711
1712        events
1713    }
1714
1715    pub(crate) fn process_send_v5_0_pubrec(
1716        &mut self,
1717        packet: v5_0::GenericPubrec<PacketIdType>,
1718    ) -> Vec<GenericEvent<PacketIdType>> {
1719        if !self.validate_maximum_packet_size_send(packet.size()) {
1720            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1721        }
1722        if self.status != ConnectionStatus::Connected {
1723            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1724        }
1725
1726        let mut events = Vec::new();
1727        let packet_id = packet.packet_id();
1728
1729        if let Some(rc) = packet.reason_code() {
1730            if rc.is_failure() {
1731                self.publish_recv.remove(&packet_id);
1732                self.qos2_publish_handled.remove(&packet_id);
1733            }
1734        }
1735
1736        events.push(GenericEvent::RequestSendPacket {
1737            packet: packet.into(),
1738            release_packet_id_if_send_error: None,
1739        });
1740        self.send_post_process(&mut events);
1741
1742        events
1743    }
1744
1745    pub(crate) fn process_send_v3_1_1_pubrel(
1746        &mut self,
1747        packet: v3_1_1::GenericPubrel<PacketIdType>,
1748    ) -> Vec<GenericEvent<PacketIdType>> {
1749        if self.status != ConnectionStatus::Connected && !self.need_store {
1750            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1751        }
1752        let mut events = Vec::new();
1753        let packet_id = packet.packet_id();
1754        if !self.pid_man.is_used_id(packet_id) {
1755            tracing::error!("packet_id {packet_id} must be acquired or registered");
1756            events.push(GenericEvent::NotifyError(
1757                MqttError::PacketIdentifierInvalid,
1758            ));
1759            return events;
1760        }
1761        if self.need_store {
1762            self.store.add(packet.clone().try_into().unwrap()).unwrap();
1763        }
1764
1765        if self.status == ConnectionStatus::Connected {
1766            self.pid_pubcomp.insert(packet_id);
1767            events.push(GenericEvent::RequestSendPacket {
1768                packet: packet.into(),
1769                release_packet_id_if_send_error: None,
1770            });
1771        }
1772        self.send_post_process(&mut events);
1773
1774        events
1775    }
1776
1777    pub(crate) fn process_send_v5_0_pubrel(
1778        &mut self,
1779        packet: v5_0::GenericPubrel<PacketIdType>,
1780    ) -> Vec<GenericEvent<PacketIdType>> {
1781        if !self.validate_maximum_packet_size_send(packet.size()) {
1782            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1783        }
1784        if self.status != ConnectionStatus::Connected && !self.need_store {
1785            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1786        }
1787
1788        let mut events = Vec::new();
1789        let packet_id = packet.packet_id();
1790        if !self.pid_man.is_used_id(packet_id) {
1791            tracing::error!("packet_id {packet_id} must be acquired or registered");
1792            events.push(GenericEvent::NotifyError(
1793                MqttError::PacketIdentifierInvalid,
1794            ));
1795            return events;
1796        }
1797        if self.need_store {
1798            self.store.add(packet.clone().try_into().unwrap()).unwrap();
1799        }
1800
1801        if self.status == ConnectionStatus::Connected {
1802            self.pid_pubcomp.insert(packet_id);
1803            events.push(GenericEvent::RequestSendPacket {
1804                packet: packet.into(),
1805                release_packet_id_if_send_error: None,
1806            });
1807        }
1808        self.send_post_process(&mut events);
1809
1810        events
1811    }
1812
1813    pub(crate) fn process_send_v3_1_1_pubcomp(
1814        &mut self,
1815        packet: v3_1_1::GenericPubcomp<PacketIdType>,
1816    ) -> Vec<GenericEvent<PacketIdType>> {
1817        if self.status != ConnectionStatus::Connected {
1818            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1819        }
1820        let mut events = Vec::new();
1821
1822        events.push(GenericEvent::RequestSendPacket {
1823            packet: packet.into(),
1824            release_packet_id_if_send_error: None,
1825        });
1826        self.send_post_process(&mut events);
1827
1828        events
1829    }
1830
1831    pub(crate) fn process_send_v5_0_pubcomp(
1832        &mut self,
1833        packet: v5_0::GenericPubcomp<PacketIdType>,
1834    ) -> Vec<GenericEvent<PacketIdType>> {
1835        if !self.validate_maximum_packet_size_send(packet.size()) {
1836            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1837        }
1838        if self.status != ConnectionStatus::Connected {
1839            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1840        }
1841
1842        let mut events = Vec::new();
1843        self.publish_recv.remove(&packet.packet_id());
1844
1845        events.push(GenericEvent::RequestSendPacket {
1846            packet: packet.into(),
1847            release_packet_id_if_send_error: None,
1848        });
1849        self.send_post_process(&mut events);
1850
1851        events
1852    }
1853
1854    pub(crate) fn process_send_v3_1_1_subscribe(
1855        &mut self,
1856        packet: v3_1_1::GenericSubscribe<PacketIdType>,
1857    ) -> Vec<GenericEvent<PacketIdType>> {
1858        let mut events = Vec::new();
1859        let packet_id = packet.packet_id();
1860        if self.status != ConnectionStatus::Connected {
1861            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1862            if self.pid_man.is_used_id(packet_id) {
1863                self.pid_man.release_id(packet_id);
1864                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1865            }
1866            return events;
1867        }
1868        if !self.pid_man.is_used_id(packet_id) {
1869            tracing::error!("packet_id {packet_id} must be acquired or registered");
1870            events.push(GenericEvent::NotifyError(
1871                MqttError::PacketIdentifierInvalid,
1872            ));
1873            return events;
1874        }
1875        self.pid_suback.insert(packet_id);
1876
1877        events.push(GenericEvent::RequestSendPacket {
1878            packet: packet.into(),
1879            release_packet_id_if_send_error: Some(packet_id),
1880        });
1881        self.send_post_process(&mut events);
1882
1883        events
1884    }
1885
1886    pub(crate) fn process_send_v5_0_subscribe(
1887        &mut self,
1888        packet: v5_0::GenericSubscribe<PacketIdType>,
1889    ) -> Vec<GenericEvent<PacketIdType>> {
1890        if !self.validate_maximum_packet_size_send(packet.size()) {
1891            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1892        }
1893
1894        let mut events = Vec::new();
1895        let packet_id = packet.packet_id();
1896        if self.status != ConnectionStatus::Connected {
1897            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1898            if self.pid_man.is_used_id(packet_id) {
1899                self.pid_man.release_id(packet_id);
1900                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1901            }
1902            return events;
1903        }
1904        if !self.pid_man.is_used_id(packet_id) {
1905            tracing::error!("packet_id {packet_id} must be acquired or registered");
1906            events.push(GenericEvent::NotifyError(
1907                MqttError::PacketIdentifierInvalid,
1908            ));
1909            return events;
1910        }
1911        self.pid_suback.insert(packet_id);
1912
1913        events.push(GenericEvent::RequestSendPacket {
1914            packet: packet.into(),
1915            release_packet_id_if_send_error: Some(packet_id),
1916        });
1917        self.send_post_process(&mut events);
1918
1919        events
1920    }
1921
1922    pub(crate) fn process_send_v3_1_1_suback(
1923        &mut self,
1924        packet: v3_1_1::GenericSuback<PacketIdType>,
1925    ) -> Vec<GenericEvent<PacketIdType>> {
1926        if self.status != ConnectionStatus::Connected {
1927            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1928        }
1929        let mut events = Vec::new();
1930        events.push(GenericEvent::RequestSendPacket {
1931            packet: packet.into(),
1932            release_packet_id_if_send_error: None,
1933        });
1934        self.send_post_process(&mut events);
1935
1936        events
1937    }
1938
1939    pub(crate) fn process_send_v5_0_suback(
1940        &mut self,
1941        packet: v5_0::GenericSuback<PacketIdType>,
1942    ) -> Vec<GenericEvent<PacketIdType>> {
1943        if !self.validate_maximum_packet_size_send(packet.size()) {
1944            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1945        }
1946        if self.status != ConnectionStatus::Connected {
1947            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
1948        }
1949
1950        let mut events = Vec::new();
1951        events.push(GenericEvent::RequestSendPacket {
1952            packet: packet.into(),
1953            release_packet_id_if_send_error: None,
1954        });
1955        self.send_post_process(&mut events);
1956
1957        events
1958    }
1959
1960    pub(crate) fn process_send_v3_1_1_unsubscribe(
1961        &mut self,
1962        packet: v3_1_1::GenericUnsubscribe<PacketIdType>,
1963    ) -> Vec<GenericEvent<PacketIdType>> {
1964        let mut events = Vec::new();
1965        let packet_id = packet.packet_id();
1966        if self.status != ConnectionStatus::Connected {
1967            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
1968            if self.pid_man.is_used_id(packet_id) {
1969                self.pid_man.release_id(packet_id);
1970                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
1971            }
1972            return events;
1973        }
1974        if !self.pid_man.is_used_id(packet_id) {
1975            tracing::error!("packet_id {packet_id} must be acquired or registered");
1976            events.push(GenericEvent::NotifyError(
1977                MqttError::PacketIdentifierInvalid,
1978            ));
1979            return events;
1980        }
1981        self.pid_unsuback.insert(packet_id);
1982
1983        events.push(GenericEvent::RequestSendPacket {
1984            packet: packet.into(),
1985            release_packet_id_if_send_error: Some(packet_id),
1986        });
1987        self.send_post_process(&mut events);
1988
1989        events
1990    }
1991
1992    pub(crate) fn process_send_v5_0_unsubscribe(
1993        &mut self,
1994        packet: v5_0::GenericUnsubscribe<PacketIdType>,
1995    ) -> Vec<GenericEvent<PacketIdType>> {
1996        if !self.validate_maximum_packet_size_send(packet.size()) {
1997            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
1998        }
1999
2000        let mut events = Vec::new();
2001        let packet_id = packet.packet_id();
2002        if self.status != ConnectionStatus::Connected {
2003            events.push(GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend));
2004            if self.pid_man.is_used_id(packet_id) {
2005                self.pid_man.release_id(packet_id);
2006                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2007            }
2008            return events;
2009        }
2010        if !self.pid_man.is_used_id(packet_id) {
2011            tracing::error!("packet_id {packet_id} must be acquired or registered");
2012            events.push(GenericEvent::NotifyError(
2013                MqttError::PacketIdentifierInvalid,
2014            ));
2015            return events;
2016        }
2017        self.pid_unsuback.insert(packet_id);
2018
2019        events.push(GenericEvent::RequestSendPacket {
2020            packet: packet.into(),
2021            release_packet_id_if_send_error: Some(packet_id),
2022        });
2023        self.send_post_process(&mut events);
2024
2025        events
2026    }
2027
2028    pub(crate) fn process_send_v3_1_1_unsuback(
2029        &mut self,
2030        packet: v3_1_1::GenericUnsuback<PacketIdType>,
2031    ) -> Vec<GenericEvent<PacketIdType>> {
2032        if self.status != ConnectionStatus::Connected {
2033            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2034        }
2035        let mut events = Vec::new();
2036        events.push(GenericEvent::RequestSendPacket {
2037            packet: packet.into(),
2038            release_packet_id_if_send_error: None,
2039        });
2040        self.send_post_process(&mut events);
2041
2042        events
2043    }
2044
2045    pub(crate) fn process_send_v5_0_unsuback(
2046        &mut self,
2047        packet: v5_0::GenericUnsuback<PacketIdType>,
2048    ) -> Vec<GenericEvent<PacketIdType>> {
2049        if !self.validate_maximum_packet_size_send(packet.size()) {
2050            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2051        }
2052        if self.status != ConnectionStatus::Connected {
2053            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2054        }
2055
2056        let mut events = Vec::new();
2057        events.push(GenericEvent::RequestSendPacket {
2058            packet: packet.into(),
2059            release_packet_id_if_send_error: None,
2060        });
2061        self.send_post_process(&mut events);
2062
2063        events
2064    }
2065
2066    pub(crate) fn process_send_v3_1_1_pingreq(
2067        &mut self,
2068        packet: v3_1_1::Pingreq,
2069    ) -> Vec<GenericEvent<PacketIdType>> {
2070        if self.status != ConnectionStatus::Connected {
2071            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2072        }
2073        let mut events = Vec::new();
2074        events.push(GenericEvent::RequestSendPacket {
2075            packet: packet.into(),
2076            release_packet_id_if_send_error: None,
2077        });
2078        if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2079            self.pingreq_send_set = true;
2080            events.push(GenericEvent::RequestTimerReset {
2081                kind: TimerKind::PingrespRecv,
2082                duration_ms: timeout_ms,
2083            });
2084        }
2085        self.send_post_process(&mut events);
2086
2087        events
2088    }
2089
2090    pub(crate) fn process_send_v5_0_pingreq(
2091        &mut self,
2092        packet: v5_0::Pingreq,
2093    ) -> Vec<GenericEvent<PacketIdType>> {
2094        if !self.validate_maximum_packet_size_send(packet.size()) {
2095            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2096        }
2097        if self.status != ConnectionStatus::Connected {
2098            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2099        }
2100
2101        let mut events = Vec::new();
2102        events.push(GenericEvent::RequestSendPacket {
2103            packet: packet.into(),
2104            release_packet_id_if_send_error: None,
2105        });
2106        if let Some(timeout_ms) = self.pingresp_recv_timeout_ms {
2107            self.pingreq_send_set = true;
2108            events.push(GenericEvent::RequestTimerReset {
2109                kind: TimerKind::PingrespRecv,
2110                duration_ms: timeout_ms,
2111            });
2112        }
2113        self.send_post_process(&mut events);
2114
2115        events
2116    }
2117
2118    pub(crate) fn process_send_v3_1_1_pingresp(
2119        &mut self,
2120        packet: v3_1_1::Pingresp,
2121    ) -> Vec<GenericEvent<PacketIdType>> {
2122        if self.status != ConnectionStatus::Connected {
2123            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2124        }
2125        let mut events = Vec::new();
2126        events.push(GenericEvent::RequestSendPacket {
2127            packet: packet.into(),
2128            release_packet_id_if_send_error: None,
2129        });
2130        self.send_post_process(&mut events);
2131
2132        events
2133    }
2134
2135    pub(crate) fn process_send_v5_0_pingresp(
2136        &mut self,
2137        packet: v5_0::Pingresp,
2138    ) -> Vec<GenericEvent<PacketIdType>> {
2139        if !self.validate_maximum_packet_size_send(packet.size()) {
2140            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2141        }
2142        if self.status != ConnectionStatus::Connected {
2143            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2144        }
2145
2146        let mut events = Vec::new();
2147        events.push(GenericEvent::RequestSendPacket {
2148            packet: packet.into(),
2149            release_packet_id_if_send_error: None,
2150        });
2151        self.send_post_process(&mut events);
2152
2153        events
2154    }
2155
2156    pub(crate) fn process_send_v3_1_1_disconnect(
2157        &mut self,
2158        packet: v3_1_1::Disconnect,
2159    ) -> Vec<GenericEvent<PacketIdType>> {
2160        if self.status != ConnectionStatus::Connected {
2161            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2162        }
2163        let mut events = Vec::new();
2164        self.status = ConnectionStatus::Disconnected;
2165        self.cancel_timers(&mut events);
2166        events.push(GenericEvent::RequestSendPacket {
2167            packet: packet.into(),
2168            release_packet_id_if_send_error: None,
2169        });
2170        events.push(GenericEvent::RequestClose);
2171
2172        events
2173    }
2174
2175    pub(crate) fn process_send_v5_0_disconnect(
2176        &mut self,
2177        packet: v5_0::Disconnect,
2178    ) -> Vec<GenericEvent<PacketIdType>> {
2179        if !self.validate_maximum_packet_size_send(packet.size()) {
2180            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2181        }
2182        if self.status != ConnectionStatus::Connected {
2183            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2184        }
2185
2186        let mut events = Vec::new();
2187        self.status = ConnectionStatus::Disconnected;
2188        self.cancel_timers(&mut events);
2189        events.push(GenericEvent::RequestSendPacket {
2190            packet: packet.into(),
2191            release_packet_id_if_send_error: None,
2192        });
2193        events.push(GenericEvent::RequestClose);
2194
2195        events
2196    }
2197
2198    pub(crate) fn process_send_v5_0_auth(
2199        &mut self,
2200        packet: v5_0::Auth,
2201    ) -> Vec<GenericEvent<PacketIdType>> {
2202        if !self.validate_maximum_packet_size_send(packet.size()) {
2203            return vec![GenericEvent::NotifyError(MqttError::PacketTooLarge)];
2204        }
2205        if self.status == ConnectionStatus::Disconnected {
2206            return vec![GenericEvent::NotifyError(MqttError::PacketNotAllowedToSend)];
2207        }
2208
2209        let mut events = Vec::new();
2210        events.push(GenericEvent::RequestSendPacket {
2211            packet: packet.into(),
2212            release_packet_id_if_send_error: None,
2213        });
2214        self.send_post_process(&mut events);
2215
2216        events
2217    }
2218
2219    fn send_post_process(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
2220        if self.is_client {
2221            if let Some(timeout_ms) = self.pingreq_send_interval_ms {
2222                self.pingreq_send_set = true;
2223                events.push(GenericEvent::RequestTimerReset {
2224                    kind: TimerKind::PingreqSend,
2225                    duration_ms: timeout_ms,
2226                });
2227            }
2228        }
2229    }
2230
2231    fn validate_maximum_packet_size_send(&self, size: usize) -> bool {
2232        if size > self.maximum_packet_size_send as usize {
2233            tracing::error!("packet size over maximum_packet_size for sending");
2234            return false;
2235        }
2236        true
2237    }
2238
2239    fn process_recv_packet(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
2240        let mut events = Vec::new();
2241
2242        // packet size limit validation (v3.1.1 is always satisfied)
2243        let total_size = remaining_length_to_total_size(raw_packet.remaining_length());
2244        if total_size > self.maximum_packet_size_recv {
2245            // This happens only when protocol version is V5.0.
2246            // On v3.1.1, the maximum packet size is always 268435455 (2^32 - 1).
2247            // If the packet size is over 268434555, feed() return an error.
2248            // maximum_packet_size_recv is set by sending CONNECT or CONNACK packet.
2249            // So DISCONNECT packet is the right choice to notify the error.
2250            let disconnect_packet = v5_0::Disconnect::builder()
2251                .reason_code(DisconnectReasonCode::PacketTooLarge)
2252                .build()
2253                .unwrap();
2254            // Send disconnect packet directly without generic constraints
2255            events.extend(self.process_send_v5_0_disconnect(disconnect_packet));
2256            events.push(GenericEvent::NotifyError(MqttError::PacketTooLarge));
2257            return events;
2258        }
2259
2260        let packet_type = raw_packet.packet_type();
2261        let _flags = raw_packet.flags();
2262        match self.protocol_version {
2263            Version::V3_1_1 => {
2264                match packet_type {
2265                    1 => {
2266                        // CONNECT
2267                        events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2268                    }
2269                    2 => {
2270                        // CONNACK
2271                        events.extend(self.process_recv_v3_1_1_connack(raw_packet));
2272                    }
2273                    3 => {
2274                        // PUBLISH
2275                        events.extend(self.process_recv_v3_1_1_publish(raw_packet));
2276                    }
2277                    4 => {
2278                        // PUBACK
2279                        events.extend(self.process_recv_v3_1_1_puback(raw_packet));
2280                    }
2281                    5 => {
2282                        // PUBREC
2283                        events.extend(self.process_recv_v3_1_1_pubrec(raw_packet));
2284                    }
2285                    6 => {
2286                        // PUBREL
2287                        events.extend(self.process_recv_v3_1_1_pubrel(raw_packet));
2288                    }
2289                    7 => {
2290                        // PUBCOMP
2291                        events.extend(self.process_recv_v3_1_1_pubcomp(raw_packet));
2292                    }
2293                    8 => {
2294                        // SUBSCRIBE
2295                        events.extend(self.process_recv_v3_1_1_subscribe(raw_packet));
2296                    }
2297                    9 => {
2298                        // SUBACK
2299                        events.extend(self.process_recv_v3_1_1_suback(raw_packet));
2300                    }
2301                    10 => {
2302                        // UNSUBSCRIBE
2303                        events.extend(self.process_recv_v3_1_1_unsubscribe(raw_packet));
2304                    }
2305                    11 => {
2306                        // UNSUBACK
2307                        events.extend(self.process_recv_v3_1_1_unsuback(raw_packet));
2308                    }
2309                    12 => {
2310                        // PINGREQ
2311                        events.extend(self.process_recv_v3_1_1_pingreq(raw_packet));
2312                    }
2313                    13 => {
2314                        // PINGRESP
2315                        events.extend(self.process_recv_v3_1_1_pingresp(raw_packet));
2316                    }
2317                    14 => {
2318                        // DISCONNECT
2319                        events.extend(self.process_recv_v3_1_1_disconnect(raw_packet));
2320                    }
2321                    // invalid packet type
2322                    _ => {
2323                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2324                    }
2325                }
2326            }
2327            Version::V5_0 => {
2328                match packet_type {
2329                    1 => {
2330                        // CONNECT
2331                        events.extend(self.process_recv_v5_0_connect(raw_packet));
2332                    }
2333                    2 => {
2334                        // CONNACK
2335                        events.extend(self.process_recv_v5_0_connack(raw_packet));
2336                    }
2337                    3 => {
2338                        // PUBLISH
2339                        events.extend(self.process_recv_v5_0_publish(raw_packet));
2340                    }
2341                    4 => {
2342                        // PUBACK
2343                        events.extend(self.process_recv_v5_0_puback(raw_packet));
2344                    }
2345                    5 => {
2346                        // PUBREC
2347                        events.extend(self.process_recv_v5_0_pubrec(raw_packet));
2348                    }
2349                    6 => {
2350                        // PUBREL
2351                        events.extend(self.process_recv_v5_0_pubrel(raw_packet));
2352                    }
2353                    7 => {
2354                        // PUBCOMP
2355                        events.extend(self.process_recv_v5_0_pubcomp(raw_packet));
2356                    }
2357                    8 => {
2358                        // SUBSCRIBE
2359                        events.extend(self.process_recv_v5_0_subscribe(raw_packet));
2360                    }
2361                    9 => {
2362                        // SUBACK
2363                        events.extend(self.process_recv_v5_0_suback(raw_packet));
2364                    }
2365                    10 => {
2366                        // UNSUBSCRIBE
2367                        events.extend(self.process_recv_v5_0_unsubscribe(raw_packet));
2368                    }
2369                    11 => {
2370                        // UNSUBACK
2371                        events.extend(self.process_recv_v5_0_unsuback(raw_packet));
2372                    }
2373                    12 => {
2374                        // PINGREQ
2375                        events.extend(self.process_recv_v5_0_pingreq(raw_packet));
2376                    }
2377                    13 => {
2378                        // PINGRESP
2379                        events.extend(self.process_recv_v5_0_pingresp(raw_packet));
2380                    }
2381                    14 => {
2382                        // DISCONNECT
2383                        events.extend(self.process_recv_v5_0_disconnect(raw_packet));
2384                    }
2385                    15 => {
2386                        // AUTH
2387                        events.extend(self.process_recv_v5_0_auth(raw_packet));
2388                    }
2389                    // invalid packet type
2390                    _ => {
2391                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2392                    }
2393                }
2394            }
2395            Version::Undetermined => {
2396                match packet_type {
2397                    1 => {
2398                        // CONNECT
2399                        if raw_packet.remaining_length() < 7 {
2400                            events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2401                            return events;
2402                        }
2403                        match raw_packet.data_as_slice()[6] {
2404                            // Protocol Version
2405                            4 => {
2406                                self.protocol_version = Version::V3_1_1;
2407                                events.extend(self.process_recv_v3_1_1_connect(raw_packet));
2408                            }
2409                            5 => {
2410                                self.protocol_version = Version::V5_0;
2411                                events.extend(self.process_recv_v5_0_connect(raw_packet));
2412                            }
2413                            _ => {
2414                                events.push(GenericEvent::NotifyError(
2415                                    MqttError::UnsupportedProtocolVersion,
2416                                ));
2417                            }
2418                        }
2419                    }
2420                    _ => {
2421                        events.push(GenericEvent::NotifyError(MqttError::MalformedPacket));
2422                    }
2423                }
2424            }
2425        }
2426
2427        events
2428    }
2429
2430    fn process_recv_v3_1_1_connect(
2431        &mut self,
2432        raw_packet: RawPacket,
2433    ) -> Vec<GenericEvent<PacketIdType>> {
2434        let mut events = Vec::new();
2435        match v3_1_1::Connect::parse(raw_packet.data_as_slice()) {
2436            Ok((packet, _)) => {
2437                if self.status != ConnectionStatus::Disconnected {
2438                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2439                    return events;
2440                }
2441                self.initialize(false);
2442                self.status = ConnectionStatus::Connecting;
2443                if packet.keep_alive() > 0 {
2444                    self.pingreq_recv_timeout_ms =
2445                        Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2446                }
2447                if packet.clean_session() {
2448                    self.clear_store_related();
2449                } else {
2450                    self.need_store = true;
2451                }
2452                events.extend(self.refresh_pingreq_recv());
2453                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2454            }
2455            Err(e) => {
2456                if self.status == ConnectionStatus::Disconnected {
2457                    self.status = ConnectionStatus::Connecting;
2458                    let rc = match e {
2459                        MqttError::ClientIdentifierNotValid => {
2460                            ConnectReturnCode::IdentifierRejected
2461                        }
2462                        MqttError::BadUserNameOrPassword => {
2463                            ConnectReturnCode::BadUserNameOrPassword
2464                        }
2465                        MqttError::UnsupportedProtocolVersion => {
2466                            ConnectReturnCode::UnacceptableProtocolVersion
2467                        }
2468                        _ => ConnectReturnCode::NotAuthorized, // TBD close could be better
2469                    };
2470                    let connack = v3_1_1::Connack::builder().return_code(rc).build().unwrap();
2471                    let connack_events = self.process_send_v3_1_1_connack(connack);
2472                    events.extend(connack_events);
2473                } else {
2474                    events.push(GenericEvent::RequestClose);
2475                }
2476                events.push(GenericEvent::NotifyError(e));
2477            }
2478        }
2479
2480        events
2481    }
2482
2483    fn process_recv_v5_0_connect(
2484        &mut self,
2485        raw_packet: RawPacket,
2486    ) -> Vec<GenericEvent<PacketIdType>> {
2487        let mut events = Vec::new();
2488        match v5_0::Connect::parse(raw_packet.data_as_slice()) {
2489            Ok((packet, _)) => {
2490                if self.status != ConnectionStatus::Disconnected {
2491                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2492                    return events;
2493                }
2494                self.initialize(false);
2495                self.status = ConnectionStatus::Connecting;
2496                if packet.keep_alive() > 0 {
2497                    self.pingreq_recv_timeout_ms =
2498                        Some((packet.keep_alive() as u64) * 1000 * 3 / 2);
2499                }
2500                if packet.clean_start() {
2501                    self.clear_store_related();
2502                }
2503                packet.props().iter().for_each(|prop| match prop {
2504                    Property::TopicAliasMaximum(p) => {
2505                        self.topic_alias_send = Some(TopicAliasSend::new(p.val()));
2506                    }
2507                    Property::ReceiveMaximum(p) => {
2508                        self.publish_send_max = Some(p.val());
2509                    }
2510                    Property::MaximumPacketSize(p) => {
2511                        self.maximum_packet_size_send = p.val();
2512                    }
2513                    Property::SessionExpiryInterval(p) => {
2514                        if p.val() != 0 {
2515                            self.need_store = true;
2516                        }
2517                    }
2518                    _ => {}
2519                });
2520                events.extend(self.refresh_pingreq_recv());
2521                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2522            }
2523            Err(e) => {
2524                if self.status == ConnectionStatus::Disconnected {
2525                    self.status = ConnectionStatus::Connecting;
2526                    let rc = match e {
2527                        MqttError::ClientIdentifierNotValid => {
2528                            ConnectReasonCode::ClientIdentifierNotValid
2529                        }
2530                        MqttError::BadUserNameOrPassword => {
2531                            ConnectReasonCode::BadAuthenticationMethod
2532                        }
2533                        MqttError::UnsupportedProtocolVersion => {
2534                            ConnectReasonCode::UnsupportedProtocolVersion
2535                        }
2536                        _ => ConnectReasonCode::UnspecifiedError,
2537                    };
2538                    let connack = v5_0::Connack::builder().reason_code(rc).build().unwrap();
2539                    let connack_events = self.process_send_v5_0_connack(connack);
2540                    events.extend(connack_events);
2541                } else {
2542                    let disconnect = v5_0::Disconnect::builder()
2543                        .reason_code(DisconnectReasonCode::ProtocolError)
2544                        .build()
2545                        .unwrap();
2546                    let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2547                    events.extend(disconnect_events);
2548                }
2549                events.push(GenericEvent::NotifyError(e));
2550            }
2551        }
2552
2553        events
2554    }
2555
2556    fn process_recv_v3_1_1_connack(
2557        &mut self,
2558        raw_packet: RawPacket,
2559    ) -> Vec<GenericEvent<PacketIdType>> {
2560        let mut events = Vec::new();
2561
2562        match v3_1_1::Connack::parse(raw_packet.data_as_slice()) {
2563            Ok((packet, _consumed)) => {
2564                if packet.return_code() == ConnectReturnCode::Accepted {
2565                    self.status = ConnectionStatus::Connected;
2566                    if packet.session_present() {
2567                        events.extend(self.send_stored());
2568                    } else {
2569                        self.clear_store_related();
2570                    }
2571                }
2572                events.push(GenericEvent::NotifyPacketReceived(
2573                    GenericPacket::V3_1_1Connack(packet),
2574                ));
2575            }
2576            Err(e) => {
2577                events.push(GenericEvent::RequestClose);
2578                events.push(GenericEvent::NotifyError(e));
2579            }
2580        }
2581
2582        events
2583    }
2584
2585    fn process_recv_v5_0_connack(
2586        &mut self,
2587        raw_packet: RawPacket,
2588    ) -> Vec<GenericEvent<PacketIdType>> {
2589        let mut events = Vec::new();
2590
2591        match v5_0::Connack::parse(raw_packet.data_as_slice()) {
2592            Ok((packet, _consumed)) => {
2593                if packet.reason_code() == ConnectReasonCode::Success {
2594                    self.status = ConnectionStatus::Connected;
2595
2596                    // Process properties
2597                    for prop in packet.props() {
2598                        match prop {
2599                            Property::TopicAliasMaximum(val) => {
2600                                if val.val() > 0 {
2601                                    self.topic_alias_send = Some(TopicAliasSend::new(val.val()));
2602                                }
2603                            }
2604                            Property::ReceiveMaximum(val) => {
2605                                assert!(val.val() != 0);
2606                                self.publish_send_max = Some(val.val());
2607                            }
2608                            Property::MaximumPacketSize(val) => {
2609                                assert!(val.val() != 0);
2610                                self.maximum_packet_size_send = val.val();
2611                            }
2612                            Property::ServerKeepAlive(val) => {
2613                                // Set PINGREQ send interval if this is a client
2614                                let timeout_ms = (val.val() as u64) * 1000;
2615                                self.pingreq_send_interval_ms = Some(timeout_ms);
2616                            }
2617                            _ => {
2618                                // Ignore other properties
2619                            }
2620                        }
2621                    }
2622
2623                    if packet.session_present() {
2624                        events.extend(self.send_stored());
2625                    } else {
2626                        self.clear_store_related();
2627                    }
2628                }
2629                events.push(GenericEvent::NotifyPacketReceived(
2630                    GenericPacket::V5_0Connack(packet),
2631                ));
2632            }
2633            Err(e) => {
2634                if self.status == ConnectionStatus::Connected {
2635                    let disconnect = v5_0::Disconnect::builder()
2636                        .reason_code(e.into())
2637                        .build()
2638                        .unwrap();
2639                    let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2640                    events.extend(disconnect_events);
2641                }
2642                events.push(GenericEvent::NotifyError(e));
2643            }
2644        }
2645
2646        events
2647    }
2648
2649    fn process_recv_v3_1_1_publish(
2650        &mut self,
2651        raw_packet: RawPacket,
2652    ) -> Vec<GenericEvent<PacketIdType>> {
2653        let mut events = Vec::new();
2654
2655        let flags = raw_packet.flags();
2656        match &raw_packet.data {
2657            PacketData::Publish(arc) => {
2658                match v3_1_1::GenericPublish::parse(flags, arc.clone()) {
2659                    Ok((packet, _consumed)) => {
2660                        match packet.qos() {
2661                            Qos::AtMostOnce => {
2662                                events.extend(self.refresh_pingreq_recv());
2663                                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2664                            }
2665                            Qos::AtLeastOnce => {
2666                                let packet_id = packet.packet_id().unwrap();
2667                                if self.status == ConnectionStatus::Connected
2668                                    && self.auto_pub_response
2669                                {
2670                                    // Send PUBACK automatically
2671                                    let puback = v3_1_1::GenericPuback::builder()
2672                                        .packet_id(packet_id)
2673                                        .build()
2674                                        .unwrap();
2675                                    events.extend(self.process_send_v3_1_1_puback(puback));
2676                                }
2677                                events.extend(self.refresh_pingreq_recv());
2678                                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2679                            }
2680                            Qos::ExactlyOnce => {
2681                                let packet_id = packet.packet_id().unwrap();
2682                                let already_handled = !self.qos2_publish_handled.insert(packet_id);
2683
2684                                if self.status == ConnectionStatus::Connected
2685                                    && (self.auto_pub_response || already_handled)
2686                                {
2687                                    let pubrec = v3_1_1::GenericPubrec::builder()
2688                                        .packet_id(packet_id)
2689                                        .build()
2690                                        .unwrap();
2691                                    events.extend(self.process_send_v3_1_1_pubrec(pubrec));
2692                                }
2693                                events.extend(self.refresh_pingreq_recv());
2694                                if !already_handled {
2695                                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2696                                }
2697                            }
2698                        }
2699                    }
2700                    Err(e) => {
2701                        events.push(GenericEvent::RequestClose);
2702                        events.push(GenericEvent::NotifyError(e));
2703                    }
2704                }
2705            }
2706            PacketData::Normal(_) => {
2707                unreachable!("PUBLISH packet must use PacketData::Publish variant");
2708            }
2709        }
2710
2711        events
2712    }
2713
2714    fn process_recv_v5_0_publish(
2715        &mut self,
2716        raw_packet: RawPacket,
2717    ) -> Vec<GenericEvent<PacketIdType>> {
2718        let mut events = Vec::new();
2719
2720        let flags = raw_packet.flags();
2721        match &raw_packet.data {
2722            PacketData::Publish(arc) => {
2723                match v5_0::GenericPublish::parse(flags, arc.clone()) {
2724                    Ok((packet, _consumed)) => {
2725                        let mut already_handled = false;
2726                        let mut puback_send = false;
2727                        let mut pubrec_send = false;
2728
2729                        match packet.qos() {
2730                            Qos::AtLeastOnce => {
2731                                let packet_id = packet.packet_id().unwrap();
2732                                if let Some(max) = self.publish_recv_max {
2733                                    if self.publish_recv.len() >= max as usize {
2734                                        let disconnect = v5_0::Disconnect::builder()
2735                                            .reason_code(
2736                                                DisconnectReasonCode::ReceiveMaximumExceeded,
2737                                            )
2738                                            .build()
2739                                            .unwrap();
2740                                        events
2741                                            .extend(self.process_send_v5_0_disconnect(disconnect));
2742                                        events.push(GenericEvent::NotifyError(
2743                                            MqttError::ReceiveMaximumExceeded,
2744                                        ));
2745                                        return events;
2746                                    }
2747                                }
2748                                self.publish_recv.insert(packet_id);
2749                                if self.auto_pub_response
2750                                    && self.status == ConnectionStatus::Connected
2751                                {
2752                                    puback_send = true;
2753                                }
2754                            }
2755                            Qos::ExactlyOnce => {
2756                                let packet_id = packet.packet_id().unwrap();
2757                                if let Some(max) = self.publish_recv_max {
2758                                    if self.publish_recv.len() >= max as usize {
2759                                        let disconnect = v5_0::Disconnect::builder()
2760                                            .reason_code(
2761                                                DisconnectReasonCode::ReceiveMaximumExceeded,
2762                                            )
2763                                            .build()
2764                                            .unwrap();
2765                                        events
2766                                            .extend(self.process_send_v5_0_disconnect(disconnect));
2767                                        events.push(GenericEvent::NotifyError(
2768                                            MqttError::ReceiveMaximumExceeded,
2769                                        ));
2770                                        return events;
2771                                    }
2772                                }
2773                                self.publish_recv.insert(packet_id);
2774
2775                                if !self.qos2_publish_handled.insert(packet_id) {
2776                                    already_handled = true;
2777                                }
2778                                if self.status == ConnectionStatus::Connected
2779                                    && (self.auto_pub_response || already_handled)
2780                                {
2781                                    pubrec_send = true;
2782                                }
2783                            }
2784                            Qos::AtMostOnce => {
2785                                // No packet ID handling for QoS 0
2786                            }
2787                        }
2788
2789                        // Topic Alias handling
2790                        if packet.topic_name().is_empty() {
2791                            // Extract topic from topic_alias
2792                            if let Some(ta) = Self::get_topic_alias_from_props_opt(packet.props()) {
2793                                if ta == 0
2794                                    || self.topic_alias_recv.is_none()
2795                                    || ta > self.topic_alias_recv.as_ref().unwrap().max()
2796                                {
2797                                    let disconnect = v5_0::Disconnect::builder()
2798                                        .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2799                                        .build()
2800                                        .unwrap();
2801                                    events.extend(self.process_send_v5_0_disconnect(disconnect));
2802                                    events.push(GenericEvent::NotifyError(
2803                                        MqttError::TopicAliasInvalid,
2804                                    ));
2805                                    return events;
2806                                }
2807
2808                                if let Some(ref topic_alias_recv) = self.topic_alias_recv {
2809                                    if topic_alias_recv.get(ta).is_none() {
2810                                        let disconnect = v5_0::Disconnect::builder()
2811                                            .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2812                                            .build()
2813                                            .unwrap();
2814                                        events
2815                                            .extend(self.process_send_v5_0_disconnect(disconnect));
2816                                        events.push(GenericEvent::NotifyError(
2817                                            MqttError::TopicAliasInvalid,
2818                                        ));
2819                                        return events;
2820                                    }
2821                                    // Note: In a complete implementation, we would modify the packet
2822                                    // to add the resolved topic. For now, we'll proceed.
2823                                }
2824                            } else {
2825                                let disconnect = v5_0::Disconnect::builder()
2826                                    .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2827                                    .build()
2828                                    .unwrap();
2829                                events.extend(self.process_send_v5_0_disconnect(disconnect));
2830                                events
2831                                    .push(GenericEvent::NotifyError(MqttError::TopicAliasInvalid));
2832                                return events;
2833                            }
2834                        } else {
2835                            // Topic is not empty, check if topic alias needs to be registered
2836                            if let Some(ta) = Self::get_topic_alias_from_props_opt(packet.props()) {
2837                                if ta == 0
2838                                    || self.topic_alias_recv.is_none()
2839                                    || ta > self.topic_alias_recv.as_ref().unwrap().max()
2840                                {
2841                                    let disconnect = v5_0::Disconnect::builder()
2842                                        .reason_code(DisconnectReasonCode::TopicAliasInvalid)
2843                                        .build()
2844                                        .unwrap();
2845                                    events.extend(self.process_send_v5_0_disconnect(disconnect));
2846                                    events.push(GenericEvent::NotifyError(
2847                                        MqttError::TopicAliasInvalid,
2848                                    ));
2849                                    return events;
2850                                }
2851                                if let Some(ref mut topic_alias_recv) = self.topic_alias_recv {
2852                                    topic_alias_recv.insert_or_update(packet.topic_name(), ta);
2853                                }
2854                            }
2855                        }
2856
2857                        // Send response packets
2858                        if puback_send {
2859                            let puback = v5_0::GenericPuback::builder()
2860                                .packet_id(packet.packet_id().unwrap())
2861                                .build()
2862                                .unwrap();
2863                            events.extend(self.process_send_v5_0_puback(puback));
2864                        }
2865                        if pubrec_send {
2866                            let pubrec = v5_0::GenericPubrec::builder()
2867                                .packet_id(packet.packet_id().unwrap())
2868                                .build()
2869                                .unwrap();
2870                            events.extend(self.process_send_v5_0_pubrec(pubrec));
2871                        }
2872
2873                        // Refresh PINGREQ receive timer
2874                        events.extend(self.refresh_pingreq_recv());
2875
2876                        // Notify packet received (only if not already handled)
2877                        if !already_handled {
2878                            events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2879                        }
2880                    }
2881                    Err(e) => {
2882                        if self.status == ConnectionStatus::Connected {
2883                            let disconnect = v5_0::Disconnect::builder()
2884                                .reason_code(e.into())
2885                                .build()
2886                                .unwrap();
2887                            let disconnect_events = self.process_send_v5_0_disconnect(disconnect);
2888                            events.extend(disconnect_events);
2889                        }
2890                        events.push(GenericEvent::NotifyError(e));
2891                    }
2892                }
2893            }
2894            PacketData::Normal(_) => {
2895                unreachable!("PUBLISH packet must use PacketData::Publish variant");
2896            }
2897        }
2898
2899        events
2900    }
2901
2902    fn process_recv_v3_1_1_puback(
2903        &mut self,
2904        raw_packet: RawPacket,
2905    ) -> Vec<GenericEvent<PacketIdType>> {
2906        let mut events = Vec::new();
2907
2908        match v3_1_1::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2909            Ok((packet, _)) => {
2910                let packet_id = packet.packet_id();
2911                if self.pid_puback.remove(&packet_id) {
2912                    self.store.erase(ResponsePacket::V3_1_1Puback, packet_id);
2913                    if self.pid_man.is_used_id(packet_id) {
2914                        self.pid_man.release_id(packet_id);
2915                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2916                    }
2917                    events.extend(self.refresh_pingreq_recv());
2918                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2919                } else {
2920                    events.push(GenericEvent::RequestClose);
2921                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2922                }
2923            }
2924            Err(e) => {
2925                events.push(GenericEvent::RequestClose);
2926                events.push(GenericEvent::NotifyError(e));
2927            }
2928        }
2929
2930        events
2931    }
2932
2933    fn process_recv_v5_0_puback(
2934        &mut self,
2935        raw_packet: RawPacket,
2936    ) -> Vec<GenericEvent<PacketIdType>> {
2937        let mut events = Vec::new();
2938
2939        match v5_0::GenericPuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2940            Ok((packet, _)) => {
2941                let packet_id = packet.packet_id();
2942                if self.pid_puback.remove(&packet_id) {
2943                    self.store.erase(ResponsePacket::V5_0Puback, packet_id);
2944                    if self.pid_man.is_used_id(packet_id) {
2945                        self.pid_man.release_id(packet_id);
2946                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
2947                    }
2948                    if self.publish_send_max.is_some() {
2949                        self.publish_send_count -= 1;
2950                    }
2951                    events.extend(self.refresh_pingreq_recv());
2952                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2953                } else {
2954                    let disconnect = v5_0::Disconnect::builder()
2955                        .reason_code(DisconnectReasonCode::ProtocolError)
2956                        .build()
2957                        .unwrap();
2958                    events.extend(self.process_send_v5_0_disconnect(disconnect));
2959                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2960                }
2961            }
2962            Err(e) => {
2963                let disconnect = v5_0::Disconnect::builder()
2964                    .reason_code(DisconnectReasonCode::ProtocolError)
2965                    .build()
2966                    .unwrap();
2967                events.extend(self.process_send_v5_0_disconnect(disconnect));
2968                events.push(GenericEvent::NotifyError(e));
2969            }
2970        }
2971
2972        events
2973    }
2974
2975    fn process_recv_v3_1_1_pubrec(
2976        &mut self,
2977        raw_packet: RawPacket,
2978    ) -> Vec<GenericEvent<PacketIdType>> {
2979        let mut events = Vec::new();
2980
2981        match v3_1_1::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
2982            Ok((packet, _)) => {
2983                let packet_id = packet.packet_id();
2984                if self.pid_pubrec.remove(&packet_id) {
2985                    self.store.erase(ResponsePacket::V3_1_1Pubrec, packet_id);
2986                    if self.auto_pub_response && self.status == ConnectionStatus::Connected {
2987                        let pubrel = v3_1_1::GenericPubrel::<PacketIdType>::builder()
2988                            .packet_id(packet_id)
2989                            .build()
2990                            .unwrap();
2991                        events.extend(self.process_send_v3_1_1_pubrel(pubrel));
2992                    }
2993                    events.extend(self.refresh_pingreq_recv());
2994                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
2995                } else {
2996                    events.push(GenericEvent::RequestClose);
2997                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
2998                }
2999            }
3000            Err(e) => {
3001                events.push(GenericEvent::RequestClose);
3002                events.push(GenericEvent::NotifyError(e));
3003            }
3004        }
3005
3006        events
3007    }
3008
3009    fn process_recv_v5_0_pubrec(
3010        &mut self,
3011        raw_packet: RawPacket,
3012    ) -> Vec<GenericEvent<PacketIdType>> {
3013        let mut events = Vec::new();
3014
3015        match v5_0::GenericPubrec::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3016            Ok((packet, _)) => {
3017                let packet_id = packet.packet_id();
3018                if self.pid_pubrec.remove(&packet_id) {
3019                    self.store.erase(ResponsePacket::V5_0Pubrec, packet_id);
3020                    if let Some(reason_code) = packet.reason_code() {
3021                        if reason_code != PubrecReasonCode::Success {
3022                            if self.pid_man.is_used_id(packet_id) {
3023                                self.pid_man.release_id(packet_id);
3024                                events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3025                            }
3026                            self.qos2_publish_processing.remove(&packet_id);
3027                            if self.publish_send_max.is_some() {
3028                                self.publish_send_count -= 1;
3029                            }
3030                        } else if self.auto_pub_response
3031                            && self.status == ConnectionStatus::Connected
3032                        {
3033                            let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3034                                .packet_id(packet_id)
3035                                .build()
3036                                .unwrap();
3037                            events.extend(self.process_send_v5_0_pubrel(pubrel));
3038                        }
3039                    } else if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3040                        let pubrel = v5_0::GenericPubrel::<PacketIdType>::builder()
3041                            .packet_id(packet_id)
3042                            .build()
3043                            .unwrap();
3044                        events.extend(self.process_send_v5_0_pubrel(pubrel));
3045                    }
3046                    events.extend(self.refresh_pingreq_recv());
3047                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3048                } else {
3049                    let disconnect = v5_0::Disconnect::builder()
3050                        .reason_code(DisconnectReasonCode::ProtocolError)
3051                        .build()
3052                        .unwrap();
3053                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3054                    events.push(GenericEvent::NotifyError(MqttError::from(
3055                        DisconnectReasonCode::ProtocolError,
3056                    )));
3057                }
3058            }
3059            Err(e) => {
3060                let disconnect = v5_0::Disconnect::builder()
3061                    .reason_code(DisconnectReasonCode::ProtocolError)
3062                    .build()
3063                    .unwrap();
3064                events.extend(self.process_send_v5_0_disconnect(disconnect));
3065                events.push(GenericEvent::NotifyError(e));
3066            }
3067        }
3068
3069        events
3070    }
3071
3072    fn process_recv_v3_1_1_pubrel(
3073        &mut self,
3074        raw_packet: RawPacket,
3075    ) -> Vec<GenericEvent<PacketIdType>> {
3076        let mut events = Vec::new();
3077
3078        match v3_1_1::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3079            Ok((packet, _)) => {
3080                let packet_id = packet.packet_id();
3081                self.qos2_publish_handled.remove(&packet_id);
3082                if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3083                    let pubcomp = v3_1_1::GenericPubcomp::<PacketIdType>::builder()
3084                        .packet_id(packet_id)
3085                        .build()
3086                        .unwrap();
3087                    events.extend(self.process_send_v3_1_1_pubcomp(pubcomp));
3088                }
3089                events.extend(self.refresh_pingreq_recv());
3090                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3091            }
3092            Err(e) => {
3093                events.push(GenericEvent::RequestClose);
3094                events.push(GenericEvent::NotifyError(e));
3095            }
3096        }
3097
3098        events
3099    }
3100
3101    fn process_recv_v5_0_pubrel(
3102        &mut self,
3103        raw_packet: RawPacket,
3104    ) -> Vec<GenericEvent<PacketIdType>> {
3105        let mut events = Vec::new();
3106
3107        match v5_0::GenericPubrel::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3108            Ok((packet, _)) => {
3109                let packet_id = packet.packet_id();
3110                self.qos2_publish_handled.remove(&packet_id);
3111                if self.auto_pub_response && self.status == ConnectionStatus::Connected {
3112                    let pubcomp = v5_0::GenericPubcomp::<PacketIdType>::builder()
3113                        .packet_id(packet_id)
3114                        .build()
3115                        .unwrap();
3116                    events.extend(self.process_send_v5_0_pubcomp(pubcomp));
3117                }
3118                events.extend(self.refresh_pingreq_recv());
3119                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3120            }
3121            Err(e) => {
3122                let disconnect = v5_0::Disconnect::builder()
3123                    .reason_code(DisconnectReasonCode::ProtocolError)
3124                    .build()
3125                    .unwrap();
3126                events.extend(self.process_send_v5_0_disconnect(disconnect));
3127                events.push(GenericEvent::NotifyError(e));
3128            }
3129        }
3130
3131        events
3132    }
3133
3134    fn process_recv_v3_1_1_pubcomp(
3135        &mut self,
3136        raw_packet: RawPacket,
3137    ) -> Vec<GenericEvent<PacketIdType>> {
3138        let mut events = Vec::new();
3139
3140        match v3_1_1::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3141            Ok((packet, _)) => {
3142                let packet_id = packet.packet_id();
3143                if self.pid_pubcomp.remove(&packet_id) {
3144                    self.store.erase(ResponsePacket::V3_1_1Pubcomp, packet_id);
3145                    if self.pid_man.is_used_id(packet_id) {
3146                        self.pid_man.release_id(packet_id);
3147                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3148                    }
3149                    self.qos2_publish_processing.remove(&packet_id);
3150                    events.extend(self.refresh_pingreq_recv());
3151                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3152                } else {
3153                    events.push(GenericEvent::RequestClose);
3154                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3155                }
3156            }
3157            Err(e) => {
3158                events.push(GenericEvent::RequestClose);
3159                events.push(GenericEvent::NotifyError(e));
3160            }
3161        }
3162
3163        events
3164    }
3165
3166    fn process_recv_v5_0_pubcomp(
3167        &mut self,
3168        raw_packet: RawPacket,
3169    ) -> Vec<GenericEvent<PacketIdType>> {
3170        let mut events = Vec::new();
3171
3172        match v5_0::GenericPubcomp::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3173            Ok((packet, _)) => {
3174                let packet_id = packet.packet_id();
3175                if self.pid_pubcomp.remove(&packet_id) {
3176                    self.store.erase(ResponsePacket::V5_0Pubcomp, packet_id);
3177                    if self.pid_man.is_used_id(packet_id) {
3178                        self.pid_man.release_id(packet_id);
3179                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3180                    }
3181                    self.qos2_publish_processing.remove(&packet_id);
3182                    if self.publish_send_max.is_some() {
3183                        self.publish_send_count -= 1;
3184                    }
3185                    events.extend(self.refresh_pingreq_recv());
3186                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3187                } else {
3188                    let disconnect = v5_0::Disconnect::builder()
3189                        .reason_code(DisconnectReasonCode::ProtocolError)
3190                        .build()
3191                        .unwrap();
3192                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3193                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3194                }
3195            }
3196            Err(e) => {
3197                let disconnect = v5_0::Disconnect::builder()
3198                    .reason_code(DisconnectReasonCode::ProtocolError)
3199                    .build()
3200                    .unwrap();
3201                events.extend(self.process_send_v5_0_disconnect(disconnect));
3202                events.push(GenericEvent::NotifyError(e));
3203            }
3204        }
3205
3206        events
3207    }
3208
3209    fn process_recv_v3_1_1_subscribe(
3210        &mut self,
3211        raw_packet: RawPacket,
3212    ) -> Vec<GenericEvent<PacketIdType>> {
3213        let mut events = Vec::new();
3214
3215        match v3_1_1::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3216            Ok((packet, _)) => {
3217                events.extend(self.refresh_pingreq_recv());
3218                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3219            }
3220            Err(e) => {
3221                events.push(GenericEvent::RequestClose);
3222                events.push(GenericEvent::NotifyError(e));
3223            }
3224        }
3225
3226        events
3227    }
3228
3229    fn process_recv_v5_0_subscribe(
3230        &mut self,
3231        raw_packet: RawPacket,
3232    ) -> Vec<GenericEvent<PacketIdType>> {
3233        let mut events = Vec::new();
3234
3235        match v5_0::GenericSubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3236            Ok((packet, _)) => {
3237                events.extend(self.refresh_pingreq_recv());
3238                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3239            }
3240            Err(e) => {
3241                let disconnect = v5_0::Disconnect::builder()
3242                    .reason_code(DisconnectReasonCode::ProtocolError)
3243                    .build()
3244                    .unwrap();
3245                events.extend(self.process_send_v5_0_disconnect(disconnect));
3246                events.push(GenericEvent::NotifyError(e));
3247            }
3248        }
3249
3250        events
3251    }
3252
3253    fn process_recv_v3_1_1_suback(
3254        &mut self,
3255        raw_packet: RawPacket,
3256    ) -> Vec<GenericEvent<PacketIdType>> {
3257        let mut events = Vec::new();
3258
3259        match v3_1_1::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3260            Ok((packet, _)) => {
3261                let packet_id = packet.packet_id();
3262                if self.pid_suback.remove(&packet_id) {
3263                    if self.pid_man.is_used_id(packet_id) {
3264                        self.pid_man.release_id(packet_id);
3265                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3266                    }
3267                    events.extend(self.refresh_pingreq_recv());
3268                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3269                } else {
3270                    events.push(GenericEvent::RequestClose);
3271                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3272                }
3273            }
3274            Err(e) => {
3275                events.push(GenericEvent::RequestClose);
3276                events.push(GenericEvent::NotifyError(e));
3277            }
3278        }
3279
3280        events
3281    }
3282
3283    fn process_recv_v5_0_suback(
3284        &mut self,
3285        raw_packet: RawPacket,
3286    ) -> Vec<GenericEvent<PacketIdType>> {
3287        let mut events = Vec::new();
3288
3289        match v5_0::GenericSuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3290            Ok((packet, _)) => {
3291                let packet_id = packet.packet_id();
3292                if self.pid_suback.remove(&packet_id) {
3293                    if self.pid_man.is_used_id(packet_id) {
3294                        self.pid_man.release_id(packet_id);
3295                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3296                    }
3297                    events.extend(self.refresh_pingreq_recv());
3298                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3299                } else {
3300                    let disconnect = v5_0::Disconnect::builder()
3301                        .reason_code(DisconnectReasonCode::ProtocolError)
3302                        .build()
3303                        .unwrap();
3304                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3305                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3306                }
3307            }
3308            Err(e) => {
3309                let disconnect = v5_0::Disconnect::builder()
3310                    .reason_code(DisconnectReasonCode::ProtocolError)
3311                    .build()
3312                    .unwrap();
3313                events.extend(self.process_send_v5_0_disconnect(disconnect));
3314                events.push(GenericEvent::NotifyError(e));
3315            }
3316        }
3317
3318        events
3319    }
3320
3321    fn process_recv_v3_1_1_unsubscribe(
3322        &mut self,
3323        raw_packet: RawPacket,
3324    ) -> Vec<GenericEvent<PacketIdType>> {
3325        let mut events = Vec::new();
3326
3327        match v3_1_1::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3328            Ok((packet, _)) => {
3329                events.extend(self.refresh_pingreq_recv());
3330                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3331            }
3332            Err(e) => {
3333                events.push(GenericEvent::RequestClose);
3334                events.push(GenericEvent::NotifyError(e));
3335            }
3336        }
3337
3338        events
3339    }
3340
3341    fn process_recv_v5_0_unsubscribe(
3342        &mut self,
3343        raw_packet: RawPacket,
3344    ) -> Vec<GenericEvent<PacketIdType>> {
3345        let mut events = Vec::new();
3346
3347        match v5_0::GenericUnsubscribe::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3348            Ok((packet, _)) => {
3349                events.extend(self.refresh_pingreq_recv());
3350                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3351            }
3352            Err(e) => {
3353                let disconnect = v5_0::Disconnect::builder()
3354                    .reason_code(DisconnectReasonCode::ProtocolError)
3355                    .build()
3356                    .unwrap();
3357                events.extend(self.process_send_v5_0_disconnect(disconnect));
3358                events.push(GenericEvent::NotifyError(e));
3359            }
3360        }
3361
3362        events
3363    }
3364
3365    fn process_recv_v3_1_1_unsuback(
3366        &mut self,
3367        raw_packet: RawPacket,
3368    ) -> Vec<GenericEvent<PacketIdType>> {
3369        let mut events = Vec::new();
3370
3371        match v3_1_1::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3372            Ok((packet, _)) => {
3373                let packet_id = packet.packet_id();
3374                if self.pid_unsuback.remove(&packet_id) {
3375                    if self.pid_man.is_used_id(packet_id) {
3376                        self.pid_man.release_id(packet_id);
3377                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3378                    }
3379                    events.extend(self.refresh_pingreq_recv());
3380                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3381                } else {
3382                    events.push(GenericEvent::RequestClose);
3383                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3384                }
3385            }
3386            Err(e) => {
3387                events.push(GenericEvent::RequestClose);
3388                events.push(GenericEvent::NotifyError(e));
3389            }
3390        }
3391
3392        events
3393    }
3394
3395    fn process_recv_v5_0_unsuback(
3396        &mut self,
3397        raw_packet: RawPacket,
3398    ) -> Vec<GenericEvent<PacketIdType>> {
3399        let mut events = Vec::new();
3400
3401        match v5_0::GenericUnsuback::<PacketIdType>::parse(raw_packet.data_as_slice()) {
3402            Ok((packet, _)) => {
3403                let packet_id = packet.packet_id();
3404                if self.pid_unsuback.remove(&packet_id) {
3405                    if self.pid_man.is_used_id(packet_id) {
3406                        self.pid_man.release_id(packet_id);
3407                        events.push(GenericEvent::NotifyPacketIdReleased(packet_id));
3408                    }
3409                    events.extend(self.refresh_pingreq_recv());
3410                    events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3411                } else {
3412                    let disconnect = v5_0::Disconnect::builder()
3413                        .reason_code(DisconnectReasonCode::ProtocolError)
3414                        .build()
3415                        .unwrap();
3416                    events.extend(self.process_send_v5_0_disconnect(disconnect));
3417                    events.push(GenericEvent::NotifyError(MqttError::ProtocolError));
3418                }
3419            }
3420            Err(e) => {
3421                let disconnect = v5_0::Disconnect::builder()
3422                    .reason_code(DisconnectReasonCode::ProtocolError)
3423                    .build()
3424                    .unwrap();
3425                events.extend(self.process_send_v5_0_disconnect(disconnect));
3426                events.push(GenericEvent::NotifyError(e));
3427            }
3428        }
3429
3430        events
3431    }
3432
3433    fn process_recv_v3_1_1_pingreq(
3434        &mut self,
3435        raw_packet: RawPacket,
3436    ) -> Vec<GenericEvent<PacketIdType>> {
3437        let mut events = Vec::new();
3438
3439        match v3_1_1::Pingreq::parse(raw_packet.data_as_slice()) {
3440            Ok((packet, _)) => {
3441                if (Role::IS_SERVER || Role::IS_ANY)
3442                    && !self.is_client
3443                    && self.auto_ping_response
3444                    && self.status == ConnectionStatus::Connected
3445                {
3446                    let pingresp = v3_1_1::Pingresp::new();
3447                    events.extend(self.process_send_v3_1_1_pingresp(pingresp));
3448                }
3449                events.extend(self.refresh_pingreq_recv());
3450                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3451            }
3452            Err(e) => {
3453                events.push(GenericEvent::RequestClose);
3454                events.push(GenericEvent::NotifyError(e));
3455            }
3456        }
3457
3458        events
3459    }
3460
3461    fn process_recv_v5_0_pingreq(
3462        &mut self,
3463        raw_packet: RawPacket,
3464    ) -> Vec<GenericEvent<PacketIdType>> {
3465        let mut events = Vec::new();
3466
3467        match v5_0::Pingreq::parse(raw_packet.data_as_slice()) {
3468            Ok((packet, _)) => {
3469                if (Role::IS_SERVER || Role::IS_ANY)
3470                    && !self.is_client
3471                    && self.auto_ping_response
3472                    && self.status == ConnectionStatus::Connected
3473                {
3474                    let pingresp = v5_0::Pingresp::new();
3475                    events.extend(self.process_send_v5_0_pingresp(pingresp));
3476                }
3477                events.extend(self.refresh_pingreq_recv());
3478                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3479            }
3480            Err(e) => {
3481                let disconnect = v5_0::Disconnect::builder()
3482                    .reason_code(DisconnectReasonCode::ProtocolError)
3483                    .build()
3484                    .unwrap();
3485                events.extend(self.process_send_v5_0_disconnect(disconnect));
3486                events.push(GenericEvent::NotifyError(e));
3487            }
3488        }
3489
3490        events
3491    }
3492
3493    fn process_recv_v3_1_1_pingresp(
3494        &mut self,
3495        raw_packet: RawPacket,
3496    ) -> Vec<GenericEvent<PacketIdType>> {
3497        let mut events = Vec::new();
3498
3499        match v3_1_1::Pingresp::parse(raw_packet.data_as_slice()) {
3500            Ok((packet, _)) => {
3501                self.pingresp_recv_set = false;
3502                events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3503                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3504            }
3505            Err(e) => {
3506                events.push(GenericEvent::RequestClose);
3507                events.push(GenericEvent::NotifyError(e));
3508            }
3509        }
3510
3511        events
3512    }
3513
3514    fn process_recv_v5_0_pingresp(
3515        &mut self,
3516        raw_packet: RawPacket,
3517    ) -> Vec<GenericEvent<PacketIdType>> {
3518        let mut events = Vec::new();
3519
3520        match v5_0::Pingresp::parse(raw_packet.data_as_slice()) {
3521            Ok((packet, _)) => {
3522                self.pingresp_recv_set = false;
3523                events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3524                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3525            }
3526            Err(e) => {
3527                let disconnect = v5_0::Disconnect::builder()
3528                    .reason_code(DisconnectReasonCode::ProtocolError)
3529                    .build()
3530                    .unwrap();
3531                events.extend(self.process_send_v5_0_disconnect(disconnect));
3532                events.push(GenericEvent::NotifyError(e));
3533            }
3534        }
3535
3536        events
3537    }
3538
3539    fn process_recv_v3_1_1_disconnect(
3540        &mut self,
3541        raw_packet: RawPacket,
3542    ) -> Vec<GenericEvent<PacketIdType>> {
3543        let mut events = Vec::new();
3544
3545        match v3_1_1::Disconnect::parse(raw_packet.data_as_slice()) {
3546            Ok((packet, _)) => {
3547                self.cancel_timers(&mut events);
3548                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3549            }
3550            Err(e) => {
3551                events.push(GenericEvent::RequestClose);
3552                events.push(GenericEvent::NotifyError(e));
3553            }
3554        }
3555
3556        events
3557    }
3558
3559    fn process_recv_v5_0_disconnect(
3560        &mut self,
3561        raw_packet: RawPacket,
3562    ) -> Vec<GenericEvent<PacketIdType>> {
3563        let mut events = Vec::new();
3564
3565        match v5_0::Disconnect::parse(raw_packet.data_as_slice()) {
3566            Ok((packet, _)) => {
3567                self.cancel_timers(&mut events);
3568                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3569            }
3570            Err(e) => {
3571                let disconnect = v5_0::Disconnect::builder()
3572                    .reason_code(DisconnectReasonCode::ProtocolError)
3573                    .build()
3574                    .unwrap();
3575                events.extend(self.process_send_v5_0_disconnect(disconnect));
3576                events.push(GenericEvent::NotifyError(e));
3577            }
3578        }
3579
3580        events
3581    }
3582
3583    fn process_recv_v5_0_auth(&mut self, raw_packet: RawPacket) -> Vec<GenericEvent<PacketIdType>> {
3584        let mut events = Vec::new();
3585
3586        match v5_0::Auth::parse(raw_packet.data_as_slice()) {
3587            Ok((packet, _)) => {
3588                events.extend(self.refresh_pingreq_recv());
3589                events.push(GenericEvent::NotifyPacketReceived(packet.into()));
3590            }
3591            Err(e) => {
3592                let disconnect = v5_0::Disconnect::builder()
3593                    .reason_code(DisconnectReasonCode::ProtocolError)
3594                    .build()
3595                    .unwrap();
3596                events.extend(self.process_send_v5_0_disconnect(disconnect));
3597                events.push(GenericEvent::NotifyError(e));
3598            }
3599        }
3600
3601        events
3602    }
3603
3604    fn get_topic_alias_from_props_opt(props: &Option<Vec<Property>>) -> Option<u16> {
3605        if let Some(props) = props {
3606            Self::get_topic_alias_from_props(props.as_slice())
3607        } else {
3608            None
3609        }
3610    }
3611
3612    fn refresh_pingreq_recv(&mut self) -> Vec<GenericEvent<PacketIdType>> {
3613        let mut events = Vec::new();
3614        if let Some(timeout_ms) = self.pingreq_recv_timeout_ms {
3615            if self.status == ConnectionStatus::Connecting
3616                || self.status == ConnectionStatus::Connected
3617            {
3618                self.pingreq_recv_set = true;
3619                events.push(GenericEvent::RequestTimerReset {
3620                    kind: TimerKind::PingreqRecv,
3621                    duration_ms: timeout_ms,
3622                });
3623            } else {
3624                self.pingreq_recv_set = false;
3625                events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3626            }
3627        }
3628
3629        events
3630    }
3631
3632    /// Cancel timers and collect events instead of calling handlers
3633    fn cancel_timers(&mut self, events: &mut Vec<GenericEvent<PacketIdType>>) {
3634        if self.pingreq_send_set {
3635            self.pingreq_send_set = false;
3636            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqSend));
3637        }
3638        if self.pingreq_recv_set {
3639            self.pingreq_recv_set = false;
3640            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingreqRecv));
3641        }
3642        if self.pingresp_recv_set {
3643            self.pingresp_recv_set = false;
3644            events.push(GenericEvent::RequestTimerCancel(TimerKind::PingrespRecv));
3645        }
3646    }
3647
3648    /// Helper function to extract TopicAlias from properties
3649    fn get_topic_alias_from_props(props: &[Property]) -> Option<u16> {
3650        for prop in props {
3651            if let Property::TopicAlias(ta) = prop {
3652                return Some(ta.val());
3653            }
3654        }
3655        None
3656    }
3657
3658    #[allow(dead_code)]
3659    fn is_packet_id_used(&self, packet_id: PacketIdType) -> bool {
3660        self.pid_man.is_used_id(packet_id)
3661    }
3662}
3663
3664// traits
3665
3666pub trait RecvBehavior<Role, PacketIdType>
3667where
3668    PacketIdType: IsPacketId,
3669{
3670    fn recv(&mut self, data: &mut std::io::Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>>;
3671}
3672
3673// RecvBehavior implementations
3674impl<PacketIdType> RecvBehavior<role::Client, PacketIdType>
3675    for GenericConnection<role::Client, PacketIdType>
3676where
3677    PacketIdType: IsPacketId,
3678{
3679    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3680        self.recv(data)
3681    }
3682}
3683
3684impl<PacketIdType> RecvBehavior<role::Server, PacketIdType>
3685    for GenericConnection<role::Server, PacketIdType>
3686where
3687    PacketIdType: IsPacketId,
3688{
3689    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3690        self.recv(data)
3691    }
3692}
3693
3694impl<PacketIdType> RecvBehavior<role::Any, PacketIdType>
3695    for GenericConnection<role::Any, PacketIdType>
3696where
3697    PacketIdType: IsPacketId,
3698{
3699    fn recv(&mut self, data: &mut Cursor<&[u8]>) -> Vec<GenericEvent<PacketIdType>> {
3700        self.recv(data)
3701    }
3702}
3703
3704// tests
3705
3706#[cfg(test)]
3707mod tests {
3708    use super::*;
3709    use crate::mqtt::connection::version::Version;
3710    use crate::mqtt::packet::TopicAliasSend;
3711    use crate::mqtt::role;
3712
3713    #[test]
3714    fn test_initialize_client_mode() {
3715        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3716
3717        // Initialize in client mode
3718        connection.initialize(true);
3719
3720        // Verify client mode is set
3721        assert!(connection.is_client);
3722        assert_eq!(connection.publish_send_count, 0);
3723        assert!(connection.publish_send_max.is_none());
3724        assert!(connection.publish_recv_max.is_none());
3725        assert!(!connection.need_store);
3726    }
3727
3728    #[test]
3729    fn test_initialize_server_mode() {
3730        let mut connection = GenericConnection::<role::Server, u32>::new(Version::V3_1_1);
3731
3732        // Initialize in server mode
3733        connection.initialize(false);
3734
3735        // Verify server mode is set
3736        assert!(!connection.is_client);
3737        assert_eq!(connection.publish_send_count, 0);
3738        assert!(connection.publish_send_max.is_none());
3739        assert!(connection.publish_recv_max.is_none());
3740        assert!(!connection.need_store);
3741    }
3742
3743    #[test]
3744    fn test_validate_topic_alias_no_topic_alias_send() {
3745        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3746
3747        // Should return None when topic_alias_send is not configured
3748        let result = connection.validate_topic_alias(Some(1));
3749        assert!(result.is_none());
3750    }
3751
3752    #[test]
3753    fn test_validate_topic_alias_none_input() {
3754        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3755
3756        // Should return None when no topic alias is provided
3757        let result = connection.validate_topic_alias(None);
3758        assert!(result.is_none());
3759    }
3760
3761    #[test]
3762    fn test_validate_topic_alias_range_no_topic_alias_send() {
3763        let connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3764
3765        // Should return false when topic_alias_send is not configured
3766        let result = connection.validate_topic_alias_range(1);
3767        assert!(!result);
3768    }
3769
3770    #[test]
3771    fn test_validate_topic_alias_range_zero() {
3772        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3773
3774        // Set up topic alias send with max 10
3775        let topic_alias_send = TopicAliasSend::new(10);
3776        connection.topic_alias_send = Some(topic_alias_send);
3777
3778        // Should return false for alias 0
3779        let result = connection.validate_topic_alias_range(0);
3780        assert!(!result);
3781    }
3782
3783    #[test]
3784    fn test_validate_topic_alias_range_over_max() {
3785        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3786
3787        // Set up topic alias send with max 5
3788        let topic_alias_send = TopicAliasSend::new(5);
3789        connection.topic_alias_send = Some(topic_alias_send);
3790
3791        // Should return false for alias > max
3792        let result = connection.validate_topic_alias_range(6);
3793        assert!(!result);
3794    }
3795
3796    #[test]
3797    fn test_validate_topic_alias_range_valid() {
3798        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3799
3800        // Set up topic alias send with max 10
3801        let topic_alias_send = TopicAliasSend::new(10);
3802        connection.topic_alias_send = Some(topic_alias_send);
3803
3804        // Should return true for valid aliases
3805        assert!(connection.validate_topic_alias_range(1));
3806        assert!(connection.validate_topic_alias_range(5));
3807        assert!(connection.validate_topic_alias_range(10));
3808    }
3809
3810    #[test]
3811    fn test_validate_topic_alias_with_registered_alias() {
3812        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3813
3814        // Set up topic alias send with max 10
3815        let mut topic_alias_send = TopicAliasSend::new(10);
3816        topic_alias_send.insert_or_update("test/topic", 5);
3817        connection.topic_alias_send = Some(topic_alias_send);
3818
3819        // Should return the topic name for registered alias
3820        let result = connection.validate_topic_alias(Some(5));
3821        assert_eq!(result, Some("test/topic".to_string()));
3822    }
3823
3824    #[test]
3825    fn test_validate_topic_alias_unregistered_alias() {
3826        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3827
3828        // Set up topic alias send with max 10 but don't register any aliases
3829        let topic_alias_send = TopicAliasSend::new(10);
3830        connection.topic_alias_send = Some(topic_alias_send);
3831
3832        // Should return None for unregistered alias
3833        let result = connection.validate_topic_alias(Some(5));
3834        assert!(result.is_none());
3835    }
3836
3837    #[test]
3838    fn test_validate_maximum_packet_size_within_limit() {
3839        let connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3840
3841        // Default maximum_packet_size_send is u32::MAX
3842        let result = connection.validate_maximum_packet_size_send(1000);
3843        assert!(result);
3844    }
3845
3846    #[test]
3847    fn test_validate_maximum_packet_size_at_limit() {
3848        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3849
3850        // Set a specific limit
3851        connection.maximum_packet_size_send = 1000;
3852
3853        // Should return true for size equal to limit
3854        let result = connection.validate_maximum_packet_size_send(1000);
3855        assert!(result);
3856    }
3857
3858    #[test]
3859    fn test_validate_maximum_packet_size_over_limit() {
3860        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3861
3862        // Set a specific limit
3863        connection.maximum_packet_size_send = 1000;
3864
3865        // Should return false for size over limit
3866        let result = connection.validate_maximum_packet_size_send(1001);
3867        assert!(!result);
3868    }
3869
3870    #[test]
3871    fn test_validate_maximum_packet_size_zero_limit() {
3872        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3873
3874        // Set limit to 0
3875        connection.maximum_packet_size_send = 0;
3876
3877        // Should return false for any non-zero size
3878        let result = connection.validate_maximum_packet_size_send(1);
3879        assert!(!result);
3880
3881        // Should return true for zero size
3882        let result = connection.validate_maximum_packet_size_send(0);
3883        assert!(result);
3884    }
3885
3886    #[test]
3887    fn test_initialize_clears_state() {
3888        let mut connection = GenericConnection::<role::Client, u16>::new(Version::V5_0);
3889
3890        // Set up some state that should be cleared
3891        connection.publish_send_count = 5;
3892        connection.need_store = true;
3893        connection.pid_suback.insert(123);
3894        connection.pid_unsuback.insert(456);
3895
3896        // Initialize should clear state
3897        connection.initialize(true);
3898
3899        // Verify state is cleared
3900        assert_eq!(connection.publish_send_count, 0);
3901        assert!(!connection.need_store);
3902        assert!(connection.pid_suback.is_empty());
3903        assert!(connection.pid_unsuback.is_empty());
3904        assert!(connection.is_client);
3905    }
3906
3907    #[test]
3908    fn test_remaining_length_to_total_size() {
3909        // Test 1-byte remaining length encoding (0-127)
3910        assert_eq!(remaining_length_to_total_size(0), 2); // 1 + 1 + 0
3911        assert_eq!(remaining_length_to_total_size(127), 129); // 1 + 1 + 127
3912
3913        // Test 2-byte remaining length encoding (128-16383)
3914        assert_eq!(remaining_length_to_total_size(128), 131); // 1 + 2 + 128
3915        assert_eq!(remaining_length_to_total_size(16383), 16386); // 1 + 2 + 16383
3916
3917        // Test 3-byte remaining length encoding (16384-2097151)
3918        assert_eq!(remaining_length_to_total_size(16384), 16388); // 1 + 3 + 16384
3919        assert_eq!(remaining_length_to_total_size(2097151), 2097155); // 1 + 3 + 2097151
3920
3921        // Test 4-byte remaining length encoding (2097152-268435455)
3922        assert_eq!(remaining_length_to_total_size(2097152), 2097157); // 1 + 4 + 2097152
3923        assert_eq!(remaining_length_to_total_size(268435455), 268435460); // 1 + 4 + 268435455
3924    }
3925}