use itertools::Itertools;
use std::{collections::HashSet, io::prelude::*, iter::FromIterator, str};
use super::{connect, csv_to_binary::copy_csv_to_pg_binary, Client, PostgresLocator};
use crate::common::*;
use crate::drivers::postgres_shared::{Ident, PgCreateTable, TableName};
use crate::transform::spawn_sync_transform;
/// Issue `DROP TABLE IF EXISTS` for `table.name`.
///
/// The name is rendered through `TableName` before being spliced into the
/// SQL. A failure while executing the statement is annotated with the table
/// name before being propagated.
async fn drop_table_if_exists(
    ctx: &Context,
    client: &mut Client,
    table: &PgCreateTable,
) -> Result<()> {
    let name = &table.name;
    debug!(ctx.log(), "deleting table {} if exists", name);
    let sql = format!("DROP TABLE IF EXISTS {}", TableName(name));
    let statement = client.prepare(&sql).compat().await?;
    let outcome = client.execute(&statement, &[]).compat().await;
    outcome.with_context(|_| format!("error deleting existing {}", name))?;
    Ok(())
}
/// Execute the `CREATE TABLE` statement produced by `table`'s `Display`
/// implementation.
///
/// Both the table name and the full generated SQL are logged at debug level.
/// An execution failure is annotated with the table name.
async fn create_table(
    ctx: &Context,
    client: &mut Client,
    table: &PgCreateTable,
) -> Result<()> {
    let name = &table.name;
    debug!(ctx.log(), "create table {}", name);
    let sql = table.to_string();
    debug!(ctx.log(), "CREATE TABLE SQL: {}", sql);
    let statement = client.prepare(&sql).compat().await?;
    let outcome = client.execute(&statement, &[]).compat().await;
    outcome.with_context(|_| format!("error creating {}", name))?;
    Ok(())
}
/// Create a `TEMPORARY` copy of `table`'s definition under a fresh name and
/// return that definition.
///
/// The new name is the unqualified part of `table.name` (from
/// `TableName::split`), followed by `_temp_` and a random tag. The copy is
/// created with `if_not_exists = false`.
async fn create_temp_table_for(
    ctx: &Context,
    client: &mut Client,
    table: &PgCreateTable,
) -> Result<PgCreateTable> {
    // Derive the temporary name first; a malformed name fails before any SQL runs.
    let temp_name = {
        let qualified = TableName(&table.name);
        let (_, base_name) = qualified.split()?;
        format!("{}_temp_{}", base_name, TemporaryStorage::random_tag())
    };
    let mut temp_table = table.to_owned();
    temp_table.name = temp_name;
    temp_table.temporary = true;
    temp_table.if_not_exists = false;
    create_table(ctx, client, &temp_table).await?;
    Ok(temp_table)
}
pub(crate) async fn prepare_table(
ctx: &Context,
client: &mut Client,
mut table: PgCreateTable,
if_exists: &IfExists,
) -> Result<()> {
match if_exists {
IfExists::Overwrite => {
drop_table_if_exists(ctx, client, &table).await?;
table.if_not_exists = false;
}
IfExists::Append => {
table.if_not_exists = true;
}
IfExists::Error => {
table.if_not_exists = false;
}
IfExists::Upsert(_keys) => {
table.if_not_exists = true;
}
}
create_table(ctx, client, &table).await
}
/// Build a `COPY <table> (<columns>) FROM STDIN WITH <data_format>` statement
/// that lists every column of `table`, in order.
///
/// The table name is rendered with `TableName` and each column with `Ident`.
/// `data_format` is inserted verbatim; callers in this file pass `"BINARY"`.
///
/// # Errors
///
/// Propagates any error from `writeln!`. Writing into a `Vec<u8>` does not
/// fail in practice.
fn copy_from_sql(table: &PgCreateTable, data_format: &str) -> Result<String> {
    let mut copy_sql_buff = vec![];
    writeln!(&mut copy_sql_buff, "COPY {} (", TableName(&table.name))?;
    for (idx, col) in table.columns.iter().enumerate() {
        // Every column except the last one is followed by a comma.
        if idx + 1 == table.columns.len() {
            writeln!(&mut copy_sql_buff, " {}", Ident(&col.name))?;
        } else {
            writeln!(&mut copy_sql_buff, " {},", Ident(&col.name))?;
        }
    }
    writeln!(&mut copy_sql_buff, ") FROM STDIN WITH {}", data_format)?;
    // Every fragment comes from `Display` output (always UTF-8), so a failure
    // here would indicate a bug in this function.
    let copy_sql = str::from_utf8(&copy_sql_buff)
        .expect("generated SQL should always be UTF-8")
        .to_owned();
    Ok(copy_sql)
}
/// Load `stream`, which is expected to contain PostgreSQL binary COPY data,
/// into `dest` using `COPY ... FROM STDIN WITH BINARY`.
///
/// The column list comes from `dest.columns` (see `copy_from_sql`), so the
/// encoded rows must match that column order. A failure during the copy is
/// annotated with the destination table name.
async fn copy_from_stream<'a>(
    ctx: &'a Context,
    client: &'a mut Client,
    dest: &'a PgCreateTable,
    stream: BoxStream<BytesMut>,
) -> Result<()> {
    debug!(ctx.log(), "copying data into {:?}", dest.name);
    // Named `sql` so the local does not shadow the `copy_from_sql` function.
    let sql = copy_from_sql(dest, "BINARY")?;
    let stmt = client.prepare(&sql).compat().await?;
    client
        .copy_in(&stmt, &[], stream)
        .compat()
        .await
        .with_context(|_| format!("error copying data into {}", dest.name))?;
    Ok(())
}
fn upsert_sql(
src_table: &PgCreateTable,
dest_table: &PgCreateTable,
upsert_keys: &[String],
) -> Result<String> {
let upsert_keys_set: HashSet<&str> =
HashSet::from_iter(upsert_keys.iter().map(|k| &k[..]));
let value_keys = dest_table
.columns
.iter()
.filter_map(|c| {
if upsert_keys_set.contains(&c.name[..]) {
None
} else {
Some(&c.name[..])
}
})
.collect::<Vec<_>>();
Ok(format!(
r#"
INSERT INTO {dest_table} ({all_columns}) (
SELECT {all_columns} FROM {src_table}
)
ON CONFLICT ({key_columns})
DO UPDATE SET
{value_updates}
"#,
dest_table = Ident(&dest_table.name),
src_table = Ident(&src_table.name),
all_columns = dest_table.columns.iter().map(|c| Ident(&c.name)).join(", "),
key_columns = upsert_keys.iter().map(|k| Ident(k)).join(", "),
value_updates = value_keys
.iter()
.map(|vk| format!("{name} = EXCLUDED.{name}", name = vk))
.join(",\n "),
))
}
/// Upsert every row of `src_table` into `dest_table`, resolving conflicts on
/// `upsert_keys`, using the statement generated by `upsert_sql`.
///
/// The generated SQL is logged at debug level before it is prepared and
/// executed. An execution failure is annotated with both table names.
pub(crate) async fn upsert_from(
    ctx: &Context,
    client: &mut Client,
    src_table: &PgCreateTable,
    dest_table: &PgCreateTable,
    upsert_keys: &[String],
) -> Result<()> {
    let src_name = &src_table.name;
    let dest_name = &dest_table.name;
    let sql = upsert_sql(src_table, dest_table, upsert_keys)?;
    debug!(
        ctx.log(),
        "upserting from {} to {} with {}", src_name, dest_name, sql,
    );
    let statement = client.prepare(&sql).compat().await?;
    let outcome = client.execute(&statement, &[]).compat().await;
    outcome.with_context(|_| {
        format!("error upserting from {} to {}", src_name, dest_name)
    })?;
    Ok(())
}
/// Write a stream of CSV streams into the PostgreSQL table named by `dest`.
///
/// Setup runs before this function returns:
/// - `shared_args` and `dest_args` are verified against
///   `PostgresLocator::features()`.
/// - A `PgCreateTable` is built from `dest.table_name` and the schema's
///   columns.
/// - A client is connected and the table is prepared according to
///   `if_exists` (see `prepare_table`).
/// - The table definition is then re-read with `from_pg_catalog` and passed
///   through `aligned_with(&dest_table)`.
///
/// The returned stream yields exactly one future. When awaited, it processes
/// the CSV streams one at a time, in order. Each one is converted to
/// PostgreSQL binary COPY data via `spawn_sync_transform` and
/// `copy_csv_to_pg_binary`, using `real_dest_table`.
/// - For `IfExists::Upsert`, the data is copied into a fresh temporary table,
///   upserted into the destination, and the temporary table is dropped.
/// - Otherwise it is copied straight into the destination.
///
/// When the outer stream ends, the future resolves to `dest` boxed as a
/// locator. The first error aborts it, whether from reading the outer stream,
/// starting the transform, or any SQL step.
pub(crate) async fn write_local_data_helper(
    ctx: Context,
    dest: PostgresLocator,
    mut data: BoxStream<CsvStream>,
    shared_args: SharedArguments<Unverified>,
    dest_args: DestinationArguments<Unverified>,
) -> Result<BoxStream<BoxFuture<BoxLocator>>> {
    let shared_args = shared_args.verify(PostgresLocator::features())?;
    let dest_args = dest_args.verify(PostgresLocator::features())?;
    let schema = shared_args.schema();
    let if_exists = dest_args.if_exists().to_owned();
    let url = dest.url.clone();
    let table_name = dest.table_name.clone();
    let ctx = ctx.child(o!("table" => table_name.clone()));
    debug!(
        ctx.log(),
        "writing data streams to {} table {}", url, table_name,
    );
    // Table definition derived from the incoming schema; used for CREATE,
    // the COPY column list and the upsert target.
    let dest_table =
        PgCreateTable::from_name_and_columns(table_name.clone(), &schema.columns)?;
    let mut client = connect(ctx.clone(), url.clone()).await?;
    prepare_table(&ctx, &mut client, dest_table.clone(), &if_exists).await?;
    // Re-read the table as it actually exists in the database. This matters
    // for `Append`/`Upsert`, where an existing table was kept.
    // NOTE(review): `from_pg_catalog` is called without `.await`; if it does
    // network I/O synchronously it blocks the async executor here. Confirm.
    let mut real_dest_table =
        PgCreateTable::from_pg_catalog(dest.url(), dest.table_name())?;
    // Presumably makes the catalog columns line up with `dest_table`'s
    // columns. TODO: confirm against `aligned_with`.
    real_dest_table = real_dest_table.aligned_with(&dest_table)?;
    let fut = async move {
        // Take one CSV stream at a time from `data`; streams are loaded
        // sequentially over the single `client` connection.
        loop {
            match data.into_future().compat().await {
                Err((err, _rest_of_stream)) => {
                    debug!(ctx.log(), "error reading stream of streams: {}", err);
                    return Err(err);
                }
                Ok((Some(csv_stream), rest_of_stream)) => {
                    data = rest_of_stream;
                    let ctx = ctx.child(o!("stream" => csv_stream.name.clone()));
                    // The transform closure takes ownership of its own copy
                    // of the table definition.
                    let transform_table = real_dest_table.clone();
                    let binary_stream = spawn_sync_transform(
                        ctx.clone(),
                        "copy_csv_to_pg_binary".to_owned(),
                        csv_stream.data,
                        move |_ctx, rdr, wtr| {
                            copy_csv_to_pg_binary(&transform_table, rdr, wtr)
                        },
                    )?;
                    if let IfExists::Upsert(cols) = &if_exists {
                        // NOTE(review): the temp table is built from
                        // `dest_table`, but the binary data was encoded with
                        // `real_dest_table`. Confirm their column types always
                        // agree.
                        let temp_table =
                            create_temp_table_for(&ctx, &mut client, &dest_table)
                                .await?;
                        copy_from_stream(
                            &ctx,
                            &mut client,
                            &temp_table,
                            binary_stream,
                        )
                        .await?;
                        upsert_from(
                            &ctx,
                            &mut client,
                            &temp_table,
                            &dest_table,
                            &cols,
                        )
                        .await?;
                        // If a step above fails, this explicit drop is
                        // skipped. The table was created TEMPORARY, so
                        // PostgreSQL discards it when the session ends.
                        drop_table_if_exists(&ctx, &mut client, &temp_table).await?;
                    } else {
                        // NOTE(review): the COPY column list comes from
                        // `dest_table` while the rows were encoded using
                        // `real_dest_table`. This assumes `aligned_with` keeps
                        // the same column names and order. Confirm.
                        copy_from_stream(
                            &ctx,
                            &mut client,
                            &dest_table,
                            binary_stream,
                        )
                        .await?;
                    }
                }
                Ok((None, _rest_of_stream)) => {
                    // All input streams have been written.
                    return Ok(dest.boxed());
                }
            }
        }
    };
    Ok(box_stream_once(Ok(fut.boxed())))
}