#![allow(unused_variables)]
use itertools::Itertools;
use log::{debug, info};
use postgres::Client;
use sqlparser::ast;
use sqlparser::ast::{AlterTableOperation, ColumnDef, DataType, Ident, ObjectName, ObjectType, Statement};
use sqlparser::dialect::PostgreSqlDialect;
use crate::{file_loader, SQLParser};
use crate::plan::{Error, Plan};
pub fn apply_file(files: Vec<String>, schema: &String, client: &mut Client) -> Result<Plan, Error> {
let mut plan = Plan::new();
let mut all_statements: Vec<ast::Statement> = vec![];
debug!("files: {:?}",files);
create_schema_if_not_exist_or_public(&schema, client, &mut plan);
let dialect = PostgreSqlDialect {}; for file in files {
let contents = file_loader::load(&file);
let ast: Vec<ast::Statement> = SQLParser::parse_sql(&dialect, &contents).unwrap();
info!("AST: {:?}", ast);
all_statements.extend(ast);
}
fetch_table_names_all_from_file(&mut plan, &all_statements);
plan.table_names_dup_from_file = plan.table_names_all_from_file.clone().into_iter().duplicates().collect::<Vec<String>>();
if !plan.table_names_dup_from_file.is_empty() { return Err(Error::DuplicateTableName { table_names: plan.table_names_dup_from_file.get(0).unwrap().to_string() }); }
plan.table_names_unique_from_file = plan.table_names_all_from_file.clone().into_iter().unique().collect::<Vec<String>>();
fetch_table_names_all_from_db(&schema, client, &mut plan);
fetch_table_names_and_table_names_existing_or_drop(&schema, client, &mut plan);
filter_table_names_new(&mut plan);
filter_table_names_existing_for_table_names_unchanged_or_table_statements_changes(&schema, client, &mut plan, &all_statements);
fill_table_statements_dropped(&mut plan);
fill_table_statements_new(&mut plan, &all_statements);
make_sql_statements(&mut plan);
make_reverse_plan(&mut plan, &schema, client);
return Ok(plan);
}
/// Records on `plan` whether a non-`public` schema already exists in the database.
///
/// When `schema` is `public` (compared ASCII case-insensitively) nothing is
/// queried and `plan` is left untouched. Otherwise `information_schema.schemata`
/// is consulted and `plan.schema_name` / `plan.schema_does_not_exist` are set
/// from the answer. No schema is created here.
///
/// # Panics
///
/// Panics if the existence query fails.
fn create_schema_if_not_exist_or_public(schema: &String, client: &mut Client, plan: &mut Plan) {
    let is_public = schema.eq_ignore_ascii_case("public");
    debug!("!schema.eq_ignore_ascii_case(public): {} | {}",schema,!is_public);
    if is_public {
        return;
    }

    // The unwrapped `query_one` result is always a row, so it is bound with a plain
    // `let`; an `if let` here would be an irrefutable pattern.
    let row = client.query_one("SELECT EXISTS(SELECT 1 FROM information_schema.schemata WHERE schema_name = $1)", &[schema]).unwrap();
    let schema_status: bool = row.get(0);
    plan.schema_name = schema.to_string();
    plan.schema_does_not_exist = !schema_status;
    debug!("New schema {} found: status:{}",schema,schema_status);
}
/// Copies into `plan.table_statements_new` every `CREATE TABLE` statement of `ast`
/// whose table name is listed in `plan.table_names_new`, preserving `ast` order.
fn fill_table_statements_new(plan: &mut Plan, ast: &Vec<Statement>) {
    // Borrow the lookup list on its own so the target vector can be extended below.
    let wanted = &plan.table_names_new;
    let selected = ast
        .iter()
        .filter(|statement| {
            matches!(statement, Statement::CreateTable { name, .. } if wanted.contains(&name.to_string()))
        })
        .cloned();
    plan.table_statements_new.extend(selected);
}
/// Appends one plain `DROP TABLE` statement (no `IF EXISTS`, `CASCADE` or `PURGE`)
/// to `plan.table_statements_dropped` for each name in `plan.table_names_dropped`.
///
/// The table name is used as an unquoted, unqualified identifier.
fn fill_table_statements_dropped(plan: &mut Plan) {
    let drops = plan.table_names_dropped.iter().map(|table_name| Statement::Drop {
        object_type: ObjectType::Table,
        if_exists: false,
        names: vec![ObjectName(vec![Ident { value: table_name.to_string(), quote_style: None }])],
        cascade: false,
        purge: false,
    });
    plan.table_statements_dropped.extend(drops);
}
/// Compares every table in `plan.table_names_existing` with its definition in `ast`.
///
/// For each existing table the current columns are read from the database and the
/// first `CREATE TABLE` statement in `ast` with the same name is diffed against
/// them. Tables without differences are recorded in `plan.table_names_unchanged`;
/// any resulting `ALTER TABLE` statements are appended to
/// `plan.table_statements_changes`. Tables with no matching statement are skipped.
fn filter_table_names_existing_for_table_names_unchanged_or_table_statements_changes(schema: &&String, client: &mut Client, plan: &mut Plan, ast: &Vec<Statement>) {
    for table_name in &plan.table_names_existing {
        let column_defs_from_db = make_column_def_by_table_name(&schema, client, &table_name);

        // Only the first CREATE TABLE carrying this name is considered.
        let declared = ast.iter().find_map(|statement| match statement {
            Statement::CreateTable { name, columns, .. } if name.to_string() == *table_name => Some((name, columns)),
            _ => None,
        });

        if let Some((name, columns)) = declared {
            debug!("{:?}", columns);
            let mut table_changes = diff_from_table_statements(name, columns.to_vec(), column_defs_from_db);
            if table_changes.is_empty() {
                plan.table_names_unchanged.push(name.to_string());
            }
            plan.table_statements_changes.append(&mut table_changes);
        }
    }
}
/// Appends to `plan.table_names_new` every name from
/// `plan.table_names_unique_from_file` that is neither in
/// `plan.table_names_existing` nor in `plan.table_names_dropped`, keeping order.
fn filter_table_names_new(plan: &mut Plan) {
    // Borrow the two lookup lists separately so the result list can be extended afterwards.
    let existing = &plan.table_names_existing;
    let dropped = &plan.table_names_dropped;
    let fresh: Vec<String> = plan
        .table_names_unique_from_file
        .iter()
        .filter(|candidate| !existing.contains(*candidate) && !dropped.contains(*candidate))
        .cloned()
        .collect();
    plan.table_names_new.extend(fresh);
}
/// Splits the tables found in the database schema into "existing" and "dropped".
///
/// Each table name returned by `information_schema.tables` for `schema` is pushed
/// to `plan.table_names_existing` once per equal entry in
/// `plan.table_names_unique_from_file`; a name that is not in
/// `plan.table_names_existing` afterwards is pushed to `plan.table_names_dropped`.
///
/// # Panics
///
/// Panics if the query fails.
fn fetch_table_names_and_table_names_existing_or_drop(schema: &&String, client: &mut Client, plan: &mut Plan) {
    let rows = client.query("Select table_name from information_schema.tables where table_schema= $1 ", &[schema]).unwrap();
    for row in &rows {
        let db_table: &str = row.get(0);

        // One push per matching file entry.
        for _match in plan.table_names_unique_from_file.iter().filter(|known| known.as_str() == db_table) {
            plan.table_names_existing.push(db_table.to_string());
        }

        let is_tracked = plan.table_names_existing.iter().any(|tracked| tracked.as_str() == db_table);
        if !is_tracked {
            plan.table_names_dropped.push(db_table.to_string());
        }
    }
}
/// Appends the table name of every `CREATE TABLE` statement in `ast` to
/// `plan.table_names_all_from_file`, in statement order and including repeats.
fn fetch_table_names_all_from_file(plan: &mut Plan, ast: &Vec<Statement>) {
    let created = ast.iter().filter_map(|statement| match statement {
        Statement::CreateTable { name, .. } => Some(name.to_string()),
        _ => None,
    });
    plan.table_names_all_from_file.extend(created);
}
/// Appends to `plan.table_names_all_from_db` every table name that
/// `information_schema.tables` lists for `schema`.
///
/// # Panics
///
/// Panics if the query fails.
fn fetch_table_names_all_from_db(schema: &&String, client: &mut Client, plan: &mut Plan) {
    let rows = client.query("Select table_name from information_schema.tables where table_schema= $1 ", &[schema]).unwrap();
    let names = rows.iter().map(|row| row.get::<usize, &str>(0).to_string());
    plan.table_names_all_from_db.extend(names);
}
/// Reads the columns of `table_name` in `schema` from `information_schema.columns`
/// and returns them as `ColumnDef`s.
///
/// Only the column name and data type are carried over. The type mapping is
/// coarse: `"integer"` becomes `DataType::Int(None)` and every other database
/// type is reported as `DataType::Text`. Collation and column options are left
/// empty.
///
/// # Panics
///
/// Panics if the query fails or if `column_name` / `data_type` cannot be read as
/// text.
fn make_column_def_by_table_name(schema: &&String, client: &mut Client, table_name: &String) -> Vec<ColumnDef> {
    let rows = client.query("Select * from information_schema.columns where table_schema = $1 and table_name= $2 ", &[&schema, &table_name.to_string()]).unwrap();

    rows.iter()
        .map(|row| {
            // Only these two columns are read: `Row::get` panics on a conversion
            // mismatch, so fetching values that are never used would only add
            // failure points.
            let column_name: &str = row.get("column_name");
            let data_type: &str = row.get("data_type");
            let dt = match data_type {
                "integer" => DataType::Int(None),
                // "text" and every type not recognised above.
                _ => DataType::Text,
            };
            debug!("{}, {}", column_name, data_type);
            ColumnDef {
                name: Ident { value: column_name.to_string(), quote_style: None },
                data_type: dt,
                collation: None,
                options: vec![],
            }
        })
        .collect()
}
/// Produces the `ALTER TABLE` statements needed to bring column types in the
/// database in line with the file definition of `table_name`.
///
/// Columns are paired by name (first match in `from_db` wins). When a pair's data
/// types differ, a `DROP COLUMN` followed by an `ADD COLUMN` with the file's type
/// is emitted. Columns present on only one side produce no statements.
///
/// Returns an empty vector when no paired column differs in type.
fn diff_from_table_statements(table_name: &ObjectName, from_file: Vec<ColumnDef>, from_db: Vec<ColumnDef>) -> Vec<Statement> {
    let mut alterations: Vec<Statement> = Vec::new();
    for wanted in &from_file {
        for current in &from_db {
            debug!("{},{}", wanted, current);
            if wanted.name.value != current.name.value {
                continue;
            }
            debug!("diff_from_table_statements: {},{}", wanted, current);
            if wanted.data_type != current.data_type {
                // A type change is expressed as drop + re-add of the column.
                let drop_old = AlterTableOperation::DropColumn {
                    column_name: wanted.name.clone(),
                    if_exists: false,
                    cascade: false,
                };
                let add_new = AlterTableOperation::AddColumn {
                    column_def: ColumnDef {
                        name: wanted.name.clone(),
                        data_type: wanted.data_type.clone(),
                        collation: None,
                        options: vec![],
                    },
                };
                for operation in vec![drop_old, add_new] {
                    alterations.push(Statement::AlterTable { name: table_name.clone(), operation });
                }
            }
            // The name matched: stop scanning the database columns for this one.
            break;
        }
    }
    alterations
}
/// Renders the planned statements to SQL text and appends them to
/// `plan.sql_statements_for_step_up` in this order: drops, then changes, then
/// newly created tables.
fn make_sql_statements(plan: &mut Plan) {
    let forward = plan
        .table_statements_dropped
        .iter()
        .chain(plan.table_statements_changes.iter())
        .chain(plan.table_statements_new.iter())
        .map(|statement| statement.to_string());
    plan.sql_statements_for_step_up.extend(forward);
}
/// Fills `plan.sql_statements_for_step_down` with the statements that undo the
/// forward plan.
///
/// Currently only newly created tables are reversed, each with a plain
/// `DROP TABLE`. Dropped tables are merely logged and altered tables are ignored,
/// so neither contributes a reverse statement. `schema` and `client` are not used.
///
/// # Panics
///
/// Panics if a `DROP` statement in `plan.table_statements_dropped` has no names.
fn make_reverse_plan(plan: &mut Plan, schema: &&String, client: &mut Client) {
    // Dropped tables: log the name only; nothing re-creates them.
    for dropped in &plan.table_statements_dropped {
        if let Statement::Drop { names, .. } = dropped {
            let table_name = names[0].to_string();
            debug!("{}",table_name);
        }
    }

    // Entries of `plan.table_statements_changes` yield no reverse statements.

    // New tables: undo each CREATE TABLE with a DROP TABLE of the same name.
    let undo_creates = plan.table_statements_new.iter().filter_map(|created| match created {
        Statement::CreateTable { name, .. } => Some(
            Statement::Drop {
                object_type: ObjectType::Table,
                if_exists: false,
                names: vec![name.clone()],
                cascade: false,
                purge: false,
            }
            .to_string(),
        ),
        _ => None,
    });
    plan.sql_statements_for_step_down.extend(undo_creates);
}