#![macro_use]
use crate::cmd::{Cmd, cmd, cmd_len};
use crate::value::{
ErrorKind, FromValue, HashSet, Result, ToArgs, Value, from_owned_value,
};
use std::sync::Arc;
/// A batch of commands that is packed and sent to the server in one request.
///
/// Commands are appended with the builder methods generated by
/// `implement_pipeline_commands!` and executed with `query_async`.
#[derive(Clone, Debug)]
pub struct Pipeline {
// Queued commands, in the order they were added.
commands: Vec<Arc<Cmd>>,
// When true, the packed request is wrapped in MULTI/EXEC and executed
// through the transaction path.
transaction_mode: bool,
// Indexes (into `commands`) of commands whose replies are left out of the
// assembled result.
ignored_commands: HashSet<usize>,
}
impl Pipeline {
pub fn new() -> Pipeline {
Self::with_capacity(0)
}
pub(crate) fn with_capacity(capacity: usize) -> Pipeline {
Pipeline {
commands: Vec::with_capacity(capacity),
transaction_mode: false,
ignored_commands: HashSet::new(),
}
}
#[inline]
pub fn atomic(&mut self) -> &mut Pipeline {
self.transaction_mode = true;
self
}
pub(crate) fn get_packed_pipeline(&self) -> bytes::Bytes {
encode_pipeline(&self.commands, self.transaction_mode).into()
}
async fn execute_pipelined_async<C>(&self, con: &mut C) -> Result<Value>
where
C: crate::connection::ConnectionLike,
{
let value = con
.req_packed_commands(self, 0, self.commands.len(), None)
.await?;
self.make_pipeline_results(value)
}
async fn execute_transaction_async<C>(&self, con: &mut C) -> Result<Value>
where
C: crate::connection::ConnectionLike,
{
let mut resp = con
.req_packed_commands(self, self.commands.len() + 1, 1, None)
.await?;
match resp.pop() {
Some(Ok(Value::Nil)) => Ok(Value::Nil),
Some(Ok(Value::Array(items))) => Ok(self.make_pipeline_results(items)?),
Some(Err(e)) => Err(e),
_ => Err((
ErrorKind::ResponseError,
"Invalid response when parsing multi response",
)
.into()),
}
}
#[inline]
pub async fn query_async<C, T: FromValue>(&self, con: &mut C) -> Result<T>
where
C: crate::connection::ConnectionLike,
{
let v = if self.commands.is_empty() {
return from_owned_value(Value::Array(vec![]));
} else if self.transaction_mode {
self.execute_transaction_async(con).await?
} else {
self.execute_pipelined_async(con).await?
};
from_owned_value(v)
}
pub(crate) fn is_atomic(&self) -> bool {
self.transaction_mode
}
pub(crate) fn len(&self) -> usize {
self.commands.len()
}
pub(crate) fn is_empty(&self) -> bool {
self.commands.is_empty()
}
pub(crate) fn get_command(&self, index: usize) -> Option<Arc<Cmd>> {
self.commands.get(index).cloned()
}
}
/// Packs `cmds` into a freshly allocated buffer, wrapping them in
/// `MULTI` / `EXEC` when `atomic` is set.
fn encode_pipeline(cmds: &[Arc<Cmd>], atomic: bool) -> Vec<u8> {
    let mut packed = Vec::new();
    write_pipeline(&mut packed, cmds, atomic);
    packed
}
/// Appends the packed form of `cmds` to `rv`.
///
/// The total encoded size is reserved up front so the per-command writers can
/// use their preallocated variant. When `atomic` is set the commands are
/// framed by a leading `MULTI` and a trailing `EXEC`.
fn write_pipeline(rv: &mut Vec<u8>, cmds: &[Arc<Cmd>], atomic: bool) {
    let body_len = cmds.iter().map(cmd_len).sum::<usize>();

    if !atomic {
        rv.reserve(body_len);
        for queued in cmds {
            queued.write_packed_command_preallocated(rv);
        }
        return;
    }

    let multi = cmd("MULTI");
    let exec = cmd("EXEC");
    rv.reserve(cmd_len(&multi) + cmd_len(&exec) + body_len);

    multi.write_packed_command_preallocated(rv);
    for queued in cmds {
        queued.write_packed_command_preallocated(rv);
    }
    exec.write_packed_command_preallocated(rv);
}
/// Generates the command-building API and a `Default` impl for a pipeline type.
///
/// The target struct must have `commands: Vec<Arc<Cmd>>` and
/// `ignored_commands: HashSet<usize>` fields and a `new()` constructor.
macro_rules! implement_pipeline_commands {
    ($struct_name:ident) => {
        impl $struct_name {
            /// Appends an owned command to the end of the pipeline.
            #[inline]
            pub fn add_command(&mut self, cmd: Cmd) -> &mut Self {
                self.add_command_with_arc(Arc::new(cmd))
            }

            /// Appends an already shared command to the end of the pipeline.
            #[inline]
            pub(crate) fn add_command_with_arc(&mut self, cmd: Arc<Cmd>) -> &mut Self {
                self.commands.push(cmd);
                self
            }

            /// Starts a new command called `name` and appends it to the pipeline.
            #[inline]
            pub fn cmd(&mut self, name: &str) -> &mut Self {
                let command = cmd(name);
                self.add_command(command)
            }

            /// Iterates over the queued commands in insertion order.
            pub fn cmd_iter(&self) -> impl Iterator<Item = &Arc<Cmd>> {
                self.commands.iter()
            }

            /// Marks the most recently added command so that its reply is left
            /// out of the assembled result. Does nothing on an empty pipeline.
            #[inline]
            pub fn ignore(&mut self) -> &mut Self {
                if let Some(last_idx) = self.commands.len().checked_sub(1) {
                    self.ignored_commands.insert(last_idx);
                }
                self
            }

            /// Adds an argument to the most recently added command.
            ///
            /// # Panics
            ///
            /// Panics when the pipeline is empty, or when the last command is
            /// also referenced from elsewhere and therefore cannot be mutated.
            #[inline]
            pub fn arg<T: ToArgs>(&mut self, arg: T) -> &mut Self {
                self.get_last_command().arg(arg);
                self
            }

            /// Removes every queued command and every ignore marker. Only these
            /// two collections are reset.
            #[inline]
            pub fn clear(&mut self) {
                self.commands.clear();
                self.ignored_commands.clear();
            }

            /// Returns exclusive access to the last queued command.
            ///
            /// Panics when there is no command, or when the command's `Arc` is
            /// shared.
            #[inline]
            fn get_last_command(&mut self) -> &mut Cmd {
                let last = self.commands.last_mut().expect("No command on stack");
                Arc::get_mut(last).expect("Cannot modify the last command: multiple active references exist. Ensure the command is uniquely owned before mutating.")
            }

            /// Builds the result array from the raw replies, skipping every
            /// reply whose position was marked with `ignore`.
            fn make_pipeline_results(&self, resp: Vec<Result<Value>>) -> Result<Value> {
                let kept_len = resp.len().saturating_sub(self.ignored_commands.len());
                let mut kept = Vec::with_capacity(kept_len);
                kept.extend(
                    resp.into_iter()
                        .enumerate()
                        .filter(|(idx, _)| !self.ignored_commands.contains(idx))
                        .map(|(_, reply)| reply),
                );
                Ok(Value::Array(kept))
            }
        }

        impl Default for $struct_name {
            /// Equivalent to calling `new()`.
            fn default() -> Self {
                Self::new()
            }
        }
    };
}
// Generate the command-building methods and the `Default` impl for `Pipeline`.
implement_pipeline_commands!(Pipeline);
/// Pair of flags describing which kinds of pipeline failures may be retried.
///
/// The `Default` value has both flags set to `false`. The type is `Copy` and
/// comparable, so strategies can be passed by value and checked with `==`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PipelineRetryStrategy {
    /// NOTE(review): presumably enables retrying commands that failed with a
    /// server error; confirm against the code that consumes this strategy.
    pub retry_server_error: bool,
    /// NOTE(review): presumably enables retrying commands that failed because
    /// of a connection error; confirm against the code that consumes this
    /// strategy.
    pub retry_connection_error: bool,
}

impl PipelineRetryStrategy {
    /// Builds a strategy from the two flags.
    ///
    /// This is a `const fn`, so it can be used to initialise constants and
    /// statics.
    pub const fn new(retry_server_error: bool, retry_connection_error: bool) -> Self {
        Self {
            retry_server_error,
            retry_connection_error,
        }
    }
}