use std::io;
use std::time::Duration;
use kevy_resp::Reply;
use crate::conn::AsyncConnection;
use crate::reply::{vec2, vec3};
/// An ordered batch of commands that is built up first and sent later.
///
/// Every builder method takes the pipeline by value and returns it, so calls
/// are meant to be chained. Discarding a returned pipeline discards the
/// commands queued on it, hence the `#[must_use]`.
#[must_use = "a Pipeline does nothing until it is run or converted with `into_cmds`"]
#[derive(Debug, Default, Clone)]
pub struct Pipeline {
    /// Queued commands in insertion order; each entry is one command's
    /// argument vector, stored as raw bytes.
    cmds: Vec<Vec<Vec<u8>>>,
}
impl Pipeline {
    /// Creates an empty pipeline with no queued commands.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the number of commands queued so far.
    pub fn len(&self) -> usize {
        self.cmds.len()
    }

    /// Returns `true` when no command has been queued.
    pub fn is_empty(&self) -> bool {
        self.cmds.is_empty()
    }

    /// Queues a caller-built argument vector exactly as given.
    ///
    /// No validation is performed here; this is the escape hatch for commands
    /// that have no dedicated builder method.
    pub fn push_raw(mut self, argv: Vec<Vec<u8>>) -> Self {
        self.cmds.push(argv);
        self
    }

    /// Queues `GET key`.
    pub fn get(mut self, key: &[u8]) -> Self {
        self.cmds.push(vec2(b"GET", key));
        self
    }

    /// Queues `SET key value`.
    pub fn set(mut self, key: &[u8], value: &[u8]) -> Self {
        self.cmds.push(vec3(b"SET", key, value));
        self
    }

    /// Queues `SET key value PX <ttl in milliseconds>`.
    ///
    /// The TTL is converted by the same rule as [`Pipeline::expire`]: whole
    /// milliseconds, saturated at `i64::MAX`, with a non-zero TTL shorter than
    /// one millisecond sent as `1` instead of `0`. A TTL of exactly zero is
    /// still sent as `0`.
    pub fn set_with_ttl(self, key: &[u8], value: &[u8], ttl: Duration) -> Self {
        self.push_raw(vec![
            b"SET".to_vec(),
            key.to_vec(),
            value.to_vec(),
            b"PX".to_vec(),
            Self::ttl_millis_arg(ttl),
        ])
    }

    /// Queues `DEL key [key ...]` with the keys in the given order.
    pub fn del(self, keys: &[&[u8]]) -> Self {
        self.push_variadic(b"DEL", None, keys)
    }

    /// Queues `EXISTS key [key ...]` with the keys in the given order.
    pub fn exists(self, keys: &[&[u8]]) -> Self {
        self.push_variadic(b"EXISTS", None, keys)
    }

    /// Queues `INCR key`.
    pub fn incr(mut self, key: &[u8]) -> Self {
        self.cmds.push(vec2(b"INCR", key));
        self
    }

    /// Queues `INCRBY key delta`, with `delta` rendered in decimal.
    pub fn incr_by(self, key: &[u8], delta: i64) -> Self {
        self.push_raw(vec![
            b"INCRBY".to_vec(),
            key.to_vec(),
            delta.to_string().into_bytes(),
        ])
    }

    /// Queues `PEXPIRE key <ttl in milliseconds>`.
    ///
    /// The TTL is truncated to whole milliseconds and saturated at `i64::MAX`.
    /// A non-zero TTL shorter than one millisecond is sent as `1`, so it is
    /// not mistaken for an explicit zero timeout. A TTL of exactly zero is
    /// sent as `0`.
    pub fn expire(self, key: &[u8], ttl: Duration) -> Self {
        self.push_raw(vec![
            b"PEXPIRE".to_vec(),
            key.to_vec(),
            Self::ttl_millis_arg(ttl),
        ])
    }

    /// Queues `PUBLISH channel message`.
    pub fn publish(mut self, channel: &[u8], message: &[u8]) -> Self {
        self.cmds.push(vec3(b"PUBLISH", channel, message));
        self
    }

    /// Queues `HGET key field`.
    pub fn hget(mut self, key: &[u8], field: &[u8]) -> Self {
        self.cmds.push(vec3(b"HGET", key, field));
        self
    }

