1use futures::StreamExt;
2use futures::future::FutureExt;
3use sqlx_core::describe::Describe;
4use sqlx_core::executor::{Executor, Execute};
5use sqlx_core::Either;
6use tonic::codegen::futures_core::{future::BoxFuture, stream::BoxStream};
7use ydb_grpc_bindings::generated::ydb;
8use ydb::status_ids::StatusCode;
9use ydb::r#type::PrimitiveTypeId;
10use ydb::table::{ExecuteDataQueryRequest, ExplainDataQueryRequest, PrepareDataQueryRequest, PrepareQueryResult};
11use ydb_grpc_bindings::generated::ydb::table::ExecuteSchemeQueryRequest;
12
13use crate::client::TableClientWithSession;
14use crate::{YdbResponseWithResult, YdbTransaction};
15use crate::error::YdbError;
16use crate::auth::UpdatableToken;
17
18use super::prelude::*;
19
20#[derive(Debug)]
21pub struct YdbExecutor<'c> {
22 pub retry: bool,
23 pub inner: YdbTransaction<'c, UpdatableToken>,
24 pub log_options: LogOptions,
25}
26
27#[derive(Debug)]
28pub struct YdbSchemeExecutor<'c> {
29 pub inner: TableClientWithSession<'c, UpdatableToken>,
30 pub log_options: LogOptions,
31}
32
33fn make_grpc_request<'e>(mut query: impl Execute<'e, Ydb>) -> ExecuteDataQueryRequest {
34 let parameters = query.take_arguments().map(|a|a.0).unwrap_or_default();
35 let query = if let Some(statement) = query.statement() {
36 Some(crate::generated::ydb::table::query::Query::Id(statement.query_id().to_owned()))
37 } else {
38 Some(crate::generated::ydb::table::query::Query::YqlText(query.sql().to_owned()))
39 };
40 let query = Some(crate::generated::ydb::table::Query{query});
41 ExecuteDataQueryRequest{ query, parameters, ..Default::default()}
42}
43
44impl<'c> YdbExecutor<'c> {
45 pub fn retry(mut self) -> Self {
47 self.retry = true;
48 self
49 }
50 pub async fn send(&mut self, req: ExecuteDataQueryRequest) -> Result<YdbQueryResult, YdbError> {
51 let log_msg = format!("Running sql: {:?}", req.query);
52 let fut = self.inner.execute_data_query(req);
53 let response = self.log_options.wrap(&log_msg, fut).await?;
54 let result = response.into_inner().result().map_err(YdbError::from)?;
55 Ok(result.into())
56 }
57}
58
59impl<'c> Executor<'c> for YdbExecutor<'c> {
60 type Database = Ydb;
61
62 fn execute<'e, 'q: 'e, E: 'q>(mut self, query: E,) -> BoxFuture<'e, Result<YdbQueryResult, sqlx_core::Error>>
63 where 'c: 'e, E: Execute<'q, Self::Database> {
64 let req = make_grpc_request(query);
65 Box::pin(async move {
66 if self.retry {
67 let result = self.send(req.clone()).await;
68 match &result {
69 Err(YdbError::Ydb(ErrWithOperation(op))) if op.status() == StatusCode::BadSession => {
70 self.inner.table_client().update_session().await?;
71 self.send(req).await
72 }
73 _ => result
74 }
75 } else {
76 self.send(req).await
77 }.map_err(Into::into)
78 })
79 }
80
81 fn fetch_many<'e, 'q: 'e, E: 'q>(
82 self,
83 query: E,
84 ) -> BoxStream<'e, Result<Either<YdbQueryResult, YdbRow>,sqlx_core::Error>>
85 where 'c: 'e, E: Execute<'q, Ydb> {
86 let stream = futures::stream::once(self.execute(query))
87 .map(|r| {
88 let mut err = Vec::with_capacity(1);
89 let v = match r {
90 Ok(v) => v.result_sets,
91 Err(e) => {
92 err.push(Err(e));
93 vec![]
94 },
95 };
96 let v = v.into_iter()
97 .map(|rs|rs.to_rows().into_iter()).flatten()
98 .map(|r|Ok(Either::Right(r)))
99 .chain(err);
100 futures::stream::iter(v)
101 }).flatten();
102
103 Box::pin(stream)
104
105 }
106
107 fn fetch_optional<'e, 'q: 'e, E: 'q>(self, query: E) -> BoxFuture<'e, Result<Option<YdbRow>, sqlx_core::Error>>
108 where 'c: 'e, E: Execute<'q, Ydb> { Box::pin( async move {
109 let rows = self.fetch_all(query).await?;
110 Ok(rows.into_iter().next())
111 })}
112
113 fn prepare<'e, 'q: 'e>(mut self, sql: &'q str) -> BoxFuture<'e, Result<YdbStatement, sqlx_core::Error>>
114 where 'c: 'e {Box::pin(async move {
115 let yql_text = sql.to_owned();
116 let msg = format!("Prepare YQL statement: {}", sql);
117 let fut = self.inner.table_client().prepare_data_query(PrepareDataQueryRequest{yql_text, ..Default::default()});
118 let response = self.log_options.wrap(&msg, fut).await?;
119 let PrepareQueryResult {query_id, parameters_types} = response.into_inner().result().map_err(YdbError::from)?;
120 let parameters = parameters_types.into();
121 let yql = sql.to_owned();
122 Ok(YdbStatement {query_id, yql, parameters})
123 })}
124
125 fn fetch_all<'e, 'q: 'e, E: 'q>( self, query: E ) -> BoxFuture<'e, Result<Vec<YdbRow>, sqlx_core::Error>>
126 where 'c: 'e, E: Execute<'q, Self::Database> {Box::pin ( async move {
127 let result = self.execute(query).await?;
128 let rows = result.result_sets.into_iter().next().map(|rs|rs.to_rows()).unwrap_or_default();
129 Ok(rows)
130 })}
131
132 fn execute_many<'e, 'q: 'e, E: 'q>( self, query: E) -> BoxStream<'e, Result<YdbQueryResult, sqlx_core::Error>>
133 where 'c: 'e, E: Execute<'q, Self::Database> {
134 Box::pin(self.execute(query).into_stream())
135 }
136
137 fn fetch<'e, 'q: 'e, E: 'q>(self, query: E) -> BoxStream<'e, Result<YdbRow, sqlx_core::Error>>
138 where 'c: 'e, E: Execute<'q, Self::Database> {
139 let stream = futures::stream::once(self.fetch_all(query))
140 .map(|r| {
141 let mut err = Vec::with_capacity(1);
142 let v = match r {
143 Ok(v) => v,
144 Err(e) => {
145 err.push(Err(e));
146 vec![]
147 },
148 };
149 let v = v.into_iter().map(|i|Ok(i)).chain(err);
150 futures::stream::iter(v)
151 }).flatten();
152 Box::pin(stream)
153 }
154
155 fn fetch_one<'e, 'q: 'e, E: 'q>(self, query: E) -> BoxFuture<'e, Result<YdbRow, sqlx_core::Error>>
156 where 'c: 'e, E: Execute<'q, Self::Database> { Box::pin( async move {
157 let row = self.fetch_optional(query).await?;
158 row.ok_or(sqlx_core::Error::RowNotFound)
159 })}
160
161 fn prepare_with<'e, 'q: 'e>(self, sql: &'q str, _parameters: &'e [YdbTypeInfo]) -> BoxFuture<'e, Result<YdbStatement, sqlx_core::Error>>
162 where 'c: 'e { self.prepare(sql) }
163
164 fn describe<'e, 'q: 'e>(mut self, sql: &'q str) -> BoxFuture<'e, Result<Describe<Ydb>, sqlx_core::Error>>
166 where 'c: 'e { Box::pin( async move {
167 let response = self.inner.table_client().explain_data_query(ExplainDataQueryRequest{ yql_text: sql.to_owned(), ..Default::default() }).await?;
168 let result = response.into_inner().result().map_err(YdbError::from)?;
169 let (_, mut node) = super::minikql::Node::parse(&result.query_ast).map_err(|_|YdbError::DecodeAst)?;
170 node.eval();
171 let outputs = super::minikql::invoke_outputs(&node).unwrap_or_default();
172 let (columns, nullable) = outputs.into_iter().fold((vec![], vec![]), |(mut cols, mut nulls), (ordinal, name, typ, optional)|{
173 nulls.push(Some(optional));
174 let name = name.to_owned();
175 let type_info = if let Some(t) = PrimitiveTypeId::from_str_name(&typ.to_ascii_uppercase()) {
176 YdbTypeInfo::Primitive(t)
177 } else {
178 YdbTypeInfo::Unknown
179 };
180 cols.push(YdbColumn{ ordinal, name, type_info });
181 (cols, nulls)
182 });
183 let parameters = None;
185 Ok(Describe { columns, parameters, nullable })
186 })}
187}
188
189
190impl <'c> Executor<'c> for YdbSchemeExecutor<'c> {
191 type Database = Ydb;
192
193 fn execute<'e, 'q: 'e, E: 'q>(mut self, query: E,) -> BoxFuture<'e, Result<YdbQueryResult, sqlx_core::Error>>
194 where 'c: 'e, E: Execute<'q, Self::Database> {
195 let yql_text = query.sql().to_owned();
196 let msg = format!("Run YDB scheme statement: {yql_text}");
197 Box::pin(async move {
198 let fut = self.inner.execute_scheme_query(ExecuteSchemeQueryRequest{ yql_text, ..Default::default()});
199 self.log_options.wrap(&msg, fut).await?;
200 Ok(Default::default())
201 })
202 }
203 fn execute_many<'e, 'q: 'e, E: 'q>( self, query: E) -> BoxStream<'e, Result<YdbQueryResult, sqlx_core::Error>>
204 where 'c: 'e, E: Execute<'q, Self::Database> {
205 Box::pin(self.execute(query).into_stream())
206 }
207 fn fetch_many<'e, 'q: 'e, E: 'q>(self, _query: E,) -> BoxStream<'e, Result<Either<YdbQueryResult, YdbRow>, sqlx_core::Error>>
208 where 'c: 'e, E: Execute<'q, Self::Database> {
209 Box::pin(futures::future::err(only_execute_err()).into_stream())
210 }
211
212 fn fetch_optional<'e, 'q: 'e, E: 'q>(self, _query: E) -> BoxFuture<'e, Result<Option<YdbRow>, sqlx_core::Error>>
213 where 'c: 'e, E: Execute<'q, Self::Database> {
214 Box::pin(futures::future::err(only_execute_err()))
215 }
216
217 fn prepare_with<'e, 'q: 'e>( self, _sql: &'q str, _parameters: &'e [YdbTypeInfo]) -> BoxFuture<'e, Result<YdbStatement, sqlx_core::Error>>
218 where 'c: 'e {
219 Box::pin(futures::future::err(only_execute_err()))
220 }
221
222 fn describe<'e, 'q: 'e>(self, _sql: &'q str) -> BoxFuture<'e, Result<Describe<Ydb>, sqlx_core::Error>>
223 where 'c: 'e {
224 Box::pin(futures::future::err(only_execute_err()))
225 }
226}
227
228fn only_execute_err() -> sqlx_core::Error {
229 sqlx_core::Error::AnyDriverError("Only execute method allowed in SchemeExecutor".to_owned().into())
230}