Skip to main content

polars_sql/
context.rs

1use std::ops::Deref;
2use std::sync::RwLock;
3
4use polars_core::frame::row::Row;
5use polars_core::prelude::*;
6use polars_lazy::prelude::*;
7use polars_ops::frame::JoinCoalesce;
8use polars_plan::dsl::function_expr::StructFunction;
9use polars_plan::prelude::*;
10use polars_utils::aliases::{PlHashSet, PlIndexSet};
11use polars_utils::format_pl_smallstr;
12use sqlparser::ast::{
13    BinaryOperator, CreateTable, CreateTableLikeKind, Delete, Distinct, ExcludeSelectItem,
14    Expr as SQLExpr, Fetch, FromTable, FunctionArg, GroupByExpr, Ident, JoinConstraint,
15    JoinOperator, LimitClause, NamedWindowDefinition, NamedWindowExpr, ObjectName, ObjectType,
16    OrderBy, OrderByKind, Query, RenameSelectItem, Select, SelectItem,
17    SelectItemQualifiedWildcardKind, SetExpr, SetOperator, SetQuantifier, Statement, TableAlias,
18    TableFactor, TableWithJoins, Truncate, UnaryOperator, Value as SQLValue, ValueWithSpan, Values,
19    Visit, WildcardAdditionalOptions, WindowSpec,
20};
21use sqlparser::dialect::GenericDialect;
22use sqlparser::parser::{Parser, ParserOptions};
23
24use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
25use crate::sql_expr::{
26    parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
27};
28use crate::sql_visitors::{
29    QualifyExpression, TableIdentifierCollector, check_for_ambiguous_column_refs,
30    expr_has_window_functions, expr_refers_to_table,
31};
32use crate::table_functions::PolarsTableFunctions;
33use crate::types::map_sql_dtype_to_polars;
34
35fn clear_lf(lf: LazyFrame) -> LazyFrame {
36    let cb = PlanCallback::new(move |(_, schemas): (Vec<DslPlan>, Vec<SchemaRef>)| {
37        let schema = &schemas[0];
38        Ok(DataFrame::empty_with_schema(schema).lazy().logical_plan)
39    });
40    lf.pipe_with_schema(cb)
41}
42
/// Bundles a table's lazy frame with the name and schema associated with it.
#[derive(Clone)]
pub struct TableInfo {
    /// The table's data, as a lazy query plan.
    pub(crate) frame: LazyFrame,
    /// Name associated with the table.
    /// NOTE(review): presumably the name/alias used to reference it in the query — confirm against callers.
    pub(crate) name: PlSmallStr,
    /// Schema stored alongside `frame`.
    pub(crate) schema: Arc<Schema>,
}
49
/// Collected wildcard modifiers for a `SELECT *` item
/// (`EXCLUDE`, `ILIKE`, `RENAME`, and `REPLACE`).
struct SelectModifiers {
    exclude: PlHashSet<String>,                // SELECT * EXCLUDE
    ilike: Option<regex::Regex>,               // SELECT * ILIKE
    rename: PlHashMap<PlSmallStr, PlSmallStr>, // SELECT * RENAME
    replace: Vec<Expr>,                        // SELECT * REPLACE
}
56impl SelectModifiers {
57    fn matches_ilike(&self, s: &str) -> bool {
58        match &self.ilike {
59            Some(rx) => rx.is_match(s),
60            None => true,
61        }
62    }
63    fn renamed_cols(&self) -> Vec<Expr> {
64        self.rename
65            .iter()
66            .map(|(before, after)| col(before.clone()).alias(after.clone()))
67            .collect()
68    }
69}
70
/// For SELECT projection items; helps simplify any required disambiguation.
enum ProjectionItem {
    /// Expressions tagged with a table name; the name is used to build the
    /// `<column>:<table>` suffix when a column needs disambiguating.
    QualifiedExprs(PlSmallStr, Vec<Expr>),
    /// Expressions with no associated table name; passed through unchanged.
    Exprs(Vec<Expr>),
}
76
77/// Extract the output column name from an expression (if it has one).
78fn expr_output_name(expr: &Expr) -> Option<&PlSmallStr> {
79    match expr {
80        Expr::Column(name) | Expr::Alias(_, name) => Some(name),
81        _ => None,
82    }
83}
84
85/// Disambiguate qualified wildcard columns that conflict with each other or other projections.
86fn disambiguate_projection_cols(
87    items: Vec<ProjectionItem>,
88    schema: &Schema,
89) -> PolarsResult<Vec<Expr>> {
90    // Establish qualified wildcard names (with counts), and other expression names
91    let mut qualified_wildcard_names: PlHashMap<PlSmallStr, usize> = PlHashMap::new();
92    let mut other_names: PlHashSet<PlSmallStr> = PlHashSet::new();
93    for item in &items {
94        match item {
95            ProjectionItem::QualifiedExprs(_, exprs) => {
96                for expr in exprs {
97                    if let Some(name) = expr_output_name(expr) {
98                        *qualified_wildcard_names.entry(name.clone()).or_insert(0) += 1;
99                    }
100                }
101            },
102            ProjectionItem::Exprs(exprs) => {
103                for expr in exprs {
104                    if let Some(name) = expr_output_name(expr) {
105                        other_names.insert(name.clone());
106                    }
107                }
108            },
109        }
110    }
111
112    // Names requiring disambiguation (duplicates across wildcards, eg: `tbl1.*`,`tbl2.*`)
113    let needs_suffix: PlHashSet<PlSmallStr> = qualified_wildcard_names
114        .into_iter()
115        .filter(|(name, count)| *count > 1 || other_names.contains(name))
116        .map(|(name, _)| name)
117        .collect();
118
119    // Output, applying suffixes where needed
120    let mut result: Vec<Expr> = Vec::new();
121    for item in items {
122        match item {
123            ProjectionItem::QualifiedExprs(tbl_name, exprs) if !needs_suffix.is_empty() => {
124                for expr in exprs {
125                    if let Some(name) = expr_output_name(&expr) {
126                        if needs_suffix.contains(name) {
127                            let suffixed = format_pl_smallstr!("{}:{}", name, tbl_name);
128                            if schema.contains(suffixed.as_str()) {
129                                result.push(col(suffixed));
130                                continue;
131                            }
132                            if other_names.contains(name) {
133                                polars_bail!(
134                                    SQLInterface:
135                                    "column '{}' is duplicated in the SELECT (explicitly, and via the `*` wildcard)", name
136                                );
137                            }
138                        }
139                    }
140                    result.push(expr);
141                }
142            },
143            ProjectionItem::QualifiedExprs(_, exprs) | ProjectionItem::Exprs(exprs) => {
144                result.extend(exprs);
145            },
146        }
147    }
148    Ok(result)
149}
150
/// The SQLContext is the main entry point for executing SQL queries.
///
/// Holds the registered tables and function registry, plus per-statement
/// state (CTEs, aliases, named windows) that `execute` clears after each run.
#[derive(Clone)]
pub struct SQLContext {
    /// Registered tables, keyed by name. Stored behind `Arc<RwLock<..>>`, so
    /// clones of the context share the same map.
    pub(crate) table_map: Arc<RwLock<PlHashMap<String, LazyFrame>>>,
    /// Registry of SQL functions available to this context.
    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
    /// Plan arena passed to schema resolution; replaced with a fresh one at the end of `execute`.
    pub(crate) lp_arena: Arena<IR>,
    /// Expression arena passed to schema resolution; replaced with a fresh one at the end of `execute`.
    pub(crate) expr_arena: Arena<AExpr>,

    /// Statement-level frames, keyed by name; consulted after `table_map` when resolving a table.
    cte_map: PlHashMap<String, LazyFrame>,
    /// Statement-level mapping from alias to the table/CTE name it refers to.
    table_aliases: PlHashMap<String, String>,
    /// Statement-level mapping: table name -> (column name -> resolved column name).
    joined_aliases: PlHashMap<String, PlHashMap<String, String>>,
    /// Statement-level window specifications, keyed by name.
    pub(crate) named_windows: PlHashMap<String, WindowSpec>,
}
164
165impl Default for SQLContext {
166    fn default() -> Self {
167        Self {
168            function_registry: Arc::new(DefaultFunctionRegistry {}),
169            table_map: Default::default(),
170            cte_map: Default::default(),
171            table_aliases: Default::default(),
172            joined_aliases: Default::default(),
173            named_windows: Default::default(),
174            lp_arena: Default::default(),
175            expr_arena: Default::default(),
176        }
177    }
178}
179
180impl SQLContext {
181    /// Create a new SQLContext.
182    /// ```rust
183    /// # use polars_sql::SQLContext;
184    /// # fn main() {
185    /// let ctx = SQLContext::new();
186    /// # }
187    /// ```
188    pub fn new() -> Self {
189        Self::default()
190    }
191
192    /// Get the names of all registered tables, in sorted order.
193    pub fn get_tables(&self) -> Vec<String> {
194        let mut tables = Vec::from_iter(self.table_map.read().unwrap().keys().cloned());
195        tables.sort_unstable();
196        tables
197    }
198
199    /// Register a [`LazyFrame`] as a table in the SQLContext.
200    /// ```rust
201    /// # use polars_sql::SQLContext;
202    /// # use polars_core::prelude::*;
203    /// # use polars_lazy::prelude::*;
204    /// # fn main() {
205    ///
206    /// let mut ctx = SQLContext::new();
207    /// let df = df! {
208    ///    "a" =>  [1, 2, 3],
209    /// }.unwrap().lazy();
210    ///
211    /// ctx.register("df", df);
212    /// # }
213    ///```
214    pub fn register(&self, name: &str, lf: LazyFrame) {
215        self.table_map.write().unwrap().insert(name.to_owned(), lf);
216    }
217
218    /// Unregister a [`LazyFrame`] table from the [`SQLContext`].
219    pub fn unregister(&self, name: &str) {
220        self.table_map.write().unwrap().remove(&name.to_owned());
221    }
222
223    /// Execute a SQL query, returning a [`LazyFrame`].
224    /// ```rust
225    /// # use polars_sql::SQLContext;
226    /// # use polars_core::prelude::*;
227    /// # use polars_lazy::prelude::*;
228    /// # fn main() {
229    ///
230    /// let mut ctx = SQLContext::new();
231    /// let df = df! {
232    ///    "a" =>  [1, 2, 3],
233    /// }
234    /// .unwrap();
235    ///
236    /// ctx.register("df", df.clone().lazy());
237    /// let sql_df = ctx.execute("SELECT * FROM df").unwrap().collect().unwrap();
238    /// assert!(sql_df.equals(&df));
239    /// # }
240    ///```
241    pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
242        let mut parser = Parser::new(&GenericDialect);
243        parser = parser.with_options(ParserOptions {
244            trailing_commas: true,
245            ..Default::default()
246        });
247
248        let ast = parser
249            .try_with_sql(query)
250            .map_err(to_sql_interface_err)?
251            .parse_statements()
252            .map_err(to_sql_interface_err)?;
253
254        polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
255        let res = self.execute_statement(ast.first().unwrap())?;
256
257        // Ensure the result uses the proper arenas.
258        // This will instantiate new arenas with a new version.
259        let lp_arena = std::mem::take(&mut self.lp_arena);
260        let expr_arena = std::mem::take(&mut self.expr_arena);
261        res.set_cached_arena(lp_arena, expr_arena);
262
263        // Every execution should clear the statement-level maps.
264        self.cte_map.clear();
265        self.table_aliases.clear();
266        self.joined_aliases.clear();
267        self.named_windows.clear();
268
269        Ok(res)
270    }
271
272    /// Add a function registry to the SQLContext.
273    /// The registry provides the ability to add custom functions to the SQLContext.
274    pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
275        self.function_registry = function_registry;
276        self
277    }
278
    /// Get the function registry of the SQLContext.
    ///
    /// Returns a reference to the `Arc` that holds the registry.
    pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
        &self.function_registry
    }
283
284    /// Get a mutable reference to the function registry of the SQLContext
285    pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
286        Arc::get_mut(&mut self.function_registry).unwrap()
287    }
288}
289
290impl SQLContext {
291    fn isolated(&self) -> Self {
292        Self {
293            // Deep clone to isolate
294            table_map: Arc::new(RwLock::new(self.table_map.read().unwrap().clone())),
295            named_windows: self.named_windows.clone(),
296            cte_map: self.cte_map.clone(),
297
298            ..Default::default()
299        }
300    }
301
302    pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
303        let ast = stmt;
304        Ok(match ast {
305            Statement::Query(query) => self.execute_query(query)?,
306            stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
307            stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
308            stmt @ Statement::Drop {
309                object_type: ObjectType::Table,
310                ..
311            } => self.execute_drop_table(stmt)?,
312            stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
313            stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
314            stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
315            _ => polars_bail!(
316                SQLInterface: "statement type is not supported:\n{:?}", ast,
317            ),
318        })
319    }
320
    /// Execute a query: call `register_ctes` for it first, then run the query
    /// itself via `execute_query_no_ctes`.
    pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
        self.register_ctes(query)?;
        self.execute_query_no_ctes(query)
    }
