Skip to main content

google_cloud_spanner/
result_set.rs

1// Copyright 2026 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::database_client::DatabaseClient;
16use crate::error::internal_error;
17use crate::google::spanner::v1::{self, PartialResultSet};
18use crate::model::ResultSetStats;
19use crate::model::result_set_stats::RowCount;
20use crate::precommit::PrecommitTokenTracker;
21use crate::read_only_transaction::{ReadContextTransactionSelector, TransactionState};
22use crate::result_set_metadata::ResultSetMetadata;
23use crate::retry_policy::SpannerRetryPolicy;
24use crate::row::Row;
25use crate::server_streaming::stream::PartialResultSetStream;
26use bytes::Bytes;
27use gaxi::prost::FromProto;
28use google_cloud_gax::backoff_policy::BackoffPolicy;
29use google_cloud_gax::exponential_backoff::ExponentialBackoffBuilder;
30use google_cloud_gax::options::RequestOptions as GaxRequestOptions;
31use google_cloud_gax::retry_policy::RetryPolicyExt;
32use google_cloud_gax::retry_result::RetryResult;
33use google_cloud_gax::retry_state::RetryState;
34use std::collections::VecDeque;
35use std::mem::take;
36use std::sync::Arc;
37use std::time::Duration;
38use tokio::runtime::Handle;
39use tokio::time::{sleep, timeout};
40
41#[cfg(feature = "unstable-stream")]
42use futures::Stream;
43
/// `ResultSet` contains the rows of a query result.
///
/// Rows are produced lazily: each call to [`ResultSet::next`] returns a row
/// that is already assembled, or reads more `PartialResultSet` messages from
/// the underlying stream until a complete row is available.
///
/// # Example
/// ```
/// # use google_cloud_spanner::result::ResultSet;
/// # use google_cloud_spanner::result::Row;
/// # async fn process_result_set(mut rs: ResultSet) -> Result<(), google_cloud_spanner::Error> {
/// while let Some(row) = rs.next().await {
///     let row: Row = row?;
///     // Process the row
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct ResultSet {
    /// The stream of `PartialResultSet` messages. Set to `None` once the
    /// stream has been handed to a background task for draining.
    stream: Option<PartialResultSetStream>,
    /// Values that have been received but not yet assembled into a full row.
    buffered_values: Vec<prost_types::Value>,
    /// True when the last processed `PartialResultSet` had `chunked_value`
    /// set, i.e. the last buffered value must be merged with the first value
    /// of the next message.
    chunked: bool,
    /// True once a `PartialResultSet` with `last` set has been received.
    seen_last: bool,
    /// Fully assembled rows waiting to be returned by `next()`.
    ready_rows: VecDeque<Row>,
    /// Metadata taken from the first `PartialResultSet` of the stream.
    local_metadata: Option<ResultSetMetadata>,
    /// Stats received on the stream, if any.
    stats: Option<ResultSetStats>,
    /// Receives the precommit token of every `PartialResultSet`.
    precommit_token_tracker: PrecommitTokenTracker,
    /// Runtime handle captured at construction time (if any); used to spawn
    /// the background task that drains the stream.
    tokio_handle: Option<Handle>,

    // Fields for retries and buffering of a stream of PartialResultSets.
    /// Client used to re-issue the request when the stream is restarted.
    client: DatabaseClient,
    /// Session name passed along when a transaction is begun explicitly.
    session_name: String,
    /// Transaction tag passed along when a transaction is begun explicitly.
    transaction_tag: Option<String>,
    /// The original request; re-sent with the latest resume token on restart.
    operation: StreamOperation,
    /// The last non-empty resume token seen on the stream.
    last_resume_token: Bytes,
    /// `PartialResultSet`s that were received but not yet processed into rows.
    partial_result_sets_buffer: VecDeque<PartialResultSet>,
    /// False once the buffer overflowed without seeing a resume token; stream
    /// errors are then no longer retried through the retry policy.
    safe_to_retry: bool,
    /// Upper bound for `partial_result_sets_buffer`.
    max_buffered_partial_result_sets: usize,
    /// Number of stream restarts triggered by the retry policy so far.
    retry_count: usize,
    /// Transaction selector shared with the owning transaction, if any.
    transaction_selector: Option<ReadContextTransactionSelector>,
    /// Channel hint passed to every RPC issued for this result set.
    channel_hint: usize,
    /// Request options, including the retry and backoff policies.
    gax_options: GaxRequestOptions,
}
84
/// The request that produced a stream of `PartialResultSet`s.
///
/// The request is kept so the stream can be re-issued (with an updated resume
/// token and transaction selector) when it needs to be restarted.
#[derive(Debug, Clone)]
pub(crate) enum StreamOperation {
    /// The stream was produced by an `ExecuteStreamingSql` request.
    Query(crate::model::ExecuteSqlRequest),
    /// The stream was produced by a `StreamingRead` request.
    Read(crate::model::ReadRequest),
}
90
/// Everything needed to construct a [`ResultSet`].
///
/// All fields are moved into the `ResultSet` unchanged, except `gax_options`,
/// which first gets a default retry and backoff policy when none is set.
pub(crate) struct ResultSetParams {
    /// The already-opened stream of `PartialResultSet`s.
    pub stream: PartialResultSetStream,
    /// Transaction selector of the transaction the request runs in, if any.
    pub transaction_selector: Option<ReadContextTransactionSelector>,
    /// Tracker that receives the precommit tokens returned on the stream.
    pub precommit_token_tracker: PrecommitTokenTracker,
    /// Client used to restart the stream.
    pub client: DatabaseClient,
    /// Name of the session the request runs on.
    pub session_name: String,
    /// Optional transaction tag.
    pub transaction_tag: Option<String>,
    /// The request that produced `stream`.
    pub operation: StreamOperation,
    /// Channel hint passed to every RPC issued for the result set.
    pub channel_hint: usize,
    /// Request options (retry policy, backoff policy, ...).
    pub gax_options: GaxRequestOptions,
}
102
/// The maximum number of `PartialResultSet`s to buffer without a resume token.
///
/// Spanner will normally include a resume token with each `PartialResultSet`,
/// so this maximum is primarily a safety net. When it is exceeded, the oldest
/// buffered message is processed and the stream is marked as unsafe to retry.
const MAX_BUFFERED_PARTIAL_RESULT_SETS: usize = 10;
107
108impl ResultSet {
109    /// Creates a new result set asynchronously, waiting for the first chunk to arrive.
110    pub(crate) async fn create(params: ResultSetParams) -> crate::Result<Self> {
111        let mut result_set = Self::new(params);
112        result_set.init_stream().await?;
113        Ok(result_set)
114    }
115
116    /// Creates a new result set.
117    fn new(params: ResultSetParams) -> Self {
118        let ResultSetParams {
119            stream,
120            transaction_selector,
121            precommit_token_tracker,
122            client,
123            session_name,
124            transaction_tag,
125            operation,
126            channel_hint,
127            gax_options,
128        } = params;
129
130        let gax_options = Self::apply_defaults(gax_options);
131
132        Self {
133            stream: Some(stream),
134            buffered_values: Vec::new(),
135            chunked: false,
136            seen_last: false,
137            ready_rows: VecDeque::new(),
138            local_metadata: None,
139            stats: None,
140            precommit_token_tracker,
141            client,
142            session_name,
143            transaction_tag,
144            operation,
145            last_resume_token: Bytes::new(),
146            partial_result_sets_buffer: VecDeque::new(),
147            safe_to_retry: true,
148            max_buffered_partial_result_sets: MAX_BUFFERED_PARTIAL_RESULT_SETS,
149            retry_count: 0,
150            transaction_selector,
151            channel_hint,
152            gax_options,
153            tokio_handle: Handle::try_current().ok(),
154        }
155    }
156
157    fn apply_defaults(mut gax_options: GaxRequestOptions) -> GaxRequestOptions {
158        if gax_options.retry_policy().is_none() {
159            gax_options.set_retry_policy(SpannerRetryPolicy::new().with_attempt_limit(10));
160        }
161        if gax_options.backoff_policy().is_none() {
162            gax_options.set_backoff_policy(Self::default_backoff_policy());
163        }
164        gax_options
165    }
166
167    fn default_backoff_policy() -> Arc<dyn BackoffPolicy> {
168        Arc::new(ExponentialBackoffBuilder::default().clamp())
169    }
170
171    async fn init_stream(&mut self) -> crate::Result<()> {
172        // We loop here because if an initial stream failure occurs and is retriable (e.g., UNAVAILABLE),
173        // we restart the stream and retry fetching the initial chunk.
174        loop {
175            let stream_result = match &mut self.stream {
176                Some(s) => s.next_message().await,
177                None => {
178                    return Err(internal_error(
179                        "Query stream ended without metadata or error",
180                    ));
181                }
182            };
183
184            match stream_result {
185                Some(Ok(partial_result_set)) => {
186                    self.handle_partial_result_set(partial_result_set)?;
187                    return Ok(());
188                }
189                Some(Err(e)) => {
190                    self.handle_stream_error(e).await?;
191                }
192                None => {
193                    return Err(internal_error(
194                        "Query stream ended without metadata or error",
195                    ));
196                }
197            }
198        }
199    }
200
    /// Returns the metadata of the result set.
    ///
    /// The metadata is taken from the first `PartialResultSet` of the stream;
    /// `None` is returned when no metadata is currently stored.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_spanner::result::ResultSet;
    /// # use google_cloud_spanner::result::Row;
    /// # async fn fetch_metadata(mut rs: ResultSet) -> Result<(), Box<dyn std::error::Error>> {
    /// if let Some(metadata) = rs.metadata() {
    ///     for column in metadata.column_names() {
    ///         println!("Column name: {}", column);
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn metadata(&self) -> Option<&ResultSetMetadata> {
        self.local_metadata.as_ref()
    }
219
    /// Returns the stats of the result set, if available.
    ///
    /// Stats are only available after the results have been fully consumed
    /// and the query was run in PLAN or PROFILE mode.
    ///
    /// # Example
    /// ```
    /// # use google_cloud_spanner::result::ResultSet;
    /// # use google_cloud_spanner::result::Row;
    /// # async fn process_stats(mut rs: ResultSet) -> Result<(), google_cloud_spanner::Error> {
    /// while let Some(row) = rs.next().await {
    ///     let row = row?;
    ///     // Process row
    /// }
    /// if let Some(stats) = rs.stats() {
    ///     println!("Query plan: {:?}", stats.query_plan);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub fn stats(&self) -> Option<&ResultSetStats> {
        self.stats.as_ref()
    }
