use std::cell::RefCell;
use std::collections::BTreeSet;
use polars_arrow::error::to_compute_err;
use polars_core::prelude::*;
use polars_lazy::prelude::*;
use polars_plan::prelude::*;
use polars_plan::utils::expressions_to_schema;
use sqlparser::ast::{
    Distinct, ExcludeSelectItem, Expr as SqlExpr, FunctionArg, JoinOperator, ObjectName,
    ObjectType, Offset, OrderByExpr, Query, Select, SelectItem, SetExpr, SetOperator,
    SetQuantifier, Statement, TableAlias, TableFactor, TableWithJoins, Value as SQLValue,
    WildcardAdditionalOptions,
};
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::{Parser, ParserOptions};
use crate::sql_expr::{parse_sql_expr, process_join_constraint};
use crate::table_functions::PolarsTableFunctions;
/// Execution context for SQL queries over registered [`LazyFrame`]s.
///
/// Holds the tables that queries can refer to by name, plus the common table
/// expressions (CTEs) collected while a query is being executed.
#[derive(Default, Clone)]
pub struct SQLContext {
    /// Registered tables, keyed by table name.
    pub(crate) table_map: PlHashMap<String, LazyFrame>,
    /// CTEs registered during query execution, keyed by CTE name.
    /// `execute` clears this map once the statement has run.
    cte_map: RefCell<PlHashMap<String, LazyFrame>>,
}
impl SQLContext {
    /// Creates a context with no registered tables and no CTEs.
    pub fn new() -> Self {
        let table_map = PlHashMap::new();
        let cte_map = RefCell::new(PlHashMap::new());
        Self { table_map, cte_map }
    }
    /// Returns the names of all registered tables, sorted ascending.
    pub fn get_tables(&self) -> Vec<String> {
        let mut names: Vec<String> = self.table_map.keys().cloned().collect();
        names.sort_unstable();
        names
    }
    /// Registers `lf` under `name`, replacing any table already stored under that name.
    pub fn register(&mut self, name: &str, lf: LazyFrame) {
        let key = String::from(name);
        self.table_map.insert(key, lf);
    }
    /// Removes the table registered under `name`; does nothing if there is none.
    pub fn unregister(&mut self, name: &str) {
        // The map can be queried with a borrowed `&str`, no owned key is needed.
        self.table_map.remove(name);
    }
    /// Parses `query` and executes the single SQL statement it contains.
    ///
    /// The text is parsed with the generic dialect, with trailing commas allowed.
    ///
    /// # Errors
    /// Returns a `ComputeError` if parsing fails or if `query` does not contain
    /// exactly one statement, and propagates any error raised while executing
    /// the statement.
    pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
        let options = ParserOptions {
            trailing_commas: true,
            ..Default::default()
        };
        let statements = Parser::new(&GenericDialect)
            .with_options(options)
            .try_with_sql(query)
            .map_err(to_compute_err)?
            .parse_statements()
            .map_err(to_compute_err)?;
        polars_ensure!(statements.len() == 1, ComputeError: "One and only one statement at a time please");
        let outcome = self.execute_statement(&statements[0]);
        // CTEs are scoped to one execution: drop them once the statement has run,
        // whether or not it succeeded.
        self.cte_map.borrow_mut().clear();
        outcome
    }
}
impl SQLContext {
    /// Stores `lf` in the CTE map under `name`, replacing any previous entry.
    fn register_cte(&mut self, name: &str, lf: LazyFrame) {
        let mut ctes = self.cte_map.borrow_mut();
        ctes.insert(String::from(name), lf);
    }
    /// Looks up `name` among the registered tables first and the CTEs second.
    ///
    /// Returns a clone of the matching frame, or `None` if neither map has it.
    fn get_table_from_current_scope(&mut self, name: &str) -> Option<LazyFrame> {
        if let Some(registered) = self.table_map.get(name) {
            return Some(registered.clone());
        }
        self.cte_map.borrow().get(name).cloned()
    }
    /// Dispatches a parsed statement to the matching `execute_*` handler.
    ///
    /// Handled: queries, `SHOW TABLES`, `CREATE TABLE`, `DROP TABLE` and `EXPLAIN`.
    ///
    /// # Errors
    /// Returns a `ComputeError` for any other statement type, and propagates
    /// errors from the selected handler.
    pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
        match stmt {
            Statement::Query(query) => self.execute_query(query),
            Statement::ShowTables { .. } => self.execute_show_tables(stmt),
            Statement::CreateTable { .. } => self.execute_create_table(stmt),
            // Only tables can be dropped; other object types hit the fallback arm.
            Statement::Drop {
                object_type: ObjectType::Table,
                ..
            } => self.execute_drop_table(stmt),
            Statement::Explain { .. } => self.execute_explain(stmt),
            _ => polars_bail!(
                ComputeError: "SQL statement type {:?} is not supported", stmt,
            ),
        }
    }
    pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
        self.register_ctes(query)?;
        let lf = self.process_set_expr(&query.body, query)?;
        self.process_limit_offset(lf, &query.limit, &query.offset)
    }
    /// Evaluates a query body: a plain `SELECT`, a nested query, or a `UNION`.
    ///
    /// `query` is the enclosing query; it is forwarded to the `SELECT` and
    /// `UNION` handlers.
    ///
    /// # Errors
    /// Returns `InvalidOperation` for set operations other than `UNION` and for
    /// any other kind of set expression.
    fn process_set_expr(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
        match expr {
            SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
            SetExpr::Query(subquery) => self.execute_query(subquery),
            SetExpr::SetOperation {
                op,
                set_quantifier,
                left,
                right,
            } => match op {
                SetOperator::Union => self.process_union(left, right, set_quantifier, query),
                unsupported => {
                    polars_bail!(InvalidOperation: "{} operation not yet supported", unsupported)
                },
            },
            unsupported => {
                polars_bail!(InvalidOperation: "{} operation not yet supported", unsupported)
            },
        }
    }
    /// Evaluates both sides of a `UNION` and concatenates them.
    ///
    /// With the `ALL` quantifier the concatenation is returned as is; with any
    /// other quantifier duplicate rows are removed afterwards.
    fn process_union(
        &mut self,
        left: &SetExpr,
        right: &SetExpr,
        quantifier: &SetQuantifier,
        query: &Query,
    ) -> PolarsResult<LazyFrame> {
        let lhs = self.process_set_expr(left, query)?;
        let rhs = self.process_set_expr(right, query)?;
        let union_args = UnionArgs {
            parallel: true,
            ..Default::default()
        };
        let unioned = polars_lazy::dsl::concat(vec![lhs, rhs], union_args);
        if matches!(quantifier, SetQuantifier::All) {
            unioned
        } else {
            unioned.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
        }
    }
    /// Handles `EXPLAIN <statement>`.
    ///
    /// Executes the inner statement to obtain its lazy plan and returns the
    /// optimized plan description as a single string column named
    /// `"Logical Plan"`, one row per line of the description.
    ///
    /// # Panics
    /// Panics if `stmt` is not `Statement::Explain`.
    fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
        if let Statement::Explain { statement, .. } = stmt {
            let explained = self.execute_statement(statement)?;
            let description = explained.describe_optimized_plan()?;
            let plan_lines = description
                .split('\n')
                .collect::<Series>()
                .with_name("Logical Plan");
            Ok(DataFrame::new(vec![plan_lines])?.lazy())
        } else {
            unreachable!()
        }
    }
    /// Handles `SHOW TABLES`: returns a one-column frame (`"name"`) listing the
    /// registered table names in sorted order.
    fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
        let names = self.get_tables();
        let name_column = Series::new("name", names);
        Ok(DataFrame::new(vec![name_column])?.lazy())
    }
    /// Handles `DROP TABLE`: removes every named table from the registered
    /// tables and returns an empty frame. Unknown names are ignored.
    ///
    /// # Panics
    /// Panics if `stmt` is not `Statement::Drop`.
    fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
        if let Statement::Drop { names, .. } = stmt {
            names.iter().for_each(|name| {
                let key = name.to_string();
                self.table_map.remove(&key);
            });
            Ok(DataFrame::empty().lazy())
        } else {
            unreachable!()
        }
    }
    fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
        if let Some(with) = &query.with {
            if with.recursive {
                polars_bail!(ComputeError: "Recursive CTEs are not supported")
            }
            for cte in &with.cte_tables {
                let cte_name = cte.alias.name.to_string();
                let cte_lf = self.execute_query(&cte.query)?;
                self.register_cte(&cte_name, cte_lf);
            }
        }
        Ok(())
    }
    /// Resolves one `FROM` item: loads the base relation and applies its joins
    /// in order.
    ///
    /// Inner, left outer, full outer and cross joins are handled. Join
    /// constraints are always resolved against the name of the base relation
    /// and the name of the table being joined.
    ///
    /// # Errors
    /// Returns `InvalidOperation` for any other join type, and propagates errors
    /// from table lookup and join-constraint processing.
    fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
        let (tbl_name, mut lf) = self.get_table(&tbl_expr.relation)?;
        for join in &tbl_expr.joins {
            let (right_name, right_lf) = self.get_table(&join.relation)?;
            lf = match &join.join_operator {
                JoinOperator::Inner(constraint) => {
                    let (left_on, right_on) =
                        process_join_constraint(constraint, &tbl_name, &right_name)?;
                    lf.inner_join(right_lf, left_on, right_on)
                },
                JoinOperator::LeftOuter(constraint) => {
                    let (left_on, right_on) =
                        process_join_constraint(constraint, &tbl_name, &right_name)?;
                    lf.left_join(right_lf, left_on, right_on)
                },
                JoinOperator::FullOuter(constraint) => {
                    let (left_on, right_on) =
                        process_join_constraint(constraint, &tbl_name, &right_name)?;
                    lf.outer_join(right_lf, left_on, right_on)
                },
                JoinOperator::CrossJoin => lf.cross_join(right_lf),
                join_type => polars_bail!(
                    InvalidOperation:
                    "join type '{:?}' not yet supported by polars-sql", join_type
                ),
            };
        }
        Ok(lf)
    }
    /// Executes a single `SELECT` block and returns the resulting lazy plan.
    ///
    /// Steps, in order:
    /// 1. resolve the first `FROM` item (including its joins);
    /// 2. apply the `WHERE` filter;
    /// 3. translate the select list into projection expressions;
    /// 4. resolve the `GROUP BY` keys (expressions or 1-based select-list ordinals);
    /// 5. project or aggregate, applying `ORDER BY` (taken from the enclosing
    ///    `query`) and, for grouped queries, `HAVING`;
    /// 6. apply `DISTINCT` / `DISTINCT ON`.
    ///
    /// # Errors
    /// Returns a `ComputeError` when the `FROM` list is empty, when a `GROUP BY`
    /// ordinal is zero, non-numeric or beyond the select list, when a
    /// `GROUP BY` item is a non-numeric literal, or when `DISTINCT ON` is given
    /// anything other than plain column names. Errors from expression parsing
    /// and from the helper methods are propagated.
    fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
        // Only the first entry of the FROM list is used.
        let sql_tbl: &TableWithJoins = select_stmt
            .from
            .get(0)
            .ok_or_else(|| polars_err!(ComputeError: "no table name provided in query"))?;
        let mut lf = self.execute_from_statement(sql_tbl)?;
        let mut contains_wildcard = false;
        let mut contains_wildcard_exclude = false;

        // WHERE clause.
        if let Some(expr) = select_stmt.selection.as_ref() {
            let filter_expression = parse_sql_expr(expr, self)?;
            lf = lf.filter(filter_expression)
        }

        // Select list -> projection expressions.
        let projections: Vec<_> = select_stmt
            .projection
            .iter()
            .map(|select_item| {
                Ok(match select_item {
                    SelectItem::UnnamedExpr(expr) => parse_sql_expr(expr, self)?,
                    SelectItem::ExprWithAlias { expr, alias } => {
                        let expr = parse_sql_expr(expr, self)?;
                        expr.alias(&alias.value)
                    },
                    SelectItem::QualifiedWildcard(oname, wildcard_options) => self
                        .process_qualified_wildcard(
                            oname,
                            wildcard_options,
                            &mut contains_wildcard_exclude,
                        )?,
                    SelectItem::Wildcard(wildcard_options) => {
                        contains_wildcard = true;
                        let e = col("*");
                        self.process_wildcard_additional_options(
                            e,
                            wildcard_options,
                            &mut contains_wildcard_exclude,
                        )?
                    },
                })
            })
            .collect::<PolarsResult<_>>()?;

        // GROUP BY keys: either expressions or ordinals into the select list.
        let group_by_keys: Vec<Expr> = select_stmt
            .group_by
            .iter()
            .map(|e| match e {
                SqlExpr::Value(SQLValue::Number(idx, _)) => {
                    let position = match idx.parse::<usize>() {
                        Ok(0) | Err(_) => Err(polars_err!(
                            ComputeError:
                            "group_by error: a positive number or an expression expected, got {}",
                            idx
                        )),
                        Ok(position) => Ok(position),
                    }?;
                    // SQL ordinals are 1-based (0 is rejected above), so shift by one;
                    // an ordinal past the select list is an error, not a panic.
                    projections.get(position - 1).cloned().ok_or_else(|| {
                        polars_err!(
                            ComputeError:
                            "group_by error: position {} is not in the select list",
                            idx
                        )
                    })
                },
                SqlExpr::Value(_) => Err(polars_err!(
                    ComputeError:
                    "group_by error: a positive number or an expression expected",
                )),
                _ => parse_sql_expr(e, self),
            })
            .collect::<PolarsResult<_>>()?;

        lf = if group_by_keys.is_empty() {
            if query.order_by.is_empty() {
                lf.select(projections)
            } else if !contains_wildcard {
                // ORDER BY may refer to columns that are not projected: add the
                // projections as extra columns, sort, then drop every original
                // column whose name the projection list does not retain.
                let schema = lf.schema()?;
                let mut column_names = schema.get_names();
                let mut retained_names: BTreeSet<String> = BTreeSet::new();
                projections.iter().for_each(|expr| match expr {
                    Expr::Alias(_, name) => {
                        retained_names.insert((name).to_string());
                    },
                    Expr::Column(name) => {
                        retained_names.insert((name).to_string());
                    },
                    Expr::Columns(names) => names.iter().for_each(|name| {
                        retained_names.insert((name).to_string());
                    }),
                    Expr::Exclude(inner_expr, excludes) => {
                        if let Expr::Columns(names) = (*inner_expr).as_ref() {
                            names.iter().for_each(|name| {
                                retained_names.insert((name).to_string());
                            })
                        }
                        excludes.iter().for_each(|excluded| {
                            if let Excluded::Name(name) = excluded {
                                retained_names.remove(&(name.to_string()));
                            }
                        });
                    },
                    _ => {},
                });
                lf = lf.with_columns(projections);
                lf = self.process_order_by(lf, &query.order_by)?;
                column_names.retain(|&name| !retained_names.contains(name));
                lf.drop_columns(column_names)
            } else if contains_wildcard_exclude {
                // Wildcard with EXCLUDE: sort while the excluded columns are still
                // present, then drop them. Names are collected from the first
                // `Exclude` expression found.
                let mut dropped_names = Vec::with_capacity(projections.len());
                let exclude_expr = projections.iter().find(|expr| {
                    if let Expr::Exclude(_, excludes) = expr {
                        for excluded in excludes.iter() {
                            if let Excluded::Name(name) = excluded {
                                dropped_names.push(name.to_string());
                            }
                        }
                        true
                    } else {
                        false
                    }
                });
                if exclude_expr.is_some() {
                    lf = lf.with_columns(projections);
                    lf = self.process_order_by(lf, &query.order_by)?;
                    lf.drop_columns(dropped_names)
                } else {
                    lf = lf.select(projections);
                    self.process_order_by(lf, &query.order_by)?
                }
            } else {
                lf = lf.select(projections);
                self.process_order_by(lf, &query.order_by)?
            }
        } else {
            // Grouped query: aggregate, sort, then apply HAVING on the result.
            lf = self.process_group_by(lf, contains_wildcard, &group_by_keys, &projections)?;
            lf = self.process_order_by(lf, &query.order_by)?;
            match select_stmt.having.as_ref() {
                Some(expr) => lf.filter(parse_sql_expr(expr, self)?),
                None => lf,
            }
        };

        lf = match &select_stmt.distinct {
            Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
            Some(Distinct::On(exprs)) => {
                let cols = exprs
                    .iter()
                    .map(|e| {
                        let expr = parse_sql_expr(e, self)?;
                        if let Expr::Column(name) = expr {
                            Ok(name.to_string())
                        } else {
                            Err(polars_err!(
                                ComputeError:
                                "DISTINCT ON only supports column names"
                            ))
                        }
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;
                // Sort before de-duplicating so the first row kept per key
                // follows the ORDER BY.
                if !query.order_by.is_empty() {
                    lf = self.process_order_by(lf, &query.order_by)?;
                }
                return Ok(lf.unique_stable(Some(cols), UniqueKeepStrategy::First));
            },
            None => lf,
        };
        Ok(lf)
    }
    /// Handles `CREATE TABLE ... AS <query>`: executes the query and registers
    /// the result under the first identifier of the table name.
    ///
    /// Returns a one-row frame with a `"Response"` column on success.
    ///
    /// # Errors
    /// Returns a `ComputeError` when `IF NOT EXISTS` is given and the table is
    /// already registered, or when the statement has no `AS <query>` part.
    ///
    /// # Panics
    /// Panics if `stmt` is not `Statement::CreateTable`.
    fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
        let (if_not_exists, name, query) = match stmt {
            Statement::CreateTable {
                if_not_exists,
                name,
                query,
                ..
            } => (*if_not_exists, name, query),
            _ => unreachable!(),
        };
        let tbl_name = name.0.get(0).unwrap().value.as_str();
        // NOTE(review): with IF NOT EXISTS an existing table raises an error, and
        // without it an existing table is silently replaced — confirm this is intended.
        if if_not_exists && self.table_map.contains_key(tbl_name) {
            polars_bail!(ComputeError: "relation {} already exists", tbl_name);
        }
        match query {
            Some(query) => {
                let lf = self.execute_query(query)?;
                self.register(tbl_name, lf);
                let out = df! {
                    "Response" => ["Create Table"]
                }
                .unwrap()
                .lazy();
                Ok(out)
            },
            None => polars_bail!(ComputeError: "only CREATE TABLE AS SELECT is supported"),
        }
    }
    fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
        match relation {
            TableFactor::Table {
                name, alias, args, ..
            } => {
                if let Some(args) = args {
                    return self.execute_tbl_function(name, alias, args);
                }
                let tbl_name = name.0.get(0).unwrap().value.as_str();
                if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
                    match alias {
                        Some(alias) => Ok((alias.to_string(), lf)),
                        None => Ok((tbl_name.to_string(), lf)),
                    }
                } else {
                    polars_bail!(ComputeError: "relation '{}' was not found", tbl_name);
                }
            },
            _ => polars_bail!(ComputeError: "not implemented"),
        }
    }
    /// Runs a table function named by the first identifier of `name`.
    ///
    /// The resulting frame is registered in the table map under the alias if
    /// one is given, otherwise under the name returned by the function, and the
    /// same `(name, frame)` pair is returned.
    ///
    /// # Errors
    /// Propagates errors from parsing the function name and from executing it.
    fn execute_tbl_function(
        &mut self,
        name: &ObjectName,
        alias: &Option<TableAlias>,
        args: &[FunctionArg],
    ) -> PolarsResult<(String, LazyFrame)> {
        let fn_name = name.0.get(0).unwrap().value.as_str();
        let table_fn = fn_name.parse::<PolarsTableFunctions>()?;
        let (default_name, lf) = table_fn.execute(args)?;
        let tbl_name = match alias {
            Some(alias) => alias.name.value.clone(),
            None => default_name,
        };
        self.table_map.insert(tbl_name.clone(), lf.clone());
        Ok((tbl_name, lf))
    }
    /// Sorts `lf` by the given `ORDER BY` items.
    ///
    /// An item sorts descending only when it is explicitly marked `DESC`.
    /// With no items the frame is still passed through the sort call with
    /// empty key lists.
    ///
    /// # Errors
    /// Returns a `ComputeError` if any item specifies `NULLS FIRST` / `NULLS LAST`,
    /// and propagates expression-parsing errors.
    fn process_order_by(&mut self, lf: LazyFrame, ob: &[OrderByExpr]) -> PolarsResult<LazyFrame> {
        let mut sort_exprs = Vec::with_capacity(ob.len());
        let mut descending = Vec::with_capacity(ob.len());
        for item in ob {
            sort_exprs.push(parse_sql_expr(&item.expr, self)?);
            // `asc` is `None` when no direction is written, which means ascending.
            descending.push(matches!(item.asc, Some(false)));
            polars_ensure!(
                item.nulls_first.is_none(),
                ComputeError: "nulls first/last is not yet supported",
            );
        }
        Ok(lf.sort_by_exprs(&sort_exprs, descending, false, false))
    }
    /// Builds the grouped aggregation for a `SELECT` with `GROUP BY`.
    ///
    /// Projections whose output name is not one of the group keys become the
    /// aggregation expressions. Simple aliased projections are aggregated
    /// without their alias; the alias is applied in the final select. The
    /// final select restores the select-list order.
    ///
    /// # Errors
    /// Returns a `ComputeError` if the select list contains a wildcard, and
    /// propagates schema-resolution errors.
    fn process_group_by(
        &mut self,
        lf: LazyFrame,
        contains_wildcard: bool,
        group_by_keys: &[Expr],
        projections: &[Expr],
    ) -> PolarsResult<LazyFrame> {
        polars_ensure!(
            !contains_wildcard,
            ComputeError: "group_by error: can't process wildcard in group_by"
        );
        let input_schema = lf.schema()?;
        let key_schema = expressions_to_schema(group_by_keys, &input_schema, Context::Default)?;

        // Names of simple aliased projections; their aliasing happens after aggregation.
        let mut deferred_aliases: BTreeSet<&str> = BTreeSet::new();
        let mut agg_exprs = Vec::with_capacity(projections.len());
        for mut e in projections {
            if e.clone().meta().is_simple_projection() {
                if let Expr::Alias(inner, alias) = e {
                    deferred_aliases.insert(alias);
                    e = inner
                }
            }
            let field = e.to_field(&input_schema, Context::Default)?;
            // Group keys are produced by the group-by itself, so they are not aggregated.
            if key_schema.get(&field.name).is_none() {
                agg_exprs.push(e.clone())
            }
        }
        let aggregated = lf.group_by(group_by_keys).agg(&agg_exprs);

        let output_schema = expressions_to_schema(projections, &input_schema, Context::Default)?;
        let mut final_projection = Vec::with_capacity(projections.len());
        for (name, projection_expr) in output_schema.iter_names().zip(projections) {
            let keep_expr =
                key_schema.get(name).is_some() || deferred_aliases.contains(name.as_str());
            final_projection.push(if keep_expr {
                projection_expr.clone()
            } else {
                // Already computed by the aggregation: select it by output name.
                col(name)
            });
        }
        Ok(aggregated.select(&final_projection))
    }
    /// Applies `LIMIT` / `OFFSET` to `lf`.
    ///
    /// With only an offset, everything from that offset onwards is kept; with
    /// neither clause the frame is returned unchanged.
    ///
    /// # Errors
    /// Returns a `ComputeError` when either clause is not a numeric literal, or
    /// when a literal cannot be converted to the integer type the slice expects.
    fn process_limit_offset(
        &mut self,
        lf: LazyFrame,
        limit: &Option<SqlExpr>,
        offset: &Option<Offset>,
    ) -> PolarsResult<LazyFrame> {
        // Both clauses are validated as numeric literals before anything is parsed.
        let offset_digits = match offset {
            None => None,
            Some(Offset {
                value: SqlExpr::Value(SQLValue::Number(digits, _)),
                ..
            }) => Some(digits),
            Some(_) => polars_bail!(
                ComputeError: "non-numeric arguments for LIMIT/OFFSET are not supported",
            ),
        };
        let limit_digits = match limit {
            None => None,
            Some(SqlExpr::Value(SQLValue::Number(digits, _))) => Some(digits),
            Some(_) => polars_bail!(
                ComputeError: "non-numeric arguments for LIMIT/OFFSET are not supported",
            ),
        };
        let sliced = match (offset_digits, limit_digits) {
            (Some(offset), Some(limit)) => lf.slice(
                offset
                    .parse()
                    .map_err(|e| polars_err!(ComputeError: "OFFSET conversion error: {}", e))?,
                limit
                    .parse()
                    .map_err(|e| polars_err!(ComputeError: "LIMIT conversion error: {}", e))?,
            ),
            (Some(offset), None) => lf.slice(
                offset
                    .parse()
                    .map_err(|e| polars_err!(ComputeError: "OFFSET conversion error: {}", e))?,
                IdxSize::MAX,
            ),
            (None, Some(limit)) => lf.limit(
                limit
                    .parse()
                    .map_err(|e| polars_err!(ComputeError: "LIMIT conversion error: {}", e))?,
            ),
            (None, None) => lf,
        };
        Ok(sliced)
    }
    /// Expands a qualified wildcard (`tbl.*`) into a multi-column expression
    /// covering every column of `tbl`, then applies the wildcard options.
    ///
    /// The table is looked up the same way `FROM` resolves names: registered
    /// tables first, then CTEs of the running query. This lets `cte.*` work.
    ///
    /// # Errors
    /// Returns a `ComputeError` when the qualifier is not a single identifier
    /// or when no table or CTE with that name exists. Errors from resolving
    /// the table's schema and from the wildcard options are propagated.
    fn process_qualified_wildcard(
        &mut self,
        ObjectName(idents): &ObjectName,
        options: &WildcardAdditionalOptions,
        contains_wildcard_exclude: &mut bool,
    ) -> PolarsResult<Expr> {
        let idents = idents.as_slice();
        let e = match idents {
            [tbl_name] => {
                let lf = self
                    .get_table_from_current_scope(&tbl_name.value)
                    .ok_or_else(|| {
                        polars_err!(
                            ComputeError: "no table named '{}' found",
                            tbl_name
                        )
                    })?;
                let schema = lf.schema()?;
                cols(schema.iter_names())
            },
            e => polars_bail!(
                ComputeError: "Invalid wildcard expression: {:?}",
                e
            ),
        };
        self.process_wildcard_additional_options(e, options, contains_wildcard_exclude)
    }
    fn process_wildcard_additional_options(
        &mut self,
        expr: Expr,
        options: &WildcardAdditionalOptions,
        contains_wildcard_exclude: &mut bool,
    ) -> PolarsResult<Expr> {
        if options.opt_except.is_some() {
            polars_bail!(InvalidOperation: "EXCEPT not supported. Use EXCLUDE instead")
        }
        Ok(match &options.opt_exclude {
            Some(ExcludeSelectItem::Single(ident)) => {
                *contains_wildcard_exclude = true;
                expr.exclude(vec![&ident.value])
            },
            Some(ExcludeSelectItem::Multiple(idents)) => {
                *contains_wildcard_exclude = true;
                expr.exclude(idents.iter().map(|i| &i.value))
            },
            _ => expr,
        })
    }
}
impl SQLContext {
    /// Returns a clone of the registered-table map.
    pub fn get_table_map(&self) -> PlHashMap<String, LazyFrame> {
        self.table_map.clone()
    }
    /// Builds a context whose registered tables are `table_map` and which
    /// starts with no CTEs.
    pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
        let cte_map = RefCell::new(PlHashMap::new());
        Self { table_map, cte_map }
    }
}