ntex_mqtt/v3/client/
control.rs1use std::{io, num::NonZeroU16};
2
3use ntex_bytes::Bytes;
4
5use crate::payload::Payload;
6pub use crate::v3::control::{
7 Closed, ControlAck, Disconnect, Error, PeerGone, ProtocolError, PublishRelease,
8};
9use crate::v3::{codec, control::ControlAckKind, error};
10
11#[derive(Debug)]
13pub enum Control<E> {
14 Publish(Publish),
16 PublishRelease(PublishRelease),
18 Closed(Closed),
20 Error(Error<E>),
22 ProtocolError(ProtocolError),
24 PeerGone(PeerGone),
26}
27
28impl<E> Control<E> {
29 pub(super) fn publish(pkt: codec::Publish, pl: Payload, size: u32) -> Self {
30 Control::Publish(Publish(pkt, pl, size))
31 }
32
33 pub(super) fn pubrel(packet_id: NonZeroU16) -> Self {
34 Control::PublishRelease(PublishRelease { packet_id })
35 }
36
37 pub(super) fn closed() -> Self {
38 Control::Closed(Closed)
39 }
40
41 pub(super) fn error(err: E) -> Self {
42 Control::Error(Error::new(err))
43 }
44
45 pub(super) fn proto_error(err: error::ProtocolError) -> Self {
46 Control::ProtocolError(ProtocolError::new(err))
47 }
48
49 pub(super) fn peer_gone(err: Option<io::Error>) -> Self {
50 Control::PeerGone(PeerGone(err))
51 }
52
53 pub fn disconnect(&self) -> ControlAck {
55 ControlAck { result: ControlAckKind::Disconnect }
56 }
57
58 pub fn ack(self) -> ControlAck {
60 match self {
61 Control::Publish(msg) => msg.ack(),
62 Control::PublishRelease(msg) => msg.ack(),
63 Control::Closed(msg) => msg.ack(),
64 Control::Error(msg) => msg.ack(),
65 Control::ProtocolError(msg) => msg.ack(),
66 Control::PeerGone(msg) => msg.ack(),
67 }
68 }
69}
70
71#[derive(Debug)]
72pub struct Publish(codec::Publish, Payload, u32);
73
74impl Publish {
75 pub fn packet(&self) -> &codec::Publish {
77 &self.0
78 }
79
80 pub fn packet_mut(&mut self) -> &mut codec::Publish {
82 &mut self.0
83 }
84
85 pub fn packet_size(&self) -> u32 {
87 self.2
88 }
89
90 #[inline]
91 pub fn payload_size(&self) -> usize {
93 self.0.payload_size as usize
94 }
95
96 #[inline]
97 pub async fn read(&self) -> Result<Option<Bytes>, error::PayloadError> {
99 self.1.read().await
100 }
101
102 #[inline]
103 pub async fn read_all(&self) -> Result<Option<Bytes>, error::PayloadError> {
105 self.1.read_all().await
106 }
107
108 pub fn ack(self) -> ControlAck {
109 if let Some(id) = self.0.packet_id {
110 ControlAck { result: ControlAckKind::PublishAck(id) }
111 } else {
112 ControlAck { result: ControlAckKind::Nothing }
113 }
114 }
115
116 pub fn into_inner(self) -> (ControlAck, codec::Publish) {
117 if let Some(id) = self.0.packet_id {
118 (ControlAck { result: ControlAckKind::PublishAck(id) }, self.0)
119 } else {
120 (ControlAck { result: ControlAckKind::Nothing }, self.0)
121 }
122 }
123}