Skip to main content

kevy_store/
zset.rs

1//! `Store` sorted-set commands.
2
3use crate::util::range_bounds;
4use crate::value::{ZSetData, SmallBytes, Value, zset_member_weight, ScoreBound};
5use crate::{Entry, Store, StoreError};
6use std::sync::Arc;
7
8impl Store {
9    // ---- sorted sets ---------------------------------------------------
10
11    fn zset_mut(&mut self, key: &[u8], create: bool) -> Result<Option<&mut ZSetData>, StoreError> {
12        if self.live_entry_mut(key).is_none() {
13            if !create {
14                return Ok(None);
15            }
16            self.insert_entry(
17                SmallBytes::from_slice(key),
18                Entry::new(Value::ZSet(Arc::default()), None),
19            );
20        }
21        match &mut self.map.get_mut(key).expect("present").value {
22            Value::ZSet(z) => Ok(Some(Arc::make_mut(z))),
23            _ => Err(StoreError::WrongType),
24        }
25    }
26
27    fn zset_ref(&mut self, key: &[u8]) -> Result<Option<&ZSetData>, StoreError> {
28        match self.live_entry(key) {
29            None => Ok(None),
30            Some(e) => match &e.value {
31                Value::ZSet(z) => Ok(Some(z.as_ref())),
32                _ => Err(StoreError::WrongType),
33            },
34        }
35    }
36
37    fn drop_if_empty_zset(&mut self, key: &[u8]) {
38        let empty = matches!(self.map.get(key).map(|e| &e.value), Some(Value::ZSet(z)) if z.len() == 0);
39        if empty {
40            self.remove_entry(key);
41        }
42    }
43
44    /// `ZADD` — returns the count of newly-added members (updates don't count).
45    pub fn zadd(&mut self, key: &[u8], pairs: &[(f64, Vec<u8>)]) -> Result<usize, StoreError> {
46        let (added, delta) = {
47            let z = self.zset_mut(key, true)?.expect("created");
48            let mut a = 0usize;
49            let mut d: i64 = 0;
50            for (score, m) in pairs {
51                let smb = SmallBytes::from_slice(m);
52                let w = zset_member_weight(&smb) as i64;
53                if z.insert(m, *score) {
54                    a += 1;
55                    d += w;
56                }
57                // Updating an existing score reuses the same member entry —
58                // no weight delta (f64 score is a fixed 8 B already counted).
59            }
60            (a, d)
61        };
62        self.account_delta(key, delta);
63        Ok(added)
64    }
65
66    pub fn zscore(&mut self, key: &[u8], member: &[u8]) -> Result<Option<f64>, StoreError> {
67        Ok(self
68            .zset_ref(key)?
69            .and_then(|z| z.by_member.get(member).copied()))
70    }
71
72    pub fn zcard(&mut self, key: &[u8]) -> Result<usize, StoreError> {
73        Ok(self.zset_ref(key)?.map_or(0, super::value::ZSetData::len))
74    }
75
76    pub fn zrem(&mut self, key: &[u8], members: &[Vec<u8>]) -> Result<usize, StoreError> {
77        let (removed, delta) = {
78            let mut r = 0usize;
79            let mut d: i64 = 0;
80            if let Some(z) = self.zset_mut(key, false)? {
81                for m in members {
82                    if z.remove(m.as_slice()) {
83                        r += 1;
84                        d -= zset_member_weight(&SmallBytes::from_slice(m)) as i64;
85                    }
86                }
87            }
88            (r, d)
89        };
90        self.account_delta(key, delta);
91        self.drop_if_empty_zset(key);
92        Ok(removed)
93    }
94
95    /// `ZRANK` — 0-based position in ascending order (O(n) for now).
96    pub fn zrank(&mut self, key: &[u8], member: &[u8]) -> Result<Option<usize>, StoreError> {
97        Ok(self
98            .zset_ref(key)?
99            .and_then(|z| z.ordered().position(|(m, _)| m == member)))
100    }
101
102    /// `ZINCRBY` — add `incr` to a member's score (default 0), returns the new score.
103    pub fn zincrby(&mut self, key: &[u8], incr: f64, member: &[u8]) -> Result<f64, StoreError> {
104        let (next, delta) = {
105            let z = self.zset_mut(key, true)?.expect("created");
106            let cur = z.by_member.get(member).copied().unwrap_or(0.0);
107            let next = cur + incr;
108            let smb = SmallBytes::from_slice(member);
109            let is_new = !z.by_member.contains_key(member);
110            z.insert(member, next);
111            let d = if is_new { zset_member_weight(&smb) as i64 } else { 0 };
112            (next, d)
113        };
114        self.account_delta(key, delta);
115        Ok(next)
116    }
117
118    /// `ZRANGE key start stop` by rank.
119    pub fn zrange(
120        &mut self,
121        key: &[u8],
122        start: i64,
123        stop: i64,
124    ) -> Result<Vec<(Vec<u8>, f64)>, StoreError> {
125        match self.zset_ref(key)? {
126            None => Ok(Vec::new()),
127            Some(z) => Ok(match range_bounds(start, stop, z.len()) {
128                None => Vec::new(),
129                Some((s, e)) => z
130                    .ordered()
131                    .skip(s)
132                    .take(e - s + 1)
133                    .map(|(m, sc)| (m.to_vec(), sc))
134                    .collect(),
135            }),
136        }
137    }
138
139    /// `ZRANGEBYSCORE` — members with score in the (possibly exclusive) bounds.
140    pub fn zrange_by_score(
141        &mut self,
142        key: &[u8],
143        min: ScoreBound,
144        max: ScoreBound,
145    ) -> Result<Vec<(Vec<u8>, f64)>, StoreError> {
146        Ok(self.zset_ref(key)?.map_or(Vec::new(), |z| {
147            z.ordered()
148                .filter(|(_, sc)| min.ge_ok(*sc) && max.le_ok(*sc))
149                .map(|(m, sc)| (m.to_vec(), sc))
150                .collect()
151        }))
152    }
153
154    /// `ZCOUNT` — number of members with score in the bounds.
155    pub fn zcount(
156        &mut self,
157        key: &[u8],
158        min: ScoreBound,
159        max: ScoreBound,
160    ) -> Result<usize, StoreError> {
161        Ok(self.zset_ref(key)?.map_or(0, |z| {
162            z.ordered()
163                .filter(|(_, sc)| min.ge_ok(*sc) && max.le_ok(*sc))
164                .count()
165        }))
166    }
167}