use connection::ConnectionLike;
use types::{
from_redis_value, ErrorKind, FromRedisValue, RedisFuture, RedisResult, ToRedisArgs, Value,
};
use futures::Future;
/// A single argument of a command, as stored before it is encoded.
#[derive(Clone)]
enum Arg<'a> {
/// Argument bytes owned by the command.
Simple(Vec<u8>),
/// Placeholder for the cursor; `encode_command` writes the decimal form
/// of its `cursor` parameter at this position.
Cursor,
/// Argument bytes borrowed from the caller for the lifetime `'a`.
Borrowed(&'a [u8]),
}
/// A command under construction: an ordered list of arguments plus an
/// optional cursor and an "ignore the reply" flag.
#[derive(Clone)]
pub struct Cmd {
// Arguments in the order they were added; the first one is the command
// name when the command is built through `cmd(name)`.
args: Vec<Arg<'static>>,
// `Some(_)` once `cursor_arg` has been called; `in_scan_mode` reports
// whether this is set.
cursor: Option<u64>,
// Set by `Pipeline::ignore`; replies of ignored commands are left out of
// the pipeline result by `Pipeline::make_pipeline_results`.
is_ignored: bool,
}
/// An ordered batch of commands that is encoded and sent in one request.
#[derive(Clone)]
pub struct Pipeline {
// Commands in the order they were queued.
commands: Vec<Cmd>,
// Set by `atomic()`; when true the batch is encoded between `MULTI` and
// `EXEC` (see `encode_pipeline`).
transaction_mode: bool,
}
/// Iterator over the items of a cursor-based command; it re-issues the
/// command with the latest cursor whenever the current batch runs out.
pub struct Iter<'a, T: FromRedisValue> {
// Items of the current batch, stored reversed so that `pop()` hands them
// out in the order they were received.
batch: Vec<T>,
// Cursor to use for the next request; `0` means there is nothing more to
// fetch.
cursor: u64,
// Connection used to fetch further batches.
con: &'a (ConnectionLike + 'a),
// Copy of the command that is re-sent with an updated cursor.
cmd: Cmd,
}
impl<'a, T: FromRedisValue> Iterator for Iter<'a, T> {
    type Item = T;

    /// Returns the next item, fetching another batch from the connection
    /// when the current one is exhausted.
    ///
    /// Iteration stops (`None`) once the batch is empty and the cursor is
    /// `0`. Every fallible step below hands `return None` to `unwrap_or!`,
    /// so a failure to pack, send or decode is expected to end the iteration
    /// silently rather than surface an error.
    #[inline]
    fn next(&mut self) -> Option<T> {
        loop {
            // Drain whatever is left of the current batch first.
            if let Some(item) = self.batch.pop() {
                return Some(item);
            }

            // A zero cursor means there are no further batches.
            if self.cursor == 0 {
                return None;
            }

            let packed = unwrap_or!(
                self.cmd.get_packed_command_with_cursor(self.cursor),
                return None
            );
            let reply = unwrap_or!(self.con.req_packed_command(&packed).ok(), return None);
            let (next_cursor, mut page): (u64, Vec<T>) =
                unwrap_or!(from_redis_value(&reply).ok(), return None);

            // Reverse so that `pop()` yields items in the received order.
            page.reverse();
            self.cursor = next_cursor;
            self.batch = page;
        }
    }
}
/// Returns the number of decimal digits needed to print `v`
/// (`0` counts as one digit).
fn countdigits(mut v: usize) -> usize {
    let mut digits = 1;

    // Strip four digits at a time while at least five remain.
    while v >= 10_000 {
        v /= 10_000;
        digits += 4;
    }

    // `v` is now below 10_000: account for its remaining one to four digits.
    if v >= 1_000 {
        digits + 3
    } else if v >= 100 {
        digits + 2
    } else if v >= 10 {
        digits + 1
    } else {
        digits
    }
}
/// Returns the encoded size of one argument of `len` bytes:
/// `$`, the decimal length, CRLF, the payload, CRLF.
#[inline]
fn bulklen(len: usize) -> usize {
    let header = 1 + countdigits(len) + 2;
    let trailer = 2;
    header + len + trailer
}
/// Encodes `args` as one request: `*<argc>\r\n` followed by
/// `$<len>\r\n<bytes>\r\n` for every argument.
///
/// Each `Arg::Cursor` placeholder is written as the decimal form of
/// `cursor`.
fn encode_command(args: &[Arg], cursor: u64) -> Vec<u8> {
    use std::io::Write;

    // Render the cursor once. Its real length is used for the size estimate
    // instead of `countdigits(cursor as usize)`, which truncates the cursor
    // on targets where `usize` is narrower than 64 bits.
    let cursor_repr = cursor.to_string();
    let cursor_bytes = cursor_repr.as_bytes();

    let payload_len: usize = args
        .iter()
        .map(|arg| {
            bulklen(match *arg {
                Arg::Cursor => cursor_bytes.len(),
                Arg::Simple(ref val) => val.len(),
                Arg::Borrowed(ptr) => ptr.len(),
            })
        })
        .sum();
    let mut cmd = Vec::with_capacity(1 + countdigits(args.len()) + 2 + payload_len);

    // Writing into a `Vec<u8>` does not fail, hence the `unwrap()`s.
    write!(cmd, "*{}\r\n", args.len()).unwrap();
    for arg in args {
        let bytes: &[u8] = match *arg {
            Arg::Cursor => cursor_bytes,
            Arg::Simple(ref val) => &val[..],
            Arg::Borrowed(ptr) => ptr,
        };
        write!(cmd, "${}\r\n", bytes.len()).unwrap();
        cmd.extend_from_slice(bytes);
        cmd.extend_from_slice(b"\r\n");
    }
    cmd
}
/// Concatenates the packed form of every command in `cmds`; when `atomic`
/// is set the result is framed by a packed `MULTI` in front and a packed
/// `EXEC` at the end.
fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec<u8> {
    let mut packed = Vec::new();
    if atomic {
        packed.extend(cmd("MULTI").get_packed_command());
    }
    for command in cmds {
        packed.extend(command.get_packed_command());
    }
    if atomic {
        packed.extend(cmd("EXEC").get_packed_command());
    }
    packed
}
impl Cmd {
    /// Creates a command with no arguments, no cursor and the ignore flag
    /// cleared.
    pub fn new() -> Cmd {
        Cmd {
            args: Vec::new(),
            cursor: None,
            is_ignored: false,
        }
    }

