drasi_index_garnet/
lib.rs1use drasi_core::interface::IndexError;
16use redis::{aio::MultiplexedConnection, cmd, AsyncCommands};
17
18pub mod element_index;
19pub mod future_queue;
20pub mod result_index;
21mod storage_models;
22
23trait ClearByPattern {
24 async fn clear(&self, pattern: String) -> Result<(), IndexError>;
25}
26
27impl ClearByPattern for MultiplexedConnection {
28 async fn clear(&self, pattern: String) -> Result<(), IndexError> {
29 let mut con = self.clone();
30 let mut con2 = self.clone();
31
32 let mut cursor = "0".to_string();
33 loop {
34 let mut cmd = cmd("SCAN");
35 let cmd = cmd.arg(remove_surrounding_quotes(&cursor));
36 let cmd = cmd.arg("MATCH");
37 let cmd = cmd.arg(&pattern);
38 let cmd = cmd.arg("COUNT");
39 let cmd = cmd.arg(100);
40
41 let result = match cmd
42 .query_async::<MultiplexedConnection, Vec<redis::Value>>(&mut con)
43 .await
44 {
45 Ok(v) => v,
46 Err(e) => return Err(IndexError::other(e)),
47 };
48
49 if result.len() < 2 {
50 break;
51 }
52
53 match &result[0] {
54 redis::Value::Status(s) => {
55 cursor = s.clone();
56 }
57 redis::Value::Data(d) => {
58 if let Ok(s) = String::from_utf8(d.to_vec()) {
59 cursor = s;
60 }
61 }
62 _ => (),
63 }
64
65 if let redis::Value::Bulk(b) = &result[1] {
66 for k in b {
67 if let redis::Value::Data(d) = k {
68 if let Ok(k) = String::from_utf8(d.to_vec()) {
69 match con2.del::<&str, ()>(remove_surrounding_quotes(&k)).await {
70 Ok(_) => (),
71 Err(e) => return Err(IndexError::other(e)),
72 }
73 }
74 }
75 }
76 }
77
78 if cursor == "0" {
79 break;
80 }
81 }
82 Ok(())
83 }
84}
85
86fn remove_surrounding_quotes(s: &str) -> &str {
87 if s.len() >= 2
88 && (s.starts_with('"') && s.ends_with('"') || s.starts_with('\'') && s.ends_with('\''))
89 {
90 &s[1..s.len() - 1]
91 } else {
92 s
93 }
94}