ydb_unofficial/sqlx/
executor.rs

1use futures::StreamExt;
2use futures::future::FutureExt;
3use sqlx_core::describe::Describe;
4use sqlx_core::executor::{Executor, Execute};
5use sqlx_core::Either;
6use tonic::codegen::futures_core::{future::BoxFuture, stream::BoxStream};
7use ydb_grpc_bindings::generated::ydb;
8use ydb::status_ids::StatusCode;
9use ydb::r#type::PrimitiveTypeId;
10use ydb::table::{ExecuteDataQueryRequest, ExplainDataQueryRequest, PrepareDataQueryRequest, PrepareQueryResult};
11use ydb_grpc_bindings::generated::ydb::table::ExecuteSchemeQueryRequest;
12
13use crate::client::TableClientWithSession;
14use crate::{YdbResponseWithResult, YdbTransaction};
15use crate::error::YdbError;
16use crate::auth::UpdatableToken;
17
18use super::prelude::*;
19
20#[derive(Debug)]
21pub struct YdbExecutor<'c> {
22    pub retry: bool, 
23    pub inner: YdbTransaction<'c, UpdatableToken>,
24    pub log_options: LogOptions,
25}
26
27#[derive(Debug)]
28pub struct YdbSchemeExecutor<'c> {
29    pub inner: TableClientWithSession<'c, UpdatableToken>,
30    pub log_options: LogOptions,
31} 
32
33fn make_grpc_request<'e>(mut query: impl Execute<'e, Ydb>) -> ExecuteDataQueryRequest {
34    let parameters = query.take_arguments().map(|a|a.0).unwrap_or_default();
35    let query = if let Some(statement) = query.statement() {
36        Some(crate::generated::ydb::table::query::Query::Id(statement.query_id().to_owned()))
37    } else {
38        Some(crate::generated::ydb::table::query::Query::YqlText(query.sql().to_owned()))
39    };
40    let query = Some(crate::generated::ydb::table::Query{query});
41    ExecuteDataQueryRequest{ query, parameters, ..Default::default()}
42}
43
44impl<'c> YdbExecutor<'c> {
45    /// configure executor to handle expired session error. In this case executor updates the session, then retries query
46    pub fn retry(mut self) -> Self {
47        self.retry = true;
48        self
49    }
50    pub async fn send(&mut self, req: ExecuteDataQueryRequest) -> Result<YdbQueryResult, YdbError> {
51        let log_msg = format!("Running sql: {:?}", req.query);
52        let fut = self.inner.execute_data_query(req);
53        let response = self.log_options.wrap(&log_msg, fut).await?;
54        let result = response.into_inner().result().map_err(YdbError::from)?;
55        Ok(result.into())
56    }
57}
58
59impl<'c> Executor<'c> for YdbExecutor<'c> {
60    type Database = Ydb;
61
62    fn execute<'e, 'q: 'e, E: 'q>(mut self, query: E,) -> BoxFuture<'e, Result<YdbQueryResult, sqlx_core::Error>>
63    where 'c: 'e, E: Execute<'q, Self::Database> {
64        let req = make_grpc_request(query);
65        Box::pin(async move {
66            if self.retry {
67                let result = self.send(req.clone()).await;
68                match &result {
69                    Err(YdbError::Ydb(ErrWithOperation(op))) if op.status() == StatusCode::BadSession => {
70                        self.inner.table_client().update_session().await?;
71                        self.send(req).await
72                    }
73                    _ => result
74                }
75            } else {
76                self.send(req).await
77            }.map_err(Into::into)
78        })
79    }
80
81    fn fetch_many<'e, 'q: 'e, E: 'q>(
82        self,
83        query: E,
84    ) -> BoxStream<'e, Result<Either<YdbQueryResult, YdbRow>,sqlx_core::Error>>
85    where 'c: 'e, E: Execute<'q, Ydb> { 
86        let stream = futures::stream::once(self.execute(query))
87        .map(|r| {
88            let mut err = Vec::with_capacity(1);
89            let v = match r {
90                Ok(v) => v.result_sets,
91                Err(e) => {
92                    err.push(Err(e));
93                    vec![]
94                },
95            };
96            let v = v.into_iter()
97            .map(|rs|rs.to_rows().into_iter()).flatten()
98            .map(|r|Ok(Either::Right(r)))
99            .chain(err);
100            futures::stream::iter(v)
101        }).flatten();
102
103        Box::pin(stream)
104        
105    }
106
107    fn fetch_optional<'e, 'q: 'e, E: 'q>(self, query: E) -> BoxFuture<'e, Result<Option<YdbRow>, sqlx_core::Error>>
108    where 'c: 'e, E: Execute<'q, Ydb> { Box::pin( async move {
109        let rows = self.fetch_all(query).await?;
110        Ok(rows.into_iter().next())
111    })}
112
113    fn prepare<'e, 'q: 'e>(mut self, sql: &'q str) -> BoxFuture<'e, Result<YdbStatement, sqlx_core::Error>>
114    where 'c: 'e {Box::pin(async move {
115        let yql_text = sql.to_owned();
116        let msg = format!("Prepare YQL statement: {}", sql);
117        let fut = self.inner.table_client().prepare_data_query(PrepareDataQueryRequest{yql_text, ..Default::default()});
118        let response = self.log_options.wrap(&msg, fut).await?;
119        let PrepareQueryResult {query_id, parameters_types} = response.into_inner().result().map_err(YdbError::from)?;
120        let parameters = parameters_types.into();
121        let yql = sql.to_owned();
122        Ok(YdbStatement {query_id, yql, parameters})
123    })}
124
125    fn fetch_all<'e, 'q: 'e, E: 'q>( self, query: E ) -> BoxFuture<'e, Result<Vec<YdbRow>, sqlx_core::Error>>
126    where 'c: 'e, E: Execute<'q, Self::Database> {Box::pin ( async move {
127        let result = self.execute(query).await?;
128        let rows = result.result_sets.into_iter().next().map(|rs|rs.to_rows()).unwrap_or_default();
129        Ok(rows)
130    })}
131
132    fn execute_many<'e, 'q: 'e, E: 'q>( self, query: E) -> BoxStream<'e, Result<YdbQueryResult, sqlx_core::Error>>
133    where 'c: 'e, E: Execute<'q, Self::Database> {
134        Box::pin(self.execute(query).into_stream())
135    }
136
137    fn fetch<'e, 'q: 'e, E: 'q>(self, query: E) -> BoxStream<'e, Result<YdbRow, sqlx_core::Error>>
138    where 'c: 'e, E: Execute<'q, Self::Database> {
139        let stream = futures::stream::once(self.fetch_all(query))
140        .map(|r| {
141            let mut err = Vec::with_capacity(1);
142            let v = match r {
143                Ok(v) => v,
144                Err(e) => {
145                    err.push(Err(e));
146                    vec![]
147                },
148            };
149            let v = v.into_iter().map(|i|Ok(i)).chain(err);
150            futures::stream::iter(v)
151        }).flatten();
152        Box::pin(stream)
153    }
154
155    fn fetch_one<'e, 'q: 'e, E: 'q>(self, query: E) -> BoxFuture<'e, Result<YdbRow, sqlx_core::Error>>
156    where 'c: 'e, E: Execute<'q, Self::Database> { Box::pin( async move {
157        let row = self.fetch_optional(query).await?;
158        row.ok_or(sqlx_core::Error::RowNotFound)
159    })}
160
161    fn prepare_with<'e, 'q: 'e>(self, sql: &'q str, _parameters: &'e [YdbTypeInfo]) -> BoxFuture<'e, Result<YdbStatement, sqlx_core::Error>>
162    where 'c: 'e { self.prepare(sql) }
163
164    //TODO: спрятать под фичу
165    fn describe<'e, 'q: 'e>(mut self, sql: &'q str) -> BoxFuture<'e, Result<Describe<Ydb>, sqlx_core::Error>>
166    where 'c: 'e { Box::pin( async move {
167        let response = self.inner.table_client().explain_data_query(ExplainDataQueryRequest{ yql_text: sql.to_owned(), ..Default::default() }).await?;
168        let result = response.into_inner().result().map_err(YdbError::from)?;
169        let (_, mut node) = super::minikql::Node::parse(&result.query_ast).map_err(|_|YdbError::DecodeAst)?;
170        node.eval();
171        let outputs = super::minikql::invoke_outputs(&node).unwrap_or_default();
172        let (columns, nullable) = outputs.into_iter().fold((vec![], vec![]), |(mut cols, mut nulls), (ordinal, name, typ, optional)|{
173            nulls.push(Some(optional));
174            let name = name.to_owned();
175            let type_info = if let Some(t) = PrimitiveTypeId::from_str_name(&typ.to_ascii_uppercase()) {
176                YdbTypeInfo::Primitive(t)
177            } else {
178                YdbTypeInfo::Unknown
179            };
180            cols.push(YdbColumn{ ordinal, name, type_info });
181            (cols, nulls)
182        });
183        //TODO: implement parameters invoking
184        let parameters = None;
185        Ok(Describe { columns, parameters, nullable })
186    })}
187}
188
189
190impl <'c> Executor<'c> for YdbSchemeExecutor<'c> {
191    type Database = Ydb;
192
193    fn execute<'e, 'q: 'e, E: 'q>(mut self, query: E,) -> BoxFuture<'e, Result<YdbQueryResult, sqlx_core::Error>>
194    where 'c: 'e, E: Execute<'q, Self::Database> {
195        let yql_text = query.sql().to_owned();
196        let msg = format!("Run YDB scheme statement: {yql_text}");
197        Box::pin(async move {
198            let fut = self.inner.execute_scheme_query(ExecuteSchemeQueryRequest{ yql_text, ..Default::default()});
199            self.log_options.wrap(&msg, fut).await?;
200            Ok(Default::default())
201        })
202    }
203    fn execute_many<'e, 'q: 'e, E: 'q>( self, query: E) -> BoxStream<'e, Result<YdbQueryResult, sqlx_core::Error>>
204    where 'c: 'e, E: Execute<'q, Self::Database> {
205        Box::pin(self.execute(query).into_stream())
206    }
207    fn fetch_many<'e, 'q: 'e, E: 'q>(self, _query: E,) -> BoxStream<'e, Result<Either<YdbQueryResult, YdbRow>, sqlx_core::Error>>
208    where 'c: 'e, E: Execute<'q, Self::Database> {
209        Box::pin(futures::future::err(only_execute_err()).into_stream())
210    }
211
212    fn fetch_optional<'e, 'q: 'e, E: 'q>(self, _query: E) -> BoxFuture<'e, Result<Option<YdbRow>, sqlx_core::Error>>
213    where 'c: 'e, E: Execute<'q, Self::Database> {
214        Box::pin(futures::future::err(only_execute_err()))
215    }
216
217    fn prepare_with<'e, 'q: 'e>( self, _sql: &'q str, _parameters: &'e [YdbTypeInfo]) -> BoxFuture<'e, Result<YdbStatement, sqlx_core::Error>>
218    where 'c: 'e {
219        Box::pin(futures::future::err(only_execute_err()))
220    }
221
222    fn describe<'e, 'q: 'e>(self, _sql: &'q str) -> BoxFuture<'e, Result<Describe<Ydb>, sqlx_core::Error>>
223    where 'c: 'e { 
224        Box::pin(futures::future::err(only_execute_err()))
225    }
226}
227
228fn only_execute_err() -> sqlx_core::Error {
229    sqlx_core::Error::AnyDriverError("Only execute method allowed in SchemeExecutor".to_owned().into())
230}