use std::io;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, MutexGuard, Weak};
use std::thread::JoinHandle;
use std::time::Duration;
use kevy_persist::{Aof, Argv, RewriteStats, load_snapshot, replay_aof, save_snapshot};
use kevy_store::{ExpireStats, StoreError};
use crate::config::{Config, TtlReaperMode};
use crate::pubsub::PubsubBus;
#[derive(Clone)]
pub struct Store {
inner: Arc<Mutex<Inner>>,
guard: Arc<DropGuard>,
config: Config,
}
pub struct WeakStore {
inner: Weak<Mutex<Inner>>,
guard: Weak<DropGuard>,
config: Config,
}
impl WeakStore {
pub fn upgrade(&self) -> Option<Store> {
Some(Store {
inner: self.inner.upgrade()?,
guard: self.guard.upgrade()?,
config: self.config.clone(),
})
}
}
pub(crate) struct Inner {
pub(crate) store: kevy_store::Store,
pub(crate) aof: Option<Aof>,
pub(crate) bus: PubsubBus,
}
pub(crate) struct DropGuard {
reaper_stop: Option<Arc<AtomicBool>>,
reaper_join: Mutex<Option<JoinHandle<()>>>,
inner_for_flush: Arc<Mutex<Inner>>,
}
impl Store {
pub fn open(config: Config) -> io::Result<Self> {
let (store, aof) = init_persistent_store(&config)?;
let inner = Arc::new(Mutex::new(Inner {
store,
aof,
bus: PubsubBus::new(),
}));
let (reaper_stop, reaper_join) = spawn_reaper(&config, &inner)?;
let guard = Arc::new(DropGuard {
reaper_stop,
reaper_join: Mutex::new(reaper_join),
inner_for_flush: inner.clone(),
});
Ok(Store {
inner,
guard,
config,
})
}
pub fn downgrade(&self) -> WeakStore {
WeakStore {
inner: Arc::downgrade(&self.inner),
guard: Arc::downgrade(&self.guard),
config: self.config.clone(),
}
}
pub fn config(&self) -> &Config {
&self.config
}
pub fn with<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut kevy_store::Store) -> R,
{
let mut g = self.lock();
f(&mut g.store)
}
pub fn log(&self, parts: &[&[u8]]) -> io::Result<()> {
let mut g = self.lock();
if let Some(aof) = &mut g.aof {
let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
aof.append(&argv)?;
}
Ok(())
}
pub fn tick(&self) -> ExpireStats {
let mut g = self.lock();
g.store
.tick_expire(self.config.reaper_samples, self.config.reaper_max_rounds)
}
pub fn rewrite_aof(&self) -> io::Result<Option<RewriteStats>> {
let mut g = self.lock();
let Inner { store, aof, bus: _ } = &mut *g;
let Some(aof) = aof else { return Ok(None) };
Ok(Some(aof.rewrite_from(store)?))
}
pub fn save_snapshot(&self) -> io::Result<bool> {
let g = self.lock();
let Some(dir) = self.config.data_dir.as_ref() else {
return Ok(false);
};
let path: PathBuf = dir.join(&self.config.snapshot_filename);
save_snapshot(&g.store, &path)?;
Ok(true)
}
pub(crate) fn inner_handle(&self) -> Arc<Mutex<Inner>> {
self.inner.clone()
}
pub(crate) fn guard_handle(&self) -> Arc<DropGuard> {
self.guard.clone()
}
pub(crate) fn lock(&self) -> MutexGuard<'_, Inner> {
match self.inner.lock() {
Ok(g) => g,
Err(poison) => poison.into_inner(),
}
}
}
fn init_persistent_store(config: &Config) -> io::Result<(kevy_store::Store, Option<Aof>)> {
let mut store = kevy_store::Store::new();
store.set_max_memory(config.maxmemory, config.eviction_policy);
let aof = if let Some(dir) = &config.data_dir {
std::fs::create_dir_all(dir)?;
let snap_path = dir.join(&config.snapshot_filename);
if snap_path.exists() {
load_snapshot(&mut store, &snap_path)?;
}
let aof_path = dir.join(&config.aof_filename);
if aof_path.exists() {
replay_aof(&aof_path, |args| crate::replay::apply(&mut store, &args))?;
}
if config.aof {
Some(Aof::open(&aof_path, config.appendfsync)?)
} else {
None
}
} else {
None
};
Ok((store, aof))
}
#[allow(clippy::type_complexity)] fn spawn_reaper(
config: &Config,
inner: &Arc<Mutex<Inner>>,
) -> io::Result<(Option<Arc<AtomicBool>>, Option<JoinHandle<()>>)> {
match config.ttl_reaper {
TtlReaperMode::Manual => Ok((None, None)),
TtlReaperMode::Background => {
let stop = Arc::new(AtomicBool::new(false));
let stop_t = stop.clone();
let inner_t = inner.clone();
let interval = config.reaper_interval;
let samples = config.reaper_samples;
let rounds = config.reaper_max_rounds;
let handle = std::thread::Builder::new()
.name(String::from("kevy-embedded-reaper"))
.spawn(move || reaper_loop(inner_t, stop_t, interval, samples, rounds))?;
Ok((Some(stop), Some(handle)))
}
}
}
fn log_argv(aof: &mut Option<Aof>, parts: &[&[u8]]) -> io::Result<()> {
if let Some(aof) = aof {
let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
aof.append(&argv)?;
}
Ok(())
}
pub(crate) fn commit_write(inner: &mut Inner, parts: &[&[u8]]) -> io::Result<()> {
log_argv(&mut inner.aof, parts)?;
inner.store.try_evict_after_write();
Ok(())
}
pub(crate) fn store_err(e: StoreError) -> io::Error {
io::Error::new(io::ErrorKind::InvalidInput, format!("kevy-store: {e:?}"))
}
fn reaper_loop(
inner: Arc<Mutex<Inner>>,
stop: Arc<AtomicBool>,
interval: Duration,
samples: usize,
rounds: u32,
) {
while !stop.load(Ordering::Relaxed) {
std::thread::sleep(interval);
if stop.load(Ordering::Relaxed) {
break;
}
let mut g = match inner.lock() {
Ok(g) => g,
Err(poison) => poison.into_inner(),
};
let _ = g.store.tick_expire(samples, rounds);
if let Some(aof) = &mut g.aof {
let _ = aof.maybe_sync();
}
}
}
impl Drop for DropGuard {
fn drop(&mut self) {
if let Some(stop) = &self.reaper_stop {
stop.store(true, Ordering::Relaxed);
}
if let Some(j) = self
.reaper_join
.lock()
.unwrap_or_else(|p| p.into_inner())
.take()
{
let _ = j.join();
}
let mut g = match self.inner_for_flush.lock() {
Ok(g) => g,
Err(poison) => poison.into_inner(),
};
if let Some(aof) = &mut g.aof {
let _ = aof.maybe_sync();
}
}
}
#[cfg(test)]
#[path = "store_tests.rs"]
mod tests;