1use std::iter::FromIterator;
18use std::time::{SystemTime, Duration};
19use std::collections::VecDeque;
20
21use indexmap::map::Entry;
22
23use super::container::Container;
24use super::container::ContainerPtr;
25use super::container::ContainerImpl;
26
// Local shorthands for types declared in the parent module.
type Key = super::Key;
type Value = super::Value;
type Arguments = super::Arguments;
type ExecResult = super::ExecResult;

// Byte buffer held by a `Container::Strings` container.
type Inner = Vec<u8>;
33
/// Bitwise operation accepted by the `BITOP` command handlers.
#[derive(Clone, Copy)]
enum BitOperation {
    /// Bitwise AND.
    And,
    /// Bitwise OR.
    Or,
    /// Bitwise XOR.
    Xor,
    /// Bitwise NOT.
    Not,
}

/// Parses an operation name without regard to letter case.
impl std::str::FromStr for BitOperation {
    type Err = String;

    /// Lowercases `op` and maps `and` / `or` / `xor` / `not` to the matching
    /// variant. Any other name produces an error message that quotes the
    /// lowercased input.
    fn from_str(op: &str) -> Result<Self, Self::Err> {
        let normalized = op.to_lowercase();
        let parsed = match normalized.as_str() {
            "and" => BitOperation::And,
            "or" => BitOperation::Or,
            "xor" => BitOperation::Xor,
            "not" => BitOperation::Not,
            unknown => return Err(format!("Unsupported operation '{}'", unknown)),
        };
        Ok(parsed)
    }
}
55
/// Parses the textual content of a string buffer into `T`.
///
/// Returns `def` when `cnt` is empty. Otherwise the bytes must be valid UTF-8
/// and must parse as `T`; either failure is returned as `Err` carrying the
/// display text of the underlying error.
///
/// The parameter is a byte slice, so `&Vec<u8>`, `&mut Vec<u8>`, arrays and
/// slices are all accepted through the usual coercions.
fn inner_parse<T>(cnt: &[u8], def: T) -> Result<T, String>
where
    T: std::str::FromStr,
    <T as std::str::FromStr>::Err: std::fmt::Display,
{
    if cnt.is_empty() {
        return Ok(def);
    }
    let text = std::str::from_utf8(cnt).map_err(|e| e.to_string())?;
    text.parse::<T>().map_err(|e| e.to_string())
}
67
68
69impl super::Storage {
70 fn strings_container_factory() -> Container {
71 Container::Strings(ContainerImpl::<Inner>::new())
72 }
73 async fn strings_get_container(&self, key: Key) -> ContainerPtr {
74 self.get_container(key, Self::strings_container_factory).await
75 }
76 async fn strings_get_containers(&self, keys: Vec<Key>) -> Vec<ContainerPtr> {
77 self.get_containers(keys, Self::strings_container_factory).await
78 }
79 async fn strings_try_get_containers(&self, keys: &Vec<Key>) -> Vec<Option<ContainerPtr>> {
80 self.try_get_containers(keys).await
81 }
82 fn strings_unwrap_container(container: &Container) -> Result<&ContainerImpl<Inner>, String> {
83 match container {
84 Container::Strings(ref c) => Ok(c),
85 _ => Err(format!("Unexpected container type")),
86 }
87 }
88 fn strings_unwrap_mut_container(container: &mut Container) -> Result<&mut ContainerImpl<Inner>, String> {
89 match container {
90 Container::Strings(ref mut c) => Ok(c),
91 _ => Err(format!("Unexpected container type")),
92 }
93 }
94 async fn strings_lock<F: FnOnce(&Inner) -> ExecResult>(&self, key: Key, processor: F) -> ExecResult {
95 let c1 = self.strings_get_container(key).await;
96 let c2 = c1.lock().await;
97 let c3 = Self::strings_unwrap_container(&c2)?;
98 processor(&c3.inner)
99 }
100 async fn strings_lock_mut<F: FnOnce(&mut Inner) -> ExecResult>(&self, key: Key, processor: F) -> ExecResult {
101 let c1 = self.strings_get_container(key).await;
102 let mut c2 = c1.lock().await;
103 let c3 = Self::strings_unwrap_mut_container(&mut c2)?;
104 processor(&mut c3.inner)
105 }
106
107 async fn strings_locks<F>(&self, write_keys: Vec<Key>, read_keys: &Vec<Key>, callback: F) -> ExecResult
108 where F: FnOnce(VecDeque<&mut ContainerImpl<Inner>>, VecDeque<Option<&ContainerImpl<Inner>>>) -> ExecResult {
109 let write_containers = self.strings_get_containers(write_keys).await;
110 let read_containers = self.strings_try_get_containers(read_keys).await;
111 let writes = write_containers.iter().map(|x|x.as_ref());
112 let reads = read_containers.iter().map(|x|{
113 match x {
114 None => None,
115 Some(x) => Some(x.as_ref()),
116 }
117 });
118 let (mut writes, reads) = Self::lock_all(writes, reads).await;
119
120 let mut out_writes = VecDeque::with_capacity(writes.len());
121 for g in &mut writes {
122 out_writes.push_back(Self::strings_unwrap_mut_container(&mut **g)?);
123 }
124 let mut out_reads = VecDeque::with_capacity(reads.len());
125 for g in &reads {
126 match g {
127 None => out_reads.push_back(None),
128 Some(g) => out_reads.push_back(Some(Self::strings_unwrap_container(&**g)?)),
129 }
130 }
131
132 callback(out_writes, out_reads)
133 }
134
135 pub async fn strings_append(&self, mut args: Arguments) -> ExecResult {
136 let key = Self::extract_key(args.pop_front())?;
137 let value = Self::extract_buffer(args.pop_front())?;
138 self.strings_lock_mut(key, |cnt| -> ExecResult {
139 cnt.append(&mut value.into_iter().collect());
140 Ok(Value::Integer(cnt.len() as i64))
141 }).await
142 }
143
144 pub async fn strings_get(&self, mut args: Arguments) -> ExecResult {
145 let key = Self::extract_key(args.pop_front())?;
146 self.strings_locks(vec![], &vec![key], |_, mut cnts| -> ExecResult {
147 let cnt = cnts.remove(0).expect("option should be exists, but not");
148 match cnt {
149 Some(cnt) => Ok(Value::Buffer(cnt.inner.clone())),
150 None => Ok(Value::Nill),
151 }
152 }).await
153 }
154
155 pub async fn strings_set(&mut self, mut args: Arguments) -> ExecResult {
156 let key = Self::extract_key(args.pop_front())?;
157 let value = Self::extract_buffer(args.pop_front())?;
158
159 let mut keepttl = false;
160 let mut expire: Option<SystemTime> = None;
161 let mut set_if_exists: Option<bool> = None;
162
163 while let Some(subcmd) = Self::extract_string(args.pop_front()).ok() {
164 match &subcmd.to_uppercase()[..] {
165 "KEEPTTL" => keepttl = true,
166 "XX" => set_if_exists = Some(true),
167 "NX" => set_if_exists = Some(false),
168 "EX" => expire = Some(SystemTime::now() + Duration::from_secs(Self::extract_unsigned_integer(args.pop_front())?)),
169 "PX" => expire = Some(SystemTime::now() + Duration::from_millis(Self::extract_unsigned_integer(args.pop_front())?)),
170 arg => return Err(format!("Unexpected argument '{}'", arg)),
171 }
172 }
173
174 let mut cnt = ContainerImpl::<Inner>::new();
175 cnt.inner = value;
176 if ! keepttl {
177 cnt.expiration_time = None;
178 }
179 if let Some(expire) = expire {
180 cnt.expiration_time = Some(expire);
181 }
182 let cnt = Self::make_container(Container::Strings(cnt));
183
184 let mut containers = self.containers.lock().await;
185 let entry = containers.entry(key.clone());
186 let result = match (set_if_exists, entry) {
187 (None, Entry::Vacant(e)) | (Some(false), Entry::Vacant(e)) => {
188 e.insert(cnt);
189 Ok(Value::Ok)
190 },
191 (None, Entry::Occupied(mut e)) | (Some(true), Entry::Occupied(mut e)) => {
192 *e.get_mut() = cnt;
193 Ok(Value::Ok)
194 },
195 _ => Ok(Value::Nill),
196 };
197 drop(containers);
198
199 if let (Ok(Value::Ok), Some(timepoint)) = (result.clone(), expire) {
200 self.expire_key_at(&key, timepoint).await;
201 }
202 result
203 }
204
205 pub async fn strings_setex_impl(&mut self, key: Key, timepoint: SystemTime, value: Vec<u8>) -> ExecResult {
206 let cnt = self.strings_get_container(key.clone()).await;
207 let mut cnt = cnt.lock().await;
208 let mut cnt = Self::strings_unwrap_mut_container(&mut cnt)?;
209
210 cnt.inner = value;
211 cnt.expiration_time = Some(timepoint);
212 drop(cnt);
213
214 self.expire_key_at(&key, timepoint).await;
215 Ok(Value::Ok)
216 }
217
218 pub async fn strings_setex(&mut self, mut args: Arguments) -> ExecResult {
219 let key = Self::extract_key(args.pop_front())?;
220 let seconds = Self::extract_unsigned_integer(args.pop_front())?;
221 let value = Self::extract_buffer(args.pop_front())?;
222 let timepoint = SystemTime::now() + Duration::from_secs(seconds);
223 self.strings_setex_impl(key, timepoint, value).await
224 }
225
226 pub async fn strings_psetex(&mut self, mut args: Arguments) -> ExecResult {
227 let key = Self::extract_key(args.pop_front())?;
228 let millis = Self::extract_unsigned_integer(args.pop_front())?;
229 let value = Self::extract_buffer(args.pop_front())?;
230 let timepoint = SystemTime::now() + Duration::from_millis(millis);
231 self.strings_setex_impl(key, timepoint, value).await
232 }
233
234 pub async fn strings_setnx(&mut self, mut args: Arguments) -> ExecResult {
235 let key = Self::extract_key(args.pop_front())?;
236 let value = Self::extract_buffer(args.pop_front())?;
237
238 let mut cnt = ContainerImpl::<Inner>::new();
239 cnt.inner = value;
240 cnt.expiration_time = None;
241 let cnt = Self::make_container(Container::Strings(cnt));
242
243 let mut containers = self.containers.lock().await;
244 match containers.entry(key.clone()) {
245 Entry::Occupied(_) => Ok(Value::Bool(false)),
246 Entry::Vacant(e) => {
247 e.insert(cnt);
248 Ok(Value::Bool(true))
249 },
250 }
251 }
252
253 pub async fn strings_getset(&self, mut args: Arguments) -> ExecResult {
254 let key = Self::extract_key(args.pop_front())?;
255 let value = Self::extract_buffer(args.pop_front())?;
256 let mut value: Inner = value.into();
257 self.strings_locks(vec![key], &vec![], |mut cnt, _| {
258 let mut cnt = cnt.remove(0).expect("key should be created, but not");
259 cnt.expiration_time = None;
260 std::mem::swap(&mut cnt.inner, &mut value);
261 Ok(Value::Nill)
262 }).await.unwrap();
263 Ok(Value::Buffer(value.into()))
264 }
265
266 pub async fn strings_len(&self, mut args: Arguments) -> ExecResult {
267 let key = Self::extract_key(args.pop_front())?;
268 self.strings_lock(key, |cnt| -> ExecResult {
269 Ok(Value::Integer(cnt.len() as i64))
270 }).await
271 }
272
273 pub async fn strings_incrby(&self, mut args: Arguments) -> ExecResult {
274 let key = Self::extract_key(args.pop_front())?;
275 let value = if let Ok(value) = Self::extract_integer(args.pop_front()) {value} else {1};
276 self.strings_lock_mut(key, |cnt| -> ExecResult {
277 let number = inner_parse::<i64>(cnt, 0)?;
278 let number = number + value;
279 *cnt = format!("{}", number).as_bytes().to_vec();
280 Ok(Value::Integer(number))
281 }).await
282 }
283
284 pub async fn strings_decrby(&self, mut args: Arguments) -> ExecResult {
285 let key = Self::extract_key(args.pop_front())?;
286 let value = if let Ok(value) = Self::extract_integer(args.pop_front()) {value} else {1};
287 self.strings_lock_mut(key, |cnt| -> ExecResult {
288 let number = inner_parse::<i64>(cnt, 0)?;
289 let number = number - value;
290 *cnt = format!("{}", number).as_bytes().to_vec();
291 Ok(Value::Integer(number))
292 }).await
293 }
294
295 pub async fn strings_incrby_float(&self, mut args: Arguments) -> ExecResult {
296 let key = Self::extract_key(args.pop_front())?;
297 let value = if let Ok(value) = Self::extract_integer(args.pop_front()) {value} else {1};
298 self.strings_lock_mut(key, |cnt| -> ExecResult {
299 let number = if cnt.len() == 0 {0} else {std::str::from_utf8(cnt).map_err(|e|format!("{}", e))?.parse::<i64>().map_err(|e|format!("{}", e))?};
300 let number = number + value;
301 *cnt = format!("{}", number).as_bytes().to_vec();
302 Ok(Value::Integer(number))
303 }).await
304 }
305
306 pub async fn strings_bitcount(&self, mut args: Arguments) -> ExecResult {
307 static BITCOUNTMAP: [u8; 256] = [0,1,1,2,1,2,2,3,1,2,2,3,2,3,3,4,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,4,5,5,6,5,6,6,7,5,6,6,7,6,7,7,8];
308
309 let key = Self::extract_key(args.pop_front())?;
310 let start = if let Ok(start) = Self::extract_integer(args.pop_front()) {start} else {0};
311 let end = if let Ok(end) = Self::extract_integer(args.pop_front()) {end} else {-1};
312 self.strings_lock(key, |cnt| -> ExecResult {
313 let start = if start >= 0 {start} else {cnt.len() as i64 + start} as usize;
314 let end = 1 + if end >= 0 { end} else {cnt.len() as i64 + end} as usize;
315 if start >= cnt.len() || start >= end {
316 return Ok(Value::Integer(0));
317 }
318 let sum: u64 = cnt
319 .iter()
320 .skip(start)
321 .take(end - start)
322 .map(|ch|BITCOUNTMAP[*ch as usize] as u64)
323 .sum();
324 Ok(Value::Integer(sum as i64))
325 }).await
326 }
327
328 pub async fn strings_mget(&self, mut args: Arguments) -> ExecResult {
329 let keys = args.drain(..).filter_map(|a|Self::extract_key(Some(a)).ok()).collect();
330 self.strings_locks(vec![], &keys, |_, cnts| {
331 let mut out = VecDeque::with_capacity(cnts.len());
332 for cnt in cnts {
333 match cnt {
334 Some(cnt) => out.push_back(Value::Buffer(cnt.inner.clone())),
335 None => out.push_back(Value::Nill),
336 }
337 }
338 Ok(Value::Array(out))
339 }).await
340 }
341
342 pub async fn strings_mset(&self, mut args: Arguments) -> ExecResult {
343 let mut keys = Vec::with_capacity(args.len() / 2);
344 let mut values = VecDeque::with_capacity(args.len() / 2);
345 while args.len() > 1 {
346 if let Ok(key) = Self::extract_key(args.pop_front()) {
347 keys.push(key);
348 let value = Self::extract_buffer(args.pop_front())?;
349 values.push_back(value);
350 }
351 }
352 self.strings_locks(keys, &vec![], |cnts, _| {
353 for mut cnt in cnts {
354 cnt.inner = values.pop_front().unwrap();
355 cnt.expiration_time = None;
356 }
357 Ok(Value::Ok)
358 }).await
359 }
360
361 pub async fn strings_bitop(&self, mut args: Arguments) -> ExecResult {
362 match Self::extract_string(args.pop_front())?.parse::<BitOperation>()? {
363 BitOperation::Not => self.strings_bitop_not(args).await,
364 op => self.strings_bitop_cmn(op, args).await,
365 }
366 }
367
368 async fn strings_bitop_not(&self, mut args: Arguments) -> ExecResult {
369 let dest = Self::extract_key(args.pop_front())?;
370 let src = Self::extract_key(args.pop_front())?;
371 self.strings_locks(vec![dest], &vec![src], |mut dest, mut cnts| {
372 let dest = dest.remove(0).ok_or("BITOP NOT dst src")?;
373 let src = cnts.remove(0).ok_or("BITOP NOT dst src")?;
374
375 dest.expiration_time = None;
376 dest.inner = match src {
377 Some(src) => Vec::from_iter(src.inner.iter().map(|ch|!*ch)),
378 None => Vec::new(),
379 };
380 Ok(Value::Integer(dest.inner.len() as i64))
381 }).await
382 }
383
384 async fn strings_bitop_cmn(&self, operation: BitOperation, mut args: Arguments) -> ExecResult {
385 let dest = Self::extract_key(args.pop_front())?;
386 let keys = args.drain(..).filter_map(|a|Self::extract_key(Some(a)).ok()).collect();
387
388 self.strings_locks(vec![dest], &keys, |mut dest, mut cnts| {
389 let max_len = cnts.iter()
390 .map(|cnt|if cnt.is_none() {0} else {cnt.unwrap().inner.len()})
391 .max().unwrap_or(0);
392 let min_len = cnts.iter()
393 .map(|cnt|if cnt.is_none() {0} else {cnt.unwrap().inner.len()})
394 .min().unwrap_or(0);
395
396 let unexpected_cnts_error = "BITOP <OPERATION> dst src [[src]]";
397 let dest = dest.remove(0).ok_or(unexpected_cnts_error)?;
398 let src = cnts.remove(0).ok_or(unexpected_cnts_error)?;
399
400 dest.expiration_time = None;
401 dest.inner = match src {
402 Some(src) => src.inner.clone(),
403 None => Inner::with_capacity(max_len),
404 };
405 dest.inner.resize(max_len, 0);
406
407 match operation {
408 BitOperation::And => if min_len > 0 {
409 cnts.iter().filter_map(|cnt|cnt.as_ref())
410 .for_each(|cnt| {
411 for i in 0..min_len {
412 match (dest.inner.get_mut(i), cnt.inner.get(i)) {
413 (Some(d), Some(c)) => *d = *d & *c,
414 _ => panic!("Unexpected arm"),
415 }
416 }
417 });
418 },
419 op@BitOperation::Or | op@BitOperation::Xor => {
420 cnts.iter().filter_map(|cnt|cnt.as_ref())
421 .for_each(|cnt| {
422 for i in 0..cnt.inner.len() {
423 match (op, dest.inner.get_mut(i), cnt.inner.get(i)) {
424 (BitOperation::Or, Some(d), Some(c)) => *d = *d | *c,
425 (BitOperation::Xor, Some(d), Some(c)) => *d = *d ^ *c,
426 _ => panic!("Unexpected arm"),
427 }
428 }
429 });
430 },
431 BitOperation::Not => panic!("Unexpected arm"),
432 }
433 Ok(Value::Integer(dest.inner.len() as i64))
434 }).await
435 }
436
437 pub async fn strings_setbit(&self, mut args: Arguments) -> ExecResult {
438 let key = Self::extract_key(args.pop_front())?;
439 let offset = Self::extract_integer(args.pop_front())? as usize;
440 let bit = Self::extract_bit(args.pop_front())?;
441
442 if offset >= 2^32 {
443 return Err("offset is out of range [0; 2^32)".to_owned());
444 }
445 let byte_index = offset / 8;
446 let bit_index = offset % 8;
447 let mut mask = 0b1000_0000;
448 mask >>= bit_index;
449
450 self.strings_lock_mut(key, |cnt| -> ExecResult {
451 if byte_index >= cnt.len() {
452 cnt.resize(1 + byte_index, 0);
453 }
454 let byte = cnt.get_mut(byte_index).unwrap();
455 let original = *byte & mask;
456 if bit {
457 *byte = *byte | mask;
458 } else {
459 *byte = *byte & !mask;
460 }
461 match original {
462 0 => Ok(Value::Bool(false)),
463 _ => Ok(Value::Bool(true)),
464 }
465 }).await
466 }
467
468 pub async fn strings_getbit(&self, mut args: Arguments) -> ExecResult {
469 let key = Self::extract_key(args.pop_front())?;
470 let offset = Self::extract_integer(args.pop_front())? as usize;
471 if offset >= 2^32 {
472 return Err("offset is out of range [0; 2^32)".to_owned());
473 }
474 let byte_index = offset / 8;
475 let bit_index = offset % 8;
476 let mut mask = 0b1000_0000;
477 mask >>= bit_index;
478
479 self.strings_lock(key, |cnt| -> ExecResult {
480 if byte_index >= cnt.len() {
481 return Ok(Value::Bool(false));
482 }
483 let byte = cnt.get(byte_index).unwrap();
484 let bit = *byte & mask;
485 match bit {
486 0 => Ok(Value::Bool(false)),
487 _ => Ok(Value::Bool(true)),
488 }
489 }).await
490 }
491
492 pub async fn strings_get_range(&self, mut args: Arguments) -> ExecResult {
493 let key = Self::extract_key(args.pop_front())?;
494 let start = Self::extract_integer(args.pop_front())?;
495 let end = Self::extract_integer(args.pop_front())?;
496 self.strings_lock(key, |cnt| -> ExecResult {
497 let start = if start >= 0 {start} else {cnt.len() as i64 + start} as usize;
498 let end = 1 + if end >= 0 { end} else {cnt.len() as i64 + end} as usize;
499 if start >= cnt.len() || start >= end {
500 return Ok(Value::Buffer(vec![]));
501 }
502 let iter = cnt
503 .iter()
504 .skip(start)
505 .take(end - start);
506 Ok(Value::Buffer(Vec::from_iter(iter.cloned())))
507 }).await
508 }
509
510 pub async fn strings_set_range(&self, mut args: Arguments) -> ExecResult {
511 let key = Self::extract_key(args.pop_front())?;
512 let start = Self::extract_index(args.pop_front())?;
513 let value = Self::extract_buffer(args.pop_front())?;
514 let end = start + value.len();
515
516 self.strings_lock_mut(key, |cnt| -> ExecResult {
517 if cnt.len() < end {
518 cnt.resize(end, 0);
519 }
520 cnt[start..end].copy_from_slice(&value[..]);
521 Ok(Value::Integer(cnt.len() as i64))
522 }).await
523 }
524}
525