polars_sql/
context.rs

1use std::cell::RefCell;
2use std::ops::Deref;
3
4use polars_core::frame::row::Row;
5use polars_core::prelude::*;
6use polars_lazy::prelude::*;
7use polars_ops::frame::JoinCoalesce;
8use polars_plan::dsl::function_expr::StructFunction;
9use polars_plan::prelude::*;
10use polars_utils::format_pl_smallstr;
11use sqlparser::ast::{
12    BinaryOperator, CreateTable, Distinct, ExcludeSelectItem, Expr as SQLExpr, FunctionArg,
13    GroupByExpr, Ident, JoinConstraint, JoinOperator, ObjectName, ObjectType, Offset, OrderBy,
14    Query, RenameSelectItem, Select, SelectItem, SetExpr, SetOperator, SetQuantifier, Statement,
15    TableAlias, TableFactor, TableWithJoins, UnaryOperator, Value as SQLValue, Values,
16    WildcardAdditionalOptions,
17};
18use sqlparser::dialect::GenericDialect;
19use sqlparser::parser::{Parser, ParserOptions};
20
21use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
22use crate::sql_expr::{
23    parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
24};
25use crate::table_functions::PolarsTableFunctions;
26
/// A lazy frame bundled with the name and schema it is referred to by.
///
/// One instance is built for each side of a join when the `FROM` clause is processed.
#[derive(Clone)]
pub struct TableInfo {
    /// The lazy query plan backing the table.
    pub(crate) frame: LazyFrame,
    /// The name under which the table is referenced in the query.
    pub(crate) name: String,
    /// The schema associated with `frame`.
    pub(crate) schema: Arc<Schema>,
}
33
/// Collects the optional modifiers that may follow a `SELECT *` wildcard.
struct SelectModifiers {
    /// Column names listed for exclusion.
    exclude: PlHashSet<String>,                // SELECT * EXCLUDE
    /// Optional name pattern; when `None`, every column name matches.
    ilike: Option<regex::Regex>,               // SELECT * ILIKE
    /// Mapping from the original column name to its new name.
    rename: PlHashMap<PlSmallStr, PlSmallStr>, // SELECT * RENAME
    /// Replacement expressions.
    replace: Vec<Expr>,                        // SELECT * REPLACE
}
40impl SelectModifiers {
41    fn matches_ilike(&self, s: &str) -> bool {
42        match &self.ilike {
43            Some(rx) => rx.is_match(s),
44            None => true,
45        }
46    }
47    fn renamed_cols(&self) -> Vec<Expr> {
48        self.rename
49            .iter()
50            .map(|(before, after)| col(before.clone()).alias(after.clone()))
51            .collect()
52    }
53}
54
/// The SQLContext is the main entry point for executing SQL queries.
///
/// It holds the registered tables and the function registry, plus per-statement
/// bookkeeping (CTEs and aliases) that is reset by [`SQLContext::execute`].
#[derive(Clone)]
pub struct SQLContext {
    /// Registered tables, keyed by table name.
    pub(crate) table_map: PlHashMap<String, LazyFrame>,
    /// Registry consulted for SQL functions; replaceable via `with_function_registry`.
    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
    /// Logical-plan arena used for schema resolution and handed to the result of `execute`.
    pub(crate) lp_arena: Arena<IR>,
    /// Expression arena used for schema resolution and handed to the result of `execute`.
    pub(crate) expr_arena: Arena<AExpr>,

