1use crate::small_hash::{self, AddResult as HAddResult, SmallHashData};
4use crate::util::parse_i64;
5use crate::value::{HashData, SmallBytes, Value, hash_field_weight};
6use crate::{Entry, Store, StoreError, now_ns};
7use std::sync::Arc;
8
9impl Store {
10 fn hash_mut(&mut self, key: &[u8], create: bool) -> Result<Option<&mut HashData>, StoreError> {
23 if self.live_entry_mut(key).is_none() {
24 if !create {
25 return Ok(None);
26 }
27 self.insert_entry(
28 SmallBytes::from_slice(key),
29 Entry::new(Value::Hash(Arc::default()), None),
30 );
31 }
32 let is_inline = matches!(
37 self.map.get(key).map(|e| &e.value),
38 Some(Value::SmallHashInline(_))
39 );
40 if is_inline {
41 let promoted = {
42 let e = self.map.get(key).expect("present");
43 if let Value::SmallHashInline(s) = &e.value {
44 small_hash::promote(s)
45 } else {
46 unreachable!()
47 }
48 };
49 self.map.get_mut(key).expect("present").value = Value::Hash(Arc::new(promoted));
50 self.reweigh_entry(key);
51 }
52 match &mut self.map.get_mut(key).expect("present").value {
53 Value::Hash(h) => Ok(Some(Arc::make_mut(h))),
54 _ => Err(StoreError::WrongType),
55 }
56 }
57
58 fn hash_value_for_set(&mut self, key: &[u8]) -> Result<Option<&mut Value>, StoreError> {
62 match self.live_entry_mut(key) {
63 None => Ok(None),
64 Some(e) => match &e.value {
65 Value::Hash(_) | Value::SmallHashInline(_) => Ok(Some(&mut e.value)),
66 _ => Err(StoreError::WrongType),
67 },
68 }
69 }
70
71 fn hash_pairs(&mut self, key: &[u8]) -> Result<Option<Vec<(Vec<u8>, Vec<u8>)>>, StoreError> {
76 match self.live_entry(key) {
77 None => Ok(None),
78 Some(e) => match &e.value {
79 Value::Hash(h) => Ok(Some(
80 h.iter().map(|(f, v)| (f.to_vec(), v.clone())).collect(),
81 )),
82 Value::SmallHashInline(h) => Ok(Some(
83 h.iter().map(|(f, v)| (f.to_vec(), v.to_vec())).collect(),
84 )),
85 _ => Err(StoreError::WrongType),
86 },
87 }
88 }
89
90 pub fn hset_borrowed(
94 &mut self,
95 key: &[u8],
96 pairs: &[(&[u8], &[u8])],
97 ) -> Result<usize, StoreError> {
98 if pairs.is_empty() {
99 return Ok(0);
100 }
101 let mut added = 0usize;
102 let mut delta: i64 = 0;
103 for (f, v) in pairs {
104 match self.hset_one(key, f, v)? {
105 HsetOutcome::AddedInline => {
106 added += 1;
107 }
110 HsetOutcome::UpdatedInline => {}
111 HsetOutcome::AddedHeap(w) => {
112 added += 1;
113 delta += w;
114 }
115 HsetOutcome::UpdatedHeap(d) => {
116 delta += d;
117 }
118 }
119 }
120 self.account_delta(key, delta);
121 Ok(added)
122 }
123
124 pub fn hset(&mut self, key: &[u8], pairs: &[(Vec<u8>, Vec<u8>)]) -> Result<usize, StoreError> {
126 let borrowed: Vec<(&[u8], &[u8])> =
127 pairs.iter().map(|(f, v)| (f.as_slice(), v.as_slice())).collect();
128 self.hset_borrowed(key, &borrowed)
129 }
130
131 pub fn hsetnx(&mut self, key: &[u8], field: &[u8], val: &[u8]) -> Result<bool, StoreError> {
133 let exists = match self.live_entry(key) {
135 None => false,
136 Some(e) => match &e.value {
137 Value::Hash(h) => h.contains_key(field),
138 Value::SmallHashInline(h) => h.contains_key(field),
139 _ => return Err(StoreError::WrongType),
140 },
141 };
142 if exists {
143 return Ok(false);
144 }
145 match self.hset_one(key, field, val)? {
146 HsetOutcome::AddedInline | HsetOutcome::UpdatedInline => Ok(true),
147 HsetOutcome::AddedHeap(w) => {
148 self.account_delta(key, w);
149 Ok(true)
150 }
151 HsetOutcome::UpdatedHeap(_) => Ok(true),
152 }
153 }
154
155 pub fn hget(&mut self, key: &[u8], field: &[u8]) -> Result<Option<&[u8]>, StoreError> {
156 match self.live_entry(key) {
157 None => Ok(None),
158 Some(e) => match &e.value {
159 Value::Hash(h) => Ok(h.get(field).map(Vec::as_slice)),
160 Value::SmallHashInline(h) => Ok(h.get(field)),
161 _ => Err(StoreError::WrongType),
162 },
163 }
164 }
165
166 pub fn hexists(&mut self, key: &[u8], field: &[u8]) -> Result<bool, StoreError> {
167 match self.live_entry(key) {
168 None => Ok(false),
169 Some(e) => match &e.value {
170 Value::Hash(h) => Ok(h.contains_key(field)),
171 Value::SmallHashInline(h) => Ok(h.contains_key(field)),
172 _ => Err(StoreError::WrongType),
173 },
174 }
175 }
176
177 pub fn hlen(&mut self, key: &[u8]) -> Result<usize, StoreError> {
178 match self.live_entry(key) {
179 None => Ok(0),
180 Some(e) => match &e.value {
181 Value::Hash(h) => Ok(h.len()),
182 Value::SmallHashInline(h) => Ok(h.len()),
183 _ => Err(StoreError::WrongType),
184 },
185 }
186 }
187
188 pub fn hmget(
189 &mut self,
190 key: &[u8],
191 fields: &[Vec<u8>],
192 ) -> Result<Vec<Option<Vec<u8>>>, StoreError> {
193 let borrowed: Vec<&[u8]> = fields.iter().map(Vec::as_slice).collect();
194 self.hmget_borrowed(key, &borrowed)
195 }
196
197 pub fn hmget_borrowed(
199 &mut self,
200 key: &[u8],
201 fields: &[&[u8]],
202 ) -> Result<Vec<Option<Vec<u8>>>, StoreError> {
203 match self.live_entry(key) {
204 None => Ok(fields.iter().map(|_| None).collect()),
205 Some(e) => match &e.value {
206 Value::Hash(h) => Ok(fields.iter().map(|f| h.get(*f).cloned()).collect()),
207 Value::SmallHashInline(h) => Ok(fields
208 .iter()
209 .map(|f| h.get(*f).map(<[u8]>::to_vec))
210 .collect()),
211 _ => Err(StoreError::WrongType),
212 },
213 }
214 }
215
216 pub fn hgetall(&mut self, key: &[u8]) -> Result<Vec<Vec<u8>>, StoreError> {
218 match self.hash_pairs(key)? {
219 None => Ok(Vec::new()),
220 Some(pairs) => {
221 let mut out = Vec::with_capacity(pairs.len() * 2);
222 for (f, v) in pairs {
223 out.push(f);
224 out.push(v);
225 }
226 Ok(out)
227 }
228 }
229 }
230
231 pub fn hkeys(&mut self, key: &[u8]) -> Result<Vec<Vec<u8>>, StoreError> {
232 match self.live_entry(key) {
233 None => Ok(Vec::new()),
234 Some(e) => match &e.value {
235 Value::Hash(h) => Ok(h.keys().map(kevy_bytes::SmallBytes::to_vec).collect()),
236 Value::SmallHashInline(h) => Ok(h.iter().map(|(f, _)| f.to_vec()).collect()),
237 _ => Err(StoreError::WrongType),
238 },
239 }
240 }
241
242 pub fn hvals(&mut self, key: &[u8]) -> Result<Vec<Vec<u8>>, StoreError> {
243 match self.live_entry(key) {
244 None => Ok(Vec::new()),
245 Some(e) => match &e.value {
246 Value::Hash(h) => Ok(h.values().cloned().collect()),
247 Value::SmallHashInline(h) => Ok(h.iter().map(|(_, v)| v.to_vec()).collect()),
248 _ => Err(StoreError::WrongType),
249 },
250 }
251 }
252
253 pub fn hdel(&mut self, key: &[u8], fields: &[Vec<u8>]) -> Result<usize, StoreError> {
255 let borrowed: Vec<&[u8]> = fields.iter().map(Vec::as_slice).collect();
256 self.hdel_borrowed(key, &borrowed)
257 }
258
259 pub fn hdel_borrowed(
261 &mut self,
262 key: &[u8],
263 fields: &[&[u8]],
264 ) -> Result<usize, StoreError> {
265 let now = now_ns();
266 if !self.reap(key, now) {
267 return Ok(0);
268 }
269 let (removed, delta, drop_key) = {
270 let h_entry = self.map.get_mut(key).expect("live");
271 match &mut h_entry.value {
272 Value::Hash(h) => {
273 let h = Arc::make_mut(h);
276 let mut r = 0usize;
277 let mut d: i64 = 0;
278 for f in fields {
279 if let Some(old_v) = h.remove(*f) {
280 r += 1;
281 let smb = SmallBytes::from_slice(f);
282 d -= hash_field_weight(&smb, old_v.len()) as i64;
283 }
284 }
285 let drop_now = h.is_empty();
286 (r, d, drop_now)
287 }
288 Value::SmallHashInline(h) => {
289 let mut r = 0usize;
290 for f in fields {
291 if h.try_remove(f) {
292 r += 1;
293 }
294 }
295 let drop_now = h.is_empty();
296 (r, 0i64, drop_now)
297 }
298 _ => return Err(StoreError::WrongType),
299 }
300 };
301 if drop_key {
302 self.remove_entry(key);
303 } else {
304 self.account_delta(key, delta);
305 }
306 Ok(removed)
307 }
308
309 pub fn hincrby(&mut self, key: &[u8], field: &[u8], delta: i64) -> Result<i64, StoreError> {
311 let (next, weight_delta) = {
312 let h = self.hash_mut(key, true)?.expect("created");
313 let cur = match h.get(field) {
314 Some(v) => parse_i64(v).ok_or(StoreError::NotInteger)?,
315 None => 0,
316 };
317 let next = cur.checked_add(delta).ok_or(StoreError::Overflow)?;
318 let new_bytes = next.to_string().into_bytes();
319 let smb = SmallBytes::from_slice(field);
320 let new_field_w = hash_field_weight(&smb, new_bytes.len()) as i64;
321 let new_value_len = new_bytes.len();
322 let wd = match h.insert(smb, new_bytes) {
323 None => new_field_w,
324 Some(old) => new_value_len as i64 - old.len() as i64,
325 };
326 (next, wd)
327 };
328 self.account_delta(key, weight_delta);
329 Ok(next)
330 }
331
332 fn hset_one(
336 &mut self,
337 key: &[u8],
338 field: &[u8],
339 value: &[u8],
340 ) -> Result<HsetOutcome, StoreError> {
341 if self.hash_value_for_set(key)?.is_none() {
343 return Ok(self.hset_create(key, field, value));
344 }
345 let v = self.hash_value_for_set(key)?.expect("present and a hash");
346 match v {
347 Value::SmallHashInline(h) => match h.try_set(field, value) {
348 HAddResult::Added => Ok(HsetOutcome::AddedInline),
349 HAddResult::Updated => Ok(HsetOutcome::UpdatedInline),
350 HAddResult::NoRoom => {
351 let mut promoted = small_hash::promote(h);
354 let smb = SmallBytes::from_slice(field);
355 let new_w = hash_field_weight(&smb, value.len()) as i64;
356 let added = !promoted.contains_key(field);
357 let prior_v_len = promoted.get(field).map_or(0, Vec::len);
358 promoted.insert(smb, value.to_vec());
359 *v = Value::Hash(Arc::new(promoted));
360 self.reweigh_entry(key);
361 if added {
362 Ok(HsetOutcome::AddedHeap(new_w))
363 } else {
364 Ok(HsetOutcome::UpdatedHeap(value.len() as i64 - prior_v_len as i64))
365 }
366 }
367 },
368 Value::Hash(h) => {
369 let h = Arc::make_mut(h);
370 let smb = SmallBytes::from_slice(field);
371 let new_w = hash_field_weight(&smb, value.len()) as i64;
372 let new_value_len = value.len();
373 match h.insert(smb, value.to_vec()) {
374 None => Ok(HsetOutcome::AddedHeap(new_w)),
375 Some(old) => {
376 Ok(HsetOutcome::UpdatedHeap(new_value_len as i64 - old.len() as i64))
377 }
378 }
379 }
380 _ => Err(StoreError::WrongType),
381 }
382 }
383
384 fn hset_create(&mut self, key: &[u8], field: &[u8], value: &[u8]) -> HsetOutcome {
387 if let Some(inline) = SmallHashData::with_one(field, value) {
388 self.insert_entry(
389 SmallBytes::from_slice(key),
390 Entry::new(Value::SmallHashInline(inline), None),
391 );
392 HsetOutcome::AddedInline
395 } else {
396 let smb_f = SmallBytes::from_slice(field);
397 let mut h = HashData::with_capacity(1);
398 h.insert(smb_f, value.to_vec());
399 self.insert_entry(
400 SmallBytes::from_slice(key),
401 Entry::new(Value::Hash(Arc::new(h)), None),
402 );
403 HsetOutcome::AddedInline
404 }
405 }
406
407}
408
/// Result of writing one field through `hset_one`.
///
/// The heap variants carry an `i64` weight delta that the caller is expected to apply
/// via `account_delta`; the inline variants carry none.
enum HsetOutcome {
    /// A new field was added and no weight delta is reported.
    AddedInline,
    /// An existing field of an inline hash was overwritten; no weight delta is reported.
    UpdatedInline,
    /// A new field was added to a heap hash; the payload is the weight of the new field.
    AddedHeap(i64),
    /// An existing field of a heap hash was overwritten; the payload is the new value
    /// length minus the old value length (may be negative).
    UpdatedHeap(i64),
}