use super::{prepare_as_destination_helper, S3Locator};
use crate::common::*;
use crate::drivers::{
postgres::connect,
postgres_shared::{pg_quote, PgCreateTable},
redshift::{credentials_sql, RedshiftLocator},
};
pub(crate) async fn write_remote_data_helper(
ctx: Context,
source: BoxLocator,
dest: S3Locator,
shared_args: SharedArguments<Unverified>,
source_args: SourceArguments<Unverified>,
dest_args: DestinationArguments<Unverified>,
) -> Result<Vec<BoxLocator>> {
let source = source
.as_any()
.downcast_ref::<RedshiftLocator>()
.ok_or_else(|| format_err!("not a redshift:// locator: {}", source))?;
let shared_args = shared_args.verify(S3Locator::features())?;
let source_args = source_args.verify(RedshiftLocator::features())?;
let dest_args = dest_args.verify(S3Locator::features())?;
let schema = shared_args.schema();
let from_args = source_args.driver_args();
let if_exists = dest_args.if_exists().to_owned();
prepare_as_destination_helper(ctx.clone(), dest.as_url().to_owned(), if_exists)
.await?;
let table_name = source.table_name();
let pg_create_table =
PgCreateTable::from_name_and_columns(table_name.to_owned(), &schema.columns)?;
let mut sql_bytes: Vec<u8> = vec![];
pg_create_table.write_export_select_sql(&mut sql_bytes, &source_args)?;
let select_sql = String::from_utf8(sql_bytes).expect("should always be UTF-8");
debug!(ctx.log(), "export SQL: {}", select_sql);
let mut client = connect(ctx.clone(), source.url().to_owned()).await?;
let unload_sql = format!(
"UNLOAD ({source}) TO {dest}\n{credentials}HEADER FORMAT CSV",
source = pg_quote(&select_sql),
dest = pg_quote(dest.as_url().as_str()),
credentials = credentials_sql(from_args)?,
);
let unload_stmt = client.prepare(&unload_sql).compat().await?;
client
.execute(&unload_stmt, &[])
.compat()
.await
.with_context(|_| format!("error copying {} to {}", table_name, dest))?;
Ok(vec![dest.boxed()])
}