1use std::sync::Arc;
2use std::time::Duration;
3
4use google_cloud_gax::conn::Channel;
5use google_cloud_gax::grpc::metadata::{KeyAndValueRef, MetadataMap};
6use google_cloud_gax::grpc::{Code, Response, Status, Streaming};
7use google_cloud_gax::retry::{invoke_fn, RetrySetting};
8use google_cloud_gax::{create_request, grpc};
9use google_cloud_googleapis::spanner::v1::spanner_client::SpannerClient;
10use google_cloud_googleapis::spanner::v1::{
11 BatchCreateSessionsRequest, BatchCreateSessionsResponse, BeginTransactionRequest, CommitRequest, CommitResponse,
12 CreateSessionRequest, DeleteSessionRequest, ExecuteBatchDmlRequest, ExecuteBatchDmlResponse, ExecuteSqlRequest,
13 GetSessionRequest, ListSessionsRequest, ListSessionsResponse, PartialResultSet, PartitionQueryRequest,
14 PartitionReadRequest, PartitionResponse, ReadRequest, ResultSet, RollbackRequest, Session, Transaction,
15};
16
17use crate::metrics::MetricsRecorder;
18
/// Metadata key that `Client::create_request` appends with the value `"true"` to an
/// outgoing request unless the caller passes `disable_route_to_leader = true`.
const ROUTE_TO_LEADER_HEADER: &str = "x-goog-spanner-route-to-leader";
20
21pub(crate) fn ping_query_request(session_name: impl Into<String>) -> ExecuteSqlRequest {
22 ExecuteSqlRequest {
23 session: session_name.into(),
24 transaction: None,
25 sql: "SELECT 1".to_string(),
26 params: None,
27 param_types: Default::default(),
28 resume_token: vec![],
29 query_mode: 0,
30 partition_token: vec![],
31 seqno: 0,
32 query_options: None,
33 request_options: None,
34 directed_read_options: None,
35 data_boost_enabled: false,
36 last_statement: false,
37 }
38}
39
40fn default_setting() -> RetrySetting {
41 RetrySetting {
42 from_millis: 50,
43 max_delay: Some(Duration::from_secs(10)),
44 factor: 1u64,
45 take: 20,
46 codes: vec![Code::Unavailable, Code::Unknown],
47 }
48}
49
/// Wrapper around the generated `SpannerClient` that pairs it with extra request
/// metadata and a metrics recorder.
#[derive(Clone)]
pub struct Client {
    /// Generated gRPC client through which every RPC of this wrapper is sent.
    inner: SpannerClient<Channel>,
    /// Extra metadata entries that `create_request` copies onto each outgoing request.
    metadata: MetadataMap,
    /// Recorder that receives the response metadata of successful calls.
    metrics: Arc<MetricsRecorder>,
}
56
57impl Client {
58 pub fn new(inner: SpannerClient<Channel>) -> Client {
60 Client {
62 inner: inner.max_decoding_message_size(i32::MAX as usize),
63 metadata: Default::default(),
64 metrics: Arc::new(MetricsRecorder::default()),
65 }
66 }
67
68 pub(crate) fn with_metadata(self, metadata: MetadataMap) -> Client {
70 Client {
71 inner: self.inner,
72 metadata,
73 metrics: self.metrics,
74 }
75 }
76
77 pub(crate) fn with_metrics(mut self, metrics: Arc<MetricsRecorder>) -> Client {
78 self.metrics = metrics;
79 self
80 }
81
82 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
102 pub async fn create_session(
103 &mut self,
104 req: CreateSessionRequest,
105 disable_route_to_leader: bool,
106 retry: Option<RetrySetting>,
107 ) -> Result<Response<Session>, Status> {
108 let setting = retry.unwrap_or_else(default_setting);
109 let database = &req.database;
110 let metrics = Arc::clone(&self.metrics);
111 invoke_fn(
112 Some(setting),
113 |this| async {
114 let request = this.create_request(disable_route_to_leader, format!("database={database}"), req.clone());
115 this.inner.create_session(request).await.map_err(|e| (e, this))
116 },
117 self,
118 )
119 .await
120 .inspect(move |response| {
121 Self::record_gfe(metrics.as_ref(), "createSession", response);
122 })
123 }
124
125 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
130 pub async fn batch_create_sessions(
131 &mut self,
132 req: BatchCreateSessionsRequest,
133 disable_route_to_leader: bool,
134 retry: Option<RetrySetting>,
135 ) -> Result<Response<BatchCreateSessionsResponse>, Status> {
136 let setting = retry.unwrap_or_else(default_setting);
137 let database = &req.database;
138 let metrics = Arc::clone(&self.metrics);
139 invoke_fn(
140 Some(setting),
141 |this| async {
142 let request = this.create_request(disable_route_to_leader, format!("database={database}"), req.clone());
143 this.inner.batch_create_sessions(request).await.map_err(|e| (e, this))
144 },
145 self,
146 )
147 .await
148 .inspect(move |response| {
149 Self::record_gfe(metrics.as_ref(), "batchCreateSessions", response);
150 })
151 }
152
153 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
156 pub async fn get_session(
157 &mut self,
158 req: GetSessionRequest,
159 disable_route_to_leader: bool,
160 retry: Option<RetrySetting>,
161 ) -> Result<Response<Session>, Status> {
162 let setting = retry.unwrap_or_else(default_setting);
163 let name = &req.name;
164 let metrics = Arc::clone(&self.metrics);
165 invoke_fn(
166 Some(setting),
167 |this| async {
168 let request = this.create_request(disable_route_to_leader, format!("name={name}"), req.clone());
169 this.inner.get_session(request).await.map_err(|e| (e, this))
170 },
171 self,
172 )
173 .await
174 .inspect(move |response| {
175 Self::record_gfe(metrics.as_ref(), "getSession", response);
176 })
177 }
178
179 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
181 pub async fn list_sessions(
182 &mut self,
183 req: ListSessionsRequest,
184 disable_route_to_leader: bool,
185 retry: Option<RetrySetting>,
186 ) -> Result<Response<ListSessionsResponse>, Status> {
187 let setting = retry.unwrap_or_else(default_setting);
188 let database = &req.database;
189 let metrics = Arc::clone(&self.metrics);
190 invoke_fn(
191 Some(setting),
192 |this| async {
193 let request = this.create_request(disable_route_to_leader, format!("database={database}"), req.clone());
194 this.inner.list_sessions(request).await.map_err(|e| (e, this))
195 },
196 self,
197 )
198 .await
199 .inspect(move |response| {
200 Self::record_gfe(metrics.as_ref(), "listSessions", response);
201 })
202 }
203
204 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
208 pub async fn delete_session(
209 &mut self,
210 req: DeleteSessionRequest,
211 disable_route_to_leader: bool,
212 retry: Option<RetrySetting>,
213 ) -> Result<Response<()>, Status> {
214 let setting = retry.unwrap_or_else(default_setting);
215 let name = &req.name;
216 let metrics = Arc::clone(&self.metrics);
217 invoke_fn(
218 Some(setting),
219 |this| async {
220 let request = this.create_request(disable_route_to_leader, format!("name={name}"), req.clone());
221 this.inner.delete_session(request).await.map_err(|e| (e, this))
222 },
223 self,
224 )
225 .await
226 .inspect(move |response| {
227 Self::record_gfe(metrics.as_ref(), "deleteSession", response);
228 })
229 }
230
231 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
243 pub async fn execute_sql(
244 &mut self,
245 req: ExecuteSqlRequest,
246 disable_route_to_leader: bool,
247 retry: Option<RetrySetting>,
248 ) -> Result<Response<ResultSet>, Status> {
249 let setting = retry.unwrap_or_else(default_setting);
250 let session = &req.session;
251 let metrics = Arc::clone(&self.metrics);
252 invoke_fn(
253 Some(setting),
254 |this| async {
255 let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
256 this.inner.execute_sql(request).await.map_err(|e| (e, this))
257 },
258 self,
259 )
260 .await
261 .inspect(move |response| {
262 Self::record_gfe(metrics.as_ref(), "executeSql", response);
263 })
264 }
265
266 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
272 pub async fn execute_streaming_sql(
273 &mut self,
274 req: ExecuteSqlRequest,
275 disable_route_to_leader: bool,
276 retry: Option<RetrySetting>,
277 ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
278 let setting = retry.unwrap_or_else(default_setting);
279 let session = &req.session;
280 let metrics = Arc::clone(&self.metrics);
281 invoke_fn(
282 Some(setting),
283 |this| async {
284 let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
285 this.inner.execute_streaming_sql(request).await.map_err(|e| (e, this))
286 },
287 self,
288 )
289 .await
290 .inspect(move |response| {
291 Self::record_gfe(metrics.as_ref(), "executeStreamingSql", response);
292 })
293 }
294
295 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
307 pub async fn execute_batch_dml(
308 &mut self,
309 req: ExecuteBatchDmlRequest,
310 disable_route_to_leader: bool,
311 retry: Option<RetrySetting>,
312 ) -> Result<Response<ExecuteBatchDmlResponse>, Status> {
313 let setting = retry.unwrap_or_else(default_setting);
314 let session = &req.session;
315 let metrics = Arc::clone(&self.metrics);
316 invoke_fn(
317 Some(setting),
318 |this| async {
319 let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
320 let result = this.inner.execute_batch_dml(request).await;
321 match result {
322 Ok(response) => match response.get_ref().status.as_ref() {
323 Some(s) => {
324 let code = Code::from(s.code);
325 if code == Code::Ok {
326 Ok(response)
327 } else {
328 Err((Status::new(code, s.message.to_string()), this))
329 }
330 }
331 None => Ok(response),
332 },
333 Err(err) => Err((err, this)),
334 }
335 },
336 self,
337 )
338 .await
339 .inspect(move |response| {
340 Self::record_gfe(metrics.as_ref(), "executeBatchDml", response);
341 })
342 }
343
344 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
358 pub async fn read(
359 &mut self,
360 req: ReadRequest,
361 disable_route_to_leader: bool,
362 retry: Option<RetrySetting>,
363 ) -> Result<Response<ResultSet>, Status> {
364 let setting = retry.unwrap_or_else(default_setting);
365 let session = &req.session;
366 let metrics = Arc::clone(&self.metrics);
367 invoke_fn(
368 Some(setting),
369 |this| async {
370 let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
371 this.inner.read(request).await.map_err(|e| (e, this))
372 },
373 self,
374 )
375 .await
376 .inspect(move |response| {
377 Self::record_gfe(metrics.as_ref(), "read", response);
378 })
379 }
380
381 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
387 pub async fn streaming_read(
388 &mut self,
389 req: ReadRequest,
390 disable_route_to_leader: bool,
391 retry: Option<RetrySetting>,
392 ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
393 let setting = retry.unwrap_or_else(default_setting);
394 let session = &req.session;
395 let metrics = Arc::clone(&self.metrics);
396 invoke_fn(
397 Some(setting),
398 |this| async {
399 let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
400 this.inner.streaming_read(request).await.map_err(|e| (e, this))
401 },
402 self,
403 )
404 .await
405 .inspect(move |response| {
406 Self::record_gfe(metrics.as_ref(), "streamingRead", response);
407 })
408 }
409
410 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
415 pub async fn begin_transaction(
416 &mut self,
417 req: BeginTransactionRequest,
418 disable_route_to_leader: bool,
419 retry: Option<RetrySetting>,
420 ) -> Result<Response<Transaction>, Status> {
421 let setting = retry.unwrap_or_else(default_setting);
422 let session = &req.session;
423 let metrics = Arc::clone(&self.metrics);
424 invoke_fn(
425 Some(setting),
426 |this| async {
427 let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
428 this.inner.begin_transaction(request).await.map_err(|e| (e, this))
429 },
430 self,
431 )
432 .await
433 .inspect(move |response| {
434 Self::record_gfe(metrics.as_ref(), "beginTransaction", response);
435 })
436 }
437
438 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
453 pub async fn commit(
454 &mut self,
455 req: CommitRequest,
456 disable_route_to_leader: bool,
457 retry: Option<RetrySetting>,
458 ) -> Result<Response<CommitResponse>, Status> {
459 let setting = retry.unwrap_or_else(default_setting);
460 let session = &req.session;
461 let metrics = Arc::clone(&self.metrics);
462 invoke_fn(
463 Some(setting),
464 |this| async {
465 let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
466 this.inner.commit(request).await.map_err(|e| (e, this))
467 },
468 self,
469 )
470 .await
471 .inspect(move |response| {
472 Self::record_gfe(metrics.as_ref(), "commit", response);
473 })
474 }
475
476 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
485 pub async fn rollback(
486 &mut self,
487 req: RollbackRequest,
488 disable_route_to_leader: bool,
489 retry: Option<RetrySetting>,
490 ) -> Result<Response<()>, Status> {
491 let setting = retry.unwrap_or_else(default_setting);
492 let session = &req.session;
493 let metrics = Arc::clone(&self.metrics);
494 invoke_fn(
495 Some(setting),
496 |this| async {
497 let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
498 this.inner.rollback(request).await.map_err(|e| (e, this))
499 },
500 self,
501 )
502 .await
503 .inspect(move |response| {
504 Self::record_gfe(metrics.as_ref(), "rollback", response);
505 })
506 }
507
508 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
520 pub async fn partition_query(
521 &mut self,
522 req: PartitionQueryRequest,
523 disable_route_to_leader: bool,
524 retry: Option<RetrySetting>,
525 ) -> Result<Response<PartitionResponse>, Status> {
526 let setting = retry.unwrap_or_else(default_setting);
527 let session = &req.session;
528 let metrics = Arc::clone(&self.metrics);
529 invoke_fn(
530 Some(setting),
531 |this| async {
532 let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
533 this.inner.partition_query(request).await.map_err(|e| (e, this))
534 },
535 self,
536 )
537 .await
538 .inspect(move |response| {
539 Self::record_gfe(metrics.as_ref(), "partitionQuery", response);
540 })
541 }
542
543 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
557 pub async fn partition_read(
558 &mut self,
559 req: PartitionReadRequest,
560 disable_route_to_leader: bool,
561 retry: Option<RetrySetting>,
562 ) -> Result<Response<PartitionResponse>, Status> {
563 let setting = retry.unwrap_or_else(default_setting);
564 let session = &req.session;
565 let metrics = Arc::clone(&self.metrics);
566 invoke_fn(
567 Some(setting),
568 |this| async {
569 let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
570 this.inner.partition_read(request).await.map_err(|e| (e, this))
571 },
572 self,
573 )
574 .await
575 .inspect(move |response| {
576 Self::record_gfe(metrics.as_ref(), "partitionRead", response);
577 })
578 }
579
580 fn create_request<T>(
581 &self,
582 disable_route_to_leader: bool,
583 param_string: String,
584 into_request: impl grpc::IntoRequest<T>,
585 ) -> grpc::Request<T> {
586 let mut req = create_request(param_string, into_request);
587 let target = req.metadata_mut();
588 if !disable_route_to_leader {
589 target.append(ROUTE_TO_LEADER_HEADER, "true".parse().unwrap());
590 }
591 for entry in self.metadata.iter() {
592 match entry {
593 KeyAndValueRef::Ascii(k, v) => {
594 target.append(k, v.clone());
595 }
596 KeyAndValueRef::Binary(k, v) => {
597 target.append_bin(k, v.clone());
598 }
599 }
600 }
601 req
602 }
603
604 fn record_gfe<T>(metrics: &MetricsRecorder, method: &'static str, response: &Response<T>) {
605 metrics.record_server_timing(method, response.metadata());
606 }
607}