use crate::{
error::TitoError,
key_encoder::safe_encode,
types::{
TitoEngine, TitoFindByIndexPayload, TitoFindOneByIndexPayload, TitoIndexBlockType,
TitoModelTrait, TitoPaginated, TitoScanPayload, TitoTransaction,
},
utils::next_string_lexicographically,
TitoModel,
};
use serde::{de::DeserializeOwned, Serialize};
use serde_json::Value;
impl<
E: TitoEngine,
T: Default
+ Clone
+ Serialize
+ DeserializeOwned
+ Unpin
+ std::marker::Send
+ Sync
+ TitoModelTrait,
> TitoModel<E, T>
{
pub fn get_index_keys(
&self,
id: String,
value: &T,
json: &Value,
) -> Result<Vec<(String, Value)>, TitoError> {
let mut all_index_keys = vec![];
for index_config in value.get_indexes().iter() {
if !index_config.condition {
continue;
}
let base_key = format!("index:{}:", index_config.name);
let mut combinations: Vec<String> = vec![String::new()];
if let Some(generator) = &index_config.custom_generator {
let custom_keys = generator()?;
for key in custom_keys {
let key = format!("{}{}:{}", base_key, key, id);
if !key.is_empty() {
all_index_keys.push((key, json.clone()));
}
}
}
for field in index_config.fields.iter() {
let field_values = match self.get_nested_values(json, &field.name) {
Some(values) if !values.is_empty() => values,
_ => {
vec![Value::String("__null__".to_string())]
}
};
let mut new_combinations = vec![];
for value in field_values {
let field_str = match field.r#type {
TitoIndexBlockType::String => match value.as_str() {
Some("") => Some(format!("{}:__null__", field.name)),
Some(s) => Some(format!("{}:{}", field.name, safe_encode(&s))),
None => Some(format!("{}:__null__", field.name)),
},
TitoIndexBlockType::Number => match value.as_i64() {
Some(n) => Some(format!("{}:{:0>10}", field.name, n)),
None => Some(format!("{}:__null__", field.name)),
},
};
if let Some(field_str) = field_str {
for existing_combo in &combinations {
new_combinations.push(format!(
"{}{}{}",
existing_combo,
if existing_combo.is_empty() { "" } else { ":" },
field_str
));
}
}
}
if !new_combinations.is_empty() {
combinations = new_combinations;
}
}
for combo in combinations {
if !combo.is_empty() {
all_index_keys.push((format!("{}{}:{}", base_key, combo, id), json.clone()));
}
}
}
Ok(all_index_keys)
}
pub async fn find_by_index_raw(
&self,
payload: TitoFindByIndexPayload,
tx: &E::Transaction,
) -> Result<(Vec<(String, Value)>, bool), TitoError>
where
T: serde::de::DeserializeOwned,
{
let indexes = self.model.get_indexes();
let index = indexes
.iter()
.find(|index_config| index_config.name == payload.index)
.unwrap();
let index_fields = index.fields.clone();
let mut key_from_values = vec![];
for (i, value) in payload.values.iter().enumerate() {
let index_field = index_fields[i].clone();
let index_field_type = index_field.r#type;
let value = match index_field_type {
TitoIndexBlockType::String => safe_encode(value),
TitoIndexBlockType::Number => format!("{:0>10}", value),
};
let field_name = index_field.name.clone();
let key_part = format!("{}:{}", field_name, value);
key_from_values.push(key_part);
}
let key_from_value = key_from_values.join(":");
let mut id = format!("index:{}:{}", payload.index, key_from_value);
if payload.exact_match {
id.push_str(":");
}
let end = if let Some(end) = payload.end {
if let Some(last_colon_index) = id.rfind(':') {
Some(format!("{}:{}", &id[..last_colon_index], end))
} else {
None
}
} else {
None
};
let (items, has_more) = self
.scan(
TitoScanPayload {
start: id,
end,
limit: payload.limit,
cursor: payload.cursor.clone(),
},
tx,
)
.await?;
let items = self
.fetch_and_stitch_relationships(items, payload.rels, tx)
.await?;
Ok((items, has_more))
}
pub async fn find_by_index_reverse_raw(
&self,
payload: TitoFindByIndexPayload,
tx: &E::Transaction,
) -> Result<(Vec<(String, Value)>, bool), TitoError>
where
T: serde::de::DeserializeOwned,
{
let indexes = self.model.get_indexes();
let index = indexes
.iter()
.find(|index_config| index_config.name == payload.index)
.unwrap();
let index_fields = index.fields.clone();
let mut key_from_values = vec![];
for (i, value) in payload.values.iter().enumerate() {
let index_field = index_fields[i].clone();
let index_field_type = index_field.r#type;
let value = match index_field_type {
TitoIndexBlockType::String => safe_encode(value),
TitoIndexBlockType::Number => format!("{:0>10}", value),
};
let field_name = index_field.name.clone();
let key_part = format!("{}:{}", field_name, value);
key_from_values.push(key_part);
}
let key_from_value = key_from_values.join(":");
let id = format!("index:{}:{}", payload.index, key_from_value);
let (items, has_more) = self
.scan_reverse(
TitoScanPayload {
start: id,
end: payload.end.clone(),
limit: payload.limit,
cursor: payload.cursor.clone(),
},
tx,
)
.await?;
let items = self
.fetch_and_stitch_relationships(items, payload.rels, tx)
.await?;
Ok((items, has_more))
}
pub async fn find_by_index_tx(
&self,
payload: TitoFindByIndexPayload,
tx: &E::Transaction,
) -> Result<TitoPaginated<T>, TitoError>
where
T: serde::de::DeserializeOwned,
{
let (items, has_more) = self
.find_by_index_raw(
TitoFindByIndexPayload {
index: payload.index,
values: payload.values,
rels: payload.rels,
end: payload.end,
exact_match: payload.exact_match,
limit: payload.limit,
cursor: payload.cursor,
},
tx,
)
.await?;
let results = self.to_paginated_items(items, has_more)?;
Ok(results)
}
pub async fn find_by_index(
&self,
payload: TitoFindByIndexPayload,
) -> Result<TitoPaginated<T>, TitoError>
where
T: serde::de::DeserializeOwned,
{
self.tx(|tx| async move { self.find_by_index_tx(payload, &tx).await })
.await
}
pub async fn find_by_index_reverse_tx(
&self,
payload: TitoFindByIndexPayload,
tx: &E::Transaction,
) -> Result<TitoPaginated<T>, TitoError>
where
T: serde::de::DeserializeOwned,
{
let cursor = payload.cursor.clone();
let (items, has_more) = self.find_by_index_reverse_raw(payload, tx).await?;
let results = self.to_paginated_items(items, has_more)?;
Ok(results)
}
pub async fn find_by_index_reverse(
&self,
payload: TitoFindByIndexPayload,
) -> Result<TitoPaginated<T>, TitoError>
where
T: serde::de::DeserializeOwned,
{
self.tx(|tx| async move { self.find_by_index_reverse_tx(payload, &tx).await })
.await
}
pub async fn find_one_by_index_tx(
&self,
payload: TitoFindOneByIndexPayload,
tx: &E::Transaction,
) -> Result<T, TitoError>
where
T: serde::de::DeserializeOwned,
{
let (items, _) = self
.find_by_index_raw(
TitoFindByIndexPayload {
index: payload.index.clone(),
values: payload.values.clone(),
rels: payload.rels.clone(),
end: None,
exact_match: true,
limit: Some(1),
cursor: None,
},
tx,
)
.await?;
if let Some(value) = items.get(0) {
match serde_json::from_value::<T>(value.1.clone()) {
Ok(item) => Ok(item),
Err(e) => Err(TitoError::NotFound(format!(
"Failed to deserialize record for index '{}' with values {:?}: {}",
payload.index, payload.values, e
))),
}
} else {
Err(TitoError::NotFound(format!(
"No record found for index '{}' with values {:?}",
payload.index, payload.values
)))
}
}
pub async fn find_one_by_index(
&self,
payload: TitoFindOneByIndexPayload,
) -> Result<T, TitoError>
where
T: serde::de::DeserializeOwned,
{
self.tx(|tx| async move { self.find_one_by_index_tx(payload, &tx).await })
.await
}
pub async fn reindex(&self) -> Result<(), TitoError> {
let table = self.get_table();
let start_key = format!("{}:", table);
let end_key = next_string_lexicographically(start_key.clone());
let mut cursor = start_key.clone();
self.tx(|tx| async move {
loop {
let scan_range = cursor.clone()..end_key.clone();
let kvs = tx.scan(scan_range, 100).await.map_err(|e| {
TitoError::TransactionFailed(String::from("Failed migration, scan"))
})?;
let mut has_kvs = false;
for kv in kvs {
has_kvs = true;
let key = String::from_utf8(kv.0.into()).unwrap();
let value: Value = serde_json::from_slice(&kv.1).unwrap();
let model_instance =
serde_json::from_value::<T>(value.clone()).map_err(|_| {
TitoError::TransactionFailed(String::from("Failed migration, model"))
})?;
self.update_with_options(model_instance, false, &tx).await?;
cursor = next_string_lexicographically(key);
}
if !has_kvs {
break;
}
}
Ok::<_, TitoError>(true)
})
.await;
Ok(())
}
}