Skip to main content

google_cloud_spanner/
read_only_transaction.rs

1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::database_client::DatabaseClient;
16use crate::error::internal_error;
17use crate::model::TransactionOptions;
18use crate::model::TransactionSelector;
19use crate::model::transaction_options::ReadOnly;
20use crate::precommit::PrecommitTokenTracker;
21use crate::result_set::{ResultSet, ResultSetParams, StreamOperation};
22use crate::statement::Statement;
23use crate::timestamp_bound::TimestampBound;
24use crate::transaction_retry_policy::is_aborted;
25use google_cloud_gax::backoff_policy::BackoffPolicyArg;
26use google_cloud_gax::options::internal::RequestOptionsExt as _;
27use google_cloud_gax::retry_policy::RetryPolicyArg;
28use std::mem::replace;
29use std::sync::{Arc, Mutex};
30use std::time::Duration;
31use tokio::sync::Notify;
32
33/// A builder for [SingleUseReadOnlyTransaction].
34///
35/// # Example
36/// ```
37/// # use google_cloud_spanner::client::Spanner;
38/// # use google_cloud_spanner::transaction::TimestampBound;
39/// # async fn build_tx(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
40/// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
41/// let read_only_tx = db_client.single_use()
42///     .set_timestamp_bound(TimestampBound::strong())
43///     .build();
44/// # Ok(())
45/// # }
46/// ```
47pub struct SingleUseReadOnlyTransactionBuilder {
48    client: DatabaseClient,
49    timestamp_bound: Option<TimestampBound>,
50}
51
52impl SingleUseReadOnlyTransactionBuilder {
53    pub(crate) fn new(client: DatabaseClient) -> Self {
54        Self {
55            client,
56            timestamp_bound: None,
57        }
58    }
59
60    /// Sets the timestamp bound for the read-only transaction.
61    ///
62    /// # Example
63    /// ```
64    /// # use google_cloud_spanner::client::Spanner;
65    /// # use google_cloud_spanner::transaction::TimestampBound;
66    /// # async fn set_bound(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
67    /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
68    /// let builder = db_client.single_use().set_timestamp_bound(TimestampBound::strong());
69    /// # Ok(())
70    /// # }
71    /// ```
72    /// When reading data in Spanner in a read-only transaction, you can set a timestamp bound,
73    /// which tells Spanner how to choose a timestamp at which to read the data.
74    ///
75    /// See <https://docs.cloud.google.com/spanner/docs/timestamp-bounds> for more information.
76    pub fn set_timestamp_bound(mut self, bound: TimestampBound) -> Self {
77        self.timestamp_bound = Some(bound);
78        self
79    }
80
81    /// Builds the [SingleUseReadOnlyTransaction].
82    ///
83    /// # Example
84    ///
85    /// ```
86    /// # use google_cloud_spanner::client::Spanner;
87    /// # async fn build(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
88    /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
89    /// let tx = db_client.single_use().build();
90    /// # Ok(())
91    /// # }
92    /// ```
93    pub fn build(self) -> SingleUseReadOnlyTransaction {
94        let read_only = match self.timestamp_bound {
95            Some(b) => ReadOnly::default().set_timestamp_bound(b.0),
96            None => ReadOnly::default().set_strong(true),
97        };
98        let transaction_selector = crate::model::TransactionSelector::default()
99            .set_single_use(TransactionOptions::default().set_read_only(read_only));
100
101        let session_name = self.client.session_name();
102        let channel_hint = self.client.spanner.next_channel_hint();
103        SingleUseReadOnlyTransaction {
104            context: ReadContext {
105                session_name,
106                client: self.client,
107                transaction_selector: ReadContextTransactionSelector::Fixed(
108                    transaction_selector,
109                    None,
110                ),
111                precommit_token_tracker: PrecommitTokenTracker::new_noop(),
112                transaction_tag: None,
113                channel_hint,
114                begin_transaction_request_options: None,
115            },
116        }
117    }
118}
119
120/// A single-use read-only transaction. A single-use read-only transaction is the most
121/// efficient way to execute a single query or read operation.
122///
123/// # Example
124/// ```
125/// # use google_cloud_spanner::client::Spanner;
126/// # use google_cloud_spanner::statement::Statement;
127/// # async fn run(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
128/// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
129/// let tx = db_client.single_use().build();
130/// let stmt = Statement::builder("SELECT * FROM users WHERE id = @id")
131///     .add_param("id", &42)
132///     .build();
133/// let rs = tx.execute_query(stmt).await?;
134/// # Ok(())
135/// # }
136/// ```
137#[derive(Debug)]
138pub struct SingleUseReadOnlyTransaction {
139    context: ReadContext,
140}
141
142impl SingleUseReadOnlyTransaction {
143    /// Executes a query.
144    ///
145    /// # Example
146    /// ```
147    /// # use google_cloud_spanner::client::Spanner;
148    /// # use google_cloud_spanner::statement::Statement;
149    /// # async fn run(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
150    /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
151    /// let tx = db_client.single_use().build();
152    /// let stmt = Statement::builder("SELECT * FROM users WHERE id = @id")
153    ///     .add_param("id", &42)
154    ///     .build();
155    /// let mut rs = tx.execute_query(stmt).await?;
156    /// while let Some(row) = rs.next().await {
157    ///     let _row = row?;
158    ///     // process row
159    /// }
160    /// # Ok(())
161    /// # }
162    /// ```
163    pub async fn execute_query<T: Into<Statement>>(
164        &self,
165        statement: T,
166    ) -> crate::Result<ResultSet> {
167        self.context.execute_query(statement).await
168    }
169
170    /// Reads rows from the database using key lookups and scans, as a simple key/value style alternative to execute_query.
171    ///
172    /// # Example
173    /// ```
174    /// # use google_cloud_spanner::client::Spanner;
175    /// # use google_cloud_spanner::key::KeySet;
176    /// # use google_cloud_spanner::read::ReadRequest;
177    /// # use google_cloud_spanner::key;
178    /// # async fn run(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
179    /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
180    /// let transaction = db_client.single_use().build();
181    ///
182    /// // Read using the primary key
183    /// let read_by_pk = ReadRequest::builder("Users", vec!["Id", "Name"]).with_keys(KeySet::all()).build();
184    /// let mut result_set = transaction.execute_read(read_by_pk).await?;
185    /// while let Some(row) = result_set.next().await {
186    ///     let _row = row?;
187    ///     // process row
188    /// }
189    ///
190    /// // Read using a secondary index
191    /// let read_by_index = ReadRequest::builder("Users", vec!["Id", "Name"])
192    ///     .with_index("UsersByIndex", key![1_i64]).build();
193    /// let mut result_set = transaction.execute_read(read_by_index).await?;
194    /// while let Some(row) = result_set.next().await {
195    ///     let _row = row?;
196    ///     // process row
197    /// }
198    /// # Ok(())
199    /// # }
200    /// ```
201    pub async fn execute_read<T: Into<crate::read::ReadRequest>>(
202        &self,
203        read: T,
204    ) -> crate::Result<ResultSet> {
205        self.context.execute_read(read).await
206    }
207}
208
/// Selects how a transaction is started on Spanner.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[non_exhaustive]
pub enum BeginTransactionOption {
    /// Begin the transaction inline with the first statement, saving one
    /// round-trip to Spanner. This is the default.
    #[default]
    InlineBegin,
    /// Begin the transaction up front with a dedicated `BeginTransaction` RPC.
    ExplicitBegin,
}
220
221/// A builder for [MultiUseReadOnlyTransaction].
222///
223/// # Example
224/// ```
225/// # use google_cloud_spanner::client::Spanner;
226/// # use google_cloud_spanner::transaction::TimestampBound;
227/// # async fn build_tx(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
228/// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
229/// let read_only_tx = db_client.read_only_transaction()
230///     .set_timestamp_bound(TimestampBound::strong())
231///     .build()
232///     .await?;
233/// # Ok(())
234/// # }
235/// ```
236pub struct MultiUseReadOnlyTransactionBuilder {
237    client: DatabaseClient,
238    timestamp_bound: Option<TimestampBound>,
239    begin_transaction_option: BeginTransactionOption,
240    begin_gax_options: Option<crate::RequestOptions>,
241}
242
243impl MultiUseReadOnlyTransactionBuilder {
244    pub(crate) fn new(client: DatabaseClient) -> Self {
245        Self {
246            client,
247            timestamp_bound: None,
248            begin_transaction_option: BeginTransactionOption::InlineBegin,
249            begin_gax_options: None,
250        }
251    }
252
253    /// Sets the option for how to start a transaction.
254    ///
255    /// # Example
256    /// ```
257    /// # use google_cloud_spanner::client::Spanner;
258    /// # use google_cloud_spanner::transaction::BeginTransactionOption;
259    /// # use google_cloud_spanner::statement::Statement;
260    /// # async fn set_begin_option(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
261    /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
262    /// let transaction = db_client.read_only_transaction().with_begin_transaction_option(BeginTransactionOption::ExplicitBegin).build().await?;
263    /// let statement = Statement::builder("SELECT * FROM users").build();
264    /// let result_set = transaction.execute_query(statement).await?;
265    /// # Ok(())
266    /// # }
267    /// ```
268    ///
269    /// By default, the Spanner client will inline the `BeginTransaction` call with the first query
270    /// in the transaction. This reduces the number of round-trips to Spanner that are needed for a
271    /// transaction. Setting this option to `ExplicitBegin` can be beneficial for specific transaction
272    /// shapes:
273    ///
274    /// 1. When the transaction executes multiple parallel queries at the start of the transaction.
275    ///    Only one query can include a `BeginTransaction` option, and all other queries must wait for
276    ///    the first query to return the first result before they can proceed to execute. A
277    ///    `BeginTransaction` RPC will quickly return a transaction ID and allow all queries to start
278    ///    execution in parallel once the transaction ID has been returned.
279    /// 2. When the first query in the transaction could fail. If the query fails, then it will also
280    ///    not start a transaction and return a transaction ID. The transaction will then fall back to
281    ///    executing a `BeginTransaction` RPC and retry the first query.
282    ///
283    /// Default is `BeginTransactionOption::InlineBegin`.
284    pub fn with_begin_transaction_option(mut self, option: BeginTransactionOption) -> Self {
285        self.begin_transaction_option = option;
286        self
287    }
288
289    /// Sets the per-attempt timeout for the BeginTransaction RPC.
290    ///
291    /// # Example
292    /// ```
293    /// # use google_cloud_spanner::client::Spanner;
294    /// # use std::time::Duration;
295    /// # async fn sample(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
296    /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
297    /// let transaction = db_client.read_only_transaction()
298    ///     .with_begin_attempt_timeout(Duration::from_secs(10))
299    ///     .build()
300    ///     .await?;
301    /// # Ok(())
302    /// # }
303    /// ```
304    ///
305    /// Note: This timeout is only used if the transaction uses the `ExplicitBegin` transaction option.
306    pub fn with_begin_attempt_timeout(mut self, timeout: Duration) -> Self {
307        self.begin_gax_options
308            .get_or_insert_with(crate::RequestOptions::default)
309            .set_attempt_timeout(timeout);
310        self
311    }
312
313    /// Sets the retry policy for the BeginTransaction RPC.
314    ///
315    /// # Example
316    /// ```
317    /// # use google_cloud_spanner::client::Spanner;
318    /// # use google_cloud_gax::retry_policy::NeverRetry;
319    /// # async fn sample(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
320    /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
321    /// let transaction = db_client.read_only_transaction()
322    ///     .with_begin_retry_policy(NeverRetry)
323    ///     .build()
324    ///     .await?;
325    /// # Ok(())
326    /// # }
327    /// ```
328    ///
329    /// Note: This policy is only used if the transaction uses the `ExplicitBegin` transaction option.
330    pub fn with_begin_retry_policy(mut self, policy: impl Into<RetryPolicyArg>) -> Self {
331        self.begin_gax_options
332            .get_or_insert_with(crate::RequestOptions::default)
333            .set_retry_policy(policy);
334        self
335    }
336
337    /// Sets the backoff policy for the BeginTransaction RPC.
338    ///
339    /// # Example
340    /// ```
341    /// # use google_cloud_spanner::client::Spanner;
342    /// # use google_cloud_gax::exponential_backoff::ExponentialBackoff;
343    /// # async fn sample(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
344    /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
345    /// let transaction = db_client.read_only_transaction()
346    ///     .with_begin_backoff_policy(ExponentialBackoff::default())
347    ///     .build()
348    ///     .await?;
349    /// # Ok(())
350    /// # }
351    /// ```
352    ///
353    /// Note: This policy is only used if the transaction uses the `ExplicitBegin` transaction option.
354    pub fn with_begin_backoff_policy(mut self, policy: impl Into<BackoffPolicyArg>) -> Self {
355        self.begin_gax_options
356            .get_or_insert_with(crate::RequestOptions::default)
357            .set_backoff_policy(policy);
358        self
359    }
360
361    /// Sets the timestamp bound for the read-only transaction.
362    ///
363    /// # Example
364    /// ```
365    /// # use google_cloud_spanner::client::Spanner;
366    /// # use google_cloud_spanner::transaction::TimestampBound;
367    /// # async fn set_bound(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
368    /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
369    /// let builder = db_client.read_only_transaction().set_timestamp_bound(TimestampBound::strong());
370    /// # Ok(())
371    /// # }
372    /// ```
373    pub fn set_timestamp_bound(mut self, bound: TimestampBound) -> Self {
374        self.timestamp_bound = Some(bound);
375        self
376    }
377
378    async fn begin(
379        &self,
380        session_name: String,
381        options: TransactionOptions,
382        channel_hint: usize,
383        request_options: crate::RequestOptions,
384    ) -> crate::Result<ReadContextTransactionSelector> {
385        let response = execute_begin_transaction(
386            &self.client,
387            session_name,
388            options,
389            None,
390            channel_hint,
391            request_options,
392            None,
393        )
394        .await?;
395
396        let transaction_selector = crate::model::TransactionSelector::default().set_id(response.id);
397
398        Ok(ReadContextTransactionSelector::Fixed(
399            transaction_selector,
400            response.read_timestamp,
401        ))
402    }
403
404    /// Builds the [MultiUseReadOnlyTransaction] and starts the transaction
405    /// by calling the `BeginTransaction` RPC.
406    ///
407    /// # Example
408    /// ```
409    /// # use google_cloud_spanner::client::Spanner;
410    /// # async fn build(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
411    /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
412    /// let tx = db_client.read_only_transaction().build().await?;
413    /// # Ok(())
414    /// # }
415    /// ```
416    pub async fn build(self) -> crate::Result<MultiUseReadOnlyTransaction> {
417        let read_only = ReadOnly::default().set_return_read_timestamp(true);
418        let read_only = match self.timestamp_bound.as_ref() {
419            Some(b) => read_only.set_timestamp_bound(b.0.clone()),
420            None => read_only.set_strong(true),
421        };
422        let options = TransactionOptions::default().set_read_only(read_only);
423
424        let session_name = self.client.session_name();
425        let channel_hint = self.client.spanner.next_channel_hint();
426        let selector = match self.begin_transaction_option {
427            BeginTransactionOption::ExplicitBegin => {
428                self.begin(
429                    session_name.clone(),
430                    options,
431                    channel_hint,
432                    self.begin_gax_options.clone().unwrap_or_default(),
433                )
434                .await?
435            }
436            BeginTransactionOption::InlineBegin => ReadContextTransactionSelector::Lazy(Arc::new(
437                Mutex::new(TransactionState::NotStarted(options)),
438            )),
439        };
440
441        Ok(MultiUseReadOnlyTransaction {
442            context: ReadContext {
443                session_name,
444                client: self.client,
445                transaction_selector: selector,
446                precommit_token_tracker: PrecommitTokenTracker::new_noop(),
447                transaction_tag: None,
448                channel_hint,
449                begin_transaction_request_options: self.begin_gax_options.clone(),
450            },
451        })
452    }
453}
454
455/// A multi-use read-only transaction. This transaction can be used for multiple read queries
456/// ensuring consistency across all queries.
457///
458/// # Example
459/// ```
460/// # use google_cloud_spanner::client::Spanner;
461/// # use google_cloud_spanner::statement::Statement;
462/// # async fn run(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
463/// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
464/// let tx = db_client.read_only_transaction().build().await?;
465/// let stmt1 = Statement::builder("SELECT * FROM users WHERE id = @id")
466///     .add_param("id", &42)
467///     .build();
468/// let mut rs1 = tx.execute_query(stmt1).await?;
469///
470/// let stmt2 = Statement::builder("SELECT * FROM other_table")
471///     .build();
472/// let mut rs2 = tx.execute_query(stmt2).await?;
473/// # Ok(())
474/// # }
475/// ```
476#[derive(Debug)]
477pub struct MultiUseReadOnlyTransaction {
478    pub(crate) context: ReadContext,
479}
480
481impl MultiUseReadOnlyTransaction {
482    /// Returns the read timestamp chosen for the transaction.
483    pub fn read_timestamp(&self) -> Option<wkt::Timestamp> {
484        self.context.transaction_selector.read_timestamp()
485    }
486
487    /// Executes a query using this transaction.
488    ///
489    /// # Example
490    /// ```
491    /// # use google_cloud_spanner::client::Spanner;
492    /// # use google_cloud_spanner::statement::Statement;
493    /// # async fn run(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
494    /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
495    /// let tx = db_client.read_only_transaction().build().await?;
496    /// let stmt = Statement::builder("SELECT * FROM users WHERE id = @id")
497    ///     .add_param("id", &42)
498    ///     .build();
499    /// let mut rs = tx.execute_query(stmt).await?;
500    /// while let Some(row) = rs.next().await {
501    ///     let _row = row?;
502    ///     // process row
503    /// }
504    /// # Ok(())
505    /// # }
506    /// ```
507    pub async fn execute_query<T: Into<Statement>>(
508        &self,
509        statement: T,
510    ) -> crate::Result<ResultSet> {
511        self.context.execute_query(statement).await
512    }
513
514    /// Reads rows from the database using key lookups and scans, as a simple key/value style alternative to execute_query.
515    ///
516    /// # Example
517    /// ```
518    /// # use google_cloud_spanner::client::Spanner;
519    /// # use google_cloud_spanner::key::KeySet;
520    /// # use google_cloud_spanner::read::ReadRequest;
521    /// # use google_cloud_spanner::key;
522    /// # async fn run(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
523    /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
524    /// let transaction = db_client.read_only_transaction().build().await?;
525    ///
526    /// // Read using the primary key
527    /// let read_by_pk = ReadRequest::builder("Users", vec!["Id", "Name"]).with_keys(KeySet::all()).build();
528    /// let mut result_set = transaction.execute_read(read_by_pk).await?;
529    /// while let Some(row) = result_set.next().await {
530    ///     let _row = row?;
531    ///     // process row
532    /// }
533    ///
534    /// // Read using a secondary index
535    /// let read_by_index = ReadRequest::builder("Users", vec!["Id", "Name"])
536    ///     .with_index("UsersByIndex", key![1_i64]).build();
537    /// let mut result_set = transaction.execute_read(read_by_index).await?;
538    /// while let Some(row) = result_set.next().await {
539    ///     let _row = row?;
540    ///     // process row
541    /// }
542    /// # Ok(())
543    /// # }
544    /// ```
545    pub async fn execute_read<T: Into<crate::read::ReadRequest>>(
546        &self,
547        read: T,
548    ) -> crate::Result<ResultSet> {
549        self.context.execute_read(read).await
550    }
551}
552
553/// Executes an explicit `BeginTransaction` RPC on Spanner.
554pub(crate) async fn execute_begin_transaction(
555    client: &crate::database_client::DatabaseClient,
556    session_name: String,
557    options: crate::model::TransactionOptions,
558    transaction_tag: Option<String>,
559    channel_hint: usize,
560    request_options: crate::RequestOptions,
561    mutation_key: Option<crate::model::Mutation>,
562) -> crate::Result<crate::model::Transaction> {
563    let mut request = crate::model::BeginTransactionRequest::default()
564        .set_session(session_name)
565        .set_options(options)
566        .set_or_clear_mutation_key(mutation_key);
567    if let Some(tag) = transaction_tag {
568        request = request
569            .set_request_options(crate::model::RequestOptions::default().set_transaction_tag(tag));
570    }
571
572    client
573        .spanner
574        .begin_transaction(request, request_options, channel_hint)
575        .await
576}
577
578#[derive(Clone, Debug)]
579pub(crate) enum ReadContextTransactionSelector {
580    Fixed(crate::model::TransactionSelector, Option<wkt::Timestamp>),
581    Lazy(Arc<Mutex<TransactionState>>),
582}
583
584#[derive(Clone, Debug)]
585pub(crate) enum TransactionState {
586    NotStarted(crate::model::TransactionOptions),
587    Starting(crate::model::TransactionOptions, Arc<Notify>),
588    Started(crate::model::TransactionSelector, Option<wkt::Timestamp>),
589    Failed(Arc<crate::Error>),
590}
591
592enum SelectorStatus {
593    Ready(crate::model::TransactionSelector),
594    Wait(std::sync::Arc<tokio::sync::Notify>),
595}
596
597impl ReadContextTransactionSelector {
598    pub(crate) async fn selector(&self) -> crate::Result<crate::model::TransactionSelector> {
599        match self {
600            Self::Fixed(selector, _) => Ok(selector.clone()),
601            Self::Lazy(_) => loop {
602                match self.poll_selector_status()? {
603                    SelectorStatus::Ready(selector) => return Ok(selector),
604                    SelectorStatus::Wait(notify) => notify.notified().await,
605                }
606            },
607        }
608    }
609
610    /// Inspects the current lazy selector state returning whether it is ready,
611    /// failed, or needs to wait for the transaction to start.
612    fn poll_selector_status(&self) -> crate::Result<SelectorStatus> {
613        let Self::Lazy(lazy) = self else {
614            unreachable!("poll_selector_status called on non-Lazy selector");
615        };
616        let mut guard = lazy.lock().expect("transaction state mutex poisoned");
617
618        // Fast path: Transaction is already started.
619        if let TransactionState::Started(selector, _) = &*guard {
620            return Ok(SelectorStatus::Ready(selector.clone()));
621        }
622
623        // If the transaction has not started, extract options and transition the state to Starting.
624        let pending_options = if let TransactionState::NotStarted(options) = &*guard {
625            Some(options.clone())
626        } else {
627            // The state is either Starting or Failed. Concurrent threads will yield None here
628            // and fall through to either wait for the leader or fail immediately.
629            None
630        };
631        if let Some(options) = pending_options {
632            // This thread becomes the "leader" and will start the transaction.
633            let notify = Arc::new(Notify::new());
634            *guard = TransactionState::Starting(options.clone(), Arc::clone(&notify));
635            return Ok(SelectorStatus::Ready(
636                crate::model::TransactionSelector::default().set_begin(options),
637            ));
638        }
639
640        // Handle other states: yield error or wait.
641        match &*guard {
642            // Note: Failed will only be reached if the following happens:
643            // 1. The first query fails and the transaction falls back to an explicit BeginTransaction RPC.
644            // 2. The BeginTransaction RPC fails. This is the error that will be returned to all the waiting queries.
645            TransactionState::Failed(err) => {
646                let error = if let Some(status) = err.status() {
647                    crate::Error::service(status.clone())
648                } else {
649                    crate::error::internal_error(format!("Transaction failed to start: {}", err))
650                };
651                Err(error)
652            }
653            // Transaction is starting. Wait until a transaction ID is returned.
654            TransactionState::Starting(_, notify) => Ok(SelectorStatus::Wait(Arc::clone(notify))),
655            TransactionState::Started(_, _) | TransactionState::NotStarted(_) => unreachable!(),
656        }
657    }
658}
659
660pub(crate) struct ExplicitBeginParams {
661    pub(crate) client: crate::database_client::DatabaseClient,
662    pub(crate) session_name: String,
663    pub(crate) transaction_tag: Option<String>,
664    pub(crate) channel_hint: usize,
665    pub(crate) request_options: crate::RequestOptions,
666    pub(crate) is_stream_fallback: bool,
667    pub(crate) precommit_token_tracker: crate::precommit::PrecommitTokenTracker,
668    pub(crate) mutation_key: Option<crate::model::Mutation>,
669}
670
671impl ReadContextTransactionSelector {
672    /// Explicitly begins a transaction if the transaction selector is a `Lazy`
673    /// selector and the transaction has not yet been started. This is used by
674    /// the client to force the start of a transaction if the first statement
675    /// failed.
676    pub(crate) async fn begin_explicitly(&self, params: ExplicitBeginParams) -> crate::Result<()> {
677        let Self::Lazy(lazy) = self else {
678            return Ok(());
679        };
680
681        enum FallbackAction {
682            Begin(
683                crate::model::TransactionOptions,
684                Option<Arc<tokio::sync::Notify>>,
685            ),
686            Wait(Arc<tokio::sync::Notify>),
687            None,
688        }
689
690        let action = {
691            let mut guard = lazy
692                .lock()
693                .map_err(|_| internal_error("transaction state mutex poisoned"))?;
694            match &*guard {
695                TransactionState::NotStarted(options) => {
696                    // The transaction has not started yet. This thread becomes the "leader"
697                    // and transitions the state to Starting before performing the BeginTransaction RPC.
698                    let options = options.clone();
699                    let notify = Arc::new(tokio::sync::Notify::new());
700                    *guard = TransactionState::Starting(options.clone(), Arc::clone(&notify));
701                    FallbackAction::Begin(options, Some(notify))
702                }
703                TransactionState::Starting(options, notify) => {
704                    // The transaction is already in the process of starting. If this call originated from
705                    // an explicit begin request (`is_stream_fallback = false`), this thread is a follower
706                    // and must wait for the leader. If this call originated from a stream resume fallback
707                    // (`is_stream_fallback = true`), this thread is the stream leader whose initial query failed,
708                    // and it must proceed with an explicit BeginTransaction RPC.
709                    if !params.is_stream_fallback {
710                        FallbackAction::Wait(Arc::clone(notify))
711                    } else {
712                        FallbackAction::Begin(options.clone(), Some(Arc::clone(notify)))
713                    }
714                }
715                TransactionState::Started(_, _) | TransactionState::Failed(_) => {
716                    // The transaction has already reached a terminal state (Started or Failed).
717                    // No further action is needed in this explicit begin attempt.
718                    FallbackAction::None
719                }
720            }
721        };
722
723        let (options, notify_opt) = match action {
724            FallbackAction::None => return Ok(()),
725            FallbackAction::Wait(notify) => {
726                notify.notified().await;
727                return Ok(());
728            }
729            FallbackAction::Begin(opts, notif) => (opts, notif),
730        };
731
732        // Only the leader thread will reach this point to perform the explicit begin.
733        // Waiters are blocked in `poll_selector_status` waiting for the result,
734        // and already completed states return early above.
735        let response = match execute_begin_transaction(
736            &params.client,
737            params.session_name,
738            options,
739            params.transaction_tag,
740            params.channel_hint,
741            params.request_options,
742            params.mutation_key,
743        )
744        .await
745        {
746            Ok(r) => r,
747            Err(e) => {
748                let mut guard = lazy.lock().expect("transaction state mutex poisoned");
749                let error = Arc::new(e);
750                *guard = TransactionState::Failed(Arc::clone(&error));
751                // Release the lock and notify all the waiting queries that
752                // the transaction has failed.
753                drop(guard);
754                if let Some(notify) = notify_opt {
755                    notify.notify_waiters();
756                }
757
758                let return_error = if let Some(status) = error.status() {
759                    crate::Error::service(status.clone())
760                } else {
761                    crate::error::internal_error(format!("Transaction failed to start: {}", error))
762                };
763                return Err(return_error);
764            }
765        };
766
767        self.update(response.id, response.read_timestamp)?;
768        params
769            .precommit_token_tracker
770            .update(response.precommit_token);
771
772        Ok(())
773    }
774
775    /// Updates the transaction state to `Started` with the given transaction ID and optional
776    /// read timestamp.
777    ///
778    /// This method is called when a transaction has successfully been initiated (either via
779    /// an inline begin in a query or an explicit `BeginTransaction` RPC).
780    ///
781    /// If the previous state was `Starting`, it will notify all concurrent threads that were
782    /// waiting for the transaction to start.
783    pub(crate) fn update(
784        &self,
785        id: bytes::Bytes,
786        timestamp: Option<wkt::Timestamp>,
787    ) -> crate::Result<()> {
788        let Self::Lazy(lazy) = self else {
789            return Ok(());
790        };
791        let mut guard = lazy.lock().expect("transaction state mutex poisoned");
792
793        if matches!(
794            &*guard,
795            TransactionState::NotStarted(_) | TransactionState::Starting(_, _)
796        ) {
797            // Atomically transition the state to Started and extract the previous state.
798            // We do this to take ownership of the Notify handle (if it was Starting)
799            // so we can notify waiters after dropping the lock.
800            let previous_state = replace(
801                &mut *guard,
802                TransactionState::Started(TransactionSelector::default().set_id(id), timestamp),
803            );
804            drop(guard);
805
806            // Notify all queries that are waiting for the transaction.
807            if let TransactionState::Starting(_, notify) = previous_state {
808                notify.notify_waiters();
809            }
810            Ok(())
811        } else if let TransactionState::Started(existing_selector, _) = &*guard {
812            // Spanner returns the transaction ID on all statements executed within that transaction
813            // when using multiplexed sessions.If the transaction has already started with the same ID,
814            // this is expected behavior and can be ignored.
815            if existing_selector.id() == Some(&id) {
816                Ok(())
817            } else {
818                Err(crate::error::internal_error(
819                    "got a transaction id for an already Started or Failed transaction",
820                ))
821            }
822        } else {
823            // This should never happen.
824            Err(crate::error::internal_error(
825                "got a transaction id for an already Started or Failed transaction",
826            ))
827        }
828    }
829
830    /// Returns the transaction ID if it is already available, without waiting.
831    ///
832    /// This method inspects the selector and returns the transaction ID if the
833    /// transaction has already started. It returns `None` if the transaction
834    /// has not yet started or is in a state without an ID.
835    pub(crate) fn get_id_no_wait(&self) -> crate::Result<Option<bytes::Bytes>> {
836        use crate::model::transaction_selector::Selector;
837        match self {
838            Self::Fixed(selector, _) => {
839                if let Some(Selector::Id(id)) = &selector.selector {
840                    return Ok(Some(id.clone()));
841                }
842            }
843            Self::Lazy(lazy) => {
844                let guard = lazy
845                    .lock()
846                    .map_err(|_| internal_error("transaction state mutex poisoned"))?;
847                if let TransactionState::Started(selector, _) = &*guard
848                    && let Some(Selector::Id(id)) = &selector.selector
849                {
850                    return Ok(Some(id.clone()));
851                }
852            }
853        }
854        Ok(None)
855    }
856
857    /// Returns whether the transaction selector is currently in the `Starting` state.
858    pub(crate) fn is_starting(&self) -> crate::Result<bool> {
859        match self {
860            Self::Lazy(lazy) => {
861                let guard = lazy
862                    .lock()
863                    .map_err(|_| internal_error("transaction state mutex poisoned"))?;
864                Ok(matches!(&*guard, TransactionState::Starting(_, _)))
865            }
866            _ => Ok(false),
867        }
868    }
869
870    /// Resets the selector state from `Starting` back to `NotStarted`.
871    ///
872    /// This is used during stream resume fallbacks when the first query stream
873    /// fails before yielding a transaction ID. It unlocks any parked waiters
874    /// allowing them (or the retry attempt) to include the begin option again.
875    /// Only one of the waiters will win that 'race' and include a new
876    /// BeginTransaction option. All the others will continue to wait.
877    pub(crate) fn maybe_reset_starting(&self) {
878        let Self::Lazy(lazy) = self else {
879            return;
880        };
881
882        let mut guard = lazy.lock().expect("transaction state mutex poisoned");
883        if let TransactionState::Starting(options, notify) = &*guard {
884            let options = options.clone();
885            let notify = Arc::clone(notify);
886            *guard = TransactionState::NotStarted(options);
887            drop(guard);
888            notify.notify_waiters();
889        }
890    }
891
892    /// Returns the read timestamp of the transaction, if available.
893    ///
894    /// For `Fixed` transactions, this returns the timestamp stored in the variant.
895    /// For `Lazy` transactions, this returns the timestamp once the transaction has successfully started
896    /// and yielded a timestamp. Returns `None` if the transaction has not started or did not yield a timestamp.
897    pub(crate) fn read_timestamp(&self) -> Option<wkt::Timestamp> {
898        match self {
899            Self::Fixed(_, timestamp) => *timestamp,
900            Self::Lazy(lazy) => {
901                let guard = lazy.lock().expect("transaction state mutex poisoned");
902                if let TransactionState::Started(_, timestamp) = &*guard {
903                    *timestamp
904                } else {
905                    None
906                }
907            }
908        }
909    }
910}
911
/// Per-transaction state shared by the read and query entry points in this module.
///
/// It bundles the session, client and transaction selector that every streaming read or
/// query request is built from.
#[derive(Clone, Debug)]
pub(crate) struct ReadContext {
    /// Name of the session that is set on every request issued through this context.
    pub(crate) session_name: String,
    /// Client used to send the streaming RPCs and the explicit begin fallback.
    pub(crate) client: DatabaseClient,
    /// Selector that supplies the `transaction` field of each request.
    pub(crate) transaction_selector: ReadContextTransactionSelector,
    /// Tracker handed to result sets and to the explicit begin fallback.
    pub(crate) precommit_token_tracker: PrecommitTokenTracker,
    /// Optional tag that `amend_request_options` copies into the request options.
    pub(crate) transaction_tag: Option<String>,
    /// Hint passed along with every RPC issued through `client`.
    pub(crate) channel_hint: usize,
    /// Options that, when present, override the per-call options of the explicit
    /// begin fallback (see `begin_explicitly_if_not_started`).
    pub(crate) begin_transaction_request_options: Option<crate::RequestOptions>,
}
922
923impl ReadContext {
924    /// Amends the given request options with the transaction tag if present.
925    ///
926    /// This method returns the `RequestOptions` that should be used for the request.
927    /// If no `transaction_tag` has been set, the given `RequestOptions` is returned unchanged.
928    /// If a `transaction_tag` has been set, the given `RequestOptions` is modified to include the tag
929    /// (or a new `RequestOptions` is created if `None` was passed in).
930    pub(crate) fn amend_request_options(
931        &self,
932        mut options: Option<crate::model::RequestOptions>,
933    ) -> Option<crate::model::RequestOptions> {
934        if let Some(tag) = &self.transaction_tag {
935            options
936                .get_or_insert_with(crate::model::RequestOptions::default)
937                .transaction_tag = tag.clone();
938        }
939        options
940    }
941
942    /// Attempts to execute an explicit `begin_transaction` RPC if the current transaction
943    /// selector is still in the `Lazy(NotStarted)` state. This is used as a
944    /// fallback mechanism when an initial implicit begin attempt failed.
945    pub(crate) async fn begin_explicitly_if_not_started(
946        &self,
947        fallback_options: crate::RequestOptions,
948        is_stream_fallback: bool,
949        mutation_key: Option<crate::model::Mutation>,
950    ) -> crate::Result<bool> {
951        let ReadContextTransactionSelector::Lazy(lazy) = &self.transaction_selector else {
952            return Ok(false);
953        };
954        let is_started = matches!(&*lazy.lock().unwrap(), TransactionState::Started(_, _));
955        if is_started {
956            return Ok(false);
957        }
958
959        let options = merge_request_options(
960            fallback_options,
961            self.begin_transaction_request_options.as_ref(),
962        );
963
964        self.transaction_selector
965            .begin_explicitly(ExplicitBeginParams {
966                client: self.client.clone(),
967                session_name: self.session_name.clone(),
968                transaction_tag: self.transaction_tag.clone(),
969                channel_hint: self.channel_hint,
970                request_options: options,
971                is_stream_fallback,
972                precommit_token_tracker: self.precommit_token_tracker.clone(),
973                mutation_key,
974            })
975            .await?;
976        Ok(true)
977    }
978}
979
980/// Merges the configured fields from a `source` `RequestOptions` into a `destination` `RequestOptions`.
981/// Configured options in `source` will override those in `destination`.
982fn merge_request_options(
983    mut destination: crate::RequestOptions,
984    source: Option<&crate::RequestOptions>,
985) -> crate::RequestOptions {
986    let Some(source) = source else {
987        return destination;
988    };
989
990    if let Some(timeout) = source.attempt_timeout() {
991        destination.set_attempt_timeout(*timeout);
992    }
993    if let Some(retry) = source.retry_policy() {
994        destination.set_retry_policy(retry.clone());
995    }
996    if let Some(backoff) = source.backoff_policy() {
997        destination.set_backoff_policy(backoff.clone());
998    }
999    if let Some(src_headers) = source.get_extension::<http::HeaderMap>() {
1000        let mut dest_headers = destination
1001            .get_extension::<http::HeaderMap>()
1002            .cloned()
1003            .unwrap_or_default();
1004        for (name, value) in src_headers.iter() {
1005            dest_headers.insert(name.clone(), value.clone());
1006        }
1007        destination = destination.insert_extension(dest_headers);
1008    }
1009    destination
1010}
1011
/// Helper macro to execute a streaming SQL or streaming read RPC with retry logic.
///
/// Arguments:
/// * `$self` - expression providing `client`, `channel_hint`, `transaction_selector`,
///   `precommit_token_tracker`, `session_name`, `transaction_tag` and
///   `begin_explicitly_if_not_started` (a `ReadContext` in this file).
/// * `$request` - identifier of a mutable local holding the request; its `transaction`
///   field is overwritten when the explicit-begin fallback is taken.
/// * `$gax_options` - identifier of the local holding the per-call options.
/// * `$rpc_method` - name of the streaming method to call on `client.spanner`.
/// * `$operation_variant` - path of the `StreamOperation` variant that wraps the request.
///
/// The expansion uses `return` and `?`, so it must be placed in an `async` function whose
/// return type matches the result of `ResultSet::create(..).await`, which is also the value
/// the macro evaluates to.
macro_rules! execute_stream_with_retry {
    ($self:expr, $request:ident, $gax_options:ident, $rpc_method:ident, $operation_variant:path) => {{
        // First attempt: send the request exactly as the caller built it.
        let stream = match $self
            .client
            .spanner
            .$rpc_method($request.clone(), $gax_options.clone(), $self.channel_hint)
            .send()
            .await
        {
            Ok(s) => s,
            Err(e) => {
                // Aborted errors are handed straight back to the caller.
                if is_aborted(&e) {
                    return Err(e);
                }
                // Try the explicit-begin fallback. It reports `true` when the fallback ran,
                // in which case the request is re-sent once with the selector obtained
                // afterwards. Errors from the fallback or the second send propagate via `?`.
                if $self
                    .begin_explicitly_if_not_started($gax_options.clone(), true, None)
                    .await?
                {
                    $request.transaction = Some($self.transaction_selector.selector().await?);
                    $self
                        .client
                        .spanner
                        .$rpc_method($request.clone(), $gax_options.clone(), $self.channel_hint)
                        .send()
                        .await?
                } else {
                    // No fallback was applicable: surface the original error.
                    return Err(e);
                }
            }
        };

        // Wrap the stream together with everything the result set needs later on
        // (the request itself is moved into the operation variant).
        ResultSet::create(ResultSetParams {
            stream,
            transaction_selector: Some($self.transaction_selector.clone()),
            precommit_token_tracker: $self.precommit_token_tracker.clone(),
            client: $self.client.clone(),
            session_name: $self.session_name.clone(),
            transaction_tag: $self.transaction_tag.clone(),
            operation: $operation_variant($request),
            channel_hint: $self.channel_hint,
            gax_options: $gax_options,
        })
        .await
    }};
}
1058
1059impl ReadContext {
1060    pub(crate) async fn execute_query<T: Into<Statement>>(
1061        &self,
1062        statement: T,
1063    ) -> crate::Result<ResultSet> {
1064        let statement = statement.into();
1065        let gax_options = statement.gax_options().clone();
1066        let mut request = statement
1067            .into_request()
1068            .set_session(self.session_name.clone())
1069            .set_transaction(self.transaction_selector.selector().await?);
1070        request.request_options = self.amend_request_options(request.request_options);
1071
1072        execute_stream_with_retry!(
1073            self,
1074            request,
1075            gax_options,
1076            execute_streaming_sql,
1077            StreamOperation::Query
1078        )
1079    }
1080
1081    pub(crate) async fn execute_read<T: Into<crate::read::ReadRequest>>(
1082        &self,
1083        read: T,
1084    ) -> crate::Result<ResultSet> {
1085        let read = read.into();
1086        let gax_options = read.gax_options.clone();
1087        let mut request = read
1088            .into_request()
1089            .set_session(self.session_name.clone())
1090            .set_transaction(self.transaction_selector.selector().await?);
1091        request.request_options = self.amend_request_options(request.request_options);
1092
1093        execute_stream_with_retry!(
1094            self,
1095            request,
1096            gax_options,
1097            streaming_read,
1098            StreamOperation::Read
1099        )
1100    }
1101}
1102
1103#[cfg(test)]
1104pub(crate) mod tests {
1105    use super::*;
1106    use crate::result_set::tests::adapt;
1107    use crate::result_set::tests::string_val;
1108    use crate::statement::Statement;
1109    use crate::value::Value;
1110    use gaxi::grpc::tonic::{self, Code, Response, Status};
1111    use google_cloud_gax::error::rpc::Code as GaxCode;
1112    use google_cloud_gax::exponential_backoff::ExponentialBackoff;
1113    use google_cloud_gax::retry_policy::NeverRetry;
1114    use google_cloud_test_macros::tokio_test_no_panics;
1115    use http::{HeaderMap, HeaderName, HeaderValue};
1116    use mock_v1::transaction_selector::Selector;
1117    use spanner_grpc_mock::MockSpanner;
1118    use spanner_grpc_mock::google::spanner::v1 as mock_v1;
1119    use std::sync::mpsc::channel as std_channel;
1120    use std::sync::{Arc, Mutex as StdMutex};
1121    use tokio::sync::oneshot::channel as oneshot_channel;
1122    use tokio::sync::{Barrier, Mutex, Notify, mpsc};
1123
1124    #[test]
1125    fn auto_traits() {
1126        static_assertions::assert_impl_all!(SingleUseReadOnlyTransactionBuilder: Send, Sync);
1127        static_assertions::assert_impl_all!(SingleUseReadOnlyTransaction: Send, Sync, std::fmt::Debug);
1128        static_assertions::assert_impl_all!(MultiUseReadOnlyTransactionBuilder: Send, Sync);
1129        static_assertions::assert_impl_all!(MultiUseReadOnlyTransaction: Send, Sync, std::fmt::Debug);
1130        static_assertions::assert_impl_all!(ReadContext: Send, Sync, std::fmt::Debug);
1131    }
1132
1133    pub(crate) fn create_session_mock() -> spanner_grpc_mock::MockSpanner {
1134        let mut mock = spanner_grpc_mock::MockSpanner::new();
1135        mock.expect_create_session().once().returning(|_| {
1136            Ok(Response::new(mock_v1::Session {
1137                name: "projects/p/instances/i/databases/d/sessions/123".to_string(),
1138                ..Default::default()
1139            }))
1140        });
1141        mock
1142    }
1143
1144    fn setup_select1() -> spanner_grpc_mock::google::spanner::v1::PartialResultSet {
1145        spanner_grpc_mock::google::spanner::v1::PartialResultSet {
1146            metadata: Some(spanner_grpc_mock::google::spanner::v1::ResultSetMetadata {
1147                row_type: Some(spanner_grpc_mock::google::spanner::v1::StructType {
1148                    fields: vec![Default::default()],
1149                }),
1150                ..Default::default()
1151            }),
1152            values: vec![prost_types::Value {
1153                kind: Some(prost_types::value::Kind::StringValue("1".to_string())),
1154            }],
1155            last: true,
1156            ..Default::default()
1157        }
1158    }
1159
1160    pub(crate) async fn setup_db_client(
1161        mock: spanner_grpc_mock::MockSpanner,
1162    ) -> (DatabaseClient, tokio::task::JoinHandle<()>) {
1163        use crate::client::Spanner;
1164        use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
1165        let (address, server) = spanner_grpc_mock::start("0.0.0.0:0", mock)
1166            .await
1167            .expect("Failed to start mock server");
1168
1169        let spanner = Spanner::builder()
1170            .with_endpoint(address)
1171            .with_credentials(Anonymous::new().build())
1172            .build()
1173            .await
1174            .expect("Failed to build client");
1175
1176        let db_client = spanner
1177            .database_client("projects/p/instances/i/databases/d")
1178            .build()
1179            .await
1180            .expect("Failed to create DatabaseClient");
1181
1182        (db_client, server)
1183    }
1184
1185    #[tokio_test_no_panics]
1186    async fn single_use_builder() {
1187        let mock = create_session_mock();
1188
1189        let (db_client, _server) = setup_db_client(mock).await;
1190
1191        let tx = db_client.single_use().build();
1192        let selector = tx
1193            .context
1194            .transaction_selector
1195            .selector()
1196            .await
1197            .expect("Failed to get selector");
1198        let ro = selector
1199            .single_use()
1200            .expect("Expected SingleUse selector")
1201            .read_only()
1202            .expect("Expected ReadOnly mode");
1203        assert_eq!(
1204            ro.timestamp_bound,
1205            Some(crate::model::transaction_options::read_only::TimestampBound::Strong(true))
1206        );
1207
1208        let tx2 = db_client
1209            .single_use()
1210            .set_timestamp_bound(crate::timestamp_bound::TimestampBound::max_staleness(
1211                std::time::Duration::from_secs(10),
1212            ))
1213            .build();
1214        let selector = tx2
1215            .context
1216            .transaction_selector
1217            .selector()
1218            .await
1219            .expect("Failed to get selector");
1220        let ro2 = selector
1221            .single_use()
1222            .expect("Expected SingleUse selector")
1223            .read_only()
1224            .expect("Expected ReadOnly mode");
1225        assert_eq!(
1226            ro2.timestamp_bound,
1227            Some(
1228                crate::model::transaction_options::read_only::TimestampBound::MaxStaleness(
1229                    Box::new(wkt::Duration::new(10, 0).expect("failed to create Duration"))
1230                )
1231            )
1232        );
1233    }
1234
1235    #[tokio_test_no_panics]
1236    async fn execute_single_query() {
1237        use super::super::result_set::tests::string_val;
1238        use crate::statement::Statement;
1239        use crate::value::Value;
1240
1241        let mut mock = create_session_mock();
1242
1243        mock.expect_execute_streaming_sql().once().returning(|req| {
1244            let req = req.into_inner();
1245            assert_eq!(
1246                req.session,
1247                "projects/p/instances/i/databases/d/sessions/123"
1248            );
1249            assert_eq!(req.sql, "SELECT 1");
1250
1251            Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
1252                setup_select1(),
1253            )])))
1254        });
1255
1256        let (db_client, _server) = setup_db_client(mock).await;
1257
1258        let tx = db_client.single_use().build();
1259        let mut rs = tx
1260            .execute_query(Statement::builder("SELECT 1").build())
1261            .await
1262            .expect("Failed to execute query");
1263
1264        let row = rs.next().await.expect("has row").expect("has valid row");
1265        assert_eq!(row.raw_values(), [Value(string_val("1"))]);
1266        let result = rs.next().await;
1267        assert!(result.is_none(), "expected None, got {result:?}");
1268    }
1269
1270    #[tokio_test_no_panics]
1271    async fn execute_multi_query() {
1272        use super::super::result_set::tests::string_val;
1273        use crate::statement::Statement;
1274        use crate::value::Value;
1275        use spanner_grpc_mock::google::spanner::v1 as mock_v1;
1276
1277        let mut mock = create_session_mock();
1278
1279        mock.expect_begin_transaction().once().returning(|req| {
1280            let req = req.into_inner();
1281            assert_eq!(
1282                req.session,
1283                "projects/p/instances/i/databases/d/sessions/123"
1284            );
1285            Ok(tonic::Response::new(mock_v1::Transaction {
1286                id: vec![1, 2, 3],
1287                // prost_types::Timestamp fields need to be explicitly set because default is 0 for both
1288                read_timestamp: Some(prost_types::Timestamp {
1289                    seconds: 123456789,
1290                    nanos: 0,
1291                }),
1292                ..Default::default()
1293            }))
1294        });
1295
1296        mock.expect_execute_streaming_sql()
1297            .times(2)
1298            .returning(|req| {
1299                let req = req.into_inner();
1300                assert_eq!(
1301                    req.session,
1302                    "projects/p/instances/i/databases/d/sessions/123"
1303                );
1304                assert_eq!(
1305                    req.transaction
1306                        .expect("transaction should be present")
1307                        .selector
1308                        .expect("selector should be present"),
1309                    mock_v1::transaction_selector::Selector::Id(vec![1, 2, 3])
1310                );
1311
1312                Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
1313                    setup_select1(),
1314                )])))
1315            });
1316
1317        let (db_client, _server) = setup_db_client(mock).await;
1318
1319        let tx = db_client
1320            .read_only_transaction()
1321            .with_begin_transaction_option(BeginTransactionOption::ExplicitBegin)
1322            .build()
1323            .await
1324            .expect("Failed to start tx");
1325        assert_eq!(
1326            tx.read_timestamp()
1327                .expect("expected read timestamp")
1328                .seconds(),
1329            123456789
1330        );
1331
1332        for _ in 0..2 {
1333            let mut rs = tx
1334                .execute_query(Statement::builder("SELECT 1").build())
1335                .await
1336                .expect("Failed to execute query");
1337
1338            let row = rs.next().await.expect("has row").expect("has valid row");
1339            assert_eq!(row.raw_values(), [Value(string_val("1"))]);
1340
1341            let result = rs.next().await;
1342            assert!(result.is_none(), "expected None, got {result:?}");
1343        }
1344    }
1345
1346    #[tokio_test_no_panics]
1347    async fn execute_multi_query_inline_begin() -> anyhow::Result<()> {
1348        use super::super::result_set::tests::string_val;
1349        use crate::statement::Statement;
1350        use crate::value::Value;
1351        use spanner_grpc_mock::google::spanner::v1 as mock_v1;
1352
1353        let mut mock = create_session_mock();
1354
1355        // No explicit begin_transaction should be called.
1356        mock.expect_begin_transaction().never();
1357
1358        let mut seq = mockall::Sequence::new();
1359
1360        mock.expect_execute_streaming_sql()
1361            .times(1)
1362            .in_sequence(&mut seq)
1363            .returning(move |req| {
1364                let req = req.into_inner();
1365                assert_eq!(
1366                    req.session,
1367                    "projects/p/instances/i/databases/d/sessions/123"
1368                );
1369
1370                // First call: Should have Selector::Begin
1371                match req.transaction.unwrap().selector.unwrap() {
1372                    mock_v1::transaction_selector::Selector::Begin(_) => {}
1373                    _ => panic!("Expected Selector::Begin"),
1374                }
1375                let mut rs = setup_select1();
1376                rs.metadata.as_mut().unwrap().transaction = Some(mock_v1::Transaction {
1377                    id: vec![4, 5, 6],
1378                    read_timestamp: Some(prost_types::Timestamp {
1379                        seconds: 987654321,
1380                        nanos: 0,
1381                    }),
1382                    ..Default::default()
1383                });
1384                Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(rs)])))
1385            });
1386
1387        mock.expect_execute_streaming_sql()
1388            .times(1)
1389            .in_sequence(&mut seq)
1390            .returning(move |req| {
1391                let req = req.into_inner();
1392                // Second call: Should have Selector::Id using the ID returned in the first call
1393                match req.transaction.unwrap().selector.unwrap() {
1394                    mock_v1::transaction_selector::Selector::Id(id) => {
1395                        assert_eq!(id, vec![4, 5, 6]);
1396                    }
1397                    _ => panic!("Expected Selector::Id"),
1398                }
1399                Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
1400                    setup_select1(),
1401                )])))
1402            });
1403
1404        let (db_client, _server) = setup_db_client(mock).await;
1405
1406        let tx = db_client
1407            .read_only_transaction()
1408            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
1409            .build()
1410            .await?;
1411
1412        // The read timestamp is not available until the first query is executed.
1413        assert!(tx.read_timestamp().is_none());
1414
1415        for i in 0..2 {
1416            let mut rs = tx
1417                .execute_query(Statement::builder("SELECT 1").build())
1418                .await?;
1419
1420            let row = rs.next().await.expect("Expected a row")?;
1421            assert_eq!(row.raw_values(), [Value(string_val("1"))]);
1422
1423            let result = rs.next().await;
1424            assert!(result.is_none(), "Expected None, got {result:?}");
1425
1426            if i == 0 {
1427                // Read timestamp becomes available.
1428                assert_eq!(
1429                    tx.read_timestamp()
1430                        .expect("Expected read timestamp")
1431                        .seconds(),
1432                    987654321
1433                );
1434            }
1435        }
1436
1437        Ok(())
1438    }
1439
1440    #[tokio_test_no_panics]
1441    async fn execute_single_read() {
1442        use super::super::result_set::tests::string_val;
1443        use crate::key::KeySet;
1444        use crate::read::ReadRequest;
1445        use crate::value::Value;
1446
1447        let mut mock = create_session_mock();
1448
1449        mock.expect_streaming_read().once().returning(|req| {
1450            let req = req.into_inner();
1451            assert_eq!(
1452                req.session,
1453                "projects/p/instances/i/databases/d/sessions/123"
1454            );
1455            assert_eq!(req.table, "Users");
1456            assert_eq!(req.columns, vec!["Id".to_string(), "Name".to_string()]);
1457
1458            Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
1459                setup_select1(),
1460            )])))
1461        });
1462
1463        let (db_client, _server) = setup_db_client(mock).await;
1464
1465        let tx = db_client.single_use().build();
1466        let read = ReadRequest::builder("Users", vec!["Id", "Name"])
1467            .with_keys(KeySet::all())
1468            .build();
1469        let mut rs = tx.execute_read(read).await.expect("Failed to execute read");
1470
1471        let row = rs.next().await.expect("has row").expect("has valid row");
1472        assert_eq!(row.raw_values(), [Value(string_val("1"))]);
1473        let result = rs.next().await;
1474        assert!(result.is_none(), "expected None, got {result:?}");
1475    }
1476
1477    #[tokio_test_no_panics]
1478    async fn execute_multi_read() -> anyhow::Result<()> {
1479        use super::super::result_set::tests::string_val;
1480        use crate::key::KeySet;
1481        use crate::read::ReadRequest;
1482        use crate::value::Value;
1483        use spanner_grpc_mock::google::spanner::v1 as mock_v1;
1484
1485        let mut mock = create_session_mock();
1486
1487        // No explicit begin_transaction should be called.
1488        mock.expect_begin_transaction().never();
1489
1490        let mut seq = mockall::Sequence::new();
1491
1492        mock.expect_streaming_read()
1493            .times(1)
1494            .in_sequence(&mut seq)
1495            .returning(move |req| {
1496                let req = req.into_inner();
1497                assert_eq!(
1498                    req.session,
1499                    "projects/p/instances/i/databases/d/sessions/123"
1500                );
1501
1502                // First call: Should have Selector::Begin
1503                match req.transaction.unwrap().selector.unwrap() {
1504                    mock_v1::transaction_selector::Selector::Begin(_) => {}
1505                    _ => panic!("Expected Selector::Begin"),
1506                }
1507                let mut rs = setup_select1();
1508                rs.metadata.as_mut().unwrap().transaction = Some(mock_v1::Transaction {
1509                    id: vec![4, 5, 6],
1510                    read_timestamp: Some(prost_types::Timestamp {
1511                        seconds: 987654321,
1512                        nanos: 0,
1513                    }),
1514                    ..Default::default()
1515                });
1516                Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(rs)])))
1517            });
1518
1519        mock.expect_streaming_read()
1520            .times(1)
1521            .in_sequence(&mut seq)
1522            .returning(move |req| {
1523                let req = req.into_inner();
1524                // Second call: Should have Selector::Id using the ID returned in the first call
1525                match req.transaction.unwrap().selector.unwrap() {
1526                    mock_v1::transaction_selector::Selector::Id(id) => {
1527                        assert_eq!(id, vec![4, 5, 6]);
1528                    }
1529                    _ => panic!("Expected Selector::Id"),
1530                }
1531                Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
1532                    setup_select1(),
1533                )])))
1534            });
1535
1536        let (db_client, _server) = setup_db_client(mock).await;
1537
1538        let tx = db_client
1539            .read_only_transaction()
1540            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
1541            .build()
1542            .await?;
1543
1544        // The read timestamp is not available until the first query is executed.
1545        assert!(tx.read_timestamp().is_none());
1546
1547        for i in 0..2 {
1548            let read = ReadRequest::builder("Users", vec!["Id", "Name"])
1549                .with_keys(KeySet::all())
1550                .build();
1551            let mut rs = tx.execute_read(read).await?;
1552
1553            let row = rs.next().await.expect("Expected a row")?;
1554            assert_eq!(row.raw_values(), [Value(string_val("1"))]);
1555
1556            let result = rs.next().await;
1557            assert!(result.is_none(), "Expected None, got {result:?}");
1558
1559            if i == 0 {
1560                // Read timestamp becomes available.
1561                assert_eq!(
1562                    tx.read_timestamp()
1563                        .expect("Expected read timestamp")
1564                        .seconds(),
1565                    987654321
1566                );
1567            }
1568        }
1569
1570        Ok(())
1571    }
1572
1573    #[tokio_test_no_panics]
1574    async fn inline_begin_failure_retry_success() -> anyhow::Result<()> {
1575        use crate::value::Value;
1576        use gaxi::grpc::tonic::Status;
1577        use tonic::Response;
1578
1579        let mut mock = create_session_mock();
1580        let mut seq = mockall::Sequence::new();
1581
1582        // 1. Initial query fails
1583        mock.expect_execute_streaming_sql()
1584            .times(1)
1585            .in_sequence(&mut seq)
1586            .returning(|_| Err(Status::internal("Internal error")));
1587
1588        // 2. Explicit begin transaction succeeds
1589        mock.expect_begin_transaction()
1590            .times(1)
1591            .in_sequence(&mut seq)
1592            .returning(|req| {
1593                let req = req.into_inner();
1594                assert_eq!(
1595                    req.session,
1596                    "projects/p/instances/i/databases/d/sessions/123"
1597                );
1598                // Return a transaction with ID
1599                Ok(Response::new(mock_v1::Transaction {
1600                    id: vec![7, 8, 9],
1601                    read_timestamp: Some(prost_types::Timestamp {
1602                        seconds: 123456789,
1603                        nanos: 0,
1604                    }),
1605                    ..Default::default()
1606                }))
1607            });
1608
1609        // 3. Retry of the query succeeds
1610        mock.expect_execute_streaming_sql()
1611            .times(1)
1612            .in_sequence(&mut seq)
1613            .returning(|req| {
1614                let req = req.into_inner();
1615                // Ensure it uses the new transaction ID
1616                match req.transaction.unwrap().selector.unwrap() {
1617                    mock_v1::transaction_selector::Selector::Id(id) => {
1618                        assert_eq!(id, vec![7, 8, 9]);
1619                    }
1620                    _ => panic!("Expected Selector::Id"),
1621                }
1622                Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
1623                    setup_select1(),
1624                )])))
1625            });
1626
1627        let (db_client, _server) = setup_db_client(mock).await;
1628        let tx = db_client
1629            .read_only_transaction()
1630            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
1631            .build()
1632            .await?;
1633
1634        let mut rs = tx
1635            .execute_query(Statement::builder("SELECT 1").build())
1636            .await?;
1637
1638        let row = rs
1639            .next()
1640            .await
1641            .ok_or_else(|| anyhow::anyhow!("Expected a row but stream cleanly exhausted"))??;
1642        assert_eq!(
1643            row.raw_values(),
1644            [Value(string_val("1"))],
1645            "The parsed row value safely matched the underlying stream chunk"
1646        );
1647
1648        Ok(())
1649    }
1650
1651    #[tokio_test_no_panics]
1652    async fn inline_begin_failure_retry_failure() -> anyhow::Result<()> {
1653        use gaxi::grpc::tonic::Status;
1654        use tonic::Response;
1655
1656        let mut mock = create_session_mock();
1657        let mut seq = mockall::Sequence::new();
1658
1659        // 1. Initial query fails
1660        mock.expect_execute_streaming_sql()
1661            .times(1)
1662            .in_sequence(&mut seq)
1663            .returning(|_| Err(Status::internal("Internal error first")));
1664
1665        // 2. Explicit begin transaction succeeds
1666        mock.expect_begin_transaction()
1667            .times(1)
1668            .in_sequence(&mut seq)
1669            .returning(|_| {
1670                Ok(Response::new(mock_v1::Transaction {
1671                    id: vec![7, 8, 9],
1672                    read_timestamp: Some(prost_types::Timestamp {
1673                        seconds: 123456789,
1674                        nanos: 0,
1675                    }),
1676                    ..Default::default()
1677                }))
1678            });
1679
1680        // 3. Retry of the query fails again
1681        mock.expect_execute_streaming_sql()
1682            .times(1)
1683            .in_sequence(&mut seq)
1684            .returning(|_| Err(Status::internal("Internal error second")));
1685
1686        let (db_client, _server) = setup_db_client(mock).await;
1687        let tx = db_client
1688            .read_only_transaction()
1689            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
1690            .build()
1691            .await?;
1692
1693        let rs_result = tx
1694            .execute_query(Statement::builder("SELECT 1").build())
1695            .await;
1696
1697        assert!(
1698            rs_result.is_err(),
1699            "The failed execution bubbled upwards securely"
1700        );
1701        let err_str = rs_result.unwrap_err().to_string();
1702        assert!(
1703            err_str.contains("Internal error second"),
1704            "Secondary error message accurately propagates: {}",
1705            err_str
1706        );
1707
1708        Ok(())
1709    }
1710
1711    #[tokio_test_no_panics]
1712    async fn inline_begin_failure_fallback_rpc_fails() -> anyhow::Result<()> {
1713        use gaxi::grpc::tonic::Status;
1714
1715        let mut mock = create_session_mock();
1716        let mut seq = mockall::Sequence::new();
1717
1718        // 1. Initial query fails
1719        mock.expect_execute_streaming_sql()
1720            .times(1)
1721            .in_sequence(&mut seq)
1722            .returning(|_| Err(Status::internal("Internal error query")));
1723
1724        // 2. Explicit begin transaction fails
1725        mock.expect_begin_transaction()
1726            .times(1)
1727            .in_sequence(&mut seq)
1728            .returning(|_| Err(Status::internal("Internal error begin tx")));
1729
1730        let (db_client, _server) = setup_db_client(mock).await;
1731        let tx = db_client
1732            .read_only_transaction()
1733            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
1734            .build()
1735            .await?;
1736
1737        let rs_result = tx
1738            .execute_query(Statement::builder("SELECT 1").build())
1739            .await;
1740
1741        assert!(
1742            rs_result.is_err(),
1743            "The explicitly errored fallback boot securely propagated outwards"
1744        );
1745        let err_str = rs_result.unwrap_err().to_string();
1746        assert!(
1747            err_str.contains("Internal error begin tx"),
1748            "Natively propagated specific BeginTx bounds: {}",
1749            err_str
1750        );
1751
1752        Ok(())
1753    }
1754
1755    #[tokio_test_no_panics]
1756    async fn inline_begin_read_failure_retry_success() -> anyhow::Result<()> {
1757        use crate::key::KeySet;
1758        use crate::read::ReadRequest;
1759        use crate::value::Value;
1760        use gaxi::grpc::tonic::Status;
1761        use tonic::Response;
1762
1763        let mut mock = create_session_mock();
1764        let mut seq = mockall::Sequence::new();
1765
1766        // 1. Initial read fails
1767        mock.expect_streaming_read()
1768            .times(1)
1769            .in_sequence(&mut seq)
1770            .returning(|_| Err(Status::internal("Internal error")));
1771
1772        // 2. Explicit begin transaction succeeds
1773        mock.expect_begin_transaction()
1774            .times(1)
1775            .in_sequence(&mut seq)
1776            .returning(|_| {
1777                Ok(Response::new(mock_v1::Transaction {
1778                    id: vec![7, 8, 9],
1779                    read_timestamp: None,
1780                    ..Default::default()
1781                }))
1782            });
1783
1784        // 3. Retry of the read succeeds
1785        mock.expect_streaming_read()
1786            .times(1)
1787            .in_sequence(&mut seq)
1788            .returning(|req| {
1789                let req = req.into_inner();
1790                // Ensure it uses the new transaction ID
1791                match req.transaction.unwrap().selector.unwrap() {
1792                    mock_v1::transaction_selector::Selector::Id(id) => {
1793                        assert_eq!(id, vec![7, 8, 9]);
1794                    }
1795                    _ => panic!("Expected Selector::Id"),
1796                }
1797                Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
1798                    setup_select1(),
1799                )])))
1800            });
1801
1802        let (db_client, _server) = setup_db_client(mock).await;
1803        let tx = db_client
1804            .read_only_transaction()
1805            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
1806            .build()
1807            .await?;
1808
1809        let read = ReadRequest::builder("Users", vec!["Id", "Name"])
1810            .with_keys(KeySet::all())
1811            .build();
1812        let mut rs = tx.execute_read(read).await?;
1813
1814        let row = rs
1815            .next()
1816            .await
1817            .ok_or_else(|| anyhow::anyhow!("Expected a row uniquely returned"))??;
1818        assert_eq!(
1819            row.raw_values(),
1820            [Value(string_val("1"))],
1821            "The macro correctly unpacked read arrays seamlessly"
1822        );
1823
1824        Ok(())
1825    }
1826
1827    #[tokio_test_no_panics]
1828    async fn single_use_query_send_error_returns_immediately() -> anyhow::Result<()> {
1829        use crate::statement::Statement;
1830        use gaxi::grpc::tonic::Status;
1831
1832        let mut mock = create_session_mock();
1833
1834        mock.expect_execute_streaming_sql()
1835            .times(1)
1836            .returning(|_| Err(Status::internal("Internal error single use query")));
1837
1838        mock.expect_begin_transaction().never();
1839
1840        let (db_client, _server) = setup_db_client(mock).await;
1841        // single_use creates a Fixed selector
1842        let tx = db_client.single_use().build();
1843
1844        let rs_result = tx
1845            .execute_query(Statement::builder("SELECT 1").build())
1846            .await;
1847
1848        assert!(rs_result.is_err());
1849        let err_str = rs_result.unwrap_err().to_string();
1850        assert!(err_str.contains("Internal error single use query"));
1851
1852        Ok(())
1853    }
1854
1855    #[tokio_test_no_panics]
1856    async fn inline_begin_already_started_query_send_error_returns_immediately()
1857    -> anyhow::Result<()> {
1858        use crate::statement::Statement;
1859        use gaxi::grpc::tonic::Status;
1860        use spanner_grpc_mock::google::spanner::v1 as mock_v1;
1861
1862        let mut mock = create_session_mock();
1863        let mut seq = mockall::Sequence::new();
1864
1865        mock.expect_begin_transaction().never();
1866
1867        // 1. First query executes successfully and implicitly starts the transaction.
1868        mock.expect_execute_streaming_sql()
1869            .times(1)
1870            .in_sequence(&mut seq)
1871            .returning(move |_req| {
1872                let mut rs = setup_select1();
1873                rs.metadata.as_mut().unwrap().transaction = Some(mock_v1::Transaction {
1874                    id: vec![4, 5, 6],
1875                    read_timestamp: None,
1876                    ..Default::default()
1877                });
1878                Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(rs)])))
1879            });
1880
1881        // 2. Second query fails immediately upon send()
1882        mock.expect_execute_streaming_sql()
1883            .times(1)
1884            .in_sequence(&mut seq)
1885            .returning(|_| Err(Status::internal("Internal error second query")));
1886
1887        let (db_client, _server) = setup_db_client(mock).await;
1888
1889        let tx = db_client
1890            .read_only_transaction()
1891            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
1892            .build()
1893            .await?;
1894
1895        // Run first query (starts tx)
1896        let mut rs = tx
1897            .execute_query(Statement::builder("SELECT 1").build())
1898            .await?;
1899        let _ = rs.next().await.expect("has row")?;
1900
1901        // Run second query (fails)
1902        let rs_result = tx
1903            .execute_query(Statement::builder("SELECT 2").build())
1904            .await;
1905
1906        assert!(rs_result.is_err());
1907        let err_str = rs_result.unwrap_err().to_string();
1908        assert!(err_str.contains("Internal error second query"));
1909
1910        Ok(())
1911    }
1912
1913    #[tokio_test_no_panics]
1914    async fn execute_concurrent_queries_inline_begin() -> anyhow::Result<()> {
1915        let mut mock = create_session_mock();
1916        mock.expect_begin_transaction().never();
1917
1918        let mut seq = mockall::Sequence::new();
1919        let (tx_sender, rx_receiver) = mpsc::channel(1);
1920        let rx_receiver = Arc::new(Mutex::new(Some(rx_receiver)));
1921
1922        let task1_ready = Arc::new(Notify::new());
1923        let task1_ready_clone = Arc::clone(&task1_ready);
1924        let tasks_started = Arc::new(Barrier::new(3));
1925
1926        // 1. First query: should include Selector::Begin
1927        mock.expect_execute_streaming_sql()
1928            .times(1)
1929            .in_sequence(&mut seq)
1930            .returning(move |req| {
1931                task1_ready_clone.notify_one();
1932                let req = req.into_inner();
1933                match req.transaction.unwrap().selector.unwrap() {
1934                    Selector::Begin(_) => {}
1935                    _ => panic!("Expected Selector::Begin for first query"),
1936                }
1937                let rx = rx_receiver
1938                    .try_lock()
1939                    .expect("mutex poisoned")
1940                    .take()
1941                    .unwrap();
1942                Ok(Response::from(rx))
1943            });
1944
1945        // 2. The other queries: should include populated Selector::Id
1946        mock.expect_execute_streaming_sql()
1947            .times(2)
1948            .in_sequence(&mut seq)
1949            .returning(move |req| {
1950                let req = req.into_inner();
1951                match req.transaction.unwrap().selector.unwrap() {
1952                    Selector::Id(id) => {
1953                        assert_eq!(id, vec![4, 5, 6]);
1954                    }
1955                    _ => panic!("Expected Selector::Id for other queries"),
1956                }
1957
1958                let (tx, rx) = mpsc::channel(1);
1959                tx.try_send(Ok(setup_select1()))
1960                    .expect("send should succeed");
1961                Ok(Response::from(rx))
1962            });
1963
1964        let (db_client, _server) = setup_db_client(mock).await;
1965        let tx = db_client
1966            .read_only_transaction()
1967            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
1968            .build()
1969            .await?;
1970        let tx = Arc::new(tx);
1971
1972        // Spawn 3 concurrent queries.
1973        // Task 1 launches first and executes the first query.
1974        let tx1 = Arc::clone(&tx);
1975        let handle1 = tokio::spawn(async move {
1976            let mut rs = tx1
1977                .execute_query(Statement::builder("SELECT 1").build())
1978                .await?;
1979            // Read the first result to get the transaction ID.
1980            let _ = rs.next().await;
1981            Ok::<_, crate::Error>(rs)
1982        });
1983
1984        // Wait for Task 1 to reach the mock server.
1985        task1_ready.notified().await;
1986
1987        let tx2 = Arc::clone(&tx);
1988        let tasks_started2 = Arc::clone(&tasks_started);
1989        let handle2 = tokio::spawn(async move {
1990            tasks_started2.wait().await;
1991            tx2.execute_query(Statement::builder("SELECT 1").build())
1992                .await
1993        });
1994
1995        let tx3 = Arc::clone(&tx);
1996        let tasks_started3 = Arc::clone(&tasks_started);
1997        let handle3 = tokio::spawn(async move {
1998            tasks_started3.wait().await;
1999            tx3.execute_query(Statement::builder("SELECT 1").build())
2000                .await
2001        });
2002
2003        // Ensure both Tasks 2 and 3 have reached the barrier before proceeding.
2004        tasks_started.wait().await;
2005
2006        // Flush the scheduler on this single-threaded executor.
2007        // This guarantees that Tasks 2 & 3 run until they both hit the internal
2008        // selector Notify latch and become suspended.
2009        tokio::task::yield_now().await;
2010
2011        // Provide the first result (including the transaction ID) to Task 1.
2012        // This transitions the selector to 'Started' and unblocks Tasks 2 and 3.
2013        let mut rs = setup_select1();
2014        rs.metadata
2015            .as_mut()
2016            .expect("metadata should be present")
2017            .transaction = Some(mock_v1::Transaction {
2018            id: vec![4, 5, 6],
2019            read_timestamp: Some(prost_types::Timestamp {
2020                seconds: 987654321,
2021                nanos: 0,
2022            }),
2023            ..Default::default()
2024        });
2025        tx_sender.send(Ok(rs)).await.expect("channel broken");
2026        drop(tx_sender);
2027
2028        // Collect all results
2029        let mut rs1 = handle1.await??;
2030        let mut rs2 = handle2.await??;
2031        let mut rs3 = handle3.await??;
2032
2033        // Verify the query results
2034        assert!(rs1.next().await.is_none());
2035
2036        let row2 = rs2.next().await.expect("Expected a row")?;
2037        assert_eq!(row2.raw_values(), [Value(string_val("1"))]);
2038        assert!(rs2.next().await.is_none());
2039
2040        let row3 = rs3.next().await.expect("Expected a row")?;
2041        assert_eq!(row3.raw_values(), [Value(string_val("1"))]);
2042        assert!(rs3.next().await.is_none());
2043
2044        // Verify that the read timestamp was populated
2045        assert_eq!(
2046            tx.read_timestamp()
2047                .expect("read timestamp should be populated")
2048                .seconds(),
2049            987654321
2050        );
2051
2052        Ok(())
2053    }
2054
    // Runs three concurrent queries on one `InlineBegin` transaction where
    // starting the transaction fails: the first query's response stream yields
    // an error and the fallback BeginTransaction RPC fails as well. All three
    // tasks are expected to fail with the BeginTransaction error, and the mock
    // rejects any further ExecuteStreamingSql call.
    #[tokio_test_no_panics]
    async fn execute_concurrent_queries_inline_begin_failed_cascade() -> anyhow::Result<()> {
        let mut mock = create_session_mock();
        let mut seq = mockall::Sequence::new();

        // Channel backing the first query's response stream; the test body
        // decides when (and with what) the first chunk is delivered.
        let (tx_sender, rx_receiver) = mpsc::channel(1);
        let rx_receiver = Arc::new(Mutex::new(Some(rx_receiver)));

        let task1_ready = Arc::new(Notify::new());
        let task1_ready_clone = Arc::clone(&task1_ready);
        // Three parties wait on this barrier: tasks 2 and 3 plus the test body.
        let tasks_started = Arc::new(Barrier::new(3));

        // 1. Return a stream connected to tx_sender.
        // We will use tx_sender later in the test to inject a failed first chunk.
        mock.expect_execute_streaming_sql()
            .times(1)
            .in_sequence(&mut seq)
            .returning(move |_req| {
                // Signal the test body that task 1's request reached the mock.
                task1_ready_clone.notify_one();
                let rx = rx_receiver
                    .try_lock()
                    .expect("mutex poisoned")
                    .take()
                    .expect("receiver should be present");
                Ok(tonic::Response::from(rx))
            });

        // 2. Fallback BeginTransaction RPC fails
        mock.expect_begin_transaction()
            .times(1)
            .in_sequence(&mut seq)
            .returning(|_| {
                Err(gaxi::grpc::tonic::Status::internal(
                    "Fallback BeginTransaction failed",
                ))
            });

        // The other queries will never be executed.
        mock.expect_execute_streaming_sql().times(0).returning(|_| {
            panic!("Other queries should not launch after failure to start the transaction")
        });

        let (db_client, _server) = setup_db_client(mock).await;
        let tx = db_client
            .read_only_transaction()
            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
            .build()
            .await?;
        let tx = Arc::new(tx);

        // Spawn 3 concurrent queries.
        let tx1 = Arc::clone(&tx);
        let handle1 = tokio::spawn(async move {
            let mut rs = tx1
                .execute_query(Statement::builder("SELECT 1").build())
                .await?;
            // Reading the first row is where the injected stream error surfaces.
            rs.next().await.ok_or_else(|| {
                crate::error::internal_error("stream exhausted (this should never happen)")
            })??;
            Ok::<_, crate::Error>(rs)
        });

        // Wait for Task 1 to reach the mock and transition the selector to Starting.
        task1_ready.notified().await;

        let tx2 = Arc::clone(&tx);
        let tasks_started2 = Arc::clone(&tasks_started);
        let handle2 = tokio::spawn(async move {
            tasks_started2.wait().await;
            tx2.execute_query(Statement::builder("SELECT 1").build())
                .await
        });

        let tx3 = Arc::clone(&tx);
        let tasks_started3 = Arc::clone(&tasks_started);
        let handle3 = tokio::spawn(async move {
            tasks_started3.wait().await;
            tx3.execute_query(Statement::builder("SELECT 1").build())
                .await
        });

        // Ensure both Tasks 2 and 3 have reached the barrier before proceeding.
        tasks_started.wait().await;

        // Flush the scheduler on this single-threaded executor.
        // This guarantees that Tasks 2 & 3 run until they both hit the internal
        // selector Notify latch and become suspended.
        tokio::task::yield_now().await;

        // Send an error as the first chunk of the first query's stream, then
        // close the stream by dropping the sender.
        tx_sender
            .send(Err(gaxi::grpc::tonic::Status::internal(
                "Mocked boot failed",
            )))
            .await
            .expect("channel broken");
        drop(tx_sender);

        // Collect all results. `?` propagates a JoinError from the task; every
        // task itself is expected to return the same BeginTransaction error.
        let err1 = handle1
            .await?
            .expect_err("task 1 should have failed")
            .to_string();
        let err2 = handle2
            .await?
            .expect_err("task 2 should have failed")
            .to_string();
        let err3 = handle3
            .await?
            .expect_err("task 3 should have failed")
            .to_string();

        assert!(
            err1.contains("Fallback BeginTransaction failed"),
            "err1: {}",
            err1
        );
        assert!(
            err2.contains("Fallback BeginTransaction failed"),
            "err2: {}",
            err2
        );
        assert!(
            err3.contains("Fallback BeginTransaction failed"),
            "err3: {}",
            err3
        );

        Ok(())
    }
