use super::*;
use super::tests::tmp_dir;
use crate::PubsubFrame;
/// Round-trips 1000 keys through an 8-shard store opened without a persistence
/// directory: each key is written, read back, and finally removed with a single
/// `del` call that must report 1000 and leave `dbsize()` at 0.
#[test]
fn sharded_in_memory_roundtrip() {
    let store = Store::open(Config::default().with_shards(8).with_ttl_reaper_manual()).unwrap();

    // Populate `k0..k999` with matching `v0..v999` values.
    (0..1000u32).for_each(|n| {
        let (key, value) = (format!("k{n}"), format!("v{n}"));
        store.set(key.as_bytes(), value.as_bytes()).unwrap();
    });
    assert_eq!(store.dbsize(), 1000);

    // Every key must read back with the value it was written with.
    for n in 0..1000u32 {
        let expected = format!("v{n}").into_bytes();
        assert_eq!(store.get(format!("k{n}").as_bytes()).unwrap(), Some(expected));
    }

    // Delete all keys in one call; the owned buffers must outlive the borrowed view.
    let owned_keys: Vec<Vec<u8>> = (0..1000u32).map(|n| format!("k{n}").into_bytes()).collect();
    let borrowed_keys: Vec<&[u8]> = owned_keys.iter().map(Vec::as_slice).collect();
    assert_eq!(store.del(&borrowed_keys).unwrap(), 1000);
    assert_eq!(store.dbsize(), 0);
}
/// Writes 500 keys to a 4-shard store persisted under a temp directory, drops
/// the store, checks that `shards.meta` and `aof-3.aof` exist, then reopens the
/// same directory with the same configuration and checks the data is still there.
#[test]
fn sharded_persist_survives_restart() {
    let dir = tmp_dir("sharded-restart");
    // Both opens use exactly the same configuration.
    let four_shards =
        || Config::default().with_persist(&dir).with_shards(4).with_ttl_reaper_manual();

    let writer = Store::open(four_shards()).unwrap();
    (0..500u32).for_each(|n| {
        writer.set(format!("k{n}").as_bytes(), format!("v{n}").as_bytes()).unwrap();
    });
    assert_eq!(writer.dbsize(), 500);
    // The first store must be gone before the directory is inspected or reopened.
    drop(writer);

    assert!(dir.join("shards.meta").exists());
    assert!(dir.join("aof-3.aof").exists());

    let reopened = Store::open(four_shards()).unwrap();
    assert_eq!(reopened.dbsize(), 500);
    assert_eq!(reopened.get(b"k0").unwrap(), Some(b"v0".to_vec()));
    assert_eq!(reopened.get(b"k499").unwrap(), Some(b"v499".to_vec()));

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = std::fs::remove_dir_all(&dir);
}
/// Populates a persisted store opened without `with_shards`, then reopens the
/// same directory with four shards and checks that all 300 keys are readable,
/// that a directory entry whose name contains "premigration" exists, and that a
/// further four-shard open still sees the data.
#[test]
fn migrates_single_aof_to_shards() {
    let dir = tmp_dir("migrate");
    let four_shards =
        || Config::default().with_persist(&dir).with_shards(4).with_ttl_reaper_manual();

    // Phase 1: write 300 keys with a config that never calls `with_shards`.
    let unsharded = Config::default().with_persist(&dir).with_ttl_reaper_manual();
    let original = Store::open(unsharded).unwrap();
    for n in 0..300u32 {
        let (key, value) = (format!("k{n}"), format!("v{n}"));
        original.set(key.as_bytes(), value.as_bytes()).unwrap();
    }
    drop(original);
    assert!(dir.join("aof-0.aof").exists());
    assert!(dir.join("shards.meta").exists());

    // Phase 2: reopen with four shards; every key must still be present.
    let migrated = Store::open(four_shards()).unwrap();
    assert_eq!(migrated.dbsize(), 300);
    for n in 0..300u32 {
        let expected = format!("v{n}").into_bytes();
        assert_eq!(migrated.get(format!("k{n}").as_bytes()).unwrap(), Some(expected));
    }
    drop(migrated);

    assert!(dir.join("shards.meta").exists());
    // At least one directory entry must carry "premigration" in its file name.
    let has_premigration_file = std::fs::read_dir(&dir)
        .unwrap()
        .any(|entry| entry.unwrap().file_name().to_string_lossy().contains("premigration"));
    assert!(has_premigration_file);

    // Phase 3: a second four-shard open sees the same data.
    let reopened = Store::open(four_shards()).unwrap();
    assert_eq!(reopened.dbsize(), 300);
    assert_eq!(reopened.get(b"k150").unwrap(), Some(b"v150".to_vec()));

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = std::fs::remove_dir_all(&dir);
}
/// Subscribes to `chan` on an 8-shard store, then checks that `publish` returns
/// 1 and that the next frame received is the matching `PubsubFrame::Message`.
#[test]
fn sharded_pubsub_still_process_wide() {
    let store = Store::open(Config::default().with_shards(8).with_ttl_reaper_manual()).unwrap();
    let subscription = store.subscribe(&[b"chan"]);

    // The first frame is consumed without being inspected (presumably the
    // subscribe acknowledgement, going by the binding name).
    let _ack = subscription.recv().unwrap();

    let publish_result = store.publish(b"chan", b"hello");
    assert_eq!(publish_result, 1);

    let expected = PubsubFrame::Message { channel: b"chan".to_vec(), payload: b"hello".to_vec() };
    assert_eq!(subscription.recv().unwrap(), expected);
}
/// Fills an 8-shard store with 500 `user:*` keys and 50 `other:*` keys, then
/// checks pattern matching with and without a limit in `collect_keys`, the sum
/// of per-shard `dbsize()` values, and `shard_count()`.
#[test]
fn collect_keys_spans_all_shards() {
    let store = Store::open(Config::default().with_shards(8).with_ttl_reaper_manual()).unwrap();

    // Insert `user:0..user:499` first, then `other:0..other:49`, all with value `v`.
    for (prefix, count) in [("user", 500u32), ("other", 50u32)] {
        for n in 0..count {
            store.set(format!("{prefix}:{n}").as_bytes(), b"v").unwrap();
        }
    }

    // Without a limit every `user:` key matches; with a limit of 100 exactly 100 come back.
    let unlimited = store.collect_keys(Some(b"user:*"), None);
    assert_eq!(unlimited.len(), 500);
    let limited = store.collect_keys(Some(b"user:*"), Some(100));
    assert_eq!(limited.len(), 100);

    // Summing `dbsize()` over the shards visited by `for_each_shard` must give all 550 keys.
    let mut keys_across_shards = 0;
    store.for_each_shard(|shard| keys_across_shards += shard.dbsize());
    assert_eq!(keys_across_shards, 550);
    assert_eq!(store.shard_count(), 8);
}
/// Prepares a directory holding `dump-0.rdb` and `dump-1.rdb`, each with one
/// distinct key (the test itself writes no `shards.meta`), opens it with a
/// config that does not call `with_shards`, and requires both keys to be visible.
#[test]
fn metaless_multishard_dir_is_migrated_not_partially_loaded() {
    let dir = tmp_dir("metaless-multi");
    std::fs::create_dir_all(&dir).unwrap();

    // First snapshot file: holds only `alpha`.
    let mut first = kevy_store::Store::new();
    first.set(b"alpha", b"1".to_vec(), None, false, false);
    let first_snapshot = dir.join("dump-0.rdb");
    kevy_persist::save_snapshot(&first, &first_snapshot).unwrap();

    // Second snapshot file: holds only `beta`.
    let mut second = kevy_store::Store::new();
    second.set(b"beta", b"2".to_vec(), None, false, false);
    let second_snapshot = dir.join("dump-1.rdb");
    kevy_persist::save_snapshot(&second, &second_snapshot).unwrap();

    let config = Config::default().with_persist(&dir).with_ttl_reaper_manual();
    let store = Store::open(config).unwrap();
    assert_eq!(store.dbsize(), 2, "both shards' keys must survive the n=1 open");
    assert_eq!(store.get(b"alpha").unwrap(), Some(b"1".to_vec()));
    assert_eq!(store.get(b"beta").unwrap(), Some(b"2".to_vec()));

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = std::fs::remove_dir_all(&dir);
}
/// Opens a persisted store with default file names and no `with_shards` call,
/// writes one key, drops the store, and checks that both `shards.meta` and
/// `aof-0.aof` exist in the directory.
#[test]
fn single_shard_default_names_record_meta() {
    let dir = tmp_dir("single-meta");

    let config = Config::default().with_persist(&dir).with_ttl_reaper_manual();
    let store = Store::open(config).unwrap();
    store.set(b"k", b"v").unwrap();
    // The store is dropped before the directory contents are checked.
    drop(store);

    assert!(dir.join("shards.meta").exists());
    assert!(dir.join("aof-0.aof").exists());

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = std::fs::remove_dir_all(&dir);
}
/// Opens a persisted store with custom AOF and snapshot file names, writes one
/// key, and checks that `my.aof` exists while `shards.meta` does not; a second
/// open with the same configuration must still read the key.
#[test]
fn custom_filenames_stay_metaless() {
    let dir = tmp_dir("custom-names");
    // Both opens share this exact configuration.
    let custom_names = || {
        Config::default()
            .with_persist(&dir)
            .with_aof_filename("my.aof")
            .with_snapshot_filename("my.rdb")
            .with_ttl_reaper_manual()
    };

    let writer = Store::open(custom_names()).unwrap();
    writer.set(b"k", b"v").unwrap();
    // The first store is dropped before the directory contents are checked.
    drop(writer);

    assert!(dir.join("my.aof").exists());
    assert!(!dir.join("shards.meta").exists());

    let reopened = Store::open(custom_names()).unwrap();
    assert_eq!(reopened.get(b"k").unwrap(), Some(b"v".to_vec()));

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = std::fs::remove_dir_all(&dir);
}
/// Builds, by hand, a directory that contains a one-shard snapshot plus
/// `shards.meta` (n = 1), two `*.reshard` snapshot files, and a complete
/// `reshard.journal` describing a 1 -> 2 reshard. Opening the directory with
/// two shards must expose the three keys stored in the `*.reshard` files, must
/// not expose the key from the original snapshot, and must leave behind a
/// `premigration` backup while removing the journal and the `.reshard` file.
#[test]
fn open_rolls_forward_a_committed_reshard() {
let dir = tmp_dir("reshard-rollfwd");
std::fs::create_dir_all(&dir).unwrap();
// Pre-reshard state: a single snapshot holding only the key `stale`,
// recorded in `shards.meta` as n = 1 with KevyHash routing.
let mut stale = kevy_store::Store::new();
stale.set(b"stale", b"x".to_vec(), None, false, false);
kevy_persist::save_snapshot(&stale, &dir.join("dump-0.rdb")).unwrap();
kevy_persist::write_shards_meta(
&dir.join("shards.meta"),
kevy_persist::ShardsMeta { n: 1, routing: kevy_persist::Routing::KevyHash },
)
.unwrap();
// Post-reshard state: three keys placed into two stores using the crate's
// own `shard_idx(k, 2)`, then saved under the `.reshard` suffix.
let mut shards = [kevy_store::Store::new(), kevy_store::Store::new()];
// The first tuple uses `.as_slice()` so the array's element type is
// `(&[u8], &[u8])`; the remaining literals coerce to that type.
for (k, v) in [(b"alpha".as_slice(), b"1".as_slice()), (b"beta", b"2"), (b"gamma", b"3")] {
shards[crate::shard::shard_idx(k, 2)].set(k, v.to_vec(), None, false, false);
}
kevy_persist::save_snapshot(&shards[0], &dir.join("dump-0.rdb.reshard")).unwrap();
kevy_persist::save_snapshot(&shards[1], &dir.join("dump-1.rdb.reshard")).unwrap();
// A fully written journal: header line plus stamp, prev_n, n and routing fields.
std::fs::write(
dir.join("reshard.journal"),
"kevy-reshard-journal v1\nstamp=7\nprev_n=1\nn=2\nrouting=kevyhash\n",
)
.unwrap();
// Open with the journal's target shard count (2).
let s = Store::open(
Config::default().with_persist(&dir).with_shards(2).with_ttl_reaper_manual(),
)
.unwrap();
assert_eq!(s.dbsize(), 3, "recovered store must hold the migrated state");
assert_eq!(s.get(b"alpha").unwrap(), Some(b"1".to_vec()));
assert_eq!(s.get(b"gamma").unwrap(), Some(b"3".to_vec()));
// The key that only existed in the original `dump-0.rdb` must not be visible.
assert_eq!(s.get(b"stale").unwrap(), None);
// Drop the store before inspecting the files it left behind.
drop(s);
// NOTE(review): the `.7` suffix presumably comes from `stamp=7` in the journal — confirm.
assert!(dir.join("dump-0.rdb.premigration.7").exists(), "stale source not backed up");
assert!(!dir.join("reshard.journal").exists());
assert!(!dir.join("dump-0.rdb.reshard").exists());
// Best-effort cleanup; a failure to remove the directory is ignored.
let _ = std::fs::remove_dir_all(&dir);
}
/// Opens a directory that holds one snapshot and a `reshard.journal` whose
/// content stops right after `stamp=`. The snapshot's key must be readable and
/// the journal file must no longer exist once the store has been dropped.
#[test]
fn open_discards_a_torn_reshard_journal() {
    let dir = tmp_dir("reshard-torn");
    std::fs::create_dir_all(&dir).unwrap();

    // Existing data: a single snapshot containing the key `keep`.
    let mut existing = kevy_store::Store::new();
    existing.set(b"keep", b"v".to_vec(), None, false, false);
    kevy_persist::save_snapshot(&existing, &dir.join("dump-0.rdb")).unwrap();

    // Journal with only the header line and an empty `stamp=` field.
    let journal_path = dir.join("reshard.journal");
    std::fs::write(&journal_path, "kevy-reshard-journal v1\nstamp=").unwrap();

    let config = Config::default().with_persist(&dir).with_ttl_reaper_manual();
    let store = Store::open(config).unwrap();
    assert_eq!(store.get(b"keep").unwrap(), Some(b"v".to_vec()));
    // Drop the store before checking what is left on disk.
    drop(store);

    assert!(!journal_path.exists());

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = std::fs::remove_dir_all(&dir);
}