1use std::cell::RefCell;
2use std::ops::Deref;
3
4use polars_core::frame::row::Row;
5use polars_core::prelude::*;
6use polars_lazy::prelude::*;
7use polars_ops::frame::JoinCoalesce;
8use polars_plan::dsl::function_expr::StructFunction;
9use polars_plan::prelude::*;
10use polars_utils::format_pl_smallstr;
11use sqlparser::ast::{
12 BinaryOperator, CreateTable, Delete, Distinct, ExcludeSelectItem, Expr as SQLExpr, FromTable,
13 FunctionArg, GroupByExpr, Ident, JoinConstraint, JoinOperator, ObjectName, ObjectType, Offset,
14 OrderBy, Query, RenameSelectItem, Select, SelectItem, SetExpr, SetOperator, SetQuantifier,
15 Statement, TableAlias, TableFactor, TableWithJoins, UnaryOperator, Value as SQLValue, Values,
16 WildcardAdditionalOptions,
17};
18use sqlparser::dialect::GenericDialect;
19use sqlparser::parser::{Parser, ParserOptions};
20
21use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
22use crate::sql_expr::{
23 parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
24};
25use crate::table_functions::PolarsTableFunctions;
26
/// A relation taking part in a join: its frame, the name used to qualify
/// its columns (table name or alias), and its resolved schema.
#[derive(Clone)]
pub struct TableInfo {
    /// The relation's (lazy) data.
    pub(crate) frame: LazyFrame,
    /// Table name or alias the relation is referenced by in the query;
    /// `process_join` uses it to build the `:{name}` suffix for clashing columns.
    pub(crate) name: PlSmallStr,
    /// Schema of `frame`, resolved before the join is built.
    pub(crate) schema: Arc<Schema>,
}
33
/// Wildcard modifiers collected while translating the SELECT list
/// (eg: `SELECT * EXCLUDE/ILIKE/RENAME/REPLACE ...`).
struct SelectModifiers {
    /// Column names to drop from the output projection.
    exclude: PlHashSet<String>,
    /// Optional pattern that output column names must match to be retained.
    ilike: Option<regex::Regex>,
    /// Column renames, keyed by original name and mapping to the new name.
    rename: PlHashMap<PlSmallStr, PlSmallStr>,
    /// Replacement expressions, applied via `with_columns`.
    replace: Vec<Expr>,
}
40impl SelectModifiers {
41 fn matches_ilike(&self, s: &str) -> bool {
42 match &self.ilike {
43 Some(rx) => rx.is_match(s),
44 None => true,
45 }
46 }
47 fn renamed_cols(&self) -> Vec<Expr> {
48 self.rename
49 .iter()
50 .map(|(before, after)| col(before.clone()).alias(after.clone()))
51 .collect()
52 }
53}
54
/// Holds registered tables and the per-statement state needed to translate
/// SQL queries into `LazyFrame` plans.
///
/// Cloning a context clones the table map and state, and shares the
/// function registry (it is held behind an `Arc`).
#[derive(Clone)]
pub struct SQLContext {
    /// Tables registered by name (via `register`, `CREATE TABLE`, and also
    /// derived tables / table functions encountered while executing queries).
    pub(crate) table_map: PlHashMap<String, LazyFrame>,
    /// Registry used to resolve user-defined functions.
    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
    /// Plan arena used while resolving schemas; moved into the result by `execute`.
    pub(crate) lp_arena: Arena<IR>,
    /// Expression arena used while resolving schemas; moved into the result by `execute`.
    pub(crate) expr_arena: Arena<AExpr>,

