use std::{fmt, io};
use crate::connection::ConnectionLike;
use crate::pipeline::Pipeline;
use crate::types::{from_redis_value, FromRedisValue, RedisResult, RedisWrite, ToRedisArgs};
/// A single command argument, generic over how its payload `D` is referenced.
///
/// Inside `Cmd` the payload is a `usize` end offset into the command's byte
/// buffer; `Cmd::args_iter` yields the same arguments with `D = &[u8]`.
#[derive(Clone)]
pub enum Arg<D> {
/// An ordinary argument carrying the payload `D`.
Simple(D),
/// Placeholder for the cursor; the encoder writes the cursor value it is
/// given in place of this argument.
Cursor,
}
/// A command being assembled: an ordered list of arguments plus an optional
/// cursor value.
#[derive(Clone)]
pub struct Cmd {
// Bytes of all simple arguments, appended back to back in insertion order.
data: Vec<u8>,
// One entry per argument. `Arg::Simple(n)` stores the offset in `data` at
// which that argument ends; `Arg::Cursor` marks where the cursor is written.
args: Vec<Arg<usize>>,
// Cursor value used when encoding `Arg::Cursor`. Set by `cursor_arg`;
// `in_scan_mode` reports whether it is `Some`.
cursor: Option<u64>,
}
/// Blocking iterator over the results of a cursor-based command.
///
/// Items are served from `batch`; when it runs dry and `cursor` is non-zero,
/// the command is re-sent with the current cursor to fetch the next batch.
pub struct Iter<'a, T: FromRedisValue> {
// Items of the most recently fetched batch that have not been yielded yet.
batch: std::vec::IntoIter<T>,
// Cursor returned with the last batch; 0 means there is nothing more to fetch.
cursor: u64,
// Connection used to request further batches.
con: &'a mut (dyn ConnectionLike + 'a),
// The command that is re-encoded with the current cursor for each request.
cmd: Cmd,
}
impl<'a, T: FromRedisValue> Iterator for Iter<'a, T> {
type Item = T;
#[inline]
fn next(&mut self) -> Option<T> {
loop {
if let Some(v) = self.batch.next() {
return Some(v);
};
if self.cursor == 0 {
return None;
}
let pcmd = unwrap_or!(
self.cmd.get_packed_command_with_cursor(self.cursor),
return None
);
let rv = unwrap_or!(self.con.req_packed_command(&pcmd).ok(), return None);
let (cur, batch): (u64, Vec<T>) = unwrap_or!(from_redis_value(&rv).ok(), return None);
self.cursor = cur;
self.batch = batch.into_iter();
}
}
}
#[cfg(feature = "aio")]
use crate::aio::ConnectionLike as AsyncConnection;
/// Asynchronous counterpart of `Iter`: yields the results of a cursor-based
/// command batch by batch through `next_item`.
#[cfg(feature = "aio")]
pub struct AsyncIter<'a, T: FromRedisValue + 'a> {
// Items of the most recently fetched batch that have not been yielded yet.
batch: std::vec::IntoIter<T>,
// Connection used to request further batches.
con: &'a mut (dyn AsyncConnection + Send + 'a),
// The command to re-send; its `cursor` field tracks the current position.
cmd: Cmd,
}
#[cfg(feature = "aio")]
impl<'a, T: FromRedisValue + 'a> AsyncIter<'a, T> {
    /// Yields the next item, requesting further batches as needed.
    ///
    /// Returns `None` once the buffered batch is empty and the command's
    /// cursor is unset or 0. It also returns `None` if a request fails or the
    /// reply cannot be converted to `(u64, Vec<T>)`; the underlying error is
    /// not reported.
    #[inline]
    pub async fn next_item(&mut self) -> Option<T> {
        // Keep looping: a fetched batch may be empty while the cursor is still
        // non-zero, in which case another request is needed.
        loop {
            if let Some(item) = self.batch.next() {
                return Some(item);
            }

            match self.cmd.cursor {
                None | Some(0) => return None,
                Some(_) => {}
            }

            let reply = self.con.req_packed_command(&self.cmd).await.ok()?;
            let (next_cursor, items): (u64, Vec<T>) = from_redis_value(&reply).ok()?;

            self.cmd.cursor = Some(next_cursor);
            self.batch = items.into_iter();
        }
    }
}
/// Returns the number of decimal digits needed to print `v` (1 for `v == 0`).
fn countdigits(mut v: usize) -> usize {
    // Digits accounted for by the groups of four already stripped from `v`.
    let mut counted = 0;
    loop {
        match v {
            0..=9 => return counted + 1,
            10..=99 => return counted + 2,
            100..=999 => return counted + 3,
            1000..=9999 => return counted + 4,
            _ => {
                // Strip four digits per iteration.
                v /= 10000;
                counted += 4;
            }
        }
    }
}
/// Size in bytes of one encoded argument whose payload is `len` bytes long:
/// a one-byte marker, the decimal length, CRLF, the payload, CRLF.
#[inline]
fn bulklen(len: usize) -> usize {
    const MARKER: usize = 1;
    const CRLF: usize = 2;
    MARKER + countdigits(len) + CRLF + len + CRLF
}
/// Computes the encoded size in bytes of a command made of `args`, where any
/// `Arg::Cursor` is sized as the decimal representation of `cursor`.
fn args_len<'a, I>(args: I, cursor: u64) -> usize
where
    I: IntoIterator<Item = Arg<&'a [u8]>> + ExactSizeIterator,
{
    // Header line: one-byte marker, argument count, CRLF.
    let header = 1 + countdigits(args.len()) + 2;
    let body: usize = args
        .into_iter()
        .map(|arg| {
            let payload_len = match arg {
                Arg::Simple(bytes) => bytes.len(),
                Arg::Cursor => countdigits(cursor as usize),
            };
            bulklen(payload_len)
        })
        .sum();
    header + body
}
/// Encoded size in bytes of `cmd`, using cursor value 0 when the command has
/// no cursor set.
pub(crate) fn cmd_len(cmd: &Cmd) -> usize {
    let cursor = cmd.cursor.unwrap_or(0);
    args_len(cmd.args_iter(), cursor)
}
/// Encodes `args` into a freshly allocated byte vector, writing `cursor` in
/// place of any `Arg::Cursor`.
fn encode_command<'a, I>(args: I, cursor: u64) -> Vec<u8>
where
    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
{
    let mut encoded = Vec::new();
    write_command_to_vec(&mut encoded, args, cursor);
    encoded
}
/// Appends the encoded form of `args` to `cmd`, reserving the required
/// capacity up front.
///
/// Panics if `write_command` reports an error.
fn write_command_to_vec<'a, I>(cmd: &mut Vec<u8>, args: I, cursor: u64)
where
    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
{
    // The iterator is cloned so it can be walked twice: once to size the
    // buffer, once to write it.
    let needed = args_len(args.clone(), cursor);
    cmd.reserve(needed);
    write_command(cmd, args, cursor).unwrap()
}
/// Writes the encoded form of `args` to `cmd`.
///
/// Output layout: `*<argc>\r\n`, then for every argument
/// `$<len>\r\n<bytes>\r\n`. An `Arg::Cursor` is written as the decimal text of
/// `cursor`. Any I/O error from the writer is returned to the caller.
fn write_command<'a, I>(cmd: &mut (impl ?Sized + io::Write), args: I, cursor: u64) -> io::Result<()>
where
    I: IntoIterator<Item = Arg<&'a [u8]>> + Clone + ExactSizeIterator,
{
    /// Writes `<marker><n>\r\n`.
    fn write_size_line<W: ?Sized + io::Write>(
        out: &mut W,
        marker: &[u8],
        n: usize,
    ) -> io::Result<()> {
        out.write_all(marker)?;
        ::itoa::write(&mut *out, n)?;
        out.write_all(b"\r\n")
    }

    write_size_line(&mut *cmd, b"*", args.len())?;

    // Scratch buffer reused for formatting the cursor value.
    let mut cursor_buf = itoa::Buffer::new();
    for arg in args {
        let payload = match arg {
            Arg::Simple(bytes) => bytes,
            Arg::Cursor => cursor_buf.format(cursor).as_bytes(),
        };

        write_size_line(&mut *cmd, b"$", payload.len())?;
        cmd.write_all(payload)?;
        cmd.write_all(b"\r\n")?;
    }
    Ok(())
}
impl RedisWrite for Cmd {
    /// Appends `arg` as one new argument.
    fn write_arg(&mut self, arg: &[u8]) {
        self.data.extend_from_slice(arg);
        // Record where this argument ends inside `data`.
        let end = self.data.len();
        self.args.push(Arg::Simple(end));
    }

