polars_sql/
context.rs

1use std::ops::Deref;
2
3use polars_core::frame::row::Row;
4use polars_core::prelude::*;
5use polars_lazy::prelude::*;
6use polars_ops::frame::JoinCoalesce;
7use polars_plan::dsl::function_expr::StructFunction;
8use polars_plan::prelude::*;
9use polars_utils::format_pl_smallstr;
10use sqlparser::ast::{
11    BinaryOperator, CreateTable, Delete, Distinct, ExcludeSelectItem, Expr as SQLExpr, FromTable,
12    FunctionArg, GroupByExpr, Ident, JoinConstraint, JoinOperator, ObjectName, ObjectType, Offset,
13    OrderBy, Query, RenameSelectItem, Select, SelectItem, SetExpr, SetOperator, SetQuantifier,
14    Statement, TableAlias, TableFactor, TableWithJoins, UnaryOperator, Value as SQLValue, Values,
15    WildcardAdditionalOptions,
16};
17use sqlparser::dialect::GenericDialect;
18use sqlparser::parser::{Parser, ParserOptions};
19
20use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
21use crate::sql_expr::{
22    parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
23};
24use crate::table_functions::PolarsTableFunctions;
25
/// A relation taking part in a query, bundled with its name and schema.
#[derive(Clone)]
pub struct TableInfo {
    /// Lazy query plan that produces the relation's rows.
    pub(crate) frame: LazyFrame,
    /// Name the relation is referred to by.
    pub(crate) name: PlSmallStr,
    /// Schema carried alongside `frame`.
    pub(crate) schema: Arc<Schema>,
}
32
33struct SelectModifiers {
34    exclude: PlHashSet<String>,                // SELECT * EXCLUDE
35    ilike: Option<regex::Regex>,               // SELECT * ILIKE
36    rename: PlHashMap<PlSmallStr, PlSmallStr>, // SELECT * RENAME
37    replace: Vec<Expr>,                        // SELECT * REPLACE
38}
39impl SelectModifiers {
40    fn matches_ilike(&self, s: &str) -> bool {
41        match &self.ilike {
42            Some(rx) => rx.is_match(s),
43            None => true,
44        }
45    }
46    fn renamed_cols(&self) -> Vec<Expr> {
47        self.rename
48            .iter()
49            .map(|(before, after)| col(before.clone()).alias(after.clone()))
50            .collect()
51    }
52}
53
/// The SQLContext is the main entry point for executing SQL queries.
#[derive(Clone)]
pub struct SQLContext {
    /// Tables registered with the context, keyed by table name.
    pub(crate) table_map: PlHashMap<String, LazyFrame>,
    /// Registry used to look up SQL functions.
    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
    /// Logical-plan arena handed to schema resolution.
    pub(crate) lp_arena: Arena<IR>,
    /// Expression arena handed to schema resolution.
    pub(crate) expr_arena: Arena<AExpr>,

