ntex_mqtt/v3/client/
control.rs

1use std::{io, num::NonZeroU16};
2
3use ntex_bytes::Bytes;
4
5use crate::payload::Payload;
6pub use crate::v3::control::{
7    Closed, ControlAck, Disconnect, Error, PeerGone, ProtocolError, PublishRelease,
8};
9use crate::v3::{codec, control::ControlAckKind, error};
10
/// Client control messages
///
/// Each variant wraps the message type for one kind of control event.
/// `E` is the error type carried by [`Control::Error`].
#[derive(Debug)]
pub enum Control<E> {
    /// Unhandled publish packet
    Publish(Publish),
    /// Publish release packet
    PublishRelease(PublishRelease),
    /// Connection closed
    Closed(Closed),
    /// Application level error from resources and control services
    Error(Error<E>),
    /// Protocol level error
    ProtocolError(ProtocolError),
    /// Peer is gone
    PeerGone(PeerGone),
}
27
28impl<E> Control<E> {
29    pub(super) fn publish(pkt: codec::Publish, pl: Payload, size: u32) -> Self {
30        Control::Publish(Publish(pkt, pl, size))
31    }
32
33    pub(super) fn pubrel(packet_id: NonZeroU16) -> Self {
34        Control::PublishRelease(PublishRelease { packet_id })
35    }
36
37    pub(super) fn closed() -> Self {
38        Control::Closed(Closed)
39    }
40
41    pub(super) fn error(err: E) -> Self {
42        Control::Error(Error::new(err))
43    }
44
45    pub(super) fn proto_error(err: error::ProtocolError) -> Self {
46        Control::ProtocolError(ProtocolError::new(err))
47    }
48
49    pub(super) fn peer_gone(err: Option<io::Error>) -> Self {
50        Control::PeerGone(PeerGone(err))
51    }
52
53    /// Initiate clean disconnect
54    pub fn disconnect(&self) -> ControlAck {
55        ControlAck { result: ControlAckKind::Disconnect }
56    }
57
58    /// Ack control message
59    pub fn ack(self) -> ControlAck {
60        match self {
61            Control::Publish(msg) => msg.ack(),
62            Control::PublishRelease(msg) => msg.ack(),
63            Control::Closed(msg) => msg.ack(),
64            Control::Error(msg) => msg.ack(),
65            Control::ProtocolError(msg) => msg.ack(),
66            Control::PeerGone(msg) => msg.ack(),
67        }
68    }
69}
70
/// Publish control message
///
/// Holds, in order: the publish packet, its payload, and the packet size.
#[derive(Debug)]
pub struct Publish(codec::Publish, Payload, u32);
73
74impl Publish {
75    /// Returns reference to publish packet
76    pub fn packet(&self) -> &codec::Publish {
77        &self.0
78    }
79
80    /// Returns reference to publish packet
81    pub fn packet_mut(&mut self) -> &mut codec::Publish {
82        &mut self.0
83    }
84
85    /// Returns size of the packet
86    pub fn packet_size(&self) -> u32 {
87        self.2
88    }
89
90    #[inline]
91    /// Returns size of the payload
92    pub fn payload_size(&self) -> usize {
93        self.0.payload_size as usize
94    }
95
96    #[inline]
97    /// Payload that is being published.
98    pub async fn read(&self) -> Result<Option<Bytes>, error::PayloadError> {
99        self.1.read().await
100    }
101
102    #[inline]
103    /// Payload that is being published.
104    pub async fn read_all(&self) -> Result<Option<Bytes>, error::PayloadError> {
105        self.1.read_all().await
106    }
107
108    pub fn ack(self) -> ControlAck {
109        if let Some(id) = self.0.packet_id {
110            ControlAck { result: ControlAckKind::PublishAck(id) }
111        } else {
112            ControlAck { result: ControlAckKind::Nothing }
113        }
114    }
115
116    pub fn into_inner(self) -> (ControlAck, codec::Publish) {
117        if let Some(id) = self.0.packet_id {
118            (ControlAck { result: ControlAckKind::PublishAck(id) }, self.0)
119        } else {
120            (ControlAck { result: ControlAckKind::Nothing }, self.0)
121        }
122    }
123}