mqtt_protocol_core/mqtt/connection/event.rs
1/**
2 * MIT License
3 *
4 * Copyright (c) 2025 Takatoshi Kondo
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to deal
8 * in the Software without restriction, including without limitation the rights
9 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 * copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in all
14 * copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 * SOFTWARE.
23 */
24use core::fmt;
25
26use serde::ser::{SerializeStruct, Serializer};
27use serde::Serialize;
28
29use crate::mqtt::packet::GenericPacket;
30use crate::mqtt::packet::IsPacketId;
31use crate::mqtt::result_code::MqttError;
32
33/// Represents different types of MQTT timers
34///
35/// This enum defines the different kinds of timers used in MQTT protocol operations.
36/// Each timer serves a specific purpose in maintaining connection health and protocol compliance.
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
38pub enum TimerKind {
39 /// Timer for sending PINGREQ packets
40 ///
41 /// This timer is used by MQTT clients to schedule periodic PINGREQ packets
42 /// to keep the connection alive. The interval is typically determined by
43 /// the keep-alive value negotiated during connection establishment.
44 #[serde(rename = "pingreq_send")]
45 PingreqSend,
46
47 /// Timer for receiving PINGREQ packets
48 ///
49 /// This timer is used by MQTT servers (brokers) to detect when a client
50 /// has not sent a PINGREQ packet within the expected timeframe, indicating
51 /// a potentially disconnected or unresponsive client.
52 #[serde(rename = "pingreq_recv")]
53 PingreqRecv,
54
55 /// Timer for receiving PINGRESP packets
56 ///
57 /// This timer is used by MQTT clients to detect when a server has not
58 /// responded to a PINGREQ packet with a PINGRESP within the expected
59 /// timeframe, indicating a potentially disconnected or unresponsive server.
60 #[serde(rename = "pingresp_recv")]
61 PingrespRecv,
62}
63
64/// Generic MQTT Event - represents events that occur during MQTT operations
65///
66/// This enum captures all events that would traditionally be handled by callbacks in
67/// a callback-based MQTT implementation. Instead of using callbacks, this Sans-I/O
68/// library returns events that the user application must process.
69///
70/// Events are returned from operations like `recv()` and `send()` for the user to process.
71/// The user application is responsible for handling each event appropriately, such as
72/// sending packets over the network, managing timers, or handling errors.
73///
74/// # Type Parameters
75///
76/// * `PacketIdType` - The type used for packet IDs (typically `u16`, but can be `u32` for extended scenarios)
77///
78/// # Examples
79///
80/// ```ignore
81/// use mqtt_protocol_core::mqtt;
82///
83/// match event {
84/// mqtt::connection::GenericEvent::RequestSendPacket { packet, .. } => {
85/// // Send the packet over the network
86/// network.send(&packet.to_bytes());
87/// },
88/// mqtt::connection::GenericEvent::RequestTimerReset { kind, duration_ms } => {
89/// // Set or reset a timer
90/// timer_manager.set_timer(kind, duration_ms);
91/// },
92/// // ... handle other events
93/// }
94/// ```
95#[derive(Clone)]
96pub enum GenericEvent<PacketIdType>
97where
98 PacketIdType: IsPacketId + Serialize + 'static,
99{
100 /// Notification that a packet was received and parsed successfully
101 ///
102 /// This event is emitted when the MQTT library has successfully received
103 /// and parsed an incoming packet. The application should process this packet
104 /// according to its type and content.
105 ///
106 /// # Parameters
107 ///
108 /// * `GenericPacket<PacketIdType>` - The parsed MQTT packet
109 NotifyPacketReceived(GenericPacket<PacketIdType>),
110
111 /// Request to send a packet via the underlying transport
112 ///
113 /// This event is emitted when the MQTT library needs to send a packet.
114 /// The application must send this packet over the network transport.
115 /// If sending fails and a packet ID is specified in `release_packet_id_if_send_error`,
116 /// the application should call `release_packet_id()` to free the packet ID for reuse.
117 ///
118 /// # Fields
119 ///
120 /// * `packet` - The MQTT packet to send
121 /// * `release_packet_id_if_send_error` - Optional packet ID to release if sending fails
122 RequestSendPacket {
123 /// The MQTT packet that needs to be sent over the network
124 packet: GenericPacket<PacketIdType>,
125 /// Packet ID to release if the send operation fails (QoS > 0 packets only)
126 release_packet_id_if_send_error: Option<PacketIdType>,
127 },
128
129 /// Notification that a packet ID has been released and is available for reuse
130 ///
131 /// This event is emitted when a packet ID is no longer in use and can be
132 /// assigned to new outgoing packets. This typically happens when:
133 /// - A QoS 1 PUBLISH receives its PUBACK
134 /// - A QoS 2 PUBLISH completes its full handshake (PUBLISH -> PUBREC -> PUBREL -> PUBCOMP)
135 /// - A QoS 2 PUBLISH receives a PUBREC with an error code, terminating the sequence early
136 /// - A SUBSCRIBE receives its SUBACK
137 /// - An UNSUBSCRIBE receives its UNSUBACK
138 ///
139 /// # Parameters
140 ///
141 /// * `PacketIdType` - The packet ID that has been released
142 NotifyPacketIdReleased(PacketIdType),
143
144 /// Request to reset or start a timer
145 ///
146 /// This event is emitted when the MQTT library needs to set up a timer for
147 /// protocol operations such as keep-alive pings or timeout detection.
148 /// The application should start or reset the specified timer type with
149 /// the given duration.
150 ///
151 /// # Fields
152 ///
153 /// * `kind` - The type of timer to reset/start
154 /// * `duration_ms` - Timer duration in milliseconds
155 RequestTimerReset {
156 /// The type of timer that needs to be reset or started
157 kind: TimerKind,
158 /// Duration of the timer in milliseconds
159 duration_ms: u64,
160 },
161
162 /// Request to cancel a timer
163 ///
164 /// This event is emitted when the MQTT library needs to cancel a previously
165 /// set timer. This typically happens when the timer is no longer needed,
166 /// such as when a PINGRESP is received before the PINGRESP timeout.
167 ///
168 /// # Parameters
169 ///
170 /// * `TimerKind` - The type of timer to cancel
171 RequestTimerCancel(TimerKind),
172
173 /// Notification that an error occurred during processing
174 ///
175 /// This event is emitted when the MQTT library encounters an error that
176 /// prevents normal operation. The application should handle the error
177 /// appropriately, which may include logging, reconnection attempts, or
178 /// user notification.
179 ///
180 /// Note: When handling this error, closing the underlying transport is not required.
181 /// If the connection needs to be closed, a separate `RequestClose` event will be emitted.
182 ///
183 /// # Parameters
184 ///
185 /// * `MqttError` - The error that occurred
186 NotifyError(MqttError),
187
188 /// Request to close the connection
189 ///
190 /// This event is emitted when the MQTT library determines that the
191 /// connection should be closed. This can happen due to protocol violations,
192 /// disconnect requests, or other terminal conditions. The application
193 /// should close the underlying network connection.
194 RequestClose,
195}
196
197/// Type alias for Event with u16 packet ID (most common case)
198///
199/// This is the standard Event type that most applications will use.
200/// It uses `u16` for packet IDs, which is the standard MQTT packet ID type
201/// supporting values from 1 to 65535.
202///
203/// For extended scenarios where larger packet ID ranges are needed
204/// (such as broker clusters), use `GenericEvent<u32>` directly.
205pub type Event = GenericEvent<u16>;
206
207/// Serialization implementation for GenericEvent
208///
209/// This implementation allows GenericEvent to be serialized to JSON format,
210/// which can be useful for logging, debugging, or inter-process communication.
211/// Each event variant is serialized with a "type" field indicating the event type.
212impl<PacketIdType> Serialize for GenericEvent<PacketIdType>
213where
214 PacketIdType: IsPacketId + Serialize + 'static,
215{
216 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
217 where
218 S: Serializer,
219 {
220 match self {
221 GenericEvent::NotifyPacketReceived(packet) => {
222 let mut state = serializer.serialize_struct("GenericEvent", 2)?;
223 state.serialize_field("type", "notify_packet_received")?;
224 state.serialize_field("packet", packet)?;
225 state.end()
226 }
227 GenericEvent::RequestSendPacket {
228 packet,
229 release_packet_id_if_send_error,
230 } => {
231 let mut state = serializer.serialize_struct("GenericEvent", 3)?;
232 state.serialize_field("type", "request_send_packet")?;
233 state.serialize_field("packet", packet)?;
234 state.serialize_field(
235 "release_packet_id_if_send_error",
236 release_packet_id_if_send_error,
237 )?;
238 state.end()
239 }
240 GenericEvent::NotifyPacketIdReleased(packet_id) => {
241 let mut state = serializer.serialize_struct("GenericEvent", 2)?;
242 state.serialize_field("type", "notify_packet_id_released")?;
243 state.serialize_field("packet_id", packet_id)?;
244 state.end()
245 }
246 GenericEvent::RequestTimerReset { kind, duration_ms } => {
247 let mut state = serializer.serialize_struct("GenericEvent", 3)?;
248 state.serialize_field("type", "request_timer_reset")?;
249 state.serialize_field("kind", kind)?;
250 state.serialize_field("duration_ms", duration_ms)?;
251 state.end()
252 }
253 GenericEvent::RequestTimerCancel(kind) => {
254 let mut state = serializer.serialize_struct("GenericEvent", 2)?;
255 state.serialize_field("type", "request_timer_cancel")?;
256 state.serialize_field("kind", kind)?;
257 state.end()
258 }
259 GenericEvent::NotifyError(error) => {
260 let mut state = serializer.serialize_struct("GenericEvent", 2)?;
261 state.serialize_field("type", "notify_error")?;
262 state.serialize_field("error", &format!("{error:?}"))?;
263 state.end()
264 }
265 GenericEvent::RequestClose => {
266 let mut state = serializer.serialize_struct("GenericEvent", 1)?;
267 state.serialize_field("type", "request_close")?;
268 state.end()
269 }
270 }
271 }
272}
273
274/// Display implementation for GenericEvent
275///
276/// Formats the event as a JSON string for human-readable output.
277/// This is particularly useful for logging and debugging purposes.
278/// If serialization fails, an error message is displayed instead.
279impl<PacketIdType> fmt::Display for GenericEvent<PacketIdType>
280where
281 PacketIdType: IsPacketId + Serialize + 'static,
282{
283 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
284 match serde_json::to_string(self) {
285 Ok(json) => write!(f, "{json}"),
286 Err(e) => write!(f, "{{\"error\": \"{e}\"}}"),
287 }
288 }
289}
290
291/// Debug implementation for GenericEvent
292///
293/// Uses the same JSON formatting as Display for consistent debug output.
294/// This ensures that debug output is structured and easily parseable.
295impl<PacketIdType> fmt::Debug for GenericEvent<PacketIdType>
296where
297 PacketIdType: IsPacketId + Serialize + 'static,
298{
299 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
300 fmt::Display::fmt(self, f)
301 }
302}