325
326    pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
327        self.validate_query(query)?;
328
329        let lf = self.process_query(&query.body, query)?;
330        self.process_limit_offset(lf, &query.limit_clause, &query.fetch)
331    }
332
    /// Resolve the schema of `frame`, using this context's plan and expression arenas.
    pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
        frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
    }
336
337    pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
338        // Resolve the table name in the current scope; multi-stage fallback
339        // * table name → cte name
340        // * table alias → cte alias
341        self.table_map
342            .read()
343            .unwrap()
344            .get(name)
345            .cloned()
346            .or_else(|| self.cte_map.get(name).cloned())
347            .or_else(|| {
348                self.table_aliases.get(name).and_then(|alias| {
349                    self.table_map
350                        .read()
351                        .unwrap()
352                        .get(alias.as_str())
353                        .or_else(|| self.cte_map.get(alias.as_str()))
354                        .cloned()
355                })
356            })
357    }
358
    /// Execute a query in an isolated context. This prevents subqueries from mutating
    /// arenas and other context state.
    ///
    /// `query` is invoked with a fresh context obtained from `isolated()`. On success,
    /// the isolated context's arenas are attached to the returned [`LazyFrame`] via
    /// `set_cached_arena`. Any error from `query` is propagated unchanged.
    pub(crate) fn execute_isolated<F>(&mut self, query: F) -> PolarsResult<LazyFrame>
    where
        F: FnOnce(&mut Self) -> PolarsResult<LazyFrame>,
    {
        let mut ctx = self.isolated();

        // Execute query with clean state (eg: nested/subquery)
        let lf = query(&mut ctx)?;

        // Hand the isolated context's arenas over to the resulting frame
        lf.set_cached_arena(ctx.lp_arena, ctx.expr_arena);

        Ok(lf)
    }
376
377    fn expr_or_ordinal(
378        &mut self,
379        e: &SQLExpr,
380        exprs: &[Expr],
381        selected: Option<&[Expr]>,
382        schema: Option<&Schema>,
383        clause: &str,
384    ) -> PolarsResult<Expr> {
385        match e {
386            SQLExpr::UnaryOp {
387                op: UnaryOperator::Minus,
388                expr,
389            } if matches!(
390                **expr,
391                SQLExpr::Value(ValueWithSpan {
392                    value: SQLValue::Number(_, _),
393                    ..
394                })
395            ) =>
396            {
397                if let SQLExpr::Value(ValueWithSpan {
398                    value: SQLValue::Number(ref idx, _),
399                    ..
400                }) = **expr
401                {
402                    Err(polars_err!(
403                    SQLSyntax:
404                    "negative ordinal values are invalid for {}; found -{}",
405                    clause,
406                    idx
407                    ))
408                } else {
409                    unreachable!()
410                }
411            },
412            SQLExpr::Value(ValueWithSpan {
413                value: SQLValue::Number(idx, _),
414                ..
415            }) => {
416                // note: sql queries are 1-indexed
417                let idx = idx.parse::<usize>().map_err(|_| {
418                    polars_err!(
419                        SQLSyntax:
420                        "negative ordinal values are invalid for {}; found {}",
421                        clause,
422                        idx
423                    )
424                })?;
425                // note: "selected" cols represent final projection order, so we use those for
426                // ordinal resolution. "exprs" may include cols that are subsequently dropped.
427                let cols = if let Some(cols) = selected {
428                    cols
429                } else {
430                    exprs
431                };
432                Ok(cols
433                    .get(idx - 1)
434                    .ok_or_else(|| {
435                        polars_err!(
436                            SQLInterface:
437                            "{} ordinal value must refer to a valid column; found {}",
438                            clause,
439                            idx
440                        )
441                    })?
442                    .clone())
443            },
444            SQLExpr::Value(v) => Err(polars_err!(
445                SQLSyntax:
446                "{} requires a valid expression or positive ordinal; found {}", clause, v,
447            )),
448            _ => {
449                // Handle qualified cross-aliasing in ORDER BY clauses
450                // (eg: `SELECT a AS b, -b AS a ... ORDER BY self.a`)
451                let mut expr = parse_sql_expr(e, self, schema)?;
452                if matches!(e, SQLExpr::CompoundIdentifier(_)) {
453                    if let Some(schema) = schema {
454                        expr = expr.map_expr(|ex| match &ex {
455                            Expr::Column(name) => {
456                                let prefixed = format!("__POLARS_ORIG_{}", name.as_str());
457                                if schema.contains(prefixed.as_str()) {
458                                    col(prefixed)
459                                } else {
460                                    ex
461                                }
462                            },
463                            _ => ex,
464                        });
465                    }
466                }
467                Ok(expr)
468            },
469        }
470    }
471
472    pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
473        if let Some(aliases) = self.joined_aliases.get(tbl_name) {
474            if let Some(name) = aliases.get(column_name) {
475                return name.to_string();
476            }
477        }
478        column_name.to_string()
479    }
480
481    fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
482        match expr {
483            SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
484            SetExpr::Query(nested_query) => {
485                let lf = self.execute_query_no_ctes(nested_query)?;
486                self.process_order_by(lf, &query.order_by, None)
487            },
488            SetExpr::SetOperation {
489                op: SetOperator::Union,
490                set_quantifier,
491                left,
492                right,
493            } => self.process_union(left, right, set_quantifier, query),
494
495            #[cfg(feature = "semi_anti_join")]
496            SetExpr::SetOperation {
497                op: SetOperator::Intersect | SetOperator::Except,
498                set_quantifier,
499                left,
500                right,
501            } => self.process_except_intersect(left, right, set_quantifier, query),
502
503            SetExpr::Values(Values {
504                explicit_row: _,
505                rows,
506                value_keyword: _,
507            }) => self.process_values(rows),
508
509            SetExpr::Table(tbl) => {
510                if let Some(table_name) = tbl.table_name.as_ref() {
511                    self.get_table_from_current_scope(table_name)
512                        .ok_or_else(|| {
513                            polars_err!(
514                                SQLInterface: "no table or alias named '{}' found",
515                                tbl
516                            )
517                        })
518                } else {
519                    polars_bail!(SQLInterface: "'TABLE' requires valid table name")
520                }
521            },
522            op => {
523                polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
524            },
525        }
526    }
527
528    #[cfg(feature = "semi_anti_join")]
529    fn process_except_intersect(
530        &mut self,
531        left: &SetExpr,
532        right: &SetExpr,
533        quantifier: &SetQuantifier,
534        query: &Query,
535    ) -> PolarsResult<LazyFrame> {
536        let (join_type, op_name) = match *query.body {
537            SetExpr::SetOperation {
538                op: SetOperator::Except,
539                ..
540            } => (JoinType::Anti, "EXCEPT"),
541            _ => (JoinType::Semi, "INTERSECT"),
542        };
543
544        // Note: each side of the EXCEPT/INTERSECT operation should execute
545        // in isolation to prevent context state leakage between them
546        let mut lf = self.execute_isolated(|ctx| ctx.process_query(left, query))?;
547        let mut rf = self.execute_isolated(|ctx| ctx.process_query(right, query))?;
548        let lf_schema = self.get_frame_schema(&mut lf)?;
549
550        let lf_cols: Vec<_> = lf_schema.iter_names_cloned().map(col).collect();
551        let rf_cols = match quantifier {
552            SetQuantifier::ByName => None,
553            SetQuantifier::Distinct | SetQuantifier::None => {
554                let rf_schema = self.get_frame_schema(&mut rf)?;
555                let rf_cols: Vec<_> = rf_schema.iter_names_cloned().map(col).collect();
556                if lf_cols.len() != rf_cols.len() {
557                    polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
558                }
559                Some(rf_cols)
560            },
561            _ => {
562                polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
563            },
564        };
565        let join = lf.join_builder().with(rf).how(join_type).join_nulls(true);
566        let joined_tbl = match rf_cols {
567            Some(rf_cols) => join.left_on(lf_cols).right_on(rf_cols).finish(),
568            None => join.on(lf_cols).finish(),
569        };
570        let lf = joined_tbl.unique(None, UniqueKeepStrategy::Any);
571        self.process_order_by(lf, &query.order_by, None)
572    }
573
    /// Evaluate a `UNION` between `left` and `right`, then apply the enclosing
    /// query's ORDER BY.
    ///
    /// The combination itself is deferred to a plan callback that receives both
    /// plans together with their schemas:
    /// * `UNION [ALL | DISTINCT]` combines positionally; the column counts must match,
    ///   and the right side's columns are renamed to the left side's names if they differ.
    /// * `UNION ... BY NAME` variants use a diagonal concat (requires the
    ///   `diagonal_concat` feature).
    /// * every variant except `ALL` / `ALL BY NAME` de-duplicates the combined rows.
    fn process_union(
        &mut self,
        left: &SetExpr,
        right: &SetExpr,
        quantifier: &SetQuantifier,
        query: &Query,
    ) -> PolarsResult<LazyFrame> {
        // copy the quantifier so it can be moved into the callback below
        let quantifier = *quantifier;

        // Note: each side of the UNION operation should execute
        // in isolation to prevent context state leakage between them
        let lf = self.execute_isolated(|ctx| ctx.process_query(left, query))?;
        let rf = self.execute_isolated(|ctx| ctx.process_query(right, query))?;

        let cb = PlanCallback::new(
            move |(mut plans, schemas): (Vec<DslPlan>, Vec<SchemaRef>)| {
                // plans arrive as [left, right]; pop in reverse order
                let mut rf = LazyFrame::from(plans.pop().unwrap());
                let lf = LazyFrame::from(plans.pop().unwrap());

                let opts = UnionArgs {
                    parallel: true,
                    to_supertypes: true,
                    maintain_order: false,
                    ..Default::default()
                };
                let out = match quantifier {
                    // UNION [ALL | DISTINCT]
                    SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
                        let lf_schema = &schemas[0];
                        let rf_schema = &schemas[1];
                        if lf_schema.len() != rf_schema.len() {
                            polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
                        }
                        // rename `rf` columns to match `lf` if they differ; SQL behaves
                        // positionally on UNION ops (unless using the "BY NAME" qualifier)
                        if lf_schema.iter_names().ne(rf_schema.iter_names()) {
                            rf = rf.rename(rf_schema.iter_names(), lf_schema.iter_names(), true);
                        }
                        let concatenated = concat(vec![lf, rf], opts);
                        match quantifier {
                            // plain UNION and UNION DISTINCT both drop duplicate rows
                            SetQuantifier::Distinct | SetQuantifier::None => {
                                concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
                            },
                            _ => concatenated,
                        }
                    },
                    // UNION ALL BY NAME
                    #[cfg(feature = "diagonal_concat")]
                    SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
                    // UNION [DISTINCT] BY NAME
                    #[cfg(feature = "diagonal_concat")]
                    SetQuantifier::ByName | SetQuantifier::DistinctByName => {
                        let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
                        concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
                    },
                    // only reachable when the feature-gated arms above are compiled out
                    #[allow(unreachable_patterns)]
                    _ => {
                        polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier)
                    },
                };

                out.map(|lf| lf.logical_plan)
            },
        );

        let lf = lf.pipe_with_schemas(vec![rf], cb);
        self.process_order_by(lf, &query.order_by, None)
    }
