1use once_cell::sync::OnceCell;
2use serde::{Deserialize, Serialize};
3use std::error::Error;
4use tikv_client::{raw::Client, Config};
5static TIKV: OnceCell<Client> = OnceCell::new();
6#[derive(Clone, Serialize, Deserialize, Debug)]
9pub struct TikvConnParams {
10 pub tlsclusterenabled: bool,
11 pub sslcacerti: String,
12 pub sslclientcerti: String,
13 pub sslclientkeycerti: String,
14 pub host: String,
15}
16
17#[derive(Debug, Serialize, Deserialize)]
18pub struct BatchResponse {
19 pub keys: Vec<String>,
20 pub values: Option<Vec<String>>,
21}
22pub async fn init_client(tikv_conn_param: Option<TikvConnParams>) -> Result<String, String> {
24 match create_client(tikv_conn_param).await {
25 Ok(_client) => {
26 return Ok(String::from("Client Created"));
27 }
28 Err(err) => {
29 return Err(String::from(err));
30 }
31 };
32}
33
34pub async fn create_client(tikv_conn_param: Option<TikvConnParams>) -> Result<Client, String> {
36 let cleint = TIKV.get();
37 match cleint {
38 Some(_client) => {
39 println!("client is active.");
40 Ok(TIKV.get().unwrap().to_owned())
41 },
42 None => {
43 if tikv_conn_param.is_none(){
44 return Err(String::from("client is not active."));
45 }
46 let config = Config::default();
47 let tls_cluster_enabled = tikv_conn_param.clone().unwrap().tlsclusterenabled;
48 println!("In tls cluster enable: {}:-", tls_cluster_enabled);
49 if tls_cluster_enabled {
50 let with_security_config = config.to_owned().with_security(
51 tikv_conn_param.clone().unwrap().sslcacerti,
52 tikv_conn_param.clone().unwrap().sslclientcerti,
53 tikv_conn_param.clone().unwrap().sslclientkeycerti,
54 );
55 let client = Client::new_with_config(
56 vec![tikv_conn_param.clone().unwrap().host],
57 with_security_config,
58 )
59 .await;
60 match client {
61 Ok(client) => {
62 let new_client: Client = client.with_atomic_for_cas();
63 TIKV.get_or_init(|| new_client.to_owned());
64 Ok(new_client)
65 }
66 Err(error) => Err(error.to_string()),
67 }
68 } else {
69 println!("In tls cluster enable: {}:-", tls_cluster_enabled);
70 let client =
72 Client::new_with_config(vec![tikv_conn_param.unwrap().host], config).await;
73 match client {
74 Ok(client) => {
75 let new_client: Client = client.with_atomic_for_cas();
76 TIKV.get_or_init(|| new_client.to_owned());
77 Ok(new_client)
78 }
79 Err(error) => Err(error.to_string()),
80 }
81 }
82 }
83 }
84}
85
86pub async fn get_single_record(
88 key: String,
89 project_name: Option<String>,
90) -> Result<String, String> {
91 let client = create_client(None).await;
92 match client {
93 Ok(client) => {
94 let mut new_key = key;
95 if !project_name.is_none() && project_name.as_ref().unwrap().len() > 0 {
96 let res = get_project_level_key_with_global_prefix(
97 project_name.as_ref().unwrap(),
98 &new_key,
99 );
100 match res {
101 Ok(res) => {
102 new_key = res;
103 }
104 Err(error) => {
105 return Err(error.to_string());
106 }
107 }
108 }
109 let value = client.get(new_key).await; match value {
111 Ok(value) => match value {
112 Some(value) => {
113 return Ok(String::from_utf8(value).unwrap());
114 }
115 None => {
116 return Err("Key Does Not Exits".to_string());
117 }
118 },
119 Err(error) => {
120 return Err(error.to_string());
121 }
122 }
123 }
124 Err(error) => return Err(error.to_string()),
125 }
126}
127
128pub async fn add_single_record(
130 key: String,
131 value: String,
132 old_value: Option<String>,
133 project_name: Option<String>,
134) -> Result<String, String> {
135 let client = create_client(None).await;
136 match client {
137 Ok(client) => {
138 let mut new_key = key.to_owned();
139 if !project_name.is_none() && project_name.as_ref().unwrap().len() > 0 {
140 let res = get_project_level_key_with_global_prefix(
141 project_name.as_ref().unwrap(),
142 &new_key,
143 );
144 match res {
145 Ok(res) => {
146 new_key = res;
147 }
148 Err(error) => {
149 return Err(error.to_string());
150 }
151 }
152 }
153 if !old_value.is_none() && old_value.to_owned().unwrap().len() > 0 {
154 let new_eqa = client
155 .compare_and_swap(
156 new_key.to_owned(),
157 Some(old_value.clone().unwrap().as_bytes().to_vec()),
158 value.as_bytes().to_vec(),
159 )
160 .await;
161 match new_eqa {
162 Ok((_new_val, _flag)) => {
163 if !_flag {
164 return Err("Could not swap.".to_string());
165 }
166 return Ok(String::from(
167 "Record Updated With CAS For Key".to_owned() + &key,
168 ));
169 }
170 Err(error) => {
171 return Err(error.to_string());
172 }
173 }
174 } else {
175 let client_res = client.put(new_key.to_owned(), value).await; match client_res {
177 Ok(_res) => {
178 return Ok(String::from("New Record Added With Key".to_owned() + &key));
179 }
180 Err(error) => {
181 return Err(error.to_string());
182 }
183 }
184 }
185 }
186 Err(error) => {
187 return Err(error.to_string());
188 }
189 }
190}
191
192pub async fn get_batch_using_scan(
194 start: String,
195 end: String,
196 batch_size: i32,
197 only_keys: bool,
198 project_name: Option<String>,
199) -> Result<String, String> {
200 let client = create_client(None).await;
201 match client {
202 Ok(client) => {
203 let mut start_key = start;
204 let mut end_key = end.to_owned();
205 let mut prefix_value = "".to_string();
206 if !project_name.is_none() && project_name.as_ref().unwrap().len() > 0 {
207 let start_key_with_project = get_project_level_key_with_global_prefix(
208 project_name.as_ref().unwrap(),
209 &start_key,
210 );
211 match start_key_with_project {
212 Ok(start_key_with_project) => {
213 let end_key_with_project = get_project_level_key_with_global_prefix(
214 project_name.as_ref().unwrap(),
215 &end_key,
216 );
217 match end_key_with_project {
218 Ok(mut end_key_with_project) => {
219 start_key = start_key_with_project;
220 if end.len() == 0 {
221 end_key_with_project.push_str("~");
222 }
223 end_key_with_project.push_str("\\0");
224 end_key = end_key_with_project;
225 prefix_value = format!("k{}_", project_name.unwrap().to_lowercase())
226 }
227 Err(error) => {
228 return Err(error.to_string());
229 }
230 }
231 }
232 Err(error) => {
233 return Err(error.to_string());
234 }
235 }
236 }
237 if only_keys {
238 let scan_result = client
239 .scan_keys((start_key..=end_key).into_inner(), batch_size as u32)
240 .await;
241 match scan_result {
242 Ok(scan_result) => {
243 let string_keys: Vec<String> = scan_result
244 .iter()
245 .map(|key| {
246 String::from_utf8_lossy(key.as_ref().into())
247 .to_string()
248 .replace(&prefix_value, "")
249 })
250 .collect();
251 let res = serde_json::to_string(&BatchResponse {
252 keys: string_keys,
253 values: None,
254 });
255 match res {
256 Ok(res) => {
257 return Ok(res);
258 }
259 Err(error) => return Err(error.to_string()),
260 }
261 }
262 Err(error) => return Err(error.to_string()),
263 }
264 } else {
265 let scan = client
266 .scan((start_key..=end_key).into_inner(), batch_size as u32)
267 .await;
268 match scan {
269 Ok(scan) => {
270 let string_keys: Vec<String> = scan
271 .iter()
272 .map(|key| {
273 String::from_utf8_lossy(key.0.as_ref().into())
274 .to_string()
275 .replace(&prefix_value, "")
276 })
277 .collect();
278 let string_values: Vec<String> = scan
279 .iter()
280 .map(|key| String::from_utf8_lossy(&key.1.to_vec()).to_string())
281 .collect();
282 let res = serde_json::to_string(&BatchResponse {
283 keys: string_keys,
284 values: Some(string_values),
285 });
286 match res {
287 Ok(res) => {
288 return Ok(res);
289 }
290 Err(error) => return Err(error.to_string()),
291 }
292 }
293 Err(error) => return Err(error.to_string()),
294 }
295 }
296 }
297 Err(error) => {
298 return Err(error.to_string());
299 }
300 }
301}
302
303pub async fn delete_single_record(
305 key: String,
306 project_name: Option<String>,
307) -> Result<String, String> {
308 let client = create_client(None).await;
309 match client {
310 Ok(client) => {
311 let mut new_key = key.to_owned();
312 if !project_name.is_none() && project_name.as_ref().unwrap().len() > 0 {
313 let res = get_project_level_key_with_global_prefix(
314 project_name.as_ref().unwrap(),
315 &new_key,
316 );
317 match res {
318 Ok(res) => {
319 new_key = res;
320 }
321 Err(error) => {
322 return Err(error.to_string());
323 }
324 }
325 }
326 let delete_res = client.delete(new_key.to_owned()).await;
327 match delete_res {
328 Ok(_delete_res) => {
329 return Ok(String::from("Record Deleted With Key".to_owned() + &key));
330 }
331 Err(error) => {
332 return Err(error.to_string());
333 }
334 }
335 }
336 Err(error) => {
337 return Err(error.to_string());
338 }
339 }
340}
341
/// Builds the project-scoped storage key `k<project>_<key>`.
///
/// Both inputs are trimmed and the project name is lower-cased before the key
/// is assembled.
///
/// # Errors
///
/// * `"tikv:project cannot be empty"` when `project` is empty after trimming.
/// * `"tikv:invalid character in key: ~"` when the trimmed key contains `~`.
pub fn get_project_level_key_with_global_prefix(
    project: &str,
    key: &str,
) -> Result<String, Box<dyn Error>> {
    let project = project.trim();
    let key = key.trim();

    if project.is_empty() {
        Err("tikv:project cannot be empty".into())
    } else if key.contains('~') {
        Err("tikv:invalid character in key: ~".into())
    } else {
        Ok(["k", project.to_lowercase().as_str(), "_", key].concat())
    }
}
355
/// Error wrapper carrying a single message string.
///
/// Not referenced by the functions visible in this part of the file.
#[derive(Debug)]
pub struct ReturnError {
    /// The error message.
    pub error: String,
}