Skip to main content

google_cloud_spanner/
result_set.rs

1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::database_client::DatabaseClient;
16use crate::error::internal_error;
17use crate::google::spanner::v1::{self, PartialResultSet};
18use crate::model::ResultSetStats;
19use crate::model::result_set_stats::RowCount;
20use crate::precommit::PrecommitTokenTracker;
21use crate::read_only_transaction::{ReadContextTransactionSelector, TransactionState};
22use crate::result_set_metadata::ResultSetMetadata;
23use crate::row::Row;
24use crate::server_streaming::stream::PartialResultSetStream;
25use bytes::Bytes;
26use gaxi::prost::FromProto;
27use google_cloud_gax::backoff_policy::BackoffPolicy;
28use google_cloud_gax::error::Error as GaxError;
29use google_cloud_gax::exponential_backoff::ExponentialBackoffBuilder;
30use google_cloud_gax::options::RequestOptions as GaxRequestOptions;
31use google_cloud_gax::retry_policy::{Aip194Strict, RetryPolicyExt};
32use google_cloud_gax::retry_state::RetryState;
33use std::collections::VecDeque;
34use std::mem::take;
35use std::sync::Arc;
36use tokio::time::sleep;
37
38#[cfg(feature = "unstable-stream")]
39use futures::Stream;
40
41/// `ResultSet` contains the rows of a query result.
42///
43/// # Example
44/// ```
45/// # use google_cloud_spanner::result::ResultSet;
46/// # use google_cloud_spanner::result::Row;
47/// # async fn process_result_set(mut rs: ResultSet) -> Result<(), google_cloud_spanner::Error> {
48/// while let Some(row) = rs.next().await {
49///     let row: Row = row?;
50///     // Process the row
51/// }
52/// # Ok(())
53/// # }
54/// ```
55#[derive(Debug)]
56pub struct ResultSet {
57    stream: Option<PartialResultSetStream>,
58    buffered_values: Vec<prost_types::Value>,
59    chunked: bool,
60    seen_last: bool,
61    ready_rows: VecDeque<Row>,
62    local_metadata: Option<ResultSetMetadata>,
63    stats: Option<ResultSetStats>,
64    precommit_token_tracker: PrecommitTokenTracker,
65
66    // Fields for retries and buffering of a stream of PartialResultSets.
67    client: DatabaseClient,
68    session_name: String,
69    transaction_tag: Option<String>,
70    operation: StreamOperation,
71    last_resume_token: Bytes,
72    partial_result_sets_buffer: VecDeque<PartialResultSet>,
73    safe_to_retry: bool,
74    max_buffered_partial_result_sets: usize,
75    retry_count: usize,
76    transaction_selector: Option<ReadContextTransactionSelector>,
77    channel_hint: usize,
78    gax_options: GaxRequestOptions,
79}
80
81#[derive(Debug, Clone)]
82pub(crate) enum StreamOperation {
83    Query(crate::model::ExecuteSqlRequest),
84    Read(crate::model::ReadRequest),
85}
86
87pub(crate) struct ResultSetParams {
88    pub stream: PartialResultSetStream,
89    pub transaction_selector: Option<ReadContextTransactionSelector>,
90    pub precommit_token_tracker: PrecommitTokenTracker,
91    pub client: DatabaseClient,
92    pub session_name: String,
93    pub transaction_tag: Option<String>,
94    pub operation: StreamOperation,
95    pub channel_hint: usize,
96    pub gax_options: GaxRequestOptions,
97}
98
/// The maximum number of `PartialResultSet`s to buffer without a resume token.
///
/// Spanner normally includes a resume token with each `PartialResultSet`, so
/// this limit is primarily a safety net.
const MAX_BUFFERED_PARTIAL_RESULT_SETS: usize = 10;
103
104impl ResultSet {
105    /// Creates a new result set asynchronously, waiting for the first chunk to arrive.
106    pub(crate) async fn create(params: ResultSetParams) -> crate::Result<Self> {
107        let mut result_set = Self::new(params);
108        result_set.init_stream().await?;
109        Ok(result_set)
110    }
111
112    /// Creates a new result set.
113    fn new(params: ResultSetParams) -> Self {
114        let ResultSetParams {
115            stream,
116            transaction_selector,
117            precommit_token_tracker,
118            client,
119            session_name,
120            transaction_tag,
121            operation,
122            channel_hint,
123            gax_options,
124        } = params;
125
126        let gax_options = Self::apply_defaults(gax_options);
127
128        Self {
129            stream: Some(stream),
130            buffered_values: Vec::new(),
131            chunked: false,
132            seen_last: false,
133            ready_rows: VecDeque::new(),
134            local_metadata: None,
135            stats: None,
136            precommit_token_tracker,
137            client,
138            session_name,
139            transaction_tag,
140            operation,
141            last_resume_token: Bytes::new(),
142            partial_result_sets_buffer: VecDeque::new(),
143            safe_to_retry: true,
144            max_buffered_partial_result_sets: MAX_BUFFERED_PARTIAL_RESULT_SETS,
145            retry_count: 0,
146            transaction_selector,
147            channel_hint,
148            gax_options,
149        }
150    }
151
152    fn apply_defaults(mut gax_options: GaxRequestOptions) -> GaxRequestOptions {
153        if gax_options.retry_policy().is_none() {
154            gax_options.set_retry_policy(Aip194Strict.with_attempt_limit(10));
155        }
156        if gax_options.backoff_policy().is_none() {
157            gax_options.set_backoff_policy(Self::default_backoff_policy());
158        }
159        gax_options
160    }
161
162    fn default_backoff_policy() -> Arc<dyn BackoffPolicy> {
163        Arc::new(ExponentialBackoffBuilder::default().clamp())
164    }
165
166    async fn init_stream(&mut self) -> crate::Result<()> {
167        // We loop here because if an initial stream failure occurs and is retriable (e.g., UNAVAILABLE),
168        // we restart the stream and retry fetching the initial chunk.
169        loop {
170            let stream_result = match &mut self.stream {
171                Some(s) => s.next_message().await,
172                None => {
173                    return Err(internal_error(
174                        "Query stream ended without metadata or error",
175                    ));
176                }
177            };
178
179            match stream_result {
180                Some(Ok(partial_result_set)) => {
181                    self.handle_partial_result_set(partial_result_set)?;
182                    return Ok(());
183                }
184                Some(Err(e)) => {
185                    self.handle_stream_error(e).await?;
186                }
187                None => {
188                    return Err(internal_error(
189                        "Query stream ended without metadata or error",
190                    ));
191                }
192            }
193        }
194    }
195
196    /// Returns the metadata of the result set.
197    ///
198    /// # Example
199    /// ```
200    /// # use google_cloud_spanner::result::ResultSet;
201    /// # use google_cloud_spanner::result::Row;
202    /// # async fn fetch_metadata(mut rs: ResultSet) -> Result<(), Box<dyn std::error::Error>> {
203    /// if let Some(metadata) = rs.metadata() {
204    ///     for column in metadata.column_names() {
205    ///         println!("Column name: {}", column);
206    ///     }
207    /// }
208    /// # Ok(())
209    /// # }
210    /// ```
211    pub fn metadata(&self) -> Option<&ResultSetMetadata> {
212        self.local_metadata.as_ref()
213    }
214
215    /// Returns the stats of the result set, if available.
216    ///
217    /// # Example
218    /// ```
219    /// # use google_cloud_spanner::result::ResultSet;
220    /// # use google_cloud_spanner::result::Row;
221    /// # async fn process_stats(mut rs: ResultSet) -> Result<(), google_cloud_spanner::Error> {
222    /// while let Some(row) = rs.next().await {
223    ///     let row = row?;
224    ///     // Process row
225    /// }
226    /// if let Some(stats) = rs.stats() {
227    ///     println!("Query plan: {:?}", stats.query_plan);
228    /// }
229    /// # Ok(())
230    /// # }
231    /// ```
232    ///
233    /// Stats are only available after the results have been fully consumed
234    /// and the query was run in PLAN or PROFILE mode.
235    pub fn stats(&self) -> Option<&ResultSetStats> {
236        self.stats.as_ref()
237    }
238
239    /// Returns the number of rows modified by the DML statement, if available.
240    ///
241    /// # Example
242    /// ```
243    /// # use google_cloud_spanner::client::DatabaseClient;
244    /// # use google_cloud_spanner::result::ResultSet;
245    /// # use google_cloud_spanner::statement::Statement;
246    /// # async fn check_update_count(db_client: &DatabaseClient) -> Result<(), Box<dyn std::error::Error>> {
247    /// let runner = db_client.read_write_transaction().build().await?;
248    /// runner.run(async |tx| {
249    ///     let stmt = Statement::builder("UPDATE Singers SET LastName = 'Simpson' WHERE SingerId = @id THEN RETURN SingerId, LastName")
250    ///         .add_param("id", &123_i64)
251    ///         .build();
252    ///     let mut rs = tx.execute_query(stmt).await?;
253    ///     while let Some(row) = rs.next().await.transpose()? {
254    ///         // Process returned rows
255    ///     }
256    ///     if let Some(count) = rs.update_count() {
257    ///         println!("Rows modified: {}", count);
258    ///     }
259    ///     Ok(())
260    /// }).await?;
261    /// # Ok(())
262    /// # }
263    /// ```
264    ///
265    /// Returns the number of rows modified when this [`ResultSet`] was produced from a
266    /// DML statement with a `THEN RETURN` clause.
267    pub fn update_count(&self) -> Option<i64> {
268        self.stats.as_ref().and_then(|s| {
269            s.row_count.as_ref().map(|rc| match rc {
270                RowCount::RowCountExact(c) => *c,
271                RowCount::RowCountLowerBound(c) => *c,
272            })
273        })
274    }
275
276    /// Fetches the next row from the result set.
277    ///
278    /// # Example
279    /// ```
280    /// # use google_cloud_spanner::result::ResultSet;
281    /// # use google_cloud_spanner::result::Row;
282    /// # async fn fetch_next(mut rs: ResultSet) -> Result<(), google_cloud_spanner::Error> {
283    /// if let Some(row) = rs.next().await.transpose()? {
284    ///     // Process the row
285    /// }
286    /// # Ok(())
287    /// # }
288    /// ```
289    ///
290    /// Returns `None` when all rows have been retrieved.
291    pub async fn next(&mut self) -> Option<crate::Result<Row>> {
292        loop {
293            if let Some(row) = self.ready_rows.pop_front() {
294                return Some(Ok(row));
295            }
296
297            if self.seen_last {
298                self.stream = None;
299                return None;
300            }
301
302            let stream_result = match &mut self.stream {
303                Some(s) => s.next_message().await,
304                None => return None,
305            };
306
307            match stream_result {
308                Some(Ok(partial_result_set)) => {
309                    if let Err(e) = self.handle_partial_result_set(partial_result_set) {
310                        return Some(Err(e));
311                    }
312                }
313                Some(Err(e)) => {
314                    if let Err(err) = self.handle_stream_error(e).await {
315                        return Some(Err(err));
316                    }
317                }
318                None => match self.handle_stream_end() {
319                    Ok(Some(row)) => return Some(Ok(row)),
320                    Ok(None) => return None,
321                    Err(e) => return Some(Err(e)),
322                },
323            }
324        }
325    }
326
327    /// Converts the [`ResultSet`] into a [`Stream`].
328    ///
329    /// # Example
330    ///
331    /// ```
332    /// # use google_cloud_spanner::result::ResultSet;
333    /// # use futures::TryStreamExt;
334    /// # use std::future::ready;
335    /// # async fn example(result_set: ResultSet) -> Result<(), google_cloud_spanner::Error> {
336    /// let rows: Vec<_> = result_set
337    ///     .into_stream()
338    ///     .try_filter(|row| {
339    ///         let id = row.get::<String, _>("Id");
340    ///         ready(id == "id1")
341    ///     })
342    ///     .try_collect()
343    ///     .await?;
344    /// # Ok(())
345    /// # }
346    /// ```
347    ///
348    /// This consumes the [`ResultSet`] and returns a stream of rows.
349    #[cfg(feature = "unstable-stream")]
350    pub fn into_stream(self) -> impl Stream<Item = crate::Result<Row>> + Unpin {
351        use futures::stream::unfold;
352        Box::pin(unfold(self, |mut result_set| async move {
353            result_set.next().await.map(|row| (row, result_set))
354        }))
355    }
356}
357
358impl ResultSet {
359    fn handle_partial_result_set(
360        &mut self,
361        mut partial_result_set: PartialResultSet,
362    ) -> crate::Result<()> {
363        self.precommit_token_tracker.update(
364            partial_result_set
365                .precommit_token
366                .clone()
367                .map(|t| t.cnv().expect("failed to convert precommit token")),
368        );
369
370        if partial_result_set.last {
371            self.seen_last = true;
372        }
373
374        match (
375            self.local_metadata.as_ref(),
376            partial_result_set.metadata.take(),
377        ) {
378            (Some(_), None) => {}
379            (None, None) => {
380                return Err(internal_error(
381                    "First PartialResultSet did not contain metadata",
382                ));
383            }
384            (Some(_), Some(_)) => {
385                return Err(internal_error("Additional metadata after first result set"));
386            }
387            (None, Some(m)) => {
388                self.handle_metadata(m)?;
389            }
390        }
391
392        // Keep track of the last resume_token that we see to be able to resume the stream
393        // in case of a transient error. Most PartialResultSets will have a resume token,
394        // but the API contract is not explicitly guaranteeing that each of them will have
395        // one.
396        if !partial_result_set.resume_token.is_empty() {
397            self.last_resume_token = partial_result_set.resume_token.clone();
398            self.safe_to_retry = true;
399            self.partial_result_sets_buffer
400                .push_back(partial_result_set);
401            self.flush_buffer()?;
402            return Ok(());
403        }
404
405        // The PartialResultSet did not have a resume_token. Buffer the result
406        // and continue with the next PartialResultSet, unless the buffer is full.
407        if self.partial_result_sets_buffer.len() >= self.max_buffered_partial_result_sets {
408            // Mark this stream as 'unsafe to retry', meaning that any transient error
409            // that we see will not be retried. We will instead propagate the error.
410            self.safe_to_retry = false;
411            if let Some(oldest) = self.partial_result_sets_buffer.pop_front() {
412                self.process_partial_result_set(oldest)?;
413            }
414        }
415        self.partial_result_sets_buffer
416            .push_back(partial_result_set);
417
418        if self.seen_last {
419            self.flush_buffer()?;
420            if self.chunked {
421                return Err(crate::error::internal_error(
422                    "Stream ended with chunked_value=true",
423                ));
424            }
425        }
426
427        Ok(())
428    }
429
430    fn handle_metadata(&mut self, mut m: v1::ResultSetMetadata) -> crate::Result<()> {
431        let transaction = m.transaction.take();
432        let meta = ResultSetMetadata::new(Some(m));
433        if let Some(selector) = &self.transaction_selector {
434            if let Some(transaction) = transaction {
435                selector.update(
436                    transaction.id,
437                    transaction
438                        .read_timestamp
439                        .and_then(|t| wkt::Timestamp::new(t.seconds, t.nanos).ok()),
440                )?;
441            } else if let ReadContextTransactionSelector::Lazy(lazy) = selector {
442                let is_started = matches!(
443                    &*lazy.lock().expect("transaction state mutex poisoned"),
444                    TransactionState::Started(_, _)
445                );
446                if !is_started {
447                    return Err(internal_error(
448                        "Spanner failed to return a transaction ID for a query that included a BeginTransaction option",
449                    ));
450                }
451            }
452        }
453        self.local_metadata = Some(meta);
454        Ok(())
455    }
456
457    async fn handle_stream_error(&mut self, e: crate::Error) -> crate::Result<()> {
458        if self.safe_to_retry && self.should_retry(&e) {
459            self.retry_count += 1;
460            // Clear the buffer and restart the stream using the last
461            // resume_token that we have seen.
462            self.partial_result_sets_buffer.clear();
463
464            // Apply backoff delay if policy is present
465            if let Some(policy) = self.gax_options.backoff_policy() {
466                let state =
467                    RetryState::new(self.safe_to_retry).set_attempt_count(self.retry_count as u32);
468                let delay = policy.on_failure(&state);
469                sleep(delay).await;
470            }
471
472            self.restart_stream().await?;
473            return Ok(());
474        }
475
476        // Check if this stream included an inlined BeginTransaction option
477        // and has not yet returned a transaction ID. If so, we explicitly
478        // begin the transaction and restart the stream.
479        let Some(ReadContextTransactionSelector::Lazy(lazy)) = &self.transaction_selector else {
480            return Err(e);
481        };
482        let is_started = matches!(
483            &*lazy.lock().unwrap(),
484            crate::read_only_transaction::TransactionState::Started(_, _)
485        );
486        if is_started {
487            return Err(e);
488        }
489
490        self.transaction_selector
491            .as_ref()
492            .unwrap()
493            .begin_explicitly(crate::read_only_transaction::ExplicitBeginParams {
494                client: self.client.clone(),
495                session_name: self.session_name.clone(),
496                transaction_tag: self.transaction_tag.clone(),
497                channel_hint: self.channel_hint,
498                request_options: self.gax_options.clone(),
499                is_stream_fallback: true,
500                precommit_token_tracker: self.precommit_token_tracker.clone(),
501                mutation_key: None,
502            })
503            .await?;
504
505        self.partial_result_sets_buffer.clear();
506        self.restart_stream().await?;
507        Ok(())
508    }
509
510    fn handle_stream_end(&mut self) -> crate::Result<Option<Row>> {
511        // We are at the end of the stream. Return any buffered rows as long
512        // as there are any. If there are no buffered rows, return None.
513
514        // First flush any PartialResultSets that we had received without a resume_token.
515        if !self.partial_result_sets_buffer.is_empty() {
516            self.flush_buffer()?;
517        }
518        if self.chunked {
519            // This should never happen.
520            return Err(crate::error::internal_error(
521                "Stream ended with chunked_value=true",
522            ));
523        }
524        if let Some(row) = self.ready_rows.pop_front() {
525            return Ok(Some(row));
526        }
527        Ok(None)
528    }
529
530    fn flush_buffer(&mut self) -> crate::Result<()> {
531        let mut buffer_to_flush = take(&mut self.partial_result_sets_buffer);
532        while let Some(partial_result_set) = buffer_to_flush.pop_front() {
533            self.process_partial_result_set(partial_result_set)?;
534        }
535        Ok(())
536    }
537
538    fn process_partial_result_set(
539        &mut self,
540        partial_result_set: PartialResultSet,
541    ) -> crate::Result<()> {
542        let PartialResultSet {
543            stats,
544            values,
545            chunked_value,
546            ..
547        } = partial_result_set;
548
549        match (&self.stats, stats) {
550            (Some(_), Some(_)) => {
551                return Err(internal_error("Additional stats received after first"));
552            }
553            (None, Some(s)) => {
554                let converted_stats = s
555                    .cnv()
556                    .map_err(|e| internal_error(format!("failed to convert stats: {}", e)))?;
557                self.stats = Some(converted_stats);
558            }
559            _ => {}
560        }
561
562        if values.is_empty() {
563            return Ok(());
564        }
565        let metadata = self.local_metadata.as_ref().ok_or_else(|| {
566            internal_error("PartialResultSet contained values but no metadata was provided")
567        })?;
568        if metadata.column_types.is_empty() {
569            return Err(internal_error(
570                "PartialResultSet contained values but no column metadata was provided",
571            ));
572        }
573
574        let mut values_iter = values.into_iter();
575        if self.chunked
576            && let Some(last_val) = self.buffered_values.last_mut()
577            && let Some(first_new) = values_iter.next()
578        {
579            merge_values(last_val, first_new)?;
580        }
581
582        self.buffered_values.extend(values_iter);
583        self.chunked = chunked_value;
584
585        while self.buffered_values.len() >= metadata.column_types.len() {
586            let column_count = metadata.column_types.len();
587            if self.buffered_values.len() == column_count && self.chunked {
588                break;
589            }
590
591            let row_values: Vec<crate::value::Value> = self
592                .buffered_values
593                .drain(..column_count)
594                .map(crate::value::Value)
595                .collect();
596            self.ready_rows.push_back(Row {
597                values: row_values,
598                metadata: metadata.clone(),
599            });
600        }
601        Ok(())
602    }
603
604    async fn restart_stream(&mut self) -> crate::Result<()> {
605        // If we are restarting the stream (due to a failure), and the transaction
606        // was in the process of starting (but failed before yielding an ID),
607        // reset the state so the retry attempt can include the begin option again.
608        if let Some(s) = &self.transaction_selector {
609            s.maybe_reset_starting();
610        }
611
612        // Get the latest transaction selector for this transaction.
613        let transaction_selector = if let Some(s) = &self.transaction_selector {
614            Some(s.selector().await?)
615        } else {
616            None
617        };
618
619        // If we are restarting the stream from the beginning (because no resume token
620        // was received prior to the transient failure), we clear our local metadata state.
621        // This ensures that when Spanner transmits the initial metadata chunk on the retried stream,
622        // it is extracted without triggering the 'only-once' metadata validation error.
623        if self.last_resume_token.is_empty() {
624            self.local_metadata = None;
625        }
626
627        match &mut self.operation {
628            StreamOperation::Query(req) => {
629                req.resume_token = self.last_resume_token.clone();
630                req.transaction = transaction_selector
631                    .clone()
632                    .or_else(|| req.transaction.take());
633                let stream = self
634                    .client
635                    .spanner
636                    .execute_streaming_sql(req.clone(), self.gax_options.clone(), self.channel_hint)
637                    .send()
638                    .await?;
639                self.stream = Some(stream);
640            }
641            StreamOperation::Read(req) => {
642                req.resume_token = self.last_resume_token.clone();
643                req.transaction = transaction_selector
644                    .clone()
645                    .or_else(|| req.transaction.take());
646                let stream = self
647                    .client
648                    .spanner
649                    .streaming_read(req.clone(), self.gax_options.clone(), self.channel_hint)
650                    .send()
651                    .await?;
652                self.stream = Some(stream);
653            }
654        }
655        Ok(())
656    }
657
658    fn should_retry(&self, e: &crate::Error) -> bool {
659        if let Some(policy) = self.gax_options.retry_policy() {
660            let state =
661                RetryState::new(self.safe_to_retry).set_attempt_count(self.retry_count as u32);
662
663            if let Some(status) = e.status() {
664                let gax_error = GaxError::service(status.clone());
665                return policy.on_error(&state, gax_error).is_continue();
666            }
667        }
668        false
669    }
670}
671
672/// Merges two values from successive `PartialResultSet`s into a single value.
673///
674/// Cloud Spanner can return a single logical row or column value split across multiple
675/// `PartialResultSet` messages. This occurs when a value (especially large strings or
676/// arrays) exceeds the message size limits of the underlying stream. In these cases,
677/// the `chunked_value` flag is set on the first `PartialResultSet`, indicating that the
678/// final value in the message's `values` array is incomplete and must be combined with
679/// the first value in the `values` array of the subsequent `PartialResultSet`.
680///
681/// This function handles the concatenation of split `StringValue` and `ListValue` types.
682fn merge_values(target: &mut prost_types::Value, source: prost_types::Value) -> crate::Result<()> {
683    use prost_types::value::Kind;
684    match (&mut target.kind, source.kind) {
685        (Some(Kind::StringValue(s)), Some(Kind::StringValue(source_s))) => {
686            s.push_str(&source_s);
687            Ok(())
688        }
689        (Some(Kind::ListValue(target_list)), Some(Kind::ListValue(mut source_list))) => {
690            if source_list.values.is_empty() {
691                return Ok(());
692            }
693            if target_list.values.is_empty() {
694                target_list.values = source_list.values;
695                return Ok(());
696            }
697
698            let source_first = source_list.values.remove(0);
699            if let Some(target_last) = target_list.values.last_mut() {
700                match (&target_last.kind, &source_first.kind) {
701                    (Some(Kind::StringValue(_)), Some(Kind::StringValue(_)))
702                    | (Some(Kind::ListValue(_)), Some(Kind::ListValue(_))) => {
703                        merge_values(target_last, source_first)?;
704                    }
705                    _ => {
706                        target_list.values.push(source_first);
707                    }
708                }
709            } else {
710                target_list.values.push(source_first);
711            }
712            target_list.values.extend(source_list.values);
713            Ok(())
714        }
715        // This is not expected to happen and indicates that Spanner returned data that
716        // violates the contract. In this case we return a service error with error code
717        // Internal.
718        _ => Err(internal_error(
719            "Incompatible types for merging chunked values",
720        )),
721    }
722}
723
#[cfg(test)]
impl ResultSet {
    /// Test-only hook: overrides how many `PartialResultSet`s may be buffered
    /// without a resume token.
    pub(crate) fn set_max_buffered_partial_result_sets(&mut self, limit: usize) {
        self.max_buffered_partial_result_sets = limit;
    }
}
730
731#[cfg(test)]
732pub(crate) mod tests {
733    use super::*;
734    use crate::client::Spanner;
735    use crate::key::KeySet;
736    use crate::read::ReadRequest;
737    use crate::statement::Statement;
738    use crate::transaction::BeginTransactionOption;
739    use gaxi::grpc::tonic::{Code as GrpcCode, Response, Status};
740    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
741    use google_cloud_gax::backoff_policy::BackoffPolicy;
742    use google_cloud_gax::retry_policy::{Aip194Strict, RetryPolicyExt};
743    use google_cloud_gax::retry_state::RetryState;
744    use google_cloud_test_macros::tokio_test_no_panics;
745    use prost_types::Value;
746    use spanner_grpc_mock::MockSpanner;
747    use spanner_grpc_mock::google::spanner::v1 as spanner_v1;
748    use spanner_grpc_mock::google::spanner::v1::struct_type::Field;
749    use spanner_grpc_mock::google::spanner::v1::{
750        MultiplexedSessionPrecommitToken, PartialResultSet, ResultSetMetadata, Session, StructType,
751    };
752    use spanner_grpc_mock::start;
753    use spanner_v1::result_set_stats::RowCount;
754    use std::time::Duration;
755
756    mockall::mock! {
757        #[derive(Debug)]
758        BackoffPolicy {}
759        impl BackoffPolicy for BackoffPolicy {
760            fn on_failure(&self, state: &RetryState) -> Duration;
761        }
762    }
763
764    pub(crate) fn string_val(s: &str) -> Value {
765        Value {
766            kind: Some(prost_types::value::Kind::StringValue(s.to_string())),
767        }
768    }
769
770    fn list_val(vals: Vec<Value>) -> Value {
771        Value {
772            kind: Some(prost_types::value::Kind::ListValue(
773                prost_types::ListValue { values: vals },
774            )),
775        }
776    }
777
778    fn metadata(cols: usize) -> Option<ResultSetMetadata> {
779        let mut fields = vec![];
780        for i in 0..cols {
781            fields.push(Field {
782                name: format!("col{}", i),
783                r#type: None,
784            });
785        }
786        Some(ResultSetMetadata {
787            row_type: Some(StructType { fields }),
788            transaction: None,
789            undeclared_parameters: None,
790        })
791    }
792
793    pub(crate) fn adapt<I, T>(items: I) -> tokio::sync::mpsc::Receiver<T>
794    where
795        I: IntoIterator<Item = T>,
796        I::IntoIter: ExactSizeIterator,
797    {
798        let items = items.into_iter();
799        let (tx, rx) = tokio::sync::mpsc::channel(items.len().max(1));
800        for i in items {
801            tx.try_send(i)
802                .expect("can't fail, we allocated enough capacity.");
803        }
804        rx
805    }
806
807    async fn run_mock_query(results: Vec<PartialResultSet>) -> ResultSet {
808        run_mock_query_fallible(results).await.unwrap()
809    }
810
811    async fn run_mock_query_fallible(results: Vec<PartialResultSet>) -> crate::Result<ResultSet> {
812        let mut mock = MockSpanner::new();
813        let rx = adapt(results.into_iter().map(Ok));
814        mock.expect_execute_streaming_sql()
815            .return_once(move |_request| Ok(Response::from(rx)));
816
817        mock.expect_create_session().returning(|_| {
818            Ok(Response::new(Session {
819                name: "session".to_string(),
820                multiplexed: true,
821                ..Default::default()
822            }))
823        });
824
825        let (address, _server) = start("127.0.0.1:0", mock)
826            .await
827            .expect("Failed to start mock server");
828
829        let client: Spanner = Spanner::builder()
830            .with_endpoint(address)
831            .with_credentials(Anonymous::new().build())
832            .build()
833            .await
834            .expect("Failed to build client");
835
836        let db_client: crate::database_client::DatabaseClient =
837            client.database_client("db").build().await.unwrap();
838        let tx: crate::read_only_transaction::SingleUseReadOnlyTransaction =
839            db_client.single_use().build();
840        tx.execute_query("SELECT 1").await
841    }
842
843    #[test]
844    fn test_auto_traits() {
845        static_assertions::assert_impl_all!(ResultSet: std::fmt::Debug, Send, Sync);
846    }
847
848    #[tokio_test_no_panics]
849    async fn test_result_set_zero_rows() {
850        let mut rs = run_mock_query(vec![PartialResultSet {
851            metadata: metadata(2),
852            values: vec![],
853            chunked_value: false,
854            resume_token: vec![],
855            stats: None,
856            precommit_token: None,
857            last: true,
858            cache_update: None,
859        }])
860        .await;
861
862        let next = rs.next().await;
863        assert!(next.is_none());
864    }
865
866    #[tokio_test_no_panics]
867    async fn test_result_set_metadata() -> anyhow::Result<()> {
868        let mut rs = run_mock_query(vec![PartialResultSet {
869            metadata: metadata(2),
870            values: vec![string_val("a"), string_val("b")],
871            last: true,
872            ..Default::default()
873        }])
874        .await;
875
876        // Called before next() -> metadata is immediately available.
877        let meta = rs.metadata().expect("metadata available");
878        assert_eq!(
879            meta.column_names(),
880            &["col0".to_string(), "col1".to_string()]
881        );
882
883        // Advance
884        let _next = rs.next().await.expect("Expected a row")?;
885
886        // Called after next() -> returns metadata
887        let meta = rs.metadata().expect("metadata available");
888        assert_eq!(
889            meta.column_names(),
890            &["col0".to_string(), "col1".to_string()]
891        );
892
893        Ok(())
894    }
895
896    #[tokio_test_no_panics]
897    async fn test_result_set_handle_partial_result_set_error() -> anyhow::Result<()> {
898        let res = run_mock_query_fallible(vec![PartialResultSet {
899            values: vec![string_val("row1")],
900            ..Default::default()
901        }])
902        .await;
903
904        assert!(res.is_err(), "Expected an error but got Ok");
905        let err_str = res.expect_err("Expected should be an error").to_string();
906        assert!(
907            err_str.contains("First PartialResultSet did not contain metadata"),
908            "Expected error to contain 'First PartialResultSet did not contain metadata', but got '{}'",
909            err_str
910        );
911
912        Ok(())
913    }
914
915    #[tokio_test_no_panics]
916    async fn test_result_set_handle_partial_result_set_error_immediate() -> anyhow::Result<()> {
917        let res = run_mock_query_fallible(vec![
918            PartialResultSet {
919                values: vec![string_val("row1")],
920                ..Default::default()
921            },
922            PartialResultSet {
923                resume_token: b"token".to_vec(),
924                ..Default::default()
925            },
926        ])
927        .await;
928
929        assert!(res.is_err(), "Expected an error but got Ok");
930        let err_str = res.expect_err("Expected should be an error").to_string();
931        assert!(
932            err_str.contains("First PartialResultSet did not contain metadata"),
933            "Expected error to contain 'First PartialResultSet did not contain metadata', but got '{}'",
934            err_str
935        );
936
937        Ok(())
938    }
939
940    #[tokio_test_no_panics]
941    async fn test_result_set_stream_ended_with_chunked_value() -> anyhow::Result<()> {
942        let mut rs = run_mock_query(vec![PartialResultSet {
943            metadata: metadata(2),
944            values: vec![string_val("a")],
945            chunked_value: true,
946            ..Default::default()
947        }])
948        .await;
949
950        let res = rs.next().await;
951        assert!(res.is_some(), "Expected an error but got None");
952        let res = res.expect("Expected some response but got None");
953        assert!(res.is_err(), "Expected an error but got Ok");
954        let err_str = res.expect_err("Expected should be an error").to_string();
955        assert!(
956            err_str.contains("Stream ended with chunked_value=true"),
957            "Expected error to contain 'Stream ended with chunked_value=true', but got '{}'",
958            err_str
959        );
960
961        Ok(())
962    }
963
964    #[tokio_test_no_panics]
965    async fn test_result_set_duplicate_metadata() -> anyhow::Result<()> {
966        let mut rs = run_mock_query(vec![
967            PartialResultSet {
968                metadata: metadata(2),
969                values: vec![string_val("a"), string_val("b")],
970                resume_token: b"token1".to_vec(),
971                ..Default::default()
972            },
973            PartialResultSet {
974                metadata: metadata(2),
975                values: vec![string_val("c"), string_val("d")],
976                ..Default::default()
977            },
978        ])
979        .await;
980
981        rs.next().await.expect("Expected a row")?;
982
983        let res2 = rs.next().await;
984        assert!(res2.is_some(), "Expected an error but got None");
985        let res2 = res2.expect("Expected some response but got None");
986        assert!(res2.is_err(), "Expected an error but got Ok");
987        let err_str = res2.expect_err("Expected should be an error").to_string();
988        assert!(
989            err_str.contains("Additional metadata after first result set"),
990            "Expected error to contain 'Additional metadata after first result set', but got '{}'",
991            err_str
992        );
993
994        Ok(())
995    }
996
997    #[tokio_test_no_panics]
998    async fn test_result_set_empty_column_metadata() -> anyhow::Result<()> {
999        let mut rs = run_mock_query(vec![PartialResultSet {
1000            metadata: Some(ResultSetMetadata {
1001                row_type: Some(StructType { fields: vec![] }),
1002                ..Default::default()
1003            }),
1004            values: vec![string_val("a")],
1005            ..Default::default()
1006        }])
1007        .await;
1008
1009        let res = rs.next().await;
1010        assert!(res.is_some(), "Expected an error but got None");
1011        let res = res.expect("Expected some response but got None");
1012        assert!(res.is_err(), "Expected an error but got Ok");
1013        let err_str = res.expect_err("Expected should be an error").to_string();
1014        assert!(
1015            err_str
1016                .contains("PartialResultSet contained values but no column metadata was provided"),
1017            "Expected error to contain 'PartialResultSet contained values but no column metadata was provided', but got '{}'",
1018            err_str
1019        );
1020
1021        Ok(())
1022    }
1023
1024    #[tokio_test_no_panics]
1025    async fn test_result_set_default_policies_applied() -> anyhow::Result<()> {
1026        let rs = run_mock_query(vec![PartialResultSet {
1027            metadata: metadata(2),
1028            last: true,
1029            ..Default::default()
1030        }])
1031        .await;
1032
1033        assert!(
1034            rs.gax_options.retry_policy().is_some(),
1035            "Default retry policy should be applied"
1036        );
1037        assert!(
1038            rs.gax_options.backoff_policy().is_some(),
1039            "Default backoff policy should be applied"
1040        );
1041
1042        Ok(())
1043    }
1044
1045    #[tokio_test_no_panics]
1046    async fn test_result_set_retry_read_stream() -> anyhow::Result<()> {
1047        let mut mock = MockSpanner::new();
1048        let mut seq = mockall::Sequence::new();
1049
1050        mock.expect_streaming_read()
1051            .times(1)
1052            .in_sequence(&mut seq)
1053            .returning(|_request| {
1054                let stream = adapt([
1055                    Ok(PartialResultSet {
1056                        metadata: metadata(2),
1057                        values: vec![string_val("row1"), string_val("b")],
1058                        resume_token: b"token1".to_vec(),
1059                        ..Default::default()
1060                    }),
1061                    Err(Status::unavailable("Unavailable error")),
1062                ]);
1063                Ok(Response::from(stream))
1064            });
1065
1066        mock.expect_streaming_read()
1067            .times(1)
1068            .in_sequence(&mut seq)
1069            .returning(|_request| {
1070                let stream = adapt([Ok(PartialResultSet {
1071                    values: vec![string_val("row2"), string_val("d")],
1072                    resume_token: b"token2".to_vec(),
1073                    last: true,
1074                    ..Default::default()
1075                })]);
1076                Ok(Response::from(stream))
1077            });
1078
1079        mock.expect_create_session().returning(|_| {
1080            Ok(Response::new(Session {
1081                name: "session".to_string(),
1082                multiplexed: true,
1083                ..Default::default()
1084            }))
1085        });
1086
1087        let (address, _server) = start("127.0.0.1:0", mock).await?;
1088
1089        let client: Spanner = Spanner::builder()
1090            .with_endpoint(address)
1091            .with_credentials(Anonymous::new().build())
1092            .build()
1093            .await?;
1094
1095        let db_client = client.database_client("db").build().await?;
1096        let tx = db_client.single_use().build();
1097        let mut mock_backoff = MockBackoffPolicy::new();
1098        mock_backoff
1099            .expect_on_failure()
1100            .returning(|_| Duration::from_nanos(1));
1101
1102        let read_req = crate::read::ReadRequest::builder("table", vec!["Id", "Value"])
1103            .with_keys(crate::key::KeySet::all())
1104            .with_backoff_policy(mock_backoff)
1105            .build();
1106        let mut rs: ResultSet = tx.execute_read(read_req).await?;
1107
1108        let row1 = rs.next().await.expect("Stream ended unexpectedly")?;
1109        assert_eq!(row1.raw_values()[0].0, string_val("row1"));
1110
1111        let row2 = rs.next().await.expect("Stream ended unexpectedly")?;
1112        assert_eq!(row2.raw_values()[0].0, string_val("row2"));
1113
1114        assert!(rs.next().await.is_none());
1115
1116        Ok(())
1117    }
1118
1119    #[tokio_test_no_panics]
1120    async fn test_result_set_custom_retry_policy() -> anyhow::Result<()> {
1121        // Extend the default retry policy to also retry on ResourceExhausted.
1122        let retry_policy = Aip194Strict.continue_on_too_many_requests();
1123
1124        let mut mock = MockSpanner::new();
1125        let mut seq = mockall::Sequence::new();
1126
1127        // Fail with RESOURCE_EXHAUSTED on first call
1128        mock.expect_streaming_read()
1129            .times(1)
1130            .in_sequence(&mut seq)
1131            .returning(|_request| {
1132                let stream = adapt([
1133                    Ok(PartialResultSet {
1134                        metadata: metadata(2),
1135                        values: vec![string_val("row1"), string_val("b")],
1136                        resume_token: b"token1".to_vec(),
1137                        ..Default::default()
1138                    }),
1139                    Err(Status::new(GrpcCode::ResourceExhausted, "Quota exceeded")),
1140                ]);
1141                Ok(Response::from(stream))
1142            });
1143
1144        // Succeed on second call
1145        mock.expect_streaming_read()
1146            .times(1)
1147            .in_sequence(&mut seq)
1148            .returning(|_request| {
1149                let stream = adapt([Ok(PartialResultSet {
1150                    values: vec![string_val("row2"), string_val("d")],
1151                    resume_token: b"token2".to_vec(),
1152                    last: true,
1153                    ..Default::default()
1154                })]);
1155                Ok(Response::from(stream))
1156            });
1157
1158        mock.expect_create_session().returning(|_| {
1159            Ok(Response::new(Session {
1160                name: "session".to_string(),
1161                multiplexed: true,
1162                ..Default::default()
1163            }))
1164        });
1165
1166        let (address, _server) = start("127.0.0.1:0", mock).await?;
1167
1168        let client: Spanner = Spanner::builder()
1169            .with_endpoint(address)
1170            .with_credentials(Anonymous::new().build())
1171            .build()
1172            .await?;
1173
1174        let db_client = client.database_client("db").build().await?;
1175        let tx = db_client.single_use().build();
1176
1177        let mut mock_backoff = MockBackoffPolicy::new();
1178        mock_backoff
1179            .expect_on_failure()
1180            .times(1)
1181            .returning(|_| Duration::from_nanos(1));
1182
1183        let read_req = ReadRequest::builder("table", vec!["Id", "Value"])
1184            .with_keys(KeySet::all())
1185            .with_retry_policy(retry_policy)
1186            .with_backoff_policy(mock_backoff)
1187            .build();
1188
1189        let mut rs: ResultSet = tx.execute_read(read_req).await?;
1190
1191        let row1 = rs.next().await.expect("Stream ended unexpectedly")?;
1192        assert_eq!(row1.raw_values()[0].0, string_val("row1"));
1193
1194        // This next() call should trigger the retry because the previous stream ended with error!
1195        let row2 = rs.next().await.expect("Stream ended unexpectedly")?;
1196        assert_eq!(row2.raw_values()[0].0, string_val("row2"));
1197
1198        assert!(rs.next().await.is_none());
1199
1200        Ok(())
1201    }
1202
1203    #[tokio_test_no_panics]
1204    async fn test_result_set_one_row() {
1205        let mut rs = run_mock_query(vec![PartialResultSet {
1206            metadata: metadata(2),
1207            values: vec![string_val("a"), string_val("b")],
1208            chunked_value: false,
1209            resume_token: vec![],
1210            stats: None,
1211            precommit_token: None,
1212            last: true,
1213            cache_update: None,
1214        }])
1215        .await;
1216
1217        let row = rs.next().await.unwrap().unwrap();
1218        assert_eq!(row.raw_values().len(), 2);
1219        assert_eq!(row.raw_values()[0].0, string_val("a"));
1220        assert_eq!(row.raw_values()[1].0, string_val("b"));
1221
1222        assert!(rs.next().await.is_none());
1223    }
1224
1225    #[tokio_test_no_panics]
1226    async fn result_set_last_flag() -> anyhow::Result<()> {
1227        let mut rs = run_mock_query(vec![
1228            PartialResultSet {
1229                metadata: metadata(2),
1230                values: vec![string_val("a"), string_val("b")],
1231                last: true,
1232                ..Default::default()
1233            },
1234            // Note: This is not something that would be returned by Spanner.
1235            // Once a PartialResultSet with last == true is returned, no more
1236            // PartialResultSets will be returned by Spanner.
1237            PartialResultSet {
1238                values: vec![string_val("c"), string_val("d")],
1239                ..Default::default()
1240            },
1241        ])
1242        .await;
1243
1244        let row = rs.next().await.expect("Expected a row")?;
1245        assert_eq!(row.raw_values()[0].0, string_val("a"));
1246
1247        // Verify that the second PartialResultSet is ignored.
1248        assert!(rs.next().await.is_none());
1249
1250        Ok(())
1251    }
1252
1253    #[tokio_test_no_panics]
1254    async fn result_set_early_termination_not_cancelled() -> anyhow::Result<()> {
1255        let mut mock = MockSpanner::new();
1256        let (tx, rx) = tokio::sync::mpsc::channel(10);
1257
1258        mock.expect_execute_streaming_sql()
1259            .return_once(move |_request| Ok(Response::from(rx)));
1260
1261        mock.expect_create_session().returning(|_| {
1262            Ok(Response::new(Session {
1263                name: "session".to_string(),
1264                multiplexed: true,
1265                ..Default::default()
1266            }))
1267        });
1268
1269        let (address, _server) = start("127.0.0.1:0", mock).await?;
1270
1271        let client: Spanner = Spanner::builder()
1272            .with_endpoint(address)
1273            .with_credentials(Anonymous::new().build())
1274            .build()
1275            .await?;
1276
1277        let db_client = client.database_client("db").build().await?;
1278        let tx_single = db_client.single_use().build();
1279
1280        // 1. Send the first message with last: true
1281        tx.send(Ok(PartialResultSet {
1282            metadata: metadata(2),
1283            values: vec![string_val("a"), string_val("b")],
1284            last: true,
1285            ..Default::default()
1286        }))
1287        .await
1288        .expect("Failed to send first message");
1289
1290        let mut rs: ResultSet = tx_single.execute_query("SELECT 1").await?;
1291
1292        // 2. Consume the first message
1293        let row = rs.next().await.expect("Expected a row")?;
1294        assert_eq!(row.raw_values()[0].0, string_val("a"));
1295
1296        // 3. Call next again. It should see seen_last and return None early,
1297        // and drain/cancel the stream cleanly without detached tasks.
1298        assert!(rs.next().await.is_none());
1299        drop(rs);
1300        tx.closed().await;
1301
1302        // 4. Now try to send another message. The stream is cancelled (dropped), so this fails.
1303        let send_result = tx
1304            .send(Ok(PartialResultSet {
1305                values: vec![string_val("c"), string_val("d")],
1306                ..Default::default()
1307            }))
1308            .await;
1309
1310        assert!(send_result.is_err(), "Expected stream to be cancelled");
1311
1312        Ok(())
1313    }
1314
1315    #[tokio_test_no_panics]
1316    async fn test_result_set_chunked_values_string() {
1317        let mut rs = run_mock_query(vec![
1318            PartialResultSet {
1319                metadata: metadata(1),
1320                values: vec![string_val("hello ")],
1321                chunked_value: true,
1322                resume_token: vec![],
1323                stats: None,
1324                precommit_token: None,
1325                last: false,
1326                cache_update: None,
1327            },
1328            PartialResultSet {
1329                metadata: None,
1330                values: vec![string_val("world")],
1331                chunked_value: false,
1332                resume_token: vec![],
1333                stats: None,
1334                precommit_token: None,
1335                last: true,
1336                cache_update: None,
1337            },
1338        ])
1339        .await;
1340
1341        let row = rs.next().await.unwrap().unwrap();
1342        assert_eq!(row.raw_values().len(), 1);
1343        if let Some(prost_types::value::Kind::StringValue(ref s)) = row.raw_values()[0].0.kind {
1344            assert_eq!(s, "hello world");
1345        } else {
1346            panic!("Expected StringValue");
1347        }
1348        assert!(rs.next().await.is_none());
1349    }
1350
1351    #[tokio_test_no_panics]
1352    async fn test_result_set_chunked_values_list() {
1353        let mut rs = run_mock_query(vec![
1354            PartialResultSet {
1355                metadata: metadata(1),
1356                values: vec![list_val(vec![string_val("A")])],
1357                chunked_value: true,
1358                resume_token: vec![],
1359                stats: None,
1360                precommit_token: None,
1361                last: false,
1362                cache_update: None,
1363            },
1364            PartialResultSet {
1365                metadata: None,
1366                values: vec![list_val(vec![string_val("B")])],
1367                chunked_value: false,
1368                resume_token: vec![],
1369                stats: None,
1370                precommit_token: None,
1371                last: true,
1372                cache_update: None,
1373            },
1374        ])
1375        .await;
1376
1377        let row = rs.next().await.unwrap().unwrap();
1378        assert_eq!(row.raw_values().len(), 1);
1379        if let Some(prost_types::value::Kind::ListValue(ref l)) = row.raw_values()[0].0.kind {
1380            assert_eq!(l.values.len(), 1);
1381            if let Some(prost_types::value::Kind::StringValue(ref s)) = l.values[0].kind {
1382                assert_eq!(s, "AB");
1383            } else {
1384                panic!("Expected StringValue");
1385            }
1386        } else {
1387            panic!("Expected ListValue");
1388        }
1389        assert!(rs.next().await.is_none());
1390    }
1391
1392    #[tokio_test_no_panics]
1393    async fn test_multi_response_chunking_bool_array() {
1394        fn bool_val(b: bool) -> Value {
1395            Value {
1396                kind: Some(prost_types::value::Kind::BoolValue(b)),
1397            }
1398        }
1399        fn null_val() -> Value {
1400            Value {
1401                kind: Some(prost_types::value::Kind::NullValue(0)),
1402            }
1403        }
1404
1405        let mut rs = run_mock_query(vec![
1406            PartialResultSet {
1407                metadata: metadata(1),
1408                values: vec![
1409                    list_val(vec![bool_val(true)]),
1410                    list_val(vec![bool_val(false), null_val(), bool_val(true)]),
1411                ],
1412                chunked_value: true,
1413                resume_token: vec![],
1414                stats: None,
1415                precommit_token: None,
1416                cache_update: None,
1417                last: false,
1418            },
1419            PartialResultSet {
1420                metadata: None,
1421                values: vec![list_val(vec![bool_val(true), bool_val(true)])],
1422                chunked_value: true,
1423                resume_token: vec![],
1424                stats: None,
1425                precommit_token: None,
1426                cache_update: None,
1427                last: false,
1428            },
1429            PartialResultSet {
1430                metadata: None,
1431                values: vec![
1432                    list_val(vec![null_val(), null_val(), bool_val(false)]),
1433                    list_val(vec![bool_val(true)]),
1434                ],
1435                chunked_value: false,
1436                resume_token: vec![],
1437                stats: None,
1438                precommit_token: None,
1439                cache_update: None,
1440                last: true,
1441            },
1442        ])
1443        .await;
1444
1445        let row1 = rs.next().await.unwrap().unwrap();
1446        assert_eq!(row1.raw_values()[0].0, list_val(vec![bool_val(true)]));
1447
1448        let row2 = rs.next().await.unwrap().unwrap();
1449        assert_eq!(
1450            row2.raw_values()[0].0,
1451            list_val(vec![
1452                bool_val(false),
1453                null_val(),
1454                bool_val(true),
1455                bool_val(true),
1456                bool_val(true),
1457                null_val(),
1458                null_val(),
1459                bool_val(false)
1460            ])
1461        );
1462
1463        let row3 = rs.next().await.unwrap().unwrap();
1464        assert_eq!(row3.raw_values()[0].0, list_val(vec![bool_val(true)]));
1465
1466        assert!(rs.next().await.is_none());
1467    }
1468
1469    #[tokio_test_no_panics]
1470    async fn test_multi_response_chunking_int64_array() {
1471        fn null_val() -> Value {
1472            Value {
1473                kind: Some(prost_types::value::Kind::NullValue(0)),
1474            }
1475        }
1476
1477        let mut rs = run_mock_query(vec![
1478            PartialResultSet {
1479                metadata: metadata(1),
1480                values: vec![
1481                    list_val(vec![string_val("10")]),
1482                    list_val(vec![string_val("1"), string_val("2"), null_val()]),
1483                ],
1484                chunked_value: true,
1485                resume_token: vec![],
1486                stats: None,
1487                precommit_token: None,
1488                cache_update: None,
1489                last: false,
1490            },
1491            PartialResultSet {
1492                metadata: None,
1493                values: vec![list_val(vec![null_val(), string_val("5")])],
1494                chunked_value: true,
1495                resume_token: vec![],
1496                stats: None,
1497                precommit_token: None,
1498                cache_update: None,
1499                last: false,
1500            },
1501            PartialResultSet {
1502                metadata: None,
1503                values: vec![
1504                    list_val(vec![null_val(), string_val("7"), string_val("8")]),
1505                    list_val(vec![string_val("20")]),
1506                ],
1507                chunked_value: false,
1508                resume_token: vec![],
1509                stats: None,
1510                precommit_token: None,
1511                cache_update: None,
1512                last: true,
1513            },
1514        ])
1515        .await;
1516
1517        let row1 = rs.next().await.unwrap().unwrap();
1518        assert_eq!(row1.raw_values()[0].0, list_val(vec![string_val("10")]));
1519
1520        let row2 = rs.next().await.unwrap().unwrap();
1521        assert_eq!(
1522            row2.raw_values()[0].0,
1523            list_val(vec![
1524                string_val("1"),
1525                string_val("2"),
1526                null_val(),
1527                null_val(),
1528                string_val("5"),
1529                null_val(),
1530                string_val("7"),
1531                string_val("8")
1532            ])
1533        );
1534
1535        let row3 = rs.next().await.unwrap().unwrap();
1536        assert_eq!(row3.raw_values()[0].0, list_val(vec![string_val("20")]));
1537
1538        assert!(rs.next().await.is_none());
1539    }
1540
1541    #[tokio_test_no_panics]
1542    async fn test_result_set_precommit_token_tracked() -> anyhow::Result<()> {
1543        let token = MultiplexedSessionPrecommitToken {
1544            precommit_token: b"test_token".to_vec(),
1545            seq_num: 99,
1546        };
1547        let results = vec![PartialResultSet {
1548            metadata: metadata(1),
1549            precommit_token: Some(token.clone()),
1550            last: true,
1551            ..Default::default()
1552        }];
1553
1554        let mut mock = MockSpanner::new();
1555        let rx = adapt(results.into_iter().map(Ok));
1556        mock.expect_execute_streaming_sql()
1557            .return_once(move |_request| Ok(Response::from(rx)));
1558
1559        mock.expect_create_session().returning(|_| {
1560            Ok(Response::new(Session {
1561                name: "session".to_string(),
1562                multiplexed: true,
1563                ..Default::default()
1564            }))
1565        });
1566
1567        let (address, _server) = start("127.0.0.1:0", mock)
1568            .await
1569            .expect("Failed to start mock server");
1570
1571        let client: Spanner = Spanner::builder()
1572            .with_endpoint(address)
1573            .with_credentials(Anonymous::new().build())
1574            .build()
1575            .await
1576            .expect("Failed to build client");
1577
1578        let db_client: crate::database_client::DatabaseClient =
1579            client.database_client("db").build().await.unwrap();
1580
1581        let tracker = PrecommitTokenTracker::new();
1582
1583        let req = crate::model::ExecuteSqlRequest::default()
1584            .set_session("session".to_string())
1585            .set_sql("SELECT 1".to_string());
1586
1587        let stream = db_client
1588            .spanner
1589            .execute_streaming_sql(req.clone(), GaxRequestOptions::default(), 0)
1590            .send()
1591            .await?;
1592
1593        let mut rs = ResultSet::create(ResultSetParams {
1594            stream,
1595            transaction_selector: None,
1596            precommit_token_tracker: tracker.clone(),
1597            client: db_client,
1598            session_name: "session".to_string(),
1599            transaction_tag: None,
1600            operation: StreamOperation::Query(req),
1601            channel_hint: 0,
1602            gax_options: GaxRequestOptions::default(),
1603        })
1604        .await?;
1605
1606        // Read a row to trigger precommit token extraction
1607        assert!(
1608            rs.next().await.is_none(),
1609            "Expected no rows, but received one"
1610        );
1611
1612        // Validate the tracker correctly intercepted and preserved the token
1613        let tracked_token = tracker.get().expect("token should be tracked");
1614        assert_eq!(tracked_token.seq_num, 99);
1615        assert_eq!(
1616            tracked_token.precommit_token,
1617            bytes::Bytes::from("test_token")
1618        );
1619
1620        Ok(())
1621    }
1622
1623    #[tokio_test_no_panics]
1624    async fn test_result_set_retry_simple() -> anyhow::Result<()> {
1625        let mut mock = MockSpanner::new();
1626        let mut seq = mockall::Sequence::new();
1627
1628        mock.expect_execute_streaming_sql()
1629            .times(1)
1630            .in_sequence(&mut seq)
1631            .returning(|_request| {
1632                let stream = adapt([
1633                    Ok(PartialResultSet {
1634                        metadata: metadata(1),
1635                        values: vec![string_val("row1")],
1636                        resume_token: b"token1".to_vec(),
1637                        ..Default::default()
1638                    }),
1639                    Err(Status::unavailable("Transient error")),
1640                ]);
1641                Ok(Response::from(stream))
1642            });
1643
1644        mock.expect_execute_streaming_sql()
1645            .times(1)
1646            .in_sequence(&mut seq)
1647            .returning(|_request| {
1648                let stream = adapt([Ok(PartialResultSet {
1649                    values: vec![string_val("row2")],
1650                    resume_token: b"token2".to_vec(),
1651                    last: true,
1652                    ..Default::default()
1653                })]);
1654                Ok(Response::from(stream))
1655            });
1656
1657        mock.expect_create_session().returning(|_| {
1658            Ok(Response::new(Session {
1659                name: "session".to_string(),
1660                multiplexed: true,
1661                ..Default::default()
1662            }))
1663        });
1664
1665        let (address, _server) = start("127.0.0.1:0", mock).await?;
1666
1667        let client: Spanner = Spanner::builder()
1668            .with_endpoint(address)
1669            .with_credentials(Anonymous::new().build())
1670            .build()
1671            .await?;
1672
1673        let db_client = client.database_client("db").build().await?;
1674        let tx = db_client.single_use().build();
1675        let mut mock_backoff = MockBackoffPolicy::new();
1676        mock_backoff
1677            .expect_on_failure()
1678            .returning(|_| Duration::from_nanos(1));
1679
1680        let stmt = Statement::builder("SELECT 1")
1681            .with_backoff_policy(mock_backoff)
1682            .build();
1683        let mut rs = tx.execute_query(stmt).await?;
1684
1685        let row1 = rs.next().await.expect("Stream ended unexpectedly")?;
1686        assert_eq!(row1.raw_values()[0].0, string_val("row1"));
1687
1688        let row2 = rs.next().await.expect("Stream ended unexpectedly")?;
1689        assert_eq!(row2.raw_values()[0].0, string_val("row2"));
1690
1691        assert!(rs.next().await.is_none());
1692
1693        Ok(())
1694    }
1695
1696    #[tokio_test_no_panics]
1697    async fn test_result_set_retry_non_retriable_error() -> anyhow::Result<()> {
1698        let mut mock = MockSpanner::new();
1699        mock.expect_execute_streaming_sql()
1700            .times(1)
1701            .returning(|_request| {
1702                let stream = adapt([
1703                    Ok(PartialResultSet {
1704                        metadata: metadata(1),
1705                        values: vec![string_val("row1")],
1706                        resume_token: b"token1".to_vec(),
1707                        ..Default::default()
1708                    }),
1709                    Err(Status::invalid_argument("Non-retriable error")),
1710                ]);
1711                Ok(Response::from(stream))
1712            });
1713
1714        mock.expect_create_session().returning(|_| {
1715            Ok(Response::new(Session {
1716                name: "session".to_string(),
1717                multiplexed: true,
1718                ..Default::default()
1719            }))
1720        });
1721
1722        let (address, _server) = start("127.0.0.1:0", mock).await?;
1723
1724        let client: Spanner = Spanner::builder()
1725            .with_endpoint(address)
1726            .with_credentials(Anonymous::new().build())
1727            .build()
1728            .await?;
1729
1730        let db_client = client.database_client("db").build().await?;
1731        let tx = db_client.single_use().build();
1732        let mut rs = tx.execute_query("SELECT 1").await?;
1733
1734        let row1 = rs.next().await.expect("Stream ended unexpectedly")?;
1735        assert_eq!(row1.raw_values()[0].0, string_val("row1"));
1736
1737        let res = rs.next().await;
1738        assert!(res.is_some(), "Expected an error but got None");
1739        let res = res.expect("Expected some response but got None");
1740        assert!(res.is_err(), "Expected an error but got Ok");
1741        let err_str = res.expect_err("Expected should be an error").to_string();
1742        assert!(
1743            err_str.contains("Non-retriable error"),
1744            "Expected error to contain 'Non-retriable error', but got '{}'",
1745            err_str
1746        );
1747
1748        Ok(())
1749    }
1750
1751    #[tokio_test_no_panics]
1752    async fn test_result_set_buffer_overflow() -> anyhow::Result<()> {
1753        let mut mock = MockSpanner::new();
1754        let (tx_msg, rx_msg) = tokio::sync::mpsc::channel(10);
1755        mock.expect_execute_streaming_sql()
1756            // Should only be called once, as it is not retried due to missing resume tokens.
1757            .times(1)
1758            .return_once(move |_request| Ok(Response::from(rx_msg)));
1759
1760        mock.expect_create_session().returning(|_| {
1761            Ok(Response::new(Session {
1762                name: "session".to_string(),
1763                multiplexed: true,
1764                ..Default::default()
1765            }))
1766        });
1767
1768        let (address, _server) = start("127.0.0.1:0", mock).await?;
1769
1770        let client: Spanner = Spanner::builder()
1771            .with_endpoint(address)
1772            .with_credentials(Anonymous::new().build())
1773            .build()
1774            .await?;
1775
1776        let db_client = client.database_client("db").build().await?;
1777        let tx = db_client.single_use().build();
1778
1779        tx_msg
1780            .send(Ok(PartialResultSet {
1781                metadata: metadata(1),
1782                values: vec![string_val("row1")],
1783                resume_token: b"token1".to_vec(),
1784                ..Default::default()
1785            }))
1786            .await?;
1787
1788        let mut rs = tx.execute_query("SELECT 1").await?;
1789
1790        // Set max buffer size to 2
1791        rs.set_max_buffered_partial_result_sets(2);
1792
1793        tx_msg
1794            .send(Ok(PartialResultSet {
1795                values: vec![string_val("row2")],
1796                ..Default::default()
1797            }))
1798            .await?;
1799        tx_msg
1800            .send(Ok(PartialResultSet {
1801                values: vec![string_val("row3")],
1802                ..Default::default()
1803            }))
1804            .await?;
1805        tx_msg
1806            .send(Ok(PartialResultSet {
1807                values: vec![string_val("row4")],
1808                ..Default::default()
1809            }))
1810            .await?;
1811        tx_msg
1812            .send(Err(Status::unavailable("Unavailable error")))
1813            .await?;
1814
1815        // Read row 1.
1816        let row1 = rs.next().await.expect("Expected row1")?;
1817        assert_eq!(row1.raw_values()[0].0, string_val("row1"));
1818
1819        // Read row 2 (flushed when row4 overflows buffer).
1820        let row2 = rs.next().await.expect("Expected row2")?;
1821        assert_eq!(row2.raw_values()[0].0, string_val("row2"));
1822
1823        // Try to read next row. As the buffer is now full/unsafe, it will not retry and return the error.
1824        let res = rs.next().await;
1825        assert!(res.is_some(), "Expected an error but got None");
1826        let res = res.expect("Expected some response but got None");
1827        assert!(res.is_err(), "Expected an error but got Ok");
1828        let err_str = res.expect_err("Expected should be an error").to_string();
1829        assert!(
1830            err_str.contains("Unavailable error"),
1831            "Expected error to contain 'Unavailable error', but got '{}'",
1832            err_str
1833        );
1834
1835        Ok(())
1836    }
1837
1838    #[tokio_test_no_panics]
1839    async fn test_result_set_retry_missing_resume_token_safe() -> anyhow::Result<()> {
1840        let mut mock = MockSpanner::new();
1841        let mut seq = mockall::Sequence::new();
1842
1843        mock.expect_execute_streaming_sql()
1844            .times(1)
1845            .in_sequence(&mut seq)
1846            .returning(|_request| {
1847                let stream = adapt([
1848                    Ok(PartialResultSet {
1849                        metadata: metadata(1),
1850                        values: vec![string_val("row1")],
1851                        // no resume token
1852                        ..Default::default()
1853                    }),
1854                    Err(Status::unavailable("Unavailable error")),
1855                ]);
1856                Ok(Response::from(stream))
1857            });
1858
1859        mock.expect_execute_streaming_sql()
1860            .times(1)
1861            .in_sequence(&mut seq)
1862            .returning(|_request| {
1863                let stream = adapt([Ok(PartialResultSet {
1864                    metadata: metadata(1),
1865                    values: vec![string_val("row1_retry")],
1866                    resume_token: b"token_retry".to_vec(),
1867                    ..Default::default()
1868                })]);
1869                Ok(Response::from(stream))
1870            });
1871
1872        mock.expect_create_session().returning(|_| {
1873            Ok(Response::new(Session {
1874                name: "session".to_string(),
1875                multiplexed: true,
1876                ..Default::default()
1877            }))
1878        });
1879
1880        let (address, _server) = start("127.0.0.1:0", mock).await?;
1881
1882        let client: Spanner = Spanner::builder()
1883            .with_endpoint(address)
1884            .with_credentials(Anonymous::new().build())
1885            .build()
1886            .await?;
1887
1888        let db_client = client.database_client("db").build().await?;
1889        let tx = db_client.single_use().build();
1890        let mut mock_backoff = MockBackoffPolicy::new();
1891        mock_backoff
1892            .expect_on_failure()
1893            .returning(|_| Duration::from_nanos(1));
1894
1895        let stmt = Statement::builder("SELECT 1")
1896            .with_backoff_policy(mock_backoff)
1897            .build();
1898        let mut rs = tx.execute_query(stmt).await?;
1899
1900        let row1 = rs.next().await.expect("Expected row1")?;
1901        assert_eq!(row1.raw_values()[0].0, string_val("row1_retry"));
1902
1903        Ok(())
1904    }
1905
1906    #[tokio_test_no_panics]
1907    async fn test_result_set_retry_under_limit_no_resume_token() -> anyhow::Result<()> {
1908        let mut mock = MockSpanner::new();
1909        let mut seq = mockall::Sequence::new();
1910
1911        // First stream: 2 messages without resume token, then Error.
1912        mock.expect_execute_streaming_sql()
1913            .times(1)
1914            .in_sequence(&mut seq)
1915            .returning(|_request| {
1916                let stream = adapt([
1917                    Ok(PartialResultSet {
1918                        metadata: metadata(1),
1919                        values: vec![string_val("row1")],
1920                        ..Default::default()
1921                    }),
1922                    Ok(PartialResultSet {
1923                        values: vec![string_val("row2")],
1924                        ..Default::default()
1925                    }),
1926                    Err(Status::unavailable("Unavailable error")),
1927                ]);
1928                Ok(Response::from(stream))
1929            });
1930
1931        // Second stream: Retried from the start as the initial stream
1932        // returned Unavailable before the buffer was full.
1933        mock.expect_execute_streaming_sql()
1934            .times(1)
1935            .in_sequence(&mut seq)
1936            .returning(|request| {
1937                assert!(
1938                    request.get_ref().resume_token.is_empty(),
1939                    "Expected empty resume token for retry"
1940                );
1941                let stream = adapt([Ok(PartialResultSet {
1942                    metadata: metadata(1),
1943                    values: vec![string_val("row1_retry")],
1944                    resume_token: b"token_retry".to_vec(),
1945                    ..Default::default()
1946                })]);
1947                Ok(Response::from(stream))
1948            });
1949
1950        mock.expect_create_session().returning(|_| {
1951            Ok(Response::new(Session {
1952                name: "session".to_string(),
1953                multiplexed: true,
1954                ..Default::default()
1955            }))
1956        });
1957
1958        let (address, _server) = start("127.0.0.1:0", mock).await?;
1959
1960        let client: Spanner = Spanner::builder()
1961            .with_endpoint(address)
1962            .with_credentials(Anonymous::new().build())
1963            .build()
1964            .await?;
1965
1966        let db_client = client.database_client("db").build().await?;
1967        let tx = db_client.single_use().build();
1968        let mut mock_backoff = MockBackoffPolicy::new();
1969        mock_backoff
1970            .expect_on_failure()
1971            .returning(|_| Duration::from_nanos(1));
1972
1973        let stmt = Statement::builder("SELECT 1")
1974            .with_backoff_policy(mock_backoff)
1975            .build();
1976        let mut rs = tx.execute_query(stmt).await?;
1977
1978        // Set max buffer size to 3 (so 2 messages is under the limit)
1979        rs.set_max_buffered_partial_result_sets(3);
1980
1981        // Read row 1.
1982        // It reads row1, row2, and then the error from the first stream.
1983        // Since it is less than the buffer size, it retries without a resume token.
1984        // The retry stream returns "row1_retry".
1985        let row1 = rs.next().await.expect("Expected row1")?;
1986        assert_eq!(row1.raw_values()[0].0, string_val("row1_retry"));
1987
1988        Ok(())
1989    }
1990
1991    #[tokio_test_no_panics]
1992    async fn test_result_set_retry_limit_exceeded() -> anyhow::Result<()> {
1993        let mut mock = MockSpanner::new();
1994
1995        mock.expect_execute_streaming_sql()
1996            .times(11) // 1 initial + 10 retries
1997            .returning(|_request| {
1998                let stream = adapt([Err(Status::unavailable("Unavailable error"))]);
1999                Ok(Response::from(stream))
2000            });
2001
2002        mock.expect_create_session().returning(|_| {
2003            Ok(Response::new(Session {
2004                name: "session".to_string(),
2005                multiplexed: true,
2006                ..Default::default()
2007            }))
2008        });
2009
2010        let (address, _server) = start("127.0.0.1:0", mock).await?;
2011
2012        let client: Spanner = Spanner::builder()
2013            .with_endpoint(address)
2014            .with_credentials(Anonymous::new().build())
2015            .build()
2016            .await?;
2017
2018        let db_client = client.database_client("db").build().await?;
2019        let tx = db_client.single_use().build();
2020        let mut mock_backoff = MockBackoffPolicy::new();
2021        mock_backoff
2022            .expect_on_failure()
2023            .times(10)
2024            .returning(|_| Duration::from_nanos(1));
2025
2026        let stmt = Statement::builder("SELECT 1")
2027            .with_backoff_policy(mock_backoff)
2028            .build();
2029        let res = tx.execute_query(stmt).await;
2030
2031        assert!(res.is_err(), "Expected an error but got Ok");
2032        let err_str = res.expect_err("Expected should be an error").to_string();
2033        assert!(
2034            err_str.contains("Unavailable error"),
2035            "Expected error to contain 'Unavailable error', but got '{}'",
2036            err_str
2037        );
2038
2039        Ok(())
2040    }
2041
2042    #[tokio_test_no_panics]
2043    async fn result_set_inline_begin_stream_error_fallback() -> anyhow::Result<()> {
2044        let mut mock = MockSpanner::new();
2045        let mut seq = mockall::Sequence::new();
2046
2047        // 1. Stream yields an error on the first chunk before returning transaction metadata.
2048        // E.g., INVALID_ARGUMENT because the query is malformed.
2049        mock.expect_execute_streaming_sql()
2050            .times(1)
2051            .in_sequence(&mut seq)
2052            .returning(|_request| {
2053                let stream = adapt([Err(Status::invalid_argument("Invalid query"))]);
2054                Ok(Response::from(stream))
2055            });
2056
2057        // 2. The explicit BeginTransaction fallback gets triggered.
2058        mock.expect_begin_transaction()
2059            .times(1)
2060            .in_sequence(&mut seq)
2061            .returning(|_| {
2062                Ok(Response::new(spanner_v1::Transaction {
2063                    id: vec![7, 8, 9],
2064                    read_timestamp: Some(prost_types::Timestamp {
2065                        seconds: 123456789,
2066                        nanos: 0,
2067                    }),
2068                    ..Default::default()
2069                }))
2070            });
2071
2072        // 3. The ResultSet gracefully restarts the stream using the transaction ID returned by BeginTransaction.
2073        mock.expect_execute_streaming_sql()
2074            .times(1)
2075            .in_sequence(&mut seq)
2076            .returning(|req| {
2077                let req = req.into_inner();
2078                // Ensure the explicitly yielded ID is routed into the new stream transaction selector
2079                match req.transaction.unwrap().selector.unwrap() {
2080                    spanner_v1::transaction_selector::Selector::Id(id) => {
2081                        assert_eq!(id, vec![7, 8, 9]);
2082                    }
2083                    _ => panic!("Expected Selector::Id"),
2084                }
2085
2086                let stream = adapt([Ok(PartialResultSet {
2087                    metadata: metadata(1),
2088                    values: vec![string_val("1")],
2089                    ..Default::default()
2090                })]);
2091                Ok(Response::from(stream))
2092            });
2093
2094        mock.expect_create_session().returning(|_| {
2095            Ok(Response::new(Session {
2096                name: "session".to_string(),
2097                multiplexed: true,
2098                ..Default::default()
2099            }))
2100        });
2101
2102        let (address, _server) = start("127.0.0.1:0", mock).await?;
2103
2104        let client: Spanner = Spanner::builder()
2105            .with_endpoint(address)
2106            .with_credentials(Anonymous::new().build())
2107            .build()
2108            .await?;
2109
2110        let db_client = client.database_client("db").build().await?;
2111
2112        let tx = db_client
2113            .read_only_transaction()
2114            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2115            .build()
2116            .await?;
2117        let mut rs = tx.execute_query("SELECT 1").await?;
2118
2119        let row1 = rs.next().await.ok_or_else(|| {
2120            anyhow::anyhow!("Expected row returned successfully despite stream breaking")
2121        })??;
2122        assert_eq!(
2123            row1.raw_values()[0].0,
2124            string_val("1"),
2125            "Verify the returned stream successfully resumed with the correct payload"
2126        );
2127
2128        Ok(())
2129    }
2130
2131    #[tokio_test_no_panics]
2132    async fn result_set_retry_inline_begin_transient_error() -> anyhow::Result<()> {
2133        let mut mock = MockSpanner::new();
2134        let mut seq = mockall::Sequence::new();
2135
2136        // 1. Initial stream throws UNAVAILABLE before metadata.
2137        mock.expect_execute_streaming_sql()
2138            .times(1)
2139            .in_sequence(&mut seq)
2140            .returning(|_request| {
2141                let stream = adapt([Err(Status::unavailable("Transient network issue"))]);
2142                Ok(Response::from(stream))
2143            });
2144
2145        // 2. We retry the stream since it was a transient error.
2146        // The retry should use the same transaction selector as the original request.
2147        mock.expect_execute_streaming_sql()
2148            .times(1)
2149            .in_sequence(&mut seq)
2150            .returning(|req| {
2151                let req = req.into_inner();
2152                match req.transaction.unwrap().selector.unwrap() {
2153                    spanner_v1::transaction_selector::Selector::Begin(_) => {}
2154                    _ => panic!("Expected Selector::Begin on stream retry"),
2155                }
2156
2157                let mut meta = metadata(1).unwrap();
2158                meta.transaction = Some(spanner_v1::Transaction {
2159                    id: vec![7, 8, 9],
2160                    read_timestamp: None,
2161                    ..Default::default()
2162                });
2163
2164                let stream = adapt([Ok(PartialResultSet {
2165                    metadata: Some(meta),
2166                    values: vec![string_val("1")],
2167                    ..Default::default()
2168                })]);
2169                Ok(Response::from(stream))
2170            });
2171
2172        mock.expect_create_session().returning(|_| {
2173            Ok(Response::new(Session {
2174                name: "session".to_string(),
2175                multiplexed: true,
2176                ..Default::default()
2177            }))
2178        });
2179
2180        let (address, _server) = start("127.0.0.1:0", mock).await?;
2181
2182        let client: Spanner = Spanner::builder()
2183            .with_endpoint(address)
2184            .with_credentials(Anonymous::new().build())
2185            .build()
2186            .await?;
2187
2188        let db_client = client.database_client("db").build().await?;
2189
2190        let tx = db_client
2191            .read_only_transaction()
2192            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2193            .build()
2194            .await?;
2195        let mut rs = tx.execute_query("SELECT 1").await?;
2196
2197        let row1 = rs
2198            .next()
2199            .await
2200            .ok_or_else(|| anyhow::anyhow!("Expected stream to recover safely"))??;
2201        assert_eq!(
2202            row1.raw_values()[0].0,
2203            string_val("1"),
2204            "Verify resumed stream returns data"
2205        );
2206
2207        Ok(())
2208    }
2209
2210    #[tokio_test_no_panics]
2211    async fn result_set_retry_inline_begin_id_recovered() -> anyhow::Result<()> {
2212        let mut mock = MockSpanner::new();
2213        let mut seq = mockall::Sequence::new();
2214
2215        // 1. Stream successfully returns metadata chunk then throws UNAVAILABLE on chunk 2.
2216        mock.expect_execute_streaming_sql()
2217            .times(1)
2218            .in_sequence(&mut seq)
2219            .returning(|_request| {
2220                let mut meta = metadata(1).unwrap();
2221                meta.transaction = Some(spanner_v1::Transaction {
2222                    id: vec![7, 8, 9],
2223                    read_timestamp: None,
2224                    ..Default::default()
2225                });
2226                let stream = adapt([
2227                    Ok(PartialResultSet {
2228                        metadata: Some(meta),
2229                        values: vec![string_val("1")],
2230                        resume_token: b"token1".to_vec(),
2231                        ..Default::default()
2232                    }),
2233                    Err(Status::unavailable("Transient mid-stream network issue")),
2234                ]);
2235                Ok(Response::from(stream))
2236            });
2237
2238        // 2. Stream resumes using Selector::Id.
2239        mock.expect_execute_streaming_sql()
2240            .times(1)
2241            .in_sequence(&mut seq)
2242            .returning(|req| {
2243                let req = req.into_inner();
2244                match req.transaction.unwrap().selector.unwrap() {
2245                    spanner_v1::transaction_selector::Selector::Id(id) => {
2246                        assert_eq!(id, vec![7, 8, 9]);
2247                    }
2248                    _ => panic!("Expected Selector::Id on stream retry"),
2249                }
2250
2251                let stream = adapt([Ok(PartialResultSet {
2252                    values: vec![string_val("2")],
2253                    ..Default::default()
2254                })]);
2255                Ok(Response::from(stream))
2256            });
2257
2258        mock.expect_create_session().returning(|_| {
2259            Ok(Response::new(Session {
2260                name: "session".to_string(),
2261                multiplexed: true,
2262                ..Default::default()
2263            }))
2264        });
2265
2266        let (address, _server) = start("127.0.0.1:0", mock).await?;
2267
2268        let client: Spanner = Spanner::builder()
2269            .with_endpoint(address)
2270            .with_credentials(Anonymous::new().build())
2271            .build()
2272            .await?;
2273
2274        let db_client = client.database_client("db").build().await?;
2275
2276        let tx = db_client
2277            .read_only_transaction()
2278            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2279            .build()
2280            .await?;
2281        let mut rs = tx.execute_query("SELECT 1").await?;
2282
2283        let row1 = rs
2284            .next()
2285            .await
2286            .ok_or_else(|| anyhow::anyhow!("Expected stream row1 extracted"))??;
2287        assert_eq!(
2288            row1.raw_values()[0].0,
2289            string_val("1"),
2290            "Verified chunk 1 payload"
2291        );
2292        let row2 = rs
2293            .next()
2294            .await
2295            .ok_or_else(|| anyhow::anyhow!("Expected stream row2 recovered"))??;
2296        assert_eq!(
2297            row2.raw_values()[0].0,
2298            string_val("2"),
2299            "Verified chunk 2 reboot dynamically intercepted ID bounds correctly"
2300        );
2301
2302        Ok(())
2303    }
2304
2305    #[tokio_test_no_panics]
2306    async fn result_set_inline_begin_metadata_missing_transaction_fails() -> anyhow::Result<()> {
2307        let mut mock = MockSpanner::new();
2308        let mut seq = mockall::Sequence::new();
2309
2310        // 1. Initial stream successfully returns metadata chunk but completely lacks the `Transaction` entity.
2311        mock.expect_execute_streaming_sql()
2312            .times(1)
2313            .in_sequence(&mut seq)
2314            .returning(|_request| {
2315                let stream = adapt([Ok(PartialResultSet {
2316                    metadata: metadata(1), // Missing `.transaction` natively
2317                    values: vec![string_val("1")],
2318                    ..Default::default()
2319                })]);
2320                Ok(Response::from(stream))
2321            });
2322
2323        mock.expect_create_session().returning(|_| {
2324            Ok(Response::new(Session {
2325                name: "session".to_string(),
2326                multiplexed: true,
2327                ..Default::default()
2328            }))
2329        });
2330
2331        let (address, _server) = start("127.0.0.1:0", mock).await?;
2332
2333        let client: Spanner = Spanner::builder()
2334            .with_endpoint(address)
2335            .with_credentials(Anonymous::new().build())
2336            .build()
2337            .await?;
2338
2339        let db_client = client.database_client("db").build().await?;
2340
2341        // Use explicitly deferred Lazy begin transaction!
2342        let tx = db_client
2343            .read_only_transaction()
2344            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2345            .build()
2346            .await?;
2347        let err = tx
2348            .execute_query("SELECT 1")
2349            .await
2350            .expect_err("Expected eager validation error");
2351        assert!(
2352            err.to_string()
2353                .contains("failed to return a transaction ID"),
2354            "Caught implicit gap boundary: {}",
2355            err
2356        );
2357
2358        Ok(())
2359    }
2360
2361    #[tokio_test_no_panics]
2362    async fn result_set_stats() -> anyhow::Result<()> {
2363        let mock_stats = spanner_v1::ResultSetStats {
2364            query_plan: Some(spanner_v1::QueryPlan::default()),
2365            ..Default::default()
2366        };
2367
2368        let mut rs = run_mock_query(vec![PartialResultSet {
2369            metadata: metadata(2),
2370            values: vec![string_val("a"), string_val("b")],
2371            last: true,
2372            stats: Some(mock_stats),
2373            ..Default::default()
2374        }])
2375        .await;
2376
2377        rs.next().await.transpose()?;
2378
2379        let received_stats = rs.stats().expect("stats should be available");
2380        assert!(received_stats.query_plan.is_some());
2381
2382        Ok(())
2383    }
2384
2385    #[tokio_test_no_panics]
2386    async fn result_set_update_count() -> anyhow::Result<()> {
2387        let mock_stats = spanner_v1::ResultSetStats {
2388            row_count: Some(RowCount::RowCountExact(42_i64)),
2389            ..Default::default()
2390        };
2391
2392        let mut result_set = run_mock_query(vec![PartialResultSet {
2393            metadata: metadata(2),
2394            values: vec![string_val("a"), string_val("b")],
2395            last: true,
2396            stats: Some(mock_stats),
2397            ..Default::default()
2398        }])
2399        .await;
2400
2401        result_set.next().await.transpose()?;
2402
2403        let update_count = result_set
2404            .update_count()
2405            .expect("Expected update count to be populated");
2406        assert_eq!(update_count, 42, "Expected exactly 42 rows updated");
2407
2408        Ok(())
2409    }
2410
2411    #[tokio_test_no_panics]
2412    async fn result_set_duplicate_stats() -> anyhow::Result<()> {
2413        let mock_stats = spanner_v1::ResultSetStats {
2414            query_plan: Some(spanner_v1::QueryPlan::default()),
2415            ..Default::default()
2416        };
2417
2418        let mut rs = run_mock_query(vec![
2419            PartialResultSet {
2420                metadata: metadata(2),
2421                values: vec![string_val("a"), string_val("b")],
2422                stats: Some(mock_stats.clone()),
2423                resume_token: b"token1".to_vec(),
2424                ..Default::default()
2425            },
2426            PartialResultSet {
2427                values: vec![string_val("c"), string_val("d")],
2428                stats: Some(mock_stats),
2429                last: true,
2430                resume_token: b"token2".to_vec(),
2431                ..Default::default()
2432            },
2433        ])
2434        .await;
2435
2436        // First row should be processed and returned successfully
2437        let next = rs.next().await;
2438        assert!(next.is_some());
2439        assert!(next.expect("should yield a row").is_ok());
2440
2441        // Second call should process the second message and fail due to duplicate stats
2442        let res2 = rs.next().await;
2443        assert!(res2.is_some());
2444        let res2 = res2.expect("should yield an error");
2445        assert!(res2.is_err());
2446        let err_str = res2.expect_err("should be an error").to_string();
2447        assert!(err_str.contains("Additional stats received after first"));
2448
2449        Ok(())
2450    }
2451
2452    #[tokio_test_no_panics]
2453    async fn test_lazy_begin_deadlock_fixed() -> anyhow::Result<()> {
2454        let mut mock = MockSpanner::new();
2455        let mut seq = mockall::Sequence::new();
2456
2457        // Setup mock to return metadata with transaction ID on first query.
2458        mock.expect_execute_streaming_sql()
2459            .times(1)
2460            .in_sequence(&mut seq)
2461            .returning(|_request| {
2462                let mut meta = metadata(1).expect("failed to create metadata");
2463                meta.transaction = Some(spanner_v1::Transaction {
2464                    id: b"lazy_tx_id".to_vec(),
2465                    ..Default::default()
2466                });
2467                let rx = adapt(
2468                    vec![Ok(PartialResultSet {
2469                        metadata: Some(meta),
2470                        values: vec![string_val("1")],
2471                        ..Default::default()
2472                    })]
2473                    .into_iter(),
2474                );
2475                Ok(Response::from(rx))
2476            });
2477
2478        // Mock call for second query which must carry the returned transaction ID
2479        mock.expect_execute_streaming_sql()
2480            .times(1)
2481            .in_sequence(&mut seq)
2482            .returning(|req| {
2483                let req = req.into_inner();
2484                let selector = req
2485                    .transaction
2486                    .expect("missing transaction component")
2487                    .selector
2488                    .expect("missing selector component");
2489
2490                match selector {
2491                    spanner_v1::transaction_selector::Selector::Id(id) => {
2492                        assert_eq!(id, b"lazy_tx_id".to_vec());
2493                    }
2494                    _ => panic!("Expected Selector::Id"),
2495                }
2496
2497                let rx = adapt(
2498                    vec![Ok(PartialResultSet {
2499                        metadata: metadata(1),
2500                        values: vec![string_val("2")],
2501                        ..Default::default()
2502                    })]
2503                    .into_iter(),
2504                );
2505                Ok(Response::from(rx))
2506            });
2507
2508        mock.expect_create_session().returning(|_| {
2509            Ok(Response::new(Session {
2510                name: "session".to_string(),
2511                multiplexed: true,
2512                ..Default::default()
2513            }))
2514        });
2515
2516        let (address, _server) = start("127.0.0.1:0", mock).await?;
2517
2518        let client: Spanner = Spanner::builder()
2519            .with_endpoint(address)
2520            .with_credentials(Anonymous::new().build())
2521            .build()
2522            .await?;
2523
2524        let db_client = client.database_client("db").build().await?;
2525
2526        // Use inline begin transaction
2527        let tx = db_client
2528            .read_only_transaction()
2529            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2530            .build()
2531            .await?;
2532
2533        // Execute query but DO NOT call rs.next()
2534        let _rs = tx.execute_query("SELECT 1").await?;
2535
2536        // Execute second query against same transaction
2537        let mut rs2 = tx.execute_query("SELECT 2").await?;
2538
2539        // Assert it does not hang and yielded elements properly
2540        let row2 = rs2.next().await;
2541        assert!(
2542            row2.is_some(),
2543            "Implicit deadlock encountered; query 2 stalled"
2544        );
2545
2546        Ok(())
2547    }
2548
2549    #[tokio_test_no_panics]
2550    async fn test_result_set_metadata_not_available() {
2551        // Test our explicit safeguard for when Spanner violates the API contract and returns a first PartialResultSet without metadata.
2552        let res = run_mock_query_fallible(vec![PartialResultSet {
2553            metadata: None,
2554            values: vec![string_val("1")],
2555            ..Default::default()
2556        }])
2557        .await;
2558
2559        let err = res.expect_err("Expected query initialization to fail eagerly");
2560        assert!(
2561            err.to_string()
2562                .contains("First PartialResultSet did not contain metadata"),
2563            "Expected missing metadata safeguard error, got: {}",
2564            err
2565        );
2566    }
2567
2568    #[tokio_test_no_panics]
2569    async fn test_result_set_metadata_available_before_next() -> anyhow::Result<()> {
2570        let mut mock = MockSpanner::new();
2571
2572        // Setup mock to return metadata in first chunk.
2573        mock.expect_execute_streaming_sql().returning(|_request| {
2574            let rx = adapt(
2575                vec![Ok(PartialResultSet {
2576                    metadata: metadata(1),
2577                    values: vec![string_val("1")],
2578                    ..Default::default()
2579                })]
2580                .into_iter(),
2581            );
2582            Ok(Response::from(rx))
2583        });
2584
2585        mock.expect_create_session().returning(|_| {
2586            Ok(Response::new(Session {
2587                name: "session".to_string(),
2588                multiplexed: true,
2589                ..Default::default()
2590            }))
2591        });
2592
2593        let (address, _server) = start("127.0.0.1:0", mock).await?;
2594
2595        let client: Spanner = Spanner::builder()
2596            .with_endpoint(address)
2597            .with_credentials(Anonymous::new().build())
2598            .build()
2599            .await?;
2600
2601        let db_client = client.database_client("db").build().await?;
2602        let tx = db_client.single_use().build();
2603
2604        let mut rs = tx.execute_query("SELECT 1").await?;
2605
2606        // Call metadata() BEFORE next(). It should succeed immediately.
2607        let metadata = rs.metadata().expect("metadata available");
2608        assert_eq!(metadata.column_names().len(), 1);
2609        assert_eq!(metadata.column_names()[0], "col0");
2610
2611        // Now consume the row
2612        let row = rs.next().await;
2613        assert!(row.is_some());
2614
2615        Ok(())
2616    }
2617}