google_cloud_bigquery/grpc/apiv1/
bigquery_client.rs

1use std::time::Duration;
2
3use google_cloud_gax::conn::Channel;
4use google_cloud_gax::create_request;
5use google_cloud_gax::grpc::{Code, IntoStreamingRequest, Response, Status, Streaming};
6use google_cloud_gax::retry::{invoke_fn, RetrySetting};
7use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_read_client::BigQueryReadClient;
8use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_write_client::BigQueryWriteClient;
9use google_cloud_googleapis::cloud::bigquery::storage::v1::write_stream::Type;
10use google_cloud_googleapis::cloud::bigquery::storage::v1::{
11    AppendRowsRequest, AppendRowsResponse, BatchCommitWriteStreamsRequest, BatchCommitWriteStreamsResponse,
12    CreateReadSessionRequest, CreateWriteStreamRequest, FinalizeWriteStreamRequest, FinalizeWriteStreamResponse,
13    FlushRowsRequest, FlushRowsResponse, GetWriteStreamRequest, ReadRowsRequest, ReadRowsResponse, ReadSession,
14    SplitReadStreamRequest, SplitReadStreamResponse, WriteStream,
15};
16
17fn default_setting() -> RetrySetting {
18    RetrySetting {
19        from_millis: 50,
20        max_delay: Some(Duration::from_secs(60)),
21        factor: 1u64,
22        take: 20,
23        codes: vec![Code::Unavailable, Code::Unknown],
24    }
25}
26
27pub struct StreamingReadClient {
28    inner: BigQueryReadClient<Channel>,
29}
30
31impl StreamingReadClient {
32    pub fn new(inner: BigQueryReadClient<Channel>) -> Self {
33        Self {
34            inner: inner.max_decoding_message_size(i32::MAX as usize),
35        }
36    }
37
38    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
39    pub async fn create_read_session(
40        &mut self,
41        req: CreateReadSessionRequest,
42        retry: Option<RetrySetting>,
43    ) -> Result<Response<ReadSession>, Status> {
44        let setting = retry.unwrap_or_else(default_setting);
45        let table = &req
46            .read_session
47            .as_ref()
48            .ok_or(Status::invalid_argument("read_session is required"))?
49            .table;
50        invoke_fn(
51            Some(setting),
52            |client| async {
53                let request = create_request(format!("read_session.table={table}"), req.clone());
54                client.create_read_session(request).await.map_err(|e| (e, client))
55            },
56            &mut self.inner,
57        )
58        .await
59    }
60
61    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
62    pub async fn read_rows(
63        &mut self,
64        req: ReadRowsRequest,
65        retry: Option<RetrySetting>,
66    ) -> Result<Response<Streaming<ReadRowsResponse>>, Status> {
67        let setting = retry.unwrap_or_else(default_setting);
68        let stream = &req.read_stream;
69        invoke_fn(
70            Some(setting),
71            |client| async {
72                let request = create_request(format!("read_stream={stream}"), req.clone());
73                client.read_rows(request).await.map_err(|e| (e, client))
74            },
75            &mut self.inner,
76        )
77        .await
78    }
79
80    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
81    pub async fn split_read_stream(
82        &mut self,
83        req: SplitReadStreamRequest,
84        retry: Option<RetrySetting>,
85    ) -> Result<Response<SplitReadStreamResponse>, Status> {
86        let setting = retry.unwrap_or_else(default_setting);
87        let name = &req.name;
88        invoke_fn(
89            Some(setting),
90            |client| async {
91                let request = create_request(format!("name={name}"), req.clone());
92                client.split_read_stream(request).await.map_err(|e| (e, client))
93            },
94            &mut self.inner,
95        )
96        .await
97    }
98}
99
100#[derive(Clone)]
101pub struct StreamingWriteClient {
102    inner: BigQueryWriteClient<Channel>,
103}
104
105impl StreamingWriteClient {
106    pub fn new(inner: BigQueryWriteClient<Channel>) -> Self {
107        Self { inner }
108    }
109
110    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
111    pub async fn append_rows(
112        &mut self,
113        req: impl IntoStreamingRequest<Message = AppendRowsRequest>,
114    ) -> Result<Response<Streaming<AppendRowsResponse>>, Status> {
115        self.inner.append_rows(req).await
116    }
117
118    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
119    pub async fn create_write_stream(
120        &mut self,
121        req: CreateWriteStreamRequest,
122        retry: Option<RetrySetting>,
123    ) -> Result<Response<WriteStream>, Status> {
124        let setting = retry.unwrap_or_else(default_setting);
125        let parent = &req.parent;
126        invoke_fn(
127            Some(setting),
128            |client| async {
129                let request = create_request(format!("parent={parent}"), req.clone());
130                client.create_write_stream(request).await.map_err(|e| (e, client))
131            },
132            &mut self.inner,
133        )
134        .await
135    }
136
137    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
138    pub async fn get_write_stream(
139        &mut self,
140        req: GetWriteStreamRequest,
141        retry: Option<RetrySetting>,
142    ) -> Result<Response<WriteStream>, Status> {
143        let setting = retry.unwrap_or_else(default_setting);
144        let name = &req.name;
145        invoke_fn(
146            Some(setting),
147            |client| async {
148                let request = create_request(format!("name={name}"), req.clone());
149                client.get_write_stream(request).await.map_err(|e| (e, client))
150            },
151            &mut self.inner,
152        )
153        .await
154    }
155
156    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
157    pub async fn finalize_write_stream(
158        &mut self,
159        req: FinalizeWriteStreamRequest,
160        retry: Option<RetrySetting>,
161    ) -> Result<Response<FinalizeWriteStreamResponse>, Status> {
162        let setting = retry.unwrap_or_else(default_setting);
163        let name = &req.name;
164        invoke_fn(
165            Some(setting),
166            |client| async {
167                let request = create_request(format!("name={name}"), req.clone());
168                client.finalize_write_stream(request).await.map_err(|e| (e, client))
169            },
170            &mut self.inner,
171        )
172        .await
173    }
174
175    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
176    pub async fn batch_commit_write_streams(
177        &mut self,
178        req: BatchCommitWriteStreamsRequest,
179        retry: Option<RetrySetting>,
180    ) -> Result<Response<BatchCommitWriteStreamsResponse>, Status> {
181        let setting = retry.unwrap_or_else(default_setting);
182        let parent = &req.parent;
183        invoke_fn(
184            Some(setting),
185            |client| async {
186                let request = create_request(format!("parent={parent}"), req.clone());
187                client
188                    .batch_commit_write_streams(request)
189                    .await
190                    .map_err(|e| (e, client))
191            },
192            &mut self.inner,
193        )
194        .await
195    }
196
197    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
198    pub async fn flush_rows(
199        &mut self,
200        req: FlushRowsRequest,
201        retry: Option<RetrySetting>,
202    ) -> Result<Response<FlushRowsResponse>, Status> {
203        let setting = retry.unwrap_or_else(default_setting);
204        let write_stream = &req.write_stream;
205        invoke_fn(
206            Some(setting),
207            |client| async {
208                let request = create_request(format!("write_stream={write_stream}"), req.clone());
209                client.flush_rows(request).await.map_err(|e| (e, client))
210            },
211            &mut self.inner,
212        )
213        .await
214    }
215}
216
217pub(crate) fn create_write_stream_request(table: &str, write_type: Type) -> CreateWriteStreamRequest {
218    CreateWriteStreamRequest {
219        parent: table.to_string(),
220        write_stream: Some(WriteStream {
221            name: "".to_string(),
222            r#type: write_type as i32,
223            create_time: None,
224            commit_time: None,
225            table_schema: None,
226            write_mode: 0,
227            location: "".to_string(),
228        }),
229    }
230}