rust_mqtt/client/client.rs
1/*
2 * MIT License
3 *
4 * Copyright (c) [2022] [Ondrej Babec <ond.babec@gmail.com>]
5 *
6 * Permission is hereby granted, free of charge, to any person obtaining a copy
7 * of this software and associated documentation files (the "Software"), to deal
8 * in the Software without restriction, including without limitation the rights
9 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 * copies of the Software, and to permit persons to whom the Software is
11 * furnished to do so, subject to the following conditions:
12 *
13 * The above copyright notice and this permission notice shall be included in all
14 * copies or substantial portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 * SOFTWARE.
23 */
24
25use embedded_io_async::{Read, Write};
26use heapless::Vec;
27use rand_core::RngCore;
28
29use crate::client::client_config::ClientConfig;
30use crate::packet::v5::publish_packet::QualityOfService::{self, QoS1};
31use crate::packet::v5::reason_codes::ReasonCode;
32
33use super::raw_client::{Event, RawMqttClient};
34
35pub struct MqttClient<'a, T, const MAX_PROPERTIES: usize, R: RngCore>
36where
37 T: Read + Write,
38{
39 raw: RawMqttClient<'a, T, MAX_PROPERTIES, R>,
40}
41
42impl<'a, T, const MAX_PROPERTIES: usize, R> MqttClient<'a, T, MAX_PROPERTIES, R>
43where
44 T: Read + Write,
45 R: RngCore,
46{
47 pub fn new(
48 network_driver: T,
49 buffer: &'a mut [u8],
50 buffer_len: usize,
51 recv_buffer: &'a mut [u8],
52 recv_buffer_len: usize,
53 config: ClientConfig<'a, MAX_PROPERTIES, R>,
54 ) -> Self {
55 Self {
56 raw: RawMqttClient::new(
57 network_driver,
58 buffer,
59 buffer_len,
60 recv_buffer,
61 recv_buffer_len,
62 config,
63 ),
64 }
65 }
66
67 /// Method allows client connect to server. Client is connecting to the specified broker
68 /// in the `ClientConfig`. Method selects proper implementation of the MQTT version based on the config.
69 /// If the connection to the broker fails, method returns Err variable that contains
70 /// Reason codes returned from the broker.
71 pub async fn connect_to_broker<'b>(&'b mut self) -> Result<(), ReasonCode> {
72 self.raw.connect_to_broker().await?;
73
74 match self.raw.poll::<0>().await? {
75 Event::Connack => Ok(()),
76 Event::Disconnect(reason) => Err(reason),
77 // If an application message comes at this moment, it is lost.
78 _ => Err(ReasonCode::ImplementationSpecificError),
79 }
80 }
81
82 /// Method allows client disconnect from the server. Client disconnects from the specified broker
83 /// in the `ClientConfig`. Method selects proper implementation of the MQTT version based on the config.
84 /// If the disconnect from the broker fails, method returns Err variable that contains
85 /// Reason codes returned from the broker.
86 pub async fn disconnect<'b>(&'b mut self) -> Result<(), ReasonCode> {
87 self.raw.disconnect().await?;
88 Ok(())
89 }
90
91 /// Method allows sending message to broker specified from the ClientConfig. Client sends the
92 /// message from the parameter `message` to the topic `topic_name` on the broker
93 /// specified in the ClientConfig. If the send fails method returns Err with reason code
94 /// received by broker.
95 pub async fn send_message<'b>(
96 &'b mut self,
97 topic_name: &'b str,
98 message: &'b [u8],
99 qos: QualityOfService,
100 retain: bool,
101 ) -> Result<(), ReasonCode> {
102 let identifier = self
103 .raw
104 .send_message(topic_name, message, qos, retain)
105 .await?;
106
107 // QoS1
108 if qos == QoS1 {
109 match self.raw.poll::<0>().await? {
110 Event::Puback(ack_identifier) => {
111 if identifier == ack_identifier {
112 Ok(())
113 } else {
114 Err(ReasonCode::PacketIdentifierNotFound)
115 }
116 }
117 Event::Disconnect(reason) => Err(reason),
118 // If an application message comes at this moment, it is lost.
119 _ => Err(ReasonCode::ImplementationSpecificError),
120 }
121 } else {
122 Ok(())
123 }
124 }
125
126 /// Method allows client subscribe to multiple topics specified in the parameter
127 /// `topic_names` on the broker specified in the `ClientConfig`. Generics `TOPICS`
128 /// sets the value of the `topics_names` vector. MQTT protocol implementation
129 /// is selected automatically.
130 pub async fn subscribe_to_topics<'b, const TOPICS: usize>(
131 &'b mut self,
132 topic_names: &'b Vec<&'b str, TOPICS>,
133 ) -> Result<(), ReasonCode> {
134 let identifier = self.raw.subscribe_to_topics(topic_names).await?;
135
136 match self.raw.poll::<TOPICS>().await? {
137 Event::Suback(ack_identifier) => {
138 if identifier == ack_identifier {
139 Ok(())
140 } else {
141 Err(ReasonCode::PacketIdentifierNotFound)
142 }
143 }
144 Event::Disconnect(reason) => Err(reason),
145 // If an application message comes at this moment, it is lost.
146 _ => Err(ReasonCode::ImplementationSpecificError),
147 }
148 }
149
150 /// Method allows client unsubscribe from the topic specified in the parameter
151 /// `topic_name` on the broker from the `ClientConfig`. MQTT protocol implementation
152 /// is selected automatically.
153 pub async fn unsubscribe_from_topic<'b>(
154 &'b mut self,
155 topic_name: &'b str,
156 ) -> Result<(), ReasonCode> {
157 let identifier = self.raw.unsubscribe_from_topic(topic_name).await?;
158
159 match self.raw.poll::<0>().await? {
160 Event::Unsuback(ack_identifier) => {
161 if identifier == ack_identifier {
162 Ok(())
163 } else {
164 Err(ReasonCode::PacketIdentifierNotFound)
165 }
166 }
167 Event::Disconnect(reason) => Err(reason),
168 // If an application message comes at this moment, it is lost.
169 _ => Err(ReasonCode::ImplementationSpecificError),
170 }
171 }
172
173 /// Method allows client subscribe to multiple topics specified in the parameter
174 /// `topic_name` on the broker specified in the `ClientConfig`. MQTT protocol implementation
175 /// is selected automatically.
176 pub async fn subscribe_to_topic<'b>(
177 &'b mut self,
178 topic_name: &'b str,
179 ) -> Result<(), ReasonCode> {
180 let mut topic_names = Vec::<&'b str, 1>::new();
181 topic_names.push(topic_name).unwrap();
182
183 let identifier = self.raw.subscribe_to_topics(&topic_names).await?;
184
185 match self.raw.poll::<1>().await? {
186 Event::Suback(ack_identifier) => {
187 if identifier == ack_identifier {
188 Ok(())
189 } else {
190 Err(ReasonCode::PacketIdentifierNotFound)
191 }
192 }
193 Event::Disconnect(reason) => Err(reason),
194 // If an application message comes at this moment, it is lost.
195 _ => Err(ReasonCode::ImplementationSpecificError),
196 }
197 }
198
199 /// Method allows client receive a message. The work of this method strictly depends on the
200 /// network implementation passed in the `ClientConfig`. It expects the PUBLISH packet
201 /// from the broker.
202 pub async fn receive_message<'b>(&'b mut self) -> Result<(&'b str, &'b [u8]), ReasonCode> {
203 match self.raw.poll::<0>().await? {
204 Event::Message(topic, payload) => Ok((topic, payload)),
205 Event::Disconnect(reason) => Err(reason),
206 // If an application message comes at this moment, it is lost.
207 _ => Err(ReasonCode::ImplementationSpecificError),
208 }
209 }
210
211 /// Method allows client send PING message to the broker specified in the `ClientConfig`.
212 /// If there is expectation for long running connection. Method should be executed
213 /// regularly by the timer that counts down the session expiry interval.
214 pub async fn send_ping<'b>(&'b mut self) -> Result<(), ReasonCode> {
215 self.raw.send_ping().await?;
216
217 match self.raw.poll::<0>().await? {
218 Event::Pingresp => Ok(()),
219 Event::Disconnect(reason) => Err(reason),
220 // If an application message comes at this moment, it is lost.
221 _ => Err(ReasonCode::ImplementationSpecificError),
222 }
223 }
224}