fast_cache/storage/
embedded_store.rs1use crossbeam_utils::CachePadded;
2#[cfg(not(feature = "embedded-read-biased-lock"))]
3use parking_lot::RwLock;
4#[cfg(feature = "embedded-read-biased-lock")]
5use rblock::RwLock;
6#[cfg(feature = "telemetry")]
7use std::sync::Arc;
8#[cfg(feature = "telemetry")]
9use std::time::Instant;
10
11use crate::config::EvictionPolicy;
12use crate::storage::{
13 Bytes, PackedBatch, PreparedPointKey, RedisObjectBucket, RedisObjectError,
14 RedisObjectReadOutcome, RedisObjectResult, RedisObjectStore, RedisObjectValue,
15 RedisObjectWriteAttempt, RedisStringLookup, StoredEntry, hash_key, hash_key_tag_from_hash,
16 now_millis,
17};
18#[cfg(feature = "telemetry")]
19use crate::storage::{CacheTelemetry, CacheTelemetryHandle};
20#[cfg(feature = "embedded")]
21use crate::storage::{ShardStatsSnapshot, TierStatsSnapshot};
22
23mod batch;
24mod batch_results;
25mod core;
26mod lifecycle;
27mod objects;
28mod owned;
29mod point;
30mod routing;
31mod session_slots;
32mod shard;
33mod views;
34mod write;
35
36pub use owned::{
37 EmbeddedShardHandle, OwnedEmbeddedShard, OwnedEmbeddedWorkerReadSession,
38 OwnedEmbeddedWorkerShards,
39};
40#[cfg(feature = "unsafe")]
41use routing::can_skip_session_lookup;
42pub use routing::{
43 EmbeddedKeyRoute, EmbeddedRouteMode, EmbeddedSessionRoute, shift_for, stripe_index,
44};
45pub(crate) use routing::{assert_valid_shard_count, compute_key_route, compute_session_shard};
46use routing::{
47 batch_derived_session_storage_prefix, can_route_with_key_hash, can_use_route_hash_as_key_hash,
48 derived_session_storage_prefix, point_write_session_storage_prefix, session_route_prefix,
49 uses_flat_key_storage,
50};
51pub use session_slots::PackedSessionWrite;
52pub(crate) use session_slots::SessionSlotMap;
53pub(crate) use shard::EmbeddedShard;
54pub use views::{
55 EmbeddedBatchReadView, EmbeddedReadSlice, EmbeddedReadView, EmbeddedSessionBatchView,
56 OwnedEmbeddedBatchReadView, OwnedEmbeddedReadView, OwnedEmbeddedSessionBatchView,
57 OwnedEmbeddedSessionPackedView,
58};
59
60#[derive(Debug)]
66pub struct EmbeddedStore {
67 shards: Box<[CachePadded<RwLock<EmbeddedShard>>]>,
68 shift: u32,
69 objects: RedisObjectStore,
70 route_mode: EmbeddedRouteMode,
71 #[cfg(feature = "telemetry")]
72 metrics: Option<Arc<CacheTelemetry>>,
73}
74
/// Pre-sizes a still-unallocated batch buffer.
///
/// When `buffer` has no capacity yet and `next_len` is non-zero, capacity for
/// `next_len * item_count` bytes (saturating) is requested in one step.
/// Buffers that already own an allocation are left untouched, and so is the
/// buffer when `next_len` is zero.
///
/// The reservation is a sizing hint only: if the estimate is too large to be
/// represented or allocated, the request is dropped and the buffer simply
/// grows on demand later, instead of panicking with a capacity overflow.
#[inline(always)]
fn reserve_batch_capacity(buffer: &mut Vec<u8>, next_len: usize, item_count: usize) {
    if buffer.capacity() != 0 || next_len == 0 {
        return;
    }

    // `try_reserve` rather than `reserve`: a saturated or oversized estimate
    // must not take the process down, because nothing depends on the hint
    // having succeeded.
    let _ = buffer.try_reserve(next_len.saturating_mul(item_count));
}
83
/// Folds `snapshot` into `target`, counter by counter.
///
/// Every listed field is combined with `saturating_add`, so a sum that would
/// overflow clamps at the field type's maximum instead of wrapping.
#[cfg(feature = "embedded")]
#[inline(always)]
fn accumulate_tier_stats(target: &mut TierStatsSnapshot, snapshot: &TierStatsSnapshot) {
    // Expands to one `target.f = target.f.saturating_add(snapshot.f);` per field.
    macro_rules! fold_fields {
        ($($field:ident),+ $(,)?) => {
            $( target.$field = target.$field.saturating_add(snapshot.$field); )+
        };
    }

    fold_fields!(
        len,
        capacity,
        hits,
        misses,
        promotions,
        demotions,
        evictions,
        expirations,
    );
}
96
97#[inline(always)]
98fn write_now_ms(ttl_ms: Option<u64>, memory_limit_bytes: Option<usize>) -> u64 {
99 if ttl_ms.is_some() || memory_limit_bytes.is_some() {
100 now_millis()
101 } else {
102 0
103 }
104}
105
106#[inline(always)]
107fn write_resp_blob_string_into(out: &mut bytes::BytesMut, value: &[u8]) {
108 #[cfg(not(feature = "unsafe"))]
109 {
110 let mut buf = itoa::Buffer::new();
111 let len_str = buf.format(value.len()).as_bytes();
112 out.extend_from_slice(b"$");
113 out.extend_from_slice(len_str);
114 out.extend_from_slice(b"\r\n");
115 out.extend_from_slice(value);
116 out.extend_from_slice(b"\r\n");
117 }
118 #[cfg(feature = "unsafe")]
119 {
120 if value.len() == 64 {
121 const HEADER: &[u8] = b"$64\r\n";
122 let total = HEADER.len() + 64 + 2;
123 out.reserve(total);
124 unsafe {
126 let start = out.len();
127 let dst = out.as_mut_ptr().add(start);
128 std::ptr::copy_nonoverlapping(HEADER.as_ptr(), dst, HEADER.len());
129 std::ptr::copy_nonoverlapping(value.as_ptr(), dst.add(HEADER.len()), 64);
130 *dst.add(HEADER.len() + 64) = b'\r';
131 *dst.add(HEADER.len() + 65) = b'\n';
132 out.set_len(start + total);
133 }
134 return;
135 }
136
137 let mut buf = itoa::Buffer::new();
138 let len_str = buf.format(value.len()).as_bytes();
139 let total = 1 + len_str.len() + 2 + value.len() + 2;
140 out.reserve(total);
141 unsafe {
143 let start = out.len();
144 let dst = out.as_mut_ptr().add(start);
145 *dst = b'$';
146 let mut pos = 1usize;
147 std::ptr::copy_nonoverlapping(len_str.as_ptr(), dst.add(pos), len_str.len());
148 pos += len_str.len();
149 *dst.add(pos) = b'\r';
150 *dst.add(pos + 1) = b'\n';
151 pos += 2;
152 std::ptr::copy_nonoverlapping(value.as_ptr(), dst.add(pos), value.len());
153 pos += value.len();
154 *dst.add(pos) = b'\r';
155 *dst.add(pos + 1) = b'\n';
156 out.set_len(start + total);
157 }
158 }
159}
160
161#[cfg(test)]
162mod tests;