Skip to main content

gcloud_spanner/
reader.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3
4use prost::Message;
5use prost_types::{value::Kind, Value};
6
7use google_cloud_gax::grpc::{Code, Response, Status, Streaming};
8use google_cloud_googleapis::spanner::v1::struct_type::Field;
9use google_cloud_googleapis::spanner::v1::{
10    ExecuteSqlRequest, PartialResultSet, ReadRequest, ResultSetMetadata, ResultSetStats,
11};
12
13use crate::retry::StreamingRetry;
14use crate::row::Row;
15use crate::session::SessionHandle;
16use crate::transaction::CallOptions;
17
18pub trait Reader: Send + Sync {
19    fn read(
20        &self,
21        session: &mut SessionHandle,
22        option: Option<CallOptions>,
23        disable_route_to_leader: bool,
24    ) -> impl std::future::Future<Output = Result<Response<Streaming<PartialResultSet>>, Status>> + Send;
25
26    fn update_token(&mut self, resume_token: Vec<u8>);
27
28    fn can_resume(&self) -> bool;
29}
30
31pub struct StatementReader {
32    pub enable_resume: bool,
33    pub request: ExecuteSqlRequest,
34}
35
36impl Reader for StatementReader {
37    async fn read(
38        &self,
39        session: &mut SessionHandle,
40        option: Option<CallOptions>,
41        disable_route_to_leader: bool,
42    ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
43        let option = option.unwrap_or_default();
44        let client = &mut session.spanner_client;
45        let result = client
46            .execute_streaming_sql(self.request.clone(), disable_route_to_leader, option.retry)
47            .await;
48        session.invalidate_if_needed(result).await
49    }
50
51    fn update_token(&mut self, resume_token: Vec<u8>) {
52        self.request.resume_token = resume_token;
53    }
54
55    fn can_resume(&self) -> bool {
56        self.enable_resume && !self.request.resume_token.is_empty()
57    }
58}
59
60pub struct TableReader {
61    pub request: ReadRequest,
62}
63
64impl Reader for TableReader {
65    async fn read(
66        &self,
67        session: &mut SessionHandle,
68        option: Option<CallOptions>,
69        disable_route_to_leader: bool,
70    ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
71        let option = option.unwrap_or_default();
72        let client = &mut session.spanner_client;
73        let result = client
74            .streaming_read(self.request.clone(), disable_route_to_leader, option.retry)
75            .await;
76        session.invalidate_if_needed(result).await
77    }
78
79    fn update_token(&mut self, resume_token: Vec<u8>) {
80        self.request.resume_token = resume_token;
81    }
82
83    fn can_resume(&self) -> bool {
84        !self.request.resume_token.is_empty()
85    }
86}
87
88pub struct ResultSet {
89    fields: Arc<Vec<Field>>,
90    index: Arc<HashMap<String, usize>>,
91    rows: VecDeque<Value>,
92    chunked_value: bool,
93}
94
/// Default buffering limit between two resume tokens: 128 MiB.
const DEFAULT_MAX_BYTES_BETWEEN_RESUME_TOKENS: usize = 128 << 20;
96
97#[derive(Debug)]
98struct ResumablePartialResultSetBuffer {
99    pending: VecDeque<PartialResultSet>,
100    last_delivered_token: Vec<u8>,
101    observed_token: Vec<u8>,
102    bytes_between_tokens: usize,
103    max_bytes_between_tokens: usize,
104    unretryable: bool,
105}
106
107impl ResumablePartialResultSetBuffer {
108    fn new(max_bytes_between_tokens: usize) -> Self {
109        Self {
110            pending: VecDeque::new(),
111            last_delivered_token: Vec::new(),
112            observed_token: Vec::new(),
113            bytes_between_tokens: 0,
114            max_bytes_between_tokens,
115            unretryable: false,
116        }
117    }
118
119    fn push(&mut self, result_set: PartialResultSet) {
120        if !result_set.resume_token.is_empty() && result_set.resume_token != self.observed_token {
121            self.observed_token = result_set.resume_token.clone();
122        }
123
124        if !self.unretryable && self.observed_token == self.last_delivered_token {
125            self.bytes_between_tokens = self.bytes_between_tokens.saturating_add(result_set.encoded_len());
126            if self.bytes_between_tokens >= self.max_bytes_between_tokens {
127                self.unretryable = true;
128            }
129        }
130
131        self.pending.push_back(result_set);
132    }
133
134    fn pop_ready(&mut self, end_of_stream: bool) -> Option<PartialResultSet> {
135        if self.pending.is_empty() {
136            return None;
137        }
138
139        if self.unretryable || end_of_stream {
140            return self.pending.pop_front();
141        }
142
143        if self.observed_token != self.last_delivered_token {
144            let result_set = self.pending.pop_front();
145            if let Some(ref rs) = result_set {
146                if !rs.resume_token.is_empty() && rs.resume_token == self.observed_token {
147                    self.last_delivered_token = self.observed_token.clone();
148                    self.bytes_between_tokens = 0;
149                }
150            }
151            return result_set;
152        }
153
154        None
155    }
156
157    fn on_resumption(&mut self) {
158        self.pending.clear();
159        self.observed_token = self.last_delivered_token.clone();
160        self.bytes_between_tokens = 0;
161        self.unretryable = false;
162    }
163}
164
165impl ResultSet {
166    fn next(&mut self) -> Option<Row> {
167        if !self.rows.is_empty() {
168            let column_length = self.fields.len();
169            let target_record_is_chunked = self.rows.len() < column_length;
170            let target_record_contains_chunked_value = self.chunked_value && self.rows.len() == column_length;
171
172            if !target_record_is_chunked && !target_record_contains_chunked_value {
173                // get column_length values
174                let mut values = Vec::with_capacity(column_length);
175                for _ in 0..column_length {
176                    values.push(self.rows.pop_front().unwrap());
177                }
178                return Some(Row::new(Arc::clone(&self.index), Arc::clone(&self.fields), values));
179            }
180        }
181        None
182    }
183
184    /// Merge tries to combine two protobuf Values if possible.
185    fn merge(previous_last: Value, current_first: Value) -> Result<Value, Status> {
186        match previous_last.kind.unwrap() {
187            Kind::StringValue(last) => match current_first.kind.unwrap() {
188                Kind::StringValue(first) => {
189                    tracing::trace!("previous_last={}, current_first={}", &last, first);
190                    Ok(Value {
191                        kind: Some(Kind::StringValue(last + &first)),
192                    })
193                }
194                _ => Err(Status::new(
195                    Code::Internal,
196                    "chunks kind mismatch: current_first must be StringKind",
197                )),
198            },
199            Kind::ListValue(mut last) => match current_first.kind.unwrap() {
200                Kind::ListValue(mut first) => {
201                    let first_value_of_current = first.values.remove(0);
202                    let merged = match last.values.pop() {
203                        Some(last_value_of_previous) => {
204                            ResultSet::merge(last_value_of_previous, first_value_of_current)?
205                        }
206                        // last record can be empty
207                        None => first_value_of_current,
208                    };
209                    last.values.push(merged);
210                    last.values.extend(first.values);
211                    Ok(Value {
212                        kind: Some(Kind::ListValue(last)),
213                    })
214                }
215                _ => Err(Status::new(
216                    Code::Internal,
217                    "chunks kind mismatch: current_first must be ListValue",
218                )),
219            },
220            _ => Err(Status::new(
221                Code::Internal,
222                "previous_last kind mismatch: only StringValue and ListValue can be chunked",
223            )),
224        }
225    }
226
227    fn add(
228        &mut self,
229        metadata: Option<ResultSetMetadata>,
230        mut values: Vec<Value>,
231        chunked_value: bool,
232    ) -> Result<bool, Status> {
233        // get metadata only once.
234        if self.fields.is_empty() {
235            if let Some(metadata) = metadata {
236                self.fields = metadata
237                    .row_type
238                    .map(|e| Arc::new(e.fields))
239                    .ok_or_else(|| Status::new(Code::Internal, "no field metadata found"))?;
240                // create index for Row::column_by_name("column_name")
241                let mut index = HashMap::new();
242                for (i, f) in self.fields.iter().enumerate() {
243                    index.insert(f.name.clone(), i);
244                }
245                self.index = Arc::new(index);
246            }
247        }
248
249        if self.chunked_value {
250            tracing::trace!("now chunked value found previous={}, current={}", self.rows.len(), values.len());
251            //merge when the chunked value is found.
252            let merged = ResultSet::merge(self.rows.pop_back().unwrap(), values.remove(0))?;
253            self.rows.push_back(merged);
254        }
255        self.rows.extend(values);
256        self.chunked_value = chunked_value;
257        Ok(true)
258    }
259
260    fn is_row_boundary(&self) -> bool {
261        if self.fields.is_empty() {
262            return self.rows.is_empty() && !self.chunked_value;
263        }
264        if self.chunked_value {
265            return false;
266        }
267        let columns = self.fields.len();
268        if columns == 0 {
269            return self.rows.is_empty();
270        }
271        self.rows.len().is_multiple_of(columns)
272    }
273}
274
275pub struct RowIterator<'a, T>
276where
277    T: Reader,
278{
279    streaming: Streaming<PartialResultSet>,
280    session: &'a mut SessionHandle,
281    reader: T,
282    rs: ResultSet,
283    reader_option: Option<CallOptions>,
284    disable_route_to_leader: bool,
285    stats: Option<ResultSetStats>,
286    prs_buffer: ResumablePartialResultSetBuffer,
287    resumable: bool,
288    end_of_stream: bool,
289    stream_retry: StreamingRetry,
290}
291
292impl<'a, T> RowIterator<'a, T>
293where
294    T: Reader,
295{
296    pub(crate) async fn new(
297        session: &'a mut SessionHandle,
298        reader: T,
299        option: Option<CallOptions>,
300        disable_route_to_leader: bool,
301    ) -> Result<RowIterator<'a, T>, Status> {
302        let streaming = reader
303            .read(session, option, disable_route_to_leader)
304            .await?
305            .into_inner();
306        let rs = ResultSet {
307            fields: Arc::new(vec![]),
308            index: Arc::new(HashMap::new()),
309            rows: VecDeque::new(),
310            chunked_value: false,
311        };
312        Ok(Self {
313            streaming,
314            session,
315            reader,
316            rs,
317            reader_option: None,
318            disable_route_to_leader,
319            stats: None,
320            prs_buffer: ResumablePartialResultSetBuffer::new(DEFAULT_MAX_BYTES_BETWEEN_RESUME_TOKENS),
321            resumable: true,
322            end_of_stream: false,
323            stream_retry: StreamingRetry::new(),
324        })
325    }
326
327    pub fn set_call_options(&mut self, option: CallOptions) {
328        self.reader_option = Some(option);
329    }
330
331    async fn try_recv(&mut self, option: Option<CallOptions>) -> Result<bool, Status> {
332        loop {
333            if let Some(result_set) = self.prs_buffer.pop_ready(self.end_of_stream) {
334                let resume_token_present = !result_set.resume_token.is_empty();
335                //if resume_token changes set new resume_token
336                if resume_token_present {
337                    self.reader.update_token(result_set.resume_token.clone());
338                }
339                // Capture stats if present (only sent with the last response)
340                if result_set.stats.is_some() {
341                    self.stats = result_set.stats;
342                }
343                if result_set.values.is_empty() {
344                    // Process metadata even when values are empty (e.g., QueryMode::Plan)
345                    self.rs
346                        .add(result_set.metadata, result_set.values, result_set.chunked_value)?;
347                    return Ok(false);
348                }
349                let added = self
350                    .rs
351                    .add(result_set.metadata, result_set.values, result_set.chunked_value)?;
352                if resume_token_present && !self.rs.is_row_boundary() {
353                    return Err(Status::new(Code::FailedPrecondition, "resume token is not on a row boundary"));
354                }
355                return Ok(added);
356            }
357
358            if self.end_of_stream {
359                return Ok(false);
360            }
361
362            let received = match self.streaming.message().await {
363                Ok(s) => s,
364                Err(e) => {
365                    if !self.reader.can_resume() || !self.resumable {
366                        return Err(e);
367                    }
368                    tracing::debug!("streaming error: {}. resume reading by resume_token", e);
369                    self.stream_retry.next(e).await?;
370                    let call_option = option.clone();
371                    let result = self
372                        .reader
373                        .read(self.session, call_option, self.disable_route_to_leader)
374                        .await?;
375                    self.streaming = result.into_inner();
376                    self.prs_buffer.on_resumption();
377                    continue;
378                }
379            };
380
381            match received {
382                Some(result_set) => {
383                    if result_set.last {
384                        self.end_of_stream = true;
385                    }
386                    self.prs_buffer.push(result_set);
387                    if self.prs_buffer.unretryable {
388                        self.resumable = false;
389                    }
390                }
391                None => {
392                    self.end_of_stream = true;
393                }
394            }
395        }
396    }
397
398    /// Return metadata for all columns
399    pub fn columns_metadata(&self) -> &Arc<Vec<Field>> {
400        &self.rs.fields
401    }
402
403    pub fn column_metadata(&self, column_name: &str) -> Option<(usize, Field)> {
404        for (i, val) in self.rs.fields.iter().enumerate() {
405            if val.name == column_name {
406                return Some((i, val.clone()));
407            }
408        }
409        None
410    }
411
412    /// Returns query execution statistics if available.
413    /// Stats are only available after all rows have been consumed and only when
414    /// the query was executed with a QueryMode that includes stats (Profile, WithStats, or WithPlanAndStats).
415    pub fn stats(&self) -> Option<&ResultSetStats> {
416        self.stats.as_ref()
417    }
418
419    /// next returns the next result.
420    /// Its second return value is None if there are no more results.
421    pub async fn next(&mut self) -> Result<Option<Row>, Status> {
422        loop {
423            let row = self.rs.next();
424            if row.is_some() {
425                return Ok(row);
426            }
427            // no data found or record chunked.
428            if !self.try_recv(self.reader_option.clone()).await? {
429                return Ok(None);
430            }
431        }
432    }
433}
434
435#[cfg(test)]
436mod tests {
437    use std::collections::VecDeque;
438    use std::sync::Arc;
439
440    use prost_types::value::Kind;
441    use prost_types::Value;
442
443    use google_cloud_googleapis::spanner::v1::struct_type::Field;
444    use google_cloud_googleapis::spanner::v1::{PartialResultSet, ResultSetMetadata, StructType};
445
446    use crate::reader::ResultSet;
447    use crate::row::{Row, TryFromValue};
448    use crate::statement::ToKind;
449
450    fn empty_rs() -> ResultSet {
451        ResultSet {
452            fields: Arc::new(vec![]),
453            index: Arc::new(Default::default()),
454            rows: Default::default(),
455            chunked_value: false,
456        }
457    }
458
459    fn field(name: &str) -> Field {
460        Field {
461            name: name.to_string(),
462            r#type: None,
463        }
464    }
465
466    fn value(to_kind: impl ToKind) -> Value {
467        Value {
468            kind: Some(to_kind.to_kind()),
469        }
470    }
471
472    fn prs(values: Vec<Value>, resume_token: &str, chunked_value: bool) -> PartialResultSet {
473        PartialResultSet {
474            metadata: None,
475            values,
476            chunked_value,
477            resume_token: resume_token.as_bytes().to_vec(),
478            stats: None,
479            precommit_token: None,
480            last: false,
481        }
482    }
483
484    fn assert_one_column(rs: &ResultSet) {
485        assert_eq!(rs.fields.len(), 1);
486        assert_eq!(rs.fields[0].name, "column1".to_string());
487        assert_eq!(*rs.index.get("column1").unwrap(), 0);
488    }
489
490    fn assert_multi_column(rs: &ResultSet) {
491        assert_eq!(rs.fields.len(), 2);
492        assert_eq!(rs.fields[0].name, "column1".to_string());
493        assert_eq!(rs.fields[1].name, "column2".to_string());
494        assert_eq!(*rs.index.get("column1").unwrap(), 0);
495        assert_eq!(*rs.index.get("column2").unwrap(), 1);
496    }
497
498    fn assert_some_one_column<T: TryFromValue + std::cmp::PartialEq + std::fmt::Debug>(row: Option<Row>, v: T) {
499        assert!(row.is_some());
500        assert_eq!(v, row.unwrap().column::<T>(0).unwrap());
501    }
502
503    fn assert_some_multi_column<
504        T1: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
505        T2: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
506    >(
507        row: Option<Row>,
508        v1: T1,
509        v2: T2,
510    ) {
511        assert!(row.is_some());
512        let v = row.unwrap();
513        assert_eq!(v1, v.column::<T1>(0).unwrap());
514        assert_eq!(v2, v.column::<T2>(1).unwrap());
515    }
516
517    #[test]
518    fn test_rs_next_empty() {
519        let mut rs = ResultSet {
520            fields: Arc::new(vec![field("column1")]),
521            index: Arc::new(Default::default()),
522            rows: Default::default(),
523            chunked_value: false,
524        };
525        assert!(rs.next().is_none());
526    }
527
528    #[test]
529    fn test_rs_next_record_chunked_or_not() {
530        let rs = |values| ResultSet {
531            fields: Arc::new(vec![field("column1"), field("column2")]),
532            index: Arc::new(Default::default()),
533            rows: VecDeque::from(values),
534            chunked_value: false,
535        };
536        let mut rs1 = rs(vec![value("value1")]);
537        assert!(rs1.next().is_none());
538        let mut rs2 = rs(vec![value("value1"), value("value2")]);
539        assert_eq!(rs2.next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
540    }
541
542    #[test]
543    fn test_rs_next_value_chunked_or_not() {
544        let rs = |chunked_value| ResultSet {
545            fields: Arc::new(vec![field("column1"), field("column2")]),
546            index: Arc::new(Default::default()),
547            rows: VecDeque::from(vec![value("value1"), value("value2")]),
548            chunked_value,
549        };
550        assert!(rs(true).next().is_none());
551        assert_eq!(rs(false).next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
552    }
553
554    #[test]
555    fn test_rs_next_plural_record_one_column() {
556        let rs = |chunked_value| ResultSet {
557            fields: Arc::new(vec![field("column1")]),
558            index: Arc::new(Default::default()),
559            rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
560            chunked_value,
561        };
562        let mut incomplete = rs(true);
563        assert!(incomplete.next().is_some());
564        assert!(incomplete.next().is_some());
565        assert!(incomplete.next().is_none());
566        let mut complete = rs(false);
567        assert!(complete.next().is_some());
568        assert!(complete.next().is_some());
569        assert!(complete.next().is_some());
570        assert!(complete.next().is_none());
571    }
572
573    #[test]
574    fn test_rs_next_plural_record_multi_column() {
575        let rs = |chunked_value| ResultSet {
576            fields: Arc::new(vec![field("column1"), field("column2")]),
577            index: Arc::new(Default::default()),
578            rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
579            chunked_value,
580        };
581        let mut incomplete = rs(true);
582        assert_eq!(incomplete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
583        assert!(incomplete.next().is_none());
584        let mut complete = rs(false);
585        assert_eq!(complete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
586        assert!(incomplete.next().is_none());
587    }
588
589    #[test]
590    fn test_rs_merge_string_value() {
591        let result = ResultSet::merge(value("val"), value("ue1"));
592        assert!(result.is_ok());
593        let kind = result.unwrap().kind.unwrap();
594        match kind {
595            Kind::StringValue(v) => assert_eq!(v, "value1".to_string()),
596            _ => unreachable!("must be string value"),
597        }
598    }
599
600    #[test]
601    fn test_rs_merge_list_value() {
602        let previous_last = value(vec!["value1-1", "value1-2", "val"]);
603        let current_first = value(vec!["ue1-3", "value2-1", "valu"]);
604        let result = ResultSet::merge(previous_last, current_first);
605        assert!(result.is_ok());
606        let kind = result.unwrap().kind.unwrap();
607        match kind {
608            Kind::ListValue(v) => {
609                assert_eq!(v.values.len(), 5);
610                match v.values[0].kind.as_ref().unwrap() {
611                    Kind::StringValue(v) => assert_eq!(*v, "value1-1".to_string()),
612                    _ => unreachable!("must be string value"),
613                };
614                match v.values[1].kind.as_ref().unwrap() {
615                    Kind::StringValue(v) => assert_eq!(*v, "value1-2".to_string()),
616                    _ => unreachable!("must be string value"),
617                };
618                match v.values[2].kind.as_ref().unwrap() {
619                    Kind::StringValue(v) => assert_eq!(*v, "value1-3".to_string()),
620                    _ => unreachable!("must be string value"),
621                };
622                match v.values[3].kind.as_ref().unwrap() {
623                    Kind::StringValue(v) => assert_eq!(*v, "value2-1".to_string()),
624                    _ => unreachable!("must be string value"),
625                }
626                match v.values[4].kind.as_ref().unwrap() {
627                    Kind::StringValue(v) => assert_eq!(*v, "valu".to_string()),
628                    _ => unreachable!("must be string value"),
629                }
630            }
631            _ => unreachable!("must be string value"),
632        }
633    }
634
635    #[test]
636    fn test_rs_add_one_column_no_chunked_value() {
637        let mut rs = empty_rs();
638        let metadata = Some(ResultSetMetadata {
639            row_type: Some(StructType {
640                fields: vec![field("column1")],
641            }),
642            transaction: None,
643            undeclared_parameters: None,
644        });
645        let values = vec![value("value1"), value("value2"), value("value3")];
646        assert!(rs.add(metadata, values, false).unwrap());
647        assert_eq!(rs.rows.len(), 3);
648        assert_one_column(&rs);
649        assert!(!rs.chunked_value);
650
651        assert_some_one_column(rs.next(), "value1".to_string());
652        assert_some_one_column(rs.next(), "value2".to_string());
653        assert_some_one_column(rs.next(), "value3".to_string());
654        assert!(rs.next().is_none());
655    }
656
657    #[test]
658    fn test_rs_add_multi_column_no_chunked_value() {
659        let mut rs = empty_rs();
660        let metadata = Some(ResultSetMetadata {
661            row_type: Some(StructType {
662                fields: vec![field("column1"), field("column2")],
663            }),
664            transaction: None,
665            undeclared_parameters: None,
666        });
667        let values = vec![value("value1"), value("value2"), value("value3")];
668        assert!(rs.add(metadata, values, false).unwrap());
669        assert_eq!(rs.rows.len(), 3);
670        assert_multi_column(&rs);
671        assert!(!rs.chunked_value);
672
673        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
674        assert!(rs.next().is_none());
675    }
676
677    #[test]
678    fn test_rs_add_multi_column_no_chunked_value_just() {
679        let mut rs = empty_rs();
680        let metadata = Some(ResultSetMetadata {
681            row_type: Some(StructType {
682                fields: vec![field("column1"), field("column2")],
683            }),
684            transaction: None,
685            undeclared_parameters: None,
686        });
687        let values = vec![value("value1"), value("value2"), value("value3"), value("value4")];
688        assert!(rs.add(metadata, values, false).unwrap());
689        assert_eq!(rs.rows.len(), 4);
690        assert_multi_column(&rs);
691        assert!(!rs.chunked_value);
692
693        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
694        assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
695        assert!(rs.next().is_none());
696    }
697
698    #[test]
699    fn test_rs_add_one_column_chunked_value() {
700        let mut rs = empty_rs();
701        let metadata = Some(ResultSetMetadata {
702            row_type: Some(StructType {
703                fields: vec![field("column1")],
704            }),
705            transaction: None,
706            undeclared_parameters: None,
707        });
708        let values = vec![value("value1"), value("value2"), value("val")];
709        assert!(rs.add(metadata.clone(), values, true).unwrap());
710        assert_eq!(rs.rows.len(), 3);
711        assert_one_column(&rs);
712        assert!(rs.chunked_value);
713
714        assert_some_one_column(rs.next(), "value1".to_string());
715        assert_some_one_column(rs.next(), "value2".to_string());
716        assert!(rs.next().is_none());
717
718        // add next stream data
719        assert!(rs.add(metadata, vec![value("ue3")], false).unwrap());
720        assert!(!rs.chunked_value);
721        assert_eq!(rs.rows.len(), 1);
722        assert_some_one_column(rs.next(), "value3".to_string());
723        assert!(rs.next().is_none());
724    }
725
726    #[test]
727    fn test_rs_add_multi_column_chunked_value() {
728        let mut rs = empty_rs();
729        let metadata = Some(ResultSetMetadata {
730            row_type: Some(StructType {
731                fields: vec![field("column1"), field("column2")],
732            }),
733            transaction: None,
734            undeclared_parameters: None,
735        });
736        let values = vec![value("value1"), value("value2"), value("val")];
737        assert!(rs.add(metadata.clone(), values, true).unwrap());
738        assert_eq!(rs.rows.len(), 3);
739        assert_multi_column(&rs);
740        assert!(rs.chunked_value);
741
742        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
743        assert!(rs.next().is_none());
744
745        // add next stream data
746        assert!(rs.add(metadata.clone(), vec![value("ue3")], false).unwrap());
747        assert!(!rs.chunked_value);
748        assert_eq!(rs.rows.len(), 1);
749        assert!(rs.next().is_none());
750
751        // add next stream data
752        assert!(rs.add(metadata, vec![value("value4")], false).unwrap());
753        assert!(!rs.chunked_value);
754        assert_eq!(rs.rows.len(), 2);
755        assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
756    }
757
758    #[test]
759    fn test_rs_add_multi_column_no_chunked_value_list_value() {
760        let mut rs = empty_rs();
761        let metadata = Some(ResultSetMetadata {
762            row_type: Some(StructType {
763                fields: vec![field("column1"), field("column2")],
764            }),
765            transaction: None,
766            undeclared_parameters: None,
767        });
768        let values = vec![value(vec!["value1-1", "value1-2"])];
769        assert!(rs.add(metadata.clone(), values, false).unwrap());
770        assert_eq!(rs.rows.len(), 1);
771        assert_multi_column(&rs);
772        assert!(!rs.chunked_value);
773        assert!(rs.next().is_none());
774        assert!(rs.add(metadata, vec![value(vec!["value2-1"])], false).unwrap());
775        assert!(!rs.chunked_value);
776        assert_eq!(rs.rows.len(), 2);
777        assert_some_multi_column(
778            rs.next(),
779            vec!["value1-1".to_string(), "value1-2".to_string()],
780            vec!["value2-1".to_string()],
781        );
782        assert!(rs.next().is_none());
783    }
784
785    #[test]
786    fn test_rs_add_multi_column_chunked_value_list_value() {
787        let mut rs = empty_rs();
788        let metadata = Some(ResultSetMetadata {
789            row_type: Some(StructType {
790                fields: vec![field("column1"), field("column2")],
791            }),
792            transaction: None,
793            undeclared_parameters: None,
794        });
795        let values = vec![value(vec!["value1-1", "value1-2"]), value(vec!["value2-"])];
796        assert!(rs.add(metadata.clone(), values, true).unwrap());
797        assert_eq!(rs.rows.len(), 2);
798        assert_multi_column(&rs);
799        assert!(rs.chunked_value);
800        assert!(rs.next().is_none());
801
802        // add next stream data
803        assert!(rs.add(metadata.clone(), vec![value(vec!["1", "valu"])], true).unwrap());
804        assert!(rs.chunked_value);
805        assert_eq!(rs.rows.len(), 2);
806        assert!(rs.next().is_none());
807
808        // add next stream data
809        assert!(rs.add(metadata, vec![value(vec!["e2-2"])], false).unwrap());
810        assert!(!rs.chunked_value);
811        assert_eq!(rs.rows.len(), 2);
812        assert_some_multi_column(
813            rs.next(),
814            vec!["value1-1".to_string(), "value1-2".to_string()],
815            vec!["value2-1".to_string(), "value2-2".to_string()],
816        );
817        assert!(rs.next().is_none());
818    }
819
820    #[test]
821    fn test_rs_add_multi_column_chunked_value_list_and_string_value() {
822        let mut rs = empty_rs();
823        let metadata = Some(ResultSetMetadata {
824            row_type: Some(StructType {
825                fields: vec![field("column1"), field("column2")],
826            }),
827            transaction: None,
828            undeclared_parameters: None,
829        });
830        let values = vec![value(vec!["value1-1", "value1-2"]), value("va")];
831        assert!(rs.add(metadata.clone(), values, true).unwrap());
832        assert_eq!(rs.rows.len(), 2);
833        assert_multi_column(&rs);
834        assert!(rs.chunked_value);
835        assert!(rs.next().is_none());
836
837        // add next stream data
838        assert!(rs
839            .add(metadata.clone(), vec![value("lueA"), value(vec!["valu"])], true)
840            .unwrap());
841        assert!(rs.chunked_value);
842        assert_eq!(rs.rows.len(), 3);
843        assert_some_multi_column(
844            rs.next(),
845            vec!["value1-1".to_string(), "value1-2".to_string()],
846            "valueA".to_string(),
847        );
848        assert!(rs.next().is_none());
849
850        // add next stream data
851        assert!(rs
852            .add(metadata.clone(), vec![value(vec!["e2-1", "value2-2"])], false)
853            .unwrap());
854        assert!(!rs.chunked_value);
855        assert_eq!(rs.rows.len(), 1);
856        assert!(rs.next().is_none());
857
858        // add next stream data
859        assert!(rs.add(metadata.clone(), vec![value("value")], true).unwrap());
860        assert!(rs.chunked_value);
861        assert_eq!(rs.rows.len(), 2);
862        assert!(rs.next().is_none());
863
864        // add next stream data
865        assert!(rs.add(metadata, vec![value("B")], false).unwrap());
866        assert!(!rs.chunked_value);
867        assert_eq!(rs.rows.len(), 2);
868        assert_some_multi_column(
869            rs.next(),
870            vec!["value2-1".to_string(), "value2-2".to_string()],
871            "valueB".to_string(),
872        );
873        assert!(rs.next().is_none());
874    }
875
876    #[test]
877    fn test_prs_buffer_waits_for_resume_token() {
878        let mut buffer = super::ResumablePartialResultSetBuffer::new(1024);
879        buffer.push(prs(vec![value("value-1")], "", false));
880        assert!(buffer.pop_ready(false).is_none());
881
882        buffer.push(prs(vec![value("value-2")], "token-1", false));
883        assert!(buffer.pop_ready(false).is_some());
884        assert!(buffer.pop_ready(false).is_some());
885        assert!(buffer.pop_ready(false).is_none());
886    }
887
888    #[test]
889    fn test_prs_buffer_flushes_on_end_of_stream() {
890        let mut buffer = super::ResumablePartialResultSetBuffer::new(1024);
891        buffer.push(prs(vec![value("value-1")], "", false));
892        assert!(buffer.pop_ready(false).is_none());
893        assert!(buffer.pop_ready(true).is_some());
894        assert!(buffer.pop_ready(true).is_none());
895    }
896
897    #[test]
898    fn test_prs_buffer_becomes_unretryable_after_limit() {
899        let mut buffer = super::ResumablePartialResultSetBuffer::new(1);
900        buffer.push(prs(vec![value("value-1")], "", false));
901        assert!(buffer.unretryable);
902        assert!(buffer.pop_ready(false).is_some());
903    }
904
905    #[test]
906    fn test_prs_buffer_on_resumption_discards_pending() {
907        let mut buffer = super::ResumablePartialResultSetBuffer::new(1024);
908        buffer.push(prs(vec![value("value-1")], "", false));
909        buffer.on_resumption();
910        buffer.push(prs(vec![value("value-2")], "token-1", false));
911        assert!(buffer.pop_ready(false).is_some());
912        assert!(buffer.pop_ready(false).is_none());
913    }
914
915    #[test]
916    fn test_rs_is_row_boundary_empty() {
917        let rs = empty_rs();
918        assert!(rs.is_row_boundary());
919    }
920
921    #[test]
922    fn test_rs_is_row_boundary_chunked() {
923        let rs = ResultSet {
924            fields: Arc::new(vec![field("column1")]),
925            index: Arc::new(Default::default()),
926            rows: VecDeque::from(vec![value("value1")]),
927            chunked_value: true,
928        };
929        assert!(!rs.is_row_boundary());
930    }
931
932    #[test]
933    fn test_rs_is_row_boundary_multiple_columns() {
934        let rs_complete = ResultSet {
935            fields: Arc::new(vec![field("column1"), field("column2")]),
936            index: Arc::new(Default::default()),
937            rows: VecDeque::from(vec![value("value1"), value("value2")]),
938            chunked_value: false,
939        };
940        assert!(rs_complete.is_row_boundary());
941
942        let rs_partial = ResultSet {
943            fields: Arc::new(vec![field("column1"), field("column2")]),
944            index: Arc::new(Default::default()),
945            rows: VecDeque::from(vec![value("value1")]),
946            chunked_value: false,
947        };
948        assert!(!rs_partial.is_row_boundary());
949    }
950}