use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::convert::TryFrom;
use super::{
super::client::{percent_encode, Client},
jobs::{
run_job, CreateDisposition, Job, JobConfigurationQuery, Labels,
TableReference, WriteDisposition,
},
TableSchema,
};
use crate::common::*;
use crate::drivers::bigquery_shared::{BqColumn, TableName};
/// Run a SQL statement on BigQuery, discarding any result rows.
///
/// The query job is submitted to `project` with `labels` attached, and this
/// returns once `run_job` reports the job as finished.
pub(crate) async fn execute_sql(
    project: &str,
    sql: &str,
    labels: &Labels,
) -> Result<()> {
    trace!("executing SQL: {}", sql);
    let client = Client::new().await?;
    let query_config = JobConfigurationQuery::new(sql);
    let job = Job::new_query(query_config, labels.to_owned());
    run_job(&client, project, job).await?;
    Ok(())
}
/// Run `sql` and write the result rows to `dest_table`.
///
/// The destination table is created if it does not already exist, and
/// `if_exists` determines what happens when it already contains data.
pub(crate) async fn query_to_table(
    project: &str,
    sql: &str,
    dest_table: &TableName,
    if_exists: &IfExists,
    labels: &Labels,
) -> Result<()> {
    trace!("writing query to {}: {}", dest_table, sql);

    // Build a query job configuration that materializes its output.
    let mut config = JobConfigurationQuery::new(sql);
    config.write_disposition = Some(WriteDisposition::try_from(if_exists)?);
    config.create_disposition = Some(CreateDisposition::CreateIfNeeded);
    config.destination_table = Some(TableReference::from(dest_table));

    let client = Client::new().await?;
    let job = Job::new_query(config, labels.to_owned());
    run_job(&client, project, job).await?;
    Ok(())
}
/// URL query parameters sent with the `jobs.getQueryResults` GET request
/// (serialized as `camelCase`).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct QueryResultsQuery {
    /// The job's location, copied from the job reference returned by
    /// `run_job`.
    location: String,
}
/// A (partial) response body from the BigQuery `jobs.getQueryResults`
/// endpoint, deserialized from `camelCase` JSON.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct QueryResults {
    /// The schema describing the columns of `rows`.
    schema: TableSchema,
    /// The result rows. BigQuery omits this field entirely when the query
    /// returns zero rows, so default to an empty `Vec` instead of failing
    /// to deserialize.
    #[serde(default)]
    rows: Vec<Row>,
    /// Whether the underlying job has finished running.
    ///
    /// NOTE(review): `schema` may also be absent while `job_complete` is
    /// false — if that case ever occurs in practice, deserialization will
    /// fail before we can report the nicer "hasn't finished" error.
    job_complete: bool,
}
impl QueryResults {
    /// Convert every row in this result set into a JSON object keyed by
    /// portable column name, failing on the first row that cannot be
    /// converted.
    fn to_json_objects(&self) -> Result<Vec<serde_json::Value>> {
        let mut objects = Vec::with_capacity(self.rows.len());
        for row in &self.rows {
            objects.push(row.to_json_object(&self.schema.fields)?);
        }
        trace!(
            "rows as objects: {}",
            serde_json::to_string(&objects).expect("should be able to serialize rows"),
        );
        Ok(objects)
    }
}
/// A single result row as returned by the BigQuery REST API.
#[derive(Debug, Deserialize)]
struct Row {
    /// The cell values for this row, in schema column order (JSON field
    /// `f` in the wire format).
    #[serde(rename = "f")]
    fields: Vec<Value>,
}
impl Row {
    /// Pair each cell of this row with the matching column in `columns`,
    /// producing a JSON object keyed by portable column name.
    ///
    /// Returns an error when the number of cells disagrees with the number
    /// of schema columns.
    fn to_json_object(&self, columns: &[BqColumn]) -> Result<serde_json::Value> {
        let (expected, actual) = (columns.len(), self.fields.len());
        if expected != actual {
            return Err(format_err!(
                "schema contained {} columns, but row contains {}",
                expected,
                actual,
            ));
        }
        let mut object = serde_json::Map::with_capacity(expected);
        for (column, cell) in columns.iter().zip(&self.fields) {
            object.insert(column.name.to_portable_name(), cell.to_json_value()?);
        }
        Ok(serde_json::Value::Object(object))
    }
}
/// A single cell value within a result row.
#[derive(Debug, Deserialize)]
struct Value {
    /// The raw JSON value of this cell (JSON field `v` in the wire
    /// format).
    #[serde(rename = "v")]
    value: serde_json::Value,
}
impl Value {
    /// Return an owned copy of the raw JSON value stored in this cell.
    fn to_json_value(&self) -> Result<serde_json::Value> {
        let copy = self.value.to_owned();
        Ok(copy)
    }
}
/// Run `sql` and return the result rows as JSON objects, one per row.
///
/// NOTE(review): this fetches only the first page of results from
/// `jobs.getQueryResults` — very large result sets may be truncated.
/// Confirm callers only use this for small queries.
#[instrument(level = "trace", skip(labels))]
async fn query_all_json(
    project: &str,
    sql: &str,
    labels: &Labels,
) -> Result<Vec<serde_json::Value>> {
    trace!("executing SQL: {}", sql);

    // Submit the query job and wait for it to finish.
    let client = Client::new().await?;
    let finished = run_job(
        &client,
        project,
        Job::new_query(JobConfigurationQuery::new(sql), labels.to_owned()),
    )
    .await?;
    let reference = finished.reference()?;

    // Fetch the results via the `jobs.getQueryResults` endpoint.
    let url = format!(
        "https://bigquery.googleapis.com/bigquery/v2/projects/{}/queries/{}",
        percent_encode(project),
        percent_encode(&reference.job_id),
    );
    let query = QueryResultsQuery {
        location: reference.location.clone(),
    };
    let results = client.get::<QueryResults, _, _>(&url, query).await?;
    if !results.job_complete {
        return Err(format_err!(
            "expected query to have finished, but it hasn't",
        ));
    }
    results.to_json_objects()
}
/// Run `sql` and deserialize each result row into a value of type `T`.
///
/// Each row is first converted to a JSON object keyed by portable column
/// name, then deserialized via `serde`.
#[instrument(level = "trace", skip(labels))]
pub(crate) async fn query_all<T>(
    project: &str,
    sql: &str,
    labels: &Labels,
) -> Result<Vec<T>>
where
    T: DeserializeOwned,
{
    let output = query_all_json(project, sql, labels).await?;
    let rows = output
        .into_iter()
        .map(serde_json::from_value::<T>)
        .collect::<Result<Vec<T>, _>>()
        // This helper runs arbitrary queries, not just counts, so the
        // error context must stay generic (the old "count output" message
        // looked like a copy-paste from a count-specific caller).
        .context("could not parse query output")?;
    Ok(rows)
}
#[instrument(level = "trace", skip(labels))]
pub(crate) async fn query_one<T>(
project: &str,
sql: &str,
labels: &Labels,
) -> Result<T>
where
T: DeserializeOwned,
{
let mut rows = query_all(project, sql, labels).await?;
if rows.len() == 1 {
Ok(rows.remove(0))
} else {
Err(format_err!("expected 1 row, found {}", rows.len()))
}
}