1use super::*;
2
3impl EmbeddedStore {
4 pub fn set<K, V>(&self, key: K, value: V, ttl_ms: Option<u64>)
9 where
10 K: Into<Bytes>,
11 V: Into<Bytes>,
12 {
13 let now_ms = now_millis();
14 let key = key.into();
15 let route = self.route_key(&key);
16 let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
17 if self.objects.has_objects() {
18 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
19 let mut shard = self.shards[route.shard_id].write();
20 if bucket.delete_any(&key) {
21 self.objects.note_deleted(route.shard_id);
22 }
23 if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
24 shard
25 .session_slots
26 .delete_hashed(&session_prefix, route.key_hash, &key);
27 }
28 shard
29 .map
30 .set_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
31 shard.enforce_memory_limit(now_ms);
32 return;
33 }
34 let mut shard = self.shards[route.shard_id].write();
35 if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
36 shard
37 .session_slots
38 .delete_hashed(&session_prefix, route.key_hash, &key);
39 }
40 shard
41 .map
42 .set_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
43 shard.enforce_memory_limit(now_ms);
44 }
45
46 pub fn set_value_bytes(&self, key: &[u8], value: bytes::Bytes, ttl_ms: Option<u64>) {
52 let now_ms = now_millis();
53 let route = self.route_key(key);
54 let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
55 self.set_value_bytes_routed_expire_at(route, key, value, expire_at_ms, now_ms);
56 }
57
58 pub(crate) fn set_value_bytes_routed_expire_at(
62 &self,
63 route: EmbeddedKeyRoute,
64 key: &[u8],
65 value: bytes::Bytes,
66 expire_at_ms: Option<u64>,
67 now_ms: u64,
68 ) {
69 self.set_value_bytes_routed_expire_at_then(route, key, value, expire_at_ms, now_ms, || {});
70 }
71
72 pub(crate) fn set_value_bytes_routed_no_ttl_then(
75 &self,
76 route: EmbeddedKeyRoute,
77 key: &[u8],
78 value: bytes::Bytes,
79 after_write: impl FnOnce(),
80 ) {
81 let route = match route.shard_id < self.shards.len() {
82 true => route,
83 false => self.route_key(key),
84 };
85 if self.objects.has_objects() {
86 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
87 let mut shard = self.shards[route.shard_id].write();
88 if bucket.delete_any(key) {
89 self.objects.note_deleted(route.shard_id);
90 }
91 if let Some(session_prefix) = point_write_session_storage_prefix(key) {
92 shard
93 .session_slots
94 .delete_hashed(&session_prefix, route.key_hash, key);
95 }
96 shard
97 .map
98 .set_bytes_hashed(route.key_hash, key, value, None, 0);
99 shard.enforce_memory_limit(0);
100 after_write();
101 return;
102 }
103 let mut shard = self.shards[route.shard_id].write();
104 if let Some(session_prefix) = point_write_session_storage_prefix(key) {
105 shard
106 .session_slots
107 .delete_hashed(&session_prefix, route.key_hash, key);
108 }
109 shard
110 .map
111 .set_bytes_hashed(route.key_hash, key, value, None, 0);
112 shard.enforce_memory_limit(0);
113 after_write();
114 }
115
116 pub(crate) fn set_value_bytes_routed_expire_at_then(
120 &self,
121 route: EmbeddedKeyRoute,
122 key: &[u8],
123 value: bytes::Bytes,
124 expire_at_ms: Option<u64>,
125 now_ms: u64,
126 after_write: impl FnOnce(),
127 ) {
128 let route = match route.shard_id < self.shards.len() {
129 true => route,
130 false => self.route_key(key),
131 };
132 if self.objects.has_objects() {
133 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
134 let mut shard = self.shards[route.shard_id].write();
135 if bucket.delete_any(key) {
136 self.objects.note_deleted(route.shard_id);
137 }
138 if let Some(session_prefix) = point_write_session_storage_prefix(key) {
139 shard
140 .session_slots
141 .delete_hashed(&session_prefix, route.key_hash, key);
142 }
143 shard
144 .map
145 .set_bytes_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
146 shard.enforce_memory_limit(now_ms);
147 after_write();
148 return;
149 }
150 let mut shard = self.shards[route.shard_id].write();
151 if let Some(session_prefix) = point_write_session_storage_prefix(key) {
152 shard
153 .session_slots
154 .delete_hashed(&session_prefix, route.key_hash, key);
155 }
156 shard
157 .map
158 .set_bytes_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
159 shard.enforce_memory_limit(now_ms);
160 after_write();
161 }
162
163 pub fn set_routed_no_ttl<K, V>(&self, route: EmbeddedKeyRoute, key: K, value: V)
164 where
165 K: Into<Bytes>,
166 V: Into<Bytes>,
167 {
168 let key = key.into();
169 let mut shard = self.shards[route.shard_id].write();
170 if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
171 shard
172 .session_slots
173 .delete_hashed(&session_prefix, route.key_hash, &key);
174 }
175 shard.map.set_hashed(route.key_hash, key, value, None, 0);
176 shard.enforce_memory_limit(0);
177 }
178
179 pub fn set_slice_routed_no_ttl(&self, route: EmbeddedKeyRoute, key: &[u8], value: &[u8]) {
180 let mut shard = self.shards[route.shard_id].write();
181 if let Some(session_prefix) = point_write_session_storage_prefix(key) {
182 shard
183 .session_slots
184 .delete_hashed(&session_prefix, route.key_hash, key);
185 }
186 shard
187 .map
188 .set_slice_hashed(route.key_hash, key, value, None, 0);
189 shard.enforce_memory_limit(0);
190 }
191
192 pub fn batch_set_session_slices_routed_no_ttl<I, K, V>(
193 &self,
194 route: EmbeddedSessionRoute,
195 items: I,
196 ) where
197 I: IntoIterator<Item = (K, V)>,
198 K: AsRef<[u8]>,
199 V: AsRef<[u8]>,
200 {
201 let mut shard = self.shards[route.shard_id].write();
202 for (key, value) in items {
203 let key = key.as_ref();
204 let key_hash = hash_key(key);
205 shard
206 .map
207 .set_slice_hashed(key_hash, key, value.as_ref(), None, 0);
208 }
209 shard.enforce_memory_limit(0);
210 }
211
212 pub fn batch_set_session_slices_no_ttl<I, K, V>(&self, session_prefix: &[u8], items: I)
213 where
214 I: IntoIterator<Item = (K, V)>,
215 K: AsRef<[u8]>,
216 V: AsRef<[u8]>,
217 {
218 let route = self.route_session(session_prefix);
219 let mut shard = self.shards[route.shard_id].write();
220 for (key, value) in items {
221 let key = key.as_ref();
222 let key_hash = hash_key(key);
223 shard.map.delete_hashed(key_hash, key, 0);
224 shard
225 .session_slots
226 .set_slice_hashed(session_prefix, key_hash, key, value.as_ref());
227 }
228 shard.enforce_memory_limit(0);
229 }
230
231 pub fn batch_set_session_owned_no_ttl(
232 &self,
233 session_prefix: Bytes,
234 items: Vec<(Bytes, Bytes)>,
235 ) {
236 if items.is_empty() {
237 return;
238 }
239 self.batch_set_session_packed_no_ttl(PackedSessionWrite::from_owned_items(
240 session_prefix,
241 items,
242 ));
243 }
244
245 pub fn batch_set_session_packed_no_ttl(&self, packed: PackedSessionWrite) {
246 if packed.item_count() == 0 {
247 return;
248 }
249 let route = self.route_session(&packed.session_prefix);
250 let mut shard = self.shards[route.shard_id].write();
251 for entry in packed.slab.entries.iter() {
252 shard.map.delete_hashed(entry.hash, &entry.key, 0);
253 }
254 shard.session_slots.replace_session_slab(packed);
255 shard.enforce_memory_limit(0);
256 }
257
258 pub fn set_routed<K, V>(&self, route: EmbeddedKeyRoute, key: K, value: V, ttl_ms: Option<u64>)
259 where
260 K: Into<Bytes>,
261 V: Into<Bytes>,
262 {
263 let now_ms = now_millis();
264 let key = key.into();
265 let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
266 let mut shard = self.shards[route.shard_id].write();
267 if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
268 shard
269 .session_slots
270 .delete_hashed(&session_prefix, route.key_hash, &key);
271 }
272 shard
273 .map
274 .set_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
275 shard.enforce_memory_limit(now_ms);
276 }
277
278 pub fn batch_set(&self, items: Vec<(Bytes, Bytes)>, ttl_ms: Option<u64>) {
282 if items.is_empty() {
283 return;
284 }
285
286 let now_ms = now_millis();
287 let expire_at_ms = ttl_ms.map(|ttl| now_ms.saturating_add(ttl));
288 if self.objects.has_objects() {
289 for (key, value) in items {
290 let route = self.route_key(&key);
291 let mut bucket = self.objects.write_bucket(route.shard_id, route.key_hash);
292 let mut shard = self.shards[route.shard_id].write();
293 if bucket.delete_any(&key) {
294 self.objects.note_deleted(route.shard_id);
295 }
296 if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
297 shard
298 .session_slots
299 .delete_hashed(&session_prefix, route.key_hash, &key);
300 }
301 shard
302 .map
303 .set_hashed(route.key_hash, key, value, expire_at_ms, now_ms);
304 shard.enforce_memory_limit(now_ms);
305 }
306 return;
307 }
308 let mut groups = vec![Vec::<(Bytes, Bytes, u64)>::new(); self.shards.len()];
309
310 for (key, value) in items {
311 let (route_hash, key_hash) = self.hashes_for_key(&key);
312 groups[self.route_hash(route_hash)].push((key, value, key_hash));
313 }
314
315 for (shard_id, batch) in groups.into_iter().enumerate() {
316 if batch.is_empty() {
317 continue;
318 }
319 let mut shard = self.shards[shard_id].write();
320 for (key, value, key_hash) in batch {
321 if let Some(session_prefix) = point_write_session_storage_prefix(&key) {
322 shard
323 .session_slots
324 .delete_hashed(&session_prefix, key_hash, &key);
325 }
326 shard
327 .map
328 .set_hashed(key_hash, key, value, expire_at_ms, now_ms);
329 }
330 shard.enforce_memory_limit(now_ms);
331 }
332 }
333}