gcloud_spanner/
reader.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3
4use prost_types::{value::Kind, Value};
5
6use google_cloud_gax::grpc::{Code, Response, Status, Streaming};
7use google_cloud_googleapis::spanner::v1::struct_type::Field;
8use google_cloud_googleapis::spanner::v1::{
9    ExecuteSqlRequest, PartialResultSet, ReadRequest, ResultSetMetadata, ResultSetStats,
10};
11
12use crate::row::Row;
13use crate::session::SessionHandle;
14use crate::transaction::CallOptions;
15
16pub trait Reader: Send + Sync {
17    fn read(
18        &self,
19        session: &mut SessionHandle,
20        option: Option<CallOptions>,
21        disable_route_to_leader: bool,
22    ) -> impl std::future::Future<Output = Result<Response<Streaming<PartialResultSet>>, Status>> + Send;
23
24    fn update_token(&mut self, resume_token: Vec<u8>);
25
26    fn can_resume(&self) -> bool;
27}
28
29pub struct StatementReader {
30    pub enable_resume: bool,
31    pub request: ExecuteSqlRequest,
32}
33
34impl Reader for StatementReader {
35    async fn read(
36        &self,
37        session: &mut SessionHandle,
38        option: Option<CallOptions>,
39        disable_route_to_leader: bool,
40    ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
41        let option = option.unwrap_or_default();
42        let client = &mut session.spanner_client;
43        let result = client
44            .execute_streaming_sql(self.request.clone(), disable_route_to_leader, option.retry)
45            .await;
46        session.invalidate_if_needed(result).await
47    }
48
49    fn update_token(&mut self, resume_token: Vec<u8>) {
50        self.request.resume_token = resume_token;
51    }
52
53    fn can_resume(&self) -> bool {
54        self.enable_resume && !self.request.resume_token.is_empty()
55    }
56}
57
58pub struct TableReader {
59    pub request: ReadRequest,
60}
61
62impl Reader for TableReader {
63    async fn read(
64        &self,
65        session: &mut SessionHandle,
66        option: Option<CallOptions>,
67        disable_route_to_leader: bool,
68    ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
69        let option = option.unwrap_or_default();
70        let client = &mut session.spanner_client;
71        let result = client
72            .streaming_read(self.request.clone(), disable_route_to_leader, option.retry)
73            .await;
74        session.invalidate_if_needed(result).await
75    }
76
77    fn update_token(&mut self, resume_token: Vec<u8>) {
78        self.request.resume_token = resume_token;
79    }
80
81    fn can_resume(&self) -> bool {
82        !self.request.resume_token.is_empty()
83    }
84}
85
86pub struct ResultSet {
87    fields: Arc<Vec<Field>>,
88    index: Arc<HashMap<String, usize>>,
89    rows: VecDeque<Value>,
90    chunked_value: bool,
91}
92
93impl ResultSet {
94    fn next(&mut self) -> Option<Row> {
95        if !self.rows.is_empty() {
96            let column_length = self.fields.len();
97            let target_record_is_chunked = self.rows.len() < column_length;
98            let target_record_contains_chunked_value = self.chunked_value && self.rows.len() == column_length;
99
100            if !target_record_is_chunked && !target_record_contains_chunked_value {
101                // get column_length values
102                let mut values = Vec::with_capacity(column_length);
103                for _ in 0..column_length {
104                    values.push(self.rows.pop_front().unwrap());
105                }
106                return Some(Row::new(Arc::clone(&self.index), Arc::clone(&self.fields), values));
107            }
108        }
109        None
110    }
111
112    /// Merge tries to combine two protobuf Values if possible.
113    fn merge(previous_last: Value, current_first: Value) -> Result<Value, Status> {
114        match previous_last.kind.unwrap() {
115            Kind::StringValue(last) => match current_first.kind.unwrap() {
116                Kind::StringValue(first) => {
117                    tracing::trace!("previous_last={}, current_first={}", &last, first);
118                    Ok(Value {
119                        kind: Some(Kind::StringValue(last + &first)),
120                    })
121                }
122                _ => Err(Status::new(
123                    Code::Internal,
124                    "chunks kind mismatch: current_first must be StringKind",
125                )),
126            },
127            Kind::ListValue(mut last) => match current_first.kind.unwrap() {
128                Kind::ListValue(mut first) => {
129                    let first_value_of_current = first.values.remove(0);
130                    let merged = match last.values.pop() {
131                        Some(last_value_of_previous) => {
132                            ResultSet::merge(last_value_of_previous, first_value_of_current)?
133                        }
134                        // last record can be empty
135                        None => first_value_of_current,
136                    };
137                    last.values.push(merged);
138                    last.values.extend(first.values);
139                    Ok(Value {
140                        kind: Some(Kind::ListValue(last)),
141                    })
142                }
143                _ => Err(Status::new(
144                    Code::Internal,
145                    "chunks kind mismatch: current_first must be ListValue",
146                )),
147            },
148            _ => Err(Status::new(
149                Code::Internal,
150                "previous_last kind mismatch: only StringValue and ListValue can be chunked",
151            )),
152        }
153    }
154
155    fn add(
156        &mut self,
157        metadata: Option<ResultSetMetadata>,
158        mut values: Vec<Value>,
159        chunked_value: bool,
160    ) -> Result<bool, Status> {
161        // get metadata only once.
162        if self.fields.is_empty() {
163            if let Some(metadata) = metadata {
164                self.fields = metadata
165                    .row_type
166                    .map(|e| Arc::new(e.fields))
167                    .ok_or_else(|| Status::new(Code::Internal, "no field metadata found"))?;
168                // create index for Row::column_by_name("column_name")
169                let mut index = HashMap::new();
170                for (i, f) in self.fields.iter().enumerate() {
171                    index.insert(f.name.clone(), i);
172                }
173                self.index = Arc::new(index);
174            }
175        }
176
177        if self.chunked_value {
178            tracing::trace!("now chunked value found previous={}, current={}", self.rows.len(), values.len());
179            //merge when the chunked value is found.
180            let merged = ResultSet::merge(self.rows.pop_back().unwrap(), values.remove(0))?;
181            self.rows.push_back(merged);
182        }
183        self.rows.extend(values);
184        self.chunked_value = chunked_value;
185        Ok(true)
186    }
187}
188
189pub struct RowIterator<'a, T>
190where
191    T: Reader,
192{
193    streaming: Streaming<PartialResultSet>,
194    session: &'a mut SessionHandle,
195    reader: T,
196    rs: ResultSet,
197    reader_option: Option<CallOptions>,
198    disable_route_to_leader: bool,
199    stats: Option<ResultSetStats>,
200}
201
202impl<'a, T> RowIterator<'a, T>
203where
204    T: Reader,
205{
206    pub(crate) async fn new(
207        session: &'a mut SessionHandle,
208        reader: T,
209        option: Option<CallOptions>,
210        disable_route_to_leader: bool,
211    ) -> Result<RowIterator<'a, T>, Status> {
212        let streaming = reader
213            .read(session, option, disable_route_to_leader)
214            .await?
215            .into_inner();
216        let rs = ResultSet {
217            fields: Arc::new(vec![]),
218            index: Arc::new(HashMap::new()),
219            rows: VecDeque::new(),
220            chunked_value: false,
221        };
222        Ok(Self {
223            streaming,
224            session,
225            reader,
226            rs,
227            reader_option: None,
228            disable_route_to_leader,
229            stats: None,
230        })
231    }
232
233    pub fn set_call_options(&mut self, option: CallOptions) {
234        self.reader_option = Some(option);
235    }
236
237    async fn try_recv(&mut self, option: Option<CallOptions>) -> Result<bool, Status> {
238        // try getting records from server
239        let maybe_result_set = match self.streaming.message().await {
240            Ok(s) => s,
241            Err(e) => {
242                if !self.reader.can_resume() {
243                    return Err(e);
244                }
245                tracing::debug!("streaming error: {}. resume reading by resume_token", e);
246                let result = self
247                    .reader
248                    .read(self.session, option, self.disable_route_to_leader)
249                    .await?;
250                self.streaming = result.into_inner();
251                self.streaming.message().await?
252            }
253        };
254
255        match maybe_result_set {
256            Some(result_set) => {
257                if result_set.values.is_empty() {
258                    return Ok(false);
259                }
260                //if resume_token changes set new resume_token
261                if !result_set.resume_token.is_empty() {
262                    self.reader.update_token(result_set.resume_token);
263                }
264                // Capture stats if present (only sent with the last response)
265                if result_set.stats.is_some() {
266                    self.stats = result_set.stats;
267                }
268                self.rs
269                    .add(result_set.metadata, result_set.values, result_set.chunked_value)
270            }
271            None => Ok(false),
272        }
273    }
274
275    /// Return metadata for all columns
276    pub fn columns_metadata(&self) -> &Arc<Vec<Field>> {
277        &self.rs.fields
278    }
279
280    pub fn column_metadata(&self, column_name: &str) -> Option<(usize, Field)> {
281        for (i, val) in self.rs.fields.iter().enumerate() {
282            if val.name == column_name {
283                return Some((i, val.clone()));
284            }
285        }
286        None
287    }
288
289    /// Returns query execution statistics if available.
290    /// Stats are only available after all rows have been consumed and only when
291    /// the query was executed with a QueryMode that includes stats (Profile, WithStats, or WithPlanAndStats).
292    pub fn stats(&self) -> Option<&ResultSetStats> {
293        self.stats.as_ref()
294    }
295
296    /// next returns the next result.
297    /// Its second return value is None if there are no more results.
298    pub async fn next(&mut self) -> Result<Option<Row>, Status> {
299        loop {
300            let row = self.rs.next();
301            if row.is_some() {
302                return Ok(row);
303            }
304            // no data found or record chunked.
305            if !self.try_recv(self.reader_option.clone()).await? {
306                return Ok(None);
307            }
308        }
309    }
310}
311
312#[cfg(test)]
313mod tests {
314    use std::collections::VecDeque;
315    use std::sync::Arc;
316
317    use prost_types::value::Kind;
318    use prost_types::Value;
319
320    use google_cloud_googleapis::spanner::v1::struct_type::Field;
321    use google_cloud_googleapis::spanner::v1::{ResultSetMetadata, StructType};
322
323    use crate::reader::ResultSet;
324    use crate::row::{Row, TryFromValue};
325    use crate::statement::ToKind;
326
327    fn empty_rs() -> ResultSet {
328        ResultSet {
329            fields: Arc::new(vec![]),
330            index: Arc::new(Default::default()),
331            rows: Default::default(),
332            chunked_value: false,
333        }
334    }
335
336    fn field(name: &str) -> Field {
337        Field {
338            name: name.to_string(),
339            r#type: None,
340        }
341    }
342
343    fn value(to_kind: impl ToKind) -> Value {
344        Value {
345            kind: Some(to_kind.to_kind()),
346        }
347    }
348
349    fn assert_one_column(rs: &ResultSet) {
350        assert_eq!(rs.fields.len(), 1);
351        assert_eq!(rs.fields[0].name, "column1".to_string());
352        assert_eq!(*rs.index.get("column1").unwrap(), 0);
353    }
354
355    fn assert_multi_column(rs: &ResultSet) {
356        assert_eq!(rs.fields.len(), 2);
357        assert_eq!(rs.fields[0].name, "column1".to_string());
358        assert_eq!(rs.fields[1].name, "column2".to_string());
359        assert_eq!(*rs.index.get("column1").unwrap(), 0);
360        assert_eq!(*rs.index.get("column2").unwrap(), 1);
361    }
362
363    fn assert_some_one_column<T: TryFromValue + std::cmp::PartialEq + std::fmt::Debug>(row: Option<Row>, v: T) {
364        assert!(row.is_some());
365        assert_eq!(v, row.unwrap().column::<T>(0).unwrap());
366    }
367
368    fn assert_some_multi_column<
369        T1: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
370        T2: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
371    >(
372        row: Option<Row>,
373        v1: T1,
374        v2: T2,
375    ) {
376        assert!(row.is_some());
377        let v = row.unwrap();
378        assert_eq!(v1, v.column::<T1>(0).unwrap());
379        assert_eq!(v2, v.column::<T2>(1).unwrap());
380    }
381
382    #[test]
383    fn test_rs_next_empty() {
384        let mut rs = ResultSet {
385            fields: Arc::new(vec![field("column1")]),
386            index: Arc::new(Default::default()),
387            rows: Default::default(),
388            chunked_value: false,
389        };
390        assert!(rs.next().is_none());
391    }
392
393    #[test]
394    fn test_rs_next_record_chunked_or_not() {
395        let rs = |values| ResultSet {
396            fields: Arc::new(vec![field("column1"), field("column2")]),
397            index: Arc::new(Default::default()),
398            rows: VecDeque::from(values),
399            chunked_value: false,
400        };
401        let mut rs1 = rs(vec![value("value1")]);
402        assert!(rs1.next().is_none());
403        let mut rs2 = rs(vec![value("value1"), value("value2")]);
404        assert_eq!(rs2.next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
405    }
406
407    #[test]
408    fn test_rs_next_value_chunked_or_not() {
409        let rs = |chunked_value| ResultSet {
410            fields: Arc::new(vec![field("column1"), field("column2")]),
411            index: Arc::new(Default::default()),
412            rows: VecDeque::from(vec![value("value1"), value("value2")]),
413            chunked_value,
414        };
415        assert!(rs(true).next().is_none());
416        assert_eq!(rs(false).next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
417    }
418
419    #[test]
420    fn test_rs_next_plural_record_one_column() {
421        let rs = |chunked_value| ResultSet {
422            fields: Arc::new(vec![field("column1")]),
423            index: Arc::new(Default::default()),
424            rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
425            chunked_value,
426        };
427        let mut incomplete = rs(true);
428        assert!(incomplete.next().is_some());
429        assert!(incomplete.next().is_some());
430        assert!(incomplete.next().is_none());
431        let mut complete = rs(false);
432        assert!(complete.next().is_some());
433        assert!(complete.next().is_some());
434        assert!(complete.next().is_some());
435        assert!(complete.next().is_none());
436    }
437
438    #[test]
439    fn test_rs_next_plural_record_multi_column() {
440        let rs = |chunked_value| ResultSet {
441            fields: Arc::new(vec![field("column1"), field("column2")]),
442            index: Arc::new(Default::default()),
443            rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
444            chunked_value,
445        };
446        let mut incomplete = rs(true);
447        assert_eq!(incomplete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
448        assert!(incomplete.next().is_none());
449        let mut complete = rs(false);
450        assert_eq!(complete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
451        assert!(incomplete.next().is_none());
452    }
453
454    #[test]
455    fn test_rs_merge_string_value() {
456        let result = ResultSet::merge(value("val"), value("ue1"));
457        assert!(result.is_ok());
458        let kind = result.unwrap().kind.unwrap();
459        match kind {
460            Kind::StringValue(v) => assert_eq!(v, "value1".to_string()),
461            _ => unreachable!("must be string value"),
462        }
463    }
464
465    #[test]
466    fn test_rs_merge_list_value() {
467        let previous_last = value(vec!["value1-1", "value1-2", "val"]);
468        let current_first = value(vec!["ue1-3", "value2-1", "valu"]);
469        let result = ResultSet::merge(previous_last, current_first);
470        assert!(result.is_ok());
471        let kind = result.unwrap().kind.unwrap();
472        match kind {
473            Kind::ListValue(v) => {
474                assert_eq!(v.values.len(), 5);
475                match v.values[0].kind.as_ref().unwrap() {
476                    Kind::StringValue(v) => assert_eq!(*v, "value1-1".to_string()),
477                    _ => unreachable!("must be string value"),
478                };
479                match v.values[1].kind.as_ref().unwrap() {
480                    Kind::StringValue(v) => assert_eq!(*v, "value1-2".to_string()),
481                    _ => unreachable!("must be string value"),
482                };
483                match v.values[2].kind.as_ref().unwrap() {
484                    Kind::StringValue(v) => assert_eq!(*v, "value1-3".to_string()),
485                    _ => unreachable!("must be string value"),
486                };
487                match v.values[3].kind.as_ref().unwrap() {
488                    Kind::StringValue(v) => assert_eq!(*v, "value2-1".to_string()),
489                    _ => unreachable!("must be string value"),
490                }
491                match v.values[4].kind.as_ref().unwrap() {
492                    Kind::StringValue(v) => assert_eq!(*v, "valu".to_string()),
493                    _ => unreachable!("must be string value"),
494                }
495            }
496            _ => unreachable!("must be string value"),
497        }
498    }
499
500    #[test]
501    fn test_rs_add_one_column_no_chunked_value() {
502        let mut rs = empty_rs();
503        let metadata = Some(ResultSetMetadata {
504            row_type: Some(StructType {
505                fields: vec![field("column1")],
506            }),
507            transaction: None,
508            undeclared_parameters: None,
509        });
510        let values = vec![value("value1"), value("value2"), value("value3")];
511        assert!(rs.add(metadata, values, false).unwrap());
512        assert_eq!(rs.rows.len(), 3);
513        assert_one_column(&rs);
514        assert!(!rs.chunked_value);
515
516        assert_some_one_column(rs.next(), "value1".to_string());
517        assert_some_one_column(rs.next(), "value2".to_string());
518        assert_some_one_column(rs.next(), "value3".to_string());
519        assert!(rs.next().is_none());
520    }
521
522    #[test]
523    fn test_rs_add_multi_column_no_chunked_value() {
524        let mut rs = empty_rs();
525        let metadata = Some(ResultSetMetadata {
526            row_type: Some(StructType {
527                fields: vec![field("column1"), field("column2")],
528            }),
529            transaction: None,
530            undeclared_parameters: None,
531        });
532        let values = vec![value("value1"), value("value2"), value("value3")];
533        assert!(rs.add(metadata, values, false).unwrap());
534        assert_eq!(rs.rows.len(), 3);
535        assert_multi_column(&rs);
536        assert!(!rs.chunked_value);
537
538        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
539        assert!(rs.next().is_none());
540    }
541
542    #[test]
543    fn test_rs_add_multi_column_no_chunked_value_just() {
544        let mut rs = empty_rs();
545        let metadata = Some(ResultSetMetadata {
546            row_type: Some(StructType {
547                fields: vec![field("column1"), field("column2")],
548            }),
549            transaction: None,
550            undeclared_parameters: None,
551        });
552        let values = vec![value("value1"), value("value2"), value("value3"), value("value4")];
553        assert!(rs.add(metadata, values, false).unwrap());
554        assert_eq!(rs.rows.len(), 4);
555        assert_multi_column(&rs);
556        assert!(!rs.chunked_value);
557
558        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
559        assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
560        assert!(rs.next().is_none());
561    }
562
563    #[test]
564    fn test_rs_add_one_column_chunked_value() {
565        let mut rs = empty_rs();
566        let metadata = Some(ResultSetMetadata {
567            row_type: Some(StructType {
568                fields: vec![field("column1")],
569            }),
570            transaction: None,
571            undeclared_parameters: None,
572        });
573        let values = vec![value("value1"), value("value2"), value("val")];
574        assert!(rs.add(metadata.clone(), values, true).unwrap());
575        assert_eq!(rs.rows.len(), 3);
576        assert_one_column(&rs);
577        assert!(rs.chunked_value);
578
579        assert_some_one_column(rs.next(), "value1".to_string());
580        assert_some_one_column(rs.next(), "value2".to_string());
581        assert!(rs.next().is_none());
582
583        // add next stream data
584        assert!(rs.add(metadata, vec![value("ue3")], false).unwrap());
585        assert!(!rs.chunked_value);
586        assert_eq!(rs.rows.len(), 1);
587        assert_some_one_column(rs.next(), "value3".to_string());
588        assert!(rs.next().is_none());
589    }
590
591    #[test]
592    fn test_rs_add_multi_column_chunked_value() {
593        let mut rs = empty_rs();
594        let metadata = Some(ResultSetMetadata {
595            row_type: Some(StructType {
596                fields: vec![field("column1"), field("column2")],
597            }),
598            transaction: None,
599            undeclared_parameters: None,
600        });
601        let values = vec![value("value1"), value("value2"), value("val")];
602        assert!(rs.add(metadata.clone(), values, true).unwrap());
603        assert_eq!(rs.rows.len(), 3);
604        assert_multi_column(&rs);
605        assert!(rs.chunked_value);
606
607        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
608        assert!(rs.next().is_none());
609
610        // add next stream data
611        assert!(rs.add(metadata.clone(), vec![value("ue3")], false).unwrap());
612        assert!(!rs.chunked_value);
613        assert_eq!(rs.rows.len(), 1);
614        assert!(rs.next().is_none());
615
616        // add next stream data
617        assert!(rs.add(metadata, vec![value("value4")], false).unwrap());
618        assert!(!rs.chunked_value);
619        assert_eq!(rs.rows.len(), 2);
620        assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
621    }
622
623    #[test]
624    fn test_rs_add_multi_column_no_chunked_value_list_value() {
625        let mut rs = empty_rs();
626        let metadata = Some(ResultSetMetadata {
627            row_type: Some(StructType {
628                fields: vec![field("column1"), field("column2")],
629            }),
630            transaction: None,
631            undeclared_parameters: None,
632        });
633        let values = vec![value(vec!["value1-1", "value1-2"])];
634        assert!(rs.add(metadata.clone(), values, false).unwrap());
635        assert_eq!(rs.rows.len(), 1);
636        assert_multi_column(&rs);
637        assert!(!rs.chunked_value);
638        assert!(rs.next().is_none());
639        assert!(rs.add(metadata, vec![value(vec!["value2-1"])], false).unwrap());
640        assert!(!rs.chunked_value);
641        assert_eq!(rs.rows.len(), 2);
642        assert_some_multi_column(
643            rs.next(),
644            vec!["value1-1".to_string(), "value1-2".to_string()],
645            vec!["value2-1".to_string()],
646        );
647        assert!(rs.next().is_none());
648    }
649
650    #[test]
651    fn test_rs_add_multi_column_chunked_value_list_value() {
652        let mut rs = empty_rs();
653        let metadata = Some(ResultSetMetadata {
654            row_type: Some(StructType {
655                fields: vec![field("column1"), field("column2")],
656            }),
657            transaction: None,
658            undeclared_parameters: None,
659        });
660        let values = vec![value(vec!["value1-1", "value1-2"]), value(vec!["value2-"])];
661        assert!(rs.add(metadata.clone(), values, true).unwrap());
662        assert_eq!(rs.rows.len(), 2);
663        assert_multi_column(&rs);
664        assert!(rs.chunked_value);
665        assert!(rs.next().is_none());
666
667        // add next stream data
668        assert!(rs.add(metadata.clone(), vec![value(vec!["1", "valu"])], true).unwrap());
669        assert!(rs.chunked_value);
670        assert_eq!(rs.rows.len(), 2);
671        assert!(rs.next().is_none());
672
673        // add next stream data
674        assert!(rs.add(metadata, vec![value(vec!["e2-2"])], false).unwrap());
675        assert!(!rs.chunked_value);
676        assert_eq!(rs.rows.len(), 2);
677        assert_some_multi_column(
678            rs.next(),
679            vec!["value1-1".to_string(), "value1-2".to_string()],
680            vec!["value2-1".to_string(), "value2-2".to_string()],
681        );
682        assert!(rs.next().is_none());
683    }
684
685    #[test]
686    fn test_rs_add_multi_column_chunked_value_list_and_string_value() {
687        let mut rs = empty_rs();
688        let metadata = Some(ResultSetMetadata {
689            row_type: Some(StructType {
690                fields: vec![field("column1"), field("column2")],
691            }),
692            transaction: None,
693            undeclared_parameters: None,
694        });
695        let values = vec![value(vec!["value1-1", "value1-2"]), value("va")];
696        assert!(rs.add(metadata.clone(), values, true).unwrap());
697        assert_eq!(rs.rows.len(), 2);
698        assert_multi_column(&rs);
699        assert!(rs.chunked_value);
700        assert!(rs.next().is_none());
701
702        // add next stream data
703        assert!(rs
704            .add(metadata.clone(), vec![value("lueA"), value(vec!["valu"])], true)
705            .unwrap());
706        assert!(rs.chunked_value);
707        assert_eq!(rs.rows.len(), 3);
708        assert_some_multi_column(
709            rs.next(),
710            vec!["value1-1".to_string(), "value1-2".to_string()],
711            "valueA".to_string(),
712        );
713        assert!(rs.next().is_none());
714
715        // add next stream data
716        assert!(rs
717            .add(metadata.clone(), vec![value(vec!["e2-1", "value2-2"])], false)
718            .unwrap());
719        assert!(!rs.chunked_value);
720        assert_eq!(rs.rows.len(), 1);
721        assert!(rs.next().is_none());
722
723        // add next stream data
724        assert!(rs.add(metadata.clone(), vec![value("value")], true).unwrap());
725        assert!(rs.chunked_value);
726        assert_eq!(rs.rows.len(), 2);
727        assert!(rs.next().is_none());
728
729        // add next stream data
730        assert!(rs.add(metadata, vec![value("B")], false).unwrap());
731        assert!(!rs.chunked_value);
732        assert_eq!(rs.rows.len(), 2);
733        assert_some_multi_column(
734            rs.next(),
735            vec!["value2-1".to_string(), "value2-2".to_string()],
736            "valueB".to_string(),
737        );
738        assert!(rs.next().is_none());
739    }
740}