#![allow(unused_mut)]
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_proto;
extern crate tokio_timer;
extern crate bytes;
extern crate parking_lot;
extern crate url;
extern crate crc16;
#[cfg(feature="metrics")]
extern crate chrono;
#[cfg(feature="sync")]
extern crate boxfnonce;
#[cfg(feature="enable-flame")]
extern crate flame;
#[macro_use]
extern crate log;
extern crate pretty_env_logger;
#[macro_use]
mod _flame;
#[macro_use]
mod utils;
mod loop_serve;
#[cfg(feature="metrics")]
pub mod metrics;
#[cfg(not(feature="metrics"))]
mod metrics;
use metrics::{
SizeTracker,
LatencyTracker
};
#[cfg(feature="metrics")]
use metrics::{
LatencyMetrics,
SizeMetrics
};
pub mod error;
pub mod types;
#[cfg(feature="sync")]
pub mod sync;
#[cfg(feature="fuzz")]
pub mod protocol;
#[cfg(not(feature="fuzz"))]
mod protocol;
use std::ops::Deref;
use std::ops::DerefMut;
use std::sync::Arc;
use std::hash::Hash;
use std::collections::{
HashMap,
VecDeque
};
use parking_lot::RwLock;
use tokio_core::reactor::Handle;
use futures::{
Future,
Stream
};
use futures::sync::oneshot::{
Sender as OneshotSender,
channel as oneshot_channel
};
use error::{
RedisErrorKind,
RedisError
};
use types::{
SetOptions,
Expiration,
InfoKind,
ClientState,
RedisKey,
RedisValue,
RedisValueKind,
RedisConfig,
ReconnectPolicy,
MultipleKeys,
ASYNC
};
use loop_serve::{
ConnectionFuture
};
use protocol::types::{
RedisCommand,
RedisCommandKind
};
use futures::sync::mpsc::{
UnboundedSender,
unbounded
};
use std::fmt;
use std::rc::Rc;
use std::cell::RefCell;
#[cfg(feature="mock")]
mod mocks;
/// Handle to a Redis connection.
///
/// `Clone` is derived and every field is reference counted (`Arc`/`Rc`), so a
/// clone shares its state, configuration, channels and metrics trackers with
/// the value it was cloned from.
#[derive(Clone)]
pub struct RedisClient {
// Current connection state; read by `state()`.
state: Arc<RwLock<ClientState>>,
// Configuration handed to `new`.
config: Rc<RefCell<RedisConfig>>,
// Senders registered through `on_error`.
error_tx: Rc<RefCell<VecDeque<UnboundedSender<RedisError>>>>,
// Channel used to submit commands; starts out as `None` in `new`.
// NOTE(review): presumably populated by the connection loop — confirm in `loop_serve`.
command_tx: Rc<RefCell<Option<UnboundedSender<RedisCommand>>>>,
// Senders registered through `on_message`.
message_tx: Rc<RefCell<VecDeque<UnboundedSender<(String, RedisValue)>>>>,
// Senders registered through `on_reconnect`.
reconnect_tx: Rc<RefCell<VecDeque<UnboundedSender<RedisClient>>>>,
// One-shot senders registered through `on_connect`.
connect_tx: Rc<RefCell<VecDeque<OneshotSender<Result<RedisClient, RedisError>>>>>,
// Set to `true` by `quit` when the client is not connected.
closed: Arc<RwLock<bool>>,
// One-shot senders registered through `register_connect_callback` (feature `sync`).
remote_tx: Rc<RefCell<VecDeque<OneshotSender<Result<(), RedisError>>>>>,
// Latency tracker exposed by the `*_latency_metrics` readers.
latency_stats: Arc<RwLock<LatencyTracker>>,
// Payload-size tracker exposed by the `*_size_metrics` readers.
size_stats: Arc<RwLock<SizeTracker>>
}
/// Opaque `Debug` output: always prints the fixed tag `[RedisClient]` and none
/// of the client's internals.
impl fmt::Debug for RedisClient {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("[RedisClient]")
    }
}
impl RedisClient {
/// Build a client from `config` without opening a connection.
///
/// The client starts in `ClientState::Disconnected`, with no command channel,
/// empty listener queues, the `closed` flag cleared and default metrics trackers.
pub fn new(config: RedisConfig) -> RedisClient {
    RedisClient {
        state: Arc::new(RwLock::new(ClientState::Disconnected)),
        config: Rc::new(RefCell::new(config)),
        command_tx: Rc::new(RefCell::new(None)),
        closed: Arc::new(RwLock::new(false)),
        error_tx: Rc::new(RefCell::new(VecDeque::new())),
        message_tx: Rc::new(RefCell::new(VecDeque::new())),
        reconnect_tx: Rc::new(RefCell::new(VecDeque::new())),
        connect_tx: Rc::new(RefCell::new(VecDeque::new())),
        remote_tx: Rc::new(RefCell::new(VecDeque::new())),
        latency_stats: Arc::new(RwLock::new(LatencyTracker::default())),
        size_stats: Arc::new(RwLock::new(SizeTracker::default()))
    }
}
/// Close the connection and hand the client back.
///
/// The state is inspected while the `closed` write lock is held. If the client
/// is not `Connected`, the `closed` flag is set and no command is sent;
/// otherwise a `Quit` command is issued. In both cases the error, reconnect,
/// message and connect listener queues are passed to the `loop_serve::utils`
/// `close_*` helpers first.
pub fn quit(self) -> Box<Future<Item=Self, Error=RedisError>> {
    debug!("Closing Redis connection.");

    let skip_quit_command = {
        let mut closed = self.closed.write();
        if self.state() == ClientState::Connected {
            false
        } else {
            // Writing `true` over an existing `true` is harmless, so no need to branch on it.
            *closed = true;
            true
        }
    };

    loop_serve::utils::close_error_tx(&self.error_tx);
    loop_serve::utils::close_reconnect_tx(&self.reconnect_tx);
    loop_serve::utils::close_messages_tx(&self.message_tx);
    loop_serve::utils::close_connect_tx(&self.connect_tx, &self.remote_tx);

    if skip_quit_command {
        return utils::future_ok(self);
    }

    let pending = utils::request_response(&self.command_tx, &self.state, || {
        Ok((RedisCommandKind::Quit, vec![]))
    });
    Box::new(pending.and_then(|_| Ok(self)))
}
pub fn state(&self) -> ClientState {
let state_guard = self.state.read();
state_guard.deref().clone()
}
/// Read the latency metrics through `metrics::read_latency_stats`.
#[cfg(feature="metrics")]
pub fn read_latency_metrics(&self) -> LatencyMetrics {
    let tracker = &self.latency_stats;
    metrics::read_latency_stats(tracker)
}
/// Take the latency metrics through `metrics::take_latency_stats`.
// NOTE(review): "take" presumably resets the tracker — confirm in `metrics`.
#[cfg(feature="metrics")]
pub fn take_latency_metrics(&self) -> LatencyMetrics {
    let tracker = &self.latency_stats;
    metrics::take_latency_stats(tracker)
}
/// Read the size metrics through `metrics::read_size_stats`.
#[cfg(feature="metrics")]
pub fn read_size_metrics(&self) -> SizeMetrics {
    let tracker = &self.size_stats;
    metrics::read_size_stats(tracker)
}
/// Take the size metrics through `metrics::take_size_stats`.
// NOTE(review): "take" presumably resets the tracker — confirm in `metrics`.
#[cfg(feature="metrics")]
pub fn take_size_metrics(&self) -> SizeMetrics {
    let tracker = &self.size_stats;
    metrics::take_size_stats(tracker)
}
/// Return new `Arc` handles to the latency and size trackers, in that order.
#[doc(hidden)]
pub fn metrics_trackers_cloned(&self) -> (Arc<RwLock<LatencyTracker>>, Arc<RwLock<SizeTracker>>) {
    let latency = Arc::clone(&self.latency_stats);
    let size = Arc::clone(&self.size_stats);
    (latency, size)
}
/// Return a new `Arc` handle to the shared client state.
#[doc(hidden)]
#[cfg(feature="sync")]
pub fn state_cloned(&self) -> Arc<RwLock<ClientState>> {
    Arc::clone(&self.state)
}
/// Return a new `Rc` handle to the queue of message listeners.
#[doc(hidden)]
#[cfg(feature="sync")]
pub fn messages_cloned(&self) -> Rc<RefCell<VecDeque<UnboundedSender<(String, RedisValue)>>>> {
    Rc::clone(&self.message_tx)
}
/// Return a new `Rc` handle to the queue of error listeners.
#[doc(hidden)]
#[cfg(feature="sync")]
pub fn errors_cloned(&self) -> Rc<RefCell<VecDeque<UnboundedSender<RedisError>>>> {
    Rc::clone(&self.error_tx)
}
/// Append `tx` to the back of the `remote_tx` queue.
#[doc(hidden)]
#[cfg(feature="sync")]
pub fn register_connect_callback(&self, tx: OneshotSender<Result<(), RedisError>>) {
    self.remote_tx.borrow_mut().push_back(tx);
}
/// Start the connection on `handle` by delegating to `loop_serve::init`.
///
/// Two checks run first: the client state is checked against `Disconnected`,
/// and the `closed` flag is checked and set with `false`.
// NOTE(review): `fry!` is a crate macro; presumably it returns a failed future
// when the wrapped check errors — confirm in `utils`.
pub fn connect(&self, handle: &Handle) -> ConnectionFuture {
    fry!(utils::check_client_state(ClientState::Disconnected, &self.state));
    fry!(utils::check_and_set_closed_flag(&self.closed, false));

    debug!("Connecting to Redis server.");
    loop_serve::init(
        self.clone(),
        handle,
        self.config.clone(),
        self.state.clone(),
        self.error_tx.clone(),
        self.message_tx.clone(),
        self.command_tx.clone(),
        self.connect_tx.clone(),
        self.reconnect_tx.clone(),
        self.remote_tx.clone()
    )
}
/// Start the connection on `handle` with a reconnect `policy`, delegating to
/// `loop_serve::init_with_policy`.
///
/// The same state and `closed`-flag checks as `connect` run first, and the
/// policy's attempt counter is reset before it is handed over.
// NOTE(review): `fry!` is a crate macro; presumably it returns a failed future
// when the wrapped check errors — confirm in `utils`.
pub fn connect_with_policy(&self, handle: &Handle, mut policy: ReconnectPolicy) -> Box<Future<Item=(), Error=RedisError>> {
    fry!(utils::check_client_state(ClientState::Disconnected, &self.state));
    fry!(utils::check_and_set_closed_flag(&self.closed, false));

    policy.reset_attempts();

    debug!("Connecting to Redis server with reconnect policy.");
    loop_serve::init_with_policy(
        self.clone(),
        handle,
        self.config.clone(),
        self.state.clone(),
        self.closed.clone(),
        self.error_tx.clone(),
        self.message_tx.clone(),
        self.command_tx.clone(),
        self.reconnect_tx.clone(),
        self.connect_tx.clone(),
        self.remote_tx.clone(),
        policy
    )
}
/// Register a reconnect listener and return its receiving stream.
///
/// A fresh unbounded channel is created; the sender is queued on
/// `reconnect_tx` and the receiver is returned with its error type mapped to
/// `RedisError`.
pub fn on_reconnect(&self) -> Box<Stream<Item=Self, Error=RedisError>> {
    let (sender, receiver) = unbounded();
    self.reconnect_tx.borrow_mut().push_back(sender);

    Box::new(receiver.from_err::<RedisError>())
}
/// Return a future that yields a clone of this client once it is connected.
///
/// If the state is already `Connected` the future is ready immediately.
/// Otherwise a one-shot sender is queued on `connect_tx` and the future
/// resolves with whatever `Result` is sent through it.
pub fn on_connect(&self) -> Box<Future<Item=Self, Error=RedisError>> {
    let already_connected = utils::read_client_state(&self.state) == ClientState::Connected;
    if already_connected {
        return utils::future_ok(self.clone());
    }

    let (sender, receiver) = oneshot_channel();
    self.connect_tx.borrow_mut().push_back(sender);

    Box::new(receiver.from_err::<RedisError>().flatten())
}
/// Register an error listener and return its receiving stream.
///
/// A fresh unbounded channel is created; the sender is queued on `error_tx`
/// and the receiver is returned with its error type mapped to `RedisError`.
pub fn on_error(&self) -> Box<Stream<Item=RedisError, Error=RedisError>> {
    let (sender, receiver) = unbounded();
    self.error_tx.borrow_mut().push_back(sender);

    Box::new(receiver.from_err::<RedisError>())
}
/// Register a message listener and return its receiving stream of
/// `(String, RedisValue)` pairs.
///
/// A fresh unbounded channel is created; the sender is queued on `message_tx`
/// and the receiver is returned with its error type mapped to `RedisError`.
// NOTE(review): the `String` is presumably the channel name — confirm at the sender side.
pub fn on_message(&self) -> Box<Stream<Item=(String, RedisValue), Error=RedisError>> {
    let (sender, receiver) = unbounded();
    self.message_tx.borrow_mut().push_back(sender);

    Box::new(receiver.from_err::<RedisError>())
}
/// Report whether the client's configuration is clustered, as decided by
/// `utils::is_clustered`.
pub fn is_clustered(&self) -> bool {
    let config = &self.config;
    utils::is_clustered(config)
}
/// Split a clustered client into `(RedisClient, RedisConfig)` pairs via
/// `utils::split`.
///
/// Fails with an `Unknown` error when the configuration is not clustered.
pub fn split_cluster(&self, handle: &Handle) -> Box<Future<Item=Vec<(RedisClient, RedisConfig)>, Error=RedisError>> {
    if !utils::is_clustered(&self.config) {
        return utils::future_error(RedisError::new(
            RedisErrorKind::Unknown, "Client is not using a clustered deployment."
        ));
    }

    utils::split(&self.command_tx, &self.config, handle)
}
/// Issue `Select` with `db` as its only argument.
///
/// Consumes the client and yields it back once the reply has been read as a
/// single result; any error from that step is propagated.
pub fn select(self, db: u8) -> Box<Future<Item=Self, Error=RedisError>> {
    debug!("Selecting Redis database {}", db);

    let pending = utils::request_response(&self.command_tx, &self.state, || {
        Ok((RedisCommandKind::Select, vec![RedisValue::from(db)]))
    });

    Box::new(pending.and_then(|frame| {
        // The reply payload itself is not needed, only whether it was an error.
        frame.into_single_result()?;
        Ok(self)
    }))
}
/// Issue `Info`, optionally restricted to `section`.
///
/// When `section` is `Some`, its string form is sent as the single argument;
/// otherwise the command has no arguments. Yields the client and the reply as
/// a `String`; a reply that cannot be turned into a string fails with an
/// `Unknown` error naming the kind that was found.
pub fn info(self, section: Option<InfoKind>) -> Box<Future<Item=(Self, String), Error=RedisError>> {
    let section = section.map(|k| k.to_string());

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        let args = section
            .map(|name| vec![RedisValue::from(name)])
            .unwrap_or_else(Vec::new);

        Ok((RedisCommandKind::Info, args))
    });

    Box::new(pending.and_then(|frame| {
        let resp = frame.into_single_result()?;
        // Capture the kind first: `into_string` consumes the value.
        let kind = resp.kind();

        resp.into_string()
            .map(|text| (self, text))
            .ok_or_else(|| RedisError::new(
                RedisErrorKind::Unknown, format!("Invalid INFO response. Expected String, found {:?}", kind)
            ))
    }))
}
/// Issue `Ping` with no arguments.
///
/// Yields the client and the reply as a `String`; a reply that cannot be
/// turned into a string fails with an `Unknown` error naming the kind found.
pub fn ping(self) -> Box<Future<Item=(Self, String), Error=RedisError>> {
    debug!("Pinging Redis server.");

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::Ping, vec![]))
    });

    Box::new(pending.and_then(|frame| {
        debug!("Received Redis ping response.");

        let resp = frame.into_single_result()?;
        // Capture the kind first: `into_string` consumes the value.
        let kind = resp.kind();

        resp.into_string()
            .map(|text| (self, text))
            .ok_or_else(|| RedisError::new(
                RedisErrorKind::Unknown, format!("Invalid PING response. Expected String, found {:?}", kind)
            ))
    }))
}
/// Issue `Subscribe` for `channel`.
///
/// The last element of the reply is read as an unsigned integer and returned
/// (cast to `usize`) together with the client. An empty reply or a non-numeric
/// last element fails with an `Unknown` error.
// NOTE(review): the count is presumably the number of subscribed channels — confirm against the Redis docs.
pub fn subscribe<T: Into<String>>(self, channel: T) -> Box<Future<Item=(Self, usize), Error=RedisError>> {
    let channel = channel.into();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::Subscribe, vec![channel.into()]))
    });

    Box::new(pending.and_then(|frame| {
        let mut results = frame.into_results()?;

        let last = results.pop().ok_or_else(|| RedisError::new(
            RedisErrorKind::Unknown, "Invalid SUBSCRIBE response."
        ))?;
        let count = last.into_u64().ok_or_else(|| RedisError::new(
            RedisErrorKind::Unknown, "Invalid SUBSCRIBE channel count response."
        ))?;

        Ok((self, count as usize))
    }))
}
/// Issue `Unsubscribe` for `channel`.
///
/// The last element of the reply is read as an unsigned integer and returned
/// (cast to `usize`) together with the client. An empty reply or a non-numeric
/// last element fails with an `Unknown` error.
// NOTE(review): the count is presumably the number of remaining subscriptions — confirm against the Redis docs.
pub fn unsubscribe<T: Into<String>>(self, channel: T) -> Box<Future<Item=(Self, usize), Error=RedisError>> {
    let channel = channel.into();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::Unsubscribe, vec![channel.into()]))
    });

    Box::new(pending.and_then(|frame| {
        let mut results = frame.into_results()?;

        let last = results.pop().ok_or_else(|| RedisError::new(
            RedisErrorKind::Unknown, "Invalid UNSUBSCRIBE response."
        ))?;
        let count = last.into_u64().ok_or_else(|| RedisError::new(
            RedisErrorKind::Unknown, "Invalid UNSUBSCRIBE channel count response."
        ))?;

        Ok((self, count as usize))
    }))
}
/// Issue `Publish` with `channel` and `message` as arguments.
///
/// Yields the client and the reply as an `i64`; a reply that is not
/// convertible to `i64` fails with a `ProtocolError`.
pub fn publish<T: Into<String>, V: Into<RedisValue>>(self, channel: T, message: V) -> Box<Future<Item=(Self, i64), Error=RedisError>> {
    let channel = channel.into();
    let message = message.into();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::Publish, vec![channel.into(), message]))
    });

    Box::new(pending.and_then(|frame| {
        let count = frame.into_single_result()?
            .into_i64()
            .ok_or_else(|| RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid PUBLISH response."
            ))?;

        Ok((self, count))
    }))
}
/// Issue `Get` for `key`.
///
/// Yields the client and `None` when the reply's kind is `Null`, otherwise
/// `Some` of the reply value.
pub fn get<K: Into<RedisKey>>(self, key: K) -> Box<Future<Item=(Self, Option<RedisValue>), Error=RedisError>> {
    let _guard = flame_start!("redis:get:1");
    let key = key.into();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::Get, vec![key.into()]))
    });

    Box::new(pending.and_then(|frame| {
        let _guard = flame_start!("redis:get:2");

        let value = frame.into_single_result()?;
        let is_null = value.kind() == RedisValueKind::Null;
        let found = if is_null { None } else { Some(value) };

        Ok((self, found))
    }))
}
pub fn set<K: Into<RedisKey>, V: Into<RedisValue>>(self, key: K, value: V, expire: Option<Expiration>, options: Option<SetOptions>) -> Box<Future<Item=(Self, bool), Error=RedisError>> {
let _guard = flame_start!("redis:set:1");
let (key, value) = (key.into(), value.into());
Box::new(utils::request_response(&self.command_tx, &self.state, move || {
let _guard = flame_start!("redis:set:2");
let mut args = vec![key.into(), value.into()];
if let Some(expire) = expire {
let (k, v) = expire.into_args();
args.push(k.into());
args.push(v.into());
}
if let Some(options) = options {
args.push(options.to_string().into());
}
Ok((RedisCommandKind::Set, args))
}).and_then(|frame| {
let _guard = flame_start!("redis:set:3");
let resp = frame.into_single_result()?;
Ok((self, resp.kind() != RedisValueKind::Null))
}))
}
/// Issue `Auth` with `value` as its argument.
///
/// Yields the client and the reply as a `String`; a reply that cannot be
/// turned into a string fails with an `Auth` error.
pub fn auth<V: Into<String>>(self, value: V) -> Box<Future<Item=(Self, String), Error=RedisError>> {
    let value = value.into();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::Auth, vec![value.into()]))
    });

    Box::new(pending.and_then(|frame| {
        frame.into_single_result()?
            .into_string()
            .map(|text| (self, text))
            .ok_or_else(|| RedisError::new(
                RedisErrorKind::Auth, "AUTH denied."
            ))
    }))
}
/// Issue `BgreWriteAof` with no arguments.
///
/// Yields the client and the reply as a `String`; a reply that cannot be
/// turned into a string fails with a `ProtocolError`.
pub fn bgrewriteaof(self) -> Box<Future<Item=(Self, String), Error=RedisError>> {
    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::BgreWriteAof, vec![]))
    });

    Box::new(pending.and_then(|frame| {
        frame.into_single_result()?
            .into_string()
            .map(|text| (self, text))
            .ok_or_else(|| RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid BGREWRITEAOF."
            ))
    }))
}
/// Issue `BgSave` with no arguments.
///
/// Yields the client and the reply as a `String`; a reply that cannot be
/// turned into a string fails with a `ProtocolError`.
pub fn bgsave(self) -> Box<Future<Item=(Self, String), Error=RedisError>> {
    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::BgSave, vec![]))
    });

    Box::new(pending.and_then(|frame| {
        frame.into_single_result()?
            .into_string()
            .map(|text| (self, text))
            .ok_or_else(|| RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid BgSave response."
            ))
    }))
}
/// Issue `ClientList` with no arguments.
///
/// Yields the client and the reply as a `String`; a reply that cannot be
/// turned into a string fails with a `ProtocolError`.
pub fn client_list(self) -> Box<Future<Item=(Self, String), Error=RedisError>> {
    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::ClientList, vec![]))
    });

    Box::new(pending.and_then(|frame| {
        frame.into_single_result()?
            .into_string()
            .map(|text| (self, text))
            .ok_or_else(|| RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid CLIENTLIST response."
            ))
    }))
}
/// Issue `ClientGetName` with no arguments.
///
/// Yields the client and the reply converted with `into_string`: `Some(name)`
/// when that conversion succeeds, `None` otherwise.
pub fn client_getname(self) -> Box<Future<Item=(Self, Option<String>), Error=RedisError>> {
    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::ClientGetName, vec![]))
    });

    Box::new(pending.and_then(|frame| {
        let name = frame.into_single_result()?.into_string();
        Ok((self, name))
    }))
}
/// Issue `ClientSetname` with `name` as its argument.
///
/// Yields the client and the reply converted with `into_string`: `Some(text)`
/// when that conversion succeeds, `None` otherwise.
pub fn client_setname<V: Into<String>>(self, name: V) -> Box<Future<Item=(Self, Option<String>), Error=RedisError>> {
    let name = name.into();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::ClientSetname, vec![name.into()]))
    });

    Box::new(pending.and_then(|frame| {
        let reply = frame.into_single_result()?.into_string();
        Ok((self, reply))
    }))
}
/// Issue `DBSize` with no arguments.
///
/// Yields the client and the integer reply cast to `usize`; any other reply
/// type fails with a `ProtocolError`.
pub fn dbsize(self) -> Box<Future<Item=(Self, usize), Error=RedisError>> {
    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::DBSize, vec![]))
    });

    Box::new(pending.and_then(|frame| {
        if let RedisValue::Integer(num) = frame.into_single_result()? {
            Ok((self, num as usize))
        } else {
            Err(RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid DBsize response."
            ))
        }
    }))
}
/// Issue `Decr` for `key`.
///
/// Yields the client and the integer reply; any other reply type fails with a
/// `ProtocolError`.
pub fn decr<K: Into<RedisKey>>(self, key: K) -> Box<Future<Item=(Self, i64), Error=RedisError>> {
    let key = key.into();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::Decr, vec![key.into()]))
    });

    Box::new(pending.and_then(|frame| {
        if let RedisValue::Integer(num) = frame.into_single_result()? {
            Ok((self, num as i64))
        } else {
            Err(RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid DECR response."
            ))
        }
    }))
}
/// Issue `DecrBy` with `key` and `value` as arguments.
///
/// Yields the client and the integer reply; any other reply type fails with a
/// `ProtocolError`.
pub fn decrby<V: Into<RedisValue>, K: Into<RedisKey>>(self, key: K, value: V) -> Box<Future<Item=(Self, i64), Error=RedisError>> {
    let key = key.into();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::DecrBy, vec![key.into(), value.into()]))
    });

    Box::new(pending.and_then(|frame| {
        if let RedisValue::Integer(num) = frame.into_single_result()? {
            Ok((self, num as i64))
        } else {
            Err(RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid DECRBY response."
            ))
        }
    }))
}
/// Issue `Del` with every key in `keys` as an argument.
///
/// Yields the client and the integer reply cast to `usize`; any other reply
/// type fails with a `ProtocolError`.
pub fn del<K: Into<MultipleKeys>>(self, keys: K) -> Box<Future<Item=(Self, usize), Error=RedisError>> {
    let _guard = flame_start!("redis:del:1");

    let args: Vec<RedisValue> = keys.into().inner()
        .into_iter()
        .map(|k| k.into())
        .collect();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::Del, args))
    });

    Box::new(pending.and_then(|frame| {
        let _guard = flame_start!("redis:del:2");

        if let RedisValue::Integer(num) = frame.into_single_result()? {
            Ok((self, num as usize))
        } else {
            Err(RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid DEL response."
            ))
        }
    }))
}
/// Issue `Dump` for `key`.
///
/// Yields the client and `Some(string)` for a string reply or `None` for a
/// null reply; any other reply type fails with a `ProtocolError`.
pub fn dump<K: Into<RedisKey>>(self, key: K) -> Box<Future<Item=(Self, Option<String>), Error=RedisError>> {
    let key = key.into();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::Dump, vec![key.into()]))
    });

    Box::new(pending.and_then(|frame| {
        let serialized = match frame.into_single_result()? {
            RedisValue::Null => None,
            RedisValue::String(s) => Some(s),
            _ => return Err(RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid DUMP response."
            ))
        };

        Ok((self, serialized))
    }))
}
/// Issue `Exists` with every key in `keys` as an argument.
///
/// Yields the client and the integer reply cast to `usize`; any other reply
/// type fails with a `ProtocolError`.
pub fn exists<K: Into<MultipleKeys>>(self, keys: K) -> Box<Future<Item=(Self, usize), Error=RedisError>> {
    let keys = keys.into().inner();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        let args: Vec<RedisValue> = keys.into_iter().map(|k| k.into()).collect();
        Ok((RedisCommandKind::Exists, args))
    });

    Box::new(pending.and_then(|frame| {
        if let RedisValue::Integer(num) = frame.into_single_result()? {
            Ok((self, num as usize))
        } else {
            Err(RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid EXISTS response."
            ))
        }
    }))
}
/// Issue `Expire` with `key` and `seconds` as arguments.
///
/// Yields the client and a flag: integer reply `1` maps to `true`, `0` to
/// `false`. Any other integer, or a non-integer reply, fails with a
/// `ProtocolError`.
pub fn expire<K: Into<RedisKey>>(self, key: K, seconds: i64) -> Box<Future<Item=(Self, bool), Error=RedisError>> {
    let key = key.into();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        let args = vec![key.into(), seconds.into()];
        Ok((RedisCommandKind::Expire, args))
    });

    Box::new(pending.and_then(|frame| {
        match frame.into_single_result()? {
            RedisValue::Integer(0) => Ok((self, false)),
            RedisValue::Integer(1) => Ok((self, true)),
            RedisValue::Integer(_) => Err(RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid EXPIRE response value."
            )),
            _ => Err(RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid EXPIRE response."
            ))
        }
    }))
}
/// Issue `ExpireAt` with `key` and `timestamp` as arguments.
///
/// Yields the client and a flag: integer reply `1` maps to `true`, `0` to
/// `false`. Any other integer, or a non-integer reply, fails with a
/// `ProtocolError`.
pub fn expire_at<K: Into<RedisKey>>(self, key: K, timestamp: i64) -> Box<Future<Item=(Self, bool), Error=RedisError>> {
    let key = key.into();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::ExpireAt, vec![key.into(), timestamp.into()]))
    });

    Box::new(pending.and_then(|frame| {
        match frame.into_single_result()? {
            RedisValue::Integer(0) => Ok((self, false)),
            RedisValue::Integer(1) => Ok((self, true)),
            RedisValue::Integer(_) => Err(RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid EXPIREAT response value."
            )),
            _ => Err(RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid EXPIREAT response."
            ))
        }
    }))
}
pub fn flushall(self, async: bool) -> Box<Future<Item=(Self, String), Error=RedisError>> {
let args = if async {
vec![ASYNC.into()]
}else{
Vec::new()
};
Box::new(utils::request_response(&self.command_tx, &self.state, move || {
Ok((RedisCommandKind::FlushAll, args))
}).and_then(|frame| {
let resp = frame.into_single_result()?;
match resp {
RedisValue::String(s) => Ok((self, s)),
_ => Err(RedisError::new(
RedisErrorKind::ProtocolError, "Invalid FLUSHALL response."
))
}
}))
}
/// Issue `FlushDB`, passing the `ASYNC` constant as the only argument when
/// `async` is true and no arguments otherwise.
///
/// Yields the client and the string reply; any other reply type fails with a
/// `ProtocolError`.
pub fn flushdb(self, async: bool) -> Box<Future<Item=(Self, String), Error=RedisError>> {
    let args = if async {
        vec![ASYNC.into()]
    }else{
        Vec::new()
    };

    Box::new(utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::FlushDB, args))
    }).and_then(|frame| {
        let resp = frame.into_single_result()?;

        match resp {
            RedisValue::String(s) => Ok((self, s)),
            // The message previously named a non-existent "FLUSHALLDB" command.
            _ => Err(RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid FLUSHDB response."
            ))
        }
    }))
}
/// Issue `GetRange` with `key`, `start` and `end` as arguments.
///
/// Both offsets are converted with `RedisValue::from_usize` before the command
/// is built. Yields the client and the string reply; any other reply type
/// fails with a `ProtocolError`.
// NOTE(review): `fry!` presumably returns a failed future when the conversion
// errors — confirm in `utils`.
pub fn getrange<K: Into<RedisKey>> (self, key: K, start: usize, end: usize) -> Box<Future<Item=(Self, String), Error=RedisError>> {
    let key = key.into();
    let start = fry!(RedisValue::from_usize(start));
    let end = fry!(RedisValue::from_usize(end));

    let args = vec![key.into(), start, end];

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::GetRange, args))
    });

    Box::new(pending.and_then(|frame| {
        if let RedisValue::String(substring) = frame.into_single_result()? {
            Ok((self, substring))
        } else {
            Err(RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid GETRANGE response."
            ))
        }
    }))
}
/// Issue `GetSet` with `key` and `value` as arguments.
///
/// Yields the client and `None` for a null reply, otherwise `Some` of the
/// reply value.
pub fn getset<V: Into<RedisValue>, K: Into<RedisKey>> (self, key: K, value: V) -> Box<Future<Item=(Self, Option<RedisValue>), Error=RedisError>> {
    let (key, value) = (key.into(), value.into());

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        let args: Vec<RedisValue> = vec![key.into(), value.into()];
        Ok((RedisCommandKind::GetSet, args))
    });

    Box::new(pending.and_then(|frame| {
        let resp = frame.into_single_result()?;
        let previous = if let RedisValue::Null = resp { None } else { Some(resp) };

        Ok((self, previous))
    }))
}
/// Issue `HDel` with `key` followed by every entry of `fields`.
///
/// Yields the client and the integer reply cast to `usize`; any other reply
/// type fails with a `ProtocolError`.
pub fn hdel<F: Into<MultipleKeys>, K: Into<RedisKey>> (self, key: K, fields: F) -> Box<Future<Item=(Self, usize), Error=RedisError>> {
    let _guard = flame_start!("redis:hdel:1");

    let key = key.into();
    let fields = fields.into().inner();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        let _guard = flame_start!("redis:hdel:2");

        // One slot for the key plus one per field.
        let mut args: Vec<RedisValue> = Vec::with_capacity(fields.len() + 1);
        args.push(key.into());
        for field in fields {
            args.push(field.into());
        }

        Ok((RedisCommandKind::HDel, args))
    });

    Box::new(pending.and_then(|frame| {
        let _guard = flame_start!("redis:hdel:3");

        if let RedisValue::Integer(num) = frame.into_single_result()? {
            Ok((self, num as usize))
        } else {
            Err(RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid HDEL response."
            ))
        }
    }))
}
/// Issue `HExists` with `key` and `field` as arguments.
///
/// Yields the client and a flag: integer reply `1` maps to `true`, `0` to
/// `false`. Any other integer fails with an `Unknown` error; a non-integer
/// reply fails with a `ProtocolError`.
pub fn hexists<F: Into<RedisKey>, K: Into<RedisKey>> (self, key: K, field: F) -> Box<Future<Item=(Self, bool), Error=RedisError>> {
    let (key, field) = (key.into(), field.into());

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        let args: Vec<RedisValue> = vec![key.into(), field.into()];
        Ok((RedisCommandKind::HExists, args))
    });

    Box::new(pending.and_then(|frame| {
        match frame.into_single_result()? {
            RedisValue::Integer(0) => Ok((self, false)),
            RedisValue::Integer(1) => Ok((self, true)),
            RedisValue::Integer(_) => Err(RedisError::new(
                RedisErrorKind::Unknown, "Invalid HEXISTS response value."
            )),
            _ => Err(RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid HEXISTS response."
            ))
        }
    }))
}
/// Issue `HGet` with `key` and `field` as arguments.
///
/// Yields the client and `None` for a null reply, otherwise `Some` of the
/// reply value.
pub fn hget<F: Into<RedisKey>, K: Into<RedisKey>> (self, key: K, field: F) -> Box<Future<Item=(Self, Option<RedisValue>), Error=RedisError>> {
    let _guard = flame_start!("redis:hget:1");

    let (key, field) = (key.into(), field.into());

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        let args: Vec<RedisValue> = vec![key.into(), field.into()];
        Ok((RedisCommandKind::HGet, args))
    });

    Box::new(pending.and_then(|frame| {
        let _guard = flame_start!("redis:hget:2");

        let resp = frame.into_single_result()?;
        let found = if let RedisValue::Null = resp { None } else { Some(resp) };

        Ok((self, found))
    }))
}
/// Issue `HGetAll` for `key` and collect the reply into a map.
///
/// The reply is consumed two elements at a time: the first of each pair must
/// be a string and becomes the map key, the second becomes the value.
///
/// # Errors
///
/// Fails with a `ProtocolError` when the reply has an odd number of elements
/// (previously this panicked on an out-of-bounds index) or when a field name
/// is not a string.
pub fn hgetall<K: Into<RedisKey>> (self, key: K) -> Box<Future<Item=(Self, HashMap<String, RedisValue>), Error=RedisError>> {
    let key = key.into();

    Box::new(utils::request_response(&self.command_tx, &self.state, move || {
        let args: Vec<RedisValue> = vec![key.into()];

        Ok((RedisCommandKind::HGetAll, args))
    }).and_then(|frame| {
        let mut resp = frame.into_results()?;

        // Reject a dangling field up front so `chunk[1]` below can never go out of bounds.
        if resp.len() % 2 != 0 {
            return Err(RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid HGETALL response: odd number of elements."
            ));
        }

        let mut map: HashMap<String, RedisValue> = HashMap::with_capacity(resp.len() / 2);

        for chunk in resp.chunks_mut(2) {
            let (field, val) = (chunk[0].take(), chunk[1].take());

            let field = match field {
                RedisValue::String(s) => s,
                _ => return Err(RedisError::new(
                    RedisErrorKind::ProtocolError, "Invalid HGETALL response."
                ))
            };

            map.insert(field, val);
        }

        Ok((self, map))
    }))
}
/// Issue `HIncrBy` with `key`, `field` and `incr` as arguments.
///
/// Yields the client and the integer reply; any other reply type fails with a
/// `ProtocolError`.
pub fn hincrby<F: Into<RedisKey>, K: Into<RedisKey>> (self, key: K, field: F, incr: i64) -> Box<Future<Item=(Self, i64), Error=RedisError>> {
    let (key, field) = (key.into(), field.into());
    let args: Vec<RedisValue> = vec![key.into(), field.into(), incr.into()];

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::HIncrBy, args))
    });

    Box::new(pending.and_then(|frame| {
        if let RedisValue::Integer(num) = frame.into_single_result()? {
            Ok((self, num as i64))
        } else {
            Err(RedisError::new(
                RedisErrorKind::ProtocolError, "Invalid HINCRBY response."
            ))
        }
    }))
}
pub fn hincrbyfloat<K: Into<RedisKey>, F: Into<RedisKey>> (self, key: K, field: F, incr: f64) -> Box<Future<Item=(Self, f64), Error=RedisError>> {
let (key, field) = (key.into(), field.into());
let args = vec![
key.into(),
field.into(),
incr.to_string().into()
];
Box::new(utils::request_response(&self.command_tx, &self.state, move || {
Ok((RedisCommandKind::HIncrByFloat, args))
}).and_then(|frame| {
let resp = frame.into_single_result()?;
match resp {
RedisValue::String(s) => match s.parse::<f64>() {
Ok(f) => Ok((self, f)),
Err(e) => Err(RedisError::new(
RedisErrorKind::Unknown, format!("Invalid HINCRBYFLOAT response: {:?}", e)
))
},
_ => Err(RedisError::new(
RedisErrorKind::InvalidArgument, "Invalid HINCRBYFLOAT response."
))
}
}))
}
/// Issue `HKeys` for `key`.
///
/// Yields the client and the reply as a list of strings. A null entry becomes
/// the literal string `"nil"`; any entry that is neither a string nor null
/// fails with an `Unknown` error.
pub fn hkeys<K: Into<RedisKey>> (self, key: K) -> Box<Future<Item=(Self, Vec<String>), Error=RedisError>> {
    let key = key.into();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::HKeys, vec![key.into()]))
    });

    Box::new(pending.and_then(|frame| {
        let values = frame.into_results()?;
        let mut names = Vec::with_capacity(values.len());

        for value in values {
            names.push(match value {
                RedisValue::String(s) => s,
                RedisValue::Null => "nil".to_owned(),
                _ => return Err(RedisError::new(
                    RedisErrorKind::Unknown, "Invalid HKEYS response."
                ))
            });
        }

        Ok((self, names))
    }))
}
/// Issue `HLen` for `key`.
///
/// Yields the client and the integer reply cast to `usize`; any other reply
/// type fails with an `Unknown` error.
pub fn hlen<K: Into<RedisKey>> (self, key: K) -> Box<Future<Item=(Self, usize), Error=RedisError>> {
    let key = key.into();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::HLen, vec![key.into()]))
    });

    Box::new(pending.and_then(|frame| {
        if let RedisValue::Integer(num) = frame.into_single_result()? {
            Ok((self, num as usize))
        } else {
            Err(RedisError::new(
                RedisErrorKind::Unknown, "Invalid HLEN response."
            ))
        }
    }))
}
/// Issue `HMGet` with `key` followed by every entry of `fields`.
///
/// Yields the client and the reply values unchanged.
pub fn hmget<F: Into<MultipleKeys>, K: Into<RedisKey>> (self, key: K, fields: F) -> Box<Future<Item=(Self, Vec<RedisValue>), Error=RedisError>> {
    let key = key.into();
    let fields = fields.into().inner();

    // One slot for the key plus one per field.
    let mut args: Vec<RedisValue> = Vec::with_capacity(fields.len() + 1);
    args.push(key.into());
    for field in fields {
        args.push(field.into());
    }

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::HMGet, args))
    });

    Box::new(pending.and_then(|frame| {
        let values = frame.into_results()?;
        Ok((self, values))
    }))
}
pub fn hmset<V: Into<RedisValue>, F: Into<RedisKey> + Hash + Eq, K: Into<RedisKey>> (self, key: K, mut values: HashMap<F, V>) -> Box<Future<Item=(Self, String), Error=RedisError>> {
let key = key.into();
let mut args = Vec::with_capacity(values.len() * 2 + 1);
args.push(key.into());
for (field, value) in values.drain() {
let field = field.into();
args.push(field.into());
args.push(value.into());
}
Box::new(utils::request_response(&self.command_tx, &self.state, move || {
Ok((RedisCommandKind::HMSet, args))
}).and_then(|frame| {
let resp = frame.into_single_result()?;
match resp {
RedisValue::String(s) => Ok((self, s)),
_ => Err(RedisError::new(
RedisErrorKind::Unknown, "Invalid HMSET response."
))
}
}))
}
/// Issue `HSet` with `key`, `field` and `value` as arguments.
///
/// Yields the client and the integer reply cast to `usize`; any other reply
/// type fails with an `Unknown` error.
pub fn hset<K: Into<RedisKey>, F: Into<RedisKey>, V: Into<RedisValue>> (self, key: K, field: F, value: V) -> Box<Future<Item=(Self, usize), Error=RedisError>> {
    let _guard = flame_start!("redis:hset:1");

    let (key, field) = (key.into(), field.into());

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        let _guard = flame_start!("redis:hset:2");

        let args: Vec<RedisValue> = vec![key.into(), field.into(), value.into()];
        Ok((RedisCommandKind::HSet, args))
    });

    Box::new(pending.and_then(|frame| {
        let _guard = flame_start!("redis:hset:3");

        if let RedisValue::Integer(num) = frame.into_single_result()? {
            Ok((self, num as usize))
        } else {
            Err(RedisError::new(
                RedisErrorKind::Unknown , "Invalid HSET response."
            ))
        }
    }))
}
/// Issue `HSetNx` with `key`, `field` and `value` as arguments.
///
/// Yields the client and the integer reply cast to `usize`; any other reply
/// type fails with an `Unknown` error.
pub fn hsetnx<K: Into<RedisKey>, F: Into<RedisKey>, V: Into<RedisValue>> (self, key: K, field: F, value: V) -> Box<Future<Item=(Self, usize), Error=RedisError>> {
    let key = key.into();
    let field = field.into();
    let value = value.into();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        let args: Vec<RedisValue> = vec![key.into(), field.into(), value];
        Ok((RedisCommandKind::HSetNx, args))
    });

    Box::new(pending.and_then(|frame| {
        if let RedisValue::Integer(num) = frame.into_single_result()? {
            Ok((self, num as usize))
        } else {
            Err(RedisError::new(
                RedisErrorKind::Unknown , "Invalid HSETNX response."
            ))
        }
    }))
}
/// Issue `HStrLen` with `key` and `field` as arguments.
///
/// Yields the client and the integer reply cast to `usize`; any other reply
/// type fails with an `Unknown` error.
pub fn hstrlen<K: Into<RedisKey>, F: Into<RedisKey>> (self, key: K, field: F) -> Box<Future<Item=(Self, usize), Error=RedisError>> {
    let key = key.into();
    let field = field.into();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        let args: Vec<RedisValue> = vec![key.into(), field.into()];
        Ok((RedisCommandKind::HStrLen, args))
    });

    Box::new(pending.and_then(|frame| {
        if let RedisValue::Integer(num) = frame.into_single_result()? {
            Ok((self, num as usize))
        } else {
            Err(RedisError::new(
                RedisErrorKind::Unknown , "Invalid HSTRLEN response."
            ))
        }
    }))
}
/// Issue `HVals` for `key`.
///
/// Yields the client and the reply values unchanged.
pub fn hvals<K: Into<RedisKey>> (self, key: K) -> Box<Future<Item=(Self, Vec<RedisValue>), Error=RedisError>> {
    let key = key.into();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::HVals, vec![key.into()]))
    });

    Box::new(pending.and_then(|frame| {
        let values = frame.into_results()?;
        Ok((self, values))
    }))
}
/// Issue `Incr` for `key`.
///
/// Yields the client and the integer reply; any other reply type fails with an
/// `InvalidArgument` error.
pub fn incr<K: Into<RedisKey>> (self, key: K) -> Box<Future<Item=(Self, i64), Error=RedisError>> {
    let key = key.into();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::Incr, vec![key.into()]))
    });

    Box::new(pending.and_then(|frame| {
        let _guard = flame_start!("redis:incr:1");

        if let RedisValue::Integer(num) = frame.into_single_result()? {
            Ok((self, num as i64))
        } else {
            Err(RedisError::new(
                RedisErrorKind::InvalidArgument, "Invalid INCR response."
            ))
        }
    }))
}
/// Issue `IncrBy` with `key` and `incr` as arguments.
///
/// Yields the client and the integer reply; any other reply type fails with an
/// `InvalidArgument` error.
pub fn incrby<K: Into<RedisKey>>(self, key: K, incr: i64) -> Box<Future<Item=(Self, i64), Error=RedisError>> {
    let key = key.into();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        let args = vec![key.into(), incr.into()];
        Ok((RedisCommandKind::IncrBy, args))
    });

    Box::new(pending.and_then(|frame| {
        if let RedisValue::Integer(num) = frame.into_single_result()? {
            Ok((self, num as i64))
        } else {
            Err(RedisError::new(
                RedisErrorKind::InvalidArgument, "Invalid INCRBY response."
            ))
        }
    }))
}
/// Issue `IncrByFloat` with `key` and the string form of `incr`.
///
/// The string reply is parsed as `f64` and returned with the client. A parse
/// failure is converted into a `RedisError` with `into()`; a non-string reply
/// fails with an `Unknown` error.
pub fn incrbyfloat<K: Into<RedisKey>>(self, key: K, incr: f64) -> Box<Future<Item=(Self, f64), Error=RedisError>> {
    let key = key.into();

    let pending = utils::request_response(&self.command_tx, &self.state, move || {
        Ok((RedisCommandKind::IncrByFloat, vec![key.into(), incr.to_string().into()]))
    });

    Box::new(pending.and_then(|frame| {
        let text = match frame.into_single_result()? {
            RedisValue::String(s) => s,
            _ => return Err(RedisError::new(
                RedisErrorKind::Unknown, "Invalid INCRBYFLOAT response."
            ))
        };

        let parsed: Result<f64, RedisError> = text.parse::<f64>().map_err(|e| e.into());
        parsed.map(|value| (self, value))
    }))
}
}
// Test-only module. It currently holds no test functions: only lint
// suppressions and imports are present.
#[cfg(test)]
mod tests {
// Silence lints for scaffolding that is not used yet.
#![allow(dead_code)]
#![allow(unused_imports)]
#![allow(unused_variables)]
#![allow(unused_mut)]
#![allow(deprecated)]
#![allow(unused_macros)]
use super::*;
use std::sync::Arc;
use std::thread;
}