kevy-embedded 1.1.20

Embedded mode for kevy — in-process Redis-compatible KV without the server/runtime.
Documentation
//! Sharding (B2) + server-dir-interop tests for the embedded store.
//! Split from `store_tests.rs` to keep both under the 500-LOC house rule.

use super::*;
use super::tests::tmp_dir;
use crate::PubsubFrame;

// ───────────────────────── sharding (B2) ─────────────────────────

#[test]
fn sharded_in_memory_roundtrip() {
    // Eight in-memory shards, manual TTL reaper; nothing touches disk here.
    let store = Store::open(Config::default().with_shards(8).with_ttl_reaper_manual()).unwrap();

    // Build the 1000 (key, value) pairs once and reuse them in every phase.
    let pairs: Vec<(Vec<u8>, Vec<u8>)> = (0..1000u32)
        .map(|n| (format!("k{n}").into_bytes(), format!("v{n}").into_bytes()))
        .collect();

    // Write phase.
    for (key, value) in &pairs {
        store.set(key.as_slice(), value.as_slice()).unwrap();
    }
    assert_eq!(store.dbsize(), 1000);

    // Read-back phase: every key returns exactly the value written for it.
    for (key, value) in &pairs {
        assert_eq!(store.get(key.as_slice()).unwrap(), Some(value.clone()));
    }

    // Cross-shard DEL: the keys hash to different shards, so a single call
    // has to remove entries from several of them.
    let all_keys: Vec<&[u8]> = pairs.iter().map(|(key, _)| key.as_slice()).collect();
    assert_eq!(store.del(&all_keys).unwrap(), 1000);
    assert_eq!(store.dbsize(), 0);
}

#[test]
fn sharded_persist_survives_restart() {
    let dir = tmp_dir("sharded-restart");
    // Both opens use the same persisted 4-shard configuration.
    let four_shards =
        || Config::default().with_persist(&dir).with_shards(4).with_ttl_reaper_manual();

    // First lifetime: write 500 keys, then close the store.
    let first = Store::open(four_shards()).unwrap();
    for n in 0..500u32 {
        let (key, value) = (format!("k{n}"), format!("v{n}"));
        first.set(key.as_bytes(), value.as_bytes()).unwrap();
    }
    assert_eq!(first.dbsize(), 500);
    drop(first);

    // The layout file and the last shard's AOF must be on disk; reopening
    // at the same shard count loads the per-shard files.
    let meta_path = dir.join("shards.meta");
    assert!(meta_path.exists());
    let last_shard_aof = dir.join("aof-3.aof");
    assert!(last_shard_aof.exists());

    // Second lifetime: same shard count, data must be back.
    let reopened = Store::open(four_shards()).unwrap();
    assert_eq!(reopened.dbsize(), 500);
    assert_eq!(reopened.get(b"k0").unwrap(), Some(b"v0".to_vec()));
    assert_eq!(reopened.get(b"k499").unwrap(), Some(b"v499".to_vec()));

    // Best-effort cleanup of the scratch directory.
    let _ = std::fs::remove_dir_all(&dir);
}

#[test]
fn migrates_single_aof_to_shards() {
    let dir = tmp_dir("migrate");
    // The two configurations this test moves between.
    let single_cfg = || Config::default().with_persist(&dir).with_ttl_reaper_manual();
    let sharded_cfg =
        || Config::default().with_persist(&dir).with_shards(4).with_ttl_reaper_manual();
    let key = |n: u32| format!("k{n}").into_bytes();
    let value = |n: u32| format!("v{n}").into_bytes();

    // Phase 1: populate with the default single-shard layout (one aof-0.aof).
    let single = Store::open(single_cfg()).unwrap();
    for n in 0..300u32 {
        single.set(key(n).as_slice(), value(n).as_slice()).unwrap();
    }
    drop(single);
    assert!(dir.join("aof-0.aof").exists());
    // Since the dir-interop fix a single-shard open records its layout as
    // well (default filenames are server-readable), so the meta is present.
    assert!(dir.join("shards.meta").exists());

    // Phase 2: reopen asking for 4 shards, which migrates the single AOF.
    let migrated = Store::open(sharded_cfg()).unwrap();
    assert_eq!(migrated.dbsize(), 300);
    for n in 0..300u32 {
        assert_eq!(migrated.get(key(n).as_slice()).unwrap(), Some(value(n)));
    }
    drop(migrated);

    // Migration artifacts: the meta is written and the legacy AOF is kept
    // as a backup whose file name contains "premigration".
    assert!(dir.join("shards.meta").exists());
    let has_backup = std::fs::read_dir(&dir)
        .unwrap()
        .map(|entry| entry.unwrap().file_name())
        .any(|name| name.to_string_lossy().contains("premigration"));
    assert!(has_backup);

    // Phase 3: a second sharded open loads the per-shard files, data intact.
    let reloaded = Store::open(sharded_cfg()).unwrap();
    assert_eq!(reloaded.dbsize(), 300);
    assert_eq!(reloaded.get(b"k150").unwrap(), Some(b"v150".to_vec()));

    let _ = std::fs::remove_dir_all(&dir);
}

