polars_sql/
context.rs

1use std::cell::RefCell;
2use std::ops::Deref;
3
4use polars_core::frame::row::Row;
5use polars_core::prelude::*;
6use polars_lazy::prelude::*;
7use polars_ops::frame::JoinCoalesce;
8use polars_plan::dsl::function_expr::StructFunction;
9use polars_plan::prelude::*;
10use polars_utils::format_pl_smallstr;
11use sqlparser::ast::{
12    BinaryOperator, CreateTable, Delete, Distinct, ExcludeSelectItem, Expr as SQLExpr, FromTable,
13    FunctionArg, GroupByExpr, Ident, JoinConstraint, JoinOperator, ObjectName, ObjectType, Offset,
14    OrderBy, Query, RenameSelectItem, Select, SelectItem, SetExpr, SetOperator, SetQuantifier,
15    Statement, TableAlias, TableFactor, TableWithJoins, UnaryOperator, Value as SQLValue, Values,
16    WildcardAdditionalOptions,
17};
18use sqlparser::dialect::GenericDialect;
19use sqlparser::parser::{Parser, ParserOptions};
20
21use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
22use crate::sql_expr::{
23    parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
24};
25use crate::table_functions::PolarsTableFunctions;
26
/// A table taking part in a query: its lazy frame bundled with the name it
/// is referred to by and its schema.
#[derive(Clone)]
pub struct TableInfo {
    /// The lazy frame holding the table's data/plan.
    pub(crate) frame: LazyFrame,
    /// The name of the table.
    pub(crate) name: PlSmallStr,
    /// The schema associated with the table.
    pub(crate) schema: Arc<Schema>,
}
33
/// Collected state for the optional modifiers that can follow a `SELECT *`
/// wildcard (one field per modifier).
struct SelectModifiers {
    exclude: PlHashSet<String>,                // SELECT * EXCLUDE
    ilike: Option<regex::Regex>,               // SELECT * ILIKE
    rename: PlHashMap<PlSmallStr, PlSmallStr>, // SELECT * RENAME
    replace: Vec<Expr>,                        // SELECT * REPLACE
}
40impl SelectModifiers {
41    fn matches_ilike(&self, s: &str) -> bool {
42        match &self.ilike {
43            Some(rx) => rx.is_match(s),
44            None => true,
45        }
46    }
47    fn renamed_cols(&self) -> Vec<Expr> {
48        self.rename
49            .iter()
50            .map(|(before, after)| col(before.clone()).alias(after.clone()))
51            .collect()
52    }
53}
54
/// The SQLContext is the main entry point for executing SQL queries.
#[derive(Clone)]
pub struct SQLContext {
    /// Registered tables, keyed by table name.
    pub(crate) table_map: PlHashMap<String, LazyFrame>,
    /// Registry used to look up (custom) SQL functions.
    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
    /// Arena for logical-plan nodes; passed along when resolving frame schemas.
    pub(crate) lp_arena: Arena<IR>,
    /// Arena for expression nodes; passed along when resolving frame schemas.
    pub(crate) expr_arena: Arena<AExpr>,