    /// Appends every argument produced by `arg.write_redis_args` and returns
    /// `self` for chaining.
    #[inline]
    pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Cmd {
        let mut encoded = Vec::new();
        arg.write_redis_args(&mut encoded);
        self.args.extend(encoded.into_iter().map(Arg::Simple));
        self
    }

    /// Appends a cursor placeholder and records `cursor` as its initial
    /// value, which puts the command into scan mode.
    ///
    /// # Panics
    ///
    /// Panics if the command is already in scan mode, i.e. a cursor
    /// argument was added before.
    #[inline]
    pub fn cursor_arg(&mut self, cursor: u64) -> &mut Cmd {
        assert!(!self.in_scan_mode());
        self.cursor = Some(cursor);
        self.args.push(Arg::Cursor);
        self
    }

    /// Returns the encoded command, using the stored cursor (or `0` when
    /// none is set) for the cursor placeholder.
    #[inline]
    pub fn get_packed_command(&self) -> Vec<u8> {
        let cursor = self.cursor.unwrap_or(0);
        encode_command(&self.args, cursor)
    }

    /// Returns the encoded command with `cursor` substituted for the cursor
    /// placeholder, or `None` when the command is not in scan mode.
    #[inline]
    fn get_packed_command_with_cursor(&self, cursor: u64) -> Option<Vec<u8>> {
        if self.in_scan_mode() {
            Some(encode_command(&self.args, cursor))
        } else {
            None
        }
    }

    /// Returns `true` once a cursor argument has been added.
    #[inline]
    pub fn in_scan_mode(&self) -> bool {
        self.cursor.is_some()
    }

    /// Sends the command over `con` and converts the reply into `T`.
    ///
    /// Returns the error of the request or of the conversion, whichever
    /// fails first.
    #[inline]
    pub fn query<T: FromRedisValue>(&self, con: &ConnectionLike) -> RedisResult<T> {
        let packed = self.get_packed_command();
        let reply = con.req_packed_command(&packed)?;
        from_redis_value(&reply)
    }

    /// Future-based counterpart of `query`: consumes `con` and resolves to
    /// the connection together with the reply converted into `T`.
    #[inline]
    pub fn query_async<C, T: FromRedisValue>(&self, con: C) -> RedisFuture<(C, T)>
    where
        C: ::async::ConnectionLike + Send + 'static,
        T: Send + 'static,
    {
        let packed = self.get_packed_command();
        let pending = con.req_packed_command(packed);
        Box::new(pending.and_then(|(con, reply)| {
            from_redis_value(&reply).map(|converted| (con, converted))
        }))
    }

