google_cloud_bigquery/storage_write/
mod.rs

1use google_cloud_gax::grpc::codegen::tokio_stream::Stream;
2use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request::{ProtoData, Rows};
3use google_cloud_googleapis::cloud::bigquery::storage::v1::{AppendRowsRequest, ProtoRows, ProtoSchema};
4use prost_types::DescriptorProto;
5use std::collections::HashMap;
6
7mod flow;
8pub mod stream;
9
10pub struct AppendRowsRequestBuilder {
11    offset: Option<i64>,
12    trace_id: Option<String>,
13    missing_value_interpretations: Option<HashMap<String, i32>>,
14    default_missing_value_interpretation: Option<i32>,
15    data: Vec<Vec<u8>>,
16    schema: DescriptorProto,
17}
18
19impl AppendRowsRequestBuilder {
20    pub fn new(schema: DescriptorProto, data: Vec<Vec<u8>>) -> Self {
21        Self {
22            offset: None,
23            trace_id: None,
24            missing_value_interpretations: None,
25            default_missing_value_interpretation: None,
26            data,
27            schema,
28        }
29    }
30
31    pub fn with_offset(mut self, offset: i64) -> Self {
32        self.offset = Some(offset);
33        self
34    }
35
36    pub fn with_trace_id(mut self, trace_id: String) -> Self {
37        self.trace_id = Some(trace_id);
38        self
39    }
40
41    pub fn with_missing_value_interpretations(mut self, missing_value_interpretations: HashMap<String, i32>) -> Self {
42        self.missing_value_interpretations = Some(missing_value_interpretations);
43        self
44    }
45
46    pub fn with_default_missing_value_interpretation(mut self, default_missing_value_interpretation: i32) -> Self {
47        self.default_missing_value_interpretation = Some(default_missing_value_interpretation);
48        self
49    }
50
51    pub(crate) fn build(self, stream: &str) -> AppendRowsRequest {
52        AppendRowsRequest {
53            write_stream: stream.to_string(),
54            offset: self.offset,
55            trace_id: self.trace_id.unwrap_or_default(),
56            missing_value_interpretations: self.missing_value_interpretations.unwrap_or_default(),
57            default_missing_value_interpretation: self.default_missing_value_interpretation.unwrap_or(0),
58            rows: Some(Rows::ProtoRows(ProtoData {
59                writer_schema: Some(ProtoSchema {
60                    proto_descriptor: Some(self.schema),
61                }),
62                rows: Some(ProtoRows {
63                    serialized_rows: self.data,
64                }),
65            })),
66        }
67    }
68}
69
70pub fn into_streaming_request(rows: Vec<AppendRowsRequest>) -> impl Stream<Item = AppendRowsRequest> {
71    async_stream::stream! {
72        for row in rows {
73            yield row;
74        }
75    }
76}