Skip to main content

kevy_store/
set.rs

1//! `Store` set commands.
2
3use crate::small_set::{AddResult, SmallSetData, promote};
4use crate::value::{SetData, SmallBytes, Value, set_member_weight};
5use crate::{Entry, Store, StoreError};
6use std::sync::Arc;
7
8impl Store {
9    // ---- sets ----------------------------------------------------------
10
11    /// Borrow the value at `key` for mutation. Returns `None` if the key
12    /// is absent (and `create == false`) or if the entry exists but is a
13    /// non-set type (returns `WrongType`). On `create == true` for a
14    /// missing key, **does not** materialise an entry — the caller
15    /// decides between `SmallSetInline` and `Set` based on the first
16    /// member, so we don't pre-allocate an empty `Arc<KevySet>` only to
17    /// discard it.
18    fn set_value_mut(
19        &mut self,
20        key: &[u8],
21    ) -> Result<Option<&mut Value>, StoreError> {
22        match self.live_entry_mut(key) {
23            None => Ok(None),
24            Some(e) => match &e.value {
25                Value::Set(_) | Value::SmallSetInline(_) => Ok(Some(&mut e.value)),
26                _ => Err(StoreError::WrongType),
27            },
28        }
29    }
30
31    fn drop_if_empty_set(&mut self, key: &[u8]) {
32        let empty = match self.map.get(key).map(|e| &e.value) {
33            Some(Value::Set(s)) => s.is_empty(),
34            Some(Value::SmallSetInline(s)) => s.len() == 0,
35            _ => false,
36        };
37        if empty {
38            self.remove_entry(key);
39        }
40    }
41
42    /// `SADD` — returns the count of newly-added members.
43    pub fn sadd(&mut self, key: &[u8], members: &[Vec<u8>]) -> Result<usize, StoreError> {
44        let slices: Vec<&[u8]> = members.iter().map(Vec::as_slice).collect();
45        self.sadd_borrowed(key, &slices)
46    }
47
48    /// G4 (v1.25): borrowed-slice SADD — kills the per-member `Vec<u8>` alloc
49    /// the dispatch layer used to do via `rest(args, 2)`. Behaviour identical
50    /// to [`Self::sadd`]; mirrors valkey's `setTypeAdd(set, objectGetVal(
51    /// c->argv[j]))` zero-copy hand-off (`t_set.c:611`).
52    ///
53    /// A.7 O5: takes the encoding-switch path. New key starts as
54    /// `SmallSetInline` if the first member fits; subsequent inserts
55    /// stay inline until `SmallSetData::try_add` returns `NoRoom`, at
56    /// which point the set is promoted in-place to
57    /// `Value::Set(Arc<KevySet>)` and the spilling member is re-inserted
58    /// in the heap-backed variant.
59    pub fn sadd_borrowed(
60        &mut self,
61        key: &[u8],
62        members: &[&[u8]],
63    ) -> Result<usize, StoreError> {
64        if members.is_empty() {
65            return Ok(0);
66        }
67        let mut added = 0usize;
68        let mut delta: i64 = 0;
69        for m in members {
70            match self.sadd_one(key, m)? {
71                SaddOutcome::AddedInline => {
72                    added += 1;
73                    // SmallSetInline carries zero heap (see
74                    // `Value::weight` arm). The 1-byte length prefix +
75                    // member bytes live inside the Value enum body.
76                }
77                SaddOutcome::AddedHeap(w) => {
78                    added += 1;
79                    delta += w;
80                }
81                SaddOutcome::AlreadyPresent => {}
82            }
83        }
84        self.account_delta(key, delta);
85        Ok(added)
86    }
87
88    /// Insert one member; encapsulates the encoding-switch decision so
89    /// `sadd_borrowed` can stay short. The split keeps each function
90    /// under the 50-LOC house rule.
91    fn sadd_one(&mut self, key: &[u8], m: &[u8]) -> Result<SaddOutcome, StoreError> {
92        // Missing key — pick the encoding by member size.
93        if self.set_value_mut(key)?.is_none() {
94            return Ok(self.sadd_create(key, m));
95        }
96        let v = self.set_value_mut(key)?.expect("present and a set type");
97        match v {
98            Value::SmallSetInline(s) => match s.try_add(m) {
99                AddResult::Added => Ok(SaddOutcome::AddedInline),
100                AddResult::AlreadyPresent => Ok(SaddOutcome::AlreadyPresent),
101                AddResult::NoRoom => {
102                    // Upgrade in place: promote inline to KevySet, then
103                    // insert the spilling member into the heap set.
104                    let mut promoted = promote(s);
105                    let smb = SmallBytes::from_slice(m);
106                    let w = set_member_weight(&smb) as i64;
107                    let inserted = promoted.insert(smb);
108                    debug_assert!(inserted, "promote re-inserts existing inline");
109                    *v = Value::Set(Arc::new(promoted));
110                    // The upgrade itself adds the heap weight of the
111                    // promoted set; `reweigh_entry` recomputes from
112                    // scratch so we don't have to track per-member
113                    // deltas separately for the inline→heap step.
114                    self.reweigh_entry(key);
115                    if inserted {
116                        Ok(SaddOutcome::AddedHeap(w))
117                    } else {
118                        Ok(SaddOutcome::AlreadyPresent)
119                    }
120                }
121            },
122            Value::Set(s) => {
123                let smb = SmallBytes::from_slice(m);
124                let w = set_member_weight(&smb) as i64;
125                if Arc::make_mut(s).insert(smb) {
126                    Ok(SaddOutcome::AddedHeap(w))
127                } else {
128                    Ok(SaddOutcome::AlreadyPresent)
129                }
130            }
131            _ => Err(StoreError::WrongType),
132        }
133    }
134
135    /// Create a fresh entry for `key` holding one member. Picks
136    /// `SmallSetInline` when the member fits the inline budget, falls
137    /// back to `Value::Set(Arc<KevySet>)` otherwise.
138    ///
139    /// Returns [`SaddOutcome::AddedInline`] either way — the
140    /// `insert_entry` call has already accounted for the member's
141    /// weight via `value.weight()`, so the caller MUST NOT also apply
142    /// a delta. `AddedInline` carries zero delta in the caller, which
143    /// is exactly the right shape for the heap-backed branch too.
144    fn sadd_create(&mut self, key: &[u8], m: &[u8]) -> SaddOutcome {
145        if let Some(inline) = SmallSetData::with_one(m) {
146            self.insert_entry(
147                SmallBytes::from_slice(key),
148                Entry::new(Value::SmallSetInline(inline), None),
149            );
150        } else {
151            let smb = SmallBytes::from_slice(m);
152            let mut s = SetData::with_capacity(1);
153            s.insert(smb);
154            self.insert_entry(
155                SmallBytes::from_slice(key),
156                Entry::new(Value::Set(Arc::new(s)), None),
157            );
158        }
159        SaddOutcome::AddedInline
160    }
161
162    /// `SREM` — returns the count removed (deleting an emptied key).
163    pub fn srem(&mut self, key: &[u8], members: &[Vec<u8>]) -> Result<usize, StoreError> {
164        let slices: Vec<&[u8]> = members.iter().map(Vec::as_slice).collect();
165        self.srem_borrowed(key, &slices)
166    }
167
168    /// G4 (v1.25): borrowed-slice SREM — see [`Self::sadd_borrowed`].
169    pub fn srem_borrowed(
170        &mut self,
171        key: &[u8],
172        members: &[&[u8]],
173    ) -> Result<usize, StoreError> {
174        let (removed, delta) = {
175            let mut r = 0usize;
176            let mut d: i64 = 0;
177            if let Some(v) = self.set_value_mut(key)? {
178                match v {
179                    Value::SmallSetInline(s) => {
180                        for m in members {
181                            if s.try_remove(m) {
182                                r += 1;
183                                // Inline removal is zero-heap; no delta.
184                            }
185                        }
186                    }
187                    Value::Set(s) => {
188                        let set_mut = Arc::make_mut(s);
189                        for m in members {
190                            if set_mut.remove(*m) {
191                                r += 1;
192                                d -= set_member_weight(&SmallBytes::from_slice(m)) as i64;
193                            }
194                        }
195                    }
196                    _ => return Err(StoreError::WrongType),
197                }
198            }
199            (r, d)
200        };
201        self.account_delta(key, delta);
202        self.drop_if_empty_set(key);
203        Ok(removed)
204    }
205
206    pub fn sismember(&mut self, key: &[u8], member: &[u8]) -> Result<bool, StoreError> {
207        match self.live_entry(key) {
208            None => Ok(false),
209            Some(e) => match &e.value {
210                Value::Set(s) => Ok(s.contains(member)),
211                Value::SmallSetInline(s) => Ok(s.contains(member)),
212                _ => Err(StoreError::WrongType),
213            },
214        }
215    }
216
217    pub fn scard(&mut self, key: &[u8]) -> Result<usize, StoreError> {
218        match self.live_entry(key) {
219            None => Ok(0),
220            Some(e) => match &e.value {
221                Value::Set(s) => Ok(s.len()),
222                Value::SmallSetInline(s) => Ok(s.len()),
223                _ => Err(StoreError::WrongType),
224            },
225        }
226    }
227
228    pub fn smembers(&mut self, key: &[u8]) -> Result<Vec<Vec<u8>>, StoreError> {
229        match self.live_entry(key) {
230            None => Ok(Vec::new()),
231            Some(e) => match &e.value {
232                Value::Set(s) => {
233                    Ok(s.iter().map(kevy_bytes::SmallBytes::to_vec).collect())
234                }
235                Value::SmallSetInline(s) => {
236                    Ok(s.iter_slices().map(<[u8]>::to_vec).collect())
237                }
238                _ => Err(StoreError::WrongType),
239            },
240        }
241    }
242
243    /// `SPOP key count` — remove and return up to `count` arbitrary members.
244    pub fn spop(&mut self, key: &[u8], count: usize) -> Result<Vec<Vec<u8>>, StoreError> {
245        let (out, delta) = {
246            let mut o: Vec<Vec<u8>> = Vec::new();
247            let mut d: i64 = 0;
248            if let Some(v) = self.set_value_mut(key)? {
249                match v {
250                    Value::SmallSetInline(s) => {
251                        let take: Vec<Vec<u8>> =
252                            s.iter_slices().take(count).map(<[u8]>::to_vec).collect();
253                        for m in &take {
254                            s.try_remove(m.as_slice());
255                        }
256                        o = take;
257                    }
258                    Value::Set(s) => {
259                        let set_mut = Arc::make_mut(s);
260                        let take: Vec<Vec<u8>> = set_mut
261                            .iter()
262                            .take(count)
263                            .map(kevy_bytes::SmallBytes::to_vec)
264                            .collect();
265                        for m in &take {
266                            if set_mut.remove(m.as_slice()) {
267                                d -= set_member_weight(&SmallBytes::from_slice(m)) as i64;
268                            }
269                        }
270                        o = take;
271                    }
272                    _ => return Err(StoreError::WrongType),
273                }
274            }
275            (o, d)
276        };
277        self.account_delta(key, delta);
278        self.drop_if_empty_set(key);
279        Ok(out)
280    }
281
282    /// `SRANDMEMBER key count` — up to `count` arbitrary members, not removed.
283    pub fn srandmember(&mut self, key: &[u8], count: usize) -> Result<Vec<Vec<u8>>, StoreError> {
284        match self.live_entry(key) {
285            None => Ok(Vec::new()),
286            Some(e) => match &e.value {
287                Value::Set(s) => Ok(s
288                    .iter()
289                    .take(count)
290                    .map(kevy_bytes::SmallBytes::to_vec)
291                    .collect()),
292                Value::SmallSetInline(s) => {
293                    Ok(s.iter_slices().take(count).map(<[u8]>::to_vec).collect())
294                }
295                _ => Err(StoreError::WrongType),
296            },
297        }
298    }
299
300    /// Snapshot of a set's members for cross-shard algebra (SINTER/etc.).
301    pub fn set_snapshot(&mut self, key: &[u8]) -> Result<Vec<Vec<u8>>, StoreError> {
302        self.smembers(key)
303    }
304}
305
306/// Per-member result for the inner [`Store::sadd_one`] step. Lets
307/// `sadd_borrowed` route the weight delta correctly: inline adds carry
308/// zero heap (no delta), heap adds carry the per-member byte budget,
309/// already-present means no count + no delta.
310enum SaddOutcome {
311    AddedInline,
312    AddedHeap(i64),
313    AlreadyPresent,
314}