use std::io;
use sqlx::{ColumnIndex, Database, Decode, Encode, Executor, IntoArguments};
use crate::sql::{
dialects::{
self,
schema::schema::{self, Schema},
},
schema::table::TableSchema,
};
pub mod table;
/// Synchronizes `tables` with the database reachable through `conn`, passing
/// an empty string as the default schema name.
///
/// This is a convenience wrapper that forwards to [`sync_tables_with_schema`].
///
/// # Errors
///
/// Returns any error reported by [`sync_tables_with_schema`].
pub async fn sync_tables<C, DB: Database>(
    conn: &mut C,
    tables: Vec<TableSchema>,
) -> io::Result<()>
where
    for<'e> &'e mut C: Executor<'e, Database = DB>,
    for<'a> DB::Arguments<'a>: IntoArguments<'a, DB>,
    for<'a> &'a str: ColumnIndex<DB::Row>,
    for<'a> bool: Decode<'a, DB> + sqlx::Type<DB>,
    for<'a> i32: Decode<'a, DB> + sqlx::Type<DB>,
    for<'a> i64: Decode<'a, DB> + Encode<'a, DB> + sqlx::Type<DB>,
    for<'a> String: Decode<'a, DB> + Encode<'a, DB> + sqlx::Type<DB>,
{
    // An empty name is what this entry point hands over as the default schema.
    let no_default_schema = "";
    sync_tables_with_schema(conn, tables, no_default_schema).await
}
/// Brings the database reachable through `conn` in line with `tables`.
///
/// The work happens in this order:
///
/// 1. `check_recreate` runs first for all `tables`.
/// 2. The tables currently known to the database are loaded via `get_tables`.
/// 3. For every requested table that has no match among the loaded tables, the
///    statements from `sql_create_table` and then `sql_create_indexes` are
///    executed.
/// 4. For every requested table that does have a match:
///    - each requested column is altered (when a column with an equal name
///      exists) or added (when it does not);
///    - when `trim_columns` is set, database columns with no equally named
///      requested column are dropped;
///    - each requested index is created, after dropping an equally named
///      existing index whose columns differ; an equally named index with equal
///      columns is left alone;
///    - when `trim_indexes` is set, existing indexes with no equally named
///      requested index are dropped.
///
/// `default_schema` is handed to `dialects::schema::new` to build the schema
/// helper used for all of the above.
///
/// # Errors
///
/// Stops at, and returns, the first error produced by any of the schema helper
/// calls or executed statements.
pub async fn sync_tables_with_schema<C, DB: Database>(
    conn: &mut C,
    tables: Vec<TableSchema>,
    default_schema: &str,
) -> io::Result<()>
where
    for<'e> &'e mut C: Executor<'e, Database = DB>,
    for<'a> DB::Arguments<'a>: IntoArguments<'a, DB>,
    for<'a> &'a str: ColumnIndex<DB::Row>,
    for<'a> bool: Decode<'a, DB> + sqlx::Type<DB>,
    for<'a> i32: Decode<'a, DB> + sqlx::Type<DB>,
    for<'a> i64: Decode<'a, DB> + Encode<'a, DB> + sqlx::Type<DB>,
    for<'a> String: Decode<'a, DB> + Encode<'a, DB> + sqlx::Type<DB>,
{
    let s = &dialects::schema::new(default_schema.to_string());

    check_recreate(&tables, &mut *conn, s).await?;
    let db_tables = s.get_tables(&mut *conn).await?;

    for table in tables {
        let Some(db_table) = db_tables
            .iter()
            .find(|t| s.is_table_name_equal(&table, t))
        else {
            // No matching table in the database: create it, then its indexes.
            let create_table = s.sql_create_table(&table)?;
            for statement in create_table.iter() {
                s.execute_sql(&mut *conn, statement).await?;
            }
            let create_indexes = s.sql_create_indexes(&table);
            for statement in create_indexes.iter() {
                s.execute_sql(&mut *conn, statement).await?;
            }
            continue;
        };

        // Columns: alter the ones that already exist, add the ones that do not.
        for col in table.columns.iter() {
            let existing = db_table.columns.iter().find(|c| col.is_name_equal(c));
            match existing {
                Some(db_col) => {
                    for statement in s.sql_alter_column(&table, db_col, col)? {
                        s.execute_sql(&mut *conn, &statement).await?;
                    }
                }
                None => {
                    let statement = s.sql_add_column(db_table, col);
                    s.execute_sql(&mut *conn, &statement).await?;
                }
            }
        }

        // Optionally drop database columns that are no longer requested.
        if table.trim_columns {
            for db_col in db_table.columns.iter() {
                let still_requested = table.columns.iter().any(|c| c.is_name_equal(db_col));
                if still_requested {
                    continue;
                }
                let statement = s.sql_drop_column(&table, db_col);
                s.execute_sql(&mut *conn, &statement).await?;
            }
        }

        // Indexes: skip unchanged ones, replace changed ones, create new ones.
        for index in table.indexes.iter().flatten() {
            let previous = db_table
                .indexes
                .as_ref()
                .and_then(|olds| olds.iter().find(|idx| idx.is_name_equal(index)));
            if let Some(old) = previous {
                if old.is_columns_equal(index) {
                    continue;
                }
                // Same name but different columns: drop before re-creating.
                let statement = s.sql_drop_index(db_table, old);
                s.execute_sql(&mut *conn, &statement).await?;
            }
            if let Some(statement) = s.sql_create_index(&table, index) {
                s.execute_sql(&mut *conn, &statement).await?;
            }
        }

        // Optionally drop existing indexes that are no longer requested.
        if table.trim_indexes {
            for old in db_table.indexes.iter().flatten() {
                let still_requested = table
                    .indexes
                    .iter()
                    .flatten()
                    .any(|idx| idx.is_name_equal(old));
                if still_requested {
                    continue;
                }
                let statement = s.sql_drop_index(db_table, old);
                s.execute_sql(&mut *conn, &statement).await?;
            }
        }
    }

    Ok(())
}
/// Drops every table whose `recreate` value has not been recorded yet.
///
/// For each entry of `tables` that carries a `recreate` value, the upgrade tags
/// stored under the `"recreate"` key for that table (looked up by the name
/// returned from `table_name_with_schema`) are queried. If the value is not
/// among them, the statement from `sql_drop_table` is executed and the value is
/// then stored as a new `"recreate"` tag. Tables without a `recreate` value are
/// skipped.
///
/// # Errors
///
/// Stops at, and returns, the first error produced while querying tags,
/// executing the drop statement, or inserting the tag.
async fn check_recreate<T, DB: Database>(
    tables: &[TableSchema],
    conn: &mut T,
    s: &impl schema::Schema,
) -> io::Result<()>
where
    for<'e> &'e mut T: Executor<'e, Database = DB>,
    for<'a> DB::Arguments<'a>: IntoArguments<'a, DB>,
    for<'a> &'a str: ColumnIndex<DB::Row>,
    for<'a> bool: sqlx::Decode<'a, DB> + sqlx::Type<DB>,
    for<'a> i32: sqlx::Decode<'a, DB> + sqlx::Type<DB>,
    for<'a> i64: sqlx::Decode<'a, DB> + sqlx::Type<DB>,
    for<'a> i64: Encode<'a, DB>,
    for<'a> std::string::String: Decode<'a, DB> + Encode<'a, DB> + sqlx::Type<DB>,
{
    // The tag key never changes, so build it once instead of twice per table.
    let tag_key = "recreate".to_string();

    for table in tables {
        let Some(value) = &table.recreate else {
            continue;
        };

        let table_name = s.table_name_with_schema(table);
        let recorded = s
            .query_upgrade_tags(&mut *conn, &table_name, &tag_key)
            .await?;
        if recorded.iter().any(|v| v == value) {
            // This recreate value was already recorded; leave the table alone.
            continue;
        }

        // Drop first, record afterwards: the tag is only stored once the drop
        // statement has succeeded.
        let sql = s.sql_drop_table(table);
        s.execute_sql(&mut *conn, &sql).await?;
        s.insert_upgrade_tag(&mut *conn, &table_name, &tag_key, value)
            .await?;
    }

    Ok(())
}