//! google_cloud_spanner/apiv1/spanner_client.rs

1use std::time::Duration;
2
3use google_cloud_gax::conn::Channel;
4use google_cloud_gax::grpc::metadata::{KeyAndValueRef, MetadataMap};
5use google_cloud_gax::grpc::{Code, Response, Status, Streaming};
6use google_cloud_gax::retry::{invoke_fn, RetrySetting};
7use google_cloud_gax::{create_request, grpc};
8use google_cloud_googleapis::spanner::v1::spanner_client::SpannerClient;
9use google_cloud_googleapis::spanner::v1::{
10    BatchCreateSessionsRequest, BatchCreateSessionsResponse, BeginTransactionRequest, CommitRequest, CommitResponse,
11    CreateSessionRequest, DeleteSessionRequest, ExecuteBatchDmlRequest, ExecuteBatchDmlResponse, ExecuteSqlRequest,
12    GetSessionRequest, ListSessionsRequest, ListSessionsResponse, PartialResultSet, PartitionQueryRequest,
13    PartitionReadRequest, PartitionResponse, ReadRequest, ResultSet, RollbackRequest, Session, Transaction,
14};
15
16pub(crate) fn ping_query_request(session_name: impl Into<String>) -> ExecuteSqlRequest {
17    ExecuteSqlRequest {
18        session: session_name.into(),
19        transaction: None,
20        sql: "SELECT 1".to_string(),
21        params: None,
22        param_types: Default::default(),
23        resume_token: vec![],
24        query_mode: 0,
25        partition_token: vec![],
26        seqno: 0,
27        query_options: None,
28        request_options: None,
29        directed_read_options: None,
30        data_boost_enabled: false,
31    }
32}
33
34fn default_setting() -> RetrySetting {
35    RetrySetting {
36        from_millis: 50,
37        max_delay: Some(Duration::from_secs(10)),
38        factor: 1u64,
39        take: 20,
40        codes: vec![Code::Unavailable, Code::Unknown],
41    }
42}
43
/// Thin wrapper around the generated Spanner gRPC client that attaches
/// client-level metadata to every outgoing request.
#[derive(Clone)]
pub struct Client {
    // Generated tonic client bound to a connection channel.
    inner: SpannerClient<Channel>,
    // Headers appended to every request built through `create_request`.
    metadata: MetadataMap,
}
49
50impl Client {
51    /// create new spanner client
52    pub fn new(inner: SpannerClient<Channel>) -> Client {
53        // https://github.com/googleapis/google-cloud-go/blob/65a9ba55ed3777f520bd881d891e8917323549a5/spanner/apiv1/spanner_client.go#L73
54        Client {
55            inner: inner.max_decoding_message_size(i32::MAX as usize),
56            metadata: Default::default(),
57        }
58    }
59
60    /// set metadata for spanner client
61    pub(crate) fn with_metadata(self, metadata: MetadataMap) -> Client {
62        Client {
63            inner: self.inner,
64            metadata,
65        }
66    }
67
68    /// create_session creates a new session. A session can be used to perform
69    /// transactions that read and/or modify data in a Cloud Spanner database.
70    /// Sessions are meant to be reused for many consecutive
71    /// transactions.
72    ///
73    /// Sessions can only execute one transaction at a time. To execute
74    /// multiple concurrent read-write/write-only transactions, create
75    /// multiple sessions. Note that standalone reads and queries use a
76    /// transaction internally, and count toward the one transaction
77    /// limit.
78    ///
79    /// Active sessions use additional server resources, so it is a good idea to
80    /// delete idle and unneeded sessions.
81    /// Aside from explicit deletes, Cloud Spanner may delete sessions for which no
82    /// operations are sent for more than an hour. If a session is deleted,
83    /// requests to it return NOT_FOUND.
84    ///
85    /// Idle sessions can be kept alive by sending a trivial SQL query
86    /// periodically, e.g., "SELECT 1".
87    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
88    pub async fn create_session(
89        &mut self,
90        req: CreateSessionRequest,
91        retry: Option<RetrySetting>,
92    ) -> Result<Response<Session>, Status> {
93        let setting = retry.unwrap_or_else(default_setting);
94        let database = &req.database;
95        invoke_fn(
96            Some(setting),
97            |this| async {
98                let request = this.create_request(format!("database={database}"), req.clone());
99                this.inner.create_session(request).await.map_err(|e| (e, this))
100            },
101            self,
102        )
103        .await
104    }
105
106    /// batch_create_sessions creates multiple new sessions.
107    ///
108    /// This API can be used to initialize a session cache on the clients.
109    /// See https:///goo.gl/TgSFN2 (at https:///goo.gl/TgSFN2) for best practices on session cache management.
110    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
111    pub async fn batch_create_sessions(
112        &mut self,
113        req: BatchCreateSessionsRequest,
114        retry: Option<RetrySetting>,
115    ) -> Result<Response<BatchCreateSessionsResponse>, Status> {
116        let setting = retry.unwrap_or_else(default_setting);
117        let database = &req.database;
118        invoke_fn(
119            Some(setting),
120            |this| async {
121                let request = this.create_request(format!("database={database}"), req.clone());
122                this.inner.batch_create_sessions(request).await.map_err(|e| (e, this))
123            },
124            self,
125        )
126        .await
127    }
128
129    /// get_session gets a session. Returns NOT_FOUND if the session does not exist.
130    /// This is mainly useful for determining whether a session is still alive.
131    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
132    pub async fn get_session(
133        &mut self,
134        req: GetSessionRequest,
135        retry: Option<RetrySetting>,
136    ) -> Result<Response<Session>, Status> {
137        let setting = retry.unwrap_or_else(default_setting);
138        let name = &req.name;
139        invoke_fn(
140            Some(setting),
141            |this| async {
142                let request = this.create_request(format!("name={name}"), req.clone());
143                this.inner.get_session(request).await.map_err(|e| (e, this))
144            },
145            self,
146        )
147        .await
148    }
149
150    /// list_sessions lists all sessions in a given database.
151    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
152    pub async fn list_sessions(
153        &mut self,
154        req: ListSessionsRequest,
155        retry: Option<RetrySetting>,
156    ) -> Result<Response<ListSessionsResponse>, Status> {
157        let setting = retry.unwrap_or_else(default_setting);
158        let database = &req.database;
159        invoke_fn(
160            Some(setting),
161            |this| async {
162                let request = this.create_request(format!("database={database}"), req.clone());
163                this.inner.list_sessions(request).await.map_err(|e| (e, this))
164            },
165            self,
166        )
167        .await
168    }
169
170    /// delete_session ends a session, releasing server resources associated with it. This will
171    /// asynchronously trigger cancellation of any operations that are running with
172    /// this session.
173    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
174    pub async fn delete_session(
175        &mut self,
176        req: DeleteSessionRequest,
177        retry: Option<RetrySetting>,
178    ) -> Result<Response<()>, Status> {
179        let setting = retry.unwrap_or_else(default_setting);
180        let name = &req.name;
181        invoke_fn(
182            Some(setting),
183            |this| async {
184                let request = this.create_request(format!("name={name}"), req.clone());
185                this.inner.delete_session(request).await.map_err(|e| (e, this))
186            },
187            self,
188        )
189        .await
190    }
191
192    /// execute_sql executes an SQL statement, returning all results in a single reply. This
193    /// method cannot be used to return a result set larger than 10 MiB;
194    /// if the query yields more data than that, the query fails with
195    /// a FAILED_PRECONDITION error.
196    ///
197    /// Operations inside read-write transactions might return ABORTED. If
198    /// this occurs, the application should restart the transaction from
199    /// the beginning. See Transaction for more details.
200    ///
201    /// Larger result sets can be fetched in streaming fashion by calling
202    /// ExecuteStreamingSql instead.
203    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
204    pub async fn execute_sql(
205        &mut self,
206        req: ExecuteSqlRequest,
207        retry: Option<RetrySetting>,
208    ) -> Result<Response<ResultSet>, Status> {
209        let setting = retry.unwrap_or_else(default_setting);
210        let session = &req.session;
211        invoke_fn(
212            Some(setting),
213            |this| async {
214                let request = this.create_request(format!("session={session}"), req.clone());
215                this.inner.execute_sql(request).await.map_err(|e| (e, this))
216            },
217            self,
218        )
219        .await
220    }
221
222    /// execute_streaming_sql like ExecuteSql, except returns the result
223    /// set as a stream. Unlike ExecuteSql, there
224    /// is no limit on the size of the returned result set. However, no
225    /// individual row in the result set can exceed 100 MiB, and no
226    /// column value can exceed 10 MiB.
227    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
228    pub async fn execute_streaming_sql(
229        &mut self,
230        req: ExecuteSqlRequest,
231        retry: Option<RetrySetting>,
232    ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
233        let setting = retry.unwrap_or_else(default_setting);
234        let session = &req.session;
235        invoke_fn(
236            Some(setting),
237            |this| async {
238                let request = this.create_request(format!("session={session}"), req.clone());
239                this.inner.execute_streaming_sql(request).await.map_err(|e| (e, this))
240            },
241            self,
242        )
243        .await
244    }
245
246    /// execute_batch_dml executes a batch of SQL DML statements. This method allows many statements
247    /// to be run with lower latency than submitting them sequentially with
248    /// ExecuteSql.
249    ///
250    /// Statements are executed in sequential order. A request can succeed even if
251    /// a statement fails. The ExecuteBatchDmlResponse.status field in the
252    /// response provides information about the statement that failed. Clients must
253    /// inspect this field to determine whether an error occurred.
254    ///
255    /// Execution stops after the first failed statement; the remaining statements
256    /// are not executed.
257    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
258    pub async fn execute_batch_dml(
259        &mut self,
260        req: ExecuteBatchDmlRequest,
261        retry: Option<RetrySetting>,
262    ) -> Result<Response<ExecuteBatchDmlResponse>, Status> {
263        let setting = retry.unwrap_or_else(default_setting);
264        let session = &req.session;
265        invoke_fn(
266            Some(setting),
267            |this| async {
268                let request = this.create_request(format!("session={session}"), req.clone());
269                let result = this.inner.execute_batch_dml(request).await;
270                match result {
271                    Ok(response) => match response.get_ref().status.as_ref() {
272                        Some(s) => {
273                            let code = Code::from(s.code);
274                            if code == Code::Ok {
275                                Ok(response)
276                            } else {
277                                Err((Status::new(code, s.message.to_string()), this))
278                            }
279                        }
280                        None => Ok(response),
281                    },
282                    Err(err) => Err((err, this)),
283                }
284            },
285            self,
286        )
287        .await
288    }
289
290    /// read reads rows from the database using key lookups and scans, as a
291    /// simple key/value style alternative to
292    /// ExecuteSql.  This method cannot be used to
293    /// return a result set larger than 10 MiB; if the read matches more
294    /// data than that, the read fails with a FAILED_PRECONDITION
295    /// error.
296    ///
297    /// Reads inside read-write transactions might return ABORTED. If
298    /// this occurs, the application should restart the transaction from
299    /// the beginning. See Transaction for more details.
300    ///
301    /// Larger result sets can be yielded in streaming fashion by calling
302    /// StreamingRead instead.
303    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
304    pub async fn read(&mut self, req: ReadRequest, retry: Option<RetrySetting>) -> Result<Response<ResultSet>, Status> {
305        let setting = retry.unwrap_or_else(default_setting);
306        let session = &req.session;
307        invoke_fn(
308            Some(setting),
309            |this| async {
310                let request = this.create_request(format!("session={session}"), req.clone());
311                this.inner.read(request).await.map_err(|e| (e, this))
312            },
313            self,
314        )
315        .await
316    }
317
318    /// streaming_read like read, except returns the result set as a
319    /// stream. Unlike read, there is no limit on the
320    /// size of the returned result set. However, no individual row in
321    /// the result set can exceed 100 MiB, and no column value can exceed
322    /// 10 MiB.
323    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
324    pub async fn streaming_read(
325        &mut self,
326        req: ReadRequest,
327        retry: Option<RetrySetting>,
328    ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
329        let setting = retry.unwrap_or_else(default_setting);
330        let session = &req.session;
331        invoke_fn(
332            Some(setting),
333            |this| async {
334                let request = this.create_request(format!("session={session}"), req.clone());
335                this.inner.streaming_read(request).await.map_err(|e| (e, this))
336            },
337            self,
338        )
339        .await
340    }
341
342    /// BeginTransaction begins a new transaction. This step can often be skipped:
343    /// Read, ExecuteSql and
344    /// Commit can begin a new transaction as a
345    /// side-effect.
346    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
347    pub async fn begin_transaction(
348        &mut self,
349        req: BeginTransactionRequest,
350        retry: Option<RetrySetting>,
351    ) -> Result<Response<Transaction>, Status> {
352        let setting = retry.unwrap_or_else(default_setting);
353        let session = &req.session;
354        invoke_fn(
355            Some(setting),
356            |this| async {
357                let request = this.create_request(format!("session={session}"), req.clone());
358                this.inner.begin_transaction(request).await.map_err(|e| (e, this))
359            },
360            self,
361        )
362        .await
363    }
364
365    /// Commit commits a transaction. The request includes the mutations to be
366    /// applied to rows in the database.
367    ///
368    /// Commit might return an ABORTED error. This can occur at any time;
369    /// commonly, the cause is conflicts with concurrent
370    /// transactions. However, it can also happen for a variety of other
371    /// reasons. If Commit returns ABORTED, the caller should re-attempt
372    /// the transaction from the beginning, re-using the same session.
373    ///
374    /// On very rare occasions, Commit might return UNKNOWN. This can happen,
375    /// for example, if the client job experiences a 1+ hour networking failure.
376    /// At that point, Cloud Spanner has lost track of the transaction outcome and
377    /// we recommend that you perform another read from the database to see the
378    /// state of things as they are now.
379    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
380    pub async fn commit(
381        &mut self,
382        req: CommitRequest,
383        retry: Option<RetrySetting>,
384    ) -> Result<Response<CommitResponse>, Status> {
385        let setting = retry.unwrap_or_else(default_setting);
386        let session = &req.session;
387        invoke_fn(
388            Some(setting),
389            |this| async {
390                let request = this.create_request(format!("session={session}"), req.clone());
391                this.inner.commit(request).await.map_err(|e| (e, this))
392            },
393            self,
394        )
395        .await
396    }
397
398    /// Rollback rolls back a transaction, releasing any locks it holds. It is a good
399    /// idea to call this for any transaction that includes one or more
400    /// Read or ExecuteSql requests and
401    /// ultimately decides not to commit.
402    ///
403    /// Rollback returns OK if it successfully aborts the transaction, the
404    /// transaction was already aborted, or the transaction is not
405    /// found. Rollback never returns ABORTED.
406    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
407    pub async fn rollback(
408        &mut self,
409        req: RollbackRequest,
410        retry: Option<RetrySetting>,
411    ) -> Result<Response<()>, Status> {
412        let setting = retry.unwrap_or_else(default_setting);
413        let session = &req.session;
414        invoke_fn(
415            Some(setting),
416            |this| async {
417                let request = this.create_request(format!("session={session}"), req.clone());
418                this.inner.rollback(request).await.map_err(|e| (e, this))
419            },
420            self,
421        )
422        .await
423    }
424
425    /// PartitionQuery creates a set of partition tokens that can be used to execute a query
426    /// operation in parallel.  Each of the returned partition tokens can be used
427    /// by ExecuteStreamingSql to specify a subset
428    /// of the query result to read.  The same session and read-only transaction
429    /// must be used by the PartitionQueryRequest used to create the
430    /// partition tokens and the ExecuteSqlRequests that use the partition tokens.
431    ///
432    /// Partition tokens become invalid when the session used to create them
433    /// is deleted, is idle for too long, begins a new transaction, or becomes too
434    /// old.  When any of these happen, it is not possible to resume the query, and
435    /// the whole operation must be restarted from the beginning.
436    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
437    pub async fn partition_query(
438        &mut self,
439        req: PartitionQueryRequest,
440        retry: Option<RetrySetting>,
441    ) -> Result<Response<PartitionResponse>, Status> {
442        let setting = retry.unwrap_or_else(default_setting);
443        let session = &req.session;
444        invoke_fn(
445            Some(setting),
446            |this| async {
447                let request = this.create_request(format!("session={session}"), req.clone());
448                this.inner.partition_query(request).await.map_err(|e| (e, this))
449            },
450            self,
451        )
452        .await
453    }
454
455    /// PartitionRead creates a set of partition tokens that can be used to execute a read
456    /// operation in parallel.  Each of the returned partition tokens can be used
457    /// by StreamingRead to specify a subset of the read
458    /// result to read.  The same session and read-only transaction must be used by
459    /// the PartitionReadRequest used to create the partition tokens and the
460    /// ReadRequests that use the partition tokens.  There are no ordering
461    /// guarantees on rows returned among the returned partition tokens, or even
462    /// within each individual StreamingRead call issued with a partition_token.
463    ///
464    /// Partition tokens become invalid when the session used to create them
465    /// is deleted, is idle for too long, begins a new transaction, or becomes too
466    /// old.  When any of these happen, it is not possible to resume the read, and
467    /// the whole operation must be restarted from the beginning.
468    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
469    pub async fn partition_read(
470        &mut self,
471        req: PartitionReadRequest,
472        retry: Option<RetrySetting>,
473    ) -> Result<Response<PartitionResponse>, Status> {
474        let setting = retry.unwrap_or_else(default_setting);
475        let session = &req.session;
476        invoke_fn(
477            Some(setting),
478            |this| async {
479                let request = this.create_request(format!("session={session}"), req.clone());
480                this.inner.partition_read(request).await.map_err(|e| (e, this))
481            },
482            self,
483        )
484        .await
485    }
486
487    fn create_request<T>(&self, param_string: String, into_request: impl grpc::IntoRequest<T>) -> grpc::Request<T> {
488        let mut req = create_request(param_string, into_request);
489        let target = req.metadata_mut();
490        for entry in self.metadata.iter() {
491            match entry {
492                KeyAndValueRef::Ascii(k, v) => {
493                    target.append(k, v.clone());
494                }
495                KeyAndValueRef::Binary(k, v) => {
496                    target.append_bin(k, v.clone());
497                }
498            }
499        }
500        req
501    }
502}