google_cloud_bigquery/storage_write/stream/
committed.rs

1use crate::grpc::apiv1::bigquery_client::create_write_stream_request;
2use crate::grpc::apiv1::conn_pool::ConnectionManager;
3use crate::storage_write::stream::{AsStream, DisposableStreamDelegate, ManagedStreamDelegate, Stream};
4use crate::storage_write::AppendRowsRequestBuilder;
5use google_cloud_gax::grpc::{Status, Streaming};
6use google_cloud_googleapis::cloud::bigquery::storage::v1::write_stream::Type::Committed;
7use google_cloud_googleapis::cloud::bigquery::storage::v1::AppendRowsResponse;
8use std::sync::Arc;
9
10pub struct Writer {
11    max_insert_count: usize,
12    cm: Arc<ConnectionManager>,
13}
14
15impl Writer {
16    pub(crate) fn new(max_insert_count: usize, cm: Arc<ConnectionManager>) -> Self {
17        Self { max_insert_count, cm }
18    }
19
20    pub async fn create_write_stream(&self, table: &str) -> Result<CommittedStream, Status> {
21        let req = create_write_stream_request(table, Committed);
22        let stream = self.cm.writer().create_write_stream(req, None).await?.into_inner();
23        Ok(CommittedStream::new(Stream::new(
24            stream,
25            self.cm.clone(),
26            self.max_insert_count,
27        )))
28    }
29}
30
31pub struct CommittedStream {
32    inner: Stream,
33}
34
35impl CommittedStream {
36    pub(crate) fn new(inner: Stream) -> Self {
37        Self { inner }
38    }
39
40    pub async fn append_rows(
41        &self,
42        rows: Vec<AppendRowsRequestBuilder>,
43    ) -> Result<Streaming<AppendRowsResponse>, Status> {
44        ManagedStreamDelegate::append_rows(&self.inner, rows).await
45    }
46
47    pub async fn finalize(&self) -> Result<i64, Status> {
48        DisposableStreamDelegate::finalize(&self.inner).await
49    }
50}
51
52impl AsStream for CommittedStream {
53    fn as_ref(&self) -> &Stream {
54        &self.inner
55    }
56}
57#[cfg(test)]
58mod tests {
59    use crate::client::{Client, ClientConfig};
60    use crate::storage_write::stream::tests::{create_append_rows_request, TestData};
61    use futures_util::StreamExt;
62    use google_cloud_gax::grpc::Status;
63    use prost::Message;
64    use std::sync::Arc;
65    use tokio::task::JoinHandle;
66
67    #[ctor::ctor]
68    fn init() {
69        crate::storage_write::stream::tests::init();
70    }
71
72    #[serial_test::serial]
73    #[tokio::test]
74    async fn test_storage_write() {
75        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
76        let project_id = project_id.unwrap();
77        let client = Client::new(config).await.unwrap();
78        let tables = ["write_test", "write_test_1"];
79        let writer = client.committed_storage_writer();
80
81        // Create Streams
82        let mut streams = vec![];
83        for i in 0..2 {
84            let table = format!(
85                "projects/{}/datasets/gcrbq_storage/tables/{}",
86                &project_id,
87                tables[i % tables.len()]
88            )
89            .to_string();
90            let stream = writer.create_write_stream(&table).await.unwrap();
91            streams.push(stream);
92        }
93
94        // Append Rows
95        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
96        for (i, stream) in streams.into_iter().enumerate() {
97            tasks.push(tokio::spawn(async move {
98                let mut rows = vec![];
99                for j in 0..5 {
100                    let data = TestData {
101                        col_string: format!("committed_{i}_{j}"),
102                    };
103                    let mut buf = Vec::new();
104                    data.encode(&mut buf).unwrap();
105                    rows.push(create_append_rows_request(vec![buf.clone(), buf.clone(), buf]));
106                }
107                let mut result = stream.append_rows(rows).await.unwrap();
108                while let Some(res) = result.next().await {
109                    let res = res?;
110                    tracing::info!("append row errors = {:?}", res.row_errors.len());
111                }
112                let result = stream.finalize().await.unwrap();
113                tracing::info!("finalized row count = {:?}", result);
114                Ok(())
115            }));
116        }
117
118        // Wait for append rows
119        for task in tasks {
120            task.await.unwrap().unwrap();
121        }
122    }
123
124    #[serial_test::serial]
125    #[tokio::test]
126    async fn test_storage_write_single_stream() {
127        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
128        let project_id = project_id.unwrap();
129        let client = Client::new(config).await.unwrap();
130        let writer = client.committed_storage_writer();
131
132        // Create Streams
133        let mut streams = vec![];
134        let table = format!("projects/{}/datasets/gcrbq_storage/tables/write_test", &project_id).to_string();
135        let stream = Arc::new(writer.create_write_stream(&table).await.unwrap());
136        for _i in 0..2 {
137            streams.push(stream.clone());
138        }
139
140        // Append Rows
141        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
142        for (i, stream) in streams.into_iter().enumerate() {
143            tasks.push(tokio::spawn(async move {
144                let mut rows = vec![];
145                for j in 0..5 {
146                    let data = TestData {
147                        col_string: format!("committed_{i}_{j}"),
148                    };
149                    let mut buf = Vec::new();
150                    data.encode(&mut buf).unwrap();
151                    rows.push(create_append_rows_request(vec![buf.clone(), buf.clone(), buf]));
152                }
153                let mut result = stream.append_rows(rows).await.unwrap();
154                while let Some(res) = result.next().await {
155                    let res = res?;
156                    tracing::info!("append row errors = {:?}", res.row_errors.len());
157                }
158                Ok(())
159            }));
160        }
161
162        // Wait for append rows
163        for task in tasks {
164            task.await.unwrap().unwrap();
165        }
166
167        let result = stream.finalize().await.unwrap();
168        tracing::info!("finalized row count = {:?}", result);
169    }
170}