fast_cache/storage/embedded_store/
batch.rs1use super::batch_results::{
2 BatchReadViewBuilder, OrderedBatchReadViewBuilder, OrderedPackedBatchBuilder,
3 PackedBatchBuilder,
4};
5use super::*;
6
7impl EmbeddedStore {
8 pub fn batch_get(&self, keys: Vec<Bytes>) -> Vec<Option<Bytes>> {
10 let total = keys.len();
11 if total == 0 {
12 return Vec::new();
13 }
14
15 #[cfg(feature = "telemetry")]
16 let start = self.metrics.as_ref().map(|_| Instant::now());
17 let now_ms = now_millis();
18 if let Some(shard_id) = self.single_shard_batch_route(&keys) {
19 let mut shard = self.shards[shard_id].write();
20 let values = keys
21 .into_iter()
22 .map(|key| {
23 let (_, key_hash) = self.hashes_for_key(&key);
24 shard
25 .get_ref_hashed_session_or_flat(key_hash, &key, now_ms)
26 .map(<[u8]>::to_vec)
27 })
28 .collect();
29 #[cfg(feature = "telemetry")]
30 self.record_batch_metrics(start, &[shard_id]);
31 return values;
32 }
33
34 let mut groups = vec![Vec::<(usize, Bytes, u64)>::new(); self.shards.len()];
35 let mut touched = Vec::new();
36
37 for (index, key) in keys.into_iter().enumerate() {
38 let (route_hash, key_hash) = self.hashes_for_key(&key);
39 let shard_id = self.route_hash(route_hash);
40 if groups[shard_id].is_empty() {
41 touched.push(shard_id);
42 }
43 groups[shard_id].push((index, key, key_hash));
44 }
45
46 let mut values = vec![None; total];
47 for (shard_id, batch) in groups.into_iter().enumerate() {
48 if batch.is_empty() {
49 continue;
50 }
51 let mut shard = self.shards[shard_id].write();
52 for (index, key, key_hash) in batch {
53 values[index] = shard
54 .get_ref_hashed_session_or_flat(key_hash, &key, now_ms)
55 .map(<[u8]>::to_vec);
56 }
57 }
58 #[cfg(feature = "telemetry")]
59 self.record_batch_metrics(start, &touched);
60 values
61 }
62
63 pub fn batch_get_view(&self, keys: &[Bytes]) -> EmbeddedBatchReadView {
68 let total = keys.len();
69 if total == 0 {
70 return EmbeddedBatchReadView {
71 items: Vec::new(),
72 hit_count: 0,
73 total_bytes: 0,
74 };
75 }
76
77 #[cfg(feature = "telemetry")]
78 let start = self.metrics.as_ref().map(|_| Instant::now());
79 let now_ms = now_millis();
80
81 if let Some(shard_id) = self.single_shard_batch_route(keys) {
82 let mut shard = self.shards[shard_id].write();
83 let mut view = BatchReadViewBuilder::new(keys.len());
84 for key in keys {
85 let (_, key_hash) = self.hashes_for_key(key);
86 view.push(shard.get_ref_hashed_published_session_or_flat(key_hash, key, now_ms));
87 }
88 drop(shard);
89 #[cfg(feature = "telemetry")]
90 self.record_batch_metrics(start, &[shard_id]);
91 return view.finish();
92 }
93
94 let mut groups = vec![Vec::<(usize, &Bytes, u64)>::new(); self.shards.len()];
95 let mut touched = Vec::new();
96 for (index, key) in keys.iter().enumerate() {
97 let (route_hash, key_hash) = self.hashes_for_key(key);
98 let shard_id = self.route_hash(route_hash);
99 if groups[shard_id].is_empty() {
100 touched.push(shard_id);
101 }
102 groups[shard_id].push((index, key, key_hash));
103 }
104
105 let mut view = OrderedBatchReadViewBuilder::new(total);
106 for (shard_id, batch) in groups.into_iter().enumerate() {
107 if batch.is_empty() {
108 continue;
109 }
110 let mut shard = self.shards[shard_id].write();
111 for (index, key, key_hash) in batch {
112 let value = shard.get_ref_hashed_published_session_or_flat(key_hash, key, now_ms);
113 if let Some(value) = value {
114 view.record_hit(index, value);
115 }
116 }
117 drop(shard);
118 }
119
120 #[cfg(feature = "telemetry")]
121 self.record_batch_metrics(start, &touched);
122 view.finish()
123 }
124
125 pub fn batch_get_session(&self, session_prefix: &[u8], keys: &[Bytes]) -> Vec<Option<Bytes>> {
128 let key_hashes = keys.iter().map(|key| hash_key(key)).collect::<Vec<_>>();
129 self.batch_get_session_prehashed(session_prefix, keys, &key_hashes)
130 }
131
132 pub fn batch_get_session_prehashed(
133 &self,
134 session_prefix: &[u8],
135 keys: &[Bytes],
136 key_hashes: &[u64],
137 ) -> Vec<Option<Bytes>> {
138 assert_eq!(
139 keys.len(),
140 key_hashes.len(),
141 "keys and key_hashes must have matching lengths",
142 );
143 if keys.is_empty() {
144 return Vec::new();
145 }
146
147 #[cfg(feature = "telemetry")]
148 let start = self.metrics.as_ref().map(|_| Instant::now());
149 let route = self.route_session(session_prefix);
150 let now_ms = now_millis();
151 let mut shard = self.shards[route.shard_id].write();
152 let active_session_prefix = shard
153 .session_slots
154 .has_session(session_prefix)
155 .then_some(session_prefix);
156 let values = keys
157 .iter()
158 .zip(key_hashes.iter().copied())
159 .map(|(key, key_hash)| {
160 shard
161 .get_ref_hashed_active_session_or_flat(
162 active_session_prefix,
163 key_hash,
164 key,
165 now_ms,
166 )
167 .map(<[u8]>::to_vec)
168 })
169 .collect();
170 #[cfg(feature = "telemetry")]
171 self.record_batch_metrics(start, &[route.shard_id]);
172 values
173 }
174
175 pub fn batch_get_session_routed(
177 &self,
178 route: EmbeddedSessionRoute,
179 keys: &[Bytes],
180 ) -> Vec<Option<Bytes>> {
181 let total = keys.len();
182 if total == 0 {
183 return Vec::new();
184 }
185
186 #[cfg(feature = "telemetry")]
187 let start = self.metrics.as_ref().map(|_| Instant::now());
188 let now_ms = now_millis();
189 let mut shard = self.shards[route.shard_id].write();
190 let values = keys
191 .iter()
192 .map(|key| {
193 let key_hash = hash_key(key);
194 shard
195 .map
196 .get_ref_hashed(key_hash, key, now_ms)
197 .map(<[u8]>::to_vec)
198 })
199 .collect();
200 #[cfg(feature = "telemetry")]
201 self.record_batch_metrics(start, &[route.shard_id]);
202 values
203 }
204
205 pub fn batch_get_session_view(
210 &self,
211 session_prefix: &[u8],
212 keys: &[Bytes],
213 ) -> EmbeddedSessionBatchView {
214 let key_hashes = keys.iter().map(|key| hash_key(key)).collect::<Vec<_>>();
215 self.batch_get_session_view_prehashed(session_prefix, keys, &key_hashes)
216 }
217
218 pub fn batch_get_session_view_prehashed(
219 &self,
220 session_prefix: &[u8],
221 keys: &[Bytes],
222 key_hashes: &[u64],
223 ) -> EmbeddedSessionBatchView {
224 assert_eq!(
225 keys.len(),
226 key_hashes.len(),
227 "keys and key_hashes must have matching lengths",
228 );
229 if keys.is_empty() {
230 return EmbeddedBatchReadView {
231 items: Vec::new(),
232 hit_count: 0,
233 total_bytes: 0,
234 };
235 }
236
237 #[cfg(feature = "telemetry")]
238 let start = self.metrics.as_ref().map(|_| Instant::now());
239 let route = self.route_session(session_prefix);
240 let now_ms = now_millis();
241 let mut shard = self.shards[route.shard_id].write();
242
243 let active_session_prefix = shard
244 .session_slots
245 .has_session(session_prefix)
246 .then_some(session_prefix);
247 let mut view = BatchReadViewBuilder::new(keys.len());
248 for (key, key_hash) in keys.iter().zip(key_hashes.iter().copied()) {
249 view.push(shard.get_ref_hashed_active_session_or_flat(
250 active_session_prefix,
251 key_hash,
252 key,
253 now_ms,
254 ));
255 }
256 drop(shard);
257
258 #[cfg(feature = "telemetry")]
259 self.record_batch_metrics(start, &[route.shard_id]);
260
261 view.finish()
262 }
263
264 pub fn batch_get_session_view_routed(
267 &self,
268 route: EmbeddedSessionRoute,
269 keys: &[Bytes],
270 ) -> EmbeddedSessionBatchView {
271 let key_hashes = keys.iter().map(|key| hash_key(key)).collect::<Vec<_>>();
272 self.batch_get_session_view_prehashed_routed(route, keys, &key_hashes)
273 }
274
275 pub fn batch_get_session_view_prehashed_routed(
278 &self,
279 route: EmbeddedSessionRoute,
280 keys: &[Bytes],
281 key_hashes: &[u64],
282 ) -> EmbeddedSessionBatchView {
283 assert_eq!(
284 keys.len(),
285 key_hashes.len(),
286 "keys and key_hashes must have matching lengths",
287 );
288 if keys.is_empty() {
289 return EmbeddedBatchReadView {
290 items: Vec::new(),
291 hit_count: 0,
292 total_bytes: 0,
293 };
294 }
295
296 #[cfg(feature = "telemetry")]
297 let start = self.metrics.as_ref().map(|_| Instant::now());
298 let now_ms = now_millis();
299 let mut shard = self.shards[route.shard_id].write();
300 let session_prefix = batch_derived_session_storage_prefix(keys);
301 let active_session_prefix = session_prefix
302 .as_ref()
303 .filter(|prefix| shard.session_slots.has_session(prefix))
304 .map(Vec::as_slice);
305
306 let mut view = BatchReadViewBuilder::new(keys.len());
307 for (key, key_hash) in keys.iter().zip(key_hashes.iter().copied()) {
308 view.push(shard.get_ref_hashed_active_session_or_flat(
309 active_session_prefix,
310 key_hash,
311 key,
312 now_ms,
313 ));
314 }
315 drop(shard);
316
317 #[cfg(feature = "telemetry")]
318 self.record_batch_metrics(start, &[route.shard_id]);
319
320 view.finish()
321 }
322
323 pub fn batch_get_packed(&self, keys: &[Bytes]) -> PackedBatch {
326 let total = keys.len();
327 if total == 0 {
328 return PackedBatch::default();
329 }
330
331 #[cfg(feature = "telemetry")]
332 let start = self.metrics.as_ref().map(|_| Instant::now());
333 let now_ms = now_millis();
334 if let Some(shard_id) = self.single_shard_batch_route(keys) {
335 let mut shard = self.shards[shard_id].write();
336 let mut packed = PackedBatchBuilder::new(total);
337 for key in keys {
338 let (_, key_hash) = self.hashes_for_key(key);
339 packed.push(shard.get_ref_hashed_session_or_flat(key_hash, key, now_ms));
340 }
341 #[cfg(feature = "telemetry")]
342 self.record_batch_metrics(start, &[shard_id]);
343 return packed.finish();
344 }
345
346 let mut groups = vec![Vec::<(usize, &Bytes, u64)>::new(); self.shards.len()];
347 let mut touched = Vec::new();
348 for (index, key) in keys.iter().enumerate() {
349 let (route_hash, key_hash) = self.hashes_for_key(key);
350 let shard_id = self.route_hash(route_hash);
351 if groups[shard_id].is_empty() {
352 touched.push(shard_id);
353 }
354 groups[shard_id].push((index, key, key_hash));
355 }
356
357 let mut packed = OrderedPackedBatchBuilder::new(total);
358
359 for (shard_id, batch) in groups.into_iter().enumerate() {
360 if batch.is_empty() {
361 continue;
362 }
363 let mut shard = self.shards[shard_id].write();
364 for (index, key, key_hash) in batch {
365 if let Some(value) = shard.get_ref_hashed_session_or_flat(key_hash, key, now_ms) {
366 packed.record_hit(index, value);
367 }
368 }
369 }
370
371 let packed = packed.finish();
372 #[cfg(feature = "telemetry")]
373 self.record_batch_metrics(start, &touched);
374 packed
375 }
376
377 pub fn batch_get_session_packed(&self, session_prefix: &[u8], keys: &[Bytes]) -> PackedBatch {
379 if keys.is_empty() {
380 return PackedBatch::default();
381 }
382
383 #[cfg(feature = "telemetry")]
384 let start = self.metrics.as_ref().map(|_| Instant::now());
385 let route = self.route_session(session_prefix);
386 let now_ms = now_millis();
387 let mut shard = self.shards[route.shard_id].write();
388 let active_session_prefix = shard
389 .session_slots
390 .has_session(session_prefix)
391 .then_some(session_prefix);
392 let mut packed = PackedBatchBuilder::new(keys.len());
393 for key in keys {
394 let key_hash = hash_key(key);
395 packed.push(shard.get_ref_hashed_active_session_or_flat(
396 active_session_prefix,
397 key_hash,
398 key,
399 now_ms,
400 ));
401 }
402 #[cfg(feature = "telemetry")]
403 self.record_batch_metrics(start, &[route.shard_id]);
404 packed.finish()
405 }
406
407 pub fn batch_get_session_packed_routed(
409 &self,
410 route: EmbeddedSessionRoute,
411 keys: &[Bytes],
412 ) -> PackedBatch {
413 if keys.is_empty() {
414 return PackedBatch::default();
415 }
416
417 #[cfg(feature = "telemetry")]
418 let start = self.metrics.as_ref().map(|_| Instant::now());
419 let now_ms = now_millis();
420 let mut shard = self.shards[route.shard_id].write();
421 let session_prefix = batch_derived_session_storage_prefix(keys);
422 let active_session_prefix = session_prefix
423 .as_ref()
424 .filter(|prefix| shard.session_slots.has_session(prefix))
425 .map(Vec::as_slice);
426 let mut packed = PackedBatchBuilder::new(keys.len());
427 for key in keys {
428 let key_hash = hash_key(key);
429 packed.push(shard.get_ref_hashed_active_session_or_flat(
430 active_session_prefix,
431 key_hash,
432 key,
433 now_ms,
434 ));
435 }
436 #[cfg(feature = "telemetry")]
437 self.record_batch_metrics(start, &[route.shard_id]);
438 packed.finish()
439 }
440}