1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3
4use prost_types::{value::Kind, Value};
5
6use google_cloud_gax::grpc::{Code, Response, Status, Streaming};
7use google_cloud_googleapis::spanner::v1::struct_type::Field;
8use google_cloud_googleapis::spanner::v1::{
9 ExecuteSqlRequest, PartialResultSet, ReadRequest, ResultSetMetadata, ResultSetStats,
10};
11
12use crate::row::Row;
13use crate::session::SessionHandle;
14use crate::transaction::CallOptions;
15
16pub trait Reader: Send + Sync {
17 fn read(
18 &self,
19 session: &mut SessionHandle,
20 option: Option<CallOptions>,
21 disable_route_to_leader: bool,
22 ) -> impl std::future::Future<Output = Result<Response<Streaming<PartialResultSet>>, Status>> + Send;
23
24 fn update_token(&mut self, resume_token: Vec<u8>);
25
26 fn can_resume(&self) -> bool;
27}
28
29pub struct StatementReader {
30 pub enable_resume: bool,
31 pub request: ExecuteSqlRequest,
32}
33
34impl Reader for StatementReader {
35 async fn read(
36 &self,
37 session: &mut SessionHandle,
38 option: Option<CallOptions>,
39 disable_route_to_leader: bool,
40 ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
41 let option = option.unwrap_or_default();
42 let client = &mut session.spanner_client;
43 let result = client
44 .execute_streaming_sql(self.request.clone(), disable_route_to_leader, option.retry)
45 .await;
46 session.invalidate_if_needed(result).await
47 }
48
49 fn update_token(&mut self, resume_token: Vec<u8>) {
50 self.request.resume_token = resume_token;
51 }
52
53 fn can_resume(&self) -> bool {
54 self.enable_resume && !self.request.resume_token.is_empty()
55 }
56}
57
58pub struct TableReader {
59 pub request: ReadRequest,
60}
61
62impl Reader for TableReader {
63 async fn read(
64 &self,
65 session: &mut SessionHandle,
66 option: Option<CallOptions>,
67 disable_route_to_leader: bool,
68 ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
69 let option = option.unwrap_or_default();
70 let client = &mut session.spanner_client;
71 let result = client
72 .streaming_read(self.request.clone(), disable_route_to_leader, option.retry)
73 .await;
74 session.invalidate_if_needed(result).await
75 }
76
77 fn update_token(&mut self, resume_token: Vec<u8>) {
78 self.request.resume_token = resume_token;
79 }
80
81 fn can_resume(&self) -> bool {
82 !self.request.resume_token.is_empty()
83 }
84}
85
86pub struct ResultSet {
87 fields: Arc<Vec<Field>>,
88 index: Arc<HashMap<String, usize>>,
89 rows: VecDeque<Value>,
90 chunked_value: bool,
91}
92
93impl ResultSet {
94 fn next(&mut self) -> Option<Row> {
95 if !self.rows.is_empty() {
96 let column_length = self.fields.len();
97 let target_record_is_chunked = self.rows.len() < column_length;
98 let target_record_contains_chunked_value = self.chunked_value && self.rows.len() == column_length;
99
100 if !target_record_is_chunked && !target_record_contains_chunked_value {
101 let mut values = Vec::with_capacity(column_length);
103 for _ in 0..column_length {
104 values.push(self.rows.pop_front().unwrap());
105 }
106 return Some(Row::new(Arc::clone(&self.index), Arc::clone(&self.fields), values));
107 }
108 }
109 None
110 }
111
112 fn merge(previous_last: Value, current_first: Value) -> Result<Value, Status> {
114 match previous_last.kind.unwrap() {
115 Kind::StringValue(last) => match current_first.kind.unwrap() {
116 Kind::StringValue(first) => {
117 tracing::trace!("previous_last={}, current_first={}", &last, first);
118 Ok(Value {
119 kind: Some(Kind::StringValue(last + &first)),
120 })
121 }
122 _ => Err(Status::new(
123 Code::Internal,
124 "chunks kind mismatch: current_first must be StringKind",
125 )),
126 },
127 Kind::ListValue(mut last) => match current_first.kind.unwrap() {
128 Kind::ListValue(mut first) => {
129 let first_value_of_current = first.values.remove(0);
130 let merged = match last.values.pop() {
131 Some(last_value_of_previous) => {
132 ResultSet::merge(last_value_of_previous, first_value_of_current)?
133 }
134 None => first_value_of_current,
136 };
137 last.values.push(merged);
138 last.values.extend(first.values);
139 Ok(Value {
140 kind: Some(Kind::ListValue(last)),
141 })
142 }
143 _ => Err(Status::new(
144 Code::Internal,
145 "chunks kind mismatch: current_first must be ListValue",
146 )),
147 },
148 _ => Err(Status::new(
149 Code::Internal,
150 "previous_last kind mismatch: only StringValue and ListValue can be chunked",
151 )),
152 }
153 }
154
155 fn add(
156 &mut self,
157 metadata: Option<ResultSetMetadata>,
158 mut values: Vec<Value>,
159 chunked_value: bool,
160 ) -> Result<bool, Status> {
161 if self.fields.is_empty() {
163 if let Some(metadata) = metadata {
164 self.fields = metadata
165 .row_type
166 .map(|e| Arc::new(e.fields))
167 .ok_or_else(|| Status::new(Code::Internal, "no field metadata found"))?;
168 let mut index = HashMap::new();
170 for (i, f) in self.fields.iter().enumerate() {
171 index.insert(f.name.clone(), i);
172 }
173 self.index = Arc::new(index);
174 }
175 }
176
177 if self.chunked_value {
178 tracing::trace!("now chunked value found previous={}, current={}", self.rows.len(), values.len());
179 let merged = ResultSet::merge(self.rows.pop_back().unwrap(), values.remove(0))?;
181 self.rows.push_back(merged);
182 }
183 self.rows.extend(values);
184 self.chunked_value = chunked_value;
185 Ok(true)
186 }
187}
188
189pub struct RowIterator<'a, T>
190where
191 T: Reader,
192{
193 streaming: Streaming<PartialResultSet>,
194 session: &'a mut SessionHandle,
195 reader: T,
196 rs: ResultSet,
197 reader_option: Option<CallOptions>,
198 disable_route_to_leader: bool,
199 stats: Option<ResultSetStats>,
200}
201
202impl<'a, T> RowIterator<'a, T>
203where
204 T: Reader,
205{
206 pub(crate) async fn new(
207 session: &'a mut SessionHandle,
208 reader: T,
209 option: Option<CallOptions>,
210 disable_route_to_leader: bool,
211 ) -> Result<RowIterator<'a, T>, Status> {
212 let streaming = reader
213 .read(session, option, disable_route_to_leader)
214 .await?
215 .into_inner();
216 let rs = ResultSet {
217 fields: Arc::new(vec![]),
218 index: Arc::new(HashMap::new()),
219 rows: VecDeque::new(),
220 chunked_value: false,
221 };
222 Ok(Self {
223 streaming,
224 session,
225 reader,
226 rs,
227 reader_option: None,
228 disable_route_to_leader,
229 stats: None,
230 })
231 }
232
233 pub fn set_call_options(&mut self, option: CallOptions) {
234 self.reader_option = Some(option);
235 }
236
237 async fn try_recv(&mut self, option: Option<CallOptions>) -> Result<bool, Status> {
238 let maybe_result_set = match self.streaming.message().await {
240 Ok(s) => s,
241 Err(e) => {
242 if !self.reader.can_resume() {
243 return Err(e);
244 }
245 tracing::debug!("streaming error: {}. resume reading by resume_token", e);
246 let result = self
247 .reader
248 .read(self.session, option, self.disable_route_to_leader)
249 .await?;
250 self.streaming = result.into_inner();
251 self.streaming.message().await?
252 }
253 };
254
255 match maybe_result_set {
256 Some(result_set) => {
257 if result_set.values.is_empty() {
258 return Ok(false);
259 }
260 if !result_set.resume_token.is_empty() {
262 self.reader.update_token(result_set.resume_token);
263 }
264 if result_set.stats.is_some() {
266 self.stats = result_set.stats;
267 }
268 self.rs
269 .add(result_set.metadata, result_set.values, result_set.chunked_value)
270 }
271 None => Ok(false),
272 }
273 }
274
275 pub fn columns_metadata(&self) -> &Arc<Vec<Field>> {
277 &self.rs.fields
278 }
279
280 pub fn column_metadata(&self, column_name: &str) -> Option<(usize, Field)> {
281 for (i, val) in self.rs.fields.iter().enumerate() {
282 if val.name == column_name {
283 return Some((i, val.clone()));
284 }
285 }
286 None
287 }
288
289 pub fn stats(&self) -> Option<&ResultSetStats> {
293 self.stats.as_ref()
294 }
295
296 pub async fn next(&mut self) -> Result<Option<Row>, Status> {
299 loop {
300 let row = self.rs.next();
301 if row.is_some() {
302 return Ok(row);
303 }
304 if !self.try_recv(self.reader_option.clone()).await? {
306 return Ok(None);
307 }
308 }
309 }
310}
311
#[cfg(test)]
mod tests {
    //! Unit tests for `ResultSet`: row extraction, chunk merging and value buffering.

