1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3
4use prost::Message;
5use prost_types::{value::Kind, Value};
6
7use google_cloud_gax::grpc::{Code, Response, Status, Streaming};
8use google_cloud_googleapis::spanner::v1::struct_type::Field;
9use google_cloud_googleapis::spanner::v1::{
10 ExecuteSqlRequest, PartialResultSet, ReadRequest, ResultSetMetadata, ResultSetStats,
11};
12
13use crate::retry::StreamingRetry;
14use crate::row::Row;
15use crate::session::SessionHandle;
16use crate::transaction::CallOptions;
17
/// A streaming read request that can be issued, and re-issued, against a session.
///
/// `RowIterator` drives implementations of this trait: it calls `read` to open the
/// stream, `update_token` whenever a response carries a resume token, and
/// `can_resume` to decide whether a failed stream may be reopened.
pub trait Reader: Send + Sync {
    /// Opens the streaming call on `session` and resolves to the response stream.
    ///
    /// `option` carries per-call settings; `disable_route_to_leader` is forwarded
    /// to the underlying client call by the implementations in this file.
    fn read(
        &self,
        session: &mut SessionHandle,
        option: Option<CallOptions>,
        disable_route_to_leader: bool,
    ) -> impl std::future::Future<Output = Result<Response<Streaming<PartialResultSet>>, Status>> + Send;

    /// Records the resume token to be sent with the next `read` call.
    fn update_token(&mut self, resume_token: Vec<u8>);

