ntex_mqtt/v5/client/
control.rs1use std::io;
2
3use ntex_bytes::{ByteString, Bytes};
4
5use crate::{error, payload::Payload, v5::codec};
6
7pub use crate::v5::control::{
8 Closed, ControlAck, Disconnect, Error, ProtocolError, PublishRelease,
9};
10
11#[derive(Debug)]
13pub enum Control<E> {
14 Publish(Publish),
16 PublishRelease(PublishRelease),
18 Disconnect(Disconnect),
20 Error(Error<E>),
22 ProtocolError(ProtocolError),
24 Closed(Closed),
26 PeerGone(PeerGone),
28}
29
30impl<E> Control<E> {
31 pub(super) fn publish(pkt: codec::Publish, pl: Payload, size: u32) -> Self {
32 Control::Publish(Publish(pkt, pl, size))
33 }
34
35 pub(super) fn pubrel(pkt: codec::PublishAck2, size: u32) -> Self {
36 Control::PublishRelease(PublishRelease::new(pkt, size))
37 }
38
39 pub(super) fn dis(pkt: codec::Disconnect, size: u32) -> Self {
40 Control::Disconnect(Disconnect(pkt, size))
41 }
42
43 pub(super) const fn closed() -> Self {
44 Control::Closed(Closed)
45 }
46
47 pub(super) fn error(err: E) -> Self {
48 Control::Error(Error::new(err))
49 }
50
51 pub(super) fn proto_error(err: error::ProtocolError) -> Self {
52 Control::ProtocolError(ProtocolError::new(err))
53 }
54
55 pub(super) fn peer_gone(err: Option<io::Error>) -> Self {
56 Control::PeerGone(PeerGone(err))
57 }
58
59 pub fn disconnect(&self, pkt: codec::Disconnect) -> ControlAck {
60 ControlAck { packet: Some(codec::Packet::Disconnect(pkt)), disconnect: true }
61 }
62
63 pub fn ack(self) -> ControlAck {
65 match self {
66 Control::Publish(_) => {
67 crate::v5::disconnect("Publish control message is not supported")
68 }
69 Control::PublishRelease(msg) => msg.ack(),
70 Control::Disconnect(msg) => msg.ack(),
71 Control::Closed(msg) => msg.ack(),
72 Control::Error(_) => {
73 crate::v5::disconnect("Error control message is not supported")
74 }
75 Control::ProtocolError(msg) => msg.ack(),
76 Control::PeerGone(msg) => msg.ack(),
77 }
78 }
79}
80
81#[derive(Debug)]
82pub struct Publish(codec::Publish, Payload, u32);
83
84impl Publish {
85 pub fn packet(&self) -> &codec::Publish {
87 &self.0
88 }
89
90 pub fn packet_mut(&mut self) -> &mut codec::Publish {
92 &mut self.0
93 }
94
95 pub fn packet_size(&self) -> u32 {
97 self.2
98 }
99
100 #[inline]
101 pub fn payload_size(&self) -> usize {
103 self.0.payload_size as usize
104 }
105
106 #[inline]
107 pub async fn read(&self) -> Result<Option<Bytes>, error::PayloadError> {
109 self.1.read().await
110 }
111
112 #[inline]
113 pub async fn read_all(&self) -> Result<Option<Bytes>, error::PayloadError> {
115 self.1.read_all().await
116 }
117
118 pub fn ack_qos0(self) -> ControlAck {
119 ControlAck { packet: None, disconnect: false }
120 }
121
122 pub fn ack(self, reason_code: codec::PublishAckReason) -> ControlAck {
123 ControlAck {
124 packet: self.0.packet_id.map(|packet_id| {
125 codec::Packet::PublishAck(codec::PublishAck {
126 packet_id,
127 reason_code,
128 properties: codec::UserProperties::new(),
129 reason_string: None,
130 })
131 }),
132 disconnect: false,
133 }
134 }
135
136 pub fn ack_with(
137 self,
138 reason_code: codec::PublishAckReason,
139 properties: codec::UserProperties,
140 reason_string: Option<ByteString>,
141 ) -> ControlAck {
142 ControlAck {
143 packet: self.0.packet_id.map(|packet_id| {
144 codec::Packet::PublishAck(codec::PublishAck {
145 packet_id,
146 reason_code,
147 properties,
148 reason_string,
149 })
150 }),
151 disconnect: false,
152 }
153 }
154
155 pub fn into_inner(
156 self,
157 reason_code: codec::PublishAckReason,
158 ) -> (ControlAck, codec::Publish) {
159 (
160 ControlAck {
161 packet: self.0.packet_id.map(|packet_id| {
162 codec::Packet::PublishAck(codec::PublishAck {
163 packet_id,
164 reason_code,
165 properties: codec::UserProperties::new(),
166 reason_string: None,
167 })
168 }),
169 disconnect: false,
170 },
171 self.0,
172 )
173 }
174}
175
/// Peer-gone control message, wrapping the optional I/O error that came with it.
#[derive(Debug)]
pub struct PeerGone(Option<io::Error>);
178
179impl PeerGone {
180 pub fn error(&self) -> Option<&io::Error> {
182 self.0.as_ref()
183 }
184
185 pub fn ack(self) -> ControlAck {
187 ControlAck { packet: None, disconnect: true }
188 }
189}