Skip to main content

fast_cache/storage/embedded_store/
write.rs

1use super::*;
2
3impl EmbeddedStore {
4    /// Inserts or replaces a byte-string value.
5    ///
6    /// `ttl_ms` is a relative TTL in milliseconds. Passing `None` creates a
7    /// persistent value.
8    pub fn set<K, V>(&self, key: K, value: V, ttl_ms: Option<u64>)
9    where
10        K: Into<Bytes>,
11        V: Into<Bytes>,
12    {
13        let now_ms = now_millis();
14        let key = key.into();
15        let route = self.route_key(&key);
16        let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
17        if self.objects.has_objects() {
18            let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
19            let mut shard = self.shards[route.shard_id].write();
20            if bucket.delete_any(&key) {
21                self.objects.note_deleted(route.shard_id);
22            }
23            if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
24                shard
25                    .session_slots
26                    .delete_hashed(&session_prefix, route.key_hash, &key);
27            }
28            shard
29                .map
30                .set_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
31            shard.enforce_memory_limit(now_ms);
32            return;
33        }
34        let mut shard = self.shards[route.shard_id].write();
35        if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
36            shard
37                .session_slots
38                .delete_hashed(&session_prefix, route.key_hash, &key);
39        }
40        shard
41            .map
42            .set_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
43        shard.enforce_memory_limit(now_ms);
44    }
45
46    /// Zero-copy `SET` for the multi-direct hot path. Takes `key` as a slice
47    /// (copied into the entry's small `Box<[u8]>`) and `value` as an
48    /// already-owned `bytes::Bytes` (typically a slice of the connection read
49    /// buffer obtained via `HandoffBuffer::split_prefix`). Skips the
50    /// `value.to_vec()` allocation that the generic `set` performs.
51    pub fn set_value_bytes(&self, key: &[u8], value: bytes::Bytes, ttl_ms: Option<u64>) {
52        let now_ms = now_millis();
53        let route = self.route_key(key);
54        let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
55        self.set_value_bytes_routed_expire_at(route, key, value, expire_at_ms, now_ms);
56    }
57
58    /// Stores an already-owned value using precomputed routing and an absolute
59    /// expiry timestamp. This is used by native replication apply paths, where
60    /// the wire frame already carries both the shard sequence and absolute TTL.
61    pub(crate) fn set_value_bytes_routed_expire_at(
62        &self,
63        route: EmbeddedKeyRoute,
64        key: &[u8],
65        value: bytes::Bytes,
66        expire_at_ms: Option<u64>,
67        now_ms: u64,
68    ) {
69        self.set_value_bytes_routed_expire_at_then(route, key, value, expire_at_ms, now_ms, || {});
70    }
71
72    /// Stores an already-owned value without reading the wall clock and runs
73    /// `after_write` before releasing the shard write lock.
74    pub(crate) fn set_value_bytes_routed_no_ttl_then(
75        &self,
76        route: EmbeddedKeyRoute,
77        key: &[u8],
78        value: bytes::Bytes,
79        after_write: impl FnOnce(),
80    ) {
81        let route = match route.shard_id < self.shards.len() {
82            true => route,
83            false => self.route_key(key),
84        };
85        if self.objects.has_objects() {
86            let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
87            let mut shard = self.shards[route.shard_id].write();
88            if bucket.delete_any(key) {
89                self.objects.note_deleted(route.shard_id);
90            }
91            if let Some(session_prefix) = point_write_session_storage_prefix(key) {
92                shard
93                    .session_slots
94                    .delete_hashed(&session_prefix, route.key_hash, key);
95            }
96            shard
97                .map
98                .set_bytes_hashed(route.key_hash, key, value, None, 0);
99            shard.enforce_memory_limit(0);
100            after_write();
101            return;
102        }
103        let mut shard = self.shards[route.shard_id].write();
104        if let Some(session_prefix) = point_write_session_storage_prefix(key) {
105            shard
106                .session_slots
107                .delete_hashed(&session_prefix, route.key_hash, key);
108        }
109        shard
110            .map
111            .set_bytes_hashed(route.key_hash, key, value, None, 0);
112        shard.enforce_memory_limit(0);
113        after_write();
114    }
115
116    /// Stores an already-owned value and runs `after_write` before releasing
117    /// the shard write lock. Replication uses this to preserve same-shard
118    /// mutation order without adding a second ordering mutex around storage.
119    pub(crate) fn set_value_bytes_routed_expire_at_then(
120        &self,
121        route: EmbeddedKeyRoute,
122        key: &[u8],
123        value: bytes::Bytes,
124        expire_at_ms: Option<u64>,
125        now_ms: u64,
126        after_write: impl FnOnce(),
127    ) {
128        let route = match route.shard_id < self.shards.len() {
129            true => route,
130            false => self.route_key(key),
131        };
132        if self.objects.has_objects() {
133            let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
134            let mut shard = self.shards[route.shard_id].write();
135            if bucket.delete_any(key) {
136                self.objects.note_deleted(route.shard_id);
137            }
138            if let Some(session_prefix) = point_write_session_storage_prefix(key) {
139                shard
140                    .session_slots
141                    .delete_hashed(&session_prefix, route.key_hash, key);
142            }
143            shard
144                .map
145                .set_bytes_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
146            shard.enforce_memory_limit(now_ms);
147            after_write();
148            return;
149        }
150        let mut shard = self.shards[route.shard_id].write();
151        if let Some(session_prefix) = point_write_session_storage_prefix(key) {
152            shard
153                .session_slots
154                .delete_hashed(&session_prefix, route.key_hash, key);
155        }
156        shard
157            .map
158            .set_bytes_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
159        shard.enforce_memory_limit(now_ms);
160        after_write();
161    }
162
163    pub fn set_routed_no_ttl<K, V>(&self, route: EmbeddedKeyRoute, key: K, value: V)
164    where
165        K: Into<Bytes>,
166        V: Into<Bytes>,
167    {
168        let key = key.into();
169        let mut shard = self.shards[route.shard_id].write();
170        if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
171            shard
172                .session_slots
173                .delete_hashed(&session_prefix, route.key_hash, &key);
174        }
175        shard.map.set_hashed(route.key_hash, key, value, None, 0);
176        shard.enforce_memory_limit(0);
177    }
178
179    pub fn set_slice_routed_no_ttl(&self, route: EmbeddedKeyRoute, key: &[u8], value: &[u8]) {
180        let mut shard = self.shards[route.shard_id].write();
181        if let Some(session_prefix) = point_write_session_storage_prefix(key) {
182            shard
183                .session_slots
184                .delete_hashed(&session_prefix, route.key_hash, key);
185        }
186        shard
187            .map
188            .set_slice_hashed(route.key_hash, key, value, None, 0);
189        shard.enforce_memory_limit(0);
190    }
191
192    pub fn batch_set_session_slices_routed_no_ttl<I, K, V>(
193        &self,
194        route: EmbeddedSessionRoute,
195        items: I,
196    ) where
197        I: IntoIterator<Item = (K, V)>,
198        K: AsRef<[u8]>,
199        V: AsRef<[u8]>,
200    {
201        let mut shard = self.shards[route.shard_id].write();
202        for (key, value) in items {
203            let key = key.as_ref();
204            let key_hash = hash_key(key);
205            shard
206                .map
207                .set_slice_hashed(key_hash, key, value.as_ref(), None, 0);
208        }
209        shard.enforce_memory_limit(0);
210    }
211
212    pub fn batch_set_session_slices_no_ttl<I, K, V>(&self, session_prefix: &[u8], items: I)
213    where
214        I: IntoIterator<Item = (K, V)>,
215        K: AsRef<[u8]>,
216        V: AsRef<[u8]>,
217    {
218        let route = self.route_session(session_prefix);
219        let mut shard = self.shards[route.shard_id].write();
220        for (key, value) in items {
221            let key = key.as_ref();
222            let key_hash = hash_key(key);
223            shard.map.delete_hashed(key_hash, key, 0);
224            shard
225                .session_slots
226                .set_slice_hashed(session_prefix, key_hash, key, value.as_ref());
227        }
228        shard.enforce_memory_limit(0);
229    }
230
231    pub fn batch_set_session_owned_no_ttl(
232        &self,
233        session_prefix: Bytes,
234        items: Vec<(Bytes, Bytes)>,
235    ) {
236        if items.is_empty() {
237            return;
238        }
239        self.batch_set_session_packed_no_ttl(PackedSessionWrite::from_owned_items(
240            session_prefix,
241            items,
242        ));
243    }
244
245    pub fn batch_set_session_packed_no_ttl(&self, packed: PackedSessionWrite) {
246        if packed.item_count() == 0 {
247            return;
248        }
249        let route = self.route_session(&packed.session_prefix);
250        let mut shard = self.shards[route.shard_id].write();
251        for entry in packed.slab.entries.iter() {
252            shard.map.delete_hashed(entry.hash, &entry.key, 0);
253        }
254        shard.session_slots.replace_session_slab(packed);
255        shard.enforce_memory_limit(0);
256    }
257
258    pub fn set_routed<K, V>(&self, route: EmbeddedKeyRoute, key: K, value: V, ttl_ms: Option<u64>)
259    where
260        K: Into<Bytes>,
261        V: Into<Bytes>,
262    {
263        let now_ms = now_millis();
264        let key = key.into();
265        let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
266        let mut shard = self.shards[route.shard_id].write();
267        if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
268            shard
269                .session_slots
270                .delete_hashed(&session_prefix, route.key_hash, &key);
271        }
272        shard
273            .map
274            .set_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
275        shard.enforce_memory_limit(now_ms);
276    }
277
278    /// Inserts or replaces multiple byte-string values.
279    ///
280    /// `ttl_ms` applies the same relative TTL to every item in the batch.
281    pub fn batch_set(&self, items: Vec<(Bytes, Bytes)>, ttl_ms: Option<u64>) {
282        if items.is_empty() {
283            return;
284        }
285
286        let now_ms = now_millis();
287        let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
288        if self.objects.has_objects() {
289            for (key, value) in items {
290                let route = self.route_key(&key);
291                let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
292                let mut shard = self.shards[route.shard_id].write();
293                if bucket.delete_any(&key) {
294                    self.objects.note_deleted(route.shard_id);
295                }
296                if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
297                    shard
298                        .session_slots
299                        .delete_hashed(&session_prefix, route.key_hash, &key);
300                }
301                shard
302                    .map
303                    .set_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
304                shard.enforce_memory_limit(now_ms);
305            }
306            return;
307        }
308        let mut groups = vec![Vec::<(Bytes, Bytes, u64)>::new(); self.shards.len()];
309
310        for (key, value) in items {
311            let (route_hash, key_hash) = self.hashes_for_key(&key);
312            groups[self.route_hash(route_hash)].push((key, value, key_hash));
313        }
314
315        for (shard_id, batch) in groups.into_iter().enumerate() {
316            if batch.is_empty() {
317                continue;
318            }
319            let mut shard = self.shards[shard_id].write();
320            for (key, value, key_hash) in batch {
321                if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
322                    shard
323                        .session_slots
324                        .delete_hashed(&session_prefix, key_hash, &key);
325                }
326                shard
327                    .map
328                    .set_hashed(key_hash, key, value, expire_at_ms, now_ms);
329            }
330            shard.enforce_memory_limit(now_ms);
331        }
332    }
333}