    /// Appends the `Display` output of `arg` as one new argument, formatting
    /// directly into the command buffer.
    fn write_arg_fmt(&mut self, arg: impl fmt::Display) {
        use std::io::Write;
        self.data.write_fmt(format_args!("{}", arg)).unwrap();
        let end = self.data.len();
        self.args.push(Arg::Simple(end));
    }
}
impl Default for Cmd {
fn default() -> Cmd {
Cmd::new()
}
}
impl Cmd {
    /// Creates an empty command with no arguments and no cursor.
    pub fn new() -> Cmd {
        Cmd {
            data: vec![],
            args: vec![],
            cursor: None,
        }
    }

    /// Appends `arg` to the command by letting it write itself through
    /// `ToRedisArgs::write_redis_args`. Returns `self` for chaining.
    #[inline]
    pub fn arg<T: ToRedisArgs>(&mut self, arg: T) -> &mut Cmd {
        arg.write_redis_args(self);
        self
    }

    /// Appends a cursor placeholder at the current position and stores
    /// `cursor` as its initial value. Returns `self` for chaining.
    ///
    /// # Panics
    ///
    /// Panics if the command already has a cursor (see `in_scan_mode`).
    #[inline]
    pub fn cursor_arg(&mut self, cursor: u64) -> &mut Cmd {
        assert!(!self.in_scan_mode());
        self.cursor = Some(cursor);
        self.args.push(Arg::Cursor);
        self
    }

