use std::io;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::Instant;
use kevy_hash::KevyHash;
use kevy_persist::reshard::{ShardLayout, commit_reshard, merge_sources, recover_journal};
use kevy_persist::{
Aof, Routing, ShardsMeta, layout, layout::infer_files_n, load_snapshot, read_shards_meta,
replay_aof, write_shards_meta,
};
use kevy_store::Store as Keyspace;
use crate::config::{Config, TtlReaperMode};
use crate::metric::KevyMetric;
use crate::store::Inner;
/// Maps `key` to the index of the shard that owns it, given `n` shards.
///
/// With a single shard the key is not hashed at all. Otherwise the key's
/// `kevy_hash` is reduced into `0..n`, using a bit mask when `n` is a power
/// of two and a modulo in every other case.
///
/// # Panics
///
/// Panics if `n == 0` (the modulo path divides by zero).
#[inline]
pub(crate) fn shard_idx(key: &[u8], n: usize) -> usize {
    // One shard owns every key; skip the hash entirely.
    if n == 1 {
        return 0;
    }
    let hash = key.kevy_hash() as usize;
    if !n.is_power_of_two() {
        return hash % n;
    }
    // For a power of two, masking the low bits equals `hash % n`.
    hash & (n - 1)
}
/// Returns the AOF path for shard `i` of an `n`-shard layout under `dir`.
///
/// A single-shard layout uses the filename configured in
/// `config.aof_filename`; any other shard count defers to
/// `layout::aof_path`, which is given only the directory and shard index.
fn aof_path(dir: &Path, config: &Config, i: usize, n: usize) -> PathBuf {
    match n {
        1 => dir.join(&config.aof_filename),
        _ => layout::aof_path(dir, i),
    }
}
/// Returns the snapshot path for shard `i` of an `n`-shard layout under `dir`.
///
/// A single-shard layout uses the filename configured in
/// `config.snapshot_filename`; any other shard count defers to
/// `layout::snapshot_path`, which is given only the directory and shard index.
fn snapshot_path(dir: &Path, config: &Config, i: usize, n: usize) -> PathBuf {
    match n {
        1 => dir.join(&config.snapshot_filename),
        _ => layout::snapshot_path(dir, i),
    }
}
/// `ShardLayout` adapter that resolves shard file paths through this module's
/// `snapshot_path` / `aof_path` helpers, so the configured filenames are
/// honoured for single-shard layouts.
struct EmbLayout<'a>(&'a Config);

