tito 0.4.3

Database layer on TiKV with indexing, relationships, transactions, and built-in transactional outbox with partitioned scheduled pub/sub with consumer groups
Documentation
use crate::{
    error::TitoError,
    key_encoder::safe_encode,
    types::{
        FieldValue, TitoEngine, TitoFindByIndexPayload, TitoFindOneByIndexPayload,
        TitoIndexBlockType, TitoModelTrait, TitoPaginated, TitoScanPayload,
    },
    TitoModel,
};
use serde::{de::DeserializeOwned, Serialize};
use serde_json::Value;

impl<
        E: TitoEngine,
        T: Default
            + Clone
            + Serialize
            + DeserializeOwned
            + Unpin
            + std::marker::Send
            + Sync
            + TitoModelTrait,
    > TitoModel<E, T>
{
    pub fn get_index_keys(
        &self,
        id: String,
        value: &T,
        json: &Value,
    ) -> Result<Vec<(String, Value)>, TitoError> {
        let mut all_index_keys = vec![];

        for index_config in value.indexes().iter() {
            if !index_config.condition {
                continue;
            }

            let base_key = format!("index:{}:", index_config.name);

            let mut combinations: Vec<String> = vec![String::new()];

            for field in index_config.fields.iter() {
                let field_values = match &field.r#type {
                    TitoIndexBlockType::Custom(value) => {
                        vec![FieldValue::Simple(Value::String(value.clone()))]
                    }
                    _ => match self.get_nested_values(json, &field.name) {
                        Some(values) if !values.is_empty() => values,
                        _ => vec![FieldValue::Simple(Value::String("__null__".to_string()))],
                    },
                };

                let mut new_combinations = vec![];

                for field_value in field_values {
                    let field_str = match field_value {
                        FieldValue::HashMapEntry { key, value } => match &field.r#type {
                            TitoIndexBlockType::String | TitoIndexBlockType::Custom(_) => match value.as_str() {
                                Some("") => Some(format!("{}:{}.__null__", field.name, key)),
                                Some(s) => {
                                    Some(format!("{}:{}.{}", field.name, key, safe_encode(&s)))
                                }
                                None => Some(format!("{}:{}.__null__", field.name, key)),
                            },
                            TitoIndexBlockType::Number => match value.as_i64() {
                                Some(n) => Some(format!("{}:{}:{:0>10}", field.name, key, n)),
                                None => Some(format!("{}:{}:__null__", field.name, key)),
                            },
                        },
                        FieldValue::Simple(value) => match &field.r#type {
                            TitoIndexBlockType::String | TitoIndexBlockType::Custom(_) => match value.as_str() {
                                Some("") => Some(format!("{}:__null__", field.name)),
                                Some(s) => Some(format!("{}:{}", field.name, safe_encode(&s))),
                                None => Some(format!("{}:__null__", field.name)),
                            },
                            TitoIndexBlockType::Number => match value.as_i64() {
                                Some(n) => Some(format!("{}:{:0>10}", field.name, n)),
                                None => Some(format!("{}:__null__", field.name)),
                            },
                        },
                    };

                    if let Some(field_str) = field_str {
                        for existing_combo in &combinations {
                            new_combinations.push(format!(
                                "{}{}{}",
                                existing_combo,
                                if existing_combo.is_empty() { "" } else { ":" },
                                field_str
                            ));
                        }
                    }
                }

                if !new_combinations.is_empty() {
                    combinations = new_combinations;
                }
            }

            for combo in combinations {
                if !combo.is_empty() {
                    all_index_keys.push((format!("{}{}:{}", base_key, combo, id), json.clone()));
                }
            }
        }

        Ok(all_index_keys)
    }

    pub async fn find_by_index_raw(
        &self,
        payload: TitoFindByIndexPayload,
        tx: &E::Transaction,
    ) -> Result<(Vec<(String, Value)>, bool), TitoError>
    where
        T: serde::de::DeserializeOwned,
    {
        let schema = T::default();
        let indexes = schema.indexes();

        let index = indexes
            .iter()
            .find(|index_config| index_config.name == payload.index)
            .ok_or_else(|| TitoError::IndexError(format!("Index '{}' not found on model '{}'", payload.index, T::table())))?;

        let index_fields = index.fields.clone();

        let mut key_from_values = vec![];
        for (i, value) in payload.values.iter().enumerate() {
            let index_field = index_fields.get(i).cloned().ok_or_else(|| TitoError::IndexError(format!("Index '{}' has {} fields but {} values were provided", payload.index, index_fields.len(), payload.values.len())))?;
            let index_field_type = index_field.r#type;

            let value = match index_field_type {
                TitoIndexBlockType::String | TitoIndexBlockType::Custom(_) => safe_encode(value),
                TitoIndexBlockType::Number => format!("{:0>10}", value),
            };

            let field_name = index_field.name.clone();
            let key_part = format!("{}:{}", field_name, value);
            key_from_values.push(key_part);
        }

        let key_from_value = key_from_values.join(":");

        let id = format!("index:{}:{}:", payload.index, key_from_value);

        let (items, has_more) = self
            .scan(
                TitoScanPayload {
                    start: id,
                    end: None,
                    limit: payload.limit,
                    cursor: payload.cursor.clone(),
                },
                tx,
            )
            .await?;

        let items = self
            .fetch_and_stitch_relationships(items, payload.rels, tx)
            .await?;

        Ok((items, has_more))
    }

    pub async fn find_by_index_reverse_raw(
        &self,
        payload: TitoFindByIndexPayload,
        tx: &E::Transaction,
    ) -> Result<(Vec<(String, Value)>, bool), TitoError>
    where
        T: serde::de::DeserializeOwned,
    {
        let schema = T::default();
        let indexes = schema.indexes();
        let index = indexes
            .iter()
            .find(|index_config| index_config.name == payload.index)
            .ok_or_else(|| TitoError::IndexError(format!("Index '{}' not found on model '{}'", payload.index, T::table())))?;

        let index_fields = index.fields.clone();

        let mut key_from_values = vec![];
        for (i, value) in payload.values.iter().enumerate() {
            let index_field = index_fields.get(i).cloned().ok_or_else(|| TitoError::IndexError(format!("Index '{}' has {} fields but {} values were provided", payload.index, index_fields.len(), payload.values.len())))?;
            let index_field_type = index_field.r#type;

            let value = match index_field_type {
                TitoIndexBlockType::String | TitoIndexBlockType::Custom(_) => safe_encode(value),
                TitoIndexBlockType::Number => format!("{:0>10}", value),
            };

            let field_name = index_field.name.clone();
            let key_part = format!("{}:{}", field_name, value);
            key_from_values.push(key_part);
        }

        let key_from_value = key_from_values.join(":");

        let id = format!("index:{}:{}:", payload.index, key_from_value);

        let (items, has_more) = self
            .scan_reverse(
                TitoScanPayload {
                    start: id,
                    end: None,
                    limit: payload.limit,
                    cursor: payload.cursor.clone(),
                },
                tx,
            )
            .await?;

        let items = self
            .fetch_and_stitch_relationships(items, payload.rels, tx)
            .await?;

        Ok((items, has_more))
    }

    async fn find_by_index_with_tx(
        &self,
        payload: TitoFindByIndexPayload,
        tx: &E::Transaction,
    ) -> Result<TitoPaginated<T>, TitoError>
    where
        T: serde::de::DeserializeOwned,
    {
        let (items, has_more) = self
            .find_by_index_raw(payload, tx)
            .await?;

        let results = self.to_paginated_items(items, has_more)?;

        Ok(results)
    }

    pub async fn find_by_index(
        &self,
        payload: TitoFindByIndexPayload,
        tx: Option<&E::Transaction>,
    ) -> Result<TitoPaginated<T>, TitoError>
    where
        T: serde::de::DeserializeOwned,
    {
        match tx {
            Some(tx) => self.find_by_index_with_tx(payload, tx).await,
            None => {
                self.tx(|tx| {
                    let payload = payload.clone();
                    async move { self.find_by_index_with_tx(payload, &tx).await }
                })
                .await
            }
        }
    }

    async fn find_by_index_reverse_with_tx(
        &self,
        payload: TitoFindByIndexPayload,
        tx: &E::Transaction,
    ) -> Result<TitoPaginated<T>, TitoError>
    where
        T: serde::de::DeserializeOwned,
    {
        let (items, has_more) = self.find_by_index_reverse_raw(payload, tx).await?;

        let results = self.to_paginated_items(items, has_more)?;

        Ok(results)
    }

    pub async fn find_by_index_reverse(
        &self,
        payload: TitoFindByIndexPayload,
        tx: Option<&E::Transaction>,
    ) -> Result<TitoPaginated<T>, TitoError>
    where
        T: serde::de::DeserializeOwned,
    {
        match tx {
            Some(tx) => self.find_by_index_reverse_with_tx(payload, tx).await,
            None => {
                self.tx(|tx| {
                    let payload = payload.clone();
                    async move { self.find_by_index_reverse_with_tx(payload, &tx).await }
                })
                .await
            }
        }
    }

    async fn find_one_by_index_with_tx(
        &self,
        payload: TitoFindOneByIndexPayload,
        tx: &E::Transaction,
    ) -> Result<T, TitoError>
    where
        T: serde::de::DeserializeOwned,
    {
        let (items, _) = self
            .find_by_index_raw(
                TitoFindByIndexPayload {
                    index: payload.index.clone(),
                    values: payload.values.clone(),
                    rels: payload.rels.clone(),
                    limit: Some(1),
                    cursor: None,
                },
                tx,
            )
            .await?;

        if let Some(value) = items.get(0) {
            match serde_json::from_value::<T>(value.1.clone()) {
                Ok(item) => Ok(item),
                Err(e) => Err(TitoError::NotFound(format!(
                    "Failed to deserialize record for index '{}' with values {:?}: {}",
                    payload.index, payload.values, e
                ))),
            }
        } else {
            Err(TitoError::NotFound(format!(
                "No record found for index '{}' with values {:?}",
                payload.index, payload.values
            )))
        }
    }

    pub async fn find_one_by_index(
        &self,
        payload: TitoFindOneByIndexPayload,
        tx: Option<&E::Transaction>,
    ) -> Result<T, TitoError>
    where
        T: serde::de::DeserializeOwned,
    {
        match tx {
            Some(tx) => self.find_one_by_index_with_tx(payload, tx).await,
            None => {
                self.tx(|tx| {
                    let payload = payload.clone();
                    async move { self.find_one_by_index_with_tx(payload, &tx).await }
                })
                .await
            }
        }
    }

}