kevy_store/list_ops.rs
1//! `Store` list ops introduced in v1.27.3 for BullMQ end-to-end:
2//! `RPOPLPUSH`, `LMOVE`, `LPOS`. Kept in a sibling module to keep
3//! `list.rs` under the 500-LOC house rule.
4//!
5//! All three are local-shard-only for v1.27.3 — the cross-shard
6//! Take→Put orchestrator (mirroring `RENAME`'s `exec_rename`) is a
7//! later runtime concern; the dispatch layer routes by source key and
8//! these helpers operate on whatever the local `Store` holds for `dst`.
9
10use crate::value::Value;
11use crate::{Store, StoreError};
12
13impl Store {
14 /// `RPOPLPUSH source destination` — atomically pop one element from
15 /// the tail of `src` and push it onto the head of `dst`. Returns the
16 /// moved element, or `None` if `src` was empty / absent.
17 ///
18 /// When `src == dst` Redis defines the result as a rotation
19 /// (tail → head of the same list), which falls out of this code
20 /// naturally because the pop sees the pre-rotation tail.
21 pub fn rpoplpush(
22 &mut self,
23 src: &[u8],
24 dst: &[u8],
25 ) -> Result<Option<Vec<u8>>, StoreError> {
26 // WRONGTYPE pre-check on dst: if dst exists but isn't a list,
27 // we must reject BEFORE consuming the src element (Redis: the
28 // pop is reverted on WRONGTYPE at the destination).
29 match self.live_entry(dst) {
30 None => {}
31 Some(e) => match &e.value {
32 Value::List(_) | Value::SmallListInline(_) => {}
33 _ => return Err(StoreError::WrongType),
34 },
35 }
36 let mut popped = self.rpop(src, 1)?;
37 let Some(v) = popped.pop() else {
38 return Ok(None);
39 };
40 // Push to the head of dst. `lpush_borrowed` returns the new
41 // length; we want the popped value back to the caller.
42 self.lpush_borrowed(dst, &[v.as_slice()])?;
43 Ok(Some(v))
44 }
45
46 /// `LMOVE source destination LEFT|RIGHT LEFT|RIGHT` — generalised
47 /// `RPOPLPUSH`. `from_left=true` pops from the head, otherwise the
48 /// tail; `to_left=true` pushes to the head, otherwise the tail.
49 pub fn lmove(
50 &mut self,
51 src: &[u8],
52 dst: &[u8],
53 from_left: bool,
54 to_left: bool,
55 ) -> Result<Option<Vec<u8>>, StoreError> {
56 match self.live_entry(dst) {
57 None => {}
58 Some(e) => match &e.value {
59 Value::List(_) | Value::SmallListInline(_) => {}
60 _ => return Err(StoreError::WrongType),
61 },
62 }
63 let mut popped = if from_left {
64 self.lpop(src, 1)?
65 } else {
66 self.rpop(src, 1)?
67 };
68 let Some(v) = popped.pop() else {
69 return Ok(None);
70 };
71 if to_left {
72 self.lpush_borrowed(dst, &[v.as_slice()])?;
73 } else {
74 self.rpush_borrowed(dst, &[v.as_slice()])?;
75 }
76 Ok(Some(v))
77 }
78
79 /// `LPOS key element [RANK n] [COUNT n] [MAXLEN n]` — find the
80 /// zero-based position(s) of `element` in the list.
81 ///
82 /// * `rank > 0` — scan head→tail, skipping the first `rank-1`
83 /// matches. `rank == 1` (default) returns the first match.
84 /// * `rank < 0` — scan tail→head, returning matches as
85 /// absolute (head-relative) indices.
86 /// * `count` — `None` returns the first match as a 1-element vec
87 /// (caller emits an integer / nil); `Some(0)` returns all
88 /// matches; `Some(n)` caps to `n`.
89 /// * `maxlen` — `0` means unlimited; otherwise stop after
90 /// scanning that many elements (in the chosen direction).
91 ///
92 /// Returns the matched indices in scan order. An empty result with
93 /// `count == None` is the caller's signal to emit RESP nil.
94 pub fn lpos(
95 &mut self,
96 key: &[u8],
97 element: &[u8],
98 rank: i64,
99 count: Option<i64>,
100 maxlen: usize,
101 ) -> Result<Vec<i64>, StoreError> {
102 if rank == 0 {
103 return Err(StoreError::OutOfRange);
104 }
105 if let Some(c) = count {
106 if c < 0 {
107 return Err(StoreError::OutOfRange);
108 }
109 }
110 let entries: Vec<Vec<u8>> = match self.live_entry(key) {
111 None => return Ok(Vec::new()),
112 Some(e) => match &e.value {
113 Value::List(l) => l.iter().cloned().collect(),
114 Value::SmallListInline(l) => l.iter().map(<[u8]>::to_vec).collect(),
115 _ => return Err(StoreError::WrongType),
116 },
117 };
118 let n = entries.len();
119 if n == 0 {
120 return Ok(Vec::new());
121 }
122 let skip = (rank.unsigned_abs() as usize).saturating_sub(1);
123 let cap = match count {
124 None => 1,
125 Some(0) => usize::MAX,
126 Some(c) => c as usize,
127 };
128 let want_reverse = rank < 0;
129 let scan_limit = if maxlen == 0 { n } else { maxlen.min(n) };
130 let mut out = Vec::new();
131 let mut scanned = 0usize;
132 let mut skipped = 0usize;
133 let iter: Box<dyn Iterator<Item = (usize, &Vec<u8>)>> = if want_reverse {
134 Box::new(entries.iter().enumerate().rev())
135 } else {
136 Box::new(entries.iter().enumerate())
137 };
138 for (idx, v) in iter {
139 if scanned >= scan_limit {
140 break;
141 }
142 scanned += 1;
143 if v.as_slice() == element {
144 if skipped < skip {
145 skipped += 1;
146 continue;
147 }
148 out.push(idx as i64);
149 if out.len() >= cap {
150 break;
151 }
152 }
153 }
154 Ok(out)
155 }
156}