1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3
4use prost::Message;
5use prost_types::{value::Kind, Value};
6
7use google_cloud_gax::grpc::{Code, Response, Status, Streaming};
8use google_cloud_googleapis::spanner::v1::struct_type::Field;
9use google_cloud_googleapis::spanner::v1::{
10 ExecuteSqlRequest, PartialResultSet, ReadRequest, ResultSetMetadata, ResultSetStats,
11};
12
13use crate::retry::StreamingRetry;
14use crate::row::Row;
15use crate::session::SessionHandle;
16use crate::transaction::CallOptions;
17
18pub trait Reader: Send + Sync {
19 fn read(
20 &self,
21 session: &mut SessionHandle,
22 option: Option<CallOptions>,
23 disable_route_to_leader: bool,
24 ) -> impl std::future::Future<Output = Result<Response<Streaming<PartialResultSet>>, Status>> + Send;
25
26 fn update_token(&mut self, resume_token: Vec<u8>);
27
28 fn can_resume(&self) -> bool;
29}
30
31pub struct StatementReader {
32 pub enable_resume: bool,
33 pub request: ExecuteSqlRequest,
34}
35
36impl Reader for StatementReader {
37 async fn read(
38 &self,
39 session: &mut SessionHandle,
40 option: Option<CallOptions>,
41 disable_route_to_leader: bool,
42 ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
43 let option = option.unwrap_or_default();
44 let client = &mut session.spanner_client;
45 let result = client
46 .execute_streaming_sql(self.request.clone(), disable_route_to_leader, option.retry)
47 .await;
48 session.invalidate_if_needed(result).await
49 }
50
51 fn update_token(&mut self, resume_token: Vec<u8>) {
52 self.request.resume_token = resume_token;
53 }
54
55 fn can_resume(&self) -> bool {
56 self.enable_resume && !self.request.resume_token.is_empty()
57 }
58}
59
60pub struct TableReader {
61 pub request: ReadRequest,
62}
63
64impl Reader for TableReader {
65 async fn read(
66 &self,
67 session: &mut SessionHandle,
68 option: Option<CallOptions>,
69 disable_route_to_leader: bool,
70 ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
71 let option = option.unwrap_or_default();
72 let client = &mut session.spanner_client;
73 let result = client
74 .streaming_read(self.request.clone(), disable_route_to_leader, option.retry)
75 .await;
76 session.invalidate_if_needed(result).await
77 }
78
79 fn update_token(&mut self, resume_token: Vec<u8>) {
80 self.request.resume_token = resume_token;
81 }
82
83 fn can_resume(&self) -> bool {
84 !self.request.resume_token.is_empty()
85 }
86}
87
88pub struct ResultSet {
89 fields: Arc<Vec<Field>>,
90 index: Arc<HashMap<String, usize>>,
91 rows: VecDeque<Value>,
92 chunked_value: bool,
93}
94
95const DEFAULT_MAX_BYTES_BETWEEN_RESUME_TOKENS: usize = 128 * 1024 * 1024;
96
97#[derive(Debug)]
98struct ResumablePartialResultSetBuffer {
99 pending: VecDeque<PartialResultSet>,
100 last_delivered_token: Vec<u8>,
101 observed_token: Vec<u8>,
102 bytes_between_tokens: usize,
103 max_bytes_between_tokens: usize,
104 unretryable: bool,
105}
106
107impl ResumablePartialResultSetBuffer {
108 fn new(max_bytes_between_tokens: usize) -> Self {
109 Self {
110 pending: VecDeque::new(),
111 last_delivered_token: Vec::new(),
112 observed_token: Vec::new(),
113 bytes_between_tokens: 0,
114 max_bytes_between_tokens,
115 unretryable: false,
116 }
117 }
118
119 fn push(&mut self, result_set: PartialResultSet) {
120 if !result_set.resume_token.is_empty() && result_set.resume_token != self.observed_token {
121 self.observed_token = result_set.resume_token.clone();
122 }
123
124 if !self.unretryable && self.observed_token == self.last_delivered_token {
125 self.bytes_between_tokens = self.bytes_between_tokens.saturating_add(result_set.encoded_len());
126 if self.bytes_between_tokens >= self.max_bytes_between_tokens {
127 self.unretryable = true;
128 }
129 }
130
131 self.pending.push_back(result_set);
132 }
133
134 fn pop_ready(&mut self, end_of_stream: bool) -> Option<PartialResultSet> {
135 if self.pending.is_empty() {
136 return None;
137 }
138
139 if self.unretryable || end_of_stream {
140 return self.pending.pop_front();
141 }
142
143 if self.observed_token != self.last_delivered_token {
144 let result_set = self.pending.pop_front();
145 if let Some(ref rs) = result_set {
146 if !rs.resume_token.is_empty() && rs.resume_token == self.observed_token {
147 self.last_delivered_token = self.observed_token.clone();
148 self.bytes_between_tokens = 0;
149 }
150 }
151 return result_set;
152 }
153
154 None
155 }
156
157 fn on_resumption(&mut self) {
158 self.pending.clear();
159 self.observed_token = self.last_delivered_token.clone();
160 self.bytes_between_tokens = 0;
161 self.unretryable = false;
162 }
163}
164
165impl ResultSet {
166 fn next(&mut self) -> Option<Row> {
167 if !self.rows.is_empty() {
168 let column_length = self.fields.len();
169 let target_record_is_chunked = self.rows.len() < column_length;
170 let target_record_contains_chunked_value = self.chunked_value && self.rows.len() == column_length;
171
172 if !target_record_is_chunked && !target_record_contains_chunked_value {
173 let mut values = Vec::with_capacity(column_length);
175 for _ in 0..column_length {
176 values.push(self.rows.pop_front().unwrap());
177 }
178 return Some(Row::new(Arc::clone(&self.index), Arc::clone(&self.fields), values));
179 }
180 }
181 None
182 }
183
184 fn merge(previous_last: Value, current_first: Value) -> Result<Value, Status> {
186 match previous_last.kind.unwrap() {
187 Kind::StringValue(last) => match current_first.kind.unwrap() {
188 Kind::StringValue(first) => {
189 tracing::trace!("previous_last={}, current_first={}", &last, first);
190 Ok(Value {
191 kind: Some(Kind::StringValue(last + &first)),
192 })
193 }
194 _ => Err(Status::new(
195 Code::Internal,
196 "chunks kind mismatch: current_first must be StringKind",
197 )),
198 },
199 Kind::ListValue(mut last) => match current_first.kind.unwrap() {
200 Kind::ListValue(mut first) => {
201 let first_value_of_current = first.values.remove(0);
202 let merged = match last.values.pop() {
203 Some(last_value_of_previous) => {
204 ResultSet::merge(last_value_of_previous, first_value_of_current)?
205 }
206 None => first_value_of_current,
208 };
209 last.values.push(merged);
210 last.values.extend(first.values);
211 Ok(Value {
212 kind: Some(Kind::ListValue(last)),
213 })
214 }
215 _ => Err(Status::new(
216 Code::Internal,
217 "chunks kind mismatch: current_first must be ListValue",
218 )),
219 },
220 _ => Err(Status::new(
221 Code::Internal,
222 "previous_last kind mismatch: only StringValue and ListValue can be chunked",
223 )),
224 }
225 }
226
227 fn add(
228 &mut self,
229 metadata: Option<ResultSetMetadata>,
230 mut values: Vec<Value>,
231 chunked_value: bool,
232 ) -> Result<bool, Status> {
233 if self.fields.is_empty() {
235 if let Some(metadata) = metadata {
236 self.fields = metadata
237 .row_type
238 .map(|e| Arc::new(e.fields))
239 .ok_or_else(|| Status::new(Code::Internal, "no field metadata found"))?;
240 let mut index = HashMap::new();
242 for (i, f) in self.fields.iter().enumerate() {
243 index.insert(f.name.clone(), i);
244 }
245 self.index = Arc::new(index);
246 }
247 }
248
249 if self.chunked_value {
250 tracing::trace!("now chunked value found previous={}, current={}", self.rows.len(), values.len());
251 let merged = ResultSet::merge(self.rows.pop_back().unwrap(), values.remove(0))?;
253 self.rows.push_back(merged);
254 }
255 self.rows.extend(values);
256 self.chunked_value = chunked_value;
257 Ok(true)
258 }
259
260 fn is_row_boundary(&self) -> bool {
261 if self.fields.is_empty() {
262 return self.rows.is_empty() && !self.chunked_value;
263 }
264 if self.chunked_value {
265 return false;
266 }
267 let columns = self.fields.len();
268 if columns == 0 {
269 return self.rows.is_empty();
270 }
271 self.rows.len().is_multiple_of(columns)
272 }
273}
274
275pub struct RowIterator<'a, T>
276where
277 T: Reader,
278{
279 streaming: Streaming<PartialResultSet>,
280 session: &'a mut SessionHandle,
281 reader: T,
282 rs: ResultSet,
283 reader_option: Option<CallOptions>,
284 disable_route_to_leader: bool,
285 stats: Option<ResultSetStats>,
286 prs_buffer: ResumablePartialResultSetBuffer,
287 resumable: bool,
288 end_of_stream: bool,
289 stream_retry: StreamingRetry,
290}
291
292impl<'a, T> RowIterator<'a, T>
293where
294 T: Reader,
295{
296 pub(crate) async fn new(
297 session: &'a mut SessionHandle,
298 reader: T,
299 option: Option<CallOptions>,
300 disable_route_to_leader: bool,
301 ) -> Result<RowIterator<'a, T>, Status> {
302 let streaming = reader
303 .read(session, option, disable_route_to_leader)
304 .await?
305 .into_inner();
306 let rs = ResultSet {
307 fields: Arc::new(vec![]),
308 index: Arc::new(HashMap::new()),
309 rows: VecDeque::new(),
310 chunked_value: false,
311 };
312 Ok(Self {
313 streaming,
314 session,
315 reader,
316 rs,
317 reader_option: None,
318 disable_route_to_leader,
319 stats: None,
320 prs_buffer: ResumablePartialResultSetBuffer::new(DEFAULT_MAX_BYTES_BETWEEN_RESUME_TOKENS),
321 resumable: true,
322 end_of_stream: false,
323 stream_retry: StreamingRetry::new(),
324 })
325 }
326
327 pub fn set_call_options(&mut self, option: CallOptions) {
328 self.reader_option = Some(option);
329 }
330
331 async fn try_recv(&mut self, option: Option<CallOptions>) -> Result<bool, Status> {
332 loop {
333 if let Some(result_set) = self.prs_buffer.pop_ready(self.end_of_stream) {
334 if result_set.values.is_empty() {
335 return Ok(false);
336 }
337 let resume_token_present = !result_set.resume_token.is_empty();
338 if resume_token_present {
340 self.reader.update_token(result_set.resume_token.clone());
341 }
342 if result_set.stats.is_some() {
344 self.stats = result_set.stats;
345 }
346 let added = self
347 .rs
348 .add(result_set.metadata, result_set.values, result_set.chunked_value)?;
349 if resume_token_present && !self.rs.is_row_boundary() {
350 return Err(Status::new(Code::FailedPrecondition, "resume token is not on a row boundary"));
351 }
352 return Ok(added);
353 }
354
355 if self.end_of_stream {
356 return Ok(false);
357 }
358
359 let received = match self.streaming.message().await {
360 Ok(s) => s,
361 Err(e) => {
362 if !self.reader.can_resume() || !self.resumable {
363 return Err(e);
364 }
365 tracing::debug!("streaming error: {}. resume reading by resume_token", e);
366 self.stream_retry.next(e).await?;
367 let call_option = option.clone();
368 let result = self
369 .reader
370 .read(self.session, call_option, self.disable_route_to_leader)
371 .await?;
372 self.streaming = result.into_inner();
373 self.prs_buffer.on_resumption();
374 continue;
375 }
376 };
377
378 match received {
379 Some(result_set) => {
380 if result_set.last {
381 self.end_of_stream = true;
382 }
383 self.prs_buffer.push(result_set);
384 if self.prs_buffer.unretryable {
385 self.resumable = false;
386 }
387 }
388 None => {
389 self.end_of_stream = true;
390 }
391 }
392 }
393 }
394
395 pub fn columns_metadata(&self) -> &Arc<Vec<Field>> {
397 &self.rs.fields
398 }
399
400 pub fn column_metadata(&self, column_name: &str) -> Option<(usize, Field)> {
401 for (i, val) in self.rs.fields.iter().enumerate() {
402 if val.name == column_name {
403 return Some((i, val.clone()));
404 }
405 }
406 None
407 }
408
409 pub fn stats(&self) -> Option<&ResultSetStats> {
413 self.stats.as_ref()
414 }
415
416 pub async fn next(&mut self) -> Result<Option<Row>, Status> {
419 loop {
420 let row = self.rs.next();
421 if row.is_some() {
422 return Ok(row);
423 }
424 if !self.try_recv(self.reader_option.clone()).await? {
426 return Ok(None);
427 }
428 }
429 }
430}
431
432#[cfg(test)]
433mod tests {
434 use std::collections::VecDeque;
435 use std::sync::Arc;
436
437 use prost_types::value::Kind;
438 use prost_types::Value;
439
440 use google_cloud_googleapis::spanner::v1::struct_type::Field;
441 use google_cloud_googleapis::spanner::v1::{PartialResultSet, ResultSetMetadata, StructType};
442
443 use crate::reader::ResultSet;
444 use crate::row::{Row, TryFromValue};
445 use crate::statement::ToKind;
446
447 fn empty_rs() -> ResultSet {
448 ResultSet {
449 fields: Arc::new(vec![]),
450 index: Arc::new(Default::default()),
451 rows: Default::default(),
452 chunked_value: false,
453 }
454 }
455
456 fn field(name: &str) -> Field {
457 Field {
458 name: name.to_string(),
459 r#type: None,
460 }
461 }
462
463 fn value(to_kind: impl ToKind) -> Value {
464 Value {
465 kind: Some(to_kind.to_kind()),
466 }
467 }
468
469 fn prs(values: Vec<Value>, resume_token: &str, chunked_value: bool) -> PartialResultSet {
470 PartialResultSet {
471 metadata: None,
472 values,
473 chunked_value,
474 resume_token: resume_token.as_bytes().to_vec(),
475 stats: None,
476 precommit_token: None,
477 last: false,
478 }
479 }
480
481 fn assert_one_column(rs: &ResultSet) {
482 assert_eq!(rs.fields.len(), 1);
483 assert_eq!(rs.fields[0].name, "column1".to_string());
484 assert_eq!(*rs.index.get("column1").unwrap(), 0);
485 }
486
487 fn assert_multi_column(rs: &ResultSet) {
488 assert_eq!(rs.fields.len(), 2);
489 assert_eq!(rs.fields[0].name, "column1".to_string());
490 assert_eq!(rs.fields[1].name, "column2".to_string());
491 assert_eq!(*rs.index.get("column1").unwrap(), 0);
492 assert_eq!(*rs.index.get("column2").unwrap(), 1);
493 }
494
495 fn assert_some_one_column<T: TryFromValue + std::cmp::PartialEq + std::fmt::Debug>(row: Option<Row>, v: T) {
496 assert!(row.is_some());
497 assert_eq!(v, row.unwrap().column::<T>(0).unwrap());
498 }
499
500 fn assert_some_multi_column<
501 T1: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
502 T2: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
503 >(
504 row: Option<Row>,
505 v1: T1,
506 v2: T2,
507 ) {
508 assert!(row.is_some());
509 let v = row.unwrap();
510 assert_eq!(v1, v.column::<T1>(0).unwrap());
511 assert_eq!(v2, v.column::<T2>(1).unwrap());
512 }
513
514 #[test]
515 fn test_rs_next_empty() {
516 let mut rs = ResultSet {
517 fields: Arc::new(vec![field("column1")]),
518 index: Arc::new(Default::default()),
519 rows: Default::default(),
520 chunked_value: false,
521 };
522 assert!(rs.next().is_none());
523 }
524
525 #[test]
526 fn test_rs_next_record_chunked_or_not() {
527 let rs = |values| ResultSet {
528 fields: Arc::new(vec![field("column1"), field("column2")]),
529 index: Arc::new(Default::default()),
530 rows: VecDeque::from(values),
531 chunked_value: false,
532 };
533 let mut rs1 = rs(vec![value("value1")]);
534 assert!(rs1.next().is_none());
535 let mut rs2 = rs(vec![value("value1"), value("value2")]);
536 assert_eq!(rs2.next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
537 }
538
539 #[test]
540 fn test_rs_next_value_chunked_or_not() {
541 let rs = |chunked_value| ResultSet {
542 fields: Arc::new(vec![field("column1"), field("column2")]),
543 index: Arc::new(Default::default()),
544 rows: VecDeque::from(vec![value("value1"), value("value2")]),
545 chunked_value,
546 };
547 assert!(rs(true).next().is_none());
548 assert_eq!(rs(false).next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
549 }
550
551 #[test]
552 fn test_rs_next_plural_record_one_column() {
553 let rs = |chunked_value| ResultSet {
554 fields: Arc::new(vec![field("column1")]),
555 index: Arc::new(Default::default()),
556 rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
557 chunked_value,
558 };
559 let mut incomplete = rs(true);
560 assert!(incomplete.next().is_some());
561 assert!(incomplete.next().is_some());
562 assert!(incomplete.next().is_none());
563 let mut complete = rs(false);
564 assert!(complete.next().is_some());
565 assert!(complete.next().is_some());
566 assert!(complete.next().is_some());
567 assert!(complete.next().is_none());
568 }
569
570 #[test]
571 fn test_rs_next_plural_record_multi_column() {
572 let rs = |chunked_value| ResultSet {
573 fields: Arc::new(vec![field("column1"), field("column2")]),
574 index: Arc::new(Default::default()),
575 rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
576 chunked_value,
577 };
578 let mut incomplete = rs(true);
579 assert_eq!(incomplete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
580 assert!(incomplete.next().is_none());
581 let mut complete = rs(false);
582 assert_eq!(complete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
583 assert!(incomplete.next().is_none());
584 }
585
586 #[test]
587 fn test_rs_merge_string_value() {
588 let result = ResultSet::merge(value("val"), value("ue1"));
589 assert!(result.is_ok());
590 let kind = result.unwrap().kind.unwrap();
591 match kind {
592 Kind::StringValue(v) => assert_eq!(v, "value1".to_string()),
593 _ => unreachable!("must be string value"),
594 }
595 }
596
597 #[test]
598 fn test_rs_merge_list_value() {
599 let previous_last = value(vec!["value1-1", "value1-2", "val"]);
600 let current_first = value(vec!["ue1-3", "value2-1", "valu"]);
601 let result = ResultSet::merge(previous_last, current_first);
602 assert!(result.is_ok());
603 let kind = result.unwrap().kind.unwrap();
604 match kind {
605 Kind::ListValue(v) => {
606 assert_eq!(v.values.len(), 5);
607 match v.values[0].kind.as_ref().unwrap() {
608 Kind::StringValue(v) => assert_eq!(*v, "value1-1".to_string()),
609 _ => unreachable!("must be string value"),
610 };
611 match v.values[1].kind.as_ref().unwrap() {
612 Kind::StringValue(v) => assert_eq!(*v, "value1-2".to_string()),
613 _ => unreachable!("must be string value"),
614 };
615 match v.values[2].kind.as_ref().unwrap() {
616 Kind::StringValue(v) => assert_eq!(*v, "value1-3".to_string()),
617 _ => unreachable!("must be string value"),
618 };
619 match v.values[3].kind.as_ref().unwrap() {
620 Kind::StringValue(v) => assert_eq!(*v, "value2-1".to_string()),
621 _ => unreachable!("must be string value"),
622 }
623 match v.values[4].kind.as_ref().unwrap() {
624 Kind::StringValue(v) => assert_eq!(*v, "valu".to_string()),
625 _ => unreachable!("must be string value"),
626 }
627 }
628 _ => unreachable!("must be string value"),
629 }
630 }
631
632 #[test]
633 fn test_rs_add_one_column_no_chunked_value() {
634 let mut rs = empty_rs();
635 let metadata = Some(ResultSetMetadata {
636 row_type: Some(StructType {
637 fields: vec![field("column1")],
638 }),
639 transaction: None,
640 undeclared_parameters: None,
641 });
642 let values = vec![value("value1"), value("value2"), value("value3")];
643 assert!(rs.add(metadata, values, false).unwrap());
644 assert_eq!(rs.rows.len(), 3);
645 assert_one_column(&rs);
646 assert!(!rs.chunked_value);
647
648 assert_some_one_column(rs.next(), "value1".to_string());
649 assert_some_one_column(rs.next(), "value2".to_string());
650 assert_some_one_column(rs.next(), "value3".to_string());
651 assert!(rs.next().is_none());
652 }
653
654 #[test]
655 fn test_rs_add_multi_column_no_chunked_value() {
656 let mut rs = empty_rs();
657 let metadata = Some(ResultSetMetadata {
658 row_type: Some(StructType {
659 fields: vec![field("column1"), field("column2")],
660 }),
661 transaction: None,
662 undeclared_parameters: None,
663 });
664 let values = vec![value("value1"), value("value2"), value("value3")];
665 assert!(rs.add(metadata, values, false).unwrap());
666 assert_eq!(rs.rows.len(), 3);
667 assert_multi_column(&rs);
668 assert!(!rs.chunked_value);
669
670 assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
671 assert!(rs.next().is_none());
672 }
673
674 #[test]
675 fn test_rs_add_multi_column_no_chunked_value_just() {
676 let mut rs = empty_rs();
677 let metadata = Some(ResultSetMetadata {
678 row_type: Some(StructType {
679 fields: vec![field("column1"), field("column2")],
680 }),
681 transaction: None,
682 undeclared_parameters: None,
683 });
684 let values = vec![value("value1"), value("value2"), value("value3"), value("value4")];
685 assert!(rs.add(metadata, values, false).unwrap());
686 assert_eq!(rs.rows.len(), 4);
687 assert_multi_column(&rs);
688 assert!(!rs.chunked_value);
689
690 assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
691 assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
692 assert!(rs.next().is_none());
693 }
694
695 #[test]
696 fn test_rs_add_one_column_chunked_value() {
697 let mut rs = empty_rs();
698 let metadata = Some(ResultSetMetadata {
699 row_type: Some(StructType {
700 fields: vec![field("column1")],
701 }),
702 transaction: None,
703 undeclared_parameters: None,
704 });
705 let values = vec![value("value1"), value("value2"), value("val")];
706 assert!(rs.add(metadata.clone(), values, true).unwrap());
707 assert_eq!(rs.rows.len(), 3);
708 assert_one_column(&rs);
709 assert!(rs.chunked_value);
710
711 assert_some_one_column(rs.next(), "value1".to_string());
712 assert_some_one_column(rs.next(), "value2".to_string());
713 assert!(rs.next().is_none());
714
715 assert!(rs.add(metadata, vec![value("ue3")], false).unwrap());
717 assert!(!rs.chunked_value);
718 assert_eq!(rs.rows.len(), 1);
719 assert_some_one_column(rs.next(), "value3".to_string());
720 assert!(rs.next().is_none());
721 }
722
723 #[test]
724 fn test_rs_add_multi_column_chunked_value() {
725 let mut rs = empty_rs();
726 let metadata = Some(ResultSetMetadata {
727 row_type: Some(StructType {
728 fields: vec![field("column1"), field("column2")],
729 }),
730 transaction: None,
731 undeclared_parameters: None,
732 });
733 let values = vec![value("value1"), value("value2"), value("val")];
734 assert!(rs.add(metadata.clone(), values, true).unwrap());
735 assert_eq!(rs.rows.len(), 3);
736 assert_multi_column(&rs);
737 assert!(rs.chunked_value);
738
739 assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
740 assert!(rs.next().is_none());
741
742 assert!(rs.add(metadata.clone(), vec![value("ue3")], false).unwrap());
744 assert!(!rs.chunked_value);
745 assert_eq!(rs.rows.len(), 1);
746 assert!(rs.next().is_none());
747
748 assert!(rs.add(metadata, vec![value("value4")], false).unwrap());
750 assert!(!rs.chunked_value);
751 assert_eq!(rs.rows.len(), 2);
752 assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
753 }
754
755 #[test]
756 fn test_rs_add_multi_column_no_chunked_value_list_value() {
757 let mut rs = empty_rs();
758 let metadata = Some(ResultSetMetadata {
759 row_type: Some(StructType {
760 fields: vec![field("column1"), field("column2")],
761 }),
762 transaction: None,
763 undeclared_parameters: None,
764 });
765 let values = vec![value(vec!["value1-1", "value1-2"])];
766 assert!(rs.add(metadata.clone(), values, false).unwrap());
767 assert_eq!(rs.rows.len(), 1);
768 assert_multi_column(&rs);
769 assert!(!rs.chunked_value);
770 assert!(rs.next().is_none());
771 assert!(rs.add(metadata, vec![value(vec!["value2-1"])], false).unwrap());
772 assert!(!rs.chunked_value);
773 assert_eq!(rs.rows.len(), 2);
774 assert_some_multi_column(
775 rs.next(),
776 vec!["value1-1".to_string(), "value1-2".to_string()],
777 vec!["value2-1".to_string()],
778 );
779 assert!(rs.next().is_none());
780 }
781
782 #[test]
783 fn test_rs_add_multi_column_chunked_value_list_value() {
784 let mut rs = empty_rs();
785 let metadata = Some(ResultSetMetadata {
786 row_type: Some(StructType {
787 fields: vec![field("column1"), field("column2")],
788 }),
789 transaction: None,
790 undeclared_parameters: None,
791 });
792 let values = vec![value(vec!["value1-1", "value1-2"]), value(vec!["value2-"])];
793 assert!(rs.add(metadata.clone(), values, true).unwrap());
794 assert_eq!(rs.rows.len(), 2);
795 assert_multi_column(&rs);
796 assert!(rs.chunked_value);
797 assert!(rs.next().is_none());
798
799 assert!(rs.add(metadata.clone(), vec![value(vec!["1", "valu"])], true).unwrap());
801 assert!(rs.chunked_value);
802 assert_eq!(rs.rows.len(), 2);
803 assert!(rs.next().is_none());
804
805 assert!(rs.add(metadata, vec![value(vec!["e2-2"])], false).unwrap());
807 assert!(!rs.chunked_value);
808 assert_eq!(rs.rows.len(), 2);
809 assert_some_multi_column(
810 rs.next(),
811 vec!["value1-1".to_string(), "value1-2".to_string()],
812 vec!["value2-1".to_string(), "value2-2".to_string()],
813 );
814 assert!(rs.next().is_none());
815 }
816
817 #[test]
818 fn test_rs_add_multi_column_chunked_value_list_and_string_value() {
819 let mut rs = empty_rs();
820 let metadata = Some(ResultSetMetadata {
821 row_type: Some(StructType {
822 fields: vec![field("column1"), field("column2")],
823 }),
824 transaction: None,
825 undeclared_parameters: None,
826 });
827 let values = vec![value(vec!["value1-1", "value1-2"]), value("va")];
828 assert!(rs.add(metadata.clone(), values, true).unwrap());
829 assert_eq!(rs.rows.len(), 2);
830 assert_multi_column(&rs);
831 assert!(rs.chunked_value);
832 assert!(rs.next().is_none());
833
834 assert!(rs
836 .add(metadata.clone(), vec![value("lueA"), value(vec!["valu"])], true)
837 .unwrap());
838 assert!(rs.chunked_value);
839 assert_eq!(rs.rows.len(), 3);
840 assert_some_multi_column(
841 rs.next(),
842 vec!["value1-1".to_string(), "value1-2".to_string()],
843 "valueA".to_string(),
844 );
845 assert!(rs.next().is_none());
846
847 assert!(rs
849 .add(metadata.clone(), vec![value(vec!["e2-1", "value2-2"])], false)
850 .unwrap());
851 assert!(!rs.chunked_value);
852 assert_eq!(rs.rows.len(), 1);
853 assert!(rs.next().is_none());
854
855 assert!(rs.add(metadata.clone(), vec![value("value")], true).unwrap());
857 assert!(rs.chunked_value);
858 assert_eq!(rs.rows.len(), 2);
859 assert!(rs.next().is_none());
860
861 assert!(rs.add(metadata, vec![value("B")], false).unwrap());
863 assert!(!rs.chunked_value);
864 assert_eq!(rs.rows.len(), 2);
865 assert_some_multi_column(
866 rs.next(),
867 vec!["value2-1".to_string(), "value2-2".to_string()],
868 "valueB".to_string(),
869 );
870 assert!(rs.next().is_none());
871 }
872
873 #[test]
874 fn test_prs_buffer_waits_for_resume_token() {
875 let mut buffer = super::ResumablePartialResultSetBuffer::new(1024);
876 buffer.push(prs(vec![value("value-1")], "", false));
877 assert!(buffer.pop_ready(false).is_none());
878
879 buffer.push(prs(vec![value("value-2")], "token-1", false));
880 assert!(buffer.pop_ready(false).is_some());
881 assert!(buffer.pop_ready(false).is_some());
882 assert!(buffer.pop_ready(false).is_none());
883 }
884
885 #[test]
886 fn test_prs_buffer_flushes_on_end_of_stream() {
887 let mut buffer = super::ResumablePartialResultSetBuffer::new(1024);
888 buffer.push(prs(vec![value("value-1")], "", false));
889 assert!(buffer.pop_ready(false).is_none());
890 assert!(buffer.pop_ready(true).is_some());
891 assert!(buffer.pop_ready(true).is_none());
892 }
893
894 #[test]
895 fn test_prs_buffer_becomes_unretryable_after_limit() {
896 let mut buffer = super::ResumablePartialResultSetBuffer::new(1);
897 buffer.push(prs(vec![value("value-1")], "", false));
898 assert!(buffer.unretryable);
899 assert!(buffer.pop_ready(false).is_some());
900 }
901
902 #[test]
903 fn test_prs_buffer_on_resumption_discards_pending() {
904 let mut buffer = super::ResumablePartialResultSetBuffer::new(1024);
905 buffer.push(prs(vec![value("value-1")], "", false));
906 buffer.on_resumption();
907 buffer.push(prs(vec![value("value-2")], "token-1", false));
908 assert!(buffer.pop_ready(false).is_some());
909 assert!(buffer.pop_ready(false).is_none());
910 }
911
912 #[test]
913 fn test_rs_is_row_boundary_empty() {
914 let rs = empty_rs();
915 assert!(rs.is_row_boundary());
916 }
917
918 #[test]
919 fn test_rs_is_row_boundary_chunked() {
920 let rs = ResultSet {
921 fields: Arc::new(vec![field("column1")]),
922 index: Arc::new(Default::default()),
923 rows: VecDeque::from(vec![value("value1")]),
924 chunked_value: true,
925 };
926 assert!(!rs.is_row_boundary());
927 }
928
929 #[test]
930 fn test_rs_is_row_boundary_multiple_columns() {
931 let rs_complete = ResultSet {
932 fields: Arc::new(vec![field("column1"), field("column2")]),
933 index: Arc::new(Default::default()),
934 rows: VecDeque::from(vec![value("value1"), value("value2")]),
935 chunked_value: false,
936 };
937 assert!(rs_complete.is_row_boundary());
938
939 let rs_partial = ResultSet {
940 fields: Arc::new(vec![field("column1"), field("column2")]),
941 index: Arc::new(Default::default()),
942 rows: VecDeque::from(vec![value("value1")]),
943 chunked_value: false,
944 };
945 assert!(!rs_partial.is_row_boundary());
946 }
947}