    /// CTEs visible to the statement currently being executed (cleared by `execute`).
    cte_map: RefCell<PlHashMap<String, LazyFrame>>,
    /// Table alias -> underlying registered table name (cleared by `execute`).
    table_aliases: RefCell<PlHashMap<String, String>>,
    /// Joined relation name -> (original column name -> suffixed column name),
    /// recorded for columns that clashed during a join (cleared by `execute`).
    joined_aliases: RefCell<PlHashMap<String, PlHashMap<String, String>>>,
}
67
68impl Default for SQLContext {
69 fn default() -> Self {
70 Self {
71 function_registry: Arc::new(DefaultFunctionRegistry {}),
72 table_map: Default::default(),
73 cte_map: Default::default(),
74 table_aliases: Default::default(),
75 joined_aliases: Default::default(),
76 lp_arena: Default::default(),
77 expr_arena: Default::default(),
78 }
79 }
80}
81
82impl SQLContext {
83 pub fn new() -> Self {
91 Self::default()
92 }
93
94 pub fn get_tables(&self) -> Vec<String> {
96 let mut tables = Vec::from_iter(self.table_map.keys().cloned());
97 tables.sort_unstable();
98 tables
99 }
100
101 pub fn register(&mut self, name: &str, lf: LazyFrame) {
117 self.table_map.insert(name.to_owned(), lf);
118 }
119
120 pub fn unregister(&mut self, name: &str) {
122 self.table_map.remove(&name.to_owned());
123 }
124
125 pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
144 let mut parser = Parser::new(&GenericDialect);
145 parser = parser.with_options(ParserOptions {
146 trailing_commas: true,
147 ..Default::default()
148 });
149
150 let ast = parser
151 .try_with_sql(query)
152 .map_err(to_sql_interface_err)?
153 .parse_statements()
154 .map_err(to_sql_interface_err)?;
155
156 polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
157 let res = self.execute_statement(ast.first().unwrap())?;
158
159 let lp_arena = std::mem::take(&mut self.lp_arena);
162 let expr_arena = std::mem::take(&mut self.expr_arena);
163 res.set_cached_arena(lp_arena, expr_arena);
164
165 self.cte_map.borrow_mut().clear();
167 self.table_aliases.borrow_mut().clear();
168 self.joined_aliases.borrow_mut().clear();
169
170 Ok(res)
171 }
172
173 pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
176 self.function_registry = function_registry;
177 self
178 }
179
180 pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
182 &self.function_registry
183 }
184
185 pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
187 Arc::get_mut(&mut self.function_registry).unwrap()
188 }
189}
190
191impl SQLContext {
192 pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
193 let ast = stmt;
194 Ok(match ast {
195 Statement::Query(query) => self.execute_query(query)?,
196 stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
197 stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
198 stmt @ Statement::Drop {
199 object_type: ObjectType::Table,
200 ..
201 } => self.execute_drop_table(stmt)?,
202 stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
203 stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
204 stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
205 _ => polars_bail!(
206 SQLInterface: "statement type is not supported:\n{:?}", ast,
207 ),
208 })
209 }
210
211 pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
212 self.register_ctes(query)?;
213 self.execute_query_no_ctes(query)
214 }
215
216 pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
217 let lf = self.process_query(&query.body, query)?;
218 self.process_limit_offset(lf, &query.limit, &query.offset)
219 }
220
221 pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
222 frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
223 }
224
225 pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
226 let table = self.table_map.get(name).cloned();
227 table
228 .or_else(|| self.cte_map.borrow().get(name).cloned())
229 .or_else(|| {
230 self.table_aliases
231 .borrow()
232 .get(name)
233 .and_then(|alias| self.table_map.get(alias).cloned())
234 })
235 }
236
237 fn expr_or_ordinal(
238 &mut self,
239 e: &SQLExpr,
240 exprs: &[Expr],
241 selected: Option<&[Expr]>,
242 schema: Option<&Schema>,
243 clause: &str,
244 ) -> PolarsResult<Expr> {
245 match e {
246 SQLExpr::UnaryOp {
247 op: UnaryOperator::Minus,
248 expr,
249 } if matches!(**expr, SQLExpr::Value(SQLValue::Number(_, _))) => {
250 if let SQLExpr::Value(SQLValue::Number(ref idx, _)) = **expr {
251 Err(polars_err!(
252 SQLSyntax:
253 "negative ordinal values are invalid for {}; found -{}",
254 clause,
255 idx
256 ))
257 } else {
258 unreachable!()
259 }
260 },
261 SQLExpr::Value(SQLValue::Number(idx, _)) => {
262 let idx = idx.parse::<usize>().map_err(|_| {
264 polars_err!(
265 SQLSyntax:
266 "negative ordinal values are invalid for {}; found {}",
267 clause,
268 idx
269 )
270 })?;
271 let cols = if let Some(cols) = selected {
274 cols
275 } else {
276 exprs
277 };
278 Ok(cols
279 .get(idx - 1)
280 .ok_or_else(|| {
281 polars_err!(
282 SQLInterface:
283 "{} ordinal value must refer to a valid column; found {}",
284 clause,
285 idx
286 )
287 })?
288 .clone())
289 },
290 SQLExpr::Value(v) => Err(polars_err!(
291 SQLSyntax:
292 "{} requires a valid expression or positive ordinal; found {}", clause, v,
293 )),
294 _ => parse_sql_expr(e, self, schema),
295 }
296 }
297
298 pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
299 if self.joined_aliases.borrow().contains_key(tbl_name) {
300 self.joined_aliases
301 .borrow()
302 .get(tbl_name)
303 .and_then(|aliases| aliases.get(column_name))
304 .cloned()
305 .unwrap_or_else(|| column_name.to_string())
306 } else {
307 column_name.to_string()
308 }
309 }
310
311 fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
312 match expr {
313 SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
314 SetExpr::Query(query) => self.execute_query_no_ctes(query),
315 SetExpr::SetOperation {
316 op: SetOperator::Union,
317 set_quantifier,
318 left,
319 right,
320 } => self.process_union(left, right, set_quantifier, query),
321
322 #[cfg(feature = "semi_anti_join")]
323 SetExpr::SetOperation {
324 op: SetOperator::Intersect | SetOperator::Except,
325 set_quantifier,
326 left,
327 right,
328 } => self.process_except_intersect(left, right, set_quantifier, query),
329
330 SetExpr::Values(Values {
331 explicit_row: _,
332 rows,
333 }) => self.process_values(rows),
334
335 SetExpr::Table(tbl) => {
336 if tbl.table_name.is_some() {
337 let table_name = tbl.table_name.as_ref().unwrap();
338 self.get_table_from_current_scope(table_name)
339 .ok_or_else(|| {
340 polars_err!(
341 SQLInterface: "no table or alias named '{}' found",
342 tbl
343 )
344 })
345 } else {
346 polars_bail!(SQLInterface: "'TABLE' requires valid table name")
347 }
348 },
349 op => {
350 polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
351 },
352 }
353 }
354
355 #[cfg(feature = "semi_anti_join")]
356 fn process_except_intersect(
357 &mut self,
358 left: &SetExpr,
359 right: &SetExpr,
360 quantifier: &SetQuantifier,
361 query: &Query,
362 ) -> PolarsResult<LazyFrame> {
363 let (join_type, op_name) = match *query.body {
364 SetExpr::SetOperation {
365 op: SetOperator::Except,
366 ..
367 } => (JoinType::Anti, "EXCEPT"),
368 _ => (JoinType::Semi, "INTERSECT"),
369 };
370 let mut lf = self.process_query(left, query)?;
371 let mut rf = self.process_query(right, query)?;
372 let join = lf
373 .clone()
374 .join_builder()
375 .with(rf.clone())
376 .how(join_type)
377 .join_nulls(true);
378
379 let lf_schema = self.get_frame_schema(&mut lf)?;
380 let lf_cols: Vec<_> = lf_schema.iter_names().map(|nm| col(nm.clone())).collect();
381 let joined_tbl = match quantifier {
382 SetQuantifier::ByName => join.on(lf_cols).finish(),
383 SetQuantifier::Distinct | SetQuantifier::None => {
384 let rf_schema = self.get_frame_schema(&mut rf)?;
385 let rf_cols: Vec<_> = rf_schema.iter_names().map(|nm| col(nm.clone())).collect();
386 if lf_cols.len() != rf_cols.len() {
387 polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
388 }
389 join.left_on(lf_cols).right_on(rf_cols).finish()
390 },
391 _ => {
392 polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
393 },
394 };
395 Ok(joined_tbl.unique(None, UniqueKeepStrategy::Any))
396 }
397
398 fn process_union(
399 &mut self,
400 left: &SetExpr,
401 right: &SetExpr,
402 quantifier: &SetQuantifier,
403 query: &Query,
404 ) -> PolarsResult<LazyFrame> {
405 let mut lf = self.process_query(left, query)?;
406 let mut rf = self.process_query(right, query)?;
407 let opts = UnionArgs {
408 parallel: true,
409 to_supertypes: true,
410 ..Default::default()
411 };
412 match quantifier {
413 SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
415 let lf_schema = self.get_frame_schema(&mut lf)?;
416 let rf_schema = self.get_frame_schema(&mut rf)?;
417 if lf_schema.len() != rf_schema.len() {
418 polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
419 }
420 let concatenated = polars_lazy::dsl::concat(vec![lf, rf], opts);
421 match quantifier {
422 SetQuantifier::Distinct | SetQuantifier::None => {
423 concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
424 },
425 _ => concatenated,
426 }
427 },
428 #[cfg(feature = "diagonal_concat")]
430 SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
431 #[cfg(feature = "diagonal_concat")]
433 SetQuantifier::ByName | SetQuantifier::DistinctByName => {
434 let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
435 concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
436 },
437 #[allow(unreachable_patterns)]
438 _ => polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier),
439 }
440 }
441
442 fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
443 let frame_rows: Vec<Row> = values.iter().map(|row| {
444 let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
445 let expr = parse_sql_expr(expr, self, None)?;
446 match expr {
447 Expr::Literal(value) => {
448 value.to_any_value()
449 .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
450 .map(|av| av.into_static())
451 },
452 _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
453 }
454 }).collect();
455 row_data.map(Row::new)
456 }).collect::<Result<_, _>>()?;
457
458 Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
459 }
460
461 fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
463 match stmt {
464 Statement::Explain { statement, .. } => {
465 let lf = self.execute_statement(statement)?;
466 let plan = lf.describe_optimized_plan()?;
467 let plan = plan
468 .split('\n')
469 .collect::<Series>()
470 .with_name(PlSmallStr::from_static("Logical Plan"))
471 .into_column();
472 let df = DataFrame::new(vec![plan])?;
473 Ok(df.lazy())
474 },
475 _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
476 }
477 }
478
479 fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
481 let tables = Column::new("name".into(), self.get_tables());
482 let df = DataFrame::new(vec![tables])?;
483 Ok(df.lazy())
484 }
485
486 fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
488 match stmt {
489 Statement::Drop { names, .. } => {
490 names.iter().for_each(|name| {
491 self.table_map.remove(&name.to_string());
492 });
493 Ok(DataFrame::empty().lazy())
494 },
495 _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
496 }
497 }
498
499 fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
501 if let Statement::Delete(Delete {
502 tables,
503 from,
504 using,
505 selection,
506 returning,
507 order_by,
508 limit,
509 }) = stmt
510 {
511 if !tables.is_empty()
512 || using.is_some()
513 || returning.is_some()
514 || limit.is_some()
515 || !order_by.is_empty()
516 {
517 let error_message = match () {
518 _ if !tables.is_empty() => "DELETE expects exactly one table name",
519 _ if using.is_some() => "DELETE does not support the USING clause",
520 _ if returning.is_some() => "DELETE does not support the RETURNING clause",
521 _ if limit.is_some() => "DELETE does not support the LIMIT clause",
522 _ if !order_by.is_empty() => "DELETE does not support the ORDER BY clause",
523 _ => unreachable!(),
524 };
525 polars_bail!(SQLInterface: error_message);
526 }
527 let from_tables = match &from {
528 FromTable::WithFromKeyword(from) => from,
529 FromTable::WithoutKeyword(from) => from,
530 };
531 if from_tables.len() > 1 {
532 polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
533 }
534 let tbl_expr = from_tables.first().unwrap();
535 if !tbl_expr.joins.is_empty() {
536 polars_bail!(SQLInterface: "DELETE does not support table JOINs")
537 }
538 let (_, mut lf) = self.get_table(&tbl_expr.relation)?;
539 if selection.is_none() {
540 Ok(DataFrame::empty_with_schema(
542 lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
543 .unwrap()
544 .as_ref(),
545 )
546 .lazy())
547 } else {
548 Ok(self.process_where(lf.clone(), selection, true)?)
550 }
551 } else {
552 polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
553 }
554 }
555
556 fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
558 if let Statement::Truncate {
559 table_names,
560 partitions,
561 ..
562 } = stmt
563 {
564 match partitions {
565 None => {
566 if table_names.len() != 1 {
567 polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
568 }
569 let tbl = table_names[0].to_string();
570 if let Some(lf) = self.table_map.get_mut(&tbl) {
571 *lf = DataFrame::empty_with_schema(
572 lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
573 .unwrap()
574 .as_ref(),
575 )
576 .lazy();
577 Ok(lf.clone())
578 } else {
579 polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
580 }
581 },
582 _ => {
583 polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
584 },
585 }
586 } else {
587 polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
588 }
589 }
590
591 fn register_cte(&mut self, name: &str, lf: LazyFrame) {
592 self.cte_map.borrow_mut().insert(name.to_owned(), lf);
593 }
594
595 fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
596 if let Some(with) = &query.with {
597 if with.recursive {
598 polars_bail!(SQLInterface: "recursive CTEs are not supported")
599 }
600 for cte in &with.cte_tables {
601 let cte_name = cte.alias.name.value.clone();
602 let mut lf = self.execute_query(&cte.query)?;
603 lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
604 self.register_cte(&cte_name, lf);
605 }
606 }
607 Ok(())
608 }
609
610 fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
612 let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
613 if !tbl_expr.joins.is_empty() {
614 for join in &tbl_expr.joins {
615 let (r_name, mut rf) = self.get_table(&join.relation)?;
616 if r_name.is_empty() {
617 polars_bail!(
619 SQLInterface:
620 "cannot join on unnamed relation; please provide an alias"
621 )
622 }
623 let left_schema = self.get_frame_schema(&mut lf)?;
624 let right_schema = self.get_frame_schema(&mut rf)?;
625
626 lf = match &join.join_operator {
627 op @ (JoinOperator::FullOuter(constraint)
628 | JoinOperator::LeftOuter(constraint)
629 | JoinOperator::RightOuter(constraint)
630 | JoinOperator::Inner(constraint)
631 | JoinOperator::Anti(constraint)
632 | JoinOperator::Semi(constraint)
633 | JoinOperator::LeftAnti(constraint)
634 | JoinOperator::LeftSemi(constraint)
635 | JoinOperator::RightAnti(constraint)
636 | JoinOperator::RightSemi(constraint)) => {
637 let (lf, rf) = match op {
638 JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
639 _ => (lf, rf),
640 };
641 self.process_join(
642 &TableInfo {
643 frame: lf,
644 name: (&l_name).into(),
645 schema: left_schema.clone(),
646 },
647 &TableInfo {
648 frame: rf,
649 name: (&r_name).into(),
650 schema: right_schema.clone(),
651 },
652 constraint,
653 match op {
654 JoinOperator::FullOuter(_) => JoinType::Full,
655 JoinOperator::LeftOuter(_) => JoinType::Left,
656 JoinOperator::RightOuter(_) => JoinType::Right,
657 JoinOperator::Inner(_) => JoinType::Inner,
658 #[cfg(feature = "semi_anti_join")]
659 JoinOperator::Anti(_)
660 | JoinOperator::LeftAnti(_)
661 | JoinOperator::RightAnti(_) => JoinType::Anti,
662 #[cfg(feature = "semi_anti_join")]
663 JoinOperator::Semi(_)
664 | JoinOperator::LeftSemi(_)
665 | JoinOperator::RightSemi(_) => JoinType::Semi,
666 join_type => polars_bail!(
667 SQLInterface:
668 "join type '{:?}' not currently supported",
669 join_type
670 ),
671 },
672 )?
673 },
674 JoinOperator::CrossJoin => {
675 lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
676 },
677 join_type => {
678 polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
679 },
680 };
681
682 let joined_schema = self.get_frame_schema(&mut lf)?;
684
685 self.joined_aliases.borrow_mut().insert(
686 r_name.clone(),
687 right_schema
688 .iter_names()
689 .filter_map(|name| {
690 let aliased_name = format!("{name}:{r_name}");
692 if left_schema.contains(name)
693 && joined_schema.contains(aliased_name.as_str())
694 {
695 Some((name.to_string(), aliased_name))
696 } else {
697 None
698 }
699 })
700 .collect::<PlHashMap<String, String>>(),
701 );
702 }
703 };
704 Ok(lf)
705 }
706
    /// Translate a single SELECT statement into a `LazyFrame`.
    ///
    /// Stages, in order: FROM/JOIN, WHERE, projection list (including wildcard
    /// modifiers), GROUP BY / HAVING or plain projection, ORDER BY, and DISTINCT /
    /// DISTINCT ON. `query` supplies the ORDER BY clause.
    fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
        // No FROM clause: project over an empty frame (eg: `SELECT 1`).
        let mut lf = if select_stmt.from.is_empty() {
            DataFrame::empty().lazy()
        } else {
            // NOTE(review): clones the whole `Select` just to obtain `from`.
            let from = select_stmt.clone().from;
            if from.len() > 1 {
                polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
            }
            self.execute_from_statement(from.first().unwrap())?
        };

        // Schema of the FROM result, taken before WHERE; used to resolve projections.
        let schema = self.get_frame_schema(&mut lf)?;
        lf = self.process_where(lf, &select_stmt.selection, false)?;

        // Populated by wildcard handling in `column_projections`.
        let mut select_modifiers = SelectModifiers {
            ilike: None,
            exclude: PlHashSet::new(),
            rename: PlHashMap::new(),
            replace: vec![],
        };

        let projections = self.column_projections(select_stmt, &schema, &mut select_modifiers)?;

        let mut group_by_keys: Vec<Expr> = Vec::new();
        match &select_stmt.group_by {
            // Explicit GROUP BY items; numeric ordinals refer to projection positions.
            GroupByExpr::Expressions(group_by_exprs, modifiers) => {
                if !modifiers.is_empty() {
                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
                }
                group_by_keys = group_by_exprs
                    .iter()
                    .map(|e| {
                        self.expr_or_ordinal(
                            e,
                            &projections,
                            None,
                            Some(schema.deref()),
                            "GROUP BY",
                        )
                    })
                    .collect::<PolarsResult<_>>()?
            },
            // GROUP BY ALL: every projection that is not an aggregate, `len` or literal
            // (directly or under an alias) becomes a grouping key.
            GroupByExpr::All(modifiers) => {
                if !modifiers.is_empty() {
                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
                }
                projections.iter().for_each(|expr| match expr {
                    Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
                    Expr::Column(_) => group_by_keys.push(expr.clone()),
                    Expr::Alias(e, _)
                        if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
                    // An aliased column groups by the underlying column name.
                    Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
                        if let Expr::Column(name) = &**e {
                            group_by_keys.push(col(name.clone()));
                        }
                    },
                    _ => {
                        // Other expressions are keys only if they contain no
                        // aggregation, `len`, or window sub-expression.
                        if !has_expr(expr, |e| {
                            matches!(e, Expr::Agg(_))
                                || matches!(e, Expr::Len)
                                || matches!(e, Expr::Window { .. })
                        }) {
                            group_by_keys.push(expr.clone())
                        }
                    },
                });
            },
        };

        lf = if group_by_keys.is_empty() {
            // Plain (non-grouped) projection.
            if select_stmt.having.is_some() {
                polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
            };

            // Projections that survive the ILIKE/EXCLUDE modifiers, plus their output names.
            let mut retained_cols = Vec::with_capacity(projections.len());
            let mut retained_names = Vec::with_capacity(projections.len());
            let have_order_by = query.order_by.is_some();
            // Aggregated height behaviour of the retained projections.
            let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;

            for p in projections.iter() {
                let name = p
                    .to_field(schema.deref(), Context::Default)?
                    .name
                    .to_string();
                if select_modifiers.matches_ilike(&name)
                    && !select_modifiers.exclude.contains(&name)
                {
                    projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);

                    // With ORDER BY the projections are materialized below first, so
                    // they are retained by output name rather than re-evaluated.
                    retained_cols.push(if have_order_by {
                        col(name.as_str())
                    } else {
                        p.clone()
                    });
                    retained_names.push(col(name));
                }
            }

            if have_order_by {
                // Materialize projections alongside the source columns so ORDER BY can
                // reference both. When the projections may not share the frame's height
                // (per `projection_heights`), they are computed via `select` and aligned
                // back to the source rows with a row-index left join instead.
                if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
                    || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
                {
                    lf = lf.with_columns(projections);
                } else {
                    const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
                    lf = lf
                        .clone()
                        .select(projections)
                        .with_row_index(NAME, None)
                        .join(
                            lf.with_row_index(NAME, None),
                            [col(NAME)],
                            [col(NAME)],
                            JoinArgs {
                                how: JoinType::Left,
                                validation: Default::default(),
                                suffix: None,
                                slice: None,
                                nulls_equal: false,
                                coalesce: Default::default(),
                                maintain_order: polars_ops::frame::MaintainOrderJoin::Left,
                            },
                        );
                }
            }

            // Apply wildcard REPLACE / RENAME modifiers.
            if !select_modifiers.replace.is_empty() {
                lf = lf.with_columns(&select_modifiers.replace);
            }
            if !select_modifiers.rename.is_empty() {
                lf = lf.with_columns(select_modifiers.renamed_cols());
            }

            lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;

            // Final projection down to the retained columns.
            if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
                && !have_order_by
            {
                lf = lf.with_columns(retained_cols).select(retained_names);
            } else {
                lf = lf.select(retained_cols);
            }

            if !select_modifiers.rename.is_empty() {
                lf = lf.rename(
                    select_modifiers.rename.keys(),
                    select_modifiers.rename.values(),
                    true,
                );
            };
            lf
        } else {
            // Grouped projection: aggregate, order, then apply HAVING on the result.
            lf = self.process_group_by(lf, &group_by_keys, &projections)?;
            lf = self.process_order_by(lf, &query.order_by, None)?;

            // HAVING is resolved against the post-aggregation schema.
            let schema = Some(self.get_frame_schema(&mut lf)?);
            match select_stmt.having.as_ref() {
                Some(expr) => lf.filter(parse_sql_expr(expr, self, schema.as_deref())?),
                None => lf,
            }
        };

        lf = match &select_stmt.distinct {
            Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
            Some(Distinct::On(exprs)) => {
                // DISTINCT ON: only plain column names are supported.
                let schema = Some(self.get_frame_schema(&mut lf)?);
                let cols = exprs
                    .iter()
                    .map(|e| {
                        let expr = parse_sql_expr(e, self, schema.as_deref())?;
                        if let Expr::Column(name) = expr {
                            Ok(name)
                        } else {
                            Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
                        }
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                // Order first so that `First` keeps the first row per key in ORDER BY order.
                lf = self.process_order_by(lf, &query.order_by, None)?;
                return Ok(lf.unique_stable(
                    Some(Selector::ByName {
                        names: cols.into(),
                        strict: true,
                    }),
                    UniqueKeepStrategy::First,
                ));
            },
            None => lf,
        };
        Ok(lf)
    }
937
938 fn column_projections(
939 &mut self,
940 select_stmt: &Select,
941 schema: &SchemaRef,
942 select_modifiers: &mut SelectModifiers,
943 ) -> PolarsResult<Vec<Expr>> {
944 let parsed_items: PolarsResult<Vec<Vec<Expr>>> = select_stmt
945 .projection
946 .iter()
947 .map(|select_item| match select_item {
948 SelectItem::UnnamedExpr(expr) => {
949 Ok(vec![parse_sql_expr(expr, self, Some(schema))?])
950 },
951 SelectItem::ExprWithAlias { expr, alias } => {
952 let expr = parse_sql_expr(expr, self, Some(schema))?;
953 Ok(vec![expr.alias(PlSmallStr::from_str(alias.value.as_str()))])
954 },
955 SelectItem::QualifiedWildcard(obj_name, wildcard_options) => self
956 .process_qualified_wildcard(
957 obj_name,
958 wildcard_options,
959 select_modifiers,
960 Some(schema),
961 ),
962 SelectItem::Wildcard(wildcard_options) => {
963 let cols = schema
964 .iter_names()
965 .map(|name| col(name.clone()))
966 .collect::<Vec<_>>();
967
968 self.process_wildcard_additional_options(
969 cols,
970 wildcard_options,
971 select_modifiers,
972 Some(schema),
973 )
974 },
975 })
976 .collect();
977
978 let flattened_exprs: Vec<Expr> = parsed_items?
979 .into_iter()
980 .flatten()
981 .flat_map(|expr| expand_exprs(expr, schema))
982 .collect();
983
984 Ok(flattened_exprs)
985 }
986
987 fn process_where(
988 &mut self,
989 mut lf: LazyFrame,
990 expr: &Option<SQLExpr>,
991 invert_filter: bool,
992 ) -> PolarsResult<LazyFrame> {
993 if let Some(expr) = expr {
994 let schema = self.get_frame_schema(&mut lf)?;
995
996 let (all_true, all_false) = match expr {
998 SQLExpr::Value(SQLValue::Boolean(b)) => (*b, !*b),
999 SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
1000 (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => (a == b, a != b),
1001 (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
1002 (a != b, a == b)
1003 },
1004 _ => (false, false),
1005 },
1006 _ => (false, false),
1007 };
1008 if (all_true && !invert_filter) || (all_false && invert_filter) {
1009 return Ok(lf);
1010 } else if (all_false && !invert_filter) || (all_true && invert_filter) {
1011 return Ok(DataFrame::empty_with_schema(schema.as_ref()).lazy());
1012 }
1013
1014 let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1016 if filter_expression.clone().meta().has_multiple_outputs() {
1017 filter_expression = all_horizontal([filter_expression])?;
1018 }
1019 lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1020 lf = if invert_filter {
1021 lf.remove(filter_expression)
1022 } else {
1023 lf.filter(filter_expression)
1024 };
1025 }
1026 Ok(lf)
1027 }
1028
1029 pub(super) fn process_join(
1030 &mut self,
1031 tbl_left: &TableInfo,
1032 tbl_right: &TableInfo,
1033 constraint: &JoinConstraint,
1034 join_type: JoinType,
1035 ) -> PolarsResult<LazyFrame> {
1036 let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right)?;
1037
1038 let joined = tbl_left
1039 .frame
1040 .clone()
1041 .join_builder()
1042 .with(tbl_right.frame.clone())
1043 .left_on(left_on)
1044 .right_on(right_on)
1045 .how(join_type)
1046 .suffix(format!(":{}", tbl_right.name))
1047 .coalesce(JoinCoalesce::KeepColumns)
1048 .finish();
1049
1050 Ok(joined)
1051 }
1052
1053 fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
1054 let mut contexts = vec![];
1055 for expr in exprs {
1056 *expr = expr.clone().map_expr(|e| match e {
1057 Expr::SubPlan(lp, names) => {
1058 contexts.push(<LazyFrame>::from((**lp).clone()));
1059 if names.len() == 1 {
1060 Expr::Column(names[0].as_str().into())
1061 } else {
1062 Expr::SubPlan(lp, names)
1063 }
1064 },
1065 e => e,
1066 })
1067 }
1068
1069 if contexts.is_empty() {
1070 lf
1071 } else {
1072 lf.with_context(contexts)
1073 }
1074 }
1075
1076 fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1077 if let Statement::CreateTable(CreateTable {
1078 if_not_exists,
1079 name,
1080 query,
1081 ..
1082 }) = stmt
1083 {
1084 let tbl_name = name.0.first().unwrap().value.as_str();
1085 if *if_not_exists && self.table_map.contains_key(tbl_name) {
1087 polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1088 }
1090 if let Some(query) = query {
1091 let lf = self.execute_query(query)?;
1092 self.register(tbl_name, lf);
1093 let out = df! {
1094 "Response" => ["CREATE TABLE"]
1095 }
1096 .unwrap()
1097 .lazy();
1098 Ok(out)
1099 } else {
1100 polars_bail!(SQLInterface: "only `CREATE TABLE AS SELECT ...` is currently supported");
1101 }
1102 } else {
1103 unreachable!()
1104 }
1105 }
1106
1107 fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
1108 match relation {
1109 TableFactor::Table {
1110 name, alias, args, ..
1111 } => {
1112 if let Some(args) = args {
1113 return self.execute_table_function(name, alias, &args.args);
1114 }
1115 let tbl_name = name.0.first().unwrap().value.as_str();
1116 if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
1117 match alias {
1118 Some(alias) => {
1119 self.table_aliases
1120 .borrow_mut()
1121 .insert(alias.name.value.clone(), tbl_name.to_string());
1122 Ok((alias.to_string(), lf))
1123 },
1124 None => Ok((tbl_name.to_string(), lf)),
1125 }
1126 } else {
1127 polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1128 }
1129 },
1130 TableFactor::Derived {
1131 lateral,
1132 subquery,
1133 alias,
1134 } => {
1135 polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1136 if let Some(alias) = alias {
1137 let mut lf = self.execute_query_no_ctes(subquery)?;
1138 lf = self.rename_columns_from_table_alias(lf, alias)?;
1139 self.table_map.insert(alias.name.value.clone(), lf.clone());
1140 Ok((alias.name.value.clone(), lf))
1141 } else {
1142 polars_bail!(SQLSyntax: "derived tables must have aliases");
1143 }
1144 },
1145 TableFactor::UNNEST {
1146 alias,
1147 array_exprs,
1148 with_offset,
1149 with_offset_alias: _,
1150 ..
1151 } => {
1152 if let Some(alias) = alias {
1153 let column_names: Vec<Option<PlSmallStr>> = alias
1154 .columns
1155 .iter()
1156 .map(|c| {
1157 if c.name.value.is_empty() {
1158 None
1159 } else {
1160 Some(PlSmallStr::from_str(c.name.value.as_str()))
1161 }
1162 })
1163 .collect();
1164
1165 let column_values: Vec<Series> = array_exprs
1166 .iter()
1167 .map(|arr| parse_sql_array(arr, self))
1168 .collect::<Result<_, _>>()?;
1169
1170 polars_ensure!(!column_names.is_empty(),
1171 SQLSyntax:
1172 "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1173 );
1174 if column_names.len() != column_values.len() {
1175 let plural = if column_values.len() > 1 { "s" } else { "" };
1176 polars_bail!(
1177 SQLSyntax:
1178 "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
1179 );
1180 }
1181 let column_series: Vec<Column> = column_values
1182 .into_iter()
1183 .zip(column_names)
1184 .map(|(s, name)| {
1185 if let Some(name) = name {
1186 s.with_name(name)
1187 } else {
1188 s
1189 }
1190 })
1191 .map(Column::from)
1192 .collect();
1193
1194 let lf = DataFrame::new(column_series)?.lazy();
1195
1196 if *with_offset {
1197 polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
1200 }
1201 let table_name = alias.name.value.clone();
1202 self.table_map.insert(table_name.clone(), lf.clone());
1203 Ok((table_name, lf))
1204 } else {
1205 polars_bail!(SQLSyntax: "UNNEST table must have an alias");
1206 }
1207 },
1208 TableFactor::NestedJoin {
1209 table_with_joins,
1210 alias,
1211 } => {
1212 let lf = self.execute_from_statement(table_with_joins)?;
1213 match alias {
1214 Some(a) => Ok((a.name.value.clone(), lf)),
1215 None => Ok(("".to_string(), lf)),
1216 }
1217 },
1218 _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
1220 }
1221 }
1222
1223 fn execute_table_function(
1224 &mut self,
1225 name: &ObjectName,
1226 alias: &Option<TableAlias>,
1227 args: &[FunctionArg],
1228 ) -> PolarsResult<(String, LazyFrame)> {
1229 let tbl_fn = name.0.first().unwrap().value.as_str();
1230 let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1231 let (tbl_name, lf) = read_fn.execute(args)?;
1232 #[allow(clippy::useless_asref)]
1233 let tbl_name = alias
1234 .as_ref()
1235 .map(|a| a.name.value.clone())
1236 .unwrap_or_else(|| tbl_name.to_str().to_string());
1237
1238 self.table_map.insert(tbl_name.clone(), lf.clone());
1239 Ok((tbl_name, lf))
1240 }
1241
1242 fn process_order_by(
1243 &mut self,
1244 mut lf: LazyFrame,
1245 order_by: &Option<OrderBy>,
1246 selected: Option<&[Expr]>,
1247 ) -> PolarsResult<LazyFrame> {
1248 if order_by.as_ref().is_none_or(|ob| ob.exprs.is_empty()) {
1249 return Ok(lf);
1250 }
1251 let schema = self.get_frame_schema(&mut lf)?;
1252 let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1253
1254 let order_by = order_by.as_ref().unwrap().exprs.clone();
1255 let mut descending = Vec::with_capacity(order_by.len());
1256 let mut nulls_last = Vec::with_capacity(order_by.len());
1257 let mut by: Vec<Expr> = Vec::with_capacity(order_by.len());
1258
1259 if order_by.len() == 1 && matches!(&order_by[0].expr, SQLExpr::Identifier(ident) if ident.value.to_uppercase() == "ALL" && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1261 {
1262 if let Some(selected) = selected {
1263 by.extend(selected.iter().cloned());
1264 } else {
1265 by.extend(columns_iter);
1266 };
1267 let desc_order = !order_by[0].asc.unwrap_or(true);
1268 nulls_last.resize(by.len(), !order_by[0].nulls_first.unwrap_or(desc_order));
1269 descending.resize(by.len(), desc_order);
1270 } else {
1271 let columns = &columns_iter.collect::<Vec<_>>();
1272 for ob in order_by {
1273 let desc_order = !ob.asc.unwrap_or(true);
1276 nulls_last.push(!ob.nulls_first.unwrap_or(desc_order));
1277 descending.push(desc_order);
1278
1279 by.push(self.expr_or_ordinal(
1281 &ob.expr,
1282 columns,
1283 selected,
1284 Some(&schema),
1285 "ORDER BY",
1286 )?)
1287 }
1288 }
1289 Ok(lf.sort_by_exprs(
1290 &by,
1291 SortMultipleOptions::default()
1292 .with_order_descending_multi(descending)
1293 .with_nulls_last_multi(nulls_last)
1294 .with_maintain_order(true),
1295 ))
1296 }
1297
    /// Apply a `GROUP BY` to `lf` and project `projections` from the grouped result.
    ///
    /// `lf` is grouped by `group_by_keys`. Projections that contain an aggregation, `len`, or
    /// window expression (and are not themselves group keys), plus function expressions, are
    /// collected into the `.agg(...)` call. A final `select` then builds one output expression
    /// per projection:
    /// - struct-field overrides are used as-is;
    /// - group keys and recorded aliases use their original expression;
    /// - everything else is referenced by output name via `col(name)`.
    ///
    /// # Errors
    /// Returns an `SQLSyntax` error if one of the following holds:
    /// - a bare column or struct-field projection is neither a group key nor aggregated;
    /// - a projection uses an expression kind this function does not handle.
    ///
    /// Schema resolution errors are also propagated.
    fn process_group_by(
        &mut self,
        mut lf: LazyFrame,
        group_by_keys: &[Expr],
        projections: &[Expr],
    ) -> PolarsResult<LazyFrame> {
        let schema_before = self.get_frame_schema(&mut lf)?;
        let group_by_keys_schema =
            expressions_to_schema(group_by_keys, &schema_before, Context::Default)?;

        // Expressions to evaluate inside `.agg(...)`.
        let mut aggregation_projection = Vec::with_capacity(projections.len());
        // Alias name -> replacement expression used verbatim in the final projection.
        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
        // Aliases whose original projection expression is re-emitted in the final select.
        let mut projection_aliases = PlHashSet::new();
        // Aliases placed on simple (column) projections.
        let mut group_key_aliases = PlHashSet::new();

        for mut e in projections {
            // True if any sub-expression is an aggregation, `len`, or window expression.
            let is_agg_or_window = has_expr(e, |e| {
                matches!(e, Expr::Agg(_) | Expr::Len | Expr::Window { .. })
            });

            let mut is_function_under_alias = false;

            if let Expr::Alias(expr, alias) = e {
                // Aliased simple projection: remember the alias and continue with the
                // un-aliased inner expression.
                if e.clone().meta().is_simple_projection(Some(&schema_before)) {
                    group_key_aliases.insert(alias.as_ref());
                    e = expr
                } else if let Expr::Function {
                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
                    ..
                } = expr.deref()
                {
                    // Aliased struct-field access: the final projection uses `col(name)`
                    // under the alias instead of the original expression.
                    projection_overrides
                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
                } else if let Expr::Function { .. } = expr.deref() {
                    is_function_under_alias = true;
                } else if !is_agg_or_window && !group_by_keys_schema.contains(alias) {
                    projection_aliases.insert(alias.as_ref());
                }
            }
            let field = e.to_field(&schema_before, Context::Default)?;
            if group_by_keys_schema.get(&field.name).is_none() && is_agg_or_window {
                let mut e = e.clone();
                // A top-level `implode` (possibly under an alias) is unwrapped before being
                // passed to `.agg(...)`. NOTE(review): presumably because `agg` already
                // collects non-aggregated values per group — confirm.
                if let Expr::Agg(AggExpr::Implode(expr)) = &e {
                    e = (**expr).clone();
                } else if let Expr::Alias(expr, name) = &e {
                    if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
                        e = (**expr).clone().alias(name.clone());
                    }
                }
                aggregation_projection.push(e);
            } else if let Expr::Column(_)
            | Expr::Function {
                function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
                ..
            } = e
            {
                // Bare columns / struct fields must be group keys.
                if !group_by_keys_schema.contains(&field.name) {
                    polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
                }
            } else if is_function_under_alias || matches!(e, Expr::Function { .. }) {
                aggregation_projection.push(e.clone());
            } else if let Expr::Literal { .. }
            | Expr::Cast { .. }
            | Expr::Ternary { .. }
            | Expr::Field { .. }
            | Expr::Alias { .. } = e
            {
                // Nothing is added to the aggregation for these kinds; they are left to the
                // final projection below.
            } else {
                polars_bail!(SQLSyntax: "Unsupported operation in the GROUP BY clause: {}", e);
            }
        }
        let aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
        let projection_schema =
            expressions_to_schema(projections, &schema_before, Context::Default)?;

        // Pair each output name with its projection to build the final select.
        // NOTE(review): this zip assumes `projection_schema` has exactly one entry per
        // projection; duplicate output names could misalign it — confirm.
        let final_projection = projection_schema
            .iter_names()
            .zip(projections)
            .map(|(name, projection_expr)| {
                if let Some(expr) = projection_overrides.get(name.as_str()) {
                    expr.clone()
                } else if group_by_keys_schema.get(name).is_some()
                    || projection_aliases.contains(name.as_str())
                    || group_key_aliases.contains(name.as_str())
                {
                    projection_expr.clone()
                } else {
                    col(name.clone())
                }
            })
            .collect::<Vec<_>>();

        Ok(aggregated.select(&final_projection))
    }