2185
    // Runs three concurrent queries on one `InlineBegin` transaction where the
    // first query's stream fails with a transient (`Unavailable`) error before
    // delivering a transaction id. The mock then expects that query to be
    // restarted with `Selector::Begin` again, and the two other queries to run
    // with the transaction id (`[4, 5, 6]`) returned by the restarted stream.
    // All three tasks must finish successfully; no explicit BeginTransaction
    // RPC is allowed.
    #[tokio_test_no_panics]
    async fn execute_concurrent_queries_inline_begin_stream_restart_deadlock_prevention()
    -> crate::Result<()> {
        let mut mock = create_session_mock();
        mock.expect_begin_transaction().never();

        let mut seq = mockall::Sequence::new();

        // Channel backing the first query's initial response stream; the test
        // body uses the sender to inject the transient error.
        let (tx_sender, rx_receiver) = mpsc::channel(1);
        let rx_receiver = Arc::new(Mutex::new(Some(rx_receiver)));

        let task1_ready = Arc::new(Notify::new());
        let task1_ready_clone = Arc::clone(&task1_ready);
        // Three parties wait on this barrier: tasks 2 and 3 plus the test body.
        let tasks_started = Arc::new(Barrier::new(3));

        // 1. Task 1 initial query: Return a stream connected to tx_sender for error injection.
        mock.expect_execute_streaming_sql()
            .times(1)
            .in_sequence(&mut seq)
            .returning(move |req| {
                let req = req.into_inner();
                // Signal the test body that task 1's request reached the mock.
                // The returned stream is connected to tx_sender, which the test
                // uses later to inject a transient error.
                task1_ready_clone.notify_one();
                match req
                    .transaction
                    .expect("transaction should be present")
                    .selector
                    .expect("selector should be present")
                {
                    Selector::Begin(_) => {}
                    _ => panic!("Expected Selector::Begin for first query"),
                }
                let rx = rx_receiver
                    .try_lock()
                    .expect("mutex poisoned")
                    .take()
                    .expect("receiver should be present");
                Ok(Response::from(rx))
            });

        // 2. Task 1 restart query: should include Selector::Begin, since
        // it failed with a transient error.
        mock.expect_execute_streaming_sql()
            .times(1)
            .in_sequence(&mut seq)
            .returning(move |req| {
                let req = req.into_inner();
                match req
                    .transaction
                    .expect("transaction should be present")
                    .selector
                    .expect("selector should be present")
                {
                    Selector::Begin(_) => {
                        // The restarted stream delivers the transaction id
                        // together with its single result chunk.
                        let mut rs = setup_select1();
                        rs.metadata
                            .as_mut()
                            .expect("metadata should be present")
                            .transaction = Some(mock_v1::Transaction {
                            id: vec![4, 5, 6],
                            ..Default::default()
                        });
                        let (tx, rx) = mpsc::channel(1);
                        tx.try_send(Ok(rs)).expect("send should succeed");
                        Ok(Response::from(rx))
                    }
                    _ => panic!("Expected Selector::Begin for stream restart query"),
                }
            });

        // 3. Tasks 2 & 3: should include populated Selector::Id
        mock.expect_execute_streaming_sql()
            .times(2)
            .in_sequence(&mut seq)
            .returning(move |req| {
                let req = req.into_inner();
                match req
                    .transaction
                    .expect("transaction should be present")
                    .selector
                    .expect("selector should be present")
                {
                    Selector::Id(id) => {
                        assert_eq!(id, vec![4, 5, 6]);
                        let (tx, rx) = mpsc::channel(1);
                        tx.try_send(Ok(setup_select1()))
                            .expect("send should succeed");
                        Ok(Response::from(rx))
                    }
                    _ => panic!("Expected Selector::Id for concurrent queries"),
                }
            });

        let (db_client, _server) = setup_db_client(mock).await;
        let tx = db_client
            .read_only_transaction()
            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
            .build()
            .await?;
        let tx = Arc::new(tx);

        // Task 1 starts immediately and reads its first row inside the task.
        let handle1_tx = Arc::clone(&tx);
        let handle1 = tokio::spawn(async move {
            let mut rs = handle1_tx
                .execute_query(Statement::builder("SELECT 1").build())
                .await?;
            let _ = rs.next().await.ok_or_else(|| {
                crate::error::internal_error("stream exhausted (this should never happen)")
            })??;
            Ok::<_, crate::Error>(rs)
        });

        // Wait for Task 1 to reach the mock and transition the selector to Starting.
        task1_ready.notified().await;

        // Tasks 2 and 3 wait on the barrier before issuing their queries.
        let handle2_tx = Arc::clone(&tx);
        let tasks_started2 = Arc::clone(&tasks_started);
        let handle2 = tokio::spawn(async move {
            tasks_started2.wait().await;
            let mut rs = handle2_tx
                .execute_query(Statement::builder("SELECT 1").build())
                .await?;
            let _ = rs.next().await.ok_or_else(|| {
                crate::error::internal_error("stream exhausted (this should never happen)")
            })??;
            Ok::<_, crate::Error>(rs)
        });

        let handle3_tx = Arc::clone(&tx);
        let tasks_started3 = Arc::clone(&tasks_started);
        let handle3 = tokio::spawn(async move {
            tasks_started3.wait().await;
            let mut rs = handle3_tx
                .execute_query(Statement::builder("SELECT 1").build())
                .await?;
            let _ = rs.next().await.ok_or_else(|| {
                crate::error::internal_error("stream exhausted (this should never happen)")
            })??;
            Ok::<_, crate::Error>(rs)
        });

        // Ensure both Tasks 2 and 3 have reached the barrier before proceeding.
        tasks_started.wait().await;

        // Flush the scheduler on this single-threaded executor.
        // This guarantees that Tasks 2 & 3 run until they both hit the internal
        // selector Notify latch and become suspended.
        tokio::task::yield_now().await;

        // Inject the transient error as the first chunk of task 1's initial
        // stream, then close that stream by dropping the sender.
        let grpc_status = Status::new(gaxi::grpc::tonic::Code::Unavailable, "transient error");
        tx_sender.send(Err(grpc_status)).await.expect("send failed");
        drop(tx_sender);

        // Collect and verify all results.
        // handle.await returns Result<Result<ResultSet, Error>, JoinError>.
        // `expect` panics on a JoinError (e.g. a panic in the task),
        // and the trailing `?` propagates the Spanner error.
        let mut rs1 = handle1.await.expect("Task 1 panicked")?;
        let mut rs2 = handle2.await.expect("Task 2 panicked")?;
        let mut rs3 = handle3.await.expect("Task 3 panicked")?;

        // Verify that all results have been exhausted.
        // (The tasks themselves already successfully read the first row).
        assert!(rs1.next().await.is_none(), "Stream 1 should be exhausted");
        assert!(rs2.next().await.is_none(), "Stream 2 should be exhausted");
        assert!(rs3.next().await.is_none(), "Stream 3 should be exhausted");

        Ok(())
    }
