mqtt_protocol_core/mqtt/connection/
event.rs

1// MIT License
2//
3// Copyright (c) 2025 Takatoshi Kondo
4//
5// Permission is hereby granted, free of charge, to any person obtaining a copy
6// of this software and associated documentation files (the "Software"), to deal
7// in the Software without restriction, including without limitation the rights
8// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9// copies of the Software, and to permit persons to whom the Software is
10// furnished to do so, subject to the following conditions:
11//
12// The above copyright notice and this permission notice shall be included in all
13// copies or substantial portions of the Software.
14//
15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21// SOFTWARE.
22use core::fmt;
23
24use serde::ser::{SerializeStruct, Serializer};
25use serde::Serialize;
26
27use crate::mqtt::packet::GenericPacket;
28use crate::mqtt::packet::IsPacketId;
29use crate::mqtt::result_code::MqttError;
30
31/// Represents different types of MQTT timers
32///
33/// This enum defines the different kinds of timers used in MQTT protocol operations.
34/// Each timer serves a specific purpose in maintaining connection health and protocol compliance.
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
36pub enum TimerKind {
37    /// Timer for sending PINGREQ packets
38    ///
39    /// This timer is used by MQTT clients to schedule periodic PINGREQ packets
40    /// to keep the connection alive. The interval is typically determined by
41    /// the keep-alive value negotiated during connection establishment.
42    #[serde(rename = "pingreq_send")]
43    PingreqSend,
44
45    /// Timer for receiving PINGREQ packets
46    ///
47    /// This timer is used by MQTT servers (brokers) to detect when a client
48    /// has not sent a PINGREQ packet within the expected timeframe, indicating
49    /// a potentially disconnected or unresponsive client.
50    #[serde(rename = "pingreq_recv")]
51    PingreqRecv,
52
53    /// Timer for receiving PINGRESP packets
54    ///
55    /// This timer is used by MQTT clients to detect when a server has not
56    /// responded to a PINGREQ packet with a PINGRESP within the expected
57    /// timeframe, indicating a potentially disconnected or unresponsive server.
58    #[serde(rename = "pingresp_recv")]
59    PingrespRecv,
60}
61
62/// Generic MQTT Event - represents events that occur during MQTT operations
63///
64/// This enum captures all events that would traditionally be handled by callbacks in
65/// a callback-based MQTT implementation. Instead of using callbacks, this Sans-I/O
66/// library returns events that the user application must process.
67///
68/// Events are returned from operations like `recv()` and `send()` for the user to process.
69/// The user application is responsible for handling each event appropriately, such as
70/// sending packets over the network, managing timers, or handling errors.
71///
72/// # Type Parameters
73///
74/// * `PacketIdType` - The type used for packet IDs (typically `u16`, but can be `u32` for extended scenarios)
75///
76/// # Examples
77///
78/// ```ignore
79/// use mqtt_protocol_core::mqtt;
80///
81/// match event {
82///     mqtt::connection::GenericEvent::RequestSendPacket { packet, .. } => {
83///         // Send the packet over the network
84///         network.send(&packet.to_bytes());
85///     },
86///     mqtt::connection::GenericEvent::RequestTimerReset { kind, duration_ms } => {
87///         // Set or reset a timer
88///         timer_manager.set_timer(kind, duration_ms);
89///     },
90///     // ... handle other events
91/// }
92/// ```
93#[derive(Clone)]
94pub enum GenericEvent<PacketIdType>
95where
96    PacketIdType: IsPacketId + Serialize + 'static,
97{
98    /// Notification that a packet was received and parsed successfully
99    ///
100    /// This event is emitted when the MQTT library has successfully received
101    /// and parsed an incoming packet. The application should process this packet
102    /// according to its type and content.
103    ///
104    /// # Parameters
105    ///
106    /// * `GenericPacket<PacketIdType>` - The parsed MQTT packet
107    NotifyPacketReceived(GenericPacket<PacketIdType>),
108
109    /// Request to send a packet via the underlying transport
110    ///
111    /// This event is emitted when the MQTT library needs to send a packet.
112    /// The application must send this packet over the network transport.
113    /// If sending fails and a packet ID is specified in `release_packet_id_if_send_error`,
114    /// the application should call `release_packet_id()` to free the packet ID for reuse.
115    ///
116    /// # Fields
117    ///
118    /// * `packet` - The MQTT packet to send
119    /// * `release_packet_id_if_send_error` - Optional packet ID to release if sending fails
120    RequestSendPacket {
121        /// The MQTT packet that needs to be sent over the network
122        packet: GenericPacket<PacketIdType>,
123        /// Packet ID to release if the send operation fails (QoS > 0 packets only)
124        release_packet_id_if_send_error: Option<PacketIdType>,
125    },
126
127    /// Notification that a packet ID has been released and is available for reuse
128    ///
129    /// This event is emitted when a packet ID is no longer in use and can be
130    /// assigned to new outgoing packets. This typically happens when:
131    /// - A QoS 1 PUBLISH receives its PUBACK
132    /// - A QoS 2 PUBLISH completes its full handshake (PUBLISH -> PUBREC -> PUBREL -> PUBCOMP)
133    /// - A QoS 2 PUBLISH receives a PUBREC with an error code, terminating the sequence early
134    /// - A SUBSCRIBE receives its SUBACK
135    /// - An UNSUBSCRIBE receives its UNSUBACK
136    ///
137    /// # Parameters
138    ///
139    /// * `PacketIdType` - The packet ID that has been released
140    NotifyPacketIdReleased(PacketIdType),
141
142    /// Request to reset or start a timer
143    ///
144    /// This event is emitted when the MQTT library needs to set up a timer for
145    /// protocol operations such as keep-alive pings or timeout detection.
146    /// The application should start or reset the specified timer type with
147    /// the given duration.
148    ///
149    /// # Fields
150    ///
151    /// * `kind` - The type of timer to reset/start
152    /// * `duration_ms` - Timer duration in milliseconds
153    RequestTimerReset {
154        /// The type of timer that needs to be reset or started
155        kind: TimerKind,
156        /// Duration of the timer in milliseconds
157        duration_ms: u64,
158    },
159
160    /// Request to cancel a timer
161    ///
162    /// This event is emitted when the MQTT library needs to cancel a previously
163    /// set timer. This typically happens when the timer is no longer needed,
164    /// such as when a PINGRESP is received before the PINGRESP timeout.
165    ///
166    /// # Parameters
167    ///
168    /// * `TimerKind` - The type of timer to cancel
169    RequestTimerCancel(TimerKind),
170
171    /// Notification that an error occurred during processing
172    ///
173    /// This event is emitted when the MQTT library encounters an error that
174    /// prevents normal operation. The application should handle the error
175    /// appropriately, which may include logging, reconnection attempts, or
176    /// user notification.
177    ///
178    /// Note: When handling this error, closing the underlying transport is not required.
179    /// If the connection needs to be closed, a separate `RequestClose` event will be emitted.
180    ///
181    /// # Parameters
182    ///
183    /// * `MqttError` - The error that occurred
184    NotifyError(MqttError),
185
186    /// Request to close the connection
187    ///
188    /// This event is emitted when the MQTT library determines that the
189    /// connection should be closed. This can happen due to protocol violations,
190    /// disconnect requests, or other terminal conditions. The application
191    /// should close the underlying network connection.
192    RequestClose,
193}
194
195/// Type alias for Event with u16 packet ID (most common case)
196///
197/// This is the standard Event type that most applications will use.
198/// It uses `u16` for packet IDs, which is the standard MQTT packet ID type
199/// supporting values from 1 to 65535.
200///
201/// For extended scenarios where larger packet ID ranges are needed
202/// (such as broker clusters), use `GenericEvent<u32>` directly.
203pub type Event = GenericEvent<u16>;
204
205/// Serialization implementation for GenericEvent
206///
207/// This implementation allows GenericEvent to be serialized to JSON format,
208/// which can be useful for logging, debugging, or inter-process communication.
209/// Each event variant is serialized with a "type" field indicating the event type.
210impl<PacketIdType> Serialize for GenericEvent<PacketIdType>
211where
212    PacketIdType: IsPacketId + Serialize + 'static,
213{
214    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
215    where
216        S: Serializer,
217    {
218        match self {
219            GenericEvent::NotifyPacketReceived(packet) => {
220                let mut state = serializer.serialize_struct("GenericEvent", 2)?;
221                state.serialize_field("type", "notify_packet_received")?;
222                state.serialize_field("packet", packet)?;
223                state.end()
224            }
225            GenericEvent::RequestSendPacket {
226                packet,
227                release_packet_id_if_send_error,
228            } => {
229                let mut state = serializer.serialize_struct("GenericEvent", 3)?;
230                state.serialize_field("type", "request_send_packet")?;
231                state.serialize_field("packet", packet)?;
232                state.serialize_field(
233                    "release_packet_id_if_send_error",
234                    release_packet_id_if_send_error,
235                )?;
236                state.end()
237            }
238            GenericEvent::NotifyPacketIdReleased(packet_id) => {
239                let mut state = serializer.serialize_struct("GenericEvent", 2)?;
240                state.serialize_field("type", "notify_packet_id_released")?;
241                state.serialize_field("packet_id", packet_id)?;
242                state.end()
243            }
244            GenericEvent::RequestTimerReset { kind, duration_ms } => {
245                let mut state = serializer.serialize_struct("GenericEvent", 3)?;
246                state.serialize_field("type", "request_timer_reset")?;
247                state.serialize_field("kind", kind)?;
248                state.serialize_field("duration_ms", duration_ms)?;
249                state.end()
250            }
251            GenericEvent::RequestTimerCancel(kind) => {
252                let mut state = serializer.serialize_struct("GenericEvent", 2)?;
253                state.serialize_field("type", "request_timer_cancel")?;
254                state.serialize_field("kind", kind)?;
255                state.end()
256            }
257            GenericEvent::NotifyError(error) => {
258                let mut state = serializer.serialize_struct("GenericEvent", 2)?;
259                state.serialize_field("type", "notify_error")?;
260                state.serialize_field("error", &format!("{error:?}"))?;
261                state.end()
262            }
263            GenericEvent::RequestClose => {
264                let mut state = serializer.serialize_struct("GenericEvent", 1)?;
265                state.serialize_field("type", "request_close")?;
266                state.end()
267            }
268        }
269    }
270}
271
272/// Display implementation for GenericEvent
273///
274/// Formats the event as a JSON string for human-readable output.
275/// This is particularly useful for logging and debugging purposes.
276/// If serialization fails, an error message is displayed instead.
277impl<PacketIdType> fmt::Display for GenericEvent<PacketIdType>
278where
279    PacketIdType: IsPacketId + Serialize + 'static,
280{
281    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
282        match serde_json::to_string(self) {
283            Ok(json) => write!(f, "{json}"),
284            Err(e) => write!(f, "{{\"error\": \"{e}\"}}"),
285        }
286    }
287}
288
289/// Debug implementation for GenericEvent
290///
291/// Uses the same JSON formatting as Display for consistent debug output.
292/// This ensures that debug output is structured and easily parseable.
293impl<PacketIdType> fmt::Debug for GenericEvent<PacketIdType>
294where
295    PacketIdType: IsPacketId + Serialize + 'static,
296{
297    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
298        fmt::Display::fmt(self, f)
299    }
300}