1398
1399 fn process_limit_offset(
1400 &self,
1401 lf: LazyFrame,
1402 limit: &Option<SQLExpr>,
1403 offset: &Option<Offset>,
1404 ) -> PolarsResult<LazyFrame> {
1405 match (offset, limit) {
1406 (
1407 Some(Offset {
1408 value: SQLExpr::Value(SQLValue::Number(offset, _)),
1409 ..
1410 }),
1411 Some(SQLExpr::Value(SQLValue::Number(limit, _))),
1412 ) => Ok(lf.slice(
1413 offset
1414 .parse()
1415 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1416 limit
1417 .parse()
1418 .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1419 )),
1420 (
1421 Some(Offset {
1422 value: SQLExpr::Value(SQLValue::Number(offset, _)),
1423 ..
1424 }),
1425 None,
1426 ) => Ok(lf.slice(
1427 offset
1428 .parse()
1429 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1430 IdxSize::MAX,
1431 )),
1432 (None, Some(SQLExpr::Value(SQLValue::Number(limit, _)))) => Ok(lf.limit(
1433 limit
1434 .parse()
1435 .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1436 )),
1437 (None, None) => Ok(lf),
1438 _ => polars_bail!(
1439 SQLSyntax: "non-numeric arguments for LIMIT/OFFSET are not supported",
1440 ),
1441 }
1442 }
1443
1444 fn process_qualified_wildcard(
1445 &mut self,
1446 ObjectName(idents): &ObjectName,
1447 options: &WildcardAdditionalOptions,
1448 modifiers: &mut SelectModifiers,
1449 schema: Option<&Schema>,
1450 ) -> PolarsResult<Vec<Expr>> {
1451 let mut new_idents = idents.clone();
1452 new_idents.push(Ident::new("*"));
1453
1454 let expr = resolve_compound_identifier(self, new_idents.deref(), schema);
1455 self.process_wildcard_additional_options(expr?, options, modifiers, schema)
1456 }
1457
1458 fn process_wildcard_additional_options(
1459 &mut self,
1460 exprs: Vec<Expr>,
1461 options: &WildcardAdditionalOptions,
1462 modifiers: &mut SelectModifiers,
1463 schema: Option<&Schema>,
1464 ) -> PolarsResult<Vec<Expr>> {
1465 if options.opt_except.is_some() && options.opt_exclude.is_some() {
1466 polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
1467 } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
1468 polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
1469 }
1470
1471 if let Some(items) = &options.opt_exclude {
1473 match items {
1474 ExcludeSelectItem::Single(ident) => {
1475 modifiers.exclude.insert(ident.value.clone());
1476 },
1477 ExcludeSelectItem::Multiple(idents) => {
1478 modifiers
1479 .exclude
1480 .extend(idents.iter().map(|i| i.value.clone()));
1481 },
1482 };
1483 }
1484
1485 if let Some(items) = &options.opt_except {
1487 modifiers.exclude.insert(items.first_element.value.clone());
1488 modifiers
1489 .exclude
1490 .extend(items.additional_elements.iter().map(|i| i.value.clone()));
1491 }
1492
1493 if let Some(item) = &options.opt_ilike {
1495 let rx = regex::escape(item.pattern.as_str())
1496 .replace('%', ".*")
1497 .replace('_', ".");
1498
1499 modifiers.ilike = Some(
1500 polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
1501 );
1502 }
1503
1504 if let Some(items) = &options.opt_rename {
1506 let renames = match items {
1507 RenameSelectItem::Single(rename) => vec![rename],
1508 RenameSelectItem::Multiple(renames) => renames.iter().collect(),
1509 };
1510 for rn in renames {
1511 let (before, after) = (rn.ident.value.as_str(), rn.alias.value.as_str());
1512 let (before, after) = (PlSmallStr::from_str(before), PlSmallStr::from_str(after));
1513 if before != after {
1514 modifiers.rename.insert(before, after);
1515 }
1516 }
1517 }
1518
1519 if let Some(replacements) = &options.opt_replace {
1521 for rp in &replacements.items {
1522 let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
1523 modifiers
1524 .replace
1525 .push(replacement_expr?.alias(rp.column_name.value.as_str()));
1526 }
1527 }
1528 Ok(exprs)
1529 }
1530
1531 fn rename_columns_from_table_alias(
1532 &mut self,
1533 mut lf: LazyFrame,
1534 alias: &TableAlias,
1535 ) -> PolarsResult<LazyFrame> {
1536 if alias.columns.is_empty() {
1537 Ok(lf)
1538 } else {
1539 let schema = self.get_frame_schema(&mut lf)?;
1540 if alias.columns.len() != schema.len() {
1541 polars_bail!(
1542 SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
1543 alias.columns.len(), alias.name.value, schema.len()
1544 )
1545 } else {
1546 let existing_columns: Vec<_> = schema.iter_names().collect();
1547 let new_columns: Vec<_> =
1548 alias.columns.iter().map(|c| c.name.value.clone()).collect();
1549 Ok(lf.rename(existing_columns, new_columns, true))
1550 }
1551 }
1552 }
1553}
1554
1555impl SQLContext {
1556 pub fn get_table_map(&self) -> PlHashMap<String, LazyFrame> {
1558 self.table_map.clone()
1559 }
1560
1561 pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
1563 Self {
1564 table_map,
1565 ..Default::default()
1566 }
1567 }
1568}
1569
1570fn collect_compound_identifiers(
1571 left: &[Ident],
1572 right: &[Ident],
1573 left_name: &str,
1574 right_name: &str,
1575) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1576 if left.len() == 2 && right.len() == 2 {
1577 let (tbl_a, col_name_a) = (left[0].value.as_str(), left[1].value.as_str());
1578 let (tbl_b, col_name_b) = (right[0].value.as_str(), right[1].value.as_str());
1579
1580 if left_name == tbl_b || right_name == tbl_a {
1582 Ok((vec![col(col_name_b)], vec![col(col_name_a)]))
1583 } else {
1584 Ok((vec![col(col_name_a)], vec![col(col_name_b)]))
1585 }
1586 } else {
1587 polars_bail!(SQLInterface: "collect_compound_identifiers: Expected left.len() == 2 && right.len() == 2, but found left.len() == {:?}, right.len() == {:?}", left.len(), right.len());
1588 }
1589}
1590
1591fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
1592 match expr {
1593 Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
1594 let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
1595 schema
1596 .iter_names()
1597 .filter(|name| re.is_match(name))
1598 .map(|name| col(name.clone()))
1599 .collect::<Vec<_>>()
1600 },
1601 Expr::Selector(s) => s
1602 .into_columns(schema, &Default::default())
1603 .unwrap()
1604 .into_iter()
1605 .map(col)
1606 .collect::<Vec<_>>(),
1607 _ => vec![expr],
1608 }
1609}
1610
/// Return `true` if a column name denotes a regex pattern, i.e. it begins with `^` and ends
/// with `$`.
fn is_regex_colname(nm: &str) -> bool {
    let bytes = nm.as_bytes();
    bytes.first() == Some(&b'^') && bytes.last() == Some(&b'$')
}
1614
1615fn process_join_on(
1616 expression: &sqlparser::ast::Expr,
1617 tbl_left: &TableInfo,
1618 tbl_right: &TableInfo,
1619) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1620 match expression {
1621 SQLExpr::BinaryOp { left, op, right } => match op {
1622 BinaryOperator::And => {
1623 let (mut left_i, mut right_i) = process_join_on(left, tbl_left, tbl_right)?;
1624 let (mut left_j, mut right_j) = process_join_on(right, tbl_left, tbl_right)?;
1625 left_i.append(&mut left_j);
1626 right_i.append(&mut right_j);
1627 Ok((left_i, right_i))
1628 },
1629 BinaryOperator::Eq => match (left.as_ref(), right.as_ref()) {
1630 (SQLExpr::CompoundIdentifier(left), SQLExpr::CompoundIdentifier(right)) => {
1631 collect_compound_identifiers(left, right, &tbl_left.name, &tbl_right.name)
1632 },
1633 _ => {
1634 polars_bail!(SQLInterface: "only equi-join constraints (on identifiers) are currently supported; found lhs={:?}, rhs={:?}", left, right)
1635 },
1636 },
1637 _ => {
1638 polars_bail!(SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op)
1639 },
1640 },
1641 SQLExpr::Nested(expr) => process_join_on(expr, tbl_left, tbl_right),
1642 _ => {
1643 polars_bail!(SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", expression)
1644 },
1645 }
1646}
1647
1648fn process_join_constraint(
1649 constraint: &JoinConstraint,
1650 tbl_left: &TableInfo,
1651 tbl_right: &TableInfo,
1652) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1653 match constraint {
1654 JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
1655 process_join_on(expr, tbl_left, tbl_right)
1656 },
1657 JoinConstraint::Using(idents) if !idents.is_empty() => {
1658 let using: Vec<Expr> = idents.iter().map(|id| col(id.value.as_str())).collect();
1659 Ok((using.clone(), using))
1660 },
1661 JoinConstraint::Natural => {
1662 let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
1663 let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
1664 let on: Vec<Expr> = left_names
1665 .intersection(&right_names)
1666 .map(|&name| col(name.clone()))
1667 .collect();
1668 if on.is_empty() {
1669 polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
1670 }
1671 Ok((on.clone(), on))
1672 },
1673 _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
1674 }
1675}
1676
bitflags::bitflags! {
    /// Classifies how an expression relates to the height (row count) of a SQL projection.
    ///
    /// `identify_from_expr` always returns exactly one of the flags below.
    #[derive(PartialEq)]
    struct ExprSqlProjectionHeightBehavior: u8 {
        /// The expression references a column (or selector) and contains none of the
        /// height-changing nodes listed under `Independent`.
        const MaintainsColumn = 1 << 0;
        /// The expression contains at least one node treated as height-changing:
        /// - an aggregation or `len`;
        /// - explode / filter / gather / slice;
        /// - a non-scalar literal;
        /// - an anonymous function that returns a scalar or is not length-preserving.
        const Independent = 1 << 1;
        /// The expression references no column and has no height-changing node
        /// (e.g. a scalar literal). NOTE(review): presumably its height is then taken
        /// from the surrounding projection — confirm against callers.
        const InheritsContext = 1 << 2;
    }
}
1695
1696impl ExprSqlProjectionHeightBehavior {
1697 fn identify_from_expr(expr: &Expr) -> Self {
1698 let mut has_column = false;
1699 let mut has_independent = false;
1700
1701 for e in expr.into_iter() {
1702 use Expr::*;
1703
1704 has_column |= matches!(e, Column(_) | Selector(_));
1705
1706 has_independent |= match e {
1707 AnonymousFunction { options, .. } => {
1709 options.returns_scalar() || !options.is_length_preserving()
1710 },
1711
1712 Literal(v) => !v.is_scalar(),
1713
1714 Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
1715
1716 Agg { .. } | Len => true,
1717
1718 _ => false,
1719 }
1720 }
1721
1722 if has_independent {
1723 Self::Independent
1724 } else if has_column {
1725 Self::MaintainsColumn
1726 } else {
1727 Self::InheritsContext
1728 }
1729 }
1730}