1use crate::util::range_bounds;
4use crate::value::{ZSetData, SmallBytes, Value, zset_member_weight, ScoreBound};
5use crate::{Entry, Store, StoreError};
6use std::sync::Arc;
7
8impl Store {
9 fn zset_mut(&mut self, key: &[u8], create: bool) -> Result<Option<&mut ZSetData>, StoreError> {
12 if self.live_entry_mut(key).is_none() {
13 if !create {
14 return Ok(None);
15 }
16 self.insert_entry(
17 SmallBytes::from_slice(key),
18 Entry::new(Value::ZSet(Arc::default()), None),
19 );
20 }
21 match &mut self.map.get_mut(key).expect("present").value {
22 Value::ZSet(z) => Ok(Some(Arc::make_mut(z))),
23 _ => Err(StoreError::WrongType),
24 }
25 }
26
27 fn zset_ref(&mut self, key: &[u8]) -> Result<Option<&ZSetData>, StoreError> {
28 match self.live_entry(key) {
29 None => Ok(None),
30 Some(e) => match &e.value {
31 Value::ZSet(z) => Ok(Some(z.as_ref())),
32 _ => Err(StoreError::WrongType),
33 },
34 }
35 }
36
37 fn drop_if_empty_zset(&mut self, key: &[u8]) {
38 let empty = matches!(self.map.get(key).map(|e| &e.value), Some(Value::ZSet(z)) if z.len() == 0);
39 if empty {
40 self.remove_entry(key);
41 }
42 }
43
44 pub fn zadd(&mut self, key: &[u8], pairs: &[(f64, Vec<u8>)]) -> Result<usize, StoreError> {
46 let (added, delta) = {
47 let z = self.zset_mut(key, true)?.expect("created");
48 let mut a = 0usize;
49 let mut d: i64 = 0;
50 for (score, m) in pairs {
51 let smb = SmallBytes::from_slice(m);
52 let w = zset_member_weight(&smb) as i64;
53 if z.insert(m, *score) {
54 a += 1;
55 d += w;
56 }
57 }
60 (a, d)
61 };
62 self.account_delta(key, delta);
63 Ok(added)
64 }
65
66 pub fn zscore(&mut self, key: &[u8], member: &[u8]) -> Result<Option<f64>, StoreError> {
67 Ok(self
68 .zset_ref(key)?
69 .and_then(|z| z.by_member.get(member).copied()))
70 }
71
72 pub fn zcard(&mut self, key: &[u8]) -> Result<usize, StoreError> {
73 Ok(self.zset_ref(key)?.map_or(0, super::value::ZSetData::len))
74 }
75
76 pub fn zrem(&mut self, key: &[u8], members: &[Vec<u8>]) -> Result<usize, StoreError> {
77 let (removed, delta) = {
78 let mut r = 0usize;
79 let mut d: i64 = 0;
80 if let Some(z) = self.zset_mut(key, false)? {
81 for m in members {
82 if z.remove(m.as_slice()) {
83 r += 1;
84 d -= zset_member_weight(&SmallBytes::from_slice(m)) as i64;
85 }
86 }
87 }
88 (r, d)
89 };
90 self.account_delta(key, delta);
91 self.drop_if_empty_zset(key);
92 Ok(removed)
93 }
94
95 pub fn zrank(&mut self, key: &[u8], member: &[u8]) -> Result<Option<usize>, StoreError> {
97 Ok(self
98 .zset_ref(key)?
99 .and_then(|z| z.ordered().position(|(m, _)| m == member)))
100 }
101
102 pub fn zincrby(&mut self, key: &[u8], incr: f64, member: &[u8]) -> Result<f64, StoreError> {
104 let (next, delta) = {
105 let z = self.zset_mut(key, true)?.expect("created");
106 let cur = z.by_member.get(member).copied().unwrap_or(0.0);
107 let next = cur + incr;
108 let smb = SmallBytes::from_slice(member);
109 let is_new = !z.by_member.contains_key(member);
110 z.insert(member, next);
111 let d = if is_new { zset_member_weight(&smb) as i64 } else { 0 };
112 (next, d)
113 };
114 self.account_delta(key, delta);
115 Ok(next)
116 }
117
118 pub fn zrange(
120 &mut self,
121 key: &[u8],
122 start: i64,
123 stop: i64,
124 ) -> Result<Vec<(Vec<u8>, f64)>, StoreError> {
125 match self.zset_ref(key)? {
126 None => Ok(Vec::new()),
127 Some(z) => Ok(match range_bounds(start, stop, z.len()) {
128 None => Vec::new(),
129 Some((s, e)) => z
130 .ordered()
131 .skip(s)
132 .take(e - s + 1)
133 .map(|(m, sc)| (m.to_vec(), sc))
134 .collect(),
135 }),
136 }
137 }
138
139 pub fn zrange_by_score(
141 &mut self,
142 key: &[u8],
143 min: ScoreBound,
144 max: ScoreBound,
145 ) -> Result<Vec<(Vec<u8>, f64)>, StoreError> {
146 Ok(self.zset_ref(key)?.map_or(Vec::new(), |z| {
147 z.ordered()
148 .filter(|(_, sc)| min.ge_ok(*sc) && max.le_ok(*sc))
149 .map(|(m, sc)| (m.to_vec(), sc))
150 .collect()
151 }))
152 }
153
154 pub fn zcount(
156 &mut self,
157 key: &[u8],
158 min: ScoreBound,
159 max: ScoreBound,
160 ) -> Result<usize, StoreError> {
161 Ok(self.zset_ref(key)?.map_or(0, |z| {
162 z.ordered()
163 .filter(|(_, sc)| min.ge_ok(*sc) && max.le_ok(*sc))
164 .count()
165 }))
166 }
167}