1use crate::errors;
2use crate::errors::{YdbError, YdbResult, YdbStatusError};
3use crate::grpc::proto_issues_to_ydb_issues;
4use crate::grpc_wrapper::raw_table_service::execute_data_query::RawExecuteDataQueryResult;
5use crate::grpc_wrapper::raw_table_service::value::{RawResultSet, RawTypedValue, RawValue};
6use crate::trace_helpers::ensure_len_string;
7use crate::types::Value;
8use itertools::Itertools;
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::vec::IntoIter;
12use tracing::trace;
13use ydb_grpc::ydb_proto::status_ids::StatusCode;
14use ydb_grpc::ydb_proto::table::ExecuteScanQueryPartialResponse;
15
16#[derive(Debug)]
17pub struct QueryResult {
18 pub(crate) results: Vec<ResultSet>,
19 pub(crate) tx_id: String,
20}
21
22impl QueryResult {
23 pub(crate) fn from_raw_result(
24 error_on_truncate: bool,
25 raw_res: RawExecuteDataQueryResult,
26 ) -> YdbResult<Self> {
27 trace!(
28 "raw_res: {}",
29 ensure_len_string(serde_json::to_string(&raw_res)?)
30 );
31 let mut results = Vec::with_capacity(raw_res.result_sets.len());
32 for current_set in raw_res.result_sets.into_iter() {
33 if error_on_truncate && current_set.truncated {
34 return Err(
35 format!("got truncated result. result set index: {}", results.len())
36 .as_str()
37 .into(),
38 );
39 }
40 let result_set = ResultSet::try_from(current_set)?;
41
42 results.push(result_set);
43 }
44
45 Ok(QueryResult {
46 results,
47 tx_id: raw_res.tx_meta.id,
48 })
49 }
50
51 pub fn into_only_result(self) -> YdbResult<ResultSet> {
52 let mut iter = self.results.into_iter();
53 match iter.next() {
54 Some(result_set) => {
55 if iter.next().is_none() {
56 Ok(result_set)
57 } else {
58 Err(YdbError::from_str("more then one result set"))
59 }
60 }
61 None => Err(YdbError::from_str("no result set")),
62 }
63 }
64
65 pub fn into_only_row(self) -> YdbResult<Row> {
66 let result_set = self.into_only_result()?;
67 let mut rows = result_set.rows();
68 match rows.next() {
69 Some(first_row) => {
70 if rows.next().is_none() {
71 Ok(first_row)
72 } else {
73 Err(YdbError::from_str("result set has more then one row"))
74 }
75 }
76 None => Err(YdbError::NoRows),
77 }
78 }
79}
80
81#[derive(Debug)]
82pub struct ResultSet {
83 columns: Vec<crate::types::Column>,
84 columns_by_name: HashMap<String, usize>,
85 raw_result_set: RawResultSet,
86}
87
88impl ResultSet {
89 #[allow(dead_code)]
90 pub(crate) fn columns(&self) -> &Vec<crate::types::Column> {
91 &self.columns
92 }
93
94 pub fn rows(self) -> ResultSetRowsIter {
95 ResultSetRowsIter {
96 columns: Arc::new(self.columns),
97 columns_by_name: Arc::new(self.columns_by_name),
98 row_iter: self.raw_result_set.rows.into_iter(),
99 }
100 }
101
102 #[allow(dead_code)]
103 pub(crate) fn truncated(&self) -> bool {
104 self.raw_result_set.truncated
105 }
106}
107
108impl TryFrom<RawResultSet> for ResultSet {
109 type Error = YdbError;
110
111 fn try_from(value: RawResultSet) -> Result<Self, Self::Error> {
112 let columns_by_name: HashMap<String, usize> = value
113 .columns
114 .iter()
115 .enumerate()
116 .map(|(index, column)| (column.name.clone(), index))
117 .collect();
118 Ok(Self {
119 columns: value
120 .columns
121 .iter()
122 .map(|item| item.clone().try_into())
123 .try_collect()?,
124 columns_by_name,
125 raw_result_set: value,
126 })
127 }
128}
129
130impl IntoIterator for ResultSet {
131 type Item = Row;
132 type IntoIter = ResultSetRowsIter;
133
134 fn into_iter(self) -> Self::IntoIter {
135 self.rows()
136 }
137}
138
139#[derive(Debug)]
140pub struct Row {
141 columns: Arc<Vec<crate::types::Column>>,
142 columns_by_name: Arc<HashMap<String, usize>>,
143 raw_values: HashMap<usize, RawValue>,
144}
145
146impl Row {
147 pub fn remove_field_by_name(&mut self, name: &str) -> errors::YdbResult<Value> {
148 if let Some(&index) = self.columns_by_name.get(name) {
149 return self.remove_field(index);
150 }
151 Err(YdbError::Custom("field not found".into()))
152 }
153
154 pub fn remove_field(&mut self, index: usize) -> errors::YdbResult<Value> {
155 match self.raw_values.remove(&index) {
156 Some(val) => Ok(Value::try_from(RawTypedValue {
157 r#type: self.columns[index].v_type.clone(),
158 value: val,
159 })?),
160 None => Err(YdbError::Custom("it has no the field".into())),
161 }
162 }
163}
164
165pub struct ResultSetRowsIter {
166 columns: Arc<Vec<crate::types::Column>>,
167 columns_by_name: Arc<HashMap<String, usize>>,
168 row_iter: IntoIter<Vec<RawValue>>,
169}
170
171impl Iterator for ResultSetRowsIter {
172 type Item = Row;
173
174 fn next(&mut self) -> Option<Self::Item> {
175 match self.row_iter.next() {
176 None => None,
177 Some(row) => Some(Row {
178 columns: self.columns.clone(),
179 columns_by_name: self.columns_by_name.clone(),
180 raw_values: row.into_iter().enumerate().collect(),
181 }),
182 }
183 }
184}
185
186pub struct StreamResult {
187 pub(crate) results: tonic::codec::Streaming<ExecuteScanQueryPartialResponse>,
188}
189
190impl StreamResult {
191 pub async fn next(&mut self) -> YdbResult<Option<ResultSet>> {
192 let partial_response = if let Some(partial_response) = self.results.message().await? {
193 partial_response
194 } else {
195 return Ok(None);
196 };
197 if partial_response.status() != StatusCode::Success {
198 return Err(YdbError::YdbStatusError(YdbStatusError {
199 message: format!("{:?}", partial_response.issues),
200 operation_status: partial_response.status,
201 issues: proto_issues_to_ydb_issues(partial_response.issues),
202 }));
203 };
204 let proto_result_set = if let Some(partial_result) = partial_response.result {
205 if let Some(proto_result_set) = partial_result.result_set {
206 proto_result_set
207 } else {
208 return Ok(None);
209 }
210 } else {
211 return Err(YdbError::InternalError("unexpected empty result".into()));
212 };
213 let raw_res = RawResultSet::try_from(proto_result_set)?;
214 let result_set = ResultSet::try_from(raw_res)?;
215 Ok(Some(result_set))
216 }
217}