1use std::cell::RefCell;
2use std::ops::Deref;
3
4use polars_core::frame::row::Row;
5use polars_core::prelude::*;
6use polars_lazy::prelude::*;
7use polars_ops::frame::JoinCoalesce;
8use polars_plan::dsl::function_expr::StructFunction;
9use polars_plan::prelude::*;
10use polars_utils::format_pl_smallstr;
11use sqlparser::ast::{
12 BinaryOperator, CreateTable, Delete, Distinct, ExcludeSelectItem, Expr as SQLExpr, FromTable,
13 FunctionArg, GroupByExpr, Ident, JoinConstraint, JoinOperator, ObjectName, ObjectType, Offset,
14 OrderBy, Query, RenameSelectItem, Select, SelectItem, SetExpr, SetOperator, SetQuantifier,
15 Statement, TableAlias, TableFactor, TableWithJoins, UnaryOperator, Value as SQLValue, Values,
16 WildcardAdditionalOptions,
17};
18use sqlparser::dialect::GenericDialect;
19use sqlparser::parser::{Parser, ParserOptions};
20
21use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
22use crate::sql_expr::{
23 parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
24};
25use crate::table_functions::PolarsTableFunctions;
26
27#[derive(Clone)]
28pub struct TableInfo {
29 pub(crate) frame: LazyFrame,
30 pub(crate) name: PlSmallStr,
31 pub(crate) schema: Arc<Schema>,
32}
33
34struct SelectModifiers {
35 exclude: PlHashSet<String>, ilike: Option<regex::Regex>, rename: PlHashMap<PlSmallStr, PlSmallStr>, replace: Vec<Expr>, }
40impl SelectModifiers {
41 fn matches_ilike(&self, s: &str) -> bool {
42 match &self.ilike {
43 Some(rx) => rx.is_match(s),
44 None => true,
45 }
46 }
47 fn renamed_cols(&self) -> Vec<Expr> {
48 self.rename
49 .iter()
50 .map(|(before, after)| col(before.clone()).alias(after.clone()))
51 .collect()
52 }
53}
54
/// Holds the registered tables and per-statement state used to translate SQL
/// statements into `LazyFrame` plans.
///
/// Cloning the context clones the table map, the arenas and the per-statement
/// maps; the function registry `Arc` is shared between clones.
#[derive(Clone)]
pub struct SQLContext {
    /// Tables registered by name (via `register`, `CREATE TABLE`, derived
    /// tables, UNNEST and table functions).
    pub(crate) table_map: PlHashMap<String, LazyFrame>,
    /// Registry used to resolve user-defined functions.
    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
    /// Logical-plan arena used for schema resolution; handed to the result
    /// frame at the end of `execute`.
    pub(crate) lp_arena: Arena<IR>,
    /// Expression arena paired with `lp_arena`.
    pub(crate) expr_arena: Arena<AExpr>,

