use serde::{Deserialize, Serialize};
use std::{collections::HashMap, convert::TryFrom};
use tokio::time::{sleep, Duration};
use super::{
super::{Client, NoQuery},
BigQueryError, TableSchema,
};
use crate::common::*;
use crate::drivers::bigquery_shared::TableName;
/// Key/value labels attached to a BigQuery job (serialized as the REST
/// API's `labels` map on the job configuration).
pub(crate) type Labels = HashMap<String, String>;
/// A BigQuery job resource, as sent to and returned by the BigQuery
/// `jobs` REST endpoints. Fields we leave as `None` when building a job
/// locally are filled in by the server.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Job {
    /// Server-assigned job ID. `None` for jobs we build locally.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) id: Option<String>,
    /// URL of this job resource, filled in by the server.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) self_link: Option<String>,
    /// What this job should do (query, load or extract).
    pub(crate) configuration: JobConfiguration,
    /// Fully-qualified job reference, filled in by the server. Skipped
    /// when `None` for consistency with the other server-assigned
    /// fields, so locally-built jobs don't serialize `"jobReference": null`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) job_reference: Option<JobReference>,
    /// Current job status, filled in by the server.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) status: Option<JobStatus>,
}
impl Job {
fn from_config(configuration: JobConfiguration) -> Self {
Job {
id: None,
self_link: None,
configuration,
job_reference: None,
status: None,
}
}
pub(crate) fn new_query(
query_config: JobConfigurationQuery,
labels: Labels,
) -> Self {
let mut config = JobConfiguration::default();
config.query = Some(query_config);
config.labels = labels;
Self::from_config(config)
}
pub(crate) fn new_load(load_config: JobConfigurationLoad, labels: Labels) -> Self {
let mut config = JobConfiguration::default();
config.load = Some(load_config);
config.labels = labels;
Self::from_config(config)
}
pub(crate) fn new_extract(
extract_config: JobConfigurationExtract,
labels: Labels,
) -> Self {
let mut config = JobConfiguration::default();
config.extract = Some(extract_config);
config.labels = labels;
Self::from_config(config)
}
pub(crate) fn reference(&self) -> Result<&JobReference> {
self.job_reference
.as_ref()
.ok_or_else(|| format_err!("newly created job has no jobReference"))
}
pub(crate) fn url(&self) -> Result<Url> {
self.self_link
.as_ref()
.ok_or_else(|| format_err!("newly created job has no selfLink"))?
.parse::<Url>()
.context("BigQuery returned invalid selfLink")
}
}
/// Unique identifier of a BigQuery job: owning project, job ID, and
/// the location where the job runs.
#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct JobReference {
    /// Project that owns the job.
    pub(crate) project_id: String,
    /// ID of the job within the project.
    pub(crate) job_id: String,
    /// Location in which the job runs.
    pub(crate) location: String,
}
/// The payload of a job: exactly one of `query`, `load` or `extract`
/// is expected to be set by our `Job::new_*` constructors.
#[derive(Debug, Default, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct JobConfiguration {
    /// Configuration for a SQL query job.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) query: Option<JobConfigurationQuery>,
    /// Configuration for a data-load job.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) load: Option<JobConfigurationLoad>,
    /// Configuration for a table-extract job.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) extract: Option<JobConfigurationExtract>,
    /// If true, validate the job without running it.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(crate) dry_run: Option<bool>,
    /// Labels to attach to the job; omitted from the payload when empty.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub(crate) labels: Labels,
}
/// Configuration for a SQL query job.
///
/// NOTE(review): unlike `Job`/`JobConfiguration`, the `Option` fields
/// here have no `skip_serializing_if`, so `None` serializes as an
/// explicit `null` — confirm the BigQuery API accepts that.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct JobConfigurationQuery {
    /// Table in which to store the query results, if any.
    pub(crate) destination_table: Option<TableReference>,
    /// Whether to create the destination table if it doesn't exist.
    pub(crate) create_disposition: Option<CreateDisposition>,
    /// How to treat existing data in the destination table.
    pub(crate) write_disposition: Option<WriteDisposition>,
    /// The SQL text to run.
    pub(crate) query: String,
    /// Whether to interpret `query` as legacy SQL.
    pub(crate) use_legacy_sql: Option<bool>,
}
impl JobConfigurationQuery {
pub(crate) fn new<S: Into<String>>(query: S) -> Self {
JobConfigurationQuery {
destination_table: None,
create_disposition: None,
write_disposition: None,
query: query.into(),
use_legacy_sql: Some(false),
}
}
}
/// Configuration for a job that loads external data into a table.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct JobConfigurationLoad {
    /// URIs of the source data to load.
    pub(crate) source_uris: Vec<String>,
    /// Schema of the destination table, if we specify one.
    pub(crate) schema: Option<TableSchema>,
    /// Table into which to load the data.
    pub(crate) destination_table: TableReference,
    /// Whether to create the destination table if it doesn't exist.
    pub(crate) create_disposition: Option<CreateDisposition>,
    /// How to treat existing data in the destination table.
    pub(crate) write_disposition: Option<WriteDisposition>,
    /// Number of leading rows (e.g. CSV headers) to skip.
    pub(crate) skip_leading_rows: Option<i32>,
    /// Whether quoted fields may contain newlines.
    pub(crate) allow_quoted_newlines: Option<bool>,
}
/// Configuration for a job that extracts a table to external storage.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct JobConfigurationExtract {
    /// URIs to which the table data should be written.
    pub(crate) destination_uris: Vec<String>,
    /// Table to extract.
    pub(crate) source_table: TableReference,
}
/// Server-reported status of a job.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct JobStatus {
    /// Current lifecycle state of the job.
    state: JobState,
    /// The error that caused the job to fail, if it failed.
    error_result: Option<BigQueryError>,
    /// All errors encountered while running the job; may be non-empty
    /// even without a fatal `error_result`.
    #[serde(default)]
    errors: Vec<BigQueryError>,
}
impl JobStatus {
    /// Convert this status into a `Result`: a clone of `error_result`
    /// as `Err` when the job failed, `Ok(())` otherwise.
    fn check_for_error(&self) -> Result<(), BigQueryError> {
        match &self.error_result {
            Some(err) => Err(err.clone()),
            None => Ok(()),
        }
    }
}
/// Lifecycle state of a job, serialized as `PENDING` / `RUNNING` / `DONE`.
#[derive(Clone, Copy, Debug, Deserialize, Eq, Serialize, PartialEq)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub(crate) enum JobState {
    /// Queued but not yet started.
    Pending,
    /// Currently executing.
    Running,
    /// Finished (successfully or with an error — see `JobStatus`).
    Done,
}
/// Fully-qualified reference to a BigQuery table.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct TableReference {
    /// Project that owns the table.
    pub(crate) project_id: String,
    /// Dataset containing the table.
    pub(crate) dataset_id: String,
    /// Name of the table itself.
    pub(crate) table_id: String,
}
impl From<&TableName> for TableReference {
fn from(name: &TableName) -> Self {
Self {
project_id: name.project().to_owned(),
dataset_id: name.dataset().to_owned(),
table_id: name.table().to_owned(),
}
}
}
/// Whether a job may create its destination table
/// (the REST API's `createDisposition` values).
#[derive(Clone, Copy, Debug, Deserialize, Eq, Serialize, PartialEq)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
#[allow(clippy::enum_variant_names)]
pub(crate) enum CreateDisposition {
    /// Create the destination table if it does not exist.
    CreateIfNeeded,
    /// Fail if the destination table does not exist.
    CreateNever,
}
/// How a job treats existing data in its destination table
/// (the REST API's `writeDisposition` values).
#[derive(Clone, Copy, Debug, Deserialize, Eq, Serialize, PartialEq)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
#[allow(clippy::enum_variant_names)]
pub(crate) enum WriteDisposition {
    /// Replace any existing data.
    WriteTruncate,
    /// Append to any existing data.
    WriteAppend,
    /// Fail unless the destination table is empty.
    WriteEmpty,
}
impl TryFrom<&IfExists> for WriteDisposition {
    type Error = Error;

