Skip to main content

kevy_store/
value.rs

1//! Value types — one backing structure per Redis type.
2
3pub use kevy_bytes::SmallBytes;
4use kevy_map::{KevyMap, KevySet};
5use std::cmp::Ordering;
6use std::collections::{BTreeSet, VecDeque};
7use std::sync::Arc;
8
/// Backing structure for a Hash value — [`KevyMap`] keyed by [`SmallBytes`]
/// (22 B inline, heap otherwise). Field names of ≤ 22 B (the vast majority —
/// `name`, `email`, etc.) live entirely inside the bucket, which saves the
/// 24 B of `Vec` metadata plus one heap allocation per field.
pub type HashData = KevyMap<SmallBytes, Vec<u8>>;
/// Backing structure for a List value (a ring-buffer deque — O(1) both ends).
pub type ListData = VecDeque<Vec<u8>>;
/// Backing structure for a Set value — [`KevySet`] of [`SmallBytes`] members.
pub type SetData = KevySet<SmallBytes>;
18
/// A total-ordered f64 score (Redis scores are never NaN). `total_cmp` gives a
/// total order so scores can key a `BTreeSet`.
///
/// Equality is defined through the same `total_cmp` ordering rather than the
/// IEEE-754 `==` operator, so `PartialEq`/`Eq` always agree with `Ord`:
/// `a == b` holds exactly when `a.cmp(&b)` is `Ordering::Equal`. In particular
/// `Score(0.0)` and `Score(-0.0)` are *distinct* (they also order differently),
/// and a NaN score equals itself, which keeps `Eq` reflexive.
#[derive(Clone, Copy)]
pub struct Score(pub f64);

impl PartialEq for Score {
    /// Two scores are equal iff `total_cmp` ranks them as equal.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for Score {}

impl Ord for Score {
    /// Total order over every `f64` bit pattern (IEEE-754 `totalOrder`).
    fn cmp(&self, other: &Self) -> Ordering {
        self.0.total_cmp(&other.0)
    }
}

impl PartialOrd for Score {
    /// Always `Some`: delegates to the total order in [`Ord::cmp`].
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
34
/// One endpoint of a score interval, as used by `ZRANGEBYSCORE`/`ZCOUNT`.
/// The endpoint is either inclusive or exclusive; pass `value = ±INFINITY`
/// to express `-inf`/`+inf`.
pub struct ScoreBound {
    /// The endpoint's score.
    pub value: f64,
    /// `true` when the endpoint itself is excluded from the interval.
    pub exclusive: bool,
}

impl ScoreBound {
    /// Treating `self` as the *lower* end of a range, is `s` inside it?
    pub(crate) fn ge_ok(&self, s: f64) -> bool {
        if !self.exclusive {
            return s >= self.value;
        }
        s > self.value
    }

