use bigml::{
try_with_permanent_failure,
wait::{wait, WaitStatus},
};
use super::{prepare_as_destination_helper, GsLocator};
use crate::clouds::gcloud::bigquery::original_bigquery_error;
use crate::clouds::gcloud::storage::gcs_write_access_denied_wait_options;
use crate::clouds::gcloud::{bigquery, storage};
use crate::common::*;
use crate::drivers::{
bigquery::BigQueryLocator,
bigquery_shared::{BqTable, GCloudDriverArguments, Usage},
};
/// Copy a BigQuery table into Google Cloud Storage.
///
/// High-level flow:
/// 1. Verify driver arguments and build the export SQL for the source table.
/// 2. Run that SQL into a temporary BigQuery table (always overwriting it).
/// 3. Prepare the GCS destination and run a BigQuery "extract" job into it,
///    retrying on transient access-denied errors (freshly granted GCS write
///    permissions can take a while to propagate — see the wait options).
/// 4. Drop the temporary table and return the locators actually written.
///
/// Returns the list of output object locators: when overwriting, the actual
/// objects found under `dest`; otherwise just `dest` itself.
#[instrument(
    level = "debug",
    name = "gs::write_remote_data",
    skip_all,
    fields(source = %source, dest = %dest)
)]
pub(crate) async fn write_remote_data_helper(
    ctx: Context,
    source: BoxLocator,
    dest: GsLocator,
    shared_args: SharedArguments<Unverified>,
    source_args: SourceArguments<Unverified>,
    dest_args: DestinationArguments<Unverified>,
) -> Result<Vec<BoxLocator>> {
    // This helper only handles BigQuery sources; any other locator type is an
    // error.
    let source = source
        .as_any()
        .downcast_ref::<BigQueryLocator>()
        .ok_or_else(|| format_err!("not a bigquery locator: {}", source))?;
    let source_table_name = source.as_table_name().to_owned();

    // Verify our arguments against what each driver supports.
    let shared_args = shared_args.verify(GsLocator::features())?;
    let source_args = source_args.verify(BigQueryLocator::features())?;
    let dest_args = dest_args.verify(GsLocator::features())?;

    // Look up our arguments.
    let schema = shared_args.schema();
    let temporary_storage = shared_args.temporary_storage();
    let if_exists = dest_args.if_exists().to_owned();
    // Labels to attach to every BigQuery job we create, taken from
    // `--from-args` driver arguments.
    let job_labels = source_args
        .driver_args()
        .deserialize::<GCloudDriverArguments>()
        .context("error parsing --from-args")?
        .job_labels
        .to_owned();

    // Build a description of the source table from our portable schema.
    let source_table = BqTable::for_table_name_and_columns(
        schema,
        source_table_name.clone(),
        &schema.table.columns,
        Usage::FinalTable,
    )?;

    // Fetch the table's real schema from BigQuery and align it with the
    // schema-derived description so column order/types match.
    let mut real_source_table = BqTable::read_from_table(&source_table_name).await?;
    real_source_table = real_source_table.aligned_with(&source_table)?;

    // Name for the temporary table that holds the export query's output.
    let temp_table_name = source_table
        .name()
        .temporary_table_name(temporary_storage)?;

    // Generate the SQL that selects/converts the source data for export.
    // `write_export_sql` writes to a byte buffer, which we decode here;
    // the generator only emits UTF-8, so the `expect` states an invariant.
    let mut export_sql_data = vec![];
    real_source_table.write_export_sql(&source_args, &mut export_sql_data)?;
    let export_sql =
        String::from_utf8(export_sql_data).expect("should always be UTF-8");
    debug!("export SQL: {}", export_sql);

    // Run the export query into our temporary table. We always overwrite the
    // temp table, regardless of the caller's `if_exists` for the destination.
    bigquery::query_to_table(
        source.project(),
        &export_sql,
        &temp_table_name,
        &IfExists::Overwrite,
        &job_labels,
    )
    .await?;

    // Prepare the destination and extract into it, retrying the whole
    // prepare+extract step on transient GCS access-denied errors.
    let opt = gcs_write_access_denied_wait_options();
    wait(&opt, || async {
        // Destination preparation failures are never worth retrying.
        try_with_permanent_failure!(
            prepare_as_destination_helper(
                ctx.clone(),
                dest.as_url().to_owned(),
                if_exists.clone(),
            )
            .await
        );
        match bigquery::extract(&temp_table_name, dest.as_url(), &job_labels).await {
            Ok(()) => WaitStatus::Finished(()),
            Err(err) if should_retry_extract(&err) => {
                WaitStatus::FailedTemporarily(err)
            }
            Err(err) => WaitStatus::FailedPermanently(err),
        }
    })
    .await?;

    // Clean up our temporary table.
    //
    // NOTE(review): if the wait loop above fails permanently, `?` returns
    // early and the temp table is never dropped — presumably acceptable
    // (temporary tables may expire on their own), but worth confirming.
    bigquery::drop_table(&temp_table_name, &job_labels).await?;

    if if_exists == IfExists::Overwrite {
        // When overwriting, report the objects the extract job actually
        // created by listing the destination prefix.
        let mut storage_object_stream = storage::ls(&ctx, &dest.url).await?;
        let mut dest_urls = vec![];
        while let Some(storage_object) = storage_object_stream.next().await {
            let storage_object = storage_object?;
            let locator = storage_object.to_url_string().parse::<GsLocator>()?;
            dest_urls.push(locator.boxed());
        }
        Ok(dest_urls)
    } else {
        // Otherwise just report the destination we were asked to write.
        Ok(vec![dest.boxed()])
    }
}
/// Should a failed BigQuery extract job be retried?
///
/// We retry only when the underlying cause is a BigQuery "access denied"
/// error, which can occur transiently while newly granted GCS write
/// permissions are still propagating.
fn should_retry_extract(err: &Error) -> bool {
    original_bigquery_error(err)
        .map_or(false, |bigquery_error| bigquery_error.is_access_denied())
}