drasi_index_garnet/
lib.rs

1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use drasi_core::interface::IndexError;
16use redis::{aio::MultiplexedConnection, cmd, AsyncCommands};
17
18pub mod element_index;
19pub mod future_queue;
20pub mod result_index;
21mod storage_models;
22
/// Crate-private extension trait for deleting a whole group of keys selected
/// by a pattern.
trait ClearByPattern {
    /// Deletes the keys selected by `pattern`.
    ///
    /// The implementation in this file forwards `pattern` unchanged as the
    /// `MATCH` argument of the `SCAN` command and issues a `DEL` for each key
    /// the scan returns.
    ///
    /// # Errors
    ///
    /// Returns an [`IndexError`] when the underlying store reports a failure.
    async fn clear(&self, pattern: String) -> Result<(), IndexError>;
}
26
27impl ClearByPattern for MultiplexedConnection {
28    async fn clear(&self, pattern: String) -> Result<(), IndexError> {
29        let mut con = self.clone();
30        let mut con2 = self.clone();
31
32        let mut cursor = "0".to_string();
33        loop {
34            let mut cmd = cmd("SCAN");
35            let cmd = cmd.arg(remove_surrounding_quotes(&cursor));
36            let cmd = cmd.arg("MATCH");
37            let cmd = cmd.arg(&pattern);
38            let cmd = cmd.arg("COUNT");
39            let cmd = cmd.arg(100);
40
41            let result = match cmd
42                .query_async::<MultiplexedConnection, Vec<redis::Value>>(&mut con)
43                .await
44            {
45                Ok(v) => v,
46                Err(e) => return Err(IndexError::other(e)),
47            };
48
49            if result.len() < 2 {
50                break;
51            }
52
53            match &result[0] {
54                redis::Value::Status(s) => {
55                    cursor = s.clone();
56                }
57                redis::Value::Data(d) => {
58                    if let Ok(s) = String::from_utf8(d.to_vec()) {
59                        cursor = s;
60                    }
61                }
62                _ => (),
63            }
64
65            if let redis::Value::Bulk(b) = &result[1] {
66                for k in b {
67                    if let redis::Value::Data(d) = k {
68                        if let Ok(k) = String::from_utf8(d.to_vec()) {
69                            match con2.del::<&str, ()>(remove_surrounding_quotes(&k)).await {
70                                Ok(_) => (),
71                                Err(e) => return Err(IndexError::other(e)),
72                            }
73                        }
74                    }
75                }
76            }
77
78            if cursor == "0" {
79                break;
80            }
81        }
82        Ok(())
83    }
84}
85
/// Strips one pair of matching surrounding quotes from `s`.
///
/// If `s` both starts and ends with `"` (or both starts and ends with `'`),
/// the slice between the two quote characters is returned. Only one layer is
/// removed. Any other input, including a lone quote character or mismatched
/// quotes, is returned unchanged.
fn remove_surrounding_quotes(s: &str) -> &str {
    // Stripping the prefix first means a one-character string such as `"`
    // cannot count the same character as both opening and closing quote.
    let unquote = |quote: char| {
        s.strip_prefix(quote)
            .and_then(|rest| rest.strip_suffix(quote))
    };

    unquote('"').or_else(|| unquote('\'')).unwrap_or(s)
}