gmqtt_client/client/
message.rs1use std::time::Duration;
2
3use bytes::Bytes;
4use log::error;
5use mqttbytes::{
6 v5::{Publish, PublishProperties},
7 QoS,
8};
9use std::borrow::Cow;
10
11#[derive(Debug)]
12pub struct MessageBuilder {
13 qos: QoS,
14 retain: bool,
15 topic: String,
16 properties: PublishProperties,
17 payload: Bytes,
18}
19
20#[derive(Debug, Clone)]
21pub struct Message {
22 pub(crate) qos: QoS,
23 pub(crate) retain: bool,
24 pub(crate) topic: String,
25 pub(crate) properties: PublishProperties,
26 pub(crate) payload: Bytes,
27}
28
29impl MessageBuilder {
30 pub fn new() -> Self {
31 Self {
32 qos: QoS::AtMostOnce,
33 retain: false,
34 topic: String::new(),
35 properties: publish_properties_default(),
36 payload: Bytes::new(),
37 }
38 }
39
40 pub fn qos(mut self, qos: QoS) -> Self {
41 if qos == QoS::ExactlyOnce {
42 panic!("Quantity of Service 2 (Exactly Once) is not supported");
44 }
45
46 self.qos = qos;
47 self
48 }
49
50 pub fn payload<B: Into<Bytes>>(mut self, payload: B) -> Self {
51 self.payload = payload.into();
52 self
53 }
54
55 pub fn topic<S: Into<String>>(mut self, topic: S) -> Self {
56 self.topic = topic.into();
57 self
58 }
59
60 pub fn retained(mut self, retain: bool) -> Self {
61 self.retain = retain;
62 self
63 }
64
65 pub fn message_expiry_interval(mut self, expiry_interval: Option<Duration>) -> Self {
66 let message_expiry_interval = expiry_interval.map(|d| {
67 let expiry_interval_secs = d.as_secs();
68
69 if expiry_interval_secs > u32::MAX as u64 {
70 error!("Expiry interval too long, using std::u32::MAX");
71 return u32::MAX;
72 }
73
74 expiry_interval_secs as u32
75 });
76
77 self.properties.message_expiry_interval = message_expiry_interval;
78 self
79 }
80
81 pub fn response_topic<S: Into<String>>(mut self, response_topic: Option<S>) -> Self {
82 self.properties.response_topic = response_topic.map(|t| t.into());
83 self
84 }
85
86 pub fn utf8(mut self, is_utf8: bool) -> Self {
88 if is_utf8 {
89 self.properties.payload_format_indicator = Some(1);
90 } else {
91 self.properties.payload_format_indicator = None;
92 }
93 self
94 }
95
96 pub fn add_user_property<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
97 self.properties
98 .user_properties
99 .push((key.into(), value.into()));
100 self
101 }
102
103 pub fn correlation_data<B: Into<Bytes>>(mut self, correlation_data: Option<B>) -> Self {
104 self.properties.correlation_data = correlation_data.map(|c| c.into());
105 self
106 }
107
108 pub fn finalize(self) -> Message {
109 Message {
110 qos: self.qos,
111 retain: self.retain,
112 topic: self.topic,
113 properties: self.properties,
114 payload: self.payload,
115 }
116 }
117}
118
119impl Default for MessageBuilder {
120 fn default() -> Self {
121 Self::new()
122 }
123}
124
125impl Message {
126 pub fn qos(&self) -> QoS {
127 self.qos
128 }
129
130 pub fn retain(&self) -> bool {
131 self.retain
132 }
133
134 pub fn topic(&self) -> &str {
135 &self.topic
136 }
137
138 pub fn properties(&self) -> &PublishProperties {
139 &self.properties
140 }
141
142 pub fn payload(&self) -> &[u8] {
143 &self.payload
144 }
145
146 pub fn payload_str(&self) -> Cow<'_, str> {
147 String::from_utf8_lossy(&self.payload)
148 }
149}
150
151#[cfg(feature = "json")]
152impl<'a> Message {
153 pub fn payload_json<D: serde::Deserialize<'a>>(&'a self) -> serde_json::error::Result<D> {
154 serde_json::from_slice::<D>(&self.payload)
155 }
156}
157
158impl From<Publish> for Message {
159 fn from(publish: Publish) -> Self {
160 let properties = publish
161 .properties
162 .unwrap_or_else(publish_properties_default);
163
164 Self {
165 qos: publish.qos,
166 retain: publish.retain,
167 topic: publish.topic,
168 properties,
169 payload: publish.payload,
170 }
171 }
172}
173
174fn publish_properties_default() -> PublishProperties {
175 PublishProperties {
176 payload_format_indicator: None,
177 message_expiry_interval: None,
178 topic_alias: None,
179 response_topic: None,
180 correlation_data: None,
181 user_properties: Vec::new(),
182 subscription_identifiers: Vec::new(),
183 content_type: None,
184 }
185}