ntex_mqtt/v3/
control.rs

1use ntex_bytes::ByteString;
2use std::{io, marker::PhantomData, num::NonZeroU16};
3
4use super::codec;
5use crate::{error, types::QoS};
6
7/// Server control messages
8#[derive(Debug)]
9pub enum Control<E> {
10    /// Publish release
11    PublishRelease(PublishRelease),
12    /// Ping packet
13    Ping(Ping),
14    /// Disconnect packet
15    Disconnect(Disconnect),
16    /// Subscribe packet
17    Subscribe(Subscribe),
18    /// Unsubscribe packet
19    Unsubscribe(Unsubscribe),
20    /// Write back-pressure is enabled/disabled
21    WrBackpressure(WrBackpressure),
22    /// Connection dropped
23    Closed(Closed),
24    /// Service level error
25    Error(Error<E>),
26    /// Protocol level error
27    ProtocolError(ProtocolError),
28    /// Peer is gone
29    PeerGone(PeerGone),
30}
31
32#[derive(Debug)]
33pub struct ControlAck {
34    pub(crate) result: ControlAckKind,
35}
36
37#[derive(Debug)]
38pub(crate) enum ControlAckKind {
39    Nothing,
40    PublishAck(NonZeroU16),
41    PublishRelease(NonZeroU16),
42    Ping,
43    Disconnect,
44    Subscribe(SubscribeResult),
45    Unsubscribe(UnsubscribeResult),
46    Closed,
47}
48
49impl<E> Control<E> {
50    pub(crate) fn pubrel(packet_id: NonZeroU16) -> Self {
51        Control::PublishRelease(PublishRelease { packet_id })
52    }
53
54    /// Create a new PING `Control` message.
55    #[doc(hidden)]
56    pub fn ping() -> Self {
57        Control::Ping(Ping)
58    }
59
60    /// Create a new `Control` message from SUBSCRIBE packet.
61    #[doc(hidden)]
62    pub fn subscribe(pkt: Subscribe) -> Self {
63        Control::Subscribe(pkt)
64    }
65
66    /// Create a new `Control` message from UNSUBSCRIBE packet.
67    #[doc(hidden)]
68    pub fn unsubscribe(pkt: Unsubscribe) -> Self {
69        Control::Unsubscribe(pkt)
70    }
71
72    /// Create a new `Control` message from DISCONNECT packet.
73    #[doc(hidden)]
74    pub fn remote_disconnect() -> Self {
75        Control::Disconnect(Disconnect)
76    }
77
78    pub(super) const fn closed() -> Self {
79        Control::Closed(Closed)
80    }
81
82    pub(super) const fn wr_backpressure(enabled: bool) -> Self {
83        Control::WrBackpressure(WrBackpressure(enabled))
84    }
85
86    pub(super) fn error(err: E) -> Self {
87        Control::Error(Error::new(err))
88    }
89
90    pub(super) fn proto_error(err: error::ProtocolError) -> Self {
91        Control::ProtocolError(ProtocolError::new(err))
92    }
93
94    /// Create a new `Control` message from DISCONNECT packet.
95    pub(super) fn peer_gone(err: Option<io::Error>) -> Self {
96        Control::PeerGone(PeerGone(err))
97    }
98
99    /// Disconnects the client by sending DISCONNECT packet.
100    pub fn disconnect(&self) -> ControlAck {
101        ControlAck { result: ControlAckKind::Disconnect }
102    }
103
104    /// Ack control message
105    pub fn ack(self) -> ControlAck {
106        match self {
107            Control::PublishRelease(msg) => msg.ack(),
108            Control::Ping(msg) => msg.ack(),
109            Control::Disconnect(msg) => msg.ack(),
110            Control::Subscribe(_) => {
111                log::warn!("Subscribe is not supported");
112                ControlAck { result: ControlAckKind::Disconnect }
113            }
114            Control::Unsubscribe(_) => {
115                log::warn!("Unsubscribe is not supported");
116                ControlAck { result: ControlAckKind::Disconnect }
117            }
118            Control::WrBackpressure(msg) => msg.ack(),
119            Control::Closed(msg) => msg.ack(),
120            Control::Error(msg) => msg.ack(),
121            Control::ProtocolError(msg) => msg.ack(),
122            Control::PeerGone(msg) => msg.ack(),
123        }
124    }
125}
126
127/// Publish release
128#[derive(Copy, Clone, Debug)]
129pub struct PublishRelease {
130    pub packet_id: NonZeroU16,
131}
132
133impl PublishRelease {
134    #[inline]
135    /// Packet Identifier
136    pub fn id(self) -> NonZeroU16 {
137        self.packet_id
138    }
139
140    #[inline]
141    /// convert packet to a result
142    pub fn ack(self) -> ControlAck {
143        ControlAck { result: ControlAckKind::PublishRelease(self.packet_id) }
144    }
145}
146
147pub(crate) struct PublishReleaseResult {
148    pub packet_id: NonZeroU16,
149}
150
151#[derive(Copy, Clone, Debug)]
152pub struct Ping;
153
154impl Ping {
155    pub fn ack(self) -> ControlAck {
156        ControlAck { result: ControlAckKind::Ping }
157    }
158}
159
160#[derive(Copy, Clone, Debug)]
161pub struct Disconnect;
162
163impl Disconnect {
164    pub fn ack(self) -> ControlAck {
165        ControlAck { result: ControlAckKind::Disconnect }
166    }
167}
168
169/// Service level error
170#[derive(Debug)]
171pub struct Error<E> {
172    err: E,
173}
174
175impl<E> Error<E> {
176    pub fn new(err: E) -> Self {
177        Self { err }
178    }
179
180    #[inline]
181    /// Returns reference to mqtt error
182    pub fn get_ref(&self) -> &E {
183        &self.err
184    }
185
186    #[inline]
187    /// Ack service error, return disconnect packet and close connection.
188    pub fn ack(self) -> ControlAck {
189        ControlAck { result: ControlAckKind::Disconnect }
190    }
191
192    #[inline]
193    /// Ack service error, return disconnect packet and close connection.
194    pub fn ack_and_error(self) -> (ControlAck, E) {
195        (ControlAck { result: ControlAckKind::Disconnect }, self.err)
196    }
197}
198
199/// Protocol level error
200#[derive(Debug)]
201pub struct ProtocolError {
202    err: error::ProtocolError,
203}
204
205impl ProtocolError {
206    pub fn new(err: error::ProtocolError) -> Self {
207        Self { err }
208    }
209
210    #[inline]
211    /// Returns reference to a protocol error
212    pub fn get_ref(&self) -> &error::ProtocolError {
213        &self.err
214    }
215
216    #[inline]
217    /// Ack protocol error, return disconnect packet and close connection.
218    pub fn ack(self) -> ControlAck {
219        ControlAck { result: ControlAckKind::Disconnect }
220    }
221
222    #[inline]
223    /// Ack protocol error, return disconnect packet and close connection.
224    pub fn ack_and_error(self) -> (ControlAck, error::ProtocolError) {
225        (ControlAck { result: ControlAckKind::Disconnect }, self.err)
226    }
227}
228
229/// Subscribe message
230#[derive(Debug)]
231pub struct Subscribe {
232    packet_id: NonZeroU16,
233    packet_size: u32,
234    topics: Vec<(ByteString, QoS)>,
235    codes: Vec<codec::SubscribeReturnCode>,
236}
237
238/// Result of a subscribe message
239#[derive(Debug)]
240pub(crate) struct SubscribeResult {
241    pub(crate) codes: Vec<codec::SubscribeReturnCode>,
242    pub(crate) packet_id: NonZeroU16,
243}
244
245impl Subscribe {
246    /// Create a new `Subscribe` control message from packet id and
247    /// a list of topics.
248    #[doc(hidden)]
249    pub fn new(
250        packet_id: NonZeroU16,
251        packet_size: u32,
252        topics: Vec<(ByteString, QoS)>,
253    ) -> Self {
254        let mut codes = Vec::with_capacity(topics.len());
255        (0..topics.len()).for_each(|_| codes.push(codec::SubscribeReturnCode::Failure));
256
257        Self { packet_id, packet_size, topics, codes }
258    }
259
260    /// Returns size of the packet
261    pub fn packet_size(&self) -> u32 {
262        self.packet_size
263    }
264
265    #[inline]
266    /// returns iterator over subscription topics
267    pub fn iter_mut(&mut self) -> SubscribeIter<'_> {
268        SubscribeIter { subs: self as *const _ as *mut _, entry: 0, lt: PhantomData }
269    }
270
271    #[inline]
272    /// convert subscription to a result
273    pub fn ack(self) -> ControlAck {
274        ControlAck {
275            result: ControlAckKind::Subscribe(SubscribeResult {
276                codes: self.codes,
277                packet_id: self.packet_id,
278            }),
279        }
280    }
281}
282
283impl<'a> IntoIterator for &'a mut Subscribe {
284    type Item = Subscription<'a>;
285    type IntoIter = SubscribeIter<'a>;
286
287    fn into_iter(self) -> SubscribeIter<'a> {
288        self.iter_mut()
289    }
290}
291
292/// Iterator over subscription topics
293pub struct SubscribeIter<'a> {
294    subs: *mut Subscribe,
295    entry: usize,
296    lt: PhantomData<&'a mut Subscribe>,
297}
298
299impl<'a> SubscribeIter<'a> {
300    fn next_unsafe(&mut self) -> Option<Subscription<'a>> {
301        let subs = unsafe { &mut *self.subs };
302
303        if self.entry < subs.topics.len() {
304            let s = Subscription {
305                topic: &subs.topics[self.entry].0,
306                qos: subs.topics[self.entry].1,
307                code: &mut subs.codes[self.entry],
308            };
309            self.entry += 1;
310            Some(s)
311        } else {
312            None
313        }
314    }
315}
316
317impl<'a> Iterator for SubscribeIter<'a> {
318    type Item = Subscription<'a>;
319
320    #[inline]
321    fn next(&mut self) -> Option<Subscription<'a>> {
322        self.next_unsafe()
323    }
324}
325
326/// Subscription topic
327#[derive(Debug)]
328pub struct Subscription<'a> {
329    topic: &'a ByteString,
330    qos: QoS,
331    code: &'a mut codec::SubscribeReturnCode,
332}
333
334impl<'a> Subscription<'a> {
335    #[inline]
336    /// subscription topic
337    pub fn topic(&self) -> &'a ByteString {
338        self.topic
339    }
340
341    #[inline]
342    /// the level of assurance for delivery of an Application Message.
343    pub fn qos(&self) -> QoS {
344        self.qos
345    }
346
347    #[inline]
348    /// fail to subscribe to the topic
349    pub fn fail(&mut self) {
350        *self.code = codec::SubscribeReturnCode::Failure
351    }
352
353    #[inline]
354    /// confirm subscription to a topic with specific qos
355    pub fn confirm(&mut self, qos: QoS) {
356        *self.code = codec::SubscribeReturnCode::Success(qos)
357    }
358
359    #[inline]
360    #[doc(hidden)]
361    /// confirm subscription to a topic with specific qos
362    pub fn subscribe(&mut self, qos: QoS) {
363        self.confirm(qos)
364    }
365}
366
367/// Unsubscribe message
368#[derive(Debug)]
369pub struct Unsubscribe {
370    packet_id: NonZeroU16,
371    packet_size: u32,
372    topics: Vec<ByteString>,
373}
374
375/// Result of a unsubscribe message
376#[derive(Debug)]
377pub(crate) struct UnsubscribeResult {
378    pub(crate) packet_id: NonZeroU16,
379}
380
381impl Unsubscribe {
382    /// Create a new `Unsubscribe` control message from packet id and
383    /// a list of topics.
384    #[doc(hidden)]
385    pub fn new(packet_id: NonZeroU16, packet_size: u32, topics: Vec<ByteString>) -> Self {
386        Self { packet_id, packet_size, topics }
387    }
388
389    /// Returns size of the packet
390    pub fn packet_size(&self) -> u32 {
391        self.packet_size
392    }
393
394    /// returns iterator over unsubscribe topics
395    pub fn iter(&self) -> impl Iterator<Item = &ByteString> {
396        self.topics.iter()
397    }
398
399    #[inline]
400    /// convert packet to a result
401    pub fn ack(self) -> ControlAck {
402        ControlAck {
403            result: ControlAckKind::Unsubscribe(UnsubscribeResult {
404                packet_id: self.packet_id,
405            }),
406        }
407    }
408}
409
410/// Write back-pressure message
411#[derive(Debug)]
412pub struct WrBackpressure(bool);
413
414impl WrBackpressure {
415    #[inline]
416    /// Is write back-pressure enabled
417    pub fn enabled(&self) -> bool {
418        self.0
419    }
420
421    #[inline]
422    /// convert packet to a result
423    pub fn ack(self) -> ControlAck {
424        ControlAck { result: ControlAckKind::Nothing }
425    }
426}
427
428/// Connection closed message
429#[derive(Debug)]
430pub struct Closed;
431
432impl Closed {
433    #[inline]
434    /// convert packet to a result
435    pub fn ack(self) -> ControlAck {
436        ControlAck { result: ControlAckKind::Closed }
437    }
438}
439
440#[derive(Debug)]
441pub struct PeerGone(pub(super) Option<io::Error>);
442
443impl PeerGone {
444    /// Returns error reference
445    pub fn err(&self) -> Option<&io::Error> {
446        self.0.as_ref()
447    }
448
449    /// Take error
450    pub fn take(&mut self) -> Option<io::Error> {
451        self.0.take()
452    }
453
454    pub fn ack(self) -> ControlAck {
455        ControlAck { result: ControlAckKind::Nothing }
456    }
457}