use itertools::Itertools;
use serde_json;
use std::{
collections::{HashMap, HashSet},
io::Write,
};
use super::{BqColumn, ColumnBigQueryExt, Ident, Mode, TableName, Usage};
use crate::common::*;
use crate::schema::{Column, Table};
use crate::uniquifier::Uniquifier;
/// BigQuery-specific extension methods. This file implements the trait for
/// `Table`.
pub(crate) trait TableBigQueryExt {
/// Report whether the implementor can be imported from CSV.
///
/// The `Table` implementation in this file answers `Ok(true)` only when every
/// column's own `bigquery_can_import_from_csv` check answers `Ok(true)`.
fn bigquery_can_import_from_csv(&self) -> Result<bool>;
}
impl TableBigQueryExt for Table {
    /// A table can be imported from CSV only if every one of its columns can.
    ///
    /// Columns are checked in order. The scan stops at the first column that
    /// reports `false`, and any error from a column check is propagated
    /// immediately. A table with no columns yields `Ok(true)`.
    fn bigquery_can_import_from_csv(&self) -> Result<bool> {
        for column in self.columns.iter() {
            if column.bigquery_can_import_from_csv()? {
                continue;
            }
            // One non-importable column is enough to rule out the whole table.
            return Ok(false);
        }
        Ok(true)
    }
}
/// A table described in BigQuery terms: a table name plus a list of
/// `BqColumn` values.
pub(crate) struct BqTable {
/// The name of this table. Rendered via `dotted()` when generating SQL.
pub(crate) name: TableName,
/// The columns of this table, in order. `write_json_schema` serializes this
/// list as JSON.
pub(crate) columns: Vec<BqColumn>,
}
impl BqTable {
/// Build a `BqTable` called `name` from a slice of portable `columns`.
///
/// Every column is converted with `BqColumn::for_column`, passing along
/// `usage` and a single `Uniquifier` that is shared by all columns of the
/// table. Conversion stops at the first column that fails, and that error is
/// returned.
pub(crate) fn for_table_name_and_columns(
    name: TableName,
    columns: &[Column],
    usage: Usage,
) -> Result<BqTable> {
    let mut uniquifier = Uniquifier::default();
    let mut bq_columns = Vec::with_capacity(columns.len());
    for column in columns {
        let bq_column = BqColumn::for_column(column, usage, &mut uniquifier)?;
        bq_columns.push(bq_column);
    }
    Ok(BqTable {
        name,
        columns: bq_columns,
    })
}
/// Convert this BigQuery table back into a portable `Table`.
///
/// Each `BqColumn` is converted with `to_column`; the first conversion error
/// aborts the whole conversion. The resulting table's name is the string
/// form of `self.name`.
pub(crate) fn to_table(&self) -> Result<Table> {
    let mut columns = Vec::with_capacity(self.columns.len());
    for bq_column in &self.columns {
        columns.push(bq_column.to_column()?);
    }
    let name = self.name.to_string();
    Ok(Table { name, columns })
}
/// Borrow the name of this table.
pub(crate) fn name(&self) -> &TableName {
&self.name
}
/// Write this table's column list to `f` as pretty-printed JSON.
///
/// Only `self.columns` is serialized; the table name is not part of the
/// output. Serialization and I/O failures are returned as errors.
pub(crate) fn write_json_schema(&self, f: &mut dyn Write) -> Result<()> {
    let schema = &self.columns;
    Ok(serde_json::to_writer_pretty(f, schema)?)
}
/// Write SQL to `f` that selects this table's columns out of
/// `source_table_name`, passing each column through its import expression.
///
/// The output consists of, in order:
///
/// 1. whatever each column's `write_import_udf` emits (called once per
///    column with the column's position in `self.columns`),
/// 2. `SELECT `, followed by one import select expression per column,
///    separated by `,`,
/// 3. ` FROM ` and the dotted source table name wrapped in `Ident`.
pub(crate) fn write_import_sql(
    &self,
    source_table_name: &TableName,
    f: &mut dyn Write,
) -> Result<()> {
    // Helper definitions must appear before the query that uses them.
    for (position, column) in self.columns.iter().enumerate() {
        column.write_import_udf(f, position)?;
    }

    write!(f, "SELECT ")?;
    let mut numbered_columns = self.columns.iter().enumerate();
    if let Some((position, first)) = numbered_columns.next() {
        first.write_import_select_expr(f, position)?;
        // Every remaining column is preceded by a separator.
        for (position, column) in numbered_columns {
            write!(f, ",")?;
            column.write_import_select_expr(f, position)?;
        }
    }

    let source = source_table_name.dotted().to_string();
    write!(f, " FROM {}", Ident(&source))?;
    Ok(())
}
/// Write SQL to `f` that upserts the rows of `source_table_name` into this
/// table using a `MERGE` statement.
///
/// `merge_keys` are matched against each column's `external_name`. Rows whose
/// key columns all compare equal are updated (every non-key column is
/// assigned from the source row); all other rows are inserted. Before the
/// `MERGE` statement itself, each column's `write_import_udf` output is
/// written, using the column's position in `self.columns` as its index.
///
/// # Errors
///
/// Returns an error if:
///
/// - any column has no `external_name`,
/// - a merge key does not name a column of this table,
/// - a merge key column's mode is not `Mode::Required`,
/// - writing to `f` fails.
///
/// # Panics
///
/// Panics if a column's `write_import_expr` fails while writing to an
/// in-memory buffer, or produces bytes that are not valid UTF-8.
pub(crate) fn write_merge_sql(
    &self,
    source_table_name: &TableName,
    merge_keys: &[String],
    f: &mut dyn Write,
) -> Result<()> {
    // Index our columns by external name, remembering each column's position
    // in `self.columns`. The import UDFs below are written using that
    // position, so every import expression for a column must be generated
    // with the same position -- including the ones for merge keys.
    let mut column_map = HashMap::new();
    for (idx, col) in self.columns.iter().enumerate() {
        let external_name =
            col.external_name.as_ref().map(|n| &n[..]).ok_or_else(|| {
                format_err!("missing external name for column {:?}", col.name)
            })?;
        column_map.insert(external_name, (idx, col));
    }

    // Resolve `merge_keys` into `(position in self.columns, column)` pairs.
    let merge_keys = merge_keys
        .iter()
        .map(|key| -> Result<(usize, &BqColumn)> {
            column_map
                .get(&key[..])
                .cloned()
                .ok_or_else(|| format_err!("upsert key {} is not in table", key))
        })
        .collect::<Result<Vec<(usize, &BqColumn)>>>()?;

    // Key columns are compared with `=`, so they must not be nullable.
    for (_, merge_key) in &merge_keys {
        if merge_key.mode != Mode::Required {
            return Err(format_err!(
                "BigQuery cannot upsert on {:?} because it is not REQUIRED (aka NOT NULL)",
                merge_key.name,
            ));
        }
    }

    // Names of the key columns, used to exclude them from the `UPDATE SET`
    // list.
    let merge_key_table = merge_keys
        .iter()
        .map(|(_, c)| &c.name[..])
        .collect::<HashSet<_>>();

    // Write the import helpers for every column before the statement that
    // uses them.
    for (idx, col) in self.columns.iter().enumerate() {
        col.write_import_udf(f, idx)?;
    }

    // Render the import expression for a column, reading from the `temp`
    // alias of the source table. `idx` must be the column's position in
    // `self.columns`.
    let col_import_expr = |c: &BqColumn, idx: usize| -> String {
        let mut buf = vec![];
        c.write_import_expr(&mut buf, idx, Some("temp."))
            .expect("should always be able to write col_import_expr");
        String::from_utf8(buf).expect("col_import_expr should be UTF-8")
    };

    write!(
        f,
        r#"
MERGE INTO {dest_table} AS dest
USING {temp_table} AS temp
ON
{key_comparisons}
WHEN MATCHED THEN UPDATE SET
{updates}
WHEN NOT MATCHED THEN INSERT (
{columns}
) VALUES (
{values}
)"#,
        dest_table = Ident(&self.name().dotted().to_string()),
        temp_table = Ident(&source_table_name.dotted().to_string()),
        // Use the position recorded in `column_map`, NOT the position of the
        // key within `merge_keys`.
        key_comparisons = merge_keys
            .iter()
            .map(|&(idx, c)| format!(
                "dest.{col} = {expr}",
                col = Ident(&c.name),
                expr = col_import_expr(c, idx),
            ))
            .join(" AND\n "),
        updates = self
            .columns
            .iter()
            .enumerate()
            .filter_map(|(idx, c)| if merge_key_table.contains(&c.name[..]) {
                None
            } else {
                Some(format!(
                    "{col} = {expr}",
                    col = Ident(&c.name),
                    expr = col_import_expr(c, idx),
                ))
            })
            .join(",\n "),
        columns = self.columns.iter().map(|c| Ident(&c.name)).join(",\n "),
        values = self
            .columns
            .iter()
            .enumerate()
            .map(|(idx, c)| col_import_expr(c, idx))
            .join(",\n "),
    )?;
    Ok(())
}
/// Write a `SELECT` statement to `f` that exports this table.
///
/// The statement lists one export select expression per column (separated
/// by `,`), reads from this table's dotted name wrapped in `Ident`, and, when
/// `source_args` carries a `WHERE` clause, appends it wrapped in
/// parentheses. The clause text is inserted as-is.
pub(crate) fn write_export_sql(
    &self,
    source_args: &SourceArguments<Verified>,
    f: &mut dyn Write,
) -> Result<()> {
    write!(f, "SELECT ")?;
    let mut remaining = self.columns.iter();
    if let Some(first) = remaining.next() {
        first.write_export_select_expr(f)?;
        // Every remaining column is preceded by a separator.
        for column in remaining {
            write!(f, ",")?;
            column.write_export_select_expr(f)?;
        }
    }

    let table = self.name.dotted().to_string();
    write!(f, " FROM {}", Ident(&table))?;

    if let Some(filter) = source_args.where_clause() {
        write!(f, " WHERE ({})", filter)?;
    }
    Ok(())
}
/// Write a `SELECT COUNT(*)` statement to `f` that counts the rows of this
/// table.
///
/// The count is aliased as `count`. When `source_args` carries a `WHERE`
/// clause, it is appended wrapped in parentheses; the clause text is
/// inserted as-is.
pub(crate) fn write_count_sql(
    &self,
    source_args: &SourceArguments<Verified>,
    f: &mut dyn Write,
) -> Result<()> {
    f.write_all(b"SELECT COUNT(*) AS `count`")?;

    let table = self.name.dotted().to_string();
    write!(f, " FROM {}", Ident(&table))?;

    if let Some(filter) = source_args.where_clause() {
        write!(f, " WHERE ({})", filter)?;
    }
    Ok(())
}
}