use super::BigQueryLocator;
use crate::clouds::gcloud::bigquery;
use crate::common::*;
use crate::drivers::{
bigquery_shared::{BqTable, GCloudDriverArguments, SchemaBigQueryExt, Usage},
gs::GsLocator,
};
/// Load CSV data already sitting in Google Cloud Storage into a BigQuery
/// table.
///
/// Depending on the schema and the `--if-exists` mode, this either loads
/// straight into the destination table, or stages through a temporary table
/// and runs a transforming `SELECT ... INTO` style SQL job.
#[instrument(
    level = "debug",
    name = "bigquery::write_remote_data",
    skip_all,
    fields(source = %source, dest = %dest)
)]
pub(crate) async fn write_remote_data_helper(
    source: BoxLocator,
    dest: BigQueryLocator,
    shared_args: SharedArguments<Unverified>,
    source_args: SourceArguments<Unverified>,
    dest_args: DestinationArguments<Unverified>,
) -> Result<Vec<BoxLocator>> {
    // Our source must be a `gs://` locator; anything else is a caller error.
    let gs_locator = source
        .as_any()
        .downcast_ref::<GsLocator>()
        .ok_or_else(|| format_err!("not a gs:// locator: {}", source))?;
    let mut csv_url = gs_locator.as_url().to_owned();

    // Verify that only features we support were requested.
    let shared_args = shared_args.verify(BigQueryLocator::features())?;
    let _source_args = source_args.verify(Features::empty())?;
    let dest_args = dest_args.verify(BigQueryLocator::features())?;

    let schema = shared_args.schema();
    let temporary_storage = shared_args.temporary_storage();
    let if_exists = dest_args.if_exists();

    // Driver-specific arguments: labels to attach to BigQuery jobs.
    let job_labels = dest_args
        .driver_args()
        .deserialize::<GCloudDriverArguments>()
        .context("error parsing --to-args")?
        .job_labels
        .to_owned();

    // A trailing slash means "directory": load every CSV file inside it.
    if csv_url.as_str().ends_with('/') {
        csv_url = csv_url.join("*.csv")?;
    }

    // We need to stage through a temporary table whenever BigQuery can't
    // ingest this schema directly from CSV, or when we're upserting.
    let needs_temp = !schema.bigquery_can_import_from_csv()? || if_exists.is_upsert();

    // Pick the table that receives the raw CSV load.
    let load_table_name = if needs_temp {
        let name = dest.table_name.temporary_table_name(temporary_storage)?;
        debug!("loading into temporary table {}", name);
        name
    } else {
        debug!("loading directly into final table {}", dest.table_name);
        dest.table_name.clone()
    };
    let load_usage = if needs_temp {
        Usage::CsvLoad
    } else {
        Usage::FinalTable
    };
    let load_table = BqTable::for_table_name_and_columns(
        schema,
        load_table_name,
        &schema.table.columns,
        load_usage,
    )?;

    // A temporary table is always freshly overwritten; otherwise honor the
    // caller's `--if-exists` choice.
    let load_if_exists = if needs_temp {
        &IfExists::Overwrite
    } else {
        if_exists
    };
    bigquery::load(&csv_url, &load_table, load_if_exists, &job_labels).await?;

    if needs_temp {
        // Transform the staged data into the real destination table, then
        // clean up the temporary table.
        let final_table = BqTable::for_table_name_and_columns(
            schema,
            dest.table_name.clone(),
            &schema.table.columns,
            Usage::FinalTable,
        )?;
        debug!("transforming data into final table {}", final_table.name());
        let mut sql_buf = Vec::new();
        final_table.write_import_sql(load_table.name(), if_exists, &mut sql_buf)?;
        let sql =
            String::from_utf8(sql_buf).expect("generated SQL should always be UTF-8");
        debug!("import sql: {}", sql);
        bigquery::execute_sql(dest.project(), &sql, &job_labels).await?;
        bigquery::drop_table(load_table.name(), &job_labels).await?;
    }

    Ok(vec![dest.boxed()])
}