use super::*;
use crate::tests_aof::temp_aof;
use std::time::Duration;
/// Applies one replayed AOF command to `store`, for use by the rewrite tests.
///
/// Only the verbs handled below are understood: `SET`, `DEL`, `HSET`,
/// `RPUSH`, `SADD`, `ZADD`, `PEXPIRE`, `PEXPIREAT`, `XADD`, `XSETID`,
/// `XGROUP` (`CREATE` / `CREATECONSUMER`) and `XCLAIM`. The verb and the
/// `XGROUP` subcommand are matched case-insensitively.
///
/// # Panics
///
/// Panics on any other verb or `XGROUP` subcommand, on missing arguments, on
/// numeric or id arguments that fail to parse, when an expected keyword is
/// not at its fixed position, and when a store call that is unwrapped here
/// returns an error.
pub(crate) fn apply_for_test(store: &mut Store, args: &Argv) {
    // Decodes the argument at `idx` as a decimal `u64`.
    let parse_u64 =
        |idx: usize| -> u64 { std::str::from_utf8(&args[idx]).unwrap().parse().unwrap() };
    // Start indices of consecutive argument pairs beginning at `from`; a
    // trailing unpaired argument is skipped.
    let pair_starts = |from: usize| (from..args.len().saturating_sub(1)).step_by(2);
    // Owned `(first, second)` byte pairs starting at `from`.
    let byte_pairs = |from: usize| -> Vec<(Vec<u8>, Vec<u8>)> {
        pair_starts(from)
            .map(|at| (args[at].to_vec(), args[at + 1].to_vec()))
            .collect()
    };
    // Owned copies of every argument from position `from` onwards.
    let tail =
        |from: usize| -> Vec<Vec<u8>> { args.iter().skip(from).map(|a| a.to_vec()).collect() };

    match args[0].to_ascii_uppercase().as_slice() {
        b"SET" => {
            store.set(&args[1], args[2].to_vec(), None, false, false);
        }
        b"DEL" => {
            let doomed = tail(1);
            store.del(&doomed);
        }
        b"HSET" => {
            let field_values = byte_pairs(2);
            store.hset(&args[1], &field_values).unwrap();
        }
        b"RPUSH" => {
            let elements = tail(2);
            store.rpush(&args[1], &elements).unwrap();
        }
        b"SADD" => {
            let elements = tail(2);
            store.sadd(&args[1], &elements).unwrap();
        }
        b"ZADD" => {
            // Arguments after the key alternate `score member`.
            let scored: Vec<(f64, Vec<u8>)> = pair_starts(2)
                .map(|at| {
                    let score: f64 = std::str::from_utf8(&args[at]).unwrap().parse().unwrap();
                    (score, args[at + 1].to_vec())
                })
                .collect();
            store.zadd(&args[1], &scored).unwrap();
        }
        b"PEXPIRE" => {
            let millis = parse_u64(2);
            store.expire(&args[1], Duration::from_millis(millis));
        }
        b"PEXPIREAT" => {
            let unix_ms = parse_u64(2);
            store.expire_at_unix_ms(&args[1], unix_ms);
        }
        b"XADD" => {
            // An optional `MAXLEN <n>` sits between the key and the id; it is
            // applied as a trim after the entry has been added.
            let (maxlen, id_at): (Option<u64>, usize) =
                if args[2].eq_ignore_ascii_case(b"MAXLEN") {
                    (Some(parse_u64(3)), 4)
                } else {
                    (None, 2)
                };
            let spec = kevy_store::parse_xadd_id(&args[id_at]).unwrap();
            let fields = byte_pairs(id_at + 1);
            store.xadd(&args[1], spec, fields, false, 0).unwrap();
            if let Some(limit) = maxlen {
                store.xtrim_maxlen(&args[1], limit).unwrap();
            }
        }
        b"XSETID" => {
            // Fixed layout: XSETID key last-id ENTRIESADDED n MAXDELETEDID id
            let last_id = kevy_store::parse_explicit_id(&args[2], false).unwrap();
            assert_eq!(args[3].to_ascii_uppercase(), b"ENTRIESADDED");
            let entries_added = parse_u64(4);
            assert_eq!(args[5].to_ascii_uppercase(), b"MAXDELETEDID");
            let max_deleted = kevy_store::parse_explicit_id(&args[6], false).unwrap();
            store
                .xsetid(&args[1], last_id, Some(entries_added), Some(max_deleted))
                .unwrap();
        }
        b"XGROUP" => {
            let sub = args[1].to_ascii_uppercase();
            match sub.as_slice() {
                b"CREATE" => {
                    // Fixed layout: XGROUP CREATE key group id MKSTREAM
                    assert_eq!(args[5].to_ascii_uppercase(), b"MKSTREAM");
                    let start = kevy_store::parse_explicit_id(&args[4], false).unwrap();
                    let mode = kevy_store::GroupCreateMode::AtId(start);
                    // NOTE(review): the trailing `true` presumably mirrors the
                    // MKSTREAM keyword asserted above — confirm.
                    store.xgroup_create(&args[2], &args[3], mode, true).unwrap();
                }
                b"CREATECONSUMER" => {
                    // NOTE(review): 7_777 is a fixed placeholder for the last
                    // argument; presumably a timestamp — confirm against
                    // `xgroup_create_consumer`.
                    store
                        .xgroup_create_consumer(&args[2], &args[3], &args[4], 7_777)
                        .unwrap();
                }
                other => panic!(
                    "unexpected XGROUP sub in AOF rewrite: {:?}",
                    String::from_utf8_lossy(other)
                ),
            }
        }
        b"XCLAIM" => {
            // Fixed layout:
            // XCLAIM key group consumer 0 id TIME ms RETRYCOUNT n FORCE JUSTID
            assert_eq!(&args[4], b"0");
            assert_eq!(args[6].to_ascii_uppercase(), b"TIME");
            assert_eq!(args[8].to_ascii_uppercase(), b"RETRYCOUNT");
            assert_eq!(args[10].to_ascii_uppercase(), b"FORCE");
            assert_eq!(args[11].to_ascii_uppercase(), b"JUSTID");
            let claimed = kevy_store::parse_explicit_id(&args[5], false).unwrap();
            // Numeric types are inferred from the `XClaimOpts` fields below.
            let time_ms = std::str::from_utf8(&args[7]).unwrap().parse().unwrap();
            let retries = std::str::from_utf8(&args[9]).unwrap().parse().unwrap();
            let opts = kevy_store::XClaimOpts {
                min_idle_ms: 0,
                idle_override_ms: None,
                time_override_ms: Some(time_ms),
                retrycount_override: Some(retries),
                force: true,
                justid: true,
            };
            store
                .xclaim(&args[1], &args[2], &args[3], &[claimed], &opts, 0)
                .unwrap();
        }
        other => panic!("unexpected verb in AOF rewrite: {:?}", String::from_utf8_lossy(other)),
    }
}
/// Rewrites a store holding a string, a binary string, a hash, a list, a set,
/// a sorted set and a key with a TTL, then replays the resulting AOF into a
/// fresh store and checks that all seven keys come back.
#[test]
fn rewrite_reconstructs_full_keyspace() {
    let path = temp_aof("rewrite-all");

    // Populate the source store.
    let mut source = Store::new();
    source.set(b"str", b"hello".to_vec(), None, false, false);
    source.set(b"binary", vec![0u8, 1, 2, 255], None, false, false);
    source
        .hset(
            b"hash",
            &[
                (b"f1".to_vec(), b"v1".to_vec()),
                (b"f2".to_vec(), b"v2".to_vec()),
            ],
        )
        .unwrap();
    source
        .rpush(b"list", &[b"i1".to_vec(), b"i2".to_vec(), b"i3".to_vec()])
        .unwrap();
    source.sadd(b"set", &[b"m1".to_vec(), b"m2".to_vec()]).unwrap();
    source
        .zadd(b"zset", &[(1.5, b"a".to_vec()), (2.5, b"b".to_vec())])
        .unwrap();
    let one_hour = Duration::from_secs(3600);
    source.set(b"ttl", b"x".to_vec(), Some(one_hour), false, false);

    // Rewrite and check the bookkeeping exposed by the AOF handle.
    let mut aof = Aof::open(&path, Fsync::Always).unwrap();
    let stats = aof.rewrite_from(&source).unwrap();
    assert_eq!(stats.keys, 7);
    assert!(stats.bytes > 0);
    assert_eq!(aof.size_bytes(), stats.bytes);
    assert_eq!(aof.size_at_last_rewrite(), stats.bytes);
    assert_eq!(aof.rewrites_total(), 1);
    drop(aof);

    // Replay into an empty store and compare contents.
    let mut replayed = Store::new();
    replay_aof(&path, |args| apply_for_test(&mut replayed, &args)).unwrap();
    assert_eq!(replayed.dbsize(), 7);
    assert_eq!(replayed.get(b"str").unwrap(), Some(&b"hello"[..]));
    assert_eq!(replayed.get(b"binary").unwrap(), Some(&[0u8, 1, 2, 255][..]));
    assert_eq!(replayed.hget(b"hash", b"f1").unwrap(), Some(&b"v1"[..]));
    assert_eq!(replayed.hget(b"hash", b"f2").unwrap(), Some(&b"v2"[..]));
    assert_eq!(replayed.llen(b"list").unwrap(), 3);
    assert_eq!(replayed.scard(b"set").unwrap(), 2);
    assert_eq!(replayed.zcard(b"zset").unwrap(), 2);
    // The TTL was set to 3600 s; presumably `pttl` reports milliseconds, so
    // this leaves slack for time elapsed during the test — confirm the unit.
    assert!(replayed.pttl(b"ttl") > 3_500_000);

    // Best-effort cleanup of the temporary file.
    let _ = std::fs::remove_file(&path);
}
/// Fills a log with fifty `SET` commands, rewrites it from a two-key store,
/// and checks that the file shrinks and that an append made after the rewrite
/// is replayed together with the rewritten keys.
#[test]
fn rewrite_replaces_old_log_atomically() {
    let path = temp_aof("rewrite-swap");

    // Seed the log; the handle is dropped when this scope ends.
    {
        let mut seed = Aof::open(&path, Fsync::Always).unwrap();
        for n in 0..50 {
            let key = format!("k{n}").into_bytes();
            let cmd = Argv::from(vec![b"SET".to_vec(), key, b"v".to_vec()]);
            seed.append(&cmd).unwrap();
        }
    }
    let big_size = std::fs::metadata(&path).unwrap().len();
    assert!(big_size > 0);

    // The store being rewritten from holds only two keys.
    let mut live = Store::new();
    live.set(b"only", b"value".to_vec(), None, false, false);
    live.set(b"second", b"v2".to_vec(), None, false, false);

    let mut aof = Aof::open(&path, Fsync::Always).unwrap();
    let stats = aof.rewrite_from(&live).unwrap();
    assert_eq!(stats.keys, 2);
    let new_size = std::fs::metadata(&path).unwrap().len();
    assert!(new_size < big_size, "rewrite should shrink: {new_size} vs {big_size}");

    // Append through the same handle after the rewrite.
    let third = Argv::from(vec![b"SET".to_vec(), b"third".to_vec(), b"v".to_vec()]);
    aof.append(&third).unwrap();
    drop(aof);

    let mut replayed = Store::new();
    replay_aof(&path, |args| apply_for_test(&mut replayed, &args)).unwrap();
    assert_eq!(replayed.dbsize(), 3, "rewrite + append should yield 3 keys");

    // Best-effort cleanup of the temporary file.
    let _ = std::fs::remove_file(&path);
}
/// Checks that `size_bytes()` strictly increases after each of two appends.
#[test]
fn append_bumps_size_estimate() {
    let path = temp_aof("size-est");
    let mut aof = Aof::open(&path, Fsync::No).unwrap();

    let initial = aof.size_bytes();

    let first = Argv::from(vec![b"SET".to_vec(), b"k".to_vec(), b"v".to_vec()]);
    aof.append(&first).unwrap();
    let after_first = aof.size_bytes();
    assert!(after_first > initial);

    let second = Argv::from(vec![b"SET".to_vec(), b"k2".to_vec(), b"v".to_vec()]);
    aof.append(&second).unwrap();
    assert!(aof.size_bytes() > after_first);

    // Best-effort cleanup of the temporary file.
    let _ = std::fs::remove_file(&path);
}
/// Appends ten commands, rewrites from an empty store, and checks that both
/// the current size and the size recorded at the last rewrite are reset.
#[test]
fn rewrite_resets_size_anchor() {
    let path = temp_aof("size-anchor");
    let mut aof = Aof::open(&path, Fsync::Always).unwrap();

    for _ in 0..10 {
        let cmd = Argv::from(vec![b"SET".to_vec(), b"k".to_vec(), b"v".to_vec()]);
        aof.append(&cmd).unwrap();
    }
    // Before any rewrite the log has grown past its anchor.
    assert!(aof.size_bytes() > aof.size_at_last_rewrite());

    let empty = Store::new();
    let stats = aof.rewrite_from(&empty).unwrap();
    assert_eq!(stats.keys, 0);
    // NOTE(review): 9 is presumably the byte count an empty rewrite still
    // writes (e.g. a fixed header) — confirm against the AOF format.
    assert_eq!(aof.size_bytes(), 9);
    assert_eq!(aof.size_at_last_rewrite(), 9);
    assert_eq!(aof.rewrites_total(), 1);

    // Best-effort cleanup of the temporary file.
    let _ = std::fs::remove_file(&path);
}
/// Starts a concurrent rewrite, appends three commands while it is in
/// progress, finishes the rewrite, and checks that replaying the log reflects
/// both the snapshot and the commands appended in between.
#[test]
fn concurrent_rewrite_captures_writes_during_spill() {
    let path = temp_aof("concurrent-rw");

    let mut snapshot = Store::new();
    snapshot.set(b"a", b"1".to_vec(), None, false, false);
    snapshot.set(b"b", b"2".to_vec(), None, false, false);

    let mut aof = Aof::open(&path, Fsync::Always).unwrap();
    let plan = aof.begin_concurrent_rewrite(&snapshot).unwrap();
    assert!(aof.is_rewriting());
    assert_eq!(plan.keys, 2);

    // Commands issued between begin and finish: a new key, an overwrite, and
    // a delete of a key that is in the snapshot.
    aof.append(&argv(&[b"SET", b"c", b"3"])).unwrap();
    aof.append(&argv(&[b"SET", b"b", b"22"])).unwrap();
    aof.append(&argv(&[b"DEL", b"a"])).unwrap();

    // The test writes the planned body to the temp file itself before
    // handing that file to `finish_concurrent_rewrite`.
    std::fs::write(&plan.tmp, &plan.body).unwrap();
    let stats = aof.finish_concurrent_rewrite(&plan.tmp, plan.keys).unwrap();
    assert!(!aof.is_rewriting());
    assert_eq!(stats.keys, 2);
    assert_eq!(aof.rewrites_total(), 1);

    let mut replayed = Store::new();
    replay_aof(&path, |cmd| apply_for_test(&mut replayed, &cmd)).unwrap();
    assert_eq!(replayed.get(b"a").unwrap(), None, "DEL during spill must apply");
    assert_eq!(replayed.get(b"b").unwrap(), Some(&b"22"[..]), "overwrite must win");
    assert_eq!(replayed.get(b"c").unwrap(), Some(&b"3"[..]), "new key must survive");

    // Best-effort cleanup of the temporary file.
    let _ = std::fs::remove_file(&path);
}
/// Builds an `Argv` from borrowed byte slices by copying each part into an
/// owned vector.
fn argv(parts: &[&[u8]]) -> Argv {
    let owned: Vec<Vec<u8>> = parts.iter().map(|part| part.to_vec()).collect();
    Argv::from(owned)
}
/// Builds four streams with consumer-group state, rewrites the AOF from that
/// store, replays it, and checks stream metadata, groups, pending entries and
/// consumers on the replayed store.
#[test]
fn rewrite_reconstructs_stream_groups() {
    use kevy_store::{GroupCreateMode, ReadGroupId, StreamId, XAddIdSpec};

    let sid = |ms, seq| StreamId { ms, seq };
    // Key bytes plus a single `f` => `v` field list for one stream entry.
    let entry_for = |key: &str| {
        let fields = vec![(b"f".to_vec(), b"v".to_vec())];
        (key.as_bytes().to_vec(), fields)
    };

    let path = temp_aof("rewrite-groups");
    let mut source = Store::new();

    // "st": three entries, one group, reads by two consumers, then the
    // middle entry is deleted.
    for ms in [1u64, 2, 3] {
        let (key, fields) = entry_for("st");
        source
            .xadd(&key, XAddIdSpec::Explicit(sid(ms, 1)), fields, false, 0)
            .unwrap();
    }
    source
        .xgroup_create(b"st", b"g", GroupCreateMode::AtId(StreamId::MIN), false)
        .unwrap();
    source
        .xreadgroup(b"st", b"g", b"c1", ReadGroupId::New, Some(2), false, 1000)
        .unwrap();
    source
        .xreadgroup(b"st", b"g", b"c2", ReadGroupId::New, None, false, 2000)
        .unwrap();
    source.xdel(b"st", &[sid(2, 1)]).unwrap();

    // "deltail": two entries, then the newest one is deleted.
    for ms in [7u64, 8] {
        let (key, fields) = entry_for("deltail");
        source
            .xadd(&key, XAddIdSpec::Explicit(sid(ms, 1)), fields, false, 0)
            .unwrap();
    }
    source.xdel(b"deltail", &[sid(8, 1)]).unwrap();

    // "emptyg": its only entry is deleted before a group is created on it.
    let (key, fields) = entry_for("emptyg");
    source
        .xadd(&key, XAddIdSpec::Explicit(sid(5, 1)), fields, false, 0)
        .unwrap();
    source.xdel(b"emptyg", &[sid(5, 1)]).unwrap();
    source
        .xgroup_create(b"emptyg", b"g2", GroupCreateMode::AtId(sid(5, 1)), false)
        .unwrap();

    // "virgin": no entry is ever added; only a group is created, with the
    // final flag set to `true`.
    source
        .xgroup_create(b"virgin", b"g3", GroupCreateMode::AtId(StreamId::MIN), true)
        .unwrap();

    let mut aof = Aof::open(&path, Fsync::No).unwrap();
    aof.rewrite_from(&source).unwrap();
    drop(aof);

    let mut replayed = Store::new();
    replay_aof(&path, |args| apply_for_test(&mut replayed, &args)).unwrap();

    // "st": stream metadata.
    let st = replayed.stream_view(b"st").unwrap().unwrap();
    assert_eq!(
        (st.length(), st.last_id(), st.entries_added(), st.max_deleted_id()),
        (2, sid(3, 1), 3, sid(2, 1))
    );

    // "st": group state and the two pending entries with their owners.
    let group = st.group(b"g").expect("group must survive the rewrite");
    assert_eq!(group.last_delivered_id(), sid(3, 1));
    assert_eq!(group.pending_count(), 2);
    let first_pending = group.pel.get(&sid(1, 1)).unwrap();
    assert_eq!(
        (
            first_pending.consumer.as_slice(),
            first_pending.delivery_time_ms,
            first_pending.delivery_count
        ),
        (&b"c1"[..], 1000, 1)
    );
    let last_pending = group.pel.get(&sid(3, 1)).unwrap();
    assert_eq!(
        (
            last_pending.consumer.as_slice(),
            last_pending.delivery_time_ms,
            last_pending.delivery_count
        ),
        (&b"c2"[..], 2000, 1)
    );

    // Sorted so the comparison does not depend on iteration order.
    let mut consumers: Vec<(Vec<u8>, usize)> = group
        .consumers_iter()
        .map(|(name, consumer)| (name.to_vec(), consumer.pending_count()))
        .collect();
    consumers.sort();
    assert_eq!(consumers, vec![(b"c1".to_vec(), 1), (b"c2".to_vec(), 1)]);

    // "deltail": last id and max deleted id both stay at the deleted tail.
    let deltail = replayed.stream_view(b"deltail").unwrap().unwrap();
    assert_eq!(
        (
            deltail.length(),
            deltail.last_id(),
            deltail.entries_added(),
            deltail.max_deleted_id()
        ),
        (1, sid(8, 1), 2, sid(8, 1))
    );

    // "emptyg": empty stream that still carries its counters and its group.
    let emptyg = replayed.stream_view(b"emptyg").unwrap().unwrap();
    assert_eq!(
        (
            emptyg.length(),
            emptyg.last_id(),
            emptyg.entries_added(),
            emptyg.max_deleted_id()
        ),
        (0, sid(5, 1), 1, sid(5, 1))
    );
    assert_eq!(emptyg.group(b"g2").unwrap().last_delivered_id(), sid(5, 1));

    // "virgin": exists with no entries and keeps its group.
    let virgin = replayed.stream_view(b"virgin").unwrap().unwrap();
    assert_eq!((virgin.length(), virgin.last_id()), (0, StreamId::MIN));
    assert!(virgin.group(b"g3").is_some());

    // Best-effort cleanup of the temporary file.
    let _ = std::fs::remove_file(&path);
}