google_cloud_spanner/
transaction.rs

1use std::ops::DerefMut;
2use std::sync::atomic::AtomicI64;
3
4use prost_types::Struct;
5
6use google_cloud_gax::grpc::Status;
7use google_cloud_gax::retry::RetrySetting;
8use google_cloud_googleapis::spanner::v1::request_options::Priority;
9use google_cloud_googleapis::spanner::v1::{
10    execute_sql_request::QueryMode, execute_sql_request::QueryOptions as ExecuteQueryOptions, ExecuteSqlRequest,
11    ReadRequest, RequestOptions, TransactionSelector,
12};
13
14use crate::key::{Key, KeySet};
15use crate::reader::{Reader, RowIterator, StatementReader, TableReader};
16use crate::row::Row;
17use crate::session::ManagedSession;
18use crate::statement::Statement;
19
20#[derive(Clone, Default)]
21pub struct CallOptions {
22    /// Priority is the RPC priority to use for the read operation.
23    pub priority: Option<Priority>,
24    pub retry: Option<RetrySetting>,
25}
26
27#[derive(Clone)]
28pub struct ReadOptions {
29    /// The index to use for reading. If non-empty, you can only read columns
30    /// that are part of the index key, part of the primary key, or stored in the
31    /// index due to a STORING clause in the index definition.
32    pub index: String,
33    /// The maximum number of rows to read. A limit value less than 1 means no limit.
34    pub limit: i64,
35    pub call_options: CallOptions,
36}
37
38impl Default for ReadOptions {
39    fn default() -> Self {
40        ReadOptions {
41            index: "".to_string(),
42            limit: 0,
43            call_options: CallOptions::default(),
44        }
45    }
46}
47
48#[derive(Clone)]
49pub struct QueryOptions {
50    pub mode: QueryMode,
51    pub optimizer_options: Option<ExecuteQueryOptions>,
52    pub call_options: CallOptions,
53    /// If cancel safe is required, such as when tokio::select is used, set to false.
54    /// ```
55    /// use time::{Duration, OffsetDateTime};
56    /// use google_cloud_spanner::client::Client;
57    /// use google_cloud_spanner::key::Key;
58    /// use google_cloud_spanner::statement::Statement;
59    /// use google_cloud_spanner::transaction::QueryOptions;
60    ///
61    /// async fn query(client: Client) {
62    ///   let mut tx = client.single().await.unwrap();
63    ///   let option = QueryOptions {
64    ///     enable_resume: false,
65    ///     ..Default::default()
66    ///   };
67    ///   let mut stmt = Statement::new("SELECT ChangeRecord FROM READ_UserItemChangeStream (
68    ///           start_timestamp => @now,
69    ///           end_timestamp => NULL,
70    ///           partition_token => {},
71    ///           heartbeat_milliseconds => 10000
72    ///   )");
73    ///   stmt.add_param("now", &OffsetDateTime::now_utc());
74    ///   let mut rows = tx.query_with_option(stmt, option).await.unwrap();
75    ///   let mut tick = tokio::time::interval(tokio::time::Duration::from_millis(100));
76    ///   loop {
77    ///     tokio::select! {
78    ///        _ = tick.tick() => {
79    ///             // run task
80    ///        },
81    ///        maybe = rows.next() =>  {
82    ///          let row = maybe.unwrap().unwrap();
83    ///        }
84    ///     }
85    ///   }
86    /// }
87    pub enable_resume: bool,
88}
89
90impl Default for QueryOptions {
91    fn default() -> Self {
92        QueryOptions {
93            mode: QueryMode::Normal,
94            optimizer_options: None,
95            call_options: CallOptions::default(),
96            enable_resume: true,
97        }
98    }
99}
100
101pub struct Transaction {
102    pub(crate) session: Option<ManagedSession>,
103    // for returning ownership of session on before destroy
104    pub(crate) sequence_number: AtomicI64,
105    pub(crate) transaction_selector: TransactionSelector,
106}
107
108impl Transaction {
109    pub(crate) fn create_request_options(priority: Option<Priority>) -> Option<RequestOptions> {
110        priority.map(|s| RequestOptions {
111            priority: s.into(),
112            request_tag: "".to_string(),
113            transaction_tag: "".to_string(),
114        })
115    }
116
117    /// query executes a query against the database. It returns a RowIterator for
118    /// retrieving the resulting rows.
119    ///
120    /// query returns only row data, without a query plan or execution statistics.
121    pub async fn query(&mut self, statement: Statement) -> Result<RowIterator<'_, impl Reader>, Status> {
122        self.query_with_option(statement, QueryOptions::default()).await
123    }
124
125    /// query executes a query against the database. It returns a RowIterator for
126    /// retrieving the resulting rows.
127    ///
128    /// query returns only row data, without a query plan or execution statistics.
129    pub async fn query_with_option(
130        &mut self,
131        statement: Statement,
132        options: QueryOptions,
133    ) -> Result<RowIterator<'_, impl Reader>, Status> {
134        let request = ExecuteSqlRequest {
135            session: self.session.as_ref().unwrap().session.name.to_string(),
136            transaction: Some(self.transaction_selector.clone()),
137            sql: statement.sql,
138            params: Some(Struct {
139                fields: statement.params,
140            }),
141            param_types: statement.param_types,
142            resume_token: vec![],
143            query_mode: options.mode.into(),
144            partition_token: vec![],
145            seqno: 0,
146            query_options: options.optimizer_options,
147            request_options: Transaction::create_request_options(options.call_options.priority),
148            data_boost_enabled: false,
149            directed_read_options: None,
150        };
151        let session = self.session.as_mut().unwrap().deref_mut();
152        let reader = StatementReader {
153            enable_resume: options.enable_resume,
154            request,
155        };
156        RowIterator::new(session, reader, Some(options.call_options)).await
157    }
158
159    /// read returns a RowIterator for reading multiple rows from the database.
160    /// ```
161    /// use google_cloud_spanner::key::Key;
162    /// use google_cloud_spanner::client::{Client, Error};
163    ///
164    /// #[tokio::main]
165    /// async fn run(client: Client) -> Result<(), Error> {
166    ///     let mut tx = client.single().await?;
167    ///     let mut iter = tx.read("Guild", &["GuildID", "OwnerUserID"], vec![
168    ///         Key::new(&"pk1"),
169    ///         Key::new(&"pk2")
170    ///     ]).await?;
171    ///
172    ///     while let Some(row) = iter.next().await? {
173    ///         let guild_id = row.column_by_name::<String>("GuildID");
174    ///         //do something
175    ///     };
176    ///     Ok(())
177    /// }
178    /// ```
179    pub async fn read(
180        &mut self,
181        table: &str,
182        columns: &[&str],
183        key_set: impl Into<KeySet>,
184    ) -> Result<RowIterator<'_, impl Reader>, Status> {
185        self.read_with_option(table, columns, key_set, ReadOptions::default())
186            .await
187    }
188
189    /// read returns a RowIterator for reading multiple rows from the database.
190    pub async fn read_with_option(
191        &mut self,
192        table: &str,
193        columns: &[&str],
194        key_set: impl Into<KeySet>,
195        options: ReadOptions,
196    ) -> Result<RowIterator<'_, impl Reader>, Status> {
197        let request = ReadRequest {
198            session: self.get_session_name(),
199            transaction: Some(self.transaction_selector.clone()),
200            table: table.to_string(),
201            index: options.index,
202            columns: columns.iter().map(|x| x.to_string()).collect(),
203            key_set: Some(key_set.into().inner),
204            limit: options.limit,
205            resume_token: vec![],
206            partition_token: vec![],
207            request_options: Transaction::create_request_options(options.call_options.priority),
208            data_boost_enabled: false,
209            order_by: 0,
210            directed_read_options: None,
211            lock_hint: 0,
212        };
213
214        let session = self.as_mut_session();
215        let reader = TableReader { request };
216        RowIterator::new(session, reader, Some(options.call_options)).await
217    }
218
219    /// read returns a RowIterator for reading multiple rows from the database.
220    /// ```
221    /// use google_cloud_spanner::key::Key;
222    /// use google_cloud_spanner::client::Client;
223    /// use google_cloud_spanner::client::Error;
224    ///
225    /// async fn run(client: Client) -> Result<(), Error> {
226    ///     let mut tx = client.single().await?;
227    ///     let row = tx.read_row("Guild", &["GuildID", "OwnerUserID"], Key::new(&"guild1")).await?;
228    ///     Ok(())
229    /// }
230    /// ```
231    pub async fn read_row(&mut self, table: &str, columns: &[&str], key: Key) -> Result<Option<Row>, Status> {
232        self.read_row_with_option(table, columns, key, ReadOptions::default())
233            .await
234    }
235
236    /// read returns a RowIterator for reading multiple rows from the database.
237    pub async fn read_row_with_option(
238        &mut self,
239        table: &str,
240        columns: &[&str],
241        key: Key,
242        options: ReadOptions,
243    ) -> Result<Option<Row>, Status> {
244        let call_options = options.call_options.clone();
245        let mut reader = self
246            .read_with_option(table, columns, KeySet::from(key), options)
247            .await?;
248        reader.set_call_options(call_options);
249        reader.next().await
250    }
251
252    pub(crate) fn get_session_name(&self) -> String {
253        self.session.as_ref().unwrap().session.name.to_string()
254    }
255
256    pub(crate) fn as_mut_session(&mut self) -> &mut ManagedSession {
257        self.session.as_mut().unwrap()
258    }
259
260    /// returns the owner ship of session.
261    /// must drop destroy after this method.
262    pub(crate) fn take_session(&mut self) -> Option<ManagedSession> {
263        self.session.take()
264    }
265}