paho_mqtt_redis/lib.rs
1// mqtt.rust.redis/src/lib.rs
2//
3// Main library source file for 'mqtt-redis'.
4//
5// --------------------------------------------------------------------------
6// Copyright (c) 2017-2023 Frank Pagliughi <fpagliughi@mindspring.com>
7// All rights reserved.
8//
9// Redistribution and use in source and binary forms, with or without
10// modification, are permitted provided that the following conditions are
11// met:
12//
13// 1. Redistributions of source code must retain the above copyright notice,
14// this list of conditions and the following disclaimer.
15//
16// 2. Redistributions in binary form must reproduce the above copyright
17// notice, this list of conditions and the following disclaimer in the
18// documentation and/or other materials provided with the distribution.
19//
20// 3. Neither the name of the copyright holder nor the names of its
21// contributors may be used to endorse or promote products derived from this
22// software without specific prior written permission.
23//
24// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
25// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
26// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
27// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
28// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
30// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
31// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
32// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
33// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
34// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35//
36
37//! This is a small example of using Redis as the persistence store for the
38//! Paho MQTT Rust client.
39//!
40//! It is an add-on library for use with the Eclipse Paho Rust MQTT Client
41//! <https://github.com/eclipse/paho.mqtt.rust>
42//!
43//! The MQTT client library provides several mechanisms to persist QoS 1 & 2
44//! messages while they are in transit. This helps to ensure that even if the
45//! client application crashes, upon restart those messages can be retrieved
46//! from the persistence store and re-sent to the server.
47//!
48//! The Paho library contains file/disk based persistence out of the box.
//! That is very useful, but on a Flash-based embedded device, like an IoT
//! gateway, continuous writes to the flash chip will wear it out
51//! prematurely.
52//!
53//! So it would be nice to use a RAM-based cache that is outside the client
54//! application's process. An instance of Redis, running locally, is a
55//! nice solution.
56//!
57//! The Paho library allows the application to create a user-supplied
58//! persistence object and register that with the client. The object simply
59//! needs to implement the `paho_mqtt::ClientPersistence` trait. These
60//! callbacks map to the operations on a key/value store, so Redis is a
61//! perfect candidate to match the persistence API and act as a store.
62//!
63//! The MQTT callbacks map nearly 1:1 to Redis Hash commands:
64//!
65//! ```ignore
//! open() -> connect
67//! close() -> disconnect
68//!
69//! put() -> HSET
70//! get() -> HGET
71//! remove() -> HDEL
72//! keys() -> HKEYS
73//! clear() -> DEL
74//! contains_key() -> HEXISTS
75//!```
76//!
77//! NOTE: Using Redis as an MQTT persistence store is an extremely viable
78//! solution in a production IoT device or gateway, but it really only makes
79//! sense to use it if the Redis server is running locally on the device
80//! and connected via localhost or a UNIX socket. It _does not make sense_ to
81//! use a remote Redis server for this purpose.
82//!
83
84#[macro_use]
85extern crate log;
86
87use paho_mqtt as mqtt;
88use redis::{Client, Commands, Connection, RedisResult};
89
90// --------------------------------------------------------------------------
91
/// The MQTT Redis persistence object.
///
/// An instance of this struct can be registered with an MQTT client to hold
/// messages in a Redis server until they are properly acknowledged by the
/// remote MQTT server. An instance of this object maps to a single hash
/// on a specific Redis server.
pub struct RedisPersistence {
    /// The name of the Redis hash object.
    /// This is formed as a combination of the MQTT server name/address
    /// and the client ID string.
    name: String,
    /// The Redis client, used to create the connection.
    client: Client,
    /// The connection to the Redis server, or `None` when not open.
    /// This is opened and closed on instruction from the MQTT client.
    conn: Option<Connection>,
}
108
109impl RedisPersistence {
110 /// Create a new persistence object to connect to a local Redis server.
111 pub fn new() -> Self {
112 Self::default()
113 }
114}
115
116impl Default for RedisPersistence {
117 /// Create a new persistence object to connect to the Redis server
118 /// on localhost.
119 fn default() -> Self {
120 Self {
121 name: "".to_string(),
122 client: Client::open("redis://localhost/").unwrap(),
123 conn: None,
124 }
125 }
126}
127
128impl mqtt::ClientPersistence for RedisPersistence {
129 /// Opena the connection to the Redis client.
130 fn open(&mut self, client_id: &str, server_uri: &str) -> mqtt::Result<()> {
131 self.name = format!("{}:{}", client_id, server_uri);
132
133 match self.client.get_connection() {
134 Ok(conn) => {
135 trace!("Redis persistence [{}]: open", self.name);
136 self.conn = Some(conn);
137 Ok(())
138 }
139 Err(e) => {
140 warn!("Redis persistence connect error: {:?}", e);
141 Err(mqtt::PersistenceError)
142 }
143 }
144 }
145
146 /// Close the connection to the Redis client.
147 fn close(&mut self) -> mqtt::Result<()> {
148 trace!("Client persistence [{}]: close", self.name);
149 if let Some(conn) = self.conn.take() {
150 drop(conn);
151 }
152 trace!("Redis close complete");
153 Ok(())
154 }
155
156 /// Store a persistent value to Redis.
157 /// We get a vector of buffer references for the data to store, which we
158 /// can concatenate into a single byte buffer to send to the server.
159 fn put(&mut self, key: &str, buffers: Vec<&[u8]>) -> mqtt::Result<()> {
160 trace!("Client persistence [{}]: put key '{}'", self.name, key);
161 let conn = self.conn.as_mut().ok_or(mqtt::PersistenceError)?;
162 let buf: Vec<u8> = buffers.concat();
163 debug!("Putting key '{}' with {} bytes", key, buf.len());
164 redis::cmd("HSET")
165 .arg(&self.name)
166 .arg(key)
167 .arg(buf)
168 .execute(conn);
169 Ok(())
170 }
171
172 /// Get the data buffer for the requested key.
173 /// Although the value sent to the server was a collection of buffers,
174 /// we can return them as a single, concatenated buffer.
175 fn get(&mut self, key: &str) -> mqtt::Result<Vec<u8>> {
176 trace!("Client persistence [{}]: get key '{}'", self.name, key);
177 let conn = self.conn.as_mut().ok_or(mqtt::PersistenceError)?;
178 if let Ok(v) = conn.hget(&self.name, key) as RedisResult<Vec<u8>> {
179 debug!("Found key {} with {} bytes", key, v.len());
180 Ok(v)
181 } else {
182 Err(mqtt::PersistenceError)
183 }
184 }
185
186 /// Remove the value with the specified `key` from the store.
187 fn remove(&mut self, key: &str) -> mqtt::Result<()> {
188 trace!("Client persistence [{}]: remove key '{}'", self.name, key);
189 let conn = self.conn.as_mut().ok_or(mqtt::PersistenceError)?;
190 if let Ok(res) = conn.hdel(&self.name, key) as RedisResult<usize> {
191 if res != 0 {
192 debug!("Removed key: {}", key);
193 } else {
194 debug!("Key not found (assuming OK): {}", key);
195 }
196 // Either way, if key is not in the store we report success.
197 return Ok(());
198 }
199 Err(mqtt::PersistenceError)
200 }
201
202 /// Return a collection of all the keys in the store for this client.
203 fn keys(&mut self) -> mqtt::Result<Vec<String>> {
204 trace!("Client persistence [{}]: keys", self.name);
205 let conn = self.conn.as_mut().ok_or(mqtt::PersistenceError)?;
206 if let Ok(v) = conn.hkeys(&self.name) as RedisResult<Vec<String>> {
207 debug!("Found keys: {:?}", v);
208 Ok(v)
209 } else {
210 warn!("Error looking for keys");
211 Err(mqtt::PersistenceError)
212 }
213 }
214
215 /// Remove all the data for this client from the store.
216 fn clear(&mut self) -> mqtt::Result<()> {
217 trace!("Client persistence [{}]: clear", self.name);
218 let conn = self.conn.as_mut().unwrap(); // TODO: Check for error?
219 if let Ok(_res) = conn.del(&self.name) as RedisResult<usize> {
220 // res==1 means hash/store deleted, 0 means it wasn't found.
221 // Either way, it's gone, so return success
222 return Ok(());
223 }
224 Err(mqtt::PersistenceError)
225 }
226
227 /// Determines if the store for this client contains the specified `key`.
228 fn contains_key(&mut self, key: &str) -> bool {
229 trace!("Client persistence [{}]: contains key '{}'", self.name, key);
230 let conn = match self.conn.as_mut() {
231 Some(conn) => conn,
232 None => return false,
233 };
234 if let Ok(res) = conn.hexists(&self.name, key) as RedisResult<usize> {
235 debug!("'contains' query returned: {:?}", res);
236 res != 0
237 } else {
238 false
239 }
240 }
241}