    /// Queues `HSET key field value [field value ...]`.
    ///
    /// Each `(field, value)` tuple in `pairs` contributes two arguments, in
    /// the order given.
    pub fn hset(mut self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> Self {
        let mut argv = Vec::with_capacity(2 + pairs.len() * 2);
        argv.push(b"HSET".to_vec());
        argv.push(key.to_vec());
        for (field, value) in pairs {
            argv.push(field.to_vec());
            argv.push(value.to_vec());
        }
        self.cmds.push(argv);
        self
    }

    /// Queues `LPUSH key value [value ...]`.
    pub fn lpush(self, key: &[u8], values: &[&[u8]]) -> Self {
        self.push_variadic(b"LPUSH", Some(key), values)
    }

    /// Queues `RPUSH key value [value ...]`.
    pub fn rpush(self, key: &[u8], values: &[&[u8]]) -> Self {
        self.push_variadic(b"RPUSH", Some(key), values)
    }

    /// Queues `SADD key member [member ...]`.
    pub fn sadd(self, key: &[u8], members: &[&[u8]]) -> Self {
        self.push_variadic(b"SADD", Some(key), members)
    }

    /// Hands every queued command to the connection's codec in one
    /// `pipeline` call and awaits the result.
    ///
    /// An empty pipeline returns `Ok` with an empty vector without touching
    /// the connection.
    ///
    /// # Errors
    ///
    /// Propagates whatever `io::Error` the codec's `pipeline` call reports.
    pub async fn run(self, conn: &mut AsyncConnection) -> io::Result<Vec<Reply>> {
        if self.cmds.is_empty() {
            return Ok(Vec::new());
        }
        conn.codec_mut().pipeline(&self.cmds).await
    }

    /// Consumes the pipeline and returns the queued argument vectors in
    /// insertion order.
    pub fn into_cmds(self) -> Vec<Vec<Vec<u8>>> {
        self.cmds
    }

    /// Queues `name [key] args...`, the shape shared by the variadic builders.
    fn push_variadic(mut self, name: &[u8], key: Option<&[u8]>, args: &[&[u8]]) -> Self {
        let mut argv = Vec::with_capacity(1 + usize::from(key.is_some()) + args.len());
        argv.push(name.to_vec());
        if let Some(key) = key {
            argv.push(key.to_vec());
        }
        argv.extend(args.iter().map(|arg| arg.to_vec()));
        self.cmds.push(argv);
        self
    }

    /// Renders `ttl` as the decimal millisecond argument for `PX` / `PEXPIRE`.
    fn ttl_millis_arg(ttl: Duration) -> Vec<u8> {
        // `as_millis` yields a u128; saturate at i64::MAX instead of wrapping.
        let mut ms = ttl.as_millis().min(i64::MAX as u128) as i64;
        // Truncation turns a non-zero sub-millisecond TTL into 0. Redis
        // rejects `SET .. PX 0` and treats `PEXPIRE key 0` as an immediate
        // delete, so send the smallest representable TTL instead.
        if ms == 0 && ttl.subsec_nanos() != 0 {
            ms = 1;
        }
        ms.to_string().into_bytes()
    }
}
impl AsyncConnection {
    /// Starts a fresh, empty [`Pipeline`] builder.
    ///
    /// The connection itself is neither read nor modified here; hand it to
    /// [`Pipeline::run`] once the commands have been queued.
    pub fn pipeline(&mut self) -> Pipeline {
        Pipeline::default()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Chained builder calls are stored in call order.
    #[test]
    fn builder_accumulates_in_order() {
        let doomed: [&[u8]; 2] = [b"a", b"b"];
        let pipeline = Pipeline::new()
            .set(b"k", b"v")
            .get(b"k")
            .incr(b"counter")
            .del(&doomed);
        assert_eq!(pipeline.len(), 4);

        let queued = pipeline.into_cmds();
        assert_eq!(queued[0][0].as_slice(), &b"SET"[..]);
        assert_eq!(queued[1][0].as_slice(), &b"GET"[..]);
        assert_eq!(queued[2][0].as_slice(), &b"INCR"[..]);

        let expected_del = vec![b"DEL".to_vec(), b"a".to_vec(), b"b".to_vec()];
        assert_eq!(queued[3], expected_del);
    }

    /// A pipeline with nothing queued reports empty and yields no commands.
    #[test]
    fn empty_pipeline_yields_empty_cmds() {
        let pipeline = Pipeline::new();
        assert!(pipeline.is_empty());

        let queued = pipeline.into_cmds();
        assert!(queued.is_empty());
    }

    /// `push_raw` stores the caller's argument vector untouched.
    #[test]
    fn push_raw_escape_hatch() {
        let argv = vec![b"CUSTOM".to_vec(), b"arg".to_vec()];
        let queued = Pipeline::new().push_raw(argv.clone()).into_cmds();
        assert_eq!(queued, vec![argv]);
    }
}