google_cloud_bigquery/storage_write/stream/
buffered.rs

1use crate::grpc::apiv1::bigquery_client::create_write_stream_request;
2use crate::grpc::apiv1::conn_pool::ConnectionManager;
3use crate::storage_write::stream::{AsStream, DisposableStreamDelegate, ManagedStreamDelegate, Stream};
4use crate::storage_write::AppendRowsRequestBuilder;
5use google_cloud_gax::grpc::{Status, Streaming};
6use google_cloud_googleapis::cloud::bigquery::storage::v1::write_stream::Type::Buffered;
7use google_cloud_googleapis::cloud::bigquery::storage::v1::{AppendRowsResponse, FlushRowsRequest};
8use std::sync::Arc;
9
/// Factory for buffered write streams against the BigQuery Storage Write API.
///
/// Constructed by the client (see `buffered_storage_writer` in the test module);
/// each call to `create_write_stream` opens a new `Buffered`-type stream.
pub struct Writer {
    // Upper bound on rows per append request, forwarded to each created Stream.
    max_insert_count: usize,
    // Shared gRPC connection pool used for stream creation and row operations.
    cm: Arc<ConnectionManager>,
}
14
15impl Writer {
16    pub(crate) fn new(max_insert_count: usize, cm: Arc<ConnectionManager>) -> Self {
17        Self { max_insert_count, cm }
18    }
19
20    pub async fn create_write_stream(&self, table: &str) -> Result<BufferedStream, Status> {
21        let req = create_write_stream_request(table, Buffered);
22        let stream = self.cm.writer().create_write_stream(req, None).await?.into_inner();
23        Ok(BufferedStream::new(Stream::new(stream, self.cm.clone(), self.max_insert_count)))
24    }
25}
/// A write stream of type `Buffered`.
///
/// Rows appended via `append_rows` are buffered server-side; `flush_rows`
/// commits them up to a given offset, and `finalize` closes the stream.
pub struct BufferedStream {
    // The underlying managed stream shared with the delegate helpers.
    inner: Stream,
}
29
30impl BufferedStream {
31    pub(crate) fn new(inner: Stream) -> Self {
32        Self { inner }
33    }
34
35    pub async fn append_rows(
36        &self,
37        rows: Vec<AppendRowsRequestBuilder>,
38    ) -> Result<Streaming<AppendRowsResponse>, Status> {
39        ManagedStreamDelegate::append_rows(&self.inner, rows).await
40    }
41
42    pub async fn finalize(&self) -> Result<i64, Status> {
43        DisposableStreamDelegate::finalize(&self.inner).await
44    }
45
46    pub async fn flush_rows(&self, offset: Option<i64>) -> Result<i64, Status> {
47        let stream = &self.inner;
48        let res = stream
49            .cons
50            .writer()
51            .flush_rows(
52                FlushRowsRequest {
53                    write_stream: stream.inner.name.to_string(),
54                    offset,
55                },
56                None,
57            )
58            .await?
59            .into_inner();
60        Ok(res.offset)
61    }
62}
63
impl AsStream for BufferedStream {
    // Exposes the underlying Stream so shared delegate helpers
    // (ManagedStreamDelegate / DisposableStreamDelegate) can operate on it.
    fn as_ref(&self) -> &Stream {
        &self.inner
    }
}
69
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;
    use std::sync::Arc;
    use tokio::task::JoinHandle;

    use crate::client::{Client, ClientConfig};
    use crate::storage_write::stream::tests::{create_append_rows_request, TestData};
    use google_cloud_gax::grpc::Status;
    use prost::Message;

    #[ctor::ctor]
    fn init() {
        crate::storage_write::stream::tests::init();
    }

    /// End-to-end: create one buffered stream per table, append rows
    /// concurrently, then flush and finalize each stream.
    #[tokio::test]
    async fn test_storage_write() {
        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
        let project_id = project_id.unwrap();
        let client = Client::new(config).await.unwrap();
        let tables = ["write_test", "write_test_1"];
        let writer = client.buffered_storage_writer();

        // Create Streams
        let mut streams = vec![];
        for i in 0..2 {
            // format! already returns String; no trailing .to_string() needed.
            let table = format!(
                "projects/{}/datasets/gcrbq_storage/tables/{}",
                &project_id,
                tables[i % tables.len()]
            );
            let stream = writer.create_write_stream(&table).await.unwrap();
            streams.push(stream);
        }

        // Append Rows: one task per stream, each sending 5 requests of 2 rows.
        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
        for (i, stream) in streams.into_iter().enumerate() {
            tasks.push(tokio::spawn(async move {
                let mut rows = vec![];
                for j in 0..5 {
                    let data = TestData {
                        col_string: format!("buffered_{i}_{j}"),
                    };
                    let mut buf = Vec::new();
                    data.encode(&mut buf).unwrap();
                    rows.push(create_append_rows_request(vec![buf.clone(), buf]));
                }
                let mut result = stream.append_rows(rows).await.unwrap();
                while let Some(res) = result.next().await {
                    let res = res?;
                    tracing::info!("append row errors = {:?}", res.row_errors.len());
                }
                // Buffered streams require an explicit flush before finalize.
                let result = stream.flush_rows(Some(0)).await.unwrap();
                tracing::info!("flush rows count = {:?}", result);

                let result = stream.finalize().await.unwrap();
                tracing::info!("finalized row count = {:?}", result);
                Ok(())
            }));
        }

        // Wait for append rows
        for task in tasks {
            task.await.unwrap().unwrap();
        }
    }

    /// End-to-end: share ONE buffered stream (via Arc) across multiple
    /// append tasks, then flush and finalize once from the main task.
    #[serial_test::serial]
    #[tokio::test]
    async fn test_storage_write_single_stream() {
        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
        let project_id = project_id.unwrap();
        let client = Client::new(config).await.unwrap();
        let writer = client.buffered_storage_writer();

        // Create Streams
        let mut streams = vec![];
        // format! already returns String; no trailing .to_string() needed.
        let table = format!("projects/{}/datasets/gcrbq_storage/tables/write_test", &project_id);
        let stream = Arc::new(writer.create_write_stream(&table).await.unwrap());
        for _i in 0..2 {
            streams.push(stream.clone());
        }

        // Append Rows: two tasks appending to the same shared stream.
        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
        for (i, stream) in streams.into_iter().enumerate() {
            tasks.push(tokio::spawn(async move {
                let mut rows = vec![];
                for j in 0..5 {
                    let data = TestData {
                        col_string: format!("buffered_{i}_{j}"),
                    };
                    let mut buf = Vec::new();
                    data.encode(&mut buf).unwrap();
                    rows.push(create_append_rows_request(vec![buf.clone(), buf]));
                }
                let mut result = stream.append_rows(rows).await.unwrap();
                while let Some(res) = result.next().await {
                    let res = res?;
                    tracing::info!("append row errors = {:?}", res.row_errors.len());
                }
                Ok(())
            }));
        }

        // Wait for append rows
        for task in tasks {
            task.await.unwrap().unwrap();
        }

        // Flush/finalize only after all append tasks have completed.
        let result = stream.flush_rows(Some(0)).await.unwrap();
        tracing::info!("flush rows count = {:?}", result);

        let result = stream.finalize().await.unwrap();
        tracing::info!("finalized row count = {:?}", result);
    }
}