    /// Map our generic `IfExists` policy onto BigQuery's `writeDisposition`.
    /// `Upsert` has no corresponding disposition and is rejected.
    fn try_from(if_exists: &IfExists) -> Result<Self> {
        match if_exists {
            IfExists::Append => Ok(WriteDisposition::WriteAppend),
            IfExists::Error => Ok(WriteDisposition::WriteEmpty),
            IfExists::Overwrite => Ok(WriteDisposition::WriteTruncate),
            IfExists::Upsert(_) => {
                // Fixed garbled error message ("cannot upsert to using …").
                Err(format_err!("cannot upsert using writeDisposition"))
            }
        }
    }
}
/// Submit `job` to BigQuery under `project_id` and poll until it reaches
/// the `Done` state, returning the final `Job` resource.
///
/// Returns an error if the insert or any poll request fails, if the
/// server response lacks a `selfLink`, or if the finished job reports an
/// `errorResult`.
#[instrument(level = "trace", skip(client, job))]
pub(crate) async fn run_job(
    client: &Client,
    project_id: &str,
    mut job: Job,
) -> Result<Job> {
    trace!("starting BigQuery job on {} {:?}", project_id, job);
    let insert_url = format!(
        "https://bigquery.googleapis.com/bigquery/v2/projects/{}/jobs",
        project_id,
    );
    // Submit the job; the response includes server-assigned fields
    // (notably `selfLink`, used below to poll for status).
    job = client
        .post::<Job, _, _, _>(&insert_url, NoQuery, job)
        .await?;
    let job_url = job.url()?;
    // Poll with exponential backoff: 2s, 4s, 8s, then capped at 16s.
    let mut sleep_duration = Duration::from_secs(2);
    loop {
        // Check state *before* sleeping, so a job that is already done
        // (including one completed in the insert response) exits
        // immediately.
        let state = job.status.as_ref().map(|s| s.state);
        if state == Some(JobState::Done) {
            break;
        }
        sleep(sleep_duration).await;
        if sleep_duration < Duration::from_secs(16) {
            sleep_duration *= 2;
        }
        job = client.get::<Job, _, _>(job_url.as_str(), NoQuery).await?;
    }
    // We can only leave the loop when `status` is `Some(Done)`, so this
    // `expect` cannot fire; `check_for_error` surfaces any fatal
    // `errorResult` from the finished job.
    job.status
        .as_ref()
        .expect("should have already checked for status")
        .check_for_error()?;
    Ok(job)
}