inventyv_tikv_sdk/
lib.rs

1use once_cell::sync::OnceCell;
2use serde::{Deserialize, Serialize};
3use std::error::Error;
4use tikv_client::{raw::Client, Config};
5static TIKV: OnceCell<Client> = OnceCell::new();
6// use wasm_bindgen::prelude::*;
7
8#[derive(Clone, Serialize, Deserialize, Debug)]
9pub struct TikvConnParams {
10    pub tlsclusterenabled: bool,
11    pub sslcacerti: String,
12    pub sslclientcerti: String,
13    pub sslclientkeycerti: String,
14    pub host: String,
15}
16
17#[derive(Debug, Serialize, Deserialize)]
18pub struct BatchResponse {
19    pub keys: Vec<String>,
20    pub values: Option<Vec<String>>,
21}
22// #[wasm_bindgen]
23pub async fn init_client(tikv_conn_param: Option<TikvConnParams>) -> Result<String, String> {
24    match create_client(tikv_conn_param).await {
25        Ok(_client) => {
26            return Ok(String::from("Client Created"));
27        }
28        Err(err) => {
29            return Err(String::from(err));
30        }
31    };
32}
33
34// #[wasm_bindgen]
35pub async fn create_client(tikv_conn_param: Option<TikvConnParams>) -> Result<Client, String> {
36    let cleint = TIKV.get();
37    match cleint {
38        Some(_client) => {
39        println!("client is active.");
40            Ok(TIKV.get().unwrap().to_owned())
41        },
42        None => {
43            if tikv_conn_param.is_none(){
44              return Err(String::from("client is not active."));
45            }
46            let config = Config::default();
47            let tls_cluster_enabled = tikv_conn_param.clone().unwrap().tlsclusterenabled;
48            println!("In tls cluster enable: {}:-", tls_cluster_enabled);
49            if tls_cluster_enabled {
50                let with_security_config = config.to_owned().with_security(
51                    tikv_conn_param.clone().unwrap().sslcacerti,
52                    tikv_conn_param.clone().unwrap().sslclientcerti,
53                    tikv_conn_param.clone().unwrap().sslclientkeycerti,
54                );
55                let client = Client::new_with_config(
56                    vec![tikv_conn_param.clone().unwrap().host],
57                    with_security_config,
58                )
59                .await;
60                match client {
61                    Ok(client) => {
62                        let new_client: Client = client.with_atomic_for_cas();
63                        TIKV.get_or_init(|| new_client.to_owned());
64                        Ok(new_client)
65                    }
66                    Err(error) => Err(error.to_string()),
67                }
68            } else {
69                println!("In tls cluster enable: {}:-", tls_cluster_enabled);
70                // let security = config.with_security("ca_path", "cert_path", "key_path");
71                let client =
72                    Client::new_with_config(vec![tikv_conn_param.unwrap().host], config).await;
73                match client {
74                    Ok(client) => {
75                        let new_client: Client = client.with_atomic_for_cas();
76                        TIKV.get_or_init(|| new_client.to_owned());
77                        Ok(new_client)
78                    }
79                    Err(error) => Err(error.to_string()),
80                }
81            }
82        }
83    }
84}
85
86// // #[wasm_bindgen]
87pub async fn get_single_record(
88    key: String,
89    project_name: Option<String>,
90) -> Result<String, String> {
91    let client = create_client(None).await;
92    match client {
93        Ok(client) => {
94            let mut new_key = key;
95            if !project_name.is_none() && project_name.as_ref().unwrap().len() > 0 {
96                let res = get_project_level_key_with_global_prefix(
97                    project_name.as_ref().unwrap(),
98                    &new_key,
99                );
100                match res {
101                    Ok(res) => {
102                        new_key = res;
103                    }
104                    Err(error) => {
105                        return Err(error.to_string());
106                    }
107                }
108            }
109            let value = client.get(new_key).await; // Returns a `tikv_client::Error` on failure.
110            match value {
111                Ok(value) => match value {
112                    Some(value) => {
113                        return Ok(String::from_utf8(value).unwrap());
114                    }
115                    None => {
116                        return Err("Key Does Not Exits".to_string());
117                    }
118                },
119                Err(error) => {
120                    return Err(error.to_string());
121                }
122            }
123        }
124        Err(error) => return Err(error.to_string()),
125    }
126}
127
128// #[wasm_bindgen]
129pub async fn add_single_record(
130    key: String,
131    value: String,
132    old_value: Option<String>,
133    project_name: Option<String>,
134) -> Result<String, String> {
135    let client = create_client(None).await;
136    match client {
137        Ok(client) => {
138            let mut new_key = key.to_owned();
139            if !project_name.is_none() && project_name.as_ref().unwrap().len() > 0 {
140                let res = get_project_level_key_with_global_prefix(
141                    project_name.as_ref().unwrap(),
142                    &new_key,
143                );
144                match res {
145                    Ok(res) => {
146                        new_key = res;
147                    }
148                    Err(error) => {
149                        return Err(error.to_string());
150                    }
151                }
152            }
153            if !old_value.is_none() && old_value.to_owned().unwrap().len() > 0 {
154                let new_eqa = client
155                    .compare_and_swap(
156                        new_key.to_owned(),
157                        Some(old_value.clone().unwrap().as_bytes().to_vec()),
158                        value.as_bytes().to_vec(),
159                    )
160                    .await;
161                match new_eqa {
162                    Ok((_new_val, _flag)) => {
163                        if !_flag {
164                            return Err("Could not swap.".to_string());
165                        }
166                        return Ok(String::from(
167                            "Record Updated With CAS For Key".to_owned() + &key,
168                        ));
169                    }
170                    Err(error) => {
171                        return Err(error.to_string());
172                    }
173                }
174            } else {
175                let client_res = client.put(new_key.to_owned(), value).await; // Returns a `tikv_client::Error` on failure.
176                match client_res {
177                    Ok(_res) => {
178                        return Ok(String::from("New Record Added With Key".to_owned() + &key));
179                    }
180                    Err(error) => {
181                        return Err(error.to_string());
182                    }
183                }
184            }
185        }
186        Err(error) => {
187            return Err(error.to_string());
188        }
189    }
190}
191
192// #[wasm_bindgen]
193pub async fn get_batch_using_scan(
194    start: String,
195    end: String,
196    batch_size: i32,
197    only_keys: bool,
198    project_name: Option<String>,
199) -> Result<String, String> {
200    let client = create_client(None).await;
201    match client {
202        Ok(client) => {
203            let mut start_key = start;
204            let mut end_key = end.to_owned();
205            let mut prefix_value = "".to_string();
206            if !project_name.is_none() && project_name.as_ref().unwrap().len() > 0 {
207                let start_key_with_project = get_project_level_key_with_global_prefix(
208                    project_name.as_ref().unwrap(),
209                    &start_key,
210                );
211                match start_key_with_project {
212                    Ok(start_key_with_project) => {
213                        let end_key_with_project = get_project_level_key_with_global_prefix(
214                            project_name.as_ref().unwrap(),
215                            &end_key,
216                        );
217                        match end_key_with_project {
218                            Ok(mut end_key_with_project) => {
219                                start_key = start_key_with_project;
220                                if end.len() == 0 {
221                                    end_key_with_project.push_str("~");
222                                }
223                                end_key_with_project.push_str("\\0");
224                                end_key = end_key_with_project;
225                                prefix_value = format!("k{}_", project_name.unwrap().to_lowercase())
226                            }
227                            Err(error) => {
228                                return Err(error.to_string());
229                            }
230                        }
231                    }
232                    Err(error) => {
233                        return Err(error.to_string());
234                    }
235                }
236            }
237            if only_keys {
238                let scan_result = client
239                    .scan_keys((start_key..=end_key).into_inner(), batch_size as u32)
240                    .await;
241                match scan_result {
242                    Ok(scan_result) => {
243                        let string_keys: Vec<String> = scan_result
244                            .iter()
245                            .map(|key| {
246                                String::from_utf8_lossy(key.as_ref().into())
247                                    .to_string()
248                                    .replace(&prefix_value, "")
249                            })
250                            .collect();
251                        let res = serde_json::to_string(&BatchResponse {
252                            keys: string_keys,
253                            values: None,
254                        });
255                        match res {
256                            Ok(res) => {
257                                return Ok(res);
258                            }
259                            Err(error) => return Err(error.to_string()),
260                        }
261                    }
262                    Err(error) => return Err(error.to_string()),
263                }
264            } else {
265                let scan = client
266                    .scan((start_key..=end_key).into_inner(), batch_size as u32)
267                    .await;
268                match scan {
269                    Ok(scan) => {
270                        let string_keys: Vec<String> = scan
271                            .iter()
272                            .map(|key| {
273                                String::from_utf8_lossy(key.0.as_ref().into())
274                                    .to_string()
275                                    .replace(&prefix_value, "")
276                            })
277                            .collect();
278                        let string_values: Vec<String> = scan
279                            .iter()
280                            .map(|key| String::from_utf8_lossy(&key.1.to_vec()).to_string())
281                            .collect();
282                        let res = serde_json::to_string(&BatchResponse {
283                            keys: string_keys,
284                            values: Some(string_values),
285                        });
286                        match res {
287                            Ok(res) => {
288                                return Ok(res);
289                            }
290                            Err(error) => return Err(error.to_string()),
291                        }
292                    }
293                    Err(error) => return Err(error.to_string()),
294                }
295            }
296        }
297        Err(error) => {
298            return Err(error.to_string());
299        }
300    }
301}
302
303// #[wasm_bindgen]
304pub async fn delete_single_record(
305    key: String,
306    project_name: Option<String>,
307) -> Result<String, String> {
308    let client = create_client(None).await;
309    match client {
310        Ok(client) => {
311            let mut new_key = key.to_owned();
312            if !project_name.is_none() && project_name.as_ref().unwrap().len() > 0 {
313                let res = get_project_level_key_with_global_prefix(
314                    project_name.as_ref().unwrap(),
315                    &new_key,
316                );
317                match res {
318                    Ok(res) => {
319                        new_key = res;
320                    }
321                    Err(error) => {
322                        return Err(error.to_string());
323                    }
324                }
325            }
326            let delete_res = client.delete(new_key.to_owned()).await;
327            match delete_res {
328                Ok(_delete_res) => {
329                    return Ok(String::from("Record Deleted With Key".to_owned() + &key));
330                }
331                Err(error) => {
332                    return Err(error.to_string());
333                }
334            }
335        }
336        Err(error) => {
337            return Err(error.to_string());
338        }
339    }
340}
341
/// Builds the project-scoped storage key `k<project>_<key>`.
///
/// Both inputs are trimmed; the project name is additionally lower-cased.
///
/// # Errors
///
/// * `"tikv:project cannot be empty"` when `project` is empty or blank.
/// * `"tikv:invalid character in key: ~"` when the trimmed key contains `~`.
///
/// The project check runs first, so it wins when both conditions hold.
pub fn get_project_level_key_with_global_prefix(
    project: &str,
    key: &str,
) -> Result<String, Box<dyn Error>> {
    match (project.trim(), key.trim()) {
        ("", _) => Err("tikv:project cannot be empty".into()),
        (_, bare_key) if bare_key.contains('~') => {
            Err("tikv:invalid character in key: ~".into())
        }
        (scope, bare_key) => Ok(format!("k{}_{}", scope.to_lowercase(), bare_key)),
    }
}
355
356// pub fn caste
/// Simple error carrier holding a single human-readable message.
///
/// Nothing in the visible part of this file constructs or returns it.
#[derive(Debug)]
pub struct ReturnError {
    /// The error message text.
    pub error: String,
}