use std::process::{Command, Stdio};
use tokio::io;
use tokio_process::CommandExt;
use super::{prepare_as_destination_helper, GsLocator};
use crate::common::*;
use crate::drivers::{
bigquery::BigQueryLocator,
bigquery_shared::{if_exists_to_bq_load_arg, BqTable, Usage},
};
pub(crate) async fn write_remote_data_helper(
ctx: Context,
source: BoxLocator,
dest: GsLocator,
shared_args: SharedArguments<Unverified>,
source_args: SourceArguments<Unverified>,
dest_args: DestinationArguments<Unverified>,
) -> Result<Vec<BoxLocator>> {
let source = source
.as_any()
.downcast_ref::<BigQueryLocator>()
.ok_or_else(|| format_err!("not a bigquery locator: {}", source))?;
let source_table_name = source.as_table_name().to_owned();
let shared_args = shared_args.verify(GsLocator::features())?;
let source_args = source_args.verify(BigQueryLocator::features())?;
let dest_args = dest_args.verify(GsLocator::features())?;
let schema = shared_args.schema();
let temporary_storage = shared_args.temporary_storage();
let if_exists = dest_args.if_exists().to_owned();
let source_table = BqTable::for_table_name_and_columns(
source_table_name,
&schema.columns,
Usage::FinalTable,
)?;
let temp_table_name = source_table
.name()
.temporary_table_name(&temporary_storage)?;
let mut export_sql_data = vec![];
source_table.write_export_sql(&source_args, &mut export_sql_data)?;
let export_sql =
String::from_utf8(export_sql_data).expect("should always be UTF-8");
debug!(ctx.log(), "export SQL: {}", export_sql);
debug!(ctx.log(), "running `bq query`");
let mut query_child = Command::new("bq")
.stdin(Stdio::piped())
.stdout(Stdio::null())
.args(&[
"query",
"--headless",
"--format=none",
&format!("--destination_table={}", temp_table_name),
if_exists_to_bq_load_arg(&IfExists::Overwrite)?,
"--nouse_legacy_sql",
&format!("--project_id={}", source.project()),
])
.spawn_async()
.context("error starting `bq query`")?;
let child_stdin = query_child
.stdin()
.take()
.expect("don't have stdin that we requested");
io::write_all(child_stdin, export_sql)
.compat()
.await
.context("error piping query to `bq query`")?;
let status = query_child
.compat()
.await
.context("error running `bq query`")?;
if !status.success() {
return Err(format_err!("`bq query` failed with {}", status));
}
prepare_as_destination_helper(ctx.clone(), dest.as_url().to_owned(), if_exists)
.await?;
debug!(ctx.log(), "running `bq extract`");
let extract_child = Command::new("bq")
.args(&[
"extract",
"--headless",
"--destination_format=CSV",
&format!("--project_id={}", source.project()),
&temp_table_name.to_string(),
&format!("{}/*.csv", dest),
])
.stdout(Stdio::null())
.spawn_async()
.context("error starting `bq extract`")?;
let status = extract_child
.compat()
.await
.context("error running `bq extract`")?;
if !status.success() {
return Err(format_err!("`bq extract` failed with {}", status));
}
debug!(ctx.log(), "deleting export temp table: {}", temp_table_name);
let rm_child = Command::new("bq")
.args(&[
"rm",
"--headless",
"-f",
"-t",
&format!("--project_id={}", source.project()),
&temp_table_name.to_string(),
])
.stdout(Stdio::null())
.spawn_async()
.context("error starting `bq rm`")?;
let status = rm_child.compat().await.context("error running `bq rm`")?;
if !status.success() {
return Err(format_err!("`bq rm` failed with {}", status));
}
Ok(vec![dest.boxed()])
}