ydb_unofficial/
client.rs

1//! Common Ydb client, that wraps GRPC(s) transport with needed headers
2//! 
3//! # Examples
4//! ``` rust
5//! # #[tokio::main]
6//! # async fn main() {
7//!     let url = std::env::var("YDB_URL").expect("YDB_URL not set");
8//!     let db_name = std::env::var("DB_NAME").expect("DB_NAME not set");
9//!     let creds = std::env::var("DB_TOKEN").expect("DB_TOKEN not set");
10//! 
11//!     // how you can create service
12//!     let endpoint = ydb_unofficial::client::create_endpoint(url.try_into().unwrap());
13//!     let channel = endpoint.connect().await.unwrap();
14//!     use ydb_unofficial::YdbConnection;
15//!     let mut service = YdbConnection::new(channel, db_name.as_str().try_into().unwrap(), creds);
16//! 
17//!     // how to use it, e.g. use discovery service:
18//!     use ydb_unofficial::generated::ydb::discovery::ListEndpointsRequest;
19//!     let endpoints_response = service.discovery().list_endpoints(
20//!         ListEndpointsRequest{
21//!             database: db_name.into(), 
22//!             ..Default::default()
23//!         }
24//!     ).await.unwrap();
25//!     
26//!     // how you can parse response to invoke result with YdbResponseWithResult trait
27//!     use ydb_unofficial::YdbResponseWithResult;
28//!     let endpoints_result = endpoints_response.get_ref().result().unwrap();
29//!     assert!(endpoints_result.endpoints.len() > 0);
30//! 
31//!     // now to use table operations
32//!     use ydb_unofficial::generated::ydb::table;
33//!     
34//! # }
35//! ```
36use super::*;
37use std::sync::{Arc, RwLock};
38use error::YdbError;
39use auth::Credentials;
40
41use table::*;
42
43use tonic::codegen::InterceptedService;
44use tonic::service::Interceptor;
45use tonic::transport::{Endpoint, Channel, Uri};
46
47use payload::YdbResponseWithResult;
48use generated::ydb::discovery::v1::discovery_service_client::DiscoveryServiceClient;
49use generated::ydb::table::transaction_control::TxSelector;
50use generated::ydb::table::{TransactionSettings, ExecuteDataQueryRequest, TransactionControl, self, CreateSessionRequest, DeleteSessionRequest};
51use generated::ydb::table::transaction_settings::TxMode;
52use generated::ydb::table::v1::table_service_client::TableServiceClient;
53use tower::Service;
54
/// Address of a single Ydb node, split into the parts needed to build a
/// transport [`Endpoint`].
#[derive(Clone, Debug)]
pub struct YdbEndpoint {
    /// `true` selects the `grpcs` scheme (TLS), `false` selects plain `grpc`.
    pub ssl: bool,
    /// Host part of the address.
    pub host: String,
    /// Port part of the address.
    pub port: u16,
    /// Load factor attached to the endpoint; nothing in this file reads it,
    /// and conversion from an `Uri` sets it to `0.0`.
    pub load_factor: f32,
}
62
63impl YdbEndpoint {
64    pub fn scheme(&self) -> &'static str {
65        if self.ssl { "grpcs" } else { "grpc" }
66    }
67    pub fn make_endpoint(&self) -> Endpoint {
68        let uri: tonic::transport::Uri = format!("{}://{}:{}", self.scheme(), self.host, self.port).try_into().unwrap();
69        let mut e = Endpoint::from(uri).tcp_keepalive(Some(std::time::Duration::from_secs(15)));
70        if self.ssl {
71            e = e.tls_config(Default::default()).unwrap()
72        }
73        e
74    }
75}
76
77impl TryFrom<Uri> for YdbEndpoint {
78    type Error = String;
79
80    fn try_from(value: Uri) -> Result<Self, Self::Error> {
81        //TODO: убрать дублирование
82        let ssl = match value.scheme_str() {
83            Some("grpc") => false,
84            Some("grpcs") => true,
85            _ => return Err("Unknown protocol".to_owned()),
86        };
87        let host = value.host().ok_or("no host")?.to_owned();
88        let port = value.port_u16().ok_or("no port")?;
89        let load_factor = 0.0;
90        Ok(Self {ssl, host, port, load_factor})
91    }
92}
93
94
95/// Creates endpoint from uri
96/// If protocol is `grpcs`, then creates [`tonic::transport::ClientTlsConfig`] and applies to [`Endpoint`]
97///
98/// # Arguments
99///
100/// * `uri` - An [`Uri`] of endpoint
101///
102/// # Examples
103///
104/// ```
105/// use ydb_unofficial::client;
106/// let url = "grpcs://ydb.serverless.yandexcloud.net:2135";
107/// let enpoint = client::create_endpoint(url.try_into().unwrap());
108/// ```
109pub fn create_endpoint(uri: Uri) -> Endpoint {
110    let mut res = Endpoint::from(uri);
111    if matches!(res.uri().scheme_str(), Some("grpcs")) {
112        res = res.tls_config(tonic::transport::ClientTlsConfig::new()).unwrap()
113    };
114    res.tcp_keepalive(Some(std::time::Duration::from_secs(15)))
115}
116
117
118//#[allow(non_upper_case_globals)]
119#[ctor::ctor]
120static BUILD_INFO: AsciiValue = concat!("ydb-unofficial/", env!("CARGO_PKG_VERSION")).try_into().unwrap();
121
122#[derive(Clone, Debug)]
123struct DBInterceptor<C: Clone> {
124    db_name: AsciiValue,
125    creds: C
126}
127
128impl<C: Credentials> Interceptor for DBInterceptor<C> {
129    fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
130        let headers = request.metadata_mut();
131        headers.insert("x-ydb-database", self.db_name.clone());
132        headers.insert("x-ydb-sdk-build-info", BUILD_INFO.clone());
133        headers.insert("x-ydb-auth-ticket", self.creds.token());
134        Ok(request)    
135    }
136}
137
138/// Ydb connection implementation, that pass database name and auth data to grpc channel
139#[derive(Debug)]
140pub struct YdbConnection<C: Credentials> {
141    inner: InterceptedService<Channel, DBInterceptor<C>>,
142    session_id: Arc<RwLock<Option<String>>>,
143}
144
145
146impl<C: Credentials> Service<tonic::codegen::http::Request<tonic::body::BoxBody>> for YdbConnection<C> {
147    type Response = tonic::codegen::http::Response<tonic::body::BoxBody>;
148
149    type Error = tonic::transport::Error;
150
151    type Future = tonic::service::interceptor::ResponseFuture<tonic::transport::channel::ResponseFuture>;
152
153    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
154        self.inner.poll_ready(cx)
155    }
156
157    fn call(&mut self, request: tonic::codegen::http::Request<tonic::body::BoxBody>) -> Self::Future {
158        self.inner.call(request)
159    }
160}
161
162impl YdbConnection<String> {
163    /// Creates connection from environment.
164    /// It uses following env variables
165    /// * YDB_URL - grpc-url to database host
166    /// * DB_NAME - name of database connect to
167    /// * DB_TOKEN - temporary token to access to database
168    pub fn from_env() -> Self {
169        use std::env::var;
170        let url = var("YDB_URL").expect("YDB_URL not set");
171        let db_name = var("DB_NAME").expect("DB_NAME not set");
172        let creds = var("DB_TOKEN").expect("DB_TOKEN not set");
173    
174        let endpoint = create_endpoint(url.try_into().unwrap());
175        let channel = endpoint.connect_lazy();
176        YdbConnection::new(channel, db_name.as_str().try_into().unwrap(), creds)
177    }
178}
179
180impl<C: Credentials> YdbConnection<C> {
181    /// YdbConnection constructor
182    /// 
183    /// # Arguments
184    /// * `channel` - transport channel to database (can be make from [`Endpoint`])
185    /// * `db_name` - database name (you can get it from yandex cloud for example) in [`AsciiValue`] format
186    /// * `creds` - some object, that implements [`Credentials`] (e.g. [`String`])
187    /// 
188    /// # Examples
189    /// See [`self`]
190    pub fn new(channel: Channel, db_name: AsciiValue, creds: C) -> Self {
191        let interceptor = DBInterceptor {db_name, creds};
192        let inner = tower::ServiceBuilder::new()
193            .layer(tonic::service::interceptor(interceptor))
194            .service(channel);
195        YdbConnection{inner, session_id: Arc::new(RwLock::new(None))}
196    }
197    /// Creates discovery service client
198    /// 
199    /// # Examples
200    /// ```rust
201    /// # #[tokio::main] 
202    /// # async fn main() {
203    ///     let mut conn = ydb_unofficial::YdbConnection::from_env();
204    ///     let db_name = std::env::var("DB_NAME").unwrap();
205    ///     use ydb_unofficial::generated::ydb::discovery::ListEndpointsRequest;
206    ///     let endpoints_response = conn.discovery().list_endpoints(
207    ///         ListEndpointsRequest{
208    ///             database: db_name.into(), 
209    ///             ..Default::default()
210    ///         }
211    ///     ).await.unwrap();
212    ///     // how you can parse response to invoke result with YdbResponseWithResult trait
213    ///     use ydb_unofficial::YdbResponseWithResult;
214    ///     let endpoints_result = endpoints_response.get_ref().result().unwrap();
215    ///     assert!(endpoints_result.endpoints.len() > 0);
216    /// # }
217    /// ```
218    pub fn discovery(&mut self) -> DiscoveryServiceClient<&mut Self> {
219        DiscoveryServiceClient::new(self)
220    }
221
222    /// Creates session and returns [`TableClientWithSession`]
223    /// # Examples
224    /// ```rust
225    /// # #[tokio::main]
226    /// # async fn main() {
227    ///     let mut conn = ydb_unofficial::YdbConnection::from_env();
228    ///     let mut table_client = conn.table().await.unwrap();
229    ///     use ydb_unofficial::generated::ydb::table::*;
230    ///     let keep_alive_response = table_client.keep_alive(KeepAliveRequest::default()).await.unwrap();
231    ///     //..some another code
232    /// # }
233    /// ```
234    pub async fn table(&mut self) -> Result<TableClientWithSession<C>, YdbError> {
235        let session_id = if let Some(session_id) = self.session_id() {
236            session_id
237        } else {
238            let mut client = TableServiceClient::new(&mut self.inner);
239            let response = client.create_session(CreateSessionRequest::default()).await?;
240            let session_id = response.into_inner().result()?.session_id;
241            log::debug!("Session created: {session_id}");
242            *self.session_id.write().unwrap() = Some(session_id.clone());
243            session_id
244        };
245        let session_ref = self.session_id.clone();
246        let client = TableServiceClient::new(self);
247        Ok(TableClientWithSession {session_ref, session_id, client })
248    }
249    pub fn table_if_ready(&mut self) -> Option<TableClientWithSession<C>> {
250        let session_id = self.session_id()?;
251        let session_ref = self.session_id.clone();
252        Some(TableClientWithSession {session_ref, session_id, client: TableServiceClient::new(self) })
253    }
254    fn session_id(&self) -> Option<String> {
255        self.session_id.read().unwrap().clone()
256    }
257    pub async fn close_session(&mut self) -> Result<(), YdbError> {
258        delete_session(&self.session_id, self.inner.clone()).await?;
259        Ok(())
260    }
261    #[doc(hidden)]
262    pub fn close_session_hard(self) {
263        *self.session_id.write().unwrap() = None;
264    }
265}
266
267
268async fn delete_session<C: Credentials>(session_ref: &Arc<RwLock<Option<String>>>, service: InterceptedService<Channel, DBInterceptor<C>>)  -> Result<(), YdbError> {
269    let session_id = session_ref.read().unwrap().clone();
270    if let Some(session_id) = session_id {
271        let mut client = TableServiceClient::new(service);
272        let response = client.delete_session(DeleteSessionRequest{session_id, ..Default::default()}).await?;
273        let code = response.get_ref().operation.as_ref().ok_or(YdbError::EmptyResponse)?.status();
274        process_session_fail(code, session_ref);
275    }
276    *session_ref.write().unwrap() = None;
277    Ok(())
278}
279
280impl<C: Credentials> Drop for YdbConnection<C> {
281    fn drop(&mut self) {
282        if let Some(session_id) = self.session_id() {
283            let mut client = TableServiceClient::new(self.inner.clone());
284            tokio::spawn(async move {
285                let copy = session_id.clone();
286                if let Err(e) = client.delete_session(DeleteSessionRequest{session_id, ..Default::default()}).await {
287                    log::error!("Error on closing session ({copy}): {e}");
288                } else {
289                    log::debug!("Session closed: {copy}");
290                }
291            });
292        }
293        log::debug!("YdbConnection closed");
294    }
295}
296
297/// [`TableServiceClient`] with active session.
298/// for each method (that requires session_id) table client injects session_id field
299/// Session may be invalid. In this case you need to recreate that client with [`YdbConnection::table`]
300#[derive(Debug)]
301pub struct TableClientWithSession<'a, C: Credentials> {
302    //TODO: тут бы это все как-то покрасивее сделать, но из TableServiceClient YdbConnection не достать
303    session_ref: Arc<RwLock<Option<String>>>, 
304    session_id: String,
305    client: TableServiceClient<&'a mut YdbConnection<C>>,
306}
307
308fn process_session_fail(
309    code: crate::generated::ydb::status_ids::StatusCode, 
310    session_ref: &Arc<RwLock<Option<String>>>,
311) {
312    use crate::generated::ydb::status_ids::StatusCode;
313    match code {
314        StatusCode::BadSession | StatusCode::SessionExpired | StatusCode::SessionBusy => {
315            *session_ref.write().unwrap() = None;
316        },
317        _ => {}
318    }
319}
320
// Generates `pub async fn` wrappers around the methods of `self.client`.
//
// Each wrapper:
// * copies `self.$field` into the `$field` of the request,
// * fails with `YdbError::EmptyResponse` when the response has no operation,
// * returns the response when the operation status is `Success`,
// * otherwise passes the status to `process_session_fail` and returns the
//   operation wrapped in `YdbError::Ydb`.
macro_rules! delegate {
    (with $field:ident : $(fn $fun:ident($arg:ty) -> $ret:ty;)+) => { $(
        pub async fn $fun(&mut self, mut req: $arg) -> Result<tonic::Response<$ret>, YdbError> {
            use crate::error::ErrWithOperation;
            use crate::generated::ydb::status_ids::StatusCode;

            req.$field = self.$field.clone();
            let response = self.client.$fun(req).await?;
            let status = response.get_ref().operation.as_ref().ok_or(YdbError::EmptyResponse)?.status();
            if matches!(status, StatusCode::Success) {
                return Ok(response);
            }
            process_session_fail(status, &self.session_ref);
            // The operation is known to be present: its status was read above.
            Err(YdbError::Ydb(ErrWithOperation(response.into_inner().operation.unwrap())))
        }
    )+}
}
339
340
341impl <'a, C: Credentials + Send> TableClientWithSession<'a, C> {
342    delegate!{ with session_id:
343        fn create_table(CreateTableRequest) -> CreateTableResponse;
344        fn drop_table(DropTableRequest) -> DropTableResponse;
345        fn alter_table(AlterTableRequest) -> AlterTableResponse;
346        fn copy_table(CopyTableRequest) -> CopyTableResponse;
347        fn rename_tables(RenameTablesRequest) -> RenameTablesResponse;
348        fn describe_table(DescribeTableRequest) -> DescribeTableResponse;
349        fn execute_data_query(ExecuteDataQueryRequest) -> ExecuteDataQueryResponse;
350        fn execute_scheme_query(ExecuteSchemeQueryRequest) -> ExecuteSchemeQueryResponse;
351        fn explain_data_query(ExplainDataQueryRequest) -> ExplainDataQueryResponse;
352        fn prepare_data_query(PrepareDataQueryRequest) -> PrepareDataQueryResponse;
353        fn keep_alive(KeepAliveRequest) -> KeepAliveResponse;
354        fn begin_transaction(BeginTransactionRequest) -> BeginTransactionResponse;
355        fn commit_transaction(CommitTransactionRequest) -> CommitTransactionResponse;
356        fn rollback_transaction(RollbackTransactionRequest) -> RollbackTransactionResponse;
357        fn delete_session(DeleteSessionRequest) -> DeleteSessionResponse;
358    }
359    pub async fn stream_read_table(&mut self, mut req: ReadTableRequest) -> Result<tonic::Response<tonic::codec::Streaming<ReadTableResponse>>, tonic::Status> {
360        req.session_id = self.session_id.clone();
361        self.client.stream_read_table(req).await
362    }
363    pub async fn update_session(&mut self) -> Result<(), YdbError> {
364        let response = self.client.create_session(CreateSessionRequest::default()).await?;
365        let session_id = response.into_inner().result()?.session_id;
366        log::debug!("Session created: {session_id}");
367        *self.session_ref.write().unwrap() = Some(session_id.clone());
368        self.session_id = session_id;
369        Ok(())
370    }
371}
372
373/// [`TableServiceClient`] with active session and transaction
374#[derive(Debug)]
375pub struct YdbTransaction<'a, C: Credentials> {
376    tx_control: TransactionControl,
377    //TODO: переделать на &mut
378    client: TableClientWithSession<'a, C>,
379}
380
381/// Transactions implementation for ydb.
382/// # Examples
383/// ```rust
384/// # #[tokio::main]
385/// # async fn main() {
386///   let mut conn = ydb_unofficial::YdbConnection::from_env();
387///   let mut transaction = ydb_unofficial::YdbTransaction::create(conn.table().await.unwrap()).await.unwrap(); 
388///   use ydb_unofficial::generated::ydb::table::query::Query;
389///   let req = ydb_unofficial::generated::ydb::table::ExecuteDataQueryRequest{
390///     query: Some(ydb_unofficial::generated::ydb::table::Query{
391///        query: Some(Query::YqlText("SELECT 1+1 as sum, 2*2 as mul".into()))
392///     }),
393///      ..Default::default()
394///   };
395///   let result = transaction.execute_data_query(req).await.unwrap();
396///   transaction.commit().await;
397/// # }
398/// ```
399impl<'a, C: Credentials> YdbTransaction<'a, C> {
400    pub(crate) fn new(client: TableClientWithSession<'a, C>, tx_control: TransactionControl) -> Self {
401        Self {client, tx_control}
402    }
403    pub(crate) fn table_client(&mut self) -> &mut TableClientWithSession<'a, C> {
404        &mut self.client
405    }
406    /// Method that just creates ReadWrite transaction 
407    pub async fn create(mut client: TableClientWithSession<'a, C>) -> Result<YdbTransaction<'a, C>, crate::error::YdbError> {
408        let tx_settings = Some(TransactionSettings{tx_mode: Some(TxMode::SerializableReadWrite(Default::default()))});
409        let response = client.begin_transaction(BeginTransactionRequest{tx_settings, ..Default::default()}).await?;
410        let tx_id = response.into_inner().result()?.tx_meta.unwrap().id;
411        let tx_control = TransactionControl{commit_tx: false, tx_selector: Some(TxSelector::TxId(tx_id))};
412        Ok(Self {tx_control, client})
413    }
414    fn invoke_tx_id(&self) -> String {
415        if let TxSelector::TxId(tx_id) = self.tx_control.clone().tx_selector.unwrap() {
416            tx_id
417        } else {
418            panic!("looks like a bug")
419        }
420    }
421    pub(crate) async fn commit_inner(&mut self) ->  Result<CommitTransactionResult, YdbError> {
422        let tx_id = self.invoke_tx_id();
423        let response = self.client.commit_transaction(CommitTransactionRequest {tx_id, ..Default::default()}).await?;
424        let result = response.into_inner().result()?; //что там может быть полезного?
425        Ok(result)
426    }
427    /// Commit transaction, drop transaction with commit
428    //TODO: какое-то упоротое возвращаемое значение
429    pub async fn commit(mut self) -> (TableClientWithSession<'a, C>, Result<CommitTransactionResult, YdbError>) {
430        let result = self.commit_inner().await;
431        (self.client, result)
432    }
433    pub (crate) async fn rollback_inner(&mut self) -> Result<(), YdbError> {
434        let tx_id = self.invoke_tx_id();
435        self.client.rollback_transaction(RollbackTransactionRequest {tx_id, ..Default::default()}).await?;
436        Ok(())
437    }
438    pub async fn rollback(mut self) -> (TableClientWithSession<'a, C>, Result<(), YdbError> ) {
439        let result = self.rollback_inner().await;
440        (self.client, result)
441    }
442    /// you can execute multiple query requests in transaction
443    /// transaction data will inject for each request
444    pub async fn execute_data_query(&mut self, mut req: ExecuteDataQueryRequest) -> Result<tonic::Response<ExecuteDataQueryResponse>,YdbError> {
445        req.tx_control = Some(self.tx_control.clone());
446        self.client.execute_data_query(req).await
447    }
448}