    /// Returns the encoded command as a new byte vector.
    #[inline]
    pub fn get_packed_command(&self) -> Vec<u8> {
        let mut cmd = Vec::new();
        self.write_packed_command(&mut cmd);
        cmd
    }

    /// Appends the encoded command to `cmd`, reserving the needed capacity
    /// first. A missing cursor is encoded as 0.
    pub(crate) fn write_packed_command(&self, cmd: &mut Vec<u8>) {
        write_command_to_vec(cmd, self.args_iter(), self.cursor.unwrap_or(0))
    }

    /// Appends the encoded command to `cmd` without reserving capacity here.
    /// A missing cursor is encoded as 0.
    pub(crate) fn write_packed_command_preallocated(&self, cmd: &mut Vec<u8>) {
        write_command(cmd, self.args_iter(), self.cursor.unwrap_or(0)).unwrap()
    }

    /// Encodes the command with `cursor` in place of the stored cursor value.
    /// Returns `None` when the command is not in scan mode.
    #[inline]
    fn get_packed_command_with_cursor(&self, cursor: u64) -> Option<Vec<u8>> {
        if !self.in_scan_mode() {
            None
        } else {
            Some(encode_command(self.args_iter(), cursor))
        }
    }

    /// Returns `true` if a cursor value is currently set on this command.
    #[inline]
    pub fn in_scan_mode(&self) -> bool {
        self.cursor.is_some()
    }

    /// Sends the command over `con` and converts the reply to `T`.
    ///
    /// # Errors
    ///
    /// Returns the error from the connection or from the conversion.
    #[inline]
    pub fn query<T: FromRedisValue>(&self, con: &mut dyn ConnectionLike) -> RedisResult<T> {
        let val = con.req_command(self)?;
        from_redis_value(&val)
    }

    /// Asynchronous version of `query`: sends the command over `con` and
    /// converts the reply to `T`.
    #[inline]
    #[cfg(feature = "aio")]
    pub async fn query_async<C, T: FromRedisValue>(&self, con: &mut C) -> RedisResult<T>
    where
        C: crate::aio::ConnectionLike,
    {
        let val = con.req_packed_command(self).await?;
        from_redis_value(&val)
    }

    /// Sends the command and returns an iterator over the results.
    ///
    /// If the first reply looks like a cursor reply it is decoded as
    /// `(cursor, batch)`. Otherwise the whole reply is treated as a single
    /// batch and the cursor is 0, so no further requests are made.
    ///
    /// # Errors
    ///
    /// Returns an error if the initial request or its conversion fails.
    #[inline]
    pub fn iter<T: FromRedisValue>(self, con: &mut dyn ConnectionLike) -> RedisResult<Iter<'_, T>> {
        let rv = con.req_command(&self)?;

        let (cursor, batch) = if rv.looks_like_cursor() {
            from_redis_value::<(u64, Vec<T>)>(&rv)?
        } else {
            (0, from_redis_value(&rv)?)
        };