    // Statement-level state: `execute` clears these three maps once a
    // statement has run.
    /// Common table expressions of the current statement, keyed by CTE name.
    cte_map: RefCell<PlHashMap<String, LazyFrame>>,
    /// Maps a table alias to the name of the registered table it refers to.
    table_aliases: RefCell<PlHashMap<String, String>>,
    /// For each joined (right-hand) table name: original column name mapped
    /// to the aliased column name it received in the joined frame.
    joined_aliases: RefCell<PlHashMap<String, PlHashMap<String, String>>>,
}
67
68impl Default for SQLContext {
69    fn default() -> Self {
70        Self {
71            function_registry: Arc::new(DefaultFunctionRegistry {}),
72            table_map: Default::default(),
73            cte_map: Default::default(),
74            table_aliases: Default::default(),
75            joined_aliases: Default::default(),
76            lp_arena: Default::default(),
77            expr_arena: Default::default(),
78        }
79    }
80}
81
82impl SQLContext {
83    /// Create a new SQLContext.
84    /// ```rust
85    /// # use polars_sql::SQLContext;
86    /// # fn main() {
87    /// let ctx = SQLContext::new();
88    /// # }
89    /// ```
90    pub fn new() -> Self {
91        Self::default()
92    }
93
94    /// Get the names of all registered tables, in sorted order.
95    pub fn get_tables(&self) -> Vec<String> {
96        let mut tables = Vec::from_iter(self.table_map.keys().cloned());
97        tables.sort_unstable();
98        tables
99    }
100
101    /// Register a [`LazyFrame`] as a table in the SQLContext.
102    /// ```rust
103    /// # use polars_sql::SQLContext;
104    /// # use polars_core::prelude::*;
105    /// # use polars_lazy::prelude::*;
106    /// # fn main() {
107    ///
108    /// let mut ctx = SQLContext::new();
109    /// let df = df! {
110    ///    "a" =>  [1, 2, 3],
111    /// }.unwrap().lazy();
112    ///
113    /// ctx.register("df", df);
114    /// # }
115    ///```
116    pub fn register(&mut self, name: &str, lf: LazyFrame) {
117        self.table_map.insert(name.to_owned(), lf);
118    }
119
120    /// Unregister a [`LazyFrame`] table from the [`SQLContext`].
121    pub fn unregister(&mut self, name: &str) {
122        self.table_map.remove(&name.to_owned());
123    }
124
125    /// Execute a SQL query, returning a [`LazyFrame`].
126    /// ```rust
127    /// # use polars_sql::SQLContext;
128    /// # use polars_core::prelude::*;
129    /// # use polars_lazy::prelude::*;
130    /// # fn main() {
131    ///
132    /// let mut ctx = SQLContext::new();
133    /// let df = df! {
134    ///    "a" =>  [1, 2, 3],
135    /// }
136    /// .unwrap();
137    ///
138    /// ctx.register("df", df.clone().lazy());
139    /// let sql_df = ctx.execute("SELECT * FROM df").unwrap().collect().unwrap();
140    /// assert!(sql_df.equals(&df));
141    /// # }
142    ///```
143    pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
144        let mut parser = Parser::new(&GenericDialect);
145        parser = parser.with_options(ParserOptions {
146            trailing_commas: true,
147            ..Default::default()
148        });
149
150        let ast = parser
151            .try_with_sql(query)
152            .map_err(to_sql_interface_err)?
153            .parse_statements()
154            .map_err(to_sql_interface_err)?;
155
156        polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
157        let res = self.execute_statement(ast.first().unwrap())?;
158
159        // Ensure the result uses the proper arenas.
160        // This will instantiate new arenas with a new version.
161        let lp_arena = std::mem::take(&mut self.lp_arena);
162        let expr_arena = std::mem::take(&mut self.expr_arena);
163        res.set_cached_arena(lp_arena, expr_arena);
164
165        // Every execution should clear the statement-level maps.
166        self.cte_map.borrow_mut().clear();
167        self.table_aliases.borrow_mut().clear();
168        self.joined_aliases.borrow_mut().clear();
169
170        Ok(res)
171    }
172
173    /// add a function registry to the SQLContext
174    /// the registry provides the ability to add custom functions to the SQLContext
175    pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
176        self.function_registry = function_registry;
177        self
178    }
179
180    /// Get the function registry of the SQLContext
181    pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
182        &self.function_registry
183    }
184
185    /// Get a mutable reference to the function registry of the SQLContext
186    pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
187        Arc::get_mut(&mut self.function_registry).unwrap()
188    }
189}
190
191impl SQLContext {
192    pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
193        let ast = stmt;
194        Ok(match ast {
195            Statement::Query(query) => self.execute_query(query)?,
196            stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
197            stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
198            stmt @ Statement::Drop {
199                object_type: ObjectType::Table,
200                ..
201            } => self.execute_drop_table(stmt)?,
202            stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
203            stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
204            stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
205            _ => polars_bail!(
206                SQLInterface: "statement type is not supported:\n{:?}", ast,
207            ),
208        })
209    }
210
211    pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
212        self.register_ctes(query)?;
213        self.execute_query_no_ctes(query)
214    }
215
216    pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
217        let lf = self.process_query(&query.body, query)?;
218        self.process_limit_offset(lf, &query.limit, &query.offset)
219    }
220
221    pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
222        frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
223    }
224
225    pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
226        let table = self.table_map.get(name).cloned();
227        table
228            .or_else(|| self.cte_map.borrow().get(name).cloned())
229            .or_else(|| {
230                self.table_aliases
231                    .borrow()
232                    .get(name)
233                    .and_then(|alias| self.table_map.get(alias).cloned())
234            })
235    }
236
237    fn expr_or_ordinal(
238        &mut self,
239        e: &SQLExpr,
240        exprs: &[Expr],
241        selected: Option<&[Expr]>,
242        schema: Option<&Schema>,
243        clause: &str,
244    ) -> PolarsResult<Expr> {
245        match e {
246            SQLExpr::UnaryOp {
247                op: UnaryOperator::Minus,
248                expr,
249            } if matches!(**expr, SQLExpr::Value(SQLValue::Number(_, _))) => {
250                if let SQLExpr::Value(SQLValue::Number(ref idx, _)) = **expr {
251                    Err(polars_err!(
252                    SQLSyntax:
253                    "negative ordinal values are invalid for {}; found -{}",
254                    clause,
255                    idx
256                    ))
257                } else {
258                    unreachable!()
259                }
260            },
261            SQLExpr::Value(SQLValue::Number(idx, _)) => {
262                // note: sql queries are 1-indexed
263                let idx = idx.parse::<usize>().map_err(|_| {
264                    polars_err!(
265                        SQLSyntax:
266                        "negative ordinal values are invalid for {}; found {}",
267                        clause,
268                        idx
269                    )
270                })?;
271                // note: "selected" cols represent final projection order, so we use those for
272                // ordinal resolution. "exprs" may include cols that are subsequently dropped.
273                let cols = if let Some(cols) = selected {
274                    cols
275                } else {
276                    exprs
277                };
278                Ok(cols
279                    .get(idx - 1)
280                    .ok_or_else(|| {
281                        polars_err!(
282                            SQLInterface:
283                            "{} ordinal value must refer to a valid column; found {}",
284                            clause,
285                            idx
286                        )
287                    })?
288                    .clone())
289            },
290            SQLExpr::Value(v) => Err(polars_err!(
291                SQLSyntax:
292                "{} requires a valid expression or positive ordinal; found {}", clause, v,
293            )),
294            _ => parse_sql_expr(e, self, schema),
295        }
296    }
297
298    pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
299        if self.joined_aliases.borrow().contains_key(tbl_name) {
300            self.joined_aliases
301                .borrow()
302                .get(tbl_name)
303                .and_then(|aliases| aliases.get(column_name))
304                .cloned()
305                .unwrap_or_else(|| column_name.to_string())
306        } else {
307            column_name.to_string()
308        }
309    }
310
311    fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
312        match expr {
313            SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
314            SetExpr::Query(query) => self.execute_query_no_ctes(query),
315            SetExpr::SetOperation {
316                op: SetOperator::Union,
317                set_quantifier,
318                left,
319                right,
320            } => self.process_union(left, right, set_quantifier, query),
321
322            #[cfg(feature = "semi_anti_join")]
323            SetExpr::SetOperation {
324                op: SetOperator::Intersect | SetOperator::Except,
325                set_quantifier,
326                left,
327                right,
328            } => self.process_except_intersect(left, right, set_quantifier, query),
329
330            SetExpr::Values(Values {
331                explicit_row: _,
332                rows,
333            }) => self.process_values(rows),
334
335            SetExpr::Table(tbl) => {
336                if tbl.table_name.is_some() {
337                    let table_name = tbl.table_name.as_ref().unwrap();
338                    self.get_table_from_current_scope(table_name)
339                        .ok_or_else(|| {
340                            polars_err!(
341                                SQLInterface: "no table or alias named '{}' found",
342                                tbl
343                            )
344                        })
345                } else {
346                    polars_bail!(SQLInterface: "'TABLE' requires valid table name")
347                }
348            },
349            op => {
350                polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
351            },
352        }
353    }
354
    /// Evaluates an `EXCEPT` or `INTERSECT` set operation.
    ///
    /// Both sides are evaluated and combined with an anti join (`EXCEPT`) or
    /// a semi join (`INTERSECT`) that also matches nulls; the result is then
    /// de-duplicated.
    ///
    /// Returns an error if either side fails to evaluate, if the column
    /// counts differ (for the positional, non-`BY NAME` form), or if the
    /// quantifier is not supported.
    #[cfg(feature = "semi_anti_join")]
    fn process_except_intersect(
        &mut self,
        left: &SetExpr,
        right: &SetExpr,
        quantifier: &SetQuantifier,
        query: &Query,
    ) -> PolarsResult<LazyFrame> {
        // NOTE(review): the operator is read from `query.body` (the body of the
        // enclosing query) rather than from the set expression being processed;
        // for nested set operations this may select the wrong join type — confirm.
        let (join_type, op_name) = match *query.body {
            SetExpr::SetOperation {
                op: SetOperator::Except,
                ..
            } => (JoinType::Anti, "EXCEPT"),
            _ => (JoinType::Semi, "INTERSECT"),
        };
        let mut lf = self.process_query(left, query)?;
        let mut rf = self.process_query(right, query)?;
        // the join builder takes clones of both frames; the originals are
        // still needed below to resolve their schemas
        let join = lf
            .clone()
            .join_builder()
            .with(rf.clone())
            .how(join_type)
            .join_nulls(true);

        let lf_schema = self.get_frame_schema(&mut lf)?;
        let lf_cols: Vec<_> = lf_schema.iter_names().map(|nm| col(nm.clone())).collect();
        let joined_tbl = match quantifier {
            // BY NAME: join on the left frame's column names
            SetQuantifier::ByName => join.on(lf_cols).finish(),
            // positional: pair up left and right columns in schema order
            SetQuantifier::Distinct | SetQuantifier::None => {
                let rf_schema = self.get_frame_schema(&mut rf)?;
                let rf_cols: Vec<_> = rf_schema.iter_names().map(|nm| col(nm.clone())).collect();
                if lf_cols.len() != rf_cols.len() {
                    polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
                }
                join.left_on(lf_cols).right_on(rf_cols).finish()
            },
            _ => {
                polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
            },
        };
        Ok(joined_tbl.unique(None, UniqueKeepStrategy::Any))
    }
