gcloud_spanner/
transaction.rs

1use std::ops::DerefMut;
2use std::sync::atomic::AtomicI64;
3
4use prost_types::Struct;
5
6use google_cloud_gax::grpc::Status;
7use google_cloud_gax::retry::RetrySetting;
8use google_cloud_googleapis::spanner::v1::request_options::Priority;
9use google_cloud_googleapis::spanner::v1::{
10    execute_sql_request::QueryMode, execute_sql_request::QueryOptions as ExecuteQueryOptions, ExecuteSqlRequest,
11    ReadRequest, RequestOptions, TransactionSelector,
12};
13
14use crate::key::{Key, KeySet};
15use crate::reader::{Reader, RowIterator, StatementReader, TableReader};
16use crate::row::Row;
17use crate::session::ManagedSession;
18use crate::statement::Statement;
19
20#[derive(Clone, Default)]
21pub struct CallOptions {
22    /// Priority is the RPC priority to use for the read operation.
23    pub priority: Option<Priority>,
24    pub retry: Option<RetrySetting>,
25}
26
27#[derive(Clone)]
28pub struct ReadOptions {
29    /// The index to use for reading. If non-empty, you can only read columns
30    /// that are part of the index key, part of the primary key, or stored in the
31    /// index due to a STORING clause in the index definition.
32    pub index: String,
33    /// The maximum number of rows to read. A limit value less than 1 means no limit.
34    pub limit: i64,
35    pub call_options: CallOptions,
36}
37
38impl Default for ReadOptions {
39    fn default() -> Self {
40        ReadOptions {
41            index: "".to_string(),
42            limit: 0,
43            call_options: CallOptions::default(),
44        }
45    }
46}
47
48#[derive(Clone)]
49pub struct QueryOptions {
50    pub mode: QueryMode,
51    pub optimizer_options: Option<ExecuteQueryOptions>,
52    pub call_options: CallOptions,
53    /// If cancel safe is required, such as when tokio::select is used, set to false.
54    /// ```
55    /// use time::{Duration, OffsetDateTime};
56    /// use google_cloud_spanner::client::Client;
57    /// use google_cloud_spanner::key::Key;
58    /// use google_cloud_spanner::statement::Statement;
59    /// use google_cloud_spanner::transaction::QueryOptions;
60    ///
61    /// async fn query(client: Client) {
62    ///   let mut tx = client.single().await.unwrap();
63    ///   let option = QueryOptions {
64    ///     enable_resume: false,
65    ///     ..Default::default()
66    ///   };
67    ///   let mut stmt = Statement::new("SELECT ChangeRecord FROM READ_UserItemChangeStream (
68    ///           start_timestamp => @now,
69    ///           end_timestamp => NULL,
70    ///           partition_token => {},
71    ///           heartbeat_milliseconds => 10000
72    ///   )");
73    ///   stmt.add_param("now", &OffsetDateTime::now_utc());
74    ///   let mut rows = tx.query_with_option(stmt, option).await.unwrap();
75    ///   let mut tick = tokio::time::interval(tokio::time::Duration::from_millis(100));
76    ///   loop {
77    ///     tokio::select! {
78    ///        _ = tick.tick() => {
79    ///             // run task
80    ///        },
81    ///        maybe = rows.next() =>  {
82    ///          let row = maybe.unwrap().unwrap();
83    ///        }
84    ///     }
85    ///   }
86    /// }
87    pub enable_resume: bool,
88}
89
90impl Default for QueryOptions {
91    fn default() -> Self {
92        QueryOptions {
93            mode: QueryMode::Normal,
94            optimizer_options: None,
95            call_options: CallOptions::default(),
96            enable_resume: true,
97        }
98    }
99}
100
101pub struct Transaction {
102    pub(crate) session: Option<ManagedSession>,
103    // for returning ownership of session on before destroy
104    pub(crate) sequence_number: AtomicI64,
105    pub(crate) transaction_selector: TransactionSelector,
106    /// The transaction tag to include with each request in this transaction.
107    pub(crate) transaction_tag: Option<String>,
108    /// disableRouteToLeader specifies if all the requests of type read-write and PDML
109    /// need to be routed to the leader region.
110    pub(crate) disable_route_to_leader: bool,
111}
112
113impl Transaction {
114    pub(crate) fn create_request_options(
115        priority: Option<Priority>,
116        transaction_tag: Option<String>,
117    ) -> Option<RequestOptions> {
118        if priority.is_none() && transaction_tag.as_ref().map(String::is_empty).unwrap_or(true) {
119            return None;
120        }
121        Some(RequestOptions {
122            priority: priority.unwrap_or_default().into(),
123            request_tag: String::new(),
124            transaction_tag: transaction_tag.unwrap_or_default(),
125        })
126    }
127
128    /// query executes a query against the database. It returns a RowIterator for
129    /// retrieving the resulting rows.
130    ///
131    /// query returns only row data, without a query plan or execution statistics.
132    pub async fn query(&mut self, statement: Statement) -> Result<RowIterator<'_, impl Reader>, Status> {
133        self.query_with_option(statement, QueryOptions::default()).await
134    }
135
136    /// query executes a query against the database. It returns a RowIterator for
137    /// retrieving the resulting rows.
138    pub async fn query_with_option(
139        &mut self,
140        statement: Statement,
141        options: QueryOptions,
142    ) -> Result<RowIterator<'_, impl Reader>, Status> {
143        let request = ExecuteSqlRequest {
144            session: self.session.as_ref().unwrap().session.name.to_string(),
145            transaction: Some(self.transaction_selector.clone()),
146            sql: statement.sql,
147            params: Some(Struct {
148                fields: statement.params,
149            }),
150            param_types: statement.param_types,
151            resume_token: vec![],
152            query_mode: options.mode.into(),
153            partition_token: vec![],
154            seqno: 0,
155            query_options: options.optimizer_options,
156            request_options: Transaction::create_request_options(
157                options.call_options.priority,
158                self.transaction_tag.clone(),
159            ),
160            data_boost_enabled: false,
161            directed_read_options: None,
162            last_statement: false,
163        };
164        let session = self.session.as_mut().unwrap().deref_mut();
165        let reader = StatementReader {
166            enable_resume: options.enable_resume,
167            request,
168        };
169        RowIterator::new(session, reader, Some(options.call_options), self.disable_route_to_leader).await
170    }
171
172    /// read returns a RowIterator for reading multiple rows from the database.
173    /// ```
174    /// use google_cloud_spanner::key::Key;
175    /// use google_cloud_spanner::client::{Client, Error};
176    ///
177    /// #[tokio::main]
178    /// async fn run(client: Client) -> Result<(), Error> {
179    ///     let mut tx = client.single().await?;
180    ///     let mut iter = tx.read("Guild", &["GuildID", "OwnerUserID"], vec![
181    ///         Key::new(&"pk1"),
182    ///         Key::new(&"pk2")
183    ///     ]).await?;
184    ///
185    ///     while let Some(row) = iter.next().await? {
186    ///         let guild_id = row.column_by_name::<String>("GuildID");
187    ///         //do something
188    ///     };
189    ///     Ok(())
190    /// }
191    /// ```
192    pub async fn read(
193        &mut self,
194        table: &str,
195        columns: &[&str],
196        key_set: impl Into<KeySet>,
197    ) -> Result<RowIterator<'_, impl Reader>, Status> {
198        self.read_with_option(table, columns, key_set, ReadOptions::default())
199            .await
200    }
201
202    /// read returns a RowIterator for reading multiple rows from the database.
203    pub async fn read_with_option(
204        &mut self,
205        table: &str,
206        columns: &[&str],
207        key_set: impl Into<KeySet>,
208        options: ReadOptions,
209    ) -> Result<RowIterator<'_, impl Reader>, Status> {
210        let request = ReadRequest {
211            session: self.get_session_name(),
212            transaction: Some(self.transaction_selector.clone()),
213            table: table.to_string(),
214            index: options.index,
215            columns: columns.iter().map(|x| x.to_string()).collect(),
216            key_set: Some(key_set.into().inner),
217            limit: options.limit,
218            resume_token: vec![],
219            partition_token: vec![],
220            request_options: Transaction::create_request_options(
221                options.call_options.priority,
222                self.transaction_tag.clone(),
223            ),
224            data_boost_enabled: false,
225            order_by: 0,
226            directed_read_options: None,
227            lock_hint: 0,
228        };
229
230        let disable_route_to_leader = self.disable_route_to_leader;
231        let session = self.as_mut_session();
232        let reader = TableReader { request };
233        RowIterator::new(session, reader, Some(options.call_options), disable_route_to_leader).await
234    }
235
236    /// read returns a RowIterator for reading multiple rows from the database.
237    /// ```
238    /// use google_cloud_spanner::key::Key;
239    /// use google_cloud_spanner::client::Client;
240    /// use google_cloud_spanner::client::Error;
241    ///
242    /// async fn run(client: Client) -> Result<(), Error> {
243    ///     let mut tx = client.single().await?;
244    ///     let row = tx.read_row("Guild", &["GuildID", "OwnerUserID"], Key::new(&"guild1")).await?;
245    ///     Ok(())
246    /// }
247    /// ```
248    pub async fn read_row(&mut self, table: &str, columns: &[&str], key: Key) -> Result<Option<Row>, Status> {
249        self.read_row_with_option(table, columns, key, ReadOptions::default())
250            .await
251    }
252
253    /// read returns a RowIterator for reading multiple rows from the database.
254    pub async fn read_row_with_option(
255        &mut self,
256        table: &str,
257        columns: &[&str],
258        key: Key,
259        options: ReadOptions,
260    ) -> Result<Option<Row>, Status> {
261        let call_options = options.call_options.clone();
262        let mut reader = self
263            .read_with_option(table, columns, KeySet::from(key), options)
264            .await?;
265        reader.set_call_options(call_options);
266        reader.next().await
267    }
268
269    pub(crate) fn get_session_name(&self) -> String {
270        self.session.as_ref().unwrap().session.name.to_string()
271    }
272
273    pub(crate) fn as_mut_session(&mut self) -> &mut ManagedSession {
274        self.session.as_mut().unwrap()
275    }
276
277    /// returns the owner ship of session.
278    /// must drop destroy after this method.
279    pub(crate) fn take_session(&mut self) -> Option<ManagedSession> {
280        self.session.take()
281    }
282}