1use super::*;
37use std::sync::{Arc, RwLock};
38use error::YdbError;
39use auth::Credentials;
40
41use table::*;
42
43use tonic::codegen::InterceptedService;
44use tonic::service::Interceptor;
45use tonic::transport::{Endpoint, Channel, Uri};
46
47use payload::YdbResponseWithResult;
48use generated::ydb::discovery::v1::discovery_service_client::DiscoveryServiceClient;
49use generated::ydb::table::transaction_control::TxSelector;
50use generated::ydb::table::{TransactionSettings, ExecuteDataQueryRequest, TransactionControl, self, CreateSessionRequest, DeleteSessionRequest};
51use generated::ydb::table::transaction_settings::TxMode;
52use generated::ydb::table::v1::table_service_client::TableServiceClient;
53use tower::Service;
54
55#[derive(Debug, Clone)]
56pub struct YdbEndpoint {
57 pub ssl: bool,
58 pub host: String,
59 pub port: u16,
60 pub load_factor: f32,
61}
62
63impl YdbEndpoint {
64 pub fn scheme(&self) -> &'static str {
65 if self.ssl { "grpcs" } else { "grpc" }
66 }
67 pub fn make_endpoint(&self) -> Endpoint {
68 let uri: tonic::transport::Uri = format!("{}://{}:{}", self.scheme(), self.host, self.port).try_into().unwrap();
69 let mut e = Endpoint::from(uri).tcp_keepalive(Some(std::time::Duration::from_secs(15)));
70 if self.ssl {
71 e = e.tls_config(Default::default()).unwrap()
72 }
73 e
74 }
75}
76
77impl TryFrom<Uri> for YdbEndpoint {
78 type Error = String;
79
80 fn try_from(value: Uri) -> Result<Self, Self::Error> {
81 let ssl = match value.scheme_str() {
83 Some("grpc") => false,
84 Some("grpcs") => true,
85 _ => return Err("Unknown protocol".to_owned()),
86 };
87 let host = value.host().ok_or("no host")?.to_owned();
88 let port = value.port_u16().ok_or("no port")?;
89 let load_factor = 0.0;
90 Ok(Self {ssl, host, port, load_factor})
91 }
92}
93
94
95pub fn create_endpoint(uri: Uri) -> Endpoint {
110 let mut res = Endpoint::from(uri);
111 if matches!(res.uri().scheme_str(), Some("grpcs")) {
112 res = res.tls_config(tonic::transport::ClientTlsConfig::new()).unwrap()
113 };
114 res.tcp_keepalive(Some(std::time::Duration::from_secs(15)))
115}
116
117
118#[ctor::ctor]
120static BUILD_INFO: AsciiValue = concat!("ydb-unofficial/", env!("CARGO_PKG_VERSION")).try_into().unwrap();
121
122#[derive(Clone, Debug)]
123struct DBInterceptor<C: Clone> {
124 db_name: AsciiValue,
125 creds: C
126}
127
128impl<C: Credentials> Interceptor for DBInterceptor<C> {
129 fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
130 let headers = request.metadata_mut();
131 headers.insert("x-ydb-database", self.db_name.clone());
132 headers.insert("x-ydb-sdk-build-info", BUILD_INFO.clone());
133 headers.insert("x-ydb-auth-ticket", self.creds.token());
134 Ok(request)
135 }
136}
137
138#[derive(Debug)]
140pub struct YdbConnection<C: Credentials> {
141 inner: InterceptedService<Channel, DBInterceptor<C>>,
142 session_id: Arc<RwLock<Option<String>>>,
143}
144
145
146impl<C: Credentials> Service<tonic::codegen::http::Request<tonic::body::BoxBody>> for YdbConnection<C> {
147 type Response = tonic::codegen::http::Response<tonic::body::BoxBody>;
148
149 type Error = tonic::transport::Error;
150
151 type Future = tonic::service::interceptor::ResponseFuture<tonic::transport::channel::ResponseFuture>;
152
153 fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
154 self.inner.poll_ready(cx)
155 }
156
157 fn call(&mut self, request: tonic::codegen::http::Request<tonic::body::BoxBody>) -> Self::Future {
158 self.inner.call(request)
159 }
160}
161
162impl YdbConnection<String> {
163 pub fn from_env() -> Self {
169 use std::env::var;
170 let url = var("YDB_URL").expect("YDB_URL not set");
171 let db_name = var("DB_NAME").expect("DB_NAME not set");
172 let creds = var("DB_TOKEN").expect("DB_TOKEN not set");
173
174 let endpoint = create_endpoint(url.try_into().unwrap());
175 let channel = endpoint.connect_lazy();
176 YdbConnection::new(channel, db_name.as_str().try_into().unwrap(), creds)
177 }
178}
179
180impl<C: Credentials> YdbConnection<C> {
181 pub fn new(channel: Channel, db_name: AsciiValue, creds: C) -> Self {
191 let interceptor = DBInterceptor {db_name, creds};
192 let inner = tower::ServiceBuilder::new()
193 .layer(tonic::service::interceptor(interceptor))
194 .service(channel);
195 YdbConnection{inner, session_id: Arc::new(RwLock::new(None))}
196 }
197 pub fn discovery(&mut self) -> DiscoveryServiceClient<&mut Self> {
219 DiscoveryServiceClient::new(self)
220 }
221
222 pub async fn table(&mut self) -> Result<TableClientWithSession<C>, YdbError> {
235 let session_id = if let Some(session_id) = self.session_id() {
236 session_id
237 } else {
238 let mut client = TableServiceClient::new(&mut self.inner);
239 let response = client.create_session(CreateSessionRequest::default()).await?;
240 let session_id = response.into_inner().result()?.session_id;
241 log::debug!("Session created: {session_id}");
242 *self.session_id.write().unwrap() = Some(session_id.clone());
243 session_id
244 };
245 let session_ref = self.session_id.clone();
246 let client = TableServiceClient::new(self);
247 Ok(TableClientWithSession {session_ref, session_id, client })
248 }
249 pub fn table_if_ready(&mut self) -> Option<TableClientWithSession<C>> {
250 let session_id = self.session_id()?;
251 let session_ref = self.session_id.clone();
252 Some(TableClientWithSession {session_ref, session_id, client: TableServiceClient::new(self) })
253 }
254 fn session_id(&self) -> Option<String> {
255 self.session_id.read().unwrap().clone()
256 }
257 pub async fn close_session(&mut self) -> Result<(), YdbError> {
258 delete_session(&self.session_id, self.inner.clone()).await?;
259 Ok(())
260 }
261 #[doc(hidden)]
262 pub fn close_session_hard(self) {
263 *self.session_id.write().unwrap() = None;
264 }
265}
266
267
268async fn delete_session<C: Credentials>(session_ref: &Arc<RwLock<Option<String>>>, service: InterceptedService<Channel, DBInterceptor<C>>) -> Result<(), YdbError> {
269 let session_id = session_ref.read().unwrap().clone();
270 if let Some(session_id) = session_id {
271 let mut client = TableServiceClient::new(service);
272 let response = client.delete_session(DeleteSessionRequest{session_id, ..Default::default()}).await?;
273 let code = response.get_ref().operation.as_ref().ok_or(YdbError::EmptyResponse)?.status();
274 process_session_fail(code, session_ref);
275 }
276 *session_ref.write().unwrap() = None;
277 Ok(())
278}
279
280impl<C: Credentials> Drop for YdbConnection<C> {
281 fn drop(&mut self) {
282 if let Some(session_id) = self.session_id() {
283 let mut client = TableServiceClient::new(self.inner.clone());
284 tokio::spawn(async move {
285 let copy = session_id.clone();
286 if let Err(e) = client.delete_session(DeleteSessionRequest{session_id, ..Default::default()}).await {
287 log::error!("Error on closing session ({copy}): {e}");
288 } else {
289 log::debug!("Session closed: {copy}");
290 }
291 });
292 }
293 log::debug!("YdbConnection closed");
294 }
295}
296
297#[derive(Debug)]
301pub struct TableClientWithSession<'a, C: Credentials> {
302 session_ref: Arc<RwLock<Option<String>>>,
304 session_id: String,
305 client: TableServiceClient<&'a mut YdbConnection<C>>,
306}
307
308fn process_session_fail(
309 code: crate::generated::ydb::status_ids::StatusCode,
310 session_ref: &Arc<RwLock<Option<String>>>,
311) {
312 use crate::generated::ydb::status_ids::StatusCode;
313 match code {
314 StatusCode::BadSession | StatusCode::SessionExpired | StatusCode::SessionBusy => {
315 *session_ref.write().unwrap() = None;
316 },
317 _ => {}
318 }
319}
320
321macro_rules! delegate {
322 (with $field:ident : $(fn $fun:ident($arg:ty) -> $ret:ty;)+) => { $(
323 pub async fn $fun(&mut self, mut req: $arg) -> Result<tonic::Response<$ret>, YdbError> {
324 req.$field = self.$field.clone();
325 let result = self.client.$fun(req).await?;
326 let status = result.get_ref().operation.as_ref().ok_or(YdbError::EmptyResponse)?.status();
327 use crate::generated::ydb::status_ids::StatusCode;
328 use crate::error::ErrWithOperation;
329 match status {
330 StatusCode::Success => Ok(result),
331 _ => {
332 process_session_fail(status, &self.session_ref);
333 Err(YdbError::Ydb(ErrWithOperation(result.into_inner().operation.unwrap())))
334 },
335 }
336 }
337 )+}
338}
339
340
341impl <'a, C: Credentials + Send> TableClientWithSession<'a, C> {
342 delegate!{ with session_id:
343 fn create_table(CreateTableRequest) -> CreateTableResponse;
344 fn drop_table(DropTableRequest) -> DropTableResponse;
345 fn alter_table(AlterTableRequest) -> AlterTableResponse;
346 fn copy_table(CopyTableRequest) -> CopyTableResponse;
347 fn rename_tables(RenameTablesRequest) -> RenameTablesResponse;
348 fn describe_table(DescribeTableRequest) -> DescribeTableResponse;
349 fn execute_data_query(ExecuteDataQueryRequest) -> ExecuteDataQueryResponse;
350 fn execute_scheme_query(ExecuteSchemeQueryRequest) -> ExecuteSchemeQueryResponse;
351 fn explain_data_query(ExplainDataQueryRequest) -> ExplainDataQueryResponse;
352 fn prepare_data_query(PrepareDataQueryRequest) -> PrepareDataQueryResponse;
353 fn keep_alive(KeepAliveRequest) -> KeepAliveResponse;
354 fn begin_transaction(BeginTransactionRequest) -> BeginTransactionResponse;
355 fn commit_transaction(CommitTransactionRequest) -> CommitTransactionResponse;
356 fn rollback_transaction(RollbackTransactionRequest) -> RollbackTransactionResponse;
357 fn delete_session(DeleteSessionRequest) -> DeleteSessionResponse;
358 }
359 pub async fn stream_read_table(&mut self, mut req: ReadTableRequest) -> Result<tonic::Response<tonic::codec::Streaming<ReadTableResponse>>, tonic::Status> {
360 req.session_id = self.session_id.clone();
361 self.client.stream_read_table(req).await
362 }
363 pub async fn update_session(&mut self) -> Result<(), YdbError> {
364 let response = self.client.create_session(CreateSessionRequest::default()).await?;
365 let session_id = response.into_inner().result()?.session_id;
366 log::debug!("Session created: {session_id}");
367 *self.session_ref.write().unwrap() = Some(session_id.clone());
368 self.session_id = session_id;
369 Ok(())
370 }
371}
372
373#[derive(Debug)]
375pub struct YdbTransaction<'a, C: Credentials> {
376 tx_control: TransactionControl,
377 client: TableClientWithSession<'a, C>,
379}
380
381impl<'a, C: Credentials> YdbTransaction<'a, C> {
400 pub(crate) fn new(client: TableClientWithSession<'a, C>, tx_control: TransactionControl) -> Self {
401 Self {client, tx_control}
402 }
403 pub(crate) fn table_client(&mut self) -> &mut TableClientWithSession<'a, C> {
404 &mut self.client
405 }
406 pub async fn create(mut client: TableClientWithSession<'a, C>) -> Result<YdbTransaction<'a, C>, crate::error::YdbError> {
408 let tx_settings = Some(TransactionSettings{tx_mode: Some(TxMode::SerializableReadWrite(Default::default()))});
409 let response = client.begin_transaction(BeginTransactionRequest{tx_settings, ..Default::default()}).await?;
410 let tx_id = response.into_inner().result()?.tx_meta.unwrap().id;
411 let tx_control = TransactionControl{commit_tx: false, tx_selector: Some(TxSelector::TxId(tx_id))};
412 Ok(Self {tx_control, client})
413 }
414 fn invoke_tx_id(&self) -> String {
415 if let TxSelector::TxId(tx_id) = self.tx_control.clone().tx_selector.unwrap() {
416 tx_id
417 } else {
418 panic!("looks like a bug")
419 }
420 }
421 pub(crate) async fn commit_inner(&mut self) -> Result<CommitTransactionResult, YdbError> {
422 let tx_id = self.invoke_tx_id();
423 let response = self.client.commit_transaction(CommitTransactionRequest {tx_id, ..Default::default()}).await?;
424 let result = response.into_inner().result()?; Ok(result)
426 }
427 pub async fn commit(mut self) -> (TableClientWithSession<'a, C>, Result<CommitTransactionResult, YdbError>) {
430 let result = self.commit_inner().await;
431 (self.client, result)
432 }
433 pub (crate) async fn rollback_inner(&mut self) -> Result<(), YdbError> {
434 let tx_id = self.invoke_tx_id();
435 self.client.rollback_transaction(RollbackTransactionRequest {tx_id, ..Default::default()}).await?;
436 Ok(())
437 }
438 pub async fn rollback(mut self) -> (TableClientWithSession<'a, C>, Result<(), YdbError> ) {
439 let result = self.rollback_inner().await;
440 (self.client, result)
441 }
442 pub async fn execute_data_query(&mut self, mut req: ExecuteDataQueryRequest) -> Result<tonic::Response<ExecuteDataQueryResponse>,YdbError> {
445 req.tx_control = Some(self.tx_control.clone());
446 self.client.execute_data_query(req).await
447 }
448}