use std::io;
use kevy_resp::Reply;
use kevy_resp_client::RespClient;
use crate::{Connection, string, unexpected, vec2, vec3};
/// Handle to an open `MULTI` block on a remote connection.
///
/// Returned by `Connection::multi` once the server has answered `OK`.
/// Commands are queued through this handle and the block is finished with
/// one of the `exec*` methods or with `discard`. If the handle is dropped
/// while `live` is still set, the `Drop` impl sends `DISCARD` and ignores
/// the outcome.
pub struct Transaction<'a> {
// Client that queued commands and the final `EXEC`/`DISCARD` are sent through;
// mutably borrowed for as long as the handle exists.
client: &'a mut RespClient,
// Cleared by the `exec*` methods and by `discard` before they send their
// command; while still set, `Drop` issues a `DISCARD`.
live: bool,
}
/// Debug output exposes only the `live` flag; the borrowed client is elided
/// and the struct is rendered as non-exhaustive (`..`).
impl std::fmt::Debug for Transaction<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Transaction");
        builder.field("live", &self.live);
        builder.finish_non_exhaustive()
    }
}
impl Connection {
    /// Opens a transaction by sending `MULTI` to the remote server.
    ///
    /// On an `OK` reply the returned [`Transaction`] mutably borrows the
    /// remote client until it is executed, discarded or dropped.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::Unsupported` when called on the embedded backend.
    /// * The server's error text (via `io::Error::other`) on an error reply.
    /// * Whatever `unexpected` builds for any other reply.
    /// * Any I/O error raised by the underlying request.
    pub fn multi(&mut self) -> io::Result<Transaction<'_>> {
        let Self::Remote(client) = self else {
            return Err(io::Error::new(
                io::ErrorKind::Unsupported,
                "MULTI/EXEC is not implemented for the embedded backend; \
                 call Connection methods directly (each is atomic on its own lock)",
            ));
        };

