polars_sql/
context.rs

1use std::cell::RefCell;
2use std::ops::Deref;
3
4use polars_core::frame::row::Row;
5use polars_core::prelude::*;
6use polars_lazy::prelude::*;
7use polars_ops::frame::JoinCoalesce;
8use polars_plan::dsl::function_expr::StructFunction;
9use polars_plan::prelude::*;
10use polars_utils::format_pl_smallstr;
11use sqlparser::ast::{
12    BinaryOperator, CreateTable, Delete, Distinct, ExcludeSelectItem, Expr as SQLExpr, FromTable,
13    FunctionArg, GroupByExpr, Ident, JoinConstraint, JoinOperator, ObjectName, ObjectType, Offset,
14    OrderBy, Query, RenameSelectItem, Select, SelectItem, SetExpr, SetOperator, SetQuantifier,
15    Statement, TableAlias, TableFactor, TableWithJoins, UnaryOperator, Value as SQLValue, Values,
16    WildcardAdditionalOptions,
17};
18use sqlparser::dialect::GenericDialect;
19use sqlparser::parser::{Parser, ParserOptions};
20
21use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
22use crate::sql_expr::{
23    parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
24};
25use crate::table_functions::PolarsTableFunctions;
26
/// A table as handed to join processing: its lazy frame together with the
/// name and schema it is referred to by.
#[derive(Clone)]
pub struct TableInfo {
    /// The lazy frame that backs the table.
    pub(crate) frame: LazyFrame,
    /// The name (or alias) of the table.
    pub(crate) name: PlSmallStr,
    /// The schema associated with the table.
    pub(crate) schema: Arc<Schema>,
}
33
/// Collected `SELECT *` wildcard modifiers (EXCLUDE / ILIKE / RENAME / REPLACE).
struct SelectModifiers {
    exclude: PlHashSet<String>,                // SELECT * EXCLUDE
    ilike: Option<regex::Regex>,               // SELECT * ILIKE
    rename: PlHashMap<PlSmallStr, PlSmallStr>, // SELECT * RENAME
    replace: Vec<Expr>,                        // SELECT * REPLACE
}
40impl SelectModifiers {
41    fn matches_ilike(&self, s: &str) -> bool {
42        match &self.ilike {
43            Some(rx) => rx.is_match(s),
44            None => true,
45        }
46    }
47    fn renamed_cols(&self) -> Vec<Expr> {
48        self.rename
49            .iter()
50            .map(|(before, after)| col(before.clone()).alias(after.clone()))
51            .collect()
52    }
53}
54
/// The SQLContext is the main entry point for executing SQL queries.
#[derive(Clone)]
pub struct SQLContext {
    /// Registered tables, keyed by table name.
    pub(crate) table_map: PlHashMap<String, LazyFrame>,
    /// Registry used to look up SQL functions.
    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
    /// Logical-plan arena, passed to schema resolution and handed over to the
    /// resulting frame at the end of `execute`.
    pub(crate) lp_arena: Arena<IR>,
    /// Expression arena; used alongside `lp_arena`.
    pub(crate) expr_arena: Arena<AExpr>,

