google_cloud_bigquery/storage_write/stream/
mod.rs

1use crate::grpc::apiv1::conn_pool::ConnectionManager;
2use crate::storage_write::flow::FlowController;
3use crate::storage_write::AppendRowsRequestBuilder;
4use google_cloud_gax::grpc::{IntoStreamingRequest, Status, Streaming};
5use google_cloud_googleapis::cloud::bigquery::storage::v1::{
6    AppendRowsRequest, AppendRowsResponse, FinalizeWriteStreamRequest, WriteStream,
7};
8use std::sync::Arc;
9
10pub mod buffered;
11pub mod committed;
12pub mod default;
13pub mod pending;
14
15pub struct Stream {
16    inner: WriteStream,
17    cons: Arc<ConnectionManager>,
18    fc: Option<FlowController>,
19}
20
21impl Stream {
22    pub(crate) fn new(inner: WriteStream, cons: Arc<ConnectionManager>, max_insert_count: usize) -> Self {
23        Self {
24            inner,
25            cons,
26            fc: if max_insert_count > 0 {
27                Some(FlowController::new(max_insert_count))
28            } else {
29                None
30            },
31        }
32    }
33}
34
35pub trait AsStream: Sized {
36    fn as_ref(&self) -> &Stream;
37
38    fn name(&self) -> &str {
39        &self.as_ref().inner.name
40    }
41
42    fn create_streaming_request(
43        &self,
44        rows: Vec<AppendRowsRequestBuilder>,
45    ) -> impl google_cloud_gax::grpc::codegen::tokio_stream::Stream<Item = AppendRowsRequest> {
46        let name = self.name().to_string();
47        async_stream::stream! {
48            for row in rows {
49                yield row.build(&name);
50            }
51        }
52    }
53}
54
55pub(crate) struct ManagedStreamDelegate {}
56
57impl ManagedStreamDelegate {
58    async fn append_rows(
59        stream: &Stream,
60        rows: Vec<AppendRowsRequestBuilder>,
61    ) -> Result<Streaming<AppendRowsResponse>, Status> {
62        let name = stream.inner.name.to_string();
63        let req = async_stream::stream! {
64            for row in rows {
65                yield row.build(&name);
66            }
67        };
68        Self::append_streaming_request(stream, req).await
69    }
70
71    async fn append_streaming_request(
72        stream: &Stream,
73        req: impl IntoStreamingRequest<Message = AppendRowsRequest>,
74    ) -> Result<Streaming<AppendRowsResponse>, Status> {
75        match &stream.fc {
76            None => {
77                let mut client = stream.cons.writer();
78                Ok(client.append_rows(req).await?.into_inner())
79            }
80            Some(fc) => {
81                let permit = fc.acquire().await;
82                let mut client = stream.cons.writer();
83                let result = client.append_rows(req).await?.into_inner();
84                drop(permit);
85                Ok(result)
86            }
87        }
88    }
89}
90
91pub(crate) struct DisposableStreamDelegate {}
92impl DisposableStreamDelegate {
93    async fn finalize(stream: &Stream) -> Result<i64, Status> {
94        let res = stream
95            .cons
96            .writer()
97            .finalize_write_stream(
98                FinalizeWriteStreamRequest {
99                    name: stream.inner.name.to_string(),
100                },
101                None,
102            )
103            .await?
104            .into_inner();
105        Ok(res.row_count)
106    }
107}
108
109#[cfg(test)]
110pub(crate) mod tests {
111    use crate::storage_write::AppendRowsRequestBuilder;
112    use prost_types::{field_descriptor_proto, DescriptorProto, FieldDescriptorProto};
113
114    #[derive(Clone, PartialEq, ::prost::Message)]
115    pub(crate) struct TestData {
116        #[prost(string, tag = "1")]
117        pub col_string: String,
118    }
119
120    pub(crate) fn init() {
121        let filter = tracing_subscriber::filter::EnvFilter::from_default_env()
122            .add_directive("google_cloud_bigquery=trace".parse().unwrap());
123        let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();
124    }
125
126    pub(crate) fn create_append_rows_request(buf: Vec<Vec<u8>>) -> AppendRowsRequestBuilder {
127        let proto = DescriptorProto {
128            name: Some("TestData".to_string()),
129            field: vec![FieldDescriptorProto {
130                name: Some("col_string".to_string()),
131                number: Some(1),
132                label: None,
133                r#type: Some(field_descriptor_proto::Type::String.into()),
134                type_name: None,
135                extendee: None,
136                default_value: None,
137                oneof_index: None,
138                json_name: None,
139                options: None,
140                proto3_optional: None,
141            }],
142            extension: vec![],
143            nested_type: vec![],
144            enum_type: vec![],
145            extension_range: vec![],
146            oneof_decl: vec![],
147            options: None,
148            reserved_range: vec![],
149            reserved_name: vec![],
150        };
151        AppendRowsRequestBuilder::new(proto, buf)
152    }
153}