//! Run BigQuery locally by using [bigquery-emulator](https://github.com/goccy/bigquery-emulator).
//!
//! With Docker:
//! ```bash
//! docker run -p 9050:9050 ghcr.io/goccy/bigquery-emulator:latest --project=my_project
//! ```
use auth_mock::GoogleAuthMock;
use bq::BQ;
mod auth_mock {
    use serde::Serialize;
    use std::ops::Deref;
    use wiremock::{
        matchers::{method, path},
        Mock, MockServer, ResponseTemplate, Times,
    };

    /// Path, relative to the mock server root, on which the fake token is served.
    pub const AUTH_TOKEN_ENDPOINT: &str = "/:o/oauth2/token";

    /// Thin wrapper around a wiremock [`MockServer`] standing in for the
    /// Google OAuth2 token endpoint.
    ///
    /// It dereferences to the inner [`MockServer`], so server methods such as
    /// `uri()` can be called directly on the wrapper.
    pub struct GoogleAuthMock {
        server: MockServer,
    }

    impl GoogleAuthMock {
        /// Starts a fresh mock server and wraps it.
        pub async fn start() -> Self {
            let server = MockServer::start().await;
            Self { server }
        }

        /// Mock token, given how many times the endpoint will be called.
        ///
        /// Registers a `POST` handler on [`AUTH_TOKEN_ENDPOINT`] that answers
        /// with HTTP 200 and a fixed fake token serialized as JSON.
        pub async fn mock_token<T: Into<Times>>(&self, n_times: T) {
            let token_mock = Mock::given(method("POST"))
                .and(path(AUTH_TOKEN_ENDPOINT))
                .respond_with(ResponseTemplate::new(200).set_body_json(Token::fake()))
                .named("mock token")
                .expect(n_times);
            token_mock.mount(&self.server).await;
        }
    }

    impl Deref for GoogleAuthMock {
        type Target = MockServer;

        fn deref(&self) -> &Self::Target {
            &self.server
        }
    }

    /// JSON body returned by the mocked token endpoint.
    #[derive(Eq, PartialEq, Serialize, Debug, Clone)]
    pub struct Token {
        access_token: String,
        token_type: String,
        expires_in: u32,
    }

