ntex_mqtt/v5/
publish.rs

1use std::{mem, num::NonZeroU16};
2
3use ntex_bytes::{ByteString, Bytes};
4use ntex_router::Path;
5
6use crate::{error::PayloadError, payload::Payload, v5::codec};
7
8#[derive(Debug)]
9/// Publish message
10pub struct Publish {
11    pkt: codec::Publish,
12    pkt_size: u32,
13    topic: Path<ByteString>,
14    payload: Payload,
15}
16
17impl Publish {
18    /// Create a new `Publish` message from a PUBLISH
19    /// packet
20    #[doc(hidden)]
21    pub fn new(pkt: codec::Publish, payload: Payload, pkt_size: u32) -> Self {
22        Self { topic: Path::new(pkt.topic.clone()), pkt, pkt_size, payload }
23    }
24
25    #[inline]
26    /// this might be re-delivery of an earlier attempt to send the Packet.
27    pub fn dup(&self) -> bool {
28        self.pkt.dup
29    }
30
31    #[inline]
32    pub fn retain(&self) -> bool {
33        self.pkt.retain
34    }
35
36    #[inline]
37    /// the level of assurance for delivery of an Application Message.
38    pub fn qos(&self) -> codec::QoS {
39        self.pkt.qos
40    }
41
42    #[inline]
43    /// the information channel to which payload data is published.
44    pub fn publish_topic(&self) -> &str {
45        &self.pkt.topic
46    }
47
48    #[inline]
49    /// only present in PUBLISH Packets where the QoS level is 1 or 2.
50    pub fn id(&self) -> Option<NonZeroU16> {
51        self.pkt.packet_id
52    }
53
54    #[inline]
55    pub fn topic(&self) -> &Path<ByteString> {
56        &self.topic
57    }
58
59    #[inline]
60    pub fn topic_mut(&mut self) -> &mut Path<ByteString> {
61        &mut self.topic
62    }
63
64    #[inline]
65    pub fn packet(&self) -> &codec::Publish {
66        &self.pkt
67    }
68
69    #[inline]
70    pub fn packet_mut(&mut self) -> &mut codec::Publish {
71        &mut self.pkt
72    }
73
74    #[inline]
75    /// Returns size of the packet
76    pub fn packet_size(&self) -> u32 {
77        self.pkt_size
78    }
79
80    #[inline]
81    /// Returns size of the payload
82    pub fn payload_size(&self) -> usize {
83        self.pkt.payload_size as usize
84    }
85
86    #[inline]
87    /// Payload that is being published.
88    pub async fn read(&self) -> Result<Option<Bytes>, PayloadError> {
89        self.payload.read().await
90    }
91
92    #[inline]
93    /// Payload that is being published.
94    pub async fn read_all(&self) -> Result<Option<Bytes>, PayloadError> {
95        self.payload.read_all().await
96    }
97
98    /// Replace packet'a payload with empty bytes, returns existing payload.
99    pub fn take_payload(&mut self) -> Payload {
100        mem::take(&mut self.payload)
101    }
102
103    /// Create acknowledgement for this packet
104    pub fn ack(self) -> PublishAck {
105        PublishAck {
106            reason_code: codec::PublishAckReason::Success,
107            properties: codec::UserProperties::default(),
108            reason_string: None,
109        }
110    }
111
112    pub(crate) fn into_inner(self) -> (codec::Publish, Payload) {
113        (self.pkt, self.payload)
114    }
115}
116
117#[derive(Debug)]
118/// Publish ack
119pub struct PublishAck {
120    pub(crate) reason_code: codec::PublishAckReason,
121    pub(crate) properties: codec::UserProperties,
122    pub(crate) reason_string: Option<ByteString>,
123}
124
125impl PublishAck {
126    /// Create new `PublishAck` instance from a reason code.
127    pub fn new(code: codec::PublishAckReason) -> Self {
128        PublishAck {
129            reason_code: code,
130            properties: codec::UserProperties::default(),
131            reason_string: None,
132        }
133    }
134
135    /// Set Acknowledgement's Reason Code
136    #[inline]
137    pub fn reason_code(mut self, reason_code: codec::PublishAckReason) -> Self {
138        self.reason_code = reason_code;
139        self
140    }
141
142    /// Update user properties
143    #[inline]
144    pub fn properties<F>(mut self, f: F) -> Self
145    where
146        F: FnOnce(&mut codec::UserProperties),
147    {
148        f(&mut self.properties);
149        self
150    }
151
152    /// Set ack reason string
153    #[inline]
154    pub fn reason(mut self, reason: ByteString) -> Self {
155        self.reason_string = Some(reason);
156        self
157    }
158}