1use std::ops::Deref;
2use std::sync::RwLock;
3
4use polars_core::frame::row::Row;
5use polars_core::prelude::*;
6use polars_lazy::prelude::*;
7use polars_ops::frame::JoinCoalesce;
8use polars_plan::dsl::function_expr::StructFunction;
9use polars_plan::prelude::*;
10use polars_utils::aliases::{PlHashSet, PlIndexSet};
11use polars_utils::format_pl_smallstr;
12use sqlparser::ast::{
13 BinaryOperator, CreateTable, CreateTableLikeKind, Delete, Distinct, ExcludeSelectItem,
14 Expr as SQLExpr, Fetch, FromTable, FunctionArg, GroupByExpr, Ident, JoinConstraint,
15 JoinOperator, LimitClause, NamedWindowDefinition, NamedWindowExpr, ObjectName, ObjectType,
16 OrderBy, OrderByKind, Query, RenameSelectItem, Select, SelectItem,
17 SelectItemQualifiedWildcardKind, SetExpr, SetOperator, SetQuantifier, Statement, TableAlias,
18 TableFactor, TableWithJoins, Truncate, UnaryOperator, Value as SQLValue, ValueWithSpan, Values,
19 Visit, WildcardAdditionalOptions, WindowSpec,
20};
21use sqlparser::dialect::GenericDialect;
22use sqlparser::parser::{Parser, ParserOptions};
23
24use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
25use crate::sql_expr::{
26 parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
27};
28use crate::sql_visitors::{
29 QualifyExpression, TableIdentifierCollector, check_for_ambiguous_column_refs,
30 expr_has_window_functions, expr_refers_to_table,
31};
32use crate::table_functions::PolarsTableFunctions;
33use crate::types::map_sql_dtype_to_polars;
34
35fn clear_lf(lf: LazyFrame) -> LazyFrame {
36 let cb = PlanCallback::new(move |(_, schemas): (Vec<DslPlan>, Vec<SchemaRef>)| {
37 let schema = &schemas[0];
38 Ok(DataFrame::empty_with_schema(schema).lazy().logical_plan)
39 });
40 lf.pipe_with_schema(cb)
41}
42
43#[derive(Clone)]
44pub struct TableInfo {
45 pub(crate) frame: LazyFrame,
46 pub(crate) name: PlSmallStr,
47 pub(crate) schema: Arc<Schema>,
48}
49
50struct SelectModifiers {
51 exclude: PlHashSet<String>, ilike: Option<regex::Regex>, rename: PlHashMap<PlSmallStr, PlSmallStr>, replace: Vec<Expr>, }
56impl SelectModifiers {
57 fn matches_ilike(&self, s: &str) -> bool {
58 match &self.ilike {
59 Some(rx) => rx.is_match(s),
60 None => true,
61 }
62 }
63 fn renamed_cols(&self) -> Vec<Expr> {
64 self.rename
65 .iter()
66 .map(|(before, after)| col(before.clone()).alias(after.clone()))
67 .collect()
68 }
69}
70
71enum ProjectionItem {
73 QualifiedExprs(PlSmallStr, Vec<Expr>),
74 Exprs(Vec<Expr>),
75}
76
77fn expr_output_name(expr: &Expr) -> Option<&PlSmallStr> {
79 match expr {
80 Expr::Column(name) | Expr::Alias(_, name) => Some(name),
81 _ => None,
82 }
83}
84
85fn disambiguate_projection_cols(
87 items: Vec<ProjectionItem>,
88 schema: &Schema,
89) -> PolarsResult<Vec<Expr>> {
90 let mut qualified_wildcard_names: PlHashMap<PlSmallStr, usize> = PlHashMap::new();
92 let mut other_names: PlHashSet<PlSmallStr> = PlHashSet::new();
93 for item in &items {
94 match item {
95 ProjectionItem::QualifiedExprs(_, exprs) => {
96 for expr in exprs {
97 if let Some(name) = expr_output_name(expr) {
98 *qualified_wildcard_names.entry(name.clone()).or_insert(0) += 1;
99 }
100 }
101 },
102 ProjectionItem::Exprs(exprs) => {
103 for expr in exprs {
104 if let Some(name) = expr_output_name(expr) {
105 other_names.insert(name.clone());
106 }
107 }
108 },
109 }
110 }
111
112 let needs_suffix: PlHashSet<PlSmallStr> = qualified_wildcard_names
114 .into_iter()
115 .filter(|(name, count)| *count > 1 || other_names.contains(name))
116 .map(|(name, _)| name)
117 .collect();
118
119 let mut result: Vec<Expr> = Vec::new();
121 for item in items {
122 match item {
123 ProjectionItem::QualifiedExprs(tbl_name, exprs) if !needs_suffix.is_empty() => {
124 for expr in exprs {
125 if let Some(name) = expr_output_name(&expr) {
126 if needs_suffix.contains(name) {
127 let suffixed = format_pl_smallstr!("{}:{}", name, tbl_name);
128 if schema.contains(suffixed.as_str()) {
129 result.push(col(suffixed));
130 continue;
131 }
132 if other_names.contains(name) {
133 polars_bail!(
134 SQLInterface:
135 "column '{}' is duplicated in the SELECT (explicitly, and via the `*` wildcard)", name
136 );
137 }
138 }
139 }
140 result.push(expr);
141 }
142 },
143 ProjectionItem::QualifiedExprs(_, exprs) | ProjectionItem::Exprs(exprs) => {
144 result.extend(exprs);
145 },
146 }
147 }
148 Ok(result)
149}
150
151#[derive(Clone)]
153pub struct SQLContext {
154 pub(crate) table_map: Arc<RwLock<PlHashMap<String, LazyFrame>>>,
155 pub(crate) function_registry: Arc<dyn FunctionRegistry>,
156 pub(crate) lp_arena: Arena<IR>,
157 pub(crate) expr_arena: Arena<AExpr>,
158
159 cte_map: PlHashMap<String, LazyFrame>,
160 table_aliases: PlHashMap<String, String>,
161 joined_aliases: PlHashMap<String, PlHashMap<String, String>>,
162 pub(crate) named_windows: PlHashMap<String, WindowSpec>,
163}
164
165impl Default for SQLContext {
166 fn default() -> Self {
167 Self {
168 function_registry: Arc::new(DefaultFunctionRegistry {}),
169 table_map: Default::default(),
170 cte_map: Default::default(),
171 table_aliases: Default::default(),
172 joined_aliases: Default::default(),
173 named_windows: Default::default(),
174 lp_arena: Default::default(),
175 expr_arena: Default::default(),
176 }
177 }
178}
179
180impl SQLContext {
181 pub fn new() -> Self {
189 Self::default()
190 }
191
192 pub fn get_tables(&self) -> Vec<String> {
194 let mut tables = Vec::from_iter(self.table_map.read().unwrap().keys().cloned());
195 tables.sort_unstable();
196 tables
197 }
198
199 pub fn register(&self, name: &str, lf: LazyFrame) {
215 self.table_map.write().unwrap().insert(name.to_owned(), lf);
216 }
217
218 pub fn unregister(&self, name: &str) {
220 self.table_map.write().unwrap().remove(&name.to_owned());
221 }
222
223 pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
242 let mut parser = Parser::new(&GenericDialect);
243 parser = parser.with_options(ParserOptions {
244 trailing_commas: true,
245 ..Default::default()
246 });
247
248 let ast = parser
249 .try_with_sql(query)
250 .map_err(to_sql_interface_err)?
251 .parse_statements()
252 .map_err(to_sql_interface_err)?;
253
254 polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
255 let res = self.execute_statement(ast.first().unwrap())?;
256
257 let lp_arena = std::mem::take(&mut self.lp_arena);
260 let expr_arena = std::mem::take(&mut self.expr_arena);
261 res.set_cached_arena(lp_arena, expr_arena);
262
263 self.cte_map.clear();
265 self.table_aliases.clear();
266 self.joined_aliases.clear();
267 self.named_windows.clear();
268
269 Ok(res)
270 }
271
272 pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
275 self.function_registry = function_registry;
276 self
277 }
278
279 pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
281 &self.function_registry
282 }
283
284 pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
286 Arc::get_mut(&mut self.function_registry).unwrap()
287 }
288}
289
290impl SQLContext {
291 fn isolated(&self) -> Self {
292 Self {
293 table_map: Arc::new(RwLock::new(self.table_map.read().unwrap().clone())),
295 named_windows: self.named_windows.clone(),
296 cte_map: self.cte_map.clone(),
297
298 ..Default::default()
299 }
300 }
301
302 pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
303 let ast = stmt;
304 Ok(match ast {
305 Statement::Query(query) => self.execute_query(query)?,
306 stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
307 stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
308 stmt @ Statement::Drop {
309 object_type: ObjectType::Table,
310 ..
311 } => self.execute_drop_table(stmt)?,
312 stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
313 stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
314 stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
315 _ => polars_bail!(
316 SQLInterface: "statement type is not supported:\n{:?}", ast,
317 ),
318 })
319 }
320
321 pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
322 self.register_ctes(query)?;
323 self.execute_query_no_ctes(query)
324 }
325
326 pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
327 self.validate_query(query)?;
328
329 let lf = self.process_query(&query.body, query)?;
330 self.process_limit_offset(lf, &query.limit_clause, &query.fetch)
331 }
332
333 pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
334 frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
335 }
336
337 pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
338 self.table_map
342 .read()
343 .unwrap()
344 .get(name)
345 .cloned()
346 .or_else(|| self.cte_map.get(name).cloned())
347 .or_else(|| {
348 self.table_aliases.get(name).and_then(|alias| {
349 self.table_map
350 .read()
351 .unwrap()
352 .get(alias.as_str())
353 .or_else(|| self.cte_map.get(alias.as_str()))
354 .cloned()
355 })
356 })
357 }
358
359 pub(crate) fn execute_isolated<F>(&mut self, query: F) -> PolarsResult<LazyFrame>
363 where
364 F: FnOnce(&mut Self) -> PolarsResult<LazyFrame>,
365 {
366 let mut ctx = self.isolated();
367
368 let lf = query(&mut ctx)?;
370
371 lf.set_cached_arena(ctx.lp_arena, ctx.expr_arena);
373
374 Ok(lf)
375 }
376
377 fn expr_or_ordinal(
378 &mut self,
379 e: &SQLExpr,
380 exprs: &[Expr],
381 selected: Option<&[Expr]>,
382 schema: Option<&Schema>,
383 clause: &str,
384 ) -> PolarsResult<Expr> {
385 match e {
386 SQLExpr::UnaryOp {
387 op: UnaryOperator::Minus,
388 expr,
389 } if matches!(
390 **expr,
391 SQLExpr::Value(ValueWithSpan {
392 value: SQLValue::Number(_, _),
393 ..
394 })
395 ) =>
396 {
397 if let SQLExpr::Value(ValueWithSpan {
398 value: SQLValue::Number(ref idx, _),
399 ..
400 }) = **expr
401 {
402 Err(polars_err!(
403 SQLSyntax:
404 "negative ordinal values are invalid for {}; found -{}",
405 clause,
406 idx
407 ))
408 } else {
409 unreachable!()
410 }
411 },
412 SQLExpr::Value(ValueWithSpan {
413 value: SQLValue::Number(idx, _),
414 ..
415 }) => {
416 let idx = idx.parse::<usize>().map_err(|_| {
418 polars_err!(
419 SQLSyntax:
420 "negative ordinal values are invalid for {}; found {}",
421 clause,
422 idx
423 )
424 })?;
425 let cols = if let Some(cols) = selected {
428 cols
429 } else {
430 exprs
431 };
432 Ok(cols
433 .get(idx - 1)
434 .ok_or_else(|| {
435 polars_err!(
436 SQLInterface:
437 "{} ordinal value must refer to a valid column; found {}",
438 clause,
439 idx
440 )
441 })?
442 .clone())
443 },
444 SQLExpr::Value(v) => Err(polars_err!(
445 SQLSyntax:
446 "{} requires a valid expression or positive ordinal; found {}", clause, v,
447 )),
448 _ => {
449 let mut expr = parse_sql_expr(e, self, schema)?;
452 if matches!(e, SQLExpr::CompoundIdentifier(_)) {
453 if let Some(schema) = schema {
454 expr = expr.map_expr(|ex| match &ex {
455 Expr::Column(name) => {
456 let prefixed = format!("__POLARS_ORIG_{}", name.as_str());
457 if schema.contains(prefixed.as_str()) {
458 col(prefixed)
459 } else {
460 ex
461 }
462 },
463 _ => ex,
464 });
465 }
466 }
467 Ok(expr)
468 },
469 }
470 }
471
472 pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
473 if let Some(aliases) = self.joined_aliases.get(tbl_name) {
474 if let Some(name) = aliases.get(column_name) {
475 return name.to_string();
476 }
477 }
478 column_name.to_string()
479 }
480
481 fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
482 match expr {
483 SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
484 SetExpr::Query(nested_query) => {
485 let lf = self.execute_query_no_ctes(nested_query)?;
486 self.process_order_by(lf, &query.order_by, None)
487 },
488 SetExpr::SetOperation {
489 op: SetOperator::Union,
490 set_quantifier,
491 left,
492 right,
493 } => self.process_union(left, right, set_quantifier, query),
494
495 #[cfg(feature = "semi_anti_join")]
496 SetExpr::SetOperation {
497 op: SetOperator::Intersect | SetOperator::Except,
498 set_quantifier,
499 left,
500 right,
501 } => self.process_except_intersect(left, right, set_quantifier, query),
502
503 SetExpr::Values(Values {
504 explicit_row: _,
505 rows,
506 value_keyword: _,
507 }) => self.process_values(rows),
508
509 SetExpr::Table(tbl) => {
510 if let Some(table_name) = tbl.table_name.as_ref() {
511 self.get_table_from_current_scope(table_name)
512 .ok_or_else(|| {
513 polars_err!(
514 SQLInterface: "no table or alias named '{}' found",
515 tbl
516 )
517 })
518 } else {
519 polars_bail!(SQLInterface: "'TABLE' requires valid table name")
520 }
521 },
522 op => {
523 polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
524 },
525 }
526 }
527
528 #[cfg(feature = "semi_anti_join")]
529 fn process_except_intersect(
530 &mut self,
531 left: &SetExpr,
532 right: &SetExpr,
533 quantifier: &SetQuantifier,
534 query: &Query,
535 ) -> PolarsResult<LazyFrame> {
536 let (join_type, op_name) = match *query.body {
537 SetExpr::SetOperation {
538 op: SetOperator::Except,
539 ..
540 } => (JoinType::Anti, "EXCEPT"),
541 _ => (JoinType::Semi, "INTERSECT"),
542 };
543
544 let mut lf = self.execute_isolated(|ctx| ctx.process_query(left, query))?;
547 let mut rf = self.execute_isolated(|ctx| ctx.process_query(right, query))?;
548 let lf_schema = self.get_frame_schema(&mut lf)?;
549
550 let lf_cols: Vec<_> = lf_schema.iter_names_cloned().map(col).collect();
551 let rf_cols = match quantifier {
552 SetQuantifier::ByName => None,
553 SetQuantifier::Distinct | SetQuantifier::None => {
554 let rf_schema = self.get_frame_schema(&mut rf)?;
555 let rf_cols: Vec<_> = rf_schema.iter_names_cloned().map(col).collect();
556 if lf_cols.len() != rf_cols.len() {
557 polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
558 }
559 Some(rf_cols)
560 },
561 _ => {
562 polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
563 },
564 };
565 let join = lf.join_builder().with(rf).how(join_type).join_nulls(true);
566 let joined_tbl = match rf_cols {
567 Some(rf_cols) => join.left_on(lf_cols).right_on(rf_cols).finish(),
568 None => join.on(lf_cols).finish(),
569 };
570 let lf = joined_tbl.unique(None, UniqueKeepStrategy::Any);
571 self.process_order_by(lf, &query.order_by, None)
572 }
573
574 fn process_union(
575 &mut self,
576 left: &SetExpr,
577 right: &SetExpr,
578 quantifier: &SetQuantifier,
579 query: &Query,
580 ) -> PolarsResult<LazyFrame> {
581 let quantifier = *quantifier;
582
583 let lf = self.execute_isolated(|ctx| ctx.process_query(left, query))?;
586 let rf = self.execute_isolated(|ctx| ctx.process_query(right, query))?;
587
588 let cb = PlanCallback::new(
589 move |(mut plans, schemas): (Vec<DslPlan>, Vec<SchemaRef>)| {
590 let mut rf = LazyFrame::from(plans.pop().unwrap());
591 let lf = LazyFrame::from(plans.pop().unwrap());
592
593 let opts = UnionArgs {
594 parallel: true,
595 to_supertypes: true,
596 maintain_order: false,
597 ..Default::default()
598 };
599 let out = match quantifier {
600 SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
602 let lf_schema = &schemas[0];
603 let rf_schema = &schemas[1];
604 if lf_schema.len() != rf_schema.len() {
605 polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
606 }
607 if lf_schema.iter_names().ne(rf_schema.iter_names()) {
610 rf = rf.rename(rf_schema.iter_names(), lf_schema.iter_names(), true);
611 }
612 let concatenated = concat(vec![lf, rf], opts);
613 match quantifier {
614 SetQuantifier::Distinct | SetQuantifier::None => {
615 concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
616 },
617 _ => concatenated,
618 }
619 },
620 #[cfg(feature = "diagonal_concat")]
622 SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
623 #[cfg(feature = "diagonal_concat")]
625 SetQuantifier::ByName | SetQuantifier::DistinctByName => {
626 let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
627 concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
628 },
629 #[allow(unreachable_patterns)]
630 _ => {
631 polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier)
632 },
633 };
634
635 out.map(|lf| lf.logical_plan)
636 },
637 );
638
639 let lf = lf.pipe_with_schemas(vec![rf], cb);
640 self.process_order_by(lf, &query.order_by, None)
641 }
642
643 fn process_unnest_lateral(
646 &self,
647 lf: LazyFrame,
648 alias: &Option<TableAlias>,
649 array_exprs: &[SQLExpr],
650 with_offset: bool,
651 ) -> PolarsResult<LazyFrame> {
652 let alias = alias
653 .as_ref()
654 .ok_or_else(|| polars_err!(SQLSyntax: "UNNEST table must have an alias"))?;
655 polars_ensure!(!with_offset, SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
656
657 let (mut explode_cols, mut rename_from, mut rename_to) = (
658 Vec::with_capacity(array_exprs.len()),
659 Vec::with_capacity(array_exprs.len()),
660 Vec::with_capacity(array_exprs.len()),
661 );
662 let is_single_col = array_exprs.len() == 1;
663
664 for (i, arr_expr) in array_exprs.iter().enumerate() {
665 let col_name = match arr_expr {
666 SQLExpr::Identifier(ident) => PlSmallStr::from_str(&ident.value),
667 SQLExpr::CompoundIdentifier(parts) => {
668 PlSmallStr::from_str(&parts.last().unwrap().value)
669 },
670 SQLExpr::Array(_) => polars_bail!(
671 SQLInterface: "CROSS JOIN UNNEST with both literal arrays and column references is not supported"
672 ),
673 other => polars_bail!(
674 SQLSyntax: "UNNEST expects column references or array literals, found {:?}", other
675 ),
676 };
677 if let Some(name) = alias
679 .columns
680 .get(i)
681 .map(|c| c.name.value.as_str())
682 .or_else(|| is_single_col.then_some(alias.name.value.as_str()))
683 .filter(|name| !name.is_empty() && *name != col_name.as_str())
684 {
685 rename_from.push(col_name.clone());
686 rename_to.push(PlSmallStr::from_str(name));
687 }
688 explode_cols.push(col_name);
689 }
690
691 let mut lf = lf.explode(
692 Selector::ByName {
693 names: Arc::from(explode_cols),
694 strict: true,
695 },
696 ExplodeOptions {
697 empty_as_null: true,
698 keep_nulls: true,
699 },
700 );
701 if !rename_from.is_empty() {
702 lf = lf.rename(rename_from, rename_to, true);
703 }
704 Ok(lf)
705 }
706
707 fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
708 let frame_rows: Vec<Row> = values.iter().map(|row| {
709 let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
710 let expr = parse_sql_expr(expr, self, None)?;
711 match expr {
712 Expr::Literal(value) => {
713 value.to_any_value()
714 .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
715 .map(|av| av.into_static())
716 },
717 _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
718 }
719 }).collect();
720 row_data.map(Row::new)
721 }).collect::<Result<_, _>>()?;
722
723 Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
724 }
725
726 fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
728 match stmt {
729 Statement::Explain { statement, .. } => {
730 let lf = self.execute_statement(statement)?;
731 let plan = lf.describe_optimized_plan()?;
732 let plan = plan
733 .split('\n')
734 .collect::<Series>()
735 .with_name(PlSmallStr::from_static("Logical Plan"))
736 .into_column();
737 let df = DataFrame::new_infer_height(vec![plan])?;
738 Ok(df.lazy())
739 },
740 _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
741 }
742 }
743
744 fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
746 let tables = Column::new("name".into(), self.get_tables());
747 let df = DataFrame::new_infer_height(vec![tables])?;
748 Ok(df.lazy())
749 }
750
751 fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
753 match stmt {
754 Statement::Drop { names, .. } => {
755 names.iter().for_each(|name| {
756 self.table_map.write().unwrap().remove(&name.to_string());
757 });
758 Ok(DataFrame::empty().lazy())
759 },
760 _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
761 }
762 }
763
764 fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
766 if let Statement::Delete(Delete {
767 tables,
768 from,
769 using,
770 selection,
771 returning,
772 order_by,
773 limit,
774 delete_token: _,
775 }) = stmt
776 {
777 if !tables.is_empty()
778 || using.is_some()
779 || returning.is_some()
780 || limit.is_some()
781 || !order_by.is_empty()
782 {
783 let error_message = match () {
784 _ if !tables.is_empty() => "DELETE expects exactly one table name",
785 _ if using.is_some() => "DELETE does not support the USING clause",
786 _ if returning.is_some() => "DELETE does not support the RETURNING clause",
787 _ if limit.is_some() => "DELETE does not support the LIMIT clause",
788 _ if !order_by.is_empty() => "DELETE does not support the ORDER BY clause",
789 _ => unreachable!(),
790 };
791 polars_bail!(SQLInterface: error_message);
792 }
793 let from_tables = match &from {
794 FromTable::WithFromKeyword(from) => from,
795 FromTable::WithoutKeyword(from) => from,
796 };
797 if from_tables.len() > 1 {
798 polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
799 }
800 let tbl_expr = from_tables.first().unwrap();
801 if !tbl_expr.joins.is_empty() {
802 polars_bail!(SQLInterface: "DELETE does not support table JOINs")
803 }
804 let (_, lf) = self.get_table(&tbl_expr.relation)?;
805 if selection.is_none() {
806 Ok(clear_lf(lf))
808 } else {
809 Ok(self.process_where(lf.clone(), selection, true, None)?)
811 }
812 } else {
813 polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
814 }
815 }
816
817 fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
819 if let Statement::Truncate(Truncate {
820 table_names,
821 partitions,
822 ..
823 }) = stmt
824 {
825 match partitions {
826 None => {
827 if table_names.len() != 1 {
828 polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
829 }
830 let tbl = table_names[0].name.to_string();
831 if let Some(lf) = self.table_map.write().unwrap().get_mut(&tbl) {
832 *lf = clear_lf(lf.clone());
833 Ok(lf.clone())
834 } else {
835 polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
836 }
837 },
838 _ => {
839 polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
840 },
841 }
842 } else {
843 polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
844 }
845 }
846
847 fn register_cte(&mut self, name: &str, lf: LazyFrame) {
848 self.cte_map.insert(name.to_owned(), lf);
849 }
850
851 fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
852 if let Some(with) = &query.with {
853 if with.recursive {
854 polars_bail!(SQLInterface: "recursive CTEs are not supported")
855 }
856 for cte in &with.cte_tables {
857 let cte_name = cte.alias.name.value.clone();
858 let mut lf = self.execute_query(&cte.query)?;
859 lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
860 self.register_cte(&cte_name, lf);
861 }
862 }
863 Ok(())
864 }
865
866 fn register_named_windows(
867 &mut self,
868 named_windows: &[NamedWindowDefinition],
869 ) -> PolarsResult<()> {
870 for NamedWindowDefinition(name, expr) in named_windows {
871 let spec = match expr {
872 NamedWindowExpr::NamedWindow(ref_name) => self
873 .named_windows
874 .get(&ref_name.value)
875 .ok_or_else(|| {
876 polars_err!(
877 SQLInterface:
878 "named window '{}' references undefined window '{}'",
879 name.value, ref_name.value
880 )
881 })?
882 .clone(),
883 NamedWindowExpr::WindowSpec(spec) => spec.clone(),
884 };
885 self.named_windows.insert(name.value.clone(), spec);
886 }
887 Ok(())
888 }
889
890 fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
892 let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
893 if !tbl_expr.joins.is_empty() {
894 for join in &tbl_expr.joins {
895 if let (
897 JoinOperator::CrossJoin(JoinConstraint::None),
898 TableFactor::UNNEST {
899 alias,
900 array_exprs,
901 with_offset,
902 ..
903 },
904 ) = (&join.join_operator, &join.relation)
905 {
906 if array_exprs.iter().any(|e| !matches!(e, SQLExpr::Array(_))) {
907 lf = self.process_unnest_lateral(lf, alias, array_exprs, *with_offset)?;
908 continue;
909 }
910 }
911
912 let (r_name, mut rf) = self.get_table(&join.relation)?;
913 if r_name.is_empty() {
914 polars_bail!(
916 SQLInterface:
917 "cannot join on unnamed relation; please provide an alias"
918 )
919 }
920 let left_schema = self.get_frame_schema(&mut lf)?;
921 let right_schema = self.get_frame_schema(&mut rf)?;
922
923 lf = match &join.join_operator {
924 op @ (JoinOperator::Join(constraint) | JoinOperator::FullOuter(constraint)
926 | JoinOperator::Left(constraint)
927 | JoinOperator::LeftOuter(constraint)
928 | JoinOperator::Right(constraint)
929 | JoinOperator::RightOuter(constraint)
930 | JoinOperator::Inner(constraint)
931 | JoinOperator::Anti(constraint)
932 | JoinOperator::Semi(constraint)
933 | JoinOperator::LeftAnti(constraint)
934 | JoinOperator::LeftSemi(constraint)
935 | JoinOperator::RightAnti(constraint)
936 | JoinOperator::RightSemi(constraint)) => {
937 let (lf, rf) = match op {
938 JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
939 _ => (lf, rf),
940 };
941 self.process_join(
942 &TableInfo {
943 frame: lf,
944 name: (&l_name).into(),
945 schema: left_schema.clone(),
946 },
947 &TableInfo {
948 frame: rf,
949 name: (&r_name).into(),
950 schema: right_schema.clone(),
951 },
952 constraint,
953 match op {
954 JoinOperator::Join(_) | JoinOperator::Inner(_) => JoinType::Inner,
955 JoinOperator::Left(_) | JoinOperator::LeftOuter(_) => {
956 JoinType::Left
957 },
958 JoinOperator::Right(_) | JoinOperator::RightOuter(_) => {
959 JoinType::Right
960 },
961 JoinOperator::FullOuter(_) => JoinType::Full,
962 #[cfg(feature = "semi_anti_join")]
963 JoinOperator::Anti(_)
964 | JoinOperator::LeftAnti(_)
965 | JoinOperator::RightAnti(_) => JoinType::Anti,
966 #[cfg(feature = "semi_anti_join")]
967 JoinOperator::Semi(_)
968 | JoinOperator::LeftSemi(_)
969 | JoinOperator::RightSemi(_) => JoinType::Semi,
970 join_type => polars_bail!(
971 SQLInterface:
972 "join type '{:?}' not currently supported",
973 join_type
974 ),
975 },
976 )?
977 },
978 JoinOperator::CrossJoin(JoinConstraint::None) => {
979 lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
980 },
981 JoinOperator::CrossJoin(constraint) => {
982 polars_bail!(
983 SQLInterface:
984 "CROSS JOIN does not support {:?} constraint; consider INNER JOIN instead",
985 constraint
986 )
987 },
988 join_type => {
989 polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
990 },
991 };
992
993 let joined_schema = self.get_frame_schema(&mut lf)?;
995
996 self.joined_aliases.insert(
997 r_name.clone(),
998 right_schema
999 .iter_names()
1000 .filter_map(|name| {
1001 let aliased_name = format!("{name}:{r_name}");
1003 if left_schema.contains(name)
1004 && joined_schema.contains(aliased_name.as_str())
1005 {
1006 Some((name.to_string(), aliased_name))
1007 } else {
1008 None
1009 }
1010 })
1011 .collect::<PlHashMap<String, String>>(),
1012 );
1013 }
1014 };
1015 Ok(lf)
1016 }
1017
1018 fn validate_select(&self, select_stmt: &Select) -> PolarsResult<()> {
1020 let Select {
1023 distinct: _,
1025 from: _,
1026 group_by: _,
1027 having: _,
1028 named_window: _,
1029 projection: _,
1030 qualify: _,
1031 selection: _,
1032
1033 flavor: _,
1035 select_token: _,
1036 top_before_distinct: _,
1037 window_before_qualify: _,
1038
1039 ref cluster_by,
1041 ref connect_by,
1042 ref distribute_by,
1043 ref exclude,
1044 ref into,
1045 ref lateral_views,
1046 ref prewhere,
1047 ref sort_by,
1048 ref top,
1049 ref value_table_mode,
1050 } = *select_stmt;
1051
1052 polars_ensure!(cluster_by.is_empty(), SQLInterface: "`CLUSTER BY` clause is not supported");
1054 polars_ensure!(connect_by.is_none(), SQLInterface: "`CONNECT BY` clause is not supported");
1055 polars_ensure!(distribute_by.is_empty(), SQLInterface: "`DISTRIBUTE BY` clause is not supported");
1056 polars_ensure!(exclude.is_none(), SQLInterface: "`EXCLUDE` clause is not supported");
1057 polars_ensure!(into.is_none(), SQLInterface: "`SELECT INTO` clause is not supported");
1058 polars_ensure!(lateral_views.is_empty(), SQLInterface: "`LATERAL VIEW` clause is not supported");
1059 polars_ensure!(prewhere.is_none(), SQLInterface: "`PREWHERE` clause is not supported");
1060 polars_ensure!(sort_by.is_empty(), SQLInterface: "`SORT BY` clause is not supported; use `ORDER BY` instead");
1061 polars_ensure!(top.is_none(), SQLInterface: "`TOP` clause is not supported; use `LIMIT` instead");
1062 polars_ensure!(value_table_mode.is_none(), SQLInterface: "`SELECT AS VALUE/STRUCT` is not supported");
1063
1064 Ok(())
1065 }
1066
1067 fn validate_query(&self, query: &Query) -> PolarsResult<()> {
1069 let Query {
1071 with: _,
1073 body: _,
1074 order_by: _,
1075 limit_clause: _,
1076 fetch,
1077
1078 for_clause,
1080 format_clause,
1081 locks,
1082 pipe_operators,
1083 settings,
1084 } = query;
1085
1086 polars_ensure!(for_clause.is_none(), SQLInterface: "`FOR` clause is not supported");
1088 polars_ensure!(format_clause.is_none(), SQLInterface: "`FORMAT` clause is not supported");
1089 polars_ensure!(locks.is_empty(), SQLInterface: "`FOR UPDATE/SHARE` locking clause is not supported");
1090 polars_ensure!(pipe_operators.is_empty(), SQLInterface: "pipe operators are not supported");
1091 polars_ensure!(settings.is_none(), SQLInterface: "`SETTINGS` clause is not supported");
1092
1093 if let Some(Fetch {
1095 quantity: _, percent,
1097 with_ties,
1098 }) = fetch
1099 {
1100 polars_ensure!(!percent, SQLInterface: "`FETCH` with `PERCENT` is not supported");
1101 polars_ensure!(!with_ties, SQLInterface: "`FETCH` with `WITH TIES` is not supported");
1102 }
1103 Ok(())
1104 }
1105
    /// Executes a single `SELECT` statement and returns the resulting `LazyFrame`.
    ///
    /// The clauses are applied in this order: `FROM`, `WHERE`, projection parsing
    /// (including a rewrite of `Expr::Explode` projections into a frame-level
    /// explode), then either `GROUP BY`/`HAVING` or a plain projection, `ORDER BY`,
    /// `QUALIFY`, and finally `DISTINCT` / `DISTINCT ON`.
    ///
    /// `query` is the enclosing query; only its `order_by` is read here.
    ///
    /// # Errors
    ///
    /// Fails when more than one table is listed in `FROM`, when `GROUP BY` carries
    /// modifiers, when `HAVING` is used without any grouping keys, when a
    /// `DISTINCT ON` expression is not a plain column, or when any helper called
    /// along the way returns an error.
    fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
        // Validation and named-window registration run before anything is planned.
        self.validate_select(select_stmt)?;

        self.register_named_windows(&select_stmt.named_window)?;

        // Resolve the FROM clause into a starting frame (plus the base table name, if any).
        let (mut lf, base_table_name) = if select_stmt.from.is_empty() {
            // No FROM clause: start from an empty frame.
            (DataFrame::empty().lazy(), None)
        } else {
            let from = select_stmt.clone().from;
            // Comma-separated tables are rejected; only a single entry is handled.
            if from.len() > 1 {
                polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
            }
            let tbl_expr = from.first().unwrap();
            let lf = self.execute_from_statement(tbl_expr)?;
            let base_name = get_table_name(&tbl_expr.relation);
            (lf, base_name)
        };

        // When joins registered aliases, check the projection and WHERE expressions
        // for ambiguous column references.
        if let Some(ref base_name) = base_table_name {
            if !self.joined_aliases.is_empty() {
                // Columns named in any join's USING constraint are handed to the check.
                let using_cols: PlHashSet<String> = select_stmt
                    .from
                    .first()
                    .into_iter()
                    .flat_map(|t| t.joins.iter())
                    .filter_map(|join| get_using_cols(&join.join_operator))
                    .flatten()
                    .collect();

                let check_expr = |e| {
                    check_for_ambiguous_column_refs(e, &self.joined_aliases, base_name, &using_cols)
                };
                for item in &select_stmt.projection {
                    match item {
                        SelectItem::UnnamedExpr(e) | SelectItem::ExprWithAlias { expr: e, .. } => {
                            check_expr(e)?
                        },
                        // Wildcard items are not checked.
                        _ => {},
                    }
                }
                if let Some(ref where_expr) = select_stmt.selection {
                    check_expr(where_expr)?;
                }
            }
        }

        // WHERE is applied before any projection, using the schema of the FROM frame.
        let mut schema = self.get_frame_schema(&mut lf)?;
        lf = self.process_where(lf, &select_stmt.selection, false, Some(schema.clone()))?;

        // Accumulates wildcard options (ILIKE / EXCLUDE / RENAME / REPLACE) found
        // while the projection is parsed.
        let mut select_modifiers = SelectModifiers {
            ilike: None,
            exclude: PlHashSet::new(),
            rename: PlHashMap::new(),
            replace: vec![],
        };

        // Aliases of projected window-function expressions; only collected when a
        // QUALIFY clause is present, and consumed by `process_qualify` below.
        let window_fn_columns = if select_stmt.qualify.is_some() {
            select_stmt
                .projection
                .iter()
                .filter_map(|item| match item {
                    SelectItem::ExprWithAlias { expr, alias }
                        if expr_has_window_functions(expr) =>
                    {
                        Some(alias.value.clone())
                    },
                    _ => None,
                })
                .collect::<PlHashSet<_>>()
        } else {
            PlHashSet::new()
        };

        let mut projections =
            self.column_projections(select_stmt, &schema, &mut select_modifiers)?;

        // Find `Expr::Explode` nodes inside the projections so the explode can be
        // applied to the frame itself rather than inside the projection.
        let mut explode_names = Vec::new();
        let mut explode_exprs = Vec::new();
        let mut explode_lookup = PlHashMap::new();

        for expr in &projections {
            for e in expr {
                if let Expr::Explode { input, .. } = e {
                    match input.as_ref() {
                        // A plain column can be exploded by name.
                        Expr::Column(name) => explode_names.push(name.clone()),
                        other_expr => {
                            // Non-column inputs without aggregations are materialised
                            // under a temporary name, which is then exploded.
                            if !has_expr(other_expr, |e| matches!(e, Expr::Agg(_) | Expr::Len)) {
                                let temp_name =
                                    format_pl_smallstr!("__POLARS_UNNEST_{}", explode_exprs.len());
                                explode_exprs.push(other_expr.clone().alias(temp_name.as_str()));
                                explode_lookup.insert(other_expr.clone(), temp_name.clone());
                                explode_names.push(temp_name);
                            }
                        },
                    }
                }
            }
        }
        if !explode_names.is_empty() {
            if !explode_exprs.is_empty() {
                lf = lf.with_columns(explode_exprs);
            }
            lf = lf.explode(
                Selector::ByName {
                    names: Arc::from(explode_names),
                    strict: true,
                },
                ExplodeOptions {
                    empty_as_null: true,
                    keep_nulls: true,
                },
            );
            // Replace each `Expr::Explode` in the projections with either the
            // temporary column created above or its (now exploded) input.
            projections = projections
                .into_iter()
                .map(|p| {
                    p.map_expr(|e| match e {
                        Expr::Explode { input, .. } => explode_lookup
                            .get(input.as_ref())
                            .map(|name| Expr::Column(name.clone()))
                            .unwrap_or_else(|| input.as_ref().clone()),
                        _ => e,
                    })
                })
                .collect();

            // The frame changed, so refresh the schema.
            schema = self.get_frame_schema(&mut lf)?;
        }

        // Resolve the GROUP BY keys.
        let mut group_by_keys: Vec<Expr> = Vec::new();
        match &select_stmt.group_by {
            GroupByExpr::Expressions(group_by_exprs, modifiers) => {
                if !modifiers.is_empty() {
                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
                }
                group_by_keys = group_by_exprs
                    .iter()
                    .map(|e| match e {
                        // A bare identifier is first tried as a SELECT alias; if that
                        // yields nothing, it is resolved as an expression or ordinal.
                        SQLExpr::Identifier(ident) => {
                            resolve_select_alias(&ident.value, &projections, &schema).map_or_else(
                                || {
                                    self.expr_or_ordinal(
                                        e,
                                        &projections,
                                        None,
                                        Some(&schema),
                                        "GROUP BY",
                                    )
                                },
                                Ok,
                            )
                        },
                        _ => self.expr_or_ordinal(e, &projections, None, Some(&schema), "GROUP BY"),
                    })
                    .collect::<PolarsResult<_>>()?
            },
            GroupByExpr::All(modifiers) => {
                if !modifiers.is_empty() {
                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
                }
                // GROUP BY ALL: derive the keys from the projections themselves.
                projections.iter().for_each(|expr| match expr {
                    // Aggregations, counts and literals are never keys.
                    Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
                    Expr::Column(_) => group_by_keys.push(expr.clone()),
                    Expr::Alias(e, _)
                        if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
                    // An aliased column groups by the underlying column, not the alias.
                    Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
                        if let Expr::Column(name) = &**e {
                            group_by_keys.push(col(name.clone()));
                        }
                    },
                    _ => {
                        // Any other expression is a key unless it contains an
                        // aggregation, count, window or rolling expression.
                        if !has_expr(expr, |e| {
                            matches!(e, Expr::Agg(_))
                                || matches!(e, Expr::Len)
                                || matches!(e, Expr::Over { .. })
                                || {
                                    #[cfg(feature = "dynamic_group_by")]
                                    {
                                        matches!(e, Expr::Rolling { .. })
                                    }
                                    #[cfg(not(feature = "dynamic_group_by"))]
                                    {
                                        false
                                    }
                                }
                        }) {
                            group_by_keys.push(expr.clone())
                        }
                    },
                });
            },
        };

        lf = if group_by_keys.is_empty() {
            // No grouping keys: plain projection path.
            if select_stmt.having.is_some() {
                polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
            };

            // `retained_cols` is what ends up selected; `retained_names` refers to
            // the same outputs by name only.
            let mut retained_cols = Vec::with_capacity(projections.len());
            let mut retained_names = Vec::with_capacity(projections.len());
            let have_order_by = query.order_by.is_some();

            // Combined height behaviour of all retained projections.
            let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;

            for p in projections.iter() {
                let name = p.to_field(schema.deref())?.name.to_string();
                // Drop outputs filtered out by ILIKE / EXCLUDE wildcard options.
                if select_modifiers.matches_ilike(&name)
                    && !select_modifiers.exclude.contains(&name)
                {
                    projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);

                    // With ORDER BY the projections are materialised first (below),
                    // so the final select only needs to refer to them by name.
                    retained_cols.push(if have_order_by {
                        col(name.as_str())
                    } else {
                        p.clone()
                    });
                    retained_names.push(col(name));
                }
            }

            if have_order_by {
                if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
                    || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
                {
                    // Add the projections alongside the existing columns so that
                    // ORDER BY can reference both.
                    lf = lf.with_columns(projections);
                } else {
                    // Otherwise select the projections separately and left-join the
                    // original frame back on a temporary row index.
                    const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
                    lf = lf
                        .clone()
                        .select(projections)
                        .with_row_index(NAME, None)
                        .join(
                            lf.with_row_index(NAME, None),
                            [col(NAME)],
                            [col(NAME)],
                            JoinArgs {
                                how: JoinType::Left,
                                validation: Default::default(),
                                suffix: None,
                                slice: None,
                                nulls_equal: false,
                                coalesce: Default::default(),
                                maintain_order: polars_ops::frame::MaintainOrderJoin::Left,
                                build_side: None,
                            },
                        );
                }
            }
            if !select_modifiers.replace.is_empty() {
                lf = lf.with_columns(&select_modifiers.replace);
            }
            if !select_modifiers.rename.is_empty() {
                lf = lf.with_columns(select_modifiers.renamed_cols());
            }
            lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;

            if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
                && !have_order_by
            {
                lf = lf.with_columns(retained_cols).select(retained_names);
            } else {
                lf = lf.select(retained_cols);
            }
            // Apply wildcard RENAME options to the selected output.
            if !select_modifiers.rename.is_empty() {
                lf = lf.rename(
                    select_modifiers.rename.keys(),
                    select_modifiers.rename.values(),
                    true,
                );
            };
            lf
        } else {
            // Grouped path: HAVING is parsed against the pre-aggregation schema.
            let having = select_stmt
                .having
                .as_ref()
                .map(|expr| parse_sql_expr(expr, self, Some(&schema)))
                .transpose()?;
            lf = self.process_group_by(lf, &group_by_keys, &projections, having)?;
            lf = self.process_order_by(lf, &query.order_by, None)?;

            // After ordering, keep only the columns named by the projections.
            let output_cols: Vec<_> = projections
                .iter()
                .map(|p| p.to_field(&schema))
                .collect::<PolarsResult<Vec<_>>>()?
                .into_iter()
                .map(|f| col(f.name))
                .collect();

            lf.select(&output_cols)
        };

        lf = self.process_qualify(lf, &select_stmt.qualify, &window_fn_columns)?;

        lf = match &select_stmt.distinct {
            Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
            Some(Distinct::On(exprs)) => {
                let schema = Some(self.get_frame_schema(&mut lf)?);
                // DISTINCT ON accepts plain column names only.
                let cols = exprs
                    .iter()
                    .map(|e| {
                        let expr = parse_sql_expr(e, self, schema.as_deref())?;
                        if let Expr::Column(name) = expr {
                            Ok(name)
                        } else {
                            Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
                        }
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                // Sort again before keeping the first row of each distinct key.
                lf = self.process_order_by(lf, &query.order_by, None)?;
                return Ok(lf.unique_stable(
                    Some(Selector::ByName {
                        names: cols.into(),
                        strict: true,
                    }),
                    UniqueKeepStrategy::First,
                ));
            },
            None => lf,
        };
        Ok(lf)
    }
1477
1478 fn column_projections(
1479 &mut self,
1480 select_stmt: &Select,
1481 schema: &SchemaRef,
1482 select_modifiers: &mut SelectModifiers,
1483 ) -> PolarsResult<Vec<Expr>> {
1484 let mut items: Vec<ProjectionItem> = Vec::with_capacity(select_stmt.projection.len());
1485 let mut has_qualified_wildcard = false;
1486
1487 for select_item in &select_stmt.projection {
1488 match select_item {
1489 SelectItem::UnnamedExpr(expr) => {
1490 items.push(ProjectionItem::Exprs(vec![parse_sql_expr(
1491 expr,
1492 self,
1493 Some(schema),
1494 )?]));
1495 },
1496 SelectItem::ExprWithAlias { expr, alias } => {
1497 let expr = parse_sql_expr(expr, self, Some(schema))?;
1498 items.push(ProjectionItem::Exprs(vec![
1499 expr.alias(PlSmallStr::from_str(alias.value.as_str())),
1500 ]));
1501 },
1502 SelectItem::QualifiedWildcard(kind, wildcard_options) => match kind {
1503 SelectItemQualifiedWildcardKind::ObjectName(obj_name) => {
1504 let tbl_name = obj_name
1505 .0
1506 .last()
1507 .and_then(|p| p.as_ident())
1508 .map(|i| PlSmallStr::from_str(&i.value))
1509 .unwrap_or_default();
1510 let exprs = self.process_qualified_wildcard(
1511 obj_name,
1512 wildcard_options,
1513 select_modifiers,
1514 Some(schema),
1515 )?;
1516 items.push(ProjectionItem::QualifiedExprs(tbl_name, exprs));
1517 has_qualified_wildcard = true;
1518 },
1519 SelectItemQualifiedWildcardKind::Expr(_) => {
1520 polars_bail!(SQLSyntax: "qualified wildcard on expressions not yet supported: {:?}", select_item)
1521 },
1522 },
1523 SelectItem::Wildcard(wildcard_options) => {
1524 let cols = schema
1525 .iter_names()
1526 .map(|name| col(name.clone()))
1527 .collect::<Vec<_>>();
1528
1529 items.push(ProjectionItem::Exprs(
1530 self.process_wildcard_additional_options(
1531 cols,
1532 wildcard_options,
1533 select_modifiers,
1534 Some(schema),
1535 )?,
1536 ));
1537 },
1538 }
1539 }
1540
1541 let exprs = if has_qualified_wildcard {
1543 disambiguate_projection_cols(items, schema)?
1544 } else {
1545 items
1546 .into_iter()
1547 .flat_map(|item| match item {
1548 ProjectionItem::Exprs(exprs) | ProjectionItem::QualifiedExprs(_, exprs) => {
1549 exprs
1550 },
1551 })
1552 .collect()
1553 };
1554 let flattened_exprs = exprs
1555 .into_iter()
1556 .flat_map(|expr| expand_exprs(expr, schema))
1557 .collect();
1558
1559 Ok(flattened_exprs)
1560 }
1561
1562 fn process_where(
1563 &mut self,
1564 mut lf: LazyFrame,
1565 expr: &Option<SQLExpr>,
1566 invert_filter: bool,
1567 schema: Option<SchemaRef>,
1568 ) -> PolarsResult<LazyFrame> {
1569 if let Some(expr) = expr {
1570 let schema = match schema {
1571 None => self.get_frame_schema(&mut lf)?,
1572 Some(s) => s,
1573 };
1574
1575 let (all_true, all_false) = match expr {
1577 SQLExpr::Value(ValueWithSpan {
1578 value: SQLValue::Boolean(b),
1579 ..
1580 }) => (*b, !*b),
1581 SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
1582 (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => {
1583 (a.value == b.value, a.value != b.value)
1584 },
1585 (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
1586 (a.value != b.value, a.value == b.value)
1587 },
1588 _ => (false, false),
1589 },
1590 _ => (false, false),
1591 };
1592 if (all_true && !invert_filter) || (all_false && invert_filter) {
1593 return Ok(lf);
1594 } else if (all_false && !invert_filter) || (all_true && invert_filter) {
1595 return Ok(clear_lf(lf));
1596 }
1597
1598 let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1600 if filter_expression.clone().meta().has_multiple_outputs() {
1601 filter_expression = all_horizontal([filter_expression])?;
1602 }
1603 lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1604 lf = if invert_filter {
1605 lf.remove(filter_expression)
1606 } else {
1607 lf.filter(filter_expression)
1608 };
1609 }
1610 Ok(lf)
1611 }
1612
1613 pub(super) fn process_join(
1614 &mut self,
1615 tbl_left: &TableInfo,
1616 tbl_right: &TableInfo,
1617 constraint: &JoinConstraint,
1618 join_type: JoinType,
1619 ) -> PolarsResult<LazyFrame> {
1620 let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right, self)?;
1621 let coalesce_type = match constraint {
1622 JoinConstraint::Natural => JoinCoalesce::CoalesceColumns,
1624 _ => JoinCoalesce::KeepColumns,
1625 };
1626 let joined = tbl_left
1627 .frame
1628 .clone()
1629 .join_builder()
1630 .with(tbl_right.frame.clone())
1631 .left_on(left_on)
1632 .right_on(right_on)
1633 .how(join_type)
1634 .suffix(format!(":{}", tbl_right.name))
1635 .coalesce(coalesce_type)
1636 .finish();
1637
1638 Ok(joined)
1639 }
1640
1641 fn process_qualify(
1642 &mut self,
1643 mut lf: LazyFrame,
1644 qualify_expr: &Option<SQLExpr>,
1645 window_fn_columns: &PlHashSet<String>,
1646 ) -> PolarsResult<LazyFrame> {
1647 if let Some(expr) = qualify_expr {
1648 let (has_window_fns, column_refs) = QualifyExpression::analyze(expr);
1651 let references_window_alias = column_refs.iter().any(|c| window_fn_columns.contains(c));
1652 if !has_window_fns && !references_window_alias {
1653 polars_bail!(
1654 SQLSyntax:
1655 "QUALIFY clause must reference window functions either explicitly or via SELECT aliases"
1656 );
1657 }
1658 let schema = self.get_frame_schema(&mut lf)?;
1659 let mut filter_expression = parse_sql_expr(expr, self, Some(&schema))?;
1660 if filter_expression.clone().meta().has_multiple_outputs() {
1661 filter_expression = all_horizontal([filter_expression])?;
1662 }
1663 lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1664 lf = lf.filter(filter_expression);
1665 }
1666 Ok(lf)
1667 }
1668
1669 fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
1670 let mut subplans = vec![];
1671
1672 for e in exprs {
1673 *e = e.clone().map_expr(|e| {
1674 if let Expr::SubPlan(lp, names) = e {
1675 assert_eq!(
1676 names.len(),
1677 1,
1678 "multiple columns in subqueries not yet supported"
1679 );
1680
1681 let select_expr = names[0].1.clone();
1682 let cb =
1683 PlanCallback::new(move |(plans, schemas): (Vec<DslPlan>, Vec<SchemaRef>)| {
1684 let schema = &schemas[0];
1685 polars_ensure!(schema.len() == 1, SQLSyntax: "SQL subquery returns more than one column");
1686 Ok(LazyFrame::from(plans.into_iter().next().unwrap()).select([select_expr.clone()]).logical_plan)
1687 });
1688 subplans.push(LazyFrame::from((**lp).clone()).pipe_with_schema(cb));
1689 Expr::Column(names[0].0.clone()).first()
1690 } else {
1691 e
1692 }
1693 });
1694 }
1695
1696 if subplans.is_empty() {
1697 lf
1698 } else {
1699 subplans.insert(0, lf);
1700 concat_lf_horizontal(
1701 subplans,
1702 HConcatOptions {
1703 broadcast_unit_length: true,
1704 ..Default::default()
1705 },
1706 )
1707 .unwrap()
1708 }
1709 }
1710
1711 fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1712 if let Statement::CreateTable(CreateTable {
1713 if_not_exists,
1714 name,
1715 query,
1716 columns,
1717 like,
1718 ..
1719 }) = stmt
1720 {
1721 let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1722 if *if_not_exists && self.table_map.read().unwrap().contains_key(tbl_name) {
1723 polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1724 }
1725 let lf = match (query, columns.is_empty(), like) {
1726 (Some(query), true, None) => {
1727 self.execute_query(query)?
1731 },
1732 (None, false, None) => {
1733 let mut schema = Schema::with_capacity(columns.len());
1737 for col in columns {
1738 let col_name = col.name.value.as_str();
1739 let dtype = map_sql_dtype_to_polars(&col.data_type)?;
1740 schema.insert_at_index(schema.len(), col_name.into(), dtype)?;
1741 }
1742 DataFrame::empty_with_schema(&schema).lazy()
1743 },
1744 (None, true, Some(like_kind)) => {
1745 let like_name = match like_kind {
1749 CreateTableLikeKind::Plain(like)
1750 | CreateTableLikeKind::Parenthesized(like) => &like.name,
1751 };
1752 let like_table = like_name
1753 .0
1754 .first()
1755 .unwrap()
1756 .as_ident()
1757 .unwrap()
1758 .value
1759 .as_str();
1760 if let Some(table) = self.table_map.read().unwrap().get(like_table).cloned() {
1761 clear_lf(table)
1762 } else {
1763 polars_bail!(SQLInterface: "table given in LIKE does not exist: {}", like_table)
1764 }
1765 },
1766 (None, true, None) => {
1768 polars_bail!(SQLInterface: "CREATE TABLE expected a query, column definitions, or LIKE clause")
1769 },
1770 _ => {
1772 polars_bail!(
1773 SQLInterface: "CREATE TABLE received mutually exclusive options:\nquery = {:?}\ncolumns = {:?}\nlike = {:?}",
1774 query,
1775 columns,
1776 like,
1777 )
1778 },
1779 };
1780 self.register(tbl_name, lf);
1781
1782 let df_created = df! { "Response" => [format!("CREATE TABLE {}", name.0.first().unwrap().as_ident().unwrap().value)] };
1783 Ok(df_created.unwrap().lazy())
1784 } else {
1785 unreachable!()
1786 }
1787 }
1788
1789 fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
1790 match relation {
1791 TableFactor::Table {
1792 name, alias, args, ..
1793 } => {
1794 if let Some(args) = args {
1795 return self.execute_table_function(name, alias, &args.args);
1796 }
1797 let tbl_name = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1798 if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
1799 match alias {
1800 Some(alias) => {
1801 self.table_aliases
1802 .insert(alias.name.value.clone(), tbl_name.to_string());
1803 Ok((alias.name.value.clone(), lf))
1804 },
1805 None => Ok((tbl_name.to_string(), lf)),
1806 }
1807 } else {
1808 polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1809 }
1810 },
1811 TableFactor::Derived {
1812 lateral,
1813 subquery,
1814 alias,
1815 } => {
1816 polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1817 if let Some(alias) = alias {
1818 let mut lf = self.execute_query_no_ctes(subquery)?;
1819 lf = self.rename_columns_from_table_alias(lf, alias)?;
1820 self.table_map
1821 .write()
1822 .unwrap()
1823 .insert(alias.name.value.clone(), lf.clone());
1824 Ok((alias.name.value.clone(), lf))
1825 } else {
1826 polars_bail!(SQLSyntax: "derived tables must have aliases");
1827 }
1828 },
1829 TableFactor::UNNEST {
1830 alias,
1831 array_exprs,
1832 with_offset,
1833 with_offset_alias: _,
1834 ..
1835 } => {
1836 if let Some(alias) = alias {
1837 let column_names: Vec<Option<PlSmallStr>> = alias
1838 .columns
1839 .iter()
1840 .map(|c| {
1841 if c.name.value.is_empty() {
1842 None
1843 } else {
1844 Some(PlSmallStr::from_str(c.name.value.as_str()))
1845 }
1846 })
1847 .collect();
1848
1849 let column_values: Vec<Series> = array_exprs
1850 .iter()
1851 .map(|arr| parse_sql_array(arr, self))
1852 .collect::<Result<_, _>>()?;
1853
1854 polars_ensure!(!column_names.is_empty(),
1855 SQLSyntax:
1856 "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1857 );
1858 if column_names.len() != column_values.len() {
1859 let plural = if column_values.len() > 1 { "s" } else { "" };
1860 polars_bail!(
1861 SQLSyntax:
1862 "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
1863 );
1864 }
1865 let column_series: Vec<Column> = column_values
1866 .into_iter()
1867 .zip(column_names)
1868 .map(|(s, name)| {
1869 if let Some(name) = name {
1870 s.with_name(name)
1871 } else {
1872 s
1873 }
1874 })
1875 .map(Column::from)
1876 .collect();
1877
1878 let lf = DataFrame::new_infer_height(column_series)?.lazy();
1879
1880 if *with_offset {
1881 polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
1883 }
1884 let table_name = alias.name.value.clone();
1885 self.table_map
1886 .write()
1887 .unwrap()
1888 .insert(table_name.clone(), lf.clone());
1889 Ok((table_name, lf))
1890 } else {
1891 polars_bail!(SQLSyntax: "UNNEST table must have an alias");
1892 }
1893 },
1894 TableFactor::NestedJoin {
1895 table_with_joins,
1896 alias,
1897 } => {
1898 let lf = self.execute_from_statement(table_with_joins)?;
1899 match alias {
1900 Some(a) => Ok((a.name.value.clone(), lf)),
1901 None => Ok(("".to_string(), lf)),
1902 }
1903 },
1904 _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
1906 }
1907 }
1908
1909 fn execute_table_function(
1910 &mut self,
1911 name: &ObjectName,
1912 alias: &Option<TableAlias>,
1913 args: &[FunctionArg],
1914 ) -> PolarsResult<(String, LazyFrame)> {
1915 let tbl_fn = name.0.first().unwrap().as_ident().unwrap().value.as_str();
1916 let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1917 let (tbl_name, lf) = read_fn.execute(args)?;
1918 #[allow(clippy::useless_asref)]
1919 let tbl_name = alias
1920 .as_ref()
1921 .map(|a| a.name.value.clone())
1922 .unwrap_or_else(|| tbl_name.to_string());
1923
1924 self.table_map
1925 .write()
1926 .unwrap()
1927 .insert(tbl_name.clone(), lf.clone());
1928 Ok((tbl_name, lf))
1929 }
1930
1931 fn process_order_by(
1932 &mut self,
1933 mut lf: LazyFrame,
1934 order_by: &Option<OrderBy>,
1935 selected: Option<&[Expr]>,
1936 ) -> PolarsResult<LazyFrame> {
1937 if order_by.as_ref().is_none_or(|ob| match &ob.kind {
1938 OrderByKind::Expressions(exprs) => exprs.is_empty(),
1939 OrderByKind::All(_) => false,
1940 }) {
1941 return Ok(lf);
1942 }
1943 let schema = self.get_frame_schema(&mut lf)?;
1944 let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1945 let (order_by, order_by_all, n_order_cols) = match &order_by.as_ref().unwrap().kind {
1946 OrderByKind::Expressions(exprs) => {
1947 if exprs.len() == 1
1950 && matches!(&exprs[0].expr, SQLExpr::Identifier(ident)
1951 if ident.value.to_uppercase() == "ALL"
1952 && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1953 {
1954 let n_cols = if let Some(selected) = selected {
1956 selected.len()
1957 } else {
1958 schema.len()
1959 };
1960 (vec![], Some(&exprs[0].options), n_cols)
1961 } else {
1962 (exprs.clone(), None, exprs.len())
1963 }
1964 },
1965 OrderByKind::All(opts) => {
1966 let n_cols = if let Some(selected) = selected {
1967 selected.len()
1968 } else {
1969 schema.len()
1970 };
1971 (vec![], Some(opts), n_cols)
1972 },
1973 };
1974 let mut descending = Vec::with_capacity(n_order_cols);
1975 let mut nulls_last = Vec::with_capacity(n_order_cols);
1976 let mut by: Vec<Expr> = Vec::with_capacity(n_order_cols);
1977
1978 if let Some(opts) = order_by_all {
1979 if let Some(selected) = selected {
1980 by.extend(selected.iter().cloned());
1981 } else {
1982 by.extend(columns_iter);
1983 };
1984 let desc_order = !opts.asc.unwrap_or(true);
1985 nulls_last.resize(by.len(), !opts.nulls_first.unwrap_or(desc_order));
1986 descending.resize(by.len(), desc_order);
1987 } else {
1988 let columns = &columns_iter.collect::<Vec<_>>();
1989 for ob in order_by {
1990 let desc_order = !ob.options.asc.unwrap_or(true);
1993 nulls_last.push(!ob.options.nulls_first.unwrap_or(desc_order));
1994 descending.push(desc_order);
1995
1996 by.push(self.expr_or_ordinal(
1998 &ob.expr,
1999 columns,
2000 selected,
2001 Some(&schema),
2002 "ORDER BY",
2003 )?)
2004 }
2005 }
2006 Ok(lf.sort_by_exprs(
2007 &by,
2008 SortMultipleOptions::default()
2009 .with_order_descending_multi(descending)
2010 .with_nulls_last_multi(nulls_last),
2011 ))
2012 }
2013
    /// Applies `GROUP BY` (and an optional `HAVING` filter) to `lf`.
    ///
    /// * `group_by_keys` - the resolved grouping key expressions.
    /// * `projections` - the parsed `SELECT` list; each entry is either an
    ///   aggregation (computed inside `agg`), a grouping key, or an expression
    ///   evaluated on the aggregated result.
    /// * `having` - the parsed `HAVING` predicate, if any.
    ///
    /// The returned frame contains the projected columns, followed by any grouping
    /// keys that are not projected, plus `__POLARS_ORIG_<key>` copies of keys whose
    /// name is reused by a different projection.
    ///
    /// # Errors
    ///
    /// Fails on duplicate output names among the keys or the projections, when a
    /// projected column is neither a grouping key nor aggregated, or when a field
    /// or schema cannot be resolved.
    fn process_group_by(
        &mut self,
        mut lf: LazyFrame,
        group_by_keys: &[Expr],
        projections: &[Expr],
        having: Option<Expr>,
    ) -> PolarsResult<LazyFrame> {
        let schema_before = self.get_frame_schema(&mut lf)?;
        let group_by_keys_schema =
            expressions_to_schema(group_by_keys, &schema_before, |duplicate_name: &str| {
                format!("group_by keys contained duplicate output name '{duplicate_name}'")
            })?;

        // Output name -> temporary name, for aggregations whose output name
        // collides with a grouping key.
        let mut aliased_aggregations: PlHashMap<PlSmallStr, PlSmallStr> = PlHashMap::new();
        let mut aggregation_projection = Vec::with_capacity(projections.len());
        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
        let mut projection_aliases = PlHashSet::new();
        let mut group_key_aliases = PlHashSet::new();

        // For every grouping key: the expression without its outer alias, and its
        // output name (when it can be resolved).
        let group_key_data: Vec<_> = group_by_keys
            .iter()
            .map(|gk| {
                (
                    strip_outer_alias(gk),
                    gk.to_field(&schema_before).ok().map(|f| f.name),
                )
            })
            .collect();

        // A projection "matches" a grouping key when both the un-aliased
        // expression and the output name are equal.
        let projection_matches_group_key: Vec<bool> = projections
            .iter()
            .map(|p| {
                let p_stripped = strip_outer_alias(p);
                let p_name = p.to_field(&schema_before).ok().map(|f| f.name);
                group_key_data
                    .iter()
                    .any(|(gk_stripped, gk_name)| *gk_stripped == p_stripped && *gk_name == p_name)
            })
            .collect();

        for (e, &matches_group_key) in projections.iter().zip(&projection_matches_group_key) {
            // True when the projection has to be computed inside the aggregation
            // context rather than taken from the grouping keys.
            let is_non_group_key_expr = !matches_group_key
                && has_expr(e, |e| {
                    match e {
                        Expr::Agg(_) | Expr::Len | Expr::Over { .. } => true,
                        #[cfg(feature = "dynamic_group_by")]
                        Expr::Rolling { .. } => true,
                        Expr::Function { function: func, .. }
                            if !matches!(func, FunctionExpr::StructExpr(_)) =>
                        {
                            // A (non-struct) function counts only if it reads a
                            // column that is not a grouping key.
                            has_expr(e, |e| match e {
                                Expr::Column(name) => !group_by_keys_schema.contains(name),
                                _ => false,
                            })
                        },
                        _ => false,
                    }
                });

            let mut e_inner = e;
            // Classify aliased projections so the final projection can be rebuilt.
            if let Expr::Alias(expr, alias) = e {
                if e.clone().meta().is_simple_projection(Some(&schema_before)) {
                    group_key_aliases.insert(alias.as_ref());
                    e_inner = expr
                } else if let Expr::Function {
                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
                    ..
                } = expr.deref()
                {
                    // An aliased struct-field access is replaced by a column of the
                    // field's name carrying the alias.
                    projection_overrides
                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
                } else if !is_non_group_key_expr && !group_by_keys_schema.contains(alias) {
                    projection_aliases.insert(alias.as_ref());
                }
            }
            let field = e_inner.to_field(&schema_before)?;
            if is_non_group_key_expr {
                let mut e = e.clone();
                // Unwrap an outer `Implode` aggregation (keeping any alias).
                if let Expr::Agg(AggExpr::Implode(expr)) = &e {
                    e = (**expr).clone();
                } else if let Expr::Alias(expr, name) = &e {
                    if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
                        e = (**expr).clone().alias(name.clone());
                    }
                }
                // An aggregation named like a grouping key is computed under a
                // temporary name and renamed back in the final projection.
                if group_by_keys_schema.get(&field.name).is_some() {
                    let alias_name = format!("__POLARS_AGG_{}", field.name);
                    e = e.alias(alias_name.as_str());
                    aliased_aggregations.insert(field.name.clone(), alias_name.as_str().into());
                }
                aggregation_projection.push(e);
            } else if !matches_group_key {
                // A bare column (or struct field) that is neither aggregated nor a
                // grouping key is an error.
                if let Expr::Column(_)
                | Expr::Function {
                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
                    ..
                } = e_inner
                {
                    if !group_by_keys_schema.contains(&field.name) {
                        polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
                    }
                }
            }
        }

        // Rewrite HAVING so that each aggregation it contains refers to a column
        // produced by `agg`.
        let having_filter = if let Some(having_expr) = having {
            // Aggregations already being computed, with their output names.
            let mut agg_to_name: Vec<(Expr, PlSmallStr)> = aggregation_projection
                .iter()
                .filter_map(|p| match p {
                    Expr::Alias(inner, name) if matches!(**inner, Expr::Agg(_) | Expr::Len) => {
                        Some((inner.as_ref().clone(), name.clone()))
                    },
                    e @ (Expr::Agg(_) | Expr::Len) => Some((
                        e.clone(),
                        e.to_field(&schema_before)
                            .map(|f| f.name)
                            .unwrap_or_default(),
                    )),
                    _ => None,
                })
                .collect();

            let mut n_having_aggs = 0;
            let updated_having = having_expr.map_expr(|e| {
                if !matches!(&e, Expr::Agg(_) | Expr::Len) {
                    return e;
                }
                // Reuse an existing aggregation if possible; otherwise compute a
                // new one under a `__POLARS_HAVING_<n>` name.
                let name = agg_to_name
                    .iter()
                    .find_map(|(expr, n)| (*expr == e).then(|| n.clone()))
                    .unwrap_or_else(|| {
                        let n = format_pl_smallstr!("__POLARS_HAVING_{n_having_aggs}");
                        aggregation_projection.push(e.clone().alias(n.clone()));
                        agg_to_name.push((e.clone(), n.clone()));
                        n_having_aggs += 1;
                        n
                    });
                col(name)
            });
            Some(updated_having)
        } else {
            None
        };

        let mut aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
        if let Some(filter_expr) = having_filter {
            aggregated = aggregated.filter(filter_expr);
        }

        let projection_schema =
            expressions_to_schema(projections, &schema_before, |duplicate_name: &str| {
                format!("group_by aggregations contained duplicate output name '{duplicate_name}'")
            })?;

        // Build the post-aggregation projection, one entry per output name.
        let final_projection = projection_schema
            .iter_names()
            .zip(projections.iter().zip(&projection_matches_group_key))
            .map(|(name, (projection_expr, &matches_group_key))| {
                if let Some(expr) = projection_overrides.get(name.as_str()) {
                    expr.clone()
                } else if let Some(aliased_name) = aliased_aggregations.get(name) {
                    // Rename the temporary aggregation column back.
                    col(aliased_name.clone()).alias(name.clone())
                } else if group_by_keys_schema.get(name).is_some() && matches_group_key {
                    col(name.clone())
                } else if group_by_keys_schema.get(name).is_some()
                    || projection_aliases.contains(name.as_str())
                    || group_key_aliases.contains(name.as_str())
                {
                    // Expressions containing aggregations or windows were already
                    // computed; others are evaluated on the aggregated frame.
                    if has_expr(projection_expr, |e| {
                        matches!(e, Expr::Agg(_) | Expr::Len | Expr::Over { .. })
                    }) {
                        col(name.clone())
                    } else {
                        projection_expr.clone()
                    }
                } else {
                    col(name.clone())
                }
            })
            .collect::<Vec<_>>();

        let mut output_projection = final_projection;
        for key_name in group_by_keys_schema.iter_names() {
            if !projection_schema.contains(key_name) {
                // Keep grouping keys that are not part of the projection.
                output_projection.push(col(key_name.clone()));
            } else if group_by_keys.iter().any(|k| is_simple_col_ref(k, key_name)) {
                // The key's name is also produced by a projection that is not a
                // plain reference to it: keep the original key under an internal name.
                let is_cross_aliased = projections.iter().any(|p| {
                    p.to_field(&schema_before).is_ok_and(|f| f.name == key_name)
                        && !is_simple_col_ref(p, key_name)
                });
                if is_cross_aliased {
                    let internal_name = format_pl_smallstr!("__POLARS_ORIG_{}", key_name);
                    output_projection.push(col(key_name.clone()).alias(internal_name));
                }
            }
        }
        Ok(aggregated.select(&output_projection))
    }
