gcp-bigquery-client 0.10.0

An ergonomic async client library for GCP BigQuery.
Documentation

GCP BigQuery Client

An ergonomic Rust async client library for GCP BigQuery.

  • Support all BigQuery API endpoints (not all covered by unit tests yet)
  • Support Service Account Key authentication, workload identity and other yup-oauth2 mechanisms
  • Create tables and rows via builder patterns
  • Persist complex Rust structs in structured BigQuery tables
  • Async API

Features:

  • rust-tls (default): RUSTLS-based
  • native-tls: OpenSSL-based

Example

This example performs the following operations:

  • Load a set of environment variables to set $PROJECT_ID, $DATASET_ID, $TABLE_ID and $GOOGLE_APPLICATION_CREDENTIALS
  • Init the BigQuery client
  • Create a dataset in the GCP project $PROJECT_ID
  • Create a table in the previously created dataset (table schema)
  • Insert a set of rows into the previously created table via the BigQuery Streaming API. Each inserted row is a regular Rust struct that implements the Serialize trait.
  • Perform a select query on the previously created table
  • Drop the table previously created
  • Drop the dataset previously created
    // Init the BigQuery client from a service account key file.
    // NOTE(review): `gcp_sa_key` is defined outside this excerpt — presumably the key file path
    // taken from $GOOGLE_APPLICATION_CREDENTIALS; confirm.
    let client = gcp_bigquery_client::Client::from_service_account_key_file(gcp_sa_key).await;

    // Create a new dataset identified by (`project_id`, `dataset_id`).
    // The builder sets its location to "US", gives it a friendly name and attaches
    // two labels ("owner" and "env"). Any error is propagated to the caller via `?`.
    let dataset = client
        .dataset()
        .create(
            Dataset::new(project_id, dataset_id)
                .location("US")
                .friendly_name("Just a demo dataset")
                .label("owner", "me")
                .label("env", "prod"),
        )
        .await?;

    // Create a new table named `table_id` in the dataset created above.
    // The schema has five scalar columns (timestamp, integer, float, bool, string) plus a
    // "record_value" column that nests a second record, giving two levels of nested records.
    let table = dataset
        .create_table(
            &client,
            Table::from_dataset(
                &dataset,
                table_id,
                TableSchema::new(vec![
                    TableFieldSchema::timestamp("ts"),
                    TableFieldSchema::integer("int_value"),
                    TableFieldSchema::float("float_value"),
                    TableFieldSchema::bool("bool_value"),
                    TableFieldSchema::string("string_value"),
                    // First nesting level
                    TableFieldSchema::record(
                        "record_value",
                        vec![
                            TableFieldSchema::integer("int_value"),
                            TableFieldSchema::string("string_value"),
                            // Second nesting level
                            TableFieldSchema::record(
                                "record_value",
                                vec![
                                    TableFieldSchema::integer("int_value"),
                                    TableFieldSchema::string("string_value"),
                                ],
                            ),
                        ],
                    ),
                ]),
            )
            .friendly_name("Demo table")
            .description("A nice description for this table")
            .label("owner", "me")
            .label("env", "prod")
            // Expiration time is set to one hour (3600 s) from now
            .expiration_time(SystemTime::now() + Duration::from_secs(3600))
            // Daily time partitioning on the "ts" column, with an expiration of 7 days
            .time_partitioning(
                TimePartitioning::per_day()
                    .expiration_ms(Duration::from_secs(3600 * 24 * 7))
                    .field("ts"),
            ),
        )
        .await?;
    println!("Table created -> {:?}", table);

    // Insert data via BigQuery Streaming API.
    // Four `MyRow` values are added to a single insert request; each `add_row` call is
    // fallible and its error is propagated with `?`. The row fields mirror the table
    // schema declared above, including the two nested record levels.
    // NOTE(review): the first argument of `add_row` is `None` for every row — presumably an
    // optional per-row insert id; confirm against the crate documentation.
    let mut insert_request = TableDataInsertAllRequest::new();
    insert_request.add_row(
        None,
        MyRow {
            ts: Utc::now(),
            int_value: 1,
            float_value: 1.0,
            bool_value: false,
            string_value: "first".into(),
            record_value: FirstRecordLevel {
                int_value: 10,
                string_value: "sub_level_1.1".into(),
                record_value: SecondRecordLevel {
                    int_value: 20,
                    string_value: "leaf".to_string(),
                },
            },
        },
    )?;
    insert_request.add_row(
        None,
        MyRow {
            ts: Utc::now(),
            int_value: 2,
            float_value: 2.0,
            bool_value: true,
            string_value: "second".into(),
            record_value: FirstRecordLevel {
                int_value: 11,
                string_value: "sub_level_1.2".into(),
                record_value: SecondRecordLevel {
                    int_value: 21,
                    string_value: "leaf".to_string(),
                },
            },
        },
    )?;
    insert_request.add_row(
        None,
        MyRow {
            ts: Utc::now(),
            int_value: 3,
            float_value: 3.0,
            bool_value: false,
            string_value: "third".into(),
            record_value: FirstRecordLevel {
                int_value: 12,
                string_value: "sub_level_1.3".into(),
                record_value: SecondRecordLevel {
                    int_value: 22,
                    string_value: "leaf".to_string(),
                },
            },
        },
    )?;
    insert_request.add_row(
        None,
        MyRow {
            ts: Utc::now(),
            int_value: 4,
            float_value: 4.0,
            bool_value: true,
            string_value: "fourth".into(),
            record_value: FirstRecordLevel {
                int_value: 13,
                string_value: "sub_level_1.4".into(),
                record_value: SecondRecordLevel {
                    int_value: 23,
                    string_value: "leaf".to_string(),
                },
            },
        },
    )?;

    // Send all accumulated rows to the target table in one request
    client
        .tabledata()
        .insert_all(project_id, dataset_id, table_id, insert_request)
        .await?;

    // Query: count the rows of the table, addressed as `project.dataset.table`,
    // and expose the count under the column alias `c`.
    let mut rs = client
        .job()
        .query(
            project_id,
            QueryRequest::new(format!(
                "SELECT COUNT(*) AS c FROM `{}.{}.{}`",
                project_id, dataset_id, table_id
            )),
        )
        .await?;
    // Walk the result set row by row. `get_i64_by_name` is fallible (`?`) and yields an
    // optional value; the `unwrap()` panics if column `c` holds no value.
    while rs.next_row() {
        println!("Number of rows inserted: {}", rs.get_i64_by_name("c")?.unwrap());
    }

    // Delete the table previously created
    client.table().delete(project_id, dataset_id, table_id).await?;

    // Delete the dataset previously created
    // NOTE(review): the trailing `true` presumably asks for the dataset contents to be
    // deleted as well; confirm against the crate documentation.
    client.dataset().delete(project_id, dataset_id, true).await?;

Status

The API of this crate may still change until version 1.0 is released.

List of endpoints implemented:

  • Dataset - All methods
  • Table - All methods
  • Tabledata - All methods
  • Job - All methods
  • Model - All methods (not tested)
  • Project (not tested)
  • Routine - All methods (not tested)

License