Skip to main content

gcloud_spanner/
reader.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3
4use prost::Message;
5use prost_types::{value::Kind, Value};
6
7use google_cloud_gax::grpc::{Code, Response, Status, Streaming};
8use google_cloud_googleapis::spanner::v1::struct_type::Field;
9use google_cloud_googleapis::spanner::v1::{
10    ExecuteSqlRequest, PartialResultSet, ReadRequest, ResultSetMetadata, ResultSetStats,
11};
12
13use crate::retry::StreamingRetry;
14use crate::row::Row;
15use crate::session::SessionHandle;
16use crate::transaction::CallOptions;
17
18pub trait Reader: Send + Sync {
19    fn read(
20        &self,
21        session: &mut SessionHandle,
22        option: Option<CallOptions>,
23        disable_route_to_leader: bool,
24    ) -> impl std::future::Future<Output = Result<Response<Streaming<PartialResultSet>>, Status>> + Send;
25
26    fn update_token(&mut self, resume_token: Vec<u8>);
27
28    fn can_resume(&self) -> bool;
29}
30
31pub struct StatementReader {
32    pub enable_resume: bool,
33    pub request: ExecuteSqlRequest,
34}
35
36impl Reader for StatementReader {
37    async fn read(
38        &self,
39        session: &mut SessionHandle,
40        option: Option<CallOptions>,
41        disable_route_to_leader: bool,
42    ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
43        let option = option.unwrap_or_default();
44        let client = &mut session.spanner_client;
45        let result = client
46            .execute_streaming_sql(self.request.clone(), disable_route_to_leader, option.retry)
47            .await;
48        session.invalidate_if_needed(result).await
49    }
50
51    fn update_token(&mut self, resume_token: Vec<u8>) {
52        self.request.resume_token = resume_token;
53    }
54
55    fn can_resume(&self) -> bool {
56        self.enable_resume && !self.request.resume_token.is_empty()
57    }
58}
59
60pub struct TableReader {
61    pub request: ReadRequest,
62}
63
64impl Reader for TableReader {
65    async fn read(
66        &self,
67        session: &mut SessionHandle,
68        option: Option<CallOptions>,
69        disable_route_to_leader: bool,
70    ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
71        let option = option.unwrap_or_default();
72        let client = &mut session.spanner_client;
73        let result = client
74            .streaming_read(self.request.clone(), disable_route_to_leader, option.retry)
75            .await;
76        session.invalidate_if_needed(result).await
77    }
78
79    fn update_token(&mut self, resume_token: Vec<u8>) {
80        self.request.resume_token = resume_token;
81    }
82
83    fn can_resume(&self) -> bool {
84        !self.request.resume_token.is_empty()
85    }
86}
87
88pub struct ResultSet {
89    fields: Arc<Vec<Field>>,
90    index: Arc<HashMap<String, usize>>,
91    rows: VecDeque<Value>,
92    chunked_value: bool,
93}
94
95const DEFAULT_MAX_BYTES_BETWEEN_RESUME_TOKENS: usize = 128 * 1024 * 1024;
96
97#[derive(Debug)]
98struct ResumablePartialResultSetBuffer {
99    pending: VecDeque<PartialResultSet>,
100    last_delivered_token: Vec<u8>,
101    observed_token: Vec<u8>,
102    bytes_between_tokens: usize,
103    max_bytes_between_tokens: usize,
104    unretryable: bool,
105}
106
107impl ResumablePartialResultSetBuffer {
108    fn new(max_bytes_between_tokens: usize) -> Self {
109        Self {
110            pending: VecDeque::new(),
111            last_delivered_token: Vec::new(),
112            observed_token: Vec::new(),
113            bytes_between_tokens: 0,
114            max_bytes_between_tokens,
115            unretryable: false,
116        }
117    }
118
119    fn push(&mut self, result_set: PartialResultSet) {
120        if !result_set.resume_token.is_empty() && result_set.resume_token != self.observed_token {
121            self.observed_token = result_set.resume_token.clone();
122        }
123
124        if !self.unretryable && self.observed_token == self.last_delivered_token {
125            self.bytes_between_tokens = self.bytes_between_tokens.saturating_add(result_set.encoded_len());
126            if self.bytes_between_tokens >= self.max_bytes_between_tokens {
127                self.unretryable = true;
128            }
129        }
130
131        self.pending.push_back(result_set);
132    }
133
134    fn pop_ready(&mut self, end_of_stream: bool) -> Option<PartialResultSet> {
135        if self.pending.is_empty() {
136            return None;
137        }
138
139        if self.unretryable || end_of_stream {
140            return self.pending.pop_front();
141        }
142
143        if self.observed_token != self.last_delivered_token {
144            let result_set = self.pending.pop_front();
145            if let Some(ref rs) = result_set {
146                if !rs.resume_token.is_empty() && rs.resume_token == self.observed_token {
147                    self.last_delivered_token = self.observed_token.clone();
148                    self.bytes_between_tokens = 0;
149                }
150            }
151            return result_set;
152        }
153
154        None
155    }
156
157    fn on_resumption(&mut self) {
158        self.pending.clear();
159        self.observed_token = self.last_delivered_token.clone();
160        self.bytes_between_tokens = 0;
161        self.unretryable = false;
162    }
163}
164
165impl ResultSet {
166    fn next(&mut self) -> Option<Row> {
167        if !self.rows.is_empty() {
168            let column_length = self.fields.len();
169            let target_record_is_chunked = self.rows.len() < column_length;
170            let target_record_contains_chunked_value = self.chunked_value && self.rows.len() == column_length;
171
172            if !target_record_is_chunked && !target_record_contains_chunked_value {
173                // get column_length values
174                let mut values = Vec::with_capacity(column_length);
175                for _ in 0..column_length {
176                    values.push(self.rows.pop_front().unwrap());
177                }
178                return Some(Row::new(Arc::clone(&self.index), Arc::clone(&self.fields), values));
179            }
180        }
181        None
182    }
183
184    /// Merge tries to combine two protobuf Values if possible.
185    fn merge(previous_last: Value, current_first: Value) -> Result<Value, Status> {
186        match previous_last.kind.unwrap() {
187            Kind::StringValue(last) => match current_first.kind.unwrap() {
188                Kind::StringValue(first) => {
189                    tracing::trace!("previous_last={}, current_first={}", &last, first);
190                    Ok(Value {
191                        kind: Some(Kind::StringValue(last + &first)),
192                    })
193                }
194                _ => Err(Status::new(
195                    Code::Internal,
196                    "chunks kind mismatch: current_first must be StringKind",
197                )),
198            },
199            Kind::ListValue(mut last) => match current_first.kind.unwrap() {
200                Kind::ListValue(first) => {
201                    if first.values.is_empty() {
202                        return Ok(Value {
203                            kind: Some(Kind::ListValue(last)),
204                        });
205                    }
206                    if last.values.is_empty() {
207                        return Ok(Value {
208                            kind: Some(Kind::ListValue(first)),
209                        });
210                    }
211                    // Only recurse when the chunk boundary actually splits a
212                    // single value: both the last element of `last` and the
213                    // first element of `first` must be of a chunkable kind
214                    // (StringValue or ListValue). Otherwise the boundary fell
215                    // between complete elements and we just concatenate.
216                    let mut iter = first.values.into_iter();
217                    let first_value_of_current = iter.next().unwrap();
218                    let last_value_of_previous = last.values.pop().unwrap();
219                    let mergeable = matches!(
220                        (&last_value_of_previous.kind, &first_value_of_current.kind),
221                        (Some(Kind::StringValue(_)), Some(Kind::StringValue(_)))
222                            | (Some(Kind::ListValue(_)), Some(Kind::ListValue(_)))
223                    );
224                    if mergeable {
225                        let merged = ResultSet::merge(last_value_of_previous, first_value_of_current)?;
226                        last.values.push(merged);
227                    } else {
228                        last.values.push(last_value_of_previous);
229                        last.values.push(first_value_of_current);
230                    }
231                    last.values.extend(iter);
232                    Ok(Value {
233                        kind: Some(Kind::ListValue(last)),
234                    })
235                }
236                _ => Err(Status::new(
237                    Code::Internal,
238                    "chunks kind mismatch: current_first must be ListValue",
239                )),
240            },
241            _ => Err(Status::new(
242                Code::Internal,
243                "previous_last kind mismatch: only StringValue and ListValue can be chunked",
244            )),
245        }
246    }
247
248    fn add(
249        &mut self,
250        metadata: Option<ResultSetMetadata>,
251        mut values: Vec<Value>,
252        chunked_value: bool,
253    ) -> Result<bool, Status> {
254        // get metadata only once.
255        if self.fields.is_empty() {
256            if let Some(metadata) = metadata {
257                self.fields = metadata
258                    .row_type
259                    .map(|e| Arc::new(e.fields))
260                    .ok_or_else(|| Status::new(Code::Internal, "no field metadata found"))?;
261                // create index for Row::column_by_name("column_name")
262                let mut index = HashMap::new();
263                for (i, f) in self.fields.iter().enumerate() {
264                    index.insert(f.name.clone(), i);
265                }
266                self.index = Arc::new(index);
267            }
268        }
269
270        if self.chunked_value {
271            tracing::trace!("now chunked value found previous={}, current={}", self.rows.len(), values.len());
272            //merge when the chunked value is found.
273            let merged = ResultSet::merge(self.rows.pop_back().unwrap(), values.remove(0))?;
274            self.rows.push_back(merged);
275        }
276        self.rows.extend(values);
277        self.chunked_value = chunked_value;
278        Ok(true)
279    }
280
281    fn is_row_boundary(&self) -> bool {
282        if self.fields.is_empty() {
283            return self.rows.is_empty() && !self.chunked_value;
284        }
285        if self.chunked_value {
286            return false;
287        }
288        let columns = self.fields.len();
289        if columns == 0 {
290            return self.rows.is_empty();
291        }
292        self.rows.len().is_multiple_of(columns)
293    }
294}
295
296pub struct RowIterator<'a, T>
297where
298    T: Reader,
299{
300    streaming: Streaming<PartialResultSet>,
301    session: &'a mut SessionHandle,
302    reader: T,
303    rs: ResultSet,
304    reader_option: Option<CallOptions>,
305    disable_route_to_leader: bool,
306    stats: Option<ResultSetStats>,
307    prs_buffer: ResumablePartialResultSetBuffer,
308    resumable: bool,
309    end_of_stream: bool,
310    stream_retry: StreamingRetry,
311}
312
313impl<'a, T> RowIterator<'a, T>
314where
315    T: Reader,
316{
317    pub(crate) async fn new(
318        session: &'a mut SessionHandle,
319        reader: T,
320        option: Option<CallOptions>,
321        disable_route_to_leader: bool,
322    ) -> Result<RowIterator<'a, T>, Status> {
323        let streaming = reader
324            .read(session, option, disable_route_to_leader)
325            .await?
326            .into_inner();
327        let rs = ResultSet {
328            fields: Arc::new(vec![]),
329            index: Arc::new(HashMap::new()),
330            rows: VecDeque::new(),
331            chunked_value: false,
332        };
333        Ok(Self {
334            streaming,
335            session,
336            reader,
337            rs,
338            reader_option: None,
339            disable_route_to_leader,
340            stats: None,
341            prs_buffer: ResumablePartialResultSetBuffer::new(DEFAULT_MAX_BYTES_BETWEEN_RESUME_TOKENS),
342            resumable: true,
343            end_of_stream: false,
344            stream_retry: StreamingRetry::new(),
345        })
346    }
347
348    pub fn set_call_options(&mut self, option: CallOptions) {
349        self.reader_option = Some(option);
350    }
351
352    async fn try_recv(&mut self, option: Option<CallOptions>) -> Result<bool, Status> {
353        loop {
354            if let Some(result_set) = self.prs_buffer.pop_ready(self.end_of_stream) {
355                let resume_token_present = !result_set.resume_token.is_empty();
356                //if resume_token changes set new resume_token
357                if resume_token_present {
358                    self.reader.update_token(result_set.resume_token.clone());
359                }
360                // Capture stats if present (only sent with the last response)
361                if result_set.stats.is_some() {
362                    self.stats = result_set.stats;
363                }
364                if result_set.values.is_empty() {
365                    // Process metadata even when values are empty (e.g., QueryMode::Plan)
366                    self.rs
367                        .add(result_set.metadata, result_set.values, result_set.chunked_value)?;
368                    return Ok(false);
369                }
370                let added = self
371                    .rs
372                    .add(result_set.metadata, result_set.values, result_set.chunked_value)?;
373                if resume_token_present && !self.rs.is_row_boundary() {
374                    return Err(Status::new(Code::FailedPrecondition, "resume token is not on a row boundary"));
375                }
376                return Ok(added);
377            }
378
379            if self.end_of_stream {
380                return Ok(false);
381            }
382
383            let received = match self.streaming.message().await {
384                Ok(s) => s,
385                Err(e) => {
386                    if !self.reader.can_resume() || !self.resumable {
387                        return Err(e);
388                    }
389                    tracing::debug!("streaming error: {}. resume reading by resume_token", e);
390                    self.stream_retry.next(e).await?;
391                    let call_option = option.clone();
392                    let result = self
393                        .reader
394                        .read(self.session, call_option, self.disable_route_to_leader)
395                        .await?;
396                    self.streaming = result.into_inner();
397                    self.prs_buffer.on_resumption();
398                    continue;
399                }
400            };
401
402            match received {
403                Some(result_set) => {
404                    if result_set.last {
405                        self.end_of_stream = true;
406                    }
407                    self.prs_buffer.push(result_set);
408                    if self.prs_buffer.unretryable {
409                        self.resumable = false;
410                    }
411                }
412                None => {
413                    self.end_of_stream = true;
414                }
415            }
416        }
417    }
418
419    /// Return metadata for all columns
420    pub fn columns_metadata(&self) -> &Arc<Vec<Field>> {
421        &self.rs.fields
422    }
423
424    pub fn column_metadata(&self, column_name: &str) -> Option<(usize, Field)> {
425        for (i, val) in self.rs.fields.iter().enumerate() {
426            if val.name == column_name {
427                return Some((i, val.clone()));
428            }
429        }
430        None
431    }
432
433    /// Returns query execution statistics if available.
434    /// Stats are only available after all rows have been consumed and only when
435    /// the query was executed with a QueryMode that includes stats (Profile, WithStats, or WithPlanAndStats).
436    pub fn stats(&self) -> Option<&ResultSetStats> {
437        self.stats.as_ref()
438    }
439
440    /// next returns the next result.
441    /// Its second return value is None if there are no more results.
442    pub async fn next(&mut self) -> Result<Option<Row>, Status> {
443        loop {
444            let row = self.rs.next();
445            if row.is_some() {
446                return Ok(row);
447            }
448            // no data found or record chunked.
449            if !self.try_recv(self.reader_option.clone()).await? {
450                return Ok(None);
451            }
452        }
453    }
454}
455
456#[cfg(test)]
457mod tests {
458    use std::collections::VecDeque;
459    use std::sync::Arc;
460
461    use prost_types::value::Kind;
462    use prost_types::Value;
463
464    use google_cloud_googleapis::spanner::v1::struct_type::Field;
465    use google_cloud_googleapis::spanner::v1::{PartialResultSet, ResultSetMetadata, StructType};
466
467    use crate::reader::ResultSet;
468    use crate::row::{Row, TryFromValue};
469    use crate::statement::ToKind;
470
471    fn empty_rs() -> ResultSet {
472        ResultSet {
473            fields: Arc::new(vec![]),
474            index: Arc::new(Default::default()),
475            rows: Default::default(),
476            chunked_value: false,
477        }
478    }
479
480    fn field(name: &str) -> Field {
481        Field {
482            name: name.to_string(),
483            r#type: None,
484        }
485    }
486
487    fn value(to_kind: impl ToKind) -> Value {
488        Value {
489            kind: Some(to_kind.to_kind()),
490        }
491    }
492
493    fn prs(values: Vec<Value>, resume_token: &str, chunked_value: bool) -> PartialResultSet {
494        PartialResultSet {
495            metadata: None,
496            values,
497            chunked_value,
498            resume_token: resume_token.as_bytes().to_vec(),
499            stats: None,
500            precommit_token: None,
501            last: false,
502        }
503    }
504
505    fn assert_one_column(rs: &ResultSet) {
506        assert_eq!(rs.fields.len(), 1);
507        assert_eq!(rs.fields[0].name, "column1".to_string());
508        assert_eq!(*rs.index.get("column1").unwrap(), 0);
509    }
510
511    fn assert_multi_column(rs: &ResultSet) {
512        assert_eq!(rs.fields.len(), 2);
513        assert_eq!(rs.fields[0].name, "column1".to_string());
514        assert_eq!(rs.fields[1].name, "column2".to_string());
515        assert_eq!(*rs.index.get("column1").unwrap(), 0);
516        assert_eq!(*rs.index.get("column2").unwrap(), 1);
517    }
518
519    fn assert_some_one_column<T: TryFromValue + std::cmp::PartialEq + std::fmt::Debug>(row: Option<Row>, v: T) {
520        assert!(row.is_some());
521        assert_eq!(v, row.unwrap().column::<T>(0).unwrap());
522    }
523
524    fn assert_some_multi_column<
525        T1: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
526        T2: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
527    >(
528        row: Option<Row>,
529        v1: T1,
530        v2: T2,
531    ) {
532        assert!(row.is_some());
533        let v = row.unwrap();
534        assert_eq!(v1, v.column::<T1>(0).unwrap());
535        assert_eq!(v2, v.column::<T2>(1).unwrap());
536    }
537
538    #[test]
539    fn test_rs_next_empty() {
540        let mut rs = ResultSet {
541            fields: Arc::new(vec![field("column1")]),
542            index: Arc::new(Default::default()),
543            rows: Default::default(),
544            chunked_value: false,
545        };
546        assert!(rs.next().is_none());
547    }
548
549    #[test]
550    fn test_rs_next_record_chunked_or_not() {
551        let rs = |values| ResultSet {
552            fields: Arc::new(vec![field("column1"), field("column2")]),
553            index: Arc::new(Default::default()),
554            rows: VecDeque::from(values),
555            chunked_value: false,
556        };
557        let mut rs1 = rs(vec![value("value1")]);
558        assert!(rs1.next().is_none());
559        let mut rs2 = rs(vec![value("value1"), value("value2")]);
560        assert_eq!(rs2.next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
561    }
562
563    #[test]
564    fn test_rs_next_value_chunked_or_not() {
565        let rs = |chunked_value| ResultSet {
566            fields: Arc::new(vec![field("column1"), field("column2")]),
567            index: Arc::new(Default::default()),
568            rows: VecDeque::from(vec![value("value1"), value("value2")]),
569            chunked_value,
570        };
571        assert!(rs(true).next().is_none());
572        assert_eq!(rs(false).next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
573    }
574
575    #[test]
576    fn test_rs_next_plural_record_one_column() {
577        let rs = |chunked_value| ResultSet {
578            fields: Arc::new(vec![field("column1")]),
579            index: Arc::new(Default::default()),
580            rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
581            chunked_value,
582        };
583        let mut incomplete = rs(true);
584        assert!(incomplete.next().is_some());
585        assert!(incomplete.next().is_some());
586        assert!(incomplete.next().is_none());
587        let mut complete = rs(false);
588        assert!(complete.next().is_some());
589        assert!(complete.next().is_some());
590        assert!(complete.next().is_some());
591        assert!(complete.next().is_none());
592    }
593
594    #[test]
595    fn test_rs_next_plural_record_multi_column() {
596        let rs = |chunked_value| ResultSet {
597            fields: Arc::new(vec![field("column1"), field("column2")]),
598            index: Arc::new(Default::default()),
599            rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
600            chunked_value,
601        };
602        let mut incomplete = rs(true);
603        assert_eq!(incomplete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
604        assert!(incomplete.next().is_none());
605        let mut complete = rs(false);
606        assert_eq!(complete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
607        assert!(incomplete.next().is_none());
608    }
609
610    #[test]
611    fn test_rs_merge_string_value() {
612        let result = ResultSet::merge(value("val"), value("ue1"));
613        assert!(result.is_ok());
614        let kind = result.unwrap().kind.unwrap();
615        match kind {
616            Kind::StringValue(v) => assert_eq!(v, "value1".to_string()),
617            _ => unreachable!("must be string value"),
618        }
619    }
620
621    #[test]
622    fn test_rs_merge_list_value() {
623        let previous_last = value(vec!["value1-1", "value1-2", "val"]);
624        let current_first = value(vec!["ue1-3", "value2-1", "valu"]);
625        let result = ResultSet::merge(previous_last, current_first);
626        assert!(result.is_ok());
627        let kind = result.unwrap().kind.unwrap();
628        match kind {
629            Kind::ListValue(v) => {
630                assert_eq!(v.values.len(), 5);
631                match v.values[0].kind.as_ref().unwrap() {
632                    Kind::StringValue(v) => assert_eq!(*v, "value1-1".to_string()),
633                    _ => unreachable!("must be string value"),
634                };
635                match v.values[1].kind.as_ref().unwrap() {
636                    Kind::StringValue(v) => assert_eq!(*v, "value1-2".to_string()),
637                    _ => unreachable!("must be string value"),
638                };
639                match v.values[2].kind.as_ref().unwrap() {
640                    Kind::StringValue(v) => assert_eq!(*v, "value1-3".to_string()),
641                    _ => unreachable!("must be string value"),
642                };
643                match v.values[3].kind.as_ref().unwrap() {
644                    Kind::StringValue(v) => assert_eq!(*v, "value2-1".to_string()),
645                    _ => unreachable!("must be string value"),
646                }
647                match v.values[4].kind.as_ref().unwrap() {
648                    Kind::StringValue(v) => assert_eq!(*v, "valu".to_string()),
649                    _ => unreachable!("must be string value"),
650                }
651            }
652            _ => unreachable!("must be string value"),
653        }
654    }
655
656    // Regression: chunk boundary falls between complete inner elements where
657    // the last element of the previous list is not a chunkable kind (here
658    // NullValue). Previously this returned `Internal: previous_last kind
659    // mismatch ...`. After the fix it must concatenate without merging.
660    #[test]
661    fn test_rs_merge_list_value_with_non_mergeable_tail() {
662        let null = || Value {
663            kind: Some(Kind::NullValue(prost_types::NullValue::NullValue.into())),
664        };
665        let bool_v = |b: bool| Value {
666            kind: Some(Kind::BoolValue(b)),
667        };
668        let num_v = |n: f64| Value {
669            kind: Some(Kind::NumberValue(n)),
670        };
671
672        // previous tail = NullValue, source first = StringValue: must NOT merge.
673        let previous_last = Value {
674            kind: Some(Kind::ListValue(prost_types::ListValue {
675                values: vec![value("a"), null()],
676            })),
677        };
678        let current_first = Value {
679            kind: Some(Kind::ListValue(prost_types::ListValue {
680                values: vec![value("b"), bool_v(true)],
681            })),
682        };
683        let merged = ResultSet::merge(previous_last, current_first).expect("must concatenate");
684        match merged.kind.unwrap() {
685            Kind::ListValue(v) => {
686                assert_eq!(v.values.len(), 4);
687                assert!(matches!(v.values[0].kind, Some(Kind::StringValue(ref s)) if s == "a"));
688                assert!(matches!(v.values[1].kind, Some(Kind::NullValue(_))));
689                assert!(matches!(v.values[2].kind, Some(Kind::StringValue(ref s)) if s == "b"));
690                assert!(matches!(v.values[3].kind, Some(Kind::BoolValue(true))));
691            }
692            _ => unreachable!("must be list value"),
693        }
694
695        // previous tail = BoolValue, source first = NumberValue: still ok.
696        let previous_last = Value {
697            kind: Some(Kind::ListValue(prost_types::ListValue {
698                values: vec![bool_v(false)],
699            })),
700        };
701        let current_first = Value {
702            kind: Some(Kind::ListValue(prost_types::ListValue {
703                values: vec![num_v(1.5), value("tail")],
704            })),
705        };
706        let merged = ResultSet::merge(previous_last, current_first).expect("must concatenate");
707        match merged.kind.unwrap() {
708            Kind::ListValue(v) => {
709                assert_eq!(v.values.len(), 3);
710                assert!(matches!(v.values[0].kind, Some(Kind::BoolValue(false))));
711                assert!(matches!(v.values[1].kind, Some(Kind::NumberValue(n)) if n == 1.5));
712                assert!(matches!(v.values[2].kind, Some(Kind::StringValue(ref s)) if s == "tail"));
713            }
714            _ => unreachable!("must be list value"),
715        }
716    }
717
718    // Empty list edge cases — a chunk may carry no inner values.
719    #[test]
720    fn test_rs_merge_list_value_empty_sides() {
721        let empty_list = || Value {
722            kind: Some(Kind::ListValue(prost_types::ListValue { values: vec![] })),
723        };
724        let with_one = || Value {
725            kind: Some(Kind::ListValue(prost_types::ListValue {
726                values: vec![value("only")],
727            })),
728        };
729
730        // previous empty + current with one => result is current.
731        let merged = ResultSet::merge(empty_list(), with_one()).unwrap();
732        match merged.kind.unwrap() {
733            Kind::ListValue(v) => {
734                assert_eq!(v.values.len(), 1);
735                assert!(matches!(v.values[0].kind, Some(Kind::StringValue(ref s)) if s == "only"));
736            }
737            _ => unreachable!(),
738        }
739
740        // previous with one + current empty => result is previous.
741        let merged = ResultSet::merge(with_one(), empty_list()).unwrap();
742        match merged.kind.unwrap() {
743            Kind::ListValue(v) => {
744                assert_eq!(v.values.len(), 1);
745                assert!(matches!(v.values[0].kind, Some(Kind::StringValue(ref s)) if s == "only"));
746            }
747            _ => unreachable!(),
748        }
749    }
750
751    #[test]
752    fn test_rs_add_one_column_no_chunked_value() {
753        let mut rs = empty_rs();
754        let metadata = Some(ResultSetMetadata {
755            row_type: Some(StructType {
756                fields: vec![field("column1")],
757            }),
758            transaction: None,
759            undeclared_parameters: None,
760        });
761        let values = vec![value("value1"), value("value2"), value("value3")];
762        assert!(rs.add(metadata, values, false).unwrap());
763        assert_eq!(rs.rows.len(), 3);
764        assert_one_column(&rs);
765        assert!(!rs.chunked_value);
766
767        assert_some_one_column(rs.next(), "value1".to_string());
768        assert_some_one_column(rs.next(), "value2".to_string());
769        assert_some_one_column(rs.next(), "value3".to_string());
770        assert!(rs.next().is_none());
771    }
772
773    #[test]
774    fn test_rs_add_multi_column_no_chunked_value() {
775        let mut rs = empty_rs();
776        let metadata = Some(ResultSetMetadata {
777            row_type: Some(StructType {
778                fields: vec![field("column1"), field("column2")],
779            }),
780            transaction: None,
781            undeclared_parameters: None,
782        });
783        let values = vec![value("value1"), value("value2"), value("value3")];
784        assert!(rs.add(metadata, values, false).unwrap());
785        assert_eq!(rs.rows.len(), 3);
786        assert_multi_column(&rs);
787        assert!(!rs.chunked_value);
788
789        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
790        assert!(rs.next().is_none());
791    }
792
793    #[test]
794    fn test_rs_add_multi_column_no_chunked_value_just() {
795        let mut rs = empty_rs();
796        let metadata = Some(ResultSetMetadata {
797            row_type: Some(StructType {
798                fields: vec![field("column1"), field("column2")],
799            }),
800            transaction: None,
801            undeclared_parameters: None,
802        });
803        let values = vec![value("value1"), value("value2"), value("value3"), value("value4")];
804        assert!(rs.add(metadata, values, false).unwrap());
805        assert_eq!(rs.rows.len(), 4);
806        assert_multi_column(&rs);
807        assert!(!rs.chunked_value);
808
809        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
810        assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
811        assert!(rs.next().is_none());
812    }
813
814    #[test]
815    fn test_rs_add_one_column_chunked_value() {
816        let mut rs = empty_rs();
817        let metadata = Some(ResultSetMetadata {
818            row_type: Some(StructType {
819                fields: vec![field("column1")],
820            }),
821            transaction: None,
822            undeclared_parameters: None,
823        });
824        let values = vec![value("value1"), value("value2"), value("val")];
825        assert!(rs.add(metadata.clone(), values, true).unwrap());
826        assert_eq!(rs.rows.len(), 3);
827        assert_one_column(&rs);
828        assert!(rs.chunked_value);
829
830        assert_some_one_column(rs.next(), "value1".to_string());
831        assert_some_one_column(rs.next(), "value2".to_string());
832        assert!(rs.next().is_none());
833
834        // add next stream data
835        assert!(rs.add(metadata, vec![value("ue3")], false).unwrap());
836        assert!(!rs.chunked_value);
837        assert_eq!(rs.rows.len(), 1);
838        assert_some_one_column(rs.next(), "value3".to_string());
839        assert!(rs.next().is_none());
840    }
841
842    #[test]
843    fn test_rs_add_multi_column_chunked_value() {
844        let mut rs = empty_rs();
845        let metadata = Some(ResultSetMetadata {
846            row_type: Some(StructType {
847                fields: vec![field("column1"), field("column2")],
848            }),
849            transaction: None,
850            undeclared_parameters: None,
851        });
852        let values = vec![value("value1"), value("value2"), value("val")];
853        assert!(rs.add(metadata.clone(), values, true).unwrap());
854        assert_eq!(rs.rows.len(), 3);
855        assert_multi_column(&rs);
856        assert!(rs.chunked_value);
857
858        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
859        assert!(rs.next().is_none());
860
861        // add next stream data
862        assert!(rs.add(metadata.clone(), vec![value("ue3")], false).unwrap());
863        assert!(!rs.chunked_value);
864        assert_eq!(rs.rows.len(), 1);
865        assert!(rs.next().is_none());
866
867        // add next stream data
868        assert!(rs.add(metadata, vec![value("value4")], false).unwrap());
869        assert!(!rs.chunked_value);
870        assert_eq!(rs.rows.len(), 2);
871        assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
872    }
873
874    #[test]
875    fn test_rs_add_multi_column_no_chunked_value_list_value() {
876        let mut rs = empty_rs();
877        let metadata = Some(ResultSetMetadata {
878            row_type: Some(StructType {
879                fields: vec![field("column1"), field("column2")],
880            }),
881            transaction: None,
882            undeclared_parameters: None,
883        });
884        let values = vec![value(vec!["value1-1", "value1-2"])];
885        assert!(rs.add(metadata.clone(), values, false).unwrap());
886        assert_eq!(rs.rows.len(), 1);
887        assert_multi_column(&rs);
888        assert!(!rs.chunked_value);
889        assert!(rs.next().is_none());
890        assert!(rs.add(metadata, vec![value(vec!["value2-1"])], false).unwrap());
891        assert!(!rs.chunked_value);
892        assert_eq!(rs.rows.len(), 2);
893        assert_some_multi_column(
894            rs.next(),
895            vec!["value1-1".to_string(), "value1-2".to_string()],
896            vec!["value2-1".to_string()],
897        );
898        assert!(rs.next().is_none());
899    }
900
901    #[test]
902    fn test_rs_add_multi_column_chunked_value_list_value() {
903        let mut rs = empty_rs();
904        let metadata = Some(ResultSetMetadata {
905            row_type: Some(StructType {
906                fields: vec![field("column1"), field("column2")],
907            }),
908            transaction: None,
909            undeclared_parameters: None,
910        });
911        let values = vec![value(vec!["value1-1", "value1-2"]), value(vec!["value2-"])];
912        assert!(rs.add(metadata.clone(), values, true).unwrap());
913        assert_eq!(rs.rows.len(), 2);
914        assert_multi_column(&rs);
915        assert!(rs.chunked_value);
916        assert!(rs.next().is_none());
917
918        // add next stream data
919        assert!(rs.add(metadata.clone(), vec![value(vec!["1", "valu"])], true).unwrap());
920        assert!(rs.chunked_value);
921        assert_eq!(rs.rows.len(), 2);
922        assert!(rs.next().is_none());
923
924        // add next stream data
925        assert!(rs.add(metadata, vec![value(vec!["e2-2"])], false).unwrap());
926        assert!(!rs.chunked_value);
927        assert_eq!(rs.rows.len(), 2);
928        assert_some_multi_column(
929            rs.next(),
930            vec!["value1-1".to_string(), "value1-2".to_string()],
931            vec!["value2-1".to_string(), "value2-2".to_string()],
932        );
933        assert!(rs.next().is_none());
934    }
935
936    #[test]
937    fn test_rs_add_multi_column_chunked_value_list_and_string_value() {
938        let mut rs = empty_rs();
939        let metadata = Some(ResultSetMetadata {
940            row_type: Some(StructType {
941                fields: vec![field("column1"), field("column2")],
942            }),
943            transaction: None,
944            undeclared_parameters: None,
945        });
946        let values = vec![value(vec!["value1-1", "value1-2"]), value("va")];
947        assert!(rs.add(metadata.clone(), values, true).unwrap());
948        assert_eq!(rs.rows.len(), 2);
949        assert_multi_column(&rs);
950        assert!(rs.chunked_value);
951        assert!(rs.next().is_none());
952
953        // add next stream data
954        assert!(rs
955            .add(metadata.clone(), vec![value("lueA"), value(vec!["valu"])], true)
956            .unwrap());
957        assert!(rs.chunked_value);
958        assert_eq!(rs.rows.len(), 3);
959        assert_some_multi_column(
960            rs.next(),
961            vec!["value1-1".to_string(), "value1-2".to_string()],
962            "valueA".to_string(),
963        );
964        assert!(rs.next().is_none());
965
966        // add next stream data
967        assert!(rs
968            .add(metadata.clone(), vec![value(vec!["e2-1", "value2-2"])], false)
969            .unwrap());
970        assert!(!rs.chunked_value);
971        assert_eq!(rs.rows.len(), 1);
972        assert!(rs.next().is_none());
973
974        // add next stream data
975        assert!(rs.add(metadata.clone(), vec![value("value")], true).unwrap());
976        assert!(rs.chunked_value);
977        assert_eq!(rs.rows.len(), 2);
978        assert!(rs.next().is_none());
979
980        // add next stream data
981        assert!(rs.add(metadata, vec![value("B")], false).unwrap());
982        assert!(!rs.chunked_value);
983        assert_eq!(rs.rows.len(), 2);
984        assert_some_multi_column(
985            rs.next(),
986            vec!["value2-1".to_string(), "value2-2".to_string()],
987            "valueB".to_string(),
988        );
989        assert!(rs.next().is_none());
990    }
991
992    #[test]
993    fn test_prs_buffer_waits_for_resume_token() {
994        let mut buffer = super::ResumablePartialResultSetBuffer::new(1024);
995        buffer.push(prs(vec![value("value-1")], "", false));
996        assert!(buffer.pop_ready(false).is_none());
997
998        buffer.push(prs(vec![value("value-2")], "token-1", false));
999        assert!(buffer.pop_ready(false).is_some());
1000        assert!(buffer.pop_ready(false).is_some());
1001        assert!(buffer.pop_ready(false).is_none());
1002    }
1003
1004    #[test]
1005    fn test_prs_buffer_flushes_on_end_of_stream() {
1006        let mut buffer = super::ResumablePartialResultSetBuffer::new(1024);
1007        buffer.push(prs(vec![value("value-1")], "", false));
1008        assert!(buffer.pop_ready(false).is_none());
1009        assert!(buffer.pop_ready(true).is_some());
1010        assert!(buffer.pop_ready(true).is_none());
1011    }
1012
1013    #[test]
1014    fn test_prs_buffer_becomes_unretryable_after_limit() {
1015        let mut buffer = super::ResumablePartialResultSetBuffer::new(1);
1016        buffer.push(prs(vec![value("value-1")], "", false));
1017        assert!(buffer.unretryable);
1018        assert!(buffer.pop_ready(false).is_some());
1019    }
1020
1021    #[test]
1022    fn test_prs_buffer_on_resumption_discards_pending() {
1023        let mut buffer = super::ResumablePartialResultSetBuffer::new(1024);
1024        buffer.push(prs(vec![value("value-1")], "", false));
1025        buffer.on_resumption();
1026        buffer.push(prs(vec![value("value-2")], "token-1", false));
1027        assert!(buffer.pop_ready(false).is_some());
1028        assert!(buffer.pop_ready(false).is_none());
1029    }
1030
1031    #[test]
1032    fn test_rs_is_row_boundary_empty() {
1033        let rs = empty_rs();
1034        assert!(rs.is_row_boundary());
1035    }
1036
1037    #[test]
1038    fn test_rs_is_row_boundary_chunked() {
1039        let rs = ResultSet {
1040            fields: Arc::new(vec![field("column1")]),
1041            index: Arc::new(Default::default()),
1042            rows: VecDeque::from(vec![value("value1")]),
1043            chunked_value: true,
1044        };
1045        assert!(!rs.is_row_boundary());
1046    }
1047
1048    #[test]
1049    fn test_rs_is_row_boundary_multiple_columns() {
1050        let rs_complete = ResultSet {
1051            fields: Arc::new(vec![field("column1"), field("column2")]),
1052            index: Arc::new(Default::default()),
1053            rows: VecDeque::from(vec![value("value1"), value("value2")]),
1054            chunked_value: false,
1055        };
1056        assert!(rs_complete.is_row_boundary());
1057
1058        let rs_partial = ResultSet {
1059            fields: Arc::new(vec![field("column1"), field("column2")]),
1060            index: Arc::new(Default::default()),
1061            rows: VecDeque::from(vec![value("value1")]),
1062            chunked_value: false,
1063        };
1064        assert!(!rs_partial.is_row_boundary());
1065    }
1066}