1use ntex_bytes::ByteString;
2use std::{io, marker::PhantomData, num::NonZeroU16};
3
4use super::codec;
5use crate::{error, types::QoS};
6
7#[derive(Debug)]
9pub enum Control<E> {
10 PublishRelease(PublishRelease),
12 Ping(Ping),
14 Disconnect(Disconnect),
16 Subscribe(Subscribe),
18 Unsubscribe(Unsubscribe),
20 WrBackpressure(WrBackpressure),
22 Closed(Closed),
24 Error(Error<E>),
26 ProtocolError(ProtocolError),
28 PeerGone(PeerGone),
30}
31
32#[derive(Debug)]
33pub struct ControlAck {
34 pub(crate) result: ControlAckKind,
35}
36
37#[derive(Debug)]
38pub(crate) enum ControlAckKind {
39 Nothing,
40 PublishAck(NonZeroU16),
41 PublishRelease(NonZeroU16),
42 Ping,
43 Disconnect,
44 Subscribe(SubscribeResult),
45 Unsubscribe(UnsubscribeResult),
46 Closed,
47}
48
49impl<E> Control<E> {
50 pub(crate) fn pubrel(packet_id: NonZeroU16) -> Self {
51 Control::PublishRelease(PublishRelease { packet_id })
52 }
53
54 #[doc(hidden)]
56 pub fn ping() -> Self {
57 Control::Ping(Ping)
58 }
59
60 #[doc(hidden)]
62 pub fn subscribe(pkt: Subscribe) -> Self {
63 Control::Subscribe(pkt)
64 }
65
66 #[doc(hidden)]
68 pub fn unsubscribe(pkt: Unsubscribe) -> Self {
69 Control::Unsubscribe(pkt)
70 }
71
72 #[doc(hidden)]
74 pub fn remote_disconnect() -> Self {
75 Control::Disconnect(Disconnect)
76 }
77
78 pub(super) const fn closed() -> Self {
79 Control::Closed(Closed)
80 }
81
82 pub(super) const fn wr_backpressure(enabled: bool) -> Self {
83 Control::WrBackpressure(WrBackpressure(enabled))
84 }
85
86 pub(super) fn error(err: E) -> Self {
87 Control::Error(Error::new(err))
88 }
89
90 pub(super) fn proto_error(err: error::ProtocolError) -> Self {
91 Control::ProtocolError(ProtocolError::new(err))
92 }
93
94 pub(super) fn peer_gone(err: Option<io::Error>) -> Self {
96 Control::PeerGone(PeerGone(err))
97 }
98
99 pub fn disconnect(&self) -> ControlAck {
101 ControlAck { result: ControlAckKind::Disconnect }
102 }
103
104 pub fn ack(self) -> ControlAck {
106 match self {
107 Control::PublishRelease(msg) => msg.ack(),
108 Control::Ping(msg) => msg.ack(),
109 Control::Disconnect(msg) => msg.ack(),
110 Control::Subscribe(_) => {
111 log::warn!("Subscribe is not supported");
112 ControlAck { result: ControlAckKind::Disconnect }
113 }
114 Control::Unsubscribe(_) => {
115 log::warn!("Unsubscribe is not supported");
116 ControlAck { result: ControlAckKind::Disconnect }
117 }
118 Control::WrBackpressure(msg) => msg.ack(),
119 Control::Closed(msg) => msg.ack(),
120 Control::Error(msg) => msg.ack(),
121 Control::ProtocolError(msg) => msg.ack(),
122 Control::PeerGone(msg) => msg.ack(),
123 }
124 }
125}
126
127#[derive(Copy, Clone, Debug)]
129pub struct PublishRelease {
130 pub packet_id: NonZeroU16,
131}
132
133impl PublishRelease {
134 #[inline]
135 pub fn id(self) -> NonZeroU16 {
137 self.packet_id
138 }
139
140 #[inline]
141 pub fn ack(self) -> ControlAck {
143 ControlAck { result: ControlAckKind::PublishRelease(self.packet_id) }
144 }
145}
146
/// Outcome of a publish-release: the packet id it refers to.
///
/// Derives `Debug` like the sibling `SubscribeResult` and
/// `UnsubscribeResult` types so it can be logged the same way.
#[derive(Debug)]
pub(crate) struct PublishReleaseResult {
    /// Packet id the result refers to.
    pub packet_id: NonZeroU16,
}
150
151#[derive(Copy, Clone, Debug)]
152pub struct Ping;
153
154impl Ping {
155 pub fn ack(self) -> ControlAck {
156 ControlAck { result: ControlAckKind::Ping }
157 }
158}
159
160#[derive(Copy, Clone, Debug)]
161pub struct Disconnect;
162
163impl Disconnect {
164 pub fn ack(self) -> ControlAck {
165 ControlAck { result: ControlAckKind::Disconnect }
166 }
167}
168
169#[derive(Debug)]
171pub struct Error<E> {
172 err: E,
173}
174
175impl<E> Error<E> {
176 pub fn new(err: E) -> Self {
177 Self { err }
178 }
179
180 #[inline]
181 pub fn get_ref(&self) -> &E {
183 &self.err
184 }
185
186 #[inline]
187 pub fn ack(self) -> ControlAck {
189 ControlAck { result: ControlAckKind::Disconnect }
190 }
191
192 #[inline]
193 pub fn ack_and_error(self) -> (ControlAck, E) {
195 (ControlAck { result: ControlAckKind::Disconnect }, self.err)
196 }
197}
198
199#[derive(Debug)]
201pub struct ProtocolError {
202 err: error::ProtocolError,
203}
204
205impl ProtocolError {
206 pub fn new(err: error::ProtocolError) -> Self {
207 Self { err }
208 }
209
210 #[inline]
211 pub fn get_ref(&self) -> &error::ProtocolError {
213 &self.err
214 }
215
216 #[inline]
217 pub fn ack(self) -> ControlAck {
219 ControlAck { result: ControlAckKind::Disconnect }
220 }
221
222 #[inline]
223 pub fn ack_and_error(self) -> (ControlAck, error::ProtocolError) {
225 (ControlAck { result: ControlAckKind::Disconnect }, self.err)
226 }
227}
228
229#[derive(Debug)]
231pub struct Subscribe {
232 packet_id: NonZeroU16,
233 packet_size: u32,
234 topics: Vec<(ByteString, QoS)>,
235 codes: Vec<codec::SubscribeReturnCode>,
236}
237
238#[derive(Debug)]
240pub(crate) struct SubscribeResult {
241 pub(crate) codes: Vec<codec::SubscribeReturnCode>,
242 pub(crate) packet_id: NonZeroU16,
243}
244
245impl Subscribe {
246 #[doc(hidden)]
249 pub fn new(
250 packet_id: NonZeroU16,
251 packet_size: u32,
252 topics: Vec<(ByteString, QoS)>,
253 ) -> Self {
254 let mut codes = Vec::with_capacity(topics.len());
255 (0..topics.len()).for_each(|_| codes.push(codec::SubscribeReturnCode::Failure));
256
257 Self { packet_id, packet_size, topics, codes }
258 }
259
260 pub fn packet_size(&self) -> u32 {
262 self.packet_size
263 }
264
265 #[inline]
266 pub fn iter_mut(&mut self) -> SubscribeIter<'_> {
268 SubscribeIter { subs: self as *const _ as *mut _, entry: 0, lt: PhantomData }
269 }
270
271 #[inline]
272 pub fn ack(self) -> ControlAck {
274 ControlAck {
275 result: ControlAckKind::Subscribe(SubscribeResult {
276 codes: self.codes,
277 packet_id: self.packet_id,
278 }),
279 }
280 }
281}
282
283impl<'a> IntoIterator for &'a mut Subscribe {
284 type Item = Subscription<'a>;
285 type IntoIter = SubscribeIter<'a>;
286
287 fn into_iter(self) -> SubscribeIter<'a> {
288 self.iter_mut()
289 }
290}
291
292pub struct SubscribeIter<'a> {
294 subs: *mut Subscribe,
295 entry: usize,
296 lt: PhantomData<&'a mut Subscribe>,
297}
298
299impl<'a> SubscribeIter<'a> {
300 fn next_unsafe(&mut self) -> Option<Subscription<'a>> {
301 let subs = unsafe { &mut *self.subs };
302
303 if self.entry < subs.topics.len() {
304 let s = Subscription {
305 topic: &subs.topics[self.entry].0,
306 qos: subs.topics[self.entry].1,
307 code: &mut subs.codes[self.entry],
308 };
309 self.entry += 1;
310 Some(s)
311 } else {
312 None
313 }
314 }
315}
316
317impl<'a> Iterator for SubscribeIter<'a> {
318 type Item = Subscription<'a>;
319
320 #[inline]
321 fn next(&mut self) -> Option<Subscription<'a>> {
322 self.next_unsafe()
323 }
324}
325
326#[derive(Debug)]
328pub struct Subscription<'a> {
329 topic: &'a ByteString,
330 qos: QoS,
331 code: &'a mut codec::SubscribeReturnCode,
332}
333
334impl<'a> Subscription<'a> {
335 #[inline]
336 pub fn topic(&self) -> &'a ByteString {
338 self.topic
339 }
340
341 #[inline]
342 pub fn qos(&self) -> QoS {
344 self.qos
345 }
346
347 #[inline]
348 pub fn fail(&mut self) {
350 *self.code = codec::SubscribeReturnCode::Failure
351 }
352
353 #[inline]
354 pub fn confirm(&mut self, qos: QoS) {
356 *self.code = codec::SubscribeReturnCode::Success(qos)
357 }
358
359 #[inline]
360 #[doc(hidden)]
361 pub fn subscribe(&mut self, qos: QoS) {
363 self.confirm(qos)
364 }
365}
366
367#[derive(Debug)]
369pub struct Unsubscribe {
370 packet_id: NonZeroU16,
371 packet_size: u32,
372 topics: Vec<ByteString>,
373}
374
/// Outcome of acknowledging an [`Unsubscribe`] message.
#[derive(Debug)]
pub(crate) struct UnsubscribeResult {
    /// Packet id of the unsubscribe request being answered.
    pub(crate) packet_id: NonZeroU16,
}
380
381impl Unsubscribe {
382 #[doc(hidden)]
385 pub fn new(packet_id: NonZeroU16, packet_size: u32, topics: Vec<ByteString>) -> Self {
386 Self { packet_id, packet_size, topics }
387 }
388
389 pub fn packet_size(&self) -> u32 {
391 self.packet_size
392 }
393
394 pub fn iter(&self) -> impl Iterator<Item = &ByteString> {
396 self.topics.iter()
397 }
398
399 #[inline]
400 pub fn ack(self) -> ControlAck {
402 ControlAck {
403 result: ControlAckKind::Unsubscribe(UnsubscribeResult {
404 packet_id: self.packet_id,
405 }),
406 }
407 }
408}
409
410#[derive(Debug)]
412pub struct WrBackpressure(bool);
413
414impl WrBackpressure {
415 #[inline]
416 pub fn enabled(&self) -> bool {
418 self.0
419 }
420
421 #[inline]
422 pub fn ack(self) -> ControlAck {
424 ControlAck { result: ControlAckKind::Nothing }
425 }
426}
427
428#[derive(Debug)]
430pub struct Closed;
431
432impl Closed {
433 #[inline]
434 pub fn ack(self) -> ControlAck {
436 ControlAck { result: ControlAckKind::Closed }
437 }
438}
439
440#[derive(Debug)]
441pub struct PeerGone(pub(super) Option<io::Error>);
442
443impl PeerGone {
444 pub fn err(&self) -> Option<&io::Error> {
446 self.0.as_ref()
447 }
448
449 pub fn take(&mut self) -> Option<io::Error> {
451 self.0.take()
452 }
453
454 pub fn ack(self) -> ControlAck {
455 ControlAck { result: ControlAckKind::Nothing }
456 }
457}