ydb/
session.rs

1use crate::client::TimeoutSettings;
2use crate::client_table::TableServiceClientType;
3use crate::errors::{YdbError, YdbResult};
4use crate::query::Query;
5use crate::result::{QueryResult, StreamResult};
6use crate::types::Value;
7use derivative::Derivative;
8use itertools::Itertools;
9use std::sync::atomic::{AtomicI64, Ordering};
10
11use crate::grpc_connection_manager::GrpcConnectionManager;
12use crate::grpc_wrapper::raw_table_service::client::{
13    CollectStatsMode, RawTableClient, SessionStatus,
14};
15use crate::grpc_wrapper::runtime_interceptors::InterceptedChannel;
16
17use crate::grpc_wrapper::raw_errors::RawResult;
18use crate::grpc_wrapper::raw_table_service::bulk_upsert::RawBulkUpsertRequest;
19use crate::grpc_wrapper::raw_table_service::commit_transaction::RawCommitTransactionRequest;
20use crate::grpc_wrapper::raw_table_service::copy_table::{
21    RawCopyTableRequest, RawCopyTablesRequest,
22};
23use crate::grpc_wrapper::raw_table_service::execute_data_query::RawExecuteDataQueryRequest;
24use crate::grpc_wrapper::raw_table_service::execute_scheme_query::RawExecuteSchemeQueryRequest;
25use crate::grpc_wrapper::raw_table_service::keepalive::RawKeepAliveRequest;
26use crate::grpc_wrapper::raw_table_service::rollback_transaction::RawRollbackTransactionRequest;
27use crate::table_service_types::CopyTableItem;
28use crate::trace_helpers::ensure_len_string;
29use tracing::{debug, trace};
30use ydb_grpc::ydb_proto::table::v1::table_service_client::TableServiceClient;
31use ydb_grpc::ydb_proto::table::{execute_scan_query_request, ExecuteScanQueryRequest};
32
/// Process-wide counter that `req_number` reads and increments; starts at 0.
static REQUEST_NUMBER: AtomicI64 = AtomicI64::new(0);
/// Value sent as `collect_stats` with commit requests; set to `CollectStatsMode::None`.
static DEFAULT_COLLECT_STAT_MODE: CollectStatsMode = CollectStatsMode::None;
35
36fn req_number() -> i64 {
37    REQUEST_NUMBER.fetch_add(1, Ordering::Relaxed)
38}
39
/// One-shot callback that receives the session mutably; `Session` stores these boxed and runs
/// them from its `Drop` implementation.
type DropSessionCallback = dyn FnOnce(&mut Session) + Send + Sync;
41
/// A table-service session: its id plus everything needed to issue requests on its behalf.
#[derive(Derivative)]
#[derivative(Debug)]
pub(crate) struct Session {
    /// Session identifier; sent as `session_id` in session-bound requests.
    pub(crate) id: String,

    /// Starts as `true`; `handle_error` clears it on `BadSession` / `SessionExpired` statuses.
    /// NOTE(review): presumably consulted by the session pool before reuse — confirm at the caller.
    pub(crate) can_pooled: bool,

    /// Callbacks run when the session is dropped (popped from the end, so last registered first).
    #[derivative(Debug = "ignore")]
    on_drop_callbacks: Vec<Box<DropSessionCallback>>,

    /// Factory used to obtain a table-service client for each request.
    #[derivative(Debug = "ignore")]
    channel_pool: Box<dyn CreateTableClient>,

