use std::io;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak};
use std::thread::JoinHandle;
use std::time::Instant;
use crate::metric::KevyMetric;
use kevy_persist::{Aof, Argv, RewriteStats};
use kevy_store::{ExpireStats, StoreError};
use crate::config::Config;
use crate::pubsub::PubsubBus;
use crate::shard::{build_shards, shard_idx};
/// Shared, reference-counted list of shards; each shard is an [`Inner`]
/// behind its own `RwLock`.
pub(crate) type Shards = Arc<Vec<Arc<RwLock<Inner>>>>;
/// Cloneable handle to the sharded store.
///
/// Clones share the same shard list and the same [`DropGuard`]; each clone
/// carries its own copy of the [`Config`].
#[derive(Clone)]
pub struct Store {
/// Shard list shared by every clone of this handle.
shards: Shards,
/// Shared guard; its `Drop` impl signals the reaper to stop, joins it and
/// calls `maybe_sync` on every shard's AOF.
guard: Arc<DropGuard>,
/// Configuration this store was opened with.
config: Config,
}
/// Non-owning counterpart of [`Store`], produced by [`Store::downgrade`].
///
/// Holds only `Weak` references, so it does not keep the shards or the
/// [`DropGuard`] alive.
pub struct WeakStore {
/// Weak reference to the shard list.
shards: Weak<Vec<Arc<RwLock<Inner>>>>,
/// Weak reference to the shared drop guard.
guard: Weak<DropGuard>,
/// Owned copy of the configuration, handed to upgraded stores.
config: Config,
}
impl WeakStore {
    /// Tries to turn this weak handle back into a full [`Store`].
    ///
    /// Returns `None` when either the shard list or the drop guard no longer
    /// has a strong reference.
    pub fn upgrade(&self) -> Option<Store> {
        // Upgrade in the same order as the fields: shards first, then guard.
        let shards = self.shards.upgrade()?;
        let guard = self.guard.upgrade()?;
        let config = self.config.clone();
        Some(Store { shards, guard, config })
    }
}
/// State of a single shard, stored behind that shard's `RwLock`.
pub(crate) struct Inner {
/// The shard's key/value store.
pub(crate) store: kevy_store::Store,
/// Append-only file for this shard; `None` when the shard has no AOF.
pub(crate) aof: Option<Aof>,
/// Pub/sub bus owned by this shard.
pub(crate) bus: PubsubBus,
}
impl Inner {
    /// Builds the state for one shard from its store and optional AOF,
    /// attaching a freshly created [`PubsubBus`].
    pub(crate) fn new(store: kevy_store::Store, aof: Option<Aof>) -> Self {
        let bus = PubsubBus::new();
        Self { store, aof, bus }
    }
}
/// Shutdown guard shared by every [`Store`] clone.
///
/// Its `Drop` impl sets the reaper stop flag, joins the reaper thread and
/// then calls `maybe_sync` on each shard's AOF.
pub(crate) struct DropGuard {
/// Flag set to `true` on drop to ask the reaper to stop.
/// NOTE(review): presumably `None` when no reaper thread was spawned —
/// confirm against `crate::reaper::spawn_reaper`.
reaper_stop: Option<Arc<AtomicBool>>,
/// Join handle of the reaper thread; taken out and joined on drop.
reaper_join: Mutex<Option<JoinHandle<()>>>,
/// Shards whose AOFs get `maybe_sync` called on drop.
shards_for_flush: Shards,
}
impl Store {
pub fn open(config: Config) -> io::Result<Self> {
let shards: Shards = Arc::new(build_shards(&config)?);
let (reaper_stop, reaper_join) = crate::reaper::spawn_reaper(&config, &shards)?;
let guard = Arc::new(DropGuard {
reaper_stop,
reaper_join: Mutex::new(reaper_join),
shards_for_flush: shards.clone(),
});
Ok(Store { shards, guard, config })
}
pub fn downgrade(&self) -> WeakStore {
WeakStore {
shards: Arc::downgrade(&self.shards),
guard: Arc::downgrade(&self.guard),
config: self.config.clone(),
}
}
pub fn config(&self) -> &Config {
&self.config
}
/// Runs `f` with mutable access to the store of shard 0 while holding that
/// shard's write lock, and returns whatever `f` returns.
pub fn with<F, R>(&self, f: F) -> R
where
    F: FnOnce(&mut kevy_store::Store) -> R,
{
    // The temporary guard lives until the call to `f` has returned.
    f(&mut self.lock().store)
}
/// Runs `f` with mutable access to the store of the shard that `key` maps
/// to, holding that shard's write lock for the duration of the call.
pub fn with_key<F, R>(&self, key: &[u8], f: F) -> R
where
    F: FnOnce(&mut kevy_store::Store) -> R,
{
    // The temporary guard lives until the call to `f` has returned.
    f(&mut self.wshard(key).store)
}
/// Gathers keys from the shards in shard order.
///
/// `pattern` is forwarded unchanged to each shard's `collect_keys`. When
/// `limit` is set, each shard is asked for at most the number of keys still
/// missing, and iteration stops once the limit has been reached.
pub fn collect_keys(&self, pattern: Option<&[u8]>, limit: Option<usize>) -> Vec<Vec<u8>> {
    let mut keys = Vec::new();
    for shard in self.shards.iter() {
        let remaining = match limit {
            Some(cap) if keys.len() >= cap => break,
            Some(cap) => Some(cap - keys.len()),
            None => None,
        };
        let guard = lock_read(shard);
        keys.extend(guard.store.collect_keys(pattern, remaining));
    }
    keys
}
/// Calls `f` once per shard, in shard order, with mutable access to that
/// shard's store. Each shard's write lock is held only for its own call.
pub fn for_each_shard<F: FnMut(&mut kevy_store::Store)>(&self, mut f: F) {
    self.shards
        .iter()
        .for_each(|shard| f(&mut lock_write(shard).store));
}
/// Number of shards backing this store.
#[inline]
pub fn shard_count(&self) -> usize {
    let shards = &*self.shards;
    shards.len()
}
pub fn log(&self, parts: &[&[u8]]) -> io::Result<()> {
let mut g = match parts.get(1) {
Some(key) => self.wshard(key),
None => self.lock(),
};
if let Some(aof) = &mut g.aof {
let argv = Argv::from(parts.iter().map(|p| p.to_vec()).collect::<Vec<_>>());
aof.append(&argv)?;
}
Ok(())
}
/// Runs one expiry pass over every shard and returns the summed statistics.
///
/// For each shard, `tick_expire` runs under the write lock with the
/// configured `reaper_samples` / `reaper_max_rounds`; the lock is released
/// before the shard is handed to `crate::reaper::concurrent_auto_rewrite`.
pub fn tick(&self) -> ExpireStats {
    let cfg = &self.config;
    let mut total = ExpireStats::default();
    for shard in self.shards.iter() {
        // The temporary write guard is dropped at the end of this statement.
        let stats = lock_write(shard)
            .store
            .tick_expire(cfg.reaper_samples, cfg.reaper_max_rounds);
        total.sampled += stats.sampled;
        total.expired += stats.expired;
        crate::reaper::concurrent_auto_rewrite(
            shard,
            cfg.auto_aof_rewrite_pct,
            cfg.auto_aof_rewrite_min_size,
            cfg.metric_sink.as_ref(),
        );
    }
    total
}
/// Rewrites the AOF of every shard that has one and is not already being
/// rewritten.
///
/// Per shard the work happens in three phases:
/// 1. under the write lock: record the current AOF size, collect a snapshot
///    view and start a view rewrite;
/// 2. without the lock: dump the view to the temp file;
/// 3. under the write lock again: finish the rewrite.
///
/// A `KevyMetric::Rewrite` is emitted per rewritten shard when a metric sink
/// is configured. The sink is called after the shard lock has been released,
/// so a sink that touches the store cannot deadlock on that shard.
///
/// Returns `Ok(None)` when no shard was rewritten, otherwise the summed
/// `keys` / `bytes` over all rewritten shards.
///
/// # Errors
///
/// Returns the first error from starting, dumping or finishing a rewrite.
/// On a dump or finish error the rewrite is aborted and the temp file is
/// removed (best effort). Shards rewritten before the failure stay rewritten.
pub fn rewrite_aof(&self) -> io::Result<Option<RewriteStats>> {
    let mut agg: Option<RewriteStats> = None;
    for shard in self.shards.iter() {
        let start = Instant::now();

        // Phase 1: freeze a view and start the rewrite under the lock.
        let (view, tmp, before_bytes) = {
            let mut g = lock_write(shard);
            let Inner { store, aof, bus: _ } = &mut *g;
            let Some(aof) = aof else { continue };
            if aof.is_rewriting() {
                continue;
            }
            let before = aof.size_bytes();
            let view = store.collect_snapshot();
            (view, aof.begin_view_rewrite()?, before)
        };

        // Phase 2: write the view out without blocking the shard.
        let keys = match kevy_persist::dump_aof(&tmp, &view) {
            Ok((keys, _)) => keys,
            Err(e) => {
                let mut g = lock_write(shard);
                if let Some(aof) = &mut g.aof {
                    aof.abort_concurrent_rewrite();
                }
                let _ = std::fs::remove_file(&tmp);
                return Err(e);
            }
        };

        // Phase 3: finish under the lock, releasing it before metrics are emitted.
        let stats = {
            let mut g = lock_write(shard);
            let Some(aof) = &mut g.aof else {
                // No AOF left to swap into; do not leave the temp file behind.
                let _ = std::fs::remove_file(&tmp);
                continue;
            };
            let finished = aof.finish_concurrent_rewrite(&tmp, keys);
            match finished {
                Ok(s) => s,
                Err(e) => {
                    aof.abort_concurrent_rewrite();
                    let _ = std::fs::remove_file(&tmp);
                    return Err(e);
                }
            }
        };

        if let Some(sink) = &self.config.metric_sink {
            sink.emit(KevyMetric::Rewrite {
                keys: stats.keys,
                before_bytes,
                after_bytes: stats.bytes,
                // Saturate rather than silently truncate the u128 millisecond count.
                elapsed_ms: u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
            });
        }
        let acc = agg.get_or_insert(RewriteStats { keys: 0, bytes: 0 });
        acc.keys += stats.keys;
        acc.bytes += stats.bytes;
    }
    Ok(agg)
}
/// Writes a snapshot of every shard into the configured data directory.
///
/// Returns `Ok(false)` without doing anything when no `data_dir` is
/// configured, `Ok(true)` once every shard has been saved. With a single
/// shard the file is named `config.snapshot_filename`; otherwise each shard
/// uses `kevy_persist::layout::snapshot_file(index)`.
///
/// # Errors
///
/// Stops at and returns the first shard snapshot error.
pub fn save_snapshot(&self) -> io::Result<bool> {
    let dir = match self.config.data_dir.as_ref() {
        Some(dir) => dir,
        None => return Ok(false),
    };
    let single_shard = self.shards.len() == 1;
    for (index, shard) in self.shards.iter().enumerate() {
        let file_name = if single_shard {
            self.config.snapshot_filename.clone()
        } else {
            kevy_persist::layout::snapshot_file(index)
        };
        let target = dir.join(file_name);
        save_shard_snapshot(shard, &target)?;
    }
    Ok(true)
}
/// Returns a new strong reference to shard 0.
pub(crate) fn inner_handle(&self) -> Arc<RwLock<Inner>> {
    Arc::clone(&self.shards[0])
}
/// Returns a new strong reference to the shared [`DropGuard`].
pub(crate) fn guard_handle(&self) -> Arc<DropGuard> {
    Arc::clone(&self.guard)
}
/// Picks the shard responsible for `key`, using `shard_idx` over the current
/// shard count.
fn shard_for(&self, key: &[u8]) -> &Arc<RwLock<Inner>> {
    let index = shard_idx(key, self.shards.len());
    &self.shards[index]
}
/// Write-locks the shard that `key` maps to.
pub(crate) fn wshard(&self, key: &[u8]) -> RwLockWriteGuard<'_, Inner> {
    let shard = self.shard_for(key);
    lock_write(shard)
}
/// Read-locks the shard that `key` maps to.
pub(crate) fn rshard(&self, key: &[u8]) -> RwLockReadGuard<'_, Inner> {
    let shard = self.shard_for(key);
    lock_read(shard)
}
/// Write-locks shard 0.
pub(crate) fn lock(&self) -> RwLockWriteGuard<'_, Inner> {
    let first = &self.shards[0];
    lock_write(first)
}
/// Applies `f` to every shard under its write lock and returns the sum of
/// the results.
pub(crate) fn sum_shards<F: Fn(&mut Inner) -> usize>(&self, f: F) -> usize {
    let mut total = 0usize;
    for shard in self.shards.iter() {
        let mut guard = lock_write(shard);
        total += f(&mut *guard);
    }
    total
}
/// `u64` variant of `sum_shards`: applies `f` to every shard under its write
/// lock and returns the sum of the results.
pub(crate) fn sum_shards_u64<F: Fn(&mut Inner) -> u64>(&self, f: F) -> u64 {
    let mut total = 0u64;
    for shard in self.shards.iter() {
        let mut guard = lock_write(shard);
        total += f(&mut *guard);
    }
    total
}
/// Calls the fallible `f` on every shard in order, each under its own write
/// lock, stopping at the first error.
///
/// # Errors
///
/// Returns the first error produced by `f`; later shards are not visited.
pub(crate) fn try_for_each_shard<F: FnMut(&mut Inner) -> io::Result<()>>(
    &self,
    mut f: F,
) -> io::Result<()> {
    self.shards
        .iter()
        .try_for_each(|shard| f(&mut *lock_write(shard)))
}
}
/// Writes a snapshot of one shard to `path`.
///
/// Steps:
/// 1. `freeze_for_save` collects a snapshot view and, if the shard has an
///    AOF, starts a view rewrite and yields the rewrite's temp path.
/// 2. The view is written to a temp snapshot file without holding the lock.
/// 3. Under the write lock, the temp snapshot is renamed onto `path`; if a
///    rewrite was started, a fresh AOF base is written to the rewrite temp
///    path and the rewrite is finished with it.
///
/// # Errors
///
/// Returns the first I/O error. Whenever a rewrite was started and a later
/// step fails — including the rename — the rewrite is aborted so the AOF is
/// not left stuck in the rewriting state; temp files created by the failed
/// step are removed on a best-effort basis.
fn save_shard_snapshot(shard: &RwLock<Inner>, path: &std::path::Path) -> io::Result<()> {
    let (view, reset_tmp) = freeze_for_save(shard)?;
    let tmp = match kevy_persist::write_snapshot_tmp(&view, path) {
        Ok(t) => t,
        Err(e) => {
            if reset_tmp.is_some()
                && let Some(aof) = &mut lock_write(shard).aof
            {
                aof.abort_concurrent_rewrite();
            }
            return Err(e);
        }
    };
    let mut g = lock_write(shard);
    if let Err(e) = std::fs::rename(&tmp, path) {
        // Without this abort the AOF would stay in the rewriting state:
        // later rewrites would skip this shard and later saves would time out.
        if reset_tmp.is_some()
            && let Some(aof) = &mut g.aof
        {
            aof.abort_concurrent_rewrite();
        }
        let _ = std::fs::remove_file(&tmp);
        return Err(e);
    }
    if let (Some(reset), Some(aof)) = (reset_tmp, &mut g.aof) {
        let swap = kevy_persist::write_aof_base(&reset)
            .and_then(|()| aof.finish_concurrent_rewrite(&reset, 0));
        if let Err(e) = swap {
            aof.abort_concurrent_rewrite();
            let _ = std::fs::remove_file(&reset);
            return Err(e);
        }
    }
    Ok(())
}
/// Collects a snapshot view of `shard` for saving.
///
/// * No AOF: returns the view and `None`.
/// * AOF idle: returns the view plus the path from `begin_view_rewrite`.
/// * AOF already rewriting: releases the lock, sleeps 5 ms and retries, for
///   up to 2000 attempts (about 10 s).
///
/// # Errors
///
/// Returns `TimedOut` when the in-flight rewrite never finishes within the
/// retry budget, or the error from `begin_view_rewrite`.
fn freeze_for_save(
    shard: &RwLock<Inner>,
) -> io::Result<(kevy_store::SnapshotView, Option<std::path::PathBuf>)> {
    const MAX_ATTEMPTS: u32 = 2000;
    const RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(5);

    for _ in 0..MAX_ATTEMPTS {
        {
            // Scoped so the write lock is released before sleeping.
            let mut guard = lock_write(shard);
            let Inner { store, aof, .. } = &mut *guard;
            match aof.as_mut() {
                None => return Ok((store.collect_snapshot(), None)),
                Some(log) if !log.is_rewriting() => {
                    let view = store.collect_snapshot();
                    let reset = log.begin_view_rewrite()?;
                    return Ok((view, Some(reset)));
                }
                // A rewrite is still in flight: fall through and retry.
                Some(_) => {}
            }
        }
        std::thread::sleep(RETRY_DELAY);
    }
    Err(io::Error::new(
        io::ErrorKind::TimedOut,
        "kevy-embedded: AOF rewrite still in flight after 10s; snapshot aborted",
    ))
}
/// Acquires the shard's write lock, recovering the guard if the lock was
/// poisoned instead of panicking.
pub(crate) fn lock_write(shard: &RwLock<Inner>) -> RwLockWriteGuard<'_, Inner> {
    match shard.write() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
