google_cloud_bigquery/storage_write/stream/default.rs

use crate::grpc::apiv1::conn_pool::ConnectionManager;
use crate::storage_write::stream::{AsStream, ManagedStreamDelegate, Stream};
use crate::storage_write::AppendRowsRequestBuilder;
use google_cloud_gax::grpc::{Status, Streaming};
use google_cloud_googleapis::cloud::bigquery::storage::v1::{AppendRowsResponse, GetWriteStreamRequest};
use std::sync::Arc;
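
/// A writer that appends rows through each table's `_default` write stream.
/// Obtained from `Client::default_storage_writer` (see the test below).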
pub struct Writer {
    max_insert_count: usize,
    cm: Arc<ConnectionManager>,
}

impl Writer {
    pub(crate) fn new(max_insert_count: usize, cm: Arc<ConnectionManager>) -> Self {
        Self { max_insert_count, cm }
    }
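
    /// Looks up the `_default` write stream of `table` (a full resource path,
    /// e.g. `projects/{project}/datasets/{dataset}/tables/{table}`) and wraps
    /// it in a `DefaultStream`.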
    pub async fn create_write_stream(&self, table: &str) -> Result<DefaultStream, Status> {
        let stream = self
            .cm
            .writer()
            .get_write_stream(
                GetWriteStreamRequest {
                    name: format!("{table}/streams/_default"),
                    ..Default::default()
                },
                None,
            )
            .await?
            .into_inner();
        Ok(DefaultStream::new(Stream::new(stream, self.cm.clone(), self.max_insert_count)))
    }
}
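
/// A handle to a table's `_default` write stream, created by `Writer::create_write_stream`.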
pub struct DefaultStream {
    inner: Stream,
}

impl DefaultStream {
    pub(crate) fn new(inner: Stream) -> Self {
        Self { inner }
    }
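
    /// Streams `rows` over the `_default` write stream and returns the server's
    /// response stream, one `AppendRowsResponse` per append request.
    ///
    /// A minimal usage sketch (assuming a configured `Client`, as in the test
    /// below; the table path is a placeholder):
    ///
    /// ```no_run
    /// # use google_cloud_bigquery::client::Client;
    /// # use google_cloud_bigquery::storage_write::AppendRowsRequestBuilder;
    /// # async fn sketch(client: Client, rows: Vec<AppendRowsRequestBuilder>) -> Result<(), google_cloud_gax::grpc::Status> {
    /// use futures_util::StreamExt;
    /// let writer = client.default_storage_writer();
    /// let stream = writer.create_write_stream("projects/p/datasets/d/tables/t").await?;
    /// let mut responses = stream.append_rows(rows).await?;
    /// while let Some(res) = responses.next().await {
    ///     let res = res?;
    ///     println!("row errors: {}", res.row_errors.len());
    /// }
    /// # Ok(())
    /// # }
    /// ```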
    pub async fn append_rows(
        &self,
        rows: Vec<AppendRowsRequestBuilder>,
    ) -> Result<Streaming<AppendRowsResponse>, Status> {
        ManagedStreamDelegate::append_rows(&self.inner, rows).await
    }
}
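
// Exposes the wrapped `Stream` to shared helpers like `ManagedStreamDelegate`
// that operate on the underlying stream.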
impl AsStream for DefaultStream {
    fn as_ref(&self) -> &Stream {
        &self.inner
    }
}
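
// NOTE: these are integration tests; they run against a live BigQuery dataset
// (`gcrbq_storage`) and need credentials resolvable by
// `ClientConfig::new_with_auth`. `serial_test` keeps them from interleaving.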
#[cfg(test)]
mod tests {
    use crate::client::{Client, ClientConfig};
    use crate::storage_write::stream::tests::{create_append_rows_request, TestData};
    use futures_util::StreamExt;
    use google_cloud_gax::grpc::Status;
    use prost::Message;
    use tokio::task::JoinHandle;

    #[ctor::ctor]
    fn init() {
        crate::storage_write::stream::tests::init();
    }

    #[serial_test::serial]
    #[tokio::test]
    async fn test_storage_write() {
        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
        let project_id = project_id.unwrap();
        let client = Client::new(config).await.unwrap();
        let tables = ["write_test", "write_test_1"];
        let writer = client.default_storage_writer();

        // Open the default stream of each test table.
        let mut streams = vec![];
        for i in 0..2 {
            let table = format!(
                "projects/{}/datasets/gcrbq_storage/tables/{}",
                &project_id,
                tables[i % tables.len()]
            );
            let stream = writer.create_write_stream(&table).await.unwrap();
            streams.push(stream);
        }

        // Each stream appends from its own task: five requests of three
        // encoded rows each, with every response checked for row errors.
        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
        for (i, stream) in streams.into_iter().enumerate() {
            tasks.push(tokio::spawn(async move {
                let mut rows = vec![];
                for j in 0..5 {
                    let data = TestData {
                        col_string: format!("default_{i}_{j}"),
                    };
                    let mut buf = Vec::new();
                    data.encode(&mut buf).unwrap();
                    rows.push(create_append_rows_request(vec![buf.clone(), buf.clone(), buf]));
                }
                let mut result = stream.append_rows(rows).await.unwrap();
                while let Some(res) = result.next().await {
                    let res = res?;
                    tracing::info!("append row errors = {:?}", res.row_errors.len());
                }
                Ok(())
            }));
        }

        for task in tasks {
            task.await.unwrap().unwrap();
        }
    }
}