//! gcloud_spanner/apiv1/spanner_client.rs

1use std::time::Duration;
2
3use google_cloud_gax::conn::Channel;
4use google_cloud_gax::grpc::metadata::{KeyAndValueRef, MetadataMap};
5use google_cloud_gax::grpc::{Code, Response, Status, Streaming};
6use google_cloud_gax::retry::{invoke_fn, RetrySetting};
7use google_cloud_gax::{create_request, grpc};
8use google_cloud_googleapis::spanner::v1::spanner_client::SpannerClient;
9use google_cloud_googleapis::spanner::v1::{
10    BatchCreateSessionsRequest, BatchCreateSessionsResponse, BeginTransactionRequest, CommitRequest, CommitResponse,
11    CreateSessionRequest, DeleteSessionRequest, ExecuteBatchDmlRequest, ExecuteBatchDmlResponse, ExecuteSqlRequest,
12    GetSessionRequest, ListSessionsRequest, ListSessionsResponse, PartialResultSet, PartitionQueryRequest,
13    PartitionReadRequest, PartitionResponse, ReadRequest, ResultSet, RollbackRequest, Session, Transaction,
14};
15
16pub(crate) fn ping_query_request(session_name: impl Into<String>) -> ExecuteSqlRequest {
17    ExecuteSqlRequest {
18        session: session_name.into(),
19        transaction: None,
20        sql: "SELECT 1".to_string(),
21        params: None,
22        param_types: Default::default(),
23        resume_token: vec![],
24        query_mode: 0,
25        partition_token: vec![],
26        seqno: 0,
27        query_options: None,
28        request_options: None,
29        directed_read_options: None,
30        data_boost_enabled: false,
31        last_statement: false,
32    }
33}
34
35fn default_setting() -> RetrySetting {
36    RetrySetting {
37        from_millis: 50,
38        max_delay: Some(Duration::from_secs(10)),
39        factor: 1u64,
40        take: 20,
41        codes: vec![Code::Unavailable, Code::Unknown],
42    }
43}
44
/// Low-level Cloud Spanner client wrapping the generated gRPC stub.
#[derive(Clone)]
pub struct Client {
    // Generated gRPC stub over the shared channel.
    inner: SpannerClient<Channel>,
    // Metadata appended to every outgoing request (see `create_request`).
    metadata: MetadataMap,
}
50
51impl Client {
    /// create new spanner client
    ///
    /// The decoding limit is raised to `i32::MAX` so that large result sets can
    /// be received; this mirrors the Go client:
    /// https://github.com/googleapis/google-cloud-go/blob/65a9ba55ed3777f520bd881d891e8917323549a5/spanner/apiv1/spanner_client.go#L73
    pub fn new(inner: SpannerClient<Channel>) -> Client {
        Client {
            inner: inner.max_decoding_message_size(i32::MAX as usize),
            metadata: Default::default(),
        }
    }
60
61    /// set metadata for spanner client
62    pub(crate) fn with_metadata(self, metadata: MetadataMap) -> Client {
63        Client {
64            inner: self.inner,
65            metadata,
66        }
67    }
68
69    /// create_session creates a new session. A session can be used to perform
70    /// transactions that read and/or modify data in a Cloud Spanner database.
71    /// Sessions are meant to be reused for many consecutive
72    /// transactions.
73    ///
74    /// Sessions can only execute one transaction at a time. To execute
75    /// multiple concurrent read-write/write-only transactions, create
76    /// multiple sessions. Note that standalone reads and queries use a
77    /// transaction internally, and count toward the one transaction
78    /// limit.
79    ///
80    /// Active sessions use additional server resources, so it is a good idea to
81    /// delete idle and unneeded sessions.
82    /// Aside from explicit deletes, Cloud Spanner may delete sessions for which no
83    /// operations are sent for more than an hour. If a session is deleted,
84    /// requests to it return NOT_FOUND.
85    ///
86    /// Idle sessions can be kept alive by sending a trivial SQL query
87    /// periodically, e.g., "SELECT 1".
88    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
89    pub async fn create_session(
90        &mut self,
91        req: CreateSessionRequest,
92        retry: Option<RetrySetting>,
93    ) -> Result<Response<Session>, Status> {
94        let setting = retry.unwrap_or_else(default_setting);
95        let database = &req.database;
96        invoke_fn(
97            Some(setting),
98            |this| async {
99                let request = this.create_request(format!("database={database}"), req.clone());
100                this.inner.create_session(request).await.map_err(|e| (e, this))
101            },
102            self,
103        )
104        .await
105    }
106
107    /// batch_create_sessions creates multiple new sessions.
108    ///
109    /// This API can be used to initialize a session cache on the clients.
110    /// See https:///goo.gl/TgSFN2 (at https:///goo.gl/TgSFN2) for best practices on session cache management.
111    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
112    pub async fn batch_create_sessions(
113        &mut self,
114        req: BatchCreateSessionsRequest,
115        retry: Option<RetrySetting>,
116    ) -> Result<Response<BatchCreateSessionsResponse>, Status> {
117        let setting = retry.unwrap_or_else(default_setting);
118        let database = &req.database;
119        invoke_fn(
120            Some(setting),
121            |this| async {
122                let request = this.create_request(format!("database={database}"), req.clone());
123                this.inner.batch_create_sessions(request).await.map_err(|e| (e, this))
124            },
125            self,
126        )
127        .await
128    }
129
130    /// get_session gets a session. Returns NOT_FOUND if the session does not exist.
131    /// This is mainly useful for determining whether a session is still alive.
132    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
133    pub async fn get_session(
134        &mut self,
135        req: GetSessionRequest,
136        retry: Option<RetrySetting>,
137    ) -> Result<Response<Session>, Status> {
138        let setting = retry.unwrap_or_else(default_setting);
139        let name = &req.name;
140        invoke_fn(
141            Some(setting),
142            |this| async {
143                let request = this.create_request(format!("name={name}"), req.clone());
144                this.inner.get_session(request).await.map_err(|e| (e, this))
145            },
146            self,
147        )
148        .await
149    }
150
151    /// list_sessions lists all sessions in a given database.
152    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
153    pub async fn list_sessions(
154        &mut self,
155        req: ListSessionsRequest,
156        retry: Option<RetrySetting>,
157    ) -> Result<Response<ListSessionsResponse>, Status> {
158        let setting = retry.unwrap_or_else(default_setting);
159        let database = &req.database;
160        invoke_fn(
161            Some(setting),
162            |this| async {
163                let request = this.create_request(format!("database={database}"), req.clone());
164                this.inner.list_sessions(request).await.map_err(|e| (e, this))
165            },
166            self,
167        )
168        .await
169    }
170
171    /// delete_session ends a session, releasing server resources associated with it. This will
172    /// asynchronously trigger cancellation of any operations that are running with
173    /// this session.
174    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
175    pub async fn delete_session(
176        &mut self,
177        req: DeleteSessionRequest,
178        retry: Option<RetrySetting>,
179    ) -> Result<Response<()>, Status> {
180        let setting = retry.unwrap_or_else(default_setting);
181        let name = &req.name;
182        invoke_fn(
183            Some(setting),
184            |this| async {
185                let request = this.create_request(format!("name={name}"), req.clone());
186                this.inner.delete_session(request).await.map_err(|e| (e, this))
187            },
188            self,
189        )
190        .await
191    }
192
193    /// execute_sql executes an SQL statement, returning all results in a single reply. This
194    /// method cannot be used to return a result set larger than 10 MiB;
195    /// if the query yields more data than that, the query fails with
196    /// a FAILED_PRECONDITION error.
197    ///
198    /// Operations inside read-write transactions might return ABORTED. If
199    /// this occurs, the application should restart the transaction from
200    /// the beginning. See Transaction for more details.
201    ///
202    /// Larger result sets can be fetched in streaming fashion by calling
203    /// ExecuteStreamingSql instead.
204    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
205    pub async fn execute_sql(
206        &mut self,
207        req: ExecuteSqlRequest,
208        retry: Option<RetrySetting>,
209    ) -> Result<Response<ResultSet>, Status> {
210        let setting = retry.unwrap_or_else(default_setting);
211        let session = &req.session;
212        invoke_fn(
213            Some(setting),
214            |this| async {
215                let request = this.create_request(format!("session={session}"), req.clone());
216                this.inner.execute_sql(request).await.map_err(|e| (e, this))
217            },
218            self,
219        )
220        .await
221    }
222
223    /// execute_streaming_sql like ExecuteSql, except returns the result
224    /// set as a stream. Unlike ExecuteSql, there
225    /// is no limit on the size of the returned result set. However, no
226    /// individual row in the result set can exceed 100 MiB, and no
227    /// column value can exceed 10 MiB.
228    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
229    pub async fn execute_streaming_sql(
230        &mut self,
231        req: ExecuteSqlRequest,
232        retry: Option<RetrySetting>,
233    ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
234        let setting = retry.unwrap_or_else(default_setting);
235        let session = &req.session;
236        invoke_fn(
237            Some(setting),
238            |this| async {
239                let request = this.create_request(format!("session={session}"), req.clone());
240                this.inner.execute_streaming_sql(request).await.map_err(|e| (e, this))
241            },
242            self,
243        )
244        .await
245    }
246
247    /// execute_batch_dml executes a batch of SQL DML statements. This method allows many statements
248    /// to be run with lower latency than submitting them sequentially with
249    /// ExecuteSql.
250    ///
251    /// Statements are executed in sequential order. A request can succeed even if
252    /// a statement fails. The ExecuteBatchDmlResponse.status field in the
253    /// response provides information about the statement that failed. Clients must
254    /// inspect this field to determine whether an error occurred.
255    ///
256    /// Execution stops after the first failed statement; the remaining statements
257    /// are not executed.
258    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
259    pub async fn execute_batch_dml(
260        &mut self,
261        req: ExecuteBatchDmlRequest,
262        retry: Option<RetrySetting>,
263    ) -> Result<Response<ExecuteBatchDmlResponse>, Status> {
264        let setting = retry.unwrap_or_else(default_setting);
265        let session = &req.session;
266        invoke_fn(
267            Some(setting),
268            |this| async {
269                let request = this.create_request(format!("session={session}"), req.clone());
270                let result = this.inner.execute_batch_dml(request).await;
271                match result {
272                    Ok(response) => match response.get_ref().status.as_ref() {
273                        Some(s) => {
274                            let code = Code::from(s.code);
275                            if code == Code::Ok {
276                                Ok(response)
277                            } else {
278                                Err((Status::new(code, s.message.to_string()), this))
279                            }
280                        }
281                        None => Ok(response),
282                    },
283                    Err(err) => Err((err, this)),
284                }
285            },
286            self,
287        )
288        .await
289    }
290
291    /// read reads rows from the database using key lookups and scans, as a
292    /// simple key/value style alternative to
293    /// ExecuteSql.  This method cannot be used to
294    /// return a result set larger than 10 MiB; if the read matches more
295    /// data than that, the read fails with a FAILED_PRECONDITION
296    /// error.
297    ///
298    /// Reads inside read-write transactions might return ABORTED. If
299    /// this occurs, the application should restart the transaction from
300    /// the beginning. See Transaction for more details.
301    ///
302    /// Larger result sets can be yielded in streaming fashion by calling
303    /// StreamingRead instead.
304    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
305    pub async fn read(&mut self, req: ReadRequest, retry: Option<RetrySetting>) -> Result<Response<ResultSet>, Status> {
306        let setting = retry.unwrap_or_else(default_setting);
307        let session = &req.session;
308        invoke_fn(
309            Some(setting),
310            |this| async {
311                let request = this.create_request(format!("session={session}"), req.clone());
312                this.inner.read(request).await.map_err(|e| (e, this))
313            },
314            self,
315        )
316        .await
317    }
318
319    /// streaming_read like read, except returns the result set as a
320    /// stream. Unlike read, there is no limit on the
321    /// size of the returned result set. However, no individual row in
322    /// the result set can exceed 100 MiB, and no column value can exceed
323    /// 10 MiB.
324    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
325    pub async fn streaming_read(
326        &mut self,
327        req: ReadRequest,
328        retry: Option<RetrySetting>,
329    ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
330        let setting = retry.unwrap_or_else(default_setting);
331        let session = &req.session;
332        invoke_fn(
333            Some(setting),
334            |this| async {
335                let request = this.create_request(format!("session={session}"), req.clone());
336                this.inner.streaming_read(request).await.map_err(|e| (e, this))
337            },
338            self,
339        )
340        .await
341    }
342
343    /// BeginTransaction begins a new transaction. This step can often be skipped:
344    /// Read, ExecuteSql and
345    /// Commit can begin a new transaction as a
346    /// side-effect.
347    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
348    pub async fn begin_transaction(
349        &mut self,
350        req: BeginTransactionRequest,
351        retry: Option<RetrySetting>,
352    ) -> Result<Response<Transaction>, Status> {
353        let setting = retry.unwrap_or_else(default_setting);
354        let session = &req.session;
355        invoke_fn(
356            Some(setting),
357            |this| async {
358                let request = this.create_request(format!("session={session}"), req.clone());
359                this.inner.begin_transaction(request).await.map_err(|e| (e, this))
360            },
361            self,
362        )
363        .await
364    }
365
366    /// Commit commits a transaction. The request includes the mutations to be
367    /// applied to rows in the database.
368    ///
369    /// Commit might return an ABORTED error. This can occur at any time;
370    /// commonly, the cause is conflicts with concurrent
371    /// transactions. However, it can also happen for a variety of other
372    /// reasons. If Commit returns ABORTED, the caller should re-attempt
373    /// the transaction from the beginning, re-using the same session.
374    ///
375    /// On very rare occasions, Commit might return UNKNOWN. This can happen,
376    /// for example, if the client job experiences a 1+ hour networking failure.
377    /// At that point, Cloud Spanner has lost track of the transaction outcome and
378    /// we recommend that you perform another read from the database to see the
379    /// state of things as they are now.
380    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
381    pub async fn commit(
382        &mut self,
383        req: CommitRequest,
384        retry: Option<RetrySetting>,
385    ) -> Result<Response<CommitResponse>, Status> {
386        let setting = retry.unwrap_or_else(default_setting);
387        let session = &req.session;
388        invoke_fn(
389            Some(setting),
390            |this| async {
391                let request = this.create_request(format!("session={session}"), req.clone());
392                this.inner.commit(request).await.map_err(|e| (e, this))
393            },
394            self,
395        )
396        .await
397    }
398
399    /// Rollback rolls back a transaction, releasing any locks it holds. It is a good
400    /// idea to call this for any transaction that includes one or more
401    /// Read or ExecuteSql requests and
402    /// ultimately decides not to commit.
403    ///
404    /// Rollback returns OK if it successfully aborts the transaction, the
405    /// transaction was already aborted, or the transaction is not
406    /// found. Rollback never returns ABORTED.
407    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
408    pub async fn rollback(
409        &mut self,
410        req: RollbackRequest,
411        retry: Option<RetrySetting>,
412    ) -> Result<Response<()>, Status> {
413        let setting = retry.unwrap_or_else(default_setting);
414        let session = &req.session;
415        invoke_fn(
416            Some(setting),
417            |this| async {
418                let request = this.create_request(format!("session={session}"), req.clone());
419                this.inner.rollback(request).await.map_err(|e| (e, this))
420            },
421            self,
422        )
423        .await
424    }
425
426    /// PartitionQuery creates a set of partition tokens that can be used to execute a query
427    /// operation in parallel.  Each of the returned partition tokens can be used
428    /// by ExecuteStreamingSql to specify a subset
429    /// of the query result to read.  The same session and read-only transaction
430    /// must be used by the PartitionQueryRequest used to create the
431    /// partition tokens and the ExecuteSqlRequests that use the partition tokens.
432    ///
433    /// Partition tokens become invalid when the session used to create them
434    /// is deleted, is idle for too long, begins a new transaction, or becomes too
435    /// old.  When any of these happen, it is not possible to resume the query, and
436    /// the whole operation must be restarted from the beginning.
437    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
438    pub async fn partition_query(
439        &mut self,
440        req: PartitionQueryRequest,
441        retry: Option<RetrySetting>,
442    ) -> Result<Response<PartitionResponse>, Status> {
443        let setting = retry.unwrap_or_else(default_setting);
444        let session = &req.session;
445        invoke_fn(
446            Some(setting),
447            |this| async {
448                let request = this.create_request(format!("session={session}"), req.clone());
449                this.inner.partition_query(request).await.map_err(|e| (e, this))
450            },
451            self,
452        )
453        .await
454    }
455
456    /// PartitionRead creates a set of partition tokens that can be used to execute a read
457    /// operation in parallel.  Each of the returned partition tokens can be used
458    /// by StreamingRead to specify a subset of the read
459    /// result to read.  The same session and read-only transaction must be used by
460    /// the PartitionReadRequest used to create the partition tokens and the
461    /// ReadRequests that use the partition tokens.  There are no ordering
462    /// guarantees on rows returned among the returned partition tokens, or even
463    /// within each individual StreamingRead call issued with a partition_token.
464    ///
465    /// Partition tokens become invalid when the session used to create them
466    /// is deleted, is idle for too long, begins a new transaction, or becomes too
467    /// old.  When any of these happen, it is not possible to resume the read, and
468    /// the whole operation must be restarted from the beginning.
469    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
470    pub async fn partition_read(
471        &mut self,
472        req: PartitionReadRequest,
473        retry: Option<RetrySetting>,
474    ) -> Result<Response<PartitionResponse>, Status> {
475        let setting = retry.unwrap_or_else(default_setting);
476        let session = &req.session;
477        invoke_fn(
478            Some(setting),
479            |this| async {
480                let request = this.create_request(format!("session={session}"), req.clone());
481                this.inner.partition_read(request).await.map_err(|e| (e, this))
482            },
483            self,
484        )
485        .await
486    }
487
488    fn create_request<T>(&self, param_string: String, into_request: impl grpc::IntoRequest<T>) -> grpc::Request<T> {
489        let mut req = create_request(param_string, into_request);
490        let target = req.metadata_mut();
491        for entry in self.metadata.iter() {
492            match entry {
493                KeyAndValueRef::Ascii(k, v) => {
494                    target.append(k, v.clone());
495                }
496                KeyAndValueRef::Binary(k, v) => {
497                    target.append_bin(k, v.clone());
498                }
499            }
500        }
501        req
502    }
503}