#![forbid(unsafe_code)]
mod accounting;
pub mod evict;
pub mod expire;
pub use expire::ExpireStats;
mod hash;
mod keyspace;
mod list;
mod set;
mod stream;
mod string;
mod util;
mod value;
mod zset;
pub use stream::{
AutoclaimResult, ConsumerGroup, ConsumerState, EntryBatch, GroupCreateMode,
LoadedStreamEntry, PelEntry, PendingExtended, PendingExtendedRow, PendingSummary,
ReadGroupId, StreamData, StreamId, StreamIdError, XAddIdSpec, XClaimOpts,
now_unix_ms, parse_explicit_id, parse_range_end, parse_range_start, parse_xadd_id,
};
pub use util::glob_match;
pub use value::*;
use kevy_map::KevyMap;
use std::num::NonZeroU64;
use std::sync::OnceLock;
use std::time::{Duration, Instant};
/// Returns the process-local reference instant used to encode deadlines.
///
/// The instant is captured by the first call and every later call returns the
/// same value.
fn epoch() -> Instant {
    static PROCESS_EPOCH: OnceLock<Instant> = OnceLock::new();
    let anchor: &Instant = PROCESS_EPOCH.get_or_init(Instant::now);
    *anchor
}
/// Encodes `t` as whole nanoseconds elapsed since [`epoch`].
///
/// The result is always `Some`: an instant at or before the epoch is encoded
/// as 1 ns instead of 0. A zero offset would make `NonZeroU64::new` return
/// `None`, which `Entry` stores as "no deadline", so an already-elapsed
/// deadline would silently turn into a key that never expires.
///
/// Offsets too large for `u64` saturate to `u64::MAX` rather than being
/// truncated to an unrelated, earlier deadline.
#[inline]
fn pack_deadline(t: Instant) -> Option<NonZeroU64> {
    let nanos = t.saturating_duration_since(epoch()).as_nanos();
    let ns = u64::try_from(nanos).unwrap_or(u64::MAX);
    // Clamp to 1 so a past deadline stays representable (0 is the `None` niche).
    NonZeroU64::new(ns.max(1))
}
/// Decodes a packed deadline back into an `Instant` by adding `ns`
/// nanoseconds to [`epoch`].
#[inline]
fn unpack_deadline(ns: NonZeroU64) -> Instant {
    let offset = Duration::from_nanos(ns.get());
    epoch() + offset
}
/// Upper bound for `Entry::weight`; `Entry::set_weight` and
/// `Entry::add_to_weight` clamp to this value before storing.
const WEIGHT_MAX: u32 = u32::MAX;
/// A stored value together with its per-key bookkeeping fields.
pub(crate) struct Entry {
/// The stored value.
pub(crate) value: Value,
/// Deadline as produced by `pack_deadline` (nanoseconds after the
/// process-local epoch). `None` means `is_expired_at` never reports the
/// entry as expired.
pub(crate) expire_at_ns: Option<NonZeroU64>,
/// Weight, clamped to `WEIGHT_MAX` by the setters;
/// `Store::estimate_key_bytes` reports it plus `ENTRY_OVERHEAD`.
pub(crate) weight: u32,
/// Value read and written through `lru_clock` / `set_lru_clock`.
/// NOTE(review): presumably consumed by the eviction code — confirm in `evict`.
pub(crate) lru_clock: u32,
}
impl Entry {
    /// Creates an entry for `value` with weight and LRU clock both set to zero.
    ///
    /// `expire_at`, when present, is encoded with `pack_deadline`; `None`
    /// yields an entry without a stored deadline.
    #[inline]
    pub(crate) fn new(value: Value, expire_at: Option<Instant>) -> Self {
        Self {
            value,
            expire_at_ns: expire_at.and_then(pack_deadline),
            weight: 0,
            lru_clock: 0,
        }
    }

    /// Returns the stored weight widened to `u64`.
    #[inline]
    pub(crate) fn weight(&self) -> u64 {
        u64::from(self.weight)
    }

    /// Returns the stored LRU clock value.
    #[inline]
    pub(crate) fn lru_clock(&self) -> u32 {
        self.lru_clock
    }

    /// Stores `w` as the weight, clamping it to `WEIGHT_MAX`.
    #[inline]
    pub(crate) fn set_weight(&mut self, w: u64) {
        // The clamp guarantees the narrowing cast cannot truncate.
        self.weight = w.min(u64::from(WEIGHT_MAX)) as u32;
    }

    /// Overwrites the stored LRU clock value with `c`.
    #[inline]
    pub(crate) fn set_lru_clock(&mut self, c: u32) {
        self.lru_clock = c;
    }

    /// Adjusts the weight by the signed `delta`, saturating at `0` and at
    /// `WEIGHT_MAX`.
    ///
    /// Every `i64` is accepted, including `i64::MIN`.
    #[inline]
    pub(crate) fn add_to_weight(&mut self, delta: i64) {
        if delta == 0 {
            return;
        }
        // `unsigned_abs` is defined for all of `i64`; negating `i64::MIN`
        // overflows (a panic when overflow checks are enabled).
        let magnitude = delta.unsigned_abs();
        let current = u64::from(self.weight);
        let updated = if delta > 0 {
            current.saturating_add(magnitude)
        } else {
            current.saturating_sub(magnitude)
        };
        self.set_weight(updated);
    }

