1use std::cell::RefCell;
2use std::ops::Deref;
3
4use polars_core::frame::row::Row;
5use polars_core::prelude::*;
6use polars_lazy::prelude::*;
7use polars_ops::frame::JoinCoalesce;
8use polars_plan::dsl::function_expr::StructFunction;
9use polars_plan::prelude::*;
10use polars_utils::format_pl_smallstr;
11use sqlparser::ast::{
12 BinaryOperator, CreateTable, Delete, Distinct, ExcludeSelectItem, Expr as SQLExpr, FromTable,
13 FunctionArg, GroupByExpr, Ident, JoinConstraint, JoinOperator, ObjectName, ObjectType, Offset,
14 OrderBy, Query, RenameSelectItem, Select, SelectItem, SetExpr, SetOperator, SetQuantifier,
15 Statement, TableAlias, TableFactor, TableWithJoins, UnaryOperator, Value as SQLValue, Values,
16 WildcardAdditionalOptions,
17};
18use sqlparser::dialect::GenericDialect;
19use sqlparser::parser::{Parser, ParserOptions};
20
21use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
22use crate::sql_expr::{
23 parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
24};
25use crate::table_functions::PolarsTableFunctions;
26
/// A relation taking part in a JOIN: the lazy frame that backs it, the name
/// (table name or alias) used to qualify its columns in the query, and the
/// schema resolved for that frame.
#[derive(Clone)]
pub struct TableInfo {
    /// Lazy frame backing this relation.
    pub(crate) frame: LazyFrame,
    /// Table name or alias this relation is referred to by in the query.
    pub(crate) name: PlSmallStr,
    /// Schema of `frame`, as resolved by the caller.
    pub(crate) schema: Arc<Schema>,
}
33
/// Projection modifiers collected while parsing wildcard `SELECT` items
/// (populated by the wildcard-option processing) and applied when the final
/// projection is assembled.
struct SelectModifiers {
    // Output column names to drop from the projection.
    exclude: PlHashSet<String>,
    // Optional pattern an output column name must match to be kept
    // (presumably compiled from an `ILIKE` wildcard option — TODO confirm).
    ilike: Option<regex::Regex>,
    // Column renames, keyed by the original column name.
    rename: PlHashMap<PlSmallStr, PlSmallStr>,
    // Replacement expressions, applied with `with_columns`.
    replace: Vec<Expr>,
}
40impl SelectModifiers {
41 fn matches_ilike(&self, s: &str) -> bool {
42 match &self.ilike {
43 Some(rx) => rx.is_match(s),
44 None => true,
45 }
46 }
47 fn renamed_cols(&self) -> Vec<Expr> {
48 self.rename
49 .iter()
50 .map(|(before, after)| col(before.clone()).alias(after.clone()))
51 .collect()
52 }
53}
54
/// The SQLContext is the main entry point for executing SQL queries against
/// registered `LazyFrame`s.
#[derive(Clone)]
pub struct SQLContext {
    /// Tables available to queries, keyed by name. Filled by `register` and
    /// also by derived tables, UNNEST tables and table functions.
    pub(crate) table_map: PlHashMap<String, LazyFrame>,
    /// Registry consulted for function resolution (presumably user-defined
    /// functions — see `FunctionRegistry`).
    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
    /// Logical-plan arena used when resolving schemas; handed to the result
    /// frame by `execute`.
    pub(crate) lp_arena: Arena<IR>,
    /// Expression arena paired with `lp_arena`.
    pub(crate) expr_arena: Arena<AExpr>,

