#[macro_use]
extern crate log;
extern crate env_logger;
extern crate redis;
extern crate paho_mqtt as mqtt;
use redis::{Client, Commands, Connection, RedisResult };
/// MQTT client persistence store backed by a Redis server.
///
/// The struct keeps a Redis client handle for its whole lifetime and holds an
/// optional connection that is `None` until one is stored in it.
pub struct RedisPersistence {
    /// Name of the store; starts out empty when built by `new()`.
    name: String,
    /// Redis client handle from which connections are obtained.
    client: Client,
    /// Active Redis connection, or `None` when no connection is held.
    conn: Option<Connection>,
}
impl RedisPersistence {
pub fn new() -> RedisPersistence {
RedisPersistence {
name: "".to_string(),
client: Client::open("redis://localhost/").unwrap(),
conn: None,
}
}
}
impl mqtt::ClientPersistence for RedisPersistence
{
fn open(&mut self, client_id: &str, server_uri: &str) -> mqtt::MqttResult<()> {
self.name = format!("{}:{}", client_id, server_uri);
match self.client.get_connection() {
Ok(conn) => {
trace!("Redis persistence [{}]: open", self.name);
self.conn = Some(conn);
Ok(())
}
Err(e) => {
warn!("Redis persistence connect error: {:?}", e);
return Err(mqtt::PERSISTENCE_ERROR)
}
}
}
fn close(&mut self) -> mqtt::MqttResult<()> {
trace!("Client persistence [{}]: close", self.name);
self.conn = None;
Ok(())
}
fn put(&mut self, key: &str, buffers: Vec<&[u8]>) -> mqtt::MqttResult<()> {
trace!("Client persistence [{}]: put key '{}'", self.name, key);
let conn = self.conn.as_ref().unwrap(); let buf: Vec<u8> = buffers.concat();
debug!("Putting key '{}' with {} bytes", key, buf.len());
redis::cmd("HSET").arg(&self.name).arg(key).arg(buf).execute(conn);
Ok(())
}
fn get(&self, key: &str) -> mqtt::MqttResult<Vec<u8>> {
trace!("Client persistence [{}]: get key '{}'", self.name, key);
let conn = self.conn.as_ref().unwrap(); if let Ok(v) = conn.hget(&self.name, key) as RedisResult<Vec<u8>> {
debug!("Found key {} with {} bytes", key, v.len());
Ok(v)
}
else {
Err(mqtt::PERSISTENCE_ERROR)
}
}
fn remove(&mut self, key: &str) -> mqtt::MqttResult<()> {
trace!("Client persistence [{}]: remove key '{}'", self.name, key);
let conn = self.conn.as_ref().unwrap(); if let Ok(res) = conn.hdel(&self.name, key) as RedisResult<usize> {
if res != 0 {
debug!("Removed key: {}", key);
}
else {
debug!("Key not found (assuming OK): {}", key);
}
return Ok(());
}
Err(mqtt::PERSISTENCE_ERROR)
}
fn keys(&self) -> mqtt::MqttResult<Vec<String>> {
trace!("Client persistence [{}]: keys", self.name);
let conn = self.conn.as_ref().unwrap(); if let Ok(v) = conn.hkeys(&self.name) as RedisResult<Vec<String>> {
debug!("Found keys: {:?}", v);
Ok(v)
}
else {
warn!("Error looking for keys");
Err(mqtt::PERSISTENCE_ERROR)
}
}
fn clear(&mut self) -> mqtt::MqttResult<()> {
trace!("Client persistence [{}]: clear", self.name);
let conn = self.conn.as_ref().unwrap(); if let Ok(_res) = conn.del(&self.name) as RedisResult<usize> {
return Ok(());
}
Err(mqtt::PERSISTENCE_ERROR)
}
fn contains_key(&self, key: &str) -> bool {
trace!("Client persistence [{}]: contains key '{}'", self.name, key);
let conn = self.conn.as_ref().unwrap(); if let Ok(res) = conn.hexists(&self.name, key) as RedisResult<usize> {
debug!("'contains' query returned: {:?}", res);
res != 0
}
else { false }
}
}
#[cfg(test)]
mod tests {
// Placeholder test: the body is empty, so it only confirms that the crate
// builds under the test harness. No persistence behavior is exercised here.
#[test]
fn it_works() {
}
}