    /// CTEs registered while processing a statement, keyed by CTE name;
    /// cleared by `execute` once a statement has run.
    cte_map: PlHashMap<String, LazyFrame>,
    /// Alias → table/CTE name, consulted when resolving a name in the current scope;
    /// cleared by `execute` once a statement has run.
    table_aliases: PlHashMap<String, String>,
    /// Per joined table: column name → name of that column in the joined result
    /// (used by `resolve_name`); cleared by `execute` once a statement has run.
    joined_aliases: PlHashMap<String, PlHashMap<String, String>>,
}
66
67impl Default for SQLContext {
68    fn default() -> Self {
69        Self {
70            function_registry: Arc::new(DefaultFunctionRegistry {}),
71            table_map: Default::default(),
72            cte_map: Default::default(),
73            table_aliases: Default::default(),
74            joined_aliases: Default::default(),
75            lp_arena: Default::default(),
76            expr_arena: Default::default(),
77        }
78    }
79}
80
81impl SQLContext {
82    /// Create a new SQLContext.
83    /// ```rust
84    /// # use polars_sql::SQLContext;
85    /// # fn main() {
86    /// let ctx = SQLContext::new();
87    /// # }
88    /// ```
89    pub fn new() -> Self {
90        Self::default()
91    }
92
93    /// Get the names of all registered tables, in sorted order.
94    pub fn get_tables(&self) -> Vec<String> {
95        let mut tables = Vec::from_iter(self.table_map.keys().cloned());
96        tables.sort_unstable();
97        tables
98    }
99
100    /// Register a [`LazyFrame`] as a table in the SQLContext.
101    /// ```rust
102    /// # use polars_sql::SQLContext;
103    /// # use polars_core::prelude::*;
104    /// # use polars_lazy::prelude::*;
105    /// # fn main() {
106    ///
107    /// let mut ctx = SQLContext::new();
108    /// let df = df! {
109    ///    "a" =>  [1, 2, 3],
110    /// }.unwrap().lazy();
111    ///
112    /// ctx.register("df", df);
113    /// # }
114    ///```
115    pub fn register(&mut self, name: &str, lf: LazyFrame) {
116        self.table_map.insert(name.to_owned(), lf);
117    }
118
119    /// Unregister a [`LazyFrame`] table from the [`SQLContext`].
120    pub fn unregister(&mut self, name: &str) {
121        self.table_map.remove(&name.to_owned());
122    }
123
124    /// Execute a SQL query, returning a [`LazyFrame`].
125    /// ```rust
126    /// # use polars_sql::SQLContext;
127    /// # use polars_core::prelude::*;
128    /// # use polars_lazy::prelude::*;
129    /// # fn main() {
130    ///
131    /// let mut ctx = SQLContext::new();
132    /// let df = df! {
133    ///    "a" =>  [1, 2, 3],
134    /// }
135    /// .unwrap();
136    ///
137    /// ctx.register("df", df.clone().lazy());
138    /// let sql_df = ctx.execute("SELECT * FROM df").unwrap().collect().unwrap();
139    /// assert!(sql_df.equals(&df));
140    /// # }
141    ///```
142    pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
143        let mut parser = Parser::new(&GenericDialect);
144        parser = parser.with_options(ParserOptions {
145            trailing_commas: true,
146            ..Default::default()
147        });
148
149        let ast = parser
150            .try_with_sql(query)
151            .map_err(to_sql_interface_err)?
152            .parse_statements()
153            .map_err(to_sql_interface_err)?;
154
155        polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
156        let res = self.execute_statement(ast.first().unwrap())?;
157
158        // Ensure the result uses the proper arenas.
159        // This will instantiate new arenas with a new version.
160        let lp_arena = std::mem::take(&mut self.lp_arena);
161        let expr_arena = std::mem::take(&mut self.expr_arena);
162        res.set_cached_arena(lp_arena, expr_arena);
163
164        // Every execution should clear the statement-level maps.
165        self.cte_map.clear();
166        self.table_aliases.clear();
167        self.joined_aliases.clear();
168
169        Ok(res)
170    }
171
172    /// Add a function registry to the SQLContext.
173    /// The registry provides the ability to add custom functions to the SQLContext.
174    pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
175        self.function_registry = function_registry;
176        self
177    }
178
179    /// Get the function registry of the SQLContext
180    pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
181        &self.function_registry
182    }
183
184    /// Get a mutable reference to the function registry of the SQLContext
185    pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
186        Arc::get_mut(&mut self.function_registry).unwrap()
187    }
188}
189
190impl SQLContext {
191    pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
192        let ast = stmt;
193        Ok(match ast {
194            Statement::Query(query) => self.execute_query(query)?,
195            stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
196            stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
197            stmt @ Statement::Drop {
198                object_type: ObjectType::Table,
199                ..
200            } => self.execute_drop_table(stmt)?,
201            stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
202            stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
203            stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
204            _ => polars_bail!(
205                SQLInterface: "statement type is not supported:\n{:?}", ast,
206            ),
207        })
208    }
209
210    pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
211        self.register_ctes(query)?;
212        self.execute_query_no_ctes(query)
213    }
214
215    pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
216        let lf = self.process_query(&query.body, query)?;
217        self.process_limit_offset(lf, &query.limit, &query.offset)
218    }
219
220    pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
221        frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
222    }
223
224    pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
225        // Resolve the table name in the current scope; multi-stage fallback
226        // * table name → cte name
227        // * table alias → cte alias
228        self.table_map
229            .get(name)
230            .or_else(|| self.cte_map.get(name))
231            .or_else(|| {
232                self.table_aliases.get(name).and_then(|alias| {
233                    self.table_map
234                        .get(alias.as_str())
235                        .or_else(|| self.cte_map.get(alias.as_str()))
236                })
237            })
238            .cloned()
239    }
240
241    /// Execute a query in an isolated context. This prevents subqueries from mutating
242    /// arenas and other context state. Returns both the LazyFrame *and* its associated
243    /// Schema (so that the correct arenas are used when determining schema).
244    pub(crate) fn execute_isolated<F>(&mut self, query: F) -> PolarsResult<(LazyFrame, SchemaRef)>
245    where
246        F: FnOnce(&mut Self) -> PolarsResult<LazyFrame>,
247    {
248        // Save key state (arenas and lookups)
249        let (joined_aliases, table_aliases, lp_arena, expr_arena, table_map) = (
250            // "take" to ensure subqueries start with clean state
251            std::mem::take(&mut self.joined_aliases),
252            std::mem::take(&mut self.table_aliases),
253            std::mem::take(&mut self.lp_arena),
254            std::mem::take(&mut self.expr_arena),
255            // "clone" to allow subqueries to see registered tables
256            self.table_map.clone(),
257        );
258
259        // Execute query with clean state (eg: nested/subquery)
260        let mut lf = query(self)?;
261        let schema = self.get_frame_schema(&mut lf)?;
262
263        // Restore saved state
264        lf.set_cached_arena(
265            std::mem::replace(&mut self.lp_arena, lp_arena),
266            std::mem::replace(&mut self.expr_arena, expr_arena),
267        );
268        self.joined_aliases = joined_aliases;
269        self.table_aliases = table_aliases;
270        self.table_map = table_map;
271
272        Ok((lf, schema))
273    }
274
275    fn expr_or_ordinal(
276        &mut self,
277        e: &SQLExpr,
278        exprs: &[Expr],
279        selected: Option<&[Expr]>,
280        schema: Option<&Schema>,
281        clause: &str,
282    ) -> PolarsResult<Expr> {
283        match e {
284            SQLExpr::UnaryOp {
285                op: UnaryOperator::Minus,
286                expr,
287            } if matches!(**expr, SQLExpr::Value(SQLValue::Number(_, _))) => {
288                if let SQLExpr::Value(SQLValue::Number(ref idx, _)) = **expr {
289                    Err(polars_err!(
290                    SQLSyntax:
291                    "negative ordinal values are invalid for {}; found -{}",
292                    clause,
293                    idx
294                    ))
295                } else {
296                    unreachable!()
297                }
298            },
299            SQLExpr::Value(SQLValue::Number(idx, _)) => {
300                // note: sql queries are 1-indexed
301                let idx = idx.parse::<usize>().map_err(|_| {
302                    polars_err!(
303                        SQLSyntax:
304                        "negative ordinal values are invalid for {}; found {}",
305                        clause,
306                        idx
307                    )
308                })?;
309                // note: "selected" cols represent final projection order, so we use those for
310                // ordinal resolution. "exprs" may include cols that are subsequently dropped.
311                let cols = if let Some(cols) = selected {
312                    cols
313                } else {
314                    exprs
315                };
316                Ok(cols
317                    .get(idx - 1)
318                    .ok_or_else(|| {
319                        polars_err!(
320                            SQLInterface:
321                            "{} ordinal value must refer to a valid column; found {}",
322                            clause,
323                            idx
324                        )
325                    })?
326                    .clone())
327            },
328            SQLExpr::Value(v) => Err(polars_err!(
329                SQLSyntax:
330                "{} requires a valid expression or positive ordinal; found {}", clause, v,
331            )),
332            _ => parse_sql_expr(e, self, schema),
333        }
334    }
335
336    pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
337        if let Some(aliases) = self.joined_aliases.get(tbl_name) {
338            if let Some(name) = aliases.get(column_name) {
339                return name.to_string();
340            }
341        }
342        column_name.to_string()
343    }
344
345    fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
346        match expr {
347            SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
348            SetExpr::Query(query) => self.execute_query_no_ctes(query),
349            SetExpr::SetOperation {
350                op: SetOperator::Union,
351                set_quantifier,
352                left,
353                right,
354            } => self.process_union(left, right, set_quantifier, query),
355
356            #[cfg(feature = "semi_anti_join")]
357            SetExpr::SetOperation {
358                op: SetOperator::Intersect | SetOperator::Except,
359                set_quantifier,
360                left,
361                right,
362            } => self.process_except_intersect(left, right, set_quantifier, query),
363
364            SetExpr::Values(Values {
365                explicit_row: _,
366                rows,
367            }) => self.process_values(rows),
368
369            SetExpr::Table(tbl) => {
370                if tbl.table_name.is_some() {
371                    let table_name = tbl.table_name.as_ref().unwrap();
372                    self.get_table_from_current_scope(table_name)
373                        .ok_or_else(|| {
374                            polars_err!(
375                                SQLInterface: "no table or alias named '{}' found",
376                                tbl
377                            )
378                        })
379                } else {
380                    polars_bail!(SQLInterface: "'TABLE' requires valid table name")
381                }
382            },
383            op => {
384                polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
385            },
386        }
387    }
388
389    #[cfg(feature = "semi_anti_join")]
390    fn process_except_intersect(
391        &mut self,
392        left: &SetExpr,
393        right: &SetExpr,
394        quantifier: &SetQuantifier,
395        query: &Query,
396    ) -> PolarsResult<LazyFrame> {
397        let (join_type, op_name) = match *query.body {
398            SetExpr::SetOperation {
399                op: SetOperator::Except,
400                ..
401            } => (JoinType::Anti, "EXCEPT"),
402            _ => (JoinType::Semi, "INTERSECT"),
403        };
404        let mut lf = self.process_query(left, query)?;
405        let mut rf = self.process_query(right, query)?;
406        let lf_schema = self.get_frame_schema(&mut lf)?;
407
408        let lf_cols: Vec<_> = lf_schema.iter_names_cloned().map(col).collect();
409        let rf_cols = match quantifier {
410            SetQuantifier::ByName => None,
411            SetQuantifier::Distinct | SetQuantifier::None => {
412                let rf_schema = self.get_frame_schema(&mut rf)?;
413                let rf_cols: Vec<_> = rf_schema.iter_names_cloned().map(col).collect();
414                if lf_cols.len() != rf_cols.len() {
415                    polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
416                }
417                Some(rf_cols)
418            },
419            _ => {
420                polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
421            },
422        };
423        let join = lf.join_builder().with(rf).how(join_type).join_nulls(true);
424        let joined_tbl = match rf_cols {
425            Some(rf_cols) => join.left_on(lf_cols).right_on(rf_cols).finish(),
426            None => join.on(lf_cols).finish(),
427        };
428        Ok(joined_tbl.unique(None, UniqueKeepStrategy::Any))
429    }
430
431    fn process_union(
432        &mut self,
433        left: &SetExpr,
434        right: &SetExpr,
435        quantifier: &SetQuantifier,
436        query: &Query,
437    ) -> PolarsResult<LazyFrame> {
438        let mut lf = self.process_query(left, query)?;
439        let mut rf = self.process_query(right, query)?;
440        let opts = UnionArgs {
441            parallel: true,
442            to_supertypes: true,
443            ..Default::default()
444        };
445        match quantifier {
446            // UNION [ALL | DISTINCT]
447            SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
448                let lf_schema = self.get_frame_schema(&mut lf)?;
449                let rf_schema = self.get_frame_schema(&mut rf)?;
450                if lf_schema.len() != rf_schema.len() {
451                    polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
452                }
453                let concatenated = polars_lazy::dsl::concat(vec![lf, rf], opts);
454                match quantifier {
455                    SetQuantifier::Distinct | SetQuantifier::None => {
456                        concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
457                    },
458                    _ => concatenated,
459                }
460            },
461            // UNION ALL BY NAME
462            #[cfg(feature = "diagonal_concat")]
463            SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
464            // UNION [DISTINCT] BY NAME
465            #[cfg(feature = "diagonal_concat")]
466            SetQuantifier::ByName | SetQuantifier::DistinctByName => {
467                let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
468                concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
469            },
470            #[allow(unreachable_patterns)]
471            _ => polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier),
472        }
473    }
474
475    fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
476        let frame_rows: Vec<Row> = values.iter().map(|row| {
477            let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
478                let expr = parse_sql_expr(expr, self, None)?;
479                match expr {
480                    Expr::Literal(value) => {
481                        value.to_any_value()
482                            .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
483                            .map(|av| av.into_static())
484                    },
485                    _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
486                }
487            }).collect();
488            row_data.map(Row::new)
489        }).collect::<Result<_, _>>()?;
490
491        Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
492    }
493
494    // EXPLAIN SELECT * FROM DF
495    fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
496        match stmt {
497            Statement::Explain { statement, .. } => {
498                let lf = self.execute_statement(statement)?;
499                let plan = lf.describe_optimized_plan()?;
500                let plan = plan
501                    .split('\n')
502                    .collect::<Series>()
503                    .with_name(PlSmallStr::from_static("Logical Plan"))
504                    .into_column();
505                let df = DataFrame::new(vec![plan])?;
506                Ok(df.lazy())
507            },
508            _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
509        }
510    }
511
512    // SHOW TABLES
513    fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
514        let tables = Column::new("name".into(), self.get_tables());
515        let df = DataFrame::new(vec![tables])?;
516        Ok(df.lazy())
517    }
518
519    // DROP TABLE <tbl>
520    fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
521        match stmt {
522            Statement::Drop { names, .. } => {
523                names.iter().for_each(|name| {
524                    self.table_map.remove(&name.to_string());
525                });
526                Ok(DataFrame::empty().lazy())
527            },
528            _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
529        }
530    }
531
532    // DELETE FROM <tbl> [WHERE ...]
533    fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
534        if let Statement::Delete(Delete {
535            tables,
536            from,
537            using,
538            selection,
539            returning,
540            order_by,
541            limit,
542        }) = stmt
543        {
544            if !tables.is_empty()
545                || using.is_some()
546                || returning.is_some()
547                || limit.is_some()
548                || !order_by.is_empty()
549            {
550                let error_message = match () {
551                    _ if !tables.is_empty() => "DELETE expects exactly one table name",
552                    _ if using.is_some() => "DELETE does not support the USING clause",
553                    _ if returning.is_some() => "DELETE does not support the RETURNING clause",
554                    _ if limit.is_some() => "DELETE does not support the LIMIT clause",
555                    _ if !order_by.is_empty() => "DELETE does not support the ORDER BY clause",
556                    _ => unreachable!(),
557                };
558                polars_bail!(SQLInterface: error_message);
559            }
560            let from_tables = match &from {
561                FromTable::WithFromKeyword(from) => from,
562                FromTable::WithoutKeyword(from) => from,
563            };
564            if from_tables.len() > 1 {
565                polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
566            }
567            let tbl_expr = from_tables.first().unwrap();
568            if !tbl_expr.joins.is_empty() {
569                polars_bail!(SQLInterface: "DELETE does not support table JOINs")
570            }
571            let (_, mut lf) = self.get_table(&tbl_expr.relation)?;
572            if selection.is_none() {
573                // no WHERE clause; equivalent to TRUNCATE (drop all rows)
574                Ok(DataFrame::empty_with_schema(
575                    lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
576                        .unwrap()
577                        .as_ref(),
578                )
579                .lazy())
580            } else {
581                // apply constraint as inverted filter (drops rows matching the selection)
582                Ok(self.process_where(lf.clone(), selection, true, None)?)
583            }
584        } else {
585            polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
586        }
587    }
588
589    // TRUNCATE <tbl>
590    fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
591        if let Statement::Truncate {
592            table_names,
593            partitions,
594            ..
595        } = stmt
596        {
597            match partitions {
598                None => {
599                    if table_names.len() != 1 {
600                        polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
601                    }
602                    let tbl = table_names[0].to_string();
603                    if let Some(lf) = self.table_map.get_mut(&tbl) {
604                        *lf = DataFrame::empty_with_schema(
605                            lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
606                                .unwrap()
607                                .as_ref(),
608                        )
609                        .lazy();
610                        Ok(lf.clone())
611                    } else {
612                        polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
613                    }
614                },
615                _ => {
616                    polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
617                },
618            }
619        } else {
620            polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
621        }
622    }
623
624    fn register_cte(&mut self, name: &str, lf: LazyFrame) {
625        self.cte_map.insert(name.to_owned(), lf);
626    }
627
628    fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
629        if let Some(with) = &query.with {
630            if with.recursive {
631                polars_bail!(SQLInterface: "recursive CTEs are not supported")
632            }
633            for cte in &with.cte_tables {
634                let cte_name = cte.alias.name.value.clone();
635                let mut lf = self.execute_query(&cte.query)?;
636                lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
637                self.register_cte(&cte_name, lf);
638            }
639        }
640        Ok(())
641    }
642
643    /// execute the 'FROM' part of the query
644    fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
645        let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
646        if !tbl_expr.joins.is_empty() {
647            for join in &tbl_expr.joins {
648                let (r_name, mut rf) = self.get_table(&join.relation)?;
649                if r_name.is_empty() {
650                    // Require non-empty to avoid duplicate column errors from nested self-joins.
651                    polars_bail!(
652                        SQLInterface:
653                        "cannot join on unnamed relation; please provide an alias"
654                    )
655                }
656                let left_schema = self.get_frame_schema(&mut lf)?;
657                let right_schema = self.get_frame_schema(&mut rf)?;
658
659                lf = match &join.join_operator {
660                    op @ (JoinOperator::FullOuter(constraint)
661                    | JoinOperator::LeftOuter(constraint)
662                    | JoinOperator::RightOuter(constraint)
663                    | JoinOperator::Inner(constraint)
664                    | JoinOperator::Anti(constraint)
665                    | JoinOperator::Semi(constraint)
666                    | JoinOperator::LeftAnti(constraint)
667                    | JoinOperator::LeftSemi(constraint)
668                    | JoinOperator::RightAnti(constraint)
669                    | JoinOperator::RightSemi(constraint)) => {
670                        let (lf, rf) = match op {
671                            JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
672                            _ => (lf, rf),
673                        };
674                        self.process_join(
675                            &TableInfo {
676                                frame: lf,
677                                name: (&l_name).into(),
678                                schema: left_schema.clone(),
679                            },
680                            &TableInfo {
681                                frame: rf,
682                                name: (&r_name).into(),
683                                schema: right_schema.clone(),
684                            },
685                            constraint,
686                            match op {
687                                JoinOperator::FullOuter(_) => JoinType::Full,
688                                JoinOperator::LeftOuter(_) => JoinType::Left,
689                                JoinOperator::RightOuter(_) => JoinType::Right,
690                                JoinOperator::Inner(_) => JoinType::Inner,
691                                #[cfg(feature = "semi_anti_join")]
692                                JoinOperator::Anti(_)
693                                | JoinOperator::LeftAnti(_)
694                                | JoinOperator::RightAnti(_) => JoinType::Anti,
695                                #[cfg(feature = "semi_anti_join")]
696                                JoinOperator::Semi(_)
697                                | JoinOperator::LeftSemi(_)
698                                | JoinOperator::RightSemi(_) => JoinType::Semi,
699                                join_type => polars_bail!(
700                                    SQLInterface:
701                                    "join type '{:?}' not currently supported",
702                                    join_type
703                                ),
704                            },
705                        )?
706                    },
707                    JoinOperator::CrossJoin => {
708                        lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
709                    },
710                    join_type => {
711                        polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
712                    },
713                };
714
715                // track join-aliased columns so we can resolve them later
716                let joined_schema = self.get_frame_schema(&mut lf)?;
717
718                self.joined_aliases.insert(
719                    r_name.clone(),
720                    right_schema
721                        .iter_names()
722                        .filter_map(|name| {
723                            // col exists in both tables and is aliased in the joined result
724                            let aliased_name = format!("{name}:{r_name}");
725                            if left_schema.contains(name)
726                                && joined_schema.contains(aliased_name.as_str())
727                            {
728                                Some((name.to_string(), aliased_name))
729                            } else {
730                                None
731                            }
732                        })
733                        .collect::<PlHashMap<String, String>>(),
734                );
735            }
736        };
737        Ok(lf)
738    }
739
740    /// Execute the 'SELECT' part of the query.
741    fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
742        let mut lf = if select_stmt.from.is_empty() {
743            DataFrame::empty().lazy()
744        } else {
745            // Note: implicit joins need more work to support properly,
746            // explicit joins are preferred for now (ref: #16662)
747            let from = select_stmt.clone().from;
748            if from.len() > 1 {
749                polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
750            }
751            self.execute_from_statement(from.first().unwrap())?
752        };
753
754        // Filter expression (WHERE clause)
755        let schema = self.get_frame_schema(&mut lf)?;
756        lf = self.process_where(lf, &select_stmt.selection, false, Some(schema.clone()))?;
757
758        // 'SELECT *' modifiers
759        let mut select_modifiers = SelectModifiers {
760            ilike: None,
761            exclude: PlHashSet::new(),
762            rename: PlHashMap::new(),
763            replace: vec![],
764        };
765
766        let projections = self.column_projections(select_stmt, &schema, &mut select_modifiers)?;
767
768        // Check for "GROUP BY ..." (after determining projections)
769        let mut group_by_keys: Vec<Expr> = Vec::new();
770        match &select_stmt.group_by {
771            // Standard "GROUP BY x, y, z" syntax (also recognising ordinal values)
772            GroupByExpr::Expressions(group_by_exprs, modifiers) => {
773                if !modifiers.is_empty() {
774                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
775                }
776                // translate the group expressions, allowing ordinal values
777                group_by_keys = group_by_exprs
778                    .iter()
779                    .map(|e| {
780                        self.expr_or_ordinal(
781                            e,
782                            &projections,
783                            None,
784                            Some(schema.deref()),
785                            "GROUP BY",
786                        )
787                    })
788                    .collect::<PolarsResult<_>>()?
789            },
790            // "GROUP BY ALL" syntax; automatically adds expressions that do not contain
791            // nested agg/window funcs to the group key (also ignores literals).
792            GroupByExpr::All(modifiers) => {
793                if !modifiers.is_empty() {
794                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
795                }
796                projections.iter().for_each(|expr| match expr {
797                    // immediately match the most common cases (col|agg|len|lit, optionally aliased).
798                    Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
799                    Expr::Column(_) => group_by_keys.push(expr.clone()),
800                    Expr::Alias(e, _)
801                        if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
802                    Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
803                        if let Expr::Column(name) = &**e {
804                            group_by_keys.push(col(name.clone()));
805                        }
806                    },
807                    _ => {
808                        // If not quick-matched, add if no nested agg/window expressions
809                        if !has_expr(expr, |e| {
810                            matches!(e, Expr::Agg(_))
811                                || matches!(e, Expr::Len)
812                                || matches!(e, Expr::Window { .. })
813                        }) {
814                            group_by_keys.push(expr.clone())
815                        }
816                    },
817                });
818            },
819        };
820
821        lf = if group_by_keys.is_empty() {
822            // The 'having' clause is only valid inside 'group by'
823            if select_stmt.having.is_some() {
824                polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
825            };
826
827            // Final/selected cols, accounting for 'SELECT *' modifiers
828            let mut retained_cols = Vec::with_capacity(projections.len());
829            let mut retained_names = Vec::with_capacity(projections.len());
830            let have_order_by = query.order_by.is_some();
831            // Initialize containing InheritsContext to handle empty projection case.
832            let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;
833
834            // Note: if there is an 'order by' then we project everything (original cols
835            // and new projections) and *then* select the final cols; the retained cols
836            // are used to ensure a correct final projection. If there's no 'order by',
837            // clause then we can project the final column *expressions* directly.
838            for p in projections.iter() {
839                let name = p.to_field(schema.deref())?.name.to_string();
840                if select_modifiers.matches_ilike(&name)
841                    && !select_modifiers.exclude.contains(&name)
842                {
843                    projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);
844
845                    retained_cols.push(if have_order_by {
846                        col(name.as_str())
847                    } else {
848                        p.clone()
849                    });
850                    retained_names.push(col(name));
851                }
852            }
853
854            // Apply the remaining modifiers and establish the final projection
855            if have_order_by {
856                // We can safely use `with_columns()` and avoid a join if:
857                // * There is already a projection that projects to the table height.
858                // * All projection heights inherit from context (e.g. all scalar literals that
859                //   are to be broadcasted to table height).
860                if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
861                    || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
862                {
863                    lf = lf.with_columns(projections);
864                } else {
865                    // We hit this branch if the output height is not guaranteed to match the table
866                    // height. E.g.:
867                    //
868                    // * SELECT COUNT(*) FROM df ORDER BY sort_key;
869                    // * SELECT UNNEST(list_col) FROM df ORDER BY sort_key;
870                    //
871                    // For these cases we truncate / extend the sorting columns with NULLs to match
872                    // the output height. We do this by projecting independently and then joining
873                    // back the original frame on the row index.
874                    const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
875                    lf = lf
876                        .clone()
877                        .select(projections)
878                        .with_row_index(NAME, None)
879                        .join(
880                            lf.with_row_index(NAME, None),
881                            [col(NAME)],
882                            [col(NAME)],
883                            JoinArgs {
884                                how: JoinType::Left,
885                                validation: Default::default(),
886                                suffix: None,
887                                slice: None,
888                                nulls_equal: false,
889                                coalesce: Default::default(),
890                                maintain_order: polars_ops::frame::MaintainOrderJoin::Left,
891                            },
892                        );
893                }
894            }
895
896            if !select_modifiers.replace.is_empty() {
897                lf = lf.with_columns(&select_modifiers.replace);
898            }
899            if !select_modifiers.rename.is_empty() {
900                lf = lf.with_columns(select_modifiers.renamed_cols());
901            }
902
903            lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;
904
905            // Note: If `have_order_by`, with_columns is already done above.
906            if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
907                && !have_order_by
908            {
909                // All projections need to be broadcasted to table height, so evaluate in `with_columns()`
910                lf = lf.with_columns(retained_cols).select(retained_names);
911            } else {
912                lf = lf.select(retained_cols);
913            }
914
915            if !select_modifiers.rename.is_empty() {
916                lf = lf.rename(
917                    select_modifiers.rename.keys(),
918                    select_modifiers.rename.values(),
919                    true,
920                );
921            };
922            lf
923        } else {
924            lf = self.process_group_by(lf, &group_by_keys, &projections)?;
925            lf = self.process_order_by(lf, &query.order_by, None)?;
926
927            // Apply optional 'having' clause, post-aggregation.
928            let schema = Some(self.get_frame_schema(&mut lf)?);
929            match select_stmt.having.as_ref() {
930                Some(expr) => lf.filter(parse_sql_expr(expr, self, schema.as_deref())?),
931                None => lf,
932            }
933        };
934
935        // Apply optional DISTINCT clause.
936        lf = match &select_stmt.distinct {
937            Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
938            Some(Distinct::On(exprs)) => {
939                // TODO: support exprs in `unique` see https://github.com/pola-rs/polars/issues/5760
940                let schema = Some(self.get_frame_schema(&mut lf)?);
941                let cols = exprs
942                    .iter()
943                    .map(|e| {
944                        let expr = parse_sql_expr(e, self, schema.as_deref())?;
945                        if let Expr::Column(name) = expr {
946                            Ok(name)
947                        } else {
948                            Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
949                        }
950                    })
951                    .collect::<PolarsResult<Vec<_>>>()?;
952
953                // DISTINCT ON has to apply the ORDER BY before the operation.
954                lf = self.process_order_by(lf, &query.order_by, None)?;
955                return Ok(lf.unique_stable(
956                    Some(Selector::ByName {
957                        names: cols.into(),
958                        strict: true,
959                    }),
960                    UniqueKeepStrategy::First,
961                ));
962            },
963            None => lf,
964        };
965        Ok(lf)
966    }
967
968    fn column_projections(
969        &mut self,
970        select_stmt: &Select,
971        schema: &SchemaRef,
972        select_modifiers: &mut SelectModifiers,
973    ) -> PolarsResult<Vec<Expr>> {
974        let parsed_items: PolarsResult<Vec<Vec<Expr>>> = select_stmt
975            .projection
976            .iter()
977            .map(|select_item| match select_item {
978                SelectItem::UnnamedExpr(expr) => {
979                    Ok(vec![parse_sql_expr(expr, self, Some(schema))?])
980                },
981                SelectItem::ExprWithAlias { expr, alias } => {
982                    let expr = parse_sql_expr(expr, self, Some(schema))?;
983                    Ok(vec![expr.alias(PlSmallStr::from_str(alias.value.as_str()))])
984                },
985                SelectItem::QualifiedWildcard(obj_name, wildcard_options) => self
986                    .process_qualified_wildcard(
987                        obj_name,
988                        wildcard_options,
989                        select_modifiers,
990                        Some(schema),
991                    ),
992                SelectItem::Wildcard(wildcard_options) => {
993                    let cols = schema
994                        .iter_names()
995                        .map(|name| col(name.clone()))
996                        .collect::<Vec<_>>();
997
998                    self.process_wildcard_additional_options(
999                        cols,
1000                        wildcard_options,
1001                        select_modifiers,
1002                        Some(schema),
1003                    )
1004                },
1005            })
1006            .collect();
1007
1008        let flattened_exprs: Vec<Expr> = parsed_items?
1009            .into_iter()
1010            .flatten()
1011            .flat_map(|expr| expand_exprs(expr, schema))
1012            .collect();
1013
1014        Ok(flattened_exprs)
1015    }
1016
1017    fn process_where(
1018        &mut self,
1019        mut lf: LazyFrame,
1020        expr: &Option<SQLExpr>,
1021        invert_filter: bool,
1022        schema: Option<SchemaRef>,
1023    ) -> PolarsResult<LazyFrame> {
1024        if let Some(expr) = expr {
1025            let schema = match schema {
1026                None => self.get_frame_schema(&mut lf)?,
1027                Some(s) => s,
1028            };
1029
1030            // shortcut filter evaluation if given expression is just TRUE or FALSE
1031            let (all_true, all_false) = match expr {
1032                SQLExpr::Value(SQLValue::Boolean(b)) => (*b, !*b),
1033                SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
1034                    (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => (a == b, a != b),
1035                    (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
1036                        (a != b, a == b)
1037                    },
1038                    _ => (false, false),
1039                },
1040                _ => (false, false),
1041            };
1042            if (all_true && !invert_filter) || (all_false && invert_filter) {
1043                return Ok(lf);
1044            } else if (all_false && !invert_filter) || (all_true && invert_filter) {
1045                return Ok(DataFrame::empty_with_schema(schema.as_ref()).lazy());
1046            }
1047
1048            // ...otherwise parse and apply the filter as normal
1049            let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1050            if filter_expression.clone().meta().has_multiple_outputs() {
1051                filter_expression = all_horizontal([filter_expression])?;
1052            }
1053            lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1054            lf = if invert_filter {
1055                lf.remove(filter_expression)
1056            } else {
1057                lf.filter(filter_expression)
1058            };
1059        }
1060        Ok(lf)
1061    }
1062
1063    pub(super) fn process_join(
1064        &mut self,
1065        tbl_left: &TableInfo,
1066        tbl_right: &TableInfo,
1067        constraint: &JoinConstraint,
1068        join_type: JoinType,
1069    ) -> PolarsResult<LazyFrame> {
1070        let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right)?;
1071
1072        let joined = tbl_left
1073            .frame
1074            .clone()
1075            .join_builder()
1076            .with(tbl_right.frame.clone())
1077            .left_on(left_on)
1078            .right_on(right_on)
1079            .how(join_type)
1080            .suffix(format!(":{}", tbl_right.name))
1081            .coalesce(JoinCoalesce::KeepColumns)
1082            .finish();
1083
1084        Ok(joined)
1085    }
1086
1087    fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
1088        let mut contexts = vec![];
1089        for expr in exprs {
1090            *expr = expr.clone().map_expr(|e| match e {
1091                Expr::SubPlan(lp, names) => {
1092                    contexts.push(<LazyFrame>::from((**lp).clone()));
1093                    if names.len() == 1 {
1094                        Expr::Column(names[0].as_str().into())
1095                    } else {
1096                        Expr::SubPlan(lp, names)
1097                    }
1098                },
1099                e => e,
1100            })
1101        }
1102
1103        if contexts.is_empty() {
1104            lf
1105        } else {
1106            lf.with_context(contexts)
1107        }
1108    }
1109
1110    fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1111        if let Statement::CreateTable(CreateTable {
1112            if_not_exists,
1113            name,
1114            query,
1115            ..
1116        }) = stmt
1117        {
1118            let tbl_name = name.0.first().unwrap().value.as_str();
1119            // CREATE TABLE IF NOT EXISTS
1120            if *if_not_exists && self.table_map.contains_key(tbl_name) {
1121                polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1122                // CREATE OR REPLACE TABLE
1123            }
1124            if let Some(query) = query {
1125                let lf = self.execute_query(query)?;
1126                self.register(tbl_name, lf);
1127                let out = df! {
1128                    "Response" => ["CREATE TABLE"]
1129                }
1130                .unwrap()
1131                .lazy();
1132                Ok(out)
1133            } else {
1134                polars_bail!(SQLInterface: "only `CREATE TABLE AS SELECT ...` is currently supported");
1135            }
1136        } else {
1137            unreachable!()
1138        }
1139    }
1140
1141    fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
1142        match relation {
1143            TableFactor::Table {
1144                name, alias, args, ..
1145            } => {
1146                if let Some(args) = args {
1147                    return self.execute_table_function(name, alias, &args.args);
1148                }
1149                let tbl_name = name.0.first().unwrap().value.as_str();
1150                if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
1151                    match alias {
1152                        Some(alias) => {
1153                            self.table_aliases
1154                                .insert(alias.name.value.clone(), tbl_name.to_string());
1155                            Ok((alias.to_string(), lf))
1156                        },
1157                        None => Ok((tbl_name.to_string(), lf)),
1158                    }
1159                } else {
1160                    polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1161                }
1162            },
1163            TableFactor::Derived {
1164                lateral,
1165                subquery,
1166                alias,
1167            } => {
1168                polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1169                if let Some(alias) = alias {
1170                    let mut lf = self.execute_query_no_ctes(subquery)?;
1171                    lf = self.rename_columns_from_table_alias(lf, alias)?;
1172                    self.table_map.insert(alias.name.value.clone(), lf.clone());
1173                    Ok((alias.name.value.clone(), lf))
1174                } else {
1175                    polars_bail!(SQLSyntax: "derived tables must have aliases");
1176                }
1177            },
1178            TableFactor::UNNEST {
1179                alias,
1180                array_exprs,
1181                with_offset,
1182                with_offset_alias: _,
1183                ..
1184            } => {
1185                if let Some(alias) = alias {
1186                    let column_names: Vec<Option<PlSmallStr>> = alias
1187                        .columns
1188                        .iter()
1189                        .map(|c| {
1190                            if c.name.value.is_empty() {
1191                                None
1192                            } else {
1193                                Some(PlSmallStr::from_str(c.name.value.as_str()))
1194                            }
1195                        })
1196                        .collect();
1197
1198                    let column_values: Vec<Series> = array_exprs
1199                        .iter()
1200                        .map(|arr| parse_sql_array(arr, self))
1201                        .collect::<Result<_, _>>()?;
1202
1203                    polars_ensure!(!column_names.is_empty(),
1204                        SQLSyntax:
1205                        "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1206                    );
1207                    if column_names.len() != column_values.len() {
1208                        let plural = if column_values.len() > 1 { "s" } else { "" };
1209                        polars_bail!(
1210                            SQLSyntax:
1211                            "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
1212                        );
1213                    }
1214                    let column_series: Vec<Column> = column_values
1215                        .into_iter()
1216                        .zip(column_names)
1217                        .map(|(s, name)| {
1218                            if let Some(name) = name {
1219                                s.with_name(name)
1220                            } else {
1221                                s
1222                            }
1223                        })
1224                        .map(Column::from)
1225                        .collect();
1226
1227                    let lf = DataFrame::new(column_series)?.lazy();
1228
1229                    if *with_offset {
1230                        // TODO: support 'WITH ORDINALITY|OFFSET' modifier.
1231                        //  (note that 'WITH OFFSET' is BigQuery-specific syntax, not PostgreSQL)
1232                        polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
1233                    }
1234                    let table_name = alias.name.value.clone();
1235                    self.table_map.insert(table_name.clone(), lf.clone());
1236                    Ok((table_name, lf))
1237                } else {
1238                    polars_bail!(SQLSyntax: "UNNEST table must have an alias");
1239                }
1240            },
1241            TableFactor::NestedJoin {
1242                table_with_joins,
1243                alias,
1244            } => {
1245                let lf = self.execute_from_statement(table_with_joins)?;
1246                match alias {
1247                    Some(a) => Ok((a.name.value.clone(), lf)),
1248                    None => Ok(("".to_string(), lf)),
1249                }
1250            },
1251            // Support bare table, optionally with an alias, for now
1252            _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
1253        }
1254    }
1255
1256    fn execute_table_function(
1257        &mut self,
1258        name: &ObjectName,
1259        alias: &Option<TableAlias>,
1260        args: &[FunctionArg],
1261    ) -> PolarsResult<(String, LazyFrame)> {
1262        let tbl_fn = name.0.first().unwrap().value.as_str();
1263        let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1264        let (tbl_name, lf) = read_fn.execute(args)?;
1265        #[allow(clippy::useless_asref)]
1266        let tbl_name = alias
1267            .as_ref()
1268            .map(|a| a.name.value.clone())
1269            .unwrap_or_else(|| tbl_name.to_str().to_string());
1270
1271        self.table_map.insert(tbl_name.clone(), lf.clone());
1272        Ok((tbl_name, lf))
1273    }
1274
1275    fn process_order_by(
1276        &mut self,
1277        mut lf: LazyFrame,
1278        order_by: &Option<OrderBy>,
1279        selected: Option<&[Expr]>,
1280    ) -> PolarsResult<LazyFrame> {
1281        if order_by.as_ref().is_none_or(|ob| ob.exprs.is_empty()) {
1282            return Ok(lf);
1283        }
1284        let schema = self.get_frame_schema(&mut lf)?;
1285        let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1286
1287        let order_by = order_by.as_ref().unwrap().exprs.clone();
1288        let mut descending = Vec::with_capacity(order_by.len());
1289        let mut nulls_last = Vec::with_capacity(order_by.len());
1290        let mut by: Vec<Expr> = Vec::with_capacity(order_by.len());
1291
1292        if order_by.len() == 1  // support `ORDER BY ALL` (iff there is no column named 'ALL' in the schema)
1293            && matches!(&order_by[0].expr, SQLExpr::Identifier(ident) if ident.value.to_uppercase() == "ALL" && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1294        {
1295            if let Some(selected) = selected {
1296                by.extend(selected.iter().cloned());
1297            } else {
1298                by.extend(columns_iter);
1299            };
1300            let desc_order = !order_by[0].asc.unwrap_or(true);
1301            nulls_last.resize(by.len(), !order_by[0].nulls_first.unwrap_or(desc_order));
1302            descending.resize(by.len(), desc_order);
1303        } else {
1304            let columns = &columns_iter.collect::<Vec<_>>();
1305            for ob in order_by {
1306                // note: if not specified 'NULLS FIRST' is default for DESC, 'NULLS LAST' otherwise
1307                // https://www.postgresql.org/docs/current/queries-order.html
1308                let desc_order = !ob.asc.unwrap_or(true);
1309                nulls_last.push(!ob.nulls_first.unwrap_or(desc_order));
1310                descending.push(desc_order);
1311
1312                // translate order expression, allowing ordinal values
1313                by.push(self.expr_or_ordinal(
1314                    &ob.expr,
1315                    columns,
1316                    selected,
1317                    Some(&schema),
1318                    "ORDER BY",
1319                )?)
1320            }
1321        }
1322        Ok(lf.sort_by_exprs(
1323            &by,
1324            SortMultipleOptions::default()
1325                .with_order_descending_multi(descending)
1326                .with_nulls_last_multi(nulls_last)
1327                .with_maintain_order(true),
1328        ))
1329    }
1330
    /// Aggregate `lf` by `group_by_keys` and project the result as described by
    /// `projections`.
    ///
    /// The projections are split into those that must be evaluated inside the
    /// aggregation and those that are group keys (or simple transforms of them);
    /// after aggregating, a final `select` restores the projection order and
    /// applies any aliases/overrides that were deferred.
    ///
    /// # Errors
    /// Returns an error if the group keys or the projections contain duplicate
    /// output names, or if a plain column / struct field projection is neither a
    /// group key nor part of an aggregation.
    fn process_group_by(
        &mut self,
        mut lf: LazyFrame,
        group_by_keys: &[Expr],
        projections: &[Expr],
    ) -> PolarsResult<LazyFrame> {
        let schema_before = self.get_frame_schema(&mut lf)?;
        let group_by_keys_schema =
            expressions_to_schema(group_by_keys, &schema_before, |duplicate_name: &str| {
                format!("group_by keys contained duplicate output name '{duplicate_name}'")
            })?;

        // Remove the group_by keys as polars adds those implicitly.
        let mut aggregation_projection = Vec::with_capacity(projections.len());
        // alias -> replacement expression used verbatim in the final projection
        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
        // aliases whose original (aliased) expression is re-applied after aggregation
        let mut projection_aliases = PlHashSet::new();
        // aliases of simple projections; aliasing is deferred to the final projection
        let mut group_key_aliases = PlHashSet::new();

        for mut e in projections {
            // `Len` represents COUNT(*) so we treat as an aggregation here.
            let is_non_group_key_expr = has_expr(e, |e| {
                match e {
                    Expr::Agg(_) | Expr::Len | Expr::Window { .. } => true,
                    Expr::Function { function: func, .. }
                        if !matches!(func, FunctionExpr::StructExpr(_)) =>
                    {
                        // If it's a function call containing a column NOT in the group by keys,
                        // we treat it as an aggregation.
                        has_expr(e, |e| match e {
                            Expr::Column(name) => !group_by_keys_schema.contains(name),
                            _ => false,
                        })
                    },
                    _ => false,
                }
            });

            // Note: if simple aliased expression we defer aliasing until after the group_by.
            if let Expr::Alias(expr, alias) = e {
                if e.clone().meta().is_simple_projection(Some(&schema_before)) {
                    group_key_aliases.insert(alias.as_ref());
                    // continue with the un-aliased inner expression
                    e = expr
                } else if let Expr::Function {
                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
                    ..
                } = expr.deref()
                {
                    // aliased struct field access: the final projection selects the
                    // column named after the field and applies the alias there
                    projection_overrides
                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
                } else if !is_non_group_key_expr && !group_by_keys_schema.contains(alias) {
                    projection_aliases.insert(alias.as_ref());
                }
            }
            let field = e.to_field(&schema_before)?;
            if group_by_keys_schema.get(&field.name).is_none() && is_non_group_key_expr {
                let mut e = e.clone();
                // unwrap an (optionally aliased) `Implode` aggregation so that its
                // inner expression is what gets aggregated
                if let Expr::Agg(AggExpr::Implode(expr)) = &e {
                    e = (**expr).clone();
                } else if let Expr::Alias(expr, name) = &e {
                    if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
                        e = (**expr).clone().alias(name.clone());
                    }
                }
                aggregation_projection.push(e);
            } else if let Expr::Column(_)
            | Expr::Function {
                function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
                ..
            } = e
            {
                // Non-aggregated columns must be part of the GROUP BY clause
                if !group_by_keys_schema.contains(&field.name) {
                    polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
                }
            }
        }
        let aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
        let projection_schema =
            expressions_to_schema(projections, &schema_before, |duplicate_name: &str| {
                format!("group_by aggregations contained duplicate output name '{duplicate_name}'")
            })?;

        // A final projection to get the proper order and any deferred transforms/aliases.
        let final_projection = projection_schema
            .iter_names()
            .zip(projections)
            .map(|(name, projection_expr)| {
                if let Some(expr) = projection_overrides.get(name.as_str()) {
                    expr.clone()
                } else if group_by_keys_schema.get(name).is_some()
                    || projection_aliases.contains(name.as_str())
                    || group_key_aliases.contains(name.as_str())
                {
                    // group keys and deferred aliases: re-apply the original expression
                    projection_expr.clone()
                } else {
                    // aggregated outputs: select the aggregated column by name
                    col(name.clone())
                }
            })
            .collect::<Vec<_>>();

        Ok(aggregated.select(&final_projection))
    }
