Skip to main content

gcloud_spanner/
reader.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3
4use prost::Message;
5use prost_types::{value::Kind, Value};
6
7use google_cloud_gax::grpc::{Code, Response, Status, Streaming};
8use google_cloud_googleapis::spanner::v1::struct_type::Field;
9use google_cloud_googleapis::spanner::v1::{
10    ExecuteSqlRequest, PartialResultSet, ReadRequest, ResultSetMetadata, ResultSetStats,
11};
12
13use crate::retry::StreamingRetry;
14use crate::row::Row;
15use crate::session::SessionHandle;
16use crate::transaction::CallOptions;
17
/// A streaming Spanner read that can be issued (and re-issued) against a session.
///
/// `RowIterator` drives an implementor: it calls `read` to open the stream,
/// `update_token` as resume tokens are delivered, and `can_resume` to decide
/// whether a stream error may be answered by calling `read` again.
pub trait Reader: Send + Sync {
    /// Opens the streaming call on `session` and returns the raw gRPC response.
    ///
    /// `option` carries the optional per-call settings and
    /// `disable_route_to_leader` is forwarded to the underlying client call.
    fn read(
        &self,
        session: &mut SessionHandle,
        option: Option<CallOptions>,
        disable_route_to_leader: bool,
    ) -> impl std::future::Future<Output = Result<Response<Streaming<PartialResultSet>>, Status>> + Send;

    /// Records the latest resume token; the implementations in this file store
    /// it in the request that the next `read` sends.
    fn update_token(&mut self, resume_token: Vec<u8>);

