use crate::{
Result,
client::{Client, PreparedCommand},
commands::{
BitmapCommands, BloomCommands, ClusterCommands, ConnectionCommands, CountMinSketchCommands,
CuckooCommands, GenericCommands, GeoCommands, HashCommands, HyperLogLogCommands,
JsonCommands, ListCommands, ScriptingCommands, SearchCommands, ServerCommands, SetCommands,
SortedSetCommands, StreamCommands, StringCommands, TDigestCommands, TimeSeriesCommands,
TopKCommands, VectorSetCommands,
},
resp::{Command, RespBatchDeserializer, Response},
};
use serde::de::DeserializeOwned;
use smallvec::SmallVec;
pub struct Pipeline<'a> {
client: &'a Client,
commands: SmallVec<[Command; 10]>,
forget_flags: SmallVec<[bool; 10]>,
retry_on_error: Option<bool>,
}
impl Pipeline<'_> {
pub(crate) fn new<'a>(client: &'a Client) -> Pipeline<'a> {
Pipeline {
client,
commands: SmallVec::new(),
forget_flags: SmallVec::new(),
retry_on_error: None,
}
}
pub fn reserve(&mut self, additional: usize) {
self.commands.reserve(additional);
self.forget_flags.reserve(additional);
}
pub fn retry_on_error(&mut self, retry_on_error: bool) {
self.retry_on_error = Some(retry_on_error);
}
pub fn queue(&mut self, command: impl Into<Command>) {
self.commands.push(command.into());
self.forget_flags.push(false);
}
pub fn forget(&mut self, command: impl Into<Command>) {
self.commands.push(command.into());
self.forget_flags.push(true);
}
pub async fn execute<T: DeserializeOwned>(self) -> Result<T> {
let num_commands = self.commands.len();
let mut results = self
.client
.internal_send_batch(self.commands, self.retry_on_error)
.await?;
if num_commands > 1 {
if !self.forget_flags.is_empty() {
let mut idx = 0;
results.retain(|_| {
let keep = !self.forget_flags[idx];
idx += 1;
keep
});
}
if results.len() == 1 {
let result = results.pop().unwrap();
result.to()
} else {
let deserializer = RespBatchDeserializer::new(&results);
T::deserialize(&deserializer)
}
} else {
results[0].to()
}
}
}
pub trait BatchPreparedCommand<R = ()> {
fn queue(self);
fn forget(self);
}
impl<'a, R: Response> BatchPreparedCommand for PreparedCommand<'a, &'a mut Pipeline<'_>, R> {
#[inline]
fn queue(self) {
self.executor.queue(self.command)
}
#[inline]
fn forget(self) {
self.executor.forget(self.command)
}
}
impl<'a> BitmapCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> BloomCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> ClusterCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> ConnectionCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> CountMinSketchCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> CuckooCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> GenericCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> GeoCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> HashCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> HyperLogLogCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> JsonCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> ListCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> SearchCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> SetCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> ScriptingCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> ServerCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> SortedSetCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> StreamCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> StringCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> TDigestCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> TimeSeriesCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> TopKCommands<'a> for &'a mut Pipeline<'_> {}
impl<'a> VectorSetCommands<'a> for &'a Pipeline<'_> {}