1use super::*;
2
3impl EmbeddedStore {
4 pub fn get(&self, key: &[u8]) -> Option<Bytes> {
6 let now_ms = now_millis();
7 let route = self.route_key(key);
8 self.get_with_route(route, key, now_ms)
9 }
10
11 pub unsafe fn get_blob_string_into_single_threaded(
22 &self,
23 key: &[u8],
24 out: &mut bytes::BytesMut,
25 ) -> bool {
26 #[cfg(not(feature = "unsafe"))]
27 {
28 self.get_blob_string_into(key, out)
29 }
30 #[cfg(feature = "unsafe")]
31 {
32 let key_hash = hash_key(key);
33 unsafe { self.get_blob_string_hashed_into_single_threaded(key_hash, key, out) }
35 }
36 }
37
38 pub unsafe fn get_blob_string_hashed_into_single_threaded(
45 &self,
46 key_hash: u64,
47 key: &[u8],
48 out: &mut bytes::BytesMut,
49 ) -> bool {
50 #[cfg(not(feature = "unsafe"))]
51 {
52 self.get_blob_string_hashed_into(key_hash, key, out)
53 }
54 #[cfg(feature = "unsafe")]
55 {
56 if self.shards.len() == 1 {
57 let shard = unsafe { &*self.shards[0].data_ptr() };
59 if can_skip_session_lookup(key, &shard.session_slots) {
60 let value = if shard.map.has_no_ttl_entries() {
61 shard.map.get_ref_hashed_shared_no_ttl(key_hash, key)
62 } else {
63 shard.map.get_ref_hashed_shared(key_hash, key, now_millis())
64 };
65 if let Some(value) = value {
66 write_resp_blob_string_into(out, value);
67 return true;
68 }
69 return false;
70 }
71 }
72
73 let route = self.route_key(key);
74 let shard = unsafe { &*self.shards[route.shard_id].data_ptr() };
76 let now_ms = if shard.map.has_no_ttl_entries() {
79 0
80 } else {
81 now_millis()
82 };
83 if uses_flat_key_storage(self.route_mode, key) {
84 if let Some(value) = shard.map.get_ref_hashed_shared(route.key_hash, key, now_ms) {
85 write_resp_blob_string_into(out, value);
86 return true;
87 }
88 return false;
89 }
90 self.get_blob_string_into(key, out)
93 }
94 }
95
96 pub unsafe fn set_single_threaded(&self, key: &[u8], value: &[u8], ttl_ms: Option<u64>) {
102 #[cfg(not(feature = "unsafe"))]
103 {
104 self.set(key.to_vec(), value.to_vec(), ttl_ms);
105 }
106 #[cfg(feature = "unsafe")]
107 {
108 let key_hash = hash_key(key);
109 unsafe { self.set_single_threaded_hashed(key_hash, key, value, ttl_ms) };
111 }
112 }
113
114 pub unsafe fn set_single_threaded_hashed(
120 &self,
121 key_hash: u64,
122 key: &[u8],
123 value: &[u8],
124 ttl_ms: Option<u64>,
125 ) {
126 #[cfg(not(feature = "unsafe"))]
127 {
128 self.set_slice_prehashed(key_hash, key, value, ttl_ms);
129 }
130 #[cfg(feature = "unsafe")]
131 {
132 if self.shards.len() == 1 {
133 let shard = unsafe { &mut *self.shards[0].data_ptr() };
135 if ttl_ms.is_none()
136 && self.route_mode == EmbeddedRouteMode::FullKey
137 && shard.memory_limit_bytes.is_none()
138 && shard.eviction_policy == EvictionPolicy::None
139 {
140 unsafe { shard.map.set_slice_hashed_no_ttl_hot(key_hash, key, value) };
145 return;
146 }
147 let now_ms = write_now_ms(ttl_ms, shard.memory_limit_bytes);
148 let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
149 if let Some(session_prefix) = point_write_session_storage_prefix(key) {
150 shard
151 .session_slots
152 .delete_hashed(&session_prefix, key_hash, key);
153 }
154 shard
155 .map
156 .set_slice_hashed(key_hash, key, value, expire_at_ms, now_ms);
157 return;
158 }
159
160 let route = self.route_key(key);
161 let shard = unsafe { &mut *self.shards[route.shard_id].data_ptr() };
163 let now_ms = write_now_ms(ttl_ms, shard.memory_limit_bytes);
164 let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
165 if let Some(session_prefix) = point_write_session_storage_prefix(key) {
166 shard
167 .session_slots
168 .delete_hashed(&session_prefix, route.key_hash, key);
169 }
170 shard
171 .map
172 .set_slice_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
173 }
174 }
175
    /// Stores `value` under `key` with no TTL, using a caller-supplied
    /// `key_hash` and `key_tag`.
    ///
    /// Without the `unsafe` feature the tag is ignored and the write goes
    /// through `set_slice_prehashed`. With the feature, the write goes straight
    /// to shard 0's `set_slice_hashed_tagged_no_ttl_hot`; the preconditions
    /// (single shard, `FullKey` routing, no memory limit, no eviction policy)
    /// are only checked by `debug_assert_eq!`, i.e. not in release builds.
    ///
    /// # Safety
    ///
    /// The `unsafe` build creates a `&mut` to shard 0 from `data_ptr()` while
    /// holding no lock and calls an `unsafe` map method. NOTE(review):
    /// presumably the caller must guarantee exclusive access to the store and
    /// that the debug-asserted preconditions hold — confirm against the map
    /// method's own contract.
    #[inline(always)]
    pub unsafe fn set_single_threaded_hashed_tagged_no_ttl_hot(
        &self,
        key_hash: u64,
        key_tag: u64,
        key: &[u8],
        value: &[u8],
    ) {
        #[cfg(not(feature = "unsafe"))]
        {
            // The tag is only consumed by the lock-free path.
            let _ = key_tag;
            self.set_slice_prehashed(key_hash, key, value, None);
        }
        #[cfg(feature = "unsafe")]
        {
            debug_assert_eq!(self.shards.len(), 1);
            debug_assert_eq!(self.route_mode, EmbeddedRouteMode::FullKey);
            // SAFETY: the caller upholds this function's safety contract.
            let shard = unsafe { &mut *self.shards[0].data_ptr() };
            debug_assert_eq!(shard.memory_limit_bytes, None);
            debug_assert_eq!(shard.eviction_policy, EvictionPolicy::None);
            // SAFETY: the caller upholds this function's safety contract.
            unsafe {
                shard
                    .map
                    .set_slice_hashed_tagged_no_ttl_hot(key_hash, key_tag, key, value)
            };
        }
    }
212
213 pub fn get_blob_string_into(&self, key: &[u8], out: &mut bytes::BytesMut) -> bool {
218 let route = self.route_key(key);
219 self.get_blob_string_routed_into(route, key, out)
220 }
221
222 pub fn get_blob_string_hashed_into(
225 &self,
226 key_hash: u64,
227 key: &[u8],
228 out: &mut bytes::BytesMut,
229 ) -> bool {
230 if can_route_with_key_hash(self.route_mode, self.shards.len(), key) {
231 let route = EmbeddedKeyRoute {
232 shard_id: self.route_hash(key_hash),
233 key_hash,
234 };
235 return self.get_blob_string_routed_into(route, key, out);
236 }
237
238 self.get_blob_string_into(key, out)
239 }
240
241 fn get_blob_string_routed_into(
242 &self,
243 route: EmbeddedKeyRoute,
244 key: &[u8],
245 out: &mut bytes::BytesMut,
246 ) -> bool {
247 use bytes::BufMut;
248 if uses_flat_key_storage(self.route_mode, key) {
250 let shard = self.shards[route.shard_id].read();
251 let now_ms = if shard.map.has_no_ttl_entries() {
253 0
254 } else {
255 now_millis()
256 };
257 if let Some(value) = shard.map.get_ref_hashed_shared(route.key_hash, key, now_ms) {
258 write_resp_blob_string_into(out, value);
260 return true;
261 }
262 return false;
263 }
264 let mut shard = self.shards[route.shard_id].write();
266 if let Some(session_prefix) = derived_session_storage_prefix(key)
267 && let Some(value) =
268 shard
269 .session_slots
270 .get_ref_hashed(&session_prefix, route.key_hash, key)
271 {
272 out.put_u8(b'$');
273 let mut buf = itoa::Buffer::new();
274 out.extend_from_slice(buf.format(value.len()).as_bytes());
275 out.extend_from_slice(b"\r\n");
276 out.extend_from_slice(value);
277 out.extend_from_slice(b"\r\n");
278 return true;
279 }
280 let now_ms_session = now_millis();
281 if let Some(value) = shard
282 .map
283 .get_ref_hashed(route.key_hash, key, now_ms_session)
284 {
285 out.put_u8(b'$');
286 let mut buf = itoa::Buffer::new();
287 out.extend_from_slice(buf.format(value.len()).as_bytes());
288 out.extend_from_slice(b"\r\n");
289 out.extend_from_slice(value);
290 out.extend_from_slice(b"\r\n");
291 return true;
292 }
293 false
294 }
295
296 pub fn set_slice_prehashed(
299 &self,
300 key_hash: u64,
301 key: &[u8],
302 value: &[u8],
303 ttl_ms: Option<u64>,
304 ) {
305 if !can_route_with_key_hash(self.route_mode, self.shards.len(), key) {
306 self.set(key.to_vec(), value.to_vec(), ttl_ms);
307 return;
308 }
309
310 let route = EmbeddedKeyRoute {
311 shard_id: self.route_hash(key_hash),
312 key_hash,
313 };
314 if self.objects.has_objects() {
315 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
316 let mut shard = self.shards[route.shard_id].write();
317 if bucket.delete_any(key) {
318 self.objects.note_deleted(route.shard_id);
319 }
320 let now_ms = write_now_ms(ttl_ms, shard.memory_limit_bytes);
321 let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
322 if let Some(session_prefix) = point_write_session_storage_prefix(key) {
323 shard
324 .session_slots
325 .delete_hashed(&session_prefix, route.key_hash, key);
326 }
327 shard
328 .map
329 .set_slice_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
330 shard.enforce_memory_limit(now_ms);
331 return;
332 }
333 let mut shard = self.shards[route.shard_id].write();
334 let now_ms = write_now_ms(ttl_ms, shard.memory_limit_bytes);
335 let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
336 if let Some(session_prefix) = point_write_session_storage_prefix(key) {
337 shard
338 .session_slots
339 .delete_hashed(&session_prefix, route.key_hash, key);
340 }
341 shard
342 .map
343 .set_slice_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
344 shard.enforce_memory_limit(now_ms);
345 }
346
347 pub fn get_value_bytes(&self, key: &[u8]) -> Option<bytes::Bytes> {
352 let now_ms = now_millis();
353 let route = self.route_key(key);
354 self.get_value_bytes_routed(route, key, now_ms)
355 }
356
357 pub fn get_value_bytes_route_hashed(
362 &self,
363 route_hash: u64,
364 key: &[u8],
365 ) -> Option<bytes::Bytes> {
366 if can_use_route_hash_as_key_hash(self.route_mode, key) {
367 let route = EmbeddedKeyRoute {
368 shard_id: self.route_hash(route_hash),
369 key_hash: route_hash,
370 };
371 return self.get_value_bytes_routed(route, key, now_millis());
372 }
373
374 self.get_value_bytes(key)
375 }
376
377 pub fn with_value_bytes_route_hashed<F>(
381 &self,
382 route_hash: u64,
383 key: &[u8],
384 mut write: F,
385 ) -> bool
386 where
387 F: FnMut(&[u8]),
388 {
389 if can_use_route_hash_as_key_hash(self.route_mode, key) {
390 let route = EmbeddedKeyRoute {
391 shard_id: self.route_hash(route_hash),
392 key_hash: route_hash,
393 };
394 return self.with_value_bytes_routed(route, key, &mut write);
395 }
396
397 if let Some(value) = self.get_value_bytes(key) {
398 write(value.as_ref());
399 true
400 } else {
401 false
402 }
403 }
404
405 pub fn with_shared_value_bytes_route_hashed<F>(
410 &self,
411 route_hash: u64,
412 key: &[u8],
413 mut write: F,
414 ) -> bool
415 where
416 F: FnMut(&bytes::Bytes),
417 {
418 if can_use_route_hash_as_key_hash(self.route_mode, key) {
419 let route = EmbeddedKeyRoute {
420 shard_id: self.route_hash(route_hash),
421 key_hash: route_hash,
422 };
423 return self.with_shared_value_bytes_routed(route, key, &mut write);
424 }
425
426 if let Some(value) = self.get_value_bytes(key) {
427 write(&value);
428 true
429 } else {
430 false
431 }
432 }
433
434 pub fn with_shared_value_bytes_full_key_tagged_no_ttl<F>(
439 &self,
440 route_hash: u64,
441 key_tag: u64,
442 key_len: usize,
443 mut write: F,
444 ) -> bool
445 where
446 F: FnMut(&bytes::Bytes),
447 {
448 if self.route_mode != EmbeddedRouteMode::FullKey {
449 return false;
450 }
451 let route = EmbeddedKeyRoute {
452 shard_id: self.route_hash(route_hash),
453 key_hash: route_hash,
454 };
455 let shard = self.shards[route.shard_id].read();
456 if !shard.map.has_no_ttl_entries() {
457 return false;
458 }
459 if let Some(value) =
460 shard
461 .map
462 .get_shared_value_bytes_hashed_tagged_no_ttl(route.key_hash, key_tag, key_len)
463 {
464 write(value);
465 true
466 } else {
467 false
468 }
469 }
470
471 pub fn getex_value_bytes_route_hashed(
474 &self,
475 route_hash: Option<u64>,
476 key: &[u8],
477 ttl_ms: u64,
478 ) -> Option<bytes::Bytes> {
479 if let Some(key_hash) = route_hash
480 && can_use_route_hash_as_key_hash(self.route_mode, key)
481 {
482 let route = EmbeddedKeyRoute {
483 shard_id: self.route_hash(key_hash),
484 key_hash,
485 };
486 return self.getex_value_bytes_routed(route, key, ttl_ms);
487 }
488
489 let route = self.route_key(key);
490 self.getex_value_bytes_routed(route, key, ttl_ms)
491 }
492
493 pub fn with_getex_value_bytes_route_hashed<F>(
497 &self,
498 route_hash: Option<u64>,
499 key: &[u8],
500 ttl_ms: u64,
501 mut write: F,
502 ) -> bool
503 where
504 F: FnMut(&[u8]),
505 {
506 if let Some(key_hash) = route_hash
507 && can_use_route_hash_as_key_hash(self.route_mode, key)
508 {
509 let route = EmbeddedKeyRoute {
510 shard_id: self.route_hash(key_hash),
511 key_hash,
512 };
513 return self.with_getex_value_bytes_routed(route, key, ttl_ms, &mut write);
514 }
515
516 let route = self.route_key(key);
517 self.with_getex_value_bytes_routed(route, key, ttl_ms, &mut write)
518 }
519
    /// Looks up `key` and returns its value as `bytes::Bytes`, reusing a
    /// caller-supplied `route_hash`.
    ///
    /// Without the `unsafe` feature this is `get_value_bytes_route_hashed`.
    /// With it, and when `can_use_route_hash_as_key_hash` accepts the key, the
    /// shard is read through `data_ptr()` without taking the lock. Keys that do
    /// not qualify for either lock-free path fall back to `get_value_bytes`.
    ///
    /// # Safety
    ///
    /// The `unsafe` build dereferences shard `data_ptr()`s while holding no
    /// lock. NOTE(review): presumably the caller must guarantee that no other
    /// thread mutates the store during the call — confirm against the crate's
    /// threading contract.
    pub unsafe fn get_value_bytes_route_hashed_single_threaded(
        &self,
        route_hash: u64,
        key: &[u8],
    ) -> Option<bytes::Bytes> {
        #[cfg(not(feature = "unsafe"))]
        {
            self.get_value_bytes_route_hashed(route_hash, key)
        }
        #[cfg(feature = "unsafe")]
        {
            if can_use_route_hash_as_key_hash(self.route_mode, key) {
                // Single-shard shortcut: no routing needed, shard 0 is the only candidate.
                if self.shards.len() == 1 {
                    // SAFETY: the caller upholds this function's safety contract.
                    let shard = unsafe { &*self.shards[0].data_ptr() };
                    if can_skip_session_lookup(key, &shard.session_slots) {
                        // The clock is only read when the map reports TTL entries.
                        let now_ms = if shard.map.has_no_ttl_entries() {
                            0
                        } else {
                            now_millis()
                        };
                        return shard.map.get_value_bytes_hashed(route_hash, key, now_ms);
                    }
                }

                let route = EmbeddedKeyRoute {
                    shard_id: self.route_hash(route_hash),
                    key_hash: route_hash,
                };
                // SAFETY: the caller upholds this function's safety contract.
                let shard = unsafe { &*self.shards[route.shard_id].data_ptr() };
                if uses_flat_key_storage(self.route_mode, key) {
                    let now_ms = if shard.map.has_no_ttl_entries() {
                        0
                    } else {
                        now_millis()
                    };
                    return shard
                        .map
                        .get_value_bytes_hashed(route.key_hash, key, now_ms);
                }
            }

            // Everything else goes through the locking lookup.
            self.get_value_bytes(key)
        }
    }
573
    /// Looks up `key` and, if a value exists, passes it to `write` as a byte
    /// slice, reusing a caller-supplied `route_hash`. Returns `true` when
    /// `write` was invoked.
    ///
    /// Without the `unsafe` feature this is `with_value_bytes_route_hashed`.
    /// With it, and when `can_use_route_hash_as_key_hash` accepts the key, the
    /// shard is read through `data_ptr()` without taking the lock. Keys that do
    /// not qualify for either lock-free path fall back to
    /// `with_value_bytes_route_hashed`.
    ///
    /// # Safety
    ///
    /// The `unsafe` build dereferences shard `data_ptr()`s while holding no
    /// lock, and `write` runs while that unguarded reference is live.
    /// NOTE(review): presumably the caller must guarantee that no other thread
    /// mutates the store during the call — confirm against the crate's
    /// threading contract.
    pub unsafe fn with_value_bytes_route_hashed_single_threaded<F>(
        &self,
        route_hash: u64,
        key: &[u8],
        write: F,
    ) -> bool
    where
        F: FnMut(&[u8]),
    {
        #[cfg(not(feature = "unsafe"))]
        {
            self.with_value_bytes_route_hashed(route_hash, key, write)
        }
        #[cfg(feature = "unsafe")]
        {
            // Rebound as mutable here so the safe build does not see an unused `mut`.
            let mut write = write;
            if can_use_route_hash_as_key_hash(self.route_mode, key) {
                // Single-shard shortcut: shard 0 is the only candidate.
                if self.shards.len() == 1 {
                    // SAFETY: the caller upholds this function's safety contract.
                    let shard = unsafe { &*self.shards[0].data_ptr() };
                    if can_skip_session_lookup(key, &shard.session_slots) {
                        let value = if shard.map.has_no_ttl_entries() {
                            shard.map.get_ref_hashed_shared_no_ttl(route_hash, key)
                        } else {
                            shard
                                .map
                                .get_ref_hashed_shared(route_hash, key, now_millis())
                        };
                        if let Some(value) = value {
                            write(value);
                            return true;
                        }
                        return false;
                    }
                }

                let route = EmbeddedKeyRoute {
                    shard_id: self.route_hash(route_hash),
                    key_hash: route_hash,
                };
                // SAFETY: the caller upholds this function's safety contract.
                let shard = unsafe { &*self.shards[route.shard_id].data_ptr() };
                if uses_flat_key_storage(self.route_mode, key) {
                    let value = if shard.map.has_no_ttl_entries() {
                        shard.map.get_ref_hashed_shared_no_ttl(route.key_hash, key)
                    } else {
                        shard
                            .map
                            .get_ref_hashed_shared(route.key_hash, key, now_millis())
                    };
                    if let Some(value) = value {
                        write(value);
                        return true;
                    }
                    return false;
                }
            }

            // Everything else goes through the locking lookup.
            self.with_value_bytes_route_hashed(route_hash, key, write)
        }
    }
641
    /// Looks up `key` and, if a value exists, passes it to `write` as a
    /// `&bytes::Bytes`, reusing a caller-supplied `route_hash`. Returns `true`
    /// when `write` was invoked.
    ///
    /// Without the `unsafe` feature this is
    /// `with_shared_value_bytes_route_hashed`. With it, and when
    /// `can_use_route_hash_as_key_hash` accepts the key, the shard is read
    /// through `data_ptr()` without taking the lock. Keys that do not qualify
    /// for either lock-free path are fetched through `get_value_bytes`.
    ///
    /// # Safety
    ///
    /// The `unsafe` build dereferences shard `data_ptr()`s while holding no
    /// lock, and `write` runs while that unguarded reference is live.
    /// NOTE(review): presumably the caller must guarantee that no other thread
    /// mutates the store during the call — confirm against the crate's
    /// threading contract.
    pub unsafe fn with_shared_value_bytes_route_hashed_single_threaded<F>(
        &self,
        route_hash: u64,
        key: &[u8],
        write: F,
    ) -> bool
    where
        F: FnMut(&bytes::Bytes),
    {
        #[cfg(not(feature = "unsafe"))]
        {
            self.with_shared_value_bytes_route_hashed(route_hash, key, write)
        }
        #[cfg(feature = "unsafe")]
        {
            // Rebound as mutable here so the safe build does not see an unused `mut`.
            let mut write = write;
            if can_use_route_hash_as_key_hash(self.route_mode, key) {
                // Single-shard shortcut: shard 0 is the only candidate.
                if self.shards.len() == 1 {
                    // SAFETY: the caller upholds this function's safety contract.
                    let shard = unsafe { &*self.shards[0].data_ptr() };
                    if can_skip_session_lookup(key, &shard.session_slots) {
                        if shard.map.has_no_ttl_entries() {
                            return shard.map.with_shared_value_bytes_hashed_no_ttl(
                                route_hash, key, &mut write,
                            );
                        }
                        return shard.map.with_shared_value_bytes_hashed(
                            route_hash,
                            key,
                            now_millis(),
                            &mut write,
                        );
                    }
                }

                let route = EmbeddedKeyRoute {
                    shard_id: self.route_hash(route_hash),
                    key_hash: route_hash,
                };
                // SAFETY: the caller upholds this function's safety contract.
                let shard = unsafe { &*self.shards[route.shard_id].data_ptr() };
                if uses_flat_key_storage(self.route_mode, key) {
                    if shard.map.has_no_ttl_entries() {
                        return shard.map.with_shared_value_bytes_hashed_no_ttl(
                            route.key_hash,
                            key,
                            &mut write,
                        );
                    }
                    return shard.map.with_shared_value_bytes_hashed(
                        route.key_hash,
                        key,
                        now_millis(),
                        &mut write,
                    );
                }
            }

            // Everything else goes through the locking lookup.
            if let Some(value) = self.get_value_bytes(key) {
                write(&value);
                true
            } else {
                false
            }
        }
    }
717
718 #[inline(always)]
728 pub unsafe fn with_shared_value_bytes_full_key_single_threaded<F>(
729 &self,
730 key_hash: u64,
731 key: &[u8],
732 write: F,
733 ) -> bool
734 where
735 F: FnMut(&bytes::Bytes),
736 {
737 #[cfg(not(feature = "unsafe"))]
738 {
739 self.with_shared_value_bytes_route_hashed(key_hash, key, write)
740 }
741 #[cfg(feature = "unsafe")]
742 {
743 let mut write = write;
744 debug_assert_eq!(self.shards.len(), 1);
745 debug_assert_eq!(self.route_mode, EmbeddedRouteMode::FullKey);
746
747 let shard = unsafe { &*self.shards[0].data_ptr() };
749 if shard.map.has_no_ttl_entries() {
750 return shard
751 .map
752 .with_shared_value_bytes_hashed_no_ttl(key_hash, key, &mut write);
753 }
754 shard
755 .map
756 .with_shared_value_bytes_hashed(key_hash, key, now_millis(), &mut write)
757 }
758 }
759
760 #[inline(always)]
770 pub unsafe fn get_shared_value_bytes_full_key_single_threaded(
771 &self,
772 key_hash: u64,
773 key: &[u8],
774 ) -> Option<&bytes::Bytes> {
775 #[cfg(not(feature = "unsafe"))]
776 {
777 let _ = (key_hash, key);
778 None
779 }
780 #[cfg(feature = "unsafe")]
781 {
782 debug_assert_eq!(self.shards.len(), 1);
783 debug_assert_eq!(self.route_mode, EmbeddedRouteMode::FullKey);
784
785 let shard = unsafe { &*self.shards[0].data_ptr() };
787 if shard.map.has_no_ttl_entries() {
788 return shard
789 .map
790 .get_shared_value_bytes_hashed_no_ttl(key_hash, key);
791 }
792 shard
793 .map
794 .get_shared_value_bytes_hashed(key_hash, key, now_millis())
795 }
796 }
797
798 #[inline(always)]
809 pub unsafe fn get_shared_value_bytes_full_key_tagged_single_threaded(
810 &self,
811 key_hash: u64,
812 key_tag: u64,
813 key_len: usize,
814 ) -> Option<&bytes::Bytes> {
815 #[cfg(not(feature = "unsafe"))]
816 {
817 let _ = (key_hash, key_tag, key_len);
818 None
819 }
820 #[cfg(feature = "unsafe")]
821 {
822 debug_assert_eq!(self.shards.len(), 1);
823 debug_assert_eq!(self.route_mode, EmbeddedRouteMode::FullKey);
824
825 let shard = unsafe { &*self.shards[0].data_ptr() };
827 if shard.map.has_no_ttl_entries() {
828 return shard
829 .map
830 .get_shared_value_bytes_hashed_tagged_no_ttl(key_hash, key_tag, key_len);
831 }
832 None
833 }
834 }
835
836 #[inline(always)]
842 pub fn with_shared_value_bytes_full_key_owned_shard_no_ttl<F>(
843 &self,
844 shard_id: usize,
845 key_hash: u64,
846 key: &[u8],
847 write: F,
848 ) -> Option<bool>
849 where
850 F: FnMut(&bytes::Bytes),
851 {
852 if self.route_mode != EmbeddedRouteMode::FullKey
853 || shard_id >= self.shards.len()
854 || self.route_hash(key_hash) != shard_id
855 {
856 return None;
857 }
858
859 let shard = self.shards[shard_id].read();
860 if !shard.map.has_no_ttl_entries() {
861 return None;
862 }
863
864 let mut write = write;
865 Some(
866 shard
867 .map
868 .with_shared_value_bytes_hashed_no_ttl(key_hash, key, &mut write),
869 )
870 }
871
872 #[inline(always)]
889 pub unsafe fn with_shared_value_bytes_full_key_tagged_owned_shard_no_ttl<F>(
890 &self,
891 shard_id: usize,
892 key_hash: u64,
893 key_tag: u64,
894 key_len: usize,
895 write: F,
896 ) -> Option<bool>
897 where
898 F: FnMut(&bytes::Bytes),
899 {
900 #[cfg(not(feature = "unsafe"))]
901 {
902 let _ = (shard_id, key_hash, key_tag, key_len, write);
903 None
904 }
905 #[cfg(feature = "unsafe")]
906 {
907 if self.route_mode != EmbeddedRouteMode::FullKey
908 || shard_id >= self.shards.len()
909 || self.route_hash(key_hash) != shard_id
910 {
911 return None;
912 }
913
914 let mut write = write;
915 let shard = unsafe { &*self.shards[shard_id].data_ptr() };
917 if !shard.map.has_no_ttl_entries() {
918 return None;
919 }
920 if let Some(value) = shard
921 .map
922 .get_shared_value_bytes_hashed_tagged_no_ttl(key_hash, key_tag, key_len)
923 {
924 write(value);
925 Some(true)
926 } else {
927 Some(false)
928 }
929 }
930 }
931
932 #[inline(always)]
946 pub unsafe fn with_session_value_slice_owned_shard_no_ttl_prevalidated<F>(
947 &self,
948 shard_id: usize,
949 session_hash: u64,
950 key_hash: u64,
951 session_prefix: &[u8],
952 key: &[u8],
953 write: F,
954 ) -> Option<bool>
955 where
956 F: FnMut(&[u8]),
957 {
958 if self.route_mode != EmbeddedRouteMode::SessionPrefix || shard_id >= self.shards.len() {
959 return None;
960 }
961 let mut write = write;
962
963 #[cfg(not(feature = "unsafe"))]
964 {
965 let shard = self.shards[shard_id].read();
966 match shard.get_session_ref_hashed_shared_no_ttl_prehashed(
967 session_hash,
968 session_prefix,
969 key_hash,
970 key,
971 ) {
972 Some(value) => {
973 write(value);
974 Some(true)
975 }
976 None => Some(false),
977 }
978 }
979 #[cfg(feature = "unsafe")]
980 {
981 let shard = unsafe { &*self.shards[shard_id].data_ptr() };
983 match shard.get_session_ref_hashed_shared_no_ttl_prehashed(
984 session_hash,
985 session_prefix,
986 key_hash,
987 key,
988 ) {
989 Some(value) => {
990 write(value);
991 Some(true)
992 }
993 None => Some(false),
994 }
995 }
996 }
997
998 #[inline(always)]
1004 pub fn set_session_slice_hashed_owned_shard_no_ttl_prevalidated(
1005 &self,
1006 shard_id: usize,
1007 session_hash: u64,
1008 key_hash: u64,
1009 session_prefix: &[u8],
1010 key: &[u8],
1011 value: &[u8],
1012 ) -> bool {
1013 if shard_id >= self.shards.len() || self.objects.has_objects() {
1014 return false;
1015 }
1016
1017 let mut shard = self.shards[shard_id].write();
1018 if shard.memory_limit_bytes.is_some() || shard.eviction_policy != EvictionPolicy::None {
1019 return false;
1020 }
1021 shard.set_session_slice_hashed_no_ttl_prehashed(
1022 session_hash,
1023 session_prefix,
1024 key_hash,
1025 key,
1026 value,
1027 );
1028 true
1029 }
1030
    /// Lock-free counterpart of
    /// `set_session_slice_hashed_owned_shard_no_ttl_prevalidated`: writes a
    /// session-stored value with no TTL into shard `shard_id`.
    ///
    /// Without the `unsafe` feature this always returns `false`. With it,
    /// returns `false` without writing when `shard_id` is out of range, the
    /// store holds objects, or the shard has a memory limit or an eviction
    /// policy other than `None`; returns `true` after the write.
    ///
    /// # Safety
    ///
    /// The `unsafe` build creates a `&mut` to the shard from `data_ptr()` while
    /// holding no lock. NOTE(review): presumably the caller must own the shard
    /// exclusively and must have validated that the hashes and `session_prefix`
    /// belong to `key` — confirm against the crate's threading contract.
    #[inline(always)]
    pub unsafe fn set_session_slice_hashed_owned_shard_no_ttl_hot_prevalidated(
        &self,
        shard_id: usize,
        session_hash: u64,
        key_hash: u64,
        session_prefix: &[u8],
        key: &[u8],
        value: &[u8],
    ) -> bool {
        #[cfg(not(feature = "unsafe"))]
        {
            // The safe build does not offer this fast path.
            let _ = (shard_id, session_hash, key_hash, session_prefix, key, value);
            false
        }
        #[cfg(feature = "unsafe")]
        {
            if shard_id >= self.shards.len() || self.objects.has_objects() {
                return false;
            }

            // SAFETY: the caller upholds this function's safety contract.
            let shard = unsafe { &mut *self.shards[shard_id].data_ptr() };
            if shard.memory_limit_bytes.is_some() || shard.eviction_policy != EvictionPolicy::None {
                return false;
            }
            shard.set_session_slice_hashed_no_ttl_prehashed(
                session_hash,
                session_prefix,
                key_hash,
                key,
                value,
            );
            true
        }
    }
1073
    /// Writes `value` under `key` with no TTL into shard `shard_id` under the
    /// write lock, using a pre-computed `key_hash` and `key_tag`.
    ///
    /// Returns `false` without writing when the fast path does not apply:
    ///
    /// * `shard_id` is out of range or the store holds objects;
    /// * the shard has a memory limit or an eviction policy other than `None`;
    /// * in `FullKey` mode, `key_hash` does not route to `shard_id` or the key
    ///   has a point-write session prefix;
    /// * in `SessionPrefix` mode, the key has no point-write session prefix or
    ///   its recomputed route disagrees with `shard_id` / `key_hash`.
    ///
    /// Returns `true` after the write.
    #[inline(always)]
    pub fn set_slice_hashed_tagged_owned_shard_no_ttl(
        &self,
        shard_id: usize,
        key_hash: u64,
        key_tag: u64,
        key: &[u8],
        value: &[u8],
    ) -> bool {
        if shard_id >= self.shards.len() || self.objects.has_objects() {
            return false;
        }

        let mut shard = self.shards[shard_id].write();
        if shard.memory_limit_bytes.is_some() || shard.eviction_policy != EvictionPolicy::None {
            return false;
        }
        match self.route_mode {
            EmbeddedRouteMode::FullKey => {
                if self.route_hash(key_hash) != shard_id
                    || point_write_session_storage_prefix(key).is_some()
                {
                    return false;
                }
                shard
                    .map
                    .set_slice_hashed_tagged_no_ttl_local(key_hash, key_tag, key, value);
            }
            EmbeddedRouteMode::SessionPrefix => {
                let Some(session_prefix) = point_write_session_storage_prefix(key) else {
                    return false;
                };
                // Re-derive the route from the key to validate the caller's shard and hash.
                let route = compute_key_route(self.route_mode, self.shift, key);
                if route.shard_id != shard_id || route.key_hash != key_hash {
                    return false;
                }
                shard.set_session_slice_hashed_no_ttl(&session_prefix, key_hash, key, value);
            }
        }
        true
    }
1121
1122 #[inline(always)]
1129 pub unsafe fn set_slice_hashed_tagged_owned_shard_no_ttl_hot(
1130 &self,
1131 shard_id: usize,
1132 key_hash: u64,
1133 key_tag: u64,
1134 key: &[u8],
1135 value: &[u8],
1136 ) -> bool {
1137 #[cfg(not(feature = "unsafe"))]
1138 {
1139 let _ = (shard_id, key_hash, key_tag, key, value);
1140 false
1141 }
1142 #[cfg(feature = "unsafe")]
1143 {
1144 if shard_id >= self.shards.len() {
1145 return false;
1146 }
1147
1148 let shard = unsafe { &mut *self.shards[shard_id].data_ptr() };
1150 if shard.memory_limit_bytes.is_some() || shard.eviction_policy != EvictionPolicy::None {
1151 return false;
1152 }
1153 match self.route_mode {
1154 EmbeddedRouteMode::FullKey => {
1155 if self.route_hash(key_hash) != shard_id
1156 || point_write_session_storage_prefix(key).is_some()
1157 {
1158 return false;
1159 }
1160 unsafe {
1163 shard
1164 .map
1165 .set_slice_hashed_tagged_no_ttl_hot(key_hash, key_tag, key, value)
1166 };
1167 }
1168 EmbeddedRouteMode::SessionPrefix => {
1169 let Some(session_prefix) = point_write_session_storage_prefix(key) else {
1170 return false;
1171 };
1172 let route = compute_key_route(self.route_mode, self.shift, key);
1173 if route.shard_id != shard_id || route.key_hash != key_hash {
1174 return false;
1175 }
1176 shard.set_session_slice_hashed_no_ttl(&session_prefix, key_hash, key, value);
1177 }
1178 }
1179 true
1180 }
1181 }
1182
    /// Returns `key`'s value and applies a time-to-live of `ttl_ms` to it,
    /// optionally reusing a caller-supplied `route_hash`.
    ///
    /// Without the `unsafe` feature this is `getex_value_bytes_route_hashed`.
    /// With it, when a hash is supplied, `can_use_route_hash_as_key_hash`
    /// accepts the key and `uses_flat_key_storage` holds, the read-and-expire
    /// runs on the routed shard through `data_ptr()` without taking the lock.
    /// Every other case falls back to `getex_value_bytes_route_hashed`.
    ///
    /// # Safety
    ///
    /// The `unsafe` build creates a `&mut` to the shard from `data_ptr()` while
    /// holding no lock. NOTE(review): presumably the caller must guarantee
    /// exclusive access to the store for the duration of the call — confirm
    /// against the crate's threading contract.
    pub unsafe fn getex_value_bytes_route_hashed_single_threaded(
        &self,
        route_hash: Option<u64>,
        key: &[u8],
        ttl_ms: u64,
    ) -> Option<bytes::Bytes> {
        #[cfg(not(feature = "unsafe"))]
        {
            self.getex_value_bytes_route_hashed(route_hash, key, ttl_ms)
        }
        #[cfg(feature = "unsafe")]
        {
            let now_ms = now_millis();
            // Saturating so a huge TTL cannot wrap the expiry timestamp.
            let expire_at_ms = now_ms.saturating_add(ttl_ms);
            if let Some(key_hash) = route_hash
                && can_use_route_hash_as_key_hash(self.route_mode, key)
            {
                let shard_id = self.route_hash(key_hash);
                // SAFETY: the caller upholds this function's safety contract.
                let shard = unsafe { &mut *self.shards[shard_id].data_ptr() };
                if uses_flat_key_storage(self.route_mode, key) {
                    return shard.map.get_value_bytes_hashed_and_expire(
                        key_hash,
                        key,
                        expire_at_ms,
                        now_ms,
                    );
                }
            }

            // Everything else goes through the locking path.
            self.getex_value_bytes_route_hashed(route_hash, key, ttl_ms)
        }
    }
1222
    /// Passes `key`'s value to `write` and applies a time-to-live of `ttl_ms`
    /// to it, optionally reusing a caller-supplied `route_hash`. Returns `true`
    /// when a value was found.
    ///
    /// Without the `unsafe` feature this is
    /// `with_getex_value_bytes_route_hashed`. With it, when a hash is supplied,
    /// `can_use_route_hash_as_key_hash` accepts the key and
    /// `uses_flat_key_storage` holds, the read-and-expire runs on the routed
    /// shard through `data_ptr()` without taking the lock. Every other case
    /// falls back to `with_getex_value_bytes_route_hashed`.
    ///
    /// # Safety
    ///
    /// The `unsafe` build creates a `&mut` to the shard from `data_ptr()` while
    /// holding no lock. NOTE(review): presumably the caller must guarantee
    /// exclusive access to the store for the duration of the call — confirm
    /// against the crate's threading contract.
    pub unsafe fn with_getex_value_bytes_route_hashed_single_threaded<F>(
        &self,
        route_hash: Option<u64>,
        key: &[u8],
        ttl_ms: u64,
        write: F,
    ) -> bool
    where
        F: FnMut(&[u8]),
    {
        #[cfg(not(feature = "unsafe"))]
        {
            self.with_getex_value_bytes_route_hashed(route_hash, key, ttl_ms, write)
        }
        #[cfg(feature = "unsafe")]
        {
            // Rebound as mutable here so the safe build does not see an unused `mut`.
            let mut write = write;
            let now_ms = now_millis();
            // Saturating so a huge TTL cannot wrap the expiry timestamp.
            let expire_at_ms = now_ms.saturating_add(ttl_ms);
            if let Some(key_hash) = route_hash
                && can_use_route_hash_as_key_hash(self.route_mode, key)
            {
                let shard_id = self.route_hash(key_hash);
                // SAFETY: the caller upholds this function's safety contract.
                let shard = unsafe { &mut *self.shards[shard_id].data_ptr() };
                if uses_flat_key_storage(self.route_mode, key) {
                    return shard.map.with_value_bytes_hashed_and_expire(
                        key_hash,
                        key,
                        expire_at_ms,
                        now_ms,
                        &mut write,
                    );
                }
            }

            // Everything else goes through the locking path.
            self.with_getex_value_bytes_route_hashed(route_hash, key, ttl_ms, write)
        }
    }
1268
1269 fn get_value_bytes_routed(
1270 &self,
1271 route: EmbeddedKeyRoute,
1272 key: &[u8],
1273 now_ms: u64,
1274 ) -> Option<bytes::Bytes> {
1275 if uses_flat_key_storage(self.route_mode, key) {
1276 let shard = self.shards[route.shard_id].read();
1277 return shard
1278 .map
1279 .get_value_bytes_hashed(route.key_hash, key, now_ms);
1280 }
1281 let mut shard = self.shards[route.shard_id].write();
1284 if let Some(session_prefix) = derived_session_storage_prefix(key)
1285 && let Some(value) =
1286 shard
1287 .session_slots
1288 .get_ref_hashed(&session_prefix, route.key_hash, key)
1289 {
1290 return Some(bytes::Bytes::copy_from_slice(value));
1291 }
1292 shard
1293 .map
1294 .get_value_bytes_hashed(route.key_hash, key, now_ms)
1295 }
1296
1297 fn with_value_bytes_routed<F>(&self, route: EmbeddedKeyRoute, key: &[u8], write: &mut F) -> bool
1298 where
1299 F: FnMut(&[u8]),
1300 {
1301 if uses_flat_key_storage(self.route_mode, key) {
1302 let shard = self.shards[route.shard_id].read();
1303 let value = if shard.map.has_no_ttl_entries() {
1304 shard.map.get_ref_hashed_shared_no_ttl(route.key_hash, key)
1305 } else {
1306 shard
1307 .map
1308 .get_ref_hashed_shared(route.key_hash, key, now_millis())
1309 };
1310 if let Some(value) = value {
1311 write(value);
1312 return true;
1313 }
1314 return false;
1315 }
1316
1317 if let Some(value) = self.get_value_bytes_routed(route, key, now_millis()) {
1318 write(value.as_ref());
1319 true
1320 } else {
1321 false
1322 }
1323 }
1324
1325 pub(super) fn with_shared_value_bytes_routed<F>(
1326 &self,
1327 route: EmbeddedKeyRoute,
1328 key: &[u8],
1329 write: &mut F,
1330 ) -> bool
1331 where
1332 F: FnMut(&bytes::Bytes),
1333 {
1334 if uses_flat_key_storage(self.route_mode, key) {
1335 let shard = self.shards[route.shard_id].read();
1336 if shard.map.has_no_ttl_entries() {
1337 return shard
1338 .map
1339 .with_shared_value_bytes_hashed_no_ttl(route.key_hash, key, write);
1340 }
1341 return shard.map.with_shared_value_bytes_hashed(
1342 route.key_hash,
1343 key,
1344 now_millis(),
1345 write,
1346 );
1347 }
1348
1349 if let Some(value) = self.get_value_bytes_routed(route, key, now_millis()) {
1350 write(&value);
1351 true
1352 } else {
1353 false
1354 }
1355 }
1356
1357 fn getex_value_bytes_routed(
1358 &self,
1359 route: EmbeddedKeyRoute,
1360 key: &[u8],
1361 ttl_ms: u64,
1362 ) -> Option<bytes::Bytes> {
1363 let now_ms = now_millis();
1364 let expire_at_ms = now_ms.saturating_add(ttl_ms);
1365 if uses_flat_key_storage(self.route_mode, key) {
1366 let mut shard = self.shards[route.shard_id].write();
1367 return shard.map.get_value_bytes_hashed_and_expire(
1368 route.key_hash,
1369 key,
1370 expire_at_ms,
1371 now_ms,
1372 );
1373 }
1374
1375 let value = self.get_value_bytes(key);
1376 if value.is_some() {
1377 self.expire(key, expire_at_ms);
1378 }
1379 value
1380 }
1381
1382 fn with_getex_value_bytes_routed<F>(
1383 &self,
1384 route: EmbeddedKeyRoute,
1385 key: &[u8],
1386 ttl_ms: u64,
1387 write: &mut F,
1388 ) -> bool
1389 where
1390 F: FnMut(&[u8]),
1391 {
1392 let now_ms = now_millis();
1393 let expire_at_ms = now_ms.saturating_add(ttl_ms);
1394 if uses_flat_key_storage(self.route_mode, key) {
1395 let mut shard = self.shards[route.shard_id].write();
1396 return shard.map.with_value_bytes_hashed_and_expire(
1397 route.key_hash,
1398 key,
1399 expire_at_ms,
1400 now_ms,
1401 write,
1402 );
1403 }
1404
1405 if let Some(value) = self.get_value_bytes(key) {
1406 write(value.as_ref());
1407 self.expire(key, expire_at_ms);
1408 true
1409 } else {
1410 false
1411 }
1412 }
1413
1414 fn get_with_route(&self, route: EmbeddedKeyRoute, key: &[u8], now_ms: u64) -> Option<Bytes> {
1415 if uses_flat_key_storage(self.route_mode, key) {
1419 let shard = self.shards[route.shard_id].read();
1420 return shard
1421 .map
1422 .get_ref_hashed_shared(route.key_hash, key, now_ms)
1423 .map(<[u8]>::to_vec);
1424 }
1425
1426 let mut shard = self.shards[route.shard_id].write();
1429 if let Some(session_prefix) = derived_session_storage_prefix(key)
1430 && let Some(value) =
1431 shard
1432 .session_slots
1433 .get_ref_hashed(&session_prefix, route.key_hash, key)
1434 {
1435 return Some(value.to_vec());
1436 }
1437 shard
1438 .map
1439 .get_ref_hashed(route.key_hash, key, now_ms)
1440 .map(<[u8]>::to_vec)
1441 }
1442
1443 pub fn get_view(&self, key: &[u8]) -> EmbeddedReadView {
1449 let route = self.route_key(key);
1450 self.get_view_routed(route, key, now_millis())
1451 }
1452
1453 #[inline(always)]
1454 pub fn get_prepared_view_no_ttl(&self, prepared: &PreparedPointKey) -> EmbeddedReadView {
1455 let mut shard = self.shards[prepared.route().shard_id].write();
1456 let item = shard
1457 .map
1458 .get_ref_hashed_prepared_no_ttl(
1459 prepared.route().key_hash,
1460 prepared.key(),
1461 prepared.key_tag(),
1462 )
1463 .map(EmbeddedReadSlice::from_slice);
1464 drop(shard);
1465 EmbeddedReadView { item }
1466 }
1467
1468 pub fn get_view_routed_no_ttl(&self, route: EmbeddedKeyRoute, key: &[u8]) -> EmbeddedReadView {
1469 let mut shard = self.shards[route.shard_id].write();
1470 let item = if let Some(session_prefix) = derived_session_storage_prefix(key) {
1471 if shard.session_slots.has_session(&session_prefix) {
1472 shard
1473 .session_slots
1474 .get_ref_hashed(&session_prefix, route.key_hash, key)
1475 .map(EmbeddedReadSlice::from_slice)
1476 } else {
1477 shard
1478 .map
1479 .get_ref_hashed_no_ttl(route.key_hash, key)
1480 .map(EmbeddedReadSlice::from_slice)
1481 }
1482 } else {
1483 shard
1484 .map
1485 .get_ref_hashed_no_ttl(route.key_hash, key)
1486 .map(EmbeddedReadSlice::from_slice)
1487 };
1488 drop(shard);
1489 EmbeddedReadView { item }
1490 }
1491
1492 pub fn get_view_routed(
1493 &self,
1494 route: EmbeddedKeyRoute,
1495 key: &[u8],
1496 now_ms: u64,
1497 ) -> EmbeddedReadView {
1498 let mut shard = self.shards[route.shard_id].write();
1499 let item = if let Some(session_prefix) = derived_session_storage_prefix(key) {
1500 if shard.session_slots.has_session(&session_prefix) {
1501 shard
1502 .session_slots
1503 .get_ref_hashed(&session_prefix, route.key_hash, key)
1504 .map(EmbeddedReadSlice::from_slice)
1505 } else {
1506 shard
1507 .map
1508 .get_ref_hashed(route.key_hash, key, now_ms)
1509 .map(EmbeddedReadSlice::from_slice)
1510 }
1511 } else {
1512 shard
1513 .map
1514 .get_ref_hashed(route.key_hash, key, now_ms)
1515 .map(EmbeddedReadSlice::from_slice)
1516 };
1517 drop(shard);
1518 EmbeddedReadView { item }
1519 }
1520}