        let reply = client.request(&[b"MULTI".to_vec()])?;
        match reply {
            Reply::Simple(s) if s == b"OK" => Ok(Transaction { client, live: true }),
            Reply::Error(e) => Err(io::Error::other(string(e))),
            other => Err(unexpected(other)),
        }
    }

    /// Sends `WATCH key [key ...]` to the remote server.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::InvalidInput` when `keys` is empty (checked before the
    ///   backend is inspected).
    /// * `ErrorKind::Unsupported` when called on the embedded backend.
    /// * The server's error text on an error reply, or the `unexpected`
    ///   error for any reply other than `OK`.
    /// * Any I/O error raised by the underlying request.
    pub fn watch(&mut self, keys: &[&[u8]]) -> io::Result<()> {
        if keys.is_empty() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "WATCH needs at least one key",
            ));
        }
        let Self::Remote(client) = self else {
            return Err(io::Error::new(
                io::ErrorKind::Unsupported,
                "WATCH is a transaction primitive; embedded backend has no MULTI",
            ));
        };

        // Verb first, then every key as its own owned argument.
        let argv: Vec<Vec<u8>> = std::iter::once(b"WATCH".to_vec())
            .chain(keys.iter().map(|key| key.to_vec()))
            .collect();

        let reply = client.request(&argv)?;
        match reply {
            Reply::Simple(s) if s == b"OK" => Ok(()),
            Reply::Error(e) => Err(io::Error::other(string(e))),
            other => Err(unexpected(other)),
        }
    }

    /// Sends `UNWATCH` to the remote server.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::Unsupported` when called on the embedded backend.
    /// * The server's error text on an error reply, or the `unexpected`
    ///   error for any reply other than `OK`.
    /// * Any I/O error raised by the underlying request.
    pub fn unwatch(&mut self) -> io::Result<()> {
        let Self::Remote(client) = self else {
            return Err(io::Error::new(
                io::ErrorKind::Unsupported,
                "UNWATCH is a transaction primitive; embedded backend has no MULTI",
            ));
        };

        let reply = client.request(&[b"UNWATCH".to_vec()])?;
        match reply {
            Reply::Simple(s) if s == b"OK" => Ok(()),
            Reply::Error(e) => Err(io::Error::other(string(e))),
            other => Err(unexpected(other)),
        }
    }
}
impl<'a> Transaction<'a> {
    /// Queues an arbitrary command given as `[verb, arg, ...]`.
    ///
    /// # Errors
    ///
    /// `ErrorKind::InvalidInput` when `parts` is empty; otherwise whatever
    /// `queue_argv` reports for the server's reply.
    pub fn queue(&mut self, parts: &[&[u8]]) -> io::Result<()> {
        if parts.is_empty() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "Transaction::queue needs at least a verb",
            ));
        }
        self.queue_argv(parts.iter().map(|part| part.to_vec()).collect())
    }

    /// Sends `EXEC` and returns the raw per-command replies.
    ///
    /// A `Nil` reply is reported as an empty vector, so it cannot be told
    /// apart from an empty array reply here; `exec_watched` keeps the two
    /// cases distinct.
    pub fn exec(self) -> io::Result<Vec<Reply>> {
        self.exec_watched()
            .map(|outcome| outcome.unwrap_or_default())
    }

    /// Sends `EXEC`; `Ok(None)` means the server answered `Nil`, `Ok(Some(_))`
    /// carries the array of per-command replies.
    ///
    /// # Errors
    ///
    /// The server's error text on an error reply, the `unexpected` error for
    /// any other reply shape, or the I/O error from the request itself.
    pub fn exec_watched(mut self) -> io::Result<Option<Vec<Reply>>> {
        // Cleared before the round-trip so `Drop` does not send `DISCARD`,
        // whatever the outcome of the request.
        self.live = false;
        let reply = self.client.request(&[b"EXEC".to_vec()])?;
        match reply {
            Reply::Array(items) => Ok(Some(items)),
            Reply::Nil => Ok(None),
            Reply::Error(e) => Err(io::Error::other(string(e))),
            other => Err(unexpected(other)),
        }
    }

    /// Sends `EXEC` and wraps the replies in a [`TransactionReplies`] cursor.
    ///
    /// # Errors
    ///
    /// `ErrorKind::InvalidData` ("transaction aborted by WATCH") on a `Nil`
    /// reply, plus every error `exec_watched` can return.
    pub fn exec_typed(self) -> io::Result<TransactionReplies> {
        match self.exec_watched_typed()? {
            Some(replies) => Ok(replies),
            None => Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "transaction aborted by WATCH",
            )),
        }
    }

    /// Like `exec_watched`, but the replies come back as a
    /// [`TransactionReplies`] cursor; `Ok(None)` still means a `Nil` reply.
    pub fn exec_watched_typed(self) -> io::Result<Option<TransactionReplies>> {
        let outcome = self.exec_watched()?;
        Ok(outcome.map(TransactionReplies::new))
    }

    /// Sends `DISCARD`, abandoning everything queued so far.
    ///
    /// # Errors
    ///
    /// The server's error text on an error reply, the `unexpected` error for
    /// any reply other than `OK`, or the I/O error from the request itself.
    pub fn discard(mut self) -> io::Result<()> {
        // Prevents `Drop` from sending a second `DISCARD`.
        self.live = false;
        let reply = self.client.request(&[b"DISCARD".to_vec()])?;
        match reply {
            Reply::Simple(s) if s == b"OK" => Ok(()),
            Reply::Error(e) => Err(io::Error::other(string(e))),
            other => Err(unexpected(other)),
        }
    }
}
impl<'a> Transaction<'a> {
    /// Queues `SET key value`; returns `self` so calls can be chained.
    pub fn set(&mut self, key: &[u8], value: &[u8]) -> io::Result<&mut Self> {
        self.enqueue(vec3(b"SET", key, value))
    }

    /// Queues `GET key`; returns `self` so calls can be chained.
    pub fn get(&mut self, key: &[u8]) -> io::Result<&mut Self> {
        self.enqueue(vec2(b"GET", key))
    }

    /// Queues `DEL key [key ...]`.
    ///
    /// # Errors
    ///
    /// `ErrorKind::InvalidInput` when `keys` is empty.
    pub fn del(&mut self, keys: &[&[u8]]) -> io::Result<&mut Self> {
        self.enqueue_keyed(b"DEL", keys, "Transaction::del needs at least one key")
    }