1433
1434    fn process_limit_offset(
1435        &self,
1436        lf: LazyFrame,
1437        limit: &Option<SQLExpr>,
1438        offset: &Option<Offset>,
1439    ) -> PolarsResult<LazyFrame> {
1440        match (offset, limit) {
1441            (
1442                Some(Offset {
1443                    value: SQLExpr::Value(SQLValue::Number(offset, _)),
1444                    ..
1445                }),
1446                Some(SQLExpr::Value(SQLValue::Number(limit, _))),
1447            ) => Ok(lf.slice(
1448                offset
1449                    .parse()
1450                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1451                limit
1452                    .parse()
1453                    .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1454            )),
1455            (
1456                Some(Offset {
1457                    value: SQLExpr::Value(SQLValue::Number(offset, _)),
1458                    ..
1459                }),
1460                None,
1461            ) => Ok(lf.slice(
1462                offset
1463                    .parse()
1464                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1465                IdxSize::MAX,
1466            )),
1467            (None, Some(SQLExpr::Value(SQLValue::Number(limit, _)))) => Ok(lf.limit(
1468                limit
1469                    .parse()
1470                    .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1471            )),
1472            (None, None) => Ok(lf),
1473            _ => polars_bail!(
1474                SQLSyntax: "non-numeric arguments for LIMIT/OFFSET are not supported",
1475            ),
1476        }
1477    }
1478
1479    fn process_qualified_wildcard(
1480        &mut self,
1481        ObjectName(idents): &ObjectName,
1482        options: &WildcardAdditionalOptions,
1483        modifiers: &mut SelectModifiers,
1484        schema: Option<&Schema>,
1485    ) -> PolarsResult<Vec<Expr>> {
1486        let mut new_idents = idents.clone();
1487        new_idents.push(Ident::new("*"));
1488
1489        let expr = resolve_compound_identifier(self, new_idents.deref(), schema);
1490        self.process_wildcard_additional_options(expr?, options, modifiers, schema)
1491    }
1492
1493    fn process_wildcard_additional_options(
1494        &mut self,
1495        exprs: Vec<Expr>,
1496        options: &WildcardAdditionalOptions,
1497        modifiers: &mut SelectModifiers,
1498        schema: Option<&Schema>,
1499    ) -> PolarsResult<Vec<Expr>> {
1500        if options.opt_except.is_some() && options.opt_exclude.is_some() {
1501            polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
1502        } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
1503            polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
1504        }
1505
1506        // SELECT * EXCLUDE
1507        if let Some(items) = &options.opt_exclude {
1508            match items {
1509                ExcludeSelectItem::Single(ident) => {
1510                    modifiers.exclude.insert(ident.value.clone());
1511                },
1512                ExcludeSelectItem::Multiple(idents) => {
1513                    modifiers
1514                        .exclude
1515                        .extend(idents.iter().map(|i| i.value.clone()));
1516                },
1517            };
1518        }
1519
1520        // SELECT * EXCEPT
1521        if let Some(items) = &options.opt_except {
1522            modifiers.exclude.insert(items.first_element.value.clone());
1523            modifiers
1524                .exclude
1525                .extend(items.additional_elements.iter().map(|i| i.value.clone()));
1526        }
1527
1528        // SELECT * ILIKE
1529        if let Some(item) = &options.opt_ilike {
1530            let rx = regex::escape(item.pattern.as_str())
1531                .replace('%', ".*")
1532                .replace('_', ".");
1533
1534            modifiers.ilike = Some(
1535                polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
1536            );
1537        }
1538
1539        // SELECT * RENAME
1540        if let Some(items) = &options.opt_rename {
1541            let renames = match items {
1542                RenameSelectItem::Single(rename) => vec![rename],
1543                RenameSelectItem::Multiple(renames) => renames.iter().collect(),
1544            };
1545            for rn in renames {
1546                let (before, after) = (rn.ident.value.as_str(), rn.alias.value.as_str());
1547                let (before, after) = (PlSmallStr::from_str(before), PlSmallStr::from_str(after));
1548                if before != after {
1549                    modifiers.rename.insert(before, after);
1550                }
1551            }
1552        }
1553
1554        // SELECT * REPLACE
1555        if let Some(replacements) = &options.opt_replace {
1556            for rp in &replacements.items {
1557                let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
1558                modifiers
1559                    .replace
1560                    .push(replacement_expr?.alias(rp.column_name.value.as_str()));
1561            }
1562        }
1563        Ok(exprs)
1564    }
1565
1566    fn rename_columns_from_table_alias(
1567        &mut self,
1568        mut lf: LazyFrame,
1569        alias: &TableAlias,
1570    ) -> PolarsResult<LazyFrame> {
1571        if alias.columns.is_empty() {
1572            Ok(lf)
1573        } else {
1574            let schema = self.get_frame_schema(&mut lf)?;
1575            if alias.columns.len() != schema.len() {
1576                polars_bail!(
1577                    SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
1578                    alias.columns.len(), alias.name.value, schema.len()
1579                )
1580            } else {
1581                let existing_columns: Vec<_> = schema.iter_names().collect();
1582                let new_columns: Vec<_> =
1583                    alias.columns.iter().map(|c| c.name.value.clone()).collect();
1584                Ok(lf.rename(existing_columns, new_columns, true))
1585            }
1586        }
1587    }
1588}
1589
1590impl SQLContext {
1591    /// Get internal table map. For internal use only.
1592    pub fn get_table_map(&self) -> PlHashMap<String, LazyFrame> {
1593        self.table_map.clone()
1594    }
1595
1596    /// Create a new SQLContext from a table map. For internal use only
1597    pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
1598        Self {
1599            table_map,
1600            ..Default::default()
1601        }
1602    }
1603}
1604
1605fn collect_compound_identifiers(
1606    left: &[Ident],
1607    right: &[Ident],
1608    left_name: &str,
1609    right_name: &str,
1610) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1611    if left.len() == 2 && right.len() == 2 {
1612        let (tbl_a, col_name_a) = (left[0].value.as_str(), left[1].value.as_str());
1613        let (tbl_b, col_name_b) = (right[0].value.as_str(), right[1].value.as_str());
1614
1615        // switch left/right operands if the caller has them in reverse
1616        if left_name == tbl_b || right_name == tbl_a {
1617            Ok((vec![col(col_name_b)], vec![col(col_name_a)]))
1618        } else {
1619            Ok((vec![col(col_name_a)], vec![col(col_name_b)]))
1620        }
1621    } else {
1622        polars_bail!(SQLInterface: "collect_compound_identifiers: Expected left.len() == 2 && right.len() == 2, but found left.len() == {:?}, right.len() == {:?}", left.len(), right.len());
1623    }
1624}
1625
1626fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
1627    match expr {
1628        Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
1629            let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
1630            schema
1631                .iter_names()
1632                .filter(|name| re.is_match(name))
1633                .map(|name| col(name.clone()))
1634                .collect::<Vec<_>>()
1635        },
1636        Expr::Selector(s) => s
1637            .into_columns(schema, &Default::default())
1638            .unwrap()
1639            .into_iter()
1640            .map(col)
1641            .collect::<Vec<_>>(),
1642        _ => vec![expr],
1643    }
1644}
1645
/// Returns `true` when `nm` is written like an anchored regex, i.e. it begins with
/// `^` and ends with `$`.
fn is_regex_colname(nm: &str) -> bool {
    // Both anchors are ASCII, so comparing the first and last bytes is equivalent to
    // comparing the first and last characters.
    matches!(nm.as_bytes(), [b'^', .., b'$'])
}
1649
1650fn process_join_on(
1651    expression: &sqlparser::ast::Expr,
1652    tbl_left: &TableInfo,
1653    tbl_right: &TableInfo,
1654) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1655    match expression {
1656        SQLExpr::BinaryOp { left, op, right } => match op {
1657            BinaryOperator::And => {
1658                let (mut left_i, mut right_i) = process_join_on(left, tbl_left, tbl_right)?;
1659                let (mut left_j, mut right_j) = process_join_on(right, tbl_left, tbl_right)?;
1660                left_i.append(&mut left_j);
1661                right_i.append(&mut right_j);
1662                Ok((left_i, right_i))
1663            },
1664            BinaryOperator::Eq => match (left.as_ref(), right.as_ref()) {
1665                (SQLExpr::CompoundIdentifier(left), SQLExpr::CompoundIdentifier(right)) => {
1666                    collect_compound_identifiers(left, right, &tbl_left.name, &tbl_right.name)
1667                },
1668                _ => {
1669                    polars_bail!(SQLInterface: "only equi-join constraints (on identifiers) are currently supported; found lhs={:?}, rhs={:?}", left, right)
1670                },
1671            },
1672            _ => {
1673                polars_bail!(SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op)
1674            },
1675        },
1676        SQLExpr::Nested(expr) => process_join_on(expr, tbl_left, tbl_right),
1677        _ => {
1678            polars_bail!(SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", expression)
1679        },
1680    }
1681}
1682
1683fn process_join_constraint(
1684    constraint: &JoinConstraint,
1685    tbl_left: &TableInfo,
1686    tbl_right: &TableInfo,
1687) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1688    match constraint {
1689        JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
1690            process_join_on(expr, tbl_left, tbl_right)
1691        },
1692        JoinConstraint::Using(idents) if !idents.is_empty() => {
1693            let using: Vec<Expr> = idents.iter().map(|id| col(id.value.as_str())).collect();
1694            Ok((using.clone(), using))
1695        },
1696        JoinConstraint::Natural => {
1697            let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
1698            let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
1699            let on: Vec<Expr> = left_names
1700                .intersection(&right_names)
1701                .map(|&name| col(name.clone()))
1702                .collect();
1703            if on.is_empty() {
1704                polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
1705            }
1706            Ok((on.clone(), on))
1707        },
1708        _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
1709    }
1710}
1711
1712bitflags::bitflags! {
1713    /// Bitfield indicating whether there exists a projection with the specified height behavior.
1714    ///
1715    /// Used to help determine whether to execute projections in `select()` or `with_columns()`
1716    /// context.
1717    #[derive(PartialEq)]
1718    struct ExprSqlProjectionHeightBehavior: u8 {
1719        /// Maintains the height of input column(s)
1720        const MaintainsColumn = 1 << 0;
1721        /// Height is independent of input, e.g.:
1722        /// * expressions that change length: e.g. slice, explode, filter, gather etc.
1723        /// * aggregations: count(*), first(), sum() etc.
1724        const Independent = 1 << 1;
1725        /// "Inherits" the height of the context, e.g.:
1726        /// * Scalar literals
1727        const InheritsContext = 1 << 2;
1728    }
1729}
1730
1731impl ExprSqlProjectionHeightBehavior {
1732    fn identify_from_expr(expr: &Expr) -> Self {
1733        let mut has_column = false;
1734        let mut has_independent = false;
1735
1736        for e in expr.into_iter() {
1737            use Expr::*;
1738            has_column |= matches!(e, Column(_) | Selector(_));
1739            has_independent |= match e {
1740                // @TODO: This is broken now with functions.
1741                AnonymousFunction { options, .. } => {
1742                    options.returns_scalar() || !options.is_length_preserving()
1743                },
1744                Literal(v) => !v.is_scalar(),
1745                Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
1746                Agg { .. } | Len => true,
1747                _ => false,
1748            }
1749        }
1750        if has_independent {
1751            Self::Independent
1752        } else if has_column {
1753            Self::MaintainsColumn
1754        } else {
1755            Self::InheritsContext
1756        }
1757    }
1758}