Skip to main content

fast_cache/storage/flat_map/
write_hot.rs

1use super::*;
2
3#[cfg(feature = "fast-point-map")]
4use super::fast_point::FastPointMutation;
5
6impl FlatMap {
7    ///
8    /// # Safety
9    ///
10    /// The caller must guarantee that this map has exclusive ownership of
11    /// stored value buffers: no outstanding `bytes::Bytes` clones and no
12    /// borrowed value slices may exist while this runs. The 1-shard FCNP server
13    /// satisfies this by using borrowed response encoding and single-threaded
14    /// shard ownership.
15    #[inline(always)]
16    pub unsafe fn set_slice_hashed_no_ttl_hot(&mut self, hash: u64, key: &[u8], value: &[u8]) {
17        let key_tag = hash_key_tag_from_hash(hash);
18        // SAFETY: forwarded from this function's caller.
19        unsafe { self.set_slice_hashed_tagged_no_ttl_hot(hash, key_tag, key, value) };
20    }
21
22    ///
23    /// # Safety
24    ///
25    /// Same safety contract as `set_slice_hashed_no_ttl_hot`. `key_tag` must
26    /// be the `hash_key_tag_from_hash(hash)` value for `key`.
27    #[inline(always)]
28    pub unsafe fn set_slice_hashed_tagged_no_ttl_hot(
29        &mut self,
30        hash: u64,
31        key_tag: u64,
32        key: &[u8],
33        value: &[u8],
34    ) {
35        debug_assert_eq!(self.memory_limit_bytes, None);
36        debug_assert_eq!(self.eviction_policy, EvictionPolicy::None);
37
38        if !self.retired_values.is_empty() {
39            self.reclaim_retired_if_quiescent();
40        }
41
42        #[cfg(feature = "telemetry")]
43        let start = self.telemetry.as_ref().map(|_| Instant::now());
44        #[cfg(feature = "telemetry")]
45        let written_len = value.len();
46        #[cfg(feature = "telemetry")]
47        let (key_delta, memory_delta): (isize, isize);
48
49        let has_active_readers = self.has_active_readers();
50        #[cfg(feature = "fast-point-map")]
51        let allow_in_place_replace = !has_active_readers && cfg!(feature = "unsafe");
52        #[cfg(feature = "fast-point-map")]
53        if self.fast_points.is_active() {
54            if let Some(mutation) = unsafe {
55                self.fast_points
56                    .upsert_slice(hash, key_tag, key, value, allow_in_place_replace)
57            } {
58                #[cfg(feature = "telemetry")]
59                {
60                    match &mutation {
61                        FastPointMutation::Inserted { key_len, value_len } => {
62                            key_delta = 1isize;
63                            memory_delta = (key_len + value_len) as isize;
64                        }
65                        FastPointMutation::Replaced {
66                            old_value_len,
67                            new_value_len,
68                            ..
69                        } => {
70                            key_delta = 0isize;
71                            memory_delta = *new_value_len as isize - *old_value_len as isize;
72                        }
73                    }
74                }
75                match mutation {
76                    FastPointMutation::Inserted { key_len, value_len } => {
77                        self.stored_bytes = self
78                            .stored_bytes
79                            .saturating_add(key_len)
80                            .saturating_add(value_len);
81                    }
82                    FastPointMutation::Replaced {
83                        old_value,
84                        old_value_len,
85                        new_value_len,
86                    } => {
87                        if old_value_len != new_value_len {
88                            self.stored_bytes = self
89                                .stored_bytes
90                                .saturating_sub(old_value_len)
91                                .saturating_add(new_value_len);
92                        }
93                        if let Some(old_value) = old_value {
94                            self.retire_value(old_value);
95                        }
96                    }
97                }
98
99                #[cfg(feature = "telemetry")]
100                self.record_set_metrics(written_len, key_delta, memory_delta, start);
101                return;
102            }
103            self.disable_fast_point_map();
104        }
105
106        match self.entries.entry(
107            hash,
108            |entry| entry.matches_prepared(hash, key, key_tag),
109            |entry| entry.hash,
110        ) {
111            hashbrown::hash_table::Entry::Occupied(mut occupied) => {
112                let entry = occupied.get_mut();
113                let had_ttl = entry.expire_at_ms.is_some();
114                let previous_value_len = entry.value.len();
115                let mut retired_value = None;
116                let should_replace_value = has_active_readers || previous_value_len != value.len();
117
118                #[cfg(feature = "unsafe")]
119                if !should_replace_value {
120                    // SAFETY: guaranteed by this function's caller. The value
121                    // buffer was allocated by this map and has no outstanding
122                    // aliases in the 1-shard FCNP hot path.
123                    unsafe {
124                        copy_hot_value_bytes(
125                            entry.value.as_ptr().cast_mut(),
126                            value.as_ptr(),
127                            value.len(),
128                        );
129                    }
130                }
131                if cfg!(not(feature = "unsafe")) || should_replace_value {
132                    retired_value = Some(mem::replace(
133                        &mut entry.value,
134                        shared_bytes_from_slice(value),
135                    ));
136                }
137
138                if previous_value_len != entry.value.len() {
139                    self.stored_bytes = self
140                        .stored_bytes
141                        .saturating_sub(previous_value_len)
142                        .saturating_add(entry.value.len());
143                }
144                if had_ttl {
145                    entry.expire_at_ms = None;
146                    self.ttl_entries = self.ttl_entries.saturating_sub(1);
147                }
148                #[cfg(feature = "telemetry")]
149                {
150                    key_delta = 0isize;
151                    memory_delta = entry.value.len() as isize - previous_value_len as isize;
152                }
153                if let Some(old_value) = retired_value {
154                    self.retire_value(old_value);
155                }
156            }
157            hashbrown::hash_table::Entry::Vacant(vacant) => {
158                let key_len = key.len();
159                let value_len = value.len();
160                let stored_value = shared_bytes_from_slice(value);
161                vacant.insert(FlatEntry {
162                    hash,
163                    key_tag,
164                    key_len,
165                    key: key.to_vec().into_boxed_slice(),
166                    value: stored_value,
167                    expire_at_ms: None,
168                    access: EntryAccessMeta {
169                        last_touch: 0,
170                        frequency: 1,
171                    },
172                });
173                self.stored_bytes = self
174                    .stored_bytes
175                    .saturating_add(key_len)
176                    .saturating_add(value_len);
177                #[cfg(feature = "telemetry")]
178                {
179                    key_delta = 1isize;
180                    memory_delta = (key_len + value_len) as isize;
181                }
182            }
183        }
184
185        #[cfg(feature = "telemetry")]
186        self.record_set_metrics(written_len, key_delta, memory_delta, start);
187    }
188}