    /// Queues `EXISTS key [key ...]`.
    ///
    /// # Errors
    ///
    /// `ErrorKind::InvalidInput` when `keys` is empty.
    pub fn exists(&mut self, keys: &[&[u8]]) -> io::Result<&mut Self> {
        self.enqueue_keyed(
            b"EXISTS",
            keys,
            "Transaction::exists needs at least one key",
        )
    }

    /// Queues `INCR key`; returns `self` so calls can be chained.
    pub fn incr(&mut self, key: &[u8]) -> io::Result<&mut Self> {
        self.enqueue(vec2(b"INCR", key))
    }

    /// Queues `INCRBY key delta`, with `delta` rendered in decimal.
    pub fn incr_by(&mut self, key: &[u8], delta: i64) -> io::Result<&mut Self> {
        let amount = delta.to_string().into_bytes();
        self.enqueue(vec![b"INCRBY".to_vec(), key.to_vec(), amount])
    }

    /// Queues `MGET key [key ...]`.
    ///
    /// # Errors
    ///
    /// `ErrorKind::InvalidInput` when `keys` is empty.
    pub fn mget(&mut self, keys: &[&[u8]]) -> io::Result<&mut Self> {
        self.enqueue_keyed(b"MGET", keys, "Transaction::mget needs at least one key")
    }

    /// Queues `MSET key value [key value ...]`.
    ///
    /// # Errors
    ///
    /// `ErrorKind::InvalidInput` when `pairs` is empty.
    pub fn mset(&mut self, pairs: &[(&[u8], &[u8])]) -> io::Result<&mut Self> {
        if pairs.is_empty() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "Transaction::mset needs at least one (key, value) pair",
            ));
        }
        // Verb first, then each pair flattened to `key, value` in input order.
        let argv: Vec<Vec<u8>> = std::iter::once(b"MSET".to_vec())
            .chain(
                pairs
                    .iter()
                    .flat_map(|(key, value)| [key.to_vec(), value.to_vec()]),
            )
            .collect();
        self.enqueue(argv)
    }

    /// Shared body of the multi-key commands: rejects an empty key list with
    /// `empty_msg`, otherwise queues `verb key [key ...]`.
    fn enqueue_keyed(
        &mut self,
        verb: &[u8],
        keys: &[&[u8]],
        empty_msg: &'static str,
    ) -> io::Result<&mut Self> {
        if keys.is_empty() {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, empty_msg));
        }
        let argv: Vec<Vec<u8>> = std::iter::once(verb.to_vec())
            .chain(keys.iter().map(|key| key.to_vec()))
            .collect();
        self.enqueue(argv)
    }

    /// Queues `argv` and hands back `self` for chaining.
    fn enqueue(&mut self, argv: Vec<Vec<u8>>) -> io::Result<&mut Self> {
        self.queue_argv(argv)?;
        Ok(self)
    }

    /// Sends one command and requires the reply to be the simple string
    /// `QUEUED`.
    ///
    /// An error reply is surfaced with the server's text; any other reply is
    /// turned into an error by `unexpected`.
    fn queue_argv(&mut self, argv: Vec<Vec<u8>>) -> io::Result<()> {
        let reply = self.client.request(&argv)?;
        match reply {
            Reply::Simple(s) if s == b"QUEUED" => Ok(()),
            Reply::Error(e) => Err(io::Error::other(string(e))),
            other => Err(unexpected(other)),
        }
    }
}
/// Dropping a transaction that was never executed or discarded sends
/// `DISCARD` on a best-effort basis.
impl Drop for Transaction<'_> {
    fn drop(&mut self) {
        // `exec*` and `discard` clear the flag before they talk to the server,
        // so there is nothing left to do once it is unset.
        if !self.live {
            return;
        }
        // Errors cannot be reported from `drop`; the result is deliberately ignored.
        let _ = self.client.request(&[b"DISCARD".to_vec()]);
    }
}
/// Cursor over the per-command replies of an executed transaction.
///
/// Built from the array returned by `EXEC` (see `Transaction::exec_typed`
/// and `Transaction::exec_watched_typed`). Each `next_*` accessor consumes
/// one reply from the front and checks that it has the expected shape.
#[derive(Debug)]
pub struct TransactionReplies {
// Replies not yet consumed, in the order they appeared in the `EXEC` array.
iter: std::vec::IntoIter<Reply>,
}
impl TransactionReplies {
    /// Wraps the replies of an `EXEC` array in a front-to-back cursor.
    fn new(items: Vec<Reply>) -> Self {
        let iter = items.into_iter();
        Self { iter }
    }