#[test]
fn sharded_pubsub_still_process_wide() {
    // Pub/sub lives on shard 0, so a publish must reach a subscriber no
    // matter how the keyspace itself is sharded.
    let store = Store::open(Config::default().with_shards(8).with_ttl_reaper_manual()).unwrap();

    let subscription = store.subscribe(&[b"chan"]);
    // Consume the first frame (the subscribe ack) so the next recv is the
    // published message.
    let _subscribe_ack = subscription.recv().unwrap();

    // Exactly one subscriber should be counted for the publish.
    let delivered_to = store.publish(b"chan", b"hello");
    assert_eq!(delivered_to, 1);

    let expected = PubsubFrame::Message { channel: b"chan".to_vec(), payload: b"hello".to_vec() };
    assert_eq!(subscription.recv().unwrap(), expected);
}

#[test]
fn collect_keys_spans_all_shards() {
    let store = Store::open(Config::default().with_shards(8).with_ttl_reaper_manual()).unwrap();

    // Seed two key families: 500 `user:<n>` keys, then 50 `other:<n>` keys.
    for (prefix, count) in [("user", 500u32), ("other", 50u32)] {
        for n in 0..count {
            store.set(format!("{prefix}:{n}").as_bytes(), b"v").unwrap();
        }
    }

    // The glob scan has to visit ALL shards, not only shard 0 (the `with`
    // hole), so every `user:*` key is returned.
    let users = store.collect_keys(Some(b"user:*"), None);
    assert_eq!(users.len(), 500);

    // A limit caps the total across shards.
    let capped = store.collect_keys(Some(b"user:*"), Some(100));
    assert_eq!(capped.len(), 100);

    // Summing per-shard sizes through for_each_shard covers the whole keyspace.
    let mut seen = 0;
    store.for_each_shard(|shard| seen += shard.dbsize());
    assert_eq!(seen, 550);
    assert_eq!(store.shard_count(), 8);
}

// ──────────────── dir interop with the server (2026-06-11) ────────────────

/// Regression test for directories holding several shard files but no
/// `shards.meta` (e.g. written by a pre-meta server). Such a dir used to be
/// mistaken for the single-file layout, so an n==1 open silently loaded
/// shard 0 only and (k-1)/k of the keyspace was gone. The layout is now
/// inferred from the file names and the whole dir is migrated.
#[test]
fn metaless_multishard_dir_is_migrated_not_partially_loaded() {
    let dir = tmp_dir("metaless-multi");
    std::fs::create_dir_all(&dir).unwrap();

    // Hand-build two shard snapshots (dump-0.rdb, dump-1.rdb), one key each,
    // and deliberately write no shards.meta.
    for (idx, key, value) in [(0, "alpha", "1"), (1, "beta", "2")] {
        let mut shard = kevy_store::Store::new();
        shard.set(key.as_bytes(), value.as_bytes().to_vec(), None, false, false);
        kevy_persist::save_snapshot(&shard, &dir.join(format!("dump-{idx}.rdb"))).unwrap();
    }

    // Default (single-shard) open over the two-shard directory.
    let store = Store::open(Config::default().with_persist(&dir).with_ttl_reaper_manual()).unwrap();
    assert_eq!(store.dbsize(), 2, "both shards' keys must survive the n=1 open");
    assert_eq!(store.get(b"alpha").unwrap(), Some(b"1".to_vec()));
    assert_eq!(store.get(b"beta").unwrap(), Some(b"2".to_vec()));

    let _ = std::fs::remove_dir_all(&dir);
}

/// A single-shard directory using the default file names writes its layout
/// to `shards.meta`, so a server (or a later n>1 open) can read the
/// directory without having to infer anything.
#[test]
fn single_shard_default_names_record_meta() {
    let dir = tmp_dir("single-meta");

    // Open with defaults, write one key, then close the store.
    let store = Store::open(Config::default().with_persist(&dir).with_ttl_reaper_manual()).unwrap();
    store.set(b"k", b"v").unwrap();
    drop(store);

    // Both the layout record and the default-named AOF must be on disk.
    let meta_path = dir.join("shards.meta");
    assert!(meta_path.exists());
    let aof_path = dir.join("aof-0.aof");
    assert!(aof_path.exists());

    let _ = std::fs::remove_dir_all(&dir);
}

/// Choosing custom file names opts the directory out of server interop:
/// the files keep the requested names and no `shards.meta` is written,
/// because a meta would point a server at files that don't exist.
#[test]
fn custom_filenames_stay_metaless() {
    let dir = tmp_dir("custom-names");
    // Same custom-named configuration for the initial open and the reopen.
    let custom_cfg = || {
        Config::default()
            .with_persist(&dir)
            .with_aof_filename("my.aof")
            .with_snapshot_filename("my.rdb")
            .with_ttl_reaper_manual()
    };

    // Write one key under the custom names, then close the store.
    let first = Store::open(custom_cfg()).unwrap();
    first.set(b"k", b"v").unwrap();
    drop(first);

    // The custom AOF exists; the layout record must not.
    assert!(dir.join("my.aof").exists());
    assert!(!dir.join("shards.meta").exists());

    // Reopening with the same custom names finds the data intact.
    let reopened = Store::open(custom_cfg()).unwrap();
    assert_eq!(reopened.get(b"k").unwrap(), Some(b"v".to_vec()));

    let _ = std::fs::remove_dir_all(&dir);
}

