fast_cache/storage/embedded_store/
core.rs1use super::*;
2
3impl EmbeddedStore {
4 pub fn new(shard_count: usize) -> Self {
6 Self::with_route_mode(shard_count, EmbeddedRouteMode::FullKey)
7 }
8
9 pub fn with_route_mode(shard_count: usize, route_mode: EmbeddedRouteMode) -> Self {
11 #[cfg(feature = "telemetry")]
12 {
13 Self::with_route_mode_and_metrics(shard_count, route_mode, None)
14 }
15 #[cfg(not(feature = "telemetry"))]
16 {
17 assert_valid_shard_count(shard_count);
18 let shift = shift_for(shard_count);
19 let shards = (0..shard_count)
20 .map(|shard_id| {
21 CachePadded::new(RwLock::new(EmbeddedShard::with_limits(
22 shard_id,
23 None,
24 EvictionPolicy::None,
25 None,
26 )))
27 })
28 .collect::<Vec<_>>()
29 .into_boxed_slice();
30 Self {
31 shards,
32 shift,
33 objects: RedisObjectStore::new(shard_count),
34 route_mode,
35 }
36 }
37 }
38
39 #[cfg(feature = "telemetry")]
40 pub fn with_route_mode_and_metrics(
41 shard_count: usize,
42 route_mode: EmbeddedRouteMode,
43 metrics: Option<Arc<CacheTelemetry>>,
44 ) -> Self {
45 Self::with_route_mode_and_metrics_shard_offset(shard_count, route_mode, metrics, 0)
46 }
47
48 #[cfg(feature = "telemetry")]
49 pub fn with_route_mode_and_metrics_shard_offset(
50 shard_count: usize,
51 route_mode: EmbeddedRouteMode,
52 metrics: Option<Arc<CacheTelemetry>>,
53 shard_id_base: usize,
54 ) -> Self {
55 assert_valid_shard_count(shard_count);
56 let shift = shift_for(shard_count);
57 let shards = (0..shard_count)
58 .map(|shard_id| {
59 let mut shard =
60 EmbeddedShard::with_limits(shard_id, None, EvictionPolicy::None, None);
61 if let Some(metrics) = &metrics {
62 shard.map.attach_metrics(
63 CacheTelemetryHandle::from_arc(metrics),
64 shard_id_base + shard_id,
65 );
66 }
67 CachePadded::new(RwLock::new(shard))
68 })
69 .collect::<Vec<_>>()
70 .into_boxed_slice();
71 Self {
72 shards,
73 shift,
74 objects: RedisObjectStore::new(shard_count),
75 route_mode,
76 metrics,
77 }
78 }
79
80 #[inline(always)]
82 pub fn shard_count(&self) -> usize {
83 self.shards.len()
84 }
85
86 pub fn len(&self) -> usize {
88 self.shards
89 .iter()
90 .map(|shard| {
91 let shard = shard.read();
92 shard.map.len().saturating_add(shard.session_slots.len())
93 })
94 .sum::<usize>()
95 + self.objects.object_count()
96 }
97
98 pub fn key_snapshot(&self) -> Vec<Bytes> {
100 let now_ms = now_millis();
101 let mut keys = Vec::with_capacity(self.len());
102 for shard in &self.shards {
103 let shard = shard.read();
104 keys.extend(
105 shard
106 .map
107 .snapshot_entries(now_ms)
108 .into_iter()
109 .map(|entry| entry.key),
110 );
111 }
112 keys.extend(
113 self.objects
114 .keys_with_type(now_ms)
115 .into_iter()
116 .map(|(key, _)| key),
117 );
118 keys.sort();
119 keys
120 }
121
122 pub fn entry_snapshot(&self) -> Vec<StoredEntry> {
127 let now_ms = now_millis();
128 let mut entries = Vec::new();
129 for shard in &self.shards {
130 let shard = shard.read();
131 entries.extend(shard.map.snapshot_entries(now_ms));
132 }
133 entries.sort_by_key(|entry| hash_key(entry.key.as_ref()));
134 entries
135 }
136
137 pub fn stored_bytes(&self) -> usize {
139 self.shards
140 .iter()
141 .map(|shard| shard.read().stored_bytes())
142 .sum()
143 }
144
145 pub fn configure_memory_policy(
147 &self,
148 per_shard_memory_limit_bytes: Option<usize>,
149 eviction_policy: EvictionPolicy,
150 ) {
151 let now_ms = now_millis();
152 for shard in &self.shards {
153 shard.write().configure_memory_policy(
154 per_shard_memory_limit_bytes,
155 eviction_policy,
156 now_ms,
157 );
158 }
159 }
160
161 #[inline(always)]
163 pub fn is_empty(&self) -> bool {
164 self.len() == 0
165 }
166
167 #[inline(always)]
169 pub fn route_mode(&self) -> EmbeddedRouteMode {
170 self.route_mode
171 }
172
173 #[inline(always)]
175 pub fn route_session(&self, session_prefix: &[u8]) -> EmbeddedSessionRoute {
176 EmbeddedSessionRoute {
177 shard_id: compute_session_shard(self.shift, session_prefix),
178 }
179 }
180
181 #[inline(always)]
182 pub(crate) fn route_key_prehashed(&self, key_hash: u64, key: &[u8]) -> EmbeddedKeyRoute {
183 if can_route_with_key_hash(self.route_mode, self.shard_count(), key) {
184 EmbeddedKeyRoute {
185 shard_id: self.route_hash(key_hash),
186 key_hash,
187 }
188 } else {
189 self.route_key(key)
190 }
191 }
192
193 #[cfg(feature = "telemetry")]
194 #[inline(always)]
195 pub fn metrics(&self) -> Option<Arc<CacheTelemetry>> {
196 self.metrics.clone()
197 }
198
199 #[cfg(feature = "telemetry")]
200 pub fn export_metrics_prometheus(&self) -> Option<String> {
201 self.metrics
202 .as_ref()
203 .map(|metrics| metrics.export_prometheus())
204 }
205
206 #[cfg(feature = "telemetry")]
207 pub fn metrics_snapshot(&self) -> Option<crate::storage::CacheMetricsSnapshot> {
208 self.metrics.as_ref().map(|metrics| metrics.snapshot())
209 }
210
211 #[inline(always)]
213 pub fn route_key(&self, key: &[u8]) -> EmbeddedKeyRoute {
214 compute_key_route(self.route_mode, self.shift, key)
215 }
216
217 #[inline(always)]
222 pub(crate) fn session_route_prefix_for_key<'a>(&self, key: &'a [u8]) -> &'a [u8] {
223 session_route_prefix(key)
224 }
225
226 #[inline(always)]
228 pub fn prepare_point_key(&self, key: &[u8]) -> PreparedPointKey {
229 let route = self.route_key(key);
230 PreparedPointKey {
231 route,
232 key_len: key.len(),
233 key_tag: hash_key_tag_from_hash(route.key_hash),
234 key: key.to_vec(),
235 }
236 }
237
238 #[inline(always)]
239 pub(super) fn route_hash(&self, hash: u64) -> usize {
240 stripe_index(hash, self.shift)
241 }
242
243 #[inline(always)]
244 pub(super) fn hashes_for_key(&self, key: &[u8]) -> (u64, u64) {
245 let key_hash = hash_key(key);
246 let route_hash = match self.route_mode {
247 EmbeddedRouteMode::FullKey => key_hash,
248 EmbeddedRouteMode::SessionPrefix => hash_key(session_route_prefix(key)),
249 };
250 (route_hash, key_hash)
251 }
252
253 pub(super) fn single_shard_batch_route(&self, keys: &[Bytes]) -> Option<usize> {
254 if self.route_mode != EmbeddedRouteMode::SessionPrefix || keys.is_empty() {
255 return None;
256 }
257
258 let first_prefix = session_route_prefix(&keys[0]);
261 let first_shard = self.route_hash(hash_key(first_prefix));
262 if keys[1..]
263 .iter()
264 .all(|key| session_route_prefix(key.as_slice()) == first_prefix)
265 {
266 Some(first_shard)
267 } else {
268 None
269 }
270 }
271
272 #[cfg(test)]
273 #[inline(always)]
274 pub(super) fn shard_for_key(&self, key: &[u8]) -> usize {
275 let (route_hash, _) = self.hashes_for_key(key);
276 self.route_hash(route_hash)
277 }
278
279 #[cfg(feature = "telemetry")]
280 #[inline(always)]
281 pub(super) fn record_batch_metrics(&self, start: Option<Instant>, touched_shards: &[usize]) {
282 if let (Some(metrics), Some(start)) = (&self.metrics, start) {
283 metrics.record_batch_get(start.elapsed().as_nanos() as u64);
284 for &shard_id in touched_shards {
285 metrics.record_batch_get_shard(shard_id);
286 }
287 }
288 }
289}