fast-cache 0.1.0

Embedded-first thread-per-core in-memory cache with optional Redis-compatible server
Documentation
use crate::config::EvictionPolicy;
use crate::storage::FlatMap;

use super::{EmbeddedRouteMode, SessionSlotMap, derived_session_storage_prefix};

#[derive(Debug, Default)]
pub(crate) struct EmbeddedShard {
    pub(super) map: FlatMap,
    pub(super) session_slots: SessionSlotMap,
    pub(super) memory_limit_bytes: Option<usize>,
    pub(super) eviction_policy: EvictionPolicy,
}

impl EmbeddedShard {
    pub(crate) fn with_limits(
        _shard_id: usize,
        memory_limit_bytes: Option<usize>,
        eviction_policy: EvictionPolicy,
        capacity_hint: Option<usize>,
    ) -> Self {
        let map = capacity_hint.map_or_else(FlatMap::new, FlatMap::with_capacity);
        let mut shard = Self {
            map,
            session_slots: SessionSlotMap::default(),
            memory_limit_bytes: None,
            eviction_policy: EvictionPolicy::None,
        };
        shard.configure_memory_policy(memory_limit_bytes, eviction_policy, 0);
        shard
    }

    #[inline(always)]
    pub(crate) fn stored_bytes(&self) -> usize {
        self.map
            .stored_bytes()
            .saturating_add(self.session_slots.stored_bytes())
    }

    #[inline(always)]
    pub(crate) fn get_ref_hashed_shared_no_ttl(&self, hash: u64, key: &[u8]) -> Option<&[u8]> {
        self.map.get_ref_hashed_shared_no_ttl(hash, key)
    }

    #[inline(always)]
    pub(crate) fn get_ref_hashed_shared(
        &self,
        hash: u64,
        key: &[u8],
        now_ms: u64,
    ) -> Option<&[u8]> {
        self.map.get_ref_hashed_shared(hash, key, now_ms)
    }

    #[inline(always)]
    pub(crate) fn has_no_ttl_entries(&self) -> bool {
        self.map.has_no_ttl_entries()
    }

    #[inline(always)]
    pub(crate) fn contains_key_hashed_no_ttl(&self, hash: u64, key: &[u8]) -> bool {
        self.get_ref_hashed_shared_no_ttl(hash, key).is_some()
    }

    #[inline(always)]
    pub(crate) fn contains_key_hashed(&self, hash: u64, key: &[u8], now_ms: u64) -> bool {
        self.get_ref_hashed_shared(hash, key, now_ms).is_some()
    }

    #[inline(always)]
    pub(crate) fn get_ref_hashed_session_or_flat(
        &mut self,
        key_hash: u64,
        key: &[u8],
        now_ms: u64,
    ) -> Option<&[u8]> {
        if let Some(session_prefix) = derived_session_storage_prefix(key)
            && let Some(value) = self
                .session_slots
                .get_ref_hashed(&session_prefix, key_hash, key)
        {
            return Some(value);
        }

        self.map.get_ref_hashed(key_hash, key, now_ms)
    }

    #[inline(always)]
    pub(crate) fn get_ref_hashed_published_session_or_flat(
        &mut self,
        key_hash: u64,
        key: &[u8],
        now_ms: u64,
    ) -> Option<&[u8]> {
        if let Some(session_prefix) = derived_session_storage_prefix(key)
            && self.session_slots.has_session(&session_prefix)
        {
            return self
                .session_slots
                .get_ref_hashed(&session_prefix, key_hash, key);
        }

        self.map.get_ref_hashed(key_hash, key, now_ms)
    }

    #[inline(always)]
    pub(crate) fn get_ref_hashed_active_session_or_flat(
        &mut self,
        active_session_prefix: Option<&[u8]>,
        key_hash: u64,
        key: &[u8],
        now_ms: u64,
    ) -> Option<&[u8]> {
        match active_session_prefix {
            Some(session_prefix) => {
                self.session_slots
                    .get_ref_hashed(session_prefix, key_hash, key)
            }
            None => self.map.get_ref_hashed(key_hash, key, now_ms),
        }
    }

    #[inline(always)]
    pub(crate) fn set_value_bytes_hashed_no_ttl(
        &mut self,
        route_mode: EmbeddedRouteMode,
        key_hash: u64,
        key: &[u8],
        value: bytes::Bytes,
    ) {
        if route_mode == EmbeddedRouteMode::SessionPrefix
            && let Some(session_prefix) = derived_session_storage_prefix(key)
        {
            self.session_slots
                .delete_hashed(&session_prefix, key_hash, key);
        }
        self.map.set_bytes_hashed(key_hash, key, value, None, 0);
        self.enforce_memory_limit(0);
    }

    #[inline(always)]
    pub(crate) fn set_value_bytes_hashed(
        &mut self,
        route_mode: EmbeddedRouteMode,
        key_hash: u64,
        key: &[u8],
        value: bytes::Bytes,
        expire_at_ms: Option<u64>,
        now_ms: u64,
    ) {
        if route_mode == EmbeddedRouteMode::SessionPrefix
            && let Some(session_prefix) = derived_session_storage_prefix(key)
        {
            self.session_slots
                .delete_hashed(&session_prefix, key_hash, key);
        }
        self.map
            .set_bytes_hashed(key_hash, key, value, expire_at_ms, now_ms);
        self.enforce_memory_limit(now_ms);
    }

    #[inline(always)]
    pub(crate) fn set_slice_hashed_no_ttl(
        &mut self,
        route_mode: EmbeddedRouteMode,
        key_hash: u64,
        key: &[u8],
        value: &[u8],
    ) {
        if route_mode == EmbeddedRouteMode::SessionPrefix
            && let Some(session_prefix) = derived_session_storage_prefix(key)
        {
            self.session_slots
                .delete_hashed(&session_prefix, key_hash, key);
        }
        self.map.set_slice_hashed(key_hash, key, value, None, 0);
        self.enforce_memory_limit(0);
    }

