ydb/
result.rs

1use crate::errors;
2use crate::errors::{YdbError, YdbResult, YdbStatusError};
3use crate::grpc::proto_issues_to_ydb_issues;
4use crate::grpc_wrapper::raw_table_service::execute_data_query::RawExecuteDataQueryResult;
5use crate::grpc_wrapper::raw_table_service::value::{RawResultSet, RawTypedValue, RawValue};
6use crate::trace_helpers::ensure_len_string;
7use crate::types::Value;
8use itertools::Itertools;
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::vec::IntoIter;
12use tracing::trace;
13use ydb_grpc::ydb_proto::status_ids::StatusCode;
14use ydb_grpc::ydb_proto::table::ExecuteScanQueryPartialResponse;
15
16#[derive(Debug)]
17pub struct QueryResult {
18    pub(crate) results: Vec<ResultSet>,
19    pub(crate) tx_id: String,
20}
21
22impl QueryResult {
23    pub(crate) fn from_raw_result(
24        error_on_truncate: bool,
25        raw_res: RawExecuteDataQueryResult,
26    ) -> YdbResult<Self> {
27        trace!(
28            "raw_res: {}",
29            ensure_len_string(serde_json::to_string(&raw_res)?)
30        );
31        let mut results = Vec::with_capacity(raw_res.result_sets.len());
32        for current_set in raw_res.result_sets.into_iter() {
33            if error_on_truncate && current_set.truncated {
34                return Err(
35                    format!("got truncated result. result set index: {}", results.len())
36                        .as_str()
37                        .into(),
38                );
39            }
40            let result_set = ResultSet::try_from(current_set)?;
41
42            results.push(result_set);
43        }
44
45        Ok(QueryResult {
46            results,
47            tx_id: raw_res.tx_meta.id,
48        })
49    }
50
51    pub fn into_only_result(self) -> YdbResult<ResultSet> {
52        let mut iter = self.results.into_iter();
53        match iter.next() {
54            Some(result_set) => {
55                if iter.next().is_none() {
56                    Ok(result_set)
57                } else {
58                    Err(YdbError::from_str("more then one result set"))
59                }
60            }
61            None => Err(YdbError::from_str("no result set")),
62        }
63    }
64
65    pub fn into_only_row(self) -> YdbResult<Row> {
66        let result_set = self.into_only_result()?;
67        let mut rows = result_set.rows();
68        match rows.next() {
69            Some(first_row) => {
70                if rows.next().is_none() {
71                    Ok(first_row)
72                } else {
73                    Err(YdbError::from_str("result set has more then one row"))
74                }
75            }
76            None => Err(YdbError::NoRows),
77        }
78    }
79}
80
81#[derive(Debug)]
82pub struct ResultSet {
83    columns: Vec<crate::types::Column>,
84    columns_by_name: HashMap<String, usize>,
85    raw_result_set: RawResultSet,
86}
87
88impl ResultSet {
89    #[allow(dead_code)]
90    pub(crate) fn columns(&self) -> &Vec<crate::types::Column> {
91        &self.columns
92    }
93
94    pub fn rows(self) -> ResultSetRowsIter {
95        ResultSetRowsIter {
96            columns: Arc::new(self.columns),
97            columns_by_name: Arc::new(self.columns_by_name),
98            row_iter: self.raw_result_set.rows.into_iter(),
99        }
100    }
101
102    #[allow(dead_code)]
103    pub(crate) fn truncated(&self) -> bool {
104        self.raw_result_set.truncated
105    }
106}
107
108impl TryFrom<RawResultSet> for ResultSet {
109    type Error = YdbError;
110
111    fn try_from(value: RawResultSet) -> Result<Self, Self::Error> {
112        let columns_by_name: HashMap<String, usize> = value
113            .columns
114            .iter()
115            .enumerate()
116            .map(|(index, column)| (column.name.clone(), index))
117            .collect();
118        Ok(Self {
119            columns: value
120                .columns
121                .iter()
122                .map(|item| item.clone().try_into())
123                .try_collect()?,
124            columns_by_name,
125            raw_result_set: value,
126        })
127    }
128}
129
130impl IntoIterator for ResultSet {
131    type Item = Row;
132    type IntoIter = ResultSetRowsIter;
133
134    fn into_iter(self) -> Self::IntoIter {
135        self.rows()
136    }
137}
138
139#[derive(Debug)]
140pub struct Row {
141    columns: Arc<Vec<crate::types::Column>>,
142    columns_by_name: Arc<HashMap<String, usize>>,
143    raw_values: HashMap<usize, RawValue>,
144}
145
146impl Row {
147    pub fn remove_field_by_name(&mut self, name: &str) -> errors::YdbResult<Value> {
148        if let Some(&index) = self.columns_by_name.get(name) {
149            return self.remove_field(index);
150        }
151        Err(YdbError::Custom("field not found".into()))
152    }
153
154    pub fn remove_field(&mut self, index: usize) -> errors::YdbResult<Value> {
155        match self.raw_values.remove(&index) {
156            Some(val) => Ok(Value::try_from(RawTypedValue {
157                r#type: self.columns[index].v_type.clone(),
158                value: val,
159            })?),
160            None => Err(YdbError::Custom("it has no the field".into())),
161        }
162    }
163}
164
165pub struct ResultSetRowsIter {
166    columns: Arc<Vec<crate::types::Column>>,
167    columns_by_name: Arc<HashMap<String, usize>>,
168    row_iter: IntoIter<Vec<RawValue>>,
169}
170
171impl Iterator for ResultSetRowsIter {
172    type Item = Row;
173
174    fn next(&mut self) -> Option<Self::Item> {
175        match self.row_iter.next() {
176            None => None,
177            Some(row) => Some(Row {
178                columns: self.columns.clone(),
179                columns_by_name: self.columns_by_name.clone(),
180                raw_values: row.into_iter().enumerate().collect(),
181            }),
182        }
183    }
184}
185
186pub struct StreamResult {
187    pub(crate) results: tonic::codec::Streaming<ExecuteScanQueryPartialResponse>,
188}
189
190impl StreamResult {
191    pub async fn next(&mut self) -> YdbResult<Option<ResultSet>> {
192        let partial_response = if let Some(partial_response) = self.results.message().await? {
193            partial_response
194        } else {
195            return Ok(None);
196        };
197        if partial_response.status() != StatusCode::Success {
198            return Err(YdbError::YdbStatusError(YdbStatusError {
199                message: format!("{:?}", partial_response.issues),
200                operation_status: partial_response.status,
201                issues: proto_issues_to_ydb_issues(partial_response.issues),
202            }));
203        };
204        let proto_result_set = if let Some(partial_result) = partial_response.result {
205            if let Some(proto_result_set) = partial_result.result_set {
206                proto_result_set
207            } else {
208                return Ok(None);
209            }
210        } else {
211            return Err(YdbError::InternalError("unexpected empty result".into()));
212        };
213        let raw_res = RawResultSet::try_from(proto_result_set)?;
214        let result_set = ResultSet::try_from(raw_res)?;
215        Ok(Some(result_set))
216    }
217}