#[cfg(feature = "redis-graph")]
use crate::commands::GraphCommands;
#[cfg(feature = "redis-json")]
use crate::commands::JsonCommands;
#[cfg(feature = "redis-search")]
use crate::commands::SearchCommands;
#[cfg(feature = "redis-time-series")]
use crate::commands::TimeSeriesCommands;
#[cfg(feature = "redis-bloom")]
use crate::commands::{
BloomCommands, CountMinSketchCommands, CuckooCommands, TDigestCommands, TopKCommands,
};
use crate::{
client::{Client, PreparedCommand},
commands::{
BitmapCommands, ClusterCommands, ConnectionCommands, GenericCommands, GeoCommands,
HashCommands, HyperLogLogCommands, ListCommands, ScriptingCommands, ServerCommands,
SetCommands, SortedSetCommands, StreamCommands, StringCommands,
},
resp::{Command, RespBatchDeserializer, Response},
Result,
};
use serde::de::DeserializeOwned;
use std::iter::zip;
pub struct Pipeline<'a> {
client: &'a Client,
commands: Vec<Command>,
forget_flags: Vec<bool>,
retry_on_error: Option<bool>,
}
impl<'a> Pipeline<'a> {
pub(crate) fn new(client: &'a Client) -> Pipeline {
Pipeline {
client,
commands: Vec::new(),
forget_flags: Vec::new(),
retry_on_error: None,
}
}
pub fn retry_on_error(&mut self, retry_on_error: bool) {
self.retry_on_error = Some(retry_on_error);
}
pub fn queue(&mut self, command: Command) {
self.commands.push(command);
self.forget_flags.push(false);
}
pub fn forget(&mut self, command: Command) {
self.commands.push(command);
self.forget_flags.push(true);
}
pub async fn execute<T: DeserializeOwned>(self) -> Result<T> {
let num_commands = self.commands.len();
let results = self
.client
.send_batch(self.commands, self.retry_on_error)
.await?;
if num_commands > 1 {
let mut filtered_results = zip(results, self.forget_flags.iter())
.filter_map(|(value, forget_flag)| if *forget_flag { None } else { Some(value) })
.collect::<Vec<_>>();
if filtered_results.len() == 1 {
let result = filtered_results.pop().unwrap();
result.to()
} else {
let deserializer = RespBatchDeserializer::new(&filtered_results);
T::deserialize(&deserializer)
}
} else {
results[0].to()
}
}
}
pub trait BatchPreparedCommand<R = ()> {
fn queue(self);
fn forget(self);
}
impl<'a, 'b, R: Response> BatchPreparedCommand for PreparedCommand<'a, &'a mut Pipeline<'b>, R> {
#[inline]
fn queue(self) {
self.executor.queue(self.command)
}
#[inline]
fn forget(self) {
self.executor.forget(self.command)
}
}
impl<'a, 'b> BitmapCommands<'a> for &'a mut Pipeline<'b> {}
#[cfg_attr(docsrs, doc(cfg(feature = "redis-bloom")))]
#[cfg(feature = "redis-bloom")]
impl<'a, 'b> BloomCommands<'a> for &'a mut Pipeline<'b> {}
impl<'a, 'b> ClusterCommands<'a> for &'a mut Pipeline<'b> {}
impl<'a, 'b> ConnectionCommands<'a> for &'a mut Pipeline<'b> {}
#[cfg_attr(docsrs, doc(cfg(feature = "redis-bloom")))]
#[cfg(feature = "redis-bloom")]
impl<'a, 'b> CountMinSketchCommands<'a> for &'a mut Pipeline<'b> {}
#[cfg_attr(docsrs, doc(cfg(feature = "redis-bloom")))]
#[cfg(feature = "redis-bloom")]
impl<'a, 'b> CuckooCommands<'a> for &'a mut Pipeline<'b> {}
impl<'a, 'b> GenericCommands<'a> for &'a mut Pipeline<'b> {}
impl<'a, 'b> GeoCommands<'a> for &'a mut Pipeline<'b> {}
#[cfg_attr(docsrs, doc(cfg(feature = "redis-graph")))]
#[cfg(feature = "redis-graph")]
impl<'a, 'b> GraphCommands<'a> for &'a mut Pipeline<'b> {}
impl<'a, 'b> HashCommands<'a> for &'a mut Pipeline<'b> {}
impl<'a, 'b> HyperLogLogCommands<'a> for &'a mut Pipeline<'b> {}
#[cfg_attr(docsrs, doc(cfg(feature = "redis-json")))]
#[cfg(feature = "redis-json")]
impl<'a, 'b> JsonCommands<'a> for &'a mut Pipeline<'b> {}
impl<'a, 'b> ListCommands<'a> for &'a mut Pipeline<'b> {}
#[cfg_attr(docsrs, doc(cfg(feature = "redis-search")))]
#[cfg(feature = "redis-search")]
impl<'a, 'b> SearchCommands<'a> for &'a mut Pipeline<'b> {}
impl<'a, 'b> SetCommands<'a> for &'a mut Pipeline<'b> {}
impl<'a, 'b> ScriptingCommands<'a> for &'a mut Pipeline<'b> {}
impl<'a, 'b> ServerCommands<'a> for &'a mut Pipeline<'b> {}
impl<'a, 'b> SortedSetCommands<'a> for &'a mut Pipeline<'b> {}
impl<'a, 'b> StreamCommands<'a> for &'a mut Pipeline<'b> {}
impl<'a, 'b> StringCommands<'a> for &'a mut Pipeline<'b> {}
#[cfg_attr(docsrs, doc(cfg(feature = "redis-bloom")))]
#[cfg(feature = "redis-bloom")]
impl<'a, 'b> TDigestCommands<'a> for &'a mut Pipeline<'b> {}
#[cfg_attr(docsrs, doc(cfg(feature = "redis-time-series")))]
#[cfg(feature = "redis-time-series")]
impl<'a, 'b> TimeSeriesCommands<'a> for &'a mut Pipeline<'b> {}
#[cfg_attr(docsrs, doc(cfg(feature = "redis-bloom")))]
#[cfg(feature = "redis-bloom")]
impl<'a, 'b> TopKCommands<'a> for &'a mut Pipeline<'b> {}