397
398    fn process_union(
399        &mut self,
400        left: &SetExpr,
401        right: &SetExpr,
402        quantifier: &SetQuantifier,
403        query: &Query,
404    ) -> PolarsResult<LazyFrame> {
405        let mut lf = self.process_query(left, query)?;
406        let mut rf = self.process_query(right, query)?;
407        let opts = UnionArgs {
408            parallel: true,
409            to_supertypes: true,
410            ..Default::default()
411        };
412        match quantifier {
413            // UNION [ALL | DISTINCT]
414            SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
415                let lf_schema = self.get_frame_schema(&mut lf)?;
416                let rf_schema = self.get_frame_schema(&mut rf)?;
417                if lf_schema.len() != rf_schema.len() {
418                    polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
419                }
420                let concatenated = polars_lazy::dsl::concat(vec![lf, rf], opts);
421                match quantifier {
422                    SetQuantifier::Distinct | SetQuantifier::None => {
423                        concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
424                    },
425                    _ => concatenated,
426                }
427            },
428            // UNION ALL BY NAME
429            #[cfg(feature = "diagonal_concat")]
430            SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
431            // UNION [DISTINCT] BY NAME
432            #[cfg(feature = "diagonal_concat")]
433            SetQuantifier::ByName | SetQuantifier::DistinctByName => {
434                let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
435                concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
436            },
437            #[allow(unreachable_patterns)]
438            _ => polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier),
439        }
440    }
441
442    fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
443        let frame_rows: Vec<Row> = values.iter().map(|row| {
444            let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
445                let expr = parse_sql_expr(expr, self, None)?;
446                match expr {
447                    Expr::Literal(value) => {
448                        value.to_any_value()
449                            .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
450                            .map(|av| av.into_static())
451                    },
452                    _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
453                }
454            }).collect();
455            row_data.map(Row::new)
456        }).collect::<Result<_, _>>()?;
457
458        Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
459    }
460
461    // EXPLAIN SELECT * FROM DF
462    fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
463        match stmt {
464            Statement::Explain { statement, .. } => {
465                let lf = self.execute_statement(statement)?;
466                let plan = lf.describe_optimized_plan()?;
467                let plan = plan
468                    .split('\n')
469                    .collect::<Series>()
470                    .with_name(PlSmallStr::from_static("Logical Plan"))
471                    .into_column();
472                let df = DataFrame::new(vec![plan])?;
473                Ok(df.lazy())
474            },
475            _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
476        }
477    }
478
479    // SHOW TABLES
480    fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
481        let tables = Column::new("name".into(), self.get_tables());
482        let df = DataFrame::new(vec![tables])?;
483        Ok(df.lazy())
484    }
485
486    // DROP TABLE <tbl>
487    fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
488        match stmt {
489            Statement::Drop { names, .. } => {
490                names.iter().for_each(|name| {
491                    self.table_map.remove(&name.to_string());
492                });
493                Ok(DataFrame::empty().lazy())
494            },
495            _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
496        }
497    }
498
499    // DELETE FROM <tbl> [WHERE ...]
500    fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
501        if let Statement::Delete(Delete {
502            tables,
503            from,
504            using,
505            selection,
506            returning,
507            order_by,
508            limit,
509        }) = stmt
510        {
511            if !tables.is_empty()
512                || using.is_some()
513                || returning.is_some()
514                || limit.is_some()
515                || !order_by.is_empty()
516            {
517                let error_message = match () {
518                    _ if !tables.is_empty() => "DELETE expects exactly one table name",
519                    _ if using.is_some() => "DELETE does not support the USING clause",
520                    _ if returning.is_some() => "DELETE does not support the RETURNING clause",
521                    _ if limit.is_some() => "DELETE does not support the LIMIT clause",
522                    _ if !order_by.is_empty() => "DELETE does not support the ORDER BY clause",
523                    _ => unreachable!(),
524                };
525                polars_bail!(SQLInterface: error_message);
526            }
527            let from_tables = match &from {
528                FromTable::WithFromKeyword(from) => from,
529                FromTable::WithoutKeyword(from) => from,
530            };
531            if from_tables.len() > 1 {
532                polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
533            }
534            let tbl_expr = from_tables.first().unwrap();
535            if !tbl_expr.joins.is_empty() {
536                polars_bail!(SQLInterface: "DELETE does not support table JOINs")
537            }
538            let (_, mut lf) = self.get_table(&tbl_expr.relation)?;
539            if selection.is_none() {
540                // no WHERE clause; equivalent to TRUNCATE (drop all rows)
541                Ok(DataFrame::empty_with_schema(
542                    lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
543                        .unwrap()
544                        .as_ref(),
545                )
546                .lazy())
547            } else {
548                // apply constraint as inverted filter (drops rows matching the selection)
549                Ok(self.process_where(lf.clone(), selection, true)?)
550            }
551        } else {
552            polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
553        }
554    }
555
556    // TRUNCATE <tbl>
557    fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
558        if let Statement::Truncate {
559            table_names,
560            partitions,
561            ..
562        } = stmt
563        {
564            match partitions {
565                None => {
566                    if table_names.len() != 1 {
567                        polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
568                    }
569                    let tbl = table_names[0].to_string();
570                    if let Some(lf) = self.table_map.get_mut(&tbl) {
571                        *lf = DataFrame::empty_with_schema(
572                            lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
573                                .unwrap()
574                                .as_ref(),
575                        )
576                        .lazy();
577                        Ok(lf.clone())
578                    } else {
579                        polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
580                    }
581                },
582                _ => {
583                    polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
584                },
585            }
586        } else {
587            polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
588        }
589    }
590
591    fn register_cte(&mut self, name: &str, lf: LazyFrame) {
592        self.cte_map.borrow_mut().insert(name.to_owned(), lf);
593    }
594
595    fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
596        if let Some(with) = &query.with {
597            if with.recursive {
598                polars_bail!(SQLInterface: "recursive CTEs are not supported")
599            }
600            for cte in &with.cte_tables {
601                let cte_name = cte.alias.name.value.clone();
602                let mut lf = self.execute_query(&cte.query)?;
603                lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
604                self.register_cte(&cte_name, lf);
605            }
606        }
607        Ok(())
608    }
609
    /// execute the 'FROM' part of the query
    ///
    /// Resolves the base relation and then folds each JOIN into it, left to
    /// right. After every join, the columns of the right-hand table that were
    /// aliased in the joined frame (as `<col>:<right table name>`) are
    /// recorded in `joined_aliases` so that they can be resolved later.
    ///
    /// Returns an error if a relation cannot be resolved, if a joined
    /// relation has no name, or if the join type is not supported.
    fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
        let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
        if !tbl_expr.joins.is_empty() {
            for join in &tbl_expr.joins {
                let (r_name, mut rf) = self.get_table(&join.relation)?;
                if r_name.is_empty() {
                    // Require non-empty to avoid duplicate column errors from nested self-joins.
                    polars_bail!(
                        SQLInterface:
                        "cannot join on unnamed relation; please provide an alias"
                    )
                }
                // schemas are resolved before the join, as they are needed both
                // for the join itself and for the alias tracking below
                let left_schema = self.get_frame_schema(&mut lf)?;
                let right_schema = self.get_frame_schema(&mut rf)?;

                lf = match &join.join_operator {
                    op @ (JoinOperator::FullOuter(constraint)
                    | JoinOperator::LeftOuter(constraint)
                    | JoinOperator::RightOuter(constraint)
                    | JoinOperator::Inner(constraint)
                    | JoinOperator::Anti(constraint)
                    | JoinOperator::Semi(constraint)
                    | JoinOperator::LeftAnti(constraint)
                    | JoinOperator::LeftSemi(constraint)
                    | JoinOperator::RightAnti(constraint)
                    | JoinOperator::RightSemi(constraint)) => {
                        // RIGHT ANTI/SEMI joins are expressed by swapping the two frames.
                        // NOTE(review): only the frames are swapped; the names and schemas
                        // passed in the `TableInfo`s below are not — confirm this is intended.
                        let (lf, rf) = match op {
                            JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
                            _ => (lf, rf),
                        };
                        self.process_join(
                            &TableInfo {
                                frame: lf,
                                name: (&l_name).into(),
                                schema: left_schema.clone(),
                            },
                            &TableInfo {
                                frame: rf,
                                name: (&r_name).into(),
                                schema: right_schema.clone(),
                            },
                            constraint,
                            // map the SQL join operator onto the polars join type
                            match op {
                                JoinOperator::FullOuter(_) => JoinType::Full,
                                JoinOperator::LeftOuter(_) => JoinType::Left,
                                JoinOperator::RightOuter(_) => JoinType::Right,
                                JoinOperator::Inner(_) => JoinType::Inner,
                                #[cfg(feature = "semi_anti_join")]
                                JoinOperator::Anti(_)
                                | JoinOperator::LeftAnti(_)
                                | JoinOperator::RightAnti(_) => JoinType::Anti,
                                #[cfg(feature = "semi_anti_join")]
                                JoinOperator::Semi(_)
                                | JoinOperator::LeftSemi(_)
                                | JoinOperator::RightSemi(_) => JoinType::Semi,
                                join_type => polars_bail!(
                                    SQLInterface:
                                    "join type '{:?}' not currently supported",
                                    join_type
                                ),
                            },
                        )?
                    },
                    JoinOperator::CrossJoin => {
                        lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
                    },
                    join_type => {
                        polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
                    },
                };

                // track join-aliased columns so we can resolve them later
                let joined_schema = self.get_frame_schema(&mut lf)?;

                self.joined_aliases.borrow_mut().insert(
                    r_name.clone(),
                    right_schema
                        .iter_names()
                        .filter_map(|name| {
                            // col exists in both tables and is aliased in the joined result
                            let aliased_name = format!("{name}:{r_name}");
                            if left_schema.contains(name)
                                && joined_schema.contains(aliased_name.as_str())
                            {
                                Some((name.to_string(), aliased_name))
                            } else {
                                None
                            }
                        })
                        .collect::<PlHashMap<String, String>>(),
                );
            }
        };
        Ok(lf)
    }