    /// Sends the command once and returns an iterator over the items of the
    /// reply.
    ///
    /// When the reply `looks_like_cursor()` it is decoded as a
    /// `(cursor, items)` pair and the iterator keeps fetching batches with
    /// the returned cursor. Otherwise the whole reply is decoded as the
    /// items and the cursor is `0`, so the iterator ends after them.
    #[inline]
    pub fn iter<'a, T: FromRedisValue>(&self, con: &'a ConnectionLike) -> RedisResult<Iter<'a, T>> {
        let packed = self.get_packed_command();
        let reply = con.req_packed_command(&packed)?;

        let (cursor, mut batch): (u64, Vec<T>) = if reply.looks_like_cursor() {
            from_redis_value(&reply)?
        } else {
            (0, from_redis_value(&reply)?)
        };

        // `Iter` pops from the back, so store the batch reversed.
        batch.reverse();
        Ok(Iter {
            batch: batch,
            cursor: cursor,
            con: con,
            cmd: self.clone(),
        })
    }

    /// Runs the command and expects a reply convertible to `()`.
    ///
    /// # Panics
    ///
    /// Panics if the request or the conversion fails.
    #[inline]
    pub fn execute(&self, con: &ConnectionLike) {
        self.query::<()>(con).unwrap()
    }
}
impl Pipeline {
    /// Creates an empty pipeline that is not in transaction mode.
    pub fn new() -> Pipeline {
        Self::with_capacity(0)
    }

    /// Creates an empty pipeline with room reserved for `capacity` commands.
    pub fn with_capacity(capacity: usize) -> Pipeline {
        Pipeline {
            commands: Vec::with_capacity(capacity),
            transaction_mode: false,
        }
    }

    /// Queues a new command called `name`; subsequent `arg` / `ignore` calls
    /// apply to it.
    #[inline]
    pub fn cmd(&mut self, name: &str) -> &mut Pipeline {
        self.commands.push(cmd(name));
        self
    }

    /// Queues a copy of `cmd`.
    #[inline]
    pub fn add_command(&mut self, cmd: &Cmd) -> &mut Pipeline {
        self.commands.push(cmd.clone());
        self
    }

    /// Returns the most recently queued command.
    ///
    /// # Panics
    ///
    /// Panics with "No command on stack" when nothing has been queued yet.
    #[inline]
    fn get_last_command(&mut self) -> &mut Cmd {
        match self.commands.last_mut() {
            Some(last) => last,
            None => panic!("No command on stack"),
        }
    }

    /// Appends `arg` to the most recently queued command.
    ///
    /// # Panics
    ///
    /// Panics when no command has been queued yet.
    #[inline]
    pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Pipeline {
        self.get_last_command().arg(arg);
        self
    }

    /// Marks the most recently queued command so that its reply is left out
    /// of the pipeline result.
    ///
    /// # Panics
    ///
    /// Panics when no command has been queued yet.
    #[inline]
    pub fn ignore(&mut self) -> &mut Pipeline {
        self.get_last_command().is_ignored = true;
        self
    }

    /// Switches the pipeline to transaction mode (`MULTI` ... `EXEC`).
    #[inline]
    pub fn atomic(&mut self) -> &mut Pipeline {
        self.transaction_mode = true;
        self
    }

    /// Builds the result value from the per-command replies, dropping the
    /// replies of commands flagged as ignored.
    ///
    /// Replies are matched to commands by position. A reply without a
    /// matching command (more replies than queued commands) is kept rather
    /// than causing an out-of-bounds panic.
    fn make_pipeline_results(&self, resp: Vec<Value>) -> Value {
        let kept = resp
            .into_iter()
            .enumerate()
            .filter(|&(idx, _)| {
                !self
                    .commands
                    .get(idx)
                    .map_or(false, |queued| queued.is_ignored)
            })
            .map(|(_, reply)| reply)
            .collect();
        Value::Bulk(kept)
    }

    /// Sends all commands without transaction framing and collects one reply
    /// per command.
    fn execute_pipelined(&self, con: &ConnectionLike) -> RedisResult<Value> {
        let packed = encode_pipeline(&self.commands, false);
        let replies = con.req_packed_commands(&packed, 0, self.commands.len())?;
        Ok(self.make_pipeline_results(replies))
    }

