1use std::collections::{HashMap, VecDeque};
2use std::sync::Arc;
3
4use prost_types::{value::Kind, Value};
5
6use google_cloud_gax::grpc::{Code, Response, Status, Streaming};
7use google_cloud_googleapis::spanner::v1::struct_type::Field;
8use google_cloud_googleapis::spanner::v1::{ExecuteSqlRequest, PartialResultSet, ReadRequest, ResultSetMetadata};
9
10use crate::row::Row;
11use crate::session::SessionHandle;
12use crate::transaction::CallOptions;
13
14pub trait Reader: Send + Sync {
15 fn read(
16 &self,
17 session: &mut SessionHandle,
18 option: Option<CallOptions>,
19 ) -> impl std::future::Future<Output = Result<Response<Streaming<PartialResultSet>>, Status>> + Send;
20
21 fn update_token(&mut self, resume_token: Vec<u8>);
22
23 fn can_resume(&self) -> bool;
24}
25
26pub struct StatementReader {
27 pub enable_resume: bool,
28 pub request: ExecuteSqlRequest,
29}
30
31impl Reader for StatementReader {
32 async fn read(
33 &self,
34 session: &mut SessionHandle,
35 option: Option<CallOptions>,
36 ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
37 let option = option.unwrap_or_default();
38 let client = &mut session.spanner_client;
39 let result = client.execute_streaming_sql(self.request.clone(), option.retry).await;
40 session.invalidate_if_needed(result).await
41 }
42
43 fn update_token(&mut self, resume_token: Vec<u8>) {
44 self.request.resume_token = resume_token;
45 }
46
47 fn can_resume(&self) -> bool {
48 self.enable_resume && !self.request.resume_token.is_empty()
49 }
50}
51
52pub struct TableReader {
53 pub request: ReadRequest,
54}
55
56impl Reader for TableReader {
57 async fn read(
58 &self,
59 session: &mut SessionHandle,
60 option: Option<CallOptions>,
61 ) -> Result<Response<Streaming<PartialResultSet>>, Status> {
62 let option = option.unwrap_or_default();
63 let client = &mut session.spanner_client;
64 let result = client.streaming_read(self.request.clone(), option.retry).await;
65 session.invalidate_if_needed(result).await
66 }
67
68 fn update_token(&mut self, resume_token: Vec<u8>) {
69 self.request.resume_token = resume_token;
70 }
71
72 fn can_resume(&self) -> bool {
73 !self.request.resume_token.is_empty()
74 }
75}
76
77pub struct ResultSet {
78 fields: Arc<Vec<Field>>,
79 index: Arc<HashMap<String, usize>>,
80 rows: VecDeque<Value>,
81 chunked_value: bool,
82}
83
84impl ResultSet {
85 fn next(&mut self) -> Option<Row> {
86 if !self.rows.is_empty() {
87 let column_length = self.fields.len();
88 let target_record_is_chunked = self.rows.len() < column_length;
89 let target_record_contains_chunked_value = self.chunked_value && self.rows.len() == column_length;
90
91 if !target_record_is_chunked && !target_record_contains_chunked_value {
92 let mut values = Vec::with_capacity(column_length);
94 for _ in 0..column_length {
95 values.push(self.rows.pop_front().unwrap());
96 }
97 return Some(Row::new(Arc::clone(&self.index), Arc::clone(&self.fields), values));
98 }
99 }
100 None
101 }
102
103 fn merge(previous_last: Value, current_first: Value) -> Result<Value, Status> {
105 match previous_last.kind.unwrap() {
106 Kind::StringValue(last) => match current_first.kind.unwrap() {
107 Kind::StringValue(first) => {
108 tracing::trace!("previous_last={}, current_first={}", &last, first);
109 Ok(Value {
110 kind: Some(Kind::StringValue(last + &first)),
111 })
112 }
113 _ => Err(Status::new(
114 Code::Internal,
115 "chunks kind mismatch: current_first must be StringKind",
116 )),
117 },
118 Kind::ListValue(mut last) => match current_first.kind.unwrap() {
119 Kind::ListValue(mut first) => {
120 let first_value_of_current = first.values.remove(0);
121 let merged = match last.values.pop() {
122 Some(last_value_of_previous) => {
123 ResultSet::merge(last_value_of_previous, first_value_of_current)?
124 }
125 None => first_value_of_current,
127 };
128 last.values.push(merged);
129 last.values.extend(first.values);
130 Ok(Value {
131 kind: Some(Kind::ListValue(last)),
132 })
133 }
134 _ => Err(Status::new(
135 Code::Internal,
136 "chunks kind mismatch: current_first must be ListValue",
137 )),
138 },
139 _ => Err(Status::new(
140 Code::Internal,
141 "previous_last kind mismatch: only StringValue and ListValue can be chunked",
142 )),
143 }
144 }
145
146 fn add(
147 &mut self,
148 metadata: Option<ResultSetMetadata>,
149 mut values: Vec<Value>,
150 chunked_value: bool,
151 ) -> Result<bool, Status> {
152 if self.fields.is_empty() {
154 if let Some(metadata) = metadata {
155 self.fields = metadata
156 .row_type
157 .map(|e| Arc::new(e.fields))
158 .ok_or_else(|| Status::new(Code::Internal, "no field metadata found"))?;
159 let mut index = HashMap::new();
161 for (i, f) in self.fields.iter().enumerate() {
162 index.insert(f.name.clone(), i);
163 }
164 self.index = Arc::new(index);
165 }
166 }
167
168 if self.chunked_value {
169 tracing::trace!("now chunked value found previous={}, current={}", self.rows.len(), values.len());
170 let merged = ResultSet::merge(self.rows.pop_back().unwrap(), values.remove(0))?;
172 self.rows.push_back(merged);
173 }
174 self.rows.extend(values);
175 self.chunked_value = chunked_value;
176 Ok(true)
177 }
178}
179
180pub struct RowIterator<'a, T>
181where
182 T: Reader,
183{
184 streaming: Streaming<PartialResultSet>,
185 session: &'a mut SessionHandle,
186 reader: T,
187 rs: ResultSet,
188 reader_option: Option<CallOptions>,
189}
190
191impl<'a, T> RowIterator<'a, T>
192where
193 T: Reader,
194{
195 pub(crate) async fn new(
196 session: &'a mut SessionHandle,
197 reader: T,
198 option: Option<CallOptions>,
199 ) -> Result<RowIterator<'a, T>, Status> {
200 let streaming = reader.read(session, option).await?.into_inner();
201 let rs = ResultSet {
202 fields: Arc::new(vec![]),
203 index: Arc::new(HashMap::new()),
204 rows: VecDeque::new(),
205 chunked_value: false,
206 };
207 Ok(Self {
208 streaming,
209 session,
210 reader,
211 rs,
212 reader_option: None,
213 })
214 }
215
216 pub fn set_call_options(&mut self, option: CallOptions) {
217 self.reader_option = Some(option);
218 }
219
220 async fn try_recv(&mut self, option: Option<CallOptions>) -> Result<bool, Status> {
221 let maybe_result_set = match self.streaming.message().await {
223 Ok(s) => s,
224 Err(e) => {
225 if !self.reader.can_resume() {
226 return Err(e);
227 }
228 tracing::debug!("streaming error: {}. resume reading by resume_token", e);
229 let result = self.reader.read(self.session, option).await?;
230 self.streaming = result.into_inner();
231 self.streaming.message().await?
232 }
233 };
234
235 match maybe_result_set {
236 Some(result_set) => {
237 if result_set.values.is_empty() {
238 return Ok(false);
239 }
240 if !result_set.resume_token.is_empty() {
242 self.reader.update_token(result_set.resume_token);
243 }
244 self.rs
245 .add(result_set.metadata, result_set.values, result_set.chunked_value)
246 }
247 None => Ok(false),
248 }
249 }
250
251 pub fn columns_metadata(&self) -> &Arc<Vec<Field>> {
253 &self.rs.fields
254 }
255
256 pub fn column_metadata(&self, column_name: &str) -> Option<(usize, Field)> {
257 for (i, val) in self.rs.fields.iter().enumerate() {
258 if val.name == column_name {
259 return Some((i, val.clone()));
260 }
261 }
262 None
263 }
264
265 pub async fn next(&mut self) -> Result<Option<Row>, Status> {
268 loop {
269 let row = self.rs.next();
270 if row.is_some() {
271 return Ok(row);
272 }
273 if !self.try_recv(self.reader_option.clone()).await? {
275 return Ok(None);
276 }
277 }
278 }
279}
280
281#[cfg(test)]
282mod tests {
283 use std::collections::VecDeque;
284 use std::sync::Arc;
285
286 use prost_types::value::Kind;
287 use prost_types::Value;
288
289 use google_cloud_googleapis::spanner::v1::struct_type::Field;
290 use google_cloud_googleapis::spanner::v1::{ResultSetMetadata, StructType};
291
292 use crate::reader::ResultSet;
293 use crate::row::{Row, TryFromValue};
294 use crate::statement::ToKind;
295
296 fn empty_rs() -> ResultSet {
297 ResultSet {
298 fields: Arc::new(vec![]),
299 index: Arc::new(Default::default()),
300 rows: Default::default(),
301 chunked_value: false,
302 }
303 }
304
305 fn field(name: &str) -> Field {
306 Field {
307 name: name.to_string(),
308 r#type: None,
309 }
310 }
311
312 fn value(to_kind: impl ToKind) -> Value {
313 Value {
314 kind: Some(to_kind.to_kind()),
315 }
316 }
317
318 fn assert_one_column(rs: &ResultSet) {
319 assert_eq!(rs.fields.len(), 1);
320 assert_eq!(rs.fields[0].name, "column1".to_string());
321 assert_eq!(*rs.index.get("column1").unwrap(), 0);
322 }
323
324 fn assert_multi_column(rs: &ResultSet) {
325 assert_eq!(rs.fields.len(), 2);
326 assert_eq!(rs.fields[0].name, "column1".to_string());
327 assert_eq!(rs.fields[1].name, "column2".to_string());
328 assert_eq!(*rs.index.get("column1").unwrap(), 0);
329 assert_eq!(*rs.index.get("column2").unwrap(), 1);
330 }
331
332 fn assert_some_one_column<T: TryFromValue + std::cmp::PartialEq + std::fmt::Debug>(row: Option<Row>, v: T) {
333 assert!(row.is_some());
334 assert_eq!(v, row.unwrap().column::<T>(0).unwrap());
335 }
336
337 fn assert_some_multi_column<
338 T1: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
339 T2: TryFromValue + std::cmp::PartialEq + std::fmt::Debug,
340 >(
341 row: Option<Row>,
342 v1: T1,
343 v2: T2,
344 ) {
345 assert!(row.is_some());
346 let v = row.unwrap();
347 assert_eq!(v1, v.column::<T1>(0).unwrap());
348 assert_eq!(v2, v.column::<T2>(1).unwrap());
349 }
350
351 #[test]
352 fn test_rs_next_empty() {
353 let mut rs = ResultSet {
354 fields: Arc::new(vec![field("column1")]),
355 index: Arc::new(Default::default()),
356 rows: Default::default(),
357 chunked_value: false,
358 };
359 assert!(rs.next().is_none());
360 }
361
362 #[test]
363 fn test_rs_next_record_chunked_or_not() {
364 let rs = |values| ResultSet {
365 fields: Arc::new(vec![field("column1"), field("column2")]),
366 index: Arc::new(Default::default()),
367 rows: VecDeque::from(values),
368 chunked_value: false,
369 };
370 let mut rs1 = rs(vec![value("value1")]);
371 assert!(rs1.next().is_none());
372 let mut rs2 = rs(vec![value("value1"), value("value2")]);
373 assert_eq!(rs2.next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
374 }
375
376 #[test]
377 fn test_rs_next_value_chunked_or_not() {
378 let rs = |chunked_value| ResultSet {
379 fields: Arc::new(vec![field("column1"), field("column2")]),
380 index: Arc::new(Default::default()),
381 rows: VecDeque::from(vec![value("value1"), value("value2")]),
382 chunked_value,
383 };
384 assert!(rs(true).next().is_none());
385 assert_eq!(rs(false).next().unwrap().column::<String>(0).unwrap(), "value1".to_string());
386 }
387
388 #[test]
389 fn test_rs_next_plural_record_one_column() {
390 let rs = |chunked_value| ResultSet {
391 fields: Arc::new(vec![field("column1")]),
392 index: Arc::new(Default::default()),
393 rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
394 chunked_value,
395 };
396 let mut incomplete = rs(true);
397 assert!(incomplete.next().is_some());
398 assert!(incomplete.next().is_some());
399 assert!(incomplete.next().is_none());
400 let mut complete = rs(false);
401 assert!(complete.next().is_some());
402 assert!(complete.next().is_some());
403 assert!(complete.next().is_some());
404 assert!(complete.next().is_none());
405 }
406
407 #[test]
408 fn test_rs_next_plural_record_multi_column() {
409 let rs = |chunked_value| ResultSet {
410 fields: Arc::new(vec![field("column1"), field("column2")]),
411 index: Arc::new(Default::default()),
412 rows: VecDeque::from(vec![value("value1"), value("value2"), value("value3")]),
413 chunked_value,
414 };
415 let mut incomplete = rs(true);
416 assert_eq!(incomplete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
417 assert!(incomplete.next().is_none());
418 let mut complete = rs(false);
419 assert_eq!(complete.next().unwrap().column::<String>(1).unwrap(), "value2".to_string());
420 assert!(incomplete.next().is_none());
421 }
422
423 #[test]
424 fn test_rs_merge_string_value() {
425 let result = ResultSet::merge(value("val"), value("ue1"));
426 assert!(result.is_ok());
427 let kind = result.unwrap().kind.unwrap();
428 match kind {
429 Kind::StringValue(v) => assert_eq!(v, "value1".to_string()),
430 _ => unreachable!("must be string value"),
431 }
432 }
433
434 #[test]
435 fn test_rs_merge_list_value() {
436 let previous_last = value(vec!["value1-1", "value1-2", "val"]);
437 let current_first = value(vec!["ue1-3", "value2-1", "valu"]);
438 let result = ResultSet::merge(previous_last, current_first);
439 assert!(result.is_ok());
440 let kind = result.unwrap().kind.unwrap();
441 match kind {
442 Kind::ListValue(v) => {
443 assert_eq!(v.values.len(), 5);
444 match v.values[0].kind.as_ref().unwrap() {
445 Kind::StringValue(v) => assert_eq!(*v, "value1-1".to_string()),
446 _ => unreachable!("must be string value"),
447 };
448 match v.values[1].kind.as_ref().unwrap() {
449 Kind::StringValue(v) => assert_eq!(*v, "value1-2".to_string()),
450 _ => unreachable!("must be string value"),
451 };
452 match v.values[2].kind.as_ref().unwrap() {
453 Kind::StringValue(v) => assert_eq!(*v, "value1-3".to_string()),
454 _ => unreachable!("must be string value"),
455 };
456 match v.values[3].kind.as_ref().unwrap() {
457 Kind::StringValue(v) => assert_eq!(*v, "value2-1".to_string()),
458 _ => unreachable!("must be string value"),
459 }
460 match v.values[4].kind.as_ref().unwrap() {
461 Kind::StringValue(v) => assert_eq!(*v, "valu".to_string()),
462 _ => unreachable!("must be string value"),
463 }
464 }
465 _ => unreachable!("must be string value"),
466 }
467 }
468
469 #[test]
470 fn test_rs_add_one_column_no_chunked_value() {
471 let mut rs = empty_rs();
472 let metadata = Some(ResultSetMetadata {
473 row_type: Some(StructType {
474 fields: vec![field("column1")],
475 }),
476 transaction: None,
477 undeclared_parameters: None,
478 });
479 let values = vec![value("value1"), value("value2"), value("value3")];
480 assert!(rs.add(metadata, values, false).unwrap());
481 assert_eq!(rs.rows.len(), 3);
482 assert_one_column(&rs);
483 assert!(!rs.chunked_value);
484
485 assert_some_one_column(rs.next(), "value1".to_string());
486 assert_some_one_column(rs.next(), "value2".to_string());
487 assert_some_one_column(rs.next(), "value3".to_string());
488 assert!(rs.next().is_none());
489 }
490
491 #[test]
492 fn test_rs_add_multi_column_no_chunked_value() {
493 let mut rs = empty_rs();
494 let metadata = Some(ResultSetMetadata {
495 row_type: Some(StructType {
496 fields: vec![field("column1"), field("column2")],
497 }),
498 transaction: None,
499 undeclared_parameters: None,
500 });
501 let values = vec![value("value1"), value("value2"), value("value3")];
502 assert!(rs.add(metadata, values, false).unwrap());
503 assert_eq!(rs.rows.len(), 3);
504 assert_multi_column(&rs);
505 assert!(!rs.chunked_value);
506
507 assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
508 assert!(rs.next().is_none());
509 }
510
511 #[test]
512 fn test_rs_add_multi_column_no_chunked_value_just() {
513 let mut rs = empty_rs();
514 let metadata = Some(ResultSetMetadata {
515 row_type: Some(StructType {
516 fields: vec![field("column1"), field("column2")],
517 }),
518 transaction: None,
519 undeclared_parameters: None,
520 });
521 let values = vec![value("value1"), value("value2"), value("value3"), value("value4")];
522 assert!(rs.add(metadata, values, false).unwrap());
523 assert_eq!(rs.rows.len(), 4);
524 assert_multi_column(&rs);
525 assert!(!rs.chunked_value);
526
527 assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
528 assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
529 assert!(rs.next().is_none());
530 }
531
532 #[test]
533 fn test_rs_add_one_column_chunked_value() {
534 let mut rs = empty_rs();
535 let metadata = Some(ResultSetMetadata {
536 row_type: Some(StructType {
537 fields: vec![field("column1")],
538 }),
539 transaction: None,
540 undeclared_parameters: None,
541 });
542 let values = vec![value("value1"), value("value2"), value("val")];
543 assert!(rs.add(metadata.clone(), values, true).unwrap());
544 assert_eq!(rs.rows.len(), 3);
545 assert_one_column(&rs);
546 assert!(rs.chunked_value);
547
548 assert_some_one_column(rs.next(), "value1".to_string());
549 assert_some_one_column(rs.next(), "value2".to_string());
550 assert!(rs.next().is_none());
551
552 assert!(rs.add(metadata, vec![value("ue3")], false).unwrap());
554 assert!(!rs.chunked_value);
555 assert_eq!(rs.rows.len(), 1);
556 assert_some_one_column(rs.next(), "value3".to_string());
557 assert!(rs.next().is_none());
558 }
559
560 #[test]
561 fn test_rs_add_multi_column_chunked_value() {
562 let mut rs = empty_rs();
563 let metadata = Some(ResultSetMetadata {
564 row_type: Some(StructType {
565 fields: vec![field("column1"), field("column2")],
566 }),
567 transaction: None,
568 undeclared_parameters: None,
569 });
570 let values = vec![value("value1"), value("value2"), value("val")];
571 assert!(rs.add(metadata.clone(), values, true).unwrap());
572 assert_eq!(rs.rows.len(), 3);
573 assert_multi_column(&rs);
574 assert!(rs.chunked_value);
575
576 assert_some_multi_column(rs.next(), "value1".to_string(), "value2".to_string());
577 assert!(rs.next().is_none());
578
579 assert!(rs.add(metadata.clone(), vec![value("ue3")], false).unwrap());
581 assert!(!rs.chunked_value);
582 assert_eq!(rs.rows.len(), 1);
583 assert!(rs.next().is_none());
584
585 assert!(rs.add(metadata, vec![value("value4")], false).unwrap());
587 assert!(!rs.chunked_value);
588 assert_eq!(rs.rows.len(), 2);
589 assert_some_multi_column(rs.next(), "value3".to_string(), "value4".to_string());
590 }
591
592 #[test]
593 fn test_rs_add_multi_column_no_chunked_value_list_value() {
594 let mut rs = empty_rs();
595 let metadata = Some(ResultSetMetadata {
596 row_type: Some(StructType {
597 fields: vec![field("column1"), field("column2")],
598 }),
599 transaction: None,
600 undeclared_parameters: None,
601 });
602 let values = vec![value(vec!["value1-1", "value1-2"])];
603 assert!(rs.add(metadata.clone(), values, false).unwrap());
604 assert_eq!(rs.rows.len(), 1);
605 assert_multi_column(&rs);
606 assert!(!rs.chunked_value);
607 assert!(rs.next().is_none());
608 assert!(rs.add(metadata, vec![value(vec!["value2-1"])], false).unwrap());
609 assert!(!rs.chunked_value);
610 assert_eq!(rs.rows.len(), 2);
611 assert_some_multi_column(
612 rs.next(),
613 vec!["value1-1".to_string(), "value1-2".to_string()],
614 vec!["value2-1".to_string()],
615 );
616 assert!(rs.next().is_none());
617 }
618
619 #[test]
620 fn test_rs_add_multi_column_chunked_value_list_value() {
621 let mut rs = empty_rs();
622 let metadata = Some(ResultSetMetadata {
623 row_type: Some(StructType {
624 fields: vec![field("column1"), field("column2")],
625 }),
626 transaction: None,
627 undeclared_parameters: None,
628 });
629 let values = vec![value(vec!["value1-1", "value1-2"]), value(vec!["value2-"])];
630 assert!(rs.add(metadata.clone(), values, true).unwrap());
631 assert_eq!(rs.rows.len(), 2);
632 assert_multi_column(&rs);
633 assert!(rs.chunked_value);
634 assert!(rs.next().is_none());
635
636 assert!(rs.add(metadata.clone(), vec![value(vec!["1", "valu"])], true).unwrap());
638 assert!(rs.chunked_value);
639 assert_eq!(rs.rows.len(), 2);
640 assert!(rs.next().is_none());
641
642 assert!(rs.add(metadata, vec![value(vec!["e2-2"])], false).unwrap());
644 assert!(!rs.chunked_value);
645 assert_eq!(rs.rows.len(), 2);
646 assert_some_multi_column(
647 rs.next(),
648 vec!["value1-1".to_string(), "value1-2".to_string()],
649 vec!["value2-1".to_string(), "value2-2".to_string()],
650 );
651 assert!(rs.next().is_none());
652 }
653
654 #[test]
655 fn test_rs_add_multi_column_chunked_value_list_and_string_value() {
656 let mut rs = empty_rs();
657 let metadata = Some(ResultSetMetadata {
658 row_type: Some(StructType {
659 fields: vec![field("column1"), field("column2")],
660 }),
661 transaction: None,
662 undeclared_parameters: None,
663 });
664 let values = vec![value(vec!["value1-1", "value1-2"]), value("va")];
665 assert!(rs.add(metadata.clone(), values, true).unwrap());
666 assert_eq!(rs.rows.len(), 2);
667 assert_multi_column(&rs);
668 assert!(rs.chunked_value);
669 assert!(rs.next().is_none());
670
671 assert!(rs
673 .add(metadata.clone(), vec![value("lueA"), value(vec!["valu"])], true)
674 .unwrap());
675 assert!(rs.chunked_value);
676 assert_eq!(rs.rows.len(), 3);
677 assert_some_multi_column(
678 rs.next(),
679 vec!["value1-1".to_string(), "value1-2".to_string()],
680 "valueA".to_string(),
681 );
682 assert!(rs.next().is_none());
683
684 assert!(rs
686 .add(metadata.clone(), vec![value(vec!["e2-1", "value2-2"])], false)
687 .unwrap());
688 assert!(!rs.chunked_value);
689 assert_eq!(rs.rows.len(), 1);
690 assert!(rs.next().is_none());
691
692 assert!(rs.add(metadata.clone(), vec![value("value")], true).unwrap());
694 assert!(rs.chunked_value);
695 assert_eq!(rs.rows.len(), 2);
696 assert!(rs.next().is_none());
697
698 assert!(rs.add(metadata, vec![value("B")], false).unwrap());
700 assert!(!rs.chunked_value);
701 assert_eq!(rs.rows.len(), 2);
702 assert_some_multi_column(
703 rs.next(),
704 vec!["value2-1".to_string(), "value2-2".to_string()],
705 "valueB".to_string(),
706 );
707 assert!(rs.next().is_none());
708 }
709}