Skip to main content

kevy_store/
zset.rs

1//! `Store` sorted-set commands.
2
3use crate::small_zset::{self, AddResult as ZAddResult, SmallZSetData};
4use crate::util::range_bounds;
5use crate::value::{ZSetData, SmallBytes, Value, zset_member_weight, ScoreBound};
6use crate::{Entry, Store, StoreError};
7use std::sync::Arc;
8
9impl Store {
10    // ---- sorted sets ---------------------------------------------------
11
12    /// Borrow the key's zset mutably; promote inline → heap if needed.
13    fn zset_mut(&mut self, key: &[u8], create: bool) -> Result<Option<&mut ZSetData>, StoreError> {
14        if self.live_entry_mut(key).is_none() {
15            if !create {
16                return Ok(None);
17            }
18            self.insert_entry(
19                SmallBytes::from_slice(key),
20                Entry::new(Value::ZSet(Arc::default()), None),
21            );
22        }
23        // A.8: see hash.rs::hash_mut — promote out-of-scope.
24        let is_inline = matches!(
25            self.map.get(key).map(|e| &e.value),
26            Some(Value::SmallZSetInline(_))
27        );
28        if is_inline {
29            let promoted = {
30                let e = self.map.get(key).expect("present");
31                if let Value::SmallZSetInline(s) = &e.value {
32                    small_zset::promote(s)
33                } else {
34                    unreachable!()
35                }
36            };
37            self.map.get_mut(key).expect("present").value = Value::ZSet(Arc::new(promoted));
38            self.reweigh_entry(key);
39        }
40        match &mut self.map.get_mut(key).expect("present").value {
41            Value::ZSet(z) => Ok(Some(Arc::make_mut(z))),
42            _ => Err(StoreError::WrongType),
43        }
44    }
45
46    /// A.8: read the key's zset slot for ZADD. None when absent.
47    fn zset_value_for_set(&mut self, key: &[u8]) -> Result<Option<&mut Value>, StoreError> {
48        match self.live_entry_mut(key) {
49            None => Ok(None),
50            Some(e) => match &e.value {
51                Value::ZSet(_) | Value::SmallZSetInline(_) => Ok(Some(&mut e.value)),
52                _ => Err(StoreError::WrongType),
53            },
54        }
55    }
56
57    fn drop_if_empty_zset(&mut self, key: &[u8]) {
58        let empty = match self.map.get(key).map(|e| &e.value) {
59            Some(Value::ZSet(z)) => z.len() == 0,
60            Some(Value::SmallZSetInline(z)) => z.is_empty(),
61            _ => false,
62        };
63        if empty {
64            self.remove_entry(key);
65        }
66    }
67
68    /// G4 (v1.25): borrowed-pair `ZADD`. A.8: encoding-switch.
69    pub fn zadd_borrowed(
70        &mut self,
71        key: &[u8],
72        pairs: &[(f64, &[u8])],
73    ) -> Result<usize, StoreError> {
74        if pairs.is_empty() {
75            return Ok(0);
76        }
77        let mut added = 0usize;
78        let mut delta: i64 = 0;
79        for (score, m) in pairs {
80            match self.zadd_one(key, *m, *score)? {
81                ZaddOutcome::AddedInline => added += 1,
82                ZaddOutcome::UpdatedInline => {}
83                ZaddOutcome::AddedHeap(w) => {
84                    added += 1;
85                    delta += w;
86                }
87                ZaddOutcome::UpdatedHeap => {}
88            }
89        }
90        self.account_delta(key, delta);
91        Ok(added)
92    }
93
94    /// `ZADD` — returns the count of newly-added members.
95    pub fn zadd(&mut self, key: &[u8], pairs: &[(f64, Vec<u8>)]) -> Result<usize, StoreError> {
96        let borrowed: Vec<(f64, &[u8])> =
97            pairs.iter().map(|(s, m)| (*s, m.as_slice())).collect();
98        self.zadd_borrowed(key, &borrowed)
99    }
100
101    pub fn zscore(&mut self, key: &[u8], member: &[u8]) -> Result<Option<f64>, StoreError> {
102        match self.live_entry(key) {
103            None => Ok(None),
104            Some(e) => match &e.value {
105                Value::ZSet(z) => Ok(z.by_member.get(member).copied()),
106                Value::SmallZSetInline(z) => Ok(z.score(member)),
107                _ => Err(StoreError::WrongType),
108            },
109        }
110    }
111
112    pub fn zcard(&mut self, key: &[u8]) -> Result<usize, StoreError> {
113        match self.live_entry(key) {
114            None => Ok(0),
115            Some(e) => match &e.value {
116                Value::ZSet(z) => Ok(z.len()),
117                Value::SmallZSetInline(z) => Ok(z.len()),
118                _ => Err(StoreError::WrongType),
119            },
120        }
121    }
122
123    pub fn zrem(&mut self, key: &[u8], members: &[Vec<u8>]) -> Result<usize, StoreError> {
124        let borrowed: Vec<&[u8]> = members.iter().map(Vec::as_slice).collect();
125        self.zrem_borrowed(key, &borrowed)
126    }
127
128    /// G4 (v1.25): borrowed-slice `ZREM`. A.8: encoding-aware.
129    pub fn zrem_borrowed(
130        &mut self,
131        key: &[u8],
132        members: &[&[u8]],
133    ) -> Result<usize, StoreError> {
134        let (removed, delta) = {
135            let mut r = 0usize;
136            let mut d: i64 = 0;
137            if let Some(e) = self.live_entry_mut(key) {
138                match &mut e.value {
139                    Value::ZSet(z) => {
140                        // G-A3: hoist Arc::make_mut OUT of loop.
141                        let z = Arc::make_mut(z);
142                        for m in members {
143                            if z.remove(*m) {
144                                r += 1;
145                                d -= zset_member_weight(&SmallBytes::from_slice(m)) as i64;
146                            }
147                        }
148                    }
149                    Value::SmallZSetInline(z) => {
150                        for m in members {
151                            if z.try_remove(m) {
152                                r += 1;
153                            }
154                        }
155                    }
156                    _ => return Err(StoreError::WrongType),
157                }
158            }
159            (r, d)
160        };
161        self.account_delta(key, delta);
162        self.drop_if_empty_zset(key);
163        Ok(removed)
164    }
165
166    /// `ZRANK` — 0-based position in ascending order (O(n) for now).
167    pub fn zrank(&mut self, key: &[u8], member: &[u8]) -> Result<Option<usize>, StoreError> {
168        match self.live_entry(key) {
169            None => Ok(None),
170            Some(e) => match &e.value {
171                Value::ZSet(z) => Ok(z.ordered().position(|(m, _)| m == member)),
172                Value::SmallZSetInline(z) => {
173                    // Inline holds at most 2 entries; sort by score (then
174                    // bytes) so ZRANK matches ZRANGE order.
175                    let mut entries: Vec<(&[u8], f64)> = z.iter().collect();
176                    entries.sort_by(|a, b| {
177                        a.1.total_cmp(&b.1).then_with(|| a.0.cmp(b.0))
178                    });
179                    Ok(entries.iter().position(|(m, _)| *m == member))
180                }
181                _ => Err(StoreError::WrongType),
182            },
183        }
184    }
185
186    /// `ZINCRBY` — add `incr` to a member's score; returns the new score.
187    pub fn zincrby(&mut self, key: &[u8], incr: f64, member: &[u8]) -> Result<f64, StoreError> {
188        let z = self.zset_mut(key, true)?.expect("created");
189        let cur = z.by_member.get(member).copied().unwrap_or(0.0);
190        let next = cur + incr;
191        let smb = SmallBytes::from_slice(member);
192        let is_new = !z.by_member.contains_key(member);
193        z.insert(member, next);
194        let d = if is_new { zset_member_weight(&smb) as i64 } else { 0 };
195        self.account_delta(key, d);
196        Ok(next)
197    }
198
199    /// `ZRANGE key start stop` by rank.
200    pub fn zrange(
201        &mut self,
202        key: &[u8],
203        start: i64,
204        stop: i64,
205    ) -> Result<Vec<(Vec<u8>, f64)>, StoreError> {
206        match self.live_entry(key) {
207            None => Ok(Vec::new()),
208            Some(e) => match &e.value {
209                Value::ZSet(z) => Ok(match range_bounds(start, stop, z.len()) {
210                    None => Vec::new(),
211                    Some((s, end)) => z
212                        .ordered()
213                        .skip(s)
214                        .take(end - s + 1)
215                        .map(|(m, sc)| (m.to_vec(), sc))
216                        .collect(),
217                }),
218                Value::SmallZSetInline(z) => {
219                    let mut entries: Vec<(Vec<u8>, f64)> =
220                        z.iter().map(|(m, sc)| (m.to_vec(), sc)).collect();
221                    entries.sort_by(|a, b| {
222                        a.1.total_cmp(&b.1).then_with(|| a.0.cmp(&b.0))
223                    });
224                    Ok(match range_bounds(start, stop, entries.len()) {
225                        None => Vec::new(),
226                        Some((s, end)) => entries.into_iter().skip(s).take(end - s + 1).collect(),
227                    })
228                }
229                _ => Err(StoreError::WrongType),
230            },
231        }
232    }
233
234    /// `ZRANGEBYSCORE`.
235    pub fn zrange_by_score(
236        &mut self,
237        key: &[u8],
238        min: ScoreBound,
239        max: ScoreBound,
240    ) -> Result<Vec<(Vec<u8>, f64)>, StoreError> {
241        match self.live_entry(key) {
242            None => Ok(Vec::new()),
243            Some(e) => match &e.value {
244                Value::ZSet(z) => Ok(z
245                    .ordered()
246                    .filter(|(_, sc)| min.ge_ok(*sc) && max.le_ok(*sc))
247                    .map(|(m, sc)| (m.to_vec(), sc))
248                    .collect()),
249                Value::SmallZSetInline(z) => {
250                    let mut entries: Vec<(Vec<u8>, f64)> = z
251                        .iter()
252                        .filter(|(_, sc)| min.ge_ok(*sc) && max.le_ok(*sc))
253                        .map(|(m, sc)| (m.to_vec(), sc))
254                        .collect();
255                    entries.sort_by(|a, b| {
256                        a.1.total_cmp(&b.1).then_with(|| a.0.cmp(&b.0))
257                    });
258                    Ok(entries)
259                }
260                _ => Err(StoreError::WrongType),
261            },
262        }
263    }
264
265    /// `ZCOUNT`.
266    pub fn zcount(
267        &mut self,
268        key: &[u8],
269        min: ScoreBound,
270        max: ScoreBound,
271    ) -> Result<usize, StoreError> {
272        match self.live_entry(key) {
273            None => Ok(0),
274            Some(e) => match &e.value {
275                Value::ZSet(z) => Ok(z
276                    .ordered()
277                    .filter(|(_, sc)| min.ge_ok(*sc) && max.le_ok(*sc))
278                    .count()),
279                Value::SmallZSetInline(z) => Ok(z
280                    .iter()
281                    .filter(|(_, sc)| min.ge_ok(*sc) && max.le_ok(*sc))
282                    .count()),
283                _ => Err(StoreError::WrongType),
284            },
285        }
286    }
287
288    /// `ZPOPMIN key [count]` — pop and return the `count` lowest-scored
289    /// members (ascending by `(score, member)`). Returns `(member,
290    /// score)` pairs in pop order; empty when the key is absent / empty.
291    pub fn zpopmin(
292        &mut self,
293        key: &[u8],
294        count: usize,
295    ) -> Result<Vec<(Vec<u8>, f64)>, StoreError> {
296        if count == 0 {
297            // Validate type up-front so ZPOPMIN k 0 against a wrong-type
298            // key still reports WRONGTYPE (Redis behaviour).
299            if let Some(e) = self.live_entry(key) {
300                match &e.value {
301                    Value::ZSet(_) | Value::SmallZSetInline(_) => {}
302                    _ => return Err(StoreError::WrongType),
303                }
304            }
305            return Ok(Vec::new());
306        }
307        // Snapshot the lowest `count` members first (immutable borrow),
308        // then remove them via the shared zrem path (which handles the
309        // encoding, weight accounting, and empty-key cleanup uniformly).
310        let to_pop: Vec<(Vec<u8>, f64)> = match self.live_entry(key) {
311            None => return Ok(Vec::new()),
312            Some(e) => match &e.value {
313                Value::ZSet(z) => z
314                    .ordered()
315                    .take(count)
316                    .map(|(m, sc)| (m.to_vec(), sc))
317                    .collect(),
318                Value::SmallZSetInline(z) => {
319                    let mut entries: Vec<(Vec<u8>, f64)> =
320                        z.iter().map(|(m, sc)| (m.to_vec(), sc)).collect();
321                    entries.sort_by(|a, b| a.1.total_cmp(&b.1).then_with(|| a.0.cmp(&b.0)));
322                    entries.into_iter().take(count).collect()
323                }
324                _ => return Err(StoreError::WrongType),
325            },
326        };
327        if to_pop.is_empty() {
328            return Ok(to_pop);
329        }
330        let borrowed: Vec<&[u8]> = to_pop.iter().map(|(m, _)| m.as_slice()).collect();
331        self.zrem_borrowed(key, &borrowed)?;
332        Ok(to_pop)
333    }
334
335    /// `ZREMRANGEBYRANK key start stop` — remove members in the rank
336    /// range `[start, stop]` (inclusive, negative indices count from
337    /// the tail). Returns the number of members removed.
338    pub fn zrem_range_by_rank(
339        &mut self,
340        key: &[u8],
341        start: i64,
342        stop: i64,
343    ) -> Result<usize, StoreError> {
344        let to_remove: Vec<Vec<u8>> = match self.live_entry(key) {
345            None => return Ok(0),
346            Some(e) => match &e.value {
347                Value::ZSet(z) => match crate::util::range_bounds(start, stop, z.len()) {
348                    None => return Ok(0),
349                    Some((s, end)) => z
350                        .ordered()
351                        .skip(s)
352                        .take(end - s + 1)
353                        .map(|(m, _)| m.to_vec())
354                        .collect(),
355                },
356                Value::SmallZSetInline(z) => {
357                    let mut entries: Vec<(Vec<u8>, f64)> =
358                        z.iter().map(|(m, sc)| (m.to_vec(), sc)).collect();
359                    entries.sort_by(|a, b| a.1.total_cmp(&b.1).then_with(|| a.0.cmp(&b.0)));
360                    match crate::util::range_bounds(start, stop, entries.len()) {
361                        None => return Ok(0),
362                        Some((s, end)) => entries
363                            .into_iter()
364                            .skip(s)
365                            .take(end - s + 1)
366                            .map(|(m, _)| m)
367                            .collect(),
368                    }
369                }
370                _ => return Err(StoreError::WrongType),
371            },
372        };
373        if to_remove.is_empty() {
374            return Ok(0);
375        }
376        let borrowed: Vec<&[u8]> = to_remove.iter().map(Vec::as_slice).collect();
377        self.zrem_borrowed(key, &borrowed)
378    }
379
380    /// `ZREMRANGEBYSCORE key min max` — remove every member whose score
381    /// satisfies `min ≤ score ≤ max` (with `(` for exclusive bounds via
382    /// `ScoreBound`). Returns the number removed.
383    pub fn zrem_range_by_score(
384        &mut self,
385        key: &[u8],
386        min: ScoreBound,
387        max: ScoreBound,
388    ) -> Result<usize, StoreError> {
389        // Reuse zrange_by_score's bound logic to materialise the hit set
390        // — keeps inline / heap parity in one place.
391        let hits = self.zrange_by_score(key, min, max)?;
392        if hits.is_empty() {
393            // Still need to honour wrong-type errors that zrange_by_score
394            // already surfaced; here Ok([]) means empty match, not type
395            // mismatch, so it's safe to early-return.
396            return Ok(0);
397        }
398        let borrowed: Vec<&[u8]> = hits.iter().map(|(m, _)| m.as_slice()).collect();
399        self.zrem_borrowed(key, &borrowed)
400    }
401
402    /// `ZREVRANGEBYSCORE` — `zrange_by_score` reversed. Bounds are
403    /// passed in the `(min, max)` order already (the caller is
404    /// responsible for swapping the user-facing `max first, min second`
405    /// at the dispatch layer).
406    pub fn zrev_range_by_score(
407        &mut self,
408        key: &[u8],
409        min: ScoreBound,
410        max: ScoreBound,
411    ) -> Result<Vec<(Vec<u8>, f64)>, StoreError> {
412        let mut v = self.zrange_by_score(key, min, max)?;
413        v.reverse();
414        Ok(v)
415    }
416
417    /// A.8 core: set one `(member, score)` pair via encoding-switch.
418    fn zadd_one(&mut self, key: &[u8], m: &[u8], score: f64) -> Result<ZaddOutcome, StoreError> {
419        if self.zset_value_for_set(key)?.is_none() {
420            return Ok(self.zadd_create(key, m, score));
421        }
422        let v = self.zset_value_for_set(key)?.expect("present and a zset");
423        match v {
424            Value::SmallZSetInline(z) => match z.try_set(m, score) {
425                ZAddResult::Added => Ok(ZaddOutcome::AddedInline),
426                ZAddResult::Updated => Ok(ZaddOutcome::UpdatedInline),
427                ZAddResult::NoRoom => {
428                    let mut promoted = small_zset::promote(z);
429                    let smb = SmallBytes::from_slice(m);
430                    let is_new = !promoted.by_member.contains_key(m);
431                    let w = zset_member_weight(&smb) as i64;
432                    promoted.insert(m, score);
433                    *v = Value::ZSet(Arc::new(promoted));
434                    self.reweigh_entry(key);
435                    if is_new {
436                        Ok(ZaddOutcome::AddedHeap(w))
437                    } else {
438                        Ok(ZaddOutcome::UpdatedHeap)
439                    }
440                }
441            },
442            Value::ZSet(z) => {
443                let z = Arc::make_mut(z);
444                let smb = SmallBytes::from_slice(m);
445                let w = zset_member_weight(&smb) as i64;
446                if z.insert(m, score) {
447                    Ok(ZaddOutcome::AddedHeap(w))
448                } else {
449                    Ok(ZaddOutcome::UpdatedHeap)
450                }
451            }
452            _ => Err(StoreError::WrongType),
453        }
454    }
455
456    /// Create a fresh entry holding one `(member, score)` pair.
457    fn zadd_create(&mut self, key: &[u8], m: &[u8], score: f64) -> ZaddOutcome {
458        if let Some(inline) = SmallZSetData::with_one(m, score) {
459            self.insert_entry(
460                SmallBytes::from_slice(key),
461                Entry::new(Value::SmallZSetInline(inline), None),
462            );
463            ZaddOutcome::AddedInline
464        } else {
465            let mut z = ZSetData::default();
466            z.insert(m, score);
467            self.insert_entry(
468                SmallBytes::from_slice(key),
469                Entry::new(Value::ZSet(Arc::new(z)), None),
470            );
471            ZaddOutcome::AddedInline
472        }
473    }
474}
475
/// Result of setting a single `(member, score)` pair during `ZADD`.
///
/// The "added" variants count towards ZADD's return value; the "updated"
/// variants do not.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ZaddOutcome {
    /// A new member was added and no weight delta is owed by the caller.
    AddedInline,
    /// An existing member's score was set on the inline encoding.
    UpdatedInline,
    /// A new member was added to a heap zset; the payload is the weight
    /// delta the caller still has to apply.
    AddedHeap(i64),
    /// An existing member's score was set on the heap encoding.
    UpdatedHeap,
}