fast_cache/storage/flat_map/
write_local.rs1use super::*;
2
3impl FlatMap {
4 #[cfg(feature = "embedded")]
5 #[inline(always)]
6 pub fn set_hashed_local<K, V>(
7 &mut self,
8 hash: u64,
9 key: K,
10 value: V,
11 expire_at_ms: Option<u64>,
12 now_ms: u64,
13 ) where
14 K: Into<Bytes>,
15 V: Into<Bytes>,
16 {
17 self.disable_fast_point_map();
18 #[cfg(feature = "telemetry")]
19 let start = self.telemetry.as_ref().map(|_| Instant::now());
20
21 let key = key.into();
22 let mut replacement = Some(SharedBytes::from(value.into()));
23 let access_tick = if self.eviction_policy == EvictionPolicy::None {
24 0
25 } else {
26 self.next_access_tick()
27 };
28 self.record_lru_touch(hash, access_tick);
29 #[cfg(feature = "telemetry")]
30 let written_len = replacement.as_ref().map_or(0, |bytes| bytes.len());
31 #[cfg(feature = "telemetry")]
32 let (key_delta, memory_delta): (isize, isize);
33
34 match self
35 .entries
36 .entry(hash, |entry| entry.matches(hash, &key), |entry| entry.hash)
37 {
38 hashbrown::hash_table::Entry::Occupied(mut occupied) => {
39 let entry = occupied.get_mut();
40 let had_ttl = entry.expire_at_ms.is_some();
41 let previous_value_len = entry.value.len();
42 entry.value = replacement.take().unwrap();
43 #[cfg(feature = "telemetry")]
44 {
45 key_delta = 0isize;
46 memory_delta = entry.value.len() as isize - previous_value_len as isize;
47 }
48 entry.access.record_access(access_tick);
49 self.stored_bytes = self
50 .stored_bytes
51 .saturating_sub(previous_value_len)
52 .saturating_add(entry.value.len());
53 entry.expire_at_ms = expire_at_ms;
54 self.adjust_ttl_count(had_ttl, expire_at_ms.is_some());
55 }
56 hashbrown::hash_table::Entry::Vacant(vacant) => {
57 let key_len = key.len();
58 let value_len = replacement.as_ref().map_or(0, |bytes| bytes.len());
59 vacant.insert(FlatEntry {
60 hash,
61 key_tag: hash_key_tag_from_hash(hash),
62 key_len,
63 key: key.into_boxed_slice(),
64 value: replacement.take().unwrap(),
65 expire_at_ms,
66 access: EntryAccessMeta {
67 last_touch: access_tick,
68 frequency: 1,
69 },
70 });
71 self.stored_bytes = self
72 .stored_bytes
73 .saturating_add(key_len)
74 .saturating_add(value_len);
75 #[cfg(feature = "telemetry")]
76 {
77 key_delta = 1isize;
78 memory_delta = (key_len + value_len) as isize;
79 }
80 if expire_at_ms.is_some() {
81 self.ttl_entries = self.ttl_entries.saturating_add(1);
82 }
83 }
84 }
85
86 #[cfg(feature = "telemetry")]
87 self.record_set_metrics(written_len, key_delta, memory_delta, start);
88
89 self.enforce_memory_limit(now_ms);
90 }
91
92 #[cfg(feature = "embedded")]
93 #[inline(always)]
94 pub fn set_slice_hashed_local(
95 &mut self,
96 hash: u64,
97 key: &[u8],
98 value: &[u8],
99 expire_at_ms: Option<u64>,
100 now_ms: u64,
101 ) {
102 let key_tag = hash_key_tag_from_hash(hash);
103 self.set_slice_hashed_tagged_local(hash, key_tag, key, value, expire_at_ms, now_ms);
104 }
105
106 #[cfg(feature = "embedded")]
107 #[inline(always)]
108 pub fn set_slice_hashed_tagged_no_ttl_local(
109 &mut self,
110 hash: u64,
111 key_tag: u64,
112 key: &[u8],
113 value: &[u8],
114 ) {
115 debug_assert_eq!(key_tag, hash_key_tag_from_hash(hash));
116 self.disable_fast_point_map();
117 if !self.retired_values.is_empty() {
118 self.reclaim_retired_if_quiescent();
119 }
120 #[cfg(feature = "telemetry")]
121 let start = self.telemetry.as_ref().map(|_| Instant::now());
122 let has_active_readers = self.has_active_readers();
123 let should_touch_access = self.eviction_policy != EvictionPolicy::None;
124 let access_tick = if should_touch_access {
125 self.next_access_tick()
126 } else {
127 0
128 };
129 self.record_lru_touch(hash, access_tick);
130 let reuse_values =
131 should_reuse_value_buffer(value.len()) && !self.reusable_values.is_empty();
132 let mut reusable_values = if reuse_values {
133 mem::take(&mut self.reusable_values)
134 } else {
135 Vec::new()
136 };
137 let mut reusable_value_bytes = if reuse_values {
138 mem::take(&mut self.reusable_value_bytes)
139 } else {
140 0
141 };
142 #[cfg(feature = "telemetry")]
143 let written_len = value.len();
144 #[cfg(feature = "telemetry")]
145 let (key_delta, memory_delta): (isize, isize);
146
147 match self.entries.entry(
148 hash,
149 |entry| entry.matches_prepared(hash, key, key_tag),
150 |entry| entry.hash,
151 ) {
152 hashbrown::hash_table::Entry::Occupied(mut occupied) => {
153 let entry = occupied.get_mut();
154 let had_ttl = entry.expire_at_ms.is_some();
155 let previous_value_len = entry.value.len();
156 let should_replace_value = previous_value_len != value.len() || has_active_readers;
157 let mut retired_value = None;
158 if should_replace_value {
159 let new_value = if reuse_values {
160 shared_bytes_from_reusable_pool(
161 value,
162 &mut reusable_values,
163 &mut reusable_value_bytes,
164 )
165 } else {
166 shared_bytes_from_slice(value)
167 };
168 retired_value = Some(mem::replace(&mut entry.value, new_value));
169 } else {
170 #[cfg(feature = "unsafe")]
171 {
172 unsafe {
178 copy_hot_value_bytes(
179 entry.value.as_ptr().cast_mut(),
180 value.as_ptr(),
181 value.len(),
182 );
183 }
184 }
185 #[cfg(not(feature = "unsafe"))]
186 {
187 let current_value = mem::take(&mut entry.value);
188 match current_value.try_into_mut() {
189 Ok(mut writable) => {
190 writable[..].copy_from_slice(value);
191 entry.value = writable.freeze();
192 }
193 Err(current_value) => {
194 entry.value = if reuse_values {
195 shared_bytes_from_reusable_pool(
196 value,
197 &mut reusable_values,
198 &mut reusable_value_bytes,
199 )
200 } else {
201 shared_bytes_from_slice(value)
202 };
203 retired_value = Some(current_value);
204 }
205 }
206 }
207 }
208 #[cfg(feature = "telemetry")]
209 {
210 key_delta = 0isize;
211 memory_delta = entry.value.len() as isize - previous_value_len as isize;
212 }
213 if should_touch_access {
214 entry.access.record_access(access_tick);
215 }
216 if previous_value_len != entry.value.len() {
217 self.stored_bytes = self
218 .stored_bytes
219 .saturating_sub(previous_value_len)
220 .saturating_add(entry.value.len());
221 }
222 if had_ttl {
223 entry.expire_at_ms = None;
224 self.ttl_entries = self.ttl_entries.saturating_sub(1);
225 }
226 if let Some(old_value) = retired_value {
227 if has_active_readers {
228 self.retire_value(old_value);
229 } else if reuse_values {
230 recycle_value_into_pool(
231 old_value,
232 &mut reusable_values,
233 &mut reusable_value_bytes,
234 );
235 } else {
236 self.recycle_value(old_value);
237 }
238 }
239 }
240 hashbrown::hash_table::Entry::Vacant(vacant) => {
241 let key_len = key.len();
242 let value_len = value.len();
243 let stored_value = if reuse_values {
244 shared_bytes_from_reusable_pool(
245 value,
246 &mut reusable_values,
247 &mut reusable_value_bytes,
248 )
249 } else {
250 shared_bytes_from_slice(value)
251 };
252 vacant.insert(FlatEntry {
253 hash,
254 key_tag,
255 key_len,
256 key: key.to_vec().into_boxed_slice(),
257 value: stored_value,
258 expire_at_ms: None,
259 access: EntryAccessMeta {
260 last_touch: access_tick,
261 frequency: 1,
262 },
263 });
264 self.stored_bytes = self
265 .stored_bytes
266 .saturating_add(key_len)
267 .saturating_add(value_len);
268 #[cfg(feature = "telemetry")]
269 {
270 key_delta = 1isize;
271 memory_delta = (key_len + value_len) as isize;
272 }
273 }
274 }
275
276 if reuse_values {
277 self.reusable_values = reusable_values;
278 self.reusable_value_bytes = reusable_value_bytes;
279 }
280
281 #[cfg(feature = "telemetry")]
282 self.record_set_metrics(written_len, key_delta, memory_delta, start);
283 }
284
285 #[cfg(feature = "embedded")]
286 #[inline(always)]
287 pub fn set_slice_hashed_tagged_local(
288 &mut self,
289 hash: u64,
290 key_tag: u64,
291 key: &[u8],
292 value: &[u8],
293 expire_at_ms: Option<u64>,
294 now_ms: u64,
295 ) {
296 debug_assert_eq!(key_tag, hash_key_tag_from_hash(hash));
297 self.disable_fast_point_map();
298 self.reclaim_retired_if_quiescent();
299 #[cfg(feature = "telemetry")]
300 let start = self.telemetry.as_ref().map(|_| Instant::now());
301 let has_active_readers = self.has_active_readers();
302 let should_touch_access = self.eviction_policy != EvictionPolicy::None;
303 let access_tick = if should_touch_access {
304 self.next_access_tick()
305 } else {
306 0
307 };
308 self.record_lru_touch(hash, access_tick);
309 #[cfg(feature = "telemetry")]
310 let written_len = value.len();
311 #[cfg(feature = "telemetry")]
312 let (key_delta, memory_delta): (isize, isize);
313
314 match self.entries.entry(
315 hash,
316 |entry| entry.matches_prepared(hash, key, key_tag),
317 |entry| entry.hash,
318 ) {
319 hashbrown::hash_table::Entry::Occupied(mut occupied) => {
320 let entry = occupied.get_mut();
321 let had_ttl = entry.expire_at_ms.is_some();
322 let previous_value_len = entry.value.len();
323 let mut retired_value = None;
324 if previous_value_len == value.len() && !has_active_readers {
325 let current_value = mem::take(&mut entry.value);
326 match current_value.try_into_mut() {
327 Ok(mut writable) => {
328 writable[..].copy_from_slice(value);
329 entry.value = writable.freeze();
330 }
331 Err(current_value) => {
332 entry.value = shared_bytes_from_slice(value);
333 retired_value = Some(current_value);
334 }
335 }
336 } else {
337 retired_value = Some(mem::replace(
338 &mut entry.value,
339 shared_bytes_from_slice(value),
340 ));
341 }
342 #[cfg(feature = "telemetry")]
343 {
344 key_delta = 0isize;
345 memory_delta = entry.value.len() as isize - previous_value_len as isize;
346 }
347 if should_touch_access {
348 entry.access.record_access(access_tick);
349 }
350 if previous_value_len != entry.value.len() {
351 self.stored_bytes = self
352 .stored_bytes
353 .saturating_sub(previous_value_len)
354 .saturating_add(entry.value.len());
355 }
356 if had_ttl || expire_at_ms.is_some() {
357 entry.expire_at_ms = expire_at_ms;
358 self.adjust_ttl_count(had_ttl, expire_at_ms.is_some());
359 }
360 if let Some(old_value) = retired_value {
361 self.retire_value(old_value);
362 }
363 }
364 hashbrown::hash_table::Entry::Vacant(vacant) => {
365 let key_len = key.len();
366 let value_len = value.len();
367 vacant.insert(FlatEntry {
368 hash,
369 key_tag,
370 key_len,
371 key: key.to_vec().into_boxed_slice(),
372 value: shared_bytes_from_slice(value),
373 expire_at_ms,
374 access: EntryAccessMeta {
375 last_touch: access_tick,
376 frequency: 1,
377 },
378 });
379 self.stored_bytes = self
380 .stored_bytes
381 .saturating_add(key_len)
382 .saturating_add(value_len);
383 #[cfg(feature = "telemetry")]
384 {
385 key_delta = 1isize;
386 memory_delta = (key_len + value_len) as isize;
387 }
388 if expire_at_ms.is_some() {
389 self.ttl_entries = self.ttl_entries.saturating_add(1);
390 }
391 }
392 }
393
394 #[cfg(feature = "telemetry")]
395 self.record_set_metrics(written_len, key_delta, memory_delta, start);
396
397 self.enforce_memory_limit(now_ms);
398 }
399}