1use crate::database_client::DatabaseClient;
16use crate::error::internal_error;
17use crate::model::TransactionOptions;
18use crate::model::TransactionSelector;
19use crate::model::transaction_options::ReadOnly;
20use crate::precommit::PrecommitTokenTracker;
21use crate::result_set::{ResultSet, ResultSetParams, StreamOperation};
22use crate::statement::Statement;
23use crate::timestamp_bound::TimestampBound;
24use crate::transaction_retry_policy::is_aborted;
25use google_cloud_gax::backoff_policy::BackoffPolicyArg;
26use google_cloud_gax::options::internal::RequestOptionsExt as _;
27use google_cloud_gax::retry_policy::RetryPolicyArg;
28use std::mem::replace;
29use std::sync::{Arc, Mutex};
30use std::time::Duration;
31use tokio::sync::Notify;
32
/// Builder for a [`SingleUseReadOnlyTransaction`].
///
/// Carries the database client the transaction will use and an optional
/// timestamp bound. When no bound has been set, `build()` requests a strong
/// read.
pub struct SingleUseReadOnlyTransactionBuilder {
    /// Client that supplies the session name and channel hint, and that the
    /// resulting transaction uses to issue requests.
    client: DatabaseClient,
    /// Timestamp bound for the read; `None` makes `build()` select a strong read.
    timestamp_bound: Option<TimestampBound>,
}
51
52impl SingleUseReadOnlyTransactionBuilder {
53 pub(crate) fn new(client: DatabaseClient) -> Self {
54 Self {
55 client,
56 timestamp_bound: None,
57 }
58 }
59
60 pub fn set_timestamp_bound(mut self, bound: TimestampBound) -> Self {
77 self.timestamp_bound = Some(bound);
78 self
79 }
80
81 pub fn build(self) -> SingleUseReadOnlyTransaction {
94 let read_only = match self.timestamp_bound {
95 Some(b) => ReadOnly::default().set_timestamp_bound(b.0),
96 None => ReadOnly::default().set_strong(true),
97 };
98 let transaction_selector = crate::model::TransactionSelector::default()
99 .set_single_use(TransactionOptions::default().set_read_only(read_only));
100
101 let session_name = self.client.session_name();
102 let channel_hint = self.client.spanner.next_channel_hint();
103 SingleUseReadOnlyTransaction {
104 context: ReadContext {
105 session_name,
106 client: self.client,
107 transaction_selector: ReadContextTransactionSelector::Fixed(
108 transaction_selector,
109 None,
110 ),
111 precommit_token_tracker: PrecommitTokenTracker::new_noop(),
112 transaction_tag: None,
113 channel_hint,
114 begin_transaction_request_options: None,
115 },
116 }
117 }
118}
119
/// A read-only transaction whose requests carry a `single_use` transaction
/// selector.
///
/// Created by [`SingleUseReadOnlyTransactionBuilder::build`].
#[derive(Debug)]
pub struct SingleUseReadOnlyTransaction {
    /// Shared read machinery (session, client, selector) used to run the
    /// query or read.
    context: ReadContext,
}
141
142impl SingleUseReadOnlyTransaction {
143 pub async fn execute_query<T: Into<Statement>>(
164 &self,
165 statement: T,
166 ) -> crate::Result<ResultSet> {
167 self.context.execute_query(statement).await
168 }
169
170 pub async fn execute_read<T: Into<crate::read::ReadRequest>>(
202 &self,
203 read: T,
204 ) -> crate::Result<ResultSet> {
205 self.context.execute_read(read).await
206 }
207}
208
/// Controls how a multi-use read-only transaction is started.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum BeginTransactionOption {
    /// Start the transaction lazily: `build()` sends no request, and the first
    /// statement carries a `begin` transaction selector. This is the default.
    #[default]
    InlineBegin,
    /// Start the transaction eagerly: `build()` issues a separate
    /// BeginTransaction request and later statements reference its id.
    ExplicitBegin,
}
220
/// Builder for a [`MultiUseReadOnlyTransaction`].
pub struct MultiUseReadOnlyTransactionBuilder {
    /// Client that supplies the session name and channel hint, and that issues
    /// requests.
    client: DatabaseClient,
    /// Timestamp bound for the transaction; `None` makes `build()` select a
    /// strong read.
    timestamp_bound: Option<TimestampBound>,
    /// Whether the transaction is begun by `build()` or by the first statement.
    begin_transaction_option: BeginTransactionOption,
    /// Request options (attempt timeout, retry and backoff policies) applied to
    /// BeginTransaction requests; `None` until one of the `with_begin_*`
    /// setters is called.
    begin_gax_options: Option<crate::RequestOptions>,
}
242
243impl MultiUseReadOnlyTransactionBuilder {
244 pub(crate) fn new(client: DatabaseClient) -> Self {
245 Self {
246 client,
247 timestamp_bound: None,
248 begin_transaction_option: BeginTransactionOption::InlineBegin,
249 begin_gax_options: None,
250 }
251 }
252
253 pub fn with_begin_transaction_option(mut self, option: BeginTransactionOption) -> Self {
285 self.begin_transaction_option = option;
286 self
287 }
288
289 pub fn with_begin_attempt_timeout(mut self, timeout: Duration) -> Self {
307 self.begin_gax_options
308 .get_or_insert_with(crate::RequestOptions::default)
309 .set_attempt_timeout(timeout);
310 self
311 }
312
313 pub fn with_begin_retry_policy(mut self, policy: impl Into<RetryPolicyArg>) -> Self {
331 self.begin_gax_options
332 .get_or_insert_with(crate::RequestOptions::default)
333 .set_retry_policy(policy);
334 self
335 }
336
337 pub fn with_begin_backoff_policy(mut self, policy: impl Into<BackoffPolicyArg>) -> Self {
355 self.begin_gax_options
356 .get_or_insert_with(crate::RequestOptions::default)
357 .set_backoff_policy(policy);
358 self
359 }
360
361 pub fn set_timestamp_bound(mut self, bound: TimestampBound) -> Self {
374 self.timestamp_bound = Some(bound);
375 self
376 }
377
378 async fn begin(
379 &self,
380 session_name: String,
381 options: TransactionOptions,
382 channel_hint: usize,
383 request_options: crate::RequestOptions,
384 ) -> crate::Result<ReadContextTransactionSelector> {
385 let response = execute_begin_transaction(
386 &self.client,
387 session_name,
388 options,
389 None,
390 channel_hint,
391 request_options,
392 None,
393 )
394 .await?;
395
396 let transaction_selector = crate::model::TransactionSelector::default().set_id(response.id);
397
398 Ok(ReadContextTransactionSelector::Fixed(
399 transaction_selector,
400 response.read_timestamp,
401 ))
402 }
403
404 pub async fn build(self) -> crate::Result<MultiUseReadOnlyTransaction> {
417 let read_only = ReadOnly::default().set_return_read_timestamp(true);
418 let read_only = match self.timestamp_bound.as_ref() {
419 Some(b) => read_only.set_timestamp_bound(b.0.clone()),
420 None => read_only.set_strong(true),
421 };
422 let options = TransactionOptions::default().set_read_only(read_only);
423
424 let session_name = self.client.session_name();
425 let channel_hint = self.client.spanner.next_channel_hint();
426 let selector = match self.begin_transaction_option {
427 BeginTransactionOption::ExplicitBegin => {
428 self.begin(
429 session_name.clone(),
430 options,
431 channel_hint,
432 self.begin_gax_options.clone().unwrap_or_default(),
433 )
434 .await?
435 }
436 BeginTransactionOption::InlineBegin => ReadContextTransactionSelector::Lazy(Arc::new(
437 Mutex::new(TransactionState::NotStarted(options)),
438 )),
439 };
440
441 Ok(MultiUseReadOnlyTransaction {
442 context: ReadContext {
443 session_name,
444 client: self.client,
445 transaction_selector: selector,
446 precommit_token_tracker: PrecommitTokenTracker::new_noop(),
447 transaction_tag: None,
448 channel_hint,
449 begin_transaction_request_options: self.begin_gax_options.clone(),
450 },
451 })
452 }
453}
454
/// A read-only transaction that can run several queries and reads against the
/// same transaction.
///
/// Created by [`MultiUseReadOnlyTransactionBuilder::build`].
#[derive(Debug)]
pub struct MultiUseReadOnlyTransaction {
    /// Shared read machinery (session, client, selector) used to run queries
    /// and reads.
    pub(crate) context: ReadContext,
}
480
481impl MultiUseReadOnlyTransaction {
482 pub fn read_timestamp(&self) -> Option<wkt::Timestamp> {
484 self.context.transaction_selector.read_timestamp()
485 }
486
487 pub async fn execute_query<T: Into<Statement>>(
508 &self,
509 statement: T,
510 ) -> crate::Result<ResultSet> {
511 self.context.execute_query(statement).await
512 }
513
514 pub async fn execute_read<T: Into<crate::read::ReadRequest>>(
546 &self,
547 read: T,
548 ) -> crate::Result<ResultSet> {
549 self.context.execute_read(read).await
550 }
551}
552
553pub(crate) async fn execute_begin_transaction(
555 client: &crate::database_client::DatabaseClient,
556 session_name: String,
557 options: crate::model::TransactionOptions,
558 transaction_tag: Option<String>,
559 channel_hint: usize,
560 request_options: crate::RequestOptions,
561 mutation_key: Option<crate::model::Mutation>,
562) -> crate::Result<crate::model::Transaction> {
563 let mut request = crate::model::BeginTransactionRequest::default()
564 .set_session(session_name)
565 .set_options(options)
566 .set_or_clear_mutation_key(mutation_key);
567 if let Some(tag) = transaction_tag {
568 request = request
569 .set_request_options(crate::model::RequestOptions::default().set_transaction_tag(tag));
570 }
571
572 client
573 .spanner
574 .begin_transaction(request, request_options, channel_hint)
575 .await
576}
577
/// How a [`ReadContext`] obtains the transaction selector for its requests.
#[derive(Clone, Debug)]
pub(crate) enum ReadContextTransactionSelector {
    /// The selector is already known, together with an optional read
    /// timestamp.
    Fixed(crate::model::TransactionSelector, Option<wkt::Timestamp>),
    /// The transaction is started on demand; the shared state records how far
    /// that has progressed. Clones share the same state.
    Lazy(Arc<Mutex<TransactionState>>),
}
583
/// Progress of a lazily started transaction.
#[derive(Clone, Debug)]
pub(crate) enum TransactionState {
    /// No request has tried to begin the transaction yet; holds the options to
    /// begin it with.
    NotStarted(crate::model::TransactionOptions),
    /// A begin is in flight. Holds the options (so the state can be reset or
    /// the begin retried) and the `Notify` that waiting callers are woken
    /// through.
    Starting(crate::model::TransactionOptions, Arc<Notify>),
    /// The transaction has an id; holds the selector referencing it and the
    /// read timestamp, if one was reported.
    Started(crate::model::TransactionSelector, Option<wkt::Timestamp>),
    /// An explicit begin failed; holds the error.
    Failed(Arc<crate::Error>),
}
591
/// Result of a single, non-blocking inspection of a lazy transaction state.
enum SelectorStatus {
    /// A selector the caller can put on its request right away.
    Ready(crate::model::TransactionSelector),
    /// Another caller is beginning the transaction; wait on this `Notify` and
    /// inspect the state again.
    Wait(std::sync::Arc<tokio::sync::Notify>),
}
596
597impl ReadContextTransactionSelector {
598 pub(crate) async fn selector(&self) -> crate::Result<crate::model::TransactionSelector> {
599 match self {
600 Self::Fixed(selector, _) => Ok(selector.clone()),
601 Self::Lazy(_) => loop {
602 match self.poll_selector_status()? {
603 SelectorStatus::Ready(selector) => return Ok(selector),
604 SelectorStatus::Wait(notify) => notify.notified().await,
605 }
606 },
607 }
608 }
609
610 fn poll_selector_status(&self) -> crate::Result<SelectorStatus> {
613 let Self::Lazy(lazy) = self else {
614 unreachable!("poll_selector_status called on non-Lazy selector");
615 };
616 let mut guard = lazy.lock().expect("transaction state mutex poisoned");
617
618 if let TransactionState::Started(selector, _) = &*guard {
620 return Ok(SelectorStatus::Ready(selector.clone()));
621 }
622
623 let pending_options = if let TransactionState::NotStarted(options) = &*guard {
625 Some(options.clone())
626 } else {
627 None
630 };
631 if let Some(options) = pending_options {
632 let notify = Arc::new(Notify::new());
634 *guard = TransactionState::Starting(options.clone(), Arc::clone(¬ify));
635 return Ok(SelectorStatus::Ready(
636 crate::model::TransactionSelector::default().set_begin(options),
637 ));
638 }
639
640 match &*guard {
642 TransactionState::Failed(err) => {
646 let error = if let Some(status) = err.status() {
647 crate::Error::service(status.clone())
648 } else {
649 crate::error::internal_error(format!("Transaction failed to start: {}", err))
650 };
651 Err(error)
652 }
653 TransactionState::Starting(_, notify) => Ok(SelectorStatus::Wait(Arc::clone(notify))),
655 TransactionState::Started(_, _) | TransactionState::NotStarted(_) => unreachable!(),
656 }
657 }
658}
659
/// Inputs for [`ReadContextTransactionSelector::begin_explicitly`].
pub(crate) struct ExplicitBeginParams {
    /// Client used to send the BeginTransaction request.
    pub(crate) client: crate::database_client::DatabaseClient,
    /// Session the transaction is begun on.
    pub(crate) session_name: String,
    /// Optional transaction tag attached to the BeginTransaction request.
    pub(crate) transaction_tag: Option<String>,
    /// Channel hint passed through to the BeginTransaction call.
    pub(crate) channel_hint: usize,
    /// Request options (timeouts, retry, backoff, extensions) for the call.
    pub(crate) request_options: crate::RequestOptions,
    /// When `true`, the begin is issued even if the state is already
    /// `Starting`; when `false`, a `Starting` state is waited on instead.
    pub(crate) is_stream_fallback: bool,
    /// Tracker updated with the precommit token from the response.
    pub(crate) precommit_token_tracker: crate::precommit::PrecommitTokenTracker,
    /// Optional mutation key set on the BeginTransaction request.
    pub(crate) mutation_key: Option<crate::model::Mutation>,
}
670
671impl ReadContextTransactionSelector {
672 pub(crate) async fn begin_explicitly(&self, params: ExplicitBeginParams) -> crate::Result<()> {
677 let Self::Lazy(lazy) = self else {
678 return Ok(());
679 };
680
681 enum FallbackAction {
682 Begin(
683 crate::model::TransactionOptions,
684 Option<Arc<tokio::sync::Notify>>,
685 ),
686 Wait(Arc<tokio::sync::Notify>),
687 None,
688 }
689
690 let action = {
691 let mut guard = lazy
692 .lock()
693 .map_err(|_| internal_error("transaction state mutex poisoned"))?;
694 match &*guard {
695 TransactionState::NotStarted(options) => {
696 let options = options.clone();
699 let notify = Arc::new(tokio::sync::Notify::new());
700 *guard = TransactionState::Starting(options.clone(), Arc::clone(¬ify));
701 FallbackAction::Begin(options, Some(notify))
702 }
703 TransactionState::Starting(options, notify) => {
704 if !params.is_stream_fallback {
710 FallbackAction::Wait(Arc::clone(notify))
711 } else {
712 FallbackAction::Begin(options.clone(), Some(Arc::clone(notify)))
713 }
714 }
715 TransactionState::Started(_, _) | TransactionState::Failed(_) => {
716 FallbackAction::None
719 }
720 }
721 };
722
723 let (options, notify_opt) = match action {
724 FallbackAction::None => return Ok(()),
725 FallbackAction::Wait(notify) => {
726 notify.notified().await;
727 return Ok(());
728 }
729 FallbackAction::Begin(opts, notif) => (opts, notif),
730 };
731
732 let response = match execute_begin_transaction(
736 ¶ms.client,
737 params.session_name,
738 options,
739 params.transaction_tag,
740 params.channel_hint,
741 params.request_options,
742 params.mutation_key,
743 )
744 .await
745 {
746 Ok(r) => r,
747 Err(e) => {
748 let mut guard = lazy.lock().expect("transaction state mutex poisoned");
749 let error = Arc::new(e);
750 *guard = TransactionState::Failed(Arc::clone(&error));
751 drop(guard);
754 if let Some(notify) = notify_opt {
755 notify.notify_waiters();
756 }
757
758 let return_error = if let Some(status) = error.status() {
759 crate::Error::service(status.clone())
760 } else {
761 crate::error::internal_error(format!("Transaction failed to start: {}", error))
762 };
763 return Err(return_error);
764 }
765 };
766
767 self.update(response.id, response.read_timestamp)?;
768 params
769 .precommit_token_tracker
770 .update(response.precommit_token);
771
772 Ok(())
773 }
774
775 pub(crate) fn update(
784 &self,
785 id: bytes::Bytes,
786 timestamp: Option<wkt::Timestamp>,
787 ) -> crate::Result<()> {
788 let Self::Lazy(lazy) = self else {
789 return Ok(());
790 };
791 let mut guard = lazy.lock().expect("transaction state mutex poisoned");
792
793 if matches!(
794 &*guard,
795 TransactionState::NotStarted(_) | TransactionState::Starting(_, _)
796 ) {
797 let previous_state = replace(
801 &mut *guard,
802 TransactionState::Started(TransactionSelector::default().set_id(id), timestamp),
803 );
804 drop(guard);
805
806 if let TransactionState::Starting(_, notify) = previous_state {
808 notify.notify_waiters();
809 }
810 Ok(())
811 } else if let TransactionState::Started(existing_selector, _) = &*guard {
812 if existing_selector.id() == Some(&id) {
816 Ok(())
817 } else {
818 Err(crate::error::internal_error(
819 "got a transaction id for an already Started or Failed transaction",
820 ))
821 }
822 } else {
823 Err(crate::error::internal_error(
825 "got a transaction id for an already Started or Failed transaction",
826 ))
827 }
828 }
829
830 pub(crate) fn get_id_no_wait(&self) -> crate::Result<Option<bytes::Bytes>> {
836 use crate::model::transaction_selector::Selector;
837 match self {
838 Self::Fixed(selector, _) => {
839 if let Some(Selector::Id(id)) = &selector.selector {
840 return Ok(Some(id.clone()));
841 }
842 }
843 Self::Lazy(lazy) => {
844 let guard = lazy
845 .lock()
846 .map_err(|_| internal_error("transaction state mutex poisoned"))?;
847 if let TransactionState::Started(selector, _) = &*guard
848 && let Some(Selector::Id(id)) = &selector.selector
849 {
850 return Ok(Some(id.clone()));
851 }
852 }
853 }
854 Ok(None)
855 }
856
857 pub(crate) fn is_starting(&self) -> crate::Result<bool> {
859 match self {
860 Self::Lazy(lazy) => {
861 let guard = lazy
862 .lock()
863 .map_err(|_| internal_error("transaction state mutex poisoned"))?;
864 Ok(matches!(&*guard, TransactionState::Starting(_, _)))
865 }
866 _ => Ok(false),
867 }
868 }
869
870 pub(crate) fn maybe_reset_starting(&self) {
878 let Self::Lazy(lazy) = self else {
879 return;
880 };
881
882 let mut guard = lazy.lock().expect("transaction state mutex poisoned");
883 if let TransactionState::Starting(options, notify) = &*guard {
884 let options = options.clone();
885 let notify = Arc::clone(notify);
886 *guard = TransactionState::NotStarted(options);
887 drop(guard);
888 notify.notify_waiters();
889 }
890 }
891
892 pub(crate) fn read_timestamp(&self) -> Option<wkt::Timestamp> {
898 match self {
899 Self::Fixed(_, timestamp) => *timestamp,
900 Self::Lazy(lazy) => {
901 let guard = lazy.lock().expect("transaction state mutex poisoned");
902 if let TransactionState::Started(_, timestamp) = &*guard {
903 *timestamp
904 } else {
905 None
906 }
907 }
908 }
909 }
910}
911
/// State shared by the query and read entry points of a transaction.
#[derive(Clone, Debug)]
pub(crate) struct ReadContext {
    /// Name of the session requests are sent on.
    pub(crate) session_name: String,
    /// Client used to issue requests.
    pub(crate) client: DatabaseClient,
    /// Source of the transaction selector attached to each request.
    pub(crate) transaction_selector: ReadContextTransactionSelector,
    /// Tracker handed to result sets and to explicit begins.
    pub(crate) precommit_token_tracker: PrecommitTokenTracker,
    /// Optional transaction tag copied into each request's options.
    pub(crate) transaction_tag: Option<String>,
    /// Channel hint passed to every call made through this context.
    pub(crate) channel_hint: usize,
    /// Request options merged into the options of a fallback
    /// BeginTransaction request, when present.
    pub(crate) begin_transaction_request_options: Option<crate::RequestOptions>,
}
922
923impl ReadContext {
924 pub(crate) fn amend_request_options(
931 &self,
932 mut options: Option<crate::model::RequestOptions>,
933 ) -> Option<crate::model::RequestOptions> {
934 if let Some(tag) = &self.transaction_tag {
935 options
936 .get_or_insert_with(crate::model::RequestOptions::default)
937 .transaction_tag = tag.clone();
938 }
939 options
940 }
941
942 pub(crate) async fn begin_explicitly_if_not_started(
946 &self,
947 fallback_options: crate::RequestOptions,
948 is_stream_fallback: bool,
949 mutation_key: Option<crate::model::Mutation>,
950 ) -> crate::Result<bool> {
951 let ReadContextTransactionSelector::Lazy(lazy) = &self.transaction_selector else {
952 return Ok(false);
953 };
954 let is_started = matches!(&*lazy.lock().unwrap(), TransactionState::Started(_, _));
955 if is_started {
956 return Ok(false);
957 }
958
959 let options = merge_request_options(
960 fallback_options,
961 self.begin_transaction_request_options.as_ref(),
962 );
963
964 self.transaction_selector
965 .begin_explicitly(ExplicitBeginParams {
966 client: self.client.clone(),
967 session_name: self.session_name.clone(),
968 transaction_tag: self.transaction_tag.clone(),
969 channel_hint: self.channel_hint,
970 request_options: options,
971 is_stream_fallback,
972 precommit_token_tracker: self.precommit_token_tracker.clone(),
973 mutation_key,
974 })
975 .await?;
976 Ok(true)
977 }
978}
979
980fn merge_request_options(
983 mut destination: crate::RequestOptions,
984 source: Option<&crate::RequestOptions>,
985) -> crate::RequestOptions {
986 let Some(source) = source else {
987 return destination;
988 };
989
990 if let Some(timeout) = source.attempt_timeout() {
991 destination.set_attempt_timeout(*timeout);
992 }
993 if let Some(retry) = source.retry_policy() {
994 destination.set_retry_policy(retry.clone());
995 }
996 if let Some(backoff) = source.backoff_policy() {
997 destination.set_backoff_policy(backoff.clone());
998 }
999 if let Some(src_headers) = source.get_extension::<http::HeaderMap>() {
1000 let mut dest_headers = destination
1001 .get_extension::<http::HeaderMap>()
1002 .cloned()
1003 .unwrap_or_default();
1004 for (name, value) in src_headers.iter() {
1005 dest_headers.insert(name.clone(), value.clone());
1006 }
1007 destination = destination.insert_extension(dest_headers);
1008 }
1009 destination
1010}
1011
// Sends a streaming RPC for a `ReadContext` and wraps the stream in a
// `ResultSet`, falling back to an explicit BeginTransaction when the first
// attempt fails.
//
// Arguments:
// * `$self`: the `ReadContext` (or reference to it) to run against.
// * `$request`: identifier of a mutable request; its `transaction` field is
//   overwritten before the retry.
// * `$gax_options`: identifier of the request options for the call.
// * `$rpc_method`: name of the streaming method on `client.spanner`.
// * `$operation_variant`: `StreamOperation` constructor wrapping the request.
//
// The expansion uses `return` and `?`, so it must be invoked inside an async
// function returning `crate::Result<ResultSet>`.
macro_rules! execute_stream_with_retry {
    ($self:expr, $request:ident, $gax_options:ident, $rpc_method:ident, $operation_variant:path) => {{
        let stream = match $self
            .client
            .spanner
            .$rpc_method($request.clone(), $gax_options.clone(), $self.channel_hint)
            .send()
            .await
        {
            Ok(s) => s,
            Err(e) => {
                // Aborted errors are returned to the caller unchanged.
                if is_aborted(&e) {
                    return Err(e);
                }
                // Try to begin the transaction explicitly. `true` means the
                // transaction was not already started, so the request is
                // retried once with a freshly obtained selector.
                if $self
                    .begin_explicitly_if_not_started($gax_options.clone(), true, None)
                    .await?
                {
                    $request.transaction = Some($self.transaction_selector.selector().await?);
                    $self
                        .client
                        .spanner
                        .$rpc_method($request.clone(), $gax_options.clone(), $self.channel_hint)
                        .send()
                        .await?
                } else {
                    // Nothing was begun; surface the original error.
                    return Err(e);
                }
            }
        };

        ResultSet::create(ResultSetParams {
            stream,
            transaction_selector: Some($self.transaction_selector.clone()),
            precommit_token_tracker: $self.precommit_token_tracker.clone(),
            client: $self.client.clone(),
            session_name: $self.session_name.clone(),
            transaction_tag: $self.transaction_tag.clone(),
            operation: $operation_variant($request),
            channel_hint: $self.channel_hint,
            gax_options: $gax_options,
        })
        .await
    }};
}
1058
1059impl ReadContext {
1060 pub(crate) async fn execute_query<T: Into<Statement>>(
1061 &self,
1062 statement: T,
1063 ) -> crate::Result<ResultSet> {
1064 let statement = statement.into();
1065 let gax_options = statement.gax_options().clone();
1066 let mut request = statement
1067 .into_request()
1068 .set_session(self.session_name.clone())
1069 .set_transaction(self.transaction_selector.selector().await?);
1070 request.request_options = self.amend_request_options(request.request_options);
1071
1072 execute_stream_with_retry!(
1073 self,
1074 request,
1075 gax_options,
1076 execute_streaming_sql,
1077 StreamOperation::Query
1078 )
1079 }
1080
1081 pub(crate) async fn execute_read<T: Into<crate::read::ReadRequest>>(
1082 &self,
1083 read: T,
1084 ) -> crate::Result<ResultSet> {
1085 let read = read.into();
1086 let gax_options = read.gax_options.clone();
1087 let mut request = read
1088 .into_request()
1089 .set_session(self.session_name.clone())
1090 .set_transaction(self.transaction_selector.selector().await?);
1091 request.request_options = self.amend_request_options(request.request_options);
1092
1093 execute_stream_with_retry!(
1094 self,
1095 request,
1096 gax_options,
1097 streaming_read,
1098 StreamOperation::Read
1099 )
1100 }
1101}
1102
1103#[cfg(test)]
1104pub(crate) mod tests {
1105 use super::*;
1106 use crate::result_set::tests::adapt;
1107 use crate::result_set::tests::string_val;
1108 use crate::statement::Statement;
1109 use crate::value::Value;
1110 use gaxi::grpc::tonic::{self, Code, Response, Status};
1111 use google_cloud_gax::error::rpc::Code as GaxCode;
1112 use google_cloud_gax::exponential_backoff::ExponentialBackoff;
1113 use google_cloud_gax::retry_policy::NeverRetry;
1114 use google_cloud_test_macros::tokio_test_no_panics;
1115 use http::{HeaderMap, HeaderName, HeaderValue};
1116 use mock_v1::transaction_selector::Selector;
1117 use spanner_grpc_mock::MockSpanner;
1118 use spanner_grpc_mock::google::spanner::v1 as mock_v1;
1119 use std::sync::mpsc::channel as std_channel;
1120 use std::sync::{Arc, Mutex as StdMutex};
1121 use tokio::sync::oneshot::channel as oneshot_channel;
1122 use tokio::sync::{Barrier, Mutex, Notify, mpsc};
1123
    /// Compile-time check that the builders, the transactions and
    /// `ReadContext` are `Send + Sync`, and that the transactions and
    /// `ReadContext` also implement `Debug`.
    #[test]
    fn auto_traits() {
        static_assertions::assert_impl_all!(SingleUseReadOnlyTransactionBuilder: Send, Sync);
        static_assertions::assert_impl_all!(SingleUseReadOnlyTransaction: Send, Sync, std::fmt::Debug);
        static_assertions::assert_impl_all!(MultiUseReadOnlyTransactionBuilder: Send, Sync);
        static_assertions::assert_impl_all!(MultiUseReadOnlyTransaction: Send, Sync, std::fmt::Debug);
        static_assertions::assert_impl_all!(ReadContext: Send, Sync, std::fmt::Debug);
    }
1132
1133 pub(crate) fn create_session_mock() -> spanner_grpc_mock::MockSpanner {
1134 let mut mock = spanner_grpc_mock::MockSpanner::new();
1135 mock.expect_create_session().once().returning(|_| {
1136 Ok(Response::new(mock_v1::Session {
1137 name: "projects/p/instances/i/databases/d/sessions/123".to_string(),
1138 ..Default::default()
1139 }))
1140 });
1141 mock
1142 }
1143
1144 fn setup_select1() -> spanner_grpc_mock::google::spanner::v1::PartialResultSet {
1145 spanner_grpc_mock::google::spanner::v1::PartialResultSet {
1146 metadata: Some(spanner_grpc_mock::google::spanner::v1::ResultSetMetadata {
1147 row_type: Some(spanner_grpc_mock::google::spanner::v1::StructType {
1148 fields: vec![Default::default()],
1149 }),
1150 ..Default::default()
1151 }),
1152 values: vec![prost_types::Value {
1153 kind: Some(prost_types::value::Kind::StringValue("1".to_string())),
1154 }],
1155 last: true,
1156 ..Default::default()
1157 }
1158 }
1159
1160 pub(crate) async fn setup_db_client(
1161 mock: spanner_grpc_mock::MockSpanner,
1162 ) -> (DatabaseClient, tokio::task::JoinHandle<()>) {
1163 use crate::client::Spanner;
1164 use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
1165 let (address, server) = spanner_grpc_mock::start("0.0.0.0:0", mock)
1166 .await
1167 .expect("Failed to start mock server");
1168
1169 let spanner = Spanner::builder()
1170 .with_endpoint(address)
1171 .with_credentials(Anonymous::new().build())
1172 .build()
1173 .await
1174 .expect("Failed to build client");
1175
1176 let db_client = spanner
1177 .database_client("projects/p/instances/i/databases/d")
1178 .build()
1179 .await
1180 .expect("Failed to create DatabaseClient");
1181
1182 (db_client, server)
1183 }
1184
1185 #[tokio_test_no_panics]
1186 async fn single_use_builder() {
1187 let mock = create_session_mock();
1188
1189 let (db_client, _server) = setup_db_client(mock).await;
1190
1191 let tx = db_client.single_use().build();
1192 let selector = tx
1193 .context
1194 .transaction_selector
1195 .selector()
1196 .await
1197 .expect("Failed to get selector");
1198 let ro = selector
1199 .single_use()
1200 .expect("Expected SingleUse selector")
1201 .read_only()
1202 .expect("Expected ReadOnly mode");
1203 assert_eq!(
1204 ro.timestamp_bound,
1205 Some(crate::model::transaction_options::read_only::TimestampBound::Strong(true))
1206 );
1207
1208 let tx2 = db_client
1209 .single_use()
1210 .set_timestamp_bound(crate::timestamp_bound::TimestampBound::max_staleness(
1211 std::time::Duration::from_secs(10),
1212 ))
1213 .build();
1214 let selector = tx2
1215 .context
1216 .transaction_selector
1217 .selector()
1218 .await
1219 .expect("Failed to get selector");
1220 let ro2 = selector
1221 .single_use()
1222 .expect("Expected SingleUse selector")
1223 .read_only()
1224 .expect("Expected ReadOnly mode");
1225 assert_eq!(
1226 ro2.timestamp_bound,
1227 Some(
1228 crate::model::transaction_options::read_only::TimestampBound::MaxStaleness(
1229 Box::new(wkt::Duration::new(10, 0).expect("failed to create Duration"))
1230 )
1231 )
1232 );
1233 }
1234
1235 #[tokio_test_no_panics]
1236 async fn execute_single_query() {
1237 use super::super::result_set::tests::string_val;
1238 use crate::statement::Statement;
1239 use crate::value::Value;
1240
1241 let mut mock = create_session_mock();
1242
1243 mock.expect_execute_streaming_sql().once().returning(|req| {
1244 let req = req.into_inner();
1245 assert_eq!(
1246 req.session,
1247 "projects/p/instances/i/databases/d/sessions/123"
1248 );
1249 assert_eq!(req.sql, "SELECT 1");
1250
1251 Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
1252 setup_select1(),
1253 )])))
1254 });
1255
1256 let (db_client, _server) = setup_db_client(mock).await;
1257
1258 let tx = db_client.single_use().build();
1259 let mut rs = tx
1260 .execute_query(Statement::builder("SELECT 1").build())
1261 .await
1262 .expect("Failed to execute query");
1263
1264 let row = rs.next().await.expect("has row").expect("has valid row");
1265 assert_eq!(row.raw_values(), [Value(string_val("1"))]);
1266 let result = rs.next().await;
1267 assert!(result.is_none(), "expected None, got {result:?}");
1268 }
1269
1270 #[tokio_test_no_panics]
1271 async fn execute_multi_query() {
1272 use super::super::result_set::tests::string_val;
1273 use crate::statement::Statement;
1274 use crate::value::Value;
1275 use spanner_grpc_mock::google::spanner::v1 as mock_v1;
1276
1277 let mut mock = create_session_mock();
1278
1279 mock.expect_begin_transaction().once().returning(|req| {
1280 let req = req.into_inner();
1281 assert_eq!(
1282 req.session,
1283 "projects/p/instances/i/databases/d/sessions/123"
1284 );
1285 Ok(tonic::Response::new(mock_v1::Transaction {
1286 id: vec![1, 2, 3],
1287 read_timestamp: Some(prost_types::Timestamp {
1289 seconds: 123456789,
1290 nanos: 0,
1291 }),
1292 ..Default::default()
1293 }))
1294 });
1295
1296 mock.expect_execute_streaming_sql()
1297 .times(2)
1298 .returning(|req| {
1299 let req = req.into_inner();
1300 assert_eq!(
1301 req.session,
1302 "projects/p/instances/i/databases/d/sessions/123"
1303 );
1304 assert_eq!(
1305 req.transaction
1306 .expect("transaction should be present")
1307 .selector
1308 .expect("selector should be present"),
1309 mock_v1::transaction_selector::Selector::Id(vec![1, 2, 3])
1310 );
1311
1312 Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
1313 setup_select1(),
1314 )])))
1315 });
1316
1317 let (db_client, _server) = setup_db_client(mock).await;
1318
1319 let tx = db_client
1320 .read_only_transaction()
1321 .with_begin_transaction_option(BeginTransactionOption::ExplicitBegin)
1322 .build()
1323 .await
1324 .expect("Failed to start tx");
1325 assert_eq!(
1326 tx.read_timestamp()
1327 .expect("expected read timestamp")
1328 .seconds(),
1329 123456789
1330 );
1331
1332 for _ in 0..2 {
1333 let mut rs = tx
1334 .execute_query(Statement::builder("SELECT 1").build())
1335 .await
1336 .expect("Failed to execute query");
1337
1338 let row = rs.next().await.expect("has row").expect("has valid row");
1339 assert_eq!(row.raw_values(), [Value(string_val("1"))]);
1340
1341 let result = rs.next().await;
1342 assert!(result.is_none(), "expected None, got {result:?}");
1343 }
1344 }
1345
1346 #[tokio_test_no_panics]
1347 async fn execute_multi_query_inline_begin() -> anyhow::Result<()> {
1348 use super::super::result_set::tests::string_val;
1349 use crate::statement::Statement;
1350 use crate::value::Value;
1351 use spanner_grpc_mock::google::spanner::v1 as mock_v1;
1352
1353 let mut mock = create_session_mock();
1354
1355 mock.expect_begin_transaction().never();
1357
1358 let mut seq = mockall::Sequence::new();
1359
1360 mock.expect_execute_streaming_sql()
1361 .times(1)
1362 .in_sequence(&mut seq)
1363 .returning(move |req| {
1364 let req = req.into_inner();
1365 assert_eq!(
1366 req.session,
1367 "projects/p/instances/i/databases/d/sessions/123"
1368 );
1369
1370 match req.transaction.unwrap().selector.unwrap() {
1372 mock_v1::transaction_selector::Selector::Begin(_) => {}
1373 _ => panic!("Expected Selector::Begin"),
1374 }
1375 let mut rs = setup_select1();
1376 rs.metadata.as_mut().unwrap().transaction = Some(mock_v1::Transaction {
1377 id: vec![4, 5, 6],
1378 read_timestamp: Some(prost_types::Timestamp {
1379 seconds: 987654321,
1380 nanos: 0,
1381 }),
1382 ..Default::default()
1383 });
1384 Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(rs)])))
1385 });
1386
1387 mock.expect_execute_streaming_sql()
1388 .times(1)
1389 .in_sequence(&mut seq)
1390 .returning(move |req| {
1391 let req = req.into_inner();
1392 match req.transaction.unwrap().selector.unwrap() {
1394 mock_v1::transaction_selector::Selector::Id(id) => {
1395 assert_eq!(id, vec![4, 5, 6]);
1396 }
1397 _ => panic!("Expected Selector::Id"),
1398 }
1399 Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
1400 setup_select1(),
1401 )])))
1402 });
1403
1404 let (db_client, _server) = setup_db_client(mock).await;
1405
1406 let tx = db_client
1407 .read_only_transaction()
1408 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
1409 .build()
1410 .await?;
1411
1412 assert!(tx.read_timestamp().is_none());
1414
1415 for i in 0..2 {
1416 let mut rs = tx
1417 .execute_query(Statement::builder("SELECT 1").build())
1418 .await?;
1419
1420 let row = rs.next().await.expect("Expected a row")?;
1421 assert_eq!(row.raw_values(), [Value(string_val("1"))]);
1422
1423 let result = rs.next().await;
1424 assert!(result.is_none(), "Expected None, got {result:?}");
1425
1426 if i == 0 {
1427 assert_eq!(
1429 tx.read_timestamp()
1430 .expect("Expected read timestamp")
1431 .seconds(),
1432 987654321
1433 );
1434 }
1435 }
1436
1437 Ok(())
1438 }
1439
1440 #[tokio_test_no_panics]
1441 async fn execute_single_read() {
1442 use super::super::result_set::tests::string_val;
1443 use crate::key::KeySet;
1444 use crate::read::ReadRequest;
1445 use crate::value::Value;
1446
1447 let mut mock = create_session_mock();
1448
1449 mock.expect_streaming_read().once().returning(|req| {
1450 let req = req.into_inner();
1451 assert_eq!(
1452 req.session,
1453 "projects/p/instances/i/databases/d/sessions/123"
1454 );
1455 assert_eq!(req.table, "Users");
1456 assert_eq!(req.columns, vec!["Id".to_string(), "Name".to_string()]);
1457
1458 Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
1459 setup_select1(),
1460 )])))
1461 });
1462
1463 let (db_client, _server) = setup_db_client(mock).await;
1464
1465 let tx = db_client.single_use().build();
1466 let read = ReadRequest::builder("Users", vec!["Id", "Name"])
1467 .with_keys(KeySet::all())
1468 .build();
1469 let mut rs = tx.execute_read(read).await.expect("Failed to execute read");
1470
1471 let row = rs.next().await.expect("has row").expect("has valid row");
1472 assert_eq!(row.raw_values(), [Value(string_val("1"))]);
1473 let result = rs.next().await;
1474 assert!(result.is_none(), "expected None, got {result:?}");
1475 }
1476
1477 #[tokio_test_no_panics]
1478 async fn execute_multi_read() -> anyhow::Result<()> {
1479 use super::super::result_set::tests::string_val;
1480 use crate::key::KeySet;
1481 use crate::read::ReadRequest;
1482 use crate::value::Value;
1483 use spanner_grpc_mock::google::spanner::v1 as mock_v1;
1484
1485 let mut mock = create_session_mock();
1486
1487 mock.expect_begin_transaction().never();
1489
1490 let mut seq = mockall::Sequence::new();
1491
1492 mock.expect_streaming_read()
1493 .times(1)
1494 .in_sequence(&mut seq)
1495 .returning(move |req| {
1496 let req = req.into_inner();
1497 assert_eq!(
1498 req.session,
1499 "projects/p/instances/i/databases/d/sessions/123"
1500 );
1501
1502 match req.transaction.unwrap().selector.unwrap() {
1504 mock_v1::transaction_selector::Selector::Begin(_) => {}
1505 _ => panic!("Expected Selector::Begin"),
1506 }
1507 let mut rs = setup_select1();
1508 rs.metadata.as_mut().unwrap().transaction = Some(mock_v1::Transaction {
1509 id: vec![4, 5, 6],
1510 read_timestamp: Some(prost_types::Timestamp {
1511 seconds: 987654321,
1512 nanos: 0,
1513 }),
1514 ..Default::default()
1515 });
1516 Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(rs)])))
1517 });
1518
1519 mock.expect_streaming_read()
1520 .times(1)
1521 .in_sequence(&mut seq)
1522 .returning(move |req| {
1523 let req = req.into_inner();
1524 match req.transaction.unwrap().selector.unwrap() {
1526 mock_v1::transaction_selector::Selector::Id(id) => {
1527 assert_eq!(id, vec![4, 5, 6]);
1528 }
1529 _ => panic!("Expected Selector::Id"),
1530 }
1531 Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
1532 setup_select1(),
1533 )])))
1534 });
1535
1536 let (db_client, _server) = setup_db_client(mock).await;
1537
1538 let tx = db_client
1539 .read_only_transaction()
1540 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
1541 .build()
1542 .await?;
1543
1544 assert!(tx.read_timestamp().is_none());
1546
1547 for i in 0..2 {
1548 let read = ReadRequest::builder("Users", vec!["Id", "Name"])
1549 .with_keys(KeySet::all())
1550 .build();
1551 let mut rs = tx.execute_read(read).await?;
1552
1553 let row = rs.next().await.expect("Expected a row")?;
1554 assert_eq!(row.raw_values(), [Value(string_val("1"))]);
1555
1556 let result = rs.next().await;
1557 assert!(result.is_none(), "Expected None, got {result:?}");
1558
1559 if i == 0 {
1560 assert_eq!(
1562 tx.read_timestamp()
1563 .expect("Expected read timestamp")
1564 .seconds(),
1565 987654321
1566 );
1567 }
1568 }
1569
1570 Ok(())
1571 }
1572
1573 #[tokio_test_no_panics]
1574 async fn inline_begin_failure_retry_success() -> anyhow::Result<()> {
1575 use crate::value::Value;
1576 use gaxi::grpc::tonic::Status;
1577 use tonic::Response;
1578
1579 let mut mock = create_session_mock();
1580 let mut seq = mockall::Sequence::new();
1581
1582 mock.expect_execute_streaming_sql()
1584 .times(1)
1585 .in_sequence(&mut seq)
1586 .returning(|_| Err(Status::internal("Internal error")));
1587
1588 mock.expect_begin_transaction()
1590 .times(1)
1591 .in_sequence(&mut seq)
1592 .returning(|req| {
1593 let req = req.into_inner();
1594 assert_eq!(
1595 req.session,
1596 "projects/p/instances/i/databases/d/sessions/123"
1597 );
1598 Ok(Response::new(mock_v1::Transaction {
1600 id: vec![7, 8, 9],
1601 read_timestamp: Some(prost_types::Timestamp {
1602 seconds: 123456789,
1603 nanos: 0,
1604 }),
1605 ..Default::default()
1606 }))
1607 });
1608
1609 mock.expect_execute_streaming_sql()
1611 .times(1)
1612 .in_sequence(&mut seq)
1613 .returning(|req| {
1614 let req = req.into_inner();
1615 match req.transaction.unwrap().selector.unwrap() {
1617 mock_v1::transaction_selector::Selector::Id(id) => {
1618 assert_eq!(id, vec![7, 8, 9]);
1619 }
1620 _ => panic!("Expected Selector::Id"),
1621 }
1622 Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
1623 setup_select1(),
1624 )])))
1625 });
1626
1627 let (db_client, _server) = setup_db_client(mock).await;
1628 let tx = db_client
1629 .read_only_transaction()
1630 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
1631 .build()
1632 .await?;
1633
1634 let mut rs = tx
1635 .execute_query(Statement::builder("SELECT 1").build())
1636 .await?;
1637
1638 let row = rs
1639 .next()
1640 .await
1641 .ok_or_else(|| anyhow::anyhow!("Expected a row but stream cleanly exhausted"))??;
1642 assert_eq!(
1643 row.raw_values(),
1644 [Value(string_val("1"))],
1645 "The parsed row value safely matched the underlying stream chunk"
1646 );
1647
1648 Ok(())
1649 }
1650
1651 #[tokio_test_no_panics]
1652 async fn inline_begin_failure_retry_failure() -> anyhow::Result<()> {
1653 use gaxi::grpc::tonic::Status;
1654 use tonic::Response;
1655
1656 let mut mock = create_session_mock();
1657 let mut seq = mockall::Sequence::new();
1658
1659 mock.expect_execute_streaming_sql()
1661 .times(1)
1662 .in_sequence(&mut seq)
1663 .returning(|_| Err(Status::internal("Internal error first")));
1664
1665 mock.expect_begin_transaction()
1667 .times(1)
1668 .in_sequence(&mut seq)
1669 .returning(|_| {
1670 Ok(Response::new(mock_v1::Transaction {
1671 id: vec![7, 8, 9],
1672 read_timestamp: Some(prost_types::Timestamp {
1673 seconds: 123456789,
1674 nanos: 0,
1675 }),
1676 ..Default::default()
1677 }))
1678 });
1679
1680 mock.expect_execute_streaming_sql()
1682 .times(1)
1683 .in_sequence(&mut seq)
1684 .returning(|_| Err(Status::internal("Internal error second")));
1685
1686 let (db_client, _server) = setup_db_client(mock).await;
1687 let tx = db_client
1688 .read_only_transaction()
1689 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
1690 .build()
1691 .await?;
1692
1693 let rs_result = tx
1694 .execute_query(Statement::builder("SELECT 1").build())
1695 .await;
1696
1697 assert!(
1698 rs_result.is_err(),
1699 "The failed execution bubbled upwards securely"
1700 );
1701 let err_str = rs_result.unwrap_err().to_string();
1702 assert!(
1703 err_str.contains("Internal error second"),
1704 "Secondary error message accurately propagates: {}",
1705 err_str
1706 );
1707
1708 Ok(())
1709 }
1710
1711 #[tokio_test_no_panics]
1712 async fn inline_begin_failure_fallback_rpc_fails() -> anyhow::Result<()> {
1713 use gaxi::grpc::tonic::Status;
1714
1715 let mut mock = create_session_mock();
1716 let mut seq = mockall::Sequence::new();
1717
1718 mock.expect_execute_streaming_sql()
1720 .times(1)
1721 .in_sequence(&mut seq)
1722 .returning(|_| Err(Status::internal("Internal error query")));
1723
1724 mock.expect_begin_transaction()
1726 .times(1)
1727 .in_sequence(&mut seq)
1728 .returning(|_| Err(Status::internal("Internal error begin tx")));
1729
1730 let (db_client, _server) = setup_db_client(mock).await;
1731 let tx = db_client
1732 .read_only_transaction()
1733 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
1734 .build()
1735 .await?;
1736
1737 let rs_result = tx
1738 .execute_query(Statement::builder("SELECT 1").build())
1739 .await;
1740
1741 assert!(
1742 rs_result.is_err(),
1743 "The explicitly errored fallback boot securely propagated outwards"
1744 );
1745 let err_str = rs_result.unwrap_err().to_string();
1746 assert!(
1747 err_str.contains("Internal error begin tx"),
1748 "Natively propagated specific BeginTx bounds: {}",
1749 err_str
1750 );
1751
1752 Ok(())
1753 }
1754
1755 #[tokio_test_no_panics]
1756 async fn inline_begin_read_failure_retry_success() -> anyhow::Result<()> {
1757 use crate::key::KeySet;
1758 use crate::read::ReadRequest;
1759 use crate::value::Value;
1760 use gaxi::grpc::tonic::Status;
1761 use tonic::Response;
1762
1763 let mut mock = create_session_mock();
1764 let mut seq = mockall::Sequence::new();
1765
1766 mock.expect_streaming_read()
1768 .times(1)
1769 .in_sequence(&mut seq)
1770 .returning(|_| Err(Status::internal("Internal error")));
1771
1772 mock.expect_begin_transaction()
1774 .times(1)
1775 .in_sequence(&mut seq)
1776 .returning(|_| {
1777 Ok(Response::new(mock_v1::Transaction {
1778 id: vec![7, 8, 9],
1779 read_timestamp: None,
1780 ..Default::default()
1781 }))
1782 });
1783
1784 mock.expect_streaming_read()
1786 .times(1)
1787 .in_sequence(&mut seq)
1788 .returning(|req| {
1789 let req = req.into_inner();
1790 match req.transaction.unwrap().selector.unwrap() {
1792 mock_v1::transaction_selector::Selector::Id(id) => {
1793 assert_eq!(id, vec![7, 8, 9]);
1794 }
1795 _ => panic!("Expected Selector::Id"),
1796 }
1797 Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
1798 setup_select1(),
1799 )])))
1800 });
1801
1802 let (db_client, _server) = setup_db_client(mock).await;
1803 let tx = db_client
1804 .read_only_transaction()
1805 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
1806 .build()
1807 .await?;
1808
1809 let read = ReadRequest::builder("Users", vec!["Id", "Name"])
1810 .with_keys(KeySet::all())
1811 .build();
1812 let mut rs = tx.execute_read(read).await?;
1813
1814 let row = rs
1815 .next()
1816 .await
1817 .ok_or_else(|| anyhow::anyhow!("Expected a row uniquely returned"))??;
1818 assert_eq!(
1819 row.raw_values(),
1820 [Value(string_val("1"))],
1821 "The macro correctly unpacked read arrays seamlessly"
1822 );
1823
1824 Ok(())
1825 }
1826
1827 #[tokio_test_no_panics]
1828 async fn single_use_query_send_error_returns_immediately() -> anyhow::Result<()> {
1829 use crate::statement::Statement;
1830 use gaxi::grpc::tonic::Status;
1831
1832 let mut mock = create_session_mock();
1833
1834 mock.expect_execute_streaming_sql()
1835 .times(1)
1836 .returning(|_| Err(Status::internal("Internal error single use query")));
1837
1838 mock.expect_begin_transaction().never();
1839
1840 let (db_client, _server) = setup_db_client(mock).await;
1841 let tx = db_client.single_use().build();
1843
1844 let rs_result = tx
1845 .execute_query(Statement::builder("SELECT 1").build())
1846 .await;
1847
1848 assert!(rs_result.is_err());
1849 let err_str = rs_result.unwrap_err().to_string();
1850 assert!(err_str.contains("Internal error single use query"));
1851
1852 Ok(())
1853 }
1854
1855 #[tokio_test_no_panics]
1856 async fn inline_begin_already_started_query_send_error_returns_immediately()
1857 -> anyhow::Result<()> {
1858 use crate::statement::Statement;
1859 use gaxi::grpc::tonic::Status;
1860 use spanner_grpc_mock::google::spanner::v1 as mock_v1;
1861
1862 let mut mock = create_session_mock();
1863 let mut seq = mockall::Sequence::new();
1864
1865 mock.expect_begin_transaction().never();
1866
1867 mock.expect_execute_streaming_sql()
1869 .times(1)
1870 .in_sequence(&mut seq)
1871 .returning(move |_req| {
1872 let mut rs = setup_select1();
1873 rs.metadata.as_mut().unwrap().transaction = Some(mock_v1::Transaction {
1874 id: vec![4, 5, 6],
1875 read_timestamp: None,
1876 ..Default::default()
1877 });
1878 Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(rs)])))
1879 });
1880
1881 mock.expect_execute_streaming_sql()
1883 .times(1)
1884 .in_sequence(&mut seq)
1885 .returning(|_| Err(Status::internal("Internal error second query")));
1886
1887 let (db_client, _server) = setup_db_client(mock).await;
1888
1889 let tx = db_client
1890 .read_only_transaction()
1891 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
1892 .build()
1893 .await?;
1894
1895 let mut rs = tx
1897 .execute_query(Statement::builder("SELECT 1").build())
1898 .await?;
1899 let _ = rs.next().await.expect("has row")?;
1900
1901 let rs_result = tx
1903 .execute_query(Statement::builder("SELECT 2").build())
1904 .await;
1905
1906 assert!(rs_result.is_err());
1907 let err_str = rs_result.unwrap_err().to_string();
1908 assert!(err_str.contains("Internal error second query"));
1909
1910 Ok(())
1911 }
1912
1913 #[tokio_test_no_panics]
1914 async fn execute_concurrent_queries_inline_begin() -> anyhow::Result<()> {
1915 let mut mock = create_session_mock();
1916 mock.expect_begin_transaction().never();
1917
1918 let mut seq = mockall::Sequence::new();
1919 let (tx_sender, rx_receiver) = mpsc::channel(1);
1920 let rx_receiver = Arc::new(Mutex::new(Some(rx_receiver)));
1921
1922 let task1_ready = Arc::new(Notify::new());
1923 let task1_ready_clone = Arc::clone(&task1_ready);
1924 let tasks_started = Arc::new(Barrier::new(3));
1925
1926 mock.expect_execute_streaming_sql()
1928 .times(1)
1929 .in_sequence(&mut seq)
1930 .returning(move |req| {
1931 task1_ready_clone.notify_one();
1932 let req = req.into_inner();
1933 match req.transaction.unwrap().selector.unwrap() {
1934 Selector::Begin(_) => {}
1935 _ => panic!("Expected Selector::Begin for first query"),
1936 }
1937 let rx = rx_receiver
1938 .try_lock()
1939 .expect("mutex poisoned")
1940 .take()
1941 .unwrap();
1942 Ok(Response::from(rx))
1943 });
1944
1945 mock.expect_execute_streaming_sql()
1947 .times(2)
1948 .in_sequence(&mut seq)
1949 .returning(move |req| {
1950 let req = req.into_inner();
1951 match req.transaction.unwrap().selector.unwrap() {
1952 Selector::Id(id) => {
1953 assert_eq!(id, vec![4, 5, 6]);
1954 }
1955 _ => panic!("Expected Selector::Id for other queries"),
1956 }
1957
1958 let (tx, rx) = mpsc::channel(1);
1959 tx.try_send(Ok(setup_select1()))
1960 .expect("send should succeed");
1961 Ok(Response::from(rx))
1962 });
1963
1964 let (db_client, _server) = setup_db_client(mock).await;
1965 let tx = db_client
1966 .read_only_transaction()
1967 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
1968 .build()
1969 .await?;
1970 let tx = Arc::new(tx);
1971
1972 let tx1 = Arc::clone(&tx);
1975 let handle1 = tokio::spawn(async move {
1976 let mut rs = tx1
1977 .execute_query(Statement::builder("SELECT 1").build())
1978 .await?;
1979 let _ = rs.next().await;
1981 Ok::<_, crate::Error>(rs)
1982 });
1983
1984 task1_ready.notified().await;
1986
1987 let tx2 = Arc::clone(&tx);
1988 let tasks_started2 = Arc::clone(&tasks_started);
1989 let handle2 = tokio::spawn(async move {
1990 tasks_started2.wait().await;
1991 tx2.execute_query(Statement::builder("SELECT 1").build())
1992 .await
1993 });
1994
1995 let tx3 = Arc::clone(&tx);
1996 let tasks_started3 = Arc::clone(&tasks_started);
1997 let handle3 = tokio::spawn(async move {
1998 tasks_started3.wait().await;
1999 tx3.execute_query(Statement::builder("SELECT 1").build())
2000 .await
2001 });
2002
2003 tasks_started.wait().await;
2005
2006 tokio::task::yield_now().await;
2010
2011 let mut rs = setup_select1();
2014 rs.metadata
2015 .as_mut()
2016 .expect("metadata should be present")
2017 .transaction = Some(mock_v1::Transaction {
2018 id: vec![4, 5, 6],
2019 read_timestamp: Some(prost_types::Timestamp {
2020 seconds: 987654321,
2021 nanos: 0,
2022 }),
2023 ..Default::default()
2024 });
2025 tx_sender.send(Ok(rs)).await.expect("channel broken");
2026 drop(tx_sender);
2027
2028 let mut rs1 = handle1.await??;
2030 let mut rs2 = handle2.await??;
2031 let mut rs3 = handle3.await??;
2032
2033 assert!(rs1.next().await.is_none());
2035
2036 let row2 = rs2.next().await.expect("Expected a row")?;
2037 assert_eq!(row2.raw_values(), [Value(string_val("1"))]);
2038 assert!(rs2.next().await.is_none());
2039
2040 let row3 = rs3.next().await.expect("Expected a row")?;
2041 assert_eq!(row3.raw_values(), [Value(string_val("1"))]);
2042 assert!(rs3.next().await.is_none());
2043
2044 assert_eq!(
2046 tx.read_timestamp()
2047 .expect("read timestamp should be populated")
2048 .seconds(),
2049 987654321
2050 );
2051
2052 Ok(())
2053 }
2054
2055 #[tokio_test_no_panics]
2056 async fn execute_concurrent_queries_inline_begin_failed_cascade() -> anyhow::Result<()> {
2057 let mut mock = create_session_mock();
2058 let mut seq = mockall::Sequence::new();
2059
2060 let (tx_sender, rx_receiver) = mpsc::channel(1);
2061 let rx_receiver = Arc::new(Mutex::new(Some(rx_receiver)));
2062
2063 let task1_ready = Arc::new(Notify::new());
2064 let task1_ready_clone = Arc::clone(&task1_ready);
2065 let tasks_started = Arc::new(Barrier::new(3));
2066
2067 mock.expect_execute_streaming_sql()
2070 .times(1)
2071 .in_sequence(&mut seq)
2072 .returning(move |_req| {
2073 task1_ready_clone.notify_one();
2074 let rx = rx_receiver
2075 .try_lock()
2076 .expect("mutex poisoned")
2077 .take()
2078 .expect("receiver should be present");
2079 Ok(tonic::Response::from(rx))
2080 });
2081
2082 mock.expect_begin_transaction()
2084 .times(1)
2085 .in_sequence(&mut seq)
2086 .returning(|_| {
2087 Err(gaxi::grpc::tonic::Status::internal(
2088 "Fallback BeginTransaction failed",
2089 ))
2090 });
2091
2092 mock.expect_execute_streaming_sql().times(0).returning(|_| {
2094 panic!("Other queries should not launch after failure to start the transaction")
2095 });
2096
2097 let (db_client, _server) = setup_db_client(mock).await;
2098 let tx = db_client
2099 .read_only_transaction()
2100 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2101 .build()
2102 .await?;
2103 let tx = Arc::new(tx);
2104
2105 let tx1 = Arc::clone(&tx);
2107 let handle1 = tokio::spawn(async move {
2108 let mut rs = tx1
2109 .execute_query(Statement::builder("SELECT 1").build())
2110 .await?;
2111 rs.next().await.ok_or_else(|| {
2112 crate::error::internal_error("stream exhausted (this should never happen)")
2113 })??;
2114 Ok::<_, crate::Error>(rs)
2115 });
2116
2117 task1_ready.notified().await;
2119
2120 let tx2 = Arc::clone(&tx);
2121 let tasks_started2 = Arc::clone(&tasks_started);
2122 let handle2 = tokio::spawn(async move {
2123 tasks_started2.wait().await;
2124 tx2.execute_query(Statement::builder("SELECT 1").build())
2125 .await
2126 });
2127
2128 let tx3 = Arc::clone(&tx);
2129 let tasks_started3 = Arc::clone(&tasks_started);
2130 let handle3 = tokio::spawn(async move {
2131 tasks_started3.wait().await;
2132 tx3.execute_query(Statement::builder("SELECT 1").build())
2133 .await
2134 });
2135
2136 tasks_started.wait().await;
2138
2139 tokio::task::yield_now().await;
2143
2144 tx_sender
2146 .send(Err(gaxi::grpc::tonic::Status::internal(
2147 "Mocked boot failed",
2148 )))
2149 .await
2150 .expect("channel broken");
2151 drop(tx_sender);
2152
2153 let err1 = handle1
2155 .await?
2156 .expect_err("task 1 should have failed")
2157 .to_string();
2158 let err2 = handle2
2159 .await?
2160 .expect_err("task 2 should have failed")
2161 .to_string();
2162 let err3 = handle3
2163 .await?
2164 .expect_err("task 3 should have failed")
2165 .to_string();
2166
2167 assert!(
2168 err1.contains("Fallback BeginTransaction failed"),
2169 "err1: {}",
2170 err1
2171 );
2172 assert!(
2173 err2.contains("Fallback BeginTransaction failed"),
2174 "err2: {}",
2175 err2
2176 );
2177 assert!(
2178 err3.contains("Fallback BeginTransaction failed"),
2179 "err3: {}",
2180 err3
2181 );
2182
2183 Ok(())
2184 }
2185
2186 #[tokio_test_no_panics]
2187 async fn execute_concurrent_queries_inline_begin_stream_restart_deadlock_prevention()
2188 -> crate::Result<()> {
2189 let mut mock = create_session_mock();
2190 mock.expect_begin_transaction().never();
2191
2192 let mut seq = mockall::Sequence::new();
2193
2194 let (tx_sender, rx_receiver) = mpsc::channel(1);
2195 let rx_receiver = Arc::new(Mutex::new(Some(rx_receiver)));
2196
2197 let task1_ready = Arc::new(Notify::new());
2198 let task1_ready_clone = Arc::clone(&task1_ready);
2199 let tasks_started = Arc::new(Barrier::new(3));
2200
2201 mock.expect_execute_streaming_sql()
2203 .times(1)
2204 .in_sequence(&mut seq)
2205 .returning(move |req| {
2206 let req = req.into_inner();
2207 task1_ready_clone.notify_one();
2210 match req
2211 .transaction
2212 .expect("transaction should be present")
2213 .selector
2214 .expect("selector should be present")
2215 {
2216 Selector::Begin(_) => {}
2217 _ => panic!("Expected Selector::Begin for first query"),
2218 }
2219 let rx = rx_receiver
2220 .try_lock()
2221 .expect("mutex poisoned")
2222 .take()
2223 .expect("receiver should be present");
2224 Ok(Response::from(rx))
2225 });
2226
2227 mock.expect_execute_streaming_sql()
2230 .times(1)
2231 .in_sequence(&mut seq)
2232 .returning(move |req| {
2233 let req = req.into_inner();
2234 match req
2235 .transaction
2236 .expect("transaction should be present")
2237 .selector
2238 .expect("selector should be present")
2239 {
2240 Selector::Begin(_) => {
2241 let mut rs = setup_select1();
2242 rs.metadata
2243 .as_mut()
2244 .expect("metadata should be present")
2245 .transaction = Some(mock_v1::Transaction {
2246 id: vec![4, 5, 6],
2247 ..Default::default()
2248 });
2249 let (tx, rx) = mpsc::channel(1);
2250 tx.try_send(Ok(rs)).expect("send should succeed");
2251 Ok(Response::from(rx))
2252 }
2253 _ => panic!("Expected Selector::Begin for stream restart query"),
2254 }
2255 });
2256
2257 mock.expect_execute_streaming_sql()
2259 .times(2)
2260 .in_sequence(&mut seq)
2261 .returning(move |req| {
2262 let req = req.into_inner();
2263 match req
2264 .transaction
2265 .expect("transaction should be present")
2266 .selector
2267 .expect("selector should be present")
2268 {
2269 Selector::Id(id) => {
2270 assert_eq!(id, vec![4, 5, 6]);
2271 let (tx, rx) = mpsc::channel(1);
2272 tx.try_send(Ok(setup_select1()))
2273 .expect("send should succeed");
2274 Ok(Response::from(rx))
2275 }
2276 _ => panic!("Expected Selector::Id for concurrent queries"),
2277 }
2278 });
2279
2280 let (db_client, _server) = setup_db_client(mock).await;
2281 let tx = db_client
2282 .read_only_transaction()
2283 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2284 .build()
2285 .await?;
2286 let tx = Arc::new(tx);
2287
2288 let handle1_tx = Arc::clone(&tx);
2289 let handle1 = tokio::spawn(async move {
2290 let mut rs = handle1_tx
2291 .execute_query(Statement::builder("SELECT 1").build())
2292 .await?;
2293 let _ = rs.next().await.ok_or_else(|| {
2294 crate::error::internal_error("stream exhausted (this should never happen)")
2295 })??;
2296 Ok::<_, crate::Error>(rs)
2297 });
2298
2299 task1_ready.notified().await;
2301
2302 let handle2_tx = Arc::clone(&tx);
2303 let tasks_started2 = Arc::clone(&tasks_started);
2304 let handle2 = tokio::spawn(async move {
2305 tasks_started2.wait().await;
2306 let mut rs = handle2_tx
2307 .execute_query(Statement::builder("SELECT 1").build())
2308 .await?;
2309 let _ = rs.next().await.ok_or_else(|| {
2310 crate::error::internal_error("stream exhausted (this should never happen)")
2311 })??;
2312 Ok::<_, crate::Error>(rs)
2313 });
2314
2315 let handle3_tx = Arc::clone(&tx);
2316 let tasks_started3 = Arc::clone(&tasks_started);
2317 let handle3 = tokio::spawn(async move {
2318 tasks_started3.wait().await;
2319 let mut rs = handle3_tx
2320 .execute_query(Statement::builder("SELECT 1").build())
2321 .await?;
2322 let _ = rs.next().await.ok_or_else(|| {
2323 crate::error::internal_error("stream exhausted (this should never happen)")
2324 })??;
2325 Ok::<_, crate::Error>(rs)
2326 });
2327
2328 tasks_started.wait().await;
2330
2331 tokio::task::yield_now().await;
2335
2336 let grpc_status = Status::new(gaxi::grpc::tonic::Code::Unavailable, "transient error");
2337 tx_sender.send(Err(grpc_status)).await.expect("send failed");
2338 drop(tx_sender);
2339
2340 let mut rs1 = handle1.await.expect("Task 1 panicked")?;
2345 let mut rs2 = handle2.await.expect("Task 2 panicked")?;
2346 let mut rs3 = handle3.await.expect("Task 3 panicked")?;
2347
2348 assert!(rs1.next().await.is_none(), "Stream 1 should be exhausted");
2351 assert!(rs2.next().await.is_none(), "Stream 2 should be exhausted");
2352 assert!(rs3.next().await.is_none(), "Stream 3 should be exhausted");
2353
2354 Ok(())
2355 }
2356
2357 #[tokio_test_no_panics]
2358 async fn execute_concurrent_queries_late_arrival_failure() -> anyhow::Result<()> {
2359 let mut mock = create_session_mock();
2360 let mut seq = mockall::Sequence::new();
2361
2362 mock.expect_execute_streaming_sql()
2364 .times(1)
2365 .in_sequence(&mut seq)
2366 .returning(|req| {
2367 let req = req.into_inner();
2368 match req
2369 .transaction
2370 .expect("transaction should be present")
2371 .selector
2372 .expect("selector should be present")
2373 {
2374 Selector::Begin(_) => {}
2375 _ => panic!("Expected Selector::Begin for first query"),
2376 }
2377 Err(Status::internal("Initial inline-begin failed"))
2378 });
2379
2380 mock.expect_begin_transaction()
2382 .times(1)
2383 .in_sequence(&mut seq)
2384 .returning(|_| Err(Status::internal("Fallback BeginTransaction failed")));
2385
2386 mock.expect_execute_streaming_sql().never();
2388
2389 let (db_client, _server) = setup_db_client(mock).await;
2390 let tx = db_client
2391 .read_only_transaction()
2392 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2393 .build()
2394 .await?;
2395
2396 let err1 = tx
2398 .execute_query(Statement::builder("SELECT 1").build())
2399 .await
2400 .expect_err("First query should fail");
2401 assert!(
2402 err1.to_string()
2403 .contains("Fallback BeginTransaction failed")
2404 );
2405
2406 let err2 = tx
2409 .execute_query(Statement::builder("SELECT 1").build())
2410 .await
2411 .expect_err("Late query should fail immediately");
2412 assert!(
2413 err2.to_string()
2414 .contains("Fallback BeginTransaction failed")
2415 );
2416
2417 Ok(())
2418 }
2419
2420 #[tokio_test_no_panics]
2421 async fn execute_concurrent_reads_inline_begin() -> anyhow::Result<()> {
2422 use crate::key::KeySet;
2423 use crate::read::ReadRequest;
2424 let mut mock = create_session_mock();
2425 mock.expect_begin_transaction().never();
2426
2427 let mut seq = mockall::Sequence::new();
2428 let (tx_sender, rx_receiver) = mpsc::channel(1);
2429 let rx_receiver = Arc::new(Mutex::new(Some(rx_receiver)));
2430
2431 let task1_ready = Arc::new(Notify::new());
2432 let task1_ready_clone = Arc::clone(&task1_ready);
2433 let tasks_started = Arc::new(Barrier::new(3));
2434
2435 mock.expect_streaming_read()
2437 .times(1)
2438 .in_sequence(&mut seq)
2439 .returning(move |req| {
2440 task1_ready_clone.notify_one();
2441 let req = req.into_inner();
2442 match req
2443 .transaction
2444 .expect("transaction should be present")
2445 .selector
2446 .expect("selector should be present")
2447 {
2448 mock_v1::transaction_selector::Selector::Begin(_) => {}
2449 _ => panic!("Expected Selector::Begin for first read"),
2450 }
2451
2452 let rx = rx_receiver
2453 .try_lock()
2454 .expect("mutex poisoned")
2455 .take()
2456 .expect("receiver should be present");
2457 Ok(Response::from(rx))
2458 });
2459
2460 mock.expect_streaming_read()
2462 .times(2)
2463 .in_sequence(&mut seq)
2464 .returning(move |req| {
2465 let req = req.into_inner();
2466 match req
2467 .transaction
2468 .expect("transaction should be present")
2469 .selector
2470 .expect("selector should be present")
2471 {
2472 mock_v1::transaction_selector::Selector::Id(id) => {
2473 assert_eq!(id, vec![4, 5, 6]);
2474 }
2475 _ => panic!("Expected Selector::Id for other reads"),
2476 }
2477
2478 let (tx, rx) = mpsc::channel(1);
2479 tx.try_send(Ok(setup_select1()))
2480 .expect("send should succeed");
2481 Ok(Response::from(rx))
2482 });
2483
2484 let (db_client, _server) = setup_db_client(mock).await;
2485 let tx = db_client
2486 .read_only_transaction()
2487 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2488 .build()
2489 .await?;
2490 let tx = Arc::new(tx);
2491
2492 let read_req = ReadRequest::builder("Table", vec!["Col"])
2493 .with_keys(KeySet::all())
2494 .build();
2495
2496 let tx1 = Arc::clone(&tx);
2498 let read1 = read_req.clone();
2499 let handle1 = tokio::spawn(async move {
2500 let mut rs = tx1.execute_read(read1).await?;
2501 let _ = rs.next().await;
2502 Ok::<_, crate::Error>(rs)
2503 });
2504
2505 task1_ready.notified().await;
2506
2507 let tx2 = Arc::clone(&tx);
2508 let read2 = read_req.clone();
2509 let tasks_started2 = Arc::clone(&tasks_started);
2510 let handle2 = tokio::spawn(async move {
2511 tasks_started2.wait().await;
2512 let mut rs = tx2.execute_read(read2).await?;
2513 let _ = rs.next().await;
2514 Ok::<_, crate::Error>(rs)
2515 });
2516
2517 let tx3 = Arc::clone(&tx);
2518 let read3 = read_req.clone();
2519 let tasks_started3 = Arc::clone(&tasks_started);
2520 let handle3 = tokio::spawn(async move {
2521 tasks_started3.wait().await;
2522 let mut rs = tx3.execute_read(read3).await?;
2523 let _ = rs.next().await;
2524 Ok::<_, crate::Error>(rs)
2525 });
2526
2527 tasks_started.wait().await;
2528 tokio::task::yield_now().await;
2529
2530 let mut rs = setup_select1();
2532 rs.metadata
2533 .as_mut()
2534 .expect("metadata should be present")
2535 .transaction = Some(mock_v1::Transaction {
2536 id: vec![4, 5, 6],
2537 ..Default::default()
2538 });
2539 tx_sender.send(Ok(rs)).await.expect("send failed");
2540 drop(tx_sender);
2541
2542 let mut rs1 = handle1.await.expect("Task 1 panicked")?;
2543 let mut rs2 = handle2.await.expect("Task 2 panicked")?;
2544 let mut rs3 = handle3.await.expect("Task 3 panicked")?;
2545
2546 assert!(rs1.next().await.is_none());
2547 assert!(rs2.next().await.is_none());
2548 assert!(rs3.next().await.is_none());
2549
2550 Ok(())
2551 }
2552
2553 #[tokio_test_no_panics]
2554 async fn execute_inline_begin_idempotent_update() -> anyhow::Result<()> {
2555 let (db_client, _server) = setup_db_client(create_session_mock()).await;
2556 let tx = db_client
2558 .read_only_transaction()
2559 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2560 .build()
2561 .await?;
2562
2563 let id1 = bytes::Bytes::from_static(b"tx1");
2564 let id2 = bytes::Bytes::from_static(b"tx2");
2565
2566 tx.context.transaction_selector.update(id1.clone(), None)?;
2568 assert_eq!(
2569 tx.context
2570 .transaction_selector
2571 .selector()
2572 .await?
2573 .id()
2574 .expect("ID should be present"),
2575 &id1
2576 );
2577
2578 tx.context.transaction_selector.update(id1.clone(), None)?;
2582
2583 let err2 = tx
2585 .context
2586 .transaction_selector
2587 .update(id2, None)
2588 .expect_err("Update after Started should fail");
2589 assert!(err2.to_string().contains("already Started or Failed"));
2590
2591 Ok(())
2592 }
2593
2594 #[tokio_test_no_panics]
2595 async fn execute_inline_begin_with_transient_failure() -> anyhow::Result<()> {
2596 let mut mock = create_session_mock();
2597 let mut seq = mockall::Sequence::new();
2598
2599 mock.expect_execute_streaming_sql()
2601 .times(1)
2602 .in_sequence(&mut seq)
2603 .returning(|_| Err(Status::new(Code::Unavailable, "Transient 1")));
2604
2605 mock.expect_begin_transaction()
2607 .times(1)
2608 .in_sequence(&mut seq)
2609 .returning(|_| {
2610 Ok(Response::new(mock_v1::Transaction {
2611 id: vec![7, 8, 9],
2612 ..Default::default()
2613 }))
2614 });
2615
2616 mock.expect_execute_streaming_sql()
2618 .times(1)
2619 .in_sequence(&mut seq)
2620 .returning(|_| {
2621 let (tx, rx) = mpsc::channel(1);
2622 tx.try_send(Ok(setup_select1()))
2623 .expect("send should succeed");
2624 Ok(Response::from(rx))
2625 });
2626
2627 let (db_client, _server) = setup_db_client(mock).await;
2628 let tx = db_client
2629 .read_only_transaction()
2630 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2631 .build()
2632 .await?;
2633
2634 let mut rs = tx
2635 .execute_query(Statement::builder("SELECT 1").build())
2636 .await?;
2637 assert!(rs.next().await.is_some());
2638 assert!(rs.next().await.is_none());
2639
2640 Ok(())
2641 }
2642
2643 #[tokio_test_no_panics]
2644 async fn leader_aware_routing_query_in_read_only() -> anyhow::Result<()> {
2645 let mut mock = create_session_mock();
2646 mock.expect_execute_streaming_sql().once().returning(|req| {
2647 assert!(
2648 req.metadata()
2649 .get("x-goog-spanner-route-to-leader")
2650 .is_none()
2651 );
2652 let stream = adapt([Ok(mock_v1::PartialResultSet {
2653 metadata: Some(mock_v1::ResultSetMetadata {
2654 row_type: Some(mock_v1::StructType { fields: vec![] }),
2655 ..Default::default()
2656 }),
2657 ..Default::default()
2658 })]);
2659 Ok(tonic::Response::from(stream))
2660 });
2661
2662 let (db_client, _server) = setup_db_client(mock).await;
2663 let tx = db_client.single_use().build();
2664 let _rs = tx
2665 .execute_query(Statement::builder("SELECT 1").build())
2666 .await?;
2667 Ok(())
2668 }
2669
    /// Runs two queries concurrently on one inline-begin read-only
    /// transaction and checks what the second request looks like.
    ///
    /// The mock expects exactly two `ExecuteStreamingSql` calls, in sequence.
    /// The first is held inside the mock until the test releases it and then
    /// returns transaction id `[42]` in its metadata. The second must select
    /// the transaction by that id.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn execute_concurrent_begin_explicitly_redundancy_prevention() -> anyhow::Result<()> {
        // `tx_rpc`/`rx_rpc`: lets the test release the first mocked RPC.
        let (tx_rpc, rx_rpc) = std_channel();
        // `tx_started`/`rx_started`: tells the test that the first RPC has
        // reached the mock.
        let (tx_started, rx_started) = oneshot_channel();
        // The sender is kept in a mutex-guarded `Option` so the handler below
        // can `take()` it and send at most once.
        let tx_started_mutex = StdMutex::new(Some(tx_started));

        let mut mock = create_session_mock();
        let mut seq = mockall::Sequence::new();

        // First call: signal arrival, wait for the release, then answer with
        // metadata carrying transaction id `[42]`.
        mock.expect_execute_streaming_sql()
            .once()
            .in_sequence(&mut seq)
            .returning(move |_req| {
                if let Some(tx) = tx_started_mutex.lock().expect("mutex poisoned").take() {
                    let _ = tx.send(());
                }
                // Synchronous wait (no `.await`) until the test calls
                // `tx_rpc.send(())`.
                // NOTE(review): presumably the reason this test uses the
                // multi-threaded runtime flavor; confirm.
                rx_rpc.recv().expect("channel broken");
                let (tx, rx) = mpsc::channel(1);
                let metadata = mock_v1::ResultSetMetadata {
                    transaction: Some(mock_v1::Transaction {
                        id: vec![42],
                        ..Default::default()
                    }),
                    ..Default::default()
                };
                let prs = mock_v1::PartialResultSet {
                    metadata: Some(metadata),
                    ..Default::default()
                };
                tx.try_send(Ok(prs)).expect("send should succeed");
                Ok(tonic::Response::new(rx))
            });

        // Second call: the request must reference transaction `[42]` by id.
        mock.expect_execute_streaming_sql()
            .once()
            .in_sequence(&mut seq)
            .returning(move |req| {
                let req = req.into_inner();
                assert_eq!(
                    req.transaction,
                    Some(mock_v1::TransactionSelector {
                        selector: Some(mock_v1::transaction_selector::Selector::Id(vec![42])),
                    })
                );
                let (tx, rx) = mpsc::channel(1);
                let metadata = mock_v1::ResultSetMetadata {
                    row_type: Some(mock_v1::StructType { fields: vec![] }),
                    ..Default::default()
                };
                let prs = mock_v1::PartialResultSet {
                    metadata: Some(metadata),
                    ..Default::default()
                };
                tx.try_send(Ok(prs)).expect("send should succeed");
                Ok(tonic::Response::new(rx))
            });

        let (db_client, _server) = setup_db_client(mock).await;

        // The transaction is shared between the two tasks through an `Arc`.
        let tx = Arc::new(
            db_client
                .read_only_transaction()
                .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
                .build()
                .await?,
        );

        // Task 1 issues the first query.
        let tx_leader = Arc::clone(&tx);
        let handle_leader = tokio::spawn(async move {
            let mut rs = tx_leader
                .execute_query(Statement::builder("SELECT 1").build())
                .await?;
            let _ = rs.next().await;
            Ok::<_, crate::Error>(())
        });

        // Do not start the second query until the first RPC is inside the mock.
        rx_started.await.expect("oneshot broken");

        // Task 2 issues the second query while the first RPC is still held.
        let tx_follower = Arc::clone(&tx);
        let handle_follower = tokio::spawn(async move {
            let mut rs = tx_follower
                .execute_query(Statement::builder("SELECT 2").build())
                .await?;
            let _ = rs.next().await;
            Ok::<_, crate::Error>(())
        });

        // Release the first RPC so it can return its response.
        tx_rpc.send(()).expect("send failed");

        handle_leader.await.expect("Task 1 panicked")?;
        handle_follower.await.expect("Task 2 panicked")?;

        Ok(())
    }
