mqtt_protocol_core/mqtt/connection/
event.rs

1/**
2 * MIT License
3 *
4 * Copyright (c) 2025 Takatoshi Kondo
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to deal
8 * in the Software without restriction, including without limitation the rights
9 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 * copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in all
14 * copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 * SOFTWARE.
23 */
24use std::fmt;
25
26use serde::ser::{SerializeStruct, Serializer};
27use serde::Serialize;
28
29use crate::mqtt::packet::GenericPacket;
30use crate::mqtt::packet::IsPacketId;
31use crate::mqtt::result_code::MqttError;
32
33/// Represents different types of MQTT timers
34///
35/// This enum defines the different kinds of timers used in MQTT protocol operations.
36/// Each timer serves a specific purpose in maintaining connection health and protocol compliance.
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
38pub enum TimerKind {
39    /// Timer for sending PINGREQ packets
40    ///
41    /// This timer is used by MQTT clients to schedule periodic PINGREQ packets
42    /// to keep the connection alive. The interval is typically determined by
43    /// the keep-alive value negotiated during connection establishment.
44    #[serde(rename = "pingreq_send")]
45    PingreqSend,
46
47    /// Timer for receiving PINGREQ packets
48    ///
49    /// This timer is used by MQTT servers (brokers) to detect when a client
50    /// has not sent a PINGREQ packet within the expected timeframe, indicating
51    /// a potentially disconnected or unresponsive client.
52    #[serde(rename = "pingreq_recv")]
53    PingreqRecv,
54
55    /// Timer for receiving PINGRESP packets
56    ///
57    /// This timer is used by MQTT clients to detect when a server has not
58    /// responded to a PINGREQ packet with a PINGRESP within the expected
59    /// timeframe, indicating a potentially disconnected or unresponsive server.
60    #[serde(rename = "pingresp_recv")]
61    PingrespRecv,
62}
63
64/// Generic MQTT Event - represents events that occur during MQTT operations
65///
66/// This enum captures all events that would traditionally be handled by callbacks in
67/// a callback-based MQTT implementation. Instead of using callbacks, this Sans-I/O
68/// library returns events that the user application must process.
69///
70/// Events are returned from operations like `recv()` and `send()` for the user to process.
71/// The user application is responsible for handling each event appropriately, such as
72/// sending packets over the network, managing timers, or handling errors.
73///
74/// # Type Parameters
75///
76/// * `PacketIdType` - The type used for packet IDs (typically `u16`, but can be `u32` for extended scenarios)
77///
78/// # Examples
79///
80/// ```ignore
81/// use mqtt_protocol_core::mqtt;
82///
83/// match event {
84///     mqtt::connection::GenericEvent::RequestSendPacket { packet, .. } => {
85///         // Send the packet over the network
86///         network.send(&packet.to_bytes());
87///     },
88///     mqtt::connection::GenericEvent::RequestTimerReset { kind, duration_ms } => {
89///         // Set or reset a timer
90///         timer_manager.set_timer(kind, duration_ms);
91///     },
92///     // ... handle other events
93/// }
94/// ```
95#[derive(Clone)]
96pub enum GenericEvent<PacketIdType>
97where
98    PacketIdType: IsPacketId + Serialize + 'static,
99{
100    /// Notification that a packet was received and parsed successfully
101    ///
102    /// This event is emitted when the MQTT library has successfully received
103    /// and parsed an incoming packet. The application should process this packet
104    /// according to its type and content.
105    ///
106    /// # Parameters
107    ///
108    /// * `GenericPacket<PacketIdType>` - The parsed MQTT packet
109    NotifyPacketReceived(GenericPacket<PacketIdType>),
110
111    /// Request to send a packet via the underlying transport
112    ///
113    /// This event is emitted when the MQTT library needs to send a packet.
114    /// The application must send this packet over the network transport.
115    /// If sending fails and a packet ID is specified in `release_packet_id_if_send_error`,
116    /// the application should call `release_packet_id()` to free the packet ID for reuse.
117    ///
118    /// # Fields
119    ///
120    /// * `packet` - The MQTT packet to send
121    /// * `release_packet_id_if_send_error` - Optional packet ID to release if sending fails
122    RequestSendPacket {
123        /// The MQTT packet that needs to be sent over the network
124        packet: GenericPacket<PacketIdType>,
125        /// Packet ID to release if the send operation fails (QoS > 0 packets only)
126        release_packet_id_if_send_error: Option<PacketIdType>,
127    },
128
129    /// Notification that a packet ID has been released and is available for reuse
130    ///
131    /// This event is emitted when a packet ID is no longer in use and can be
132    /// assigned to new outgoing packets. This typically happens when:
133    /// - A QoS 1 PUBLISH receives its PUBACK
134    /// - A QoS 2 PUBLISH completes its full handshake (PUBLISH -> PUBREC -> PUBREL -> PUBCOMP)
135    /// - A QoS 2 PUBLISH receives a PUBREC with an error code, terminating the sequence early
136    /// - A SUBSCRIBE receives its SUBACK
137    /// - An UNSUBSCRIBE receives its UNSUBACK
138    ///
139    /// # Parameters
140    ///
141    /// * `PacketIdType` - The packet ID that has been released
142    NotifyPacketIdReleased(PacketIdType),
143
144    /// Request to reset or start a timer
145    ///
146    /// This event is emitted when the MQTT library needs to set up a timer for
147    /// protocol operations such as keep-alive pings or timeout detection.
148    /// The application should start or reset the specified timer type with
149    /// the given duration.
150    ///
151    /// # Fields
152    ///
153    /// * `kind` - The type of timer to reset/start
154    /// * `duration_ms` - Timer duration in milliseconds
155    RequestTimerReset {
156        /// The type of timer that needs to be reset or started
157        kind: TimerKind,
158        /// Duration of the timer in milliseconds
159        duration_ms: u64,
160    },
161
162    /// Request to cancel a timer
163    ///
164    /// This event is emitted when the MQTT library needs to cancel a previously
165    /// set timer. This typically happens when the timer is no longer needed,
166    /// such as when a PINGRESP is received before the PINGRESP timeout.
167    ///
168    /// # Parameters
169    ///
170    /// * `TimerKind` - The type of timer to cancel
171    RequestTimerCancel(TimerKind),
172
173    /// Notification that an error occurred during processing
174    ///
175    /// This event is emitted when the MQTT library encounters an error that
176    /// prevents normal operation. The application should handle the error
177    /// appropriately, which may include logging, reconnection attempts, or
178    /// user notification.
179    ///
180    /// Note: When handling this error, closing the underlying transport is not required.
181    /// If the connection needs to be closed, a separate `RequestClose` event will be emitted.
182    ///
183    /// # Parameters
184    ///
185    /// * `MqttError` - The error that occurred
186    NotifyError(MqttError),
187
188    /// Request to close the connection
189    ///
190    /// This event is emitted when the MQTT library determines that the
191    /// connection should be closed. This can happen due to protocol violations,
192    /// disconnect requests, or other terminal conditions. The application
193    /// should close the underlying network connection.
194    RequestClose,
195}
196
197/// Type alias for Event with u16 packet ID (most common case)
198///
199/// This is the standard Event type that most applications will use.
200/// It uses `u16` for packet IDs, which is the standard MQTT packet ID type
201/// supporting values from 1 to 65535.
202///
203/// For extended scenarios where larger packet ID ranges are needed
204/// (such as broker clusters), use `GenericEvent<u32>` directly.
205pub type Event = GenericEvent<u16>;
206
207/// Serialization implementation for GenericEvent
208///
209/// This implementation allows GenericEvent to be serialized to JSON format,
210/// which can be useful for logging, debugging, or inter-process communication.
211/// Each event variant is serialized with a "type" field indicating the event type.
212impl<PacketIdType> Serialize for GenericEvent<PacketIdType>
213where
214    PacketIdType: IsPacketId + Serialize + 'static,
215{
216    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
217    where
218        S: Serializer,
219    {
220        match self {
221            GenericEvent::NotifyPacketReceived(packet) => {
222                let mut state = serializer.serialize_struct("GenericEvent", 2)?;
223                state.serialize_field("type", "notify_packet_received")?;
224                state.serialize_field("packet", packet)?;
225                state.end()
226            }
227            GenericEvent::RequestSendPacket {
228                packet,
229                release_packet_id_if_send_error,
230            } => {
231                let mut state = serializer.serialize_struct("GenericEvent", 3)?;
232                state.serialize_field("type", "request_send_packet")?;
233                state.serialize_field("packet", packet)?;
234                state.serialize_field(
235                    "release_packet_id_if_send_error",
236                    release_packet_id_if_send_error,
237                )?;
238                state.end()
239            }
240            GenericEvent::NotifyPacketIdReleased(packet_id) => {
241                let mut state = serializer.serialize_struct("GenericEvent", 2)?;
242                state.serialize_field("type", "notify_packet_id_released")?;
243                state.serialize_field("packet_id", packet_id)?;
244                state.end()
245            }
246            GenericEvent::RequestTimerReset { kind, duration_ms } => {
247                let mut state = serializer.serialize_struct("GenericEvent", 3)?;
248                state.serialize_field("type", "request_timer_reset")?;
249                state.serialize_field("kind", kind)?;
250                state.serialize_field("duration_ms", duration_ms)?;
251                state.end()
252            }
253            GenericEvent::RequestTimerCancel(kind) => {
254                let mut state = serializer.serialize_struct("GenericEvent", 2)?;
255                state.serialize_field("type", "request_timer_cancel")?;
256                state.serialize_field("kind", kind)?;
257                state.end()
258            }
259            GenericEvent::NotifyError(error) => {
260                let mut state = serializer.serialize_struct("GenericEvent", 2)?;
261                state.serialize_field("type", "notify_error")?;
262                state.serialize_field("error", &format!("{error:?}"))?;
263                state.end()
264            }
265            GenericEvent::RequestClose => {
266                let mut state = serializer.serialize_struct("GenericEvent", 1)?;
267                state.serialize_field("type", "request_close")?;
268                state.end()
269            }
270        }
271    }
272}
273
274/// Display implementation for GenericEvent
275///
276/// Formats the event as a JSON string for human-readable output.
277/// This is particularly useful for logging and debugging purposes.
278/// If serialization fails, an error message is displayed instead.
279impl<PacketIdType> fmt::Display for GenericEvent<PacketIdType>
280where
281    PacketIdType: IsPacketId + Serialize + 'static,
282{
283    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
284        match serde_json::to_string(self) {
285            Ok(json) => write!(f, "{json}"),
286            Err(e) => write!(f, "{{\"error\": \"{e}\"}}"),
287        }
288    }
289}
290
291/// Debug implementation for GenericEvent
292///
293/// Uses the same JSON formatting as Display for consistent debug output.
294/// This ensures that debug output is structured and easily parseable.
295impl<PacketIdType> fmt::Debug for GenericEvent<PacketIdType>
296where
297    PacketIdType: IsPacketId + Serialize + 'static,
298{
299    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
300        fmt::Display::fmt(self, f)
301    }
302}