2356
2357    #[tokio_test_no_panics]
2358    async fn execute_concurrent_queries_late_arrival_failure() -> anyhow::Result<()> {
2359        let mut mock = create_session_mock();
2360        let mut seq = mockall::Sequence::new();
2361
2362        // 1. Initial query fails.
2363        mock.expect_execute_streaming_sql()
2364            .times(1)
2365            .in_sequence(&mut seq)
2366            .returning(|req| {
2367                let req = req.into_inner();
2368                match req
2369                    .transaction
2370                    .expect("transaction should be present")
2371                    .selector
2372                    .expect("selector should be present")
2373                {
2374                    Selector::Begin(_) => {}
2375                    _ => panic!("Expected Selector::Begin for first query"),
2376                }
2377                Err(Status::internal("Initial inline-begin failed"))
2378            });
2379
2380        // 2. Fallback BeginTransaction RPC also fails.
2381        mock.expect_begin_transaction()
2382            .times(1)
2383            .in_sequence(&mut seq)
2384            .returning(|_| Err(Status::internal("Fallback BeginTransaction failed")));
2385
2386        // Any further attempts would panic because we haven't mocked them.
2387        mock.expect_execute_streaming_sql().never();
2388
2389        let (db_client, _server) = setup_db_client(mock).await;
2390        let tx = db_client
2391            .read_only_transaction()
2392            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2393            .build()
2394            .await?;
2395
2396        // First query: triggers the failure and transitions the state to Failed.
2397        let err1 = tx
2398            .execute_query(Statement::builder("SELECT 1").build())
2399            .await
2400            .expect_err("First query should fail");
2401        assert!(
2402            err1.to_string()
2403                .contains("Fallback BeginTransaction failed")
2404        );
2405
2406        // Second query: starts AFTER the failure is already cached.
2407        // It should immediately return the same error without invoking the mock server.
2408        let err2 = tx
2409            .execute_query(Statement::builder("SELECT 1").build())
2410            .await
2411            .expect_err("Late query should fail immediately");
2412        assert!(
2413            err2.to_string()
2414                .contains("Fallback BeginTransaction failed")
2415        );
2416
2417        Ok(())
2418    }
2419
2420    #[tokio_test_no_panics]
2421    async fn execute_concurrent_reads_inline_begin() -> anyhow::Result<()> {
2422        use crate::key::KeySet;
2423        use crate::read::ReadRequest;
2424        let mut mock = create_session_mock();
2425        mock.expect_begin_transaction().never();
2426
2427        let mut seq = mockall::Sequence::new();
2428        let (tx_sender, rx_receiver) = mpsc::channel(1);
2429        let rx_receiver = Arc::new(Mutex::new(Some(rx_receiver)));
2430
2431        let task1_ready = Arc::new(Notify::new());
2432        let task1_ready_clone = Arc::clone(&task1_ready);
2433        let tasks_started = Arc::new(Barrier::new(3));
2434
2435        // 1. First read: should include Selector::Begin
2436        mock.expect_streaming_read()
2437            .times(1)
2438            .in_sequence(&mut seq)
2439            .returning(move |req| {
2440                task1_ready_clone.notify_one();
2441                let req = req.into_inner();
2442                match req
2443                    .transaction
2444                    .expect("transaction should be present")
2445                    .selector
2446                    .expect("selector should be present")
2447                {
2448                    mock_v1::transaction_selector::Selector::Begin(_) => {}
2449                    _ => panic!("Expected Selector::Begin for first read"),
2450                }
2451
2452                let rx = rx_receiver
2453                    .try_lock()
2454                    .expect("mutex poisoned")
2455                    .take()
2456                    .expect("receiver should be present");
2457                Ok(Response::from(rx))
2458            });
2459
2460        // 2. The other reads: should include populated Selector::Id
2461        mock.expect_streaming_read()
2462            .times(2)
2463            .in_sequence(&mut seq)
2464            .returning(move |req| {
2465                let req = req.into_inner();
2466                match req
2467                    .transaction
2468                    .expect("transaction should be present")
2469                    .selector
2470                    .expect("selector should be present")
2471                {
2472                    mock_v1::transaction_selector::Selector::Id(id) => {
2473                        assert_eq!(id, vec![4, 5, 6]);
2474                    }
2475                    _ => panic!("Expected Selector::Id for other reads"),
2476                }
2477
2478                let (tx, rx) = mpsc::channel(1);
2479                tx.try_send(Ok(setup_select1()))
2480                    .expect("send should succeed");
2481                Ok(Response::from(rx))
2482            });
2483
2484        let (db_client, _server) = setup_db_client(mock).await;
2485        let tx = db_client
2486            .read_only_transaction()
2487            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2488            .build()
2489            .await?;
2490        let tx = Arc::new(tx);
2491
2492        let read_req = ReadRequest::builder("Table", vec!["Col"])
2493            .with_keys(KeySet::all())
2494            .build();
2495
2496        // Spawn 3 concurrent reads.
2497        let tx1 = Arc::clone(&tx);
2498        let read1 = read_req.clone();
2499        let handle1 = tokio::spawn(async move {
2500            let mut rs = tx1.execute_read(read1).await?;
2501            let _ = rs.next().await;
2502            Ok::<_, crate::Error>(rs)
2503        });
2504
2505        task1_ready.notified().await;
2506
2507        let tx2 = Arc::clone(&tx);
2508        let read2 = read_req.clone();
2509        let tasks_started2 = Arc::clone(&tasks_started);
2510        let handle2 = tokio::spawn(async move {
2511            tasks_started2.wait().await;
2512            let mut rs = tx2.execute_read(read2).await?;
2513            let _ = rs.next().await;
2514            Ok::<_, crate::Error>(rs)
2515        });
2516
2517        let tx3 = Arc::clone(&tx);
2518        let read3 = read_req.clone();
2519        let tasks_started3 = Arc::clone(&tasks_started);
2520        let handle3 = tokio::spawn(async move {
2521            tasks_started3.wait().await;
2522            let mut rs = tx3.execute_read(read3).await?;
2523            let _ = rs.next().await;
2524            Ok::<_, crate::Error>(rs)
2525        });
2526
2527        tasks_started.wait().await;
2528        tokio::task::yield_now().await;
2529
2530        // Provide the transaction ID.
2531        let mut rs = setup_select1();
2532        rs.metadata
2533            .as_mut()
2534            .expect("metadata should be present")
2535            .transaction = Some(mock_v1::Transaction {
2536            id: vec![4, 5, 6],
2537            ..Default::default()
2538        });
2539        tx_sender.send(Ok(rs)).await.expect("send failed");
2540        drop(tx_sender);
2541
2542        let mut rs1 = handle1.await.expect("Task 1 panicked")?;
2543        let mut rs2 = handle2.await.expect("Task 2 panicked")?;
2544        let mut rs3 = handle3.await.expect("Task 3 panicked")?;
2545
2546        assert!(rs1.next().await.is_none());
2547        assert!(rs2.next().await.is_none());
2548        assert!(rs3.next().await.is_none());
2549
2550        Ok(())
2551    }
2552
2553    #[tokio_test_no_panics]
2554    async fn execute_inline_begin_idempotent_update() -> anyhow::Result<()> {
2555        let (db_client, _server) = setup_db_client(create_session_mock()).await;
2556        // Access internal state for unit testing.
2557        let tx = db_client
2558            .read_only_transaction()
2559            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2560            .build()
2561            .await?;
2562
2563        let id1 = bytes::Bytes::from_static(b"tx1");
2564        let id2 = bytes::Bytes::from_static(b"tx2");
2565
2566        // 1. Initial update.
2567        tx.context.transaction_selector.update(id1.clone(), None)?;
2568        assert_eq!(
2569            tx.context
2570                .transaction_selector
2571                .selector()
2572                .await?
2573                .id()
2574                .expect("ID should be present"),
2575            &id1
2576        );
2577
2578        // 2. Redundant update with same ID should succeed, as Spanner returns the
2579        // transaction ID on all statements executed within that transaction when
2580        // using multiplexed sessions.
2581        tx.context.transaction_selector.update(id1.clone(), None)?;
2582
2583        // 3. Update with DIFFERENT ID after already Started should fail.
2584        let err2 = tx
2585            .context
2586            .transaction_selector
2587            .update(id2, None)
2588            .expect_err("Update after Started should fail");
2589        assert!(err2.to_string().contains("already Started or Failed"));
2590
2591        Ok(())
2592    }
2593
2594    #[tokio_test_no_panics]
2595    async fn execute_inline_begin_with_transient_failure() -> anyhow::Result<()> {
2596        let mut mock = create_session_mock();
2597        let mut seq = mockall::Sequence::new();
2598
2599        // 1. First attempt fails transiently.
2600        mock.expect_execute_streaming_sql()
2601            .times(1)
2602            .in_sequence(&mut seq)
2603            .returning(|_| Err(Status::new(Code::Unavailable, "Transient 1")));
2604
2605        // 2. Fallback BeginTransaction succeeds.
2606        mock.expect_begin_transaction()
2607            .times(1)
2608            .in_sequence(&mut seq)
2609            .returning(|_| {
2610                Ok(Response::new(mock_v1::Transaction {
2611                    id: vec![7, 8, 9],
2612                    ..Default::default()
2613                }))
2614            });
2615
2616        // 3. The manual retry of the query (which happens after explicit begin fallback).
2617        mock.expect_execute_streaming_sql()
2618            .times(1)
2619            .in_sequence(&mut seq)
2620            .returning(|_| {
2621                let (tx, rx) = mpsc::channel(1);
2622                tx.try_send(Ok(setup_select1()))
2623                    .expect("send should succeed");
2624                Ok(Response::from(rx))
2625            });
2626
2627        let (db_client, _server) = setup_db_client(mock).await;
2628        let tx = db_client
2629            .read_only_transaction()
2630            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2631            .build()
2632            .await?;
2633
2634        let mut rs = tx
2635            .execute_query(Statement::builder("SELECT 1").build())
2636            .await?;
2637        assert!(rs.next().await.is_some());
2638        assert!(rs.next().await.is_none());
2639
2640        Ok(())
2641    }
2642
2643    #[tokio_test_no_panics]
2644    async fn leader_aware_routing_query_in_read_only() -> anyhow::Result<()> {
2645        let mut mock = create_session_mock();
2646        mock.expect_execute_streaming_sql().once().returning(|req| {
2647            assert!(
2648                req.metadata()
2649                    .get("x-goog-spanner-route-to-leader")
2650                    .is_none()
2651            );
2652            let stream = adapt([Ok(mock_v1::PartialResultSet {
2653                metadata: Some(mock_v1::ResultSetMetadata {
2654                    row_type: Some(mock_v1::StructType { fields: vec![] }),
2655                    ..Default::default()
2656                }),
2657                ..Default::default()
2658            })]);
2659            Ok(tonic::Response::from(stream))
2660        });
2661
2662        let (db_client, _server) = setup_db_client(mock).await;
2663        let tx = db_client.single_use().build();
2664        let _rs = tx
2665            .execute_query(Statement::builder("SELECT 1").build())
2666            .await?;
2667        Ok(())
2668    }
2669
2670    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2671    async fn execute_concurrent_begin_explicitly_redundancy_prevention() -> anyhow::Result<()> {
2672        let (tx_rpc, rx_rpc) = std_channel();
2673        let (tx_started, rx_started) = oneshot_channel();
2674        let tx_started_mutex = StdMutex::new(Some(tx_started));
2675
2676        let mut mock = create_session_mock();
2677        let mut seq = mockall::Sequence::new();
2678
2679        // Task 1 (leader) fires the initial query inline.
2680        mock.expect_execute_streaming_sql()
2681            .once()
2682            .in_sequence(&mut seq)
2683            .returning(move |_req| {
2684                if let Some(tx) = tx_started_mutex.lock().expect("mutex poisoned").take() {
2685                    let _ = tx.send(());
2686                }
2687                rx_rpc.recv().expect("channel broken");
2688                let (tx, rx) = mpsc::channel(1);
2689                let metadata = mock_v1::ResultSetMetadata {
2690                    transaction: Some(mock_v1::Transaction {
2691                        id: vec![42],
2692                        ..Default::default()
2693                    }),
2694                    ..Default::default()
2695                };
2696                let prs = mock_v1::PartialResultSet {
2697                    metadata: Some(metadata),
2698                    ..Default::default()
2699                };
2700                tx.try_send(Ok(prs)).expect("send should succeed");
2701                Ok(tonic::Response::new(rx))
2702            });
2703
2704        // Task 2 (follower) arrives while Task 1 is in flight, suspends until Task 1 completes,
2705        // and then successfully fires its query using the newly extracted transaction ID (vec![42]).
2706        mock.expect_execute_streaming_sql()
2707            .once()
2708            .in_sequence(&mut seq)
2709            .returning(move |req| {
2710                let req = req.into_inner();
2711                assert_eq!(
2712                    req.transaction,
2713                    Some(mock_v1::TransactionSelector {
2714                        selector: Some(mock_v1::transaction_selector::Selector::Id(vec![42])),
2715                    })
2716                );
2717                let (tx, rx) = mpsc::channel(1);
2718                let metadata = mock_v1::ResultSetMetadata {
2719                    row_type: Some(mock_v1::StructType { fields: vec![] }),
2720                    ..Default::default()
2721                };
2722                let prs = mock_v1::PartialResultSet {
2723                    metadata: Some(metadata),
2724                    ..Default::default()
2725                };
2726                tx.try_send(Ok(prs)).expect("send should succeed");
2727                Ok(tonic::Response::new(rx))
2728            });
2729
2730        let (db_client, _server) = setup_db_client(mock).await;
2731
2732        let tx = Arc::new(
2733            db_client
2734                .read_only_transaction()
2735                .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2736                .build()
2737                .await?,
2738        );
2739
2740        let tx_leader = Arc::clone(&tx);
2741        let handle_leader = tokio::spawn(async move {
2742            let mut rs = tx_leader
2743                .execute_query(Statement::builder("SELECT 1").build())
2744                .await?;
2745            let _ = rs.next().await;
2746            Ok::<_, crate::Error>(())
2747        });
2748
2749        rx_started.await.expect("oneshot broken");
2750
2751        // Now the state is Starting and the leader is blocked inside execute_streaming_sql.
2752        // Task 2 executes a concurrent query, which must wait for the leader rather than firing a redundant RPC.
2753        let tx_follower = Arc::clone(&tx);
2754        let handle_follower = tokio::spawn(async move {
2755            let mut rs = tx_follower
2756                .execute_query(Statement::builder("SELECT 2").build())
2757                .await?;
2758            let _ = rs.next().await;
2759            Ok::<_, crate::Error>(())
2760        });
2761
2762        // Unblock the leader
2763        tx_rpc.send(()).expect("send failed");
2764
2765        handle_leader.await.expect("Task 1 panicked")?;
2766        handle_follower.await.expect("Task 2 panicked")?;
2767
2768        Ok(())
2769    }
2770
2771    #[tokio_test_no_panics]
2772    async fn execute_multi_query_redundant_transaction_id_explicit() -> anyhow::Result<()> {
2773        run_execute_multi_query_redundant_transaction_id(BeginTransactionOption::ExplicitBegin)
2774            .await
2775    }
2776
2777    #[tokio_test_no_panics]
2778    async fn execute_multi_query_redundant_transaction_id_inline() -> anyhow::Result<()> {
2779        run_execute_multi_query_redundant_transaction_id(BeginTransactionOption::InlineBegin).await
2780    }
2781
2782    async fn run_execute_multi_query_redundant_transaction_id(
2783        option: BeginTransactionOption,
2784    ) -> anyhow::Result<()> {
2785        let mut mock = create_session_mock();
2786        let mut sequence = mockall::Sequence::new();
2787
2788        if option == BeginTransactionOption::ExplicitBegin {
2789            mock.expect_begin_transaction()
2790                .once()
2791                .in_sequence(&mut sequence)
2792                .returning(|req| {
2793                    let req = req.into_inner();
2794                    assert_eq!(
2795                        req.session,
2796                        "projects/p/instances/i/databases/d/sessions/123"
2797                    );
2798                    Ok(tonic::Response::new(mock_v1::Transaction {
2799                        id: vec![4, 5, 6],
2800                        read_timestamp: Some(prost_types::Timestamp {
2801                            seconds: 123456789,
2802                            nanos: 0,
2803                        }),
2804                        ..Default::default()
2805                    }))
2806                });
2807
2808            mock.expect_execute_streaming_sql()
2809                .times(2)
2810                .returning(|req| {
2811                    let req = req.into_inner();
2812                    assert_eq!(
2813                        req.transaction
2814                            .expect("transaction should be present")
2815                            .selector
2816                            .expect("selector should be present"),
2817                        mock_v1::transaction_selector::Selector::Id(vec![4, 5, 6])
2818                    );
2819
2820                    let mut result_set_partial = setup_select1();
2821                    result_set_partial
2822                        .metadata
2823                        .as_mut()
2824                        .expect("metadata should be present")
2825                        .transaction = Some(mock_v1::Transaction {
2826                        id: vec![4, 5, 6],
2827                        read_timestamp: Some(prost_types::Timestamp {
2828                            seconds: 123456789,
2829                            nanos: 0,
2830                        }),
2831                        ..Default::default()
2832                    });
2833                    Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
2834                        result_set_partial,
2835                    )])))
2836                });
2837        } else {
2838            mock.expect_begin_transaction().never();
2839
2840            mock.expect_execute_streaming_sql()
2841                .times(1)
2842                .in_sequence(&mut sequence)
2843                .returning(|req| {
2844                    let req = req.into_inner();
2845                    assert_eq!(
2846                        req.session,
2847                        "projects/p/instances/i/databases/d/sessions/123"
2848                    );
2849
2850                    match req
2851                        .transaction
2852                        .expect("transaction should be present")
2853                        .selector
2854                        .expect("selector should be present")
2855                    {
2856                        mock_v1::transaction_selector::Selector::Begin(_) => {}
2857                        _ => panic!("Expected Selector::Begin"),
2858                    }
2859                    let mut result_set_partial = setup_select1();
2860                    result_set_partial
2861                        .metadata
2862                        .as_mut()
2863                        .expect("metadata should be present")
2864                        .transaction = Some(mock_v1::Transaction {
2865                        id: vec![4, 5, 6],
2866                        read_timestamp: Some(prost_types::Timestamp {
2867                            seconds: 987654321,
2868                            nanos: 0,
2869                        }),
2870                        ..Default::default()
2871                    });
2872                    Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
2873                        result_set_partial,
2874                    )])))
2875                });
2876
2877            mock.expect_execute_streaming_sql()
2878                .times(1)
2879                .in_sequence(&mut sequence)
2880                .returning(|req| {
2881                    let req = req.into_inner();
2882                    match req
2883                        .transaction
2884                        .expect("transaction should be present")
2885                        .selector
2886                        .expect("selector should be present")
2887                    {
2888                        mock_v1::transaction_selector::Selector::Id(id) => {
2889                            assert_eq!(id, vec![4, 5, 6]);
2890                        }
2891                        _ => panic!("Expected Selector::Id"),
2892                    }
2893                    let mut result_set_partial = setup_select1();
2894                    result_set_partial
2895                        .metadata
2896                        .as_mut()
2897                        .expect("metadata should be present")
2898                        .transaction = Some(mock_v1::Transaction {
2899                        id: vec![4, 5, 6],
2900                        read_timestamp: Some(prost_types::Timestamp {
2901                            seconds: 987654321,
2902                            nanos: 0,
2903                        }),
2904                        ..Default::default()
2905                    });
2906                    Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
2907                        result_set_partial,
2908                    )])))
2909                });
2910        }
2911
2912        let (db_client, _server) = setup_db_client(mock).await;
2913
2914        let transaction = db_client
2915            .read_only_transaction()
2916            .with_begin_transaction_option(option)
2917            .build()
2918            .await
2919            .expect("Failed to start transaction");
2920
2921        for _ in 0..2 {
2922            let mut result_set = transaction
2923                .execute_query(Statement::builder("SELECT 1").build())
2924                .await
2925                .expect("Failed to execute query");
2926
2927            let row = result_set
2928                .next()
2929                .await
2930                .expect("has row")
2931                .expect("has valid row");
2932            assert_eq!(row.raw_values(), [Value(string_val("1"))]);
2933
2934            let next_result = result_set.next().await;
2935            assert!(next_result.is_none(), "expected None, got {next_result:?}");
2936        }
2937
2938        Ok(())
2939    }
2940
2941    #[tokio_test_no_panics]
2942    async fn read_only_transaction_begin_with_never_retry() -> anyhow::Result<()> {
2943        let mut mock = MockSpanner::new();
2944        let mut sequence = mockall::Sequence::new();
2945
2946        mock.expect_begin_transaction()
2947            .once()
2948            .in_sequence(&mut sequence)
2949            .returning(|_| Err(tonic::Status::unavailable("transient error")));
2950
2951        mock.expect_create_session().returning(|_| {
2952            Ok(Response::new(mock_v1::Session {
2953                name: "session".to_string(),
2954                multiplexed: true,
2955                ..Default::default()
2956            }))
2957        });
2958
2959        let (db_client, _server) = setup_db_client(mock).await;
2960
2961        let res = db_client
2962            .read_only_transaction()
2963            .with_begin_transaction_option(BeginTransactionOption::ExplicitBegin)
2964            .with_begin_retry_policy(NeverRetry)
2965            .build()
2966            .await;
2967
2968        assert!(res.is_err(), "should fail immediately without retry");
2969        let err = res.unwrap_err();
2970        assert_eq!(err.status().expect("status").code, GaxCode::Unavailable);
2971
2972        Ok(())
2973    }
2974
2975    #[tokio_test_no_panics]
2976    async fn read_only_transaction_lazy_begin_fallback_never_retry() -> anyhow::Result<()> {
2977        let mut mock = MockSpanner::new();
2978        let mut sequence = mockall::Sequence::new();
2979
2980        // 1. First query execution fails with Unavailable (transient error)
2981        mock.expect_execute_streaming_sql()
2982            .once()
2983            .in_sequence(&mut sequence)
2984            .returning(|_| Err(tonic::Status::unavailable("transient error")));
2985
2986        // 2. Fallback explicit BeginTransaction is executed exactly once and fails
2987        mock.expect_begin_transaction()
2988            .once()
2989            .in_sequence(&mut sequence)
2990            .returning(|_| Err(tonic::Status::unavailable("transient error")));
2991
2992        mock.expect_create_session().returning(|_| {
2993            Ok(Response::new(mock_v1::Session {
2994                name: "session".to_string(),
2995                multiplexed: true,
2996                ..Default::default()
2997            }))
2998        });
2999
3000        let (db_client, _server) = setup_db_client(mock).await;
3001
3002        let transaction = db_client
3003            .read_only_transaction()
3004            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
3005            .with_begin_retry_policy(NeverRetry)
3006            .build()
3007            .await?;
3008
3009        let stmt = Statement::builder("SELECT 1").build();
3010        let res = transaction.execute_query(stmt).await;
3011
3012        assert!(
3013            res.is_err(),
3014            "should fail immediately during fallback without retrying the fallback RPC"
3015        );
3016        let err = res.unwrap_err();
3017        assert_eq!(err.status().expect("status").code, GaxCode::Unavailable);
3018
3019        Ok(())
3020    }
3021
3022    #[tokio_test_no_panics]
3023    async fn read_only_transaction_begin_with_attempt_timeout() -> anyhow::Result<()> {
3024        let mut mock = MockSpanner::new();
3025        let mut sequence = mockall::Sequence::new();
3026
3027        mock.expect_begin_transaction()
3028            .once()
3029            .in_sequence(&mut sequence)
3030            .withf(|req| {
3031                let timeout_header = req.metadata().get("grpc-timeout");
3032                assert!(
3033                    timeout_header.is_some(),
3034                    "grpc-timeout header should be present"
3035                );
3036                let val = timeout_header.unwrap().to_str().unwrap();
3037                assert!(
3038                    val.contains("5000") || val.contains("5"),
3039                    "timeout header value '{}' should represent 5 seconds",
3040                    val
3041                );
3042                true
3043            })
3044            .returning(|_| {
3045                Ok(Response::new(mock_v1::Transaction {
3046                    id: vec![42],
3047                    ..Default::default()
3048                }))
3049            });
3050
3051        mock.expect_create_session().returning(|_| {
3052            Ok(Response::new(mock_v1::Session {
3053                name: "session".to_string(),
3054                multiplexed: true,
3055                ..Default::default()
3056            }))
3057        });
3058
3059        let (db_client, _server) = setup_db_client(mock).await;
3060
3061        let _transaction = db_client
3062            .read_only_transaction()
3063            .with_begin_transaction_option(BeginTransactionOption::ExplicitBegin)
3064            .with_begin_attempt_timeout(std::time::Duration::from_secs(5))
3065            .build()
3066            .await?;
3067
3068        Ok(())
3069    }
3070
3071    #[tokio_test_no_panics]
3072    async fn read_only_transaction_builder_sets_gax_options() -> anyhow::Result<()> {
3073        let mut mock = MockSpanner::new();
3074        mock.expect_create_session().returning(|_| {
3075            Ok(Response::new(mock_v1::Session {
3076                name: "session".to_string(),
3077                multiplexed: true,
3078                ..Default::default()
3079            }))
3080        });
3081        let (db_client, _server) = setup_db_client(mock).await;
3082
3083        let builder = db_client
3084            .read_only_transaction()
3085            .with_begin_attempt_timeout(Duration::from_secs(5))
3086            .with_begin_retry_policy(NeverRetry)
3087            .with_begin_backoff_policy(ExponentialBackoff::default());
3088
3089        let gax = builder
3090            .begin_gax_options
3091            .as_ref()
3092            .expect("begin_gax_options missing");
3093        assert_eq!(*gax.attempt_timeout(), Some(Duration::from_secs(5)));
3094        assert!(gax.retry_policy().is_some());
3095        assert!(gax.backoff_policy().is_some());
3096
3097        Ok(())
3098    }
3099
3100    #[tokio_test_no_panics]
3101    async fn read_only_transaction_lazy_begin_fallback_uses_statement_options_when_unconfigured()
3102    -> anyhow::Result<()> {
3103        let mut mock = MockSpanner::new();
3104        let mut sequence = mockall::Sequence::new();
3105
3106        // 1. First query execution fails with Unavailable (transient error)
3107        mock.expect_execute_streaming_sql()
3108            .once()
3109            .in_sequence(&mut sequence)
3110            .returning(|_| Err(tonic::Status::unavailable("transient error")));
3111
3112        // 2. Fallback explicit BeginTransaction is executed. Since the transaction itself has no
3113        // custom options, it must inherit the statement options, which set attempt_timeout to 5 seconds.
3114        mock.expect_begin_transaction()
3115            .once()
3116            .in_sequence(&mut sequence)
3117            .withf(|req| {
3118                let timeout_header = req.metadata().get("grpc-timeout");
3119                assert!(
3120                    timeout_header.is_some(),
3121                    "grpc-timeout header should be present"
3122                );
3123                let val = timeout_header.unwrap().to_str().unwrap();
3124                assert!(
3125                    val.contains("5000") || val.contains("5"),
3126                    "timeout header value '{}' should represent 5 seconds",
3127                    val
3128                );
3129                true
3130            })
3131            .returning(|_| {
3132                Ok(Response::new(mock_v1::Transaction {
3133                    id: vec![42],
3134                    ..Default::default()
3135                }))
3136            });
3137
3138        // 3. Query is retried with the successfully obtained transaction ID, succeeding this time
3139        mock.expect_execute_streaming_sql()
3140            .once()
3141            .in_sequence(&mut sequence)
3142            .withf(|req| {
3143                matches!(
3144                    req.get_ref()
3145                        .transaction
3146                        .as_ref()
3147                        .and_then(|t| t.selector.as_ref()),
3148                    Some(mock_v1::transaction_selector::Selector::Id(id)) if id == &vec![42]
3149                )
3150            })
3151            .returning(|_| {
3152                let mut result_set_partial = setup_select1();
3153                result_set_partial
3154                    .metadata
3155                    .as_mut()
3156                    .expect("metadata should be present")
3157                    .transaction = Some(mock_v1::Transaction {
3158                    id: vec![42],
3159                    ..Default::default()
3160                });
3161                Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
3162                    result_set_partial,
3163                )])))
3164            });
3165
3166        mock.expect_create_session().returning(|_| {
3167            Ok(Response::new(mock_v1::Session {
3168                name: "session".to_string(),
3169                multiplexed: true,
3170                ..Default::default()
3171            }))
3172        });
3173
3174        let (db_client, _server) = setup_db_client(mock).await;
3175
3176        let transaction = db_client
3177            .read_only_transaction()
3178            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
3179            .build()
3180            .await?;
3181
3182        let mut stmt_opts = crate::RequestOptions::default();
3183        stmt_opts.set_attempt_timeout(Duration::from_secs(5));
3184        let stmt = Statement::builder("SELECT 1")
3185            .build()
3186            .with_gax_options(stmt_opts);
3187
3188        let mut rs = transaction.execute_query(stmt).await?;
3189        let row = rs.next().await.expect("has row")?;
3190        assert_eq!(row.raw_values(), [Value(string_val("1"))]);
3191
3192        Ok(())
3193    }
3194
3195    #[tokio_test_no_panics]
3196    async fn read_only_transaction_lazy_begin_fallback_merges_custom_options() -> anyhow::Result<()>
3197    {
3198        let mut mock = MockSpanner::new();
3199        let mut sequence = mockall::Sequence::new();
3200
3201        // 1. First query execution fails with Unavailable (transient error)
3202        mock.expect_execute_streaming_sql()
3203            .once()
3204            .in_sequence(&mut sequence)
3205            .returning(|_| Err(tonic::Status::unavailable("transient error")));
3206
3207        // 2. Fallback explicit BeginTransaction must have BOTH:
3208        // - attempt_timeout of 5 seconds (inherited from statement's options)
3209        // - retry_policy of NeverRetry (inherited from transaction's begin options)
3210        // If it did not merge correctly, the timeout header would be missing, or it would retry.
3211        mock.expect_begin_transaction()
3212            .once()
3213            .in_sequence(&mut sequence)
3214            .withf(|req| {
3215                let timeout_header = req.metadata().get("grpc-timeout");
3216                assert!(
3217                    timeout_header.is_some(),
3218                    "grpc-timeout header should be present"
3219                );
3220                let val = timeout_header.unwrap().to_str().unwrap();
3221                assert!(
3222                    val.contains("5000") || val.contains("5"),
3223                    "timeout header value '{}' should represent 5 seconds",
3224                    val
3225                );
3226                true
3227            })
3228            .returning(|_| Err(tonic::Status::unavailable("transient error")));
3229
3230        mock.expect_create_session().returning(|_| {
3231            Ok(Response::new(mock_v1::Session {
3232                name: "session".to_string(),
3233                multiplexed: true,
3234                ..Default::default()
3235            }))
3236        });
3237
3238        let (db_client, _server) = setup_db_client(mock).await;
3239
3240        let transaction = db_client
3241            .read_only_transaction()
3242            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
3243            .with_begin_retry_policy(NeverRetry)
3244            .build()
3245            .await?;
3246
3247        let mut stmt_opts = crate::RequestOptions::default();
3248        stmt_opts.set_attempt_timeout(Duration::from_secs(5));
3249        let stmt = Statement::builder("SELECT 1")
3250            .build()
3251            .with_gax_options(stmt_opts);
3252
3253        let res = transaction.execute_query(stmt).await;
3254
3255        assert!(
3256            res.is_err(),
3257            "should fail immediately because of NeverRetry"
3258        );
3259        let err = res.unwrap_err();
3260        assert_eq!(err.status().expect("status").code, GaxCode::Unavailable);
3261
3262        Ok(())
3263    }
3264
3265    #[test]
3266    fn test_merge_request_options() {
3267        // Case 1: Destination has values, source is empty (Destination preserved)
3268        let mut dest = crate::RequestOptions::default();
3269        dest.set_attempt_timeout(Duration::from_secs(2));
3270        dest.set_retry_policy(NeverRetry);
3271
3272        // Source is None (Destination preserved)
3273        let merged = merge_request_options(dest, None);
3274
3275        assert_eq!(*merged.attempt_timeout(), Some(Duration::from_secs(2)));
3276        assert!(merged.retry_policy().is_some());
3277
3278        // Case 2: Source has overriding values, destination is empty (Source overrides)
3279        let dest = crate::RequestOptions::default();
3280
3281        let mut source = crate::RequestOptions::default();
3282        source.set_attempt_timeout(Duration::from_secs(5));
3283        source.set_retry_policy(NeverRetry);
3284
3285        let merged = merge_request_options(dest, Some(&source));
3286
3287        assert_eq!(*merged.attempt_timeout(), Some(Duration::from_secs(5)));
3288        assert!(merged.retry_policy().is_some());
3289
3290        // Case 3: Both have distinct custom headers (Headers must merge/combine)
3291        let mut dest = crate::RequestOptions::default();
3292        let mut dest_headers = HeaderMap::new();
3293        dest_headers.insert(
3294            HeaderName::from_static("x-goog-spanner-route-to-leader"),
3295            HeaderValue::from_static("true"),
3296        );
3297        dest = dest.insert_extension(dest_headers);
3298
3299        let mut source = crate::RequestOptions::default();
3300        let mut src_headers = HeaderMap::new();
3301        src_headers.insert(
3302            HeaderName::from_static("x-custom-header"),
3303            HeaderValue::from_static("custom-value"),
3304        );
3305        source = source.insert_extension(src_headers);
3306
3307        let merged = merge_request_options(dest, Some(&source));
3308        let merged_headers = merged
3309            .get_extension::<HeaderMap>()
3310            .expect("HeaderMap missing");
3311
3312        assert_eq!(
3313            merged_headers
3314                .get("x-goog-spanner-route-to-leader")
3315                .unwrap()
3316                .to_str()
3317                .unwrap(),
3318            "true"
3319        );
3320        assert_eq!(
3321            merged_headers
3322                .get("x-custom-header")
3323                .unwrap()
3324                .to_str()
3325                .unwrap(),
3326            "custom-value"
3327        );
3328    }
3329}