Skip to main content

google_cloud_spanner/
batch_read_only_transaction.rs

1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::database_client::DatabaseClient;
16use crate::model::PartitionOptions;
17use crate::precommit::PrecommitTokenTracker;
18use crate::read_only_transaction::{
19    BeginTransactionOption, MultiUseReadOnlyTransaction, MultiUseReadOnlyTransactionBuilder,
20    ReadContextTransactionSelector,
21};
22use crate::result_set::{ResultSet, ResultSetParams, StreamOperation};
23use crate::statement::Statement;
24use crate::timestamp_bound::TimestampBound;
25use google_cloud_gax::backoff_policy::BackoffPolicyArg;
26use google_cloud_gax::options::RequestOptions as GaxRequestOptions;
27use google_cloud_gax::retry_policy::RetryPolicyArg;
28use serde::{Deserialize, Serialize};
29use std::time::Duration;
30
31/// A builder for [BatchReadOnlyTransaction].
32///
33/// # Example
34/// ```
35/// # use google_cloud_spanner::client::Spanner;
36/// # use google_cloud_spanner::transaction::TimestampBound;
37/// # async fn build_tx(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
38/// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
39/// let read_only_transaction = db_client.batch_read_only_transaction()
40///     .set_timestamp_bound(TimestampBound::strong())
41///     .build()
42///     .await?;
43/// # Ok(())
44/// # }
45/// ```
46pub struct BatchReadOnlyTransactionBuilder {
47    inner: MultiUseReadOnlyTransactionBuilder,
48}
49
50impl BatchReadOnlyTransactionBuilder {
51    pub(crate) fn new(client: DatabaseClient) -> Self {
52        Self {
53            inner: MultiUseReadOnlyTransactionBuilder::new(client)
54                .with_begin_transaction_option(BeginTransactionOption::ExplicitBegin),
55        }
56    }
57
58    /// Sets the timestamp bound for the read-only transaction.
59    ///
60    /// # Example
61    /// ```
62    /// # use google_cloud_spanner::client::Spanner;
63    /// # use google_cloud_spanner::transaction::TimestampBound;
64    /// # async fn set_bound(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
65    /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
66    /// let builder = db_client.batch_read_only_transaction().set_timestamp_bound(TimestampBound::strong());
67    /// # Ok(())
68    /// # }
69    /// ```
70    pub fn set_timestamp_bound(self, bound: TimestampBound) -> Self {
71        Self {
72            inner: self.inner.set_timestamp_bound(bound),
73        }
74    }
75
76    /// Builds the [BatchReadOnlyTransaction] and starts the transaction
77    /// by calling the `BeginTransaction` RPC.
78    ///
79    /// # Example
80    /// ```
81    /// # use google_cloud_spanner::client::Spanner;
82    /// # async fn build(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
83    /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
84    /// let transaction = db_client.batch_read_only_transaction().build().await?;
85    /// # Ok(())
86    /// # }
87    /// ```
88    pub async fn build(self) -> crate::Result<BatchReadOnlyTransaction> {
89        let inner = self.inner.build().await?;
90        Ok(BatchReadOnlyTransaction { inner })
91    }
92}
93
94/// A read-only transaction that can be used to partition reads and queries
95/// and execute these in parallel across multiple workers.
96///
97/// # Example
98/// ```
99/// # use google_cloud_spanner::client::Spanner;
100/// # use google_cloud_spanner::statement::Statement;
101/// # use google_cloud_spanner::model::PartitionOptions;
102///
103/// # async fn run(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
104/// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
105/// let transaction = db_client.batch_read_only_transaction().build().await?;
106/// let stmt = Statement::builder("SELECT * FROM users WHERE id = @id")
107///     .add_param("id", &42)
108///     .build();
109/// let options = PartitionOptions::default()
110///     .set_max_partitions(10);
111/// let partitions = transaction.partition_query(stmt, options).await?;
112///
113/// // partitions can be sent to other workers for parallel execution
114/// # Ok(())
115/// # }
116/// ```
117#[derive(Debug)]
118pub struct BatchReadOnlyTransaction {
119    inner: MultiUseReadOnlyTransaction,
120}
121
122impl BatchReadOnlyTransaction {
123    /// Returns the read timestamp chosen for the transaction.
124    pub fn read_timestamp(&self) -> Option<wkt::Timestamp> {
125        self.inner.read_timestamp()
126    }
127
128    /// Creates a set of partitions that can be used to execute a query in parallel.
129    ///
130    /// # Example
131    /// ```
132    /// # use google_cloud_spanner::client::Spanner;
133    /// # use google_cloud_spanner::statement::Statement;
134    /// # use google_cloud_spanner::model::PartitionOptions;
135    /// # async fn run(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
136    /// let db = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
137    /// let transaction = db.batch_read_only_transaction().build().await?;
138    ///
139    /// let stmt = Statement::builder("SELECT * FROM users WHERE id = @id")
140    ///     .add_param("id", &42)
141    ///     .build();
142    /// let options = PartitionOptions::default()
143    ///     .set_max_partitions(10);
144    /// let partitions = transaction.partition_query(stmt, options).await?;
145    /// # Ok(())
146    /// # }
147    /// ```
148    pub async fn partition_query<T: Into<Statement>>(
149        &self,
150        statement: T,
151        options: PartitionOptions,
152    ) -> crate::Result<Vec<Partition>> {
153        let selector = self.inner.context.transaction_selector.selector().await?;
154        let statement = statement.into();
155        let request = statement
156            .clone()
157            .into_partition_query_request()
158            .set_session(self.inner.context.session_name.clone())
159            .set_transaction(selector.clone())
160            .set_partition_options(options);
161
162        let response = self
163            .inner
164            .context
165            .client
166            .spanner
167            .partition_query(
168                request,
169                crate::RequestOptions::default(),
170                self.inner.context.channel_hint,
171            )
172            .await?;
173
174        Ok(response
175            .partitions
176            .into_iter()
177            .map(|p| {
178                let mut req = statement.clone().into_request();
179                req.session = self.inner.context.session_name.clone();
180                req.transaction = Some(selector.clone());
181                req.partition_token = p.partition_token;
182
183                Partition {
184                    inner: PartitionedOperation::Query(req),
185                    gax_options: GaxRequestOptions::default(),
186                }
187            })
188            .collect())
189    }
190
191    /// Creates a set of partitions that can be used to execute a read in parallel.
192    ///
193    /// # Example
194    /// ```
195    /// # use google_cloud_spanner::client::Spanner;
196    /// # use google_cloud_spanner::key::KeySet;
197    /// # use google_cloud_spanner::read::ReadRequest;
198    /// # use google_cloud_spanner::model::PartitionOptions;
199    /// # async fn run(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
200    /// let db = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
201    /// let transaction = db.batch_read_only_transaction().build().await?;
202    ///
203    /// let read = ReadRequest::builder("users", vec!["id".to_string(), "name".to_string()])
204    ///     .with_keys(KeySet::all())
205    ///     .build();
206    /// let options = PartitionOptions::default()
207    ///     .set_max_partitions(10);
208    /// let partitions = transaction.partition_read(read, options).await?;
209    /// # Ok(())
210    /// # }
211    /// ```
212    pub async fn partition_read<T: Into<crate::read::ReadRequest>>(
213        &self,
214        read: T,
215        options: PartitionOptions,
216    ) -> crate::Result<Vec<Partition>> {
217        let selector = self.inner.context.transaction_selector.selector().await?;
218        let read = read.into();
219        let request = read
220            .clone()
221            .into_partition_read_request()
222            .set_session(self.inner.context.session_name.clone())
223            .set_transaction(selector.clone())
224            .set_partition_options(options);
225
226        let response = self
227            .inner
228            .context
229            .client
230            .spanner
231            .partition_read(
232                request,
233                crate::RequestOptions::default(),
234                self.inner.context.channel_hint,
235            )
236            .await?;
237
238        Ok(response
239            .partitions
240            .into_iter()
241            .map(|p| {
242                let mut req = read.clone().into_request();
243                req.session = self.inner.context.session_name.clone();
244                req.transaction = Some(selector.clone());
245                req.partition_token = p.partition_token;
246
247                Partition {
248                    inner: PartitionedOperation::Read(req),
249                    gax_options: GaxRequestOptions::default(),
250                }
251            })
252            .collect())
253    }
254}
255
256/// Defines the segments of data to be read in a partitioned read or query.
257/// These partitions can be serialized and processed across several
258/// different machines or processes.
259#[derive(Clone, Debug, Serialize, Deserialize)]
260pub struct Partition {
261    pub(crate) inner: PartitionedOperation,
262    #[serde(skip)]
263    pub(crate) gax_options: GaxRequestOptions,
264}
265
266impl Partition {
267    /// Sets whether Data Boost is enabled for this partition.
268    ///
269    /// # Example
270    /// ```
271    /// # use google_cloud_spanner::client::Spanner;
272    /// # use google_cloud_spanner::statement::Statement;
273    /// # use google_cloud_spanner::model::PartitionOptions;
274    /// # async fn run_query(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
275    /// # let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
276    /// # let transaction = db_client.batch_read_only_transaction().build().await?;
277    /// # let partitions = transaction.partition_query(Statement::builder("SELECT * FROM Users").build(), PartitionOptions::default()).await?;
278    /// // On a worker receiving a partition, execute it with Data Boost:
279    /// let mut result_set = partitions[0].clone()
280    ///     .set_data_boost(true)
281    ///     .execute(&db_client)
282    ///     .await?;
283    /// # Ok(())
284    /// # }
285    /// ```
286    pub fn set_data_boost(mut self, enabled: bool) -> Self {
287        match &mut self.inner {
288            PartitionedOperation::Query(req) => req.data_boost_enabled = enabled,
289            PartitionedOperation::Read(req) => req.data_boost_enabled = enabled,
290        }
291        self
292    }
293
294    /// Sets the per-attempt timeout for this partition execution.
295    ///
296    /// **Note:** This field is **not serialized**. Each host that executes a partition must set its own attempt timeout.
297    pub fn with_attempt_timeout(mut self, timeout: Duration) -> Self {
298        self.gax_options.set_attempt_timeout(timeout);
299        self
300    }
301
302    /// Sets the retry policy for this partition execution.
303    ///
304    /// **Note:** This field is **not serialized**. Each host that executes a partition must set its own retry policy.
305    pub fn with_retry_policy(mut self, policy: impl Into<RetryPolicyArg>) -> Self {
306        self.gax_options.set_retry_policy(policy);
307        self
308    }
309
310    /// Sets the backoff policy for this partition execution.
311    ///
312    /// **Note:** This field is **not serialized**. Each host that executes a partition must set its own backoff policy.
313    pub fn with_backoff_policy(mut self, policy: impl Into<BackoffPolicyArg>) -> Self {
314        self.gax_options.set_backoff_policy(policy);
315        self
316    }
317
318    /// Executes this partition and returns a [ResultSet] that
319    /// contains the rows that belong to this partition.
320    ///
321    /// # Example: executing a query partition
322    /// ```
323    /// # use google_cloud_spanner::client::Spanner;
324    /// # use google_cloud_spanner::statement::Statement;
325    /// # use google_cloud_spanner::model::PartitionOptions;
326    /// # async fn run_query(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
327    /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
328    /// let transaction = db_client.batch_read_only_transaction().build().await?;
329    /// let partitions = transaction.partition_query(
330    ///     Statement::builder("SELECT * FROM Users").build(),
331    ///     PartitionOptions::default()
332    /// ).await?;
333    ///
334    /// // ... send partitions to other workers ...
335    ///
336    /// // On a worker receiving a partition, execute it:
337    /// let mut result_set = partitions[0].execute(&db_client).await?;
338    /// while let Some(row) = result_set.next().await.transpose()? {
339    ///     // process row
340    /// }
341    /// # Ok(())
342    /// # }
343    /// ```
344    /// # Example: executing a read partition
345    /// ```
346    /// # use google_cloud_spanner::client::Spanner;
347    /// # use google_cloud_spanner::key::KeySet;
348    /// # use google_cloud_spanner::read::ReadRequest;
349    /// # use google_cloud_spanner::model::PartitionOptions;
350    /// # async fn run_read(spanner: Spanner) -> Result<(), google_cloud_spanner::Error> {
351    /// let db_client = spanner.database_client("projects/p/instances/i/databases/d").build().await?;
352    /// let transaction = db_client.batch_read_only_transaction().build().await?;
353    /// let req = ReadRequest::builder("Users", vec!["Id", "Name"]).with_keys(KeySet::all()).build();
354    /// let partitions = transaction.partition_read(req, PartitionOptions::default()).await?;
355    ///
356    /// // ... send partitions to other workers ...
357    ///
358    /// // On a worker receiving a partition, execute it:
359    /// let mut result_set = partitions[0].execute(&db_client).await?;
360    /// while let Some(row) = result_set.next().await.transpose()? {
361    ///     // process row
362    /// }
363    /// # Ok(())
364    /// # }
365    /// ```
366    ///
367    /// A partition can be executed by any `DatabaseClient` that is connected to
368    /// the database that the partitions belong to.
369    pub async fn execute(&self, client: &DatabaseClient) -> crate::Result<ResultSet> {
370        match &self.inner {
371            PartitionedOperation::Query(req) => {
372                Self::execute_query(client, req, self.gax_options.clone()).await
373            }
374            PartitionedOperation::Read(req) => {
375                Self::execute_read(client, req, self.gax_options.clone()).await
376            }
377        }
378    }
379
380    async fn execute_query(
381        client: &DatabaseClient,
382        req: &crate::model::ExecuteSqlRequest,
383        gax_options: GaxRequestOptions,
384    ) -> crate::Result<ResultSet> {
385        let channel_hint = client.spanner.next_channel_hint();
386        let stream = client
387            .spanner
388            .execute_streaming_sql(req.clone(), gax_options.clone(), channel_hint)
389            .send()
390            .await?;
391
392        ResultSet::create(ResultSetParams {
393            stream,
394            transaction_selector: Some(ReadContextTransactionSelector::Fixed(
395                req.transaction
396                    .clone()
397                    .expect("transaction must be set in partition request"),
398                None,
399            )),
400            precommit_token_tracker: PrecommitTokenTracker::new_noop(),
401            client: client.clone(),
402            session_name: req.session.clone(),
403            transaction_tag: None,
404            operation: StreamOperation::Query(req.clone()),
405            channel_hint,
406            gax_options,
407        })
408        .await
409    }
410
411    async fn execute_read(
412        client: &DatabaseClient,
413        req: &crate::model::ReadRequest,
414        gax_options: GaxRequestOptions,
415    ) -> crate::Result<ResultSet> {
416        let channel_hint = client.spanner.next_channel_hint();
417        let stream = client
418            .spanner
419            .streaming_read(req.clone(), gax_options.clone(), channel_hint)
420            .send()
421            .await?;
422
423        ResultSet::create(ResultSetParams {
424            stream,
425            transaction_selector: Some(ReadContextTransactionSelector::Fixed(
426                req.transaction
427                    .clone()
428                    .expect("transaction must be set in partition request"),
429                None,
430            )),
431            precommit_token_tracker: PrecommitTokenTracker::new_noop(),
432            client: client.clone(),
433            session_name: req.session.clone(),
434            transaction_tag: None,
435            operation: StreamOperation::Read(req.clone()),
436            channel_hint,
437            gax_options,
438        })
439        .await
440    }
441}
442
443#[derive(Clone, Debug, Serialize, Deserialize)]
444pub(crate) enum PartitionedOperation {
445    Query(crate::model::ExecuteSqlRequest),
446    Read(crate::model::ReadRequest),
447}
448
449#[cfg(test)]
450pub(crate) mod tests {
451    use super::*;
452    use crate::key::KeySet;
453    use crate::model::transaction_selector::Selector;
454    use crate::model::{ExecuteSqlRequest, ReadRequest as GrpcReadRequest, TransactionSelector};
455    use crate::read::ReadRequest as SpannerReadRequest;
456    use crate::read_only_transaction::tests::{create_session_mock, setup_db_client};
457    use crate::statement::Statement;
458    use crate::transaction::TimestampBound;
459    use gaxi::grpc::tonic::Response;
460    use google_cloud_test_macros::tokio_test_no_panics;
461    use prost_types::Timestamp;
462    use spanner_grpc_mock::google::spanner::v1::{
463        PartialResultSet, Partition as MockPartition, PartitionResponse, ResultSetMetadata,
464        StructType, Transaction,
465    };
466    use static_assertions::assert_impl_all;
467    use std::fmt::Debug;
468
469    #[test]
470    fn auto_traits() {
471        assert_impl_all!(BatchReadOnlyTransactionBuilder: Send, Sync);
472        assert_impl_all!(BatchReadOnlyTransaction: Send, Sync, Debug);
473        assert_impl_all!(Partition: Send, Sync, Debug);
474    }
475
476    #[test]
477    fn serialize_partition_skips_gax_options() -> anyhow::Result<()> {
478        use std::time::Duration;
479
480        let req = crate::model::ExecuteSqlRequest::new()
481            .set_sql("SELECT 1")
482            .set_partition_token(b"token".to_vec());
483
484        let mut gax_options = GaxRequestOptions::default();
485        gax_options.set_attempt_timeout(Duration::from_secs(5));
486        let partition = Partition {
487            inner: PartitionedOperation::Query(req),
488            gax_options,
489        };
490
491        let serialized = serde_json::to_string(&partition)?;
492        let deserialized: Partition = serde_json::from_str(&serialized)?;
493
494        // Verify that gax_options was NOT preserved (it uses default, which is None timeout)
495        assert_eq!(*deserialized.gax_options.attempt_timeout(), None);
496
497        Ok(())
498    }
499
500    fn setup_select1() -> PartialResultSet {
501        PartialResultSet {
502            metadata: Some(ResultSetMetadata {
503                row_type: Some(StructType {
504                    fields: vec![Default::default()],
505                }),
506                ..Default::default()
507            }),
508            values: vec![prost_types::Value {
509                kind: Some(prost_types::value::Kind::StringValue("1".to_string())),
510            }],
511            last: true,
512            ..Default::default()
513        }
514    }
515
516    #[tokio_test_no_panics]
517    async fn partition_execute_respects_options() -> anyhow::Result<()> {
518        use gaxi::grpc::tonic::Response;
519        use std::time::Duration;
520
521        let mut mock = create_session_mock();
522
523        mock.expect_execute_streaming_sql().once().returning(|req| {
524            let timeout = req.metadata().get("grpc-timeout");
525            assert!(timeout.is_some(), "Missing grpc-timeout header");
526            assert_eq!(timeout.unwrap(), "5000000u"); // 5 seconds in micros
527
528            Ok(Response::from(crate::result_set::tests::adapt([Ok(
529                setup_select1(),
530            )])))
531        });
532
533        let (db_client, _server) = setup_db_client(mock).await;
534
535        let req = crate::model::ExecuteSqlRequest::new()
536            .set_session("projects/p/instances/i/databases/d/sessions/123")
537            .set_transaction(crate::model::TransactionSelector {
538                selector: Some(Selector::Id(b"tx_id_1".to_vec().into())),
539                ..Default::default()
540            })
541            .set_sql("SELECT 1")
542            .set_partition_token(b"token".to_vec());
543
544        let partition = Partition {
545            inner: PartitionedOperation::Query(req),
546            gax_options: GaxRequestOptions::default(),
547        };
548
549        let partition = partition.with_attempt_timeout(Duration::from_secs(5));
550
551        let _result_set = partition.execute(&db_client).await?;
552
553        Ok(())
554    }
555
556    #[test]
557    fn serialize_partition_query() -> anyhow::Result<()> {
558        let req = crate::model::ExecuteSqlRequest::new()
559            .set_session("projects/p/instances/i/databases/d/sessions/123")
560            .set_transaction(crate::model::TransactionSelector {
561                selector: Some(crate::model::transaction_selector::Selector::Id(
562                    b"tx_id_1".to_vec().into(),
563                )),
564                ..Default::default()
565            })
566            .set_sql("SELECT * FROM Users")
567            .set_partition_token(b"partition_token_123".to_vec());
568
569        let partition = Partition {
570            inner: PartitionedOperation::Query(req),
571            gax_options: GaxRequestOptions::default(),
572        };
573
574        let serialized = serde_json::to_string(&partition)?;
575        let deserialized: Partition = serde_json::from_str(&serialized)?;
576
577        match &deserialized.inner {
578            PartitionedOperation::Query(r) => {
579                assert_eq!(r.partition_token.as_ref(), b"partition_token_123");
580                assert_eq!(r.sql, "SELECT * FROM Users");
581                assert_eq!(r.session, "projects/p/instances/i/databases/d/sessions/123");
582            }
583            _ => panic!("Expected Query partition"),
584        }
585        Ok(())
586    }
587
588    #[test]
589    fn serialize_partition_read() -> anyhow::Result<()> {
590        let req = crate::model::ReadRequest::new()
591            .set_session("projects/p/instances/i/databases/d/sessions/456")
592            .set_transaction(crate::model::TransactionSelector {
593                selector: Some(crate::model::transaction_selector::Selector::Id(
594                    b"tx_id_2".to_vec().into(),
595                )),
596                ..Default::default()
597            })
598            .set_table("Users")
599            .set_columns(vec!["Id"])
600            .set_partition_token(b"partition_token_456".to_vec());
601
602        let partition = Partition {
603            inner: PartitionedOperation::Read(req),
604            gax_options: GaxRequestOptions::default(),
605        };
606
607        let serialized = serde_json::to_string(&partition)?;
608        let deserialized: Partition = serde_json::from_str(&serialized)?;
609
610        match &deserialized.inner {
611            PartitionedOperation::Read(r) => {
612                assert_eq!(r.partition_token.as_ref(), b"partition_token_456");
613                assert_eq!(r.table, "Users");
614                assert_eq!(r.session, "projects/p/instances/i/databases/d/sessions/456");
615            }
616            _ => panic!("Expected Read partition"),
617        }
618        Ok(())
619    }
620
621    #[tokio_test_no_panics]
622    async fn execute_query() -> anyhow::Result<()> {
623        let mut mock = create_session_mock();
624
625        mock.expect_execute_streaming_sql().once().returning(|req| {
626            let req = req.into_inner();
627            // Verify the partition details were properly stamped onto the request
628            assert_eq!(
629                req.session,
630                "projects/p/instances/i/databases/d/sessions/123"
631            );
632            assert_eq!(req.partition_token, b"partition_token_123".as_slice());
633            assert!(req.transaction.is_some());
634            assert_eq!(req.sql, "SELECT * FROM Users");
635
636            Ok(Response::from(crate::result_set::tests::adapt([Ok(
637                setup_select1(),
638            )])))
639        });
640
641        let (db_client, _server) = setup_db_client(mock).await;
642
643        let req = crate::model::ExecuteSqlRequest::new()
644            .set_session("projects/p/instances/i/databases/d/sessions/123")
645            .set_transaction(crate::model::TransactionSelector {
646                selector: Some(crate::model::transaction_selector::Selector::Id(
647                    b"tx_id_1".to_vec().into(),
648                )),
649                ..Default::default()
650            })
651            .set_sql("SELECT * FROM Users")
652            .set_partition_token(b"partition_token_123".to_vec());
653
654        let partition = Partition {
655            inner: PartitionedOperation::Query(req),
656            gax_options: GaxRequestOptions::default(),
657        };
658
659        let _result_set = partition.execute(&db_client).await?;
660
661        Ok(())
662    }
663
664    #[tokio_test_no_panics]
665    async fn execute_read() -> anyhow::Result<()> {
666        let mut mock = create_session_mock();
667
668        mock.expect_streaming_read().once().returning(|req| {
669            let req = req.into_inner();
670            // Verify the partition details were properly stamped onto the request
671            assert_eq!(
672                req.session,
673                "projects/p/instances/i/databases/d/sessions/456"
674            );
675            assert_eq!(req.partition_token, b"partition_token_456".as_slice());
676            assert!(req.transaction.is_some());
677            assert_eq!(req.table, "Users");
678
679            Ok(Response::from(crate::result_set::tests::adapt([Ok(
680                setup_select1(),
681            )])))
682        });
683
684        let (db_client, _server) = setup_db_client(mock).await;
685
686        let req = crate::model::ReadRequest::new()
687            .set_session("projects/p/instances/i/databases/d/sessions/456")
688            .set_transaction(crate::model::TransactionSelector {
689                selector: Some(crate::model::transaction_selector::Selector::Id(
690                    b"tx_id_2".to_vec().into(),
691                )),
692                ..Default::default()
693            })
694            .set_table("Users")
695            .set_columns(vec!["Id"])
696            .set_partition_token(b"partition_token_456".to_vec());
697
698        let partition = Partition {
699            inner: PartitionedOperation::Read(req),
700            gax_options: GaxRequestOptions::default(),
701        };
702
703        let _result_set = partition.execute(&db_client).await?;
704
705        Ok(())
706    }
707
708    #[tokio_test_no_panics]
709    async fn partition_query() -> anyhow::Result<()> {
710        let mut mock = create_session_mock();
711
712        mock.expect_begin_transaction().once().returning(|req| {
713            let req = req.into_inner();
714            assert_eq!(
715                req.session,
716                "projects/p/instances/i/databases/d/sessions/123"
717            );
718            Ok(Response::new(Transaction {
719                id: vec![1, 2, 3],
720                read_timestamp: Some(Timestamp {
721                    seconds: 123456789,
722                    nanos: 0,
723                }),
724                ..Default::default()
725            }))
726        });
727
728        mock.expect_partition_query().once().returning(|req| {
729            let req = req.into_inner();
730            assert_eq!(
731                req.session,
732                "projects/p/instances/i/databases/d/sessions/123"
733            );
734            assert_eq!(req.sql, "SELECT 1");
735            Ok(Response::new(PartitionResponse {
736                partitions: vec![
737                    MockPartition {
738                        partition_token: vec![10],
739                    },
740                    MockPartition {
741                        partition_token: vec![20],
742                    },
743                ],
744                transaction: None,
745            }))
746        });
747
748        let (db_client, _server) = setup_db_client(mock).await;
749
750        let tx = db_client
751            .batch_read_only_transaction()
752            .set_timestamp_bound(TimestampBound::strong())
753            .build()
754            .await?;
755
756        let ts = tx.read_timestamp().expect("Missing read timestamp");
757        assert_eq!(ts.seconds(), 123456789);
758        assert_eq!(ts.nanos(), 0);
759
760        let partitions = tx
761            .partition_query(
762                Statement::builder("SELECT 1").build(),
763                PartitionOptions::default(),
764            )
765            .await?;
766
767        assert_eq!(partitions.len(), 2);
768
769        match &partitions[0].inner {
770            PartitionedOperation::Query(req) => {
771                assert_eq!(req.partition_token.as_ref(), &[10]);
772                assert_eq!(req.sql, "SELECT 1");
773            }
774            _ => panic!("Expected Query partition"),
775        }
776        Ok(())
777    }
778
779    #[tokio_test_no_panics]
780    async fn partition_read() -> anyhow::Result<()> {
781        let mut mock = create_session_mock();
782
783        mock.expect_begin_transaction().once().returning(|req| {
784            let req = req.into_inner();
785            assert_eq!(
786                req.session,
787                "projects/p/instances/i/databases/d/sessions/123"
788            );
789            Ok(Response::new(Transaction {
790                id: vec![1, 2, 3],
791                read_timestamp: Some(Timestamp {
792                    seconds: 123456789,
793                    nanos: 0,
794                }),
795                ..Default::default()
796            }))
797        });
798
799        mock.expect_partition_read().once().returning(|req| {
800            let req = req.into_inner();
801            assert_eq!(
802                req.session,
803                "projects/p/instances/i/databases/d/sessions/123"
804            );
805            assert_eq!(req.table, "Users");
806            Ok(Response::new(PartitionResponse {
807                partitions: vec![MockPartition {
808                    partition_token: vec![30],
809                }],
810                transaction: None,
811            }))
812        });
813
814        let (db_client, _server) = setup_db_client(mock).await;
815
816        let transaction = db_client.batch_read_only_transaction().build().await?;
817
818        let read = SpannerReadRequest::builder("Users", vec!["Id", "Name"])
819            .with_keys(KeySet::all())
820            .build();
821        let partitions = transaction
822            .partition_read(read, PartitionOptions::default())
823            .await?;
824
825        assert_eq!(partitions.len(), 1);
826
827        match &partitions[0].inner {
828            PartitionedOperation::Read(req) => {
829                assert_eq!(req.partition_token.as_ref(), &[30]);
830                assert_eq!(req.table, "Users");
831            }
832            _ => panic!("Expected Read partition"),
833        }
834        Ok(())
835    }
836
837    #[tokio_test_no_panics]
838    async fn execute_query_with_data_boost() -> anyhow::Result<()> {
839        let mut mock = create_session_mock();
840
841        mock.expect_execute_streaming_sql().once().returning(|req| {
842            let req = req.into_inner();
843            assert!(req.data_boost_enabled, "data_boost_enabled should be true");
844            Ok(Response::from(crate::result_set::tests::adapt([Ok(
845                setup_select1(),
846            )])))
847        });
848
849        let (db_client, _server) = setup_db_client(mock).await;
850
851        let req = ExecuteSqlRequest::new()
852            .set_session("projects/p/instances/i/databases/d/sessions/123")
853            .set_transaction(TransactionSelector {
854                selector: Some(Selector::Id(b"tx_id_1".to_vec().into())),
855                ..Default::default()
856            })
857            .set_sql("SELECT * FROM Users")
858            .set_partition_token(b"partition_token_123".to_vec());
859
860        let partition = Partition {
861            inner: PartitionedOperation::Query(req),
862            gax_options: GaxRequestOptions::default(),
863        };
864
865        let _result_set = partition.set_data_boost(true).execute(&db_client).await?;
866
867        Ok(())
868    }
869
870    #[tokio_test_no_panics]
871    async fn execute_read_with_data_boost() -> anyhow::Result<()> {
872        let mut mock = create_session_mock();
873
874        mock.expect_streaming_read().once().returning(|req| {
875            let req = req.into_inner();
876            assert!(req.data_boost_enabled, "data_boost_enabled should be true");
877            Ok(Response::from(crate::result_set::tests::adapt([Ok(
878                setup_select1(),
879            )])))
880        });
881
882        let (db_client, _server) = setup_db_client(mock).await;
883
884        let req = GrpcReadRequest::new()
885            .set_session("projects/p/instances/i/databases/d/sessions/123")
886            .set_transaction(TransactionSelector {
887                selector: Some(Selector::Id(b"tx_id_2".to_vec().into())),
888                ..Default::default()
889            })
890            .set_table("Users")
891            .set_columns(vec!["Id".to_string(), "Name".to_string()])
892            .set_partition_token(b"partition_token_456".to_vec());
893
894        let partition = Partition {
895            inner: PartitionedOperation::Read(req),
896            gax_options: GaxRequestOptions::default(),
897        };
898
899        let _result_set = partition.set_data_boost(true).execute(&db_client).await?;
900
901        Ok(())
902    }
903}