kevy_store/string.rs
1//! `Store` string commands.
2
3use crate::util::{
4 fmt_num, format_i64_into, itoa_i64_stack, parse_canonical_i64, parse_f64, parse_i64,
5};
6use crate::value::{BULK_THRESHOLD, SmallBytes, Value};
7use crate::{Entry, Store, StoreError, deadline_at, now_ns};
8use std::borrow::Cow;
9use std::sync::Arc;
10use std::time::Duration;
11
12/// L1 return shape for [`Store::get_for_reply`] — lets the reactor's reply
13/// path choose between memcpy (`Bytes`) and writev zero-copy (`ArcBulk`)
14/// off one keyspace lookup.
15pub enum GetReply<'a> {
16 /// Inline-encoded value — caller memcpys the bytes into its output Vec
17 /// (small replies; encoding cost is tiny vs the RTT floor).
18 Bytes(Cow<'a, [u8]>),
19 /// L1 (2026-06-21): Arc-backed bulk. The reactor's reply path pushes
20 /// the Arc into the conn's `output_arcs` so the next `writev` iovec
21 /// list points DIRECTLY at the value bytes — skipping the per-GET
22 /// memcpy that valkey's `tryAvoidBulkStrCopyToReply` likewise avoids.
23 ArcBulk(Arc<Box<[u8]>>),
24}
25
26/// L2 + L1: pick the optimal encoding for `bytes` at SET time:
27/// 1. Canonical i64 ASCII → `Value::Int(n)` (smallest + INCR fast path)
28/// 2. > [`BULK_THRESHOLD`] bytes → `Value::ArcBulk(Arc<[u8]>)` (lets the
29/// reactor reply path borrow the bytes for `writev` zero-copy GET)
30/// 3. Else → `Value::Str(SmallBytes::from_slice(bytes))` (inline-cache-
31/// line storage, beats Arc indirection for small values)
32#[inline]
33fn pick_value_for_set(bytes: &[u8]) -> Value {
34 if let Some(n) = parse_canonical_i64(bytes) {
35 return Value::Int(n);
36 }
37 if bytes.len() > BULK_THRESHOLD {
38 return Value::ArcBulk(Arc::new(Box::<[u8]>::from(bytes)));
39 }
40 Value::Str(SmallBytes::from_slice(bytes))
41}
42
43#[inline]
44fn pick_value_for_set_owned(bytes: Vec<u8>) -> Value {
45 if let Some(n) = parse_canonical_i64(&bytes) {
46 return Value::Int(n);
47 }
48 if bytes.len() > BULK_THRESHOLD {
49 // v1.29 Option A — `Arc::new(box)` is zero-copy when `len ==
50 // capacity` (shrink-to-fit no-ops). See `Value::ArcBulk` doc.
51 return Value::ArcBulk(Arc::new(bytes.into_boxed_slice()));
52 }
53 Value::Str(SmallBytes::from_vec(bytes))
54}
55
56impl Store {
57 // ---- strings -------------------------------------------------------
58
59 /// `SET` — overwrites any existing value/type. NX/XX guards; clears TTL.
60 /// Takes an owned `Vec` so a >22 B value's allocation is adopted as-is
61 /// (no copy). For callers holding a borrowed slice, prefer
62 /// [`Self::set_slice`] — it skips the `to_vec` entirely for values that
63 /// inline.
64 pub fn set(
65 &mut self,
66 key: &[u8],
67 value: Vec<u8>,
68 expire: Option<Duration>,
69 nx: bool,
70 xx: bool,
71 ) -> bool {
72 self.set_value(key, pick_value_for_set_owned(value), expire, nx, xx)
73 }
74
75 /// [`Self::set`] for a borrowed value. Values ≤ 22 B store inline in the
76 /// entry — zero allocator traffic, where `set(key, value.to_vec(), …)`
77 /// paid a malloc for the `Vec` and a free when the inline copy dropped
78 /// it (the dominant overwrite-SET pattern). Larger values pay the same
79 /// single allocation either way.
80 pub fn set_slice(
81 &mut self,
82 key: &[u8],
83 value: &[u8],
84 expire: Option<Duration>,
85 nx: bool,
86 xx: bool,
87 ) -> bool {
88 self.set_value(key, pick_value_for_set(value), expire, nx, xx)
89 }
90
91 fn set_value(
92 &mut self,
93 key: &[u8],
94 new_value: Value,
95 expire: Option<Duration>,
96 nx: bool,
97 xx: bool,
98 ) -> bool {
99 // F1 (v1.25): single-probe overwrite-SET fast path for default
100 // `maxmemory == 0` (the bench and production-common case). Goes
101 // through `kevy_map::RawEntryMut` (B.1, commit 1a9f9af) so the
102 // Occupied arm mutates the entry in place and returns owned
103 // (delta, ttl_delta) — no escaping reference, no second probe.
104 // Overwrite path drops from 2 probes (live_entry_mut: get+get_mut)
105 // to 1 probe. New-key + expired-removed paths still pay the
106 // insert_entry probe (same as before).
107 if self.maxmemory == 0 {
108 return self.set_value_no_evict(key, new_value, expire, nx, xx);
109 }
110 // Eviction path (maxmemory > 0): keep the 2-probe shape so
111 // `live_entry_mut`'s touch_on_access bookkeeping runs.
112 let expire_at = expire.map(|d| deadline_at(now_ns(), d));
113 let key_heap = crate::key_heap_bytes_for(key);
114 #[allow(clippy::single_match_else)]
115 // Phase 1: in-place overwrite or remember we need to insert.
116 // The old value (if any) is taken via `mem::replace` so the bio
117 // hand-off in phase 2 happens AFTER `self.live_entry_mut`'s
118 // borrow on `self.map` is released — without splitting the
119 // borrow we couldn't call `self.maybe_offload_drop`.
120 let (outcome, old_value) = match self.live_entry_mut(key) {
121 Some(e) => {
122 if nx {
123 return false;
124 }
125 let had_ttl = e.expire_at_ns.is_some();
126 // v1.25 A.3: capture the old Value so it can be shipped
127 // to the bio thread post-borrow rather than dropped
128 // inline (the Drop of a Value::ArcBulk over the heap
129 // heavy threshold is the Axis I tail amplifier).
130 let old = std::mem::replace(&mut e.value, new_value);
131 e.expire_at_ns = expire_at.and_then(crate::pack_deadline);
132 let new_w = key_heap + e.value.weight();
133 let delta = new_w as i64 - e.weight() as i64;
134 let ttl_delta = i64::from(e.expire_at_ns.is_some()) - i64::from(had_ttl);
135 e.set_weight(new_w);
136 (Ok((delta, ttl_delta)), Some(old))
137 }
138 None => {
139 if xx {
140 return false;
141 }
142 (Err(Entry::new(new_value, expire_at)), None)
143 }
144 };
145 match outcome {
146 Ok((delta, ttl_delta)) => {
147 self.apply_weight_delta(delta);
148 self.adjust_expires(ttl_delta);
149 }
150 Err(entry) => {
151 self.insert_entry(SmallBytes::from_slice(key), entry);
152 }
153 }
154 // Phase 2: hand the old value off if heavy. Done last so the
155 // critical mutation + bookkeeping commit before any (sub-µs in
156 // steady state) channel send.
157 if let Some(old) = old_value {
158 self.maybe_offload_drop(old);
159 }
160 true
161 }
162
163 /// Single-probe overwrite-SET via `kevy_map::RawEntryMut` for the
164 /// `maxmemory == 0` fast path (F1, v1.25). Skips the `live_entry_mut`
165 /// 2-probe shape: Occupied arm mutates in place + returns owned
166 /// (delta, ttl_delta); Expired arm removes via raw-entry handle + falls
167 /// through to insert; Vacant arm goes to insert.
168 fn set_value_no_evict(
169 &mut self,
170 key: &[u8],
171 new_value: Value,
172 expire: Option<Duration>,
173 nx: bool,
174 xx: bool,
175 ) -> bool {
176 use kevy_map::RawEntryMut;
177 let expire_at = expire.map(|d| deadline_at(now_ns(), d));
178 let key_heap = crate::key_heap_bytes_for(key);
179 let (uc, cn) = (self.cached_clock, self.cached_ns);
180 // Hold new_value behind Option so the multi-arm consumption
181 // (overwrite arm vs insert-after-expired arm) is moved-once.
182 let mut value_slot = Some(new_value);
183
184 // Phase 1: probe + decide. Three outcomes — overwrite-finished
185 // (delta + ttl_delta + the displaced old `Value` to maybe ship
186 // to the bio thread), an expired-removed `Entry` whose `Value`
187 // ditto needs offload, or needs-insert with no prior value.
188 enum Outcome {
189 Updated { delta: i64, ttl_delta: i64, old: Value },
190 ExpiredThenInsert { old: Value },
191 NeedInsert,
192 }
193 let outcome = match self.map.raw_entry_mut(key) {
194 RawEntryMut::Occupied(mut occ) => {
195 // Decide via shared ref first so we don't touch the entry
196 // (avoiding a needless cache-line dirtying) on the
197 // is_expired+remove path.
198 let expired = occ.get().is_expired(uc, cn);
199 if expired {
200 let old = occ.remove();
201 // borrow on self.map released by remove(self).
202 self.used_memory = self
203 .used_memory
204 .saturating_sub(old.weight() + crate::value::ENTRY_OVERHEAD);
205 if old.expire_at_ns.is_some() {
206 self.adjust_expires(-1);
207 }
208 self.expired_keys_total =
209 self.expired_keys_total.saturating_add(1);
210 if xx {
211 // No insert; the expired `Value` still needs
212 // to drop. Ship via the bio path on its way out.
213 self.maybe_offload_drop(old.value);
214 return false;
215 }
216 Outcome::ExpiredThenInsert { old: old.value }
217 } else {
218 if nx {
219 return false;
220 }
221 let e = occ.get_mut();
222 let had_ttl = e.expire_at_ns.is_some();
223 // v1.25 A.3: take the old Value before overwriting
224 // so phase 2 can hand it to the bio thread instead
225 // of dropping inline (the Axis I tail amplifier).
226 let old = std::mem::replace(&mut e.value, value_slot.take().unwrap());
227 e.expire_at_ns = expire_at.and_then(crate::pack_deadline);
228 let new_w = key_heap + e.value.weight();
229 let delta = new_w as i64 - e.weight() as i64;
230 let ttl_delta = i64::from(e.expire_at_ns.is_some())
231 - i64::from(had_ttl);
232 e.set_weight(new_w);
233 Outcome::Updated { delta, ttl_delta, old }
234 }
235 }
236 RawEntryMut::Vacant(_) => {
237 if xx {
238 return false;
239 }
240 Outcome::NeedInsert
241 }
242 };
243
244 // Phase 2: bookkeeping + maybe insert. Borrow on self.map is gone.
245 let old_value: Option<Value> = match outcome {
246 Outcome::Updated { delta, ttl_delta, old } => {
247 self.apply_weight_delta(delta);
248 self.adjust_expires(ttl_delta);
249 Some(old)
250 }
251 Outcome::ExpiredThenInsert { old } => {
252 let entry = Entry::new(value_slot.take().unwrap(), expire_at);
253 self.insert_entry(SmallBytes::from_slice(key), entry);
254 Some(old)
255 }
256 Outcome::NeedInsert => {
257 let entry = Entry::new(value_slot.take().unwrap(), expire_at);
258 self.insert_entry(SmallBytes::from_slice(key), entry);
259 None
260 }
261 };
262 // Phase 3: ship the displaced Value if any. Last so the keyspace
263 // commit precedes any (sub-µs steady-state) channel send.
264 if let Some(old) = old_value {
265 self.maybe_offload_drop(old);
266 }
267 true
268 }
269
270 /// L1 (2026-06-21): GET variant that exposes the underlying encoding
271 /// so the reactor's reply path can choose zero-copy
272 /// (`Value::ArcBulk` → push the Arc to the conn's `output_arcs` for a
273 /// writev iovec) vs memcpy (`Value::Str` / `Value::Int` → encode bytes
274 /// into the conn's output Vec). ONE keyspace lookup; the variant tag
275 /// chooses the encoding without a second probe.
276 pub fn get_for_reply(&mut self, key: &[u8]) -> Result<Option<GetReply<'_>>, StoreError> {
277 match self.live_entry(key) {
278 None => Ok(None),
279 Some(e) => match &e.value {
280 Value::Str(v) => Ok(Some(GetReply::Bytes(Cow::Borrowed(v.as_slice())))),
281 Value::ArcBulk(a) => Ok(Some(GetReply::ArcBulk(Arc::clone(a)))),
282 Value::Int(n) => {
283 let mut tmp = itoa_i64_stack();
284 let s = format_i64_into(*n, &mut tmp);
285 Ok(Some(GetReply::Bytes(Cow::Owned(s.to_vec()))))
286 }
287 _ => Err(StoreError::WrongType),
288 },
289 }
290 }
291
292 /// A.6 (v1.25): fused GET-into-output. Skips the [`GetReply`] enum tag
293 /// round-trip + caller match arm by writing the RESP frame directly into
294 /// `output` (header + bytes + CRLF for Str/Int) or pushing the Arc into
295 /// `output_arcs` at the right offset (ArcBulk zero-copy via writev).
296 /// Returns the same outcomes as [`Self::get_for_reply`]: `Ok(true)` if
297 /// the key was found and emitted, `Ok(false)` if absent (the caller
298 /// emits the `$-1` null bulk — preserves the existing inline-null
299 /// semantics on the reactor side), `Err` for WRONGTYPE.
300 pub fn get_into_output(
301 &mut self,
302 key: &[u8],
303 output: &mut Vec<u8>,
304 output_arcs: &mut Vec<(usize, Arc<Box<[u8]>>)>,
305 ) -> Result<bool, StoreError> {
306 match self.live_entry(key) {
307 None => Ok(false),
308 Some(e) => match &e.value {
309 Value::Str(v) => {
310 let bytes = v.as_slice();
311 crate::util::bulk_header_into(output, bytes.len());
312 output.extend_from_slice(bytes);
313 output.extend_from_slice(b"\r\n");
314 Ok(true)
315 }
316 Value::ArcBulk(a) => {
317 crate::util::bulk_header_into(output, a.len());
318 let pos = output.len();
319 output_arcs.push((pos, Arc::clone(a)));
320 output.extend_from_slice(b"\r\n");
321 Ok(true)
322 }
323 Value::Int(n) => {
324 let mut tmp = itoa_i64_stack();
325 let s = format_i64_into(*n, &mut tmp);
326 crate::util::bulk_header_into(output, s.len());
327 output.extend_from_slice(s);
328 output.extend_from_slice(b"\r\n");
329 Ok(true)
330 }
331 _ => Err(StoreError::WrongType),
332 },
333 }
334 }
335
336 /// `GET` — returns a `Cow<[u8]>` so `Value::Int` callers can format the
337 /// integer to ASCII without storing it. L2 (2026-06-21): `Value::Str`
338 /// returns `Cow::Borrowed` (zero copy, same as before); `Value::Int`
339 /// formats to a small owned `Vec<u8>` (up to 20 bytes for `i64::MIN`).
340 pub fn get(&mut self, key: &[u8]) -> Result<Option<Cow<'_, [u8]>>, StoreError> {
341 match self.live_entry(key) {
342 None => Ok(None),
343 Some(e) => match &e.value {
344 Value::Str(v) => Ok(Some(Cow::Borrowed(v.as_slice()))),
345 // L1: Arc-backed bulk — return borrow into the Arc's
346 // bytes. Caller can either memcpy via Cow::Borrowed
347 // (default `encode_bulk` path) OR look up the
348 // underlying `Value::ArcBulk(arc)` separately for the
349 // writev zero-copy reply path.
350 Value::ArcBulk(a) => Ok(Some(Cow::Borrowed(a.as_ref()))),
351 Value::Int(n) => {
352 let mut tmp = itoa_i64_stack();
353 let s = format_i64_into(*n, &mut tmp);
354 Ok(Some(Cow::Owned(s.to_vec())))
355 }
356 _ => Err(StoreError::WrongType),
357 },
358 }
359 }
360
361 /// Read-only `GET`: `&self`, so concurrent readers can run under a shared
362 /// lock (embedded mode's `RwLock` read path). Expiry is checked against the
363 /// coarse cached clock but an expired key is *not* removed here (no `&mut`)
364 /// — the reaper / next write reclaims it; a reader just sees `None`. LRU is
365 /// not touched, so this path is only used when eviction is off
366 /// (`maxmemory == 0`); with eviction, the caller takes the mutating
367 /// [`Self::get`] under an exclusive lock so access still stamps the LRU.
368 pub fn get_shared(&self, key: &[u8]) -> Result<Option<Cow<'_, [u8]>>, StoreError> {
369 match self.map.get(key) {
370 None => Ok(None),
371 Some(e) if e.is_expired(self.cached_clock, self.cached_ns) => Ok(None),
372 Some(e) => match &e.value {
373 Value::Str(v) => Ok(Some(Cow::Borrowed(v.as_slice()))),
374 Value::ArcBulk(a) => Ok(Some(Cow::Borrowed(a.as_ref()))),
375 Value::Int(n) => {
376 let mut tmp = itoa_i64_stack();
377 let s = format_i64_into(*n, &mut tmp);
378 Ok(Some(Cow::Owned(s.to_vec())))
379 }
380 _ => Err(StoreError::WrongType),
381 },
382 }
383 }
384
385 pub fn strlen(&mut self, key: &[u8]) -> Result<usize, StoreError> {
386 Ok(self.get(key)?.map_or(0, |c| c.len()))
387 }
388
389 pub fn append(&mut self, key: &[u8], data: &[u8]) -> Result<usize, StoreError> {
390 let outcome = match self.live_entry_mut(key) {
391 Some(e) => match &mut e.value {
392 Value::Str(v) => {
393 // SmallBytes is immutable; pop out, grow via Vec, re-wrap.
394 let mut owned = std::mem::take(v).into_vec();
395 owned.extend_from_slice(data);
396 let new_len = owned.len();
397 *v = SmallBytes::from_vec(owned);
398 AppendOutcome::Reweigh(new_len)
399 }
400 // L1: APPEND on Arc-backed bulk → materialise to a fresh
401 // Vec (no other reader has refs to the old Arc post-replace),
402 // append, then pick the new encoding via SET routing rules.
403 Value::ArcBulk(a) => {
404 let mut owned: Vec<u8> = a.as_ref().to_vec();
405 owned.extend_from_slice(data);
406 let new_len = owned.len();
407 e.value = pick_value_for_set_owned(owned);
408 AppendOutcome::Reweigh(new_len)
409 }
410 _ => return Err(StoreError::WrongType),
411 },
412 None => AppendOutcome::Insert,
413 };
414 match outcome {
415 AppendOutcome::Reweigh(new_len) => {
416 self.reweigh_entry(key);
417 Ok(new_len)
418 }
419 AppendOutcome::Insert => {
420 self.insert_entry(
421 SmallBytes::from_slice(key),
422 Entry::new(Value::Str(SmallBytes::from_slice(data)), None),
423 );
424 Ok(data.len())
425 }
426 }
427 }
428
429 /// `INCRBY` family; preserves any TTL.
430 ///
431 /// L2 (2026-06-21, lessons from valkey OBJ_ENCODING_INT): the hot path
432 /// matches `Value::Int(n)` and does the increment in place — no parse,
433 /// no format, no allocation. The legacy `Value::Str` arm parses,
434 /// increments, and **promotes** to `Value::Int(next)` so subsequent
435 /// INCRs land on the fast path. Insert-new path also lands as `Int`.
436 pub fn incr_by(&mut self, key: &[u8], delta: i64) -> Result<i64, StoreError> {
437 let outcome = match self.live_entry_mut(key) {
438 Some(e) => match &mut e.value {
439 Value::Int(n) => {
440 let next = n.checked_add(delta).ok_or(StoreError::Overflow)?;
441 *n = next;
442 // In-place i64 mutation — weight unchanged (still 0
443 // heap bytes for an Int). Skip the reweigh entirely.
444 return Ok(next);
445 }
446 Value::Str(v) => {
447 let next = parse_i64(v.as_slice())
448 .ok_or(StoreError::NotInteger)?
449 .checked_add(delta)
450 .ok_or(StoreError::Overflow)?;
451 // Promote to Int: future INCRs hit the fast path.
452 e.value = Value::Int(next);
453 IncrOutcome::Reweigh(next)
454 }
455 Value::ArcBulk(a) => {
456 // L1: large value claimed to be numeric — parse and
457 // promote to Int. Subsequent INCRs hit the fast path.
458 let next = parse_i64(a.as_ref())
459 .ok_or(StoreError::NotInteger)?
460 .checked_add(delta)
461 .ok_or(StoreError::Overflow)?;
462 e.value = Value::Int(next);
463 IncrOutcome::Reweigh(next)
464 }
465 _ => return Err(StoreError::WrongType),
466 },
467 // Absent/expired ⇒ start from 0; 0 + delta can't overflow i64.
468 None => IncrOutcome::Insert(delta),
469 };
470 match outcome {
471 IncrOutcome::Reweigh(next) => {
472 self.reweigh_entry(key);
473 Ok(next)
474 }
475 IncrOutcome::Insert(next) => {
476 self.insert_entry(
477 SmallBytes::from_slice(key),
478 Entry::new(Value::Int(next), None),
479 );
480 Ok(next)
481 }
482 }
483 }
484
485 /// `GETSET` — set to `val`, return the previous string (WRONGTYPE if the old
486 /// value isn't a string). Clears any TTL, like SET.
487 pub fn getset(&mut self, key: &[u8], val: Vec<u8>) -> Result<Option<Vec<u8>>, StoreError> {
488 let old = match self.live_entry(key) {
489 Some(e) => match &e.value {
490 Value::Str(v) => Some(v.to_vec()),
491 _ => return Err(StoreError::WrongType),
492 },
493 None => None,
494 };
495 self.insert_entry(
496 SmallBytes::from_slice(key),
497 Entry::new(Value::Str(SmallBytes::from_vec(val)), None),
498 );
499 Ok(old)
500 }
501
502 /// `GETDEL` — get then delete (WRONGTYPE if non-string).
503 pub fn getdel(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError> {
504 let is_str = match self.live_entry(key) {
505 None => return Ok(None),
506 Some(e) => matches!(e.value, Value::Str(_)),
507 };
508 if !is_str {
509 return Err(StoreError::WrongType);
510 }
511 match self.remove_entry(key) {
512 Some(Entry {
513 value: Value::Str(v),
514 ..
515 }) => Ok(Some(v.into_vec())),
516 _ => Ok(None),
517 }
518 }
519
520 /// `INCRBYFLOAT` — returns the new value formatted as Redis would. Preserves TTL.
521 pub fn incr_by_float(&mut self, key: &[u8], delta: f64) -> Result<Vec<u8>, StoreError> {
522 let outcome = if let Some(e) = self.live_entry_mut(key) { match &mut e.value {
523 Value::Str(v) => {
524 let next = parse_f64(v.as_slice()).ok_or(StoreError::NotFloat)? + delta;
525 if !next.is_finite() {
526 return Err(StoreError::NotFloat);
527 }
528 let bytes = fmt_num(next);
529 *v = SmallBytes::from_slice(&bytes);
530 FloatOutcome::Reweigh(bytes)
531 }
532 _ => return Err(StoreError::WrongType),
533 } } else {
534 // Absent/expired ⇒ start from 0.0.
535 if !delta.is_finite() {
536 return Err(StoreError::NotFloat);
537 }
538 FloatOutcome::Insert(fmt_num(delta))
539 };
540 match outcome {
541 FloatOutcome::Reweigh(bytes) => {
542 self.reweigh_entry(key);
543 Ok(bytes)
544 }
545 FloatOutcome::Insert(bytes) => {
546 self.insert_entry(
547 SmallBytes::from_slice(key),
548 Entry::new(Value::Str(SmallBytes::from_slice(&bytes)), None),
549 );
550 Ok(bytes)
551 }
552 }
553 }
554}
555
556enum AppendOutcome {
557 Reweigh(usize),
558 Insert,
559}
560
561enum IncrOutcome {
562 Reweigh(i64),
563 Insert(i64),
564}
565
566enum FloatOutcome {
567 Reweigh(Vec<u8>),
568 Insert(Vec<u8>),
569}