#![allow(unused_mut)]
use parking_lot::{
RwLock
};
use std::sync::Arc;
use std::ops::Deref;
use std::ops::DerefMut;
use std::hash::Hash;
use std::fmt;
use std::collections::{
VecDeque,
HashMap
};
use futures::{
Future,
Stream
};
use futures::sync::oneshot::{
channel as oneshot_channel
};
use futures::sync::mpsc::{
UnboundedSender,
unbounded
};
use ::error::*;
use ::types::*;
use ::RedisClient;
#[cfg(feature="metrics")]
use ::metrics;
use ::metrics::{
SizeTracker,
LatencyTracker
};
#[cfg(feature="metrics")]
use ::metrics::{
LatencyMetrics,
SizeMetrics
};
use super::utils;
use super::borrowed;
use super::borrowed::RedisClientRemote as RedisClientBorrowed;
use super::commands::ConnectSender;
#[derive(Clone)]
pub struct RedisClientRemote {
state: Arc<RwLock<Arc<RwLock<ClientState>>>>,
borrowed: Arc<RwLock<Option<RedisClientBorrowed>>>,
connect_tx: Arc<RwLock<VecDeque<ConnectSender>>>,
error_tx: Arc<RwLock<VecDeque<UnboundedSender<RedisError>>>>,
message_tx: Arc<RwLock<VecDeque<UnboundedSender<(String, RedisValue)>>>>,
latency_stats: Arc<RwLock<Option<Arc<RwLock<LatencyTracker>>>>>,
size_stats: Arc<RwLock<Option<Arc<RwLock<SizeTracker>>>>>
}
impl fmt::Debug for RedisClientRemote {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "[RedisClientRemote]")
}
}
impl RedisClientRemote {
pub fn new() -> RedisClientRemote {
RedisClientRemote {
state: utils::init_state(),
borrowed: Arc::new(RwLock::new(None)),
connect_tx: Arc::new(RwLock::new(VecDeque::new())),
error_tx: Arc::new(RwLock::new(VecDeque::new())),
message_tx: Arc::new(RwLock::new(VecDeque::new())),
latency_stats: Arc::new(RwLock::new(None)),
size_stats: Arc::new(RwLock::new(None))
}
}
pub fn from_borrowed(client: borrowed::RedisClientRemote) -> RedisClientRemote {
let connect_tx = client.read_connect_tx().clone();
let error_tx = client.read_error_tx().clone();
let message_tx = client.read_message_tx().clone();
let (latency, size) = {
let (latency, size) = client.read_metrics_trackers();
(latency.clone(), size.clone())
};
RedisClientRemote {
state: client.state_cloned(),
borrowed: Arc::new(RwLock::new(Some(client))),
latency_stats: latency,
size_stats: size,
connect_tx: connect_tx,
error_tx: error_tx,
message_tx: message_tx
}
}
pub fn to_borrowed(&self) -> Option<borrowed::RedisClientRemote> {
let borrowed_guard = self.borrowed.read();
borrowed_guard.deref().clone()
}
pub fn into_borrowed(self) -> Option<borrowed::RedisClientRemote> {
let mut borrowed_guard = self.borrowed.write();
borrowed_guard.deref_mut().take()
}
pub fn inner_borrowed(&self) -> &Arc<RwLock<Option<RedisClientBorrowed>>> {
&self.borrowed
}
pub fn state(&self) -> ClientState {
utils::read_state(&self.state)
}
#[doc(hidden)]
pub fn state_cloned(&self) -> Arc<RwLock<Arc<RwLock<ClientState>>>> {
self.state.clone()
}
#[cfg(feature="metrics")]
pub fn read_latency_metrics(&self) -> Option<LatencyMetrics> {
let latency_guard = self.latency_stats.read();
if let Some(ref latency_stats) = *latency_guard.deref() {
Some(metrics::read_latency_stats(latency_stats))
}else{
None
}
}
#[cfg(feature="metrics")]
pub fn take_latency_metrics(&self) -> Option<LatencyMetrics> {
let latency_guard = self.latency_stats.read();
if let Some(ref latency_stats) = *latency_guard.deref() {
Some(metrics::take_latency_stats(latency_stats))
}else{
None
}
}
#[cfg(feature="metrics")]
pub fn read_size_metrics(&self) -> Option<SizeMetrics> {
let size_guard = self.size_stats.read();
if let Some(ref size_stats) = *size_guard.deref() {
Some(metrics::read_size_stats(size_stats))
}else{
None
}
}
#[cfg(feature="metrics")]
pub fn take_size_metrics(&self) -> Option<SizeMetrics> {
let size_guard = self.size_stats.read();
if let Some(ref size_stats) = *size_guard.deref() {
Some(metrics::take_size_stats(size_stats))
}else{
None
}
}
pub fn init(&self, client: RedisClient) -> Box<Future<Item=RedisClient, Error=RedisError>> {
let borrowed = borrowed::RedisClientRemote::new();
utils::replace_state(&self.state, client.state_cloned());
utils::transfer_senders(&self.connect_tx, borrowed.read_connect_tx());
utils::transfer_senders(&self.error_tx, borrowed.read_error_tx());
utils::transfer_senders(&self.message_tx, borrowed.read_message_tx());
{
let mut borrowed_guard = self.borrowed.write();
let borrowed_ref = borrowed_guard.deref_mut();
*borrowed_ref = Some(borrowed.clone());
}
{
let (latency, size) = client.metrics_trackers_cloned();
let mut latency_guard = self.latency_stats.write();
let latency_ref = latency_guard.deref_mut();
*latency_ref = Some(latency);
let mut size_guard = self.size_stats.write();
let size_ref = size_guard.deref_mut();
*size_ref = Some(size);
}
borrowed.init(client)
}
pub fn close(&mut self) {
let mut borr_guard = self.borrowed.write();
let mut borr_ref = borr_guard.deref_mut();
match *borr_ref {
Some(ref mut borr) => borr.close(),
None => {}
};
}
pub fn on_connect(&self) -> Box<Future<Item=(), Error=RedisError>> {
let is_initialized = {
let borrowed_guard = self.borrowed.read();
borrowed_guard.deref().is_some()
};
if is_initialized {
let borrowed_guard = self.borrowed.read();
let borrowed_ref = borrowed_guard.deref();
let borrowed = borrowed_ref.as_ref().unwrap();
borrowed.on_connect()
}else{
let (tx, rx) = oneshot_channel();
let mut connect_tx_guard = self.connect_tx.write();
let mut connect_tx_ref = connect_tx_guard.deref_mut();
connect_tx_ref.push_back(tx);
Box::new(rx.from_err::<RedisError>().flatten())
}
}
pub fn on_error(&self) -> Box<Stream<Item=RedisError, Error=RedisError>> {
let is_initialized = {
let borrowed_guard = self.borrowed.read();
borrowed_guard.deref().is_some()
};
if is_initialized {
let borrowed_guard = self.borrowed.read();
let borrowed_ref = borrowed_guard.deref();
let borrowed = borrowed_ref.as_ref().unwrap();
borrowed.on_error()
}else{
let (tx, rx) = unbounded();
let mut error_tx_guard = self.error_tx.write();
let mut error_tx_ref = error_tx_guard.deref_mut();
error_tx_ref.push_back(tx);
Box::new(rx.from_err::<RedisError>())
}
}
pub fn on_message(&self) -> Box<Stream<Item=(String, RedisValue), Error=RedisError>> {
let is_initialized = {
let borrowed_guard = self.borrowed.read();
borrowed_guard.deref().is_some()
};
if is_initialized {
let borrowed_guard = self.borrowed.read();
let borrowed_ref = borrowed_guard.deref();
let borrowed = borrowed_ref.as_ref().unwrap();
borrowed.on_message()
}else{
let (tx, rx) = unbounded();
let mut message_tx_guard = self.message_tx.write();
let mut message_tx_ref = message_tx_guard.deref_mut();
message_tx_ref.push_back(tx);
Box::new(rx.from_err::<RedisError>())
}
}
pub fn select(self, db: u8) -> Box<Future<Item=Self, Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.select(db).and_then(move |_| {
Ok(_self)
}))
})
}
pub fn subscribe<K: Into<String>>(self, channel: K) -> Box<Future<Item=(Self, usize), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.subscribe(channel).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn unsubscribe<K: Into<String>>(self, channel: K) -> Box<Future<Item=(Self, usize), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.unsubscribe(channel).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn publish<K: Into<String>, V: Into<RedisValue>>(self, channel: K, message: V) -> Box<Future<Item=(Self, i64), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.publish(channel, message).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn get<K: Into<RedisKey>>(self, key: K) -> Box<Future<Item=(Self, Option<RedisValue>), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.get(key).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn set<K: Into<RedisKey>, V: Into<RedisValue>>(self, key: K, value: V, expire: Option<Expiration>, options: Option<SetOptions>) -> Box<Future<Item=(Self, bool), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.set(key, value, expire, options).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn del<K: Into<MultipleKeys>>(self, keys: K) -> Box<Future<Item=(Self, usize), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.del(keys).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn decr<K: Into<RedisKey>>(self, key: K) -> Box<Future<Item=(Self, i64), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.decr(key).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn incr<K: Into<RedisKey>>(self, key: K) -> Box<Future<Item=(Self, i64), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.incr(key).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn incrby<K: Into<RedisKey>>(self, key: K, incr: i64) -> Box<Future<Item=(Self, i64), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.incrby(key, incr).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn incrbyfloat<K: Into<RedisKey>>(self, key: K, incr: f64) -> Box<Future<Item=(Self, f64), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.incrbyfloat(key, incr).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn hget<F: Into<RedisKey>, K: Into<RedisKey>>(self, key: K, field: F) -> Box<Future<Item=(Self, Option<RedisValue>), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.hget(key, field).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn hgetall<K: Into<RedisKey>>(self, key: K) -> Box<Future<Item=(Self, HashMap<String, RedisValue>), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.hgetall(key).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn hset<K: Into<RedisKey>, F: Into<RedisKey>, V: Into<RedisValue>>(self, key: K, field: F, value: V) -> Box<Future<Item=(Self, usize), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.hset(key, field, value).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn hdel<K: Into<RedisKey>, F: Into<MultipleKeys>>(self, key: K, fields: F) -> Box<Future<Item=(Self, usize), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.hdel(key, fields).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn hlen<K: Into<RedisKey>> (self, key: K) -> Box<Future<Item=(Self, usize), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.hlen(key).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn hmget<F: Into<MultipleKeys>, K: Into<RedisKey>> (self, key: K, fields: F) -> Box<Future<Item=(Self, Vec<RedisValue>), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.hmget(key, fields).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn hmset<V: Into<RedisValue>, F: Into<RedisKey> + Hash + Eq, K: Into<RedisKey>> (self, key: K, values: HashMap<F, V>) -> Box<Future<Item=(Self, String), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.hmset(key, values).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn hsetnx<K: Into<RedisKey>, F: Into<RedisKey>, V: Into<RedisValue>> (self, key: K, field: F, value: V) -> Box<Future<Item=(Self, usize), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.hsetnx(key, field, value).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn hstrlen<K: Into<RedisKey>, F: Into<RedisKey>> (self, key: K, field: F) -> Box<Future<Item=(Self, usize), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.hstrlen(key, field).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn hvals<K: Into<RedisKey>> (self, key: K) -> Box<Future<Item=(Self, Vec<RedisValue>), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.hvals(key).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
pub fn hkeys<K: Into<RedisKey>> (self, key: K) -> Box<Future<Item=(Self, Vec<String>), Error=RedisError>> {
utils::run_borrowed(self, move |_self, borrowed| {
Box::new(borrowed.hkeys(key).and_then(move |resp| {
Ok((_self, resp))
}))
})
}
}