radish_database/
hash.rs

1/* Copyright (c) 2020 Dmitry Shatilov <shatilov dot diman at gmail dot com>
2 * 
3 * This program is free software: you can redistribute it and/or modify
4 * it under the terms of the GNU Affero General Public License as published by
5 * the Free Software Foundation, either version 3 of the License, or
6 * (at your option) any later version.
7
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 * GNU Affero General Public License for more details.
12
13 * You should have received a copy of the GNU Affero General Public License
14 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
15 */
16
17use std::collections::VecDeque;
18
19use indexmap::IndexMap;
20
21use super::container::Container;
22use super::container::ContainerPtr;
23use super::container::ContainerImpl;
24
25type Key = super::Key;
26type Value = super::Value;
27type Arguments = super::Arguments;
28type ExecResult = super::ExecResult;
29
30type Inner = IndexMap<Value, Value>;
31
32impl super::Storage {
33	async fn hash_get_container(&self, key: Key) -> ContainerPtr {
34		self.get_container(key, ||Container::Hash(ContainerImpl::<Inner>::new())).await
35	}
36	async fn _hash_try_get_container(&self, key: &Key) -> Option<ContainerPtr> {
37		self.try_get_container(key).await
38	}
39	async fn hash_unwrap_container(container: &Container) -> Result<&ContainerImpl<Inner>, String> {
40		match container {
41			Container::Hash(ref c) => Ok(c),
42			_ => Err(format!("Unexpected container type")),
43		}
44	}
45	async fn hash_unwrap_mut_container(container: &mut Container) -> Result<&mut ContainerImpl<Inner>, String> {
46		match container {
47			Container::Hash(ref mut c) => Ok(c),
48			_ => Err(format!("Unexpected container type")),
49		}
50	}
51	async fn hash_lock<F: FnOnce(&Inner) -> ExecResult>(&self, key: Key, processor: F) -> ExecResult {
52		let c1 = self.hash_get_container(key).await;
53		let c2 = c1.lock().await;
54		let c3 = Self::hash_unwrap_container(&c2).await?;
55		processor(&c3.inner)
56	}
57	async fn hash_lock_mut<F: FnOnce(&mut Inner) -> ExecResult>(&self, key: Key, processor: F) -> ExecResult {
58		let c1 = self.hash_get_container(key).await;
59		let mut c2 = c1.lock().await;
60		let c3 = Self::hash_unwrap_mut_container(&mut c2).await?;
61		processor(&mut c3.inner)
62	}
63	async fn _hash_try_lock_mut<F: FnOnce(&mut Inner) -> ExecResult>(&self, key: Key, processor: F) -> ExecResult {
64		match self._hash_try_get_container(&key).await {
65			None => Ok(Value::Nill),
66			Some(c1) => {
67				let mut c2 = c1.lock().await;
68				let c3 = Self::hash_unwrap_mut_container(&mut c2).await?;
69				processor(&mut c3.inner)
70			}
71		}
72	}
73
74	pub async fn hash_set(&self, mut args: Arguments) -> ExecResult {
75		let key = Self::extract_key(args.pop_front())?;
76		self.hash_lock_mut(key, |hash| -> ExecResult {
77			let mut count = 0;
78			while args.len() >= 2 {
79				let field = args.pop_front().unwrap();
80				let value = args.pop_front().unwrap();
81				hash.insert(field, value);
82				count = count + 1;
83			}
84			Ok(Value::Integer(count as i64))
85		}).await
86	}
87
88	pub async fn hash_set_nx(&self, mut args: Arguments) -> ExecResult {
89		let key = Self::extract_key(args.pop_front())?;
90		self.hash_lock_mut(key, |hash| -> ExecResult {
91			let field = args.pop_front().unwrap();
92			let value = args.pop_front().unwrap();
93			if let indexmap::map::Entry::Vacant(place) = hash.entry(field) {
94				place.insert(value);
95				Ok(Value::Bool(true))
96			} else {
97				Ok(Value::Bool(false))
98			}
99		}).await
100	}
101
102	pub async fn hash_del(&self, mut args: Arguments) -> ExecResult {
103		let key = Self::extract_key(args.pop_front())?;
104		self.hash_lock_mut(key, |hash| -> ExecResult {
105			let mut count = 0;
106			for field in args {
107				if let Some(_) = hash.remove(&field) {
108					count = count + 1;
109				}
110			}
111			Ok(Value::Integer(count as i64))
112		}).await
113	}
114
115	pub async fn hash_get(&self, mut args: Arguments) -> ExecResult {
116		let key = Self::extract_key(args.pop_front())?;
117		let field = Self::extract(args.pop_front())?;
118		self.hash_lock(key, |hash| -> ExecResult {
119			match hash.get(&field) {
120				None => Ok(Value::Nill),
121				Some(value) => Ok(value.clone()),
122			}
123		}).await
124	}
125
126	pub async fn hash_mget(&self, mut args: Arguments) -> ExecResult {
127		let key = Self::extract_key(args.pop_front())?;
128		self.hash_lock(key, |hash| -> ExecResult {
129			let mut out = VecDeque::with_capacity(args.len());
130			while let Some(field) = args.pop_front() {
131			match hash.get(&field) {
132				None => out.push_back(Value::Nill),
133				Some(value) => out.push_back(value.clone()),
134			}
135			}
136			Ok(Value::Array(out))
137		}).await
138	}
139
140	pub async fn hash_get_all(&self, mut args: Arguments) -> ExecResult {
141		let key = Self::extract_key(args.pop_front())?;
142		self.hash_lock(key, |hash| -> ExecResult {
143			let mut out = VecDeque::with_capacity(2 * hash.len());
144			for (field, value) in hash {
145				out.push_back(field.clone());
146				out.push_back(value.clone());
147			}
148			Ok(Value::Array(out))
149		}).await
150	}
151
152	pub async fn hash_exists(&self, mut args: Arguments) -> ExecResult {
153		let key = Self::extract_key(args.pop_front())?;
154		let field = Self::extract(args.pop_front())?;
155		self.hash_lock(key, |hash| -> ExecResult {
156			Ok(Value::Bool(hash.contains_key(&field)))
157		}).await
158	}
159
160	pub async fn hash_keys(&self, mut args: Arguments) -> ExecResult {
161		let key = Self::extract_key(args.pop_front())?;
162		self.hash_lock(key, |hash| -> ExecResult {
163			Ok(Value::Array(hash.keys().cloned().collect()))
164		}).await
165	}
166
167	pub async fn hash_values(&self, mut args: Arguments) -> ExecResult {
168		let key = Self::extract_key(args.pop_front())?;
169		self.hash_lock(key, |hash| -> ExecResult {
170			Ok(Value::Array(hash.values().cloned().collect()))
171		}).await
172	}
173
174	pub async fn hash_len(&self, mut args: Arguments) -> ExecResult {
175		let key = Self::extract_key(args.pop_front())?;
176		self.hash_lock(key, |hash| -> ExecResult {
177			Ok(Value::Integer(hash.len() as i64))
178		}).await
179	}
180
181	pub async fn hash_strlen(&self, mut args: Arguments) -> ExecResult {
182		let key = Self::extract_key(args.pop_front())?;
183		let field = Self::extract(args.pop_front())?;
184		self.hash_lock(key, |hash| -> ExecResult {
185			match hash.get(&field) {
186				Some(Value::Buffer(value)) => Ok(Value::Integer(value.len() as i64)),
187				_ => Ok(Value::Nill),
188			}
189		}).await
190	}
191
192	pub async fn hash_incrby(&self, mut args: Arguments) -> ExecResult {
193		let key = Self::extract_key(args.pop_front())?;
194		let field = Self::extract(args.pop_front())?;
195		let value = Self::extract_integer(args.pop_front())?;
196		self.hash_lock_mut(key, |hash| -> ExecResult {
197			match hash.entry(field).or_insert(Value::Integer(0)) {
198				Value::Integer(v) => {
199					*v = *v + value;
200					Ok(Value::Integer(*v))
201				},
202				_ => Err(format!("Unexpected field type")),
203			}
204		}).await
205	}
206
207	pub async fn hash_incrbyfloat(&self, mut args: Arguments) -> ExecResult {
208		let key = Self::extract_key(args.pop_front())?;
209		let field = Self::extract(args.pop_front())?;
210		let value = Self::extract_float(args.pop_front())?;
211		self.hash_lock_mut(key, |hash| -> ExecResult {
212			match hash.entry(field).or_insert(Value::Float(0f64.to_bits())) {
213				Value::Float(ref mut n) => {
214					*n = (f64::from_bits(*n) + value).to_bits();
215					Ok(Value::Float(*n))
216				},
217				_ => Err(format!("Unexpected field type")),
218			}
219		}).await
220	}
221
222	pub async fn hash_scan(&self, mut args: Arguments) -> ExecResult {
223		let key = Self::extract_key(args.pop_front())?;
224		let start = Self::extract_index(args.pop_front())?;
225
226		let mut pattern: Option<String> = None;
227		let mut max_check = 100usize;
228
229		while let Some(subcmd) = Self::extract_string(args.pop_front()).ok() {
230			match &subcmd.to_uppercase()[..] {
231				"MATCH" => pattern = Some(Self::extract_string(args.pop_front())?),
232				"COUNT" => max_check = Self::extract_index(args.pop_front())?,
233				arg => return Err(format!("Unexpected argument '{}'", arg)),
234			}
235		}
236
237		let pattern = match pattern {
238			None => None,
239			Some(pattern) => Some(regex::bytes::Regex::new(&pattern[..]).map_err(|e|format!("{}", e))?),
240		};
241
242		let mut fields = vec![];
243
244		self.hash_lock(key, |hash| -> ExecResult {
245			let end = start + max_check;
246			let mut next = start;
247			for i in start..end {
248				next = i;
249				if let Some((k, _)) = hash.get_index(i) {
250					if let Some(pattern) = &pattern {
251						match k {
252							Value::Buffer(k) => {
253								if ! pattern.is_match(&k[..]) {
254									continue;
255								}
256							},
257							o@_ => {
258								let bytes = format!("{}", o).bytes().collect::<Vec<u8>>();
259								if ! pattern.is_match(&bytes[..]) {
260									continue;
261								}
262							}
263						}
264					}
265					fields.push(k.clone());
266				} else {
267					next = 0;
268					break;
269				}
270			}
271
272			let next = Value::Integer(next as i64);
273			let fields = Value::Array(
274				fields
275				.drain(..)
276				.collect()
277			);
278			Ok(Value::Array(vec![next, fields].into()))
279		}).await
280	}
281}
282