Skip to main content

fast_cache/
storage.rs

//! Embedded storage APIs, routing helpers, batch payloads, and runtime stats.
//!
//! Most applications start with [`EmbeddedStore`], which is a shared, sharded
//! key-value database handle. Applications that need cheap clones of one
//! cross-worker handle can use [`SharedEmbeddedStore`]. Applications that pin
//! work to owner-local worker threads can use [`LocalEmbeddedStore`] for
//! exclusive `&mut` access to the shards assigned to that worker.
//!
//! Keys and values are byte vectors (`Vec<u8>`). TTL arguments accepted by
//! write methods are milliseconds relative to the current time; expiration
//! methods that accept `expire_at_ms` use an absolute Unix timestamp in
//! milliseconds.
13
14mod command;
15mod embedded_store;
16#[cfg(feature = "sharded")]
17mod embedded_store_sharded;
18mod embedded_store_shared;
19mod engine;
20mod flat_map;
21mod records;
22mod redis_objects;
23mod stats;
24#[cfg(feature = "telemetry")]
25mod telemetry;
26
27pub use command::{BorrowedCommand, Command};
28#[cfg(feature = "sharded")]
29pub use embedded_store::OwnedEmbeddedSessionPackedView as LocalEmbeddedSessionPackedView;
30pub use embedded_store::{
31    EmbeddedBatchReadView, EmbeddedKeyRoute, EmbeddedReadSlice, EmbeddedReadView,
32    EmbeddedRouteMode, EmbeddedSessionBatchView, EmbeddedSessionRoute, EmbeddedShardHandle,
33    EmbeddedStore, OwnedEmbeddedBatchReadView, OwnedEmbeddedReadView,
34    OwnedEmbeddedSessionBatchView, OwnedEmbeddedSessionPackedView, OwnedEmbeddedShard,
35    OwnedEmbeddedWorkerReadSession, OwnedEmbeddedWorkerShards, PackedSessionWrite, shift_for,
36    stripe_index,
37};
38#[cfg(feature = "sharded")]
39pub use embedded_store_sharded::{
40    LocalRouteError, LocalStoreAccessError, LocalStoreInstallError,
41    WorkerLocalBatchReadView as LocalEmbeddedBatchReadView,
42    WorkerLocalEmbeddedStore as LocalEmbeddedStore,
43    WorkerLocalEmbeddedStoreBootstrap as LocalEmbeddedStoreBootstrap,
44    WorkerLocalReadSlice as LocalEmbeddedReadSlice, WorkerLocalReadView as LocalEmbeddedReadView,
45    WorkerLocalSessionBatchView as LocalEmbeddedSessionBatchView,
46};
47pub use embedded_store_shared::{
48    Entry as SharedEmbeddedEntry, Ref as SharedEmbeddedRef, RefMut as SharedEmbeddedRefMut,
49    SharedEmbeddedConfig, SharedEmbeddedLockPolicy, SharedEmbeddedStore,
50    VacantEntry as SharedEmbeddedVacantEntry,
51};
52
/// Runs `f` with `&mut` access to a [`LocalEmbeddedStore`].
///
/// This is a thin public wrapper: the closure is forwarded unchanged to
/// `embedded_store_sharded::with_thread_local_embedded_store`, and that
/// helper's result is returned as-is.
///
/// # Errors
///
/// Returns whatever [`LocalStoreAccessError`] the underlying helper reports.
/// NOTE(review): presumably this covers "no store installed on the calling
/// thread" — confirm against `embedded_store_sharded`.
#[cfg(feature = "sharded")]
pub fn with_local_embedded_store<R>(
    f: impl FnOnce(&mut LocalEmbeddedStore) -> R,
) -> Result<R, LocalStoreAccessError> {
    embedded_store_sharded::with_thread_local_embedded_store(f)
}
59
/// Takes ownership of a [`LocalEmbeddedStore`], if one is available.
///
/// Delegates directly to
/// `embedded_store_sharded::take_thread_local_embedded_store` and returns its
/// `Option` unchanged.
/// NOTE(review): presumably `None` means no store is currently installed on
/// the calling thread — confirm against `embedded_store_sharded`.
#[cfg(feature = "sharded")]
pub fn take_local_embedded_store() -> Option<LocalEmbeddedStore> {
    embedded_store_sharded::take_thread_local_embedded_store()
}
64pub use engine::EngineHandle;
65pub(crate) use engine::{
66    EngineCommandContext, EngineFastFuture, EngineFrameFuture, EngineRespSpanFuture,
67    ExpirationChange, RESP_SPANNED_VALUE_MIN, ShardKey, ShardOperation, ShardReply, ShardValue,
68};
69pub use flat_map::FlatMap;
70pub use records::{MutationBytes, MutationOp, MutationRecord, StoredEntry};
71pub(crate) use redis_objects::{
72    RedisObjectBucket, RedisObjectReadOutcome, RedisObjectStore, RedisObjectValue,
73    RedisObjectWriteAttempt, WRONGTYPE_MESSAGE,
74};
75pub use redis_objects::{RedisObjectError, RedisObjectResult, RedisStringLookup};
76pub use stats::{GlobalStatsSnapshot, ShardStatsSnapshot, TierStatsSnapshot, WalStatsSnapshot};
77use std::collections::{HashMap, HashSet};
78use std::sync::Once;
79use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
80use std::thread;
81use std::time::{Duration, SystemTime, UNIX_EPOCH};
82#[cfg(feature = "telemetry")]
83pub use telemetry::{CacheMetrics, CacheMetricsSnapshot, CacheTelemetry, CacheTelemetryHandle};
84
/// Owned byte buffer used for cache keys and values.
pub type Bytes = Vec<u8>;
87/// Hash map with the crate's default XXH3 hasher.
88pub type FastHashMap<K, V> = HashMap<K, V, xxhash_rust::xxh3::Xxh3DefaultBuilder>;
89/// Hash set with the crate's default XXH3 hasher.
90pub type FastHashSet<T> = HashSet<T, xxhash_rust::xxh3::Xxh3DefaultBuilder>;
91
92/// A packed copy-out batch result.
93///
94/// The payload bytes are concatenated into a single contiguous buffer to keep
95/// Python object creation and allocator churn low while we still return owned
96/// data. `offsets[index] == usize::MAX` marks a miss.
97#[derive(Debug, Clone, Default)]
98pub struct PackedBatch {
99    /// Concatenated value bytes for all hits.
100    pub buffer: Bytes,
101    /// Start offset for each requested item, or `usize::MAX` for a miss.
102    pub offsets: Vec<usize>,
103    /// Value length for each requested item.
104    pub lengths: Vec<usize>,
105    /// Number of requested keys that were present.
106    pub hit_count: usize,
107}
108
109/// Precomputed routing and exact-match metadata for repeated point lookups.
110///
111/// This token owns the original key bytes plus precomputed routing and
112/// hash-derived tag state so callers can reuse lookup preparation work while still
113/// preserving exact byte-for-byte key semantics on every hit.
114#[derive(Debug, Clone, PartialEq, Eq)]
115pub struct PreparedPointKey {
116    pub(crate) route: EmbeddedKeyRoute,
117    pub(crate) key_len: usize,
118    pub(crate) key_tag: u64,
119    pub(crate) key: Bytes,
120}
121
122impl PreparedPointKey {
123    /// Returns the precomputed route for this key.
124    #[inline(always)]
125    pub fn route(&self) -> EmbeddedKeyRoute {
126        self.route
127    }
128
129    /// Returns the original key length in bytes.
130    #[inline(always)]
131    pub fn key_len(&self) -> usize {
132        self.key_len
133    }
134
135    /// Returns the precomputed primary-hash-derived key tag.
136    #[inline(always)]
137    pub fn key_tag(&self) -> u64 {
138        self.key_tag
139    }
140
141    /// Returns the original key bytes.
142    #[inline(always)]
143    pub fn key(&self) -> &[u8] {
144        &self.key
145    }
146}
147
148impl PackedBatch {
149    /// Returns the total number of value bytes copied into `buffer`.
150    #[inline(always)]
151    pub fn total_bytes(&self) -> usize {
152        self.buffer.len()
153    }
154
155    /// Returns the number of keys requested in the batch.
156    #[inline(always)]
157    pub fn item_count(&self) -> usize {
158        self.offsets.len()
159    }
160
161    /// Returns true when every requested key was present.
162    #[inline(always)]
163    pub fn all_hit(&self) -> bool {
164        self.hit_count == self.item_count()
165    }
166}
167
168/// Computes the crate's primary XXH3 key hash.
169#[inline(always)]
170pub fn hash_key(key: &[u8]) -> u64 {
171    xxhash_rust::xxh3::xxh3_64(key)
172}
173
/// Computes the key tag associated with an already-computed primary key hash.
///
/// This mirrors hash-table control-byte style filtering without hashing the
/// key bytes a second time. Exact-match paths still compare the key bytes.
///
/// The tag is the most significant byte of `hash`, so the returned value is
/// always in `0..=255`.
#[inline(always)]
pub fn hash_key_tag_from_hash(hash: u64) -> u64 {
    hash >> 56
}
182
183/// Computes the primary-hash-derived key tag used by prepared point lookups.
184#[inline(always)]
185pub fn hash_key_tag(key: &[u8]) -> u64 {
186    hash_key_tag_from_hash(hash_key(key))
187}
188
189/// Returns the current Unix timestamp in milliseconds.
190pub fn now_millis() -> u64 {
191    exact_now_millis()
192}
193
194/// Returns a cached Unix timestamp in milliseconds for TTL hot paths.
195///
196/// Expiration checks only need millisecond granularity, but a full wall-clock
197/// read on every cache hit is expensive at saturation. This starts a small
198/// process-local updater on first TTL use and lets readers load the current
199/// millisecond from an atomic. If the updater cannot start, callers fall back
200/// to the exact clock.
201#[inline(always)]
202pub(crate) fn ttl_now_millis() -> u64 {
203    TTL_CLOCK_START.call_once(start_ttl_clock);
204    if TTL_CLOCK_RUNNING.load(Ordering::Relaxed) {
205        TTL_CLOCK_MS.load(Ordering::Relaxed)
206    } else {
207        exact_now_millis()
208    }
209}
210
/// Guards one-time startup of the TTL clock updater.
static TTL_CLOCK_START: Once = Once::new();
/// Set to `true` only after the updater thread has been spawned successfully.
static TTL_CLOCK_RUNNING: AtomicBool = AtomicBool::new(false);
/// Most recently published Unix timestamp in milliseconds (`0` until started).
static TTL_CLOCK_MS: AtomicU64 = AtomicU64::new(0);
214
215fn start_ttl_clock() {
216    TTL_CLOCK_MS.store(exact_now_millis(), Ordering::Relaxed);
217    match thread::Builder::new()
218        .name("fast-cache-ttl-clock".to_string())
219        .spawn(|| {
220            loop {
221                TTL_CLOCK_MS.store(exact_now_millis(), Ordering::Relaxed);
222                thread::sleep(Duration::from_millis(1));
223            }
224        }) {
225        Ok(_handle) => TTL_CLOCK_RUNNING.store(true, Ordering::Relaxed),
226        Err(_error) => {}
227    }
228}
229
/// Reads the system clock and returns the Unix timestamp in milliseconds.
///
/// A system clock set before the Unix epoch yields `0` rather than an error.
/// The conversion to `u64` saturates at `u64::MAX` instead of silently
/// truncating.
#[inline(always)]
fn exact_now_millis() -> u64 {
    // `duration_since` fails only when the clock is earlier than the epoch;
    // the default (zero) duration maps that case to timestamp 0.
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // Whole seconds plus the sub-second millisecond part equals `as_millis()`
    // for every representable value, without a narrowing `as` cast.
    elapsed
        .as_secs()
        .saturating_mul(1_000)
        .saturating_add(u64::from(elapsed.subsec_millis()))
}
236}