1use crate::database_client::DatabaseClient;
16use crate::error::internal_error;
17use crate::google::spanner::v1::{self, PartialResultSet};
18use crate::model::ResultSetStats;
19use crate::model::result_set_stats::RowCount;
20use crate::precommit::PrecommitTokenTracker;
21use crate::read_only_transaction::{ReadContextTransactionSelector, TransactionState};
22use crate::result_set_metadata::ResultSetMetadata;
23use crate::row::Row;
24use crate::server_streaming::stream::PartialResultSetStream;
25use bytes::Bytes;
26use gaxi::prost::FromProto;
27use google_cloud_gax::backoff_policy::BackoffPolicy;
28use google_cloud_gax::error::Error as GaxError;
29use google_cloud_gax::exponential_backoff::ExponentialBackoffBuilder;
30use google_cloud_gax::options::RequestOptions as GaxRequestOptions;
31use google_cloud_gax::retry_policy::{Aip194Strict, RetryPolicyExt};
32use google_cloud_gax::retry_state::RetryState;
33use std::collections::VecDeque;
34use std::mem::take;
35use std::sync::Arc;
36use tokio::time::sleep;
37
38#[cfg(feature = "unstable-stream")]
39use futures::Stream;
40
41#[derive(Debug)]
56pub struct ResultSet {
57 stream: Option<PartialResultSetStream>,
58 buffered_values: Vec<prost_types::Value>,
59 chunked: bool,
60 seen_last: bool,
61 ready_rows: VecDeque<Row>,
62 local_metadata: Option<ResultSetMetadata>,
63 stats: Option<ResultSetStats>,
64 precommit_token_tracker: PrecommitTokenTracker,
65
66 client: DatabaseClient,
68 session_name: String,
69 transaction_tag: Option<String>,
70 operation: StreamOperation,
71 last_resume_token: Bytes,
72 partial_result_sets_buffer: VecDeque<PartialResultSet>,
73 safe_to_retry: bool,
74 max_buffered_partial_result_sets: usize,
75 retry_count: usize,
76 transaction_selector: Option<ReadContextTransactionSelector>,
77 channel_hint: usize,
78 gax_options: GaxRequestOptions,
79}
80
81#[derive(Debug, Clone)]
82pub(crate) enum StreamOperation {
83 Query(crate::model::ExecuteSqlRequest),
84 Read(crate::model::ReadRequest),
85}
86
87pub(crate) struct ResultSetParams {
88 pub stream: PartialResultSetStream,
89 pub transaction_selector: Option<ReadContextTransactionSelector>,
90 pub precommit_token_tracker: PrecommitTokenTracker,
91 pub client: DatabaseClient,
92 pub session_name: String,
93 pub transaction_tag: Option<String>,
94 pub operation: StreamOperation,
95 pub channel_hint: usize,
96 pub gax_options: GaxRequestOptions,
97}
98
99const MAX_BUFFERED_PARTIAL_RESULT_SETS: usize = 10;
103
104impl ResultSet {
105 pub(crate) async fn create(params: ResultSetParams) -> crate::Result<Self> {
107 let mut result_set = Self::new(params);
108 result_set.init_stream().await?;
109 Ok(result_set)
110 }
111
112 fn new(params: ResultSetParams) -> Self {
114 let ResultSetParams {
115 stream,
116 transaction_selector,
117 precommit_token_tracker,
118 client,
119 session_name,
120 transaction_tag,
121 operation,
122 channel_hint,
123 gax_options,
124 } = params;
125
126 let gax_options = Self::apply_defaults(gax_options);
127
128 Self {
129 stream: Some(stream),
130 buffered_values: Vec::new(),
131 chunked: false,
132 seen_last: false,
133 ready_rows: VecDeque::new(),
134 local_metadata: None,
135 stats: None,
136 precommit_token_tracker,
137 client,
138 session_name,
139 transaction_tag,
140 operation,
141 last_resume_token: Bytes::new(),
142 partial_result_sets_buffer: VecDeque::new(),
143 safe_to_retry: true,
144 max_buffered_partial_result_sets: MAX_BUFFERED_PARTIAL_RESULT_SETS,
145 retry_count: 0,
146 transaction_selector,
147 channel_hint,
148 gax_options,
149 }
150 }
151
152 fn apply_defaults(mut gax_options: GaxRequestOptions) -> GaxRequestOptions {
153 if gax_options.retry_policy().is_none() {
154 gax_options.set_retry_policy(Aip194Strict.with_attempt_limit(10));
155 }
156 if gax_options.backoff_policy().is_none() {
157 gax_options.set_backoff_policy(Self::default_backoff_policy());
158 }
159 gax_options
160 }
161
162 fn default_backoff_policy() -> Arc<dyn BackoffPolicy> {
163 Arc::new(ExponentialBackoffBuilder::default().clamp())
164 }
165
166 async fn init_stream(&mut self) -> crate::Result<()> {
167 loop {
170 let stream_result = match &mut self.stream {
171 Some(s) => s.next_message().await,
172 None => {
173 return Err(internal_error(
174 "Query stream ended without metadata or error",
175 ));
176 }
177 };
178
179 match stream_result {
180 Some(Ok(partial_result_set)) => {
181 self.handle_partial_result_set(partial_result_set)?;
182 return Ok(());
183 }
184 Some(Err(e)) => {
185 self.handle_stream_error(e).await?;
186 }
187 None => {
188 return Err(internal_error(
189 "Query stream ended without metadata or error",
190 ));
191 }
192 }
193 }
194 }
195
196 pub fn metadata(&self) -> Option<&ResultSetMetadata> {
212 self.local_metadata.as_ref()
213 }
214
215 pub fn stats(&self) -> Option<&ResultSetStats> {
236 self.stats.as_ref()
237 }
238
239 pub fn update_count(&self) -> Option<i64> {
268 self.stats.as_ref().and_then(|s| {
269 s.row_count.as_ref().map(|rc| match rc {
270 RowCount::RowCountExact(c) => *c,
271 RowCount::RowCountLowerBound(c) => *c,
272 })
273 })
274 }
275
276 pub async fn next(&mut self) -> Option<crate::Result<Row>> {
292 loop {
293 if let Some(row) = self.ready_rows.pop_front() {
294 return Some(Ok(row));
295 }
296
297 if self.seen_last {
298 self.stream = None;
299 return None;
300 }
301
302 let stream_result = match &mut self.stream {
303 Some(s) => s.next_message().await,
304 None => return None,
305 };
306
307 match stream_result {
308 Some(Ok(partial_result_set)) => {
309 if let Err(e) = self.handle_partial_result_set(partial_result_set) {
310 return Some(Err(e));
311 }
312 }
313 Some(Err(e)) => {
314 if let Err(err) = self.handle_stream_error(e).await {
315 return Some(Err(err));
316 }
317 }
318 None => match self.handle_stream_end() {
319 Ok(Some(row)) => return Some(Ok(row)),
320 Ok(None) => return None,
321 Err(e) => return Some(Err(e)),
322 },
323 }
324 }
325 }
326
327 #[cfg(feature = "unstable-stream")]
350 pub fn into_stream(self) -> impl Stream<Item = crate::Result<Row>> + Unpin {
351 use futures::stream::unfold;
352 Box::pin(unfold(self, |mut result_set| async move {
353 result_set.next().await.map(|row| (row, result_set))
354 }))
355 }
356}
357
358impl ResultSet {
359 fn handle_partial_result_set(
360 &mut self,
361 mut partial_result_set: PartialResultSet,
362 ) -> crate::Result<()> {
363 self.precommit_token_tracker.update(
364 partial_result_set
365 .precommit_token
366 .clone()
367 .map(|t| t.cnv().expect("failed to convert precommit token")),
368 );
369
370 if partial_result_set.last {
371 self.seen_last = true;
372 }
373
374 match (
375 self.local_metadata.as_ref(),
376 partial_result_set.metadata.take(),
377 ) {
378 (Some(_), None) => {}
379 (None, None) => {
380 return Err(internal_error(
381 "First PartialResultSet did not contain metadata",
382 ));
383 }
384 (Some(_), Some(_)) => {
385 return Err(internal_error("Additional metadata after first result set"));
386 }
387 (None, Some(m)) => {
388 self.handle_metadata(m)?;
389 }
390 }
391
392 if !partial_result_set.resume_token.is_empty() {
397 self.last_resume_token = partial_result_set.resume_token.clone();
398 self.safe_to_retry = true;
399 self.partial_result_sets_buffer
400 .push_back(partial_result_set);
401 self.flush_buffer()?;
402 return Ok(());
403 }
404
405 if self.partial_result_sets_buffer.len() >= self.max_buffered_partial_result_sets {
408 self.safe_to_retry = false;
411 if let Some(oldest) = self.partial_result_sets_buffer.pop_front() {
412 self.process_partial_result_set(oldest)?;
413 }
414 }
415 self.partial_result_sets_buffer
416 .push_back(partial_result_set);
417
418 if self.seen_last {
419 self.flush_buffer()?;
420 if self.chunked {
421 return Err(crate::error::internal_error(
422 "Stream ended with chunked_value=true",
423 ));
424 }
425 }
426
427 Ok(())
428 }
429
430 fn handle_metadata(&mut self, mut m: v1::ResultSetMetadata) -> crate::Result<()> {
431 let transaction = m.transaction.take();
432 let meta = ResultSetMetadata::new(Some(m));
433 if let Some(selector) = &self.transaction_selector {
434 if let Some(transaction) = transaction {
435 selector.update(
436 transaction.id,
437 transaction
438 .read_timestamp
439 .and_then(|t| wkt::Timestamp::new(t.seconds, t.nanos).ok()),
440 )?;
441 } else if let ReadContextTransactionSelector::Lazy(lazy) = selector {
442 let is_started = matches!(
443 &*lazy.lock().expect("transaction state mutex poisoned"),
444 TransactionState::Started(_, _)
445 );
446 if !is_started {
447 return Err(internal_error(
448 "Spanner failed to return a transaction ID for a query that included a BeginTransaction option",
449 ));
450 }
451 }
452 }
453 self.local_metadata = Some(meta);
454 Ok(())
455 }
456
457 async fn handle_stream_error(&mut self, e: crate::Error) -> crate::Result<()> {
458 if self.safe_to_retry && self.should_retry(&e) {
459 self.retry_count += 1;
460 self.partial_result_sets_buffer.clear();
463
464 if let Some(policy) = self.gax_options.backoff_policy() {
466 let state =
467 RetryState::new(self.safe_to_retry).set_attempt_count(self.retry_count as u32);
468 let delay = policy.on_failure(&state);
469 sleep(delay).await;
470 }
471
472 self.restart_stream().await?;
473 return Ok(());
474 }
475
476 let Some(ReadContextTransactionSelector::Lazy(lazy)) = &self.transaction_selector else {
480 return Err(e);
481 };
482 let is_started = matches!(
483 &*lazy.lock().unwrap(),
484 crate::read_only_transaction::TransactionState::Started(_, _)
485 );
486 if is_started {
487 return Err(e);
488 }
489
490 self.transaction_selector
491 .as_ref()
492 .unwrap()
493 .begin_explicitly(crate::read_only_transaction::ExplicitBeginParams {
494 client: self.client.clone(),
495 session_name: self.session_name.clone(),
496 transaction_tag: self.transaction_tag.clone(),
497 channel_hint: self.channel_hint,
498 request_options: self.gax_options.clone(),
499 is_stream_fallback: true,
500 precommit_token_tracker: self.precommit_token_tracker.clone(),
501 mutation_key: None,
502 })
503 .await?;
504
505 self.partial_result_sets_buffer.clear();
506 self.restart_stream().await?;
507 Ok(())
508 }
509
510 fn handle_stream_end(&mut self) -> crate::Result<Option<Row>> {
511 if !self.partial_result_sets_buffer.is_empty() {
516 self.flush_buffer()?;
517 }
518 if self.chunked {
519 return Err(crate::error::internal_error(
521 "Stream ended with chunked_value=true",
522 ));
523 }
524 if let Some(row) = self.ready_rows.pop_front() {
525 return Ok(Some(row));
526 }
527 Ok(None)
528 }
529
530 fn flush_buffer(&mut self) -> crate::Result<()> {
531 let mut buffer_to_flush = take(&mut self.partial_result_sets_buffer);
532 while let Some(partial_result_set) = buffer_to_flush.pop_front() {
533 self.process_partial_result_set(partial_result_set)?;
534 }
535 Ok(())
536 }
537
538 fn process_partial_result_set(
539 &mut self,
540 partial_result_set: PartialResultSet,
541 ) -> crate::Result<()> {
542 let PartialResultSet {
543 stats,
544 values,
545 chunked_value,
546 ..
547 } = partial_result_set;
548
549 match (&self.stats, stats) {
550 (Some(_), Some(_)) => {
551 return Err(internal_error("Additional stats received after first"));
552 }
553 (None, Some(s)) => {
554 let converted_stats = s
555 .cnv()
556 .map_err(|e| internal_error(format!("failed to convert stats: {}", e)))?;
557 self.stats = Some(converted_stats);
558 }
559 _ => {}
560 }
561
562 if values.is_empty() {
563 return Ok(());
564 }
565 let metadata = self.local_metadata.as_ref().ok_or_else(|| {
566 internal_error("PartialResultSet contained values but no metadata was provided")
567 })?;
568 if metadata.column_types.is_empty() {
569 return Err(internal_error(
570 "PartialResultSet contained values but no column metadata was provided",
571 ));
572 }
573
574 let mut values_iter = values.into_iter();
575 if self.chunked
576 && let Some(last_val) = self.buffered_values.last_mut()
577 && let Some(first_new) = values_iter.next()
578 {
579 merge_values(last_val, first_new)?;
580 }
581
582 self.buffered_values.extend(values_iter);
583 self.chunked = chunked_value;
584
585 while self.buffered_values.len() >= metadata.column_types.len() {
586 let column_count = metadata.column_types.len();
587 if self.buffered_values.len() == column_count && self.chunked {
588 break;
589 }
590
591 let row_values: Vec<crate::value::Value> = self
592 .buffered_values
593 .drain(..column_count)
594 .map(crate::value::Value)
595 .collect();
596 self.ready_rows.push_back(Row {
597 values: row_values,
598 metadata: metadata.clone(),
599 });
600 }
601 Ok(())
602 }
603
604 async fn restart_stream(&mut self) -> crate::Result<()> {
605 if let Some(s) = &self.transaction_selector {
609 s.maybe_reset_starting();
610 }
611
612 let transaction_selector = if let Some(s) = &self.transaction_selector {
614 Some(s.selector().await?)
615 } else {
616 None
617 };
618
619 if self.last_resume_token.is_empty() {
624 self.local_metadata = None;
625 }
626
627 match &mut self.operation {
628 StreamOperation::Query(req) => {
629 req.resume_token = self.last_resume_token.clone();
630 req.transaction = transaction_selector
631 .clone()
632 .or_else(|| req.transaction.take());
633 let stream = self
634 .client
635 .spanner
636 .execute_streaming_sql(req.clone(), self.gax_options.clone(), self.channel_hint)
637 .send()
638 .await?;
639 self.stream = Some(stream);
640 }
641 StreamOperation::Read(req) => {
642 req.resume_token = self.last_resume_token.clone();
643 req.transaction = transaction_selector
644 .clone()
645 .or_else(|| req.transaction.take());
646 let stream = self
647 .client
648 .spanner
649 .streaming_read(req.clone(), self.gax_options.clone(), self.channel_hint)
650 .send()
651 .await?;
652 self.stream = Some(stream);
653 }
654 }
655 Ok(())
656 }
657
658 fn should_retry(&self, e: &crate::Error) -> bool {
659 if let Some(policy) = self.gax_options.retry_policy() {
660 let state =
661 RetryState::new(self.safe_to_retry).set_attempt_count(self.retry_count as u32);
662
663 if let Some(status) = e.status() {
664 let gax_error = GaxError::service(status.clone());
665 return policy.on_error(&state, gax_error).is_continue();
666 }
667 }
668 false
669 }
670}
671
672fn merge_values(target: &mut prost_types::Value, source: prost_types::Value) -> crate::Result<()> {
683 use prost_types::value::Kind;
684 match (&mut target.kind, source.kind) {
685 (Some(Kind::StringValue(s)), Some(Kind::StringValue(source_s))) => {
686 s.push_str(&source_s);
687 Ok(())
688 }
689 (Some(Kind::ListValue(target_list)), Some(Kind::ListValue(mut source_list))) => {
690 if source_list.values.is_empty() {
691 return Ok(());
692 }
693 if target_list.values.is_empty() {
694 target_list.values = source_list.values;
695 return Ok(());
696 }
697
698 let source_first = source_list.values.remove(0);
699 if let Some(target_last) = target_list.values.last_mut() {
700 match (&target_last.kind, &source_first.kind) {
701 (Some(Kind::StringValue(_)), Some(Kind::StringValue(_)))
702 | (Some(Kind::ListValue(_)), Some(Kind::ListValue(_))) => {
703 merge_values(target_last, source_first)?;
704 }
705 _ => {
706 target_list.values.push(source_first);
707 }
708 }
709 } else {
710 target_list.values.push(source_first);
711 }
712 target_list.values.extend(source_list.values);
713 Ok(())
714 }
715 _ => Err(internal_error(
719 "Incompatible types for merging chunked values",
720 )),
721 }
722}
723
724#[cfg(test)]
725impl ResultSet {
726 pub(crate) fn set_max_buffered_partial_result_sets(&mut self, limit: usize) {
727 self.max_buffered_partial_result_sets = limit;
728 }
729}
730
731#[cfg(test)]
732pub(crate) mod tests {
733 use super::*;
734 use crate::client::Spanner;
735 use crate::key::KeySet;
736 use crate::read::ReadRequest;
737 use crate::statement::Statement;
738 use crate::transaction::BeginTransactionOption;
739 use gaxi::grpc::tonic::{Code as GrpcCode, Response, Status};
740 use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
741 use google_cloud_gax::backoff_policy::BackoffPolicy;
742 use google_cloud_gax::retry_policy::{Aip194Strict, RetryPolicyExt};
743 use google_cloud_gax::retry_state::RetryState;
744 use google_cloud_test_macros::tokio_test_no_panics;
745 use prost_types::Value;
746 use spanner_grpc_mock::MockSpanner;
747 use spanner_grpc_mock::google::spanner::v1 as spanner_v1;
748 use spanner_grpc_mock::google::spanner::v1::struct_type::Field;
749 use spanner_grpc_mock::google::spanner::v1::{
750 MultiplexedSessionPrecommitToken, PartialResultSet, ResultSetMetadata, Session, StructType,
751 };
752 use spanner_grpc_mock::start;
753 use spanner_v1::result_set_stats::RowCount;
754 use std::time::Duration;
755
756 mockall::mock! {
757 #[derive(Debug)]
758 BackoffPolicy {}
759 impl BackoffPolicy for BackoffPolicy {
760 fn on_failure(&self, state: &RetryState) -> Duration;
761 }
762 }
763
764 pub(crate) fn string_val(s: &str) -> Value {
765 Value {
766 kind: Some(prost_types::value::Kind::StringValue(s.to_string())),
767 }
768 }
769
770 fn list_val(vals: Vec<Value>) -> Value {
771 Value {
772 kind: Some(prost_types::value::Kind::ListValue(
773 prost_types::ListValue { values: vals },
774 )),
775 }
776 }
777
778 fn metadata(cols: usize) -> Option<ResultSetMetadata> {
779 let mut fields = vec![];
780 for i in 0..cols {
781 fields.push(Field {
782 name: format!("col{}", i),
783 r#type: None,
784 });
785 }
786 Some(ResultSetMetadata {
787 row_type: Some(StructType { fields }),
788 transaction: None,
789 undeclared_parameters: None,
790 })
791 }
792
793 pub(crate) fn adapt<I, T>(items: I) -> tokio::sync::mpsc::Receiver<T>
794 where
795 I: IntoIterator<Item = T>,
796 I::IntoIter: ExactSizeIterator,
797 {
798 let items = items.into_iter();
799 let (tx, rx) = tokio::sync::mpsc::channel(items.len().max(1));
800 for i in items {
801 tx.try_send(i)
802 .expect("can't fail, we allocated enough capacity.");
803 }
804 rx
805 }
806
807 async fn run_mock_query(results: Vec<PartialResultSet>) -> ResultSet {
808 run_mock_query_fallible(results).await.unwrap()
809 }
810
811 async fn run_mock_query_fallible(results: Vec<PartialResultSet>) -> crate::Result<ResultSet> {
812 let mut mock = MockSpanner::new();
813 let rx = adapt(results.into_iter().map(Ok));
814 mock.expect_execute_streaming_sql()
815 .return_once(move |_request| Ok(Response::from(rx)));
816
817 mock.expect_create_session().returning(|_| {
818 Ok(Response::new(Session {
819 name: "session".to_string(),
820 multiplexed: true,
821 ..Default::default()
822 }))
823 });
824
825 let (address, _server) = start("127.0.0.1:0", mock)
826 .await
827 .expect("Failed to start mock server");
828
829 let client: Spanner = Spanner::builder()
830 .with_endpoint(address)
831 .with_credentials(Anonymous::new().build())
832 .build()
833 .await
834 .expect("Failed to build client");
835
836 let db_client: crate::database_client::DatabaseClient =
837 client.database_client("db").build().await.unwrap();
838 let tx: crate::read_only_transaction::SingleUseReadOnlyTransaction =
839 db_client.single_use().build();
840 tx.execute_query("SELECT 1").await
841 }
842
843 #[test]
844 fn test_auto_traits() {
845 static_assertions::assert_impl_all!(ResultSet: std::fmt::Debug, Send, Sync);
846 }
847
848 #[tokio_test_no_panics]
849 async fn test_result_set_zero_rows() {
850 let mut rs = run_mock_query(vec![PartialResultSet {
851 metadata: metadata(2),
852 values: vec![],
853 chunked_value: false,
854 resume_token: vec![],
855 stats: None,
856 precommit_token: None,
857 last: true,
858 cache_update: None,
859 }])
860 .await;
861
862 let next = rs.next().await;
863 assert!(next.is_none());
864 }
865
866 #[tokio_test_no_panics]
867 async fn test_result_set_metadata() -> anyhow::Result<()> {
868 let mut rs = run_mock_query(vec![PartialResultSet {
869 metadata: metadata(2),
870 values: vec![string_val("a"), string_val("b")],
871 last: true,
872 ..Default::default()
873 }])
874 .await;
875
876 let meta = rs.metadata().expect("metadata available");
878 assert_eq!(
879 meta.column_names(),
880 &["col0".to_string(), "col1".to_string()]
881 );
882
883 let _next = rs.next().await.expect("Expected a row")?;
885
886 let meta = rs.metadata().expect("metadata available");
888 assert_eq!(
889 meta.column_names(),
890 &["col0".to_string(), "col1".to_string()]
891 );
892
893 Ok(())
894 }
895
896 #[tokio_test_no_panics]
897 async fn test_result_set_handle_partial_result_set_error() -> anyhow::Result<()> {
898 let res = run_mock_query_fallible(vec![PartialResultSet {
899 values: vec![string_val("row1")],
900 ..Default::default()
901 }])
902 .await;
903
904 assert!(res.is_err(), "Expected an error but got Ok");
905 let err_str = res.expect_err("Expected should be an error").to_string();
906 assert!(
907 err_str.contains("First PartialResultSet did not contain metadata"),
908 "Expected error to contain 'First PartialResultSet did not contain metadata', but got '{}'",
909 err_str
910 );
911
912 Ok(())
913 }
914
915 #[tokio_test_no_panics]
916 async fn test_result_set_handle_partial_result_set_error_immediate() -> anyhow::Result<()> {
917 let res = run_mock_query_fallible(vec![
918 PartialResultSet {
919 values: vec![string_val("row1")],
920 ..Default::default()
921 },
922 PartialResultSet {
923 resume_token: b"token".to_vec(),
924 ..Default::default()
925 },
926 ])
927 .await;
928
929 assert!(res.is_err(), "Expected an error but got Ok");
930 let err_str = res.expect_err("Expected should be an error").to_string();
931 assert!(
932 err_str.contains("First PartialResultSet did not contain metadata"),
933 "Expected error to contain 'First PartialResultSet did not contain metadata', but got '{}'",
934 err_str
935 );
936
937 Ok(())
938 }
939
940 #[tokio_test_no_panics]
941 async fn test_result_set_stream_ended_with_chunked_value() -> anyhow::Result<()> {
942 let mut rs = run_mock_query(vec![PartialResultSet {
943 metadata: metadata(2),
944 values: vec![string_val("a")],
945 chunked_value: true,
946 ..Default::default()
947 }])
948 .await;
949
950 let res = rs.next().await;
951 assert!(res.is_some(), "Expected an error but got None");
952 let res = res.expect("Expected some response but got None");
953 assert!(res.is_err(), "Expected an error but got Ok");
954 let err_str = res.expect_err("Expected should be an error").to_string();
955 assert!(
956 err_str.contains("Stream ended with chunked_value=true"),
957 "Expected error to contain 'Stream ended with chunked_value=true', but got '{}'",
958 err_str
959 );
960
961 Ok(())
962 }
963
964 #[tokio_test_no_panics]
965 async fn test_result_set_duplicate_metadata() -> anyhow::Result<()> {
966 let mut rs = run_mock_query(vec![
967 PartialResultSet {
968 metadata: metadata(2),
969 values: vec![string_val("a"), string_val("b")],
970 resume_token: b"token1".to_vec(),
971 ..Default::default()
972 },
973 PartialResultSet {
974 metadata: metadata(2),
975 values: vec![string_val("c"), string_val("d")],
976 ..Default::default()
977 },
978 ])
979 .await;
980
981 rs.next().await.expect("Expected a row")?;
982
983 let res2 = rs.next().await;
984 assert!(res2.is_some(), "Expected an error but got None");
985 let res2 = res2.expect("Expected some response but got None");
986 assert!(res2.is_err(), "Expected an error but got Ok");
987 let err_str = res2.expect_err("Expected should be an error").to_string();
988 assert!(
989 err_str.contains("Additional metadata after first result set"),
990 "Expected error to contain 'Additional metadata after first result set', but got '{}'",
991 err_str
992 );
993
994 Ok(())
995 }
996
997 #[tokio_test_no_panics]
998 async fn test_result_set_empty_column_metadata() -> anyhow::Result<()> {
999 let mut rs = run_mock_query(vec![PartialResultSet {
1000 metadata: Some(ResultSetMetadata {
1001 row_type: Some(StructType { fields: vec![] }),
1002 ..Default::default()
1003 }),
1004 values: vec![string_val("a")],
1005 ..Default::default()
1006 }])
1007 .await;
1008
1009 let res = rs.next().await;
1010 assert!(res.is_some(), "Expected an error but got None");
1011 let res = res.expect("Expected some response but got None");
1012 assert!(res.is_err(), "Expected an error but got Ok");
1013 let err_str = res.expect_err("Expected should be an error").to_string();
1014 assert!(
1015 err_str
1016 .contains("PartialResultSet contained values but no column metadata was provided"),
1017 "Expected error to contain 'PartialResultSet contained values but no column metadata was provided', but got '{}'",
1018 err_str
1019 );
1020
1021 Ok(())
1022 }
1023
1024 #[tokio_test_no_panics]
1025 async fn test_result_set_default_policies_applied() -> anyhow::Result<()> {
1026 let rs = run_mock_query(vec![PartialResultSet {
1027 metadata: metadata(2),
1028 last: true,
1029 ..Default::default()
1030 }])
1031 .await;
1032
1033 assert!(
1034 rs.gax_options.retry_policy().is_some(),
1035 "Default retry policy should be applied"
1036 );
1037 assert!(
1038 rs.gax_options.backoff_policy().is_some(),
1039 "Default backoff policy should be applied"
1040 );
1041
1042 Ok(())
1043 }
1044
1045 #[tokio_test_no_panics]
1046 async fn test_result_set_retry_read_stream() -> anyhow::Result<()> {
1047 let mut mock = MockSpanner::new();
1048 let mut seq = mockall::Sequence::new();
1049
1050 mock.expect_streaming_read()
1051 .times(1)
1052 .in_sequence(&mut seq)
1053 .returning(|_request| {
1054 let stream = adapt([
1055 Ok(PartialResultSet {
1056 metadata: metadata(2),
1057 values: vec![string_val("row1"), string_val("b")],
1058 resume_token: b"token1".to_vec(),
1059 ..Default::default()
1060 }),
1061 Err(Status::unavailable("Unavailable error")),
1062 ]);
1063 Ok(Response::from(stream))
1064 });
1065
1066 mock.expect_streaming_read()
1067 .times(1)
1068 .in_sequence(&mut seq)
1069 .returning(|_request| {
1070 let stream = adapt([Ok(PartialResultSet {
1071 values: vec![string_val("row2"), string_val("d")],
1072 resume_token: b"token2".to_vec(),
1073 last: true,
1074 ..Default::default()
1075 })]);
1076 Ok(Response::from(stream))
1077 });
1078
1079 mock.expect_create_session().returning(|_| {
1080 Ok(Response::new(Session {
1081 name: "session".to_string(),
1082 multiplexed: true,
1083 ..Default::default()
1084 }))
1085 });
1086
1087 let (address, _server) = start("127.0.0.1:0", mock).await?;
1088
1089 let client: Spanner = Spanner::builder()
1090 .with_endpoint(address)
1091 .with_credentials(Anonymous::new().build())
1092 .build()
1093 .await?;
1094
1095 let db_client = client.database_client("db").build().await?;
1096 let tx = db_client.single_use().build();
1097 let mut mock_backoff = MockBackoffPolicy::new();
1098 mock_backoff
1099 .expect_on_failure()
1100 .returning(|_| Duration::from_nanos(1));
1101
1102 let read_req = crate::read::ReadRequest::builder("table", vec!["Id", "Value"])
1103 .with_keys(crate::key::KeySet::all())
1104 .with_backoff_policy(mock_backoff)
1105 .build();
1106 let mut rs: ResultSet = tx.execute_read(read_req).await?;
1107
1108 let row1 = rs.next().await.expect("Stream ended unexpectedly")?;
1109 assert_eq!(row1.raw_values()[0].0, string_val("row1"));
1110
1111 let row2 = rs.next().await.expect("Stream ended unexpectedly")?;
1112 assert_eq!(row2.raw_values()[0].0, string_val("row2"));
1113
1114 assert!(rs.next().await.is_none());
1115
1116 Ok(())
1117 }
1118
1119 #[tokio_test_no_panics]
1120 async fn test_result_set_custom_retry_policy() -> anyhow::Result<()> {
1121 let retry_policy = Aip194Strict.continue_on_too_many_requests();
1123
1124 let mut mock = MockSpanner::new();
1125 let mut seq = mockall::Sequence::new();
1126
1127 mock.expect_streaming_read()
1129 .times(1)
1130 .in_sequence(&mut seq)
1131 .returning(|_request| {
1132 let stream = adapt([
1133 Ok(PartialResultSet {
1134 metadata: metadata(2),
1135 values: vec![string_val("row1"), string_val("b")],
1136 resume_token: b"token1".to_vec(),
1137 ..Default::default()
1138 }),
1139 Err(Status::new(GrpcCode::ResourceExhausted, "Quota exceeded")),
1140 ]);
1141 Ok(Response::from(stream))
1142 });
1143
1144 mock.expect_streaming_read()
1146 .times(1)
1147 .in_sequence(&mut seq)
1148 .returning(|_request| {
1149 let stream = adapt([Ok(PartialResultSet {
1150 values: vec![string_val("row2"), string_val("d")],
1151 resume_token: b"token2".to_vec(),
1152 last: true,
1153 ..Default::default()
1154 })]);
1155 Ok(Response::from(stream))
1156 });
1157
1158 mock.expect_create_session().returning(|_| {
1159 Ok(Response::new(Session {
1160 name: "session".to_string(),
1161 multiplexed: true,
1162 ..Default::default()
1163 }))
1164 });
1165
1166 let (address, _server) = start("127.0.0.1:0", mock).await?;
1167
1168 let client: Spanner = Spanner::builder()
1169 .with_endpoint(address)
1170 .with_credentials(Anonymous::new().build())
1171 .build()
1172 .await?;
1173
1174 let db_client = client.database_client("db").build().await?;
1175 let tx = db_client.single_use().build();
1176
1177 let mut mock_backoff = MockBackoffPolicy::new();
1178 mock_backoff
1179 .expect_on_failure()
1180 .times(1)
1181 .returning(|_| Duration::from_nanos(1));
1182
1183 let read_req = ReadRequest::builder("table", vec!["Id", "Value"])
1184 .with_keys(KeySet::all())
1185 .with_retry_policy(retry_policy)
1186 .with_backoff_policy(mock_backoff)
1187 .build();
1188
1189 let mut rs: ResultSet = tx.execute_read(read_req).await?;
1190
1191 let row1 = rs.next().await.expect("Stream ended unexpectedly")?;
1192 assert_eq!(row1.raw_values()[0].0, string_val("row1"));
1193
1194 let row2 = rs.next().await.expect("Stream ended unexpectedly")?;
1196 assert_eq!(row2.raw_values()[0].0, string_val("row2"));
1197
1198 assert!(rs.next().await.is_none());
1199
1200 Ok(())
1201 }
1202
1203 #[tokio_test_no_panics]
1204 async fn test_result_set_one_row() {
1205 let mut rs = run_mock_query(vec![PartialResultSet {
1206 metadata: metadata(2),
1207 values: vec![string_val("a"), string_val("b")],
1208 chunked_value: false,
1209 resume_token: vec![],
1210 stats: None,
1211 precommit_token: None,
1212 last: true,
1213 cache_update: None,
1214 }])
1215 .await;
1216
1217 let row = rs.next().await.unwrap().unwrap();
1218 assert_eq!(row.raw_values().len(), 2);
1219 assert_eq!(row.raw_values()[0].0, string_val("a"));
1220 assert_eq!(row.raw_values()[1].0, string_val("b"));
1221
1222 assert!(rs.next().await.is_none());
1223 }
1224
1225 #[tokio_test_no_panics]
1226 async fn result_set_last_flag() -> anyhow::Result<()> {
1227 let mut rs = run_mock_query(vec![
1228 PartialResultSet {
1229 metadata: metadata(2),
1230 values: vec![string_val("a"), string_val("b")],
1231 last: true,
1232 ..Default::default()
1233 },
1234 PartialResultSet {
1238 values: vec![string_val("c"), string_val("d")],
1239 ..Default::default()
1240 },
1241 ])
1242 .await;
1243
1244 let row = rs.next().await.expect("Expected a row")?;
1245 assert_eq!(row.raw_values()[0].0, string_val("a"));
1246
1247 assert!(rs.next().await.is_none());
1249
1250 Ok(())
1251 }
1252
1253 #[tokio_test_no_panics]
1254 async fn result_set_early_termination_not_cancelled() -> anyhow::Result<()> {
1255 let mut mock = MockSpanner::new();
1256 let (tx, rx) = tokio::sync::mpsc::channel(10);
1257
1258 mock.expect_execute_streaming_sql()
1259 .return_once(move |_request| Ok(Response::from(rx)));
1260
1261 mock.expect_create_session().returning(|_| {
1262 Ok(Response::new(Session {
1263 name: "session".to_string(),
1264 multiplexed: true,
1265 ..Default::default()
1266 }))
1267 });
1268
1269 let (address, _server) = start("127.0.0.1:0", mock).await?;
1270
1271 let client: Spanner = Spanner::builder()
1272 .with_endpoint(address)
1273 .with_credentials(Anonymous::new().build())
1274 .build()
1275 .await?;
1276
1277 let db_client = client.database_client("db").build().await?;
1278 let tx_single = db_client.single_use().build();
1279
1280 tx.send(Ok(PartialResultSet {
1282 metadata: metadata(2),
1283 values: vec![string_val("a"), string_val("b")],
1284 last: true,
1285 ..Default::default()
1286 }))
1287 .await
1288 .expect("Failed to send first message");
1289
1290 let mut rs: ResultSet = tx_single.execute_query("SELECT 1").await?;
1291
1292 let row = rs.next().await.expect("Expected a row")?;
1294 assert_eq!(row.raw_values()[0].0, string_val("a"));
1295
1296 assert!(rs.next().await.is_none());
1299 drop(rs);
1300 tx.closed().await;
1301
1302 let send_result = tx
1304 .send(Ok(PartialResultSet {
1305 values: vec![string_val("c"), string_val("d")],
1306 ..Default::default()
1307 }))
1308 .await;
1309
1310 assert!(send_result.is_err(), "Expected stream to be cancelled");
1311
1312 Ok(())
1313 }
1314
1315 #[tokio_test_no_panics]
1316 async fn test_result_set_chunked_values_string() {
1317 let mut rs = run_mock_query(vec![
1318 PartialResultSet {
1319 metadata: metadata(1),
1320 values: vec![string_val("hello ")],
1321 chunked_value: true,
1322 resume_token: vec![],
1323 stats: None,
1324 precommit_token: None,
1325 last: false,
1326 cache_update: None,
1327 },
1328 PartialResultSet {
1329 metadata: None,
1330 values: vec![string_val("world")],
1331 chunked_value: false,
1332 resume_token: vec![],
1333 stats: None,
1334 precommit_token: None,
1335 last: true,
1336 cache_update: None,
1337 },
1338 ])
1339 .await;
1340
1341 let row = rs.next().await.unwrap().unwrap();
1342 assert_eq!(row.raw_values().len(), 1);
1343 if let Some(prost_types::value::Kind::StringValue(ref s)) = row.raw_values()[0].0.kind {
1344 assert_eq!(s, "hello world");
1345 } else {
1346 panic!("Expected StringValue");
1347 }
1348 assert!(rs.next().await.is_none());
1349 }
1350
1351 #[tokio_test_no_panics]
1352 async fn test_result_set_chunked_values_list() {
1353 let mut rs = run_mock_query(vec![
1354 PartialResultSet {
1355 metadata: metadata(1),
1356 values: vec![list_val(vec![string_val("A")])],
1357 chunked_value: true,
1358 resume_token: vec![],
1359 stats: None,
1360 precommit_token: None,
1361 last: false,
1362 cache_update: None,
1363 },
1364 PartialResultSet {
1365 metadata: None,
1366 values: vec![list_val(vec![string_val("B")])],
1367 chunked_value: false,
1368 resume_token: vec![],
1369 stats: None,
1370 precommit_token: None,
1371 last: true,
1372 cache_update: None,
1373 },
1374 ])
1375 .await;
1376
1377 let row = rs.next().await.unwrap().unwrap();
1378 assert_eq!(row.raw_values().len(), 1);
1379 if let Some(prost_types::value::Kind::ListValue(ref l)) = row.raw_values()[0].0.kind {
1380 assert_eq!(l.values.len(), 1);
1381 if let Some(prost_types::value::Kind::StringValue(ref s)) = l.values[0].kind {
1382 assert_eq!(s, "AB");
1383 } else {
1384 panic!("Expected StringValue");
1385 }
1386 } else {
1387 panic!("Expected ListValue");
1388 }
1389 assert!(rs.next().await.is_none());
1390 }
1391
1392 #[tokio_test_no_panics]
1393 async fn test_multi_response_chunking_bool_array() {
1394 fn bool_val(b: bool) -> Value {
1395 Value {
1396 kind: Some(prost_types::value::Kind::BoolValue(b)),
1397 }
1398 }
1399 fn null_val() -> Value {
1400 Value {
1401 kind: Some(prost_types::value::Kind::NullValue(0)),
1402 }
1403 }
1404
1405 let mut rs = run_mock_query(vec![
1406 PartialResultSet {
1407 metadata: metadata(1),
1408 values: vec![
1409 list_val(vec![bool_val(true)]),
1410 list_val(vec![bool_val(false), null_val(), bool_val(true)]),
1411 ],
1412 chunked_value: true,
1413 resume_token: vec![],
1414 stats: None,
1415 precommit_token: None,
1416 cache_update: None,
1417 last: false,
1418 },
1419 PartialResultSet {
1420 metadata: None,
1421 values: vec![list_val(vec![bool_val(true), bool_val(true)])],
1422 chunked_value: true,
1423 resume_token: vec![],
1424 stats: None,
1425 precommit_token: None,
1426 cache_update: None,
1427 last: false,
1428 },
1429 PartialResultSet {
1430 metadata: None,
1431 values: vec![
1432 list_val(vec![null_val(), null_val(), bool_val(false)]),
1433 list_val(vec![bool_val(true)]),
1434 ],
1435 chunked_value: false,
1436 resume_token: vec![],
1437 stats: None,
1438 precommit_token: None,
1439 cache_update: None,
1440 last: true,
1441 },
1442 ])
1443 .await;
1444
1445 let row1 = rs.next().await.unwrap().unwrap();
1446 assert_eq!(row1.raw_values()[0].0, list_val(vec![bool_val(true)]));
1447
1448 let row2 = rs.next().await.unwrap().unwrap();
1449 assert_eq!(
1450 row2.raw_values()[0].0,
1451 list_val(vec![
1452 bool_val(false),
1453 null_val(),
1454 bool_val(true),
1455 bool_val(true),
1456 bool_val(true),
1457 null_val(),
1458 null_val(),
1459 bool_val(false)
1460 ])
1461 );
1462
1463 let row3 = rs.next().await.unwrap().unwrap();
1464 assert_eq!(row3.raw_values()[0].0, list_val(vec![bool_val(true)]));
1465
1466 assert!(rs.next().await.is_none());
1467 }
1468
1469 #[tokio_test_no_panics]
1470 async fn test_multi_response_chunking_int64_array() {
1471 fn null_val() -> Value {
1472 Value {
1473 kind: Some(prost_types::value::Kind::NullValue(0)),
1474 }
1475 }
1476
1477 let mut rs = run_mock_query(vec![
1478 PartialResultSet {
1479 metadata: metadata(1),
1480 values: vec![
1481 list_val(vec![string_val("10")]),
1482 list_val(vec![string_val("1"), string_val("2"), null_val()]),
1483 ],
1484 chunked_value: true,
1485 resume_token: vec![],
1486 stats: None,
1487 precommit_token: None,
1488 cache_update: None,
1489 last: false,
1490 },
1491 PartialResultSet {
1492 metadata: None,
1493 values: vec![list_val(vec![null_val(), string_val("5")])],
1494 chunked_value: true,
1495 resume_token: vec![],
1496 stats: None,
1497 precommit_token: None,
1498 cache_update: None,
1499 last: false,
1500 },
1501 PartialResultSet {
1502 metadata: None,
1503 values: vec![
1504 list_val(vec![null_val(), string_val("7"), string_val("8")]),
1505 list_val(vec![string_val("20")]),
1506 ],
1507 chunked_value: false,
1508 resume_token: vec![],
1509 stats: None,
1510 precommit_token: None,
1511 cache_update: None,
1512 last: true,
1513 },
1514 ])
1515 .await;
1516
1517 let row1 = rs.next().await.unwrap().unwrap();
1518 assert_eq!(row1.raw_values()[0].0, list_val(vec![string_val("10")]));
1519
1520 let row2 = rs.next().await.unwrap().unwrap();
1521 assert_eq!(
1522 row2.raw_values()[0].0,
1523 list_val(vec![
1524 string_val("1"),
1525 string_val("2"),
1526 null_val(),
1527 null_val(),
1528 string_val("5"),
1529 null_val(),
1530 string_val("7"),
1531 string_val("8")
1532 ])
1533 );
1534
1535 let row3 = rs.next().await.unwrap().unwrap();
1536 assert_eq!(row3.raw_values()[0].0, list_val(vec![string_val("20")]));
1537
1538 assert!(rs.next().await.is_none());
1539 }
1540
1541 #[tokio_test_no_panics]
1542 async fn test_result_set_precommit_token_tracked() -> anyhow::Result<()> {
1543 let token = MultiplexedSessionPrecommitToken {
1544 precommit_token: b"test_token".to_vec(),
1545 seq_num: 99,
1546 };
1547 let results = vec![PartialResultSet {
1548 metadata: metadata(1),
1549 precommit_token: Some(token.clone()),
1550 last: true,
1551 ..Default::default()
1552 }];
1553
1554 let mut mock = MockSpanner::new();
1555 let rx = adapt(results.into_iter().map(Ok));
1556 mock.expect_execute_streaming_sql()
1557 .return_once(move |_request| Ok(Response::from(rx)));
1558
1559 mock.expect_create_session().returning(|_| {
1560 Ok(Response::new(Session {
1561 name: "session".to_string(),
1562 multiplexed: true,
1563 ..Default::default()
1564 }))
1565 });
1566
1567 let (address, _server) = start("127.0.0.1:0", mock)
1568 .await
1569 .expect("Failed to start mock server");
1570
1571 let client: Spanner = Spanner::builder()
1572 .with_endpoint(address)
1573 .with_credentials(Anonymous::new().build())
1574 .build()
1575 .await
1576 .expect("Failed to build client");
1577
1578 let db_client: crate::database_client::DatabaseClient =
1579 client.database_client("db").build().await.unwrap();
1580
1581 let tracker = PrecommitTokenTracker::new();
1582
1583 let req = crate::model::ExecuteSqlRequest::default()
1584 .set_session("session".to_string())
1585 .set_sql("SELECT 1".to_string());
1586
1587 let stream = db_client
1588 .spanner
1589 .execute_streaming_sql(req.clone(), GaxRequestOptions::default(), 0)
1590 .send()
1591 .await?;
1592
1593 let mut rs = ResultSet::create(ResultSetParams {
1594 stream,
1595 transaction_selector: None,
1596 precommit_token_tracker: tracker.clone(),
1597 client: db_client,
1598 session_name: "session".to_string(),
1599 transaction_tag: None,
1600 operation: StreamOperation::Query(req),
1601 channel_hint: 0,
1602 gax_options: GaxRequestOptions::default(),
1603 })
1604 .await?;
1605
1606 assert!(
1608 rs.next().await.is_none(),
1609 "Expected no rows, but received one"
1610 );
1611
1612 let tracked_token = tracker.get().expect("token should be tracked");
1614 assert_eq!(tracked_token.seq_num, 99);
1615 assert_eq!(
1616 tracked_token.precommit_token,
1617 bytes::Bytes::from("test_token")
1618 );
1619
1620 Ok(())
1621 }
1622
1623 #[tokio_test_no_panics]
1624 async fn test_result_set_retry_simple() -> anyhow::Result<()> {
1625 let mut mock = MockSpanner::new();
1626 let mut seq = mockall::Sequence::new();
1627
1628 mock.expect_execute_streaming_sql()
1629 .times(1)
1630 .in_sequence(&mut seq)
1631 .returning(|_request| {
1632 let stream = adapt([
1633 Ok(PartialResultSet {
1634 metadata: metadata(1),
1635 values: vec![string_val("row1")],
1636 resume_token: b"token1".to_vec(),
1637 ..Default::default()
1638 }),
1639 Err(Status::unavailable("Transient error")),
1640 ]);
1641 Ok(Response::from(stream))
1642 });
1643
1644 mock.expect_execute_streaming_sql()
1645 .times(1)
1646 .in_sequence(&mut seq)
1647 .returning(|_request| {
1648 let stream = adapt([Ok(PartialResultSet {
1649 values: vec![string_val("row2")],
1650 resume_token: b"token2".to_vec(),
1651 last: true,
1652 ..Default::default()
1653 })]);
1654 Ok(Response::from(stream))
1655 });
1656
1657 mock.expect_create_session().returning(|_| {
1658 Ok(Response::new(Session {
1659 name: "session".to_string(),
1660 multiplexed: true,
1661 ..Default::default()
1662 }))
1663 });
1664
1665 let (address, _server) = start("127.0.0.1:0", mock).await?;
1666
1667 let client: Spanner = Spanner::builder()
1668 .with_endpoint(address)
1669 .with_credentials(Anonymous::new().build())
1670 .build()
1671 .await?;
1672
1673 let db_client = client.database_client("db").build().await?;
1674 let tx = db_client.single_use().build();
1675 let mut mock_backoff = MockBackoffPolicy::new();
1676 mock_backoff
1677 .expect_on_failure()
1678 .returning(|_| Duration::from_nanos(1));
1679
1680 let stmt = Statement::builder("SELECT 1")
1681 .with_backoff_policy(mock_backoff)
1682 .build();
1683 let mut rs = tx.execute_query(stmt).await?;
1684
1685 let row1 = rs.next().await.expect("Stream ended unexpectedly")?;
1686 assert_eq!(row1.raw_values()[0].0, string_val("row1"));
1687
1688 let row2 = rs.next().await.expect("Stream ended unexpectedly")?;
1689 assert_eq!(row2.raw_values()[0].0, string_val("row2"));
1690
1691 assert!(rs.next().await.is_none());
1692
1693 Ok(())
1694 }
1695
1696 #[tokio_test_no_panics]
1697 async fn test_result_set_retry_non_retriable_error() -> anyhow::Result<()> {
1698 let mut mock = MockSpanner::new();
1699 mock.expect_execute_streaming_sql()
1700 .times(1)
1701 .returning(|_request| {
1702 let stream = adapt([
1703 Ok(PartialResultSet {
1704 metadata: metadata(1),
1705 values: vec![string_val("row1")],
1706 resume_token: b"token1".to_vec(),
1707 ..Default::default()
1708 }),
1709 Err(Status::invalid_argument("Non-retriable error")),
1710 ]);
1711 Ok(Response::from(stream))
1712 });
1713
1714 mock.expect_create_session().returning(|_| {
1715 Ok(Response::new(Session {
1716 name: "session".to_string(),
1717 multiplexed: true,
1718 ..Default::default()
1719 }))
1720 });
1721
1722 let (address, _server) = start("127.0.0.1:0", mock).await?;
1723
1724 let client: Spanner = Spanner::builder()
1725 .with_endpoint(address)
1726 .with_credentials(Anonymous::new().build())
1727 .build()
1728 .await?;
1729
1730 let db_client = client.database_client("db").build().await?;
1731 let tx = db_client.single_use().build();
1732 let mut rs = tx.execute_query("SELECT 1").await?;
1733
1734 let row1 = rs.next().await.expect("Stream ended unexpectedly")?;
1735 assert_eq!(row1.raw_values()[0].0, string_val("row1"));
1736
1737 let res = rs.next().await;
1738 assert!(res.is_some(), "Expected an error but got None");
1739 let res = res.expect("Expected some response but got None");
1740 assert!(res.is_err(), "Expected an error but got Ok");
1741 let err_str = res.expect_err("Expected should be an error").to_string();
1742 assert!(
1743 err_str.contains("Non-retriable error"),
1744 "Expected error to contain 'Non-retriable error', but got '{}'",
1745 err_str
1746 );
1747
1748 Ok(())
1749 }
1750
1751 #[tokio_test_no_panics]
1752 async fn test_result_set_buffer_overflow() -> anyhow::Result<()> {
1753 let mut mock = MockSpanner::new();
1754 let (tx_msg, rx_msg) = tokio::sync::mpsc::channel(10);
1755 mock.expect_execute_streaming_sql()
1756 .times(1)
1758 .return_once(move |_request| Ok(Response::from(rx_msg)));
1759
1760 mock.expect_create_session().returning(|_| {
1761 Ok(Response::new(Session {
1762 name: "session".to_string(),
1763 multiplexed: true,
1764 ..Default::default()
1765 }))
1766 });
1767
1768 let (address, _server) = start("127.0.0.1:0", mock).await?;
1769
1770 let client: Spanner = Spanner::builder()
1771 .with_endpoint(address)
1772 .with_credentials(Anonymous::new().build())
1773 .build()
1774 .await?;
1775
1776 let db_client = client.database_client("db").build().await?;
1777 let tx = db_client.single_use().build();
1778
1779 tx_msg
1780 .send(Ok(PartialResultSet {
1781 metadata: metadata(1),
1782 values: vec![string_val("row1")],
1783 resume_token: b"token1".to_vec(),
1784 ..Default::default()
1785 }))
1786 .await?;
1787
1788 let mut rs = tx.execute_query("SELECT 1").await?;
1789
1790 rs.set_max_buffered_partial_result_sets(2);
1792
1793 tx_msg
1794 .send(Ok(PartialResultSet {
1795 values: vec![string_val("row2")],
1796 ..Default::default()
1797 }))
1798 .await?;
1799 tx_msg
1800 .send(Ok(PartialResultSet {
1801 values: vec![string_val("row3")],
1802 ..Default::default()
1803 }))
1804 .await?;
1805 tx_msg
1806 .send(Ok(PartialResultSet {
1807 values: vec![string_val("row4")],
1808 ..Default::default()
1809 }))
1810 .await?;
1811 tx_msg
1812 .send(Err(Status::unavailable("Unavailable error")))
1813 .await?;
1814
1815 let row1 = rs.next().await.expect("Expected row1")?;
1817 assert_eq!(row1.raw_values()[0].0, string_val("row1"));
1818
1819 let row2 = rs.next().await.expect("Expected row2")?;
1821 assert_eq!(row2.raw_values()[0].0, string_val("row2"));
1822
1823 let res = rs.next().await;
1825 assert!(res.is_some(), "Expected an error but got None");
1826 let res = res.expect("Expected some response but got None");
1827 assert!(res.is_err(), "Expected an error but got Ok");
1828 let err_str = res.expect_err("Expected should be an error").to_string();
1829 assert!(
1830 err_str.contains("Unavailable error"),
1831 "Expected error to contain 'Unavailable error', but got '{}'",
1832 err_str
1833 );
1834
1835 Ok(())
1836 }
1837
1838 #[tokio_test_no_panics]
1839 async fn test_result_set_retry_missing_resume_token_safe() -> anyhow::Result<()> {
1840 let mut mock = MockSpanner::new();
1841 let mut seq = mockall::Sequence::new();
1842
1843 mock.expect_execute_streaming_sql()
1844 .times(1)
1845 .in_sequence(&mut seq)
1846 .returning(|_request| {
1847 let stream = adapt([
1848 Ok(PartialResultSet {
1849 metadata: metadata(1),
1850 values: vec![string_val("row1")],
1851 ..Default::default()
1853 }),
1854 Err(Status::unavailable("Unavailable error")),
1855 ]);
1856 Ok(Response::from(stream))
1857 });
1858
1859 mock.expect_execute_streaming_sql()
1860 .times(1)
1861 .in_sequence(&mut seq)
1862 .returning(|_request| {
1863 let stream = adapt([Ok(PartialResultSet {
1864 metadata: metadata(1),
1865 values: vec![string_val("row1_retry")],
1866 resume_token: b"token_retry".to_vec(),
1867 ..Default::default()
1868 })]);
1869 Ok(Response::from(stream))
1870 });
1871
1872 mock.expect_create_session().returning(|_| {
1873 Ok(Response::new(Session {
1874 name: "session".to_string(),
1875 multiplexed: true,
1876 ..Default::default()
1877 }))
1878 });
1879
1880 let (address, _server) = start("127.0.0.1:0", mock).await?;
1881
1882 let client: Spanner = Spanner::builder()
1883 .with_endpoint(address)
1884 .with_credentials(Anonymous::new().build())
1885 .build()
1886 .await?;
1887
1888 let db_client = client.database_client("db").build().await?;
1889 let tx = db_client.single_use().build();
1890 let mut mock_backoff = MockBackoffPolicy::new();
1891 mock_backoff
1892 .expect_on_failure()
1893 .returning(|_| Duration::from_nanos(1));
1894
1895 let stmt = Statement::builder("SELECT 1")
1896 .with_backoff_policy(mock_backoff)
1897 .build();
1898 let mut rs = tx.execute_query(stmt).await?;
1899
1900 let row1 = rs.next().await.expect("Expected row1")?;
1901 assert_eq!(row1.raw_values()[0].0, string_val("row1_retry"));
1902
1903 Ok(())
1904 }
1905
1906 #[tokio_test_no_panics]
1907 async fn test_result_set_retry_under_limit_no_resume_token() -> anyhow::Result<()> {
1908 let mut mock = MockSpanner::new();
1909 let mut seq = mockall::Sequence::new();
1910
1911 mock.expect_execute_streaming_sql()
1913 .times(1)
1914 .in_sequence(&mut seq)
1915 .returning(|_request| {
1916 let stream = adapt([
1917 Ok(PartialResultSet {
1918 metadata: metadata(1),
1919 values: vec![string_val("row1")],
1920 ..Default::default()
1921 }),
1922 Ok(PartialResultSet {
1923 values: vec![string_val("row2")],
1924 ..Default::default()
1925 }),
1926 Err(Status::unavailable("Unavailable error")),
1927 ]);
1928 Ok(Response::from(stream))
1929 });
1930
1931 mock.expect_execute_streaming_sql()
1934 .times(1)
1935 .in_sequence(&mut seq)
1936 .returning(|request| {
1937 assert!(
1938 request.get_ref().resume_token.is_empty(),
1939 "Expected empty resume token for retry"
1940 );
1941 let stream = adapt([Ok(PartialResultSet {
1942 metadata: metadata(1),
1943 values: vec![string_val("row1_retry")],
1944 resume_token: b"token_retry".to_vec(),
1945 ..Default::default()
1946 })]);
1947 Ok(Response::from(stream))
1948 });
1949
1950 mock.expect_create_session().returning(|_| {
1951 Ok(Response::new(Session {
1952 name: "session".to_string(),
1953 multiplexed: true,
1954 ..Default::default()
1955 }))
1956 });
1957
1958 let (address, _server) = start("127.0.0.1:0", mock).await?;
1959
1960 let client: Spanner = Spanner::builder()
1961 .with_endpoint(address)
1962 .with_credentials(Anonymous::new().build())
1963 .build()
1964 .await?;
1965
1966 let db_client = client.database_client("db").build().await?;
1967 let tx = db_client.single_use().build();
1968 let mut mock_backoff = MockBackoffPolicy::new();
1969 mock_backoff
1970 .expect_on_failure()
1971 .returning(|_| Duration::from_nanos(1));
1972
1973 let stmt = Statement::builder("SELECT 1")
1974 .with_backoff_policy(mock_backoff)
1975 .build();
1976 let mut rs = tx.execute_query(stmt).await?;
1977
1978 rs.set_max_buffered_partial_result_sets(3);
1980
1981 let row1 = rs.next().await.expect("Expected row1")?;
1986 assert_eq!(row1.raw_values()[0].0, string_val("row1_retry"));
1987
1988 Ok(())
1989 }
1990
1991 #[tokio_test_no_panics]
1992 async fn test_result_set_retry_limit_exceeded() -> anyhow::Result<()> {
1993 let mut mock = MockSpanner::new();
1994
1995 mock.expect_execute_streaming_sql()
1996 .times(11) .returning(|_request| {
1998 let stream = adapt([Err(Status::unavailable("Unavailable error"))]);
1999 Ok(Response::from(stream))
2000 });
2001
2002 mock.expect_create_session().returning(|_| {
2003 Ok(Response::new(Session {
2004 name: "session".to_string(),
2005 multiplexed: true,
2006 ..Default::default()
2007 }))
2008 });
2009
2010 let (address, _server) = start("127.0.0.1:0", mock).await?;
2011
2012 let client: Spanner = Spanner::builder()
2013 .with_endpoint(address)
2014 .with_credentials(Anonymous::new().build())
2015 .build()
2016 .await?;
2017
2018 let db_client = client.database_client("db").build().await?;
2019 let tx = db_client.single_use().build();
2020 let mut mock_backoff = MockBackoffPolicy::new();
2021 mock_backoff
2022 .expect_on_failure()
2023 .times(10)
2024 .returning(|_| Duration::from_nanos(1));
2025
2026 let stmt = Statement::builder("SELECT 1")
2027 .with_backoff_policy(mock_backoff)
2028 .build();
2029 let res = tx.execute_query(stmt).await;
2030
2031 assert!(res.is_err(), "Expected an error but got Ok");
2032 let err_str = res.expect_err("Expected should be an error").to_string();
2033 assert!(
2034 err_str.contains("Unavailable error"),
2035 "Expected error to contain 'Unavailable error', but got '{}'",
2036 err_str
2037 );
2038
2039 Ok(())
2040 }
2041
2042 #[tokio_test_no_panics]
2043 async fn result_set_inline_begin_stream_error_fallback() -> anyhow::Result<()> {
2044 let mut mock = MockSpanner::new();
2045 let mut seq = mockall::Sequence::new();
2046
2047 mock.expect_execute_streaming_sql()
2050 .times(1)
2051 .in_sequence(&mut seq)
2052 .returning(|_request| {
2053 let stream = adapt([Err(Status::invalid_argument("Invalid query"))]);
2054 Ok(Response::from(stream))
2055 });
2056
2057 mock.expect_begin_transaction()
2059 .times(1)
2060 .in_sequence(&mut seq)
2061 .returning(|_| {
2062 Ok(Response::new(spanner_v1::Transaction {
2063 id: vec![7, 8, 9],
2064 read_timestamp: Some(prost_types::Timestamp {
2065 seconds: 123456789,
2066 nanos: 0,
2067 }),
2068 ..Default::default()
2069 }))
2070 });
2071
2072 mock.expect_execute_streaming_sql()
2074 .times(1)
2075 .in_sequence(&mut seq)
2076 .returning(|req| {
2077 let req = req.into_inner();
2078 match req.transaction.unwrap().selector.unwrap() {
2080 spanner_v1::transaction_selector::Selector::Id(id) => {
2081 assert_eq!(id, vec![7, 8, 9]);
2082 }
2083 _ => panic!("Expected Selector::Id"),
2084 }
2085
2086 let stream = adapt([Ok(PartialResultSet {
2087 metadata: metadata(1),
2088 values: vec![string_val("1")],
2089 ..Default::default()
2090 })]);
2091 Ok(Response::from(stream))
2092 });
2093
2094 mock.expect_create_session().returning(|_| {
2095 Ok(Response::new(Session {
2096 name: "session".to_string(),
2097 multiplexed: true,
2098 ..Default::default()
2099 }))
2100 });
2101
2102 let (address, _server) = start("127.0.0.1:0", mock).await?;
2103
2104 let client: Spanner = Spanner::builder()
2105 .with_endpoint(address)
2106 .with_credentials(Anonymous::new().build())
2107 .build()
2108 .await?;
2109
2110 let db_client = client.database_client("db").build().await?;
2111
2112 let tx = db_client
2113 .read_only_transaction()
2114 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2115 .build()
2116 .await?;
2117 let mut rs = tx.execute_query("SELECT 1").await?;
2118
2119 let row1 = rs.next().await.ok_or_else(|| {
2120 anyhow::anyhow!("Expected row returned successfully despite stream breaking")
2121 })??;
2122 assert_eq!(
2123 row1.raw_values()[0].0,
2124 string_val("1"),
2125 "Verify the returned stream successfully resumed with the correct payload"
2126 );
2127
2128 Ok(())
2129 }
2130
2131 #[tokio_test_no_panics]
2132 async fn result_set_retry_inline_begin_transient_error() -> anyhow::Result<()> {
2133 let mut mock = MockSpanner::new();
2134 let mut seq = mockall::Sequence::new();
2135
2136 mock.expect_execute_streaming_sql()
2138 .times(1)
2139 .in_sequence(&mut seq)
2140 .returning(|_request| {
2141 let stream = adapt([Err(Status::unavailable("Transient network issue"))]);
2142 Ok(Response::from(stream))
2143 });
2144
2145 mock.expect_execute_streaming_sql()
2148 .times(1)
2149 .in_sequence(&mut seq)
2150 .returning(|req| {
2151 let req = req.into_inner();
2152 match req.transaction.unwrap().selector.unwrap() {
2153 spanner_v1::transaction_selector::Selector::Begin(_) => {}
2154 _ => panic!("Expected Selector::Begin on stream retry"),
2155 }
2156
2157 let mut meta = metadata(1).unwrap();
2158 meta.transaction = Some(spanner_v1::Transaction {
2159 id: vec![7, 8, 9],
2160 read_timestamp: None,
2161 ..Default::default()
2162 });
2163
2164 let stream = adapt([Ok(PartialResultSet {
2165 metadata: Some(meta),
2166 values: vec![string_val("1")],
2167 ..Default::default()
2168 })]);
2169 Ok(Response::from(stream))
2170 });
2171
2172 mock.expect_create_session().returning(|_| {
2173 Ok(Response::new(Session {
2174 name: "session".to_string(),
2175 multiplexed: true,
2176 ..Default::default()
2177 }))
2178 });
2179
2180 let (address, _server) = start("127.0.0.1:0", mock).await?;
2181
2182 let client: Spanner = Spanner::builder()
2183 .with_endpoint(address)
2184 .with_credentials(Anonymous::new().build())
2185 .build()
2186 .await?;
2187
2188 let db_client = client.database_client("db").build().await?;
2189
2190 let tx = db_client
2191 .read_only_transaction()
2192 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2193 .build()
2194 .await?;
2195 let mut rs = tx.execute_query("SELECT 1").await?;
2196
2197 let row1 = rs
2198 .next()
2199 .await
2200 .ok_or_else(|| anyhow::anyhow!("Expected stream to recover safely"))??;
2201 assert_eq!(
2202 row1.raw_values()[0].0,
2203 string_val("1"),
2204 "Verify resumed stream returns data"
2205 );
2206
2207 Ok(())
2208 }
2209
2210 #[tokio_test_no_panics]
2211 async fn result_set_retry_inline_begin_id_recovered() -> anyhow::Result<()> {
2212 let mut mock = MockSpanner::new();
2213 let mut seq = mockall::Sequence::new();
2214
2215 mock.expect_execute_streaming_sql()
2217 .times(1)
2218 .in_sequence(&mut seq)
2219 .returning(|_request| {
2220 let mut meta = metadata(1).unwrap();
2221 meta.transaction = Some(spanner_v1::Transaction {
2222 id: vec![7, 8, 9],
2223 read_timestamp: None,
2224 ..Default::default()
2225 });
2226 let stream = adapt([
2227 Ok(PartialResultSet {
2228 metadata: Some(meta),
2229 values: vec![string_val("1")],
2230 resume_token: b"token1".to_vec(),
2231 ..Default::default()
2232 }),
2233 Err(Status::unavailable("Transient mid-stream network issue")),
2234 ]);
2235 Ok(Response::from(stream))
2236 });
2237
2238 mock.expect_execute_streaming_sql()
2240 .times(1)
2241 .in_sequence(&mut seq)
2242 .returning(|req| {
2243 let req = req.into_inner();
2244 match req.transaction.unwrap().selector.unwrap() {
2245 spanner_v1::transaction_selector::Selector::Id(id) => {
2246 assert_eq!(id, vec![7, 8, 9]);
2247 }
2248 _ => panic!("Expected Selector::Id on stream retry"),
2249 }
2250
2251 let stream = adapt([Ok(PartialResultSet {
2252 values: vec![string_val("2")],
2253 ..Default::default()
2254 })]);
2255 Ok(Response::from(stream))
2256 });
2257
2258 mock.expect_create_session().returning(|_| {
2259 Ok(Response::new(Session {
2260 name: "session".to_string(),
2261 multiplexed: true,
2262 ..Default::default()
2263 }))
2264 });
2265
2266 let (address, _server) = start("127.0.0.1:0", mock).await?;
2267
2268 let client: Spanner = Spanner::builder()
2269 .with_endpoint(address)
2270 .with_credentials(Anonymous::new().build())
2271 .build()
2272 .await?;
2273
2274 let db_client = client.database_client("db").build().await?;
2275
2276 let tx = db_client
2277 .read_only_transaction()
2278 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2279 .build()
2280 .await?;
2281 let mut rs = tx.execute_query("SELECT 1").await?;
2282
2283 let row1 = rs
2284 .next()
2285 .await
2286 .ok_or_else(|| anyhow::anyhow!("Expected stream row1 extracted"))??;
2287 assert_eq!(
2288 row1.raw_values()[0].0,
2289 string_val("1"),
2290 "Verified chunk 1 payload"
2291 );
2292 let row2 = rs
2293 .next()
2294 .await
2295 .ok_or_else(|| anyhow::anyhow!("Expected stream row2 recovered"))??;
2296 assert_eq!(
2297 row2.raw_values()[0].0,
2298 string_val("2"),
2299 "Verified chunk 2 reboot dynamically intercepted ID bounds correctly"
2300 );
2301
2302 Ok(())
2303 }
2304
2305 #[tokio_test_no_panics]
2306 async fn result_set_inline_begin_metadata_missing_transaction_fails() -> anyhow::Result<()> {
2307 let mut mock = MockSpanner::new();
2308 let mut seq = mockall::Sequence::new();
2309
2310 mock.expect_execute_streaming_sql()
2312 .times(1)
2313 .in_sequence(&mut seq)
2314 .returning(|_request| {
2315 let stream = adapt([Ok(PartialResultSet {
2316 metadata: metadata(1), values: vec![string_val("1")],
2318 ..Default::default()
2319 })]);
2320 Ok(Response::from(stream))
2321 });
2322
2323 mock.expect_create_session().returning(|_| {
2324 Ok(Response::new(Session {
2325 name: "session".to_string(),
2326 multiplexed: true,
2327 ..Default::default()
2328 }))
2329 });
2330
2331 let (address, _server) = start("127.0.0.1:0", mock).await?;
2332
2333 let client: Spanner = Spanner::builder()
2334 .with_endpoint(address)
2335 .with_credentials(Anonymous::new().build())
2336 .build()
2337 .await?;
2338
2339 let db_client = client.database_client("db").build().await?;
2340
2341 let tx = db_client
2343 .read_only_transaction()
2344 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2345 .build()
2346 .await?;
2347 let err = tx
2348 .execute_query("SELECT 1")
2349 .await
2350 .expect_err("Expected eager validation error");
2351 assert!(
2352 err.to_string()
2353 .contains("failed to return a transaction ID"),
2354 "Caught implicit gap boundary: {}",
2355 err
2356 );
2357
2358 Ok(())
2359 }
2360
2361 #[tokio_test_no_panics]
2362 async fn result_set_stats() -> anyhow::Result<()> {
2363 let mock_stats = spanner_v1::ResultSetStats {
2364 query_plan: Some(spanner_v1::QueryPlan::default()),
2365 ..Default::default()
2366 };
2367
2368 let mut rs = run_mock_query(vec![PartialResultSet {
2369 metadata: metadata(2),
2370 values: vec![string_val("a"), string_val("b")],
2371 last: true,
2372 stats: Some(mock_stats),
2373 ..Default::default()
2374 }])
2375 .await;
2376
2377 rs.next().await.transpose()?;
2378
2379 let received_stats = rs.stats().expect("stats should be available");
2380 assert!(received_stats.query_plan.is_some());
2381
2382 Ok(())
2383 }
2384
2385 #[tokio_test_no_panics]
2386 async fn result_set_update_count() -> anyhow::Result<()> {
2387 let mock_stats = spanner_v1::ResultSetStats {
2388 row_count: Some(RowCount::RowCountExact(42_i64)),
2389 ..Default::default()
2390 };
2391
2392 let mut result_set = run_mock_query(vec![PartialResultSet {
2393 metadata: metadata(2),
2394 values: vec![string_val("a"), string_val("b")],
2395 last: true,
2396 stats: Some(mock_stats),
2397 ..Default::default()
2398 }])
2399 .await;
2400
2401 result_set.next().await.transpose()?;
2402
2403 let update_count = result_set
2404 .update_count()
2405 .expect("Expected update count to be populated");
2406 assert_eq!(update_count, 42, "Expected exactly 42 rows updated");
2407
2408 Ok(())
2409 }
2410
2411 #[tokio_test_no_panics]
2412 async fn result_set_duplicate_stats() -> anyhow::Result<()> {
2413 let mock_stats = spanner_v1::ResultSetStats {
2414 query_plan: Some(spanner_v1::QueryPlan::default()),
2415 ..Default::default()
2416 };
2417
2418 let mut rs = run_mock_query(vec![
2419 PartialResultSet {
2420 metadata: metadata(2),
2421 values: vec![string_val("a"), string_val("b")],
2422 stats: Some(mock_stats.clone()),
2423 resume_token: b"token1".to_vec(),
2424 ..Default::default()
2425 },
2426 PartialResultSet {
2427 values: vec![string_val("c"), string_val("d")],
2428 stats: Some(mock_stats),
2429 last: true,
2430 resume_token: b"token2".to_vec(),
2431 ..Default::default()
2432 },
2433 ])
2434 .await;
2435
2436 let next = rs.next().await;
2438 assert!(next.is_some());
2439 assert!(next.expect("should yield a row").is_ok());
2440
2441 let res2 = rs.next().await;
2443 assert!(res2.is_some());
2444 let res2 = res2.expect("should yield an error");
2445 assert!(res2.is_err());
2446 let err_str = res2.expect_err("should be an error").to_string();
2447 assert!(err_str.contains("Additional stats received after first"));
2448
2449 Ok(())
2450 }
2451
2452 #[tokio_test_no_panics]
2453 async fn test_lazy_begin_deadlock_fixed() -> anyhow::Result<()> {
2454 let mut mock = MockSpanner::new();
2455 let mut seq = mockall::Sequence::new();
2456
2457 mock.expect_execute_streaming_sql()
2459 .times(1)
2460 .in_sequence(&mut seq)
2461 .returning(|_request| {
2462 let mut meta = metadata(1).expect("failed to create metadata");
2463 meta.transaction = Some(spanner_v1::Transaction {
2464 id: b"lazy_tx_id".to_vec(),
2465 ..Default::default()
2466 });
2467 let rx = adapt(
2468 vec![Ok(PartialResultSet {
2469 metadata: Some(meta),
2470 values: vec![string_val("1")],
2471 ..Default::default()
2472 })]
2473 .into_iter(),
2474 );
2475 Ok(Response::from(rx))
2476 });
2477
2478 mock.expect_execute_streaming_sql()
2480 .times(1)
2481 .in_sequence(&mut seq)
2482 .returning(|req| {
2483 let req = req.into_inner();
2484 let selector = req
2485 .transaction
2486 .expect("missing transaction component")
2487 .selector
2488 .expect("missing selector component");
2489
2490 match selector {
2491 spanner_v1::transaction_selector::Selector::Id(id) => {
2492 assert_eq!(id, b"lazy_tx_id".to_vec());
2493 }
2494 _ => panic!("Expected Selector::Id"),
2495 }
2496
2497 let rx = adapt(
2498 vec![Ok(PartialResultSet {
2499 metadata: metadata(1),
2500 values: vec![string_val("2")],
2501 ..Default::default()
2502 })]
2503 .into_iter(),
2504 );
2505 Ok(Response::from(rx))
2506 });
2507
2508 mock.expect_create_session().returning(|_| {
2509 Ok(Response::new(Session {
2510 name: "session".to_string(),
2511 multiplexed: true,
2512 ..Default::default()
2513 }))
2514 });
2515
2516 let (address, _server) = start("127.0.0.1:0", mock).await?;
2517
2518 let client: Spanner = Spanner::builder()
2519 .with_endpoint(address)
2520 .with_credentials(Anonymous::new().build())
2521 .build()
2522 .await?;
2523
2524 let db_client = client.database_client("db").build().await?;
2525
2526 let tx = db_client
2528 .read_only_transaction()
2529 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2530 .build()
2531 .await?;
2532
2533 let _rs = tx.execute_query("SELECT 1").await?;
2535
2536 let mut rs2 = tx.execute_query("SELECT 2").await?;
2538
2539 let row2 = rs2.next().await;
2541 assert!(
2542 row2.is_some(),
2543 "Implicit deadlock encountered; query 2 stalled"
2544 );
2545
2546 Ok(())
2547 }
2548
2549 #[tokio_test_no_panics]
2550 async fn test_result_set_metadata_not_available() {
2551 let res = run_mock_query_fallible(vec![PartialResultSet {
2553 metadata: None,
2554 values: vec![string_val("1")],
2555 ..Default::default()
2556 }])
2557 .await;
2558
2559 let err = res.expect_err("Expected query initialization to fail eagerly");
2560 assert!(
2561 err.to_string()
2562 .contains("First PartialResultSet did not contain metadata"),
2563 "Expected missing metadata safeguard error, got: {}",
2564 err
2565 );
2566 }
2567
2568 #[tokio_test_no_panics]
2569 async fn test_result_set_metadata_available_before_next() -> anyhow::Result<()> {
2570 let mut mock = MockSpanner::new();
2571
2572 mock.expect_execute_streaming_sql().returning(|_request| {
2574 let rx = adapt(
2575 vec![Ok(PartialResultSet {
2576 metadata: metadata(1),
2577 values: vec![string_val("1")],
2578 ..Default::default()
2579 })]
2580 .into_iter(),
2581 );
2582 Ok(Response::from(rx))
2583 });
2584
2585 mock.expect_create_session().returning(|_| {
2586 Ok(Response::new(Session {
2587 name: "session".to_string(),
2588 multiplexed: true,
2589 ..Default::default()
2590 }))
2591 });
2592
2593 let (address, _server) = start("127.0.0.1:0", mock).await?;
2594
2595 let client: Spanner = Spanner::builder()
2596 .with_endpoint(address)
2597 .with_credentials(Anonymous::new().build())
2598 .build()
2599 .await?;
2600
2601 let db_client = client.database_client("db").build().await?;
2602 let tx = db_client.single_use().build();
2603
2604 let mut rs = tx.execute_query("SELECT 1").await?;
2605
2606 let metadata = rs.metadata().expect("metadata available");
2608 assert_eq!(metadata.column_names().len(), 1);
2609 assert_eq!(metadata.column_names()[0], "col0");
2610
2611 let row = rs.next().await;
2613 assert!(row.is_some());
2614
2615 Ok(())
2616 }
2617}