mod command;
mod embedded_store;
#[cfg(feature = "sharded")]
mod embedded_store_sharded;
mod embedded_store_shared;
mod engine;
mod flat_map;
mod records;
mod redis_objects;
mod stats;
#[cfg(feature = "telemetry")]
mod telemetry;
pub use command::{BorrowedCommand, Command};
#[cfg(feature = "sharded")]
pub use embedded_store::OwnedEmbeddedSessionPackedView as LocalEmbeddedSessionPackedView;
pub use embedded_store::{
EmbeddedBatchReadView, EmbeddedKeyRoute, EmbeddedReadSlice, EmbeddedReadView,
EmbeddedRouteMode, EmbeddedSessionBatchView, EmbeddedSessionRoute, EmbeddedShardHandle,
EmbeddedStore, OwnedEmbeddedBatchReadView, OwnedEmbeddedReadView,
OwnedEmbeddedSessionBatchView, OwnedEmbeddedSessionPackedView, OwnedEmbeddedShard,
OwnedEmbeddedWorkerReadSession, OwnedEmbeddedWorkerShards, PackedSessionWrite, shift_for,
stripe_index,
};
#[cfg(feature = "sharded")]
pub use embedded_store_sharded::{
LocalRouteError, LocalStoreAccessError, LocalStoreInstallError,
WorkerLocalBatchReadView as LocalEmbeddedBatchReadView,
WorkerLocalEmbeddedStore as LocalEmbeddedStore,
WorkerLocalEmbeddedStoreBootstrap as LocalEmbeddedStoreBootstrap,
WorkerLocalReadSlice as LocalEmbeddedReadSlice, WorkerLocalReadView as LocalEmbeddedReadView,
WorkerLocalSessionBatchView as LocalEmbeddedSessionBatchView,
};
pub use embedded_store_shared::{
Entry as SharedEmbeddedEntry, Ref as SharedEmbeddedRef, RefMut as SharedEmbeddedRefMut,
SharedEmbeddedConfig, SharedEmbeddedLockPolicy, SharedEmbeddedStore,
VacantEntry as SharedEmbeddedVacantEntry,
};
#[cfg(feature = "sharded")]
pub fn with_local_embedded_store<R>(
f: impl FnOnce(&mut LocalEmbeddedStore) -> R,
) -> Result<R, LocalStoreAccessError> {
embedded_store_sharded::with_thread_local_embedded_store(f)
}
#[cfg(feature = "sharded")]
pub fn take_local_embedded_store() -> Option<LocalEmbeddedStore> {
embedded_store_sharded::take_thread_local_embedded_store()
}
pub use engine::EngineHandle;
pub(crate) use engine::{
EngineCommandContext, EngineFastFuture, EngineFrameFuture, EngineRespSpanFuture,
ExpirationChange, RESP_SPANNED_VALUE_MIN, ShardKey, ShardOperation, ShardReply, ShardValue,
};
pub use flat_map::FlatMap;
pub use records::{MutationBytes, MutationOp, MutationRecord, StoredEntry};
pub(crate) use redis_objects::{
RedisObjectBucket, RedisObjectReadOutcome, RedisObjectStore, RedisObjectValue,
RedisObjectWriteAttempt, WRONGTYPE_MESSAGE,
};
pub use redis_objects::{RedisObjectError, RedisObjectResult, RedisStringLookup};
pub use stats::{GlobalStatsSnapshot, ShardStatsSnapshot, TierStatsSnapshot, WalStatsSnapshot};
use std::collections::{HashMap, HashSet};
use std::sync::Once;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[cfg(feature = "telemetry")]
pub use telemetry::{CacheMetrics, CacheMetricsSnapshot, CacheTelemetry, CacheTelemetryHandle};
pub type Bytes = Vec<u8>;
pub type FastHashMap<K, V> = HashMap<K, V, xxhash_rust::xxh3::Xxh3DefaultBuilder>;
pub type FastHashSet<T> = HashSet<T, xxhash_rust::xxh3::Xxh3DefaultBuilder>;
#[derive(Debug, Clone, Default)]
pub struct PackedBatch {
pub buffer: Bytes,
pub offsets: Vec<usize>,
pub lengths: Vec<usize>,
pub hit_count: usize,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PreparedPointKey {
pub(crate) route: EmbeddedKeyRoute,
pub(crate) key_len: usize,
pub(crate) key_tag: u64,
pub(crate) key: Bytes,
}
impl PreparedPointKey {
#[inline(always)]
pub fn route(&self) -> EmbeddedKeyRoute {
self.route
}
#[inline(always)]
pub fn key_len(&self) -> usize {
self.key_len
}
#[inline(always)]
pub fn key_tag(&self) -> u64 {
self.key_tag
}
#[inline(always)]
pub fn key(&self) -> &[u8] {
&self.key
}
}
impl PackedBatch {
#[inline(always)]
pub fn total_bytes(&self) -> usize {
self.buffer.len()
}
#[inline(always)]
pub fn item_count(&self) -> usize {
self.offsets.len()
}
#[inline(always)]
pub fn all_hit(&self) -> bool {
self.hit_count == self.item_count()
}
}
#[inline(always)]
pub fn hash_key(key: &[u8]) -> u64 {
xxhash_rust::xxh3::xxh3_64(key)
}
#[inline(always)]
pub fn hash_key_tag_from_hash(hash: u64) -> u64 {
hash >> 56
}
#[inline(always)]
pub fn hash_key_tag(key: &[u8]) -> u64 {
hash_key_tag_from_hash(hash_key(key))
}
pub fn now_millis() -> u64 {
exact_now_millis()
}
#[inline(always)]
pub(crate) fn ttl_now_millis() -> u64 {
TTL_CLOCK_START.call_once(start_ttl_clock);
if TTL_CLOCK_RUNNING.load(Ordering::Relaxed) {
TTL_CLOCK_MS.load(Ordering::Relaxed)
} else {
exact_now_millis()
}
}
static TTL_CLOCK_START: Once = Once::new();
static TTL_CLOCK_RUNNING: AtomicBool = AtomicBool::new(false);
static TTL_CLOCK_MS: AtomicU64 = AtomicU64::new(0);
fn start_ttl_clock() {
TTL_CLOCK_MS.store(exact_now_millis(), Ordering::Relaxed);
match thread::Builder::new()
.name("fast-cache-ttl-clock".to_string())
.spawn(|| {
loop {
TTL_CLOCK_MS.store(exact_now_millis(), Ordering::Relaxed);
thread::sleep(Duration::from_millis(1));
}
}) {
Ok(_handle) => TTL_CLOCK_RUNNING.store(true, Ordering::Relaxed),
Err(_error) => {}
}
}
#[inline(always)]
fn exact_now_millis() -> u64 {
let duration = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default();
duration.as_millis() as u64
}