1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3
4use prost::Message;
5use prost_types::{value::Kind, Value};
6
7use google_cloud_gax::grpc::{Code, Response, Status, Streaming};
8use google_cloud_googleapis::spanner::v1::struct_type::Field;
9use google_cloud_googleapis::spanner::v1::{
10 ExecuteSqlRequest, PartialResultSet, ReadRequest, ResultSetMetadata, ResultSetStats,
11};
12
13use crate::retry::StreamingRetry;
14use crate::row::Row;
15use crate::session::SessionHandle;
16use crate::transaction::CallOptions;
17
18pub trait Reader: Send + Sync {
19 fn read(
20 &self,
21 session: &mut SessionHandle,
22 option: Option<CallOptions>,
23 disable_route_to_leader: bool,
24 ) -> impl std::future::Future<Output = Result<Response<Streaming<PartialResultSet>>, Status>> + Send;
25
26 fn update_token(&mut self, resume_token: Vec<u8>);
27
28 fn can_resume(&self) -> bool;
29}
30
31pub struct StatementReader {
32 pub enable_resume: bool,
33 pub request: ExecuteSqlRequest,
34}
35
36impl Reader for StatementReader {
37 async fn read(
38 &self,
39 session: &mut SessionHandle,
40 option: Option<CallOptions>,
41 disable_route_to_leader: bool,
42 ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
43 let option = option.unwrap_or_default();
44 let client = &mut session.spanner_client;
45 let result = client
46 .execute_streaming_sql(self.request.clone(), disable_route_to_leader, option.retry)
47 .await;
48 session.invalidate_if_needed(result).await
49 }
50
51 fn update_token(&mut self, resume_token: Vec<u8>) {
52 self.request.resume_token = resume_token;
53 }
54
55 fn can_resume(&self) -> bool {
56 self.enable_resume && !self.request.resume_token.is_empty()
57 }
58}
59
60pub struct TableReader {
61 pub request: ReadRequest,
62}
63
64impl Reader for TableReader {
65 async fn read(
66 &self,
67 session: &mut SessionHandle,
68 option: Option<CallOptions>,
69 disable_route_to_leader: bool,
70 ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
71 let option = option.unwrap_or_default();
72 let client = &mut session.spanner_client;
73 let result = client
74 .streaming_read(self.request.clone(), disable_route_to_leader, option.retry)
75 .await;
76 session.invalidate_if_needed(result).await
77 }
78
79 fn update_token(&mut self, resume_token: Vec<u8>) {
80 self.request.resume_token = resume_token;
81 }
82
83 fn can_resume(&self) -> bool {
84 !self.request.resume_token.is_empty()
85 }
86}
87
88pub struct ResultSet {
89 fields: Arc<Vec<Field>>,
90 index: Arc<HashMap<String, usize>>,
91 rows: VecDeque<Value>,
92 chunked_value: bool,
93}
94
95const DEFAULT_MAX_BYTES_BETWEEN_RESUME_TOKENS: usize = 128 * 1024 * 1024;
96
97#[derive(Debug)]
98struct ResumablePartialResultSetBuffer {
99 pending: VecDeque<PartialResultSet>,
100 last_delivered_token: Vec<u8>,
101 observed_token: Vec<u8>,
102 bytes_between_tokens: usize,
103 max_bytes_between_tokens: usize,
104 unretryable: bool,
105}
106
107impl ResumablePartialResultSetBuffer {
108 fn new(max_bytes_between_tokens: usize) -> Self {
109 Self {
110 pending: VecDeque::new(),
111 last_delivered_token: Vec::new(),
112 observed_token: Vec::new(),
113 bytes_between_tokens: 0,
114 max_bytes_between_tokens,
115 unretryable: false,
116 }
117 }
118
119 fn push(&mut self, result_set: PartialResultSet) {
120 if !result_set.resume_token.is_empty() && result_set.resume_token != self.observed_token {
121 self.observed_token = result_set.resume_token.clone();
122 }
123
124 if !self.unretryable && self.observed_token == self.last_delivered_token {
125 self.bytes_between_tokens = self.bytes_between_tokens.saturating_add(result_set.encoded_len());
126 if self.bytes_between_tokens >= self.max_bytes_between_tokens {
127 self.unretryable = true;
128 }
129 }
130
131 self.pending.push_back(result_set);
132 }
133
134 fn pop_ready(&mut self, end_of_stream: bool) -> Option<PartialResultSet> {
135 if self.pending.is_empty() {
136 return None;
137 }
138
139 if self.unretryable || end_of_stream {
140 return self.pending.pop_front();
141 }
142
143 if self.observed_token != self.last_delivered_token {
144 let result_set = self.pending.pop_front();
145 if let Some(ref rs) = result_set {
146 if !rs.resume_token.is_empty() && rs.resume_token == self.observed_token {
147 self.last_delivered_token = self.observed_token.clone();
148 self.bytes_between_tokens = 0;
149 }
150 }
151 return result_set;
152 }
153
154 None
155 }
156
157 fn on_resumption(&mut self) {
158 self.pending.clear();
159 self.observed_token = self.last_delivered_token.clone();
160 self.bytes_between_tokens = 0;
161 self.unretryable = false;
162 }
163}
164
165impl ResultSet {
166 fn next(&mut self) -> Option<Row> {
167 if !self.rows.is_empty() {
168 let column_length = self.fields.len();
169 let target_record_is_chunked = self.rows.len() < column_length;
170 let target_record_contains_chunked_value = self.chunked_value && self.rows.len() == column_length;
171
172 if !target_record_is_chunked && !target_record_contains_chunked_value {
173 let mut values = Vec::with_capacity(column_length);
175 for _ in 0..column_length {
176 values.push(self.rows.pop_front().unwrap());
177 }
178 return Some(Row::new(Arc::clone(&self.index), Arc::clone(&self.fields), values));
179 }
180 }
181 None
182 }
183
184 fn merge(previous_last: Value, current_first: Value) -> Result<Value, Status> {
186 match previous_last.kind.unwrap() {
187 Kind::StringValue(last) => match current_first.kind.unwrap() {
188 Kind::StringValue(first) => {
189 tracing::trace!("previous_last={}, current_first={}", &last, first);
190 Ok(Value {
191 kind: Some(Kind::StringValue(last + &first)),
192 })
193 }
194 _ => Err(Status::new(
195 Code::Internal,
196 "chunks kind mismatch: current_first must be StringKind",
197 )),
198 },
199 Kind::ListValue(mut last) => match current_first.kind.unwrap() {
200 Kind::ListValue(mut first) => {
201 let first_value_of_current = first.values.remove(0);
202 let merged = match last.values.pop() {
203 Some(last_value_of_previous) => {
204 ResultSet::merge(last_value_of_previous, first_value_of_current)?
205 }
206 None => first_value_of_current,
208 };
209 last.values.push(merged);
210 last.values.extend(first.values);
211 Ok(Value {
212 kind: Some(Kind::ListValue(last)),
213 })
214 }
215 _ => Err(Status::new(
216 Code::Internal,
217 "chunks kind mismatch: current_first must be ListValue",
218 )),
219 },
220 _ => Err(Status::new(
221 Code::Internal,
222 "previous_last kind mismatch: only StringValue and ListValue can be chunked",
223 )),
224 }
225 }
226
227 fn add(
228 &mut self,
229 metadata: Option<ResultSetMetadata>,
230 mut values: Vec<Value>,
231 chunked_value: bool,
232 ) -> Result<bool, Status> {
233 if self.fields.is_empty() {
235 if let Some(metadata) = metadata {
236 self.fields = metadata
237 .row_type
238 .map(|e| Arc::new(e.fields))
239 .ok_or_else(|| Status::new(Code::Internal, "no field metadata found"))?;
240 let mut index = HashMap::new();
242 for (i, f) in self.fields.iter().enumerate() {
243 index.insert(f.name.clone(), i);
244 }
245 self.index = Arc::new(index);
246 }
247 }
248
249 if self.chunked_value {
250 tracing::trace!("now chunked value found previous={}, current={}", self.rows.len(), values.len());
251 let merged = ResultSet::merge(self.rows.pop_back().unwrap(), values.remove(0))?;
253 self.rows.push_back(merged);
254 }
255 self.rows.extend(values);
256 self.chunked_value = chunked_value;
257 Ok(true)
258 }
259
260 fn is_row_boundary(&self) -> bool {
261 if self.fields.is_empty() {
262 return self.rows.is_empty() && !self.chunked_value;
263 }
264 if self.chunked_value {
265 return false;
266 }
267 let columns = self.fields.len();
268 if columns == 0 {
269 return self.rows.is_empty();
270 }
271 self.rows.len().is_multiple_of(columns)
272 }
273}
274
275pub struct RowIterator<'a, T>
276where
277 T: Reader,
278{
279 streaming: Streaming<PartialResultSet>,
280 session: &'a mut SessionHandle,
281 reader: T,
282 rs: ResultSet,
283 reader_option: Option<CallOptions>,
284 disable_route_to_leader: bool,
285 stats: Option<ResultSetStats>,
286 prs_buffer: ResumablePartialResultSetBuffer,
287 resumable: bool,
288 end_of_stream: bool,
289 stream_retry: StreamingRetry,
290}
291
292impl<'a, T> RowIterator<'a, T>
293where
294 T: Reader,
295{
296 pub(crate) async fn new(
297 session: &'a mut SessionHandle,
298 reader: T,
299 option: Option<CallOptions>,
300 disable_route_to_leader: bool,
301 ) -> Result<RowIterator<'a, T>, Status> {
302 let streaming = reader
303 .read(session, option, disable_route_to_leader)
304 .await?
305 .into_inner();
306 let rs = ResultSet {
307 fields: Arc::new(vec![]),
308 index: Arc::new(HashMap::new()),
309 rows: VecDeque::new(),
310 chunked_value: false,
311 };
312 Ok(Self {
313 streaming,
314 session,
315 reader,
316 rs,
317 reader_option: None,
318 disable_route_to_leader,
319 stats: None,
320 prs_buffer: ResumablePartialResultSetBuffer::new(DEFAULT_MAX_BYTES_BETWEEN_RESUME_TOKENS),
321 resumable: true,
322 end_of_stream: false,
323 stream_retry: StreamingRetry::new(),
324 })
325 }
326
327 pub fn set_call_options(&mut self, option: CallOptions) {
328 self.reader_option = Some(option);
329 }
330
331 async fn try_recv(&mut self, option: Option<CallOptions>) -> Result<bool, Status> {
332 loop {
333 if let Some(result_set) = self.prs_buffer.pop_ready(self.end_of_stream) {
334 let resume_token_present = !result_set.resume_token.is_empty();
335 if resume_token_present {
337 self.reader.update_token(result_set.resume_token.clone());
338 }
339 if result_set.stats.is_some() {
341 self.stats = result_set.stats;
342 }
343 if result_set.values.is_empty() {
344 self.rs
346 .add(result_set.metadata, result_set.values, result_set.chunked_value)?;
347 return Ok(false);
348 }
349 let added = self
350 .rs
351 .add(result_set.metadata, result_set.values, result_set.chunked_value)?;
352 if resume_token_present && !self.rs.is_row_boundary() {
353 return Err(Status::new(Code::FailedPrecondition, "resume token is not on a row boundary"));
354 }
355 return Ok(added);
356 }
357
358 if self.end_of_stream {
359 return Ok(false);
360 }
361
362 let received = match self.streaming.message().await {
363 Ok(s) => s,
364 Err(e) => {
365 if !self.reader.can_resume() || !self.resumable {
366 return Err(e);
367 }
368 tracing::debug!("streaming error: {}. resume reading by resume_token", e);
369 self.stream_retry.next(e).await?;
370 let call_option = option.clone();
371 let result = self
372 .reader
373 .read(self.session, call_option, self.disable_route_to_leader)
374 .await?;
375 self.streaming = result.into_inner();
376 self.prs_buffer.on_resumption();
377 continue;
378 }
379 };
380
381 match received {
382 Some(result_set) => {
383 if result_set.last {
384 self.end_of_stream = true;
385 }
386 self.prs_buffer.push(result_set);
387 if self.prs_buffer.unretryable {
388 self.resumable = false;
389 }
390 }
391 None => {
392 self.end_of_stream = true;
393 }
394 }
395 }
396 }
397
398 pub fn columns_metadata(&self) -> &Arc<Vec<Field>> {
400 &self.rs.fields
401 }
402
403 pub fn column_metadata(&self, column_name: &str) -> Option<(usize, Field)> {
404 for (i, val) in self.rs.fields.iter().enumerate() {
405 if val.name == column_name {
406 return Some((i, val.clone()));
407 }
408 }
409 None
410 }
411
412 pub fn stats(&self) -> Option<&ResultSetStats> {
416 self.stats.as_ref()
417 }
418
419 pub async fn next(&mut self) -> Result<Option<Row>, Status> {
422 loop {
423 let row = self.rs.next();
424 if row.is_some() {
425 return Ok(row);
426 }
427 if !self.try_recv(self.reader_option.clone()).await? {
429 return Ok(None);
430 }
431 }
432 }
433}
434
435#[cfg(test)]
436mod tests {
437 use std::collections::VecDeque;
438 use std::sync::Arc;
439
440 use prost_types::value::Kind;
441 use prost_types::Value;
442
443 use google_cloud_googleapis::spanner::v1::struct_type::Field;
444 use google_cloud_googleapis::spanner::v1::{PartialResultSet, ResultSetMetadata, StructType};
445
446 use crate::reader::ResultSet;
447 use crate::row::{Row, TryFromValue};
448 use crate::statement::ToKind;
449
450 fn empty_rs() -> ResultSet {
451 ResultSet {
452 fields: Arc::new(vec![]),
453 index: Arc::new(Default::default()),
454 rows: Default::default(),
455 chunked_value: false,
456 }
457 }
458
459 fn field(name: &str) -> Field {
460 Field {
461 name: name.to_string(),
462 r#type: None,
463 }
464 }
465
466 fn value(to_kind: impl ToKind) -> Value {
467 Value {
468 kind: Some(to_kind.to_kind()),
469 }
470 }
471
472 fn prs(values: Vec<Value>, resume_token: &str, chunked_value: bool) -> PartialResultSet {
473 PartialResultSet {
474 metadata: None,
475 values,
476 chunked_value,
477 resume_token: resume_token.as_bytes().to_vec(),
478 stats: None,
479 precommit_token: None,
480 last: false,
481 }
482 }
483
484 fn assert_one_column(rs: &ResultSet) {
485 assert_eq!(rs.fields.len(), 1);
486 assert_eq!(rs.fields[0].name, "column1".to_string());
487 assert_eq!(*rs.index.get("column1").unwrap(), 0);
488 }
489
490 fn assert_multi_column(rs: &ResultSet) {
491 assert_eq!(rs.fields.len(), 2);
492 assert_eq!(rs.fields[0].name, "column1".to_string());
493 assert_eq!(rs.fields[1].name, "column2".to_string());
494 assert_eq!(*rs.index.get("column1").unwrap(), 0);
495 assert_eq!(*rs.index.get("column2").unwrap(), 1);
496 }
497
498 fn assert_some_one_column<T: TryFromValue + std::cmp::PartialEq + std::fmt::Debug>(row: Option<Row>, v: T) {
499 assert!(row.is_some());
500 assert_eq!(v, row.unwrap().column::<T>(0).unwrap());
501 }
502
503 fn assert_some_multi_column<
504 T1: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
505 T2: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
506 >(
507 row: Option<Row>,
508 v1: T1,
509 v2: T2,
510 ) {
511 assert!(row.is_some());
512 let v = row.unwrap();
513 assert_eq!(v1, v.column::<T1>(0).unwrap());
514 assert_eq!(v2, v.column::<T2>(1).unwrap());
515 }
516
517 #[test]
518 fn test_rs_next_empty() {
519 let mut rs = ResultSet {
520 fields: Arc::new(vec![field("column1")]),
521 index: Arc::new(Default::default()),
522 rows: Default::default(),
523 chunked_value: false,
524 };
525 assert!(rs.next().is_none());
526 }
527
528 #[test]
529 fn test_rs_next_record_chunked_or_not() {
530 let rs = |values| ResultSet {
531 fields: Arc::new(vec![field("column1"), field("column2")]),
532 index: Arc::new(Default::default()),
533 rows: VecDeque::from(values),
534 chunked_value: false,
535 };
536 let mut rs1 = rs(vec![value("value1")]);
537 assert!(rs1.next().is_none());
538 let mut rs2 = rs(vec![value("value1"), value("value2")]);
539 assert_eq!(rs2.next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
540 }
541
542 #[test]
543 fn test_rs_next_value_chunked_or_not() {
544 let rs = |chunked_value| ResultSet {
545 fields: Arc::new(vec![field("column1"), field("column2")]),
546 index: Arc::new(Default::default()),
547 rows: VecDeque::from(vec![value("value1"), value("value2")]),
548 chunked_value,
549 };
550 assert!(rs(true).next().is_none());
551 assert_eq!(rs(false).next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
552 }
553
554 #[test]
555 fn test_rs_next_plural_record_one_column() {
556 let rs = |chunked_value| ResultSet {
557 fields: Arc::new(vec![field("column1")]),
558 index: Arc::new(Default::default()),
559 rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
560 chunked_value,
561 };
562 let mut incomplete = rs(true);
563 assert!(incomplete.next().is_some());
564 assert!(incomplete.next().is_some());
565 assert!(incomplete.next().is_none());
566 let mut complete = rs(false);
567 assert!(complete.next().is_some());
568 assert!(complete.next().is_some());
569 assert!(complete.next().is_some());
570 assert!(complete.next().is_none());
571 }
572
573 #[test]
574 fn test_rs_next_plural_record_multi_column() {
575 let rs = |chunked_value| ResultSet {
576 fields: Arc::new(vec![field("column1"), field("column2")]),
577 index: Arc::new(Default::default()),
578 rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
579 chunked_value,
580 };
581 let mut incomplete = rs(true);
582 assert_eq!(incomplete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
583 assert!(incomplete.next().is_none());
584 let mut complete = rs(false);
585 assert_eq!(complete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
586 assert!(incomplete.next().is_none());
587 }
588
589 #[test]
590 fn test_rs_merge_string_value() {
591 let result = ResultSet::merge(value("val"), value("ue1"));
592 assert!(result.is_ok());
593 let kind = result.unwrap().kind.unwrap();
594 match kind {
595 Kind::StringValue(v) => assert_eq!(v, "value1".to_string()),
596 _ => unreachable!("must be string value"),
597 }
598 }
599
600 #[test]
601 fn test_rs_merge_list_value() {
602 let previous_last = value(vec!["value1-1", "value1-2", "val"]);
603 let current_first = value(vec!["ue1-3", "value2-1", "valu"]);
604 let result = ResultSet::merge(previous_last, current_first);
605 assert!(result.is_ok());
606 let kind = result.unwrap().kind.unwrap();
607 match kind {
608 Kind::ListValue(v) => {
609 assert_eq!(v.values.len(), 5);
610 match v.values[0].kind.as_ref().unwrap() {
611 Kind::StringValue(v) => assert_eq!(*v, "value1-1".to_string()),
612 _ => unreachable!("must be string value"),
613 };
614 match v.values[1].kind.as_ref().unwrap() {
615 Kind::StringValue(v) => assert_eq!(*v, "value1-2".to_string()),
616 _ => unreachable!("must be string value"),
617 };
618 match v.values[2].kind.as_ref().unwrap() {
619 Kind::StringValue(v) => assert_eq!(*v, "value1-3".to_string()),
620 _ => unreachable!("must be string value"),
621 };
622 match v.values[3].kind.as_ref().unwrap() {
623 Kind::StringValue(v) => assert_eq!(*v, "value2-1".to_string()),
624 _ => unreachable!("must be string value"),
625 }
626 match v.values[4].kind.as_ref().unwrap() {
627 Kind::StringValue(v) => assert_eq!(*v, "valu".to_string()),
628 _ => unreachable!("must be string value"),
629 }
630 }
631 _ => unreachable!("must be string value"),
632 }
633 }
634
635 #[test]
636 fn test_rs_add_one_column_no_chunked_value() {
637 let mut rs = empty_rs();
638 let metadata = Some(ResultSetMetadata {
639 row_type: Some(StructType {
640 fields: vec![field("column1")],
641 }),
642 transaction: None,
643 undeclared_parameters: None,
644 });
645 let values = vec![value("value1"), value("value2"), value("value3")];
646 assert!(rs.add(metadata, values, false).unwrap());
647 assert_eq!(rs.rows.len(), 3);
648 assert_one_column(&rs);
649 assert!(!rs.chunked_value);
650
651 assert_some_one_column(rs.next(), "value1".to_string());
652 assert_some_one_column(rs.next(), "value2".to_string());
653 assert_some_one_column(rs.next(), "value3".to_string());
654 assert!(rs.next().is_none());
655 }
656
657 #[test]
658 fn test_rs_add_multi_column_no_chunked_value() {
659 let mut rs = empty_rs();
660 let metadata = Some(ResultSetMetadata {
661 row_type: Some(StructType {
662 fields: vec![field("column1"), field("column2")],
663 }),
664 transaction: None,
665 undeclared_parameters: None,
666 });
667 let values = vec![value("value1"), value("value2"), value("value3")];
668 assert!(rs.add(metadata, values, false).unwrap());
669 assert_eq!(rs.rows.len(), 3);
670 assert_multi_column(&rs);
671 assert!(!rs.chunked_value);
672
673 assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
674 assert!(rs.next().is_none());
675 }
676
677 #[test]
678 fn test_rs_add_multi_column_no_chunked_value_just() {
679 let mut rs = empty_rs();
680 let metadata = Some(ResultSetMetadata {
681 row_type: Some(StructType {
682 fields: vec![field("column1"), field("column2")],
683 }),
684 transaction: None,
685 undeclared_parameters: None,
686 });
687 let values = vec![value("value1"), value("value2"), value("value3"), value("value4")];
688 assert!(rs.add(metadata, values, false).unwrap());
689 assert_eq!(rs.rows.len(), 4);
690 assert_multi_column(&rs);
691 assert!(!rs.chunked_value);
692
693 assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
694 assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
695 assert!(rs.next().is_none());
696 }
697
698 #[test]
699 fn test_rs_add_one_column_chunked_value() {
700 let mut rs = empty_rs();
701 let metadata = Some(ResultSetMetadata {
702 row_type: Some(StructType {
703 fields: vec![field("column1")],
704 }),
705 transaction: None,
706 undeclared_parameters: None,
707 });
708 let values = vec![value("value1"), value("value2"), value("val")];
709 assert!(rs.add(metadata.clone(), values, true).unwrap());
710 assert_eq!(rs.rows.len(), 3);
711 assert_one_column(&rs);
712 assert!(rs.chunked_value);
713
714 assert_some_one_column(rs.next(), "value1".to_string());
715 assert_some_one_column(rs.next(), "value2".to_string());
716 assert!(rs.next().is_none());
717
718 assert!(rs.add(metadata, vec![value("ue3")], false).unwrap());
720 assert!(!rs.chunked_value);
721 assert_eq!(rs.rows.len(), 1);
722 assert_some_one_column(rs.next(), "value3".to_string());
723 assert!(rs.next().is_none());
724 }
725
726 #[test]
727 fn test_rs_add_multi_column_chunked_value() {
728 let mut rs = empty_rs();
729 let metadata = Some(ResultSetMetadata {
730 row_type: Some(StructType {
731 fields: vec![field("column1"), field("column2")],
732 }),
733 transaction: None,
734 undeclared_parameters: None,
735 });
736 let values = vec![value("value1"), value("value2"), value("val")];
737 assert!(rs.add(metadata.clone(), values, true).unwrap());
738 assert_eq!(rs.rows.len(), 3);
739 assert_multi_column(&rs);
740 assert!(rs.chunked_value);
741
742 assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
743 assert!(rs.next().is_none());
744
745 assert!(rs.add(metadata.clone(), vec![value("ue3")], false).unwrap());
747 assert!(!rs.chunked_value);
748 assert_eq!(rs.rows.len(), 1);
749 assert!(rs.next().is_none());
750
751 assert!(rs.add(metadata, vec![value("value4")], false).unwrap());
753 assert!(!rs.chunked_value);
754 assert_eq!(rs.rows.len(), 2);
755 assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
756 }
757
758 #[test]
759 fn test_rs_add_multi_column_no_chunked_value_list_value() {
760 let mut rs = empty_rs();
761 let metadata = Some(ResultSetMetadata {
762 row_type: Some(StructType {
763 fields: vec![field("column1"), field("column2")],
764 }),
765 transaction: None,
766 undeclared_parameters: None,
767 });
768 let values = vec![value(vec!["value1-1", "value1-2"])];
769 assert!(rs.add(metadata.clone(), values, false).unwrap());
770 assert_eq!(rs.rows.len(), 1);
771 assert_multi_column(&rs);
772 assert!(!rs.chunked_value);
773 assert!(rs.next().is_none());
774 assert!(rs.add(metadata, vec![value(vec!["value2-1"])], false).unwrap());
775 assert!(!rs.chunked_value);
776 assert_eq!(rs.rows.len(), 2);
777 assert_some_multi_column(
778 rs.next(),
779 vec!["value1-1".to_string(), "value1-2".to_string()],
780 vec!["value2-1".to_string()],
781 );
782 assert!(rs.next().is_none());
783 }
784
785 #[test]
786 fn test_rs_add_multi_column_chunked_value_list_value() {
787 let mut rs = empty_rs();
788 let metadata = Some(ResultSetMetadata {
789 row_type: Some(StructType {
790 fields: vec![field("column1"), field("column2")],
791 }),
792 transaction: None,
793 undeclared_parameters: None,
794 });
795 let values = vec![value(vec!["value1-1", "value1-2"]), value(vec!["value2-"])];
796 assert!(rs.add(metadata.clone(), values, true).unwrap());
797 assert_eq!(rs.rows.len(), 2);
798 assert_multi_column(&rs);
799 assert!(rs.chunked_value);
800 assert!(rs.next().is_none());
801
802 assert!(rs.add(metadata.clone(), vec![value(vec!["1", "valu"])], true).unwrap());
804 assert!(rs.chunked_value);
805 assert_eq!(rs.rows.len(), 2);
806 assert!(rs.next().is_none());
807
808 assert!(rs.add(metadata, vec![value(vec!["e2-2"])], false).unwrap());
810 assert!(!rs.chunked_value);
811 assert_eq!(rs.rows.len(), 2);
812 assert_some_multi_column(
813 rs.next(),
814 vec!["value1-1".to_string(), "value1-2".to_string()],
815 vec!["value2-1".to_string(), "value2-2".to_string()],
816 );
817 assert!(rs.next().is_none());
818 }
819
820 #[test]
821 fn test_rs_add_multi_column_chunked_value_list_and_string_value() {
822 let mut rs = empty_rs();
823 let metadata = Some(ResultSetMetadata {
824 row_type: Some(StructType {
825 fields: vec![field("column1"), field("column2")],
826 }),
827 transaction: None,
828 undeclared_parameters: None,
829 });
830 let values = vec![value(vec!["value1-1", "value1-2"]), value("va")];
831 assert!(rs.add(metadata.clone(), values, true).unwrap());
832 assert_eq!(rs.rows.len(), 2);
833 assert_multi_column(&rs);
834 assert!(rs.chunked_value);
835 assert!(rs.next().is_none());
836
837 assert!(rs
839 .add(metadata.clone(), vec![value("lueA"), value(vec!["valu"])], true)
840 .unwrap());
841 assert!(rs.chunked_value);
842 assert_eq!(rs.rows.len(), 3);
843 assert_some_multi_column(
844 rs.next(),
845 vec!["value1-1".to_string(), "value1-2".to_string()],
846 "valueA".to_string(),
847 );
848 assert!(rs.next().is_none());
849
850 assert!(rs
852 .add(metadata.clone(), vec![value(vec!["e2-1", "value2-2"])], false)
853 .unwrap());
854 assert!(!rs.chunked_value);
855 assert_eq!(rs.rows.len(), 1);
856 assert!(rs.next().is_none());
857
858 assert!(rs.add(metadata.clone(), vec![value("value")], true).unwrap());
860 assert!(rs.chunked_value);
861 assert_eq!(rs.rows.len(), 2);
862 assert!(rs.next().is_none());
863
864 assert!(rs.add(metadata, vec![value("B")], false).unwrap());
866 assert!(!rs.chunked_value);
867 assert_eq!(rs.rows.len(), 2);
868 assert_some_multi_column(
869 rs.next(),
870 vec!["value2-1".to_string(), "value2-2".to_string()],
871 "valueB".to_string(),
872 );
873 assert!(rs.next().is_none());
874 }
875
876 #[test]
877 fn test_prs_buffer_waits_for_resume_token() {
878 let mut buffer = super::ResumablePartialResultSetBuffer::new(1024);
879 buffer.push(prs(vec![value("value-1")], "", false));
880 assert!(buffer.pop_ready(false).is_none());
881
882 buffer.push(prs(vec![value("value-2")], "token-1", false));
883 assert!(buffer.pop_ready(false).is_some());
884 assert!(buffer.pop_ready(false).is_some());
885 assert!(buffer.pop_ready(false).is_none());
886 }
887
888 #[test]
889 fn test_prs_buffer_flushes_on_end_of_stream() {
890 let mut buffer = super::ResumablePartialResultSetBuffer::new(1024);
891 buffer.push(prs(vec![value("value-1")], "", false));
892 assert!(buffer.pop_ready(false).is_none());
893 assert!(buffer.pop_ready(true).is_some());
894 assert!(buffer.pop_ready(true).is_none());
895 }
896
897 #[test]
898 fn test_prs_buffer_becomes_unretryable_after_limit() {
899 let mut buffer = super::ResumablePartialResultSetBuffer::new(1);
900 buffer.push(prs(vec![value("value-1")], "", false));
901 assert!(buffer.unretryable);
902 assert!(buffer.pop_ready(false).is_some());
903 }
904
905 #[test]
906 fn test_prs_buffer_on_resumption_discards_pending() {
907 let mut buffer = super::ResumablePartialResultSetBuffer::new(1024);
908 buffer.push(prs(vec![value("value-1")], "", false));
909 buffer.on_resumption();
910 buffer.push(prs(vec![value("value-2")], "token-1", false));
911 assert!(buffer.pop_ready(false).is_some());
912 assert!(buffer.pop_ready(false).is_none());
913 }
914
915 #[test]
916 fn test_rs_is_row_boundary_empty() {
917 let rs = empty_rs();
918 assert!(rs.is_row_boundary());
919 }
920
921 #[test]
922 fn test_rs_is_row_boundary_chunked() {
923 let rs = ResultSet {
924 fields: Arc::new(vec![field("column1")]),
925 index: Arc::new(Default::default()),
926 rows: VecDeque::from(vec![value("value1")]),
927 chunked_value: true,
928 };
929 assert!(!rs.is_row_boundary());
930 }
931
932 #[test]
933 fn test_rs_is_row_boundary_multiple_columns() {
934 let rs_complete = ResultSet {
935 fields: Arc::new(vec![field("column1"), field("column2")]),
936 index: Arc::new(Default::default()),
937 rows: VecDeque::from(vec![value("value1"), value("value2")]),
938 chunked_value: false,
939 };
940 assert!(rs_complete.is_row_boundary());
941
942 let rs_partial = ResultSet {
943 fields: Arc::new(vec![field("column1"), field("column2")]),
944 index: Arc::new(Default::default()),
945 rows: VecDeque::from(vec![value("value1")]),
946 chunked_value: false,
947 };
948 assert!(!rs_partial.is_row_boundary());
949 }
950}