google_cloud_bigquery/grpc/apiv1/
bigquery_client.rs1use std::time::Duration;
2
3use google_cloud_gax::conn::Channel;
4use google_cloud_gax::create_request;
5use google_cloud_gax::grpc::{Code, IntoStreamingRequest, Response, Status, Streaming};
6use google_cloud_gax::retry::{invoke_fn, RetrySetting};
7use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_read_client::BigQueryReadClient;
8use google_cloud_googleapis::cloud::bigquery::storage::v1::big_query_write_client::BigQueryWriteClient;
9use google_cloud_googleapis::cloud::bigquery::storage::v1::write_stream::Type;
10use google_cloud_googleapis::cloud::bigquery::storage::v1::{
11 AppendRowsRequest, AppendRowsResponse, BatchCommitWriteStreamsRequest, BatchCommitWriteStreamsResponse,
12 CreateReadSessionRequest, CreateWriteStreamRequest, FinalizeWriteStreamRequest, FinalizeWriteStreamResponse,
13 FlushRowsRequest, FlushRowsResponse, GetWriteStreamRequest, ReadRowsRequest, ReadRowsResponse, ReadSession,
14 SplitReadStreamRequest, SplitReadStreamResponse, WriteStream,
15};
16
17fn default_setting() -> RetrySetting {
18 RetrySetting {
19 from_millis: 50,
20 max_delay: Some(Duration::from_secs(60)),
21 factor: 1u64,
22 take: 20,
23 codes: vec![Code::Unavailable, Code::Unknown],
24 }
25}
26
27pub struct StreamingReadClient {
28 inner: BigQueryReadClient<Channel>,
29}
30
31impl StreamingReadClient {
32 pub fn new(inner: BigQueryReadClient<Channel>) -> Self {
33 Self {
34 inner: inner.max_decoding_message_size(i32::MAX as usize),
35 }
36 }
37
38 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
39 pub async fn create_read_session(
40 &mut self,
41 req: CreateReadSessionRequest,
42 retry: Option<RetrySetting>,
43 ) -> Result<Response<ReadSession>, Status> {
44 let setting = retry.unwrap_or_else(default_setting);
45 let table = &req
46 .read_session
47 .as_ref()
48 .ok_or(Status::invalid_argument("read_session is required"))?
49 .table;
50 invoke_fn(
51 Some(setting),
52 |client| async {
53 let request = create_request(format!("read_session.table={table}"), req.clone());
54 client.create_read_session(request).await.map_err(|e| (e, client))
55 },
56 &mut self.inner,
57 )
58 .await
59 }
60
61 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
62 pub async fn read_rows(
63 &mut self,
64 req: ReadRowsRequest,
65 retry: Option<RetrySetting>,
66 ) -> Result<Response<Streaming<ReadRowsResponse>>, Status> {
67 let setting = retry.unwrap_or_else(default_setting);
68 let stream = &req.read_stream;
69 invoke_fn(
70 Some(setting),
71 |client| async {
72 let request = create_request(format!("read_stream={stream}"), req.clone());
73 client.read_rows(request).await.map_err(|e| (e, client))
74 },
75 &mut self.inner,
76 )
77 .await
78 }
79
80 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
81 pub async fn split_read_stream(
82 &mut self,
83 req: SplitReadStreamRequest,
84 retry: Option<RetrySetting>,
85 ) -> Result<Response<SplitReadStreamResponse>, Status> {
86 let setting = retry.unwrap_or_else(default_setting);
87 let name = &req.name;
88 invoke_fn(
89 Some(setting),
90 |client| async {
91 let request = create_request(format!("name={name}"), req.clone());
92 client.split_read_stream(request).await.map_err(|e| (e, client))
93 },
94 &mut self.inner,
95 )
96 .await
97 }
98}
99
100#[derive(Clone)]
101pub struct StreamingWriteClient {
102 inner: BigQueryWriteClient<Channel>,
103}
104
105impl StreamingWriteClient {
106 pub fn new(inner: BigQueryWriteClient<Channel>) -> Self {
107 Self { inner }
108 }
109
110 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
111 pub async fn append_rows(
112 &mut self,
113 req: impl IntoStreamingRequest<Message = AppendRowsRequest>,
114 ) -> Result<Response<Streaming<AppendRowsResponse>>, Status> {
115 self.inner.append_rows(req).await
116 }
117
118 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
119 pub async fn create_write_stream(
120 &mut self,
121 req: CreateWriteStreamRequest,
122 retry: Option<RetrySetting>,
123 ) -> Result<Response<WriteStream>, Status> {
124 let setting = retry.unwrap_or_else(default_setting);
125 let parent = &req.parent;
126 invoke_fn(
127 Some(setting),
128 |client| async {
129 let request = create_request(format!("parent={parent}"), req.clone());
130 client.create_write_stream(request).await.map_err(|e| (e, client))
131 },
132 &mut self.inner,
133 )
134 .await
135 }
136
137 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
138 pub async fn get_write_stream(
139 &mut self,
140 req: GetWriteStreamRequest,
141 retry: Option<RetrySetting>,
142 ) -> Result<Response<WriteStream>, Status> {
143 let setting = retry.unwrap_or_else(default_setting);
144 let name = &req.name;
145 invoke_fn(
146 Some(setting),
147 |client| async {
148 let request = create_request(format!("name={name}"), req.clone());
149 client.get_write_stream(request).await.map_err(|e| (e, client))
150 },
151 &mut self.inner,
152 )
153 .await
154 }
155
156 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
157 pub async fn finalize_write_stream(
158 &mut self,
159 req: FinalizeWriteStreamRequest,
160 retry: Option<RetrySetting>,
161 ) -> Result<Response<FinalizeWriteStreamResponse>, Status> {
162 let setting = retry.unwrap_or_else(default_setting);
163 let name = &req.name;
164 invoke_fn(
165 Some(setting),
166 |client| async {
167 let request = create_request(format!("name={name}"), req.clone());
168 client.finalize_write_stream(request).await.map_err(|e| (e, client))
169 },
170 &mut self.inner,
171 )
172 .await
173 }
174
175 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
176 pub async fn batch_commit_write_streams(
177 &mut self,
178 req: BatchCommitWriteStreamsRequest,
179 retry: Option<RetrySetting>,
180 ) -> Result<Response<BatchCommitWriteStreamsResponse>, Status> {
181 let setting = retry.unwrap_or_else(default_setting);
182 let parent = &req.parent;
183 invoke_fn(
184 Some(setting),
185 |client| async {
186 let request = create_request(format!("parent={parent}"), req.clone());
187 client
188 .batch_commit_write_streams(request)
189 .await
190 .map_err(|e| (e, client))
191 },
192 &mut self.inner,
193 )
194 .await
195 }
196
197 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
198 pub async fn flush_rows(
199 &mut self,
200 req: FlushRowsRequest,
201 retry: Option<RetrySetting>,
202 ) -> Result<Response<FlushRowsResponse>, Status> {
203 let setting = retry.unwrap_or_else(default_setting);
204 let write_stream = &req.write_stream;
205 invoke_fn(
206 Some(setting),
207 |client| async {
208 let request = create_request(format!("write_stream={write_stream}"), req.clone());
209 client.flush_rows(request).await.map_err(|e| (e, client))
210 },
211 &mut self.inner,
212 )
213 .await
214 }
215}
216
217pub(crate) fn create_write_stream_request(table: &str, write_type: Type) -> CreateWriteStreamRequest {
218 CreateWriteStreamRequest {
219 parent: table.to_string(),
220 write_stream: Some(WriteStream {
221 name: "".to_string(),
222 r#type: write_type as i32,
223 create_time: None,
224 commit_time: None,
225 table_schema: None,
226 write_mode: 0,
227 location: "".to_string(),
228 }),
229 }
230}