1use crate::small_zset::{self, AddResult as ZAddResult, SmallZSetData};
4use crate::util::range_bounds;
5use crate::value::{ZSetData, SmallBytes, Value, zset_member_weight, ScoreBound};
6use crate::{Entry, Store, StoreError};
7use std::sync::Arc;
8
9impl Store {
10 fn zset_mut(&mut self, key: &[u8], create: bool) -> Result<Option<&mut ZSetData>, StoreError> {
14 if self.live_entry_mut(key).is_none() {
15 if !create {
16 return Ok(None);
17 }
18 self.insert_entry(
19 SmallBytes::from_slice(key),
20 Entry::new(Value::ZSet(Arc::default()), None),
21 );
22 }
23 let is_inline = matches!(
25 self.map.get(key).map(|e| &e.value),
26 Some(Value::SmallZSetInline(_))
27 );
28 if is_inline {
29 let promoted = {
30 let e = self.map.get(key).expect("present");
31 if let Value::SmallZSetInline(s) = &e.value {
32 small_zset::promote(s)
33 } else {
34 unreachable!()
35 }
36 };
37 self.map.get_mut(key).expect("present").value = Value::ZSet(Arc::new(promoted));
38 self.reweigh_entry(key);
39 }
40 match &mut self.map.get_mut(key).expect("present").value {
41 Value::ZSet(z) => Ok(Some(Arc::make_mut(z))),
42 _ => Err(StoreError::WrongType),
43 }
44 }
45
46 fn zset_value_for_set(&mut self, key: &[u8]) -> Result<Option<&mut Value>, StoreError> {
48 match self.live_entry_mut(key) {
49 None => Ok(None),
50 Some(e) => match &e.value {
51 Value::ZSet(_) | Value::SmallZSetInline(_) => Ok(Some(&mut e.value)),
52 _ => Err(StoreError::WrongType),
53 },
54 }
55 }
56
57 fn drop_if_empty_zset(&mut self, key: &[u8]) {
58 let empty = match self.map.get(key).map(|e| &e.value) {
59 Some(Value::ZSet(z)) => z.len() == 0,
60 Some(Value::SmallZSetInline(z)) => z.is_empty(),
61 _ => false,
62 };
63 if empty {
64 self.remove_entry(key);
65 }
66 }
67
68 pub fn zadd_borrowed(
70 &mut self,
71 key: &[u8],
72 pairs: &[(f64, &[u8])],
73 ) -> Result<usize, StoreError> {
74 if pairs.is_empty() {
75 return Ok(0);
76 }
77 let mut added = 0usize;
78 let mut delta: i64 = 0;
79 for (score, m) in pairs {
80 match self.zadd_one(key, *m, *score)? {
81 ZaddOutcome::AddedInline => added += 1,
82 ZaddOutcome::UpdatedInline => {}
83 ZaddOutcome::AddedHeap(w) => {
84 added += 1;
85 delta += w;
86 }
87 ZaddOutcome::UpdatedHeap => {}
88 }
89 }
90 self.account_delta(key, delta);
91 Ok(added)
92 }
93
94 pub fn zadd(&mut self, key: &[u8], pairs: &[(f64, Vec<u8>)]) -> Result<usize, StoreError> {
96 let borrowed: Vec<(f64, &[u8])> =
97 pairs.iter().map(|(s, m)| (*s, m.as_slice())).collect();
98 self.zadd_borrowed(key, &borrowed)
99 }
100
101 pub fn zscore(&mut self, key: &[u8], member: &[u8]) -> Result<Option<f64>, StoreError> {
102 match self.live_entry(key) {
103 None => Ok(None),
104 Some(e) => match &e.value {
105 Value::ZSet(z) => Ok(z.by_member.get(member).copied()),
106 Value::SmallZSetInline(z) => Ok(z.score(member)),
107 _ => Err(StoreError::WrongType),
108 },
109 }
110 }
111
112 pub fn zcard(&mut self, key: &[u8]) -> Result<usize, StoreError> {
113 match self.live_entry(key) {
114 None => Ok(0),
115 Some(e) => match &e.value {
116 Value::ZSet(z) => Ok(z.len()),
117 Value::SmallZSetInline(z) => Ok(z.len()),
118 _ => Err(StoreError::WrongType),
119 },
120 }
121 }
122
123 pub fn zrem(&mut self, key: &[u8], members: &[Vec<u8>]) -> Result<usize, StoreError> {
124 let borrowed: Vec<&[u8]> = members.iter().map(Vec::as_slice).collect();
125 self.zrem_borrowed(key, &borrowed)
126 }
127
128 pub fn zrem_borrowed(
130 &mut self,
131 key: &[u8],
132 members: &[&[u8]],
133 ) -> Result<usize, StoreError> {
134 let (removed, delta) = {
135 let mut r = 0usize;
136 let mut d: i64 = 0;
137 if let Some(e) = self.live_entry_mut(key) {
138 match &mut e.value {
139 Value::ZSet(z) => {
140 let z = Arc::make_mut(z);
142 for m in members {
143 if z.remove(*m) {
144 r += 1;
145 d -= zset_member_weight(&SmallBytes::from_slice(m)) as i64;
146 }
147 }
148 }
149 Value::SmallZSetInline(z) => {
150 for m in members {
151 if z.try_remove(m) {
152 r += 1;
153 }
154 }
155 }
156 _ => return Err(StoreError::WrongType),
157 }
158 }
159 (r, d)
160 };
161 self.account_delta(key, delta);
162 self.drop_if_empty_zset(key);
163 Ok(removed)
164 }
165
166 pub fn zrank(&mut self, key: &[u8], member: &[u8]) -> Result<Option<usize>, StoreError> {
168 match self.live_entry(key) {
169 None => Ok(None),
170 Some(e) => match &e.value {
171 Value::ZSet(z) => Ok(z.ordered().position(|(m, _)| m == member)),
172 Value::SmallZSetInline(z) => {
173 let mut entries: Vec<(&[u8], f64)> = z.iter().collect();
176 entries.sort_by(|a, b| {
177 a.1.total_cmp(&b.1).then_with(|| a.0.cmp(b.0))
178 });
179 Ok(entries.iter().position(|(m, _)| *m == member))
180 }
181 _ => Err(StoreError::WrongType),
182 },
183 }
184 }
185
186 pub fn zincrby(&mut self, key: &[u8], incr: f64, member: &[u8]) -> Result<f64, StoreError> {
188 let z = self.zset_mut(key, true)?.expect("created");
189 let cur = z.by_member.get(member).copied().unwrap_or(0.0);
190 let next = cur + incr;
191 let smb = SmallBytes::from_slice(member);
192 let is_new = !z.by_member.contains_key(member);
193 z.insert(member, next);
194 let d = if is_new { zset_member_weight(&smb) as i64 } else { 0 };
195 self.account_delta(key, d);
196 Ok(next)
197 }
198
199 pub fn zrange(
201 &mut self,
202 key: &[u8],
203 start: i64,
204 stop: i64,
205 ) -> Result<Vec<(Vec<u8>, f64)>, StoreError> {
206 match self.live_entry(key) {
207 None => Ok(Vec::new()),
208 Some(e) => match &e.value {
209 Value::ZSet(z) => Ok(match range_bounds(start, stop, z.len()) {
210 None => Vec::new(),
211 Some((s, end)) => z
212 .ordered()
213 .skip(s)
214 .take(end - s + 1)
215 .map(|(m, sc)| (m.to_vec(), sc))
216 .collect(),
217 }),
218 Value::SmallZSetInline(z) => {
219 let mut entries: Vec<(Vec<u8>, f64)> =
220 z.iter().map(|(m, sc)| (m.to_vec(), sc)).collect();
221 entries.sort_by(|a, b| {
222 a.1.total_cmp(&b.1).then_with(|| a.0.cmp(&b.0))
223 });
224 Ok(match range_bounds(start, stop, entries.len()) {
225 None => Vec::new(),
226 Some((s, end)) => entries.into_iter().skip(s).take(end - s + 1).collect(),
227 })
228 }
229 _ => Err(StoreError::WrongType),
230 },
231 }
232 }
233
234 pub fn zrange_by_score(
236 &mut self,
237 key: &[u8],
238 min: ScoreBound,
239 max: ScoreBound,
240 ) -> Result<Vec<(Vec<u8>, f64)>, StoreError> {
241 match self.live_entry(key) {
242 None => Ok(Vec::new()),
243 Some(e) => match &e.value {
244 Value::ZSet(z) => Ok(z
245 .ordered()
246 .filter(|(_, sc)| min.ge_ok(*sc) && max.le_ok(*sc))
247 .map(|(m, sc)| (m.to_vec(), sc))
248 .collect()),
249 Value::SmallZSetInline(z) => {
250 let mut entries: Vec<(Vec<u8>, f64)> = z
251 .iter()
252 .filter(|(_, sc)| min.ge_ok(*sc) && max.le_ok(*sc))
253 .map(|(m, sc)| (m.to_vec(), sc))
254 .collect();
255 entries.sort_by(|a, b| {
256 a.1.total_cmp(&b.1).then_with(|| a.0.cmp(&b.0))
257 });
258 Ok(entries)
259 }
260 _ => Err(StoreError::WrongType),
261 },
262 }
263 }
264
265 pub fn zcount(
267 &mut self,
268 key: &[u8],
269 min: ScoreBound,
270 max: ScoreBound,
271 ) -> Result<usize, StoreError> {
272 match self.live_entry(key) {
273 None => Ok(0),
274 Some(e) => match &e.value {
275 Value::ZSet(z) => Ok(z
276 .ordered()
277 .filter(|(_, sc)| min.ge_ok(*sc) && max.le_ok(*sc))
278 .count()),
279 Value::SmallZSetInline(z) => Ok(z
280 .iter()
281 .filter(|(_, sc)| min.ge_ok(*sc) && max.le_ok(*sc))
282 .count()),
283 _ => Err(StoreError::WrongType),
284 },
285 }
286 }
287
288 pub fn zpopmin(
292 &mut self,
293 key: &[u8],
294 count: usize,
295 ) -> Result<Vec<(Vec<u8>, f64)>, StoreError> {
296 if count == 0 {
297 if let Some(e) = self.live_entry(key) {
300 match &e.value {
301 Value::ZSet(_) | Value::SmallZSetInline(_) => {}
302 _ => return Err(StoreError::WrongType),
303 }
304 }
305 return Ok(Vec::new());
306 }
307 let to_pop: Vec<(Vec<u8>, f64)> = match self.live_entry(key) {
311 None => return Ok(Vec::new()),
312 Some(e) => match &e.value {
313 Value::ZSet(z) => z
314 .ordered()
315 .take(count)
316 .map(|(m, sc)| (m.to_vec(), sc))
317 .collect(),
318 Value::SmallZSetInline(z) => {
319 let mut entries: Vec<(Vec<u8>, f64)> =
320 z.iter().map(|(m, sc)| (m.to_vec(), sc)).collect();
321 entries.sort_by(|a, b| a.1.total_cmp(&b.1).then_with(|| a.0.cmp(&b.0)));
322 entries.into_iter().take(count).collect()
323 }
324 _ => return Err(StoreError::WrongType),
325 },
326 };
327 if to_pop.is_empty() {
328 return Ok(to_pop);
329 }
330 let borrowed: Vec<&[u8]> = to_pop.iter().map(|(m, _)| m.as_slice()).collect();
331 self.zrem_borrowed(key, &borrowed)?;
332 Ok(to_pop)
333 }
334
335 pub fn zrem_range_by_rank(
339 &mut self,
340 key: &[u8],
341 start: i64,
342 stop: i64,
343 ) -> Result<usize, StoreError> {
344 let to_remove: Vec<Vec<u8>> = match self.live_entry(key) {
345 None => return Ok(0),
346 Some(e) => match &e.value {
347 Value::ZSet(z) => match crate::util::range_bounds(start, stop, z.len()) {
348 None => return Ok(0),
349 Some((s, end)) => z
350 .ordered()
351 .skip(s)
352 .take(end - s + 1)
353 .map(|(m, _)| m.to_vec())
354 .collect(),
355 },
356 Value::SmallZSetInline(z) => {
357 let mut entries: Vec<(Vec<u8>, f64)> =
358 z.iter().map(|(m, sc)| (m.to_vec(), sc)).collect();
359 entries.sort_by(|a, b| a.1.total_cmp(&b.1).then_with(|| a.0.cmp(&b.0)));
360 match crate::util::range_bounds(start, stop, entries.len()) {
361 None => return Ok(0),
362 Some((s, end)) => entries
363 .into_iter()
364 .skip(s)
365 .take(end - s + 1)
366 .map(|(m, _)| m)
367 .collect(),
368 }
369 }
370 _ => return Err(StoreError::WrongType),
371 },
372 };
373 if to_remove.is_empty() {
374 return Ok(0);
375 }
376 let borrowed: Vec<&[u8]> = to_remove.iter().map(Vec::as_slice).collect();
377 self.zrem_borrowed(key, &borrowed)
378 }
379
380 pub fn zrem_range_by_score(
384 &mut self,
385 key: &[u8],
386 min: ScoreBound,
387 max: ScoreBound,
388 ) -> Result<usize, StoreError> {
389 let hits = self.zrange_by_score(key, min, max)?;
392 if hits.is_empty() {
393 return Ok(0);
397 }
398 let borrowed: Vec<&[u8]> = hits.iter().map(|(m, _)| m.as_slice()).collect();
399 self.zrem_borrowed(key, &borrowed)
400 }
401
402 pub fn zrev_range_by_score(
407 &mut self,
408 key: &[u8],
409 min: ScoreBound,
410 max: ScoreBound,
411 ) -> Result<Vec<(Vec<u8>, f64)>, StoreError> {
412 let mut v = self.zrange_by_score(key, min, max)?;
413 v.reverse();
414 Ok(v)
415 }
416
417 fn zadd_one(&mut self, key: &[u8], m: &[u8], score: f64) -> Result<ZaddOutcome, StoreError> {
419 if self.zset_value_for_set(key)?.is_none() {
420 return Ok(self.zadd_create(key, m, score));
421 }
422 let v = self.zset_value_for_set(key)?.expect("present and a zset");
423 match v {
424 Value::SmallZSetInline(z) => match z.try_set(m, score) {
425 ZAddResult::Added => Ok(ZaddOutcome::AddedInline),
426 ZAddResult::Updated => Ok(ZaddOutcome::UpdatedInline),
427 ZAddResult::NoRoom => {
428 let mut promoted = small_zset::promote(z);
429 let smb = SmallBytes::from_slice(m);
430 let is_new = !promoted.by_member.contains_key(m);
431 let w = zset_member_weight(&smb) as i64;
432 promoted.insert(m, score);
433 *v = Value::ZSet(Arc::new(promoted));
434 self.reweigh_entry(key);
435 if is_new {
436 Ok(ZaddOutcome::AddedHeap(w))
437 } else {
438 Ok(ZaddOutcome::UpdatedHeap)
439 }
440 }
441 },
442 Value::ZSet(z) => {
443 let z = Arc::make_mut(z);
444 let smb = SmallBytes::from_slice(m);
445 let w = zset_member_weight(&smb) as i64;
446 if z.insert(m, score) {
447 Ok(ZaddOutcome::AddedHeap(w))
448 } else {
449 Ok(ZaddOutcome::UpdatedHeap)
450 }
451 }
452 _ => Err(StoreError::WrongType),
453 }
454 }
455
456 fn zadd_create(&mut self, key: &[u8], m: &[u8], score: f64) -> ZaddOutcome {
458 if let Some(inline) = SmallZSetData::with_one(m, score) {
459 self.insert_entry(
460 SmallBytes::from_slice(key),
461 Entry::new(Value::SmallZSetInline(inline), None),
462 );
463 ZaddOutcome::AddedInline
464 } else {
465 let mut z = ZSetData::default();
466 z.insert(m, score);
467 self.insert_entry(
468 SmallBytes::from_slice(key),
469 Entry::new(Value::ZSet(Arc::new(z)), None),
470 );
471 ZaddOutcome::AddedInline
472 }
473 }
474}
475
476enum ZaddOutcome {
477 AddedInline,
478 UpdatedInline,
479 AddedHeap(i64),
480 UpdatedHeap,
481}