use std::io;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread::JoinHandle;
use std::time::Duration;
use kevy_persist::{Aof, Argv, RewriteStats, load_snapshot, replay_aof, save_snapshot};
use kevy_store::{ExpireStats, StoreError};
use crate::config::{Config, TtlReaperMode};
pub struct Store {
inner: Arc<Mutex<Inner>>,
config: Config,
reaper_stop: Option<Arc<AtomicBool>>,
reaper_join: Option<JoinHandle<()>>,
}
struct Inner {
store: kevy_store::Store,
aof: Option<Aof>,
}
impl Store {
pub fn open(config: Config) -> io::Result<Self> {
let mut store = kevy_store::Store::new();
store.set_max_memory(config.maxmemory, config.eviction_policy);
let aof = if let Some(dir) = &config.data_dir {
std::fs::create_dir_all(dir)?;
let snap_path = dir.join(&config.snapshot_filename);
if snap_path.exists() {
load_snapshot(&mut store, &snap_path)?;
}
let aof_path = dir.join(&config.aof_filename);
if aof_path.exists() {
replay_aof(&aof_path, |args| crate::replay::apply(&mut store, &args))?;
}
if config.aof {
Some(Aof::open(&aof_path, config.appendfsync)?)
} else {
None
}
} else {
None
};
let inner = Arc::new(Mutex::new(Inner { store, aof }));
let (reaper_stop, reaper_join) = match config.ttl_reaper {
TtlReaperMode::Manual => (None, None),
TtlReaperMode::Background => {
let stop = Arc::new(AtomicBool::new(false));
let stop_t = stop.clone();
let inner_t = inner.clone();
let interval = config.reaper_interval;
let samples = config.reaper_samples;
let rounds = config.reaper_max_rounds;
let handle = std::thread::Builder::new()
.name(String::from("kevy-embedded-reaper"))
.spawn(move || reaper_loop(inner_t, stop_t, interval, samples, rounds))?;
(Some(stop), Some(handle))
}
};
Ok(Store {
inner,
config,
reaper_stop,
reaper_join,
})
}
pub fn config(&self) -> &Config {
&self.config
}
pub fn with<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut kevy_store::Store) -> R,
{
let mut g = self.lock();
f(&mut g.store)
}
pub fn log(&self, parts: &[&[u8]]) -> io::Result<()> {
let mut g = self.lock();
if let Some(aof) = &mut g.aof {
let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
aof.append(&argv)?;
}
Ok(())
}
pub fn tick(&self) -> ExpireStats {
let mut g = self.lock();
g.store
.tick_expire(self.config.reaper_samples, self.config.reaper_max_rounds)
}
pub fn rewrite_aof(&self) -> io::Result<Option<RewriteStats>> {
let mut g = self.lock();
let Inner { store, aof } = &mut *g;
let Some(aof) = aof else { return Ok(None) };
Ok(Some(aof.rewrite_from(store)?))
}
pub fn save_snapshot(&self) -> io::Result<bool> {
let g = self.lock();
let Some(dir) = self.config.data_dir.as_ref() else {
return Ok(false);
};
let path: PathBuf = dir.join(&self.config.snapshot_filename);
save_snapshot(&g.store, &path)?;
Ok(true)
}
pub fn set(&self, key: &[u8], value: &[u8]) -> io::Result<bool> {
let mut g = self.lock();
let ok = g.store.set(key, value.to_vec(), None, false, false);
commit_write(&mut g, &[b"SET", key, value])?;
Ok(ok)
}
pub fn set_with_ttl(&self, key: &[u8], value: &[u8], ttl: Duration) -> io::Result<bool> {
let mut g = self.lock();
let ok = g.store.set(key, value.to_vec(), Some(ttl), false, false);
let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
commit_write(&mut g, &[b"SET", key, value])?;
commit_write(&mut g, &[b"PEXPIRE", key, ms.to_string().as_bytes()])?;
Ok(ok)
}
pub fn get(&self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
let mut g = self.lock();
Ok(g.store.get(key).map_err(store_err)?.map(|v| v.to_vec()))
}
pub fn del(&self, keys: &[&[u8]]) -> io::Result<usize> {
let mut g = self.lock();
let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
let n = g.store.del(&owned);
if n > 0 {
let mut parts: Vec<&[u8]> = Vec::with_capacity(keys.len() + 1);
parts.push(b"DEL");
for k in keys {
parts.push(k);
}
commit_write(&mut g, &parts)?;
}
Ok(n)
}
pub fn exists(&self, keys: &[&[u8]]) -> io::Result<usize> {
let mut g = self.lock();
let owned: Vec<Vec<u8>> = keys.iter().map(|k| k.to_vec()).collect();
Ok(g.store.exists(&owned))
}
pub fn incr(&self, key: &[u8]) -> io::Result<i64> {
self.incr_by(key, 1)
}
pub fn incr_by(&self, key: &[u8], delta: i64) -> io::Result<i64> {
let mut g = self.lock();
let n = g.store.incr_by(key, delta).map_err(store_err)?;
commit_write(&mut g, &[b"INCRBY", key, delta.to_string().as_bytes()])?;
Ok(n)
}
pub fn expire(&self, key: &[u8], ttl: Duration) -> io::Result<bool> {
let mut g = self.lock();
let touched = g.store.expire(key, ttl);
if touched {
let ms = ttl.as_millis().min(u64::MAX as u128) as u64;
commit_write(&mut g, &[b"PEXPIRE", key, ms.to_string().as_bytes()])?;
}
Ok(touched)
}
pub fn persist(&self, key: &[u8]) -> io::Result<bool> {
let mut g = self.lock();
let touched = g.store.persist(key);
if touched {
commit_write(&mut g, &[b"PERSIST", key])?;
}
Ok(touched)
}
pub fn ttl_ms(&self, key: &[u8]) -> i64 {
self.lock().store.pttl(key)
}
pub fn type_of(&self, key: &[u8]) -> &'static str {
self.lock().store.type_of(key)
}
pub fn dbsize(&self) -> usize {
self.lock().store.dbsize()
}
pub fn flush(&self) -> io::Result<()> {
let mut g = self.lock();
g.store.flush();
commit_write(&mut g, &[b"FLUSHALL"])?;
Ok(())
}
pub fn key_bytes(&self, key: &[u8]) -> Option<u64> {
self.lock().store.estimate_key_bytes(key)
}
pub fn used_memory(&self) -> u64 {
self.lock().store.used_memory()
}
pub fn evictions_total(&self) -> u64 {
self.lock().store.evictions_total()
}
pub fn expired_keys_total(&self) -> u64 {
self.lock().store.expired_keys_total()
}
pub fn hset(&self, key: &[u8], pairs: &[(&[u8], &[u8])]) -> io::Result<usize> {
let mut g = self.lock();
let owned: Vec<(Vec<u8>, Vec<u8>)> =
pairs.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect();
let added = g.store.hset(key, &owned).map_err(store_err)?;
let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
parts.push(b"HSET");
parts.push(key);
for (f, v) in pairs {
parts.push(f);
parts.push(v);
}
commit_write(&mut g, &parts)?;
Ok(added)
}
pub fn hget(&self, key: &[u8], field: &[u8]) -> io::Result<Option<Vec<u8>>> {
let mut g = self.lock();
Ok(g.store.hget(key, field).map_err(store_err)?.map(|v| v.to_vec()))
}
pub fn hdel(&self, key: &[u8], fields: &[&[u8]]) -> io::Result<usize> {
let mut g = self.lock();
let owned: Vec<Vec<u8>> = fields.iter().map(|f| f.to_vec()).collect();
let removed = g.store.hdel(key, &owned).map_err(store_err)?;
if removed > 0 {
let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + fields.len());
parts.push(b"HDEL");
parts.push(key);
for f in fields {
parts.push(f);
}
commit_write(&mut g, &parts)?;
}
Ok(removed)
}
pub fn lpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
push_helper(self, key, values, b"LPUSH", |s, k, vs| s.lpush(k, vs))
}
pub fn rpush(&self, key: &[u8], values: &[&[u8]]) -> io::Result<usize> {
push_helper(self, key, values, b"RPUSH", |s, k, vs| s.rpush(k, vs))
}
pub fn lpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
pop_helper(self, key, count, false)
}
pub fn rpop(&self, key: &[u8], count: usize) -> io::Result<Vec<Vec<u8>>> {
pop_helper(self, key, count, true)
}
pub fn llen(&self, key: &[u8]) -> io::Result<usize> {
self.lock().store.llen(key).map_err(store_err)
}
pub fn sadd(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
push_helper(self, key, members, b"SADD", |s, k, ms| s.sadd(k, ms))
}
pub fn srem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
let mut g = self.lock();
let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
let removed = g.store.srem(key, &owned).map_err(store_err)?;
if removed > 0 {
let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
parts.push(b"SREM");
parts.push(key);
for m in members {
parts.push(m);
}
commit_write(&mut g, &parts)?;
}
Ok(removed)
}
pub fn smembers(&self, key: &[u8]) -> io::Result<Vec<Vec<u8>>> {
self.lock().store.smembers(key).map_err(store_err)
}
pub fn scard(&self, key: &[u8]) -> io::Result<usize> {
self.lock().store.scard(key).map_err(store_err)
}
pub fn zadd(&self, key: &[u8], pairs: &[(f64, &[u8])]) -> io::Result<usize> {
let mut g = self.lock();
let owned: Vec<(f64, Vec<u8>)> =
pairs.iter().map(|(s, m)| (*s, m.to_vec())).collect();
let added = g.store.zadd(key, &owned).map_err(store_err)?;
let mut score_strs: Vec<Vec<u8>> = Vec::with_capacity(pairs.len());
for (s, _) in pairs {
score_strs.push(format!("{s}").into_bytes());
}
let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + pairs.len() * 2);
parts.push(b"ZADD");
parts.push(key);
for (i, (_, m)) in pairs.iter().enumerate() {
parts.push(&score_strs[i]);
parts.push(m);
}
commit_write(&mut g, &parts)?;
Ok(added)
}
pub fn zrem(&self, key: &[u8], members: &[&[u8]]) -> io::Result<usize> {
let mut g = self.lock();
let owned: Vec<Vec<u8>> = members.iter().map(|m| m.to_vec()).collect();
let removed = g.store.zrem(key, &owned).map_err(store_err)?;
if removed > 0 {
let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + members.len());
parts.push(b"ZREM");
parts.push(key);
for m in members {
parts.push(m);
}
commit_write(&mut g, &parts)?;
}
Ok(removed)
}
pub fn zscore(&self, key: &[u8], member: &[u8]) -> io::Result<Option<f64>> {
self.lock().store.zscore(key, member).map_err(store_err)
}
pub fn zcard(&self, key: &[u8]) -> io::Result<usize> {
self.lock().store.zcard(key).map_err(store_err)
}
fn lock(&self) -> MutexGuard<'_, Inner> {
match self.inner.lock() {
Ok(g) => g,
Err(poison) => poison.into_inner(),
}
}
}
fn push_helper<F>(
s: &Store,
key: &[u8],
values: &[&[u8]],
verb: &'static [u8],
op: F,
) -> io::Result<usize>
where
F: FnOnce(&mut kevy_store::Store, &[u8], &[Vec<u8>]) -> Result<usize, StoreError>,
{
let mut g = s.lock();
let owned: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
let n = op(&mut g.store, key, &owned).map_err(store_err)?;
let mut parts: Vec<&[u8]> = Vec::with_capacity(2 + values.len());
parts.push(verb);
parts.push(key);
for v in values {
parts.push(v);
}
commit_write(&mut g, &parts)?;
Ok(n)
}
fn pop_helper(s: &Store, key: &[u8], count: usize, from_tail: bool) -> io::Result<Vec<Vec<u8>>> {
let mut g = s.lock();
let popped = if from_tail {
g.store.rpop(key, count).map_err(store_err)?
} else {
g.store.lpop(key, count).map_err(store_err)?
};
if !popped.is_empty() {
let verb: &[u8] = if from_tail { b"RPOP" } else { b"LPOP" };
let count_str = popped.len().to_string();
let parts: [&[u8]; 3] = [verb, key, count_str.as_bytes()];
commit_write(&mut g, &parts)?;
}
Ok(popped)
}
fn log_argv(aof: &mut Option<Aof>, parts: &[&[u8]]) -> io::Result<()> {
if let Some(aof) = aof {
let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
aof.append(&argv)?;
}
Ok(())
}
fn commit_write(inner: &mut Inner, parts: &[&[u8]]) -> io::Result<()> {
log_argv(&mut inner.aof, parts)?;
inner.store.try_evict_after_write();
Ok(())
}
fn store_err(e: StoreError) -> io::Error {
io::Error::new(io::ErrorKind::InvalidInput, format!("kevy-store: {e:?}"))
}
fn reaper_loop(
inner: Arc<Mutex<Inner>>,
stop: Arc<AtomicBool>,
interval: Duration,
samples: usize,
rounds: u32,
) {
while !stop.load(Ordering::Relaxed) {
std::thread::sleep(interval);
if stop.load(Ordering::Relaxed) {
break;
}
let mut g = match inner.lock() {
Ok(g) => g,
Err(poison) => poison.into_inner(),
};
let _ = g.store.tick_expire(samples, rounds);
if let Some(aof) = &mut g.aof {
let _ = aof.maybe_sync();
}
}
}
impl Drop for Store {
fn drop(&mut self) {
if let Some(stop) = &self.reaper_stop {
stop.store(true, Ordering::Relaxed);
}
if let Some(j) = self.reaper_join.take() {
let _ = j.join();
}
let mut g = match self.inner.lock() {
Ok(g) => g,
Err(poison) => poison.into_inner(),
};
if let Some(aof) = &mut g.aof {
let _ = aof.maybe_sync();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::{AppendFsync, EvictionPolicy};
fn tmp_dir(name: &str) -> PathBuf {
let mut p = std::env::temp_dir();
let uniq = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
p.push(format!("kevy-embedded-{name}-{uniq}"));
p
}
#[test]
fn in_memory_roundtrip() {
let s = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
s.set(b"k", b"v").unwrap();
assert_eq!(s.get(b"k").unwrap(), Some(b"v".to_vec()));
assert_eq!(s.dbsize(), 1);
s.del(&[b"k"]).unwrap();
assert_eq!(s.dbsize(), 0);
}
#[test]
fn persistence_round_trip_via_aof() {
let dir = tmp_dir("aof-rt");
{
let s = Store::open(
Config::default()
.with_persist(&dir)
.with_ttl_reaper_manual()
.with_appendfsync(AppendFsync::Always),
)
.unwrap();
for i in 0..50 {
s.set(format!("k{i}").as_bytes(), b"v").unwrap();
}
s.incr_by(b"counter", 41).unwrap();
s.hset(b"h", &[(b"field" as &[u8], b"val" as &[u8])]).unwrap();
}
let s2 = Store::open(
Config::default()
.with_persist(&dir)
.with_ttl_reaper_manual(),
)
.unwrap();
assert_eq!(s2.dbsize(), 52); assert_eq!(s2.get(b"k0").unwrap(), Some(b"v".to_vec()));
assert_eq!(s2.get(b"k49").unwrap(), Some(b"v".to_vec()));
assert_eq!(s2.get(b"counter").unwrap(), Some(b"41".to_vec()));
assert_eq!(s2.hget(b"h", b"field").unwrap(), Some(b"val".to_vec()));
drop(s2);
let _ = std::fs::remove_dir_all(&dir);
}
#[test]
fn eviction_works_under_pressure() {
let s = Store::open(
Config::default()
.with_ttl_reaper_manual()
.with_max_memory(800)
.with_eviction(EvictionPolicy::AllKeysLru),
)
.unwrap();
for i in 0..50 {
s.set(format!("k{i:02}").as_bytes(), b"xxxxxxxxxxxxxxxxxxxx")
.unwrap();
}
assert!(s.used_memory() <= 800, "got {}", s.used_memory());
assert!(s.evictions_total() > 0);
}
#[test]
fn manual_tick_runs_active_reaper() {
let s = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
s.set_with_ttl(b"short", b"v", Duration::from_millis(1)).unwrap();
s.set(b"perm", b"v").unwrap();
std::thread::sleep(Duration::from_millis(20));
let stats = s.tick();
let _ = stats;
let _ = s.get(b"short").unwrap(); assert!(s.expired_keys_total() >= 1);
assert!(s.get(b"perm").unwrap().is_some());
}
#[test]
fn with_escape_hatch_works() {
let s = Store::open(Config::default().with_ttl_reaper_manual()).unwrap();
let zsize = s.with(|store| {
let _ = store.zadd(b"z", &[(1.0, b"a".to_vec()), (2.0, b"b".to_vec())]);
store.zcard(b"z").unwrap()
});
assert_eq!(zsize, 2);
assert_eq!(s.type_of(b"z"), "zset");
}
#[test]
fn background_reaper_thread_drops_expired_keys() {
let s = Store::open(
Config::default().with_reaper_interval(Duration::from_millis(20)),
)
.unwrap();
s.set_with_ttl(b"k", b"v", Duration::from_millis(5)).unwrap();
std::thread::sleep(Duration::from_millis(120));
let _ = s.get(b"k").unwrap(); assert_eq!(s.dbsize(), 0);
}
#[test]
fn arc_sharing_across_threads() {
use std::sync::Arc;
let s = Arc::new(Store::open(Config::default().with_ttl_reaper_manual()).unwrap());
let mut handles = Vec::new();
for i in 0..8 {
let s = Arc::clone(&s);
handles.push(std::thread::spawn(move || {
for j in 0..50 {
s.set(format!("t{i}-{j}").as_bytes(), b"v").unwrap();
}
}));
}
for h in handles {
h.join().unwrap();
}
assert_eq!(s.dbsize(), 8 * 50);
}
#[test]
fn drop_during_reaper_does_not_deadlock() {
for _ in 0..4 {
let s = Store::open(
Config::default().with_reaper_interval(Duration::from_millis(5)),
)
.unwrap();
s.set(b"k", b"v").unwrap();
std::thread::sleep(Duration::from_millis(40));
drop(s); }
}
#[test]
fn save_snapshot_then_restart() {
let dir = tmp_dir("snap-rt");
{
let s = Store::open(
Config::default()
.with_persist(&dir)
.without_aof()
.with_ttl_reaper_manual(),
)
.unwrap();
for i in 0..10 {
s.set(format!("k{i}").as_bytes(), b"v").unwrap();
}
let saved = s.save_snapshot().unwrap();
assert!(saved);
}
let s2 = Store::open(
Config::default()
.with_persist(&dir)
.without_aof()
.with_ttl_reaper_manual(),
)
.unwrap();
assert_eq!(s2.dbsize(), 10);
let _ = std::fs::remove_dir_all(&dir);
}
}