google_cloud_bigquery/storage_write/stream/
pending.rs

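//! Pending-type write streams for the BigQuery Storage Write API. Rows
//! appended to a pending stream are buffered server-side and become visible in
//! the table only after the stream is finalized and committed.
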
use crate::grpc::apiv1::bigquery_client::create_write_stream_request;
use crate::grpc::apiv1::conn_pool::ConnectionManager;
use crate::storage_write::stream::{AsStream, DisposableStreamDelegate, ManagedStreamDelegate, Stream};
use crate::storage_write::AppendRowsRequestBuilder;
use google_cloud_gax::grpc::{Status, Streaming};
use google_cloud_googleapis::cloud::bigquery::storage::v1::write_stream::Type::Pending;
use google_cloud_googleapis::cloud::bigquery::storage::v1::{
    AppendRowsResponse, BatchCommitWriteStreamsRequest, BatchCommitWriteStreamsResponse,
};
use std::sync::Arc;

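/// Creates and commits `Pending`-type write streams for a single table.
///
/// Obtained from `Client::pending_storage_writer`. Every stream created
/// through `create_write_stream` is tracked so that a single `commit` call
/// covers them all.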
pub struct Writer {
    max_insert_count: usize,
    cm: Arc<ConnectionManager>,
    table: String,
    streams: Vec<String>,
}

impl Writer {
    pub(crate) fn new(max_insert_count: usize, cm: Arc<ConnectionManager>, table: String) -> Self {
        Self {
            max_insert_count,
            cm,
            table,
            streams: Vec::new(),
        }
    }

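    /// Creates a new `Pending`-type write stream on the writer's table and
    /// records its name so that a later `commit` includes it.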
    pub async fn create_write_stream(&mut self) -> Result<PendingStream, Status> {
        let req = create_write_stream_request(&self.table, Pending);
        let stream = self.cm.writer().create_write_stream(req, None).await?.into_inner();
        self.streams.push(stream.name.to_string());
        Ok(PendingStream::new(Stream::new(stream, self.cm.clone(), self.max_insert_count)))
    }

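    /// Batch-commits every stream created by this writer, making their
    /// buffered rows visible in the table. Each stream should be finalized
    /// before the commit (as the tests below do).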
    pub async fn commit(&self) -> Result<BatchCommitWriteStreamsResponse, Status> {
        let result = self
            .cm
            .writer()
            .batch_commit_write_streams(
                BatchCommitWriteStreamsRequest {
                    parent: self.table.to_string(),
                    write_streams: self.streams.clone(),
                },
                None,
            )
            .await?
            .into_inner();
        Ok(result)
    }
}

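/// A write stream of type `Pending`. Rows appended here are buffered by the
/// service until the stream is finalized and the owning `Writer` commits it.
///
/// A minimal lifecycle sketch (the `client`, `table`, and `rows` values are
/// assumed to be set up as in the tests below; marked `ignore` because it is
/// illustrative, not compiled):
///
/// ```ignore
/// let mut writer = client.pending_storage_writer(&table);
/// let stream = writer.create_write_stream().await?;
///
/// let mut responses = stream.append_rows(rows).await?;
/// while let Some(res) = responses.next().await {
///     let res = res?;
///     tracing::info!("row errors = {}", res.row_errors.len());
/// }
///
/// stream.finalize().await?;
/// writer.commit().await?;
/// ```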
pub struct PendingStream {
    inner: Stream,
}

impl PendingStream {
    pub(crate) fn new(inner: Stream) -> Self {
        Self { inner }
    }

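    /// Appends a batch of rows to the stream and returns the server's response
    /// stream. For a pending stream the rows stay buffered until `finalize`
    /// and `commit`.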
    pub async fn append_rows(
        &self,
        rows: Vec<AppendRowsRequestBuilder>,
    ) -> Result<Streaming<AppendRowsResponse>, Status> {
        ManagedStreamDelegate::append_rows(&self.inner, rows).await
    }

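    /// Finalizes the stream so no further rows can be appended, returning the
    /// row count of the finalized stream.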
    pub async fn finalize(&self) -> Result<i64, Status> {
        DisposableStreamDelegate::finalize(&self.inner).await
    }
}

impl AsStream for PendingStream {
    fn as_ref(&self) -> &Stream {
        &self.inner
    }
}

#[cfg(test)]
mod tests {
    use crate::client::{Client, ClientConfig};
    use crate::storage_write::stream::tests::{create_append_rows_request, TestData};
    use futures_util::StreamExt;
    use google_cloud_gax::grpc::Status;
    use prost::Message;
    use std::sync::Arc;
    use tokio::task::JoinHandle;

    #[ctor::ctor]
    fn init() {
        crate::storage_write::stream::tests::init();
    }

    #[serial_test::serial]
    #[tokio::test]
    async fn test_storage_write() {
        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
        let project_id = project_id.unwrap();
        let client = Client::new(config).await.unwrap();
        let tables = ["write_test", "write_test_1"];

        // Create Writers
        let mut writers = vec![];
        for i in 0..2 {
            let table = format!(
                "projects/{}/datasets/gcrbq_storage/tables/{}",
                &project_id,
                tables[i % tables.len()]
            );
            let writer = client.pending_storage_writer(&table);
            writers.push(writer);
        }

        // Create Streams
        let mut streams = vec![];
        for writer in writers.iter_mut() {
            let stream = writer.create_write_stream().await.unwrap();
            streams.push(stream);
        }

        // Append Rows
        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
        for (i, stream) in streams.into_iter().enumerate() {
            tasks.push(tokio::spawn(async move {
                let mut rows = vec![];
                for j in 0..5 {
                    let data = TestData {
                        col_string: format!("pending_{i}_{j}"),
                    };
                    let mut buf = Vec::new();
                    data.encode(&mut buf).unwrap();
                    rows.push(create_append_rows_request(vec![buf.clone(), buf.clone(), buf]));
                }
                let mut result = stream.append_rows(rows).await.unwrap();
                while let Some(res) = result.next().await {
                    let res = res?;
                    tracing::info!("append row errors = {:?}", res.row_errors.len());
                }
                let result = stream.finalize().await.unwrap();
                tracing::info!("finalized row count = {:?}", result);
                Ok(())
            }));
        }

        // Wait for append rows
        for task in tasks {
            task.await.unwrap().unwrap();
        }

        for writer in writers.iter_mut() {
            let result = writer.commit().await.unwrap();
            tracing::info!("committed error count = {:?}", result.stream_errors.len());
        }
    }

    #[serial_test::serial]
    #[tokio::test]
    async fn test_storage_write_single_stream() {
        let (config, project_id) = ClientConfig::new_with_auth().await.unwrap();
        let project_id = project_id.unwrap();
        let client = Client::new(config).await.unwrap();

        // Create Streams
        let mut streams = vec![];
        let table = format!("projects/{}/datasets/gcrbq_storage/tables/write_test", &project_id);
        let mut writer = client.pending_storage_writer(&table);
        let stream = Arc::new(writer.create_write_stream().await.unwrap());
        for _ in 0..2 {
            streams.push(stream.clone());
        }

        // Append Rows
        let mut tasks: Vec<JoinHandle<Result<(), Status>>> = vec![];
        for (i, stream) in streams.into_iter().enumerate() {
            tasks.push(tokio::spawn(async move {
                let mut rows = vec![];
                for j in 0..5 {
                    let data = TestData {
                        col_string: format!("pending_{i}_{j}"),
                    };
                    let mut buf = Vec::new();
                    data.encode(&mut buf).unwrap();
                    rows.push(create_append_rows_request(vec![buf.clone(), buf.clone(), buf]));
                }
                let mut result = stream.append_rows(rows).await.unwrap();
                while let Some(res) = result.next().await {
                    let res = res?;
                    tracing::info!("append row errors = {:?}", res.row_errors.len());
                }
                Ok(())
            }));
        }

        // Wait for append rows
        for task in tasks {
            task.await.unwrap().unwrap();
        }

        let result = stream.finalize().await.unwrap();
        tracing::info!("finalized row count = {:?}", result);

        let result = writer.commit().await.unwrap();
        tracing::info!("commit error count = {:?}", result.stream_errors.len());
    }
}