2770
2771 #[tokio_test_no_panics]
2772 async fn execute_multi_query_redundant_transaction_id_explicit() -> anyhow::Result<()> {
2773 run_execute_multi_query_redundant_transaction_id(BeginTransactionOption::ExplicitBegin)
2774 .await
2775 }
2776
2777 #[tokio_test_no_panics]
2778 async fn execute_multi_query_redundant_transaction_id_inline() -> anyhow::Result<()> {
2779 run_execute_multi_query_redundant_transaction_id(BeginTransactionOption::InlineBegin).await
2780 }
2781
2782 async fn run_execute_multi_query_redundant_transaction_id(
2783 option: BeginTransactionOption,
2784 ) -> anyhow::Result<()> {
2785 let mut mock = create_session_mock();
2786 let mut sequence = mockall::Sequence::new();
2787
2788 if option == BeginTransactionOption::ExplicitBegin {
2789 mock.expect_begin_transaction()
2790 .once()
2791 .in_sequence(&mut sequence)
2792 .returning(|req| {
2793 let req = req.into_inner();
2794 assert_eq!(
2795 req.session,
2796 "projects/p/instances/i/databases/d/sessions/123"
2797 );
2798 Ok(tonic::Response::new(mock_v1::Transaction {
2799 id: vec![4, 5, 6],
2800 read_timestamp: Some(prost_types::Timestamp {
2801 seconds: 123456789,
2802 nanos: 0,
2803 }),
2804 ..Default::default()
2805 }))
2806 });
2807
2808 mock.expect_execute_streaming_sql()
2809 .times(2)
2810 .returning(|req| {
2811 let req = req.into_inner();
2812 assert_eq!(
2813 req.transaction
2814 .expect("transaction should be present")
2815 .selector
2816 .expect("selector should be present"),
2817 mock_v1::transaction_selector::Selector::Id(vec![4, 5, 6])
2818 );
2819
2820 let mut result_set_partial = setup_select1();
2821 result_set_partial
2822 .metadata
2823 .as_mut()
2824 .expect("metadata should be present")
2825 .transaction = Some(mock_v1::Transaction {
2826 id: vec![4, 5, 6],
2827 read_timestamp: Some(prost_types::Timestamp {
2828 seconds: 123456789,
2829 nanos: 0,
2830 }),
2831 ..Default::default()
2832 });
2833 Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
2834 result_set_partial,
2835 )])))
2836 });
2837 } else {
2838 mock.expect_begin_transaction().never();
2839
2840 mock.expect_execute_streaming_sql()
2841 .times(1)
2842 .in_sequence(&mut sequence)
2843 .returning(|req| {
2844 let req = req.into_inner();
2845 assert_eq!(
2846 req.session,
2847 "projects/p/instances/i/databases/d/sessions/123"
2848 );
2849
2850 match req
2851 .transaction
2852 .expect("transaction should be present")
2853 .selector
2854 .expect("selector should be present")
2855 {
2856 mock_v1::transaction_selector::Selector::Begin(_) => {}
2857 _ => panic!("Expected Selector::Begin"),
2858 }
2859 let mut result_set_partial = setup_select1();
2860 result_set_partial
2861 .metadata
2862 .as_mut()
2863 .expect("metadata should be present")
2864 .transaction = Some(mock_v1::Transaction {
2865 id: vec![4, 5, 6],
2866 read_timestamp: Some(prost_types::Timestamp {
2867 seconds: 987654321,
2868 nanos: 0,
2869 }),
2870 ..Default::default()
2871 });
2872 Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
2873 result_set_partial,
2874 )])))
2875 });
2876
2877 mock.expect_execute_streaming_sql()
2878 .times(1)
2879 .in_sequence(&mut sequence)
2880 .returning(|req| {
2881 let req = req.into_inner();
2882 match req
2883 .transaction
2884 .expect("transaction should be present")
2885 .selector
2886 .expect("selector should be present")
2887 {
2888 mock_v1::transaction_selector::Selector::Id(id) => {
2889 assert_eq!(id, vec![4, 5, 6]);
2890 }
2891 _ => panic!("Expected Selector::Id"),
2892 }
2893 let mut result_set_partial = setup_select1();
2894 result_set_partial
2895 .metadata
2896 .as_mut()
2897 .expect("metadata should be present")
2898 .transaction = Some(mock_v1::Transaction {
2899 id: vec![4, 5, 6],
2900 read_timestamp: Some(prost_types::Timestamp {
2901 seconds: 987654321,
2902 nanos: 0,
2903 }),
2904 ..Default::default()
2905 });
2906 Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
2907 result_set_partial,
2908 )])))
2909 });
2910 }
2911
2912 let (db_client, _server) = setup_db_client(mock).await;
2913
2914 let transaction = db_client
2915 .read_only_transaction()
2916 .with_begin_transaction_option(option)
2917 .build()
2918 .await
2919 .expect("Failed to start transaction");
2920
2921 for _ in 0..2 {
2922 let mut result_set = transaction
2923 .execute_query(Statement::builder("SELECT 1").build())
2924 .await
2925 .expect("Failed to execute query");
2926
2927 let row = result_set
2928 .next()
2929 .await
2930 .expect("has row")
2931 .expect("has valid row");
2932 assert_eq!(row.raw_values(), [Value(string_val("1"))]);
2933
2934 let next_result = result_set.next().await;
2935 assert!(next_result.is_none(), "expected None, got {next_result:?}");
2936 }
2937
2938 Ok(())
2939 }
2940
2941 #[tokio_test_no_panics]
2942 async fn read_only_transaction_begin_with_never_retry() -> anyhow::Result<()> {
2943 let mut mock = MockSpanner::new();
2944 let mut sequence = mockall::Sequence::new();
2945
2946 mock.expect_begin_transaction()
2947 .once()
2948 .in_sequence(&mut sequence)
2949 .returning(|_| Err(tonic::Status::unavailable("transient error")));
2950
2951 mock.expect_create_session().returning(|_| {
2952 Ok(Response::new(mock_v1::Session {
2953 name: "session".to_string(),
2954 multiplexed: true,
2955 ..Default::default()
2956 }))
2957 });
2958
2959 let (db_client, _server) = setup_db_client(mock).await;
2960
2961 let res = db_client
2962 .read_only_transaction()
2963 .with_begin_transaction_option(BeginTransactionOption::ExplicitBegin)
2964 .with_begin_retry_policy(NeverRetry)
2965 .build()
2966 .await;
2967
2968 assert!(res.is_err(), "should fail immediately without retry");
2969 let err = res.unwrap_err();
2970 assert_eq!(err.status().expect("status").code, GaxCode::Unavailable);
2971
2972 Ok(())
2973 }
2974
2975 #[tokio_test_no_panics]
2976 async fn read_only_transaction_lazy_begin_fallback_never_retry() -> anyhow::Result<()> {
2977 let mut mock = MockSpanner::new();
2978 let mut sequence = mockall::Sequence::new();
2979
2980 mock.expect_execute_streaming_sql()
2982 .once()
2983 .in_sequence(&mut sequence)
2984 .returning(|_| Err(tonic::Status::unavailable("transient error")));
2985
2986 mock.expect_begin_transaction()
2988 .once()
2989 .in_sequence(&mut sequence)
2990 .returning(|_| Err(tonic::Status::unavailable("transient error")));
2991
2992 mock.expect_create_session().returning(|_| {
2993 Ok(Response::new(mock_v1::Session {
2994 name: "session".to_string(),
2995 multiplexed: true,
2996 ..Default::default()
2997 }))
2998 });
2999
3000 let (db_client, _server) = setup_db_client(mock).await;
3001
3002 let transaction = db_client
3003 .read_only_transaction()
3004 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
3005 .with_begin_retry_policy(NeverRetry)
3006 .build()
3007 .await?;
3008
3009 let stmt = Statement::builder("SELECT 1").build();
3010 let res = transaction.execute_query(stmt).await;
3011
3012 assert!(
3013 res.is_err(),
3014 "should fail immediately during fallback without retrying the fallback RPC"
3015 );
3016 let err = res.unwrap_err();
3017 assert_eq!(err.status().expect("status").code, GaxCode::Unavailable);
3018
3019 Ok(())
3020 }
3021
3022 #[tokio_test_no_panics]
3023 async fn read_only_transaction_begin_with_attempt_timeout() -> anyhow::Result<()> {
3024 let mut mock = MockSpanner::new();
3025 let mut sequence = mockall::Sequence::new();
3026
3027 mock.expect_begin_transaction()
3028 .once()
3029 .in_sequence(&mut sequence)
3030 .withf(|req| {
3031 let timeout_header = req.metadata().get("grpc-timeout");
3032 assert!(
3033 timeout_header.is_some(),
3034 "grpc-timeout header should be present"
3035 );
3036 let val = timeout_header.unwrap().to_str().unwrap();
3037 assert!(
3038 val.contains("5000") || val.contains("5"),
3039 "timeout header value '{}' should represent 5 seconds",
3040 val
3041 );
3042 true
3043 })
3044 .returning(|_| {
3045 Ok(Response::new(mock_v1::Transaction {
3046 id: vec![42],
3047 ..Default::default()
3048 }))
3049 });
3050
3051 mock.expect_create_session().returning(|_| {
3052 Ok(Response::new(mock_v1::Session {
3053 name: "session".to_string(),
3054 multiplexed: true,
3055 ..Default::default()
3056 }))
3057 });
3058
3059 let (db_client, _server) = setup_db_client(mock).await;
3060
3061 let _transaction = db_client
3062 .read_only_transaction()
3063 .with_begin_transaction_option(BeginTransactionOption::ExplicitBegin)
3064 .with_begin_attempt_timeout(std::time::Duration::from_secs(5))
3065 .build()
3066 .await?;
3067
3068 Ok(())
3069 }
3070
3071 #[tokio_test_no_panics]
3072 async fn read_only_transaction_builder_sets_gax_options() -> anyhow::Result<()> {
3073 let mut mock = MockSpanner::new();
3074 mock.expect_create_session().returning(|_| {
3075 Ok(Response::new(mock_v1::Session {
3076 name: "session".to_string(),
3077 multiplexed: true,
3078 ..Default::default()
3079 }))
3080 });
3081 let (db_client, _server) = setup_db_client(mock).await;
3082
3083 let builder = db_client
3084 .read_only_transaction()
3085 .with_begin_attempt_timeout(Duration::from_secs(5))
3086 .with_begin_retry_policy(NeverRetry)
3087 .with_begin_backoff_policy(ExponentialBackoff::default());
3088
3089 let gax = builder
3090 .begin_gax_options
3091 .as_ref()
3092 .expect("begin_gax_options missing");
3093 assert_eq!(*gax.attempt_timeout(), Some(Duration::from_secs(5)));
3094 assert!(gax.retry_policy().is_some());
3095 assert!(gax.backoff_policy().is_some());
3096
3097 Ok(())
3098 }
3099
3100 #[tokio_test_no_panics]
3101 async fn read_only_transaction_lazy_begin_fallback_uses_statement_options_when_unconfigured()
3102 -> anyhow::Result<()> {
3103 let mut mock = MockSpanner::new();
3104 let mut sequence = mockall::Sequence::new();
3105
3106 mock.expect_execute_streaming_sql()
3108 .once()
3109 .in_sequence(&mut sequence)
3110 .returning(|_| Err(tonic::Status::unavailable("transient error")));
3111
3112 mock.expect_begin_transaction()
3115 .once()
3116 .in_sequence(&mut sequence)
3117 .withf(|req| {
3118 let timeout_header = req.metadata().get("grpc-timeout");
3119 assert!(
3120 timeout_header.is_some(),
3121 "grpc-timeout header should be present"
3122 );
3123 let val = timeout_header.unwrap().to_str().unwrap();
3124 assert!(
3125 val.contains("5000") || val.contains("5"),
3126 "timeout header value '{}' should represent 5 seconds",
3127 val
3128 );
3129 true
3130 })
3131 .returning(|_| {
3132 Ok(Response::new(mock_v1::Transaction {
3133 id: vec![42],
3134 ..Default::default()
3135 }))
3136 });
3137
3138 mock.expect_execute_streaming_sql()
3140 .once()
3141 .in_sequence(&mut sequence)
3142 .withf(|req| {
3143 matches!(
3144 req.get_ref()
3145 .transaction
3146 .as_ref()
3147 .and_then(|t| t.selector.as_ref()),
3148 Some(mock_v1::transaction_selector::Selector::Id(id)) if id == &vec![42]
3149 )
3150 })
3151 .returning(|_| {
3152 let mut result_set_partial = setup_select1();
3153 result_set_partial
3154 .metadata
3155 .as_mut()
3156 .expect("metadata should be present")
3157 .transaction = Some(mock_v1::Transaction {
3158 id: vec![42],
3159 ..Default::default()
3160 });
3161 Ok(gaxi::grpc::tonic::Response::from(adapt([Ok(
3162 result_set_partial,
3163 )])))
3164 });
3165
3166 mock.expect_create_session().returning(|_| {
3167 Ok(Response::new(mock_v1::Session {
3168 name: "session".to_string(),
3169 multiplexed: true,
3170 ..Default::default()
3171 }))
3172 });
3173
3174 let (db_client, _server) = setup_db_client(mock).await;
3175
3176 let transaction = db_client
3177 .read_only_transaction()
3178 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
3179 .build()
3180 .await?;
3181
3182 let mut stmt_opts = crate::RequestOptions::default();
3183 stmt_opts.set_attempt_timeout(Duration::from_secs(5));
3184 let stmt = Statement::builder("SELECT 1")
3185 .build()
3186 .with_gax_options(stmt_opts);
3187
3188 let mut rs = transaction.execute_query(stmt).await?;
3189 let row = rs.next().await.expect("has row")?;
3190 assert_eq!(row.raw_values(), [Value(string_val("1"))]);
3191
3192 Ok(())
3193 }
3194
3195 #[tokio_test_no_panics]
3196 async fn read_only_transaction_lazy_begin_fallback_merges_custom_options() -> anyhow::Result<()>
3197 {
3198 let mut mock = MockSpanner::new();
3199 let mut sequence = mockall::Sequence::new();
3200
3201 mock.expect_execute_streaming_sql()
3203 .once()
3204 .in_sequence(&mut sequence)
3205 .returning(|_| Err(tonic::Status::unavailable("transient error")));
3206
3207 mock.expect_begin_transaction()
3212 .once()
3213 .in_sequence(&mut sequence)
3214 .withf(|req| {
3215 let timeout_header = req.metadata().get("grpc-timeout");
3216 assert!(
3217 timeout_header.is_some(),
3218 "grpc-timeout header should be present"
3219 );
3220 let val = timeout_header.unwrap().to_str().unwrap();
3221 assert!(
3222 val.contains("5000") || val.contains("5"),
3223 "timeout header value '{}' should represent 5 seconds",
3224 val
3225 );
3226 true
3227 })
3228 .returning(|_| Err(tonic::Status::unavailable("transient error")));
3229
3230 mock.expect_create_session().returning(|_| {
3231 Ok(Response::new(mock_v1::Session {
3232 name: "session".to_string(),
3233 multiplexed: true,
3234 ..Default::default()
3235 }))
3236 });
3237
3238 let (db_client, _server) = setup_db_client(mock).await;
3239
3240 let transaction = db_client
3241 .read_only_transaction()
3242 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
3243 .with_begin_retry_policy(NeverRetry)
3244 .build()
3245 .await?;
3246
3247 let mut stmt_opts = crate::RequestOptions::default();
3248 stmt_opts.set_attempt_timeout(Duration::from_secs(5));
3249 let stmt = Statement::builder("SELECT 1")
3250 .build()
3251 .with_gax_options(stmt_opts);
3252
3253 let res = transaction.execute_query(stmt).await;
3254
3255 assert!(
3256 res.is_err(),
3257 "should fail immediately because of NeverRetry"
3258 );
3259 let err = res.unwrap_err();
3260 assert_eq!(err.status().expect("status").code, GaxCode::Unavailable);
3261
3262 Ok(())
3263 }
3264
3265 #[test]
3266 fn test_merge_request_options() {
3267 let mut dest = crate::RequestOptions::default();
3269 dest.set_attempt_timeout(Duration::from_secs(2));
3270 dest.set_retry_policy(NeverRetry);
3271
3272 let merged = merge_request_options(dest, None);
3274
3275 assert_eq!(*merged.attempt_timeout(), Some(Duration::from_secs(2)));
3276 assert!(merged.retry_policy().is_some());
3277
3278 let dest = crate::RequestOptions::default();
3280
3281 let mut source = crate::RequestOptions::default();
3282 source.set_attempt_timeout(Duration::from_secs(5));
3283 source.set_retry_policy(NeverRetry);
3284
3285 let merged = merge_request_options(dest, Some(&source));
3286
3287 assert_eq!(*merged.attempt_timeout(), Some(Duration::from_secs(5)));
3288 assert!(merged.retry_policy().is_some());
3289
3290 let mut dest = crate::RequestOptions::default();
3292 let mut dest_headers = HeaderMap::new();
3293 dest_headers.insert(
3294 HeaderName::from_static("x-goog-spanner-route-to-leader"),
3295 HeaderValue::from_static("true"),
3296 );
3297 dest = dest.insert_extension(dest_headers);
3298
3299 let mut source = crate::RequestOptions::default();
3300 let mut src_headers = HeaderMap::new();
3301 src_headers.insert(
3302 HeaderName::from_static("x-custom-header"),
3303 HeaderValue::from_static("custom-value"),
3304 );
3305 source = source.insert_extension(src_headers);
3306
3307 let merged = merge_request_options(dest, Some(&source));
3308 let merged_headers = merged
3309 .get_extension::<HeaderMap>()
3310 .expect("HeaderMap missing");
3311
3312 assert_eq!(
3313 merged_headers
3314 .get("x-goog-spanner-route-to-leader")
3315 .unwrap()
3316 .to_str()
3317 .unwrap(),
3318 "true"
3319 );
3320 assert_eq!(
3321 merged_headers
3322 .get("x-custom-header")
3323 .unwrap()
3324 .to_str()
3325 .unwrap(),
3326 "custom-value"
3327 );
3328 }
3329}