    /// CTEs registered by the current statement (cleared by `execute`).
    cte_map: RefCell<PlHashMap<String, LazyFrame>>,
    /// Table alias -> underlying registered table name, for the current statement.
    table_aliases: RefCell<PlHashMap<String, String>>,
    /// Right-hand join relation name -> (original column name -> suffixed
    /// "column:relation" name) for columns that collided during a join.
    joined_aliases: RefCell<PlHashMap<String, PlHashMap<String, String>>>,
}
67
68impl Default for SQLContext {
69 fn default() -> Self {
70 Self {
71 function_registry: Arc::new(DefaultFunctionRegistry {}),
72 table_map: Default::default(),
73 cte_map: Default::default(),
74 table_aliases: Default::default(),
75 joined_aliases: Default::default(),
76 lp_arena: Default::default(),
77 expr_arena: Default::default(),
78 }
79 }
80}
81
82impl SQLContext {
83 pub fn new() -> Self {
91 Self::default()
92 }
93
94 pub fn get_tables(&self) -> Vec<String> {
96 let mut tables = Vec::from_iter(self.table_map.keys().cloned());
97 tables.sort_unstable();
98 tables
99 }
100
101 pub fn register(&mut self, name: &str, lf: LazyFrame) {
117 self.table_map.insert(name.to_owned(), lf);
118 }
119
120 pub fn unregister(&mut self, name: &str) {
122 self.table_map.remove(&name.to_owned());
123 }
124
125 pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
144 let mut parser = Parser::new(&GenericDialect);
145 parser = parser.with_options(ParserOptions {
146 trailing_commas: true,
147 ..Default::default()
148 });
149
150 let ast = parser
151 .try_with_sql(query)
152 .map_err(to_sql_interface_err)?
153 .parse_statements()
154 .map_err(to_sql_interface_err)?;
155
156 polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
157 let res = self.execute_statement(ast.first().unwrap())?;
158
159 let lp_arena = std::mem::take(&mut self.lp_arena);
162 let expr_arena = std::mem::take(&mut self.expr_arena);
163 res.set_cached_arena(lp_arena, expr_arena);
164
165 self.cte_map.borrow_mut().clear();
167 self.table_aliases.borrow_mut().clear();
168 self.joined_aliases.borrow_mut().clear();
169
170 Ok(res)
171 }
172
173 pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
176 self.function_registry = function_registry;
177 self
178 }
179
180 pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
182 &self.function_registry
183 }
184
185 pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
187 Arc::get_mut(&mut self.function_registry).unwrap()
188 }
189}
190
191impl SQLContext {
192 pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
193 let ast = stmt;
194 Ok(match ast {
195 Statement::Query(query) => self.execute_query(query)?,
196 stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
197 stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
198 stmt @ Statement::Drop {
199 object_type: ObjectType::Table,
200 ..
201 } => self.execute_drop_table(stmt)?,
202 stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
203 stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
204 stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
205 _ => polars_bail!(
206 SQLInterface: "statement type is not supported:\n{:?}", ast,
207 ),
208 })
209 }
210
211 pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
212 self.register_ctes(query)?;
213 self.execute_query_no_ctes(query)
214 }
215
216 pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
217 let lf = self.process_query(&query.body, query)?;
218 self.process_limit_offset(lf, &query.limit, &query.offset)
219 }
220
221 pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
222 frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
223 }
224
225 pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
226 let table = self.table_map.get(name).cloned();
227 table
228 .or_else(|| self.cte_map.borrow().get(name).cloned())
229 .or_else(|| {
230 self.table_aliases
231 .borrow()
232 .get(name)
233 .and_then(|alias| self.table_map.get(alias).cloned())
234 })
235 }
236
237 fn expr_or_ordinal(
238 &mut self,
239 e: &SQLExpr,
240 exprs: &[Expr],
241 selected: Option<&[Expr]>,
242 schema: Option<&Schema>,
243 clause: &str,
244 ) -> PolarsResult<Expr> {
245 match e {
246 SQLExpr::UnaryOp {
247 op: UnaryOperator::Minus,
248 expr,
249 } if matches!(**expr, SQLExpr::Value(SQLValue::Number(_, _))) => {
250 if let SQLExpr::Value(SQLValue::Number(ref idx, _)) = **expr {
251 Err(polars_err!(
252 SQLSyntax:
253 "negative ordinal values are invalid for {}; found -{}",
254 clause,
255 idx
256 ))
257 } else {
258 unreachable!()
259 }
260 },
261 SQLExpr::Value(SQLValue::Number(idx, _)) => {
262 let idx = idx.parse::<usize>().map_err(|_| {
264 polars_err!(
265 SQLSyntax:
266 "negative ordinal values are invalid for {}; found {}",
267 clause,
268 idx
269 )
270 })?;
271 let cols = if let Some(cols) = selected {
274 cols
275 } else {
276 exprs
277 };
278 Ok(cols
279 .get(idx - 1)
280 .ok_or_else(|| {
281 polars_err!(
282 SQLInterface:
283 "{} ordinal value must refer to a valid column; found {}",
284 clause,
285 idx
286 )
287 })?
288 .clone())
289 },
290 SQLExpr::Value(v) => Err(polars_err!(
291 SQLSyntax:
292 "{} requires a valid expression or positive ordinal; found {}", clause, v,
293 )),
294 _ => parse_sql_expr(e, self, schema),
295 }
296 }
297
298 pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
299 if self.joined_aliases.borrow().contains_key(tbl_name) {
300 self.joined_aliases
301 .borrow()
302 .get(tbl_name)
303 .and_then(|aliases| aliases.get(column_name))
304 .cloned()
305 .unwrap_or_else(|| column_name.to_string())
306 } else {
307 column_name.to_string()
308 }
309 }
310
311 fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
312 match expr {
313 SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
314 SetExpr::Query(query) => self.execute_query_no_ctes(query),
315 SetExpr::SetOperation {
316 op: SetOperator::Union,
317 set_quantifier,
318 left,
319 right,
320 } => self.process_union(left, right, set_quantifier, query),
321
322 #[cfg(feature = "semi_anti_join")]
323 SetExpr::SetOperation {
324 op: SetOperator::Intersect | SetOperator::Except,
325 set_quantifier,
326 left,
327 right,
328 } => self.process_except_intersect(left, right, set_quantifier, query),
329
330 SetExpr::Values(Values {
331 explicit_row: _,
332 rows,
333 }) => self.process_values(rows),
334
335 SetExpr::Table(tbl) => {
336 if tbl.table_name.is_some() {
337 let table_name = tbl.table_name.as_ref().unwrap();
338 self.get_table_from_current_scope(table_name)
339 .ok_or_else(|| {
340 polars_err!(
341 SQLInterface: "no table or alias named '{}' found",
342 tbl
343 )
344 })
345 } else {
346 polars_bail!(SQLInterface: "'TABLE' requires valid table name")
347 }
348 },
349 op => {
350 polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
351 },
352 }
353 }
354
355 #[cfg(feature = "semi_anti_join")]
356 fn process_except_intersect(
357 &mut self,
358 left: &SetExpr,
359 right: &SetExpr,
360 quantifier: &SetQuantifier,
361 query: &Query,
362 ) -> PolarsResult<LazyFrame> {
363 let (join_type, op_name) = match *query.body {
364 SetExpr::SetOperation {
365 op: SetOperator::Except,
366 ..
367 } => (JoinType::Anti, "EXCEPT"),
368 _ => (JoinType::Semi, "INTERSECT"),
369 };
370 let mut lf = self.process_query(left, query)?;
371 let mut rf = self.process_query(right, query)?;
372 let join = lf
373 .clone()
374 .join_builder()
375 .with(rf.clone())
376 .how(join_type)
377 .join_nulls(true);
378
379 let lf_schema = self.get_frame_schema(&mut lf)?;
380 let lf_cols: Vec<_> = lf_schema.iter_names().map(|nm| col(nm.clone())).collect();
381 let joined_tbl = match quantifier {
382 SetQuantifier::ByName => join.on(lf_cols).finish(),
383 SetQuantifier::Distinct | SetQuantifier::None => {
384 let rf_schema = self.get_frame_schema(&mut rf)?;
385 let rf_cols: Vec<_> = rf_schema.iter_names().map(|nm| col(nm.clone())).collect();
386 if lf_cols.len() != rf_cols.len() {
387 polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
388 }
389 join.left_on(lf_cols).right_on(rf_cols).finish()
390 },
391 _ => {
392 polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
393 },
394 };
395 Ok(joined_tbl.unique(None, UniqueKeepStrategy::Any))
396 }
397
398 fn process_union(
399 &mut self,
400 left: &SetExpr,
401 right: &SetExpr,
402 quantifier: &SetQuantifier,
403 query: &Query,
404 ) -> PolarsResult<LazyFrame> {
405 let mut lf = self.process_query(left, query)?;
406 let mut rf = self.process_query(right, query)?;
407 let opts = UnionArgs {
408 parallel: true,
409 to_supertypes: true,
410 ..Default::default()
411 };
412 match quantifier {
413 SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
415 let lf_schema = self.get_frame_schema(&mut lf)?;
416 let rf_schema = self.get_frame_schema(&mut rf)?;
417 if lf_schema.len() != rf_schema.len() {
418 polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
419 }
420 let concatenated = polars_lazy::dsl::concat(vec![lf, rf], opts);
421 match quantifier {
422 SetQuantifier::Distinct | SetQuantifier::None => {
423 concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
424 },
425 _ => concatenated,
426 }
427 },
428 #[cfg(feature = "diagonal_concat")]
430 SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
431 #[cfg(feature = "diagonal_concat")]
433 SetQuantifier::ByName | SetQuantifier::DistinctByName => {
434 let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
435 concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
436 },
437 #[allow(unreachable_patterns)]
438 _ => polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier),
439 }
440 }
441
442 fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
443 let frame_rows: Vec<Row> = values.iter().map(|row| {
444 let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
445 let expr = parse_sql_expr(expr, self, None)?;
446 match expr {
447 Expr::Literal(value) => {
448 value.to_any_value()
449 .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
450 .map(|av| av.into_static())
451 },
452 _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
453 }
454 }).collect();
455 row_data.map(Row::new)
456 }).collect::<Result<_, _>>()?;
457
458 Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
459 }
460
461 fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
463 match stmt {
464 Statement::Explain { statement, .. } => {
465 let lf = self.execute_statement(statement)?;
466 let plan = lf.describe_optimized_plan()?;
467 let plan = plan
468 .split('\n')
469 .collect::<Series>()
470 .with_name(PlSmallStr::from_static("Logical Plan"))
471 .into_column();
472 let df = DataFrame::new(vec![plan])?;
473 Ok(df.lazy())
474 },
475 _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
476 }
477 }
478
479 fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
481 let tables = Column::new("name".into(), self.get_tables());
482 let df = DataFrame::new(vec![tables])?;
483 Ok(df.lazy())
484 }
485
486 fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
488 match stmt {
489 Statement::Drop { names, .. } => {
490 names.iter().for_each(|name| {
491 self.table_map.remove(&name.to_string());
492 });
493 Ok(DataFrame::empty().lazy())
494 },
495 _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
496 }
497 }
498
499 fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
501 if let Statement::Delete(Delete {
502 tables,
503 from,
504 using,
505 selection,
506 returning,
507 order_by,
508 limit,
509 }) = stmt
510 {
511 if !tables.is_empty()
512 || using.is_some()
513 || returning.is_some()
514 || limit.is_some()
515 || !order_by.is_empty()
516 {
517 let error_message = match () {
518 _ if !tables.is_empty() => "DELETE expects exactly one table name",
519 _ if using.is_some() => "DELETE does not support the USING clause",
520 _ if returning.is_some() => "DELETE does not support the RETURNING clause",
521 _ if limit.is_some() => "DELETE does not support the LIMIT clause",
522 _ if !order_by.is_empty() => "DELETE does not support the ORDER BY clause",
523 _ => unreachable!(),
524 };
525 polars_bail!(SQLInterface: error_message);
526 }
527 let from_tables = match &from {
528 FromTable::WithFromKeyword(from) => from,
529 FromTable::WithoutKeyword(from) => from,
530 };
531 if from_tables.len() > 1 {
532 polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
533 }
534 let tbl_expr = from_tables.first().unwrap();
535 if !tbl_expr.joins.is_empty() {
536 polars_bail!(SQLInterface: "DELETE does not support table JOINs")
537 }
538 let (_, mut lf) = self.get_table(&tbl_expr.relation)?;
539 if selection.is_none() {
540 Ok(DataFrame::empty_with_schema(
542 lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
543 .unwrap()
544 .as_ref(),
545 )
546 .lazy())
547 } else {
548 Ok(self.process_where(lf.clone(), selection, true)?)
550 }
551 } else {
552 polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
553 }
554 }
555
556 fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
558 if let Statement::Truncate {
559 table_names,
560 partitions,
561 ..
562 } = stmt
563 {
564 match partitions {
565 None => {
566 if table_names.len() != 1 {
567 polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
568 }
569 let tbl = table_names[0].to_string();
570 if let Some(lf) = self.table_map.get_mut(&tbl) {
571 *lf = DataFrame::empty_with_schema(
572 lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
573 .unwrap()
574 .as_ref(),
575 )
576 .lazy();
577 Ok(lf.clone())
578 } else {
579 polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
580 }
581 },
582 _ => {
583 polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
584 },
585 }
586 } else {
587 polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
588 }
589 }
590
591 fn register_cte(&mut self, name: &str, lf: LazyFrame) {
592 self.cte_map.borrow_mut().insert(name.to_owned(), lf);
593 }
594
595 fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
596 if let Some(with) = &query.with {
597 if with.recursive {
598 polars_bail!(SQLInterface: "recursive CTEs are not supported")
599 }
600 for cte in &with.cte_tables {
601 let cte_name = cte.alias.name.value.clone();
602 let mut lf = self.execute_query(&cte.query)?;
603 lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
604 self.register_cte(&cte_name, lf);
605 }
606 }
607 Ok(())
608 }
609
    /// Resolve a `FROM` item: the base relation followed by each of its JOINs,
    /// applied left to right.
    ///
    /// After each join, the right relation's columns that collided with the
    /// left schema and appear in the joined schema as `"name:relation"` are
    /// recorded in `joined_aliases`, so qualified references can later be
    /// mapped by `resolve_name`.
    fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
        // NOTE(review): `l_name` is never updated, so later joins still pass
        // the first relation's name as the left table name.
        let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
        if !tbl_expr.joins.is_empty() {
            for join in &tbl_expr.joins {
                let (r_name, mut rf) = self.get_table(&join.relation)?;
                // The right relation's name is used as the join suffix and
                // alias key, so it must exist.
                if r_name.is_empty() {
                    polars_bail!(
                        SQLInterface:
                        "cannot join on unnamed relation; please provide an alias"
                    )
                }
                let left_schema = self.get_frame_schema(&mut lf)?;
                let right_schema = self.get_frame_schema(&mut rf)?;

                lf = match &join.join_operator {
                    op @ (JoinOperator::FullOuter(constraint)
                    | JoinOperator::LeftOuter(constraint)
                    | JoinOperator::RightOuter(constraint)
                    | JoinOperator::Inner(constraint)
                    | JoinOperator::Anti(constraint)
                    | JoinOperator::Semi(constraint)
                    | JoinOperator::LeftAnti(constraint)
                    | JoinOperator::LeftSemi(constraint)
                    | JoinOperator::RightAnti(constraint)
                    | JoinOperator::RightSemi(constraint)) => {
                        // RIGHT ANTI/SEMI are executed as the left variant
                        // with the frames swapped.
                        // NOTE(review): only the frames are swapped; the names
                        // and schemas passed alongside are not. Confirm that
                        // `process_join_constraint` accounts for this.
                        let (lf, rf) = match op {
                            JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
                            _ => (lf, rf),
                        };
                        self.process_join(
                            &TableInfo {
                                frame: lf,
                                name: (&l_name).into(),
                                schema: left_schema.clone(),
                            },
                            &TableInfo {
                                frame: rf,
                                name: (&r_name).into(),
                                schema: right_schema.clone(),
                            },
                            constraint,
                            match op {
                                JoinOperator::FullOuter(_) => JoinType::Full,
                                JoinOperator::LeftOuter(_) => JoinType::Left,
                                JoinOperator::RightOuter(_) => JoinType::Right,
                                JoinOperator::Inner(_) => JoinType::Inner,
                                #[cfg(feature = "semi_anti_join")]
                                JoinOperator::Anti(_)
                                | JoinOperator::LeftAnti(_)
                                | JoinOperator::RightAnti(_) => JoinType::Anti,
                                #[cfg(feature = "semi_anti_join")]
                                JoinOperator::Semi(_)
                                | JoinOperator::LeftSemi(_)
                                | JoinOperator::RightSemi(_) => JoinType::Semi,
                                // Reached for anti/semi joins when the
                                // `semi_anti_join` feature is disabled.
                                join_type => polars_bail!(
                                    SQLInterface:
                                    "join type '{:?}' not currently supported",
                                    join_type
                                ),
                            },
                        )?
                    },
                    // Duplicate right-hand column names get a ":<r_name>" suffix.
                    JoinOperator::CrossJoin => {
                        lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
                    },
                    join_type => {
                        polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
                    },
                };

                let joined_schema = self.get_frame_schema(&mut lf)?;

                // Record right-hand columns that were renamed with the
                // ":<r_name>" suffix because they collided with left columns.
                self.joined_aliases.borrow_mut().insert(
                    r_name.clone(),
                    right_schema
                        .iter_names()
                        .filter_map(|name| {
                            let aliased_name = format!("{name}:{r_name}");
                            if left_schema.contains(name)
                                && joined_schema.contains(aliased_name.as_str())
                            {
                                Some((name.to_string(), aliased_name))
                            } else {
                                None
                            }
                        })
                        .collect::<PlHashMap<String, String>>(),
                );
            }
        };
        Ok(lf)
    }
