ydb 0.12.0

Crate contains generated low-level grpc code from YDB API protobuf, used as base for ydb crate
Documentation
use crate::errors;
use crate::errors::{YdbError, YdbResult, YdbStatusError};
use crate::grpc::proto_issues_to_ydb_issues;
use crate::grpc_wrapper::raw_table_service::execute_data_query::RawExecuteDataQueryResult;
use crate::grpc_wrapper::raw_table_service::value::{RawResultSet, RawTypedValue, RawValue};
use crate::trace_helpers::ensure_len_string;
use crate::types::Value;
use itertools::Itertools;
use std::collections::HashMap;
use std::sync::Arc;
use std::vec::IntoIter;
use tracing::trace;
use ydb_grpc::ydb_proto::status_ids::StatusCode;
use ydb_grpc::ydb_proto::table::ExecuteScanQueryPartialResponse;

#[derive(Debug)]
pub struct QueryResult {
    pub(crate) results: Vec<ResultSet>,
    pub(crate) tx_id: String,
}

impl QueryResult {
    pub(crate) fn from_raw_result(
        error_on_truncate: bool,
        raw_res: RawExecuteDataQueryResult,
    ) -> YdbResult<Self> {
        trace!(
            "raw_res: {}",
            ensure_len_string(serde_json::to_string(&raw_res)?)
        );
        let mut results = Vec::with_capacity(raw_res.result_sets.len());
        for current_set in raw_res.result_sets.into_iter() {
            if error_on_truncate && current_set.truncated {
                return Err(
                    format!("got truncated result. result set index: {}", results.len())
                        .as_str()
                        .into(),
                );
            }
            let result_set = ResultSet::try_from(current_set)?;

            results.push(result_set);
        }

        Ok(QueryResult {
            results,
            tx_id: raw_res.tx_meta.id,
        })
    }

    pub fn into_only_result(self) -> YdbResult<ResultSet> {
        let mut iter = self.results.into_iter();
        match iter.next() {
            Some(result_set) => {
                if iter.next().is_none() {
                    Ok(result_set)
                } else {
                    Err(YdbError::from_str("more then one result set"))
                }
            }
            None => Err(YdbError::from_str("no result set")),
        }
    }

    pub fn into_only_row(self) -> YdbResult<Row> {
        let result_set = self.into_only_result()?;
        let mut rows = result_set.rows();
        match rows.next() {
            Some(first_row) => {
                if rows.next().is_none() {
                    Ok(first_row)
                } else {
                    Err(YdbError::from_str("result set has more then one row"))
                }
            }
            None => Err(YdbError::NoRows),
        }
    }
}

#[derive(Debug)]
pub struct ResultSet {
    columns: Vec<crate::types::Column>,
    columns_by_name: HashMap<String, usize>,
    raw_result_set: RawResultSet,
}

impl ResultSet {
    #[allow(dead_code)]
    pub(crate) fn columns(&self) -> &Vec<crate::types::Column> {
        &self.columns
    }

    pub fn rows(self) -> ResultSetRowsIter {
        ResultSetRowsIter {
            columns: Arc::new(self.columns),
            columns_by_name: Arc::new(self.columns_by_name),
            row_iter: self.raw_result_set.rows.into_iter(),
        }
    }

    #[allow(dead_code)]
    pub(crate) fn truncated(&self) -> bool {
        self.raw_result_set.truncated
    }
}

impl TryFrom<RawResultSet> for ResultSet {
    type Error = YdbError;

    fn try_from(value: RawResultSet) -> Result<Self, Self::Error> {
        let columns_by_name: HashMap<String, usize> = value
            .columns
            .iter()
            .enumerate()
            .map(|(index, column)| (column.name.clone(), index))
            .collect();
        Ok(Self {
            columns: value
                .columns
                .iter()
                .map(|item| item.clone().try_into())
                .try_collect()?,
            columns_by_name,
            raw_result_set: value,
        })
    }
}

impl IntoIterator for ResultSet {
    type Item = Row;
    type IntoIter = ResultSetRowsIter;

    fn into_iter(self) -> Self::IntoIter {
        self.rows()
    }
}

#[derive(Debug)]
pub struct Row {
    columns: Arc<Vec<crate::types::Column>>,
    columns_by_name: Arc<HashMap<String, usize>>,
    raw_values: HashMap<usize, RawValue>,
}

impl Row {
    pub fn remove_field_by_name(&mut self, name: &str) -> errors::YdbResult<Value> {
        if let Some(&index) = self.columns_by_name.get(name) {
            return self.remove_field(index);
        }
        Err(YdbError::Custom("field not found".into()))
    }

    pub fn remove_field(&mut self, index: usize) -> errors::YdbResult<Value> {
        match self.raw_values.remove(&index) {
            Some(val) => Ok(Value::try_from(RawTypedValue {
                r#type: self.columns[index].v_type.clone(),
                value: val,
            })?),
            None => Err(YdbError::Custom("it has no the field".into())),
        }
    }
}

pub struct ResultSetRowsIter {
    columns: Arc<Vec<crate::types::Column>>,
    columns_by_name: Arc<HashMap<String, usize>>,
    row_iter: IntoIter<Vec<RawValue>>,
}

impl Iterator for ResultSetRowsIter {
    type Item = Row;

    fn next(&mut self) -> Option<Self::Item> {
        match self.row_iter.next() {
            None => None,
            Some(row) => Some(Row {
                columns: self.columns.clone(),
                columns_by_name: self.columns_by_name.clone(),
                raw_values: row.into_iter().enumerate().collect(),
            }),
        }
    }
}

pub struct StreamResult {
    pub(crate) results: tonic::codec::Streaming<ExecuteScanQueryPartialResponse>,
}

impl StreamResult {
    pub async fn next(&mut self) -> YdbResult<Option<ResultSet>> {
        let partial_response = if let Some(partial_response) = self.results.message().await? {
            partial_response
        } else {
            return Ok(None);
        };
        if partial_response.status() != StatusCode::Success {
            return Err(YdbError::YdbStatusError(YdbStatusError {
                message: format!("{:?}", partial_response.issues),
                operation_status: partial_response.status,
                issues: proto_issues_to_ydb_issues(partial_response.issues),
            }));
        };
        let proto_result_set = if let Some(partial_result) = partial_response.result {
            if let Some(proto_result_set) = partial_result.result_set {
                proto_result_set
            } else {
                return Ok(None);
            }
        } else {
            return Err(YdbError::InternalError("unexpected empty result".into()));
        };
        let raw_res = RawResultSet::try_from(proto_result_set)?;
        let result_set = ResultSet::try_from(raw_res)?;
        Ok(Some(result_set))
    }
}

#[derive(Debug)]
pub struct ExplainResult {
    pub query_ast: String,
    pub query_plan: String,
    pub query_full_diagnostics: String,
}

impl From<crate::grpc_wrapper::raw_table_service::explain_data_query::RawExplainDataQueryResult>
    for ExplainResult
{
    fn from(
        value: crate::grpc_wrapper::raw_table_service::explain_data_query::RawExplainDataQueryResult,
    ) -> Self {
        Self {
            query_ast: value.query_ast,
            query_plan: value.query_plan,
            query_full_diagnostics: value.query_full_diagnostics,
        }
    }
}