use std::io;
use kevy_resp::Reply;
use kevy_resp_client::RespClient;
use crate::{Connection, string, unexpected, vec2, vec3};
/// Handle for an open `MULTI` transaction on a remote connection.
///
/// Created by `Connection::multi` once the server has answered `MULTI` with
/// `OK`. Commands sent through the handle are expected to be acknowledged
/// with `QUEUED`; the transaction is finished by `exec`, `exec_watched` or
/// `discard`, each of which consumes the handle.
pub struct Transaction<'a> {
/// Client every transaction command is sent through; mutably borrowed for
/// as long as the handle exists.
client: &'a mut RespClient,
/// `true` from creation until `exec`, `exec_watched` or `discard` is
/// called. If still `true` when the handle is dropped, the `Drop` impl
/// sends `DISCARD`.
live: bool,
}
/// Debug output shows only the `live` flag; the borrowed client is omitted
/// and the struct is rendered as non-exhaustive.
impl std::fmt::Debug for Transaction<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("Transaction");
        builder.field("live", &self.live);
        builder.finish_non_exhaustive()
    }
}
impl Connection {
pub fn multi(&mut self) -> io::Result<Transaction<'_>> {
match self {
Self::Embedded(_) => Err(io::Error::new(
io::ErrorKind::Unsupported,
"MULTI/EXEC is not implemented for the embedded backend; \
call Connection methods directly (each is atomic on its own lock)",
)),
Self::Remote(client) => match client.request(&[b"MULTI".to_vec()])? {
Reply::Simple(s) if s == b"OK" => Ok(Transaction {
client,
live: true,
}),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
},
}
}
pub fn watch(&mut self, keys: &[&[u8]]) -> io::Result<()> {
if keys.is_empty() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"WATCH needs at least one key",
));
}
match self {
Self::Embedded(_) => Err(io::Error::new(
io::ErrorKind::Unsupported,
"WATCH is a transaction primitive; embedded backend has no MULTI",
)),
Self::Remote(c) => {
let mut args = Vec::with_capacity(keys.len() + 1);
args.push(b"WATCH".to_vec());
args.extend(keys.iter().map(|k| k.to_vec()));
match c.request(&args)? {
Reply::Simple(s) if s == b"OK" => Ok(()),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
}
}
}
}
pub fn unwatch(&mut self) -> io::Result<()> {
match self {
Self::Embedded(_) => Err(io::Error::new(
io::ErrorKind::Unsupported,
"UNWATCH is a transaction primitive; embedded backend has no MULTI",
)),
Self::Remote(c) => match c.request(&[b"UNWATCH".to_vec()])? {
Reply::Simple(s) if s == b"OK" => Ok(()),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
},
}
}
}
impl<'a> Transaction<'a> {
pub fn queue(&mut self, parts: &[&[u8]]) -> io::Result<()> {
if parts.is_empty() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Transaction::queue needs at least a verb",
));
}
let argv: Vec<Vec<u8>> = parts.iter().map(|p| p.to_vec()).collect();
self.queue_argv(argv)
}
pub fn exec(mut self) -> io::Result<Vec<Reply>> {
self.live = false;
match self.client.request(&[b"EXEC".to_vec()])? {
Reply::Array(items) => Ok(items),
Reply::Nil => Ok(Vec::new()),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
}
}
pub fn exec_watched(mut self) -> io::Result<Option<Vec<Reply>>> {
self.live = false;
match self.client.request(&[b"EXEC".to_vec()])? {
Reply::Array(items) => Ok(Some(items)),
Reply::Nil => Ok(None),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
}
}
pub fn discard(mut self) -> io::Result<()> {
self.live = false;
match self.client.request(&[b"DISCARD".to_vec()])? {
Reply::Simple(s) if s == b"OK" => Ok(()),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
}
}
}
/// Typed command builders.
///
/// Every builder sends its command through `queue_argv` and, on success,
/// returns `&mut Self` so calls can be chained with `?`.
impl Transaction<'_> {
    /// Queues `SET key value`.
    ///
    /// # Errors
    ///
    /// Whatever `queue_argv` returns.
    pub fn set(&mut self, key: &[u8], value: &[u8]) -> io::Result<&mut Self> {
        self.queue_argv(vec3(b"SET", key, value))?;
        Ok(self)
    }

    /// Queues `GET key`.
    ///
    /// # Errors
    ///
    /// Whatever `queue_argv` returns.
    pub fn get(&mut self, key: &[u8]) -> io::Result<&mut Self> {
        self.queue_argv(vec2(b"GET", key))?;
        Ok(self)
    }

    /// Queues `DEL key [key ...]`.
    ///
    /// # Errors
    ///
    /// `ErrorKind::InvalidInput` when `keys` is empty; otherwise whatever
    /// `queue_argv` returns.
    pub fn del(&mut self, keys: &[&[u8]]) -> io::Result<&mut Self> {
        self.queue_keyed(b"DEL", keys, "del")
    }

    /// Queues `EXISTS key [key ...]`.
    ///
    /// # Errors
    ///
    /// `ErrorKind::InvalidInput` when `keys` is empty; otherwise whatever
    /// `queue_argv` returns.
    pub fn exists(&mut self, keys: &[&[u8]]) -> io::Result<&mut Self> {
        self.queue_keyed(b"EXISTS", keys, "exists")
    }

    /// Queues `INCR key`.
    ///
    /// # Errors
    ///
    /// Whatever `queue_argv` returns.
    pub fn incr(&mut self, key: &[u8]) -> io::Result<&mut Self> {
        self.queue_argv(vec2(b"INCR", key))?;
        Ok(self)
    }

    /// Queues `INCRBY key delta`, with `delta` sent as its decimal text form.
    ///
    /// # Errors
    ///
    /// Whatever `queue_argv` returns.
    pub fn incr_by(&mut self, key: &[u8], delta: i64) -> io::Result<&mut Self> {
        let args = vec![
            b"INCRBY".to_vec(),
            key.to_vec(),
            delta.to_string().into_bytes(),
        ];
        self.queue_argv(args)?;
        Ok(self)
    }

    /// Queues `MGET key [key ...]`.
    ///
    /// # Errors
    ///
    /// `ErrorKind::InvalidInput` when `keys` is empty; otherwise whatever
    /// `queue_argv` returns.
    pub fn mget(&mut self, keys: &[&[u8]]) -> io::Result<&mut Self> {
        self.queue_keyed(b"MGET", keys, "mget")
    }

    /// Queues `MSET key value [key value ...]`, preserving the order of
    /// `pairs`.
    ///
    /// # Errors
    ///
    /// `ErrorKind::InvalidInput` when `pairs` is empty; otherwise whatever
    /// `queue_argv` returns.
    pub fn mset(&mut self, pairs: &[(&[u8], &[u8])]) -> io::Result<&mut Self> {
        if pairs.is_empty() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "Transaction::mset needs at least one (key, value) pair",
            ));
        }
        // One slot for the verb plus two per pair.
        let mut args = Vec::with_capacity(pairs.len() * 2 + 1);
        args.push(b"MSET".to_vec());
        for (k, v) in pairs {
            args.push(k.to_vec());
            args.push(v.to_vec());
        }
        self.queue_argv(args)?;
        Ok(self)
    }

    /// Shared body of the `VERB key [key ...]` builders (`del`, `exists`,
    /// `mget`): rejects an empty key list, then queues the verb followed by
    /// an owned copy of every key.
    ///
    /// `method` is the public method name and is used only in the
    /// `InvalidInput` error message.
    fn queue_keyed(
        &mut self,
        verb: &[u8],
        keys: &[&[u8]],
        method: &str,
    ) -> io::Result<&mut Self> {
        if keys.is_empty() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("Transaction::{method} needs at least one key"),
            ));
        }
        let mut args = Vec::with_capacity(keys.len() + 1);
        args.push(verb.to_vec());
        args.extend(keys.iter().map(|k| k.to_vec()));
        self.queue_argv(args)?;
        Ok(self)
    }

    /// Sends one command inside the open transaction and checks that the
    /// server acknowledged it with `QUEUED`.
    ///
    /// # Errors
    ///
    /// Request errors, a server error reply (wrapped with
    /// `io::Error::other`), or the error built by `unexpected` for any other
    /// reply.
    fn queue_argv(&mut self, argv: Vec<Vec<u8>>) -> io::Result<()> {
        match self.client.request(&argv)? {
            Reply::Simple(s) if s == b"QUEUED" => Ok(()),
            Reply::Error(e) => Err(io::Error::other(string(e))),
            other => Err(unexpected(other)),
        }
    }
}
/// Sends `DISCARD` for a transaction that was dropped without being
/// finished by `exec`, `exec_watched` or `discard`.
impl Drop for Transaction<'_> {
    fn drop(&mut self) {
        // Already finished explicitly: nothing to do.
        if !self.live {
            return;
        }
        // Best effort: `drop` cannot report failure, so the result is ignored.
        let _ = self.client.request(&[b"DISCARD".to_vec()]);
    }
}