google_cloud_bigquery/storage_write/
mod.rs1use google_cloud_gax::grpc::codegen::tokio_stream::Stream;
2use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request::{ProtoData, Rows};
3use google_cloud_googleapis::cloud::bigquery::storage::v1::{AppendRowsRequest, ProtoRows, ProtoSchema};
4use prost_types::DescriptorProto;
5use std::collections::HashMap;
6
7mod flow;
8pub mod stream;
9
10pub struct AppendRowsRequestBuilder {
11 offset: Option<i64>,
12 trace_id: Option<String>,
13 missing_value_interpretations: Option<HashMap<String, i32>>,
14 default_missing_value_interpretation: Option<i32>,
15 data: Vec<Vec<u8>>,
16 schema: DescriptorProto,
17}
18
19impl AppendRowsRequestBuilder {
20 pub fn new(schema: DescriptorProto, data: Vec<Vec<u8>>) -> Self {
21 Self {
22 offset: None,
23 trace_id: None,
24 missing_value_interpretations: None,
25 default_missing_value_interpretation: None,
26 data,
27 schema,
28 }
29 }
30
31 pub fn with_offset(mut self, offset: i64) -> Self {
32 self.offset = Some(offset);
33 self
34 }
35
36 pub fn with_trace_id(mut self, trace_id: String) -> Self {
37 self.trace_id = Some(trace_id);
38 self
39 }
40
41 pub fn with_missing_value_interpretations(mut self, missing_value_interpretations: HashMap<String, i32>) -> Self {
42 self.missing_value_interpretations = Some(missing_value_interpretations);
43 self
44 }
45
46 pub fn with_default_missing_value_interpretation(mut self, default_missing_value_interpretation: i32) -> Self {
47 self.default_missing_value_interpretation = Some(default_missing_value_interpretation);
48 self
49 }
50
51 pub(crate) fn build(self, stream: &str) -> AppendRowsRequest {
52 AppendRowsRequest {
53 write_stream: stream.to_string(),
54 offset: self.offset,
55 trace_id: self.trace_id.unwrap_or_default(),
56 missing_value_interpretations: self.missing_value_interpretations.unwrap_or_default(),
57 default_missing_value_interpretation: self.default_missing_value_interpretation.unwrap_or(0),
58 rows: Some(Rows::ProtoRows(ProtoData {
59 writer_schema: Some(ProtoSchema {
60 proto_descriptor: Some(self.schema),
61 }),
62 rows: Some(ProtoRows {
63 serialized_rows: self.data,
64 }),
65 })),
66 }
67 }
68}
69
70pub fn into_streaming_request(rows: Vec<AppendRowsRequest>) -> impl Stream<Item = AppendRowsRequest> {
71 async_stream::stream! {
72 for row in rows {
73 yield row;
74 }
75 }
76}