    /// Reports whether a broken stream may be restarted by calling `read` again.
    fn can_resume(&self) -> bool;
}
30
/// [`Reader`] that streams the results of an `ExecuteSqlRequest`.
pub struct StatementReader {
    /// When `false`, `can_resume` reports `false` regardless of the resume token.
    pub enable_resume: bool,
    /// Request that is cloned and sent on every `read`; its `resume_token` is
    /// overwritten by `update_token`.
    pub request: ExecuteSqlRequest,
}
35
36impl Reader for StatementReader {
37 async fn read(
38 &self,
39 session: &mut SessionHandle,
40 option: Option<CallOptions>,
41 disable_route_to_leader: bool,
42 ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
43 let option = option.unwrap_or_default();
44 let client = &mut session.spanner_client;
45 let result = client
46 .execute_streaming_sql(self.request.clone(), disable_route_to_leader, option.retry)
47 .await;
48 session.invalidate_if_needed(result).await
49 }
50
51 fn update_token(&mut self, resume_token: Vec<u8>) {
52 self.request.resume_token = resume_token;
53 }
54
55 fn can_resume(&self) -> bool {
56 self.enable_resume && !self.request.resume_token.is_empty()
57 }
58}
59
/// [`Reader`] that streams the results of a `ReadRequest`.
pub struct TableReader {
    /// Request that is cloned and sent on every `read`; its `resume_token` is
    /// overwritten by `update_token`.
    pub request: ReadRequest,
}
63
64impl Reader for TableReader {
65 async fn read(
66 &self,
67 session: &mut SessionHandle,
68 option: Option<CallOptions>,
69 disable_route_to_leader: bool,
70 ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
71 let option = option.unwrap_or_default();
72 let client = &mut session.spanner_client;
73 let result = client
74 .streaming_read(self.request.clone(), disable_route_to_leader, option.retry)
75 .await;
76 session.invalidate_if_needed(result).await
77 }
78
79 fn update_token(&mut self, resume_token: Vec<u8>) {
80 self.request.resume_token = resume_token;
81 }
82
83 fn can_resume(&self) -> bool {
84 !self.request.resume_token.is_empty()
85 }
86}
87
/// Accumulates the values of successive partial result sets and cuts them into rows.
pub struct ResultSet {
    /// Column definitions taken from the first metadata seen by `add`.
    fields: Arc<Vec<Field>>,
    /// Maps each column name to its position in `fields`.
    index: Arc<HashMap<String, usize>>,
    /// Flat queue of column values that have not been returned as rows yet.
    rows: VecDeque<Value>,
    /// `true` while the last value in `rows` is incomplete and must be merged with
    /// the first value of the next batch.
    chunked_value: bool,
}
94
/// Default limit (128 MiB) on the encoded size of responses buffered without a new
/// resume token; `RowIterator::new` passes it to `ResumablePartialResultSetBuffer::new`.
const DEFAULT_MAX_BYTES_BETWEEN_RESUME_TOKENS: usize = 128 * 1024 * 1024;
96
/// Holds back `PartialResultSet` messages until a newer resume token has been seen,
/// the stream has ended, or the byte budget has been used up.
#[derive(Debug)]
struct ResumablePartialResultSetBuffer {
    /// Messages received but not yet handed out, in arrival order.
    pending: VecDeque<PartialResultSet>,
    /// Token of the last token-carrying message that `pop_ready` released while
    /// buffering was still active.
    last_delivered_token: Vec<u8>,
    /// Most recent non-empty resume token seen by `push`.
    observed_token: Vec<u8>,
    /// Encoded size of the messages pushed while no newer token was available.
    bytes_between_tokens: usize,
    /// Value of `bytes_between_tokens` at which buffering is abandoned.
    max_bytes_between_tokens: usize,
    /// Set once the byte budget is reached; `pop_ready` then releases messages
    /// immediately.
    unretryable: bool,
}
106
107impl ResumablePartialResultSetBuffer {
108 fn new(max_bytes_between_tokens: usize) -> Self {
109 Self {
110 pending: VecDeque::new(),
111 last_delivered_token: Vec::new(),
112 observed_token: Vec::new(),
113 bytes_between_tokens: 0,
114 max_bytes_between_tokens,
115 unretryable: false,
116 }
117 }
118
119 fn push(&mut self, result_set: PartialResultSet) {
120 if !result_set.resume_token.is_empty() && result_set.resume_token != self.observed_token {
121 self.observed_token = result_set.resume_token.clone();
122 }
123
124 if !self.unretryable && self.observed_token == self.last_delivered_token {
125 self.bytes_between_tokens = self.bytes_between_tokens.saturating_add(result_set.encoded_len());
126 if self.bytes_between_tokens >= self.max_bytes_between_tokens {
127 self.unretryable = true;
128 }
129 }
130
131 self.pending.push_back(result_set);
132 }
133
134 fn pop_ready(&mut self, end_of_stream: bool) -> Option<PartialResultSet> {
135 if self.pending.is_empty() {
136 return None;
137 }
138
139 if self.unretryable || end_of_stream {
140 return self.pending.pop_front();
141 }
142
143 if self.observed_token != self.last_delivered_token {
144 let result_set = self.pending.pop_front();
145 if let Some(ref rs) = result_set {
146 if !rs.resume_token.is_empty() && rs.resume_token == self.observed_token {
147 self.last_delivered_token = self.observed_token.clone();
148 self.bytes_between_tokens = 0;
149 }
150 }
151 return result_set;
152 }
153
154 None
155 }
156
157 fn on_resumption(&mut self) {
158 self.pending.clear();
159 self.observed_token = self.last_delivered_token.clone();
160 self.bytes_between_tokens = 0;
161 self.unretryable = false;
162 }
163}
164
165impl ResultSet {
166 fn next(&mut self) -> Option<Row> {
167 if !self.rows.is_empty() {
168 let column_length = self.fields.len();
169 let target_record_is_chunked = self.rows.len() < column_length;
170 let target_record_contains_chunked_value = self.chunked_value && self.rows.len() == column_length;
171
172 if !target_record_is_chunked && !target_record_contains_chunked_value {
173 let mut values = Vec::with_capacity(column_length);
175 for _ in 0..column_length {
176 values.push(self.rows.pop_front().unwrap());
177 }
178 return Some(Row::new(Arc::clone(&self.index), Arc::clone(&self.fields), values));
179 }
180 }
181 None
182 }
183
184 fn merge(previous_last: Value, current_first: Value) -> Result<Value, Status> {
186 match previous_last.kind.unwrap() {
187 Kind::StringValue(last) => match current_first.kind.unwrap() {
188 Kind::StringValue(first) => {
189 tracing::trace!("previous_last={}, current_first={}", &last, first);
190 Ok(Value {
191 kind: Some(Kind::StringValue(last + &first)),
192 })
193 }
194 _ => Err(Status::new(
195 Code::Internal,
196 "chunks kind mismatch: current_first must be StringKind",
197 )),
198 },
199 Kind::ListValue(mut last) => match current_first.kind.unwrap() {
200 Kind::ListValue(first) => {
201 if first.values.is_empty() {
202 return Ok(Value {
203 kind: Some(Kind::ListValue(last)),
204 });
205 }
206 if last.values.is_empty() {
207 return Ok(Value {
208 kind: Some(Kind::ListValue(first)),
209 });
210 }
211 let mut iter = first.values.into_iter();
217 let first_value_of_current = iter.next().unwrap();
218 let last_value_of_previous = last.values.pop().unwrap();
219 let mergeable = matches!(
220 (&last_value_of_previous.kind, &first_value_of_current.kind),
221 (Some(Kind::StringValue(_)), Some(Kind::StringValue(_)))
222 | (Some(Kind::ListValue(_)), Some(Kind::ListValue(_)))
223 );
224 if mergeable {
225 let merged = ResultSet::merge(last_value_of_previous, first_value_of_current)?;
226 last.values.push(merged);
227 } else {
228 last.values.push(last_value_of_previous);
229 last.values.push(first_value_of_current);
230 }
231 last.values.extend(iter);
232 Ok(Value {
233 kind: Some(Kind::ListValue(last)),
234 })
235 }
236 _ => Err(Status::new(
237 Code::Internal,
238 "chunks kind mismatch: current_first must be ListValue",
239 )),
240 },
241 _ => Err(Status::new(
242 Code::Internal,
243 "previous_last kind mismatch: only StringValue and ListValue can be chunked",
244 )),
245 }
246 }
247
248 fn add(
249 &mut self,
250 metadata: Option<ResultSetMetadata>,
251 mut values: Vec<Value>,
252 chunked_value: bool,
253 ) -> Result<bool, Status> {
254 if self.fields.is_empty() {
256 if let Some(metadata) = metadata {
257 self.fields = metadata
258 .row_type
259 .map(|e| Arc::new(e.fields))
260 .ok_or_else(|| Status::new(Code::Internal, "no field metadata found"))?;
261 let mut index = HashMap::new();
263 for (i, f) in self.fields.iter().enumerate() {
264 index.insert(f.name.clone(), i);
265 }
266 self.index = Arc::new(index);
267 }
268 }
269
270 if self.chunked_value {
271 tracing::trace!("now chunked value found previous={}, current={}", self.rows.len(), values.len());
272 let merged = ResultSet::merge(self.rows.pop_back().unwrap(), values.remove(0))?;
274 self.rows.push_back(merged);
275 }
276 self.rows.extend(values);
277 self.chunked_value = chunked_value;
278 Ok(true)
279 }
280
281 fn is_row_boundary(&self) -> bool {
282 if self.fields.is_empty() {
283 return self.rows.is_empty() && !self.chunked_value;
284 }
285 if self.chunked_value {
286 return false;
287 }
288 let columns = self.fields.len();
289 if columns == 0 {
290 return self.rows.is_empty();
291 }
292 self.rows.len().is_multiple_of(columns)
293 }
294}
295
/// Pull-style iterator over the rows of a streaming read.
pub struct RowIterator<'a, T>
where
    T: Reader,
{
    /// Currently open response stream; replaced when the read is resumed.
    streaming: Streaming<PartialResultSet>,
    /// Session on which the read is issued and re-issued.
    session: &'a mut SessionHandle,
    /// Request wrapper; receives every resume token that is delivered.
    reader: T,
    /// Decoder state: column metadata plus values not yet returned as rows.
    rs: ResultSet,
    /// Call options handed to `try_recv` on every `next` call.
    reader_option: Option<CallOptions>,
    /// Forwarded unchanged to `Reader::read`.
    disable_route_to_leader: bool,
    /// Most recent statistics carried by a delivered message, if any.
    stats: Option<ResultSetStats>,
    /// Holds responses back until they can be delivered.
    prs_buffer: ResumablePartialResultSetBuffer,
    /// Cleared once the buffer reports it is unretryable; stream errors are then
    /// returned to the caller instead of triggering a resume.
    resumable: bool,
    /// Set when a message flagged `last` arrives or the stream yields `None`.
    end_of_stream: bool,
    /// Consulted with the stream error before each resume attempt.
    stream_retry: StreamingRetry,
}
312
313impl<'a, T> RowIterator<'a, T>
314where
315 T: Reader,
316{
317 pub(crate) async fn new(
318 session: &'a mut SessionHandle,
319 reader: T,
320 option: Option<CallOptions>,
321 disable_route_to_leader: bool,
322 ) -> Result<RowIterator<'a, T>, Status> {
323 let streaming = reader
324 .read(session, option, disable_route_to_leader)
325 .await?
326 .into_inner();
327 let rs = ResultSet {
328 fields: Arc::new(vec![]),
329 index: Arc::new(HashMap::new()),
330 rows: VecDeque::new(),
331 chunked_value: false,
332 };
333 Ok(Self {
334 streaming,
335 session,
336 reader,
337 rs,
338 reader_option: None,
339 disable_route_to_leader,
340 stats: None,
341 prs_buffer: ResumablePartialResultSetBuffer::new(DEFAULT_MAX_BYTES_BETWEEN_RESUME_TOKENS),
342 resumable: true,
343 end_of_stream: false,
344 stream_retry: StreamingRetry::new(),
345 })
346 }
347
348 pub fn set_call_options(&mut self, option: CallOptions) {
349 self.reader_option = Some(option);
350 }
351
352 async fn try_recv(&mut self, option: Option<CallOptions>) -> Result<bool, Status> {
353 loop {
354 if let Some(result_set) = self.prs_buffer.pop_ready(self.end_of_stream) {
355 let resume_token_present = !result_set.resume_token.is_empty();
356 if resume_token_present {
358 self.reader.update_token(result_set.resume_token.clone());
359 }
360 if result_set.stats.is_some() {
362 self.stats = result_set.stats;
363 }
364 if result_set.values.is_empty() {
365 self.rs
367 .add(result_set.metadata, result_set.values, result_set.chunked_value)?;
368 return Ok(false);
369 }
370 let added = self
371 .rs
372 .add(result_set.metadata, result_set.values, result_set.chunked_value)?;
373 if resume_token_present && !self.rs.is_row_boundary() {
374 return Err(Status::new(Code::FailedPrecondition, "resume token is not on a row boundary"));
375 }
376 return Ok(added);
377 }
378
379 if self.end_of_stream {
380 return Ok(false);
381 }
382
383 let received = match self.streaming.message().await {
384 Ok(s) => s,
385 Err(e) => {
386 if !self.reader.can_resume() || !self.resumable {
387 return Err(e);
388 }
389 tracing::debug!("streaming error: {}. resume reading by resume_token", e);
390 self.stream_retry.next(e).await?;
391 let call_option = option.clone();
392 let result = self
393 .reader
394 .read(self.session, call_option, self.disable_route_to_leader)
395 .await?;
396 self.streaming = result.into_inner();
397 self.prs_buffer.on_resumption();
398 continue;
399 }
400 };
401
402 match received {
403 Some(result_set) => {
404 if result_set.last {
405 self.end_of_stream = true;
406 }
407 self.prs_buffer.push(result_set);
408 if self.prs_buffer.unretryable {
409 self.resumable = false;
410 }
411 }
412 None => {
413 self.end_of_stream = true;
414 }
415 }
416 }
417 }
418
419 pub fn columns_metadata(&self) -> &Arc<Vec<Field>> {
421 &self.rs.fields
422 }
423
424 pub fn column_metadata(&self, column_name: &str) -> Option<(usize, Field)> {
425 for (i, val) in self.rs.fields.iter().enumerate() {
426 if val.name == column_name {
427 return Some((i, val.clone()));
428 }
429 }
430 None
431 }
432
433 pub fn stats(&self) -> Option<&ResultSetStats> {
437 self.stats.as_ref()
438 }
439
440 pub async fn next(&mut self) -> Result<Option<Row>, Status> {
443 loop {
444 let row = self.rs.next();
445 if row.is_some() {
446 return Ok(row);
447 }
448 if !self.try_recv(self.reader_option.clone()).await? {
450 return Ok(None);
451 }
452 }
453 }
454}
455
// Unit tests for the row decoder (`ResultSet`) and the resume buffer.
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::sync::Arc;

    use prost_types::value::Kind;
    use prost_types::Value;

    use google_cloud_googleapis::spanner::v1::struct_type::Field;
    use google_cloud_googleapis::spanner::v1::{PartialResultSet, ResultSetMetadata, StructType};

    use crate::reader::ResultSet;
    use crate::row::{Row, TryFromValue};
    use crate::statement::ToKind;

    /// A decoder with no metadata and no buffered values.
    fn empty_rs() -> ResultSet {
        ResultSet {
            fields: Arc::new(vec![]),
            index: Arc::new(Default::default()),
            rows: Default::default(),
            chunked_value: false,
        }
    }

    /// A column definition with the given name and no type information.
    fn field(name: &str) -> Field {
        Field {
            name: name.to_string(),
            r#type: None,
        }
    }

    /// Wraps anything convertible through `ToKind` in a protobuf `Value`.
    fn value(to_kind: impl ToKind) -> Value {
        Value {
            kind: Some(to_kind.to_kind()),
        }
    }

    /// A partial result set with the given values, resume token and chunk flag.
    fn prs(values: Vec<Value>, resume_token: &str, chunked_value: bool) -> PartialResultSet {
        PartialResultSet {
            metadata: None,
            values,
            chunked_value,
            resume_token: resume_token.as_bytes().to_vec(),
            stats: None,
            precommit_token: None,
            last: false,
        }
    }

    /// Checks that the decoder knows exactly the column `column1`.
    fn assert_one_column(rs: &ResultSet) {
        assert_eq!(rs.fields.len(), 1);
        assert_eq!(rs.fields[0].name, "column1".to_string());
        assert_eq!(*rs.index.get("column1").unwrap(), 0);
    }

    /// Checks that the decoder knows exactly the columns `column1` and `column2`.
    fn assert_multi_column(rs: &ResultSet) {
        assert_eq!(rs.fields.len(), 2);
        assert_eq!(rs.fields[0].name, "column1".to_string());
        assert_eq!(rs.fields[1].name, "column2".to_string());
        assert_eq!(*rs.index.get("column1").unwrap(), 0);
        assert_eq!(*rs.index.get("column2").unwrap(), 1);
    }

    /// Checks that `row` exists and that its first column equals `v`.
    fn assert_some_one_column<T: TryFromValue + std::cmp::PartialEq + std::fmt::Debug>(row: Option<Row>, v: T) {
        assert!(row.is_some());
        assert_eq!(v, row.unwrap().column::<T>(0).unwrap());
    }

    /// Checks that `row` exists and that its first two columns equal `v1` and `v2`.
    fn assert_some_multi_column<
        T1: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
        T2: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
    >(
        row: Option<Row>,
        v1: T1,
        v2: T2,
    ) {
        assert!(row.is_some());
        let v = row.unwrap();
        assert_eq!(v1, v.column::<T1>(0).unwrap());
        assert_eq!(v2, v.column::<T2>(1).unwrap());
    }

    // No buffered values means no row.
    #[test]
    fn test_rs_next_empty() {
        let mut rs = ResultSet {
            fields: Arc::new(vec![field("column1")]),
            index: Arc::new(Default::default()),
            rows: Default::default(),
            chunked_value: false,
        };
        assert!(rs.next().is_none());
    }

    // A row is produced only once a value for every column is buffered.
    #[test]
    fn test_rs_next_record_chunked_or_not() {
        let rs = |values| ResultSet {
            fields: Arc::new(vec![field("column1"), field("column2")]),
            index: Arc::new(Default::default()),
            rows: VecDeque::from(values),
            chunked_value: false,
        };
        let mut rs1 = rs(vec![value("value1")]);
        assert!(rs1.next().is_none());
        let mut rs2 = rs(vec![value("value1"), value("value2")]);
        assert_eq!(rs2.next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
    }

    // A full row whose last value is still chunked must be held back.
    #[test]
    fn test_rs_next_value_chunked_or_not() {
        let rs = |chunked_value| ResultSet {
            fields: Arc::new(vec![field("column1"), field("column2")]),
            index: Arc::new(Default::default()),
            rows: VecDeque::from(vec![value("value1"), value("value2")]),
            chunked_value,
        };
        assert!(rs(true).next().is_none());
        assert_eq!(rs(false).next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
    }

    // With one column, only the final row is withheld while a chunk is pending.
    #[test]
    fn test_rs_next_plural_record_one_column() {
        let rs = |chunked_value| ResultSet {
            fields: Arc::new(vec![field("column1")]),
            index: Arc::new(Default::default()),
            rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
            chunked_value,
        };
        let mut incomplete = rs(true);
        assert!(incomplete.next().is_some());
        assert!(incomplete.next().is_some());
        assert!(incomplete.next().is_none());
        let mut complete = rs(false);
        assert!(complete.next().is_some());
        assert!(complete.next().is_some());
        assert!(complete.next().is_some());
        assert!(complete.next().is_none());
    }

    // Three values over two columns yield one row; the leftover value waits.
    #[test]
    fn test_rs_next_plural_record_multi_column() {
        let rs = |chunked_value| ResultSet {
            fields: Arc::new(vec![field("column1"), field("column2")]),
            index: Arc::new(Default::default()),
            rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
            chunked_value,
        };
        let mut incomplete = rs(true);
        assert_eq!(incomplete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
        assert!(incomplete.next().is_none());
        let mut complete = rs(false);
        assert_eq!(complete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
        // The second assertion must exercise `complete`, not `incomplete` again.
        assert!(complete.next().is_none());
    }

    // Two string chunks are concatenated.
    #[test]
    fn test_rs_merge_string_value() {
        let result = ResultSet::merge(value("val"), value("ue1"));
        assert!(result.is_ok());
        let kind = result.unwrap().kind.unwrap();
        match kind {
            Kind::StringValue(v) => assert_eq!(v, "value1".to_string()),
            _ => unreachable!("must be string value"),
        }
    }

    // Two list chunks are concatenated and the strings at the seam are joined.
    #[test]
    fn test_rs_merge_list_value() {
        let previous_last = value(vec!["value1-1", "value1-2", "val"]);
        let current_first = value(vec!["ue1-3", "value2-1", "valu"]);
        let result = ResultSet::merge(previous_last, current_first);
        assert!(result.is_ok());
        let kind = result.unwrap().kind.unwrap();
        match kind {
            Kind::ListValue(v) => {
                assert_eq!(v.values.len(), 5);
                match v.values[0].kind.as_ref().unwrap() {
                    Kind::StringValue(v) => assert_eq!(*v, "value1-1".to_string()),
                    _ => unreachable!("must be string value"),
                };
                match v.values[1].kind.as_ref().unwrap() {
                    Kind::StringValue(v) => assert_eq!(*v, "value1-2".to_string()),
                    _ => unreachable!("must be string value"),
                };
                match v.values[2].kind.as_ref().unwrap() {
                    Kind::StringValue(v) => assert_eq!(*v, "value1-3".to_string()),
                    _ => unreachable!("must be string value"),
                };
                match v.values[3].kind.as_ref().unwrap() {
                    Kind::StringValue(v) => assert_eq!(*v, "value2-1".to_string()),
                    _ => unreachable!("must be string value"),
                }
                match v.values[4].kind.as_ref().unwrap() {
                    Kind::StringValue(v) => assert_eq!(*v, "valu".to_string()),
                    _ => unreachable!("must be string value"),
                }
            }
            _ => unreachable!("must be list value"),
        }
    }

    // Seam elements that are not string/string or list/list are kept side by side.
    #[test]
    fn test_rs_merge_list_value_with_non_mergeable_tail() {
        let null = || Value {
            kind: Some(Kind::NullValue(prost_types::NullValue::NullValue.into())),
        };
        let bool_v = |b: bool| Value {
            kind: Some(Kind::BoolValue(b)),
        };
        let num_v = |n: f64| Value {
            kind: Some(Kind::NumberValue(n)),
        };

        // Seam: null followed by string.
        let previous_last = Value {
            kind: Some(Kind::ListValue(prost_types::ListValue {
                values: vec![value("a"), null()],
            })),
        };
        let current_first = Value {
            kind: Some(Kind::ListValue(prost_types::ListValue {
                values: vec![value("b"), bool_v(true)],
            })),
        };
        let merged = ResultSet::merge(previous_last, current_first).expect("must concatenate");
        match merged.kind.unwrap() {
            Kind::ListValue(v) => {
                assert_eq!(v.values.len(), 4);
                assert!(matches!(v.values[0].kind, Some(Kind::StringValue(ref s)) if s == "a"));
                assert!(matches!(v.values[1].kind, Some(Kind::NullValue(_))));
                assert!(matches!(v.values[2].kind, Some(Kind::StringValue(ref s)) if s == "b"));
                assert!(matches!(v.values[3].kind, Some(Kind::BoolValue(true))));
            }
            _ => unreachable!("must be list value"),
        }

        // Seam: bool followed by number.
        let previous_last = Value {
            kind: Some(Kind::ListValue(prost_types::ListValue {
                values: vec![bool_v(false)],
            })),
        };
        let current_first = Value {
            kind: Some(Kind::ListValue(prost_types::ListValue {
                values: vec![num_v(1.5), value("tail")],
            })),
        };
        let merged = ResultSet::merge(previous_last, current_first).expect("must concatenate");
        match merged.kind.unwrap() {
            Kind::ListValue(v) => {
                assert_eq!(v.values.len(), 3);
                assert!(matches!(v.values[0].kind, Some(Kind::BoolValue(false))));
                assert!(matches!(v.values[1].kind, Some(Kind::NumberValue(n)) if n == 1.5));
                assert!(matches!(v.values[2].kind, Some(Kind::StringValue(ref s)) if s == "tail"));
            }
            _ => unreachable!("must be list value"),
        }
    }

    // Merging with an empty list on either side returns the other side unchanged.
    #[test]
    fn test_rs_merge_list_value_empty_sides() {
        let empty_list = || Value {
            kind: Some(Kind::ListValue(prost_types::ListValue { values: vec![] })),
        };
        let with_one = || Value {
            kind: Some(Kind::ListValue(prost_types::ListValue {
                values: vec![value("only")],
            })),
        };

        // Empty previous side.
        let merged = ResultSet::merge(empty_list(), with_one()).unwrap();
        match merged.kind.unwrap() {
            Kind::ListValue(v) => {
                assert_eq!(v.values.len(), 1);
                assert!(matches!(v.values[0].kind, Some(Kind::StringValue(ref s)) if s == "only"));
            }
            _ => unreachable!(),
        }

        // Empty current side.
        let merged = ResultSet::merge(with_one(), empty_list()).unwrap();
        match merged.kind.unwrap() {
            Kind::ListValue(v) => {
                assert_eq!(v.values.len(), 1);
                assert!(matches!(v.values[0].kind, Some(Kind::StringValue(ref s)) if s == "only"));
            }
            _ => unreachable!(),
        }
    }

    // One column, no chunking: every value becomes a row.
    #[test]
    fn test_rs_add_one_column_no_chunked_value() {
        let mut rs = empty_rs();
        let metadata = Some(ResultSetMetadata {
            row_type: Some(StructType {
                fields: vec![field("column1")],
            }),
            transaction: None,
            undeclared_parameters: None,
        });
        let values = vec![value("value1"), value("value2"), value("value3")];
        assert!(rs.add(metadata, values, false).unwrap());
        assert_eq!(rs.rows.len(), 3);
        assert_one_column(&rs);
        assert!(!rs.chunked_value);

        assert_some_one_column(rs.next(), "value1".to_string());
        assert_some_one_column(rs.next(), "value2".to_string());
        assert_some_one_column(rs.next(), "value3".to_string());
        assert!(rs.next().is_none());
    }

    // Two columns, three values: one row, the third value stays buffered.
    #[test]
    fn test_rs_add_multi_column_no_chunked_value() {
        let mut rs = empty_rs();
        let metadata = Some(ResultSetMetadata {
            row_type: Some(StructType {
                fields: vec![field("column1"), field("column2")],
            }),
            transaction: None,
            undeclared_parameters: None,
        });
        let values = vec![value("value1"), value("value2"), value("value3")];
        assert!(rs.add(metadata, values, false).unwrap());
        assert_eq!(rs.rows.len(), 3);
        assert_multi_column(&rs);
        assert!(!rs.chunked_value);

        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
        assert!(rs.next().is_none());
    }

    // Two columns, four values: exactly two rows.
    #[test]
    fn test_rs_add_multi_column_no_chunked_value_just() {
        let mut rs = empty_rs();
        let metadata = Some(ResultSetMetadata {
            row_type: Some(StructType {
                fields: vec![field("column1"), field("column2")],
            }),
            transaction: None,
            undeclared_parameters: None,
        });
        let values = vec![value("value1"), value("value2"), value("value3"), value("value4")];
        assert!(rs.add(metadata, values, false).unwrap());
        assert_eq!(rs.rows.len(), 4);
        assert_multi_column(&rs);
        assert!(!rs.chunked_value);

        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
        assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
        assert!(rs.next().is_none());
    }

    // One column: a chunked trailing string is completed by the next batch.
    #[test]
    fn test_rs_add_one_column_chunked_value() {
        let mut rs = empty_rs();
        let metadata = Some(ResultSetMetadata {
            row_type: Some(StructType {
                fields: vec![field("column1")],
            }),
            transaction: None,
            undeclared_parameters: None,
        });
        let values = vec![value("value1"), value("value2"), value("val")];
        assert!(rs.add(metadata.clone(), values, true).unwrap());
        assert_eq!(rs.rows.len(), 3);
        assert_one_column(&rs);
        assert!(rs.chunked_value);

        assert_some_one_column(rs.next(), "value1".to_string());
        assert_some_one_column(rs.next(), "value2".to_string());
        assert!(rs.next().is_none());

        // The second batch completes the pending value.
        assert!(rs.add(metadata, vec![value("ue3")], false).unwrap());
        assert!(!rs.chunked_value);
        assert_eq!(rs.rows.len(), 1);
        assert_some_one_column(rs.next(), "value3".to_string());
        assert!(rs.next().is_none());
    }

    // Two columns: a chunked value is completed first, then the row is filled up.
    #[test]
    fn test_rs_add_multi_column_chunked_value() {
        let mut rs = empty_rs();
        let metadata = Some(ResultSetMetadata {
            row_type: Some(StructType {
                fields: vec![field("column1"), field("column2")],
            }),
            transaction: None,
            undeclared_parameters: None,
        });
        let values = vec![value("value1"), value("value2"), value("val")];
        assert!(rs.add(metadata.clone(), values, true).unwrap());
        assert_eq!(rs.rows.len(), 3);
        assert_multi_column(&rs);
        assert!(rs.chunked_value);

        assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
        assert!(rs.next().is_none());

        // Completes the chunked value, but the row still lacks its second column.
        assert!(rs.add(metadata.clone(), vec![value("ue3")], false).unwrap());
        assert!(!rs.chunked_value);
        assert_eq!(rs.rows.len(), 1);
        assert!(rs.next().is_none());

        // Supplies the second column.
        assert!(rs.add(metadata, vec![value("value4")], false).unwrap());
        assert!(!rs.chunked_value);
        assert_eq!(rs.rows.len(), 2);
        assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
    }

    // List-typed columns spread over two unchunked batches.
    #[test]
    fn test_rs_add_multi_column_no_chunked_value_list_value() {
        let mut rs = empty_rs();
        let metadata = Some(ResultSetMetadata {
            row_type: Some(StructType {
                fields: vec![field("column1"), field("column2")],
            }),
            transaction: None,
            undeclared_parameters: None,
        });
        let values = vec![value(vec!["value1-1", "value1-2"])];
        assert!(rs.add(metadata.clone(), values, false).unwrap());
        assert_eq!(rs.rows.len(), 1);
        assert_multi_column(&rs);
        assert!(!rs.chunked_value);
        assert!(rs.next().is_none());
        assert!(rs.add(metadata, vec![value(vec!["value2-1"])], false).unwrap());
        assert!(!rs.chunked_value);
        assert_eq!(rs.rows.len(), 2);
        assert_some_multi_column(
            rs.next(),
            vec!["value1-1".to_string(), "value1-2".to_string()],
            vec!["value2-1".to_string()],
        );
        assert!(rs.next().is_none());
    }

    // A list value split over three batches is reassembled.
    #[test]
    fn test_rs_add_multi_column_chunked_value_list_value() {
        let mut rs = empty_rs();
        let metadata = Some(ResultSetMetadata {
            row_type: Some(StructType {
                fields: vec![field("column1"), field("column2")],
            }),
            transaction: None,
            undeclared_parameters: None,
        });
        let values = vec![value(vec!["value1-1", "value1-2"]), value(vec!["value2-"])];
        assert!(rs.add(metadata.clone(), values, true).unwrap());
        assert_eq!(rs.rows.len(), 2);
        assert_multi_column(&rs);
        assert!(rs.chunked_value);
        assert!(rs.next().is_none());

        // Middle chunk: still incomplete afterwards.
        assert!(rs.add(metadata.clone(), vec![value(vec!["1", "valu"])], true).unwrap());
        assert!(rs.chunked_value);
        assert_eq!(rs.rows.len(), 2);
        assert!(rs.next().is_none());

        // Final chunk.
        assert!(rs.add(metadata, vec![value(vec!["e2-2"])], false).unwrap());
        assert!(!rs.chunked_value);
        assert_eq!(rs.rows.len(), 2);
        assert_some_multi_column(
            rs.next(),
            vec!["value1-1".to_string(), "value1-2".to_string()],
            vec!["value2-1".to_string(), "value2-2".to_string()],
        );
        assert!(rs.next().is_none());
    }

    // Alternating chunked list and string columns across several batches.
    #[test]
    fn test_rs_add_multi_column_chunked_value_list_and_string_value() {
        let mut rs = empty_rs();
        let metadata = Some(ResultSetMetadata {
            row_type: Some(StructType {
                fields: vec![field("column1"), field("column2")],
            }),
            transaction: None,
            undeclared_parameters: None,
        });
        let values = vec![value(vec!["value1-1", "value1-2"]), value("va")];
        assert!(rs.add(metadata.clone(), values, true).unwrap());
        assert_eq!(rs.rows.len(), 2);
        assert_multi_column(&rs);
        assert!(rs.chunked_value);
        assert!(rs.next().is_none());

        // Completes the string and starts a chunked list for the next row.
        assert!(rs
            .add(metadata.clone(), vec![value("lueA"), value(vec!["valu"])], true)
            .unwrap());
        assert!(rs.chunked_value);
        assert_eq!(rs.rows.len(), 3);
        assert_some_multi_column(
            rs.next(),
            vec!["value1-1".to_string(), "value1-2".to_string()],
            "valueA".to_string(),
        );
        assert!(rs.next().is_none());

        // Completes the list.
        assert!(rs
            .add(metadata.clone(), vec![value(vec!["e2-1", "value2-2"])], false)
            .unwrap());
        assert!(!rs.chunked_value);
        assert_eq!(rs.rows.len(), 1);
        assert!(rs.next().is_none());

        // Starts a chunked string in the second column.
        assert!(rs.add(metadata.clone(), vec![value("value")], true).unwrap());
        assert!(rs.chunked_value);
        assert_eq!(rs.rows.len(), 2);
        assert!(rs.next().is_none());

        // Completes the string.
        assert!(rs.add(metadata, vec![value("B")], false).unwrap());
        assert!(!rs.chunked_value);
        assert_eq!(rs.rows.len(), 2);
        assert_some_multi_column(
            rs.next(),
            vec!["value2-1".to_string(), "value2-2".to_string()],
            "valueB".to_string(),
        );
        assert!(rs.next().is_none());
    }

    // Messages are held back until one carrying a resume token arrives.
    #[test]
    fn test_prs_buffer_waits_for_resume_token() {
        let mut buffer = super::ResumablePartialResultSetBuffer::new(1024);
        buffer.push(prs(vec![value("value-1")], "", false));
        assert!(buffer.pop_ready(false).is_none());

        buffer.push(prs(vec![value("value-2")], "token-1", false));
        assert!(buffer.pop_ready(false).is_some());
        assert!(buffer.pop_ready(false).is_some());
        assert!(buffer.pop_ready(false).is_none());
    }

    // At end of stream everything is released even without a token.
    #[test]
    fn test_prs_buffer_flushes_on_end_of_stream() {
        let mut buffer = super::ResumablePartialResultSetBuffer::new(1024);
        buffer.push(prs(vec![value("value-1")], "", false));
        assert!(buffer.pop_ready(false).is_none());
        assert!(buffer.pop_ready(true).is_some());
        assert!(buffer.pop_ready(true).is_none());
    }

    // Exceeding the byte budget switches the buffer to pass-through.
    #[test]
    fn test_prs_buffer_becomes_unretryable_after_limit() {
        let mut buffer = super::ResumablePartialResultSetBuffer::new(1);
        buffer.push(prs(vec![value("value-1")], "", false));
        assert!(buffer.unretryable);
        assert!(buffer.pop_ready(false).is_some());
    }

    // Resumption drops undelivered messages.
    #[test]
    fn test_prs_buffer_on_resumption_discards_pending() {
        let mut buffer = super::ResumablePartialResultSetBuffer::new(1024);
        buffer.push(prs(vec![value("value-1")], "", false));
        buffer.on_resumption();
        buffer.push(prs(vec![value("value-2")], "token-1", false));
        assert!(buffer.pop_ready(false).is_some());
        assert!(buffer.pop_ready(false).is_none());
    }

    // An empty decoder sits on a row boundary.
    #[test]
    fn test_rs_is_row_boundary_empty() {
        let rs = empty_rs();
        assert!(rs.is_row_boundary());
    }

    // A pending chunk is never a row boundary.
    #[test]
    fn test_rs_is_row_boundary_chunked() {
        let rs = ResultSet {
            fields: Arc::new(vec![field("column1")]),
            index: Arc::new(Default::default()),
            rows: VecDeque::from(vec![value("value1")]),
            chunked_value: true,
        };
        assert!(!rs.is_row_boundary());
    }

    // The boundary requires a whole number of rows.
    #[test]
    fn test_rs_is_row_boundary_multiple_columns() {
        let rs_complete = ResultSet {
            fields: Arc::new(vec![field("column1"), field("column2")]),
            index: Arc::new(Default::default()),
            rows: VecDeque::from(vec![value("value1"), value("value2")]),
            chunked_value: false,
        };
        assert!(rs_complete.is_row_boundary());

        let rs_partial = ResultSet {
            fields: Arc::new(vec![field("column1"), field("column2")]),
            index: Arc::new(Default::default()),
            rows: VecDeque::from(vec![value("value1")]),
            chunked_value: false,
        };
        assert!(!rs_partial.is_row_boundary());
    }
}