//! Spanner gRPC client wrapper — gcloud_spanner/apiv1/spanner_client.rs
1use std::sync::Arc;
2use std::time::Duration;
3
4use google_cloud_gax::conn::Channel;
5use google_cloud_gax::grpc::metadata::{KeyAndValueRef, MetadataMap};
6use google_cloud_gax::grpc::{Code, Response, Status, Streaming};
7use google_cloud_gax::retry::{invoke_fn, RetrySetting};
8use google_cloud_gax::{create_request, grpc};
9use google_cloud_googleapis::spanner::v1::spanner_client::SpannerClient;
10use google_cloud_googleapis::spanner::v1::{
11    BatchCreateSessionsRequest, BatchCreateSessionsResponse, BeginTransactionRequest, CommitRequest, CommitResponse,
12    CreateSessionRequest, DeleteSessionRequest, ExecuteBatchDmlRequest, ExecuteBatchDmlResponse, ExecuteSqlRequest,
13    GetSessionRequest, ListSessionsRequest, ListSessionsResponse, PartialResultSet, PartitionQueryRequest,
14    PartitionReadRequest, PartitionResponse, ReadRequest, ResultSet, RollbackRequest, Session, Transaction,
15};
16
17use crate::metrics::MetricsRecorder;
18
19const ROUTE_TO_LEADER_HEADER: &str = "x-goog-spanner-route-to-leader";
20
21pub(crate) fn ping_query_request(session_name: impl Into<String>) -> ExecuteSqlRequest {
22    ExecuteSqlRequest {
23        session: session_name.into(),
24        transaction: None,
25        sql: "SELECT 1".to_string(),
26        params: None,
27        param_types: Default::default(),
28        resume_token: vec![],
29        query_mode: 0,
30        partition_token: vec![],
31        seqno: 0,
32        query_options: None,
33        request_options: None,
34        directed_read_options: None,
35        data_boost_enabled: false,
36        last_statement: false,
37    }
38}
39
40fn default_setting() -> RetrySetting {
41    RetrySetting {
42        from_millis: 50,
43        max_delay: Some(Duration::from_secs(10)),
44        factor: 1u64,
45        take: 20,
46        codes: vec![Code::Unavailable, Code::Unknown],
47    }
48}
49
/// Thin wrapper around the generated Spanner gRPC client that attaches
/// per-request metadata and records server-timing metrics for each RPC.
#[derive(Clone)]
pub struct Client {
    /// Generated tonic client over the shared channel.
    inner: SpannerClient<Channel>,
    /// Extra metadata appended to every outgoing request (see `create_request`).
    metadata: MetadataMap,
    /// Shared metrics recorder; cloned (refcount bump) per call site.
    metrics: Arc<MetricsRecorder>,
}
56
57impl Client {
58    /// create new spanner client
59    pub fn new(inner: SpannerClient<Channel>) -> Client {
60        // https://github.com/googleapis/google-cloud-go/blob/65a9ba55ed3777f520bd881d891e8917323549a5/spanner/apiv1/spanner_client.go#L73
61        Client {
62            inner: inner.max_decoding_message_size(i32::MAX as usize),
63            metadata: Default::default(),
64            metrics: Arc::new(MetricsRecorder::default()),
65        }
66    }
67
68    /// set metadata for spanner client
69    pub(crate) fn with_metadata(self, metadata: MetadataMap) -> Client {
70        Client {
71            inner: self.inner,
72            metadata,
73            metrics: self.metrics,
74        }
75    }
76
77    pub(crate) fn with_metrics(mut self, metrics: Arc<MetricsRecorder>) -> Client {
78        self.metrics = metrics;
79        self
80    }
81
82    /// create_session creates a new session. A session can be used to perform
83    /// transactions that read and/or modify data in a Cloud Spanner database.
84    /// Sessions are meant to be reused for many consecutive
85    /// transactions.
86    ///
87    /// Sessions can only execute one transaction at a time. To execute
88    /// multiple concurrent read-write/write-only transactions, create
89    /// multiple sessions. Note that standalone reads and queries use a
90    /// transaction internally, and count toward the one transaction
91    /// limit.
92    ///
93    /// Active sessions use additional server resources, so it is a good idea to
94    /// delete idle and unneeded sessions.
95    /// Aside from explicit deletes, Cloud Spanner may delete sessions for which no
96    /// operations are sent for more than an hour. If a session is deleted,
97    /// requests to it return NOT_FOUND.
98    ///
99    /// Idle sessions can be kept alive by sending a trivial SQL query
100    /// periodically, e.g., "SELECT 1".
101    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
102    pub async fn create_session(
103        &mut self,
104        req: CreateSessionRequest,
105        disable_route_to_leader: bool,
106        retry: Option<RetrySetting>,
107    ) -> Result<Response<Session>, Status> {
108        let setting = retry.unwrap_or_else(default_setting);
109        let database = &req.database;
110        let metrics = Arc::clone(&self.metrics);
111        invoke_fn(
112            Some(setting),
113            |this| async {
114                let request = this.create_request(disable_route_to_leader, format!("database={database}"), req.clone());
115                this.inner.create_session(request).await.map_err(|e| (e, this))
116            },
117            self,
118        )
119        .await
120        .inspect(move |response| {
121            Self::record_gfe(metrics.as_ref(), "createSession", response);
122        })
123    }
124
125    /// batch_create_sessions creates multiple new sessions.
126    ///
127    /// This API can be used to initialize a session cache on the clients.
128    /// See https:///goo.gl/TgSFN2 (at https:///goo.gl/TgSFN2) for best practices on session cache management.
129    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
130    pub async fn batch_create_sessions(
131        &mut self,
132        req: BatchCreateSessionsRequest,
133        disable_route_to_leader: bool,
134        retry: Option<RetrySetting>,
135    ) -> Result<Response<BatchCreateSessionsResponse>, Status> {
136        let setting = retry.unwrap_or_else(default_setting);
137        let database = &req.database;
138        let metrics = Arc::clone(&self.metrics);
139        invoke_fn(
140            Some(setting),
141            |this| async {
142                let request = this.create_request(disable_route_to_leader, format!("database={database}"), req.clone());
143                this.inner.batch_create_sessions(request).await.map_err(|e| (e, this))
144            },
145            self,
146        )
147        .await
148        .inspect(move |response| {
149            Self::record_gfe(metrics.as_ref(), "batchCreateSessions", response);
150        })
151    }
152
153    /// get_session gets a session. Returns NOT_FOUND if the session does not exist.
154    /// This is mainly useful for determining whether a session is still alive.
155    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
156    pub async fn get_session(
157        &mut self,
158        req: GetSessionRequest,
159        disable_route_to_leader: bool,
160        retry: Option<RetrySetting>,
161    ) -> Result<Response<Session>, Status> {
162        let setting = retry.unwrap_or_else(default_setting);
163        let name = &req.name;
164        let metrics = Arc::clone(&self.metrics);
165        invoke_fn(
166            Some(setting),
167            |this| async {
168                let request = this.create_request(disable_route_to_leader, format!("name={name}"), req.clone());
169                this.inner.get_session(request).await.map_err(|e| (e, this))
170            },
171            self,
172        )
173        .await
174        .inspect(move |response| {
175            Self::record_gfe(metrics.as_ref(), "getSession", response);
176        })
177    }
178
179    /// list_sessions lists all sessions in a given database.
180    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
181    pub async fn list_sessions(
182        &mut self,
183        req: ListSessionsRequest,
184        disable_route_to_leader: bool,
185        retry: Option<RetrySetting>,
186    ) -> Result<Response<ListSessionsResponse>, Status> {
187        let setting = retry.unwrap_or_else(default_setting);
188        let database = &req.database;
189        let metrics = Arc::clone(&self.metrics);
190        invoke_fn(
191            Some(setting),
192            |this| async {
193                let request = this.create_request(disable_route_to_leader, format!("database={database}"), req.clone());
194                this.inner.list_sessions(request).await.map_err(|e| (e, this))
195            },
196            self,
197        )
198        .await
199        .inspect(move |response| {
200            Self::record_gfe(metrics.as_ref(), "listSessions", response);
201        })
202    }
203
204    /// delete_session ends a session, releasing server resources associated with it. This will
205    /// asynchronously trigger cancellation of any operations that are running with
206    /// this session.
207    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
208    pub async fn delete_session(
209        &mut self,
210        req: DeleteSessionRequest,
211        disable_route_to_leader: bool,
212        retry: Option<RetrySetting>,
213    ) -> Result<Response<()>, Status> {
214        let setting = retry.unwrap_or_else(default_setting);
215        let name = &req.name;
216        let metrics = Arc::clone(&self.metrics);
217        invoke_fn(
218            Some(setting),
219            |this| async {
220                let request = this.create_request(disable_route_to_leader, format!("name={name}"), req.clone());
221                this.inner.delete_session(request).await.map_err(|e| (e, this))
222            },
223            self,
224        )
225        .await
226        .inspect(move |response| {
227            Self::record_gfe(metrics.as_ref(), "deleteSession", response);
228        })
229    }
230
231    /// execute_sql executes an SQL statement, returning all results in a single reply. This
232    /// method cannot be used to return a result set larger than 10 MiB;
233    /// if the query yields more data than that, the query fails with
234    /// a FAILED_PRECONDITION error.
235    ///
236    /// Operations inside read-write transactions might return ABORTED. If
237    /// this occurs, the application should restart the transaction from
238    /// the beginning. See Transaction for more details.
239    ///
240    /// Larger result sets can be fetched in streaming fashion by calling
241    /// ExecuteStreamingSql instead.
242    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
243    pub async fn execute_sql(
244        &mut self,
245        req: ExecuteSqlRequest,
246        disable_route_to_leader: bool,
247        retry: Option<RetrySetting>,
248    ) -> Result<Response<ResultSet>, Status> {
249        let setting = retry.unwrap_or_else(default_setting);
250        let session = &req.session;
251        let metrics = Arc::clone(&self.metrics);
252        invoke_fn(
253            Some(setting),
254            |this| async {
255                let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
256                this.inner.execute_sql(request).await.map_err(|e| (e, this))
257            },
258            self,
259        )
260        .await
261        .inspect(move |response| {
262            Self::record_gfe(metrics.as_ref(), "executeSql", response);
263        })
264    }
265
266    /// execute_streaming_sql like ExecuteSql, except returns the result
267    /// set as a stream. Unlike ExecuteSql, there
268    /// is no limit on the size of the returned result set. However, no
269    /// individual row in the result set can exceed 100 MiB, and no
270    /// column value can exceed 10 MiB.
271    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
272    pub async fn execute_streaming_sql(
273        &mut self,
274        req: ExecuteSqlRequest,
275        disable_route_to_leader: bool,
276        retry: Option<RetrySetting>,
277    ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
278        let setting = retry.unwrap_or_else(default_setting);
279        let session = &req.session;
280        let metrics = Arc::clone(&self.metrics);
281        invoke_fn(
282            Some(setting),
283            |this| async {
284                let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
285                this.inner.execute_streaming_sql(request).await.map_err(|e| (e, this))
286            },
287            self,
288        )
289        .await
290        .inspect(move |response| {
291            Self::record_gfe(metrics.as_ref(), "executeStreamingSql", response);
292        })
293    }
294
    /// execute_batch_dml executes a batch of SQL DML statements. This method allows many statements
    /// to be run with lower latency than submitting them sequentially with
    /// ExecuteSql.
    ///
    /// Statements are executed in sequential order. A request can succeed even if
    /// a statement fails. The ExecuteBatchDmlResponse.status field in the
    /// response provides information about the statement that failed. Clients must
    /// inspect this field to determine whether an error occurred.
    ///
    /// Execution stops after the first failed statement; the remaining statements
    /// are not executed.
    ///
    /// Note: unlike the other wrappers here, a non-OK status embedded in an
    /// otherwise-successful response is converted into an `Err`, so the retry
    /// layer can retry it when its code is retryable.
    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
    pub async fn execute_batch_dml(
        &mut self,
        req: ExecuteBatchDmlRequest,
        disable_route_to_leader: bool,
        retry: Option<RetrySetting>,
    ) -> Result<Response<ExecuteBatchDmlResponse>, Status> {
        let setting = retry.unwrap_or_else(default_setting);
        let session = &req.session;
        let metrics = Arc::clone(&self.metrics);
        invoke_fn(
            Some(setting),
            |this| async {
                let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
                let result = this.inner.execute_batch_dml(request).await;
                match result {
                    // The RPC itself succeeded; an individual statement may still
                    // have failed, which is reported via the embedded `status`.
                    Ok(response) => match response.get_ref().status.as_ref() {
                        Some(s) => {
                            let code = Code::from(s.code);
                            if code == Code::Ok {
                                Ok(response)
                            } else {
                                // Surface the statement failure as a Status error so
                                // `invoke_fn` can decide whether to retry it.
                                Err((Status::new(code, s.message.to_string()), this))
                            }
                        }
                        // No embedded status: treat the whole batch as successful.
                        None => Ok(response),
                    },
                    // Transport/RPC-level failure.
                    Err(err) => Err((err, this)),
                }
            },
            self,
        )
        .await
        .inspect(move |response| {
            // Metrics are recorded only for successful responses.
            Self::record_gfe(metrics.as_ref(), "executeBatchDml", response);
        })
    }
343
344    /// read reads rows from the database using key lookups and scans, as a
345    /// simple key/value style alternative to
346    /// ExecuteSql.  This method cannot be used to
347    /// return a result set larger than 10 MiB; if the read matches more
348    /// data than that, the read fails with a FAILED_PRECONDITION
349    /// error.
350    ///
351    /// Reads inside read-write transactions might return ABORTED. If
352    /// this occurs, the application should restart the transaction from
353    /// the beginning. See Transaction for more details.
354    ///
355    /// Larger result sets can be yielded in streaming fashion by calling
356    /// StreamingRead instead.
357    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
358    pub async fn read(
359        &mut self,
360        req: ReadRequest,
361        disable_route_to_leader: bool,
362        retry: Option<RetrySetting>,
363    ) -> Result<Response<ResultSet>, Status> {
364        let setting = retry.unwrap_or_else(default_setting);
365        let session = &req.session;
366        let metrics = Arc::clone(&self.metrics);
367        invoke_fn(
368            Some(setting),
369            |this| async {
370                let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
371                this.inner.read(request).await.map_err(|e| (e, this))
372            },
373            self,
374        )
375        .await
376        .inspect(move |response| {
377            Self::record_gfe(metrics.as_ref(), "read", response);
378        })
379    }
380
381    /// streaming_read like read, except returns the result set as a
382    /// stream. Unlike read, there is no limit on the
383    /// size of the returned result set. However, no individual row in
384    /// the result set can exceed 100 MiB, and no column value can exceed
385    /// 10 MiB.
386    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
387    pub async fn streaming_read(
388        &mut self,
389        req: ReadRequest,
390        disable_route_to_leader: bool,
391        retry: Option<RetrySetting>,
392    ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
393        let setting = retry.unwrap_or_else(default_setting);
394        let session = &req.session;
395        let metrics = Arc::clone(&self.metrics);
396        invoke_fn(
397            Some(setting),
398            |this| async {
399                let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
400                this.inner.streaming_read(request).await.map_err(|e| (e, this))
401            },
402            self,
403        )
404        .await
405        .inspect(move |response| {
406            Self::record_gfe(metrics.as_ref(), "streamingRead", response);
407        })
408    }
409
410    /// BeginTransaction begins a new transaction. This step can often be skipped:
411    /// Read, ExecuteSql and
412    /// Commit can begin a new transaction as a
413    /// side-effect.
414    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
415    pub async fn begin_transaction(
416        &mut self,
417        req: BeginTransactionRequest,
418        disable_route_to_leader: bool,
419        retry: Option<RetrySetting>,
420    ) -> Result<Response<Transaction>, Status> {
421        let setting = retry.unwrap_or_else(default_setting);
422        let session = &req.session;
423        let metrics = Arc::clone(&self.metrics);
424        invoke_fn(
425            Some(setting),
426            |this| async {
427                let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
428                this.inner.begin_transaction(request).await.map_err(|e| (e, this))
429            },
430            self,
431        )
432        .await
433        .inspect(move |response| {
434            Self::record_gfe(metrics.as_ref(), "beginTransaction", response);
435        })
436    }
437
438    /// Commit commits a transaction. The request includes the mutations to be
439    /// applied to rows in the database.
440    ///
441    /// Commit might return an ABORTED error. This can occur at any time;
442    /// commonly, the cause is conflicts with concurrent
443    /// transactions. However, it can also happen for a variety of other
444    /// reasons. If Commit returns ABORTED, the caller should re-attempt
445    /// the transaction from the beginning, re-using the same session.
446    ///
447    /// On very rare occasions, Commit might return UNKNOWN. This can happen,
448    /// for example, if the client job experiences a 1+ hour networking failure.
449    /// At that point, Cloud Spanner has lost track of the transaction outcome and
450    /// we recommend that you perform another read from the database to see the
451    /// state of things as they are now.
452    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
453    pub async fn commit(
454        &mut self,
455        req: CommitRequest,
456        disable_route_to_leader: bool,
457        retry: Option<RetrySetting>,
458    ) -> Result<Response<CommitResponse>, Status> {
459        let setting = retry.unwrap_or_else(default_setting);
460        let session = &req.session;
461        let metrics = Arc::clone(&self.metrics);
462        invoke_fn(
463            Some(setting),
464            |this| async {
465                let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
466                this.inner.commit(request).await.map_err(|e| (e, this))
467            },
468            self,
469        )
470        .await
471        .inspect(move |response| {
472            Self::record_gfe(metrics.as_ref(), "commit", response);
473        })
474    }
475
476    /// Rollback rolls back a transaction, releasing any locks it holds. It is a good
477    /// idea to call this for any transaction that includes one or more
478    /// Read or ExecuteSql requests and
479    /// ultimately decides not to commit.
480    ///
481    /// Rollback returns OK if it successfully aborts the transaction, the
482    /// transaction was already aborted, or the transaction is not
483    /// found. Rollback never returns ABORTED.
484    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
485    pub async fn rollback(
486        &mut self,
487        req: RollbackRequest,
488        disable_route_to_leader: bool,
489        retry: Option<RetrySetting>,
490    ) -> Result<Response<()>, Status> {
491        let setting = retry.unwrap_or_else(default_setting);
492        let session = &req.session;
493        let metrics = Arc::clone(&self.metrics);
494        invoke_fn(
495            Some(setting),
496            |this| async {
497                let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
498                this.inner.rollback(request).await.map_err(|e| (e, this))
499            },
500            self,
501        )
502        .await
503        .inspect(move |response| {
504            Self::record_gfe(metrics.as_ref(), "rollback", response);
505        })
506    }
507
508    /// PartitionQuery creates a set of partition tokens that can be used to execute a query
509    /// operation in parallel.  Each of the returned partition tokens can be used
510    /// by ExecuteStreamingSql to specify a subset
511    /// of the query result to read.  The same session and read-only transaction
512    /// must be used by the PartitionQueryRequest used to create the
513    /// partition tokens and the ExecuteSqlRequests that use the partition tokens.
514    ///
515    /// Partition tokens become invalid when the session used to create them
516    /// is deleted, is idle for too long, begins a new transaction, or becomes too
517    /// old.  When any of these happen, it is not possible to resume the query, and
518    /// the whole operation must be restarted from the beginning.
519    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
520    pub async fn partition_query(
521        &mut self,
522        req: PartitionQueryRequest,
523        disable_route_to_leader: bool,
524        retry: Option<RetrySetting>,
525    ) -> Result<Response<PartitionResponse>, Status> {
526        let setting = retry.unwrap_or_else(default_setting);
527        let session = &req.session;
528        let metrics = Arc::clone(&self.metrics);
529        invoke_fn(
530            Some(setting),
531            |this| async {
532                let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
533                this.inner.partition_query(request).await.map_err(|e| (e, this))
534            },
535            self,
536        )
537        .await
538        .inspect(move |response| {
539            Self::record_gfe(metrics.as_ref(), "partitionQuery", response);
540        })
541    }
542
543    /// PartitionRead creates a set of partition tokens that can be used to execute a read
544    /// operation in parallel.  Each of the returned partition tokens can be used
545    /// by StreamingRead to specify a subset of the read
546    /// result to read.  The same session and read-only transaction must be used by
547    /// the PartitionReadRequest used to create the partition tokens and the
548    /// ReadRequests that use the partition tokens.  There are no ordering
549    /// guarantees on rows returned among the returned partition tokens, or even
550    /// within each individual StreamingRead call issued with a partition_token.
551    ///
552    /// Partition tokens become invalid when the session used to create them
553    /// is deleted, is idle for too long, begins a new transaction, or becomes too
554    /// old.  When any of these happen, it is not possible to resume the read, and
555    /// the whole operation must be restarted from the beginning.
556    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
557    pub async fn partition_read(
558        &mut self,
559        req: PartitionReadRequest,
560        disable_route_to_leader: bool,
561        retry: Option<RetrySetting>,
562    ) -> Result<Response<PartitionResponse>, Status> {
563        let setting = retry.unwrap_or_else(default_setting);
564        let session = &req.session;
565        let metrics = Arc::clone(&self.metrics);
566        invoke_fn(
567            Some(setting),
568            |this| async {
569                let request = this.create_request(disable_route_to_leader, format!("session={session}"), req.clone());
570                this.inner.partition_read(request).await.map_err(|e| (e, this))
571            },
572            self,
573        )
574        .await
575        .inspect(move |response| {
576            Self::record_gfe(metrics.as_ref(), "partitionRead", response);
577        })
578    }
579
580    fn create_request<T>(
581        &self,
582        disable_route_to_leader: bool,
583        param_string: String,
584        into_request: impl grpc::IntoRequest<T>,
585    ) -> grpc::Request<T> {
586        let mut req = create_request(param_string, into_request);
587        let target = req.metadata_mut();
588        if !disable_route_to_leader {
589            target.append(ROUTE_TO_LEADER_HEADER, "true".parse().unwrap());
590        }
591        for entry in self.metadata.iter() {
592            match entry {
593                KeyAndValueRef::Ascii(k, v) => {
594                    target.append(k, v.clone());
595                }
596                KeyAndValueRef::Binary(k, v) => {
597                    target.append_bin(k, v.clone());
598                }
599            }
600        }
601        req
602    }
603
    /// Forwards the response metadata for `method` to the metrics recorder
    /// (used to capture GFE server-timing information from response headers).
    fn record_gfe<T>(metrics: &MetricsRecorder, method: &'static str, response: &Response<T>) {
        metrics.record_server_timing(method, response.metadata());
    }
607}