2232
2233 fn process_limit_offset(
2234 &self,
2235 lf: LazyFrame,
2236 limit_clause: &Option<LimitClause>,
2237 fetch: &Option<Fetch>,
2238 ) -> PolarsResult<LazyFrame> {
2239 let (limit, offset) = match limit_clause {
2241 Some(LimitClause::LimitOffset {
2242 limit,
2243 offset,
2244 limit_by,
2245 }) => {
2246 if !limit_by.is_empty() {
2247 polars_bail!(SQLSyntax: "`LIMIT <n> BY <exprs>` clause is not supported");
2250 }
2251 (limit.as_ref(), offset.as_ref().map(|o| &o.value))
2252 },
2253 Some(LimitClause::OffsetCommaLimit { offset, limit }) => (Some(limit), Some(offset)),
2254 None => (None, None),
2255 };
2256
2257 let limit = match (fetch, limit) {
2259 (Some(fetch), None) => fetch.quantity.as_ref(),
2260 (Some(_), Some(_)) => {
2261 polars_bail!(SQLSyntax: "cannot use both `LIMIT` and `FETCH` in the same query")
2262 },
2263 (None, limit) => limit,
2264 };
2265
2266 match (offset, limit) {
2268 (
2269 Some(SQLExpr::Value(ValueWithSpan {
2270 value: SQLValue::Number(offset, _),
2271 ..
2272 })),
2273 Some(SQLExpr::Value(ValueWithSpan {
2274 value: SQLValue::Number(limit, _),
2275 ..
2276 })),
2277 ) => Ok(lf.slice(
2278 offset
2279 .parse()
2280 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2281 limit.parse().map_err(
2282 |e| polars_err!(SQLInterface: "LIMIT/FETCH conversion error: {}", e),
2283 )?,
2284 )),
2285 (
2286 Some(SQLExpr::Value(ValueWithSpan {
2287 value: SQLValue::Number(offset, _),
2288 ..
2289 })),
2290 None,
2291 ) => Ok(lf.slice(
2292 offset
2293 .parse()
2294 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
2295 IdxSize::MAX,
2296 )),
2297 (
2298 None,
2299 Some(SQLExpr::Value(ValueWithSpan {
2300 value: SQLValue::Number(limit, _),
2301 ..
2302 })),
2303 ) => {
2304 Ok(lf.limit(limit.parse().map_err(
2305 |e| polars_err!(SQLInterface: "LIMIT/FETCH conversion error: {}", e),
2306 )?))
2307 },
2308 (None, None) => Ok(lf),
2309 _ => polars_bail!(
2310 SQLSyntax: "non-numeric arguments for LIMIT/OFFSET/FETCH are not supported",
2311 ),
2312 }
2313 }
2314
2315 fn process_qualified_wildcard(
2316 &mut self,
2317 ObjectName(idents): &ObjectName,
2318 options: &WildcardAdditionalOptions,
2319 modifiers: &mut SelectModifiers,
2320 schema: Option<&Schema>,
2321 ) -> PolarsResult<Vec<Expr>> {
2322 let mut idents_with_wildcard: Vec<Ident> = idents
2323 .iter()
2324 .filter_map(|p| p.as_ident().cloned())
2325 .collect();
2326 idents_with_wildcard.push(Ident::new("*"));
2327
2328 let exprs = resolve_compound_identifier(self, &idents_with_wildcard, schema)?;
2329 self.process_wildcard_additional_options(exprs, options, modifiers, schema)
2330 }
2331
2332 fn process_wildcard_additional_options(
2333 &mut self,
2334 exprs: Vec<Expr>,
2335 options: &WildcardAdditionalOptions,
2336 modifiers: &mut SelectModifiers,
2337 schema: Option<&Schema>,
2338 ) -> PolarsResult<Vec<Expr>> {
2339 if options.opt_except.is_some() && options.opt_exclude.is_some() {
2340 polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
2341 } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
2342 polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
2343 }
2344
2345 if let Some(items) = &options.opt_exclude {
2347 match items {
2348 ExcludeSelectItem::Single(ident) => {
2349 modifiers.exclude.insert(ident.value.clone());
2350 },
2351 ExcludeSelectItem::Multiple(idents) => {
2352 modifiers
2353 .exclude
2354 .extend(idents.iter().map(|i| i.value.clone()));
2355 },
2356 };
2357 }
2358
2359 if let Some(items) = &options.opt_except {
2361 modifiers.exclude.insert(items.first_element.value.clone());
2362 modifiers
2363 .exclude
2364 .extend(items.additional_elements.iter().map(|i| i.value.clone()));
2365 }
2366
2367 if let Some(item) = &options.opt_ilike {
2369 let rx = regex::escape(item.pattern.as_str())
2370 .replace('%', ".*")
2371 .replace('_', ".");
2372
2373 modifiers.ilike = Some(
2374 polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
2375 );
2376 }
2377
2378 if let Some(items) = &options.opt_rename {
2380 let renames = match items {
2381 RenameSelectItem::Single(rename) => std::slice::from_ref(rename),
2382 RenameSelectItem::Multiple(renames) => renames.as_slice(),
2383 };
2384 for rn in renames {
2385 let before = PlSmallStr::from_str(rn.ident.value.as_str());
2386 let after = PlSmallStr::from_str(rn.alias.value.as_str());
2387 if before != after {
2388 modifiers.rename.insert(before, after);
2389 }
2390 }
2391 }
2392
2393 if let Some(replacements) = &options.opt_replace {
2395 for rp in &replacements.items {
2396 let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
2397 modifiers
2398 .replace
2399 .push(replacement_expr?.alias(rp.column_name.value.as_str()));
2400 }
2401 }
2402 Ok(exprs)
2403 }
2404
2405 fn rename_columns_from_table_alias(
2406 &mut self,
2407 mut lf: LazyFrame,
2408 alias: &TableAlias,
2409 ) -> PolarsResult<LazyFrame> {
2410 if alias.columns.is_empty() {
2411 Ok(lf)
2412 } else {
2413 let schema = self.get_frame_schema(&mut lf)?;
2414 if alias.columns.len() != schema.len() {
2415 polars_bail!(
2416 SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
2417 alias.columns.len(), alias.name.value, schema.len()
2418 )
2419 } else {
2420 let existing_columns: Vec<_> = schema.iter_names().collect();
2421 let new_columns: Vec<_> =
2422 alias.columns.iter().map(|c| c.name.value.clone()).collect();
2423 Ok(lf.rename(existing_columns, new_columns, true))
2424 }
2425 }
2426 }
2427}
2428
2429impl SQLContext {
2430 pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
2432 Self {
2433 table_map: Arc::new(RwLock::new(table_map)),
2434 ..Default::default()
2435 }
2436 }
2437}
2438
2439fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
2440 match expr {
2441 Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
2442 let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
2443 schema
2444 .iter_names()
2445 .filter(|name| re.is_match(name))
2446 .map(|name| col(name.clone()))
2447 .collect::<Vec<_>>()
2448 },
2449 Expr::Selector(s) => s
2450 .into_columns(schema, &Default::default())
2451 .unwrap()
2452 .into_iter()
2453 .map(col)
2454 .collect::<Vec<_>>(),
2455 _ => vec![expr],
2456 }
2457}
2458
/// Returns `true` when `nm` is anchored like a regex, i.e. it begins with `^`
/// and ends with `$`.
fn is_regex_colname(nm: &str) -> bool {
    // Both anchors are ASCII, so inspecting the first/last byte is equivalent
    // to inspecting the first/last character.
    let bytes = nm.as_bytes();
    matches!((bytes.first(), bytes.last()), (Some(b'^'), Some(b'$')))
}
2462
2463fn get_using_cols(op: &JoinOperator) -> Option<impl Iterator<Item = String> + '_> {
2465 use JoinOperator::*;
2466 match op {
2467 Join(JoinConstraint::Using(cols))
2468 | Inner(JoinConstraint::Using(cols))
2469 | Left(JoinConstraint::Using(cols))
2470 | LeftOuter(JoinConstraint::Using(cols))
2471 | Right(JoinConstraint::Using(cols))
2472 | RightOuter(JoinConstraint::Using(cols))
2473 | FullOuter(JoinConstraint::Using(cols))
2474 | Semi(JoinConstraint::Using(cols))
2475 | Anti(JoinConstraint::Using(cols))
2476 | LeftSemi(JoinConstraint::Using(cols))
2477 | LeftAnti(JoinConstraint::Using(cols))
2478 | RightSemi(JoinConstraint::Using(cols))
2479 | RightAnti(JoinConstraint::Using(cols)) => Some(cols.iter().filter_map(|c| {
2480 c.0.first()
2481 .and_then(|p| p.as_ident())
2482 .map(|i| i.value.clone())
2483 })),
2484 _ => None,
2485 }
2486}
2487
2488fn get_table_name(factor: &TableFactor) -> Option<String> {
2490 match factor {
2491 TableFactor::Table { name, alias, .. } => {
2492 alias.as_ref().map(|a| a.name.value.clone()).or_else(|| {
2493 name.0
2494 .last()
2495 .and_then(|p| p.as_ident())
2496 .map(|i| i.value.clone())
2497 })
2498 },
2499 TableFactor::Derived { alias, .. }
2500 | TableFactor::NestedJoin { alias, .. }
2501 | TableFactor::TableFunction { alias, .. } => alias.as_ref().map(|a| a.name.value.clone()),
2502 _ => None,
2503 }
2504}
2505
2506fn is_simple_col_ref(expr: &Expr, col_name: &PlSmallStr) -> bool {
2508 match expr {
2509 Expr::Column(n) => n == col_name,
2510 Expr::Alias(inner, _) => matches!(inner.as_ref(), Expr::Column(n) if n == col_name),
2511 _ => false,
2512 }
2513}
2514
2515fn strip_outer_alias(expr: &Expr) -> Expr {
2517 if let Expr::Alias(inner, _) = expr {
2518 inner.as_ref().clone()
2519 } else {
2520 expr.clone()
2521 }
2522}
2523
2524fn resolve_select_alias(name: &str, projections: &[Expr], schema: &Schema) -> Option<Expr> {
2529 if schema.contains(name) {
2531 return None;
2532 }
2533 projections.iter().find_map(|p| match p {
2535 Expr::Alias(inner, alias) if alias.as_str() == name => {
2536 Some(inner.as_ref().clone().alias(alias.clone()))
2537 },
2538 _ => None,
2539 })
2540}
2541
2542fn expr_cols_all_in_schema(expr: &Expr, schema: &Schema) -> bool {
2544 let mut found_cols = false;
2545 let mut all_in_schema = true;
2546 for e in expr.into_iter() {
2547 if let Expr::Column(name) = e {
2548 found_cols = true;
2549 if !schema.contains(name.as_str()) {
2550 all_in_schema = false;
2551 break;
2552 }
2553 }
2554 }
2555 found_cols && all_in_schema
2556}
2557
2558fn determine_left_right_join_on(
2566 ctx: &mut SQLContext,
2567 expr_left: &SQLExpr,
2568 expr_right: &SQLExpr,
2569 tbl_left: &TableInfo,
2570 tbl_right: &TableInfo,
2571 join_schema: &Schema,
2572) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2573 let left_on = match parse_sql_expr(expr_left, ctx, Some(join_schema))? {
2576 Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2577 e => e,
2578 };
2579 let right_on = match parse_sql_expr(expr_right, ctx, Some(join_schema))? {
2580 Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
2581 e => e,
2582 };
2583
2584 let left_refs = (
2588 expr_refers_to_table(expr_left, &tbl_left.name),
2589 expr_refers_to_table(expr_left, &tbl_right.name),
2590 );
2591 let right_refs = (
2592 expr_refers_to_table(expr_right, &tbl_left.name),
2593 expr_refers_to_table(expr_right, &tbl_right.name),
2594 );
2595 match (left_refs, right_refs) {
2597 ((true, false), (false, true)) => return Ok((vec![left_on], vec![right_on])),
2599 ((false, true), (true, false)) => return Ok((vec![right_on], vec![left_on])),
2601 ((true, true), _) | (_, (true, true)) if tbl_left.name != tbl_right.name => {
2603 polars_bail!(
2604 SQLInterface: "unsupported join condition: {} side references both '{}' and '{}'",
2605 if left_refs.0 && left_refs.1 {
2606 "left"
2607 } else {
2608 "right"
2609 }, tbl_left.name, tbl_right.name
2610 )
2611 },
2612 _ => {},
2614 }
2615
2616 let left_on_cols_in = (
2621 expr_cols_all_in_schema(&left_on, &tbl_left.schema),
2622 expr_cols_all_in_schema(&left_on, &tbl_right.schema),
2623 );
2624 let right_on_cols_in = (
2625 expr_cols_all_in_schema(&right_on, &tbl_left.schema),
2626 expr_cols_all_in_schema(&right_on, &tbl_right.schema),
2627 );
2628 match (left_on_cols_in, right_on_cols_in) {
2629 ((true, false), (false, true)) => Ok((vec![left_on], vec![right_on])),
2631 ((false, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
2632 ((true, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
2634 ((true, true), (false, true)) => Ok((vec![left_on], vec![right_on])),
2635 ((true, false), (true, true)) => Ok((vec![left_on], vec![right_on])),
2636 ((false, true), (true, true)) => Ok((vec![right_on], vec![left_on])),
2637 _ => Ok((vec![left_on], vec![right_on])),
2639 }
2640}
2641
2642fn process_join_on(
2643 ctx: &mut SQLContext,
2644 sql_expr: &SQLExpr,
2645 tbl_left: &TableInfo,
2646 tbl_right: &TableInfo,
2647) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2648 match sql_expr {
2649 SQLExpr::BinaryOp { left, op, right } => match op {
2650 BinaryOperator::And => {
2651 let (mut left_i, mut right_i) = process_join_on(ctx, left, tbl_left, tbl_right)?;
2652 let (mut left_j, mut right_j) = process_join_on(ctx, right, tbl_left, tbl_right)?;
2653 left_i.append(&mut left_j);
2654 right_i.append(&mut right_j);
2655 Ok((left_i, right_i))
2656 },
2657 BinaryOperator::Eq => {
2658 let mut join_schema =
2661 Schema::with_capacity(tbl_left.schema.len() + tbl_right.schema.len());
2662 for (name, dtype) in tbl_left.schema.iter() {
2663 join_schema.insert_at_index(join_schema.len(), name.clone(), dtype.clone())?;
2664 }
2665 for (name, dtype) in tbl_right.schema.iter() {
2666 if !join_schema.contains(name) {
2667 join_schema.insert_at_index(
2668 join_schema.len(),
2669 name.clone(),
2670 dtype.clone(),
2671 )?;
2672 }
2673 }
2674 determine_left_right_join_on(ctx, left, right, tbl_left, tbl_right, &join_schema)
2675 },
2676 _ => polars_bail!(
2677 SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op
2679 ),
2680 },
2681 SQLExpr::Nested(expr) => process_join_on(ctx, expr, tbl_left, tbl_right),
2682 _ => polars_bail!(
2683 SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", sql_expr
2684 ),
2685 }
2686}
2687
2688fn process_join_constraint(
2689 constraint: &JoinConstraint,
2690 tbl_left: &TableInfo,
2691 tbl_right: &TableInfo,
2692 ctx: &mut SQLContext,
2693) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
2694 match constraint {
2695 JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
2696 process_join_on(ctx, expr, tbl_left, tbl_right)
2697 },
2698 JoinConstraint::Using(idents) if !idents.is_empty() => {
2699 let using: Vec<Expr> = idents
2700 .iter()
2701 .map(|ObjectName(parts)| {
2702 if parts.len() != 1 {
2703 polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects simple column names, not qualified names");
2704 }
2705 match parts[0].as_ident() {
2706 Some(ident) => Ok(col(ident.value.as_str())),
2707 None => polars_bail!(SQLSyntax: "JOIN \"USING\" clause expects identifiers, not functions"),
2708 }
2709 })
2710 .collect::<PolarsResult<Vec<_>>>()?;
2711 Ok((using.clone(), using))
2712 },
2713 JoinConstraint::Natural => {
2714 let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
2715 let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
2716 let on: Vec<Expr> = left_names
2717 .intersection(&right_names)
2718 .map(|&name| col(name.clone()))
2719 .collect();
2720 if on.is_empty() {
2721 polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
2722 }
2723 Ok((on.clone(), on))
2724 },
2725 _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
2726 }
2727}
2728
2729pub fn extract_table_identifiers(
2733 query: &str,
2734 include_schema: bool,
2735 unique: bool,
2736) -> PolarsResult<Vec<String>> {
2737 let mut parser = Parser::new(&GenericDialect);
2738 parser = parser.with_options(ParserOptions {
2739 trailing_commas: true,
2740 ..Default::default()
2741 });
2742 let ast = parser
2743 .try_with_sql(query)
2744 .map_err(to_sql_interface_err)?
2745 .parse_statements()
2746 .map_err(to_sql_interface_err)?;
2747
2748 let mut collector = TableIdentifierCollector {
2749 include_schema,
2750 ..Default::default()
2751 };
2752 for stmt in &ast {
2753 let _ = stmt.visit(&mut collector);
2754 }
2755 Ok(if unique {
2756 collector
2757 .tables
2758 .into_iter()
2759 .collect::<PlIndexSet<_>>()
2760 .into_iter()
2761 .collect()
2762 } else {
2763 collector.tables
2764 })
2765}
2766
bitflags::bitflags! {
    /// Classification of an expression produced by
    /// `ExprSqlProjectionHeightBehavior::identify_from_expr`.
    ///
    /// NOTE(review): judging by the type name, this describes how the output
    /// height of a projected expression relates to its input — confirm against
    /// the call sites.
    #[derive(PartialEq)]
    struct ExprSqlProjectionHeightBehavior: u8 {
        /// The expression references at least one column or selector and
        /// contains none of the nodes classified as `Independent`.
        const MaintainsColumn = 1 << 0;
        /// The expression contains an aggregation, `Len`, an
        /// explode/filter/gather/slice, a non-scalar literal, or an anonymous
        /// function that returns a scalar or is not length-preserving.
        const Independent = 1 << 1;
        /// The expression has neither column/selector references nor any
        /// `Independent` node (for example, a plain scalar literal).
        const InheritsContext = 1 << 2;
    }
}
2785
2786impl ExprSqlProjectionHeightBehavior {
2787 fn identify_from_expr(expr: &Expr) -> Self {
2788 let mut has_column = false;
2789 let mut has_independent = false;
2790
2791 for e in expr.into_iter() {
2792 use Expr::*;
2793 has_column |= matches!(e, Column(_) | Selector(_));
2794 has_independent |= match e {
2795 AnonymousFunction { options, .. } => {
2797 options.returns_scalar() || !options.is_length_preserving()
2798 },
2799 Literal(v) => !v.is_scalar(),
2800 Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
2801 Agg { .. } | Len => true,
2802 _ => false,
2803 }
2804 }
2805 if has_independent {
2806 Self::Independent
2807 } else if has_column {
2808 Self::MaintainsColumn
2809 } else {
2810 Self::InheritsContext
2811 }
2812 }
2813}