radish_database/
strings.rs

1/* Copyright (c) 2020 Dmitry Shatilov <shatilov dot diman at gmail dot com>
2 * 
3 * This program is free software: you can redistribute it and/or modify
4 * it under the terms of the GNU Affero General Public License as published by
5 * the Free Software Foundation, either version 3 of the License, or
6 * (at your option) any later version.
7
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 * GNU Affero General Public License for more details.
12
13 * You should have received a copy of the GNU Affero General Public License
14 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
15 */
16
17use std::iter::FromIterator;
18use std::time::{SystemTime, Duration};
19use std::collections::VecDeque;
20
21use indexmap::map::Entry;
22
23use super::container::Container;
24use super::container::ContainerPtr;
25use super::container::ContainerImpl;
26
27type Key = super::Key;
28type Value = super::Value;
29type Arguments = super::Arguments;
30type ExecResult = super::ExecResult;
31
32type Inner = Vec<u8>;
33
34#[derive(Clone, Copy)]
35enum BitOperation {
36	And,
37	Or,
38	Xor,
39	Not,
40}
41
42impl std::str::FromStr for BitOperation {
43	type Err = String;
44
45	fn from_str(op: &str) -> Result<Self, Self::Err> {
46		match &op.to_lowercase()[..] {
47			"and" => Ok(BitOperation::And),
48			"or" =>  Ok(BitOperation::Or),
49			"xor" => Ok(BitOperation::Xor),
50			"not" => Ok(BitOperation::Not),
51			op@_ => Err(format!("Unsupported operation '{}'", op)),
52		}
53	}
54}
55
56fn inner_parse<T>(cnt: &Inner, def: T) -> Result<T, String>
57where	T: std::str::FromStr,
58	<T as std::str::FromStr>::Err: std::fmt::Display
59{
60	if cnt.len() == 0 {
61		Ok(def)
62	} else {
63		let str = std::str::from_utf8(cnt).map_err(|e|format!("{}", e))?;
64		str.parse::<T>().map_err(|e|format!("{}", e))
65	}
66}
67
68
69impl super::Storage {
70	fn strings_container_factory() -> Container {
71		Container::Strings(ContainerImpl::<Inner>::new())
72	}
73	async fn strings_get_container(&self, key: Key) -> ContainerPtr {
74		self.get_container(key, Self::strings_container_factory).await
75	}
76	async fn strings_get_containers(&self, keys: Vec<Key>) -> Vec<ContainerPtr> {
77		self.get_containers(keys, Self::strings_container_factory).await
78	}
79	async fn strings_try_get_containers(&self, keys: &Vec<Key>) -> Vec<Option<ContainerPtr>> {
80		self.try_get_containers(keys).await
81	}
82	fn strings_unwrap_container(container: &Container) -> Result<&ContainerImpl<Inner>, String> {
83		match container {
84			Container::Strings(ref c) => Ok(c),
85			_ => Err(format!("Unexpected container type")),
86		}
87	}
88	fn strings_unwrap_mut_container(container: &mut Container) -> Result<&mut ContainerImpl<Inner>, String> {
89		match container {
90			Container::Strings(ref mut c) => Ok(c),
91			_ => Err(format!("Unexpected container type")),
92		}
93	}
94	async fn strings_lock<F: FnOnce(&Inner) -> ExecResult>(&self, key: Key, processor: F) -> ExecResult {
95		let c1 = self.strings_get_container(key).await;
96		let c2 = c1.lock().await;
97		let c3 = Self::strings_unwrap_container(&c2)?;
98		processor(&c3.inner)
99	}
100	async fn strings_lock_mut<F: FnOnce(&mut Inner) -> ExecResult>(&self, key: Key, processor: F) -> ExecResult {
101		let c1 = self.strings_get_container(key).await;
102		let mut c2 = c1.lock().await;
103		let c3 = Self::strings_unwrap_mut_container(&mut c2)?;
104		processor(&mut c3.inner)
105	}
106
107	async fn strings_locks<F>(&self, write_keys: Vec<Key>, read_keys: &Vec<Key>, callback: F) -> ExecResult
108	where F: FnOnce(VecDeque<&mut ContainerImpl<Inner>>, VecDeque<Option<&ContainerImpl<Inner>>>) -> ExecResult {
109		let write_containers = self.strings_get_containers(write_keys).await;
110		let read_containers = self.strings_try_get_containers(read_keys).await;
111		let writes = write_containers.iter().map(|x|x.as_ref());
112		let reads = read_containers.iter().map(|x|{
113			match x {
114				None => None,
115				Some(x) => Some(x.as_ref()),
116			}
117		});
118		let (mut writes, reads) = Self::lock_all(writes, reads).await;
119
120		let mut out_writes = VecDeque::with_capacity(writes.len());
121		for g in &mut writes {
122			out_writes.push_back(Self::strings_unwrap_mut_container(&mut **g)?);
123		}
124		let mut out_reads = VecDeque::with_capacity(reads.len());
125		for g in &reads {
126			match g {
127				None => out_reads.push_back(None),
128				Some(g) => out_reads.push_back(Some(Self::strings_unwrap_container(&**g)?)),
129			}
130		}
131
132		callback(out_writes, out_reads)
133	}
134
135	pub async fn strings_append(&self, mut args: Arguments) -> ExecResult {
136		let key = Self::extract_key(args.pop_front())?;
137		let value = Self::extract_buffer(args.pop_front())?;
138		self.strings_lock_mut(key, |cnt| -> ExecResult {
139			cnt.append(&mut value.into_iter().collect());
140			Ok(Value::Integer(cnt.len() as i64))
141		}).await
142	}
143
144	pub async fn strings_get(&self, mut args: Arguments) -> ExecResult {
145		let key = Self::extract_key(args.pop_front())?;
146		self.strings_locks(vec![], &vec![key], |_, mut cnts| -> ExecResult {
147			let cnt = cnts.remove(0).expect("option should be exists, but not");
148			match cnt {
149				Some(cnt) => Ok(Value::Buffer(cnt.inner.clone())),
150				None => Ok(Value::Nill),
151			}
152		}).await
153	}
154
155	pub async fn strings_set(&mut self, mut args: Arguments) -> ExecResult {
156		let key = Self::extract_key(args.pop_front())?;
157		let value = Self::extract_buffer(args.pop_front())?;
158
159		let mut keepttl = false;
160		let mut expire: Option<SystemTime> = None;
161		let mut set_if_exists: Option<bool> = None;
162
163		while let Some(subcmd) = Self::extract_string(args.pop_front()).ok() {
164			match &subcmd.to_uppercase()[..] {
165				"KEEPTTL" => keepttl = true,
166				"XX" => set_if_exists = Some(true),
167				"NX" => set_if_exists = Some(false),
168				"EX" => expire = Some(SystemTime::now() + Duration::from_secs(Self::extract_unsigned_integer(args.pop_front())?)),
169				"PX" => expire = Some(SystemTime::now() + Duration::from_millis(Self::extract_unsigned_integer(args.pop_front())?)),
170				arg => return Err(format!("Unexpected argument '{}'", arg)),
171			}
172		}
173
174		let mut cnt = ContainerImpl::<Inner>::new();
175		cnt.inner = value;
176		if ! keepttl {
177			cnt.expiration_time = None;
178		}
179		if let Some(expire) = expire {
180			cnt.expiration_time = Some(expire);
181		}
182		let cnt = Self::make_container(Container::Strings(cnt));
183
184		let mut containers = self.containers.lock().await;
185		let entry = containers.entry(key.clone());
186		let result = match (set_if_exists, entry) {
187			(None, Entry::Vacant(e)) | (Some(false), Entry::Vacant(e)) => {
188				e.insert(cnt);
189				Ok(Value::Ok)
190			},
191			(None, Entry::Occupied(mut e)) | (Some(true), Entry::Occupied(mut e)) => {
192				*e.get_mut() = cnt;
193				Ok(Value::Ok)
194			},
195			_ => Ok(Value::Nill),
196		};
197		drop(containers);
198
199		if let (Ok(Value::Ok), Some(timepoint)) = (result.clone(), expire) {
200			self.expire_key_at(&key, timepoint).await;
201		}
202		result
203	}
204
205	pub async fn strings_setex_impl(&mut self, key: Key, timepoint: SystemTime, value: Vec<u8>) -> ExecResult {
206		let cnt = self.strings_get_container(key.clone()).await;
207		let mut cnt = cnt.lock().await;
208		let mut cnt = Self::strings_unwrap_mut_container(&mut cnt)?;
209
210		cnt.inner = value;
211		cnt.expiration_time = Some(timepoint);
212		drop(cnt);
213
214		self.expire_key_at(&key, timepoint).await;
215		Ok(Value::Ok)
216	}
217
218	pub async fn strings_setex(&mut self, mut args: Arguments) -> ExecResult {
219		let key = Self::extract_key(args.pop_front())?;
220		let seconds = Self::extract_unsigned_integer(args.pop_front())?;
221		let value = Self::extract_buffer(args.pop_front())?;
222		let timepoint = SystemTime::now() + Duration::from_secs(seconds);
223		self.strings_setex_impl(key, timepoint, value).await
224	}
225
226	pub async fn strings_psetex(&mut self, mut args: Arguments) -> ExecResult {
227		let key = Self::extract_key(args.pop_front())?;
228		let millis = Self::extract_unsigned_integer(args.pop_front())?;
229		let value = Self::extract_buffer(args.pop_front())?;
230		let timepoint = SystemTime::now() + Duration::from_millis(millis);
231		self.strings_setex_impl(key, timepoint, value).await
232	}
233
234	pub async fn strings_setnx(&mut self, mut args: Arguments) -> ExecResult {
235		let key = Self::extract_key(args.pop_front())?;
236		let value = Self::extract_buffer(args.pop_front())?;
237
238		let mut cnt = ContainerImpl::<Inner>::new();
239		cnt.inner = value;
240		cnt.expiration_time = None;
241		let cnt = Self::make_container(Container::Strings(cnt));
242
243		let mut containers = self.containers.lock().await;
244		match containers.entry(key.clone()) {
245			Entry::Occupied(_) => Ok(Value::Bool(false)),
246			Entry::Vacant(e) => {
247				e.insert(cnt);
248				Ok(Value::Bool(true))
249			},
250		}
251	}
252
253	pub async fn strings_getset(&self, mut args: Arguments) -> ExecResult {
254		let key = Self::extract_key(args.pop_front())?;
255		let value = Self::extract_buffer(args.pop_front())?;
256		let mut value: Inner = value.into();
257		self.strings_locks(vec![key], &vec![], |mut cnt, _| {
258			let mut cnt = cnt.remove(0).expect("key should be created, but not");
259			cnt.expiration_time = None;
260			std::mem::swap(&mut cnt.inner, &mut value);
261			Ok(Value::Nill)
262		}).await.unwrap();
263		Ok(Value::Buffer(value.into()))
264	}
265
266	pub async fn strings_len(&self, mut args: Arguments) -> ExecResult {
267		let key = Self::extract_key(args.pop_front())?;
268		self.strings_lock(key, |cnt| -> ExecResult {
269			Ok(Value::Integer(cnt.len() as i64))
270		}).await
271	}
272
273	pub async fn strings_incrby(&self, mut args: Arguments) -> ExecResult {
274		let key = Self::extract_key(args.pop_front())?;
275		let value = if let Ok(value) = Self::extract_integer(args.pop_front()) {value} else {1};
276		self.strings_lock_mut(key, |cnt| -> ExecResult {
277			let number = inner_parse::<i64>(cnt, 0)?;
278			let number = number + value;
279			*cnt = format!("{}", number).as_bytes().to_vec();
280			Ok(Value::Integer(number))
281		}).await
282	}
283
284	pub async fn strings_decrby(&self, mut args: Arguments) -> ExecResult {
285		let key = Self::extract_key(args.pop_front())?;
286		let value = if let Ok(value) = Self::extract_integer(args.pop_front()) {value} else {1};
287		self.strings_lock_mut(key, |cnt| -> ExecResult {
288			let number = inner_parse::<i64>(cnt, 0)?;
289			let number = number - value;
290			*cnt = format!("{}", number).as_bytes().to_vec();
291			Ok(Value::Integer(number))
292		}).await
293	}
294
295	pub async fn strings_incrby_float(&self, mut args: Arguments) -> ExecResult {
296		let key = Self::extract_key(args.pop_front())?;
297		let value = if let Ok(value) = Self::extract_integer(args.pop_front()) {value} else {1};
298		self.strings_lock_mut(key, |cnt| -> ExecResult {
299			let number = if cnt.len() == 0 {0} else {std::str::from_utf8(cnt).map_err(|e|format!("{}", e))?.parse::<i64>().map_err(|e|format!("{}", e))?};
300			let number = number + value;
301			*cnt = format!("{}", number).as_bytes().to_vec();
302			Ok(Value::Integer(number))
303		}).await
304	}
305
306	pub async fn strings_bitcount(&self, mut args: Arguments) -> ExecResult {
307		static BITCOUNTMAP: [u8; 256] = [0,1,1,2,1,2,2,3,1,2,2,3,2,3,3,4,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,4,5,5,6,5,6,6,7,5,6,6,7,6,7,7,8];
308
309		let key = Self::extract_key(args.pop_front())?;
310		let start = if let Ok(start) = Self::extract_integer(args.pop_front()) {start} else {0};
311		let end = if let Ok(end) = Self::extract_integer(args.pop_front()) {end} else {-1};
312		self.strings_lock(key, |cnt| -> ExecResult {
313			let start =     if start >= 0 {start} else {cnt.len() as i64 + start} as usize;
314			let end   = 1 + if end   >= 0 {  end} else {cnt.len() as i64 +   end} as usize;
315			if start >= cnt.len() || start >= end {
316				return Ok(Value::Integer(0));
317			}
318			let sum: u64 = cnt
319				.iter()
320				.skip(start)
321				.take(end - start)
322				.map(|ch|BITCOUNTMAP[*ch as usize] as u64)
323				.sum();
324			Ok(Value::Integer(sum as i64))
325		}).await
326	}
327
328	pub async fn strings_mget(&self, mut args: Arguments) -> ExecResult {
329		let keys = args.drain(..).filter_map(|a|Self::extract_key(Some(a)).ok()).collect();
330		self.strings_locks(vec![], &keys, |_, cnts| {
331			let mut out = VecDeque::with_capacity(cnts.len());
332			for cnt in cnts {
333				match cnt {
334					Some(cnt) => out.push_back(Value::Buffer(cnt.inner.clone())),
335					None => out.push_back(Value::Nill),
336				}
337			}
338			Ok(Value::Array(out))
339		}).await
340	}
341
342	pub async fn strings_mset(&self, mut args: Arguments) -> ExecResult {
343		let mut keys = Vec::with_capacity(args.len() / 2);
344		let mut values = VecDeque::with_capacity(args.len() / 2);
345		while args.len() > 1 {
346			if let Ok(key) = Self::extract_key(args.pop_front()) {
347				keys.push(key);
348				let value = Self::extract_buffer(args.pop_front())?;
349				values.push_back(value);
350			}
351		}
352		self.strings_locks(keys, &vec![], |cnts, _| {
353			for mut cnt in cnts {
354				cnt.inner = values.pop_front().unwrap();
355				cnt.expiration_time = None;
356			}
357			Ok(Value::Ok)
358		}).await
359	}
360
361	pub async fn strings_bitop(&self, mut args: Arguments) -> ExecResult {
362		match Self::extract_string(args.pop_front())?.parse::<BitOperation>()? {
363			BitOperation::Not => self.strings_bitop_not(args).await,
364			op => self.strings_bitop_cmn(op, args).await,
365		}
366	}
367
368	async fn strings_bitop_not(&self, mut args: Arguments) -> ExecResult {
369		let dest = Self::extract_key(args.pop_front())?;
370		let src = Self::extract_key(args.pop_front())?;
371		self.strings_locks(vec![dest], &vec![src], |mut dest, mut cnts| {
372			let dest = dest.remove(0).ok_or("BITOP NOT dst src")?;
373			let src = cnts.remove(0).ok_or("BITOP NOT dst src")?;
374
375			dest.expiration_time = None;
376			dest.inner = match src {
377				Some(src) => Vec::from_iter(src.inner.iter().map(|ch|!*ch)),
378				None => Vec::new(),
379			};
380			Ok(Value::Integer(dest.inner.len() as i64))
381		}).await
382	}
383
384	async fn strings_bitop_cmn(&self, operation: BitOperation, mut args: Arguments) -> ExecResult {
385		let dest = Self::extract_key(args.pop_front())?;
386		let keys = args.drain(..).filter_map(|a|Self::extract_key(Some(a)).ok()).collect();
387
388		self.strings_locks(vec![dest], &keys, |mut dest, mut cnts| {
389			let max_len = cnts.iter()
390				.map(|cnt|if cnt.is_none() {0} else {cnt.unwrap().inner.len()})
391				.max().unwrap_or(0);
392			let min_len = cnts.iter()
393				.map(|cnt|if cnt.is_none() {0} else {cnt.unwrap().inner.len()})
394				.min().unwrap_or(0);
395
396			let unexpected_cnts_error = "BITOP <OPERATION> dst src [[src]]";
397			let dest = dest.remove(0).ok_or(unexpected_cnts_error)?;
398			let src = cnts.remove(0).ok_or(unexpected_cnts_error)?;
399
400			dest.expiration_time = None;
401			dest.inner = match src {
402				Some(src) => src.inner.clone(),
403				None => Inner::with_capacity(max_len),
404			};
405			dest.inner.resize(max_len, 0);
406
407			match operation {
408				BitOperation::And => if min_len > 0 {
409					cnts.iter().filter_map(|cnt|cnt.as_ref())
410					.for_each(|cnt| {
411						for i in 0..min_len {
412							match (dest.inner.get_mut(i), cnt.inner.get(i)) {
413								(Some(d), Some(c)) => *d = *d & *c,
414								_ => panic!("Unexpected arm"),
415							}
416						}
417					});
418				},
419				op@BitOperation::Or | op@BitOperation::Xor => {
420					cnts.iter().filter_map(|cnt|cnt.as_ref())
421					.for_each(|cnt| {
422						for i in 0..cnt.inner.len() {
423							match (op, dest.inner.get_mut(i), cnt.inner.get(i)) {
424								(BitOperation::Or,  Some(d), Some(c)) => *d = *d | *c,
425								(BitOperation::Xor, Some(d), Some(c)) => *d = *d ^ *c,
426								_ => panic!("Unexpected arm"),
427							}
428						}
429					});
430				},
431				BitOperation::Not => panic!("Unexpected arm"),
432			}
433			Ok(Value::Integer(dest.inner.len() as i64))
434		}).await
435	}
436
437	pub async fn strings_setbit(&self, mut args: Arguments) -> ExecResult {
438		let key = Self::extract_key(args.pop_front())?;
439		let offset = Self::extract_integer(args.pop_front())? as usize;
440		let bit = Self::extract_bit(args.pop_front())?;
441
442		if offset >= 2^32 {
443			return Err("offset is out of range [0; 2^32)".to_owned());
444		}
445		let byte_index = offset / 8;
446		let bit_index = offset % 8;
447		let mut mask = 0b1000_0000;
448		mask >>= bit_index;
449
450		self.strings_lock_mut(key, |cnt| -> ExecResult {
451			if byte_index >= cnt.len() {
452				cnt.resize(1 + byte_index, 0);
453			}
454			let byte = cnt.get_mut(byte_index).unwrap();
455			let original = *byte & mask;
456			if bit {
457				*byte = *byte | mask;
458			} else {
459				*byte = *byte & !mask;
460			}
461			match original {
462				0 => Ok(Value::Bool(false)),
463				_ => Ok(Value::Bool(true)),
464			}
465		}).await
466	}
467
468	pub async fn strings_getbit(&self, mut args: Arguments) -> ExecResult {
469		let key = Self::extract_key(args.pop_front())?;
470		let offset = Self::extract_integer(args.pop_front())? as usize;
471		if offset >= 2^32 {
472			return Err("offset is out of range [0; 2^32)".to_owned());
473		}
474		let byte_index = offset / 8;
475		let bit_index = offset % 8;
476		let mut mask = 0b1000_0000;
477		mask >>= bit_index;
478
479		self.strings_lock(key, |cnt| -> ExecResult {
480			if byte_index >= cnt.len() {
481				return Ok(Value::Bool(false));
482			}
483			let byte = cnt.get(byte_index).unwrap();
484			let bit = *byte & mask;
485			match bit {
486				0 => Ok(Value::Bool(false)),
487				_ => Ok(Value::Bool(true)),
488			}
489		}).await
490	}
491
492	pub async fn strings_get_range(&self, mut args: Arguments) -> ExecResult {
493		let key = Self::extract_key(args.pop_front())?;
494		let start = Self::extract_integer(args.pop_front())?;
495		let end = Self::extract_integer(args.pop_front())?;
496		self.strings_lock(key, |cnt| -> ExecResult {
497			let start =     if start >= 0 {start} else {cnt.len() as i64 + start} as usize;
498			let end   = 1 + if end   >= 0 {  end} else {cnt.len() as i64 +   end} as usize;
499			if start >= cnt.len() || start >= end {
500				return Ok(Value::Buffer(vec![]));
501			}
502			let iter = cnt
503				.iter()
504				.skip(start)
505				.take(end - start);
506			Ok(Value::Buffer(Vec::from_iter(iter.cloned())))
507		}).await
508	}
509
510	pub async fn strings_set_range(&self, mut args: Arguments) -> ExecResult {
511		let key = Self::extract_key(args.pop_front())?;
512		let start = Self::extract_index(args.pop_front())?;
513		let value = Self::extract_buffer(args.pop_front())?;
514		let end = start + value.len();
515
516		self.strings_lock_mut(key, |cnt| -> ExecResult {
517			if cnt.len() < end {
518				cnt.resize(end, 0);
519			}
520			cnt[start..end].copy_from_slice(&value[..]);
521			Ok(Value::Integer(cnt.len() as i64))
522		}).await
523	}
524}
525