Skip to main content

fast_cache/storage/embedded_store/
owned.rs

1#[cfg(not(feature = "embedded-read-biased-lock"))]
2use parking_lot::RwLockWriteGuard;
3#[cfg(feature = "embedded-read-biased-lock")]
4use rblock::RwLockWriteGuard;
5
6use crate::storage::FlatMap;
7
8use super::batch_results::{
9    BatchReadViewBuilder, OrderedBatchReadViewBuilder, OrderedPackedBatchBuilder,
10    PackedBatchBuilder,
11};
12use super::*;
13
14/// Exclusive shard-local handle for slot-owned embedded workers.
15///
16/// This is the no-contention path for workloads that can guarantee one thread
17/// owns a shard's slot range. The caller locks the shard once, then performs
18/// many point operations without re-entering the shared store routing path.
19#[derive(Debug)]
20pub struct EmbeddedShardHandle<'a> {
21    shard_id: usize,
22    shard: RwLockWriteGuard<'a, EmbeddedShard>,
23}
24
25impl<'a> EmbeddedShardHandle<'a> {
26    #[inline(always)]
27    pub fn shard_id(&self) -> usize {
28        self.shard_id
29    }
30
31    #[inline(always)]
32    pub fn get_ref_no_ttl_hashed(&mut self, key_hash: u64, key: &[u8]) -> Option<&[u8]> {
33        self.shard.map.get_ref_hashed_no_ttl(key_hash, key)
34    }
35
36    #[inline(always)]
37    pub fn set_hashed_no_ttl<K, V>(&mut self, key_hash: u64, key: K, value: V)
38    where
39        K: Into<Bytes>,
40        V: Into<Bytes>,
41    {
42        self.shard.map.set_hashed(key_hash, key, value, None, 0);
43        self.shard.enforce_memory_limit(0);
44    }
45
46    #[inline(always)]
47    pub fn set_slice_hashed_no_ttl(&mut self, key_hash: u64, key: &[u8], value: &[u8]) {
48        self.shard
49            .map
50            .set_slice_hashed(key_hash, key, value, None, 0);
51        self.shard.enforce_memory_limit(0);
52    }
53}
54
55/// Owned shard for slot-range workers.
56///
57/// This is the no-mutex execution path: one worker thread owns this shard and
58/// accesses its `FlatMap` directly.
59#[derive(Debug)]
60pub struct OwnedEmbeddedShard {
61    shard_id: usize,
62    map: FlatMap,
63    session_slots: SessionSlotMap,
64    memory_limit_bytes: Option<usize>,
65    eviction_policy: EvictionPolicy,
66}
67
68impl OwnedEmbeddedShard {
69    #[inline(always)]
70    fn update_lazy_read_sampling(&mut self) {
71        let enabled = if self.eviction_policy == EvictionPolicy::None {
72            false
73        } else if let Some(limit) = self.memory_limit_bytes {
74            let watermark = limit.saturating_mul(3) / 4;
75            self.stored_bytes() >= watermark.max(1)
76        } else {
77            false
78        };
79        self.session_slots.configure_read_sampling(enabled);
80    }
81
82    #[inline(always)]
83    pub fn shard_id(&self) -> usize {
84        self.shard_id
85    }
86
87    #[inline(always)]
88    pub fn get_ref_no_ttl_hashed(&mut self, key_hash: u64, key: &[u8]) -> Option<&[u8]> {
89        self.map.get_ref_hashed_no_ttl(key_hash, key)
90    }
91
92    #[inline(always)]
93    pub fn set_hashed_no_ttl<K, V>(&mut self, key_hash: u64, key: K, value: V)
94    where
95        K: Into<Bytes>,
96        V: Into<Bytes>,
97    {
98        self.map.set_hashed(key_hash, key, value, None, 0);
99        self.enforce_memory_limit(0);
100    }
101
102    #[inline(always)]
103    pub fn set_slice_hashed_no_ttl(&mut self, key_hash: u64, key: &[u8], value: &[u8]) {
104        self.map.set_slice_hashed(key_hash, key, value, None, 0);
105        self.enforce_memory_limit(0);
106    }
107
108    #[inline(always)]
109    pub fn get_session_ref_hashed_no_ttl(
110        &mut self,
111        session_prefix: &[u8],
112        key_hash: u64,
113        key: &[u8],
114    ) -> Option<&[u8]> {
115        self.session_slots
116            .get_ref_hashed(session_prefix, key_hash, key)
117    }
118
119    #[inline(always)]
120    pub fn get_ref_hashed_session_or_flat(
121        &mut self,
122        key_hash: u64,
123        key: &[u8],
124        now_ms: u64,
125    ) -> Option<&[u8]> {
126        if let Some(session_prefix) = derived_session_storage_prefix(key)
127            && let Some(value) = self
128                .session_slots
129                .get_ref_hashed(&session_prefix, key_hash, key)
130        {
131            return Some(value);
132        }
133
134        self.map.get_ref_hashed(key_hash, key, now_ms)
135    }
136
137    #[inline(always)]
138    pub fn get_ref_hashed_published_session_or_flat(
139        &mut self,
140        key_hash: u64,
141        key: &[u8],
142        now_ms: u64,
143    ) -> Option<&[u8]> {
144        if let Some(session_prefix) = derived_session_storage_prefix(key)
145            && self.session_slots.has_session(&session_prefix)
146        {
147            return self
148                .session_slots
149                .get_ref_hashed(&session_prefix, key_hash, key);
150        }
151
152        self.map.get_ref_hashed(key_hash, key, now_ms)
153    }
154
155    #[inline(always)]
156    pub fn get_ref_hashed_active_session_or_flat(
157        &mut self,
158        active_session_prefix: Option<&[u8]>,
159        key_hash: u64,
160        key: &[u8],
161        now_ms: u64,
162    ) -> Option<&[u8]> {
163        match active_session_prefix {
164            Some(session_prefix) => {
165                self.session_slots
166                    .get_ref_hashed(session_prefix, key_hash, key)
167            }
168            None => self.map.get_ref_hashed(key_hash, key, now_ms),
169        }
170    }
171
172    #[inline(always)]
173    pub fn get_ref_hashed_active_session_or_no_ttl_flat(
174        &mut self,
175        active_session_prefix: Option<&[u8]>,
176        key_hash: u64,
177        key: &[u8],
178    ) -> Option<&[u8]> {
179        match active_session_prefix {
180            Some(session_prefix) => {
181                self.get_session_ref_hashed_no_ttl(session_prefix, key_hash, key)
182            }
183            None => self.get_ref_no_ttl_hashed(key_hash, key),
184        }
185    }
186
187    #[inline(always)]
188    pub fn set_session_slice_hashed_no_ttl(
189        &mut self,
190        session_prefix: &[u8],
191        key_hash: u64,
192        key: &[u8],
193        value: &[u8],
194    ) {
195        self.map.delete_hashed(key_hash, key, 0);
196        self.session_slots
197            .set_slice_hashed(session_prefix, key_hash, key, value);
198        self.enforce_memory_limit(0);
199    }
200
201    fn stored_bytes(&self) -> usize {
202        self.map
203            .stored_bytes()
204            .saturating_add(self.session_slots.stored_bytes())
205    }
206
207    #[inline(always)]
208    fn exceeds_memory_limit(&self) -> bool {
209        self.memory_limit_bytes
210            .is_some_and(|limit| self.stored_bytes() > limit)
211    }
212
213    #[inline(always)]
214    fn enforce_memory_limit(&mut self, now_ms: u64) {
215        self.update_lazy_read_sampling();
216        let Some(limit) = self.memory_limit_bytes else {
217            return;
218        };
219        if self.stored_bytes() <= limit {
220            return;
221        }
222
223        self.map.process_maintenance(now_ms);
224        if self.session_slots.is_empty() {
225            self.map.evict_to_memory_target(
226                self.eviction_policy,
227                now_ms,
228                FlatMap::eviction_target_bytes(limit),
229            );
230            self.update_lazy_read_sampling();
231            return;
232        }
233        while self.stored_bytes() > limit {
234            let map_candidate = self.map.eviction_candidate(self.eviction_policy);
235            let session_candidate = self.session_slots.eviction_candidate(self.eviction_policy);
236            let evicted = match (map_candidate, session_candidate) {
237                (Some((map_rank, _, _)), Some((session_rank, _, _, _))) => {
238                    if session_rank < map_rank {
239                        self.session_slots.evict_with_policy(self.eviction_policy)
240                    } else {
241                        self.map.evict_with_policy(self.eviction_policy, now_ms)
242                    }
243                }
244                (Some((_rank, _, _)), None) => {
245                    self.map.evict_with_policy(self.eviction_policy, now_ms)
246                }
247                (None, Some((_rank, _, _, _))) => {
248                    self.session_slots.evict_with_policy(self.eviction_policy)
249                }
250                (None, None) => false,
251            };
252            if !evicted {
253                break;
254            }
255        }
256        self.update_lazy_read_sampling();
257    }
258}
259
/// One worker-local shard set for the slot-owned embedded path.
///
/// The worker owns every shard in this set exclusively, so routed operations
/// execute without any mutex on the hot path.
#[derive(Debug)]
pub struct OwnedEmbeddedWorkerShards {
    /// Routing mode handed to `compute_key_route` when routing keys.
    route_mode: EmbeddedRouteMode,
    /// Shard count used to derive `shift` and to size `shard_lookup`; this is
    /// not necessarily the number of shards owned (see `shards`).
    shard_count: usize,
    /// Result of `shift_for(shard_count)`, passed to the key and session
    /// routing helpers.
    shift: u32,
    /// Indexed by shard id: the position of that shard inside `shards`, or
    /// `usize::MAX` when this worker does not own the shard.
    shard_lookup: Vec<usize>,
    /// The shards owned by this worker.
    shards: Vec<OwnedEmbeddedShard>,
    /// Optional telemetry handle supplied at construction.
    #[cfg(feature = "telemetry")]
    metrics: Option<Arc<CacheTelemetry>>,
}
274
275impl OwnedEmbeddedWorkerShards {
276    #[cfg(feature = "embedded")]
277    pub(crate) fn local_tier_stats_snapshot(
278        &self,
279    ) -> (TierStatsSnapshot, TierStatsSnapshot, TierStatsSnapshot) {
280        let mut hot = TierStatsSnapshot {
281            name: "hot",
282            ..TierStatsSnapshot::default()
283        };
284        let mut warm = TierStatsSnapshot {
285            name: "warm",
286            ..TierStatsSnapshot::default()
287        };
288        let mut cold = TierStatsSnapshot {
289            name: "cold",
290            ..TierStatsSnapshot::default()
291        };
292
293        for shard in &self.shards {
294            let (shard_hot, shard_warm, shard_cold) = shard.map.stats_snapshot();
295            accumulate_tier_stats(&mut hot, &shard_hot);
296            accumulate_tier_stats(&mut warm, &shard_warm);
297            accumulate_tier_stats(&mut cold, &shard_cold);
298        }
299
300        (hot, warm, cold)
301    }
302
303    fn new(
304        route_mode: EmbeddedRouteMode,
305        shard_count: usize,
306        shards: Vec<OwnedEmbeddedShard>,
307        #[cfg(feature = "telemetry")] metrics: Option<Arc<CacheTelemetry>>,
308    ) -> Self {
309        assert_valid_shard_count(shard_count);
310        let mut shard_lookup = vec![usize::MAX; shard_count];
311        for (index, shard) in shards.iter().enumerate() {
312            shard_lookup[shard.shard_id()] = index;
313        }
314        Self {
315            route_mode,
316            shard_count,
317            shift: shift_for(shard_count),
318            shard_lookup,
319            shards,
320            #[cfg(feature = "telemetry")]
321            metrics,
322        }
323    }
324
325    #[inline(always)]
326    pub fn shard_count(&self) -> usize {
327        self.shard_count
328    }
329
330    #[inline(always)]
331    pub fn route_mode(&self) -> EmbeddedRouteMode {
332        self.route_mode
333    }
334
335    #[inline(always)]
336    pub fn worker_shard_count(&self) -> usize {
337        self.shards.len()
338    }
339
340    #[inline(always)]
341    pub fn owns_shard(&self, shard_id: usize) -> bool {
342        self.shard_lookup
343            .get(shard_id)
344            .copied()
345            .is_some_and(|index| index != usize::MAX)
346    }
347
348    #[inline(always)]
349    pub fn route_key(&self, key: &[u8]) -> EmbeddedKeyRoute {
350        compute_key_route(self.route_mode, self.shift, key)
351    }
352
353    #[inline(always)]
354    pub fn prepare_point_key(&self, key: &[u8]) -> PreparedPointKey {
355        let route = self.route_key(key);
356        PreparedPointKey {
357            route,
358            key_len: key.len(),
359            key_tag: hash_key_tag_from_hash(route.key_hash),
360            key: key.to_vec(),
361        }
362    }
363
364    #[inline(always)]
365    pub fn route_session(&self, session_prefix: &[u8]) -> EmbeddedSessionRoute {
366        EmbeddedSessionRoute {
367            shard_id: compute_session_shard(self.shift, session_prefix),
368        }
369    }
370
371    #[inline(always)]
372    pub fn begin_read_session(&mut self) -> OwnedEmbeddedWorkerReadSession<'_> {
373        OwnedEmbeddedWorkerReadSession {
374            opened: vec![false; self.shards.len()],
375            opened_indices: Vec::with_capacity(self.shards.len()),
376            worker: self,
377        }
378    }
379
380    #[inline(always)]
381    pub fn get_ref_no_ttl_routed(&mut self, route: EmbeddedKeyRoute, key: &[u8]) -> Option<&[u8]> {
382        self.shard_for_route_mut(route.shard_id)
383            .get_ref_no_ttl_hashed(route.key_hash, key)
384    }
385
386    #[inline(always)]
387    pub fn get_prepared_ref_no_ttl(&mut self, prepared: &PreparedPointKey) -> Option<&[u8]> {
388        self.shard_for_route_mut(prepared.route().shard_id)
389            .map
390            .get_ref_hashed_prepared_no_ttl(
391                prepared.route().key_hash,
392                prepared.key(),
393                prepared.key_tag(),
394            )
395    }
396
397    #[cfg(feature = "embedded")]
398    pub(crate) fn local_get_slice(&mut self, key: &[u8]) -> Option<EmbeddedReadSlice> {
399        let route = self.route_key(key);
400        self.local_get_slice_routed(route, key)
401    }
402
403    #[cfg(feature = "embedded")]
404    #[inline(always)]
405    pub(crate) fn local_get_slice_routed_no_ttl(
406        &mut self,
407        route: EmbeddedKeyRoute,
408        key: &[u8],
409    ) -> Option<EmbeddedReadSlice> {
410        let shard = self.shard_for_route_mut(route.shard_id);
411        let value = if let Some(session_prefix) = derived_session_storage_prefix(key) {
412            if shard.session_slots.has_session(&session_prefix) {
413                shard
414                    .session_slots
415                    .get_ref_hashed_local(&session_prefix, route.key_hash, key)
416            } else {
417                shard.map.get_ref_hashed_no_ttl(route.key_hash, key)
418            }
419        } else {
420            shard.map.get_ref_hashed_no_ttl(route.key_hash, key)
421        };
422        value.map(EmbeddedReadSlice::from_slice)
423    }
424
425    #[cfg(feature = "embedded")]
426    #[inline(always)]
427    pub(crate) fn local_get_slice_routed(
428        &mut self,
429        route: EmbeddedKeyRoute,
430        key: &[u8],
431    ) -> Option<EmbeddedReadSlice> {
432        let now_ms = now_millis();
433        let shard = self.shard_for_route_mut(route.shard_id);
434        let value = if let Some(session_prefix) = derived_session_storage_prefix(key) {
435            if shard.session_slots.has_session(&session_prefix) {
436                shard
437                    .session_slots
438                    .get_ref_hashed_local(&session_prefix, route.key_hash, key)
439            } else {
440                shard.map.get_ref_hashed_local(route.key_hash, key, now_ms)
441            }
442        } else {
443            shard.map.get_ref_hashed_local(route.key_hash, key, now_ms)
444        };
445        value.map(EmbeddedReadSlice::from_slice)
446    }
447
448    #[cfg(feature = "embedded")]
449    #[inline(always)]
450    pub(crate) fn local_get_point_ref_routed_no_ttl(
451        &mut self,
452        route: EmbeddedKeyRoute,
453        key: &[u8],
454    ) -> Option<&[u8]> {
455        self.shard_for_route_mut(route.shard_id)
456            .map
457            .get_ref_hashed_no_ttl(route.key_hash, key)
458    }
459
460    #[cfg(feature = "embedded")]
461    #[inline(always)]
462    pub(crate) fn local_get_point_ref_prepared_routed_no_ttl(
463        &mut self,
464        prepared: &PreparedPointKey,
465    ) -> Option<&[u8]> {
466        self.shard_for_route_mut(prepared.route().shard_id)
467            .map
468            .get_ref_hashed_prepared_no_ttl(
469                prepared.route().key_hash,
470                prepared.key(),
471                prepared.key_tag(),
472            )
473    }
474
475    #[cfg(feature = "embedded")]
476    #[inline(always)]
477    pub(crate) fn local_get_ref_routed_no_ttl(
478        &mut self,
479        route: EmbeddedKeyRoute,
480        key: &[u8],
481    ) -> Option<&[u8]> {
482        let shard = self.shard_for_route_mut(route.shard_id);
483        if let Some(session_prefix) = derived_session_storage_prefix(key)
484            && shard.session_slots.has_session(&session_prefix)
485        {
486            return shard
487                .session_slots
488                .get_ref_hashed_local(&session_prefix, route.key_hash, key);
489        }
490        shard.map.get_ref_hashed_no_ttl(route.key_hash, key)
491    }
492
493    #[cfg(feature = "embedded")]
494    pub(crate) fn local_batch_get_slices(
495        &mut self,
496        keys: &[Bytes],
497    ) -> Vec<Option<EmbeddedReadSlice>> {
498        keys.iter()
499            .map(|key| self.local_get_slice(key))
500            .collect::<Vec<_>>()
501    }
502
503    #[cfg(feature = "embedded")]
504    pub(crate) fn local_batch_get_session_slices_prehashed(
505        &mut self,
506        session_prefix: &[u8],
507        keys: &[Bytes],
508        key_hashes: &[u64],
509    ) -> Vec<Option<EmbeddedReadSlice>> {
510        assert_eq!(
511            keys.len(),
512            key_hashes.len(),
513            "keys and key_hashes must have matching lengths",
514        );
515        let route = self.route_session(session_prefix);
516        let shard = self.shard_for_route_mut(route.shard_id);
517        let now_ms = now_millis();
518        let active_session_prefix = shard
519            .session_slots
520            .has_session(session_prefix)
521            .then_some(session_prefix);
522        keys.iter()
523            .zip(key_hashes.iter().copied())
524            .map(|(key, key_hash)| {
525                let value = match active_session_prefix {
526                    Some(session_prefix) => {
527                        shard
528                            .session_slots
529                            .get_ref_hashed_local(session_prefix, key_hash, key)
530                    }
531                    None => shard.map.get_ref_hashed_local(key_hash, key, now_ms),
532                };
533                value.map(EmbeddedReadSlice::from_slice)
534            })
535            .collect::<Vec<_>>()
536    }
537
538    #[cfg(feature = "embedded")]
539    #[inline(always)]
540    pub(crate) fn local_get_session_ref_hashed_no_ttl<'a>(
541        &'a mut self,
542        session_prefix: &[u8],
543        key_hash: u64,
544        key: &[u8],
545    ) -> Option<&'a [u8]> {
546        let route = self.route_session(session_prefix);
547        let shard = self.shard_for_route_mut(route.shard_id);
548        if shard.session_slots.has_session(session_prefix) {
549            return shard
550                .session_slots
551                .get_ref_hashed_local(session_prefix, key_hash, key);
552        }
553        shard.map.get_ref_hashed_local(key_hash, key, now_millis())
554    }
555
556    #[cfg(feature = "embedded")]
557    pub(crate) fn local_get(&mut self, key: &[u8]) -> Option<Bytes> {
558        let route = self.route_key(key);
559        let now_ms = now_millis();
560        let shard = self.shard_for_route_mut(route.shard_id);
561        if let Some(session_prefix) = derived_session_storage_prefix(key)
562            && let Some(value) =
563                shard
564                    .session_slots
565                    .get_ref_hashed_local(&session_prefix, route.key_hash, key)
566        {
567            return Some(value.to_vec());
568        }
569        shard
570            .map
571            .get_ref_hashed_local(route.key_hash, key, now_ms)
572            .map(<[u8]>::to_vec)
573    }
574
575    #[cfg(feature = "embedded")]
576    pub(crate) fn local_set(&mut self, key: Bytes, value: Bytes, ttl_ms: Option<u64>) {
577        let now_ms = now_millis();
578        let route = self.route_key(&key);
579        let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
580        let shard = self.shard_for_route_mut(route.shard_id);
581        if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
582            shard
583                .session_slots
584                .delete_hashed_local(&session_prefix, route.key_hash, &key);
585        }
586        shard
587            .map
588            .set_hashed_local(route.key_hash, key, value, expire_at_ms, now_ms);
589        shard.enforce_memory_limit(now_ms);
590    }
591
592    #[cfg(feature = "embedded")]
593    pub(crate) fn local_set_slice_no_ttl(
594        &mut self,
595        route: EmbeddedKeyRoute,
596        key: &[u8],
597        value: &[u8],
598    ) {
599        self.local_set_slice_tagged_no_ttl(
600            route,
601            hash_key_tag_from_hash(route.key_hash),
602            key,
603            value,
604        );
605    }
606
607    #[cfg(feature = "embedded")]
608    #[inline(always)]
609    pub(crate) fn local_set_slice(
610        &mut self,
611        route: EmbeddedKeyRoute,
612        key: &[u8],
613        value: &[u8],
614        ttl_ms: Option<u64>,
615    ) {
616        match ttl_ms {
617            None => self.local_set_slice_no_ttl(route, key, value),
618            Some(ttl_ms) => {
619                let now_ms = now_millis();
620                let expire_at_ms = Some(now_ms.saturating_add(ttl_ms));
621                let shard = self.shard_for_route_mut(route.shard_id);
622                if let Some(session_prefix) = point_write_session_storage_prefix(key) {
623                    shard
624                        .session_slots
625                        .delete_hashed_local(&session_prefix, route.key_hash, key);
626                }
627                shard
628                    .map
629                    .set_slice_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
630                shard.enforce_memory_limit(now_ms);
631            }
632        }
633    }
634
635    #[cfg(feature = "embedded")]
636    pub(crate) fn local_set_prepared_slice_no_ttl(
637        &mut self,
638        prepared: &PreparedPointKey,
639        value: &[u8],
640    ) {
641        if self.route_mode == EmbeddedRouteMode::FullKey {
642            let route = prepared.route();
643            let shard = self.shard_for_route_mut(route.shard_id);
644            shard.map.set_slice_hashed_tagged_no_ttl_local(
645                route.key_hash,
646                prepared.key_tag(),
647                prepared.key(),
648                value,
649            );
650            if shard.exceeds_memory_limit() {
651                shard.enforce_memory_limit(0);
652            }
653            return;
654        }
655
656        self.local_set_slice_tagged_no_ttl(
657            prepared.route(),
658            prepared.key_tag(),
659            prepared.key(),
660            value,
661        );
662    }
663
664    #[cfg(feature = "embedded")]
665    pub(crate) fn local_set_slice_tagged_no_ttl(
666        &mut self,
667        route: EmbeddedKeyRoute,
668        key_tag: u64,
669        key: &[u8],
670        value: &[u8],
671    ) {
672        let shard = self.shard_for_route_mut(route.shard_id);
673        if let Some(session_prefix) = point_write_session_storage_prefix(key) {
674            shard
675                .session_slots
676                .delete_hashed_local(&session_prefix, route.key_hash, key);
677        }
678        shard
679            .map
680            .set_slice_hashed_tagged_no_ttl_local(route.key_hash, key_tag, key, value);
681        if shard.exceeds_memory_limit() {
682            shard.enforce_memory_limit(0);
683        }
684    }
685
686    #[cfg(feature = "embedded")]
687    pub(crate) fn local_batch_set(&mut self, items: Vec<(Bytes, Bytes)>, ttl_ms: Option<u64>) {
688        let now_ms = now_millis();
689        let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
690        for (key, value) in items {
691            let route = self.route_key(&key);
692            let shard = self.shard_for_route_mut(route.shard_id);
693            if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
694                shard
695                    .session_slots
696                    .delete_hashed_local(&session_prefix, route.key_hash, &key);
697            }
698            shard
699                .map
700                .set_hashed_local(route.key_hash, key, value, expire_at_ms, now_ms);
701            shard.enforce_memory_limit(now_ms);
702        }
703    }
704
705    #[cfg(feature = "embedded")]
706    pub(crate) fn local_batch_set_session_owned_no_ttl(
707        &mut self,
708        session_prefix: Bytes,
709        items: Vec<(Bytes, Bytes)>,
710    ) {
711        self.local_batch_set_session_packed_no_ttl(PackedSessionWrite::from_owned_items(
712            session_prefix,
713            items,
714        ));
715    }
716
717    #[cfg(feature = "embedded")]
718    pub(crate) fn local_batch_set_session_packed_no_ttl(&mut self, packed: PackedSessionWrite) {
719        if packed.item_count() == 0 {
720            return;
721        }
722        let route = self.route_session(packed.session_prefix());
723        let shard = self.shard_for_route_mut(route.shard_id);
724        for entry in packed.slab.entries.iter() {
725            shard.map.delete_hashed_local(entry.hash, &entry.key, 0);
726        }
727        shard.session_slots.replace_session_slab_local(packed);
728        shard.enforce_memory_limit(0);
729    }
730
731    #[cfg(feature = "embedded")]
732    pub(crate) fn local_delete(&mut self, key: &[u8]) -> bool {
733        let route = self.route_key(key);
734        let shard = self.shard_for_route_mut(route.shard_id);
735        let deleted_session = derived_session_storage_prefix(key).is_some_and(|session_prefix| {
736            shard
737                .session_slots
738                .delete_hashed_local(&session_prefix, route.key_hash, key)
739        });
740        let deleted_map = shard.map.delete_hashed_local(route.key_hash, key, 0);
741        deleted_session || deleted_map
742    }
743
744    #[cfg(feature = "embedded")]
745    pub(crate) fn local_exists(&mut self, key: &[u8]) -> bool {
746        self.local_get(key).is_some()
747    }
748
749    #[cfg(feature = "embedded")]
750    pub(crate) fn local_ttl_seconds(&mut self, key: &[u8]) -> i64 {
751        let route = self.route_key(key);
752        let now_ms = now_millis();
753        let shard = self.shard_for_route_mut(route.shard_id);
754        if let Some(session_prefix) = derived_session_storage_prefix(key)
755            && shard
756                .session_slots
757                .get_ref_hashed_local(&session_prefix, route.key_hash, key)
758                .is_some()
759        {
760            return -1;
761        }
762        shard.map.ttl_seconds(key, now_ms)
763    }
764
765    #[cfg(feature = "embedded")]
766    pub(crate) fn local_pttl_millis(&mut self, key: &[u8]) -> i64 {
767        let route = self.route_key(key);
768        let now_ms = now_millis();
769        let shard = self.shard_for_route_mut(route.shard_id);
770        if let Some(session_prefix) = derived_session_storage_prefix(key)
771            && shard
772                .session_slots
773                .get_ref_hashed_local(&session_prefix, route.key_hash, key)
774                .is_some()
775        {
776            return -1;
777        }
778        shard.map.ttl_millis(key, now_ms)
779    }
780
781    #[cfg(feature = "embedded")]
782    pub(crate) fn local_expire(&mut self, key: &[u8], expire_at_ms: u64) -> bool {
783        let route = self.route_key(key);
784        let now_ms = now_millis();
785        let shard = self.shard_for_route_mut(route.shard_id);
786        if let Some(session_prefix) = derived_session_storage_prefix(key)
787            && shard
788                .session_slots
789                .get_ref_hashed_local(&session_prefix, route.key_hash, key)
790                .is_some()
791        {
792            return false;
793        }
794        shard.map.expire(key, expire_at_ms, now_ms)
795    }
796
797    #[cfg(feature = "embedded")]
798    pub(crate) fn local_persist(&mut self, key: &[u8]) -> bool {
799        let route = self.route_key(key);
800        let now_ms = now_millis();
801        let shard = self.shard_for_route_mut(route.shard_id);
802        if let Some(session_prefix) = derived_session_storage_prefix(key)
803            && shard
804                .session_slots
805                .get_ref_hashed_local(&session_prefix, route.key_hash, key)
806                .is_some()
807        {
808            return false;
809        }
810        shard.map.persist(key, now_ms)
811    }
812
813    #[inline(always)]
814    pub fn get_view_routed_no_ttl(
815        &mut self,
816        route: EmbeddedKeyRoute,
817        key: &[u8],
818    ) -> OwnedEmbeddedReadView {
819        let shard = self.shard_for_route_mut(route.shard_id);
820        let item = shard
821            .get_ref_no_ttl_hashed(route.key_hash, key)
822            .map(EmbeddedReadSlice::from_slice);
823        OwnedEmbeddedReadView { item }
824    }
825
826    #[inline(always)]
827    pub fn set_slice_routed_no_ttl(&mut self, route: EmbeddedKeyRoute, key: &[u8], value: &[u8]) {
828        self.shard_for_route_mut(route.shard_id)
829            .set_slice_hashed_no_ttl(route.key_hash, key, value);
830    }
831
832    pub fn batch_set_session_slices_no_ttl<I, K, V>(&mut self, session_prefix: &[u8], items: I)
833    where
834        I: IntoIterator<Item = (K, V)>,
835        K: AsRef<[u8]>,
836        V: AsRef<[u8]>,
837    {
838        let route = self.route_session(session_prefix);
839        let shard = self.shard_for_route_mut(route.shard_id);
840        for (key, value) in items {
841            let key = key.as_ref();
842            shard.set_session_slice_hashed_no_ttl(
843                session_prefix,
844                hash_key(key),
845                key,
846                value.as_ref(),
847            );
848        }
849    }
850
851    pub fn batch_set_session_packed_no_ttl(&mut self, packed: PackedSessionWrite) {
852        if packed.item_count() == 0 {
853            return;
854        }
855        let route = self.route_session(&packed.session_prefix);
856        let shard = self.shard_for_route_mut(route.shard_id);
857        for entry in packed.slab.entries.iter() {
858            shard.map.delete_hashed(entry.hash, &entry.key, 0);
859        }
860        shard.session_slots.replace_session_slab(packed);
861        shard.enforce_memory_limit(0);
862    }
863
864    pub fn batch_get_session_view_no_ttl(
865        &mut self,
866        session_prefix: &[u8],
867        keys: &[Bytes],
868    ) -> OwnedEmbeddedSessionBatchView {
869        let key_hashes = keys.iter().map(|key| hash_key(key)).collect::<Vec<_>>();
870        self.batch_get_session_view_prehashed_no_ttl(session_prefix, keys, &key_hashes)
871    }
872
873    pub fn batch_get_session_view_prehashed_no_ttl(
874        &mut self,
875        session_prefix: &[u8],
876        keys: &[Bytes],
877        key_hashes: &[u64],
878    ) -> OwnedEmbeddedSessionBatchView {
879        assert_eq!(
880            keys.len(),
881            key_hashes.len(),
882            "keys and key_hashes must have matching lengths",
883        );
884        if keys.is_empty() {
885            return OwnedEmbeddedBatchReadView {
886                items: Vec::new(),
887                hit_count: 0,
888                total_bytes: 0,
889            };
890        }
891
892        let route = self.route_session(session_prefix);
893        let shard = self.shard_for_route_mut(route.shard_id);
894        let mut view = BatchReadViewBuilder::new(keys.len());
895        for (key, key_hash) in keys.iter().zip(key_hashes.iter().copied()) {
896            view.push(shard.get_session_ref_hashed_no_ttl(session_prefix, key_hash, key));
897        }
898
899        view.finish_owned()
900    }
901
902    pub fn batch_get_session_view_routed_no_ttl(
903        &mut self,
904        route: EmbeddedSessionRoute,
905        keys: &[Bytes],
906    ) -> OwnedEmbeddedSessionBatchView {
907        let key_hashes = keys.iter().map(|key| hash_key(key)).collect::<Vec<_>>();
908        self.batch_get_session_view_prehashed_routed_no_ttl(route, keys, &key_hashes)
909    }
910
911    pub fn batch_get_session_view_prehashed_routed_no_ttl(
912        &mut self,
913        route: EmbeddedSessionRoute,
914        keys: &[Bytes],
915        key_hashes: &[u64],
916    ) -> OwnedEmbeddedSessionBatchView {
917        assert_eq!(
918            keys.len(),
919            key_hashes.len(),
920            "keys and key_hashes must have matching lengths",
921        );
922        if keys.is_empty() {
923            return OwnedEmbeddedBatchReadView {
924                items: Vec::new(),
925                hit_count: 0,
926                total_bytes: 0,
927            };
928        }
929
930        let shard = self.shard_for_route_mut(route.shard_id);
931        let session_prefix = batch_derived_session_storage_prefix(keys);
932        let active_session_prefix = session_prefix
933            .as_ref()
934            .filter(|prefix| shard.session_slots.has_session(prefix))
935            .map(Vec::as_slice);
936
937        let mut view = BatchReadViewBuilder::new(keys.len());
938        for (key, key_hash) in keys.iter().zip(key_hashes.iter().copied()) {
939            view.push(shard.get_ref_hashed_active_session_or_no_ttl_flat(
940                active_session_prefix,
941                key_hash,
942                key,
943            ));
944        }
945
946        view.finish_owned()
947    }
948
949    pub fn batch_get_session_packed_no_ttl(
950        &mut self,
951        session_prefix: &[u8],
952        keys: &[Bytes],
953    ) -> PackedBatch {
954        if keys.is_empty() {
955            return PackedBatch::default();
956        }
957
958        let route = self.route_session(session_prefix);
959        let shard = self.shard_for_route_mut(route.shard_id);
960        let mut packed = PackedBatchBuilder::new(keys.len());
961        for key in keys {
962            let key_hash = hash_key(key);
963            packed.push(shard.get_session_ref_hashed_no_ttl(session_prefix, key_hash, key));
964        }
965        packed.finish()
966    }
967
968    pub fn batch_get_session_packed_routed_no_ttl(
969        &mut self,
970        route: EmbeddedSessionRoute,
971        keys: &[Bytes],
972    ) -> PackedBatch {
973        if keys.is_empty() {
974            return PackedBatch::default();
975        }
976
977        let shard = self.shard_for_route_mut(route.shard_id);
978        let session_prefix = batch_derived_session_storage_prefix(keys);
979        let active_session_prefix = session_prefix
980            .as_ref()
981            .filter(|prefix| shard.session_slots.has_session(prefix))
982            .map(Vec::as_slice);
983        let mut packed = PackedBatchBuilder::new(keys.len());
984        for key in keys {
985            let key_hash = hash_key(key);
986            packed.push(shard.get_ref_hashed_active_session_or_no_ttl_flat(
987                active_session_prefix,
988                key_hash,
989                key,
990            ));
991        }
992        packed.finish()
993    }
994
995    pub fn len(&self) -> usize {
996        self.shards
997            .iter()
998            .map(|shard| shard.map.len().saturating_add(shard.session_slots.len()))
999            .sum()
1000    }
1001
1002    pub fn is_empty(&self) -> bool {
1003        self.len() == 0
1004    }
1005
1006    pub fn process_maintenance(&mut self) -> usize {
1007        let now_ms = now_millis();
1008        self.shards
1009            .iter_mut()
1010            .map(|shard| shard.map.process_maintenance(now_ms))
1011            .sum()
1012    }
1013
1014    pub fn restore_entries<I>(&mut self, entries: I)
1015    where
1016        I: IntoIterator<Item = StoredEntry>,
1017    {
1018        let now_ms = now_millis();
1019        for entry in entries {
1020            if entry
1021                .expire_at_ms
1022                .is_some_and(|expire_at_ms| expire_at_ms <= now_ms)
1023            {
1024                continue;
1025            }
1026            let route = self.route_key(&entry.key);
1027            let shard = self.shard_for_route_mut(route.shard_id);
1028            if let Some(session_prefix) = point_write_session_storage_prefix(&entry.key) {
1029                shard
1030                    .session_slots
1031                    .delete_hashed(&session_prefix, route.key_hash, &entry.key);
1032            }
1033            shard.map.set_hashed(
1034                route.key_hash,
1035                entry.key,
1036                entry.value,
1037                entry.expire_at_ms,
1038                now_ms,
1039            );
1040            shard.enforce_memory_limit(now_ms);
1041        }
1042    }
1043
1044    pub fn get(&mut self, key: &[u8]) -> Option<Bytes> {
1045        let now_ms = now_millis();
1046        let route = self.route_key(key);
1047        let shard = self.shard_for_route_mut(route.shard_id);
1048        if let Some(session_prefix) = derived_session_storage_prefix(key)
1049            && let Some(value) =
1050                shard
1051                    .session_slots
1052                    .get_ref_hashed(&session_prefix, route.key_hash, key)
1053        {
1054            return Some(value.to_vec());
1055        }
1056        shard
1057            .map
1058            .get_ref_hashed(route.key_hash, key, now_ms)
1059            .map(<[u8]>::to_vec)
1060    }
1061
1062    pub fn get_view(&mut self, key: &[u8]) -> OwnedEmbeddedReadView {
1063        let route = self.route_key(key);
1064        let now_ms = now_millis();
1065        let shard = self.shard_for_route_mut(route.shard_id);
1066        let item = if let Some(session_prefix) = derived_session_storage_prefix(key) {
1067            if shard.session_slots.has_session(&session_prefix) {
1068                shard
1069                    .session_slots
1070                    .get_ref_hashed(&session_prefix, route.key_hash, key)
1071                    .map(EmbeddedReadSlice::from_slice)
1072            } else {
1073                shard
1074                    .map
1075                    .get_ref_hashed(route.key_hash, key, now_ms)
1076                    .map(EmbeddedReadSlice::from_slice)
1077            }
1078        } else {
1079            shard
1080                .map
1081                .get_ref_hashed(route.key_hash, key, now_ms)
1082                .map(EmbeddedReadSlice::from_slice)
1083        };
1084        OwnedEmbeddedReadView { item }
1085    }
1086
1087    pub fn batch_get(&mut self, keys: Vec<Bytes>) -> Vec<Option<Bytes>> {
1088        let total = keys.len();
1089        if total == 0 {
1090            return Vec::new();
1091        }
1092
1093        #[cfg(feature = "telemetry")]
1094        let start = self.metrics.as_ref().map(|_| Instant::now());
1095        let now_ms = now_millis();
1096        let mut values = vec![None; total];
1097        let mut groups = vec![Vec::<(usize, Bytes, u64)>::new(); self.shards.len()];
1098        let mut touched = Vec::new();
1099
1100        for (index, key) in keys.into_iter().enumerate() {
1101            let route = self.route_key(&key);
1102            let local_index = self
1103                .shard_lookup
1104                .get(route.shard_id)
1105                .copied()
1106                .filter(|idx| *idx != usize::MAX)
1107                .expect("routed key does not belong to this owned worker");
1108            if groups[local_index].is_empty() {
1109                touched.push(route.shard_id);
1110            }
1111            groups[local_index].push((index, key, route.key_hash));
1112        }
1113
1114        for (local_index, batch) in groups.into_iter().enumerate() {
1115            if batch.is_empty() {
1116                continue;
1117            }
1118            let shard = &mut self.shards[local_index];
1119            for (index, key, key_hash) in batch {
1120                values[index] = shard
1121                    .get_ref_hashed_session_or_flat(key_hash, &key, now_ms)
1122                    .map(<[u8]>::to_vec);
1123            }
1124        }
1125
1126        #[cfg(feature = "telemetry")]
1127        self.record_batch_metrics(start, &touched);
1128        values
1129    }
1130
1131    pub fn batch_get_view(&mut self, keys: &[Bytes]) -> OwnedEmbeddedBatchReadView {
1132        let total = keys.len();
1133        if total == 0 {
1134            return OwnedEmbeddedBatchReadView {
1135                items: Vec::new(),
1136                hit_count: 0,
1137                total_bytes: 0,
1138            };
1139        }
1140
1141        #[cfg(feature = "telemetry")]
1142        let start = self.metrics.as_ref().map(|_| Instant::now());
1143        let now_ms = now_millis();
1144
1145        let mut groups = vec![Vec::<(usize, &Bytes, u64, usize)>::new(); self.shards.len()];
1146        let mut touched = Vec::new();
1147        for (index, key) in keys.iter().enumerate() {
1148            let route = self.route_key(key);
1149            let local_index = self
1150                .shard_lookup
1151                .get(route.shard_id)
1152                .copied()
1153                .filter(|idx| *idx != usize::MAX)
1154                .expect("routed key does not belong to this owned worker");
1155            if groups[local_index].is_empty() {
1156                touched.push(route.shard_id);
1157            }
1158            groups[local_index].push((index, key, route.key_hash, route.shard_id));
1159        }
1160
1161        let mut view = OrderedBatchReadViewBuilder::new(total);
1162
1163        for (local_index, batch) in groups.into_iter().enumerate() {
1164            if batch.is_empty() {
1165                continue;
1166            }
1167            let shard = &mut self.shards[local_index];
1168            for (index, key, key_hash, _shard_id) in batch {
1169                if let Some(value) =
1170                    shard.get_ref_hashed_published_session_or_flat(key_hash, key, now_ms)
1171                {
1172                    view.record_hit(index, value);
1173                }
1174            }
1175        }
1176
1177        #[cfg(feature = "telemetry")]
1178        self.record_batch_metrics(start, &touched);
1179        view.finish_owned()
1180    }
1181
1182    pub fn batch_get_session_view(
1183        &mut self,
1184        session_prefix: &[u8],
1185        keys: &[Bytes],
1186    ) -> OwnedEmbeddedSessionBatchView {
1187        let key_hashes = keys.iter().map(|key| hash_key(key)).collect::<Vec<_>>();
1188        self.batch_get_session_view_prehashed(session_prefix, keys, &key_hashes)
1189    }
1190
1191    pub fn batch_get_session_view_prehashed(
1192        &mut self,
1193        session_prefix: &[u8],
1194        keys: &[Bytes],
1195        key_hashes: &[u64],
1196    ) -> OwnedEmbeddedSessionBatchView {
1197        assert_eq!(
1198            keys.len(),
1199            key_hashes.len(),
1200            "keys and key_hashes must have matching lengths",
1201        );
1202        if keys.is_empty() {
1203            return OwnedEmbeddedBatchReadView {
1204                items: Vec::new(),
1205                hit_count: 0,
1206                total_bytes: 0,
1207            };
1208        }
1209
1210        #[cfg(feature = "telemetry")]
1211        let start = self.metrics.as_ref().map(|_| Instant::now());
1212        let now_ms = now_millis();
1213        let route = self.route_session(session_prefix);
1214        let shard = self.shard_for_route_mut(route.shard_id);
1215        let active_session_prefix = shard
1216            .session_slots
1217            .has_session(session_prefix)
1218            .then_some(session_prefix);
1219
1220        let mut view = BatchReadViewBuilder::new(keys.len());
1221        for (key, key_hash) in keys.iter().zip(key_hashes.iter().copied()) {
1222            view.push(shard.get_ref_hashed_active_session_or_flat(
1223                active_session_prefix,
1224                key_hash,
1225                key,
1226                now_ms,
1227            ));
1228        }
1229
1230        #[cfg(feature = "telemetry")]
1231        self.record_batch_metrics(start, &[route.shard_id]);
1232        view.finish_owned()
1233    }
1234
1235    pub fn batch_get_session_packed_view_prehashed(
1236        &mut self,
1237        session_prefix: &[u8],
1238        keys: &[Bytes],
1239        key_hashes: &[u64],
1240    ) -> Option<OwnedEmbeddedSessionPackedView> {
1241        assert_eq!(
1242            keys.len(),
1243            key_hashes.len(),
1244            "keys and key_hashes must have matching lengths",
1245        );
1246        if keys.is_empty() {
1247            return Some(OwnedEmbeddedSessionPackedView {
1248                buffer: EmbeddedReadSlice::from_slice(&[]),
1249                offsets: Vec::new(),
1250                lengths: Vec::new(),
1251                hit_count: 0,
1252                total_bytes: 0,
1253            });
1254        }
1255
1256        let route = self.route_session(session_prefix);
1257        let shard = self.shard_for_route_mut(route.shard_id);
1258        if !shard.session_slots.has_session(session_prefix) {
1259            return None;
1260        }
1261
1262        let meta =
1263            shard
1264                .session_slots
1265                .get_packed_view_hashed_local(session_prefix, keys, key_hashes)?;
1266
1267        Some(OwnedEmbeddedSessionPackedView {
1268            buffer: EmbeddedReadSlice { bytes: meta.buffer },
1269            offsets: meta.offsets,
1270            lengths: meta.lengths,
1271            hit_count: meta.hit_count,
1272            total_bytes: meta.total_bytes,
1273        })
1274    }
1275
1276    pub fn batch_get_session_packed_view(
1277        &mut self,
1278        session_prefix: &[u8],
1279        keys: &[Bytes],
1280    ) -> Option<OwnedEmbeddedSessionPackedView> {
1281        let key_hashes = keys.iter().map(|key| hash_key(key)).collect::<Vec<_>>();
1282        self.batch_get_session_packed_view_prehashed(session_prefix, keys, &key_hashes)
1283    }
1284
1285    pub fn batch_get_session_packed(
1286        &mut self,
1287        session_prefix: &[u8],
1288        keys: &[Bytes],
1289    ) -> PackedBatch {
1290        if keys.is_empty() {
1291            return PackedBatch::default();
1292        }
1293
1294        #[cfg(feature = "telemetry")]
1295        let start = self.metrics.as_ref().map(|_| Instant::now());
1296        let route = self.route_session(session_prefix);
1297        let now_ms = now_millis();
1298        let shard = self.shard_for_route_mut(route.shard_id);
1299        let active_session_prefix = shard
1300            .session_slots
1301            .has_session(session_prefix)
1302            .then_some(session_prefix);
1303        let mut packed = PackedBatchBuilder::new(keys.len());
1304        for key in keys {
1305            let key_hash = hash_key(key);
1306            packed.push(shard.get_ref_hashed_active_session_or_flat(
1307                active_session_prefix,
1308                key_hash,
1309                key,
1310                now_ms,
1311            ));
1312        }
1313        #[cfg(feature = "telemetry")]
1314        self.record_batch_metrics(start, &[route.shard_id]);
1315        packed.finish()
1316    }
1317
1318    pub fn batch_get_packed(&mut self, keys: &[Bytes]) -> PackedBatch {
1319        let total = keys.len();
1320        if total == 0 {
1321            return PackedBatch::default();
1322        }
1323
1324        #[cfg(feature = "telemetry")]
1325        let start = self.metrics.as_ref().map(|_| Instant::now());
1326        let now_ms = now_millis();
1327        let mut groups = vec![Vec::<(usize, &Bytes, u64, usize)>::new(); self.shards.len()];
1328        let mut touched = Vec::new();
1329        for (index, key) in keys.iter().enumerate() {
1330            let route = self.route_key(key);
1331            let local_index = self
1332                .shard_lookup
1333                .get(route.shard_id)
1334                .copied()
1335                .filter(|idx| *idx != usize::MAX)
1336                .expect("routed key does not belong to this owned worker");
1337            if groups[local_index].is_empty() {
1338                touched.push(route.shard_id);
1339            }
1340            groups[local_index].push((index, key, route.key_hash, route.shard_id));
1341        }
1342
1343        let mut packed = OrderedPackedBatchBuilder::new(total);
1344
1345        for (local_index, batch) in groups.into_iter().enumerate() {
1346            if batch.is_empty() {
1347                continue;
1348            }
1349            let shard = &mut self.shards[local_index];
1350            for (index, key, key_hash, _shard_id) in batch {
1351                if let Some(value) = shard.get_ref_hashed_session_or_flat(key_hash, key, now_ms) {
1352                    packed.record_hit(index, value);
1353                }
1354            }
1355        }
1356
1357        #[cfg(feature = "telemetry")]
1358        self.record_batch_metrics(start, &touched);
1359        packed.finish()
1360    }
1361
1362    pub fn set(&mut self, key: Bytes, value: Bytes, ttl_ms: Option<u64>) {
1363        let now_ms = now_millis();
1364        let route = self.route_key(&key);
1365        let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
1366        let shard = self.shard_for_route_mut(route.shard_id);
1367        if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
1368            shard
1369                .session_slots
1370                .delete_hashed(&session_prefix, route.key_hash, &key);
1371        }
1372        shard
1373            .map
1374            .set_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
1375        shard.enforce_memory_limit(now_ms);
1376    }
1377
1378    pub fn batch_set(&mut self, items: Vec<(Bytes, Bytes)>, ttl_ms: Option<u64>) {
1379        if items.is_empty() {
1380            return;
1381        }
1382
1383        let now_ms = now_millis();
1384        let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
1385        let mut groups = vec![Vec::<(Bytes, Bytes, EmbeddedKeyRoute)>::new(); self.shards.len()];
1386        for (key, value) in items {
1387            let route = self.route_key(&key);
1388            let local_index = self
1389                .shard_lookup
1390                .get(route.shard_id)
1391                .copied()
1392                .filter(|idx| *idx != usize::MAX)
1393                .expect("routed key does not belong to this owned worker");
1394            groups[local_index].push((key, value, route));
1395        }
1396
1397        for (local_index, batch) in groups.into_iter().enumerate() {
1398            if batch.is_empty() {
1399                continue;
1400            }
1401            let shard = &mut self.shards[local_index];
1402            for (key, value, route) in batch {
1403                if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
1404                    shard
1405                        .session_slots
1406                        .delete_hashed(&session_prefix, route.key_hash, &key);
1407                }
1408                shard
1409                    .map
1410                    .set_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
1411            }
1412            shard.enforce_memory_limit(now_ms);
1413        }
1414    }
1415
1416    pub fn batch_set_session_owned_no_ttl(
1417        &mut self,
1418        session_prefix: Bytes,
1419        items: Vec<(Bytes, Bytes)>,
1420    ) {
1421        if items.is_empty() {
1422            return;
1423        }
1424        self.local_batch_set_session_packed_no_ttl(PackedSessionWrite::from_owned_items(
1425            session_prefix,
1426            items,
1427        ));
1428    }
1429
1430    pub fn delete(&mut self, key: &[u8]) -> bool {
1431        let now_ms = now_millis();
1432        let route = self.route_key(key);
1433        let shard = self.shard_for_route_mut(route.shard_id);
1434        if let Some(session_prefix) = derived_session_storage_prefix(key)
1435            && shard
1436                .session_slots
1437                .delete_hashed(&session_prefix, route.key_hash, key)
1438        {
1439            return true;
1440        }
1441        shard.map.delete_hashed(route.key_hash, key, now_ms)
1442    }
1443
1444    pub fn exists(&mut self, key: &[u8]) -> bool {
1445        self.get(key).is_some()
1446    }
1447
1448    #[cfg(feature = "telemetry")]
1449    #[inline(always)]
1450    fn record_batch_metrics(&self, start: Option<Instant>, touched_shards: &[usize]) {
1451        if let (Some(metrics), Some(start)) = (&self.metrics, start) {
1452            metrics.record_batch_get(start.elapsed().as_nanos() as u64);
1453            for &shard_id in touched_shards {
1454                metrics.record_batch_get_shard(shard_id);
1455            }
1456        }
1457    }
1458
1459    fn shard_for_route_mut(&mut self, shard_id: usize) -> &mut OwnedEmbeddedShard {
1460        let index = self
1461            .shard_lookup
1462            .get(shard_id)
1463            .copied()
1464            .filter(|index| *index != usize::MAX)
1465            .expect("routed key does not belong to this owned worker");
1466        &mut self.shards[index]
1467    }
1468}
1469
/// Long-lived worker-local read session for the absolute max embedded read path.
///
/// This keeps shard-local read epochs open across many routed lookups so the
/// caller can avoid begin/end-epoch overhead on every individual read. It is
/// intended for worker-owned read-heavy loops, not mixed read/write traffic.
///
/// A shard's read epoch is begun the first time a lookup routes to it and is
/// ended when the session is dropped.
#[derive(Debug)]
pub struct OwnedEmbeddedWorkerReadSession<'a> {
    /// Worker whose shards this session reads; borrowed mutably for the
    /// session's lifetime.
    pub(super) worker: &'a mut OwnedEmbeddedWorkerShards,
    /// Per-local-shard flag, indexed like `worker.shards`: set to `true` once
    /// this session has begun a read epoch on that shard.
    pub(super) opened: Vec<bool>,
    /// Local shard indices on which a read epoch was begun, in opening order.
    /// Drained on drop to end each epoch.
    pub(super) opened_indices: Vec<usize>,
}
1481
1482impl<'a> OwnedEmbeddedWorkerReadSession<'a> {
1483    #[inline(always)]
1484    fn shard_for_route_mut(&mut self, shard_id: usize) -> &mut OwnedEmbeddedShard {
1485        let index = self
1486            .worker
1487            .shard_lookup
1488            .get(shard_id)
1489            .copied()
1490            .filter(|index| *index != usize::MAX)
1491            .expect("routed key does not belong to this owned worker");
1492        if !self.opened[index] {
1493            self.worker.shards[index].map.begin_read_epoch();
1494            self.opened[index] = true;
1495            self.opened_indices.push(index);
1496        }
1497        &mut self.worker.shards[index]
1498    }
1499
1500    #[inline(always)]
1501    pub fn get_ref_no_ttl_routed(&mut self, route: EmbeddedKeyRoute, key: &[u8]) -> Option<&[u8]> {
1502        self.shard_for_route_mut(route.shard_id)
1503            .get_ref_no_ttl_hashed(route.key_hash, key)
1504    }
1505}
1506
1507impl Drop for OwnedEmbeddedWorkerReadSession<'_> {
1508    fn drop(&mut self) {
1509        for index in self.opened_indices.drain(..) {
1510            self.worker.shards[index].map.end_read_epoch();
1511        }
1512    }
1513}
1514
1515impl EmbeddedStore {
1516    pub fn bind_shard(&self, shard_id: usize) -> EmbeddedShardHandle<'_> {
1517        assert!(shard_id < self.shards.len(), "invalid shard id");
1518        EmbeddedShardHandle {
1519            shard_id,
1520            shard: self.shards[shard_id].write(),
1521        }
1522    }
1523
1524    pub fn into_owned_shards(self) -> Vec<OwnedEmbeddedShard> {
1525        self.shards
1526            .into_iter()
1527            .enumerate()
1528            .map(|(shard_id, shard)| {
1529                let shard = shard.into_inner().into_inner();
1530                OwnedEmbeddedShard {
1531                    shard_id,
1532                    map: shard.map,
1533                    session_slots: shard.session_slots,
1534                    memory_limit_bytes: shard.memory_limit_bytes,
1535                    eviction_policy: shard.eviction_policy,
1536                }
1537            })
1538            .collect()
1539    }
1540
1541    pub fn into_owned_workers(self, worker_count: usize) -> Vec<OwnedEmbeddedWorkerShards> {
1542        let worker_count = worker_count.max(1);
1543        let route_mode = self.route_mode;
1544        let shard_count = self.shard_count();
1545        #[cfg(feature = "telemetry")]
1546        let metrics = self.metrics.clone();
1547        let mut buckets = (0..worker_count)
1548            .map(|_| Vec::<OwnedEmbeddedShard>::new())
1549            .collect::<Vec<_>>();
1550
1551        for shard in self.into_owned_shards() {
1552            buckets[shard.shard_id() % worker_count].push(shard);
1553        }
1554
1555        buckets
1556            .into_iter()
1557            .map(|shards| {
1558                OwnedEmbeddedWorkerShards::new(
1559                    route_mode,
1560                    shard_count,
1561                    shards,
1562                    #[cfg(feature = "telemetry")]
1563                    metrics.clone(),
1564                )
1565            })
1566            .collect()
1567    }
1568}