use super::*;
use super::tests::tmp_dir;
use crate::PubsubFrame;
/// Round-trips 1000 keys through an 8-shard store opened without
/// persistence: write, count, read back, bulk-delete, count again.
#[test]
fn sharded_in_memory_roundtrip() {
    let store = Store::open(Config::default().with_shards(8).with_ttl_reaper_manual()).unwrap();

    // Write phase: k0..k999 -> v0..v999.
    (0..1000u32).for_each(|n| {
        let (key, val) = (format!("k{n}"), format!("v{n}"));
        store.set(key.as_bytes(), val.as_bytes()).unwrap();
    });
    assert_eq!(store.dbsize(), 1000);

    // Read phase: every key must return exactly the value written for it.
    (0..1000u32).for_each(|n| {
        let key = format!("k{n}");
        let found = store.get(key.as_bytes()).unwrap();
        let wanted = format!("v{n}").into_bytes();
        assert_eq!(found, Some(wanted));
    });

    // Delete phase: one bulk `del` over all keys must report all 1000 removed.
    let owned_keys: Vec<Vec<u8>> = (0..1000u32)
        .map(|n| format!("k{n}"))
        .map(String::into_bytes)
        .collect();
    let borrowed_keys: Vec<&[u8]> = owned_keys.iter().map(Vec::as_slice).collect();
    assert_eq!(store.del(&borrowed_keys).unwrap(), 1000);
    assert_eq!(store.dbsize(), 0);
}
/// Writes 500 keys into a persisted 4-shard store, drops it, and checks
/// that a second store opened on the same directory sees the same data.
#[test]
fn sharded_persist_survives_restart() {
    let dir = tmp_dir("sharded-restart");
    // Both generations of the store are opened with the identical config.
    let open_sharded = || {
        Store::open(
            Config::default().with_persist(&dir).with_shards(4).with_ttl_reaper_manual(),
        )
        .unwrap()
    };
    let on_disk = |name: &str| dir.join(name).exists();

    {
        let first = open_sharded();
        (0..500u32).for_each(|n| {
            let (key, val) = (format!("k{n}"), format!("v{n}"));
            first.set(key.as_bytes(), val.as_bytes()).unwrap();
        });
        assert_eq!(first.dbsize(), 500);
        // `first` goes out of scope here, before the directory is inspected.
    }

    // The shard metadata file and the AOF of the last shard (index 3) must exist.
    assert!(on_disk("shards.meta"));
    assert!(on_disk("aof-3.aof"));

    let reopened = open_sharded();
    assert_eq!(reopened.dbsize(), 500);
    // Spot-check the first and the last key written.
    let first_key = reopened.get(b"k0").unwrap();
    assert_eq!(first_key, Some(b"v0".to_vec()));
    let last_key = reopened.get(b"k499").unwrap();
    assert_eq!(last_key, Some(b"v499".to_vec()));

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = std::fs::remove_dir_all(&dir);
}
/// Writes 300 keys with the default (no `with_shards`) config, then reopens
/// the same directory with 4 shards and checks that all data is still
/// there, that a file whose name contains "premigration" was left behind,
/// and that a further 4-shard reopen still sees the data.
#[test]
fn migrates_single_aof_to_shards() {
    let dir = tmp_dir("migrate");
    let on_disk = |name: &str| dir.join(name).exists();
    let open_four_shards = || {
        Store::open(
            Config::default().with_persist(&dir).with_shards(4).with_ttl_reaper_manual(),
        )
        .unwrap()
    };

    // Generation 1: default shard configuration.
    {
        let unsharded = Store::open(
            Config::default().with_persist(&dir).with_ttl_reaper_manual(),
        )
        .unwrap();
        (0..300u32).for_each(|n| {
            let (key, val) = (format!("k{n}"), format!("v{n}"));
            unsharded.set(key.as_bytes(), val.as_bytes()).unwrap();
        });
    }
    assert!(on_disk("aof-0.aof"));
    assert!(on_disk("shards.meta"));

    // Generation 2: same directory, now opened with 4 shards.
    {
        let migrated = open_four_shards();
        assert_eq!(migrated.dbsize(), 300);
        (0..300u32).for_each(|n| {
            let key = format!("k{n}");
            let found = migrated.get(key.as_bytes()).unwrap();
            let wanted = format!("v{n}").into_bytes();
            assert_eq!(found, Some(wanted));
        });
    }
    assert!(on_disk("shards.meta"));

    // Some directory entry must carry "premigration" in its file name.
    let premigration_file_present = std::fs::read_dir(&dir)
        .unwrap()
        .map(|entry| entry.unwrap().file_name())
        .any(|name| name.to_string_lossy().contains("premigration"));
    assert!(premigration_file_present);

    // Generation 3: a second 4-shard open must still see the same data.
    let reopened = open_four_shards();
    assert_eq!(reopened.dbsize(), 300);
    let sample = reopened.get(b"k150").unwrap();
    assert_eq!(sample, Some(b"v150".to_vec()));

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = std::fs::remove_dir_all(&dir);
}
/// Subscribes to a channel on an 8-shard store, publishes one message, and
/// checks that exactly one receiver is reported and the message arrives.
#[test]
fn sharded_pubsub_still_process_wide() {
    let store = Store::open(Config::default().with_shards(8).with_ttl_reaper_manual()).unwrap();
    let subscription = store.subscribe(&[b"chan"]);

    // The first frame is consumed without inspection
    // (presumably the subscribe acknowledgement — confirm against `subscribe`).
    let _subscribe_ack = subscription.recv().unwrap();

    let receiver_count = store.publish(b"chan", b"hello");
    assert_eq!(receiver_count, 1);

    let delivered = subscription.recv().unwrap();
    let expected = PubsubFrame::Message {
        channel: b"chan".to_vec(),
        payload: b"hello".to_vec(),
    };
    assert_eq!(delivered, expected);
}
/// Fills an 8-shard store with 500 `user:*` keys and 50 `other:*` keys,
/// then checks pattern matching, the optional result cap, the per-shard
/// key totals, and the reported shard count.
#[test]
fn collect_keys_spans_all_shards() {
    let store = Store::open(Config::default().with_shards(8).with_ttl_reaper_manual()).unwrap();

    (0..500u32)
        .map(|n| format!("user:{n}"))
        .for_each(|key| {
            store.set(key.as_bytes(), b"v").unwrap();
        });
    (0..50u32)
        .map(|n| format!("other:{n}"))
        .for_each(|key| {
            store.set(key.as_bytes(), b"v").unwrap();
        });

    // No cap: every `user:*` key is returned, and no `other:*` key.
    let user_keys = store.collect_keys(Some(b"user:*"), None);
    assert_eq!(user_keys.len(), 500);

    // With a cap of 100 the result has exactly 100 entries.
    let capped_len = store.collect_keys(Some(b"user:*"), Some(100)).len();
    assert_eq!(capped_len, 100);

    // Summing `dbsize` over every shard must account for all 550 keys.
    let mut keys_across_shards = 0;
    store.for_each_shard(|shard| keys_across_shards += shard.dbsize());
    assert_eq!(keys_across_shards, 550);

    assert_eq!(store.shard_count(), 8);
}
/// Seeds a directory with two snapshot files (`dump-0.rdb`, `dump-1.rdb`),
/// each holding one key, then opens it with the default (no `with_shards`)
/// config and checks that the keys from both snapshots are present.
#[test]
fn metaless_multishard_dir_is_migrated_not_partially_loaded() {
    let dir = tmp_dir("metaless-multi");
    std::fs::create_dir_all(&dir).unwrap();

    // Snapshot 0 holds only "alpha".
    let mut shard_zero = kevy_store::Store::new();
    shard_zero.set(b"alpha", b"1".to_vec(), None, false, false);
    let shard_zero_dump = dir.join("dump-0.rdb");
    kevy_persist::save_snapshot(&shard_zero, &shard_zero_dump).unwrap();

    // Snapshot 1 holds only "beta".
    let mut shard_one = kevy_store::Store::new();
    shard_one.set(b"beta", b"2".to_vec(), None, false, false);
    let shard_one_dump = dir.join("dump-1.rdb");
    kevy_persist::save_snapshot(&shard_one, &shard_one_dump).unwrap();

    let store = Store::open(
        Config::default().with_persist(&dir).with_ttl_reaper_manual(),
    )
    .unwrap();

    assert_eq!(store.dbsize(), 2, "both shards' keys must survive the n=1 open");
    let alpha = store.get(b"alpha").unwrap();
    assert_eq!(alpha, Some(b"1".to_vec()));
    let beta = store.get(b"beta").unwrap();
    assert_eq!(beta, Some(b"2".to_vec()));

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = std::fs::remove_dir_all(&dir);
}
/// Opens a persisted store with default file names, writes one key, drops
/// the store, and checks that both `shards.meta` and `aof-0.aof` exist.
#[test]
fn single_shard_default_names_record_meta() {
    let dir = tmp_dir("single-meta");
    let on_disk = |name: &str| dir.join(name).exists();

    {
        let store = Store::open(
            Config::default().with_persist(&dir).with_ttl_reaper_manual(),
        )
        .unwrap();
        store.set(b"k", b"v").unwrap();
        // `store` goes out of scope here, before the directory is inspected.
    }

    assert!(on_disk("shards.meta"));
    assert!(on_disk("aof-0.aof"));

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = std::fs::remove_dir_all(&dir);
}
/// With custom AOF and snapshot file names, the custom AOF file is created,
/// no `shards.meta` is written, and the data is readable after reopening
/// with the same configuration.
#[test]
fn custom_filenames_stay_metaless() {
    let dir = tmp_dir("custom-names");
    // Both opens use the same custom-named configuration.
    let open_custom = || {
        Store::open(
            Config::default()
                .with_persist(&dir)
                .with_aof_filename("my.aof")
                .with_snapshot_filename("my.rdb")
                .with_ttl_reaper_manual(),
        )
        .unwrap()
    };
    let on_disk = |name: &str| dir.join(name).exists();

    {
        let writer = open_custom();
        writer.set(b"k", b"v").unwrap();
        // `writer` goes out of scope here, before the directory is inspected.
    }

    assert!(on_disk("my.aof"));
    assert!(!on_disk("shards.meta"));

    let reader = open_custom();
    let found = reader.get(b"k").unwrap();
    assert_eq!(found, Some(b"v".to_vec()));

    // Best-effort cleanup; a failure to remove the directory is ignored.
    let _ = std::fs::remove_dir_all(&dir);
}