actix_mqtt/
publish.rs

1use std::convert::TryFrom;
2
3use actix_router::Path;
4use bytes::Bytes;
5use bytestring::ByteString;
6use mqtt_codec as mqtt;
7use serde::de::DeserializeOwned;
8use serde_json::Error as JsonError;
9
10use crate::dispatcher::MqttState;
11use crate::sink::MqttSink;
12
13/// Publish message
14pub struct Publish<S> {
15    publish: mqtt::Publish,
16    sink: MqttSink,
17    state: MqttState<S>,
18    topic: Path<ByteString>,
19    query: Option<ByteString>,
20}
21
22impl<S> Publish<S> {
23    pub(crate) fn new(state: MqttState<S>, publish: mqtt::Publish) -> Self {
24        let (topic, query) = if let Some(pos) = publish.topic.find('?') {
25            (
26                ByteString::try_from(publish.topic.get_ref().slice(0..pos)).unwrap(),
27                Some(
28                    ByteString::try_from(
29                        publish.topic.get_ref().slice(pos + 1..publish.topic.len()),
30                    )
31                    .unwrap(),
32                ),
33            )
34        } else {
35            (publish.topic.clone(), None)
36        };
37        let topic = Path::new(topic);
38        let sink = state.sink().clone();
39        Self {
40            sink,
41            publish,
42            state,
43            topic,
44            query,
45        }
46    }
47
48    #[inline]
49    /// this might be re-delivery of an earlier attempt to send the Packet.
50    pub fn dup(&self) -> bool {
51        self.publish.dup
52    }
53
54    #[inline]
55    pub fn retain(&self) -> bool {
56        self.publish.retain
57    }
58
59    #[inline]
60    /// the level of assurance for delivery of an Application Message.
61    pub fn qos(&self) -> mqtt::QoS {
62        self.publish.qos
63    }
64
65    #[inline]
66    /// the information channel to which payload data is published.
67    pub fn publish_topic(&self) -> &str {
68        &self.publish.topic
69    }
70
71    #[inline]
72    /// returns reference to a connection session
73    pub fn session(&self) -> &S {
74        self.state.session()
75    }
76
77    #[inline]
78    /// returns mutable reference to a connection session
79    pub fn session_mut(&mut self) -> &mut S {
80        self.state.session_mut()
81    }
82
83    #[inline]
84    /// only present in PUBLISH Packets where the QoS level is 1 or 2.
85    pub fn id(&self) -> Option<u16> {
86        self.publish.packet_id
87    }
88
89    #[inline]
90    pub fn topic(&self) -> &Path<ByteString> {
91        &self.topic
92    }
93
94    #[inline]
95    pub fn topic_mut(&mut self) -> &mut Path<ByteString> {
96        &mut self.topic
97    }
98
99    #[inline]
100    pub fn query(&self) -> &str {
101        self.query.as_ref().map(|s| s.as_ref()).unwrap_or("")
102    }
103
104    #[inline]
105    pub fn packet(&self) -> &mqtt::Publish {
106        &self.publish
107    }
108
109    #[inline]
110    /// the Application Message that is being published.
111    pub fn payload(&self) -> &Bytes {
112        &self.publish.payload
113    }
114
115    /// Extract Bytes from packet payload
116    pub fn take_payload(&self) -> Bytes {
117        self.publish.payload.clone()
118    }
119
120    #[inline]
121    /// Mqtt client sink object
122    pub fn sink(&self) -> &MqttSink {
123        &self.sink
124    }
125
126    /// Loads and parse `application/json` encoded body.
127    pub fn json<T: DeserializeOwned>(&mut self) -> Result<T, JsonError> {
128        serde_json::from_slice(&self.publish.payload)
129    }
130}
131
132impl<S> std::fmt::Debug for Publish<S> {
133    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
134        self.publish.fmt(f)
135    }
136}