1use std::time::Duration;
2
3use google_cloud_gax::conn::Channel;
4use google_cloud_gax::grpc::metadata::{KeyAndValueRef, MetadataMap};
5use google_cloud_gax::grpc::{Code, Response, Status, Streaming};
6use google_cloud_gax::retry::{invoke_fn, RetrySetting};
7use google_cloud_gax::{create_request, grpc};
8use google_cloud_googleapis::spanner::v1::spanner_client::SpannerClient;
9use google_cloud_googleapis::spanner::v1::{
10 BatchCreateSessionsRequest, BatchCreateSessionsResponse, BeginTransactionRequest, CommitRequest, CommitResponse,
11 CreateSessionRequest, DeleteSessionRequest, ExecuteBatchDmlRequest, ExecuteBatchDmlResponse, ExecuteSqlRequest,
12 GetSessionRequest, ListSessionsRequest, ListSessionsResponse, PartialResultSet, PartitionQueryRequest,
13 PartitionReadRequest, PartitionResponse, ReadRequest, ResultSet, RollbackRequest, Session, Transaction,
14};
15
16pub(crate) fn ping_query_request(session_name: impl Into<String>) -> ExecuteSqlRequest {
17 ExecuteSqlRequest {
18 session: session_name.into(),
19 transaction: None,
20 sql: "SELECT 1".to_string(),
21 params: None,
22 param_types: Default::default(),
23 resume_token: vec![],
24 query_mode: 0,
25 partition_token: vec![],
26 seqno: 0,
27 query_options: None,
28 request_options: None,
29 directed_read_options: None,
30 data_boost_enabled: false,
31 last_statement: false,
32 }
33}
34
35fn default_setting() -> RetrySetting {
36 RetrySetting {
37 from_millis: 50,
38 max_delay: Some(Duration::from_secs(10)),
39 factor: 1u64,
40 take: 20,
41 codes: vec![Code::Unavailable, Code::Unknown],
42 }
43}
44
45#[derive(Clone)]
46pub struct Client {
47 inner: SpannerClient<Channel>,
48 metadata: MetadataMap,
49}
50
51impl Client {
52 pub fn new(inner: SpannerClient<Channel>) -> Client {
54 Client {
56 inner: inner.max_decoding_message_size(i32::MAX as usize),
57 metadata: Default::default(),
58 }
59 }
60
61 pub(crate) fn with_metadata(self, metadata: MetadataMap) -> Client {
63 Client {
64 inner: self.inner,
65 metadata,
66 }
67 }
68
69 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
89 pub async fn create_session(
90 &mut self,
91 req: CreateSessionRequest,
92 retry: Option<RetrySetting>,
93 ) -> Result<Response<Session>, Status> {
94 let setting = retry.unwrap_or_else(default_setting);
95 let database = &req.database;
96 invoke_fn(
97 Some(setting),
98 |this| async {
99 let request = this.create_request(format!("database={database}"), req.clone());
100 this.inner.create_session(request).await.map_err(|e| (e, this))
101 },
102 self,
103 )
104 .await
105 }
106
107 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
112 pub async fn batch_create_sessions(
113 &mut self,
114 req: BatchCreateSessionsRequest,
115 retry: Option<RetrySetting>,
116 ) -> Result<Response<BatchCreateSessionsResponse>, Status> {
117 let setting = retry.unwrap_or_else(default_setting);
118 let database = &req.database;
119 invoke_fn(
120 Some(setting),
121 |this| async {
122 let request = this.create_request(format!("database={database}"), req.clone());
123 this.inner.batch_create_sessions(request).await.map_err(|e| (e, this))
124 },
125 self,
126 )
127 .await
128 }
129
130 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
133 pub async fn get_session(
134 &mut self,
135 req: GetSessionRequest,
136 retry: Option<RetrySetting>,
137 ) -> Result<Response<Session>, Status> {
138 let setting = retry.unwrap_or_else(default_setting);
139 let name = &req.name;
140 invoke_fn(
141 Some(setting),
142 |this| async {
143 let request = this.create_request(format!("name={name}"), req.clone());
144 this.inner.get_session(request).await.map_err(|e| (e, this))
145 },
146 self,
147 )
148 .await
149 }
150
151 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
153 pub async fn list_sessions(
154 &mut self,
155 req: ListSessionsRequest,
156 retry: Option<RetrySetting>,
157 ) -> Result<Response<ListSessionsResponse>, Status> {
158 let setting = retry.unwrap_or_else(default_setting);
159 let database = &req.database;
160 invoke_fn(
161 Some(setting),
162 |this| async {
163 let request = this.create_request(format!("database={database}"), req.clone());
164 this.inner.list_sessions(request).await.map_err(|e| (e, this))
165 },
166 self,
167 )
168 .await
169 }
170
171 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
175 pub async fn delete_session(
176 &mut self,
177 req: DeleteSessionRequest,
178 retry: Option<RetrySetting>,
179 ) -> Result<Response<()>, Status> {
180 let setting = retry.unwrap_or_else(default_setting);
181 let name = &req.name;
182 invoke_fn(
183 Some(setting),
184 |this| async {
185 let request = this.create_request(format!("name={name}"), req.clone());
186 this.inner.delete_session(request).await.map_err(|e| (e, this))
187 },
188 self,
189 )
190 .await
191 }
192
193 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
205 pub async fn execute_sql(
206 &mut self,
207 req: ExecuteSqlRequest,
208 retry: Option<RetrySetting>,
209 ) -> Result<Response<ResultSet>, Status> {
210 let setting = retry.unwrap_or_else(default_setting);
211 let session = &req.session;
212 invoke_fn(
213 Some(setting),
214 |this| async {
215 let request = this.create_request(format!("session={session}"), req.clone());
216 this.inner.execute_sql(request).await.map_err(|e| (e, this))
217 },
218 self,
219 )
220 .await
221 }
222
223 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
229 pub async fn execute_streaming_sql(
230 &mut self,
231 req: ExecuteSqlRequest,
232 retry: Option<RetrySetting>,
233 ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
234 let setting = retry.unwrap_or_else(default_setting);
235 let session = &req.session;
236 invoke_fn(
237 Some(setting),
238 |this| async {
239 let request = this.create_request(format!("session={session}"), req.clone());
240 this.inner.execute_streaming_sql(request).await.map_err(|e| (e, this))
241 },
242 self,
243 )
244 .await
245 }
246
247 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
259 pub async fn execute_batch_dml(
260 &mut self,
261 req: ExecuteBatchDmlRequest,
262 retry: Option<RetrySetting>,
263 ) -> Result<Response<ExecuteBatchDmlResponse>, Status> {
264 let setting = retry.unwrap_or_else(default_setting);
265 let session = &req.session;
266 invoke_fn(
267 Some(setting),
268 |this| async {
269 let request = this.create_request(format!("session={session}"), req.clone());
270 let result = this.inner.execute_batch_dml(request).await;
271 match result {
272 Ok(response) => match response.get_ref().status.as_ref() {
273 Some(s) => {
274 let code = Code::from(s.code);
275 if code == Code::Ok {
276 Ok(response)
277 } else {
278 Err((Status::new(code, s.message.to_string()), this))
279 }
280 }
281 None => Ok(response),
282 },
283 Err(err) => Err((err, this)),
284 }
285 },
286 self,
287 )
288 .await
289 }
290
291 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
305 pub async fn read(&mut self, req: ReadRequest, retry: Option<RetrySetting>) -> Result<Response<ResultSet>, Status> {
306 let setting = retry.unwrap_or_else(default_setting);
307 let session = &req.session;
308 invoke_fn(
309 Some(setting),
310 |this| async {
311 let request = this.create_request(format!("session={session}"), req.clone());
312 this.inner.read(request).await.map_err(|e| (e, this))
313 },
314 self,
315 )
316 .await
317 }
318
319 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
325 pub async fn streaming_read(
326 &mut self,
327 req: ReadRequest,
328 retry: Option<RetrySetting>,
329 ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
330 let setting = retry.unwrap_or_else(default_setting);
331 let session = &req.session;
332 invoke_fn(
333 Some(setting),
334 |this| async {
335 let request = this.create_request(format!("session={session}"), req.clone());
336 this.inner.streaming_read(request).await.map_err(|e| (e, this))
337 },
338 self,
339 )
340 .await
341 }
342
343 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
348 pub async fn begin_transaction(
349 &mut self,
350 req: BeginTransactionRequest,
351 retry: Option<RetrySetting>,
352 ) -> Result<Response<Transaction>, Status> {
353 let setting = retry.unwrap_or_else(default_setting);
354 let session = &req.session;
355 invoke_fn(
356 Some(setting),
357 |this| async {
358 let request = this.create_request(format!("session={session}"), req.clone());
359 this.inner.begin_transaction(request).await.map_err(|e| (e, this))
360 },
361 self,
362 )
363 .await
364 }
365
366 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
381 pub async fn commit(
382 &mut self,
383 req: CommitRequest,
384 retry: Option<RetrySetting>,
385 ) -> Result<Response<CommitResponse>, Status> {
386 let setting = retry.unwrap_or_else(default_setting);
387 let session = &req.session;
388 invoke_fn(
389 Some(setting),
390 |this| async {
391 let request = this.create_request(format!("session={session}"), req.clone());
392 this.inner.commit(request).await.map_err(|e| (e, this))
393 },
394 self,
395 )
396 .await
397 }
398
399 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
408 pub async fn rollback(
409 &mut self,
410 req: RollbackRequest,
411 retry: Option<RetrySetting>,
412 ) -> Result<Response<()>, Status> {
413 let setting = retry.unwrap_or_else(default_setting);
414 let session = &req.session;
415 invoke_fn(
416 Some(setting),
417 |this| async {
418 let request = this.create_request(format!("session={session}"), req.clone());
419 this.inner.rollback(request).await.map_err(|e| (e, this))
420 },
421 self,
422 )
423 .await
424 }
425
426 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
438 pub async fn partition_query(
439 &mut self,
440 req: PartitionQueryRequest,
441 retry: Option<RetrySetting>,
442 ) -> Result<Response<PartitionResponse>, Status> {
443 let setting = retry.unwrap_or_else(default_setting);
444 let session = &req.session;
445 invoke_fn(
446 Some(setting),
447 |this| async {
448 let request = this.create_request(format!("session={session}"), req.clone());
449 this.inner.partition_query(request).await.map_err(|e| (e, this))
450 },
451 self,
452 )
453 .await
454 }
455
456 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
470 pub async fn partition_read(
471 &mut self,
472 req: PartitionReadRequest,
473 retry: Option<RetrySetting>,
474 ) -> Result<Response<PartitionResponse>, Status> {
475 let setting = retry.unwrap_or_else(default_setting);
476 let session = &req.session;
477 invoke_fn(
478 Some(setting),
479 |this| async {
480 let request = this.create_request(format!("session={session}"), req.clone());
481 this.inner.partition_read(request).await.map_err(|e| (e, this))
482 },
483 self,
484 )
485 .await
486 }
487
488 fn create_request<T>(&self, param_string: String, into_request: impl grpc::IntoRequest<T>) -> grpc::Request<T> {
489 let mut req = create_request(param_string, into_request);
490 let target = req.metadata_mut();
491 for entry in self.metadata.iter() {
492 match entry {
493 KeyAndValueRef::Ascii(k, v) => {
494 target.append(k, v.clone());
495 }
496 KeyAndValueRef::Binary(k, v) => {
497 target.append_bin(k, v.clone());
498 }
499 }
500 }
501 req
502 }
503}