use itertools::Itertools;
use std::{
collections::{HashMap, HashSet},
fmt,
};
use super::{PgColumn, PgDataType, PgName, PgScalarDataType};
use crate::common::*;
use crate::schema::Column;
use crate::separator::Separator;
/// Two-state flag saying whether a catalog check should be performed.
///
/// Values are normally obtained through the `From<&IfExists>` conversion.
///
/// NOTE(review): presumably "catalog" refers to the PostgreSQL system
/// catalog holding the existing table definition — confirm against callers.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum CheckCatalog {
    /// Perform the catalog check.
    Yes,
    /// Skip the catalog check.
    No,
}
impl From<&IfExists> for CheckCatalog {
fn from(if_exists: &IfExists) -> CheckCatalog {
match if_exists {
IfExists::Error | IfExists::Overwrite => CheckCatalog::No,
IfExists::Append | IfExists::Upsert(_) => CheckCatalog::Yes,
}
}
}
/// A PostgreSQL `CREATE TABLE` statement; its `Display` impl renders the
/// SQL text.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PgCreateTable {
/// Name of the table.
pub(crate) name: PgName,
/// Columns of the table, in the order they are rendered.
pub(crate) columns: Vec<PgColumn>,
/// When `true`, `Display` emits `IF NOT EXISTS` after `TABLE`.
pub(crate) if_not_exists: bool,
/// When `true`, `Display` emits `TEMPORARY` after `CREATE`.
pub(crate) temporary: bool,
}
impl PgCreateTable {
/// Build a `PgCreateTable` named `table_name` from schema columns.
///
/// Each entry of `columns` is converted with `PgColumn::from_column`,
/// receiving `schema` unchanged. The resulting table has `if_not_exists`
/// and `temporary` both set to `false`.
///
/// # Errors
///
/// Returns the first error produced by a column conversion.
pub(crate) fn from_name_and_columns(
    schema: &Schema,
    table_name: PgName,
    columns: &[Column],
) -> Result<PgCreateTable> {
    let mut converted: Vec<PgColumn> = Vec::with_capacity(columns.len());
    for column in columns {
        converted.push(PgColumn::from_column(schema, column)?);
    }
    Ok(PgCreateTable {
        name: table_name,
        columns: converted,
        if_not_exists: false,
        temporary: false,
    })
}
/// Convert this PostgreSQL table definition into a `Table`.
///
/// The table is named with `self.name.unquoted()` and every column is
/// converted with `to_column`.
///
/// # Errors
///
/// Returns the first error produced by a column conversion.
pub(crate) fn to_table(&self) -> Result<Table> {
    let mut columns: Vec<Column> = Vec::with_capacity(self.columns.len());
    for pg_column in &self.columns {
        columns.push(pg_column.to_column()?);
    }
    let name = self.name.unquoted();
    Ok(Table { name, columns })
}
/// Build a copy of this table whose columns follow the names and order of
/// the columns in `other_table`.
///
/// For every column of `other_table`, the column of `self` with the same
/// name is cloned into the result. The result keeps `self`'s `name`,
/// `if_not_exists` and `temporary`. Columns of `self` that do not appear
/// in `other_table` are dropped.
///
/// # Errors
///
/// Fails if `other_table` has a column whose name does not exist in
/// `self`. The message lists `self`'s column names in declaration order.
pub(crate) fn aligned_with(
    &self,
    other_table: &PgCreateTable,
) -> Result<PgCreateTable> {
    // Index our own columns by name for O(1) lookup.
    let column_map = self
        .columns
        .iter()
        .map(|c| (&c.name[..], c))
        .collect::<HashMap<_, _>>();

    let columns = other_table
        .columns
        .iter()
        .map(|c| {
            column_map
                .get(&c.name[..])
                .map(|&col| col.clone())
                .ok_or_else(|| {
                    format_err!(
                        "could not find column {} in destination table: {}",
                        c.name,
                        // List names from `self.columns` rather than from
                        // the map keys: `HashMap` iteration order is
                        // arbitrary, which made this message differ
                        // from run to run.
                        self.columns.iter().map(|col| &col.name).join(", "),
                    )
                })
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(PgCreateTable {
        name: self.name.clone(),
        columns,
        if_not_exists: self.if_not_exists,
        temporary: self.temporary,
    })
}
/// Collect the names of all `PgScalarDataType::Named` types used by this
/// table's columns.
///
/// A column contributes its name whether the named type is the column's
/// scalar type or the `ty` of an array type. Duplicates collapse because
/// the result is a set.
pub(crate) fn named_type_names(&self) -> HashSet<&PgName> {
    self.columns
        .iter()
        .filter_map(|column| {
            // Both variants carry a scalar type; pick it out either way.
            let scalar = match &column.data_type {
                PgDataType::Array { ty, .. } | PgDataType::Scalar(ty) => ty,
            };
            match scalar {
                PgScalarDataType::Named(type_name) => Some(type_name),
                _ => None,
            }
        })
        .collect()
}
/// Write a `COPY (...) TO STDOUT WITH CSV HEADER` statement to `f`.
///
/// The parenthesized query is produced by `write_export_select_sql`, which
/// receives `source_args` unchanged.
///
/// # Errors
///
/// Propagates write failures and any error from
/// `write_export_select_sql`.
pub(crate) fn write_export_sql(
    &self,
    f: &mut dyn Write,
    source_args: &SourceArguments<Verified>,
) -> Result<()> {
    // Opening half of the wrapper.
    write!(f, "COPY (")?;
    // The query whose rows are exported.
    self.write_export_select_sql(f, source_args)?;
    // Closing half of the wrapper; its result is the function's result.
    write!(f, ") TO STDOUT WITH CSV HEADER").map_err(Into::into)
}
/// Write a `SELECT` statement that exports every column of this table to
/// `f`.
///
/// Each column contributes the expression written by its
/// `write_export_select_expr`. If `source_args` carries a `WHERE` clause,
/// it is appended in parentheses.
///
/// # Errors
///
/// Fails with "cannot export 0 columns" when the table has no columns; in
/// that case nothing is written to `f`. Also propagates write failures
/// and errors from `write_export_select_expr`.
pub(crate) fn write_export_select_sql(
    &self,
    f: &mut dyn Write,
    source_args: &SourceArguments<Verified>,
) -> Result<()> {
    // Validate before emitting anything, so the error path never leaves a
    // dangling `SELECT ` in the output.
    if self.columns.is_empty() {
        return Err(format_err!("cannot export 0 columns"));
    }

    write!(f, "SELECT ")?;
    // `sep.display()` is written before every column expression;
    // presumably it is empty on first use — see `Separator`.
    let mut sep = Separator::new(",");
    for col in &self.columns {
        write!(f, "{}", sep.display())?;
        col.write_export_select_expr(f)?;
    }
    write!(f, " FROM {}", &self.name.quoted())?;
    if let Some(where_clause) = source_args.where_clause() {
        // NOTE(review): the clause text is spliced into the SQL verbatim;
        // presumably it is trusted/validated upstream — confirm.
        write!(f, " WHERE ({})", where_clause)?;
    }
    Ok(())
}
/// Write a `SELECT COUNT(*)` query over this table to `f`, one clause per
/// line.
///
/// If `source_args` carries a `WHERE` clause, it is appended in
/// parentheses on a third line.
///
/// # Errors
///
/// Propagates write failures.
pub(crate) fn write_count_sql(
    &self,
    f: &mut dyn Write,
    source_args: &SourceArguments<Verified>,
) -> Result<()> {
    writeln!(f, "SELECT COUNT(*)")?;

    let quoted_name = self.name.quoted();
    writeln!(f, " FROM {}", quoted_name)?;

    // Optional row filter.
    if let Some(filter) = source_args.where_clause() {
        writeln!(f, " WHERE ({})", filter)?;
    }
    Ok(())
}
}
/// Render the table as a `CREATE TABLE` SQL statement.
impl fmt::Display for PgCreateTable {
    /// Emits `CREATE [TEMPORARY] TABLE [IF NOT EXISTS] <name> (`, then one
    /// line per column (comma-terminated except for the last), then `);`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Statement header, with optional modifiers.
        f.write_str("CREATE")?;
        if self.temporary {
            f.write_str(" TEMPORARY")?;
        }
        f.write_str(" TABLE")?;
        if self.if_not_exists {
            f.write_str(" IF NOT EXISTS")?;
        }
        writeln!(f, " {} (", self.name.quoted())?;

        // Column list: every line but the last ends with a comma.
        let mut remaining = self.columns.len();
        for column in &self.columns {
            remaining -= 1;
            write!(f, " {}", column)?;
            f.write_str(if remaining == 0 { "\n" } else { ",\n" })?;
        }

        f.write_str(");\n")
    }
}