Skip to main content

fast_cache/storage/embedded_store/
batch.rs

1use super::batch_results::{
2    BatchReadViewBuilder, OrderedBatchReadViewBuilder, OrderedPackedBatchBuilder,
3    PackedBatchBuilder,
4};
5use super::*;
6
7impl EmbeddedStore {
8    /// Returns owned values for `keys` in request order.
9    pub fn batch_get(&self, keys: Vec<Bytes>) -> Vec<Option<Bytes>> {
10        let total = keys.len();
11        if total == 0 {
12            return Vec::new();
13        }
14
15        #[cfg(feature = "telemetry")]
16        let start = self.metrics.as_ref().map(|_| Instant::now());
17        let now_ms = now_millis();
18        if let Some(shard_id) = self.single_shard_batch_route(&keys) {
19            let mut shard = self.shards[shard_id].write();
20            let values = keys
21                .into_iter()
22                .map(|key| {
23                    let (_, key_hash) = self.hashes_for_key(&key);
24                    shard
25                        .get_ref_hashed_session_or_flat(key_hash, &key, now_ms)
26                        .map(<[u8]>::to_vec)
27                })
28                .collect();
29            #[cfg(feature = "telemetry")]
30            self.record_batch_metrics(start, &[shard_id]);
31            return values;
32        }
33
34        let mut groups = vec![Vec::<(usize, Bytes, u64)>::new(); self.shards.len()];
35        let mut touched = Vec::new();
36
37        for (index, key) in keys.into_iter().enumerate() {
38            let (route_hash, key_hash) = self.hashes_for_key(&key);
39            let shard_id = self.route_hash(route_hash);
40            if groups[shard_id].is_empty() {
41                touched.push(shard_id);
42            }
43            groups[shard_id].push((index, key, key_hash));
44        }
45
46        let mut values = vec![None; total];
47        for (shard_id, batch) in groups.into_iter().enumerate() {
48            if batch.is_empty() {
49                continue;
50            }
51            let mut shard = self.shards[shard_id].write();
52            for (index, key, key_hash) in batch {
53                values[index] = shard
54                    .get_ref_hashed_session_or_flat(key_hash, &key, now_ms)
55                    .map(<[u8]>::to_vec);
56            }
57        }
58        #[cfg(feature = "telemetry")]
59        self.record_batch_metrics(start, &touched);
60        values
61    }
62
63    /// Returns owned views for a generic batch.
64    ///
65    /// This path may touch multiple shards under `full_key` routing; each hit
66    /// is materialized into an owned `bytes::Bytes` handle.
67    pub fn batch_get_view(&self, keys: &[Bytes]) -> EmbeddedBatchReadView {
68        let total = keys.len();
69        if total == 0 {
70            return EmbeddedBatchReadView {
71                items: Vec::new(),
72                hit_count: 0,
73                total_bytes: 0,
74            };
75        }
76
77        #[cfg(feature = "telemetry")]
78        let start = self.metrics.as_ref().map(|_| Instant::now());
79        let now_ms = now_millis();
80
81        if let Some(shard_id) = self.single_shard_batch_route(keys) {
82            let mut shard = self.shards[shard_id].write();
83            let mut view = BatchReadViewBuilder::new(keys.len());
84            for key in keys {
85                let (_, key_hash) = self.hashes_for_key(key);
86                view.push(shard.get_ref_hashed_published_session_or_flat(key_hash, key, now_ms));
87            }
88            drop(shard);
89            #[cfg(feature = "telemetry")]
90            self.record_batch_metrics(start, &[shard_id]);
91            return view.finish();
92        }
93
94        let mut groups = vec![Vec::<(usize, &Bytes, u64)>::new(); self.shards.len()];
95        let mut touched = Vec::new();
96        for (index, key) in keys.iter().enumerate() {
97            let (route_hash, key_hash) = self.hashes_for_key(key);
98            let shard_id = self.route_hash(route_hash);
99            if groups[shard_id].is_empty() {
100                touched.push(shard_id);
101            }
102            groups[shard_id].push((index, key, key_hash));
103        }
104
105        let mut view = OrderedBatchReadViewBuilder::new(total);
106        for (shard_id, batch) in groups.into_iter().enumerate() {
107            if batch.is_empty() {
108                continue;
109            }
110            let mut shard = self.shards[shard_id].write();
111            for (index, key, key_hash) in batch {
112                let value = shard.get_ref_hashed_published_session_or_flat(key_hash, key, now_ms);
113                if let Some(value) = value {
114                    view.record_hit(index, value);
115                }
116            }
117            drop(shard);
118        }
119
120        #[cfg(feature = "telemetry")]
121        self.record_batch_metrics(start, &touched);
122        view.finish()
123    }
124
125    /// Retrieves a session-scoped batch from a known shard without rediscovering
126    /// placement from every full chunk key.
127    pub fn batch_get_session(&self, session_prefix: &[u8], keys: &[Bytes]) -> Vec<Option<Bytes>> {
128        let key_hashes = keys.iter().map(|key| hash_key(key)).collect::<Vec<_>>();
129        self.batch_get_session_prehashed(session_prefix, keys, &key_hashes)
130    }
131
132    pub fn batch_get_session_prehashed(
133        &self,
134        session_prefix: &[u8],
135        keys: &[Bytes],
136        key_hashes: &[u64],
137    ) -> Vec<Option<Bytes>> {
138        assert_eq!(
139            keys.len(),
140            key_hashes.len(),
141            "keys and key_hashes must have matching lengths",
142        );
143        if keys.is_empty() {
144            return Vec::new();
145        }
146
147        #[cfg(feature = "telemetry")]
148        let start = self.metrics.as_ref().map(|_| Instant::now());
149        let route = self.route_session(session_prefix);
150        let now_ms = now_millis();
151        let mut shard = self.shards[route.shard_id].write();
152        let active_session_prefix = shard
153            .session_slots
154            .has_session(session_prefix)
155            .then_some(session_prefix);
156        let values = keys
157            .iter()
158            .zip(key_hashes.iter().copied())
159            .map(|(key, key_hash)| {
160                shard
161                    .get_ref_hashed_active_session_or_flat(
162                        active_session_prefix,
163                        key_hash,
164                        key,
165                        now_ms,
166                    )
167                    .map(<[u8]>::to_vec)
168            })
169            .collect();
170        #[cfg(feature = "telemetry")]
171        self.record_batch_metrics(start, &[route.shard_id]);
172        values
173    }
174
175    /// Retrieves a session-scoped batch from a precomputed shard route.
176    pub fn batch_get_session_routed(
177        &self,
178        route: EmbeddedSessionRoute,
179        keys: &[Bytes],
180    ) -> Vec<Option<Bytes>> {
181        let total = keys.len();
182        if total == 0 {
183            return Vec::new();
184        }
185
186        #[cfg(feature = "telemetry")]
187        let start = self.metrics.as_ref().map(|_| Instant::now());
188        let now_ms = now_millis();
189        let mut shard = self.shards[route.shard_id].write();
190        let values = keys
191            .iter()
192            .map(|key| {
193                let key_hash = hash_key(key);
194                shard
195                    .map
196                    .get_ref_hashed(key_hash, key, now_ms)
197                    .map(<[u8]>::to_vec)
198            })
199            .collect();
200        #[cfg(feature = "telemetry")]
201        self.record_batch_metrics(start, &[route.shard_id]);
202        values
203    }
204
205    /// Returns owned views for a session-scoped batch.
206    ///
207    /// Session-slot values are copied into owned handles so the returned view
208    /// remains valid after later session updates.
209    pub fn batch_get_session_view(
210        &self,
211        session_prefix: &[u8],
212        keys: &[Bytes],
213    ) -> EmbeddedSessionBatchView {
214        let key_hashes = keys.iter().map(|key| hash_key(key)).collect::<Vec<_>>();
215        self.batch_get_session_view_prehashed(session_prefix, keys, &key_hashes)
216    }
217
218    pub fn batch_get_session_view_prehashed(
219        &self,
220        session_prefix: &[u8],
221        keys: &[Bytes],
222        key_hashes: &[u64],
223    ) -> EmbeddedSessionBatchView {
224        assert_eq!(
225            keys.len(),
226            key_hashes.len(),
227            "keys and key_hashes must have matching lengths",
228        );
229        if keys.is_empty() {
230            return EmbeddedBatchReadView {
231                items: Vec::new(),
232                hit_count: 0,
233                total_bytes: 0,
234            };
235        }
236
237        #[cfg(feature = "telemetry")]
238        let start = self.metrics.as_ref().map(|_| Instant::now());
239        let route = self.route_session(session_prefix);
240        let now_ms = now_millis();
241        let mut shard = self.shards[route.shard_id].write();
242
243        let active_session_prefix = shard
244            .session_slots
245            .has_session(session_prefix)
246            .then_some(session_prefix);
247        let mut view = BatchReadViewBuilder::new(keys.len());
248        for (key, key_hash) in keys.iter().zip(key_hashes.iter().copied()) {
249            view.push(shard.get_ref_hashed_active_session_or_flat(
250                active_session_prefix,
251                key_hash,
252                key,
253                now_ms,
254            ));
255        }
256        drop(shard);
257
258        #[cfg(feature = "telemetry")]
259        self.record_batch_metrics(start, &[route.shard_id]);
260
261        view.finish()
262    }
263
264    /// Returns owned views for a session-scoped batch from a precomputed
265    /// shard route.
266    pub fn batch_get_session_view_routed(
267        &self,
268        route: EmbeddedSessionRoute,
269        keys: &[Bytes],
270    ) -> EmbeddedSessionBatchView {
271        let key_hashes = keys.iter().map(|key| hash_key(key)).collect::<Vec<_>>();
272        self.batch_get_session_view_prehashed_routed(route, keys, &key_hashes)
273    }
274
275    /// Returns owned views for a session-scoped batch from a precomputed
276    /// shard route and precomputed full-key hashes.
277    pub fn batch_get_session_view_prehashed_routed(
278        &self,
279        route: EmbeddedSessionRoute,
280        keys: &[Bytes],
281        key_hashes: &[u64],
282    ) -> EmbeddedSessionBatchView {
283        assert_eq!(
284            keys.len(),
285            key_hashes.len(),
286            "keys and key_hashes must have matching lengths",
287        );
288        if keys.is_empty() {
289            return EmbeddedBatchReadView {
290                items: Vec::new(),
291                hit_count: 0,
292                total_bytes: 0,
293            };
294        }
295
296        #[cfg(feature = "telemetry")]
297        let start = self.metrics.as_ref().map(|_| Instant::now());
298        let now_ms = now_millis();
299        let mut shard = self.shards[route.shard_id].write();
300        let session_prefix = batch_derived_session_storage_prefix(keys);
301        let active_session_prefix = session_prefix
302            .as_ref()
303            .filter(|prefix| shard.session_slots.has_session(prefix))
304            .map(Vec::as_slice);
305
306        let mut view = BatchReadViewBuilder::new(keys.len());
307        for (key, key_hash) in keys.iter().zip(key_hashes.iter().copied()) {
308            view.push(shard.get_ref_hashed_active_session_or_flat(
309                active_session_prefix,
310                key_hash,
311                key,
312                now_ms,
313            ));
314        }
315        drop(shard);
316
317        #[cfg(feature = "telemetry")]
318        self.record_batch_metrics(start, &[route.shard_id]);
319
320        view.finish()
321    }
322
323    /// Retrieves a generic packed batch. This is still copy-out, but it packs
324    /// all returned bytes into one contiguous buffer to minimize object churn.
325    pub fn batch_get_packed(&self, keys: &[Bytes]) -> PackedBatch {
326        let total = keys.len();
327        if total == 0 {
328            return PackedBatch::default();
329        }
330
331        #[cfg(feature = "telemetry")]
332        let start = self.metrics.as_ref().map(|_| Instant::now());
333        let now_ms = now_millis();
334        if let Some(shard_id) = self.single_shard_batch_route(keys) {
335            let mut shard = self.shards[shard_id].write();
336            let mut packed = PackedBatchBuilder::new(total);
337            for key in keys {
338                let (_, key_hash) = self.hashes_for_key(key);
339                packed.push(shard.get_ref_hashed_session_or_flat(key_hash, key, now_ms));
340            }
341            #[cfg(feature = "telemetry")]
342            self.record_batch_metrics(start, &[shard_id]);
343            return packed.finish();
344        }
345
346        let mut groups = vec![Vec::<(usize, &Bytes, u64)>::new(); self.shards.len()];
347        let mut touched = Vec::new();
348        for (index, key) in keys.iter().enumerate() {
349            let (route_hash, key_hash) = self.hashes_for_key(key);
350            let shard_id = self.route_hash(route_hash);
351            if groups[shard_id].is_empty() {
352                touched.push(shard_id);
353            }
354            groups[shard_id].push((index, key, key_hash));
355        }
356
357        let mut packed = OrderedPackedBatchBuilder::new(total);
358
359        for (shard_id, batch) in groups.into_iter().enumerate() {
360            if batch.is_empty() {
361                continue;
362            }
363            let mut shard = self.shards[shard_id].write();
364            for (index, key, key_hash) in batch {
365                if let Some(value) = shard.get_ref_hashed_session_or_flat(key_hash, key, now_ms) {
366                    packed.record_hit(index, value);
367                }
368            }
369        }
370
371        let packed = packed.finish();
372        #[cfg(feature = "telemetry")]
373        self.record_batch_metrics(start, &touched);
374        packed
375    }
376
377    /// Retrieves a session-scoped packed batch from a known shard.
378    pub fn batch_get_session_packed(&self, session_prefix: &[u8], keys: &[Bytes]) -> PackedBatch {
379        if keys.is_empty() {
380            return PackedBatch::default();
381        }
382
383        #[cfg(feature = "telemetry")]
384        let start = self.metrics.as_ref().map(|_| Instant::now());
385        let route = self.route_session(session_prefix);
386        let now_ms = now_millis();
387        let mut shard = self.shards[route.shard_id].write();
388        let active_session_prefix = shard
389            .session_slots
390            .has_session(session_prefix)
391            .then_some(session_prefix);
392        let mut packed = PackedBatchBuilder::new(keys.len());
393        for key in keys {
394            let key_hash = hash_key(key);
395            packed.push(shard.get_ref_hashed_active_session_or_flat(
396                active_session_prefix,
397                key_hash,
398                key,
399                now_ms,
400            ));
401        }
402        #[cfg(feature = "telemetry")]
403        self.record_batch_metrics(start, &[route.shard_id]);
404        packed.finish()
405    }
406
407    /// Retrieves a session-scoped packed batch from a precomputed shard route.
408    pub fn batch_get_session_packed_routed(
409        &self,
410        route: EmbeddedSessionRoute,
411        keys: &[Bytes],
412    ) -> PackedBatch {
413        if keys.is_empty() {
414            return PackedBatch::default();
415        }
416
417        #[cfg(feature = "telemetry")]
418        let start = self.metrics.as_ref().map(|_| Instant::now());
419        let now_ms = now_millis();
420        let mut shard = self.shards[route.shard_id].write();
421        let session_prefix = batch_derived_session_storage_prefix(keys);
422        let active_session_prefix = session_prefix
423            .as_ref()
424            .filter(|prefix| shard.session_slots.has_session(prefix))
425            .map(Vec::as_slice);
426        let mut packed = PackedBatchBuilder::new(keys.len());
427        for key in keys {
428            let key_hash = hash_key(key);
429            packed.push(shard.get_ref_hashed_active_session_or_flat(
430                active_session_prefix,
431                key_hash,
432                key,
433                now_ms,
434            ));
435        }
436        #[cfg(feature = "telemetry")]
437        self.record_batch_metrics(start, &[route.shard_id]);
438        packed.finish()
439    }
440}