Skip to main content

kevy_store/
hash.rs

1//! `Store` hash commands.
2
3use crate::small_hash::{self, AddResult as HAddResult, SmallHashData};
4use crate::util::parse_i64;
5use crate::value::{HashData, SmallBytes, Value, hash_field_weight};
6use crate::{Entry, Store, StoreError, now_ns};
7use std::sync::Arc;
8
9impl Store {
10    // ---- hashes --------------------------------------------------------
11
12    /// Borrow the key's hash mutably, optionally creating it. `Ok(None)` means
13    /// the key is absent and `create` was false.
14    ///
15    /// A.8: only used by the heap-backed legacy read/mutate sites
16    /// (`hincrby`, the `hash_ref` reader). The bulk writers (`hset` /
17    /// `hdel`) take the encoding-switch path via `hset_one` /
18    /// `hash_field_get_mut_for_hset`. When `create=true` on a missing
19    /// key, materialises a heap `Value::Hash(Arc::default())` (no inline
20    /// path), matching pre-A.8 behaviour for read-modify-write entry
21    /// points that don't carry per-pair size info.
22    fn hash_mut(&mut self, key: &[u8], create: bool) -> Result<Option<&mut HashData>, StoreError> {
23        if self.live_entry_mut(key).is_none() {
24            if !create {
25                return Ok(None);
26            }
27            self.insert_entry(
28                SmallBytes::from_slice(key),
29                Entry::new(Value::Hash(Arc::default()), None),
30            );
31        }
32        // A.8: detect inline-encoding first (independent borrow), then
33        // upgrade out-of-scope of the &mut, then re-borrow as Hash. The
34        // borrow checker rejects the obvious in-place match because
35        // both arms would return a borrow tied to the same `&mut self`.
36        let is_inline = matches!(
37            self.map.get(key).map(|e| &e.value),
38            Some(Value::SmallHashInline(_))
39        );
40        if is_inline {
41            let promoted = {
42                let e = self.map.get(key).expect("present");
43                if let Value::SmallHashInline(s) = &e.value {
44                    small_hash::promote(s)
45                } else {
46                    unreachable!()
47                }
48            };
49            self.map.get_mut(key).expect("present").value = Value::Hash(Arc::new(promoted));
50            self.reweigh_entry(key);
51        }
52        match &mut self.map.get_mut(key).expect("present").value {
53            Value::Hash(h) => Ok(Some(Arc::make_mut(h))),
54            _ => Err(StoreError::WrongType),
55        }
56    }
57
58    /// A.8: read the key's hash slot for HSET. `WrongType` if the entry
59    /// is not a hash. Returns `None` when the key is absent — caller
60    /// (`hset_one`) creates the entry then.
61    fn hash_value_for_set(&mut self, key: &[u8]) -> Result<Option<&mut Value>, StoreError> {
62        match self.live_entry_mut(key) {
63            None => Ok(None),
64            Some(e) => match &e.value {
65                Value::Hash(_) | Value::SmallHashInline(_) => Ok(Some(&mut e.value)),
66                _ => Err(StoreError::WrongType),
67            },
68        }
69    }
70
71    /// Read the key's hash immutably (lazily expiring) — returns the
72    /// pairs as a vector of `(&[u8], &[u8])`. None if absent.
73    /// Internal helper for read-only paths; collects into a new Vec to
74    /// avoid the two-encoding match dance at every callsite.
75    fn hash_pairs(&mut self, key: &[u8]) -> Result<Option<Vec<(Vec<u8>, Vec<u8>)>>, StoreError> {
76        match self.live_entry(key) {
77            None => Ok(None),
78            Some(e) => match &e.value {
79                Value::Hash(h) => Ok(Some(
80                    h.iter().map(|(f, v)| (f.to_vec(), v.clone())).collect(),
81                )),
82                Value::SmallHashInline(h) => Ok(Some(
83                    h.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect(),
84                )),
85                _ => Err(StoreError::WrongType),
86            },
87        }
88    }
89
90    /// G4 (v1.25): borrowed-pair `HSET` — kills the per-field+value
91    /// `Vec<u8>` allocs the dispatch layer used to do before calling
92    /// [`Self::hset`]. A.8: routes through the encoding-switch path.
93    pub fn hset_borrowed(
94        &mut self,
95        key: &[u8],
96        pairs: &[(&[u8], &[u8])],
97    ) -> Result<usize, StoreError> {
98        if pairs.is_empty() {
99            return Ok(0);
100        }
101        let mut added = 0usize;
102        let mut delta: i64 = 0;
103        for (f, v) in pairs {
104            match self.hset_one(key, f, v)? {
105                HsetOutcome::AddedInline => {
106                    added += 1;
107                    // Inline carries zero heap delta — already accounted
108                    // at insert_entry / per-call via value.weight()==0.
109                }
110                HsetOutcome::UpdatedInline => {}
111                HsetOutcome::AddedHeap(w) => {
112                    added += 1;
113                    delta += w;
114                }
115                HsetOutcome::UpdatedHeap(d) => {
116                    delta += d;
117                }
118            }
119        }
120        self.account_delta(key, delta);
121        Ok(added)
122    }
123
124    /// `HSET` — returns the count of newly-added fields.
125    pub fn hset(&mut self, key: &[u8], pairs: &[(Vec<u8>, Vec<u8>)]) -> Result<usize, StoreError> {
126        let borrowed: Vec<(&[u8], &[u8])> =
127            pairs.iter().map(|(f, v)| (f.as_slice(), v.as_slice())).collect();
128        self.hset_borrowed(key, &borrowed)
129    }
130
131    /// `HSETNX` — set only if the field is absent; returns whether it was set.
132    pub fn hsetnx(&mut self, key: &[u8], field: &[u8], val: &[u8]) -> Result<bool, StoreError> {
133        // Existing-field fast check via the encoding-aware reader.
134        let exists = match self.live_entry(key) {
135            None => false,
136            Some(e) => match &e.value {
137                Value::Hash(h) => h.contains_key(field),
138                Value::SmallHashInline(h) => h.contains_key(field),
139                _ => return Err(StoreError::WrongType),
140            },
141        };
142        if exists {
143            return Ok(false);
144        }
145        match self.hset_one(key, field, val)? {
146            HsetOutcome::AddedInline | HsetOutcome::UpdatedInline => Ok(true),
147            HsetOutcome::AddedHeap(w) => {
148                self.account_delta(key, w);
149                Ok(true)
150            }
151            HsetOutcome::UpdatedHeap(_) => Ok(true),
152        }
153    }
154
155    pub fn hget(&mut self, key: &[u8], field: &[u8]) -> Result<Option<&[u8]>, StoreError> {
156        match self.live_entry(key) {
157            None => Ok(None),
158            Some(e) => match &e.value {
159                Value::Hash(h) => Ok(h.get(field).map(Vec::as_slice)),
160                Value::SmallHashInline(h) => Ok(h.get(field)),
161                _ => Err(StoreError::WrongType),
162            },
163        }
164    }
165
166    pub fn hexists(&mut self, key: &[u8], field: &[u8]) -> Result<bool, StoreError> {
167        match self.live_entry(key) {
168            None => Ok(false),
169            Some(e) => match &e.value {
170                Value::Hash(h) => Ok(h.contains_key(field)),
171                Value::SmallHashInline(h) => Ok(h.contains_key(field)),
172                _ => Err(StoreError::WrongType),
173            },
174        }
175    }
176
177    pub fn hlen(&mut self, key: &[u8]) -> Result<usize, StoreError> {
178        match self.live_entry(key) {
179            None => Ok(0),
180            Some(e) => match &e.value {
181                Value::Hash(h) => Ok(h.len()),
182                Value::SmallHashInline(h) => Ok(h.len()),
183                _ => Err(StoreError::WrongType),
184            },
185        }
186    }
187
188    pub fn hmget(
189        &mut self,
190        key: &[u8],
191        fields: &[Vec<u8>],
192    ) -> Result<Vec<Option<Vec<u8>>>, StoreError> {
193        let borrowed: Vec<&[u8]> = fields.iter().map(Vec::as_slice).collect();
194        self.hmget_borrowed(key, &borrowed)
195    }
196
197    /// G4 (v1.25): borrowed-slice `HMGET`.
198    pub fn hmget_borrowed(
199        &mut self,
200        key: &[u8],
201        fields: &[&[u8]],
202    ) -> Result<Vec<Option<Vec<u8>>>, StoreError> {
203        match self.live_entry(key) {
204            None => Ok(fields.iter().map(|_| None).collect()),
205            Some(e) => match &e.value {
206                Value::Hash(h) => Ok(fields.iter().map(|f| h.get(*f).cloned()).collect()),
207                Value::SmallHashInline(h) => Ok(fields
208                    .iter()
209                    .map(|f| h.get(*f).map(<[u8]>::to_vec))
210                    .collect()),
211                _ => Err(StoreError::WrongType),
212            },
213        }
214    }
215
216    /// `HGETALL` — flat `[field, value, field, value, ...]`.
217    pub fn hgetall(&mut self, key: &[u8]) -> Result<Vec<Vec<u8>>, StoreError> {
218        match self.hash_pairs(key)? {
219            None => Ok(Vec::new()),
220            Some(pairs) => {
221                let mut out = Vec::with_capacity(pairs.len() * 2);
222                for (f, v) in pairs {
223                    out.push(f);
224                    out.push(v);
225                }
226                Ok(out)
227            }
228        }
229    }
230
231    pub fn hkeys(&mut self, key: &[u8]) -> Result<Vec<Vec<u8>>, StoreError> {
232        match self.live_entry(key) {
233            None => Ok(Vec::new()),
234            Some(e) => match &e.value {
235                Value::Hash(h) => Ok(h.keys().map(kevy_bytes::SmallBytes::to_vec).collect()),
236                Value::SmallHashInline(h) => Ok(h.iter().map(|(f, _)| f.to_vec()).collect()),
237                _ => Err(StoreError::WrongType),
238            },
239        }
240    }
241
242    pub fn hvals(&mut self, key: &[u8]) -> Result<Vec<Vec<u8>>, StoreError> {
243        match self.live_entry(key) {
244            None => Ok(Vec::new()),
245            Some(e) => match &e.value {
246                Value::Hash(h) => Ok(h.values().cloned().collect()),
247                Value::SmallHashInline(h) => Ok(h.iter().map(|(_, v)| v.to_vec()).collect()),
248                _ => Err(StoreError::WrongType),
249            },
250        }
251    }
252
253    /// `HDEL` — returns count removed; deletes the key if hash becomes empty.
254    pub fn hdel(&mut self, key: &[u8], fields: &[Vec<u8>]) -> Result<usize, StoreError> {
255        let borrowed: Vec<&[u8]> = fields.iter().map(Vec::as_slice).collect();
256        self.hdel_borrowed(key, &borrowed)
257    }
258
259    /// G4 (v1.25): borrowed-slice `HDEL`. A.8: encoding-aware.
260    pub fn hdel_borrowed(
261        &mut self,
262        key: &[u8],
263        fields: &[&[u8]],
264    ) -> Result<usize, StoreError> {
265        let now = now_ns();
266        if !self.reap(key, now) {
267            return Ok(0);
268        }
269        let (removed, delta, drop_key) = {
270            let h_entry = self.map.get_mut(key).expect("live");
271            match &mut h_entry.value {
272                Value::Hash(h) => {
273                    // G-A3: hoist Arc::make_mut OUT of the loop — done
274                    // once per command instead of per-field.
275                    let h = Arc::make_mut(h);
276                    let mut r = 0usize;
277                    let mut d: i64 = 0;
278                    for f in fields {
279                        if let Some(old_v) = h.remove(*f) {
280                            r += 1;
281                            let smb = SmallBytes::from_slice(f);
282                            d -= hash_field_weight(&smb, old_v.len()) as i64;
283                        }
284                    }
285                    let drop_now = h.is_empty();
286                    (r, d, drop_now)
287                }
288                Value::SmallHashInline(h) => {
289                    let mut r = 0usize;
290                    for f in fields {
291                        if h.try_remove(f) {
292                            r += 1;
293                        }
294                    }
295                    let drop_now = h.is_empty();
296                    (r, 0i64, drop_now)
297                }
298                _ => return Err(StoreError::WrongType),
299            }
300        };
301        if drop_key {
302            self.remove_entry(key);
303        } else {
304            self.account_delta(key, delta);
305        }
306        Ok(removed)
307    }
308
309    /// `HINCRBY` — preserves TTL; errors if the field isn't an integer.
310    pub fn hincrby(&mut self, key: &[u8], field: &[u8], delta: i64) -> Result<i64, StoreError> {
311        let (next, weight_delta) = {
312            let h = self.hash_mut(key, true)?.expect("created");
313            let cur = match h.get(field) {
314                Some(v) => parse_i64(v).ok_or(StoreError::NotInteger)?,
315                None => 0,
316            };
317            let next = cur.checked_add(delta).ok_or(StoreError::Overflow)?;
318            let new_bytes = next.to_string().into_bytes();
319            let smb = SmallBytes::from_slice(field);
320            let new_field_w = hash_field_weight(&smb, new_bytes.len()) as i64;
321            let new_value_len = new_bytes.len();
322            let wd = match h.insert(smb, new_bytes) {
323                None => new_field_w,
324                Some(old) => new_value_len as i64 - old.len() as i64,
325            };
326            (next, wd)
327        };
328        self.account_delta(key, weight_delta);
329        Ok(next)
330    }
331
332    /// A.8 core: set one `(field, value)` pair, applying the
333    /// encoding-switch. Returns the per-call outcome (whether added or
334    /// updated, and whether it sits in the inline or heap variant).
335    fn hset_one(
336        &mut self,
337        key: &[u8],
338        field: &[u8],
339        value: &[u8],
340    ) -> Result<HsetOutcome, StoreError> {
341        // Missing key — pick encoding by first pair size.
342        if self.hash_value_for_set(key)?.is_none() {
343            return Ok(self.hset_create(key, field, value));
344        }
345        let v = self.hash_value_for_set(key)?.expect("present and a hash");
346        match v {
347            Value::SmallHashInline(h) => match h.try_set(field, value) {
348                HAddResult::Added => Ok(HsetOutcome::AddedInline),
349                HAddResult::Updated => Ok(HsetOutcome::UpdatedInline),
350                HAddResult::NoRoom => {
351                    // Promote inline → Hash(Arc<HashData>), then set
352                    // (handles the spilling pair).
353                    let mut promoted = small_hash::promote(h);
354                    let smb = SmallBytes::from_slice(field);
355                    let new_w = hash_field_weight(&smb, value.len()) as i64;
356                    let added = !promoted.contains_key(field);
357                    let prior_v_len = promoted.get(field).map_or(0, Vec::len);
358                    promoted.insert(smb, value.to_vec());
359                    *v = Value::Hash(Arc::new(promoted));
360                    self.reweigh_entry(key);
361                    if added {
362                        Ok(HsetOutcome::AddedHeap(new_w))
363                    } else {
364                        Ok(HsetOutcome::UpdatedHeap(value.len() as i64 - prior_v_len as i64))
365                    }
366                }
367            },
368            Value::Hash(h) => {
369                let h = Arc::make_mut(h);
370                let smb = SmallBytes::from_slice(field);
371                let new_w = hash_field_weight(&smb, value.len()) as i64;
372                let new_value_len = value.len();
373                match h.insert(smb, value.to_vec()) {
374                    None => Ok(HsetOutcome::AddedHeap(new_w)),
375                    Some(old) => {
376                        Ok(HsetOutcome::UpdatedHeap(new_value_len as i64 - old.len() as i64))
377                    }
378                }
379            }
380            _ => Err(StoreError::WrongType),
381        }
382    }
383
384    /// Create a fresh entry for `key` holding one pair. Picks inline
385    /// when both field + value fit, falls back to heap otherwise.
386    fn hset_create(&mut self, key: &[u8], field: &[u8], value: &[u8]) -> HsetOutcome {
387        if let Some(inline) = SmallHashData::with_one(field, value) {
388            self.insert_entry(
389                SmallBytes::from_slice(key),
390                Entry::new(Value::SmallHashInline(inline), None),
391            );
392            // Insert already accounts via value.weight() == 0; per-pair
393            // delta is zero in the caller (matches inline arm).
394            HsetOutcome::AddedInline
395        } else {
396            let smb_f = SmallBytes::from_slice(field);
397            let mut h = HashData::with_capacity(1);
398            h.insert(smb_f, value.to_vec());
399            self.insert_entry(
400                SmallBytes::from_slice(key),
401                Entry::new(Value::Hash(Arc::new(h)), None),
402            );
403            HsetOutcome::AddedInline
404        }
405    }
406
407}
408
/// Result of writing one `(field, value)` pair via `Store::hset_one`.
///
/// The `Added*` variants mean the field did not exist before; callers count
/// them as newly-added fields. The heap variants carry an `i64` weight delta
/// that callers pass on to `account_delta`; the inline variants carry none.
enum HsetOutcome {
    /// New field, stored in the inline encoding — no heap delta to account.
    AddedInline,
    /// Existing field overwritten in the inline encoding — nothing to count
    /// and nothing to account.
    UpdatedInline,
    /// New field in the heap encoding; the payload is that field's weight.
    AddedHeap(i64),
    /// Existing field overwritten in the heap encoding; the payload is the
    /// new value length minus the old value length.
    UpdatedHeap(i64),
}