1#[cfg(not(feature = "embedded-read-biased-lock"))]
2use parking_lot::RwLockWriteGuard;
3#[cfg(feature = "embedded-read-biased-lock")]
4use rblock::RwLockWriteGuard;
5
6use crate::storage::FlatMap;
7
8use super::batch_results::{
9 BatchReadViewBuilder, OrderedBatchReadViewBuilder, OrderedPackedBatchBuilder,
10 PackedBatchBuilder,
11};
12use super::*;
13
/// Handle that pairs a shard id with a write guard on that shard.
///
/// The guard type is `parking_lot::RwLockWriteGuard` or
/// `rblock::RwLockWriteGuard`, selected by the
/// `embedded-read-biased-lock` feature.
#[derive(Debug)]
pub struct EmbeddedShardHandle<'a> {
    /// Id reported by `shard_id()`.
    shard_id: usize,
    /// Write guard through which every read and write on this handle goes.
    shard: RwLockWriteGuard<'a, EmbeddedShard>,
}
24
25impl<'a> EmbeddedShardHandle<'a> {
26 #[inline(always)]
27 pub fn shard_id(&self) -> usize {
28 self.shard_id
29 }
30
31 #[inline(always)]
32 pub fn get_ref_no_ttl_hashed(&mut self, key_hash: u64, key: &[u8]) -> Option<&[u8]> {
33 self.shard.map.get_ref_hashed_no_ttl(key_hash, key)
34 }
35
36 #[inline(always)]
37 pub fn set_hashed_no_ttl<K, V>(&mut self, key_hash: u64, key: K, value: V)
38 where
39 K: Into<Bytes>,
40 V: Into<Bytes>,
41 {
42 self.shard.map.set_hashed(key_hash, key, value, None, 0);
43 self.shard.enforce_memory_limit(0);
44 }
45
46 #[inline(always)]
47 pub fn set_slice_hashed_no_ttl(&mut self, key_hash: u64, key: &[u8], value: &[u8]) {
48 self.shard
49 .map
50 .set_slice_hashed(key_hash, key, value, None, 0);
51 self.shard.enforce_memory_limit(0);
52 }
53}
54
/// A shard held by value: a flat map plus session slot storage, together
/// with the settings used when enforcing the memory limit.
#[derive(Debug)]
pub struct OwnedEmbeddedShard {
    /// Id reported by `shard_id()`.
    shard_id: usize,
    /// Flat key/value storage.
    map: FlatMap,
    /// Session-scoped storage addressed by a session prefix.
    session_slots: SessionSlotMap,
    /// Upper bound on `map` + `session_slots` stored bytes; `None` disables enforcement.
    memory_limit_bytes: Option<usize>,
    /// Policy handed to the eviction calls when the limit is exceeded.
    eviction_policy: EvictionPolicy,
}
67
68impl OwnedEmbeddedShard {
69 #[inline(always)]
70 fn update_lazy_read_sampling(&mut self) {
71 let enabled = if self.eviction_policy == EvictionPolicy::None {
72 false
73 } else if let Some(limit) = self.memory_limit_bytes {
74 let watermark = limit.saturating_mul(3) / 4;
75 self.stored_bytes() >= watermark.max(1)
76 } else {
77 false
78 };
79 self.session_slots.configure_read_sampling(enabled);
80 }
81
82 #[inline(always)]
83 pub fn shard_id(&self) -> usize {
84 self.shard_id
85 }
86
87 #[inline(always)]
88 pub fn get_ref_no_ttl_hashed(&mut self, key_hash: u64, key: &[u8]) -> Option<&[u8]> {
89 self.map.get_ref_hashed_no_ttl(key_hash, key)
90 }
91
92 #[inline(always)]
93 pub fn set_hashed_no_ttl<K, V>(&mut self, key_hash: u64, key: K, value: V)
94 where
95 K: Into<Bytes>,
96 V: Into<Bytes>,
97 {
98 self.map.set_hashed(key_hash, key, value, None, 0);
99 self.enforce_memory_limit(0);
100 }
101
102 #[inline(always)]
103 pub fn set_slice_hashed_no_ttl(&mut self, key_hash: u64, key: &[u8], value: &[u8]) {
104 self.map.set_slice_hashed(key_hash, key, value, None, 0);
105 self.enforce_memory_limit(0);
106 }
107
108 #[inline(always)]
109 pub fn get_session_ref_hashed_no_ttl(
110 &mut self,
111 session_prefix: &[u8],
112 key_hash: u64,
113 key: &[u8],
114 ) -> Option<&[u8]> {
115 self.session_slots
116 .get_ref_hashed(session_prefix, key_hash, key)
117 }
118
119 #[inline(always)]
120 pub fn get_ref_hashed_session_or_flat(
121 &mut self,
122 key_hash: u64,
123 key: &[u8],
124 now_ms: u64,
125 ) -> Option<&[u8]> {
126 if let Some(session_prefix) = derived_session_storage_prefix(key)
127 && let Some(value) = self
128 .session_slots
129 .get_ref_hashed(&session_prefix, key_hash, key)
130 {
131 return Some(value);
132 }
133
134 self.map.get_ref_hashed(key_hash, key, now_ms)
135 }
136
137 #[inline(always)]
138 pub fn get_ref_hashed_published_session_or_flat(
139 &mut self,
140 key_hash: u64,
141 key: &[u8],
142 now_ms: u64,
143 ) -> Option<&[u8]> {
144 if let Some(session_prefix) = derived_session_storage_prefix(key)
145 && self.session_slots.has_session(&session_prefix)
146 {
147 return self
148 .session_slots
149 .get_ref_hashed(&session_prefix, key_hash, key);
150 }
151
152 self.map.get_ref_hashed(key_hash, key, now_ms)
153 }
154
155 #[inline(always)]
156 pub fn get_ref_hashed_active_session_or_flat(
157 &mut self,
158 active_session_prefix: Option<&[u8]>,
159 key_hash: u64,
160 key: &[u8],
161 now_ms: u64,
162 ) -> Option<&[u8]> {
163 match active_session_prefix {
164 Some(session_prefix) => {
165 self.session_slots
166 .get_ref_hashed(session_prefix, key_hash, key)
167 }
168 None => self.map.get_ref_hashed(key_hash, key, now_ms),
169 }
170 }
171
172 #[inline(always)]
173 pub fn get_ref_hashed_active_session_or_no_ttl_flat(
174 &mut self,
175 active_session_prefix: Option<&[u8]>,
176 key_hash: u64,
177 key: &[u8],
178 ) -> Option<&[u8]> {
179 match active_session_prefix {
180 Some(session_prefix) => {
181 self.get_session_ref_hashed_no_ttl(session_prefix, key_hash, key)
182 }
183 None => self.get_ref_no_ttl_hashed(key_hash, key),
184 }
185 }
186
187 #[inline(always)]
188 pub fn set_session_slice_hashed_no_ttl(
189 &mut self,
190 session_prefix: &[u8],
191 key_hash: u64,
192 key: &[u8],
193 value: &[u8],
194 ) {
195 self.map.delete_hashed(key_hash, key, 0);
196 self.session_slots
197 .set_slice_hashed(session_prefix, key_hash, key, value);
198 self.enforce_memory_limit(0);
199 }
200
201 fn stored_bytes(&self) -> usize {
202 self.map
203 .stored_bytes()
204 .saturating_add(self.session_slots.stored_bytes())
205 }
206
207 #[inline(always)]
208 fn exceeds_memory_limit(&self) -> bool {
209 self.memory_limit_bytes
210 .is_some_and(|limit| self.stored_bytes() > limit)
211 }
212
213 #[inline(always)]
214 fn enforce_memory_limit(&mut self, now_ms: u64) {
215 self.update_lazy_read_sampling();
216 let Some(limit) = self.memory_limit_bytes else {
217 return;
218 };
219 if self.stored_bytes() <= limit {
220 return;
221 }
222
223 self.map.process_maintenance(now_ms);
224 if self.session_slots.is_empty() {
225 self.map.evict_to_memory_target(
226 self.eviction_policy,
227 now_ms,
228 FlatMap::eviction_target_bytes(limit),
229 );
230 self.update_lazy_read_sampling();
231 return;
232 }
233 while self.stored_bytes() > limit {
234 let map_candidate = self.map.eviction_candidate(self.eviction_policy);
235 let session_candidate = self.session_slots.eviction_candidate(self.eviction_policy);
236 let evicted = match (map_candidate, session_candidate) {
237 (Some((map_rank, _, _)), Some((session_rank, _, _, _))) => {
238 if session_rank < map_rank {
239 self.session_slots.evict_with_policy(self.eviction_policy)
240 } else {
241 self.map.evict_with_policy(self.eviction_policy, now_ms)
242 }
243 }
244 (Some((_rank, _, _)), None) => {
245 self.map.evict_with_policy(self.eviction_policy, now_ms)
246 }
247 (None, Some((_rank, _, _, _))) => {
248 self.session_slots.evict_with_policy(self.eviction_policy)
249 }
250 (None, None) => false,
251 };
252 if !evicted {
253 break;
254 }
255 }
256 self.update_lazy_read_sampling();
257 }
258}
259
/// The set of shards owned by one worker, plus the routing data needed to
/// map keys and session prefixes onto them.
#[derive(Debug)]
pub struct OwnedEmbeddedWorkerShards {
    /// Routing mode passed to `compute_key_route`.
    route_mode: EmbeddedRouteMode,
    /// Total shard count; also the length of `shard_lookup`.
    shard_count: usize,
    /// Result of `shift_for(shard_count)`, passed to the routing helpers.
    shift: u32,
    /// Maps a shard id to its index in `shards`; `usize::MAX` marks a shard
    /// this worker does not own.
    shard_lookup: Vec<usize>,
    /// The shards owned by this worker.
    shards: Vec<OwnedEmbeddedShard>,
    /// Optional telemetry sink, present only with the `telemetry` feature.
    #[cfg(feature = "telemetry")]
    metrics: Option<Arc<CacheTelemetry>>,
}
274
275impl OwnedEmbeddedWorkerShards {
276 #[cfg(feature = "embedded")]
277 pub(crate) fn local_tier_stats_snapshot(
278 &self,
279 ) -> (TierStatsSnapshot, TierStatsSnapshot, TierStatsSnapshot) {
280 let mut hot = TierStatsSnapshot {
281 name: "hot",
282 ..TierStatsSnapshot::default()
283 };
284 let mut warm = TierStatsSnapshot {
285 name: "warm",
286 ..TierStatsSnapshot::default()
287 };
288 let mut cold = TierStatsSnapshot {
289 name: "cold",
290 ..TierStatsSnapshot::default()
291 };
292
293 for shard in &self.shards {
294 let (shard_hot, shard_warm, shard_cold) = shard.map.stats_snapshot();
295 accumulate_tier_stats(&mut hot, &shard_hot);
296 accumulate_tier_stats(&mut warm, &shard_warm);
297 accumulate_tier_stats(&mut cold, &shard_cold);
298 }
299
300 (hot, warm, cold)
301 }
302
303 fn new(
304 route_mode: EmbeddedRouteMode,
305 shard_count: usize,
306 shards: Vec<OwnedEmbeddedShard>,
307 #[cfg(feature = "telemetry")] metrics: Option<Arc<CacheTelemetry>>,
308 ) -> Self {
309 assert_valid_shard_count(shard_count);
310 let mut shard_lookup = vec![usize::MAX; shard_count];
311 for (index, shard) in shards.iter().enumerate() {
312 shard_lookup[shard.shard_id()] = index;
313 }
314 Self {
315 route_mode,
316 shard_count,
317 shift: shift_for(shard_count),
318 shard_lookup,
319 shards,
320 #[cfg(feature = "telemetry")]
321 metrics,
322 }
323 }
324
    /// Returns the total shard count this worker was constructed with
    /// (not the number of shards it owns; see `worker_shard_count`).
    #[inline(always)]
    pub fn shard_count(&self) -> usize {
        self.shard_count
    }
329
    /// Returns the routing mode used by `route_key`.
    #[inline(always)]
    pub fn route_mode(&self) -> EmbeddedRouteMode {
        self.route_mode
    }
334
335 #[inline(always)]
336 pub fn worker_shard_count(&self) -> usize {
337 self.shards.len()
338 }
339
340 #[inline(always)]
341 pub fn owns_shard(&self, shard_id: usize) -> bool {
342 self.shard_lookup
343 .get(shard_id)
344 .copied()
345 .is_some_and(|index| index != usize::MAX)
346 }
347
348 #[inline(always)]
349 pub fn route_key(&self, key: &[u8]) -> EmbeddedKeyRoute {
350 compute_key_route(self.route_mode, self.shift, key)
351 }
352
353 #[inline(always)]
354 pub fn prepare_point_key(&self, key: &[u8]) -> PreparedPointKey {
355 let route = self.route_key(key);
356 PreparedPointKey {
357 route,
358 key_len: key.len(),
359 key_tag: hash_key_tag_from_hash(route.key_hash),
360 key: key.to_vec(),
361 }
362 }
363
364 #[inline(always)]
365 pub fn route_session(&self, session_prefix: &[u8]) -> EmbeddedSessionRoute {
366 EmbeddedSessionRoute {
367 shard_id: compute_session_shard(self.shift, session_prefix),
368 }
369 }
370
371 #[inline(always)]
372 pub fn begin_read_session(&mut self) -> OwnedEmbeddedWorkerReadSession<'_> {
373 OwnedEmbeddedWorkerReadSession {
374 opened: vec![false; self.shards.len()],
375 opened_indices: Vec::with_capacity(self.shards.len()),
376 worker: self,
377 }
378 }
379
380 #[inline(always)]
381 pub fn get_ref_no_ttl_routed(&mut self, route: EmbeddedKeyRoute, key: &[u8]) -> Option<&[u8]> {
382 self.shard_for_route_mut(route.shard_id)
383 .get_ref_no_ttl_hashed(route.key_hash, key)
384 }
385
386 #[inline(always)]
387 pub fn get_prepared_ref_no_ttl(&mut self, prepared: &PreparedPointKey) -> Option<&[u8]> {
388 self.shard_for_route_mut(prepared.route().shard_id)
389 .map
390 .get_ref_hashed_prepared_no_ttl(
391 prepared.route().key_hash,
392 prepared.key(),
393 prepared.key_tag(),
394 )
395 }
396
397 #[cfg(feature = "embedded")]
398 pub(crate) fn local_get_slice(&mut self, key: &[u8]) -> Option<EmbeddedReadSlice> {
399 let route = self.route_key(key);
400 self.local_get_slice_routed(route, key)
401 }
402
403 #[cfg(feature = "embedded")]
404 #[inline(always)]
405 pub(crate) fn local_get_slice_routed_no_ttl(
406 &mut self,
407 route: EmbeddedKeyRoute,
408 key: &[u8],
409 ) -> Option<EmbeddedReadSlice> {
410 let shard = self.shard_for_route_mut(route.shard_id);
411 let value = if let Some(session_prefix) = derived_session_storage_prefix(key) {
412 if shard.session_slots.has_session(&session_prefix) {
413 shard
414 .session_slots
415 .get_ref_hashed_local(&session_prefix, route.key_hash, key)
416 } else {
417 shard.map.get_ref_hashed_no_ttl(route.key_hash, key)
418 }
419 } else {
420 shard.map.get_ref_hashed_no_ttl(route.key_hash, key)
421 };
422 value.map(EmbeddedReadSlice::from_slice)
423 }
424
425 #[cfg(feature = "embedded")]
426 #[inline(always)]
427 pub(crate) fn local_get_slice_routed(
428 &mut self,
429 route: EmbeddedKeyRoute,
430 key: &[u8],
431 ) -> Option<EmbeddedReadSlice> {
432 let now_ms = now_millis();
433 let shard = self.shard_for_route_mut(route.shard_id);
434 let value = if let Some(session_prefix) = derived_session_storage_prefix(key) {
435 if shard.session_slots.has_session(&session_prefix) {
436 shard
437 .session_slots
438 .get_ref_hashed_local(&session_prefix, route.key_hash, key)
439 } else {
440 shard.map.get_ref_hashed_local(route.key_hash, key, now_ms)
441 }
442 } else {
443 shard.map.get_ref_hashed_local(route.key_hash, key, now_ms)
444 };
445 value.map(EmbeddedReadSlice::from_slice)
446 }
447
448 #[cfg(feature = "embedded")]
449 #[inline(always)]
450 pub(crate) fn local_get_point_ref_routed_no_ttl(
451 &mut self,
452 route: EmbeddedKeyRoute,
453 key: &[u8],
454 ) -> Option<&[u8]> {
455 self.shard_for_route_mut(route.shard_id)
456 .map
457 .get_ref_hashed_no_ttl(route.key_hash, key)
458 }
459
460 #[cfg(feature = "embedded")]
461 #[inline(always)]
462 pub(crate) fn local_get_point_ref_prepared_routed_no_ttl(
463 &mut self,
464 prepared: &PreparedPointKey,
465 ) -> Option<&[u8]> {
466 self.shard_for_route_mut(prepared.route().shard_id)
467 .map
468 .get_ref_hashed_prepared_no_ttl(
469 prepared.route().key_hash,
470 prepared.key(),
471 prepared.key_tag(),
472 )
473 }
474
475 #[cfg(feature = "embedded")]
476 #[inline(always)]
477 pub(crate) fn local_get_ref_routed_no_ttl(
478 &mut self,
479 route: EmbeddedKeyRoute,
480 key: &[u8],
481 ) -> Option<&[u8]> {
482 let shard = self.shard_for_route_mut(route.shard_id);
483 if let Some(session_prefix) = derived_session_storage_prefix(key)
484 && shard.session_slots.has_session(&session_prefix)
485 {
486 return shard
487 .session_slots
488 .get_ref_hashed_local(&session_prefix, route.key_hash, key);
489 }
490 shard.map.get_ref_hashed_no_ttl(route.key_hash, key)
491 }
492
493 #[cfg(feature = "embedded")]
494 pub(crate) fn local_batch_get_slices(
495 &mut self,
496 keys: &[Bytes],
497 ) -> Vec<Option<EmbeddedReadSlice>> {
498 keys.iter()
499 .map(|key| self.local_get_slice(key))
500 .collect::<Vec<_>>()
501 }
502
503 #[cfg(feature = "embedded")]
504 pub(crate) fn local_batch_get_session_slices_prehashed(
505 &mut self,
506 session_prefix: &[u8],
507 keys: &[Bytes],
508 key_hashes: &[u64],
509 ) -> Vec<Option<EmbeddedReadSlice>> {
510 assert_eq!(
511 keys.len(),
512 key_hashes.len(),
513 "keys and key_hashes must have matching lengths",
514 );
515 let route = self.route_session(session_prefix);
516 let shard = self.shard_for_route_mut(route.shard_id);
517 let now_ms = now_millis();
518 let active_session_prefix = shard
519 .session_slots
520 .has_session(session_prefix)
521 .then_some(session_prefix);
522 keys.iter()
523 .zip(key_hashes.iter().copied())
524 .map(|(key, key_hash)| {
525 let value = match active_session_prefix {
526 Some(session_prefix) => {
527 shard
528 .session_slots
529 .get_ref_hashed_local(session_prefix, key_hash, key)
530 }
531 None => shard.map.get_ref_hashed_local(key_hash, key, now_ms),
532 };
533 value.map(EmbeddedReadSlice::from_slice)
534 })
535 .collect::<Vec<_>>()
536 }
537
538 #[cfg(feature = "embedded")]
539 #[inline(always)]
540 pub(crate) fn local_get_session_ref_hashed_no_ttl<'a>(
541 &'a mut self,
542 session_prefix: &[u8],
543 key_hash: u64,
544 key: &[u8],
545 ) -> Option<&'a [u8]> {
546 let route = self.route_session(session_prefix);
547 let shard = self.shard_for_route_mut(route.shard_id);
548 if shard.session_slots.has_session(session_prefix) {
549 return shard
550 .session_slots
551 .get_ref_hashed_local(session_prefix, key_hash, key);
552 }
553 shard.map.get_ref_hashed_local(key_hash, key, now_millis())
554 }
555
556 #[cfg(feature = "embedded")]
557 pub(crate) fn local_get(&mut self, key: &[u8]) -> Option<Bytes> {
558 let route = self.route_key(key);
559 let now_ms = now_millis();
560 let shard = self.shard_for_route_mut(route.shard_id);
561 if let Some(session_prefix) = derived_session_storage_prefix(key)
562 && let Some(value) =
563 shard
564 .session_slots
565 .get_ref_hashed_local(&session_prefix, route.key_hash, key)
566 {
567 return Some(value.to_vec());
568 }
569 shard
570 .map
571 .get_ref_hashed_local(route.key_hash, key, now_ms)
572 .map(<[u8]>::to_vec)
573 }
574
575 #[cfg(feature = "embedded")]
576 pub(crate) fn local_set(&mut self, key: Bytes, value: Bytes, ttl_ms: Option<u64>) {
577 let now_ms = now_millis();
578 let route = self.route_key(&key);
579 let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
580 let shard = self.shard_for_route_mut(route.shard_id);
581 if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
582 shard
583 .session_slots
584 .delete_hashed_local(&session_prefix, route.key_hash, &key);
585 }
586 shard
587 .map
588 .set_hashed_local(route.key_hash, key, value, expire_at_ms, now_ms);
589 shard.enforce_memory_limit(now_ms);
590 }
591
592 #[cfg(feature = "embedded")]
593 pub(crate) fn local_set_slice_no_ttl(
594 &mut self,
595 route: EmbeddedKeyRoute,
596 key: &[u8],
597 value: &[u8],
598 ) {
599 self.local_set_slice_tagged_no_ttl(
600 route,
601 hash_key_tag_from_hash(route.key_hash),
602 key,
603 value,
604 );
605 }
606
607 #[cfg(feature = "embedded")]
608 #[inline(always)]
609 pub(crate) fn local_set_slice(
610 &mut self,
611 route: EmbeddedKeyRoute,
612 key: &[u8],
613 value: &[u8],
614 ttl_ms: Option<u64>,
615 ) {
616 match ttl_ms {
617 None => self.local_set_slice_no_ttl(route, key, value),
618 Some(ttl_ms) => {
619 let now_ms = now_millis();
620 let expire_at_ms = Some(now_ms.saturating_add(ttl_ms));
621 let shard = self.shard_for_route_mut(route.shard_id);
622 if let Some(session_prefix) = point_write_session_storage_prefix(key) {
623 shard
624 .session_slots
625 .delete_hashed_local(&session_prefix, route.key_hash, key);
626 }
627 shard
628 .map
629 .set_slice_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
630 shard.enforce_memory_limit(now_ms);
631 }
632 }
633 }
634
635 #[cfg(feature = "embedded")]
636 pub(crate) fn local_set_prepared_slice_no_ttl(
637 &mut self,
638 prepared: &PreparedPointKey,
639 value: &[u8],
640 ) {
641 if self.route_mode == EmbeddedRouteMode::FullKey {
642 let route = prepared.route();
643 let shard = self.shard_for_route_mut(route.shard_id);
644 shard.map.set_slice_hashed_tagged_no_ttl_local(
645 route.key_hash,
646 prepared.key_tag(),
647 prepared.key(),
648 value,
649 );
650 if shard.exceeds_memory_limit() {
651 shard.enforce_memory_limit(0);
652 }
653 return;
654 }
655
656 self.local_set_slice_tagged_no_ttl(
657 prepared.route(),
658 prepared.key_tag(),
659 prepared.key(),
660 value,
661 );
662 }
663
664 #[cfg(feature = "embedded")]
665 pub(crate) fn local_set_slice_tagged_no_ttl(
666 &mut self,
667 route: EmbeddedKeyRoute,
668 key_tag: u64,
669 key: &[u8],
670 value: &[u8],
671 ) {
672 let shard = self.shard_for_route_mut(route.shard_id);
673 if let Some(session_prefix) = point_write_session_storage_prefix(key) {
674 shard
675 .session_slots
676 .delete_hashed_local(&session_prefix, route.key_hash, key);
677 }
678 shard
679 .map
680 .set_slice_hashed_tagged_no_ttl_local(route.key_hash, key_tag, key, value);
681 if shard.exceeds_memory_limit() {
682 shard.enforce_memory_limit(0);
683 }
684 }
685
686 #[cfg(feature = "embedded")]
687 pub(crate) fn local_batch_set(&mut self, items: Vec<(Bytes, Bytes)>, ttl_ms: Option<u64>) {
688 let now_ms = now_millis();
689 let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
690 for (key, value) in items {
691 let route = self.route_key(&key);
692 let shard = self.shard_for_route_mut(route.shard_id);
693 if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
694 shard
695 .session_slots
696 .delete_hashed_local(&session_prefix, route.key_hash, &key);
697 }
698 shard
699 .map
700 .set_hashed_local(route.key_hash, key, value, expire_at_ms, now_ms);
701 shard.enforce_memory_limit(now_ms);
702 }
703 }
704
705 #[cfg(feature = "embedded")]
706 pub(crate) fn local_batch_set_session_owned_no_ttl(
707 &mut self,
708 session_prefix: Bytes,
709 items: Vec<(Bytes, Bytes)>,
710 ) {
711 self.local_batch_set_session_packed_no_ttl(PackedSessionWrite::from_owned_items(
712 session_prefix,
713 items,
714 ));
715 }
716
717 #[cfg(feature = "embedded")]
718 pub(crate) fn local_batch_set_session_packed_no_ttl(&mut self, packed: PackedSessionWrite) {
719 if packed.item_count() == 0 {
720 return;
721 }
722 let route = self.route_session(packed.session_prefix());
723 let shard = self.shard_for_route_mut(route.shard_id);
724 for entry in packed.slab.entries.iter() {
725 shard.map.delete_hashed_local(entry.hash, &entry.key, 0);
726 }
727 shard.session_slots.replace_session_slab_local(packed);
728 shard.enforce_memory_limit(0);
729 }
730
731 #[cfg(feature = "embedded")]
732 pub(crate) fn local_delete(&mut self, key: &[u8]) -> bool {
733 let route = self.route_key(key);
734 let shard = self.shard_for_route_mut(route.shard_id);
735 let deleted_session = derived_session_storage_prefix(key).is_some_and(|session_prefix| {
736 shard
737 .session_slots
738 .delete_hashed_local(&session_prefix, route.key_hash, key)
739 });
740 let deleted_map = shard.map.delete_hashed_local(route.key_hash, key, 0);
741 deleted_session || deleted_map
742 }
743
744 #[cfg(feature = "embedded")]
745 pub(crate) fn local_exists(&mut self, key: &[u8]) -> bool {
746 self.local_get(key).is_some()
747 }
748
749 #[cfg(feature = "embedded")]
750 pub(crate) fn local_ttl_seconds(&mut self, key: &[u8]) -> i64 {
751 let route = self.route_key(key);
752 let now_ms = now_millis();
753 let shard = self.shard_for_route_mut(route.shard_id);
754 if let Some(session_prefix) = derived_session_storage_prefix(key)
755 && shard
756 .session_slots
757 .get_ref_hashed_local(&session_prefix, route.key_hash, key)
758 .is_some()
759 {
760 return -1;
761 }
762 shard.map.ttl_seconds(key, now_ms)
763 }
764
765 #[cfg(feature = "embedded")]
766 pub(crate) fn local_pttl_millis(&mut self, key: &[u8]) -> i64 {
767 let route = self.route_key(key);
768 let now_ms = now_millis();
769 let shard = self.shard_for_route_mut(route.shard_id);
770 if let Some(session_prefix) = derived_session_storage_prefix(key)
771 && shard
772 .session_slots
773 .get_ref_hashed_local(&session_prefix, route.key_hash, key)
774 .is_some()
775 {
776 return -1;
777 }
778 shard.map.ttl_millis(key, now_ms)
779 }
780
781 #[cfg(feature = "embedded")]
782 pub(crate) fn local_expire(&mut self, key: &[u8], expire_at_ms: u64) -> bool {
783 let route = self.route_key(key);
784 let now_ms = now_millis();
785 let shard = self.shard_for_route_mut(route.shard_id);
786 if let Some(session_prefix) = derived_session_storage_prefix(key)
787 && shard
788 .session_slots
789 .get_ref_hashed_local(&session_prefix, route.key_hash, key)
790 .is_some()
791 {
792 return false;
793 }
794 shard.map.expire(key, expire_at_ms, now_ms)
795 }
796
797 #[cfg(feature = "embedded")]
798 pub(crate) fn local_persist(&mut self, key: &[u8]) -> bool {
799 let route = self.route_key(key);
800 let now_ms = now_millis();
801 let shard = self.shard_for_route_mut(route.shard_id);
802 if let Some(session_prefix) = derived_session_storage_prefix(key)
803 && shard
804 .session_slots
805 .get_ref_hashed_local(&session_prefix, route.key_hash, key)
806 .is_some()
807 {
808 return false;
809 }
810 shard.map.persist(key, now_ms)
811 }
812
813 #[inline(always)]
814 pub fn get_view_routed_no_ttl(
815 &mut self,
816 route: EmbeddedKeyRoute,
817 key: &[u8],
818 ) -> OwnedEmbeddedReadView {
819 let shard = self.shard_for_route_mut(route.shard_id);
820 let item = shard
821 .get_ref_no_ttl_hashed(route.key_hash, key)
822 .map(EmbeddedReadSlice::from_slice);
823 OwnedEmbeddedReadView { item }
824 }
825
826 #[inline(always)]
827 pub fn set_slice_routed_no_ttl(&mut self, route: EmbeddedKeyRoute, key: &[u8], value: &[u8]) {
828 self.shard_for_route_mut(route.shard_id)
829 .set_slice_hashed_no_ttl(route.key_hash, key, value);
830 }
831
832 pub fn batch_set_session_slices_no_ttl<I, K, V>(&mut self, session_prefix: &[u8], items: I)
833 where
834 I: IntoIterator<Item = (K, V)>,
835 K: AsRef<[u8]>,
836 V: AsRef<[u8]>,
837 {
838 let route = self.route_session(session_prefix);
839 let shard = self.shard_for_route_mut(route.shard_id);
840 for (key, value) in items {
841 let key = key.as_ref();
842 shard.set_session_slice_hashed_no_ttl(
843 session_prefix,
844 hash_key(key),
845 key,
846 value.as_ref(),
847 );
848 }
849 }
850
851 pub fn batch_set_session_packed_no_ttl(&mut self, packed: PackedSessionWrite) {
852 if packed.item_count() == 0 {
853 return;
854 }
855 let route = self.route_session(&packed.session_prefix);
856 let shard = self.shard_for_route_mut(route.shard_id);
857 for entry in packed.slab.entries.iter() {
858 shard.map.delete_hashed(entry.hash, &entry.key, 0);
859 }
860 shard.session_slots.replace_session_slab(packed);
861 shard.enforce_memory_limit(0);
862 }
863
864 pub fn batch_get_session_view_no_ttl(
865 &mut self,
866 session_prefix: &[u8],
867 keys: &[Bytes],
868 ) -> OwnedEmbeddedSessionBatchView {
869 let key_hashes = keys.iter().map(|key| hash_key(key)).collect::<Vec<_>>();
870 self.batch_get_session_view_prehashed_no_ttl(session_prefix, keys, &key_hashes)
871 }
872
873 pub fn batch_get_session_view_prehashed_no_ttl(
874 &mut self,
875 session_prefix: &[u8],
876 keys: &[Bytes],
877 key_hashes: &[u64],
878 ) -> OwnedEmbeddedSessionBatchView {
879 assert_eq!(
880 keys.len(),
881 key_hashes.len(),
882 "keys and key_hashes must have matching lengths",
883 );
884 if keys.is_empty() {
885 return OwnedEmbeddedBatchReadView {
886 items: Vec::new(),
887 hit_count: 0,
888 total_bytes: 0,
889 };
890 }
891
892 let route = self.route_session(session_prefix);
893 let shard = self.shard_for_route_mut(route.shard_id);
894 let mut view = BatchReadViewBuilder::new(keys.len());
895 for (key, key_hash) in keys.iter().zip(key_hashes.iter().copied()) {
896 view.push(shard.get_session_ref_hashed_no_ttl(session_prefix, key_hash, key));
897 }
898
899 view.finish_owned()
900 }
901
902 pub fn batch_get_session_view_routed_no_ttl(
903 &mut self,
904 route: EmbeddedSessionRoute,
905 keys: &[Bytes],
906 ) -> OwnedEmbeddedSessionBatchView {
907 let key_hashes = keys.iter().map(|key| hash_key(key)).collect::<Vec<_>>();
908 self.batch_get_session_view_prehashed_routed_no_ttl(route, keys, &key_hashes)
909 }
910
911 pub fn batch_get_session_view_prehashed_routed_no_ttl(
912 &mut self,
913 route: EmbeddedSessionRoute,
914 keys: &[Bytes],
915 key_hashes: &[u64],
916 ) -> OwnedEmbeddedSessionBatchView {
917 assert_eq!(
918 keys.len(),
919 key_hashes.len(),
920 "keys and key_hashes must have matching lengths",
921 );
922 if keys.is_empty() {
923 return OwnedEmbeddedBatchReadView {
924 items: Vec::new(),
925 hit_count: 0,
926 total_bytes: 0,
927 };
928 }
929
930 let shard = self.shard_for_route_mut(route.shard_id);
931 let session_prefix = batch_derived_session_storage_prefix(keys);
932 let active_session_prefix = session_prefix
933 .as_ref()
934 .filter(|prefix| shard.session_slots.has_session(prefix))
935 .map(Vec::as_slice);
936
937 let mut view = BatchReadViewBuilder::new(keys.len());
938 for (key, key_hash) in keys.iter().zip(key_hashes.iter().copied()) {
939 view.push(shard.get_ref_hashed_active_session_or_no_ttl_flat(
940 active_session_prefix,
941 key_hash,
942 key,
943 ));
944 }
945
946 view.finish_owned()
947 }
948
949 pub fn batch_get_session_packed_no_ttl(
950 &mut self,
951 session_prefix: &[u8],
952 keys: &[Bytes],
953 ) -> PackedBatch {
954 if keys.is_empty() {
955 return PackedBatch::default();
956 }
957
958 let route = self.route_session(session_prefix);
959 let shard = self.shard_for_route_mut(route.shard_id);
960 let mut packed = PackedBatchBuilder::new(keys.len());
961 for key in keys {
962 let key_hash = hash_key(key);
963 packed.push(shard.get_session_ref_hashed_no_ttl(session_prefix, key_hash, key));
964 }
965 packed.finish()
966 }
967
968 pub fn batch_get_session_packed_routed_no_ttl(
969 &mut self,
970 route: EmbeddedSessionRoute,
971 keys: &[Bytes],
972 ) -> PackedBatch {
973 if keys.is_empty() {
974 return PackedBatch::default();
975 }
976
977 let shard = self.shard_for_route_mut(route.shard_id);
978 let session_prefix = batch_derived_session_storage_prefix(keys);
979 let active_session_prefix = session_prefix
980 .as_ref()
981 .filter(|prefix| shard.session_slots.has_session(prefix))
982 .map(Vec::as_slice);
983 let mut packed = PackedBatchBuilder::new(keys.len());
984 for key in keys {
985 let key_hash = hash_key(key);
986 packed.push(shard.get_ref_hashed_active_session_or_no_ttl_flat(
987 active_session_prefix,
988 key_hash,
989 key,
990 ));
991 }
992 packed.finish()
993 }
994
995 pub fn len(&self) -> usize {
996 self.shards
997 .iter()
998 .map(|shard| shard.map.len().saturating_add(shard.session_slots.len()))
999 .sum()
1000 }
1001
1002 pub fn is_empty(&self) -> bool {
1003 self.len() == 0
1004 }
1005
1006 pub fn process_maintenance(&mut self) -> usize {
1007 let now_ms = now_millis();
1008 self.shards
1009 .iter_mut()
1010 .map(|shard| shard.map.process_maintenance(now_ms))
1011 .sum()
1012 }
1013
1014 pub fn restore_entries<I>(&mut self, entries: I)
1015 where
1016 I: IntoIterator<Item = StoredEntry>,
1017 {
1018 let now_ms = now_millis();
1019 for entry in entries {
1020 if entry
1021 .expire_at_ms
1022 .is_some_and(|expire_at_ms| expire_at_ms <= now_ms)
1023 {
1024 continue;
1025 }
1026 let route = self.route_key(&entry.key);
1027 let shard = self.shard_for_route_mut(route.shard_id);
1028 if let Some(session_prefix) = point_write_session_storage_prefix(&entry.key) {
1029 shard
1030 .session_slots
1031 .delete_hashed(&session_prefix, route.key_hash, &entry.key);
1032 }
1033 shard.map.set_hashed(
1034 route.key_hash,
1035 entry.key,
1036 entry.value,
1037 entry.expire_at_ms,
1038 now_ms,
1039 );
1040 shard.enforce_memory_limit(now_ms);
1041 }
1042 }
1043
1044 pub fn get(&mut self, key: &[u8]) -> Option<Bytes> {
1045 let now_ms = now_millis();
1046 let route = self.route_key(key);
1047 let shard = self.shard_for_route_mut(route.shard_id);
1048 if let Some(session_prefix) = derived_session_storage_prefix(key)
1049 && let Some(value) =
1050 shard
1051 .session_slots
1052 .get_ref_hashed(&session_prefix, route.key_hash, key)
1053 {
1054 return Some(value.to_vec());
1055 }
1056 shard
1057 .map
1058 .get_ref_hashed(route.key_hash, key, now_ms)
1059 .map(<[u8]>::to_vec)
1060 }
1061
1062 pub fn get_view(&mut self, key: &[u8]) -> OwnedEmbeddedReadView {
1063 let route = self.route_key(key);
1064 let now_ms = now_millis();
1065 let shard = self.shard_for_route_mut(route.shard_id);
1066 let item = if let Some(session_prefix) = derived_session_storage_prefix(key) {
1067 if shard.session_slots.has_session(&session_prefix) {
1068 shard
1069 .session_slots
1070 .get_ref_hashed(&session_prefix, route.key_hash, key)
1071 .map(EmbeddedReadSlice::from_slice)
1072 } else {
1073 shard
1074 .map
1075 .get_ref_hashed(route.key_hash, key, now_ms)
1076 .map(EmbeddedReadSlice::from_slice)
1077 }
1078 } else {
1079 shard
1080 .map
1081 .get_ref_hashed(route.key_hash, key, now_ms)
1082 .map(EmbeddedReadSlice::from_slice)
1083 };
1084 OwnedEmbeddedReadView { item }
1085 }
1086
1087 pub fn batch_get(&mut self, keys: Vec<Bytes>) -> Vec<Option<Bytes>> {
1088 let total = keys.len();
1089 if total == 0 {
1090 return Vec::new();
1091 }
1092
1093 #[cfg(feature = "telemetry")]
1094 let start = self.metrics.as_ref().map(|_| Instant::now());
1095 let now_ms = now_millis();
1096 let mut values = vec![None; total];
1097 let mut groups = vec![Vec::<(usize, Bytes, u64)>::new(); self.shards.len()];
1098 let mut touched = Vec::new();
1099
1100 for (index, key) in keys.into_iter().enumerate() {
1101 let route = self.route_key(&key);
1102 let local_index = self
1103 .shard_lookup
1104 .get(route.shard_id)
1105 .copied()
1106 .filter(|idx| *idx != usize::MAX)
1107 .expect("routed key does not belong to this owned worker");
1108 if groups[local_index].is_empty() {
1109 touched.push(route.shard_id);
1110 }
1111 groups[local_index].push((index, key, route.key_hash));
1112 }
1113
1114 for (local_index, batch) in groups.into_iter().enumerate() {
1115 if batch.is_empty() {
1116 continue;
1117 }
1118 let shard = &mut self.shards[local_index];
1119 for (index, key, key_hash) in batch {
1120 values[index] = shard
1121 .get_ref_hashed_session_or_flat(key_hash, &key, now_ms)
1122 .map(<[u8]>::to_vec);
1123 }
1124 }
1125
1126 #[cfg(feature = "telemetry")]
1127 self.record_batch_metrics(start, &touched);
1128 values
1129 }
1130
1131 pub fn batch_get_view(&mut self, keys: &[Bytes]) -> OwnedEmbeddedBatchReadView {
1132 let total = keys.len();
1133 if total == 0 {
1134 return OwnedEmbeddedBatchReadView {
1135 items: Vec::new(),
1136 hit_count: 0,
1137 total_bytes: 0,
1138 };
1139 }
1140
1141 #[cfg(feature = "telemetry")]
1142 let start = self.metrics.as_ref().map(|_| Instant::now());
1143 let now_ms = now_millis();
1144
1145 let mut groups = vec![Vec::<(usize, &Bytes, u64, usize)>::new(); self.shards.len()];
1146 let mut touched = Vec::new();
1147 for (index, key) in keys.iter().enumerate() {
1148 let route = self.route_key(key);
1149 let local_index = self
1150 .shard_lookup
1151 .get(route.shard_id)
1152 .copied()
1153 .filter(|idx| *idx != usize::MAX)
1154 .expect("routed key does not belong to this owned worker");
1155 if groups[local_index].is_empty() {
1156 touched.push(route.shard_id);
1157 }
1158 groups[local_index].push((index, key, route.key_hash, route.shard_id));
1159 }
1160
1161 let mut view = OrderedBatchReadViewBuilder::new(total);
1162
1163 for (local_index, batch) in groups.into_iter().enumerate() {
1164 if batch.is_empty() {
1165 continue;
1166 }
1167 let shard = &mut self.shards[local_index];
1168 for (index, key, key_hash, _shard_id) in batch {
1169 if let Some(value) =
1170 shard.get_ref_hashed_published_session_or_flat(key_hash, key, now_ms)
1171 {
1172 view.record_hit(index, value);
1173 }
1174 }
1175 }
1176
1177 #[cfg(feature = "telemetry")]
1178 self.record_batch_metrics(start, &touched);
1179 view.finish_owned()
1180 }
1181
1182 pub fn batch_get_session_view(
1183 &mut self,
1184 session_prefix: &[u8],
1185 keys: &[Bytes],
1186 ) -> OwnedEmbeddedSessionBatchView {
1187 let key_hashes = keys.iter().map(|key| hash_key(key)).collect::<Vec<_>>();
1188 self.batch_get_session_view_prehashed(session_prefix, keys, &key_hashes)
1189 }
1190
1191 pub fn batch_get_session_view_prehashed(
1192 &mut self,
1193 session_prefix: &[u8],
1194 keys: &[Bytes],
1195 key_hashes: &[u64],
1196 ) -> OwnedEmbeddedSessionBatchView {
1197 assert_eq!(
1198 keys.len(),
1199 key_hashes.len(),
1200 "keys and key_hashes must have matching lengths",
1201 );
1202 if keys.is_empty() {
1203 return OwnedEmbeddedBatchReadView {
1204 items: Vec::new(),
1205 hit_count: 0,
1206 total_bytes: 0,
1207 };
1208 }
1209
1210 #[cfg(feature = "telemetry")]
1211 let start = self.metrics.as_ref().map(|_| Instant::now());
1212 let now_ms = now_millis();
1213 let route = self.route_session(session_prefix);
1214 let shard = self.shard_for_route_mut(route.shard_id);
1215 let active_session_prefix = shard
1216 .session_slots
1217 .has_session(session_prefix)
1218 .then_some(session_prefix);
1219
1220 let mut view = BatchReadViewBuilder::new(keys.len());
1221 for (key, key_hash) in keys.iter().zip(key_hashes.iter().copied()) {
1222 view.push(shard.get_ref_hashed_active_session_or_flat(
1223 active_session_prefix,
1224 key_hash,
1225 key,
1226 now_ms,
1227 ));
1228 }
1229
1230 #[cfg(feature = "telemetry")]
1231 self.record_batch_metrics(start, &[route.shard_id]);
1232 view.finish_owned()
1233 }
1234
1235 pub fn batch_get_session_packed_view_prehashed(
1236 &mut self,
1237 session_prefix: &[u8],
1238 keys: &[Bytes],
1239 key_hashes: &[u64],
1240 ) -> Option<OwnedEmbeddedSessionPackedView> {
1241 assert_eq!(
1242 keys.len(),
1243 key_hashes.len(),
1244 "keys and key_hashes must have matching lengths",
1245 );
1246 if keys.is_empty() {
1247 return Some(OwnedEmbeddedSessionPackedView {
1248 buffer: EmbeddedReadSlice::from_slice(&[]),
1249 offsets: Vec::new(),
1250 lengths: Vec::new(),
1251 hit_count: 0,
1252 total_bytes: 0,
1253 });
1254 }
1255
1256 let route = self.route_session(session_prefix);
1257 let shard = self.shard_for_route_mut(route.shard_id);
1258 if !shard.session_slots.has_session(session_prefix) {
1259 return None;
1260 }
1261
1262 let meta =
1263 shard
1264 .session_slots
1265 .get_packed_view_hashed_local(session_prefix, keys, key_hashes)?;
1266
1267 Some(OwnedEmbeddedSessionPackedView {
1268 buffer: EmbeddedReadSlice { bytes: meta.buffer },
1269 offsets: meta.offsets,
1270 lengths: meta.lengths,
1271 hit_count: meta.hit_count,
1272 total_bytes: meta.total_bytes,
1273 })
1274 }
1275
1276 pub fn batch_get_session_packed_view(
1277 &mut self,
1278 session_prefix: &[u8],
1279 keys: &[Bytes],
1280 ) -> Option<OwnedEmbeddedSessionPackedView> {
1281 let key_hashes = keys.iter().map(|key| hash_key(key)).collect::<Vec<_>>();
1282 self.batch_get_session_packed_view_prehashed(session_prefix, keys, &key_hashes)
1283 }
1284
1285 pub fn batch_get_session_packed(
1286 &mut self,
1287 session_prefix: &[u8],
1288 keys: &[Bytes],
1289 ) -> PackedBatch {
1290 if keys.is_empty() {
1291 return PackedBatch::default();
1292 }
1293
1294 #[cfg(feature = "telemetry")]
1295 let start = self.metrics.as_ref().map(|_| Instant::now());
1296 let route = self.route_session(session_prefix);
1297 let now_ms = now_millis();
1298 let shard = self.shard_for_route_mut(route.shard_id);
1299 let active_session_prefix = shard
1300 .session_slots
1301 .has_session(session_prefix)
1302 .then_some(session_prefix);
1303 let mut packed = PackedBatchBuilder::new(keys.len());
1304 for key in keys {
1305 let key_hash = hash_key(key);
1306 packed.push(shard.get_ref_hashed_active_session_or_flat(
1307 active_session_prefix,
1308 key_hash,
1309 key,
1310 now_ms,
1311 ));
1312 }
1313 #[cfg(feature = "telemetry")]
1314 self.record_batch_metrics(start, &[route.shard_id]);
1315 packed.finish()
1316 }
1317
1318 pub fn batch_get_packed(&mut self, keys: &[Bytes]) -> PackedBatch {
1319 let total = keys.len();
1320 if total == 0 {
1321 return PackedBatch::default();
1322 }
1323
1324 #[cfg(feature = "telemetry")]
1325 let start = self.metrics.as_ref().map(|_| Instant::now());
1326 let now_ms = now_millis();
1327 let mut groups = vec![Vec::<(usize, &Bytes, u64, usize)>::new(); self.shards.len()];
1328 let mut touched = Vec::new();
1329 for (index, key) in keys.iter().enumerate() {
1330 let route = self.route_key(key);
1331 let local_index = self
1332 .shard_lookup
1333 .get(route.shard_id)
1334 .copied()
1335 .filter(|idx| *idx != usize::MAX)
1336 .expect("routed key does not belong to this owned worker");
1337 if groups[local_index].is_empty() {
1338 touched.push(route.shard_id);
1339 }
1340 groups[local_index].push((index, key, route.key_hash, route.shard_id));
1341 }
1342
1343 let mut packed = OrderedPackedBatchBuilder::new(total);
1344
1345 for (local_index, batch) in groups.into_iter().enumerate() {
1346 if batch.is_empty() {
1347 continue;
1348 }
1349 let shard = &mut self.shards[local_index];
1350 for (index, key, key_hash, _shard_id) in batch {
1351 if let Some(value) = shard.get_ref_hashed_session_or_flat(key_hash, key, now_ms) {
1352 packed.record_hit(index, value);
1353 }
1354 }
1355 }
1356
1357 #[cfg(feature = "telemetry")]
1358 self.record_batch_metrics(start, &touched);
1359 packed.finish()
1360 }
1361
1362 pub fn set(&mut self, key: Bytes, value: Bytes, ttl_ms: Option<u64>) {
1363 let now_ms = now_millis();
1364 let route = self.route_key(&key);
1365 let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
1366 let shard = self.shard_for_route_mut(route.shard_id);
1367 if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
1368 shard
1369 .session_slots
1370 .delete_hashed(&session_prefix, route.key_hash, &key);
1371 }
1372 shard
1373 .map
1374 .set_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
1375 shard.enforce_memory_limit(now_ms);
1376 }
1377
1378 pub fn batch_set(&mut self, items: Vec<(Bytes, Bytes)>, ttl_ms: Option<u64>) {
1379 if items.is_empty() {
1380 return;
1381 }
1382
1383 let now_ms = now_millis();
1384 let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
1385 let mut groups = vec![Vec::<(Bytes, Bytes, EmbeddedKeyRoute)>::new(); self.shards.len()];
1386 for (key, value) in items {
1387 let route = self.route_key(&key);
1388 let local_index = self
1389 .shard_lookup
1390 .get(route.shard_id)
1391 .copied()
1392 .filter(|idx| *idx != usize::MAX)
1393 .expect("routed key does not belong to this owned worker");
1394 groups[local_index].push((key, value, route));
1395 }
1396
1397 for (local_index, batch) in groups.into_iter().enumerate() {
1398 if batch.is_empty() {
1399 continue;
1400 }
1401 let shard = &mut self.shards[local_index];
1402 for (key, value, route) in batch {
1403 if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
1404 shard
1405 .session_slots
1406 .delete_hashed(&session_prefix, route.key_hash, &key);
1407 }
1408 shard
1409 .map
1410 .set_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
1411 }
1412 shard.enforce_memory_limit(now_ms);
1413 }
1414 }
1415
1416 pub fn batch_set_session_owned_no_ttl(
1417 &mut self,
1418 session_prefix: Bytes,
1419 items: Vec<(Bytes, Bytes)>,
1420 ) {
1421 if items.is_empty() {
1422 return;
1423 }
1424 self.local_batch_set_session_packed_no_ttl(PackedSessionWrite::from_owned_items(
1425 session_prefix,
1426 items,
1427 ));
1428 }
1429
1430 pub fn delete(&mut self, key: &[u8]) -> bool {
1431 let now_ms = now_millis();
1432 let route = self.route_key(key);
1433 let shard = self.shard_for_route_mut(route.shard_id);
1434 if let Some(session_prefix) = derived_session_storage_prefix(key)
1435 && shard
1436 .session_slots
1437 .delete_hashed(&session_prefix, route.key_hash, key)
1438 {
1439 return true;
1440 }
1441 shard.map.delete_hashed(route.key_hash, key, now_ms)
1442 }
1443
1444 pub fn exists(&mut self, key: &[u8]) -> bool {
1445 self.get(key).is_some()
1446 }
1447
/// Records one batch-get sample plus one per-shard counter for every id in
/// `touched_shards`.
///
/// Does nothing unless both `self.metrics` and `start` are `Some`.
#[cfg(feature = "telemetry")]
#[inline(always)]
fn record_batch_metrics(&self, start: Option<Instant>, touched_shards: &[usize]) {
    if let (Some(metrics), Some(start)) = (&self.metrics, start) {
        // `as_nanos` returns `u128`; saturate instead of silently truncating with `as`.
        let elapsed_ns = u64::try_from(start.elapsed().as_nanos()).unwrap_or(u64::MAX);
        metrics.record_batch_get(elapsed_ns);
        for &shard_id in touched_shards {
            metrics.record_batch_get_shard(shard_id);
        }
    }
}
1458
1459 fn shard_for_route_mut(&mut self, shard_id: usize) -> &mut OwnedEmbeddedShard {
1460 let index = self
1461 .shard_lookup
1462 .get(shard_id)
1463 .copied()
1464 .filter(|index| *index != usize::MAX)
1465 .expect("routed key does not belong to this owned worker");
1466 &mut self.shards[index]
1467 }
1468}
1469
1470#[derive(Debug)]
1476pub struct OwnedEmbeddedWorkerReadSession<'a> {
1477 pub(super) worker: &'a mut OwnedEmbeddedWorkerShards,
1478 pub(super) opened: Vec<bool>,
1479 pub(super) opened_indices: Vec<usize>,
1480}
1481
1482impl<'a> OwnedEmbeddedWorkerReadSession<'a> {
1483 #[inline(always)]
1484 fn shard_for_route_mut(&mut self, shard_id: usize) -> &mut OwnedEmbeddedShard {
1485 let index = self
1486 .worker
1487 .shard_lookup
1488 .get(shard_id)
1489 .copied()
1490 .filter(|index| *index != usize::MAX)
1491 .expect("routed key does not belong to this owned worker");
1492 if !self.opened[index] {
1493 self.worker.shards[index].map.begin_read_epoch();
1494 self.opened[index] = true;
1495 self.opened_indices.push(index);
1496 }
1497 &mut self.worker.shards[index]
1498 }
1499
1500 #[inline(always)]
1501 pub fn get_ref_no_ttl_routed(&mut self, route: EmbeddedKeyRoute, key: &[u8]) -> Option<&[u8]> {
1502 self.shard_for_route_mut(route.shard_id)
1503 .get_ref_no_ttl_hashed(route.key_hash, key)
1504 }
1505}
1506
1507impl Drop for OwnedEmbeddedWorkerReadSession<'_> {
1508 fn drop(&mut self) {
1509 for index in self.opened_indices.drain(..) {
1510 self.worker.shards[index].map.end_read_epoch();
1511 }
1512 }
1513}
1514
1515impl EmbeddedStore {
1516 pub fn bind_shard(&self, shard_id: usize) -> EmbeddedShardHandle<'_> {
1517 assert!(shard_id < self.shards.len(), "invalid shard id");
1518 EmbeddedShardHandle {
1519 shard_id,
1520 shard: self.shards[shard_id].write(),
1521 }
1522 }
1523
1524 pub fn into_owned_shards(self) -> Vec<OwnedEmbeddedShard> {
1525 self.shards
1526 .into_iter()
1527 .enumerate()
1528 .map(|(shard_id, shard)| {
1529 let shard = shard.into_inner().into_inner();
1530 OwnedEmbeddedShard {
1531 shard_id,
1532 map: shard.map,
1533 session_slots: shard.session_slots,
1534 memory_limit_bytes: shard.memory_limit_bytes,
1535 eviction_policy: shard.eviction_policy,
1536 }
1537 })
1538 .collect()
1539 }
1540
1541 pub fn into_owned_workers(self, worker_count: usize) -> Vec<OwnedEmbeddedWorkerShards> {
1542 let worker_count = worker_count.max(1);
1543 let route_mode = self.route_mode;
1544 let shard_count = self.shard_count();
1545 #[cfg(feature = "telemetry")]
1546 let metrics = self.metrics.clone();
1547 let mut buckets = (0..worker_count)
1548 .map(|_| Vec::<OwnedEmbeddedShard>::new())
1549 .collect::<Vec<_>>();
1550
1551 for shard in self.into_owned_shards() {
1552 buckets[shard.shard_id() % worker_count].push(shard);
1553 }
1554
1555 buckets
1556 .into_iter()
1557 .map(|shards| {
1558 OwnedEmbeddedWorkerShards::new(
1559 route_mode,
1560 shard_count,
1561 shards,
1562 #[cfg(feature = "telemetry")]
1563 metrics.clone(),
1564 )
1565 })
1566 .collect()
1567 }
1568}