    /// Sends all commands framed by `MULTI` / `EXEC` and returns the filtered
    /// content of the final reply.
    ///
    /// A `Nil` final reply is passed through as `Value::Nil`; anything other
    /// than `Nil` or a bulk reply is reported as a `ResponseError`.
    fn execute_transaction(&self, con: &ConnectionLike) -> RedisResult<Value> {
        let packed = encode_pipeline(&self.commands, true);
        // NOTE(review): the `len + 1` / `1` arguments presumably skip the
        // replies to `MULTI` and to each queued command and keep only the
        // `EXEC` reply; confirm against `ConnectionLike::req_packed_commands`.
        let mut replies = con.req_packed_commands(&packed, self.commands.len() + 1, 1)?;
        match replies.pop() {
            Some(Value::Nil) => Ok(Value::Nil),
            Some(Value::Bulk(items)) => Ok(self.make_pipeline_results(items)),
            _ => fail!((
                ErrorKind::ResponseError,
                "Invalid response when parsing multi response"
            )),
        }
    }

    /// Executes the pipeline over `con` and converts the combined result
    /// into `T`.
    ///
    /// An empty pipeline sends nothing and converts an empty bulk value.
    #[inline]
    pub fn query<T: FromRedisValue>(&self, con: &ConnectionLike) -> RedisResult<T> {
        let value = if self.commands.is_empty() {
            Value::Bulk(vec![])
        } else if self.transaction_mode {
            self.execute_transaction(con)?
        } else {
            self.execute_pipelined(con)?
        };
        from_redis_value(&value)
    }

    /// Removes all queued commands; the transaction mode flag is left as is.
    #[inline]
    pub fn clear(&mut self) {
        self.commands.clear();
    }

    /// Future-based counterpart of `execute_pipelined`.
    fn execute_pipelined_async<C>(self, con: C) -> RedisFuture<(C, Value)>
    where
        C: ::async::ConnectionLike + Send + 'static,
    {
        // Compute everything that borrows `self` before it moves into the
        // closure below.
        let packed = encode_pipeline(&self.commands, false);
        let count = self.commands.len();
        Box::new(
            con.req_packed_commands(packed, 0, count)
                .map(move |(con, replies)| (con, self.make_pipeline_results(replies))),
        )
    }

    /// Future-based counterpart of `execute_transaction`.
    fn execute_transaction_async<C>(self, con: C) -> RedisFuture<(C, Value)>
    where
        C: ::async::ConnectionLike + Send + 'static,
    {
        let packed = encode_pipeline(&self.commands, true);
        let offset = self.commands.len() + 1;
        Box::new(
            con.req_packed_commands(packed, offset, 1)
                .and_then(move |(con, mut replies)| match replies.pop() {
                    Some(Value::Nil) => Ok((con, Value::Nil)),
                    Some(Value::Bulk(items)) => Ok((con, self.make_pipeline_results(items))),
                    _ => fail!((
                        ErrorKind::ResponseError,
                        "Invalid response when parsing multi response"
                    )),
                }),
        )
    }

    /// Future-based counterpart of `query`: consumes the pipeline and `con`
    /// and resolves to the connection together with the converted result.
    ///
    /// An empty pipeline sends nothing and resolves immediately.
    #[inline]
    pub fn query_async<C, T: FromRedisValue>(self, con: C) -> RedisFuture<(C, T)>
    where
        C: ::async::ConnectionLike + Send + 'static,
        T: Send + 'static,
    {
        use futures::future;

        if self.commands.is_empty() {
            return Box::new(future::result(
                from_redis_value(&Value::Bulk(vec![])).map(|v| (con, v)),
            ));
        }

        let pending = if self.transaction_mode {
            self.execute_transaction_async(con)
        } else {
            self.execute_pipelined_async(con)
        };
        Box::new(pending.and_then(|(con, v)| Ok((con, from_redis_value(&v)?))))
    }

    /// Executes the pipeline and expects a result convertible to `()`.
    ///
    /// # Panics
    ///
    /// Panics if the request or the conversion fails.
    #[inline]
    pub fn execute(&self, con: &ConnectionLike) {
        self.query::<()>(con).unwrap()
    }
}
/// Shortcut that creates a new command whose first argument is `name`.
pub fn cmd<'a>(name: &'a str) -> Cmd {
    let mut command = Cmd::new();
    command.arg(name);
    command
}
pub fn pack_command(args: &[Vec<u8>]) -> Vec<u8> {
encode_command(
&args.iter().map(|x| Arg::Borrowed(x)).collect::<Vec<_>>(),
0,
)
}
/// Shortcut for creating a new, empty pipeline (same as `Pipeline::new()`).
pub fn pipe() -> Pipeline {
Pipeline::new()
}