706
707    /// Execute the 'SELECT' part of the query.
708    fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
709        let mut lf = if select_stmt.from.is_empty() {
710            DataFrame::empty().lazy()
711        } else {
712            // Note: implicit joins need more work to support properly,
713            // explicit joins are preferred for now (ref: #16662)
714            let from = select_stmt.clone().from;
715            if from.len() > 1 {
716                polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
717            }
718            self.execute_from_statement(from.first().unwrap())?
719        };
720
721        // Filter expression (WHERE clause)
722        let schema = self.get_frame_schema(&mut lf)?;
723        lf = self.process_where(lf, &select_stmt.selection, false)?;
724
725        // 'SELECT *' modifiers
726        let mut select_modifiers = SelectModifiers {
727            ilike: None,
728            exclude: PlHashSet::new(),
729            rename: PlHashMap::new(),
730            replace: vec![],
731        };
732
733        let projections = self.column_projections(select_stmt, &schema, &mut select_modifiers)?;
734
735        // Check for "GROUP BY ..." (after determining projections)
736        let mut group_by_keys: Vec<Expr> = Vec::new();
737        match &select_stmt.group_by {
738            // Standard "GROUP BY x, y, z" syntax (also recognising ordinal values)
739            GroupByExpr::Expressions(group_by_exprs, modifiers) => {
740                if !modifiers.is_empty() {
741                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
742                }
743                // translate the group expressions, allowing ordinal values
744                group_by_keys = group_by_exprs
745                    .iter()
746                    .map(|e| {
747                        self.expr_or_ordinal(
748                            e,
749                            &projections,
750                            None,
751                            Some(schema.deref()),
752                            "GROUP BY",
753                        )
754                    })
755                    .collect::<PolarsResult<_>>()?
756            },
757            // "GROUP BY ALL" syntax; automatically adds expressions that do not contain
758            // nested agg/window funcs to the group key (also ignores literals).
759            GroupByExpr::All(modifiers) => {
760                if !modifiers.is_empty() {
761                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
762                }
763                projections.iter().for_each(|expr| match expr {
764                    // immediately match the most common cases (col|agg|len|lit, optionally aliased).
765                    Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
766                    Expr::Column(_) => group_by_keys.push(expr.clone()),
767                    Expr::Alias(e, _)
768                        if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
769                    Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
770                        if let Expr::Column(name) = &**e {
771                            group_by_keys.push(col(name.clone()));
772                        }
773                    },
774                    _ => {
775                        // If not quick-matched, add if no nested agg/window expressions
776                        if !has_expr(expr, |e| {
777                            matches!(e, Expr::Agg(_))
778                                || matches!(e, Expr::Len)
779                                || matches!(e, Expr::Window { .. })
780                        }) {
781                            group_by_keys.push(expr.clone())
782                        }
783                    },
784                });
785            },
786        };
787
788        lf = if group_by_keys.is_empty() {
789            // The 'having' clause is only valid inside 'group by'
790            if select_stmt.having.is_some() {
791                polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
792            };
793
794            // Final/selected cols, accounting for 'SELECT *' modifiers
795            let mut retained_cols = Vec::with_capacity(projections.len());
796            let mut retained_names = Vec::with_capacity(projections.len());
797            let have_order_by = query.order_by.is_some();
798            // Initialize containing InheritsContext to handle empty projection case.
799            let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;
800
801            // Note: if there is an 'order by' then we project everything (original cols
802            // and new projections) and *then* select the final cols; the retained cols
803            // are used to ensure a correct final projection. If there's no 'order by',
804            // clause then we can project the final column *expressions* directly.
805            for p in projections.iter() {
806                let name = p
807                    .to_field(schema.deref(), Context::Default)?
808                    .name
809                    .to_string();
810                if select_modifiers.matches_ilike(&name)
811                    && !select_modifiers.exclude.contains(&name)
812                {
813                    projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);
814
815                    retained_cols.push(if have_order_by {
816                        col(name.as_str())
817                    } else {
818                        p.clone()
819                    });
820                    retained_names.push(col(name));
821                }
822            }
823
824            // Apply the remaining modifiers and establish the final projection
825            if have_order_by {
826                // We can safely use `with_columns()` and avoid a join if:
827                // * There is already a projection that projects to the table height.
828                // * All projection heights inherit from context (e.g. all scalar literals that
829                //   are to be broadcasted to table height).
830                if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
831                    || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
832                {
833                    lf = lf.with_columns(projections);
834                } else {
835                    // We hit this branch if the output height is not guaranteed to match the table
836                    // height. E.g.:
837                    //
838                    // * SELECT COUNT(*) FROM df ORDER BY sort_key;
839                    // * SELECT UNNEST(list_col) FROM df ORDER BY sort_key;
840                    //
841                    // For these cases we truncate / extend the sorting columns with NULLs to match
842                    // the output height. We do this by projecting independently and then joining
843                    // back the original frame on the row index.
844                    const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
845                    lf = lf
846                        .clone()
847                        .select(projections)
848                        .with_row_index(NAME, None)
849                        .join(
850                            lf.with_row_index(NAME, None),
851                            [col(NAME)],
852                            [col(NAME)],
853                            JoinArgs {
854                                how: JoinType::Left,
855                                validation: Default::default(),
856                                suffix: None,
857                                slice: None,
858                                nulls_equal: false,
859                                coalesce: Default::default(),
860                                maintain_order: polars_ops::frame::MaintainOrderJoin::Left,
861                            },
862                        );
863                }
864            }
865
866            if !select_modifiers.replace.is_empty() {
867                lf = lf.with_columns(&select_modifiers.replace);
868            }
869            if !select_modifiers.rename.is_empty() {
870                lf = lf.with_columns(select_modifiers.renamed_cols());
871            }
872
873            lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;
874
875            // Note: If `have_order_by`, with_columns is already done above.
876            if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
877                && !have_order_by
878            {
879                // All projections need to be broadcasted to table height, so evaluate in `with_columns()`
880                lf = lf.with_columns(retained_cols).select(retained_names);
881            } else {
882                lf = lf.select(retained_cols);
883            }
884
885            if !select_modifiers.rename.is_empty() {
886                lf = lf.rename(
887                    select_modifiers.rename.keys(),
888                    select_modifiers.rename.values(),
889                    true,
890                );
891            };
892            lf
893        } else {
894            lf = self.process_group_by(lf, &group_by_keys, &projections)?;
895            lf = self.process_order_by(lf, &query.order_by, None)?;
896
897            // Apply optional 'having' clause, post-aggregation.
898            let schema = Some(self.get_frame_schema(&mut lf)?);
899            match select_stmt.having.as_ref() {
900                Some(expr) => lf.filter(parse_sql_expr(expr, self, schema.as_deref())?),
901                None => lf,
902            }
903        };
904
905        // Apply optional DISTINCT clause.
906        lf = match &select_stmt.distinct {
907            Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
908            Some(Distinct::On(exprs)) => {
909                // TODO: support exprs in `unique` see https://github.com/pola-rs/polars/issues/5760
910                let schema = Some(self.get_frame_schema(&mut lf)?);
911                let cols = exprs
912                    .iter()
913                    .map(|e| {
914                        let expr = parse_sql_expr(e, self, schema.as_deref())?;
915                        if let Expr::Column(name) = expr {
916                            Ok(name.clone())
917                        } else {
918                            Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
919                        }
920                    })
921                    .collect::<PolarsResult<Vec<_>>>()?;
922
923                // DISTINCT ON has to apply the ORDER BY before the operation.
924                lf = self.process_order_by(lf, &query.order_by, None)?;
925                return Ok(lf.unique_stable(Some(cols.clone()), UniqueKeepStrategy::First));
926            },
927            None => lf,
928        };
929        Ok(lf)
930    }
931
932    fn column_projections(
933        &mut self,
934        select_stmt: &Select,
935        schema: &SchemaRef,
936        select_modifiers: &mut SelectModifiers,
937    ) -> PolarsResult<Vec<Expr>> {
938        let parsed_items: PolarsResult<Vec<Vec<Expr>>> = select_stmt
939            .projection
940            .iter()
941            .map(|select_item| match select_item {
942                SelectItem::UnnamedExpr(expr) => {
943                    Ok(vec![parse_sql_expr(expr, self, Some(schema))?])
944                },
945                SelectItem::ExprWithAlias { expr, alias } => {
946                    let expr = parse_sql_expr(expr, self, Some(schema))?;
947                    Ok(vec![expr.alias(PlSmallStr::from_str(alias.value.as_str()))])
948                },
949                SelectItem::QualifiedWildcard(obj_name, wildcard_options) => self
950                    .process_qualified_wildcard(
951                        obj_name,
952                        wildcard_options,
953                        select_modifiers,
954                        Some(schema),
955                    ),
956                SelectItem::Wildcard(wildcard_options) => {
957                    let cols = schema
958                        .iter_names()
959                        .map(|name| col(name.clone()))
960                        .collect::<Vec<_>>();
961
962                    self.process_wildcard_additional_options(
963                        cols,
964                        wildcard_options,
965                        select_modifiers,
966                        Some(schema),
967                    )
968                },
969            })
970            .collect();
971
972        let flattened_exprs: Vec<Expr> = parsed_items?
973            .into_iter()
974            .flatten()
975            .flat_map(|expr| expand_exprs(expr, schema))
976            .collect();
977
978        Ok(flattened_exprs)
979    }
980
981    fn process_where(
982        &mut self,
983        mut lf: LazyFrame,
984        expr: &Option<SQLExpr>,
985        invert_filter: bool,
986    ) -> PolarsResult<LazyFrame> {
987        if let Some(expr) = expr {
988            let schema = self.get_frame_schema(&mut lf)?;
989
990            // shortcut filter evaluation if given expression is just TRUE or FALSE
991            let (all_true, all_false) = match expr {
992                SQLExpr::Value(SQLValue::Boolean(b)) => (*b, !*b),
993                SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
994                    (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => (a == b, a != b),
995                    (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
996                        (a != b, a == b)
997                    },
998                    _ => (false, false),
999                },
1000                _ => (false, false),
1001            };
1002            if (all_true && !invert_filter) || (all_false && invert_filter) {
1003                return Ok(lf);
1004            } else if (all_false && !invert_filter) || (all_true && invert_filter) {
1005                return Ok(DataFrame::empty_with_schema(schema.as_ref()).lazy());
1006            }
1007
1008            // ...otherwise parse and apply the filter as normal
1009            let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1010            if filter_expression.clone().meta().has_multiple_outputs() {
1011                filter_expression = all_horizontal([filter_expression])?;
1012            }
1013            lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1014            lf = if invert_filter {
1015                lf.remove(filter_expression)
1016            } else {
1017                lf.filter(filter_expression)
1018            };
1019        }
1020        Ok(lf)
1021    }
1022
1023    pub(super) fn process_join(
1024        &mut self,
1025        tbl_left: &TableInfo,
1026        tbl_right: &TableInfo,
1027        constraint: &JoinConstraint,
1028        join_type: JoinType,
1029    ) -> PolarsResult<LazyFrame> {
1030        let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right)?;
1031
1032        let joined = tbl_left
1033            .frame
1034            .clone()
1035            .join_builder()
1036            .with(tbl_right.frame.clone())
1037            .left_on(left_on)
1038            .right_on(right_on)
1039            .how(join_type)
1040            .suffix(format!(":{}", tbl_right.name))
1041            .coalesce(JoinCoalesce::KeepColumns)
1042            .finish();
1043
1044        Ok(joined)
1045    }
1046
1047    fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
1048        let mut contexts = vec![];
1049        for expr in exprs {
1050            *expr = expr.clone().map_expr(|e| match e {
1051                Expr::SubPlan(lp, names) => {
1052                    contexts.push(<LazyFrame>::from((**lp).clone()));
1053                    if names.len() == 1 {
1054                        Expr::Column(names[0].as_str().into())
1055                    } else {
1056                        Expr::SubPlan(lp, names)
1057                    }
1058                },
1059                e => e,
1060            })
1061        }
1062
1063        if contexts.is_empty() {
1064            lf
1065        } else {
1066            lf.with_context(contexts)
1067        }
1068    }
1069
1070    fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1071        if let Statement::CreateTable(CreateTable {
1072            if_not_exists,
1073            name,
1074            query,
1075            ..
1076        }) = stmt
1077        {
1078            let tbl_name = name.0.first().unwrap().value.as_str();
1079            // CREATE TABLE IF NOT EXISTS
1080            if *if_not_exists && self.table_map.contains_key(tbl_name) {
1081                polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1082                // CREATE OR REPLACE TABLE
1083            }
1084            if let Some(query) = query {
1085                let lf = self.execute_query(query)?;
1086                self.register(tbl_name, lf);
1087                let out = df! {
1088                    "Response" => ["CREATE TABLE"]
1089                }
1090                .unwrap()
1091                .lazy();
1092                Ok(out)
1093            } else {
1094                polars_bail!(SQLInterface: "only `CREATE TABLE AS SELECT ...` is currently supported");
1095            }
1096        } else {
1097            unreachable!()
1098        }
1099    }
1100
1101    fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
1102        match relation {
1103            TableFactor::Table {
1104                name, alias, args, ..
1105            } => {
1106                if let Some(args) = args {
1107                    return self.execute_table_function(name, alias, &args.args);
1108                }
1109                let tbl_name = name.0.first().unwrap().value.as_str();
1110                if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
1111                    match alias {
1112                        Some(alias) => {
1113                            self.table_aliases
1114                                .borrow_mut()
1115                                .insert(alias.name.value.clone(), tbl_name.to_string());
1116                            Ok((alias.to_string(), lf))
1117                        },
1118                        None => Ok((tbl_name.to_string(), lf)),
1119                    }
1120                } else {
1121                    polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1122                }
1123            },
1124            TableFactor::Derived {
1125                lateral,
1126                subquery,
1127                alias,
1128            } => {
1129                polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1130                if let Some(alias) = alias {
1131                    let mut lf = self.execute_query_no_ctes(subquery)?;
1132                    lf = self.rename_columns_from_table_alias(lf, alias)?;
1133                    self.table_map.insert(alias.name.value.clone(), lf.clone());
1134                    Ok((alias.name.value.clone(), lf))
1135                } else {
1136                    polars_bail!(SQLSyntax: "derived tables must have aliases");
1137                }
1138            },
1139            TableFactor::UNNEST {
1140                alias,
1141                array_exprs,
1142                with_offset,
1143                with_offset_alias: _,
1144                ..
1145            } => {
1146                if let Some(alias) = alias {
1147                    let column_names: Vec<Option<PlSmallStr>> = alias
1148                        .columns
1149                        .iter()
1150                        .map(|c| {
1151                            if c.name.value.is_empty() {
1152                                None
1153                            } else {
1154                                Some(PlSmallStr::from_str(c.name.value.as_str()))
1155                            }
1156                        })
1157                        .collect();
1158
1159                    let column_values: Vec<Series> = array_exprs
1160                        .iter()
1161                        .map(|arr| parse_sql_array(arr, self))
1162                        .collect::<Result<_, _>>()?;
1163
1164                    polars_ensure!(!column_names.is_empty(),
1165                        SQLSyntax:
1166                        "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1167                    );
1168                    if column_names.len() != column_values.len() {
1169                        let plural = if column_values.len() > 1 { "s" } else { "" };
1170                        polars_bail!(
1171                            SQLSyntax:
1172                            "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
1173                        );
1174                    }
1175                    let column_series: Vec<Column> = column_values
1176                        .into_iter()
1177                        .zip(column_names)
1178                        .map(|(s, name)| {
1179                            if let Some(name) = name {
1180                                s.clone().with_name(name)
1181                            } else {
1182                                s.clone()
1183                            }
1184                        })
1185                        .map(Column::from)
1186                        .collect();
1187
1188                    let lf = DataFrame::new(column_series)?.lazy();
1189
1190                    if *with_offset {
1191                        // TODO: support 'WITH ORDINALITY|OFFSET' modifier.
1192                        //  (note that 'WITH OFFSET' is BigQuery-specific syntax, not PostgreSQL)
1193                        polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
1194                    }
1195                    let table_name = alias.name.value.clone();
1196                    self.table_map.insert(table_name.clone(), lf.clone());
1197                    Ok((table_name.clone(), lf))
1198                } else {
1199                    polars_bail!(SQLSyntax: "UNNEST table must have an alias");
1200                }
1201            },
1202            TableFactor::NestedJoin {
1203                table_with_joins,
1204                alias,
1205            } => {
1206                let lf = self.execute_from_statement(table_with_joins)?;
1207                match alias {
1208                    Some(a) => Ok((a.name.value.clone(), lf)),
1209                    None => Ok(("".to_string(), lf)),
1210                }
1211            },
1212            // Support bare table, optionally with an alias, for now
1213            _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
1214        }
1215    }
1216
1217    fn execute_table_function(
1218        &mut self,
1219        name: &ObjectName,
1220        alias: &Option<TableAlias>,
1221        args: &[FunctionArg],
1222    ) -> PolarsResult<(String, LazyFrame)> {
1223        let tbl_fn = name.0.first().unwrap().value.as_str();
1224        let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1225        let (tbl_name, lf) = read_fn.execute(args)?;
1226        #[allow(clippy::useless_asref)]
1227        let tbl_name = alias
1228            .as_ref()
1229            .map(|a| a.name.value.clone())
1230            .unwrap_or_else(|| tbl_name);
1231
1232        self.table_map.insert(tbl_name.clone(), lf.clone());
1233        Ok((tbl_name, lf))
1234    }
1235
1236    fn process_order_by(
1237        &mut self,
1238        mut lf: LazyFrame,
1239        order_by: &Option<OrderBy>,
1240        selected: Option<&[Expr]>,
1241    ) -> PolarsResult<LazyFrame> {
1242        if order_by.as_ref().is_none_or(|ob| ob.exprs.is_empty()) {
1243            return Ok(lf);
1244        }
1245        let schema = self.get_frame_schema(&mut lf)?;
1246        let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1247
1248        let order_by = order_by.as_ref().unwrap().exprs.clone();
1249        let mut descending = Vec::with_capacity(order_by.len());
1250        let mut nulls_last = Vec::with_capacity(order_by.len());
1251        let mut by: Vec<Expr> = Vec::with_capacity(order_by.len());
1252
1253        if order_by.len() == 1  // support `ORDER BY ALL` (iff there is no column named 'ALL' in the schema)
1254            && matches!(&order_by[0].expr, SQLExpr::Identifier(ident) if ident.value.to_uppercase() == "ALL" && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1255        {
1256            if let Some(selected) = selected {
1257                by.extend(selected.iter().cloned());
1258            } else {
1259                by.extend(columns_iter);
1260            };
1261            let desc_order = !order_by[0].asc.unwrap_or(true);
1262            nulls_last.resize(by.len(), !order_by[0].nulls_first.unwrap_or(desc_order));
1263            descending.resize(by.len(), desc_order);
1264        } else {
1265            let columns = &columns_iter.collect::<Vec<_>>();
1266            for ob in order_by {
1267                // note: if not specified 'NULLS FIRST' is default for DESC, 'NULLS LAST' otherwise
1268                // https://www.postgresql.org/docs/current/queries-order.html
1269                let desc_order = !ob.asc.unwrap_or(true);
1270                nulls_last.push(!ob.nulls_first.unwrap_or(desc_order));
1271                descending.push(desc_order);
1272
1273                // translate order expression, allowing ordinal values
1274                by.push(self.expr_or_ordinal(
1275                    &ob.expr,
1276                    columns,
1277                    selected,
1278                    Some(&schema),
1279                    "ORDER BY",
1280                )?)
1281            }
1282        }
1283        Ok(lf.sort_by_exprs(
1284            &by,
1285            SortMultipleOptions::default()
1286                .with_order_descending_multi(descending)
1287                .with_nulls_last_multi(nulls_last)
1288                .with_maintain_order(true),
1289        ))
1290    }
1291
1292    fn process_group_by(
1293        &mut self,
1294        mut lf: LazyFrame,
1295        group_by_keys: &[Expr],
1296        projections: &[Expr],
1297    ) -> PolarsResult<LazyFrame> {
1298        let schema_before = self.get_frame_schema(&mut lf)?;
1299        let group_by_keys_schema =
1300            expressions_to_schema(group_by_keys, &schema_before, Context::Default)?;
1301
1302        // Remove the group_by keys as polars adds those implicitly.
1303        let mut aggregation_projection = Vec::with_capacity(projections.len());
1304        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
1305        let mut projection_aliases = PlHashSet::new();
1306        let mut group_key_aliases = PlHashSet::new();
1307
1308        for mut e in projections {
1309            // `Len` represents COUNT(*) so we treat as an aggregation here.
1310            let is_agg_or_window = has_expr(e, |e| {
1311                matches!(e, Expr::Agg(_) | Expr::Len | Expr::Window { .. })
1312            });
1313
1314            let mut is_function_under_alias = false;
1315
1316            // Note: if simple aliased expression we defer aliasing until after the group_by.
1317            if let Expr::Alias(expr, alias) = e {
1318                if e.clone().meta().is_simple_projection(Some(&schema_before)) {
1319                    group_key_aliases.insert(alias.as_ref());
1320                    e = expr
1321                } else if let Expr::Function {
1322                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
1323                    ..
1324                } = expr.deref()
1325                {
1326                    projection_overrides
1327                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
1328                } else if let Expr::Function { .. } = expr.deref() {
1329                    is_function_under_alias = true;
1330                } else if !is_agg_or_window && !group_by_keys_schema.contains(alias) {
1331                    projection_aliases.insert(alias.as_ref());
1332                }
1333            }
1334            let field = e.to_field(&schema_before, Context::Default)?;
1335            if group_by_keys_schema.get(&field.name).is_none() && is_agg_or_window {
1336                let mut e = e.clone();
1337                if let Expr::Agg(AggExpr::Implode(expr)) = &e {
1338                    e = (**expr).clone();
1339                } else if let Expr::Alias(expr, name) = &e {
1340                    if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
1341                        e = (**expr).clone().alias(name.clone());
1342                    }
1343                }
1344                aggregation_projection.push(e);
1345            } else if let Expr::Column(_)
1346            | Expr::Function {
1347                function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
1348                ..
1349            } = e
1350            {
1351                // Non-aggregated columns must be part of the GROUP BY clause
1352                if !group_by_keys_schema.contains(&field.name) {
1353                    polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
1354                }
1355            } else if is_function_under_alias || matches!(e, Expr::Function { .. }) {
1356                aggregation_projection.push(e.clone());
1357            } else if let Expr::Literal { .. }
1358            | Expr::Cast { .. }
1359            | Expr::Ternary { .. }
1360            | Expr::Field { .. }
1361            | Expr::Alias { .. } = e
1362            {
1363                // do nothing
1364            } else {
1365                polars_bail!(SQLSyntax: "Unsupported operation in the GROUP BY clause: {}", e);
1366            }
1367        }
1368        let aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
1369        let projection_schema =
1370            expressions_to_schema(projections, &schema_before, Context::Default)?;
1371
1372        // A final projection to get the proper order and any deferred transforms/aliases.
1373        let final_projection = projection_schema
1374            .iter_names()
1375            .zip(projections)
1376            .map(|(name, projection_expr)| {
1377                if let Some(expr) = projection_overrides.get(name.as_str()) {
1378                    expr.clone()
1379                } else if group_by_keys_schema.get(name).is_some()
1380                    || projection_aliases.contains(name.as_str())
1381                    || group_key_aliases.contains(name.as_str())
1382                {
1383                    projection_expr.clone()
1384                } else {
1385                    col(name.clone())
1386                }
1387            })
1388            .collect::<Vec<_>>();
1389
1390        Ok(aggregated.select(&final_projection))
1391    }
1392
1393    fn process_limit_offset(
1394        &self,
1395        lf: LazyFrame,
1396        limit: &Option<SQLExpr>,
1397        offset: &Option<Offset>,
1398    ) -> PolarsResult<LazyFrame> {
1399        match (offset, limit) {
1400            (
1401                Some(Offset {
1402                    value: SQLExpr::Value(SQLValue::Number(offset, _)),
1403                    ..
1404                }),
1405                Some(SQLExpr::Value(SQLValue::Number(limit, _))),
1406            ) => Ok(lf.slice(
1407                offset
1408                    .parse()
1409                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1410                limit
1411                    .parse()
1412                    .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1413            )),
1414            (
1415                Some(Offset {
1416                    value: SQLExpr::Value(SQLValue::Number(offset, _)),
1417                    ..
1418                }),
1419                None,
1420            ) => Ok(lf.slice(
1421                offset
1422                    .parse()
1423                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1424                IdxSize::MAX,
1425            )),
1426            (None, Some(SQLExpr::Value(SQLValue::Number(limit, _)))) => Ok(lf.limit(
1427                limit
1428                    .parse()
1429                    .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1430            )),
1431            (None, None) => Ok(lf),
1432            _ => polars_bail!(
1433                SQLSyntax: "non-numeric arguments for LIMIT/OFFSET are not supported",
1434            ),
1435        }
1436    }
1437
1438    fn process_qualified_wildcard(
1439        &mut self,
1440        ObjectName(idents): &ObjectName,
1441        options: &WildcardAdditionalOptions,
1442        modifiers: &mut SelectModifiers,
1443        schema: Option<&Schema>,
1444    ) -> PolarsResult<Vec<Expr>> {
1445        let mut new_idents = idents.clone();
1446        new_idents.push(Ident::new("*"));
1447
1448        let expr = resolve_compound_identifier(self, new_idents.deref(), schema);
1449        self.process_wildcard_additional_options(expr?, options, modifiers, schema)
1450    }
1451
1452    fn process_wildcard_additional_options(
1453        &mut self,
1454        exprs: Vec<Expr>,
1455        options: &WildcardAdditionalOptions,
1456        modifiers: &mut SelectModifiers,
1457        schema: Option<&Schema>,
1458    ) -> PolarsResult<Vec<Expr>> {
1459        if options.opt_except.is_some() && options.opt_exclude.is_some() {
1460            polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
1461        } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
1462            polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
1463        }
1464
1465        // SELECT * EXCLUDE
1466        if let Some(items) = &options.opt_exclude {
1467            match items {
1468                ExcludeSelectItem::Single(ident) => {
1469                    modifiers.exclude.insert(ident.value.clone());
1470                },
1471                ExcludeSelectItem::Multiple(idents) => {
1472                    modifiers
1473                        .exclude
1474                        .extend(idents.iter().map(|i| i.value.clone()));
1475                },
1476            };
1477        }
1478
1479        // SELECT * EXCEPT
1480        if let Some(items) = &options.opt_except {
1481            modifiers.exclude.insert(items.first_element.value.clone());
1482            modifiers
1483                .exclude
1484                .extend(items.additional_elements.iter().map(|i| i.value.clone()));
1485        }
1486
1487        // SELECT * ILIKE
1488        if let Some(item) = &options.opt_ilike {
1489            let rx = regex::escape(item.pattern.as_str())
1490                .replace('%', ".*")
1491                .replace('_', ".");
1492
1493            modifiers.ilike = Some(
1494                polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
1495            );
1496        }
1497
1498        // SELECT * RENAME
1499        if let Some(items) = &options.opt_rename {
1500            let renames = match items {
1501                RenameSelectItem::Single(rename) => vec![rename],
1502                RenameSelectItem::Multiple(renames) => renames.iter().collect(),
1503            };
1504            for rn in renames {
1505                let (before, after) = (rn.ident.value.as_str(), rn.alias.value.as_str());
1506                let (before, after) = (PlSmallStr::from_str(before), PlSmallStr::from_str(after));
1507                if before != after {
1508                    modifiers.rename.insert(before, after);
1509                }
1510            }
1511        }
1512
1513        // SELECT * REPLACE
1514        if let Some(replacements) = &options.opt_replace {
1515            for rp in &replacements.items {
1516                let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
1517                modifiers
1518                    .replace
1519                    .push(replacement_expr?.alias(rp.column_name.value.as_str()));
1520            }
1521        }
1522        Ok(exprs)
1523    }
1524
1525    fn rename_columns_from_table_alias(
1526        &mut self,
1527        mut lf: LazyFrame,
1528        alias: &TableAlias,
1529    ) -> PolarsResult<LazyFrame> {
1530        if alias.columns.is_empty() {
1531            Ok(lf)
1532        } else {
1533            let schema = self.get_frame_schema(&mut lf)?;
1534            if alias.columns.len() != schema.len() {
1535                polars_bail!(
1536                    SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
1537                    alias.columns.len(), alias.name.value, schema.len()
1538                )
1539            } else {
1540                let existing_columns: Vec<_> = schema.iter_names().collect();
1541                let new_columns: Vec<_> =
1542                    alias.columns.iter().map(|c| c.name.value.clone()).collect();
1543                Ok(lf.rename(existing_columns, new_columns, true))
1544            }
1545        }
1546    }
1547}
1548
1549impl SQLContext {
1550    /// Get internal table map. For internal use only.
1551    pub fn get_table_map(&self) -> PlHashMap<String, LazyFrame> {
1552        self.table_map.clone()
1553    }
1554
1555    /// Create a new SQLContext from a table map. For internal use only
1556    pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
1557        Self {
1558            table_map,
1559            ..Default::default()
1560        }
1561    }
1562}
1563
1564fn collect_compound_identifiers(
1565    left: &[Ident],
1566    right: &[Ident],
1567    left_name: &str,
1568    right_name: &str,
1569) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1570    if left.len() == 2 && right.len() == 2 {
1571        let (tbl_a, col_name_a) = (left[0].value.as_str(), left[1].value.as_str());
1572        let (tbl_b, col_name_b) = (right[0].value.as_str(), right[1].value.as_str());
1573
1574        // switch left/right operands if the caller has them in reverse
1575        if left_name == tbl_b || right_name == tbl_a {
1576            Ok((vec![col(col_name_b)], vec![col(col_name_a)]))
1577        } else {
1578            Ok((vec![col(col_name_a)], vec![col(col_name_b)]))
1579        }
1580    } else {
1581        polars_bail!(SQLInterface: "collect_compound_identifiers: Expected left.len() == 2 && right.len() == 2, but found left.len() == {:?}, right.len() == {:?}", left.len(), right.len());
1582    }
1583}
1584
1585fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
1586    match expr {
1587        Expr::Wildcard => schema
1588            .iter_names()
1589            .map(|name| col(name.clone()))
1590            .collect::<Vec<_>>(),
1591        Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
1592            let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
1593            schema
1594                .iter_names()
1595                .filter(|name| re.is_match(name))
1596                .map(|name| col(name.clone()))
1597                .collect::<Vec<_>>()
1598        },
1599        Expr::Columns(names) => names
1600            .iter()
1601            .map(|name| col(name.clone()))
1602            .collect::<Vec<_>>(),
1603        _ => vec![expr],
1604    }
1605}
1606
/// Whether a column name is written as an anchored regex, i.e. it begins
/// with `^` and ends with `$`.
fn is_regex_colname(nm: &str) -> bool {
    // Both anchors are ASCII, so comparing the first/last bytes is equivalent
    // to comparing the first/last characters of the UTF-8 string.
    let bytes = nm.as_bytes();
    matches!(bytes.first(), Some(b'^')) && matches!(bytes.last(), Some(b'$'))
}
1610
1611fn process_join_on(
1612    expression: &sqlparser::ast::Expr,
1613    tbl_left: &TableInfo,
1614    tbl_right: &TableInfo,
1615) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1616    match expression {
1617        SQLExpr::BinaryOp { left, op, right } => match op {
1618            BinaryOperator::And => {
1619                let (mut left_i, mut right_i) = process_join_on(left, tbl_left, tbl_right)?;
1620                let (mut left_j, mut right_j) = process_join_on(right, tbl_left, tbl_right)?;
1621                left_i.append(&mut left_j);
1622                right_i.append(&mut right_j);
1623                Ok((left_i, right_i))
1624            },
1625            BinaryOperator::Eq => match (left.as_ref(), right.as_ref()) {
1626                (SQLExpr::CompoundIdentifier(left), SQLExpr::CompoundIdentifier(right)) => {
1627                    collect_compound_identifiers(left, right, &tbl_left.name, &tbl_right.name)
1628                },
1629                _ => {
1630                    polars_bail!(SQLInterface: "only equi-join constraints (on identifiers) are currently supported; found lhs={:?}, rhs={:?}", left, right)
1631                },
1632            },
1633            _ => {
1634                polars_bail!(SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op)
1635            },
1636        },
1637        SQLExpr::Nested(expr) => process_join_on(expr, tbl_left, tbl_right),
1638        _ => {
1639            polars_bail!(SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", expression)
1640        },
1641    }
1642}
1643
1644fn process_join_constraint(
1645    constraint: &JoinConstraint,
1646    tbl_left: &TableInfo,
1647    tbl_right: &TableInfo,
1648) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1649    match constraint {
1650        JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
1651            process_join_on(expr, tbl_left, tbl_right)
1652        },
1653        JoinConstraint::Using(idents) if !idents.is_empty() => {
1654            let using: Vec<Expr> = idents.iter().map(|id| col(id.value.as_str())).collect();
1655            Ok((using.clone(), using))
1656        },
1657        JoinConstraint::Natural => {
1658            let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
1659            let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
1660            let on: Vec<Expr> = left_names
1661                .intersection(&right_names)
1662                .map(|&name| col(name.clone()))
1663                .collect();
1664            if on.is_empty() {
1665                polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
1666            }
1667            Ok((on.clone(), on))
1668        },
1669        _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
1670    }
1671}
1672
1673bitflags::bitflags! {
1674    /// Bitfield indicating whether there exists a projection with the specified height behavior.
1675    ///
1676    /// Used to help determine whether to execute projections in `select()` or `with_columns()`
1677    /// context.
1678    #[derive(PartialEq)]
1679    struct ExprSqlProjectionHeightBehavior: u8 {
1680        /// Maintains the height of input column(s)
1681        const MaintainsColumn = 1 << 0;
1682        /// Height is independent of input, e.g.:
1683        /// * expressions that change length: e.g. slice, explode, filter, gather etc.
1684        /// * aggregations: count(*), first(), sum() etc.
1685        const Independent = 1 << 1;
1686        /// "Inherits" the height of the context, e.g.:
1687        /// * Scalar literals
1688        const InheritsContext = 1 << 2;
1689    }
1690}
1691
1692impl ExprSqlProjectionHeightBehavior {
1693    fn identify_from_expr(expr: &Expr) -> Self {
1694        let mut has_column = false;
1695        let mut has_independent = false;
1696
1697        for e in expr.into_iter() {
1698            use Expr::*;
1699
1700            has_column |= matches!(e, Column(_) | Columns(_) | DtypeColumn(_) | IndexColumn(_));
1701
1702            has_independent |= match e {
1703                // @TODO: This is broken now with functions.
1704                AnonymousFunction { options, .. } => {
1705                    options.returns_scalar() || !options.is_length_preserving()
1706                },
1707
1708                Literal(v) => !v.is_scalar(),
1709
1710                Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
1711
1712                Agg { .. } | Len => true,
1713
1714                _ => false,
1715            }
1716        }
1717
1718        if has_independent {
1719            Self::Independent
1720        } else if has_column {
1721            Self::MaintainsColumn
1722        } else {
1723            Self::InheritsContext
1724        }
1725    }
1726}