ntex_mqtt/v5/client/
control.rs

1use std::io;
2
3use ntex_bytes::{ByteString, Bytes};
4
5use crate::{error, payload::Payload, v5::codec};
6
7pub use crate::v5::control::{
8    Closed, ControlAck, Disconnect, Error, ProtocolError, PublishRelease,
9};
10
11/// Client control messages
12#[derive(Debug)]
13pub enum Control<E> {
14    /// Unhandled publish packet
15    Publish(Publish),
16    /// Publish release packet
17    PublishRelease(PublishRelease),
18    /// Disconnect packet
19    Disconnect(Disconnect),
20    /// Application level error from resources and control services
21    Error(Error<E>),
22    /// Protocol level error
23    ProtocolError(ProtocolError),
24    /// Connection closed
25    Closed(Closed),
26    /// Peer is gone
27    PeerGone(PeerGone),
28}
29
30impl<E> Control<E> {
31    pub(super) fn publish(pkt: codec::Publish, pl: Payload, size: u32) -> Self {
32        Control::Publish(Publish(pkt, pl, size))
33    }
34
35    pub(super) fn pubrel(pkt: codec::PublishAck2, size: u32) -> Self {
36        Control::PublishRelease(PublishRelease::new(pkt, size))
37    }
38
39    pub(super) fn dis(pkt: codec::Disconnect, size: u32) -> Self {
40        Control::Disconnect(Disconnect(pkt, size))
41    }
42
43    pub(super) const fn closed() -> Self {
44        Control::Closed(Closed)
45    }
46
47    pub(super) fn error(err: E) -> Self {
48        Control::Error(Error::new(err))
49    }
50
51    pub(super) fn proto_error(err: error::ProtocolError) -> Self {
52        Control::ProtocolError(ProtocolError::new(err))
53    }
54
55    pub(super) fn peer_gone(err: Option<io::Error>) -> Self {
56        Control::PeerGone(PeerGone(err))
57    }
58
59    pub fn disconnect(&self, pkt: codec::Disconnect) -> ControlAck {
60        ControlAck { packet: Some(codec::Packet::Disconnect(pkt)), disconnect: true }
61    }
62
63    /// Ack control message
64    pub fn ack(self) -> ControlAck {
65        match self {
66            Control::Publish(_) => {
67                crate::v5::disconnect("Publish control message is not supported")
68            }
69            Control::PublishRelease(msg) => msg.ack(),
70            Control::Disconnect(msg) => msg.ack(),
71            Control::Closed(msg) => msg.ack(),
72            Control::Error(_) => {
73                crate::v5::disconnect("Error control message is not supported")
74            }
75            Control::ProtocolError(msg) => msg.ack(),
76            Control::PeerGone(msg) => msg.ack(),
77        }
78    }
79}
80
81#[derive(Debug)]
82pub struct Publish(codec::Publish, Payload, u32);
83
84impl Publish {
85    /// Returns reference to publish packet
86    pub fn packet(&self) -> &codec::Publish {
87        &self.0
88    }
89
90    /// Returns reference to publish packet
91    pub fn packet_mut(&mut self) -> &mut codec::Publish {
92        &mut self.0
93    }
94
95    /// Returns size of the packet
96    pub fn packet_size(&self) -> u32 {
97        self.2
98    }
99
100    #[inline]
101    /// Returns size of the payload
102    pub fn payload_size(&self) -> usize {
103        self.0.payload_size as usize
104    }
105
106    #[inline]
107    /// Payload that is being published.
108    pub async fn read(&self) -> Result<Option<Bytes>, error::PayloadError> {
109        self.1.read().await
110    }
111
112    #[inline]
113    /// Payload that is being published.
114    pub async fn read_all(&self) -> Result<Option<Bytes>, error::PayloadError> {
115        self.1.read_all().await
116    }
117
118    pub fn ack_qos0(self) -> ControlAck {
119        ControlAck { packet: None, disconnect: false }
120    }
121
122    pub fn ack(self, reason_code: codec::PublishAckReason) -> ControlAck {
123        ControlAck {
124            packet: self.0.packet_id.map(|packet_id| {
125                codec::Packet::PublishAck(codec::PublishAck {
126                    packet_id,
127                    reason_code,
128                    properties: codec::UserProperties::new(),
129                    reason_string: None,
130                })
131            }),
132            disconnect: false,
133        }
134    }
135
136    pub fn ack_with(
137        self,
138        reason_code: codec::PublishAckReason,
139        properties: codec::UserProperties,
140        reason_string: Option<ByteString>,
141    ) -> ControlAck {
142        ControlAck {
143            packet: self.0.packet_id.map(|packet_id| {
144                codec::Packet::PublishAck(codec::PublishAck {
145                    packet_id,
146                    reason_code,
147                    properties,
148                    reason_string,
149                })
150            }),
151            disconnect: false,
152        }
153    }
154
155    pub fn into_inner(
156        self,
157        reason_code: codec::PublishAckReason,
158    ) -> (ControlAck, codec::Publish) {
159        (
160            ControlAck {
161                packet: self.0.packet_id.map(|packet_id| {
162                    codec::Packet::PublishAck(codec::PublishAck {
163                        packet_id,
164                        reason_code,
165                        properties: codec::UserProperties::new(),
166                        reason_string: None,
167                    })
168                }),
169                disconnect: false,
170            },
171            self.0,
172        )
173    }
174}
175
176#[derive(Debug)]
177pub struct PeerGone(Option<io::Error>);
178
179impl PeerGone {
180    /// Returns error reference
181    pub fn error(&self) -> Option<&io::Error> {
182        self.0.as_ref()
183    }
184
185    /// Ack PeerGone message
186    pub fn ack(self) -> ControlAck {
187        ControlAck { packet: None, disconnect: true }
188    }
189}