gcp_bigquery_client/
table.rs

1//! Manage BigQuery table
2use std::sync::Arc;
3
4use log::warn;
5use reqwest::Client;
6
7use crate::auth::Authenticator;
8use crate::error::BQError;
9use crate::model::get_iam_policy_request::GetIamPolicyRequest;
10use crate::model::policy::Policy;
11use crate::model::set_iam_policy_request::SetIamPolicyRequest;
12use crate::model::table::Table;
13use crate::model::table_list::TableList;
14use crate::model::test_iam_permissions_request::TestIamPermissionsRequest;
15use crate::model::test_iam_permissions_response::TestIamPermissionsResponse;
16use crate::{process_response, urlencode, BIG_QUERY_V2_URL};
17
18/// A table API handler.
19#[derive(Clone)]
20pub struct TableApi {
21    client: Client,
22    auth: Arc<dyn Authenticator>,
23    base_url: String,
24}
25
26impl TableApi {
27    pub(crate) fn new(client: Client, auth: Arc<dyn Authenticator>) -> Self {
28        Self {
29            client,
30            auth,
31            base_url: BIG_QUERY_V2_URL.to_string(),
32        }
33    }
34
35    pub(crate) fn with_base_url(&mut self, base_url: String) -> &mut Self {
36        self.base_url = base_url;
37        self
38    }
39
40    /// Creates a new, empty table in the dataset.
41    /// # Arguments
42    /// * table - The request body contains an instance of Table.
43    pub async fn create(&self, table: Table) -> Result<Table, BQError> {
44        let req_url = &format!(
45            "{base_url}/projects/{project_id}/datasets/{dataset_id}/tables",
46            base_url = self.base_url,
47            project_id = urlencode(&table.table_reference.project_id),
48            dataset_id = urlencode(&table.table_reference.dataset_id)
49        );
50
51        let access_token = self.auth.access_token().await?;
52
53        let request = self
54            .client
55            .post(req_url.as_str())
56            .bearer_auth(access_token)
57            .json(&table)
58            .build()?;
59
60        let response = self.client.execute(request).await?;
61
62        process_response(response).await
63    }
64
65    /// Deletes the table specified by tableId from the dataset. If the table contains data, all the data will be deleted.
66    /// # Arguments
67    /// * project_id - Project ID of the table to delete
68    /// * dataset_id - Dataset ID of the table to delete
69    /// * table_id - Table ID of the table to delete
70    pub async fn delete(&self, project_id: &str, dataset_id: &str, table_id: &str) -> Result<(), BQError> {
71        let req_url = &format!(
72            "{base_url}/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}",
73            base_url = self.base_url,
74            project_id = urlencode(project_id),
75            dataset_id = urlencode(dataset_id),
76            table_id = urlencode(table_id)
77        );
78
79        let access_token = self.auth.access_token().await?;
80
81        let request = self.client.delete(req_url.as_str()).bearer_auth(access_token).build()?;
82
83        let response = self.client.execute(request).await?;
84
85        if response.status().is_success() {
86            Ok(())
87        } else {
88            Err(BQError::ResponseError {
89                error: response.json().await?,
90            })
91        }
92    }
93
94    pub async fn delete_if_exists(&self, project_id: &str, dataset_id: &str, table_id: &str) -> bool {
95        match self.delete(project_id, dataset_id, table_id).await {
96            Err(BQError::ResponseError { error }) => {
97                if error.error.code != 404 {
98                    warn!("table.delete_if_exists: unexpected error: {error:?}");
99                }
100                false
101            }
102            Err(err) => {
103                warn!("table.delete_if_exists: unexpected error: {err:?}");
104                false
105            }
106            Ok(_) => true,
107        }
108    }
109
110    /// Gets the specified table resource by table ID. This method does not return the data in the table, it only
111    /// returns the table resource, which describes the structure of this table.
112    /// # Arguments
113    /// * project_id - Project ID of the table to delete
114    /// * dataset_id - Dataset ID of the table to delete
115    /// * table_id - Table ID of the table to delete
116    /// * selected_fields - tabledata.list of table schema fields to return (comma-separated). If unspecified, all
117    ///   fields are returned. A fieldMask cannot be used here because the fields will automatically be converted from
118    ///   camelCase to snake_case and the conversion will fail if there are underscores. Since these are fields in
119    ///   BigQuery table schemas, underscores are allowed.
120    pub async fn get(
121        &self,
122        project_id: &str,
123        dataset_id: &str,
124        table_id: &str,
125        selected_fields: Option<Vec<&str>>,
126    ) -> Result<Table, BQError> {
127        let req_url = &format!(
128            "{base_url}/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}",
129            base_url = self.base_url,
130            project_id = urlencode(project_id),
131            dataset_id = urlencode(dataset_id),
132            table_id = urlencode(table_id)
133        );
134
135        let access_token = self.auth.access_token().await?;
136
137        let mut request_builder = self.client.get(req_url.as_str()).bearer_auth(access_token);
138        if let Some(selected_fields) = selected_fields {
139            let selected_fields = selected_fields.join(",");
140            request_builder = request_builder.query(&[("selectedFields", selected_fields)]);
141        }
142
143        let request = request_builder.build()?;
144
145        let response = self.client.execute(request).await?;
146
147        process_response(response).await
148    }
149
150    /// Lists all tables in the specified dataset. Requires the READER dataset role.
151    /// # Arguments
152    /// * project_id - Project ID of the table to delete
153    /// * dataset_id - Dataset ID of the table to delete
154    /// * options - Options
155    pub async fn list(&self, project_id: &str, dataset_id: &str, options: ListOptions) -> Result<TableList, BQError> {
156        let req_url = &format!(
157            "{base_url}/projects/{project_id}/datasets/{dataset_id}/tables",
158            base_url = self.base_url,
159            project_id = urlencode(project_id),
160            dataset_id = urlencode(dataset_id)
161        );
162
163        let access_token = self.auth.access_token().await?;
164
165        let mut request = self.client.get(req_url).bearer_auth(access_token);
166
167        // process options
168        if let Some(max_results) = options.max_results {
169            request = request.query(&[("maxResults", max_results.to_string())]);
170        }
171        if let Some(page_token) = options.page_token {
172            request = request.query(&[("pageToken", page_token)]);
173        }
174
175        let request = request.build()?;
176        let response = self.client.execute(request).await?;
177
178        process_response(response).await
179    }
180
181    /// Updates information in an existing table. The update method replaces the entire table resource, whereas the
182    /// patch method only replaces fields that are provided in the submitted table resource. This method supports
183    /// RFC5789 patch semantics.
184    /// # Arguments
185    /// * project_id - Project ID of the table to delete
186    /// * dataset_id - Dataset ID of the table to delete
187    /// * table_id - Table ID of the table to delete
188    /// * table - Table to patch
189    pub async fn patch(
190        &self,
191        project_id: &str,
192        dataset_id: &str,
193        table_id: &str,
194        table: Table,
195    ) -> Result<Table, BQError> {
196        let req_url = &format!(
197            "{base_url}/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}",
198            base_url = self.base_url,
199            project_id = urlencode(project_id),
200            dataset_id = urlencode(dataset_id),
201            table_id = urlencode(table_id)
202        );
203
204        let access_token = self.auth.access_token().await?;
205
206        let request = self
207            .client
208            .patch(req_url)
209            .bearer_auth(access_token)
210            .json(&table)
211            .build()?;
212        let response = self.client.execute(request).await?;
213
214        process_response(response).await
215    }
216
217    /// Updates information in an existing table. The update method replaces the entire Table resource, whereas the
218    /// patch method only replaces fields that are provided in the submitted Table resource.
219    /// # Arguments
220    /// * project_id - Project ID of the table to delete
221    /// * dataset_id - Dataset ID of the table to delete
222    /// * table_id - Table ID of the table to delete
223    /// * table - Table to update
224    pub async fn update(
225        &self,
226        project_id: &str,
227        dataset_id: &str,
228        table_id: &str,
229        table: Table,
230    ) -> Result<Table, BQError> {
231        let req_url = &format!(
232            "{base_url}/projects/{project_id}/datasets/{dataset_id}/tables/{table_id}",
233            base_url = self.base_url,
234            project_id = urlencode(project_id),
235            dataset_id = urlencode(dataset_id),
236            table_id = urlencode(table_id)
237        );
238
239        let access_token = self.auth.access_token().await?;
240
241        let request = self
242            .client
243            .put(req_url)
244            .bearer_auth(access_token)
245            .json(&table)
246            .build()?;
247        let response = self.client.execute(request).await?;
248
249        process_response(response).await
250    }
251
252    /// Gets the access control policy for a resource. Returns an empty policy if the resource exists and does not have
253    /// a policy set.
254    /// # Argument
255    /// * `resource` - The resource for which the policy is being requested. See the operation documentation for the
256    ///   appropriate value for this field.
257    pub async fn get_iam_policy(
258        &self,
259        resource: &str,
260        get_iam_policy_request: GetIamPolicyRequest,
261    ) -> Result<Policy, BQError> {
262        let req_url = &format!(
263            "{base_url}/projects/{resource}/:getIamPolicy",
264            base_url = self.base_url,
265            resource = urlencode(resource)
266        );
267
268        let access_token = self.auth.access_token().await?;
269
270        let request = self
271            .client
272            .post(req_url.as_str())
273            .bearer_auth(access_token)
274            .json(&get_iam_policy_request)
275            .build()?;
276
277        let response = self.client.execute(request).await?;
278
279        process_response(response).await
280    }
281
282    /// Sets the access control policy on the specified resource. Replaces any existing policy. Can return `NOT_FOUND`,
283    /// `INVALID_ARGUMENT`, and `PERMISSION_DENIED` errors.
284    /// # Argument
285    /// * `resource` - The resource for which the policy is being specified. See the operation documentation for the appropriate value for this field.
286    pub async fn set_iam_policy(
287        &self,
288        resource: &str,
289        set_iam_policy_request: SetIamPolicyRequest,
290    ) -> Result<Policy, BQError> {
291        let req_url = &format!(
292            "{base_url}/projects/{resource}/:setIamPolicy",
293            base_url = self.base_url,
294            resource = urlencode(resource)
295        );
296
297        let access_token = self.auth.access_token().await?;
298
299        let request = self
300            .client
301            .post(req_url.as_str())
302            .bearer_auth(access_token)
303            .json(&set_iam_policy_request)
304            .build()?;
305
306        let response = self.client.execute(request).await?;
307
308        process_response(response).await
309    }
310
311    /// Returns permissions that a caller has on the specified resource. If the resource does not exist, this will
312    /// return an empty set of permissions, not a `NOT_FOUND` error. Note: This operation is designed to be used for
313    /// building permission-aware UIs and command-line tools, not for authorization checking. This operation may
314    /// \"fail open\" without warning.
315    /// # Argument
316    /// * `resource` - The resource for which the policy detail is being requested. See the operation documentation for
317    ///   the appropriate value for this field.
318    pub async fn test_iam_permissions(
319        &self,
320        resource: &str,
321        test_iam_permissions_request: TestIamPermissionsRequest,
322    ) -> Result<TestIamPermissionsResponse, BQError> {
323        let req_url = &format!(
324            "{base_url}/projects/{resource}/:testIamPermissions",
325            base_url = self.base_url,
326            resource = urlencode(resource)
327        );
328
329        let access_token = self.auth.access_token().await?;
330
331        let request = self
332            .client
333            .post(req_url.as_str())
334            .bearer_auth(access_token)
335            .json(&test_iam_permissions_request)
336            .build()?;
337
338        let response = self.client.execute(request).await?;
339
340        process_response(response).await
341    }
342}
343
344/// A list of options to use with the table API handler.
345#[derive(Default)]
346pub struct ListOptions {
347    max_results: Option<u64>,
348    page_token: Option<String>,
349}
350
351impl ListOptions {
352    /// The maximum number of results to return in a single response page. Leverage the page tokens to iterate
353    /// through the entire collection.
354    pub fn max_results(mut self, value: u64) -> Self {
355        self.max_results = Some(value);
356        self
357    }
358
359    /// Page token, returned by a previous call, to request the next page of results
360    pub fn page_token(mut self, value: String) -> Self {
361        self.page_token = Some(value);
362        self
363    }
364}
365
366#[cfg(test)]
367mod test {
368    use crate::error::BQError;
369    use crate::model::dataset::Dataset;
370    use crate::model::field_type::FieldType;
371    use crate::model::table::Table;
372    use crate::model::table_field_schema::TableFieldSchema;
373    use crate::model::table_schema::TableSchema;
374    use crate::table::ListOptions;
375    use crate::{env_vars, Client};
376    use std::time::{Duration, SystemTime};
377
378    #[tokio::test]
379    async fn test() -> Result<(), BQError> {
380        let (ref project_id, ref dataset_id, ref table_id, ref sa_key) = env_vars();
381        let dataset_id = &format!("{dataset_id}_table");
382
383        let client = Client::from_service_account_key_file(sa_key).await?;
384
385        // Delete the dataset if needed
386        client.dataset().delete_if_exists(project_id, dataset_id, true).await;
387
388        // Create dataset
389        let created_dataset = client.dataset().create(Dataset::new(project_id, dataset_id)).await?;
390        assert_eq!(created_dataset.id, Some(format!("{project_id}:{dataset_id}")));
391
392        // Create table
393        let table = Table::new(
394            project_id,
395            dataset_id,
396            table_id,
397            TableSchema::new(vec![
398                TableFieldSchema::new("col1", FieldType::String),
399                TableFieldSchema::new("col2", FieldType::Int64),
400                TableFieldSchema::new("col3", FieldType::Boolean),
401                TableFieldSchema::new("col4", FieldType::Datetime),
402            ]),
403        );
404        let created_table = client
405            .table()
406            .create(
407                table
408                    .description("A table used for unit tests")
409                    .label("owner", "me")
410                    .label("env", "prod")
411                    .expiration_time(SystemTime::now() + Duration::from_secs(3600)),
412            )
413            .await?;
414        assert_eq!(created_table.table_reference.table_id, table_id.to_string());
415
416        let table = client.table().get(project_id, dataset_id, table_id, None).await?;
417        assert_eq!(table.table_reference.table_id, table_id.to_string());
418
419        let table = client.table().update(project_id, dataset_id, table_id, table).await?;
420        assert_eq!(table.table_reference.table_id, table_id.to_string());
421
422        let table = client.table().patch(project_id, dataset_id, table_id, table).await?;
423        assert_eq!(table.table_reference.table_id, table_id.to_string());
424
425        // List tables
426        let tables = client
427            .table()
428            .list(project_id, dataset_id, ListOptions::default())
429            .await?;
430        let mut created_table_found = false;
431        for table_list_tables in tables.tables.unwrap().iter() {
432            if &table_list_tables.table_reference.dataset_id == dataset_id {
433                created_table_found = true;
434            }
435        }
436        assert!(created_table_found);
437
438        client.table().delete(project_id, dataset_id, table_id).await?;
439
440        // Delete dataset
441        client.dataset().delete(project_id, dataset_id, true).await?;
442
443        Ok(())
444    }
445}