Skip to main content

kevy_store/
string.rs

1//! `Store` string commands.
2
3use crate::util::{
4    fmt_num, format_i64_into, itoa_i64_stack, parse_canonical_i64, parse_f64, parse_i64,
5};
6use crate::value::{BULK_THRESHOLD, SmallBytes, Value};
7use crate::{Entry, Store, StoreError, deadline_at, now_ns};
8use std::borrow::Cow;
9use std::sync::Arc;
10use std::time::Duration;
11
/// L1 return shape for [`Store::get_for_reply`] — lets the reactor's reply
/// path choose between memcpy (`Bytes`) and writev zero-copy (`ArcBulk`)
/// off one keyspace lookup.
///
/// `Debug` and `Clone` are derived so replies can be logged, asserted on in
/// tests, and duplicated; cloning `ArcBulk` only clones the `Arc` handle.
#[derive(Debug, Clone)]
pub enum GetReply<'a> {
    /// Inline-encoded value — caller memcpys the bytes into its output Vec
    /// (small replies; encoding cost is tiny vs the RTT floor).
    Bytes(Cow<'a, [u8]>),
    /// L1 (2026-06-21): Arc-backed bulk. The reactor's reply path pushes
    /// the Arc into the conn's `output_arcs` so the next `writev` iovec
    /// list points DIRECTLY at the value bytes — skipping the per-GET
    /// memcpy that valkey's `tryAvoidBulkStrCopyToReply` likewise avoids.
    ArcBulk(Arc<Box<[u8]>>),
}
25
26/// L2 + L1: pick the optimal encoding for `bytes` at SET time:
27/// 1. Canonical i64 ASCII → `Value::Int(n)` (smallest + INCR fast path)
28/// 2. > [`BULK_THRESHOLD`] bytes → `Value::ArcBulk(Arc<[u8]>)` (lets the
29///    reactor reply path borrow the bytes for `writev` zero-copy GET)
30/// 3. Else → `Value::Str(SmallBytes::from_slice(bytes))` (inline-cache-
31///    line storage, beats Arc indirection for small values)
32#[inline]
33fn pick_value_for_set(bytes: &[u8]) -> Value {
34    if let Some(n) = parse_canonical_i64(bytes) {
35        return Value::Int(n);
36    }
37    if bytes.len() > BULK_THRESHOLD {
38        return Value::ArcBulk(Arc::new(Box::<[u8]>::from(bytes)));
39    }
40    Value::Str(SmallBytes::from_slice(bytes))
41}
42
43#[inline]
44fn pick_value_for_set_owned(bytes: Vec<u8>) -> Value {
45    if let Some(n) = parse_canonical_i64(&bytes) {
46        return Value::Int(n);
47    }
48    if bytes.len() > BULK_THRESHOLD {
49        // v1.29 Option A — `Arc::new(box)` is zero-copy when `len ==
50        // capacity` (shrink-to-fit no-ops). See `Value::ArcBulk` doc.
51        return Value::ArcBulk(Arc::new(bytes.into_boxed_slice()));
52    }
53    Value::Str(SmallBytes::from_vec(bytes))
54}
55
56impl Store {
57    // ---- strings -------------------------------------------------------
58
59    /// `SET` — overwrites any existing value/type. NX/XX guards; clears TTL.
60    /// Takes an owned `Vec` so a >22 B value's allocation is adopted as-is
61    /// (no copy). For callers holding a borrowed slice, prefer
62    /// [`Self::set_slice`] — it skips the `to_vec` entirely for values that
63    /// inline.
64    pub fn set(
65        &mut self,
66        key: &[u8],
67        value: Vec<u8>,
68        expire: Option<Duration>,
69        nx: bool,
70        xx: bool,
71    ) -> bool {
72        self.set_value(key, pick_value_for_set_owned(value), expire, nx, xx)
73    }
74
75    /// [`Self::set`] for a borrowed value. Values ≤ 22 B store inline in the
76    /// entry — zero allocator traffic, where `set(key, value.to_vec(), …)`
77    /// paid a malloc for the `Vec` and a free when the inline copy dropped
78    /// it (the dominant overwrite-SET pattern). Larger values pay the same
79    /// single allocation either way.
80    pub fn set_slice(
81        &mut self,
82        key: &[u8],
83        value: &[u8],
84        expire: Option<Duration>,
85        nx: bool,
86        xx: bool,
87    ) -> bool {
88        self.set_value(key, pick_value_for_set(value), expire, nx, xx)
89    }
90
91    fn set_value(
92        &mut self,
93        key: &[u8],
94        new_value: Value,
95        expire: Option<Duration>,
96        nx: bool,
97        xx: bool,
98    ) -> bool {
99        // F1 (v1.25): single-probe overwrite-SET fast path for default
100        // `maxmemory == 0` (the bench and production-common case). Goes
101        // through `kevy_map::RawEntryMut` (B.1, commit 1a9f9af) so the
102        // Occupied arm mutates the entry in place and returns owned
103        // (delta, ttl_delta) — no escaping reference, no second probe.
104        // Overwrite path drops from 2 probes (live_entry_mut: get+get_mut)
105        // to 1 probe. New-key + expired-removed paths still pay the
106        // insert_entry probe (same as before).
107        if self.maxmemory == 0 {
108            return self.set_value_no_evict(key, new_value, expire, nx, xx);
109        }
110        // Eviction path (maxmemory > 0): keep the 2-probe shape so
111        // `live_entry_mut`'s touch_on_access bookkeeping runs.
112        let expire_at = expire.map(|d| deadline_at(now_ns(), d));
113        let key_heap = crate::key_heap_bytes_for(key);
114        #[allow(clippy::single_match_else)]
115        // Phase 1: in-place overwrite or remember we need to insert.
116        // The old value (if any) is taken via `mem::replace` so the bio
117        // hand-off in phase 2 happens AFTER `self.live_entry_mut`'s
118        // borrow on `self.map` is released — without splitting the
119        // borrow we couldn't call `self.maybe_offload_drop`.
120        let (outcome, old_value) = match self.live_entry_mut(key) {
121            Some(e) => {
122                if nx {
123                    return false;
124                }
125                let had_ttl = e.expire_at_ns.is_some();
126                // v1.25 A.3: capture the old Value so it can be shipped
127                // to the bio thread post-borrow rather than dropped
128                // inline (the Drop of a Value::ArcBulk over the heap
129                // heavy threshold is the Axis I tail amplifier).
130                let old = std::mem::replace(&mut e.value, new_value);
131                e.expire_at_ns = expire_at.and_then(crate::pack_deadline);
132                let new_w = key_heap + e.value.weight();
133                let delta = new_w as i64 - e.weight() as i64;
134                let ttl_delta = i64::from(e.expire_at_ns.is_some()) - i64::from(had_ttl);
135                e.set_weight(new_w);
136                (Ok((delta, ttl_delta)), Some(old))
137            }
138            None => {
139                if xx {
140                    return false;
141                }
142                (Err(Entry::new(new_value, expire_at)), None)
143            }
144        };
145        match outcome {
146            Ok((delta, ttl_delta)) => {
147                self.apply_weight_delta(delta);
148                self.adjust_expires(ttl_delta);
149            }
150            Err(entry) => {
151                self.insert_entry(SmallBytes::from_slice(key), entry);
152            }
153        }
154        // Phase 2: hand the old value off if heavy. Done last so the
155        // critical mutation + bookkeeping commit before any (sub-µs in
156        // steady state) channel send.
157        if let Some(old) = old_value {
158            self.maybe_offload_drop(old);
159        }
160        true
161    }
162
163    /// Single-probe overwrite-SET via `kevy_map::RawEntryMut` for the
164    /// `maxmemory == 0` fast path (F1, v1.25). Skips the `live_entry_mut`
165    /// 2-probe shape: Occupied arm mutates in place + returns owned
166    /// (delta, ttl_delta); Expired arm removes via raw-entry handle + falls
167    /// through to insert; Vacant arm goes to insert.
168    fn set_value_no_evict(
169        &mut self,
170        key: &[u8],
171        new_value: Value,
172        expire: Option<Duration>,
173        nx: bool,
174        xx: bool,
175    ) -> bool {
176        use kevy_map::RawEntryMut;
177        let expire_at = expire.map(|d| deadline_at(now_ns(), d));
178        let key_heap = crate::key_heap_bytes_for(key);
179        let (uc, cn) = (self.cached_clock, self.cached_ns);
180        // Hold new_value behind Option so the multi-arm consumption
181        // (overwrite arm vs insert-after-expired arm) is moved-once.
182        let mut value_slot = Some(new_value);
183
184        // Phase 1: probe + decide. Three outcomes — overwrite-finished
185        // (delta + ttl_delta + the displaced old `Value` to maybe ship
186        // to the bio thread), an expired-removed `Entry` whose `Value`
187        // ditto needs offload, or needs-insert with no prior value.
188        enum Outcome {
189            Updated { delta: i64, ttl_delta: i64, old: Value },
190            ExpiredThenInsert { old: Value },
191            NeedInsert,
192        }
193        let outcome = match self.map.raw_entry_mut(key) {
194            RawEntryMut::Occupied(mut occ) => {
195                // Decide via shared ref first so we don't touch the entry
196                // (avoiding a needless cache-line dirtying) on the
197                // is_expired+remove path.
198                let expired = occ.get().is_expired(uc, cn);
199                if expired {
200                    let old = occ.remove();
201                    // borrow on self.map released by remove(self).
202                    self.used_memory = self
203                        .used_memory
204                        .saturating_sub(old.weight() + crate::value::ENTRY_OVERHEAD);
205                    if old.expire_at_ns.is_some() {
206                        self.adjust_expires(-1);
207                    }
208                    self.expired_keys_total =
209                        self.expired_keys_total.saturating_add(1);
210                    if xx {
211                        // No insert; the expired `Value` still needs
212                        // to drop. Ship via the bio path on its way out.
213                        self.maybe_offload_drop(old.value);
214                        return false;
215                    }
216                    Outcome::ExpiredThenInsert { old: old.value }
217                } else {
218                    if nx {
219                        return false;
220                    }
221                    let e = occ.get_mut();
222                    let had_ttl = e.expire_at_ns.is_some();
223                    // v1.25 A.3: take the old Value before overwriting
224                    // so phase 2 can hand it to the bio thread instead
225                    // of dropping inline (the Axis I tail amplifier).
226                    let old = std::mem::replace(&mut e.value, value_slot.take().unwrap());
227                    e.expire_at_ns = expire_at.and_then(crate::pack_deadline);
228                    let new_w = key_heap + e.value.weight();
229                    let delta = new_w as i64 - e.weight() as i64;
230                    let ttl_delta = i64::from(e.expire_at_ns.is_some())
231                        - i64::from(had_ttl);
232                    e.set_weight(new_w);
233                    Outcome::Updated { delta, ttl_delta, old }
234                }
235            }
236            RawEntryMut::Vacant(_) => {
237                if xx {
238                    return false;
239                }
240                Outcome::NeedInsert
241            }
242        };
243
244        // Phase 2: bookkeeping + maybe insert. Borrow on self.map is gone.
245        let old_value: Option<Value> = match outcome {
246            Outcome::Updated { delta, ttl_delta, old } => {
247                self.apply_weight_delta(delta);
248                self.adjust_expires(ttl_delta);
249                Some(old)
250            }
251            Outcome::ExpiredThenInsert { old } => {
252                let entry = Entry::new(value_slot.take().unwrap(), expire_at);
253                self.insert_entry(SmallBytes::from_slice(key), entry);
254                Some(old)
255            }
256            Outcome::NeedInsert => {
257                let entry = Entry::new(value_slot.take().unwrap(), expire_at);
258                self.insert_entry(SmallBytes::from_slice(key), entry);
259                None
260            }
261        };
262        // Phase 3: ship the displaced Value if any. Last so the keyspace
263        // commit precedes any (sub-µs steady-state) channel send.
264        if let Some(old) = old_value {
265            self.maybe_offload_drop(old);
266        }
267        true
268    }
269
270    /// L1 (2026-06-21): GET variant that exposes the underlying encoding
271    /// so the reactor's reply path can choose zero-copy
272    /// (`Value::ArcBulk` → push the Arc to the conn's `output_arcs` for a
273    /// writev iovec) vs memcpy (`Value::Str` / `Value::Int` → encode bytes
274    /// into the conn's output Vec). ONE keyspace lookup; the variant tag
275    /// chooses the encoding without a second probe.
276    pub fn get_for_reply(&mut self, key: &[u8]) -> Result<Option<GetReply<'_>>, StoreError> {
277        match self.live_entry(key) {
278            None => Ok(None),
279            Some(e) => match &e.value {
280                Value::Str(v) => Ok(Some(GetReply::Bytes(Cow::Borrowed(v.as_slice())))),
281                Value::ArcBulk(a) => Ok(Some(GetReply::ArcBulk(Arc::clone(a)))),
282                Value::Int(n) => {
283                    let mut tmp = itoa_i64_stack();
284                    let s = format_i64_into(*n, &mut tmp);
285                    Ok(Some(GetReply::Bytes(Cow::Owned(s.to_vec()))))
286                }
287                _ => Err(StoreError::WrongType),
288            },
289        }
290    }
291
292    /// A.6 (v1.25): fused GET-into-output. Skips the [`GetReply`] enum tag
293    /// round-trip + caller match arm by writing the RESP frame directly into
294    /// `output` (header + bytes + CRLF for Str/Int) or pushing the Arc into
295    /// `output_arcs` at the right offset (ArcBulk zero-copy via writev).
296    /// Returns the same outcomes as [`Self::get_for_reply`]: `Ok(true)` if
297    /// the key was found and emitted, `Ok(false)` if absent (the caller
298    /// emits the `$-1` null bulk — preserves the existing inline-null
299    /// semantics on the reactor side), `Err` for WRONGTYPE.
300    pub fn get_into_output(
301        &mut self,
302        key: &[u8],
303        output: &mut Vec<u8>,
304        output_arcs: &mut Vec<(usize, Arc<Box<[u8]>>)>,
305    ) -> Result<bool, StoreError> {
306        match self.live_entry(key) {
307            None => Ok(false),
308            Some(e) => match &e.value {
309                Value::Str(v) => {
310                    let bytes = v.as_slice();
311                    crate::util::bulk_header_into(output, bytes.len());
312                    output.extend_from_slice(bytes);
313                    output.extend_from_slice(b"\r\n");
314                    Ok(true)
315                }
316                Value::ArcBulk(a) => {
317                    crate::util::bulk_header_into(output, a.len());
318                    let pos = output.len();
319                    output_arcs.push((pos, Arc::clone(a)));
320                    output.extend_from_slice(b"\r\n");
321                    Ok(true)
322                }
323                Value::Int(n) => {
324                    let mut tmp = itoa_i64_stack();
325                    let s = format_i64_into(*n, &mut tmp);
326                    crate::util::bulk_header_into(output, s.len());
327                    output.extend_from_slice(s);
328                    output.extend_from_slice(b"\r\n");
329                    Ok(true)
330                }
331                _ => Err(StoreError::WrongType),
332            },
333        }
334    }
335
336    /// `GET` — returns a `Cow<[u8]>` so `Value::Int` callers can format the
337    /// integer to ASCII without storing it. L2 (2026-06-21): `Value::Str`
338    /// returns `Cow::Borrowed` (zero copy, same as before); `Value::Int`
339    /// formats to a small owned `Vec<u8>` (up to 20 bytes for `i64::MIN`).
340    pub fn get(&mut self, key: &[u8]) -> Result<Option<Cow<'_, [u8]>>, StoreError> {
341        match self.live_entry(key) {
342            None => Ok(None),
343            Some(e) => match &e.value {
344                Value::Str(v) => Ok(Some(Cow::Borrowed(v.as_slice()))),
345                // L1: Arc-backed bulk — return borrow into the Arc's
346                // bytes. Caller can either memcpy via Cow::Borrowed
347                // (default `encode_bulk` path) OR look up the
348                // underlying `Value::ArcBulk(arc)` separately for the
349                // writev zero-copy reply path.
350                Value::ArcBulk(a) => Ok(Some(Cow::Borrowed(a.as_ref()))),
351                Value::Int(n) => {
352                    let mut tmp = itoa_i64_stack();
353                    let s = format_i64_into(*n, &mut tmp);
354                    Ok(Some(Cow::Owned(s.to_vec())))
355                }
356                _ => Err(StoreError::WrongType),
357            },
358        }
359    }
360
361    /// Read-only `GET`: `&self`, so concurrent readers can run under a shared
362    /// lock (embedded mode's `RwLock` read path). Expiry is checked against the
363    /// coarse cached clock but an expired key is *not* removed here (no `&mut`)
364    /// — the reaper / next write reclaims it; a reader just sees `None`. LRU is
365    /// not touched, so this path is only used when eviction is off
366    /// (`maxmemory == 0`); with eviction, the caller takes the mutating
367    /// [`Self::get`] under an exclusive lock so access still stamps the LRU.
368    pub fn get_shared(&self, key: &[u8]) -> Result<Option<Cow<'_, [u8]>>, StoreError> {
369        match self.map.get(key) {
370            None => Ok(None),
371            Some(e) if e.is_expired(self.cached_clock, self.cached_ns) => Ok(None),
372            Some(e) => match &e.value {
373                Value::Str(v) => Ok(Some(Cow::Borrowed(v.as_slice()))),
374                Value::ArcBulk(a) => Ok(Some(Cow::Borrowed(a.as_ref()))),
375                Value::Int(n) => {
376                    let mut tmp = itoa_i64_stack();
377                    let s = format_i64_into(*n, &mut tmp);
378                    Ok(Some(Cow::Owned(s.to_vec())))
379                }
380                _ => Err(StoreError::WrongType),
381            },
382        }
383    }
384
385    pub fn strlen(&mut self, key: &[u8]) -> Result<usize, StoreError> {
386        Ok(self.get(key)?.map_or(0, |c| c.len()))
387    }
388
389    pub fn append(&mut self, key: &[u8], data: &[u8]) -> Result<usize, StoreError> {
390        let outcome = match self.live_entry_mut(key) {
391            Some(e) => match &mut e.value {
392                Value::Str(v) => {
393                    // SmallBytes is immutable; pop out, grow via Vec, re-wrap.
394                    let mut owned = std::mem::take(v).into_vec();
395                    owned.extend_from_slice(data);
396                    let new_len = owned.len();
397                    *v = SmallBytes::from_vec(owned);
398                    AppendOutcome::Reweigh(new_len)
399                }
400                // L1: APPEND on Arc-backed bulk → materialise to a fresh
401                // Vec (no other reader has refs to the old Arc post-replace),
402                // append, then pick the new encoding via SET routing rules.
403                Value::ArcBulk(a) => {
404                    let mut owned: Vec<u8> = a.as_ref().to_vec();
405                    owned.extend_from_slice(data);
406                    let new_len = owned.len();
407                    e.value = pick_value_for_set_owned(owned);
408                    AppendOutcome::Reweigh(new_len)
409                }
410                _ => return Err(StoreError::WrongType),
411            },
412            None => AppendOutcome::Insert,
413        };
414        match outcome {
415            AppendOutcome::Reweigh(new_len) => {
416                self.reweigh_entry(key);
417                Ok(new_len)
418            }
419            AppendOutcome::Insert => {
420                self.insert_entry(
421                    SmallBytes::from_slice(key),
422                    Entry::new(Value::Str(SmallBytes::from_slice(data)), None),
423                );
424                Ok(data.len())
425            }
426        }
427    }
428
429    /// `INCRBY` family; preserves any TTL.
430    ///
431    /// L2 (2026-06-21, lessons from valkey OBJ_ENCODING_INT): the hot path
432    /// matches `Value::Int(n)` and does the increment in place — no parse,
433    /// no format, no allocation. The legacy `Value::Str` arm parses,
434    /// increments, and **promotes** to `Value::Int(next)` so subsequent
435    /// INCRs land on the fast path. Insert-new path also lands as `Int`.
436    pub fn incr_by(&mut self, key: &[u8], delta: i64) -> Result<i64, StoreError> {
437        let outcome = match self.live_entry_mut(key) {
438            Some(e) => match &mut e.value {
439                Value::Int(n) => {
440                    let next = n.checked_add(delta).ok_or(StoreError::Overflow)?;
441                    *n = next;
442                    // In-place i64 mutation — weight unchanged (still 0
443                    // heap bytes for an Int). Skip the reweigh entirely.
444                    return Ok(next);
445                }
446                Value::Str(v) => {
447                    let next = parse_i64(v.as_slice())
448                        .ok_or(StoreError::NotInteger)?
449                        .checked_add(delta)
450                        .ok_or(StoreError::Overflow)?;
451                    // Promote to Int: future INCRs hit the fast path.
452                    e.value = Value::Int(next);
453                    IncrOutcome::Reweigh(next)
454                }
455                Value::ArcBulk(a) => {
456                    // L1: large value claimed to be numeric — parse and
457                    // promote to Int. Subsequent INCRs hit the fast path.
458                    let next = parse_i64(a.as_ref())
459                        .ok_or(StoreError::NotInteger)?
460                        .checked_add(delta)
461                        .ok_or(StoreError::Overflow)?;
462                    e.value = Value::Int(next);
463                    IncrOutcome::Reweigh(next)
464                }
465                _ => return Err(StoreError::WrongType),
466            },
467            // Absent/expired ⇒ start from 0; 0 + delta can't overflow i64.
468            None => IncrOutcome::Insert(delta),
469        };
470        match outcome {
471            IncrOutcome::Reweigh(next) => {
472                self.reweigh_entry(key);
473                Ok(next)
474            }
475            IncrOutcome::Insert(next) => {
476                self.insert_entry(
477                    SmallBytes::from_slice(key),
478                    Entry::new(Value::Int(next), None),
479                );
480                Ok(next)
481            }
482        }
483    }
484
485    /// `GETSET` — set to `val`, return the previous string (WRONGTYPE if the old
486    /// value isn't a string). Clears any TTL, like SET.
487    pub fn getset(&mut self, key: &[u8], val: Vec<u8>) -> Result<Option<Vec<u8>>, StoreError> {
488        let old = match self.live_entry(key) {
489            Some(e) => match &e.value {
490                Value::Str(v) => Some(v.to_vec()),
491                _ => return Err(StoreError::WrongType),
492            },
493            None => None,
494        };
495        self.insert_entry(
496            SmallBytes::from_slice(key),
497            Entry::new(Value::Str(SmallBytes::from_vec(val)), None),
498        );
499        Ok(old)
500    }
501
502    /// `GETDEL` — get then delete (WRONGTYPE if non-string).
503    pub fn getdel(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError> {
504        let is_str = match self.live_entry(key) {
505            None => return Ok(None),
506            Some(e) => matches!(e.value, Value::Str(_)),
507        };
508        if !is_str {
509            return Err(StoreError::WrongType);
510        }
511        match self.remove_entry(key) {
512            Some(Entry {
513                value: Value::Str(v),
514                ..
515            }) => Ok(Some(v.into_vec())),
516            _ => Ok(None),
517        }
518    }
519
520    /// `INCRBYFLOAT` — returns the new value formatted as Redis would. Preserves TTL.
521    pub fn incr_by_float(&mut self, key: &[u8], delta: f64) -> Result<Vec<u8>, StoreError> {
522        let outcome = if let Some(e) = self.live_entry_mut(key) { match &mut e.value {
523            Value::Str(v) => {
524                let next = parse_f64(v.as_slice()).ok_or(StoreError::NotFloat)? + delta;
525                if !next.is_finite() {
526                    return Err(StoreError::NotFloat);
527                }
528                let bytes = fmt_num(next);
529                *v = SmallBytes::from_slice(&bytes);
530                FloatOutcome::Reweigh(bytes)
531            }
532            _ => return Err(StoreError::WrongType),
533        } } else {
534            // Absent/expired ⇒ start from 0.0.
535            if !delta.is_finite() {
536                return Err(StoreError::NotFloat);
537            }
538            FloatOutcome::Insert(fmt_num(delta))
539        };
540        match outcome {
541            FloatOutcome::Reweigh(bytes) => {
542                self.reweigh_entry(key);
543                Ok(bytes)
544            }
545            FloatOutcome::Insert(bytes) => {
546                self.insert_entry(
547                    SmallBytes::from_slice(key),
548                    Entry::new(Value::Str(SmallBytes::from_slice(&bytes)), None),
549                );
550                Ok(bytes)
551            }
552        }
553    }
554}
555
/// What `Store::append` decided while it held the entry borrow; the
/// follow-up call on `self` happens after that borrow has ended.
enum AppendOutcome {
    /// An existing value was extended; carries the new length in bytes.
    /// The entry still needs `reweigh_entry`.
    Reweigh(usize),
    /// No live entry was found; a new one must be inserted.
    Insert,
}
560
/// What `Store::incr_by` decided while it held the entry borrow; the
/// follow-up call on `self` happens after that borrow has ended.
enum IncrOutcome {
    /// An existing value was replaced by `Value::Int`; carries the new
    /// integer. The entry still needs `reweigh_entry`.
    Reweigh(i64),
    /// No live entry was found; carries the integer to insert.
    Insert(i64),
}
565
/// What `Store::incr_by_float` decided while it held the entry borrow; the
/// follow-up call on `self` happens after that borrow has ended.
enum FloatOutcome {
    /// An existing value was rewritten; carries the formatted result that
    /// is returned to the caller. The entry still needs `reweigh_entry`.
    Reweigh(Vec<u8>),
    /// No live entry was found; carries the formatted bytes to insert and
    /// return.
    Insert(Vec<u8>),
}