mqtt_protocol_core/mqtt/connection/event.rs
1// MIT License
2//
3// Copyright (c) 2025 Takatoshi Kondo
4//
5// Permission is hereby granted, free of charge, to any person obtaining a copy
6// of this software and associated documentation files (the "Software"), to deal
7// in the Software without restriction, including without limitation the rights
8// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9// copies of the Software, and to permit persons to whom the Software is
10// furnished to do so, subject to the following conditions:
11//
12// The above copyright notice and this permission notice shall be included in all
13// copies or substantial portions of the Software.
14//
15// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21// SOFTWARE.
22use core::fmt;
23
24use serde::ser::{SerializeStruct, Serializer};
25use serde::Serialize;
26
27use crate::mqtt::packet::GenericPacket;
28use crate::mqtt::packet::IsPacketId;
29use crate::mqtt::result_code::MqttError;
30
31/// Represents different types of MQTT timers
32///
33/// This enum defines the different kinds of timers used in MQTT protocol operations.
34/// Each timer serves a specific purpose in maintaining connection health and protocol compliance.
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
36pub enum TimerKind {
37 /// Timer for sending PINGREQ packets
38 ///
39 /// This timer is used by MQTT clients to schedule periodic PINGREQ packets
40 /// to keep the connection alive. The interval is typically determined by
41 /// the keep-alive value negotiated during connection establishment.
42 #[serde(rename = "pingreq_send")]
43 PingreqSend,
44
45 /// Timer for receiving PINGREQ packets
46 ///
47 /// This timer is used by MQTT servers (brokers) to detect when a client
48 /// has not sent a PINGREQ packet within the expected timeframe, indicating
49 /// a potentially disconnected or unresponsive client.
50 #[serde(rename = "pingreq_recv")]
51 PingreqRecv,
52
53 /// Timer for receiving PINGRESP packets
54 ///
55 /// This timer is used by MQTT clients to detect when a server has not
56 /// responded to a PINGREQ packet with a PINGRESP within the expected
57 /// timeframe, indicating a potentially disconnected or unresponsive server.
58 #[serde(rename = "pingresp_recv")]
59 PingrespRecv,
60}
61
62/// Generic MQTT Event - represents events that occur during MQTT operations
63///
64/// This enum captures all events that would traditionally be handled by callbacks in
65/// a callback-based MQTT implementation. Instead of using callbacks, this Sans-I/O
66/// library returns events that the user application must process.
67///
68/// Events are returned from operations like `recv()` and `send()` for the user to process.
69/// The user application is responsible for handling each event appropriately, such as
70/// sending packets over the network, managing timers, or handling errors.
71///
72/// # Type Parameters
73///
74/// * `PacketIdType` - The type used for packet IDs (typically `u16`, but can be `u32` for extended scenarios)
75///
76/// # Examples
77///
78/// ```ignore
79/// use mqtt_protocol_core::mqtt;
80///
81/// match event {
82/// mqtt::connection::GenericEvent::RequestSendPacket { packet, .. } => {
83/// // Send the packet over the network
84/// network.send(&packet.to_bytes());
85/// },
86/// mqtt::connection::GenericEvent::RequestTimerReset { kind, duration_ms } => {
87/// // Set or reset a timer
88/// timer_manager.set_timer(kind, duration_ms);
89/// },
90/// // ... handle other events
91/// }
92/// ```
93#[derive(Clone)]
94pub enum GenericEvent<PacketIdType>
95where
96 PacketIdType: IsPacketId + Serialize + 'static,
97{
98 /// Notification that a packet was received and parsed successfully
99 ///
100 /// This event is emitted when the MQTT library has successfully received
101 /// and parsed an incoming packet. The application should process this packet
102 /// according to its type and content.
103 ///
104 /// # Parameters
105 ///
106 /// * `GenericPacket<PacketIdType>` - The parsed MQTT packet
107 NotifyPacketReceived(GenericPacket<PacketIdType>),
108
109 /// Request to send a packet via the underlying transport
110 ///
111 /// This event is emitted when the MQTT library needs to send a packet.
112 /// The application must send this packet over the network transport.
113 /// If sending fails and a packet ID is specified in `release_packet_id_if_send_error`,
114 /// the application should call `release_packet_id()` to free the packet ID for reuse.
115 ///
116 /// # Fields
117 ///
118 /// * `packet` - The MQTT packet to send
119 /// * `release_packet_id_if_send_error` - Optional packet ID to release if sending fails
120 RequestSendPacket {
121 /// The MQTT packet that needs to be sent over the network
122 packet: GenericPacket<PacketIdType>,
123 /// Packet ID to release if the send operation fails (QoS > 0 packets only)
124 release_packet_id_if_send_error: Option<PacketIdType>,
125 },
126
127 /// Notification that a packet ID has been released and is available for reuse
128 ///
129 /// This event is emitted when a packet ID is no longer in use and can be
130 /// assigned to new outgoing packets. This typically happens when:
131 /// - A QoS 1 PUBLISH receives its PUBACK
132 /// - A QoS 2 PUBLISH completes its full handshake (PUBLISH -> PUBREC -> PUBREL -> PUBCOMP)
133 /// - A QoS 2 PUBLISH receives a PUBREC with an error code, terminating the sequence early
134 /// - A SUBSCRIBE receives its SUBACK
135 /// - An UNSUBSCRIBE receives its UNSUBACK
136 ///
137 /// # Parameters
138 ///
139 /// * `PacketIdType` - The packet ID that has been released
140 NotifyPacketIdReleased(PacketIdType),
141
142 /// Request to reset or start a timer
143 ///
144 /// This event is emitted when the MQTT library needs to set up a timer for
145 /// protocol operations such as keep-alive pings or timeout detection.
146 /// The application should start or reset the specified timer type with
147 /// the given duration.
148 ///
149 /// # Fields
150 ///
151 /// * `kind` - The type of timer to reset/start
152 /// * `duration_ms` - Timer duration in milliseconds
153 RequestTimerReset {
154 /// The type of timer that needs to be reset or started
155 kind: TimerKind,
156 /// Duration of the timer in milliseconds
157 duration_ms: u64,
158 },
159
160 /// Request to cancel a timer
161 ///
162 /// This event is emitted when the MQTT library needs to cancel a previously
163 /// set timer. This typically happens when the timer is no longer needed,
164 /// such as when a PINGRESP is received before the PINGRESP timeout.
165 ///
166 /// # Parameters
167 ///
168 /// * `TimerKind` - The type of timer to cancel
169 RequestTimerCancel(TimerKind),
170
171 /// Notification that an error occurred during processing
172 ///
173 /// This event is emitted when the MQTT library encounters an error that
174 /// prevents normal operation. The application should handle the error
175 /// appropriately, which may include logging, reconnection attempts, or
176 /// user notification.
177 ///
178 /// Note: When handling this error, closing the underlying transport is not required.
179 /// If the connection needs to be closed, a separate `RequestClose` event will be emitted.
180 ///
181 /// # Parameters
182 ///
183 /// * `MqttError` - The error that occurred
184 NotifyError(MqttError),
185
186 /// Request to close the connection
187 ///
188 /// This event is emitted when the MQTT library determines that the
189 /// connection should be closed. This can happen due to protocol violations,
190 /// disconnect requests, or other terminal conditions. The application
191 /// should close the underlying network connection.
192 RequestClose,
193}
194
195/// Type alias for Event with u16 packet ID (most common case)
196///
197/// This is the standard Event type that most applications will use.
198/// It uses `u16` for packet IDs, which is the standard MQTT packet ID type
199/// supporting values from 1 to 65535.
200///
201/// For extended scenarios where larger packet ID ranges are needed
202/// (such as broker clusters), use `GenericEvent<u32>` directly.
203pub type Event = GenericEvent<u16>;
204
205/// Serialization implementation for GenericEvent
206///
207/// This implementation allows GenericEvent to be serialized to JSON format,
208/// which can be useful for logging, debugging, or inter-process communication.
209/// Each event variant is serialized with a "type" field indicating the event type.
210impl<PacketIdType> Serialize for GenericEvent<PacketIdType>
211where
212 PacketIdType: IsPacketId + Serialize + 'static,
213{
214 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
215 where
216 S: Serializer,
217 {
218 match self {
219 GenericEvent::NotifyPacketReceived(packet) => {
220 let mut state = serializer.serialize_struct("GenericEvent", 2)?;
221 state.serialize_field("type", "notify_packet_received")?;
222 state.serialize_field("packet", packet)?;
223 state.end()
224 }
225 GenericEvent::RequestSendPacket {
226 packet,
227 release_packet_id_if_send_error,
228 } => {
229 let mut state = serializer.serialize_struct("GenericEvent", 3)?;
230 state.serialize_field("type", "request_send_packet")?;
231 state.serialize_field("packet", packet)?;
232 state.serialize_field(
233 "release_packet_id_if_send_error",
234 release_packet_id_if_send_error,
235 )?;
236 state.end()
237 }
238 GenericEvent::NotifyPacketIdReleased(packet_id) => {
239 let mut state = serializer.serialize_struct("GenericEvent", 2)?;
240 state.serialize_field("type", "notify_packet_id_released")?;
241 state.serialize_field("packet_id", packet_id)?;
242 state.end()
243 }
244 GenericEvent::RequestTimerReset { kind, duration_ms } => {
245 let mut state = serializer.serialize_struct("GenericEvent", 3)?;
246 state.serialize_field("type", "request_timer_reset")?;
247 state.serialize_field("kind", kind)?;
248 state.serialize_field("duration_ms", duration_ms)?;
249 state.end()
250 }
251 GenericEvent::RequestTimerCancel(kind) => {
252 let mut state = serializer.serialize_struct("GenericEvent", 2)?;
253 state.serialize_field("type", "request_timer_cancel")?;
254 state.serialize_field("kind", kind)?;
255 state.end()
256 }
257 GenericEvent::NotifyError(error) => {
258 let mut state = serializer.serialize_struct("GenericEvent", 2)?;
259 state.serialize_field("type", "notify_error")?;
260 state.serialize_field("error", &format!("{error:?}"))?;
261 state.end()
262 }
263 GenericEvent::RequestClose => {
264 let mut state = serializer.serialize_struct("GenericEvent", 1)?;
265 state.serialize_field("type", "request_close")?;
266 state.end()
267 }
268 }
269 }
270}
271
272/// Display implementation for GenericEvent
273///
274/// Formats the event as a JSON string for human-readable output.
275/// This is particularly useful for logging and debugging purposes.
276/// If serialization fails, an error message is displayed instead.
277impl<PacketIdType> fmt::Display for GenericEvent<PacketIdType>
278where
279 PacketIdType: IsPacketId + Serialize + 'static,
280{
281 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
282 match serde_json::to_string(self) {
283 Ok(json) => write!(f, "{json}"),
284 Err(e) => write!(f, "{{\"error\": \"{e}\"}}"),
285 }
286 }
287}
288
289/// Debug implementation for GenericEvent
290///
291/// Uses the same JSON formatting as Display for consistent debug output.
292/// This ensures that debug output is structured and easily parseable.
293impl<PacketIdType> fmt::Debug for GenericEvent<PacketIdType>
294where
295 PacketIdType: IsPacketId + Serialize + 'static,
296{
297 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
298 fmt::Display::fmt(self, f)
299 }
300}