use super::{EmbeddedRouteMode, EmbeddedStore, PackedSessionWrite};
use crate::config::EvictionPolicy;
#[cfg(feature = "telemetry")]
use crate::storage::CacheTelemetry;
use crate::storage::hash_key;
use crate::storage::{RedisObjectResult, RedisStringLookup};
#[cfg(feature = "telemetry")]
use std::sync::Arc;
#[test]
fn redis_objects_enforce_wrongtype_and_set_overwrite() {
let store = EmbeddedStore::new(4);
assert_eq!(
store.hset(b"user:1", b"name", b"ada"),
RedisObjectResult::Integer(1)
);
assert_eq!(
store.hget(b"user:1", b"name"),
RedisObjectResult::Bulk(Some(b"ada".to_vec()))
);
assert!(store.exists(b"user:1"));
let mut encoded = Vec::new();
assert_eq!(
store.get_string_value_into(b"user:1", |value| encoded.extend_from_slice(value)),
RedisStringLookup::WrongType
);
store.set(b"user:1".to_vec(), b"string".to_vec(), None);
assert_eq!(store.hget(b"user:1", b"name"), RedisObjectResult::WrongType);
assert_eq!(store.get(b"user:1"), Some(b"string".to_vec()));
assert!(store.delete(b"user:1"));
assert_eq!(store.get(b"user:1"), None);
assert!(!store.exists(b"user:1"));
}
#[test]
fn redis_list_set_and_zset_semantics() {
let store = EmbeddedStore::new(4);
assert_eq!(
store.lpush(b"list", &[b"a".as_slice(), b"b".as_slice()]),
RedisObjectResult::Integer(2)
);
assert_eq!(
store.rpush(b"list", &[b"c".as_slice()]),
RedisObjectResult::Integer(3)
);
assert_eq!(
store.lrange(b"list", 0, -1),
RedisObjectResult::Array(vec![
Some(b"b".to_vec()),
Some(b"a".to_vec()),
Some(b"c".to_vec())
])
);
assert_eq!(
store.lpop(b"list"),
RedisObjectResult::Bulk(Some(b"b".to_vec()))
);
assert_eq!(
store.sadd(b"set", &[b"x".as_slice(), b"x".as_slice(), b"y".as_slice()]),
RedisObjectResult::Integer(2)
);
assert_eq!(store.sismember(b"set", b"x"), RedisObjectResult::Integer(1));
assert_eq!(
store.srem(b"set", &[b"x".as_slice()]),
RedisObjectResult::Integer(1)
);
assert_eq!(store.scard(b"set"), RedisObjectResult::Integer(1));
assert_eq!(store.zadd(b"z", 2.0, b"b"), RedisObjectResult::Integer(1));
assert_eq!(store.zadd(b"z", 1.0, b"a"), RedisObjectResult::Integer(1));
assert_eq!(
store.zrange(b"z", 0, -1),
RedisObjectResult::Array(vec![Some(b"a".to_vec()), Some(b"b".to_vec())])
);
assert_eq!(
store.zscore(b"z", b"b"),
RedisObjectResult::Bulk(Some(b"2".to_vec()))
);
}
#[test]
fn redis_segmented_list_semantics_cross_inline_boundary() {
let store = EmbeddedStore::new(4);
let values = (0..20)
.map(|index| format!("v{index:02}").into_bytes())
.collect::<Vec<_>>();
let slices = values.iter().map(Vec::as_slice).collect::<Vec<_>>();
assert_eq!(
store.rpush(b"seg-list", &slices),
RedisObjectResult::Integer(20)
);
assert_eq!(
store.lindex(b"seg-list", 0),
RedisObjectResult::Bulk(Some(b"v00".to_vec()))
);
assert_eq!(
store.lindex(b"seg-list", -1),
RedisObjectResult::Bulk(Some(b"v19".to_vec()))
);
assert_eq!(
store.lrange(b"seg-list", 8, 11),
RedisObjectResult::Array(vec![
Some(b"v08".to_vec()),
Some(b"v09".to_vec()),
Some(b"v10".to_vec()),
Some(b"v11".to_vec()),
])
);
assert_eq!(
store.lrem(b"seg-list", 0, b"v10"),
RedisObjectResult::Integer(1)
);
assert_eq!(
store.lindex(b"seg-list", 10),
RedisObjectResult::Bulk(Some(b"v11".to_vec()))
);
assert_eq!(
store.ltrim(b"seg-list", 3, 12),
RedisObjectResult::Simple("OK")
);
assert_eq!(store.llen(b"seg-list"), RedisObjectResult::Integer(10));
assert_eq!(
store.lpop(b"seg-list"),
RedisObjectResult::Bulk(Some(b"v03".to_vec()))
);
assert_eq!(
store.rpop(b"seg-list"),
RedisObjectResult::Bulk(Some(b"v13".to_vec()))
);
}
#[test]
fn batch_get_preserves_input_order() {
let store = EmbeddedStore::new(4);
store.set(b"alpha".to_vec(), b"one".to_vec(), None);
store.set(b"beta".to_vec(), b"two".to_vec(), None);
store.set(b"gamma".to_vec(), b"three".to_vec(), None);
let values = store.batch_get(vec![
b"gamma".to_vec(),
b"missing".to_vec(),
b"alpha".to_vec(),
b"beta".to_vec(),
]);
assert_eq!(
values,
vec![
Some(b"three".to_vec()),
None,
Some(b"one".to_vec()),
Some(b"two".to_vec())
]
);
}
#[test]
fn ttl_expires_with_lazy_reads() {
let store = EmbeddedStore::new(2);
store.set(b"alpha".to_vec(), b"one".to_vec(), Some(1));
std::thread::sleep(std::time::Duration::from_millis(5));
assert_eq!(store.get(b"alpha"), None);
assert!(!store.exists(b"alpha"));
}
#[test]
fn session_prefix_routing_colocates_chunks() {
let store = EmbeddedStore::with_route_mode(8, EmbeddedRouteMode::SessionPrefix);
assert_eq!(
store.shard_for_key(b"s:42:c:0"),
store.shard_for_key(b"s:42:c:7")
);
assert_eq!(
store.shard_for_key(b"s:7:c:1"),
store.shard_for_key(b"s:7:c:99")
);
assert_eq!(
store.route_session(b"s:42").shard_id,
store.route_key(b"s:42:c:0").shard_id
);
}
#[test]
fn session_view_survives_update_and_delete() {
let store = EmbeddedStore::with_route_mode(1, EmbeddedRouteMode::SessionPrefix);
store.set(b"s:9:c:0".to_vec(), b"alpha".to_vec(), None);
store.set(b"s:9:c:1".to_vec(), b"beta".to_vec(), None);
let keys = vec![b"s:9:c:0".to_vec(), b"s:9:c:1".to_vec()];
let view = store.batch_get_session_view(b"s:9", &keys);
assert_eq!(view.slice(0), Some(&b"alpha"[..]));
assert_eq!(view.slice(1), Some(&b"beta"[..]));
store.set(b"s:9:c:0".to_vec(), b"gamma".to_vec(), None);
assert!(store.delete(b"s:9:c:1"));
assert_eq!(view.slice(0), Some(&b"alpha"[..]));
assert_eq!(view.slice(1), Some(&b"beta"[..]));
drop(view);
assert_eq!(store.get(b"s:9:c:0"), Some(b"gamma".to_vec()));
assert_eq!(store.get(b"s:9:c:1"), None);
}
#[test]
fn single_view_survives_update_and_delete() {
let store = EmbeddedStore::with_route_mode(1, EmbeddedRouteMode::FullKey);
store.set(b"alpha".to_vec(), b"one".to_vec(), None);
let view = store.get_view(b"alpha");
assert_eq!(view.slice(), Some(&b"one"[..]));
store.set(b"alpha".to_vec(), b"two".to_vec(), None);
assert!(store.delete(b"alpha"));
assert_eq!(view.slice(), Some(&b"one"[..]));
drop(view);
}
#[test]
fn read_slice_meta_survives_view_and_store_drop() {
let slice = {
let store = EmbeddedStore::with_route_mode(1, EmbeddedRouteMode::FullKey);
store.set(b"alpha".to_vec(), b"one".to_vec(), None);
let view = store.get_view(b"alpha");
view.slice_meta().expect("alpha should exist")
};
assert_eq!(slice.as_slice(), b"one");
}
#[test]
fn batch_slice_meta_survives_view_and_store_drop() {
let slice = {
let store = EmbeddedStore::with_route_mode(1, EmbeddedRouteMode::FullKey);
store.set(b"alpha".to_vec(), b"one".to_vec(), None);
let view = store.batch_get_view(&[b"alpha".to_vec()]);
view.slice_meta(0).expect("alpha should exist")
};
assert_eq!(slice.as_slice(), b"one");
}
#[test]
fn batch_view_preserves_order_across_shards() {
let store = EmbeddedStore::with_route_mode(4, EmbeddedRouteMode::FullKey);
store.set(b"alpha".to_vec(), b"one".to_vec(), None);
store.set(b"beta".to_vec(), b"two".to_vec(), None);
let view = store.batch_get_view(&[b"beta".to_vec(), b"missing".to_vec(), b"alpha".to_vec()]);
assert_eq!(view.slice(0), Some(&b"two"[..]));
assert_eq!(view.slice(1), None);
assert_eq!(view.slice(2), Some(&b"one"[..]));
}
#[test]
fn routed_session_view_matches_prefix_view() {
let store = EmbeddedStore::with_route_mode(4, EmbeddedRouteMode::SessionPrefix);
store.batch_set_session_slices_no_ttl(
b"s:9",
[
(&b"s:9:c:0"[..], &b"alpha"[..]),
(&b"s:9:c:1"[..], &b"beta"[..]),
],
);
let keys = vec![b"s:9:c:0".to_vec(), b"s:9:c:1".to_vec()];
let routed = store.batch_get_session_view_routed(store.route_session(b"s:9"), &keys);
let plain = store.batch_get_session_view(b"s:9", &keys);
assert_eq!(routed.slice(0), plain.slice(0));
assert_eq!(routed.slice(1), plain.slice(1));
}
#[test]
fn packed_session_publish_swaps_generation_without_invalidating_readers() {
let store = EmbeddedStore::with_route_mode(1, EmbeddedRouteMode::SessionPrefix);
store.batch_set_session_packed_no_ttl(PackedSessionWrite::from_owned_items(
b"s:5".to_vec(),
vec![
(b"s:5:c:0".to_vec(), b"alpha".to_vec()),
(b"s:5:c:1".to_vec(), b"beta".to_vec()),
],
));
let keys = vec![b"s:5:c:0".to_vec(), b"s:5:c:1".to_vec()];
let view = store.batch_get_session_view(b"s:5", &keys);
assert_eq!(view.slice(0), Some(&b"alpha"[..]));
assert_eq!(view.slice(1), Some(&b"beta"[..]));
store.batch_set_session_packed_no_ttl(PackedSessionWrite::from_owned_items(
b"s:5".to_vec(),
vec![(b"s:5:c:0".to_vec(), b"gamma".to_vec())],
));
assert_eq!(view.slice(0), Some(&b"alpha"[..]));
assert_eq!(view.slice(1), Some(&b"beta"[..]));
drop(view);
assert_eq!(store.get(b"s:5:c:0"), Some(b"gamma".to_vec()));
assert_eq!(store.get(b"s:5:c:1"), None);
}
#[test]
fn point_write_overrides_explicit_session_slot_entry() {
let store = EmbeddedStore::with_route_mode(1, EmbeddedRouteMode::SessionPrefix);
store.batch_set_session_slices_no_ttl(b"s:5", [(b"s:5:c:0".as_slice(), b"alpha".as_slice())]);
store.set(b"s:5:c:0".to_vec(), b"gamma".to_vec(), None);
assert_eq!(store.get(b"s:5:c:0"), Some(b"gamma".to_vec()));
let batch = store.batch_get_session_view(b"s:5", &[b"s:5:c:0".to_vec()]);
assert_eq!(batch.slice(0), Some(&b"gamma"[..]));
}
#[test]
fn owned_workers_cover_every_shard_once() {
let store = EmbeddedStore::with_route_mode(8, EmbeddedRouteMode::SessionPrefix);
let workers = store.into_owned_workers(3);
let mut seen = Vec::new();
for worker in &workers {
for shard_id in 0..8 {
if worker.owns_shard(shard_id) {
seen.push(shard_id);
}
}
}
seen.sort_unstable();
assert_eq!(seen, (0..8).collect::<Vec<_>>());
}
#[test]
fn owned_worker_point_view_reads_from_owned_shard() {
let store = EmbeddedStore::with_route_mode(4, EmbeddedRouteMode::FullKey);
store.set(b"alpha".to_vec(), b"one".to_vec(), None);
store.set(b"beta".to_vec(), b"two".to_vec(), None);
let mut workers = store.into_owned_workers(2);
let route = workers[0].route_key(b"alpha");
let worker = workers
.iter_mut()
.find(|worker| worker.owns_shard(route.shard_id))
.expect("worker should own routed shard");
let view = worker.get_view_routed_no_ttl(route, b"alpha");
assert!(view.is_hit());
assert_eq!(view.slice(), Some(&b"one"[..]));
assert_eq!(view.len(), 3);
}
#[test]
fn prepared_point_key_view_reads_value() {
let store = EmbeddedStore::with_route_mode(4, EmbeddedRouteMode::FullKey);
store.set(b"alpha".to_vec(), b"one".to_vec(), None);
let prepared = store.prepare_point_key(b"alpha");
let view = store.get_prepared_view_no_ttl(&prepared);
assert!(view.is_hit());
assert_eq!(view.slice(), Some(&b"one"[..]));
}
#[test]
fn owned_worker_prepared_point_key_reads_value() {
let store = EmbeddedStore::with_route_mode(4, EmbeddedRouteMode::FullKey);
store.set(b"alpha".to_vec(), b"one".to_vec(), None);
let mut workers = store.into_owned_workers(2);
let prepared = workers[0].prepare_point_key(b"alpha");
let worker = workers
.iter_mut()
.find(|worker| worker.owns_shard(prepared.route().shard_id))
.expect("worker should own routed shard");
let value = worker.get_prepared_ref_no_ttl(&prepared);
assert_eq!(value, Some(&b"one"[..]));
}
#[test]
fn owned_worker_prehashed_session_view_matches_routed_view() {
let store = EmbeddedStore::with_route_mode(4, EmbeddedRouteMode::SessionPrefix);
store.batch_set_session_slices_no_ttl(
b"s:7",
[
(&b"s:7:c:0"[..], &b"alpha"[..]),
(&b"s:7:c:1"[..], &b"beta"[..]),
],
);
let mut workers = store.into_owned_workers(2);
let route = workers[0].route_session(b"s:7");
let worker = workers
.iter_mut()
.find(|worker| worker.owns_shard(route.shard_id))
.expect("worker should own routed shard");
let keys = vec![
b"s:7:c:0".to_vec(),
b"s:7:c:missing".to_vec(),
b"s:7:c:1".to_vec(),
];
let plain = worker.batch_get_session_view_routed_no_ttl(route, &keys);
let plain_values = (0..plain.item_count())
.map(|index| plain.slice(index).map(|value| value.to_vec()))
.collect::<Vec<_>>();
let plain_hit_count = plain.hit_count();
let plain_total_bytes = plain.total_bytes();
drop(plain);
let key_hashes = keys.iter().map(|key| hash_key(key)).collect::<Vec<_>>();
let prehashed =
worker.batch_get_session_view_prehashed_routed_no_ttl(route, &keys, &key_hashes);
assert_eq!(prehashed.hit_count(), plain_hit_count);
assert_eq!(prehashed.total_bytes(), plain_total_bytes);
assert_eq!(prehashed.item_count(), plain_values.len());
for (index, expected) in plain_values.iter().enumerate() {
assert_eq!(
prehashed.slice(index).map(|value| value.to_vec()),
*expected,
"mismatch at index {index}"
);
}
}
#[test]
fn owned_worker_session_packed_matches_batch_view() {
let store = EmbeddedStore::with_route_mode(4, EmbeddedRouteMode::SessionPrefix);
store.batch_set_session_slices_no_ttl(
b"s:4",
[
(&b"s:4:c:0"[..], &b"left"[..]),
(&b"s:4:c:1"[..], &b"right"[..]),
],
);
let mut workers = store.into_owned_workers(2);
let route = workers[0].route_session(b"s:4");
let worker = workers
.iter_mut()
.find(|worker| worker.owns_shard(route.shard_id))
.expect("worker should own routed shard");
let keys = vec![
b"s:4:c:0".to_vec(),
b"s:4:c:missing".to_vec(),
b"s:4:c:1".to_vec(),
];
let view = worker.batch_get_session_view_routed_no_ttl(route, &keys);
let expected = (0..view.item_count())
.map(|index| view.slice(index).map(|value| value.to_vec()))
.collect::<Vec<_>>();
drop(view);
let packed = worker.batch_get_session_packed_routed_no_ttl(route, &keys);
let unpacked_values = packed
.offsets
.iter()
.zip(packed.lengths.iter())
.map(|(&offset, &length)| {
if offset == usize::MAX {
None
} else {
Some(packed.buffer[offset..offset + length].to_vec())
}
})
.collect::<Vec<_>>();
assert_eq!(unpacked_values, expected);
}
#[test]
fn memory_cap_lru_evicts_colder_generic_entry_before_session_slot() {
let store = EmbeddedStore::with_route_mode(1, EmbeddedRouteMode::SessionPrefix);
store.configure_memory_policy(Some(8), EvictionPolicy::Lru);
store.set(b"a".to_vec(), b"1".to_vec(), None);
assert_eq!(store.stored_bytes(), 2);
store.batch_set_session_slices_no_ttl(b"s:1", [(&b"s:1:c:0"[..], &b"x"[..])]);
assert_eq!(store.get(b"a"), None);
assert_eq!(store.get(b"s:1:c:0"), Some(b"x".to_vec()));
assert_eq!(store.len(), 1);
assert!(store.stored_bytes() <= 8);
}
#[test]
fn memory_cap_lfu_preserves_hot_generic_entry_over_session_slot() {
let store = EmbeddedStore::with_route_mode(1, EmbeddedRouteMode::SessionPrefix);
store.configure_memory_policy(Some(8), EvictionPolicy::Lfu);
store.set(b"a".to_vec(), b"1".to_vec(), None);
store.set(b"a".to_vec(), b"1".to_vec(), None);
store.set(b"a".to_vec(), b"1".to_vec(), None);
store.batch_set_session_slices_no_ttl(b"s:1", [(&b"s:1:c:0"[..], &b"x"[..])]);
assert_eq!(store.get(b"a"), Some(b"1".to_vec()));
assert_eq!(store.get(b"s:1:c:0"), None);
assert_eq!(store.len(), 1);
assert!(store.stored_bytes() <= 8);
}
#[cfg(feature = "telemetry")]
#[test]
fn telemetry_snapshot_tracks_basic_ops() {
let metrics = CacheTelemetry::new(2);
let store = EmbeddedStore::with_route_mode_and_metrics(
2,
EmbeddedRouteMode::SessionPrefix,
Some(Arc::clone(&metrics)),
);
store.set(b"s:1:c:0".to_vec(), b"alpha".to_vec(), None);
assert_eq!(store.get(b"s:1:c:0"), Some(b"alpha".to_vec()));
assert_eq!(store.get(b"s:1:c:9"), None);
let _ = store.batch_get_session_packed(b"s:1", &[b"s:1:c:0".to_vec()]);
let snapshot = metrics.snapshot();
assert_eq!(snapshot.sets, 1);
assert_eq!(snapshot.gets, 3);
assert_eq!(snapshot.batch_gets, 1);
assert_eq!(snapshot.hits, 2);
assert_eq!(snapshot.misses, 1);
assert_eq!(snapshot.keys_total, 1);
}