Skip to main content

fast_cache/storage/embedded_store/
point.rs

1use super::*;
2
3impl EmbeddedStore {
4    /// Returns an owned copy of the value for `key`.
5    pub fn get(&self, key: &[u8]) -> Option<Bytes> {
6        let now_ms = now_millis();
7        let route = self.route_key(key);
8        self.get_with_route(route, key, now_ms)
9    }
10
11    /// Single-threaded fused GET + RESP encode. Bypasses the per-shard
12    /// `RwLock` entirely via `data_ptr()` — saves ~5-10ns per GET in atomic
13    /// ops.
14    ///
15    /// # Safety
16    ///
17    /// The caller must guarantee that no other thread is concurrently accessing
18    /// any shard of this `EmbeddedStore`. This is true in the `sc=1`
19    /// multi-direct path where there is exactly one worker thread, but is
20    /// unsafe in any other configuration.
21    pub unsafe fn get_blob_string_into_single_threaded(
22        &self,
23        key: &[u8],
24        out: &mut bytes::BytesMut,
25    ) -> bool {
26        #[cfg(not(feature = "unsafe"))]
27        {
28            self.get_blob_string_into(key, out)
29        }
30        #[cfg(feature = "unsafe")]
31        {
32            let key_hash = hash_key(key);
33            // SAFETY: forwarded from this function's single-threaded contract.
34            unsafe { self.get_blob_string_hashed_into_single_threaded(key_hash, key, out) }
35        }
36    }
37
38    /// Prehashed variant of `get_blob_string_into_single_threaded` for the
39    /// RESP fast path, where the key bytes have already been validated.
40    ///
41    /// # Safety
42    ///
43    /// Same contract as `get_blob_string_into_single_threaded`.
44    pub unsafe fn get_blob_string_hashed_into_single_threaded(
45        &self,
46        key_hash: u64,
47        key: &[u8],
48        out: &mut bytes::BytesMut,
49    ) -> bool {
50        #[cfg(not(feature = "unsafe"))]
51        {
52            self.get_blob_string_hashed_into(key_hash, key, out)
53        }
54        #[cfg(feature = "unsafe")]
55        {
56            if self.shards.len() == 1 {
57                // SAFETY (per fn contract): no other thread is accessing this shard.
58                let shard = unsafe { &*self.shards[0].data_ptr() };
59                if can_skip_session_lookup(key, &shard.session_slots) {
60                    let value = if shard.map.has_no_ttl_entries() {
61                        shard.map.get_ref_hashed_shared_no_ttl(key_hash, key)
62                    } else {
63                        shard.map.get_ref_hashed_shared(key_hash, key, now_millis())
64                    };
65                    if let Some(value) = value {
66                        write_resp_blob_string_into(out, value);
67                        return true;
68                    }
69                    return false;
70                }
71            }
72
73            let route = self.route_key(key);
74            // SAFETY (per fn contract): no other thread is accessing this shard.
75            let shard = unsafe { &*self.shards[route.shard_id].data_ptr() };
76            // Skip `now_millis()` syscall when no TTL entries exist — the
77            // expiration filter is a no-op anyway.
78            let now_ms = if shard.map.has_no_ttl_entries() {
79                0
80            } else {
81                now_millis()
82            };
83            if uses_flat_key_storage(self.route_mode, key) {
84                if let Some(value) = shard.map.get_ref_hashed_shared(route.key_hash, key, now_ms) {
85                    write_resp_blob_string_into(out, value);
86                    return true;
87                }
88                return false;
89            }
90            // Session-prefixed keys: fall back to the locked path. Bench doesn't
91            // hit this.
92            self.get_blob_string_into(key, out)
93        }
94    }
95
96    /// Single-threaded SET that bypasses the per-shard `RwLock`.
97    ///
98    /// # Safety
99    ///
100    /// Same contract as `get_blob_string_into_single_threaded`.
101    pub unsafe fn set_single_threaded(&self, key: &[u8], value: &[u8], ttl_ms: Option<u64>) {
102        #[cfg(not(feature = "unsafe"))]
103        {
104            self.set(key.to_vec(), value.to_vec(), ttl_ms);
105        }
106        #[cfg(feature = "unsafe")]
107        {
108            let key_hash = hash_key(key);
109            // SAFETY: forwarded from this function's single-threaded contract.
110            unsafe { self.set_single_threaded_hashed(key_hash, key, value, ttl_ms) };
111        }
112    }
113
114    /// Prehashed variant of `set_single_threaded` for canonical RESP SET.
115    ///
116    /// # Safety
117    ///
118    /// Same contract as `set_single_threaded`.
119    pub unsafe fn set_single_threaded_hashed(
120        &self,
121        key_hash: u64,
122        key: &[u8],
123        value: &[u8],
124        ttl_ms: Option<u64>,
125    ) {
126        #[cfg(not(feature = "unsafe"))]
127        {
128            self.set_slice_prehashed(key_hash, key, value, ttl_ms);
129        }
130        #[cfg(feature = "unsafe")]
131        {
132            if self.shards.len() == 1 {
133                // SAFETY (per fn contract): no other thread is accessing this shard.
134                let shard = unsafe { &mut *self.shards[0].data_ptr() };
135                if ttl_ms.is_none()
136                    && self.route_mode == EmbeddedRouteMode::FullKey
137                    && shard.memory_limit_bytes.is_none()
138                    && shard.eviction_policy == EvictionPolicy::None
139                {
140                    // SAFETY: forwarded from this function's single-threaded
141                    // contract. The direct 1-shard hot path uses borrowed response
142                    // encoding, so stored value buffers are not cloned while SET
143                    // mutates them.
144                    unsafe { shard.map.set_slice_hashed_no_ttl_hot(key_hash, key, value) };
145                    return;
146                }
147                let now_ms = write_now_ms(ttl_ms, shard.memory_limit_bytes);
148                let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
149                if let Some(session_prefix) = point_write_session_storage_prefix(key) {
150                    shard
151                        .session_slots
152                        .delete_hashed(&session_prefix, key_hash, key);
153                }
154                shard
155                    .map
156                    .set_slice_hashed(key_hash, key, value, expire_at_ms, now_ms);
157                return;
158            }
159
160            let route = self.route_key(key);
161            // SAFETY (per fn contract): no other thread is accessing this shard.
162            let shard = unsafe { &mut *self.shards[route.shard_id].data_ptr() };
163            let now_ms = write_now_ms(ttl_ms, shard.memory_limit_bytes);
164            let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
165            if let Some(session_prefix) = point_write_session_storage_prefix(key) {
166                shard
167                    .session_slots
168                    .delete_hashed(&session_prefix, route.key_hash, key);
169            }
170            shard
171                .map
172                .set_slice_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
173        }
174    }
175
176    /// FCNP single-shard SET path with both hashes supplied by the client.
177    ///
178    /// # Safety
179    ///
180    /// Same contract as `set_single_threaded_hashed`; additionally, `key_tag`
181    /// must equal `hash_key_tag_from_hash(key_hash)`.
182    #[inline(always)]
183    pub unsafe fn set_single_threaded_hashed_tagged_no_ttl_hot(
184        &self,
185        key_hash: u64,
186        key_tag: u64,
187        key: &[u8],
188        value: &[u8],
189    ) {
190        #[cfg(not(feature = "unsafe"))]
191        {
192            let _ = key_tag;
193            self.set_slice_prehashed(key_hash, key, value, None);
194        }
195        #[cfg(feature = "unsafe")]
196        {
197            debug_assert_eq!(self.shards.len(), 1);
198            debug_assert_eq!(self.route_mode, EmbeddedRouteMode::FullKey);
199            // SAFETY (per fn contract): no other thread is accessing this shard.
200            let shard = unsafe { &mut *self.shards[0].data_ptr() };
201            debug_assert_eq!(shard.memory_limit_bytes, None);
202            debug_assert_eq!(shard.eviction_policy, EvictionPolicy::None);
203            // SAFETY: forwarded from this function's single-threaded contract,
204            // with a caller-provided key tag matching the key bytes.
205            unsafe {
206                shard
207                    .map
208                    .set_slice_hashed_tagged_no_ttl_hot(key_hash, key_tag, key, value)
209            };
210        }
211    }
212
213    /// Fused GET + RESP-blob-string-encode for the multi-direct hot path.
214    /// Writes `$<len>\r\n<value>\r\n` directly into `out` under the shard read
215    /// lock, eliminating the `Vec<u8>` intermediate that the generic `get`
216    /// path's `.to_vec()` allocates. Returns `true` if the key existed.
217    pub fn get_blob_string_into(&self, key: &[u8], out: &mut bytes::BytesMut) -> bool {
218        let route = self.route_key(key);
219        self.get_blob_string_routed_into(route, key, out)
220    }
221
222    /// Prehashed fused GET + RESP encode. Used by the RESP fast path after it
223    /// validates the key bytes and computes the hash once.
224    pub fn get_blob_string_hashed_into(
225        &self,
226        key_hash: u64,
227        key: &[u8],
228        out: &mut bytes::BytesMut,
229    ) -> bool {
230        if can_route_with_key_hash(self.route_mode, self.shards.len(), key) {
231            let route = EmbeddedKeyRoute {
232                shard_id: self.route_hash(key_hash),
233                key_hash,
234            };
235            return self.get_blob_string_routed_into(route, key, out);
236        }
237
238        self.get_blob_string_into(key, out)
239    }
240
241    fn get_blob_string_routed_into(
242        &self,
243        route: EmbeddedKeyRoute,
244        key: &[u8],
245        out: &mut bytes::BytesMut,
246    ) -> bool {
247        use bytes::BufMut;
248        // Fast path: non-session keys answer from the read lock.
249        if uses_flat_key_storage(self.route_mode, key) {
250            let shard = self.shards[route.shard_id].read();
251            // Skip now_millis when no TTL entries — saves the vDSO call.
252            let now_ms = if shard.map.has_no_ttl_entries() {
253                0
254            } else {
255                now_millis()
256            };
257            if let Some(value) = shard.map.get_ref_hashed_shared(route.key_hash, key, now_ms) {
258                // Inline RESP blob string encode directly into out.
259                write_resp_blob_string_into(out, value);
260                return true;
261            }
262            return false;
263        }
264        // Session-prefixed keys go through the slow path with a write lock.
265        let mut shard = self.shards[route.shard_id].write();
266        if let Some(session_prefix) = derived_session_storage_prefix(key)
267            && let Some(value) =
268                shard
269                    .session_slots
270                    .get_ref_hashed(&session_prefix, route.key_hash, key)
271        {
272            out.put_u8(b'$');
273            let mut buf = itoa::Buffer::new();
274            out.extend_from_slice(buf.format(value.len()).as_bytes());
275            out.extend_from_slice(b"\r\n");
276            out.extend_from_slice(value);
277            out.extend_from_slice(b"\r\n");
278            return true;
279        }
280        let now_ms_session = now_millis();
281        if let Some(value) = shard
282            .map
283            .get_ref_hashed(route.key_hash, key, now_ms_session)
284        {
285            out.put_u8(b'$');
286            let mut buf = itoa::Buffer::new();
287            out.extend_from_slice(buf.format(value.len()).as_bytes());
288            out.extend_from_slice(b"\r\n");
289            out.extend_from_slice(value);
290            out.extend_from_slice(b"\r\n");
291            return true;
292        }
293        false
294    }
295
296    /// Prehashed slice SET for the RESP fast path. Avoids rehashing in
297    /// storage after parsing has already identified the key.
298    pub fn set_slice_prehashed(
299        &self,
300        key_hash: u64,
301        key: &[u8],
302        value: &[u8],
303        ttl_ms: Option<u64>,
304    ) {
305        if !can_route_with_key_hash(self.route_mode, self.shards.len(), key) {
306            self.set(key.to_vec(), value.to_vec(), ttl_ms);
307            return;
308        }
309
310        let route = EmbeddedKeyRoute {
311            shard_id: self.route_hash(key_hash),
312            key_hash,
313        };
314        if self.objects.has_objects() {
315            let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
316            let mut shard = self.shards[route.shard_id].write();
317            if bucket.delete_any(key) {
318                self.objects.note_deleted(route.shard_id);
319            }
320            let now_ms = write_now_ms(ttl_ms, shard.memory_limit_bytes);
321            let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
322            if let Some(session_prefix) = point_write_session_storage_prefix(key) {
323                shard
324                    .session_slots
325                    .delete_hashed(&session_prefix, route.key_hash, key);
326            }
327            shard
328                .map
329                .set_slice_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
330            shard.enforce_memory_limit(now_ms);
331            return;
332        }
333        let mut shard = self.shards[route.shard_id].write();
334        let now_ms = write_now_ms(ttl_ms, shard.memory_limit_bytes);
335        let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
336        if let Some(session_prefix) = point_write_session_storage_prefix(key) {
337            shard
338                .session_slots
339                .delete_hashed(&session_prefix, route.key_hash, key);
340        }
341        shard
342            .map
343            .set_slice_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
344        shard.enforce_memory_limit(now_ms);
345    }
346
347    /// Zero-copy `GET` for the multi-direct hot path. Returns the stored
348    /// `bytes::Bytes` directly (refcount-only clone of `FlatEntry.value`),
349    /// avoiding the `Vec<u8>` allocation that `get` performs to materialize
350    /// `Bytes = Vec<u8>` for the public API.
351    pub fn get_value_bytes(&self, key: &[u8]) -> Option<bytes::Bytes> {
352        let now_ms = now_millis();
353        let route = self.route_key(key);
354        self.get_value_bytes_routed(route, key, now_ms)
355    }
356
357    /// FastCodec GET path. The wire header carries a route hash so the server
358    /// can usually skip hashing the key again. For session-prefixed keys in
359    /// session route mode, the route hash is intentionally the session prefix
360    /// hash, not the full key hash, so we fall back to the generic lookup.
361    pub fn get_value_bytes_route_hashed(
362        &self,
363        route_hash: u64,
364        key: &[u8],
365    ) -> Option<bytes::Bytes> {
366        if can_use_route_hash_as_key_hash(self.route_mode, key) {
367            let route = EmbeddedKeyRoute {
368                shard_id: self.route_hash(route_hash),
369                key_hash: route_hash,
370            };
371            return self.get_value_bytes_routed(route, key, now_millis());
372        }
373
374        self.get_value_bytes(key)
375    }
376
377    /// FastCodec GET path that calls `write` while the value reference is
378    /// still protected by the shard read lock. This avoids the refcount bump
379    /// and drop from returning `bytes::Bytes` on the native GET hot path.
380    pub fn with_value_bytes_route_hashed<F>(
381        &self,
382        route_hash: u64,
383        key: &[u8],
384        mut write: F,
385    ) -> bool
386    where
387        F: FnMut(&[u8]),
388    {
389        if can_use_route_hash_as_key_hash(self.route_mode, key) {
390            let route = EmbeddedKeyRoute {
391                shard_id: self.route_hash(route_hash),
392                key_hash: route_hash,
393            };
394            return self.with_value_bytes_routed(route, key, &mut write);
395        }
396
397        if let Some(value) = self.get_value_bytes(key) {
398            write(value.as_ref());
399            true
400        } else {
401            false
402        }
403    }
404
405    /// Route-hashed GET path that exposes the stored `Bytes` object while the
406    /// shard lock is held. Callers can clone the `Bytes` handle for large
407    /// zero-copy writes, while small responses can still copy from the borrowed
408    /// value without an extra refcount bump.
409    pub fn with_shared_value_bytes_route_hashed<F>(
410        &self,
411        route_hash: u64,
412        key: &[u8],
413        mut write: F,
414    ) -> bool
415    where
416        F: FnMut(&bytes::Bytes),
417    {
418        if can_use_route_hash_as_key_hash(self.route_mode, key) {
419            let route = EmbeddedKeyRoute {
420                shard_id: self.route_hash(route_hash),
421                key_hash: route_hash,
422            };
423            return self.with_shared_value_bytes_routed(route, key, &mut write);
424        }
425
426        if let Some(value) = self.get_value_bytes(key) {
427            write(&value);
428            true
429        } else {
430            false
431        }
432    }
433
434    /// Full-key FCNP GET path for clients that provide the key hash, key tag,
435    /// and key length but omit the key bytes. This is safe for concurrent
436    /// multi-shard access because the value is exposed only while the shard
437    /// read lock is held.
438    pub fn with_shared_value_bytes_full_key_tagged_no_ttl<F>(
439        &self,
440        route_hash: u64,
441        key_tag: u64,
442        key_len: usize,
443        mut write: F,
444    ) -> bool
445    where
446        F: FnMut(&bytes::Bytes),
447    {
448        if self.route_mode != EmbeddedRouteMode::FullKey {
449            return false;
450        }
451        let route = EmbeddedKeyRoute {
452            shard_id: self.route_hash(route_hash),
453            key_hash: route_hash,
454        };
455        let shard = self.shards[route.shard_id].read();
456        if !shard.map.has_no_ttl_entries() {
457            return false;
458        }
459        if let Some(value) =
460            shard
461                .map
462                .get_shared_value_bytes_hashed_tagged_no_ttl(route.key_hash, key_tag, key_len)
463        {
464            write(value);
465            true
466        } else {
467            false
468        }
469    }
470
471    /// Native GETEX path for v2. Returns the value and applies the TTL while
472    /// holding the target shard write lock.
473    pub fn getex_value_bytes_route_hashed(
474        &self,
475        route_hash: Option<u64>,
476        key: &[u8],
477        ttl_ms: u64,
478    ) -> Option<bytes::Bytes> {
479        if let Some(key_hash) = route_hash
480            && can_use_route_hash_as_key_hash(self.route_mode, key)
481        {
482            let route = EmbeddedKeyRoute {
483                shard_id: self.route_hash(key_hash),
484                key_hash,
485            };
486            return self.getex_value_bytes_routed(route, key, ttl_ms);
487        }
488
489        let route = self.route_key(key);
490        self.getex_value_bytes_routed(route, key, ttl_ms)
491    }
492
493    /// Native GETEX path that writes the value while it is borrowed from the
494    /// shard. This avoids the `Bytes` clone/drop pair in the returning GETEX
495    /// helper.
496    pub fn with_getex_value_bytes_route_hashed<F>(
497        &self,
498        route_hash: Option<u64>,
499        key: &[u8],
500        ttl_ms: u64,
501        mut write: F,
502    ) -> bool
503    where
504        F: FnMut(&[u8]),
505    {
506        if let Some(key_hash) = route_hash
507            && can_use_route_hash_as_key_hash(self.route_mode, key)
508        {
509            let route = EmbeddedKeyRoute {
510                shard_id: self.route_hash(key_hash),
511                key_hash,
512            };
513            return self.with_getex_value_bytes_routed(route, key, ttl_ms, &mut write);
514        }
515
516        let route = self.route_key(key);
517        self.with_getex_value_bytes_routed(route, key, ttl_ms, &mut write)
518    }
519
520    /// Single-threaded FastCodec GET path. Avoids shard `RwLock` atomics in
521    /// the same direct sc=1 setup as the RESP fused single-threaded path.
522    ///
523    /// # Safety
524    ///
525    /// The caller must guarantee that no other thread is concurrently accessing
526    /// any shard of this `EmbeddedStore`.
527    pub unsafe fn get_value_bytes_route_hashed_single_threaded(
528        &self,
529        route_hash: u64,
530        key: &[u8],
531    ) -> Option<bytes::Bytes> {
532        #[cfg(not(feature = "unsafe"))]
533        {
534            self.get_value_bytes_route_hashed(route_hash, key)
535        }
536        #[cfg(feature = "unsafe")]
537        {
538            if can_use_route_hash_as_key_hash(self.route_mode, key) {
539                if self.shards.len() == 1 {
540                    // SAFETY (per fn contract): no other thread is accessing this shard.
541                    let shard = unsafe { &*self.shards[0].data_ptr() };
542                    if can_skip_session_lookup(key, &shard.session_slots) {
543                        let now_ms = if shard.map.has_no_ttl_entries() {
544                            0
545                        } else {
546                            now_millis()
547                        };
548                        return shard.map.get_value_bytes_hashed(route_hash, key, now_ms);
549                    }
550                }
551
552                let route = EmbeddedKeyRoute {
553                    shard_id: self.route_hash(route_hash),
554                    key_hash: route_hash,
555                };
556                // SAFETY (per fn contract): no other thread is accessing this shard.
557                let shard = unsafe { &*self.shards[route.shard_id].data_ptr() };
558                if uses_flat_key_storage(self.route_mode, key) {
559                    let now_ms = if shard.map.has_no_ttl_entries() {
560                        0
561                    } else {
562                        now_millis()
563                    };
564                    return shard
565                        .map
566                        .get_value_bytes_hashed(route.key_hash, key, now_ms);
567                }
568            }
569
570            self.get_value_bytes(key)
571        }
572    }
573
574    /// Single-threaded variant of `with_value_bytes_route_hashed`.
575    ///
576    /// # Safety
577    ///
578    /// The caller must guarantee that no other thread is concurrently accessing
579    /// any shard of this `EmbeddedStore`.
580    pub unsafe fn with_value_bytes_route_hashed_single_threaded<F>(
581        &self,
582        route_hash: u64,
583        key: &[u8],
584        write: F,
585    ) -> bool
586    where
587        F: FnMut(&[u8]),
588    {
589        #[cfg(not(feature = "unsafe"))]
590        {
591            self.with_value_bytes_route_hashed(route_hash, key, write)
592        }
593        #[cfg(feature = "unsafe")]
594        {
595            let mut write = write;
596            if can_use_route_hash_as_key_hash(self.route_mode, key) {
597                if self.shards.len() == 1 {
598                    // SAFETY (per fn contract): no other thread is accessing this shard.
599                    let shard = unsafe { &*self.shards[0].data_ptr() };
600                    if can_skip_session_lookup(key, &shard.session_slots) {
601                        let value = if shard.map.has_no_ttl_entries() {
602                            shard.map.get_ref_hashed_shared_no_ttl(route_hash, key)
603                        } else {
604                            shard
605                                .map
606                                .get_ref_hashed_shared(route_hash, key, now_millis())
607                        };
608                        if let Some(value) = value {
609                            write(value);
610                            return true;
611                        }
612                        return false;
613                    }
614                }
615
616                let route = EmbeddedKeyRoute {
617                    shard_id: self.route_hash(route_hash),
618                    key_hash: route_hash,
619                };
620                // SAFETY (per fn contract): no other thread is accessing this shard.
621                let shard = unsafe { &*self.shards[route.shard_id].data_ptr() };
622                if uses_flat_key_storage(self.route_mode, key) {
623                    let value = if shard.map.has_no_ttl_entries() {
624                        shard.map.get_ref_hashed_shared_no_ttl(route.key_hash, key)
625                    } else {
626                        shard
627                            .map
628                            .get_ref_hashed_shared(route.key_hash, key, now_millis())
629                    };
630                    if let Some(value) = value {
631                        write(value);
632                        return true;
633                    }
634                    return false;
635                }
636            }
637
638            self.with_value_bytes_route_hashed(route_hash, key, write)
639        }
640    }
641
642    /// Single-threaded FCNP GET helper that exposes the stored `Bytes` object
643    /// to the caller. This lets the network path clone only the refcount for
644    /// large zero-copy writes, while keeping small values on the borrowed
645    /// inline-copy path.
646    ///
647    /// # Safety
648    ///
649    /// The caller must guarantee that no other thread is concurrently accessing
650    /// any shard of this `EmbeddedStore`.
651    pub unsafe fn with_shared_value_bytes_route_hashed_single_threaded<F>(
652        &self,
653        route_hash: u64,
654        key: &[u8],
655        write: F,
656    ) -> bool
657    where
658        F: FnMut(&bytes::Bytes),
659    {
660        #[cfg(not(feature = "unsafe"))]
661        {
662            self.with_shared_value_bytes_route_hashed(route_hash, key, write)
663        }
664        #[cfg(feature = "unsafe")]
665        {
666            let mut write = write;
667            if can_use_route_hash_as_key_hash(self.route_mode, key) {
668                if self.shards.len() == 1 {
669                    // SAFETY (per fn contract): no other thread is accessing this shard.
670                    let shard = unsafe { &*self.shards[0].data_ptr() };
671                    if can_skip_session_lookup(key, &shard.session_slots) {
672                        if shard.map.has_no_ttl_entries() {
673                            return shard.map.with_shared_value_bytes_hashed_no_ttl(
674                                route_hash, key, &mut write,
675                            );
676                        }
677                        return shard.map.with_shared_value_bytes_hashed(
678                            route_hash,
679                            key,
680                            now_millis(),
681                            &mut write,
682                        );
683                    }
684                }
685
686                let route = EmbeddedKeyRoute {
687                    shard_id: self.route_hash(route_hash),
688                    key_hash: route_hash,
689                };
690                // SAFETY (per fn contract): no other thread is accessing this shard.
691                let shard = unsafe { &*self.shards[route.shard_id].data_ptr() };
692                if uses_flat_key_storage(self.route_mode, key) {
693                    if shard.map.has_no_ttl_entries() {
694                        return shard.map.with_shared_value_bytes_hashed_no_ttl(
695                            route.key_hash,
696                            key,
697                            &mut write,
698                        );
699                    }
700                    return shard.map.with_shared_value_bytes_hashed(
701                        route.key_hash,
702                        key,
703                        now_millis(),
704                        &mut write,
705                    );
706                }
707            }
708
709            if let Some(value) = self.get_value_bytes(key) {
710                write(&value);
711                true
712            } else {
713                false
714            }
715        }
716    }
717
718    /// 1-shard full-key FCNP GET helper for the hottest native path. This skips
719    /// route-mode, shard-count, and session-slot checks that the general routed
720    /// helper must keep.
721    ///
722    /// # Safety
723    ///
724    /// The caller must guarantee that this store has exactly one shard, uses
725    /// `EmbeddedRouteMode::FullKey`, and no other thread is concurrently
726    /// accessing the shard.
727    #[inline(always)]
728    pub unsafe fn with_shared_value_bytes_full_key_single_threaded<F>(
729        &self,
730        key_hash: u64,
731        key: &[u8],
732        write: F,
733    ) -> bool
734    where
735        F: FnMut(&bytes::Bytes),
736    {
737        #[cfg(not(feature = "unsafe"))]
738        {
739            self.with_shared_value_bytes_route_hashed(key_hash, key, write)
740        }
741        #[cfg(feature = "unsafe")]
742        {
743            let mut write = write;
744            debug_assert_eq!(self.shards.len(), 1);
745            debug_assert_eq!(self.route_mode, EmbeddedRouteMode::FullKey);
746
747            // SAFETY (per fn contract): no other thread is accessing this shard.
748            let shard = unsafe { &*self.shards[0].data_ptr() };
749            if shard.map.has_no_ttl_entries() {
750                return shard
751                    .map
752                    .with_shared_value_bytes_hashed_no_ttl(key_hash, key, &mut write);
753            }
754            shard
755                .map
756                .with_shared_value_bytes_hashed(key_hash, key, now_millis(), &mut write)
757        }
758    }
759
760    /// 1-shard full-key FCNP GET helper that returns the stored `Bytes`
761    /// reference directly. This keeps the hottest native GET path out of the
762    /// closure-based wrapper used by the general routed path.
763    ///
764    /// # Safety
765    ///
766    /// The caller must guarantee that this store has exactly one shard, uses
767    /// `EmbeddedRouteMode::FullKey`, and no other thread is concurrently
768    /// accessing the shard.
769    #[inline(always)]
770    pub unsafe fn get_shared_value_bytes_full_key_single_threaded(
771        &self,
772        key_hash: u64,
773        key: &[u8],
774    ) -> Option<&bytes::Bytes> {
775        #[cfg(not(feature = "unsafe"))]
776        {
777            let _ = (key_hash, key);
778            None
779        }
780        #[cfg(feature = "unsafe")]
781        {
782            debug_assert_eq!(self.shards.len(), 1);
783            debug_assert_eq!(self.route_mode, EmbeddedRouteMode::FullKey);
784
785            // SAFETY (per fn contract): no other thread is accessing this shard.
786            let shard = unsafe { &*self.shards[0].data_ptr() };
787            if shard.map.has_no_ttl_entries() {
788                return shard
789                    .map
790                    .get_shared_value_bytes_hashed_no_ttl(key_hash, key);
791            }
792            shard
793                .map
794                .get_shared_value_bytes_hashed(key_hash, key, now_millis())
795        }
796    }
797
798    /// 1-shard full-key FCNP GET helper for clients that supply both the key
799    /// hash and key tag. The native protocol treats that tuple plus key length
800    /// as the lookup identity, avoiding a key-byte compare in the hottest path.
801    ///
802    /// # Safety
803    ///
804    /// The caller must guarantee that this store has exactly one shard, uses
805    /// `EmbeddedRouteMode::FullKey`, and no other thread is concurrently
806    /// accessing the shard. `key_tag` must be the tag for the key bytes that
807    /// produced `key_hash`.
808    #[inline(always)]
809    pub unsafe fn get_shared_value_bytes_full_key_tagged_single_threaded(
810        &self,
811        key_hash: u64,
812        key_tag: u64,
813        key_len: usize,
814    ) -> Option<&bytes::Bytes> {
815        #[cfg(not(feature = "unsafe"))]
816        {
817            let _ = (key_hash, key_tag, key_len);
818            None
819        }
820        #[cfg(feature = "unsafe")]
821        {
822            debug_assert_eq!(self.shards.len(), 1);
823            debug_assert_eq!(self.route_mode, EmbeddedRouteMode::FullKey);
824
825            // SAFETY (per fn contract): no other thread is accessing this shard.
826            let shard = unsafe { &*self.shards[0].data_ptr() };
827            if shard.map.has_no_ttl_entries() {
828                return shard
829                    .map
830                    .get_shared_value_bytes_hashed_tagged_no_ttl(key_hash, key_tag, key_len);
831            }
832            None
833        }
834    }
835
836    /// Safe shard-owned full-key GET helper for direct-shard server workers.
837    ///
838    /// This keeps safe builds on the normal shard `RwLock`, but avoids the
839    /// generic route/session fallback after the FCNP frame has already proven
840    /// that the request belongs to `shard_id`.
841    #[inline(always)]
842    pub fn with_shared_value_bytes_full_key_owned_shard_no_ttl<F>(
843        &self,
844        shard_id: usize,
845        key_hash: u64,
846        key: &[u8],
847        write: F,
848    ) -> Option<bool>
849    where
850        F: FnMut(&bytes::Bytes),
851    {
852        if self.route_mode != EmbeddedRouteMode::FullKey
853            || shard_id >= self.shards.len()
854            || self.route_hash(key_hash) != shard_id
855        {
856            return None;
857        }
858
859        let shard = self.shards[shard_id].read();
860        if !shard.map.has_no_ttl_entries() {
861            return None;
862        }
863
864        let mut write = write;
865        Some(
866            shard
867                .map
868                .with_shared_value_bytes_hashed_no_ttl(key_hash, key, &mut write),
869        )
870    }
871
872    /// Shard-owned full-key FCNP GET helper for direct-shard server workers.
873    ///
874    /// This is the multi-shard counterpart to
875    /// `get_shared_value_bytes_full_key_tagged_single_threaded`: one server
876    /// worker owns `shard_id`, the client supplies the full-key hash and tag,
877    /// and the lookup can skip both the shard lock and the key-byte compare.
878    ///
879    /// Returns `None` when the hot path is not valid, such as TTL-bearing
880    /// shards, invalid shard ids, non-full-key routing, or a route mismatch.
881    /// Otherwise returns `Some(true)` for a hit and `Some(false)` for a miss.
882    ///
883    /// # Safety
884    ///
885    /// The caller must guarantee that no other thread is concurrently accessing
886    /// `shard_id`, that `key_hash` routes to `shard_id`, and that `key_tag`
887    /// matches the key bytes that produced `key_hash`.
888    #[inline(always)]
889    pub unsafe fn with_shared_value_bytes_full_key_tagged_owned_shard_no_ttl<F>(
890        &self,
891        shard_id: usize,
892        key_hash: u64,
893        key_tag: u64,
894        key_len: usize,
895        write: F,
896    ) -> Option<bool>
897    where
898        F: FnMut(&bytes::Bytes),
899    {
900        #[cfg(not(feature = "unsafe"))]
901        {
902            let _ = (shard_id, key_hash, key_tag, key_len, write);
903            None
904        }
905        #[cfg(feature = "unsafe")]
906        {
907            if self.route_mode != EmbeddedRouteMode::FullKey
908                || shard_id >= self.shards.len()
909                || self.route_hash(key_hash) != shard_id
910            {
911                return None;
912            }
913
914            let mut write = write;
915            // SAFETY (per fn contract): this direct worker owns `shard_id`.
916            let shard = unsafe { &*self.shards[shard_id].data_ptr() };
917            if !shard.map.has_no_ttl_entries() {
918                return None;
919            }
920            if let Some(value) = shard
921                .map
922                .get_shared_value_bytes_hashed_tagged_no_ttl(key_hash, key_tag, key_len)
923            {
924                write(value);
925                Some(true)
926            } else {
927                Some(false)
928            }
929        }
930    }
931
932    /// Shard-owned session-prefix GET helper for direct-shard server workers.
933    ///
934    /// Session slabs store borrowed value slices instead of `Bytes`, so this
935    /// helper writes through a slice callback and avoids the generic fallback's
936    /// `Bytes::copy_from_slice` materialization. The caller has already
937    /// validated that `session_prefix` routes to `shard_id`; this helper only
938    /// checks storage preconditions and performs the lookup.
939    ///
940    /// # Safety
941    ///
942    /// When built with the `unsafe` feature, the caller must guarantee no other
943    /// thread is concurrently accessing `shard_id`. Safe builds keep the shard
944    /// read lock and therefore only require the route metadata to be valid.
945    #[inline(always)]
946    pub unsafe fn with_session_value_slice_owned_shard_no_ttl_prevalidated<F>(
947        &self,
948        shard_id: usize,
949        session_hash: u64,
950        key_hash: u64,
951        session_prefix: &[u8],
952        key: &[u8],
953        write: F,
954    ) -> Option<bool>
955    where
956        F: FnMut(&[u8]),
957    {
958        if self.route_mode != EmbeddedRouteMode::SessionPrefix || shard_id >= self.shards.len() {
959            return None;
960        }
961        let mut write = write;
962
963        #[cfg(not(feature = "unsafe"))]
964        {
965            let shard = self.shards[shard_id].read();
966            match shard.get_session_ref_hashed_shared_no_ttl_prehashed(
967                session_hash,
968                session_prefix,
969                key_hash,
970                key,
971            ) {
972                Some(value) => {
973                    write(value);
974                    Some(true)
975                }
976                None => Some(false),
977            }
978        }
979        #[cfg(feature = "unsafe")]
980        {
981            // SAFETY (per fn contract): this direct worker owns `shard_id`.
982            let shard = unsafe { &*self.shards[shard_id].data_ptr() };
983            match shard.get_session_ref_hashed_shared_no_ttl_prehashed(
984                session_hash,
985                session_prefix,
986                key_hash,
987                key,
988            ) {
989                Some(value) => {
990                    write(value);
991                    Some(true)
992                }
993                None => Some(false),
994            }
995        }
996    }
997
998    /// Safe shard-owned no-TTL session SET helper for direct-shard workers.
999    ///
1000    /// The caller has already validated that `session_prefix` routes to
1001    /// `shard_id`; this helper only enforces storage preconditions and performs
1002    /// the session-slab write.
1003    #[inline(always)]
1004    pub fn set_session_slice_hashed_owned_shard_no_ttl_prevalidated(
1005        &self,
1006        shard_id: usize,
1007        session_hash: u64,
1008        key_hash: u64,
1009        session_prefix: &[u8],
1010        key: &[u8],
1011        value: &[u8],
1012    ) -> bool {
1013        if shard_id >= self.shards.len() || self.objects.has_objects() {
1014            return false;
1015        }
1016
1017        let mut shard = self.shards[shard_id].write();
1018        if shard.memory_limit_bytes.is_some() || shard.eviction_policy != EvictionPolicy::None {
1019            return false;
1020        }
1021        shard.set_session_slice_hashed_no_ttl_prehashed(
1022            session_hash,
1023            session_prefix,
1024            key_hash,
1025            key,
1026            value,
1027        );
1028        true
1029    }
1030
1031    /// Shard-owned no-TTL session SET helper for direct-shard workers.
1032    ///
1033    /// # Safety
1034    ///
1035    /// The caller must guarantee exclusive access to `shard_id` and that the
1036    /// session prefix routes to that shard.
1037    #[inline(always)]
1038    pub unsafe fn set_session_slice_hashed_owned_shard_no_ttl_hot_prevalidated(
1039        &self,
1040        shard_id: usize,
1041        session_hash: u64,
1042        key_hash: u64,
1043        session_prefix: &[u8],
1044        key: &[u8],
1045        value: &[u8],
1046    ) -> bool {
1047        #[cfg(not(feature = "unsafe"))]
1048        {
1049            let _ = (shard_id, session_hash, key_hash, session_prefix, key, value);
1050            false
1051        }
1052        #[cfg(feature = "unsafe")]
1053        {
1054            if shard_id >= self.shards.len() || self.objects.has_objects() {
1055                return false;
1056            }
1057
1058            // SAFETY (per fn contract): this direct worker owns `shard_id`.
1059            let shard = unsafe { &mut *self.shards[shard_id].data_ptr() };
1060            if shard.memory_limit_bytes.is_some() || shard.eviction_policy != EvictionPolicy::None {
1061                return false;
1062            }
1063            shard.set_session_slice_hashed_no_ttl_prehashed(
1064                session_hash,
1065                session_prefix,
1066                key_hash,
1067                key,
1068                value,
1069            );
1070            true
1071        }
1072    }
1073
1074    /// Safe shard-owned no-TTL SET helper for direct-shard server workers.
1075    ///
1076    /// The helper still takes the shard write lock. It only specializes the
1077    /// routing and no-TTL storage path when the worker-owned shard checks have
1078    /// already succeeded; unsupported cases return `false` so callers can
1079    /// preserve the generic fallback behavior.
1080    #[inline(always)]
1081    pub fn set_slice_hashed_tagged_owned_shard_no_ttl(
1082        &self,
1083        shard_id: usize,
1084        key_hash: u64,
1085        key_tag: u64,
1086        key: &[u8],
1087        value: &[u8],
1088    ) -> bool {
1089        if shard_id >= self.shards.len() || self.objects.has_objects() {
1090            return false;
1091        }
1092
1093        let mut shard = self.shards[shard_id].write();
1094        if shard.memory_limit_bytes.is_some() || shard.eviction_policy != EvictionPolicy::None {
1095            return false;
1096        }
1097        match self.route_mode {
1098            EmbeddedRouteMode::FullKey => {
1099                if self.route_hash(key_hash) != shard_id
1100                    || point_write_session_storage_prefix(key).is_some()
1101                {
1102                    return false;
1103                }
1104                shard
1105                    .map
1106                    .set_slice_hashed_tagged_no_ttl_local(key_hash, key_tag, key, value);
1107            }
1108            EmbeddedRouteMode::SessionPrefix => {
1109                let Some(session_prefix) = point_write_session_storage_prefix(key) else {
1110                    return false;
1111                };
1112                let route = compute_key_route(self.route_mode, self.shift, key);
1113                if route.shard_id != shard_id || route.key_hash != key_hash {
1114                    return false;
1115                }
1116                shard.set_session_slice_hashed_no_ttl(&session_prefix, key_hash, key, value);
1117            }
1118        }
1119        true
1120    }
1121
1122    /// Shard-owned full-key FCNP SET helper for direct-shard server workers.
1123    ///
1124    /// # Safety
1125    ///
1126    /// The caller must guarantee exclusive access to `shard_id`, that
1127    /// `key_hash` routes to `shard_id`, and that `key_tag` matches `key`.
1128    #[inline(always)]
1129    pub unsafe fn set_slice_hashed_tagged_owned_shard_no_ttl_hot(
1130        &self,
1131        shard_id: usize,
1132        key_hash: u64,
1133        key_tag: u64,
1134        key: &[u8],
1135        value: &[u8],
1136    ) -> bool {
1137        #[cfg(not(feature = "unsafe"))]
1138        {
1139            let _ = (shard_id, key_hash, key_tag, key, value);
1140            false
1141        }
1142        #[cfg(feature = "unsafe")]
1143        {
1144            if shard_id >= self.shards.len() {
1145                return false;
1146            }
1147
1148            // SAFETY (per fn contract): this direct worker owns `shard_id`.
1149            let shard = unsafe { &mut *self.shards[shard_id].data_ptr() };
1150            if shard.memory_limit_bytes.is_some() || shard.eviction_policy != EvictionPolicy::None {
1151                return false;
1152            }
1153            match self.route_mode {
1154                EmbeddedRouteMode::FullKey => {
1155                    if self.route_hash(key_hash) != shard_id
1156                        || point_write_session_storage_prefix(key).is_some()
1157                    {
1158                        return false;
1159                    }
1160                    // SAFETY: forwarded from this function's worker-local ownership contract,
1161                    // with a caller-provided key tag matching the key bytes.
1162                    unsafe {
1163                        shard
1164                            .map
1165                            .set_slice_hashed_tagged_no_ttl_hot(key_hash, key_tag, key, value)
1166                    };
1167                }
1168                EmbeddedRouteMode::SessionPrefix => {
1169                    let Some(session_prefix) = point_write_session_storage_prefix(key) else {
1170                        return false;
1171                    };
1172                    let route = compute_key_route(self.route_mode, self.shift, key);
1173                    if route.shard_id != shard_id || route.key_hash != key_hash {
1174                        return false;
1175                    }
1176                    shard.set_session_slice_hashed_no_ttl(&session_prefix, key_hash, key, value);
1177                }
1178            }
1179            true
1180        }
1181    }
1182
1183    /// Single-threaded native GETEX path.
1184    ///
1185    /// # Safety
1186    ///
1187    /// The caller must guarantee that no other thread is concurrently accessing
1188    /// any shard of this `EmbeddedStore`.
1189    pub unsafe fn getex_value_bytes_route_hashed_single_threaded(
1190        &self,
1191        route_hash: Option<u64>,
1192        key: &[u8],
1193        ttl_ms: u64,
1194    ) -> Option<bytes::Bytes> {
1195        #[cfg(not(feature = "unsafe"))]
1196        {
1197            self.getex_value_bytes_route_hashed(route_hash, key, ttl_ms)
1198        }
1199        #[cfg(feature = "unsafe")]
1200        {
1201            let now_ms = now_millis();
1202            let expire_at_ms = now_ms.saturating_add(ttl_ms);
1203            if let Some(key_hash) = route_hash
1204                && can_use_route_hash_as_key_hash(self.route_mode, key)
1205            {
1206                let shard_id = self.route_hash(key_hash);
1207                // SAFETY (per fn contract): no other thread is accessing this shard.
1208                let shard = unsafe { &mut *self.shards[shard_id].data_ptr() };
1209                if uses_flat_key_storage(self.route_mode, key) {
1210                    return shard.map.get_value_bytes_hashed_and_expire(
1211                        key_hash,
1212                        key,
1213                        expire_at_ms,
1214                        now_ms,
1215                    );
1216                }
1217            }
1218
1219            self.getex_value_bytes_route_hashed(route_hash, key, ttl_ms)
1220        }
1221    }
1222
1223    /// Single-threaded borrowed native GETEX path.
1224    ///
1225    /// # Safety
1226    ///
1227    /// The caller must guarantee that no other thread is concurrently accessing
1228    /// any shard of this `EmbeddedStore`.
1229    pub unsafe fn with_getex_value_bytes_route_hashed_single_threaded<F>(
1230        &self,
1231        route_hash: Option<u64>,
1232        key: &[u8],
1233        ttl_ms: u64,
1234        write: F,
1235    ) -> bool
1236    where
1237        F: FnMut(&[u8]),
1238    {
1239        #[cfg(not(feature = "unsafe"))]
1240        {
1241            self.with_getex_value_bytes_route_hashed(route_hash, key, ttl_ms, write)
1242        }
1243        #[cfg(feature = "unsafe")]
1244        {
1245            let mut write = write;
1246            let now_ms = now_millis();
1247            let expire_at_ms = now_ms.saturating_add(ttl_ms);
1248            if let Some(key_hash) = route_hash
1249                && can_use_route_hash_as_key_hash(self.route_mode, key)
1250            {
1251                let shard_id = self.route_hash(key_hash);
1252                // SAFETY (per fn contract): no other thread is accessing this shard.
1253                let shard = unsafe { &mut *self.shards[shard_id].data_ptr() };
1254                if uses_flat_key_storage(self.route_mode, key) {
1255                    return shard.map.with_value_bytes_hashed_and_expire(
1256                        key_hash,
1257                        key,
1258                        expire_at_ms,
1259                        now_ms,
1260                        &mut write,
1261                    );
1262                }
1263            }
1264
1265            self.with_getex_value_bytes_route_hashed(route_hash, key, ttl_ms, write)
1266        }
1267    }
1268
1269    fn get_value_bytes_routed(
1270        &self,
1271        route: EmbeddedKeyRoute,
1272        key: &[u8],
1273        now_ms: u64,
1274    ) -> Option<bytes::Bytes> {
1275        if uses_flat_key_storage(self.route_mode, key) {
1276            let shard = self.shards[route.shard_id].read();
1277            return shard
1278                .map
1279                .get_value_bytes_hashed(route.key_hash, key, now_ms);
1280        }
1281        // Session-prefixed keys still go through the legacy path (write lock,
1282        // value copied). Bench workload doesn't use session keys.
1283        let mut shard = self.shards[route.shard_id].write();
1284        if let Some(session_prefix) = derived_session_storage_prefix(key)
1285            && let Some(value) =
1286                shard
1287                    .session_slots
1288                    .get_ref_hashed(&session_prefix, route.key_hash, key)
1289        {
1290            return Some(bytes::Bytes::copy_from_slice(value));
1291        }
1292        shard
1293            .map
1294            .get_value_bytes_hashed(route.key_hash, key, now_ms)
1295    }
1296
1297    fn with_value_bytes_routed<F>(&self, route: EmbeddedKeyRoute, key: &[u8], write: &mut F) -> bool
1298    where
1299        F: FnMut(&[u8]),
1300    {
1301        if uses_flat_key_storage(self.route_mode, key) {
1302            let shard = self.shards[route.shard_id].read();
1303            let value = if shard.map.has_no_ttl_entries() {
1304                shard.map.get_ref_hashed_shared_no_ttl(route.key_hash, key)
1305            } else {
1306                shard
1307                    .map
1308                    .get_ref_hashed_shared(route.key_hash, key, now_millis())
1309            };
1310            if let Some(value) = value {
1311                write(value);
1312                return true;
1313            }
1314            return false;
1315        }
1316
1317        if let Some(value) = self.get_value_bytes_routed(route, key, now_millis()) {
1318            write(value.as_ref());
1319            true
1320        } else {
1321            false
1322        }
1323    }
1324
1325    pub(super) fn with_shared_value_bytes_routed<F>(
1326        &self,
1327        route: EmbeddedKeyRoute,
1328        key: &[u8],
1329        write: &mut F,
1330    ) -> bool
1331    where
1332        F: FnMut(&bytes::Bytes),
1333    {
1334        if uses_flat_key_storage(self.route_mode, key) {
1335            let shard = self.shards[route.shard_id].read();
1336            if shard.map.has_no_ttl_entries() {
1337                return shard
1338                    .map
1339                    .with_shared_value_bytes_hashed_no_ttl(route.key_hash, key, write);
1340            }
1341            return shard.map.with_shared_value_bytes_hashed(
1342                route.key_hash,
1343                key,
1344                now_millis(),
1345                write,
1346            );
1347        }
1348
1349        if let Some(value) = self.get_value_bytes_routed(route, key, now_millis()) {
1350            write(&value);
1351            true
1352        } else {
1353            false
1354        }
1355    }
1356
1357    fn getex_value_bytes_routed(
1358        &self,
1359        route: EmbeddedKeyRoute,
1360        key: &[u8],
1361        ttl_ms: u64,
1362    ) -> Option<bytes::Bytes> {
1363        let now_ms = now_millis();
1364        let expire_at_ms = now_ms.saturating_add(ttl_ms);
1365        if uses_flat_key_storage(self.route_mode, key) {
1366            let mut shard = self.shards[route.shard_id].write();
1367            return shard.map.get_value_bytes_hashed_and_expire(
1368                route.key_hash,
1369                key,
1370                expire_at_ms,
1371                now_ms,
1372            );
1373        }
1374
1375        let value = self.get_value_bytes(key);
1376        if value.is_some() {
1377            self.expire(key, expire_at_ms);
1378        }
1379        value
1380    }
1381
1382    fn with_getex_value_bytes_routed<F>(
1383        &self,
1384        route: EmbeddedKeyRoute,
1385        key: &[u8],
1386        ttl_ms: u64,
1387        write: &mut F,
1388    ) -> bool
1389    where
1390        F: FnMut(&[u8]),
1391    {
1392        let now_ms = now_millis();
1393        let expire_at_ms = now_ms.saturating_add(ttl_ms);
1394        if uses_flat_key_storage(self.route_mode, key) {
1395            let mut shard = self.shards[route.shard_id].write();
1396            return shard.map.with_value_bytes_hashed_and_expire(
1397                route.key_hash,
1398                key,
1399                expire_at_ms,
1400                now_ms,
1401                write,
1402            );
1403        }
1404
1405        if let Some(value) = self.get_value_bytes(key) {
1406            write(value.as_ref());
1407            self.expire(key, expire_at_ms);
1408            true
1409        } else {
1410            false
1411        }
1412    }
1413
1414    fn get_with_route(&self, route: EmbeddedKeyRoute, key: &[u8], now_ms: u64) -> Option<Bytes> {
1415        // Fast path: non-session keys can answer from the read lock using the
1416        // `&self` shared accessor on `FlatMap`. This is the dominant case for
1417        // workloads that don't use derived session prefixes.
1418        if uses_flat_key_storage(self.route_mode, key) {
1419            let shard = self.shards[route.shard_id].read();
1420            return shard
1421                .map
1422                .get_ref_hashed_shared(route.key_hash, key, now_ms)
1423                .map(<[u8]>::to_vec);
1424        }
1425
1426        // Session-prefixed keys still need the write lock because
1427        // `SessionSlotMap::get_ref_hashed` requires `&mut self`.
1428        let mut shard = self.shards[route.shard_id].write();
1429        if let Some(session_prefix) = derived_session_storage_prefix(key)
1430            && let Some(value) =
1431                shard
1432                    .session_slots
1433                    .get_ref_hashed(&session_prefix, route.key_hash, key)
1434        {
1435            return Some(value.to_vec());
1436        }
1437        shard
1438            .map
1439            .get_ref_hashed(route.key_hash, key, now_ms)
1440            .map(<[u8]>::to_vec)
1441    }
1442
1443    /// Returns an owned read view for one key.
1444    ///
1445    /// The returned view owns the bytes it exposes, so callers can safely pass
1446    /// the metadata through FFI or buffer-style APIs without tying it to the
1447    /// store's lifetime.
1448    pub fn get_view(&self, key: &[u8]) -> EmbeddedReadView {
1449        let route = self.route_key(key);
1450        self.get_view_routed(route, key, now_millis())
1451    }
1452
1453    #[inline(always)]
1454    pub fn get_prepared_view_no_ttl(&self, prepared: &PreparedPointKey) -> EmbeddedReadView {
1455        let mut shard = self.shards[prepared.route().shard_id].write();
1456        let item = shard
1457            .map
1458            .get_ref_hashed_prepared_no_ttl(
1459                prepared.route().key_hash,
1460                prepared.key(),
1461                prepared.key_tag(),
1462            )
1463            .map(EmbeddedReadSlice::from_slice);
1464        drop(shard);
1465        EmbeddedReadView { item }
1466    }
1467
1468    pub fn get_view_routed_no_ttl(&self, route: EmbeddedKeyRoute, key: &[u8]) -> EmbeddedReadView {
1469        let mut shard = self.shards[route.shard_id].write();
1470        let item = if let Some(session_prefix) = derived_session_storage_prefix(key) {
1471            if shard.session_slots.has_session(&session_prefix) {
1472                shard
1473                    .session_slots
1474                    .get_ref_hashed(&session_prefix, route.key_hash, key)
1475                    .map(EmbeddedReadSlice::from_slice)
1476            } else {
1477                shard
1478                    .map
1479                    .get_ref_hashed_no_ttl(route.key_hash, key)
1480                    .map(EmbeddedReadSlice::from_slice)
1481            }
1482        } else {
1483            shard
1484                .map
1485                .get_ref_hashed_no_ttl(route.key_hash, key)
1486                .map(EmbeddedReadSlice::from_slice)
1487        };
1488        drop(shard);
1489        EmbeddedReadView { item }
1490    }
1491
1492    pub fn get_view_routed(
1493        &self,
1494        route: EmbeddedKeyRoute,
1495        key: &[u8],
1496        now_ms: u64,
1497    ) -> EmbeddedReadView {
1498        let mut shard = self.shards[route.shard_id].write();
1499        let item = if let Some(session_prefix) = derived_session_storage_prefix(key) {
1500            if shard.session_slots.has_session(&session_prefix) {
1501                shard
1502                    .session_slots
1503                    .get_ref_hashed(&session_prefix, route.key_hash, key)
1504                    .map(EmbeddedReadSlice::from_slice)
1505            } else {
1506                shard
1507                    .map
1508                    .get_ref_hashed(route.key_hash, key, now_ms)
1509                    .map(EmbeddedReadSlice::from_slice)
1510            }
1511        } else {
1512            shard
1513                .map
1514                .get_ref_hashed(route.key_hash, key, now_ms)
1515                .map(EmbeddedReadSlice::from_slice)
1516        };
1517        drop(shard);
1518        EmbeddedReadView { item }
1519    }
1520}