    /// Reports whether an interrupted stream may be restarted with `read`.
    fn can_resume(&self) -> bool;
}
30
/// [`Reader`] that streams the result of an `ExecuteSqlRequest`.
pub struct StatementReader {
    /// When `false`, `can_resume` always answers `false`, so a broken stream is
    /// never restarted even if a resume token is known.
    pub enable_resume: bool,
    /// Request cloned and sent by every `read`; `update_token` overwrites its
    /// `resume_token`.
    pub request: ExecuteSqlRequest,
}
35
36impl Reader for StatementReader {
37    async fn read(
38        &self,
39        session: &mut SessionHandle,
40        option: Option<CallOptions>,
41        disable_route_to_leader: bool,
42    ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
43        let option = option.unwrap_or_default();
44        let client = &mut session.spanner_client;
45        let result = client
46            .execute_streaming_sql(self.request.clone(), disable_route_to_leader, option.retry)
47            .await;
48        session.invalidate_if_needed(result).await
49    }
50
51    fn update_token(&mut self, resume_token: Vec<u8>) {
52        self.request.resume_token = resume_token;
53    }
54
55    fn can_resume(&self) -> bool {
56        self.enable_resume && !self.request.resume_token.is_empty()
57    }
58}
59
/// [`Reader`] that streams the result of a `ReadRequest`.
pub struct TableReader {
    /// Request cloned and sent by every `read`; `update_token` overwrites its
    /// `resume_token`.
    pub request: ReadRequest,
}
63
64impl Reader for TableReader {
65    async fn read(
66        &self,
67        session: &mut SessionHandle,
68        option: Option<CallOptions>,
69        disable_route_to_leader: bool,
70    ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
71        let option = option.unwrap_or_default();
72        let client = &mut session.spanner_client;
73        let result = client
74            .streaming_read(self.request.clone(), disable_route_to_leader, option.retry)
75            .await;
76        session.invalidate_if_needed(result).await
77    }
78
79    fn update_token(&mut self, resume_token: Vec<u8>) {
80        self.request.resume_token = resume_token;
81    }
82
83    fn can_resume(&self) -> bool {
84        !self.request.resume_token.is_empty()
85    }
86}
87
/// Collects the values of streamed `PartialResultSet`s and cuts them into rows.
pub struct ResultSet {
    /// Column metadata, filled from the first `ResultSetMetadata` passed to
    /// `add`; empty until then.
    fields: Arc<Vec<Field>>,
    /// Column name to position in `fields`, shared with every produced `Row`.
    index: Arc<HashMap<String, usize>>,
    /// Flat queue of received values that have not been handed out as rows yet.
    rows: VecDeque<Value>,
    /// `true` when the last value in `rows` is a fragment that must be merged
    /// with the first value of the next batch.
    chunked_value: bool,
}
94
/// Buffering limit (128 MiB) handed to `ResumablePartialResultSetBuffer::new`:
/// once this many encoded bytes accumulate without a new resume token, the
/// buffer is marked unretryable.
const DEFAULT_MAX_BYTES_BETWEEN_RESUME_TOKENS: usize = 128 * 1024 * 1024;
96
/// Holds back `PartialResultSet`s until a newer resume token has been observed,
/// so that everything released so far is covered by a token.
#[derive(Debug)]
struct ResumablePartialResultSetBuffer {
    /// Messages received but not yet released by `pop_ready`.
    pending: VecDeque<PartialResultSet>,
    /// Token of the most recent token-carrying message released by `pop_ready`
    /// outside flush mode.
    last_delivered_token: Vec<u8>,
    /// Most recent non-empty token seen by `push`.
    observed_token: Vec<u8>,
    /// Encoded size accumulated while no token newer than
    /// `last_delivered_token` has been observed.
    bytes_between_tokens: usize,
    /// Threshold for `bytes_between_tokens` at which buffering is given up.
    max_bytes_between_tokens: usize,
    /// Set once the threshold is reached; `pop_ready` then releases messages
    /// immediately.
    unretryable: bool,
}
106
107impl ResumablePartialResultSetBuffer {
108    fn new(max_bytes_between_tokens: usize) -> Self {
109        Self {
110            pending: VecDeque::new(),
111            last_delivered_token: Vec::new(),
112            observed_token: Vec::new(),
113            bytes_between_tokens: 0,
114            max_bytes_between_tokens,
115            unretryable: false,
116        }
117    }
118
119    fn push(&mut self, result_set: PartialResultSet) {
120        if !result_set.resume_token.is_empty() && result_set.resume_token != self.observed_token {
121            self.observed_token = result_set.resume_token.clone();
122        }
123
124        if !self.unretryable && self.observed_token == self.last_delivered_token {
125            self.bytes_between_tokens = self.bytes_between_tokens.saturating_add(result_set.encoded_len());
126            if self.bytes_between_tokens >= self.max_bytes_between_tokens {
127                self.unretryable = true;
128            }
129        }
130
131        self.pending.push_back(result_set);
132    }
133
134    fn pop_ready(&mut self, end_of_stream: bool) -> Option<PartialResultSet> {
135        if self.pending.is_empty() {
136            return None;
137        }
138
139        if self.unretryable || end_of_stream {
140            return self.pending.pop_front();
141        }
142
143        if self.observed_token != self.last_delivered_token {
144            let result_set = self.pending.pop_front();
145            if let Some(ref rs) = result_set {
146                if !rs.resume_token.is_empty() && rs.resume_token == self.observed_token {
147                    self.last_delivered_token = self.observed_token.clone();
148                    self.bytes_between_tokens = 0;
149                }
150            }
151            return result_set;
152        }
153
154        None
155    }
156
157    fn on_resumption(&mut self) {
158        self.pending.clear();
159        self.observed_token = self.last_delivered_token.clone();
160        self.bytes_between_tokens = 0;
161        self.unretryable = false;
162    }
163}
164
165impl ResultSet {
166    fn next(&mut self) -> Option<Row> {
167        if !self.rows.is_empty() {
168            let column_length = self.fields.len();
169            let target_record_is_chunked = self.rows.len() < column_length;
170            let target_record_contains_chunked_value = self.chunked_value && self.rows.len() == column_length;
171
172            if !target_record_is_chunked && !target_record_contains_chunked_value {
173                // get column_length values
174                let mut values = Vec::with_capacity(column_length);
175                for _ in 0..column_length {
176                    values.push(self.rows.pop_front().unwrap());
177                }
178                return Some(Row::new(Arc::clone(&self.index), Arc::clone(&self.fields), values));
179            }
180        }
181        None
182    }
183
184    /// Merge tries to combine two protobuf Values if possible.
185    fn merge(previous_last: Value, current_first: Value) -> Result<Value, Status> {
186        match previous_last.kind.unwrap() {
187            Kind::StringValue(last) => match current_first.kind.unwrap() {
188                Kind::StringValue(first) => {
189                    tracing::trace!("previous_last={}, current_first={}", &last, first);
190                    Ok(Value {
191                        kind: Some(Kind::StringValue(last + &first)),
192                    })
193                }
194                _ => Err(Status::new(
195                    Code::Internal,
196                    "chunks kind mismatch: current_first must be StringKind",
197                )),
198            },
199            Kind::ListValue(mut last) => match current_first.kind.unwrap() {
200                Kind::ListValue(mut first) => {
201                    let first_value_of_current = first.values.remove(0);
202                    let merged = match last.values.pop() {
203                        Some(last_value_of_previous) => {
204                            ResultSet::merge(last_value_of_previous, first_value_of_current)?
205                        }
206                        // last record can be empty
207                        None => first_value_of_current,
208                    };
209                    last.values.push(merged);
210                    last.values.extend(first.values);
211                    Ok(Value {
212                        kind: Some(Kind::ListValue(last)),
213                    })
214                }
215                _ => Err(Status::new(
216                    Code::Internal,
217                    "chunks kind mismatch: current_first must be ListValue",
218                )),
219            },
220            _ => Err(Status::new(
221                Code::Internal,
222                "previous_last kind mismatch: only StringValue and ListValue can be chunked",
223            )),
224        }
225    }
226
227    fn add(
228        &mut self,
229        metadata: Option<ResultSetMetadata>,
230        mut values: Vec<Value>,
231        chunked_value: bool,
232    ) -> Result<bool, Status> {
233        // get metadata only once.
234        if self.fields.is_empty() {
235            if let Some(metadata) = metadata {
236                self.fields = metadata
237                    .row_type
238                    .map(|e| Arc::new(e.fields))
239                    .ok_or_else(|| Status::new(Code::Internal, "no field metadata found"))?;
240                // create index for Row::column_by_name("column_name")
241                let mut index = HashMap::new();
242                for (i, f) in self.fields.iter().enumerate() {
243                    index.insert(f.name.clone(), i);
244                }
245                self.index = Arc::new(index);
246            }
247        }
248
249        if self.chunked_value {
250            tracing::trace!("now chunked value found previous={}, current={}", self.rows.len(), values.len());
251            //merge when the chunked value is found.
252            let merged = ResultSet::merge(self.rows.pop_back().unwrap(), values.remove(0))?;
253            self.rows.push_back(merged);
254        }
255        self.rows.extend(values);
256        self.chunked_value = chunked_value;
257        Ok(true)
258    }
259
260    fn is_row_boundary(&self) -> bool {
261        if self.fields.is_empty() {
262            return self.rows.is_empty() && !self.chunked_value;
263        }
264        if self.chunked_value {
265            return false;
266        }
267        let columns = self.fields.len();
268        if columns == 0 {
269            return self.rows.is_empty();
270        }
271        self.rows.len().is_multiple_of(columns)
272    }
273}
274
/// Asynchronous row cursor over a streaming read, able to re-open the stream
/// from the last delivered resume token after a stream error.
pub struct RowIterator<'a, T>
where
    T: Reader,
{
    /// Currently open response stream.
    streaming: Streaming<PartialResultSet>,
    /// Session on which reads are issued and re-issued.
    session: &'a mut SessionHandle,
    /// Issues the read and keeps the resume token.
    reader: T,
    /// Assembles received values into rows.
    rs: ResultSet,
    /// Call options used when fetching more data; `None` until
    /// `set_call_options` is called.
    reader_option: Option<CallOptions>,
    /// Forwarded unchanged to `Reader::read`.
    disable_route_to_leader: bool,
    /// Most recent statistics received on the stream, if any.
    stats: Option<ResultSetStats>,
    /// Holds messages back until they are covered by a resume token.
    prs_buffer: ResumablePartialResultSetBuffer,
    /// Cleared once the buffer becomes unretryable; stream errors are then
    /// returned instead of triggering a re-read.
    resumable: bool,
    /// Set when the stream yields no further message or a message flagged `last`.
    end_of_stream: bool,
    /// Consulted with the stream error before each re-read.
    stream_retry: StreamingRetry,
}
291
292impl<'a, T> RowIterator<'a, T>
293where
294    T: Reader,
295{
296    pub(crate) async fn new(
297        session: &'a mut SessionHandle,
298        reader: T,
299        option: Option<CallOptions>,
300        disable_route_to_leader: bool,
301    ) -> Result<RowIterator<'a, T>, Status> {
302        let streaming = reader
303            .read(session, option, disable_route_to_leader)
304            .await?
305            .into_inner();
306        let rs = ResultSet {
307            fields: Arc::new(vec![]),
308            index: Arc::new(HashMap::new()),
309            rows: VecDeque::new(),
310            chunked_value: false,
311        };
312        Ok(Self {
313            streaming,
314            session,
315            reader,
316            rs,
317            reader_option: None,
318            disable_route_to_leader,
319            stats: None,
320            prs_buffer: ResumablePartialResultSetBuffer::new(DEFAULT_MAX_BYTES_BETWEEN_RESUME_TOKENS),
321            resumable: true,
322            end_of_stream: false,
323            stream_retry: StreamingRetry::new(),
324        })
325    }
326
327    pub fn set_call_options(&mut self, option: CallOptions) {
328        self.reader_option = Some(option);
329    }
330
331    async fn try_recv(&mut self, option: Option<CallOptions>) -> Result<bool, Status> {
332        loop {
333            if let Some(result_set) = self.prs_buffer.pop_ready(self.end_of_stream) {
334                if result_set.values.is_empty() {
335                    return Ok(false);
336                }
337                let resume_token_present = !result_set.resume_token.is_empty();
338                //if resume_token changes set new resume_token
339                if resume_token_present {
340                    self.reader.update_token(result_set.resume_token.clone());
341                }
342                // Capture stats if present (only sent with the last response)
343                if result_set.stats.is_some() {
344                    self.stats = result_set.stats;
345                }
346                let added = self
347                    .rs
348                    .add(result_set.metadata, result_set.values, result_set.chunked_value)?;
349                if resume_token_present && !self.rs.is_row_boundary() {
350                    return Err(Status::new(Code::FailedPrecondition, "resume token is not on a row boundary"));
351                }
352                return Ok(added);
353            }
354
355            if self.end_of_stream {
356                return Ok(false);
357            }
358
359            let received = match self.streaming.message().await {
360                Ok(s) => s,
361                Err(e) => {
362                    if !self.reader.can_resume() || !self.resumable {
363                        return Err(e);
364                    }
365                    tracing::debug!("streaming error: {}. resume reading by resume_token", e);
366                    self.stream_retry.next(e).await?;
367                    let call_option = option.clone();
368                    let result = self
369                        .reader
370                        .read(self.session, call_option, self.disable_route_to_leader)
371                        .await?;
372                    self.streaming = result.into_inner();
373                    self.prs_buffer.on_resumption();
374                    continue;
375                }
376            };
377
378            match received {
379                Some(result_set) => {
380                    if result_set.last {
381                        self.end_of_stream = true;
382                    }
383                    self.prs_buffer.push(result_set);
384                    if self.prs_buffer.unretryable {
385                        self.resumable = false;
386                    }
387                }
388                None => {
389                    self.end_of_stream = true;
390                }
391            }
392        }
393    }
394
395    /// Return metadata for all columns
396    pub fn columns_metadata(&self) -> &Arc<Vec<Field>> {
397        &self.rs.fields
398    }
399
400    pub fn column_metadata(&self, column_name: &str) -> Option<(usize, Field)> {
401        for (i, val) in self.rs.fields.iter().enumerate() {
402            if val.name == column_name {
403                return Some((i, val.clone()));
404            }
405        }
406        None
407    }
408
409    /// Returns query execution statistics if available.
410    /// Stats are only available after all rows have been consumed and only when
411    /// the query was executed with a QueryMode that includes stats (Profile, WithStats, or WithPlanAndStats).
412    pub fn stats(&self) -> Option<&ResultSetStats> {
413        self.stats.as_ref()
414    }
415
416    /// next returns the next result.
417    /// Its second return value is None if there are no more results.
418    pub async fn next(&mut self) -> Result<Option<Row>, Status> {
419        loop {
420            let row = self.rs.next();
421            if row.is_some() {
422                return Ok(row);
423            }
424            // no data found or record chunked.
425            if !self.try_recv(self.reader_option.clone()).await? {
426                return Ok(None);
427            }
428        }
429    }
430}
431
432#[cfg(test)]
433mod tests {
434    use std::collections::VecDeque;
435    use std::sync::Arc;
436
437    use prost_types::value::Kind;
438    use prost_types::Value;
439
440    use google_cloud_googleapis::spanner::v1::struct_type::Field;
441    use google_cloud_googleapis::spanner::v1::{PartialResultSet, ResultSetMetadata, StructType};
442
443    use crate::reader::ResultSet;
444    use crate::row::{Row, TryFromValue};
445    use crate::statement::ToKind;
446
447    fn empty_rs() -> ResultSet {
448        ResultSet {
449            fields: Arc::new(vec![]),
450            index: Arc::new(Default::default()),
451            rows: Default::default(),
452            chunked_value: false,
453        }
454    }
455
456    fn field(name: &str) -> Field {
457        Field {
458            name: name.to_string(),
459            r#type: None,
460        }
461    }
462
463    fn value(to_kind: impl ToKind) -> Value {
464        Value {
465            kind: Some(to_kind.to_kind()),
466        }
467    }
468
469    fn prs(values: Vec<Value>, resume_token: &str, chunked_value: bool) -> PartialResultSet {
470        PartialResultSet {
471            metadata: None,
472            values,
473            chunked_value,
474            resume_token: resume_token.as_bytes().to_vec(),
475            stats: None,
476            precommit_token: None,
477            last: false,
478        }
479    }
480
481    fn assert_one_column(rs: &ResultSet) {
482        assert_eq!(rs.fields.len(), 1);
483        assert_eq!(rs.fields[0].name, "column1".to_string());
484        assert_eq!(*rs.index.get("column1").unwrap(), 0);
485    }
486
487    fn assert_multi_column(rs: &ResultSet) {
488        assert_eq!(rs.fields.len(), 2);
489        assert_eq!(rs.fields[0].name, "column1".to_string());
490        assert_eq!(rs.fields[1].name, "column2".to_string());
491        assert_eq!(*rs.index.get("column1").unwrap(), 0);
492        assert_eq!(*rs.index.get("column2").unwrap(), 1);
493    }
494
495    fn assert_some_one_column<T: TryFromValue + std::cmp::PartialEq + std::fmt::Debug>(row: Option<Row>, v: T) {
496        assert!(row.is_some());
497        assert_eq!(v, row.unwrap().column::<T>(0).unwrap());
498    }
499
500    fn assert_some_multi_column<
501        T1: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
502        T2: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
503    >(
504        row: Option<Row>,
505        v1: T1,
506        v2: T2,
507    ) {
508        assert!(row.is_some());
509        let v = row.unwrap();
510        assert_eq!(v1, v.column::<T1>(0).unwrap());
511        assert_eq!(v2, v.column::<T2>(1).unwrap());
512    }
513
514    #[test]
515    fn test_rs_next_empty() {
516        let mut rs = ResultSet {
517            fields: Arc::new(vec![field("column1")]),
518            index: Arc::new(Default::default()),
519            rows: Default::default(),
520            chunked_value: false,
521        };
522        assert!(rs.next().is_none());
523    }
524
525    #[test]
526    fn test_rs_next_record_chunked_or_not() {
527        let rs = |values| ResultSet {
528            fields: Arc::new(vec![field("column1"), field("column2")]),
529            index: Arc::new(Default::default()),
530            rows: VecDeque::from(values),
531            chunked_value: false,
532        };
533        let mut rs1 = rs(vec![value("value1")]);
534        assert!(rs1.next().is_none());
535        let mut rs2 = rs(vec![value("value1"), value("value2")]);
536        assert_eq!(rs2.next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
537    }
538
539    #[test]
540    fn test_rs_next_value_chunked_or_not() {
541        let rs = |chunked_value| ResultSet {
542            fields: Arc::new(vec![field("column1"), field("column2")]),
543            index: Arc::new(Default::default()),
544            rows: VecDeque::from(vec![value("value1"), value("value2")]),
545            chunked_value,
546        };
547        assert!(rs(true).next().is_none());
548        assert_eq!(rs(false).next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
549    }
550
551    #[test]
552    fn test_rs_next_plural_record_one_column() {
553        let rs = |chunked_value| ResultSet {
554            fields: Arc::new(vec![field("column1")]),
555            index: Arc::new(Default::default()),
556            rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
557            chunked_value,
558        };
559        let mut incomplete = rs(true);
560        assert!(incomplete.next().is_some());
561        assert!(incomplete.next().is_some());
562        assert!(incomplete.next().is_none());
563        let mut complete = rs(false);
564        assert!(complete.next().is_some());
565        assert!(complete.next().is_some());
566        assert!(complete.next().is_some());
567        assert!(complete.next().is_none());
568    }
569
570    #[test]
571    fn test_rs_next_plural_record_multi_column() {
572        let rs = |chunked_value| ResultSet {
573            fields: Arc::new(vec![field("column1"), field("column2")]),
574            index: Arc::new(Default::default()),
575            rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
576            chunked_value,
577        };
578        let mut incomplete = rs(true);
579        assert_eq!(incomplete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
580        assert!(incomplete.next().is_none());
581        let mut complete = rs(false);
582        assert_eq!(complete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
583        assert!(incomplete.next().is_none());
584    }
585
586    #[test]
587    fn test_rs_merge_string_value() {
588        let result = ResultSet::merge(value("val"), value("ue1"));
589        assert!(result.is_ok());
590        let kind = result.unwrap().kind.unwrap();
591        match kind {
592            Kind::StringValue(v) => assert_eq!(v, "value1".to_string()),
593            _ => unreachable!("must be string value"),
594        }
595    }
596
597    #[test]
598    fn test_rs_merge_list_value() {
599        let previous_last = value(vec!["value1-1", "value1-2", "val"]);
600        let current_first = value(vec!["ue1-3", "value2-1", "valu"]);
601        let result = ResultSet::merge(previous_last, current_first);
602        assert!(result.is_ok());
603        let kind = result.unwrap().kind.unwrap();
604        match kind {
605            Kind::ListValue(v) => {
606                assert_eq!(v.values.len(), 5);
607                match v.values[0].kind.as_ref().unwrap() {
608                    Kind::StringValue(v) => assert_eq!(*v, "value1-1".to_string()),
609                    _ => unreachable!("must be string value"),
610                };
611                match v.values[1].kind.as_ref().unwrap() {
612                    Kind::StringValue(v) => assert_eq!(*v, "value1-2".to_string()),
613                    _ => unreachable!("must be string value"),
614                };
615                match v.values[2].kind.as_ref().unwrap() {
616                    Kind::StringValue(v) => assert_eq!(*v, "value1-3".to_string()),
617                    _ => unreachable!("must be string value"),
618                };
619                match v.values[3].kind.as_ref().unwrap() {
620                    Kind::StringValue(v) => assert_eq!(*v, "value2-1".to_string()),
621                    _ => unreachable!("must be string value"),
622                }
623                match v.values[4].kind.as_ref().unwrap() {
624                    Kind::StringValue(v) => assert_eq!(*v, "valu".to_string()),
625                    _ => unreachable!("must be string value"),
626                }
627            }
628            _ => unreachable!("must be string value"),
629        }
630    }
631
632    #[test]
633    fn test_rs_add_one_column_no_chunked_value() {
634        let mut rs = empty_rs();
635        let metadata = Some(ResultSetMetadata {
636            row_type: Some(StructType {
637                fields: vec![field("column1")],
638            }),
639            transaction: None,
640            undeclared_parameters: None,
641        });
642        let values = vec![value("value1"), value("value2"), value("value3")];
643        assert!(rs.add(metadata, values, false).unwrap());
644        assert_eq!(rs.rows.len(), 3);
645        assert_one_column(&rs);
646        assert!(!rs.chunked_value);
647
648        assert_some_one_column(rs.next(), "value1".to_string());
649        assert_some_one_column(rs.next(), "value2".to_string());
650        assert_some_one_column(rs.next(), "value3".to_string());
651        assert!(rs.next().is_none());
652    }
653
654    #[test]
655    fn test_rs_add_multi_column_no_chunked_value() {
656        let mut rs = empty_rs();
657        let metadata = Some(ResultSetMetadata {
658            row_type: Some(StructType {
659                fields: vec![field("column1"), field("column2")],
660            }),
661            transaction: None,
662            undeclared_parameters: None,
663        });
664        let values = vec![value("value1"), value("value2"), value("value3")];
665        assert!(rs.add(metadata, values, false).unwrap());
666        assert_eq!(rs.rows.len(), 3);
667        assert_multi_column(&rs);
668        assert!(!rs.chunked_value);
669
670        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
671        assert!(rs.next().is_none());
672    }
673
674    #[test]
675    fn test_rs_add_multi_column_no_chunked_value_just() {
676        let mut rs = empty_rs();
677        let metadata = Some(ResultSetMetadata {
678            row_type: Some(StructType {
679                fields: vec![field("column1"), field("column2")],
680            }),
681            transaction: None,
682            undeclared_parameters: None,
683        });
684        let values = vec![value("value1"), value("value2"), value("value3"), value("value4")];
685        assert!(rs.add(metadata, values, false).unwrap());
686        assert_eq!(rs.rows.len(), 4);
687        assert_multi_column(&rs);
688        assert!(!rs.chunked_value);
689
690        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
691        assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
692        assert!(rs.next().is_none());
693    }
694
695    #[test]
696    fn test_rs_add_one_column_chunked_value() {
697        let mut rs = empty_rs();
698        let metadata = Some(ResultSetMetadata {
699            row_type: Some(StructType {
700                fields: vec![field("column1")],
701            }),
702            transaction: None,
703            undeclared_parameters: None,
704        });
705        let values = vec![value("value1"), value("value2"), value("val")];
706        assert!(rs.add(metadata.clone(), values, true).unwrap());
707        assert_eq!(rs.rows.len(), 3);
708        assert_one_column(&rs);
709        assert!(rs.chunked_value);
710
711        assert_some_one_column(rs.next(), "value1".to_string());
712        assert_some_one_column(rs.next(), "value2".to_string());
713        assert!(rs.next().is_none());
714
715        // add next stream data
716        assert!(rs.add(metadata, vec![value("ue3")], false).unwrap());
717        assert!(!rs.chunked_value);
718        assert_eq!(rs.rows.len(), 1);
719        assert_some_one_column(rs.next(), "value3".to_string());
720        assert!(rs.next().is_none());
721    }
722
723    #[test]
724    fn test_rs_add_multi_column_chunked_value() {
725        let mut rs = empty_rs();
726        let metadata = Some(ResultSetMetadata {
727            row_type: Some(StructType {
728                fields: vec![field("column1"), field("column2")],
729            }),
730            transaction: None,
731            undeclared_parameters: None,
732        });
733        let values = vec![value("value1"), value("value2"), value("val")];
734        assert!(rs.add(metadata.clone(), values, true).unwrap());
735        assert_eq!(rs.rows.len(), 3);
736        assert_multi_column(&rs);
737        assert!(rs.chunked_value);
738
739        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
740        assert!(rs.next().is_none());
741
742        // add next stream data
743        assert!(rs.add(metadata.clone(), vec![value("ue3")], false).unwrap());
744        assert!(!rs.chunked_value);
745        assert_eq!(rs.rows.len(), 1);
746        assert!(rs.next().is_none());
747
748        // add next stream data
749        assert!(rs.add(metadata, vec![value("value4")], false).unwrap());
750        assert!(!rs.chunked_value);
751        assert_eq!(rs.rows.len(), 2);
752        assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
753    }
754
755    #[test]
756    fn test_rs_add_multi_column_no_chunked_value_list_value() {
757        let mut rs = empty_rs();
758        let metadata = Some(ResultSetMetadata {
759            row_type: Some(StructType {
760                fields: vec![field("column1"), field("column2")],
761            }),
762            transaction: None,
763            undeclared_parameters: None,
764        });
765        let values = vec![value(vec!["value1-1", "value1-2"])];
766        assert!(rs.add(metadata.clone(), values, false).unwrap());
767        assert_eq!(rs.rows.len(), 1);
768        assert_multi_column(&rs);
769        assert!(!rs.chunked_value);
770        assert!(rs.next().is_none());
771        assert!(rs.add(metadata, vec![value(vec!["value2-1"])], false).unwrap());
772        assert!(!rs.chunked_value);
773        assert_eq!(rs.rows.len(), 2);
774        assert_some_multi_column(
775            rs.next(),
776            vec!["value1-1".to_string(), "value1-2".to_string()],
777            vec!["value2-1".to_string()],
778        );
779        assert!(rs.next().is_none());
780    }
781
782    #[test]
783    fn test_rs_add_multi_column_chunked_value_list_value() {
784        let mut rs = empty_rs();
785        let metadata = Some(ResultSetMetadata {
786            row_type: Some(StructType {
787                fields: vec![field("column1"), field("column2")],
788            }),
789            transaction: None,
790            undeclared_parameters: None,
791        });
792        let values = vec![value(vec!["value1-1", "value1-2"]), value(vec!["value2-"])];
793        assert!(rs.add(metadata.clone(), values, true).unwrap());
794        assert_eq!(rs.rows.len(), 2);
795        assert_multi_column(&rs);
796        assert!(rs.chunked_value);
797        assert!(rs.next().is_none());
798
799        // add next stream data
800        assert!(rs.add(metadata.clone(), vec![value(vec!["1", "valu"])], true).unwrap());
801        assert!(rs.chunked_value);
802        assert_eq!(rs.rows.len(), 2);
803        assert!(rs.next().is_none());
804
805        // add next stream data
806        assert!(rs.add(metadata, vec![value(vec!["e2-2"])], false).unwrap());
807        assert!(!rs.chunked_value);
808        assert_eq!(rs.rows.len(), 2);
809        assert_some_multi_column(
810            rs.next(),
811            vec!["value1-1".to_string(), "value1-2".to_string()],
812            vec!["value2-1".to_string(), "value2-2".to_string()],
813        );
814        assert!(rs.next().is_none());
815    }
816
/// Feeds a two-column result (a list column followed by a string column) into
/// a `ResultSet` across five `add` calls, with values split between calls, and
/// checks that a row is only handed out by `next` once both of its columns
/// have been fully reassembled.
#[test]
fn test_rs_add_multi_column_chunked_value_list_and_string_value() {
    let row_type = StructType {
        fields: vec![field("column1"), field("column2")],
    };
    let metadata = Some(ResultSetMetadata {
        row_type: Some(row_type),
        transaction: None,
        undeclared_parameters: None,
    });
    let mut rs = empty_rs();

    // Batch 1: a complete list for column1 and the chunked prefix "va" of column2.
    let first_batch = vec![value(vec!["value1-1", "value1-2"]), value("va")];
    assert!(rs.add(metadata.clone(), first_batch, true).unwrap());
    assert_eq!(rs.rows.len(), 2);
    assert_multi_column(&rs);
    assert!(rs.chunked_value);
    assert!(rs.next().is_none());

    // Batch 2: "lueA" finishes the string ("valueA"); a new list starts with
    // the chunked element "valu", so exactly one full row becomes available.
    let second_batch = vec![value("lueA"), value(vec!["valu"])];
    assert!(rs.add(metadata.clone(), second_batch, true).unwrap());
    assert!(rs.chunked_value);
    assert_eq!(rs.rows.len(), 3);
    assert_some_multi_column(
        rs.next(),
        vec![String::from("value1-1"), String::from("value1-2")],
        String::from("valueA"),
    );
    assert!(rs.next().is_none());

    // Batch 3: the rest of the list arrives unchunked; only column1 of the
    // second row is buffered, so no row can be read yet.
    let third_batch = vec![value(vec!["e2-1", "value2-2"])];
    assert!(rs.add(metadata.clone(), third_batch, false).unwrap());
    assert!(!rs.chunked_value);
    assert_eq!(rs.rows.len(), 1);
    assert!(rs.next().is_none());

    // Batch 4: chunked prefix "value" of the second row's string column.
    let fourth_batch = vec![value("value")];
    assert!(rs.add(metadata.clone(), fourth_batch, true).unwrap());
    assert!(rs.chunked_value);
    assert_eq!(rs.rows.len(), 2);
    assert!(rs.next().is_none());

    // Batch 5: "B" completes the string; the second row is now readable.
    let fifth_batch = vec![value("B")];
    assert!(rs.add(metadata, fifth_batch, false).unwrap());
    assert!(!rs.chunked_value);
    assert_eq!(rs.rows.len(), 2);
    assert_some_multi_column(
        rs.next(),
        vec![String::from("value2-1"), String::from("value2-2")],
        String::from("valueB"),
    );
    assert!(rs.next().is_none());
}
872
/// An entry pushed with an empty second `prs` argument (presumably the resume
/// token — confirm against the `prs` helper) is not returned by
/// `pop_ready(false)`. Once an entry carrying "token-1" is pushed, both
/// buffered entries pop, after which nothing is left.
#[test]
fn test_prs_buffer_waits_for_resume_token() {
    let mut buf = super::ResumablePartialResultSetBuffer::new(1024);

    let without_token = prs(vec![value("value-1")], "", false);
    buf.push(without_token);
    assert!(buf.pop_ready(false).is_none());

    let with_token = prs(vec![value("value-2")], "token-1", false);
    buf.push(with_token);

    // Both pushed entries are expected to come out, and nothing more.
    for _ in 0..2 {
        assert!(buf.pop_ready(false).is_some());
    }
    assert!(buf.pop_ready(false).is_none());
}
884
/// An entry pushed with an empty token argument stays in the buffer for
/// `pop_ready(false)` but is released by `pop_ready(true)`; a second
/// `pop_ready(true)` then finds nothing. The flag presumably signals end of
/// stream, going by the test name — confirm against `pop_ready`.
#[test]
fn test_prs_buffer_flushes_on_end_of_stream() {
    let mut buf = super::ResumablePartialResultSetBuffer::new(1024);
    let pending = prs(vec![value("value-1")], "", false);
    buf.push(pending);

    let before_end = buf.pop_ready(false);
    assert!(before_end.is_none());

    let at_end = buf.pop_ready(true);
    assert!(at_end.is_some());

    let after_flush = buf.pop_ready(true);
    assert!(after_flush.is_none());
}
893
/// With a buffer created via `new(1)`, a single push sets `unretryable`, and
/// the entry is then returned by `pop_ready(false)` even though it was pushed
/// with an empty token argument.
#[test]
fn test_prs_buffer_becomes_unretryable_after_limit() {
    // NOTE(review): the unit of the `new` argument (bytes vs. entries) is not
    // visible from this test — confirm against the constructor.
    let mut buf = super::ResumablePartialResultSetBuffer::new(1);
    let oversized = prs(vec![value("value-1")], "", false);
    buf.push(oversized);

    assert!(buf.unretryable);
    let popped = buf.pop_ready(false);
    assert!(popped.is_some());
}
901
/// After `on_resumption`, an entry pushed earlier without a token is no longer
/// delivered: only the entry pushed afterwards (with "token-1") pops.
#[test]
fn test_prs_buffer_on_resumption_discards_pending() {
    let mut buf = super::ResumablePartialResultSetBuffer::new(1024);

    let before_resume = prs(vec![value("value-1")], "", false);
    buf.push(before_resume);
    buf.on_resumption();

    let after_resume = prs(vec![value("value-2")], "token-1", false);
    buf.push(after_resume);

    // Exactly one entry is expected to be available.
    assert!(buf.pop_ready(false).is_some());
    assert!(buf.pop_ready(false).is_none());
}
911
/// A result set produced by `empty_rs()` reports that it sits on a row boundary.
#[test]
fn test_rs_is_row_boundary_empty() {
    let at_boundary = empty_rs().is_row_boundary();
    assert!(at_boundary);
}
917
/// A single-column result set holding one value is not on a row boundary
/// while `chunked_value` is set.
#[test]
fn test_rs_is_row_boundary_chunked() {
    let fields = Arc::new(vec![field("column1")]);
    let rows = VecDeque::from(vec![value("value1")]);
    let rs = ResultSet {
        fields,
        index: Arc::new(Default::default()),
        rows,
        chunked_value: true,
    };

    let at_boundary = rs.is_row_boundary();
    assert!(!at_boundary);
}
928
/// With two columns and no chunked value, two buffered values are a row
/// boundary, while a single buffered value is not.
#[test]
fn test_rs_is_row_boundary_multiple_columns() {
    // Each result set gets its own freshly built two-column field list.
    let two_columns = || Arc::new(vec![field("column1"), field("column2")]);

    // One value per column: the buffer holds a whole row.
    let full_row = VecDeque::from(vec![value("value1"), value("value2")]);
    let rs_complete = ResultSet {
        fields: two_columns(),
        index: Arc::new(Default::default()),
        rows: full_row,
        chunked_value: false,
    };
    assert!(rs_complete.is_row_boundary());

    // Only the first column has arrived: the row is still incomplete.
    let half_row = VecDeque::from(vec![value("value1")]);
    let rs_partial = ResultSet {
        fields: two_columns(),
        index: Arc::new(Default::default()),
        rows: half_row,
        chunked_value: false,
    };
    assert!(!rs_partial.is_row_boundary());
}
947}