Skip to main content

fast_cache/storage/embedded_store/
core.rs

1use super::*;
2
3impl EmbeddedStore {
4    /// Creates an embedded store with full-key routing.
5    pub fn new(shard_count: usize) -> Self {
6        Self::with_route_mode(shard_count, EmbeddedRouteMode::FullKey)
7    }
8
9    /// Creates an embedded store with an explicit routing mode.
10    pub fn with_route_mode(shard_count: usize, route_mode: EmbeddedRouteMode) -> Self {
11        #[cfg(feature = "telemetry")]
12        {
13            Self::with_route_mode_and_metrics(shard_count, route_mode, None)
14        }
15        #[cfg(not(feature = "telemetry"))]
16        {
17            assert_valid_shard_count(shard_count);
18            let shift = shift_for(shard_count);
19            let shards = (0..shard_count)
20                .map(|shard_id| {
21                    CachePadded::new(RwLock::new(EmbeddedShard::with_limits(
22                        shard_id,
23                        None,
24                        EvictionPolicy::None,
25                        None,
26                    )))
27                })
28                .collect::<Vec<_>>()
29                .into_boxed_slice();
30            Self {
31                shards,
32                shift,
33                objects: RedisObjectStore::new(shard_count),
34                route_mode,
35            }
36        }
37    }
38
39    #[cfg(feature = "telemetry")]
40    pub fn with_route_mode_and_metrics(
41        shard_count: usize,
42        route_mode: EmbeddedRouteMode,
43        metrics: Option<Arc<CacheTelemetry>>,
44    ) -> Self {
45        Self::with_route_mode_and_metrics_shard_offset(shard_count, route_mode, metrics, 0)
46    }
47
48    #[cfg(feature = "telemetry")]
49    pub fn with_route_mode_and_metrics_shard_offset(
50        shard_count: usize,
51        route_mode: EmbeddedRouteMode,
52        metrics: Option<Arc<CacheTelemetry>>,
53        shard_id_base: usize,
54    ) -> Self {
55        assert_valid_shard_count(shard_count);
56        let shift = shift_for(shard_count);
57        let shards = (0..shard_count)
58            .map(|shard_id| {
59                let mut shard =
60                    EmbeddedShard::with_limits(shard_id, None, EvictionPolicy::None, None);
61                if let Some(metrics) = &metrics {
62                    shard.map.attach_metrics(
63                        CacheTelemetryHandle::from_arc(metrics),
64                        shard_id_base + shard_id,
65                    );
66                }
67                CachePadded::new(RwLock::new(shard))
68            })
69            .collect::<Vec<_>>()
70            .into_boxed_slice();
71        Self {
72            shards,
73            shift,
74            objects: RedisObjectStore::new(shard_count),
75            route_mode,
76            metrics,
77        }
78    }
79
80    /// Returns the number of storage shards.
81    #[inline(always)]
82    pub fn shard_count(&self) -> usize {
83        self.shards.len()
84    }
85
86    /// Returns the number of currently live keys and session entries.
87    pub fn len(&self) -> usize {
88        self.shards
89            .iter()
90            .map(|shard| {
91                let shard = shard.read();
92                shard.map.len().saturating_add(shard.session_slots.len())
93            })
94            .sum::<usize>()
95            + self.objects.object_count()
96    }
97
98    /// Returns a sorted snapshot of currently live keys.
99    pub fn key_snapshot(&self) -> Vec<Bytes> {
100        let now_ms = now_millis();
101        let mut keys = Vec::with_capacity(self.len());
102        for shard in &self.shards {
103            let shard = shard.read();
104            keys.extend(
105                shard
106                    .map
107                    .snapshot_entries(now_ms)
108                    .into_iter()
109                    .map(|entry| entry.key),
110            );
111        }
112        keys.extend(
113            self.objects
114                .keys_with_type(now_ms)
115                .into_iter()
116                .map(|(key, _)| key),
117        );
118        keys.sort();
119        keys
120    }
121
122    /// Returns currently live string entries for persistence or replication.
123    ///
124    /// Redis object values are intentionally excluded; the native replication
125    /// stream v1 covers byte-string cache mutations.
126    pub fn entry_snapshot(&self) -> Vec<StoredEntry> {
127        let now_ms = now_millis();
128        let mut entries = Vec::new();
129        for shard in &self.shards {
130            let shard = shard.read();
131            entries.extend(shard.map.snapshot_entries(now_ms));
132        }
133        entries.sort_by_key(|entry| hash_key(entry.key.as_ref()));
134        entries
135    }
136
137    /// Returns the approximate number of bytes stored in string values.
138    pub fn stored_bytes(&self) -> usize {
139        self.shards
140            .iter()
141            .map(|shard| shard.read().stored_bytes())
142            .sum()
143    }
144
145    /// Applies a per-shard memory budget and eviction policy.
146    pub fn configure_memory_policy(
147        &self,
148        per_shard_memory_limit_bytes: Option<usize>,
149        eviction_policy: EvictionPolicy,
150    ) {
151        let now_ms = now_millis();
152        for shard in &self.shards {
153            shard.write().configure_memory_policy(
154                per_shard_memory_limit_bytes,
155                eviction_policy,
156                now_ms,
157            );
158        }
159    }
160
161    /// Returns true when the store has no live keys.
162    #[inline(always)]
163    pub fn is_empty(&self) -> bool {
164        self.len() == 0
165    }
166
167    /// Returns the configured route mode.
168    #[inline(always)]
169    pub fn route_mode(&self) -> EmbeddedRouteMode {
170        self.route_mode
171    }
172
173    /// Computes the route for a session prefix.
174    #[inline(always)]
175    pub fn route_session(&self, session_prefix: &[u8]) -> EmbeddedSessionRoute {
176        EmbeddedSessionRoute {
177            shard_id: compute_session_shard(self.shift, session_prefix),
178        }
179    }
180
181    #[inline(always)]
182    pub(crate) fn route_key_prehashed(&self, key_hash: u64, key: &[u8]) -> EmbeddedKeyRoute {
183        if can_route_with_key_hash(self.route_mode, self.shard_count(), key) {
184            EmbeddedKeyRoute {
185                shard_id: self.route_hash(key_hash),
186                key_hash,
187            }
188        } else {
189            self.route_key(key)
190        }
191    }
192
193    #[cfg(feature = "telemetry")]
194    #[inline(always)]
195    pub fn metrics(&self) -> Option<Arc<CacheTelemetry>> {
196        self.metrics.clone()
197    }
198
199    #[cfg(feature = "telemetry")]
200    pub fn export_metrics_prometheus(&self) -> Option<String> {
201        self.metrics
202            .as_ref()
203            .map(|metrics| metrics.export_prometheus())
204    }
205
206    #[cfg(feature = "telemetry")]
207    pub fn metrics_snapshot(&self) -> Option<crate::storage::CacheMetricsSnapshot> {
208        self.metrics.as_ref().map(|metrics| metrics.snapshot())
209    }
210
211    /// Computes the route for a key.
212    #[inline(always)]
213    pub fn route_key(&self, key: &[u8]) -> EmbeddedKeyRoute {
214        compute_key_route(self.route_mode, self.shift, key)
215    }
216
217    /// Returns the session-routing prefix for a key.
218    ///
219    /// For `s:<session>:c:<chunk>` keys this is `s:<session>`. For non-session
220    /// keys this returns the whole key, matching `SessionPrefix` route mode.
221    #[inline(always)]
222    pub(crate) fn session_route_prefix_for_key<'a>(&self, key: &'a [u8]) -> &'a [u8] {
223        session_route_prefix(key)
224    }
225
226    /// Precomputes route and fingerprint metadata for repeated point lookups.
227    #[inline(always)]
228    pub fn prepare_point_key(&self, key: &[u8]) -> PreparedPointKey {
229        let route = self.route_key(key);
230        PreparedPointKey {
231            route,
232            key_len: key.len(),
233            key_tag: hash_key_tag_from_hash(route.key_hash),
234            key: key.to_vec(),
235        }
236    }
237
238    #[inline(always)]
239    pub(super) fn route_hash(&self, hash: u64) -> usize {
240        stripe_index(hash, self.shift)
241    }
242
243    #[inline(always)]
244    pub(super) fn hashes_for_key(&self, key: &[u8]) -> (u64, u64) {
245        let key_hash = hash_key(key);
246        let route_hash = match self.route_mode {
247            EmbeddedRouteMode::FullKey => key_hash,
248            EmbeddedRouteMode::SessionPrefix => hash_key(session_route_prefix(key)),
249        };
250        (route_hash, key_hash)
251    }
252
253    pub(super) fn single_shard_batch_route(&self, keys: &[Bytes]) -> Option<usize> {
254        if self.route_mode != EmbeddedRouteMode::SessionPrefix || keys.is_empty() {
255            return None;
256        }
257
258        // Session-routed batches only get the fast path when every key shares the
259        // same `s:<session>` prefix. Otherwise we fall back to generic grouping.
260        let first_prefix = session_route_prefix(&keys[0]);
261        let first_shard = self.route_hash(hash_key(first_prefix));
262        if keys[1..]
263            .iter()
264            .all(|key| session_route_prefix(key.as_slice()) == first_prefix)
265        {
266            Some(first_shard)
267        } else {
268            None
269        }
270    }
271
272    #[cfg(test)]
273    #[inline(always)]
274    pub(super) fn shard_for_key(&self, key: &[u8]) -> usize {
275        let (route_hash, _) = self.hashes_for_key(key);
276        self.route_hash(route_hash)
277    }
278
279    #[cfg(feature = "telemetry")]
280    #[inline(always)]
281    pub(super) fn record_batch_metrics(&self, start: Option<Instant>, touched_shards: &[usize]) {
282        if let (Some(metrics), Some(start)) = (&self.metrics, start) {
283            metrics.record_batch_get(start.elapsed().as_nanos() as u64);
284            for &shard_id in touched_shards {
285                metrics.record_batch_get_shard(shard_id);
286            }
287        }
288    }
289}