642
643    /// Process UNNEST as a lateral operation when it contains column references
644    /// (handles `CROSS JOIN UNNEST(col) AS name` by exploding the referenced col).
645    fn process_unnest_lateral(
646        &self,
647        lf: LazyFrame,
648        alias: &Option<TableAlias>,
649        array_exprs: &[SQLExpr],
650        with_offset: bool,
651    ) -> PolarsResult<LazyFrame> {
652        let alias = alias
653            .as_ref()
654            .ok_or_else(|| polars_err!(SQLSyntax: "UNNEST table must have an alias"))?;
655        polars_ensure!(!with_offset, SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
656
657        let (mut explode_cols, mut rename_from, mut rename_to) = (
658            Vec::with_capacity(array_exprs.len()),
659            Vec::with_capacity(array_exprs.len()),
660            Vec::with_capacity(array_exprs.len()),
661        );
662        let is_single_col = array_exprs.len() == 1;
663
664        for (i, arr_expr) in array_exprs.iter().enumerate() {
665            let col_name = match arr_expr {
666                SQLExpr::Identifier(ident) => PlSmallStr::from_str(&ident.value),
667                SQLExpr::CompoundIdentifier(parts) => {
668                    PlSmallStr::from_str(&parts.last().unwrap().value)
669                },
670                SQLExpr::Array(_) => polars_bail!(
671                    SQLInterface: "CROSS JOIN UNNEST with both literal arrays and column references is not supported"
672                ),
673                other => polars_bail!(
674                    SQLSyntax: "UNNEST expects column references or array literals, found {:?}", other
675                ),
676            };
677            // alias: column name from "AS t(col)", or table alias
678            if let Some(name) = alias
679                .columns
680                .get(i)
681                .map(|c| c.name.value.as_str())
682                .or_else(|| is_single_col.then_some(alias.name.value.as_str()))
683                .filter(|name| !name.is_empty() && *name != col_name.as_str())
684            {
685                rename_from.push(col_name.clone());
686                rename_to.push(PlSmallStr::from_str(name));
687            }
688            explode_cols.push(col_name);
689        }
690
691        let mut lf = lf.explode(
692            Selector::ByName {
693                names: Arc::from(explode_cols),
694                strict: true,
695            },
696            ExplodeOptions {
697                empty_as_null: true,
698                keep_nulls: true,
699            },
700        );
701        if !rename_from.is_empty() {
702            lf = lf.rename(rename_from, rename_to, true);
703        }
704        Ok(lf)
705    }
706
707    fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
708        let frame_rows: Vec<Row> = values.iter().map(|row| {
709            let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
710                let expr = parse_sql_expr(expr, self, None)?;
711                match expr {
712                    Expr::Literal(value) => {
713                        value.to_any_value()
714                            .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
715                            .map(|av| av.into_static())
716                    },
717                    _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
718                }
719            }).collect();
720            row_data.map(Row::new)
721        }).collect::<Result<_, _>>()?;
722
723        Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
724    }
725
726    // EXPLAIN SELECT * FROM DF
727    fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
728        match stmt {
729            Statement::Explain { statement, .. } => {
730                let lf = self.execute_statement(statement)?;
731                let plan = lf.describe_optimized_plan()?;
732                let plan = plan
733                    .split('\n')
734                    .collect::<Series>()
735                    .with_name(PlSmallStr::from_static("Logical Plan"))
736                    .into_column();
737                let df = DataFrame::new_infer_height(vec![plan])?;
738                Ok(df.lazy())
739            },
740            _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
741        }
742    }
743
744    // SHOW TABLES
745    fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
746        let tables = Column::new("name".into(), self.get_tables());
747        let df = DataFrame::new_infer_height(vec![tables])?;
748        Ok(df.lazy())
749    }
750
751    // DROP TABLE <tbl>
752    fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
753        match stmt {
754            Statement::Drop { names, .. } => {
755                names.iter().for_each(|name| {
756                    self.table_map.write().unwrap().remove(&name.to_string());
757                });
758                Ok(DataFrame::empty().lazy())
759            },
760            _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
761        }
762    }
763
764    // DELETE FROM <tbl> [WHERE ...]
765    fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
766        if let Statement::Delete(Delete {
767            tables,
768            from,
769            using,
770            selection,
771            returning,
772            order_by,
773            limit,
774            delete_token: _,
775        }) = stmt
776        {
777            if !tables.is_empty()
778                || using.is_some()
779                || returning.is_some()
780                || limit.is_some()
781                || !order_by.is_empty()
782            {
783                let error_message = match () {
784                    _ if !tables.is_empty() => "DELETE expects exactly one table name",
785                    _ if using.is_some() => "DELETE does not support the USING clause",
786                    _ if returning.is_some() => "DELETE does not support the RETURNING clause",
787                    _ if limit.is_some() => "DELETE does not support the LIMIT clause",
788                    _ if !order_by.is_empty() => "DELETE does not support the ORDER BY clause",
789                    _ => unreachable!(),
790                };
791                polars_bail!(SQLInterface: error_message);
792            }
793            let from_tables = match &from {
794                FromTable::WithFromKeyword(from) => from,
795                FromTable::WithoutKeyword(from) => from,
796            };
797            if from_tables.len() > 1 {
798                polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
799            }
800            let tbl_expr = from_tables.first().unwrap();
801            if !tbl_expr.joins.is_empty() {
802                polars_bail!(SQLInterface: "DELETE does not support table JOINs")
803            }
804            let (_, lf) = self.get_table(&tbl_expr.relation)?;
805            if selection.is_none() {
806                // no WHERE clause; equivalent to TRUNCATE (drop all rows)
807                Ok(clear_lf(lf))
808            } else {
809                // apply constraint as inverted filter (drops rows matching the selection)
810                Ok(self.process_where(lf.clone(), selection, true, None)?)
811            }
812        } else {
813            polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
814        }
815    }
816
817    // TRUNCATE <tbl>
818    fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
819        if let Statement::Truncate(Truncate {
820            table_names,
821            partitions,
822            ..
823        }) = stmt
824        {
825            match partitions {
826                None => {
827                    if table_names.len() != 1 {
828                        polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
829                    }
830                    let tbl = table_names[0].name.to_string();
831                    if let Some(lf) = self.table_map.write().unwrap().get_mut(&tbl) {
832                        *lf = clear_lf(lf.clone());
833                        Ok(lf.clone())
834                    } else {
835                        polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
836                    }
837                },
838                _ => {
839                    polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
840                },
841            }
842        } else {
843            polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
844        }
845    }
846
847    fn register_cte(&mut self, name: &str, lf: LazyFrame) {
848        self.cte_map.insert(name.to_owned(), lf);
849    }
850
851    fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
852        if let Some(with) = &query.with {
853            if with.recursive {
854                polars_bail!(SQLInterface: "recursive CTEs are not supported")
855            }
856            for cte in &with.cte_tables {
857                let cte_name = cte.alias.name.value.clone();
858                let mut lf = self.execute_query(&cte.query)?;
859                lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
860                self.register_cte(&cte_name, lf);
861            }
862        }
863        Ok(())
864    }
865
866    fn register_named_windows(
867        &mut self,
868        named_windows: &[NamedWindowDefinition],
869    ) -> PolarsResult<()> {
870        for NamedWindowDefinition(name, expr) in named_windows {
871            let spec = match expr {
872                NamedWindowExpr::NamedWindow(ref_name) => self
873                    .named_windows
874                    .get(&ref_name.value)
875                    .ok_or_else(|| {
876                        polars_err!(
877                            SQLInterface:
878                            "named window '{}' references undefined window '{}'",
879                            name.value, ref_name.value
880                        )
881                    })?
882                    .clone(),
883                NamedWindowExpr::WindowSpec(spec) => spec.clone(),
884            };
885            self.named_windows.insert(name.value.clone(), spec);
886        }
887        Ok(())
888    }
889
890    /// execute the 'FROM' part of the query
891    fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
892        let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
893        if !tbl_expr.joins.is_empty() {
894            for join in &tbl_expr.joins {
895                // Handle "CROSS JOIN UNNEST(col)" as a lateral join op
896                if let (
897                    JoinOperator::CrossJoin(JoinConstraint::None),
898                    TableFactor::UNNEST {
899                        alias,
900                        array_exprs,
901                        with_offset,
902                        ..
903                    },
904                ) = (&join.join_operator, &join.relation)
905                {
906                    if array_exprs.iter().any(|e| !matches!(e, SQLExpr::Array(_))) {
907                        lf = self.process_unnest_lateral(lf, alias, array_exprs, *with_offset)?;
908                        continue;
909                    }
910                }
911
912                let (r_name, mut rf) = self.get_table(&join.relation)?;
913                if r_name.is_empty() {
914                    // Require non-empty to avoid duplicate column errors from nested self-joins.
915                    polars_bail!(
916                        SQLInterface:
917                        "cannot join on unnamed relation; please provide an alias"
918                    )
919                }
920                let left_schema = self.get_frame_schema(&mut lf)?;
921                let right_schema = self.get_frame_schema(&mut rf)?;
922
923                lf = match &join.join_operator {
924                    op @ (JoinOperator::Join(constraint)  // note: bare "join" is inner
925                    | JoinOperator::FullOuter(constraint)
926                    | JoinOperator::Left(constraint)
927                    | JoinOperator::LeftOuter(constraint)
928                    | JoinOperator::Right(constraint)
929                    | JoinOperator::RightOuter(constraint)
930                    | JoinOperator::Inner(constraint)
931                    | JoinOperator::Anti(constraint)
932                    | JoinOperator::Semi(constraint)
933                    | JoinOperator::LeftAnti(constraint)
934                    | JoinOperator::LeftSemi(constraint)
935                    | JoinOperator::RightAnti(constraint)
936                    | JoinOperator::RightSemi(constraint)) => {
937                        let (lf, rf) = match op {
938                            JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
939                            _ => (lf, rf),
940                        };
941                        self.process_join(
942                            &TableInfo {
943                                frame: lf,
944                                name: (&l_name).into(),
945                                schema: left_schema.clone(),
946                            },
947                            &TableInfo {
948                                frame: rf,
949                                name: (&r_name).into(),
950                                schema: right_schema.clone(),
951                            },
952                            constraint,
953                            match op {
954                                JoinOperator::Join(_) | JoinOperator::Inner(_) => JoinType::Inner,
955                                JoinOperator::Left(_) | JoinOperator::LeftOuter(_) => {
956                                    JoinType::Left
957                                },
958                                JoinOperator::Right(_) | JoinOperator::RightOuter(_) => {
959                                    JoinType::Right
960                                },
961                                JoinOperator::FullOuter(_) => JoinType::Full,
962                                #[cfg(feature = "semi_anti_join")]
963                                JoinOperator::Anti(_)
964                                | JoinOperator::LeftAnti(_)
965                                | JoinOperator::RightAnti(_) => JoinType::Anti,
966                                #[cfg(feature = "semi_anti_join")]
967                                JoinOperator::Semi(_)
968                                | JoinOperator::LeftSemi(_)
969                                | JoinOperator::RightSemi(_) => JoinType::Semi,
970                                join_type => polars_bail!(
971                                    SQLInterface:
972                                    "join type '{:?}' not currently supported",
973                                    join_type
974                                ),
975                            },
976                        )?
977                    },
978                    JoinOperator::CrossJoin(JoinConstraint::None) => {
979                        lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
980                    },
981                    JoinOperator::CrossJoin(constraint) => {
982                        polars_bail!(
983                            SQLInterface:
984                            "CROSS JOIN does not support {:?} constraint; consider INNER JOIN instead",
985                            constraint
986                        )
987                    },
988                    join_type => {
989                        polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
990                    },
991                };
992
993                // track join-aliased columns so we can resolve/check them later
994                let joined_schema = self.get_frame_schema(&mut lf)?;
995
996                self.joined_aliases.insert(
997                    r_name.clone(),
998                    right_schema
999                        .iter_names()
1000                        .filter_map(|name| {
1001                            // col exists in both tables and is aliased in the joined result
1002                            let aliased_name = format!("{name}:{r_name}");
1003                            if left_schema.contains(name)
1004                                && joined_schema.contains(aliased_name.as_str())
1005                            {
1006                                Some((name.to_string(), aliased_name))
1007                            } else {
1008                                None
1009                            }
1010                        })
1011                        .collect::<PlHashMap<String, String>>(),
1012                );
1013            }
1014        };
1015        Ok(lf)
1016    }
1017
1018    /// Check that the SELECT statement only contains supported clauses.
1019    fn validate_select(&self, select_stmt: &Select) -> PolarsResult<()> {
1020        // Destructure "Select" exhaustively; that way if/when new fields are added in
1021        // future sqlparser versions, we'll get a compilation error and can handle them
1022        let Select {
1023            // Supported clauses
1024            distinct: _,
1025            from: _,
1026            group_by: _,
1027            having: _,
1028            named_window: _,
1029            projection: _,
1030            qualify: _,
1031            selection: _,
1032
1033            // Metadata/token fields (can ignore)
1034            flavor: _,
1035            select_token: _,
1036            top_before_distinct: _,
1037            window_before_qualify: _,
1038
1039            // Unsupported clauses
1040            ref cluster_by,
1041            ref connect_by,
1042            ref distribute_by,
1043            ref exclude,
1044            ref into,
1045            ref lateral_views,
1046            ref prewhere,
1047            ref sort_by,
1048            ref top,
1049            ref value_table_mode,
1050        } = *select_stmt;
1051
1052        // Raise specific error messages for unsupported attributes
1053        polars_ensure!(cluster_by.is_empty(), SQLInterface: "`CLUSTER BY` clause is not supported");
1054        polars_ensure!(connect_by.is_none(), SQLInterface: "`CONNECT BY` clause is not supported");
1055        polars_ensure!(distribute_by.is_empty(), SQLInterface: "`DISTRIBUTE BY` clause is not supported");
1056        polars_ensure!(exclude.is_none(), SQLInterface: "`EXCLUDE` clause is not supported");
1057        polars_ensure!(into.is_none(), SQLInterface: "`SELECT INTO` clause is not supported");
1058        polars_ensure!(lateral_views.is_empty(), SQLInterface: "`LATERAL VIEW` clause is not supported");
1059        polars_ensure!(prewhere.is_none(), SQLInterface: "`PREWHERE` clause is not supported");
1060        polars_ensure!(sort_by.is_empty(), SQLInterface: "`SORT BY` clause is not supported; use `ORDER BY` instead");
1061        polars_ensure!(top.is_none(), SQLInterface: "`TOP` clause is not supported; use `LIMIT` instead");
1062        polars_ensure!(value_table_mode.is_none(), SQLInterface: "`SELECT AS VALUE/STRUCT` is not supported");
1063
1064        Ok(())
1065    }
1066
1067    /// Check that the QUERY only contains supported clauses.
1068    fn validate_query(&self, query: &Query) -> PolarsResult<()> {
1069        // As with "Select" validation (above) destructure "Query" exhaustively
1070        let Query {
1071            // Supported clauses
1072            with: _,
1073            body: _,
1074            order_by: _,
1075            limit_clause: _,
1076            fetch,
1077
1078            // Unsupported clauses
1079            for_clause,
1080            format_clause,
1081            locks,
1082            pipe_operators,
1083            settings,
1084        } = query;
1085
1086        // Raise specific error messages for unsupported attributes
1087        polars_ensure!(for_clause.is_none(), SQLInterface: "`FOR` clause is not supported");
1088        polars_ensure!(format_clause.is_none(), SQLInterface: "`FORMAT` clause is not supported");
1089        polars_ensure!(locks.is_empty(), SQLInterface: "`FOR UPDATE/SHARE` locking clause is not supported");
1090        polars_ensure!(pipe_operators.is_empty(), SQLInterface: "pipe operators are not supported");
1091        polars_ensure!(settings.is_none(), SQLInterface: "`SETTINGS` clause is not supported");
1092
1093        // Validate FETCH clause options (if present)
1094        if let Some(Fetch {
1095            quantity: _, // supported
1096            percent,
1097            with_ties,
1098        }) = fetch
1099        {
1100            polars_ensure!(!percent, SQLInterface: "`FETCH` with `PERCENT` is not supported");
1101            polars_ensure!(!with_ties, SQLInterface: "`FETCH` with `WITH TIES` is not supported");
1102        }
1103        Ok(())
1104    }
1105
1106    /// Execute the 'SELECT' part of the query.
1107    fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
1108        // Check that the statement doesn't contain unsupported SELECT clauses
1109        self.validate_select(select_stmt)?;
1110
1111        // Parse named windows first, as they may be referenced in the SELECT clause
1112        self.register_named_windows(&select_stmt.named_window)?;
1113
1114        // Get `FROM` table/data
1115        let (mut lf, base_table_name) = if select_stmt.from.is_empty() {
1116            (DataFrame::empty().lazy(), None)
1117        } else {
1118            // Note: implicit joins need more work to support properly,
1119            // explicit joins are preferred for now (ref: #16662)
1120            let from = select_stmt.clone().from;
1121            if from.len() > 1 {
1122                polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
1123            }
1124            let tbl_expr = from.first().unwrap();
1125            let lf = self.execute_from_statement(tbl_expr)?;
1126            let base_name = get_table_name(&tbl_expr.relation);
1127            (lf, base_name)
1128        };
1129
1130        // Check for ambiguous column references in SELECT and WHERE (if there were joins)
1131        if let Some(ref base_name) = base_table_name {
1132            if !self.joined_aliases.is_empty() {
1133                // Extract USING columns from joins (these are coalesced and not ambiguous)
1134                let using_cols: PlHashSet<String> = select_stmt
1135                    .from
1136                    .first()
1137                    .into_iter()
1138                    .flat_map(|t| t.joins.iter())
1139                    .filter_map(|join| get_using_cols(&join.join_operator))
1140                    .flatten()
1141                    .collect();
1142
1143                // Check SELECT and WHERE expressions for ambiguous column references
1144                let check_expr = |e| {
1145                    check_for_ambiguous_column_refs(e, &self.joined_aliases, base_name, &using_cols)
1146                };
1147                for item in &select_stmt.projection {
1148                    match item {
1149                        SelectItem::UnnamedExpr(e) | SelectItem::ExprWithAlias { expr: e, .. } => {
1150                            check_expr(e)?
1151                        },
1152                        _ => {},
1153                    }
1154                }
1155                if let Some(ref where_expr) = select_stmt.selection {
1156                    check_expr(where_expr)?;
1157                }
1158            }
1159        }
1160
1161        // Apply `WHERE` constraint
1162        let mut schema = self.get_frame_schema(&mut lf)?;
1163        lf = self.process_where(lf, &select_stmt.selection, false, Some(schema.clone()))?;
1164
1165        // Determine projections
1166        let mut select_modifiers = SelectModifiers {
1167            ilike: None,
1168            exclude: PlHashSet::new(),
1169            rename: PlHashMap::new(),
1170            replace: vec![],
1171        };
1172
1173        // Collect window function cols if QUALIFY is present (we check at the
1174        // SQL level because empty OVER() clauses don't create Expr::Over)
1175        let window_fn_columns = if select_stmt.qualify.is_some() {
1176            select_stmt
1177                .projection
1178                .iter()
1179                .filter_map(|item| match item {
1180                    SelectItem::ExprWithAlias { expr, alias }
1181                        if expr_has_window_functions(expr) =>
1182                    {
1183                        Some(alias.value.clone())
1184                    },
1185                    _ => None,
1186                })
1187                .collect::<PlHashSet<_>>()
1188        } else {
1189            PlHashSet::new()
1190        };
1191
1192        let mut projections =
1193            self.column_projections(select_stmt, &schema, &mut select_modifiers)?;
1194
1195        // Apply `UNNEST` expressions
1196        let mut explode_names = Vec::new();
1197        let mut explode_exprs = Vec::new();
1198        let mut explode_lookup = PlHashMap::new();
1199
1200        for expr in &projections {
1201            for e in expr {
1202                if let Expr::Explode { input, .. } = e {
1203                    match input.as_ref() {
1204                        Expr::Column(name) => explode_names.push(name.clone()),
1205                        other_expr => {
1206                            // Note: skip aggregate expressions; those are handled in the GROUP BY phase
1207                            if !has_expr(other_expr, |e| matches!(e, Expr::Agg(_) | Expr::Len)) {
1208                                let temp_name =
1209                                    format_pl_smallstr!("__POLARS_UNNEST_{}", explode_exprs.len());
1210                                explode_exprs.push(other_expr.clone().alias(temp_name.as_str()));
1211                                explode_lookup.insert(other_expr.clone(), temp_name.clone());
1212                                explode_names.push(temp_name);
1213                            }
1214                        },
1215                    }
1216                }
1217            }
1218        }
1219        if !explode_names.is_empty() {
1220            if !explode_exprs.is_empty() {
1221                lf = lf.with_columns(explode_exprs);
1222            }
1223            lf = lf.explode(
1224                Selector::ByName {
1225                    names: Arc::from(explode_names),
1226                    strict: true,
1227                },
1228                ExplodeOptions {
1229                    empty_as_null: true,
1230                    keep_nulls: true,
1231                },
1232            );
1233            projections = projections
1234                .into_iter()
1235                .map(|p| {
1236                    // Update "projections" with column refs to the now-exploded expressions
1237                    p.map_expr(|e| match e {
1238                        Expr::Explode { input, .. } => explode_lookup
1239                            .get(input.as_ref())
1240                            .map(|name| Expr::Column(name.clone()))
1241                            .unwrap_or_else(|| input.as_ref().clone()),
1242                        _ => e,
1243                    })
1244                })
1245                .collect();
1246
1247            schema = self.get_frame_schema(&mut lf)?;
1248        }
1249
1250        // Check for "GROUP BY ..." (after determining projections)
1251        let mut group_by_keys: Vec<Expr> = Vec::new();
1252        match &select_stmt.group_by {
1253            // Standard "GROUP BY x, y, z" syntax (also recognising ordinal values)
1254            GroupByExpr::Expressions(group_by_exprs, modifiers) => {
1255                if !modifiers.is_empty() {
1256                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
1257                }
1258                // Translate the group expressions, resolving ordinal values and SELECT aliases
1259                group_by_keys = group_by_exprs
1260                    .iter()
1261                    .map(|e| match e {
1262                        SQLExpr::Identifier(ident) => {
1263                            resolve_select_alias(&ident.value, &projections, &schema).map_or_else(
1264                                || {
1265                                    self.expr_or_ordinal(
1266                                        e,
1267                                        &projections,
1268                                        None,
1269                                        Some(&schema),
1270                                        "GROUP BY",
1271                                    )
1272                                },
1273                                Ok,
1274                            )
1275                        },
1276                        _ => self.expr_or_ordinal(e, &projections, None, Some(&schema), "GROUP BY"),
1277                    })
1278                    .collect::<PolarsResult<_>>()?
1279            },
1280            // "GROUP BY ALL" syntax; automatically adds expressions that do not contain
1281            // nested agg/window funcs to the group key (also ignores literals).
1282            GroupByExpr::All(modifiers) => {
1283                if !modifiers.is_empty() {
1284                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
1285                }
1286                projections.iter().for_each(|expr| match expr {
1287                    // immediately match the most common cases (col|agg|len|lit, optionally aliased).
1288                    Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
1289                    Expr::Column(_) => group_by_keys.push(expr.clone()),
1290                    Expr::Alias(e, _)
1291                        if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
1292                    Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
1293                        if let Expr::Column(name) = &**e {
1294                            group_by_keys.push(col(name.clone()));
1295                        }
1296                    },
1297                    _ => {
1298                        // If not quick-matched, add if no nested agg/window expressions
1299                        if !has_expr(expr, |e| {
1300                            matches!(e, Expr::Agg(_))
1301                                || matches!(e, Expr::Len)
1302                                || matches!(e, Expr::Over { .. })
1303                                || {
1304                                    #[cfg(feature = "dynamic_group_by")]
1305                                    {
1306                                        matches!(e, Expr::Rolling { .. })
1307                                    }
1308                                    #[cfg(not(feature = "dynamic_group_by"))]
1309                                    {
1310                                        false
1311                                    }
1312                                }
1313                        }) {
1314                            group_by_keys.push(expr.clone())
1315                        }
1316                    },
1317                });
1318            },
1319        };
1320
1321        lf = if group_by_keys.is_empty() {
1322            // The 'having' clause is only valid inside 'group by'
1323            if select_stmt.having.is_some() {
1324                polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
1325            };
1326
1327            // Final/selected cols, accounting for 'SELECT *' modifiers
1328            let mut retained_cols = Vec::with_capacity(projections.len());
1329            let mut retained_names = Vec::with_capacity(projections.len());
1330            let have_order_by = query.order_by.is_some();
1331
1332            // Initialize containing InheritsContext to handle empty projection case.
1333            let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;
1334
1335            // Note: if there is an 'order by' then we project everything (original cols
1336            // and new projections) and *then* select the final cols; the retained cols
1337            // are used to ensure a correct final projection. If there's no 'order by',
1338            // clause then we can project the final column *expressions* directly.
1339            for p in projections.iter() {
1340                let name = p.to_field(schema.deref())?.name.to_string();
1341                if select_modifiers.matches_ilike(&name)
1342                    && !select_modifiers.exclude.contains(&name)
1343                {
1344                    projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);
1345
1346                    retained_cols.push(if have_order_by {
1347                        col(name.as_str())
1348                    } else {
1349                        p.clone()
1350                    });
1351                    retained_names.push(col(name));
1352                }
1353            }
1354
1355            // Apply the remaining modifiers and establish the final projection
1356            if have_order_by {
1357                // We can safely use `with_columns()` and avoid a join if:
1358                // * There is already a projection that projects to the table height.
1359                // * All projection heights inherit from context (e.g. all scalar literals that
1360                //   are to be broadcasted to table height).
1361                if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
1362                    || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
1363                {
1364                    lf = lf.with_columns(projections);
1365                } else {
1366                    // We hit this branch if the output height is not guaranteed to match the table
1367                    // height. E.g.:
1368                    //
1369                    // * SELECT COUNT(*) FROM df ORDER BY sort_key;
1370                    //
1371                    // For these cases we truncate / extend the sorting columns with NULLs to match
1372                    // the output height. We do this by projecting independently and then joining
1373                    // back the original frame on the row index.
1374                    const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
1375                    lf = lf
1376                        .clone()
1377                        .select(projections)
1378                        .with_row_index(NAME, None)
1379                        .join(
1380                            lf.with_row_index(NAME, None),
1381                            [col(NAME)],
1382                            [col(NAME)],
1383                            JoinArgs {
1384                                how: JoinType::Left,
1385                                validation: Default::default(),
1386                                suffix: None,
1387                                slice: None,
1388                                nulls_equal: false,
1389                                coalesce: Default::default(),
1390                                maintain_order: polars_ops::frame::MaintainOrderJoin::Left,
1391                                build_side: None,
1392                            },
1393                        );
1394                }
1395            }
1396            if !select_modifiers.replace.is_empty() {
1397                lf = lf.with_columns(&select_modifiers.replace);
1398            }
1399            if !select_modifiers.rename.is_empty() {
1400                lf = lf.with_columns(select_modifiers.renamed_cols());
1401            }
1402            lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;
1403
1404            // Note: If `have_order_by`, with_columns is already done above.
1405            if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
1406                && !have_order_by
1407            {
1408                // All projections need to be broadcasted to table height, so evaluate in `with_columns()`
1409                lf = lf.with_columns(retained_cols).select(retained_names);
1410            } else {
1411                lf = lf.select(retained_cols);
1412            }
1413            if !select_modifiers.rename.is_empty() {
1414                lf = lf.rename(
1415                    select_modifiers.rename.keys(),
1416                    select_modifiers.rename.values(),
1417                    true,
1418                );
1419            };
1420            lf
1421        } else {
1422            let having = select_stmt
1423                .having
1424                .as_ref()
1425                .map(|expr| parse_sql_expr(expr, self, Some(&schema)))
1426                .transpose()?;
1427            lf = self.process_group_by(lf, &group_by_keys, &projections, having)?;
1428            lf = self.process_order_by(lf, &query.order_by, None)?;
1429
1430            // Drop any extra columns (eg: added to maintain ORDER BY access to original cols)
1431            let output_cols: Vec<_> = projections
1432                .iter()
1433                .map(|p| p.to_field(&schema))
1434                .collect::<PolarsResult<Vec<_>>>()?
1435                .into_iter()
1436                .map(|f| col(f.name))
1437                .collect();
1438
1439            lf.select(&output_cols)
1440        };
1441
1442        // Apply optional QUALIFY clause (filters on window functions).
1443        lf = self.process_qualify(lf, &select_stmt.qualify, &window_fn_columns)?;
1444
1445        // Apply optional DISTINCT clause.
1446        lf = match &select_stmt.distinct {
1447            Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
1448            Some(Distinct::On(exprs)) => {
1449                // TODO: support exprs in `unique` see https://github.com/pola-rs/polars/issues/5760
1450                let schema = Some(self.get_frame_schema(&mut lf)?);
1451                let cols = exprs
1452                    .iter()
1453                    .map(|e| {
1454                        let expr = parse_sql_expr(e, self, schema.as_deref())?;
1455                        if let Expr::Column(name) = expr {
1456                            Ok(name)
1457                        } else {
1458                            Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
1459                        }
1460                    })
1461                    .collect::<PolarsResult<Vec<_>>>()?;
1462
1463                // DISTINCT ON has to apply the ORDER BY before the operation.
1464                lf = self.process_order_by(lf, &query.order_by, None)?;
1465                return Ok(lf.unique_stable(
1466                    Some(Selector::ByName {
1467                        names: cols.into(),
1468                        strict: true,
1469                    }),
1470                    UniqueKeepStrategy::First,
1471                ));
1472            },
1473            None => lf,
1474        };
1475        Ok(lf)
1476    }
1477
1478    fn column_projections(
1479        &mut self,
1480        select_stmt: &Select,
1481        schema: &SchemaRef,
1482        select_modifiers: &mut SelectModifiers,
1483    ) -> PolarsResult<Vec<Expr>> {
1484        let mut items: Vec<ProjectionItem> = Vec::with_capacity(select_stmt.projection.len());
1485        let mut has_qualified_wildcard = false;
1486
1487        for select_item in &select_stmt.projection {
1488            match select_item {
1489                SelectItem::UnnamedExpr(expr) => {
1490                    items.push(ProjectionItem::Exprs(vec![parse_sql_expr(
1491                        expr,
1492                        self,
1493                        Some(schema),
1494                    )?]));
1495                },
1496                SelectItem::ExprWithAlias { expr, alias } => {
1497                    let expr = parse_sql_expr(expr, self, Some(schema))?;
1498                    items.push(ProjectionItem::Exprs(vec![
1499                        expr.alias(PlSmallStr::from_str(alias.value.as_str())),
1500                    ]));
1501                },
1502                SelectItem::QualifiedWildcard(kind, wildcard_options) => match kind {
1503                    SelectItemQualifiedWildcardKind::ObjectName(obj_name) => {
1504                        let tbl_name = obj_name
1505                            .0
1506                            .last()
1507                            .and_then(|p| p.as_ident())
1508                            .map(|i| PlSmallStr::from_str(&i.value))
1509                            .unwrap_or_default();
1510                        let exprs = self.process_qualified_wildcard(
1511                            obj_name,
1512                            wildcard_options,
1513                            select_modifiers,
1514                            Some(schema),
1515                        )?;
1516                        items.push(ProjectionItem::QualifiedExprs(tbl_name, exprs));
1517                        has_qualified_wildcard = true;
1518                    },
1519                    SelectItemQualifiedWildcardKind::Expr(_) => {
1520                        polars_bail!(SQLSyntax: "qualified wildcard on expressions not yet supported: {:?}", select_item)
1521                    },
1522                },
1523                SelectItem::Wildcard(wildcard_options) => {
1524                    let cols = schema
1525                        .iter_names()
1526                        .map(|name| col(name.clone()))
1527                        .collect::<Vec<_>>();
1528
1529                    items.push(ProjectionItem::Exprs(
1530                        self.process_wildcard_additional_options(
1531                            cols,
1532                            wildcard_options,
1533                            select_modifiers,
1534                            Some(schema),
1535                        )?,
1536                    ));
1537                },
1538            }
1539        }
1540
1541        // Disambiguate qualified wildcards (if any) and flatten expressions
1542        let exprs = if has_qualified_wildcard {
1543            disambiguate_projection_cols(items, schema)?
1544        } else {
1545            items
1546                .into_iter()
1547                .flat_map(|item| match item {
1548                    ProjectionItem::Exprs(exprs) | ProjectionItem::QualifiedExprs(_, exprs) => {
1549                        exprs
1550                    },
1551                })
1552                .collect()
1553        };
1554        let flattened_exprs = exprs
1555            .into_iter()
1556            .flat_map(|expr| expand_exprs(expr, schema))
1557            .collect();
1558
1559        Ok(flattened_exprs)
1560    }
1561
1562    fn process_where(
1563        &mut self,
1564        mut lf: LazyFrame,
1565        expr: &Option<SQLExpr>,
1566        invert_filter: bool,
1567        schema: Option<SchemaRef>,
1568    ) -> PolarsResult<LazyFrame> {
1569        if let Some(expr) = expr {
1570            let schema = match schema {
1571                None => self.get_frame_schema(&mut lf)?,
1572                Some(s) => s,
1573            };
1574
1575            // shortcut filter evaluation if given expression is just TRUE or FALSE
1576            let (all_true, all_false) = match expr {
1577                SQLExpr::Value(ValueWithSpan {
1578                    value: SQLValue::Boolean(b),
1579                    ..
1580                }) => (*b, !*b),
1581                SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
1582                    (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => {
1583                        (a.value == b.value, a.value != b.value)
1584                    },
1585                    (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
1586                        (a.value != b.value, a.value == b.value)
1587                    },
1588                    _ => (false, false),
1589                },
1590                _ => (false, false),
1591            };
1592            if (all_true && !invert_filter) || (all_false && invert_filter) {
1593                return Ok(lf);
1594            } else if (all_false && !invert_filter) || (all_true && invert_filter) {
1595                return Ok(clear_lf(lf));
1596            }
1597
1598            // ...otherwise parse and apply the filter as normal
1599            let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1600            if filter_expression.clone().meta().has_multiple_outputs() {
1601                filter_expression = all_horizontal([filter_expression])?;
1602            }
1603            lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1604            lf = if invert_filter {
1605                lf.remove(filter_expression)
1606            } else {
1607                lf.filter(filter_expression)
1608            };
1609        }
1610        Ok(lf)
1611    }
1612
1613    pub(super) fn process_join(
1614        &mut self,
1615        tbl_left: &TableInfo,
1616        tbl_right: &TableInfo,
1617        constraint: &JoinConstraint,
1618        join_type: JoinType,
1619    ) -> PolarsResult<LazyFrame> {
1620        let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right, self)?;
1621        let coalesce_type = match constraint {
1622            // "NATURAL" joins should coalesce; otherwise we disambiguate
1623            JoinConstraint::Natural => JoinCoalesce::CoalesceColumns,
1624            _ => JoinCoalesce::KeepColumns,
1625        };
1626        let joined = tbl_left
1627            .frame
1628            .clone()
1629            .join_builder()
1630            .with(tbl_right.frame.clone())
1631            .left_on(left_on)
1632            .right_on(right_on)
1633            .how(join_type)
1634            .suffix(format!(":{}", tbl_right.name))
1635            .coalesce(coalesce_type)
1636            .finish();
1637
1638        Ok(joined)
1639    }
1640
1641    fn process_qualify(
1642        &mut self,
1643        mut lf: LazyFrame,
1644        qualify_expr: &Option<SQLExpr>,
1645        window_fn_columns: &PlHashSet<String>,
1646    ) -> PolarsResult<LazyFrame> {
1647        if let Some(expr) = qualify_expr {
1648            // Check the QUALIFY expression to identify window functions
1649            // and collect column refs (for looking up aliases from SELECT)
1650            let (has_window_fns, column_refs) = QualifyExpression::analyze(expr);
1651            let references_window_alias = column_refs.iter().any(|c| window_fn_columns.contains(c));
1652            if !has_window_fns && !references_window_alias {
1653                polars_bail!(
1654                    SQLSyntax:
1655                    "QUALIFY clause must reference window functions either explicitly or via SELECT aliases"
1656                );
1657            }
1658            let schema = self.get_frame_schema(&mut lf)?;
1659            let mut filter_expression = parse_sql_expr(expr, self, Some(&schema))?;
1660            if filter_expression.clone().meta().has_multiple_outputs() {
1661                filter_expression = all_horizontal([filter_expression])?;
1662            }
1663            lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1664            lf = lf.filter(filter_expression);
1665        }
1666        Ok(lf)
1667    }
1668
1669    fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
1670        let mut subplans = vec![];
1671
1672        for e in exprs {
1673            *e = e.clone().map_expr(|e| {
1674                if let Expr::SubPlan(lp, names) = e {
1675                    assert_eq!(
1676                        names.len(),
1677                        1,
1678                        "multiple columns in subqueries not yet supported"
1679                    );
1680
1681                    let select_expr = names[0].1.clone();
1682                    let cb =
1683                        PlanCallback::new(move |(plans, schemas): (Vec<DslPlan>, Vec<SchemaRef>)| {
1684                            let schema = &schemas[0];
1685                            polars_ensure!(schema.len() == 1,  SQLSyntax: "SQL subquery returns more than one column");
1686                            Ok(LazyFrame::from(plans.into_iter().next().unwrap()).select([select_expr.clone()]).logical_plan)
1687                        });
1688                    subplans.push(LazyFrame::from((**lp).clone()).pipe_with_schema(cb));
1689                    Expr::Column(names[0].0.clone()).first()
1690                } else {
1691                    e
1692                }
1693            });
1694        }
1695
1696        if subplans.is_empty() {
1697            lf
1698        } else {
1699            subplans.insert(0, lf);
1700            concat_lf_horizontal(
1701                subplans,
1702                HConcatOptions {
1703                    broadcast_unit_length: true,
1704                    ..Default::default()
1705                },
1706            )
1707            .unwrap()
1708        }
1709    }
1710
1711    fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1712        if let Statement::CreateTable(CreateTable {
1713            if_not_exists,
1714            name,
1715            query,
1716            columns,
1717            like,
1718            ..
1719        }) = stmt
1720        {
1721            let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1722            if *if_not_exists && self.table_map.read().unwrap().contains_key(tbl_name) {
1723                polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1724            }
1725            let lf = match (query, columns.is_empty(), like) {
1726                (Some(query), true, None) => {
1727                    // ----------------------------------------------------
1728                    // CREATE TABLE [IF NOT EXISTS] <name> AS <query>
1729                    // ----------------------------------------------------
1730                    self.execute_query(query)?
1731                },
1732                (None, false, None) => {
1733                    // ----------------------------------------------------
1734                    // CREATE TABLE [IF NOT EXISTS] <name> (<coldef>, ...)
1735                    // ----------------------------------------------------
1736                    let mut schema = Schema::with_capacity(columns.len());
1737                    for col in columns {
1738                        let col_name = col.name.value.as_str();
1739                        let dtype = map_sql_dtype_to_polars(&col.data_type)?;
1740                        schema.insert_at_index(schema.len(), col_name.into(), dtype)?;
1741                    }
1742                    DataFrame::empty_with_schema(&schema).lazy()
1743                },
1744                (None, true, Some(like_kind)) => {
1745                    // ----------------------------------------------------
1746                    // CREATE TABLE [IF NOT EXISTS] <name> LIKE <table>
1747                    // ----------------------------------------------------
1748                    let like_name = match like_kind {
1749                        CreateTableLikeKind::Plain(like)
1750                        | CreateTableLikeKind::Parenthesized(like) => &like.name,
1751                    };
1752                    let like_table = like_name
1753                        .0
1754                        .first()
1755                        .unwrap()
1756                        .as_ident()
1757                        .unwrap()
1758                        .value
1759                        .as_str();
1760                    if let Some(table) = self.table_map.read().unwrap().get(like_table).cloned() {
1761                        clear_lf(table)
1762                    } else {
1763                        polars_bail!(SQLInterface: "table given in LIKE does not exist: {}", like_table)
1764                    }
1765                },
1766                // No valid options provided
1767                (None, true, None) => {
1768                    polars_bail!(SQLInterface: "CREATE TABLE expected a query, column definitions, or LIKE clause")
1769                },
1770                // Mutually exclusive options
1771                _ => {
1772                    polars_bail!(
1773                        SQLInterface: "CREATE TABLE received mutually exclusive options:\nquery = {:?}\ncolumns = {:?}\nlike = {:?}",
1774                        query,
1775                        columns,
1776                        like,
1777                    )
1778                },
1779            };
1780            self.register(tbl_name, lf);
1781
1782            let df_created = df! { "Response" => [format!("CREATE TABLE {}", name.0.first().unwrap().as_ident().unwrap().value)] };
1783            Ok(df_created.unwrap().lazy())
1784        } else {
1785            unreachable!()
1786        }
1787    }
1788
1789    fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
1790        match relation {
1791            TableFactor::Table {
1792                name, alias, args, ..
1793            } => {
1794                if let Some(args) = args {
1795                    return self.execute_table_function(name, alias, &args.args);
1796                }
1797                let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1798                if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
1799                    match alias {
1800                        Some(alias) => {
1801                            self.table_aliases
1802                                .insert(alias.name.value.clone(), tbl_name.to_string());
1803                            Ok((alias.name.value.clone(), lf))
1804                        },
1805                        None => Ok((tbl_name.to_string(), lf)),
1806                    }
1807                } else {
1808                    polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1809                }
1810            },
1811            TableFactor::Derived {
1812                lateral,
1813                subquery,
1814                alias,
1815            } => {
1816                polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1817                if let Some(alias) = alias {
1818                    let mut lf = self.execute_query_no_ctes(subquery)?;
1819                    lf = self.rename_columns_from_table_alias(lf, alias)?;
1820                    self.table_map
1821                        .write()
1822                        .unwrap()
1823                        .insert(alias.name.value.clone(), lf.clone());
1824                    Ok((alias.name.value.clone(), lf))
1825                } else {
1826                    polars_bail!(SQLSyntax: "derived tables must have aliases");
1827                }
1828            },
1829            TableFactor::UNNEST {
1830                alias,
1831                array_exprs,
1832                with_offset,
1833                with_offset_alias: _,
1834                ..
1835            } => {
1836                if let Some(alias) = alias {
1837                    let column_names: Vec<Option<PlSmallStr>> = alias
1838                        .columns
1839                        .iter()
1840                        .map(|c| {
1841                            if c.name.value.is_empty() {
1842                                None
1843                            } else {
1844                                Some(PlSmallStr::from_str(c.name.value.as_str()))
1845                            }
1846                        })
1847                        .collect();
1848
1849                    let column_values: Vec<Series> = array_exprs
1850                        .iter()
1851                        .map(|arr| parse_sql_array(arr, self))
1852                        .collect::<Result<_, _>>()?;
1853
1854                    polars_ensure!(!column_names.is_empty(),
1855                        SQLSyntax:
1856                        "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1857                    );
1858                    if column_names.len() != column_values.len() {
1859                        let plural = if column_values.len() > 1 { "s" } else { "" };
1860                        polars_bail!(
1861                            SQLSyntax:
1862                            "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
1863                        );
1864                    }
1865                    let column_series: Vec<Column> = column_values
1866                        .into_iter()
1867                        .zip(column_names)
1868                        .map(|(s, name)| {
1869                            if let Some(name) = name {
1870                                s.with_name(name)
1871                            } else {
1872                                s
1873                            }
1874                        })
1875                        .map(Column::from)
1876                        .collect();
1877
1878                    let lf = DataFrame::new_infer_height(column_series)?.lazy();
1879
1880                    if *with_offset {
1881                        // TODO: support 'WITH ORDINALITY|OFFSET' modifier.
1882                        polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
1883                    }
1884                    let table_name = alias.name.value.clone();
1885                    self.table_map
1886                        .write()
1887                        .unwrap()
1888                        .insert(table_name.clone(), lf.clone());
1889                    Ok((table_name, lf))
1890                } else {
1891                    polars_bail!(SQLSyntax: "UNNEST table must have an alias");
1892                }
1893            },
1894            TableFactor::NestedJoin {
1895                table_with_joins,
1896                alias,
1897            } => {
1898                let lf = self.execute_from_statement(table_with_joins)?;
1899                match alias {
1900                    Some(a) => Ok((a.name.value.clone(), lf)),
1901                    None => Ok(("".to_string(), lf)),
1902                }
1903            },
1904            // Support bare table, optionally with an alias, for now
1905            _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
1906        }
1907    }
1908
1909    fn execute_table_function(
1910        &mut self,
1911        name: &ObjectName,
1912        alias: &Option<TableAlias>,
1913        args: &[FunctionArg],
1914    ) -> PolarsResult<(String, LazyFrame)> {
1915        let tbl_fn = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1916        let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1917        let (tbl_name, lf) = read_fn.execute(args)?;
1918        #[allow(clippy::useless_asref)]
1919        let tbl_name = alias
1920            .as_ref()
1921            .map(|a| a.name.value.clone())
1922            .unwrap_or_else(|| tbl_name.to_string());
1923
1924        self.table_map
1925            .write()
1926            .unwrap()
1927            .insert(tbl_name.clone(), lf.clone());
1928        Ok((tbl_name, lf))
1929    }
1930
1931    fn process_order_by(
1932        &mut self,
1933        mut lf: LazyFrame,
1934        order_by: &Option<OrderBy>,
1935        selected: Option<&[Expr]>,
1936    ) -> PolarsResult<LazyFrame> {
1937        if order_by.as_ref().is_none_or(|ob| match &ob.kind {
1938            OrderByKind::Expressions(exprs) => exprs.is_empty(),
1939            OrderByKind::All(_) => false,
1940        }) {
1941            return Ok(lf);
1942        }
1943        let schema = self.get_frame_schema(&mut lf)?;
1944        let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1945        let (order_by, order_by_all, n_order_cols) = match &order_by.as_ref().unwrap().kind {
1946            OrderByKind::Expressions(exprs) => {
1947                // TODO: will look at making an upstream PR that allows us to more easily
1948                //  create a GenericDialect variant supporting "OrderByKind::All" instead
1949                if exprs.len() == 1
1950                    && matches!(&exprs[0].expr, SQLExpr::Identifier(ident)
1951                        if ident.value.to_uppercase() == "ALL"
1952                        && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1953                {
1954                    // Treat as ORDER BY ALL
1955                    let n_cols = if let Some(selected) = selected {
1956                        selected.len()
1957                    } else {
1958                        schema.len()
1959                    };
1960                    (vec![], Some(&exprs[0].options), n_cols)
1961                } else {
1962                    (exprs.clone(), None, exprs.len())
1963                }
1964            },
1965            OrderByKind::All(opts) => {
1966                let n_cols = if let Some(selected) = selected {
1967                    selected.len()
1968                } else {
1969                    schema.len()
1970                };
1971                (vec![], Some(opts), n_cols)
1972            },
1973        };
1974        let mut descending = Vec::with_capacity(n_order_cols);
1975        let mut nulls_last = Vec::with_capacity(n_order_cols);
1976        let mut by: Vec<Expr> = Vec::with_capacity(n_order_cols);
1977
1978        if let Some(opts) = order_by_all {
1979            if let Some(selected) = selected {
1980                by.extend(selected.iter().cloned());
1981            } else {
1982                by.extend(columns_iter);
1983            };
1984            let desc_order = !opts.asc.unwrap_or(true);
1985            nulls_last.resize(by.len(), !opts.nulls_first.unwrap_or(desc_order));
1986            descending.resize(by.len(), desc_order);
1987        } else {
1988            let columns = &columns_iter.collect::<Vec<_>>();
1989            for ob in order_by {
1990                // note: if not specified 'NULLS FIRST' is default for DESC, 'NULLS LAST' otherwise
1991                // https://www.postgresql.org/docs/current/queries-order.html
1992                let desc_order = !ob.options.asc.unwrap_or(true);
1993                nulls_last.push(!ob.options.nulls_first.unwrap_or(desc_order));
1994                descending.push(desc_order);
1995
1996                // translate order expression, allowing ordinal values
1997                by.push(self.expr_or_ordinal(
1998                    &ob.expr,
1999                    columns,
2000                    selected,
2001                    Some(&schema),
2002                    "ORDER BY",
2003                )?)
2004            }
2005        }
2006        Ok(lf.sort_by_exprs(
2007            &by,
2008            SortMultipleOptions::default()
2009                .with_order_descending_multi(descending)
2010                .with_nulls_last_multi(nulls_last),
2011        ))
2012    }
2013
2014    fn process_group_by(
2015        &mut self,
2016        mut lf: LazyFrame,
2017        group_by_keys: &[Expr],
2018        projections: &[Expr],
2019        having: Option<Expr>,
2020    ) -> PolarsResult<LazyFrame> {
2021        let schema_before = self.get_frame_schema(&mut lf)?;
2022        let group_by_keys_schema =
2023            expressions_to_schema(group_by_keys, &schema_before, |duplicate_name: &str| {
2024                format!("group_by keys contained duplicate output name '{duplicate_name}'")
2025            })?;
2026
2027        // Note: remove the `group_by` keys as Polars adds those implicitly.
2028        let mut aliased_aggregations: PlHashMap<PlSmallStr, PlSmallStr> = PlHashMap::new();
2029        let mut aggregation_projection = Vec::with_capacity(projections.len());
2030        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
2031        let mut projection_aliases = PlHashSet::new();
2032        let mut group_key_aliases = PlHashSet::new();
2033
2034        // Pre-compute group key data (stripped expression + output name) to avoid repeated work.
2035        // We check both expression AND output name match to avoid cross-aliasing issues.
2036        let group_key_data: Vec<_> = group_by_keys
2037            .iter()
2038            .map(|gk| {
2039                (
2040                    strip_outer_alias(gk),
2041                    gk.to_field(&schema_before).ok().map(|f| f.name),
2042                )
2043            })
2044            .collect();
2045
2046        let projection_matches_group_key: Vec<bool> = projections
2047            .iter()
2048            .map(|p| {
2049                let p_stripped = strip_outer_alias(p);
2050                let p_name = p.to_field(&schema_before).ok().map(|f| f.name);
2051                group_key_data
2052                    .iter()
2053                    .any(|(gk_stripped, gk_name)| *gk_stripped == p_stripped && *gk_name == p_name)
2054            })
2055            .collect();
2056
2057        for (e, &matches_group_key) in projections.iter().zip(&projection_matches_group_key) {
2058            // `Len` represents COUNT(*) so we treat as an aggregation here.
2059            let is_non_group_key_expr = !matches_group_key
2060                && has_expr(e, |e| {
2061                    match e {
2062                        Expr::Agg(_) | Expr::Len | Expr::Over { .. } => true,
2063                        #[cfg(feature = "dynamic_group_by")]
2064                        Expr::Rolling { .. } => true,
2065                        Expr::Function { function: func, .. }
2066                            if !matches!(func, FunctionExpr::StructExpr(_)) =>
2067                        {
2068                            // If it's a function call containing a column NOT in the group by keys,
2069                            // we treat it as an aggregation.
2070                            has_expr(e, |e| match e {
2071                                Expr::Column(name) => !group_by_keys_schema.contains(name),
2072                                _ => false,
2073                            })
2074                        },
2075                        _ => false,
2076                    }
2077                });
2078
2079            // Note: if simple aliased expression we defer aliasing until after the group_by.
2080            // Use `e_inner` to track the potentially unwrapped expression for field lookup.
2081            let mut e_inner = e;
2082            if let Expr::Alias(expr, alias) = e {
2083                if e.clone().meta().is_simple_projection(Some(&schema_before)) {
2084                    group_key_aliases.insert(alias.as_ref());
2085                    e_inner = expr
2086                } else if let Expr::Function {
2087                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
2088                    ..
2089                } = expr.deref()
2090                {
2091                    projection_overrides
2092                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
2093                } else if !is_non_group_key_expr && !group_by_keys_schema.contains(alias) {
2094                    projection_aliases.insert(alias.as_ref());
2095                }
2096            }
2097            let field = e_inner.to_field(&schema_before)?;
2098            if is_non_group_key_expr {
2099                let mut e = e.clone();
2100                if let Expr::Agg(AggExpr::Implode(expr)) = &e {
2101                    e = (**expr).clone();
2102                } else if let Expr::Alias(expr, name) = &e {
2103                    if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
2104                        e = (**expr).clone().alias(name.clone());
2105                    }
2106                }
2107                // If aggregation colname conflicts with a group key,
2108                // alias it to avoid duplicate/mis-tracked columns
2109                if group_by_keys_schema.get(&field.name).is_some() {
2110                    let alias_name = format!("__POLARS_AGG_{}", field.name);
2111                    e = e.alias(alias_name.as_str());
2112                    aliased_aggregations.insert(field.name.clone(), alias_name.as_str().into());
2113                }
2114                aggregation_projection.push(e);
2115            } else if !matches_group_key {
2116                // Non-aggregated columns must be part of the GROUP BY clause
2117                if let Expr::Column(_)
2118                | Expr::Function {
2119                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
2120                    ..
2121                } = e_inner
2122                {
2123                    if !group_by_keys_schema.contains(&field.name) {
2124                        polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
2125                    }
2126                }
2127            }
2128        }
2129
2130        // Process HAVING clause: identify aggregate expressions, reusing those already
2131        // in projections, or compute as temporary columns and then post-filter/discard
2132        let having_filter = if let Some(having_expr) = having {
2133            let mut agg_to_name: Vec<(Expr, PlSmallStr)> = aggregation_projection
2134                .iter()
2135                .filter_map(|p| match p {
2136                    Expr::Alias(inner, name) if matches!(**inner, Expr::Agg(_) | Expr::Len) => {
2137                        Some((inner.as_ref().clone(), name.clone()))
2138                    },
2139                    e @ (Expr::Agg(_) | Expr::Len) => Some((
2140                        e.clone(),
2141                        e.to_field(&schema_before)
2142                            .map(|f| f.name)
2143                            .unwrap_or_default(),
2144                    )),
2145                    _ => None,
2146                })
2147                .collect();
2148
2149            let mut n_having_aggs = 0;
2150            let updated_having = having_expr.map_expr(|e| {
2151                if !matches!(&e, Expr::Agg(_) | Expr::Len) {
2152                    return e;
2153                }
2154                let name = agg_to_name
2155                    .iter()
2156                    .find_map(|(expr, n)| (*expr == e).then(|| n.clone()))
2157                    .unwrap_or_else(|| {
2158                        let n = format_pl_smallstr!("__POLARS_HAVING_{n_having_aggs}");
2159                        aggregation_projection.push(e.clone().alias(n.clone()));
2160                        agg_to_name.push((e.clone(), n.clone()));
2161                        n_having_aggs += 1;
2162                        n
2163                    });
2164                col(name)
2165            });
2166            Some(updated_having)
2167        } else {
2168            None
2169        };
2170
2171        // Apply HAVING filter after aggregation
2172        let mut aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
2173        if let Some(filter_expr) = having_filter {
2174            aggregated = aggregated.filter(filter_expr);
2175        }
2176
2177        let projection_schema =
2178            expressions_to_schema(projections, &schema_before, |duplicate_name: &str| {
2179                format!("group_by aggregations contained duplicate output name '{duplicate_name}'")
2180            })?;
2181
2182        // A final projection to get the proper order and any deferred transforms/aliases
2183        // (will also drop any temporary columns created for the HAVING post-filter).
2184        let final_projection = projection_schema
2185            .iter_names()
2186            .zip(projections.iter().zip(&projection_matches_group_key))
2187            .map(|(name, (projection_expr, &matches_group_key))| {
2188                if let Some(expr) = projection_overrides.get(name.as_str()) {
2189                    expr.clone()
2190                } else if let Some(aliased_name) = aliased_aggregations.get(name) {
2191                    col(aliased_name.clone()).alias(name.clone())
2192                } else if group_by_keys_schema.get(name).is_some() && matches_group_key {
2193                    col(name.clone())
2194                } else if group_by_keys_schema.get(name).is_some()
2195                    || projection_aliases.contains(name.as_str())
2196                    || group_key_aliases.contains(name.as_str())
2197                {
2198                    if has_expr(projection_expr, |e| {
2199                        matches!(e, Expr::Agg(_) | Expr::Len | Expr::Over { .. })
2200                    }) {
2201                        col(name.clone())
2202                    } else {
2203                        projection_expr.clone()
2204                    }
2205                } else {
2206                    col(name.clone())
2207                }
2208            })
2209            .collect::<Vec<_>>();
2210
2211        // Include original GROUP BY columns for ORDER BY access (if aliased).
2212        let mut output_projection = final_projection;
2213        for key_name in group_by_keys_schema.iter_names() {
2214            if !projection_schema.contains(key_name) {
2215                // Original col name not in output - add for ORDER BY access
2216                output_projection.push(col(key_name.clone()));
2217            } else if group_by_keys.iter().any(|k| is_simple_col_ref(k, key_name)) {
2218                // Original col name in output - check if cross-aliased
2219                let is_cross_aliased = projections.iter().any(|p| {
2220                    p.to_field(&schema_before).is_ok_and(|f| f.name == key_name)
2221                        && !is_simple_col_ref(p, key_name)
2222                });
2223                if is_cross_aliased {
2224                    // Add original name under a prefixed alias for subsequent ORDER BY resolution
2225                    let internal_name = format_pl_smallstr!("__POLARS_ORIG_{}", key_name);
2226                    output_projection.push(col(key_name.clone()).alias(internal_name));
2227                }
2228            }
2229        }
2230        Ok(aggregated.select(&output_projection))
2231    }
2232
2233    fn process_limit_offset(
2234        &self,
2235        lf: LazyFrame,
2236        limit_clause: &Option<LimitClause>,
2237        fetch: &Option<Fetch>,
2238    ) -> PolarsResult<LazyFrame> {
2239        // Extract limit and offset from LimitClause
2240        let (limit, offset) = match limit_clause {
2241            Some(LimitClause::LimitOffset {
2242                limit,
2243                offset,
2244                limit_by,
2245            }) => {
2246                if !limit_by.is_empty() {
2247                    // TODO: might be able to support as an aggregate `top_k_by` operation?
2248                    //  (https://clickhouse.com/docs/sql-reference/statements/select/limit-by)
2249                    polars_bail!(SQLSyntax: "`LIMIT <n> BY <exprs>` clause is not supported");
2250                }
2251                (limit.as_ref(), offset.as_ref().map(|o| &o.value))
2252            },
2253            Some(LimitClause::OffsetCommaLimit { offset, limit }) => (Some(limit), Some(offset)),
2254            None => (None, None),
2255        };
2256
2257        // Handle FETCH clause (alternative to LIMIT, mutually exclusive)
2258        let limit = match (fetch, limit) {
2259            (Some(fetch), None) => fetch.quantity.as_ref(),
2260            (Some(_), Some(_)) => {
2261                polars_bail!(SQLSyntax: "cannot use both `LIMIT` and `FETCH` in the same query")
2262            },
2263            (None, limit) => limit,
2264        };
2265
2266        // Apply limit and/or offset
2267        match (offset, limit) {
2268            (
2269                Some(SQLExpr::Value(ValueWithSpan {
2270                    value: SQLValue::Number(offset, _),
2271                    ..
2272                })),
2273                Some(SQLExpr::Value(ValueWithSpan {
2274                    value: SQLValue::Number(limit, _),
2275                    ..
2276                })),
2277            ) => Ok(lf.slice(
2278                offset
2279                    .parse()
2280                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2281                limit.parse().map_err(
2282                    |e| polars_err!(SQLInterface: "LIMIT/FETCH conversion error: {}", e),
2283                )?,
2284            )),
2285            (
2286                Some(SQLExpr::Value(ValueWithSpan {
2287                    value: SQLValue::Number(offset, _),
2288                    ..
2289                })),
2290                None,
2291            ) => Ok(lf.slice(
2292                offset
2293                    .parse()
2294                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2295                IdxSize::MAX,
2296            )),
2297            (
2298                None,
2299                Some(SQLExpr::Value(ValueWithSpan {
2300                    value: SQLValue::Number(limit, _),
2301                    ..
2302                })),
2303            ) => {
2304                Ok(lf.limit(limit.parse().map_err(
2305                    |e| polars_err!(SQLInterface: "LIMIT/FETCH conversion error: {}", e),
2306                )?))
2307            },
2308            (None, None) => Ok(lf),
2309            _ => polars_bail!(
2310                SQLSyntax: "non-numeric arguments for LIMIT/OFFSET/FETCH are not supported",
2311            ),
2312        }
2313    }
2314
2315    fn process_qualified_wildcard(
2316        &mut self,
2317        ObjectName(idents): &ObjectName,
2318        options: &WildcardAdditionalOptions,
2319        modifiers: &mut SelectModifiers,
2320        schema: Option<&Schema>,
2321    ) -> PolarsResult<Vec<Expr>> {
2322        let mut idents_with_wildcard: Vec<Ident> = idents
2323            .iter()
2324            .filter_map(|p| p.as_ident().cloned())
2325            .collect();
2326        idents_with_wildcard.push(Ident::new("*"));
2327
2328        let exprs = resolve_compound_identifier(self, &idents_with_wildcard, schema)?;
2329        self.process_wildcard_additional_options(exprs, options, modifiers, schema)
2330    }
2331
2332    fn process_wildcard_additional_options(
2333        &mut self,
2334        exprs: Vec<Expr>,
2335        options: &WildcardAdditionalOptions,
2336        modifiers: &mut SelectModifiers,
2337        schema: Option<&Schema>,
2338    ) -> PolarsResult<Vec<Expr>> {
2339        if options.opt_except.is_some() && options.opt_exclude.is_some() {
2340            polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
2341        } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
2342            polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
2343        }
2344
2345        // SELECT * EXCLUDE
2346        if let Some(items) = &options.opt_exclude {
2347            match items {
2348                ExcludeSelectItem::Single(ident) => {
2349                    modifiers.exclude.insert(ident.value.clone());
2350                },
2351                ExcludeSelectItem::Multiple(idents) => {
2352                    modifiers
2353                        .exclude
2354                        .extend(idents.iter().map(|i| i.value.clone()));
2355                },
2356            };
2357        }
2358
2359        // SELECT * EXCEPT
2360        if let Some(items) = &options.opt_except {
2361            modifiers.exclude.insert(items.first_element.value.clone());
2362            modifiers
2363                .exclude
2364                .extend(items.additional_elements.iter().map(|i| i.value.clone()));
2365        }
2366
2367        // SELECT * ILIKE
2368        if let Some(item) = &options.opt_ilike {
2369            let rx = regex::escape(item.pattern.as_str())
2370                .replace('%', ".*")
2371                .replace('_', ".");
2372
2373            modifiers.ilike = Some(
2374                polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
2375            );
2376        }
2377
2378        // SELECT * RENAME
2379        if let Some(items) = &options.opt_rename {
2380            let renames = match items {
2381                RenameSelectItem::Single(rename) => std::slice::from_ref(rename),
2382                RenameSelectItem::Multiple(renames) => renames.as_slice(),
2383            };
2384            for rn in renames {
2385                let before = PlSmallStr::from_str(rn.ident.value.as_str());
2386                let after = PlSmallStr::from_str(rn.alias.value.as_str());
2387                if before != after {
2388                    modifiers.rename.insert(before, after);
2389                }
2390            }
2391        }
2392
2393        // SELECT * REPLACE
2394        if let Some(replacements) = &options.opt_replace {
2395            for rp in &replacements.items {
2396                let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
2397                modifiers
2398                    .replace
2399                    .push(replacement_expr?.alias(rp.column_name.value.as_str()));
2400            }
2401        }
2402        Ok(exprs)
2403    }
2404
2405    fn rename_columns_from_table_alias(
2406        &mut self,
2407        mut lf: LazyFrame,
2408        alias: &TableAlias,
2409    ) -> PolarsResult<LazyFrame> {
2410        if alias.columns.is_empty() {
2411            Ok(lf)
2412        } else {
2413            let schema = self.get_frame_schema(&mut lf)?;
2414            if alias.columns.len() != schema.len() {
2415                polars_bail!(
2416                    SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
2417                    alias.columns.len(), alias.name.value, schema.len()
2418                )
2419            } else {
2420                let existing_columns: Vec<_> = schema.iter_names().collect();
2421                let new_columns: Vec<_> =
2422                    alias.columns.iter().map(|c| c.name.value.clone()).collect();
2423                Ok(lf.rename(existing_columns, new_columns, true))
2424            }
2425        }
2426    }
2427}
2428
2429impl SQLContext {
2430    /// Create a new SQLContext from a table map. For internal use only
2431    pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
2432        Self {
2433            table_map: Arc::new(RwLock::new(table_map)),
2434            ..Default::default()
2435        }
2436    }
2437}
2438
2439fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
2440    match expr {
2441        Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
2442            let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
2443            schema
2444                .iter_names()
2445                .filter(|name| re.is_match(name))
2446                .map(|name| col(name.clone()))
2447                .collect::<Vec<_>>()
2448        },
2449        Expr::Selector(s) => s
2450            .into_columns(schema, &Default::default())
2451            .unwrap()
2452            .into_iter()
2453            .map(col)
2454            .collect::<Vec<_>>(),
2455        _ => vec![expr],
2456    }
2457}
2458
/// A column name is treated as a regex when it is anchored at both ends,
/// i.e. it begins with `^` and finishes with `$`.
fn is_regex_colname(nm: &str) -> bool {
    // Both anchors are single-byte ASCII, so comparing the edge bytes is sufficient
    let bytes = nm.as_bytes();
    bytes.first() == Some(&b'^') && bytes.last() == Some(&b'$')
}
2462
2463/// Extract column names from a USING clause in a JoinOperator (if present).
2464fn get_using_cols(op: &JoinOperator) -> Option<impl Iterator<Item = String> + '_> {
2465    use JoinOperator::*;
2466    match op {
2467        Join(JoinConstraint::Using(cols))
2468        | Inner(JoinConstraint::Using(cols))
2469        | Left(JoinConstraint::Using(cols))
2470        | LeftOuter(JoinConstraint::Using(cols))
2471        | Right(JoinConstraint::Using(cols))
2472        | RightOuter(JoinConstraint::Using(cols))
2473        | FullOuter(JoinConstraint::Using(cols))
2474        | Semi(JoinConstraint::Using(cols))
2475        | Anti(JoinConstraint::Using(cols))
2476        | LeftSemi(JoinConstraint::Using(cols))
2477        | LeftAnti(JoinConstraint::Using(cols))
2478        | RightSemi(JoinConstraint::Using(cols))
2479        | RightAnti(JoinConstraint::Using(cols)) => Some(cols.iter().filter_map(|c| {
2480            c.0.first()
2481                .and_then(|p| p.as_ident())
2482                .map(|i| i.value.clone())
2483        })),
2484        _ => None,
2485    }
2486}
2487
2488/// Extract the table name (or alias) from a TableFactor.
2489fn get_table_name(factor: &TableFactor) -> Option<String> {
2490    match factor {
2491        TableFactor::Table { name, alias, .. } => {
2492            alias.as_ref().map(|a| a.name.value.clone()).or_else(|| {
2493                name.0
2494                    .last()
2495                    .and_then(|p| p.as_ident())
2496                    .map(|i| i.value.clone())
2497            })
2498        },
2499        TableFactor::Derived { alias, .. }
2500        | TableFactor::NestedJoin { alias, .. }
2501        | TableFactor::TableFunction { alias, .. } => alias.as_ref().map(|a| a.name.value.clone()),
2502        _ => None,
2503    }
2504}
2505
2506/// Check if an expression is a simple column reference (with optional alias) to the given name.
2507fn is_simple_col_ref(expr: &Expr, col_name: &PlSmallStr) -> bool {
2508    match expr {
2509        Expr::Column(n) => n == col_name,
2510        Expr::Alias(inner, _) => matches!(inner.as_ref(), Expr::Column(n) if n == col_name),
2511        _ => false,
2512    }
2513}
2514
2515/// Strip the outer alias from an expression (if present) for expression equality comparison.
2516fn strip_outer_alias(expr: &Expr) -> Expr {
2517    if let Expr::Alias(inner, _) = expr {
2518        inner.as_ref().clone()
2519    } else {
2520        expr.clone()
2521    }
2522}
2523
2524/// Resolve a SELECT alias to its underlying expression (for use in GROUP BY).
2525///
2526/// Returns the expression WITH alias if the name matches a projection alias and is NOT a column
2527/// that exists in the schema; otherwise returns `None` to use the default/standard resolution.
2528fn resolve_select_alias(name: &str, projections: &[Expr], schema: &Schema) -> Option<Expr> {
2529    // Original columns take precedence over SELECT aliases
2530    if schema.contains(name) {
2531        return None;
2532    }
2533    // Find a projection with this alias and return its expression (preserving the alias)
2534    projections.iter().find_map(|p| match p {
2535        Expr::Alias(inner, alias) if alias.as_str() == name => {
2536            Some(inner.as_ref().clone().alias(alias.clone()))
2537        },
2538        _ => None,
2539    })
2540}
2541
2542/// Check if all columns referred to in a Polars expression exist in the given Schema.
2543fn expr_cols_all_in_schema(expr: &Expr, schema: &Schema) -> bool {
2544    let mut found_cols = false;
2545    let mut all_in_schema = true;
2546    for e in expr.into_iter() {
2547        if let Expr::Column(name) = e {
2548            found_cols = true;
2549            if !schema.contains(name.as_str()) {
2550                all_in_schema = false;
2551                break;
2552            }
2553        }
2554    }
2555    found_cols && all_in_schema
2556}
2557
2558/// Determine which parsed join expressions actually belong in `left_om` and which in `right_on`.
2559///
2560/// This needs to be handled carefully because in SQL joins you can write "join on" constraints
2561/// either way round, and in joins with more than two tables you can also join against an earlier
2562/// table (e.g.: you could be joining `df1` to `df2` to `df3`, but the final join condition where
2563/// we join `df2` to `df3` could refer to `df1.a = df3.b`; this takes a little more work to
2564/// resolve as our native `join` function operates on only two tables at a time.
2565fn determine_left_right_join_on(
2566    ctx: &mut SQLContext,
2567    expr_left: &SQLExpr,
2568    expr_right: &SQLExpr,
2569    tbl_left: &TableInfo,
2570    tbl_right: &TableInfo,
2571    join_schema: &Schema,
2572) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2573    // parse, removing any aliases that may have been added by `resolve_column`
2574    // (called inside `parse_sql_expr`) as we need the actual/underlying col
2575    let left_on = match parse_sql_expr(expr_left, ctx, Some(join_schema))? {
2576        Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2577        e => e,
2578    };
2579    let right_on = match parse_sql_expr(expr_right, ctx, Some(join_schema))? {
2580        Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2581        e => e,
2582    };
2583
2584    // ------------------------------------------------------------------
2585    // simple/typical case: can fully resolve SQL-level table references
2586    // ------------------------------------------------------------------
2587    let left_refs = (
2588        expr_refers_to_table(expr_left, &tbl_left.name),
2589        expr_refers_to_table(expr_left, &tbl_right.name),
2590    );
2591    let right_refs = (
2592        expr_refers_to_table(expr_right, &tbl_left.name),
2593        expr_refers_to_table(expr_right, &tbl_right.name),
2594    );
2595    // if the SQL-level references unambiguously indicate table ownership, we're done
2596    match (left_refs, right_refs) {
2597        // standard: left expr → left table, right expr → right table
2598        ((true, false), (false, true)) => return Ok((vec![left_on], vec![right_on])),
2599        // reversed: left expr → right table, right expr → left table
2600        ((false, true), (true, false)) => return Ok((vec![right_on], vec![left_on])),
2601        // unsupported: one side references *both* tables
2602        ((true, true), _) | (_, (true, true)) if tbl_left.name != tbl_right.name => {
2603            polars_bail!(
2604               SQLInterface: "unsupported join condition: {} side references both '{}' and '{}'",
2605               if left_refs.0 && left_refs.1 {
2606                    "left"
2607                } else {
2608                    "right"
2609                }, tbl_left.name, tbl_right.name
2610            )
2611        },
2612        // fall through to the more involved col/ref resolution
2613        _ => {},
2614    }
2615
2616    // ------------------------------------------------------------------
2617    // more involved: additionally employ schema-based column resolution
2618    // (applies to unqualified columns and/or chained joins)
2619    // ------------------------------------------------------------------
2620    let left_on_cols_in = (
2621        expr_cols_all_in_schema(&left_on, &tbl_left.schema),
2622        expr_cols_all_in_schema(&left_on, &tbl_right.schema),
2623    );
2624    let right_on_cols_in = (
2625        expr_cols_all_in_schema(&right_on, &tbl_left.schema),
2626        expr_cols_all_in_schema(&right_on, &tbl_right.schema),
2627    );
2628    match (left_on_cols_in, right_on_cols_in) {
2629        // each expression's columns exist in exactly one schema
2630        ((true, false), (false, true)) => Ok((vec![left_on], vec![right_on])),
2631        ((false, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
2632        // one expression in both, other only in one; prefer the unique one
2633        ((true, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
2634        ((true, true), (false, true)) => Ok((vec![left_on], vec![right_on])),
2635        ((true, false), (true, true)) => Ok((vec![left_on], vec![right_on])),
2636        ((false, true), (true, true)) => Ok((vec![right_on], vec![left_on])),
2637        // pass through as-is
2638        _ => Ok((vec![left_on], vec![right_on])),
2639    }
2640}
2641
2642fn process_join_on(
2643    ctx: &mut SQLContext,
2644    sql_expr: &SQLExpr,
2645    tbl_left: &TableInfo,
2646    tbl_right: &TableInfo,
2647) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2648    match sql_expr {
2649        SQLExpr::BinaryOp { left, op, right } => match op {
2650            BinaryOperator::And => {
2651                let (mut left_i, mut right_i) = process_join_on(ctx, left, tbl_left, tbl_right)?;
2652                let (mut left_j, mut right_j) = process_join_on(ctx, right, tbl_left, tbl_right)?;
2653                left_i.append(&mut left_j);
2654                right_i.append(&mut right_j);
2655                Ok((left_i, right_i))
2656            },
2657            BinaryOperator::Eq => {
2658                // establish unified schema with cols from both tables; needed for multi/chained
2659                // joins where suffixed intermediary/joined cols aren't in an existing schema.
2660                let mut join_schema =
2661                    Schema::with_capacity(tbl_left.schema.len() + tbl_right.schema.len());
2662                for (name, dtype) in tbl_left.schema.iter() {
2663                    join_schema.insert_at_index(join_schema.len(), name.clone(), dtype.clone())?;
2664                }
2665                for (name, dtype) in tbl_right.schema.iter() {
2666                    if !join_schema.contains(name) {
2667                        join_schema.insert_at_index(
2668                            join_schema.len(),
2669                            name.clone(),
2670                            dtype.clone(),
2671                        )?;
2672                    }
2673                }
2674                determine_left_right_join_on(ctx, left, right, tbl_left, tbl_right, &join_schema)
2675            },
2676            _ => polars_bail!(
2677                // TODO: should be able to support more operators later (via `join_where`?)
2678                SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op
2679            ),
2680        },
2681        SQLExpr::Nested(expr) => process_join_on(ctx, expr, tbl_left, tbl_right),
2682        _ => polars_bail!(
2683            SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", sql_expr
2684        ),
2685    }
2686}
2687
2688fn process_join_constraint(
2689    constraint: &JoinConstraint,
2690    tbl_left: &TableInfo,
2691    tbl_right: &TableInfo,
2692    ctx: &mut SQLContext,
2693) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2694    match constraint {
2695        JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
2696            process_join_on(ctx, expr, tbl_left, tbl_right)
2697        },
2698        JoinConstraint::Using(idents) if !idents.is_empty() => {
2699            let using: Vec<Expr> = idents
2700                .iter()
2701                .map(|ObjectName(parts)| {
2702                    if parts.len() != 1 {
2703                        polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects simple column names, not qualified names");
2704                    }
2705                    match parts[0].as_ident() {
2706                        Some(ident) => Ok(col(ident.value.as_str())),
2707                        None => polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects identifiers, not functions"),
2708                    }
2709                })
2710                .collect::<PolarsResult<Vec<_>>>()?;
2711            Ok((using.clone(), using))
2712        },
2713        JoinConstraint::Natural => {
2714            let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
2715            let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
2716            let on: Vec<Expr> = left_names
2717                .intersection(&right_names)
2718                .map(|&name| col(name.clone()))
2719                .collect();
2720            if on.is_empty() {
2721                polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
2722            }
2723            Ok((on.clone(), on))
2724        },
2725        _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
2726    }
2727}
2728
2729/// Extract table identifiers referenced in a SQL query; uses a visitor to
2730/// collect all table names that appear in FROM clauses, JOINs, TABLE refs
2731/// in set operations, and subqueries.
2732pub fn extract_table_identifiers(
2733    query: &str,
2734    include_schema: bool,
2735    unique: bool,
2736) -> PolarsResult<Vec<String>> {
2737    let mut parser = Parser::new(&GenericDialect);
2738    parser = parser.with_options(ParserOptions {
2739        trailing_commas: true,
2740        ..Default::default()
2741    });
2742    let ast = parser
2743        .try_with_sql(query)
2744        .map_err(to_sql_interface_err)?
2745        .parse_statements()
2746        .map_err(to_sql_interface_err)?;
2747
2748    let mut collector = TableIdentifierCollector {
2749        include_schema,
2750        ..Default::default()
2751    };
2752    for stmt in &ast {
2753        let _ = stmt.visit(&mut collector);
2754    }
2755    Ok(if unique {
2756        collector
2757            .tables
2758            .into_iter()
2759            .collect::<PlIndexSet<_>>()
2760            .into_iter()
2761            .collect()
2762    } else {
2763        collector.tables
2764    })
2765}
2766
bitflags::bitflags! {
    /// Bitfield indicating whether there exists a projection with the specified height behavior.
    ///
    /// Used to help determine whether to execute projections in `select()` or `with_columns()`
    /// context.
    ///
    /// Each flag occupies its own bit of the underlying `u8`, so the behaviors of
    /// several projections can be combined into a single value.
    #[derive(PartialEq)]
    struct ExprSqlProjectionHeightBehavior: u8 {
        /// Maintains the height of input column(s)
        const MaintainsColumn = 1 << 0;
        /// Height is independent of input, e.g.:
        /// * expressions that change length: e.g. slice, explode, filter, gather etc.
        /// * aggregations: count(*), first(), sum() etc.
        const Independent = 1 << 1;
        /// "Inherits" the height of the context, e.g.:
        /// * Scalar literals
        const InheritsContext = 1 << 2;
    }
}
2785
2786impl ExprSqlProjectionHeightBehavior {
2787    fn identify_from_expr(expr: &Expr) -> Self {
2788        let mut has_column = false;
2789        let mut has_independent = false;
2790
2791        for e in expr.into_iter() {
2792            use Expr::*;
2793            has_column |= matches!(e, Column(_) | Selector(_));
2794            has_independent |= match e {
2795                // @TODO: This is broken now with functions.
2796                AnonymousFunction { options, .. } => {
2797                    options.returns_scalar() || !options.is_length_preserving()
2798                },
2799                Literal(v) => !v.is_scalar(),
2800                Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
2801                Agg { .. } | Len => true,
2802                _ => false,
2803            }
2804        }
2805        if has_independent {
2806            Self::Independent
2807        } else if has_column {
2808            Self::MaintainsColumn
2809        } else {
2810            Self::InheritsContext
2811        }
2812    }
2813}