    // Statement-level state; cleared at the end of `execute`.
    /// CTEs registered by the `WITH` clause, keyed by CTE name.
    cte_map: RefCell<PlHashMap<String, LazyFrame>>,
    /// Table alias -> registered table name.
    table_aliases: RefCell<PlHashMap<String, String>>,
    /// Joined table name -> (original column name -> aliased name in the joined frame).
    joined_aliases: RefCell<PlHashMap<String, PlHashMap<String, String>>>,
}
67
68impl Default for SQLContext {
69    fn default() -> Self {
70        Self {
71            function_registry: Arc::new(DefaultFunctionRegistry {}),
72            table_map: Default::default(),
73            cte_map: Default::default(),
74            table_aliases: Default::default(),
75            joined_aliases: Default::default(),
76            lp_arena: Default::default(),
77            expr_arena: Default::default(),
78        }
79    }
80}
81
82impl SQLContext {
83    /// Create a new SQLContext.
84    /// ```rust
85    /// # use polars_sql::SQLContext;
86    /// # fn main() {
87    /// let ctx = SQLContext::new();
88    /// # }
89    /// ```
90    pub fn new() -> Self {
91        Self::default()
92    }
93
94    /// Get the names of all registered tables, in sorted order.
95    pub fn get_tables(&self) -> Vec<String> {
96        let mut tables = Vec::from_iter(self.table_map.keys().cloned());
97        tables.sort_unstable();
98        tables
99    }
100
101    /// Register a [`LazyFrame`] as a table in the SQLContext.
102    /// ```rust
103    /// # use polars_sql::SQLContext;
104    /// # use polars_core::prelude::*;
105    /// # use polars_lazy::prelude::*;
106    /// # fn main() {
107    ///
108    /// let mut ctx = SQLContext::new();
109    /// let df = df! {
110    ///    "a" =>  [1, 2, 3],
111    /// }.unwrap().lazy();
112    ///
113    /// ctx.register("df", df);
114    /// # }
115    ///```
116    pub fn register(&mut self, name: &str, lf: LazyFrame) {
117        self.table_map.insert(name.to_owned(), lf);
118    }
119
120    /// Unregister a [`LazyFrame`] table from the [`SQLContext`].
121    pub fn unregister(&mut self, name: &str) {
122        self.table_map.remove(&name.to_owned());
123    }
124
125    /// Execute a SQL query, returning a [`LazyFrame`].
126    /// ```rust
127    /// # use polars_sql::SQLContext;
128    /// # use polars_core::prelude::*;
129    /// # use polars_lazy::prelude::*;
130    /// # fn main() {
131    ///
132    /// let mut ctx = SQLContext::new();
133    /// let df = df! {
134    ///    "a" =>  [1, 2, 3],
135    /// }
136    /// .unwrap();
137    ///
138    /// ctx.register("df", df.clone().lazy());
139    /// let sql_df = ctx.execute("SELECT * FROM df").unwrap().collect().unwrap();
140    /// assert!(sql_df.equals(&df));
141    /// # }
142    ///```
143    pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
144        let mut parser = Parser::new(&GenericDialect);
145        parser = parser.with_options(ParserOptions {
146            trailing_commas: true,
147            ..Default::default()
148        });
149
150        let ast = parser
151            .try_with_sql(query)
152            .map_err(to_sql_interface_err)?
153            .parse_statements()
154            .map_err(to_sql_interface_err)?;
155
156        polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
157        let res = self.execute_statement(ast.first().unwrap())?;
158
159        // Ensure the result uses the proper arenas.
160        // This will instantiate new arenas with a new version.
161        let lp_arena = std::mem::take(&mut self.lp_arena);
162        let expr_arena = std::mem::take(&mut self.expr_arena);
163        res.set_cached_arena(lp_arena, expr_arena);
164
165        // Every execution should clear the statement-level maps.
166        self.cte_map.borrow_mut().clear();
167        self.table_aliases.borrow_mut().clear();
168        self.joined_aliases.borrow_mut().clear();
169
170        Ok(res)
171    }
172
173    /// add a function registry to the SQLContext
174    /// the registry provides the ability to add custom functions to the SQLContext
175    pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
176        self.function_registry = function_registry;
177        self
178    }
179
180    /// Get the function registry of the SQLContext
181    pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
182        &self.function_registry
183    }
184
185    /// Get a mutable reference to the function registry of the SQLContext
186    pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
187        Arc::get_mut(&mut self.function_registry).unwrap()
188    }
189}
190
191impl SQLContext {
192    pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
193        let ast = stmt;
194        Ok(match ast {
195            Statement::Query(query) => self.execute_query(query)?,
196            stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
197            stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
198            stmt @ Statement::Drop {
199                object_type: ObjectType::Table,
200                ..
201            } => self.execute_drop_table(stmt)?,
202            stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
203            stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
204            stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
205            _ => polars_bail!(
206                SQLInterface: "statement type is not supported:\n{:?}", ast,
207            ),
208        })
209    }
210
211    pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
212        self.register_ctes(query)?;
213        self.execute_query_no_ctes(query)
214    }
215
216    pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
217        let lf = self.process_query(&query.body, query)?;
218        self.process_limit_offset(lf, &query.limit, &query.offset)
219    }
220
221    pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
222        frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
223    }
224
225    pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
226        let table = self.table_map.get(name).cloned();
227        table
228            .or_else(|| self.cte_map.borrow().get(name).cloned())
229            .or_else(|| {
230                self.table_aliases
231                    .borrow()
232                    .get(name)
233                    .and_then(|alias| self.table_map.get(alias).cloned())
234            })
235    }
236
237    fn expr_or_ordinal(
238        &mut self,
239        e: &SQLExpr,
240        exprs: &[Expr],
241        selected: Option<&[Expr]>,
242        schema: Option<&Schema>,
243        clause: &str,
244    ) -> PolarsResult<Expr> {
245        match e {
246            SQLExpr::UnaryOp {
247                op: UnaryOperator::Minus,
248                expr,
249            } if matches!(**expr, SQLExpr::Value(SQLValue::Number(_, _))) => {
250                if let SQLExpr::Value(SQLValue::Number(ref idx, _)) = **expr {
251                    Err(polars_err!(
252                    SQLSyntax:
253                    "negative ordinal values are invalid for {}; found -{}",
254                    clause,
255                    idx
256                    ))
257                } else {
258                    unreachable!()
259                }
260            },
261            SQLExpr::Value(SQLValue::Number(idx, _)) => {
262                // note: sql queries are 1-indexed
263                let idx = idx.parse::<usize>().map_err(|_| {
264                    polars_err!(
265                        SQLSyntax:
266                        "negative ordinal values are invalid for {}; found {}",
267                        clause,
268                        idx
269                    )
270                })?;
271                // note: "selected" cols represent final projection order, so we use those for
272                // ordinal resolution. "exprs" may include cols that are subsequently dropped.
273                let cols = if let Some(cols) = selected {
274                    cols
275                } else {
276                    exprs
277                };
278                Ok(cols
279                    .get(idx - 1)
280                    .ok_or_else(|| {
281                        polars_err!(
282                            SQLInterface:
283                            "{} ordinal value must refer to a valid column; found {}",
284                            clause,
285                            idx
286                        )
287                    })?
288                    .clone())
289            },
290            SQLExpr::Value(v) => Err(polars_err!(
291                SQLSyntax:
292                "{} requires a valid expression or positive ordinal; found {}", clause, v,
293            )),
294            _ => parse_sql_expr(e, self, schema),
295        }
296    }
297
298    pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
299        if self.joined_aliases.borrow().contains_key(tbl_name) {
300            self.joined_aliases
301                .borrow()
302                .get(tbl_name)
303                .and_then(|aliases| aliases.get(column_name))
304                .cloned()
305                .unwrap_or_else(|| column_name.to_string())
306        } else {
307            column_name.to_string()
308        }
309    }
310
311    fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
312        match expr {
313            SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
314            SetExpr::Query(query) => self.execute_query_no_ctes(query),
315            SetExpr::SetOperation {
316                op: SetOperator::Union,
317                set_quantifier,
318                left,
319                right,
320            } => self.process_union(left, right, set_quantifier, query),
321
322            #[cfg(feature = "semi_anti_join")]
323            SetExpr::SetOperation {
324                op: SetOperator::Intersect | SetOperator::Except,
325                set_quantifier,
326                left,
327                right,
328            } => self.process_except_intersect(left, right, set_quantifier, query),
329
330            SetExpr::Values(Values {
331                explicit_row: _,
332                rows,
333            }) => self.process_values(rows),
334
335            SetExpr::Table(tbl) => {
336                if tbl.table_name.is_some() {
337                    let table_name = tbl.table_name.as_ref().unwrap();
338                    self.get_table_from_current_scope(table_name)
339                        .ok_or_else(|| {
340                            polars_err!(
341                                SQLInterface: "no table or alias named '{}' found",
342                                tbl
343                            )
344                        })
345                } else {
346                    polars_bail!(SQLInterface: "'TABLE' requires valid table name")
347                }
348            },
349            op => {
350                polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
351            },
352        }
353    }
354
    /// Execute an EXCEPT or INTERSECT set operation as an anti/semi join of
    /// `left` against `right` (nulls compare equal), followed by a `unique`.
    ///
    /// With `BY NAME` the join keys are the left frame's column names; otherwise
    /// both sides must have the same number of columns and are matched
    /// positionally (left columns against right columns).
    #[cfg(feature = "semi_anti_join")]
    fn process_except_intersect(
        &mut self,
        left: &SetExpr,
        right: &SetExpr,
        quantifier: &SetQuantifier,
        query: &Query,
    ) -> PolarsResult<LazyFrame> {
        // NOTE(review): the operator is derived from `query.body` (the outermost set
        // expression of the query) rather than from the expression being processed;
        // nested, mixed EXCEPT/INTERSECT operations may therefore pick the wrong
        // join type -- confirm.
        let (join_type, op_name) = match *query.body {
            SetExpr::SetOperation {
                op: SetOperator::Except,
                ..
            } => (JoinType::Anti, "EXCEPT"),
            _ => (JoinType::Semi, "INTERSECT"),
        };
        let mut lf = self.process_query(left, query)?;
        let mut rf = self.process_query(right, query)?;
        let join = lf
            .clone()
            .join_builder()
            .with(rf.clone())
            .how(join_type)
            .join_nulls(true);

        let lf_schema = self.get_frame_schema(&mut lf)?;
        let lf_cols: Vec<_> = lf_schema.iter_names().map(|nm| col(nm.clone())).collect();
        let joined_tbl = match quantifier {
            // BY NAME: join on the left frame's column names
            SetQuantifier::ByName => join.on(lf_cols).finish(),
            // positional: column counts must agree
            SetQuantifier::Distinct | SetQuantifier::None => {
                let rf_schema = self.get_frame_schema(&mut rf)?;
                let rf_cols: Vec<_> = rf_schema.iter_names().map(|nm| col(nm.clone())).collect();
                if lf_cols.len() != rf_cols.len() {
                    polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
                }
                join.left_on(lf_cols).right_on(rf_cols).finish()
            },
            _ => {
                polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
            },
        };
        Ok(joined_tbl.unique(None, UniqueKeepStrategy::Any))
    }
