Skip to main content

fast_cache/storage/
embedded_store.rs

1use crossbeam_utils::CachePadded;
2#[cfg(not(feature = "embedded-read-biased-lock"))]
3use parking_lot::RwLock;
4#[cfg(feature = "embedded-read-biased-lock")]
5use rblock::RwLock;
6#[cfg(feature = "telemetry")]
7use std::sync::Arc;
8#[cfg(feature = "telemetry")]
9use std::time::Instant;
10
11use crate::config::EvictionPolicy;
12use crate::storage::{
13    Bytes, PackedBatch, PreparedPointKey, RedisObjectBucket, RedisObjectError,
14    RedisObjectReadOutcome, RedisObjectResult, RedisObjectStore, RedisObjectValue,
15    RedisObjectWriteAttempt, RedisStringLookup, StoredEntry, hash_key, hash_key_tag_from_hash,
16    now_millis,
17};
18#[cfg(feature = "telemetry")]
19use crate::storage::{CacheTelemetry, CacheTelemetryHandle};
20#[cfg(feature = "embedded")]
21use crate::storage::{ShardStatsSnapshot, TierStatsSnapshot};
22
23mod batch;
24mod batch_results;
25mod core;
26mod lifecycle;
27mod objects;
28mod owned;
29mod point;
30mod routing;
31mod session_slots;
32mod shard;
33mod views;
34mod write;
35
36pub use owned::{
37    EmbeddedShardHandle, OwnedEmbeddedShard, OwnedEmbeddedWorkerReadSession,
38    OwnedEmbeddedWorkerShards,
39};
40#[cfg(feature = "unsafe")]
41use routing::can_skip_session_lookup;
42pub use routing::{
43    EmbeddedKeyRoute, EmbeddedRouteMode, EmbeddedSessionRoute, shift_for, stripe_index,
44};
45pub(crate) use routing::{assert_valid_shard_count, compute_key_route, compute_session_shard};
46use routing::{
47    batch_derived_session_storage_prefix, can_route_with_key_hash, can_use_route_hash_as_key_hash,
48    derived_session_storage_prefix, point_write_session_storage_prefix, session_route_prefix,
49    uses_flat_key_storage,
50};
51pub use session_slots::PackedSessionWrite;
52pub(crate) use session_slots::SessionSlotMap;
53pub(crate) use shard::EmbeddedShard;
54pub use views::{
55    EmbeddedBatchReadView, EmbeddedReadSlice, EmbeddedReadView, EmbeddedSessionBatchView,
56    OwnedEmbeddedBatchReadView, OwnedEmbeddedReadView, OwnedEmbeddedSessionBatchView,
57    OwnedEmbeddedSessionPackedView,
58};
59
/// Shared embedded in-memory database.
///
/// `EmbeddedStore` is internally sharded and can be shared across threads. It
/// offers byte-string key/value methods, Redis object helpers, TTL management,
/// batch reads and writes, and session-oriented packed transfer APIs.
///
/// Every shard sits behind its own `RwLock`. The lock implementation is chosen
/// at compile time: `rblock::RwLock` when the `embedded-read-biased-lock`
/// feature is enabled, `parking_lot::RwLock` otherwise.
#[derive(Debug)]
pub struct EmbeddedStore {
    /// Fixed-size set of shards, each individually locked and wrapped in
    /// `CachePadded` (crossbeam's cache-line padding wrapper).
    shards: Box<[CachePadded<RwLock<EmbeddedShard>>]>,
    /// NOTE(review): presumably the bit shift used to turn a key hash into a
    /// shard index (cf. `shift_for` / `stripe_index` in `routing`) — confirm.
    shift: u32,
    /// Storage for Redis object values (`RedisObjectStore`).
    objects: RedisObjectStore,
    /// Routing mode for this store; the variants are defined in `routing`.
    route_mode: EmbeddedRouteMode,
    /// Optional shared telemetry state; the field only exists when the crate
    /// is built with the `telemetry` feature.
    #[cfg(feature = "telemetry")]
    metrics: Option<Arc<CacheTelemetry>>,
}
74
/// Pre-sizes a still-unallocated batch buffer for the whole response payload.
///
/// When `buffer` has no allocation yet and the first item is non-empty, this
/// requests room for `next_len * item_count` bytes (saturating) so packing a
/// large chunk read does not reallocate repeatedly. Buffers that already own
/// an allocation, and empty first items, are left untouched.
///
/// The request is only a hint: the estimate assumes every item is as large as
/// the first one, so it can overshoot badly. If the allocation cannot be
/// satisfied (capacity overflow or allocator refusal) it is skipped and the
/// buffer simply grows on demand as items are appended.
#[inline(always)]
fn reserve_batch_capacity(buffer: &mut Vec<u8>, next_len: usize, item_count: usize) {
    if buffer.capacity() != 0 || next_len == 0 {
        return;
    }

    let estimate = next_len.saturating_mul(item_count);
    // `reserve` would panic or abort on an oversized estimate; a failed hint
    // must never take the process down, so the error is deliberately ignored.
    let _ = buffer.try_reserve(estimate);
}
83
/// Folds one tier snapshot into a running total.
///
/// Each counter of `snapshot` is added to the matching counter of `target`
/// with saturating arithmetic, so an aggregate pins at the maximum value
/// instead of wrapping around.
#[inline(always)]
#[cfg(feature = "embedded")]
fn accumulate_tier_stats(target: &mut TierStatsSnapshot, snapshot: &TierStatsSnapshot) {
    // One saturating add per listed counter field.
    macro_rules! fold_counters {
        ($($field:ident),+ $(,)?) => {
            $( target.$field = target.$field.saturating_add(snapshot.$field); )+
        };
    }

    fold_counters!(
        len,
        capacity,
        hits,
        misses,
        promotions,
        demotions,
        evictions,
        expirations,
    );
}
96
97#[inline(always)]
98fn write_now_ms(ttl_ms: Option<u64>, memory_limit_bytes: Option<usize>) -> u64 {
99    if ttl_ms.is_some() || memory_limit_bytes.is_some() {
100        now_millis()
101    } else {
102        0
103    }
104}
105
106#[inline(always)]
107fn write_resp_blob_string_into(out: &mut bytes::BytesMut, value: &[u8]) {
108    #[cfg(not(feature = "unsafe"))]
109    {
110        let mut buf = itoa::Buffer::new();
111        let len_str = buf.format(value.len()).as_bytes();
112        out.extend_from_slice(b"$");
113        out.extend_from_slice(len_str);
114        out.extend_from_slice(b"\r\n");
115        out.extend_from_slice(value);
116        out.extend_from_slice(b"\r\n");
117    }
118    #[cfg(feature = "unsafe")]
119    {
120        if value.len() == 64 {
121            const HEADER: &[u8] = b"$64\r\n";
122            let total = HEADER.len() + 64 + 2;
123            out.reserve(total);
124            // SAFETY: reserve(total) ensures `total` bytes of spare capacity.
125            unsafe {
126                let start = out.len();
127                let dst = out.as_mut_ptr().add(start);
128                std::ptr::copy_nonoverlapping(HEADER.as_ptr(), dst, HEADER.len());
129                std::ptr::copy_nonoverlapping(value.as_ptr(), dst.add(HEADER.len()), 64);
130                *dst.add(HEADER.len() + 64) = b'\r';
131                *dst.add(HEADER.len() + 65) = b'\n';
132                out.set_len(start + total);
133            }
134            return;
135        }
136
137        let mut buf = itoa::Buffer::new();
138        let len_str = buf.format(value.len()).as_bytes();
139        let total = 1 + len_str.len() + 2 + value.len() + 2;
140        out.reserve(total);
141        // SAFETY: reserve(total) ensures `total` bytes of spare capacity.
142        unsafe {
143            let start = out.len();
144            let dst = out.as_mut_ptr().add(start);
145            *dst = b'$';
146            let mut pos = 1usize;
147            std::ptr::copy_nonoverlapping(len_str.as_ptr(), dst.add(pos), len_str.len());
148            pos += len_str.len();
149            *dst.add(pos) = b'\r';
150            *dst.add(pos + 1) = b'\n';
151            pos += 2;
152            std::ptr::copy_nonoverlapping(value.as_ptr(), dst.add(pos), value.len());
153            pos += value.len();
154            *dst.add(pos) = b'\r';
155            *dst.add(pos + 1) = b'\n';
156            out.set_len(start + total);
157        }
158    }
159}
160
161#[cfg(test)]
162mod tests;