use std::io;
use std::time::Duration;
use kevy_store::StoreError;
use crate::pubsub::Subscription;
use crate::store::{Store, commit_write, store_err};
impl Store {
pub fn set(&self, key: &[u8], value: &[u8]) -> io::Result<bool> {
let mut g = self.lock();
let ok = g.store.set(key, value.to_vec(), None, false, false);
commit_write(&mut g, &[b"SET", key, value])?;
Ok(ok)
}
pub fn set_with_ttl(&self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<bool> {
let mut g = self.lock();
let ok = g.store.set(key, value.to_vec(), Some(ttl), false, false);
let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
commit_write(&mut g, &[b"SET", key, value])?;
commit_write(&mut g, &[b"PEXPIRE", key, ms.to_string().as_bytes()])?;
Ok(ok)
}
pub fn get(&self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
let mut g = self.lock();
Ok(g.store.get(key).map_err(store_err)?.map(|v| v.to_vec()))
}
pub fn del(&self, keys: &[&[u8]]) -> io::Result<usize> {
let mut g = self.lock();
let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
let n = g.store.del(&owned);
if n > 0 {
let mut parts: Vec<&[u8]> = Vec::with_capacity(keys.len() + 1);
parts.push(b"DEL");
for k in keys {
parts.push(k);
}
commit_write(&mut g, &parts)?;
}
Ok(n)
}
pub fn exists(&self, keys: &[&[u8]]) -> io::Result<usize> {
let mut g = self.lock();
let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
Ok(g.store.exists(&owned))
}
pub fn incr(&self, key: &[u8]) -> io::Result<i64> {
self.incr_by(key, 1)
}
pub fn incr_by(&self, key: &[u8], delta: i64) -> io::Result<i64> {
let mut g = self.lock();
let n = g.store.incr_by(key, delta).map_err(store_err)?;
commit_write(&mut g, &[b"INCRBY", key, delta.to_string().as_bytes()])?;
Ok(n)
}
pub fn expire(&self, key: &[u8], ttl: Duration) -> io::Result<bool> {
let mut g = self.lock();
let touched = g.store.expire(key, ttl);
if touched {
let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
commit_write(&mut g, &[b"PEXPIRE", key, ms.to_string().as_bytes()])?;
}
Ok(touched)
}
pub fn persist(&self, key: &[u8]) -> io::Result<bool> {
let mut g = self.lock();
let touched = g.store.persist(key);
if touched {
commit_write(&mut g, &[b"PERSIST", key])?;
}
Ok(touched)
}
pub fn ttl_ms(&self, key: &[u8]) -> i64 {
self.lock().store.pttl(key)
}
pub fn type_of(&self, key: &[u8]) -> &'static str {
self.lock().store.type_of(key)
}
pub fn dbsize(&self) -> usize {
self.lock().store.dbsize()
}
pub fn flush(&self) -> io::Result<()> {
let mut g = self.lock();
g.store.flush();
commit_write(&mut g, &[b"FLUSHALL"])?;
Ok(())
}
pub fn key_bytes(&self, key: &[u8]) -> Option<u64> {
self.lock().store.estimate_key_bytes(key)
}
pub fn used_memory(&self) -> u64 {
self.lock().store.used_memory()
}
pub fn evictions_total(&self) -> u64 {
self.lock().store.evictions_total()
}
pub fn expired_keys_total(&self) -> u64 {
self.lock().store.expired_keys_total()
}
pub fn hset(&self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> io::Result<usize> {
let mut g = self.lock();
let owned: Vec<(Vec<u8>, Vec<u8>)> =
pairs.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect();
let added = g.store.hset(key, &owned).map_err(store_err)?;
let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
parts.push(b"HSET");
parts.push(key);
for (f, v) in pairs {
parts.push(f);
parts.push(v);
}
commit_write(&mut g, &parts)?;
Ok(added)
}
pub fn hget(&self, key: &[u8], field: &[u8]) -> io::Result<Option<Vec<u8>>> {
let mut g = self.lock();
Ok(g.store
.hget(key, field)
.map_err(store_err)?
.map(|v| v.to_vec()))
}
pub fn hdel(&self, key: &[u8], fields: &[&[u8]]) -> io::Result<usize> {
let mut g = self.lock();
let owned: Vec<Vec<u8>> = fields.iter().map(|f| f.to_vec()).collect();
let removed = g.store.hdel(key, &owned).map_err(store_err)?;
if removed > 0 {
let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + fields.len());
parts.push(b"HDEL");
parts.push(key);
for f in fields {
parts.push(f);
}
commit_write(&mut g, &parts)?;
}
Ok(removed)
}
pub fn lpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
push_helper(self, key, values, b"LPUSH", |s, k, vs| s.lpush(k, vs))
}
pub fn rpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
push_helper(self, key, values, b"RPUSH", |s, k, vs| s.rpush(k, vs))
}
pub fn lpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
pop_helper(self, key, count, false)
}
pub fn rpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
pop_helper(self, key, count, true)
}
pub fn llen(&self, key: &[u8]) -> io::Result<usize> {
self.lock().store.llen(key).map_err(store_err)
}
pub fn sadd(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
push_helper(self, key, members, b"SADD", |s, k, ms| s.sadd(k, ms))
}
pub fn srem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
let mut g = self.lock();
let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
let removed = g.store.srem(key, &owned).map_err(store_err)?;
if removed > 0 {
let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
parts.push(b"SREM");
parts.push(key);
for m in members {
parts.push(m);
}
commit_write(&mut g, &parts)?;
}
Ok(removed)
}
pub fn smembers(&self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
self.lock().store.smembers(key).map_err(store_err)
}
pub fn scard(&self, key: &[u8]) -> io::Result<usize> {
self.lock().store.scard(key).map_err(store_err)
}
pub fn zadd(&self, key: &[u8], pairs: &[(f64, &[u8])]) -> io::Result<usize> {
let mut g = self.lock();
let owned: Vec<(f64, Vec<u8>)> =
pairs.iter().map(|(s, m)| (*s, m.to_vec())).collect();
let added = g.store.zadd(key, &owned).map_err(store_err)?;
let mut score_strs: Vec<Vec<u8>> = Vec::with_capacity(pairs.len());
for (s, _) in pairs {
score_strs.push(format!("{s}").into_bytes());
}
let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
parts.push(b"ZADD");
parts.push(key);
for (i, (_, m)) in pairs.iter().enumerate() {
parts.push(&score_strs[i]);
parts.push(m);
}
commit_write(&mut g, &parts)?;
Ok(added)
}
pub fn zrem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
let mut g = self.lock();
let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
let removed = g.store.zrem(key, &owned).map_err(store_err)?;
if removed > 0 {
let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
parts.push(b"ZREM");
parts.push(key);
for m in members {
parts.push(m);
}
commit_write(&mut g, &parts)?;
}
Ok(removed)
}
pub fn zscore(&self, key: &[u8], member: &[u8]) -> io::Result<Option<f64>> {
self.lock().store.zscore(key, member).map_err(store_err)
}
pub fn zcard(&self, key: &[u8]) -> io::Result<usize> {
self.lock().store.zcard(key).map_err(store_err)
}
pub fn publish(&self, channel: &[u8], payload: &[u8]) -> usize {
let plans = {
let g = self.lock();
g.bus.collect_delivery(channel, payload)
};
let mut count = 0;
for (frame, sender) in plans {
if sender.send(frame).is_ok() {
count += 1;
}
}
count
}
pub fn subscribe(&self, channels: &[&[u8]]) -> Subscription {
let mut sub = Subscription::new(self.inner_handle(), self.guard_handle());
if !channels.is_empty() {
sub.subscribe(channels);
}
sub
}
pub fn psubscribe(&self, patterns: &[&[u8]]) -> Subscription {
let mut sub = Subscription::new(self.inner_handle(), self.guard_handle());
if !patterns.is_empty() {
sub.psubscribe(patterns);
}
sub
}
}
fn push_helper<F>(
s: &Store,
key: &[u8],
values: &[&[u8]],
verb: &'static [u8],
op: F,
) -> io::Result<usize>
where
F: FnOnce(&mut kevy_store::Store, &[u8], &[Vec<u8>]) -> Result<usize, StoreError>,
{
let mut g = s.lock();
let owned: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
let n = op(&mut g.store, key, &owned).map_err(store_err)?;
let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + values.len());
parts.push(verb);
parts.push(key);
for v in values {
parts.push(v);
}
commit_write(&mut g, &parts)?;
Ok(n)
}
fn pop_helper(s: &Store, key: &[u8], count: usize, from_tail: bool) -> io::Result<Vec<Vec<u8>>> {
let mut g = s.lock();
let popped = if from_tail {
g.store.rpop(key, count).map_err(store_err)?
} else {
g.store.lpop(key, count).map_err(store_err)?
};
if !popped.is_empty() {
let verb: &[u8] = if from_tail { b"RPOP" } else { b"LPOP" };
let count_str = popped.len().to_string();
let parts: [&[u8]; 3] = [verb, key, count_str.as_bytes()];
commit_write(&mut g, &parts)?;
}
Ok(popped)
}