1use std::convert::TryFrom;
2
3use actix_router::Path;
4use bytes::Bytes;
5use bytestring::ByteString;
6use mqtt_codec as mqtt;
7use serde::de::DeserializeOwned;
8use serde_json::Error as JsonError;
9
10use crate::dispatcher::MqttState;
11use crate::sink::MqttSink;
12
13pub struct Publish<S> {
15 publish: mqtt::Publish,
16 sink: MqttSink,
17 state: MqttState<S>,
18 topic: Path<ByteString>,
19 query: Option<ByteString>,
20}
21
22impl<S> Publish<S> {
23 pub(crate) fn new(state: MqttState<S>, publish: mqtt::Publish) -> Self {
24 let (topic, query) = if let Some(pos) = publish.topic.find('?') {
25 (
26 ByteString::try_from(publish.topic.get_ref().slice(0..pos)).unwrap(),
27 Some(
28 ByteString::try_from(
29 publish.topic.get_ref().slice(pos + 1..publish.topic.len()),
30 )
31 .unwrap(),
32 ),
33 )
34 } else {
35 (publish.topic.clone(), None)
36 };
37 let topic = Path::new(topic);
38 let sink = state.sink().clone();
39 Self {
40 sink,
41 publish,
42 state,
43 topic,
44 query,
45 }
46 }
47
48 #[inline]
49 pub fn dup(&self) -> bool {
51 self.publish.dup
52 }
53
54 #[inline]
55 pub fn retain(&self) -> bool {
56 self.publish.retain
57 }
58
59 #[inline]
60 pub fn qos(&self) -> mqtt::QoS {
62 self.publish.qos
63 }
64
65 #[inline]
66 pub fn publish_topic(&self) -> &str {
68 &self.publish.topic
69 }
70
71 #[inline]
72 pub fn session(&self) -> &S {
74 self.state.session()
75 }
76
77 #[inline]
78 pub fn session_mut(&mut self) -> &mut S {
80 self.state.session_mut()
81 }
82
83 #[inline]
84 pub fn id(&self) -> Option<u16> {
86 self.publish.packet_id
87 }
88
89 #[inline]
90 pub fn topic(&self) -> &Path<ByteString> {
91 &self.topic
92 }
93
94 #[inline]
95 pub fn topic_mut(&mut self) -> &mut Path<ByteString> {
96 &mut self.topic
97 }
98
99 #[inline]
100 pub fn query(&self) -> &str {
101 self.query.as_ref().map(|s| s.as_ref()).unwrap_or("")
102 }
103
104 #[inline]
105 pub fn packet(&self) -> &mqtt::Publish {
106 &self.publish
107 }
108
109 #[inline]
110 pub fn payload(&self) -> &Bytes {
112 &self.publish.payload
113 }
114
115 pub fn take_payload(&self) -> Bytes {
117 self.publish.payload.clone()
118 }
119
120 #[inline]
121 pub fn sink(&self) -> &MqttSink {
123 &self.sink
124 }
125
126 pub fn json<T: DeserializeOwned>(&mut self) -> Result<T, JsonError> {
128 serde_json::from_slice(&self.publish.payload)
129 }
130}
131
132impl<S> std::fmt::Debug for Publish<S> {
133 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
134 self.publish.fmt(f)
135 }
136}