kevy_store/value.rs
1//! Value types — one backing structure per Redis type.
2
3pub use kevy_bytes::SmallBytes;
4use kevy_map::{KevyMap, KevySet};
5use std::cmp::Ordering;
6use std::collections::{BTreeSet, VecDeque};
7use std::sync::Arc;
8
9/// Backing structure for a Hash value — [`KevyMap`] keyed by [`SmallBytes`]
10/// (22 B inline / heap-else). Field names ≤22B (the vast majority — `name`,
11/// `email`, etc.) live entirely inside the bucket, saving the 24 B Vec
12/// metadata + heap allocation per field on a 22-byte budget.
13pub type HashData = KevyMap<SmallBytes, Vec<u8>>;
/// Storage behind a List value: a ring-buffer deque of byte strings, giving
/// O(1) push/pop at both the head and the tail.
pub type ListData = VecDeque<Vec<u8>>;
16/// Backing structure for a Set value — [`KevySet`] of [`SmallBytes`].
17pub type SetData = KevySet<SmallBytes>;
18
/// An `f64` score with a total order, so it can key a `BTreeSet`.
///
/// Both ordering and equality go through [`f64::total_cmp`]. That keeps
/// `PartialEq`/`Eq` consistent with `Ord` (`a == b` exactly when
/// `a.cmp(&b)` is `Ordering::Equal`), which the `Ord` contract requires.
/// Consequences worth knowing:
///
/// * `-0.0` and `+0.0` are distinct scores, ordered `-0.0 < +0.0`.
/// * Equality is reflexive even for NaN (Redis scores are never NaN, but
///   the `Eq` impl no longer relies on that to be sound).
#[derive(Clone, Copy)]
pub struct Score(pub f64);

impl PartialEq for Score {
    /// Equal exactly when the total order says `Equal` — not IEEE `==`,
    /// which would disagree with `cmp` on signed zeros and NaN.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for Score {}

impl Ord for Score {
    fn cmp(&self, other: &Self) -> Ordering {
        self.0.total_cmp(&other.0)
    }
}

impl PartialOrd for Score {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
34
/// One endpoint of a score range, as used by `ZRANGEBYSCORE` / `ZCOUNT`.
///
/// The endpoint is either inclusive or exclusive. `-inf` / `+inf` are
/// expressed by setting `value` to `f64::NEG_INFINITY` / `f64::INFINITY`.
pub struct ScoreBound {
    /// The boundary score.
    pub value: f64,
    /// `true` if a score equal to `value` falls outside the range.
    pub exclusive: bool,
}

impl ScoreBound {
    /// Returns whether `s` passes this bound when it is the range *minimum*:
    /// `s > value`, or additionally `s == value` for an inclusive bound.
    pub(crate) fn ge_ok(&self, s: f64) -> bool {
        // `s >= v` decomposes to `s > v || s == v` (all false for NaN).
        s > self.value || (!self.exclusive && s == self.value)
    }