    impl Token {
        /// Builds the hard-coded token handed out by the mock.
        fn fake() -> Self {
            Self {
                access_token: String::from("aaaa"),
                token_type: String::from("bearer"),
                expires_in: 9_999_999,
            }
        }
    }
}
pub fn dummy_configuration(oauth_server: &str) -> serde_json::Value {
let oauth_endpoint = format!("{oauth_server}/:o/oauth2");
serde_json::json!({
"type": "service_account",
"project_id": "dummy",
"private_key_id": "dummy",
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDNk6cKkWP/4NMu\nWb3s24YHfM639IXzPtTev06PUVVQnyHmT1bZgQ/XB6BvIRaReqAqnQd61PAGtX3e\n8XocTw+u/ZfiPJOf+jrXMkRBpiBh9mbyEIqBy8BC20OmsUc+O/YYh/qRccvRfPI7\n3XMabQ8eFWhI6z/t35oRpvEVFJnSIgyV4JR/L/cjtoKnxaFwjBzEnxPiwtdy4olU\nKO/1maklXexvlO7onC7CNmPAjuEZKzdMLzFszikCDnoKJC8k6+2GZh0/JDMAcAF4\nwxlKNQ89MpHVRXZ566uKZg0MqZqkq5RXPn6u7yvNHwZ0oahHT+8ixPPrAEjuPEKM\nUPzVRz71AgMBAAECggEAfdbVWLW5Befkvam3hea2+5xdmeN3n3elrJhkiXxbAhf3\nE1kbq9bCEHmdrokNnI34vz0SWBFCwIiWfUNJ4UxQKGkZcSZto270V8hwWdNMXUsM\npz6S2nMTxJkdp0s7dhAUS93o9uE2x4x5Z0XecJ2ztFGcXY6Lupu2XvnW93V9109h\nkY3uICLdbovJq7wS/fO/AL97QStfEVRWW2agIXGvoQG5jOwfPh86GZZRYP9b8VNw\ntkAUJe4qpzNbWs9AItXOzL+50/wsFkD/iWMGWFuU8DY5ZwsL434N+uzFlaD13wtZ\n63D+tNAxCSRBfZGQbd7WxJVFfZe/2vgjykKWsdyNAQKBgQDnEBgSI836HGSRk0Ub\nDwiEtdfh2TosV+z6xtyU7j/NwjugTOJEGj1VO/TMlZCEfpkYPLZt3ek2LdNL66n8\nDyxwzTT5Q3D/D0n5yE3mmxy13Qyya6qBYvqqyeWNwyotGM7hNNOix1v9lEMtH5Rd\nUT0gkThvJhtrV663bcAWCALmtQKBgQDjw2rYlMUp2TUIa2/E7904WOnSEG85d+nc\norhzthX8EWmPgw1Bbfo6NzH4HhebTw03j3NjZdW2a8TG/uEmZFWhK4eDvkx+rxAa\n6EwamS6cmQ4+vdep2Ac4QCSaTZj02YjHb06Be3gptvpFaFrotH2jnpXxggdiv8ul\n6x+ooCffQQKBgQCR3ykzGoOI6K/c75prELyR+7MEk/0TzZaAY1cSdq61GXBHLQKT\nd/VMgAN1vN51pu7DzGBnT/dRCvEgNvEjffjSZdqRmrAVdfN/y6LSeQ5RCfJgGXSV\nJoWVmMxhCNrxiX3h01Xgp/c9SYJ3VD54AzeR/dwg32/j/oEAsDraLciXGQKBgQDF\nMNc8k/DvfmJv27R06Ma6liA6AoiJVMxgfXD8nVUDW3/tBCVh1HmkFU1p54PArvxe\nchAQqoYQ3dUMBHeh6ZRJaYp2ATfxJlfnM99P1/eHFOxEXdBt996oUMBf53bZ5cyJ\n/lAVwnQSiZy8otCyUDHGivJ+mXkTgcIq8BoEwERFAQKBgQDmImBaFqoMSVihqHIf\nDa4WZqwM7ODqOx0JnBKrKO8UOc51J5e1vpwP/qRpNhUipoILvIWJzu4efZY7GN5C\nImF9sN3PP6Sy044fkVPyw4SYEisxbvp9tfw8Xmpj/pbmugkB2ut6lz5frmEBoJSN\n3osZlZTgx+pM3sO6ITV6U4ID2Q==\n-----END PRIVATE KEY-----\n",
"client_email": "dummy@developer.gserviceaccount.com",
"client_id": "dummy",
"auth_uri": format!("{oauth_endpoint}/auth"),
"token_uri": format!("{}{}", oauth_server, auth_mock::AUTH_TOKEN_ENDPOINT),
"auth_provider_x509_cert_url": format!("{oauth_endpoint}/v1/certs"),
"client_x509_cert_url": format!("{oauth_server}/robot/v1/metadata/x509/457015483506-compute%40developer.gserviceaccount.com")
})
}
mod bq {
    use std::path::Path;

    use fake::{Fake, StringFaker};
    use gcp_bigquery_client::{
        model::{
            dataset::Dataset, query_request::QueryRequest, table::Table,
            table_data_insert_all_request::TableDataInsertAllRequest, table_field_schema::TableFieldSchema,
            table_schema::TableSchema,
        },
        Client,
    };
    use serde::Serialize;

    // The project ID needs to match with the flag `--project` of the bigquery emulator.
    const PROJECT_ID: &str = "my_project";
    /// Name of the single string column of the table.
    const NAME_COLUMN: &str = "name";
    /// Identifier of the table created inside the dataset.
    const TABLE_ID: &str = "table";

    /// A BigQuery client bundled with the identifiers of the dataset and
    /// table that [`BQ::new`] created.
    pub struct BQ {
        client: Client,
        project_id: String,
        dataset_id: String,
        table_id: String,
    }

    /// One row of the table: a single `name` string.
    #[derive(Serialize, Debug, Clone, PartialEq, Eq)]
    pub struct Row {
        pub name: String,
    }

    /// Creates the `TABLE_ID` table, with a single string column
    /// `NAME_COLUMN`, inside `dataset`. Panics if the request fails.
    async fn create_table(client: &Client, dataset: &Dataset) {
        let schema = TableSchema::new(vec![TableFieldSchema::string(NAME_COLUMN)]);
        let table = Table::from_dataset(dataset, TABLE_ID, schema);
        dataset.create_table(client, table).await.unwrap();
    }

