fast_cache/storage/flat_map/
write.rs1use super::*;
2
3impl FlatMap {
4 #[inline(always)]
5 pub fn set<K, V>(&mut self, key: K, value: V, expire_at_ms: Option<u64>, now_ms: u64)
6 where
7 K: Into<Bytes>,
8 V: Into<Bytes>,
9 {
10 let key = key.into();
11 self.set_hashed(hash_key(&key), key, value, expire_at_ms, now_ms);
12 }
13
14 #[inline(always)]
15 pub fn set_slice(&mut self, key: &[u8], value: &[u8], expire_at_ms: Option<u64>, now_ms: u64) {
16 self.set_slice_hashed(hash_key(key), key, value, expire_at_ms, now_ms);
17 }
18
19 pub fn set_bytes_hashed(
26 &mut self,
27 hash: u64,
28 key: &[u8],
29 value: SharedBytes,
30 expire_at_ms: Option<u64>,
31 now_ms: u64,
32 ) {
33 self.disable_fast_point_map();
34 self.reclaim_retired_if_quiescent();
35 let access_tick = if self.eviction_policy == EvictionPolicy::None {
36 0
37 } else {
38 self.next_access_tick()
39 };
40 self.record_lru_touch(hash, access_tick);
41
42 let key_tag = hash_key_tag_from_hash(hash);
43 match self.entries.entry(
44 hash,
45 |entry| entry.matches_hashed_key(hash, key),
46 |entry| entry.hash,
47 ) {
48 hashbrown::hash_table::Entry::Occupied(mut occupied) => {
49 let entry = occupied.get_mut();
50 let had_ttl = entry.expire_at_ms.is_some();
51 let previous_value_len = entry.value.len();
52 let new_value_len = value.len();
53 let retired_value = mem::replace(&mut entry.value, value);
54 entry.access.record_access(access_tick);
55 self.stored_bytes = self
56 .stored_bytes
57 .saturating_sub(previous_value_len)
58 .saturating_add(new_value_len);
59 entry.expire_at_ms = expire_at_ms;
60 self.adjust_ttl_count(had_ttl, expire_at_ms.is_some());
61 self.retire_value(retired_value);
62 }
63 hashbrown::hash_table::Entry::Vacant(vacant) => {
64 let key_len = key.len();
65 let value_len = value.len();
66 vacant.insert(FlatEntry {
67 hash,
68 key_tag,
69 key_len,
70 key: key.to_vec().into_boxed_slice(),
71 value,
72 expire_at_ms,
73 access: EntryAccessMeta {
74 last_touch: access_tick,
75 frequency: 1,
76 },
77 });
78 self.stored_bytes = self
79 .stored_bytes
80 .saturating_add(key_len)
81 .saturating_add(value_len);
82 if expire_at_ms.is_some() {
83 self.ttl_entries = self.ttl_entries.saturating_add(1);
84 }
85 }
86 }
87
88 self.enforce_memory_limit(now_ms);
89 }
90
91 #[inline(always)]
92 pub fn set_hashed<K, V>(
93 &mut self,
94 hash: u64,
95 key: K,
96 value: V,
97 expire_at_ms: Option<u64>,
98 now_ms: u64,
99 ) where
100 K: Into<Bytes>,
101 V: Into<Bytes>,
102 {
103 self.disable_fast_point_map();
104 self.reclaim_retired_if_quiescent();
105 #[cfg(feature = "telemetry")]
106 let start = self.telemetry.as_ref().map(|_| Instant::now());
107
108 let key = key.into();
109 let mut replacement = Some(SharedBytes::from(value.into()));
110 let _ = self.has_active_readers();
111 let access_tick = if self.eviction_policy == EvictionPolicy::None {
112 0
113 } else {
114 self.next_access_tick()
115 };
116 self.record_lru_touch(hash, access_tick);
117 #[cfg(feature = "telemetry")]
118 let written_len = replacement.as_ref().map_or(0, |bytes| bytes.len());
119 #[cfg(feature = "telemetry")]
120 let (key_delta, memory_delta): (isize, isize);
121
122 match self
123 .entries
124 .entry(hash, |entry| entry.matches(hash, &key), |entry| entry.hash)
125 {
126 hashbrown::hash_table::Entry::Occupied(mut occupied) => {
127 let entry = occupied.get_mut();
128 let had_ttl = entry.expire_at_ms.is_some();
129 let previous_value_len = entry.value.len();
130 let retired_value =
131 Some(mem::replace(&mut entry.value, replacement.take().unwrap()));
132 #[cfg(feature = "telemetry")]
133 {
134 key_delta = 0isize;
135 memory_delta = entry.value.len() as isize - previous_value_len as isize;
136 }
137 entry.access.record_access(access_tick);
138 self.stored_bytes = self
139 .stored_bytes
140 .saturating_sub(previous_value_len)
141 .saturating_add(entry.value.len());
142 entry.expire_at_ms = expire_at_ms;
143 self.adjust_ttl_count(had_ttl, expire_at_ms.is_some());
144 if let Some(old_value) = retired_value {
145 self.retire_value(old_value);
146 }
147 }
148 hashbrown::hash_table::Entry::Vacant(vacant) => {
149 let key_len = key.len();
150 let value_len = replacement.as_ref().map_or(0, |bytes| bytes.len());
151 vacant.insert(FlatEntry {
152 hash,
153 key_tag: hash_key_tag_from_hash(hash),
154 key_len,
155 key: key.into_boxed_slice(),
156 value: replacement.take().unwrap(),
157 expire_at_ms,
158 access: EntryAccessMeta {
159 last_touch: access_tick,
160 frequency: 1,
161 },
162 });
163 self.stored_bytes = self
164 .stored_bytes
165 .saturating_add(key_len)
166 .saturating_add(value_len);
167 #[cfg(feature = "telemetry")]
168 {
169 key_delta = 1isize;
170 memory_delta = (key_len + value_len) as isize;
171 }
172 if expire_at_ms.is_some() {
173 self.ttl_entries = self.ttl_entries.saturating_add(1);
174 }
175 }
176 }
177
178 #[cfg(feature = "telemetry")]
179 self.record_set_metrics(written_len, key_delta, memory_delta, start);
180
181 self.enforce_memory_limit(now_ms);
182 }
183
184 #[inline(always)]
185 pub fn set_slice_hashed(
186 &mut self,
187 hash: u64,
188 key: &[u8],
189 value: &[u8],
190 expire_at_ms: Option<u64>,
191 now_ms: u64,
192 ) {
193 self.disable_fast_point_map();
194 self.reclaim_retired_if_quiescent();
195 #[cfg(feature = "telemetry")]
196 let start = self.telemetry.as_ref().map(|_| Instant::now());
197 let has_active_readers = self.has_active_readers();
198 let should_touch_access = self.eviction_policy != EvictionPolicy::None;
199 let access_tick = if should_touch_access {
200 self.next_access_tick()
201 } else {
202 0
203 };
204 self.record_lru_touch(hash, access_tick);
205 let reuse_values =
206 should_reuse_value_buffer(value.len()) && !self.reusable_values.is_empty();
207 let mut reusable_values = if reuse_values {
208 mem::take(&mut self.reusable_values)
209 } else {
210 Vec::new()
211 };
212 let mut reusable_value_bytes = if reuse_values {
213 mem::take(&mut self.reusable_value_bytes)
214 } else {
215 0
216 };
217 #[cfg(feature = "telemetry")]
218 let written_len = value.len();
219 #[cfg(feature = "telemetry")]
220 let (key_delta, memory_delta): (isize, isize);
221
222 let key_tag = hash_key_tag_from_hash(hash);
223 match self.entries.entry(
224 hash,
225 |entry| entry.matches_hashed_key(hash, key),
226 |entry| entry.hash,
227 ) {
228 hashbrown::hash_table::Entry::Occupied(mut occupied) => {
229 let entry = occupied.get_mut();
230 let had_ttl = entry.expire_at_ms.is_some();
231 let previous_value_len = entry.value.len();
232 let should_replace_value = previous_value_len != value.len() || has_active_readers;
233 let mut retired_value = None;
234 if should_replace_value {
235 let new_value = if reuse_values {
236 shared_bytes_from_reusable_pool(
237 value,
238 &mut reusable_values,
239 &mut reusable_value_bytes,
240 )
241 } else {
242 shared_bytes_from_slice(value)
243 };
244 retired_value = Some(mem::replace(&mut entry.value, new_value));
245 } else {
246 let current_value = mem::take(&mut entry.value);
247 match current_value.try_into_mut() {
248 Ok(mut writable) => {
249 writable[..].copy_from_slice(value);
250 entry.value = writable.freeze();
251 }
252 Err(current_value) => {
253 entry.value = if reuse_values {
254 shared_bytes_from_reusable_pool(
255 value,
256 &mut reusable_values,
257 &mut reusable_value_bytes,
258 )
259 } else {
260 shared_bytes_from_slice(value)
261 };
262 retired_value = Some(current_value);
263 }
264 }
265 }
266 #[cfg(feature = "telemetry")]
267 {
268 key_delta = 0isize;
269 memory_delta = entry.value.len() as isize - previous_value_len as isize;
270 }
271 if should_touch_access {
272 entry.access.record_access(access_tick);
273 }
274 if previous_value_len != entry.value.len() {
275 self.stored_bytes = self
276 .stored_bytes
277 .saturating_sub(previous_value_len)
278 .saturating_add(entry.value.len());
279 }
280 if had_ttl || expire_at_ms.is_some() {
281 entry.expire_at_ms = expire_at_ms;
282 self.adjust_ttl_count(had_ttl, expire_at_ms.is_some());
283 }
284 if let Some(old_value) = retired_value {
285 if has_active_readers {
286 self.retire_value(old_value);
287 } else if reuse_values {
288 recycle_value_into_pool(
289 old_value,
290 &mut reusable_values,
291 &mut reusable_value_bytes,
292 );
293 } else {
294 self.recycle_value(old_value);
295 }
296 }
297 }
298 hashbrown::hash_table::Entry::Vacant(vacant) => {
299 let key_len = key.len();
300 let value_len = value.len();
301 let stored_value = if reuse_values {
302 shared_bytes_from_reusable_pool(
303 value,
304 &mut reusable_values,
305 &mut reusable_value_bytes,
306 )
307 } else {
308 shared_bytes_from_slice(value)
309 };
310 vacant.insert(FlatEntry {
311 hash,
312 key_tag,
313 key_len,
314 key: key.to_vec().into_boxed_slice(),
315 value: stored_value,
316 expire_at_ms,
317 access: EntryAccessMeta {
318 last_touch: access_tick,
319 frequency: 1,
320 },
321 });
322 self.stored_bytes = self
323 .stored_bytes
324 .saturating_add(key_len)
325 .saturating_add(value_len);
326 #[cfg(feature = "telemetry")]
327 {
328 key_delta = 1isize;
329 memory_delta = (key_len + value_len) as isize;
330 }
331 if expire_at_ms.is_some() {
332 self.ttl_entries = self.ttl_entries.saturating_add(1);
333 }
334 }
335 }
336
337 if reuse_values {
338 self.reusable_values = reusable_values;
339 self.reusable_value_bytes = reusable_value_bytes;
340 }
341
342 #[cfg(feature = "telemetry")]
343 self.record_set_metrics(written_len, key_delta, memory_delta, start);
344
345 self.enforce_memory_limit(now_ms);
346 }
347}