Skip to main content

ember_core/keyspace/
list.rs

1use super::*;
2
3/// Resolves a signed index into a VecDeque position.
4/// Negative indices count from the end (-1 = last element).
5/// Returns `None` if the resolved index is out of bounds.
6fn resolve_index(index: i64, len: usize) -> Option<usize> {
7    let resolved = if index < 0 {
8        (len as i64).checked_add(index)?
9    } else {
10        index
11    };
12    if resolved < 0 || resolved >= len as i64 {
13        None
14    } else {
15        Some(resolved as usize)
16    }
17}
18
19impl Keyspace {
20    /// Pushes one or more values to the head (left) of a list.
21    ///
22    /// Creates the list if the key doesn't exist. Returns `Err(WriteError::WrongType)`
23    /// if the key exists but holds a non-list value, or
24    /// `Err(WriteError::OutOfMemory)` if the memory limit is reached.
25    /// Returns the new length on success.
26    pub fn lpush(&mut self, key: &str, values: &[Bytes]) -> Result<usize, WriteError> {
27        self.list_push(key, values, true)
28    }
29
30    /// Pushes one or more values to the tail (right) of a list.
31    ///
32    /// Creates the list if the key doesn't exist. Returns `Err(WriteError::WrongType)`
33    /// if the key exists but holds a non-list value, or
34    /// `Err(WriteError::OutOfMemory)` if the memory limit is reached.
35    /// Returns the new length on success.
36    pub fn rpush(&mut self, key: &str, values: &[Bytes]) -> Result<usize, WriteError> {
37        self.list_push(key, values, false)
38    }
39
40    /// Pops a value from the head (left) of a list.
41    ///
42    /// Returns `Ok(None)` if the key doesn't exist. Removes the key if
43    /// the list becomes empty. Returns `Err(WrongType)` on type mismatch.
44    pub fn lpop(&mut self, key: &str) -> Result<Option<Bytes>, WrongType> {
45        self.list_pop(key, true)
46    }
47
48    /// Pops a value from the tail (right) of a list.
49    ///
50    /// Returns `Ok(None)` if the key doesn't exist. Removes the key if
51    /// the list becomes empty. Returns `Err(WrongType)` on type mismatch.
52    pub fn rpop(&mut self, key: &str) -> Result<Option<Bytes>, WrongType> {
53        self.list_pop(key, false)
54    }
55
56    /// Pops up to `count` values from the head of a list.
57    ///
58    /// Returns `Ok(None)` if the key doesn't exist or is empty. Returns
59    /// `Ok(Some(items))` with 1–count elements otherwise. Removes the key
60    /// when the list becomes empty. Returns `Err(WrongType)` on type mismatch.
61    pub fn lpop_count(&mut self, key: &str, count: usize) -> Result<Option<Vec<Bytes>>, WrongType> {
62        let mut items = Vec::with_capacity(count);
63        for _ in 0..count {
64            match self.lpop(key)? {
65                Some(v) => items.push(v),
66                None => break,
67            }
68        }
69        if items.is_empty() {
70            Ok(None)
71        } else {
72            Ok(Some(items))
73        }
74    }
75
76    /// Pops up to `count` values from the tail of a list.
77    ///
78    /// Returns `Ok(None)` if the key doesn't exist or is empty. Returns
79    /// `Ok(Some(items))` with 1–count elements otherwise. Removes the key
80    /// when the list becomes empty. Returns `Err(WrongType)` on type mismatch.
81    pub fn rpop_count(&mut self, key: &str, count: usize) -> Result<Option<Vec<Bytes>>, WrongType> {
82        let mut items = Vec::with_capacity(count);
83        for _ in 0..count {
84            match self.rpop(key)? {
85                Some(v) => items.push(v),
86                None => break,
87            }
88        }
89        if items.is_empty() {
90            Ok(None)
91        } else {
92            Ok(Some(items))
93        }
94    }
95
96    /// Returns a range of elements from a list by index.
97    ///
98    /// Supports negative indices (e.g. -1 = last element). Out-of-bounds
99    /// indices are clamped to the list boundaries. Returns `Err(WrongType)`
100    /// on type mismatch. Missing keys return an empty vec.
101    pub fn lrange(&mut self, key: &str, start: i64, stop: i64) -> Result<Vec<Bytes>, WrongType> {
102        let Some(entry) = self.get_live_entry(key) else {
103            return Ok(vec![]);
104        };
105        match &entry.value {
106            Value::List(deque) => {
107                let len = deque.len() as i64;
108                let (s, e) = normalize_range(start, stop, len);
109                // empty range: inverted indices or both out of bounds
110                if s > e {
111                    return Ok(vec![]);
112                }
113                Ok(deque
114                    .iter()
115                    .skip(s as usize)
116                    .take((e - s + 1) as usize)
117                    .cloned()
118                    .collect())
119            }
120            _ => Err(WrongType),
121        }
122    }
123
124    /// Returns the length of a list, or 0 if the key doesn't exist.
125    ///
126    /// Returns `Err(WrongType)` on type mismatch.
127    pub fn llen(&mut self, key: &str) -> Result<usize, WrongType> {
128        if self.remove_if_expired(key) {
129            return Ok(0);
130        }
131        match self.entries.get(key) {
132            None => Ok(0),
133            Some(entry) => match &entry.value {
134                Value::List(deque) => Ok(deque.len()),
135                _ => Err(WrongType),
136            },
137        }
138    }
139
140    /// Returns the element at `index` in the list, or `None` if out of bounds.
141    ///
142    /// Negative indices count from the tail (-1 = last element).
143    /// Returns `Err(WrongType)` if the key holds a non-list value.
144    pub fn lindex(&mut self, key: &str, index: i64) -> Result<Option<Bytes>, WrongType> {
145        let Some(entry) = self.get_live_entry(key) else {
146            return Ok(None);
147        };
148        match &entry.value {
149            Value::List(deque) => {
150                Ok(resolve_index(index, deque.len()).and_then(|i| deque.get(i).cloned()))
151            }
152            _ => Err(WrongType),
153        }
154    }
155
156    /// Sets the element at `index` to `value`.
157    ///
158    /// Negative indices count from the tail. Returns an error if the key
159    /// doesn't exist, the index is out of range, or the key holds the
160    /// wrong type.
161    pub fn lset(&mut self, key: &str, index: i64, value: Bytes) -> Result<(), LsetError> {
162        self.remove_if_expired(key);
163
164        match self.entries.get(key) {
165            None => return Err(LsetError::NoSuchKey),
166            Some(e) if !matches!(e.value, Value::List(_)) => return Err(LsetError::WrongType),
167            _ => {}
168        }
169
170        // mutate in a scoped block so entry borrow is released for memory updates
171        let (old_len, new_len) = {
172            let entry = self.entries.get_mut(key).expect("verified above");
173            let Value::List(ref mut deque) = entry.value else {
174                unreachable!("type checked");
175            };
176            let pos = resolve_index(index, deque.len()).ok_or(LsetError::IndexOutOfRange)?;
177            let elem = deque.get_mut(pos).ok_or(LsetError::IndexOutOfRange)?;
178            let old_len = elem.len();
179            let new_len = value.len();
180            *elem = value;
181            (old_len, new_len)
182        };
183
184        let delta = new_len as isize - old_len as isize;
185        {
186            let entry = self.entries.get_mut(key).expect("key confirmed to exist");
187            entry.touch(self.track_access);
188            if delta > 0 {
189                entry.cached_value_size += delta as u32;
190            } else if delta < 0 {
191                entry.cached_value_size = entry.cached_value_size.saturating_sub((-delta) as u32);
192            }
193        }
194        self.bump_version(key);
195        if delta > 0 {
196            self.memory.grow_by(delta as usize);
197        } else if delta < 0 {
198            self.memory.shrink_by((-delta) as usize);
199        }
200
201        Ok(())
202    }
203
204    /// Trims a list to keep only elements in the range [start, stop].
205    ///
206    /// Supports negative indices. If the range is empty or inverted, the
207    /// key is deleted. Missing keys are a no-op.
208    pub fn ltrim(&mut self, key: &str, start: i64, stop: i64) -> Result<(), WrongType> {
209        if self.remove_if_expired(key) {
210            return Ok(());
211        }
212
213        let (len, old_entry_size) = match self.entries.get(key) {
214            None => return Ok(()),
215            Some(e) => match &e.value {
216                Value::List(d) => (d.len(), e.entry_size(key)),
217                _ => return Err(WrongType),
218            },
219        };
220
221        let (s, e) = normalize_range(start, stop, len as i64);
222
223        let (is_empty, new_value_size) = {
224            let entry = self.entries.get_mut(key).expect("verified above");
225            {
226                let Value::List(ref mut deque) = entry.value else {
227                    unreachable!("type checked");
228                };
229                if s > e || s >= len as i64 {
230                    deque.clear();
231                } else {
232                    let s_usize = s as usize;
233                    let e_usize = (e as usize).min(len - 1);
234                    deque.truncate(e_usize + 1);
235                    deque.drain(..s_usize);
236                }
237            }
238
239            let empty = matches!(&entry.value, Value::List(d) if d.is_empty());
240            entry.touch(self.track_access);
241            let nvs = if !empty {
242                let size = memory::value_size(&entry.value);
243                entry.cached_value_size = size as u32;
244                size
245            } else {
246                0
247            };
248
249            (empty, nvs)
250        };
251
252        self.bump_version(key);
253
254        if is_empty {
255            if let Some(removed) = self.entries.remove(key) {
256                self.decrement_expiry_if_set(&removed);
257            }
258            self.memory.remove_with_size(old_entry_size);
259        } else {
260            let new_entry_size = key.len() + new_value_size + memory::ENTRY_OVERHEAD;
261            self.memory.adjust(old_entry_size, new_entry_size);
262        }
263
264        Ok(())
265    }
266
267    /// Inserts `value` before or after the first occurrence of `pivot`.
268    ///
269    /// Returns the new list length on success, -1 if the pivot was not
270    /// found, or 0 if the key doesn't exist.
271    pub fn linsert(
272        &mut self,
273        key: &str,
274        before: bool,
275        pivot: &[u8],
276        value: Bytes,
277    ) -> Result<i64, WriteError> {
278        self.remove_if_expired(key);
279
280        match self.entries.get(key) {
281            None => return Ok(0),
282            Some(e) if !matches!(e.value, Value::List(_)) => {
283                return Err(WriteError::WrongType);
284            }
285            _ => {}
286        }
287
288        // find pivot position
289        let pivot_pos = {
290            let entry = self.entries.get(key).expect("exists");
291            match &entry.value {
292                Value::List(deque) => deque.iter().position(|e| e.as_ref() == pivot),
293                _ => unreachable!("type checked"),
294            }
295        };
296
297        let pivot_pos = match pivot_pos {
298            Some(p) => p,
299            None => return Ok(-1),
300        };
301
302        let element_cost = memory::VECDEQUE_ELEMENT_OVERHEAD + value.len();
303        if !self.enforce_memory_limit(element_cost) {
304            return Err(WriteError::OutOfMemory);
305        }
306
307        let new_len = {
308            let entry = match self.entries.get_mut(key) {
309                Some(e) => e,
310                None => return Ok(0), // evicted during memory enforcement
311            };
312            let Value::List(ref mut deque) = entry.value else {
313                unreachable!("type checked");
314            };
315
316            let insert_pos = if before { pivot_pos } else { pivot_pos + 1 };
317            if insert_pos > deque.len() {
318                return Ok(-1); // safety: pivot position no longer valid
319            }
320
321            deque.insert(insert_pos, value);
322            let len = deque.len() as i64;
323            entry.touch(self.track_access);
324            entry.cached_value_size += element_cost as u32;
325            len
326        };
327
328        self.bump_version(key);
329        self.memory.grow_by(element_cost);
330        Ok(new_len)
331    }
332
333    /// Removes elements equal to `value` from the list.
334    ///
335    /// - `count > 0`: remove first `count` matches from head
336    /// - `count < 0`: remove last `|count|` matches from tail
337    /// - `count == 0`: remove all matches
338    ///
339    /// Returns the number of elements removed.
340    pub fn lrem(&mut self, key: &str, count: i64, value: &[u8]) -> Result<usize, WrongType> {
341        if self.remove_if_expired(key) {
342            return Ok(0);
343        }
344
345        match self.entries.get(key) {
346            None => return Ok(0),
347            Some(e) if !matches!(e.value, Value::List(_)) => return Err(WrongType),
348            _ => {}
349        }
350
351        let old_entry_size = self.entries.get(key).expect("exists").entry_size(key);
352
353        let (removed_count, removed_bytes, is_empty) = {
354            let entry = self.entries.get_mut(key).expect("exists");
355            let Value::List(ref mut deque) = entry.value else {
356                unreachable!("type checked");
357            };
358
359            let limit = if count == 0 {
360                usize::MAX
361            } else {
362                count.unsigned_abs() as usize
363            };
364            let mut indices: Vec<usize> = Vec::new();
365
366            if count >= 0 {
367                for (i, elem) in deque.iter().enumerate() {
368                    if elem.as_ref() == value {
369                        indices.push(i);
370                        if indices.len() >= limit {
371                            break;
372                        }
373                    }
374                }
375            } else {
376                for (i, elem) in deque.iter().enumerate().rev() {
377                    if elem.as_ref() == value {
378                        indices.push(i);
379                        if indices.len() >= limit {
380                            break;
381                        }
382                    }
383                }
384            }
385
386            let n = indices.len();
387            if n == 0 {
388                return Ok(0);
389            }
390
391            // remove from highest index first to preserve lower indices
392            indices.sort_unstable_by(|a, b| b.cmp(a));
393
394            let mut bytes = 0usize;
395            for idx in &indices {
396                if let Some(elem) = deque.remove(*idx) {
397                    bytes += elem.len() + memory::VECDEQUE_ELEMENT_OVERHEAD;
398                }
399            }
400
401            let empty = deque.is_empty();
402            entry.touch(self.track_access);
403
404            if !empty {
405                entry.cached_value_size =
406                    (entry.cached_value_size as usize).saturating_sub(bytes) as u32;
407            }
408
409            (n, bytes, empty)
410        };
411
412        self.bump_version(key);
413
414        if is_empty {
415            if let Some(removed) = self.entries.remove(key) {
416                self.decrement_expiry_if_set(&removed);
417            }
418            self.memory.remove_with_size(old_entry_size);
419        } else {
420            self.memory.shrink_by(removed_bytes);
421        }
422
423        Ok(removed_count)
424    }
425
426    /// Finds the positions of elements matching `element` in the list.
427    ///
428    /// - `rank`: 1-based. Positive scans from head, negative from tail.
429    ///   Rank 2 means "start from the 2nd match".
430    /// - `count`: 0 returns all matches, N returns at most N.
431    /// - `maxlen`: limits the scan to the first (or last) N entries.
432    pub fn lpos(
433        &mut self,
434        key: &str,
435        element: &[u8],
436        rank: i64,
437        count: usize,
438        maxlen: usize,
439    ) -> Result<Vec<i64>, WrongType> {
440        let Some(entry) = self.get_live_entry(key) else {
441            return Ok(vec![]);
442        };
443
444        let Value::List(ref deque) = entry.value else {
445            return Err(WrongType);
446        };
447
448        let len = deque.len();
449        let limit = if maxlen == 0 { len } else { maxlen.min(len) };
450        let rank = if rank == 0 { 1 } else { rank };
451        let forward = rank > 0;
452        let skip = (rank.unsigned_abs() as usize).saturating_sub(1);
453
454        let mut skipped = 0usize;
455        let mut results = Vec::new();
456
457        if forward {
458            for i in 0..limit {
459                if deque.get(i).map(|e| e.as_ref() == element).unwrap_or(false) {
460                    if skipped < skip {
461                        skipped += 1;
462                    } else {
463                        results.push(i as i64);
464                        if count > 0 && results.len() >= count {
465                            break;
466                        }
467                    }
468                }
469            }
470        } else {
471            let start = if maxlen == 0 {
472                0
473            } else {
474                len.saturating_sub(maxlen)
475            };
476            for i in (start..len).rev() {
477                if deque.get(i).map(|e| e.as_ref() == element).unwrap_or(false) {
478                    if skipped < skip {
479                        skipped += 1;
480                    } else {
481                        results.push(i as i64);
482                        if count > 0 && results.len() >= count {
483                            break;
484                        }
485                    }
486                }
487            }
488        }
489
490        Ok(results)
491    }
492
493    /// Internal push implementation shared by lpush/rpush.
494    pub(super) fn list_push(
495        &mut self,
496        key: &str,
497        values: &[Bytes],
498        left: bool,
499    ) -> Result<usize, WriteError> {
500        self.remove_if_expired(key);
501
502        let is_new = self.ensure_collection_type(key, |v| matches!(v, Value::List(_)))?;
503
504        let element_increase: usize = values
505            .iter()
506            .map(|v| memory::VECDEQUE_ELEMENT_OVERHEAD + v.len())
507            .sum();
508        self.reserve_memory(
509            is_new,
510            key,
511            memory::VECDEQUE_BASE_OVERHEAD,
512            element_increase,
513        )?;
514
515        if is_new {
516            self.insert_empty(key, Value::List(VecDeque::new()));
517        }
518
519        // safe: key was just inserted or confirmed to exist
520        let len = {
521            let entry = self.entries.get_mut(key).unwrap();
522            let Value::List(ref mut deque) = entry.value else {
523                unreachable!("type verified by ensure_collection_type");
524            };
525            for val in values {
526                if left {
527                    deque.push_front(val.clone());
528                } else {
529                    deque.push_back(val.clone());
530                }
531            }
532            let len = deque.len();
533            entry.touch(self.track_access);
534            entry.cached_value_size += element_increase as u32;
535            len
536        };
537
538        self.bump_version(key);
539        // apply the known delta — no need to rescan the entire list
540        self.memory.grow_by(element_increase);
541
542        Ok(len)
543    }
544
545    /// Atomically pops a value from `source` and pushes it to `destination`.
546    ///
547    /// `src_left` controls whether to pop from the head (true) or tail (false)
548    /// of the source. `dst_left` controls whether to push to the head (true)
549    /// or tail (false) of the destination. Source and destination may be the
550    /// same key (rotate). Returns `Ok(None)` if the source list is missing.
551    pub fn lmove(
552        &mut self,
553        source: &str,
554        destination: &str,
555        src_left: bool,
556        dst_left: bool,
557    ) -> Result<Option<Bytes>, WriteError> {
558        if self.remove_if_expired(source) {
559            return Ok(None);
560        }
561        match self.entries.get(source) {
562            None => return Ok(None),
563            Some(e) if !matches!(e.value, Value::List(_)) => return Err(WriteError::WrongType),
564            _ => {}
565        }
566
567        if source != destination {
568            self.remove_if_expired(destination);
569            if let Some(e) = self.entries.get(destination) {
570                if !matches!(e.value, Value::List(_)) {
571                    return Err(WriteError::WrongType);
572                }
573            }
574        }
575
576        let popped = self.list_pop(source, src_left).map_err(WriteError::from)?;
577        let Some(value) = popped else {
578            return Ok(None);
579        };
580        self.list_push(destination, std::slice::from_ref(&value), dst_left)?;
581        Ok(Some(value))
582    }
583
584    /// Internal pop implementation shared by lpop/rpop.
585    pub(super) fn list_pop(&mut self, key: &str, left: bool) -> Result<Option<Bytes>, WrongType> {
586        if self.remove_if_expired(key) {
587            return Ok(None);
588        }
589
590        let (popped, is_empty) = {
591            let Some(entry) = self.entries.get_mut(key) else {
592                return Ok(None);
593            };
594            if !matches!(entry.value, Value::List(_)) {
595                return Err(WrongType);
596            }
597
598            let Value::List(ref mut deque) = entry.value else {
599                // checked above
600                return Err(WrongType);
601            };
602            let popped = if left {
603                deque.pop_front()
604            } else {
605                deque.pop_back()
606            };
607            entry.touch(self.track_access);
608
609            let is_empty = matches!(&entry.value, Value::List(d) if d.is_empty());
610            (popped, is_empty)
611        };
612
613        self.bump_version(key);
614        if let Some(ref elem) = popped {
615            let element_size = elem.len() + memory::VECDEQUE_ELEMENT_OVERHEAD;
616            let old_size = if is_empty {
617                // the list held exactly this one element — compute exact old size
618                // from constants rather than scanning
619                key.len() + memory::ENTRY_OVERHEAD + memory::VECDEQUE_BASE_OVERHEAD + element_size
620            } else {
621                0 // unused in the non-empty branch of cleanup_after_remove
622            };
623            self.cleanup_after_remove(key, old_size, is_empty, element_size);
624        }
625
626        Ok(popped)
627    }
628}
629
630#[cfg(test)]
631mod tests {
632    use super::*;
633
634    #[test]
635    fn lpush_creates_list() {
636        let mut ks = Keyspace::new();
637        let len = ks
638            .lpush("list", &[Bytes::from("a"), Bytes::from("b")])
639            .unwrap();
640        assert_eq!(len, 2);
641        // lpush pushes each to front, so order is b, a
642        let items = ks.lrange("list", 0, -1).unwrap();
643        assert_eq!(items, vec![Bytes::from("b"), Bytes::from("a")]);
644    }
645
646    #[test]
647    fn rpush_creates_list() {
648        let mut ks = Keyspace::new();
649        let len = ks
650            .rpush("list", &[Bytes::from("a"), Bytes::from("b")])
651            .unwrap();
652        assert_eq!(len, 2);
653        let items = ks.lrange("list", 0, -1).unwrap();
654        assert_eq!(items, vec![Bytes::from("a"), Bytes::from("b")]);
655    }
656
657    #[test]
658    fn push_to_existing_list() {
659        let mut ks = Keyspace::new();
660        ks.rpush("list", &[Bytes::from("a")]).unwrap();
661        let len = ks.rpush("list", &[Bytes::from("b")]).unwrap();
662        assert_eq!(len, 2);
663    }
664
665    #[test]
666    fn lpop_returns_front() {
667        let mut ks = Keyspace::new();
668        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
669            .unwrap();
670        assert_eq!(ks.lpop("list").unwrap(), Some(Bytes::from("a")));
671        assert_eq!(ks.lpop("list").unwrap(), Some(Bytes::from("b")));
672        assert_eq!(ks.lpop("list").unwrap(), None); // empty, key deleted
673    }
674
675    #[test]
676    fn rpop_returns_back() {
677        let mut ks = Keyspace::new();
678        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
679            .unwrap();
680        assert_eq!(ks.rpop("list").unwrap(), Some(Bytes::from("b")));
681    }
682
683    #[test]
684    fn pop_from_missing_key() {
685        let mut ks = Keyspace::new();
686        assert_eq!(ks.lpop("nope").unwrap(), None);
687        assert_eq!(ks.rpop("nope").unwrap(), None);
688    }
689
690    #[test]
691    fn empty_list_auto_deletes_key() {
692        let mut ks = Keyspace::new();
693        ks.rpush("list", &[Bytes::from("only")]).unwrap();
694        ks.lpop("list").unwrap();
695        assert!(!ks.exists("list"));
696        assert_eq!(ks.stats().key_count, 0);
697        assert_eq!(ks.stats().used_bytes, 0);
698    }
699
700    #[test]
701    fn lrange_negative_indices() {
702        let mut ks = Keyspace::new();
703        ks.rpush(
704            "list",
705            &[Bytes::from("a"), Bytes::from("b"), Bytes::from("c")],
706        )
707        .unwrap();
708        // -2 to -1 => last two elements
709        let items = ks.lrange("list", -2, -1).unwrap();
710        assert_eq!(items, vec![Bytes::from("b"), Bytes::from("c")]);
711    }
712
713    #[test]
714    fn lrange_out_of_bounds_clamps() {
715        let mut ks = Keyspace::new();
716        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
717            .unwrap();
718        let items = ks.lrange("list", -100, 100).unwrap();
719        assert_eq!(items, vec![Bytes::from("a"), Bytes::from("b")]);
720    }
721
722    #[test]
723    fn lrange_missing_key_returns_empty() {
724        let mut ks = Keyspace::new();
725        assert_eq!(ks.lrange("nope", 0, -1).unwrap(), Vec::<Bytes>::new());
726    }
727
728    #[test]
729    fn llen_returns_length() {
730        let mut ks = Keyspace::new();
731        assert_eq!(ks.llen("nope").unwrap(), 0);
732        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
733            .unwrap();
734        assert_eq!(ks.llen("list").unwrap(), 2);
735    }
736
737    #[test]
738    fn list_memory_tracked_on_push_pop() {
739        let mut ks = Keyspace::new();
740        ks.rpush("list", &[Bytes::from("hello")]).unwrap();
741        let after_push = ks.stats().used_bytes;
742        assert!(after_push > 0);
743
744        ks.rpush("list", &[Bytes::from("world")]).unwrap();
745        let after_second = ks.stats().used_bytes;
746        assert!(after_second > after_push);
747
748        ks.lpop("list").unwrap();
749        let after_pop = ks.stats().used_bytes;
750        assert!(after_pop < after_second);
751    }
752
753    #[test]
754    fn lpush_on_string_key_returns_wrongtype() {
755        let mut ks = Keyspace::new();
756        ks.set("s".into(), Bytes::from("val"), None, false, false);
757        assert!(ks.lpush("s", &[Bytes::from("nope")]).is_err());
758    }
759
760    #[test]
761    fn lrange_on_string_key_returns_wrongtype() {
762        let mut ks = Keyspace::new();
763        ks.set("s".into(), Bytes::from("val"), None, false, false);
764        assert!(ks.lrange("s", 0, -1).is_err());
765    }
766
767    #[test]
768    fn llen_on_string_key_returns_wrongtype() {
769        let mut ks = Keyspace::new();
770        ks.set("s".into(), Bytes::from("val"), None, false, false);
771        assert!(ks.llen("s").is_err());
772    }
773
774    #[test]
775    fn lpush_rejects_when_memory_full() {
776        let config = ShardConfig {
777            max_memory: Some(150),
778            eviction_policy: EvictionPolicy::NoEviction,
779            ..ShardConfig::default()
780        };
781        let mut ks = Keyspace::with_config(config);
782
783        // first key eats up most of the budget
784        assert_eq!(
785            ks.set("a".into(), Bytes::from("val"), None, false, false),
786            SetResult::Ok
787        );
788
789        // lpush should be rejected — not enough room
790        let result = ks.lpush("list", &[Bytes::from("big-value-here")]);
791        assert_eq!(result, Err(WriteError::OutOfMemory));
792
793        // original key should be untouched
794        assert!(ks.exists("a"));
795    }
796
797    #[test]
798    fn rpush_rejects_when_memory_full() {
799        let config = ShardConfig {
800            max_memory: Some(150),
801            eviction_policy: EvictionPolicy::NoEviction,
802            ..ShardConfig::default()
803        };
804        let mut ks = Keyspace::with_config(config);
805
806        assert_eq!(
807            ks.set("a".into(), Bytes::from("val"), None, false, false),
808            SetResult::Ok
809        );
810
811        let result = ks.rpush("list", &[Bytes::from("big-value-here")]);
812        assert_eq!(result, Err(WriteError::OutOfMemory));
813    }
814
815    #[test]
816    fn lpush_evicts_under_lru_policy() {
817        // "a" entry = 1 + 3 + 128 = 132 bytes.
818        // list entry = 4 + 24 + (4 + 32) + 128 = 192 bytes.
819        // effective limit = 250 * 90 / 100 = 225. fits one entry but not both,
820        // so lpush should evict "a" to make room for the list.
821        let config = ShardConfig {
822            max_memory: Some(250),
823            eviction_policy: EvictionPolicy::AllKeysLru,
824            ..ShardConfig::default()
825        };
826        let mut ks = Keyspace::with_config(config);
827
828        assert_eq!(
829            ks.set("a".into(), Bytes::from("val"), None, false, false),
830            SetResult::Ok
831        );
832
833        // should evict "a" to make room for the list
834        assert!(ks.lpush("list", &[Bytes::from("item")]).is_ok());
835        assert!(!ks.exists("a"));
836    }
837
838    #[test]
839    fn list_auto_deleted_when_empty() {
840        let mut ks = Keyspace::new();
841        ks.lpush("list", &[Bytes::from("a"), Bytes::from("b")])
842            .unwrap();
843        assert_eq!(ks.len(), 1);
844
845        // pop all elements
846        let _ = ks.lpop("list");
847        let _ = ks.lpop("list");
848
849        // list should be auto-deleted
850        assert_eq!(ks.len(), 0);
851        assert!(!ks.exists("list"));
852    }
853
854    #[test]
855    fn lrange_inverted_start_stop_returns_empty() {
856        let mut ks = Keyspace::new();
857        ks.lpush(
858            "list",
859            &[Bytes::from("a"), Bytes::from("b"), Bytes::from("c")],
860        )
861        .unwrap();
862
863        // start > stop with positive indices
864        let result = ks.lrange("list", 2, 0).unwrap();
865        assert!(result.is_empty());
866    }
867
868    #[test]
869    fn lrange_large_stop_clamps_to_len() {
870        let mut ks = Keyspace::new();
871        ks.lpush("list", &[Bytes::from("a"), Bytes::from("b")])
872            .unwrap();
873
874        // large indices should clamp to list bounds
875        let result = ks.lrange("list", 0, 1000).unwrap();
876        assert_eq!(result.len(), 2);
877    }
878
879    // --- lindex ---
880
881    #[test]
882    fn lindex_returns_element() {
883        let mut ks = Keyspace::new();
884        ks.rpush(
885            "list",
886            &[Bytes::from("a"), Bytes::from("b"), Bytes::from("c")],
887        )
888        .unwrap();
889        assert_eq!(ks.lindex("list", 0).unwrap(), Some(Bytes::from("a")));
890        assert_eq!(ks.lindex("list", 2).unwrap(), Some(Bytes::from("c")));
891    }
892
893    #[test]
894    fn lindex_negative() {
895        let mut ks = Keyspace::new();
896        ks.rpush(
897            "list",
898            &[Bytes::from("a"), Bytes::from("b"), Bytes::from("c")],
899        )
900        .unwrap();
901        assert_eq!(ks.lindex("list", -1).unwrap(), Some(Bytes::from("c")));
902        assert_eq!(ks.lindex("list", -3).unwrap(), Some(Bytes::from("a")));
903    }
904
905    #[test]
906    fn lindex_out_of_bounds() {
907        let mut ks = Keyspace::new();
908        ks.rpush("list", &[Bytes::from("a")]).unwrap();
909        assert_eq!(ks.lindex("list", 1).unwrap(), None);
910        assert_eq!(ks.lindex("list", -2).unwrap(), None);
911        assert_eq!(ks.lindex("list", 100).unwrap(), None);
912    }
913
914    #[test]
915    fn lindex_missing_key() {
916        let mut ks = Keyspace::new();
917        assert_eq!(ks.lindex("nope", 0).unwrap(), None);
918    }
919
920    #[test]
921    fn lindex_wrong_type() {
922        let mut ks = Keyspace::new();
923        ks.set("s".into(), Bytes::from("val"), None, false, false);
924        assert!(ks.lindex("s", 0).is_err());
925    }
926
927    // --- lset ---
928
929    #[test]
930    fn lset_replaces_element() {
931        let mut ks = Keyspace::new();
932        ks.rpush(
933            "list",
934            &[Bytes::from("a"), Bytes::from("b"), Bytes::from("c")],
935        )
936        .unwrap();
937        ks.lset("list", 1, Bytes::from("B")).unwrap();
938        let items = ks.lrange("list", 0, -1).unwrap();
939        assert_eq!(
940            items,
941            vec![Bytes::from("a"), Bytes::from("B"), Bytes::from("c")]
942        );
943    }
944
945    #[test]
946    fn lset_negative_index() {
947        let mut ks = Keyspace::new();
948        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
949            .unwrap();
950        ks.lset("list", -1, Bytes::from("B")).unwrap();
951        assert_eq!(ks.lindex("list", 1).unwrap(), Some(Bytes::from("B")));
952    }
953
954    #[test]
955    fn lset_out_of_range() {
956        let mut ks = Keyspace::new();
957        ks.rpush("list", &[Bytes::from("a")]).unwrap();
958        assert_eq!(
959            ks.lset("list", 5, Bytes::from("x")),
960            Err(LsetError::IndexOutOfRange)
961        );
962    }
963
964    #[test]
965    fn lset_missing_key() {
966        let mut ks = Keyspace::new();
967        assert_eq!(
968            ks.lset("nope", 0, Bytes::from("x")),
969            Err(LsetError::NoSuchKey)
970        );
971    }
972
973    #[test]
974    fn lset_wrong_type() {
975        let mut ks = Keyspace::new();
976        ks.set("s".into(), Bytes::from("val"), None, false, false);
977        assert_eq!(ks.lset("s", 0, Bytes::from("x")), Err(LsetError::WrongType));
978    }
979
980    #[test]
981    fn lset_tracks_memory() {
982        let mut ks = Keyspace::new();
983        ks.rpush("list", &[Bytes::from("short")]).unwrap();
984        let before = ks.stats().used_bytes;
985        ks.lset("list", 0, Bytes::from("a much longer value"))
986            .unwrap();
987        let after = ks.stats().used_bytes;
988        assert!(after > before);
989    }
990
991    // --- ltrim ---
992
993    #[test]
994    fn ltrim_keeps_range() {
995        let mut ks = Keyspace::new();
996        ks.rpush(
997            "list",
998            &[
999                Bytes::from("a"),
1000                Bytes::from("b"),
1001                Bytes::from("c"),
1002                Bytes::from("d"),
1003            ],
1004        )
1005        .unwrap();
1006        ks.ltrim("list", 1, 2).unwrap();
1007        let items = ks.lrange("list", 0, -1).unwrap();
1008        assert_eq!(items, vec![Bytes::from("b"), Bytes::from("c")]);
1009    }
1010
1011    #[test]
1012    fn ltrim_negative_indices() {
1013        let mut ks = Keyspace::new();
1014        ks.rpush(
1015            "list",
1016            &[Bytes::from("a"), Bytes::from("b"), Bytes::from("c")],
1017        )
1018        .unwrap();
1019        ks.ltrim("list", -2, -1).unwrap();
1020        let items = ks.lrange("list", 0, -1).unwrap();
1021        assert_eq!(items, vec![Bytes::from("b"), Bytes::from("c")]);
1022    }
1023
1024    #[test]
1025    fn ltrim_inverted_range_deletes() {
1026        let mut ks = Keyspace::new();
1027        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
1028            .unwrap();
1029        ks.ltrim("list", 2, 0).unwrap();
1030        assert!(!ks.exists("list"));
1031        assert_eq!(ks.stats().key_count, 0);
1032    }
1033
1034    #[test]
1035    fn ltrim_out_of_bounds_clamps() {
1036        let mut ks = Keyspace::new();
1037        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
1038            .unwrap();
1039        ks.ltrim("list", -100, 100).unwrap();
1040        assert_eq!(ks.llen("list").unwrap(), 2);
1041    }
1042
1043    #[test]
1044    fn ltrim_missing_key_noop() {
1045        let mut ks = Keyspace::new();
1046        ks.ltrim("nope", 0, 1).unwrap();
1047    }
1048
1049    #[test]
1050    fn ltrim_wrong_type() {
1051        let mut ks = Keyspace::new();
1052        ks.set("s".into(), Bytes::from("val"), None, false, false);
1053        assert!(ks.ltrim("s", 0, 1).is_err());
1054    }
1055
1056    #[test]
1057    fn ltrim_tracks_memory() {
1058        let mut ks = Keyspace::new();
1059        ks.rpush(
1060            "list",
1061            &[
1062                Bytes::from("a"),
1063                Bytes::from("b"),
1064                Bytes::from("c"),
1065                Bytes::from("d"),
1066            ],
1067        )
1068        .unwrap();
1069        let before = ks.stats().used_bytes;
1070        ks.ltrim("list", 0, 0).unwrap(); // keep only first element
1071        let after = ks.stats().used_bytes;
1072        assert!(after < before);
1073    }
1074
1075    // --- linsert ---
1076
1077    #[test]
1078    fn linsert_before() {
1079        let mut ks = Keyspace::new();
1080        ks.rpush("list", &[Bytes::from("a"), Bytes::from("c")])
1081            .unwrap();
1082        let len = ks.linsert("list", true, b"c", Bytes::from("b")).unwrap();
1083        assert_eq!(len, 3);
1084        let items = ks.lrange("list", 0, -1).unwrap();
1085        assert_eq!(
1086            items,
1087            vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]
1088        );
1089    }
1090
1091    #[test]
1092    fn linsert_after() {
1093        let mut ks = Keyspace::new();
1094        ks.rpush("list", &[Bytes::from("a"), Bytes::from("c")])
1095            .unwrap();
1096        let len = ks.linsert("list", false, b"a", Bytes::from("b")).unwrap();
1097        assert_eq!(len, 3);
1098        let items = ks.lrange("list", 0, -1).unwrap();
1099        assert_eq!(
1100            items,
1101            vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]
1102        );
1103    }
1104
1105    #[test]
1106    fn linsert_pivot_not_found() {
1107        let mut ks = Keyspace::new();
1108        ks.rpush("list", &[Bytes::from("a")]).unwrap();
1109        assert_eq!(
1110            ks.linsert("list", true, b"missing", Bytes::from("x"))
1111                .unwrap(),
1112            -1
1113        );
1114    }
1115
1116    #[test]
1117    fn linsert_missing_key() {
1118        let mut ks = Keyspace::new();
1119        assert_eq!(ks.linsert("nope", true, b"a", Bytes::from("x")).unwrap(), 0);
1120    }
1121
1122    #[test]
1123    fn linsert_wrong_type() {
1124        let mut ks = Keyspace::new();
1125        ks.set("s".into(), Bytes::from("val"), None, false, false);
1126        assert!(ks.linsert("s", true, b"a", Bytes::from("x")).is_err());
1127    }
1128
1129    // --- lrem ---
1130
1131    #[test]
1132    fn lrem_positive_count_from_head() {
1133        let mut ks = Keyspace::new();
1134        ks.rpush(
1135            "list",
1136            &[
1137                Bytes::from("a"),
1138                Bytes::from("b"),
1139                Bytes::from("a"),
1140                Bytes::from("c"),
1141                Bytes::from("a"),
1142            ],
1143        )
1144        .unwrap();
1145        let removed = ks.lrem("list", 2, b"a").unwrap();
1146        assert_eq!(removed, 2);
1147        let items = ks.lrange("list", 0, -1).unwrap();
1148        assert_eq!(
1149            items,
1150            vec![Bytes::from("b"), Bytes::from("c"), Bytes::from("a")]
1151        );
1152    }
1153
1154    #[test]
1155    fn lrem_negative_count_from_tail() {
1156        let mut ks = Keyspace::new();
1157        ks.rpush(
1158            "list",
1159            &[
1160                Bytes::from("a"),
1161                Bytes::from("b"),
1162                Bytes::from("a"),
1163                Bytes::from("c"),
1164                Bytes::from("a"),
1165            ],
1166        )
1167        .unwrap();
1168        let removed = ks.lrem("list", -2, b"a").unwrap();
1169        assert_eq!(removed, 2);
1170        let items = ks.lrange("list", 0, -1).unwrap();
1171        assert_eq!(
1172            items,
1173            vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")]
1174        );
1175    }
1176
1177    #[test]
1178    fn lrem_zero_removes_all() {
1179        let mut ks = Keyspace::new();
1180        ks.rpush(
1181            "list",
1182            &[Bytes::from("a"), Bytes::from("b"), Bytes::from("a")],
1183        )
1184        .unwrap();
1185        let removed = ks.lrem("list", 0, b"a").unwrap();
1186        assert_eq!(removed, 2);
1187        let items = ks.lrange("list", 0, -1).unwrap();
1188        assert_eq!(items, vec![Bytes::from("b")]);
1189    }
1190
1191    #[test]
1192    fn lrem_no_matches() {
1193        let mut ks = Keyspace::new();
1194        ks.rpush("list", &[Bytes::from("a"), Bytes::from("b")])
1195            .unwrap();
1196        assert_eq!(ks.lrem("list", 0, b"x").unwrap(), 0);
1197    }
1198
1199    #[test]
1200    fn lrem_missing_key() {
1201        let mut ks = Keyspace::new();
1202        assert_eq!(ks.lrem("nope", 0, b"x").unwrap(), 0);
1203    }
1204
1205    #[test]
1206    fn lrem_wrong_type() {
1207        let mut ks = Keyspace::new();
1208        ks.set("s".into(), Bytes::from("val"), None, false, false);
1209        assert!(ks.lrem("s", 0, b"x").is_err());
1210    }
1211
1212    #[test]
1213    fn lrem_empties_list_deletes_key() {
1214        let mut ks = Keyspace::new();
1215        ks.rpush("list", &[Bytes::from("a"), Bytes::from("a")])
1216            .unwrap();
1217        ks.lrem("list", 0, b"a").unwrap();
1218        assert!(!ks.exists("list"));
1219        assert_eq!(ks.stats().key_count, 0);
1220        assert_eq!(ks.stats().used_bytes, 0);
1221    }
1222
1223    // --- lpos ---
1224
1225    #[test]
1226    fn lpos_finds_first_match() {
1227        let mut ks = Keyspace::new();
1228        ks.rpush(
1229            "list",
1230            &[Bytes::from("a"), Bytes::from("b"), Bytes::from("a")],
1231        )
1232        .unwrap();
1233        let result = ks.lpos("list", b"a", 1, 1, 0).unwrap();
1234        assert_eq!(result, vec![0]);
1235    }
1236
1237    #[test]
1238    fn lpos_with_count_all() {
1239        let mut ks = Keyspace::new();
1240        ks.rpush(
1241            "list",
1242            &[
1243                Bytes::from("a"),
1244                Bytes::from("b"),
1245                Bytes::from("a"),
1246                Bytes::from("c"),
1247                Bytes::from("a"),
1248            ],
1249        )
1250        .unwrap();
1251        let result = ks.lpos("list", b"a", 1, 0, 0).unwrap();
1252        assert_eq!(result, vec![0, 2, 4]);
1253    }
1254
1255    #[test]
1256    fn lpos_with_rank() {
1257        let mut ks = Keyspace::new();
1258        ks.rpush(
1259            "list",
1260            &[Bytes::from("a"), Bytes::from("b"), Bytes::from("a")],
1261        )
1262        .unwrap();
1263        // rank 2 = second match from head
1264        let result = ks.lpos("list", b"a", 2, 1, 0).unwrap();
1265        assert_eq!(result, vec![2]);
1266    }
1267
1268    #[test]
1269    fn lpos_negative_rank() {
1270        let mut ks = Keyspace::new();
1271        ks.rpush(
1272            "list",
1273            &[Bytes::from("a"), Bytes::from("b"), Bytes::from("a")],
1274        )
1275        .unwrap();
1276        // rank -1 = last match
1277        let result = ks.lpos("list", b"a", -1, 1, 0).unwrap();
1278        assert_eq!(result, vec![2]);
1279    }
1280
1281    #[test]
1282    fn lpos_with_maxlen() {
1283        let mut ks = Keyspace::new();
1284        ks.rpush(
1285            "list",
1286            &[
1287                Bytes::from("b"),
1288                Bytes::from("a"),
1289                Bytes::from("b"),
1290                Bytes::from("a"),
1291            ],
1292        )
1293        .unwrap();
1294        // maxlen=2: only scan first 2 elements
1295        let result = ks.lpos("list", b"a", 1, 0, 2).unwrap();
1296        assert_eq!(result, vec![1]);
1297    }
1298
1299    #[test]
1300    fn lpos_no_match() {
1301        let mut ks = Keyspace::new();
1302        ks.rpush("list", &[Bytes::from("a")]).unwrap();
1303        let result = ks.lpos("list", b"x", 1, 1, 0).unwrap();
1304        assert!(result.is_empty());
1305    }
1306
1307    #[test]
1308    fn lpos_missing_key() {
1309        let mut ks = Keyspace::new();
1310        let result = ks.lpos("nope", b"a", 1, 1, 0).unwrap();
1311        assert!(result.is_empty());
1312    }
1313
1314    #[test]
1315    fn lpos_wrong_type() {
1316        let mut ks = Keyspace::new();
1317        ks.set("s".into(), Bytes::from("val"), None, false, false);
1318        assert!(ks.lpos("s", b"a", 1, 1, 0).is_err());
1319    }
1320
1321    #[test]
1322    fn lmove_left_to_right() {
1323        let mut ks = Keyspace::new();
1324        ks.rpush(
1325            "src",
1326            &[Bytes::from("a"), Bytes::from("b"), Bytes::from("c")],
1327        )
1328        .unwrap();
1329        let moved = ks.lmove("src", "dst", true, false).unwrap();
1330        assert_eq!(moved, Some(Bytes::from("a")));
1331        // src should now be [b, c]
1332        assert_eq!(
1333            ks.lrange("src", 0, -1).unwrap(),
1334            vec![Bytes::from("b"), Bytes::from("c")]
1335        );
1336        // dst should be [a]
1337        assert_eq!(ks.lrange("dst", 0, -1).unwrap(), vec![Bytes::from("a")]);
1338    }
1339
1340    #[test]
1341    fn lmove_rotate_same_key() {
1342        let mut ks = Keyspace::new();
1343        ks.rpush("q", &[Bytes::from("1"), Bytes::from("2"), Bytes::from("3")])
1344            .unwrap();
1345        // rotate: pop from left, push to right
1346        let moved = ks.lmove("q", "q", true, false).unwrap();
1347        assert_eq!(moved, Some(Bytes::from("1")));
1348        let items = ks.lrange("q", 0, -1).unwrap();
1349        assert_eq!(
1350            items,
1351            vec![Bytes::from("2"), Bytes::from("3"), Bytes::from("1")]
1352        );
1353    }
1354
1355    #[test]
1356    fn lmove_missing_source() {
1357        let mut ks = Keyspace::new();
1358        let moved = ks.lmove("missing", "dst", true, true).unwrap();
1359        assert_eq!(moved, None);
1360        assert!(!ks.exists("dst"));
1361    }
1362
1363    #[test]
1364    fn lmove_wrong_type_returns_error() {
1365        let mut ks = Keyspace::new();
1366        ks.set("s".into(), Bytes::from("hello"), None, false, false);
1367        assert!(ks.lmove("s", "dst", true, true).is_err());
1368    }
1369
1370    #[test]
1371    fn lpop_count_pops_multiple_from_head() {
1372        let mut ks = Keyspace::new();
1373        ks.rpush(
1374            "l",
1375            &[
1376                Bytes::from("a"),
1377                Bytes::from("b"),
1378                Bytes::from("c"),
1379                Bytes::from("d"),
1380            ],
1381        )
1382        .unwrap();
1383        let result = ks.lpop_count("l", 3).unwrap();
1384        assert_eq!(
1385            result,
1386            Some(vec![Bytes::from("a"), Bytes::from("b"), Bytes::from("c")])
1387        );
1388        // one item left
1389        assert_eq!(ks.llen("l").unwrap(), 1);
1390    }
1391
1392    #[test]
1393    fn rpop_count_pops_multiple_from_tail() {
1394        let mut ks = Keyspace::new();
1395        ks.rpush("l", &[Bytes::from("a"), Bytes::from("b"), Bytes::from("c")])
1396            .unwrap();
1397        let result = ks.rpop_count("l", 2).unwrap();
1398        assert_eq!(result, Some(vec![Bytes::from("c"), Bytes::from("b")]));
1399        assert_eq!(ks.llen("l").unwrap(), 1);
1400    }
1401
1402    #[test]
1403    fn lpop_count_missing_key_returns_none() {
1404        let mut ks = Keyspace::new();
1405        assert_eq!(ks.lpop_count("missing", 5).unwrap(), None);
1406    }
1407
1408    #[test]
1409    fn lpop_count_capped_at_list_size() {
1410        let mut ks = Keyspace::new();
1411        ks.rpush("l", &[Bytes::from("x"), Bytes::from("y")])
1412            .unwrap();
1413        // request more than available
1414        let result = ks.lpop_count("l", 10).unwrap();
1415        assert_eq!(result, Some(vec![Bytes::from("x"), Bytes::from("y")]));
1416        // key deleted after emptied
1417        assert!(!ks.exists("l"));
1418    }
1419
1420    #[test]
1421    fn lpop_count_wrong_type_returns_error() {
1422        let mut ks = Keyspace::new();
1423        ks.set("s".into(), Bytes::from("hello"), None, false, false);
1424        assert!(ks.lpop_count("s", 1).is_err());
1425    }
1426}