1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
#![cfg(feature = "alter-table")]

use {
    super::validate,
    crate::{
        ast::{AlterTableOperation, ObjectName},
        data::get_name,
        result::{MutResult, TrySelf},
        store::{GStore, GStoreMut},
    },
    std::fmt::Debug,
};

#[cfg(feature = "index")]
use {
    super::AlterError,
    crate::{
        ast::Expr,
        data::{Schema, SchemaIndex},
    },
    futures::stream::{self, TryStreamExt},
};

pub async fn alter_table<T: 'static + Debug, U: GStore<T> + GStoreMut<T>>(
    storage: U,
    name: &ObjectName,
    operation: &AlterTableOperation,
) -> MutResult<U, ()> {
    let (storage, table_name) = get_name(name).try_self(storage)?;

    match operation {
        AlterTableOperation::RenameTable {
            table_name: new_table_name,
        } => {
            let (storage, new_table_name) = get_name(new_table_name).try_self(storage)?;

            storage.rename_schema(table_name, new_table_name).await
        }
        AlterTableOperation::RenameColumn {
            old_column_name,
            new_column_name,
        } => {
            storage
                .rename_column(table_name, old_column_name, new_column_name)
                .await
        }
        AlterTableOperation::AddColumn { column_def } => {
            validate(column_def)
                .try_self(storage)
                .map(|(storage, _)| storage)?
                .add_column(table_name, column_def)
                .await
        }
        AlterTableOperation::DropColumn {
            column_name,
            if_exists,
        } => {
            #[cfg(feature = "index")]
            let storage = {
                let indexes = match storage.fetch_schema(table_name).await {
                    Ok(Some(Schema { indexes, .. })) => indexes,
                    Ok(None) => {
                        return Err((
                            storage,
                            AlterError::TableNotFound(table_name.to_owned()).into(),
                        ));
                    }
                    Err(e) => {
                        return Err((storage, e));
                    }
                };

                let indexes = indexes
                    .iter()
                    .filter(|SchemaIndex { expr, .. }| find_column(expr, column_name))
                    .map(Ok);

                stream::iter(indexes)
                    .try_fold(storage, |storage, SchemaIndex { name, .. }| async move {
                        storage
                            .drop_index(table_name, name)
                            .await
                            .map(|(storage, _)| storage)
                    })
                    .await?
            };

            storage
                .drop_column(table_name, column_name, *if_exists)
                .await
        }
    }
}

#[cfg(feature = "index")]
fn find_column(expr: &Expr, column_name: &str) -> bool {
    let find = |expr| find_column(expr, column_name);

    match expr {
        Expr::Identifier(ident) => ident == column_name,
        Expr::Nested(expr) => find(expr),
        Expr::BinaryOp { left, right, .. } => find(left) || find(right),
        Expr::UnaryOp { expr, .. } => find(expr),
        Expr::Cast { expr, .. } => find(expr),
        _ => false,
    }
}