gcloud_spanner/
reader.rs

1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3
4use prost_types::{value::Kind, Value};
5
6use google_cloud_gax::grpc::{Code, Response, Status, Streaming};
7use google_cloud_googleapis::spanner::v1::struct_type::Field;
8use google_cloud_googleapis::spanner::v1::{ExecuteSqlRequest, PartialResultSet, ReadRequest, ResultSetMetadata};
9
10use crate::row::Row;
11use crate::session::SessionHandle;
12use crate::transaction::CallOptions;
13
14pub trait Reader: Send + Sync {
15    fn read(
16        &self,
17        session: &mut SessionHandle,
18        option: Option<CallOptions>,
19    ) -> impl std::future::Future<Output = Result<Response<Streaming<PartialResultSet>>, Status>> + Send;
20
21    fn update_token(&mut self, resume_token: Vec<u8>);
22
23    fn can_resume(&self) -> bool;
24}
25
26pub struct StatementReader {
27    pub enable_resume: bool,
28    pub request: ExecuteSqlRequest,
29}
30
31impl Reader for StatementReader {
32    async fn read(
33        &self,
34        session: &mut SessionHandle,
35        option: Option<CallOptions>,
36    ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
37        let option = option.unwrap_or_default();
38        let client = &mut session.spanner_client;
39        let result = client.execute_streaming_sql(self.request.clone(), option.retry).await;
40        session.invalidate_if_needed(result).await
41    }
42
43    fn update_token(&mut self, resume_token: Vec<u8>) {
44        self.request.resume_token = resume_token;
45    }
46
47    fn can_resume(&self) -> bool {
48        self.enable_resume && !self.request.resume_token.is_empty()
49    }
50}
51
52pub struct TableReader {
53    pub request: ReadRequest,
54}
55
56impl Reader for TableReader {
57    async fn read(
58        &self,
59        session: &mut SessionHandle,
60        option: Option<CallOptions>,
61    ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
62        let option = option.unwrap_or_default();
63        let client = &mut session.spanner_client;
64        let result = client.streaming_read(self.request.clone(), option.retry).await;
65        session.invalidate_if_needed(result).await
66    }
67
68    fn update_token(&mut self, resume_token: Vec<u8>) {
69        self.request.resume_token = resume_token;
70    }
71
72    fn can_resume(&self) -> bool {
73        !self.request.resume_token.is_empty()
74    }
75}
76
77pub struct ResultSet {
78    fields: Arc<Vec<Field>>,
79    index: Arc<HashMap<String, usize>>,
80    rows: VecDeque<Value>,
81    chunked_value: bool,
82}
83
84impl ResultSet {
85    fn next(&mut self) -> Option<Row> {
86        if !self.rows.is_empty() {
87            let column_length = self.fields.len();
88            let target_record_is_chunked = self.rows.len() < column_length;
89            let target_record_contains_chunked_value = self.chunked_value && self.rows.len() == column_length;
90
91            if !target_record_is_chunked && !target_record_contains_chunked_value {
92                // get column_length values
93                let mut values = Vec::with_capacity(column_length);
94                for _ in 0..column_length {
95                    values.push(self.rows.pop_front().unwrap());
96                }
97                return Some(Row::new(Arc::clone(&self.index), Arc::clone(&self.fields), values));
98            }
99        }
100        None
101    }
102
103    /// Merge tries to combine two protobuf Values if possible.
104    fn merge(previous_last: Value, current_first: Value) -> Result<Value, Status> {
105        match previous_last.kind.unwrap() {
106            Kind::StringValue(last) => match current_first.kind.unwrap() {
107                Kind::StringValue(first) => {
108                    tracing::trace!("previous_last={}, current_first={}", &last, first);
109                    Ok(Value {
110                        kind: Some(Kind::StringValue(last + &first)),
111                    })
112                }
113                _ => Err(Status::new(
114                    Code::Internal,
115                    "chunks kind mismatch: current_first must be StringKind",
116                )),
117            },
118            Kind::ListValue(mut last) => match current_first.kind.unwrap() {
119                Kind::ListValue(mut first) => {
120                    let first_value_of_current = first.values.remove(0);
121                    let merged = match last.values.pop() {
122                        Some(last_value_of_previous) => {
123                            ResultSet::merge(last_value_of_previous, first_value_of_current)?
124                        }
125                        // last record can be empty
126                        None => first_value_of_current,
127                    };
128                    last.values.push(merged);
129                    last.values.extend(first.values);
130                    Ok(Value {
131                        kind: Some(Kind::ListValue(last)),
132                    })
133                }
134                _ => Err(Status::new(
135                    Code::Internal,
136                    "chunks kind mismatch: current_first must be ListValue",
137                )),
138            },
139            _ => Err(Status::new(
140                Code::Internal,
141                "previous_last kind mismatch: only StringValue and ListValue can be chunked",
142            )),
143        }
144    }
145
146    fn add(
147        &mut self,
148        metadata: Option<ResultSetMetadata>,
149        mut values: Vec<Value>,
150        chunked_value: bool,
151    ) -> Result<bool, Status> {
152        // get metadata only once.
153        if self.fields.is_empty() {
154            if let Some(metadata) = metadata {
155                self.fields = metadata
156                    .row_type
157                    .map(|e| Arc::new(e.fields))
158                    .ok_or_else(|| Status::new(Code::Internal, "no field metadata found"))?;
159                // create index for Row::column_by_name("column_name")
160                let mut index = HashMap::new();
161                for (i, f) in self.fields.iter().enumerate() {
162                    index.insert(f.name.clone(), i);
163                }
164                self.index = Arc::new(index);
165            }
166        }
167
168        if self.chunked_value {
169            tracing::trace!("now chunked value found previous={}, current={}", self.rows.len(), values.len());
170            //merge when the chunked value is found.
171            let merged = ResultSet::merge(self.rows.pop_back().unwrap(), values.remove(0))?;
172            self.rows.push_back(merged);
173        }
174        self.rows.extend(values);
175        self.chunked_value = chunked_value;
176        Ok(true)
177    }
178}
179
180pub struct RowIterator<'a, T>
181where
182    T: Reader,
183{
184    streaming: Streaming<PartialResultSet>,
185    session: &'a mut SessionHandle,
186    reader: T,
187    rs: ResultSet,
188    reader_option: Option<CallOptions>,
189}
190
191impl<'a, T> RowIterator<'a, T>
192where
193    T: Reader,
194{
195    pub(crate) async fn new(
196        session: &'a mut SessionHandle,
197        reader: T,
198        option: Option<CallOptions>,
199    ) -> Result<RowIterator<'a, T>, Status> {
200        let streaming = reader.read(session, option).await?.into_inner();
201        let rs = ResultSet {
202            fields: Arc::new(vec![]),
203            index: Arc::new(HashMap::new()),
204            rows: VecDeque::new(),
205            chunked_value: false,
206        };
207        Ok(Self {
208            streaming,
209            session,
210            reader,
211            rs,
212            reader_option: None,
213        })
214    }
215
216    pub fn set_call_options(&mut self, option: CallOptions) {
217        self.reader_option = Some(option);
218    }
219
220    async fn try_recv(&mut self, option: Option<CallOptions>) -> Result<bool, Status> {
221        // try getting records from server
222        let maybe_result_set = match self.streaming.message().await {
223            Ok(s) => s,
224            Err(e) => {
225                if !self.reader.can_resume() {
226                    return Err(e);
227                }
228                tracing::debug!("streaming error: {}. resume reading by resume_token", e);
229                let result = self.reader.read(self.session, option).await?;
230                self.streaming = result.into_inner();
231                self.streaming.message().await?
232            }
233        };
234
235        match maybe_result_set {
236            Some(result_set) => {
237                if result_set.values.is_empty() {
238                    return Ok(false);
239                }
240                //if resume_token changes set new resume_token
241                if !result_set.resume_token.is_empty() {
242                    self.reader.update_token(result_set.resume_token);
243                }
244                self.rs
245                    .add(result_set.metadata, result_set.values, result_set.chunked_value)
246            }
247            None => Ok(false),
248        }
249    }
250
251    /// Return metadata for all columns
252    pub fn columns_metadata(&self) -> &Arc<Vec<Field>> {
253        &self.rs.fields
254    }
255
256    pub fn column_metadata(&self, column_name: &str) -> Option<(usize, Field)> {
257        for (i, val) in self.rs.fields.iter().enumerate() {
258            if val.name == column_name {
259                return Some((i, val.clone()));
260            }
261        }
262        None
263    }
264
265    /// next returns the next result.
266    /// Its second return value is None if there are no more results.
267    pub async fn next(&mut self) -> Result<Option<Row>, Status> {
268        loop {
269            let row = self.rs.next();
270            if row.is_some() {
271                return Ok(row);
272            }
273            // no data found or record chunked.
274            if !self.try_recv(self.reader_option.clone()).await? {
275                return Ok(None);
276            }
277        }
278    }
279}
280
281#[cfg(test)]
282mod tests {
283    use std::collections::VecDeque;
284    use std::sync::Arc;
285
286    use prost_types::value::Kind;
287    use prost_types::Value;
288
289    use google_cloud_googleapis::spanner::v1::struct_type::Field;
290    use google_cloud_googleapis::spanner::v1::{ResultSetMetadata, StructType};
291
292    use crate::reader::ResultSet;
293    use crate::row::{Row, TryFromValue};
294    use crate::statement::ToKind;
295
296    fn empty_rs() -> ResultSet {
297        ResultSet {
298            fields: Arc::new(vec![]),
299            index: Arc::new(Default::default()),
300            rows: Default::default(),
301            chunked_value: false,
302        }
303    }
304
305    fn field(name: &str) -> Field {
306        Field {
307            name: name.to_string(),
308            r#type: None,
309        }
310    }
311
312    fn value(to_kind: impl ToKind) -> Value {
313        Value {
314            kind: Some(to_kind.to_kind()),
315        }
316    }
317
318    fn assert_one_column(rs: &ResultSet) {
319        assert_eq!(rs.fields.len(), 1);
320        assert_eq!(rs.fields[0].name, "column1".to_string());
321        assert_eq!(*rs.index.get("column1").unwrap(), 0);
322    }
323
324    fn assert_multi_column(rs: &ResultSet) {
325        assert_eq!(rs.fields.len(), 2);
326        assert_eq!(rs.fields[0].name, "column1".to_string());
327        assert_eq!(rs.fields[1].name, "column2".to_string());
328        assert_eq!(*rs.index.get("column1").unwrap(), 0);
329        assert_eq!(*rs.index.get("column2").unwrap(), 1);
330    }
331
332    fn assert_some_one_column<T: TryFromValue + std::cmp::PartialEq + std::fmt::Debug>(row: Option<Row>, v: T) {
333        assert!(row.is_some());
334        assert_eq!(v, row.unwrap().column::<T>(0).unwrap());
335    }
336
337    fn assert_some_multi_column<
338        T1: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
339        T2: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
340    >(
341        row: Option<Row>,
342        v1: T1,
343        v2: T2,
344    ) {
345        assert!(row.is_some());
346        let v = row.unwrap();
347        assert_eq!(v1, v.column::<T1>(0).unwrap());
348        assert_eq!(v2, v.column::<T2>(1).unwrap());
349    }
350
351    #[test]
352    fn test_rs_next_empty() {
353        let mut rs = ResultSet {
354            fields: Arc::new(vec![field("column1")]),
355            index: Arc::new(Default::default()),
356            rows: Default::default(),
357            chunked_value: false,
358        };
359        assert!(rs.next().is_none());
360    }
361
362    #[test]
363    fn test_rs_next_record_chunked_or_not() {
364        let rs = |values| ResultSet {
365            fields: Arc::new(vec![field("column1"), field("column2")]),
366            index: Arc::new(Default::default()),
367            rows: VecDeque::from(values),
368            chunked_value: false,
369        };
370        let mut rs1 = rs(vec![value("value1")]);
371        assert!(rs1.next().is_none());
372        let mut rs2 = rs(vec![value("value1"), value("value2")]);
373        assert_eq!(rs2.next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
374    }
375
376    #[test]
377    fn test_rs_next_value_chunked_or_not() {
378        let rs = |chunked_value| ResultSet {
379            fields: Arc::new(vec![field("column1"), field("column2")]),
380            index: Arc::new(Default::default()),
381            rows: VecDeque::from(vec![value("value1"), value("value2")]),
382            chunked_value,
383        };
384        assert!(rs(true).next().is_none());
385        assert_eq!(rs(false).next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
386    }
387
388    #[test]
389    fn test_rs_next_plural_record_one_column() {
390        let rs = |chunked_value| ResultSet {
391            fields: Arc::new(vec![field("column1")]),
392            index: Arc::new(Default::default()),
393            rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
394            chunked_value,
395        };
396        let mut incomplete = rs(true);
397        assert!(incomplete.next().is_some());
398        assert!(incomplete.next().is_some());
399        assert!(incomplete.next().is_none());
400        let mut complete = rs(false);
401        assert!(complete.next().is_some());
402        assert!(complete.next().is_some());
403        assert!(complete.next().is_some());
404        assert!(complete.next().is_none());
405    }
406
407    #[test]
408    fn test_rs_next_plural_record_multi_column() {
409        let rs = |chunked_value| ResultSet {
410            fields: Arc::new(vec![field("column1"), field("column2")]),
411            index: Arc::new(Default::default()),
412            rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
413            chunked_value,
414        };
415        let mut incomplete = rs(true);
416        assert_eq!(incomplete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
417        assert!(incomplete.next().is_none());
418        let mut complete = rs(false);
419        assert_eq!(complete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
420        assert!(incomplete.next().is_none());
421    }
422
423    #[test]
424    fn test_rs_merge_string_value() {
425        let result = ResultSet::merge(value("val"), value("ue1"));
426        assert!(result.is_ok());
427        let kind = result.unwrap().kind.unwrap();
428        match kind {
429            Kind::StringValue(v) => assert_eq!(v, "value1".to_string()),
430            _ => unreachable!("must be string value"),
431        }
432    }
433
434    #[test]
435    fn test_rs_merge_list_value() {
436        let previous_last = value(vec!["value1-1", "value1-2", "val"]);
437        let current_first = value(vec!["ue1-3", "value2-1", "valu"]);
438        let result = ResultSet::merge(previous_last, current_first);
439        assert!(result.is_ok());
440        let kind = result.unwrap().kind.unwrap();
441        match kind {
442            Kind::ListValue(v) => {
443                assert_eq!(v.values.len(), 5);
444                match v.values[0].kind.as_ref().unwrap() {
445                    Kind::StringValue(v) => assert_eq!(*v, "value1-1".to_string()),
446                    _ => unreachable!("must be string value"),
447                };
448                match v.values[1].kind.as_ref().unwrap() {
449                    Kind::StringValue(v) => assert_eq!(*v, "value1-2".to_string()),
450                    _ => unreachable!("must be string value"),
451                };
452                match v.values[2].kind.as_ref().unwrap() {
453                    Kind::StringValue(v) => assert_eq!(*v, "value1-3".to_string()),
454                    _ => unreachable!("must be string value"),
455                };
456                match v.values[3].kind.as_ref().unwrap() {
457                    Kind::StringValue(v) => assert_eq!(*v, "value2-1".to_string()),
458                    _ => unreachable!("must be string value"),
459                }
460                match v.values[4].kind.as_ref().unwrap() {
461                    Kind::StringValue(v) => assert_eq!(*v, "valu".to_string()),
462                    _ => unreachable!("must be string value"),
463                }
464            }
465            _ => unreachable!("must be string value"),
466        }
467    }
468
469    #[test]
470    fn test_rs_add_one_column_no_chunked_value() {
471        let mut rs = empty_rs();
472        let metadata = Some(ResultSetMetadata {
473            row_type: Some(StructType {
474                fields: vec![field("column1")],
475            }),
476            transaction: None,
477            undeclared_parameters: None,
478        });
479        let values = vec![value("value1"), value("value2"), value("value3")];
480        assert!(rs.add(metadata, values, false).unwrap());
481        assert_eq!(rs.rows.len(), 3);
482        assert_one_column(&rs);
483        assert!(!rs.chunked_value);
484
485        assert_some_one_column(rs.next(), "value1".to_string());
486        assert_some_one_column(rs.next(), "value2".to_string());
487        assert_some_one_column(rs.next(), "value3".to_string());
488        assert!(rs.next().is_none());
489    }
490
491    #[test]
492    fn test_rs_add_multi_column_no_chunked_value() {
493        let mut rs = empty_rs();
494        let metadata = Some(ResultSetMetadata {
495            row_type: Some(StructType {
496                fields: vec![field("column1"), field("column2")],
497            }),
498            transaction: None,
499            undeclared_parameters: None,
500        });
501        let values = vec![value("value1"), value("value2"), value("value3")];
502        assert!(rs.add(metadata, values, false).unwrap());
503        assert_eq!(rs.rows.len(), 3);
504        assert_multi_column(&rs);
505        assert!(!rs.chunked_value);
506
507        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
508        assert!(rs.next().is_none());
509    }
510
511    #[test]
512    fn test_rs_add_multi_column_no_chunked_value_just() {
513        let mut rs = empty_rs();
514        let metadata = Some(ResultSetMetadata {
515            row_type: Some(StructType {
516                fields: vec![field("column1"), field("column2")],
517            }),
518            transaction: None,
519            undeclared_parameters: None,
520        });
521        let values = vec![value("value1"), value("value2"), value("value3"), value("value4")];
522        assert!(rs.add(metadata, values, false).unwrap());
523        assert_eq!(rs.rows.len(), 4);
524        assert_multi_column(&rs);
525        assert!(!rs.chunked_value);
526
527        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
528        assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
529        assert!(rs.next().is_none());
530    }
531
532    #[test]
533    fn test_rs_add_one_column_chunked_value() {
534        let mut rs = empty_rs();
535        let metadata = Some(ResultSetMetadata {
536            row_type: Some(StructType {
537                fields: vec![field("column1")],
538            }),
539            transaction: None,
540            undeclared_parameters: None,
541        });
542        let values = vec![value("value1"), value("value2"), value("val")];
543        assert!(rs.add(metadata.clone(), values, true).unwrap());
544        assert_eq!(rs.rows.len(), 3);
545        assert_one_column(&rs);
546        assert!(rs.chunked_value);
547
548        assert_some_one_column(rs.next(), "value1".to_string());
549        assert_some_one_column(rs.next(), "value2".to_string());
550        assert!(rs.next().is_none());
551
552        // add next stream data
553        assert!(rs.add(metadata, vec![value("ue3")], false).unwrap());
554        assert!(!rs.chunked_value);
555        assert_eq!(rs.rows.len(), 1);
556        assert_some_one_column(rs.next(), "value3".to_string());
557        assert!(rs.next().is_none());
558    }
559
560    #[test]
561    fn test_rs_add_multi_column_chunked_value() {
562        let mut rs = empty_rs();
563        let metadata = Some(ResultSetMetadata {
564            row_type: Some(StructType {
565                fields: vec![field("column1"), field("column2")],
566            }),
567            transaction: None,
568            undeclared_parameters: None,
569        });
570        let values = vec![value("value1"), value("value2"), value("val")];
571        assert!(rs.add(metadata.clone(), values, true).unwrap());
572        assert_eq!(rs.rows.len(), 3);
573        assert_multi_column(&rs);
574        assert!(rs.chunked_value);
575
576        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
577        assert!(rs.next().is_none());
578
579        // add next stream data
580        assert!(rs.add(metadata.clone(), vec![value("ue3")], false).unwrap());
581        assert!(!rs.chunked_value);
582        assert_eq!(rs.rows.len(), 1);
583        assert!(rs.next().is_none());
584
585        // add next stream data
586        assert!(rs.add(metadata, vec![value("value4")], false).unwrap());
587        assert!(!rs.chunked_value);
588        assert_eq!(rs.rows.len(), 2);
589        assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
590    }
591
592    #[test]
593    fn test_rs_add_multi_column_no_chunked_value_list_value() {
594        let mut rs = empty_rs();
595        let metadata = Some(ResultSetMetadata {
596            row_type: Some(StructType {
597                fields: vec![field("column1"), field("column2")],
598            }),
599            transaction: None,
600            undeclared_parameters: None,
601        });
602        let values = vec![value(vec!["value1-1", "value1-2"])];
603        assert!(rs.add(metadata.clone(), values, false).unwrap());
604        assert_eq!(rs.rows.len(), 1);
605        assert_multi_column(&rs);
606        assert!(!rs.chunked_value);
607        assert!(rs.next().is_none());
608        assert!(rs.add(metadata, vec![value(vec!["value2-1"])], false).unwrap());
609        assert!(!rs.chunked_value);
610        assert_eq!(rs.rows.len(), 2);
611        assert_some_multi_column(
612            rs.next(),
613            vec!["value1-1".to_string(), "value1-2".to_string()],
614            vec!["value2-1".to_string()],
615        );
616        assert!(rs.next().is_none());
617    }
618
619    #[test]
620    fn test_rs_add_multi_column_chunked_value_list_value() {
621        let mut rs = empty_rs();
622        let metadata = Some(ResultSetMetadata {
623            row_type: Some(StructType {
624                fields: vec![field("column1"), field("column2")],
625            }),
626            transaction: None,
627            undeclared_parameters: None,
628        });
629        let values = vec![value(vec!["value1-1", "value1-2"]), value(vec!["value2-"])];
630        assert!(rs.add(metadata.clone(), values, true).unwrap());
631        assert_eq!(rs.rows.len(), 2);
632        assert_multi_column(&rs);
633        assert!(rs.chunked_value);
634        assert!(rs.next().is_none());
635
636        // add next stream data
637        assert!(rs.add(metadata.clone(), vec![value(vec!["1", "valu"])], true).unwrap());
638        assert!(rs.chunked_value);
639        assert_eq!(rs.rows.len(), 2);
640        assert!(rs.next().is_none());
641
642        // add next stream data
643        assert!(rs.add(metadata, vec![value(vec!["e2-2"])], false).unwrap());
644        assert!(!rs.chunked_value);
645        assert_eq!(rs.rows.len(), 2);
646        assert_some_multi_column(
647            rs.next(),
648            vec!["value1-1".to_string(), "value1-2".to_string()],
649            vec!["value2-1".to_string(), "value2-2".to_string()],
650        );
651        assert!(rs.next().is_none());
652    }
653
654    #[test]
655    fn test_rs_add_multi_column_chunked_value_list_and_string_value() {
656        let mut rs = empty_rs();
657        let metadata = Some(ResultSetMetadata {
658            row_type: Some(StructType {
659                fields: vec![field("column1"), field("column2")],
660            }),
661            transaction: None,
662            undeclared_parameters: None,
663        });
664        let values = vec![value(vec!["value1-1", "value1-2"]), value("va")];
665        assert!(rs.add(metadata.clone(), values, true).unwrap());
666        assert_eq!(rs.rows.len(), 2);
667        assert_multi_column(&rs);
668        assert!(rs.chunked_value);
669        assert!(rs.next().is_none());
670
671        // add next stream data
672        assert!(rs
673            .add(metadata.clone(), vec![value("lueA"), value(vec!["valu"])], true)
674            .unwrap());
675        assert!(rs.chunked_value);
676        assert_eq!(rs.rows.len(), 3);
677        assert_some_multi_column(
678            rs.next(),
679            vec!["value1-1".to_string(), "value1-2".to_string()],
680            "valueA".to_string(),
681        );
682        assert!(rs.next().is_none());
683
684        // add next stream data
685        assert!(rs
686            .add(metadata.clone(), vec![value(vec!["e2-1", "value2-2"])], false)
687            .unwrap());
688        assert!(!rs.chunked_value);
689        assert_eq!(rs.rows.len(), 1);
690        assert!(rs.next().is_none());
691
692        // add next stream data
693        assert!(rs.add(metadata.clone(), vec![value("value")], true).unwrap());
694        assert!(rs.chunked_value);
695        assert_eq!(rs.rows.len(), 2);
696        assert!(rs.next().is_none());
697
698        // add next stream data
699        assert!(rs.add(metadata, vec![value("B")], false).unwrap());
700        assert!(!rs.chunked_value);
701        assert_eq!(rs.rows.len(), 2);
702        assert_some_multi_column(
703            rs.next(),
704            vec!["value2-1".to_string(), "value2-2".to_string()],
705            "valueB".to_string(),
706        );
707        assert!(rs.next().is_none());
708    }
709}