radish_database/
set.rs

1/* Copyright (c) 2020 Dmitry Shatilov <shatilov dot diman at gmail dot com>
2 * 
3 * This program is free software: you can redistribute it and/or modify
4 * it under the terms of the GNU Affero General Public License as published by
5 * the Free Software Foundation, either version 3 of the License, or
6 * (at your option) any later version.
7
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 * GNU Affero General Public License for more details.
12
13 * You should have received a copy of the GNU Affero General Public License
14 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
15 */
16
17use std::collections::VecDeque;
18
19use indexmap::IndexSet;
20
21use super::container::Container;
22use super::container::ContainerPtr;
23use super::container::ContainerImpl;
24
25type Key = super::Key;
26type Value = super::Value;
27type Arguments = super::Arguments;
28type ExecResult = super::ExecResult;
29
30type Inner = IndexSet<Value>;
31
32impl super::Storage {
33	async fn set_get_container(&self, key: Key) -> ContainerPtr {
34		self.get_container(key, ||Container::Set(ContainerImpl::<Inner>::new())).await
35	}
36	async fn set_get_containers(&self, keys: Vec<Key>) -> Vec<ContainerPtr> {
37		self.get_containers(keys, ||Container::Set(ContainerImpl::<Inner>::new())).await
38	}
39	async fn set_unwrap_container(container: &Container) -> Result<&ContainerImpl<Inner>, String> {
40		match container {
41			Container::Set(ref c) => Ok(c),
42			_ => Err(format!("Unexpected container type")),
43		}
44	}
45	async fn set_unwrap_mut_container(container: &mut Container) -> Result<&mut ContainerImpl<Inner>, String> {
46		match container {
47			Container::Set(ref mut c) => Ok(c),
48			_ => Err(format!("Unexpected container type")),
49		}
50	}
51	async fn set_lock<F: FnOnce(&Inner) -> ExecResult>(&self, key: Key, processor: F) -> ExecResult {
52		let c1 = self.set_get_container(key).await;
53		let c2 = c1.lock().await;
54		let c3 = Self::set_unwrap_container(&c2).await?;
55		processor(&c3.inner)
56	}
57	async fn set_lock_mut<F: FnOnce(&mut Inner) -> ExecResult>(&self, key: Key, processor: F) -> ExecResult {
58		let c1 = self.set_get_container(key).await;
59		let mut c2 = c1.lock().await;
60		let c3 = Self::set_unwrap_mut_container(&mut c2).await?;
61		processor(&mut c3.inner)
62	}
63
64	async fn set_lock_containers<F>(&self, keys: Vec<Key>, callback: F) -> ExecResult
65	where F: FnOnce(VecDeque<&mut ContainerImpl<Inner>>) -> ExecResult {
66		let containers = self.set_get_containers(keys).await;
67		let (mut guards, _) = Self::lock_all(containers.iter().map(|c|c.as_ref()), std::iter::empty()).await;
68
69		let mut inners = VecDeque::with_capacity(guards.len());
70		for g in &mut guards {
71			inners.push_back(Self::set_unwrap_mut_container(&mut *g).await?);
72		}
73
74		callback(inners)
75	}
76
77	pub async fn set_card(&self, mut args: Arguments) -> ExecResult {
78		let key = Self::extract_key(args.pop_front())?;
79		self.set_lock(key, |set| -> ExecResult {
80			Ok(Value::Integer(set.len() as i64))
81		}).await
82	}
83
84	pub async fn set_members(&self, mut args: Arguments) -> ExecResult {
85		let key = Self::extract_key(args.pop_front())?;
86		self.set_lock(key, |set| -> ExecResult {
87			Ok(Value::Array(set.iter().map(|v|v.clone()).collect()))
88		}).await
89	}
90
91	pub async fn set_is_member(&self, mut args: Arguments) -> ExecResult {
92		let key = Self::extract_key(args.pop_front())?;
93		let member = Self::extract(args.pop_front())?;
94		self.set_lock(key, |set| -> ExecResult {
95			Ok(Value::Integer(if set.contains(&member) {1} else {0}))
96		}).await
97	}
98
99	pub async fn set_add(&self, mut args: Arguments) -> ExecResult {
100		let key = Self::extract_key(args.pop_front())?;
101		self.set_lock_mut(key, |set| -> ExecResult {
102			let mut count: u32 = 0;
103			for arg in args {
104				if set.insert(arg) {
105					count = count + 1;
106				}
107			}
108			Ok(Value::Integer(count as i64))
109		}).await
110	}
111
112	pub async fn set_rem(&self, mut args: Arguments) -> ExecResult {
113		let key = Self::extract_key(args.pop_front())?;
114		self.set_lock_mut(key, |set| {
115			let mut count: u32 = 0;
116			for arg in args {
117				if set.remove(&arg) {
118					count = count + 1;
119				}
120			}
121			Ok(Value::Integer(count as i64))
122		}).await
123	}
124
125	pub async fn set_pop(&self, mut args: Arguments) -> ExecResult {
126		let key = Self::extract_key(args.pop_front())?;
127		let count = if let Ok(count) = Self::extract_index(args.pop_front()) {count} else {1};
128		self.set_lock_mut(key, |set| {
129			let mut remove_items = VecDeque::with_capacity(count);
130			for _ in 0..count {
131				let index = rand::random::<usize>() % set.len();
132				if let Some(item) = set.swap_remove_index(index) {
133					remove_items.push_back(item);
134				}
135			}
136			Ok(Value::Array(remove_items))
137		}).await
138	}
139
140	pub async fn set_move(&self, mut args: Arguments) -> ExecResult {
141		let source = Self::extract_key(args.pop_front())?;
142		let destination = Self::extract_key(args.pop_front())?;
143		let member = Self::extract(args.pop_front())?;
144		self.set_lock_containers(vec![source, destination], |mut sets| -> ExecResult {
145			let source = sets.pop_front().unwrap();
146			if ! source.inner.remove(&member) {
147				Ok(Value::Integer(0))
148			} else {
149				let destination = sets.pop_front().unwrap();
150				destination.inner.insert(member);
151				Ok(Value::Integer(1))
152			}
153		}).await
154	}
155
156	fn set_diff_make_iter<'a>(sets: &'a VecDeque<&mut ContainerImpl<Inner>>) -> impl Iterator<Item=Value> + 'a {
157		let main_set = sets.get(0).unwrap();
158		main_set
159		.inner
160		.iter()
161		.filter(move |&v| {
162			! sets
163			.iter()
164			.skip(1)
165			.any(|set| set.inner.contains(v))
166		})
167		.map(|v|v.clone())
168	}
169
170	pub async fn set_diff(&self, mut args: Arguments) -> ExecResult {
171		let mut keys = vec![Self::extract_key(args.pop_front())?];
172		while let Ok(key) = Self::extract_key(args.pop_front()) {
173			keys.push(key);
174		}
175		self.set_lock_containers(keys, |sets| -> ExecResult {
176			Ok(Value::Array(Self::set_diff_make_iter(&sets).collect()))
177		}).await
178	}
179
180	pub async fn set_diff_store(&self, mut args: Arguments) -> ExecResult {
181		let mut keys = vec![Self::extract_key(args.pop_front())?];
182		while let Ok(key) = Self::extract_key(args.pop_front()) {
183			keys.push(key);
184		}
185		self.set_lock_containers(keys, |mut sets| -> ExecResult {
186			let dest_set = sets.pop_front().unwrap();
187
188			let mut tmp = Inner::new();
189			Self::set_diff_make_iter(&sets).for_each(|v|{tmp.insert(v.clone());});
190
191			dest_set.inner.clear();
192			dest_set.expiration_time = None;
193			std::mem::swap(&mut dest_set.inner, &mut tmp);
194
195			Ok(Value::Integer(dest_set.inner.len() as i64))
196		}).await
197	}
198
199	fn set_inter_make_iter<'a>(sets: &'a VecDeque<&mut ContainerImpl<Inner>>) -> impl Iterator<Item=Value> + 'a {
200		let main_set = sets.get(0).unwrap();
201		main_set
202		.inner
203		.iter()
204		.filter(move |&v| {
205			! sets
206			.iter()
207			.skip(1)
208			.any(|set| ! set.inner.contains(v))
209		})
210		.map(|v|v.clone())
211	}
212
213	pub async fn set_inter(&self, mut args: Arguments) -> ExecResult {
214		let mut keys = vec![Self::extract_key(args.pop_front())?];
215		while let Ok(key) = Self::extract_key(args.pop_front()) {
216			keys.push(key);
217		}
218		self.set_lock_containers(keys, |sets| -> ExecResult {
219			Ok(Value::Array(Self::set_inter_make_iter(&sets).collect()))
220		}).await
221	}
222
223	pub async fn set_inter_store(&self, mut args: Arguments) -> ExecResult {
224		let mut keys = vec![Self::extract_key(args.pop_front())?];
225		while let Ok(key) = Self::extract_key(args.pop_front()) {
226			keys.push(key);
227		}
228		self.set_lock_containers(keys, |mut sets| -> ExecResult {
229			let dest_set = sets.pop_front().unwrap();
230
231			let mut tmp = Inner::new();
232			Self::set_inter_make_iter(&sets).for_each(|v|{tmp.insert(v.clone());});
233
234			dest_set.inner.clear();
235			dest_set.expiration_time = None;
236			std::mem::swap(&mut dest_set.inner, &mut tmp);
237
238			Ok(Value::Integer(dest_set.inner.len() as i64))
239		}).await
240	}
241
242	fn set_union_make_iter<'a>(sets: &'a VecDeque<&mut ContainerImpl<Inner>>) -> impl Iterator<Item=Value> + 'a {
243		sets
244		.iter()
245		.flat_map(|s|s.inner.iter())
246		.map(|v|v.clone())
247	}
248
249	pub async fn set_union(&self, mut args: Arguments) -> ExecResult {
250		let mut keys = vec![Self::extract_key(args.pop_front())?];
251		while let Ok(key) = Self::extract_key(args.pop_front()) {
252			keys.push(key);
253		}
254		self.set_lock_containers(keys, |sets| -> ExecResult {
255			let mut tmp = Inner::new();
256			Self::set_union_make_iter(&sets).for_each(|v|{tmp.insert(v.clone());});
257			Ok(Value::Array(tmp.drain(..).collect()))
258		}).await
259	}
260
261	pub async fn set_union_store(&self, mut args: Arguments) -> ExecResult {
262		let mut keys = vec![Self::extract_key(args.pop_front())?];
263		while let Ok(key) = Self::extract_key(args.pop_front()) {
264			keys.push(key);
265		}
266		self.set_lock_containers(keys, |mut sets| -> ExecResult {
267			let dest_set = sets.pop_front().unwrap();
268
269			let mut tmp = Inner::new();
270			Self::set_union_make_iter(&sets).for_each(|v|{tmp.insert(v.clone());});
271
272			dest_set.inner.clear();
273			dest_set.expiration_time = None;
274			std::mem::swap(&mut dest_set.inner, &mut tmp);
275
276			Ok(Value::Integer(dest_set.inner.len() as i64))
277		}).await
278	}
279
280	pub async fn _set_rand_member(&self, mut args: Arguments) -> ExecResult {
281		let key = Self::extract_key(args.pop_front())?;
282		let count = if let Ok(count) = Self::extract_integer(args.pop_front()) {count} else {1};
283		let (repeates, count) = if count >= 0 {(false, count as usize)} else {(true, -count as usize)};
284
285		self.set_lock_mut(key, |set| {
286			let mut items = VecDeque::with_capacity(count);
287
288			if repeates {
289				for _ in 0..count {
290					let index = rand::random::<usize>() % set.len();
291					if let Some(item) = set.get_index(index) {
292						items.push_back(item.clone());
293					}
294				}
295			} else {
296				return Err("Unimplemented".to_owned());
297			}
298			Ok(Value::Array(items))
299		}).await
300	}
301
302	pub async fn set_scan(&self, mut args: Arguments) -> ExecResult {
303		let key = Self::extract_key(args.pop_front())?;
304		let start = Self::extract_index(args.pop_front())?;
305
306		let mut pattern: Option<String> = None;
307		let mut max_check = 100usize;
308
309		while let Some(subcmd) = Self::extract_string(args.pop_front()).ok() {
310			match &subcmd.to_uppercase()[..] {
311				"MATCH" => pattern = Some(Self::extract_string(args.pop_front())?),
312				"COUNT" => max_check = Self::extract_index(args.pop_front())?,
313				arg => return Err(format!("Unexpected argument '{}'", arg)),
314			}
315		}
316
317		let pattern = match pattern {
318			None => None,
319			Some(pattern) => Some(regex::bytes::Regex::new(&pattern[..]).map_err(|e|format!("{}", e))?),
320		};
321
322		let mut values = vec![];
323
324		self.set_lock(key, |set| -> ExecResult {
325			let end = start + max_check;
326			let mut next = start;
327			for i in start..end {
328				next = i;
329				if let Some(value) = set.get_index(i) {
330					if let Some(pattern) = &pattern {
331						match value {
332							Value::Buffer(value) => {
333								if ! pattern.is_match(&value[..]) {
334									continue;
335								}
336							},
337							o@_ => {
338								let bytes = format!("{}", o).bytes().collect::<Vec<u8>>();
339								if ! pattern.is_match(&bytes[..]) {
340									continue;
341								}
342							}
343						}
344					}
345					values.push(value.clone());
346				} else {
347					next = 0;
348					break;
349				}
350			}
351
352			let next = Value::Integer(next as i64);
353			let values = Value::Array(
354				values
355				.drain(..)
356				.collect()
357			);
358			Ok(Value::Array(vec![next, values].into()))
359		}).await
360	}
361}
362