gmqtt_client/client/
message.rs

1use std::time::Duration;
2
3use bytes::Bytes;
4use log::error;
5use mqttbytes::{
6    v5::{Publish, PublishProperties},
7    QoS,
8};
9use std::borrow::Cow;
10
11#[derive(Debug)]
12pub struct MessageBuilder {
13    qos: QoS,
14    retain: bool,
15    topic: String,
16    properties: PublishProperties,
17    payload: Bytes,
18}
19
20#[derive(Debug, Clone)]
21pub struct Message {
22    pub(crate) qos: QoS,
23    pub(crate) retain: bool,
24    pub(crate) topic: String,
25    pub(crate) properties: PublishProperties,
26    pub(crate) payload: Bytes,
27}
28
29impl MessageBuilder {
30    pub fn new() -> Self {
31        Self {
32            qos: QoS::AtMostOnce,
33            retain: false,
34            topic: String::new(),
35            properties: publish_properties_default(),
36            payload: Bytes::new(),
37        }
38    }
39
40    pub fn qos(mut self, qos: QoS) -> Self {
41        if qos == QoS::ExactlyOnce {
42            // TODO: Add QoS 2 support for outgoing messages
43            panic!("Quantity of Service 2 (Exactly Once) is not supported");
44        }
45
46        self.qos = qos;
47        self
48    }
49
50    pub fn payload<B: Into<Bytes>>(mut self, payload: B) -> Self {
51        self.payload = payload.into();
52        self
53    }
54
55    pub fn topic<S: Into<String>>(mut self, topic: S) -> Self {
56        self.topic = topic.into();
57        self
58    }
59
60    pub fn retained(mut self, retain: bool) -> Self {
61        self.retain = retain;
62        self
63    }
64
65    pub fn message_expiry_interval(mut self, expiry_interval: Option<Duration>) -> Self {
66        let message_expiry_interval = expiry_interval.map(|d| {
67            let expiry_interval_secs = d.as_secs();
68
69            if expiry_interval_secs > u32::MAX as u64 {
70                error!("Expiry interval too long, using std::u32::MAX");
71                return u32::MAX;
72            }
73
74            expiry_interval_secs as u32
75        });
76
77        self.properties.message_expiry_interval = message_expiry_interval;
78        self
79    }
80
81    pub fn response_topic<S: Into<String>>(mut self, response_topic: Option<S>) -> Self {
82        self.properties.response_topic = response_topic.map(|t| t.into());
83        self
84    }
85
86    /// Set Payload Format Indicator to 1 (UTF-8 Encoded Character Data) or None (unspecified bytes)
87    pub fn utf8(mut self, is_utf8: bool) -> Self {
88        if is_utf8 {
89            self.properties.payload_format_indicator = Some(1);
90        } else {
91            self.properties.payload_format_indicator = None;
92        }
93        self
94    }
95
96    pub fn add_user_property<K: Into<String>, V: Into<String>>(mut self, key: K, value: V) -> Self {
97        self.properties
98            .user_properties
99            .push((key.into(), value.into()));
100        self
101    }
102
103    pub fn correlation_data<B: Into<Bytes>>(mut self, correlation_data: Option<B>) -> Self {
104        self.properties.correlation_data = correlation_data.map(|c| c.into());
105        self
106    }
107
108    pub fn finalize(self) -> Message {
109        Message {
110            qos: self.qos,
111            retain: self.retain,
112            topic: self.topic,
113            properties: self.properties,
114            payload: self.payload,
115        }
116    }
117}
118
119impl Default for MessageBuilder {
120    fn default() -> Self {
121        Self::new()
122    }
123}
124
125impl Message {
126    pub fn qos(&self) -> QoS {
127        self.qos
128    }
129
130    pub fn retain(&self) -> bool {
131        self.retain
132    }
133
134    pub fn topic(&self) -> &str {
135        &self.topic
136    }
137
138    pub fn properties(&self) -> &PublishProperties {
139        &self.properties
140    }
141
142    pub fn payload(&self) -> &[u8] {
143        &self.payload
144    }
145
146    pub fn payload_str(&self) -> Cow<'_, str> {
147        String::from_utf8_lossy(&self.payload)
148    }
149}
150
#[cfg(feature = "json")]
impl<'a> Message {
    /// Deserializes the payload as JSON into a value of type `D`.
    ///
    /// `D` may borrow from the payload, which is why the result is tied to the lifetime of
    /// the borrow of `self`.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error if the payload cannot be deserialized into `D`.
    pub fn payload_json<D>(&'a self) -> serde_json::error::Result<D>
    where
        D: serde::Deserialize<'a>,
    {
        let raw: &'a [u8] = self.payload.as_ref();
        serde_json::from_slice(raw)
    }
}
157
158impl From<Publish> for Message {
159    fn from(publish: Publish) -> Self {
160        let properties = publish
161            .properties
162            .unwrap_or_else(publish_properties_default);
163
164        Self {
165            qos: publish.qos,
166            retain: publish.retain,
167            topic: publish.topic,
168            properties,
169            payload: publish.payload,
170        }
171    }
172}
173
174fn publish_properties_default() -> PublishProperties {
175    PublishProperties {
176        payload_format_indicator: None,
177        message_expiry_interval: None,
178        topic_alias: None,
179        response_topic: None,
180        correlation_data: None,
181        user_properties: Vec::new(),
182        subscription_identifiers: Vec::new(),
183        content_type: None,
184    }
185}