#![forbid(unsafe_code)]
mod accounting;
mod clock;
mod entry;
pub mod evict;
pub mod expire;
pub use expire::ExpireStats;
pub(crate) use entry::Entry;
mod hash;
mod keyspace;
mod list;
mod set;
mod stream;
mod string;
mod util;
mod value;
mod zset;
pub use stream::{
AutoclaimResult, ConsumerGroup, ConsumerState, EntryBatch, GroupCreateMode,
LoadedGroup, LoadedPelEntry, LoadedStreamEntry, PelEntry, PendingExtended,
PendingExtendedRow, PendingSummary, ReadGroupId, StreamData, StreamId, StreamIdError,
XAddIdSpec, XClaimOpts, now_unix_ms, parse_explicit_id, parse_range_end,
parse_range_start, parse_xadd_id,
};
pub use util::glob_match;
pub use value::*;
pub(crate) use clock::{now_ns, pack_deadline, unpack_deadline};
use kevy_map::KevyMap;
#[derive(Debug, PartialEq, Eq)]
pub enum RenameOutcome {
Renamed,
NoSuchSrc,
DstExists,
}
#[derive(Debug, PartialEq, Eq)]
pub enum StoreError {
WrongType,
NotInteger,
Overflow,
OutOfRange,
NoSuchKey,
NotFloat,
OutOfMemory,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EvictionPolicy {
#[default]
NoEviction,
AllKeysLru,
AllKeysLfu,
AllKeysRandom,
VolatileLru,
VolatileLfu,
VolatileRandom,
VolatileTtl,
}
impl EvictionPolicy {
#[inline]
pub fn uses_lru(self) -> bool {
matches!(self, Self::AllKeysLru | Self::VolatileLru)
}
#[inline]
pub fn uses_lfu(self) -> bool {
matches!(self, Self::AllKeysLfu | Self::VolatileLfu)
}
#[inline]
pub fn is_volatile(self) -> bool {
matches!(
self,
Self::VolatileLru | Self::VolatileLfu | Self::VolatileRandom | Self::VolatileTtl
)
}
}
#[derive(Default)]
pub struct Store {
pub(crate) map: KevyMap<SmallBytes, Entry>,
pub(crate) cached_ns: u64,
pub(crate) cached_clock: bool,
pub(crate) used_memory: u64,
pub(crate) maxmemory: u64,
pub(crate) eviction_policy: EvictionPolicy,
pub(crate) evictions_total: u64,
pub(crate) clock_counter: u64,
pub(crate) used_memory_peak: u64,
pub(crate) expired_keys_total: u64,
pub(crate) watch_versions: std::collections::HashMap<Vec<u8>, u64>,
}
impl Store {
pub fn new() -> Self {
Store::default()
}
#[inline]
pub fn refresh_clock(&mut self) {
self.cached_ns = now_ns();
}
#[inline]
pub fn set_cached_clock(&mut self, on: bool) {
self.cached_clock = on;
if on {
self.refresh_clock();
}
}
#[inline]
pub fn set_max_memory(&mut self, maxmemory: u64, policy: EvictionPolicy) {
self.maxmemory = maxmemory;
self.eviction_policy = policy;
}
#[inline]
pub fn used_memory(&self) -> u64 {
self.used_memory
}
#[inline]
pub fn used_memory_peak(&self) -> u64 {
self.used_memory_peak
}
#[inline]
pub fn maxmemory(&self) -> u64 {
self.maxmemory
}
#[inline]
pub fn eviction_policy(&self) -> EvictionPolicy {
self.eviction_policy
}
#[inline]
pub fn evictions_total(&self) -> u64 {
self.evictions_total
}
pub fn record_watch(&mut self, key: &[u8]) -> u64 {
*self
.watch_versions
.entry(key.to_vec())
.or_insert(0)
}
#[inline]
pub fn key_version(&self, key: &[u8]) -> u64 {
self.watch_versions.get(key).copied().unwrap_or(0)
}
#[inline]
pub fn bump_if_watched(&mut self, key: &[u8]) {
if self.watch_versions.is_empty() {
return;
}
if let Some(v) = self.watch_versions.get_mut(key) {
*v = v.wrapping_add(1);
}
}
pub fn bump_all_watched(&mut self) {
for v in self.watch_versions.values_mut() {
*v = v.wrapping_add(1);
}
}
pub fn estimate_key_bytes(&self, key: &[u8]) -> Option<u64> {
self.map.get(key).map(|e| e.weight() + ENTRY_OVERHEAD)
}
#[inline]
pub fn precheck_for_write(&self) -> Result<(), StoreError> {
if self.maxmemory == 0 || self.used_memory <= self.maxmemory {
return Ok(());
}
if self.eviction_policy == EvictionPolicy::NoEviction {
return Err(StoreError::OutOfMemory);
}
Ok(())
}
#[inline]
pub fn try_evict_after_write(&mut self) -> usize {
if self.maxmemory == 0 || self.used_memory <= self.maxmemory {
return 0;
}
evict::evict_until_under_limit(self)
}
}
#[inline]
pub(crate) fn apply_delta(v: &mut u64, delta: i64) {
if delta >= 0 {
*v = v.saturating_add(delta as u64);
} else {
*v = v.saturating_sub((-delta) as u64);
}
}
#[inline]
pub(crate) fn key_heap_bytes_for(key: &[u8]) -> u64 {
if key.len() <= 22 { 0 } else { key.len() as u64 }
}
#[cfg(test)]
mod tests;
#[cfg(test)]
mod tests_memory;