397
398    fn process_union(
399        &mut self,
400        left: &SetExpr,
401        right: &SetExpr,
402        quantifier: &SetQuantifier,
403        query: &Query,
404    ) -> PolarsResult<LazyFrame> {
405        let mut lf = self.process_query(left, query)?;
406        let mut rf = self.process_query(right, query)?;
407        let opts = UnionArgs {
408            parallel: true,
409            to_supertypes: true,
410            ..Default::default()
411        };
412        match quantifier {
413            // UNION [ALL | DISTINCT]
414            SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
415                let lf_schema = self.get_frame_schema(&mut lf)?;
416                let rf_schema = self.get_frame_schema(&mut rf)?;
417                if lf_schema.len() != rf_schema.len() {
418                    polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
419                }
420                let concatenated = polars_lazy::dsl::concat(vec![lf, rf], opts);
421                match quantifier {
422                    SetQuantifier::Distinct | SetQuantifier::None => {
423                        concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
424                    },
425                    _ => concatenated,
426                }
427            },
428            // UNION ALL BY NAME
429            #[cfg(feature = "diagonal_concat")]
430            SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
431            // UNION [DISTINCT] BY NAME
432            #[cfg(feature = "diagonal_concat")]
433            SetQuantifier::ByName | SetQuantifier::DistinctByName => {
434                let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
435                concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
436            },
437            #[allow(unreachable_patterns)]
438            _ => polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier),
439        }
440    }
441
442    fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
443        let frame_rows: Vec<Row> = values.iter().map(|row| {
444            let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
445                let expr = parse_sql_expr(expr, self, None)?;
446                match expr {
447                    Expr::Literal(value) => {
448                        value.to_any_value()
449                            .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
450                            .map(|av| av.into_static())
451                    },
452                    _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
453                }
454            }).collect();
455            row_data.map(Row::new)
456        }).collect::<Result<_, _>>()?;
457
458        Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
459    }
460
461    // EXPLAIN SELECT * FROM DF
462    fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
463        match stmt {
464            Statement::Explain { statement, .. } => {
465                let lf = self.execute_statement(statement)?;
466                let plan = lf.describe_optimized_plan()?;
467                let plan = plan
468                    .split('\n')
469                    .collect::<Series>()
470                    .with_name(PlSmallStr::from_static("Logical Plan"))
471                    .into_column();
472                let df = DataFrame::new(vec![plan])?;
473                Ok(df.lazy())
474            },
475            _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
476        }
477    }
478
479    // SHOW TABLES
480    fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
481        let tables = Column::new("name".into(), self.get_tables());
482        let df = DataFrame::new(vec![tables])?;
483        Ok(df.lazy())
484    }
485
486    // DROP TABLE <tbl>
487    fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
488        match stmt {
489            Statement::Drop { names, .. } => {
490                names.iter().for_each(|name| {
491                    self.table_map.remove(&name.to_string());
492                });
493                Ok(DataFrame::empty().lazy())
494            },
495            _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
496        }
497    }
498
499    // DELETE FROM <tbl> [WHERE ...]
500    fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
501        if let Statement::Delete(Delete {
502            tables,
503            from,
504            using,
505            selection,
506            returning,
507            order_by,
508            limit,
509        }) = stmt
510        {
511            if !tables.is_empty()
512                || using.is_some()
513                || returning.is_some()
514                || limit.is_some()
515                || !order_by.is_empty()
516            {
517                let error_message = match () {
518                    _ if !tables.is_empty() => "DELETE expects exactly one table name",
519                    _ if using.is_some() => "DELETE does not support the USING clause",
520                    _ if returning.is_some() => "DELETE does not support the RETURNING clause",
521                    _ if limit.is_some() => "DELETE does not support the LIMIT clause",
522                    _ if !order_by.is_empty() => "DELETE does not support the ORDER BY clause",
523                    _ => unreachable!(),
524                };
525                polars_bail!(SQLInterface: error_message);
526            }
527            let from_tables = match &from {
528                FromTable::WithFromKeyword(from) => from,
529                FromTable::WithoutKeyword(from) => from,
530            };
531            if from_tables.len() > 1 {
532                polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
533            }
534            let tbl_expr = from_tables.first().unwrap();
535            if !tbl_expr.joins.is_empty() {
536                polars_bail!(SQLInterface: "DELETE does not support table JOINs")
537            }
538            let (_, mut lf) = self.get_table(&tbl_expr.relation)?;
539            if selection.is_none() {
540                // no WHERE clause; equivalent to TRUNCATE (drop all rows)
541                Ok(DataFrame::empty_with_schema(
542                    lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
543                        .unwrap()
544                        .as_ref(),
545                )
546                .lazy())
547            } else {
548                // apply constraint as inverted filter (drops rows matching the selection)
549                Ok(self.process_where(lf.clone(), selection, true)?)
550            }
551        } else {
552            polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
553        }
554    }
555
556    // TRUNCATE <tbl>
557    fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
558        if let Statement::Truncate {
559            table_names,
560            partitions,
561            ..
562        } = stmt
563        {
564            match partitions {
565                None => {
566                    if table_names.len() != 1 {
567                        polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
568                    }
569                    let tbl = table_names[0].to_string();
570                    if let Some(lf) = self.table_map.get_mut(&tbl) {
571                        *lf = DataFrame::empty_with_schema(
572                            lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
573                                .unwrap()
574                                .as_ref(),
575                        )
576                        .lazy();
577                        Ok(lf.clone())
578                    } else {
579                        polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
580                    }
581                },
582                _ => {
583                    polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
584                },
585            }
586        } else {
587            polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
588        }
589    }
590
591    fn register_cte(&mut self, name: &str, lf: LazyFrame) {
592        self.cte_map.borrow_mut().insert(name.to_owned(), lf);
593    }
594
595    fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
596        if let Some(with) = &query.with {
597            if with.recursive {
598                polars_bail!(SQLInterface: "recursive CTEs are not supported")
599            }
600            for cte in &with.cte_tables {
601                let cte_name = cte.alias.name.value.clone();
602                let mut lf = self.execute_query(&cte.query)?;
603                lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
604                self.register_cte(&cte_name, lf);
605            }
606        }
607        Ok(())
608    }
609
    /// execute the 'FROM' part of the query
    ///
    /// Resolves the base relation, then applies each JOIN in order, folding the
    /// result back into the left frame. After every join, right-hand columns that
    /// also exist on the left and appear in the joined frame under the aliased
    /// name `"{column}:{right_table}"` are recorded in `joined_aliases` so they
    /// can be resolved later.
    fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
        let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
        if !tbl_expr.joins.is_empty() {
            for join in &tbl_expr.joins {
                let (r_name, mut rf) = self.get_table(&join.relation)?;
                if r_name.is_empty() {
                    // Require non-empty to avoid duplicate column errors from nested self-joins.
                    polars_bail!(
                        SQLInterface:
                        "cannot join on unnamed relation; please provide an alias"
                    )
                }
                // schemas are captured before the join, as `lf` is replaced by the joined frame
                let left_schema = self.get_frame_schema(&mut lf)?;
                let right_schema = self.get_frame_schema(&mut rf)?;

                lf = match &join.join_operator {
                    op @ (JoinOperator::FullOuter(constraint)
                    | JoinOperator::LeftOuter(constraint)
                    | JoinOperator::RightOuter(constraint)
                    | JoinOperator::Inner(constraint)
                    | JoinOperator::Anti(constraint)
                    | JoinOperator::Semi(constraint)
                    | JoinOperator::LeftAnti(constraint)
                    | JoinOperator::LeftSemi(constraint)
                    | JoinOperator::RightAnti(constraint)
                    | JoinOperator::RightSemi(constraint)) => {
                        // RIGHT ANTI/SEMI are expressed by swapping the two frames.
                        // NOTE(review): only the frames are swapped; the `name` and
                        // `schema` passed below still pair `l_name`/`left_schema` with
                        // the first frame -- confirm this is intended.
                        let (lf, rf) = match op {
                            JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
                            _ => (lf, rf),
                        };
                        self.process_join(
                            &TableInfo {
                                frame: lf,
                                name: (&l_name).into(),
                                schema: left_schema.clone(),
                            },
                            &TableInfo {
                                frame: rf,
                                name: (&r_name).into(),
                                schema: right_schema.clone(),
                            },
                            constraint,
                            // map the SQL join operator onto the corresponding join type
                            match op {
                                JoinOperator::FullOuter(_) => JoinType::Full,
                                JoinOperator::LeftOuter(_) => JoinType::Left,
                                JoinOperator::RightOuter(_) => JoinType::Right,
                                JoinOperator::Inner(_) => JoinType::Inner,
                                #[cfg(feature = "semi_anti_join")]
                                JoinOperator::Anti(_)
                                | JoinOperator::LeftAnti(_)
                                | JoinOperator::RightAnti(_) => JoinType::Anti,
                                #[cfg(feature = "semi_anti_join")]
                                JoinOperator::Semi(_)
                                | JoinOperator::LeftSemi(_)
                                | JoinOperator::RightSemi(_) => JoinType::Semi,
                                // reached for anti/semi joins when "semi_anti_join" is disabled
                                join_type => polars_bail!(
                                    SQLInterface:
                                    "join type '{:?}' not currently supported",
                                    join_type
                                ),
                            },
                        )?
                    },
                    JoinOperator::CrossJoin => {
                        // clashing right-hand columns get the suffix ":{r_name}"
                        lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
                    },
                    join_type => {
                        polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
                    },
                };

                // track join-aliased columns so we can resolve them later
                let joined_schema = self.get_frame_schema(&mut lf)?;

                self.joined_aliases.borrow_mut().insert(
                    r_name.clone(),
                    right_schema
                        .iter_names()
                        .filter_map(|name| {
                            // col exists in both tables and is aliased in the joined result
                            let aliased_name = format!("{name}:{r_name}");
                            if left_schema.contains(name)
                                && joined_schema.contains(aliased_name.as_str())
                            {
                                Some((name.to_string(), aliased_name))
                            } else {
                                None
                            }
                        })
                        .collect::<PlHashMap<String, String>>(),
                );
            }
        };
        Ok(lf)
    }
