1use std::sync::Arc;
2
3use crate::http::bigquery_client::BigqueryClient;
4use crate::http::error::Error;
5use crate::http::table;
6use crate::http::table::get_iam_policy::GetIamPolicyRequest;
7use crate::http::table::list::{ListTablesRequest, ListTablesResponse, TableOverview};
8use crate::http::table::set_iam_policy::SetIamPolicyRequest;
9use crate::http::table::test_iam_permissions::{TestIamPermissionsRequest, TestIamPermissionsResponse};
10use crate::http::table::Table;
11use crate::http::types::Policy;
12
13#[derive(Debug, Clone)]
14pub struct BigqueryTableClient {
15 inner: Arc<BigqueryClient>,
16}
17
18impl BigqueryTableClient {
19 pub fn new(inner: Arc<BigqueryClient>) -> Self {
20 Self { inner }
21 }
22
23 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
52 pub async fn create(&self, metadata: &Table) -> Result<Table, Error> {
53 let builder = table::insert::build(self.inner.endpoint(), self.inner.http(), metadata);
54 self.inner.send(builder).await
55 }
56
57 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
59 pub async fn delete(&self, project_id: &str, dataset_id: &str, table_id: &str) -> Result<(), Error> {
60 let builder = table::delete::build(self.inner.endpoint(), self.inner.http(), project_id, dataset_id, table_id);
61 self.inner.send_get_empty(builder).await
62 }
63
64 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
66 pub async fn patch(&self, metadata: &Table) -> Result<Table, Error> {
67 let builder = table::patch::build(self.inner.endpoint(), self.inner.http(), metadata);
68 self.inner.send(builder).await
69 }
70
71 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
73 pub async fn get(&self, project_id: &str, dataset_id: &str, table_id: &str) -> Result<Table, Error> {
74 let builder = table::get::build(self.inner.endpoint(), self.inner.http(), project_id, dataset_id, table_id);
75 self.inner.send(builder).await
76 }
77
78 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
80 pub async fn get_iam_policy(
81 &self,
82 project_id: &str,
83 dataset_id: &str,
84 table_id: &str,
85 req: &GetIamPolicyRequest,
86 ) -> Result<Policy, Error> {
87 let builder = table::get_iam_policy::build(
88 self.inner.endpoint(),
89 self.inner.http(),
90 project_id,
91 dataset_id,
92 table_id,
93 req,
94 );
95 self.inner.send(builder).await
96 }
97
98 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
100 pub async fn set_iam_policy(
101 &self,
102 project_id: &str,
103 dataset_id: &str,
104 table_id: &str,
105 req: &SetIamPolicyRequest,
106 ) -> Result<Policy, Error> {
107 let builder = table::set_iam_policy::build(
108 self.inner.endpoint(),
109 self.inner.http(),
110 project_id,
111 dataset_id,
112 table_id,
113 req,
114 );
115 self.inner.send(builder).await
116 }
117
118 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
120 pub async fn test_iam_permissions(
121 &self,
122 project_id: &str,
123 dataset_id: &str,
124 table_id: &str,
125 req: &TestIamPermissionsRequest,
126 ) -> Result<TestIamPermissionsResponse, Error> {
127 let builder = table::test_iam_permissions::build(
128 self.inner.endpoint(),
129 self.inner.http(),
130 project_id,
131 dataset_id,
132 table_id,
133 req,
134 );
135 self.inner.send(builder).await
136 }
137
138 #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
140 pub async fn list(
141 &self,
142 project_id: &str,
143 dataset_id: &str,
144 req: &ListTablesRequest,
145 ) -> Result<Vec<TableOverview>, Error> {
146 let mut page_token: Option<String> = None;
147 let mut tables = vec![];
148 loop {
149 let builder = table::list::build(
150 self.inner.endpoint(),
151 self.inner.http(),
152 project_id,
153 dataset_id,
154 req,
155 page_token,
156 );
157 let response: ListTablesResponse = self.inner.send(builder).await?;
158 tables.extend(response.tables);
159 if response.next_page_token.is_none() {
160 break;
161 }
162 page_token = response.next_page_token;
163 }
164 Ok(tables)
165 }
166}
167
168#[cfg(test)]
169mod test {
170 use std::ops::Add;
171 use std::sync::Arc;
172
173 use serial_test::serial;
174 use time::OffsetDateTime;
175
176 use crate::http::bigquery_client::test::{bucket_name, create_client, dataset_name};
177 use crate::http::bigquery_table_client::BigqueryTableClient;
178 use crate::http::table::get_iam_policy::GetIamPolicyRequest;
179 use crate::http::table::list::ListTablesRequest;
180 use crate::http::table::set_iam_policy::SetIamPolicyRequest;
181 use crate::http::table::{
182 Clustering, CsvOptions, ExternalDataConfiguration, MaterializedViewDefinition, PartitionRange,
183 RangePartitioning, RoundingMode, SourceFormat, Table, TableFieldMode, TableFieldSchema, TableFieldType,
184 TableSchema, TimePartitionType, TimePartitioning, ViewDefinition,
185 };
186 use crate::http::types::{Bindings, Policy};
187
188 #[tokio::test]
189 #[serial]
190 pub async fn crud_table() {
191 let dataset = dataset_name("table");
192 let (client, project) = create_client().await;
193 let client = BigqueryTableClient::new(Arc::new(client));
194
195 let mut table1 = Table::default();
197 table1.table_reference.dataset_id = dataset.to_string();
198 table1.table_reference.project_id = project.to_string();
199 table1.table_reference.table_id = "table1".to_string();
200 table1.schema = Some(TableSchema {
201 fields: vec![
202 TableFieldSchema {
203 name: "col1".to_string(),
204 data_type: TableFieldType::String,
205 description: Some("column1".to_string()),
206 max_length: Some(32),
207 ..Default::default()
208 },
209 TableFieldSchema {
210 name: "col2".to_string(),
211 data_type: TableFieldType::Numeric,
212 description: Some("column2".to_string()),
213 precision: Some(10),
214 rounding_mode: Some(RoundingMode::RoundHalfEven),
215 scale: Some(2),
216 ..Default::default()
217 },
218 TableFieldSchema {
219 name: "col3".to_string(),
220 data_type: TableFieldType::Timestamp,
221 mode: Some(TableFieldMode::Required),
222 default_value_expression: Some("CURRENT_TIMESTAMP".to_string()),
223 ..Default::default()
224 },
225 TableFieldSchema {
226 name: "col4".to_string(),
227 data_type: TableFieldType::Int64,
228 mode: Some(TableFieldMode::Repeated),
229 ..Default::default()
230 },
231 TableFieldSchema {
232 name: "col5".to_string(),
233 data_type: TableFieldType::Int64,
234 ..Default::default()
235 },
236 ],
237 });
238 let table1 = client.create(&table1).await.unwrap();
239
240 let ref1 = &table1.table_reference;
242 let policy = client
243 .set_iam_policy(
244 &ref1.project_id,
245 &ref1.dataset_id,
246 &ref1.table_id,
247 &SetIamPolicyRequest {
248 policy: Policy {
249 bindings: vec![Bindings {
250 role: "roles/viewer".to_string(),
251 members: vec!["allAuthenticatedUsers".to_string()],
252 ..Default::default()
253 }],
254 ..Default::default()
255 },
256 ..Default::default()
257 },
258 )
259 .await
260 .unwrap();
261 let actual_policy = client
262 .get_iam_policy(
263 &ref1.project_id,
264 &ref1.dataset_id,
265 &ref1.table_id,
266 &GetIamPolicyRequest::default(),
267 )
268 .await
269 .unwrap();
270 assert_eq!(policy, actual_policy);
271
272 let mut view = Table::default();
273 view.table_reference.dataset_id = table1.table_reference.dataset_id.to_string();
274 view.table_reference.project_id = table1.table_reference.project_id.to_string();
275 view.table_reference.table_id = "view1".to_string();
276 view.view = Some(ViewDefinition {
277 query: format!("SELECT col1 FROM {}.table1", dataset),
278 ..Default::default()
279 });
280 let _view = client.create(&view).await.unwrap();
281
282 let mut table2 = table1.clone();
284 table2.table_reference.table_id = "range_partition".to_string();
285 table2.range_partitioning = Some(RangePartitioning {
286 field: "col5".to_string(),
287 range: PartitionRange {
288 start: "1".to_string(),
289 end: "10000".to_string(),
290 interval: "1".to_string(),
291 },
292 });
293 table2.expiration_time = Some(OffsetDateTime::now_utc().add(time::Duration::days(1)).unix_timestamp() * 1000);
294 let _table2 = client.create(&table2).await.unwrap();
295
296 let mut table3 = table1.clone();
298 table3.table_reference.table_id = "time_partition".to_string();
299 table3.time_partitioning = Some(TimePartitioning {
300 partition_type: TimePartitionType::Day,
301 expiration_ms: Some(3600000),
302 field: Some("col3".to_string()),
303 });
304 table3.clustering = Some(Clustering {
305 fields: vec!["col1".to_string(), "col5".to_string()],
306 });
307 let _table3 = client.create(&table3).await.unwrap();
308
309 let mut mv = Table::default();
311 mv.table_reference.dataset_id = table1.table_reference.dataset_id.to_string();
312 mv.table_reference.project_id = table1.table_reference.project_id.to_string();
313 mv.table_reference.table_id = "materialized_view1".to_string();
314 mv.materialized_view = Some(MaterializedViewDefinition {
315 query: format!("SELECT col2 FROM {}.table1", dataset),
316 refresh_interval_ms: Some(3600000),
317 ..Default::default()
318 });
319 let _mv = client.create(&mv).await.unwrap();
320
321 let tables = client
323 .list(
324 project.as_str(),
325 &table1.table_reference.dataset_id,
326 &ListTablesRequest::default(),
327 )
328 .await
329 .unwrap();
330 for table in tables {
331 let table = table.table_reference;
332 client
333 .delete(table.project_id.as_str(), table.dataset_id.as_str(), table.table_id.as_str())
334 .await
335 .unwrap();
336 }
337 }
338
339 #[tokio::test]
340 #[serial]
341 pub async fn external_table() {
342 let dataset = dataset_name("table");
343 let (client, project) = create_client().await;
344 let client = BigqueryTableClient::new(Arc::new(client));
345
346 let mut table = Table::default();
348 table.table_reference.dataset_id = dataset.to_string();
349 table.table_reference.project_id = project.to_string();
350 table.table_reference.table_id = format!("external_data_{}", OffsetDateTime::now_utc().unix_timestamp());
351 table.external_data_configuration = Some(ExternalDataConfiguration {
352 source_uris: vec![format!("gs://{}/external_data.csv", bucket_name(&project, "job"))],
353 autodetect: true,
354 source_format: SourceFormat::Csv,
355 csv_options: Some(CsvOptions {
356 field_delimiter: Some("|".to_string()),
357 encoding: Some("UTF-8".to_string()),
358 skip_leading_rows: Some(0),
359 ..Default::default()
360 }),
361 ..Default::default()
362 });
363
364 let create_result = client.create(&table).await.unwrap();
365 let patch_result = client.patch(&create_result).await.unwrap();
366 let tref = &patch_result.table_reference;
367 let get_result = client
368 .get(tref.project_id.as_str(), tref.dataset_id.as_str(), tref.table_id.as_str())
369 .await
370 .unwrap();
371 assert_eq!(get_result, patch_result);
372
373 client
375 .delete(tref.project_id.as_str(), tref.dataset_id.as_str(), tref.table_id.as_str())
376 .await
377 .unwrap();
378 }
379}