    use std::collections::VecDeque;
    use std::sync::Arc;

    use prost_types::value::Kind;
    use prost_types::Value;

    use google_cloud_googleapis::spanner::v1::struct_type::Field;
    use google_cloud_googleapis::spanner::v1::{ResultSetMetadata, StructType};

    use crate::reader::ResultSet;
    use crate::row::{Row, TryFromValue};
    use crate::statement::ToKind;

    /// A result set with no metadata and no buffered values.
    fn empty_rs() -> ResultSet {
        ResultSet {
            fields: Arc::new(vec![]),
            index: Arc::new(Default::default()),
            rows: Default::default(),
            chunked_value: false,
        }
    }

    /// An untyped field with the given name.
    fn field(name: &str) -> Field {
        Field {
            name: name.to_string(),
            r#type: None,
        }
    }

    /// Wraps anything convertible to a `Kind` into a `Value`.
    fn value(to_kind: impl ToKind) -> Value {
        Value {
            kind: Some(to_kind.to_kind()),
        }
    }

    /// Metadata describing untyped columns with the given names.
    fn metadata(names: &[&str]) -> Option<ResultSetMetadata> {
        Some(ResultSetMetadata {
            row_type: Some(StructType {
                fields: names.iter().map(|name| field(name)).collect(),
            }),
            transaction: None,
            undeclared_parameters: None,
        })
    }

