#![macro_use]
use serde::{Deserialize, Serialize};
use crate::cmd::{cmd, cmd_len, Cmd};
use crate::connection::ConnectionLike;
use crate::types::{
from_redis_value, ErrorKind, FromRedisValue, HashSet, RedisResult, ToRedisArgs, Value,
};
/// An ordered batch of commands that is encoded into one buffer and handed
/// to a connection in a single request.
///
/// Commands are appended through the builder methods generated by
/// `implement_pipeline_commands!` and run with `query` or `execute`.
#[derive(Clone, Deserialize, Serialize)]
pub struct Pipeline {
/// Queued commands, in the order they were added.
commands: Vec<Cmd>,
/// When `true`, `query` sends the commands wrapped in `MULTI` ... `EXEC`.
transaction_mode: bool,
/// Indices into `commands` whose replies are left out of the result.
ignored_commands: HashSet<usize>,
}
impl Pipeline {
pub fn new() -> Pipeline {
Self::with_capacity(0)
}
pub fn with_capacity(capacity: usize) -> Pipeline {
Pipeline {
commands: Vec::with_capacity(capacity),
transaction_mode: false,
ignored_commands: HashSet::new(),
}
}
#[inline]
pub fn atomic(&mut self) -> &mut Pipeline {
self.transaction_mode = true;
self
}
pub fn get_packed_pipeline(&self) -> Vec<u8> {
encode_pipeline(&self.commands, self.transaction_mode)
}
fn execute_pipelined(&self, con: &mut dyn ConnectionLike) -> RedisResult<Value> {
Ok(self.make_pipeline_results(con.req_packed_commands(
&encode_pipeline(&self.commands, false),
0,
self.commands.len(),
)?))
}
fn execute_transaction(&self, con: &mut dyn ConnectionLike) -> RedisResult<Value> {
let mut resp = con.req_packed_commands(
&encode_pipeline(&self.commands, true),
self.commands.len() + 1,
1,
)?;
match resp.pop() {
Some(Value::Nil) => Ok(Value::Nil),
Some(Value::Bulk(items)) => Ok(self.make_pipeline_results(items)),
_ => fail!((
ErrorKind::ResponseError,
"Invalid response when parsing multi response"
)),
}
}
#[inline]
pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
if !con.supports_pipelining() {
fail!((
ErrorKind::ResponseError,
"This connection does not support pipelining."
));
}
from_redis_value(
&(if self.commands.is_empty() {
Value::Bulk(vec![])
} else if self.transaction_mode {
self.execute_transaction(con)?
} else {
self.execute_pipelined(con)?
}),
)
}
#[inline]
pub fn execute(&self, con: &mut dyn ConnectionLike) {
self.query::<()>(con).unwrap();
}
}
/// Encodes `cmds` into a freshly allocated byte buffer.
///
/// When `atomic` is `true` the commands are wrapped in `MULTI` / `EXEC`
/// (see `write_pipeline`).
fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec<u8> {
    let mut packed = Vec::new();
    write_pipeline(&mut packed, cmds, atomic);
    packed
}
/// Appends the encoded form of every command in `cmds` to `rv`.
///
/// The combined length reported by `cmd_len` is reserved in `rv` before
/// anything is written. When `atomic` is `true`, a `MULTI` command is written
/// before the commands and an `EXEC` command after them.
fn write_pipeline(rv: &mut Vec<u8>, cmds: &[Cmd], atomic: bool) {
    let payload_len: usize = cmds.iter().map(cmd_len).sum();

    if !atomic {
        rv.reserve(payload_len);
        for command in cmds {
            command.write_packed_command_preallocated(rv);
        }
        return;
    }

    let multi = cmd("MULTI");
    let exec = cmd("EXEC");
    // Account for the transaction wrapper as well as the payload.
    rv.reserve(cmd_len(&multi) + cmd_len(&exec) + payload_len);

    multi.write_packed_command_preallocated(rv);
    for command in cmds {
        command.write_packed_command_preallocated(rv);
    }
    exec.write_packed_command_preallocated(rv);
}
/// Generates the command-building methods and a `Default` impl for a
/// pipeline-like type.
///
/// The generated code reads and writes `self.commands` and
/// `self.ignored_commands` and calls `Self::new()`, so `$struct_name` must
/// provide those fields and that constructor.
macro_rules! implement_pipeline_commands {
    ($struct_name:ident) => {
        impl $struct_name {
            /// Appends an already-built command to the end of the pipeline.
            #[inline]
            pub fn add_command(&mut self, cmd: Cmd) -> &mut Self {
                self.commands.push(cmd);
                self
            }

            /// Starts a new command called `name` and appends it to the
            /// pipeline.
            #[inline]
            pub fn cmd(&mut self, name: &str) -> &mut Self {
                let command = cmd(name);
                self.add_command(command)
            }

            /// Iterates over the queued commands in insertion order.
            pub fn cmd_iter(&self) -> impl Iterator<Item = &Cmd> {
                self.commands.iter()
            }

            /// Marks the most recently added command so that its reply is
            /// left out of the pipeline result. Does nothing when no command
            /// has been added yet.
            #[inline]
            pub fn ignore(&mut self) -> &mut Self {
                // `checked_sub` yields `None` for an empty pipeline, in which
                // case there is no command to mark.
                if let Some(last_idx) = self.commands.len().checked_sub(1) {
                    self.ignored_commands.insert(last_idx);
                }
                self
            }

            /// Adds `arg` to the most recently added command.
            ///
            /// # Panics
            ///
            /// Panics if the pipeline contains no command.
            #[inline]
            pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Self {
                self.get_last_command().arg(arg);
                self
            }

            /// Removes all queued commands and all ignore markers.
            #[inline]
            pub fn clear(&mut self) {
                self.ignored_commands.clear();
                self.commands.clear();
            }

            /// Returns the most recently added command, panicking when the
            /// pipeline is empty.
            #[inline]
            fn get_last_command(&mut self) -> &mut Cmd {
                match self.commands.last_mut() {
                    Some(last) => last,
                    None => panic!("No command on stack"),
                }
            }

            /// Wraps `resp` in a `Value::Bulk`, dropping every reply whose
            /// position is recorded in `ignored_commands`.
            fn make_pipeline_results(&self, resp: Vec<Value>) -> Value {
                let kept = resp
                    .into_iter()
                    .enumerate()
                    .filter(|(idx, _)| !self.ignored_commands.contains(idx))
                    .map(|(_, reply)| reply)
                    .collect();
                Value::Bulk(kept)
            }
        }

        impl Default for $struct_name {
            /// Equivalent to calling `new()`.
            fn default() -> Self {
                Self::new()
            }
        }
    };
}
// Generate the builder methods (`cmd`, `arg`, `ignore`, `clear`, ...) and the
// `Default` impl for `Pipeline`.
implement_pipeline_commands!(Pipeline);