Skip to main content

fast_cache/storage/flat_map/
write_local.rs

1use super::*;
2
3impl FlatMap {
4    #[cfg(feature = "embedded")]
5    #[inline(always)]
6    pub fn set_hashed_local<K, V>(
7        &mut self,
8        hash: u64,
9        key: K,
10        value: V,
11        expire_at_ms: Option<u64>,
12        now_ms: u64,
13    ) where
14        K: Into<Bytes>,
15        V: Into<Bytes>,
16    {
17        self.disable_fast_point_map();
18        #[cfg(feature = "telemetry")]
19        let start = self.telemetry.as_ref().map(|_| Instant::now());
20
21        let key = key.into();
22        let mut replacement = Some(SharedBytes::from(value.into()));
23        let access_tick = if self.eviction_policy == EvictionPolicy::None {
24            0
25        } else {
26            self.next_access_tick()
27        };
28        self.record_lru_touch(hash, access_tick);
29        #[cfg(feature = "telemetry")]
30        let written_len = replacement.as_ref().map_or(0, |bytes| bytes.len());
31        #[cfg(feature = "telemetry")]
32        let (key_delta, memory_delta): (isize, isize);
33
34        match self
35            .entries
36            .entry(hash, |entry| entry.matches(hash, &key), |entry| entry.hash)
37        {
38            hashbrown::hash_table::Entry::Occupied(mut occupied) => {
39                let entry = occupied.get_mut();
40                let had_ttl = entry.expire_at_ms.is_some();
41                let previous_value_len = entry.value.len();
42                entry.value = replacement.take().unwrap();
43                #[cfg(feature = "telemetry")]
44                {
45                    key_delta = 0isize;
46                    memory_delta = entry.value.len() as isize - previous_value_len as isize;
47                }
48                entry.access.record_access(access_tick);
49                self.stored_bytes = self
50                    .stored_bytes
51                    .saturating_sub(previous_value_len)
52                    .saturating_add(entry.value.len());
53                entry.expire_at_ms = expire_at_ms;
54                self.adjust_ttl_count(had_ttl, expire_at_ms.is_some());
55            }
56            hashbrown::hash_table::Entry::Vacant(vacant) => {
57                let key_len = key.len();
58                let value_len = replacement.as_ref().map_or(0, |bytes| bytes.len());
59                vacant.insert(FlatEntry {
60                    hash,
61                    key_tag: hash_key_tag_from_hash(hash),
62                    key_len,
63                    key: key.into_boxed_slice(),
64                    value: replacement.take().unwrap(),
65                    expire_at_ms,
66                    access: EntryAccessMeta {
67                        last_touch: access_tick,
68                        frequency: 1,
69                    },
70                });
71                self.stored_bytes = self
72                    .stored_bytes
73                    .saturating_add(key_len)
74                    .saturating_add(value_len);
75                #[cfg(feature = "telemetry")]
76                {
77                    key_delta = 1isize;
78                    memory_delta = (key_len + value_len) as isize;
79                }
80                if expire_at_ms.is_some() {
81                    self.ttl_entries = self.ttl_entries.saturating_add(1);
82                }
83            }
84        }
85
86        #[cfg(feature = "telemetry")]
87        self.record_set_metrics(written_len, key_delta, memory_delta, start);
88
89        self.enforce_memory_limit(now_ms);
90    }
91
92    #[cfg(feature = "embedded")]
93    #[inline(always)]
94    pub fn set_slice_hashed_local(
95        &mut self,
96        hash: u64,
97        key: &[u8],
98        value: &[u8],
99        expire_at_ms: Option<u64>,
100        now_ms: u64,
101    ) {
102        let key_tag = hash_key_tag_from_hash(hash);
103        self.set_slice_hashed_tagged_local(hash, key_tag, key, value, expire_at_ms, now_ms);
104    }
105
106    #[cfg(feature = "embedded")]
107    #[inline(always)]
108    pub fn set_slice_hashed_tagged_no_ttl_local(
109        &mut self,
110        hash: u64,
111        key_tag: u64,
112        key: &[u8],
113        value: &[u8],
114    ) {
115        debug_assert_eq!(key_tag, hash_key_tag_from_hash(hash));
116        self.disable_fast_point_map();
117        if !self.retired_values.is_empty() {
118            self.reclaim_retired_if_quiescent();
119        }
120        #[cfg(feature = "telemetry")]
121        let start = self.telemetry.as_ref().map(|_| Instant::now());
122        let has_active_readers = self.has_active_readers();
123        let should_touch_access = self.eviction_policy != EvictionPolicy::None;
124        let access_tick = if should_touch_access {
125            self.next_access_tick()
126        } else {
127            0
128        };
129        self.record_lru_touch(hash, access_tick);
130        let reuse_values =
131            should_reuse_value_buffer(value.len()) && !self.reusable_values.is_empty();
132        let mut reusable_values = if reuse_values {
133            mem::take(&mut self.reusable_values)
134        } else {
135            Vec::new()
136        };
137        let mut reusable_value_bytes = if reuse_values {
138            mem::take(&mut self.reusable_value_bytes)
139        } else {
140            0
141        };
142        #[cfg(feature = "telemetry")]
143        let written_len = value.len();
144        #[cfg(feature = "telemetry")]
145        let (key_delta, memory_delta): (isize, isize);
146
147        match self.entries.entry(
148            hash,
149            |entry| entry.matches_prepared(hash, key, key_tag),
150            |entry| entry.hash,
151        ) {
152            hashbrown::hash_table::Entry::Occupied(mut occupied) => {
153                let entry = occupied.get_mut();
154                let had_ttl = entry.expire_at_ms.is_some();
155                let previous_value_len = entry.value.len();
156                let should_replace_value = previous_value_len != value.len() || has_active_readers;
157                let mut retired_value = None;
158                if should_replace_value {
159                    let new_value = if reuse_values {
160                        shared_bytes_from_reusable_pool(
161                            value,
162                            &mut reusable_values,
163                            &mut reusable_value_bytes,
164                        )
165                    } else {
166                        shared_bytes_from_slice(value)
167                    };
168                    retired_value = Some(mem::replace(&mut entry.value, new_value));
169                } else {
170                    #[cfg(feature = "unsafe")]
171                    {
172                        // SAFETY: this local no-TTL setter is only used by
173                        // embedded worker-local stores after route ownership
174                        // has been checked. With no active read epochs and
175                        // equal lengths, overwriting the value bytes preserves
176                        // stored metadata and avoids `Bytes` promotion checks.
177                        unsafe {
178                            copy_hot_value_bytes(
179                                entry.value.as_ptr().cast_mut(),
180                                value.as_ptr(),
181                                value.len(),
182                            );
183                        }
184                    }
185                    #[cfg(not(feature = "unsafe"))]
186                    {
187                        let current_value = mem::take(&mut entry.value);
188                        match current_value.try_into_mut() {
189                            Ok(mut writable) => {
190                                writable[..].copy_from_slice(value);
191                                entry.value = writable.freeze();
192                            }
193                            Err(current_value) => {
194                                entry.value = if reuse_values {
195                                    shared_bytes_from_reusable_pool(
196                                        value,
197                                        &mut reusable_values,
198                                        &mut reusable_value_bytes,
199                                    )
200                                } else {
201                                    shared_bytes_from_slice(value)
202                                };
203                                retired_value = Some(current_value);
204                            }
205                        }
206                    }
207                }
208                #[cfg(feature = "telemetry")]
209                {
210                    key_delta = 0isize;
211                    memory_delta = entry.value.len() as isize - previous_value_len as isize;
212                }
213                if should_touch_access {
214                    entry.access.record_access(access_tick);
215                }
216                if previous_value_len != entry.value.len() {
217                    self.stored_bytes = self
218                        .stored_bytes
219                        .saturating_sub(previous_value_len)
220                        .saturating_add(entry.value.len());
221                }
222                if had_ttl {
223                    entry.expire_at_ms = None;
224                    self.ttl_entries = self.ttl_entries.saturating_sub(1);
225                }
226                if let Some(old_value) = retired_value {
227                    if has_active_readers {
228                        self.retire_value(old_value);
229                    } else if reuse_values {
230                        recycle_value_into_pool(
231                            old_value,
232                            &mut reusable_values,
233                            &mut reusable_value_bytes,
234                        );
235                    } else {
236                        self.recycle_value(old_value);
237                    }
238                }
239            }
240            hashbrown::hash_table::Entry::Vacant(vacant) => {
241                let key_len = key.len();
242                let value_len = value.len();
243                let stored_value = if reuse_values {
244                    shared_bytes_from_reusable_pool(
245                        value,
246                        &mut reusable_values,
247                        &mut reusable_value_bytes,
248                    )
249                } else {
250                    shared_bytes_from_slice(value)
251                };
252                vacant.insert(FlatEntry {
253                    hash,
254                    key_tag,
255                    key_len,
256                    key: key.to_vec().into_boxed_slice(),
257                    value: stored_value,
258                    expire_at_ms: None,
259                    access: EntryAccessMeta {
260                        last_touch: access_tick,
261                        frequency: 1,
262                    },
263                });
264                self.stored_bytes = self
265                    .stored_bytes
266                    .saturating_add(key_len)
267                    .saturating_add(value_len);
268                #[cfg(feature = "telemetry")]
269                {
270                    key_delta = 1isize;
271                    memory_delta = (key_len + value_len) as isize;
272                }
273            }
274        }
275
276        if reuse_values {
277            self.reusable_values = reusable_values;
278            self.reusable_value_bytes = reusable_value_bytes;
279        }
280
281        #[cfg(feature = "telemetry")]
282        self.record_set_metrics(written_len, key_delta, memory_delta, start);
283    }
284
285    #[cfg(feature = "embedded")]
286    #[inline(always)]
287    pub fn set_slice_hashed_tagged_local(
288        &mut self,
289        hash: u64,
290        key_tag: u64,
291        key: &[u8],
292        value: &[u8],
293        expire_at_ms: Option<u64>,
294        now_ms: u64,
295    ) {
296        debug_assert_eq!(key_tag, hash_key_tag_from_hash(hash));
297        self.disable_fast_point_map();
298        self.reclaim_retired_if_quiescent();
299        #[cfg(feature = "telemetry")]
300        let start = self.telemetry.as_ref().map(|_| Instant::now());
301        let has_active_readers = self.has_active_readers();
302        let should_touch_access = self.eviction_policy != EvictionPolicy::None;
303        let access_tick = if should_touch_access {
304            self.next_access_tick()
305        } else {
306            0
307        };
308        self.record_lru_touch(hash, access_tick);
309        #[cfg(feature = "telemetry")]
310        let written_len = value.len();
311        #[cfg(feature = "telemetry")]
312        let (key_delta, memory_delta): (isize, isize);
313
314        match self.entries.entry(
315            hash,
316            |entry| entry.matches_prepared(hash, key, key_tag),
317            |entry| entry.hash,
318        ) {
319            hashbrown::hash_table::Entry::Occupied(mut occupied) => {
320                let entry = occupied.get_mut();
321                let had_ttl = entry.expire_at_ms.is_some();
322                let previous_value_len = entry.value.len();
323                let mut retired_value = None;
324                if previous_value_len == value.len() && !has_active_readers {
325                    let current_value = mem::take(&mut entry.value);
326                    match current_value.try_into_mut() {
327                        Ok(mut writable) => {
328                            writable[..].copy_from_slice(value);
329                            entry.value = writable.freeze();
330                        }
331                        Err(current_value) => {
332                            entry.value = shared_bytes_from_slice(value);
333                            retired_value = Some(current_value);
334                        }
335                    }
336                } else {
337                    retired_value = Some(mem::replace(
338                        &mut entry.value,
339                        shared_bytes_from_slice(value),
340                    ));
341                }
342                #[cfg(feature = "telemetry")]
343                {
344                    key_delta = 0isize;
345                    memory_delta = entry.value.len() as isize - previous_value_len as isize;
346                }
347                if should_touch_access {
348                    entry.access.record_access(access_tick);
349                }
350                if previous_value_len != entry.value.len() {
351                    self.stored_bytes = self
352                        .stored_bytes
353                        .saturating_sub(previous_value_len)
354                        .saturating_add(entry.value.len());
355                }
356                if had_ttl || expire_at_ms.is_some() {
357                    entry.expire_at_ms = expire_at_ms;
358                    self.adjust_ttl_count(had_ttl, expire_at_ms.is_some());
359                }
360                if let Some(old_value) = retired_value {
361                    self.retire_value(old_value);
362                }
363            }
364            hashbrown::hash_table::Entry::Vacant(vacant) => {
365                let key_len = key.len();
366                let value_len = value.len();
367                vacant.insert(FlatEntry {
368                    hash,
369                    key_tag,
370                    key_len,
371                    key: key.to_vec().into_boxed_slice(),
372                    value: shared_bytes_from_slice(value),
373                    expire_at_ms,
374                    access: EntryAccessMeta {
375                        last_touch: access_tick,
376                        frequency: 1,
377                    },
378                });
379                self.stored_bytes = self
380                    .stored_bytes
381                    .saturating_add(key_len)
382                    .saturating_add(value_len);
383                #[cfg(feature = "telemetry")]
384                {
385                    key_delta = 1isize;
386                    memory_delta = (key_len + value_len) as isize;
387                }
388                if expire_at_ms.is_some() {
389                    self.ttl_entries = self.ttl_entries.saturating_add(1);
390                }
391            }
392        }
393
394        #[cfg(feature = "telemetry")]
395        self.record_set_metrics(written_len, key_delta, memory_delta, start);
396
397        self.enforce_memory_limit(now_ms);
398    }
399}