ntex_mqtt/v5/
control.rs

1use std::{io, marker::PhantomData};
2
3use ntex_bytes::ByteString;
4
5use super::codec::{self, DisconnectReasonCode, QoS, UserProperties};
6use crate::error;
7
8/// Server control messages
9#[derive(Debug)]
10pub enum Control<E> {
11    /// Auth packet from a client
12    Auth(Auth),
13    /// PublishRelease packet from a client
14    PublishRelease(PublishRelease),
15    /// Ping packet from a client
16    Ping(Ping),
17    /// Disconnect packet from a client
18    Disconnect(Disconnect),
19    /// Subscribe packet from a client
20    Subscribe(Subscribe),
21    /// Unsubscribe packet from a client
22    Unsubscribe(Unsubscribe),
23    /// Write back-pressure is enabled/disabled
24    WrBackpressure(WrBackpressure),
25    /// Underlying transport connection closed
26    Closed(Closed),
27    /// Unhandled application level error from handshake, publish and control services
28    Error(Error<E>),
29    /// Protocol level error
30    ProtocolError(ProtocolError),
31    /// Peer is gone
32    PeerGone(PeerGone),
33}
34
35/// Control message handling result
36#[derive(Debug)]
37pub struct ControlAck {
38    pub(crate) packet: Option<codec::Packet>,
39    pub(crate) disconnect: bool,
40}
41
42impl<E> Control<E> {
43    /// Create a new `Control` from AUTH packet.
44    #[doc(hidden)]
45    pub fn auth(pkt: codec::Auth, size: u32) -> Self {
46        Control::Auth(Auth { pkt, size })
47    }
48
49    pub(crate) fn pubrel(pkt: codec::PublishAck2, size: u32) -> Self {
50        Control::PublishRelease(PublishRelease::new(pkt, size))
51    }
52
53    /// Create a new `Control` from SUBSCRIBE packet.
54    #[doc(hidden)]
55    pub fn subscribe(pkt: codec::Subscribe, size: u32) -> Self {
56        Control::Subscribe(Subscribe::new(pkt, size))
57    }
58
59    /// Create a new `Control` from UNSUBSCRIBE packet.
60    #[doc(hidden)]
61    pub fn unsubscribe(pkt: codec::Unsubscribe, size: u32) -> Self {
62        Control::Unsubscribe(Unsubscribe::new(pkt, size))
63    }
64
65    /// Create a new PING `Control`.
66    #[doc(hidden)]
67    pub fn ping() -> Self {
68        Control::Ping(Ping)
69    }
70
71    /// Create a new `Control` from DISCONNECT packet.
72    #[doc(hidden)]
73    pub fn remote_disconnect(pkt: codec::Disconnect, size: u32) -> Self {
74        Control::Disconnect(Disconnect(pkt, size))
75    }
76
77    pub(super) const fn closed() -> Self {
78        Control::Closed(Closed)
79    }
80
81    pub(super) const fn wr_backpressure(enabled: bool) -> Self {
82        Control::WrBackpressure(WrBackpressure(enabled))
83    }
84
85    pub(super) fn error(err: E) -> Self {
86        Control::Error(Error::new(err))
87    }
88
89    pub(super) fn peer_gone(err: Option<io::Error>) -> Self {
90        Control::PeerGone(PeerGone(err))
91    }
92
93    pub(super) fn proto_error(err: error::ProtocolError) -> Self {
94        Control::ProtocolError(ProtocolError::new(err))
95    }
96
97    /// Disconnects the client by sending DISCONNECT packet
98    /// with `NormalDisconnection` reason code.
99    pub fn disconnect(&self) -> ControlAck {
100        let pkt = codec::Disconnect {
101            reason_code: codec::DisconnectReasonCode::NormalDisconnection,
102            session_expiry_interval_secs: None,
103            server_reference: None,
104            reason_string: None,
105            user_properties: Default::default(),
106        };
107        ControlAck { packet: Some(codec::Packet::Disconnect(pkt)), disconnect: true }
108    }
109
110    /// Disconnects the client by sending DISCONNECT packet
111    /// with provided reason code.
112    pub fn disconnect_with(&self, pkt: codec::Disconnect) -> ControlAck {
113        ControlAck { packet: Some(codec::Packet::Disconnect(pkt)), disconnect: true }
114    }
115
116    /// Ack control message
117    pub fn ack(self) -> ControlAck {
118        match self {
119            Control::Auth(_) => super::disconnect("Auth control message is not supported"),
120            Control::PublishRelease(msg) => msg.ack(),
121            Control::Ping(msg) => msg.ack(),
122            Control::Disconnect(msg) => msg.ack(),
123            Control::Subscribe(msg) => msg.ack(),
124            Control::Unsubscribe(msg) => msg.ack(),
125            Control::WrBackpressure(msg) => msg.ack(),
126            Control::Closed(msg) => msg.ack(),
127            Control::Error(_) => super::disconnect("Error control message is not supported"),
128            Control::ProtocolError(msg) => msg.ack(),
129            Control::PeerGone(msg) => msg.ack(),
130        }
131    }
132}
133
134#[derive(Debug)]
135pub struct Auth {
136    pkt: codec::Auth,
137    size: u32,
138}
139
140impl Auth {
141    /// Returns reference to auth packet
142    pub fn packet(&self) -> &codec::Auth {
143        &self.pkt
144    }
145
146    /// Returns size of the packet
147    pub fn packet_size(&self) -> u32 {
148        self.size
149    }
150
151    pub fn ack(self, response: codec::Auth) -> ControlAck {
152        ControlAck { packet: Some(codec::Packet::Auth(response)), disconnect: false }
153    }
154}
155
156#[derive(Debug)]
157pub struct PublishRelease {
158    pkt: codec::PublishAck2,
159    result: codec::PublishAck2,
160    size: u32,
161}
162
163impl PublishRelease {
164    pub(crate) fn new(pkt: codec::PublishAck2, size: u32) -> Self {
165        let packet_id = pkt.packet_id;
166        Self {
167            pkt,
168            size,
169            result: codec::PublishAck2 {
170                packet_id,
171                reason_code: codec::PublishAck2Reason::Success,
172                properties: codec::UserProperties::default(),
173                reason_string: None,
174            },
175        }
176    }
177
178    /// Returns reference to auth packet
179    pub fn packet(&self) -> &codec::PublishAck2 {
180        &self.pkt
181    }
182
183    /// Returns size of the packet
184    pub fn packet_size(&self) -> u32 {
185        self.size
186    }
187
188    /// Update user properties
189    #[inline]
190    pub fn properties<F>(mut self, f: F) -> Self
191    where
192        F: FnOnce(&mut codec::UserProperties),
193    {
194        f(&mut self.result.properties);
195        self
196    }
197
198    /// Set ack reason string
199    #[inline]
200    pub fn reason(mut self, reason: ByteString) -> Self {
201        self.result.reason_string = Some(reason);
202        self
203    }
204
205    /// Ack publish release
206    pub fn ack(self) -> ControlAck {
207        ControlAck {
208            packet: Some(codec::Packet::PublishComplete(self.result)),
209            disconnect: false,
210        }
211    }
212}
213
214#[derive(Debug)]
215pub struct Ping;
216
217impl Ping {
218    pub fn ack(self) -> ControlAck {
219        ControlAck { packet: Some(codec::Packet::PingResponse), disconnect: false }
220    }
221}
222
223#[derive(Debug)]
224pub struct Disconnect(pub(crate) codec::Disconnect, pub(crate) u32);
225
226impl Disconnect {
227    /// Returns reference to disconnect packet
228    pub fn packet(&self) -> &codec::Disconnect {
229        &self.0
230    }
231
232    /// Returns size of the packet
233    pub fn packet_size(&self) -> u32 {
234        self.1
235    }
236
237    /// Ack disconnect message
238    pub fn ack(self) -> ControlAck {
239        ControlAck { packet: None, disconnect: true }
240    }
241}
242
243/// Subscribe message
244#[derive(Debug)]
245pub struct Subscribe {
246    packet: codec::Subscribe,
247    result: codec::SubscribeAck,
248    size: u32,
249}
250
251impl Subscribe {
252    /// Create a new `Subscribe` control message from a Subscribe
253    /// packet
254    pub fn new(packet: codec::Subscribe, size: u32) -> Self {
255        let mut status = Vec::with_capacity(packet.topic_filters.len());
256        (0..packet.topic_filters.len())
257            .for_each(|_| status.push(codec::SubscribeAckReason::UnspecifiedError));
258
259        let result = codec::SubscribeAck {
260            status,
261            packet_id: packet.packet_id,
262            properties: codec::UserProperties::default(),
263            reason_string: None,
264        };
265
266        Self { packet, result, size }
267    }
268
269    #[inline]
270    /// returns iterator over subscription topics
271    pub fn iter_mut(&mut self) -> SubscribeIter<'_> {
272        SubscribeIter { subs: self as *const _ as *mut _, entry: 0, lt: PhantomData }
273    }
274
275    #[inline]
276    /// Reason string for ack packet
277    pub fn ack_reason(mut self, reason: ByteString) -> Self {
278        self.result.reason_string = Some(reason);
279        self
280    }
281
282    #[inline]
283    /// Properties for ack packet
284    pub fn ack_properties<F>(mut self, f: F) -> Self
285    where
286        F: FnOnce(&mut codec::UserProperties),
287    {
288        f(&mut self.result.properties);
289        self
290    }
291
292    #[inline]
293    /// Ack Subscribe packet
294    pub fn ack(self) -> ControlAck {
295        ControlAck { packet: Some(codec::Packet::SubscribeAck(self.result)), disconnect: false }
296    }
297
298    /// Returns reference to subscribe packet
299    pub fn packet(&self) -> &codec::Subscribe {
300        &self.packet
301    }
302
303    /// Returns size of the packet
304    pub fn packet_size(&self) -> u32 {
305        self.size
306    }
307}
308
309impl<'a> IntoIterator for &'a mut Subscribe {
310    type Item = Subscription<'a>;
311    type IntoIter = SubscribeIter<'a>;
312
313    fn into_iter(self) -> SubscribeIter<'a> {
314        self.iter_mut()
315    }
316}
317
318/// Iterator over subscription topics
319pub struct SubscribeIter<'a> {
320    subs: *mut Subscribe,
321    entry: usize,
322    lt: PhantomData<&'a mut Subscribe>,
323}
324
325impl<'a> SubscribeIter<'a> {
326    fn next_unsafe(&mut self) -> Option<Subscription<'a>> {
327        let subs = unsafe { &mut *self.subs };
328
329        if self.entry < subs.packet.topic_filters.len() {
330            let s = Subscription {
331                topic: &subs.packet.topic_filters[self.entry].0,
332                options: &subs.packet.topic_filters[self.entry].1,
333                status: &mut subs.result.status[self.entry],
334            };
335            self.entry += 1;
336            Some(s)
337        } else {
338            None
339        }
340    }
341}
342
343impl<'a> Iterator for SubscribeIter<'a> {
344    type Item = Subscription<'a>;
345
346    #[inline]
347    fn next(&mut self) -> Option<Subscription<'a>> {
348        self.next_unsafe()
349    }
350}
351
352/// Subscription topic
353#[derive(Debug)]
354pub struct Subscription<'a> {
355    topic: &'a ByteString,
356    options: &'a codec::SubscriptionOptions,
357    status: &'a mut codec::SubscribeAckReason,
358}
359
360impl<'a> Subscription<'a> {
361    #[inline]
362    /// subscription topic
363    pub fn topic(&self) -> &'a ByteString {
364        self.topic
365    }
366
367    #[inline]
368    /// subscription options for current topic
369    pub fn options(&self) -> &codec::SubscriptionOptions {
370        self.options
371    }
372
373    #[inline]
374    /// fail to subscribe to the topic
375    pub fn fail(&mut self, status: codec::SubscribeAckReason) {
376        *self.status = status
377    }
378
379    #[inline]
380    /// confirm subscription to a topic with specific qos
381    pub fn confirm(&mut self, qos: QoS) {
382        match qos {
383            QoS::AtMostOnce => *self.status = codec::SubscribeAckReason::GrantedQos0,
384            QoS::AtLeastOnce => *self.status = codec::SubscribeAckReason::GrantedQos1,
385            QoS::ExactlyOnce => *self.status = codec::SubscribeAckReason::GrantedQos2,
386        }
387    }
388
389    #[inline]
390    #[doc(hidden)]
391    /// confirm subscription to a topic with specific qos
392    pub fn subscribe(&mut self, qos: QoS) {
393        self.confirm(qos)
394    }
395}
396
397/// Unsubscribe message
398#[derive(Debug)]
399pub struct Unsubscribe {
400    packet: codec::Unsubscribe,
401    result: codec::UnsubscribeAck,
402    size: u32,
403}
404
405impl Unsubscribe {
406    /// Create a new `Unsubscribe` control message from an Unsubscribe
407    /// packet
408    pub fn new(packet: codec::Unsubscribe, size: u32) -> Self {
409        let mut status = Vec::with_capacity(packet.topic_filters.len());
410        (0..packet.topic_filters.len())
411            .for_each(|_| status.push(codec::UnsubscribeAckReason::Success));
412
413        let result = codec::UnsubscribeAck {
414            status,
415            packet_id: packet.packet_id,
416            properties: codec::UserProperties::default(),
417            reason_string: None,
418        };
419
420        Self { packet, result, size }
421    }
422
423    /// Unsubscribe packet user properties
424    pub fn properties(&self) -> &codec::UserProperties {
425        &self.packet.user_properties
426    }
427
428    /// returns iterator over unsubscribe topics
429    pub fn iter(&self) -> impl Iterator<Item = &ByteString> {
430        self.packet.topic_filters.iter()
431    }
432
433    #[inline]
434    /// returns iterator over subscription topics
435    pub fn iter_mut(&mut self) -> UnsubscribeIter<'_> {
436        UnsubscribeIter { subs: self as *const _ as *mut _, entry: 0, lt: PhantomData }
437    }
438
439    #[inline]
440    /// Reason string for ack packet
441    pub fn ack_reason(mut self, reason: ByteString) -> Self {
442        self.result.reason_string = Some(reason);
443        self
444    }
445
446    #[inline]
447    /// Properties for ack packet
448    pub fn ack_properties<F>(mut self, f: F) -> Self
449    where
450        F: FnOnce(&mut codec::UserProperties),
451    {
452        f(&mut self.result.properties);
453        self
454    }
455
456    #[inline]
457    /// convert packet to a result
458    pub fn ack(self) -> ControlAck {
459        ControlAck {
460            packet: Some(codec::Packet::UnsubscribeAck(self.result)),
461            disconnect: false,
462        }
463    }
464
465    /// Returns reference to unsubscribe packet
466    pub fn packet(&self) -> &codec::Unsubscribe {
467        &self.packet
468    }
469
470    /// Returns size of the packet
471    pub fn packet_size(&self) -> u32 {
472        self.size
473    }
474}
475
476impl<'a> IntoIterator for &'a mut Unsubscribe {
477    type Item = UnsubscribeItem<'a>;
478    type IntoIter = UnsubscribeIter<'a>;
479
480    fn into_iter(self) -> UnsubscribeIter<'a> {
481        self.iter_mut()
482    }
483}
484
485/// Iterator over topics to unsubscribe
486pub struct UnsubscribeIter<'a> {
487    subs: *mut Unsubscribe,
488    entry: usize,
489    lt: PhantomData<&'a mut Unsubscribe>,
490}
491
492impl<'a> UnsubscribeIter<'a> {
493    fn next_unsafe(&mut self) -> Option<UnsubscribeItem<'a>> {
494        let subs = unsafe { &mut *self.subs };
495
496        if self.entry < subs.packet.topic_filters.len() {
497            let s = UnsubscribeItem {
498                topic: &subs.packet.topic_filters[self.entry],
499                status: &mut subs.result.status[self.entry],
500            };
501            self.entry += 1;
502            Some(s)
503        } else {
504            None
505        }
506    }
507}
508
509impl<'a> Iterator for UnsubscribeIter<'a> {
510    type Item = UnsubscribeItem<'a>;
511
512    #[inline]
513    fn next(&mut self) -> Option<UnsubscribeItem<'a>> {
514        self.next_unsafe()
515    }
516}
517
518/// Subscription topic
519#[derive(Debug)]
520pub struct UnsubscribeItem<'a> {
521    topic: &'a ByteString,
522    status: &'a mut codec::UnsubscribeAckReason,
523}
524
525impl<'a> UnsubscribeItem<'a> {
526    #[inline]
527    /// subscription topic
528    pub fn topic(&self) -> &'a ByteString {
529        self.topic
530    }
531
532    #[inline]
533    /// fail to unsubscribe from the topic
534    pub fn fail(&mut self, status: codec::UnsubscribeAckReason) {
535        *self.status = status;
536    }
537
538    #[inline]
539    /// unsubscribe from a topic
540    pub fn success(&mut self) {
541        *self.status = codec::UnsubscribeAckReason::Success;
542    }
543}
544
545/// Write back-pressure message
546#[derive(Debug)]
547pub struct WrBackpressure(bool);
548
549impl WrBackpressure {
550    #[inline]
551    /// Is write back-pressure enabled
552    pub fn enabled(&self) -> bool {
553        self.0
554    }
555
556    #[inline]
557    /// convert packet to a result
558    pub fn ack(self) -> ControlAck {
559        ControlAck { packet: None, disconnect: false }
560    }
561}
562
563/// Connection closed message
564#[derive(Debug)]
565pub struct Closed;
566
567impl Closed {
568    #[inline]
569    /// convert packet to a result
570    pub fn ack(self) -> ControlAck {
571        ControlAck { packet: None, disconnect: false }
572    }
573}
574
575/// Service level error
576#[derive(Debug)]
577pub struct Error<E> {
578    err: E,
579    pkt: codec::Disconnect,
580}
581
582impl<E> Error<E> {
583    pub fn new(err: E) -> Self {
584        Self {
585            err,
586            pkt: codec::Disconnect {
587                session_expiry_interval_secs: None,
588                server_reference: None,
589                reason_string: None,
590                user_properties: UserProperties::default(),
591                reason_code: DisconnectReasonCode::ImplementationSpecificError,
592            },
593        }
594    }
595
596    #[inline]
597    /// Returns reference to mqtt error
598    pub fn get_ref(&self) -> &E {
599        &self.err
600    }
601
602    #[inline]
603    /// Set reason string for disconnect packet
604    pub fn reason_string(mut self, reason: ByteString) -> Self {
605        self.pkt.reason_string = Some(reason);
606        self
607    }
608
609    #[inline]
610    /// Set server reference for disconnect packet
611    pub fn server_reference(mut self, reference: ByteString) -> Self {
612        self.pkt.server_reference = Some(reference);
613        self
614    }
615
616    #[inline]
617    /// Update disconnect packet properties
618    pub fn properties<F>(mut self, f: F) -> Self
619    where
620        F: FnOnce(&mut codec::UserProperties),
621    {
622        f(&mut self.pkt.user_properties);
623        self
624    }
625
626    #[inline]
627    /// Ack service error, return disconnect packet and close connection.
628    pub fn ack(mut self, reason: DisconnectReasonCode) -> ControlAck {
629        self.pkt.reason_code = reason;
630        ControlAck { packet: Some(codec::Packet::Disconnect(self.pkt)), disconnect: true }
631    }
632
633    #[inline]
634    /// Ack service error, return disconnect packet and close connection.
635    pub fn ack_with<F>(self, f: F) -> ControlAck
636    where
637        F: FnOnce(E, codec::Disconnect) -> codec::Disconnect,
638    {
639        let pkt = f(self.err, self.pkt);
640        ControlAck { packet: Some(codec::Packet::Disconnect(pkt)), disconnect: true }
641    }
642}
643
644/// Protocol level error
645#[derive(Debug)]
646pub struct ProtocolError {
647    err: error::ProtocolError,
648    pkt: codec::Disconnect,
649}
650
651impl ProtocolError {
652    pub fn new(err: error::ProtocolError) -> Self {
653        Self {
654            pkt: codec::Disconnect {
655                session_expiry_interval_secs: None,
656                server_reference: None,
657                reason_string: None,
658                user_properties: UserProperties::default(),
659                reason_code: match err {
660                    error::ProtocolError::Decode(error::DecodeError::InvalidLength) => {
661                        DisconnectReasonCode::MalformedPacket
662                    }
663                    error::ProtocolError::Decode(error::DecodeError::MaxSizeExceeded) => {
664                        DisconnectReasonCode::PacketTooLarge
665                    }
666                    error::ProtocolError::KeepAliveTimeout => {
667                        DisconnectReasonCode::KeepAliveTimeout
668                    }
669                    error::ProtocolError::ProtocolViolation(ref e) => e.reason(),
670                    error::ProtocolError::Encode(_) => {
671                        DisconnectReasonCode::ImplementationSpecificError
672                    }
673                    _ => DisconnectReasonCode::ImplementationSpecificError,
674                },
675            },
676            err,
677        }
678    }
679
680    #[inline]
681    /// Returns reference to a protocol error
682    pub fn get_ref(&self) -> &error::ProtocolError {
683        &self.err
684    }
685
686    #[inline]
687    /// Set reason code for disconnect packet
688    pub fn reason_code(mut self, reason: DisconnectReasonCode) -> Self {
689        self.pkt.reason_code = reason;
690        self
691    }
692
693    #[inline]
694    /// Set reason string for disconnect packet
695    pub fn reason_string(mut self, reason: ByteString) -> Self {
696        self.pkt.reason_string = Some(reason);
697        self
698    }
699
700    #[inline]
701    /// Set server reference for disconnect packet
702    pub fn server_reference(mut self, reference: ByteString) -> Self {
703        self.pkt.server_reference = Some(reference);
704        self
705    }
706
707    #[inline]
708    /// Update disconnect packet properties
709    pub fn properties<F>(mut self, f: F) -> Self
710    where
711        F: FnOnce(&mut codec::UserProperties),
712    {
713        f(&mut self.pkt.user_properties);
714        self
715    }
716
717    #[inline]
718    /// Ack protocol error, return disconnect packet and close connection.
719    pub fn ack(self) -> ControlAck {
720        ControlAck { packet: Some(codec::Packet::Disconnect(self.pkt)), disconnect: true }
721    }
722
723    #[inline]
724    /// Ack protocol error, return disconnect packet and close connection.
725    pub fn ack_and_error(self) -> (ControlAck, error::ProtocolError) {
726        (
727            ControlAck { packet: Some(codec::Packet::Disconnect(self.pkt)), disconnect: true },
728            self.err,
729        )
730    }
731}
732
733#[derive(Debug)]
734pub struct PeerGone(Option<io::Error>);
735
736impl PeerGone {
737    /// Returns error reference
738    pub fn err(&self) -> Option<&io::Error> {
739        self.0.as_ref()
740    }
741
742    /// Take error
743    pub fn take(&mut self) -> Option<io::Error> {
744        self.0.take()
745    }
746
747    /// Ack PeerGone message
748    pub fn ack(self) -> ControlAck {
749        ControlAck { packet: None, disconnect: true }
750    }
751}