Skip to main content

kevy_store/
list_ops.rs

1//! `Store` list ops introduced in v1.27.3 for BullMQ end-to-end:
2//! `RPOPLPUSH`, `LMOVE`, `LPOS`. Kept in a sibling module to keep
3//! `list.rs` under the 500-LOC house rule.
4//!
5//! All three are local-shard-only for v1.27.3 — the cross-shard
6//! Take→Put orchestrator (mirroring `RENAME`'s `exec_rename`) is a
7//! later runtime concern; the dispatch layer routes by source key and
8//! these helpers operate on whatever the local `Store` holds for `dst`.
9
10use crate::value::Value;
11use crate::{Store, StoreError};
12
13impl Store {
14    /// `RPOPLPUSH source destination` — atomically pop one element from
15    /// the tail of `src` and push it onto the head of `dst`. Returns the
16    /// moved element, or `None` if `src` was empty / absent.
17    ///
18    /// When `src == dst` Redis defines the result as a rotation
19    /// (tail → head of the same list), which falls out of this code
20    /// naturally because the pop sees the pre-rotation tail.
21    pub fn rpoplpush(
22        &mut self,
23        src: &[u8],
24        dst: &[u8],
25    ) -> Result<Option<Vec<u8>>, StoreError> {
26        // WRONGTYPE pre-check on dst: if dst exists but isn't a list,
27        // we must reject BEFORE consuming the src element (Redis: the
28        // pop is reverted on WRONGTYPE at the destination).
29        match self.live_entry(dst) {
30            None => {}
31            Some(e) => match &e.value {
32                Value::List(_) | Value::SmallListInline(_) => {}
33                _ => return Err(StoreError::WrongType),
34            },
35        }
36        let mut popped = self.rpop(src, 1)?;
37        let Some(v) = popped.pop() else {
38            return Ok(None);
39        };
40        // Push to the head of dst. `lpush_borrowed` returns the new
41        // length; we want the popped value back to the caller.
42        self.lpush_borrowed(dst, &[v.as_slice()])?;
43        Ok(Some(v))
44    }
45
46    /// `LMOVE source destination LEFT|RIGHT LEFT|RIGHT` — generalised
47    /// `RPOPLPUSH`. `from_left=true` pops from the head, otherwise the
48    /// tail; `to_left=true` pushes to the head, otherwise the tail.
49    pub fn lmove(
50        &mut self,
51        src: &[u8],
52        dst: &[u8],
53        from_left: bool,
54        to_left: bool,
55    ) -> Result<Option<Vec<u8>>, StoreError> {
56        match self.live_entry(dst) {
57            None => {}
58            Some(e) => match &e.value {
59                Value::List(_) | Value::SmallListInline(_) => {}
60                _ => return Err(StoreError::WrongType),
61            },
62        }
63        let mut popped = if from_left {
64            self.lpop(src, 1)?
65        } else {
66            self.rpop(src, 1)?
67        };
68        let Some(v) = popped.pop() else {
69            return Ok(None);
70        };
71        if to_left {
72            self.lpush_borrowed(dst, &[v.as_slice()])?;
73        } else {
74            self.rpush_borrowed(dst, &[v.as_slice()])?;
75        }
76        Ok(Some(v))
77    }
78
79    /// `LPOS key element [RANK n] [COUNT n] [MAXLEN n]` — find the
80    /// zero-based position(s) of `element` in the list.
81    ///
82    /// * `rank > 0` — scan head→tail, skipping the first `rank-1`
83    ///   matches. `rank == 1` (default) returns the first match.
84    /// * `rank < 0` — scan tail→head, returning matches as
85    ///   absolute (head-relative) indices.
86    /// * `count` — `None` returns the first match as a 1-element vec
87    ///   (caller emits an integer / nil); `Some(0)` returns all
88    ///   matches; `Some(n)` caps to `n`.
89    /// * `maxlen` — `0` means unlimited; otherwise stop after
90    ///   scanning that many elements (in the chosen direction).
91    ///
92    /// Returns the matched indices in scan order. An empty result with
93    /// `count == None` is the caller's signal to emit RESP nil.
94    pub fn lpos(
95        &mut self,
96        key: &[u8],
97        element: &[u8],
98        rank: i64,
99        count: Option<i64>,
100        maxlen: usize,
101    ) -> Result<Vec<i64>, StoreError> {
102        if rank == 0 {
103            return Err(StoreError::OutOfRange);
104        }
105        if let Some(c) = count {
106            if c < 0 {
107                return Err(StoreError::OutOfRange);
108            }
109        }
110        let entries: Vec<Vec<u8>> = match self.live_entry(key) {
111            None => return Ok(Vec::new()),
112            Some(e) => match &e.value {
113                Value::List(l) => l.iter().cloned().collect(),
114                Value::SmallListInline(l) => l.iter().map(<[u8]>::to_vec).collect(),
115                _ => return Err(StoreError::WrongType),
116            },
117        };
118        let n = entries.len();
119        if n == 0 {
120            return Ok(Vec::new());
121        }
122        let skip = (rank.unsigned_abs() as usize).saturating_sub(1);
123        let cap = match count {
124            None => 1,
125            Some(0) => usize::MAX,
126            Some(c) => c as usize,
127        };
128        let want_reverse = rank < 0;
129        let scan_limit = if maxlen == 0 { n } else { maxlen.min(n) };
130        let mut out = Vec::new();
131        let mut scanned = 0usize;
132        let mut skipped = 0usize;
133        let iter: Box<dyn Iterator<Item = (usize, &Vec<u8>)>> = if want_reverse {
134            Box::new(entries.iter().enumerate().rev())
135        } else {
136            Box::new(entries.iter().enumerate())
137        };
138        for (idx, v) in iter {
139            if scanned >= scan_limit {
140                break;
141            }
142            scanned += 1;
143            if v.as_slice() == element {
144                if skipped < skip {
145                    skipped += 1;
146                    continue;
147                }
148                out.push(idx as i64);
149                if out.len() >= cap {
150                    break;
151                }
152            }
153        }
154        Ok(out)
155    }
156}