1use crate::client::TimeoutSettings;
2use crate::client_table::TableServiceClientType;
3use crate::errors::{YdbError, YdbResult};
4use crate::query::Query;
5use crate::result::{QueryResult, StreamResult};
6use crate::types::Value;
7use derivative::Derivative;
8use itertools::Itertools;
9use std::sync::atomic::{AtomicI64, Ordering};
10
11use crate::grpc_connection_manager::GrpcConnectionManager;
12use crate::grpc_wrapper::raw_table_service::client::{
13 CollectStatsMode, RawTableClient, SessionStatus,
14};
15use crate::grpc_wrapper::runtime_interceptors::InterceptedChannel;
16
17use crate::grpc_wrapper::raw_errors::RawResult;
18use crate::grpc_wrapper::raw_table_service::bulk_upsert::RawBulkUpsertRequest;
19use crate::grpc_wrapper::raw_table_service::commit_transaction::RawCommitTransactionRequest;
20use crate::grpc_wrapper::raw_table_service::copy_table::{
21 RawCopyTableRequest, RawCopyTablesRequest,
22};
23use crate::grpc_wrapper::raw_table_service::execute_data_query::RawExecuteDataQueryRequest;
24use crate::grpc_wrapper::raw_table_service::execute_scheme_query::RawExecuteSchemeQueryRequest;
25use crate::grpc_wrapper::raw_table_service::keepalive::RawKeepAliveRequest;
26use crate::grpc_wrapper::raw_table_service::rollback_transaction::RawRollbackTransactionRequest;
27use crate::table_service_types::CopyTableItem;
28use crate::trace_helpers::ensure_len_string;
29use tracing::{debug, trace};
30use ydb_grpc::ydb_proto::table::v1::table_service_client::TableServiceClient;
31use ydb_grpc::ydb_proto::table::{execute_scan_query_request, ExecuteScanQueryRequest};
32
33static REQUEST_NUMBER: AtomicI64 = AtomicI64::new(0);
34static DEFAULT_COLLECT_STAT_MODE: CollectStatsMode = CollectStatsMode::None;
35
36fn req_number() -> i64 {
37 REQUEST_NUMBER.fetch_add(1, Ordering::Relaxed)
38}
39
40type DropSessionCallback = dyn FnOnce(&mut Session) + Send + Sync;
41
42#[derive(Derivative)]
43#[derivative(Debug)]
44pub(crate) struct Session {
45 pub(crate) id: String,
46
47 pub(crate) can_pooled: bool,
48
49 #[derivative(Debug = "ignore")]
50 on_drop_callbacks: Vec<Box<DropSessionCallback>>,
51
52 #[derivative(Debug = "ignore")]
53 channel_pool: Box<dyn CreateTableClient>,
54
55 timeouts: TimeoutSettings,
56}
57
58impl Session {
59 pub(crate) fn new<CT: CreateTableClient + 'static>(
60 id: String,
61 channel_pool: CT,
62 timeouts: TimeoutSettings,
63 ) -> Self {
64 Self {
65 id,
66 can_pooled: true,
67 on_drop_callbacks: Vec::new(),
68 channel_pool: Box::new(channel_pool),
69 timeouts,
70 }
71 }
72
73 pub(crate) fn handle_error(&mut self, err: &YdbError) {
74 if let YdbError::YdbStatusError(err) = err {
75 use ydb_grpc::ydb_proto::status_ids::StatusCode;
76 if let Ok(status) = StatusCode::try_from(err.operation_status) {
77 if status == StatusCode::BadSession || status == StatusCode::SessionExpired {
78 self.can_pooled = false;
79 }
80 }
81 }
82 }
83
84 fn handle_raw_result<T>(&mut self, res: RawResult<T>) -> YdbResult<T> {
85 let res = res.map_err(YdbError::from);
86 if let Err(err) = &res {
87 self.handle_error(err);
88 }
89 res
90 }
91
92 pub(crate) async fn commit_transaction(&mut self, tx_id: String) -> YdbResult<()> {
93 let mut table = self.get_table_client().await?;
94 let res = table
95 .commit_transaction(RawCommitTransactionRequest {
96 session_id: self.id.clone(),
97 tx_id,
98 operation_params: self.timeouts.operation_params(),
99 collect_stats: DEFAULT_COLLECT_STAT_MODE,
100 })
101 .await;
102 self.handle_raw_result(res)?;
103 Ok(())
104 }
105
106 pub(crate) async fn execute_schema_query(&mut self, query: String) -> YdbResult<()> {
107 let res = self
108 .get_table_client()
109 .await?
110 .execute_scheme_query(RawExecuteSchemeQueryRequest {
111 session_id: self.id.clone(),
112 yql_text: query,
113 operation_params: self.timeouts.operation_params(),
114 })
115 .await;
116 self.handle_raw_result(res)?;
117 Ok(())
118 }
119
120 pub(crate) async fn execute_bulk_upsert(
121 &mut self,
122 table_path: String,
123 rows: Value,
124 ) -> YdbResult<()> {
125 let req = RawBulkUpsertRequest {
126 table: table_path,
127 rows: rows.to_typed_value()?,
128 operation_params: self.timeouts.operation_params(),
129 };
130 let res = self.get_table_client().await?.bulk_upsert(req).await;
131 self.handle_raw_result(res)?;
132 Ok(())
133 }
134
135 #[tracing::instrument(skip(self, req), fields(req_number=req_number()))]
136 pub(crate) async fn execute_data_query(
137 &mut self,
138 mut req: RawExecuteDataQueryRequest,
139 error_on_truncated: bool,
140 ) -> YdbResult<QueryResult> {
141 req.session_id.clone_from(&self.id);
142 req.operation_params = self.timeouts.operation_params();
143
144 trace!(
145 "request: {}",
146 ensure_len_string(serde_json::to_string(&req)?)
147 );
148
149 let res = self.get_table_client().await?.execute_data_query(req).await;
150 let res = self.handle_raw_result(res)?;
151 trace!(
152 "result: {}",
153 ensure_len_string(serde_json::to_string(&res)?)
154 );
155 if error_on_truncated {
156 return Err(YdbError::from_str("result of query was truncated"));
157 }
158 QueryResult::from_raw_result(error_on_truncated, res)
159 }
160
161 #[tracing::instrument(skip(self, query), fields(req_number=req_number()))]
162 pub async fn execute_scan_query(&mut self, query: Query) -> YdbResult<StreamResult> {
163 let req = ExecuteScanQueryRequest {
164 query: Some(query.query_to_proto()),
165 parameters: query.params_to_proto()?,
166 mode: execute_scan_query_request::Mode::Exec as i32,
167 ..ExecuteScanQueryRequest::default()
168 };
169 debug!(
170 "request: {}",
171 crate::trace_helpers::ensure_len_string(serde_json::to_string(&req)?)
172 );
173 let mut channel = self.get_channel().await?;
174 let resp = channel.stream_execute_scan_query(req).await?;
175 let stream = resp.into_inner();
176 Ok(StreamResult { results: stream })
177 }
178
179 pub(crate) async fn rollback_transaction(&mut self, tx_id: String) -> YdbResult<()> {
180 let mut table = self.get_table_client().await?;
181 let res = table
182 .rollback_transaction(RawRollbackTransactionRequest {
183 session_id: self.id.clone(),
184 tx_id,
185 operation_params: self.timeouts.operation_params(),
186 })
187 .await;
188
189 self.handle_raw_result(res)
190 }
191
192 pub async fn copy_table(
193 &mut self,
194 source_path: String,
195 destination_path: String,
196 ) -> YdbResult<()> {
197 let mut table = self.get_table_client().await?;
198 let res = table
199 .copy_table(RawCopyTableRequest {
200 session_id: self.id.clone(),
201 source_path,
202 destination_path,
203 operation_params: self.timeouts.operation_params(),
204 })
205 .await;
206
207 self.handle_raw_result(res)
208 }
209
210 pub async fn copy_tables(&mut self, tables: Vec<CopyTableItem>) -> YdbResult<()> {
211 let mut table = self.get_table_client().await?;
212 let res = table
213 .copy_tables(RawCopyTablesRequest {
214 operation_params: self.timeouts.operation_params(),
215 session_id: self.id.clone(),
216 tables: tables.into_iter().map_into().collect(),
217 })
218 .await;
219
220 self.handle_raw_result(res)
221 }
222
223 pub(crate) async fn keepalive(&mut self) -> YdbResult<()> {
224 let mut table = self.get_table_client().await?;
225 let res = table
226 .keep_alive(RawKeepAliveRequest {
227 operation_params: self.timeouts.operation_params(),
228 session_id: self.id.clone(),
229 })
230 .await;
231
232 let res = self.handle_raw_result(res)?;
233
234 if let SessionStatus::Ready = res.session_status {
235 Ok(())
236 } else {
237 let err = YdbError::from_str(format!("bad status while session ping: {res:?}"));
238 self.handle_error(&err);
239 Err(err)
240 }
241 }
242
243 pub fn with_timeouts(mut self, timeouts: TimeoutSettings) -> Self {
244 self.timeouts = timeouts;
245 self
246 }
247
248 async fn get_channel(&self) -> YdbResult<TableServiceClientType> {
250 self.channel_pool.create_grpc_table_client().await
251 }
252
253 async fn get_table_client(&self) -> YdbResult<RawTableClient> {
254 self.channel_pool.create_table_client(self.timeouts).await
255 }
256
257 #[allow(dead_code)]
258 pub(crate) fn on_drop(&mut self, f: Box<dyn FnOnce(&mut Self) + Send + Sync>) {
259 self.on_drop_callbacks.push(f)
260 }
261
262 pub(crate) fn clone_without_ondrop(&self) -> Self {
263 Self {
264 id: self.id.clone(),
265 can_pooled: self.can_pooled,
266 on_drop_callbacks: Vec::new(),
267 channel_pool: self.channel_pool.clone_box(),
268 timeouts: self.timeouts,
269 }
270 }
271}
272
273impl Drop for Session {
274 fn drop(&mut self) {
275 trace!("drop session: {}", &self.id);
276 while let Some(on_drop) = self.on_drop_callbacks.pop() {
277 on_drop(self)
278 }
279 }
280}
281
282#[async_trait::async_trait]
283pub(crate) trait CreateTableClient: Send + Sync {
284 async fn create_grpc_table_client(&self) -> YdbResult<TableServiceClient<InterceptedChannel>>;
285 async fn create_table_client(&self, timeouts: TimeoutSettings) -> YdbResult<RawTableClient>;
286 fn clone_box(&self) -> Box<dyn CreateTableClient>;
287}
288
289#[async_trait::async_trait]
290impl CreateTableClient for GrpcConnectionManager {
291 async fn create_grpc_table_client(&self) -> YdbResult<TableServiceClient<InterceptedChannel>> {
292 self.get_auth_service(TableServiceClient::<InterceptedChannel>::new)
293 .await
294 }
295
296 async fn create_table_client(&self, timeouts: TimeoutSettings) -> YdbResult<RawTableClient> {
297 self.get_auth_service(RawTableClient::new)
298 .await
299 .map(|item| item.with_timeout(timeouts))
300 }
301
302 fn clone_box(&self) -> Box<dyn CreateTableClient> {
303 Box::new(self.clone())
304 }
305}