1use std::time::Duration;
2
3use google_cloud_gax::conn::Channel;
4use google_cloud_gax::grpc::metadata::{KeyAndValueRef, MetadataMap};
5use google_cloud_gax::grpc::{Code, Response, Status, Streaming};
6use google_cloud_gax::retry::{invoke_fn, RetrySetting};
7use google_cloud_gax::{create_request, grpc};
8use google_cloud_googleapis::spanner::v1::spanner_client::SpannerClient;
9use google_cloud_googleapis::spanner::v1::{
10 BatchCreateSessionsRequest, BatchCreateSessionsResponse, BeginTransactionRequest, CommitRequest, CommitResponse,
11 CreateSessionRequest, DeleteSessionRequest, ExecuteBatchDmlRequest, ExecuteBatchDmlResponse, ExecuteSqlRequest,
12 GetSessionRequest, ListSessionsRequest, ListSessionsResponse, PartialResultSet, PartitionQueryRequest,
13 PartitionReadRequest, PartitionResponse, ReadRequest, ResultSet, RollbackRequest, Session, Transaction,
14};
15
16pub(crate) fn ping_query_request(session_name: impl Into<String>) -> ExecuteSqlRequest {
17 ExecuteSqlRequest {
18 session: session_name.into(),
19 transaction: None,
20 sql: "SELECT 1".to_string(),
21 params: None,
22 param_types: Default::default(),
23 resume_token: vec![],
24 query_mode: 0,
25 partition_token: vec![],
26 seqno: 0,
27 query_options: None,
28 request_options: None,
29 directed_read_options: None,
30 data_boost_enabled: false,
31 }
32}
33
34fn default_setting() -> RetrySetting {
35 RetrySetting {
36 from_millis: 50,
37 max_delay: Some(Duration::from_secs(10)),
38 factor: 1u64,
39 take: 20,
40 codes: vec![Code::Unavailable, Code::Unknown],
41 }
42}
43
44#[derive(Clone)]
45pub struct Client {
46 inner: SpannerClient<Channel>,
47 metadata: MetadataMap,
48}
49
50impl Client {
51 pub fn new(inner: SpannerClient<Channel>) -> Client {
53 Client {
55 inner: inner.max_decoding_message_size(i32::MAX as usize),
56 metadata: Default::default(),
57 }
58 }
59
60 pub(crate) fn with_metadata(self, metadata: MetadataMap) -> Client {
62 Client {
63 inner: self.inner,
64 metadata,
65 }
66 }
67
68 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
88 pub async fn create_session(
89 &mut self,
90 req: CreateSessionRequest,
91 retry: Option<RetrySetting>,
92 ) -> Result<Response<Session>, Status> {
93 let setting = retry.unwrap_or_else(default_setting);
94 let database = &req.database;
95 invoke_fn(
96 Some(setting),
97 |this| async {
98 let request = this.create_request(format!("database={database}"), req.clone());
99 this.inner.create_session(request).await.map_err(|e| (e, this))
100 },
101 self,
102 )
103 .await
104 }
105
106 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
111 pub async fn batch_create_sessions(
112 &mut self,
113 req: BatchCreateSessionsRequest,
114 retry: Option<RetrySetting>,
115 ) -> Result<Response<BatchCreateSessionsResponse>, Status> {
116 let setting = retry.unwrap_or_else(default_setting);
117 let database = &req.database;
118 invoke_fn(
119 Some(setting),
120 |this| async {
121 let request = this.create_request(format!("database={database}"), req.clone());
122 this.inner.batch_create_sessions(request).await.map_err(|e| (e, this))
123 },
124 self,
125 )
126 .await
127 }
128
129 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
132 pub async fn get_session(
133 &mut self,
134 req: GetSessionRequest,
135 retry: Option<RetrySetting>,
136 ) -> Result<Response<Session>, Status> {
137 let setting = retry.unwrap_or_else(default_setting);
138 let name = &req.name;
139 invoke_fn(
140 Some(setting),
141 |this| async {
142 let request = this.create_request(format!("name={name}"), req.clone());
143 this.inner.get_session(request).await.map_err(|e| (e, this))
144 },
145 self,
146 )
147 .await
148 }
149
150 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
152 pub async fn list_sessions(
153 &mut self,
154 req: ListSessionsRequest,
155 retry: Option<RetrySetting>,
156 ) -> Result<Response<ListSessionsResponse>, Status> {
157 let setting = retry.unwrap_or_else(default_setting);
158 let database = &req.database;
159 invoke_fn(
160 Some(setting),
161 |this| async {
162 let request = this.create_request(format!("database={database}"), req.clone());
163 this.inner.list_sessions(request).await.map_err(|e| (e, this))
164 },
165 self,
166 )
167 .await
168 }
169
170 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
174 pub async fn delete_session(
175 &mut self,
176 req: DeleteSessionRequest,
177 retry: Option<RetrySetting>,
178 ) -> Result<Response<()>, Status> {
179 let setting = retry.unwrap_or_else(default_setting);
180 let name = &req.name;
181 invoke_fn(
182 Some(setting),
183 |this| async {
184 let request = this.create_request(format!("name={name}"), req.clone());
185 this.inner.delete_session(request).await.map_err(|e| (e, this))
186 },
187 self,
188 )
189 .await
190 }
191
192 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
204 pub async fn execute_sql(
205 &mut self,
206 req: ExecuteSqlRequest,
207 retry: Option<RetrySetting>,
208 ) -> Result<Response<ResultSet>, Status> {
209 let setting = retry.unwrap_or_else(default_setting);
210 let session = &req.session;
211 invoke_fn(
212 Some(setting),
213 |this| async {
214 let request = this.create_request(format!("session={session}"), req.clone());
215 this.inner.execute_sql(request).await.map_err(|e| (e, this))
216 },
217 self,
218 )
219 .await
220 }
221
222 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
228 pub async fn execute_streaming_sql(
229 &mut self,
230 req: ExecuteSqlRequest,
231 retry: Option<RetrySetting>,
232 ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
233 let setting = retry.unwrap_or_else(default_setting);
234 let session = &req.session;
235 invoke_fn(
236 Some(setting),
237 |this| async {
238 let request = this.create_request(format!("session={session}"), req.clone());
239 this.inner.execute_streaming_sql(request).await.map_err(|e| (e, this))
240 },
241 self,
242 )
243 .await
244 }
245
246 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
258 pub async fn execute_batch_dml(
259 &mut self,
260 req: ExecuteBatchDmlRequest,
261 retry: Option<RetrySetting>,
262 ) -> Result<Response<ExecuteBatchDmlResponse>, Status> {
263 let setting = retry.unwrap_or_else(default_setting);
264 let session = &req.session;
265 invoke_fn(
266 Some(setting),
267 |this| async {
268 let request = this.create_request(format!("session={session}"), req.clone());
269 let result = this.inner.execute_batch_dml(request).await;
270 match result {
271 Ok(response) => match response.get_ref().status.as_ref() {
272 Some(s) => {
273 let code = Code::from(s.code);
274 if code == Code::Ok {
275 Ok(response)
276 } else {
277 Err((Status::new(code, s.message.to_string()), this))
278 }
279 }
280 None => Ok(response),
281 },
282 Err(err) => Err((err, this)),
283 }
284 },
285 self,
286 )
287 .await
288 }
289
290 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
304 pub async fn read(&mut self, req: ReadRequest, retry: Option<RetrySetting>) -> Result<Response<ResultSet>, Status> {
305 let setting = retry.unwrap_or_else(default_setting);
306 let session = &req.session;
307 invoke_fn(
308 Some(setting),
309 |this| async {
310 let request = this.create_request(format!("session={session}"), req.clone());
311 this.inner.read(request).await.map_err(|e| (e, this))
312 },
313 self,
314 )
315 .await
316 }
317
318 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
324 pub async fn streaming_read(
325 &mut self,
326 req: ReadRequest,
327 retry: Option<RetrySetting>,
328 ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
329 let setting = retry.unwrap_or_else(default_setting);
330 let session = &req.session;
331 invoke_fn(
332 Some(setting),
333 |this| async {
334 let request = this.create_request(format!("session={session}"), req.clone());
335 this.inner.streaming_read(request).await.map_err(|e| (e, this))
336 },
337 self,
338 )
339 .await
340 }
341
342 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
347 pub async fn begin_transaction(
348 &mut self,
349 req: BeginTransactionRequest,
350 retry: Option<RetrySetting>,
351 ) -> Result<Response<Transaction>, Status> {
352 let setting = retry.unwrap_or_else(default_setting);
353 let session = &req.session;
354 invoke_fn(
355 Some(setting),
356 |this| async {
357 let request = this.create_request(format!("session={session}"), req.clone());
358 this.inner.begin_transaction(request).await.map_err(|e| (e, this))
359 },
360 self,
361 )
362 .await
363 }
364
365 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
380 pub async fn commit(
381 &mut self,
382 req: CommitRequest,
383 retry: Option<RetrySetting>,
384 ) -> Result<Response<CommitResponse>, Status> {
385 let setting = retry.unwrap_or_else(default_setting);
386 let session = &req.session;
387 invoke_fn(
388 Some(setting),
389 |this| async {
390 let request = this.create_request(format!("session={session}"), req.clone());
391 this.inner.commit(request).await.map_err(|e| (e, this))
392 },
393 self,
394 )
395 .await
396 }
397
398 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
407 pub async fn rollback(
408 &mut self,
409 req: RollbackRequest,
410 retry: Option<RetrySetting>,
411 ) -> Result<Response<()>, Status> {
412 let setting = retry.unwrap_or_else(default_setting);
413 let session = &req.session;
414 invoke_fn(
415 Some(setting),
416 |this| async {
417 let request = this.create_request(format!("session={session}"), req.clone());
418 this.inner.rollback(request).await.map_err(|e| (e, this))
419 },
420 self,
421 )
422 .await
423 }
424
425 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
437 pub async fn partition_query(
438 &mut self,
439 req: PartitionQueryRequest,
440 retry: Option<RetrySetting>,
441 ) -> Result<Response<PartitionResponse>, Status> {
442 let setting = retry.unwrap_or_else(default_setting);
443 let session = &req.session;
444 invoke_fn(
445 Some(setting),
446 |this| async {
447 let request = this.create_request(format!("session={session}"), req.clone());
448 this.inner.partition_query(request).await.map_err(|e| (e, this))
449 },
450 self,
451 )
452 .await
453 }
454
455 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
469 pub async fn partition_read(
470 &mut self,
471 req: PartitionReadRequest,
472 retry: Option<RetrySetting>,
473 ) -> Result<Response<PartitionResponse>, Status> {
474 let setting = retry.unwrap_or_else(default_setting);
475 let session = &req.session;
476 invoke_fn(
477 Some(setting),
478 |this| async {
479 let request = this.create_request(format!("session={session}"), req.clone());
480 this.inner.partition_read(request).await.map_err(|e| (e, this))
481 },
482 self,
483 )
484 .await
485 }
486
487 fn create_request<T>(&self, param_string: String, into_request: impl grpc::IntoRequest<T>) -> grpc::Request<T> {
488 let mut req = create_request(param_string, into_request);
489 let target = req.metadata_mut();
490 for entry in self.metadata.iter() {
491 match entry {
492 KeyAndValueRef::Ascii(k, v) => {
493 target.append(k, v.clone());
494 }
495 KeyAndValueRef::Binary(k, v) => {
496 target.append_bin(k, v.clone());
497 }
498 }
499 }
500 req
501 }
502}