google_cloud_bigquery/storage_write/stream/
default.rs

1use crate::grpc::apiv1::conn_pool::ConnectionManager;
2use crate::storage_write::stream::{AsStream, ManagedStreamDelegate, Stream};
3use crate::storage_write::AppendRowsRequestBuilder;
4use google_cloud_gax::grpc::{Status, Streaming};
5use google_cloud_googleapis::cloud::bigquery::storage::v1::{AppendRowsResponse, GetWriteStreamRequest};
6use std::sync::Arc;
7
8pub struct Writer {
9    max_insert_count: usize,
10    cm: Arc<ConnectionManager>,
11}
12
13impl Writer {
14    pub(crate) fn new(max_insert_count: usize, cm: Arc<ConnectionManager>) -> Self {
15        Self { max_insert_count, cm }
16    }
17
18    pub async fn create_write_stream(&self, table: &str) -> Result<DefaultStream, Status> {
19        let stream = self
20            .cm
21            .writer()
22            .get_write_stream(
23                GetWriteStreamRequest {
24                    name: format!("{table}/streams/_default"),
25                    ..Default::default()
26                },
27                None,
28            )
29            .await?
30            .into_inner();
31        Ok(DefaultStream::new(Stream::new(stream, self.cm.clone(), self.max_insert_count)))
32    }
33}
34
35pub struct DefaultStream {
36    inner: Stream,
37}
38
39impl DefaultStream {
40    pub(crate) fn new(inner: Stream) -> Self {
41        Self { inner }
42    }
43    pub async fn append_rows(
44        &self,
45        rows: Vec<AppendRowsRequestBuilder>,
46    ) -> Result<Streaming<AppendRowsResponse>, Status> {
47        ManagedStreamDelegate::append_rows(&self.inner, rows).await
48    }
49}
50
51impl AsStream for DefaultStream {
52    fn as_ref(&self) -> &Stream {
53        &self.inner
54    }
55}
56
57#[cfg(test)]
58mod tests {
59    use crate::client::{Client, ClientConfig};
60    use crate::storage_write::stream::tests::{create_append_rows_request, TestData};
61    use futures_util::StreamExt;
62    use google_cloud_gax::grpc::Status;
63    use prost::Message;
64    use tokio::task::JoinHandle;
65
66    #[ctor::ctor]
67    fn init() {
68        crate::storage_write::stream::tests::init();
69    }
70
71    #[serial_test::serial]
72    #[tokio::test]
73    async fn test_storage_write() {
74        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
75        let project_id = project_id.unwrap();
76        let client = Client::new(config).await.unwrap();
77        let tables = ["write_test", "write_test_1"];
78        let writer = client.default_storage_writer();
79
80        // Create Streams
81        let mut streams = vec![];
82        for i in 0..2 {
83            let table = format!(
84                "projects/{}/datasets/gcrbq_storage/tables/{}",
85                &project_id,
86                tables[i % tables.len()]
87            )
88            .to_string();
89            let stream = writer.create_write_stream(&table).await.unwrap();
90            streams.push(stream);
91        }
92
93        // Append Rows
94        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
95        for (i, stream) in streams.into_iter().enumerate() {
96            tasks.push(tokio::spawn(async move {
97                let mut rows = vec![];
98                for j in 0..5 {
99                    let data = TestData {
100                        col_string: format!("default_{i}_{j}"),
101                    };
102                    let mut buf = Vec::new();
103                    data.encode(&mut buf).unwrap();
104                    rows.push(create_append_rows_request(vec![buf.clone(), buf.clone(), buf]));
105                }
106                let mut result = stream.append_rows(rows).await.unwrap();
107                while let Some(res) = result.next().await {
108                    let res = res?;
109                    tracing::info!("append row errors = {:?}", res.row_errors.len());
110                }
111                Ok(())
112            }));
113        }
114
115        // Wait for append rows
116        for task in tasks {
117            task.await.unwrap().unwrap();
118        }
119    }
120}