706
    /// Translate a `SELECT` statement into a `LazyFrame`.
    ///
    /// The stages run in this order:
    /// 1. FROM (with joins)
    /// 2. WHERE
    /// 3. projection expansion
    /// 4. GROUP BY (explicit keys or `GROUP BY ALL`)
    /// 5. one of two paths:
    ///    * without grouping: projection plus ORDER BY, with wildcard
    ///      modifiers (ILIKE / EXCLUDE / REPLACE / RENAME) applied along the way;
    ///    * with grouping: aggregation, then ORDER BY, then HAVING
    /// 6. DISTINCT / DISTINCT ON
    fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
        // No FROM clause: start from an empty frame (e.g. `SELECT 1`).
        let mut lf = if select_stmt.from.is_empty() {
            DataFrame::empty().lazy()
        } else {
            let from = select_stmt.clone().from;
            if from.len() > 1 {
                polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
            }
            self.execute_from_statement(from.first().unwrap())?
        };

        // Schema of the FROM result; projections and GROUP BY resolve against it.
        let schema = self.get_frame_schema(&mut lf)?;
        lf = self.process_where(lf, &select_stmt.selection, false)?;

        // Filled in by `column_projections` while expanding wildcards.
        let mut select_modifiers = SelectModifiers {
            ilike: None,
            exclude: PlHashSet::new(),
            rename: PlHashMap::new(),
            replace: vec![],
        };

        let projections = self.column_projections(select_stmt, &schema, &mut select_modifiers)?;

        let mut group_by_keys: Vec<Expr> = Vec::new();
        match &select_stmt.group_by {
            // Explicit GROUP BY: each item is an expression or a 1-based ordinal
            // into the projections.
            GroupByExpr::Expressions(group_by_exprs, modifiers) => {
                if !modifiers.is_empty() {
                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
                }
                group_by_keys = group_by_exprs
                    .iter()
                    .map(|e| {
                        self.expr_or_ordinal(
                            e,
                            &projections,
                            None,
                            Some(schema.deref()),
                            "GROUP BY",
                        )
                    })
                    .collect::<PolarsResult<_>>()?
            },
            // GROUP BY ALL: infer keys from the projections. Plain (or aliased)
            // columns become keys; aggregates, `len` and literals are skipped;
            // any other expression is a key unless it contains an aggregate,
            // `len`, or a window expression.
            GroupByExpr::All(modifiers) => {
                if !modifiers.is_empty() {
                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
                }
                projections.iter().for_each(|expr| match expr {
                    Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
                    Expr::Column(_) => group_by_keys.push(expr.clone()),
                    Expr::Alias(e, _)
                        if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
                    Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
                        // Group on the underlying column, not the alias.
                        if let Expr::Column(name) = &**e {
                            group_by_keys.push(col(name.clone()));
                        }
                    },
                    _ => {
                        if !has_expr(expr, |e| {
                            matches!(e, Expr::Agg(_))
                                || matches!(e, Expr::Len)
                                || matches!(e, Expr::Window { .. })
                        }) {
                            group_by_keys.push(expr.clone())
                        }
                    },
                });
            },
        };

        lf = if group_by_keys.is_empty() {
            // Non-aggregating query path.
            if select_stmt.having.is_some() {
                polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
            };

            // Projections surviving the ILIKE / EXCLUDE filters, plus their output names.
            let mut retained_cols = Vec::with_capacity(projections.len());
            let mut retained_names = Vec::with_capacity(projections.len());
            let have_order_by = query.order_by.is_some();
            // Accumulates how the projections relate to the frame's height.
            // NOTE(review): semantics of `ExprSqlProjectionHeightBehavior` are
            // defined elsewhere in this file; confirm there.
            let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;

            for p in projections.iter() {
                let name = p
                    .to_field(schema.deref(), Context::Default)?
                    .name
                    .to_string();
                if select_modifiers.matches_ilike(&name)
                    && !select_modifiers.exclude.contains(&name)
                {
                    projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);

                    // With ORDER BY, projections are materialized first (below),
                    // so later selects refer to them by name.
                    retained_cols.push(if have_order_by {
                        col(name.as_str())
                    } else {
                        p.clone()
                    });
                    retained_names.push(col(name));
                }
            }

            if have_order_by {
                // Add the projections to the frame so ORDER BY can refer to them
                // (and to the source columns).
                if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
                    || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
                {
                    lf = lf.with_columns(projections);
                } else {
                    // Presumably the projections may change the height: select
                    // them separately and left-join back on a row index.
                    const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
                    lf = lf
                        .clone()
                        .select(projections)
                        .with_row_index(NAME, None)
                        .join(
                            lf.with_row_index(NAME, None),
                            [col(NAME)],
                            [col(NAME)],
                            JoinArgs {
                                how: JoinType::Left,
                                validation: Default::default(),
                                suffix: None,
                                slice: None,
                                nulls_equal: false,
                                coalesce: Default::default(),
                                maintain_order: polars_ops::frame::MaintainOrderJoin::Left,
                            },
                        );
                }
            }

            if !select_modifiers.replace.is_empty() {
                lf = lf.with_columns(&select_modifiers.replace);
            }
            if !select_modifiers.rename.is_empty() {
                lf = lf.with_columns(select_modifiers.renamed_cols());
            }

            lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;

            if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
                && !have_order_by
            {
                lf = lf.with_columns(retained_cols).select(retained_names);
            } else {
                lf = lf.select(retained_cols);
            }

            // Apply RENAME modifiers to the final column names.
            if !select_modifiers.rename.is_empty() {
                lf = lf.rename(
                    select_modifiers.rename.keys(),
                    select_modifiers.rename.values(),
                    true,
                );
            };
            lf
        } else {
            // Aggregating path: group, then order, then apply HAVING against
            // the aggregated schema.
            lf = self.process_group_by(lf, &group_by_keys, &projections)?;
            lf = self.process_order_by(lf, &query.order_by, None)?;

            let schema = Some(self.get_frame_schema(&mut lf)?);
            match select_stmt.having.as_ref() {
                Some(expr) => lf.filter(parse_sql_expr(expr, self, schema.as_deref())?),
                None => lf,
            }
        };

        lf = match &select_stmt.distinct {
            Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
            Some(Distinct::On(exprs)) => {
                // DISTINCT ON only accepts plain column references.
                let schema = Some(self.get_frame_schema(&mut lf)?);
                let cols = exprs
                    .iter()
                    .map(|e| {
                        let expr = parse_sql_expr(e, self, schema.as_deref())?;
                        if let Expr::Column(name) = expr {
                            Ok(name.clone())
                        } else {
                            Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
                        }
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                // Order first so keeping the first row per key follows the ORDER BY.
                lf = self.process_order_by(lf, &query.order_by, None)?;
                return Ok(lf.unique_stable(Some(cols.clone()), UniqueKeepStrategy::First));
            },
            None => lf,
        };
        Ok(lf)
    }
931
932 fn column_projections(
933 &mut self,
934 select_stmt: &Select,
935 schema: &SchemaRef,
936 select_modifiers: &mut SelectModifiers,
937 ) -> PolarsResult<Vec<Expr>> {
938 let parsed_items: PolarsResult<Vec<Vec<Expr>>> = select_stmt
939 .projection
940 .iter()
941 .map(|select_item| match select_item {
942 SelectItem::UnnamedExpr(expr) => {
943 Ok(vec![parse_sql_expr(expr, self, Some(schema))?])
944 },
945 SelectItem::ExprWithAlias { expr, alias } => {
946 let expr = parse_sql_expr(expr, self, Some(schema))?;
947 Ok(vec![expr.alias(PlSmallStr::from_str(alias.value.as_str()))])
948 },
949 SelectItem::QualifiedWildcard(obj_name, wildcard_options) => self
950 .process_qualified_wildcard(
951 obj_name,
952 wildcard_options,
953 select_modifiers,
954 Some(schema),
955 ),
956 SelectItem::Wildcard(wildcard_options) => {
957 let cols = schema
958 .iter_names()
959 .map(|name| col(name.clone()))
960 .collect::<Vec<_>>();
961
962 self.process_wildcard_additional_options(
963 cols,
964 wildcard_options,
965 select_modifiers,
966 Some(schema),
967 )
968 },
969 })
970 .collect();
971
972 let flattened_exprs: Vec<Expr> = parsed_items?
973 .into_iter()
974 .flatten()
975 .flat_map(|expr| expand_exprs(expr, schema))
976 .collect();
977
978 Ok(flattened_exprs)
979 }
980
981 fn process_where(
982 &mut self,
983 mut lf: LazyFrame,
984 expr: &Option<SQLExpr>,
985 invert_filter: bool,
986 ) -> PolarsResult<LazyFrame> {
987 if let Some(expr) = expr {
988 let schema = self.get_frame_schema(&mut lf)?;
989
990 let (all_true, all_false) = match expr {
992 SQLExpr::Value(SQLValue::Boolean(b)) => (*b, !*b),
993 SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
994 (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => (a == b, a != b),
995 (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
996 (a != b, a == b)
997 },
998 _ => (false, false),
999 },
1000 _ => (false, false),
1001 };
1002 if (all_true && !invert_filter) || (all_false && invert_filter) {
1003 return Ok(lf);
1004 } else if (all_false && !invert_filter) || (all_true && invert_filter) {
1005 return Ok(DataFrame::empty_with_schema(schema.as_ref()).lazy());
1006 }
1007
1008 let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1010 if filter_expression.clone().meta().has_multiple_outputs() {
1011 filter_expression = all_horizontal([filter_expression])?;
1012 }
1013 lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1014 lf = if invert_filter {
1015 lf.remove(filter_expression)
1016 } else {
1017 lf.filter(filter_expression)
1018 };
1019 }
1020 Ok(lf)
1021 }
1022
1023 pub(super) fn process_join(
1024 &mut self,
1025 tbl_left: &TableInfo,
1026 tbl_right: &TableInfo,
1027 constraint: &JoinConstraint,
1028 join_type: JoinType,
1029 ) -> PolarsResult<LazyFrame> {
1030 let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right)?;
1031
1032 let joined = tbl_left
1033 .frame
1034 .clone()
1035 .join_builder()
1036 .with(tbl_right.frame.clone())
1037 .left_on(left_on)
1038 .right_on(right_on)
1039 .how(join_type)
1040 .suffix(format!(":{}", tbl_right.name))
1041 .coalesce(JoinCoalesce::KeepColumns)
1042 .finish();
1043
1044 Ok(joined)
1045 }
1046
1047 fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
1048 let mut contexts = vec![];
1049 for expr in exprs {
1050 *expr = expr.clone().map_expr(|e| match e {
1051 Expr::SubPlan(lp, names) => {
1052 contexts.push(<LazyFrame>::from((**lp).clone()));
1053 if names.len() == 1 {
1054 Expr::Column(names[0].as_str().into())
1055 } else {
1056 Expr::SubPlan(lp, names)
1057 }
1058 },
1059 e => e,
1060 })
1061 }
1062
1063 if contexts.is_empty() {
1064 lf
1065 } else {
1066 lf.with_context(contexts)
1067 }
1068 }
1069
1070 fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1071 if let Statement::CreateTable(CreateTable {
1072 if_not_exists,
1073 name,
1074 query,
1075 ..
1076 }) = stmt
1077 {
1078 let tbl_name = name.0.first().unwrap().value.as_str();
1079 if *if_not_exists && self.table_map.contains_key(tbl_name) {
1081 polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1082 }
1084 if let Some(query) = query {
1085 let lf = self.execute_query(query)?;
1086 self.register(tbl_name, lf);
1087 let out = df! {
1088 "Response" => ["CREATE TABLE"]
1089 }
1090 .unwrap()
1091 .lazy();
1092 Ok(out)
1093 } else {
1094 polars_bail!(SQLInterface: "only `CREATE TABLE AS SELECT ...` is currently supported");
1095 }
1096 } else {
1097 unreachable!()
1098 }
1099 }
1100
1101 fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
1102 match relation {
1103 TableFactor::Table {
1104 name, alias, args, ..
1105 } => {
1106 if let Some(args) = args {
1107 return self.execute_table_function(name, alias, &args.args);
1108 }
1109 let tbl_name = name.0.first().unwrap().value.as_str();
1110 if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
1111 match alias {
1112 Some(alias) => {
1113 self.table_aliases
1114 .borrow_mut()
1115 .insert(alias.name.value.clone(), tbl_name.to_string());
1116 Ok((alias.to_string(), lf))
1117 },
1118 None => Ok((tbl_name.to_string(), lf)),
1119 }
1120 } else {
1121 polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1122 }
1123 },
1124 TableFactor::Derived {
1125 lateral,
1126 subquery,
1127 alias,
1128 } => {
1129 polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1130 if let Some(alias) = alias {
1131 let mut lf = self.execute_query_no_ctes(subquery)?;
1132 lf = self.rename_columns_from_table_alias(lf, alias)?;
1133 self.table_map.insert(alias.name.value.clone(), lf.clone());
1134 Ok((alias.name.value.clone(), lf))
1135 } else {
1136 polars_bail!(SQLSyntax: "derived tables must have aliases");
1137 }
1138 },
1139 TableFactor::UNNEST {
1140 alias,
1141 array_exprs,
1142 with_offset,
1143 with_offset_alias: _,
1144 ..
1145 } => {
1146 if let Some(alias) = alias {
1147 let column_names: Vec<Option<PlSmallStr>> = alias
1148 .columns
1149 .iter()
1150 .map(|c| {
1151 if c.name.value.is_empty() {
1152 None
1153 } else {
1154 Some(PlSmallStr::from_str(c.name.value.as_str()))
1155 }
1156 })
1157 .collect();
1158
1159 let column_values: Vec<Series> = array_exprs
1160 .iter()
1161 .map(|arr| parse_sql_array(arr, self))
1162 .collect::<Result<_, _>>()?;
1163
1164 polars_ensure!(!column_names.is_empty(),
1165 SQLSyntax:
1166 "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1167 );
1168 if column_names.len() != column_values.len() {
1169 let plural = if column_values.len() > 1 { "s" } else { "" };
1170 polars_bail!(
1171 SQLSyntax:
1172 "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
1173 );
1174 }
1175 let column_series: Vec<Column> = column_values
1176 .into_iter()
1177 .zip(column_names)
1178 .map(|(s, name)| {
1179 if let Some(name) = name {
1180 s.clone().with_name(name)
1181 } else {
1182 s.clone()
1183 }
1184 })
1185 .map(Column::from)
1186 .collect();
1187
1188 let lf = DataFrame::new(column_series)?.lazy();
1189
1190 if *with_offset {
1191 polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
1194 }
1195 let table_name = alias.name.value.clone();
1196 self.table_map.insert(table_name.clone(), lf.clone());
1197 Ok((table_name.clone(), lf))
1198 } else {
1199 polars_bail!(SQLSyntax: "UNNEST table must have an alias");
1200 }
1201 },
1202 TableFactor::NestedJoin {
1203 table_with_joins,
1204 alias,
1205 } => {
1206 let lf = self.execute_from_statement(table_with_joins)?;
1207 match alias {
1208 Some(a) => Ok((a.name.value.clone(), lf)),
1209 None => Ok(("".to_string(), lf)),
1210 }
1211 },
1212 _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
1214 }
1215 }
1216
1217 fn execute_table_function(
1218 &mut self,
1219 name: &ObjectName,
1220 alias: &Option<TableAlias>,
1221 args: &[FunctionArg],
1222 ) -> PolarsResult<(String, LazyFrame)> {
1223 let tbl_fn = name.0.first().unwrap().value.as_str();
1224 let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1225 let (tbl_name, lf) = read_fn.execute(args)?;
1226 #[allow(clippy::useless_asref)]
1227 let tbl_name = alias
1228 .as_ref()
1229 .map(|a| a.name.value.clone())
1230 .unwrap_or_else(|| tbl_name);
1231
1232 self.table_map.insert(tbl_name.clone(), lf.clone());
1233 Ok((tbl_name, lf))
1234 }
1235
1236 fn process_order_by(
1237 &mut self,
1238 mut lf: LazyFrame,
1239 order_by: &Option<OrderBy>,
1240 selected: Option<&[Expr]>,
1241 ) -> PolarsResult<LazyFrame> {
1242 if order_by.as_ref().is_none_or(|ob| ob.exprs.is_empty()) {
1243 return Ok(lf);
1244 }
1245 let schema = self.get_frame_schema(&mut lf)?;
1246 let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1247
1248 let order_by = order_by.as_ref().unwrap().exprs.clone();
1249 let mut descending = Vec::with_capacity(order_by.len());
1250 let mut nulls_last = Vec::with_capacity(order_by.len());
1251 let mut by: Vec<Expr> = Vec::with_capacity(order_by.len());
1252
1253 if order_by.len() == 1 && matches!(&order_by[0].expr, SQLExpr::Identifier(ident) if ident.value.to_uppercase() == "ALL" && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1255 {
1256 if let Some(selected) = selected {
1257 by.extend(selected.iter().cloned());
1258 } else {
1259 by.extend(columns_iter);
1260 };
1261 let desc_order = !order_by[0].asc.unwrap_or(true);
1262 nulls_last.resize(by.len(), !order_by[0].nulls_first.unwrap_or(desc_order));
1263 descending.resize(by.len(), desc_order);
1264 } else {
1265 let columns = &columns_iter.collect::<Vec<_>>();
1266 for ob in order_by {
1267 let desc_order = !ob.asc.unwrap_or(true);
1270 nulls_last.push(!ob.nulls_first.unwrap_or(desc_order));
1271 descending.push(desc_order);
1272
1273 by.push(self.expr_or_ordinal(
1275 &ob.expr,
1276 columns,
1277 selected,
1278 Some(&schema),
1279 "ORDER BY",
1280 )?)
1281 }
1282 }
1283 Ok(lf.sort_by_exprs(
1284 &by,
1285 SortMultipleOptions::default()
1286 .with_order_descending_multi(descending)
1287 .with_nulls_last_multi(nulls_last)
1288 .with_maintain_order(true),
1289 ))
1290 }
1291
    /// Group `lf` by `group_by_keys` and project the result to match the SELECT list.
    ///
    /// `projections` are the SELECT-list expressions. Each one is classified as one of:
    /// - an aggregation/window expression, which is passed to `.agg(...)`;
    /// - a plain column or struct-field reference, which must be one of the group keys;
    /// - something that needs no aggregation entry.
    ///
    /// The aggregated frame is then selected with one output expression per projection, in
    /// projection order.
    ///
    /// # Errors
    /// Returns `SQLSyntax` in two cases: a column or struct-field projection that is neither a
    /// group key nor inside an aggregate, or a projection whose form is not handled. Schema
    /// resolution errors are propagated.
    fn process_group_by(
        &mut self,
        mut lf: LazyFrame,
        group_by_keys: &[Expr],
        projections: &[Expr],
    ) -> PolarsResult<LazyFrame> {
        let schema_before = self.get_frame_schema(&mut lf)?;
        let group_by_keys_schema =
            expressions_to_schema(group_by_keys, &schema_before, Context::Default)?;

        // Expressions handed to `.agg(...)`.
        let mut aggregation_projection = Vec::with_capacity(projections.len());
        // Output name -> expression used verbatim in the final projection.
        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
        // Aliases of non-aggregate projections whose name is not a group key. The final
        // projection re-emits the original projection expression for these names.
        let mut projection_aliases = PlHashSet::new();
        // Aliases on projections that `is_simple_projection` accepts (`<col> AS alias`).
        let mut group_key_aliases = PlHashSet::new();

        for mut e in projections {
            // Does the expression tree contain an aggregation, `len`, or a window function?
            let is_agg_or_window = has_expr(e, |e| {
                matches!(e, Expr::Agg(_) | Expr::Len | Expr::Window { .. })
            });

            // Set for `<function>(...) AS alias`; such projections go into the aggregation.
            let mut is_function_under_alias = false;

            if let Expr::Alias(expr, alias) = e {
                // Simple aliased projection: record the alias and classify the inner
                // expression instead of the alias wrapper.
                if e.clone().meta().is_simple_projection(Some(&schema_before)) {
                    group_key_aliases.insert(alias.as_ref());
                    e = expr
                } else if let Expr::Function {
                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
                    ..
                } = expr.deref()
                {
                    // Aliased struct field access: the final projection emits
                    // `col(<field name>).alias(alias)` for this output name.
                    projection_overrides
                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
                } else if let Expr::Function { .. } = expr.deref() {
                    is_function_under_alias = true;
                } else if !is_agg_or_window && !group_by_keys_schema.contains(alias) {
                    projection_aliases.insert(alias.as_ref());
                }
            }
            let field = e.to_field(&schema_before, Context::Default)?;
            if group_by_keys_schema.get(&field.name).is_none() && is_agg_or_window {
                // A top-level `implode` (bare or directly under an alias) is stripped before
                // aggregating. NOTE(review): presumably because `.agg` already gathers
                // non-aggregated values into lists, so the explicit implode would nest twice.
                let mut e = e.clone();
                if let Expr::Agg(AggExpr::Implode(expr)) = &e {
                    e = (**expr).clone();
                } else if let Expr::Alias(expr, name) = &e {
                    if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
                        e = (**expr).clone().alias(name.clone());
                    }
                }
                aggregation_projection.push(e);
            } else if let Expr::Column(_)
            | Expr::Function {
                function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
                ..
            } = e
            {
                // Bare column / struct-field references are only valid if they are group keys.
                if !group_by_keys_schema.contains(&field.name) {
                    polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
                }
            } else if is_function_under_alias || matches!(e, Expr::Function { .. }) {
                aggregation_projection.push(e.clone());
            } else if let Expr::Literal { .. }
            | Expr::Cast { .. }
            | Expr::Ternary { .. }
            | Expr::Field { .. }
            | Expr::Alias { .. } = e
            {
                // No aggregation entry needed; the final projection below decides what is
                // emitted for these output names.
            } else {
                polars_bail!(SQLSyntax: "Unsupported operation in the GROUP BY clause: {}", e);
            }
        }
        let aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
        let projection_schema =
            expressions_to_schema(projections, &schema_before, Context::Default)?;

        // For each projection, in order, choose the output expression:
        // - a registered override, if one exists;
        // - the original projection expression, if its output name is a group key or a
        //   recorded alias;
        // - otherwise the aggregated column of the same name.
        let final_projection = projection_schema
            .iter_names()
            .zip(projections)
            .map(|(name, projection_expr)| {
                if let Some(expr) = projection_overrides.get(name.as_str()) {
                    expr.clone()
                } else if group_by_keys_schema.get(name).is_some()
                    || projection_aliases.contains(name.as_str())
                    || group_key_aliases.contains(name.as_str())
                {
                    projection_expr.clone()
                } else {
                    col(name.clone())
                }
            })
            .collect::<Vec<_>>();

        Ok(aggregated.select(&final_projection))
    }