/// Acquires the shard's read lock, recovering the guard if the lock was
/// poisoned instead of panicking.
pub(crate) fn lock_read(shard: &RwLock<Inner>) -> RwLockReadGuard<'_, Inner> {
    match shard.read() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
/// Appends `parts` to `aof` as one `Argv`; does nothing when there is no AOF.
///
/// # Errors
///
/// Propagates the error from the append.
fn log_argv(aof: &mut Option<Aof>, parts: &[&[u8]]) -> io::Result<()> {
    let Some(aof) = aof.as_mut() else {
        return Ok(());
    };
    let owned: Vec<Vec<u8>> = parts.iter().map(|part| part.to_vec()).collect();
    let argv = Argv::from(owned);
    aof.append(&argv)?;
    Ok(())
}
/// Finishes a write on a shard: logs `parts` to the shard's AOF (if any) and,
/// only when that succeeded, calls `try_evict_after_write` on the store.
///
/// # Errors
///
/// Returns the AOF logging error; in that case the eviction call is skipped.
pub(crate) fn commit_write(inner: &mut Inner, parts: &[&[u8]]) -> io::Result<()> {
    let logged = log_argv(&mut inner.aof, parts);
    if logged.is_ok() {
        inner.store.try_evict_after_write();
    }
    logged
}
/// Converts a [`StoreError`] into an `io::Error` of kind `InvalidInput`,
/// embedding the error's `Debug` output in the message.
pub(crate) fn store_err(e: StoreError) -> io::Error {
    let message = format!("kevy-store: {e:?}");
    io::Error::new(io::ErrorKind::InvalidInput, message)
}
impl Drop for DropGuard {
    /// Shuts the store down in three ordered steps: raise the reaper stop
    /// flag, join the reaper thread, then call `maybe_sync` on every shard's
    /// AOF. Join and sync errors are ignored.
    fn drop(&mut self) {
        if let Some(flag) = self.reaper_stop.as_ref() {
            flag.store(true, Ordering::Relaxed);
        }

        // `&mut self` gives exclusive access, so the handle can be taken
        // without locking; a poisoned mutex is recovered rather than panicking.
        let reaper = self
            .reaper_join
            .get_mut()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take();
        if let Some(handle) = reaper {
            let _ = handle.join();
        }

        for shard in self.shards_for_flush.iter() {
            if let Some(aof) = lock_write(shard).aof.as_mut() {
                let _ = aof.maybe_sync();
            }
        }
    }
}
#[cfg(test)]
#[path = "store_tests.rs"]
mod tests;
#[cfg(test)]
#[path = "store_tests_shard.rs"]
mod tests_shard;