//! Manage BigQuery datasets.
use std::sync::Arc;

use log::warn;
use reqwest::Client;

use crate::auth::Authenticator;
use crate::error::BQError;
use crate::model::dataset::Dataset;
use crate::model::datasets::Datasets;
use crate::model::information_schema::schemata::Schemata;
use crate::model::query_request::QueryRequest;
use crate::model::query_response::{QueryResponse, ResultSet};
use crate::{process_response, urlencode, BIG_QUERY_V2_URL};

/// A dataset API handler.
#[derive(Clone)]
pub struct DatasetApi {
    client: Client,
    auth: Arc<dyn Authenticator>,
    base_url: String,
}

impl DatasetApi {
    pub(crate) fn new(client: Client, auth: Arc<dyn Authenticator>) -> Self {
        Self {
            client,
            auth,
            base_url: BIG_QUERY_V2_URL.to_string(),
        }
    }

    /// Overrides the default BigQuery v2 base URL (useful, for example, to point the handler at a test endpoint).
    pub(crate) fn with_base_url(&mut self, base_url: String) -> &mut Self {
        self.base_url = base_url;
        self
    }

    /// Creates a new empty dataset.
    /// # Arguments
    /// * `dataset` - The dataset to create
    ///
    /// # Example
    /// ```
    /// # use gcp_bigquery_client::{Client, env_vars};
    /// # use gcp_bigquery_client::model::dataset::Dataset;
    /// # use gcp_bigquery_client::error::BQError;
    ///
    /// # async fn run() -> Result<(), BQError> {
    /// let (ref project_id, ref dataset_id, ref _table_id, ref sa_key) = env_vars();
    /// let dataset_id = &format!("{}_dataset", dataset_id);
    ///
    /// let client = Client::from_service_account_key_file(sa_key).await?;
    ///
    /// # client.dataset().delete_if_exists(project_id, dataset_id, true).await;
    /// client.dataset().create(Dataset::new(project_id, dataset_id)).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn create(&self, dataset: Dataset) -> Result<Dataset, BQError> {
        let req_url = &format!(
            "{base_url}/projects/{project_id}/datasets",
            base_url = self.base_url,
            project_id = urlencode(&dataset.dataset_reference.project_id)
        );

        let access_token = self.auth.access_token().await?;

        let request = self
            .client
            .post(req_url.as_str())
            .bearer_auth(access_token)
            .json(&dataset)
            .build()?;

        let response = self.client.execute(request).await?;

        process_response(response).await
    }

    /// Lists all datasets in the specified project to which the user has been granted the READER dataset role.
    /// # Arguments
    /// * `project_id` - Project ID of the datasets to be listed
    /// * `options` - Options
    ///
    /// # Example
    /// ```
    /// # use gcp_bigquery_client::{Client, env_vars};
    /// # use gcp_bigquery_client::model::dataset::Dataset;
    /// # use gcp_bigquery_client::error::BQError;
    /// # use gcp_bigquery_client::dataset::ListOptions;
    ///
    /// # async fn run() -> Result<(), BQError> {
    /// let (ref project_id, ref dataset_id, ref _table_id, ref sa_key) = env_vars();
    /// let dataset_id = &format!("{}_dataset", dataset_id);
    ///
    /// let client = Client::from_service_account_key_file(sa_key).await?;
    ///
    /// let datasets = client.dataset().list(project_id, ListOptions::default().all(true)).await?;
    /// for dataset in datasets.datasets.iter() {
    ///     // Do some stuff
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn list(&self, project_id: &str, options: ListOptions) -> Result<Datasets, BQError> {
        let req_url = &format!(
            "{base_url}/projects/{project_id}/datasets",
            base_url = self.base_url,
            project_id = urlencode(project_id)
        );

        let access_token = self.auth.access_token().await?;

        let mut request = self.client.get(req_url).bearer_auth(access_token);

        // process options
        if let Some(max_results) = options.max_results {
            request = request.query(&[("maxResults", max_results.to_string())]);
        }
        if let Some(page_token) = options.page_token {
            request = request.query(&[("pageToken", page_token)]);
        }
        if let Some(all) = options.all {
            request = request.query(&[("all", all.to_string())]);
        }
        if let Some(filter) = options.filter {
            request = request.query(&[("filter", filter)]);
        }

        let request = request.build()?;
        let response = self.client.execute(request).await?;

        process_response(response).await
    }

    /// Deletes the dataset specified by the datasetId value. Before you can delete a dataset, you must delete all its
    /// tables, either manually or by specifying deleteContents. Immediately after deletion, you can create another
    /// dataset with the same name.
    /// # Arguments
    /// * `project_id` - Project ID of the dataset being deleted
    /// * `dataset_id` - Dataset ID of dataset being deleted
    /// * `delete_contents` - If `true`, delete all the tables in the dataset; if `false` and the dataset contains tables, the request fails
    ///
    /// # Example
    /// ```
    /// # use gcp_bigquery_client::{Client, env_vars};
    /// # use gcp_bigquery_client::model::dataset::Dataset;
    /// # use gcp_bigquery_client::error::BQError;
    /// # use gcp_bigquery_client::dataset::ListOptions;
    ///
    /// # async fn run() -> Result<(), BQError> {
    /// let (ref project_id, ref dataset_id, ref _table_id, ref sa_key) = env_vars();
    /// let dataset_id = &format!("{}_dataset", dataset_id);
    ///
    /// let client = Client::from_service_account_key_file(sa_key).await?;
    ///
    /// # client.dataset().delete_if_exists(project_id, dataset_id, true).await;
    /// client.dataset().create(Dataset::new(project_id, dataset_id)).await?;
    /// client.dataset().delete(project_id, dataset_id, true).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn delete(&self, project_id: &str, dataset_id: &str, delete_contents: bool) -> Result<(), BQError> {
        let req_url = &format!(
            "{base_url}/projects/{project_id}/datasets/{dataset_id}",
            base_url = self.base_url,
            project_id = urlencode(project_id),
            dataset_id = urlencode(dataset_id)
        );

        let access_token = self.auth.access_token().await?;

        let request = self
            .client
            .delete(req_url)
            .bearer_auth(access_token)
            .query(&[("deleteContents", delete_contents.to_string())])
            .build()?;
        let response = self.client.execute(request).await?;

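        // A successful delete returns an empty body, so the status code is checked
        // directly here instead of deserializing a response payload via `process_response`.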
        if response.status().is_success() {
            Ok(())
        } else {
            Err(BQError::ResponseError {
                error: response.json().await?,
            })
        }
    }

    /// Deletes the dataset specified by the datasetId value, returning `true` on success or `false` when
    /// the dataset doesn't exist. Before you can delete a dataset, you must delete all its
    /// tables, either manually or by specifying deleteContents. Immediately after deletion, you can create another
    /// dataset with the same name.
    /// # Arguments
    /// * `project_id` - Project ID of the dataset being deleted
    /// * `dataset_id` - Dataset ID of dataset being deleted
    /// * `delete_contents` - If `true`, delete all the tables in the dataset; if `false` and the dataset contains tables, the request fails
    ///
    /// # Example
    /// ```
    /// # use gcp_bigquery_client::{Client, env_vars};
    /// # use gcp_bigquery_client::model::dataset::Dataset;
    /// # use gcp_bigquery_client::error::BQError;
    /// # use gcp_bigquery_client::dataset::ListOptions;
    ///
    /// # async fn run() -> Result<(), BQError> {
    /// let (ref project_id, ref dataset_id, ref _table_id, ref sa_key) = env_vars();
    /// let dataset_id = &format!("{}_dataset", dataset_id);
    ///
    /// let client = Client::from_service_account_key_file(sa_key).await?;
    ///
    /// client.dataset().delete_if_exists(project_id, dataset_id, true).await;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn delete_if_exists(&self, project_id: &str, dataset_id: &str, delete_contents: bool) -> bool {
        match self.delete(project_id, dataset_id, delete_contents).await {
            Err(BQError::ResponseError { error }) => {
                if error.error.code != 404 {
                    warn!("dataset.delete_if_exists: unexpected error: {:?}", error);
                }
                false
            }
            Err(err) => {
                warn!("dataset.delete_if_exists: unexpected error: {:?}", err);
                false
            }
            Ok(_) => true,
        }
    }

    /// Returns the dataset specified by the datasetId value.
    /// # Arguments
    /// * `project_id` - Project ID of the requested dataset
    /// * `dataset_id` - Dataset ID of the requested dataset
    ///
    /// # Example
    /// ```
    /// # use gcp_bigquery_client::{Client, env_vars};
    /// # use gcp_bigquery_client::model::dataset::Dataset;
    /// # use gcp_bigquery_client::error::BQError;
    /// # use gcp_bigquery_client::dataset::ListOptions;
    ///
    /// # async fn run() -> Result<(), BQError> {
    /// let (ref project_id, ref dataset_id, ref _table_id, ref sa_key) = env_vars();
    /// let dataset_id = &format!("{}_dataset", dataset_id);
    ///
    /// let client = Client::from_service_account_key_file(sa_key).await?;
    ///
    /// # client.dataset().delete_if_exists(project_id, dataset_id, true).await;
    /// client.dataset().create(Dataset::new(project_id, dataset_id)).await?;
    /// let dataset = client.dataset().get(project_id, dataset_id).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn get(&self, project_id: &str, dataset_id: &str) -> Result<Dataset, BQError> {
        let req_url = &format!(
            "{base_url}/projects/{project_id}/datasets/{dataset_id}",
            base_url = self.base_url,
            project_id = urlencode(project_id),
            dataset_id = urlencode(dataset_id)
        );

        let access_token = self.auth.access_token().await?;

        let request = self.client.get(req_url).bearer_auth(access_token).build()?;
        let response = self.client.execute(request).await?;

        process_response(response).await
    }

    /// Updates information in an existing dataset. The update method replaces the entire dataset resource, whereas the
    /// patch method only replaces fields that are provided in the submitted dataset resource. This method supports
    /// patch semantics.
    /// # Arguments
    /// * `project_id` - Project ID of the dataset being updated
    /// * `dataset_id` - Dataset ID of the dataset being updated
    /// * `dataset` - A `Dataset` instance; only the fields provided in it are replaced
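    ///
    /// # Example
    /// A minimal sketch, following the same pattern as the other examples in this module:
    /// ```
    /// # use gcp_bigquery_client::{Client, env_vars};
    /// # use gcp_bigquery_client::model::dataset::Dataset;
    /// # use gcp_bigquery_client::error::BQError;
    ///
    /// # async fn run() -> Result<(), BQError> {
    /// let (ref project_id, ref dataset_id, ref _table_id, ref sa_key) = env_vars();
    /// let dataset_id = &format!("{}_dataset", dataset_id);
    ///
    /// let client = Client::from_service_account_key_file(sa_key).await?;
    ///
    /// # client.dataset().delete_if_exists(project_id, dataset_id, true).await;
    /// let dataset = client.dataset().create(Dataset::new(project_id, dataset_id)).await?;
    /// let dataset = client.dataset().patch(project_id, dataset_id, dataset.friendly_name("Patched")).await?;
    /// # Ok(())
    /// # }
    /// ```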
    pub async fn patch(&self, project_id: &str, dataset_id: &str, dataset: Dataset) -> Result<Dataset, BQError> {
        let req_url = &format!(
            "{base_url}/projects/{project_id}/datasets/{dataset_id}",
            base_url = self.base_url,
            project_id = urlencode(project_id),
            dataset_id = urlencode(dataset_id)
        );

        let access_token = self.auth.access_token().await?;

        let request = self
            .client
            .patch(req_url)
            .bearer_auth(access_token)
            .json(&dataset)
            .build()?;
        let response = self.client.execute(request).await?;

        process_response(response).await
    }

    /// Updates information in an existing dataset. The update method replaces the entire dataset resource, whereas the
    /// patch method only replaces fields that are provided in the submitted dataset resource.
    /// # Arguments
    /// * `project_id` - Project ID of the dataset being updated
    /// * `dataset_id` - Dataset ID of the dataset being updated
    /// * `dataset` - A `Dataset` instance; the entire dataset resource is replaced
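    ///
    /// # Example
    /// A minimal sketch, following the same pattern as the other examples in this module:
    /// ```
    /// # use gcp_bigquery_client::{Client, env_vars};
    /// # use gcp_bigquery_client::model::dataset::Dataset;
    /// # use gcp_bigquery_client::error::BQError;
    ///
    /// # async fn run() -> Result<(), BQError> {
    /// let (ref project_id, ref dataset_id, ref _table_id, ref sa_key) = env_vars();
    /// let dataset_id = &format!("{}_dataset", dataset_id);
    ///
    /// let client = Client::from_service_account_key_file(sa_key).await?;
    ///
    /// # client.dataset().delete_if_exists(project_id, dataset_id, true).await;
    /// # client.dataset().create(Dataset::new(project_id, dataset_id)).await?;
    /// let dataset = client.dataset().get(project_id, dataset_id).await?;
    /// let dataset = client.dataset().update(project_id, dataset_id, dataset.friendly_name("Updated")).await?;
    /// # Ok(())
    /// # }
    /// ```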
    pub async fn update(&self, project_id: &str, dataset_id: &str, dataset: Dataset) -> Result<Dataset, BQError> {
        let req_url = &format!(
            "{base_url}/projects/{project_id}/datasets/{dataset_id}",
            base_url = self.base_url,
            project_id = urlencode(project_id),
            dataset_id = urlencode(dataset_id)
        );

        let access_token = self.auth.access_token().await?;

        let request = self
            .client
            .put(req_url)
            .bearer_auth(access_token)
            .json(&dataset)
            .build()?;
        let response = self.client.execute(request).await?;

        process_response(response).await
    }

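    /// Lists the datasets of a project in a given region by querying `INFORMATION_SCHEMA.SCHEMATA`.
    /// # Arguments
    /// * `project_id` - Project ID to inspect
    /// * `region` - Region qualifier, e.g. `region-us`
    ///
    /// # Example
    /// A minimal sketch, following the same pattern as the other examples in this module:
    /// ```
    /// # use gcp_bigquery_client::{Client, env_vars};
    /// # use gcp_bigquery_client::error::BQError;
    ///
    /// # async fn run() -> Result<(), BQError> {
    /// let (ref project_id, ref _dataset_id, ref _table_id, ref sa_key) = env_vars();
    /// let client = Client::from_service_account_key_file(sa_key).await?;
    ///
    /// let schemata = client.dataset().information_schema_schemata(project_id, "region-us").await?;
    /// for schema in schemata.iter() {
    ///     println!("{}.{} ({})", schema.catalog_name, schema.schema_name, schema.location);
    /// }
    /// # Ok(())
    /// # }
    /// ```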
    pub async fn information_schema_schemata(&self, project_id: &str, region: &str) -> Result<Vec<Schemata>, BQError> {
        let req_url = format!(
            "{base_url}/projects/{project_id}/queries",
            base_url = self.base_url,
            project_id = urlencode(project_id)
        );

        let access_token = self.auth.access_token().await?;
        // Region qualifiers such as `region-us` contain a dash and must be backtick-quoted in the query.
        let query_request = QueryRequest::new(format!("SELECT * FROM `{region}`.INFORMATION_SCHEMA.SCHEMATA"));

        let request = self
            .client
            .post(req_url.as_str())
            .bearer_auth(access_token)
            .json(&query_request)
            .build()?;

        let resp = self.client.execute(request).await?;

        let query_response: QueryResponse = process_response(resp).await?;
        let mut rs = ResultSet::new(query_response);
        let mut result = vec![];
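        // Resolve each column's position once up front; the positions are then reused for every row.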
        let catalog_name_pos = *rs
            .column_index("catalog_name")
            .expect("The catalog_name column is expected");
        let schema_name_pos = *rs
            .column_index("schema_name")
            .expect("The schema_name column is expected");
        let schema_owner_pos = *rs
            .column_index("schema_owner")
            .expect("The schema_owner column is expected");
        let creation_time_pos = *rs
            .column_index("creation_time")
            .expect("The creation_time column is expected");
        let last_modified_time_pos = *rs
            .column_index("last_modified_time")
            .expect("The last_modified_time column is expected");
        let location_pos = *rs.column_index("location").expect("The location column is expected");

        while rs.next_row() {
            result.push(Schemata {
                catalog_name: rs.get_string(catalog_name_pos)?.expect("A catalog name is expected"),
                schema_name: rs.get_string(schema_name_pos)?.expect("A schema_name is expected"),
                schema_owner: rs.get_string(schema_owner_pos)?,
                creation_time: rs.get_string(creation_time_pos)?.expect("A creation_time is expected"),
                last_modified_time: rs
                    .get_string(last_modified_time_pos)?
                    .expect("A last_modified_time is expected"),
                location: rs.get_string(location_pos)?.expect("A location is expected"),
            });
        }

        // ToDo page token, max result, process timestamp

        Ok(result)
    }
}

/// Options used when listing the datasets of a project.
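///
/// # Example
/// Options combine in a builder style (the label filter below is illustrative):
/// ```
/// # use gcp_bigquery_client::dataset::ListOptions;
/// let options = ListOptions::default()
///     .max_results(10)
///     .all(true)
///     .filter("labels.env:prod".to_string());
/// ```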
#[derive(Default)]
pub struct ListOptions {
    max_results: Option<u64>,
    page_token: Option<String>,
    all: Option<bool>,
    filter: Option<String>,
}

impl ListOptions {
    /// The maximum number of results to return in a single response page. Leverage the page tokens to iterate through
    /// the entire collection.
    pub fn max_results(mut self, value: u64) -> Self {
        self.max_results = Some(value);
        self
    }

    /// Page token, returned by a previous call, to request the next page of results
    pub fn page_token(mut self, value: String) -> Self {
        self.page_token = Some(value);
        self
    }

    /// Whether to list all datasets, including hidden ones
    pub fn all(mut self, value: bool) -> Self {
        self.all = Some(value);
        self
    }

    /// An expression for filtering the results of the request by label. The syntax is `labels.<name>[:<value>]`.
    /// Multiple filters can be ANDed together by connecting them with a space. Example: `labels.department:receiving
    /// labels.active`. See "Filtering datasets using labels" in the BigQuery documentation for details.
    pub fn filter(mut self, value: String) -> Self {
        self.filter = Some(value);
        self
    }
}

#[cfg(test)]
mod test {
    use crate::dataset::ListOptions;
    use crate::error::BQError;
    use crate::model::dataset::Dataset;
    use crate::{env_vars, Client};

    #[tokio::test]
    async fn test() -> Result<(), BQError> {
        let (ref project_id, ref dataset_id, ref _table_id, ref sa_key) = env_vars();
        let dataset_id = &format!("{dataset_id}_dataset");

        let client = Client::from_service_account_key_file(sa_key).await?;

        // Delete the dataset if needed
        let result = client.dataset().delete(project_id, dataset_id, true).await;
        if result.is_ok() {
            println!("Removed previous dataset '{dataset_id}'");
        }

        // Create dataset
        let created_dataset = client
            .dataset()
            .create(
                Dataset::new(project_id, dataset_id)
                    .friendly_name("A dataset used for unit tests")
                    .location("US")
                    .label("owner", "me")
                    .label("env", "prod"),
            )
            .await?;
        assert_eq!(created_dataset.id, Some(format!("{project_id}:{dataset_id}")));

        // Get dataset
        let dataset = client.dataset().get(project_id, dataset_id).await?;
        assert_eq!(dataset.id, Some(format!("{project_id}:{dataset_id}")));

        // Patch dataset
        let dataset = client.dataset().patch(project_id, dataset_id, dataset).await?;
        assert_eq!(dataset.id, Some(format!("{project_id}:{dataset_id}")));

        // Update dataset
        let dataset = client.dataset().update(project_id, dataset_id, dataset).await?;
        assert_eq!(dataset.id, Some(format!("{project_id}:{dataset_id}")));

        // List datasets
        let datasets = client
            .dataset()
            .list(project_id, ListOptions::default().all(true))
            .await?;
        let mut created_dataset_found = false;
        for dataset in datasets.datasets.iter() {
            if dataset.dataset_reference.dataset_id == *dataset_id {
                created_dataset_found = true;
            }
        }
        assert!(created_dataset_found);

        // Delete dataset
        client.dataset().delete(project_id, dataset_id, true).await?;

        Ok(())
    }

    #[tokio::test]
    async fn test_information_schema() -> Result<(), BQError> {
        let (ref project_id, ref _dataset_id, ref _table_id, ref sa_key) = env_vars();

        let client = Client::from_service_account_key_file(sa_key).await?;

        let result = client
            .dataset()
            .information_schema_schemata(project_id, "region-us")
            .await?;
        dbg!(result);
        Ok(())
    }
}