1use crate::small_hash::{self, AddResult as HAddResult, SmallHashData};
4use crate::util::{parse_f64, parse_i64};
5use crate::value::{HashData, SmallBytes, Value, hash_field_weight};
6use crate::{Entry, Store, StoreError, now_ns};
7use std::sync::Arc;
8
9impl Store {
10 fn hash_mut(&mut self, key: &[u8], create: bool) -> Result<Option<&mut HashData>, StoreError> {
23 if self.live_entry_mut(key).is_none() {
24 if !create {
25 return Ok(None);
26 }
27 self.insert_entry(
28 SmallBytes::from_slice(key),
29 Entry::new(Value::Hash(Arc::default()), None),
30 );
31 }
32 let is_inline = matches!(
37 self.map.get(key).map(|e| &e.value),
38 Some(Value::SmallHashInline(_))
39 );
40 if is_inline {
41 let promoted = {
42 let e = self.map.get(key).expect("present");
43 if let Value::SmallHashInline(s) = &e.value {
44 small_hash::promote(s)
45 } else {
46 unreachable!()
47 }
48 };
49 self.map.get_mut(key).expect("present").value = Value::Hash(Arc::new(promoted));
50 self.reweigh_entry(key);
51 }
52 match &mut self.map.get_mut(key).expect("present").value {
53 Value::Hash(h) => Ok(Some(Arc::make_mut(h))),
54 _ => Err(StoreError::WrongType),
55 }
56 }
57
58 fn hash_value_for_set(&mut self, key: &[u8]) -> Result<Option<&mut Value>, StoreError> {
62 match self.live_entry_mut(key) {
63 None => Ok(None),
64 Some(e) => match &e.value {
65 Value::Hash(_) | Value::SmallHashInline(_) => Ok(Some(&mut e.value)),
66 _ => Err(StoreError::WrongType),
67 },
68 }
69 }
70
71 fn hash_pairs(&mut self, key: &[u8]) -> Result<Option<Vec<(Vec<u8>, Vec<u8>)>>, StoreError> {
76 match self.live_entry(key) {
77 None => Ok(None),
78 Some(e) => match &e.value {
79 Value::Hash(h) => Ok(Some(
80 h.iter().map(|(f, v)| (f.to_vec(), v.clone())).collect(),
81 )),
82 Value::SmallHashInline(h) => Ok(Some(
83 h.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect(),
84 )),
85 _ => Err(StoreError::WrongType),
86 },
87 }
88 }
89
90 pub fn hset_borrowed(
94 &mut self,
95 key: &[u8],
96 pairs: &[(&[u8], &[u8])],
97 ) -> Result<usize, StoreError> {
98 if pairs.is_empty() {
99 return Ok(0);
100 }
101 let mut added = 0usize;
102 let mut delta: i64 = 0;
103 for (f, v) in pairs {
104 match self.hset_one(key, f, v)? {
105 HsetOutcome::AddedInline => {
106 added += 1;
107 }
110 HsetOutcome::UpdatedInline => {}
111 HsetOutcome::AddedHeap(w) => {
112 added += 1;
113 delta += w;
114 }
115 HsetOutcome::UpdatedHeap(d) => {
116 delta += d;
117 }
118 }
119 }
120 self.account_delta(key, delta);
121 Ok(added)
122 }
123
124 pub fn hset(&mut self, key: &[u8], pairs: &[(Vec<u8>, Vec<u8>)]) -> Result<usize, StoreError> {
126 let borrowed: Vec<(&[u8], &[u8])> =
127 pairs.iter().map(|(f, v)| (f.as_slice(), v.as_slice())).collect();
128 self.hset_borrowed(key, &borrowed)
129 }
130
131 pub fn hsetnx(&mut self, key: &[u8], field: &[u8], val: &[u8]) -> Result<bool, StoreError> {
133 let exists = match self.live_entry(key) {
135 None => false,
136 Some(e) => match &e.value {
137 Value::Hash(h) => h.contains_key(field),
138 Value::SmallHashInline(h) => h.contains_key(field),
139 _ => return Err(StoreError::WrongType),
140 },
141 };
142 if exists {
143 return Ok(false);
144 }
145 match self.hset_one(key, field, val)? {
146 HsetOutcome::AddedInline | HsetOutcome::UpdatedInline => Ok(true),
147 HsetOutcome::AddedHeap(w) => {
148 self.account_delta(key, w);
149 Ok(true)
150 }
151 HsetOutcome::UpdatedHeap(_) => Ok(true),
152 }
153 }
154
155 pub fn hget(&mut self, key: &[u8], field: &[u8]) -> Result<Option<&[u8]>, StoreError> {
156 match self.live_entry(key) {
157 None => Ok(None),
158 Some(e) => match &e.value {
159 Value::Hash(h) => Ok(h.get(field).map(Vec::as_slice)),
160 Value::SmallHashInline(h) => Ok(h.get(field)),
161 _ => Err(StoreError::WrongType),
162 },
163 }
164 }
165
166 pub fn hexists(&mut self, key: &[u8], field: &[u8]) -> Result<bool, StoreError> {
167 match self.live_entry(key) {
168 None => Ok(false),
169 Some(e) => match &e.value {
170 Value::Hash(h) => Ok(h.contains_key(field)),
171 Value::SmallHashInline(h) => Ok(h.contains_key(field)),
172 _ => Err(StoreError::WrongType),
173 },
174 }
175 }
176
177 pub fn hlen(&mut self, key: &[u8]) -> Result<usize, StoreError> {
178 match self.live_entry(key) {
179 None => Ok(0),
180 Some(e) => match &e.value {
181 Value::Hash(h) => Ok(h.len()),
182 Value::SmallHashInline(h) => Ok(h.len()),
183 _ => Err(StoreError::WrongType),
184 },
185 }
186 }
187
188 pub fn hmget(
189 &mut self,
190 key: &[u8],
191 fields: &[Vec<u8>],
192 ) -> Result<Vec<Option<Vec<u8>>>, StoreError> {
193 let borrowed: Vec<&[u8]> = fields.iter().map(Vec::as_slice).collect();
194 self.hmget_borrowed(key, &borrowed)
195 }
196
197 pub fn hmget_borrowed(
199 &mut self,
200 key: &[u8],
201 fields: &[&[u8]],
202 ) -> Result<Vec<Option<Vec<u8>>>, StoreError> {
203 match self.live_entry(key) {
204 None => Ok(fields.iter().map(|_| None).collect()),
205 Some(e) => match &e.value {
206 Value::Hash(h) => Ok(fields.iter().map(|f| h.get(*f).cloned()).collect()),
207 Value::SmallHashInline(h) => Ok(fields
208 .iter()
209 .map(|f| h.get(*f).map(<[u8]>::to_vec))
210 .collect()),
211 _ => Err(StoreError::WrongType),
212 },
213 }
214 }
215
216 pub fn hgetall(&mut self, key: &[u8]) -> Result<Vec<Vec<u8>>, StoreError> {
218 match self.hash_pairs(key)? {
219 None => Ok(Vec::new()),
220 Some(pairs) => {
221 let mut out = Vec::with_capacity(pairs.len() * 2);
222 for (f, v) in pairs {
223 out.push(f);
224 out.push(v);
225 }
226 Ok(out)
227 }
228 }
229 }
230
231 pub fn hkeys(&mut self, key: &[u8]) -> Result<Vec<Vec<u8>>, StoreError> {
232 match self.live_entry(key) {
233 None => Ok(Vec::new()),
234 Some(e) => match &e.value {
235 Value::Hash(h) => Ok(h.keys().map(kevy_bytes::SmallBytes::to_vec).collect()),
236 Value::SmallHashInline(h) => Ok(h.iter().map(|(f, _)| f.to_vec()).collect()),
237 _ => Err(StoreError::WrongType),
238 },
239 }
240 }
241
242 pub fn hvals(&mut self, key: &[u8]) -> Result<Vec<Vec<u8>>, StoreError> {
243 match self.live_entry(key) {
244 None => Ok(Vec::new()),
245 Some(e) => match &e.value {
246 Value::Hash(h) => Ok(h.values().cloned().collect()),
247 Value::SmallHashInline(h) => Ok(h.iter().map(|(_, v)| v.to_vec()).collect()),
248 _ => Err(StoreError::WrongType),
249 },
250 }
251 }
252
253 pub fn hdel(&mut self, key: &[u8], fields: &[Vec<u8>]) -> Result<usize, StoreError> {
255 let borrowed: Vec<&[u8]> = fields.iter().map(Vec::as_slice).collect();
256 self.hdel_borrowed(key, &borrowed)
257 }
258
259 pub fn hdel_borrowed(
261 &mut self,
262 key: &[u8],
263 fields: &[&[u8]],
264 ) -> Result<usize, StoreError> {
265 let now = now_ns();
266 if !self.reap(key, now) {
267 return Ok(0);
268 }
269 let (removed, delta, drop_key) = {
270 let h_entry = self.map.get_mut(key).expect("live");
271 match &mut h_entry.value {
272 Value::Hash(h) => {
273 let h = Arc::make_mut(h);
276 let mut r = 0usize;
277 let mut d: i64 = 0;
278 for f in fields {
279 if let Some(old_v) = h.remove(*f) {
280 r += 1;
281 let smb = SmallBytes::from_slice(f);
282 d -= hash_field_weight(&smb, old_v.len()) as i64;
283 }
284 }
285 let drop_now = h.is_empty();
286 (r, d, drop_now)
287 }
288 Value::SmallHashInline(h) => {
289 let mut r = 0usize;
290 for f in fields {
291 if h.try_remove(f) {
292 r += 1;
293 }
294 }
295 let drop_now = h.is_empty();
296 (r, 0i64, drop_now)
297 }
298 _ => return Err(StoreError::WrongType),
299 }
300 };
301 if drop_key {
302 self.remove_entry(key);
303 } else {
304 self.account_delta(key, delta);
305 }
306 Ok(removed)
307 }
308
309 pub fn hincrbyfloat(
313 &mut self,
314 key: &[u8],
315 field: &[u8],
316 delta: f64,
317 ) -> Result<f64, StoreError> {
318 let (next, weight_delta) = {
319 let h = self.hash_mut(key, true)?.expect("created");
320 let cur = match h.get(field) {
321 Some(v) => parse_f64(v).ok_or(StoreError::NotFloat)?,
322 None => 0.0,
323 };
324 let next = cur + delta;
325 if !next.is_finite() {
326 return Err(StoreError::NotFloat);
327 }
328 let new_bytes = format!("{next}").into_bytes();
329 let smb = SmallBytes::from_slice(field);
330 let new_field_w = hash_field_weight(&smb, new_bytes.len()) as i64;
331 let new_value_len = new_bytes.len();
332 let wd = match h.insert(smb, new_bytes) {
333 None => new_field_w,
334 Some(old) => new_value_len as i64 - old.len() as i64,
335 };
336 (next, wd)
337 };
338 self.account_delta(key, weight_delta);
339 Ok(next)
340 }
341
342 pub fn hincrby(&mut self, key: &[u8], field: &[u8], delta: i64) -> Result<i64, StoreError> {
344 let (next, weight_delta) = {
345 let h = self.hash_mut(key, true)?.expect("created");
346 let cur = match h.get(field) {
347 Some(v) => parse_i64(v).ok_or(StoreError::NotInteger)?,
348 None => 0,
349 };
350 let next = cur.checked_add(delta).ok_or(StoreError::Overflow)?;
351 let new_bytes = next.to_string().into_bytes();
352 let smb = SmallBytes::from_slice(field);
353 let new_field_w = hash_field_weight(&smb, new_bytes.len()) as i64;
354 let new_value_len = new_bytes.len();
355 let wd = match h.insert(smb, new_bytes) {
356 None => new_field_w,
357 Some(old) => new_value_len as i64 - old.len() as i64,
358 };
359 (next, wd)
360 };
361 self.account_delta(key, weight_delta);
362 Ok(next)
363 }
364
    /// Writes a single `field`/`value` pair into the hash at `key` and reports
    /// what happened as an `HsetOutcome`.
    ///
    /// * No entry for `key`: a new hash is created through `hset_create`.
    /// * Inline hash: `try_set` is attempted; if it reports `NoRoom` the hash
    ///   is promoted to the heap form, the field is written there, the value
    ///   slot is replaced and the entry is re-weighed.
    /// * Heap hash: the field is inserted directly.
    ///
    /// Heap outcomes carry a weight delta; this function does not call
    /// `account_delta` itself.
    ///
    /// # Errors
    ///
    /// `StoreError::WrongType` when `key` holds a non-hash value.
    fn hset_one(
        &mut self,
        key: &[u8],
        field: &[u8],
        value: &[u8],
    ) -> Result<HsetOutcome, StoreError> {
        // The lookup is done twice: once to decide whether to create, once to
        // obtain the mutable slot that the rest of the function works on.
        if self.hash_value_for_set(key)?.is_none() {
            return Ok(self.hset_create(key, field, value));
        }
        let v = self.hash_value_for_set(key)?.expect("present and a hash");
        match v {
            Value::SmallHashInline(h) => match h.try_set(field, value) {
                HAddResult::Added => Ok(HsetOutcome::AddedInline),
                HAddResult::Updated => Ok(HsetOutcome::UpdatedInline),
                HAddResult::NoRoom => {
                    // The inline form cannot take this write: convert to the
                    // heap form and perform the write there.
                    let mut promoted = small_hash::promote(h);
                    let smb = SmallBytes::from_slice(field);
                    let new_w = hash_field_weight(&smb, value.len()) as i64;
                    // Captured before the insert so the outcome can tell an
                    // add from an overwrite and compute the length difference.
                    let added = !promoted.contains_key(field);
                    let prior_v_len = promoted.get(field).map_or(0, Vec::len);
                    promoted.insert(smb, value.to_vec());
                    *v = Value::Hash(Arc::new(promoted));
                    // NOTE(review): `reweigh_entry` runs after the new field is
                    // already inside the promoted hash, and the delta returned
                    // below is later handed to `account_delta` by callers. If
                    // `reweigh_entry` recomputes the full entry weight, this
                    // field's weight may be counted twice — confirm.
                    self.reweigh_entry(key);
                    if added {
                        Ok(HsetOutcome::AddedHeap(new_w))
                    } else {
                        Ok(HsetOutcome::UpdatedHeap(value.len() as i64 - prior_v_len as i64))
                    }
                }
            },
            Value::Hash(h) => {
                let h = Arc::make_mut(h);
                let smb = SmallBytes::from_slice(field);
                let new_w = hash_field_weight(&smb, value.len()) as i64;
                let new_value_len = value.len();
                match h.insert(smb, value.to_vec()) {
                    // New field: report its full weight.
                    None => Ok(HsetOutcome::AddedHeap(new_w)),
                    // Overwrite: report only the change in value length.
                    Some(old) => {
                        Ok(HsetOutcome::UpdatedHeap(new_value_len as i64 - old.len() as i64))
                    }
                }
            }
            // `hash_value_for_set` only yields the two hash variants, so this
            // arm is a defensive fallback.
            _ => Err(StoreError::WrongType),
        }
    }
416
417 fn hset_create(&mut self, key: &[u8], field: &[u8], value: &[u8]) -> HsetOutcome {
420 if let Some(inline) = SmallHashData::with_one(field, value) {
421 self.insert_entry(
422 SmallBytes::from_slice(key),
423 Entry::new(Value::SmallHashInline(inline), None),
424 );
425 HsetOutcome::AddedInline
428 } else {
429 let smb_f = SmallBytes::from_slice(field);
430 let mut h = HashData::with_capacity(1);
431 h.insert(smb_f, value.to_vec());
432 self.insert_entry(
433 SmallBytes::from_slice(key),
434 Entry::new(Value::Hash(Arc::new(h)), None),
435 );
436 HsetOutcome::AddedInline
437 }
438 }
439
440}
441
/// Result of writing one field through `hset_one` / `hset_create`.
///
/// The `i64` payloads are weight deltas for the caller to hand to
/// `account_delta`; the payload-free variants report no delta.
enum HsetOutcome {
    /// A new field was stored and no weight delta is reported: either the
    /// inline `try_set` added it, or `hset_create` inserted a fresh entry.
    AddedInline,
    /// An existing field of an inline hash was overwritten by `try_set`.
    UpdatedInline,
    /// A new field was inserted into a heap hash; the payload is the
    /// `hash_field_weight` of that field.
    AddedHeap(i64),
    /// An existing heap field was overwritten; the payload is the new value
    /// length minus the old value length.
    UpdatedHeap(i64),
}