gcp-bigquery-client 0.26.0

An ergonomic async client library for GCP BigQuery.
Documentation
use gcp_bigquery_client::{
    env_vars,
    storage::{ColumnMode, ColumnType, FieldDescriptor, StorageApi, StreamName, TableDescriptor},
};
use prost::Message;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (ref project_id, ref dataset_id, ref table_id, ref gcp_sa_key) = env_vars();

    let mut client = gcp_bigquery_client::Client::from_service_account_key_file(gcp_sa_key).await?;

    let field_descriptors = vec![
        FieldDescriptor {
            name: "actor_id".to_string(),
            number: 1,
            typ: ColumnType::Int64,
            mode: ColumnMode::Required,
        },
        FieldDescriptor {
            name: "first_name".to_string(),
            number: 2,
            typ: ColumnType::String,
            mode: ColumnMode::Required,
        },
        FieldDescriptor {
            name: "last_name".to_string(),
            number: 3,
            typ: ColumnType::String,
            mode: ColumnMode::Required,
        },
        FieldDescriptor {
            name: "last_update".to_string(),
            number: 4,
            typ: ColumnType::String,
            mode: ColumnMode::Required,
        },
    ];
    let table_descriptor = TableDescriptor { field_descriptors };

    #[derive(Clone, PartialEq, Message)]
    struct Actor {
        #[prost(int32, tag = "1")]
        actor_id: i32,

        #[prost(string, tag = "2")]
        first_name: String,

        #[prost(string, tag = "3")]
        last_name: String,

        #[prost(string, tag = "4")]
        last_update: String,
    }

    let actor1 = Actor {
        actor_id: 1,
        first_name: "John".to_string(),
        last_name: "Doe".to_string(),
        last_update: "2007-02-15 09:34:33 UTC".to_string(),
    };

    let actor2 = Actor {
        actor_id: 2,
        first_name: "Jane".to_string(),
        last_name: "Doe".to_string(),
        last_update: "2008-02-15 09:34:33 UTC".to_string(),
    };

    let stream_name = StreamName::new_default(project_id.clone(), dataset_id.clone(), table_id.clone());
    let trace_id = "test_client".to_string();

    const MAX_SIZE: usize = 9 * 1024 * 1024; // 9 MB
    let (rows, _) = StorageApi::create_rows(&table_descriptor, &[actor1, actor2], MAX_SIZE);
    let mut streaming = client.storage_mut().append_rows(&stream_name, rows, trace_id).await?;

    while let Some(resp) = streaming.next().await {
        let resp = resp?;
        println!("response: {resp:#?}");
    }

    Ok(())
}