    /// Reports whether the entry's deadline is at or before `now`.
    ///
    /// Entries without a deadline are never expired.
    #[inline]
    pub(crate) fn is_expired_at(&self, now: Instant) -> bool {
        match self.expire_at_ns {
            None => false,
            Some(ns) => unpack_deadline(ns) <= now,
        }
    }
}
// Compile-time layout guard: the build fails unless `Entry` is exactly 48 bytes.
const _: () = assert!(std::mem::size_of::<Entry>() == 48);
/// Outcome of a rename operation.
///
/// NOTE(review): the code that produces these values is not visible here; the
/// variant descriptions below are read from the names — confirm against the
/// keyspace module.
#[derive(Debug, PartialEq, Eq)]
pub enum RenameOutcome {
/// The rename took effect.
Renamed,
/// Presumably: the source key did not exist.
NoSuchSrc,
/// Presumably: the destination key already existed and the rename was refused.
DstExists,
}
/// Errors reported by store operations.
///
/// Only `OutOfMemory` is produced in this file (by
/// `Store::precheck_for_write`); the other variants are raised elsewhere.
/// NOTE(review): their descriptions are read from the names — confirm at the
/// call sites.
#[derive(Debug, PartialEq, Eq)]
pub enum StoreError {
/// Presumably: the key holds a value of a different type than the operation needs.
WrongType,
/// Presumably: the value could not be interpreted as an integer.
NotInteger,
/// Presumably: an arithmetic result did not fit its target type.
Overflow,
/// Presumably: an index or argument was outside the accepted range.
OutOfRange,
/// Presumably: the requested key does not exist.
NoSuchKey,
/// Presumably: the value could not be interpreted as a float.
NotFloat,
/// Returned by `Store::precheck_for_write` when used memory exceeds a
/// non-zero `maxmemory` and the policy is `EvictionPolicy::NoEviction`.
OutOfMemory,
}
/// Policy stored on `Store` next to `maxmemory` and consulted once used memory
/// exceeds that limit. The default is [`EvictionPolicy::NoEviction`].
///
/// NOTE(review): the variant names mirror Redis `maxmemory-policy` values —
/// confirm the exact selection semantics in the `evict` module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EvictionPolicy {
    #[default]
    NoEviction,
    AllKeysLru,
    AllKeysLfu,
    AllKeysRandom,
    VolatileLru,
    VolatileLfu,
    VolatileRandom,
    VolatileTtl,
}

impl EvictionPolicy {
    /// True for `AllKeysLru` and `VolatileLru`.
    #[inline]
    pub fn uses_lru(self) -> bool {
        match self {
            Self::AllKeysLru | Self::VolatileLru => true,
            Self::NoEviction
            | Self::AllKeysLfu
            | Self::AllKeysRandom
            | Self::VolatileLfu
            | Self::VolatileRandom
            | Self::VolatileTtl => false,
        }
    }

    /// True for `AllKeysLfu` and `VolatileLfu`.
    #[inline]
    pub fn uses_lfu(self) -> bool {
        match self {
            Self::AllKeysLfu | Self::VolatileLfu => true,
            Self::NoEviction
            | Self::AllKeysLru
            | Self::AllKeysRandom
            | Self::VolatileLru
            | Self::VolatileRandom
            | Self::VolatileTtl => false,
        }
    }

    /// True for the four `Volatile*` variants.
    #[inline]
    pub fn is_volatile(self) -> bool {
        match self {
            Self::VolatileLru
            | Self::VolatileLfu
            | Self::VolatileRandom
            | Self::VolatileTtl => true,
            Self::NoEviction
            | Self::AllKeysLru
            | Self::AllKeysLfu
            | Self::AllKeysRandom => false,
        }
    }
}
/// The keyspace plus its memory-accounting, eviction and watch bookkeeping.
///
/// `Default` yields an empty store with every counter at zero and the default
/// eviction policy.
#[derive(Default)]
pub struct Store {
/// Key → entry map.
pub(crate) map: KevyMap<SmallBytes, Entry>,
/// Current memory usage figure that is compared against `maxmemory`.
pub(crate) used_memory: u64,
/// Memory limit; `0` disables the limit checks in `precheck_for_write`
/// and `try_evict_after_write`.
pub(crate) maxmemory: u64,
/// Policy consulted when `used_memory` exceeds `maxmemory`.
pub(crate) eviction_policy: EvictionPolicy,
/// Counter exposed by `evictions_total()`.
/// NOTE(review): presumably incremented by the eviction code — confirm in `evict`.
pub(crate) evictions_total: u64,
/// NOTE(review): not read or written in this file; presumably a logical
/// clock for LRU/LFU bookkeeping — confirm at its users.
pub(crate) clock_counter: u64,
/// Value exposed by `used_memory_peak()`; maintained outside this file.
pub(crate) used_memory_peak: u64,
/// NOTE(review): not touched in this file; presumably counts keys removed
/// by expiry — confirm in `expire`.
pub(crate) expired_keys_total: u64,
/// Version counter per watched key: `record_watch` inserts missing keys at
/// `0`, `bump_if_watched` / `bump_all_watched` increment with wrap-around.
pub(crate) watch_versions: std::collections::HashMap<Vec<u8>, u64>,
}
impl Store {
    /// Creates an empty store; equivalent to `Store::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Replaces both the memory limit and the eviction policy.
    #[inline]
    pub fn set_max_memory(&mut self, maxmemory: u64, policy: EvictionPolicy) {
        self.eviction_policy = policy;
        self.maxmemory = maxmemory;
    }