    // Statement-scoped state, cleared by `execute`.
    // CTE name -> frame.
    cte_map: RefCell<PlHashMap<String, LazyFrame>>,
    // Table alias -> underlying table name.
    table_aliases: RefCell<PlHashMap<String, String>>,
    // Joined relation name -> (column name -> suffixed column name) for
    // columns disambiguated during a JOIN.
    joined_aliases: RefCell<PlHashMap<String, PlHashMap<String, String>>>,
}
67
68impl Default for SQLContext {
69 fn default() -> Self {
70 Self {
71 function_registry: Arc::new(DefaultFunctionRegistry {}),
72 table_map: Default::default(),
73 cte_map: Default::default(),
74 table_aliases: Default::default(),
75 joined_aliases: Default::default(),
76 lp_arena: Default::default(),
77 expr_arena: Default::default(),
78 }
79 }
80}
81
82impl SQLContext {
83 pub fn new() -> Self {
91 Self::default()
92 }
93
94 pub fn get_tables(&self) -> Vec<String> {
96 let mut tables = Vec::from_iter(self.table_map.keys().cloned());
97 tables.sort_unstable();
98 tables
99 }
100
101 pub fn register(&mut self, name: &str, lf: LazyFrame) {
117 self.table_map.insert(name.to_owned(), lf);
118 }
119
120 pub fn unregister(&mut self, name: &str) {
122 self.table_map.remove(&name.to_owned());
123 }
124
125 pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
144 let mut parser = Parser::new(&GenericDialect);
145 parser = parser.with_options(ParserOptions {
146 trailing_commas: true,
147 ..Default::default()
148 });
149
150 let ast = parser
151 .try_with_sql(query)
152 .map_err(to_sql_interface_err)?
153 .parse_statements()
154 .map_err(to_sql_interface_err)?;
155
156 polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
157 let res = self.execute_statement(ast.first().unwrap())?;
158
159 let lp_arena = std::mem::take(&mut self.lp_arena);
162 let expr_arena = std::mem::take(&mut self.expr_arena);
163 res.set_cached_arena(lp_arena, expr_arena);
164
165 self.cte_map.borrow_mut().clear();
167 self.table_aliases.borrow_mut().clear();
168 self.joined_aliases.borrow_mut().clear();
169
170 Ok(res)
171 }
172
173 pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
176 self.function_registry = function_registry;
177 self
178 }
179
180 pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
182 &self.function_registry
183 }
184
185 pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
187 Arc::get_mut(&mut self.function_registry).unwrap()
188 }
189}
190
191impl SQLContext {
192 pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
193 let ast = stmt;
194 Ok(match ast {
195 Statement::Query(query) => self.execute_query(query)?,
196 stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
197 stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
198 stmt @ Statement::Drop {
199 object_type: ObjectType::Table,
200 ..
201 } => self.execute_drop_table(stmt)?,
202 stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
203 stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
204 stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
205 _ => polars_bail!(
206 SQLInterface: "statement type is not supported:\n{:?}", ast,
207 ),
208 })
209 }
210
211 pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
212 self.register_ctes(query)?;
213 self.execute_query_no_ctes(query)
214 }
215
216 pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
217 let lf = self.process_query(&query.body, query)?;
218 self.process_limit_offset(lf, &query.limit, &query.offset)
219 }
220
221 pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
222 frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
223 }
224
225 pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
226 let table = self.table_map.get(name).cloned();
227 table
228 .or_else(|| self.cte_map.borrow().get(name).cloned())
229 .or_else(|| {
230 self.table_aliases
231 .borrow()
232 .get(name)
233 .and_then(|alias| self.table_map.get(alias).cloned())
234 })
235 }
236
237 fn expr_or_ordinal(
238 &mut self,
239 e: &SQLExpr,
240 exprs: &[Expr],
241 selected: Option<&[Expr]>,
242 schema: Option<&Schema>,
243 clause: &str,
244 ) -> PolarsResult<Expr> {
245 match e {
246 SQLExpr::UnaryOp {
247 op: UnaryOperator::Minus,
248 expr,
249 } if matches!(**expr, SQLExpr::Value(SQLValue::Number(_, _))) => {
250 if let SQLExpr::Value(SQLValue::Number(ref idx, _)) = **expr {
251 Err(polars_err!(
252 SQLSyntax:
253 "negative ordinal values are invalid for {}; found -{}",
254 clause,
255 idx
256 ))
257 } else {
258 unreachable!()
259 }
260 },
261 SQLExpr::Value(SQLValue::Number(idx, _)) => {
262 let idx = idx.parse::<usize>().map_err(|_| {
264 polars_err!(
265 SQLSyntax:
266 "negative ordinal values are invalid for {}; found {}",
267 clause,
268 idx
269 )
270 })?;
271 let cols = if let Some(cols) = selected {
274 cols
275 } else {
276 exprs
277 };
278 Ok(cols
279 .get(idx - 1)
280 .ok_or_else(|| {
281 polars_err!(
282 SQLInterface:
283 "{} ordinal value must refer to a valid column; found {}",
284 clause,
285 idx
286 )
287 })?
288 .clone())
289 },
290 SQLExpr::Value(v) => Err(polars_err!(
291 SQLSyntax:
292 "{} requires a valid expression or positive ordinal; found {}", clause, v,
293 )),
294 _ => parse_sql_expr(e, self, schema),
295 }
296 }
297
298 pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
299 if self.joined_aliases.borrow().contains_key(tbl_name) {
300 self.joined_aliases
301 .borrow()
302 .get(tbl_name)
303 .and_then(|aliases| aliases.get(column_name))
304 .cloned()
305 .unwrap_or_else(|| column_name.to_string())
306 } else {
307 column_name.to_string()
308 }
309 }
310
311 fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
312 match expr {
313 SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
314 SetExpr::Query(query) => self.execute_query_no_ctes(query),
315 SetExpr::SetOperation {
316 op: SetOperator::Union,
317 set_quantifier,
318 left,
319 right,
320 } => self.process_union(left, right, set_quantifier, query),
321
322 #[cfg(feature = "semi_anti_join")]
323 SetExpr::SetOperation {
324 op: SetOperator::Intersect | SetOperator::Except,
325 set_quantifier,
326 left,
327 right,
328 } => self.process_except_intersect(left, right, set_quantifier, query),
329
330 SetExpr::Values(Values {
331 explicit_row: _,
332 rows,
333 }) => self.process_values(rows),
334
335 SetExpr::Table(tbl) => {
336 if tbl.table_name.is_some() {
337 let table_name = tbl.table_name.as_ref().unwrap();
338 self.get_table_from_current_scope(table_name)
339 .ok_or_else(|| {
340 polars_err!(
341 SQLInterface: "no table or alias named '{}' found",
342 tbl
343 )
344 })
345 } else {
346 polars_bail!(SQLInterface: "'TABLE' requires valid table name")
347 }
348 },
349 op => {
350 polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
351 },
352 }
353 }
354
355 #[cfg(feature = "semi_anti_join")]
356 fn process_except_intersect(
357 &mut self,
358 left: &SetExpr,
359 right: &SetExpr,
360 quantifier: &SetQuantifier,
361 query: &Query,
362 ) -> PolarsResult<LazyFrame> {
363 let (join_type, op_name) = match *query.body {
364 SetExpr::SetOperation {
365 op: SetOperator::Except,
366 ..
367 } => (JoinType::Anti, "EXCEPT"),
368 _ => (JoinType::Semi, "INTERSECT"),
369 };
370 let mut lf = self.process_query(left, query)?;
371 let mut rf = self.process_query(right, query)?;
372 let join = lf
373 .clone()
374 .join_builder()
375 .with(rf.clone())
376 .how(join_type)
377 .join_nulls(true);
378
379 let lf_schema = self.get_frame_schema(&mut lf)?;
380 let lf_cols: Vec<_> = lf_schema.iter_names().map(|nm| col(nm.clone())).collect();
381 let joined_tbl = match quantifier {
382 SetQuantifier::ByName => join.on(lf_cols).finish(),
383 SetQuantifier::Distinct | SetQuantifier::None => {
384 let rf_schema = self.get_frame_schema(&mut rf)?;
385 let rf_cols: Vec<_> = rf_schema.iter_names().map(|nm| col(nm.clone())).collect();
386 if lf_cols.len() != rf_cols.len() {
387 polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
388 }
389 join.left_on(lf_cols).right_on(rf_cols).finish()
390 },
391 _ => {
392 polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
393 },
394 };
395 Ok(joined_tbl.unique(None, UniqueKeepStrategy::Any))
396 }
397
/// Evaluate `left UNION [ALL | DISTINCT] [BY NAME] right`.
///
/// Positional variants (`UNION`, `UNION ALL`, `UNION DISTINCT`) require equal
/// column counts. Plain `UNION` and `UNION DISTINCT` deduplicate the result;
/// `UNION ALL` does not. The `BY NAME` variants (only with the
/// `diagonal_concat` feature) combine by column name via a diagonal concat,
/// again deduplicating unless `ALL` is given. Any other quantifier is an error.
fn process_union(
    &mut self,
    left: &SetExpr,
    right: &SetExpr,
    quantifier: &SetQuantifier,
    query: &Query,
) -> PolarsResult<LazyFrame> {
    let mut lf = self.process_query(left, query)?;
    let mut rf = self.process_query(right, query)?;
    // `to_supertypes` presumably lets differing input dtypes be unified — TODO confirm.
    let opts = UnionArgs {
        parallel: true,
        to_supertypes: true,
        ..Default::default()
    };
    match quantifier {
        // Positional UNION: column counts must agree.
        SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
            let lf_schema = self.get_frame_schema(&mut lf)?;
            let rf_schema = self.get_frame_schema(&mut rf)?;
            if lf_schema.len() != rf_schema.len() {
                polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
            }
            let concatenated = polars_lazy::dsl::concat(vec![lf, rf], opts);
            match quantifier {
                // `concat` returns a PolarsResult, so deduplication is mapped over it.
                SetQuantifier::Distinct | SetQuantifier::None => {
                    concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
                },
                _ => concatenated,
            }
        },
        // UNION ALL BY NAME: align columns by name, keep duplicates.
        #[cfg(feature = "diagonal_concat")]
        SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
        // UNION [DISTINCT] BY NAME: align columns by name, then deduplicate.
        #[cfg(feature = "diagonal_concat")]
        SetQuantifier::ByName | SetQuantifier::DistinctByName => {
            let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
            concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
        },
        // Reachable only for quantifiers not handled above (e.g. BY NAME
        // variants when `diagonal_concat` is disabled).
        #[allow(unreachable_patterns)]
        _ => polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier),
    }
}
441
442 fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
443 let frame_rows: Vec<Row> = values.iter().map(|row| {
444 let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
445 let expr = parse_sql_expr(expr, self, None)?;
446 match expr {
447 Expr::Literal(value) => {
448 value.to_any_value()
449 .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
450 .map(|av| av.into_static())
451 },
452 _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
453 }
454 }).collect();
455 row_data.map(Row::new)
456 }).collect::<Result<_, _>>()?;
457
458 Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
459 }
460
461 fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
463 match stmt {
464 Statement::Explain { statement, .. } => {
465 let lf = self.execute_statement(statement)?;
466 let plan = lf.describe_optimized_plan()?;
467 let plan = plan
468 .split('\n')
469 .collect::<Series>()
470 .with_name(PlSmallStr::from_static("Logical Plan"))
471 .into_column();
472 let df = DataFrame::new(vec![plan])?;
473 Ok(df.lazy())
474 },
475 _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
476 }
477 }
478
479 fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
481 let tables = Column::new("name".into(), self.get_tables());
482 let df = DataFrame::new(vec![tables])?;
483 Ok(df.lazy())
484 }
485
486 fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
488 match stmt {
489 Statement::Drop { names, .. } => {
490 names.iter().for_each(|name| {
491 self.table_map.remove(&name.to_string());
492 });
493 Ok(DataFrame::empty().lazy())
494 },
495 _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
496 }
497 }
498
499 fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
501 if let Statement::Delete(Delete {
502 tables,
503 from,
504 using,
505 selection,
506 returning,
507 order_by,
508 limit,
509 }) = stmt
510 {
511 if !tables.is_empty()
512 || using.is_some()
513 || returning.is_some()
514 || limit.is_some()
515 || !order_by.is_empty()
516 {
517 let error_message = match () {
518 _ if !tables.is_empty() => "DELETE expects exactly one table name",
519 _ if using.is_some() => "DELETE does not support the USING clause",
520 _ if returning.is_some() => "DELETE does not support the RETURNING clause",
521 _ if limit.is_some() => "DELETE does not support the LIMIT clause",
522 _ if !order_by.is_empty() => "DELETE does not support the ORDER BY clause",
523 _ => unreachable!(),
524 };
525 polars_bail!(SQLInterface: error_message);
526 }
527 let from_tables = match &from {
528 FromTable::WithFromKeyword(from) => from,
529 FromTable::WithoutKeyword(from) => from,
530 };
531 if from_tables.len() > 1 {
532 polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
533 }
534 let tbl_expr = from_tables.first().unwrap();
535 if !tbl_expr.joins.is_empty() {
536 polars_bail!(SQLInterface: "DELETE does not support table JOINs")
537 }
538 let (_, mut lf) = self.get_table(&tbl_expr.relation)?;
539 if selection.is_none() {
540 Ok(DataFrame::empty_with_schema(
542 lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
543 .unwrap()
544 .as_ref(),
545 )
546 .lazy())
547 } else {
548 Ok(self.process_where(lf.clone(), selection, true)?)
550 }
551 } else {
552 polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
553 }
554 }
555
556 fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
558 if let Statement::Truncate {
559 table_names,
560 partitions,
561 ..
562 } = stmt
563 {
564 match partitions {
565 None => {
566 if table_names.len() != 1 {
567 polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
568 }
569 let tbl = table_names[0].to_string();
570 if let Some(lf) = self.table_map.get_mut(&tbl) {
571 *lf = DataFrame::empty_with_schema(
572 lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
573 .unwrap()
574 .as_ref(),
575 )
576 .lazy();
577 Ok(lf.clone())
578 } else {
579 polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
580 }
581 },
582 _ => {
583 polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
584 },
585 }
586 } else {
587 polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
588 }
589 }
590
591 fn register_cte(&mut self, name: &str, lf: LazyFrame) {
592 self.cte_map.borrow_mut().insert(name.to_owned(), lf);
593 }
594
595 fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
596 if let Some(with) = &query.with {
597 if with.recursive {
598 polars_bail!(SQLInterface: "recursive CTEs are not supported")
599 }
600 for cte in &with.cte_tables {
601 let cte_name = cte.alias.name.value.clone();
602 let mut lf = self.execute_query(&cte.query)?;
603 lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
604 self.register_cte(&cte_name, lf);
605 }
606 }
607 Ok(())
608 }
609
610 fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
612 let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
613 if !tbl_expr.joins.is_empty() {
614 for join in &tbl_expr.joins {
615 let (r_name, mut rf) = self.get_table(&join.relation)?;
616 if r_name.is_empty() {
617 polars_bail!(
619 SQLInterface:
620 "cannot join on unnamed relation; please provide an alias"
621 )
622 }
623 let left_schema = self.get_frame_schema(&mut lf)?;
624 let right_schema = self.get_frame_schema(&mut rf)?;
625
626 lf = match &join.join_operator {
627 op @ (JoinOperator::FullOuter(constraint)
628 | JoinOperator::LeftOuter(constraint)
629 | JoinOperator::RightOuter(constraint)
630 | JoinOperator::Inner(constraint)
631 | JoinOperator::Anti(constraint)
632 | JoinOperator::Semi(constraint)
633 | JoinOperator::LeftAnti(constraint)
634 | JoinOperator::LeftSemi(constraint)
635 | JoinOperator::RightAnti(constraint)
636 | JoinOperator::RightSemi(constraint)) => {
637 let (lf, rf) = match op {
638 JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
639 _ => (lf, rf),
640 };
641 self.process_join(
642 &TableInfo {
643 frame: lf,
644 name: (&l_name).into(),
645 schema: left_schema.clone(),
646 },
647 &TableInfo {
648 frame: rf,
649 name: (&r_name).into(),
650 schema: right_schema.clone(),
651 },
652 constraint,
653 match op {
654 JoinOperator::FullOuter(_) => JoinType::Full,
655 JoinOperator::LeftOuter(_) => JoinType::Left,
656 JoinOperator::RightOuter(_) => JoinType::Right,
657 JoinOperator::Inner(_) => JoinType::Inner,
658 #[cfg(feature = "semi_anti_join")]
659 JoinOperator::Anti(_)
660 | JoinOperator::LeftAnti(_)
661 | JoinOperator::RightAnti(_) => JoinType::Anti,
662 #[cfg(feature = "semi_anti_join")]
663 JoinOperator::Semi(_)
664 | JoinOperator::LeftSemi(_)
665 | JoinOperator::RightSemi(_) => JoinType::Semi,
666 join_type => polars_bail!(
667 SQLInterface:
668 "join type '{:?}' not currently supported",
669 join_type
670 ),
671 },
672 )?
673 },
674 JoinOperator::CrossJoin => {
675 lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
676 },
677 join_type => {
678 polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
679 },
680 };
681
682 let joined_schema = self.get_frame_schema(&mut lf)?;
684
685 self.joined_aliases.borrow_mut().insert(
686 r_name.clone(),
687 right_schema
688 .iter_names()
689 .filter_map(|name| {
690 let aliased_name = format!("{}:{}", name, r_name);
692 if left_schema.contains(name)
693 && joined_schema.contains(aliased_name.as_str())
694 {
695 Some((name.to_string(), aliased_name))
696 } else {
697 None
698 }
699 })
700 .collect::<PlHashMap<String, String>>(),
701 );
702 }
703 };
704 Ok(lf)
705 }
706
/// Evaluate a single SELECT statement. Steps, in order:
///
/// 1. FROM / JOINs.
/// 2. WHERE.
/// 3. Projection (including wildcard modifiers).
/// 4. GROUP BY / HAVING, or plain projection when there are no group keys.
/// 5. ORDER BY.
/// 6. DISTINCT / DISTINCT ON.
///
/// `query` is the enclosing query, consulted for its ORDER BY.
fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
    // Resolve FROM; with no FROM clause, start from an empty frame.
    let mut lf = if select_stmt.from.is_empty() {
        DataFrame::empty().lazy()
    } else {
        // NOTE(review): this clones the whole `Select` just to read `from`;
        // borrowing `&select_stmt.from` would avoid the copy.
        let from = select_stmt.clone().from;
        if from.len() > 1 {
            polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
        }
        self.execute_from_statement(from.first().unwrap())?
    };

    // Schema of the FROM output, captured before WHERE; projections and
    // GROUP BY expressions are resolved against it.
    let schema = self.get_frame_schema(&mut lf)?;
    lf = self.process_where(lf, &select_stmt.selection, false)?;

    // Filled in while parsing wildcard projection items.
    let mut select_modifiers = SelectModifiers {
        ilike: None,
        exclude: PlHashSet::new(),
        rename: PlHashMap::new(),
        replace: vec![],
    };

    let projections = self.column_projections(select_stmt, &schema, &mut select_modifiers)?;

    // Determine the GROUP BY keys, either listed explicitly or inferred (GROUP BY ALL).
    let mut group_by_keys: Vec<Expr> = Vec::new();
    match &select_stmt.group_by {
        // Explicit GROUP BY items; ordinals refer to the projections.
        GroupByExpr::Expressions(group_by_exprs, modifiers) => {
            if !modifiers.is_empty() {
                polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
            }
            group_by_keys = group_by_exprs
                .iter()
                .map(|e| {
                    self.expr_or_ordinal(
                        e,
                        &projections,
                        None,
                        Some(schema.deref()),
                        "GROUP BY",
                    )
                })
                .collect::<PolarsResult<_>>()?
        },
        // GROUP BY ALL: infer keys from the projections.
        GroupByExpr::All(modifiers) => {
            if !modifiers.is_empty() {
                polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
            }
            projections.iter().for_each(|expr| match expr {
                // Aggregates, `len` and literals are never keys.
                Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
                Expr::Column(_) => group_by_keys.push(expr.clone()),
                // Same exclusion when the aggregate/`len`/literal is aliased.
                Expr::Alias(e, _)
                    if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
                // Aliased plain column: group by the underlying column.
                Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
                    if let Expr::Column(name) = &**e {
                        group_by_keys.push(col(name.clone()));
                    }
                },
                // Any other expression is a key unless it contains an
                // aggregate, `len` or a window expression.
                _ => {
                    if !has_expr(expr, |e| {
                        matches!(e, Expr::Agg(_))
                            || matches!(e, Expr::Len)
                            || matches!(e, Expr::Window { .. })
                    }) {
                        group_by_keys.push(expr.clone())
                    }
                },
            });
        },
    };

    lf = if group_by_keys.is_empty() {
        // HAVING without any grouping keys is rejected.
        if select_stmt.having.is_some() {
            polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
        };

        // Output columns after the ILIKE/EXCLUDE filters, plus their names.
        let mut retained_cols = Vec::with_capacity(projections.len());
        let mut retained_names = Vec::with_capacity(projections.len());
        let have_order_by = query.order_by.is_some();
        // Accumulated height behaviour of the retained projections.
        let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;

        for p in projections.iter() {
            let name = p
                .to_field(schema.deref(), Context::Default)?
                .name
                .to_string();
            if select_modifiers.matches_ilike(&name)
                && !select_modifiers.exclude.contains(&name)
            {
                projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);

                // With ORDER BY the projections are materialised below, so the
                // final select only needs to reference them by name.
                retained_cols.push(if have_order_by {
                    col(name.as_str())
                } else {
                    p.clone()
                });
                retained_names.push(col(name));
            }
        }

        // With ORDER BY, materialise the projections alongside the input
        // columns so the sort can reference either.
        if have_order_by {
            if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
                || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
            {
                lf = lf.with_columns(projections);
            } else {
                // Otherwise compute the projections with `select` and
                // left-join the input frame back on a temporary row index.
                const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
                lf = lf
                    .clone()
                    .select(projections)
                    .with_row_index(NAME, None)
                    .join(
                        lf.with_row_index(NAME, None),
                        [col(NAME)],
                        [col(NAME)],
                        JoinArgs {
                            how: JoinType::Left,
                            validation: Default::default(),
                            suffix: None,
                            slice: None,
                            nulls_equal: false,
                            coalesce: Default::default(),
                            maintain_order: polars_ops::frame::MaintainOrderJoin::Left,
                        },
                    );
            }
        }

        // Apply wildcard REPLACE / RENAME modifiers.
        if !select_modifiers.replace.is_empty() {
            lf = lf.with_columns(&select_modifiers.replace);
        }
        if !select_modifiers.rename.is_empty() {
            lf = lf.with_columns(select_modifiers.renamed_cols());
        }

        lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;

        // Final projection.
        if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
            && !have_order_by
        {
            // Evaluate within the input frame's context, then keep only the
            // output names. Presumably this gives input-independent projections
            // the frame's height — TODO confirm.
            lf = lf.with_columns(retained_cols).select(retained_names);
        } else {
            lf = lf.select(retained_cols);
        }

        // Final RENAME so that output columns carry their new names.
        if !select_modifiers.rename.is_empty() {
            lf = lf.rename(
                select_modifiers.rename.keys(),
                select_modifiers.rename.values(),
                true,
            );
        };
        lf
    } else {
        lf = self.process_group_by(lf, &group_by_keys, &projections)?;
        lf = self.process_order_by(lf, &query.order_by, None)?;

        // HAVING is parsed against the post-aggregation schema.
        let schema = Some(self.get_frame_schema(&mut lf)?);
        match select_stmt.having.as_ref() {
            Some(expr) => lf.filter(parse_sql_expr(expr, self, schema.as_deref())?),
            None => lf,
        }
    };

    // DISTINCT / DISTINCT ON.
    lf = match &select_stmt.distinct {
        Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
        Some(Distinct::On(exprs)) => {
            // DISTINCT ON items must parse to plain column references.
            let schema = Some(self.get_frame_schema(&mut lf)?);
            let cols = exprs
                .iter()
                .map(|e| {
                    let expr = parse_sql_expr(e, self, schema.as_deref())?;
                    if let Expr::Column(name) = expr {
                        Ok(name.clone())
                    } else {
                        Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
                    }
                })
                .collect::<PolarsResult<Vec<_>>>()?;

            // Sort before keeping the first row per DISTINCT ON key.
            // NOTE(review): ORDER BY was already applied in both branches above;
            // this applies it a second time.
            lf = self.process_order_by(lf, &query.order_by, None)?;
            return Ok(lf.unique_stable(Some(cols.clone()), UniqueKeepStrategy::First));
        },
        None => lf,
    };
    Ok(lf)
}
931
932 fn column_projections(
933 &mut self,
934 select_stmt: &Select,
935 schema: &SchemaRef,
936 select_modifiers: &mut SelectModifiers,
937 ) -> PolarsResult<Vec<Expr>> {
938 let parsed_items: PolarsResult<Vec<Vec<Expr>>> = select_stmt
939 .projection
940 .iter()
941 .map(|select_item| match select_item {
942 SelectItem::UnnamedExpr(expr) => {
943 Ok(vec![parse_sql_expr(expr, self, Some(schema))?])
944 },
945 SelectItem::ExprWithAlias { expr, alias } => {
946 let expr = parse_sql_expr(expr, self, Some(schema))?;
947 Ok(vec![expr.alias(PlSmallStr::from_str(alias.value.as_str()))])
948 },
949 SelectItem::QualifiedWildcard(obj_name, wildcard_options) => self
950 .process_qualified_wildcard(
951 obj_name,
952 wildcard_options,
953 select_modifiers,
954 Some(schema),
955 ),
956 SelectItem::Wildcard(wildcard_options) => {
957 let cols = schema
958 .iter_names()
959 .map(|name| col(name.clone()))
960 .collect::<Vec<_>>();
961
962 self.process_wildcard_additional_options(
963 cols,
964 wildcard_options,
965 select_modifiers,
966 Some(schema),
967 )
968 },
969 })
970 .collect();
971
972 let flattened_exprs: Vec<Expr> = parsed_items?
973 .into_iter()
974 .flatten()
975 .flat_map(|expr| expand_exprs(expr, schema))
976 .collect();
977
978 Ok(flattened_exprs)
979 }
980
981 fn process_where(
982 &mut self,
983 mut lf: LazyFrame,
984 expr: &Option<SQLExpr>,
985 invert_filter: bool,
986 ) -> PolarsResult<LazyFrame> {
987 if let Some(expr) = expr {
988 let schema = self.get_frame_schema(&mut lf)?;
989
990 let (all_true, all_false) = match expr {
992 SQLExpr::Value(SQLValue::Boolean(b)) => (*b, !*b),
993 SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
994 (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => (a == b, a != b),
995 (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
996 (a != b, a == b)
997 },
998 _ => (false, false),
999 },
1000 _ => (false, false),
1001 };
1002 if (all_true && !invert_filter) || (all_false && invert_filter) {
1003 return Ok(lf);
1004 } else if (all_false && !invert_filter) || (all_true && invert_filter) {
1005 return Ok(DataFrame::empty_with_schema(schema.as_ref()).lazy());
1006 }
1007
1008 let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1010 if filter_expression.clone().meta().has_multiple_outputs() {
1011 filter_expression = all_horizontal([filter_expression])?;
1012 }
1013 lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1014 lf = if invert_filter {
1015 lf.remove(filter_expression)
1016 } else {
1017 lf.filter(filter_expression)
1018 };
1019 }
1020 Ok(lf)
1021 }
1022
1023 pub(super) fn process_join(
1024 &mut self,
1025 tbl_left: &TableInfo,
1026 tbl_right: &TableInfo,
1027 constraint: &JoinConstraint,
1028 join_type: JoinType,
1029 ) -> PolarsResult<LazyFrame> {
1030 let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right)?;
1031
1032 let joined = tbl_left
1033 .frame
1034 .clone()
1035 .join_builder()
1036 .with(tbl_right.frame.clone())
1037 .left_on(left_on)
1038 .right_on(right_on)
1039 .how(join_type)
1040 .suffix(format!(":{}", tbl_right.name))
1041 .coalesce(JoinCoalesce::KeepColumns)
1042 .finish();
1043
1044 Ok(joined)
1045 }
1046
1047 fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
1048 let mut contexts = vec![];
1049 for expr in exprs {
1050 *expr = expr.clone().map_expr(|e| match e {
1051 Expr::SubPlan(lp, names) => {
1052 contexts.push(<LazyFrame>::from((**lp).clone()));
1053 if names.len() == 1 {
1054 Expr::Column(names[0].as_str().into())
1055 } else {
1056 Expr::SubPlan(lp, names)
1057 }
1058 },
1059 e => e,
1060 })
1061 }
1062
1063 if contexts.is_empty() {
1064 lf
1065 } else {
1066 lf.with_context(contexts)
1067 }
1068 }
1069
/// Execute `CREATE TABLE name AS <query>`, registering the query result
/// under the first identifier of `name`.
///
/// Returns a one-row frame with a "Response" column containing "CREATE TABLE".
/// Only the `AS SELECT` form is supported.
fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
    if let Statement::CreateTable(CreateTable {
        if_not_exists,
        name,
        query,
        ..
    }) = stmt
    {
        let tbl_name = name.0.first().unwrap().value.as_str();
        // NOTE(review): with IF NOT EXISTS an existing table raises an error,
        // while without it an existing table is silently replaced (`register`
        // overwrites). Standard SQL makes IF NOT EXISTS suppress the error —
        // confirm this is intended.
        if *if_not_exists && self.table_map.contains_key(tbl_name) {
            polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
        }
        if let Some(query) = query {
            let lf = self.execute_query(query)?;
            self.register(tbl_name, lf);
            let out = df! {
                "Response" => ["CREATE TABLE"]
            }
            .unwrap()
            .lazy();
            Ok(out)
        } else {
            polars_bail!(SQLInterface: "only `CREATE TABLE AS SELECT ...` is currently supported");
        }
    } else {
        // Only called from `execute_statement` for CREATE TABLE statements.
        unreachable!()
    }
}
1100
1101 fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
1102 match relation {
1103 TableFactor::Table {
1104 name, alias, args, ..
1105 } => {
1106 if let Some(args) = args {
1107 return self.execute_table_function(name, alias, &args.args);
1108 }
1109 let tbl_name = name.0.first().unwrap().value.as_str();
1110 if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
1111 match alias {
1112 Some(alias) => {
1113 self.table_aliases
1114 .borrow_mut()
1115 .insert(alias.name.value.clone(), tbl_name.to_string());
1116 Ok((alias.to_string(), lf))
1117 },
1118 None => Ok((tbl_name.to_string(), lf)),
1119 }
1120 } else {
1121 polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1122 }
1123 },
1124 TableFactor::Derived {
1125 lateral,
1126 subquery,
1127 alias,
1128 } => {
1129 polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1130 if let Some(alias) = alias {
1131 let mut lf = self.execute_query_no_ctes(subquery)?;
1132 lf = self.rename_columns_from_table_alias(lf, alias)?;
1133 self.table_map.insert(alias.name.value.clone(), lf.clone());
1134 Ok((alias.name.value.clone(), lf))
1135 } else {
1136 polars_bail!(SQLSyntax: "derived tables must have aliases");
1137 }
1138 },
1139 TableFactor::UNNEST {
1140 alias,
1141 array_exprs,
1142 with_offset,
1143 with_offset_alias: _,
1144 ..
1145 } => {
1146 if let Some(alias) = alias {
1147 let column_names: Vec<Option<PlSmallStr>> = alias
1148 .columns
1149 .iter()
1150 .map(|c| {
1151 if c.name.value.is_empty() {
1152 None
1153 } else {
1154 Some(PlSmallStr::from_str(c.name.value.as_str()))
1155 }
1156 })
1157 .collect();
1158
1159 let column_values: Vec<Series> = array_exprs
1160 .iter()
1161 .map(|arr| parse_sql_array(arr, self))
1162 .collect::<Result<_, _>>()?;
1163
1164 polars_ensure!(!column_names.is_empty(),
1165 SQLSyntax:
1166 "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1167 );
1168 if column_names.len() != column_values.len() {
1169 let plural = if column_values.len() > 1 { "s" } else { "" };
1170 polars_bail!(
1171 SQLSyntax:
1172 "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
1173 );
1174 }
1175 let column_series: Vec<Column> = column_values
1176 .into_iter()
1177 .zip(column_names)
1178 .map(|(s, name)| {
1179 if let Some(name) = name {
1180 s.clone().with_name(name)
1181 } else {
1182 s.clone()
1183 }
1184 })
1185 .map(Column::from)
1186 .collect();
1187
1188 let lf = DataFrame::new(column_series)?.lazy();
1189
1190 if *with_offset {
1191 polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
1194 }
1195 let table_name = alias.name.value.clone();
1196 self.table_map.insert(table_name.clone(), lf.clone());
1197 Ok((table_name.clone(), lf))
1198 } else {
1199 polars_bail!(SQLSyntax: "UNNEST table must have an alias");
1200 }
1201 },
1202 TableFactor::NestedJoin {
1203 table_with_joins,
1204 alias,
1205 } => {
1206 let lf = self.execute_from_statement(table_with_joins)?;
1207 match alias {
1208 Some(a) => Ok((a.name.value.clone(), lf)),
1209 None => Ok(("".to_string(), lf)),
1210 }
1211 },
1212 _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
1214 }
1215 }
1216
1217 fn execute_table_function(
1218 &mut self,
1219 name: &ObjectName,
1220 alias: &Option<TableAlias>,
1221 args: &[FunctionArg],
1222 ) -> PolarsResult<(String, LazyFrame)> {
1223 let tbl_fn = name.0.first().unwrap().value.as_str();
1224 let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1225 let (tbl_name, lf) = read_fn.execute(args)?;
1226 #[allow(clippy::useless_asref)]
1227 let tbl_name = alias
1228 .as_ref()
1229 .map(|a| a.name.value.clone())
1230 .unwrap_or_else(|| tbl_name);
1231
1232 self.table_map.insert(tbl_name.clone(), lf.clone());
1233 Ok((tbl_name, lf))
1234 }
1235
1236 fn process_order_by(
1237 &mut self,
1238 mut lf: LazyFrame,
1239 order_by: &Option<OrderBy>,
1240 selected: Option<&[Expr]>,
1241 ) -> PolarsResult<LazyFrame> {
1242 if order_by.as_ref().is_none_or(|ob| ob.exprs.is_empty()) {
1243 return Ok(lf);
1244 }
1245 let schema = self.get_frame_schema(&mut lf)?;
1246 let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1247
1248 let order_by = order_by.as_ref().unwrap().exprs.clone();
1249 let mut descending = Vec::with_capacity(order_by.len());
1250 let mut nulls_last = Vec::with_capacity(order_by.len());
1251 let mut by: Vec<Expr> = Vec::with_capacity(order_by.len());
1252
1253 if order_by.len() == 1 && matches!(&order_by[0].expr, SQLExpr::Identifier(ident) if ident.value.to_uppercase() == "ALL" && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1255 {
1256 if let Some(selected) = selected {
1257 by.extend(selected.iter().cloned());
1258 } else {
1259 by.extend(columns_iter);
1260 };
1261 let desc_order = !order_by[0].asc.unwrap_or(true);
1262 nulls_last.resize(by.len(), !order_by[0].nulls_first.unwrap_or(desc_order));
1263 descending.resize(by.len(), desc_order);
1264 } else {
1265 let columns = &columns_iter.collect::<Vec<_>>();
1266 for ob in order_by {
1267 let desc_order = !ob.asc.unwrap_or(true);
1270 nulls_last.push(!ob.nulls_first.unwrap_or(desc_order));
1271 descending.push(desc_order);
1272
1273 by.push(self.expr_or_ordinal(
1275 &ob.expr,
1276 columns,
1277 selected,
1278 Some(&schema),
1279 "ORDER BY",
1280 )?)
1281 }
1282 }
1283 Ok(lf.sort_by_exprs(
1284 &by,
1285 SortMultipleOptions::default()
1286 .with_order_descending_multi(descending)
1287 .with_nulls_last_multi(nulls_last)
1288 .with_maintain_order(true),
1289 ))
1290 }
1291
    /// Builds the aggregation for a `GROUP BY` query.
    ///
    /// It groups `lf` by `group_by_keys` and aggregates the aggregate/window expressions
    /// found in `projections`. It then selects one output expression per projection
    /// from the aggregated frame.
    ///
    /// # Errors
    /// - Propagates schema-resolution errors for the keys and the projections.
    /// - `SQLSyntax` if a bare column (or struct-field access) projection is neither a
    ///   grouping key nor part of an aggregate expression.
    fn process_group_by(
        &mut self,
        mut lf: LazyFrame,
        group_by_keys: &[Expr],
        projections: &[Expr],
    ) -> PolarsResult<LazyFrame> {
        let schema_before = self.get_frame_schema(&mut lf)?;
        // Output schema of the grouping keys; used below to ask "is this name a group key?".
        let group_by_keys_schema =
            expressions_to_schema(group_by_keys, &schema_before, Context::Default)?;

        // Expressions handed to `.agg(...)`.
        let mut aggregation_projection = Vec::with_capacity(projections.len());
        // Alias name -> expression emitted verbatim in the final projection.
        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
        // Aliased non-aggregate expressions that are re-evaluated after aggregation.
        let mut projection_aliases = PlHashSet::new();
        // Aliases of expressions that `meta().is_simple_projection()` accepts.
        let mut group_key_aliases = PlHashSet::new();

        for mut e in projections {
            // True if an aggregation, `len`, or window expression appears anywhere in `e`.
            let is_agg_or_window = has_expr(e, |e| {
                matches!(e, Expr::Agg(_) | Expr::Len | Expr::Window { .. })
            });

            if let Expr::Alias(expr, alias) = e {
                if e.clone().meta().is_simple_projection() {
                    // Record the alias, then run the checks below on the un-aliased expression.
                    group_key_aliases.insert(alias.as_ref());
                    e = expr
                } else if let Expr::Function {
                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
                    ..
                } = expr.deref()
                {
                    // Aliased struct-field access: emit `col(<field name>).alias(<alias>)`
                    // in the final projection instead of the original expression.
                    projection_overrides
                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
                } else if !is_agg_or_window && !group_by_keys_schema.contains(alias) {
                    // Aliased non-aggregate expression whose alias is not a grouping key:
                    // keep the whole expression for the final projection.
                    projection_aliases.insert(alias.as_ref());
                }
            }
            let field = e.to_field(&schema_before, Context::Default)?;
            if group_by_keys_schema.get(&field.name).is_none() && is_agg_or_window {
                // Strip a top-level `implode` (optionally under an alias) before adding to
                // the aggregation list.
                // NOTE(review): presumably because `agg` already gathers per-group values
                // into lists — confirm.
                let mut e = e.clone();
                if let Expr::Agg(AggExpr::Implode(expr)) = &e {
                    e = (**expr).clone();
                } else if let Expr::Alias(expr, name) = &e {
                    if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
                        e = (**expr).clone().alias(name.clone());
                    }
                }
                aggregation_projection.push(e);
            } else if let Expr::Column(_)
            | Expr::Function {
                function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
                ..
            } = e
            {
                // A bare column / struct-field reference outside an aggregate must be a
                // grouping key.
                if !group_by_keys_schema.contains(&field.name) {
                    polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
                }
            }
        }
        let aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
        // Output names of the projections (resolved against the pre-aggregation schema),
        // zipped in order with `projections` below.
        let projection_schema =
            expressions_to_schema(projections, &schema_before, Context::Default)?;

        // Per output name, pick:
        //   1. a recorded override expression;
        //   2. the original projection if it is a group key or a recorded alias;
        //   3. otherwise, the aggregated column of that name.
        let final_projection = projection_schema
            .iter_names()
            .zip(projections)
            .map(|(name, projection_expr)| {
                if let Some(expr) = projection_overrides.get(name.as_str()) {
                    expr.clone()
                } else if group_by_keys_schema.get(name).is_some()
                    || projection_aliases.contains(name.as_str())
                    || group_key_aliases.contains(name.as_str())
                {
                    projection_expr.clone()
                } else {
                    col(name.clone())
                }
            })
            .collect::<Vec<_>>();

        Ok(aggregated.select(&final_projection))
    }
