google_cloud_bigquery/http/
bigquery_table_client.rs

1use std::sync::Arc;
2
3use crate::http::bigquery_client::BigqueryClient;
4use crate::http::error::Error;
5use crate::http::table;
6use crate::http::table::get_iam_policy::GetIamPolicyRequest;
7use crate::http::table::list::{ListTablesRequest, ListTablesResponse, TableOverview};
8use crate::http::table::set_iam_policy::SetIamPolicyRequest;
9use crate::http::table::test_iam_permissions::{TestIamPermissionsRequest, TestIamPermissionsResponse};
10use crate::http::table::Table;
11use crate::http::types::Policy;
12
13#[derive(Debug, Clone)]
14pub struct BigqueryTableClient {
15    inner: Arc<BigqueryClient>,
16}
17
18impl BigqueryTableClient {
19    pub fn new(inner: Arc<BigqueryClient>) -> Self {
20        Self { inner }
21    }
22
23    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/create
24    /// ```rust
25    /// use google_cloud_bigquery::http::bigquery_table_client::BigqueryTableClient;
26    /// use google_cloud_bigquery::http::error::Error;
27    /// use google_cloud_bigquery::http::table::{Table, TableFieldSchema, TableFieldType, TableReference, TableSchema};
28    ///
29    /// async fn run(client: BigqueryTableClient) -> Result<Table, Error> {
30    ///     let table = Table {
31    ///         table_reference: TableReference {
32    ///             project_id: "project".to_string(),
33    ///             dataset_id: "dataset".to_string(),
34    ///             table_id: "table".to_string()
35    ///         },
36    ///         schema: Some(TableSchema {
37    ///             fields: vec![
38    ///                 TableFieldSchema {
39    ///                     name: "col1".to_string(),
40    ///                     data_type: TableFieldType::String,
41    ///                     max_length: Some(32),
42    ///                     ..Default::default()
43    ///                 }
44    ///             ]
45    ///         }),
46    ///         ..Default::default()
47    ///     };
48    ///     client.create(&table).await
49    /// }
50    /// ```
51    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
52    pub async fn create(&self, metadata: &Table) -> Result<Table, Error> {
53        let builder = table::insert::build(self.inner.endpoint(), self.inner.http(), metadata);
54        self.inner.send(builder).await
55    }
56
57    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/delete
58    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
59    pub async fn delete(&self, project_id: &str, dataset_id: &str, table_id: &str) -> Result<(), Error> {
60        let builder = table::delete::build(self.inner.endpoint(), self.inner.http(), project_id, dataset_id, table_id);
61        self.inner.send_get_empty(builder).await
62    }
63
64    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/patch
65    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
66    pub async fn patch(&self, metadata: &Table) -> Result<Table, Error> {
67        let builder = table::patch::build(self.inner.endpoint(), self.inner.http(), metadata);
68        self.inner.send(builder).await
69    }
70
71    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get
72    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
73    pub async fn get(&self, project_id: &str, dataset_id: &str, table_id: &str) -> Result<Table, Error> {
74        let builder = table::get::build(self.inner.endpoint(), self.inner.http(), project_id, dataset_id, table_id);
75        self.inner.send(builder).await
76    }
77
78    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get_iam_policy
79    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
80    pub async fn get_iam_policy(
81        &self,
82        project_id: &str,
83        dataset_id: &str,
84        table_id: &str,
85        req: &GetIamPolicyRequest,
86    ) -> Result<Policy, Error> {
87        let builder = table::get_iam_policy::build(
88            self.inner.endpoint(),
89            self.inner.http(),
90            project_id,
91            dataset_id,
92            table_id,
93            req,
94        );
95        self.inner.send(builder).await
96    }
97
98    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/set_iam_policy
99    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
100    pub async fn set_iam_policy(
101        &self,
102        project_id: &str,
103        dataset_id: &str,
104        table_id: &str,
105        req: &SetIamPolicyRequest,
106    ) -> Result<Policy, Error> {
107        let builder = table::set_iam_policy::build(
108            self.inner.endpoint(),
109            self.inner.http(),
110            project_id,
111            dataset_id,
112            table_id,
113            req,
114        );
115        self.inner.send(builder).await
116    }
117
118    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/test_iam_policy
119    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
120    pub async fn test_iam_permissions(
121        &self,
122        project_id: &str,
123        dataset_id: &str,
124        table_id: &str,
125        req: &TestIamPermissionsRequest,
126    ) -> Result<TestIamPermissionsResponse, Error> {
127        let builder = table::test_iam_permissions::build(
128            self.inner.endpoint(),
129            self.inner.http(),
130            project_id,
131            dataset_id,
132            table_id,
133            req,
134        );
135        self.inner.send(builder).await
136    }
137
138    /// https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list
139    #[cfg_attr(feature = "trace", tracing::instrument(skip_all))]
140    pub async fn list(
141        &self,
142        project_id: &str,
143        dataset_id: &str,
144        req: &ListTablesRequest,
145    ) -> Result<Vec<TableOverview>, Error> {
146        let mut page_token: Option<String> = None;
147        let mut tables = vec![];
148        loop {
149            let builder = table::list::build(
150                self.inner.endpoint(),
151                self.inner.http(),
152                project_id,
153                dataset_id,
154                req,
155                page_token,
156            );
157            let response: ListTablesResponse = self.inner.send(builder).await?;
158            tables.extend(response.tables);
159            if response.next_page_token.is_none() {
160                break;
161            }
162            page_token = response.next_page_token;
163        }
164        Ok(tables)
165    }
166}
167
168#[cfg(test)]
169mod test {
170    use std::ops::Add;
171    use std::sync::Arc;
172
173    use serial_test::serial;
174    use time::OffsetDateTime;
175
176    use crate::http::bigquery_client::test::{bucket_name, create_client, dataset_name};
177    use crate::http::bigquery_table_client::BigqueryTableClient;
178    use crate::http::table::get_iam_policy::GetIamPolicyRequest;
179    use crate::http::table::list::ListTablesRequest;
180    use crate::http::table::set_iam_policy::SetIamPolicyRequest;
181    use crate::http::table::{
182        Clustering, CsvOptions, ExternalDataConfiguration, MaterializedViewDefinition, PartitionRange,
183        RangePartitioning, RoundingMode, SourceFormat, Table, TableFieldMode, TableFieldSchema, TableFieldType,
184        TableSchema, TimePartitionType, TimePartitioning, ViewDefinition,
185    };
186    use crate::http::types::{Bindings, Policy};
187
188    #[tokio::test]
189    #[serial]
190    pub async fn crud_table() {
191        let dataset = dataset_name("table");
192        let (client, project) = create_client().await;
193        let client = BigqueryTableClient::new(Arc::new(client));
194
195        // empty
196        let mut table1 = Table::default();
197        table1.table_reference.dataset_id = dataset.to_string();
198        table1.table_reference.project_id = project.to_string();
199        table1.table_reference.table_id = "table1".to_string();
200        table1.schema = Some(TableSchema {
201            fields: vec![
202                TableFieldSchema {
203                    name: "col1".to_string(),
204                    data_type: TableFieldType::String,
205                    description: Some("column1".to_string()),
206                    max_length: Some(32),
207                    ..Default::default()
208                },
209                TableFieldSchema {
210                    name: "col2".to_string(),
211                    data_type: TableFieldType::Numeric,
212                    description: Some("column2".to_string()),
213                    precision: Some(10),
214                    rounding_mode: Some(RoundingMode::RoundHalfEven),
215                    scale: Some(2),
216                    ..Default::default()
217                },
218                TableFieldSchema {
219                    name: "col3".to_string(),
220                    data_type: TableFieldType::Timestamp,
221                    mode: Some(TableFieldMode::Required),
222                    default_value_expression: Some("CURRENT_TIMESTAMP".to_string()),
223                    ..Default::default()
224                },
225                TableFieldSchema {
226                    name: "col4".to_string(),
227                    data_type: TableFieldType::Int64,
228                    mode: Some(TableFieldMode::Repeated),
229                    ..Default::default()
230                },
231                TableFieldSchema {
232                    name: "col5".to_string(),
233                    data_type: TableFieldType::Int64,
234                    ..Default::default()
235                },
236            ],
237        });
238        let table1 = client.create(&table1).await.unwrap();
239
240        // iam
241        let ref1 = &table1.table_reference;
242        let policy = client
243            .set_iam_policy(
244                &ref1.project_id,
245                &ref1.dataset_id,
246                &ref1.table_id,
247                &SetIamPolicyRequest {
248                    policy: Policy {
249                        bindings: vec![Bindings {
250                            role: "roles/viewer".to_string(),
251                            members: vec!["allAuthenticatedUsers".to_string()],
252                            ..Default::default()
253                        }],
254                        ..Default::default()
255                    },
256                    ..Default::default()
257                },
258            )
259            .await
260            .unwrap();
261        let actual_policy = client
262            .get_iam_policy(
263                &ref1.project_id,
264                &ref1.dataset_id,
265                &ref1.table_id,
266                &GetIamPolicyRequest::default(),
267            )
268            .await
269            .unwrap();
270        assert_eq!(policy, actual_policy);
271
272        let mut view = Table::default();
273        view.table_reference.dataset_id = table1.table_reference.dataset_id.to_string();
274        view.table_reference.project_id = table1.table_reference.project_id.to_string();
275        view.table_reference.table_id = "view1".to_string();
276        view.view = Some(ViewDefinition {
277            query: format!("SELECT col1 FROM {}.table1", dataset),
278            ..Default::default()
279        });
280        let _view = client.create(&view).await.unwrap();
281
282        // range partition
283        let mut table2 = table1.clone();
284        table2.table_reference.table_id = "range_partition".to_string();
285        table2.range_partitioning = Some(RangePartitioning {
286            field: "col5".to_string(),
287            range: PartitionRange {
288                start: "1".to_string(),
289                end: "10000".to_string(),
290                interval: "1".to_string(),
291            },
292        });
293        table2.expiration_time = Some(OffsetDateTime::now_utc().add(time::Duration::days(1)).unix_timestamp() * 1000);
294        let _table2 = client.create(&table2).await.unwrap();
295
296        // time partition
297        let mut table3 = table1.clone();
298        table3.table_reference.table_id = "time_partition".to_string();
299        table3.time_partitioning = Some(TimePartitioning {
300            partition_type: TimePartitionType::Day,
301            expiration_ms: Some(3600000),
302            field: Some("col3".to_string()),
303        });
304        table3.clustering = Some(Clustering {
305            fields: vec!["col1".to_string(), "col5".to_string()],
306        });
307        let _table3 = client.create(&table3).await.unwrap();
308
309        // materialized view
310        let mut mv = Table::default();
311        mv.table_reference.dataset_id = table1.table_reference.dataset_id.to_string();
312        mv.table_reference.project_id = table1.table_reference.project_id.to_string();
313        mv.table_reference.table_id = "materialized_view1".to_string();
314        mv.materialized_view = Some(MaterializedViewDefinition {
315            query: format!("SELECT col2 FROM {}.table1", dataset),
316            refresh_interval_ms: Some(3600000),
317            ..Default::default()
318        });
319        let _mv = client.create(&mv).await.unwrap();
320
321        // delete
322        let tables = client
323            .list(
324                project.as_str(),
325                &table1.table_reference.dataset_id,
326                &ListTablesRequest::default(),
327            )
328            .await
329            .unwrap();
330        for table in tables {
331            let table = table.table_reference;
332            client
333                .delete(table.project_id.as_str(), table.dataset_id.as_str(), table.table_id.as_str())
334                .await
335                .unwrap();
336        }
337    }
338
339    #[tokio::test]
340    #[serial]
341    pub async fn external_table() {
342        let dataset = dataset_name("table");
343        let (client, project) = create_client().await;
344        let client = BigqueryTableClient::new(Arc::new(client));
345
346        // CSV
347        let mut table = Table::default();
348        table.table_reference.dataset_id = dataset.to_string();
349        table.table_reference.project_id = project.to_string();
350        table.table_reference.table_id = format!("external_data_{}", OffsetDateTime::now_utc().unix_timestamp());
351        table.external_data_configuration = Some(ExternalDataConfiguration {
352            source_uris: vec![format!("gs://{}/external_data.csv", bucket_name(&project, "job"))],
353            autodetect: true,
354            source_format: SourceFormat::Csv,
355            csv_options: Some(CsvOptions {
356                field_delimiter: Some("|".to_string()),
357                encoding: Some("UTF-8".to_string()),
358                skip_leading_rows: Some(0),
359                ..Default::default()
360            }),
361            ..Default::default()
362        });
363
364        let create_result = client.create(&table).await.unwrap();
365        let patch_result = client.patch(&create_result).await.unwrap();
366        let tref = &patch_result.table_reference;
367        let get_result = client
368            .get(tref.project_id.as_str(), tref.dataset_id.as_str(), tref.table_id.as_str())
369            .await
370            .unwrap();
371        assert_eq!(get_result, patch_result);
372
373        // cleanup
374        client
375            .delete(tref.project_id.as_str(), tref.dataset_id.as_str(), tref.table_id.as_str())
376            .await
377            .unwrap();
378    }
379}