    /// Asserts that `actual` is a string value equal to `expected`.
    fn assert_string_value(actual: &Value, expected: &str) {
        match actual.kind.as_ref() {
            Some(Kind::StringValue(v)) => assert_eq!(v.as_str(), expected),
            _ => panic!("must be string value"),
        }
    }

    fn assert_one_column(rs: &ResultSet) {
        assert_eq!(rs.fields.len(), 1);
        assert_eq!(rs.fields[0].name, "column1".to_string());
        assert_eq!(*rs.index.get("column1").unwrap(), 0);
    }

    fn assert_multi_column(rs: &ResultSet) {
        assert_eq!(rs.fields.len(), 2);
        assert_eq!(rs.fields[0].name, "column1".to_string());
        assert_eq!(rs.fields[1].name, "column2".to_string());
        assert_eq!(*rs.index.get("column1").unwrap(), 0);
        assert_eq!(*rs.index.get("column2").unwrap(), 1);
    }

    fn assert_some_one_column<T: TryFromValue + std::cmp::PartialEq + std::fmt::Debug>(row: Option<Row>, v: T) {
        assert!(row.is_some());
        assert_eq!(v, row.unwrap().column::<T>(0).unwrap());
    }

    fn assert_some_multi_column<
        T1: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
        T2: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
    >(
        row: Option<Row>,
        v1: T1,
        v2: T2,
    ) {
        assert!(row.is_some());
        let v = row.unwrap();
        assert_eq!(v1, v.column::<T1>(0).unwrap());
        assert_eq!(v2, v.column::<T2>(1).unwrap());
    }

    #[test]
    fn test_rs_next_empty() {
        let mut rs = ResultSet {
            fields: Arc::new(vec![field("column1")]),
            index: Arc::new(Default::default()),
            rows: Default::default(),
            chunked_value: false,
        };
        assert!(rs.next().is_none());
    }

