paho_mqtt_redis/
lib.rs

1// mqtt.rust.redis/src/lib.rs
2//
3// Main library source file for 'mqtt-redis'.
4//
5// --------------------------------------------------------------------------
6// Copyright (c) 2017-2023 Frank Pagliughi <fpagliughi@mindspring.com>
7// All rights reserved.
8//
9// Redistribution and use in source and binary forms, with or without
10// modification, are permitted provided that the following conditions are
11// met:
12//
13// 1. Redistributions of source code must retain the above copyright notice,
14// this list of conditions and the following disclaimer.
15//
16// 2. Redistributions in binary form must reproduce the above copyright
17// notice, this list of conditions and the following disclaimer in the
18// documentation and/or other materials provided with the distribution.
19//
20// 3. Neither the name of the copyright holder nor the names of its
21// contributors may be used to endorse or promote products derived from this
22// software without specific prior written permission.
23//
24// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
25// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
26// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
27// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
28// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
29// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
30// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
31// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
32// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
33// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
34// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
35//
36
37//! This is a small example of using Redis as the persistence store for the
38//! Paho MQTT Rust client.
39//!
40//! It is an add-on library for use with the Eclipse Paho Rust MQTT Client
41//!     <https://github.com/eclipse/paho.mqtt.rust>
42//!
43//! The MQTT client library provides several mechanisms to persist QoS 1 & 2
44//! messages while they are in transit. This helps to ensure that even if the
45//! client application crashes, upon restart those messages can be retrieved
46//! from the persistence store and re-sent to the server.
47//!
48//! The Paho library contains file/disk based persistence out of the box.
49//! That is very useful, but on a Flash-based embedded device, like an IoT
50//! gateway, continuous writes to the flash chip will wear it out
51//! prematurely.
52//!
53//! So it would be nice to use a RAM-based cache that is outside the client
54//! application's process. An instance of Redis, running locally, is a
55//! nice solution.
56//!
57//! The Paho library allows the application to create a user-supplied
58//! persistence object and register that with the client. The object simply
59//! needs to implement the `paho_mqtt::ClientPersistence` trait. These
60//! callbacks map to the operations on a key/value store, so Redis is a
61//! perfect candidate to match the persistence API and act as a store.
62//!
63//! The MQTT callbacks map nearly 1:1 to Redis Hash commands:
64//!
65//! ```ignore
66//!      open()      -> connect
67//!      close()     -> disconnect
68//!
69//!      put()       -> HSET
70//!      get()       -> HGET
71//!      remove()    -> HDEL
72//!      keys()      -> HKEYS
73//!      clear()     -> DEL
74//!      contains_key() -> HEXISTS
75//!```
76//!
77//! NOTE: Using Redis as an MQTT persistence store is an extremely viable
78//! solution in a production IoT device or gateway, but it really only makes
79//! sense to use it if the Redis server is running locally on the device
80//! and connected via localhost or a UNIX socket. It _does not make sense_ to
81//! use a remote Redis server for this purpose.
82//!
83
84#[macro_use]
85extern crate log;
86
87use paho_mqtt as mqtt;
88use redis::{Client, Commands, Connection, RedisResult};
89
90// --------------------------------------------------------------------------
91
92/// The MQTT Redis persistence object.
93/// An instance of this stuct can be residtered with an MQTT client to hold
94/// messgaes in a Redis server until they are properly acknowledged by the
95/// remote MQTT server. An instance of this object maps to a single hash
96/// on a specific Redis server.
97pub struct RedisPersistence {
98    /// The name of the Redis hash object.
99    /// This is formed as a combination of the MQTT server name/address
100    /// and the client ID string.
101    name: String,
102    /// The Redis client
103    client: Client,
104    /// The connection to the Redis client.
105    /// This is opened and closed on instruction from the MQTT client.
106    conn: Option<Connection>,
107}
108
109impl RedisPersistence {
110    /// Create a new persistence object to connect to a local Redis server.
111    pub fn new() -> Self {
112        Self::default()
113    }
114}
115
116impl Default for RedisPersistence {
117    /// Create a new persistence object to connect to the Redis server
118    /// on localhost.
119    fn default() -> Self {
120        Self {
121            name: "".to_string(),
122            client: Client::open("redis://localhost/").unwrap(),
123            conn: None,
124        }
125    }
126}
127
128impl mqtt::ClientPersistence for RedisPersistence {
129    /// Opena the connection to the Redis client.
130    fn open(&mut self, client_id: &str, server_uri: &str) -> mqtt::Result<()> {
131        self.name = format!("{}:{}", client_id, server_uri);
132
133        match self.client.get_connection() {
134            Ok(conn) => {
135                trace!("Redis persistence [{}]: open", self.name);
136                self.conn = Some(conn);
137                Ok(())
138            }
139            Err(e) => {
140                warn!("Redis persistence connect error: {:?}", e);
141                Err(mqtt::PersistenceError)
142            }
143        }
144    }
145
146    /// Close the connection to the Redis client.
147    fn close(&mut self) -> mqtt::Result<()> {
148        trace!("Client persistence [{}]: close", self.name);
149        if let Some(conn) = self.conn.take() {
150            drop(conn);
151        }
152        trace!("Redis close complete");
153        Ok(())
154    }
155
156    /// Store a persistent value to Redis.
157    /// We get a vector of buffer references for the data to store, which we
158    /// can concatenate into a single byte buffer to send to the server.
159    fn put(&mut self, key: &str, buffers: Vec<&[u8]>) -> mqtt::Result<()> {
160        trace!("Client persistence [{}]: put key '{}'", self.name, key);
161        let conn = self.conn.as_mut().ok_or(mqtt::PersistenceError)?;
162        let buf: Vec<u8> = buffers.concat();
163        debug!("Putting key '{}' with {} bytes", key, buf.len());
164        redis::cmd("HSET")
165            .arg(&self.name)
166            .arg(key)
167            .arg(buf)
168            .execute(conn);
169        Ok(())
170    }
171
172    /// Get the data buffer for the requested key.
173    /// Although the value sent to the server was a collection of buffers,
174    /// we can return them as a single, concatenated buffer.
175    fn get(&mut self, key: &str) -> mqtt::Result<Vec<u8>> {
176        trace!("Client persistence [{}]: get key '{}'", self.name, key);
177        let conn = self.conn.as_mut().ok_or(mqtt::PersistenceError)?;
178        if let Ok(v) = conn.hget(&self.name, key) as RedisResult<Vec<u8>> {
179            debug!("Found key {} with {} bytes", key, v.len());
180            Ok(v)
181        } else {
182            Err(mqtt::PersistenceError)
183        }
184    }
185
186    /// Remove the value with the specified `key` from the store.
187    fn remove(&mut self, key: &str) -> mqtt::Result<()> {
188        trace!("Client persistence [{}]: remove key '{}'", self.name, key);
189        let conn = self.conn.as_mut().ok_or(mqtt::PersistenceError)?;
190        if let Ok(res) = conn.hdel(&self.name, key) as RedisResult<usize> {
191            if res != 0 {
192                debug!("Removed key: {}", key);
193            } else {
194                debug!("Key not found (assuming OK): {}", key);
195            }
196            // Either way, if key is not in the store we report success.
197            return Ok(());
198        }
199        Err(mqtt::PersistenceError)
200    }
201
202    /// Return a collection of all the keys in the store for this client.
203    fn keys(&mut self) -> mqtt::Result<Vec<String>> {
204        trace!("Client persistence [{}]: keys", self.name);
205        let conn = self.conn.as_mut().ok_or(mqtt::PersistenceError)?;
206        if let Ok(v) = conn.hkeys(&self.name) as RedisResult<Vec<String>> {
207            debug!("Found keys: {:?}", v);
208            Ok(v)
209        } else {
210            warn!("Error looking for keys");
211            Err(mqtt::PersistenceError)
212        }
213    }
214
215    /// Remove all the data for this client from the store.
216    fn clear(&mut self) -> mqtt::Result<()> {
217        trace!("Client persistence [{}]: clear", self.name);
218        let conn = self.conn.as_mut().unwrap(); // TODO: Check for error?
219        if let Ok(_res) = conn.del(&self.name) as RedisResult<usize> {
220            // res==1 means hash/store deleted, 0 means it wasn't found.
221            // Either way, it's gone, so return success
222            return Ok(());
223        }
224        Err(mqtt::PersistenceError)
225    }
226
227    /// Determines if the store for this client contains the specified `key`.
228    fn contains_key(&mut self, key: &str) -> bool {
229        trace!("Client persistence [{}]: contains key '{}'", self.name, key);
230        let conn = match self.conn.as_mut() {
231            Some(conn) => conn,
232            None => return false,
233        };
234        if let Ok(res) = conn.hexists(&self.name, key) as RedisResult<usize> {
235            debug!("'contains' query returned: {:?}", res);
236            res != 0
237        } else {
238            false
239        }
240    }
241}