use crate::{
Limit, MontycatClientError,
engine::{structure::Engine, utils::send_data},
request::{
store_request::structure::StoreRequestClient,
structure::Req,
utis::functions::{convert_custom_key, merge_bulk_keys_values, merge_keys},
},
tools::functions::{define_type, process_json_value},
};
use async_trait::async_trait;
use serde::Serialize;
use std::collections::HashMap;
#[async_trait]
pub trait Keyspace
where
Self: Sized + Send + Sync,
{
fn get_engine(&self) -> Engine;
fn get_name(&self) -> &str;
fn get_persistent(&self) -> bool;
fn get_distributed(&self) -> bool;
async fn remove_keyspace(&self) -> Result<Option<Vec<u8>>, MontycatClientError> {
let engine: Engine = self.get_engine();
let name: &str = self.get_name();
let persistent: bool = self.get_persistent();
let store: String = engine
.store
.clone()
.ok_or(MontycatClientError::ClientStoreNotSet)?;
let use_tls: bool = engine.use_tls;
let vec: Vec<String> = vec![
"remove-keyspace".into(),
"store".into(),
store,
"keyspace".into(),
name.to_owned(),
"persistent".into(),
if persistent { "y".into() } else { "n".into() },
];
let credentials: Vec<String> = engine.get_credentials();
let query: Req = Req::new_raw_command(vec, credentials);
let bytes: Vec<u8> = query.byte_down()?;
let response: Option<Vec<u8>> = send_data(
&engine.host,
engine.port,
bytes.as_slice(),
None,
None,
use_tls,
)
.await?;
Ok(response)
}
async fn get_value(
&self,
key: Option<&str>,
custom_key: Option<&str>,
with_pointers: bool,
key_included: bool,
with_pointers_metadata: bool,
) -> Result<Option<Vec<u8>>, MontycatClientError> {
if key.is_none() && custom_key.is_none() {
return Err(MontycatClientError::ClientSelectedBothKeyAndCustomKey);
}
if key.is_none() && custom_key.is_none() {
return Err(MontycatClientError::ClientNoValidInputProvided);
}
let mut key: String = key.unwrap_or("").to_owned();
if with_pointers_metadata && with_pointers {
return Err(MontycatClientError::ClientSelectedBothPointersValueAndMetadata);
}
if let Some(custom_key_unwrapped) = custom_key {
key = convert_custom_key(custom_key_unwrapped);
}
let engine: Engine = self.get_engine();
let name: &str = self.get_name();
let persistent: bool = self.get_persistent();
let distributed: bool = self.get_distributed();
let store: String = engine
.store
.clone()
.ok_or(MontycatClientError::ClientStoreNotSet)?;
let use_tls: bool = engine.use_tls;
let command: String = "get_value".to_string();
let new_store_req: StoreRequestClient = StoreRequestClient {
key: key.to_owned().into(),
keyspace: name.to_owned(),
store,
persistent,
distributed,
command,
with_pointers,
key_included,
pointers_metadata: with_pointers_metadata,
username: engine.username.clone(),
password: engine.password.clone(),
..Default::default()
};
let bytes: Vec<u8> = Req::new_store_command(new_store_req).byte_down()?;
let response: Option<Vec<u8>> = send_data(
&engine.host,
engine.port,
bytes.as_slice(),
None,
None,
use_tls,
)
.await?;
Ok(response)
}
async fn delete_key(
&self,
key: Option<&str>,
custom_key: Option<&str>,
) -> Result<Option<Vec<u8>>, MontycatClientError> {
if key.is_some() && custom_key.is_some() {
return Err(MontycatClientError::ClientSelectedBothKeyAndCustomKey);
}
if key.is_none() && custom_key.is_none() {
return Err(MontycatClientError::ClientNoValidInputProvided);
}
let mut key: String = key.unwrap_or("").to_owned();
if let Some(custom_key_unwrapped) = custom_key {
key = convert_custom_key(custom_key_unwrapped);
}
let engine: Engine = self.get_engine();
let name: &str = self.get_name();
let persistent: bool = self.get_persistent();
let distributed: bool = self.get_distributed();
let store: String = engine
.store
.clone()
.ok_or(MontycatClientError::ClientStoreNotSet)?;
let use_tls: bool = engine.use_tls;
let command: String = "delete_key".to_string();
let new_store_req: StoreRequestClient = StoreRequestClient {
key: key.to_owned().into(),
keyspace: name.to_owned(),
store,
persistent,
distributed,
command,
username: engine.username.clone(),
password: engine.password.clone(),
..Default::default()
};
let bytes: Vec<u8> = Req::new_store_command(new_store_req).byte_down()?;
let response: Option<Vec<u8>> = send_data(
&engine.host,
engine.port,
bytes.as_slice(),
None,
None,
use_tls,
)
.await?;
Ok(response)
}
async fn list_all_depending_keys(
&self,
key: &str,
custom_key: Option<&str>,
) -> Result<Option<Vec<u8>>, MontycatClientError> {
if !key.is_empty() && custom_key.is_some() {
return Err(MontycatClientError::ClientSelectedBothKeyAndCustomKey);
}
let mut key: String = key.to_owned();
if let Some(custom_key_unwrapped) = custom_key {
key = convert_custom_key(custom_key_unwrapped);
}
let engine: Engine = self.get_engine();
let name: &str = self.get_name();
let persistent: bool = self.get_persistent();
let distributed: bool = self.get_distributed();
let store: String = engine
.store
.clone()
.ok_or(MontycatClientError::ClientStoreNotSet)?;
let use_tls: bool = engine.use_tls;
let command: String = "list_all_depending_keys".to_string();
let new_store_req: StoreRequestClient = StoreRequestClient {
key: key.to_owned().into(),
keyspace: name.to_owned(),
store,
persistent,
distributed,
command,
username: engine.username.clone(),
password: engine.password.clone(),
..Default::default()
};
let bytes: Vec<u8> = Req::new_store_command(new_store_req).byte_down()?;
let response: Option<Vec<u8>> = send_data(
&engine.host,
engine.port,
bytes.as_slice(),
None,
None,
use_tls,
)
.await?;
Ok(response)
}
#[allow(clippy::too_many_arguments)]
async fn get_bulk(
&self,
bulk_keys: Option<Vec<String>>,
bulk_custom_keys: Option<Vec<String>>,
with_pointers: bool,
key_included: bool,
with_pointers_metadata: bool,
limit: Option<Limit>,
volumes: Option<Vec<String>>,
latest_volume: Option<bool>,
) -> Result<Option<Vec<u8>>, MontycatClientError> {
if with_pointers && with_pointers_metadata {
return Err(MontycatClientError::ClientSelectedBothPointersValueAndMetadata);
}
let processed_keys: Vec<String> = merge_keys(bulk_keys, bulk_custom_keys).await?;
let selected_options = [
!processed_keys.is_empty(),
volumes.as_ref().is_some_and(|v| !v.is_empty()),
latest_volume.unwrap_or(false),
]
.iter()
.filter(|&&x| x)
.count();
if selected_options != 1 {
return Err(MontycatClientError::ClientGenericError(
"Multiple conflicting options provided. Please provide exactly one of the following: keys, volumes, or latest volume.".into()
));
}
let engine: Engine = self.get_engine();
let name: &str = self.get_name();
let persistent: bool = self.get_persistent();
let distributed: bool = self.get_distributed();
let store: String = engine
.store
.clone()
.ok_or(MontycatClientError::ClientStoreNotSet)?;
let use_tls: bool = engine.use_tls;
let command: String = "get_bulk".to_string();
let limit_map: HashMap<String, usize> = match limit {
Some(lim) => {
if lim.start > lim.stop {
return Err(MontycatClientError::ClientGenericError(
"Limit start cannot be greater than stop".into(),
));
}
lim.to_map()
}
None => Limit::default_limit().to_map(),
};
let new_store_req: StoreRequestClient = StoreRequestClient {
bulk_keys: processed_keys,
keyspace: name.to_owned(),
store,
persistent,
distributed,
command,
limit_output: limit_map,
username: engine.username.clone(),
password: engine.password.clone(),
with_pointers,
key_included,
pointers_metadata: with_pointers_metadata,
..Default::default()
};
let bytes: Vec<u8> = Req::new_store_command(new_store_req).byte_down()?;
let response: Option<Vec<u8>> = send_data(
&engine.host,
engine.port,
bytes.as_slice(),
None,
None,
use_tls,
)
.await?;
Ok(response)
}
async fn delete_bulk(
&self,
bulk_keys: Option<Vec<String>>,
bulk_custom_keys: Option<Vec<String>>,
) -> Result<Option<Vec<u8>>, MontycatClientError> {
let keys_processed: Vec<String> = merge_keys(bulk_keys, bulk_custom_keys).await?;
let engine: Engine = self.get_engine();
let name: &str = self.get_name();
let persistent: bool = self.get_persistent();
let distributed: bool = self.get_distributed();
let store: String = engine
.store
.clone()
.ok_or(MontycatClientError::ClientStoreNotSet)?;
let use_tls: bool = engine.use_tls;
let command: String = "delete_bulk".to_string();
let new_store_req: StoreRequestClient = StoreRequestClient {
bulk_keys: keys_processed,
keyspace: name.to_owned(),
store,
persistent,
distributed,
command,
username: engine.username.clone(),
password: engine.password.clone(),
..Default::default()
};
let bytes: Vec<u8> = Req::new_store_command(new_store_req).byte_down()?;
let response: Option<Vec<u8>> = send_data(
&engine.host,
engine.port,
bytes.as_slice(),
None,
None,
use_tls,
)
.await?;
Ok(response)
}
async fn get_len(&self) -> Result<Option<Vec<u8>>, MontycatClientError> {
let engine: Engine = self.get_engine();
let name: &str = self.get_name();
let persistent: bool = self.get_persistent();
let distributed: bool = self.get_distributed();
let store: String = engine
.store
.clone()
.ok_or(MontycatClientError::ClientStoreNotSet)?;
let use_tls: bool = engine.use_tls;
let command: String = "get_len".to_string();
let new_store_req: StoreRequestClient = StoreRequestClient {
keyspace: name.to_owned(),
store,
persistent,
distributed,
command,
username: engine.username.clone(),
password: engine.password.clone(),
..Default::default()
};
let bytes: Vec<u8> = Req::new_store_command(new_store_req).byte_down()?;
let response: Option<Vec<u8>> = send_data(
&engine.host,
engine.port,
bytes.as_slice(),
None,
None,
use_tls,
)
.await?;
Ok(response)
}
async fn enforce_schema(
&self,
schema_params: (std::collections::HashMap<&str, &str>, &str),
) -> Result<Option<Vec<u8>>, MontycatClientError> {
let (fields, schema_name) = schema_params;
let mut schema_types: HashMap<String, (&'static str, bool)> = HashMap::new();
for (field_name, field_type) in fields.into_iter() {
let type_def = define_type(field_type)?;
schema_types.insert(field_name.to_string(), type_def);
}
let schema_types_as_string: String = serde_json::to_string(&schema_types)
.map_err(|e| MontycatClientError::ClientValueParsingError(e.to_string()))?;
let engine: Engine = self.get_engine();
let name: &str = self.get_name();
let persistent: bool = self.get_persistent();
let store: String = engine
.store
.clone()
.ok_or(MontycatClientError::ClientStoreNotSet)?;
let use_tls: bool = engine.use_tls;
let vec: Vec<String> = vec![
"enforce-schema".into(),
"store".into(),
store,
"keyspace".into(),
name.to_owned(),
"persistent".into(),
if persistent { "y".into() } else { "n".into() },
"schema_name".into(),
schema_name.to_string(),
"schema_content".into(),
schema_types_as_string,
];
let credentials: Vec<String> = engine.get_credentials();
let query: Req = Req::new_raw_command(vec, credentials);
let bytes: Vec<u8> = query.byte_down()?;
let response: Option<Vec<u8>> = send_data(
&engine.host,
engine.port,
bytes.as_slice(),
None,
None,
use_tls,
)
.await?;
Ok(response)
}
async fn remove_enforced_schema(
&self,
schema_name: (HashMap<&str, &str>, &str),
) -> Result<Option<Vec<u8>>, MontycatClientError> {
let (_fields, schema_name) = schema_name;
let engine: Engine = self.get_engine();
let name: &str = self.get_name();
let persistent: bool = self.get_persistent();
let store: String = engine
.store
.clone()
.ok_or(MontycatClientError::ClientStoreNotSet)?;
let use_tls: bool = engine.use_tls;
let vec: Vec<String> = vec![
"remove-enforced-schema".into(),
"store".into(),
store,
"keyspace".into(),
name.to_owned(),
"persistent".into(),
if persistent { "y".into() } else { "n".into() },
"schema_name".into(),
schema_name.to_string(),
];
let credentials: Vec<String> = engine.get_credentials();
let query: Req = Req::new_raw_command(vec, credentials);
let bytes: Vec<u8> = query.byte_down()?;
let response: Option<Vec<u8>> = send_data(
&engine.host,
engine.port,
bytes.as_slice(),
None,
None,
use_tls,
)
.await?;
Ok(response)
}
async fn list_all_schemas_in_keyspace(&self) -> Result<Option<Vec<u8>>, MontycatClientError> {
let engine: Engine = self.get_engine();
let name: &str = self.get_name();
let persistent: bool = self.get_persistent();
let distributed: bool = self.get_distributed();
let store: String = engine
.store
.clone()
.ok_or(MontycatClientError::ClientStoreNotSet)?;
let use_tls: bool = engine.use_tls;
let command: String = "list_all_schemas_in_keyspace".to_string();
let new_store_request: StoreRequestClient = StoreRequestClient {
username: engine.username.clone(),
password: engine.password.clone(),
keyspace: name.to_owned(),
store,
persistent,
distributed,
command,
..Default::default()
};
let bytes: Vec<u8> = Req::new_store_command(new_store_request).byte_down()?;
let response: Option<Vec<u8>> = send_data(
&engine.host,
engine.port,
bytes.as_slice(),
None,
None,
use_tls,
)
.await?;
Ok(response)
}
async fn update_bulk<T>(
&self,
bulk_keys_values: Vec<HashMap<String, T>>,
bulk_custom_keys_values: Vec<HashMap<String, T>>,
) -> Result<Option<Vec<u8>>, MontycatClientError>
where
T: Serialize + Send + 'static,
{
if bulk_keys_values.is_empty() && bulk_custom_keys_values.is_empty() {
return Err(MontycatClientError::ClientNoValidInputProvided);
}
let bulk: HashMap<String, String> =
merge_bulk_keys_values(bulk_keys_values, bulk_custom_keys_values).await?;
let engine: Engine = self.get_engine();
let name: &str = self.get_name();
let persistent: bool = self.get_persistent();
let distributed: bool = self.get_distributed();
let store: String = engine
.store
.clone()
.ok_or(MontycatClientError::ClientStoreNotSet)?;
let use_tls: bool = engine.use_tls;
let command: String = "update_value".to_string();
let new_store_request: StoreRequestClient = StoreRequestClient {
bulk_keys_values: bulk,
username: engine.username.clone(),
password: engine.password.clone(),
keyspace: name.to_owned(),
store,
persistent,
distributed,
command,
..Default::default()
};
let bytes: Vec<u8> = Req::new_store_command(new_store_request).byte_down()?;
let response: Option<Vec<u8>> = send_data(
&engine.host,
engine.port,
bytes.as_slice(),
None,
None,
use_tls,
)
.await?;
Ok(response)
}
async fn lookup_keys_where<T>(
&self,
search_criteria: T,
limit: Option<Limit>,
schema_name: Option<(HashMap<&str, &str>, &str)>,
) -> Result<Option<Vec<u8>>, MontycatClientError>
where
T: Serialize + Send + 'static,
{
let schema = {
if let Some((_, schema_name)) = schema_name {
Some(schema_name.to_string())
} else {
None
}
};
let engine: Engine = self.get_engine();
let name: &str = self.get_name();
let persistent: bool = self.get_persistent();
let distributed: bool = self.get_distributed();
let store: String = engine
.store
.clone()
.ok_or(MontycatClientError::ClientStoreNotSet)?;
let use_tls: bool = engine.use_tls;
let command: String = "lookup_keys".to_string();
let filters_serialized: String = process_json_value(&search_criteria)?;
let limit_map: HashMap<String, usize> = match limit {
Some(lim) => {
if lim.start > lim.stop {
return Err(MontycatClientError::ClientGenericError(
"Limit start cannot be greater than stop".into(),
));
}
lim.to_map()
}
None => Limit::default_limit().to_map(),
};
let new_store_request: StoreRequestClient = StoreRequestClient {
schema,
limit_output: limit_map,
search_criteria: filters_serialized,
username: engine.username.clone(),
password: engine.password.clone(),
keyspace: name.to_owned(),
store,
persistent,
distributed,
command,
..Default::default()
};
let bytes: Vec<u8> = Req::new_store_command(new_store_request).byte_down()?;
let response: Option<Vec<u8>> = send_data(
&engine.host,
engine.port,
bytes.as_slice(),
None,
None,
use_tls,
)
.await?;
Ok(response)
}
async fn lookup_values_where<T>(
&self,
search_criteria: T,
limit: Option<Limit>,
with_pointers: bool,
key_included: bool,
pointers_metadata: bool,
schema_name: Option<(HashMap<&str, &str>, &str)>,
) -> Result<Option<Vec<u8>>, MontycatClientError>
where
T: Serialize + Send + 'static,
{
let schema = {
if let Some((_, schema_name)) = schema_name {
Some(schema_name.to_string())
} else {
None
}
};
let engine: Engine = self.get_engine();
let name: &str = self.get_name();
let persistent: bool = self.get_persistent();
let distributed: bool = self.get_distributed();
let store: String = engine
.store
.clone()
.ok_or(MontycatClientError::ClientStoreNotSet)?;
let use_tls: bool = engine.use_tls;
let command: String = "lookup_values".to_string();
let filters_serialized: String = process_json_value(&search_criteria)?;
let limit_map: HashMap<String, usize> = match limit {
Some(lim) => {
if lim.start > lim.stop {
return Err(MontycatClientError::ClientGenericError(
"Limit start cannot be greater than stop".into(),
));
}
lim.to_map()
}
None => Limit::default_limit().to_map(),
};
let new_store_request: StoreRequestClient = StoreRequestClient {
with_pointers,
key_included,
pointers_metadata,
schema,
limit_output: limit_map,
search_criteria: filters_serialized,
username: engine.username.clone(),
password: engine.password.clone(),
keyspace: name.to_owned(),
store,
persistent,
distributed,
command,
..Default::default()
};
let bytes: Vec<u8> = Req::new_store_command(new_store_request).byte_down()?;
let response: Option<Vec<u8>> = send_data(
&engine.host,
engine.port,
bytes.as_slice(),
None,
None,
use_tls,
)
.await?;
Ok(response)
}
}