google_cloud_bigquery/storage_write/stream/pending.rs

//! Pending-type write streams for the BigQuery Storage Write API.
//! Rows appended to a pending stream are buffered on the service side and
//! become visible in the table only after the stream is finalized and the
//! owning writer commits it.

use crate::grpc::apiv1::bigquery_client::create_write_stream_request;
use crate::grpc::apiv1::conn_pool::ConnectionManager;
use crate::storage_write::stream::{AsStream, DisposableStreamDelegate, ManagedStreamDelegate, Stream};
use crate::storage_write::AppendRowsRequestBuilder;
use google_cloud_gax::grpc::{Status, Streaming};
use google_cloud_googleapis::cloud::bigquery::storage::v1::write_stream::Type::Pending;
use google_cloud_googleapis::cloud::bigquery::storage::v1::{
    AppendRowsResponse, BatchCommitWriteStreamsRequest, BatchCommitWriteStreamsResponse,
};
use std::sync::Arc;
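/// A writer that creates and commits `Pending`-type write streams.
///
/// A minimal usage sketch (not a compiled doc test): it assumes the crate is
/// used under the name `google_cloud_bigquery`, that row payloads are built
/// elsewhere as a `Vec<AppendRowsRequestBuilder>`, and that the dataset and
/// table names are placeholders.
///
/// ```ignore
/// use google_cloud_bigquery::client::{Client, ClientConfig};
/// use futures_util::StreamExt;
///
/// async fn run() -> Result<(), google_cloud_gax::grpc::Status> {
///     let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
///     let client = Client::new(config).await.unwrap();
///     let table = format!(
///         "projects/{}/datasets/your_dataset/tables/your_table",
///         project_id.unwrap()
///     );
///
///     // One writer may own many streams; all of them are committed together.
///     let mut writer = client.pending_storage_writer(&table);
///     let stream = writer.create_write_stream().await?;
///
///     // Append rows (construction of `rows` omitted) and drain the responses.
///     let rows = vec![];
///     let mut responses = stream.append_rows(rows).await?;
///     while let Some(res) = responses.next().await {
///         let _ = res?;
///     }
///
///     // Finalize the stream, then commit: the rows become visible atomically.
///     let _row_count = stream.finalize().await?;
///     let _ = writer.commit().await?;
///     Ok(())
/// }
/// ```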
pub struct Writer {
    max_insert_count: usize,
    cm: Arc<ConnectionManager>,
    table: String,
    streams: Vec<String>,
}

impl Writer {
    pub(crate) fn new(max_insert_count: usize, cm: Arc<ConnectionManager>, table: String) -> Self {
        Self {
            max_insert_count,
            cm,
            table,
            streams: Vec::new(),
        }
    }
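    /// Creates a new write stream of type `Pending` on the writer's table and
    /// records the stream name so that a later [`Writer::commit`] includes it.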
    pub async fn create_write_stream(&mut self) -> Result<PendingStream, Status> {
        let req = create_write_stream_request(&self.table, Pending);
        let stream = self.cm.writer().create_write_stream(req, None).await?.into_inner();
        self.streams.push(stream.name.to_string());
        Ok(PendingStream::new(Stream::new(stream, self.cm.clone(), self.max_insert_count)))
    }
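    /// Commits every stream created through this writer in a single atomic
    /// `BatchCommitWriteStreams` call. Buffered rows become queryable only
    /// after this succeeds; per-stream failures are reported via
    /// `stream_errors` on the response.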
    pub async fn commit(&self) -> Result<BatchCommitWriteStreamsResponse, Status> {
        let result = self
            .cm
            .writer()
            .batch_commit_write_streams(
                BatchCommitWriteStreamsRequest {
                    parent: self.table.to_string(),
                    write_streams: self.streams.clone(),
                },
                None,
            )
            .await?
            .into_inner();
        Ok(result)
    }
}
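/// A write stream of type `Pending` returned by [`Writer::create_write_stream`].
///
/// Rows appended here are buffered by the service until the stream is
/// finalized and the owning `Writer` commits. Below is a sketch of fanning
/// appends out over one shared stream, the same pattern as
/// `test_storage_write_single_stream` at the bottom of this file; it assumes
/// `stream` was obtained from `Writer::create_write_stream` and that `rows`
/// construction is done elsewhere.
///
/// ```ignore
/// use std::sync::Arc;
///
/// let stream = Arc::new(stream);
/// let mut tasks = vec![];
/// for _ in 0..2 {
///     let stream = stream.clone();
///     tasks.push(tokio::spawn(async move {
///         // Each task sends its own batch over the shared stream.
///         let rows = vec![];
///         let _responses = stream.append_rows(rows).await;
///     }));
/// }
/// ```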
pub struct PendingStream {
    inner: Stream,
}

impl PendingStream {
    pub(crate) fn new(inner: Stream) -> Self {
        Self { inner }
    }
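    /// Sends the given rows over the gRPC `AppendRows` stream and returns the
    /// response stream; drain it to observe per-request results such as
    /// `row_errors`.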
    pub async fn append_rows(
        &self,
        rows: Vec<AppendRowsRequestBuilder>,
    ) -> Result<Streaming<AppendRowsResponse>, Status> {
        ManagedStreamDelegate::append_rows(&self.inner, rows).await
    }
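    /// Finalizes the stream so that no further rows can be appended, returning
    /// the row count reported by the service.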
    pub async fn finalize(&self) -> Result<i64, Status> {
        DisposableStreamDelegate::finalize(&self.inner).await
    }
}

impl AsStream for PendingStream {
    fn as_ref(&self) -> &Stream {
        &self.inner
    }
}
#[cfg(test)]
mod tests {
    use crate::client::{Client, ClientConfig};
    use crate::storage_write::stream::tests::{create_append_rows_request, TestData};
    use futures_util::StreamExt;
    use google_cloud_gax::grpc::Status;
    use prost::Message;
    use std::sync::Arc;
    use tokio::task::JoinHandle;

    #[ctor::ctor]
    fn init() {
        crate::storage_write::stream::tests::init();
    }
    #[serial_test::serial]
    #[tokio::test]
    async fn test_storage_write() {
        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
        let project_id = project_id.unwrap();
        let client = Client::new(config).await.unwrap();
        let tables = ["write_test", "write_test_1"];

        // Create a pending writer per table.
        let mut writers = vec![];
        for i in 0..2 {
            let table = format!(
                "projects/{}/datasets/gcrbq_storage/tables/{}",
                &project_id,
                tables[i % tables.len()]
            );
            let writer = client.pending_storage_writer(&table);
            writers.push(writer);
        }

        // Create one write stream per writer.
        let mut streams = vec![];
        for writer in writers.iter_mut() {
            let stream = writer.create_write_stream().await.unwrap();
            streams.push(stream);
        }

        // Append rows to each stream concurrently, then finalize each stream.
        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
        for (i, stream) in streams.into_iter().enumerate() {
            tasks.push(tokio::spawn(async move {
                let mut rows = vec![];
                for j in 0..5 {
                    let data = TestData {
                        col_string: format!("pending_{i}_{j}"),
                    };
                    let mut buf = Vec::new();
                    data.encode(&mut buf).unwrap();
                    rows.push(create_append_rows_request(vec![buf.clone(), buf.clone(), buf]));
                }
                let mut result = stream.append_rows(rows).await.unwrap();
                while let Some(res) = result.next().await {
                    let res = res?;
                    tracing::info!("append row errors = {:?}", res.row_errors.len());
                }
                let result = stream.finalize().await.unwrap();
                tracing::info!("finalized row count = {:?}", result);
                Ok(())
            }));
        }

        // Wait for every append task, then commit each writer.
        for task in tasks {
            task.await.unwrap().unwrap();
        }

        for writer in writers.iter_mut() {
            let result = writer.commit().await.unwrap();
            tracing::info!("committed error count = {:?}", result.stream_errors.len());
        }
    }
    #[serial_test::serial]
    #[tokio::test]
    async fn test_storage_write_single_stream() {
        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
        let project_id = project_id.unwrap();
        let client = Client::new(config).await.unwrap();

        // Share a single pending stream across multiple tasks via Arc.
        let mut streams = vec![];
        let table = format!("projects/{}/datasets/gcrbq_storage/tables/write_test", &project_id);
        let mut writer = client.pending_storage_writer(&table);
        let stream = Arc::new(writer.create_write_stream().await.unwrap());
        for _ in 0..2 {
            streams.push(stream.clone());
        }

        // Append rows from each task over the shared stream.
        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
        for (i, stream) in streams.into_iter().enumerate() {
            tasks.push(tokio::spawn(async move {
                let mut rows = vec![];
                for j in 0..5 {
                    let data = TestData {
                        col_string: format!("pending_{i}_{j}"),
                    };
                    let mut buf = Vec::new();
                    data.encode(&mut buf).unwrap();
                    rows.push(create_append_rows_request(vec![buf.clone(), buf.clone(), buf]));
                }
                let mut result = stream.append_rows(rows).await.unwrap();
                while let Some(res) = result.next().await {
                    let res = res?;
                    tracing::info!("append row errors = {:?}", res.row_errors.len());
                }
                Ok(())
            }));
        }

        // Wait for all appends, then finalize the one stream and commit.
        for task in tasks {
            task.await.unwrap().unwrap();
        }

        let result = stream.finalize().await.unwrap();
        tracing::info!("finalized row count = {:?}", result);

        let result = writer.commit().await.unwrap();
        tracing::info!("commit error count = {:?}", result.stream_errors.len());
    }
}