rust_mqtt/client/client.rs
1/*
2 * MIT License
3 *
4 * Copyright (c) [2022] [Ondrej Babec <ond.babec@gmail.com>]
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to deal
8 * in the Software without restriction, including without limitation the rights
9 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 * copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in all
14 * copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 * SOFTWARE.
23 */
24
25use embedded_io::ReadReady;
26use embedded_io_async::{Read, Write};
27use heapless::Vec;
28use rand_core::RngCore;
29
30use crate::client::client_config::ClientConfig;
31use crate::packet::v5::publish_packet::QualityOfService::{self, QoS1};
32use crate::packet::v5::reason_codes::ReasonCode;
33
34use super::raw_client::{Event, RawMqttClient};
35
36pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize, R: RngCore>
37where
38 T: Read + Write,
39{
40 raw: RawMqttClient<'a, T, MAX_PROPERTIES, R>,
41}
42
43impl<'a, T, const MAX_PROPERTIES: usize, R> MqttClient<'a, T, MAX_PROPERTIES, R>
44where
45 T: Read + Write,
46 R: RngCore,
47{
48 pub fn new(
49 network_driver: T,
50 buffer: &'a mut [u8],
51 buffer_len: usize,
52 recv_buffer: &'a mut [u8],
53 recv_buffer_len: usize,
54 config: ClientConfig<'a, MAX_PROPERTIES, R>,
55 ) -> Self {
56 Self {
57 raw: RawMqttClient::new(
58 network_driver,
59 buffer,
60 buffer_len,
61 recv_buffer,
62 recv_buffer_len,
63 config,
64 ),
65 }
66 }
67
68 /// Method allows client connect to server. Client is connecting to the specified broker
69 /// in the `ClientConfig`. Method selects proper implementation of the MQTT version based on the config.
70 /// If the connection to the broker fails, method returns Err variable that contains
71 /// Reason codes returned from the broker.
72 pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), ReasonCode> {
73 self.raw.connect_to_broker().await?;
74
75 match self.raw.poll::<0>().await? {
76 Event::Connack => Ok(()),
77 Event::Disconnect(reason) => Err(reason),
78 // If an application message comes at this moment, it is lost.
79 _ => Err(ReasonCode::ImplementationSpecificError),
80 }
81 }
82
83 /// Method allows client disconnect from the server. Client disconnects from the specified broker
84 /// in the `ClientConfig`. Method selects proper implementation of the MQTT version based on the config.
85 /// If the disconnect from the broker fails, method returns Err variable that contains
86 /// Reason codes returned from the broker.
87 pub async fn disconnect<'b>(&'b mut self) -> Result<(), ReasonCode> {
88 self.raw.disconnect().await?;
89 Ok(())
90 }
91
92 /// Method allows sending message to broker specified from the ClientConfig. Client sends the
93 /// message from the parameter `message` to the topic `topic_name` on the broker
94 /// specified in the ClientConfig. If the send fails method returns Err with reason code
95 /// received by broker.
96 pub async fn send_message<'b>(
97 &'b mut self,
98 topic_name: &'b str,
99 message: &'b [u8],
100 qos: QualityOfService,
101 retain: bool,
102 ) -> Result<(), ReasonCode> {
103 let identifier = self
104 .raw
105 .send_message(topic_name, message, qos, retain)
106 .await?;
107
108 // QoS1
109 if qos == QoS1 {
110 match self.raw.poll::<0>().await? {
111 Event::Puback(ack_identifier, matching_subscriber) => {
112 if !matching_subscriber {
113 Err(ReasonCode::NoMatchingSubscribers)
114 } else if identifier == ack_identifier {
115 Ok(())
116 } else {
117 Err(ReasonCode::PacketIdentifierNotFound)
118 }
119 }
120 Event::Disconnect(reason) => Err(reason),
121 // If an application message comes at this moment, it is lost.
122 _ => Err(ReasonCode::ImplementationSpecificError),
123 }
124 } else {
125 Ok(())
126 }
127 }
128
129 /// Method allows client subscribe to multiple topics specified in the parameter
130 /// `topic_names` on the broker specified in the `ClientConfig`. Generics `TOPICS`
131 /// sets the value of the `topics_names` vector. MQTT protocol implementation
132 /// is selected automatically.
133 pub async fn subscribe_to_topics<'b, const TOPICS: usize>(
134 &'b mut self,
135 topic_names: &'b Vec<&'b str, TOPICS>,
136 ) -> Result<(), ReasonCode> {
137 let identifier = self.raw.subscribe_to_topics(topic_names).await?;
138
139 match self.raw.poll::<TOPICS>().await? {
140 Event::Suback(ack_identifier) => {
141 if identifier == ack_identifier {
142 Ok(())
143 } else {
144 Err(ReasonCode::PacketIdentifierNotFound)
145 }
146 }
147 Event::Disconnect(reason) => Err(reason),
148 // If an application message comes at this moment, it is lost.
149 _ => Err(ReasonCode::ImplementationSpecificError),
150 }
151 }
152
153 /// Method allows client unsubscribe from the topic specified in the parameter
154 /// `topic_name` on the broker from the `ClientConfig`. MQTT protocol implementation
155 /// is selected automatically.
156 pub async fn unsubscribe_from_topic<'b>(
157 &'b mut self,
158 topic_name: &'b str,
159 ) -> Result<(), ReasonCode> {
160 let identifier = self.raw.unsubscribe_from_topic(topic_name).await?;
161
162 match self.raw.poll::<0>().await? {
163 Event::Unsuback(ack_identifier) => {
164 if identifier == ack_identifier {
165 Ok(())
166 } else {
167 Err(ReasonCode::PacketIdentifierNotFound)
168 }
169 }
170 Event::Disconnect(reason) => Err(reason),
171 // If an application message comes at this moment, it is lost.
172 _ => Err(ReasonCode::ImplementationSpecificError),
173 }
174 }
175
176 /// Method allows client subscribe to multiple topics specified in the parameter
177 /// `topic_name` on the broker specified in the `ClientConfig`. MQTT protocol implementation
178 /// is selected automatically.
179 pub async fn subscribe_to_topic<'b>(
180 &'b mut self,
181 topic_name: &'b str,
182 ) -> Result<(), ReasonCode> {
183 let mut topic_names = Vec::<&'b str, 1>::new();
184 topic_names.push(topic_name).unwrap();
185
186 let identifier = self.raw.subscribe_to_topics(&topic_names).await?;
187
188 match self.raw.poll::<1>().await? {
189 Event::Suback(ack_identifier) => {
190 if identifier == ack_identifier {
191 Ok(())
192 } else {
193 Err(ReasonCode::PacketIdentifierNotFound)
194 }
195 }
196 Event::Disconnect(reason) => Err(reason),
197 // If an application message comes at this moment, it is lost.
198 _ => Err(ReasonCode::ImplementationSpecificError),
199 }
200 }
201
202 /// Method allows client receive a message. The work of this method strictly depends on the
203 /// network implementation passed in the `ClientConfig`. It expects the PUBLISH packet
204 /// from the broker.
205 pub async fn receive_message<'b>(&'b mut self) -> Result<(&'b str, &'b [u8]), ReasonCode> {
206 match self.raw.poll::<0>().await? {
207 Event::Message(topic, payload) => Ok((topic, payload)),
208 Event::Disconnect(reason) => Err(reason),
209 // If an application message comes at this moment, it is lost.
210 _ => Err(ReasonCode::ImplementationSpecificError),
211 }
212 }
213
214 /// Method allows client send PING message to the broker specified in the `ClientConfig`.
215 /// If there is expectation for long running connection. Method should be executed
216 /// regularly by the timer that counts down the session expiry interval.
217 pub async fn send_ping<'b>(&'b mut self) -> Result<(), ReasonCode> {
218 self.raw.send_ping().await?;
219
220 match self.raw.poll::<0>().await? {
221 Event::Pingresp => Ok(()),
222 Event::Disconnect(reason) => Err(reason),
223 // If an application message comes at this moment, it is lost.
224 _ => Err(ReasonCode::ImplementationSpecificError),
225 }
226 }
227}
228
229impl<'a, T, const MAX_PROPERTIES: usize, R> MqttClient<'a, T, MAX_PROPERTIES, R>
230where
231 T: Read + Write + ReadReady,
232 R: RngCore,
233{
234 /// Receive a message if one is ready. The work of this method strictly depends on the
235 /// network implementation passed in the `ClientConfig`. It expects the PUBLISH packet
236 /// from the broker.
237 pub async fn receive_message_if_ready<'b>(
238 &'b mut self,
239 ) -> Result<Option<(&'b str, &'b [u8])>, ReasonCode> {
240 match self.raw.poll_if_ready::<0>().await? {
241 None => Ok(None),
242 Some(Event::Message(topic, payload)) => Ok(Some((topic, payload))),
243 Some(Event::Disconnect(reason)) => Err(reason),
244 // If an application message comes at this moment, it is lost.
245 _ => Err(ReasonCode::ImplementationSpecificError),
246 }
247 }
248}