    /// Number of replies that have not been consumed yet.
    pub fn remaining(&self) -> usize {
        self.iter.len()
    }

    /// Succeeds only when every reply has been consumed.
    ///
    /// # Errors
    ///
    /// `ErrorKind::InvalidData` naming the number of leftover replies.
    pub fn expect_empty(&mut self) -> io::Result<()> {
        match self.remaining() {
            0 => Ok(()),
            left => Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("transaction reply cursor has {left} un-consumed replies"),
            )),
        }
    }

    /// Takes the next reply without interpreting it.
    ///
    /// # Errors
    ///
    /// `ErrorKind::InvalidData` ("exhausted") when no replies are left.
    pub fn raw(&mut self) -> io::Result<Reply> {
        match self.iter.next() {
            Some(reply) => Ok(reply),
            None => Err(io::Error::new(io::ErrorKind::InvalidData, "exhausted")),
        }
    }

    /// Consumes the next reply, which must be the simple string `OK`.
    pub fn next_ok(&mut self) -> io::Result<()> {
        self.raw().and_then(|reply| match reply {
            Reply::Simple(s) if s == b"OK" => Ok(()),
            other => Err(mismatch("Simple(OK)", &other)),
        })
    }

    /// Consumes the next reply: `true` for the simple string `OK`, `false`
    /// for `Nil`, an error for anything else.
    pub fn next_ok_or_nil(&mut self) -> io::Result<bool> {
        self.raw().and_then(|reply| match reply {
            Reply::Simple(s) if s == b"OK" => Ok(true),
            Reply::Nil => Ok(false),
            other => Err(mismatch("Simple(OK) or Nil", &other)),
        })
    }

    /// Consumes the next reply, which must be an integer.
    pub fn next_int(&mut self) -> io::Result<i64> {
        self.raw().and_then(|reply| match reply {
            Reply::Int(n) => Ok(n),
            other => Err(mismatch("Int", &other)),
        })
    }

    /// Consumes the next reply: `Some(bytes)` for a bulk string, `None` for
    /// `Nil`, an error for anything else.
    pub fn next_bulk(&mut self) -> io::Result<Option<Vec<u8>>> {
        self.raw().and_then(|reply| match reply {
            Reply::Bulk(b) => Ok(Some(b)),
            Reply::Nil => Ok(None),
            other => Err(mismatch("Bulk or Nil", &other)),
        })
    }

    /// Consumes the next reply as an array whose elements are bulk strings
    /// or `Nil`.
    ///
    /// A top-level `Nil` yields an empty vector. The first element of any
    /// other shape aborts with a mismatch error; the whole reply has been
    /// consumed by then.
    pub fn next_array_of_bulks(&mut self) -> io::Result<Vec<Option<Vec<u8>>>> {
        let elements = match self.raw()? {
            Reply::Array(elements) => elements,
            Reply::Nil => return Ok(Vec::new()),
            other => return Err(mismatch("Array", &other)),
        };

        let mut bulks = Vec::with_capacity(elements.len());
        for element in elements {
            match element {
                Reply::Bulk(bytes) => bulks.push(Some(bytes)),
                Reply::Nil => bulks.push(None),
                other => return Err(mismatch("Array element Bulk/Nil", &other)),
            }
        }
        Ok(bulks)
    }

    /// Consumes the next reply, which must be a simple string, and returns
    /// its bytes.
    pub fn next_simple(&mut self) -> io::Result<Vec<u8>> {
        self.raw().and_then(|reply| match reply {
            Reply::Simple(s) => Ok(s),
            other => Err(mismatch("Simple", &other)),
        })
    }
}
/// Builds the `InvalidData` error reported when a transaction reply does not
/// have the shape the caller asked for.
///
/// `want` is a short description of the expected shape; `got` is the
/// offending reply, rendered with its `Debug` representation.
fn mismatch(want: &str, got: &Reply) -> io::Error {
    let message = format!("transaction reply mismatch: expected {want}, got {got:?}");
    io::Error::new(io::ErrorKind::InvalidData, message)
}