    /// Statement-level: CTE frames, keyed by CTE name.
    cte_map: RefCell<PlHashMap<String, LazyFrame>>,
    /// Statement-level: maps a table alias to the name of a registered table.
    table_aliases: RefCell<PlHashMap<String, String>>,
    /// Statement-level: for each joined table name, maps a column name to the
    /// aliased name it carries in the joined result.
    joined_aliases: RefCell<PlHashMap<String, PlHashMap<String, String>>>,
}
67
68impl Default for SQLContext {
69    fn default() -> Self {
70        Self {
71            function_registry: Arc::new(DefaultFunctionRegistry {}),
72            table_map: Default::default(),
73            cte_map: Default::default(),
74            table_aliases: Default::default(),
75            joined_aliases: Default::default(),
76            lp_arena: Default::default(),
77            expr_arena: Default::default(),
78        }
79    }
80}
81
82impl SQLContext {
83    /// Create a new SQLContext.
84    /// ```rust
85    /// # use polars_sql::SQLContext;
86    /// # fn main() {
87    /// let ctx = SQLContext::new();
88    /// # }
89    /// ```
90    pub fn new() -> Self {
91        Self::default()
92    }
93
94    /// Get the names of all registered tables, in sorted order.
95    pub fn get_tables(&self) -> Vec<String> {
96        let mut tables = Vec::from_iter(self.table_map.keys().cloned());
97        tables.sort_unstable();
98        tables
99    }
100
101    /// Register a [`LazyFrame`] as a table in the SQLContext.
102    /// ```rust
103    /// # use polars_sql::SQLContext;
104    /// # use polars_core::prelude::*;
105    /// # use polars_lazy::prelude::*;
106    /// # fn main() {
107    ///
108    /// let mut ctx = SQLContext::new();
109    /// let df = df! {
110    ///    "a" =>  [1, 2, 3],
111    /// }.unwrap().lazy();
112    ///
113    /// ctx.register("df", df);
114    /// # }
115    ///```
116    pub fn register(&mut self, name: &str, lf: LazyFrame) {
117        self.table_map.insert(name.to_owned(), lf);
118    }
119
120    /// Unregister a [`LazyFrame`] table from the [`SQLContext`].
121    pub fn unregister(&mut self, name: &str) {
122        self.table_map.remove(&name.to_owned());
123    }
124
125    /// Execute a SQL query, returning a [`LazyFrame`].
126    /// ```rust
127    /// # use polars_sql::SQLContext;
128    /// # use polars_core::prelude::*;
129    /// # use polars_lazy::prelude::*;
130    /// # fn main() {
131    ///
132    /// let mut ctx = SQLContext::new();
133    /// let df = df! {
134    ///    "a" =>  [1, 2, 3],
135    /// }
136    /// .unwrap();
137    ///
138    /// ctx.register("df", df.clone().lazy());
139    /// let sql_df = ctx.execute("SELECT * FROM df").unwrap().collect().unwrap();
140    /// assert!(sql_df.equals(&df));
141    /// # }
142    ///```
143    pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
144        let mut parser = Parser::new(&GenericDialect);
145        parser = parser.with_options(ParserOptions {
146            trailing_commas: true,
147            ..Default::default()
148        });
149
150        let ast = parser
151            .try_with_sql(query)
152            .map_err(to_sql_interface_err)?
153            .parse_statements()
154            .map_err(to_sql_interface_err)?;
155
156        polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
157        let res = self.execute_statement(ast.first().unwrap())?;
158
159        // Ensure the result uses the proper arenas.
160        // This will instantiate new arenas with a new version.
161        let lp_arena = std::mem::take(&mut self.lp_arena);
162        let expr_arena = std::mem::take(&mut self.expr_arena);
163        res.set_cached_arena(lp_arena, expr_arena);
164
165        // Every execution should clear the statement-level maps.
166        self.cte_map.borrow_mut().clear();
167        self.table_aliases.borrow_mut().clear();
168        self.joined_aliases.borrow_mut().clear();
169
170        Ok(res)
171    }
172
173    /// add a function registry to the SQLContext
174    /// the registry provides the ability to add custom functions to the SQLContext
175    pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
176        self.function_registry = function_registry;
177        self
178    }
179
180    /// Get the function registry of the SQLContext
181    pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
182        &self.function_registry
183    }
184
185    /// Get a mutable reference to the function registry of the SQLContext
186    pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
187        Arc::get_mut(&mut self.function_registry).unwrap()
188    }
189}
190
191impl SQLContext {
192    pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
193        let ast = stmt;
194        Ok(match ast {
195            Statement::Query(query) => self.execute_query(query)?,
196            stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
197            stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
198            stmt @ Statement::Drop {
199                object_type: ObjectType::Table,
200                ..
201            } => self.execute_drop_table(stmt)?,
202            stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
203            stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
204            _ => polars_bail!(
205                SQLInterface: "statement type {:?} is not supported", ast,
206            ),
207        })
208    }
209
210    pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
211        self.register_ctes(query)?;
212        self.execute_query_no_ctes(query)
213    }
214
215    pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
216        let lf = self.process_query(&query.body, query)?;
217        self.process_limit_offset(lf, &query.limit, &query.offset)
218    }
219
220    pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
221        frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
222    }
223
224    pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
225        let table = self.table_map.get(name).cloned();
226        table
227            .or_else(|| self.cte_map.borrow().get(name).cloned())
228            .or_else(|| {
229                self.table_aliases
230                    .borrow()
231                    .get(name)
232                    .and_then(|alias| self.table_map.get(alias).cloned())
233            })
234    }
235
236    fn expr_or_ordinal(
237        &mut self,
238        e: &SQLExpr,
239        exprs: &[Expr],
240        selected: Option<&[Expr]>,
241        schema: Option<&Schema>,
242        clause: &str,
243    ) -> PolarsResult<Expr> {
244        match e {
245            SQLExpr::UnaryOp {
246                op: UnaryOperator::Minus,
247                expr,
248            } if matches!(**expr, SQLExpr::Value(SQLValue::Number(_, _))) => {
249                if let SQLExpr::Value(SQLValue::Number(ref idx, _)) = **expr {
250                    Err(polars_err!(
251                    SQLSyntax:
252                    "negative ordinal values are invalid for {}; found -{}",
253                    clause,
254                    idx
255                    ))
256                } else {
257                    unreachable!()
258                }
259            },
260            SQLExpr::Value(SQLValue::Number(idx, _)) => {
261                // note: sql queries are 1-indexed
262                let idx = idx.parse::<usize>().map_err(|_| {
263                    polars_err!(
264                        SQLSyntax:
265                        "negative ordinal values are invalid for {}; found {}",
266                        clause,
267                        idx
268                    )
269                })?;
270                // note: "selected" cols represent final projection order, so we use those for
271                // ordinal resolution. "exprs" may include cols that are subsequently dropped.
272                let cols = if let Some(cols) = selected {
273                    cols
274                } else {
275                    exprs
276                };
277                Ok(cols
278                    .get(idx - 1)
279                    .ok_or_else(|| {
280                        polars_err!(
281                            SQLInterface:
282                            "{} ordinal value must refer to a valid column; found {}",
283                            clause,
284                            idx
285                        )
286                    })?
287                    .clone())
288            },
289            SQLExpr::Value(v) => Err(polars_err!(
290                SQLSyntax:
291                "{} requires a valid expression or positive ordinal; found {}", clause, v,
292            )),
293            _ => parse_sql_expr(e, self, schema),
294        }
295    }
296
297    pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
298        if self.joined_aliases.borrow().contains_key(tbl_name) {
299            self.joined_aliases
300                .borrow()
301                .get(tbl_name)
302                .and_then(|aliases| aliases.get(column_name))
303                .cloned()
304                .unwrap_or_else(|| column_name.to_string())
305        } else {
306            column_name.to_string()
307        }
308    }
309
310    fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
311        match expr {
312            SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
313            SetExpr::Query(query) => self.execute_query_no_ctes(query),
314            SetExpr::SetOperation {
315                op: SetOperator::Union,
316                set_quantifier,
317                left,
318                right,
319            } => self.process_union(left, right, set_quantifier, query),
320
321            #[cfg(feature = "semi_anti_join")]
322            SetExpr::SetOperation {
323                op: SetOperator::Intersect | SetOperator::Except,
324                set_quantifier,
325                left,
326                right,
327            } => self.process_except_intersect(left, right, set_quantifier, query),
328
329            SetExpr::Values(Values {
330                explicit_row: _,
331                rows,
332            }) => self.process_values(rows),
333
334            SetExpr::Table(tbl) => {
335                if tbl.table_name.is_some() {
336                    let table_name = tbl.table_name.as_ref().unwrap();
337                    self.get_table_from_current_scope(table_name)
338                        .ok_or_else(|| {
339                            polars_err!(
340                                SQLInterface: "no table or alias named '{}' found",
341                                tbl
342                            )
343                        })
344                } else {
345                    polars_bail!(SQLInterface: "'TABLE' requires valid table name")
346                }
347            },
348            op => {
349                let op = match op {
350                    SetExpr::SetOperation { op, .. } => op,
351                    _ => unreachable!(),
352                };
353                polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
354            },
355        }
356    }
357
358    #[cfg(feature = "semi_anti_join")]
359    fn process_except_intersect(
360        &mut self,
361        left: &SetExpr,
362        right: &SetExpr,
363        quantifier: &SetQuantifier,
364        query: &Query,
365    ) -> PolarsResult<LazyFrame> {
366        let (join_type, op_name) = match *query.body {
367            SetExpr::SetOperation {
368                op: SetOperator::Except,
369                ..
370            } => (JoinType::Anti, "EXCEPT"),
371            _ => (JoinType::Semi, "INTERSECT"),
372        };
373        let mut lf = self.process_query(left, query)?;
374        let mut rf = self.process_query(right, query)?;
375        let join = lf
376            .clone()
377            .join_builder()
378            .with(rf.clone())
379            .how(join_type)
380            .join_nulls(true);
381
382        let lf_schema = self.get_frame_schema(&mut lf)?;
383        let lf_cols: Vec<_> = lf_schema.iter_names().map(|nm| col(nm.clone())).collect();
384        let joined_tbl = match quantifier {
385            SetQuantifier::ByName => join.on(lf_cols).finish(),
386            SetQuantifier::Distinct | SetQuantifier::None => {
387                let rf_schema = self.get_frame_schema(&mut rf)?;
388                let rf_cols: Vec<_> = rf_schema.iter_names().map(|nm| col(nm.clone())).collect();
389                if lf_cols.len() != rf_cols.len() {
390                    polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
391                }
392                join.left_on(lf_cols).right_on(rf_cols).finish()
393            },
394            _ => {
395                polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
396            },
397        };
398        Ok(joined_tbl.unique(None, UniqueKeepStrategy::Any))
399    }
400
401    fn process_union(
402        &mut self,
403        left: &SetExpr,
404        right: &SetExpr,
405        quantifier: &SetQuantifier,
406        query: &Query,
407    ) -> PolarsResult<LazyFrame> {
408        let mut lf = self.process_query(left, query)?;
409        let mut rf = self.process_query(right, query)?;
410        let opts = UnionArgs {
411            parallel: true,
412            to_supertypes: true,
413            ..Default::default()
414        };
415        match quantifier {
416            // UNION [ALL | DISTINCT]
417            SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
418                let lf_schema = self.get_frame_schema(&mut lf)?;
419                let rf_schema = self.get_frame_schema(&mut rf)?;
420                if lf_schema.len() != rf_schema.len() {
421                    polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
422                }
423                let concatenated = polars_lazy::dsl::concat(vec![lf, rf], opts);
424                match quantifier {
425                    SetQuantifier::Distinct | SetQuantifier::None => {
426                        concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
427                    },
428                    _ => concatenated,
429                }
430            },
431            // UNION ALL BY NAME
432            #[cfg(feature = "diagonal_concat")]
433            SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
434            // UNION [DISTINCT] BY NAME
435            #[cfg(feature = "diagonal_concat")]
436            SetQuantifier::ByName | SetQuantifier::DistinctByName => {
437                let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
438                concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
439            },
440            #[allow(unreachable_patterns)]
441            _ => polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier),
442        }
443    }
444
445    fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
446        let frame_rows: Vec<Row> = values.iter().map(|row| {
447            let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
448                let expr = parse_sql_expr(expr, self, None)?;
449                match expr {
450                    Expr::Literal(value) => {
451                        value.to_any_value()
452                            .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
453                            .map(|av| av.into_static())
454                    },
455                    _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
456                }
457            }).collect();
458            row_data.map(Row::new)
459        }).collect::<Result<_, _>>()?;
460
461        Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
462    }
463
464    // EXPLAIN SELECT * FROM DF
465    fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
466        match stmt {
467            Statement::Explain { statement, .. } => {
468                let lf = self.execute_statement(statement)?;
469                let plan = lf.describe_optimized_plan()?;
470                let plan = plan
471                    .split('\n')
472                    .collect::<Series>()
473                    .with_name(PlSmallStr::from_static("Logical Plan"))
474                    .into_column();
475                let df = DataFrame::new(vec![plan])?;
476                Ok(df.lazy())
477            },
478            _ => unreachable!(),
479        }
480    }
481
482    // SHOW TABLES
483    fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
484        let tables = Column::new("name".into(), self.get_tables());
485        let df = DataFrame::new(vec![tables])?;
486        Ok(df.lazy())
487    }
488
489    fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
490        match stmt {
491            Statement::Drop { names, .. } => {
492                names.iter().for_each(|name| {
493                    self.table_map.remove(&name.to_string());
494                });
495                Ok(DataFrame::empty().lazy())
496            },
497            _ => unreachable!(),
498        }
499    }
500
501    fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
502        if let Statement::Truncate {
503            table_names,
504            partitions,
505            ..
506        } = stmt
507        {
508            match partitions {
509                None => {
510                    if table_names.len() != 1 {
511                        polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
512                    }
513                    let tbl = table_names[0].to_string();
514                    if let Some(lf) = self.table_map.get_mut(&tbl) {
515                        *lf = DataFrame::empty_with_schema(
516                            lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
517                                .unwrap()
518                                .as_ref(),
519                        )
520                        .lazy();
521                        Ok(lf.clone())
522                    } else {
523                        polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
524                    }
525                },
526                _ => {
527                    polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
528                },
529            }
530        } else {
531            unreachable!()
532        }
533    }
534
535    fn register_cte(&mut self, name: &str, lf: LazyFrame) {
536        self.cte_map.borrow_mut().insert(name.to_owned(), lf);
537    }
538
539    fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
540        if let Some(with) = &query.with {
541            if with.recursive {
542                polars_bail!(SQLInterface: "recursive CTEs are not supported")
543            }
544            for cte in &with.cte_tables {
545                let cte_name = cte.alias.name.value.clone();
546                let mut lf = self.execute_query(&cte.query)?;
547                lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
548                self.register_cte(&cte_name, lf);
549            }
550        }
551        Ok(())
552    }
553
    /// Execute the 'FROM' part of the query.
    ///
    /// Resolves the base relation and then folds each `JOIN` into it, left to
    /// right. After every join, the columns of the right table that were aliased in
    /// the joined result are recorded in `joined_aliases` so that they can be
    /// resolved later (see `resolve_name`).
    ///
    /// Returns a `SQLInterface` error for join operators that are not supported.
    fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
        let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
        if !tbl_expr.joins.is_empty() {
            for join in &tbl_expr.joins {
                let (r_name, mut rf) = self.get_table(&join.relation)?;
                // schemas of both sides *before* this join is applied
                let left_schema = self.get_frame_schema(&mut lf)?;
                let right_schema = self.get_frame_schema(&mut rf)?;

                lf = match &join.join_operator {
                    op @ (JoinOperator::FullOuter(constraint)
                    | JoinOperator::LeftOuter(constraint)
                    | JoinOperator::RightOuter(constraint)
                    | JoinOperator::Inner(constraint)
                    | JoinOperator::Anti(constraint)
                    | JoinOperator::Semi(constraint)
                    | JoinOperator::LeftAnti(constraint)
                    | JoinOperator::LeftSemi(constraint)
                    | JoinOperator::RightAnti(constraint)
                    | JoinOperator::RightSemi(constraint)) => {
                        // RIGHT ANTI/SEMI joins are expressed by swapping the two frames.
                        // NOTE(review): only the frames are swapped; the names and schemas
                        // passed below are not -- confirm this is what `process_join` expects.
                        let (lf, rf) = match op {
                            JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
                            _ => (lf, rf),
                        };
                        self.process_join(
                            &TableInfo {
                                frame: lf,
                                name: l_name.clone(),
                                schema: left_schema.clone(),
                            },
                            &TableInfo {
                                frame: rf,
                                name: r_name.clone(),
                                schema: right_schema.clone(),
                            },
                            constraint,
                            // map the SQL join operator onto the corresponding join type
                            match op {
                                JoinOperator::FullOuter(_) => JoinType::Full,
                                JoinOperator::LeftOuter(_) => JoinType::Left,
                                JoinOperator::RightOuter(_) => JoinType::Right,
                                JoinOperator::Inner(_) => JoinType::Inner,
                                #[cfg(feature = "semi_anti_join")]
                                JoinOperator::Anti(_) | JoinOperator::LeftAnti(_) | JoinOperator::RightAnti(_) => JoinType::Anti,
                                #[cfg(feature = "semi_anti_join")]
                                JoinOperator::Semi(_) | JoinOperator::LeftSemi(_) | JoinOperator::RightSemi(_) => JoinType::Semi,
                                // reached for anti/semi joins when the "semi_anti_join" feature is disabled
                                join_type => polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type),
                            },
                        )?
                    },
                    JoinOperator::CrossJoin => {
                        // the second argument is built as ":<right table name>"
                        lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
                    },
                    join_type => {
                        polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
                    },
                };

                // track join-aliased columns so we can resolve them later
                let joined_schema = self.get_frame_schema(&mut lf)?;

                self.joined_aliases.borrow_mut().insert(
                    r_name.to_string(),
                    right_schema
                        .iter_names()
                        .filter_map(|name| {
                            // col exists in both tables and is aliased in the joined result
                            let aliased_name = format!("{}:{}", name, r_name);
                            if left_schema.contains(name)
                                && joined_schema.contains(aliased_name.as_str())
                            {
                                Some((name.to_string(), aliased_name))
                            } else {
                                None
                            }
                        })
                        .collect::<PlHashMap<String, String>>(),
                );
            }
        };
        Ok(lf)
    }
