1use crate::small_set::{AddResult, SmallSetData, promote};
4use crate::value::{SetData, SmallBytes, Value, set_member_weight};
5use crate::{Entry, Store, StoreError};
6use std::sync::Arc;
7
8impl Store {
9 fn set_value_mut(
19 &mut self,
20 key: &[u8],
21 ) -> Result<Option<&mut Value>, StoreError> {
22 match self.live_entry_mut(key) {
23 None => Ok(None),
24 Some(e) => match &e.value {
25 Value::Set(_) | Value::SmallSetInline(_) => Ok(Some(&mut e.value)),
26 _ => Err(StoreError::WrongType),
27 },
28 }
29 }
30
31 fn drop_if_empty_set(&mut self, key: &[u8]) {
32 let empty = match self.map.get(key).map(|e| &e.value) {
33 Some(Value::Set(s)) => s.is_empty(),
34 Some(Value::SmallSetInline(s)) => s.len() == 0,
35 _ => false,
36 };
37 if empty {
38 self.remove_entry(key);
39 }
40 }
41
42 pub fn sadd(&mut self, key: &[u8], members: &[Vec<u8>]) -> Result<usize, StoreError> {
44 let slices: Vec<&[u8]> = members.iter().map(Vec::as_slice).collect();
45 self.sadd_borrowed(key, &slices)
46 }
47
48 pub fn sadd_borrowed(
60 &mut self,
61 key: &[u8],
62 members: &[&[u8]],
63 ) -> Result<usize, StoreError> {
64 if members.is_empty() {
65 return Ok(0);
66 }
67 let mut added = 0usize;
68 let mut delta: i64 = 0;
69 for m in members {
70 match self.sadd_one(key, m)? {
71 SaddOutcome::AddedInline => {
72 added += 1;
73 }
77 SaddOutcome::AddedHeap(w) => {
78 added += 1;
79 delta += w;
80 }
81 SaddOutcome::AlreadyPresent => {}
82 }
83 }
84 self.account_delta(key, delta);
85 Ok(added)
86 }
87
88 fn sadd_one(&mut self, key: &[u8], m: &[u8]) -> Result<SaddOutcome, StoreError> {
92 if self.set_value_mut(key)?.is_none() {
94 return Ok(self.sadd_create(key, m));
95 }
96 let v = self.set_value_mut(key)?.expect("present and a set type");
97 match v {
98 Value::SmallSetInline(s) => match s.try_add(m) {
99 AddResult::Added => Ok(SaddOutcome::AddedInline),
100 AddResult::AlreadyPresent => Ok(SaddOutcome::AlreadyPresent),
101 AddResult::NoRoom => {
102 let mut promoted = promote(s);
105 let smb = SmallBytes::from_slice(m);
106 let w = set_member_weight(&smb) as i64;
107 let inserted = promoted.insert(smb);
108 debug_assert!(inserted, "promote re-inserts existing inline");
109 *v = Value::Set(Arc::new(promoted));
110 self.reweigh_entry(key);
115 if inserted {
116 Ok(SaddOutcome::AddedHeap(w))
117 } else {
118 Ok(SaddOutcome::AlreadyPresent)
119 }
120 }
121 },
122 Value::Set(s) => {
123 let smb = SmallBytes::from_slice(m);
124 let w = set_member_weight(&smb) as i64;
125 if Arc::make_mut(s).insert(smb) {
126 Ok(SaddOutcome::AddedHeap(w))
127 } else {
128 Ok(SaddOutcome::AlreadyPresent)
129 }
130 }
131 _ => Err(StoreError::WrongType),
132 }
133 }
134
135 fn sadd_create(&mut self, key: &[u8], m: &[u8]) -> SaddOutcome {
145 if let Some(inline) = SmallSetData::with_one(m) {
146 self.insert_entry(
147 SmallBytes::from_slice(key),
148 Entry::new(Value::SmallSetInline(inline), None),
149 );
150 } else {
151 let smb = SmallBytes::from_slice(m);
152 let mut s = SetData::with_capacity(1);
153 s.insert(smb);
154 self.insert_entry(
155 SmallBytes::from_slice(key),
156 Entry::new(Value::Set(Arc::new(s)), None),
157 );
158 }
159 SaddOutcome::AddedInline
160 }
161
162 pub fn srem(&mut self, key: &[u8], members: &[Vec<u8>]) -> Result<usize, StoreError> {
164 let slices: Vec<&[u8]> = members.iter().map(Vec::as_slice).collect();
165 self.srem_borrowed(key, &slices)
166 }
167
168 pub fn srem_borrowed(
170 &mut self,
171 key: &[u8],
172 members: &[&[u8]],
173 ) -> Result<usize, StoreError> {
174 let (removed, delta) = {
175 let mut r = 0usize;
176 let mut d: i64 = 0;
177 if let Some(v) = self.set_value_mut(key)? {
178 match v {
179 Value::SmallSetInline(s) => {
180 for m in members {
181 if s.try_remove(m) {
182 r += 1;
183 }
185 }
186 }
187 Value::Set(s) => {
188 let set_mut = Arc::make_mut(s);
189 for m in members {
190 if set_mut.remove(*m) {
191 r += 1;
192 d -= set_member_weight(&SmallBytes::from_slice(m)) as i64;
193 }
194 }
195 }
196 _ => return Err(StoreError::WrongType),
197 }
198 }
199 (r, d)
200 };
201 self.account_delta(key, delta);
202 self.drop_if_empty_set(key);
203 Ok(removed)
204 }
205
206 pub fn sismember(&mut self, key: &[u8], member: &[u8]) -> Result<bool, StoreError> {
207 match self.live_entry(key) {
208 None => Ok(false),
209 Some(e) => match &e.value {
210 Value::Set(s) => Ok(s.contains(member)),
211 Value::SmallSetInline(s) => Ok(s.contains(member)),
212 _ => Err(StoreError::WrongType),
213 },
214 }
215 }
216
217 pub fn scard(&mut self, key: &[u8]) -> Result<usize, StoreError> {
218 match self.live_entry(key) {
219 None => Ok(0),
220 Some(e) => match &e.value {
221 Value::Set(s) => Ok(s.len()),
222 Value::SmallSetInline(s) => Ok(s.len()),
223 _ => Err(StoreError::WrongType),
224 },
225 }
226 }
227
228 pub fn smembers(&mut self, key: &[u8]) -> Result<Vec<Vec<u8>>, StoreError> {
229 match self.live_entry(key) {
230 None => Ok(Vec::new()),
231 Some(e) => match &e.value {
232 Value::Set(s) => {
233 Ok(s.iter().map(kevy_bytes::SmallBytes::to_vec).collect())
234 }
235 Value::SmallSetInline(s) => {
236 Ok(s.iter_slices().map(<[u8]>::to_vec).collect())
237 }
238 _ => Err(StoreError::WrongType),
239 },
240 }
241 }
242
243 pub fn spop(&mut self, key: &[u8], count: usize) -> Result<Vec<Vec<u8>>, StoreError> {
245 let (out, delta) = {
246 let mut o: Vec<Vec<u8>> = Vec::new();
247 let mut d: i64 = 0;
248 if let Some(v) = self.set_value_mut(key)? {
249 match v {
250 Value::SmallSetInline(s) => {
251 let take: Vec<Vec<u8>> =
252 s.iter_slices().take(count).map(<[u8]>::to_vec).collect();
253 for m in &take {
254 s.try_remove(m.as_slice());
255 }
256 o = take;
257 }
258 Value::Set(s) => {
259 let set_mut = Arc::make_mut(s);
260 let take: Vec<Vec<u8>> = set_mut
261 .iter()
262 .take(count)
263 .map(kevy_bytes::SmallBytes::to_vec)
264 .collect();
265 for m in &take {
266 if set_mut.remove(m.as_slice()) {
267 d -= set_member_weight(&SmallBytes::from_slice(m)) as i64;
268 }
269 }
270 o = take;
271 }
272 _ => return Err(StoreError::WrongType),
273 }
274 }
275 (o, d)
276 };
277 self.account_delta(key, delta);
278 self.drop_if_empty_set(key);
279 Ok(out)
280 }
281
282 pub fn srandmember(&mut self, key: &[u8], count: usize) -> Result<Vec<Vec<u8>>, StoreError> {
284 match self.live_entry(key) {
285 None => Ok(Vec::new()),
286 Some(e) => match &e.value {
287 Value::Set(s) => Ok(s
288 .iter()
289 .take(count)
290 .map(kevy_bytes::SmallBytes::to_vec)
291 .collect()),
292 Value::SmallSetInline(s) => {
293 Ok(s.iter_slices().take(count).map(<[u8]>::to_vec).collect())
294 }
295 _ => Err(StoreError::WrongType),
296 },
297 }
298 }
299
300 pub fn set_snapshot(&mut self, key: &[u8]) -> Result<Vec<Vec<u8>>, StoreError> {
302 self.smembers(key)
303 }
304}
305
/// Result of trying to add one member to a set.
///
/// Derives `Debug`, `Clone`, `Copy`, `PartialEq` and `Eq` so outcomes can
/// be logged and compared in assertions; the only payload is an `i64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SaddOutcome {
    /// The member was added and the caller has no weight delta to apply.
    AddedInline,
    /// The member was added to a heap set; the payload is the weight delta
    /// the caller should account for.
    AddedHeap(i64),
    /// The member was already in the set; nothing changed.
    AlreadyPresent,
}