use types::{ToRedisArgs, FromRedisValue, Value, RedisResult,
ErrorKind, from_redis_value};
use connection::ConnectionLike;
#[derive(Clone)]
enum Arg<'a> {
Simple(Vec<u8>),
Cursor,
Borrowed(&'a [u8]),
}
#[derive(Clone)]
pub struct Cmd {
args: Vec<Arg<'static>>,
cursor: Option<u64>,
is_ignored: bool,
}
pub struct Pipeline {
commands: Vec<Cmd>,
transaction_mode: bool,
}
pub struct Iter<'a, T: FromRedisValue> {
batch: Vec<T>,
cursor: u64,
con: &'a (ConnectionLike + 'a),
cmd: Cmd,
}
impl<'a, T: FromRedisValue> Iterator for Iter<'a, T> {
type Item = T;
#[inline]
fn next(&mut self) -> Option<T> {
loop {
match self.batch.pop() {
Some(v) => { return Some(v); }
None => {}
};
if self.cursor == 0 {
return None;
}
let pcmd = unwrap_or!(self.cmd.get_packed_command_with_cursor(
self.cursor), return None);
let rv = unwrap_or!(self.con.req_packed_command(
&pcmd).ok(), return None);
let (cur, mut batch) : (u64, Vec<T>) = unwrap_or!(
from_redis_value(&rv).ok(), return None);
batch.reverse();
self.cursor = cur;
self.batch = batch;
}
}
}
fn countdigits(mut v: usize) -> usize {
let mut result = 1;
loop {
if v < 10 { return result; }
if v < 100 { return result + 1; }
if v < 1000 { return result + 2; }
if v < 10000 { return result + 3; }
v /= 10000;
result += 4;
}
}
#[inline]
fn bulklen(len: usize) -> usize {
return 1+countdigits(len)+2+len+2;
}
fn encode_command(args: &Vec<Arg>, cursor: u64) -> Vec<u8> {
let mut totlen = 1 + countdigits(args.len()) + 2;
for item in args {
totlen += bulklen(match *item {
Arg::Cursor => countdigits(cursor as usize),
Arg::Simple(ref val) => val.len(),
Arg::Borrowed(ptr) => ptr.len(),
});
}
let mut cmd = Vec::with_capacity(totlen);
cmd.push('*' as u8);
cmd.extend(args.len().to_string().as_bytes());
cmd.push('\r' as u8);
cmd.push('\n' as u8);
{
let mut encode = |item: &[u8]| {
cmd.push('$' as u8);
cmd.extend(item.len().to_string().as_bytes());
cmd.push('\r' as u8);
cmd.push('\n' as u8);
cmd.extend(item.iter());
cmd.push('\r' as u8);
cmd.push('\n' as u8);
};
for item in args.iter() {
match *item {
Arg::Cursor => encode(cursor.to_string().as_bytes()),
Arg::Simple(ref val) => encode(val),
Arg::Borrowed(ptr) => encode(ptr),
}
}
}
cmd
}
fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec<u8> {
let mut rv = vec![];
if atomic {
rv.extend(cmd("MULTI").get_packed_command().into_iter());
}
for cmd in cmds.iter() {
rv.extend(cmd.get_packed_command().into_iter());
}
if atomic {
rv.extend(cmd("EXEC").get_packed_command().into_iter());
}
rv
}
impl Cmd {
pub fn new() -> Cmd {
Cmd { args: vec![], cursor: None, is_ignored: false }
}
#[inline]
pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Cmd {
for item in arg.to_redis_args().into_iter() {
self.args.push(Arg::Simple(item));
}
self
}
#[inline]
pub fn cursor_arg(&mut self, cursor: u64) -> &mut Cmd {
assert!(!self.in_scan_mode());
self.cursor = Some(cursor);
self.args.push(Arg::Cursor);
self
}
#[inline]
pub fn get_packed_command(&self) -> Vec<u8> {
encode_command(&self.args, self.cursor.unwrap_or(0))
}
#[inline]
fn get_packed_command_with_cursor(&self, cursor: u64) -> Option<Vec<u8>> {
if !self.in_scan_mode() {
None
} else {
Some(encode_command(&self.args, cursor))
}
}
#[inline]
pub fn in_scan_mode(&self) -> bool {
self.cursor.is_some()
}
#[inline]
pub fn query<T: FromRedisValue>(&self, con: &ConnectionLike) -> RedisResult<T> {
let pcmd = self.get_packed_command();
match con.req_packed_command(&pcmd) {
Ok(val) => from_redis_value(&val),
Err(e) => Err(e),
}
}
#[inline]
pub fn iter<'a, T: FromRedisValue>(&self, con: &'a ConnectionLike)
-> RedisResult<Iter<'a, T>> {
let pcmd = self.get_packed_command();
let rv = try!(con.req_packed_command(&pcmd));
let mut batch : Vec<T>;
let mut cursor = 0;
if rv.looks_like_cursor() {
let (next, b) : (u64, Vec<T>) = try!(from_redis_value(&rv));
batch = b;
cursor = next;
} else {
batch = try!(from_redis_value(&rv));
}
batch.reverse();
Ok(Iter {
batch: batch,
cursor: cursor,
con: con,
cmd: self.clone(),
})
}
#[inline]
pub fn execute(&self, con: &ConnectionLike) {
let _ : () = self.query(con).unwrap();
}
}
impl Pipeline {
pub fn new() -> Pipeline {
Pipeline { commands: vec![], transaction_mode: false }
}
#[inline]
pub fn cmd(&mut self, name: &str) -> &mut Pipeline {
self.commands.push(cmd(name));
self
}
#[inline]
pub fn add_command(&mut self, cmd: &Cmd) -> &mut Pipeline {
self.commands.push(cmd.clone());
self
}
#[inline]
fn get_last_command(&mut self) -> &mut Cmd {
let idx = match self.commands.len() {
0 => panic!("No command on stack"),
x => x - 1,
};
&mut self.commands[idx]
}
#[inline]
pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Pipeline {
{
let cmd = self.get_last_command();
cmd.arg(arg);
}
self
}
#[inline]
pub fn ignore(&mut self) -> &mut Pipeline {
{
let cmd = self.get_last_command();
cmd.is_ignored = true;
}
self
}
#[inline]
pub fn atomic(&mut self) -> &mut Pipeline {
self.transaction_mode = true;
self
}
fn make_pipeline_results(&self, resp: Vec<Value>) -> Value {
let mut rv = vec![];
for (idx, result) in resp.into_iter().enumerate() {
if !self.commands[idx].is_ignored {
rv.push(result);
}
}
Value::Bulk(rv)
}
fn execute_pipelined(&self, con: &ConnectionLike) -> RedisResult<Value> {
Ok(self.make_pipeline_results(try!(con.req_packed_commands(
&encode_pipeline(&self.commands, false),
0, self.commands.len()))))
}
fn execute_transaction(&self, con: &ConnectionLike) -> RedisResult<Value> {
let mut resp = try!(con.req_packed_commands(
&encode_pipeline(&self.commands, true),
self.commands.len() + 1, 1));
match resp.pop() {
Some(Value::Nil) => Ok(Value::Nil),
Some(Value::Bulk(items)) => Ok(self.make_pipeline_results(items)),
_ => fail!((ErrorKind::ResponseError,
"Invalid response when parsing multi response"))
}
}
#[inline]
pub fn query<T: FromRedisValue>(&self, con: &ConnectionLike) -> RedisResult<T> {
from_redis_value(&(
if self.commands.len() == 0 {
Value::Bulk(vec![])
} else if self.transaction_mode {
try!(self.execute_transaction(con))
} else {
try!(self.execute_pipelined(con))
}
))
}
#[inline]
pub fn execute(&self, con: &ConnectionLike) {
let _ : () = self.query(con).unwrap();
}
}
pub fn cmd<'a>(name: &'a str) -> Cmd {
let mut rv = Cmd::new();
rv.arg(name);
rv
}
pub fn pack_command(args: &[Vec<u8>]) -> Vec<u8> {
encode_command(&args.iter().map(|x| Arg::Borrowed(x)).collect(), 0)
}
pub fn pipe() -> Pipeline {
Pipeline::new()
}