635
636    /// Execute the 'SELECT' part of the query.
637    fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
638        let mut lf = if select_stmt.from.is_empty() {
639            DataFrame::empty().lazy()
640        } else {
641            // Note: implicit joins need more work to support properly,
642            // explicit joins are preferred for now (ref: #16662)
643            let from = select_stmt.clone().from;
644            if from.len() > 1 {
645                polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
646            }
647            self.execute_from_statement(from.first().unwrap())?
648        };
649
650        // Filter expression (WHERE clause)
651        let schema = self.get_frame_schema(&mut lf)?;
652        lf = self.process_where(lf, &select_stmt.selection)?;
653
654        // 'SELECT *' modifiers
655        let mut select_modifiers = SelectModifiers {
656            ilike: None,
657            exclude: PlHashSet::new(),
658            rename: PlHashMap::new(),
659            replace: vec![],
660        };
661
662        let projections = self.column_projections(select_stmt, &schema, &mut select_modifiers)?;
663
664        // Check for "GROUP BY ..." (after determining projections)
665        let mut group_by_keys: Vec<Expr> = Vec::new();
666        match &select_stmt.group_by {
667            // Standard "GROUP BY x, y, z" syntax (also recognising ordinal values)
668            GroupByExpr::Expressions(group_by_exprs, modifiers) => {
669                if !modifiers.is_empty() {
670                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
671                }
672                // translate the group expressions, allowing ordinal values
673                group_by_keys = group_by_exprs
674                    .iter()
675                    .map(|e| {
676                        self.expr_or_ordinal(
677                            e,
678                            &projections,
679                            None,
680                            Some(schema.deref()),
681                            "GROUP BY",
682                        )
683                    })
684                    .collect::<PolarsResult<_>>()?
685            },
686            // "GROUP BY ALL" syntax; automatically adds expressions that do not contain
687            // nested agg/window funcs to the group key (also ignores literals).
688            GroupByExpr::All(modifiers) => {
689                if !modifiers.is_empty() {
690                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
691                }
692                projections.iter().for_each(|expr| match expr {
693                    // immediately match the most common cases (col|agg|len|lit, optionally aliased).
694                    Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
695                    Expr::Column(_) => group_by_keys.push(expr.clone()),
696                    Expr::Alias(e, _)
697                        if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
698                    Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
699                        if let Expr::Column(name) = &**e {
700                            group_by_keys.push(col(name.clone()));
701                        }
702                    },
703                    _ => {
704                        // If not quick-matched, add if no nested agg/window expressions
705                        if !has_expr(expr, |e| {
706                            matches!(e, Expr::Agg(_))
707                                || matches!(e, Expr::Len)
708                                || matches!(e, Expr::Window { .. })
709                        }) {
710                            group_by_keys.push(expr.clone())
711                        }
712                    },
713                });
714            },
715        };
716
717        lf = if group_by_keys.is_empty() {
718            // The 'having' clause is only valid inside 'group by'
719            if select_stmt.having.is_some() {
720                polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
721            };
722
723            // Final/selected cols, accounting for 'SELECT *' modifiers
724            let mut retained_cols = Vec::with_capacity(projections.len());
725            let mut retained_names = Vec::with_capacity(projections.len());
726            let have_order_by = query.order_by.is_some();
727            let mut all_literal = true;
728
729            // Note: if there is an 'order by' then we project everything (original cols
730            // and new projections) and *then* select the final cols; the retained cols
731            // are used to ensure a correct final projection. If there's no 'order by',
732            // clause then we can project the final column *expressions* directly.
733            for p in projections.iter() {
734                let name = p
735                    .to_field(schema.deref(), Context::Default)?
736                    .name
737                    .to_string();
738                if select_modifiers.matches_ilike(&name)
739                    && !select_modifiers.exclude.contains(&name)
740                {
741                    all_literal &= expr_to_leaf_column_names_iter(p).next().is_none();
742                    retained_cols.push(if have_order_by {
743                        col(name.as_str())
744                    } else {
745                        p.clone()
746                    });
747                    retained_names.push(col(name));
748                }
749            }
750
751            // Apply the remaining modifiers and establish the final projection
752            if have_order_by {
753                lf = lf.with_columns(projections);
754            }
755            if !select_modifiers.replace.is_empty() {
756                lf = lf.with_columns(&select_modifiers.replace);
757            }
758            if !select_modifiers.rename.is_empty() {
759                lf = lf.with_columns(select_modifiers.renamed_cols());
760            }
761
762            lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;
763
764            if all_literal && !have_order_by {
765                lf = lf.with_columns(retained_cols).select(retained_names);
766            } else {
767                lf = lf.select(retained_cols);
768            }
769
770            if !select_modifiers.rename.is_empty() {
771                lf = lf.rename(
772                    select_modifiers.rename.keys(),
773                    select_modifiers.rename.values(),
774                    true,
775                );
776            };
777            lf
778        } else {
779            lf = self.process_group_by(lf, &group_by_keys, &projections)?;
780            lf = self.process_order_by(lf, &query.order_by, None)?;
781
782            // Apply optional 'having' clause, post-aggregation.
783            let schema = Some(self.get_frame_schema(&mut lf)?);
784            match select_stmt.having.as_ref() {
785                Some(expr) => lf.filter(parse_sql_expr(expr, self, schema.as_deref())?),
786                None => lf,
787            }
788        };
789
790        // Apply optional DISTINCT clause.
791        lf = match &select_stmt.distinct {
792            Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
793            Some(Distinct::On(exprs)) => {
794                // TODO: support exprs in `unique` see https://github.com/pola-rs/polars/issues/5760
795                let schema = Some(self.get_frame_schema(&mut lf)?);
796                let cols = exprs
797                    .iter()
798                    .map(|e| {
799                        let expr = parse_sql_expr(e, self, schema.as_deref())?;
800                        if let Expr::Column(name) = expr {
801                            Ok(name.clone())
802                        } else {
803                            Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
804                        }
805                    })
806                    .collect::<PolarsResult<Vec<_>>>()?;
807
808                // DISTINCT ON has to apply the ORDER BY before the operation.
809                lf = self.process_order_by(lf, &query.order_by, None)?;
810                return Ok(lf.unique_stable(Some(cols.clone()), UniqueKeepStrategy::First));
811            },
812            None => lf,
813        };
814        Ok(lf)
815    }
816
817    fn column_projections(
818        &mut self,
819        select_stmt: &Select,
820        schema: &SchemaRef,
821        select_modifiers: &mut SelectModifiers,
822    ) -> PolarsResult<Vec<Expr>> {
823        let parsed_items: PolarsResult<Vec<Vec<Expr>>> = select_stmt
824            .projection
825            .iter()
826            .map(|select_item| match select_item {
827                SelectItem::UnnamedExpr(expr) => {
828                    Ok(vec![parse_sql_expr(expr, self, Some(schema))?])
829                },
830                SelectItem::ExprWithAlias { expr, alias } => {
831                    let expr = parse_sql_expr(expr, self, Some(schema))?;
832                    Ok(vec![expr.alias(PlSmallStr::from_str(alias.value.as_str()))])
833                },
834                SelectItem::QualifiedWildcard(obj_name, wildcard_options) => self
835                    .process_qualified_wildcard(
836                        obj_name,
837                        wildcard_options,
838                        select_modifiers,
839                        Some(schema),
840                    ),
841                SelectItem::Wildcard(wildcard_options) => {
842                    let cols = schema
843                        .iter_names()
844                        .map(|name| col(name.clone()))
845                        .collect::<Vec<_>>();
846
847                    self.process_wildcard_additional_options(
848                        cols,
849                        wildcard_options,
850                        select_modifiers,
851                        Some(schema),
852                    )
853                },
854            })
855            .collect();
856
857        let flattened_exprs: Vec<Expr> = parsed_items?
858            .into_iter()
859            .flatten()
860            .flat_map(|expr| expand_exprs(expr, schema))
861            .collect();
862
863        Ok(flattened_exprs)
864    }
865
866    fn process_where(
867        &mut self,
868        mut lf: LazyFrame,
869        expr: &Option<SQLExpr>,
870    ) -> PolarsResult<LazyFrame> {
871        if let Some(expr) = expr {
872            let schema = self.get_frame_schema(&mut lf)?;
873
874            // shortcut filter evaluation if given expression is just TRUE or FALSE
875            let (all_true, all_false) = match expr {
876                SQLExpr::Value(SQLValue::Boolean(b)) => (*b, !*b),
877                SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
878                    (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => (a == b, a != b),
879                    (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
880                        (a != b, a == b)
881                    },
882                    _ => (false, false),
883                },
884                _ => (false, false),
885            };
886            if all_true {
887                return Ok(lf);
888            } else if all_false {
889                return Ok(DataFrame::empty_with_schema(schema.as_ref()).lazy());
890            }
891
892            // ...otherwise parse and apply the filter as normal
893            let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
894            if filter_expression.clone().meta().has_multiple_outputs() {
895                filter_expression = all_horizontal([filter_expression])?;
896            }
897            lf = self.process_subqueries(lf, vec![&mut filter_expression]);
898            lf = lf.filter(filter_expression);
899        }
900        Ok(lf)
901    }
902
903    pub(super) fn process_join(
904        &mut self,
905        tbl_left: &TableInfo,
906        tbl_right: &TableInfo,
907        constraint: &JoinConstraint,
908        join_type: JoinType,
909    ) -> PolarsResult<LazyFrame> {
910        let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right)?;
911
912        let joined = tbl_left
913            .frame
914            .clone()
915            .join_builder()
916            .with(tbl_right.frame.clone())
917            .left_on(left_on)
918            .right_on(right_on)
919            .how(join_type)
920            .suffix(format!(":{}", tbl_right.name))
921            .coalesce(JoinCoalesce::KeepColumns)
922            .finish();
923
924        Ok(joined)
925    }
926
927    fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
928        let mut contexts = vec![];
929        for expr in exprs {
930            *expr = expr.clone().map_expr(|e| match e {
931                Expr::SubPlan(lp, names) => {
932                    contexts.push(<LazyFrame>::from((**lp).clone()));
933                    if names.len() == 1 {
934                        Expr::Column(names[0].as_str().into())
935                    } else {
936                        Expr::SubPlan(lp, names)
937                    }
938                },
939                e => e,
940            })
941        }
942
943        if contexts.is_empty() {
944            lf
945        } else {
946            lf.with_context(contexts)
947        }
948    }
949
950    fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
951        if let Statement::CreateTable(CreateTable {
952            if_not_exists,
953            name,
954            query,
955            ..
956        }) = stmt
957        {
958            let tbl_name = name.0.first().unwrap().value.as_str();
959            // CREATE TABLE IF NOT EXISTS
960            if *if_not_exists && self.table_map.contains_key(tbl_name) {
961                polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
962                // CREATE OR REPLACE TABLE
963            }
964            if let Some(query) = query {
965                let lf = self.execute_query(query)?;
966                self.register(tbl_name, lf);
967                let out = df! {
968                    "Response" => ["CREATE TABLE"]
969                }
970                .unwrap()
971                .lazy();
972                Ok(out)
973            } else {
974                polars_bail!(SQLInterface: "only `CREATE TABLE AS SELECT ...` is currently supported");
975            }
976        } else {
977            unreachable!()
978        }
979    }
980
981    fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
982        match relation {
983            TableFactor::Table {
984                name, alias, args, ..
985            } => {
986                if let Some(args) = args {
987                    return self.execute_table_function(name, alias, &args.args);
988                }
989                let tbl_name = name.0.first().unwrap().value.as_str();
990                if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
991                    match alias {
992                        Some(alias) => {
993                            self.table_aliases
994                                .borrow_mut()
995                                .insert(alias.name.value.clone(), tbl_name.to_string());
996                            Ok((alias.to_string(), lf))
997                        },
998                        None => Ok((tbl_name.to_string(), lf)),
999                    }
1000                } else {
1001                    polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1002                }
1003            },
1004            TableFactor::Derived {
1005                lateral,
1006                subquery,
1007                alias,
1008            } => {
1009                polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1010                if let Some(alias) = alias {
1011                    let mut lf = self.execute_query_no_ctes(subquery)?;
1012                    lf = self.rename_columns_from_table_alias(lf, alias)?;
1013                    self.table_map.insert(alias.name.value.clone(), lf.clone());
1014                    Ok((alias.name.value.clone(), lf))
1015                } else {
1016                    polars_bail!(SQLSyntax: "derived tables must have aliases");
1017                }
1018            },
1019            TableFactor::UNNEST {
1020                alias,
1021                array_exprs,
1022                with_offset,
1023                with_offset_alias: _,
1024                ..
1025            } => {
1026                if let Some(alias) = alias {
1027                    let table_name = alias.name.value.clone();
1028                    let column_names: Vec<Option<PlSmallStr>> = alias
1029                        .columns
1030                        .iter()
1031                        .map(|c| {
1032                            if c.name.value.is_empty() {
1033                                None
1034                            } else {
1035                                Some(PlSmallStr::from_str(c.name.value.as_str()))
1036                            }
1037                        })
1038                        .collect();
1039
1040                    let column_values: Vec<Series> = array_exprs
1041                        .iter()
1042                        .map(|arr| parse_sql_array(arr, self))
1043                        .collect::<Result<_, _>>()?;
1044
1045                    polars_ensure!(!column_names.is_empty(),
1046                        SQLSyntax:
1047                        "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1048                    );
1049                    if column_names.len() != column_values.len() {
1050                        let plural = if column_values.len() > 1 { "s" } else { "" };
1051                        polars_bail!(
1052                            SQLSyntax:
1053                            "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
1054                        );
1055                    }
1056                    let column_series: Vec<Column> = column_values
1057                        .into_iter()
1058                        .zip(column_names)
1059                        .map(|(s, name)| {
1060                            if let Some(name) = name {
1061                                s.clone().with_name(name)
1062                            } else {
1063                                s.clone()
1064                            }
1065                        })
1066                        .map(Column::from)
1067                        .collect();
1068
1069                    let lf = DataFrame::new(column_series)?.lazy();
1070                    if *with_offset {
1071                        // TODO: support 'WITH ORDINALITY' modifier.
1072                        //  (note that 'WITH OFFSET' is BigQuery-specific syntax, not PostgreSQL)
1073                        polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH OFFSET/ORDINALITY");
1074                    }
1075                    self.table_map.insert(table_name.clone(), lf.clone());
1076                    Ok((table_name.clone(), lf))
1077                } else {
1078                    polars_bail!(SQLSyntax: "UNNEST table must have an alias");
1079                }
1080            },
1081            TableFactor::NestedJoin {
1082                table_with_joins,
1083                alias,
1084            } => {
1085                let lf = self.execute_from_statement(table_with_joins)?;
1086                match alias {
1087                    Some(a) => Ok((a.name.value.clone(), lf)),
1088                    None => Ok(("".to_string(), lf)),
1089                }
1090            },
1091            // Support bare table, optionally with an alias, for now
1092            _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
1093        }
1094    }
1095
1096    fn execute_table_function(
1097        &mut self,
1098        name: &ObjectName,
1099        alias: &Option<TableAlias>,
1100        args: &[FunctionArg],
1101    ) -> PolarsResult<(String, LazyFrame)> {
1102        let tbl_fn = name.0.first().unwrap().value.as_str();
1103        let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1104        let (tbl_name, lf) = read_fn.execute(args)?;
1105        #[allow(clippy::useless_asref)]
1106        let tbl_name = alias
1107            .as_ref()
1108            .map(|a| a.name.value.clone())
1109            .unwrap_or_else(|| tbl_name);
1110
1111        self.table_map.insert(tbl_name.clone(), lf.clone());
1112        Ok((tbl_name, lf))
1113    }
1114
1115    fn process_order_by(
1116        &mut self,
1117        mut lf: LazyFrame,
1118        order_by: &Option<OrderBy>,
1119        selected: Option<&[Expr]>,
1120    ) -> PolarsResult<LazyFrame> {
1121        if order_by.as_ref().is_none_or(|ob| ob.exprs.is_empty()) {
1122            return Ok(lf);
1123        }
1124        let schema = self.get_frame_schema(&mut lf)?;
1125        let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1126
1127        let order_by = order_by.as_ref().unwrap().exprs.clone();
1128        let mut descending = Vec::with_capacity(order_by.len());
1129        let mut nulls_last = Vec::with_capacity(order_by.len());
1130        let mut by: Vec<Expr> = Vec::with_capacity(order_by.len());
1131
1132        if order_by.len() == 1  // support `ORDER BY ALL` (iff there is no column named 'ALL' in the schema)
1133            && matches!(&order_by[0].expr, SQLExpr::Identifier(ident) if ident.value.to_uppercase() == "ALL" && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1134        {
1135            if let Some(selected) = selected {
1136                by.extend(selected.iter().cloned());
1137            } else {
1138                by.extend(columns_iter);
1139            };
1140            let desc_order = !order_by[0].asc.unwrap_or(true);
1141            nulls_last.resize(by.len(), !order_by[0].nulls_first.unwrap_or(desc_order));
1142            descending.resize(by.len(), desc_order);
1143        } else {
1144            let columns = &columns_iter.collect::<Vec<_>>();
1145            for ob in order_by {
1146                // note: if not specified 'NULLS FIRST' is default for DESC, 'NULLS LAST' otherwise
1147                // https://www.postgresql.org/docs/current/queries-order.html
1148                let desc_order = !ob.asc.unwrap_or(true);
1149                nulls_last.push(!ob.nulls_first.unwrap_or(desc_order));
1150                descending.push(desc_order);
1151
1152                // translate order expression, allowing ordinal values
1153                by.push(self.expr_or_ordinal(
1154                    &ob.expr,
1155                    columns,
1156                    selected,
1157                    Some(&schema),
1158                    "ORDER BY",
1159                )?)
1160            }
1161        }
1162        Ok(lf.sort_by_exprs(
1163            &by,
1164            SortMultipleOptions::default()
1165                .with_order_descending_multi(descending)
1166                .with_nulls_last_multi(nulls_last)
1167                .with_maintain_order(true),
1168        ))
1169    }
1170
1171    fn process_group_by(
1172        &mut self,
1173        mut lf: LazyFrame,
1174        group_by_keys: &[Expr],
1175        projections: &[Expr],
1176    ) -> PolarsResult<LazyFrame> {
1177        let schema_before = self.get_frame_schema(&mut lf)?;
1178        let group_by_keys_schema =
1179            expressions_to_schema(group_by_keys, &schema_before, Context::Default)?;
1180
1181        // Remove the group_by keys as polars adds those implicitly.
1182        let mut aggregation_projection = Vec::with_capacity(projections.len());
1183        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
1184        let mut projection_aliases = PlHashSet::new();
1185        let mut group_key_aliases = PlHashSet::new();
1186
1187        for mut e in projections {
1188            // `Len` represents COUNT(*) so we treat as an aggregation here.
1189            let is_agg_or_window = has_expr(e, |e| {
1190                matches!(e, Expr::Agg(_) | Expr::Len | Expr::Window { .. })
1191            });
1192
1193            // Note: if simple aliased expression we defer aliasing until after the group_by.
1194            if let Expr::Alias(expr, alias) = e {
1195                if e.clone().meta().is_simple_projection() {
1196                    group_key_aliases.insert(alias.as_ref());
1197                    e = expr
1198                } else if let Expr::Function {
1199                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
1200                    ..
1201                } = expr.deref()
1202                {
1203                    projection_overrides
1204                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
1205                } else if !is_agg_or_window && !group_by_keys_schema.contains(alias) {
1206                    projection_aliases.insert(alias.as_ref());
1207                }
1208            }
1209            let field = e.to_field(&schema_before, Context::Default)?;
1210            if group_by_keys_schema.get(&field.name).is_none() && is_agg_or_window {
1211                let mut e = e.clone();
1212                if let Expr::Agg(AggExpr::Implode(expr)) = &e {
1213                    e = (**expr).clone();
1214                } else if let Expr::Alias(expr, name) = &e {
1215                    if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
1216                        e = (**expr).clone().alias(name.clone());
1217                    }
1218                }
1219                aggregation_projection.push(e);
1220            } else if let Expr::Column(_)
1221            | Expr::Function {
1222                function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
1223                ..
1224            } = e
1225            {
1226                // Non-aggregated columns must be part of the GROUP BY clause
1227                if !group_by_keys_schema.contains(&field.name) {
1228                    polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
1229                }
1230            }
1231        }
1232        let aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
1233        let projection_schema =
1234            expressions_to_schema(projections, &schema_before, Context::Default)?;
1235
1236        // A final projection to get the proper order and any deferred transforms/aliases.
1237        let final_projection = projection_schema
1238            .iter_names()
1239            .zip(projections)
1240            .map(|(name, projection_expr)| {
1241                if let Some(expr) = projection_overrides.get(name.as_str()) {
1242                    expr.clone()
1243                } else if group_by_keys_schema.get(name).is_some()
1244                    || projection_aliases.contains(name.as_str())
1245                    || group_key_aliases.contains(name.as_str())
1246                {
1247                    projection_expr.clone()
1248                } else {
1249                    col(name.clone())
1250                }
1251            })
1252            .collect::<Vec<_>>();
1253
1254        Ok(aggregated.select(&final_projection))
1255    }
1256
1257    fn process_limit_offset(
1258        &self,
1259        lf: LazyFrame,
1260        limit: &Option<SQLExpr>,
1261        offset: &Option<Offset>,
1262    ) -> PolarsResult<LazyFrame> {
1263        match (offset, limit) {
1264            (
1265                Some(Offset {
1266                    value: SQLExpr::Value(SQLValue::Number(offset, _)),
1267                    ..
1268                }),
1269                Some(SQLExpr::Value(SQLValue::Number(limit, _))),
1270            ) => Ok(lf.slice(
1271                offset
1272                    .parse()
1273                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1274                limit
1275                    .parse()
1276                    .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1277            )),
1278            (
1279                Some(Offset {
1280                    value: SQLExpr::Value(SQLValue::Number(offset, _)),
1281                    ..
1282                }),
1283                None,
1284            ) => Ok(lf.slice(
1285                offset
1286                    .parse()
1287                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1288                IdxSize::MAX,
1289            )),
1290            (None, Some(SQLExpr::Value(SQLValue::Number(limit, _)))) => Ok(lf.limit(
1291                limit
1292                    .parse()
1293                    .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1294            )),
1295            (None, None) => Ok(lf),
1296            _ => polars_bail!(
1297                SQLSyntax: "non-numeric arguments for LIMIT/OFFSET are not supported",
1298            ),
1299        }
1300    }
1301
1302    fn process_qualified_wildcard(
1303        &mut self,
1304        ObjectName(idents): &ObjectName,
1305        options: &WildcardAdditionalOptions,
1306        modifiers: &mut SelectModifiers,
1307        schema: Option<&Schema>,
1308    ) -> PolarsResult<Vec<Expr>> {
1309        let mut new_idents = idents.clone();
1310        new_idents.push(Ident::new("*"));
1311
1312        let expr = resolve_compound_identifier(self, new_idents.deref(), schema);
1313        self.process_wildcard_additional_options(expr?, options, modifiers, schema)
1314    }
1315
1316    fn process_wildcard_additional_options(
1317        &mut self,
1318        exprs: Vec<Expr>,
1319        options: &WildcardAdditionalOptions,
1320        modifiers: &mut SelectModifiers,
1321        schema: Option<&Schema>,
1322    ) -> PolarsResult<Vec<Expr>> {
1323        if options.opt_except.is_some() && options.opt_exclude.is_some() {
1324            polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
1325        } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
1326            polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
1327        }
1328
1329        // SELECT * EXCLUDE
1330        if let Some(items) = &options.opt_exclude {
1331            match items {
1332                ExcludeSelectItem::Single(ident) => {
1333                    modifiers.exclude.insert(ident.value.clone());
1334                },
1335                ExcludeSelectItem::Multiple(idents) => {
1336                    modifiers
1337                        .exclude
1338                        .extend(idents.iter().map(|i| i.value.clone()));
1339                },
1340            };
1341        }
1342
1343        // SELECT * EXCEPT
1344        if let Some(items) = &options.opt_except {
1345            modifiers.exclude.insert(items.first_element.value.clone());
1346            modifiers
1347                .exclude
1348                .extend(items.additional_elements.iter().map(|i| i.value.clone()));
1349        }
1350
1351        // SELECT * ILIKE
1352        if let Some(item) = &options.opt_ilike {
1353            let rx = regex::escape(item.pattern.as_str())
1354                .replace('%', ".*")
1355                .replace('_', ".");
1356
1357            modifiers.ilike = Some(regex::Regex::new(format!("^(?is){}$", rx).as_str()).unwrap());
1358        }
1359
1360        // SELECT * RENAME
1361        if let Some(items) = &options.opt_rename {
1362            let renames = match items {
1363                RenameSelectItem::Single(rename) => vec![rename],
1364                RenameSelectItem::Multiple(renames) => renames.iter().collect(),
1365            };
1366            for rn in renames {
1367                let (before, after) = (rn.ident.value.as_str(), rn.alias.value.as_str());
1368                let (before, after) = (PlSmallStr::from_str(before), PlSmallStr::from_str(after));
1369                if before != after {
1370                    modifiers.rename.insert(before, after);
1371                }
1372            }
1373        }
1374
1375        // SELECT * REPLACE
1376        if let Some(replacements) = &options.opt_replace {
1377            for rp in &replacements.items {
1378                let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
1379                modifiers
1380                    .replace
1381                    .push(replacement_expr?.alias(rp.column_name.value.as_str()));
1382            }
1383        }
1384        Ok(exprs)
1385    }
1386
1387    fn rename_columns_from_table_alias(
1388        &mut self,
1389        mut lf: LazyFrame,
1390        alias: &TableAlias,
1391    ) -> PolarsResult<LazyFrame> {
1392        if alias.columns.is_empty() {
1393            Ok(lf)
1394        } else {
1395            let schema = self.get_frame_schema(&mut lf)?;
1396            if alias.columns.len() != schema.len() {
1397                polars_bail!(
1398                    SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
1399                    alias.columns.len(), alias.name.value, schema.len()
1400                )
1401            } else {
1402                let existing_columns: Vec<_> = schema.iter_names().collect();
1403                let new_columns: Vec<_> =
1404                    alias.columns.iter().map(|c| c.name.value.clone()).collect();
1405                Ok(lf.rename(existing_columns, new_columns, true))
1406            }
1407        }
1408    }
1409}
1410
1411impl SQLContext {
1412    /// Get internal table map. For internal use only.
1413    pub fn get_table_map(&self) -> PlHashMap<String, LazyFrame> {
1414        self.table_map.clone()
1415    }
1416
1417    /// Create a new SQLContext from a table map. For internal use only
1418    pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
1419        Self {
1420            table_map,
1421            ..Default::default()
1422        }
1423    }
1424}
1425
1426fn collect_compound_identifiers(
1427    left: &[Ident],
1428    right: &[Ident],
1429    left_name: &str,
1430    right_name: &str,
1431) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1432    if left.len() == 2 && right.len() == 2 {
1433        let (tbl_a, col_name_a) = (left[0].value.as_str(), left[1].value.as_str());
1434        let (tbl_b, col_name_b) = (right[0].value.as_str(), right[1].value.as_str());
1435
1436        // switch left/right operands if the caller has them in reverse
1437        if left_name == tbl_b || right_name == tbl_a {
1438            Ok((vec![col(col_name_b)], vec![col(col_name_a)]))
1439        } else {
1440            Ok((vec![col(col_name_a)], vec![col(col_name_b)]))
1441        }
1442    } else {
1443        polars_bail!(SQLInterface: "collect_compound_identifiers: Expected left.len() == 2 && right.len() == 2, but found left.len() == {:?}, right.len() == {:?}", left.len(), right.len());
1444    }
1445}
1446
1447fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
1448    match expr {
1449        Expr::Wildcard => schema
1450            .iter_names()
1451            .map(|name| col(name.clone()))
1452            .collect::<Vec<_>>(),
1453        Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
1454            let rx = regex::Regex::new(&nm).unwrap();
1455            schema
1456                .iter_names()
1457                .filter(|name| rx.is_match(name))
1458                .map(|name| col(name.clone()))
1459                .collect::<Vec<_>>()
1460        },
1461        Expr::Columns(names) => names
1462            .iter()
1463            .map(|name| col(name.clone()))
1464            .collect::<Vec<_>>(),
1465        _ => vec![expr],
1466    }
1467}
1468
/// Return `true` when a column name is written as an anchored regex, i.e. it
/// begins with `^` and ends with `$`.
fn is_regex_colname(nm: &str) -> bool {
    nm.strip_prefix('^')
        .is_some_and(|rest| rest.ends_with('$'))
}
1472
1473fn process_join_on(
1474    expression: &sqlparser::ast::Expr,
1475    tbl_left: &TableInfo,
1476    tbl_right: &TableInfo,
1477) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1478    match expression {
1479        SQLExpr::BinaryOp { left, op, right } => match op {
1480            BinaryOperator::And => {
1481                let (mut left_i, mut right_i) = process_join_on(left, tbl_left, tbl_right)?;
1482                let (mut left_j, mut right_j) = process_join_on(right, tbl_left, tbl_right)?;
1483                left_i.append(&mut left_j);
1484                right_i.append(&mut right_j);
1485                Ok((left_i, right_i))
1486            },
1487            BinaryOperator::Eq => match (left.as_ref(), right.as_ref()) {
1488                (SQLExpr::CompoundIdentifier(left), SQLExpr::CompoundIdentifier(right)) => {
1489                    collect_compound_identifiers(left, right, &tbl_left.name, &tbl_right.name)
1490                },
1491                _ => {
1492                    polars_bail!(SQLInterface: "only equi-join constraints (on identifiers) are currently supported; found lhs={:?}, rhs={:?}", left, right)
1493                },
1494            },
1495            _ => {
1496                polars_bail!(SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op)
1497            },
1498        },
1499        SQLExpr::Nested(expr) => process_join_on(expr, tbl_left, tbl_right),
1500        _ => {
1501            polars_bail!(SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", expression)
1502        },
1503    }
1504}
1505
1506fn process_join_constraint(
1507    constraint: &JoinConstraint,
1508    tbl_left: &TableInfo,
1509    tbl_right: &TableInfo,
1510) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1511    match constraint {
1512        JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
1513            process_join_on(expr, tbl_left, tbl_right)
1514        },
1515        JoinConstraint::Using(idents) if !idents.is_empty() => {
1516            let using: Vec<Expr> = idents.iter().map(|id| col(id.value.as_str())).collect();
1517            Ok((using.clone(), using))
1518        },
1519        JoinConstraint::Natural => {
1520            let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
1521            let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
1522            let on: Vec<Expr> = left_names
1523                .intersection(&right_names)
1524                .map(|&name| col(name.clone()))
1525                .collect();
1526            if on.is_empty() {
1527                polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
1528            }
1529            Ok((on.clone(), on))
1530        },
1531        _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
1532    }
1533}