ntex_mqtt/v3/
publish.rs

1use std::{mem, num::NonZeroU16};
2
3use ntex_bytes::{ByteString, Bytes};
4use ntex_router::Path;
5use serde::de::DeserializeOwned;
6use serde_json::Error as JsonError;
7
8use crate::{error::PayloadError, payload::Payload, v3::codec};
9
10#[derive(Debug)]
11/// Publish message
12pub struct Publish {
13    pkt: codec::Publish,
14    pkt_size: u32,
15    topic: Path<ByteString>,
16    payload: Payload,
17}
18
19impl Publish {
20    /// Create a new `Publish` message from a PUBLISH
21    /// packet
22    #[doc(hidden)]
23    pub fn new(pkt: codec::Publish, payload: Payload, pkt_size: u32) -> Self {
24        Self { topic: Path::new(pkt.topic.clone()), pkt, pkt_size, payload }
25    }
26
27    #[inline]
28    /// this might be re-delivery of an earlier attempt to send the Packet.
29    pub fn dup(&self) -> bool {
30        self.pkt.dup
31    }
32
33    #[inline]
34    pub fn retain(&self) -> bool {
35        self.pkt.retain
36    }
37
38    #[inline]
39    /// the level of assurance for delivery of an Application Message.
40    pub fn qos(&self) -> codec::QoS {
41        self.pkt.qos
42    }
43
44    #[inline]
45    /// the information channel to which payload data is published.
46    pub fn publish_topic(&self) -> &str {
47        &self.pkt.topic
48    }
49
50    #[inline]
51    /// only present in PUBLISH Packets where the QoS level is 1 or 2.
52    pub fn id(&self) -> Option<NonZeroU16> {
53        self.pkt.packet_id
54    }
55
56    #[inline]
57    pub fn topic(&self) -> &Path<ByteString> {
58        &self.topic
59    }
60
61    #[inline]
62    pub fn topic_mut(&mut self) -> &mut Path<ByteString> {
63        &mut self.topic
64    }
65
66    #[inline]
67    pub fn packet(&self) -> &codec::Publish {
68        &self.pkt
69    }
70
71    #[inline]
72    pub fn packet_mut(&mut self) -> &mut codec::Publish {
73        &mut self.pkt
74    }
75
76    #[inline]
77    /// Returns size of the publish
78    pub fn packet_size(&self) -> u32 {
79        self.pkt_size
80    }
81
82    #[inline]
83    /// Returns size of the payload
84    pub fn payload_size(&self) -> usize {
85        self.pkt.payload_size as usize
86    }
87
88    #[inline]
89    /// Payload that is being published.
90    pub async fn read(&self) -> Result<Option<Bytes>, PayloadError> {
91        self.payload.read().await
92    }
93
94    #[inline]
95    /// Payload that is being published.
96    pub async fn read_all(&self) -> Result<Option<Bytes>, PayloadError> {
97        self.payload.read_all().await
98    }
99
100    /// Replace packet'a payload with empty bytes, returns existing payload.
101    pub fn take_payload(&mut self) -> Payload {
102        mem::take(&mut self.payload)
103    }
104
105    pub(super) fn into_inner(self) -> (codec::Publish, Payload, u32) {
106        (self.pkt, self.payload, self.pkt_size)
107    }
108}