// polars_sql/context.rs

1use std::ops::Deref;
2use std::sync::RwLock;
3
4use polars_core::frame::row::Row;
5use polars_core::prelude::*;
6use polars_lazy::prelude::*;
7use polars_ops::frame::{JoinCoalesce, MaintainOrderJoin};
8use polars_plan::dsl::function_expr::StructFunction;
9use polars_plan::prelude::*;
10use polars_utils::aliases::{PlHashSet, PlIndexSet};
11use polars_utils::format_pl_smallstr;
12use sqlparser::ast::{
13    BinaryOperator as SQLBinaryOperator, CreateTable, CreateTableLikeKind, Delete, Distinct,
14    ExcludeSelectItem, Expr as SQLExpr, Fetch, FromTable, FunctionArg, GroupByExpr, Ident,
15    JoinConstraint, JoinOperator, LimitClause, NamedWindowDefinition, NamedWindowExpr, ObjectName,
16    ObjectType, OrderBy, OrderByKind, Query, RenameSelectItem, Select, SelectFlavor, SelectItem,
17    SelectItemQualifiedWildcardKind, SetExpr, SetOperator, SetQuantifier, Statement, TableAlias,
18    TableFactor, TableWithJoins, Truncate, UnaryOperator as SQLUnaryOperator, Value as SQLValue,
19    ValueWithSpan, Values, Visit, WildcardAdditionalOptions, WindowSpec,
20};
21use sqlparser::dialect::GenericDialect;
22use sqlparser::parser::{Parser, ParserOptions};
23
24use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
25use crate::sql_expr::{
26    parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
27};
28use crate::sql_visitors::{
29    QualifyExpression, TableIdentifierCollector, check_for_ambiguous_column_refs,
30    expr_has_window_functions, expr_refers_to_table,
31};
32use crate::table_functions::PolarsTableFunctions;
33use crate::types::map_sql_dtype_to_polars;
34
35#[derive(Clone)]
36pub struct TableInfo {
37    pub(crate) frame: LazyFrame,
38    pub(crate) name: PlSmallStr,
39    pub(crate) schema: Arc<Schema>,
40}
41
42struct SelectModifiers {
43    exclude: PlHashSet<String>,                // SELECT * EXCLUDE
44    ilike: Option<regex::Regex>,               // SELECT * ILIKE
45    rename: PlHashMap<PlSmallStr, PlSmallStr>, // SELECT * RENAME
46    replace: Vec<Expr>,                        // SELECT * REPLACE
47}
48impl SelectModifiers {
49    fn matches_ilike(&self, s: &str) -> bool {
50        match &self.ilike {
51            Some(rx) => rx.is_match(s),
52            None => true,
53        }
54    }
55    fn renamed_cols(&self) -> Vec<Expr> {
56        self.rename
57            .iter()
58            .map(|(before, after)| col(before.clone()).alias(after.clone()))
59            .collect()
60    }
61}
62
63/// For SELECT projection items; helps simplify any required disambiguation.
64enum ProjectionItem {
65    QualifiedExprs(PlSmallStr, Vec<Expr>),
66    Exprs(Vec<Expr>),
67}
68
69/// Extract the output column name from an expression (if it has one).
70fn expr_output_name(expr: &Expr) -> Option<&PlSmallStr> {
71    match expr {
72        Expr::Column(name) | Expr::Alias(_, name) => Some(name),
73        _ => None,
74    }
75}
76
77/// Disambiguate qualified wildcard columns that conflict with each other or other projections.
78fn disambiguate_projection_cols(
79    items: Vec<ProjectionItem>,
80    schema: &Schema,
81) -> PolarsResult<Vec<Expr>> {
82    // Establish qualified wildcard names (with counts), and other expression names
83    let mut qualified_wildcard_names: PlHashMap<PlSmallStr, usize> = PlHashMap::new();
84    let mut other_names: PlHashSet<PlSmallStr> = PlHashSet::new();
85    for item in &items {
86        match item {
87            ProjectionItem::QualifiedExprs(_, exprs) => {
88                for expr in exprs {
89                    if let Some(name) = expr_output_name(expr) {
90                        *qualified_wildcard_names.entry(name.clone()).or_insert(0) += 1;
91                    }
92                }
93            },
94            ProjectionItem::Exprs(exprs) => {
95                for expr in exprs {
96                    if let Some(name) = expr_output_name(expr) {
97                        other_names.insert(name.clone());
98                    }
99                }
100            },
101        }
102    }
103
104    // Names requiring disambiguation (duplicates across wildcards, eg: `tbl1.*`,`tbl2.*`)
105    let needs_suffix: PlHashSet<PlSmallStr> = qualified_wildcard_names
106        .into_iter()
107        .filter(|(name, count)| *count > 1 || other_names.contains(name))
108        .map(|(name, _)| name)
109        .collect();
110
111    // Output, applying suffixes where needed
112    let mut result: Vec<Expr> = Vec::new();
113    for item in items {
114        match item {
115            ProjectionItem::QualifiedExprs(tbl_name, exprs) if !needs_suffix.is_empty() => {
116                for expr in exprs {
117                    if let Some(name) = expr_output_name(&expr) {
118                        if needs_suffix.contains(name) {
119                            let suffixed = format_pl_smallstr!("{}:{}", name, tbl_name);
120                            if schema.contains(suffixed.as_str()) {
121                                result.push(col(suffixed));
122                                continue;
123                            }
124                            if other_names.contains(name) {
125                                polars_bail!(
126                                    SQLInterface:
127                                    "column '{}' is duplicated in the SELECT (explicitly, and via the `*` wildcard)", name
128                                );
129                            }
130                        }
131                    }
132                    result.push(expr);
133                }
134            },
135            ProjectionItem::QualifiedExprs(_, exprs) | ProjectionItem::Exprs(exprs) => {
136                result.extend(exprs);
137            },
138        }
139    }
140    Ok(result)
141}
142
143/// The SQLContext is the main entry point for executing SQL queries.
144#[derive(Clone)]
145pub struct SQLContext {
146    pub(crate) table_map: Arc<RwLock<PlHashMap<String, LazyFrame>>>,
147    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
148    pub(crate) lp_arena: Arena<IR>,
149    pub(crate) expr_arena: Arena<AExpr>,
150
151    cte_map: PlHashMap<String, LazyFrame>,
152    table_aliases: PlHashMap<String, String>,
153    joined_aliases: PlHashMap<String, PlHashMap<String, String>>,
154    pub(crate) named_windows: PlHashMap<String, WindowSpec>,
155}
156
157impl Default for SQLContext {
158    fn default() -> Self {
159        Self {
160            function_registry: Arc::new(DefaultFunctionRegistry {}),
161            table_map: Default::default(),
162            cte_map: Default::default(),
163            table_aliases: Default::default(),
164            joined_aliases: Default::default(),
165            named_windows: Default::default(),
166            lp_arena: Default::default(),
167            expr_arena: Default::default(),
168        }
169    }
170}
171
172impl SQLContext {
173    /// Create a new SQLContext.
174    /// ```rust
175    /// # use polars_sql::SQLContext;
176    /// # fn main() {
177    /// let ctx = SQLContext::new();
178    /// # }
179    /// ```
180    pub fn new() -> Self {
181        Self::default()
182    }
183
184    /// Get the names of all registered tables, in sorted order.
185    pub fn get_tables(&self) -> Vec<String> {
186        let mut tables = Vec::from_iter(self.table_map.read().unwrap().keys().cloned());
187        tables.sort_unstable();
188        tables
189    }
190
191    /// Register a [`LazyFrame`] as a table in the SQLContext.
192    /// ```rust
193    /// # use polars_sql::SQLContext;
194    /// # use polars_core::prelude::*;
195    /// # use polars_lazy::prelude::*;
196    /// # fn main() {
197    ///
198    /// let mut ctx = SQLContext::new();
199    /// let df = df! {
200    ///    "a" =>  [1, 2, 3],
201    /// }.unwrap().lazy();
202    ///
203    /// ctx.register("df", df);
204    /// # }
205    ///```
206    pub fn register(&self, name: &str, lf: LazyFrame) {
207        self.table_map.write().unwrap().insert(name.to_owned(), lf);
208    }
209
210    /// Unregister a [`LazyFrame`] table from the [`SQLContext`].
211    pub fn unregister(&self, name: &str) {
212        self.table_map.write().unwrap().remove(&name.to_owned());
213    }
214
215    /// Execute a SQL query, returning a [`LazyFrame`].
216    /// ```rust
217    /// # use polars_sql::SQLContext;
218    /// # use polars_core::prelude::*;
219    /// # use polars_lazy::prelude::*;
220    /// # fn main() {
221    ///
222    /// let mut ctx = SQLContext::new();
223    /// let df = df! {
224    ///    "a" =>  [1, 2, 3],
225    /// }
226    /// .unwrap();
227    ///
228    /// ctx.register("df", df.clone().lazy());
229    /// let sql_df = ctx.execute("SELECT * FROM df").unwrap().collect().unwrap();
230    /// assert!(sql_df.equals(&df));
231    /// # }
232    ///```
233    pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
234        let mut parser = Parser::new(&GenericDialect);
235        parser = parser.with_options(ParserOptions {
236            trailing_commas: true,
237            ..Default::default()
238        });
239
240        let ast = parser
241            .try_with_sql(query)
242            .map_err(to_sql_interface_err)?
243            .parse_statements()
244            .map_err(to_sql_interface_err)?;
245
246        polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
247        let res = self.execute_statement(ast.first().unwrap())?;
248
249        // Ensure the result uses the proper arenas.
250        // This will instantiate new arenas with a new version.
251        let lp_arena = std::mem::take(&mut self.lp_arena);
252        let expr_arena = std::mem::take(&mut self.expr_arena);
253        res.set_cached_arena(lp_arena, expr_arena);
254
255        // Every execution should clear the statement-level maps.
256        self.cte_map.clear();
257        self.table_aliases.clear();
258        self.joined_aliases.clear();
259        self.named_windows.clear();
260
261        Ok(res)
262    }
263
264    /// Add a function registry to the SQLContext.
265    /// The registry provides the ability to add custom functions to the SQLContext.
266    pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
267        self.function_registry = function_registry;
268        self
269    }
270
271    /// Get the function registry of the SQLContext
272    pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
273        &self.function_registry
274    }
275
276    /// Get a mutable reference to the function registry of the SQLContext
277    pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
278        Arc::get_mut(&mut self.function_registry).unwrap()
279    }
280}
281
282impl SQLContext {
283    fn isolated(&self) -> Self {
284        Self {
285            // Deep clone to isolate
286            table_map: Arc::new(RwLock::new(self.table_map.read().unwrap().clone())),
287            named_windows: self.named_windows.clone(),
288            cte_map: self.cte_map.clone(),
289
290            ..Default::default()
291        }
292    }
293
294    pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
295        let ast = stmt;
296        Ok(match ast {
297            Statement::Query(query) => self.execute_query(query)?,
298            stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
299            stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
300            stmt @ Statement::Drop {
301                object_type: ObjectType::Table,
302                ..
303            } => self.execute_drop_table(stmt)?,
304            stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
305            stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
306            stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
307            _ => polars_bail!(
308                SQLInterface: "statement type is not supported:\n{:?}", ast,
309            ),
310        })
311    }
312
313    pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
314        self.register_ctes(query)?;
315        self.execute_query_no_ctes(query)
316    }
317
318    pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
319        self.validate_query(query)?;
320
321        let lf = self.process_query(&query.body, query)?;
322        self.process_limit_offset(lf, &query.limit_clause, &query.fetch)
323    }
324
325    pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
326        frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
327    }
328
329    pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
330        // Resolve the table name in the current scope; multi-stage fallback
331        // * table name → cte name
332        // * table alias → cte alias
333        self.table_map
334            .read()
335            .unwrap()
336            .get(name)
337            .cloned()
338            .or_else(|| self.cte_map.get(name).cloned())
339            .or_else(|| {
340                self.table_aliases.get(name).and_then(|alias| {
341                    self.table_map
342                        .read()
343                        .unwrap()
344                        .get(alias.as_str())
345                        .or_else(|| self.cte_map.get(alias.as_str()))
346                        .cloned()
347                })
348            })
349    }
350
351    /// Execute a query in an isolated context. This prevents subqueries from mutating
352    /// arenas and other context state. Returns both the LazyFrame *and* its associated
353    /// Schema (so that the correct arenas are used when determining schema).
354    pub(crate) fn execute_isolated<F>(&mut self, query: F) -> PolarsResult<LazyFrame>
355    where
356        F: FnOnce(&mut Self) -> PolarsResult<LazyFrame>,
357    {
358        let mut ctx = self.isolated();
359
360        // Execute query with clean state (eg: nested/subquery)
361        let lf = query(&mut ctx)?;
362
363        // Save state
364        lf.set_cached_arena(ctx.lp_arena, ctx.expr_arena);
365
366        Ok(lf)
367    }
368
369    fn expr_or_ordinal(
370        &mut self,
371        e: &SQLExpr,
372        exprs: &[Expr],
373        selected: Option<&[Expr]>,
374        schema: Option<&Schema>,
375        clause: &str,
376    ) -> PolarsResult<Expr> {
377        match e {
378            SQLExpr::UnaryOp {
379                op: SQLUnaryOperator::Minus,
380                expr,
381            } if matches!(
382                **expr,
383                SQLExpr::Value(ValueWithSpan {
384                    value: SQLValue::Number(_, _),
385                    ..
386                })
387            ) =>
388            {
389                if let SQLExpr::Value(ValueWithSpan {
390                    value: SQLValue::Number(ref idx, _),
391                    ..
392                }) = **expr
393                {
394                    Err(polars_err!(
395                    SQLSyntax:
396                    "negative ordinal values are invalid for {}; found -{}",
397                    clause,
398                    idx
399                    ))
400                } else {
401                    unreachable!()
402                }
403            },
404            SQLExpr::Value(ValueWithSpan {
405                value: SQLValue::Number(idx, _),
406                ..
407            }) => {
408                // note: sql queries are 1-indexed
409                let idx = idx.parse::<usize>().map_err(|_| {
410                    polars_err!(
411                        SQLSyntax:
412                        "negative ordinal values are invalid for {}; found {}",
413                        clause,
414                        idx
415                    )
416                })?;
417                // note: "selected" cols represent final projection order, so we use those for
418                // ordinal resolution. "exprs" may include cols that are subsequently dropped.
419                let cols = if let Some(cols) = selected {
420                    cols
421                } else {
422                    exprs
423                };
424                Ok(cols
425                    .get(idx - 1)
426                    .ok_or_else(|| {
427                        polars_err!(
428                            SQLInterface:
429                            "{} ordinal value must refer to a valid column; found {}",
430                            clause,
431                            idx
432                        )
433                    })?
434                    .clone())
435            },
436            SQLExpr::Value(v) => Err(polars_err!(
437                SQLSyntax:
438                "{} requires a valid expression or positive ordinal; found {}", clause, v,
439            )),
440            _ => {
441                // Handle qualified cross-aliasing in ORDER BY clauses
442                // (eg: `SELECT a AS b, -b AS a ... ORDER BY self.a`)
443                let mut expr = parse_sql_expr(e, self, schema)?;
444                if matches!(e, SQLExpr::CompoundIdentifier(_)) {
445                    if let Some(schema) = schema {
446                        expr = expr.map_expr(|ex| match &ex {
447                            Expr::Column(name) => {
448                                let prefixed = format!("__POLARS_ORIG_{}", name.as_str());
449                                if schema.contains(prefixed.as_str()) {
450                                    col(prefixed)
451                                } else {
452                                    ex
453                                }
454                            },
455                            _ => ex,
456                        });
457                    }
458                }
459                Ok(expr)
460            },
461        }
462    }
463
464    pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
465        if let Some(aliases) = self.joined_aliases.get(tbl_name) {
466            if let Some(name) = aliases.get(column_name) {
467                return name.to_string();
468            }
469        }
470        column_name.to_string()
471    }
472
473    fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
474        match expr {
475            SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
476            SetExpr::Query(nested_query) => {
477                let lf = self.execute_query_no_ctes(nested_query)?;
478                self.process_order_by(lf, &query.order_by, None)
479            },
480            SetExpr::SetOperation {
481                op: SetOperator::Union,
482                set_quantifier,
483                left,
484                right,
485            } => self.process_union(left, right, set_quantifier, query),
486
487            #[cfg(feature = "semi_anti_join")]
488            SetExpr::SetOperation {
489                op: SetOperator::Intersect | SetOperator::Except,
490                set_quantifier,
491                left,
492                right,
493            } => self.process_except_intersect(left, right, set_quantifier, query),
494
495            SetExpr::Values(Values {
496                explicit_row: _,
497                rows,
498                value_keyword: _,
499            }) => self.process_values(rows),
500
501            SetExpr::Table(tbl) => {
502                if let Some(table_name) = tbl.table_name.as_ref() {
503                    self.get_table_from_current_scope(table_name)
504                        .ok_or_else(|| {
505                            polars_err!(
506                                SQLInterface: "no table or alias named '{}' found",
507                                tbl
508                            )
509                        })
510                } else {
511                    polars_bail!(SQLInterface: "'TABLE' requires valid table name")
512                }
513            },
514            op => {
515                polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
516            },
517        }
518    }
519
520    #[cfg(feature = "semi_anti_join")]
521    fn process_except_intersect(
522        &mut self,
523        left: &SetExpr,
524        right: &SetExpr,
525        quantifier: &SetQuantifier,
526        query: &Query,
527    ) -> PolarsResult<LazyFrame> {
528        let (join_type, op_name) = match *query.body {
529            SetExpr::SetOperation {
530                op: SetOperator::Except,
531                ..
532            } => (JoinType::Anti, "EXCEPT"),
533            _ => (JoinType::Semi, "INTERSECT"),
534        };
535
536        // Note: each side of the EXCEPT/INTERSECT operation should execute
537        // in isolation to prevent context state leakage between them
538        let mut lf = self.execute_isolated(|ctx| ctx.process_query(left, query))?;
539        let mut rf = self.execute_isolated(|ctx| ctx.process_query(right, query))?;
540        let lf_schema = self.get_frame_schema(&mut lf)?;
541
542        let lf_cols: Vec<_> = lf_schema.iter_names_cloned().map(col).collect();
543        let rf_cols = match quantifier {
544            SetQuantifier::ByName => None,
545            SetQuantifier::Distinct | SetQuantifier::None => {
546                let rf_schema = self.get_frame_schema(&mut rf)?;
547                let rf_cols: Vec<_> = rf_schema.iter_names_cloned().map(col).collect();
548                if lf_cols.len() != rf_cols.len() {
549                    polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
550                }
551                Some(rf_cols)
552            },
553            _ => {
554                polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
555            },
556        };
557        let join = lf.join_builder().with(rf).how(join_type).join_nulls(true);
558        let joined_tbl = match rf_cols {
559            Some(rf_cols) => join.left_on(lf_cols).right_on(rf_cols).finish(),
560            None => join.on(lf_cols).finish(),
561        };
562        let lf = joined_tbl.unique(None, UniqueKeepStrategy::Any);
563        self.process_order_by(lf, &query.order_by, None)
564    }
565
566    fn process_union(
567        &mut self,
568        left: &SetExpr,
569        right: &SetExpr,
570        quantifier: &SetQuantifier,
571        query: &Query,
572    ) -> PolarsResult<LazyFrame> {
573        let quantifier = *quantifier;
574
575        // Note: each side of the UNION operation should execute
576        // in isolation to prevent context state leakage between them
577        let mut lf = self.execute_isolated(|ctx| ctx.process_query(left, query))?;
578        let mut rf = self.execute_isolated(|ctx| ctx.process_query(right, query))?;
579
580        let opts = UnionArgs {
581            parallel: true,
582            to_supertypes: true,
583            maintain_order: false,
584            ..Default::default()
585        };
586        let lf = match quantifier {
587            // UNION [ALL | DISTINCT]
588            SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
589                let lf_schema = self.get_frame_schema(&mut lf)?;
590                let rf_schema = self.get_frame_schema(&mut rf)?;
591                if lf_schema.len() != rf_schema.len() {
592                    polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
593                }
594                // rename `rf` columns to match `lf` if they differ; SQL behaves
595                // positionally on UNION ops (unless using the "BY NAME" qualifier)
596                if lf_schema.iter_names().ne(rf_schema.iter_names()) {
597                    rf = rf.rename(rf_schema.iter_names(), lf_schema.iter_names(), true);
598                }
599                let concatenated = concat(vec![lf, rf], opts);
600                match quantifier {
601                    SetQuantifier::Distinct | SetQuantifier::None => {
602                        concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
603                    },
604                    _ => concatenated,
605                }
606            },
607            // UNION ALL BY NAME
608            #[cfg(feature = "diagonal_concat")]
609            SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
610            // UNION [DISTINCT] BY NAME
611            #[cfg(feature = "diagonal_concat")]
612            SetQuantifier::ByName | SetQuantifier::DistinctByName => {
613                let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
614                concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
615            },
616            #[allow(unreachable_patterns)]
617            _ => {
618                polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier)
619            },
620        }?;
621
622        self.process_order_by(lf, &query.order_by, None)
623    }
624
625    /// Process UNNEST as a lateral operation when it contains column references
626    /// (handles `CROSS JOIN UNNEST(col) AS name` by exploding the referenced col).
627    fn process_unnest_lateral(
628        &self,
629        lf: LazyFrame,
630        alias: &Option<TableAlias>,
631        array_exprs: &[SQLExpr],
632        with_offset: bool,
633    ) -> PolarsResult<LazyFrame> {
634        let alias = alias
635            .as_ref()
636            .ok_or_else(|| polars_err!(SQLSyntax: "UNNEST table must have an alias"))?;
637        polars_ensure!(!with_offset, SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
638
639        let (mut explode_cols, mut rename_from, mut rename_to) = (
640            Vec::with_capacity(array_exprs.len()),
641            Vec::with_capacity(array_exprs.len()),
642            Vec::with_capacity(array_exprs.len()),
643        );
644        let is_single_col = array_exprs.len() == 1;
645
646        for (i, arr_expr) in array_exprs.iter().enumerate() {
647            let col_name = match arr_expr {
648                SQLExpr::Identifier(ident) => PlSmallStr::from_str(&ident.value),
649                SQLExpr::CompoundIdentifier(parts) => {
650                    PlSmallStr::from_str(&parts.last().unwrap().value)
651                },
652                SQLExpr::Array(_) => polars_bail!(
653                    SQLInterface: "CROSS JOIN UNNEST with both literal arrays and column references is not supported"
654                ),
655                other => polars_bail!(
656                    SQLSyntax: "UNNEST expects column references or array literals, found {:?}", other
657                ),
658            };
659            // alias: column name from "AS t(col)", or table alias
660            if let Some(name) = alias
661                .columns
662                .get(i)
663                .map(|c| c.name.value.as_str())
664                .or_else(|| is_single_col.then_some(alias.name.value.as_str()))
665                .filter(|name| !name.is_empty() && *name != col_name.as_str())
666            {
667                rename_from.push(col_name.clone());
668                rename_to.push(PlSmallStr::from_str(name));
669            }
670            explode_cols.push(col_name);
671        }
672
673        let mut lf = lf.explode(
674            Selector::ByName {
675                names: Arc::from(explode_cols),
676                strict: true,
677            },
678            ExplodeOptions {
679                empty_as_null: true,
680                keep_nulls: true,
681            },
682        );
683        if !rename_from.is_empty() {
684            lf = lf.rename(rename_from, rename_to, true);
685        }
686        Ok(lf)
687    }
688
689    fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
690        let frame_rows: Vec<Row> = values.iter().map(|row| {
691            let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
692                let expr = parse_sql_expr(expr, self, None)?;
693                match expr {
694                    Expr::Literal(value) => {
695                        value.to_any_value()
696                            .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
697                            .map(|av| av.into_static())
698                    },
699                    _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
700                }
701            }).collect();
702            row_data.map(Row::new)
703        }).collect::<Result<_, _>>()?;
704
705        Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
706    }
707
708    // EXPLAIN SELECT * FROM DF
709    fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
710        match stmt {
711            Statement::Explain { statement, .. } => {
712                let lf = self.execute_statement(statement)?;
713                let plan = lf.describe_optimized_plan()?;
714                let plan = plan
715                    .split('\n')
716                    .collect::<Series>()
717                    .with_name(PlSmallStr::from_static("Logical Plan"))
718                    .into_column();
719                let df = DataFrame::new_infer_height(vec![plan])?;
720                Ok(df.lazy())
721            },
722            _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
723        }
724    }
725
726    // SHOW TABLES
727    fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
728        let tables = Column::new("name".into(), self.get_tables());
729        let df = DataFrame::new_infer_height(vec![tables])?;
730        Ok(df.lazy())
731    }
732
733    // DROP TABLE <tbl>
734    fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
735        match stmt {
736            Statement::Drop { names, .. } => {
737                names.iter().for_each(|name| {
738                    self.table_map.write().unwrap().remove(&name.to_string());
739                });
740                Ok(DataFrame::empty().lazy())
741            },
742            _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
743        }
744    }
745
746    // DELETE FROM <tbl> [WHERE ...]
747    fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
748        if let Statement::Delete(Delete {
749            tables,
750            from,
751            using,
752            selection,
753            returning,
754            order_by,
755            limit,
756            delete_token: _,
757        }) = stmt
758        {
759            let error_message: Option<&'static str> = if !tables.is_empty() {
760                Some("DELETE expects exactly one table name")
761            } else if using.is_some() {
762                Some("DELETE does not support the USING clause")
763            } else if returning.is_some() {
764                Some("DELETE does not support the RETURNING clause")
765            } else if limit.is_some() {
766                Some("DELETE does not support the LIMIT clause")
767            } else if !order_by.is_empty() {
768                Some("DELETE does not support the ORDER BY clause")
769            } else {
770                None
771            };
772
773            if let Some(msg) = error_message {
774                polars_bail!(SQLInterface: msg);
775            }
776
777            let from_tables = match &from {
778                FromTable::WithFromKeyword(from) => from,
779                FromTable::WithoutKeyword(from) => from,
780            };
781            if from_tables.len() > 1 {
782                polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
783            }
784            let tbl_expr = from_tables.first().unwrap();
785            if !tbl_expr.joins.is_empty() {
786                polars_bail!(SQLInterface: "DELETE does not support table JOINs")
787            }
788            let (_, lf) = self.get_table(&tbl_expr.relation)?;
789            if selection.is_none() {
790                // no WHERE clause; equivalent to TRUNCATE (drop all rows)
791                Ok(lf.clear())
792            } else {
793                // apply constraint as inverted filter (drops rows matching the selection)
794                Ok(self.process_where(lf.clone(), selection, true, None)?)
795            }
796        } else {
797            polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
798        }
799    }
800
801    // TRUNCATE <tbl>
802    fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
803        if let Statement::Truncate(Truncate {
804            table_names,
805            partitions,
806            ..
807        }) = stmt
808        {
809            match partitions {
810                None => {
811                    if table_names.len() != 1 {
812                        polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
813                    }
814                    let tbl = table_names[0].name.to_string();
815                    if let Some(lf) = self.table_map.write().unwrap().get_mut(&tbl) {
816                        *lf = lf.clone().clear();
817                        Ok(lf.clone())
818                    } else {
819                        polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
820                    }
821                },
822                _ => {
823                    polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
824                },
825            }
826        } else {
827            polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
828        }
829    }
830
831    fn register_cte(&mut self, name: &str, lf: LazyFrame) {
832        self.cte_map.insert(name.to_owned(), lf);
833    }
834
835    fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
836        if let Some(with) = &query.with {
837            if with.recursive {
838                polars_bail!(SQLInterface: "recursive CTEs are not supported")
839            }
840            for cte in &with.cte_tables {
841                let cte_name = cte.alias.name.value.clone();
842                let mut lf = self.execute_query(&cte.query)?;
843                lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
844                self.register_cte(&cte_name, lf);
845            }
846        }
847        Ok(())
848    }
849
850    fn register_named_windows(
851        &mut self,
852        named_windows: &[NamedWindowDefinition],
853    ) -> PolarsResult<()> {
854        for NamedWindowDefinition(name, expr) in named_windows {
855            let spec = match expr {
856                NamedWindowExpr::NamedWindow(ref_name) => self
857                    .named_windows
858                    .get(&ref_name.value)
859                    .ok_or_else(|| {
860                        polars_err!(
861                            SQLInterface:
862                            "named window '{}' references undefined window '{}'",
863                            name.value, ref_name.value
864                        )
865                    })?
866                    .clone(),
867                NamedWindowExpr::WindowSpec(spec) => spec.clone(),
868            };
869            self.named_windows.insert(name.value.clone(), spec);
870        }
871        Ok(())
872    }
873
874    /// execute the 'FROM' part of the query
875    fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
876        let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
877        if !tbl_expr.joins.is_empty() {
878            for join in &tbl_expr.joins {
879                // Handle "CROSS JOIN UNNEST(col)" as a lateral join op
880                if let (
881                    JoinOperator::CrossJoin(JoinConstraint::None),
882                    TableFactor::UNNEST {
883                        alias,
884                        array_exprs,
885                        with_offset,
886                        ..
887                    },
888                ) = (&join.join_operator, &join.relation)
889                {
890                    if array_exprs.iter().any(|e| !matches!(e, SQLExpr::Array(_))) {
891                        lf = self.process_unnest_lateral(lf, alias, array_exprs, *with_offset)?;
892                        continue;
893                    }
894                }
895
896                let (r_name, mut rf) = self.get_table(&join.relation)?;
897                if r_name.is_empty() {
898                    // Require non-empty to avoid duplicate column errors from nested self-joins.
899                    polars_bail!(
900                        SQLInterface:
901                        "cannot JOIN on unnamed relation; please provide an alias"
902                    )
903                }
904                let left_schema = self.get_frame_schema(&mut lf)?;
905                let right_schema = self.get_frame_schema(&mut rf)?;
906
907                lf = match &join.join_operator {
908                    op @ (JoinOperator::Join(constraint)  // note: bare "join" is inner
909                    | JoinOperator::FullOuter(constraint)
910                    | JoinOperator::Left(constraint)
911                    | JoinOperator::LeftOuter(constraint)
912                    | JoinOperator::Right(constraint)
913                    | JoinOperator::RightOuter(constraint)
914                    | JoinOperator::Inner(constraint)
915                    | JoinOperator::Anti(constraint)
916                    | JoinOperator::Semi(constraint)
917                    | JoinOperator::LeftAnti(constraint)
918                    | JoinOperator::LeftSemi(constraint)
919                    | JoinOperator::RightAnti(constraint)
920                    | JoinOperator::RightSemi(constraint)) => {
921                        let (lf, rf) = match op {
922                            JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
923                            _ => (lf, rf),
924                        };
925                        self.process_join(
926                            &TableInfo {
927                                frame: lf,
928                                name: (&l_name).into(),
929                                schema: left_schema.clone(),
930                            },
931                            &TableInfo {
932                                frame: rf,
933                                name: (&r_name).into(),
934                                schema: right_schema.clone(),
935                            },
936                            constraint,
937                            match op {
938                                JoinOperator::Join(_) | JoinOperator::Inner(_) => JoinType::Inner,
939                                JoinOperator::Left(_) | JoinOperator::LeftOuter(_) => {
940                                    JoinType::Left
941                                },
942                                JoinOperator::Right(_) | JoinOperator::RightOuter(_) => {
943                                    JoinType::Right
944                                },
945                                JoinOperator::FullOuter(_) => JoinType::Full,
946                                #[cfg(feature = "semi_anti_join")]
947                                JoinOperator::Anti(_)
948                                | JoinOperator::LeftAnti(_)
949                                | JoinOperator::RightAnti(_) => JoinType::Anti,
950                                #[cfg(feature = "semi_anti_join")]
951                                JoinOperator::Semi(_)
952                                | JoinOperator::LeftSemi(_)
953                                | JoinOperator::RightSemi(_) => JoinType::Semi,
954                                join_type => polars_bail!(
955                                    SQLInterface:
956                                    "join type '{:?}' not currently supported",
957                                    join_type
958                                ),
959                            },
960                        )?
961                    },
962                    JoinOperator::CrossJoin(JoinConstraint::None) => {
963                        lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
964                    },
965                    JoinOperator::CrossJoin(constraint) => {
966                        polars_bail!(
967                            SQLInterface:
968                            "CROSS JOIN does not support {:?} constraint; consider INNER JOIN instead",
969                            constraint
970                        )
971                    },
972                    join_type => {
973                        polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
974                    },
975                };
976
977                // track join-aliased columns so we can resolve/check them later
978                let joined_schema = self.get_frame_schema(&mut lf)?;
979
980                self.joined_aliases.insert(
981                    r_name.clone(),
982                    right_schema
983                        .iter_names()
984                        .filter_map(|name| {
985                            // col exists in both tables and is aliased in the joined result
986                            let aliased_name = format!("{name}:{r_name}");
987                            if left_schema.contains(name)
988                                && joined_schema.contains(aliased_name.as_str())
989                            {
990                                Some((name.to_string(), aliased_name))
991                            } else {
992                                None
993                            }
994                        })
995                        .collect::<PlHashMap<String, String>>(),
996                );
997            }
998        };
999        Ok(lf)
1000    }
1001
1002    /// Check that the SELECT statement only contains supported clauses.
1003    fn validate_select(&self, select_stmt: &Select) -> PolarsResult<()> {
1004        // Destructure "Select" exhaustively; that way if/when new fields are added in
1005        // future sqlparser versions, we'll get a compilation error and can handle them
1006        let Select {
1007            // Supported clauses
1008            distinct: _,
1009            from: _,
1010            group_by: _,
1011            having: _,
1012            named_window: _,
1013            projection: _,
1014            qualify: _,
1015            selection: _,
1016
1017            // Metadata/token fields (can ignore)
1018            flavor: _,
1019            select_token: _,
1020            top_before_distinct: _,
1021            window_before_qualify: _,
1022
1023            // Unsupported clauses
1024            ref cluster_by,
1025            ref connect_by,
1026            ref distribute_by,
1027            ref exclude,
1028            ref into,
1029            ref lateral_views,
1030            ref prewhere,
1031            ref sort_by,
1032            ref top,
1033            ref value_table_mode,
1034        } = *select_stmt;
1035
1036        // Raise specific error messages for unsupported attributes
1037        polars_ensure!(cluster_by.is_empty(), SQLInterface: "`CLUSTER BY` clause is not supported");
1038        polars_ensure!(connect_by.is_none(), SQLInterface: "`CONNECT BY` clause is not supported");
1039        polars_ensure!(distribute_by.is_empty(), SQLInterface: "`DISTRIBUTE BY` clause is not supported");
1040        polars_ensure!(exclude.is_none(), SQLInterface: "`EXCLUDE` clause is not supported");
1041        polars_ensure!(into.is_none(), SQLInterface: "`SELECT INTO` clause is not supported");
1042        polars_ensure!(lateral_views.is_empty(), SQLInterface: "`LATERAL VIEW` clause is not supported");
1043        polars_ensure!(prewhere.is_none(), SQLInterface: "`PREWHERE` clause is not supported");
1044        polars_ensure!(sort_by.is_empty(), SQLInterface: "`SORT BY` clause is not supported; use `ORDER BY` instead");
1045        polars_ensure!(top.is_none(), SQLInterface: "`TOP` clause is not supported; use `LIMIT` instead");
1046        polars_ensure!(value_table_mode.is_none(), SQLInterface: "`SELECT AS VALUE/STRUCT` is not supported");
1047
1048        Ok(())
1049    }
1050
1051    /// Check that the QUERY only contains supported clauses.
1052    fn validate_query(&self, query: &Query) -> PolarsResult<()> {
1053        // As with "Select" validation (above) destructure "Query" exhaustively
1054        let Query {
1055            // Supported clauses
1056            with: _,
1057            body: _,
1058            order_by: _,
1059            limit_clause: _,
1060            fetch,
1061
1062            // Unsupported clauses
1063            for_clause,
1064            format_clause,
1065            locks,
1066            pipe_operators,
1067            settings,
1068        } = query;
1069
1070        // Raise specific error messages for unsupported attributes
1071        polars_ensure!(for_clause.is_none(), SQLInterface: "`FOR` clause is not supported");
1072        polars_ensure!(format_clause.is_none(), SQLInterface: "`FORMAT` clause is not supported");
1073        polars_ensure!(locks.is_empty(), SQLInterface: "`FOR UPDATE/SHARE` locking clause is not supported");
1074        polars_ensure!(pipe_operators.is_empty(), SQLInterface: "pipe operators are not supported");
1075        polars_ensure!(settings.is_none(), SQLInterface: "`SETTINGS` clause is not supported");
1076
1077        // Validate FETCH clause options (if present)
1078        if let Some(Fetch {
1079            quantity: _, // supported
1080            percent,
1081            with_ties,
1082        }) = fetch
1083        {
1084            polars_ensure!(!percent, SQLInterface: "`FETCH` with `PERCENT` is not supported");
1085            polars_ensure!(!with_ties, SQLInterface: "`FETCH` with `WITH TIES` is not supported");
1086        }
1087        Ok(())
1088    }
1089
1090    /// Execute the 'SELECT' part of the query.
1091    fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
1092        // Check that the statement doesn't contain unsupported SELECT clauses
1093        self.validate_select(select_stmt)?;
1094
1095        // Parse named windows first, as they may be referenced in the SELECT clause
1096        self.register_named_windows(&select_stmt.named_window)?;
1097
1098        // Get `FROM` table/data
1099        let (mut lf, base_table_name) = if select_stmt.from.is_empty() {
1100            (DataFrame::empty().lazy(), None)
1101        } else {
1102            // Note: implicit joins need more work to support properly,
1103            // explicit joins are preferred for now (ref: #16662)
1104            let from = select_stmt.clone().from;
1105            if from.len() > 1 {
1106                polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
1107            }
1108            let tbl_expr = from.first().unwrap();
1109            let lf = self.execute_from_statement(tbl_expr)?;
1110            let base_name = get_table_name(&tbl_expr.relation);
1111            (lf, base_name)
1112        };
1113
1114        // Check for ambiguous column references in SELECT and WHERE (if there were joins)
1115        if let Some(ref base_name) = base_table_name {
1116            if !self.joined_aliases.is_empty() {
1117                // Extract USING columns from joins (these are coalesced and not ambiguous)
1118                let using_cols: PlHashSet<String> = select_stmt
1119                    .from
1120                    .first()
1121                    .into_iter()
1122                    .flat_map(|t| t.joins.iter())
1123                    .filter_map(|join| get_using_cols(&join.join_operator))
1124                    .flatten()
1125                    .collect();
1126
1127                // Check SELECT and WHERE expressions for ambiguous column references
1128                let check_expr = |e| {
1129                    check_for_ambiguous_column_refs(e, &self.joined_aliases, base_name, &using_cols)
1130                };
1131                for item in &select_stmt.projection {
1132                    match item {
1133                        SelectItem::UnnamedExpr(e) | SelectItem::ExprWithAlias { expr: e, .. } => {
1134                            check_expr(e)?
1135                        },
1136                        _ => {},
1137                    }
1138                }
1139                if let Some(ref where_expr) = select_stmt.selection {
1140                    check_expr(where_expr)?;
1141                }
1142            }
1143        }
1144
1145        // Apply `WHERE` constraint
1146        let mut schema = self.get_frame_schema(&mut lf)?;
1147        lf = self.process_where(lf, &select_stmt.selection, false, Some(schema.clone()))?;
1148
1149        // Determine projections
1150        let mut select_modifiers = SelectModifiers {
1151            ilike: None,
1152            exclude: PlHashSet::new(),
1153            rename: PlHashMap::new(),
1154            replace: vec![],
1155        };
1156
1157        // Collect window function cols if QUALIFY is present (we check at the
1158        // SQL level because empty OVER() clauses don't create Expr::Over)
1159        let window_fn_columns = if select_stmt.qualify.is_some() {
1160            select_stmt
1161                .projection
1162                .iter()
1163                .filter_map(|item| match item {
1164                    SelectItem::ExprWithAlias { expr, alias }
1165                        if expr_has_window_functions(expr) =>
1166                    {
1167                        Some(alias.value.clone())
1168                    },
1169                    _ => None,
1170                })
1171                .collect::<PlHashSet<_>>()
1172        } else {
1173            PlHashSet::new()
1174        };
1175
1176        let mut projections =
1177            self.column_projections(select_stmt, &schema, &mut select_modifiers)?;
1178
1179        // Apply `UNNEST` expressions
1180        let mut explode_names = Vec::new();
1181        let mut explode_exprs = Vec::new();
1182        let mut explode_lookup = PlHashMap::new();
1183
1184        for expr in &projections {
1185            for e in expr {
1186                if let Expr::Explode { input, .. } = e {
1187                    match input.as_ref() {
1188                        Expr::Column(name) => explode_names.push(name.clone()),
1189                        other_expr => {
1190                            // Note: skip aggregate expressions; those are handled in the GROUP BY phase
1191                            if !has_expr(other_expr, |e| matches!(e, Expr::Agg(_) | Expr::Len)) {
1192                                let temp_name =
1193                                    format_pl_smallstr!("__POLARS_UNNEST_{}", explode_exprs.len());
1194                                explode_exprs.push(other_expr.clone().alias(temp_name.as_str()));
1195                                explode_lookup.insert(other_expr.clone(), temp_name.clone());
1196                                explode_names.push(temp_name);
1197                            }
1198                        },
1199                    }
1200                }
1201            }
1202        }
1203        if !explode_names.is_empty() {
1204            if !explode_exprs.is_empty() {
1205                lf = lf.with_columns(explode_exprs);
1206            }
1207            lf = lf.explode(
1208                Selector::ByName {
1209                    names: Arc::from(explode_names),
1210                    strict: true,
1211                },
1212                ExplodeOptions {
1213                    empty_as_null: true,
1214                    keep_nulls: true,
1215                },
1216            );
1217            projections = projections
1218                .into_iter()
1219                .map(|p| {
1220                    // Update "projections" with column refs to the now-exploded expressions
1221                    p.map_expr(|e| match e {
1222                        Expr::Explode { input, .. } => explode_lookup
1223                            .get(input.as_ref())
1224                            .map(|name| Expr::Column(name.clone()))
1225                            .unwrap_or_else(|| input.as_ref().clone()),
1226                        _ => e,
1227                    })
1228                })
1229                .collect();
1230
1231            schema = self.get_frame_schema(&mut lf)?;
1232        }
1233
1234        // Check for "GROUP BY ..." (after determining projections)
1235        let mut group_by_keys: Vec<Expr> = Vec::new();
1236        match &select_stmt.group_by {
1237            // Standard "GROUP BY x, y, z" syntax (also recognising ordinal values)
1238            GroupByExpr::Expressions(group_by_exprs, modifiers) => {
1239                if !modifiers.is_empty() {
1240                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
1241                }
1242                // Translate the group expressions, resolving ordinal values and SELECT aliases
1243                group_by_keys = group_by_exprs
1244                    .iter()
1245                    .map(|e| match e {
1246                        SQLExpr::Identifier(ident) => {
1247                            resolve_select_alias(&ident.value, &projections, &schema).map_or_else(
1248                                || {
1249                                    self.expr_or_ordinal(
1250                                        e,
1251                                        &projections,
1252                                        None,
1253                                        Some(&schema),
1254                                        "GROUP BY",
1255                                    )
1256                                },
1257                                Ok,
1258                            )
1259                        },
1260                        _ => self.expr_or_ordinal(e, &projections, None, Some(&schema), "GROUP BY"),
1261                    })
1262                    .collect::<PolarsResult<_>>()?
1263            },
1264            // "GROUP BY ALL" syntax; automatically adds expressions that do not contain
1265            // nested agg/window funcs to the group key (also ignores literals).
1266            GroupByExpr::All(modifiers) => {
1267                if !modifiers.is_empty() {
1268                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
1269                }
1270                projections.iter().for_each(|expr| match expr {
1271                    // immediately match the most common cases (col|agg|len|lit, optionally aliased).
1272                    Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
1273                    Expr::Column(_) => group_by_keys.push(expr.clone()),
1274                    Expr::Alias(e, _)
1275                        if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
1276                    Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
1277                        if let Expr::Column(name) = &**e {
1278                            group_by_keys.push(col(name.clone()));
1279                        }
1280                    },
1281                    _ => {
1282                        // If not quick-matched, add if no nested agg/window expressions
1283                        if !has_expr(expr, |e| {
1284                            matches!(e, Expr::Agg(_))
1285                                || matches!(e, Expr::Len)
1286                                || matches!(e, Expr::Over { .. })
1287                                || {
1288                                    #[cfg(feature = "dynamic_group_by")]
1289                                    {
1290                                        matches!(e, Expr::Rolling { .. })
1291                                    }
1292                                    #[cfg(not(feature = "dynamic_group_by"))]
1293                                    {
1294                                        false
1295                                    }
1296                                }
1297                        }) {
1298                            group_by_keys.push(expr.clone())
1299                        }
1300                    },
1301                });
1302            },
1303        };
1304
1305        lf = if group_by_keys.is_empty() {
1306            // The 'having' clause is only valid inside 'group by'
1307            if select_stmt.having.is_some() {
1308                polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
1309            };
1310
1311            // Final/selected cols, accounting for 'SELECT *' modifiers
1312            let mut retained_cols = Vec::with_capacity(projections.len());
1313            let mut retained_names = Vec::with_capacity(projections.len());
1314            let have_order_by = query.order_by.is_some();
1315
1316            // Initialize containing InheritsContext to handle empty projection case.
1317            let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;
1318
1319            // Note: if there is an 'order by' then we project everything (original cols
1320            // and new projections) and *then* select the final cols; the retained cols
1321            // are used to ensure a correct final projection. If there's no 'order by',
1322            // clause then we can project the final column *expressions* directly.
1323            for p in projections.iter() {
1324                let name = p.to_field(schema.deref())?.name.to_string();
1325                if select_modifiers.matches_ilike(&name)
1326                    && !select_modifiers.exclude.contains(&name)
1327                {
1328                    projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);
1329
1330                    retained_cols.push(if have_order_by {
1331                        col(name.as_str())
1332                    } else {
1333                        p.clone()
1334                    });
1335                    retained_names.push(col(name));
1336                }
1337            }
1338
1339            // Apply the remaining modifiers and establish the final projection
1340            if have_order_by {
1341                // We can safely use `with_columns()` and avoid a join if:
1342                // * There is already a projection that projects to the table height.
1343                // * All projection heights inherit from context (e.g. all scalar literals that
1344                //   are to be broadcasted to table height).
1345                if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
1346                    || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
1347                {
1348                    lf = lf.with_columns(projections);
1349                } else {
1350                    // We hit this branch if the output height is not guaranteed to match the table
1351                    // height. E.g.:
1352                    //
1353                    // * SELECT COUNT(*) FROM df ORDER BY sort_key;
1354                    //
1355                    // For these cases we truncate / extend the sorting columns with NULLs to match
1356                    // the output height. We do this by projecting independently and then joining
1357                    // back the original frame on the row index.
1358                    const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
1359                    lf = lf
1360                        .clone()
1361                        .select(projections)
1362                        .with_row_index(NAME, None)
1363                        .join(
1364                            lf.with_row_index(NAME, None),
1365                            [col(NAME)],
1366                            [col(NAME)],
1367                            JoinArgs {
1368                                how: JoinType::Left,
1369                                validation: Default::default(),
1370                                suffix: None,
1371                                slice: None,
1372                                nulls_equal: false,
1373                                coalesce: Default::default(),
1374                                maintain_order: MaintainOrderJoin::Left,
1375                                build_side: None,
1376                            },
1377                        );
1378                }
1379            }
1380            if !select_modifiers.replace.is_empty() {
1381                lf = lf.with_columns(&select_modifiers.replace);
1382            }
1383            if !select_modifiers.rename.is_empty() {
1384                lf = lf.with_columns(select_modifiers.renamed_cols());
1385            }
1386            lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;
1387
1388            // Note: If `have_order_by`, with_columns is already done above.
1389            if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
1390                && !have_order_by
1391            {
1392                // All projections need to be broadcasted to table height, so evaluate in `with_columns()`
1393                lf = lf.with_columns(retained_cols).select(retained_names);
1394            } else {
1395                lf = lf.select(retained_cols);
1396            }
1397            if !select_modifiers.rename.is_empty() {
1398                lf = lf.rename(
1399                    select_modifiers.rename.keys(),
1400                    select_modifiers.rename.values(),
1401                    true,
1402                );
1403            };
1404            lf
1405        } else {
1406            let having = select_stmt
1407                .having
1408                .as_ref()
1409                .map(|expr| parse_sql_expr(expr, self, Some(&schema)))
1410                .transpose()?;
1411            lf = self.process_group_by(lf, &group_by_keys, &projections, having)?;
1412            lf = self.process_order_by(lf, &query.order_by, None)?;
1413
1414            // Drop any extra columns (eg: added to maintain ORDER BY access to original cols)
1415            let output_cols: Vec<_> = projections
1416                .iter()
1417                .map(|p| p.to_field(&schema))
1418                .collect::<PolarsResult<Vec<_>>>()?
1419                .into_iter()
1420                .map(|f| col(f.name))
1421                .collect();
1422
1423            lf.select(&output_cols)
1424        };
1425
1426        // Apply optional QUALIFY clause (filters on window functions).
1427        lf = self.process_qualify(lf, &select_stmt.qualify, &window_fn_columns)?;
1428
1429        // Apply optional DISTINCT clause.
1430        lf = match &select_stmt.distinct {
1431            Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
1432            Some(Distinct::On(exprs)) => {
1433                // TODO: support exprs in `unique` see https://github.com/pola-rs/polars/issues/5760
1434                let schema = Some(self.get_frame_schema(&mut lf)?);
1435                let cols = exprs
1436                    .iter()
1437                    .map(|e| {
1438                        let expr = parse_sql_expr(e, self, schema.as_deref())?;
1439                        if let Expr::Column(name) = expr {
1440                            Ok(name)
1441                        } else {
1442                            Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
1443                        }
1444                    })
1445                    .collect::<PolarsResult<Vec<_>>>()?;
1446
1447                // DISTINCT ON has to apply the ORDER BY before the operation.
1448                lf = self.process_order_by(lf, &query.order_by, None)?;
1449                return Ok(lf.unique_stable(
1450                    Some(Selector::ByName {
1451                        names: cols.into(),
1452                        strict: true,
1453                    }),
1454                    UniqueKeepStrategy::First,
1455                ));
1456            },
1457            None => lf,
1458        };
1459        Ok(lf)
1460    }
1461
1462    fn column_projections(
1463        &mut self,
1464        select_stmt: &Select,
1465        schema: &SchemaRef,
1466        select_modifiers: &mut SelectModifiers,
1467    ) -> PolarsResult<Vec<Expr>> {
1468        if select_stmt.projection.is_empty()
1469            && select_stmt.flavor == SelectFlavor::FromFirstNoSelect
1470        {
1471            // eg: bare "FROM tbl" is equivalent to "SELECT * FROM tbl".
1472            return Ok(schema.iter_names().map(|name| col(name.clone())).collect());
1473        }
1474        let mut items: Vec<ProjectionItem> = Vec::with_capacity(select_stmt.projection.len());
1475        let mut has_qualified_wildcard = false;
1476
1477        for select_item in &select_stmt.projection {
1478            match select_item {
1479                SelectItem::UnnamedExpr(expr) => {
1480                    items.push(ProjectionItem::Exprs(vec![parse_sql_expr(
1481                        expr,
1482                        self,
1483                        Some(schema),
1484                    )?]));
1485                },
1486                SelectItem::ExprWithAlias { expr, alias } => {
1487                    let expr = parse_sql_expr(expr, self, Some(schema))?;
1488                    items.push(ProjectionItem::Exprs(vec![
1489                        expr.alias(PlSmallStr::from_str(alias.value.as_str())),
1490                    ]));
1491                },
1492                SelectItem::QualifiedWildcard(kind, wildcard_options) => match kind {
1493                    SelectItemQualifiedWildcardKind::ObjectName(obj_name) => {
1494                        let tbl_name = obj_name
1495                            .0
1496                            .last()
1497                            .and_then(|p| p.as_ident())
1498                            .map(|i| PlSmallStr::from_str(&i.value))
1499                            .unwrap_or_default();
1500                        let exprs = self.process_qualified_wildcard(
1501                            obj_name,
1502                            wildcard_options,
1503                            select_modifiers,
1504                            Some(schema),
1505                        )?;
1506                        items.push(ProjectionItem::QualifiedExprs(tbl_name, exprs));
1507                        has_qualified_wildcard = true;
1508                    },
1509                    SelectItemQualifiedWildcardKind::Expr(_) => {
1510                        polars_bail!(SQLSyntax: "qualified wildcard on expressions not yet supported: {:?}", select_item)
1511                    },
1512                },
1513                SelectItem::Wildcard(wildcard_options) => {
1514                    let cols = schema.iter_names().map(|name| col(name.clone())).collect();
1515                    items.push(ProjectionItem::Exprs(
1516                        self.process_wildcard_additional_options(
1517                            cols,
1518                            wildcard_options,
1519                            select_modifiers,
1520                            Some(schema),
1521                        )?,
1522                    ));
1523                },
1524            }
1525        }
1526
1527        // Disambiguate qualified wildcards (if any) and flatten expressions
1528        let exprs = if has_qualified_wildcard {
1529            disambiguate_projection_cols(items, schema)?
1530        } else {
1531            items
1532                .into_iter()
1533                .flat_map(|item| match item {
1534                    ProjectionItem::Exprs(exprs) | ProjectionItem::QualifiedExprs(_, exprs) => {
1535                        exprs
1536                    },
1537                })
1538                .collect()
1539        };
1540        let flattened_exprs = exprs
1541            .into_iter()
1542            .flat_map(|expr| expand_exprs(expr, schema))
1543            .collect();
1544
1545        Ok(flattened_exprs)
1546    }
1547
1548    fn process_where(
1549        &mut self,
1550        mut lf: LazyFrame,
1551        expr: &Option<SQLExpr>,
1552        invert_filter: bool,
1553        schema: Option<SchemaRef>,
1554    ) -> PolarsResult<LazyFrame> {
1555        if let Some(expr) = expr {
1556            let schema = match schema {
1557                None => self.get_frame_schema(&mut lf)?,
1558                Some(s) => s,
1559            };
1560
1561            // shortcut filter evaluation if given expression is just TRUE or FALSE
1562            let (all_true, all_false) = match expr {
1563                SQLExpr::Value(ValueWithSpan {
1564                    value: SQLValue::Boolean(b),
1565                    ..
1566                }) => (*b, !*b),
1567                SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
1568                    (SQLExpr::Value(a), SQLExpr::Value(b), SQLBinaryOperator::Eq) => {
1569                        (a.value == b.value, a.value != b.value)
1570                    },
1571                    (SQLExpr::Value(a), SQLExpr::Value(b), SQLBinaryOperator::NotEq) => {
1572                        (a.value != b.value, a.value == b.value)
1573                    },
1574                    _ => (false, false),
1575                },
1576                _ => (false, false),
1577            };
1578            if (all_true && !invert_filter) || (all_false && invert_filter) {
1579                return Ok(lf);
1580            } else if (all_false && !invert_filter) || (all_true && invert_filter) {
1581                return Ok(lf.clear());
1582            }
1583
1584            // ...otherwise parse and apply the filter as normal
1585            let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1586            if filter_expression.clone().meta().has_multiple_outputs() {
1587                filter_expression = all_horizontal([filter_expression])?;
1588            }
1589            lf = self.process_subqueries(lf, vec![&mut filter_expression])?;
1590            lf = if invert_filter {
1591                lf.remove(filter_expression)
1592            } else {
1593                lf.filter(filter_expression)
1594            };
1595        }
1596        Ok(lf)
1597    }
1598
1599    pub(super) fn process_join(
1600        &mut self,
1601        tbl_left: &TableInfo,
1602        tbl_right: &TableInfo,
1603        constraint: &JoinConstraint,
1604        join_type: JoinType,
1605    ) -> PolarsResult<LazyFrame> {
1606        let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right, self)?;
1607        let coalesce_type = match constraint {
1608            // "NATURAL" joins should coalesce; otherwise we disambiguate
1609            JoinConstraint::Natural => JoinCoalesce::CoalesceColumns,
1610            _ => JoinCoalesce::KeepColumns,
1611        };
1612        let joined = tbl_left
1613            .frame
1614            .clone()
1615            .join_builder()
1616            .with(tbl_right.frame.clone())
1617            .left_on(left_on)
1618            .right_on(right_on)
1619            .how(join_type)
1620            .suffix(format!(":{}", tbl_right.name))
1621            .coalesce(coalesce_type)
1622            .finish();
1623
1624        Ok(joined)
1625    }
1626
1627    fn process_qualify(
1628        &mut self,
1629        mut lf: LazyFrame,
1630        qualify_expr: &Option<SQLExpr>,
1631        window_fn_columns: &PlHashSet<String>,
1632    ) -> PolarsResult<LazyFrame> {
1633        if let Some(expr) = qualify_expr {
1634            // Check the QUALIFY expression to identify window functions
1635            // and collect column refs (for looking up aliases from SELECT)
1636            let (has_window_fns, column_refs) = QualifyExpression::analyze(expr);
1637            let references_window_alias = column_refs.iter().any(|c| window_fn_columns.contains(c));
1638            if !has_window_fns && !references_window_alias {
1639                polars_bail!(
1640                    SQLSyntax:
1641                    "QUALIFY clause must reference window functions either explicitly or via SELECT aliases"
1642                );
1643            }
1644            let schema = self.get_frame_schema(&mut lf)?;
1645            let mut filter_expression = parse_sql_expr(expr, self, Some(&schema))?;
1646            if filter_expression.clone().meta().has_multiple_outputs() {
1647                filter_expression = all_horizontal([filter_expression])?;
1648            }
1649            lf = self.process_subqueries(lf, vec![&mut filter_expression])?;
1650            lf = lf.filter(filter_expression);
1651        }
1652        Ok(lf)
1653    }
1654
1655    fn process_subqueries(
1656        &mut self,
1657        lf: LazyFrame,
1658        exprs: Vec<&mut Expr>,
1659    ) -> PolarsResult<LazyFrame> {
1660        let mut subplans = vec![];
1661
1662        for e in exprs {
1663            *e = e.clone().try_map_expr(|e| {
1664                if let Expr::SubPlan(lp, names) = e {
1665                    assert_eq!(
1666                        names.len(),
1667                        1,
1668                        "multiple columns in subqueries not yet supported"
1669                    );
1670
1671                    let select_expr = names[0].1.clone();
1672                    let mut lf = LazyFrame::from((**lp).clone());
1673                    let schema = self.get_frame_schema(&mut lf)?;
1674                    polars_ensure!(schema.len() == 1,  SQLSyntax: "SQL subquery returns more than one column");
1675                    let lf = lf.select([select_expr.clone()]);
1676
1677                    subplans.push(lf);
1678                    Ok(Expr::Column(names[0].0.clone()).first())
1679                } else {
1680                    Ok(e)
1681                }
1682            })?;
1683        }
1684
1685        if subplans.is_empty() {
1686            Ok(lf)
1687        } else {
1688            subplans.insert(0, lf);
1689            concat_lf_horizontal(
1690                subplans,
1691                HConcatOptions {
1692                    broadcast_unit_length: true,
1693                    ..Default::default()
1694                },
1695            )
1696        }
1697    }
1698
1699    fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1700        if let Statement::CreateTable(CreateTable {
1701            if_not_exists,
1702            name,
1703            query,
1704            columns,
1705            like,
1706            ..
1707        }) = stmt
1708        {
1709            let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1710            if *if_not_exists && self.table_map.read().unwrap().contains_key(tbl_name) {
1711                polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1712            }
1713            let lf = match (query, columns.is_empty(), like) {
1714                (Some(query), true, None) => {
1715                    // ----------------------------------------------------
1716                    // CREATE TABLE [IF NOT EXISTS] <name> AS <query>
1717                    // ----------------------------------------------------
1718                    self.execute_query(query)?
1719                },
1720                (None, false, None) => {
1721                    // ----------------------------------------------------
1722                    // CREATE TABLE [IF NOT EXISTS] <name> (<coldef>, ...)
1723                    // ----------------------------------------------------
1724                    let mut schema = Schema::with_capacity(columns.len());
1725                    for col in columns {
1726                        let col_name = col.name.value.as_str();
1727                        let dtype = map_sql_dtype_to_polars(&col.data_type)?;
1728                        schema.insert_at_index(schema.len(), col_name.into(), dtype)?;
1729                    }
1730                    DataFrame::empty_with_schema(&schema).lazy()
1731                },
1732                (None, true, Some(like_kind)) => {
1733                    // ----------------------------------------------------
1734                    // CREATE TABLE [IF NOT EXISTS] <name> LIKE <table>
1735                    // ----------------------------------------------------
1736                    let like_name = match like_kind {
1737                        CreateTableLikeKind::Plain(like)
1738                        | CreateTableLikeKind::Parenthesized(like) => &like.name,
1739                    };
1740                    let like_table = like_name
1741                        .0
1742                        .first()
1743                        .unwrap()
1744                        .as_ident()
1745                        .unwrap()
1746                        .value
1747                        .as_str();
1748                    if let Some(table) = self.table_map.read().unwrap().get(like_table).cloned() {
1749                        table.clear()
1750                    } else {
1751                        polars_bail!(SQLInterface: "table given in LIKE does not exist: {}", like_table)
1752                    }
1753                },
1754                // No valid options provided
1755                (None, true, None) => {
1756                    polars_bail!(SQLInterface: "CREATE TABLE expected a query, column definitions, or LIKE clause")
1757                },
1758                // Mutually exclusive options
1759                _ => {
1760                    polars_bail!(
1761                        SQLInterface: "CREATE TABLE received mutually exclusive options:\nquery = {:?}\ncolumns = {:?}\nlike = {:?}",
1762                        query,
1763                        columns,
1764                        like,
1765                    )
1766                },
1767            };
1768            self.register(tbl_name, lf);
1769
1770            let df_created = df! { "Response" => [format!("CREATE TABLE {}", name.0.first().unwrap().as_ident().unwrap().value)] };
1771            Ok(df_created.unwrap().lazy())
1772        } else {
1773            unreachable!()
1774        }
1775    }
1776
1777    fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
1778        match relation {
1779            TableFactor::Table {
1780                name, alias, args, ..
1781            } => {
1782                if let Some(args) = args {
1783                    return self.execute_table_function(name, alias, &args.args);
1784                }
1785                let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1786                if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
1787                    match alias {
1788                        Some(alias) => {
1789                            self.table_aliases
1790                                .insert(alias.name.value.clone(), tbl_name.to_string());
1791                            Ok((alias.name.value.clone(), lf))
1792                        },
1793                        None => Ok((tbl_name.to_string(), lf)),
1794                    }
1795                } else {
1796                    polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1797                }
1798            },
1799            TableFactor::Derived {
1800                lateral,
1801                subquery,
1802                alias,
1803            } => {
1804                polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1805                if let Some(alias) = alias {
1806                    let mut lf = self.execute_query_no_ctes(subquery)?;
1807                    lf = self.rename_columns_from_table_alias(lf, alias)?;
1808                    self.table_map
1809                        .write()
1810                        .unwrap()
1811                        .insert(alias.name.value.clone(), lf.clone());
1812                    Ok((alias.name.value.clone(), lf))
1813                } else {
1814                    let lf = self.execute_query_no_ctes(subquery)?;
1815                    Ok(("".to_string(), lf))
1816                }
1817            },
1818            TableFactor::UNNEST {
1819                alias,
1820                array_exprs,
1821                with_offset,
1822                with_offset_alias: _,
1823                ..
1824            } => {
1825                if let Some(alias) = alias {
1826                    let column_names: Vec<Option<PlSmallStr>> = alias
1827                        .columns
1828                        .iter()
1829                        .map(|c| {
1830                            if c.name.value.is_empty() {
1831                                None
1832                            } else {
1833                                Some(PlSmallStr::from_str(c.name.value.as_str()))
1834                            }
1835                        })
1836                        .collect();
1837
1838                    let column_values: Vec<Series> = array_exprs
1839                        .iter()
1840                        .map(|arr| parse_sql_array(arr, self))
1841                        .collect::<Result<_, _>>()?;
1842
1843                    polars_ensure!(!column_names.is_empty(),
1844                        SQLSyntax:
1845                        "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1846                    );
1847                    if column_names.len() != column_values.len() {
1848                        let plural = if column_values.len() > 1 { "s" } else { "" };
1849                        polars_bail!(
1850                            SQLSyntax:
1851                            "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
1852                        );
1853                    }
1854                    let column_series: Vec<Column> = column_values
1855                        .into_iter()
1856                        .zip(column_names)
1857                        .map(|(s, name)| {
1858                            if let Some(name) = name {
1859                                s.with_name(name)
1860                            } else {
1861                                s
1862                            }
1863                        })
1864                        .map(Column::from)
1865                        .collect();
1866
1867                    let lf = DataFrame::new_infer_height(column_series)?.lazy();
1868
1869                    if *with_offset {
1870                        // TODO: support 'WITH ORDINALITY|OFFSET' modifier.
1871                        polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
1872                    }
1873                    let table_name = alias.name.value.clone();
1874                    self.table_map
1875                        .write()
1876                        .unwrap()
1877                        .insert(table_name.clone(), lf.clone());
1878                    Ok((table_name, lf))
1879                } else {
1880                    polars_bail!(SQLSyntax: "UNNEST table must have an alias");
1881                }
1882            },
1883            TableFactor::NestedJoin {
1884                table_with_joins,
1885                alias,
1886            } => {
1887                let lf = self.execute_from_statement(table_with_joins)?;
1888                match alias {
1889                    Some(a) => Ok((a.name.value.clone(), lf)),
1890                    None => Ok(("".to_string(), lf)),
1891                }
1892            },
1893            // Support bare table, optionally with an alias, for now
1894            _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
1895        }
1896    }
1897
1898    fn execute_table_function(
1899        &mut self,
1900        name: &ObjectName,
1901        alias: &Option<TableAlias>,
1902        args: &[FunctionArg],
1903    ) -> PolarsResult<(String, LazyFrame)> {
1904        let tbl_fn = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1905        let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1906        let (tbl_name, lf) = read_fn.execute(args)?;
1907        #[allow(clippy::useless_asref)]
1908        let tbl_name = alias
1909            .as_ref()
1910            .map(|a| a.name.value.clone())
1911            .unwrap_or_else(|| tbl_name.to_string());
1912
1913        self.table_map
1914            .write()
1915            .unwrap()
1916            .insert(tbl_name.clone(), lf.clone());
1917        Ok((tbl_name, lf))
1918    }
1919
1920    fn process_order_by(
1921        &mut self,
1922        mut lf: LazyFrame,
1923        order_by: &Option<OrderBy>,
1924        selected: Option<&[Expr]>,
1925    ) -> PolarsResult<LazyFrame> {
1926        if order_by.as_ref().is_none_or(|ob| match &ob.kind {
1927            OrderByKind::Expressions(exprs) => exprs.is_empty(),
1928            OrderByKind::All(_) => false,
1929        }) {
1930            return Ok(lf);
1931        }
1932        let schema = self.get_frame_schema(&mut lf)?;
1933        let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1934        let (order_by, order_by_all, n_order_cols) = match &order_by.as_ref().unwrap().kind {
1935            OrderByKind::Expressions(exprs) => {
1936                // TODO: will look at making an upstream PR that allows us to more easily
1937                //  create a GenericDialect variant supporting "OrderByKind::All" instead
1938                if exprs.len() == 1
1939                    && matches!(&exprs[0].expr, SQLExpr::Identifier(ident)
1940                        if ident.value.to_uppercase() == "ALL"
1941                        && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1942                {
1943                    // Treat as ORDER BY ALL
1944                    let n_cols = if let Some(selected) = selected {
1945                        selected.len()
1946                    } else {
1947                        schema.len()
1948                    };
1949                    (vec![], Some(&exprs[0].options), n_cols)
1950                } else {
1951                    (exprs.clone(), None, exprs.len())
1952                }
1953            },
1954            OrderByKind::All(opts) => {
1955                let n_cols = if let Some(selected) = selected {
1956                    selected.len()
1957                } else {
1958                    schema.len()
1959                };
1960                (vec![], Some(opts), n_cols)
1961            },
1962        };
1963        let mut descending = Vec::with_capacity(n_order_cols);
1964        let mut nulls_last = Vec::with_capacity(n_order_cols);
1965        let mut by: Vec<Expr> = Vec::with_capacity(n_order_cols);
1966
1967        if let Some(opts) = order_by_all {
1968            if let Some(selected) = selected {
1969                by.extend(selected.iter().cloned());
1970            } else {
1971                by.extend(columns_iter);
1972            };
1973            let desc_order = !opts.asc.unwrap_or(true);
1974            nulls_last.resize(by.len(), !opts.nulls_first.unwrap_or(desc_order));
1975            descending.resize(by.len(), desc_order);
1976        } else {
1977            let columns = &columns_iter.collect::<Vec<_>>();
1978            for ob in order_by {
1979                // note: if not specified 'NULLS FIRST' is default for DESC, 'NULLS LAST' otherwise
1980                // https://www.postgresql.org/docs/current/queries-order.html
1981                let desc_order = !ob.options.asc.unwrap_or(true);
1982                nulls_last.push(!ob.options.nulls_first.unwrap_or(desc_order));
1983                descending.push(desc_order);
1984
1985                // translate order expression, allowing ordinal values
1986                by.push(self.expr_or_ordinal(
1987                    &ob.expr,
1988                    columns,
1989                    selected,
1990                    Some(&schema),
1991                    "ORDER BY",
1992                )?)
1993            }
1994        }
1995        Ok(lf.sort_by_exprs(
1996            &by,
1997            SortMultipleOptions::default()
1998                .with_order_descending_multi(descending)
1999                .with_nulls_last_multi(nulls_last),
2000        ))
2001    }
2002
2003    fn process_group_by(
2004        &mut self,
2005        mut lf: LazyFrame,
2006        group_by_keys: &[Expr],
2007        projections: &[Expr],
2008        having: Option<Expr>,
2009    ) -> PolarsResult<LazyFrame> {
2010        let schema_before = self.get_frame_schema(&mut lf)?;
2011        let group_by_keys_schema =
2012            expressions_to_schema(group_by_keys, &schema_before, |duplicate_name: &str| {
2013                format!("group_by keys contained duplicate output name '{duplicate_name}'")
2014            })?;
2015
2016        // Note: remove the `group_by` keys as Polars adds those implicitly.
2017        let mut aliased_aggregations: PlHashMap<PlSmallStr, PlSmallStr> = PlHashMap::new();
2018        let mut aggregation_projection = Vec::with_capacity(projections.len());
2019        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
2020        let mut projection_aliases = PlHashSet::new();
2021        let mut group_key_aliases = PlHashSet::new();
2022
2023        // Pre-compute group key data (stripped expression + output name) to avoid repeated work.
2024        // We check both expression AND output name match to avoid cross-aliasing issues.
2025        let group_key_data: Vec<_> = group_by_keys
2026            .iter()
2027            .map(|gk| {
2028                (
2029                    strip_outer_alias(gk),
2030                    gk.to_field(&schema_before).ok().map(|f| f.name),
2031                )
2032            })
2033            .collect();
2034
2035        let projection_matches_group_key: Vec<bool> = projections
2036            .iter()
2037            .map(|p| {
2038                let p_stripped = strip_outer_alias(p);
2039                let p_name = p.to_field(&schema_before).ok().map(|f| f.name);
2040                group_key_data
2041                    .iter()
2042                    .any(|(gk_stripped, gk_name)| *gk_stripped == p_stripped && *gk_name == p_name)
2043            })
2044            .collect();
2045
2046        for (e, &matches_group_key) in projections.iter().zip(&projection_matches_group_key) {
2047            // `Len` represents COUNT(*) so we treat as an aggregation here.
2048            let is_non_group_key_expr = !matches_group_key
2049                && has_expr(e, |e| {
2050                    match e {
2051                        Expr::Agg(_) | Expr::Len | Expr::Over { .. } => true,
2052                        #[cfg(feature = "dynamic_group_by")]
2053                        Expr::Rolling { .. } => true,
2054                        Expr::AnonymousFunction { options, .. } => options.returns_scalar(),
2055                        Expr::Function { function: func, .. }
2056                            if !matches!(func, FunctionExpr::StructExpr(_)) =>
2057                        {
2058                            // If it's a function call containing a column NOT in the group by keys,
2059                            // we treat it as an aggregation.
2060                            has_expr(e, |e| match e {
2061                                Expr::Column(name) => !group_by_keys_schema.contains(name),
2062                                _ => false,
2063                            })
2064                        },
2065                        _ => false,
2066                    }
2067                });
2068
2069            // Note: if simple aliased expression we defer aliasing until after the group_by.
2070            // Use `e_inner` to track the potentially unwrapped expression for field lookup.
2071            let mut e_inner = e;
2072            if let Expr::Alias(expr, alias) = e {
2073                if e.clone().meta().is_simple_projection(Some(&schema_before)) {
2074                    group_key_aliases.insert(alias.as_ref());
2075                    e_inner = expr
2076                } else if let Expr::Function {
2077                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
2078                    ..
2079                } = expr.deref()
2080                {
2081                    projection_overrides
2082                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
2083                } else if !is_non_group_key_expr && !group_by_keys_schema.contains(alias) {
2084                    projection_aliases.insert(alias.as_ref());
2085                }
2086            }
2087            let field = e_inner.to_field(&schema_before)?;
2088            if is_non_group_key_expr {
2089                let mut e = e.clone();
2090                if let Expr::Agg(AggExpr::Implode {
2091                    input: expr,
2092                    maintain_order: _,
2093                }) = &e
2094                {
2095                    e = (**expr).clone();
2096                } else if let Expr::Alias(expr, name) = &e {
2097                    if let Expr::Agg(AggExpr::Implode {
2098                        input: expr,
2099                        maintain_order: _,
2100                    }) = expr.as_ref()
2101                    {
2102                        e = (**expr).clone().alias(name.clone());
2103                    }
2104                }
2105                // If aggregation colname conflicts with a group key,
2106                // alias it to avoid duplicate/mis-tracked columns
2107                if group_by_keys_schema.get(&field.name).is_some() {
2108                    let alias_name = format!("__POLARS_AGG_{}", field.name);
2109                    e = e.alias(alias_name.as_str());
2110                    aliased_aggregations.insert(field.name.clone(), alias_name.as_str().into());
2111                }
2112                aggregation_projection.push(e);
2113            } else if !matches_group_key {
2114                // Non-aggregated columns must be part of the GROUP BY clause
2115                if let Expr::Column(_)
2116                | Expr::Function {
2117                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
2118                    ..
2119                } = e_inner
2120                {
2121                    if !group_by_keys_schema.contains(&field.name) {
2122                        polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
2123                    }
2124                }
2125            }
2126        }
2127
2128        // Process HAVING clause: identify aggregate expressions, reusing those already
2129        // in projections, or compute as temporary columns and then post-filter/discard
2130        let having_filter = if let Some(having_expr) = having {
2131            let mut agg_to_name: Vec<(Expr, PlSmallStr)> = aggregation_projection
2132                .iter()
2133                .filter_map(|p| match p {
2134                    Expr::Alias(inner, name) if matches!(**inner, Expr::Agg(_) | Expr::Len) => {
2135                        Some((inner.as_ref().clone(), name.clone()))
2136                    },
2137                    e @ (Expr::Agg(_) | Expr::Len) => Some((
2138                        e.clone(),
2139                        e.to_field(&schema_before)
2140                            .map(|f| f.name)
2141                            .unwrap_or_default(),
2142                    )),
2143                    _ => None,
2144                })
2145                .collect();
2146
2147            let mut n_having_aggs = 0;
2148            let updated_having = having_expr.map_expr(|e| {
2149                if !matches!(&e, Expr::Agg(_) | Expr::Len) {
2150                    return e;
2151                }
2152                let name = agg_to_name
2153                    .iter()
2154                    .find_map(|(expr, n)| (*expr == e).then(|| n.clone()))
2155                    .unwrap_or_else(|| {
2156                        let n = format_pl_smallstr!("__POLARS_HAVING_{n_having_aggs}");
2157                        aggregation_projection.push(e.clone().alias(n.clone()));
2158                        agg_to_name.push((e.clone(), n.clone()));
2159                        n_having_aggs += 1;
2160                        n
2161                    });
2162                col(name)
2163            });
2164            Some(updated_having)
2165        } else {
2166            None
2167        };
2168
2169        // Apply HAVING filter after aggregation
2170        let mut aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
2171        if let Some(filter_expr) = having_filter {
2172            aggregated = aggregated.filter(filter_expr);
2173        }
2174
2175        let projection_schema =
2176            expressions_to_schema(projections, &schema_before, |duplicate_name: &str| {
2177                format!("group_by aggregations contained duplicate output name '{duplicate_name}'")
2178            })?;
2179
2180        // A final projection to get the proper order and any deferred transforms/aliases
2181        // (will also drop any temporary columns created for the HAVING post-filter).
2182        let final_projection = projection_schema
2183            .iter_names()
2184            .zip(projections.iter().zip(&projection_matches_group_key))
2185            .map(|(name, (projection_expr, &matches_group_key))| {
2186                if let Some(expr) = projection_overrides.get(name.as_str()) {
2187                    expr.clone()
2188                } else if let Some(aliased_name) = aliased_aggregations.get(name) {
2189                    col(aliased_name.clone()).alias(name.clone())
2190                } else if group_by_keys_schema.get(name).is_some() && matches_group_key {
2191                    col(name.clone())
2192                } else if group_by_keys_schema.get(name).is_some()
2193                    || projection_aliases.contains(name.as_str())
2194                    || group_key_aliases.contains(name.as_str())
2195                {
2196                    if has_expr(projection_expr, |e| {
2197                        matches!(e, Expr::Agg(_) | Expr::Len | Expr::Over { .. })
2198                    }) {
2199                        col(name.clone())
2200                    } else {
2201                        projection_expr.clone()
2202                    }
2203                } else {
2204                    col(name.clone())
2205                }
2206            })
2207            .collect::<Vec<_>>();
2208
2209        // Include original GROUP BY columns for ORDER BY access (if aliased).
2210        let mut output_projection = final_projection;
2211        for key_name in group_by_keys_schema.iter_names() {
2212            if !projection_schema.contains(key_name) {
2213                // Original col name not in output - add for ORDER BY access
2214                output_projection.push(col(key_name.clone()));
2215            } else if group_by_keys.iter().any(|k| is_simple_col_ref(k, key_name)) {
2216                // Original col name in output - check if cross-aliased
2217                let is_cross_aliased = projections.iter().any(|p| {
2218                    p.to_field(&schema_before).is_ok_and(|f| f.name == key_name)
2219                        && !is_simple_col_ref(p, key_name)
2220                });
2221                if is_cross_aliased {
2222                    // Add original name under a prefixed alias for subsequent ORDER BY resolution
2223                    let internal_name = format_pl_smallstr!("__POLARS_ORIG_{}", key_name);
2224                    output_projection.push(col(key_name.clone()).alias(internal_name));
2225                }
2226            }
2227        }
2228        Ok(aggregated.select(&output_projection))
2229    }
2230
2231    fn process_limit_offset(
2232        &self,
2233        lf: LazyFrame,
2234        limit_clause: &Option<LimitClause>,
2235        fetch: &Option<Fetch>,
2236    ) -> PolarsResult<LazyFrame> {
2237        // Extract limit and offset from LimitClause
2238        let (limit, offset) = match limit_clause {
2239            Some(LimitClause::LimitOffset {
2240                limit,
2241                offset,
2242                limit_by,
2243            }) => {
2244                if !limit_by.is_empty() {
2245                    // TODO: might be able to support as an aggregate `top_k_by` operation?
2246                    //  (https://clickhouse.com/docs/sql-reference/statements/select/limit-by)
2247                    polars_bail!(SQLSyntax: "`LIMIT <n> BY <exprs>` clause is not supported");
2248                }
2249                (limit.as_ref(), offset.as_ref().map(|o| &o.value))
2250            },
2251            Some(LimitClause::OffsetCommaLimit { offset, limit }) => (Some(limit), Some(offset)),
2252            None => (None, None),
2253        };
2254
2255        // Handle FETCH clause (alternative to LIMIT, mutually exclusive)
2256        let limit = match (fetch, limit) {
2257            (Some(fetch), None) => fetch.quantity.as_ref(),
2258            (Some(_), Some(_)) => {
2259                polars_bail!(SQLSyntax: "cannot use both `LIMIT` and `FETCH` in the same query")
2260            },
2261            (None, limit) => limit,
2262        };
2263
2264        // Apply limit and/or offset
2265        match (offset, limit) {
2266            (
2267                Some(SQLExpr::Value(ValueWithSpan {
2268                    value: SQLValue::Number(offset, _),
2269                    ..
2270                })),
2271                Some(SQLExpr::Value(ValueWithSpan {
2272                    value: SQLValue::Number(limit, _),
2273                    ..
2274                })),
2275            ) => Ok(lf.slice(
2276                offset
2277                    .parse()
2278                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2279                limit.parse().map_err(
2280                    |e| polars_err!(SQLInterface: "LIMIT/FETCH conversion error: {}", e),
2281                )?,
2282            )),
2283            (
2284                Some(SQLExpr::Value(ValueWithSpan {
2285                    value: SQLValue::Number(offset, _),
2286                    ..
2287                })),
2288                None,
2289            ) => Ok(lf.slice(
2290                offset
2291                    .parse()
2292                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2293                IdxSize::MAX,
2294            )),
2295            (
2296                None,
2297                Some(SQLExpr::Value(ValueWithSpan {
2298                    value: SQLValue::Number(limit, _),
2299                    ..
2300                })),
2301            ) => {
2302                Ok(lf.limit(limit.parse().map_err(
2303                    |e| polars_err!(SQLInterface: "LIMIT/FETCH conversion error: {}", e),
2304                )?))
2305            },
2306            (None, None) => Ok(lf),
2307            _ => polars_bail!(
2308                SQLSyntax: "non-numeric arguments for LIMIT/OFFSET/FETCH are not supported",
2309            ),
2310        }
2311    }
2312
2313    fn process_qualified_wildcard(
2314        &mut self,
2315        ObjectName(idents): &ObjectName,
2316        options: &WildcardAdditionalOptions,
2317        modifiers: &mut SelectModifiers,
2318        schema: Option<&Schema>,
2319    ) -> PolarsResult<Vec<Expr>> {
2320        let mut idents_with_wildcard: Vec<Ident> = idents
2321            .iter()
2322            .filter_map(|p| p.as_ident().cloned())
2323            .collect();
2324        idents_with_wildcard.push(Ident::new("*"));
2325
2326        let exprs = resolve_compound_identifier(self, &idents_with_wildcard, schema)?;
2327        self.process_wildcard_additional_options(exprs, options, modifiers, schema)
2328    }
2329
2330    fn process_wildcard_additional_options(
2331        &mut self,
2332        exprs: Vec<Expr>,
2333        options: &WildcardAdditionalOptions,
2334        modifiers: &mut SelectModifiers,
2335        schema: Option<&Schema>,
2336    ) -> PolarsResult<Vec<Expr>> {
2337        if options.opt_except.is_some() && options.opt_exclude.is_some() {
2338            polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
2339        } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
2340            polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
2341        }
2342
2343        // SELECT * EXCLUDE
2344        if let Some(items) = &options.opt_exclude {
2345            match items {
2346                ExcludeSelectItem::Single(ident) => {
2347                    modifiers.exclude.insert(ident.value.clone());
2348                },
2349                ExcludeSelectItem::Multiple(idents) => {
2350                    modifiers
2351                        .exclude
2352                        .extend(idents.iter().map(|i| i.value.clone()));
2353                },
2354            };
2355        }
2356
2357        // SELECT * EXCEPT
2358        if let Some(items) = &options.opt_except {
2359            modifiers.exclude.insert(items.first_element.value.clone());
2360            modifiers
2361                .exclude
2362                .extend(items.additional_elements.iter().map(|i| i.value.clone()));
2363        }
2364
2365        // SELECT * ILIKE
2366        if let Some(item) = &options.opt_ilike {
2367            let rx = regex::escape(item.pattern.as_str())
2368                .replace('%', ".*")
2369                .replace('_', ".");
2370
2371            modifiers.ilike = Some(
2372                polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
2373            );
2374        }
2375
2376        // SELECT * RENAME
2377        if let Some(items) = &options.opt_rename {
2378            let renames = match items {
2379                RenameSelectItem::Single(rename) => std::slice::from_ref(rename),
2380                RenameSelectItem::Multiple(renames) => renames.as_slice(),
2381            };
2382            for rn in renames {
2383                let before = PlSmallStr::from_str(rn.ident.value.as_str());
2384                let after = PlSmallStr::from_str(rn.alias.value.as_str());
2385                if before != after {
2386                    modifiers.rename.insert(before, after);
2387                }
2388            }
2389        }
2390
2391        // SELECT * REPLACE
2392        if let Some(replacements) = &options.opt_replace {
2393            for rp in &replacements.items {
2394                let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
2395                modifiers
2396                    .replace
2397                    .push(replacement_expr?.alias(rp.column_name.value.as_str()));
2398            }
2399        }
2400        Ok(exprs)
2401    }
2402
2403    fn rename_columns_from_table_alias(
2404        &mut self,
2405        mut lf: LazyFrame,
2406        alias: &TableAlias,
2407    ) -> PolarsResult<LazyFrame> {
2408        if alias.columns.is_empty() {
2409            Ok(lf)
2410        } else {
2411            let schema = self.get_frame_schema(&mut lf)?;
2412            if alias.columns.len() != schema.len() {
2413                polars_bail!(
2414                    SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
2415                    alias.columns.len(), alias.name.value, schema.len()
2416                )
2417            } else {
2418                let existing_columns: Vec<_> = schema.iter_names().collect();
2419                let new_columns: Vec<_> =
2420                    alias.columns.iter().map(|c| c.name.value.clone()).collect();
2421                Ok(lf.rename(existing_columns, new_columns, true))
2422            }
2423        }
2424    }
2425}
2426
2427impl SQLContext {
2428    /// Create a new SQLContext from a table map. For internal use only
2429    pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
2430        Self {
2431            table_map: Arc::new(RwLock::new(table_map)),
2432            ..Default::default()
2433        }
2434    }
2435}
2436
2437fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
2438    match expr {
2439        Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
2440            let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
2441            schema
2442                .iter_names()
2443                .filter(|name| re.is_match(name))
2444                .map(|name| col(name.clone()))
2445                .collect::<Vec<_>>()
2446        },
2447        Expr::Selector(s) => s
2448            .into_columns(schema, &Default::default())
2449            .unwrap()
2450            .into_iter()
2451            .map(col)
2452            .collect::<Vec<_>>(),
2453        _ => vec![expr],
2454    }
2455}
2456
/// Check whether a column name is written as an anchored pattern,
/// i.e. it begins with `^` and ends with `$`.
fn is_regex_colname(nm: &str) -> bool {
    matches!(nm.strip_prefix('^'), Some(rest) if rest.ends_with('$'))
}
2460
2461/// Extract column names from a USING clause in a JoinOperator (if present).
2462fn get_using_cols(op: &JoinOperator) -> Option<impl Iterator<Item = String> + '_> {
2463    use JoinOperator::*;
2464    match op {
2465        Join(JoinConstraint::Using(cols))
2466        | Inner(JoinConstraint::Using(cols))
2467        | Left(JoinConstraint::Using(cols))
2468        | LeftOuter(JoinConstraint::Using(cols))
2469        | Right(JoinConstraint::Using(cols))
2470        | RightOuter(JoinConstraint::Using(cols))
2471        | FullOuter(JoinConstraint::Using(cols))
2472        | Semi(JoinConstraint::Using(cols))
2473        | Anti(JoinConstraint::Using(cols))
2474        | LeftSemi(JoinConstraint::Using(cols))
2475        | LeftAnti(JoinConstraint::Using(cols))
2476        | RightSemi(JoinConstraint::Using(cols))
2477        | RightAnti(JoinConstraint::Using(cols)) => Some(cols.iter().filter_map(|c| {
2478            c.0.first()
2479                .and_then(|p| p.as_ident())
2480                .map(|i| i.value.clone())
2481        })),
2482        _ => None,
2483    }
2484}
2485
2486/// Extract the table name (or alias) from a TableFactor.
2487fn get_table_name(factor: &TableFactor) -> Option<String> {
2488    match factor {
2489        TableFactor::Table { name, alias, .. } => {
2490            alias.as_ref().map(|a| a.name.value.clone()).or_else(|| {
2491                name.0
2492                    .last()
2493                    .and_then(|p| p.as_ident())
2494                    .map(|i| i.value.clone())
2495            })
2496        },
2497        TableFactor::Derived { alias, .. }
2498        | TableFactor::NestedJoin { alias, .. }
2499        | TableFactor::TableFunction { alias, .. } => alias.as_ref().map(|a| a.name.value.clone()),
2500        _ => None,
2501    }
2502}
2503
2504/// Check if an expression is a simple column reference (with optional alias) to the given name.
2505fn is_simple_col_ref(expr: &Expr, col_name: &PlSmallStr) -> bool {
2506    match expr {
2507        Expr::Column(n) => n == col_name,
2508        Expr::Alias(inner, _) => matches!(inner.as_ref(), Expr::Column(n) if n == col_name),
2509        _ => false,
2510    }
2511}
2512
2513/// Strip the outer alias from an expression (if present) for expression equality comparison.
2514fn strip_outer_alias(expr: &Expr) -> Expr {
2515    if let Expr::Alias(inner, _) = expr {
2516        inner.as_ref().clone()
2517    } else {
2518        expr.clone()
2519    }
2520}
2521
2522/// Resolve a SELECT alias to its underlying expression (for use in GROUP BY).
2523///
2524/// Returns the expression WITH alias if the name matches a projection alias and is NOT a column
2525/// that exists in the schema; otherwise returns `None` to use the default/standard resolution.
2526fn resolve_select_alias(name: &str, projections: &[Expr], schema: &Schema) -> Option<Expr> {
2527    // Original columns take precedence over SELECT aliases
2528    if schema.contains(name) {
2529        return None;
2530    }
2531    // Find a projection with this alias and return its expression (preserving the alias)
2532    projections.iter().find_map(|p| match p {
2533        Expr::Alias(inner, alias) if alias.as_str() == name => {
2534            Some(inner.as_ref().clone().alias(alias.clone()))
2535        },
2536        _ => None,
2537    })
2538}
2539
2540/// Check if all columns referred to in a Polars expression exist in the given Schema.
2541fn expr_cols_all_in_schema(expr: &Expr, schema: &Schema) -> bool {
2542    let mut found_cols = false;
2543    let mut all_in_schema = true;
2544    for e in expr.into_iter() {
2545        if let Expr::Column(name) = e {
2546            found_cols = true;
2547            if !schema.contains(name.as_str()) {
2548                all_in_schema = false;
2549                break;
2550            }
2551        }
2552    }
2553    found_cols && all_in_schema
2554}
2555
2556/// Determine which parsed join expressions actually belong in `left_om` and which in `right_on`.
2557///
2558/// This needs to be handled carefully because in SQL joins you can write "join on" constraints
2559/// either way round, and in joins with more than two tables you can also join against an earlier
2560/// table (e.g.: you could be joining `df1` to `df2` to `df3`, but the final join condition where
2561/// we join `df2` to `df3` could refer to `df1.a = df3.b`; this takes a little more work to
2562/// resolve as our native `join` function operates on only two tables at a time.
2563fn determine_left_right_join_on(
2564    ctx: &mut SQLContext,
2565    expr_left: &SQLExpr,
2566    expr_right: &SQLExpr,
2567    tbl_left: &TableInfo,
2568    tbl_right: &TableInfo,
2569    join_schema: &Schema,
2570) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2571    // parse, removing any aliases that may have been added by `resolve_column`
2572    // (called inside `parse_sql_expr`) as we need the actual/underlying col
2573    let left_on = match parse_sql_expr(expr_left, ctx, Some(join_schema))? {
2574        Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2575        e => e,
2576    };
2577    let right_on = match parse_sql_expr(expr_right, ctx, Some(join_schema))? {
2578        Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2579        e => e,
2580    };
2581
2582    // ------------------------------------------------------------------
2583    // simple/typical case: can fully resolve SQL-level table references
2584    // ------------------------------------------------------------------
2585    let left_refs = (
2586        expr_refers_to_table(expr_left, &tbl_left.name),
2587        expr_refers_to_table(expr_left, &tbl_right.name),
2588    );
2589    let right_refs = (
2590        expr_refers_to_table(expr_right, &tbl_left.name),
2591        expr_refers_to_table(expr_right, &tbl_right.name),
2592    );
2593    // if the SQL-level references unambiguously indicate table ownership, we're done
2594    match (left_refs, right_refs) {
2595        // standard: left expr → left table, right expr → right table
2596        ((true, false), (false, true)) => return Ok((vec![left_on], vec![right_on])),
2597        // reversed: left expr → right table, right expr → left table
2598        ((false, true), (true, false)) => return Ok((vec![right_on], vec![left_on])),
2599        // unsupported: one side references *both* tables
2600        ((true, true), _) | (_, (true, true)) if tbl_left.name != tbl_right.name => {
2601            polars_bail!(
2602               SQLInterface: "unsupported join condition: {} side references both '{}' and '{}'",
2603               if left_refs.0 && left_refs.1 {
2604                    "left"
2605                } else {
2606                    "right"
2607                }, tbl_left.name, tbl_right.name
2608            )
2609        },
2610        // fall through to the more involved col/ref resolution
2611        _ => {},
2612    }
2613
2614    // ------------------------------------------------------------------
2615    // more involved: additionally employ schema-based column resolution
2616    // (applies to unqualified columns and/or chained joins)
2617    // ------------------------------------------------------------------
2618    let left_on_cols_in = (
2619        expr_cols_all_in_schema(&left_on, &tbl_left.schema),
2620        expr_cols_all_in_schema(&left_on, &tbl_right.schema),
2621    );
2622    let right_on_cols_in = (
2623        expr_cols_all_in_schema(&right_on, &tbl_left.schema),
2624        expr_cols_all_in_schema(&right_on, &tbl_right.schema),
2625    );
2626    match (left_on_cols_in, right_on_cols_in) {
2627        // each expression's columns exist in exactly one schema
2628        ((true, false), (false, true)) => Ok((vec![left_on], vec![right_on])),
2629        ((false, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
2630        // one expression in both, other only in one; prefer the unique one
2631        ((true, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
2632        ((true, true), (false, true)) => Ok((vec![left_on], vec![right_on])),
2633        ((true, false), (true, true)) => Ok((vec![left_on], vec![right_on])),
2634        ((false, true), (true, true)) => Ok((vec![right_on], vec![left_on])),
2635        // pass through as-is
2636        _ => Ok((vec![left_on], vec![right_on])),
2637    }
2638}
2639
2640fn process_join_on(
2641    ctx: &mut SQLContext,
2642    sql_expr: &SQLExpr,
2643    tbl_left: &TableInfo,
2644    tbl_right: &TableInfo,
2645) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2646    match sql_expr {
2647        SQLExpr::BinaryOp { left, op, right } => match op {
2648            SQLBinaryOperator::And => {
2649                let (mut left_i, mut right_i) = process_join_on(ctx, left, tbl_left, tbl_right)?;
2650                let (mut left_j, mut right_j) = process_join_on(ctx, right, tbl_left, tbl_right)?;
2651                left_i.append(&mut left_j);
2652                right_i.append(&mut right_j);
2653                Ok((left_i, right_i))
2654            },
2655            SQLBinaryOperator::Eq => {
2656                // establish unified schema with cols from both tables; needed for multi/chained
2657                // joins where suffixed intermediary/joined cols aren't in an existing schema.
2658                let mut join_schema =
2659                    Schema::with_capacity(tbl_left.schema.len() + tbl_right.schema.len());
2660                for (name, dtype) in tbl_left.schema.iter() {
2661                    join_schema.insert_at_index(join_schema.len(), name.clone(), dtype.clone())?;
2662                }
2663                for (name, dtype) in tbl_right.schema.iter() {
2664                    if !join_schema.contains(name) {
2665                        join_schema.insert_at_index(
2666                            join_schema.len(),
2667                            name.clone(),
2668                            dtype.clone(),
2669                        )?;
2670                    }
2671                }
2672                determine_left_right_join_on(ctx, left, right, tbl_left, tbl_right, &join_schema)
2673            },
2674            _ => polars_bail!(
2675                // TODO: should be able to support more operators later (via `join_where`?)
2676                SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op
2677            ),
2678        },
2679        SQLExpr::Nested(expr) => process_join_on(ctx, expr, tbl_left, tbl_right),
2680        _ => polars_bail!(
2681            SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", sql_expr
2682        ),
2683    }
2684}
2685
2686fn process_join_constraint(
2687    constraint: &JoinConstraint,
2688    tbl_left: &TableInfo,
2689    tbl_right: &TableInfo,
2690    ctx: &mut SQLContext,
2691) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2692    match constraint {
2693        JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
2694            process_join_on(ctx, expr, tbl_left, tbl_right)
2695        },
2696        JoinConstraint::Using(idents) if !idents.is_empty() => {
2697            let using: Vec<Expr> = idents
2698                .iter()
2699                .map(|ObjectName(parts)| {
2700                    if parts.len() != 1 {
2701                        polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects simple column names, not qualified names");
2702                    }
2703                    match parts[0].as_ident() {
2704                        Some(ident) => Ok(col(ident.value.as_str())),
2705                        None => polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects identifiers, not functions"),
2706                    }
2707                })
2708                .collect::<PolarsResult<Vec<_>>>()?;
2709            Ok((using.clone(), using))
2710        },
2711        JoinConstraint::Natural => {
2712            let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
2713            let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
2714            let on: Vec<Expr> = left_names
2715                .intersection(&right_names)
2716                .map(|&name| col(name.clone()))
2717                .collect();
2718            if on.is_empty() {
2719                polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
2720            }
2721            Ok((on.clone(), on))
2722        },
2723        _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
2724    }
2725}
2726
2727/// Extract table identifiers referenced in a SQL query; uses a visitor to
2728/// collect all table names that appear in FROM clauses, JOINs, TABLE refs
2729/// in set operations, and subqueries.
2730pub fn extract_table_identifiers(
2731    query: &str,
2732    include_schema: bool,
2733    unique: bool,
2734) -> PolarsResult<Vec<String>> {
2735    let mut parser = Parser::new(&GenericDialect);
2736    parser = parser.with_options(ParserOptions {
2737        trailing_commas: true,
2738        ..Default::default()
2739    });
2740    let ast = parser
2741        .try_with_sql(query)
2742        .map_err(to_sql_interface_err)?
2743        .parse_statements()
2744        .map_err(to_sql_interface_err)?;
2745
2746    let mut collector = TableIdentifierCollector {
2747        include_schema,
2748        ..Default::default()
2749    };
2750    for stmt in &ast {
2751        let _ = stmt.visit(&mut collector);
2752    }
2753    Ok(if unique {
2754        collector
2755            .tables
2756            .into_iter()
2757            .collect::<PlIndexSet<_>>()
2758            .into_iter()
2759            .collect()
2760    } else {
2761        collector.tables
2762    })
2763}
2764
2765bitflags::bitflags! {
2766    /// Bitfield indicating whether there exists a projection with the specified height behavior.
2767    ///
2768    /// Used to help determine whether to execute projections in `select()` or `with_columns()`
2769    /// context.
2770    #[derive(PartialEq)]
2771    struct ExprSqlProjectionHeightBehavior: u8 {
2772        /// Maintains the height of input column(s)
2773        const MaintainsColumn = 1 << 0;
2774        /// Height is independent of input, e.g.:
2775        /// * expressions that change length: e.g. slice, explode, filter, gather etc.
2776        /// * aggregations: count(*), first(), sum() etc.
2777        const Independent = 1 << 1;
2778        /// "Inherits" the height of the context, e.g.:
2779        /// * Scalar literals
2780        const InheritsContext = 1 << 2;
2781    }
2782}
2783
2784impl ExprSqlProjectionHeightBehavior {
2785    fn identify_from_expr(expr: &Expr) -> Self {
2786        let mut has_column = false;
2787        let mut has_independent = false;
2788
2789        for e in expr.into_iter() {
2790            use Expr::*;
2791            has_column |= matches!(e, Column(_) | Selector(_));
2792            has_independent |= match e {
2793                // @TODO: This is broken now with functions.
2794                AnonymousFunction { options, .. } => {
2795                    options.returns_scalar() || !options.is_length_preserving()
2796                },
2797                Literal(v) => !v.is_scalar(),
2798                Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
2799                Agg { .. } | Len => true,
2800                _ => false,
2801            }
2802        }
2803        if has_independent {
2804            Self::Independent
2805        } else if has_column {
2806            Self::MaintainsColumn
2807        } else {
2808            Self::InheritsContext
2809        }
2810    }
2811}