1use crate::database_client::DatabaseClient;
16use crate::error::internal_error;
17use crate::google::spanner::v1::{self, PartialResultSet};
18use crate::model::ResultSetStats;
19use crate::model::result_set_stats::RowCount;
20use crate::precommit::PrecommitTokenTracker;
21use crate::read_only_transaction::{ReadContextTransactionSelector, TransactionState};
22use crate::result_set_metadata::ResultSetMetadata;
23use crate::retry_policy::SpannerRetryPolicy;
24use crate::row::Row;
25use crate::server_streaming::stream::PartialResultSetStream;
26use bytes::Bytes;
27use gaxi::prost::FromProto;
28use google_cloud_gax::backoff_policy::BackoffPolicy;
29use google_cloud_gax::exponential_backoff::ExponentialBackoffBuilder;
30use google_cloud_gax::options::RequestOptions as GaxRequestOptions;
31use google_cloud_gax::retry_policy::RetryPolicyExt;
32use google_cloud_gax::retry_result::RetryResult;
33use google_cloud_gax::retry_state::RetryState;
34use std::collections::VecDeque;
35use std::mem::take;
36use std::sync::Arc;
37use std::time::Duration;
38use tokio::runtime::Handle;
39use tokio::time::{sleep, timeout};
40
41#[cfg(feature = "unstable-stream")]
42use futures::Stream;
43
44#[derive(Debug)]
59pub struct ResultSet {
60 stream: Option<PartialResultSetStream>,
61 buffered_values: Vec<prost_types::Value>,
62 chunked: bool,
63 seen_last: bool,
64 ready_rows: VecDeque<Row>,
65 local_metadata: Option<ResultSetMetadata>,
66 stats: Option<ResultSetStats>,
67 precommit_token_tracker: PrecommitTokenTracker,
68 tokio_handle: Option<Handle>,
69
70 client: DatabaseClient,
72 session_name: String,
73 transaction_tag: Option<String>,
74 operation: StreamOperation,
75 last_resume_token: Bytes,
76 partial_result_sets_buffer: VecDeque<PartialResultSet>,
77 safe_to_retry: bool,
78 max_buffered_partial_result_sets: usize,
79 retry_count: usize,
80 transaction_selector: Option<ReadContextTransactionSelector>,
81 channel_hint: usize,
82 gax_options: GaxRequestOptions,
83}
84
85#[derive(Debug, Clone)]
86pub(crate) enum StreamOperation {
87 Query(crate::model::ExecuteSqlRequest),
88 Read(crate::model::ReadRequest),
89}
90
91pub(crate) struct ResultSetParams {
92 pub stream: PartialResultSetStream,
93 pub transaction_selector: Option<ReadContextTransactionSelector>,
94 pub precommit_token_tracker: PrecommitTokenTracker,
95 pub client: DatabaseClient,
96 pub session_name: String,
97 pub transaction_tag: Option<String>,
98 pub operation: StreamOperation,
99 pub channel_hint: usize,
100 pub gax_options: GaxRequestOptions,
101}
102
103const MAX_BUFFERED_PARTIAL_RESULT_SETS: usize = 10;
107
108impl ResultSet {
109 pub(crate) async fn create(params: ResultSetParams) -> crate::Result<Self> {
111 let mut result_set = Self::new(params);
112 result_set.init_stream().await?;
113 Ok(result_set)
114 }
115
116 fn new(params: ResultSetParams) -> Self {
118 let ResultSetParams {
119 stream,
120 transaction_selector,
121 precommit_token_tracker,
122 client,
123 session_name,
124 transaction_tag,
125 operation,
126 channel_hint,
127 gax_options,
128 } = params;
129
130 let gax_options = Self::apply_defaults(gax_options);
131
132 Self {
133 stream: Some(stream),
134 buffered_values: Vec::new(),
135 chunked: false,
136 seen_last: false,
137 ready_rows: VecDeque::new(),
138 local_metadata: None,
139 stats: None,
140 precommit_token_tracker,
141 client,
142 session_name,
143 transaction_tag,
144 operation,
145 last_resume_token: Bytes::new(),
146 partial_result_sets_buffer: VecDeque::new(),
147 safe_to_retry: true,
148 max_buffered_partial_result_sets: MAX_BUFFERED_PARTIAL_RESULT_SETS,
149 retry_count: 0,
150 transaction_selector,
151 channel_hint,
152 gax_options,
153 tokio_handle: Handle::try_current().ok(),
154 }
155 }
156
157 fn apply_defaults(mut gax_options: GaxRequestOptions) -> GaxRequestOptions {
158 if gax_options.retry_policy().is_none() {
159 gax_options.set_retry_policy(SpannerRetryPolicy::new().with_attempt_limit(10));
160 }
161 if gax_options.backoff_policy().is_none() {
162 gax_options.set_backoff_policy(Self::default_backoff_policy());
163 }
164 gax_options
165 }
166
167 fn default_backoff_policy() -> Arc<dyn BackoffPolicy> {
168 Arc::new(ExponentialBackoffBuilder::default().clamp())
169 }
170
171 async fn init_stream(&mut self) -> crate::Result<()> {
172 loop {
175 let stream_result = match &mut self.stream {
176 Some(s) => s.next_message().await,
177 None => {
178 return Err(internal_error(
179 "Query stream ended without metadata or error",
180 ));
181 }
182 };
183
184 match stream_result {
185 Some(Ok(partial_result_set)) => {
186 self.handle_partial_result_set(partial_result_set)?;
187 return Ok(());
188 }
189 Some(Err(e)) => {
190 self.handle_stream_error(e).await?;
191 }
192 None => {
193 return Err(internal_error(
194 "Query stream ended without metadata or error",
195 ));
196 }
197 }
198 }
199 }
200
201 pub fn metadata(&self) -> Option<&ResultSetMetadata> {
217 self.local_metadata.as_ref()
218 }
219
220 pub fn stats(&self) -> Option<&ResultSetStats> {
241 self.stats.as_ref()
242 }
243
244 pub fn update_count(&self) -> Option<i64> {
273 self.stats.as_ref().and_then(|s| {
274 s.row_count.as_ref().map(|rc| match rc {
275 RowCount::RowCountExact(c) => *c,
276 RowCount::RowCountLowerBound(c) => *c,
277 })
278 })
279 }
280
281 pub async fn next(&mut self) -> Option<crate::Result<Row>> {
297 loop {
298 if let Some(row) = self.ready_rows.pop_front() {
299 return Some(Ok(row));
300 }
301
302 if self.seen_last {
303 if let Some(handle) = &self.tokio_handle
304 && let Some(s) = self.stream.take()
305 {
306 drain_stream_in_background(handle, s);
307 }
308 return None;
309 }
310
311 let stream_result = match &mut self.stream {
312 Some(s) => s.next_message().await,
313 None => return None,
314 };
315
316 match stream_result {
317 Some(Ok(partial_result_set)) => {
318 if let Err(e) = self.handle_partial_result_set(partial_result_set) {
319 return Some(Err(e));
320 }
321 }
322 Some(Err(e)) => {
323 if let Err(err) = self.handle_stream_error(e).await {
324 return Some(Err(err));
325 }
326 }
327 None => match self.handle_stream_end() {
328 Ok(Some(row)) => return Some(Ok(row)),
329 Ok(None) => return None,
330 Err(e) => return Some(Err(e)),
331 },
332 }
333 }
334 }
335
336 #[cfg(feature = "unstable-stream")]
359 pub fn into_stream(self) -> impl Stream<Item = crate::Result<Row>> + Unpin {
360 use futures::stream::unfold;
361 Box::pin(unfold(self, |mut result_set| async move {
362 result_set.next().await.map(|row| (row, result_set))
363 }))
364 }
365}
366
367impl ResultSet {
368 fn handle_partial_result_set(
369 &mut self,
370 mut partial_result_set: PartialResultSet,
371 ) -> crate::Result<()> {
372 self.precommit_token_tracker.update(
373 partial_result_set
374 .precommit_token
375 .clone()
376 .map(|t| t.cnv().expect("failed to convert precommit token")),
377 );
378
379 if partial_result_set.last {
380 self.seen_last = true;
381 }
382
383 match (
384 self.local_metadata.as_ref(),
385 partial_result_set.metadata.take(),
386 ) {
387 (Some(_), None) => {}
388 (None, None) => {
389 return Err(internal_error(
390 "First PartialResultSet did not contain metadata",
391 ));
392 }
393 (Some(_), Some(_)) => {
394 return Err(internal_error("Additional metadata after first result set"));
395 }
396 (None, Some(m)) => {
397 self.handle_metadata(m)?;
398 }
399 }
400
401 if !partial_result_set.resume_token.is_empty() {
406 self.last_resume_token = partial_result_set.resume_token.clone();
407 self.safe_to_retry = true;
408 self.partial_result_sets_buffer
409 .push_back(partial_result_set);
410 self.flush_buffer()?;
411 return Ok(());
412 }
413
414 if self.partial_result_sets_buffer.len() >= self.max_buffered_partial_result_sets {
417 self.safe_to_retry = false;
420 if let Some(oldest) = self.partial_result_sets_buffer.pop_front() {
421 self.process_partial_result_set(oldest)?;
422 }
423 }
424 self.partial_result_sets_buffer
425 .push_back(partial_result_set);
426
427 if self.seen_last {
428 self.flush_buffer()?;
429 if self.chunked {
430 return Err(crate::error::internal_error(
431 "Stream ended with chunked_value=true",
432 ));
433 }
434 }
435
436 Ok(())
437 }
438
439 fn handle_metadata(&mut self, mut m: v1::ResultSetMetadata) -> crate::Result<()> {
440 let transaction = m.transaction.take();
441 let meta = ResultSetMetadata::new(Some(m));
442 if let Some(selector) = &self.transaction_selector {
443 if let Some(transaction) = transaction {
444 selector.update(
445 transaction.id,
446 transaction
447 .read_timestamp
448 .and_then(|t| wkt::Timestamp::new(t.seconds, t.nanos).ok()),
449 )?;
450 } else if let ReadContextTransactionSelector::Lazy(lazy) = selector {
451 let is_started = matches!(
452 &*lazy.lock().expect("transaction state mutex poisoned"),
453 TransactionState::Started(_, _)
454 );
455 if !is_started {
456 return Err(internal_error(
457 "Spanner failed to return a transaction ID for a query that included a BeginTransaction option",
458 ));
459 }
460 }
461 }
462 self.local_metadata = Some(meta);
463 Ok(())
464 }
465 async fn handle_stream_error(&mut self, e: crate::Error) -> crate::Result<()> {
466 let mut e = e;
467 if self.safe_to_retry {
468 match self.check_retry(e) {
469 Ok(()) => {
470 self.retry_count += 1;
471 self.partial_result_sets_buffer.clear();
474
475 if let Some(policy) = self.gax_options.backoff_policy() {
477 let state = RetryState::new(self.safe_to_retry)
478 .set_attempt_count(self.retry_count as u32);
479 let delay = policy.on_failure(&state);
480 sleep(delay).await;
481 }
482
483 self.restart_stream().await?;
484 return Ok(());
485 }
486 Err(err) => {
487 e = err;
488 }
489 }
490 }
491
492 let Some(ReadContextTransactionSelector::Lazy(lazy)) = &self.transaction_selector else {
496 return Err(e);
497 };
498 let is_started = matches!(
499 &*lazy.lock().unwrap(),
500 crate::read_only_transaction::TransactionState::Started(_, _)
501 );
502 if is_started {
503 return Err(e);
504 }
505
506 self.transaction_selector
507 .as_ref()
508 .unwrap()
509 .begin_explicitly(crate::read_only_transaction::ExplicitBeginParams {
510 client: self.client.clone(),
511 session_name: self.session_name.clone(),
512 transaction_tag: self.transaction_tag.clone(),
513 channel_hint: self.channel_hint,
514 request_options: self.gax_options.clone(),
515 is_stream_fallback: true,
516 precommit_token_tracker: self.precommit_token_tracker.clone(),
517 mutation_key: None,
518 })
519 .await?;
520
521 self.partial_result_sets_buffer.clear();
522 self.restart_stream().await?;
523 Ok(())
524 }
525
526 fn handle_stream_end(&mut self) -> crate::Result<Option<Row>> {
527 if !self.partial_result_sets_buffer.is_empty() {
532 self.flush_buffer()?;
533 }
534 if self.chunked {
535 return Err(crate::error::internal_error(
537 "Stream ended with chunked_value=true",
538 ));
539 }
540 if let Some(row) = self.ready_rows.pop_front() {
541 return Ok(Some(row));
542 }
543 Ok(None)
544 }
545
546 fn flush_buffer(&mut self) -> crate::Result<()> {
547 let mut buffer_to_flush = take(&mut self.partial_result_sets_buffer);
548 while let Some(partial_result_set) = buffer_to_flush.pop_front() {
549 self.process_partial_result_set(partial_result_set)?;
550 }
551 Ok(())
552 }
553
554 fn process_partial_result_set(
555 &mut self,
556 partial_result_set: PartialResultSet,
557 ) -> crate::Result<()> {
558 let PartialResultSet {
559 stats,
560 values,
561 chunked_value,
562 ..
563 } = partial_result_set;
564
565 match (&self.stats, stats) {
566 (Some(_), Some(_)) => {
567 return Err(internal_error("Additional stats received after first"));
568 }
569 (None, Some(s)) => {
570 let converted_stats = s
571 .cnv()
572 .map_err(|e| internal_error(format!("failed to convert stats: {}", e)))?;
573 self.stats = Some(converted_stats);
574 }
575 _ => {}
576 }
577
578 if values.is_empty() {
579 return Ok(());
580 }
581 let metadata = self.local_metadata.as_ref().ok_or_else(|| {
582 internal_error("PartialResultSet contained values but no metadata was provided")
583 })?;
584 if metadata.column_types.is_empty() {
585 return Err(internal_error(
586 "PartialResultSet contained values but no column metadata was provided",
587 ));
588 }
589
590 let mut values_iter = values.into_iter();
591 if self.chunked
592 && let Some(last_val) = self.buffered_values.last_mut()
593 && let Some(first_new) = values_iter.next()
594 {
595 merge_values(last_val, first_new)?;
596 }
597
598 self.buffered_values.extend(values_iter);
599 self.chunked = chunked_value;
600
601 while self.buffered_values.len() >= metadata.column_types.len() {
602 let column_count = metadata.column_types.len();
603 if self.buffered_values.len() == column_count && self.chunked {
604 break;
605 }
606
607 let row_values: Vec<crate::value::Value> = self
608 .buffered_values
609 .drain(..column_count)
610 .map(crate::value::Value)
611 .collect();
612 self.ready_rows.push_back(Row {
613 values: row_values,
614 metadata: metadata.clone(),
615 });
616 }
617 Ok(())
618 }
619
620 async fn restart_stream(&mut self) -> crate::Result<()> {
621 if let Some(s) = &self.transaction_selector {
625 s.maybe_reset_starting();
626 }
627
628 let transaction_selector = if let Some(s) = &self.transaction_selector {
630 Some(s.selector().await?)
631 } else {
632 None
633 };
634
635 if self.last_resume_token.is_empty() {
640 self.local_metadata = None;
641 }
642
643 match &mut self.operation {
644 StreamOperation::Query(req) => {
645 req.resume_token = self.last_resume_token.clone();
646 req.transaction = transaction_selector
647 .clone()
648 .or_else(|| req.transaction.take());
649 let stream = self
650 .client
651 .spanner
652 .execute_streaming_sql(req.clone(), self.gax_options.clone(), self.channel_hint)
653 .send()
654 .await?;
655 self.stream = Some(stream);
656 }
657 StreamOperation::Read(req) => {
658 req.resume_token = self.last_resume_token.clone();
659 req.transaction = transaction_selector
660 .clone()
661 .or_else(|| req.transaction.take());
662 let stream = self
663 .client
664 .spanner
665 .streaming_read(req.clone(), self.gax_options.clone(), self.channel_hint)
666 .send()
667 .await?;
668 self.stream = Some(stream);
669 }
670 }
671 Ok(())
672 }
673
674 fn check_retry(&self, e: crate::Error) -> Result<(), crate::Error> {
675 if let Some(policy) = self.gax_options.retry_policy() {
676 let state =
677 RetryState::new(self.safe_to_retry).set_attempt_count(self.retry_count as u32);
678
679 match policy.on_error(&state, e) {
680 RetryResult::Continue(_) => return Ok(()),
681 RetryResult::Permanent(err) | RetryResult::Exhausted(err) => return Err(err),
682 }
683 }
684 Err(e)
685 }
686}
687
688impl Drop for ResultSet {
689 fn drop(&mut self) {
690 if self.seen_last
700 && let Some(handle) = &self.tokio_handle
701 && let Some(s) = self.stream.take()
702 {
703 drain_stream_in_background(handle, s);
704 }
705 }
706}
707
708fn drain_stream_in_background(handle: &Handle, mut stream: PartialResultSetStream) {
709 handle.spawn(async move {
710 let _ = timeout(Duration::from_secs(5), async move {
711 while let Some(Ok(_)) = stream.next_message().await {}
712 })
713 .await;
714 });
715}
716
717fn merge_values(target: &mut prost_types::Value, source: prost_types::Value) -> crate::Result<()> {
728 use prost_types::value::Kind;
729 match (&mut target.kind, source.kind) {
730 (Some(Kind::StringValue(s)), Some(Kind::StringValue(source_s))) => {
731 s.push_str(&source_s);
732 Ok(())
733 }
734 (Some(Kind::ListValue(target_list)), Some(Kind::ListValue(mut source_list))) => {
735 if source_list.values.is_empty() {
736 return Ok(());
737 }
738 if target_list.values.is_empty() {
739 target_list.values = source_list.values;
740 return Ok(());
741 }
742
743 let source_first = source_list.values.remove(0);
744 if let Some(target_last) = target_list.values.last_mut() {
745 match (&target_last.kind, &source_first.kind) {
746 (Some(Kind::StringValue(_)), Some(Kind::StringValue(_)))
747 | (Some(Kind::ListValue(_)), Some(Kind::ListValue(_))) => {
748 merge_values(target_last, source_first)?;
749 }
750 _ => {
751 target_list.values.push(source_first);
752 }
753 }
754 } else {
755 target_list.values.push(source_first);
756 }
757 target_list.values.extend(source_list.values);
758 Ok(())
759 }
760 _ => Err(internal_error(
764 "Incompatible types for merging chunked values",
765 )),
766 }
767}
768
769#[cfg(test)]
770impl ResultSet {
771 pub(crate) fn set_max_buffered_partial_result_sets(&mut self, limit: usize) {
772 self.max_buffered_partial_result_sets = limit;
773 }
774}
775
776#[cfg(test)]
777pub(crate) mod tests {
778 use super::*;
779 use crate::client::Spanner;
780 use crate::key::KeySet;
781 use crate::read::ReadRequest;
782 use crate::statement::Statement;
783 use crate::transaction::BeginTransactionOption;
784 use gaxi::grpc::tonic::{Code as GrpcCode, MetadataMap, Response, Status};
785 use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
786 use google_cloud_gax::backoff_policy::BackoffPolicy;
787 use google_cloud_gax::retry_policy::{Aip194Strict, RetryPolicyExt};
788 use google_cloud_gax::retry_state::RetryState;
789 use google_cloud_test_macros::tokio_test_no_panics;
790 use prost_types::Value;
791 use spanner_grpc_mock::MockSpanner;
792 use spanner_grpc_mock::google::spanner::v1 as spanner_v1;
793 use spanner_grpc_mock::google::spanner::v1::struct_type::Field;
794 use spanner_grpc_mock::google::spanner::v1::{
795 MultiplexedSessionPrecommitToken, PartialResultSet, ResultSetMetadata, Session, StructType,
796 };
797 use spanner_grpc_mock::start;
798 use spanner_v1::result_set_stats::RowCount;
799 use std::time::Duration;
800
801 mockall::mock! {
802 #[derive(Debug)]
803 BackoffPolicy {}
804 impl BackoffPolicy for BackoffPolicy {
805 fn on_failure(&self, state: &RetryState) -> Duration;
806 }
807 }
808
809 pub(crate) fn string_val(s: &str) -> Value {
810 Value {
811 kind: Some(prost_types::value::Kind::StringValue(s.to_string())),
812 }
813 }
814
815 fn list_val(vals: Vec<Value>) -> Value {
816 Value {
817 kind: Some(prost_types::value::Kind::ListValue(
818 prost_types::ListValue { values: vals },
819 )),
820 }
821 }
822
823 fn metadata(cols: usize) -> Option<ResultSetMetadata> {
824 let mut fields = vec![];
825 for i in 0..cols {
826 fields.push(Field {
827 name: format!("col{}", i),
828 r#type: None,
829 });
830 }
831 Some(ResultSetMetadata {
832 row_type: Some(StructType { fields }),
833 transaction: None,
834 undeclared_parameters: None,
835 })
836 }
837
838 pub(crate) fn adapt<I, T>(items: I) -> tokio::sync::mpsc::Receiver<T>
839 where
840 I: IntoIterator<Item = T>,
841 I::IntoIter: ExactSizeIterator,
842 {
843 let items = items.into_iter();
844 let (tx, rx) = tokio::sync::mpsc::channel(items.len().max(1));
845 for i in items {
846 tx.try_send(i)
847 .expect("can't fail, we allocated enough capacity.");
848 }
849 rx
850 }
851
852 async fn run_mock_query(results: Vec<PartialResultSet>) -> ResultSet {
853 run_mock_query_fallible(results).await.unwrap()
854 }
855
856 async fn run_mock_query_fallible(results: Vec<PartialResultSet>) -> crate::Result<ResultSet> {
857 let mut mock = MockSpanner::new();
858 let rx = adapt(results.into_iter().map(Ok));
859 mock.expect_execute_streaming_sql()
860 .return_once(move |_request| Ok(Response::from(rx)));
861
862 mock.expect_create_session().returning(|_| {
863 Ok(Response::new(Session {
864 name: "session".to_string(),
865 multiplexed: true,
866 ..Default::default()
867 }))
868 });
869
870 let (address, _server) = start("127.0.0.1:0", mock)
871 .await
872 .expect("Failed to start mock server");
873
874 let client: Spanner = Spanner::builder()
875 .with_endpoint(address)
876 .with_credentials(Anonymous::new().build())
877 .build()
878 .await
879 .expect("Failed to build client");
880
881 let db_client: crate::database_client::DatabaseClient =
882 client.database_client("db").build().await.unwrap();
883 let tx: crate::read_only_transaction::SingleUseReadOnlyTransaction =
884 db_client.single_use().build();
885 tx.execute_query("SELECT 1").await
886 }
887
888 #[test]
889 fn test_auto_traits() {
890 static_assertions::assert_impl_all!(ResultSet: std::fmt::Debug, Send, Sync);
891 }
892
893 #[tokio_test_no_panics]
894 async fn test_result_set_zero_rows() {
895 let mut rs = run_mock_query(vec![PartialResultSet {
896 metadata: metadata(2),
897 values: vec![],
898 chunked_value: false,
899 resume_token: vec![],
900 stats: None,
901 precommit_token: None,
902 last: true,
903 cache_update: None,
904 }])
905 .await;
906
907 let next = rs.next().await;
908 assert!(next.is_none());
909 }
910
911 #[tokio_test_no_panics]
912 async fn test_result_set_metadata() -> anyhow::Result<()> {
913 let mut rs = run_mock_query(vec![PartialResultSet {
914 metadata: metadata(2),
915 values: vec![string_val("a"), string_val("b")],
916 last: true,
917 ..Default::default()
918 }])
919 .await;
920
921 let meta = rs.metadata().expect("metadata available");
923 assert_eq!(
924 meta.column_names(),
925 &["col0".to_string(), "col1".to_string()]
926 );
927
928 let _next = rs.next().await.expect("Expected a row")?;
930
931 let meta = rs.metadata().expect("metadata available");
933 assert_eq!(
934 meta.column_names(),
935 &["col0".to_string(), "col1".to_string()]
936 );
937
938 Ok(())
939 }
940
941 #[tokio_test_no_panics]
942 async fn test_result_set_handle_partial_result_set_error() -> anyhow::Result<()> {
943 let res = run_mock_query_fallible(vec![PartialResultSet {
944 values: vec![string_val("row1")],
945 ..Default::default()
946 }])
947 .await;
948
949 assert!(res.is_err(), "Expected an error but got Ok");
950 let err_str = res.expect_err("Expected should be an error").to_string();
951 assert!(
952 err_str.contains("First PartialResultSet did not contain metadata"),
953 "Expected error to contain 'First PartialResultSet did not contain metadata', but got '{}'",
954 err_str
955 );
956
957 Ok(())
958 }
959
960 #[tokio_test_no_panics]
961 async fn test_result_set_handle_partial_result_set_error_immediate() -> anyhow::Result<()> {
962 let res = run_mock_query_fallible(vec![
963 PartialResultSet {
964 values: vec![string_val("row1")],
965 ..Default::default()
966 },
967 PartialResultSet {
968 resume_token: b"token".to_vec(),
969 ..Default::default()
970 },
971 ])
972 .await;
973
974 assert!(res.is_err(), "Expected an error but got Ok");
975 let err_str = res.expect_err("Expected should be an error").to_string();
976 assert!(
977 err_str.contains("First PartialResultSet did not contain metadata"),
978 "Expected error to contain 'First PartialResultSet did not contain metadata', but got '{}'",
979 err_str
980 );
981
982 Ok(())
983 }
984
985 #[tokio_test_no_panics]
986 async fn test_result_set_stream_ended_with_chunked_value() -> anyhow::Result<()> {
987 let mut rs = run_mock_query(vec![PartialResultSet {
988 metadata: metadata(2),
989 values: vec![string_val("a")],
990 chunked_value: true,
991 ..Default::default()
992 }])
993 .await;
994
995 let res = rs.next().await;
996 assert!(res.is_some(), "Expected an error but got None");
997 let res = res.expect("Expected some response but got None");
998 assert!(res.is_err(), "Expected an error but got Ok");
999 let err_str = res.expect_err("Expected should be an error").to_string();
1000 assert!(
1001 err_str.contains("Stream ended with chunked_value=true"),
1002 "Expected error to contain 'Stream ended with chunked_value=true', but got '{}'",
1003 err_str
1004 );
1005
1006 Ok(())
1007 }
1008
1009 #[tokio_test_no_panics]
1010 async fn test_result_set_duplicate_metadata() -> anyhow::Result<()> {
1011 let mut rs = run_mock_query(vec![
1012 PartialResultSet {
1013 metadata: metadata(2),
1014 values: vec![string_val("a"), string_val("b")],
1015 resume_token: b"token1".to_vec(),
1016 ..Default::default()
1017 },
1018 PartialResultSet {
1019 metadata: metadata(2),
1020 values: vec![string_val("c"), string_val("d")],
1021 ..Default::default()
1022 },
1023 ])
1024 .await;
1025
1026 rs.next().await.expect("Expected a row")?;
1027
1028 let res2 = rs.next().await;
1029 assert!(res2.is_some(), "Expected an error but got None");
1030 let res2 = res2.expect("Expected some response but got None");
1031 assert!(res2.is_err(), "Expected an error but got Ok");
1032 let err_str = res2.expect_err("Expected should be an error").to_string();
1033 assert!(
1034 err_str.contains("Additional metadata after first result set"),
1035 "Expected error to contain 'Additional metadata after first result set', but got '{}'",
1036 err_str
1037 );
1038
1039 Ok(())
1040 }
1041
1042 #[tokio_test_no_panics]
1043 async fn test_result_set_empty_column_metadata() -> anyhow::Result<()> {
1044 let mut rs = run_mock_query(vec![PartialResultSet {
1045 metadata: Some(ResultSetMetadata {
1046 row_type: Some(StructType { fields: vec![] }),
1047 ..Default::default()
1048 }),
1049 values: vec![string_val("a")],
1050 ..Default::default()
1051 }])
1052 .await;
1053
1054 let res = rs.next().await;
1055 assert!(res.is_some(), "Expected an error but got None");
1056 let res = res.expect("Expected some response but got None");
1057 assert!(res.is_err(), "Expected an error but got Ok");
1058 let err_str = res.expect_err("Expected should be an error").to_string();
1059 assert!(
1060 err_str
1061 .contains("PartialResultSet contained values but no column metadata was provided"),
1062 "Expected error to contain 'PartialResultSet contained values but no column metadata was provided', but got '{}'",
1063 err_str
1064 );
1065
1066 Ok(())
1067 }
1068
1069 #[tokio_test_no_panics]
1070 async fn test_result_set_default_policies_applied() -> anyhow::Result<()> {
1071 let rs = run_mock_query(vec![PartialResultSet {
1072 metadata: metadata(2),
1073 last: true,
1074 ..Default::default()
1075 }])
1076 .await;
1077
1078 assert!(
1079 rs.gax_options.retry_policy().is_some(),
1080 "Default retry policy should be applied"
1081 );
1082 assert!(
1083 rs.gax_options.backoff_policy().is_some(),
1084 "Default backoff policy should be applied"
1085 );
1086
1087 Ok(())
1088 }
1089
1090 #[tokio_test_no_panics]
1091 async fn test_result_set_retry_read_stream() -> anyhow::Result<()> {
1092 let mut mock = MockSpanner::new();
1093 let mut seq = mockall::Sequence::new();
1094
1095 mock.expect_streaming_read()
1096 .times(1)
1097 .in_sequence(&mut seq)
1098 .returning(|_request| {
1099 let stream = adapt([
1100 Ok(PartialResultSet {
1101 metadata: metadata(2),
1102 values: vec![string_val("row1"), string_val("b")],
1103 resume_token: b"token1".to_vec(),
1104 ..Default::default()
1105 }),
1106 Err(Status::unavailable("Unavailable error")),
1107 ]);
1108 Ok(Response::from(stream))
1109 });
1110
1111 mock.expect_streaming_read()
1112 .times(1)
1113 .in_sequence(&mut seq)
1114 .returning(|_request| {
1115 let stream = adapt([Ok(PartialResultSet {
1116 values: vec![string_val("row2"), string_val("d")],
1117 resume_token: b"token2".to_vec(),
1118 last: true,
1119 ..Default::default()
1120 })]);
1121 Ok(Response::from(stream))
1122 });
1123
1124 mock.expect_create_session().returning(|_| {
1125 Ok(Response::new(Session {
1126 name: "session".to_string(),
1127 multiplexed: true,
1128 ..Default::default()
1129 }))
1130 });
1131
1132 let (address, _server) = start("127.0.0.1:0", mock).await?;
1133
1134 let client: Spanner = Spanner::builder()
1135 .with_endpoint(address)
1136 .with_credentials(Anonymous::new().build())
1137 .build()
1138 .await?;
1139
1140 let db_client = client.database_client("db").build().await?;
1141 let tx = db_client.single_use().build();
1142 let mut mock_backoff = MockBackoffPolicy::new();
1143 mock_backoff
1144 .expect_on_failure()
1145 .returning(|_| Duration::from_nanos(1));
1146
1147 let read_req = crate::read::ReadRequest::builder("table", vec!["Id", "Value"])
1148 .with_keys(crate::key::KeySet::all())
1149 .with_backoff_policy(mock_backoff)
1150 .build();
1151 let mut rs: ResultSet = tx.execute_read(read_req).await?;
1152
1153 let row1 = rs.next().await.expect("Stream ended unexpectedly")?;
1154 assert_eq!(row1.raw_values()[0].0, string_val("row1"));
1155
1156 let row2 = rs.next().await.expect("Stream ended unexpectedly")?;
1157 assert_eq!(row2.raw_values()[0].0, string_val("row2"));
1158
1159 assert!(rs.next().await.is_none());
1160
1161 Ok(())
1162 }
1163
1164 #[tokio_test_no_panics]
1165 async fn test_result_set_custom_retry_policy() -> anyhow::Result<()> {
1166 let retry_policy = Aip194Strict.continue_on_too_many_requests();
1168
1169 let mut mock = MockSpanner::new();
1170 let mut seq = mockall::Sequence::new();
1171
1172 mock.expect_streaming_read()
1174 .times(1)
1175 .in_sequence(&mut seq)
1176 .returning(|_request| {
1177 let stream = adapt([
1178 Ok(PartialResultSet {
1179 metadata: metadata(2),
1180 values: vec![string_val("row1"), string_val("b")],
1181 resume_token: b"token1".to_vec(),
1182 ..Default::default()
1183 }),
1184 Err(Status::new(GrpcCode::ResourceExhausted, "Quota exceeded")),
1185 ]);
1186 Ok(Response::from(stream))
1187 });
1188
1189 mock.expect_streaming_read()
1191 .times(1)
1192 .in_sequence(&mut seq)
1193 .returning(|_request| {
1194 let stream = adapt([Ok(PartialResultSet {
1195 values: vec![string_val("row2"), string_val("d")],
1196 resume_token: b"token2".to_vec(),
1197 last: true,
1198 ..Default::default()
1199 })]);
1200 Ok(Response::from(stream))
1201 });
1202
1203 mock.expect_create_session().returning(|_| {
1204 Ok(Response::new(Session {
1205 name: "session".to_string(),
1206 multiplexed: true,
1207 ..Default::default()
1208 }))
1209 });
1210
1211 let (address, _server) = start("127.0.0.1:0", mock).await?;
1212
1213 let client: Spanner = Spanner::builder()
1214 .with_endpoint(address)
1215 .with_credentials(Anonymous::new().build())
1216 .build()
1217 .await?;
1218
1219 let db_client = client.database_client("db").build().await?;
1220 let tx = db_client.single_use().build();
1221
1222 let mut mock_backoff = MockBackoffPolicy::new();
1223 mock_backoff
1224 .expect_on_failure()
1225 .times(1)
1226 .returning(|_| Duration::from_nanos(1));
1227
1228 let read_req = ReadRequest::builder("table", vec!["Id", "Value"])
1229 .with_keys(KeySet::all())
1230 .with_retry_policy(retry_policy)
1231 .with_backoff_policy(mock_backoff)
1232 .build();
1233
1234 let mut rs: ResultSet = tx.execute_read(read_req).await?;
1235
1236 let row1 = rs.next().await.expect("Stream ended unexpectedly")?;
1237 assert_eq!(row1.raw_values()[0].0, string_val("row1"));
1238
1239 let row2 = rs.next().await.expect("Stream ended unexpectedly")?;
1241 assert_eq!(row2.raw_values()[0].0, string_val("row2"));
1242
1243 assert!(rs.next().await.is_none());
1244
1245 Ok(())
1246 }
1247
1248 #[tokio_test_no_panics]
1249 async fn test_result_set_transport_error_retry() -> anyhow::Result<()> {
1250 let mut mock = MockSpanner::new();
1251 let mut seq = mockall::Sequence::new();
1252
1253 mock.expect_streaming_read()
1255 .times(1)
1256 .in_sequence(&mut seq)
1257 .returning(|_request| {
1258 let mut status = Status::unavailable("connection reset");
1259 let mut headers = std::mem::take(status.metadata_mut()).into_headers();
1260 headers.insert("content-type", http::HeaderValue::from_static("text/html"));
1261 *status.metadata_mut() = MetadataMap::from_headers(headers);
1262 let stream = adapt([
1263 Ok(PartialResultSet {
1264 metadata: metadata(2),
1265 values: vec![string_val("row1"), string_val("b")],
1266 resume_token: b"token1".to_vec(),
1267 ..Default::default()
1268 }),
1269 Err(status),
1270 ]);
1271 Ok(Response::from(stream))
1272 });
1273
1274 mock.expect_streaming_read()
1276 .times(1)
1277 .in_sequence(&mut seq)
1278 .returning(|_request| {
1279 let stream = adapt([Ok(PartialResultSet {
1280 values: vec![string_val("row2"), string_val("d")],
1281 resume_token: b"token2".to_vec(),
1282 last: true,
1283 ..Default::default()
1284 })]);
1285 Ok(Response::from(stream))
1286 });
1287
1288 mock.expect_create_session().returning(|_| {
1289 Ok(Response::new(Session {
1290 name: "session".to_string(),
1291 multiplexed: true,
1292 ..Default::default()
1293 }))
1294 });
1295
1296 let (address, _server) = start("127.0.0.1:0", mock).await?;
1297
1298 let client: Spanner = Spanner::builder()
1299 .with_endpoint(address)
1300 .with_credentials(Anonymous::new().build())
1301 .build()
1302 .await?;
1303
1304 let db_client = client.database_client("db").build().await?;
1305 let tx = db_client.single_use().build();
1306
1307 let mut mock_backoff = MockBackoffPolicy::new();
1308 mock_backoff
1309 .expect_on_failure()
1310 .times(1)
1311 .returning(|_| Duration::from_nanos(1));
1312
1313 let read_req = ReadRequest::builder("table", vec!["Id", "Value"])
1314 .with_keys(KeySet::all())
1315 .with_backoff_policy(mock_backoff)
1316 .build();
1317
1318 let mut rs: ResultSet = tx.execute_read(read_req).await?;
1319
1320 let row1 = rs.next().await.expect("Stream ended unexpectedly")?;
1321 assert_eq!(
1322 row1.raw_values()[0].0,
1323 string_val("row1"),
1324 "Expected row1 to be read successfully before transport error"
1325 );
1326
1327 let row2 = rs.next().await.expect("Stream ended unexpectedly")?;
1329 assert_eq!(
1330 row2.raw_values()[0].0,
1331 string_val("row2"),
1332 "Expected stream to resume and return row2 after transport error retry"
1333 );
1334
1335 assert!(
1336 rs.next().await.is_none(),
1337 "Expected stream to end successfully"
1338 );
1339
1340 Ok(())
1341 }
1342
1343 #[tokio_test_no_panics]
1344 async fn test_result_set_one_row() {
1345 let mut rs = run_mock_query(vec![PartialResultSet {
1346 metadata: metadata(2),
1347 values: vec![string_val("a"), string_val("b")],
1348 chunked_value: false,
1349 resume_token: vec![],
1350 stats: None,
1351 precommit_token: None,
1352 last: true,
1353 cache_update: None,
1354 }])
1355 .await;
1356
1357 let row = rs.next().await.unwrap().unwrap();
1358 assert_eq!(row.raw_values().len(), 2);
1359 assert_eq!(row.raw_values()[0].0, string_val("a"));
1360 assert_eq!(row.raw_values()[1].0, string_val("b"));
1361
1362 assert!(rs.next().await.is_none());
1363 }
1364
1365 #[tokio_test_no_panics]
1366 async fn result_set_last_flag() -> anyhow::Result<()> {
1367 let mut rs = run_mock_query(vec![
1368 PartialResultSet {
1369 metadata: metadata(2),
1370 values: vec![string_val("a"), string_val("b")],
1371 last: true,
1372 ..Default::default()
1373 },
1374 PartialResultSet {
1378 values: vec![string_val("c"), string_val("d")],
1379 ..Default::default()
1380 },
1381 ])
1382 .await;
1383
1384 let row = rs.next().await.expect("Expected a row")?;
1385 assert_eq!(row.raw_values()[0].0, string_val("a"));
1386
1387 assert!(rs.next().await.is_none());
1389
1390 Ok(())
1391 }
1392
1393 #[tokio_test_no_panics]
1394 async fn result_set_last_flag_drained_in_background() -> anyhow::Result<()> {
1395 let mut mock = MockSpanner::new();
1396 let (tx, rx) = tokio::sync::mpsc::channel(10);
1397
1398 mock.expect_execute_streaming_sql()
1399 .return_once(move |_request| Ok(Response::from(rx)));
1400
1401 mock.expect_create_session().returning(|_| {
1402 Ok(Response::new(Session {
1403 name: "session".to_string(),
1404 multiplexed: true,
1405 ..Default::default()
1406 }))
1407 });
1408
1409 let (address, _server) = start("127.0.0.1:0", mock).await?;
1410
1411 let client: Spanner = Spanner::builder()
1412 .with_endpoint(address)
1413 .with_credentials(Anonymous::new().build())
1414 .build()
1415 .await?;
1416
1417 let db_client = client.database_client("db").build().await?;
1418 let tx_single = db_client.single_use().build();
1419
1420 tx.send(Ok(PartialResultSet {
1422 metadata: metadata(2),
1423 values: vec![string_val("a"), string_val("b")],
1424 last: true,
1425 ..Default::default()
1426 }))
1427 .await
1428 .expect("Failed to send first message");
1429
1430 let mut rs: ResultSet = tx_single.execute_query("SELECT 1").await?;
1431
1432 let row = rs.next().await.expect("Expected a row")?;
1434 assert_eq!(row.raw_values()[0].0, string_val("a"));
1435
1436 assert!(rs.next().await.is_none());
1438 drop(rs);
1439
1440 assert!(
1443 !tx.is_closed(),
1444 "Expected stream to remain open in background task for draining"
1445 );
1446
1447 drop(tx);
1449
1450 Ok(())
1451 }
1452
1453 #[tokio_test_no_panics]
1454 async fn result_set_last_flag_drained_in_background_on_drop() -> anyhow::Result<()> {
1455 let mut mock = MockSpanner::new();
1456 let (tx, rx) = tokio::sync::mpsc::channel(10);
1457
1458 mock.expect_execute_streaming_sql()
1459 .return_once(move |_request| Ok(Response::from(rx)));
1460
1461 mock.expect_create_session().returning(|_| {
1462 Ok(Response::new(Session {
1463 name: "session".to_string(),
1464 multiplexed: true,
1465 ..Default::default()
1466 }))
1467 });
1468
1469 let (address, _server) = start("127.0.0.1:0", mock).await?;
1470
1471 let client: Spanner = Spanner::builder()
1472 .with_endpoint(address)
1473 .with_credentials(Anonymous::new().build())
1474 .build()
1475 .await?;
1476
1477 let db_client = client.database_client("db").build().await?;
1478 let tx_single = db_client.single_use().build();
1479
1480 tx.send(Ok(PartialResultSet {
1482 metadata: metadata(2),
1483 values: vec![string_val("a"), string_val("b")],
1484 last: true,
1485 ..Default::default()
1486 }))
1487 .await
1488 .expect("Failed to send first message");
1489
1490 let mut result_set: ResultSet = tx_single.execute_query("SELECT 1").await?;
1491
1492 let row = result_set.next().await.expect("Expected a row")?;
1494 assert_eq!(row.raw_values()[0].0, string_val("a"));
1495
1496 drop(result_set);
1500
1501 assert!(
1504 !tx.is_closed(),
1505 "Expected stream to remain open in background task for draining"
1506 );
1507
1508 drop(tx);
1510
1511 Ok(())
1512 }
1513
1514 #[tokio_test_no_panics]
1515 async fn result_set_last_flag_drained_in_background_on_drop_outside_runtime()
1516 -> anyhow::Result<()> {
1517 let mut mock = MockSpanner::new();
1518 let (tx, rx) = tokio::sync::mpsc::channel(10);
1519
1520 mock.expect_execute_streaming_sql()
1521 .return_once(move |_request| Ok(Response::from(rx)));
1522
1523 mock.expect_create_session().returning(|_| {
1524 Ok(Response::new(Session {
1525 name: "session".to_string(),
1526 multiplexed: true,
1527 ..Default::default()
1528 }))
1529 });
1530
1531 let (address, _server) = start("127.0.0.1:0", mock).await?;
1532
1533 let client: Spanner = Spanner::builder()
1534 .with_endpoint(address)
1535 .with_credentials(Anonymous::new().build())
1536 .build()
1537 .await?;
1538
1539 let db_client = client.database_client("db").build().await?;
1540 let tx_single = db_client.single_use().build();
1541
1542 tx.send(Ok(PartialResultSet {
1544 metadata: metadata(2),
1545 values: vec![string_val("a"), string_val("b")],
1546 last: true,
1547 ..Default::default()
1548 }))
1549 .await
1550 .expect("Failed to send first message");
1551
1552 let mut result_set: ResultSet = tx_single.execute_query("SELECT 1").await?;
1553
1554 let row = result_set.next().await.expect("Expected a row")?;
1556 assert_eq!(row.raw_values()[0].0, string_val("a"));
1557
1558 std::thread::spawn(move || {
1560 drop(result_set);
1561 })
1562 .join()
1563 .expect("Thread panicked");
1564
1565 assert!(
1568 !tx.is_closed(),
1569 "Expected stream to remain open in background task for draining when dropped outside runtime"
1570 );
1571
1572 drop(tx);
1574
1575 Ok(())
1576 }
1577
1578 #[tokio_test_no_panics]
1579 async fn test_result_set_chunked_values_string() {
1580 let mut rs = run_mock_query(vec![
1581 PartialResultSet {
1582 metadata: metadata(1),
1583 values: vec![string_val("hello ")],
1584 chunked_value: true,
1585 resume_token: vec![],
1586 stats: None,
1587 precommit_token: None,
1588 last: false,
1589 cache_update: None,
1590 },
1591 PartialResultSet {
1592 metadata: None,
1593 values: vec![string_val("world")],
1594 chunked_value: false,
1595 resume_token: vec![],
1596 stats: None,
1597 precommit_token: None,
1598 last: true,
1599 cache_update: None,
1600 },
1601 ])
1602 .await;
1603
1604 let row = rs.next().await.unwrap().unwrap();
1605 assert_eq!(row.raw_values().len(), 1);
1606 if let Some(prost_types::value::Kind::StringValue(ref s)) = row.raw_values()[0].0.kind {
1607 assert_eq!(s, "hello world");
1608 } else {
1609 panic!("Expected StringValue");
1610 }
1611 assert!(rs.next().await.is_none());
1612 }
1613
1614 #[tokio_test_no_panics]
1615 async fn test_result_set_chunked_values_list() {
1616 let mut rs = run_mock_query(vec![
1617 PartialResultSet {
1618 metadata: metadata(1),
1619 values: vec![list_val(vec![string_val("A")])],
1620 chunked_value: true,
1621 resume_token: vec![],
1622 stats: None,
1623 precommit_token: None,
1624 last: false,
1625 cache_update: None,
1626 },
1627 PartialResultSet {
1628 metadata: None,
1629 values: vec![list_val(vec![string_val("B")])],
1630 chunked_value: false,
1631 resume_token: vec![],
1632 stats: None,
1633 precommit_token: None,
1634 last: true,
1635 cache_update: None,
1636 },
1637 ])
1638 .await;
1639
1640 let row = rs.next().await.unwrap().unwrap();
1641 assert_eq!(row.raw_values().len(), 1);
1642 if let Some(prost_types::value::Kind::ListValue(ref l)) = row.raw_values()[0].0.kind {
1643 assert_eq!(l.values.len(), 1);
1644 if let Some(prost_types::value::Kind::StringValue(ref s)) = l.values[0].kind {
1645 assert_eq!(s, "AB");
1646 } else {
1647 panic!("Expected StringValue");
1648 }
1649 } else {
1650 panic!("Expected ListValue");
1651 }
1652 assert!(rs.next().await.is_none());
1653 }
1654
1655 #[tokio_test_no_panics]
1656 async fn test_multi_response_chunking_bool_array() {
1657 fn bool_val(b: bool) -> Value {
1658 Value {
1659 kind: Some(prost_types::value::Kind::BoolValue(b)),
1660 }
1661 }
1662 fn null_val() -> Value {
1663 Value {
1664 kind: Some(prost_types::value::Kind::NullValue(0)),
1665 }
1666 }
1667
1668 let mut rs = run_mock_query(vec![
1669 PartialResultSet {
1670 metadata: metadata(1),
1671 values: vec![
1672 list_val(vec![bool_val(true)]),
1673 list_val(vec![bool_val(false), null_val(), bool_val(true)]),
1674 ],
1675 chunked_value: true,
1676 resume_token: vec![],
1677 stats: None,
1678 precommit_token: None,
1679 cache_update: None,
1680 last: false,
1681 },
1682 PartialResultSet {
1683 metadata: None,
1684 values: vec![list_val(vec![bool_val(true), bool_val(true)])],
1685 chunked_value: true,
1686 resume_token: vec![],
1687 stats: None,
1688 precommit_token: None,
1689 cache_update: None,
1690 last: false,
1691 },
1692 PartialResultSet {
1693 metadata: None,
1694 values: vec![
1695 list_val(vec![null_val(), null_val(), bool_val(false)]),
1696 list_val(vec![bool_val(true)]),
1697 ],
1698 chunked_value: false,
1699 resume_token: vec![],
1700 stats: None,
1701 precommit_token: None,
1702 cache_update: None,
1703 last: true,
1704 },
1705 ])
1706 .await;
1707
1708 let row1 = rs.next().await.unwrap().unwrap();
1709 assert_eq!(row1.raw_values()[0].0, list_val(vec![bool_val(true)]));
1710
1711 let row2 = rs.next().await.unwrap().unwrap();
1712 assert_eq!(
1713 row2.raw_values()[0].0,
1714 list_val(vec![
1715 bool_val(false),
1716 null_val(),
1717 bool_val(true),
1718 bool_val(true),
1719 bool_val(true),
1720 null_val(),
1721 null_val(),
1722 bool_val(false)
1723 ])
1724 );
1725
1726 let row3 = rs.next().await.unwrap().unwrap();
1727 assert_eq!(row3.raw_values()[0].0, list_val(vec![bool_val(true)]));
1728
1729 assert!(rs.next().await.is_none());
1730 }
1731
1732 #[tokio_test_no_panics]
1733 async fn test_multi_response_chunking_int64_array() {
1734 fn null_val() -> Value {
1735 Value {
1736 kind: Some(prost_types::value::Kind::NullValue(0)),
1737 }
1738 }
1739
1740 let mut rs = run_mock_query(vec![
1741 PartialResultSet {
1742 metadata: metadata(1),
1743 values: vec![
1744 list_val(vec![string_val("10")]),
1745 list_val(vec![string_val("1"), string_val("2"), null_val()]),
1746 ],
1747 chunked_value: true,
1748 resume_token: vec![],
1749 stats: None,
1750 precommit_token: None,
1751 cache_update: None,
1752 last: false,
1753 },
1754 PartialResultSet {
1755 metadata: None,
1756 values: vec![list_val(vec![null_val(), string_val("5")])],
1757 chunked_value: true,
1758 resume_token: vec![],
1759 stats: None,
1760 precommit_token: None,
1761 cache_update: None,
1762 last: false,
1763 },
1764 PartialResultSet {
1765 metadata: None,
1766 values: vec![
1767 list_val(vec![null_val(), string_val("7"), string_val("8")]),
1768 list_val(vec![string_val("20")]),
1769 ],
1770 chunked_value: false,
1771 resume_token: vec![],
1772 stats: None,
1773 precommit_token: None,
1774 cache_update: None,
1775 last: true,
1776 },
1777 ])
1778 .await;
1779
1780 let row1 = rs.next().await.unwrap().unwrap();
1781 assert_eq!(row1.raw_values()[0].0, list_val(vec![string_val("10")]));
1782
1783 let row2 = rs.next().await.unwrap().unwrap();
1784 assert_eq!(
1785 row2.raw_values()[0].0,
1786 list_val(vec![
1787 string_val("1"),
1788 string_val("2"),
1789 null_val(),
1790 null_val(),
1791 string_val("5"),
1792 null_val(),
1793 string_val("7"),
1794 string_val("8")
1795 ])
1796 );
1797
1798 let row3 = rs.next().await.unwrap().unwrap();
1799 assert_eq!(row3.raw_values()[0].0, list_val(vec![string_val("20")]));
1800
1801 assert!(rs.next().await.is_none());
1802 }
1803
1804 #[tokio_test_no_panics]
1805 async fn test_result_set_precommit_token_tracked() -> anyhow::Result<()> {
1806 let token = MultiplexedSessionPrecommitToken {
1807 precommit_token: b"test_token".to_vec(),
1808 seq_num: 99,
1809 };
1810 let results = vec![PartialResultSet {
1811 metadata: metadata(1),
1812 precommit_token: Some(token.clone()),
1813 last: true,
1814 ..Default::default()
1815 }];
1816
1817 let mut mock = MockSpanner::new();
1818 let rx = adapt(results.into_iter().map(Ok));
1819 mock.expect_execute_streaming_sql()
1820 .return_once(move |_request| Ok(Response::from(rx)));
1821
1822 mock.expect_create_session().returning(|_| {
1823 Ok(Response::new(Session {
1824 name: "session".to_string(),
1825 multiplexed: true,
1826 ..Default::default()
1827 }))
1828 });
1829
1830 let (address, _server) = start("127.0.0.1:0", mock)
1831 .await
1832 .expect("Failed to start mock server");
1833
1834 let client: Spanner = Spanner::builder()
1835 .with_endpoint(address)
1836 .with_credentials(Anonymous::new().build())
1837 .build()
1838 .await
1839 .expect("Failed to build client");
1840
1841 let db_client: crate::database_client::DatabaseClient =
1842 client.database_client("db").build().await.unwrap();
1843
1844 let tracker = PrecommitTokenTracker::new();
1845
1846 let req = crate::model::ExecuteSqlRequest::default()
1847 .set_session("session".to_string())
1848 .set_sql("SELECT 1".to_string());
1849
1850 let stream = db_client
1851 .spanner
1852 .execute_streaming_sql(req.clone(), GaxRequestOptions::default(), 0)
1853 .send()
1854 .await?;
1855
1856 let mut rs = ResultSet::create(ResultSetParams {
1857 stream,
1858 transaction_selector: None,
1859 precommit_token_tracker: tracker.clone(),
1860 client: db_client,
1861 session_name: "session".to_string(),
1862 transaction_tag: None,
1863 operation: StreamOperation::Query(req),
1864 channel_hint: 0,
1865 gax_options: GaxRequestOptions::default(),
1866 })
1867 .await?;
1868
1869 assert!(
1871 rs.next().await.is_none(),
1872 "Expected no rows, but received one"
1873 );
1874
1875 let tracked_token = tracker.get().expect("token should be tracked");
1877 assert_eq!(tracked_token.seq_num, 99);
1878 assert_eq!(
1879 tracked_token.precommit_token,
1880 bytes::Bytes::from("test_token")
1881 );
1882
1883 Ok(())
1884 }
1885
1886 #[tokio_test_no_panics]
1887 async fn test_result_set_retry_simple() -> anyhow::Result<()> {
1888 let mut mock = MockSpanner::new();
1889 let mut seq = mockall::Sequence::new();
1890
1891 mock.expect_execute_streaming_sql()
1892 .times(1)
1893 .in_sequence(&mut seq)
1894 .returning(|_request| {
1895 let stream = adapt([
1896 Ok(PartialResultSet {
1897 metadata: metadata(1),
1898 values: vec![string_val("row1")],
1899 resume_token: b"token1".to_vec(),
1900 ..Default::default()
1901 }),
1902 Err(Status::unavailable("Transient error")),
1903 ]);
1904 Ok(Response::from(stream))
1905 });
1906
1907 mock.expect_execute_streaming_sql()
1908 .times(1)
1909 .in_sequence(&mut seq)
1910 .returning(|_request| {
1911 let stream = adapt([Ok(PartialResultSet {
1912 values: vec![string_val("row2")],
1913 resume_token: b"token2".to_vec(),
1914 last: true,
1915 ..Default::default()
1916 })]);
1917 Ok(Response::from(stream))
1918 });
1919
1920 mock.expect_create_session().returning(|_| {
1921 Ok(Response::new(Session {
1922 name: "session".to_string(),
1923 multiplexed: true,
1924 ..Default::default()
1925 }))
1926 });
1927
1928 let (address, _server) = start("127.0.0.1:0", mock).await?;
1929
1930 let client: Spanner = Spanner::builder()
1931 .with_endpoint(address)
1932 .with_credentials(Anonymous::new().build())
1933 .build()
1934 .await?;
1935
1936 let db_client = client.database_client("db").build().await?;
1937 let tx = db_client.single_use().build();
1938 let mut mock_backoff = MockBackoffPolicy::new();
1939 mock_backoff
1940 .expect_on_failure()
1941 .returning(|_| Duration::from_nanos(1));
1942
1943 let stmt = Statement::builder("SELECT 1")
1944 .with_backoff_policy(mock_backoff)
1945 .build();
1946 let mut rs = tx.execute_query(stmt).await?;
1947
1948 let row1 = rs.next().await.expect("Stream ended unexpectedly")?;
1949 assert_eq!(row1.raw_values()[0].0, string_val("row1"));
1950
1951 let row2 = rs.next().await.expect("Stream ended unexpectedly")?;
1952 assert_eq!(row2.raw_values()[0].0, string_val("row2"));
1953
1954 assert!(rs.next().await.is_none());
1955
1956 Ok(())
1957 }
1958
1959 #[tokio_test_no_panics]
1960 async fn test_result_set_retry_non_retriable_error() -> anyhow::Result<()> {
1961 let mut mock = MockSpanner::new();
1962 mock.expect_execute_streaming_sql()
1963 .times(1)
1964 .returning(|_request| {
1965 let stream = adapt([
1966 Ok(PartialResultSet {
1967 metadata: metadata(1),
1968 values: vec![string_val("row1")],
1969 resume_token: b"token1".to_vec(),
1970 ..Default::default()
1971 }),
1972 Err(Status::invalid_argument("Non-retriable error")),
1973 ]);
1974 Ok(Response::from(stream))
1975 });
1976
1977 mock.expect_create_session().returning(|_| {
1978 Ok(Response::new(Session {
1979 name: "session".to_string(),
1980 multiplexed: true,
1981 ..Default::default()
1982 }))
1983 });
1984
1985 let (address, _server) = start("127.0.0.1:0", mock).await?;
1986
1987 let client: Spanner = Spanner::builder()
1988 .with_endpoint(address)
1989 .with_credentials(Anonymous::new().build())
1990 .build()
1991 .await?;
1992
1993 let db_client = client.database_client("db").build().await?;
1994 let tx = db_client.single_use().build();
1995 let mut rs = tx.execute_query("SELECT 1").await?;
1996
1997 let row1 = rs.next().await.expect("Stream ended unexpectedly")?;
1998 assert_eq!(row1.raw_values()[0].0, string_val("row1"));
1999
2000 let res = rs.next().await;
2001 assert!(res.is_some(), "Expected an error but got None");
2002 let res = res.expect("Expected some response but got None");
2003 assert!(res.is_err(), "Expected an error but got Ok");
2004 let err_str = res.expect_err("Expected should be an error").to_string();
2005 assert!(
2006 err_str.contains("Non-retriable error"),
2007 "Expected error to contain 'Non-retriable error', but got '{}'",
2008 err_str
2009 );
2010
2011 Ok(())
2012 }
2013
2014 #[tokio_test_no_panics]
2015 async fn test_result_set_buffer_overflow() -> anyhow::Result<()> {
2016 let mut mock = MockSpanner::new();
2017 let (tx_msg, rx_msg) = tokio::sync::mpsc::channel(10);
2018 mock.expect_execute_streaming_sql()
2019 .times(1)
2021 .return_once(move |_request| Ok(Response::from(rx_msg)));
2022
2023 mock.expect_create_session().returning(|_| {
2024 Ok(Response::new(Session {
2025 name: "session".to_string(),
2026 multiplexed: true,
2027 ..Default::default()
2028 }))
2029 });
2030
2031 let (address, _server) = start("127.0.0.1:0", mock).await?;
2032
2033 let client: Spanner = Spanner::builder()
2034 .with_endpoint(address)
2035 .with_credentials(Anonymous::new().build())
2036 .build()
2037 .await?;
2038
2039 let db_client = client.database_client("db").build().await?;
2040 let tx = db_client.single_use().build();
2041
2042 tx_msg
2043 .send(Ok(PartialResultSet {
2044 metadata: metadata(1),
2045 values: vec![string_val("row1")],
2046 resume_token: b"token1".to_vec(),
2047 ..Default::default()
2048 }))
2049 .await?;
2050
2051 let mut rs = tx.execute_query("SELECT 1").await?;
2052
2053 rs.set_max_buffered_partial_result_sets(2);
2055
2056 tx_msg
2057 .send(Ok(PartialResultSet {
2058 values: vec![string_val("row2")],
2059 ..Default::default()
2060 }))
2061 .await?;
2062 tx_msg
2063 .send(Ok(PartialResultSet {
2064 values: vec![string_val("row3")],
2065 ..Default::default()
2066 }))
2067 .await?;
2068 tx_msg
2069 .send(Ok(PartialResultSet {
2070 values: vec![string_val("row4")],
2071 ..Default::default()
2072 }))
2073 .await?;
2074 tx_msg
2075 .send(Err(Status::unavailable("Unavailable error")))
2076 .await?;
2077
2078 let row1 = rs.next().await.expect("Expected row1")?;
2080 assert_eq!(row1.raw_values()[0].0, string_val("row1"));
2081
2082 let row2 = rs.next().await.expect("Expected row2")?;
2084 assert_eq!(row2.raw_values()[0].0, string_val("row2"));
2085
2086 let res = rs.next().await;
2088 assert!(res.is_some(), "Expected an error but got None");
2089 let res = res.expect("Expected some response but got None");
2090 assert!(res.is_err(), "Expected an error but got Ok");
2091 let err_str = res.expect_err("Expected should be an error").to_string();
2092 assert!(
2093 err_str.contains("Unavailable error"),
2094 "Expected error to contain 'Unavailable error', but got '{}'",
2095 err_str
2096 );
2097
2098 Ok(())
2099 }
2100
2101 #[tokio_test_no_panics]
2102 async fn test_result_set_retry_missing_resume_token_safe() -> anyhow::Result<()> {
2103 let mut mock = MockSpanner::new();
2104 let mut seq = mockall::Sequence::new();
2105
2106 mock.expect_execute_streaming_sql()
2107 .times(1)
2108 .in_sequence(&mut seq)
2109 .returning(|_request| {
2110 let stream = adapt([
2111 Ok(PartialResultSet {
2112 metadata: metadata(1),
2113 values: vec![string_val("row1")],
2114 ..Default::default()
2116 }),
2117 Err(Status::unavailable("Unavailable error")),
2118 ]);
2119 Ok(Response::from(stream))
2120 });
2121
2122 mock.expect_execute_streaming_sql()
2123 .times(1)
2124 .in_sequence(&mut seq)
2125 .returning(|_request| {
2126 let stream = adapt([Ok(PartialResultSet {
2127 metadata: metadata(1),
2128 values: vec![string_val("row1_retry")],
2129 resume_token: b"token_retry".to_vec(),
2130 ..Default::default()
2131 })]);
2132 Ok(Response::from(stream))
2133 });
2134
2135 mock.expect_create_session().returning(|_| {
2136 Ok(Response::new(Session {
2137 name: "session".to_string(),
2138 multiplexed: true,
2139 ..Default::default()
2140 }))
2141 });
2142
2143 let (address, _server) = start("127.0.0.1:0", mock).await?;
2144
2145 let client: Spanner = Spanner::builder()
2146 .with_endpoint(address)
2147 .with_credentials(Anonymous::new().build())
2148 .build()
2149 .await?;
2150
2151 let db_client = client.database_client("db").build().await?;
2152 let tx = db_client.single_use().build();
2153 let mut mock_backoff = MockBackoffPolicy::new();
2154 mock_backoff
2155 .expect_on_failure()
2156 .returning(|_| Duration::from_nanos(1));
2157
2158 let stmt = Statement::builder("SELECT 1")
2159 .with_backoff_policy(mock_backoff)
2160 .build();
2161 let mut rs = tx.execute_query(stmt).await?;
2162
2163 let row1 = rs.next().await.expect("Expected row1")?;
2164 assert_eq!(row1.raw_values()[0].0, string_val("row1_retry"));
2165
2166 Ok(())
2167 }
2168
2169 #[tokio_test_no_panics]
2170 async fn test_result_set_retry_under_limit_no_resume_token() -> anyhow::Result<()> {
2171 let mut mock = MockSpanner::new();
2172 let mut seq = mockall::Sequence::new();
2173
2174 mock.expect_execute_streaming_sql()
2176 .times(1)
2177 .in_sequence(&mut seq)
2178 .returning(|_request| {
2179 let stream = adapt([
2180 Ok(PartialResultSet {
2181 metadata: metadata(1),
2182 values: vec![string_val("row1")],
2183 ..Default::default()
2184 }),
2185 Ok(PartialResultSet {
2186 values: vec![string_val("row2")],
2187 ..Default::default()
2188 }),
2189 Err(Status::unavailable("Unavailable error")),
2190 ]);
2191 Ok(Response::from(stream))
2192 });
2193
2194 mock.expect_execute_streaming_sql()
2197 .times(1)
2198 .in_sequence(&mut seq)
2199 .returning(|request| {
2200 assert!(
2201 request.get_ref().resume_token.is_empty(),
2202 "Expected empty resume token for retry"
2203 );
2204 let stream = adapt([Ok(PartialResultSet {
2205 metadata: metadata(1),
2206 values: vec![string_val("row1_retry")],
2207 resume_token: b"token_retry".to_vec(),
2208 ..Default::default()
2209 })]);
2210 Ok(Response::from(stream))
2211 });
2212
2213 mock.expect_create_session().returning(|_| {
2214 Ok(Response::new(Session {
2215 name: "session".to_string(),
2216 multiplexed: true,
2217 ..Default::default()
2218 }))
2219 });
2220
2221 let (address, _server) = start("127.0.0.1:0", mock).await?;
2222
2223 let client: Spanner = Spanner::builder()
2224 .with_endpoint(address)
2225 .with_credentials(Anonymous::new().build())
2226 .build()
2227 .await?;
2228
2229 let db_client = client.database_client("db").build().await?;
2230 let tx = db_client.single_use().build();
2231 let mut mock_backoff = MockBackoffPolicy::new();
2232 mock_backoff
2233 .expect_on_failure()
2234 .returning(|_| Duration::from_nanos(1));
2235
2236 let stmt = Statement::builder("SELECT 1")
2237 .with_backoff_policy(mock_backoff)
2238 .build();
2239 let mut rs = tx.execute_query(stmt).await?;
2240
2241 rs.set_max_buffered_partial_result_sets(3);
2243
2244 let row1 = rs.next().await.expect("Expected row1")?;
2249 assert_eq!(row1.raw_values()[0].0, string_val("row1_retry"));
2250
2251 Ok(())
2252 }
2253
2254 #[tokio_test_no_panics]
2255 async fn test_result_set_retry_limit_exceeded() -> anyhow::Result<()> {
2256 let mut mock = MockSpanner::new();
2257
2258 mock.expect_execute_streaming_sql()
2259 .times(11) .returning(|_request| {
2261 let stream = adapt([Err(Status::unavailable("Unavailable error"))]);
2262 Ok(Response::from(stream))
2263 });
2264
2265 mock.expect_create_session().returning(|_| {
2266 Ok(Response::new(Session {
2267 name: "session".to_string(),
2268 multiplexed: true,
2269 ..Default::default()
2270 }))
2271 });
2272
2273 let (address, _server) = start("127.0.0.1:0", mock).await?;
2274
2275 let client: Spanner = Spanner::builder()
2276 .with_endpoint(address)
2277 .with_credentials(Anonymous::new().build())
2278 .build()
2279 .await?;
2280
2281 let db_client = client.database_client("db").build().await?;
2282 let tx = db_client.single_use().build();
2283 let mut mock_backoff = MockBackoffPolicy::new();
2284 mock_backoff
2285 .expect_on_failure()
2286 .times(10)
2287 .returning(|_| Duration::from_nanos(1));
2288
2289 let stmt = Statement::builder("SELECT 1")
2290 .with_backoff_policy(mock_backoff)
2291 .build();
2292 let res = tx.execute_query(stmt).await;
2293
2294 assert!(res.is_err(), "Expected an error but got Ok");
2295 let err_str = res.expect_err("Expected should be an error").to_string();
2296 assert!(
2297 err_str.contains("Unavailable error"),
2298 "Expected error to contain 'Unavailable error', but got '{}'",
2299 err_str
2300 );
2301
2302 Ok(())
2303 }
2304
2305 #[tokio_test_no_panics]
2306 async fn result_set_inline_begin_stream_error_fallback() -> anyhow::Result<()> {
2307 let mut mock = MockSpanner::new();
2308 let mut seq = mockall::Sequence::new();
2309
2310 mock.expect_execute_streaming_sql()
2313 .times(1)
2314 .in_sequence(&mut seq)
2315 .returning(|_request| {
2316 let stream = adapt([Err(Status::invalid_argument("Invalid query"))]);
2317 Ok(Response::from(stream))
2318 });
2319
2320 mock.expect_begin_transaction()
2322 .times(1)
2323 .in_sequence(&mut seq)
2324 .returning(|_| {
2325 Ok(Response::new(spanner_v1::Transaction {
2326 id: vec![7, 8, 9],
2327 read_timestamp: Some(prost_types::Timestamp {
2328 seconds: 123456789,
2329 nanos: 0,
2330 }),
2331 ..Default::default()
2332 }))
2333 });
2334
2335 mock.expect_execute_streaming_sql()
2337 .times(1)
2338 .in_sequence(&mut seq)
2339 .returning(|req| {
2340 let req = req.into_inner();
2341 match req.transaction.unwrap().selector.unwrap() {
2343 spanner_v1::transaction_selector::Selector::Id(id) => {
2344 assert_eq!(id, vec![7, 8, 9]);
2345 }
2346 _ => panic!("Expected Selector::Id"),
2347 }
2348
2349 let stream = adapt([Ok(PartialResultSet {
2350 metadata: metadata(1),
2351 values: vec![string_val("1")],
2352 ..Default::default()
2353 })]);
2354 Ok(Response::from(stream))
2355 });
2356
2357 mock.expect_create_session().returning(|_| {
2358 Ok(Response::new(Session {
2359 name: "session".to_string(),
2360 multiplexed: true,
2361 ..Default::default()
2362 }))
2363 });
2364
2365 let (address, _server) = start("127.0.0.1:0", mock).await?;
2366
2367 let client: Spanner = Spanner::builder()
2368 .with_endpoint(address)
2369 .with_credentials(Anonymous::new().build())
2370 .build()
2371 .await?;
2372
2373 let db_client = client.database_client("db").build().await?;
2374
2375 let tx = db_client
2376 .read_only_transaction()
2377 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2378 .build()
2379 .await?;
2380 let mut rs = tx.execute_query("SELECT 1").await?;
2381
2382 let row1 = rs.next().await.ok_or_else(|| {
2383 anyhow::anyhow!("Expected row returned successfully despite stream breaking")
2384 })??;
2385 assert_eq!(
2386 row1.raw_values()[0].0,
2387 string_val("1"),
2388 "Verify the returned stream successfully resumed with the correct payload"
2389 );
2390
2391 Ok(())
2392 }
2393
2394 #[tokio_test_no_panics]
2395 async fn result_set_retry_inline_begin_transient_error() -> anyhow::Result<()> {
2396 let mut mock = MockSpanner::new();
2397 let mut seq = mockall::Sequence::new();
2398
2399 mock.expect_execute_streaming_sql()
2401 .times(1)
2402 .in_sequence(&mut seq)
2403 .returning(|_request| {
2404 let stream = adapt([Err(Status::unavailable("Transient network issue"))]);
2405 Ok(Response::from(stream))
2406 });
2407
2408 mock.expect_execute_streaming_sql()
2411 .times(1)
2412 .in_sequence(&mut seq)
2413 .returning(|req| {
2414 let req = req.into_inner();
2415 match req.transaction.unwrap().selector.unwrap() {
2416 spanner_v1::transaction_selector::Selector::Begin(_) => {}
2417 _ => panic!("Expected Selector::Begin on stream retry"),
2418 }
2419
2420 let mut meta = metadata(1).unwrap();
2421 meta.transaction = Some(spanner_v1::Transaction {
2422 id: vec![7, 8, 9],
2423 read_timestamp: None,
2424 ..Default::default()
2425 });
2426
2427 let stream = adapt([Ok(PartialResultSet {
2428 metadata: Some(meta),
2429 values: vec![string_val("1")],
2430 ..Default::default()
2431 })]);
2432 Ok(Response::from(stream))
2433 });
2434
2435 mock.expect_create_session().returning(|_| {
2436 Ok(Response::new(Session {
2437 name: "session".to_string(),
2438 multiplexed: true,
2439 ..Default::default()
2440 }))
2441 });
2442
2443 let (address, _server) = start("127.0.0.1:0", mock).await?;
2444
2445 let client: Spanner = Spanner::builder()
2446 .with_endpoint(address)
2447 .with_credentials(Anonymous::new().build())
2448 .build()
2449 .await?;
2450
2451 let db_client = client.database_client("db").build().await?;
2452
2453 let tx = db_client
2454 .read_only_transaction()
2455 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2456 .build()
2457 .await?;
2458 let mut rs = tx.execute_query("SELECT 1").await?;
2459
2460 let row1 = rs
2461 .next()
2462 .await
2463 .ok_or_else(|| anyhow::anyhow!("Expected stream to recover safely"))??;
2464 assert_eq!(
2465 row1.raw_values()[0].0,
2466 string_val("1"),
2467 "Verify resumed stream returns data"
2468 );
2469
2470 Ok(())
2471 }
2472
2473 #[tokio_test_no_panics]
2474 async fn result_set_retry_inline_begin_id_recovered() -> anyhow::Result<()> {
2475 let mut mock = MockSpanner::new();
2476 let mut seq = mockall::Sequence::new();
2477
2478 mock.expect_execute_streaming_sql()
2480 .times(1)
2481 .in_sequence(&mut seq)
2482 .returning(|_request| {
2483 let mut meta = metadata(1).unwrap();
2484 meta.transaction = Some(spanner_v1::Transaction {
2485 id: vec![7, 8, 9],
2486 read_timestamp: None,
2487 ..Default::default()
2488 });
2489 let stream = adapt([
2490 Ok(PartialResultSet {
2491 metadata: Some(meta),
2492 values: vec![string_val("1")],
2493 resume_token: b"token1".to_vec(),
2494 ..Default::default()
2495 }),
2496 Err(Status::unavailable("Transient mid-stream network issue")),
2497 ]);
2498 Ok(Response::from(stream))
2499 });
2500
2501 mock.expect_execute_streaming_sql()
2503 .times(1)
2504 .in_sequence(&mut seq)
2505 .returning(|req| {
2506 let req = req.into_inner();
2507 match req.transaction.unwrap().selector.unwrap() {
2508 spanner_v1::transaction_selector::Selector::Id(id) => {
2509 assert_eq!(id, vec![7, 8, 9]);
2510 }
2511 _ => panic!("Expected Selector::Id on stream retry"),
2512 }
2513
2514 let stream = adapt([Ok(PartialResultSet {
2515 values: vec![string_val("2")],
2516 ..Default::default()
2517 })]);
2518 Ok(Response::from(stream))
2519 });
2520
2521 mock.expect_create_session().returning(|_| {
2522 Ok(Response::new(Session {
2523 name: "session".to_string(),
2524 multiplexed: true,
2525 ..Default::default()
2526 }))
2527 });
2528
2529 let (address, _server) = start("127.0.0.1:0", mock).await?;
2530
2531 let client: Spanner = Spanner::builder()
2532 .with_endpoint(address)
2533 .with_credentials(Anonymous::new().build())
2534 .build()
2535 .await?;
2536
2537 let db_client = client.database_client("db").build().await?;
2538
2539 let tx = db_client
2540 .read_only_transaction()
2541 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2542 .build()
2543 .await?;
2544 let mut rs = tx.execute_query("SELECT 1").await?;
2545
2546 let row1 = rs
2547 .next()
2548 .await
2549 .ok_or_else(|| anyhow::anyhow!("Expected stream row1 extracted"))??;
2550 assert_eq!(
2551 row1.raw_values()[0].0,
2552 string_val("1"),
2553 "Verified chunk 1 payload"
2554 );
2555 let row2 = rs
2556 .next()
2557 .await
2558 .ok_or_else(|| anyhow::anyhow!("Expected stream row2 recovered"))??;
2559 assert_eq!(
2560 row2.raw_values()[0].0,
2561 string_val("2"),
2562 "Verified chunk 2 reboot dynamically intercepted ID bounds correctly"
2563 );
2564
2565 Ok(())
2566 }
2567
2568 #[tokio_test_no_panics]
2569 async fn result_set_inline_begin_metadata_missing_transaction_fails() -> anyhow::Result<()> {
2570 let mut mock = MockSpanner::new();
2571 let mut seq = mockall::Sequence::new();
2572
2573 mock.expect_execute_streaming_sql()
2575 .times(1)
2576 .in_sequence(&mut seq)
2577 .returning(|_request| {
2578 let stream = adapt([Ok(PartialResultSet {
2579 metadata: metadata(1), values: vec![string_val("1")],
2581 ..Default::default()
2582 })]);
2583 Ok(Response::from(stream))
2584 });
2585
2586 mock.expect_create_session().returning(|_| {
2587 Ok(Response::new(Session {
2588 name: "session".to_string(),
2589 multiplexed: true,
2590 ..Default::default()
2591 }))
2592 });
2593
2594 let (address, _server) = start("127.0.0.1:0", mock).await?;
2595
2596 let client: Spanner = Spanner::builder()
2597 .with_endpoint(address)
2598 .with_credentials(Anonymous::new().build())
2599 .build()
2600 .await?;
2601
2602 let db_client = client.database_client("db").build().await?;
2603
2604 let tx = db_client
2606 .read_only_transaction()
2607 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2608 .build()
2609 .await?;
2610 let err = tx
2611 .execute_query("SELECT 1")
2612 .await
2613 .expect_err("Expected eager validation error");
2614 assert!(
2615 err.to_string()
2616 .contains("failed to return a transaction ID"),
2617 "Caught implicit gap boundary: {}",
2618 err
2619 );
2620
2621 Ok(())
2622 }
2623
2624 #[tokio_test_no_panics]
2625 async fn result_set_stats() -> anyhow::Result<()> {
2626 let mock_stats = spanner_v1::ResultSetStats {
2627 query_plan: Some(spanner_v1::QueryPlan::default()),
2628 ..Default::default()
2629 };
2630
2631 let mut rs = run_mock_query(vec![PartialResultSet {
2632 metadata: metadata(2),
2633 values: vec![string_val("a"), string_val("b")],
2634 last: true,
2635 stats: Some(mock_stats),
2636 ..Default::default()
2637 }])
2638 .await;
2639
2640 rs.next().await.transpose()?;
2641
2642 let received_stats = rs.stats().expect("stats should be available");
2643 assert!(received_stats.query_plan.is_some());
2644
2645 Ok(())
2646 }
2647
2648 #[tokio_test_no_panics]
2649 async fn result_set_update_count() -> anyhow::Result<()> {
2650 let mock_stats = spanner_v1::ResultSetStats {
2651 row_count: Some(RowCount::RowCountExact(42_i64)),
2652 ..Default::default()
2653 };
2654
2655 let mut result_set = run_mock_query(vec![PartialResultSet {
2656 metadata: metadata(2),
2657 values: vec![string_val("a"), string_val("b")],
2658 last: true,
2659 stats: Some(mock_stats),
2660 ..Default::default()
2661 }])
2662 .await;
2663
2664 result_set.next().await.transpose()?;
2665
2666 let update_count = result_set
2667 .update_count()
2668 .expect("Expected update count to be populated");
2669 assert_eq!(update_count, 42, "Expected exactly 42 rows updated");
2670
2671 Ok(())
2672 }
2673
2674 #[tokio_test_no_panics]
2675 async fn result_set_duplicate_stats() -> anyhow::Result<()> {
2676 let mock_stats = spanner_v1::ResultSetStats {
2677 query_plan: Some(spanner_v1::QueryPlan::default()),
2678 ..Default::default()
2679 };
2680
2681 let mut rs = run_mock_query(vec![
2682 PartialResultSet {
2683 metadata: metadata(2),
2684 values: vec![string_val("a"), string_val("b")],
2685 stats: Some(mock_stats.clone()),
2686 resume_token: b"token1".to_vec(),
2687 ..Default::default()
2688 },
2689 PartialResultSet {
2690 values: vec![string_val("c"), string_val("d")],
2691 stats: Some(mock_stats),
2692 last: true,
2693 resume_token: b"token2".to_vec(),
2694 ..Default::default()
2695 },
2696 ])
2697 .await;
2698
2699 let next = rs.next().await;
2701 assert!(next.is_some());
2702 assert!(next.expect("should yield a row").is_ok());
2703
2704 let res2 = rs.next().await;
2706 assert!(res2.is_some());
2707 let res2 = res2.expect("should yield an error");
2708 assert!(res2.is_err());
2709 let err_str = res2.expect_err("should be an error").to_string();
2710 assert!(err_str.contains("Additional stats received after first"));
2711
2712 Ok(())
2713 }
2714
2715 #[tokio_test_no_panics]
2716 async fn test_lazy_begin_deadlock_fixed() -> anyhow::Result<()> {
2717 let mut mock = MockSpanner::new();
2718 let mut seq = mockall::Sequence::new();
2719
2720 mock.expect_execute_streaming_sql()
2722 .times(1)
2723 .in_sequence(&mut seq)
2724 .returning(|_request| {
2725 let mut meta = metadata(1).expect("failed to create metadata");
2726 meta.transaction = Some(spanner_v1::Transaction {
2727 id: b"lazy_tx_id".to_vec(),
2728 ..Default::default()
2729 });
2730 let rx = adapt(
2731 vec![Ok(PartialResultSet {
2732 metadata: Some(meta),
2733 values: vec![string_val("1")],
2734 ..Default::default()
2735 })]
2736 .into_iter(),
2737 );
2738 Ok(Response::from(rx))
2739 });
2740
2741 mock.expect_execute_streaming_sql()
2743 .times(1)
2744 .in_sequence(&mut seq)
2745 .returning(|req| {
2746 let req = req.into_inner();
2747 let selector = req
2748 .transaction
2749 .expect("missing transaction component")
2750 .selector
2751 .expect("missing selector component");
2752
2753 match selector {
2754 spanner_v1::transaction_selector::Selector::Id(id) => {
2755 assert_eq!(id, b"lazy_tx_id".to_vec());
2756 }
2757 _ => panic!("Expected Selector::Id"),
2758 }
2759
2760 let rx = adapt(
2761 vec![Ok(PartialResultSet {
2762 metadata: metadata(1),
2763 values: vec![string_val("2")],
2764 ..Default::default()
2765 })]
2766 .into_iter(),
2767 );
2768 Ok(Response::from(rx))
2769 });
2770
2771 mock.expect_create_session().returning(|_| {
2772 Ok(Response::new(Session {
2773 name: "session".to_string(),
2774 multiplexed: true,
2775 ..Default::default()
2776 }))
2777 });
2778
2779 let (address, _server) = start("127.0.0.1:0", mock).await?;
2780
2781 let client: Spanner = Spanner::builder()
2782 .with_endpoint(address)
2783 .with_credentials(Anonymous::new().build())
2784 .build()
2785 .await?;
2786
2787 let db_client = client.database_client("db").build().await?;
2788
2789 let tx = db_client
2791 .read_only_transaction()
2792 .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2793 .build()
2794 .await?;
2795
2796 let _rs = tx.execute_query("SELECT 1").await?;
2798
2799 let mut rs2 = tx.execute_query("SELECT 2").await?;
2801
2802 let row2 = rs2.next().await;
2804 assert!(
2805 row2.is_some(),
2806 "Implicit deadlock encountered; query 2 stalled"
2807 );
2808
2809 Ok(())
2810 }
2811
2812 #[tokio_test_no_panics]
2813 async fn test_result_set_metadata_not_available() {
2814 let res = run_mock_query_fallible(vec![PartialResultSet {
2816 metadata: None,
2817 values: vec![string_val("1")],
2818 ..Default::default()
2819 }])
2820 .await;
2821
2822 let err = res.expect_err("Expected query initialization to fail eagerly");
2823 assert!(
2824 err.to_string()
2825 .contains("First PartialResultSet did not contain metadata"),
2826 "Expected missing metadata safeguard error, got: {}",
2827 err
2828 );
2829 }
2830
2831 #[tokio_test_no_panics]
2832 async fn test_result_set_metadata_available_before_next() -> anyhow::Result<()> {
2833 let mut mock = MockSpanner::new();
2834
2835 mock.expect_execute_streaming_sql().returning(|_request| {
2837 let rx = adapt(
2838 vec![Ok(PartialResultSet {
2839 metadata: metadata(1),
2840 values: vec![string_val("1")],
2841 ..Default::default()
2842 })]
2843 .into_iter(),
2844 );
2845 Ok(Response::from(rx))
2846 });
2847
2848 mock.expect_create_session().returning(|_| {
2849 Ok(Response::new(Session {
2850 name: "session".to_string(),
2851 multiplexed: true,
2852 ..Default::default()
2853 }))
2854 });
2855
2856 let (address, _server) = start("127.0.0.1:0", mock).await?;
2857
2858 let client: Spanner = Spanner::builder()
2859 .with_endpoint(address)
2860 .with_credentials(Anonymous::new().build())
2861 .build()
2862 .await?;
2863
2864 let db_client = client.database_client("db").build().await?;
2865 let tx = db_client.single_use().build();
2866
2867 let mut rs = tx.execute_query("SELECT 1").await?;
2868
2869 let metadata = rs.metadata().expect("metadata available");
2871 assert_eq!(metadata.column_names().len(), 1);
2872 assert_eq!(metadata.column_names()[0], "col0");
2873
2874 let row = rs.next().await;
2876 assert!(row.is_some());
2877
2878 Ok(())
2879 }
2880}