use std::{collections::HashMap, fmt, iter::FromIterator, str::FromStr};
use super::{catalog, PgColumn, TableName};
use crate::common::*;
use crate::schema::Column;
use crate::separator::Separator;
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PgCreateTable {
pub(crate) name: String,
pub(crate) columns: Vec<PgColumn>,
pub(crate) if_not_exists: bool,
pub(crate) temporary: bool,
}
impl PgCreateTable {
pub(crate) fn from_name_and_columns(
name: String,
columns: &[Column],
) -> Result<PgCreateTable> {
let pg_columns = columns
.iter()
.map(|c| PgColumn::from_column(c))
.collect::<Result<Vec<PgColumn>>>()?;
Ok(PgCreateTable {
name,
columns: pg_columns,
if_not_exists: false,
temporary: false,
})
}
pub(crate) fn from_pg_catalog(
database_url: &Url,
full_table_name: &str,
) -> Result<PgCreateTable> {
catalog::fetch_from_url(database_url, full_table_name)
}
pub(crate) fn to_table(&self) -> Result<Table> {
let columns = self
.columns
.iter()
.map(|c| c.to_column())
.collect::<Result<Vec<Column>>>()?;
Ok(Table {
name: self.name.clone(),
columns,
})
}
pub(crate) fn aligned_with(
&self,
other_table: &PgCreateTable,
) -> Result<PgCreateTable> {
let column_map = HashMap::<&str, &PgColumn>::from_iter(
self.columns.iter().map(|c| (&c.name[..], c)),
);
Ok(PgCreateTable {
name: self.name.clone(),
columns: other_table
.columns
.iter()
.map(|c| {
if let Some(&col) = column_map.get(&c.name[..]) {
Ok(col.to_owned())
} else {
Err(format_err!(
"could not find column {} in destination table",
c.name
))
}
})
.collect::<Result<Vec<_>>>()?,
if_not_exists: self.if_not_exists,
temporary: self.temporary,
})
}
pub(crate) fn write_export_sql(
&self,
f: &mut dyn Write,
source_args: &SourceArguments<Verified>,
) -> Result<()> {
write!(f, "COPY (")?;
self.write_export_select_sql(f, source_args)?;
write!(f, ") TO STDOUT WITH CSV HEADER")?;
Ok(())
}
pub(crate) fn write_export_select_sql(
&self,
f: &mut dyn Write,
source_args: &SourceArguments<Verified>,
) -> Result<()> {
write!(f, "SELECT ")?;
if self.columns.is_empty() {
return Err(format_err!("cannot export 0 columns"));
}
let mut sep = Separator::new(",");
for col in &self.columns {
write!(f, "{}", sep.display())?;
col.write_export_select_expr(f)?;
}
write!(f, " FROM {}", TableName(&self.name))?;
if let Some(where_clause) = source_args.where_clause() {
write!(f, " WHERE ({})", where_clause)?;
}
Ok(())
}
pub(crate) fn write_count_sql(
&self,
f: &mut dyn Write,
source_args: &SourceArguments<Verified>,
) -> Result<()> {
writeln!(f, "SELECT COUNT(*)")?;
writeln!(f, " FROM {}", TableName(&self.name))?;
if let Some(where_clause) = source_args.where_clause() {
writeln!(f, " WHERE ({})", where_clause)?;
}
Ok(())
}
}
impl fmt::Display for PgCreateTable {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "CREATE")?;
if self.temporary {
write!(f, " TEMPORARY")?;
}
write!(f, " TABLE")?;
if self.if_not_exists {
write!(f, " IF NOT EXISTS")?;
}
writeln!(f, " {} (", TableName(&self.name))?;
for (idx, col) in self.columns.iter().enumerate() {
write!(f, " {}", col)?;
if idx + 1 == self.columns.len() {
writeln!(f)?;
} else {
writeln!(f, ",")?;
}
}
writeln!(f, ");")?;
Ok(())
}
}
#[allow(clippy::all, rust_2018_idioms, elided_lifetimes_in_paths)]
mod grammar {
include!(concat!(env!("OUT_DIR"), "/create_table_sql.rs"));
}
impl FromStr for PgCreateTable {
type Err = Error;
fn from_str(s: &str) -> Result<Self> {
Ok(grammar::create_table(s)
.context("error parsing Postgres `CREATE TABLE`")?)
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::schema::{Column, DataType, Srid};
use std::str;
#[test]
fn simple_table() {
let input = include_str!("create_table_sql_example.sql");
let pg_table: PgCreateTable = input.parse().unwrap();
let table = pg_table.to_table().unwrap();
let expected = Table {
name: "example".to_string(),
columns: vec![
Column {
name: "a".to_string(),
is_nullable: true,
data_type: DataType::Text,
comment: None,
},
Column {
name: "b".to_string(),
is_nullable: true,
data_type: DataType::Int32,
comment: None,
},
Column {
name: "c".to_string(),
is_nullable: false,
data_type: DataType::Uuid,
comment: None,
},
Column {
name: "d".to_string(),
is_nullable: true,
data_type: DataType::Date,
comment: None,
},
Column {
name: "e".to_string(),
is_nullable: true,
data_type: DataType::Float64,
comment: None,
},
Column {
name: "f".to_string(),
is_nullable: true,
data_type: DataType::Array(Box::new(DataType::Text)),
comment: None,
},
Column {
name: "g".to_string(),
is_nullable: true,
data_type: DataType::Array(Box::new(DataType::Int32)),
comment: None,
},
Column {
name: "h".to_string(),
is_nullable: true,
data_type: DataType::GeoJson(Srid::wgs84()),
comment: None,
},
Column {
name: "i".to_string(),
is_nullable: true,
data_type: DataType::GeoJson(Srid::new(3857)),
comment: None,
},
Column {
name: "j".to_string(),
is_nullable: true,
data_type: DataType::Int16,
comment: None,
},
Column {
name: "k".to_string(),
is_nullable: true,
data_type: DataType::TimestampWithoutTimeZone,
comment: None,
},
],
};
assert_eq!(table, expected);
let mut out = vec![];
write!(&mut out, "{}", &pg_table).expect("error writing table");
let pg_parsed_again: PgCreateTable = str::from_utf8(&out)
.unwrap()
.parse()
.expect("error parsing table");
let parsed_again = pg_parsed_again.to_table().unwrap();
assert_eq!(parsed_again, expected);
}
}