    #[inline(always)]
    pub(crate) fn set_slice_hashed(
        &mut self,
        route_mode: EmbeddedRouteMode,
        key_hash: u64,
        key: &[u8],
        value: &[u8],
        expire_at_ms: Option<u64>,
        now_ms: u64,
    ) {
        if route_mode == EmbeddedRouteMode::SessionPrefix
            && let Some(session_prefix) = derived_session_storage_prefix(key)
        {
            self.session_slots
                .delete_hashed(&session_prefix, key_hash, key);
        }
        self.map
            .set_slice_hashed(key_hash, key, value, expire_at_ms, now_ms);
        self.enforce_memory_limit(now_ms);
    }

    #[inline(always)]
    pub(crate) fn remove_value_hashed_no_ttl(
        &mut self,
        key_hash: u64,
        key: &[u8],
    ) -> Option<bytes::Bytes> {
        let removed = self.map.remove_value_hashed(key_hash, key, 0);
        if removed.is_some() {
            self.enforce_memory_limit(0);
        }
        removed
    }

    #[inline(always)]
    pub(crate) fn get_session_ref_hashed_shared_no_ttl(
        &self,
        session_prefix: &[u8],
        key_hash: u64,
        key: &[u8],
    ) -> Option<&[u8]> {
        self.session_slots
            .get_ref_hashed_shared(session_prefix, key_hash, key)
    }

    #[inline(always)]
    pub(crate) fn get_session_ref_hashed_shared_no_ttl_prehashed(
        &self,
        session_hash: u64,
        session_prefix: &[u8],
        key_hash: u64,
        key: &[u8],
    ) -> Option<&[u8]> {
        self.session_slots.get_ref_hashed_shared_prehashed(
            session_hash,
            session_prefix,
            key_hash,
            key,
        )
    }

    #[inline(always)]
    pub(crate) fn set_session_slice_hashed_no_ttl(
        &mut self,
        session_prefix: &[u8],
        key_hash: u64,
        key: &[u8],
        value: &[u8],
    ) {
        if !self.map.is_empty() {
            self.map.delete_hashed(key_hash, key, 0);
        }
        self.session_slots
            .set_slice_hashed(session_prefix, key_hash, key, value);
        self.enforce_memory_limit(0);
    }

    #[inline(always)]
    pub(crate) fn set_session_slice_hashed_no_ttl_prehashed(
        &mut self,
        session_hash: u64,
        session_prefix: &[u8],
        key_hash: u64,
        key: &[u8],
        value: &[u8],
    ) {
        if !self.map.is_empty() {
            self.map.delete_hashed(key_hash, key, 0);
        }
        self.session_slots.set_slice_hashed_prehashed(
            session_hash,
            session_prefix,
            key_hash,
            key,
            value,
        );
        self.enforce_memory_limit(0);
    }

    #[inline(always)]
    fn update_lazy_read_sampling(&mut self) {
        let enabled = if self.eviction_policy == EvictionPolicy::None {
            false
        } else if let Some(limit) = self.memory_limit_bytes {
            let watermark = limit.saturating_mul(3) / 4;
            self.stored_bytes() >= watermark.max(1)
        } else {
            false
        };
        self.session_slots.configure_read_sampling(enabled);
    }

    pub(super) fn configure_memory_policy(
        &mut self,
        memory_limit_bytes: Option<usize>,
        eviction_policy: EvictionPolicy,
        now_ms: u64,
    ) {
        self.memory_limit_bytes = memory_limit_bytes.filter(|limit| *limit > 0);
        self.eviction_policy = eviction_policy;
        self.session_slots.configure_access_tracking(
            self.memory_limit_bytes.is_some() && self.eviction_policy != EvictionPolicy::None,
        );
        self.update_lazy_read_sampling();
        self.map
            .configure_memory_policy(None, eviction_policy, now_ms);
        self.enforce_memory_limit(now_ms);
    }

    #[inline(always)]
    pub(super) fn enforce_memory_limit(&mut self, now_ms: u64) {
        self.update_lazy_read_sampling();
        let Some(limit) = self.memory_limit_bytes else {
            return;
        };
        if self.stored_bytes() <= limit {
            return;
        }

        self.map.process_maintenance(now_ms);
        if self.session_slots.is_empty() {
            self.map.evict_to_memory_target(
                self.eviction_policy,
                now_ms,
                FlatMap::eviction_target_bytes(limit),
            );
            self.update_lazy_read_sampling();
            return;
        }
        while self.stored_bytes() > limit {
            let map_candidate = self.map.eviction_candidate(self.eviction_policy);
            let session_candidate = self.session_slots.eviction_candidate(self.eviction_policy);
            let evicted = match (map_candidate, session_candidate) {
                (
                    Some((map_rank, _map_hash, _map_key)),
                    Some((session_rank, _session_prefix, _session_hash, _session_key)),
                ) => {
                    if session_rank < map_rank {
                        self.session_slots.evict_with_policy(self.eviction_policy)
                    } else {
                        self.map.evict_with_policy(self.eviction_policy, now_ms)
                    }
                }
                (Some((_rank, _map_hash, _map_key)), None) => {
                    self.map.evict_with_policy(self.eviction_policy, now_ms)
                }
                (None, Some((_rank, _session_prefix, _session_hash, _session_key))) => {
                    self.session_slots.evict_with_policy(self.eviction_policy)
                }
                (None, None) => false,
            };
            if !evicted {
                break;
            }
            if self.stored_bytes() <= limit {
                break;
            }
        }
        self.update_lazy_read_sampling();
    }
}