gcloud_spanner/
transaction.rs

1use std::ops::DerefMut;
2use std::sync::atomic::AtomicI64;
3
4use prost_types::Struct;
5
6use google_cloud_gax::grpc::Status;
7use google_cloud_gax::retry::RetrySetting;
8use google_cloud_googleapis::spanner::v1::request_options::Priority;
9use google_cloud_googleapis::spanner::v1::{
10    execute_sql_request::QueryMode, execute_sql_request::QueryOptions as ExecuteQueryOptions, ExecuteSqlRequest,
11    ReadRequest, RequestOptions, TransactionSelector,
12};
13
14use crate::key::{Key, KeySet};
15use crate::reader::{Reader, RowIterator, StatementReader, TableReader};
16use crate::row::Row;
17use crate::session::ManagedSession;
18use crate::statement::Statement;
19
20#[derive(Clone, Default)]
21pub struct CallOptions {
22    /// Priority is the RPC priority to use for the read operation.
23    pub priority: Option<Priority>,
24    pub retry: Option<RetrySetting>,
25}
26
27#[derive(Clone)]
28pub struct ReadOptions {
29    /// The index to use for reading. If non-empty, you can only read columns
30    /// that are part of the index key, part of the primary key, or stored in the
31    /// index due to a STORING clause in the index definition.
32    pub index: String,
33    /// The maximum number of rows to read. A limit value less than 1 means no limit.
34    pub limit: i64,
35    pub call_options: CallOptions,
36}
37
38impl Default for ReadOptions {
39    fn default() -> Self {
40        ReadOptions {
41            index: "".to_string(),
42            limit: 0,
43            call_options: CallOptions::default(),
44        }
45    }
46}
47
48#[derive(Clone)]
49pub struct QueryOptions {
50    pub mode: QueryMode,
51    pub optimizer_options: Option<ExecuteQueryOptions>,
52    pub call_options: CallOptions,
53    /// If cancel safe is required, such as when tokio::select is used, set to false.
54    /// ```
55    /// use time::{Duration, OffsetDateTime};
56    /// use google_cloud_spanner::client::Client;
57    /// use google_cloud_spanner::key::Key;
58    /// use google_cloud_spanner::statement::Statement;
59    /// use google_cloud_spanner::transaction::QueryOptions;
60    ///
61    /// async fn query(client: Client) {
62    ///   let mut tx = client.single().await.unwrap();
63    ///   let option = QueryOptions {
64    ///     enable_resume: false,
65    ///     ..Default::default()
66    ///   };
67    ///   let mut stmt = Statement::new("SELECT ChangeRecord FROM READ_UserItemChangeStream (
68    ///           start_timestamp => @now,
69    ///           end_timestamp => NULL,
70    ///           partition_token => {},
71    ///           heartbeat_milliseconds => 10000
72    ///   )");
73    ///   stmt.add_param("now", &OffsetDateTime::now_utc());
74    ///   let mut rows = tx.query_with_option(stmt, option).await.unwrap();
75    ///   let mut tick = tokio::time::interval(tokio::time::Duration::from_millis(100));
76    ///   loop {
77    ///     tokio::select! {
78    ///        _ = tick.tick() => {
79    ///             // run task
80    ///        },
81    ///        maybe = rows.next() =>  {
82    ///          let row = maybe.unwrap().unwrap();
83    ///        }
84    ///     }
85    ///   }
86    /// }
87    pub enable_resume: bool,
88}
89
90impl Default for QueryOptions {
91    fn default() -> Self {
92        QueryOptions {
93            mode: QueryMode::Normal,
94            optimizer_options: None,
95            call_options: CallOptions::default(),
96            enable_resume: true,
97        }
98    }
99}
100
101pub struct Transaction {
102    pub(crate) session: Option<ManagedSession>,
103    // for returning ownership of session on before destroy
104    pub(crate) sequence_number: AtomicI64,
105    pub(crate) transaction_selector: TransactionSelector,
106    /// The transaction tag to include with each request in this transaction.
107    pub(crate) transaction_tag: Option<String>,
108}
109
110impl Transaction {
111    pub(crate) fn create_request_options(
112        priority: Option<Priority>,
113        transaction_tag: Option<String>,
114    ) -> Option<RequestOptions> {
115        if priority.is_none() && transaction_tag.as_ref().map(String::is_empty).unwrap_or(true) {
116            return None;
117        }
118        Some(RequestOptions {
119            priority: priority.unwrap_or_default().into(),
120            request_tag: String::new(),
121            transaction_tag: transaction_tag.unwrap_or_default(),
122        })
123    }
124
125    /// query executes a query against the database. It returns a RowIterator for
126    /// retrieving the resulting rows.
127    ///
128    /// query returns only row data, without a query plan or execution statistics.
129    pub async fn query(&mut self, statement: Statement) -> Result<RowIterator<'_, impl Reader>, Status> {
130        self.query_with_option(statement, QueryOptions::default()).await
131    }
132
133    /// query executes a query against the database. It returns a RowIterator for
134    /// retrieving the resulting rows.
135    ///
136    /// query returns only row data, without a query plan or execution statistics.
137    pub async fn query_with_option(
138        &mut self,
139        statement: Statement,
140        options: QueryOptions,
141    ) -> Result<RowIterator<'_, impl Reader>, Status> {
142        let request = ExecuteSqlRequest {
143            session: self.session.as_ref().unwrap().session.name.to_string(),
144            transaction: Some(self.transaction_selector.clone()),
145            sql: statement.sql,
146            params: Some(Struct {
147                fields: statement.params,
148            }),
149            param_types: statement.param_types,
150            resume_token: vec![],
151            query_mode: options.mode.into(),
152            partition_token: vec![],
153            seqno: 0,
154            query_options: options.optimizer_options,
155            request_options: Transaction::create_request_options(
156                options.call_options.priority,
157                self.transaction_tag.clone(),
158            ),
159            data_boost_enabled: false,
160            directed_read_options: None,
161            last_statement: false,
162        };
163        let session = self.session.as_mut().unwrap().deref_mut();
164        let reader = StatementReader {
165            enable_resume: options.enable_resume,
166            request,
167        };
168        RowIterator::new(session, reader, Some(options.call_options)).await
169    }
170
171    /// read returns a RowIterator for reading multiple rows from the database.
172    /// ```
173    /// use google_cloud_spanner::key::Key;
174    /// use google_cloud_spanner::client::{Client, Error};
175    ///
176    /// #[tokio::main]
177    /// async fn run(client: Client) -> Result<(), Error> {
178    ///     let mut tx = client.single().await?;
179    ///     let mut iter = tx.read("Guild", &["GuildID", "OwnerUserID"], vec![
180    ///         Key::new(&"pk1"),
181    ///         Key::new(&"pk2")
182    ///     ]).await?;
183    ///
184    ///     while let Some(row) = iter.next().await? {
185    ///         let guild_id = row.column_by_name::<String>("GuildID");
186    ///         //do something
187    ///     };
188    ///     Ok(())
189    /// }
190    /// ```
191    pub async fn read(
192        &mut self,
193        table: &str,
194        columns: &[&str],
195        key_set: impl Into<KeySet>,
196    ) -> Result<RowIterator<'_, impl Reader>, Status> {
197        self.read_with_option(table, columns, key_set, ReadOptions::default())
198            .await
199    }
200
201    /// read returns a RowIterator for reading multiple rows from the database.
202    pub async fn read_with_option(
203        &mut self,
204        table: &str,
205        columns: &[&str],
206        key_set: impl Into<KeySet>,
207        options: ReadOptions,
208    ) -> Result<RowIterator<'_, impl Reader>, Status> {
209        let request = ReadRequest {
210            session: self.get_session_name(),
211            transaction: Some(self.transaction_selector.clone()),
212            table: table.to_string(),
213            index: options.index,
214            columns: columns.iter().map(|x| x.to_string()).collect(),
215            key_set: Some(key_set.into().inner),
216            limit: options.limit,
217            resume_token: vec![],
218            partition_token: vec![],
219            request_options: Transaction::create_request_options(
220                options.call_options.priority,
221                self.transaction_tag.clone(),
222            ),
223            data_boost_enabled: false,
224            order_by: 0,
225            directed_read_options: None,
226            lock_hint: 0,
227        };
228
229        let session = self.as_mut_session();
230        let reader = TableReader { request };
231        RowIterator::new(session, reader, Some(options.call_options)).await
232    }
233
234    /// read returns a RowIterator for reading multiple rows from the database.
235    /// ```
236    /// use google_cloud_spanner::key::Key;
237    /// use google_cloud_spanner::client::Client;
238    /// use google_cloud_spanner::client::Error;
239    ///
240    /// async fn run(client: Client) -> Result<(), Error> {
241    ///     let mut tx = client.single().await?;
242    ///     let row = tx.read_row("Guild", &["GuildID", "OwnerUserID"], Key::new(&"guild1")).await?;
243    ///     Ok(())
244    /// }
245    /// ```
246    pub async fn read_row(&mut self, table: &str, columns: &[&str], key: Key) -> Result<Option<Row>, Status> {
247        self.read_row_with_option(table, columns, key, ReadOptions::default())
248            .await
249    }
250
251    /// read returns a RowIterator for reading multiple rows from the database.
252    pub async fn read_row_with_option(
253        &mut self,
254        table: &str,
255        columns: &[&str],
256        key: Key,
257        options: ReadOptions,
258    ) -> Result<Option<Row>, Status> {
259        let call_options = options.call_options.clone();
260        let mut reader = self
261            .read_with_option(table, columns, KeySet::from(key), options)
262            .await?;
263        reader.set_call_options(call_options);
264        reader.next().await
265    }
266
267    pub(crate) fn get_session_name(&self) -> String {
268        self.session.as_ref().unwrap().session.name.to_string()
269    }
270
271    pub(crate) fn as_mut_session(&mut self) -> &mut ManagedSession {
272        self.session.as_mut().unwrap()
273    }
274
275    /// returns the owner ship of session.
276    /// must drop destroy after this method.
277    pub(crate) fn take_session(&mut self) -> Option<ManagedSession> {
278        self.session.take()
279    }
280}