1#![allow(clippy::new_without_default)]
4
5mod body;
6pub mod error;
7pub mod results;
8
9use body::{
10 CreateRadixTreeBody, RadixTreeBatchOperationBody, RadixTreeInsertBody, RadixTreeSearchBody,
11};
12use error::Error;
13use reqwest::{Client, StatusCode};
14use results::{
15 BatchOperationResult, ContainsResult, CreateRadixTreeResult, DeleteRadixTreeResult,
16 InsertResult, RadixTreeExistsResult, RemoveResult, SearchResult,
17};
18use rustc_hash::FxHashMap;
19use serde::{Deserialize, Serialize};
20use std::{
21 num::Wrapping,
22 sync::{
23 atomic::{AtomicUsize, Ordering},
24 Arc,
25 },
26};
27use tokio::sync::RwLock;
28
/// Builds a `crate::error::Error` whose message is produced by `format!`.
///
/// Every token is forwarded verbatim to `format!`, so the macro accepts the
/// same syntax `format!` does: positional arguments, named arguments, inline
/// captures such as `{uri}`, and a trailing comma.
macro_rules! format_error {
    ($($arg:tt)+) => {
        $crate::error::Error(format!($($arg)+))
    };
}
34
/// Translates the status code of an HTTP response into a payload-free result.
///
/// `$result` must name an enum with the unit variants `Ok`, `UnAuthorized`,
/// `TemporarilyUnavailable` and `NotFound`. Any status other than 200, 401,
/// 503 or 404 yields an `Err` carrying the debug rendering of the response.
macro_rules! match_response {
    ($response:expr, $result:ident) => {{
        use reqwest::StatusCode as Status;

        match $response.status() {
            Status::NOT_FOUND => Ok($result::NotFound),
            Status::SERVICE_UNAVAILABLE => Ok($result::TemporarilyUnavailable),
            Status::UNAUTHORIZED => Ok($result::UnAuthorized),
            Status::OK => Ok($result::Ok),
            _ => Err(format_error!("Received invalid response: {:?}", $response)),
        }
    }};
}
46
/// Address of a pod, kept as a plain string.
pub type PodURI = String;
/// Identifier of a radix tree, kept as a plain string.
pub type RadixTreeId = String;
49
/// Payload deserialized from a pod's `/__get_client_sync_data` endpoint.
#[derive(Debug, Serialize, Deserialize)]
struct SyncData {
    /// URI carried in the payload. `RtPods::sync` ignores this field;
    /// presumably it is the responding pod's own address (TODO confirm).
    pub uri: PodURI,
    /// Radix tree ids carried in the payload. `RtPods::sync` maps each of
    /// them to the pod that was queried.
    pub rt_ids: Vec<RadixTreeId>,
}
55
/// A single step of the operation list passed to `RtPods::batch_operation`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BatchOperation {
    /// Insert a string together with an optional second string.
    /// NOTE(review): presumably the same (string, data) pair that
    /// `RtPods::insert` sends; confirm against the server.
    Insert((String, Option<String>)),
    /// Remove a string.
    Remove(String),
}
62
/// Direction applied to a [`SearchSort`] criterion.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum SearchSortOrder {
    /// Ascending direction.
    Ascending,
    /// Descending direction.
    Descending,
}
69
/// A sort criterion for search results, paired with its direction.
///
/// NOTE(review): how the server combines several criteria is not visible in
/// this file; confirm before relying on the order of the list.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum SearchSort {
    /// Sort by length, presumably the length of the matched string.
    Length(SearchSortOrder),
    /// Sort alphabetically.
    Alphabetical(SearchSortOrder),
}
97
/// Options serialized alongside the query by `RtPods::search`.
///
/// Only `depth` is mandatory; every other field may be left as `None`.
/// NOTE(review): the field meanings below are inferred from names only, since
/// the server-side interpretation is not visible here; confirm against the
/// server documentation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchOptions {
    /// Optional list of sort criteria.
    pub sort: Option<Vec<SearchSort>>,
    /// Search depth (required).
    pub depth: usize,
    /// Optional limit, presumably on the number of results.
    pub limit: Option<usize>,
    /// Optional `inclusive` flag.
    pub inclusive: Option<bool>,
    /// Optional `predictive` flag.
    pub predictive: Option<bool>,
    /// Optional count; the builder names this argument `mistakes`, so it is
    /// presumably the number of tolerated mistakes (TODO confirm).
    pub corrective: Option<usize>,
    /// Optional `prepend_prefix` flag.
    pub prepend_prefix: Option<bool>,
}
129
130pub struct SearchOptionsBuilder {
132 sort: Option<Vec<SearchSort>>,
133 depth: usize,
134 limit: Option<usize>,
135 inclusive: Option<bool>,
136 predictive: Option<bool>,
137 corrective: Option<usize>,
138 prepend_prefix: Option<bool>,
139}
140
141impl SearchOptionsBuilder {
142 pub fn new(depth: usize) -> Self {
143 SearchOptionsBuilder {
144 sort: None,
145 depth,
146 limit: None,
147 inclusive: None,
148 predictive: None,
149 corrective: None,
150 prepend_prefix: None,
151 }
152 }
153
154 pub fn build(self) -> SearchOptions {
155 SearchOptions {
156 sort: self.sort,
157 depth: self.depth,
158 limit: self.limit,
159 inclusive: self.inclusive,
160 predictive: self.predictive,
161 corrective: self.corrective,
162 prepend_prefix: self.prepend_prefix,
163 }
164 }
165
166 pub fn sort(mut self, sort: Vec<SearchSort>) -> Self {
168 self.sort = Some(sort);
169 self
170 }
171
172 pub fn limit(mut self, limit: usize) -> Self {
174 self.limit = Some(limit);
175 self
176 }
177
178 pub fn inclusive(mut self, inclusive: bool) -> Self {
180 self.inclusive = Some(inclusive);
181 self
182 }
183
184 pub fn predictive(mut self, predictive: bool) -> Self {
186 self.predictive = Some(predictive);
187 self
188 }
189
190 pub fn corrective(mut self, mistakes: usize) -> Self {
195 self.corrective = Some(mistakes);
196 self
197 }
198
199 pub fn prepend_prefix(mut self, prepend_prefix: bool) -> Self {
221 self.prepend_prefix = Some(prepend_prefix);
222 self
223 }
224}
225
/// Whether the client was constructed with one pod or with several.
#[derive(Debug, Clone)]
enum Mode {
    /// Exactly one pod URI was supplied to `RtPods::new`.
    Single,
    /// More than one pod URI was supplied to `RtPods::new`; lookups in the
    /// radix-tree-to-pod map and `sync` are only performed in this mode.
    Cluster,
}
231
/// HTTP client for one or more radix tree pods.
///
/// All fields other than `key` and `mode` are behind `Arc`, so cloning the
/// client shares the HTTP client, the tree-to-pod map, the pod list and the
/// rotation counter with the original.
#[derive(Debug, Clone)]
pub struct RtPods {
    /// Optional key sent as a bearer token with every request.
    key: Option<String>,
    /// Single-pod or multi-pod operation, decided in `new`.
    mode: Mode,
    /// Shared `reqwest` client used for all requests.
    client: Arc<Client>,
    /// Map from radix tree id to the pod URI recorded for it by `sync`.
    rt_map: Arc<RwLock<FxHashMap<RadixTreeId, PodURI>>>,
    /// Pod URIs, already prefixed with the `http://` or `https://` scheme.
    pod_uris: Arc<Vec<PodURI>>,
    /// Counter used by `next_pod_uri` to rotate through `pod_uris`.
    pod_index: Arc<AtomicUsize>,
}
319
/// Optional settings accepted by `RtPods::new`.
#[derive(Debug, Clone)]
pub struct RtPodsConfig {
    /// Key sent as a bearer token with every request, if set.
    pub key: Option<String>,
    /// When `Some(true)`, pod URIs are prefixed with `https://`; otherwise
    /// `http://` is used.
    pub https: Option<bool>,
    /// When `Some(true)`, the HTTP client is built with
    /// `danger_accept_invalid_certs(true)`.
    pub https_allow_invalid: Option<bool>,
}
333
334pub struct RtPodsConfigBuilder {
335 key: Option<String>,
336 https: Option<bool>,
337 https_allow_invalid: Option<bool>,
338}
339
340impl RtPodsConfigBuilder {
341 pub fn new() -> Self {
342 Self {
343 key: None,
344 https: None,
345 https_allow_invalid: None,
346 }
347 }
348
349 pub fn build(self) -> RtPodsConfig {
350 RtPodsConfig {
351 key: self.key,
352 https: self.https,
353 https_allow_invalid: self.https_allow_invalid,
354 }
355 }
356
357 pub fn key(mut self, key: String) -> Self {
358 self.key = Some(key);
359 self
360 }
361
362 pub fn https(mut self, https: bool) -> Self {
363 self.https = Some(https);
364 self
365 }
366
367 pub fn https_allow_invalid(mut self, https_allow_invalid: bool) -> Self {
368 self.https_allow_invalid = Some(https_allow_invalid);
369 self
370 }
371}
372
373impl RtPods {
374 pub fn new(
409 uris: Vec<impl AsRef<str> + std::fmt::Display>,
410 config: Option<RtPodsConfig>,
411 ) -> Result<Self, Error> {
412 let https = config
413 .as_ref()
414 .is_some_and(|config| config.https.is_some_and(|v| v));
415
416 let https_allow_invalid = config
417 .as_ref()
418 .is_some_and(|config| config.https_allow_invalid.is_some_and(|v| v));
419
420 let pod_uris = Arc::new(
421 uris.into_iter()
422 .map(|uri| format!("{}://{}", if https { "https" } else { "http" }, uri))
423 .collect::<Vec<String>>(),
424 );
425 let pod_uris_len = pod_uris.len();
426
427 if pod_uris_len == 0 {
428 return Err(format_error!("Must provide at least one PodURI"));
429 }
430
431 let mode = if pod_uris_len > 1 {
432 Mode::Cluster
433 } else {
434 Mode::Single
435 };
436
437 let key = if let Some(config) = config {
438 config.key
439 } else {
440 None
441 };
442
443 let client = Client::builder()
444 .danger_accept_invalid_certs(https_allow_invalid)
445 .build()
446 .map_err(|err| format_error!("Failed to build client! {:?}", err))?;
447
448 Ok(Self {
449 key,
450 mode,
451 client: Arc::new(client),
452 rt_map: Arc::new(RwLock::new(FxHashMap::default())),
453 pod_uris,
454 pod_index: Arc::new(AtomicUsize::new(0)),
455 })
456 }
457
458 async fn get_sync_data(
459 client: &Client,
460 uri: &str,
461 key: Option<&String>,
462 ) -> Result<SyncData, Error> {
463 let mut request = client.get(format!("{}/__get_client_sync_data", uri));
464
465 if let Some(key) = key {
466 request = request.bearer_auth(key);
467 }
468
469 let response = request
470 .send()
471 .await
472 .map_err(|err| format_error!("{:?}", err))?;
473
474 if response.status() != 200 {
475 return Err(format_error!("Failed to get sync data from {uri}"));
476 }
477
478 let sync_data = response
479 .json::<SyncData>()
480 .await
481 .map_err(|err| format_error!("{:?}", err))?;
482
483 Ok(sync_data)
484 }
485
486 fn next_pod_uri(&self) -> PodURI {
487 let pods_length = self.pod_uris.len();
488
489 if pods_length == 0 {
490 return self.pod_uris[0].to_string();
491 }
492
493 let index = Wrapping(self.pod_index.fetch_add(1, Ordering::SeqCst)) % Wrapping(pods_length);
494 let index = index.0;
495 self.pod_uris[index].to_string()
496 }
497
498 async fn get_target_pod_uri(&self, radix_tree_id: &str) -> PodURI {
499 if matches!(self.mode, Mode::Cluster) {
500 if let Some(pod_uri) = self.rt_map.read().await.get(radix_tree_id) {
501 return pod_uri.to_string();
502 }
503 }
504
505 self.next_pod_uri()
506 }
507
508 pub async fn sync(&self) {
535 if matches!(self.mode, Mode::Cluster) {
536 for pod_uri in self.pod_uris.iter() {
537 if let Ok(SyncData { rt_ids, .. }) =
538 RtPods::get_sync_data(&self.client, pod_uri, self.key.as_ref()).await
539 {
540 let mut rt_map_writer = self.rt_map.write().await;
541 for rt_id in rt_ids {
542 rt_map_writer.insert(rt_id, pod_uri.to_owned());
543 }
544 }
545 }
546 }
547 }
548
549 pub async fn search(
588 &self,
589 radix_tree_id: &str,
590 query: &str,
591 search_options: SearchOptions,
592 ) -> Result<SearchResult, Error> {
593 let body = RadixTreeSearchBody {
594 s: query,
595 id: radix_tree_id,
596 search_options,
597 };
598
599 let pod_uri = self.get_target_pod_uri(radix_tree_id).await;
600
601 let mut request = self
602 .client
603 .post(format!("{}/radix_tree_search", pod_uri))
604 .json(&body);
605
606 if let Some(ref key) = self.key {
607 request = request.bearer_auth(key);
608 }
609
610 let response = request
611 .send()
612 .await
613 .map_err(|err| format_error!("{:?}", err))?;
614
615 match response.status() {
616 StatusCode::OK => {}
617 StatusCode::UNAUTHORIZED => return Ok(SearchResult::UnAuthorized),
618 StatusCode::SERVICE_UNAVAILABLE => return Ok(SearchResult::TemporarilyUnavailable),
619 StatusCode::NOT_FOUND => return Ok(SearchResult::NotFound),
620 _ => {
621 return Err(format_error!("Received invalid response: {:?}", response));
622 }
623 }
624
625 let results = response
626 .json::<Vec<(String, Option<String>)>>()
627 .await
628 .map_err(|err| format_error!("{:?}", err))?;
629
630 Ok(SearchResult::Ok(results))
631 }
632
633 pub async fn insert(
644 &self,
645 radix_tree_id: &str,
646 s: &str,
647 data: Option<&str>,
648 ) -> Result<InsertResult, Error> {
649 let body = RadixTreeInsertBody {
650 s,
651 id: radix_tree_id,
652 data,
653 };
654
655 let pod_uri = self.get_target_pod_uri(radix_tree_id).await;
656
657 let mut request = self
658 .client
659 .post(format!("{}/radix_tree_insert", pod_uri))
660 .json(&body);
661
662 if let Some(ref key) = self.key {
663 request = request.bearer_auth(key);
664 }
665
666 let response = request
667 .send()
668 .await
669 .map_err(|err| format_error!("{:?}", err))?;
670
671 match_response!(response, InsertResult)
672 }
673
674 pub async fn remove(&self, radix_tree_id: &str, s: &str) -> Result<RemoveResult, Error> {
685 let pod_uri = self.get_target_pod_uri(radix_tree_id).await;
686
687 let mut request = self.client.delete(format!(
688 "{}/radix_tree_remove?s={}&id={}",
689 pod_uri, s, radix_tree_id
690 ));
691
692 if let Some(ref key) = self.key {
693 request = request.bearer_auth(key);
694 }
695
696 let response = request
697 .send()
698 .await
699 .map_err(|err| format_error!("{:?}", err))?;
700
701 match_response!(response, RemoveResult)
702 }
703
704 pub async fn contains(&self, radix_tree_id: &str, s: &str) -> Result<ContainsResult, Error> {
715 let pod_uri = self.get_target_pod_uri(radix_tree_id).await;
716
717 let mut request = self.client.get(format!(
718 "{}/radix_tree_contains?s={}&id={}",
719 pod_uri, s, radix_tree_id
720 ));
721
722 if let Some(ref key) = self.key {
723 request = request.bearer_auth(key);
724 }
725
726 let response = request
727 .send()
728 .await
729 .map_err(|err| format_error!("{:?}", err))?;
730
731 match response.status() {
732 StatusCode::OK => {}
733 StatusCode::UNAUTHORIZED => return Ok(ContainsResult::UnAuthorized),
734 StatusCode::SERVICE_UNAVAILABLE => return Ok(ContainsResult::TemporarilyUnavailable),
735 StatusCode::NOT_FOUND => return Ok(ContainsResult::NotFound),
736 _ => {
737 return Err(format_error!("Received invalid response: {:?}", response));
738 }
739 }
740
741 let result = response
742 .json::<bool>()
743 .await
744 .map_err(|err| format_error!("{:?}", err))?;
745
746 Ok(ContainsResult::Ok(result))
747 }
748
749 pub async fn batch_operation(
773 &self,
774 radix_tree_id: &str,
775 operations: Vec<BatchOperation>,
776 ) -> Result<BatchOperationResult, Error> {
777 let body = RadixTreeBatchOperationBody {
778 id: radix_tree_id,
779 operations,
780 };
781
782 let pod_uri = self.get_target_pod_uri(radix_tree_id).await;
783
784 let mut request = self
785 .client
786 .put(format!("{}/radix_tree_batch_operation", pod_uri))
787 .json(&body);
788
789 if let Some(ref key) = self.key {
790 request = request.bearer_auth(key);
791 }
792
793 let response = request
794 .send()
795 .await
796 .map_err(|err| format_error!("{:?}", err))?;
797
798 match response.status() {
799 StatusCode::OK => Ok(BatchOperationResult::Ok),
800 StatusCode::NOT_FOUND => Ok(BatchOperationResult::NotFound),
801 StatusCode::UNAUTHORIZED => Ok(BatchOperationResult::UnAuthorized),
802 StatusCode::SERVICE_UNAVAILABLE => Ok(BatchOperationResult::TemporarilyUnavailable),
803 _ => Err(format_error!("Received invalid response: {:?}", response)),
804 }
805 }
806
807 pub async fn delete_radix_tree(
818 &self,
819 radix_tree_id: &str,
820 ) -> Result<DeleteRadixTreeResult, Error> {
821 let pod_uri = self.get_target_pod_uri(radix_tree_id).await;
822
823 let mut request = self.client.delete(format!(
824 "{}/delete_radix_tree?id={}",
825 pod_uri, radix_tree_id
826 ));
827
828 if let Some(ref key) = self.key {
829 request = request.bearer_auth(key);
830 }
831
832 let response = request
833 .send()
834 .await
835 .map_err(|err| format_error!("{:?}", err))?;
836
837 match_response!(response, DeleteRadixTreeResult)
838 }
839
840 pub async fn create_radix_tree(
851 &self,
852 radix_tree_id: &str,
853 ) -> Result<CreateRadixTreeResult, Error> {
854 let body = CreateRadixTreeBody {
855 id: radix_tree_id,
856 _override_find_target: None,
857 };
858
859 let pod_uri = self.next_pod_uri();
860
861 let mut request = self
862 .client
863 .post(format!("{}/create_radix_tree", pod_uri))
864 .json(&body);
865
866 if let Some(ref key) = self.key {
867 request = request.bearer_auth(key);
868 }
869
870 let response = request
871 .send()
872 .await
873 .map_err(|err| format_error!("{:?}", err))?;
874
875 match response.status() {
876 StatusCode::OK => Ok(CreateRadixTreeResult::Ok),
877 StatusCode::BAD_REQUEST => Ok(CreateRadixTreeResult::AlreadyExists),
878 StatusCode::UNAUTHORIZED => Ok(CreateRadixTreeResult::UnAuthorized),
879 StatusCode::SERVICE_UNAVAILABLE => Ok(CreateRadixTreeResult::TemporarilyUnavailable),
880 _ => Err(format_error!("Received invalid response: {:?}", response)),
881 }
882 }
883
884 pub async fn radix_tree_exists(
895 &self,
896 radix_tree_id: &str,
897 ) -> Result<RadixTreeExistsResult, Error> {
898 let pod_uri = self.get_target_pod_uri(radix_tree_id).await;
899
900 let mut request = self.client.get(format!(
901 "{}/radix_tree_exists?id={}",
902 pod_uri, radix_tree_id
903 ));
904
905 if let Some(ref key) = self.key {
906 request = request.bearer_auth(key);
907 }
908
909 let response = request
910 .send()
911 .await
912 .map_err(|err| format_error!("{:?}", err))?;
913
914 match response.status() {
915 StatusCode::OK => {}
916 StatusCode::UNAUTHORIZED => return Ok(RadixTreeExistsResult::UnAuthorized),
917 StatusCode::SERVICE_UNAVAILABLE => {
918 return Ok(RadixTreeExistsResult::TemporarilyUnavailable)
919 }
920 _ => {
921 return Err(format_error!("Received invalid response: {:?}", response));
922 }
923 }
924
925 let result = response
926 .json::<bool>()
927 .await
928 .map_err(|err| format_error!("{:?}", err))?;
929
930 Ok(RadixTreeExistsResult::Ok(result))
931 }
932}