1377
1378 fn process_limit_offset(
1379 &self,
1380 lf: LazyFrame,
1381 limit: &Option<SQLExpr>,
1382 offset: &Option<Offset>,
1383 ) -> PolarsResult<LazyFrame> {
1384 match (offset, limit) {
1385 (
1386 Some(Offset {
1387 value: SQLExpr::Value(SQLValue::Number(offset, _)),
1388 ..
1389 }),
1390 Some(SQLExpr::Value(SQLValue::Number(limit, _))),
1391 ) => Ok(lf.slice(
1392 offset
1393 .parse()
1394 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1395 limit
1396 .parse()
1397 .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1398 )),
1399 (
1400 Some(Offset {
1401 value: SQLExpr::Value(SQLValue::Number(offset, _)),
1402 ..
1403 }),
1404 None,
1405 ) => Ok(lf.slice(
1406 offset
1407 .parse()
1408 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1409 IdxSize::MAX,
1410 )),
1411 (None, Some(SQLExpr::Value(SQLValue::Number(limit, _)))) => Ok(lf.limit(
1412 limit
1413 .parse()
1414 .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1415 )),
1416 (None, None) => Ok(lf),
1417 _ => polars_bail!(
1418 SQLSyntax: "non-numeric arguments for LIMIT/OFFSET are not supported",
1419 ),
1420 }
1421 }
1422
1423 fn process_qualified_wildcard(
1424 &mut self,
1425 ObjectName(idents): &ObjectName,
1426 options: &WildcardAdditionalOptions,
1427 modifiers: &mut SelectModifiers,
1428 schema: Option<&Schema>,
1429 ) -> PolarsResult<Vec<Expr>> {
1430 let mut new_idents = idents.clone();
1431 new_idents.push(Ident::new("*"));
1432
1433 let expr = resolve_compound_identifier(self, new_idents.deref(), schema);
1434 self.process_wildcard_additional_options(expr?, options, modifiers, schema)
1435 }
1436
1437 fn process_wildcard_additional_options(
1438 &mut self,
1439 exprs: Vec<Expr>,
1440 options: &WildcardAdditionalOptions,
1441 modifiers: &mut SelectModifiers,
1442 schema: Option<&Schema>,
1443 ) -> PolarsResult<Vec<Expr>> {
1444 if options.opt_except.is_some() && options.opt_exclude.is_some() {
1445 polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
1446 } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
1447 polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
1448 }
1449
1450 if let Some(items) = &options.opt_exclude {
1452 match items {
1453 ExcludeSelectItem::Single(ident) => {
1454 modifiers.exclude.insert(ident.value.clone());
1455 },
1456 ExcludeSelectItem::Multiple(idents) => {
1457 modifiers
1458 .exclude
1459 .extend(idents.iter().map(|i| i.value.clone()));
1460 },
1461 };
1462 }
1463
1464 if let Some(items) = &options.opt_except {
1466 modifiers.exclude.insert(items.first_element.value.clone());
1467 modifiers
1468 .exclude
1469 .extend(items.additional_elements.iter().map(|i| i.value.clone()));
1470 }
1471
1472 if let Some(item) = &options.opt_ilike {
1474 let rx = regex::escape(item.pattern.as_str())
1475 .replace('%', ".*")
1476 .replace('_', ".");
1477
1478 modifiers.ilike = Some(
1479 polars_utils::regex_cache::compile_regex(format!("^(?is){}$", rx).as_str())
1480 .unwrap(),
1481 );
1482 }
1483
1484 if let Some(items) = &options.opt_rename {
1486 let renames = match items {
1487 RenameSelectItem::Single(rename) => vec![rename],
1488 RenameSelectItem::Multiple(renames) => renames.iter().collect(),
1489 };
1490 for rn in renames {
1491 let (before, after) = (rn.ident.value.as_str(), rn.alias.value.as_str());
1492 let (before, after) = (PlSmallStr::from_str(before), PlSmallStr::from_str(after));
1493 if before != after {
1494 modifiers.rename.insert(before, after);
1495 }
1496 }
1497 }
1498
1499 if let Some(replacements) = &options.opt_replace {
1501 for rp in &replacements.items {
1502 let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
1503 modifiers
1504 .replace
1505 .push(replacement_expr?.alias(rp.column_name.value.as_str()));
1506 }
1507 }
1508 Ok(exprs)
1509 }
1510
1511 fn rename_columns_from_table_alias(
1512 &mut self,
1513 mut lf: LazyFrame,
1514 alias: &TableAlias,
1515 ) -> PolarsResult<LazyFrame> {
1516 if alias.columns.is_empty() {
1517 Ok(lf)
1518 } else {
1519 let schema = self.get_frame_schema(&mut lf)?;
1520 if alias.columns.len() != schema.len() {
1521 polars_bail!(
1522 SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
1523 alias.columns.len(), alias.name.value, schema.len()
1524 )
1525 } else {
1526 let existing_columns: Vec<_> = schema.iter_names().collect();
1527 let new_columns: Vec<_> =
1528 alias.columns.iter().map(|c| c.name.value.clone()).collect();
1529 Ok(lf.rename(existing_columns, new_columns, true))
1530 }
1531 }
1532 }
1533}
1534
1535impl SQLContext {
1536 pub fn get_table_map(&self) -> PlHashMap<String, LazyFrame> {
1538 self.table_map.clone()
1539 }
1540
1541 pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
1543 Self {
1544 table_map,
1545 ..Default::default()
1546 }
1547 }
1548}
1549
1550fn collect_compound_identifiers(
1551 left: &[Ident],
1552 right: &[Ident],
1553 left_name: &str,
1554 right_name: &str,
1555) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1556 if left.len() == 2 && right.len() == 2 {
1557 let (tbl_a, col_name_a) = (left[0].value.as_str(), left[1].value.as_str());
1558 let (tbl_b, col_name_b) = (right[0].value.as_str(), right[1].value.as_str());
1559
1560 if left_name == tbl_b || right_name == tbl_a {
1562 Ok((vec![col(col_name_b)], vec![col(col_name_a)]))
1563 } else {
1564 Ok((vec![col(col_name_a)], vec![col(col_name_b)]))
1565 }
1566 } else {
1567 polars_bail!(SQLInterface: "collect_compound_identifiers: Expected left.len() == 2 && right.len() == 2, but found left.len() == {:?}, right.len() == {:?}", left.len(), right.len());
1568 }
1569}
1570
1571fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
1572 match expr {
1573 Expr::Wildcard => schema
1574 .iter_names()
1575 .map(|name| col(name.clone()))
1576 .collect::<Vec<_>>(),
1577 Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
1578 let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
1579 schema
1580 .iter_names()
1581 .filter(|name| re.is_match(name))
1582 .map(|name| col(name.clone()))
1583 .collect::<Vec<_>>()
1584 },
1585 Expr::Columns(names) => names
1586 .iter()
1587 .map(|name| col(name.clone()))
1588 .collect::<Vec<_>>(),
1589 _ => vec![expr],
1590 }
1591}
1592
/// Returns `true` when `nm` is wrapped in regex anchors: it starts with `^` and ends with `$`.
///
/// Column names of this form are expanded as regex patterns by `expand_exprs`.
fn is_regex_colname(nm: &str) -> bool {
    nm.strip_prefix('^')
        .is_some_and(|rest| rest.ends_with('$'))
}
1596
1597fn process_join_on(
1598 expression: &sqlparser::ast::Expr,
1599 tbl_left: &TableInfo,
1600 tbl_right: &TableInfo,
1601) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1602 match expression {
1603 SQLExpr::BinaryOp { left, op, right } => match op {
1604 BinaryOperator::And => {
1605 let (mut left_i, mut right_i) = process_join_on(left, tbl_left, tbl_right)?;
1606 let (mut left_j, mut right_j) = process_join_on(right, tbl_left, tbl_right)?;
1607 left_i.append(&mut left_j);
1608 right_i.append(&mut right_j);
1609 Ok((left_i, right_i))
1610 },
1611 BinaryOperator::Eq => match (left.as_ref(), right.as_ref()) {
1612 (SQLExpr::CompoundIdentifier(left), SQLExpr::CompoundIdentifier(right)) => {
1613 collect_compound_identifiers(left, right, &tbl_left.name, &tbl_right.name)
1614 },
1615 _ => {
1616 polars_bail!(SQLInterface: "only equi-join constraints (on identifiers) are currently supported; found lhs={:?}, rhs={:?}", left, right)
1617 },
1618 },
1619 _ => {
1620 polars_bail!(SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op)
1621 },
1622 },
1623 SQLExpr::Nested(expr) => process_join_on(expr, tbl_left, tbl_right),
1624 _ => {
1625 polars_bail!(SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", expression)
1626 },
1627 }
1628}
1629
1630fn process_join_constraint(
1631 constraint: &JoinConstraint,
1632 tbl_left: &TableInfo,
1633 tbl_right: &TableInfo,
1634) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1635 match constraint {
1636 JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
1637 process_join_on(expr, tbl_left, tbl_right)
1638 },
1639 JoinConstraint::Using(idents) if !idents.is_empty() => {
1640 let using: Vec<Expr> = idents.iter().map(|id| col(id.value.as_str())).collect();
1641 Ok((using.clone(), using))
1642 },
1643 JoinConstraint::Natural => {
1644 let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
1645 let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
1646 let on: Vec<Expr> = left_names
1647 .intersection(&right_names)
1648 .map(|&name| col(name.clone()))
1649 .collect();
1650 if on.is_empty() {
1651 polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
1652 }
1653 Ok((on.clone(), on))
1654 },
1655 _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
1656 }
1657}
1658
bitflags::bitflags! {
    /// Classifies how an expression's output height relates to the frame it is evaluated on.
    ///
    /// Values are produced by `ExprSqlProjectionHeightBehavior::identify_from_expr`.
    /// NOTE(review): presumably consulted when assembling SQL projections; confirm at
    /// the call sites.
    #[derive(PartialEq)]
    struct ExprSqlProjectionHeightBehavior: u8 {
        /// References at least one column and contains no height-changing node.
        const MaintainsColumn = 1 << 0;
        /// Contains a node that can change height: an aggregation or `len`; an explode,
        /// filter, gather or slice; a non-scalar literal; or a function that returns a
        /// scalar or is not length-preserving.
        const Independent = 1 << 1;
        /// Neither references a column nor contains a height-changing node
        /// (e.g. a scalar literal).
        const InheritsContext = 1 << 2;
    }
}
1677
1678impl ExprSqlProjectionHeightBehavior {
1679 fn identify_from_expr(expr: &Expr) -> Self {
1680 let mut has_column = false;
1681 let mut has_independent = false;
1682
1683 for e in expr.into_iter() {
1684 use Expr::*;
1685
1686 has_column |= matches!(e, Column(_) | Columns(_) | DtypeColumn(_) | IndexColumn(_));
1687
1688 has_independent |= match e {
1689 Function { options, .. } | AnonymousFunction { options, .. } => {
1690 options.returns_scalar() || !options.is_length_preserving()
1691 },
1692
1693 Literal(v) => !v.is_scalar(),
1694
1695 Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
1696
1697 Agg { .. } | Len => true,
1698
1699 _ => false,
1700 }
1701 }
1702
1703 if has_independent {
1704 Self::Independent
1705 } else if has_column {
1706 Self::MaintainsColumn
1707 } else {
1708 Self::InheritsContext
1709 }
1710 }
1711}