    impl BQ {
        /// Builds a client from the service-account key file at
        /// `sa_config_path`, then creates a randomly named dataset and the
        /// table inside it.
        ///
        /// `big_query_auth_base_url` is handed to the client builder as the
        /// auth base URL; the API base URL is fixed to the local emulator.
        ///
        /// # Panics
        ///
        /// Panics if the path is not valid UTF-8 or if any request fails.
        pub async fn new(sa_config_path: &Path, big_query_auth_base_url: String) -> Self {
            const LETTERS: &str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";

            let key_file = sa_config_path.to_str().unwrap();
            let client = gcp_bigquery_client::client_builder::ClientBuilder::new()
                .with_auth_base_url(big_query_auth_base_url)
                // Address on which the BigQuery emulator container is published.
                .with_v2_base_url(String::from("http://localhost:9050"))
                .build_from_service_account_key_file(key_file)
                .await
                .unwrap();

            // A random dataset id keeps each run isolated from the others.
            let dataset_id: String = StringFaker::with(Vec::from(LETTERS), 8).fake();

            let new_dataset = Dataset::new(PROJECT_ID, &dataset_id);
            let dataset = client.dataset().create(new_dataset).await.unwrap();
            create_table(&client, &dataset).await;

            Self {
                client,
                project_id: String::from(PROJECT_ID),
                dataset_id,
                table_id: String::from(TABLE_ID),
            }
        }

        /// Deletes the table and then the dataset created by [`BQ::new`].
        /// Panics if either request fails.
        pub async fn delete_dataset(&self) {
            let Self {
                client,
                project_id,
                dataset_id,
                table_id,
            } = self;

            // First the table...
            client.table().delete(project_id, dataset_id, table_id).await.unwrap();

            // ...then the dataset itself. NOTE(review): the trailing `true` is
            // presumably the "delete contents" flag; confirm against the client API.
            client.dataset().delete(project_id, dataset_id, true).await.unwrap();
        }

        /// Inserts a single row holding `name` into the table.
        /// Panics if the row cannot be added to the request or the request fails.
        pub async fn insert_row(&self, name: String) {
            let mut request = TableDataInsertAllRequest::new();
            request.add_row(None, Row { name }).unwrap();

            let tabledata = self.client.tabledata();
            tabledata
                .insert_all(&self.project_id, &self.dataset_id, &self.table_id, request)
                .await
                .unwrap();
        }

        /// Runs `SELECT *` on the table and returns the `NAME_COLUMN` value
        /// of every row, in the order the result set yields them.
        ///
        /// # Panics
        ///
        /// Panics if the query fails, or if reading the column fails or
        /// yields no value for a row.
        pub async fn get_rows(&self) -> Vec<String> {
            let sql = format!(
                "SELECT * FROM `{}.{}.{}`",
                &self.project_id, &self.dataset_id, &self.table_id
            );
            let mut result_set = self
                .client
                .job()
                .query(&self.project_id, QueryRequest::new(sql))
                .await
                .unwrap();

            // Advance the cursor row by row until it reports there is no next row.
            std::iter::from_fn(|| {
                result_set
                    .next_row()
                    .then(|| result_set.get_string_by_name(NAME_COLUMN).unwrap().unwrap())
            })
            .collect()
        }
    }
}
/// End-to-end example: starts the auth mock, writes a dummy service-account
/// key to a temporary file, creates a dataset and table through [`BQ`],
/// inserts one row and checks that it can be read back.
///
/// Requires the BigQuery emulator to be reachable (see the module docs).
#[tokio::main]
async fn main() {
    // Register the token endpoint with an expected call count of 1.
    let google_auth = GoogleAuthMock::start().await;
    google_auth.mock_token(1).await;

    let google_config = dummy_configuration(&google_auth.uri());

    // Write google configuration to file.
    let temp_file = tempfile::NamedTempFile::new().unwrap();
    std::fs::write(temp_file.path(), serde_json::to_string_pretty(&google_config).unwrap()).unwrap();

    let bq = BQ::new(temp_file.path(), google_auth.uri()).await;

    let name = "foo";
    bq.insert_row(name.to_string()).await;
    let rows = bq.get_rows().await;

    // Clean up before asserting: a failing assertion panics, and the dataset
    // and table would otherwise be left behind in the emulator.
    bq.delete_dataset().await;

    assert_eq!(rows, vec![name]);
    println!("That's all Folks!");
}