#![forbid(unsafe_code)]
#![warn(missing_docs)]
use std::io;
use std::time::Duration;
use kevy_embedded::Store;
use kevy_resp::Reply;
use kevy_resp_client::RespClient;
mod collections;
mod scan;
mod subscribe;
mod transaction;
mod url;
pub use subscribe::{PubsubEvent, Subscriber, SubscriberEvents, SubscriberMessages};
pub use transaction::{Transaction, TransactionReplies};
pub(crate) use url::{Target, parse_url, resolve_store};
/// A key-value connection backed either by an in-process store or by a
/// remote server reached through a RESP client.
///
/// Values are created with [`Connection::open`]; the methods in this file
/// dispatch on the variant for every operation.
pub enum Connection {
/// In-process backend: operations call directly into the wrapped [`Store`].
Embedded(Store),
/// Network backend: operations are sent as commands through the wrapped
/// [`RespClient`] and the reply is decoded.
Remote(RespClient),
}
impl Connection {
/// Opens a connection to the backend described by `url`.
///
/// The URL is parsed with `parse_url`. A `Target::Remote` result builds a
/// [`RespClient`] from the remote URL; every other target is handed to
/// `resolve_store` and served by an embedded [`Store`].
///
/// # Errors
///
/// Propagates any error from parsing the URL, creating the client, or
/// resolving the store.
pub fn open(url: &str) -> io::Result<Self> {
    let connection = match parse_url(url)? {
        Target::Remote(remote_url) => Self::Remote(RespClient::from_url(&remote_url)?),
        embed => Self::Embedded(resolve_store(&embed)?),
    };
    Ok(connection)
}
pub fn ping(&mut self) -> io::Result<()> {
match self {
Self::Embedded(_) => Ok(()),
Self::Remote(c) => match c.request(&[b"PING".to_vec()])? {
Reply::Simple(s) if s == b"PONG" => Ok(()),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
},
}
}
pub fn set(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> {
match self {
Self::Embedded(s) => s.set(key, value).map(|_| ()),
Self::Remote(c) => match c.request(&vec3(b"SET", key, value))? {
Reply::Simple(s) if s == b"OK" => Ok(()),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
},
}
}
pub fn get(&mut self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
match self {
Self::Embedded(s) => s.get(key),
Self::Remote(c) => match c.request(&vec2(b"GET", key))? {
Reply::Bulk(v) => Ok(Some(v)),
Reply::Nil => Ok(None),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
},
}
}
pub fn del(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
match self {
Self::Embedded(s) => s.del(keys),
Self::Remote(c) => {
let mut args = Vec::with_capacity(keys.len() + 1);
args.push(b"DEL".to_vec());
args.extend(keys.iter().map(|k| k.to_vec()));
match c.request(&args)? {
Reply::Int(n) if n >= 0 => Ok(n as usize),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
}
}
}
}
pub fn exists(&mut self, keys: &[&[u8]]) -> io::Result<usize> {
match self {
Self::Embedded(s) => s.exists(keys),
Self::Remote(c) => {
let mut args = Vec::with_capacity(keys.len() + 1);
args.push(b"EXISTS".to_vec());
args.extend(keys.iter().map(|k| k.to_vec()));
match c.request(&args)? {
Reply::Int(n) if n >= 0 => Ok(n as usize),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
}
}
}
}
pub fn incr(&mut self, key: &[u8]) -> io::Result<i64> {
match self {
Self::Embedded(s) => s.incr(key),
Self::Remote(c) => match c.request(&vec2(b"INCR", key))? {
Reply::Int(n) => Ok(n),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
},
}
}
pub fn incr_by(&mut self, key: &[u8], delta: i64) -> io::Result<i64> {
match self {
Self::Embedded(s) => s.incr_by(key, delta),
Self::Remote(c) => {
let args = vec![
b"INCRBY".to_vec(),
key.to_vec(),
delta.to_string().into_bytes(),
];
match c.request(&args)? {
Reply::Int(n) => Ok(n),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
}
}
}
}
pub fn expire(&mut self, key: &[u8], ttl: Duration) -> io::Result<bool> {
match self {
Self::Embedded(s) => s.expire(key, ttl),
Self::Remote(c) => {
let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
let args = vec![b"PEXPIRE".to_vec(), key.to_vec(), ms.to_string().into_bytes()];
match c.request(&args)? {
Reply::Int(1) => Ok(true),
Reply::Int(0) => Ok(false),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
}
}
}
}
pub fn persist(&mut self, key: &[u8]) -> io::Result<bool> {
match self {
Self::Embedded(s) => s.persist(key),
Self::Remote(c) => match c.request(&vec2(b"PERSIST", key))? {
Reply::Int(1) => Ok(true),
Reply::Int(0) => Ok(false),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
},
}
}
pub fn ttl_ms(&mut self, key: &[u8]) -> io::Result<i64> {
match self {
Self::Embedded(s) => Ok(s.ttl_ms(key)),
Self::Remote(c) => match c.request(&vec2(b"PTTL", key))? {
Reply::Int(n) => Ok(n),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
},
}
}
pub fn type_of(&mut self, key: &[u8]) -> io::Result<String> {
match self {
Self::Embedded(s) => Ok(s.type_of(key).to_string()),
Self::Remote(c) => match c.request(&vec2(b"TYPE", key))? {
Reply::Simple(s) => Ok(string(s)),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
},
}
}
pub fn dbsize(&mut self) -> io::Result<usize> {
match self {
Self::Embedded(s) => Ok(s.dbsize()),
Self::Remote(c) => match c.request(&[b"DBSIZE".to_vec()])? {
Reply::Int(n) if n >= 0 => Ok(n as usize),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
},
}
}
pub fn flush(&mut self) -> io::Result<()> {
match self {
Self::Embedded(s) => s.flush(),
Self::Remote(c) => match c.request(&[b"FLUSHDB".to_vec()])? {
Reply::Simple(s) if s == b"OK" => Ok(()),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
},
}
}
pub fn set_with_ttl(&mut self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<()> {
match self {
Self::Embedded(s) => s.set_with_ttl(key, value, ttl).map(|_| ()),
Self::Remote(c) => {
let ms = ttl.as_millis().min(i64::MAX as u128) as i64;
let args = vec![
b"SET".to_vec(),
key.to_vec(),
value.to_vec(),
b"PX".to_vec(),
ms.to_string().into_bytes(),
];
match c.request(&args)? {
Reply::Simple(s) if s == b"OK" => Ok(()),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
}
}
}
}
pub fn mget(&mut self, keys: &[&[u8]]) -> io::Result<Vec<Option<Vec<u8>>>> {
match self {
Self::Embedded(s) => keys.iter().map(|k| s.get(k)).collect(),
Self::Remote(c) => {
let mut args = Vec::with_capacity(keys.len() + 1);
args.push(b"MGET".to_vec());
args.extend(keys.iter().map(|k| k.to_vec()));
match c.request(&args)? {
Reply::Array(items) => items
.into_iter()
.map(|r| match r {
Reply::Bulk(v) => Ok(Some(v)),
Reply::Nil => Ok(None),
other => Err(unexpected(other)),
})
.collect(),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
}
}
}
}
pub fn mset(&mut self, pairs: &[(&[u8], &[u8])]) -> io::Result<()> {
match self {
Self::Embedded(s) => {
for (k, v) in pairs {
s.set(k, v)?;
}
Ok(())
}
Self::Remote(c) => {
let mut args = Vec::with_capacity(pairs.len() * 2 + 1);
args.push(b"MSET".to_vec());
for (k, v) in pairs {
args.push(k.to_vec());
args.push(v.to_vec());
}
match c.request(&args)? {
Reply::Simple(s) if s == b"OK" => Ok(()),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
}
}
}
}
pub fn publish(&mut self, channel: &[u8], message: &[u8]) -> io::Result<usize> {
match self {
Self::Embedded(s) => Ok(s.publish(channel, message)),
Self::Remote(c) => match c.request(&vec3(b"PUBLISH", channel, message))? {
Reply::Int(n) if n >= 0 => Ok(n as usize),
Reply::Error(e) => Err(io::Error::other(string(e))),
other => Err(unexpected(other)),
},
}
}
}
/// Builds a two-element command: the verb followed by one argument, each
/// copied into its own owned buffer.
pub(crate) fn vec2(verb: &[u8], a: &[u8]) -> Vec<Vec<u8>> {
    [verb, a].iter().map(|part| part.to_vec()).collect()
}
/// Builds a three-element command: the verb followed by two arguments, each
/// copied into its own owned buffer.
pub(crate) fn vec3(verb: &[u8], a: &[u8], b: &[u8]) -> Vec<Vec<u8>> {
    [verb, a, b].iter().map(|part| part.to_vec()).collect()
}
/// Converts raw bytes to a `String`, replacing invalid UTF-8 sequences with
/// U+FFFD.
pub(crate) fn string(b: Vec<u8>) -> String {
    match String::from_utf8(b) {
        // Valid UTF-8: the buffer becomes the string as-is.
        Ok(text) => text,
        // Invalid UTF-8: fall back to the lossy decoding of the same bytes.
        Err(invalid) => String::from_utf8_lossy(invalid.as_bytes()).into_owned(),
    }
}
/// Builds the error returned when a reply has a variant the caller did not
/// expect; the message names the variant and drops its payload.
pub(crate) fn unexpected(r: Reply) -> io::Error {
    let kind = match r {
        Reply::Nil => "nil",
        Reply::Int(_) => "integer",
        Reply::Simple(_) => "simple-string",
        Reply::Bulk(_) => "bulk-string",
        Reply::Array(_) => "array",
        Reply::Error(_) => "error",
    };
    let message = format!("unexpected RESP reply variant: {kind}");
    io::Error::other(message)
}
/// Flattens the elements of an array reply into raw byte buffers.
///
/// Bulk and simple strings contribute their bytes; a nil element becomes an
/// empty buffer, so nil and an empty string are indistinguishable in the
/// result.
///
/// # Errors
///
/// Fails with an "unexpected reply" error at the first element that is not
/// a bulk string, simple string, or nil.
pub(crate) fn array_to_bulks(items: Vec<Reply>) -> io::Result<Vec<Vec<u8>>> {
    let mut bulks = Vec::with_capacity(items.len());
    for item in items {
        let bytes = match item {
            Reply::Bulk(bytes) | Reply::Simple(bytes) => bytes,
            Reply::Nil => Vec::new(),
            other => return Err(unexpected(other)),
        };
        bulks.push(bytes);
    }
    Ok(bulks)
}
/// Wraps an embedded-store error in an `io::Error`, using the error's
/// `Debug` rendering prefixed with `kevy-store: ` as the message.
pub(crate) fn store_err(e: kevy_embedded::StoreError) -> io::Error {
    let message = format!("kevy-store: {e:?}");
    io::Error::other(message)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a connection on the `mem://` URL used by every test here.
    fn open_mem() -> Connection {
        Connection::open("mem://").unwrap()
    }

    /// Walks the key/value, counter, TTL, and maintenance calls against an
    /// embedded in-memory connection.
    #[test]
    fn embedded_mem_full_crud_round_trip() {
        let mut conn = open_mem();
        conn.ping().unwrap();

        // Plain set / get / del.
        conn.set(b"k", b"v").unwrap();
        let stored = conn.get(b"k").unwrap();
        assert_eq!(stored, Some(b"v".to_vec()));
        let removed = conn.del(&[&b"k"[..], &b"missing"[..]]).unwrap();
        assert_eq!(removed, 1);
        let after_delete = conn.get(b"k").unwrap();
        assert_eq!(after_delete, None);

        // Existence counting only counts keys that are present.
        conn.set(b"a", b"1").unwrap();
        conn.set(b"b", b"2").unwrap();
        let present = conn
            .exists(&[&b"a"[..], &b"b"[..], &b"none"[..]])
            .unwrap();
        assert_eq!(present, 2);

        // Counters.
        let first = conn.incr(b"counter").unwrap();
        assert_eq!(first, 1);
        let bumped = conn.incr_by(b"counter", 9).unwrap();
        assert_eq!(bumped, 10);

        // Expiry can be set, observed, and removed again.
        conn.set(b"timed", b"x").unwrap();
        let applied = conn.expire(b"timed", Duration::from_secs(60)).unwrap();
        assert!(applied);
        let ttl = conn.ttl_ms(b"timed").unwrap();
        let window = 0..=60_000;
        assert!(window.contains(&ttl), "ttl_ms = {ttl}");
        let cleared = conn.persist(b"timed").unwrap();
        assert!(cleared);
        let without_ttl = conn.ttl_ms(b"timed").unwrap();
        assert_eq!(without_ttl, -1);

        // Type names.
        let missing_type = conn.type_of(b"none").unwrap();
        assert_eq!(missing_type, "none");
        let string_type = conn.type_of(b"timed").unwrap();
        assert_eq!(string_type, "string");

        // Size before and after a flush.
        let size_before = conn.dbsize().unwrap();
        assert!(size_before >= 3);
        conn.flush().unwrap();
        let size_after = conn.dbsize().unwrap();
        assert_eq!(size_after, 0);

        // Set with a TTL in a single call.
        conn.set_with_ttl(b"timed2", b"x", Duration::from_secs(60))
            .unwrap();
        let ttl = conn.ttl_ms(b"timed2").unwrap();
        let window = 0..=60_000;
        assert!(window.contains(&ttl));
    }

    /// Publishing on a `mem://` connection reports zero.
    #[test]
    fn anonymous_mem_publish_returns_zero() {
        let mut conn = open_mem();
        let delivered = conn.publish(b"chan", b"hi").unwrap();
        assert_eq!(delivered, 0);
    }

    /// `mset` followed by `mget` returns values in key order, with `None`
    /// for a key that was never written.
    #[test]
    fn embedded_mget_mset() {
        let mut conn = open_mem();
        let pairs = [(&b"a"[..], &b"1"[..]), (&b"b"[..], &b"2"[..])];
        conn.mset(&pairs).unwrap();

        let fetched = conn
            .mget(&[&b"a"[..], &b"b"[..], &b"missing"[..]])
            .unwrap();
        let expected = vec![Some(b"1".to_vec()), Some(b"2".to_vec()), None];
        assert_eq!(fetched, expected);
    }

    /// `multi` on an embedded connection fails with `Unsupported`.
    #[test]
    fn embedded_multi_rejected_unsupported() {
        let mut conn = open_mem();
        let failure = conn.multi().unwrap_err();
        assert_eq!(failure.kind(), io::ErrorKind::Unsupported);
    }
}