        Ok(Iter {
            batch: batch.into_iter(),
            cursor,
            con,
            cmd: self,
        })
    }

    /// Asynchronous version of `iter`.
    ///
    /// The cursor from the first reply is stored on the command itself; a
    /// cursor of 0 clears it, so the returned iterator stops after the first
    /// batch.
    ///
    /// # Errors
    ///
    /// Returns an error if the initial request or its conversion fails.
    #[cfg(feature = "aio")]
    #[inline]
    pub async fn iter_async<'a, T: FromRedisValue + 'a>(
        mut self,
        con: &'a mut (dyn AsyncConnection + Send),
    ) -> RedisResult<AsyncIter<'a, T>> {
        let rv = con.req_packed_command(&self).await?;

        let (cursor, batch) = if rv.looks_like_cursor() {
            from_redis_value::<(u64, Vec<T>)>(&rv)?
        } else {
            (0, from_redis_value(&rv)?)
        };
        self.cursor = if cursor == 0 { None } else { Some(cursor) };

        Ok(AsyncIter {
            batch: batch.into_iter(),
            con,
            cmd: self,
        })
    }

    /// Sends the command and discards the reply.
    ///
    /// # Panics
    ///
    /// Panics if the query fails.
    #[inline]
    pub fn execute(&self, con: &mut dyn ConnectionLike) {
        self.query::<()>(con).unwrap();
    }

    /// Iterates over the arguments in order. Simple arguments borrow their
    /// bytes from the command; the cursor position is yielded as
    /// `Arg::Cursor`.
    pub fn args_iter(&self) -> impl Iterator<Item = Arg<&[u8]>> + Clone + ExactSizeIterator {
        // End offset of the previous simple argument, i.e. where the next one
        // starts. Cursor placeholders occupy no bytes and leave it unchanged.
        let mut prev = 0;
        self.args.iter().map(move |arg| match *arg {
            Arg::Simple(i) => {
                let arg = Arg::Simple(&self.data[prev..i]);
                prev = i;
                arg
            }

            Arg::Cursor => Arg::Cursor,
        })
    }

    /// Returns the bytes of the simple argument at position `idx`.
    ///
    /// Returns `None` when `idx` is out of range or when that position holds
    /// the cursor placeholder, which has no bytes in the buffer.
    #[cfg(feature = "cluster")]
    pub(crate) fn arg_idx(&self, idx: usize) -> Option<&[u8]> {
        let end = match *self.args.get(idx)? {
            Arg::Simple(end) => end,
            Arg::Cursor => return None,
        };

        // The argument starts where the nearest preceding simple argument
        // ends. Cursor placeholders are skipped because they take no space in
        // `data`; treating one as offset 0 would return the wrong slice.
        let start = self.args[..idx]
            .iter()
            .rev()
            .find_map(|arg| match *arg {
                Arg::Simple(prev_end) => Some(prev_end),
                Arg::Cursor => None,
            })
            .unwrap_or(0);

        Some(&self.data[start..end])
    }
}
/// Creates a new command whose first argument is `name`.
pub fn cmd(name: &str) -> Cmd {
    let mut command = Cmd::new();
    command.arg(name);
    command
}
/// Encodes a list of raw byte arguments into a single command buffer.
///
/// Every element of `args` becomes one simple argument; no cursor is involved.
pub fn pack_command(args: &[Vec<u8>]) -> Vec<u8> {
    let simple_args = args.iter().map(|raw| Arg::Simple(raw.as_slice()));
    encode_command(simple_args, 0)
}
/// Shortcut for creating a new, empty `Pipeline` via `Pipeline::new`.
pub fn pipe() -> Pipeline {
Pipeline::new()
}
#[cfg(test)]
#[cfg(feature = "cluster")]
mod tests {
    use super::Cmd;

    /// Asserts `arg_idx` for every `(index, expected)` pair, in order.
    fn check(command: &Cmd, cases: &[(usize, Option<&[u8]>)]) {
        for &(idx, expected) in cases {
            assert_eq!(command.arg_idx(idx), expected);
        }
    }

    #[test]
    fn test_cmd_arg_idx() {
        let mut command = Cmd::new();
        // An empty command has no arguments at all.
        check(&command, &[(0, None)]);

        command.arg("SET");
        check(&command, &[(0, Some(&b"SET"[..])), (1, None)]);

        command.arg("foo").arg("42");
        check(
            &command,
            &[
                (1, Some(&b"foo"[..])),
                (2, Some(&b"42"[..])),
                (3, None),
                (4, None),
            ],
        );
    }
}