    /// Treating `self` as the *upper* end of a range, is `s` inside it?
    pub(crate) fn le_ok(&self, s: f64) -> bool {
        if !self.exclusive {
            return s <= self.value;
        }
        s < self.value
    }
}
59
60/// Sorted set: a member→score map plus a B-tree ordered by `(score, member)`.
61/// (A B-tree is cache-friendlier than Redis's skiplist; `ZRANK` is O(n) here —
62/// an order-statistics tree for O(log n) rank is a later perf item.)
63#[derive(Default, Clone)]
64pub struct ZSetData {
65    pub(crate) by_member: KevyMap<SmallBytes, f64>,
66    /// The `(score, member)` index is still keyed by `Vec<u8>` member —
67    /// changing this requires the `BTreeSet` to accept a `(Score,
68    /// SmallBytes)` ordering, which is fine but a larger sweep; keep
69    /// it as-is for now to avoid touching ZRANGE paths.
70    pub(crate) by_score: BTreeSet<(Score, Vec<u8>)>,
71}
72
73impl ZSetData {
74    pub(crate) fn insert(&mut self, member: &[u8], score: f64) -> bool {
75        let is_new = match self.by_member.insert(SmallBytes::from_slice(member), score) {
76            Some(old) => {
77                self.by_score.remove(&(Score(old), member.to_vec()));
78                false
79            }
80            None => true,
81        };
82        self.by_score.insert((Score(score), member.to_vec()));
83        is_new
84    }
85    pub(crate) fn remove(&mut self, member: &[u8]) -> bool {
86        match self.by_member.remove(member) {
87            Some(old) => {
88                self.by_score.remove(&(Score(old), member.to_vec()));
89                true
90            }
91            None => false,
92        }
93    }
94    pub(crate) fn len(&self) -> usize {
95        self.by_member.len()
96    }
97    /// `(member, score)` pairs in ascending `(score, member)` order.
98    pub fn ordered(&self) -> impl Iterator<Item = (&[u8], f64)> {
99        self.by_score.iter().map(|(s, m)| (m.as_slice(), s.0))
100    }
101}
102
/// A stored value. One variant per Redis type.
///
/// The collection variants live behind a **shared pointer** (`Arc`) so the
/// enum is only as big as `Str` (24 B) + tag = 32 B, not the 56 B `ZSetData`
/// — every `Entry` (incl. the common string case) is then ~48 B instead of
/// ~80 B, so the bucket array is ~40% denser/smaller (fewer cache misses on
/// a large keyspace, less RSS). The extra pointer-chase lands only on
/// collection ops, not the hot string GET path.
///
/// `Arc` (same 8 B as the previous `Box`) is what makes O(short-pause)
/// persistence possible: [`crate::Store::collect_snapshot`] bumps each
/// collection's refcount instead of serializing it, and a background thread
/// walks the frozen payloads at leisure. Mutations go through
/// [`std::sync::Arc::make_mut`] — a single uniqueness check (the steady
/// state, no snapshot in flight) or a copy-on-write deep clone when a
/// snapshot still holds the payload.
///
/// `Str` holds a [`SmallBytes`] (24 B, same size as `Vec<u8>`) so byte strings
/// up to 22 bytes live **inline inside the bucket**, killing the second cache
/// miss the value pointer-chase used to cost on large-keyspace GETs.
/// `Clone` is the snapshot-collect primitive: `Str` copies its bytes
/// (inline = 24 B memcpy; heap = one allocation), collections bump a
/// refcount. See [`crate::Store::collect_snapshot`].
#[derive(Clone)]
pub enum Value {
    /// Byte string held in a [`SmallBytes`] (inline up to 22 B, heap
    /// otherwise — see the type-level docs above).
    Str(SmallBytes),
    /// L2 (2026-06-21, lessons from valkey OBJ_ENCODING_INT): when a SET
    /// stores a clean canonical i64 ASCII string (parses round-trip), we
    /// keep the integer **as i64** rather than as 22 B of inline bytes.
    /// Wins on INCR (in-place `+= delta`, no parse / no format / no
    /// SmallBytes wrap) and on memory (8 B vs 24 B). GET formats it via
    /// a per-`Store` scratch buffer.
    Int(i64),
    /// L1 (2026-06-21) / v1.29 Option A (2026-06-29): values larger
    /// than [`BULK_THRESHOLD`] bytes get stored behind an
    /// `Arc<Box<[u8]>>` instead of a heap-backed `SmallBytes`. The Arc
    /// lets the io_uring reactor's reply path borrow the bytes across
    /// the SQE→CQE window safely (Arc clone keeps them alive even if
    /// the keyspace mutates) — the prerequisite for the writev
    /// zero-copy bulk reply path, which skips the per-GET memcpy from
    /// value storage into the per-conn output buffer.
    ///
    /// **Why `Arc<Box<[u8]>>` and not `Arc<[u8]>`**: `Arc<[u8]>` is a
    /// DST-backed `ArcInner<[u8]> = { strong, weak, [u8; N] }` whose
    /// data slot sits past the refcount words. `Arc::from(Vec<u8>)`
    /// allocates a fresh `ArcInner` and `copy_from_slice`s the bytes
    /// — a hard mandatory 64 KiB memcpy on every big SET. With
    /// `Arc<Box<[u8]>>`, the `Box<[u8]>` wrapper occupies the Arc's
    /// data slot (16 B), pointing AT an unchanged heap buffer; so
    /// `Arc::new(vec.into_boxed_slice())` is **truly zero-copy**
    /// (the boxed slice's allocation stays put — only the 32-byte
    /// `ArcInner` is freshly malloced). Per-GET cost: one extra
    /// pointer dereference (`&**arc` to get `&[u8]`), measured to be
    /// negligible vs the per-SET memcpy savings. See
    /// `bench/PERF-FINDING-2026-06-29-arc-from-box-memcpys.md` for
    /// the empirical perf-record evidence of the original
    /// `Arc<[u8]>` mandatory copy.
    ///
    /// Small values stay on `Str(SmallBytes)` because the inline
    /// cache-line storage beats an Arc indirection for the common case.
    ArcBulk(Arc<Box<[u8]>>),
    /// Hash value; the [`HashData`] payload is shared behind an `Arc`.
    Hash(Arc<HashData>),
    /// List value; the [`ListData`] payload is shared behind an `Arc`.
    List(Arc<ListData>),
    /// Set value; the [`SetData`] payload is shared behind an `Arc`.
    Set(Arc<SetData>),
    /// Sorted-set value; the [`ZSetData`] payload is shared behind an `Arc`.
    ZSet(Arc<ZSetData>),
    /// Stream value; the `StreamData` payload is shared behind an `Arc`.
    Stream(Arc<crate::stream::StreamData>),
    /// v1.25 A.7 O5 (valkey-orthodox encoding switch): tiny sets (1-N
    /// short members) live inline in 24 bytes instead of behind
    /// `Arc<SetData>` — matches valkey's `OBJ_ENCODING_LISTPACK` for
    /// sets, which is what `redis-benchmark -t sadd` default `-r 0`
    /// (cardinality stays at 1 forever, single 20-byte literal member)
    /// measures. On overflow ([`crate::small_set::SmallSetData::try_add`]
    /// returns `NoRoom`) the set is promoted to `Value::Set(Arc<SetData>)`
    /// — the Swiss-table path that wins for larger cardinalities.
    SmallSetInline(crate::small_set::SmallSetData),
    /// v1.25 A.8 (extension of A.7 to hashes): tiny hashes
    /// (1-2 short field-value pairs) live inline in 24 bytes; promoted
    /// to `Value::Hash(Arc<HashData>)` on overflow. Mirrors valkey's
    /// `OBJ_ENCODING_LISTPACK` for hashes.
    SmallHashInline(crate::small_hash::SmallHashData),
    /// v1.25 A.8: tiny lists inline encoding; promoted to
    /// `Value::List(Arc<ListData>)` on overflow.
    SmallListInline(crate::small_list::SmallListData),
    /// v1.25 A.8: tiny sorted sets inline encoding; promoted to
    /// `Value::ZSet(Arc<ZSetData>)` on overflow.
    SmallZSetInline(crate::small_zset::SmallZSetData),
}
190
/// Size threshold, in bytes, for picking a string encoding on SET: values
/// larger than this are stored as [`Value::ArcBulk`] (writev-eligible on
/// GET) instead of [`Value::Str`] (inline `SmallBytes`). 64 B ≈ one cache
/// line — below that the inline-SmallBytes storage wins on L1 locality;
/// above it the writev-borrow win dominates.
pub const BULK_THRESHOLD: usize = 64;
197
198const _: () = {
199    // Don't let future variants undo box-collection's Entry-48B win.
200    assert!(std::mem::size_of::<Value>() <= 32);
201};
202
/// Heap-size threshold at or above which an overwritten `Value` is sent to
/// the runtime's bio thread for off-reactor drop instead of being freed
/// inline (v1.25 A.3 lazy-drop). [`Value::is_heap_heavy`] applies it with
/// `>=` to the payload length of [`Value::ArcBulk`].
///
/// **Calibrated 2026-06-22 from the bench-with-256-B-floor R3 ★ finding**:
/// dropping the threshold to 256 B regressed Axis I c=50 -d 10240 SET
/// p999 from 0.487 → 1.583 ms (worse by 3.25×). The cause: `std::sync::mpsc::Sender::send`
/// is a few hundred ns of atomic + Box clone, which EXCEEDS the inline
/// `Box::<[u8]>::drop` cost when jemalloc serves the free from a hot
/// large-class slab (~ 1-3 µs for 10 KB; the bench's steady state).
/// Off-loading only wins when the inline drop's tail risk (cold-slab
/// `munmap`/`madvise` consolidation stall, observed at 50-150 µs and
/// occasionally millisecond-range) exceeds the per-send channel cost
/// PLUS the cross-thread cache-line bouncing.
///
/// v1.25 A.2 (batch-send follow-up to A.3): with per-shard batch
/// accumulation flushing at the end of every reactor iteration, the
/// per-mpsc-send cost is amortised across N drops. That makes the
/// channel hop profitable at smaller sizes than A.3's lone-send model
/// could justify (A.3 had to lift to 16 KB because per-`mpsc::send`
/// cost was a few hundred ns — at 256 B the inline drop was cheaper).
///
/// **R3 ★ — sweet-spot threshold surprise**: the agent brief and A.3
/// commit body both gestured at dropping the threshold to 256 B – 1 KB
/// once batching amortises the send. Sweep on lx64 across
/// {512, 1024, 4096, 16384} × c=50 SET -d {1K, 4K, 10K, 64K}
/// disproved that floor: at ≤ 1 KB threshold, p999 / max on small
/// values (-d 1024, -d 4096) was variance-bounded equal or
/// occasionally WORSE than the A.3 16 KB threshold, while the larger
/// sizes (10 KB / 64 KB) won either way. Cause: the Vec::push +
/// occasional `MAX_PENDING_DROPS` force-flush stall costs more for
/// small Arcs (jemalloc small-class free is sub-µs even at tail)
/// than the inline drop it avoids.
///
/// Picked **4 KB** as the lowest threshold where the bio-off-reactor
/// win consistently dominates the batch-buffer overhead on tail
/// metrics. The biggest A.2 wins (vs A.3 16 KB) land on `-d 64K`
/// SET p50 (-44 %) and `-d 10K` SET max (-35 %), where each iter's
/// batch already contains several heavy values per shard.
pub const HEAP_HEAVY_BYTES: usize = 4 * 1024;
243
/// Sender half of the runtime's bio-drop channel. Wired from
/// `kevy-rt`'s `bio.rs` via [`crate::Store::set_bio_drop_sender`]; the
/// concrete payload is `Vec<Box<Value>>` — a **batch** of values
/// produced by one shard since its last flush (A.2 batch-send model).
/// The bio thread (`kevy-rt::bio::spawn`) iterates the batch and
/// drops each item. One mpsc message per shard-flush amortises the
/// channel cost (atomic + cross-thread cacheline traffic) across
/// however many values landed in the batch.
///
/// Handing values to another thread requires `Value: Send`; that is
/// checked by a compile-time assertion elsewhere in this file.
pub type BioDropSender = std::sync::mpsc::Sender<Vec<Box<Value>>>;
253
254impl Value {
255    /// The Redis type name (`TYPE` command).
256    pub fn type_name(&self) -> &'static str {
257        match self {
258            Value::Str(_) | Value::Int(_) | Value::ArcBulk(_) => "string",
259            Value::Hash(_) | Value::SmallHashInline(_) => "hash",
260            Value::List(_) | Value::SmallListInline(_) => "list",
261            Value::Set(_) | Value::SmallSetInline(_) => "set",
262            Value::ZSet(_) | Value::SmallZSetInline(_) => "zset",
263            Value::Stream(_) => "stream",
264        }
265    }
266
267    /// Approximate heap bytes the value owns. Excludes the inline `Entry` /
268    /// bucket slot — that's a separate per-entry constant accounted by the
269    /// store. Walks collections, so prefer the cached `Entry::weight` for
270    /// hot-path accounting and only call this when bootstrapping or after a
271    /// load-from-snapshot.
272    pub fn weight(&self) -> u64 {
273        match self {
274            Value::Str(s) => s.heap_bytes() as u64,
275            // i64 fits in the enum tag's space; no heap.
276            Value::Int(_) => 0,
277            // Arc<[u8]> heap = the byte slice itself (refcount overhead
278            // is amortised across shared clones).
279            Value::ArcBulk(a) => a.len() as u64,
280            Value::Hash(h) => collection_overhead(h.capacity(), HASH_SLOT_BYTES) + h
281                .iter()
282                .map(|(f, v)| f.heap_bytes() as u64 + v.capacity() as u64)
283                .sum::<u64>(),
284            Value::List(l) => (l.capacity() as u64).saturating_mul(LIST_SLOT_BYTES)
285                + l.iter().map(|v| v.capacity() as u64).sum::<u64>(),
286            Value::Set(s) => collection_overhead(s.capacity(), SET_SLOT_BYTES) + s
287                .iter()
288                .map(|m| m.heap_bytes() as u64)
289                .sum::<u64>(),
290            // Inline collections live entirely in the Value variant
291            // body — zero heap, zero bucket overhead. Accounting matches
292            // `Value::Int` / inline `Value::Str` (both also return 0).
293            Value::SmallSetInline(_)
294            | Value::SmallHashInline(_)
295            | Value::SmallListInline(_)
296            | Value::SmallZSetInline(_) => 0,
297            Value::ZSet(z) => collection_overhead(z.by_member.capacity(), HASH_SLOT_BYTES)
298                + z.by_member
299                    .iter()
300                    .map(|(m, _)| m.heap_bytes() as u64)
301                    .sum::<u64>()
302                + (z.by_score.len() as u64).saturating_mul(BTREE_SLOT_BYTES),
303            Value::Stream(s) => s.weight(),
304        }
305    }
306
307    /// Whether this value's `Drop` is heavy enough to deserve being
308    /// shipped to the bio thread instead of freed inline. Fast: every
309    /// variant decides off a sub-field cheap to inspect (no recursive
310    /// walk), so it's safe to call on every overwrite-SET on the hot
311    /// path. The threshold is intentionally conservative — small Arcs
312    /// + every short string stay on inline-drop where jemalloc small-
313    /// class is sub-µs and a cross-thread hand-off would lose.
314    #[inline]
315    pub fn is_heap_heavy(&self) -> bool {
316        match self {
317            // Inline 22 B / heap ≤ small-class — fast to free inline.
318            Value::Str(_)
319            | Value::Int(_)
320            | Value::SmallSetInline(_)
321            | Value::SmallHashInline(_)
322            | Value::SmallListInline(_)
323            | Value::SmallZSetInline(_) => false,
324            // The Axis I culprit. v1.25 A.3 lazy-drop's primary case.
325            Value::ArcBulk(a) => a.len() >= HEAP_HEAVY_BYTES,
326            // Collection drops walk every element + the bucket array;
327            // worst-case microseconds on a multi-KB hash/zset. Send to
328            // bio so a SET that overwrites a collection-typed key (the
329            // Redis polymorphic case) doesn't stall the reactor.
330            //
331            // The check uses `Arc::strong_count == 1` to avoid sending
332            // a still-shared Arc: another holder (a SnapshotView in
333            // flight, a same-shard live read) would force the bio
334            // thread to only do a refcount-decrement, which is wasted
335            // cross-thread traffic. A unique Arc IS the case where
336            // drop is expensive (it really frees the inner payload).
337            Value::Hash(a) => std::sync::Arc::strong_count(a) == 1 && !a.is_empty(),
338            Value::List(a) => std::sync::Arc::strong_count(a) == 1 && !a.is_empty(),
339            Value::Set(a) => std::sync::Arc::strong_count(a) == 1 && !a.is_empty(),
340            Value::ZSet(a) => {
341                std::sync::Arc::strong_count(a) == 1 && a.by_member.len() > 0
342            }
343            Value::Stream(a) => {
344                std::sync::Arc::strong_count(a) == 1 && a.length() > 0
345            }
346        }
347    }
348}
349
// `BioDropSender = mpsc::Sender<Vec<Box<Value>>>` requires `Value: Send`.
// Static assert: if a future variant inadvertently makes Value `!Send`
// (e.g. an `Rc<...>` payload) this fails at compile time, BEFORE the
// runtime tries to hand a value to the bio thread. The closure is never
// called; it only has to type-check.
const _: fn() = || {
    fn assert_send<T: Send>() {}
    assert_send::<Value>();
};
358
/// Per-bucket footprint for `KevyMap`-backed collections (open-addressing
/// Swiss table). Approximation, not exact: includes metadata byte
/// per slot plus the boxed `K`/`V` cell, padded for 7/8 load factor.
pub(crate) const HASH_SLOT_BYTES: u64 = 32;
/// Per-bucket footprint for `KevySet`-backed collections. Same kind of
/// approximation as [`HASH_SLOT_BYTES`], but with no value cell.
pub(crate) const SET_SLOT_BYTES: u64 = 24;
/// `VecDeque` ring-buffer slot per stored `Vec<u8>` header (24 B Vec metadata).
pub(crate) const LIST_SLOT_BYTES: u64 = 24;
/// `BTreeSet` per-entry overhead (node pointers + 6-element B-tree node padding).
pub(crate) const BTREE_SLOT_BYTES: u64 = 40;
/// Per-entry overhead in the top-level keyspace map: the inline 24-byte
/// `SmallBytes` key cell + the 64-byte `Entry` (post weight/clock fields) +
/// metadata. Approximation that errs slightly high so `used_memory` stays a
/// conservative upper bound vs the actual allocator footprint.
pub const ENTRY_OVERHEAD: u64 = 96;
373
/// Slot-array footprint: `capacity` slots at `per_slot` bytes each,
/// clamped to `u64::MAX` instead of overflowing.
#[inline]
fn collection_overhead(capacity: usize, per_slot: u64) -> u64 {
    let slots = capacity as u64;
    per_slot.saturating_mul(slots)
}
378
379/// Per-field delta a new hash field charges against the entry weight: heap
380/// bytes for the field name (if not inline) + value capacity + one slot of
381/// bucket overhead. Used when an HSET inserts a brand-new field.
382#[inline]
383pub fn hash_field_weight(field: &SmallBytes, value_cap: usize) -> u64 {
384    field.heap_bytes() as u64 + value_cap as u64 + HASH_SLOT_BYTES
385}
386
387/// Per-member delta a new set member charges. Mirrors [`hash_field_weight`]
388/// for the set variant (no separate value, single bucket slot).
389#[inline]
390pub fn set_member_weight(member: &SmallBytes) -> u64 {
391    member.heap_bytes() as u64 + SET_SLOT_BYTES
392}
393
394/// Per-item delta a new list element charges (Vec header slot + heap cap).
395#[inline]
396pub fn list_item_weight(value_cap: usize) -> u64 {
397    LIST_SLOT_BYTES + value_cap as u64
398}
399
400/// Per-member delta a new zset member charges: hash slot for `by_member` +
401/// BTreeSet slot for `by_score` + the member's heap bytes.
402#[inline]
403pub fn zset_member_weight(member: &SmallBytes) -> u64 {
404    member.heap_bytes() as u64 + HASH_SLOT_BYTES + BTREE_SLOT_BYTES
405}