    #[test]
    fn test_rs_next_record_chunked_or_not() {
        let rs = |values| ResultSet {
            fields: Arc::new(vec![field("column1"), field("column2")]),
            index: Arc::new(Default::default()),
            rows: VecDeque::from(values),
            chunked_value: false,
        };
        // One value for two columns: the record itself is incomplete.
        let mut rs1 = rs(vec![value("value1")]);
        assert!(rs1.next().is_none());
        let mut rs2 = rs(vec![value("value1"), value("value2")]);
        assert_eq!(rs2.next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
    }

    #[test]
    fn test_rs_next_value_chunked_or_not() {
        let rs = |chunked_value| ResultSet {
            fields: Arc::new(vec![field("column1"), field("column2")]),
            index: Arc::new(Default::default()),
            rows: VecDeque::from(vec![value("value1"), value("value2")]),
            chunked_value,
        };
        // A full record whose last value is chunked must not be returned yet.
        assert!(rs(true).next().is_none());
        assert_eq!(rs(false).next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
    }

    #[test]
    fn test_rs_next_plural_record_one_column() {
        let rs = |chunked_value| ResultSet {
            fields: Arc::new(vec![field("column1")]),
            index: Arc::new(Default::default()),
            rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
            chunked_value,
        };
        let mut incomplete = rs(true);
        assert!(incomplete.next().is_some());
        assert!(incomplete.next().is_some());
        assert!(incomplete.next().is_none());
        let mut complete = rs(false);
        assert!(complete.next().is_some());
        assert!(complete.next().is_some());
        assert!(complete.next().is_some());
        assert!(complete.next().is_none());
    }

    #[test]
    fn test_rs_next_plural_record_multi_column() {
        let rs = |chunked_value| ResultSet {
            fields: Arc::new(vec![field("column1"), field("column2")]),
            index: Arc::new(Default::default()),
            rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
            chunked_value,
        };
        let mut incomplete = rs(true);
        assert_eq!(incomplete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
        assert!(incomplete.next().is_none());
        let mut complete = rs(false);
        assert_eq!(complete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
        // Previously this re-checked `incomplete`, leaving `complete` unverified.
        assert!(complete.next().is_none());
    }

    #[test]
    fn test_rs_merge_string_value() {
        let result = ResultSet::merge(value("val"), value("ue1"));
        assert!(result.is_ok());
        assert_string_value(&result.unwrap(), "value1");
    }

    #[test]
    fn test_rs_merge_list_value() {
        let previous_last = value(vec!["value1-1", "value1-2", "val"]);
        let current_first = value(vec!["ue1-3", "value2-1", "valu"]);
        let result = ResultSet::merge(previous_last, current_first);
        assert!(result.is_ok());
        match result.unwrap().kind.unwrap() {
            Kind::ListValue(v) => {
                let expected = ["value1-1", "value1-2", "value1-3", "value2-1", "valu"];
                assert_eq!(v.values.len(), expected.len());
                for (actual, expected) in v.values.iter().zip(expected) {
                    assert_string_value(actual, expected);
                }
            }
            _ => panic!("must be list value"),
        }
    }

    #[test]
    fn test_rs_add_one_column_no_chunked_value() {
        let mut rs = empty_rs();
        let values = vec![value("value1"), value("value2"), value("value3")];
        assert!(rs.add(metadata(&["column1"]), values, false).unwrap());
        assert_eq!(rs.rows.len(), 3);
        assert_one_column(&rs);
        assert!(!rs.chunked_value);

        assert_some_one_column(rs.next(), "value1".to_string());
        assert_some_one_column(rs.next(), "value2".to_string());
        assert_some_one_column(rs.next(), "value3".to_string());
        assert!(rs.next().is_none());
    }

    #[test]
    fn test_rs_add_multi_column_no_chunked_value() {
        let mut rs = empty_rs();
        let values = vec![value("value1"), value("value2"), value("value3")];
        assert!(rs.add(metadata(&["column1", "column2"]), values, false).unwrap());
        assert_eq!(rs.rows.len(), 3);
        assert_multi_column(&rs);
        assert!(!rs.chunked_value);

        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
        assert!(rs.next().is_none());
    }

    #[test]
    fn test_rs_add_multi_column_no_chunked_value_just() {
        let mut rs = empty_rs();
        let values = vec![value("value1"), value("value2"), value("value3"), value("value4")];
        assert!(rs.add(metadata(&["column1", "column2"]), values, false).unwrap());
        assert_eq!(rs.rows.len(), 4);
        assert_multi_column(&rs);
        assert!(!rs.chunked_value);

        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
        assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
        assert!(rs.next().is_none());
    }

    #[test]
    fn test_rs_add_one_column_chunked_value() {
        let mut rs = empty_rs();
        let metadata = metadata(&["column1"]);
        let values = vec![value("value1"), value("value2"), value("val")];
        assert!(rs.add(metadata.clone(), values, true).unwrap());
        assert_eq!(rs.rows.len(), 3);
        assert_one_column(&rs);
        assert!(rs.chunked_value);

        assert_some_one_column(rs.next(), "value1".to_string());
        assert_some_one_column(rs.next(), "value2".to_string());
        assert!(rs.next().is_none());

        // The continuation completes the pending value.
        assert!(rs.add(metadata, vec![value("ue3")], false).unwrap());
        assert!(!rs.chunked_value);
        assert_eq!(rs.rows.len(), 1);
        assert_some_one_column(rs.next(), "value3".to_string());
        assert!(rs.next().is_none());
    }

    #[test]
    fn test_rs_add_multi_column_chunked_value() {
        let mut rs = empty_rs();
        let metadata = metadata(&["column1", "column2"]);
        let values = vec![value("value1"), value("value2"), value("val")];
        assert!(rs.add(metadata.clone(), values, true).unwrap());
        assert_eq!(rs.rows.len(), 3);
        assert_multi_column(&rs);
        assert!(rs.chunked_value);

        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
        assert!(rs.next().is_none());

        // The value is complete now, but the record still lacks its second column.
        assert!(rs.add(metadata.clone(), vec![value("ue3")], false).unwrap());
        assert!(!rs.chunked_value);
        assert_eq!(rs.rows.len(), 1);
        assert!(rs.next().is_none());

        assert!(rs.add(metadata, vec![value("value4")], false).unwrap());
        assert!(!rs.chunked_value);
        assert_eq!(rs.rows.len(), 2);
        assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
    }

    #[test]
    fn test_rs_add_multi_column_no_chunked_value_list_value() {
        let mut rs = empty_rs();
        let metadata = metadata(&["column1", "column2"]);
        let values = vec![value(vec!["value1-1", "value1-2"])];
        assert!(rs.add(metadata.clone(), values, false).unwrap());
        assert_eq!(rs.rows.len(), 1);
        assert_multi_column(&rs);
        assert!(!rs.chunked_value);
        assert!(rs.next().is_none());

        assert!(rs.add(metadata, vec![value(vec!["value2-1"])], false).unwrap());
        assert!(!rs.chunked_value);
        assert_eq!(rs.rows.len(), 2);
        assert_some_multi_column(
            rs.next(),
            vec!["value1-1".to_string(), "value1-2".to_string()],
            vec!["value2-1".to_string()],
        );
        assert!(rs.next().is_none());
    }

    #[test]
    fn test_rs_add_multi_column_chunked_value_list_value() {
        let mut rs = empty_rs();
        let metadata = metadata(&["column1", "column2"]);
        let values = vec![value(vec!["value1-1", "value1-2"]), value(vec!["value2-"])];
        assert!(rs.add(metadata.clone(), values, true).unwrap());
        assert_eq!(rs.rows.len(), 2);
        assert_multi_column(&rs);
        assert!(rs.chunked_value);
        assert!(rs.next().is_none());

        // The continuation is itself chunked again.
        assert!(rs.add(metadata.clone(), vec![value(vec!["1", "valu"])], true).unwrap());
        assert!(rs.chunked_value);
        assert_eq!(rs.rows.len(), 2);
        assert!(rs.next().is_none());

        assert!(rs.add(metadata, vec![value(vec!["e2-2"])], false).unwrap());
        assert!(!rs.chunked_value);
        assert_eq!(rs.rows.len(), 2);
        assert_some_multi_column(
            rs.next(),
            vec!["value1-1".to_string(), "value1-2".to_string()],
            vec!["value2-1".to_string(), "value2-2".to_string()],
        );
        assert!(rs.next().is_none());
    }

    #[test]
    fn test_rs_add_multi_column_chunked_value_list_and_string_value() {
        let mut rs = empty_rs();
        let metadata = metadata(&["column1", "column2"]);
        let values = vec![value(vec!["value1-1", "value1-2"]), value("va")];
        assert!(rs.add(metadata.clone(), values, true).unwrap());
        assert_eq!(rs.rows.len(), 2);
        assert_multi_column(&rs);
        assert!(rs.chunked_value);
        assert!(rs.next().is_none());

        assert!(rs
            .add(metadata.clone(), vec![value("lueA"), value(vec!["valu"])], true)
            .unwrap());
        assert!(rs.chunked_value);
        assert_eq!(rs.rows.len(), 3);
        assert_some_multi_column(
            rs.next(),
            vec!["value1-1".to_string(), "value1-2".to_string()],
            "valueA".to_string(),
        );
        assert!(rs.next().is_none());

        assert!(rs
            .add(metadata.clone(), vec![value(vec!["e2-1", "value2-2"])], false)
            .unwrap());
        assert!(!rs.chunked_value);
        assert_eq!(rs.rows.len(), 1);
        assert!(rs.next().is_none());

        assert!(rs.add(metadata.clone(), vec![value("value")], true).unwrap());
        assert!(rs.chunked_value);
        assert_eq!(rs.rows.len(), 2);
        assert!(rs.next().is_none());

        assert!(rs.add(metadata, vec![value("B")], false).unwrap());
        assert!(!rs.chunked_value);
        assert_eq!(rs.rows.len(), 2);
        assert_some_multi_column(
            rs.next(),
            vec!["value2-1".to_string(), "value2-2".to_string()],
            "valueB".to_string(),
        );
        assert!(rs.next().is_none());
    }
}