243
244    /// Returns the number of rows modified by the DML statement, if available.
245    ///
246    /// # Example
247    /// ```
248    /// # use google_cloud_spanner::client::DatabaseClient;
249    /// # use google_cloud_spanner::result::ResultSet;
250    /// # use google_cloud_spanner::statement::Statement;
251    /// # async fn check_update_count(db_client: &DatabaseClient) -> Result<(), Box<dyn std::error::Error>> {
252    /// let runner = db_client.read_write_transaction().build().await?;
253    /// runner.run(async |tx| {
254    ///     let stmt = Statement::builder("UPDATE Singers SET LastName = 'Simpson' WHERE SingerId = @id THEN RETURN SingerId, LastName")
255    ///         .add_param("id", &123_i64)
256    ///         .build();
257    ///     let mut rs = tx.execute_query(stmt).await?;
258    ///     while let Some(row) = rs.next().await.transpose()? {
259    ///         // Process returned rows
260    ///     }
261    ///     if let Some(count) = rs.update_count() {
262    ///         println!("Rows modified: {}", count);
263    ///     }
264    ///     Ok(())
265    /// }).await?;
266    /// # Ok(())
267    /// # }
268    /// ```
269    ///
270    /// Returns the number of rows modified when this [`ResultSet`] was produced from a
271    /// DML statement with a `THEN RETURN` clause.
272    pub fn update_count(&self) -> Option<i64> {
273        self.stats.as_ref().and_then(|s| {
274            s.row_count.as_ref().map(|rc| match rc {
275                RowCount::RowCountExact(c) => *c,
276                RowCount::RowCountLowerBound(c) => *c,
277            })
278        })
279    }
280
281    /// Fetches the next row from the result set.
282    ///
283    /// # Example
284    /// ```
285    /// # use google_cloud_spanner::result::ResultSet;
286    /// # use google_cloud_spanner::result::Row;
287    /// # async fn fetch_next(mut rs: ResultSet) -> Result<(), google_cloud_spanner::Error> {
288    /// if let Some(row) = rs.next().await.transpose()? {
289    ///     // Process the row
290    /// }
291    /// # Ok(())
292    /// # }
293    /// ```
294    ///
295    /// Returns `None` when all rows have been retrieved.
296    pub async fn next(&mut self) -> Option<crate::Result<Row>> {
297        loop {
298            if let Some(row) = self.ready_rows.pop_front() {
299                return Some(Ok(row));
300            }
301
302            if self.seen_last {
303                if let Some(handle) = &self.tokio_handle
304                    && let Some(s) = self.stream.take()
305                {
306                    drain_stream_in_background(handle, s);
307                }
308                return None;
309            }
310
311            let stream_result = match &mut self.stream {
312                Some(s) => s.next_message().await,
313                None => return None,
314            };
315
316            match stream_result {
317                Some(Ok(partial_result_set)) => {
318                    if let Err(e) = self.handle_partial_result_set(partial_result_set) {
319                        return Some(Err(e));
320                    }
321                }
322                Some(Err(e)) => {
323                    if let Err(err) = self.handle_stream_error(e).await {
324                        return Some(Err(err));
325                    }
326                }
327                None => match self.handle_stream_end() {
328                    Ok(Some(row)) => return Some(Ok(row)),
329                    Ok(None) => return None,
330                    Err(e) => return Some(Err(e)),
331                },
332            }
333        }
334    }
335
336    /// Converts the [`ResultSet`] into a [`Stream`].
337    ///
338    /// # Example
339    ///
340    /// ```
341    /// # use google_cloud_spanner::result::ResultSet;
342    /// # use futures::TryStreamExt;
343    /// # use std::future::ready;
344    /// # async fn example(result_set: ResultSet) -> Result<(), google_cloud_spanner::Error> {
345    /// let rows: Vec<_> = result_set
346    ///     .into_stream()
347    ///     .try_filter(|row| {
348    ///         let id = row.get::<String, _>("Id");
349    ///         ready(id == "id1")
350    ///     })
351    ///     .try_collect()
352    ///     .await?;
353    /// # Ok(())
354    /// # }
355    /// ```
356    ///
357    /// This consumes the [`ResultSet`] and returns a stream of rows.
358    #[cfg(feature = "unstable-stream")]
359    pub fn into_stream(self) -> impl Stream<Item = crate::Result<Row>> + Unpin {
360        use futures::stream::unfold;
361        Box::pin(unfold(self, |mut result_set| async move {
362            result_set.next().await.map(|row| (row, result_set))
363        }))
364    }
365}
366
367impl ResultSet {
368    fn handle_partial_result_set(
369        &mut self,
370        mut partial_result_set: PartialResultSet,
371    ) -> crate::Result<()> {
372        self.precommit_token_tracker.update(
373            partial_result_set
374                .precommit_token
375                .clone()
376                .map(|t| t.cnv().expect("failed to convert precommit token")),
377        );
378
379        if partial_result_set.last {
380            self.seen_last = true;
381        }
382
383        match (
384            self.local_metadata.as_ref(),
385            partial_result_set.metadata.take(),
386        ) {
387            (Some(_), None) => {}
388            (None, None) => {
389                return Err(internal_error(
390                    "First PartialResultSet did not contain metadata",
391                ));
392            }
393            (Some(_), Some(_)) => {
394                return Err(internal_error("Additional metadata after first result set"));
395            }
396            (None, Some(m)) => {
397                self.handle_metadata(m)?;
398            }
399        }
400
401        // Keep track of the last resume_token that we see to be able to resume the stream
402        // in case of a transient error. Most PartialResultSets will have a resume token,
403        // but the API contract is not explicitly guaranteeing that each of them will have
404        // one.
405        if !partial_result_set.resume_token.is_empty() {
406            self.last_resume_token = partial_result_set.resume_token.clone();
407            self.safe_to_retry = true;
408            self.partial_result_sets_buffer
409                .push_back(partial_result_set);
410            self.flush_buffer()?;
411            return Ok(());
412        }
413
414        // The PartialResultSet did not have a resume_token. Buffer the result
415        // and continue with the next PartialResultSet, unless the buffer is full.
416        if self.partial_result_sets_buffer.len() >= self.max_buffered_partial_result_sets {
417            // Mark this stream as 'unsafe to retry', meaning that any transient error
418            // that we see will not be retried. We will instead propagate the error.
419            self.safe_to_retry = false;
420            if let Some(oldest) = self.partial_result_sets_buffer.pop_front() {
421                self.process_partial_result_set(oldest)?;
422            }
423        }
424        self.partial_result_sets_buffer
425            .push_back(partial_result_set);
426
427        if self.seen_last {
428            self.flush_buffer()?;
429            if self.chunked {
430                return Err(crate::error::internal_error(
431                    "Stream ended with chunked_value=true",
432                ));
433            }
434        }
435
436        Ok(())
437    }
438
439    fn handle_metadata(&mut self, mut m: v1::ResultSetMetadata) -> crate::Result<()> {
440        let transaction = m.transaction.take();
441        let meta = ResultSetMetadata::new(Some(m));
442        if let Some(selector) = &self.transaction_selector {
443            if let Some(transaction) = transaction {
444                selector.update(
445                    transaction.id,
446                    transaction
447                        .read_timestamp
448                        .and_then(|t| wkt::Timestamp::new(t.seconds, t.nanos).ok()),
449                )?;
450            } else if let ReadContextTransactionSelector::Lazy(lazy) = selector {
451                let is_started = matches!(
452                    &*lazy.lock().expect("transaction state mutex poisoned"),
453                    TransactionState::Started(_, _)
454                );
455                if !is_started {
456                    return Err(internal_error(
457                        "Spanner failed to return a transaction ID for a query that included a BeginTransaction option",
458                    ));
459                }
460            }
461        }
462        self.local_metadata = Some(meta);
463        Ok(())
464    }
465    async fn handle_stream_error(&mut self, e: crate::Error) -> crate::Result<()> {
466        let mut e = e;
467        if self.safe_to_retry {
468            match self.check_retry(e) {
469                Ok(()) => {
470                    self.retry_count += 1;
471                    // Clear the buffer and restart the stream using the last
472                    // resume_token that we have seen.
473                    self.partial_result_sets_buffer.clear();
474
475                    // Apply backoff delay if policy is present
476                    if let Some(policy) = self.gax_options.backoff_policy() {
477                        let state = RetryState::new(self.safe_to_retry)
478                            .set_attempt_count(self.retry_count as u32);
479                        let delay = policy.on_failure(&state);
480                        sleep(delay).await;
481                    }
482
483                    self.restart_stream().await?;
484                    return Ok(());
485                }
486                Err(err) => {
487                    e = err;
488                }
489            }
490        }
491
492        // Check if this stream included an inlined BeginTransaction option
493        // and has not yet returned a transaction ID. If so, we explicitly
494        // begin the transaction and restart the stream.
495        let Some(ReadContextTransactionSelector::Lazy(lazy)) = &self.transaction_selector else {
496            return Err(e);
497        };
498        let is_started = matches!(
499            &*lazy.lock().unwrap(),
500            crate::read_only_transaction::TransactionState::Started(_, _)
501        );
502        if is_started {
503            return Err(e);
504        }
505
506        self.transaction_selector
507            .as_ref()
508            .unwrap()
509            .begin_explicitly(crate::read_only_transaction::ExplicitBeginParams {
510                client: self.client.clone(),
511                session_name: self.session_name.clone(),
512                transaction_tag: self.transaction_tag.clone(),
513                channel_hint: self.channel_hint,
514                request_options: self.gax_options.clone(),
515                is_stream_fallback: true,
516                precommit_token_tracker: self.precommit_token_tracker.clone(),
517                mutation_key: None,
518            })
519            .await?;
520
521        self.partial_result_sets_buffer.clear();
522        self.restart_stream().await?;
523        Ok(())
524    }
525
526    fn handle_stream_end(&mut self) -> crate::Result<Option<Row>> {
527        // We are at the end of the stream. Return any buffered rows as long
528        // as there are any. If there are no buffered rows, return None.
529
530        // First flush any PartialResultSets that we had received without a resume_token.
531        if !self.partial_result_sets_buffer.is_empty() {
532            self.flush_buffer()?;
533        }
534        if self.chunked {
535            // This should never happen.
536            return Err(crate::error::internal_error(
537                "Stream ended with chunked_value=true",
538            ));
539        }
540        if let Some(row) = self.ready_rows.pop_front() {
541            return Ok(Some(row));
542        }
543        Ok(None)
544    }
545
546    fn flush_buffer(&mut self) -> crate::Result<()> {
547        let mut buffer_to_flush = take(&mut self.partial_result_sets_buffer);
548        while let Some(partial_result_set) = buffer_to_flush.pop_front() {
549            self.process_partial_result_set(partial_result_set)?;
550        }
551        Ok(())
552    }
553
554    fn process_partial_result_set(
555        &mut self,
556        partial_result_set: PartialResultSet,
557    ) -> crate::Result<()> {
558        let PartialResultSet {
559            stats,
560            values,
561            chunked_value,
562            ..
563        } = partial_result_set;
564
565        match (&self.stats, stats) {
566            (Some(_), Some(_)) => {
567                return Err(internal_error("Additional stats received after first"));
568            }
569            (None, Some(s)) => {
570                let converted_stats = s
571                    .cnv()
572                    .map_err(|e| internal_error(format!("failed to convert stats: {}", e)))?;
573                self.stats = Some(converted_stats);
574            }
575            _ => {}
576        }
577
578        if values.is_empty() {
579            return Ok(());
580        }
581        let metadata = self.local_metadata.as_ref().ok_or_else(|| {
582            internal_error("PartialResultSet contained values but no metadata was provided")
583        })?;
584        if metadata.column_types.is_empty() {
585            return Err(internal_error(
586                "PartialResultSet contained values but no column metadata was provided",
587            ));
588        }
589
590        let mut values_iter = values.into_iter();
591        if self.chunked
592            && let Some(last_val) = self.buffered_values.last_mut()
593            && let Some(first_new) = values_iter.next()
594        {
595            merge_values(last_val, first_new)?;
596        }
597
598        self.buffered_values.extend(values_iter);
599        self.chunked = chunked_value;
600
601        while self.buffered_values.len() >= metadata.column_types.len() {
602            let column_count = metadata.column_types.len();
603            if self.buffered_values.len() == column_count && self.chunked {
604                break;
605            }
606
607            let row_values: Vec<crate::value::Value> = self
608                .buffered_values
609                .drain(..column_count)
610                .map(crate::value::Value)
611                .collect();
612            self.ready_rows.push_back(Row {
613                values: row_values,
614                metadata: metadata.clone(),
615            });
616        }
617        Ok(())
618    }
619
620    async fn restart_stream(&mut self) -> crate::Result<()> {
621        // If we are restarting the stream (due to a failure), and the transaction
622        // was in the process of starting (but failed before yielding an ID),
623        // reset the state so the retry attempt can include the begin option again.
624        if let Some(s) = &self.transaction_selector {
625            s.maybe_reset_starting();
626        }
627
628        // Get the latest transaction selector for this transaction.
629        let transaction_selector = if let Some(s) = &self.transaction_selector {
630            Some(s.selector().await?)
631        } else {
632            None
633        };
634
635        // If we are restarting the stream from the beginning (because no resume token
636        // was received prior to the transient failure), we clear our local metadata state.
637        // This ensures that when Spanner transmits the initial metadata chunk on the retried stream,
638        // it is extracted without triggering the 'only-once' metadata validation error.
639        if self.last_resume_token.is_empty() {
640            self.local_metadata = None;
641        }
642
643        match &mut self.operation {
644            StreamOperation::Query(req) => {
645                req.resume_token = self.last_resume_token.clone();
646                req.transaction = transaction_selector
647                    .clone()
648                    .or_else(|| req.transaction.take());
649                let stream = self
650                    .client
651                    .spanner
652                    .execute_streaming_sql(req.clone(), self.gax_options.clone(), self.channel_hint)
653                    .send()
654                    .await?;
655                self.stream = Some(stream);
656            }
657            StreamOperation::Read(req) => {
658                req.resume_token = self.last_resume_token.clone();
659                req.transaction = transaction_selector
660                    .clone()
661                    .or_else(|| req.transaction.take());
662                let stream = self
663                    .client
664                    .spanner
665                    .streaming_read(req.clone(), self.gax_options.clone(), self.channel_hint)
666                    .send()
667                    .await?;
668                self.stream = Some(stream);
669            }
670        }
671        Ok(())
672    }
673
674    fn check_retry(&self, e: crate::Error) -> Result<(), crate::Error> {
675        if let Some(policy) = self.gax_options.retry_policy() {
676            let state =
677                RetryState::new(self.safe_to_retry).set_attempt_count(self.retry_count as u32);
678
679            match policy.on_error(&state, e) {
680                RetryResult::Continue(_) => return Ok(()),
681                RetryResult::Permanent(err) | RetryResult::Exhausted(err) => return Err(err),
682            }
683        }
684        Err(e)
685    }
686}
687
688impl Drop for ResultSet {
689    fn drop(&mut self) {
690        // If the query stream has finished sending all chunks (seen_last is true), but
691        // the client hasn't read the trailers/EOF yet, dropping the stream receiver
692        // would cause tonic to send an HTTP/2 RST_STREAM.
693        // If an application often executes a query that it knows only returns one or a
694        // few rows, and the application stops reading after that many rows, then these
695        // stream resets could trigger GFE/frontend security protection (too_many_internal_resets).
696        // To prevent this, we drain the remaining trailers asynchronously in a background task.
697        // Note: We only do this if seen_last is true, to prevent a background task from potentially
698        // iterating through a large number of partial results.
699        if self.seen_last
700            && let Some(handle) = &self.tokio_handle
701            && let Some(s) = self.stream.take()
702        {
703            drain_stream_in_background(handle, s);
704        }
705    }
706}
707
708fn drain_stream_in_background(handle: &Handle, mut stream: PartialResultSetStream) {
709    handle.spawn(async move {
710        let _ = timeout(Duration::from_secs(5), async move {
711            while let Some(Ok(_)) = stream.next_message().await {}
712        })
713        .await;
714    });
715}
716
717/// Merges two values from successive `PartialResultSet`s into a single value.
718///
719/// Cloud Spanner can return a single logical row or column value split across multiple
720/// `PartialResultSet` messages. This occurs when a value (especially large strings or
721/// arrays) exceeds the message size limits of the underlying stream. In these cases,
722/// the `chunked_value` flag is set on the first `PartialResultSet`, indicating that the
723/// final value in the message's `values` array is incomplete and must be combined with
724/// the first value in the `values` array of the subsequent `PartialResultSet`.
725///
726/// This function handles the concatenation of split `StringValue` and `ListValue` types.
727fn merge_values(target: &mut prost_types::Value, source: prost_types::Value) -> crate::Result<()> {
728    use prost_types::value::Kind;
729    match (&mut target.kind, source.kind) {
730        (Some(Kind::StringValue(s)), Some(Kind::StringValue(source_s))) => {
731            s.push_str(&source_s);
732            Ok(())
733        }
734        (Some(Kind::ListValue(target_list)), Some(Kind::ListValue(mut source_list))) => {
735            if source_list.values.is_empty() {
736                return Ok(());
737            }
738            if target_list.values.is_empty() {
739                target_list.values = source_list.values;
740                return Ok(());
741            }
742
743            let source_first = source_list.values.remove(0);
744            if let Some(target_last) = target_list.values.last_mut() {
745                match (&target_last.kind, &source_first.kind) {
746                    (Some(Kind::StringValue(_)), Some(Kind::StringValue(_)))
747                    | (Some(Kind::ListValue(_)), Some(Kind::ListValue(_))) => {
748                        merge_values(target_last, source_first)?;
749                    }
750                    _ => {
751                        target_list.values.push(source_first);
752                    }
753                }
754            } else {
755                target_list.values.push(source_first);
756            }
757            target_list.values.extend(source_list.values);
758            Ok(())
759        }
760        // This is not expected to happen and indicates that Spanner returned data that
761        // violates the contract. In this case we return a service error with error code
762        // Internal.
763        _ => Err(internal_error(
764            "Incompatible types for merging chunked values",
765        )),
766    }
767}
768
// Test-only helpers.
#[cfg(test)]
impl ResultSet {
    /// Overrides the maximum number of `PartialResultSet`s that are buffered
    /// without a resume token (defaults to `MAX_BUFFERED_PARTIAL_RESULT_SETS`).
    pub(crate) fn set_max_buffered_partial_result_sets(&mut self, limit: usize) {
        self.max_buffered_partial_result_sets = limit;
    }
}
775
776#[cfg(test)]
777pub(crate) mod tests {
778    use super::*;
779    use crate::client::Spanner;
780    use crate::key::KeySet;
781    use crate::read::ReadRequest;
782    use crate::statement::Statement;
783    use crate::transaction::BeginTransactionOption;
784    use gaxi::grpc::tonic::{Code as GrpcCode, MetadataMap, Response, Status};
785    use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
786    use google_cloud_gax::backoff_policy::BackoffPolicy;
787    use google_cloud_gax::retry_policy::{Aip194Strict, RetryPolicyExt};
788    use google_cloud_gax::retry_state::RetryState;
789    use google_cloud_test_macros::tokio_test_no_panics;
790    use prost_types::Value;
791    use spanner_grpc_mock::MockSpanner;
792    use spanner_grpc_mock::google::spanner::v1 as spanner_v1;
793    use spanner_grpc_mock::google::spanner::v1::struct_type::Field;
794    use spanner_grpc_mock::google::spanner::v1::{
795        MultiplexedSessionPrecommitToken, PartialResultSet, ResultSetMetadata, Session, StructType,
796    };
797    use spanner_grpc_mock::start;
798    use spanner_v1::result_set_stats::RowCount;
799    use std::time::Duration;
800
    // Generates `MockBackoffPolicy`, a mockall-backed implementation of the gax
    // `BackoffPolicy` trait. The tests below use it to return a tiny delay from
    // `on_failure`, and (where `.times(..)` is set) to check how many times a
    // backoff was requested.
    mockall::mock! {
        #[derive(Debug)]
        BackoffPolicy {}
        impl BackoffPolicy for BackoffPolicy {
            fn on_failure(&self, state: &RetryState) -> Duration;
        }
    }
808
809    pub(crate) fn string_val(s: &str) -> Value {
810        Value {
811            kind: Some(prost_types::value::Kind::StringValue(s.to_string())),
812        }
813    }
814
815    fn list_val(vals: Vec<Value>) -> Value {
816        Value {
817            kind: Some(prost_types::value::Kind::ListValue(
818                prost_types::ListValue { values: vals },
819            )),
820        }
821    }
822
823    fn metadata(cols: usize) -> Option<ResultSetMetadata> {
824        let mut fields = vec![];
825        for i in 0..cols {
826            fields.push(Field {
827                name: format!("col{}", i),
828                r#type: None,
829            });
830        }
831        Some(ResultSetMetadata {
832            row_type: Some(StructType { fields }),
833            transaction: None,
834            undeclared_parameters: None,
835        })
836    }
837
838    pub(crate) fn adapt<I, T>(items: I) -> tokio::sync::mpsc::Receiver<T>
839    where
840        I: IntoIterator<Item = T>,
841        I::IntoIter: ExactSizeIterator,
842    {
843        let items = items.into_iter();
844        let (tx, rx) = tokio::sync::mpsc::channel(items.len().max(1));
845        for i in items {
846            tx.try_send(i)
847                .expect("can't fail, we allocated enough capacity.");
848        }
849        rx
850    }
851
852    async fn run_mock_query(results: Vec<PartialResultSet>) -> ResultSet {
853        run_mock_query_fallible(results).await.unwrap()
854    }
855
856    async fn run_mock_query_fallible(results: Vec<PartialResultSet>) -> crate::Result<ResultSet> {
857        let mut mock = MockSpanner::new();
858        let rx = adapt(results.into_iter().map(Ok));
859        mock.expect_execute_streaming_sql()
860            .return_once(move |_request| Ok(Response::from(rx)));
861
862        mock.expect_create_session().returning(|_| {
863            Ok(Response::new(Session {
864                name: "session".to_string(),
865                multiplexed: true,
866                ..Default::default()
867            }))
868        });
869
870        let (address, _server) = start("127.0.0.1:0", mock)
871            .await
872            .expect("Failed to start mock server");
873
874        let client: Spanner = Spanner::builder()
875            .with_endpoint(address)
876            .with_credentials(Anonymous::new().build())
877            .build()
878            .await
879            .expect("Failed to build client");
880
881        let db_client: crate::database_client::DatabaseClient =
882            client.database_client("db").build().await.unwrap();
883        let tx: crate::read_only_transaction::SingleUseReadOnlyTransaction =
884            db_client.single_use().build();
885        tx.execute_query("SELECT 1").await
886    }
887
888    #[test]
889    fn test_auto_traits() {
890        static_assertions::assert_impl_all!(ResultSet: std::fmt::Debug, Send, Sync);
891    }
892
893    #[tokio_test_no_panics]
894    async fn test_result_set_zero_rows() {
895        let mut rs = run_mock_query(vec![PartialResultSet {
896            metadata: metadata(2),
897            values: vec![],
898            chunked_value: false,
899            resume_token: vec![],
900            stats: None,
901            precommit_token: None,
902            last: true,
903            cache_update: None,
904        }])
905        .await;
906
907        let next = rs.next().await;
908        assert!(next.is_none());
909    }
910
911    #[tokio_test_no_panics]
912    async fn test_result_set_metadata() -> anyhow::Result<()> {
913        let mut rs = run_mock_query(vec![PartialResultSet {
914            metadata: metadata(2),
915            values: vec![string_val("a"), string_val("b")],
916            last: true,
917            ..Default::default()
918        }])
919        .await;
920
921        // Called before next() -> metadata is immediately available.
922        let meta = rs.metadata().expect("metadata available");
923        assert_eq!(
924            meta.column_names(),
925            &["col0".to_string(), "col1".to_string()]
926        );
927
928        // Advance
929        let _next = rs.next().await.expect("Expected a row")?;
930
931        // Called after next() -> returns metadata
932        let meta = rs.metadata().expect("metadata available");
933        assert_eq!(
934            meta.column_names(),
935            &["col0".to_string(), "col1".to_string()]
936        );
937
938        Ok(())
939    }
940
941    #[tokio_test_no_panics]
942    async fn test_result_set_handle_partial_result_set_error() -> anyhow::Result<()> {
943        let res = run_mock_query_fallible(vec![PartialResultSet {
944            values: vec![string_val("row1")],
945            ..Default::default()
946        }])
947        .await;
948
949        assert!(res.is_err(), "Expected an error but got Ok");
950        let err_str = res.expect_err("Expected should be an error").to_string();
951        assert!(
952            err_str.contains("First PartialResultSet did not contain metadata"),
953            "Expected error to contain 'First PartialResultSet did not contain metadata', but got '{}'",
954            err_str
955        );
956
957        Ok(())
958    }
959
960    #[tokio_test_no_panics]
961    async fn test_result_set_handle_partial_result_set_error_immediate() -> anyhow::Result<()> {
962        let res = run_mock_query_fallible(vec![
963            PartialResultSet {
964                values: vec![string_val("row1")],
965                ..Default::default()
966            },
967            PartialResultSet {
968                resume_token: b"token".to_vec(),
969                ..Default::default()
970            },
971        ])
972        .await;
973
974        assert!(res.is_err(), "Expected an error but got Ok");
975        let err_str = res.expect_err("Expected should be an error").to_string();
976        assert!(
977            err_str.contains("First PartialResultSet did not contain metadata"),
978            "Expected error to contain 'First PartialResultSet did not contain metadata', but got '{}'",
979            err_str
980        );
981
982        Ok(())
983    }
984
985    #[tokio_test_no_panics]
986    async fn test_result_set_stream_ended_with_chunked_value() -> anyhow::Result<()> {
987        let mut rs = run_mock_query(vec![PartialResultSet {
988            metadata: metadata(2),
989            values: vec![string_val("a")],
990            chunked_value: true,
991            ..Default::default()
992        }])
993        .await;
994
995        let res = rs.next().await;
996        assert!(res.is_some(), "Expected an error but got None");
997        let res = res.expect("Expected some response but got None");
998        assert!(res.is_err(), "Expected an error but got Ok");
999        let err_str = res.expect_err("Expected should be an error").to_string();
1000        assert!(
1001            err_str.contains("Stream ended with chunked_value=true"),
1002            "Expected error to contain 'Stream ended with chunked_value=true', but got '{}'",
1003            err_str
1004        );
1005
1006        Ok(())
1007    }
1008
1009    #[tokio_test_no_panics]
1010    async fn test_result_set_duplicate_metadata() -> anyhow::Result<()> {
1011        let mut rs = run_mock_query(vec![
1012            PartialResultSet {
1013                metadata: metadata(2),
1014                values: vec![string_val("a"), string_val("b")],
1015                resume_token: b"token1".to_vec(),
1016                ..Default::default()
1017            },
1018            PartialResultSet {
1019                metadata: metadata(2),
1020                values: vec![string_val("c"), string_val("d")],
1021                ..Default::default()
1022            },
1023        ])
1024        .await;
1025
1026        rs.next().await.expect("Expected a row")?;
1027
1028        let res2 = rs.next().await;
1029        assert!(res2.is_some(), "Expected an error but got None");
1030        let res2 = res2.expect("Expected some response but got None");
1031        assert!(res2.is_err(), "Expected an error but got Ok");
1032        let err_str = res2.expect_err("Expected should be an error").to_string();
1033        assert!(
1034            err_str.contains("Additional metadata after first result set"),
1035            "Expected error to contain 'Additional metadata after first result set', but got '{}'",
1036            err_str
1037        );
1038
1039        Ok(())
1040    }
1041
1042    #[tokio_test_no_panics]
1043    async fn test_result_set_empty_column_metadata() -> anyhow::Result<()> {
1044        let mut rs = run_mock_query(vec![PartialResultSet {
1045            metadata: Some(ResultSetMetadata {
1046                row_type: Some(StructType { fields: vec![] }),
1047                ..Default::default()
1048            }),
1049            values: vec![string_val("a")],
1050            ..Default::default()
1051        }])
1052        .await;
1053
1054        let res = rs.next().await;
1055        assert!(res.is_some(), "Expected an error but got None");
1056        let res = res.expect("Expected some response but got None");
1057        assert!(res.is_err(), "Expected an error but got Ok");
1058        let err_str = res.expect_err("Expected should be an error").to_string();
1059        assert!(
1060            err_str
1061                .contains("PartialResultSet contained values but no column metadata was provided"),
1062            "Expected error to contain 'PartialResultSet contained values but no column metadata was provided', but got '{}'",
1063            err_str
1064        );
1065
1066        Ok(())
1067    }
1068
1069    #[tokio_test_no_panics]
1070    async fn test_result_set_default_policies_applied() -> anyhow::Result<()> {
1071        let rs = run_mock_query(vec![PartialResultSet {
1072            metadata: metadata(2),
1073            last: true,
1074            ..Default::default()
1075        }])
1076        .await;
1077
1078        assert!(
1079            rs.gax_options.retry_policy().is_some(),
1080            "Default retry policy should be applied"
1081        );
1082        assert!(
1083            rs.gax_options.backoff_policy().is_some(),
1084            "Default backoff policy should be applied"
1085        );
1086
1087        Ok(())
1088    }
1089
1090    #[tokio_test_no_panics]
1091    async fn test_result_set_retry_read_stream() -> anyhow::Result<()> {
1092        let mut mock = MockSpanner::new();
1093        let mut seq = mockall::Sequence::new();
1094
1095        mock.expect_streaming_read()
1096            .times(1)
1097            .in_sequence(&mut seq)
1098            .returning(|_request| {
1099                let stream = adapt([
1100                    Ok(PartialResultSet {
1101                        metadata: metadata(2),
1102                        values: vec![string_val("row1"), string_val("b")],
1103                        resume_token: b"token1".to_vec(),
1104                        ..Default::default()
1105                    }),
1106                    Err(Status::unavailable("Unavailable error")),
1107                ]);
1108                Ok(Response::from(stream))
1109            });
1110
1111        mock.expect_streaming_read()
1112            .times(1)
1113            .in_sequence(&mut seq)
1114            .returning(|_request| {
1115                let stream = adapt([Ok(PartialResultSet {
1116                    values: vec![string_val("row2"), string_val("d")],
1117                    resume_token: b"token2".to_vec(),
1118                    last: true,
1119                    ..Default::default()
1120                })]);
1121                Ok(Response::from(stream))
1122            });
1123
1124        mock.expect_create_session().returning(|_| {
1125            Ok(Response::new(Session {
1126                name: "session".to_string(),
1127                multiplexed: true,
1128                ..Default::default()
1129            }))
1130        });
1131
1132        let (address, _server) = start("127.0.0.1:0", mock).await?;
1133
1134        let client: Spanner = Spanner::builder()
1135            .with_endpoint(address)
1136            .with_credentials(Anonymous::new().build())
1137            .build()
1138            .await?;
1139
1140        let db_client = client.database_client("db").build().await?;
1141        let tx = db_client.single_use().build();
1142        let mut mock_backoff = MockBackoffPolicy::new();
1143        mock_backoff
1144            .expect_on_failure()
1145            .returning(|_| Duration::from_nanos(1));
1146
1147        let read_req = crate::read::ReadRequest::builder("table", vec!["Id", "Value"])
1148            .with_keys(crate::key::KeySet::all())
1149            .with_backoff_policy(mock_backoff)
1150            .build();
1151        let mut rs: ResultSet = tx.execute_read(read_req).await?;
1152
1153        let row1 = rs.next().await.expect("Stream ended unexpectedly")?;
1154        assert_eq!(row1.raw_values()[0].0, string_val("row1"));
1155
1156        let row2 = rs.next().await.expect("Stream ended unexpectedly")?;
1157        assert_eq!(row2.raw_values()[0].0, string_val("row2"));
1158
1159        assert!(rs.next().await.is_none());
1160
1161        Ok(())
1162    }
1163
1164    #[tokio_test_no_panics]
1165    async fn test_result_set_custom_retry_policy() -> anyhow::Result<()> {
1166        // Extend the default retry policy to also retry on ResourceExhausted.
1167        let retry_policy = Aip194Strict.continue_on_too_many_requests();
1168
1169        let mut mock = MockSpanner::new();
1170        let mut seq = mockall::Sequence::new();
1171
1172        // Fail with RESOURCE_EXHAUSTED on first call
1173        mock.expect_streaming_read()
1174            .times(1)
1175            .in_sequence(&mut seq)
1176            .returning(|_request| {
1177                let stream = adapt([
1178                    Ok(PartialResultSet {
1179                        metadata: metadata(2),
1180                        values: vec![string_val("row1"), string_val("b")],
1181                        resume_token: b"token1".to_vec(),
1182                        ..Default::default()
1183                    }),
1184                    Err(Status::new(GrpcCode::ResourceExhausted, "Quota exceeded")),
1185                ]);
1186                Ok(Response::from(stream))
1187            });
1188
1189        // Succeed on second call
1190        mock.expect_streaming_read()
1191            .times(1)
1192            .in_sequence(&mut seq)
1193            .returning(|_request| {
1194                let stream = adapt([Ok(PartialResultSet {
1195                    values: vec![string_val("row2"), string_val("d")],
1196                    resume_token: b"token2".to_vec(),
1197                    last: true,
1198                    ..Default::default()
1199                })]);
1200                Ok(Response::from(stream))
1201            });
1202
1203        mock.expect_create_session().returning(|_| {
1204            Ok(Response::new(Session {
1205                name: "session".to_string(),
1206                multiplexed: true,
1207                ..Default::default()
1208            }))
1209        });
1210
1211        let (address, _server) = start("127.0.0.1:0", mock).await?;
1212
1213        let client: Spanner = Spanner::builder()
1214            .with_endpoint(address)
1215            .with_credentials(Anonymous::new().build())
1216            .build()
1217            .await?;
1218
1219        let db_client = client.database_client("db").build().await?;
1220        let tx = db_client.single_use().build();
1221
1222        let mut mock_backoff = MockBackoffPolicy::new();
1223        mock_backoff
1224            .expect_on_failure()
1225            .times(1)
1226            .returning(|_| Duration::from_nanos(1));
1227
1228        let read_req = ReadRequest::builder("table", vec!["Id", "Value"])
1229            .with_keys(KeySet::all())
1230            .with_retry_policy(retry_policy)
1231            .with_backoff_policy(mock_backoff)
1232            .build();
1233
1234        let mut rs: ResultSet = tx.execute_read(read_req).await?;
1235
1236        let row1 = rs.next().await.expect("Stream ended unexpectedly")?;
1237        assert_eq!(row1.raw_values()[0].0, string_val("row1"));
1238
1239        // This next() call should trigger the retry because the previous stream ended with error!
1240        let row2 = rs.next().await.expect("Stream ended unexpectedly")?;
1241        assert_eq!(row2.raw_values()[0].0, string_val("row2"));
1242
1243        assert!(rs.next().await.is_none());
1244
1245        Ok(())
1246    }
1247
1248    #[tokio_test_no_panics]
1249    async fn test_result_set_transport_error_retry() -> anyhow::Result<()> {
1250        let mut mock = MockSpanner::new();
1251        let mut seq = mockall::Sequence::new();
1252
1253        // Fail with transport error in the middle of stream on first call
1254        mock.expect_streaming_read()
1255            .times(1)
1256            .in_sequence(&mut seq)
1257            .returning(|_request| {
1258                let mut status = Status::unavailable("connection reset");
1259                let mut headers = std::mem::take(status.metadata_mut()).into_headers();
1260                headers.insert("content-type", http::HeaderValue::from_static("text/html"));
1261                *status.metadata_mut() = MetadataMap::from_headers(headers);
1262                let stream = adapt([
1263                    Ok(PartialResultSet {
1264                        metadata: metadata(2),
1265                        values: vec![string_val("row1"), string_val("b")],
1266                        resume_token: b"token1".to_vec(),
1267                        ..Default::default()
1268                    }),
1269                    Err(status),
1270                ]);
1271                Ok(Response::from(stream))
1272            });
1273
1274        // Succeed on second call
1275        mock.expect_streaming_read()
1276            .times(1)
1277            .in_sequence(&mut seq)
1278            .returning(|_request| {
1279                let stream = adapt([Ok(PartialResultSet {
1280                    values: vec![string_val("row2"), string_val("d")],
1281                    resume_token: b"token2".to_vec(),
1282                    last: true,
1283                    ..Default::default()
1284                })]);
1285                Ok(Response::from(stream))
1286            });
1287
1288        mock.expect_create_session().returning(|_| {
1289            Ok(Response::new(Session {
1290                name: "session".to_string(),
1291                multiplexed: true,
1292                ..Default::default()
1293            }))
1294        });
1295
1296        let (address, _server) = start("127.0.0.1:0", mock).await?;
1297
1298        let client: Spanner = Spanner::builder()
1299            .with_endpoint(address)
1300            .with_credentials(Anonymous::new().build())
1301            .build()
1302            .await?;
1303
1304        let db_client = client.database_client("db").build().await?;
1305        let tx = db_client.single_use().build();
1306
1307        let mut mock_backoff = MockBackoffPolicy::new();
1308        mock_backoff
1309            .expect_on_failure()
1310            .times(1)
1311            .returning(|_| Duration::from_nanos(1));
1312
1313        let read_req = ReadRequest::builder("table", vec!["Id", "Value"])
1314            .with_keys(KeySet::all())
1315            .with_backoff_policy(mock_backoff)
1316            .build();
1317
1318        let mut rs: ResultSet = tx.execute_read(read_req).await?;
1319
1320        let row1 = rs.next().await.expect("Stream ended unexpectedly")?;
1321        assert_eq!(
1322            row1.raw_values()[0].0,
1323            string_val("row1"),
1324            "Expected row1 to be read successfully before transport error"
1325        );
1326
1327        // This next() call should trigger the retry because the previous stream ended with a transport error.
1328        let row2 = rs.next().await.expect("Stream ended unexpectedly")?;
1329        assert_eq!(
1330            row2.raw_values()[0].0,
1331            string_val("row2"),
1332            "Expected stream to resume and return row2 after transport error retry"
1333        );
1334
1335        assert!(
1336            rs.next().await.is_none(),
1337            "Expected stream to end successfully"
1338        );
1339
1340        Ok(())
1341    }
1342
1343    #[tokio_test_no_panics]
1344    async fn test_result_set_one_row() {
1345        let mut rs = run_mock_query(vec![PartialResultSet {
1346            metadata: metadata(2),
1347            values: vec![string_val("a"), string_val("b")],
1348            chunked_value: false,
1349            resume_token: vec![],
1350            stats: None,
1351            precommit_token: None,
1352            last: true,
1353            cache_update: None,
1354        }])
1355        .await;
1356
1357        let row = rs.next().await.unwrap().unwrap();
1358        assert_eq!(row.raw_values().len(), 2);
1359        assert_eq!(row.raw_values()[0].0, string_val("a"));
1360        assert_eq!(row.raw_values()[1].0, string_val("b"));
1361
1362        assert!(rs.next().await.is_none());
1363    }
1364
1365    #[tokio_test_no_panics]
1366    async fn result_set_last_flag() -> anyhow::Result<()> {
1367        let mut rs = run_mock_query(vec![
1368            PartialResultSet {
1369                metadata: metadata(2),
1370                values: vec![string_val("a"), string_val("b")],
1371                last: true,
1372                ..Default::default()
1373            },
1374            // Note: This is not something that would be returned by Spanner.
1375            // Once a PartialResultSet with last == true is returned, no more
1376            // PartialResultSets will be returned by Spanner.
1377            PartialResultSet {
1378                values: vec![string_val("c"), string_val("d")],
1379                ..Default::default()
1380            },
1381        ])
1382        .await;
1383
1384        let row = rs.next().await.expect("Expected a row")?;
1385        assert_eq!(row.raw_values()[0].0, string_val("a"));
1386
1387        // Verify that the second PartialResultSet is ignored.
1388        assert!(rs.next().await.is_none());
1389
1390        Ok(())
1391    }
1392
1393    #[tokio_test_no_panics]
1394    async fn result_set_last_flag_drained_in_background() -> anyhow::Result<()> {
1395        let mut mock = MockSpanner::new();
1396        let (tx, rx) = tokio::sync::mpsc::channel(10);
1397
1398        mock.expect_execute_streaming_sql()
1399            .return_once(move |_request| Ok(Response::from(rx)));
1400
1401        mock.expect_create_session().returning(|_| {
1402            Ok(Response::new(Session {
1403                name: "session".to_string(),
1404                multiplexed: true,
1405                ..Default::default()
1406            }))
1407        });
1408
1409        let (address, _server) = start("127.0.0.1:0", mock).await?;
1410
1411        let client: Spanner = Spanner::builder()
1412            .with_endpoint(address)
1413            .with_credentials(Anonymous::new().build())
1414            .build()
1415            .await?;
1416
1417        let db_client = client.database_client("db").build().await?;
1418        let tx_single = db_client.single_use().build();
1419
1420        // 1. Send the first message with last: true
1421        tx.send(Ok(PartialResultSet {
1422            metadata: metadata(2),
1423            values: vec![string_val("a"), string_val("b")],
1424            last: true,
1425            ..Default::default()
1426        }))
1427        .await
1428        .expect("Failed to send first message");
1429
1430        let mut rs: ResultSet = tx_single.execute_query("SELECT 1").await?;
1431
1432        // 2. Consume the first message
1433        let row = rs.next().await.expect("Expected a row")?;
1434        assert_eq!(row.raw_values()[0].0, string_val("a"));
1435
1436        // 3. Call next again. It should see seen_last and return None early.
1437        assert!(rs.next().await.is_none());
1438        drop(rs);
1439
1440        // 4. Since the stream is being drained in a background task, the connection
1441        // receiver should still be alive, and therefore tx should NOT be closed yet.
1442        assert!(
1443            !tx.is_closed(),
1444            "Expected stream to remain open in background task for draining"
1445        );
1446
1447        // 5. Drop the sender to close the stream.
1448        drop(tx);
1449
1450        Ok(())
1451    }
1452
1453    #[tokio_test_no_panics]
1454    async fn result_set_last_flag_drained_in_background_on_drop() -> anyhow::Result<()> {
1455        let mut mock = MockSpanner::new();
1456        let (tx, rx) = tokio::sync::mpsc::channel(10);
1457
1458        mock.expect_execute_streaming_sql()
1459            .return_once(move |_request| Ok(Response::from(rx)));
1460
1461        mock.expect_create_session().returning(|_| {
1462            Ok(Response::new(Session {
1463                name: "session".to_string(),
1464                multiplexed: true,
1465                ..Default::default()
1466            }))
1467        });
1468
1469        let (address, _server) = start("127.0.0.1:0", mock).await?;
1470
1471        let client: Spanner = Spanner::builder()
1472            .with_endpoint(address)
1473            .with_credentials(Anonymous::new().build())
1474            .build()
1475            .await?;
1476
1477        let db_client = client.database_client("db").build().await?;
1478        let tx_single = db_client.single_use().build();
1479
1480        // 1. Send the first message with last: true
1481        tx.send(Ok(PartialResultSet {
1482            metadata: metadata(2),
1483            values: vec![string_val("a"), string_val("b")],
1484            last: true,
1485            ..Default::default()
1486        }))
1487        .await
1488        .expect("Failed to send first message");
1489
1490        let mut result_set: ResultSet = tx_single.execute_query("SELECT 1").await?;
1491
1492        // 2. Consume the first message
1493        let row = result_set.next().await.expect("Expected a row")?;
1494        assert_eq!(row.raw_values()[0].0, string_val("a"));
1495
1496        // 3. Drop result_set early (without calling next() to get None).
1497        // Since we got a message with last: true, seen_last is true.
1498        // It should spawn a background task on drop.
1499        drop(result_set);
1500
1501        // 4. Since the stream is being drained in a background task, the connection
1502        // receiver should still be alive, and therefore tx should NOT be closed yet.
1503        assert!(
1504            !tx.is_closed(),
1505            "Expected stream to remain open in background task for draining"
1506        );
1507
1508        // 5. Drop the sender to close the stream.
1509        drop(tx);
1510
1511        Ok(())
1512    }
1513
1514    #[tokio_test_no_panics]
1515    async fn result_set_last_flag_drained_in_background_on_drop_outside_runtime()
1516    -> anyhow::Result<()> {
1517        let mut mock = MockSpanner::new();
1518        let (tx, rx) = tokio::sync::mpsc::channel(10);
1519
1520        mock.expect_execute_streaming_sql()
1521            .return_once(move |_request| Ok(Response::from(rx)));
1522
1523        mock.expect_create_session().returning(|_| {
1524            Ok(Response::new(Session {
1525                name: "session".to_string(),
1526                multiplexed: true,
1527                ..Default::default()
1528            }))
1529        });
1530
1531        let (address, _server) = start("127.0.0.1:0", mock).await?;
1532
1533        let client: Spanner = Spanner::builder()
1534            .with_endpoint(address)
1535            .with_credentials(Anonymous::new().build())
1536            .build()
1537            .await?;
1538
1539        let db_client = client.database_client("db").build().await?;
1540        let tx_single = db_client.single_use().build();
1541
1542        // 1. Send the first message with last: true
1543        tx.send(Ok(PartialResultSet {
1544            metadata: metadata(2),
1545            values: vec![string_val("a"), string_val("b")],
1546            last: true,
1547            ..Default::default()
1548        }))
1549        .await
1550        .expect("Failed to send first message");
1551
1552        let mut result_set: ResultSet = tx_single.execute_query("SELECT 1").await?;
1553
1554        // 2. Consume the first message
1555        let row = result_set.next().await.expect("Expected a row")?;
1556        assert_eq!(row.raw_values()[0].0, string_val("a"));
1557
1558        // 3. Move the ResultSet to a separate non-Tokio OS thread and drop it there.
1559        std::thread::spawn(move || {
1560            drop(result_set);
1561        })
1562        .join()
1563        .expect("Thread panicked");
1564
1565        // 4. Since the stream is being drained in a background task on the captured runtime,
1566        // the connection receiver should still be alive, and therefore tx should NOT be closed yet.
1567        assert!(
1568            !tx.is_closed(),
1569            "Expected stream to remain open in background task for draining when dropped outside runtime"
1570        );
1571
1572        // 5. Drop the sender to close the stream.
1573        drop(tx);
1574
1575        Ok(())
1576    }
1577
1578    #[tokio_test_no_panics]
1579    async fn test_result_set_chunked_values_string() {
1580        let mut rs = run_mock_query(vec![
1581            PartialResultSet {
1582                metadata: metadata(1),
1583                values: vec![string_val("hello ")],
1584                chunked_value: true,
1585                resume_token: vec![],
1586                stats: None,
1587                precommit_token: None,
1588                last: false,
1589                cache_update: None,
1590            },
1591            PartialResultSet {
1592                metadata: None,
1593                values: vec![string_val("world")],
1594                chunked_value: false,
1595                resume_token: vec![],
1596                stats: None,
1597                precommit_token: None,
1598                last: true,
1599                cache_update: None,
1600            },
1601        ])
1602        .await;
1603
1604        let row = rs.next().await.unwrap().unwrap();
1605        assert_eq!(row.raw_values().len(), 1);
1606        if let Some(prost_types::value::Kind::StringValue(ref s)) = row.raw_values()[0].0.kind {
1607            assert_eq!(s, "hello world");
1608        } else {
1609            panic!("Expected StringValue");
1610        }
1611        assert!(rs.next().await.is_none());
1612    }
1613
1614    #[tokio_test_no_panics]
1615    async fn test_result_set_chunked_values_list() {
1616        let mut rs = run_mock_query(vec![
1617            PartialResultSet {
1618                metadata: metadata(1),
1619                values: vec![list_val(vec![string_val("A")])],
1620                chunked_value: true,
1621                resume_token: vec![],
1622                stats: None,
1623                precommit_token: None,
1624                last: false,
1625                cache_update: None,
1626            },
1627            PartialResultSet {
1628                metadata: None,
1629                values: vec![list_val(vec![string_val("B")])],
1630                chunked_value: false,
1631                resume_token: vec![],
1632                stats: None,
1633                precommit_token: None,
1634                last: true,
1635                cache_update: None,
1636            },
1637        ])
1638        .await;
1639
1640        let row = rs.next().await.unwrap().unwrap();
1641        assert_eq!(row.raw_values().len(), 1);
1642        if let Some(prost_types::value::Kind::ListValue(ref l)) = row.raw_values()[0].0.kind {
1643            assert_eq!(l.values.len(), 1);
1644            if let Some(prost_types::value::Kind::StringValue(ref s)) = l.values[0].kind {
1645                assert_eq!(s, "AB");
1646            } else {
1647                panic!("Expected StringValue");
1648            }
1649        } else {
1650            panic!("Expected ListValue");
1651        }
1652        assert!(rs.next().await.is_none());
1653    }
1654
1655    #[tokio_test_no_panics]
1656    async fn test_multi_response_chunking_bool_array() {
1657        fn bool_val(b: bool) -> Value {
1658            Value {
1659                kind: Some(prost_types::value::Kind::BoolValue(b)),
1660            }
1661        }
1662        fn null_val() -> Value {
1663            Value {
1664                kind: Some(prost_types::value::Kind::NullValue(0)),
1665            }
1666        }
1667
1668        let mut rs = run_mock_query(vec![
1669            PartialResultSet {
1670                metadata: metadata(1),
1671                values: vec![
1672                    list_val(vec![bool_val(true)]),
1673                    list_val(vec![bool_val(false), null_val(), bool_val(true)]),
1674                ],
1675                chunked_value: true,
1676                resume_token: vec![],
1677                stats: None,
1678                precommit_token: None,
1679                cache_update: None,
1680                last: false,
1681            },
1682            PartialResultSet {
1683                metadata: None,
1684                values: vec![list_val(vec![bool_val(true), bool_val(true)])],
1685                chunked_value: true,
1686                resume_token: vec![],
1687                stats: None,
1688                precommit_token: None,
1689                cache_update: None,
1690                last: false,
1691            },
1692            PartialResultSet {
1693                metadata: None,
1694                values: vec![
1695                    list_val(vec![null_val(), null_val(), bool_val(false)]),
1696                    list_val(vec![bool_val(true)]),
1697                ],
1698                chunked_value: false,
1699                resume_token: vec![],
1700                stats: None,
1701                precommit_token: None,
1702                cache_update: None,
1703                last: true,
1704            },
1705        ])
1706        .await;
1707
1708        let row1 = rs.next().await.unwrap().unwrap();
1709        assert_eq!(row1.raw_values()[0].0, list_val(vec![bool_val(true)]));
1710
1711        let row2 = rs.next().await.unwrap().unwrap();
1712        assert_eq!(
1713            row2.raw_values()[0].0,
1714            list_val(vec![
1715                bool_val(false),
1716                null_val(),
1717                bool_val(true),
1718                bool_val(true),
1719                bool_val(true),
1720                null_val(),
1721                null_val(),
1722                bool_val(false)
1723            ])
1724        );
1725
1726        let row3 = rs.next().await.unwrap().unwrap();
1727        assert_eq!(row3.raw_values()[0].0, list_val(vec![bool_val(true)]));
1728
1729        assert!(rs.next().await.is_none());
1730    }
1731
1732    #[tokio_test_no_panics]
1733    async fn test_multi_response_chunking_int64_array() {
1734        fn null_val() -> Value {
1735            Value {
1736                kind: Some(prost_types::value::Kind::NullValue(0)),
1737            }
1738        }
1739
1740        let mut rs = run_mock_query(vec![
1741            PartialResultSet {
1742                metadata: metadata(1),
1743                values: vec![
1744                    list_val(vec![string_val("10")]),
1745                    list_val(vec![string_val("1"), string_val("2"), null_val()]),
1746                ],
1747                chunked_value: true,
1748                resume_token: vec![],
1749                stats: None,
1750                precommit_token: None,
1751                cache_update: None,
1752                last: false,
1753            },
1754            PartialResultSet {
1755                metadata: None,
1756                values: vec![list_val(vec![null_val(), string_val("5")])],
1757                chunked_value: true,
1758                resume_token: vec![],
1759                stats: None,
1760                precommit_token: None,
1761                cache_update: None,
1762                last: false,
1763            },
1764            PartialResultSet {
1765                metadata: None,
1766                values: vec![
1767                    list_val(vec![null_val(), string_val("7"), string_val("8")]),
1768                    list_val(vec![string_val("20")]),
1769                ],
1770                chunked_value: false,
1771                resume_token: vec![],
1772                stats: None,
1773                precommit_token: None,
1774                cache_update: None,
1775                last: true,
1776            },
1777        ])
1778        .await;
1779
1780        let row1 = rs.next().await.unwrap().unwrap();
1781        assert_eq!(row1.raw_values()[0].0, list_val(vec![string_val("10")]));
1782
1783        let row2 = rs.next().await.unwrap().unwrap();
1784        assert_eq!(
1785            row2.raw_values()[0].0,
1786            list_val(vec![
1787                string_val("1"),
1788                string_val("2"),
1789                null_val(),
1790                null_val(),
1791                string_val("5"),
1792                null_val(),
1793                string_val("7"),
1794                string_val("8")
1795            ])
1796        );
1797
1798        let row3 = rs.next().await.unwrap().unwrap();
1799        assert_eq!(row3.raw_values()[0].0, list_val(vec![string_val("20")]));
1800
1801        assert!(rs.next().await.is_none());
1802    }
1803
1804    #[tokio_test_no_panics]
1805    async fn test_result_set_precommit_token_tracked() -> anyhow::Result<()> {
1806        let token = MultiplexedSessionPrecommitToken {
1807            precommit_token: b"test_token".to_vec(),
1808            seq_num: 99,
1809        };
1810        let results = vec![PartialResultSet {
1811            metadata: metadata(1),
1812            precommit_token: Some(token.clone()),
1813            last: true,
1814            ..Default::default()
1815        }];
1816
1817        let mut mock = MockSpanner::new();
1818        let rx = adapt(results.into_iter().map(Ok));
1819        mock.expect_execute_streaming_sql()
1820            .return_once(move |_request| Ok(Response::from(rx)));
1821
1822        mock.expect_create_session().returning(|_| {
1823            Ok(Response::new(Session {
1824                name: "session".to_string(),
1825                multiplexed: true,
1826                ..Default::default()
1827            }))
1828        });
1829
1830        let (address, _server) = start("127.0.0.1:0", mock)
1831            .await
1832            .expect("Failed to start mock server");
1833
1834        let client: Spanner = Spanner::builder()
1835            .with_endpoint(address)
1836            .with_credentials(Anonymous::new().build())
1837            .build()
1838            .await
1839            .expect("Failed to build client");
1840
1841        let db_client: crate::database_client::DatabaseClient =
1842            client.database_client("db").build().await.unwrap();
1843
1844        let tracker = PrecommitTokenTracker::new();
1845
1846        let req = crate::model::ExecuteSqlRequest::default()
1847            .set_session("session".to_string())
1848            .set_sql("SELECT 1".to_string());
1849
1850        let stream = db_client
1851            .spanner
1852            .execute_streaming_sql(req.clone(), GaxRequestOptions::default(), 0)
1853            .send()
1854            .await?;
1855
1856        let mut rs = ResultSet::create(ResultSetParams {
1857            stream,
1858            transaction_selector: None,
1859            precommit_token_tracker: tracker.clone(),
1860            client: db_client,
1861            session_name: "session".to_string(),
1862            transaction_tag: None,
1863            operation: StreamOperation::Query(req),
1864            channel_hint: 0,
1865            gax_options: GaxRequestOptions::default(),
1866        })
1867        .await?;
1868
1869        // Read a row to trigger precommit token extraction
1870        assert!(
1871            rs.next().await.is_none(),
1872            "Expected no rows, but received one"
1873        );
1874
1875        // Validate the tracker correctly intercepted and preserved the token
1876        let tracked_token = tracker.get().expect("token should be tracked");
1877        assert_eq!(tracked_token.seq_num, 99);
1878        assert_eq!(
1879            tracked_token.precommit_token,
1880            bytes::Bytes::from("test_token")
1881        );
1882
1883        Ok(())
1884    }
1885
1886    #[tokio_test_no_panics]
1887    async fn test_result_set_retry_simple() -> anyhow::Result<()> {
1888        let mut mock = MockSpanner::new();
1889        let mut seq = mockall::Sequence::new();
1890
1891        mock.expect_execute_streaming_sql()
1892            .times(1)
1893            .in_sequence(&mut seq)
1894            .returning(|_request| {
1895                let stream = adapt([
1896                    Ok(PartialResultSet {
1897                        metadata: metadata(1),
1898                        values: vec![string_val("row1")],
1899                        resume_token: b"token1".to_vec(),
1900                        ..Default::default()
1901                    }),
1902                    Err(Status::unavailable("Transient error")),
1903                ]);
1904                Ok(Response::from(stream))
1905            });
1906
1907        mock.expect_execute_streaming_sql()
1908            .times(1)
1909            .in_sequence(&mut seq)
1910            .returning(|_request| {
1911                let stream = adapt([Ok(PartialResultSet {
1912                    values: vec![string_val("row2")],
1913                    resume_token: b"token2".to_vec(),
1914                    last: true,
1915                    ..Default::default()
1916                })]);
1917                Ok(Response::from(stream))
1918            });
1919
1920        mock.expect_create_session().returning(|_| {
1921            Ok(Response::new(Session {
1922                name: "session".to_string(),
1923                multiplexed: true,
1924                ..Default::default()
1925            }))
1926        });
1927
1928        let (address, _server) = start("127.0.0.1:0", mock).await?;
1929
1930        let client: Spanner = Spanner::builder()
1931            .with_endpoint(address)
1932            .with_credentials(Anonymous::new().build())
1933            .build()
1934            .await?;
1935
1936        let db_client = client.database_client("db").build().await?;
1937        let tx = db_client.single_use().build();
1938        let mut mock_backoff = MockBackoffPolicy::new();
1939        mock_backoff
1940            .expect_on_failure()
1941            .returning(|_| Duration::from_nanos(1));
1942
1943        let stmt = Statement::builder("SELECT 1")
1944            .with_backoff_policy(mock_backoff)
1945            .build();
1946        let mut rs = tx.execute_query(stmt).await?;
1947
1948        let row1 = rs.next().await.expect("Stream ended unexpectedly")?;
1949        assert_eq!(row1.raw_values()[0].0, string_val("row1"));
1950
1951        let row2 = rs.next().await.expect("Stream ended unexpectedly")?;
1952        assert_eq!(row2.raw_values()[0].0, string_val("row2"));
1953
1954        assert!(rs.next().await.is_none());
1955
1956        Ok(())
1957    }
1958
1959    #[tokio_test_no_panics]
1960    async fn test_result_set_retry_non_retriable_error() -> anyhow::Result<()> {
1961        let mut mock = MockSpanner::new();
1962        mock.expect_execute_streaming_sql()
1963            .times(1)
1964            .returning(|_request| {
1965                let stream = adapt([
1966                    Ok(PartialResultSet {
1967                        metadata: metadata(1),
1968                        values: vec![string_val("row1")],
1969                        resume_token: b"token1".to_vec(),
1970                        ..Default::default()
1971                    }),
1972                    Err(Status::invalid_argument("Non-retriable error")),
1973                ]);
1974                Ok(Response::from(stream))
1975            });
1976
1977        mock.expect_create_session().returning(|_| {
1978            Ok(Response::new(Session {
1979                name: "session".to_string(),
1980                multiplexed: true,
1981                ..Default::default()
1982            }))
1983        });
1984
1985        let (address, _server) = start("127.0.0.1:0", mock).await?;
1986
1987        let client: Spanner = Spanner::builder()
1988            .with_endpoint(address)
1989            .with_credentials(Anonymous::new().build())
1990            .build()
1991            .await?;
1992
1993        let db_client = client.database_client("db").build().await?;
1994        let tx = db_client.single_use().build();
1995        let mut rs = tx.execute_query("SELECT 1").await?;
1996
1997        let row1 = rs.next().await.expect("Stream ended unexpectedly")?;
1998        assert_eq!(row1.raw_values()[0].0, string_val("row1"));
1999
2000        let res = rs.next().await;
2001        assert!(res.is_some(), "Expected an error but got None");
2002        let res = res.expect("Expected some response but got None");
2003        assert!(res.is_err(), "Expected an error but got Ok");
2004        let err_str = res.expect_err("Expected should be an error").to_string();
2005        assert!(
2006            err_str.contains("Non-retriable error"),
2007            "Expected error to contain 'Non-retriable error', but got '{}'",
2008            err_str
2009        );
2010
2011        Ok(())
2012    }
2013
2014    #[tokio_test_no_panics]
2015    async fn test_result_set_buffer_overflow() -> anyhow::Result<()> {
2016        let mut mock = MockSpanner::new();
2017        let (tx_msg, rx_msg) = tokio::sync::mpsc::channel(10);
2018        mock.expect_execute_streaming_sql()
2019            // Should only be called once, as it is not retried due to missing resume tokens.
2020            .times(1)
2021            .return_once(move |_request| Ok(Response::from(rx_msg)));
2022
2023        mock.expect_create_session().returning(|_| {
2024            Ok(Response::new(Session {
2025                name: "session".to_string(),
2026                multiplexed: true,
2027                ..Default::default()
2028            }))
2029        });
2030
2031        let (address, _server) = start("127.0.0.1:0", mock).await?;
2032
2033        let client: Spanner = Spanner::builder()
2034            .with_endpoint(address)
2035            .with_credentials(Anonymous::new().build())
2036            .build()
2037            .await?;
2038
2039        let db_client = client.database_client("db").build().await?;
2040        let tx = db_client.single_use().build();
2041
2042        tx_msg
2043            .send(Ok(PartialResultSet {
2044                metadata: metadata(1),
2045                values: vec![string_val("row1")],
2046                resume_token: b"token1".to_vec(),
2047                ..Default::default()
2048            }))
2049            .await?;
2050
2051        let mut rs = tx.execute_query("SELECT 1").await?;
2052
2053        // Set max buffer size to 2
2054        rs.set_max_buffered_partial_result_sets(2);
2055
2056        tx_msg
2057            .send(Ok(PartialResultSet {
2058                values: vec![string_val("row2")],
2059                ..Default::default()
2060            }))
2061            .await?;
2062        tx_msg
2063            .send(Ok(PartialResultSet {
2064                values: vec![string_val("row3")],
2065                ..Default::default()
2066            }))
2067            .await?;
2068        tx_msg
2069            .send(Ok(PartialResultSet {
2070                values: vec![string_val("row4")],
2071                ..Default::default()
2072            }))
2073            .await?;
2074        tx_msg
2075            .send(Err(Status::unavailable("Unavailable error")))
2076            .await?;
2077
2078        // Read row 1.
2079        let row1 = rs.next().await.expect("Expected row1")?;
2080        assert_eq!(row1.raw_values()[0].0, string_val("row1"));
2081
2082        // Read row 2 (flushed when row4 overflows buffer).
2083        let row2 = rs.next().await.expect("Expected row2")?;
2084        assert_eq!(row2.raw_values()[0].0, string_val("row2"));
2085
2086        // Try to read next row. As the buffer is now full/unsafe, it will not retry and return the error.
2087        let res = rs.next().await;
2088        assert!(res.is_some(), "Expected an error but got None");
2089        let res = res.expect("Expected some response but got None");
2090        assert!(res.is_err(), "Expected an error but got Ok");
2091        let err_str = res.expect_err("Expected should be an error").to_string();
2092        assert!(
2093            err_str.contains("Unavailable error"),
2094            "Expected error to contain 'Unavailable error', but got '{}'",
2095            err_str
2096        );
2097
2098        Ok(())
2099    }
2100
2101    #[tokio_test_no_panics]
2102    async fn test_result_set_retry_missing_resume_token_safe() -> anyhow::Result<()> {
2103        let mut mock = MockSpanner::new();
2104        let mut seq = mockall::Sequence::new();
2105
2106        mock.expect_execute_streaming_sql()
2107            .times(1)
2108            .in_sequence(&mut seq)
2109            .returning(|_request| {
2110                let stream = adapt([
2111                    Ok(PartialResultSet {
2112                        metadata: metadata(1),
2113                        values: vec![string_val("row1")],
2114                        // no resume token
2115                        ..Default::default()
2116                    }),
2117                    Err(Status::unavailable("Unavailable error")),
2118                ]);
2119                Ok(Response::from(stream))
2120            });
2121
2122        mock.expect_execute_streaming_sql()
2123            .times(1)
2124            .in_sequence(&mut seq)
2125            .returning(|_request| {
2126                let stream = adapt([Ok(PartialResultSet {
2127                    metadata: metadata(1),
2128                    values: vec![string_val("row1_retry")],
2129                    resume_token: b"token_retry".to_vec(),
2130                    ..Default::default()
2131                })]);
2132                Ok(Response::from(stream))
2133            });
2134
2135        mock.expect_create_session().returning(|_| {
2136            Ok(Response::new(Session {
2137                name: "session".to_string(),
2138                multiplexed: true,
2139                ..Default::default()
2140            }))
2141        });
2142
2143        let (address, _server) = start("127.0.0.1:0", mock).await?;
2144
2145        let client: Spanner = Spanner::builder()
2146            .with_endpoint(address)
2147            .with_credentials(Anonymous::new().build())
2148            .build()
2149            .await?;
2150
2151        let db_client = client.database_client("db").build().await?;
2152        let tx = db_client.single_use().build();
2153        let mut mock_backoff = MockBackoffPolicy::new();
2154        mock_backoff
2155            .expect_on_failure()
2156            .returning(|_| Duration::from_nanos(1));
2157
2158        let stmt = Statement::builder("SELECT 1")
2159            .with_backoff_policy(mock_backoff)
2160            .build();
2161        let mut rs = tx.execute_query(stmt).await?;
2162
2163        let row1 = rs.next().await.expect("Expected row1")?;
2164        assert_eq!(row1.raw_values()[0].0, string_val("row1_retry"));
2165
2166        Ok(())
2167    }
2168
2169    #[tokio_test_no_panics]
2170    async fn test_result_set_retry_under_limit_no_resume_token() -> anyhow::Result<()> {
2171        let mut mock = MockSpanner::new();
2172        let mut seq = mockall::Sequence::new();
2173
2174        // First stream: 2 messages without resume token, then Error.
2175        mock.expect_execute_streaming_sql()
2176            .times(1)
2177            .in_sequence(&mut seq)
2178            .returning(|_request| {
2179                let stream = adapt([
2180                    Ok(PartialResultSet {
2181                        metadata: metadata(1),
2182                        values: vec![string_val("row1")],
2183                        ..Default::default()
2184                    }),
2185                    Ok(PartialResultSet {
2186                        values: vec![string_val("row2")],
2187                        ..Default::default()
2188                    }),
2189                    Err(Status::unavailable("Unavailable error")),
2190                ]);
2191                Ok(Response::from(stream))
2192            });
2193
2194        // Second stream: Retried from the start as the initial stream
2195        // returned Unavailable before the buffer was full.
2196        mock.expect_execute_streaming_sql()
2197            .times(1)
2198            .in_sequence(&mut seq)
2199            .returning(|request| {
2200                assert!(
2201                    request.get_ref().resume_token.is_empty(),
2202                    "Expected empty resume token for retry"
2203                );
2204                let stream = adapt([Ok(PartialResultSet {
2205                    metadata: metadata(1),
2206                    values: vec![string_val("row1_retry")],
2207                    resume_token: b"token_retry".to_vec(),
2208                    ..Default::default()
2209                })]);
2210                Ok(Response::from(stream))
2211            });
2212
2213        mock.expect_create_session().returning(|_| {
2214            Ok(Response::new(Session {
2215                name: "session".to_string(),
2216                multiplexed: true,
2217                ..Default::default()
2218            }))
2219        });
2220
2221        let (address, _server) = start("127.0.0.1:0", mock).await?;
2222
2223        let client: Spanner = Spanner::builder()
2224            .with_endpoint(address)
2225            .with_credentials(Anonymous::new().build())
2226            .build()
2227            .await?;
2228
2229        let db_client = client.database_client("db").build().await?;
2230        let tx = db_client.single_use().build();
2231        let mut mock_backoff = MockBackoffPolicy::new();
2232        mock_backoff
2233            .expect_on_failure()
2234            .returning(|_| Duration::from_nanos(1));
2235
2236        let stmt = Statement::builder("SELECT 1")
2237            .with_backoff_policy(mock_backoff)
2238            .build();
2239        let mut rs = tx.execute_query(stmt).await?;
2240
2241        // Set max buffer size to 3 (so 2 messages is under the limit)
2242        rs.set_max_buffered_partial_result_sets(3);
2243
2244        // Read row 1.
2245        // It reads row1, row2, and then the error from the first stream.
2246        // Since it is less than the buffer size, it retries without a resume token.
2247        // The retry stream returns "row1_retry".
2248        let row1 = rs.next().await.expect("Expected row1")?;
2249        assert_eq!(row1.raw_values()[0].0, string_val("row1_retry"));
2250
2251        Ok(())
2252    }
2253
2254    #[tokio_test_no_panics]
2255    async fn test_result_set_retry_limit_exceeded() -> anyhow::Result<()> {
2256        let mut mock = MockSpanner::new();
2257
2258        mock.expect_execute_streaming_sql()
2259            .times(11) // 1 initial + 10 retries
2260            .returning(|_request| {
2261                let stream = adapt([Err(Status::unavailable("Unavailable error"))]);
2262                Ok(Response::from(stream))
2263            });
2264
2265        mock.expect_create_session().returning(|_| {
2266            Ok(Response::new(Session {
2267                name: "session".to_string(),
2268                multiplexed: true,
2269                ..Default::default()
2270            }))
2271        });
2272
2273        let (address, _server) = start("127.0.0.1:0", mock).await?;
2274
2275        let client: Spanner = Spanner::builder()
2276            .with_endpoint(address)
2277            .with_credentials(Anonymous::new().build())
2278            .build()
2279            .await?;
2280
2281        let db_client = client.database_client("db").build().await?;
2282        let tx = db_client.single_use().build();
2283        let mut mock_backoff = MockBackoffPolicy::new();
2284        mock_backoff
2285            .expect_on_failure()
2286            .times(10)
2287            .returning(|_| Duration::from_nanos(1));
2288
2289        let stmt = Statement::builder("SELECT 1")
2290            .with_backoff_policy(mock_backoff)
2291            .build();
2292        let res = tx.execute_query(stmt).await;
2293
2294        assert!(res.is_err(), "Expected an error but got Ok");
2295        let err_str = res.expect_err("Expected should be an error").to_string();
2296        assert!(
2297            err_str.contains("Unavailable error"),
2298            "Expected error to contain 'Unavailable error', but got '{}'",
2299            err_str
2300        );
2301
2302        Ok(())
2303    }
2304
2305    #[tokio_test_no_panics]
2306    async fn result_set_inline_begin_stream_error_fallback() -> anyhow::Result<()> {
2307        let mut mock = MockSpanner::new();
2308        let mut seq = mockall::Sequence::new();
2309
2310        // 1. Stream yields an error on the first chunk before returning transaction metadata.
2311        // E.g., INVALID_ARGUMENT because the query is malformed.
2312        mock.expect_execute_streaming_sql()
2313            .times(1)
2314            .in_sequence(&mut seq)
2315            .returning(|_request| {
2316                let stream = adapt([Err(Status::invalid_argument("Invalid query"))]);
2317                Ok(Response::from(stream))
2318            });
2319
2320        // 2. The explicit BeginTransaction fallback gets triggered.
2321        mock.expect_begin_transaction()
2322            .times(1)
2323            .in_sequence(&mut seq)
2324            .returning(|_| {
2325                Ok(Response::new(spanner_v1::Transaction {
2326                    id: vec![7, 8, 9],
2327                    read_timestamp: Some(prost_types::Timestamp {
2328                        seconds: 123456789,
2329                        nanos: 0,
2330                    }),
2331                    ..Default::default()
2332                }))
2333            });
2334
2335        // 3. The ResultSet gracefully restarts the stream using the transaction ID returned by BeginTransaction.
2336        mock.expect_execute_streaming_sql()
2337            .times(1)
2338            .in_sequence(&mut seq)
2339            .returning(|req| {
2340                let req = req.into_inner();
2341                // Ensure the explicitly yielded ID is routed into the new stream transaction selector
2342                match req.transaction.unwrap().selector.unwrap() {
2343                    spanner_v1::transaction_selector::Selector::Id(id) => {
2344                        assert_eq!(id, vec![7, 8, 9]);
2345                    }
2346                    _ => panic!("Expected Selector::Id"),
2347                }
2348
2349                let stream = adapt([Ok(PartialResultSet {
2350                    metadata: metadata(1),
2351                    values: vec![string_val("1")],
2352                    ..Default::default()
2353                })]);
2354                Ok(Response::from(stream))
2355            });
2356
2357        mock.expect_create_session().returning(|_| {
2358            Ok(Response::new(Session {
2359                name: "session".to_string(),
2360                multiplexed: true,
2361                ..Default::default()
2362            }))
2363        });
2364
2365        let (address, _server) = start("127.0.0.1:0", mock).await?;
2366
2367        let client: Spanner = Spanner::builder()
2368            .with_endpoint(address)
2369            .with_credentials(Anonymous::new().build())
2370            .build()
2371            .await?;
2372
2373        let db_client = client.database_client("db").build().await?;
2374
2375        let tx = db_client
2376            .read_only_transaction()
2377            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2378            .build()
2379            .await?;
2380        let mut rs = tx.execute_query("SELECT 1").await?;
2381
2382        let row1 = rs.next().await.ok_or_else(|| {
2383            anyhow::anyhow!("Expected row returned successfully despite stream breaking")
2384        })??;
2385        assert_eq!(
2386            row1.raw_values()[0].0,
2387            string_val("1"),
2388            "Verify the returned stream successfully resumed with the correct payload"
2389        );
2390
2391        Ok(())
2392    }
2393
2394    #[tokio_test_no_panics]
2395    async fn result_set_retry_inline_begin_transient_error() -> anyhow::Result<()> {
2396        let mut mock = MockSpanner::new();
2397        let mut seq = mockall::Sequence::new();
2398
2399        // 1. Initial stream throws UNAVAILABLE before metadata.
2400        mock.expect_execute_streaming_sql()
2401            .times(1)
2402            .in_sequence(&mut seq)
2403            .returning(|_request| {
2404                let stream = adapt([Err(Status::unavailable("Transient network issue"))]);
2405                Ok(Response::from(stream))
2406            });
2407
2408        // 2. We retry the stream since it was a transient error.
2409        // The retry should use the same transaction selector as the original request.
2410        mock.expect_execute_streaming_sql()
2411            .times(1)
2412            .in_sequence(&mut seq)
2413            .returning(|req| {
2414                let req = req.into_inner();
2415                match req.transaction.unwrap().selector.unwrap() {
2416                    spanner_v1::transaction_selector::Selector::Begin(_) => {}
2417                    _ => panic!("Expected Selector::Begin on stream retry"),
2418                }
2419
2420                let mut meta = metadata(1).unwrap();
2421                meta.transaction = Some(spanner_v1::Transaction {
2422                    id: vec![7, 8, 9],
2423                    read_timestamp: None,
2424                    ..Default::default()
2425                });
2426
2427                let stream = adapt([Ok(PartialResultSet {
2428                    metadata: Some(meta),
2429                    values: vec![string_val("1")],
2430                    ..Default::default()
2431                })]);
2432                Ok(Response::from(stream))
2433            });
2434
2435        mock.expect_create_session().returning(|_| {
2436            Ok(Response::new(Session {
2437                name: "session".to_string(),
2438                multiplexed: true,
2439                ..Default::default()
2440            }))
2441        });
2442
2443        let (address, _server) = start("127.0.0.1:0", mock).await?;
2444
2445        let client: Spanner = Spanner::builder()
2446            .with_endpoint(address)
2447            .with_credentials(Anonymous::new().build())
2448            .build()
2449            .await?;
2450
2451        let db_client = client.database_client("db").build().await?;
2452
2453        let tx = db_client
2454            .read_only_transaction()
2455            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2456            .build()
2457            .await?;
2458        let mut rs = tx.execute_query("SELECT 1").await?;
2459
2460        let row1 = rs
2461            .next()
2462            .await
2463            .ok_or_else(|| anyhow::anyhow!("Expected stream to recover safely"))??;
2464        assert_eq!(
2465            row1.raw_values()[0].0,
2466            string_val("1"),
2467            "Verify resumed stream returns data"
2468        );
2469
2470        Ok(())
2471    }
2472
2473    #[tokio_test_no_panics]
2474    async fn result_set_retry_inline_begin_id_recovered() -> anyhow::Result<()> {
2475        let mut mock = MockSpanner::new();
2476        let mut seq = mockall::Sequence::new();
2477
2478        // 1. Stream successfully returns metadata chunk then throws UNAVAILABLE on chunk 2.
2479        mock.expect_execute_streaming_sql()
2480            .times(1)
2481            .in_sequence(&mut seq)
2482            .returning(|_request| {
2483                let mut meta = metadata(1).unwrap();
2484                meta.transaction = Some(spanner_v1::Transaction {
2485                    id: vec![7, 8, 9],
2486                    read_timestamp: None,
2487                    ..Default::default()
2488                });
2489                let stream = adapt([
2490                    Ok(PartialResultSet {
2491                        metadata: Some(meta),
2492                        values: vec![string_val("1")],
2493                        resume_token: b"token1".to_vec(),
2494                        ..Default::default()
2495                    }),
2496                    Err(Status::unavailable("Transient mid-stream network issue")),
2497                ]);
2498                Ok(Response::from(stream))
2499            });
2500
2501        // 2. Stream resumes using Selector::Id.
2502        mock.expect_execute_streaming_sql()
2503            .times(1)
2504            .in_sequence(&mut seq)
2505            .returning(|req| {
2506                let req = req.into_inner();
2507                match req.transaction.unwrap().selector.unwrap() {
2508                    spanner_v1::transaction_selector::Selector::Id(id) => {
2509                        assert_eq!(id, vec![7, 8, 9]);
2510                    }
2511                    _ => panic!("Expected Selector::Id on stream retry"),
2512                }
2513
2514                let stream = adapt([Ok(PartialResultSet {
2515                    values: vec![string_val("2")],
2516                    ..Default::default()
2517                })]);
2518                Ok(Response::from(stream))
2519            });
2520
2521        mock.expect_create_session().returning(|_| {
2522            Ok(Response::new(Session {
2523                name: "session".to_string(),
2524                multiplexed: true,
2525                ..Default::default()
2526            }))
2527        });
2528
2529        let (address, _server) = start("127.0.0.1:0", mock).await?;
2530
2531        let client: Spanner = Spanner::builder()
2532            .with_endpoint(address)
2533            .with_credentials(Anonymous::new().build())
2534            .build()
2535            .await?;
2536
2537        let db_client = client.database_client("db").build().await?;
2538
2539        let tx = db_client
2540            .read_only_transaction()
2541            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2542            .build()
2543            .await?;
2544        let mut rs = tx.execute_query("SELECT 1").await?;
2545
2546        let row1 = rs
2547            .next()
2548            .await
2549            .ok_or_else(|| anyhow::anyhow!("Expected stream row1 extracted"))??;
2550        assert_eq!(
2551            row1.raw_values()[0].0,
2552            string_val("1"),
2553            "Verified chunk 1 payload"
2554        );
2555        let row2 = rs
2556            .next()
2557            .await
2558            .ok_or_else(|| anyhow::anyhow!("Expected stream row2 recovered"))??;
2559        assert_eq!(
2560            row2.raw_values()[0].0,
2561            string_val("2"),
2562            "Verified chunk 2 reboot dynamically intercepted ID bounds correctly"
2563        );
2564
2565        Ok(())
2566    }
2567
2568    #[tokio_test_no_panics]
2569    async fn result_set_inline_begin_metadata_missing_transaction_fails() -> anyhow::Result<()> {
2570        let mut mock = MockSpanner::new();
2571        let mut seq = mockall::Sequence::new();
2572
2573        // 1. Initial stream successfully returns metadata chunk but completely lacks the `Transaction` entity.
2574        mock.expect_execute_streaming_sql()
2575            .times(1)
2576            .in_sequence(&mut seq)
2577            .returning(|_request| {
2578                let stream = adapt([Ok(PartialResultSet {
2579                    metadata: metadata(1), // Missing `.transaction` natively
2580                    values: vec![string_val("1")],
2581                    ..Default::default()
2582                })]);
2583                Ok(Response::from(stream))
2584            });
2585
2586        mock.expect_create_session().returning(|_| {
2587            Ok(Response::new(Session {
2588                name: "session".to_string(),
2589                multiplexed: true,
2590                ..Default::default()
2591            }))
2592        });
2593
2594        let (address, _server) = start("127.0.0.1:0", mock).await?;
2595
2596        let client: Spanner = Spanner::builder()
2597            .with_endpoint(address)
2598            .with_credentials(Anonymous::new().build())
2599            .build()
2600            .await?;
2601
2602        let db_client = client.database_client("db").build().await?;
2603
2604        // Use explicitly deferred Lazy begin transaction!
2605        let tx = db_client
2606            .read_only_transaction()
2607            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2608            .build()
2609            .await?;
2610        let err = tx
2611            .execute_query("SELECT 1")
2612            .await
2613            .expect_err("Expected eager validation error");
2614        assert!(
2615            err.to_string()
2616                .contains("failed to return a transaction ID"),
2617            "Caught implicit gap boundary: {}",
2618            err
2619        );
2620
2621        Ok(())
2622    }
2623
2624    #[tokio_test_no_panics]
2625    async fn result_set_stats() -> anyhow::Result<()> {
2626        let mock_stats = spanner_v1::ResultSetStats {
2627            query_plan: Some(spanner_v1::QueryPlan::default()),
2628            ..Default::default()
2629        };
2630
2631        let mut rs = run_mock_query(vec![PartialResultSet {
2632            metadata: metadata(2),
2633            values: vec![string_val("a"), string_val("b")],
2634            last: true,
2635            stats: Some(mock_stats),
2636            ..Default::default()
2637        }])
2638        .await;
2639
2640        rs.next().await.transpose()?;
2641
2642        let received_stats = rs.stats().expect("stats should be available");
2643        assert!(received_stats.query_plan.is_some());
2644
2645        Ok(())
2646    }
2647
2648    #[tokio_test_no_panics]
2649    async fn result_set_update_count() -> anyhow::Result<()> {
2650        let mock_stats = spanner_v1::ResultSetStats {
2651            row_count: Some(RowCount::RowCountExact(42_i64)),
2652            ..Default::default()
2653        };
2654
2655        let mut result_set = run_mock_query(vec![PartialResultSet {
2656            metadata: metadata(2),
2657            values: vec![string_val("a"), string_val("b")],
2658            last: true,
2659            stats: Some(mock_stats),
2660            ..Default::default()
2661        }])
2662        .await;
2663
2664        result_set.next().await.transpose()?;
2665
2666        let update_count = result_set
2667            .update_count()
2668            .expect("Expected update count to be populated");
2669        assert_eq!(update_count, 42, "Expected exactly 42 rows updated");
2670
2671        Ok(())
2672    }
2673
2674    #[tokio_test_no_panics]
2675    async fn result_set_duplicate_stats() -> anyhow::Result<()> {
2676        let mock_stats = spanner_v1::ResultSetStats {
2677            query_plan: Some(spanner_v1::QueryPlan::default()),
2678            ..Default::default()
2679        };
2680
2681        let mut rs = run_mock_query(vec![
2682            PartialResultSet {
2683                metadata: metadata(2),
2684                values: vec![string_val("a"), string_val("b")],
2685                stats: Some(mock_stats.clone()),
2686                resume_token: b"token1".to_vec(),
2687                ..Default::default()
2688            },
2689            PartialResultSet {
2690                values: vec![string_val("c"), string_val("d")],
2691                stats: Some(mock_stats),
2692                last: true,
2693                resume_token: b"token2".to_vec(),
2694                ..Default::default()
2695            },
2696        ])
2697        .await;
2698
2699        // First row should be processed and returned successfully
2700        let next = rs.next().await;
2701        assert!(next.is_some());
2702        assert!(next.expect("should yield a row").is_ok());
2703
2704        // Second call should process the second message and fail due to duplicate stats
2705        let res2 = rs.next().await;
2706        assert!(res2.is_some());
2707        let res2 = res2.expect("should yield an error");
2708        assert!(res2.is_err());
2709        let err_str = res2.expect_err("should be an error").to_string();
2710        assert!(err_str.contains("Additional stats received after first"));
2711
2712        Ok(())
2713    }
2714
2715    #[tokio_test_no_panics]
2716    async fn test_lazy_begin_deadlock_fixed() -> anyhow::Result<()> {
2717        let mut mock = MockSpanner::new();
2718        let mut seq = mockall::Sequence::new();
2719
2720        // Setup mock to return metadata with transaction ID on first query.
2721        mock.expect_execute_streaming_sql()
2722            .times(1)
2723            .in_sequence(&mut seq)
2724            .returning(|_request| {
2725                let mut meta = metadata(1).expect("failed to create metadata");
2726                meta.transaction = Some(spanner_v1::Transaction {
2727                    id: b"lazy_tx_id".to_vec(),
2728                    ..Default::default()
2729                });
2730                let rx = adapt(
2731                    vec![Ok(PartialResultSet {
2732                        metadata: Some(meta),
2733                        values: vec![string_val("1")],
2734                        ..Default::default()
2735                    })]
2736                    .into_iter(),
2737                );
2738                Ok(Response::from(rx))
2739            });
2740
2741        // Mock call for second query which must carry the returned transaction ID
2742        mock.expect_execute_streaming_sql()
2743            .times(1)
2744            .in_sequence(&mut seq)
2745            .returning(|req| {
2746                let req = req.into_inner();
2747                let selector = req
2748                    .transaction
2749                    .expect("missing transaction component")
2750                    .selector
2751                    .expect("missing selector component");
2752
2753                match selector {
2754                    spanner_v1::transaction_selector::Selector::Id(id) => {
2755                        assert_eq!(id, b"lazy_tx_id".to_vec());
2756                    }
2757                    _ => panic!("Expected Selector::Id"),
2758                }
2759
2760                let rx = adapt(
2761                    vec![Ok(PartialResultSet {
2762                        metadata: metadata(1),
2763                        values: vec![string_val("2")],
2764                        ..Default::default()
2765                    })]
2766                    .into_iter(),
2767                );
2768                Ok(Response::from(rx))
2769            });
2770
2771        mock.expect_create_session().returning(|_| {
2772            Ok(Response::new(Session {
2773                name: "session".to_string(),
2774                multiplexed: true,
2775                ..Default::default()
2776            }))
2777        });
2778
2779        let (address, _server) = start("127.0.0.1:0", mock).await?;
2780
2781        let client: Spanner = Spanner::builder()
2782            .with_endpoint(address)
2783            .with_credentials(Anonymous::new().build())
2784            .build()
2785            .await?;
2786
2787        let db_client = client.database_client("db").build().await?;
2788
2789        // Use inline begin transaction
2790        let tx = db_client
2791            .read_only_transaction()
2792            .with_begin_transaction_option(BeginTransactionOption::InlineBegin)
2793            .build()
2794            .await?;
2795
2796        // Execute query but DO NOT call rs.next()
2797        let _rs = tx.execute_query("SELECT 1").await?;
2798
2799        // Execute second query against same transaction
2800        let mut rs2 = tx.execute_query("SELECT 2").await?;
2801
2802        // Assert it does not hang and yielded elements properly
2803        let row2 = rs2.next().await;
2804        assert!(
2805            row2.is_some(),
2806            "Implicit deadlock encountered; query 2 stalled"
2807        );
2808
2809        Ok(())
2810    }
2811
2812    #[tokio_test_no_panics]
2813    async fn test_result_set_metadata_not_available() {
2814        // Test our explicit safeguard for when Spanner violates the API contract and returns a first PartialResultSet without metadata.
2815        let res = run_mock_query_fallible(vec![PartialResultSet {
2816            metadata: None,
2817            values: vec![string_val("1")],
2818            ..Default::default()
2819        }])
2820        .await;
2821
2822        let err = res.expect_err("Expected query initialization to fail eagerly");
2823        assert!(
2824            err.to_string()
2825                .contains("First PartialResultSet did not contain metadata"),
2826            "Expected missing metadata safeguard error, got: {}",
2827            err
2828        );
2829    }
2830
2831    #[tokio_test_no_panics]
2832    async fn test_result_set_metadata_available_before_next() -> anyhow::Result<()> {
2833        let mut mock = MockSpanner::new();
2834
2835        // Setup mock to return metadata in first chunk.
2836        mock.expect_execute_streaming_sql().returning(|_request| {
2837            let rx = adapt(
2838                vec![Ok(PartialResultSet {
2839                    metadata: metadata(1),
2840                    values: vec![string_val("1")],
2841                    ..Default::default()
2842                })]
2843                .into_iter(),
2844            );
2845            Ok(Response::from(rx))
2846        });
2847
2848        mock.expect_create_session().returning(|_| {
2849            Ok(Response::new(Session {
2850                name: "session".to_string(),
2851                multiplexed: true,
2852                ..Default::default()
2853            }))
2854        });
2855
2856        let (address, _server) = start("127.0.0.1:0", mock).await?;
2857
2858        let client: Spanner = Spanner::builder()
2859            .with_endpoint(address)
2860            .with_credentials(Anonymous::new().build())
2861            .build()
2862            .await?;
2863
2864        let db_client = client.database_client("db").build().await?;
2865        let tx = db_client.single_use().build();
2866
2867        let mut rs = tx.execute_query("SELECT 1").await?;
2868
2869        // Call metadata() BEFORE next(). It should succeed immediately.
2870        let metadata = rs.metadata().expect("metadata available");
2871        assert_eq!(metadata.column_names().len(), 1);
2872        assert_eq!(metadata.column_names()[0], "col0");
2873
2874        // Now consume the row
2875        let row = rs.next().await;
2876        assert!(row.is_some());
2877
2878        Ok(())
2879    }
2880}