    /// Returns whether `s` passes this bound when it is the range *maximum*:
    /// `s < value`, or additionally `s == value` for an inclusive bound.
    pub(crate) fn le_ok(&self, s: f64) -> bool {
        // `s <= v` decomposes to `s < v || s == v` (all false for NaN).
        s < self.value || (!self.exclusive && s == self.value)
    }
}
59
60/// Sorted set: a member→score map plus a B-tree ordered by `(score, member)`.
61/// (A B-tree is cache-friendlier than Redis's skiplist; `ZRANK` is O(n) here —
62/// an order-statistics tree for O(log n) rank is a later perf item.)
63#[derive(Default, Clone)]
64pub struct ZSetData {
65 pub(crate) by_member: KevyMap<SmallBytes, f64>,
66 /// The `(score, member)` index is still keyed by `Vec<u8>` member —
67 /// changing this requires the `BTreeSet` to accept a `(Score,
68 /// SmallBytes)` ordering, which is fine but a larger sweep; keep
69 /// it as-is for now to avoid touching ZRANGE paths.
70 pub(crate) by_score: BTreeSet<(Score, Vec<u8>)>,
71}
72
73impl ZSetData {
74 pub(crate) fn insert(&mut self, member: &[u8], score: f64) -> bool {
75 let is_new = match self.by_member.insert(SmallBytes::from_slice(member), score) {
76 Some(old) => {
77 self.by_score.remove(&(Score(old), member.to_vec()));
78 false
79 }
80 None => true,
81 };
82 self.by_score.insert((Score(score), member.to_vec()));
83 is_new
84 }
85 pub(crate) fn remove(&mut self, member: &[u8]) -> bool {
86 match self.by_member.remove(member) {
87 Some(old) => {
88 self.by_score.remove(&(Score(old), member.to_vec()));
89 true
90 }
91 None => false,
92 }
93 }
94 pub(crate) fn len(&self) -> usize {
95 self.by_member.len()
96 }
97 /// `(member, score)` pairs in ascending `(score, member)` order.
98 pub fn ordered(&self) -> impl Iterator<Item = (&[u8], f64)> {
99 self.by_score.iter().map(|(s, m)| (m.as_slice(), s.0))
100 }
101}
102
103/// A stored value. One variant per Redis type.
104///
105/// The collection variants live behind a **shared pointer** (`Arc`) so the
106/// enum is only as big as `Str` (24 B) + tag = 32 B, not the 56 B `ZSetData`
107/// — every `Entry` (incl. the common string case) is then ~48 B instead of
108/// ~80 B, so the bucket array is ~40% denser/smaller (fewer cache misses on
109/// a large keyspace, less RSS). The extra pointer-chase lands only on
110/// collection ops, not the hot string GET path.
111///
112/// `Arc` (same 8 B as the previous `Box`) is what makes O(short-pause)
113/// persistence possible: [`crate::Store::collect_snapshot`] bumps each
114/// collection's refcount instead of serializing it, and a background thread
115/// walks the frozen payloads at leisure. Mutations go through
116/// [`std::sync::Arc::make_mut`] — a single uniqueness check (the steady
117/// state, no snapshot in flight) or a copy-on-write deep clone when a
118/// snapshot still holds the payload.
119///
120/// `Str` holds a [`SmallBytes`] (24 B, same size as `Vec<u8>`) so byte strings
121/// up to 22 bytes live **inline inside the bucket**, killing the second cache
122/// miss the value pointer-chase used to cost on large-keyspace GETs.
123/// `Clone` is the snapshot-collect primitive: `Str` copies its bytes
124/// (inline = 24 B memcpy; heap = one allocation), collections bump a
125/// refcount. See [`crate::Store::collect_snapshot`].
126#[derive(Clone)]
127pub enum Value {
128 Str(SmallBytes),
129 /// L2 (2026-06-21, lessons from valkey OBJ_ENCODING_INT): when a SET
130 /// stores a clean canonical i64 ASCII string (parses round-trip), we
131 /// keep the integer **as i64** rather than as 22 B of inline bytes.
132 /// Wins on INCR (in-place `+= delta`, no parse / no format / no
133 /// SmallBytes wrap) and on memory (8 B vs 24 B). GET formats it via
134 /// a per-`Store` scratch buffer.
135 Int(i64),
136 /// L1 (2026-06-21) / v1.29 Option A (2026-06-29): values larger
137 /// than [`BULK_THRESHOLD`] bytes get stored behind an
138 /// `Arc<Box<[u8]>>` instead of a heap-backed `SmallBytes`. The Arc
139 /// lets the io_uring reactor's reply path borrow the bytes across
140 /// the SQE→CQE window safely (Arc clone keeps them alive even if
141 /// the keyspace mutates) — the prerequisite for the writev
142 /// zero-copy bulk reply path, which skips the per-GET memcpy from
143 /// value storage into the per-conn output buffer.
144 ///
145 /// **Why `Arc<Box<[u8]>>` and not `Arc<[u8]>`**: `Arc<[u8]>` is a
146 /// DST-backed `ArcInner<[u8]> = { strong, weak, [u8; N] }` whose
147 /// data slot sits past the refcount words. `Arc::from(Vec<u8>)`
148 /// allocates a fresh `ArcInner` and `copy_from_slice`s the bytes
149 /// — a hard mandatory 64 KiB memcpy on every big SET. With
150 /// `Arc<Box<[u8]>>`, the `Box<[u8]>` wrapper occupies the Arc's
151 /// data slot (16 B), pointing AT an unchanged heap buffer; so
152 /// `Arc::new(vec.into_boxed_slice())` is **truly zero-copy**
153 /// (the boxed slice's allocation stays put — only the 32-byte
154 /// `ArcInner` is freshly malloced). Per-GET cost: one extra
155 /// pointer dereference (`&**arc` to get `&[u8]`), measured to be
156 /// negligible vs the per-SET memcpy savings. See
157 /// `bench/PERF-FINDING-2026-06-29-arc-from-box-memcpys.md` for
158 /// the empirical perf-record evidence of the original
159 /// `Arc<[u8]>` mandatory copy.
160 ///
161 /// Small values stay on `Str(SmallBytes)` because the inline
162 /// cache-line storage beats an Arc indirection for the common case.
163 ArcBulk(Arc<Box<[u8]>>),
164 Hash(Arc<HashData>),
165 List(Arc<ListData>),
166 Set(Arc<SetData>),
167 ZSet(Arc<ZSetData>),
168 Stream(Arc<crate::stream::StreamData>),
169 /// v1.25 A.7 O5 (valkey-orthodox encoding switch): tiny sets (1-N
170 /// short members) live inline in 24 bytes instead of behind
171 /// `Arc<SetData>` — matches valkey's `OBJ_ENCODING_LISTPACK` for
172 /// sets, which is what `redis-benchmark -t sadd` default `-r 0`
173 /// (cardinality stays at 1 forever, single 20-byte literal member)
174 /// measures. On overflow ([`crate::small_set::SmallSetData::try_add`]
175 /// returns `NoRoom`) the set is promoted to `Value::Set(Arc<SetData>)`
176 /// — the Swiss-table path that wins for larger cardinalities.
177 SmallSetInline(crate::small_set::SmallSetData),
178 /// v1.25 A.8 (extension of A.7 to hashes): tiny hashes
179 /// (1-2 short field-value pairs) live inline in 24 bytes; promoted
180 /// to `Value::Hash(Arc<HashData>)` on overflow. Mirrors valkey's
181 /// `OBJ_ENCODING_LISTPACK` for hashes.
182 SmallHashInline(crate::small_hash::SmallHashData),
183 /// v1.25 A.8: tiny lists inline encoding; promoted to
184 /// `Value::List(Arc<ListData>)` on overflow.
185 SmallListInline(crate::small_list::SmallListData),
186 /// v1.25 A.8: tiny sorted sets inline encoding; promoted to
187 /// `Value::ZSet(Arc<ZSetData>)` on overflow.
188 SmallZSetInline(crate::small_zset::SmallZSetData),
189}
190
/// Size cut-off, in bytes, for how a SET stores its value: above it the
/// value becomes a [`Value::ArcBulk`] (writev-eligible on GET); otherwise
/// it stays a [`Value::Str`] (`SmallBytes`).
///
/// 64 B ≈ one cache line. Below that, `SmallBytes` storage wins on L1
/// locality; above it, the writev-borrow win dominates.
pub const BULK_THRESHOLD: usize = 64;
197
198const _: () = {
199 // Don't let future variants undo box-collection's Entry-48B win.
200 assert!(std::mem::size_of::<Value>() <= 32);
201};
202
/// Heap-size threshold (bytes) at which an overwritten `Value` is handed
/// to the runtime's bio thread for an off-reactor drop rather than being
/// freed inline (v1.25 A.3 lazy-drop).
///
/// NOTE(review): the summary historically said "above which", while
/// `Value::is_heap_heavy` compares `ArcBulk` length with `>=` — confirm
/// which boundary is intended.
///
/// # Calibration history
///
/// **2026-06-22, from the bench-with-256-B-floor R3 ★ finding.** Lowering
/// the threshold to 256 B regressed Axis I c=50 -d 10240 SET p999 from
/// 0.487 → 1.583 ms (3.25× worse). The recorded cause:
/// `std::sync::mpsc::Sender::send` is a few hundred ns of atomic + Box
/// clone, which EXCEEDS the inline `Box::<[u8]>::drop` cost when jemalloc
/// serves the free from a hot large-class slab (~ 1-3 µs for 10 KB; the
/// bench's steady state). Off-loading only pays when the inline drop's
/// tail risk (cold-slab `munmap`/`madvise` consolidation stall, observed
/// at 50-150 µs and occasionally millisecond-range) exceeds the per-send
/// channel cost PLUS the cross-thread cache-line bouncing.
///
/// NOTE(review): as written, "a few hundred ns" does not exceed
/// "~ 1-3 µs"; the figures are reproduced from the original note — verify
/// against the bench log.
///
/// **v1.25 A.2 (batch-send follow-up to A.3).** With per-shard batch
/// accumulation flushed at the end of every reactor iteration, the
/// per-mpsc-send cost is amortised across N drops. That makes the channel
/// hop profitable at smaller sizes than A.3's lone-send model could
/// justify (A.3 had to lift the threshold to 16 KB because the
/// per-`mpsc::send` cost was a few hundred ns — at 256 B the inline drop
/// was cheaper).
///
/// **R3 ★ — sweet-spot threshold surprise.** Both the agent brief and the
/// A.3 commit body gestured at dropping the threshold to 256 B – 1 KB
/// once batching amortised the send. A sweep on lx64 across
/// {512, 1024, 4096, 16384} × c=50 SET -d {1K, 4K, 10K, 64K} disproved
/// that floor: at a ≤ 1 KB threshold, p999 / max on small values
/// (-d 1024, -d 4096) was variance-bounded equal to, or occasionally
/// WORSE than, the A.3 16 KB threshold, while the larger sizes
/// (10 KB / 64 KB) won either way. Cause: the Vec::push plus the
/// occasional `MAX_PENDING_DROPS` force-flush stall costs more for small
/// Arcs (jemalloc small-class free is sub-µs even at the tail) than the
/// inline drop it avoids.
///
/// **Result: 4 KB.** This is the lowest threshold where the
/// bio-off-reactor win consistently dominates the batch-buffer overhead
/// on tail metrics. The biggest A.2 wins (vs A.3's 16 KB) land on
/// `-d 64K` SET p50 (-44 %) and `-d 10K` SET max (-35 %), where each
/// iteration's batch already contains several heavy values per shard.
pub const HEAP_HEAVY_BYTES: usize = 4 * 1024;
243
244/// Sender half of the runtime's bio-drop channel. Wired from
245/// `kevy-rt`'s `bio.rs` via [`crate::Store::set_bio_drop_sender`]; the
246/// concrete payload is `Vec<Box<Value>>` — a **batch** of values
247/// produced by one shard since its last flush (A.2 batch-send model).
248/// The bio thread (`kevy-rt::bio::spawn`) iterates the batch and
249/// drops each item. One mpsc message per shard-flush amortises the
250/// channel cost (atomic + cross-thread cacheline traffic) across
251/// however many values landed in the batch.
252pub type BioDropSender = std::sync::mpsc::Sender<Vec<Box<Value>>>;
253
254impl Value {
255 /// The Redis type name (`TYPE` command).
256 pub fn type_name(&self) -> &'static str {
257 match self {
258 Value::Str(_) | Value::Int(_) | Value::ArcBulk(_) => "string",
259 Value::Hash(_) | Value::SmallHashInline(_) => "hash",
260 Value::List(_) | Value::SmallListInline(_) => "list",
261 Value::Set(_) | Value::SmallSetInline(_) => "set",
262 Value::ZSet(_) | Value::SmallZSetInline(_) => "zset",
263 Value::Stream(_) => "stream",
264 }
265 }
266
267 /// Approximate heap bytes the value owns. Excludes the inline `Entry` /
268 /// bucket slot — that's a separate per-entry constant accounted by the
269 /// store. Walks collections, so prefer the cached `Entry::weight` for
270 /// hot-path accounting and only call this when bootstrapping or after a
271 /// load-from-snapshot.
272 pub fn weight(&self) -> u64 {
273 match self {
274 Value::Str(s) => s.heap_bytes() as u64,
275 // i64 fits in the enum tag's space; no heap.
276 Value::Int(_) => 0,
277 // Arc<[u8]> heap = the byte slice itself (refcount overhead
278 // is amortised across shared clones).
279 Value::ArcBulk(a) => a.len() as u64,
280 Value::Hash(h) => collection_overhead(h.capacity(), HASH_SLOT_BYTES) + h
281 .iter()
282 .map(|(f, v)| f.heap_bytes() as u64 + v.capacity() as u64)
283 .sum::<u64>(),
284 Value::List(l) => (l.capacity() as u64).saturating_mul(LIST_SLOT_BYTES)
285 + l.iter().map(|v| v.capacity() as u64).sum::<u64>(),
286 Value::Set(s) => collection_overhead(s.capacity(), SET_SLOT_BYTES) + s
287 .iter()
288 .map(|m| m.heap_bytes() as u64)
289 .sum::<u64>(),
290 // Inline collections live entirely in the Value variant
291 // body — zero heap, zero bucket overhead. Accounting matches
292 // `Value::Int` / inline `Value::Str` (both also return 0).
293 Value::SmallSetInline(_)
294 | Value::SmallHashInline(_)
295 | Value::SmallListInline(_)
296 | Value::SmallZSetInline(_) => 0,
297 Value::ZSet(z) => collection_overhead(z.by_member.capacity(), HASH_SLOT_BYTES)
298 + z.by_member
299 .iter()
300 .map(|(m, _)| m.heap_bytes() as u64)
301 .sum::<u64>()
302 + (z.by_score.len() as u64).saturating_mul(BTREE_SLOT_BYTES),
303 Value::Stream(s) => s.weight(),
304 }
305 }
306
307 /// Whether this value's `Drop` is heavy enough to deserve being
308 /// shipped to the bio thread instead of freed inline. Fast: every
309 /// variant decides off a sub-field cheap to inspect (no recursive
310 /// walk), so it's safe to call on every overwrite-SET on the hot
311 /// path. The threshold is intentionally conservative — small Arcs
312 /// + every short string stay on inline-drop where jemalloc small-
313 /// class is sub-µs and a cross-thread hand-off would lose.
314 #[inline]
315 pub fn is_heap_heavy(&self) -> bool {
316 match self {
317 // Inline 22 B / heap ≤ small-class — fast to free inline.
318 Value::Str(_)
319 | Value::Int(_)
320 | Value::SmallSetInline(_)
321 | Value::SmallHashInline(_)
322 | Value::SmallListInline(_)
323 | Value::SmallZSetInline(_) => false,
324 // The Axis I culprit. v1.25 A.3 lazy-drop's primary case.
325 Value::ArcBulk(a) => a.len() >= HEAP_HEAVY_BYTES,
326 // Collection drops walk every element + the bucket array;
327 // worst-case microseconds on a multi-KB hash/zset. Send to
328 // bio so a SET that overwrites a collection-typed key (the
329 // Redis polymorphic case) doesn't stall the reactor.
330 //
331 // The check uses `Arc::strong_count == 1` to avoid sending
332 // a still-shared Arc: another holder (a SnapshotView in
333 // flight, a same-shard live read) would force the bio
334 // thread to only do a refcount-decrement, which is wasted
335 // cross-thread traffic. A unique Arc IS the case where
336 // drop is expensive (it really frees the inner payload).
337 Value::Hash(a) => std::sync::Arc::strong_count(a) == 1 && !a.is_empty(),
338 Value::List(a) => std::sync::Arc::strong_count(a) == 1 && !a.is_empty(),
339 Value::Set(a) => std::sync::Arc::strong_count(a) == 1 && !a.is_empty(),
340 Value::ZSet(a) => {
341 std::sync::Arc::strong_count(a) == 1 && a.by_member.len() > 0
342 }
343 Value::Stream(a) => {
344 std::sync::Arc::strong_count(a) == 1 && a.length() > 0
345 }
346 }
347 }
348}
349
350// `BioDropSender = mpsc::Sender<Box<Value>>` requires `Value: Send`. Static
351// assert: if a future variant inadvertently makes Value `!Send` (e.g. an
352// `Rc<...>` payload) this fails at compile time, BEFORE the runtime tries
353// to hand a value to the bio thread.
354const _: fn() = || {
355 fn assert_send<T: Send>() {}
356 assert_send::<Value>();
357};
358
/// Estimated per-bucket footprint of a `KevyMap`-backed collection (an
/// open-addressing Swiss table). An approximation, not an exact figure:
/// it covers the metadata byte per slot plus the boxed `K`/`V` cell,
/// padded for a 7/8 load factor.
pub(crate) const HASH_SLOT_BYTES: u64 = 32;
/// Estimated per-bucket footprint of a `KevySet`-backed collection; the
/// set counterpart of [`HASH_SLOT_BYTES`] (key cell only, no value).
pub(crate) const SET_SLOT_BYTES: u64 = 24;
/// Per-slot cost of the `VecDeque` ring buffer: the 24 B `Vec` header of
/// each stored `Vec<u8>`.
pub(crate) const LIST_SLOT_BYTES: u64 = 24;
/// Estimated per-entry overhead of a `BTreeSet` (node pointers + 6-element
/// B-tree node padding).
pub(crate) const BTREE_SLOT_BYTES: u64 = 40;
/// Estimated per-entry overhead in the top-level keyspace map: the inline
/// 24-byte `SmallBytes` key cell, the 64-byte `Entry` (post weight/clock
/// fields), and metadata. The estimate deliberately errs slightly high so
/// that `used_memory` remains a conservative upper bound on the actual
/// allocator footprint.
///
/// NOTE(review): the docs on [`Value`] quote `Entry` at ~48 B, while this
/// comment assumes 64 B — confirm which figure is current.
pub const ENTRY_OVERHEAD: u64 = 96;
373
/// Bucket-array cost of a collection: `capacity` slots at `per_slot` bytes
/// each, saturating at `u64::MAX` instead of overflowing.
#[inline]
fn collection_overhead(capacity: usize, per_slot: u64) -> u64 {
    let slots = capacity as u64;
    per_slot.checked_mul(slots).unwrap_or(u64::MAX)
}
378
379/// Per-field delta a new hash field charges against the entry weight: heap
380/// bytes for the field name (if not inline) + value capacity + one slot of
381/// bucket overhead. Used when an HSET inserts a brand-new field.
382#[inline]
383pub fn hash_field_weight(field: &SmallBytes, value_cap: usize) -> u64 {
384 field.heap_bytes() as u64 + value_cap as u64 + HASH_SLOT_BYTES
385}
386
387/// Per-member delta a new set member charges. Mirrors [`hash_field_weight`]
388/// for the set variant (no separate value, single bucket slot).
389#[inline]
390pub fn set_member_weight(member: &SmallBytes) -> u64 {
391 member.heap_bytes() as u64 + SET_SLOT_BYTES
392}
393
394/// Per-item delta a new list element charges (Vec header slot + heap cap).
395#[inline]
396pub fn list_item_weight(value_cap: usize) -> u64 {
397 LIST_SLOT_BYTES + value_cap as u64
398}
399
400/// Per-member delta a new zset member charges: hash slot for `by_member` +
401/// BTreeSet slot for `by_score` + the member's heap bytes.
402#[inline]
403pub fn zset_member_weight(member: &SmallBytes) -> u64 {
404 member.heap_bytes() as u64 + HASH_SLOT_BYTES + BTREE_SLOT_BYTES
405}