1use std::collections::VecDeque;
18
19use indexmap::IndexSet;
20
21use super::container::Container;
22use super::container::ContainerPtr;
23use super::container::ContainerImpl;
24
25type Key = super::Key;
26type Value = super::Value;
27type Arguments = super::Arguments;
28type ExecResult = super::ExecResult;
29
30type Inner = IndexSet<Value>;
31
32impl super::Storage {
33 async fn set_get_container(&self, key: Key) -> ContainerPtr {
34 self.get_container(key, ||Container::Set(ContainerImpl::<Inner>::new())).await
35 }
36 async fn set_get_containers(&self, keys: Vec<Key>) -> Vec<ContainerPtr> {
37 self.get_containers(keys, ||Container::Set(ContainerImpl::<Inner>::new())).await
38 }
39 async fn set_unwrap_container(container: &Container) -> Result<&ContainerImpl<Inner>, String> {
40 match container {
41 Container::Set(ref c) => Ok(c),
42 _ => Err(format!("Unexpected container type")),
43 }
44 }
45 async fn set_unwrap_mut_container(container: &mut Container) -> Result<&mut ContainerImpl<Inner>, String> {
46 match container {
47 Container::Set(ref mut c) => Ok(c),
48 _ => Err(format!("Unexpected container type")),
49 }
50 }
51 async fn set_lock<F: FnOnce(&Inner) -> ExecResult>(&self, key: Key, processor: F) -> ExecResult {
52 let c1 = self.set_get_container(key).await;
53 let c2 = c1.lock().await;
54 let c3 = Self::set_unwrap_container(&c2).await?;
55 processor(&c3.inner)
56 }
57 async fn set_lock_mut<F: FnOnce(&mut Inner) -> ExecResult>(&self, key: Key, processor: F) -> ExecResult {
58 let c1 = self.set_get_container(key).await;
59 let mut c2 = c1.lock().await;
60 let c3 = Self::set_unwrap_mut_container(&mut c2).await?;
61 processor(&mut c3.inner)
62 }
63
64 async fn set_lock_containers<F>(&self, keys: Vec<Key>, callback: F) -> ExecResult
65 where F: FnOnce(VecDeque<&mut ContainerImpl<Inner>>) -> ExecResult {
66 let containers = self.set_get_containers(keys).await;
67 let (mut guards, _) = Self::lock_all(containers.iter().map(|c|c.as_ref()), std::iter::empty()).await;
68
69 let mut inners = VecDeque::with_capacity(guards.len());
70 for g in &mut guards {
71 inners.push_back(Self::set_unwrap_mut_container(&mut *g).await?);
72 }
73
74 callback(inners)
75 }
76
77 pub async fn set_card(&self, mut args: Arguments) -> ExecResult {
78 let key = Self::extract_key(args.pop_front())?;
79 self.set_lock(key, |set| -> ExecResult {
80 Ok(Value::Integer(set.len() as i64))
81 }).await
82 }
83
84 pub async fn set_members(&self, mut args: Arguments) -> ExecResult {
85 let key = Self::extract_key(args.pop_front())?;
86 self.set_lock(key, |set| -> ExecResult {
87 Ok(Value::Array(set.iter().map(|v|v.clone()).collect()))
88 }).await
89 }
90
91 pub async fn set_is_member(&self, mut args: Arguments) -> ExecResult {
92 let key = Self::extract_key(args.pop_front())?;
93 let member = Self::extract(args.pop_front())?;
94 self.set_lock(key, |set| -> ExecResult {
95 Ok(Value::Integer(if set.contains(&member) {1} else {0}))
96 }).await
97 }
98
99 pub async fn set_add(&self, mut args: Arguments) -> ExecResult {
100 let key = Self::extract_key(args.pop_front())?;
101 self.set_lock_mut(key, |set| -> ExecResult {
102 let mut count: u32 = 0;
103 for arg in args {
104 if set.insert(arg) {
105 count = count + 1;
106 }
107 }
108 Ok(Value::Integer(count as i64))
109 }).await
110 }
111
112 pub async fn set_rem(&self, mut args: Arguments) -> ExecResult {
113 let key = Self::extract_key(args.pop_front())?;
114 self.set_lock_mut(key, |set| {
115 let mut count: u32 = 0;
116 for arg in args {
117 if set.remove(&arg) {
118 count = count + 1;
119 }
120 }
121 Ok(Value::Integer(count as i64))
122 }).await
123 }
124
125 pub async fn set_pop(&self, mut args: Arguments) -> ExecResult {
126 let key = Self::extract_key(args.pop_front())?;
127 let count = if let Ok(count) = Self::extract_index(args.pop_front()) {count} else {1};
128 self.set_lock_mut(key, |set| {
129 let mut remove_items = VecDeque::with_capacity(count);
130 for _ in 0..count {
131 let index = rand::random::<usize>() % set.len();
132 if let Some(item) = set.swap_remove_index(index) {
133 remove_items.push_back(item);
134 }
135 }
136 Ok(Value::Array(remove_items))
137 }).await
138 }
139
140 pub async fn set_move(&self, mut args: Arguments) -> ExecResult {
141 let source = Self::extract_key(args.pop_front())?;
142 let destination = Self::extract_key(args.pop_front())?;
143 let member = Self::extract(args.pop_front())?;
144 self.set_lock_containers(vec![source, destination], |mut sets| -> ExecResult {
145 let source = sets.pop_front().unwrap();
146 if ! source.inner.remove(&member) {
147 Ok(Value::Integer(0))
148 } else {
149 let destination = sets.pop_front().unwrap();
150 destination.inner.insert(member);
151 Ok(Value::Integer(1))
152 }
153 }).await
154 }
155
156 fn set_diff_make_iter<'a>(sets: &'a VecDeque<&mut ContainerImpl<Inner>>) -> impl Iterator<Item=Value> + 'a {
157 let main_set = sets.get(0).unwrap();
158 main_set
159 .inner
160 .iter()
161 .filter(move |&v| {
162 ! sets
163 .iter()
164 .skip(1)
165 .any(|set| set.inner.contains(v))
166 })
167 .map(|v|v.clone())
168 }
169
170 pub async fn set_diff(&self, mut args: Arguments) -> ExecResult {
171 let mut keys = vec![Self::extract_key(args.pop_front())?];
172 while let Ok(key) = Self::extract_key(args.pop_front()) {
173 keys.push(key);
174 }
175 self.set_lock_containers(keys, |sets| -> ExecResult {
176 Ok(Value::Array(Self::set_diff_make_iter(&sets).collect()))
177 }).await
178 }
179
180 pub async fn set_diff_store(&self, mut args: Arguments) -> ExecResult {
181 let mut keys = vec![Self::extract_key(args.pop_front())?];
182 while let Ok(key) = Self::extract_key(args.pop_front()) {
183 keys.push(key);
184 }
185 self.set_lock_containers(keys, |mut sets| -> ExecResult {
186 let dest_set = sets.pop_front().unwrap();
187
188 let mut tmp = Inner::new();
189 Self::set_diff_make_iter(&sets).for_each(|v|{tmp.insert(v.clone());});
190
191 dest_set.inner.clear();
192 dest_set.expiration_time = None;
193 std::mem::swap(&mut dest_set.inner, &mut tmp);
194
195 Ok(Value::Integer(dest_set.inner.len() as i64))
196 }).await
197 }
198
199 fn set_inter_make_iter<'a>(sets: &'a VecDeque<&mut ContainerImpl<Inner>>) -> impl Iterator<Item=Value> + 'a {
200 let main_set = sets.get(0).unwrap();
201 main_set
202 .inner
203 .iter()
204 .filter(move |&v| {
205 ! sets
206 .iter()
207 .skip(1)
208 .any(|set| ! set.inner.contains(v))
209 })
210 .map(|v|v.clone())
211 }
212
213 pub async fn set_inter(&self, mut args: Arguments) -> ExecResult {
214 let mut keys = vec![Self::extract_key(args.pop_front())?];
215 while let Ok(key) = Self::extract_key(args.pop_front()) {
216 keys.push(key);
217 }
218 self.set_lock_containers(keys, |sets| -> ExecResult {
219 Ok(Value::Array(Self::set_inter_make_iter(&sets).collect()))
220 }).await
221 }
222
223 pub async fn set_inter_store(&self, mut args: Arguments) -> ExecResult {
224 let mut keys = vec![Self::extract_key(args.pop_front())?];
225 while let Ok(key) = Self::extract_key(args.pop_front()) {
226 keys.push(key);
227 }
228 self.set_lock_containers(keys, |mut sets| -> ExecResult {
229 let dest_set = sets.pop_front().unwrap();
230
231 let mut tmp = Inner::new();
232 Self::set_inter_make_iter(&sets).for_each(|v|{tmp.insert(v.clone());});
233
234 dest_set.inner.clear();
235 dest_set.expiration_time = None;
236 std::mem::swap(&mut dest_set.inner, &mut tmp);
237
238 Ok(Value::Integer(dest_set.inner.len() as i64))
239 }).await
240 }
241
242 fn set_union_make_iter<'a>(sets: &'a VecDeque<&mut ContainerImpl<Inner>>) -> impl Iterator<Item=Value> + 'a {
243 sets
244 .iter()
245 .flat_map(|s|s.inner.iter())
246 .map(|v|v.clone())
247 }
248
249 pub async fn set_union(&self, mut args: Arguments) -> ExecResult {
250 let mut keys = vec![Self::extract_key(args.pop_front())?];
251 while let Ok(key) = Self::extract_key(args.pop_front()) {
252 keys.push(key);
253 }
254 self.set_lock_containers(keys, |sets| -> ExecResult {
255 let mut tmp = Inner::new();
256 Self::set_union_make_iter(&sets).for_each(|v|{tmp.insert(v.clone());});
257 Ok(Value::Array(tmp.drain(..).collect()))
258 }).await
259 }
260
261 pub async fn set_union_store(&self, mut args: Arguments) -> ExecResult {
262 let mut keys = vec![Self::extract_key(args.pop_front())?];
263 while let Ok(key) = Self::extract_key(args.pop_front()) {
264 keys.push(key);
265 }
266 self.set_lock_containers(keys, |mut sets| -> ExecResult {
267 let dest_set = sets.pop_front().unwrap();
268
269 let mut tmp = Inner::new();
270 Self::set_union_make_iter(&sets).for_each(|v|{tmp.insert(v.clone());});
271
272 dest_set.inner.clear();
273 dest_set.expiration_time = None;
274 std::mem::swap(&mut dest_set.inner, &mut tmp);
275
276 Ok(Value::Integer(dest_set.inner.len() as i64))
277 }).await
278 }
279
280 pub async fn _set_rand_member(&self, mut args: Arguments) -> ExecResult {
281 let key = Self::extract_key(args.pop_front())?;
282 let count = if let Ok(count) = Self::extract_integer(args.pop_front()) {count} else {1};
283 let (repeates, count) = if count >= 0 {(false, count as usize)} else {(true, -count as usize)};
284
285 self.set_lock_mut(key, |set| {
286 let mut items = VecDeque::with_capacity(count);
287
288 if repeates {
289 for _ in 0..count {
290 let index = rand::random::<usize>() % set.len();
291 if let Some(item) = set.get_index(index) {
292 items.push_back(item.clone());
293 }
294 }
295 } else {
296 return Err("Unimplemented".to_owned());
297 }
298 Ok(Value::Array(items))
299 }).await
300 }
301
302 pub async fn set_scan(&self, mut args: Arguments) -> ExecResult {
303 let key = Self::extract_key(args.pop_front())?;
304 let start = Self::extract_index(args.pop_front())?;
305
306 let mut pattern: Option<String> = None;
307 let mut max_check = 100usize;
308
309 while let Some(subcmd) = Self::extract_string(args.pop_front()).ok() {
310 match &subcmd.to_uppercase()[..] {
311 "MATCH" => pattern = Some(Self::extract_string(args.pop_front())?),
312 "COUNT" => max_check = Self::extract_index(args.pop_front())?,
313 arg => return Err(format!("Unexpected argument '{}'", arg)),
314 }
315 }
316
317 let pattern = match pattern {
318 None => None,
319 Some(pattern) => Some(regex::bytes::Regex::new(&pattern[..]).map_err(|e|format!("{}", e))?),
320 };
321
322 let mut values = vec![];
323
324 self.set_lock(key, |set| -> ExecResult {
325 let end = start + max_check;
326 let mut next = start;
327 for i in start..end {
328 next = i;
329 if let Some(value) = set.get_index(i) {
330 if let Some(pattern) = &pattern {
331 match value {
332 Value::Buffer(value) => {
333 if ! pattern.is_match(&value[..]) {
334 continue;
335 }
336 },
337 o@_ => {
338 let bytes = format!("{}", o).bytes().collect::<Vec<u8>>();
339 if ! pattern.is_match(&bytes[..]) {
340 continue;
341 }
342 }
343 }
344 }
345 values.push(value.clone());
346 } else {
347 next = 0;
348 break;
349 }
350 }
351
352 let next = Value::Integer(next as i64);
353 let values = Value::Array(
354 values
355 .drain(..)
356 .collect()
357 );
358 Ok(Value::Array(vec![next, values].into()))
359 }).await
360 }
361}
362