1use crate::database_client::DatabaseClient;
16use crate::model::PartitionOptions;
17use crate::precommit::PrecommitTokenTracker;
18use crate::read_only_transaction::{
19 BeginTransactionOption, MultiUseReadOnlyTransaction, MultiUseReadOnlyTransactionBuilder,
20 ReadContextTransactionSelector,
21};
22use crate::result_set::{ResultSet, ResultSetParams, StreamOperation};
23use crate::statement::Statement;
24use crate::timestamp_bound::TimestampBound;
25use google_cloud_gax::backoff_policy::BackoffPolicyArg;
26use google_cloud_gax::options::RequestOptions as GaxRequestOptions;
27use google_cloud_gax::retry_policy::RetryPolicyArg;
28use serde::{Deserialize, Serialize};
29use std::time::Duration;
30
31pub struct BatchReadOnlyTransactionBuilder {
47 inner: MultiUseReadOnlyTransactionBuilder,
48}
49
50impl BatchReadOnlyTransactionBuilder {
51 pub(crate) fn new(client: DatabaseClient) -> Self {
52 Self {
53 inner: MultiUseReadOnlyTransactionBuilder::new(client)
54 .with_begin_transaction_option(BeginTransactionOption::ExplicitBegin),
55 }
56 }
57
58 pub fn set_timestamp_bound(self, bound: TimestampBound) -> Self {
71 Self {
72 inner: self.inner.set_timestamp_bound(bound),
73 }
74 }
75
76 pub async fn build(self) -> crate::Result<BatchReadOnlyTransaction> {
89 let inner = self.inner.build().await?;
90 Ok(BatchReadOnlyTransaction { inner })
91 }
92}
93
94#[derive(Debug)]
118pub struct BatchReadOnlyTransaction {
119 inner: MultiUseReadOnlyTransaction,
120}
121
122impl BatchReadOnlyTransaction {
123 pub fn read_timestamp(&self) -> Option<wkt::Timestamp> {
125 self.inner.read_timestamp()
126 }
127
128 pub async fn partition_query<T: Into<Statement>>(
149 &self,
150 statement: T,
151 options: PartitionOptions,
152 ) -> crate::Result<Vec<Partition>> {
153 let selector = self.inner.context.transaction_selector.selector().await?;
154 let statement = statement.into();
155 let request = statement
156 .clone()
157 .into_partition_query_request()
158 .set_session(self.inner.context.session_name.clone())
159 .set_transaction(selector.clone())
160 .set_partition_options(options);
161
162 let response = self
163 .inner
164 .context
165 .client
166 .spanner
167 .partition_query(
168 request,
169 crate::RequestOptions::default(),
170 self.inner.context.channel_hint,
171 )
172 .await?;
173
174 Ok(response
175 .partitions
176 .into_iter()
177 .map(|p| {
178 let mut req = statement.clone().into_request();
179 req.session = self.inner.context.session_name.clone();
180 req.transaction = Some(selector.clone());
181 req.partition_token = p.partition_token;
182
183 Partition {
184 inner: PartitionedOperation::Query(req),
185 gax_options: GaxRequestOptions::default(),
186 }
187 })
188 .collect())
189 }
190
191 pub async fn partition_read<T: Into<crate::read::ReadRequest>>(
213 &self,
214 read: T,
215 options: PartitionOptions,
216 ) -> crate::Result<Vec<Partition>> {
217 let selector = self.inner.context.transaction_selector.selector().await?;
218 let read = read.into();
219 let request = read
220 .clone()
221 .into_partition_read_request()
222 .set_session(self.inner.context.session_name.clone())
223 .set_transaction(selector.clone())
224 .set_partition_options(options);
225
226 let response = self
227 .inner
228 .context
229 .client
230 .spanner
231 .partition_read(
232 request,
233 crate::RequestOptions::default(),
234 self.inner.context.channel_hint,
235 )
236 .await?;
237
238 Ok(response
239 .partitions
240 .into_iter()
241 .map(|p| {
242 let mut req = read.clone().into_request();
243 req.session = self.inner.context.session_name.clone();
244 req.transaction = Some(selector.clone());
245 req.partition_token = p.partition_token;
246
247 Partition {
248 inner: PartitionedOperation::Read(req),
249 gax_options: GaxRequestOptions::default(),
250 }
251 })
252 .collect())
253 }
254}
255
256#[derive(Clone, Debug, Serialize, Deserialize)]
260pub struct Partition {
261 pub(crate) inner: PartitionedOperation,
262 #[serde(skip)]
263 pub(crate) gax_options: GaxRequestOptions,
264}
265
266impl Partition {
267 pub fn set_data_boost(mut self, enabled: bool) -> Self {
287 match &mut self.inner {
288 PartitionedOperation::Query(req) => req.data_boost_enabled = enabled,
289 PartitionedOperation::Read(req) => req.data_boost_enabled = enabled,
290 }
291 self
292 }
293
294 pub fn with_attempt_timeout(mut self, timeout: Duration) -> Self {
298 self.gax_options.set_attempt_timeout(timeout);
299 self
300 }
301
302 pub fn with_retry_policy(mut self, policy: impl Into<RetryPolicyArg>) -> Self {
306 self.gax_options.set_retry_policy(policy);
307 self
308 }
309
310 pub fn with_backoff_policy(mut self, policy: impl Into<BackoffPolicyArg>) -> Self {
314 self.gax_options.set_backoff_policy(policy);
315 self
316 }
317
318 pub async fn execute(&self, client: &DatabaseClient) -> crate::Result<ResultSet> {
370 match &self.inner {
371 PartitionedOperation::Query(req) => {
372 Self::execute_query(client, req, self.gax_options.clone()).await
373 }
374 PartitionedOperation::Read(req) => {
375 Self::execute_read(client, req, self.gax_options.clone()).await
376 }
377 }
378 }
379
380 async fn execute_query(
381 client: &DatabaseClient,
382 req: &crate::model::ExecuteSqlRequest,
383 gax_options: GaxRequestOptions,
384 ) -> crate::Result<ResultSet> {
385 let channel_hint = client.spanner.next_channel_hint();
386 let stream = client
387 .spanner
388 .execute_streaming_sql(req.clone(), gax_options.clone(), channel_hint)
389 .send()
390 .await?;
391
392 ResultSet::create(ResultSetParams {
393 stream,
394 transaction_selector: Some(ReadContextTransactionSelector::Fixed(
395 req.transaction
396 .clone()
397 .expect("transaction must be set in partition request"),
398 None,
399 )),
400 precommit_token_tracker: PrecommitTokenTracker::new_noop(),
401 client: client.clone(),
402 session_name: req.session.clone(),
403 transaction_tag: None,
404 operation: StreamOperation::Query(req.clone()),
405 channel_hint,
406 gax_options,
407 })
408 .await
409 }
410
411 async fn execute_read(
412 client: &DatabaseClient,
413 req: &crate::model::ReadRequest,
414 gax_options: GaxRequestOptions,
415 ) -> crate::Result<ResultSet> {
416 let channel_hint = client.spanner.next_channel_hint();
417 let stream = client
418 .spanner
419 .streaming_read(req.clone(), gax_options.clone(), channel_hint)
420 .send()
421 .await?;
422
423 ResultSet::create(ResultSetParams {
424 stream,
425 transaction_selector: Some(ReadContextTransactionSelector::Fixed(
426 req.transaction
427 .clone()
428 .expect("transaction must be set in partition request"),
429 None,
430 )),
431 precommit_token_tracker: PrecommitTokenTracker::new_noop(),
432 client: client.clone(),
433 session_name: req.session.clone(),
434 transaction_tag: None,
435 operation: StreamOperation::Read(req.clone()),
436 channel_hint,
437 gax_options,
438 })
439 .await
440 }
441}
442
443#[derive(Clone, Debug, Serialize, Deserialize)]
444pub(crate) enum PartitionedOperation {
445 Query(crate::model::ExecuteSqlRequest),
446 Read(crate::model::ReadRequest),
447}
448
449#[cfg(test)]
450pub(crate) mod tests {
451 use super::*;
452 use crate::key::KeySet;
453 use crate::model::transaction_selector::Selector;
454 use crate::model::{ExecuteSqlRequest, ReadRequest as GrpcReadRequest, TransactionSelector};
455 use crate::read::ReadRequest as SpannerReadRequest;
456 use crate::read_only_transaction::tests::{create_session_mock, setup_db_client};
457 use crate::statement::Statement;
458 use crate::transaction::TimestampBound;
459 use gaxi::grpc::tonic::Response;
460 use google_cloud_test_macros::tokio_test_no_panics;
461 use prost_types::Timestamp;
462 use spanner_grpc_mock::google::spanner::v1::{
463 PartialResultSet, Partition as MockPartition, PartitionResponse, ResultSetMetadata,
464 StructType, Transaction,
465 };
466 use static_assertions::assert_impl_all;
467 use std::fmt::Debug;
468
469 #[test]
470 fn auto_traits() {
471 assert_impl_all!(BatchReadOnlyTransactionBuilder: Send, Sync);
472 assert_impl_all!(BatchReadOnlyTransaction: Send, Sync, Debug);
473 assert_impl_all!(Partition: Send, Sync, Debug);
474 }
475
476 #[test]
477 fn serialize_partition_skips_gax_options() -> anyhow::Result<()> {
478 use std::time::Duration;
479
480 let req = crate::model::ExecuteSqlRequest::new()
481 .set_sql("SELECT 1")
482 .set_partition_token(b"token".to_vec());
483
484 let mut gax_options = GaxRequestOptions::default();
485 gax_options.set_attempt_timeout(Duration::from_secs(5));
486 let partition = Partition {
487 inner: PartitionedOperation::Query(req),
488 gax_options,
489 };
490
491 let serialized = serde_json::to_string(&partition)?;
492 let deserialized: Partition = serde_json::from_str(&serialized)?;
493
494 assert_eq!(*deserialized.gax_options.attempt_timeout(), None);
496
497 Ok(())
498 }
499
500 fn setup_select1() -> PartialResultSet {
501 PartialResultSet {
502 metadata: Some(ResultSetMetadata {
503 row_type: Some(StructType {
504 fields: vec![Default::default()],
505 }),
506 ..Default::default()
507 }),
508 values: vec![prost_types::Value {
509 kind: Some(prost_types::value::Kind::StringValue("1".to_string())),
510 }],
511 last: true,
512 ..Default::default()
513 }
514 }
515
516 #[tokio_test_no_panics]
517 async fn partition_execute_respects_options() -> anyhow::Result<()> {
518 use gaxi::grpc::tonic::Response;
519 use std::time::Duration;
520
521 let mut mock = create_session_mock();
522
523 mock.expect_execute_streaming_sql().once().returning(|req| {
524 let timeout = req.metadata().get("grpc-timeout");
525 assert!(timeout.is_some(), "Missing grpc-timeout header");
526 assert_eq!(timeout.unwrap(), "5000000u"); Ok(Response::from(crate::result_set::tests::adapt([Ok(
529 setup_select1(),
530 )])))
531 });
532
533 let (db_client, _server) = setup_db_client(mock).await;
534
535 let req = crate::model::ExecuteSqlRequest::new()
536 .set_session("projects/p/instances/i/databases/d/sessions/123")
537 .set_transaction(crate::model::TransactionSelector {
538 selector: Some(Selector::Id(b"tx_id_1".to_vec().into())),
539 ..Default::default()
540 })
541 .set_sql("SELECT 1")
542 .set_partition_token(b"token".to_vec());
543
544 let partition = Partition {
545 inner: PartitionedOperation::Query(req),
546 gax_options: GaxRequestOptions::default(),
547 };
548
549 let partition = partition.with_attempt_timeout(Duration::from_secs(5));
550
551 let _result_set = partition.execute(&db_client).await?;
552
553 Ok(())
554 }
555
556 #[test]
557 fn serialize_partition_query() -> anyhow::Result<()> {
558 let req = crate::model::ExecuteSqlRequest::new()
559 .set_session("projects/p/instances/i/databases/d/sessions/123")
560 .set_transaction(crate::model::TransactionSelector {
561 selector: Some(crate::model::transaction_selector::Selector::Id(
562 b"tx_id_1".to_vec().into(),
563 )),
564 ..Default::default()
565 })
566 .set_sql("SELECT * FROM Users")
567 .set_partition_token(b"partition_token_123".to_vec());
568
569 let partition = Partition {
570 inner: PartitionedOperation::Query(req),
571 gax_options: GaxRequestOptions::default(),
572 };
573
574 let serialized = serde_json::to_string(&partition)?;
575 let deserialized: Partition = serde_json::from_str(&serialized)?;
576
577 match &deserialized.inner {
578 PartitionedOperation::Query(r) => {
579 assert_eq!(r.partition_token.as_ref(), b"partition_token_123");
580 assert_eq!(r.sql, "SELECT * FROM Users");
581 assert_eq!(r.session, "projects/p/instances/i/databases/d/sessions/123");
582 }
583 _ => panic!("Expected Query partition"),
584 }
585 Ok(())
586 }
587
588 #[test]
589 fn serialize_partition_read() -> anyhow::Result<()> {
590 let req = crate::model::ReadRequest::new()
591 .set_session("projects/p/instances/i/databases/d/sessions/456")
592 .set_transaction(crate::model::TransactionSelector {
593 selector: Some(crate::model::transaction_selector::Selector::Id(
594 b"tx_id_2".to_vec().into(),
595 )),
596 ..Default::default()
597 })
598 .set_table("Users")
599 .set_columns(vec!["Id"])
600 .set_partition_token(b"partition_token_456".to_vec());
601
602 let partition = Partition {
603 inner: PartitionedOperation::Read(req),
604 gax_options: GaxRequestOptions::default(),
605 };
606
607 let serialized = serde_json::to_string(&partition)?;
608 let deserialized: Partition = serde_json::from_str(&serialized)?;
609
610 match &deserialized.inner {
611 PartitionedOperation::Read(r) => {
612 assert_eq!(r.partition_token.as_ref(), b"partition_token_456");
613 assert_eq!(r.table, "Users");
614 assert_eq!(r.session, "projects/p/instances/i/databases/d/sessions/456");
615 }
616 _ => panic!("Expected Read partition"),
617 }
618 Ok(())
619 }
620
621 #[tokio_test_no_panics]
622 async fn execute_query() -> anyhow::Result<()> {
623 let mut mock = create_session_mock();
624
625 mock.expect_execute_streaming_sql().once().returning(|req| {
626 let req = req.into_inner();
627 assert_eq!(
629 req.session,
630 "projects/p/instances/i/databases/d/sessions/123"
631 );
632 assert_eq!(req.partition_token, b"partition_token_123".as_slice());
633 assert!(req.transaction.is_some());
634 assert_eq!(req.sql, "SELECT * FROM Users");
635
636 Ok(Response::from(crate::result_set::tests::adapt([Ok(
637 setup_select1(),
638 )])))
639 });
640
641 let (db_client, _server) = setup_db_client(mock).await;
642
643 let req = crate::model::ExecuteSqlRequest::new()
644 .set_session("projects/p/instances/i/databases/d/sessions/123")
645 .set_transaction(crate::model::TransactionSelector {
646 selector: Some(crate::model::transaction_selector::Selector::Id(
647 b"tx_id_1".to_vec().into(),
648 )),
649 ..Default::default()
650 })
651 .set_sql("SELECT * FROM Users")
652 .set_partition_token(b"partition_token_123".to_vec());
653
654 let partition = Partition {
655 inner: PartitionedOperation::Query(req),
656 gax_options: GaxRequestOptions::default(),
657 };
658
659 let _result_set = partition.execute(&db_client).await?;
660
661 Ok(())
662 }
663
664 #[tokio_test_no_panics]
665 async fn execute_read() -> anyhow::Result<()> {
666 let mut mock = create_session_mock();
667
668 mock.expect_streaming_read().once().returning(|req| {
669 let req = req.into_inner();
670 assert_eq!(
672 req.session,
673 "projects/p/instances/i/databases/d/sessions/456"
674 );
675 assert_eq!(req.partition_token, b"partition_token_456".as_slice());
676 assert!(req.transaction.is_some());
677 assert_eq!(req.table, "Users");
678
679 Ok(Response::from(crate::result_set::tests::adapt([Ok(
680 setup_select1(),
681 )])))
682 });
683
684 let (db_client, _server) = setup_db_client(mock).await;
685
686 let req = crate::model::ReadRequest::new()
687 .set_session("projects/p/instances/i/databases/d/sessions/456")
688 .set_transaction(crate::model::TransactionSelector {
689 selector: Some(crate::model::transaction_selector::Selector::Id(
690 b"tx_id_2".to_vec().into(),
691 )),
692 ..Default::default()
693 })
694 .set_table("Users")
695 .set_columns(vec!["Id"])
696 .set_partition_token(b"partition_token_456".to_vec());
697
698 let partition = Partition {
699 inner: PartitionedOperation::Read(req),
700 gax_options: GaxRequestOptions::default(),
701 };
702
703 let _result_set = partition.execute(&db_client).await?;
704
705 Ok(())
706 }
707
708 #[tokio_test_no_panics]
709 async fn partition_query() -> anyhow::Result<()> {
710 let mut mock = create_session_mock();
711
712 mock.expect_begin_transaction().once().returning(|req| {
713 let req = req.into_inner();
714 assert_eq!(
715 req.session,
716 "projects/p/instances/i/databases/d/sessions/123"
717 );
718 Ok(Response::new(Transaction {
719 id: vec![1, 2, 3],
720 read_timestamp: Some(Timestamp {
721 seconds: 123456789,
722 nanos: 0,
723 }),
724 ..Default::default()
725 }))
726 });
727
728 mock.expect_partition_query().once().returning(|req| {
729 let req = req.into_inner();
730 assert_eq!(
731 req.session,
732 "projects/p/instances/i/databases/d/sessions/123"
733 );
734 assert_eq!(req.sql, "SELECT 1");
735 Ok(Response::new(PartitionResponse {
736 partitions: vec![
737 MockPartition {
738 partition_token: vec![10],
739 },
740 MockPartition {
741 partition_token: vec![20],
742 },
743 ],
744 transaction: None,
745 }))
746 });
747
748 let (db_client, _server) = setup_db_client(mock).await;
749
750 let tx = db_client
751 .batch_read_only_transaction()
752 .set_timestamp_bound(TimestampBound::strong())
753 .build()
754 .await?;
755
756 let ts = tx.read_timestamp().expect("Missing read timestamp");
757 assert_eq!(ts.seconds(), 123456789);
758 assert_eq!(ts.nanos(), 0);
759
760 let partitions = tx
761 .partition_query(
762 Statement::builder("SELECT 1").build(),
763 PartitionOptions::default(),
764 )
765 .await?;
766
767 assert_eq!(partitions.len(), 2);
768
769 match &partitions[0].inner {
770 PartitionedOperation::Query(req) => {
771 assert_eq!(req.partition_token.as_ref(), &[10]);
772 assert_eq!(req.sql, "SELECT 1");
773 }
774 _ => panic!("Expected Query partition"),
775 }
776 Ok(())
777 }
778
779 #[tokio_test_no_panics]
780 async fn partition_read() -> anyhow::Result<()> {
781 let mut mock = create_session_mock();
782
783 mock.expect_begin_transaction().once().returning(|req| {
784 let req = req.into_inner();
785 assert_eq!(
786 req.session,
787 "projects/p/instances/i/databases/d/sessions/123"
788 );
789 Ok(Response::new(Transaction {
790 id: vec![1, 2, 3],
791 read_timestamp: Some(Timestamp {
792 seconds: 123456789,
793 nanos: 0,
794 }),
795 ..Default::default()
796 }))
797 });
798
799 mock.expect_partition_read().once().returning(|req| {
800 let req = req.into_inner();
801 assert_eq!(
802 req.session,
803 "projects/p/instances/i/databases/d/sessions/123"
804 );
805 assert_eq!(req.table, "Users");
806 Ok(Response::new(PartitionResponse {
807 partitions: vec![MockPartition {
808 partition_token: vec![30],
809 }],
810 transaction: None,
811 }))
812 });
813
814 let (db_client, _server) = setup_db_client(mock).await;
815
816 let transaction = db_client.batch_read_only_transaction().build().await?;
817
818 let read = SpannerReadRequest::builder("Users", vec!["Id", "Name"])
819 .with_keys(KeySet::all())
820 .build();
821 let partitions = transaction
822 .partition_read(read, PartitionOptions::default())
823 .await?;
824
825 assert_eq!(partitions.len(), 1);
826
827 match &partitions[0].inner {
828 PartitionedOperation::Read(req) => {
829 assert_eq!(req.partition_token.as_ref(), &[30]);
830 assert_eq!(req.table, "Users");
831 }
832 _ => panic!("Expected Read partition"),
833 }
834 Ok(())
835 }
836
837 #[tokio_test_no_panics]
838 async fn execute_query_with_data_boost() -> anyhow::Result<()> {
839 let mut mock = create_session_mock();
840
841 mock.expect_execute_streaming_sql().once().returning(|req| {
842 let req = req.into_inner();
843 assert!(req.data_boost_enabled, "data_boost_enabled should be true");
844 Ok(Response::from(crate::result_set::tests::adapt([Ok(
845 setup_select1(),
846 )])))
847 });
848
849 let (db_client, _server) = setup_db_client(mock).await;
850
851 let req = ExecuteSqlRequest::new()
852 .set_session("projects/p/instances/i/databases/d/sessions/123")
853 .set_transaction(TransactionSelector {
854 selector: Some(Selector::Id(b"tx_id_1".to_vec().into())),
855 ..Default::default()
856 })
857 .set_sql("SELECT * FROM Users")
858 .set_partition_token(b"partition_token_123".to_vec());
859
860 let partition = Partition {
861 inner: PartitionedOperation::Query(req),
862 gax_options: GaxRequestOptions::default(),
863 };
864
865 let _result_set = partition.set_data_boost(true).execute(&db_client).await?;
866
867 Ok(())
868 }
869
870 #[tokio_test_no_panics]
871 async fn execute_read_with_data_boost() -> anyhow::Result<()> {
872 let mut mock = create_session_mock();
873
874 mock.expect_streaming_read().once().returning(|req| {
875 let req = req.into_inner();
876 assert!(req.data_boost_enabled, "data_boost_enabled should be true");
877 Ok(Response::from(crate::result_set::tests::adapt([Ok(
878 setup_select1(),
879 )])))
880 });
881
882 let (db_client, _server) = setup_db_client(mock).await;
883
884 let req = GrpcReadRequest::new()
885 .set_session("projects/p/instances/i/databases/d/sessions/123")
886 .set_transaction(TransactionSelector {
887 selector: Some(Selector::Id(b"tx_id_2".to_vec().into())),
888 ..Default::default()
889 })
890 .set_table("Users")
891 .set_columns(vec!["Id".to_string(), "Name".to_string()])
892 .set_partition_token(b"partition_token_456".to_vec());
893
894 let partition = Partition {
895 inner: PartitionedOperation::Read(req),
896 gax_options: GaxRequestOptions::default(),
897 };
898
899 let _result_set = partition.set_data_boost(true).execute(&db_client).await?;
900
901 Ok(())
902 }
903}