gcp_bigquery_client/dataset.rs

//! Manage BigQuery datasets.
use std::sync::Arc;

use log::warn;
use reqwest::Client;

use crate::auth::Authenticator;
use crate::error::BQError;
use crate::model::dataset::Dataset;
use crate::model::datasets::Datasets;
use crate::model::information_schema::schemata::Schemata;
use crate::model::query_request::QueryRequest;
use crate::model::query_response::{QueryResponse, ResultSet};
use crate::{process_response, urlencode, BIG_QUERY_V2_URL};

/// A dataset API handler.
#[derive(Clone)]
pub struct DatasetApi {
    client: Client,
    auth: Arc<dyn Authenticator>,
    base_url: String,
}

impl DatasetApi {
    pub(crate) fn new(client: Client, auth: Arc<dyn Authenticator>) -> Self {
        Self {
            client,
            auth,
            base_url: BIG_QUERY_V2_URL.to_string(),
        }
    }

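    /// Overrides the base URL used to reach the BigQuery API (e.g. to point at a BigQuery emulator in tests).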
    pub(crate) fn with_base_url(&mut self, base_url: String) -> &mut Self {
        self.base_url = base_url;
        self
    }

    /// Creates a new empty dataset.
    /// # Arguments
    /// * `dataset` - The dataset to create
    ///
    /// # Example
    /// ```
    /// # use gcp_bigquery_client::{Client, env_vars};
    /// # use gcp_bigquery_client::model::dataset::Dataset;
    /// # use gcp_bigquery_client::error::BQError;
    ///
    /// # async fn run() -> Result<(), BQError> {
    /// let (ref project_id, ref dataset_id, ref _table_id, ref sa_key) = env_vars();
    /// let dataset_id = &format!("{}_dataset", dataset_id);
    ///
    /// let client = Client::from_service_account_key_file(sa_key).await?;
    ///
    /// # client.dataset().delete_if_exists(project_id, dataset_id, true).await;
    /// client.dataset().create(Dataset::new(project_id, dataset_id)).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn create(&self, dataset: Dataset) -> Result<Dataset, BQError> {
        let req_url = &format!(
            "{base_url}/projects/{project_id}/datasets",
            base_url = self.base_url,
            project_id = urlencode(&dataset.dataset_reference.project_id)
        );

        let access_token = self.auth.access_token().await?;

        let request = self
            .client
            .post(req_url.as_str())
            .bearer_auth(access_token)
            .json(&dataset)
            .build()?;

        let response = self.client.execute(request).await?;

        process_response(response).await
    }

    /// Lists all datasets in the specified project to which the user has been granted the READER dataset role.
    /// # Arguments
    /// * `project_id` - Project ID of the datasets to be listed
    /// * `options` - Listing options (see [`ListOptions`])
    ///
    /// # Example
    /// ```
    /// # use gcp_bigquery_client::{Client, env_vars};
    /// # use gcp_bigquery_client::model::dataset::Dataset;
    /// # use gcp_bigquery_client::error::BQError;
    /// # use gcp_bigquery_client::dataset::ListOptions;
    ///
    /// # async fn run() -> Result<(), BQError> {
    /// let (ref project_id, ref dataset_id, ref _table_id, ref sa_key) = env_vars();
    /// let dataset_id = &format!("{}_dataset", dataset_id);
    ///
    /// let client = Client::from_service_account_key_file(sa_key).await?;
    ///
    /// let datasets = client.dataset().list(project_id, ListOptions::default().all(true)).await?;
    /// for dataset in datasets.datasets.iter() {
    ///     // Process each dataset
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn list(&self, project_id: &str, options: ListOptions) -> Result<Datasets, BQError> {
        let req_url = &format!(
            "{base_url}/projects/{project_id}/datasets",
            base_url = self.base_url,
            project_id = urlencode(project_id)
        );

        let access_token = self.auth.access_token().await?;

        let mut request = self.client.get(req_url).bearer_auth(access_token);

        // Process the listing options
        if let Some(max_results) = options.max_results {
            request = request.query(&[("maxResults", max_results.to_string())]);
        }
        if let Some(page_token) = options.page_token {
            request = request.query(&[("pageToken", page_token)]);
        }
        if let Some(all) = options.all {
            request = request.query(&[("all", all.to_string())]);
        }
        if let Some(filter) = options.filter {
            request = request.query(&[("filter", filter)]);
        }

        let request = request.build()?;
        let response = self.client.execute(request).await?;

        process_response(response).await
    }

    /// Deletes the dataset specified by the `dataset_id` value. Before you can delete a dataset, you must delete all
    /// its tables, either manually or by setting `delete_contents`. Immediately after deletion, you can create
    /// another dataset with the same name.
    /// # Arguments
    /// * `project_id` - Project ID of the dataset being deleted
    /// * `dataset_id` - Dataset ID of the dataset being deleted
    /// * `delete_contents` - If true, delete all the tables in the dataset. If false and the dataset contains tables, the request will fail. Default is false
    ///
    /// # Example
    /// ```
    /// # use gcp_bigquery_client::{Client, env_vars};
    /// # use gcp_bigquery_client::model::dataset::Dataset;
    /// # use gcp_bigquery_client::error::BQError;
    ///
    /// # async fn run() -> Result<(), BQError> {
    /// let (ref project_id, ref dataset_id, ref _table_id, ref sa_key) = env_vars();
    /// let dataset_id = &format!("{}_dataset", dataset_id);
    ///
    /// let client = Client::from_service_account_key_file(sa_key).await?;
    ///
    /// # client.dataset().delete_if_exists(project_id, dataset_id, true).await;
    /// client.dataset().create(Dataset::new(project_id, dataset_id)).await?;
    /// client.dataset().delete(project_id, dataset_id, true).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn delete(&self, project_id: &str, dataset_id: &str, delete_contents: bool) -> Result<(), BQError> {
        let req_url = &format!(
            "{base_url}/projects/{project_id}/datasets/{dataset_id}",
            base_url = self.base_url,
            project_id = urlencode(project_id),
            dataset_id = urlencode(dataset_id)
        );

        let access_token = self.auth.access_token().await?;

        let request = self
            .client
            .delete(req_url)
            .bearer_auth(access_token)
            .query(&[("deleteContents", delete_contents.to_string())])
            .build()?;
        let response = self.client.execute(request).await?;

        if response.status().is_success() {
            Ok(())
        } else {
            Err(BQError::ResponseError {
                error: response.json().await?,
            })
        }
    }

    /// Deletes the dataset specified by the `dataset_id` value, returning `true` on success or `false` when the
    /// dataset doesn't exist. Before you can delete a dataset, you must delete all its tables, either manually or
    /// by setting `delete_contents`. Immediately after deletion, you can create another dataset with the same name.
    /// # Arguments
    /// * `project_id` - Project ID of the dataset being deleted
    /// * `dataset_id` - Dataset ID of the dataset being deleted
    /// * `delete_contents` - If true, delete all the tables in the dataset. If false and the dataset contains tables, the request will fail. Default is false
    ///
    /// # Example
    /// ```
    /// # use gcp_bigquery_client::{Client, env_vars};
    /// # use gcp_bigquery_client::error::BQError;
    ///
    /// # async fn run() -> Result<(), BQError> {
    /// let (ref project_id, ref dataset_id, ref _table_id, ref sa_key) = env_vars();
    /// let dataset_id = &format!("{}_dataset", dataset_id);
    ///
    /// let client = Client::from_service_account_key_file(sa_key).await?;
    ///
    /// client.dataset().delete_if_exists(project_id, dataset_id, true).await;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn delete_if_exists(&self, project_id: &str, dataset_id: &str, delete_contents: bool) -> bool {
        match self.delete(project_id, dataset_id, delete_contents).await {
            Err(BQError::ResponseError { error }) => {
                if error.error.code != 404 {
                    warn!("dataset.delete_if_exists: unexpected error: {:?}", error);
                }
                false
            }
            Err(err) => {
                warn!("dataset.delete_if_exists: unexpected error: {:?}", err);
                false
            }
            Ok(_) => true,
        }
    }

    /// Returns the dataset specified by `dataset_id`.
    /// # Arguments
    /// * `project_id` - Project ID of the requested dataset
    /// * `dataset_id` - Dataset ID of the requested dataset
    ///
    /// # Example
    /// ```
    /// # use gcp_bigquery_client::{Client, env_vars};
    /// # use gcp_bigquery_client::model::dataset::Dataset;
    /// # use gcp_bigquery_client::error::BQError;
    ///
    /// # async fn run() -> Result<(), BQError> {
    /// let (ref project_id, ref dataset_id, ref _table_id, ref sa_key) = env_vars();
    /// let dataset_id = &format!("{}_dataset", dataset_id);
    ///
    /// let client = Client::from_service_account_key_file(sa_key).await?;
    ///
    /// # client.dataset().delete_if_exists(project_id, dataset_id, true).await;
    /// client.dataset().create(Dataset::new(project_id, dataset_id)).await?;
    /// let dataset = client.dataset().get(project_id, dataset_id).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn get(&self, project_id: &str, dataset_id: &str) -> Result<Dataset, BQError> {
        let req_url = &format!(
            "{base_url}/projects/{project_id}/datasets/{dataset_id}",
            base_url = self.base_url,
            project_id = urlencode(project_id),
            dataset_id = urlencode(dataset_id)
        );

        let access_token = self.auth.access_token().await?;

        let request = self.client.get(req_url).bearer_auth(access_token).build()?;
        let response = self.client.execute(request).await?;

        process_response(response).await
    }

    /// Updates information in an existing dataset. The update method replaces the entire dataset resource, whereas
    /// the patch method only replaces fields that are provided in the submitted dataset resource. This method
    /// supports patch semantics.
    /// # Arguments
    /// * `project_id` - Project ID of the dataset being updated
    /// * `dataset_id` - Dataset ID of the dataset being updated
    /// * `dataset` - The request body contains an instance of Dataset
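    ///
    /// # Example
    /// A minimal sketch following the pattern of the other examples in this module
    /// (the friendly name value is illustrative):
    /// ```
    /// # use gcp_bigquery_client::{Client, env_vars};
    /// # use gcp_bigquery_client::model::dataset::Dataset;
    /// # use gcp_bigquery_client::error::BQError;
    ///
    /// # async fn run() -> Result<(), BQError> {
    /// let (ref project_id, ref dataset_id, ref _table_id, ref sa_key) = env_vars();
    /// let dataset_id = &format!("{}_dataset", dataset_id);
    ///
    /// let client = Client::from_service_account_key_file(sa_key).await?;
    ///
    /// # client.dataset().delete_if_exists(project_id, dataset_id, true).await;
    /// let dataset = client.dataset().create(Dataset::new(project_id, dataset_id)).await?;
    /// let patched = client.dataset().patch(project_id, dataset_id, dataset.friendly_name("Patched")).await?;
    /// # Ok(())
    /// # }
    /// ```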
    pub async fn patch(&self, project_id: &str, dataset_id: &str, dataset: Dataset) -> Result<Dataset, BQError> {
        let req_url = &format!(
            "{base_url}/projects/{project_id}/datasets/{dataset_id}",
            base_url = self.base_url,
            project_id = urlencode(project_id),
            dataset_id = urlencode(dataset_id)
        );

        let access_token = self.auth.access_token().await?;

        let request = self
            .client
            .patch(req_url)
            .bearer_auth(access_token)
            .json(&dataset)
            .build()?;
        let response = self.client.execute(request).await?;

        process_response(response).await
    }

    /// Updates information in an existing dataset. The update method replaces the entire dataset resource, whereas
    /// the patch method only replaces fields that are provided in the submitted dataset resource.
    /// # Arguments
    /// * `project_id` - Project ID of the dataset being updated
    /// * `dataset_id` - Dataset ID of the dataset being updated
    /// * `dataset` - The request body contains an instance of Dataset
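    ///
    /// # Example
    /// A minimal sketch following the pattern of the other examples in this module
    /// (the location value is illustrative; note that `update` replaces the whole resource):
    /// ```
    /// # use gcp_bigquery_client::{Client, env_vars};
    /// # use gcp_bigquery_client::model::dataset::Dataset;
    /// # use gcp_bigquery_client::error::BQError;
    ///
    /// # async fn run() -> Result<(), BQError> {
    /// let (ref project_id, ref dataset_id, ref _table_id, ref sa_key) = env_vars();
    /// let dataset_id = &format!("{}_dataset", dataset_id);
    ///
    /// let client = Client::from_service_account_key_file(sa_key).await?;
    ///
    /// # client.dataset().delete_if_exists(project_id, dataset_id, true).await;
    /// let dataset = client.dataset().create(Dataset::new(project_id, dataset_id)).await?;
    /// let updated = client.dataset().update(project_id, dataset_id, dataset.location("US")).await?;
    /// # Ok(())
    /// # }
    /// ```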
    pub async fn update(&self, project_id: &str, dataset_id: &str, dataset: Dataset) -> Result<Dataset, BQError> {
        let req_url = &format!(
            "{base_url}/projects/{project_id}/datasets/{dataset_id}",
            base_url = self.base_url,
            project_id = urlencode(project_id),
            dataset_id = urlencode(dataset_id)
        );

        let access_token = self.auth.access_token().await?;

        let request = self
            .client
            .put(req_url)
            .bearer_auth(access_token)
            .json(&dataset)
            .build()?;
        let response = self.client.execute(request).await?;

        process_response(response).await
    }

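    /// Lists the datasets of a region by querying the `INFORMATION_SCHEMA.SCHEMATA` view.
    /// # Arguments
    /// * `project_id` - Project ID to run the query in
    /// * `region` - Region qualifier of the view, e.g. `region-us`
    ///
    /// # Example
    /// A minimal sketch mirroring the unit test at the end of this module:
    /// ```
    /// # use gcp_bigquery_client::{Client, env_vars};
    /// # use gcp_bigquery_client::error::BQError;
    ///
    /// # async fn run() -> Result<(), BQError> {
    /// let (ref project_id, ref _dataset_id, ref _table_id, ref sa_key) = env_vars();
    /// let client = Client::from_service_account_key_file(sa_key).await?;
    ///
    /// let schemata = client.dataset().information_schema_schemata(project_id, "region-us").await?;
    /// for schema in schemata {
    ///     println!("{}.{}", schema.catalog_name, schema.schema_name);
    /// }
    /// # Ok(())
    /// # }
    /// ```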
    pub async fn information_schema_schemata(&self, project_id: &str, region: &str) -> Result<Vec<Schemata>, BQError> {
        let req_url = format!(
            "{base_url}/projects/{project_id}/queries",
            base_url = self.base_url,
            project_id = urlencode(project_id)
        );

        let access_token = self.auth.access_token().await?;
        // The region qualifier (e.g. "region-us") contains a dash, so it must be quoted with backticks.
        let query_request = QueryRequest::new(format!("SELECT * FROM `{region}`.INFORMATION_SCHEMA.SCHEMATA"));

        let request = self
            .client
            .post(req_url.as_str())
            .bearer_auth(access_token)
            .json(&query_request)
            .build()?;

        let resp = self.client.execute(request).await?;

        let query_response: QueryResponse = process_response(resp).await?;
        let mut rs = ResultSet::new_from_query_response(query_response);
        let mut result = vec![];
        let catalog_name_pos = *rs
            .column_index("catalog_name")
            .expect("The catalog_name column is expected");
        let schema_name_pos = *rs
            .column_index("schema_name")
            .expect("The schema_name column is expected");
        let schema_owner_pos = *rs
            .column_index("schema_owner")
            .expect("The schema_owner column is expected");
        let creation_time_pos = *rs
            .column_index("creation_time")
            .expect("The creation_time column is expected");
        let last_modified_time_pos = *rs
            .column_index("last_modified_time")
            .expect("The last_modified_time column is expected");
        let location_pos = *rs.column_index("location").expect("The location column is expected");

        while rs.next_row() {
            result.push(Schemata {
                catalog_name: rs.get_string(catalog_name_pos)?.expect("A catalog_name is expected"),
                schema_name: rs.get_string(schema_name_pos)?.expect("A schema_name is expected"),
                schema_owner: rs.get_string(schema_owner_pos)?,
                creation_time: rs.get_string(creation_time_pos)?.expect("A creation_time is expected"),
                last_modified_time: rs
                    .get_string(last_modified_time_pos)?
                    .expect("A last_modified_time is expected"),
                location: rs.get_string(location_pos)?.expect("A location is expected"),
            });
        }

        // TODO: handle the page token, max results and timestamp processing

        Ok(result)
    }
}

/// A set of options used when listing datasets.
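///
/// # Example
/// A minimal builder sketch (the label filter value is illustrative):
/// ```
/// # use gcp_bigquery_client::dataset::ListOptions;
/// let options = ListOptions::default()
///     .max_results(100)
///     .all(true)
///     .filter("labels.department:receiving".to_string());
/// ```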
#[derive(Default)]
pub struct ListOptions {
    max_results: Option<u64>,
    page_token: Option<String>,
    all: Option<bool>,
    filter: Option<String>,
}

impl ListOptions {
    /// The maximum number of results to return in a single response page. Leverage the page tokens to iterate
    /// through the entire collection.
    pub fn max_results(mut self, value: u64) -> Self {
        self.max_results = Some(value);
        self
    }

    /// Page token, returned by a previous call, to request the next page of results.
    pub fn page_token(mut self, value: String) -> Self {
        self.page_token = Some(value);
        self
    }

    /// Whether to list all datasets, including hidden ones.
    pub fn all(mut self, value: bool) -> Self {
        self.all = Some(value);
        self
    }

    /// An expression for filtering the results of the request by label. The syntax is `labels.<name>[:<value>]`.
    /// Multiple filters can be ANDed together by connecting with a space. Example: `labels.department:receiving
    /// labels.active`. See "Filtering datasets using labels" for details.
    pub fn filter(mut self, value: String) -> Self {
        self.filter = Some(value);
        self
    }
}

#[cfg(test)]
mod test {
    use crate::dataset::ListOptions;
    use crate::error::BQError;
    use crate::model::dataset::Dataset;
    use crate::{env_vars, Client};

    #[tokio::test]
    async fn test() -> Result<(), BQError> {
        let (ref project_id, ref dataset_id, ref _table_id, ref sa_key) = env_vars();
        let dataset_id = &format!("{dataset_id}_dataset");

        let client = Client::from_service_account_key_file(sa_key).await?;

        // Delete the dataset if needed
        let result = client.dataset().delete(project_id, dataset_id, true).await;
        if result.is_ok() {
            println!("Removed previous dataset '{dataset_id}'");
        }

        // Create dataset
        let created_dataset = client
            .dataset()
            .create(
                Dataset::new(project_id, dataset_id)
                    .friendly_name("A dataset used for unit tests")
                    .location("US")
                    .label("owner", "me")
                    .label("env", "prod"),
            )
            .await?;
        assert_eq!(created_dataset.id, Some(format!("{project_id}:{dataset_id}")));

        // Get dataset
        let dataset = client.dataset().get(project_id, dataset_id).await?;
        assert_eq!(dataset.id, Some(format!("{project_id}:{dataset_id}")));

        // Patch dataset
        let dataset = client.dataset().patch(project_id, dataset_id, dataset).await?;
        assert_eq!(dataset.id, Some(format!("{project_id}:{dataset_id}")));

        // Update dataset
        let dataset = client.dataset().update(project_id, dataset_id, dataset).await?;
        assert_eq!(dataset.id, Some(format!("{project_id}:{dataset_id}")));

        // List datasets
        let datasets = client
            .dataset()
            .list(project_id, ListOptions::default().all(true))
            .await?;
        let mut created_dataset_found = false;
        for dataset in datasets.datasets.iter() {
            if dataset.dataset_reference.dataset_id == *dataset_id {
                created_dataset_found = true;
            }
        }
        assert!(created_dataset_found);

        // Delete dataset
        client.dataset().delete(project_id, dataset_id, true).await?;

        Ok(())
    }

    #[tokio::test]
    async fn test_information_schema() -> Result<(), BQError> {
        let (ref project_id, ref _dataset_id, ref _table_id, ref sa_key) = env_vars();

        let client = Client::from_service_account_key_file(sa_key).await?;

        let result = client
            .dataset()
            .information_schema_schemata(project_id, "region-us")
            .await?;
        dbg!(result);
        Ok(())
    }
}