use crate::layout;
use crate::{Argv, Routing, ShardsMeta, load_snapshot, replay_aof, save_snapshot, write_shards_meta};
use kevy_store::Store;
use std::io;
use std::path::{Path, PathBuf};
/// Maps a shard to the files that persist it inside a data directory.
///
/// Both methods receive the directory, the shard index `i`, and the shard
/// count `n` that the index belongs to, so an implementation may pick
/// different file names for different shard counts.
pub trait ShardLayout {
    /// Path of the snapshot file for shard `i` out of `n` under `dir`.
    fn snapshot_path(&self, dir: &Path, i: usize, n: usize) -> PathBuf;

    /// Path of the append-only log for shard `i` out of `n` under `dir`.
    fn aof_path(&self, dir: &Path, i: usize, n: usize) -> PathBuf;
}
/// Layout that defers to the crate's `layout` module for file names.
///
/// The shard count is ignored: paths depend only on the directory and the
/// shard index.
pub struct StdLayout;

impl ShardLayout for StdLayout {
    /// Returns `layout::snapshot_path(dir, i)`; the shard count is unused.
    fn snapshot_path(&self, dir: &Path, i: usize, _count: usize) -> PathBuf {
        layout::snapshot_path(dir, i)
    }

    /// Returns `layout::aof_path(dir, i)`; the shard count is unused.
    fn aof_path(&self, dir: &Path, i: usize, _count: usize) -> PathBuf {
        layout::aof_path(dir, i)
    }
}
/// File name of the re-shard journal; it is always joined onto the `dir`
/// argument of the functions in this file that write, read, or remove it.
const JOURNAL: &str = "reshard.journal";
/// Returns `target` with `.reshard` appended to its full path.
///
/// The suffix is appended to the raw OS string, so any existing extension is
/// kept (`dump-0.rdb` becomes `dump-0.rdb.reshard`).
fn reshard_tmp(target: &Path) -> PathBuf {
    let mut staged = target.to_path_buf().into_os_string();
    staged.push(".reshard");
    staged.into()
}
/// Loads every existing source shard file into `temp` and reports which
/// files were consumed.
///
/// For each shard index in `0..src_n`, in order, the snapshot (if the file
/// exists) is loaded into `temp`, then the append-only log (if the file
/// exists) is replayed by handing each command to `replay` together with
/// `temp`. Files that do not exist are skipped.
///
/// # Returns
/// The paths that were read, in the order they were processed.
///
/// # Errors
/// Propagates the first error from `load_snapshot` or `replay_aof`.
pub fn merge_sources<L: ShardLayout>(
    dir: &Path,
    src_n: usize,
    lay: &L,
    temp: &mut Store,
    mut replay: impl FnMut(&mut Store, Argv),
) -> io::Result<Vec<PathBuf>> {
    let mut consumed = Vec::new();
    for shard in 0..src_n {
        let snapshot = lay.snapshot_path(dir, shard, src_n);
        if snapshot.exists() {
            load_snapshot(temp, &snapshot)?;
            consumed.push(snapshot);
        }

        let log = lay.aof_path(dir, shard, src_n);
        if !log.exists() {
            continue;
        }
        replay_aof(&log, |args| replay(temp, args))?;
        consumed.push(log);
    }
    Ok(consumed)
}
/// Persists the re-sharded `stores` and switches the directory over to the
/// `target` shard layout.
///
/// Steps, in order:
/// 1. Reject the call if `stores.len()` differs from `target.n`.
/// 2. Remove any leftover `.reshard` temp snapshot for each target shard.
/// 3. Save each store to the `.reshard` temp path of its target snapshot.
/// 4. Write the journal describing the switch-over, then run the finalize
///    step that backs up the old files and promotes the temp snapshots.
///
/// # Returns
/// The stamp (nanoseconds since the Unix epoch, or `0` if the system clock is
/// before the epoch) used as the suffix of the backup files.
///
/// # Errors
/// Returns `InvalidInput` when the number of stores does not match
/// `target.n`; otherwise propagates the first I/O error from saving a
/// snapshot, writing the journal, or finalizing.
pub fn commit_reshard<L: ShardLayout>(
    dir: &Path,
    prev_n: usize,
    target: ShardsMeta,
    stores: &[Store],
    lay: &L,
) -> io::Result<u128> {
    // A mismatch would corrupt the layout: surplus stores are written to temp
    // files that are never promoted, and missing stores leave the old snapshot
    // at that index neither backed up nor replaced. Refuse before touching
    // any file.
    if stores.len() != target.n {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!(
                "re-shard to {} shards was given {} stores",
                target.n,
                stores.len(),
            ),
        ));
    }

    // Best effort: a temp file may be left over from an earlier attempt, and
    // "not found" is the normal case, so the result is deliberately ignored.
    for i in 0..target.n {
        let _ = std::fs::remove_file(reshard_tmp(&lay.snapshot_path(dir, i, target.n)));
    }

    for (i, store) in stores.iter().enumerate() {
        save_snapshot(store, &reshard_tmp(&lay.snapshot_path(dir, i, target.n)))?;
    }

    let stamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);

    // The journal must exist before any old file is renamed so that an
    // interrupted finalize can be resumed from it.
    write_journal(dir, prev_n, target, stamp)?;
    finish_reshard(dir, prev_n, target, stamp, lay)?;
    Ok(stamp)
}
/// Writes the re-shard journal into `dir` and syncs it to disk.
///
/// The journal is a text file: a fixed header line followed by `key=value`
/// lines for the stamp, the previous shard count, the target shard count and
/// the target routing (`kevyhash` or `slots`). Any existing journal file is
/// truncated.
///
/// # Errors
/// Propagates errors from creating, writing, or syncing the file.
fn write_journal(dir: &Path, prev_n: usize, target: ShardsMeta, stamp: u128) -> io::Result<()> {
    use std::io::Write;

    let routing = match target.routing {
        Routing::Slots => "slots",
        Routing::KevyHash => "kevyhash",
    };
    let body = format!(
        "kevy-reshard-journal v1\nstamp={stamp}\nprev_n={prev_n}\nn={}\nrouting={routing}\n",
        target.n,
    );

    std::fs::File::create(dir.join(JOURNAL)).and_then(|mut journal| {
        journal.write_all(body.as_bytes())?;
        journal.sync_all()
    })
}
/// Moves the old shard files aside, promotes the `.reshard` temp snapshots,
/// records the new shard metadata and removes the journal.
///
/// Every rename is guarded by an existence check, so running this again
/// after a partial run skips the files that were already handled.
///
/// # Errors
/// Propagates the first error from a rename, from writing the shard
/// metadata, or from removing the journal file.
fn finish_reshard<L: ShardLayout>(
    dir: &Path,
    prev_n: usize,
    target: ShardsMeta,
    stamp: u128,
    lay: &L,
) -> io::Result<()> {
    // Phase 1: back up the files of the previous layout.
    for shard in 0..prev_n {
        let old_snapshot = lay.snapshot_path(dir, shard, prev_n);
        // An index that survives into the target layout is only treated as an
        // old snapshot while its temp replacement is still waiting; once the
        // temp file is gone, the file at that index is left untouched.
        let superseded = if shard < target.n {
            reshard_tmp(&lay.snapshot_path(dir, shard, target.n)).exists()
        } else {
            true
        };
        if superseded && old_snapshot.exists() {
            rename_to_backup(&old_snapshot, stamp)?;
        }

        let old_aof = lay.aof_path(dir, shard, prev_n);
        if old_aof.exists() {
            rename_to_backup(&old_aof, stamp)?;
        }
    }

    // Phase 2: promote each temp snapshot that is still present.
    for shard in 0..target.n {
        let live = lay.snapshot_path(dir, shard, target.n);
        let staged = reshard_tmp(&live);
        if !staged.exists() {
            continue;
        }
        std::fs::rename(&staged, &live)?;
    }

    // Phase 3: record the new layout, then drop the journal.
    write_shards_meta(&layout::shards_meta_path(dir), target)?;
    let journal = dir.join(JOURNAL);
    std::fs::remove_file(journal)
}
/// Renames `src` to the same path with `.premigration.<stamp>` appended.
///
/// # Errors
/// Propagates the error from `std::fs::rename` (for example when `src` does
/// not exist).
fn rename_to_backup(src: &Path, stamp: u128) -> io::Result<()> {
    let suffix = format!(".premigration.{stamp}");
    let mut backup = src.to_path_buf().into_os_string();
    backup.push(suffix);
    std::fs::rename(src, backup)
}
/// Completes or discards a re-shard that left a journal file in `dir`.
///
/// * No journal file: nothing to do, returns `Ok(())`.
/// * Journal parses: the finalize step is re-run with the recorded values
///   and a message is printed to stderr.
/// * Journal does not parse: the journal file is removed and nothing else is
///   touched.
///
/// # Errors
/// Propagates any error from reading the journal other than "not found", and
/// any error from finalizing or from removing an unparseable journal.
pub fn recover_journal<L: ShardLayout>(dir: &Path, lay: &L) -> io::Result<()> {
    let journal = dir.join(JOURNAL);
    let body = match std::fs::read_to_string(&journal) {
        Ok(text) => text,
        Err(err) => {
            return if err.kind() == io::ErrorKind::NotFound {
                Ok(())
            } else {
                Err(err)
            };
        }
    };

    let (prev_n, target, stamp) = match parse_journal(&body) {
        None => return std::fs::remove_file(&journal),
        Some(plan) => plan,
    };

    finish_reshard(dir, prev_n, target, stamp, lay)?;
    eprintln!(
        "kevy: completed interrupted re-shard to {} shards ({:?} routing)",
        target.n, target.routing,
    );
    Ok(())
}
/// Parses the text of a re-shard journal.
///
/// The first line must be the exact header `kevy-reshard-journal v1`. Every
/// following line must be `key=value` with a key of `stamp`, `prev_n`, `n`
/// or `routing`; any other line (including a blank one) rejects the journal.
///
/// # Returns
/// `Some((prev_n, target, stamp))` when all four fields are present and
/// valid, otherwise `None`.
fn parse_journal(body: &str) -> Option<(usize, ShardsMeta, u128)> {
    let mut lines = body.lines();
    match lines.next() {
        Some("kevy-reshard-journal v1") => {}
        _ => return None,
    }

    let (mut stamp, mut prev_n, mut n, mut routing) = (None, None, None, None);
    for line in lines {
        let (key, value) = line.split_once('=')?;
        match key {
            "stamp" => stamp = value.parse::<u128>().ok(),
            "prev_n" => prev_n = value.parse::<usize>().ok(),
            "n" => n = value.parse::<usize>().ok(),
            "routing" => {
                routing = match value {
                    "slots" => Some(Routing::Slots),
                    "kevyhash" => Some(Routing::KevyHash),
                    _ => None,
                };
            }
            _ => return None,
        }
    }

    let meta = ShardsMeta {
        n: n?,
        routing: routing?,
    };
    Some((prev_n?, meta, stamp?))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::read_shards_meta;

    /// Re-shard target shared by the journal and recovery tests.
    const TARGET: ShardsMeta = ShardsMeta { n: 2, routing: Routing::Slots };

    /// Creates a scratch directory under the system temp dir whose name
    /// combines `name` with the current time in nanoseconds.
    fn temp_dir(name: &str) -> PathBuf {
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let scratch = std::env::temp_dir().join(format!("kevy-reshard-{name}-{}", nanos));
        std::fs::create_dir_all(&scratch).unwrap();
        scratch
    }

    /// Writes a one-byte placeholder file called `name` inside `dir`.
    fn touch(dir: &Path, name: &str) {
        std::fs::write(dir.join(name), b"x").unwrap();
    }

    /// A journal written by `write_journal` parses back to the same values.
    #[test]
    fn journal_round_trips() {
        let dir = temp_dir("journal-rt");

        write_journal(&dir, 4, TARGET, 99).unwrap();
        let body = std::fs::read_to_string(dir.join(JOURNAL)).unwrap();
        assert_eq!(parse_journal(&body), Some((4, TARGET, 99)));

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Journal written and temp snapshots staged, but nothing finalized yet:
    /// recovery backs up all old files and promotes the staged snapshots.
    #[test]
    fn recover_completes_after_commit_point_crash() {
        let old_files = [
            "dump-0.rdb",
            "dump-1.rdb",
            "dump-2.rdb",
            "dump-3.rdb",
            "aof-0.aof",
            "aof-3.aof",
        ];
        let dir = temp_dir("mid-c");
        for f in old_files {
            touch(&dir, f);
        }
        touch(&dir, "dump-0.rdb.reshard");
        touch(&dir, "dump-1.rdb.reshard");
        write_journal(&dir, 4, TARGET, 7).unwrap();

        recover_journal(&dir, &StdLayout).unwrap();

        for f in old_files {
            assert!(dir.join(format!("{f}.premigration.7")).exists(), "{f} not backed up");
        }
        assert!(dir.join("dump-0.rdb").exists() && dir.join("dump-1.rdb").exists());
        assert!(!dir.join("dump-0.rdb.reshard").exists());
        assert!(!dir.join(JOURNAL).exists());
        assert_eq!(read_shards_meta(&dir.join("shards.meta")), Some(TARGET));

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Shard 0 already promoted, shard 1 still staged, shard 2 already
    /// backed up: recovery finishes the rest without re-backing-up shard 0.
    #[test]
    fn recover_resumes_mid_finalize_crash() {
        let dir = temp_dir("mid-d");
        std::fs::write(dir.join("dump-0.rdb"), b"new0").unwrap();
        touch(&dir, "dump-1.rdb.reshard");
        touch(&dir, "dump-2.rdb.premigration.7");
        write_journal(&dir, 3, TARGET, 7).unwrap();

        recover_journal(&dir, &StdLayout).unwrap();

        assert_eq!(std::fs::read(dir.join("dump-0.rdb")).unwrap(), b"new0");
        assert!(!dir.join("dump-0.rdb.premigration.7").exists(), "finalized snapshot re-backed-up");
        assert!(dir.join("dump-1.rdb").exists());
        assert!(!dir.join(JOURNAL).exists());
        assert_eq!(read_shards_meta(&dir.join("shards.meta")), Some(TARGET));

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// A journal cut off after the stamp line is removed and the existing
    /// snapshot is left in place.
    #[test]
    fn torn_journal_is_discarded() {
        let dir = temp_dir("torn");
        touch(&dir, "dump-0.rdb");
        touch(&dir, "dump-0.rdb.reshard");
        std::fs::write(dir.join(JOURNAL), b"kevy-reshard-journal v1\nstamp=12").unwrap();

        recover_journal(&dir, &StdLayout).unwrap();

        assert!(!dir.join(JOURNAL).exists());
        assert!(dir.join("dump-0.rdb").exists());
        assert!(!dir.join("dump-0.rdb.premigration.12").exists());

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Without a journal file, recovery succeeds and changes nothing.
    #[test]
    fn no_journal_is_a_no_op() {
        let dir = temp_dir("none");
        touch(&dir, "dump-0.rdb");

        recover_journal(&dir, &StdLayout).unwrap();

        assert!(dir.join("dump-0.rdb").exists());

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Merges two shards and commits them to a single shard through a layout
    /// whose file names depend on the shard count.
    #[test]
    fn custom_layout_round_trip() {
        struct Custom;

        impl ShardLayout for Custom {
            fn snapshot_path(&self, dir: &Path, i: usize, n: usize) -> PathBuf {
                if n == 1 {
                    dir.join("snap.bin")
                } else {
                    layout::snapshot_path(dir, i)
                }
            }

            fn aof_path(&self, dir: &Path, i: usize, n: usize) -> PathBuf {
                if n == 1 {
                    dir.join("log.bin")
                } else {
                    layout::aof_path(dir, i)
                }
            }
        }

        let dir = temp_dir("custom");

        let mut first = Store::new();
        first.set(b"alpha", b"1".to_vec(), None, false, false);
        save_snapshot(&first, &dir.join("dump-0.rdb")).unwrap();

        let mut second = Store::new();
        second.set(b"beta", b"2".to_vec(), None, false, false);
        save_snapshot(&second, &dir.join("dump-1.rdb")).unwrap();

        let mut temp = Store::new();
        let sources = merge_sources(&dir, 2, &Custom, &mut temp, |_, _| {}).unwrap();
        assert_eq!(sources.len(), 2);

        let target = ShardsMeta { n: 1, routing: Routing::KevyHash };
        let stamp = commit_reshard(&dir, 2, target, &[temp], &Custom).unwrap();

        assert!(dir.join("snap.bin").exists());
        assert!(dir.join(format!("dump-0.rdb.premigration.{stamp}")).exists());
        assert!(dir.join(format!("dump-1.rdb.premigration.{stamp}")).exists());
        assert!(!dir.join(JOURNAL).exists());

        let mut merged = Store::new();
        load_snapshot(&mut merged, &dir.join("snap.bin")).unwrap();
        assert_eq!(merged.dbsize(), 2);

        let _ = std::fs::remove_dir_all(&dir);
    }
}