use std::collections::VecDeque;
use std::time::Duration;
use ahash::AHashMap;
use bytes::Bytes;
use compact_str::CompactString;
use rand::seq::IteratorRandom;
use tracing::warn;
use crate::dropper::DropHandle;
use crate::memory::{self, MemoryTracker};
use crate::time;
use crate::types::sorted_set::{ScoreBound, SortedSet, ZAddFlags};
use crate::types::{self, normalize_range, Value};
mod hash;
mod list;
#[cfg(feature = "protobuf")]
mod proto;
mod set;
mod string;
#[cfg(feature = "vector")]
mod vector;
mod zset;
const WRONGTYPE_MSG: &str = "WRONGTYPE Operation against a key holding the wrong kind of value";
const OOM_MSG: &str = "OOM command not allowed when used memory > 'maxmemory'";
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WrongType;
impl std::fmt::Display for WrongType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{WRONGTYPE_MSG}")
}
}
impl std::error::Error for WrongType {}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WriteError {
WrongType,
OutOfMemory,
}
impl From<WrongType> for WriteError {
fn from(_: WrongType) -> Self {
WriteError::WrongType
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IncrError {
WrongType,
NotAnInteger,
Overflow,
OutOfMemory,
}
impl std::fmt::Display for IncrError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
IncrError::WrongType => write!(f, "{WRONGTYPE_MSG}"),
IncrError::NotAnInteger => write!(f, "ERR value is not an integer or out of range"),
IncrError::Overflow => write!(f, "ERR increment or decrement would overflow"),
IncrError::OutOfMemory => write!(f, "{OOM_MSG}"),
}
}
}
impl std::error::Error for IncrError {}
#[derive(Debug, Clone, PartialEq)]
pub enum IncrFloatError {
WrongType,
NotAFloat,
NanOrInfinity,
OutOfMemory,
}
impl std::fmt::Display for IncrFloatError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
IncrFloatError::WrongType => write!(f, "{WRONGTYPE_MSG}"),
IncrFloatError::NotAFloat => write!(f, "ERR value is not a valid float"),
IncrFloatError::NanOrInfinity => {
write!(f, "ERR increment would produce NaN or Infinity")
}
IncrFloatError::OutOfMemory => write!(f, "{OOM_MSG}"),
}
}
}
impl std::error::Error for IncrFloatError {}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RenameError {
NoSuchKey,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CopyError {
NoSuchKey,
OutOfMemory,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LsetError {
WrongType,
NoSuchKey,
IndexOutOfRange,
}
impl std::fmt::Display for LsetError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
LsetError::WrongType => write!(f, "{WRONGTYPE_MSG}"),
LsetError::NoSuchKey => write!(f, "ERR no such key"),
LsetError::IndexOutOfRange => write!(f, "ERR index out of range"),
}
}
}
impl std::error::Error for LsetError {}
impl std::fmt::Display for RenameError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RenameError::NoSuchKey => write!(f, "ERR no such key"),
}
}
}
impl std::error::Error for RenameError {}
#[derive(Debug, Clone)]
pub struct ZAddResult {
pub count: usize,
pub applied: Vec<(f64, String)>,
}
#[cfg(feature = "vector")]
#[derive(Debug, Clone)]
pub struct VAddResult {
pub element: String,
pub vector: Vec<f32>,
pub added: bool,
}
#[cfg(feature = "vector")]
#[derive(Debug, Clone)]
pub struct VAddBatchResult {
pub added_count: usize,
pub applied: Vec<(String, Vec<f32>)>,
}
#[cfg(feature = "vector")]
#[derive(Debug, Clone)]
pub enum VectorWriteError {
WrongType,
OutOfMemory,
IndexError(String),
PartialBatch {
message: String,
applied: Vec<(String, Vec<f32>)>,
},
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum EvictionPolicy {
#[default]
NoEviction,
AllKeysLru,
}
#[derive(Debug, Clone)]
pub struct ShardConfig {
pub max_memory: Option<usize>,
pub eviction_policy: EvictionPolicy,
pub shard_id: u16,
}
impl Default for ShardConfig {
fn default() -> Self {
Self {
max_memory: None,
eviction_policy: EvictionPolicy::NoEviction,
shard_id: 0,
}
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum SetResult {
Ok,
OutOfMemory,
Blocked,
}
#[derive(Debug, Clone)]
pub(crate) struct Entry {
pub(crate) value: Value,
pub(crate) expires_at_ms: u64,
pub(crate) cached_value_size: u32,
pub(crate) last_access_secs: u32,
}
impl Entry {
fn new(value: Value, ttl: Option<Duration>) -> Self {
let cached_value_size = memory::value_size(&value) as u32;
Self {
value,
expires_at_ms: time::expiry_from_duration(ttl),
cached_value_size,
last_access_secs: time::now_secs(),
}
}
fn is_expired(&self) -> bool {
time::is_expired(self.expires_at_ms)
}
#[inline(always)]
fn touch(&mut self, track: bool) {
if track {
self.last_access_secs = time::now_secs();
}
}
fn entry_size(&self, key: &str) -> usize {
key.len() + self.cached_value_size as usize + memory::ENTRY_OVERHEAD
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TtlResult {
Seconds(u64),
Milliseconds(u64),
NoExpiry,
NotFound,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeyspaceStats {
pub key_count: usize,
pub used_bytes: usize,
pub keys_with_expiry: usize,
pub keys_expired: u64,
pub keys_evicted: u64,
pub oom_rejections: u64,
}
const EVICTION_SAMPLE_SIZE: usize = 16;
pub struct Keyspace {
entries: AHashMap<CompactString, Entry>,
memory: MemoryTracker,
config: ShardConfig,
expiry_count: usize,
expired_total: u64,
evicted_total: u64,
oom_rejections: u64,
drop_handle: Option<DropHandle>,
next_version: u64,
versions: AHashMap<CompactString, u64>,
track_access: bool,
}
impl Keyspace {
pub fn new() -> Self {
Self::with_config(ShardConfig::default())
}
pub fn with_config(config: ShardConfig) -> Self {
let track_access = config.eviction_policy == EvictionPolicy::AllKeysLru;
Self {
entries: AHashMap::new(),
memory: MemoryTracker::new(),
config,
expiry_count: 0,
expired_total: 0,
evicted_total: 0,
oom_rejections: 0,
drop_handle: None,
next_version: 0,
versions: AHashMap::new(),
track_access,
}
}
pub fn set_drop_handle(&mut self, handle: DropHandle) {
self.drop_handle = Some(handle);
}
fn bump_version(&mut self, key: &str) {
if let Some(ver) = self.versions.get_mut(key) {
self.next_version += 1;
*ver = self.next_version;
}
}
pub fn key_version(&mut self, key: &str) -> Option<u64> {
let entry = self.entries.get(key)?;
if entry.is_expired() {
return None;
}
let ver = *self
.versions
.entry(CompactString::from(key))
.or_insert(self.next_version);
Some(ver)
}
fn remove_version(&mut self, key: &str) {
self.versions.remove(key);
}
pub fn clear_versions(&mut self) {
self.versions.clear();
}
fn decrement_expiry_if_set(&mut self, entry: &Entry) {
if entry.expires_at_ms != 0 {
self.expiry_count = self.expiry_count.saturating_sub(1);
}
}
fn cleanup_after_remove(
&mut self,
key: &str,
old_size: usize,
is_empty: bool,
removed_bytes: usize,
) {
if is_empty {
if let Some(removed) = self.entries.remove(key) {
self.decrement_expiry_if_set(&removed);
}
self.memory.remove_with_size(old_size);
} else {
self.memory.shrink_by(removed_bytes);
if let Some(entry) = self.entries.get_mut(key) {
entry.cached_value_size =
(entry.cached_value_size as usize).saturating_sub(removed_bytes) as u32;
}
}
}
fn ensure_collection_type(
&self,
key: &str,
type_check: fn(&Value) -> bool,
) -> Result<bool, WriteError> {
match self.entries.get(key) {
None => Ok(true),
Some(e) if type_check(&e.value) => Ok(false),
Some(_) => Err(WriteError::WrongType),
}
}
fn reserve_memory(
&mut self,
is_new: bool,
key: &str,
base_overhead: usize,
element_increase: usize,
) -> Result<(), WriteError> {
let estimated_increase = if is_new {
memory::ENTRY_OVERHEAD + key.len() + base_overhead + element_increase
} else {
element_increase
};
if self.enforce_memory_limit(estimated_increase) {
Ok(())
} else {
Err(WriteError::OutOfMemory)
}
}
fn insert_empty(&mut self, key: &str, value: Value) {
self.memory.add(key, &value);
let entry = Entry::new(value, None);
self.entries.insert(CompactString::from(key), entry);
self.bump_version(key);
}
fn track_size<T>(&mut self, key: &str, f: impl FnOnce(&mut Entry) -> T) -> Option<T> {
let entry = self.entries.get_mut(key)?;
let old_size = entry.entry_size(key);
let result = f(entry);
let entry = self.entries.get_mut(key)?;
let new_value_size = memory::value_size(&entry.value);
entry.cached_value_size = new_value_size as u32;
let new_size = key.len() + new_value_size + memory::ENTRY_OVERHEAD;
self.memory.adjust(old_size, new_size);
self.bump_version(key);
Some(result)
}
fn adjust_expiry_count(&mut self, had_expiry: bool, has_expiry: bool) {
match (had_expiry, has_expiry) {
(false, true) => self.expiry_count += 1,
(true, false) => self.expiry_count = self.expiry_count.saturating_sub(1),
_ => {}
}
}
fn try_evict(&mut self) -> bool {
if self.entries.is_empty() {
return false;
}
let mut rng = rand::rng();
let mut best_key: Option<&str> = None;
let mut best_access = u32::MAX;
let mut seen = 0usize;
for (key, entry) in &self.entries {
seen += 1;
if seen <= EVICTION_SAMPLE_SIZE {
if entry.last_access_secs < best_access {
best_access = entry.last_access_secs;
best_key = Some(&**key);
}
} else {
use rand::Rng;
let j = rng.random_range(0..seen);
if j < EVICTION_SAMPLE_SIZE && entry.last_access_secs < best_access {
best_access = entry.last_access_secs;
best_key = Some(&**key);
}
}
}
if let Some(victim) = best_key {
let victim = victim.to_owned();
if let Some(entry) = self.entries.remove(victim.as_str()) {
self.memory.remove(&victim, &entry.value);
self.decrement_expiry_if_set(&entry);
self.evicted_total += 1;
self.remove_version(&victim);
self.defer_drop(entry.value);
return true;
}
}
false
}
fn enforce_memory_limit(&mut self, estimated_increase: usize) -> bool {
if let Some(max) = self.config.max_memory {
let limit = memory::effective_limit(max);
while self.memory.used_bytes() + estimated_increase > limit {
match self.config.eviction_policy {
EvictionPolicy::NoEviction => {
self.oom_rejections += 1;
if self.oom_rejections == 1 || self.oom_rejections.is_multiple_of(1000) {
warn!(
used_bytes = self.memory.used_bytes(),
limit,
requested = estimated_increase,
total_rejections = self.oom_rejections,
"OOM: write rejected (policy: noeviction)"
);
}
return false;
}
EvictionPolicy::AllKeysLru => {
if !self.try_evict() {
self.oom_rejections += 1;
if self.oom_rejections == 1 || self.oom_rejections.is_multiple_of(1000)
{
warn!(
used_bytes = self.memory.used_bytes(),
limit,
requested = estimated_increase,
total_rejections = self.oom_rejections,
"OOM: write rejected (eviction exhausted)"
);
}
return false;
}
}
}
}
}
true
}
pub fn del(&mut self, key: &str) -> bool {
if self.remove_if_expired(key) {
return false;
}
if let Some(entry) = self.entries.remove(key) {
self.memory.remove(key, &entry.value);
self.decrement_expiry_if_set(&entry);
self.remove_version(key);
self.defer_drop(entry.value);
true
} else {
false
}
}
pub fn unlink(&mut self, key: &str) -> bool {
if self.remove_if_expired(key) {
return false;
}
if let Some(entry) = self.entries.remove(key) {
self.memory.remove(key, &entry.value);
self.decrement_expiry_if_set(&entry);
self.remove_version(key);
if let Some(ref handle) = self.drop_handle {
handle.defer_value(entry.value);
}
true
} else {
false
}
}
pub(crate) fn flush_async(&mut self) -> AHashMap<CompactString, Entry> {
let old = std::mem::take(&mut self.entries);
self.memory.reset();
self.expiry_count = 0;
self.versions.clear();
old
}
pub fn exists(&mut self, key: &str) -> bool {
if self.remove_if_expired(key) {
return false;
}
self.entries.contains_key(key)
}
pub fn random_key(&mut self) -> Option<String> {
for _ in 0..5 {
let mut rng = rand::rng();
let key = self.entries.keys().choose(&mut rng)?.clone();
if self.remove_if_expired(&key) {
continue;
}
return Some(key.to_string());
}
None
}
pub fn touch(&mut self, key: &str) -> bool {
if self.remove_if_expired(key) {
return false;
}
match self.entries.get_mut(key) {
Some(entry) => {
entry.touch(self.track_access);
true
}
None => false,
}
}
pub fn sort(
&mut self,
key: &str,
desc: bool,
alpha: bool,
limit: Option<(i64, i64)>,
) -> Result<Vec<Bytes>, &'static str> {
if self.remove_if_expired(key) {
return Ok(Vec::new());
}
let entry = match self.entries.get_mut(key) {
Some(e) => {
e.touch(self.track_access);
e
}
None => return Ok(Vec::new()),
};
let mut items: Vec<Bytes> = match &entry.value {
Value::List(deq) => deq.iter().cloned().collect(),
Value::Set(set) => set.iter().map(|s| Bytes::from(s.clone())).collect(),
Value::SortedSet(zset) => zset
.iter()
.map(|(m, _)| Bytes::from(m.to_owned()))
.collect(),
_ => return Err(WRONGTYPE_MSG),
};
if alpha {
items.sort();
if desc {
items.reverse();
}
} else {
let mut parse_err = false;
items.sort_by(|a, b| {
let a_str = std::str::from_utf8(a).unwrap_or("");
let b_str = std::str::from_utf8(b).unwrap_or("");
let a_val = a_str.parse::<f64>().unwrap_or_else(|_| {
parse_err = true;
0.0
});
let b_val = b_str.parse::<f64>().unwrap_or_else(|_| {
parse_err = true;
0.0
});
if desc {
b_val
.partial_cmp(&a_val)
.unwrap_or(std::cmp::Ordering::Equal)
} else {
a_val
.partial_cmp(&b_val)
.unwrap_or(std::cmp::Ordering::Equal)
}
});
if parse_err {
return Err("ERR One or more scores can't be converted into double");
}
}
if let Some((offset, count)) = limit {
let offset = offset.max(0) as usize;
let count = count.max(0) as usize;
let end = offset.saturating_add(count).min(items.len());
if offset < items.len() {
items = items[offset..end].to_vec();
} else {
items.clear();
}
}
Ok(items)
}
pub fn expire(&mut self, key: &str, seconds: u64) -> bool {
if self.remove_if_expired(key) {
return false;
}
match self.entries.get_mut(key) {
Some(entry) => {
if entry.expires_at_ms == 0 {
self.expiry_count += 1;
}
entry.expires_at_ms = time::now_ms().saturating_add(seconds.saturating_mul(1000));
self.bump_version(key);
true
}
None => false,
}
}
pub fn ttl(&mut self, key: &str) -> TtlResult {
if self.remove_if_expired(key) {
return TtlResult::NotFound;
}
match self.entries.get(key) {
Some(entry) => match time::remaining_secs(entry.expires_at_ms) {
Some(secs) => TtlResult::Seconds(secs),
None => TtlResult::NoExpiry,
},
None => TtlResult::NotFound,
}
}
pub fn persist(&mut self, key: &str) -> bool {
if self.remove_if_expired(key) {
return false;
}
match self.entries.get_mut(key) {
Some(entry) => {
if entry.expires_at_ms != 0 {
entry.expires_at_ms = 0;
self.expiry_count = self.expiry_count.saturating_sub(1);
self.bump_version(key);
true
} else {
false
}
}
None => false,
}
}
pub fn pttl(&mut self, key: &str) -> TtlResult {
if self.remove_if_expired(key) {
return TtlResult::NotFound;
}
match self.entries.get(key) {
Some(entry) => match time::remaining_ms(entry.expires_at_ms) {
Some(ms) => TtlResult::Milliseconds(ms),
None => TtlResult::NoExpiry,
},
None => TtlResult::NotFound,
}
}
pub fn pexpire(&mut self, key: &str, millis: u64) -> bool {
if self.remove_if_expired(key) {
return false;
}
match self.entries.get_mut(key) {
Some(entry) => {
if entry.expires_at_ms == 0 {
self.expiry_count += 1;
}
entry.expires_at_ms = time::now_ms().saturating_add(millis);
self.bump_version(key);
true
}
None => false,
}
}
pub fn keys(&self, pattern: &str) -> Vec<String> {
let len = self.entries.len();
if len > 10_000 {
warn!(
key_count = len,
"KEYS on large keyspace, consider SCAN instead"
);
}
let compiled = GlobPattern::new(pattern);
self.entries
.iter()
.filter(|(_, entry)| !entry.is_expired())
.filter(|(key, _)| compiled.matches(key))
.map(|(key, _)| String::from(&**key))
.collect()
}
pub fn count_keys_in_slot(&self, slot: u16) -> usize {
self.entries
.iter()
.filter(|(_, entry)| !entry.is_expired())
.filter(|(key, _)| ember_cluster::key_slot(key.as_bytes()) == slot)
.count()
}
pub fn get_keys_in_slot(&self, slot: u16, count: usize) -> Vec<String> {
self.entries
.iter()
.filter(|(_, entry)| !entry.is_expired())
.filter(|(key, _)| ember_cluster::key_slot(key.as_bytes()) == slot)
.take(count)
.map(|(key, _)| String::from(&**key))
.collect()
}
pub fn rename(&mut self, key: &str, newkey: &str) -> Result<(), RenameError> {
self.remove_if_expired(key);
self.remove_if_expired(newkey);
let entry = match self.entries.remove(key) {
Some(entry) => entry,
None => return Err(RenameError::NoSuchKey),
};
self.memory.remove(key, &entry.value);
self.decrement_expiry_if_set(&entry);
if let Some(old_dest) = self.entries.remove(newkey) {
self.memory.remove(newkey, &old_dest.value);
self.decrement_expiry_if_set(&old_dest);
}
self.memory.add(newkey, &entry.value);
if entry.expires_at_ms != 0 {
self.expiry_count += 1;
}
self.remove_version(key);
self.entries.insert(CompactString::from(newkey), entry);
self.bump_version(newkey);
Ok(())
}
pub fn copy(&mut self, source: &str, dest: &str, replace: bool) -> Result<bool, CopyError> {
self.remove_if_expired(source);
self.remove_if_expired(dest);
let src_entry = match self.entries.get(source) {
Some(e) => e,
None => return Err(CopyError::NoSuchKey),
};
if !replace && self.entries.contains_key(dest) {
return Ok(false);
}
let cloned_value = src_entry.value.clone();
let cloned_expire = if src_entry.expires_at_ms != 0 {
Some(src_entry.expires_at_ms)
} else {
None
};
let new_size = memory::entry_size(dest, &cloned_value);
let old_dest_size = self
.entries
.get(dest)
.map(|e| e.entry_size(dest))
.unwrap_or(0);
let net_increase = new_size.saturating_sub(old_dest_size);
if !self.enforce_memory_limit(net_increase) {
return Err(CopyError::OutOfMemory);
}
if let Some(old_dest) = self.entries.remove(dest) {
self.memory.remove(dest, &old_dest.value);
self.decrement_expiry_if_set(&old_dest);
self.defer_drop(old_dest.value);
}
self.memory.add(dest, &cloned_value);
let has_expiry = cloned_expire.is_some();
if has_expiry {
self.expiry_count += 1;
}
let mut entry = Entry::new(cloned_value, None);
if let Some(ts) = cloned_expire {
entry.expires_at_ms = ts;
}
self.entries.insert(CompactString::from(dest), entry);
self.bump_version(dest);
Ok(true)
}
pub fn stats(&self) -> KeyspaceStats {
KeyspaceStats {
key_count: self.memory.key_count(),
used_bytes: self.memory.used_bytes(),
keys_with_expiry: self.expiry_count,
keys_expired: self.expired_total,
keys_evicted: self.evicted_total,
oom_rejections: self.oom_rejections,
}
}
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn clear(&mut self) {
self.entries.clear();
self.memory.reset();
self.expiry_count = 0;
self.versions.clear();
}
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
pub fn scan_keys(
&self,
cursor: u64,
count: usize,
pattern: Option<&str>,
) -> (u64, Vec<String>) {
let mut keys = Vec::with_capacity(count);
let mut position = 0u64;
let target_count = if count == 0 { 10 } else { count };
let compiled = pattern.map(GlobPattern::new);
for (key, entry) in self.entries.iter() {
if entry.is_expired() {
continue;
}
if position < cursor {
position += 1;
continue;
}
if let Some(ref pat) = compiled {
if !pat.matches(key) {
position += 1;
continue;
}
}
keys.push(String::from(&**key));
position += 1;
if keys.len() >= target_count {
return (position, keys);
}
}
(0, keys)
}
pub fn dump(&mut self, key: &str) -> Option<(&Value, i64)> {
if self.remove_if_expired(key) {
return None;
}
let entry = self.entries.get(key)?;
let ttl_ms = match time::remaining_ms(entry.expires_at_ms) {
Some(ms) => ms.min(i64::MAX as u64) as i64,
None => -1,
};
Some((&entry.value, ttl_ms))
}
pub fn iter_entries(&self) -> impl Iterator<Item = (&str, &Value, i64)> {
self.entries.iter().filter_map(move |(key, entry)| {
if entry.is_expired() {
return None;
}
let ttl_ms = match time::remaining_ms(entry.expires_at_ms) {
Some(ms) => ms.min(i64::MAX as u64) as i64,
None => -1,
};
Some((&**key, &entry.value, ttl_ms))
})
}
pub fn restore(&mut self, key: String, value: Value, ttl: Option<Duration>) {
let has_expiry = ttl.is_some();
if let Some(old) = self.entries.get(key.as_str()) {
self.memory.replace(&key, &old.value, &value);
self.adjust_expiry_count(old.expires_at_ms != 0, has_expiry);
} else {
self.memory.add(&key, &value);
if has_expiry {
self.expiry_count += 1;
}
}
let entry = Entry::new(value, ttl);
self.entries.insert(CompactString::from(key.clone()), entry);
self.bump_version(&key);
}
pub fn expire_sample(&mut self, count: usize) -> usize {
if self.entries.is_empty() {
return 0;
}
let mut rng = rand::rng();
let keys_to_check: Vec<String> = self
.entries
.keys()
.choose_multiple(&mut rng, count)
.into_iter()
.map(|k| String::from(&**k))
.collect();
let mut removed = 0;
for key in &keys_to_check {
if self.remove_if_expired(key) {
removed += 1;
}
}
removed
}
fn remove_expired_entry(&mut self, key: &str) {
if let Some(entry) = self.entries.remove(key) {
self.memory.remove(key, &entry.value);
self.decrement_expiry_if_set(&entry);
self.expired_total += 1;
self.remove_version(key);
self.defer_drop(entry.value);
}
}
fn remove_if_expired(&mut self, key: &str) -> bool {
let expired = self
.entries
.get(key)
.map(|e| e.is_expired())
.unwrap_or(false);
if expired {
if let Some(entry) = self.entries.remove(key) {
self.memory.remove(key, &entry.value);
self.decrement_expiry_if_set(&entry);
self.expired_total += 1;
self.remove_version(key);
self.defer_drop(entry.value);
}
}
expired
}
fn get_live_entry(&mut self, key: &str) -> Option<&mut Entry> {
self.remove_if_expired(key);
let entry = self.entries.get_mut(key)?;
entry.touch(self.track_access);
Some(entry)
}
fn defer_drop(&self, value: Value) {
if let Some(ref handle) = self.drop_handle {
handle.defer_value(value);
}
}
}
impl Default for Keyspace {
fn default() -> Self {
Self::new()
}
}
pub(crate) fn format_float(val: f64) -> String {
if val == 0.0 {
return "0".into();
}
let s = format!("{:.17e}", val);
let reparsed: f64 = s.parse().unwrap_or(val);
if reparsed == reparsed.trunc() && reparsed >= i64::MIN as f64 && reparsed <= i64::MAX as f64 {
format!("{}", reparsed as i64)
} else {
let formatted = format!("{}", reparsed);
formatted
}
}
pub(crate) fn glob_match(pattern: &str, text: &str) -> bool {
let pat: Vec<char> = pattern.chars().collect();
glob_match_compiled(&pat, text)
}
pub(crate) struct GlobPattern {
chars: Vec<char>,
}
impl GlobPattern {
pub(crate) fn new(pattern: &str) -> Self {
Self {
chars: pattern.chars().collect(),
}
}
pub(crate) fn matches(&self, text: &str) -> bool {
glob_match_compiled(&self.chars, text)
}
}
fn glob_match_compiled(pat: &[char], text: &str) -> bool {
let txt: Vec<char> = text.chars().collect();
let mut pi = 0; let mut ti = 0;
let mut star_pi: Option<usize> = None;
let mut star_ti: usize = 0;
while ti < txt.len() || pi < pat.len() {
if pi < pat.len() {
match pat[pi] {
'*' => {
star_pi = Some(pi);
star_ti = ti;
pi += 1;
continue;
}
'?' if ti < txt.len() => {
pi += 1;
ti += 1;
continue;
}
'[' if ti < txt.len() => {
let tc = txt[ti];
let mut j = pi + 1;
let mut negated = false;
let mut matched = false;
if j < pat.len() && (pat[j] == '^' || pat[j] == '!') {
negated = true;
j += 1;
}
while j < pat.len() && pat[j] != ']' {
if pat[j] == tc {
matched = true;
}
j += 1;
}
if negated {
matched = !matched;
}
if matched && j < pat.len() {
pi = j + 1; ti += 1;
continue;
}
}
c if ti < txt.len() && c == txt[ti] => {
pi += 1;
ti += 1;
continue;
}
_ => {}
}
}
if let Some(sp) = star_pi {
pi = sp + 1;
star_ti += 1;
ti = star_ti;
if ti > txt.len() {
return false;
}
} else {
return false;
}
}
while pi < pat.len() && pat[pi] == '*' {
pi += 1;
}
pi == pat.len()
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
#[test]
fn del_existing() {
let mut ks = Keyspace::new();
ks.set("key".into(), Bytes::from("val"), None, false, false);
assert!(ks.del("key"));
assert_eq!(ks.get("key").unwrap(), None);
}
#[test]
fn del_missing() {
let mut ks = Keyspace::new();
assert!(!ks.del("nope"));
}
#[test]
fn exists_present_and_absent() {
let mut ks = Keyspace::new();
ks.set("yes".into(), Bytes::from("here"), None, false, false);
assert!(ks.exists("yes"));
assert!(!ks.exists("no"));
}
#[test]
fn ttl_no_expiry() {
let mut ks = Keyspace::new();
ks.set("key".into(), Bytes::from("val"), None, false, false);
assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
}
#[test]
fn ttl_not_found() {
let mut ks = Keyspace::new();
assert_eq!(ks.ttl("missing"), TtlResult::NotFound);
}
#[test]
fn ttl_with_expiry() {
let mut ks = Keyspace::new();
ks.set(
"key".into(),
Bytes::from("val"),
Some(Duration::from_secs(100)),
false,
false,
);
match ks.ttl("key") {
TtlResult::Seconds(s) => assert!((98..=100).contains(&s)),
other => panic!("expected Seconds, got {other:?}"),
}
}
#[test]
fn ttl_expired_key() {
let mut ks = Keyspace::new();
ks.set(
"temp".into(),
Bytes::from("val"),
Some(Duration::from_millis(10)),
false,
false,
);
thread::sleep(Duration::from_millis(30));
assert_eq!(ks.ttl("temp"), TtlResult::NotFound);
}
#[test]
fn expire_existing_key() {
let mut ks = Keyspace::new();
ks.set("key".into(), Bytes::from("val"), None, false, false);
assert!(ks.expire("key", 60));
match ks.ttl("key") {
TtlResult::Seconds(s) => assert!((58..=60).contains(&s)),
other => panic!("expected Seconds, got {other:?}"),
}
}
#[test]
fn expire_missing_key() {
let mut ks = Keyspace::new();
assert!(!ks.expire("nope", 60));
}
#[test]
fn del_expired_key_returns_false() {
let mut ks = Keyspace::new();
ks.set(
"temp".into(),
Bytes::from("val"),
Some(Duration::from_millis(10)),
false,
false,
);
thread::sleep(Duration::from_millis(30));
assert!(!ks.del("temp"));
}
#[test]
fn memory_increases_on_set() {
let mut ks = Keyspace::new();
assert_eq!(ks.stats().used_bytes, 0);
ks.set("key".into(), Bytes::from("value"), None, false, false);
assert!(ks.stats().used_bytes > 0);
assert_eq!(ks.stats().key_count, 1);
}
#[test]
fn memory_decreases_on_del() {
let mut ks = Keyspace::new();
ks.set("key".into(), Bytes::from("value"), None, false, false);
let after_set = ks.stats().used_bytes;
ks.del("key");
assert_eq!(ks.stats().used_bytes, 0);
assert!(after_set > 0);
}
#[test]
fn memory_adjusts_on_overwrite() {
let mut ks = Keyspace::new();
ks.set("key".into(), Bytes::from("short"), None, false, false);
let small = ks.stats().used_bytes;
ks.set(
"key".into(),
Bytes::from("a much longer value"),
None,
false,
false,
);
let large = ks.stats().used_bytes;
assert!(large > small);
assert_eq!(ks.stats().key_count, 1);
}
#[test]
fn memory_decreases_on_expired_removal() {
let mut ks = Keyspace::new();
ks.set(
"temp".into(),
Bytes::from("data"),
Some(Duration::from_millis(10)),
false,
false,
);
assert!(ks.stats().used_bytes > 0);
thread::sleep(Duration::from_millis(30));
let _ = ks.get("temp");
assert_eq!(ks.stats().used_bytes, 0);
assert_eq!(ks.stats().key_count, 0);
}
#[test]
fn stats_tracks_expiry_count() {
let mut ks = Keyspace::new();
ks.set("a".into(), Bytes::from("1"), None, false, false);
ks.set(
"b".into(),
Bytes::from("2"),
Some(Duration::from_secs(100)),
false,
false,
);
ks.set(
"c".into(),
Bytes::from("3"),
Some(Duration::from_secs(200)),
false,
false,
);
let stats = ks.stats();
assert_eq!(stats.key_count, 3);
assert_eq!(stats.keys_with_expiry, 2);
}
#[test]
fn noeviction_returns_oom_when_full() {
let config = ShardConfig {
max_memory: Some(130),
eviction_policy: EvictionPolicy::NoEviction,
..ShardConfig::default()
};
let mut ks = Keyspace::with_config(config);
assert_eq!(
ks.set("a".into(), Bytes::from("val"), None, false, false),
SetResult::Ok
);
let result = ks.set("b".into(), Bytes::from("val"), None, false, false);
assert_eq!(result, SetResult::OutOfMemory);
assert!(ks.exists("a"));
}
#[test]
fn lru_eviction_makes_room() {
let config = ShardConfig {
max_memory: Some(130),
eviction_policy: EvictionPolicy::AllKeysLru,
..ShardConfig::default()
};
let mut ks = Keyspace::with_config(config);
assert_eq!(
ks.set("a".into(), Bytes::from("val"), None, false, false),
SetResult::Ok
);
assert_eq!(
ks.set("b".into(), Bytes::from("val"), None, false, false),
SetResult::Ok
);
assert!(!ks.exists("a"));
assert!(ks.exists("b"));
}
#[test]
fn safety_margin_rejects_near_raw_limit() {
let config = ShardConfig {
max_memory: Some(120),
eviction_policy: EvictionPolicy::NoEviction,
..ShardConfig::default()
};
let mut ks = Keyspace::with_config(config);
assert_eq!(
ks.set("a".into(), Bytes::from("val"), None, false, false),
SetResult::Ok
);
let result = ks.set("b".into(), Bytes::from("val"), None, false, false);
assert_eq!(result, SetResult::OutOfMemory);
}
#[test]
fn overwrite_same_size_succeeds_at_limit() {
let config = ShardConfig {
max_memory: Some(130),
eviction_policy: EvictionPolicy::NoEviction,
..ShardConfig::default()
};
let mut ks = Keyspace::with_config(config);
assert_eq!(
ks.set("a".into(), Bytes::from("val"), None, false, false),
SetResult::Ok
);
assert_eq!(
ks.set("a".into(), Bytes::from("new"), None, false, false),
SetResult::Ok
);
assert_eq!(
ks.get("a").unwrap(),
Some(Value::String(Bytes::from("new")))
);
}
#[test]
fn overwrite_larger_value_respects_limit() {
let config = ShardConfig {
max_memory: Some(130),
eviction_policy: EvictionPolicy::NoEviction,
..ShardConfig::default()
};
let mut ks = Keyspace::with_config(config);
assert_eq!(
ks.set("a".into(), Bytes::from("val"), None, false, false),
SetResult::Ok
);
let big_value = "x".repeat(200);
let result = ks.set("a".into(), Bytes::from(big_value), None, false, false);
assert_eq!(result, SetResult::OutOfMemory);
assert_eq!(
ks.get("a").unwrap(),
Some(Value::String(Bytes::from("val")))
);
}
#[test]
fn iter_entries_returns_live_entries() {
let mut ks = Keyspace::new();
ks.set("a".into(), Bytes::from("1"), None, false, false);
ks.set(
"b".into(),
Bytes::from("2"),
Some(Duration::from_secs(100)),
false,
false,
);
let entries: Vec<_> = ks.iter_entries().collect();
assert_eq!(entries.len(), 2);
}
#[test]
fn iter_entries_skips_expired() {
let mut ks = Keyspace::new();
ks.set(
"dead".into(),
Bytes::from("gone"),
Some(Duration::from_millis(1)),
false,
false,
);
ks.set("alive".into(), Bytes::from("here"), None, false, false);
thread::sleep(Duration::from_millis(10));
let entries: Vec<_> = ks.iter_entries().collect();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].0, "alive");
}
#[test]
fn iter_entries_ttl_for_no_expiry() {
let mut ks = Keyspace::new();
ks.set("permanent".into(), Bytes::from("val"), None, false, false);
let entries: Vec<_> = ks.iter_entries().collect();
assert_eq!(entries[0].2, -1);
}
#[test]
fn restore_adds_entry() {
let mut ks = Keyspace::new();
ks.restore("restored".into(), Value::String(Bytes::from("data")), None);
assert_eq!(
ks.get("restored").unwrap(),
Some(Value::String(Bytes::from("data")))
);
assert_eq!(ks.stats().key_count, 1);
}
#[test]
fn restore_with_zero_ttl_expires_immediately() {
let mut ks = Keyspace::new();
ks.restore(
"short-lived".into(),
Value::String(Bytes::from("data")),
Some(Duration::from_millis(1)),
);
std::thread::sleep(Duration::from_millis(5));
assert!(ks.get("short-lived").is_err() || ks.get("short-lived").unwrap().is_none());
}
#[test]
fn restore_overwrites_existing() {
let mut ks = Keyspace::new();
ks.set("key".into(), Bytes::from("old"), None, false, false);
ks.restore("key".into(), Value::String(Bytes::from("new")), None);
assert_eq!(
ks.get("key").unwrap(),
Some(Value::String(Bytes::from("new")))
);
assert_eq!(ks.stats().key_count, 1);
}
#[test]
fn restore_bypasses_memory_limit() {
let config = ShardConfig {
max_memory: Some(50), eviction_policy: EvictionPolicy::NoEviction,
..ShardConfig::default()
};
let mut ks = Keyspace::with_config(config);
ks.restore(
"big".into(),
Value::String(Bytes::from("x".repeat(200))),
None,
);
assert_eq!(ks.stats().key_count, 1);
}
#[test]
fn no_limit_never_rejects() {
let mut ks = Keyspace::new();
for i in 0..100 {
assert_eq!(
ks.set(format!("key:{i}"), Bytes::from("value"), None, false, false),
SetResult::Ok
);
}
assert_eq!(ks.len(), 100);
}
#[test]
fn clear_removes_all_keys() {
let mut ks = Keyspace::new();
ks.set("a".into(), Bytes::from("1"), None, false, false);
ks.set(
"b".into(),
Bytes::from("2"),
Some(Duration::from_secs(60)),
false,
false,
);
ks.lpush("list", &[Bytes::from("x")]).unwrap();
assert_eq!(ks.len(), 3);
assert!(ks.stats().used_bytes > 0);
assert_eq!(ks.stats().keys_with_expiry, 1);
ks.clear();
assert_eq!(ks.len(), 0);
assert!(ks.is_empty());
assert_eq!(ks.stats().used_bytes, 0);
assert_eq!(ks.stats().keys_with_expiry, 0);
}
#[test]
fn scan_returns_keys() {
let mut ks = Keyspace::new();
ks.set("key1".into(), Bytes::from("a"), None, false, false);
ks.set("key2".into(), Bytes::from("b"), None, false, false);
ks.set("key3".into(), Bytes::from("c"), None, false, false);
let (cursor, keys) = ks.scan_keys(0, 10, None);
assert_eq!(cursor, 0); assert_eq!(keys.len(), 3);
}
#[test]
fn scan_empty_keyspace() {
let ks = Keyspace::new();
let (cursor, keys) = ks.scan_keys(0, 10, None);
assert_eq!(cursor, 0);
assert!(keys.is_empty());
}
#[test]
fn scan_with_pattern() {
let mut ks = Keyspace::new();
ks.set("user:1".into(), Bytes::from("a"), None, false, false);
ks.set("user:2".into(), Bytes::from("b"), None, false, false);
ks.set("item:1".into(), Bytes::from("c"), None, false, false);
let (cursor, keys) = ks.scan_keys(0, 10, Some("user:*"));
assert_eq!(cursor, 0);
assert_eq!(keys.len(), 2);
for k in &keys {
assert!(k.starts_with("user:"));
}
}
#[test]
fn scan_with_count_limit() {
let mut ks = Keyspace::new();
for i in 0..10 {
ks.set(format!("k{i}"), Bytes::from("v"), None, false, false);
}
let (cursor, keys) = ks.scan_keys(0, 3, None);
assert!(!keys.is_empty());
assert!(keys.len() <= 3);
if cursor != 0 {
let (cursor2, keys2) = ks.scan_keys(cursor, 3, None);
assert!(!keys2.is_empty());
let _ = (cursor2, keys2);
}
}
#[test]
fn scan_skips_expired_keys() {
let mut ks = Keyspace::new();
ks.set("live".into(), Bytes::from("a"), None, false, false);
ks.set(
"expired".into(),
Bytes::from("b"),
Some(Duration::from_millis(1)),
false,
false,
);
std::thread::sleep(Duration::from_millis(5));
let (_, keys) = ks.scan_keys(0, 10, None);
assert_eq!(keys.len(), 1);
assert_eq!(keys[0], "live");
}
#[test]
fn glob_match_star() {
assert!(super::glob_match("user:*", "user:123"));
assert!(super::glob_match("user:*", "user:"));
assert!(super::glob_match("*:data", "foo:data"));
assert!(!super::glob_match("user:*", "item:123"));
}
#[test]
fn glob_match_question() {
assert!(super::glob_match("key?", "key1"));
assert!(super::glob_match("key?", "keya"));
assert!(!super::glob_match("key?", "key"));
assert!(!super::glob_match("key?", "key12"));
}
#[test]
fn glob_match_brackets() {
assert!(super::glob_match("key[abc]", "keya"));
assert!(super::glob_match("key[abc]", "keyb"));
assert!(!super::glob_match("key[abc]", "keyd"));
}
#[test]
fn glob_match_literal() {
assert!(super::glob_match("exact", "exact"));
assert!(!super::glob_match("exact", "exactnot"));
assert!(!super::glob_match("exact", "notexact"));
}
#[test]
fn persist_removes_expiry() {
let mut ks = Keyspace::new();
ks.set(
"key".into(),
Bytes::from("val"),
Some(Duration::from_secs(60)),
false,
false,
);
assert!(matches!(ks.ttl("key"), TtlResult::Seconds(_)));
assert!(ks.persist("key"));
assert_eq!(ks.ttl("key"), TtlResult::NoExpiry);
assert_eq!(ks.stats().keys_with_expiry, 0);
}
#[test]
fn persist_returns_false_without_expiry() {
let mut ks = Keyspace::new();
ks.set("key".into(), Bytes::from("val"), None, false, false);
assert!(!ks.persist("key"));
}
#[test]
fn persist_returns_false_for_missing_key() {
let mut ks = Keyspace::new();
assert!(!ks.persist("missing"));
}
#[test]
fn pttl_returns_milliseconds() {
let mut ks = Keyspace::new();
ks.set(
"key".into(),
Bytes::from("val"),
Some(Duration::from_secs(60)),
false,
false,
);
match ks.pttl("key") {
TtlResult::Milliseconds(ms) => assert!(ms > 59_000 && ms <= 60_000),
other => panic!("expected Milliseconds, got {other:?}"),
}
}
#[test]
fn pttl_no_expiry() {
let mut ks = Keyspace::new();
ks.set("key".into(), Bytes::from("val"), None, false, false);
assert_eq!(ks.pttl("key"), TtlResult::NoExpiry);
}
#[test]
fn pttl_not_found() {
let mut ks = Keyspace::new();
assert_eq!(ks.pttl("missing"), TtlResult::NotFound);
}
#[test]
fn pexpire_sets_ttl_in_millis() {
let mut ks = Keyspace::new();
ks.set("key".into(), Bytes::from("val"), None, false, false);
assert!(ks.pexpire("key", 5000));
match ks.pttl("key") {
TtlResult::Milliseconds(ms) => assert!(ms > 4000 && ms <= 5000),
other => panic!("expected Milliseconds, got {other:?}"),
}
assert_eq!(ks.stats().keys_with_expiry, 1);
}
#[test]
fn pexpire_missing_key_returns_false() {
let mut ks = Keyspace::new();
assert!(!ks.pexpire("missing", 5000));
}
#[test]
fn pexpire_overwrites_existing_ttl() {
let mut ks = Keyspace::new();
ks.set(
"key".into(),
Bytes::from("val"),
Some(Duration::from_secs(60)),
false,
false,
);
assert!(ks.pexpire("key", 500));
match ks.pttl("key") {
TtlResult::Milliseconds(ms) => assert!(ms <= 500),
other => panic!("expected Milliseconds, got {other:?}"),
}
assert_eq!(ks.stats().keys_with_expiry, 1);
}
#[test]
fn keys_match_all() {
let mut ks = Keyspace::new();
ks.set("a".into(), Bytes::from("1"), None, false, false);
ks.set("b".into(), Bytes::from("2"), None, false, false);
ks.set("c".into(), Bytes::from("3"), None, false, false);
let mut result = ks.keys("*");
result.sort();
assert_eq!(result, vec!["a", "b", "c"]);
}
#[test]
fn keys_with_pattern() {
let mut ks = Keyspace::new();
ks.set("user:1".into(), Bytes::from("a"), None, false, false);
ks.set("user:2".into(), Bytes::from("b"), None, false, false);
ks.set("item:1".into(), Bytes::from("c"), None, false, false);
let mut result = ks.keys("user:*");
result.sort();
assert_eq!(result, vec!["user:1", "user:2"]);
}
#[test]
fn keys_skips_expired() {
let mut ks = Keyspace::new();
ks.set("live".into(), Bytes::from("a"), None, false, false);
ks.set(
"dead".into(),
Bytes::from("b"),
Some(Duration::from_millis(1)),
false,
false,
);
thread::sleep(Duration::from_millis(5));
let result = ks.keys("*");
assert_eq!(result, vec!["live"]);
}
#[test]
fn keys_empty_keyspace() {
let ks = Keyspace::new();
assert!(ks.keys("*").is_empty());
}
#[test]
fn rename_basic() {
let mut ks = Keyspace::new();
ks.set("old".into(), Bytes::from("value"), None, false, false);
ks.rename("old", "new").unwrap();
assert!(!ks.exists("old"));
assert_eq!(
ks.get("new").unwrap(),
Some(Value::String(Bytes::from("value")))
);
}
#[test]
fn rename_preserves_expiry() {
let mut ks = Keyspace::new();
ks.set(
"old".into(),
Bytes::from("val"),
Some(Duration::from_secs(60)),
false,
false,
);
ks.rename("old", "new").unwrap();
match ks.ttl("new") {
TtlResult::Seconds(s) => assert!((58..=60).contains(&s)),
other => panic!("expected TTL preserved, got {other:?}"),
}
}
#[test]
fn rename_overwrites_destination() {
let mut ks = Keyspace::new();
ks.set("src".into(), Bytes::from("new_val"), None, false, false);
ks.set("dst".into(), Bytes::from("old_val"), None, false, false);
ks.rename("src", "dst").unwrap();
assert!(!ks.exists("src"));
assert_eq!(
ks.get("dst").unwrap(),
Some(Value::String(Bytes::from("new_val")))
);
assert_eq!(ks.len(), 1);
}
#[test]
fn rename_missing_key_returns_error() {
let mut ks = Keyspace::new();
let err = ks.rename("missing", "new").unwrap_err();
assert_eq!(err, RenameError::NoSuchKey);
}
#[test]
fn rename_same_key() {
let mut ks = Keyspace::new();
ks.set("key".into(), Bytes::from("val"), None, false, false);
ks.rename("key", "key").unwrap();
assert_eq!(
ks.get("key").unwrap(),
Some(Value::String(Bytes::from("val")))
);
}
#[test]
fn rename_tracks_memory() {
let mut ks = Keyspace::new();
ks.set("old".into(), Bytes::from("value"), None, false, false);
let before = ks.stats().used_bytes;
ks.rename("old", "new").unwrap();
let after = ks.stats().used_bytes;
assert_eq!(before, after);
assert_eq!(ks.stats().key_count, 1);
}
#[test]
fn zero_ttl_expires_immediately() {
let mut ks = Keyspace::new();
ks.set(
"key".into(),
Bytes::from("val"),
Some(Duration::ZERO),
false,
false,
);
std::thread::sleep(Duration::from_millis(1));
assert!(ks.get("key").unwrap().is_none());
}
#[test]
fn very_small_ttl_expires_quickly() {
let mut ks = Keyspace::new();
ks.set(
"key".into(),
Bytes::from("val"),
Some(Duration::from_millis(1)),
false,
false,
);
std::thread::sleep(Duration::from_millis(5));
assert!(ks.get("key").unwrap().is_none());
}
#[test]
fn count_keys_in_slot_empty() {
let ks = Keyspace::new();
assert_eq!(ks.count_keys_in_slot(0), 0);
}
#[test]
fn count_keys_in_slot_matches() {
let mut ks = Keyspace::new();
ks.set("a".into(), Bytes::from("1"), None, false, false);
ks.set("b".into(), Bytes::from("2"), None, false, false);
ks.set("c".into(), Bytes::from("3"), None, false, false);
let slot_a = ember_cluster::key_slot(b"a");
let count = ks.count_keys_in_slot(slot_a);
assert!(count >= 1);
}
#[test]
fn count_keys_in_slot_skips_expired() {
let mut ks = Keyspace::new();
let slot = ember_cluster::key_slot(b"temp");
ks.set(
"temp".into(),
Bytes::from("gone"),
Some(Duration::from_millis(0)),
false,
false,
);
thread::sleep(Duration::from_millis(5));
assert_eq!(ks.count_keys_in_slot(slot), 0);
}
#[test]
fn get_keys_in_slot_returns_matching() {
let mut ks = Keyspace::new();
ks.set("x".into(), Bytes::from("1"), None, false, false);
ks.set("y".into(), Bytes::from("2"), None, false, false);
let slot_x = ember_cluster::key_slot(b"x");
let keys = ks.get_keys_in_slot(slot_x, 100);
assert!(keys.contains(&"x".to_string()));
}
#[test]
fn get_keys_in_slot_respects_count_limit() {
let mut ks = Keyspace::new();
for i in 0..100 {
ks.set(format!("key:{i}"), Bytes::from("v"), None, false, false);
}
let keys = ks.get_keys_in_slot(0, 3);
assert!(keys.len() <= 3);
}
#[test]
fn key_version_returns_none_for_missing() {
let mut ks = Keyspace::new();
assert_eq!(ks.key_version("nope"), None);
}
#[test]
fn key_version_changes_on_set() {
let mut ks = Keyspace::new();
ks.set("k".into(), Bytes::from("v1"), None, false, false);
let v1 = ks.key_version("k").expect("key should exist");
ks.set("k".into(), Bytes::from("v2"), None, false, false);
let v2 = ks.key_version("k").expect("key should exist");
assert!(v2 > v1, "version should increase on overwrite");
}
#[test]
fn key_version_none_after_del() {
let mut ks = Keyspace::new();
ks.set("k".into(), Bytes::from("v"), None, false, false);
assert!(ks.key_version("k").is_some());
ks.del("k");
assert_eq!(ks.key_version("k"), None);
}
#[test]
fn key_version_changes_on_list_push() {
let mut ks = Keyspace::new();
ks.lpush("list", &[Bytes::from("a")]).unwrap();
let v1 = ks.key_version("list").expect("list should exist");
ks.rpush("list", &[Bytes::from("b")]).unwrap();
let v2 = ks.key_version("list").expect("list should exist");
assert!(v2 > v1, "version should increase on rpush");
}
#[test]
fn key_version_changes_on_hash_set() {
let mut ks = Keyspace::new();
ks.hset("h", &[("f1".into(), Bytes::from("v1"))]).unwrap();
let v1 = ks.key_version("h").expect("hash should exist");
ks.hset("h", &[("f2".into(), Bytes::from("v2"))]).unwrap();
let v2 = ks.key_version("h").expect("hash should exist");
assert!(v2 > v1, "version should increase on hset");
}
#[test]
fn key_version_changes_on_expire() {
let mut ks = Keyspace::new();
ks.set("k".into(), Bytes::from("v"), None, false, false);
let v1 = ks.key_version("k").expect("key should exist");
ks.expire("k", 100);
let v2 = ks.key_version("k").expect("key should exist");
assert!(v2 > v1, "version should increase on expire");
}
#[test]
fn key_version_stable_without_watch() {
let mut ks = Keyspace::new();
ks.set("a".into(), Bytes::from("1"), None, false, false);
ks.set("a".into(), Bytes::from("2"), None, false, false);
let v1 = ks.key_version("a").unwrap();
let v2 = ks.key_version("a").unwrap();
assert_eq!(v1, v2, "version should be stable without mutations");
}
#[test]
fn copy_basic() {
let mut ks = Keyspace::new();
ks.set("src".into(), Bytes::from("hello"), None, false, false);
assert_eq!(ks.copy("src", "dst", false), Ok(true));
assert_eq!(
ks.get("dst").unwrap(),
Some(Value::String(Bytes::from("hello")))
);
assert!(ks.exists("src"));
}
#[test]
fn copy_preserves_expiry() {
let mut ks = Keyspace::new();
ks.set(
"src".into(),
Bytes::from("val"),
Some(Duration::from_secs(60)),
false,
false,
);
assert_eq!(ks.copy("src", "dst", false), Ok(true));
match ks.ttl("dst") {
TtlResult::Seconds(s) => assert!((58..=60).contains(&s)),
other => panic!("expected TTL preserved, got {other:?}"),
}
}
#[test]
fn copy_no_replace_returns_false() {
let mut ks = Keyspace::new();
ks.set("src".into(), Bytes::from("a"), None, false, false);
ks.set("dst".into(), Bytes::from("b"), None, false, false);
assert_eq!(ks.copy("src", "dst", false), Ok(false));
assert_eq!(
ks.get("dst").unwrap(),
Some(Value::String(Bytes::from("b")))
);
}
#[test]
fn copy_replace_overwrites() {
let mut ks = Keyspace::new();
ks.set("src".into(), Bytes::from("new"), None, false, false);
ks.set("dst".into(), Bytes::from("old"), None, false, false);
assert_eq!(ks.copy("src", "dst", true), Ok(true));
assert_eq!(
ks.get("dst").unwrap(),
Some(Value::String(Bytes::from("new")))
);
}
#[test]
fn copy_missing_source() {
let mut ks = Keyspace::new();
assert_eq!(ks.copy("missing", "dst", false), Err(CopyError::NoSuchKey));
}
#[test]
fn copy_tracks_memory() {
let mut ks = Keyspace::new();
ks.set("src".into(), Bytes::from("value"), None, false, false);
let before = ks.stats().used_bytes;
ks.copy("src", "dst", false).unwrap();
let after = ks.stats().used_bytes;
assert!(after > before);
assert_eq!(ks.stats().key_count, 2);
}
#[test]
fn random_key_empty() {
let mut ks = Keyspace::new();
assert_eq!(ks.random_key(), None);
}
#[test]
fn random_key_returns_existing() {
let mut ks = Keyspace::new();
ks.set("only".into(), Bytes::from("val"), None, false, false);
assert_eq!(ks.random_key(), Some("only".into()));
}
#[test]
fn touch_existing_key() {
let mut ks = Keyspace::new();
ks.set("k".into(), Bytes::from("v"), None, false, false);
assert!(ks.touch("k"));
}
#[test]
fn touch_missing_key() {
let mut ks = Keyspace::new();
assert!(!ks.touch("missing"));
}
#[test]
fn sort_list_numeric() {
let mut ks = Keyspace::new();
let _ = ks.lpush(
"nums",
&[Bytes::from("3"), Bytes::from("1"), Bytes::from("2")],
);
let result = ks.sort("nums", false, false, None).unwrap();
assert_eq!(
result,
vec![Bytes::from("1"), Bytes::from("2"), Bytes::from("3")]
);
}
#[test]
fn sort_list_alpha_desc() {
let mut ks = Keyspace::new();
let _ = ks.lpush(
"words",
&[
Bytes::from("banana"),
Bytes::from("apple"),
Bytes::from("cherry"),
],
);
let result = ks.sort("words", true, true, None).unwrap();
assert_eq!(
result,
vec![
Bytes::from("cherry"),
Bytes::from("banana"),
Bytes::from("apple")
]
);
}
#[test]
fn sort_with_limit() {
let mut ks = Keyspace::new();
let _ = ks.lpush(
"nums",
&[
Bytes::from("4"),
Bytes::from("3"),
Bytes::from("2"),
Bytes::from("1"),
],
);
let result = ks.sort("nums", false, false, Some((1, 2))).unwrap();
assert_eq!(result, vec![Bytes::from("2"), Bytes::from("3")]);
}
#[test]
fn sort_set_alpha() {
let mut ks = Keyspace::new();
let members: Vec<String> = vec!["c".into(), "a".into(), "b".into()];
let _ = ks.sadd("myset", &members);
let result = ks.sort("myset", false, true, None).unwrap();
assert_eq!(
result,
vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]
);
}
#[test]
fn sort_missing_key() {
let mut ks = Keyspace::new();
let result = ks.sort("nope", false, false, None).unwrap();
assert!(result.is_empty());
}
#[test]
fn sort_wrong_type() {
let mut ks = Keyspace::new();
ks.set("str".into(), Bytes::from("hello"), None, false, false);
let result = ks.sort("str", false, false, None);
assert!(result.is_err());
}
}