Skip to main content

fast_cache/storage/flat_map/
write.rs

1use super::*;
2
3impl FlatMap {
4    #[inline(always)]
5    pub fn set<K, V>(&mut self, key: K, value: V, expire_at_ms: Option<u64>, now_ms: u64)
6    where
7        K: Into<Bytes>,
8        V: Into<Bytes>,
9    {
10        let key = key.into();
11        self.set_hashed(hash_key(&key), key, value, expire_at_ms, now_ms);
12    }
13
14    #[inline(always)]
15    pub fn set_slice(&mut self, key: &[u8], value: &[u8], expire_at_ms: Option<u64>, now_ms: u64) {
16        self.set_slice_hashed(hash_key(key), key, value, expire_at_ms, now_ms);
17    }
18
19    /// Zero-copy `SET` for the multi-direct hot path: takes `value` as an
20    /// already-owned `SharedBytes` (typically a `split_prefix` slice from the
21    /// connection read buffer). Avoids the heap allocation that
22    /// `set_slice_hashed` performs to copy `value` into a new `SharedBytes`.
23    /// Key is copied into a `Box<[u8]>` so the entry retains a tight key
24    /// allocation (keys are small and don't benefit from sharing).
25    pub fn set_bytes_hashed(
26        &mut self,
27        hash: u64,
28        key: &[u8],
29        value: SharedBytes,
30        expire_at_ms: Option<u64>,
31        now_ms: u64,
32    ) {
33        self.disable_fast_point_map();
34        self.reclaim_retired_if_quiescent();
35        let access_tick = if self.eviction_policy == EvictionPolicy::None {
36            0
37        } else {
38            self.next_access_tick()
39        };
40        self.record_lru_touch(hash, access_tick);
41
42        let key_tag = hash_key_tag_from_hash(hash);
43        match self.entries.entry(
44            hash,
45            |entry| entry.matches_hashed_key(hash, key),
46            |entry| entry.hash,
47        ) {
48            hashbrown::hash_table::Entry::Occupied(mut occupied) => {
49                let entry = occupied.get_mut();
50                let had_ttl = entry.expire_at_ms.is_some();
51                let previous_value_len = entry.value.len();
52                let new_value_len = value.len();
53                let retired_value = mem::replace(&mut entry.value, value);
54                entry.access.record_access(access_tick);
55                self.stored_bytes = self
56                    .stored_bytes
57                    .saturating_sub(previous_value_len)
58                    .saturating_add(new_value_len);
59                entry.expire_at_ms = expire_at_ms;
60                self.adjust_ttl_count(had_ttl, expire_at_ms.is_some());
61                self.retire_value(retired_value);
62            }
63            hashbrown::hash_table::Entry::Vacant(vacant) => {
64                let key_len = key.len();
65                let value_len = value.len();
66                vacant.insert(FlatEntry {
67                    hash,
68                    key_tag,
69                    key_len,
70                    key: key.to_vec().into_boxed_slice(),
71                    value,
72                    expire_at_ms,
73                    access: EntryAccessMeta {
74                        last_touch: access_tick,
75                        frequency: 1,
76                    },
77                });
78                self.stored_bytes = self
79                    .stored_bytes
80                    .saturating_add(key_len)
81                    .saturating_add(value_len);
82                if expire_at_ms.is_some() {
83                    self.ttl_entries = self.ttl_entries.saturating_add(1);
84                }
85            }
86        }
87
88        self.enforce_memory_limit(now_ms);
89    }
90
91    #[inline(always)]
92    pub fn set_hashed<K, V>(
93        &mut self,
94        hash: u64,
95        key: K,
96        value: V,
97        expire_at_ms: Option<u64>,
98        now_ms: u64,
99    ) where
100        K: Into<Bytes>,
101        V: Into<Bytes>,
102    {
103        self.disable_fast_point_map();
104        self.reclaim_retired_if_quiescent();
105        #[cfg(feature = "telemetry")]
106        let start = self.telemetry.as_ref().map(|_| Instant::now());
107
108        let key = key.into();
109        let mut replacement = Some(SharedBytes::from(value.into()));
110        let _ = self.has_active_readers();
111        let access_tick = if self.eviction_policy == EvictionPolicy::None {
112            0
113        } else {
114            self.next_access_tick()
115        };
116        self.record_lru_touch(hash, access_tick);
117        #[cfg(feature = "telemetry")]
118        let written_len = replacement.as_ref().map_or(0, |bytes| bytes.len());
119        #[cfg(feature = "telemetry")]
120        let (key_delta, memory_delta): (isize, isize);
121
122        match self
123            .entries
124            .entry(hash, |entry| entry.matches(hash, &key), |entry| entry.hash)
125        {
126            hashbrown::hash_table::Entry::Occupied(mut occupied) => {
127                let entry = occupied.get_mut();
128                let had_ttl = entry.expire_at_ms.is_some();
129                let previous_value_len = entry.value.len();
130                let retired_value =
131                    Some(mem::replace(&mut entry.value, replacement.take().unwrap()));
132                #[cfg(feature = "telemetry")]
133                {
134                    key_delta = 0isize;
135                    memory_delta = entry.value.len() as isize - previous_value_len as isize;
136                }
137                entry.access.record_access(access_tick);
138                self.stored_bytes = self
139                    .stored_bytes
140                    .saturating_sub(previous_value_len)
141                    .saturating_add(entry.value.len());
142                entry.expire_at_ms = expire_at_ms;
143                self.adjust_ttl_count(had_ttl, expire_at_ms.is_some());
144                if let Some(old_value) = retired_value {
145                    self.retire_value(old_value);
146                }
147            }
148            hashbrown::hash_table::Entry::Vacant(vacant) => {
149                let key_len = key.len();
150                let value_len = replacement.as_ref().map_or(0, |bytes| bytes.len());
151                vacant.insert(FlatEntry {
152                    hash,
153                    key_tag: hash_key_tag_from_hash(hash),
154                    key_len,
155                    key: key.into_boxed_slice(),
156                    value: replacement.take().unwrap(),
157                    expire_at_ms,
158                    access: EntryAccessMeta {
159                        last_touch: access_tick,
160                        frequency: 1,
161                    },
162                });
163                self.stored_bytes = self
164                    .stored_bytes
165                    .saturating_add(key_len)
166                    .saturating_add(value_len);
167                #[cfg(feature = "telemetry")]
168                {
169                    key_delta = 1isize;
170                    memory_delta = (key_len + value_len) as isize;
171                }
172                if expire_at_ms.is_some() {
173                    self.ttl_entries = self.ttl_entries.saturating_add(1);
174                }
175            }
176        }
177
178        #[cfg(feature = "telemetry")]
179        self.record_set_metrics(written_len, key_delta, memory_delta, start);
180
181        self.enforce_memory_limit(now_ms);
182    }
183
184    #[inline(always)]
185    pub fn set_slice_hashed(
186        &mut self,
187        hash: u64,
188        key: &[u8],
189        value: &[u8],
190        expire_at_ms: Option<u64>,
191        now_ms: u64,
192    ) {
193        self.disable_fast_point_map();
194        self.reclaim_retired_if_quiescent();
195        #[cfg(feature = "telemetry")]
196        let start = self.telemetry.as_ref().map(|_| Instant::now());
197        let has_active_readers = self.has_active_readers();
198        let should_touch_access = self.eviction_policy != EvictionPolicy::None;
199        let access_tick = if should_touch_access {
200            self.next_access_tick()
201        } else {
202            0
203        };
204        self.record_lru_touch(hash, access_tick);
205        let reuse_values =
206            should_reuse_value_buffer(value.len()) && !self.reusable_values.is_empty();
207        let mut reusable_values = if reuse_values {
208            mem::take(&mut self.reusable_values)
209        } else {
210            Vec::new()
211        };
212        let mut reusable_value_bytes = if reuse_values {
213            mem::take(&mut self.reusable_value_bytes)
214        } else {
215            0
216        };
217        #[cfg(feature = "telemetry")]
218        let written_len = value.len();
219        #[cfg(feature = "telemetry")]
220        let (key_delta, memory_delta): (isize, isize);
221
222        let key_tag = hash_key_tag_from_hash(hash);
223        match self.entries.entry(
224            hash,
225            |entry| entry.matches_hashed_key(hash, key),
226            |entry| entry.hash,
227        ) {
228            hashbrown::hash_table::Entry::Occupied(mut occupied) => {
229                let entry = occupied.get_mut();
230                let had_ttl = entry.expire_at_ms.is_some();
231                let previous_value_len = entry.value.len();
232                let should_replace_value = previous_value_len != value.len() || has_active_readers;
233                let mut retired_value = None;
234                if should_replace_value {
235                    let new_value = if reuse_values {
236                        shared_bytes_from_reusable_pool(
237                            value,
238                            &mut reusable_values,
239                            &mut reusable_value_bytes,
240                        )
241                    } else {
242                        shared_bytes_from_slice(value)
243                    };
244                    retired_value = Some(mem::replace(&mut entry.value, new_value));
245                } else {
246                    let current_value = mem::take(&mut entry.value);
247                    match current_value.try_into_mut() {
248                        Ok(mut writable) => {
249                            writable[..].copy_from_slice(value);
250                            entry.value = writable.freeze();
251                        }
252                        Err(current_value) => {
253                            entry.value = if reuse_values {
254                                shared_bytes_from_reusable_pool(
255                                    value,
256                                    &mut reusable_values,
257                                    &mut reusable_value_bytes,
258                                )
259                            } else {
260                                shared_bytes_from_slice(value)
261                            };
262                            retired_value = Some(current_value);
263                        }
264                    }
265                }
266                #[cfg(feature = "telemetry")]
267                {
268                    key_delta = 0isize;
269                    memory_delta = entry.value.len() as isize - previous_value_len as isize;
270                }
271                if should_touch_access {
272                    entry.access.record_access(access_tick);
273                }
274                if previous_value_len != entry.value.len() {
275                    self.stored_bytes = self
276                        .stored_bytes
277                        .saturating_sub(previous_value_len)
278                        .saturating_add(entry.value.len());
279                }
280                if had_ttl || expire_at_ms.is_some() {
281                    entry.expire_at_ms = expire_at_ms;
282                    self.adjust_ttl_count(had_ttl, expire_at_ms.is_some());
283                }
284                if let Some(old_value) = retired_value {
285                    if has_active_readers {
286                        self.retire_value(old_value);
287                    } else if reuse_values {
288                        recycle_value_into_pool(
289                            old_value,
290                            &mut reusable_values,
291                            &mut reusable_value_bytes,
292                        );
293                    } else {
294                        self.recycle_value(old_value);
295                    }
296                }
297            }
298            hashbrown::hash_table::Entry::Vacant(vacant) => {
299                let key_len = key.len();
300                let value_len = value.len();
301                let stored_value = if reuse_values {
302                    shared_bytes_from_reusable_pool(
303                        value,
304                        &mut reusable_values,
305                        &mut reusable_value_bytes,
306                    )
307                } else {
308                    shared_bytes_from_slice(value)
309                };
310                vacant.insert(FlatEntry {
311                    hash,
312                    key_tag,
313                    key_len,
314                    key: key.to_vec().into_boxed_slice(),
315                    value: stored_value,
316                    expire_at_ms,
317                    access: EntryAccessMeta {
318                        last_touch: access_tick,
319                        frequency: 1,
320                    },
321                });
322                self.stored_bytes = self
323                    .stored_bytes
324                    .saturating_add(key_len)
325                    .saturating_add(value_len);
326                #[cfg(feature = "telemetry")]
327                {
328                    key_delta = 1isize;
329                    memory_delta = (key_len + value_len) as isize;
330                }
331                if expire_at_ms.is_some() {
332                    self.ttl_entries = self.ttl_entries.saturating_add(1);
333                }
334            }
335        }
336
337        if reuse_values {
338            self.reusable_values = reusable_values;
339            self.reusable_value_bytes = reusable_value_bytes;
340        }
341
342        #[cfg(feature = "telemetry")]
343        self.record_set_metrics(written_len, key_delta, memory_delta, start);
344
345        self.enforce_memory_limit(now_ms);
346    }
347}