google_cloud_bigquery/storage_write/stream/committed.rs

use crate::grpc::apiv1::bigquery_client::create_write_stream_request;
use crate::grpc::apiv1::conn_pool::ConnectionManager;
use crate::storage_write::stream::{AsStream, DisposableStreamDelegate, ManagedStreamDelegate, Stream};
use crate::storage_write::AppendRowsRequestBuilder;
use google_cloud_gax::grpc::{Status, Streaming};
use google_cloud_googleapis::cloud::bigquery::storage::v1::write_stream::Type::Committed;
use google_cloud_googleapis::cloud::bigquery::storage::v1::AppendRowsResponse;
use std::sync::Arc;

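/// A writer that creates write streams of type `Committed`.
///
/// Rows appended through a committed stream become visible in the table as
/// soon as the server acknowledges each append.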
pub struct Writer {
    max_insert_count: usize,
    cm: Arc<ConnectionManager>,
}

impl Writer {
    pub(crate) fn new(max_insert_count: usize, cm: Arc<ConnectionManager>) -> Self {
        Self { max_insert_count, cm }
    }

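    /// Creates a new committed-type write stream for the given table, where
    /// `table` is the fully qualified path
    /// `projects/{project}/datasets/{dataset}/tables/{table}`.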
    pub async fn create_write_stream(&self, table: &str) -> Result<CommittedStream, Status> {
        let req = create_write_stream_request(table, Committed);
        let stream = self.cm.writer().create_write_stream(req, None).await?.into_inner();
        Ok(CommittedStream::new(Stream::new(
            stream,
            self.cm.clone(),
            self.max_insert_count,
        )))
    }
}

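/// A write stream of type `Committed`. Rows appended to it are visible in the
/// table once the server acknowledges them; after
/// [`finalize`](CommittedStream::finalize) is called, no further rows can be
/// appended.
///
/// A minimal usage sketch (the table path is a placeholder, and the
/// `AppendRowsRequestBuilder` payloads are assumed to be built by the caller):
///
/// ```no_run
/// use futures_util::StreamExt;
/// use google_cloud_bigquery::client::Client;
/// use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
/// use google_cloud_gax::grpc::Status;
///
/// async fn write_committed(client: Client, rows: Vec<AppendRowsRequestBuilder>) -> Result<i64, Status> {
///     let writer = client.committed_storage_writer();
///     let table = "projects/{project}/datasets/{dataset}/tables/{table}";
///     let stream = writer.create_write_stream(table).await?;
///
///     // Send the rows and drain the response stream, checking for per-row errors.
///     let mut responses = stream.append_rows(rows).await?;
///     while let Some(res) = responses.next().await {
///         let res = res?;
///         if !res.row_errors.is_empty() {
///             // Handle or log per-row failures here.
///         }
///     }
///
///     // Prevent further appends and return the final row count.
///     stream.finalize().await
/// }
/// ```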
pub struct CommittedStream {
    inner: Stream,
}

impl CommittedStream {
    pub(crate) fn new(inner: Stream) -> Self {
        Self { inner }
    }

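    /// Appends `rows` to the stream and returns the server's response stream.
    /// Each `AppendRowsResponse` carries per-row errors for the caller to
    /// inspect.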
    pub async fn append_rows(
        &self,
        rows: Vec<AppendRowsRequestBuilder>,
    ) -> Result<Streaming<AppendRowsResponse>, Status> {
        ManagedStreamDelegate::append_rows(&self.inner, rows).await
    }

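    /// Finalizes the stream so no further rows can be appended, returning the
    /// number of rows written to it.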
    pub async fn finalize(&self) -> Result<i64, Status> {
        DisposableStreamDelegate::finalize(&self.inner).await
    }
}

impl AsStream for CommittedStream {
    fn as_ref(&self) -> &Stream {
        &self.inner
    }
}

#[cfg(test)]
mod tests {
    use crate::client::{Client, ClientConfig};
    use crate::storage_write::stream::tests::{create_append_rows_request, TestData};
    use futures_util::StreamExt;
    use google_cloud_gax::grpc::Status;
    use prost::Message;
    use std::sync::Arc;
    use tokio::task::JoinHandle;

    #[ctor::ctor]
    fn init() {
        crate::storage_write::stream::tests::init();
    }

    #[serial_test::serial]
    #[tokio::test]
    async fn test_storage_write() {
        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
        let project_id = project_id.unwrap();
        let client = Client::new(config).await.unwrap();
        let tables = ["write_test", "write_test_1"];
        let writer = client.committed_storage_writer();

        // Create one committed stream per table.
        let mut streams = vec![];
        for i in 0..2 {
            let table = format!(
                "projects/{}/datasets/gcrbq_storage/tables/{}",
                &project_id,
                tables[i % tables.len()]
            );
            let stream = writer.create_write_stream(&table).await.unwrap();
            streams.push(stream);
        }

        // Append rows on each stream concurrently: every task encodes five
        // batches of three identical rows, drains the responses, then finalizes.
        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
        for (i, stream) in streams.into_iter().enumerate() {
            tasks.push(tokio::spawn(async move {
                let mut rows = vec![];
                for j in 0..5 {
                    let data = TestData {
                        col_string: format!("committed_{i}_{j}"),
                    };
                    let mut buf = Vec::new();
                    data.encode(&mut buf).unwrap();
                    rows.push(create_append_rows_request(vec![buf.clone(), buf.clone(), buf]));
                }
                let mut result = stream.append_rows(rows).await.unwrap();
                while let Some(res) = result.next().await {
                    let res = res?;
                    tracing::info!("append row errors = {:?}", res.row_errors.len());
                }
                let result = stream.finalize().await.unwrap();
                tracing::info!("finalized row count = {:?}", result);
                Ok(())
            }));
        }

        for task in tasks {
            task.await.unwrap().unwrap();
        }
    }

    #[serial_test::serial]
    #[tokio::test]
    async fn test_storage_write_single_stream() {
        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
        let project_id = project_id.unwrap();
        let client = Client::new(config).await.unwrap();
        let writer = client.committed_storage_writer();

        // Share a single committed stream across multiple tasks via Arc.
        let mut streams = vec![];
        let table = format!("projects/{}/datasets/gcrbq_storage/tables/write_test", &project_id);
        let stream = Arc::new(writer.create_write_stream(&table).await.unwrap());
        for _ in 0..2 {
            streams.push(stream.clone());
        }

        // Each task appends its own rows to the shared stream.
        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
        for (i, stream) in streams.into_iter().enumerate() {
            tasks.push(tokio::spawn(async move {
                let mut rows = vec![];
                for j in 0..5 {
                    let data = TestData {
                        col_string: format!("committed_{i}_{j}"),
                    };
                    let mut buf = Vec::new();
                    data.encode(&mut buf).unwrap();
                    rows.push(create_append_rows_request(vec![buf.clone(), buf.clone(), buf]));
                }
                let mut result = stream.append_rows(rows).await.unwrap();
                while let Some(res) = result.next().await {
                    let res = res?;
                    tracing::info!("append row errors = {:?}", res.row_errors.len());
                }
                Ok(())
            }));
        }

        for task in tasks {
            task.await.unwrap().unwrap();
        }

        // Finalize once, only after every appender has completed.
        let result = stream.finalize().await.unwrap();
        tracing::info!("finalized row count = {:?}", result);
    }
}