    /// Timeout settings handed to created clients and converted into per-request operation params.
    timeouts: TimeoutSettings,
}
57
58impl Session {
59    pub(crate) fn new<CT: CreateTableClient + 'static>(
60        id: String,
61        channel_pool: CT,
62        timeouts: TimeoutSettings,
63    ) -> Self {
64        Self {
65            id,
66            can_pooled: true,
67            on_drop_callbacks: Vec::new(),
68            channel_pool: Box::new(channel_pool),
69            timeouts,
70        }
71    }
72
73    pub(crate) fn handle_error(&mut self, err: &YdbError) {
74        if let YdbError::YdbStatusError(err) = err {
75            use ydb_grpc::ydb_proto::status_ids::StatusCode;
76            if let Ok(status) = StatusCode::try_from(err.operation_status) {
77                if status == StatusCode::BadSession || status == StatusCode::SessionExpired {
78                    self.can_pooled = false;
79                }
80            }
81        }
82    }
83
84    fn handle_raw_result<T>(&mut self, res: RawResult<T>) -> YdbResult<T> {
85        let res = res.map_err(YdbError::from);
86        if let Err(err) = &res {
87            self.handle_error(err);
88        }
89        res
90    }
91
92    pub(crate) async fn commit_transaction(&mut self, tx_id: String) -> YdbResult<()> {
93        let mut table = self.get_table_client().await?;
94        let res = table
95            .commit_transaction(RawCommitTransactionRequest {
96                session_id: self.id.clone(),
97                tx_id,
98                operation_params: self.timeouts.operation_params(),
99                collect_stats: DEFAULT_COLLECT_STAT_MODE,
100            })
101            .await;
102        self.handle_raw_result(res)?;
103        Ok(())
104    }
105
106    pub(crate) async fn execute_schema_query(&mut self, query: String) -> YdbResult<()> {
107        let res = self
108            .get_table_client()
109            .await?
110            .execute_scheme_query(RawExecuteSchemeQueryRequest {
111                session_id: self.id.clone(),
112                yql_text: query,
113                operation_params: self.timeouts.operation_params(),
114            })
115            .await;
116        self.handle_raw_result(res)?;
117        Ok(())
118    }
119
120    pub(crate) async fn execute_bulk_upsert(
121        &mut self,
122        table_path: String,
123        rows: Value,
124    ) -> YdbResult<()> {
125        let req = RawBulkUpsertRequest {
126            table: table_path,
127            rows: rows.to_typed_value()?,
128            operation_params: self.timeouts.operation_params(),
129        };
130        let res = self.get_table_client().await?.bulk_upsert(req).await;
131        self.handle_raw_result(res)?;
132        Ok(())
133    }
134
135    #[tracing::instrument(skip(self, req), fields(req_number=req_number()))]
136    pub(crate) async fn execute_data_query(
137        &mut self,
138        mut req: RawExecuteDataQueryRequest,
139        error_on_truncated: bool,
140    ) -> YdbResult<QueryResult> {
141        req.session_id.clone_from(&self.id);
142        req.operation_params = self.timeouts.operation_params();
143
144        trace!(
145            "request: {}",
146            ensure_len_string(serde_json::to_string(&req)?)
147        );
148
149        let res = self.get_table_client().await?.execute_data_query(req).await;
150        let res = self.handle_raw_result(res)?;
151        trace!(
152            "result: {}",
153            ensure_len_string(serde_json::to_string(&res)?)
154        );
155        if error_on_truncated {
156            return Err(YdbError::from_str("result of query was truncated"));
157        }
158        QueryResult::from_raw_result(error_on_truncated, res)
159    }
160
161    #[tracing::instrument(skip(self, query), fields(req_number=req_number()))]
162    pub async fn execute_scan_query(&mut self, query: Query) -> YdbResult<StreamResult> {
163        let req = ExecuteScanQueryRequest {
164            query: Some(query.query_to_proto()),
165            parameters: query.params_to_proto()?,
166            mode: execute_scan_query_request::Mode::Exec as i32,
167            ..ExecuteScanQueryRequest::default()
168        };
169        debug!(
170            "request: {}",
171            crate::trace_helpers::ensure_len_string(serde_json::to_string(&req)?)
172        );
173        let mut channel = self.get_channel().await?;
174        let resp = channel.stream_execute_scan_query(req).await?;
175        let stream = resp.into_inner();
176        Ok(StreamResult { results: stream })
177    }
178
179    pub(crate) async fn rollback_transaction(&mut self, tx_id: String) -> YdbResult<()> {
180        let mut table = self.get_table_client().await?;
181        let res = table
182            .rollback_transaction(RawRollbackTransactionRequest {
183                session_id: self.id.clone(),
184                tx_id,
185                operation_params: self.timeouts.operation_params(),
186            })
187            .await;
188
189        self.handle_raw_result(res)
190    }
191
192    pub async fn copy_table(
193        &mut self,
194        source_path: String,
195        destination_path: String,
196    ) -> YdbResult<()> {
197        let mut table = self.get_table_client().await?;
198        let res = table
199            .copy_table(RawCopyTableRequest {
200                session_id: self.id.clone(),
201                source_path,
202                destination_path,
203                operation_params: self.timeouts.operation_params(),
204            })
205            .await;
206
207        self.handle_raw_result(res)
208    }
209
210    pub async fn copy_tables(&mut self, tables: Vec<CopyTableItem>) -> YdbResult<()> {
211        let mut table = self.get_table_client().await?;
212        let res = table
213            .copy_tables(RawCopyTablesRequest {
214                operation_params: self.timeouts.operation_params(),
215                session_id: self.id.clone(),
216                tables: tables.into_iter().map_into().collect(),
217            })
218            .await;
219
220        self.handle_raw_result(res)
221    }
222
223    pub(crate) async fn keepalive(&mut self) -> YdbResult<()> {
224        let mut table = self.get_table_client().await?;
225        let res = table
226            .keep_alive(RawKeepAliveRequest {
227                operation_params: self.timeouts.operation_params(),
228                session_id: self.id.clone(),
229            })
230            .await;
231
232        let res = self.handle_raw_result(res)?;
233
234        if let SessionStatus::Ready = res.session_status {
235            Ok(())
236        } else {
237            let err = YdbError::from_str(format!("bad status while session ping: {res:?}"));
238            self.handle_error(&err);
239            Err(err)
240        }
241    }
242
243    pub fn with_timeouts(mut self, timeouts: TimeoutSettings) -> Self {
244        self.timeouts = timeouts;
245        self
246    }
247
248    // deprecated, use get_table_client instead
249    async fn get_channel(&self) -> YdbResult<TableServiceClientType> {
250        self.channel_pool.create_grpc_table_client().await
251    }
252
253    async fn get_table_client(&self) -> YdbResult<RawTableClient> {
254        self.channel_pool.create_table_client(self.timeouts).await
255    }
256
257    #[allow(dead_code)]
258    pub(crate) fn on_drop(&mut self, f: Box<dyn FnOnce(&mut Self) + Send + Sync>) {
259        self.on_drop_callbacks.push(f)
260    }
261
262    pub(crate) fn clone_without_ondrop(&self) -> Self {
263        Self {
264            id: self.id.clone(),
265            can_pooled: self.can_pooled,
266            on_drop_callbacks: Vec::new(),
267            channel_pool: self.channel_pool.clone_box(),
268            timeouts: self.timeouts,
269        }
270    }
271}
272
273impl Drop for Session {
274    fn drop(&mut self) {
275        trace!("drop session: {}", &self.id);
276        while let Some(on_drop) = self.on_drop_callbacks.pop() {
277            on_drop(self)
278        }
279    }
280}
281
/// Factory for table-service clients; `Session` keeps one boxed as its `channel_pool`.
#[async_trait::async_trait]
pub(crate) trait CreateTableClient: Send + Sync {
    /// Creates a generated gRPC `TableServiceClient` over an `InterceptedChannel`.
    async fn create_grpc_table_client(&self) -> YdbResult<TableServiceClient<InterceptedChannel>>;
    /// Creates a `RawTableClient`; `timeouts` are the settings the client should be given.
    async fn create_table_client(&self, timeouts: TimeoutSettings) -> YdbResult<RawTableClient>;
    /// Returns a boxed copy of this factory, so a boxed trait object can be duplicated.
    fn clone_box(&self) -> Box<dyn CreateTableClient>;
}
288
289#[async_trait::async_trait]
290impl CreateTableClient for GrpcConnectionManager {
291    async fn create_grpc_table_client(&self) -> YdbResult<TableServiceClient<InterceptedChannel>> {
292        self.get_auth_service(TableServiceClient::<InterceptedChannel>::new)
293            .await
294    }
295
296    async fn create_table_client(&self, timeouts: TimeoutSettings) -> YdbResult<RawTableClient> {
297        self.get_auth_service(RawTableClient::new)
298            .await
299            .map(|item| item.with_timeout(timeouts))
300    }
301
302    fn clone_box(&self) -> Box<dyn CreateTableClient> {
303        Box::new(self.clone())
304    }
305}