// ─────────────── reshard crash-idempotency (2026-06-11) ───────────────

/// Simulates a crash immediately after a reshard's commit point: the
/// journal and the `.reshard` temp files are on disk while the source files
/// are still untouched. The next open has to roll the migration forward
/// (finalize the temps, back up the stale source, record the layout) and
/// come up serving the *new* state rather than the stale one.
#[test]
fn open_rolls_forward_a_committed_reshard() {
    let dir = tmp_dir("reshard-rollfwd");
    std::fs::create_dir_all(&dir).unwrap();

    // Old layout: a single shard whose snapshot still holds the
    // pre-reshard key, plus a meta describing n=1.
    let mut pre_reshard = kevy_store::Store::new();
    pre_reshard.set(b"stale", b"x".to_vec(), None, false, false);
    kevy_persist::save_snapshot(&pre_reshard, &dir.join("dump-0.rdb")).unwrap();
    let old_layout = kevy_persist::ShardsMeta { n: 1, routing: kevy_persist::Routing::KevyHash };
    kevy_persist::write_shards_meta(&dir.join("shards.meta"), old_layout).unwrap();

    // Committed-but-unfinished migration to 2 shards. Each key goes onto
    // the shard its hash routes to, as a real redistribute would have done.
    let mut targets = [kevy_store::Store::new(), kevy_store::Store::new()];
    for (key, value) in [("alpha", "1"), ("beta", "2"), ("gamma", "3")] {
        let owner = crate::shard::shard_idx(key.as_bytes(), 2);
        targets[owner].set(key.as_bytes(), value.as_bytes().to_vec(), None, false, false);
    }
    // Temp snapshots: dump-0.rdb.reshard and dump-1.rdb.reshard.
    for (idx, shard) in targets.iter().enumerate() {
        kevy_persist::save_snapshot(shard, &dir.join(format!("dump-{idx}.rdb.reshard"))).unwrap();
    }
    // A complete journal marks the migration as committed.
    let journal = "kevy-reshard-journal v1\nstamp=7\nprev_n=1\nn=2\nrouting=kevyhash\n";
    std::fs::write(dir.join("reshard.journal"), journal).unwrap();

    // Recovery happens inside open.
    let recovered = Store::open(
        Config::default().with_persist(&dir).with_shards(2).with_ttl_reaper_manual(),
    )
    .unwrap();
    assert_eq!(recovered.dbsize(), 3, "recovered store must hold the migrated state");
    assert_eq!(recovered.get(b"alpha").unwrap(), Some(b"1".to_vec()));
    assert_eq!(recovered.get(b"gamma").unwrap(), Some(b"3".to_vec()));
    assert_eq!(recovered.get(b"stale").unwrap(), None);
    drop(recovered);

    // On-disk aftermath: the stale source is backed up under the journal's
    // stamp, and both the journal and the shard-0 temp are gone.
    let backup = dir.join("dump-0.rdb.premigration.7");
    assert!(backup.exists(), "stale source not backed up");
    assert!(!dir.join("reshard.journal").exists());
    assert!(!dir.join("dump-0.rdb.reshard").exists());

    let _ = std::fs::remove_dir_all(&dir);
}

/// A torn journal (crash in the middle of writing it) means the reshard
/// never reached its commit point: the old layout is intact and has to load
/// as-is, with the torn journal and any stale temps discarded. This test
/// sets up only the torn journal and asserts only that it is removed.
#[test]
fn open_discards_a_torn_reshard_journal() {
    let dir = tmp_dir("reshard-torn");
    std::fs::create_dir_all(&dir).unwrap();

    // Intact old layout: a single snapshot holding one key.
    let mut pre_existing = kevy_store::Store::new();
    pre_existing.set(b"keep", b"v".to_vec(), None, false, false);
    kevy_persist::save_snapshot(&pre_existing, &dir.join("dump-0.rdb")).unwrap();

    // Journal cut off partway through the `stamp=` line.
    let torn_journal = "kevy-reshard-journal v1\nstamp=";
    std::fs::write(dir.join("reshard.journal"), torn_journal).unwrap();

    // Opening must ignore the torn journal and serve the old data.
    let store = Store::open(Config::default().with_persist(&dir).with_ttl_reaper_manual()).unwrap();
    assert_eq!(store.get(b"keep").unwrap(), Some(b"v".to_vec()));
    drop(store);

    // The torn journal must have been cleaned up.
    let journal_path = dir.join("reshard.journal");
    assert!(!journal_path.exists());

    let _ = std::fs::remove_dir_all(&dir);
}