use crate::Commands;
use crate::reduce::shard_of;
use kevy_persist::{
Routing, ShardsMeta, load_snapshot, read_shards_meta, replay_aof, save_snapshot,
write_shards_meta,
};
use kevy_store::Store;
use std::io;
use std::path::{Path, PathBuf};
pub(crate) fn ensure_layout<C: Commands>(
dir: &Path,
n: usize,
routing: Routing,
commands: &C,
) -> io::Result<()> {
let meta_path = dir.join("shards.meta");
let target = ShardsMeta { n, routing };
let prev = match read_shards_meta(&meta_path) {
Some(m) => m,
None => ShardsMeta {
n: infer_legacy_n(dir),
routing: Routing::KevyHash,
},
};
if prev.n == 0 || prev == target {
std::fs::create_dir_all(dir)?;
return write_shards_meta(&meta_path, target);
}
reshard(dir, prev, target, commands)?;
write_shards_meta(&meta_path, target)
}
pub(crate) fn has_kevy_files(dir: &Path) -> bool {
infer_legacy_n(dir) > 0 || dir.join("shards.meta").exists()
}
fn infer_legacy_n(dir: &Path) -> usize {
let mut n = 0usize;
let Ok(entries) = std::fs::read_dir(dir) else {
return 0;
};
for entry in entries.flatten() {
let name = entry.file_name();
let Some(name) = name.to_str() else { continue };
let idx = name
.strip_prefix("dump-")
.and_then(|r| r.strip_suffix(".rdb"))
.or_else(|| name.strip_prefix("aof-").and_then(|r| r.strip_suffix(".aof")));
if let Some(i) = idx.and_then(|s| s.parse::<usize>().ok()) {
n = n.max(i + 1);
}
}
n
}
fn reshard<C: Commands>(
dir: &Path,
prev: ShardsMeta,
target: ShardsMeta,
commands: &C,
) -> io::Result<()> {
let mut temp = Store::new();
let mut sources: Vec<PathBuf> = Vec::new();
for i in 0..prev.n {
let snap = dir.join(format!("dump-{i}.rdb"));
if snap.exists() {
load_snapshot(&mut temp, &snap)?;
sources.push(snap);
}
let aof = dir.join(format!("aof-{i}.aof"));
if aof.exists() {
replay_aof(&aof, |args| {
commands.dispatch(&mut temp, &args);
})?;
sources.push(aof);
}
}
let mut stores: Vec<Store> = (0..target.n).map(|_| Store::new()).collect();
let slots = target.routing == Routing::Slots;
temp.snapshot_each(|key, value, ttl_ms| {
stores[shard_of(key, target.n, slots)].load_value(key, value, ttl_ms);
});
let stamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0);
for src in &sources {
let mut bak = src.clone().into_os_string();
bak.push(format!(".premigration.{stamp}"));
std::fs::rename(src, &bak)?;
}
for (i, store) in stores.iter().enumerate() {
save_snapshot(store, &dir.join(format!("dump-{i}.rdb")))?;
}
eprintln!(
"kevy: re-sharded {} -> {} shards ({:?} -> {:?} routing); {} source file(s) backed up as .premigration.{stamp}",
prev.n, target.n, prev.routing, target.routing, sources.len(),
);
Ok(())
}