impl ShardLayout for EmbLayout<'_> {
    /// Snapshot path for shard `i` of `n`, using the wrapped configuration.
    fn snapshot_path(&self, dir: &Path, i: usize, n: usize) -> PathBuf {
        let config: &Config = self.0;
        // Unqualified call resolves to the module-level helper, not this method.
        snapshot_path(dir, config, i, n)
    }

    /// AOF path for shard `i` of `n`, using the wrapped configuration.
    fn aof_path(&self, dir: &Path, i: usize, n: usize) -> PathBuf {
        let config: &Config = self.0;
        // Unqualified call resolves to the module-level helper, not this method.
        aof_path(dir, config, i, n)
    }
}
/// Creates an empty keyspace configured from `config`.
///
/// Applies the memory limit and eviction policy, and enables the cached
/// clock exactly when the TTL reaper mode is `TtlReaperMode::Background`.
fn fresh_keyspace(config: &Config) -> Keyspace {
    let background_reaper = matches!(config.ttl_reaper, TtlReaperMode::Background);
    let mut keyspace = Keyspace::new();
    keyspace.set_max_memory(config.maxmemory, config.eviction_policy);
    keyspace.set_cached_clock(background_reaper);
    keyspace
}
/// Builds one `Inner` per shard, restoring persisted state when a data
/// directory is configured.
///
/// The shard count is `config.shards`, clamped to at least 1. Without
/// `config.data_dir` every shard starts empty and has no AOF. Otherwise the
/// directory is created, any reshard journal is recovered, and the stored
/// shard metadata decides how data is loaded:
///
/// * loaded in place when the metadata records the same shard count with
///   `Routing::KevyHash`, or when there is no metadata and both the requested
///   and the inferred on-disk layouts are single-shard;
/// * resharded from the recorded (or inferred) source layout otherwise.
///
/// Finally, when `config.aof` is set, one AOF is opened per shard.
///
/// # Errors
///
/// Propagates any I/O error from directory creation, journal recovery,
/// loading, resharding, metadata writing, or opening an AOF.
pub(crate) fn build_shards(config: &Config) -> io::Result<Vec<Arc<RwLock<Inner>>>> {
    let n = config.shards.max(1);
    let mut keyspaces: Vec<Keyspace> = std::iter::repeat_with(|| fresh_keyspace(config))
        .take(n)
        .collect();

    // Purely in-memory mode: nothing to recover and nothing to append to.
    let Some(dir) = config.data_dir.clone() else {
        let no_aofs = std::iter::repeat_with(|| None).take(n).collect();
        return Ok(into_inners(keyspaces, no_aofs));
    };

    std::fs::create_dir_all(&dir)?;
    // Journal recovery runs before the metadata is read.
    recover_journal(&dir, &EmbLayout(config))?;

    let meta_path = layout::shards_meta_path(&dir);
    let recorded = read_shards_meta(&meta_path);
    let uses_sharded_names = config.snapshot_filename == layout::snapshot_file(0)
        && config.aof_filename == layout::aof_file(0);

    let layout_unchanged = if let Some(meta) = recorded {
        meta.n == n && meta.routing == Routing::KevyHash
    } else {
        n == 1 && infer_files_n(&dir) <= 1
    };

    if layout_unchanged {
        load_in_place(&dir, config, n, &mut keyspaces)?;
        if n > 1 || uses_sharded_names {
            let meta = ShardsMeta { n, routing: Routing::KevyHash };
            write_shards_meta(&meta_path, meta)?;
        }
    } else {
        // Prefer the recorded shard count; otherwise infer it from the files
        // on disk, treating anything <= 1 as "unknown".
        let source_n = match recorded {
            Some(meta) => Some(meta.n),
            None => {
                let inferred = infer_files_n(&dir);
                (inferred > 1).then_some(inferred)
            }
        };
        reshard(&dir, config, n, source_n, &mut keyspaces)?;
    }

    let aofs: Vec<Option<Aof>> = if config.aof {
        let mut opened = Vec::with_capacity(n);
        for i in 0..n {
            let path = aof_path(&dir, config, i, n);
            opened.push(Some(Aof::open(&path, config.appendfsync)?));
        }
        opened
    } else {
        std::iter::repeat_with(|| None).take(n).collect()
    };

    Ok(into_inners(keyspaces, aofs))
}
/// Loads each shard's persisted state directly into the matching keyspace.
///
/// For every shard index, the snapshot (if the file exists) is loaded first,
/// then the AOF (if the file exists) is replayed on top of it. A replay
/// metric covering all shards is emitted once at the end.
///
/// # Errors
///
/// Propagates the first error from loading a snapshot or replaying an AOF.
fn load_in_place(dir: &Path, config: &Config, n: usize, stores: &mut [Keyspace]) -> io::Result<()> {
    let mut replayed_cmds = 0u64;
    let mut aof_bytes = 0u64;
    let started = Instant::now();

    for (i, store) in stores.iter_mut().enumerate() {
        let snapshot_file = snapshot_path(dir, config, i, n);
        if snapshot_file.exists() {
            load_snapshot(store, &snapshot_file)?;
        }

        let aof_file = aof_path(dir, config, i, n);
        if !aof_file.exists() {
            continue;
        }
        // The size is only for the metric; an unreadable file counts as 0 bytes.
        if let Ok(meta) = std::fs::metadata(&aof_file) {
            aof_bytes += meta.len();
        }
        replay_aof(&aof_file, |args| {
            replayed_cmds += 1;
            crate::replay::apply(store, &args);
        })?;
    }

    emit_replay(config, replayed_cmds, aof_bytes, started);
    Ok(())
}
/// Rebuilds the on-disk data for an `n`-shard layout from a different
/// source layout.
///
/// All source shards (`prev_n`, defaulting to 1 when unknown) are merged into
/// one temporary keyspace. Every entry is then routed into `stores` with
/// `shard_idx`, and the result is committed together with the new shard
/// metadata. A replay metric is emitted after the merge.
///
/// # Errors
///
/// Propagates any error from merging the sources or committing the reshard.
fn reshard(
    dir: &Path,
    config: &Config,
    n: usize,
    prev_n: Option<usize>,
    stores: &mut [Keyspace],
) -> io::Result<()> {
    let lay = EmbLayout(config);
    let mut merged = fresh_keyspace(config);
    let mut replayed_cmds = 0u64;
    let started = Instant::now();
    let src_n = prev_n.unwrap_or(1);

    merge_sources(dir, src_n, &lay, &mut merged, |store, args| {
        replayed_cmds += 1;
        crate::replay::apply(store, &args);
    })?;

    // Total size of the source AOFs for the metric; unreadable files are skipped.
    let mut aof_bytes = 0u64;
    for i in 0..src_n {
        if let Ok(meta) = std::fs::metadata(lay.aof_path(dir, i, src_n)) {
            aof_bytes += meta.len();
        }
    }
    emit_replay(config, replayed_cmds, aof_bytes, started);

    // Redistribute every merged entry to the shard that owns it under `n`.
    merged.snapshot_each(|key, value, ttl_ms| {
        let target = shard_idx(key, n);
        stores[target].load_value(key, value, ttl_ms);
    });

    let new_meta = ShardsMeta { n, routing: Routing::KevyHash };
    commit_reshard(dir, src_n, new_meta, stores, &lay)?;
    Ok(())
}
/// Reports a `KevyMetric::Replay` event to the configured metric sink.
///
/// Does nothing when `config.metric_sink` is `None`. The elapsed time is
/// measured from `start` to the moment of emission, in milliseconds.
fn emit_replay(config: &Config, commands: u64, bytes: u64, start: Instant) {
    let Some(sink) = &config.metric_sink else {
        return;
    };
    // `as_millis` yields u128; the cast keeps the low 64 bits.
    let elapsed_ms = start.elapsed().as_millis() as u64;
    sink.emit(KevyMetric::Replay {
        commands,
        bytes,
        elapsed_ms,
    });
}
/// Pairs each keyspace with its optional AOF and wraps the pair in a shared,
/// lock-protected `Inner`.
///
/// Pairing is positional. If the two vectors differ in length, the extra
/// elements of the longer one are dropped.
fn into_inners(stores: Vec<Keyspace>, aofs: Vec<Option<Aof>>) -> Vec<Arc<RwLock<Inner>>> {
    let mut shards = Vec::with_capacity(stores.len().min(aofs.len()));
    for (store, aof) in stores.into_iter().zip(aofs) {
        let inner = Inner::new(store, aof);
        shards.push(Arc::new(RwLock::new(inner)));
    }
    shards
}