#![allow(unused_mut)]
use parking_lot::{
RwLock
};
use std::sync::Arc;
use std::ops::{Deref, DerefMut};
use std::fmt;
use std::hash::Hash;
use futures::future;
use futures::Future;
use futures::sync::oneshot::{
channel as oneshot_channel,
};
use futures::sync::mpsc::{
UnboundedSender,
unbounded
};
use futures::stream::{
Stream
};
use futures::sink::Sink;
use boxfnonce::SendBoxFnOnce;
use std::collections::{
HashMap,
VecDeque
};
use ::error::*;
use ::types::*;
use ::utils as client_utils;
use ::RedisClient;
#[cfg(feature="metrics")]
use ::metrics;
use ::metrics::{
SizeTracker,
LatencyTracker
};
#[cfg(feature="metrics")]
use ::metrics::{
LatencyMetrics,
SizeMetrics
};
use super::utils;
use super::owned;
use super::commands;
use super::commands::{
CommandFn,
ConnectSender
};
#[derive(Clone)]
pub struct RedisClientRemote {
state: Arc<RwLock<Arc<RwLock<ClientState>>>>,
command_tx: Arc<RwLock<Option<UnboundedSender<CommandFn>>>>,
connect_tx: Arc<RwLock<VecDeque<ConnectSender>>>,
error_tx: Arc<RwLock<VecDeque<UnboundedSender<RedisError>>>>,
message_tx: Arc<RwLock<VecDeque<UnboundedSender<(String, RedisValue)>>>>,
latency_stats: Arc<RwLock<Option<Arc<RwLock<LatencyTracker>>>>>,
size_stats: Arc<RwLock<Option<Arc<RwLock<SizeTracker>>>>>,
}
impl fmt::Debug for RedisClientRemote {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "[RedisClientRemote]")
}
}
impl RedisClientRemote {
pub fn new() -> RedisClientRemote {
RedisClientRemote {
state: utils::init_state(),
command_tx: Arc::new(RwLock::new(None)),
connect_tx: Arc::new(RwLock::new(VecDeque::new())),
error_tx: Arc::new(RwLock::new(VecDeque::new())),
message_tx: Arc::new(RwLock::new(VecDeque::new())),
latency_stats: Arc::new(RwLock::new(None)),
size_stats: Arc::new(RwLock::new(None))
}
}
pub fn to_owned(&self) -> owned::RedisClientRemote {
owned::RedisClientRemote::from_borrowed(self.clone())
}
pub fn into_owned(self) -> owned::RedisClientRemote {
owned::RedisClientRemote::from_borrowed(self)
}
pub fn state(&self) -> ClientState {
utils::read_state(&self.state)
}
#[doc(hidden)]
pub fn state_cloned(&self) -> Arc<RwLock<Arc<RwLock<ClientState>>>> {
self.state.clone()
}
#[cfg(feature="metrics")]
pub fn read_latency_metrics(&self) -> Option<LatencyMetrics> {
let latency_guard = self.latency_stats.read();
if let Some(ref latency_stats) = *latency_guard.deref() {
Some(metrics::read_latency_stats(latency_stats))
}else{
None
}
}
#[cfg(feature="metrics")]
pub fn take_latency_metrics(&self) -> Option<LatencyMetrics> {
let latency_guard = self.latency_stats.read();
if let Some(ref latency_stats) = *latency_guard.deref() {
Some(metrics::take_latency_stats(latency_stats))
}else{
None
}
}
#[cfg(feature="metrics")]
pub fn read_size_metrics(&self) -> Option<SizeMetrics> {
let size_guard = self.size_stats.read();
if let Some(ref size_stats) = *size_guard.deref() {
Some(metrics::read_size_stats(size_stats))
}else{
None
}
}
#[cfg(feature="metrics")]
pub fn take_size_metrics(&self) -> Option<SizeMetrics> {
let size_guard = self.size_stats.read();
if let Some(ref size_stats) = *size_guard.deref() {
Some(metrics::take_size_stats(size_stats))
}else{
None
}
}
#[doc(hidden)]
pub fn read_metrics_trackers(&self) -> (&Arc<RwLock<Option<Arc<RwLock<LatencyTracker>>>>>, &Arc<RwLock<Option<Arc<RwLock<SizeTracker>>>>>) {
(&self.latency_stats, &self.size_stats)
}
#[doc(hidden)]
pub fn read_connect_tx(&self) -> &Arc<RwLock<VecDeque<ConnectSender>>> {
&self.connect_tx
}
#[doc(hidden)]
pub fn read_error_tx(&self) -> &Arc<RwLock<VecDeque<UnboundedSender<RedisError>>>> {
&self.error_tx
}
#[doc(hidden)]
pub fn read_message_tx(&self) -> &Arc<RwLock<VecDeque<UnboundedSender<(String, RedisValue)>>>> {
&self.message_tx
}
pub fn init(&self, client: RedisClient) -> Box<Future<Item=RedisClient, Error=RedisError>> {
let (tx, rx) = unbounded();
{
let mut command_guard = self.command_tx.write();
let mut command_ref = command_guard.deref_mut();
if let Some(mut tx) = command_ref.take() {
let _ = tx.close();
}
*command_ref = Some(tx);
}
utils::replace_state(&self.state, client.state_cloned());
let (latency, size) = client.metrics_trackers_cloned();
{
let mut latency_guard = self.latency_stats.write();
let latency_ref = latency_guard.deref_mut();
*latency_ref = Some(latency);
let mut size_guard = self.size_stats.write();
let size_ref = size_guard.deref_mut();
*size_ref = Some(size);
}
let commands_ft = rx.from_err::<RedisError>().fold((client.clone(), client), |(backup, client), func| {
func.call_tuple((client,)).and_then(move |client: Option<RedisClient>| {
match client {
Some(c) => Ok((backup, c)),
None => Ok((backup.clone(), backup))
}
})
})
.map(|(_, client)| client);
utils::register_callbacks(&self.command_tx, &self.connect_tx, &self.error_tx, &self.message_tx);
Box::new(commands_ft)
}
pub fn close(&mut self) {
let mut tx_guard = self.command_tx.write();
let mut tx_ref = tx_guard.deref_mut();
if let Some(ref mut tx) = *tx_ref {
let _ = tx.close();
}
}
pub fn on_connect(&self) -> Box<Future<Item=(), Error=RedisError>> {
let is_initialized = {
let command_tx_guard = self.command_tx.read();
command_tx_guard.deref().is_some()
};
let (tx, rx) = oneshot_channel();
let out = Box::new(rx.from_err::<RedisError>().flatten());
if is_initialized {
let func: CommandFn = SendBoxFnOnce::from(move |client: RedisClient| {
client.register_connect_callback(tx);
client_utils::future_ok(Some(client))
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => out,
Err(e) => client_utils::future_error(e)
}
}else{
let mut connect_tx_guard = self.connect_tx.write();
let mut connect_tx_refs = connect_tx_guard.deref_mut();
connect_tx_refs.push_back(tx);
out
}
}
pub fn on_error(&self) -> Box<Stream<Item=RedisError, Error=RedisError>> {
let is_initialized = {
let command_tx_guard = self.command_tx.read();
command_tx_guard.deref().is_some()
};
let (tx, rx) = unbounded();
let out = Box::new(rx.from_err::<RedisError>());
if is_initialized {
let func: CommandFn = SendBoxFnOnce::from(move |client: RedisClient| {
utils::transfer_sender(client.errors_cloned(), tx);
client_utils::future_ok(Some(client))
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => out,
Err(e) => Box::new(future::err(e).into_stream())
}
}else{
let mut error_tx_guard = self.error_tx.write();
let mut error_tx_ref = error_tx_guard.deref_mut();
error_tx_ref.push_back(tx);
out
}
}
pub fn on_message(&self) -> Box<Stream<Item=(String, RedisValue), Error=RedisError>> {
let is_initialized = {
let command_tx_guard = self.command_tx.read();
command_tx_guard.deref().is_some()
};
let (tx, rx) = unbounded();
let out = Box::new(rx.from_err::<RedisError>());
if is_initialized {
let func: CommandFn = SendBoxFnOnce::from(move |client: RedisClient| {
utils::transfer_sender(client.messages_cloned(), tx);
client_utils::future_ok(Some(client))
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => out,
Err(e) => Box::new(future::err(e).into_stream())
}
}else{
let mut message_tx_guard = self.message_tx.write();
let mut message_tx_ref = message_tx_guard.deref_mut();
message_tx_ref.push_back(tx);
out
}
}
pub fn select(&self, db: u8) -> Box<Future<Item=(), Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let func: CommandFn = SendBoxFnOnce::from(move |client: RedisClient| {
commands::select(client, tx, db)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn subscribe<K: Into<String>>(&self, channel: K) -> Box<Future<Item=usize, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let channel = channel.into();
let func: CommandFn = SendBoxFnOnce::from(move |client: RedisClient| {
commands::subscribe(client, tx, channel)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn unsubscribe<K: Into<String>>(&self, channel: K) -> Box<Future<Item=usize, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let channel = channel.into();
let func: CommandFn = SendBoxFnOnce::from(move |client: RedisClient| {
commands::unsubscribe(client, tx, channel)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn publish<K: Into<String>, V: Into<RedisValue>>(&self, channel: K, message: V) -> Box<Future<Item=i64, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let channel = channel.into();
let message = message.into();
let func: CommandFn = SendBoxFnOnce::from(move |client: RedisClient| {
commands::publish(client, tx, channel, message)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn get<K: Into<RedisKey>>(&self, key: K) -> Box<Future<Item=Option<RedisValue>, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let key: RedisKey = key.into();
let func: CommandFn = SendBoxFnOnce::from(move |client: RedisClient| {
commands::get(client, tx, key)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn set<K: Into<RedisKey>, V: Into<RedisValue>>(&self, key: K, value: V, expire: Option<Expiration>, options: Option<SetOptions>) -> Box<Future<Item=bool, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let (key, value) = (key.into(), value.into());
let func: CommandFn = SendBoxFnOnce::from(move |client: RedisClient| {
commands::set(client, tx, key, value, expire, options)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn del<K: Into<MultipleKeys>>(&self, keys: K) -> Box<Future<Item=usize, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let keys = keys.into().inner();
let func: CommandFn = SendBoxFnOnce::new(move |client: RedisClient| {
commands::del(client, tx, keys)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn decr<K: Into<RedisKey>>(&self, key: K) -> Box<Future<Item=i64, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let key = key.into();
let func: CommandFn = SendBoxFnOnce::new(move |client: RedisClient| {
commands::decr(client, tx, key)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn incr<K: Into<RedisKey>>(&self, key: K) -> Box<Future<Item=i64, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let key = key.into();
let func: CommandFn = SendBoxFnOnce::new(move |client: RedisClient| {
commands::incr(client, tx, key)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn incrby<K: Into<RedisKey>>(&self, key: K, incr: i64) -> Box<Future<Item=i64, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let key = key.into();
let func: CommandFn = SendBoxFnOnce::new(move |client: RedisClient| {
commands::incrby(client, tx, key, incr)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn incrbyfloat<K: Into<RedisKey>>(&self, key: K, incr: f64) -> Box<Future<Item=f64, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let key = key.into();
let func: CommandFn = SendBoxFnOnce::new(move |client: RedisClient| {
commands::incrbyfloat(client, tx, key, incr)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn hget<F: Into<RedisKey>, K: Into<RedisKey>>(&self, key: K, field: F) -> Box<Future<Item=Option<RedisValue>, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let (key, field) = (key.into(), field.into());
let func: CommandFn = SendBoxFnOnce::new(move |client: RedisClient| {
commands::hget(client, tx, key, field)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn hgetall<K: Into<RedisKey>>(&self, key: K) -> Box<Future<Item=HashMap<String, RedisValue>, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let key = key.into();
let func: CommandFn = SendBoxFnOnce::new(move |client: RedisClient| {
commands::hgetall(client, tx, key)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn hset<K: Into<RedisKey>, F: Into<RedisKey>, V: Into<RedisValue>>(&self, key: K, field: F, value: V) -> Box<Future<Item=usize, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let (key, field, value) = (key.into(), field.into(), value.into());
let func: CommandFn = SendBoxFnOnce::new(move |client: RedisClient| {
commands::hset(client, tx, key, field, value)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn hdel<K: Into<RedisKey>, F: Into<MultipleKeys>>(&self, key: K, fields: F) -> Box<Future<Item=usize, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let key = key.into();
let fields = fields.into().inner();
let func: CommandFn = SendBoxFnOnce::new(move |client: RedisClient| {
commands::hdel(client, tx, key, fields)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn hlen<K: Into<RedisKey>> (&self, key: K) -> Box<Future<Item=usize, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let key = key.into();
let func: CommandFn = SendBoxFnOnce::new(move |client: RedisClient| {
commands::hlen(client, tx, key)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn hmget<F: Into<MultipleKeys>, K: Into<RedisKey>> (&self, key: K, fields: F) -> Box<Future<Item=Vec<RedisValue>, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let key = key.into();
let fields = fields.into().inner();
let func: CommandFn = SendBoxFnOnce::new(move |client: RedisClient| {
commands::hmget(client, tx, key, fields)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn hmset<V: Into<RedisValue>, F: Into<RedisKey> + Hash + Eq, K: Into<RedisKey>> (&self, key: K, mut values: HashMap<F, V>) -> Box<Future<Item=String, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let key = key.into();
let mut owned_values = HashMap::with_capacity(values.len());
for (key, val) in values.drain() {
owned_values.insert(key.into(), val.into());
}
let func: CommandFn = SendBoxFnOnce::new(move |client: RedisClient| {
commands::hmset(client, tx, key, owned_values)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn hsetnx<K: Into<RedisKey>, F: Into<RedisKey>, V: Into<RedisValue>> (&self, key: K, field: F, value: V) -> Box<Future<Item=usize, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let (key, field, value) = (key.into(), field.into(), value.into());
let func: CommandFn = SendBoxFnOnce::new(move |client: RedisClient| {
commands::hsetnx(client, tx, key, field, value)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn hstrlen<K: Into<RedisKey>, F: Into<RedisKey>> (&self, key: K, field: F) -> Box<Future<Item=usize, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let (key, field) = (key.into(), field.into());
let func: CommandFn = SendBoxFnOnce::new(move |client: RedisClient| {
commands::hstrlen(client, tx, key, field)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn hvals<K: Into<RedisKey>> (&self, key: K) -> Box<Future<Item=Vec<RedisValue>, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let key = key.into();
let func: CommandFn = SendBoxFnOnce::new(move |client: RedisClient| {
commands::hvals(client, tx, key)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
pub fn hkeys<K: Into<RedisKey>> (&self, key: K) -> Box<Future<Item=Vec<String>, Error=RedisError>> {
let (tx, rx) = oneshot_channel();
let key = key.into();
let func: CommandFn = SendBoxFnOnce::new(move |client: RedisClient| {
commands::hkeys(client, tx, key)
});
match utils::send_command(&self.command_tx, func) {
Ok(_) => Box::new(rx.from_err::<RedisError>().flatten()),
Err(e) => client_utils::future_error(e)
}
}
}