1mod command;
15mod embedded_store;
16#[cfg(feature = "sharded")]
17mod embedded_store_sharded;
18mod embedded_store_shared;
19mod engine;
20mod flat_map;
21mod records;
22mod redis_objects;
23mod stats;
24#[cfg(feature = "telemetry")]
25mod telemetry;
26
27pub use command::{BorrowedCommand, Command};
28#[cfg(feature = "sharded")]
29pub use embedded_store::OwnedEmbeddedSessionPackedView as LocalEmbeddedSessionPackedView;
30pub use embedded_store::{
31 EmbeddedBatchReadView, EmbeddedKeyRoute, EmbeddedReadSlice, EmbeddedReadView,
32 EmbeddedRouteMode, EmbeddedSessionBatchView, EmbeddedSessionRoute, EmbeddedShardHandle,
33 EmbeddedStore, OwnedEmbeddedBatchReadView, OwnedEmbeddedReadView,
34 OwnedEmbeddedSessionBatchView, OwnedEmbeddedSessionPackedView, OwnedEmbeddedShard,
35 OwnedEmbeddedWorkerReadSession, OwnedEmbeddedWorkerShards, PackedSessionWrite, shift_for,
36 stripe_index,
37};
38#[cfg(feature = "sharded")]
39pub use embedded_store_sharded::{
40 LocalRouteError, LocalStoreAccessError, LocalStoreInstallError,
41 WorkerLocalBatchReadView as LocalEmbeddedBatchReadView,
42 WorkerLocalEmbeddedStore as LocalEmbeddedStore,
43 WorkerLocalEmbeddedStoreBootstrap as LocalEmbeddedStoreBootstrap,
44 WorkerLocalReadSlice as LocalEmbeddedReadSlice, WorkerLocalReadView as LocalEmbeddedReadView,
45 WorkerLocalSessionBatchView as LocalEmbeddedSessionBatchView,
46};
47pub use embedded_store_shared::{
48 Entry as SharedEmbeddedEntry, Ref as SharedEmbeddedRef, RefMut as SharedEmbeddedRefMut,
49 SharedEmbeddedConfig, SharedEmbeddedLockPolicy, SharedEmbeddedStore,
50 VacantEntry as SharedEmbeddedVacantEntry,
51};
52
53#[cfg(feature = "sharded")]
54pub fn with_local_embedded_store<R>(
55 f: impl FnOnce(&mut LocalEmbeddedStore) -> R,
56) -> Result<R, LocalStoreAccessError> {
57 embedded_store_sharded::with_thread_local_embedded_store(f)
58}
59
60#[cfg(feature = "sharded")]
61pub fn take_local_embedded_store() -> Option<LocalEmbeddedStore> {
62 embedded_store_sharded::take_thread_local_embedded_store()
63}
64pub use engine::EngineHandle;
65pub(crate) use engine::{
66 EngineCommandContext, EngineFastFuture, EngineFrameFuture, EngineRespSpanFuture,
67 ExpirationChange, RESP_SPANNED_VALUE_MIN, ShardKey, ShardOperation, ShardReply, ShardValue,
68};
69pub use flat_map::FlatMap;
70pub use records::{MutationBytes, MutationOp, MutationRecord, StoredEntry};
71pub(crate) use redis_objects::{
72 RedisObjectBucket, RedisObjectReadOutcome, RedisObjectStore, RedisObjectValue,
73 RedisObjectWriteAttempt, WRONGTYPE_MESSAGE,
74};
75pub use redis_objects::{RedisObjectError, RedisObjectResult, RedisStringLookup};
76pub use stats::{GlobalStatsSnapshot, ShardStatsSnapshot, TierStatsSnapshot, WalStatsSnapshot};
77use std::collections::{HashMap, HashSet};
78use std::sync::Once;
79use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
80use std::thread;
81use std::time::{Duration, SystemTime, UNIX_EPOCH};
82#[cfg(feature = "telemetry")]
83pub use telemetry::{CacheMetrics, CacheMetricsSnapshot, CacheTelemetry, CacheTelemetryHandle};
84
85pub type Bytes = Vec<u8>;
87pub type FastHashMap<K, V> = HashMap<K, V, xxhash_rust::xxh3::Xxh3DefaultBuilder>;
89pub type FastHashSet<T> = HashSet<T, xxhash_rust::xxh3::Xxh3DefaultBuilder>;
91
92#[derive(Debug, Clone, Default)]
98pub struct PackedBatch {
99 pub buffer: Bytes,
101 pub offsets: Vec<usize>,
103 pub lengths: Vec<usize>,
105 pub hit_count: usize,
107}
108
109#[derive(Debug, Clone, PartialEq, Eq)]
115pub struct PreparedPointKey {
116 pub(crate) route: EmbeddedKeyRoute,
117 pub(crate) key_len: usize,
118 pub(crate) key_tag: u64,
119 pub(crate) key: Bytes,
120}
121
122impl PreparedPointKey {
123 #[inline(always)]
125 pub fn route(&self) -> EmbeddedKeyRoute {
126 self.route
127 }
128
129 #[inline(always)]
131 pub fn key_len(&self) -> usize {
132 self.key_len
133 }
134
135 #[inline(always)]
137 pub fn key_tag(&self) -> u64 {
138 self.key_tag
139 }
140
141 #[inline(always)]
143 pub fn key(&self) -> &[u8] {
144 &self.key
145 }
146}
147
148impl PackedBatch {
149 #[inline(always)]
151 pub fn total_bytes(&self) -> usize {
152 self.buffer.len()
153 }
154
155 #[inline(always)]
157 pub fn item_count(&self) -> usize {
158 self.offsets.len()
159 }
160
161 #[inline(always)]
163 pub fn all_hit(&self) -> bool {
164 self.hit_count == self.item_count()
165 }
166}
167
168#[inline(always)]
170pub fn hash_key(key: &[u8]) -> u64 {
171 xxhash_rust::xxh3::xxh3_64(key)
172}
173
174#[inline(always)]
179pub fn hash_key_tag_from_hash(hash: u64) -> u64 {
180 hash >> 56
181}
182
183#[inline(always)]
185pub fn hash_key_tag(key: &[u8]) -> u64 {
186 hash_key_tag_from_hash(hash_key(key))
187}
188
189pub fn now_millis() -> u64 {
191 exact_now_millis()
192}
193
194#[inline(always)]
202pub(crate) fn ttl_now_millis() -> u64 {
203 TTL_CLOCK_START.call_once(start_ttl_clock);
204 if TTL_CLOCK_RUNNING.load(Ordering::Relaxed) {
205 TTL_CLOCK_MS.load(Ordering::Relaxed)
206 } else {
207 exact_now_millis()
208 }
209}
210
211static TTL_CLOCK_START: Once = Once::new();
212static TTL_CLOCK_RUNNING: AtomicBool = AtomicBool::new(false);
213static TTL_CLOCK_MS: AtomicU64 = AtomicU64::new(0);
214
215fn start_ttl_clock() {
216 TTL_CLOCK_MS.store(exact_now_millis(), Ordering::Relaxed);
217 match thread::Builder::new()
218 .name("fast-cache-ttl-clock".to_string())
219 .spawn(|| {
220 loop {
221 TTL_CLOCK_MS.store(exact_now_millis(), Ordering::Relaxed);
222 thread::sleep(Duration::from_millis(1));
223 }
224 }) {
225 Ok(_handle) => TTL_CLOCK_RUNNING.store(true, Ordering::Relaxed),
226 Err(_error) => {}
227 }
228}
229
230#[inline(always)]
231fn exact_now_millis() -> u64 {
232 let duration = SystemTime::now()
233 .duration_since(UNIX_EPOCH)
234 .unwrap_or_default();
235 duration.as_millis() as u64
236}