Skip to main content

ember_core/keyspace/
zset.rs

1use super::*;
2
3impl Keyspace {
4    /// Adds members with scores to a sorted set, with optional ZADD flags.
5    ///
6    /// Creates the sorted set if the key doesn't exist. Returns a
7    /// `ZAddResult` containing the count for the client response and the
8    /// list of members that were actually applied (for AOF correctness).
9    /// Returns `Err(WriteError::WrongType)` on type mismatch, or
10    /// `Err(WriteError::OutOfMemory)` if the memory limit is reached.
11    pub fn zadd(
12        &mut self,
13        key: &str,
14        members: &[(f64, String)],
15        flags: &ZAddFlags,
16    ) -> Result<ZAddResult, WriteError> {
17        self.remove_if_expired(key);
18
19        let is_new = self.ensure_collection_type(key, |v| matches!(v, Value::SortedSet(_)))?;
20
21        // worst-case estimate: assume all members are new
22        let member_increase: usize = members
23            .iter()
24            .map(|(_, m)| SortedSet::estimated_member_cost(m))
25            .sum();
26        self.reserve_memory(is_new, key, SortedSet::BASE_OVERHEAD, member_increase)?;
27
28        if is_new {
29            self.insert_empty(key, Value::SortedSet(Box::default()));
30        }
31
32        let track_access = self.track_access;
33        let (count, applied) = self
34            .track_size(key, |entry| {
35                let Value::SortedSet(ref mut ss) = entry.value else {
36                    unreachable!("type verified by ensure_collection_type");
37                };
38                let mut count = 0;
39                let mut applied = Vec::new();
40                for (score, member) in members {
41                    let result = ss.add_with_flags(member, *score, flags);
42                    if result.added || result.updated {
43                        applied.push((*score, member.clone()));
44                    }
45                    if flags.ch {
46                        if result.added || result.updated {
47                            count += 1;
48                        }
49                    } else if result.added {
50                        count += 1;
51                    }
52                }
53                entry.touch(track_access);
54                (count, applied)
55            })
56            .unwrap_or_default();
57
58        // clean up if the set is still empty (e.g. XX flag on a new key)
59        if let Some(entry) = self.entries.get(key) {
60            if matches!(&entry.value, Value::SortedSet(ss) if ss.is_empty()) {
61                self.memory.remove_with_size(entry.entry_size(key));
62                self.entries.remove(key);
63            }
64        }
65
66        Ok(ZAddResult { count, applied })
67    }
68
69    /// Removes members from a sorted set. Returns the names of members
70    /// that were actually removed (for AOF correctness). Deletes the key
71    /// if the set becomes empty.
72    ///
73    /// Returns `Err(WrongType)` if the key holds a non-sorted-set value.
74    pub fn zrem(&mut self, key: &str, members: &[String]) -> Result<Vec<String>, WrongType> {
75        if self.remove_if_expired(key) {
76            return Ok(vec![]);
77        }
78
79        let Some(entry) = self.entries.get(key) else {
80            return Ok(vec![]);
81        };
82        if !matches!(entry.value, Value::SortedSet(_)) {
83            return Err(WrongType);
84        }
85
86        let Some(entry) = self.entries.get_mut(key) else {
87            return Ok(vec![]);
88        };
89        let old_entry_size = entry.entry_size(key);
90        let track_access = self.track_access;
91        let mut removed = Vec::new();
92        let mut removed_bytes: usize = 0;
93        if let Value::SortedSet(ref mut ss) = entry.value {
94            for member in members {
95                if ss.remove(member) {
96                    removed_bytes += SortedSet::estimated_member_cost(member);
97                    removed.push(member.clone());
98                }
99            }
100        }
101        entry.touch(track_access);
102
103        let is_empty = matches!(&entry.value, Value::SortedSet(ss) if ss.is_empty());
104        self.cleanup_after_remove(key, old_entry_size, is_empty, removed_bytes);
105
106        if !removed.is_empty() {
107            self.bump_version(key);
108        }
109
110        Ok(removed)
111    }
112
113    /// Returns the score for a member in a sorted set.
114    ///
115    /// Returns `Ok(None)` if the key or member doesn't exist.
116    /// Returns `Err(WrongType)` on type mismatch.
117    pub fn zscore(&mut self, key: &str, member: &str) -> Result<Option<f64>, WrongType> {
118        let Some(entry) = self.get_live_entry(key) else {
119            return Ok(None);
120        };
121        match &entry.value {
122            Value::SortedSet(ss) => Ok(ss.score(member)),
123            _ => Err(WrongType),
124        }
125    }
126
127    /// Returns the 0-based rank of a member in a sorted set (lowest score = 0).
128    ///
129    /// Returns `Ok(None)` if the key or member doesn't exist.
130    /// Returns `Err(WrongType)` on type mismatch.
131    pub fn zrank(&mut self, key: &str, member: &str) -> Result<Option<usize>, WrongType> {
132        let Some(entry) = self.get_live_entry(key) else {
133            return Ok(None);
134        };
135        match &entry.value {
136            Value::SortedSet(ss) => Ok(ss.rank(member)),
137            _ => Err(WrongType),
138        }
139    }
140
141    /// Returns a range of members from a sorted set by rank.
142    ///
143    /// Supports negative indices. If `with_scores` is true, the result
144    /// includes `(member, score)` pairs; otherwise just members.
145    /// Returns `Err(WrongType)` on type mismatch.
146    pub fn zrange(
147        &mut self,
148        key: &str,
149        start: i64,
150        stop: i64,
151    ) -> Result<Vec<(String, f64)>, WrongType> {
152        let Some(entry) = self.get_live_entry(key) else {
153            return Ok(vec![]);
154        };
155        match &entry.value {
156            Value::SortedSet(ss) => {
157                let items = ss.range_by_rank(start, stop);
158                Ok(items.into_iter().map(|(m, s)| (m.to_owned(), s)).collect())
159            }
160            _ => Err(WrongType),
161        }
162    }
163
164    /// Incrementally iterates members of a sorted set.
165    ///
166    /// Returns the next cursor and a batch of member-score pairs. A returned
167    /// cursor of `0` means the iteration is complete. Pattern matching
168    /// (MATCH) filters on member names.
169    pub fn scan_sorted_set(
170        &mut self,
171        key: &str,
172        cursor: u64,
173        count: usize,
174        pattern: Option<&str>,
175    ) -> Result<(u64, Vec<(String, f64)>), WrongType> {
176        let Some(entry) = self.get_live_entry(key) else {
177            return Ok((0, vec![]));
178        };
179        let Value::SortedSet(ref ss) = entry.value else {
180            return Err(WrongType);
181        };
182
183        let target = if count == 0 { 10 } else { count };
184        let compiled = pattern.map(GlobPattern::new);
185        let mut result = Vec::with_capacity(target);
186        let mut pos = 0u64;
187        let mut done = true;
188
189        for (member, score) in ss.iter() {
190            if pos < cursor {
191                pos += 1;
192                continue;
193            }
194            if let Some(ref pat) = compiled {
195                if !pat.matches(member) {
196                    pos += 1;
197                    continue;
198                }
199            }
200            result.push((member.to_owned(), score));
201            pos += 1;
202            if result.len() >= target {
203                done = false;
204                break;
205            }
206        }
207
208        Ok(if done { (0, result) } else { (pos, result) })
209    }
210
211    /// Returns the reverse rank of a member (highest score = 0).
212    ///
213    /// Returns `Ok(None)` if the key or member doesn't exist.
214    /// Returns `Err(WrongType)` on type mismatch.
215    pub fn zrevrank(&mut self, key: &str, member: &str) -> Result<Option<usize>, WrongType> {
216        let Some(entry) = self.get_live_entry(key) else {
217            return Ok(None);
218        };
219        match &entry.value {
220            Value::SortedSet(ss) => Ok(ss.rev_rank(member)),
221            _ => Err(WrongType),
222        }
223    }
224
225    /// Returns a range of members in reverse rank order (highest first).
226    pub fn zrevrange(
227        &mut self,
228        key: &str,
229        start: i64,
230        stop: i64,
231    ) -> Result<Vec<(String, f64)>, WrongType> {
232        let Some(entry) = self.get_live_entry(key) else {
233            return Ok(vec![]);
234        };
235        match &entry.value {
236            Value::SortedSet(ss) => {
237                let items = ss.rev_range_by_rank(start, stop);
238                Ok(items.into_iter().map(|(m, s)| (m.to_owned(), s)).collect())
239            }
240            _ => Err(WrongType),
241        }
242    }
243
244    /// Counts members with scores in the given bounds.
245    pub fn zcount(
246        &mut self,
247        key: &str,
248        min: ScoreBound,
249        max: ScoreBound,
250    ) -> Result<usize, WrongType> {
251        let Some(entry) = self.get_live_entry(key) else {
252            return Ok(0);
253        };
254        match &entry.value {
255            Value::SortedSet(ss) => Ok(ss.count_by_score(min, max)),
256            _ => Err(WrongType),
257        }
258    }
259
260    /// Increments the score of a member in a sorted set. If the member
261    /// doesn't exist, it is added with the increment as its score.
262    /// If the key doesn't exist, a new sorted set is created.
263    ///
264    /// Returns the new score.
265    pub fn zincrby(&mut self, key: &str, increment: f64, member: &str) -> Result<f64, WriteError> {
266        self.remove_if_expired(key);
267
268        let is_new = self.ensure_collection_type(key, |v| matches!(v, Value::SortedSet(_)))?;
269
270        // worst case: member is new
271        self.reserve_memory(
272            is_new,
273            key,
274            SortedSet::BASE_OVERHEAD,
275            SortedSet::estimated_member_cost(member),
276        )?;
277
278        if is_new {
279            self.insert_empty(key, Value::SortedSet(Box::default()));
280        }
281
282        let track_access = self.track_access;
283        let new_score = self
284            .track_size(key, |entry| {
285                let Value::SortedSet(ref mut ss) = entry.value else {
286                    unreachable!("type verified by ensure_collection_type");
287                };
288                let score = ss.incr(member, increment);
289                entry.touch(track_access);
290                score
291            })
292            .unwrap_or(increment);
293
294        Ok(new_score)
295    }
296
297    /// Returns members with scores in the given range, in ascending order.
298    pub fn zrangebyscore(
299        &mut self,
300        key: &str,
301        min: ScoreBound,
302        max: ScoreBound,
303        offset: usize,
304        count: Option<usize>,
305    ) -> Result<Vec<(String, f64)>, WrongType> {
306        let Some(entry) = self.get_live_entry(key) else {
307            return Ok(vec![]);
308        };
309        match &entry.value {
310            Value::SortedSet(ss) => {
311                let items = ss.range_by_score(min, max, offset, count);
312                Ok(items.into_iter().map(|(m, s)| (m.to_owned(), s)).collect())
313            }
314            _ => Err(WrongType),
315        }
316    }
317
318    /// Returns members with scores in the given range, in descending order.
319    pub fn zrevrangebyscore(
320        &mut self,
321        key: &str,
322        min: ScoreBound,
323        max: ScoreBound,
324        offset: usize,
325        count: Option<usize>,
326    ) -> Result<Vec<(String, f64)>, WrongType> {
327        let Some(entry) = self.get_live_entry(key) else {
328            return Ok(vec![]);
329        };
330        match &entry.value {
331            Value::SortedSet(ss) => {
332                let items = ss.rev_range_by_score(min, max, offset, count);
333                Ok(items.into_iter().map(|(m, s)| (m.to_owned(), s)).collect())
334            }
335            _ => Err(WrongType),
336        }
337    }
338
339    /// Removes and returns up to `count` members with the lowest scores.
340    /// Deletes the key if the set becomes empty.
341    pub fn zpopmin(&mut self, key: &str, count: usize) -> Result<Vec<(String, f64)>, WrongType> {
342        if self.remove_if_expired(key) {
343            return Ok(vec![]);
344        }
345        let Some(entry) = self.entries.get(key) else {
346            return Ok(vec![]);
347        };
348        if !matches!(entry.value, Value::SortedSet(_)) {
349            return Err(WrongType);
350        }
351
352        let Some(entry) = self.entries.get_mut(key) else {
353            return Ok(vec![]);
354        };
355        let old_entry_size = entry.entry_size(key);
356        let track_access = self.track_access;
357        let mut removed_bytes = 0usize;
358        let popped = if let Value::SortedSet(ref mut ss) = entry.value {
359            let items = ss.pop_min(count);
360            for (member, _) in &items {
361                removed_bytes += SortedSet::estimated_member_cost(member);
362            }
363            items
364        } else {
365            vec![]
366        };
367
368        if !popped.is_empty() {
369            entry.touch(track_access);
370        }
371
372        let is_empty = matches!(&entry.value, Value::SortedSet(ss) if ss.is_empty());
373        self.cleanup_after_remove(key, old_entry_size, is_empty, removed_bytes);
374
375        if !popped.is_empty() {
376            self.bump_version(key);
377        }
378
379        Ok(popped)
380    }
381
382    /// Removes and returns up to `count` members with the highest scores.
383    /// Deletes the key if the set becomes empty.
384    pub fn zpopmax(&mut self, key: &str, count: usize) -> Result<Vec<(String, f64)>, WrongType> {
385        if self.remove_if_expired(key) {
386            return Ok(vec![]);
387        }
388        let Some(entry) = self.entries.get(key) else {
389            return Ok(vec![]);
390        };
391        if !matches!(entry.value, Value::SortedSet(_)) {
392            return Err(WrongType);
393        }
394
395        let Some(entry) = self.entries.get_mut(key) else {
396            return Ok(vec![]);
397        };
398        let old_entry_size = entry.entry_size(key);
399        let track_access = self.track_access;
400        let mut removed_bytes = 0usize;
401        let popped = if let Value::SortedSet(ref mut ss) = entry.value {
402            let items = ss.pop_max(count);
403            for (member, _) in &items {
404                removed_bytes += SortedSet::estimated_member_cost(member);
405            }
406            items
407        } else {
408            vec![]
409        };
410
411        if !popped.is_empty() {
412            entry.touch(track_access);
413        }
414
415        let is_empty = matches!(&entry.value, Value::SortedSet(ss) if ss.is_empty());
416        self.cleanup_after_remove(key, old_entry_size, is_empty, removed_bytes);
417
418        if !popped.is_empty() {
419            self.bump_version(key);
420        }
421
422        Ok(popped)
423    }
424
425    /// Returns the number of members in a sorted set, or 0 if the key doesn't exist.
426    ///
427    /// Returns `Err(WrongType)` on type mismatch.
428    pub fn zcard(&mut self, key: &str) -> Result<usize, WrongType> {
429        if self.remove_if_expired(key) {
430            return Ok(0);
431        }
432        match self.entries.get(key) {
433            None => Ok(0),
434            Some(entry) => match &entry.value {
435                Value::SortedSet(ss) => Ok(ss.len()),
436                _ => Err(WrongType),
437            },
438        }
439    }
440
441    /// Returns members in the first sorted set that are not in any of the others.
442    ///
443    /// Missing keys are treated as empty sets. Results are ordered by score
444    /// then by member name.
445    pub fn zdiff(&mut self, keys: &[String]) -> Result<Vec<(String, f64)>, WrongType> {
446        if keys.is_empty() {
447            return Ok(vec![]);
448        }
449        for key in keys {
450            self.remove_if_expired(key);
451        }
452        let first: Vec<(String, f64)> = match self.entries.get(keys[0].as_str()) {
453            None => return Ok(vec![]),
454            Some(e) => match &e.value {
455                Value::SortedSet(ss) => ss.iter().map(|(m, s)| (m.to_owned(), s)).collect(),
456                _ => return Err(WrongType),
457            },
458        };
459        let mut excluded: AHashMap<String, ()> = AHashMap::new();
460        for key in &keys[1..] {
461            match self.entries.get(key.as_str()) {
462                None => {}
463                Some(e) => match &e.value {
464                    Value::SortedSet(ss) => {
465                        for (member, _) in ss.iter() {
466                            excluded.insert(member.to_owned(), ());
467                        }
468                    }
469                    _ => return Err(WrongType),
470                },
471            }
472        }
473        Ok(first
474            .into_iter()
475            .filter(|(m, _)| !excluded.contains_key(m))
476            .collect())
477    }
478
479    /// Returns members present in all of the given sorted sets, with scores summed.
480    ///
481    /// If any key is missing the result is empty. Results are ordered by
482    /// score then member name.
483    pub fn zinter(&mut self, keys: &[String]) -> Result<Vec<(String, f64)>, WrongType> {
484        if keys.is_empty() {
485            return Ok(vec![]);
486        }
487        for key in keys {
488            self.remove_if_expired(key);
489        }
490        let mut candidates: Vec<(String, f64)> = match self.entries.get(keys[0].as_str()) {
491            None => return Ok(vec![]),
492            Some(e) => match &e.value {
493                Value::SortedSet(ss) => ss.iter().map(|(m, s)| (m.to_owned(), s)).collect(),
494                _ => return Err(WrongType),
495            },
496        };
497        for key in &keys[1..] {
498            match self.entries.get(key.as_str()) {
499                None => return Ok(vec![]),
500                Some(e) => match &e.value {
501                    Value::SortedSet(ss) => {
502                        let lookup: AHashMap<String, f64> =
503                            ss.iter().map(|(m, s)| (m.to_owned(), s)).collect();
504                        candidates = candidates
505                            .into_iter()
506                            .filter_map(|(m, score)| lookup.get(&m).map(|&s| (m, score + s)))
507                            .collect();
508                    }
509                    _ => return Err(WrongType),
510                },
511            }
512        }
513        candidates.sort_by(|(am, as_), (bm, bs)| {
514            as_.partial_cmp(bs)
515                .unwrap_or(std::cmp::Ordering::Equal)
516                .then_with(|| am.cmp(bm))
517        });
518        Ok(candidates)
519    }
520
521    /// Returns the union of all given sorted sets, with scores summed across keys.
522    ///
523    /// Missing keys contribute no members. Results are ordered by score then
524    /// member name.
525    pub fn zunion(&mut self, keys: &[String]) -> Result<Vec<(String, f64)>, WrongType> {
526        if keys.is_empty() {
527            return Ok(vec![]);
528        }
529        for key in keys {
530            self.remove_if_expired(key);
531        }
532        let mut totals: AHashMap<String, f64> = AHashMap::new();
533        for key in keys {
534            match self.entries.get(key.as_str()) {
535                None => {}
536                Some(e) => match &e.value {
537                    Value::SortedSet(ss) => {
538                        for (member, score) in ss.iter() {
539                            *totals.entry(member.to_owned()).or_insert(0.0) += score;
540                        }
541                    }
542                    _ => return Err(WrongType),
543                },
544            }
545        }
546        let mut result: Vec<(String, f64)> = totals.into_iter().collect();
547        result.sort_by(|(am, as_), (bm, bs)| {
548            as_.partial_cmp(bs)
549                .unwrap_or(std::cmp::Ordering::Equal)
550                .then_with(|| am.cmp(bm))
551        });
552        Ok(result)
553    }
554
555    /// Computes the diff of sorted sets and stores the result in `dest`.
556    ///
557    /// Equivalent to calling `zdiff` then writing the result to `dest`.
558    /// Replaces any existing value at `dest`. Returns the count of stored
559    /// members and the scored pairs (for AOF persistence).
560    pub fn zdiffstore(
561        &mut self,
562        dest: &str,
563        keys: &[String],
564    ) -> Result<(usize, Vec<(f64, String)>), WrongType> {
565        let members = self.zdiff(keys)?;
566        self.zstore_result(dest, members)
567    }
568
569    /// Computes the intersection of sorted sets and stores the result in `dest`.
570    ///
571    /// Equivalent to calling `zinter` then writing the result to `dest`.
572    /// Replaces any existing value at `dest`. Returns the count of stored
573    /// members and the scored pairs (for AOF persistence).
574    pub fn zinterstore(
575        &mut self,
576        dest: &str,
577        keys: &[String],
578    ) -> Result<(usize, Vec<(f64, String)>), WrongType> {
579        let members = self.zinter(keys)?;
580        self.zstore_result(dest, members)
581    }
582
583    /// Computes the union of sorted sets and stores the result in `dest`.
584    ///
585    /// Equivalent to calling `zunion` then writing the result to `dest`.
586    /// Replaces any existing value at `dest`. Returns the count of stored
587    /// members and the scored pairs (for AOF persistence).
588    pub fn zunionstore(
589        &mut self,
590        dest: &str,
591        keys: &[String],
592    ) -> Result<(usize, Vec<(f64, String)>), WrongType> {
593        let members = self.zunion(keys)?;
594        self.zstore_result(dest, members)
595    }
596
597    /// Writes a computed sorted set result to `dest`, replacing any existing key.
598    ///
599    /// Returns the cardinality and the stored members (score, member) pairs for AOF.
600    fn zstore_result(
601        &mut self,
602        dest: &str,
603        members: Vec<(String, f64)>,
604    ) -> Result<(usize, Vec<(f64, String)>), WrongType> {
605        // remove any existing entry at dest (any type)
606        self.remove_if_expired(dest);
607        if let Some(old) = self.entries.remove(dest) {
608            self.memory.remove(dest, &old.value);
609            self.decrement_expiry_if_set(&old);
610            self.defer_drop(old.value);
611        }
612
613        let count = members.len();
614        if count == 0 {
615            return Ok((0, vec![]));
616        }
617
618        let mut ss = SortedSet::default();
619        let flags = ZAddFlags::default();
620        for (member, score) in &members {
621            ss.add_with_flags(member, *score, &flags);
622        }
623
624        let value = Value::SortedSet(Box::new(ss));
625        self.memory.add(dest, &value);
626        let entry = Entry::new(value, None);
627        self.entries.insert(CompactString::from(dest), entry);
628        self.bump_version(dest);
629
630        // return as (score, member) to match the ZAdd AOF record convention
631        let stored: Vec<(f64, String)> = members.into_iter().map(|(m, s)| (s, m)).collect();
632        Ok((count, stored))
633    }
634
635    /// Returns random member(s) from a sorted set.
636    ///
637    /// - `count = None`: return one random member as a single string (no score)
638    /// - `count > 0`: return up to count distinct members
639    /// - `count < 0`: return |count| members, allowing duplicates
640    ///
641    /// If `with_scores` is true and count is `Some`, returns `(member, Some(score))` pairs.
642    /// When `count` is `None`, the score field is always `None`.
643    pub fn zrandmember(
644        &mut self,
645        key: &str,
646        count: Option<i64>,
647        with_scores: bool,
648    ) -> Result<Vec<(String, Option<f64>)>, WrongType> {
649        let Some(entry) = self.get_live_entry(key) else {
650            return Ok(vec![]);
651        };
652        let Value::SortedSet(ref zset) = entry.value else {
653            return Err(WrongType);
654        };
655        if zset.is_empty() {
656            return Ok(vec![]);
657        }
658
659        // collect into a vec for indexed random access
660        let members: Vec<(&str, f64)> = zset.iter().collect();
661        let mut rng = rand::rng();
662
663        let result = match count {
664            None => {
665                // single random member — no score even if with_scores is set
666                use rand::seq::IteratorRandom;
667                members
668                    .iter()
669                    .choose(&mut rng)
670                    .map(|(m, _)| ((*m).to_owned(), None))
671                    .into_iter()
672                    .collect()
673            }
674            Some(n) if n > 0 => {
675                use rand::seq::IteratorRandom;
676                let n = (n as usize).min(members.len());
677                members
678                    .iter()
679                    .choose_multiple(&mut rng, n)
680                    .into_iter()
681                    .map(|(m, s)| {
682                        let score = if with_scores { Some(*s) } else { None };
683                        ((*m).to_owned(), score)
684                    })
685                    .collect()
686            }
687            Some(n) => {
688                // negative count: allow duplicates, return |n| entries
689                use rand::Rng;
690                let n = n.unsigned_abs() as usize;
691                (0..n)
692                    .map(|_| {
693                        let idx = rng.random_range(0..members.len());
694                        let (m, s) = members[idx];
695                        let score = if with_scores { Some(s) } else { None };
696                        (m.to_owned(), score)
697                    })
698                    .collect()
699            }
700        };
701
702        Ok(result)
703    }
704}
705
706#[cfg(test)]
707mod tests {
708    use super::*;
709
710    #[test]
711    fn zadd_creates_sorted_set() {
712        let mut ks = Keyspace::new();
713        let result = ks
714            .zadd(
715                "board",
716                &[(100.0, "alice".into()), (200.0, "bob".into())],
717                &ZAddFlags::default(),
718            )
719            .unwrap();
720        assert_eq!(result.count, 2);
721        assert_eq!(result.applied.len(), 2);
722        assert_eq!(ks.value_type("board"), "zset");
723    }
724
725    #[test]
726    fn zadd_updates_existing_score() {
727        let mut ks = Keyspace::new();
728        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
729            .unwrap();
730        // update score — default flags don't count updates
731        let result = ks
732            .zadd("z", &[(200.0, "alice".into())], &ZAddFlags::default())
733            .unwrap();
734        assert_eq!(result.count, 0);
735        // score was updated, so applied should have the member
736        assert_eq!(result.applied.len(), 1);
737        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(200.0));
738    }
739
740    #[test]
741    fn zadd_ch_flag_counts_changes() {
742        let mut ks = Keyspace::new();
743        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
744            .unwrap();
745        let flags = ZAddFlags {
746            ch: true,
747            ..Default::default()
748        };
749        let result = ks
750            .zadd(
751                "z",
752                &[(200.0, "alice".into()), (50.0, "bob".into())],
753                &flags,
754            )
755            .unwrap();
756        // 1 updated + 1 added = 2
757        assert_eq!(result.count, 2);
758        assert_eq!(result.applied.len(), 2);
759    }
760
761    #[test]
762    fn zadd_nx_skips_existing() {
763        let mut ks = Keyspace::new();
764        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
765            .unwrap();
766        let flags = ZAddFlags {
767            nx: true,
768            ..Default::default()
769        };
770        let result = ks.zadd("z", &[(999.0, "alice".into())], &flags).unwrap();
771        assert_eq!(result.count, 0);
772        assert!(result.applied.is_empty());
773        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(100.0));
774    }
775
776    #[test]
777    fn zadd_xx_skips_new() {
778        let mut ks = Keyspace::new();
779        let flags = ZAddFlags {
780            xx: true,
781            ..Default::default()
782        };
783        let result = ks.zadd("z", &[(100.0, "alice".into())], &flags).unwrap();
784        assert_eq!(result.count, 0);
785        assert!(result.applied.is_empty());
786        // key should be cleaned up since nothing was added
787        assert_eq!(ks.value_type("z"), "none");
788    }
789
790    #[test]
791    fn zadd_gt_only_increases() {
792        let mut ks = Keyspace::new();
793        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
794            .unwrap();
795        let flags = ZAddFlags {
796            gt: true,
797            ..Default::default()
798        };
799        ks.zadd("z", &[(50.0, "alice".into())], &flags).unwrap();
800        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(100.0));
801        ks.zadd("z", &[(200.0, "alice".into())], &flags).unwrap();
802        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(200.0));
803    }
804
805    #[test]
806    fn zadd_lt_only_decreases() {
807        let mut ks = Keyspace::new();
808        ks.zadd("z", &[(100.0, "alice".into())], &ZAddFlags::default())
809            .unwrap();
810        let flags = ZAddFlags {
811            lt: true,
812            ..Default::default()
813        };
814        ks.zadd("z", &[(200.0, "alice".into())], &flags).unwrap();
815        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(100.0));
816        ks.zadd("z", &[(50.0, "alice".into())], &flags).unwrap();
817        assert_eq!(ks.zscore("z", "alice").unwrap(), Some(50.0));
818    }
819
820    #[test]
821    fn zrem_removes_members() {
822        let mut ks = Keyspace::new();
823        ks.zadd(
824            "z",
825            &[(1.0, "a".into()), (2.0, "b".into()), (3.0, "c".into())],
826            &ZAddFlags::default(),
827        )
828        .unwrap();
829        let removed = ks
830            .zrem("z", &["a".into(), "c".into(), "nonexistent".into()])
831            .unwrap();
832        assert_eq!(removed.len(), 2);
833        assert!(removed.contains(&"a".to_owned()));
834        assert!(removed.contains(&"c".to_owned()));
835        assert_eq!(ks.zscore("z", "a").unwrap(), None);
836        assert_eq!(ks.zscore("z", "b").unwrap(), Some(2.0));
837    }
838
839    #[test]
840    fn zrem_auto_deletes_empty() {
841        let mut ks = Keyspace::new();
842        ks.zadd("z", &[(1.0, "only".into())], &ZAddFlags::default())
843            .unwrap();
844        ks.zrem("z", &["only".into()]).unwrap();
845        assert!(!ks.exists("z"));
846        assert_eq!(ks.stats().key_count, 0);
847    }
848
849    #[test]
850    fn zrem_missing_key() {
851        let mut ks = Keyspace::new();
852        assert!(ks.zrem("nope", &["a".into()]).unwrap().is_empty());
853    }
854
855    #[test]
856    fn zscore_returns_score() {
857        let mut ks = Keyspace::new();
858        ks.zadd("z", &[(42.5, "member".into())], &ZAddFlags::default())
859            .unwrap();
860        assert_eq!(ks.zscore("z", "member").unwrap(), Some(42.5));
861        assert_eq!(ks.zscore("z", "missing").unwrap(), None);
862    }
863
864    #[test]
865    fn zscore_missing_key() {
866        let mut ks = Keyspace::new();
867        assert_eq!(ks.zscore("nope", "m").unwrap(), None);
868    }
869
870    #[test]
871    fn zrank_returns_rank() {
872        let mut ks = Keyspace::new();
873        ks.zadd(
874            "z",
875            &[
876                (300.0, "c".into()),
877                (100.0, "a".into()),
878                (200.0, "b".into()),
879            ],
880            &ZAddFlags::default(),
881        )
882        .unwrap();
883        assert_eq!(ks.zrank("z", "a").unwrap(), Some(0));
884        assert_eq!(ks.zrank("z", "b").unwrap(), Some(1));
885        assert_eq!(ks.zrank("z", "c").unwrap(), Some(2));
886        assert_eq!(ks.zrank("z", "d").unwrap(), None);
887    }
888
889    #[test]
890    fn zrange_returns_range() {
891        let mut ks = Keyspace::new();
892        ks.zadd(
893            "z",
894            &[(1.0, "a".into()), (2.0, "b".into()), (3.0, "c".into())],
895            &ZAddFlags::default(),
896        )
897        .unwrap();
898
899        let all = ks.zrange("z", 0, -1).unwrap();
900        assert_eq!(
901            all,
902            vec![
903                ("a".to_owned(), 1.0),
904                ("b".to_owned(), 2.0),
905                ("c".to_owned(), 3.0),
906            ]
907        );
908
909        let middle = ks.zrange("z", 1, 1).unwrap();
910        assert_eq!(middle, vec![("b".to_owned(), 2.0)]);
911
912        let last_two = ks.zrange("z", -2, -1).unwrap();
913        assert_eq!(last_two, vec![("b".to_owned(), 2.0), ("c".to_owned(), 3.0)]);
914    }
915
916    #[test]
917    fn zrange_missing_key() {
918        let mut ks = Keyspace::new();
919        assert!(ks.zrange("nope", 0, -1).unwrap().is_empty());
920    }
921
922    #[test]
923    fn zadd_on_string_key_returns_wrongtype() {
924        let mut ks = Keyspace::new();
925        ks.set("s".into(), Bytes::from("val"), None, false, false);
926        assert!(ks
927            .zadd("s", &[(1.0, "m".into())], &ZAddFlags::default())
928            .is_err());
929    }
930
931    #[test]
932    fn zrem_on_string_key_returns_wrongtype() {
933        let mut ks = Keyspace::new();
934        ks.set("s".into(), Bytes::from("val"), None, false, false);
935        assert!(ks.zrem("s", &["m".into()]).is_err());
936    }
937
938    #[test]
939    fn zscore_on_list_key_returns_wrongtype() {
940        let mut ks = Keyspace::new();
941        ks.rpush("l", &[Bytes::from("item")]).unwrap();
942        assert!(ks.zscore("l", "m").is_err());
943    }
944
945    #[test]
946    fn zrank_on_string_key_returns_wrongtype() {
947        let mut ks = Keyspace::new();
948        ks.set("s".into(), Bytes::from("val"), None, false, false);
949        assert!(ks.zrank("s", "m").is_err());
950    }
951
952    #[test]
953    fn zrange_on_string_key_returns_wrongtype() {
954        let mut ks = Keyspace::new();
955        ks.set("s".into(), Bytes::from("val"), None, false, false);
956        assert!(ks.zrange("s", 0, -1).is_err());
957    }
958
959    #[test]
960    fn sorted_set_memory_tracked() {
961        let mut ks = Keyspace::new();
962        let before = ks.stats().used_bytes;
963        ks.zadd("z", &[(1.0, "alice".into())], &ZAddFlags::default())
964            .unwrap();
965        let after_add = ks.stats().used_bytes;
966        assert!(after_add > before);
967
968        ks.zadd("z", &[(2.0, "bob".into())], &ZAddFlags::default())
969            .unwrap();
970        let after_second = ks.stats().used_bytes;
971        assert!(after_second > after_add);
972
973        ks.zrem("z", &["alice".into()]).unwrap();
974        let after_remove = ks.stats().used_bytes;
975        assert!(after_remove < after_second);
976    }
977
978    #[test]
979    fn zrem_returns_actually_removed_members() {
980        let mut ks = Keyspace::new();
981        ks.zadd(
982            "z",
983            &[(1.0, "a".into()), (2.0, "b".into())],
984            &ZAddFlags::default(),
985        )
986        .unwrap();
987        // "a" exists, "ghost" doesn't — only "a" should be in the result
988        let removed = ks.zrem("z", &["a".into(), "ghost".into()]).unwrap();
989        assert_eq!(removed, vec!["a".to_owned()]);
990    }
991
992    #[test]
993    fn zcard_returns_count() {
994        let mut ks = Keyspace::new();
995        ks.zadd(
996            "z",
997            &[(1.0, "a".into()), (2.0, "b".into())],
998            &ZAddFlags::default(),
999        )
1000        .unwrap();
1001        assert_eq!(ks.zcard("z").unwrap(), 2);
1002    }
1003
1004    #[test]
1005    fn zcard_missing_key_returns_zero() {
1006        let mut ks = Keyspace::new();
1007        assert_eq!(ks.zcard("missing").unwrap(), 0);
1008    }
1009
1010    #[test]
1011    fn zcard_on_string_key_returns_wrongtype() {
1012        let mut ks = Keyspace::new();
1013        ks.set("s".into(), Bytes::from("val"), None, false, false);
1014        assert!(ks.zcard("s").is_err());
1015    }
1016
1017    // --- scan_sorted_set ---
1018
1019    #[test]
1020    fn scan_zset_returns_all() {
1021        let mut ks = Keyspace::new();
1022        ks.zadd(
1023            "z",
1024            &[(1.0, "a".into()), (2.0, "b".into()), (3.0, "c".into())],
1025            &ZAddFlags::default(),
1026        )
1027        .unwrap();
1028        let (cursor, members) = ks.scan_sorted_set("z", 0, 100, None).unwrap();
1029        assert_eq!(cursor, 0);
1030        assert_eq!(members.len(), 3);
1031        // sorted set iteration is score-ordered
1032        assert_eq!(members[0].0, "a");
1033        assert_eq!(members[2].0, "c");
1034    }
1035
1036    #[test]
1037    fn scan_zset_missing_key() {
1038        let mut ks = Keyspace::new();
1039        let (cursor, members) = ks.scan_sorted_set("missing", 0, 10, None).unwrap();
1040        assert_eq!(cursor, 0);
1041        assert!(members.is_empty());
1042    }
1043
1044    #[test]
1045    fn scan_zset_wrong_type() {
1046        let mut ks = Keyspace::new();
1047        ks.set("z".into(), Bytes::from("string"), None, false, false);
1048        assert!(ks.scan_sorted_set("z", 0, 10, None).is_err());
1049    }
1050
1051    #[test]
1052    fn scan_zset_with_pattern() {
1053        let mut ks = Keyspace::new();
1054        ks.zadd(
1055            "z",
1056            &[
1057                (1.0, "player:1".into()),
1058                (2.0, "player:2".into()),
1059                (3.0, "enemy:1".into()),
1060            ],
1061            &ZAddFlags::default(),
1062        )
1063        .unwrap();
1064        let (_, members) = ks.scan_sorted_set("z", 0, 100, Some("player:*")).unwrap();
1065        assert_eq!(members.len(), 2);
1066        assert!(members.iter().all(|(m, _)| m.starts_with("player:")));
1067    }
1068
1069    #[test]
1070    fn scan_zset_pagination() {
1071        let mut ks = Keyspace::new();
1072        let items: Vec<(f64, String)> = (0..20).map(|i| (i as f64, format!("m{i}"))).collect();
1073        ks.zadd("z", &items, &ZAddFlags::default()).unwrap();
1074
1075        let mut collected = Vec::new();
1076        let mut cursor = 0u64;
1077        loop {
1078            let (next, batch) = ks.scan_sorted_set("z", cursor, 5, None).unwrap();
1079            collected.extend(batch);
1080            if next == 0 {
1081                break;
1082            }
1083            cursor = next;
1084        }
1085        assert_eq!(collected.len(), 20);
1086    }
1087
1088    #[test]
1089    fn zadd_rejects_when_memory_full() {
1090        let config = ShardConfig {
1091            max_memory: Some(150),
1092            eviction_policy: EvictionPolicy::NoEviction,
1093            ..ShardConfig::default()
1094        };
1095        let mut ks = Keyspace::with_config(config);
1096
1097        assert_eq!(
1098            ks.set("a".into(), Bytes::from("val"), None, false, false),
1099            SetResult::Ok
1100        );
1101
1102        let result = ks.zadd("z", &[(1.0, "member".into())], &ZAddFlags::default());
1103        assert!(matches!(result, Err(WriteError::OutOfMemory)));
1104
1105        // original key should be untouched
1106        assert!(ks.exists("a"));
1107    }
1108
1109    #[test]
1110    fn zdiff_returns_members_unique_to_first() {
1111        let mut ks = Keyspace::new();
1112        ks.zadd(
1113            "a",
1114            &[(1.0, "x".into()), (2.0, "y".into()), (3.0, "z".into())],
1115            &ZAddFlags::default(),
1116        )
1117        .unwrap();
1118        ks.zadd(
1119            "b",
1120            &[(1.0, "y".into()), (1.0, "w".into())],
1121            &ZAddFlags::default(),
1122        )
1123        .unwrap();
1124
1125        let keys = vec!["a".to_owned(), "b".to_owned()];
1126        let diff = ks.zdiff(&keys).unwrap();
1127        let members: Vec<&str> = diff.iter().map(|(m, _)| m.as_str()).collect();
1128        assert!(members.contains(&"x"));
1129        assert!(members.contains(&"z"));
1130        assert!(!members.contains(&"y"));
1131    }
1132
1133    #[test]
1134    fn zdiff_with_missing_second_key_returns_all() {
1135        let mut ks = Keyspace::new();
1136        ks.zadd("a", &[(1.0, "x".into())], &ZAddFlags::default())
1137            .unwrap();
1138        let keys = vec!["a".to_owned(), "missing".to_owned()];
1139        let diff = ks.zdiff(&keys).unwrap();
1140        assert_eq!(diff.len(), 1);
1141        assert_eq!(diff[0].0, "x");
1142    }
1143
1144    #[test]
1145    fn zinter_returns_common_members_with_summed_scores() {
1146        let mut ks = Keyspace::new();
1147        ks.zadd(
1148            "a",
1149            &[(1.0, "x".into()), (2.0, "y".into())],
1150            &ZAddFlags::default(),
1151        )
1152        .unwrap();
1153        ks.zadd(
1154            "b",
1155            &[(3.0, "x".into()), (4.0, "z".into())],
1156            &ZAddFlags::default(),
1157        )
1158        .unwrap();
1159
1160        let keys = vec!["a".to_owned(), "b".to_owned()];
1161        let inter = ks.zinter(&keys).unwrap();
1162        assert_eq!(inter.len(), 1);
1163        assert_eq!(inter[0].0, "x");
1164        assert!((inter[0].1 - 4.0).abs() < f64::EPSILON); // 1.0 + 3.0
1165    }
1166
1167    #[test]
1168    fn zinter_empty_when_no_common_members() {
1169        let mut ks = Keyspace::new();
1170        ks.zadd("a", &[(1.0, "x".into())], &ZAddFlags::default())
1171            .unwrap();
1172        ks.zadd("b", &[(1.0, "y".into())], &ZAddFlags::default())
1173            .unwrap();
1174        let keys = vec!["a".to_owned(), "b".to_owned()];
1175        assert!(ks.zinter(&keys).unwrap().is_empty());
1176    }
1177
1178    #[test]
1179    fn zunion_combines_all_members_with_summed_scores() {
1180        let mut ks = Keyspace::new();
1181        ks.zadd(
1182            "a",
1183            &[(1.0, "x".into()), (2.0, "y".into())],
1184            &ZAddFlags::default(),
1185        )
1186        .unwrap();
1187        ks.zadd(
1188            "b",
1189            &[(3.0, "x".into()), (4.0, "z".into())],
1190            &ZAddFlags::default(),
1191        )
1192        .unwrap();
1193
1194        let keys = vec!["a".to_owned(), "b".to_owned()];
1195        let union = ks.zunion(&keys).unwrap();
1196        assert_eq!(union.len(), 3); // x, y, z
1197        let x = union.iter().find(|(m, _)| m == "x").unwrap();
1198        assert!((x.1 - 4.0).abs() < f64::EPSILON); // 1.0 + 3.0
1199    }
1200
1201    #[test]
1202    fn zdiff_wrong_type_returns_error() {
1203        let mut ks = Keyspace::new();
1204        ks.set("s".into(), Bytes::from("v"), None, false, false);
1205        let keys = vec!["s".to_owned()];
1206        assert!(ks.zdiff(&keys).is_err());
1207    }
1208
1209    // --- zrandmember ---
1210
1211    #[test]
1212    fn zrandmember_no_count_returns_single_member() {
1213        let mut ks = Keyspace::new();
1214        ks.zadd(
1215            "z",
1216            &[(1.0, "a".into()), (2.0, "b".into()), (3.0, "c".into())],
1217            &ZAddFlags::default(),
1218        )
1219        .unwrap();
1220        let result = ks.zrandmember("z", None, false).unwrap();
1221        assert_eq!(result.len(), 1);
1222        assert!(["a", "b", "c"].contains(&result[0].0.as_str()));
1223        // no count means no score
1224        assert!(result[0].1.is_none());
1225    }
1226
1227    #[test]
1228    fn zrandmember_positive_count_distinct() {
1229        let mut ks = Keyspace::new();
1230        ks.zadd(
1231            "z",
1232            &[(1.0, "a".into()), (2.0, "b".into()), (3.0, "c".into())],
1233            &ZAddFlags::default(),
1234        )
1235        .unwrap();
1236        let result = ks.zrandmember("z", Some(2), false).unwrap();
1237        assert_eq!(result.len(), 2);
1238        for (m, s) in &result {
1239            assert!(["a", "b", "c"].contains(&m.as_str()));
1240            assert!(s.is_none());
1241        }
1242        // distinct
1243        let unique: std::collections::HashSet<_> = result.iter().map(|(m, _)| m).collect();
1244        assert_eq!(unique.len(), 2);
1245    }
1246
1247    #[test]
1248    fn zrandmember_positive_count_capped_at_set_size() {
1249        let mut ks = Keyspace::new();
1250        ks.zadd(
1251            "z",
1252            &[(1.0, "a".into()), (2.0, "b".into())],
1253            &ZAddFlags::default(),
1254        )
1255        .unwrap();
1256        let result = ks.zrandmember("z", Some(10), false).unwrap();
1257        assert_eq!(result.len(), 2);
1258    }
1259
1260    #[test]
1261    fn zrandmember_negative_count_allows_duplicates() {
1262        let mut ks = Keyspace::new();
1263        ks.zadd("z", &[(1.0, "only".into())], &ZAddFlags::default())
1264            .unwrap();
1265        let result = ks.zrandmember("z", Some(-5), false).unwrap();
1266        assert_eq!(result.len(), 5);
1267        assert!(result.iter().all(|(m, _)| m == "only"));
1268    }
1269
1270    #[test]
1271    fn zrandmember_with_scores() {
1272        let mut ks = Keyspace::new();
1273        ks.zadd(
1274            "z",
1275            &[(1.0, "a".into()), (2.0, "b".into())],
1276            &ZAddFlags::default(),
1277        )
1278        .unwrap();
1279        let result = ks.zrandmember("z", Some(2), true).unwrap();
1280        assert_eq!(result.len(), 2);
1281        for (m, s) in &result {
1282            assert!(["a", "b"].contains(&m.as_str()));
1283            assert!(s.is_some());
1284        }
1285    }
1286
1287    #[test]
1288    fn zrandmember_missing_key_returns_empty() {
1289        let mut ks = Keyspace::new();
1290        assert!(ks.zrandmember("missing", None, false).unwrap().is_empty());
1291        assert!(ks.zrandmember("missing", Some(5), true).unwrap().is_empty());
1292    }
1293
1294    #[test]
1295    fn zrandmember_wrong_type_returns_error() {
1296        let mut ks = Keyspace::new();
1297        ks.set("s".into(), Bytes::from("val"), None, false, false);
1298        assert!(ks.zrandmember("s", None, false).is_err());
1299    }
1300
1301    // --- zdiffstore / zinterstore / zunionstore ---
1302
1303    #[test]
1304    fn zdiffstore_basic() {
1305        let mut ks = Keyspace::new();
1306        ks.zadd(
1307            "a",
1308            &[(1.0, "x".into()), (2.0, "y".into()), (3.0, "z".into())],
1309            &ZAddFlags::default(),
1310        )
1311        .unwrap();
1312        ks.zadd("b", &[(1.0, "y".into())], &ZAddFlags::default())
1313            .unwrap();
1314
1315        let keys = vec!["a".to_owned(), "b".to_owned()];
1316        let (count, _stored) = ks.zdiffstore("dest", &keys).unwrap();
1317        assert_eq!(count, 2); // x, z
1318
1319        // dest key should now hold a sorted set
1320        assert_eq!(ks.value_type("dest"), "zset");
1321        let members = ks.zrange("dest", 0, -1).unwrap();
1322        let names: Vec<&str> = members.iter().map(|(m, _)| m.as_str()).collect();
1323        assert!(names.contains(&"x"));
1324        assert!(names.contains(&"z"));
1325        assert!(!names.contains(&"y"));
1326    }
1327
1328    #[test]
1329    fn zinterstore_basic() {
1330        let mut ks = Keyspace::new();
1331        ks.zadd(
1332            "a",
1333            &[(1.0, "x".into()), (2.0, "y".into())],
1334            &ZAddFlags::default(),
1335        )
1336        .unwrap();
1337        ks.zadd(
1338            "b",
1339            &[(3.0, "x".into()), (4.0, "z".into())],
1340            &ZAddFlags::default(),
1341        )
1342        .unwrap();
1343
1344        let keys = vec!["a".to_owned(), "b".to_owned()];
1345        let (count, _stored) = ks.zinterstore("dest", &keys).unwrap();
1346        assert_eq!(count, 1); // only x is in both
1347
1348        let members = ks.zrange("dest", 0, -1).unwrap();
1349        assert_eq!(members.len(), 1);
1350        assert_eq!(members[0].0, "x");
1351        // score is summed: 1.0 + 3.0 = 4.0
1352        assert!((members[0].1 - 4.0).abs() < f64::EPSILON);
1353    }
1354
1355    #[test]
1356    fn zunionstore_basic() {
1357        let mut ks = Keyspace::new();
1358        ks.zadd(
1359            "a",
1360            &[(1.0, "x".into()), (2.0, "y".into())],
1361            &ZAddFlags::default(),
1362        )
1363        .unwrap();
1364        ks.zadd(
1365            "b",
1366            &[(3.0, "x".into()), (4.0, "z".into())],
1367            &ZAddFlags::default(),
1368        )
1369        .unwrap();
1370
1371        let keys = vec!["a".to_owned(), "b".to_owned()];
1372        let (count, _stored) = ks.zunionstore("dest", &keys).unwrap();
1373        assert_eq!(count, 3); // x, y, z
1374
1375        let members = ks.zrange("dest", 0, -1).unwrap();
1376        assert_eq!(members.len(), 3);
1377        let x = members.iter().find(|(m, _)| m == "x").unwrap();
1378        // score is summed: 1.0 + 3.0 = 4.0
1379        assert!((x.1 - 4.0).abs() < f64::EPSILON);
1380    }
1381
1382    #[test]
1383    fn zstore_overwrites_existing_dest() {
1384        let mut ks = Keyspace::new();
1385        ks.zadd("a", &[(1.0, "x".into())], &ZAddFlags::default())
1386            .unwrap();
1387        // put something at dest first
1388        ks.set("dest".into(), Bytes::from("old"), None, false, false);
1389
1390        let keys = vec!["a".to_owned()];
1391        let (count, _) = ks.zunionstore("dest", &keys).unwrap();
1392        assert_eq!(count, 1);
1393        assert_eq!(ks.value_type("dest"), "zset");
1394    }
1395
1396    #[test]
1397    fn zstore_empty_result_removes_dest() {
1398        let mut ks = Keyspace::new();
1399        ks.zadd("a", &[(1.0, "x".into())], &ZAddFlags::default())
1400            .unwrap();
1401        ks.zadd("b", &[(1.0, "x".into())], &ZAddFlags::default())
1402            .unwrap();
1403        // intersection of disjoint sets is empty
1404        ks.zadd("dest", &[(5.0, "old".into())], &ZAddFlags::default())
1405            .unwrap();
1406
1407        let keys = vec!["a".to_owned(), "b".to_owned()];
1408        // zdiff of a and b where b has all of a's members → empty
1409        let keys_diff = vec!["a".to_owned(), "b".to_owned()];
1410        let (count, _) = ks.zdiffstore("dest", &keys_diff).unwrap();
1411        assert_eq!(count, 0);
1412        // dest should be removed when result is empty
1413        assert_eq!(ks.value_type("dest"), "none");
1414
1415        // also check zinterstore on disjoint sets
1416        ks.zadd("c", &[(1.0, "p".into())], &ZAddFlags::default())
1417            .unwrap();
1418        ks.zadd("d", &[(1.0, "q".into())], &ZAddFlags::default())
1419            .unwrap();
1420        ks.zadd("dest2", &[(5.0, "old".into())], &ZAddFlags::default())
1421            .unwrap();
1422        let keys_inter = vec!["c".to_owned(), "d".to_owned()];
1423        let (count2, _) = ks.zinterstore("dest2", &keys_inter).unwrap();
1424        assert_eq!(count2, 0);
1425        assert_eq!(ks.value_type("dest2"), "none");
1426        _ = keys;
1427    }
1428
1429    #[test]
1430    fn zstore_wrong_type_returns_error() {
1431        let mut ks = Keyspace::new();
1432        ks.set("s".into(), Bytes::from("val"), None, false, false);
1433        let keys = vec!["s".to_owned()];
1434        assert!(ks.zunionstore("dest", &keys).is_err());
1435        assert!(ks.zinterstore("dest", &keys).is_err());
1436        assert!(ks.zdiffstore("dest", &keys).is_err());
1437    }
1438}