    /// Current memory usage figure.
    #[inline]
    pub fn used_memory(&self) -> u64 {
        self.used_memory
    }

    /// Stored peak memory usage figure.
    #[inline]
    pub fn used_memory_peak(&self) -> u64 {
        self.used_memory_peak
    }

    /// Configured memory limit (`0` means the limit checks are disabled).
    #[inline]
    pub fn maxmemory(&self) -> u64 {
        self.maxmemory
    }

    /// Configured eviction policy.
    #[inline]
    pub fn eviction_policy(&self) -> EvictionPolicy {
        self.eviction_policy
    }

    /// Stored eviction counter.
    #[inline]
    pub fn evictions_total(&self) -> u64 {
        self.evictions_total
    }

    /// Starts tracking `key` (at version `0`) if it is not tracked yet and
    /// returns its current version.
    pub fn record_watch(&mut self, key: &[u8]) -> u64 {
        let slot = self.watch_versions.entry(key.to_vec()).or_default();
        *slot
    }

    /// Returns the tracked version of `key`, or `0` when it is not tracked.
    #[inline]
    pub fn key_version(&self, key: &[u8]) -> u64 {
        self.watch_versions.get(key).map_or(0, |version| *version)
    }

    /// Increments the version of `key` (wrapping) when it is tracked;
    /// untracked keys are left alone and not inserted.
    #[inline]
    pub fn bump_if_watched(&mut self, key: &[u8]) {
        let Some(version) = self.watch_versions.get_mut(key) else {
            return;
        };
        *version = version.wrapping_add(1);
    }

    /// Increments the version of every tracked key (wrapping).
    pub fn bump_all_watched(&mut self) {
        self.watch_versions
            .values_mut()
            .for_each(|version| *version = version.wrapping_add(1));
    }

    /// Returns the entry's weight plus `ENTRY_OVERHEAD`, or `None` when `key`
    /// is absent from the map.
    pub fn estimate_key_bytes(&self, key: &[u8]) -> Option<u64> {
        let entry = self.map.get(key)?;
        Some(entry.weight() + ENTRY_OVERHEAD)
    }

    /// Gate to run before a write.
    ///
    /// Fails with `StoreError::OutOfMemory` only when a non-zero limit is
    /// exceeded and the policy is `NoEviction`; otherwise succeeds.
    #[inline]
    pub fn precheck_for_write(&self) -> Result<(), StoreError> {
        let over_limit = self.maxmemory != 0 && self.used_memory > self.maxmemory;
        match self.eviction_policy {
            EvictionPolicy::NoEviction if over_limit => Err(StoreError::OutOfMemory),
            _ => Ok(()),
        }
    }

    /// Runs `evict::evict_until_under_limit` when a non-zero limit is
    /// exceeded and returns its result; returns `0` without evicting otherwise.
    #[inline]
    pub fn try_evict_after_write(&mut self) -> usize {
        let over_limit = self.maxmemory != 0 && self.used_memory > self.maxmemory;
        if over_limit {
            evict::evict_until_under_limit(self)
        } else {
            0
        }
    }
}
/// Adds the signed `delta` to `*v`, saturating at `0` and at `u64::MAX`.
///
/// Every `i64` is accepted, including `i64::MIN`.
#[inline]
pub(crate) fn apply_delta(v: &mut u64, delta: i64) {
    // `unsigned_abs` is defined for all of `i64`; negating `i64::MIN`
    // overflows (a panic when overflow checks are enabled).
    let magnitude = delta.unsigned_abs();
    *v = if delta >= 0 {
        v.saturating_add(magnitude)
    } else {
        v.saturating_sub(magnitude)
    };
}
/// Heap bytes attributed to a key: `0` for keys of 22 bytes or fewer, the
/// full key length otherwise.
///
/// NOTE(review): 22 is presumably the inline capacity of `SmallBytes` —
/// confirm against its definition.
#[inline]
pub(crate) fn key_heap_bytes_for(key: &[u8]) -> u64 {
    match key.len() {
        0..=22 => 0,
        spilled => spilled as u64,
    }
}
#[cfg(test)]
mod tests;