706
707    /// Execute the 'SELECT' part of the query.
708    fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
709        let mut lf = if select_stmt.from.is_empty() {
710            DataFrame::empty().lazy()
711        } else {
712            // Note: implicit joins need more work to support properly,
713            // explicit joins are preferred for now (ref: #16662)
714            let from = select_stmt.clone().from;
715            if from.len() > 1 {
716                polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
717            }
718            self.execute_from_statement(from.first().unwrap())?
719        };
720
721        // Filter expression (WHERE clause)
722        let schema = self.get_frame_schema(&mut lf)?;
723        lf = self.process_where(lf, &select_stmt.selection, false)?;
724
725        // 'SELECT *' modifiers
726        let mut select_modifiers = SelectModifiers {
727            ilike: None,
728            exclude: PlHashSet::new(),
729            rename: PlHashMap::new(),
730            replace: vec![],
731        };
732
733        let projections = self.column_projections(select_stmt, &schema, &mut select_modifiers)?;
734
735        // Check for "GROUP BY ..." (after determining projections)
736        let mut group_by_keys: Vec<Expr> = Vec::new();
737        match &select_stmt.group_by {
738            // Standard "GROUP BY x, y, z" syntax (also recognising ordinal values)
739            GroupByExpr::Expressions(group_by_exprs, modifiers) => {
740                if !modifiers.is_empty() {
741                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
742                }
743                // translate the group expressions, allowing ordinal values
744                group_by_keys = group_by_exprs
745                    .iter()
746                    .map(|e| {
747                        self.expr_or_ordinal(
748                            e,
749                            &projections,
750                            None,
751                            Some(schema.deref()),
752                            "GROUP BY",
753                        )
754                    })
755                    .collect::<PolarsResult<_>>()?
756            },
757            // "GROUP BY ALL" syntax; automatically adds expressions that do not contain
758            // nested agg/window funcs to the group key (also ignores literals).
759            GroupByExpr::All(modifiers) => {
760                if !modifiers.is_empty() {
761                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
762                }
763                projections.iter().for_each(|expr| match expr {
764                    // immediately match the most common cases (col|agg|len|lit, optionally aliased).
765                    Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
766                    Expr::Column(_) => group_by_keys.push(expr.clone()),
767                    Expr::Alias(e, _)
768                        if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
769                    Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
770                        if let Expr::Column(name) = &**e {
771                            group_by_keys.push(col(name.clone()));
772                        }
773                    },
774                    _ => {
775                        // If not quick-matched, add if no nested agg/window expressions
776                        if !has_expr(expr, |e| {
777                            matches!(e, Expr::Agg(_))
778                                || matches!(e, Expr::Len)
779                                || matches!(e, Expr::Window { .. })
780                        }) {
781                            group_by_keys.push(expr.clone())
782                        }
783                    },
784                });
785            },
786        };
787
788        lf = if group_by_keys.is_empty() {
789            // The 'having' clause is only valid inside 'group by'
790            if select_stmt.having.is_some() {
791                polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
792            };
793
794            // Final/selected cols, accounting for 'SELECT *' modifiers
795            let mut retained_cols = Vec::with_capacity(projections.len());
796            let mut retained_names = Vec::with_capacity(projections.len());
797            let have_order_by = query.order_by.is_some();
798            // Initialize containing InheritsContext to handle empty projection case.
799            let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;
800
801            // Note: if there is an 'order by' then we project everything (original cols
802            // and new projections) and *then* select the final cols; the retained cols
803            // are used to ensure a correct final projection. If there's no 'order by',
804            // clause then we can project the final column *expressions* directly.
805            for p in projections.iter() {
806                let name = p
807                    .to_field(schema.deref(), Context::Default)?
808                    .name
809                    .to_string();
810                if select_modifiers.matches_ilike(&name)
811                    && !select_modifiers.exclude.contains(&name)
812                {
813                    projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);
814
815                    retained_cols.push(if have_order_by {
816                        col(name.as_str())
817                    } else {
818                        p.clone()
819                    });
820                    retained_names.push(col(name));
821                }
822            }
823
824            // Apply the remaining modifiers and establish the final projection
825            if have_order_by {
826                // We can safely use `with_columns()` and avoid a join if:
827                // * There is already a projection that projects to the table height.
828                // * All projection heights inherit from context (e.g. all scalar literals that
829                //   are to be broadcasted to table height).
830                if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
831                    || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
832                {
833                    lf = lf.with_columns(projections);
834                } else {
835                    // We hit this branch if the output height is not guaranteed to match the table
836                    // height. E.g.:
837                    //
838                    // * SELECT COUNT(*) FROM df ORDER BY sort_key;
839                    // * SELECT UNNEST(list_col) FROM df ORDER BY sort_key;
840                    //
841                    // For these cases we truncate / extend the sorting columns with NULLs to match
842                    // the output height. We do this by projecting independently and then joining
843                    // back the original frame on the row index.
844                    const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
845                    lf = lf
846                        .clone()
847                        .select(projections)
848                        .with_row_index(NAME, None)
849                        .join(
850                            lf.with_row_index(NAME, None),
851                            [col(NAME)],
852                            [col(NAME)],
853                            JoinArgs {
854                                how: JoinType::Left,
855                                validation: Default::default(),
856                                suffix: None,
857                                slice: None,
858                                nulls_equal: false,
859                                coalesce: Default::default(),
860                                maintain_order: polars_ops::frame::MaintainOrderJoin::Left,
861                            },
862                        );
863                }
864            }
865
866            if !select_modifiers.replace.is_empty() {
867                lf = lf.with_columns(&select_modifiers.replace);
868            }
869            if !select_modifiers.rename.is_empty() {
870                lf = lf.with_columns(select_modifiers.renamed_cols());
871            }
872
873            lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;
874
875            // Note: If `have_order_by`, with_columns is already done above.
876            if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
877                && !have_order_by
878            {
879                // All projections need to be broadcasted to table height, so evaluate in `with_columns()`
880                lf = lf.with_columns(retained_cols).select(retained_names);
881            } else {
882                lf = lf.select(retained_cols);
883            }
884
885            if !select_modifiers.rename.is_empty() {
886                lf = lf.rename(
887                    select_modifiers.rename.keys(),
888                    select_modifiers.rename.values(),
889                    true,
890                );
891            };
892            lf
893        } else {
894            lf = self.process_group_by(lf, &group_by_keys, &projections)?;
895            lf = self.process_order_by(lf, &query.order_by, None)?;
896
897            // Apply optional 'having' clause, post-aggregation.
898            let schema = Some(self.get_frame_schema(&mut lf)?);
899            match select_stmt.having.as_ref() {
900                Some(expr) => lf.filter(parse_sql_expr(expr, self, schema.as_deref())?),
901                None => lf,
902            }
903        };
904
905        // Apply optional DISTINCT clause.
906        lf = match &select_stmt.distinct {
907            Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
908            Some(Distinct::On(exprs)) => {
909                // TODO: support exprs in `unique` see https://github.com/pola-rs/polars/issues/5760
910                let schema = Some(self.get_frame_schema(&mut lf)?);
911                let cols = exprs
912                    .iter()
913                    .map(|e| {
914                        let expr = parse_sql_expr(e, self, schema.as_deref())?;
915                        if let Expr::Column(name) = expr {
916                            Ok(name)
917                        } else {
918                            Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
919                        }
920                    })
921                    .collect::<PolarsResult<Vec<_>>>()?;
922
923                // DISTINCT ON has to apply the ORDER BY before the operation.
924                lf = self.process_order_by(lf, &query.order_by, None)?;
925                return Ok(lf.unique_stable(
926                    Some(Selector::ByName {
927                        names: cols.into(),
928                        strict: true,
929                    }),
930                    UniqueKeepStrategy::First,
931                ));
932            },
933            None => lf,
934        };
935        Ok(lf)
936    }
937
938    fn column_projections(
939        &mut self,
940        select_stmt: &Select,
941        schema: &SchemaRef,
942        select_modifiers: &mut SelectModifiers,
943    ) -> PolarsResult<Vec<Expr>> {
944        let parsed_items: PolarsResult<Vec<Vec<Expr>>> = select_stmt
945            .projection
946            .iter()
947            .map(|select_item| match select_item {
948                SelectItem::UnnamedExpr(expr) => {
949                    Ok(vec![parse_sql_expr(expr, self, Some(schema))?])
950                },
951                SelectItem::ExprWithAlias { expr, alias } => {
952                    let expr = parse_sql_expr(expr, self, Some(schema))?;
953                    Ok(vec![expr.alias(PlSmallStr::from_str(alias.value.as_str()))])
954                },
955                SelectItem::QualifiedWildcard(obj_name, wildcard_options) => self
956                    .process_qualified_wildcard(
957                        obj_name,
958                        wildcard_options,
959                        select_modifiers,
960                        Some(schema),
961                    ),
962                SelectItem::Wildcard(wildcard_options) => {
963                    let cols = schema
964                        .iter_names()
965                        .map(|name| col(name.clone()))
966                        .collect::<Vec<_>>();
967
968                    self.process_wildcard_additional_options(
969                        cols,
970                        wildcard_options,
971                        select_modifiers,
972                        Some(schema),
973                    )
974                },
975            })
976            .collect();
977
978        let flattened_exprs: Vec<Expr> = parsed_items?
979            .into_iter()
980            .flatten()
981            .flat_map(|expr| expand_exprs(expr, schema))
982            .collect();
983
984        Ok(flattened_exprs)
985    }
986
987    fn process_where(
988        &mut self,
989        mut lf: LazyFrame,
990        expr: &Option<SQLExpr>,
991        invert_filter: bool,
992    ) -> PolarsResult<LazyFrame> {
993        if let Some(expr) = expr {
994            let schema = self.get_frame_schema(&mut lf)?;
995
996            // shortcut filter evaluation if given expression is just TRUE or FALSE
997            let (all_true, all_false) = match expr {
998                SQLExpr::Value(SQLValue::Boolean(b)) => (*b, !*b),
999                SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
1000                    (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => (a == b, a != b),
1001                    (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
1002                        (a != b, a == b)
1003                    },
1004                    _ => (false, false),
1005                },
1006                _ => (false, false),
1007            };
1008            if (all_true && !invert_filter) || (all_false && invert_filter) {
1009                return Ok(lf);
1010            } else if (all_false && !invert_filter) || (all_true && invert_filter) {
1011                return Ok(DataFrame::empty_with_schema(schema.as_ref()).lazy());
1012            }
1013
1014            // ...otherwise parse and apply the filter as normal
1015            let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1016            if filter_expression.clone().meta().has_multiple_outputs() {
1017                filter_expression = all_horizontal([filter_expression])?;
1018            }
1019            lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1020            lf = if invert_filter {
1021                lf.remove(filter_expression)
1022            } else {
1023                lf.filter(filter_expression)
1024            };
1025        }
1026        Ok(lf)
1027    }
1028
1029    pub(super) fn process_join(
1030        &mut self,
1031        tbl_left: &TableInfo,
1032        tbl_right: &TableInfo,
1033        constraint: &JoinConstraint,
1034        join_type: JoinType,
1035    ) -> PolarsResult<LazyFrame> {
1036        let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right)?;
1037
1038        let joined = tbl_left
1039            .frame
1040            .clone()
1041            .join_builder()
1042            .with(tbl_right.frame.clone())
1043            .left_on(left_on)
1044            .right_on(right_on)
1045            .how(join_type)
1046            .suffix(format!(":{}", tbl_right.name))
1047            .coalesce(JoinCoalesce::KeepColumns)
1048            .finish();
1049
1050        Ok(joined)
1051    }
1052
1053    fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
1054        let mut contexts = vec![];
1055        for expr in exprs {
1056            *expr = expr.clone().map_expr(|e| match e {
1057                Expr::SubPlan(lp, names) => {
1058                    contexts.push(<LazyFrame>::from((**lp).clone()));
1059                    if names.len() == 1 {
1060                        Expr::Column(names[0].as_str().into())
1061                    } else {
1062                        Expr::SubPlan(lp, names)
1063                    }
1064                },
1065                e => e,
1066            })
1067        }
1068
1069        if contexts.is_empty() {
1070            lf
1071        } else {
1072            lf.with_context(contexts)
1073        }
1074    }
1075
1076    fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1077        if let Statement::CreateTable(CreateTable {
1078            if_not_exists,
1079            name,
1080            query,
1081            ..
1082        }) = stmt
1083        {
1084            let tbl_name = name.0.first().unwrap().value.as_str();
1085            // CREATE TABLE IF NOT EXISTS
1086            if *if_not_exists && self.table_map.contains_key(tbl_name) {
1087                polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1088                // CREATE OR REPLACE TABLE
1089            }
1090            if let Some(query) = query {
1091                let lf = self.execute_query(query)?;
1092                self.register(tbl_name, lf);
1093                let out = df! {
1094                    "Response" => ["CREATE TABLE"]
1095                }
1096                .unwrap()
1097                .lazy();
1098                Ok(out)
1099            } else {
1100                polars_bail!(SQLInterface: "only `CREATE TABLE AS SELECT ...` is currently supported");
1101            }
1102        } else {
1103            unreachable!()
1104        }
1105    }
1106
1107    fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
1108        match relation {
1109            TableFactor::Table {
1110                name, alias, args, ..
1111            } => {
1112                if let Some(args) = args {
1113                    return self.execute_table_function(name, alias, &args.args);
1114                }
1115                let tbl_name = name.0.first().unwrap().value.as_str();
1116                if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
1117                    match alias {
1118                        Some(alias) => {
1119                            self.table_aliases
1120                                .borrow_mut()
1121                                .insert(alias.name.value.clone(), tbl_name.to_string());
1122                            Ok((alias.to_string(), lf))
1123                        },
1124                        None => Ok((tbl_name.to_string(), lf)),
1125                    }
1126                } else {
1127                    polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1128                }
1129            },
1130            TableFactor::Derived {
1131                lateral,
1132                subquery,
1133                alias,
1134            } => {
1135                polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1136                if let Some(alias) = alias {
1137                    let mut lf = self.execute_query_no_ctes(subquery)?;
1138                    lf = self.rename_columns_from_table_alias(lf, alias)?;
1139                    self.table_map.insert(alias.name.value.clone(), lf.clone());
1140                    Ok((alias.name.value.clone(), lf))
1141                } else {
1142                    polars_bail!(SQLSyntax: "derived tables must have aliases");
1143                }
1144            },
1145            TableFactor::UNNEST {
1146                alias,
1147                array_exprs,
1148                with_offset,
1149                with_offset_alias: _,
1150                ..
1151            } => {
1152                if let Some(alias) = alias {
1153                    let column_names: Vec<Option<PlSmallStr>> = alias
1154                        .columns
1155                        .iter()
1156                        .map(|c| {
1157                            if c.name.value.is_empty() {
1158                                None
1159                            } else {
1160                                Some(PlSmallStr::from_str(c.name.value.as_str()))
1161                            }
1162                        })
1163                        .collect();
1164
1165                    let column_values: Vec<Series> = array_exprs
1166                        .iter()
1167                        .map(|arr| parse_sql_array(arr, self))
1168                        .collect::<Result<_, _>>()?;
1169
1170                    polars_ensure!(!column_names.is_empty(),
1171                        SQLSyntax:
1172                        "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1173                    );
1174                    if column_names.len() != column_values.len() {
1175                        let plural = if column_values.len() > 1 { "s" } else { "" };
1176                        polars_bail!(
1177                            SQLSyntax:
1178                            "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
1179                        );
1180                    }
1181                    let column_series: Vec<Column> = column_values
1182                        .into_iter()
1183                        .zip(column_names)
1184                        .map(|(s, name)| {
1185                            if let Some(name) = name {
1186                                s.with_name(name)
1187                            } else {
1188                                s
1189                            }
1190                        })
1191                        .map(Column::from)
1192                        .collect();
1193
1194                    let lf = DataFrame::new(column_series)?.lazy();
1195
1196                    if *with_offset {
1197                        // TODO: support 'WITH ORDINALITY|OFFSET' modifier.
1198                        //  (note that 'WITH OFFSET' is BigQuery-specific syntax, not PostgreSQL)
1199                        polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
1200                    }
1201                    let table_name = alias.name.value.clone();
1202                    self.table_map.insert(table_name.clone(), lf.clone());
1203                    Ok((table_name, lf))
1204                } else {
1205                    polars_bail!(SQLSyntax: "UNNEST table must have an alias");
1206                }
1207            },
1208            TableFactor::NestedJoin {
1209                table_with_joins,
1210                alias,
1211            } => {
1212                let lf = self.execute_from_statement(table_with_joins)?;
1213                match alias {
1214                    Some(a) => Ok((a.name.value.clone(), lf)),
1215                    None => Ok(("".to_string(), lf)),
1216                }
1217            },
1218            // Support bare table, optionally with an alias, for now
1219            _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
1220        }
1221    }
1222
1223    fn execute_table_function(
1224        &mut self,
1225        name: &ObjectName,
1226        alias: &Option<TableAlias>,
1227        args: &[FunctionArg],
1228    ) -> PolarsResult<(String, LazyFrame)> {
1229        let tbl_fn = name.0.first().unwrap().value.as_str();
1230        let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1231        let (tbl_name, lf) = read_fn.execute(args)?;
1232        #[allow(clippy::useless_asref)]
1233        let tbl_name = alias
1234            .as_ref()
1235            .map(|a| a.name.value.clone())
1236            .unwrap_or_else(|| tbl_name.to_str().to_string());
1237
1238        self.table_map.insert(tbl_name.clone(), lf.clone());
1239        Ok((tbl_name, lf))
1240    }
1241
1242    fn process_order_by(
1243        &mut self,
1244        mut lf: LazyFrame,
1245        order_by: &Option<OrderBy>,
1246        selected: Option<&[Expr]>,
1247    ) -> PolarsResult<LazyFrame> {
1248        if order_by.as_ref().is_none_or(|ob| ob.exprs.is_empty()) {
1249            return Ok(lf);
1250        }
1251        let schema = self.get_frame_schema(&mut lf)?;
1252        let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1253
1254        let order_by = order_by.as_ref().unwrap().exprs.clone();
1255        let mut descending = Vec::with_capacity(order_by.len());
1256        let mut nulls_last = Vec::with_capacity(order_by.len());
1257        let mut by: Vec<Expr> = Vec::with_capacity(order_by.len());
1258
1259        if order_by.len() == 1  // support `ORDER BY ALL` (iff there is no column named 'ALL' in the schema)
1260            && matches!(&order_by[0].expr, SQLExpr::Identifier(ident) if ident.value.to_uppercase() == "ALL" && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1261        {
1262            if let Some(selected) = selected {
1263                by.extend(selected.iter().cloned());
1264            } else {
1265                by.extend(columns_iter);
1266            };
1267            let desc_order = !order_by[0].asc.unwrap_or(true);
1268            nulls_last.resize(by.len(), !order_by[0].nulls_first.unwrap_or(desc_order));
1269            descending.resize(by.len(), desc_order);
1270        } else {
1271            let columns = &columns_iter.collect::<Vec<_>>();
1272            for ob in order_by {
1273                // note: if not specified 'NULLS FIRST' is default for DESC, 'NULLS LAST' otherwise
1274                // https://www.postgresql.org/docs/current/queries-order.html
1275                let desc_order = !ob.asc.unwrap_or(true);
1276                nulls_last.push(!ob.nulls_first.unwrap_or(desc_order));
1277                descending.push(desc_order);
1278
1279                // translate order expression, allowing ordinal values
1280                by.push(self.expr_or_ordinal(
1281                    &ob.expr,
1282                    columns,
1283                    selected,
1284                    Some(&schema),
1285                    "ORDER BY",
1286                )?)
1287            }
1288        }
1289        Ok(lf.sort_by_exprs(
1290            &by,
1291            SortMultipleOptions::default()
1292                .with_order_descending_multi(descending)
1293                .with_nulls_last_multi(nulls_last)
1294                .with_maintain_order(true),
1295        ))
1296    }
1297
1298    fn process_group_by(
1299        &mut self,
1300        mut lf: LazyFrame,
1301        group_by_keys: &[Expr],
1302        projections: &[Expr],
1303    ) -> PolarsResult<LazyFrame> {
1304        let schema_before = self.get_frame_schema(&mut lf)?;
1305        let group_by_keys_schema =
1306            expressions_to_schema(group_by_keys, &schema_before, Context::Default)?;
1307
1308        // Remove the group_by keys as polars adds those implicitly.
1309        let mut aggregation_projection = Vec::with_capacity(projections.len());
1310        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
1311        let mut projection_aliases = PlHashSet::new();
1312        let mut group_key_aliases = PlHashSet::new();
1313
1314        for mut e in projections {
1315            // `Len` represents COUNT(*) so we treat as an aggregation here.
1316            let is_agg_or_window = has_expr(e, |e| {
1317                matches!(e, Expr::Agg(_) | Expr::Len | Expr::Window { .. })
1318            });
1319
1320            let mut is_function_under_alias = false;
1321
1322            // Note: if simple aliased expression we defer aliasing until after the group_by.
1323            if let Expr::Alias(expr, alias) = e {
1324                if e.clone().meta().is_simple_projection(Some(&schema_before)) {
1325                    group_key_aliases.insert(alias.as_ref());
1326                    e = expr
1327                } else if let Expr::Function {
1328                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
1329                    ..
1330                } = expr.deref()
1331                {
1332                    projection_overrides
1333                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
1334                } else if let Expr::Function { .. } = expr.deref() {
1335                    is_function_under_alias = true;
1336                } else if !is_agg_or_window && !group_by_keys_schema.contains(alias) {
1337                    projection_aliases.insert(alias.as_ref());
1338                }
1339            }
1340            let field = e.to_field(&schema_before, Context::Default)?;
1341            if group_by_keys_schema.get(&field.name).is_none() && is_agg_or_window {
1342                let mut e = e.clone();
1343                if let Expr::Agg(AggExpr::Implode(expr)) = &e {
1344                    e = (**expr).clone();
1345                } else if let Expr::Alias(expr, name) = &e {
1346                    if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
1347                        e = (**expr).clone().alias(name.clone());
1348                    }
1349                }
1350                aggregation_projection.push(e);
1351            } else if let Expr::Column(_)
1352            | Expr::Function {
1353                function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
1354                ..
1355            } = e
1356            {
1357                // Non-aggregated columns must be part of the GROUP BY clause
1358                if !group_by_keys_schema.contains(&field.name) {
1359                    polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
1360                }
1361            } else if is_function_under_alias || matches!(e, Expr::Function { .. }) {
1362                aggregation_projection.push(e.clone());
1363            } else if let Expr::Literal { .. }
1364            | Expr::Cast { .. }
1365            | Expr::Ternary { .. }
1366            | Expr::Field { .. }
1367            | Expr::Alias { .. } = e
1368            {
1369                // do nothing
1370            } else {
1371                polars_bail!(SQLSyntax: "Unsupported operation in the GROUP BY clause: {}", e);
1372            }
1373        }
1374        let aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
1375        let projection_schema =
1376            expressions_to_schema(projections, &schema_before, Context::Default)?;
1377
1378        // A final projection to get the proper order and any deferred transforms/aliases.
1379        let final_projection = projection_schema
1380            .iter_names()
1381            .zip(projections)
1382            .map(|(name, projection_expr)| {
1383                if let Some(expr) = projection_overrides.get(name.as_str()) {
1384                    expr.clone()
1385                } else if group_by_keys_schema.get(name).is_some()
1386                    || projection_aliases.contains(name.as_str())
1387                    || group_key_aliases.contains(name.as_str())
1388                {
1389                    projection_expr.clone()
1390                } else {
1391                    col(name.clone())
1392                }
1393            })
1394            .collect::<Vec<_>>();
1395
1396        Ok(aggregated.select(&final_projection))
1397    }
1398
1399    fn process_limit_offset(
1400        &self,
1401        lf: LazyFrame,
1402        limit: &Option<SQLExpr>,
1403        offset: &Option<Offset>,
1404    ) -> PolarsResult<LazyFrame> {
1405        match (offset, limit) {
1406            (
1407                Some(Offset {
1408                    value: SQLExpr::Value(SQLValue::Number(offset, _)),
1409                    ..
1410                }),
1411                Some(SQLExpr::Value(SQLValue::Number(limit, _))),
1412            ) => Ok(lf.slice(
1413                offset
1414                    .parse()
1415                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1416                limit
1417                    .parse()
1418                    .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1419            )),
1420            (
1421                Some(Offset {
1422                    value: SQLExpr::Value(SQLValue::Number(offset, _)),
1423                    ..
1424                }),
1425                None,
1426            ) => Ok(lf.slice(
1427                offset
1428                    .parse()
1429                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1430                IdxSize::MAX,
1431            )),
1432            (None, Some(SQLExpr::Value(SQLValue::Number(limit, _)))) => Ok(lf.limit(
1433                limit
1434                    .parse()
1435                    .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1436            )),
1437            (None, None) => Ok(lf),
1438            _ => polars_bail!(
1439                SQLSyntax: "non-numeric arguments for LIMIT/OFFSET are not supported",
1440            ),
1441        }
1442    }
1443
1444    fn process_qualified_wildcard(
1445        &mut self,
1446        ObjectName(idents): &ObjectName,
1447        options: &WildcardAdditionalOptions,
1448        modifiers: &mut SelectModifiers,
1449        schema: Option<&Schema>,
1450    ) -> PolarsResult<Vec<Expr>> {
1451        let mut new_idents = idents.clone();
1452        new_idents.push(Ident::new("*"));
1453
1454        let expr = resolve_compound_identifier(self, new_idents.deref(), schema);
1455        self.process_wildcard_additional_options(expr?, options, modifiers, schema)
1456    }
1457
1458    fn process_wildcard_additional_options(
1459        &mut self,
1460        exprs: Vec<Expr>,
1461        options: &WildcardAdditionalOptions,
1462        modifiers: &mut SelectModifiers,
1463        schema: Option<&Schema>,
1464    ) -> PolarsResult<Vec<Expr>> {
1465        if options.opt_except.is_some() && options.opt_exclude.is_some() {
1466            polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
1467        } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
1468            polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
1469        }
1470
1471        // SELECT * EXCLUDE
1472        if let Some(items) = &options.opt_exclude {
1473            match items {
1474                ExcludeSelectItem::Single(ident) => {
1475                    modifiers.exclude.insert(ident.value.clone());
1476                },
1477                ExcludeSelectItem::Multiple(idents) => {
1478                    modifiers
1479                        .exclude
1480                        .extend(idents.iter().map(|i| i.value.clone()));
1481                },
1482            };
1483        }
1484
1485        // SELECT * EXCEPT
1486        if let Some(items) = &options.opt_except {
1487            modifiers.exclude.insert(items.first_element.value.clone());
1488            modifiers
1489                .exclude
1490                .extend(items.additional_elements.iter().map(|i| i.value.clone()));
1491        }
1492
1493        // SELECT * ILIKE
1494        if let Some(item) = &options.opt_ilike {
1495            let rx = regex::escape(item.pattern.as_str())
1496                .replace('%', ".*")
1497                .replace('_', ".");
1498
1499            modifiers.ilike = Some(
1500                polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
1501            );
1502        }
1503
1504        // SELECT * RENAME
1505        if let Some(items) = &options.opt_rename {
1506            let renames = match items {
1507                RenameSelectItem::Single(rename) => vec![rename],
1508                RenameSelectItem::Multiple(renames) => renames.iter().collect(),
1509            };
1510            for rn in renames {
1511                let (before, after) = (rn.ident.value.as_str(), rn.alias.value.as_str());
1512                let (before, after) = (PlSmallStr::from_str(before), PlSmallStr::from_str(after));
1513                if before != after {
1514                    modifiers.rename.insert(before, after);
1515                }
1516            }
1517        }
1518
1519        // SELECT * REPLACE
1520        if let Some(replacements) = &options.opt_replace {
1521            for rp in &replacements.items {
1522                let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
1523                modifiers
1524                    .replace
1525                    .push(replacement_expr?.alias(rp.column_name.value.as_str()));
1526            }
1527        }
1528        Ok(exprs)
1529    }
1530
1531    fn rename_columns_from_table_alias(
1532        &mut self,
1533        mut lf: LazyFrame,
1534        alias: &TableAlias,
1535    ) -> PolarsResult<LazyFrame> {
1536        if alias.columns.is_empty() {
1537            Ok(lf)
1538        } else {
1539            let schema = self.get_frame_schema(&mut lf)?;
1540            if alias.columns.len() != schema.len() {
1541                polars_bail!(
1542                    SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
1543                    alias.columns.len(), alias.name.value, schema.len()
1544                )
1545            } else {
1546                let existing_columns: Vec<_> = schema.iter_names().collect();
1547                let new_columns: Vec<_> =
1548                    alias.columns.iter().map(|c| c.name.value.clone()).collect();
1549                Ok(lf.rename(existing_columns, new_columns, true))
1550            }
1551        }
1552    }
1553}
1554
1555impl SQLContext {
1556    /// Get internal table map. For internal use only.
1557    pub fn get_table_map(&self) -> PlHashMap<String, LazyFrame> {
1558        self.table_map.clone()
1559    }
1560
1561    /// Create a new SQLContext from a table map. For internal use only
1562    pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
1563        Self {
1564            table_map,
1565            ..Default::default()
1566        }
1567    }
1568}
1569
1570fn collect_compound_identifiers(
1571    left: &[Ident],
1572    right: &[Ident],
1573    left_name: &str,
1574    right_name: &str,
1575) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1576    if left.len() == 2 && right.len() == 2 {
1577        let (tbl_a, col_name_a) = (left[0].value.as_str(), left[1].value.as_str());
1578        let (tbl_b, col_name_b) = (right[0].value.as_str(), right[1].value.as_str());
1579
1580        // switch left/right operands if the caller has them in reverse
1581        if left_name == tbl_b || right_name == tbl_a {
1582            Ok((vec![col(col_name_b)], vec![col(col_name_a)]))
1583        } else {
1584            Ok((vec![col(col_name_a)], vec![col(col_name_b)]))
1585        }
1586    } else {
1587        polars_bail!(SQLInterface: "collect_compound_identifiers: Expected left.len() == 2 && right.len() == 2, but found left.len() == {:?}, right.len() == {:?}", left.len(), right.len());
1588    }
1589}
1590
1591fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
1592    match expr {
1593        Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
1594            let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
1595            schema
1596                .iter_names()
1597                .filter(|name| re.is_match(name))
1598                .map(|name| col(name.clone()))
1599                .collect::<Vec<_>>()
1600        },
1601        Expr::Selector(s) => s
1602            .into_columns(schema, &Default::default())
1603            .unwrap()
1604            .into_iter()
1605            .map(col)
1606            .collect::<Vec<_>>(),
1607        _ => vec![expr],
1608    }
1609}
1610
/// Returns `true` when `nm` is written as an anchored regex, i.e. it begins with
/// `^` and ends with `$`.
fn is_regex_colname(nm: &str) -> bool {
    // Both anchors are ASCII, so comparing the first and last bytes is
    // equivalent to comparing the first and last characters.
    let raw = nm.as_bytes();
    raw.first() == Some(&b'^') && raw.last() == Some(&b'$')
}
1614
1615fn process_join_on(
1616    expression: &sqlparser::ast::Expr,
1617    tbl_left: &TableInfo,
1618    tbl_right: &TableInfo,
1619) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1620    match expression {
1621        SQLExpr::BinaryOp { left, op, right } => match op {
1622            BinaryOperator::And => {
1623                let (mut left_i, mut right_i) = process_join_on(left, tbl_left, tbl_right)?;
1624                let (mut left_j, mut right_j) = process_join_on(right, tbl_left, tbl_right)?;
1625                left_i.append(&mut left_j);
1626                right_i.append(&mut right_j);
1627                Ok((left_i, right_i))
1628            },
1629            BinaryOperator::Eq => match (left.as_ref(), right.as_ref()) {
1630                (SQLExpr::CompoundIdentifier(left), SQLExpr::CompoundIdentifier(right)) => {
1631                    collect_compound_identifiers(left, right, &tbl_left.name, &tbl_right.name)
1632                },
1633                _ => {
1634                    polars_bail!(SQLInterface: "only equi-join constraints (on identifiers) are currently supported; found lhs={:?}, rhs={:?}", left, right)
1635                },
1636            },
1637            _ => {
1638                polars_bail!(SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op)
1639            },
1640        },
1641        SQLExpr::Nested(expr) => process_join_on(expr, tbl_left, tbl_right),
1642        _ => {
1643            polars_bail!(SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", expression)
1644        },
1645    }
1646}
1647
1648fn process_join_constraint(
1649    constraint: &JoinConstraint,
1650    tbl_left: &TableInfo,
1651    tbl_right: &TableInfo,
1652) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1653    match constraint {
1654        JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
1655            process_join_on(expr, tbl_left, tbl_right)
1656        },
1657        JoinConstraint::Using(idents) if !idents.is_empty() => {
1658            let using: Vec<Expr> = idents.iter().map(|id| col(id.value.as_str())).collect();
1659            Ok((using.clone(), using))
1660        },
1661        JoinConstraint::Natural => {
1662            let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
1663            let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
1664            let on: Vec<Expr> = left_names
1665                .intersection(&right_names)
1666                .map(|&name| col(name.clone()))
1667                .collect();
1668            if on.is_empty() {
1669                polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
1670            }
1671            Ok((on.clone(), on))
1672        },
1673        _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
1674    }
1675}
1676
1677bitflags::bitflags! {
1678    /// Bitfield indicating whether there exists a projection with the specified height behavior.
1679    ///
1680    /// Used to help determine whether to execute projections in `select()` or `with_columns()`
1681    /// context.
1682    #[derive(PartialEq)]
1683    struct ExprSqlProjectionHeightBehavior: u8 {
1684        /// Maintains the height of input column(s)
1685        const MaintainsColumn = 1 << 0;
1686        /// Height is independent of input, e.g.:
1687        /// * expressions that change length: e.g. slice, explode, filter, gather etc.
1688        /// * aggregations: count(*), first(), sum() etc.
1689        const Independent = 1 << 1;
1690        /// "Inherits" the height of the context, e.g.:
1691        /// * Scalar literals
1692        const InheritsContext = 1 << 2;
1693    }
1694}
1695
1696impl ExprSqlProjectionHeightBehavior {
1697    fn identify_from_expr(expr: &Expr) -> Self {
1698        let mut has_column = false;
1699        let mut has_independent = false;
1700
1701        for e in expr.into_iter() {
1702            use Expr::*;
1703
1704            has_column |= matches!(e, Column(_) | Selector(_));
1705
1706            has_independent |= match e {
1707                // @TODO: This is broken now with functions.
1708                AnonymousFunction { options, .. } => {
1709                    options.returns_scalar() || !options.is_length_preserving()
1710                },
1711
1712                Literal(v) => !v.is_scalar(),
1713
1714                Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
1715
1716                Agg { .. } | Len => true,
1717
1718                _ => false,
1719            }
1720        }
1721
1722        if has_independent {
1723            Self::Independent
1724        } else if has_column {
1725            Self::MaintainsColumn
1726        } else {
1727            Self::InheritsContext
1728        }
1729    }
1730}