google_cloud_bigquery/storage_write/stream/buffered.rs

use crate::grpc::apiv1::bigquery_client::create_write_stream_request;
use crate::grpc::apiv1::conn_pool::ConnectionManager;
use crate::storage_write::stream::{AsStream, DisposableStreamDelegate, ManagedStreamDelegate, Stream};
use crate::storage_write::AppendRowsRequestBuilder;
use google_cloud_gax::grpc::{Status, Streaming};
use google_cloud_googleapis::cloud::bigquery::storage::v1::write_stream::Type::Buffered;
use google_cloud_googleapis::cloud::bigquery::storage::v1::{AppendRowsResponse, FlushRowsRequest};
use std::sync::Arc;

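/// Creates [`BufferedStream`]s that append rows through a shared [`ConnectionManager`].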
pub struct Writer {
    max_insert_count: usize,
    cm: Arc<ConnectionManager>,
}

impl Writer {
    pub(crate) fn new(max_insert_count: usize, cm: Arc<ConnectionManager>) -> Self {
        Self { max_insert_count, cm }
    }

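    /// Creates a write stream of type `Buffered` on `table`, where `table` is a
    /// fully qualified path of the form
    /// `projects/{project}/datasets/{dataset}/tables/{table}`.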
    pub async fn create_write_stream(&self, table: &str) -> Result<BufferedStream, Status> {
        let req = create_write_stream_request(table, Buffered);
        let stream = self.cm.writer().create_write_stream(req, None).await?.into_inner();
        Ok(BufferedStream::new(Stream::new(stream, self.cm.clone(), self.max_insert_count)))
    }
}

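/// A write stream of type `Buffered`.
///
/// Rows appended to a buffered stream are held by the service and only become
/// visible in the table once [`flush_rows`](Self::flush_rows) advances the
/// flush offset past them. A rough lifecycle sketch (the table path is
/// illustrative and error handling is elided):
///
/// ```no_run
/// # use google_cloud_bigquery::client::Client;
/// # use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
/// # use google_cloud_gax::grpc::Status;
/// # async fn example(client: &Client, rows: Vec<AppendRowsRequestBuilder>) -> Result<(), Status> {
/// use futures_util::StreamExt;
///
/// let writer = client.buffered_storage_writer();
/// let stream = writer
///     .create_write_stream("projects/p/datasets/d/tables/t")
///     .await?;
///
/// // Append rows, then drain the response stream to surface any errors.
/// let mut responses = stream.append_rows(rows).await?;
/// while let Some(res) = responses.next().await {
///     res?;
/// }
///
/// // Make the buffered rows visible, then finalize the stream.
/// stream.flush_rows(Some(0)).await?;
/// stream.finalize().await?;
/// # Ok(())
/// # }
/// ```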
pub struct BufferedStream {
    inner: Stream,
}

impl BufferedStream {
    pub(crate) fn new(inner: Stream) -> Self {
        Self { inner }
    }

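    /// Appends `rows` to the stream and returns the server's streaming
    /// response; each [`AppendRowsResponse`] carries any per-row errors.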
    pub async fn append_rows(
        &self,
        rows: Vec<AppendRowsRequestBuilder>,
    ) -> Result<Streaming<AppendRowsResponse>, Status> {
        ManagedStreamDelegate::append_rows(&self.inner, rows).await
    }

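    /// Finalizes the stream so no further rows can be appended, returning the
    /// row count reported by the service.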
    pub async fn finalize(&self) -> Result<i64, Status> {
        DisposableStreamDelegate::finalize(&self.inner).await
    }

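    /// Flushes buffered rows up to `offset`, making them visible in the table,
    /// and returns the offset the stream was flushed to.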
    pub async fn flush_rows(&self, offset: Option<i64>) -> Result<i64, Status> {
        let stream = &self.inner;
        let res = stream
            .cons
            .writer()
            .flush_rows(
                FlushRowsRequest {
                    write_stream: stream.inner.name.to_string(),
                    offset,
                },
                None,
            )
            .await?
            .into_inner();
        Ok(res.offset)
    }
}

impl AsStream for BufferedStream {
    fn as_ref(&self) -> &Stream {
        &self.inner
    }
}

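// The tests below run against the live BigQuery Storage Write API. They assume
// application-default credentials plus a `gcrbq_storage` dataset containing the
// `write_test` and `write_test_1` tables.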
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;
    use std::sync::Arc;
    use tokio::task::JoinHandle;

    use crate::client::{Client, ClientConfig};
    use crate::storage_write::stream::tests::{create_append_rows_request, TestData};
    use google_cloud_gax::grpc::Status;
    use prost::Message;

    #[ctor::ctor]
    fn init() {
        crate::storage_write::stream::tests::init();
    }

    #[tokio::test]
    async fn test_storage_write() {
        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
        let project_id = project_id.unwrap();
        let client = Client::new(config).await.unwrap();
        let tables = ["write_test", "write_test_1"];
        let writer = client.buffered_storage_writer();

        // Create one buffered stream per table.
        let mut streams = vec![];
        for i in 0..2 {
            let table = format!(
                "projects/{}/datasets/gcrbq_storage/tables/{}",
                &project_id,
                tables[i % tables.len()]
            );
            let stream = writer.create_write_stream(&table).await.unwrap();
            streams.push(stream);
        }

        // Append to each stream concurrently, then flush and finalize it.
        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
        for (i, stream) in streams.into_iter().enumerate() {
            tasks.push(tokio::spawn(async move {
                let mut rows = vec![];
                for j in 0..5 {
                    let data = TestData {
                        col_string: format!("buffered_{i}_{j}"),
                    };
                    let mut buf = Vec::new();
                    data.encode(&mut buf).unwrap();
                    rows.push(create_append_rows_request(vec![buf.clone(), buf]));
                }
                let mut result = stream.append_rows(rows).await.unwrap();
                while let Some(res) = result.next().await {
                    let res = res?;
                    tracing::info!("append row errors = {:?}", res.row_errors.len());
                }
                let result = stream.flush_rows(Some(0)).await.unwrap();
                tracing::info!("flushed offset = {:?}", result);

                let result = stream.finalize().await.unwrap();
                tracing::info!("finalized row count = {:?}", result);
                Ok(())
            }));
        }

        for task in tasks {
            task.await.unwrap().unwrap();
        }
    }

    #[serial_test::serial]
    #[tokio::test]
    async fn test_storage_write_single_stream() {
        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
        let project_id = project_id.unwrap();
        let client = Client::new(config).await.unwrap();
        let writer = client.buffered_storage_writer();

        // Share one buffered stream between two concurrent tasks.
        let mut streams = vec![];
        let table = format!("projects/{}/datasets/gcrbq_storage/tables/write_test", &project_id);
        let stream = Arc::new(writer.create_write_stream(&table).await.unwrap());
        for _ in 0..2 {
            streams.push(stream.clone());
        }

        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
        for (i, stream) in streams.into_iter().enumerate() {
            tasks.push(tokio::spawn(async move {
                let mut rows = vec![];
                for j in 0..5 {
                    let data = TestData {
                        col_string: format!("buffered_{i}_{j}"),
                    };
                    let mut buf = Vec::new();
                    data.encode(&mut buf).unwrap();
                    rows.push(create_append_rows_request(vec![buf.clone(), buf]));
                }
                let mut result = stream.append_rows(rows).await.unwrap();
                while let Some(res) = result.next().await {
                    let res = res?;
                    tracing::info!("append row errors = {:?}", res.row_errors.len());
                }
                Ok(())
            }));
        }

        for task in tasks {
            task.await.unwrap().unwrap();
        }

        // Flush and finalize once after all appends have completed.
        let result = stream.flush_rows(Some(0)).await.unwrap();
        tracing::info!("flushed offset = {:?}", result);

        let result = stream.finalize().await.unwrap();
        tracing::info!("finalized row count = {:?}", result);
    }
}