1392
1393 fn process_limit_offset(
1394 &self,
1395 lf: LazyFrame,
1396 limit: &Option<SQLExpr>,
1397 offset: &Option<Offset>,
1398 ) -> PolarsResult<LazyFrame> {
1399 match (offset, limit) {
1400 (
1401 Some(Offset {
1402 value: SQLExpr::Value(SQLValue::Number(offset, _)),
1403 ..
1404 }),
1405 Some(SQLExpr::Value(SQLValue::Number(limit, _))),
1406 ) => Ok(lf.slice(
1407 offset
1408 .parse()
1409 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1410 limit
1411 .parse()
1412 .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1413 )),
1414 (
1415 Some(Offset {
1416 value: SQLExpr::Value(SQLValue::Number(offset, _)),
1417 ..
1418 }),
1419 None,
1420 ) => Ok(lf.slice(
1421 offset
1422 .parse()
1423 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1424 IdxSize::MAX,
1425 )),
1426 (None, Some(SQLExpr::Value(SQLValue::Number(limit, _)))) => Ok(lf.limit(
1427 limit
1428 .parse()
1429 .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1430 )),
1431 (None, None) => Ok(lf),
1432 _ => polars_bail!(
1433 SQLSyntax: "non-numeric arguments for LIMIT/OFFSET are not supported",
1434 ),
1435 }
1436 }
1437
1438 fn process_qualified_wildcard(
1439 &mut self,
1440 ObjectName(idents): &ObjectName,
1441 options: &WildcardAdditionalOptions,
1442 modifiers: &mut SelectModifiers,
1443 schema: Option<&Schema>,
1444 ) -> PolarsResult<Vec<Expr>> {
1445 let mut new_idents = idents.clone();
1446 new_idents.push(Ident::new("*"));
1447
1448 let expr = resolve_compound_identifier(self, new_idents.deref(), schema);
1449 self.process_wildcard_additional_options(expr?, options, modifiers, schema)
1450 }
1451
1452 fn process_wildcard_additional_options(
1453 &mut self,
1454 exprs: Vec<Expr>,
1455 options: &WildcardAdditionalOptions,
1456 modifiers: &mut SelectModifiers,
1457 schema: Option<&Schema>,
1458 ) -> PolarsResult<Vec<Expr>> {
1459 if options.opt_except.is_some() && options.opt_exclude.is_some() {
1460 polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
1461 } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
1462 polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
1463 }
1464
1465 if let Some(items) = &options.opt_exclude {
1467 match items {
1468 ExcludeSelectItem::Single(ident) => {
1469 modifiers.exclude.insert(ident.value.clone());
1470 },
1471 ExcludeSelectItem::Multiple(idents) => {
1472 modifiers
1473 .exclude
1474 .extend(idents.iter().map(|i| i.value.clone()));
1475 },
1476 };
1477 }
1478
1479 if let Some(items) = &options.opt_except {
1481 modifiers.exclude.insert(items.first_element.value.clone());
1482 modifiers
1483 .exclude
1484 .extend(items.additional_elements.iter().map(|i| i.value.clone()));
1485 }
1486
1487 if let Some(item) = &options.opt_ilike {
1489 let rx = regex::escape(item.pattern.as_str())
1490 .replace('%', ".*")
1491 .replace('_', ".");
1492
1493 modifiers.ilike = Some(
1494 polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
1495 );
1496 }
1497
1498 if let Some(items) = &options.opt_rename {
1500 let renames = match items {
1501 RenameSelectItem::Single(rename) => vec![rename],
1502 RenameSelectItem::Multiple(renames) => renames.iter().collect(),
1503 };
1504 for rn in renames {
1505 let (before, after) = (rn.ident.value.as_str(), rn.alias.value.as_str());
1506 let (before, after) = (PlSmallStr::from_str(before), PlSmallStr::from_str(after));
1507 if before != after {
1508 modifiers.rename.insert(before, after);
1509 }
1510 }
1511 }
1512
1513 if let Some(replacements) = &options.opt_replace {
1515 for rp in &replacements.items {
1516 let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
1517 modifiers
1518 .replace
1519 .push(replacement_expr?.alias(rp.column_name.value.as_str()));
1520 }
1521 }
1522 Ok(exprs)
1523 }
1524
1525 fn rename_columns_from_table_alias(
1526 &mut self,
1527 mut lf: LazyFrame,
1528 alias: &TableAlias,
1529 ) -> PolarsResult<LazyFrame> {
1530 if alias.columns.is_empty() {
1531 Ok(lf)
1532 } else {
1533 let schema = self.get_frame_schema(&mut lf)?;
1534 if alias.columns.len() != schema.len() {
1535 polars_bail!(
1536 SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
1537 alias.columns.len(), alias.name.value, schema.len()
1538 )
1539 } else {
1540 let existing_columns: Vec<_> = schema.iter_names().collect();
1541 let new_columns: Vec<_> =
1542 alias.columns.iter().map(|c| c.name.value.clone()).collect();
1543 Ok(lf.rename(existing_columns, new_columns, true))
1544 }
1545 }
1546 }
1547}
1548
1549impl SQLContext {
1550 pub fn get_table_map(&self) -> PlHashMap<String, LazyFrame> {
1552 self.table_map.clone()
1553 }
1554
1555 pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
1557 Self {
1558 table_map,
1559 ..Default::default()
1560 }
1561 }
1562}
1563
1564fn collect_compound_identifiers(
1565 left: &[Ident],
1566 right: &[Ident],
1567 left_name: &str,
1568 right_name: &str,
1569) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1570 if left.len() == 2 && right.len() == 2 {
1571 let (tbl_a, col_name_a) = (left[0].value.as_str(), left[1].value.as_str());
1572 let (tbl_b, col_name_b) = (right[0].value.as_str(), right[1].value.as_str());
1573
1574 if left_name == tbl_b || right_name == tbl_a {
1576 Ok((vec![col(col_name_b)], vec![col(col_name_a)]))
1577 } else {
1578 Ok((vec![col(col_name_a)], vec![col(col_name_b)]))
1579 }
1580 } else {
1581 polars_bail!(SQLInterface: "collect_compound_identifiers: Expected left.len() == 2 && right.len() == 2, but found left.len() == {:?}, right.len() == {:?}", left.len(), right.len());
1582 }
1583}
1584
1585fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
1586 match expr {
1587 Expr::Wildcard => schema
1588 .iter_names()
1589 .map(|name| col(name.clone()))
1590 .collect::<Vec<_>>(),
1591 Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
1592 let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
1593 schema
1594 .iter_names()
1595 .filter(|name| re.is_match(name))
1596 .map(|name| col(name.clone()))
1597 .collect::<Vec<_>>()
1598 },
1599 Expr::Columns(names) => names
1600 .iter()
1601 .map(|name| col(name.clone()))
1602 .collect::<Vec<_>>(),
1603 _ => vec![expr],
1604 }
1605}
1606
/// Whether a column name should be interpreted as a regex pattern.
///
/// A name is treated as a regex when it starts with `^` and ends with `$`.
fn is_regex_colname(nm: &str) -> bool {
    nm.strip_prefix('^')
        .is_some_and(|rest| rest.ends_with('$'))
}
1610
1611fn process_join_on(
1612 expression: &sqlparser::ast::Expr,
1613 tbl_left: &TableInfo,
1614 tbl_right: &TableInfo,
1615) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1616 match expression {
1617 SQLExpr::BinaryOp { left, op, right } => match op {
1618 BinaryOperator::And => {
1619 let (mut left_i, mut right_i) = process_join_on(left, tbl_left, tbl_right)?;
1620 let (mut left_j, mut right_j) = process_join_on(right, tbl_left, tbl_right)?;
1621 left_i.append(&mut left_j);
1622 right_i.append(&mut right_j);
1623 Ok((left_i, right_i))
1624 },
1625 BinaryOperator::Eq => match (left.as_ref(), right.as_ref()) {
1626 (SQLExpr::CompoundIdentifier(left), SQLExpr::CompoundIdentifier(right)) => {
1627 collect_compound_identifiers(left, right, &tbl_left.name, &tbl_right.name)
1628 },
1629 _ => {
1630 polars_bail!(SQLInterface: "only equi-join constraints (on identifiers) are currently supported; found lhs={:?}, rhs={:?}", left, right)
1631 },
1632 },
1633 _ => {
1634 polars_bail!(SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op)
1635 },
1636 },
1637 SQLExpr::Nested(expr) => process_join_on(expr, tbl_left, tbl_right),
1638 _ => {
1639 polars_bail!(SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", expression)
1640 },
1641 }
1642}
1643
1644fn process_join_constraint(
1645 constraint: &JoinConstraint,
1646 tbl_left: &TableInfo,
1647 tbl_right: &TableInfo,
1648) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1649 match constraint {
1650 JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
1651 process_join_on(expr, tbl_left, tbl_right)
1652 },
1653 JoinConstraint::Using(idents) if !idents.is_empty() => {
1654 let using: Vec<Expr> = idents.iter().map(|id| col(id.value.as_str())).collect();
1655 Ok((using.clone(), using))
1656 },
1657 JoinConstraint::Natural => {
1658 let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
1659 let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
1660 let on: Vec<Expr> = left_names
1661 .intersection(&right_names)
1662 .map(|&name| col(name.clone()))
1663 .collect();
1664 if on.is_empty() {
1665 polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
1666 }
1667 Ok((on.clone(), on))
1668 },
1669 _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
1670 }
1671}
1672
1673bitflags::bitflags! {
1674 #[derive(PartialEq)]
1679 struct ExprSqlProjectionHeightBehavior: u8 {
1680 const MaintainsColumn = 1 << 0;
1682 const Independent = 1 << 1;
1686 const InheritsContext = 1 << 2;
1689 }
1690}
1691
1692impl ExprSqlProjectionHeightBehavior {
1693 fn identify_from_expr(expr: &Expr) -> Self {
1694 let mut has_column = false;
1695 let mut has_independent = false;
1696
1697 for e in expr.into_iter() {
1698 use Expr::*;
1699
1700 has_column |= matches!(e, Column(_) | Columns(_) | DtypeColumn(_) | IndexColumn(_));
1701
1702 has_independent |= match e {
1703 AnonymousFunction { options, .. } => {
1705 options.returns_scalar() || !options.is_length_preserving()
1706 },
1707
1708 Literal(v) => !v.is_scalar(),
1709
1710 Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
1711
1712 Agg { .. } | Len => true,
1713
1714 _ => false,
1715 }
1716 }
1717
1718 if has_independent {
1719 Self::Independent
1720 } else if has_column {
1721 Self::MaintainsColumn
1722 } else {
1723 Self::InheritsContext
1724 }
1725 }
1726}