1use std::ops::Deref;
2
3use polars_core::frame::row::Row;
4use polars_core::prelude::*;
5use polars_lazy::prelude::*;
6use polars_ops::frame::JoinCoalesce;
7use polars_plan::dsl::function_expr::StructFunction;
8use polars_plan::prelude::*;
9use polars_utils::format_pl_smallstr;
10use sqlparser::ast::{
11 BinaryOperator, CreateTable, Delete, Distinct, ExcludeSelectItem, Expr as SQLExpr, FromTable,
12 FunctionArg, GroupByExpr, Ident, JoinConstraint, JoinOperator, ObjectName, ObjectType, Offset,
13 OrderBy, Query, RenameSelectItem, Select, SelectItem, SetExpr, SetOperator, SetQuantifier,
14 Statement, TableAlias, TableFactor, TableWithJoins, UnaryOperator, Value as SQLValue, Values,
15 WildcardAdditionalOptions,
16};
17use sqlparser::dialect::GenericDialect;
18use sqlparser::parser::{Parser, ParserOptions};
19
20use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
21use crate::sql_expr::{
22 parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
23};
24use crate::table_functions::PolarsTableFunctions;
25
/// A table taking part in a query: its lazy frame together with the name and
/// schema it is known by (built for each side of a join before the join
/// constraint is processed).
#[derive(Clone)]
pub struct TableInfo {
    /// Lazy frame holding the table's data.
    pub(crate) frame: LazyFrame,
    /// Name (or alias) under which the table is referenced.
    pub(crate) name: PlSmallStr,
    /// Schema associated with `frame`.
    pub(crate) schema: Arc<Schema>,
}
32
/// Modifiers collected while processing the projection of a `SELECT`
/// (populated by the wildcard-handling helpers, applied in `execute_select`).
struct SelectModifiers {
    /// Output column names that must be dropped from the final projection.
    exclude: PlHashSet<String>,
    /// Optional pattern that output column names must match to be retained.
    ilike: Option<regex::Regex>,
    /// Column renames, keyed by the existing name and mapping to the new name.
    rename: PlHashMap<PlSmallStr, PlSmallStr>,
    /// Expressions applied via `with_columns` before the final projection.
    replace: Vec<Expr>,
}
39impl SelectModifiers {
40 fn matches_ilike(&self, s: &str) -> bool {
41 match &self.ilike {
42 Some(rx) => rx.is_match(s),
43 None => true,
44 }
45 }
46 fn renamed_cols(&self) -> Vec<Expr> {
47 self.rename
48 .iter()
49 .map(|(before, after)| col(before.clone()).alias(after.clone()))
50 .collect()
51 }
52}
53
/// The SQL execution context: holds the registered tables and the transient
/// state needed to translate a SQL statement into a `LazyFrame`.
#[derive(Clone)]
pub struct SQLContext {
    /// Registered tables, keyed by name.
    pub(crate) table_map: PlHashMap<String, LazyFrame>,
    /// Registry used to look up functions available to queries.
    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
    /// Logical-plan arena used when resolving frame schemas; handed over to
    /// the resulting frame at the end of `execute`.
    pub(crate) lp_arena: Arena<IR>,
    /// Expression arena used alongside `lp_arena`.
    pub(crate) expr_arena: Arena<AExpr>,

    /// Common table expressions registered for the current statement, keyed by name.
    cte_map: PlHashMap<String, LazyFrame>,
    /// Maps a table alias to the name of the table it refers to.
    table_aliases: PlHashMap<String, String>,
    /// For each joined (right-hand) table name, maps an original column name
    /// to the suffixed name (`"{column}:{table}"`) it received in the join output.
    joined_aliases: PlHashMap<String, PlHashMap<String, String>>,
}
66
67impl Default for SQLContext {
68 fn default() -> Self {
69 Self {
70 function_registry: Arc::new(DefaultFunctionRegistry {}),
71 table_map: Default::default(),
72 cte_map: Default::default(),
73 table_aliases: Default::default(),
74 joined_aliases: Default::default(),
75 lp_arena: Default::default(),
76 expr_arena: Default::default(),
77 }
78 }
79}
80
81impl SQLContext {
82 pub fn new() -> Self {
90 Self::default()
91 }
92
93 pub fn get_tables(&self) -> Vec<String> {
95 let mut tables = Vec::from_iter(self.table_map.keys().cloned());
96 tables.sort_unstable();
97 tables
98 }
99
100 pub fn register(&mut self, name: &str, lf: LazyFrame) {
116 self.table_map.insert(name.to_owned(), lf);
117 }
118
119 pub fn unregister(&mut self, name: &str) {
121 self.table_map.remove(&name.to_owned());
122 }
123
124 pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
143 let mut parser = Parser::new(&GenericDialect);
144 parser = parser.with_options(ParserOptions {
145 trailing_commas: true,
146 ..Default::default()
147 });
148
149 let ast = parser
150 .try_with_sql(query)
151 .map_err(to_sql_interface_err)?
152 .parse_statements()
153 .map_err(to_sql_interface_err)?;
154
155 polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
156 let res = self.execute_statement(ast.first().unwrap())?;
157
158 let lp_arena = std::mem::take(&mut self.lp_arena);
161 let expr_arena = std::mem::take(&mut self.expr_arena);
162 res.set_cached_arena(lp_arena, expr_arena);
163
164 self.cte_map.clear();
166 self.table_aliases.clear();
167 self.joined_aliases.clear();
168
169 Ok(res)
170 }
171
172 pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
175 self.function_registry = function_registry;
176 self
177 }
178
179 pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
181 &self.function_registry
182 }
183
184 pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
186 Arc::get_mut(&mut self.function_registry).unwrap()
187 }
188}
189
190impl SQLContext {
191 pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
192 let ast = stmt;
193 Ok(match ast {
194 Statement::Query(query) => self.execute_query(query)?,
195 stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
196 stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
197 stmt @ Statement::Drop {
198 object_type: ObjectType::Table,
199 ..
200 } => self.execute_drop_table(stmt)?,
201 stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
202 stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
203 stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
204 _ => polars_bail!(
205 SQLInterface: "statement type is not supported:\n{:?}", ast,
206 ),
207 })
208 }
209
210 pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
211 self.register_ctes(query)?;
212 self.execute_query_no_ctes(query)
213 }
214
215 pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
216 let lf = self.process_query(&query.body, query)?;
217 self.process_limit_offset(lf, &query.limit, &query.offset)
218 }
219
220 pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
221 frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
222 }
223
224 pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
225 self.table_map
229 .get(name)
230 .or_else(|| self.cte_map.get(name))
231 .or_else(|| {
232 self.table_aliases.get(name).and_then(|alias| {
233 self.table_map
234 .get(alias.as_str())
235 .or_else(|| self.cte_map.get(alias.as_str()))
236 })
237 })
238 .cloned()
239 }
240
241 pub(crate) fn execute_isolated<F>(&mut self, query: F) -> PolarsResult<(LazyFrame, SchemaRef)>
245 where
246 F: FnOnce(&mut Self) -> PolarsResult<LazyFrame>,
247 {
248 let (joined_aliases, table_aliases, lp_arena, expr_arena, table_map) = (
250 std::mem::take(&mut self.joined_aliases),
252 std::mem::take(&mut self.table_aliases),
253 std::mem::take(&mut self.lp_arena),
254 std::mem::take(&mut self.expr_arena),
255 self.table_map.clone(),
257 );
258
259 let mut lf = query(self)?;
261 let schema = self.get_frame_schema(&mut lf)?;
262
263 lf.set_cached_arena(
265 std::mem::replace(&mut self.lp_arena, lp_arena),
266 std::mem::replace(&mut self.expr_arena, expr_arena),
267 );
268 self.joined_aliases = joined_aliases;
269 self.table_aliases = table_aliases;
270 self.table_map = table_map;
271
272 Ok((lf, schema))
273 }
274
275 fn expr_or_ordinal(
276 &mut self,
277 e: &SQLExpr,
278 exprs: &[Expr],
279 selected: Option<&[Expr]>,
280 schema: Option<&Schema>,
281 clause: &str,
282 ) -> PolarsResult<Expr> {
283 match e {
284 SQLExpr::UnaryOp {
285 op: UnaryOperator::Minus,
286 expr,
287 } if matches!(**expr, SQLExpr::Value(SQLValue::Number(_, _))) => {
288 if let SQLExpr::Value(SQLValue::Number(ref idx, _)) = **expr {
289 Err(polars_err!(
290 SQLSyntax:
291 "negative ordinal values are invalid for {}; found -{}",
292 clause,
293 idx
294 ))
295 } else {
296 unreachable!()
297 }
298 },
299 SQLExpr::Value(SQLValue::Number(idx, _)) => {
300 let idx = idx.parse::<usize>().map_err(|_| {
302 polars_err!(
303 SQLSyntax:
304 "negative ordinal values are invalid for {}; found {}",
305 clause,
306 idx
307 )
308 })?;
309 let cols = if let Some(cols) = selected {
312 cols
313 } else {
314 exprs
315 };
316 Ok(cols
317 .get(idx - 1)
318 .ok_or_else(|| {
319 polars_err!(
320 SQLInterface:
321 "{} ordinal value must refer to a valid column; found {}",
322 clause,
323 idx
324 )
325 })?
326 .clone())
327 },
328 SQLExpr::Value(v) => Err(polars_err!(
329 SQLSyntax:
330 "{} requires a valid expression or positive ordinal; found {}", clause, v,
331 )),
332 _ => parse_sql_expr(e, self, schema),
333 }
334 }
335
336 pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
337 if let Some(aliases) = self.joined_aliases.get(tbl_name) {
338 if let Some(name) = aliases.get(column_name) {
339 return name.to_string();
340 }
341 }
342 column_name.to_string()
343 }
344
345 fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
346 match expr {
347 SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
348 SetExpr::Query(query) => self.execute_query_no_ctes(query),
349 SetExpr::SetOperation {
350 op: SetOperator::Union,
351 set_quantifier,
352 left,
353 right,
354 } => self.process_union(left, right, set_quantifier, query),
355
356 #[cfg(feature = "semi_anti_join")]
357 SetExpr::SetOperation {
358 op: SetOperator::Intersect | SetOperator::Except,
359 set_quantifier,
360 left,
361 right,
362 } => self.process_except_intersect(left, right, set_quantifier, query),
363
364 SetExpr::Values(Values {
365 explicit_row: _,
366 rows,
367 }) => self.process_values(rows),
368
369 SetExpr::Table(tbl) => {
370 if tbl.table_name.is_some() {
371 let table_name = tbl.table_name.as_ref().unwrap();
372 self.get_table_from_current_scope(table_name)
373 .ok_or_else(|| {
374 polars_err!(
375 SQLInterface: "no table or alias named '{}' found",
376 tbl
377 )
378 })
379 } else {
380 polars_bail!(SQLInterface: "'TABLE' requires valid table name")
381 }
382 },
383 op => {
384 polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
385 },
386 }
387 }
388
/// Execute an EXCEPT (anti join) or INTERSECT (semi join) set operation and
/// return the de-duplicated result.
///
/// With `BY NAME` the join keys are the left frame's column names on both
/// sides; otherwise columns are matched positionally and both sides must
/// have the same number of columns.
///
/// NOTE(review): the operator is read from `query.body` rather than from the
/// set operation being processed, so a nested EXCEPT/INTERSECT may be
/// treated as the outermost operator — confirm against callers.
#[cfg(feature = "semi_anti_join")]
fn process_except_intersect(
    &mut self,
    left: &SetExpr,
    right: &SetExpr,
    quantifier: &SetQuantifier,
    query: &Query,
) -> PolarsResult<LazyFrame> {
    let is_except = matches!(
        *query.body,
        SetExpr::SetOperation {
            op: SetOperator::Except,
            ..
        }
    );
    let (join_type, op_name) = if is_except {
        (JoinType::Anti, "EXCEPT")
    } else {
        (JoinType::Semi, "INTERSECT")
    };

    let mut lhs = self.process_query(left, query)?;
    let mut rhs = self.process_query(right, query)?;
    let lhs_schema = self.get_frame_schema(&mut lhs)?;
    let lhs_keys: Vec<_> = lhs_schema.iter_names_cloned().map(col).collect();

    // `None` means "join on the left-hand names" (BY NAME); otherwise the
    // right-hand columns are paired with the left-hand ones by position.
    let rhs_keys = match quantifier {
        SetQuantifier::ByName => None,
        SetQuantifier::Distinct | SetQuantifier::None => {
            let rhs_schema = self.get_frame_schema(&mut rhs)?;
            let positional: Vec<_> = rhs_schema.iter_names_cloned().map(col).collect();
            if lhs_keys.len() != positional.len() {
                polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
            }
            Some(positional)
        },
        _ => {
            polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
        },
    };

    let builder = lhs
        .join_builder()
        .with(rhs)
        .how(join_type)
        .join_nulls(true);
    let joined = if let Some(rhs_keys) = rhs_keys {
        builder.left_on(lhs_keys).right_on(rhs_keys).finish()
    } else {
        builder.on(lhs_keys).finish()
    };
    Ok(joined.unique(None, UniqueKeepStrategy::Any))
}
430
431 fn process_union(
432 &mut self,
433 left: &SetExpr,
434 right: &SetExpr,
435 quantifier: &SetQuantifier,
436 query: &Query,
437 ) -> PolarsResult<LazyFrame> {
438 let mut lf = self.process_query(left, query)?;
439 let mut rf = self.process_query(right, query)?;
440 let opts = UnionArgs {
441 parallel: true,
442 to_supertypes: true,
443 ..Default::default()
444 };
445 match quantifier {
446 SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
448 let lf_schema = self.get_frame_schema(&mut lf)?;
449 let rf_schema = self.get_frame_schema(&mut rf)?;
450 if lf_schema.len() != rf_schema.len() {
451 polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
452 }
453 let concatenated = polars_lazy::dsl::concat(vec![lf, rf], opts);
454 match quantifier {
455 SetQuantifier::Distinct | SetQuantifier::None => {
456 concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
457 },
458 _ => concatenated,
459 }
460 },
461 #[cfg(feature = "diagonal_concat")]
463 SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
464 #[cfg(feature = "diagonal_concat")]
466 SetQuantifier::ByName | SetQuantifier::DistinctByName => {
467 let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
468 concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
469 },
470 #[allow(unreachable_patterns)]
471 _ => polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier),
472 }
473 }
474
475 fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
476 let frame_rows: Vec<Row> = values.iter().map(|row| {
477 let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
478 let expr = parse_sql_expr(expr, self, None)?;
479 match expr {
480 Expr::Literal(value) => {
481 value.to_any_value()
482 .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
483 .map(|av| av.into_static())
484 },
485 _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
486 }
487 }).collect();
488 row_data.map(Row::new)
489 }).collect::<Result<_, _>>()?;
490
491 Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
492 }
493
494 fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
496 match stmt {
497 Statement::Explain { statement, .. } => {
498 let lf = self.execute_statement(statement)?;
499 let plan = lf.describe_optimized_plan()?;
500 let plan = plan
501 .split('\n')
502 .collect::<Series>()
503 .with_name(PlSmallStr::from_static("Logical Plan"))
504 .into_column();
505 let df = DataFrame::new(vec![plan])?;
506 Ok(df.lazy())
507 },
508 _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
509 }
510 }
511
512 fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
514 let tables = Column::new("name".into(), self.get_tables());
515 let df = DataFrame::new(vec![tables])?;
516 Ok(df.lazy())
517 }
518
519 fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
521 match stmt {
522 Statement::Drop { names, .. } => {
523 names.iter().for_each(|name| {
524 self.table_map.remove(&name.to_string());
525 });
526 Ok(DataFrame::empty().lazy())
527 },
528 _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
529 }
530 }
531
532 fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
534 if let Statement::Delete(Delete {
535 tables,
536 from,
537 using,
538 selection,
539 returning,
540 order_by,
541 limit,
542 }) = stmt
543 {
544 if !tables.is_empty()
545 || using.is_some()
546 || returning.is_some()
547 || limit.is_some()
548 || !order_by.is_empty()
549 {
550 let error_message = match () {
551 _ if !tables.is_empty() => "DELETE expects exactly one table name",
552 _ if using.is_some() => "DELETE does not support the USING clause",
553 _ if returning.is_some() => "DELETE does not support the RETURNING clause",
554 _ if limit.is_some() => "DELETE does not support the LIMIT clause",
555 _ if !order_by.is_empty() => "DELETE does not support the ORDER BY clause",
556 _ => unreachable!(),
557 };
558 polars_bail!(SQLInterface: error_message);
559 }
560 let from_tables = match &from {
561 FromTable::WithFromKeyword(from) => from,
562 FromTable::WithoutKeyword(from) => from,
563 };
564 if from_tables.len() > 1 {
565 polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
566 }
567 let tbl_expr = from_tables.first().unwrap();
568 if !tbl_expr.joins.is_empty() {
569 polars_bail!(SQLInterface: "DELETE does not support table JOINs")
570 }
571 let (_, mut lf) = self.get_table(&tbl_expr.relation)?;
572 if selection.is_none() {
573 Ok(DataFrame::empty_with_schema(
575 lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
576 .unwrap()
577 .as_ref(),
578 )
579 .lazy())
580 } else {
581 Ok(self.process_where(lf.clone(), selection, true, None)?)
583 }
584 } else {
585 polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
586 }
587 }
588
589 fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
591 if let Statement::Truncate {
592 table_names,
593 partitions,
594 ..
595 } = stmt
596 {
597 match partitions {
598 None => {
599 if table_names.len() != 1 {
600 polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
601 }
602 let tbl = table_names[0].to_string();
603 if let Some(lf) = self.table_map.get_mut(&tbl) {
604 *lf = DataFrame::empty_with_schema(
605 lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
606 .unwrap()
607 .as_ref(),
608 )
609 .lazy();
610 Ok(lf.clone())
611 } else {
612 polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
613 }
614 },
615 _ => {
616 polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
617 },
618 }
619 } else {
620 polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
621 }
622 }
623
624 fn register_cte(&mut self, name: &str, lf: LazyFrame) {
625 self.cte_map.insert(name.to_owned(), lf);
626 }
627
628 fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
629 if let Some(with) = &query.with {
630 if with.recursive {
631 polars_bail!(SQLInterface: "recursive CTEs are not supported")
632 }
633 for cte in &with.cte_tables {
634 let cte_name = cte.alias.name.value.clone();
635 let mut lf = self.execute_query(&cte.query)?;
636 lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
637 self.register_cte(&cte_name, lf);
638 }
639 }
640 Ok(())
641 }
642
/// Resolve a `FROM` item — a relation plus any chained joins — into a
/// single frame.
///
/// Joins are applied left to right onto the accumulated frame. After each
/// join, the right-hand columns that clash with a left-hand column and
/// appear in the join output under the `"{column}:{table}"` name are
/// recorded in `joined_aliases` under the right-hand table's name.
///
/// Returns an error if a relation cannot be resolved, a joined relation has
/// no name, or the join type is unsupported.
fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
    let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
    if !tbl_expr.joins.is_empty() {
        for join in &tbl_expr.joins {
            let (r_name, mut rf) = self.get_table(&join.relation)?;
            // The right-hand name is needed for the column suffix and alias map.
            if r_name.is_empty() {
                polars_bail!(
                    SQLInterface:
                    "cannot join on unnamed relation; please provide an alias"
                )
            }
            let left_schema = self.get_frame_schema(&mut lf)?;
            let right_schema = self.get_frame_schema(&mut rf)?;

            lf = match &join.join_operator {
                // All join operators that carry a join constraint.
                op @ (JoinOperator::FullOuter(constraint)
                | JoinOperator::LeftOuter(constraint)
                | JoinOperator::RightOuter(constraint)
                | JoinOperator::Inner(constraint)
                | JoinOperator::Anti(constraint)
                | JoinOperator::Semi(constraint)
                | JoinOperator::LeftAnti(constraint)
                | JoinOperator::LeftSemi(constraint)
                | JoinOperator::RightAnti(constraint)
                | JoinOperator::RightSemi(constraint)) => {
                    // Right anti/semi joins are expressed by swapping the frames.
                    // NOTE(review): only the frames are swapped; the names and
                    // schemas passed below stay in their original positions —
                    // confirm this is what `process_join_constraint` expects.
                    let (lf, rf) = match op {
                        JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
                        _ => (lf, rf),
                    };
                    self.process_join(
                        &TableInfo {
                            frame: lf,
                            name: (&l_name).into(),
                            schema: left_schema.clone(),
                        },
                        &TableInfo {
                            frame: rf,
                            name: (&r_name).into(),
                            schema: right_schema.clone(),
                        },
                        constraint,
                        // Map the SQL join operator onto the polars join type;
                        // anti/semi joins need the `semi_anti_join` feature.
                        match op {
                            JoinOperator::FullOuter(_) => JoinType::Full,
                            JoinOperator::LeftOuter(_) => JoinType::Left,
                            JoinOperator::RightOuter(_) => JoinType::Right,
                            JoinOperator::Inner(_) => JoinType::Inner,
                            #[cfg(feature = "semi_anti_join")]
                            JoinOperator::Anti(_)
                            | JoinOperator::LeftAnti(_)
                            | JoinOperator::RightAnti(_) => JoinType::Anti,
                            #[cfg(feature = "semi_anti_join")]
                            JoinOperator::Semi(_)
                            | JoinOperator::LeftSemi(_)
                            | JoinOperator::RightSemi(_) => JoinType::Semi,
                            join_type => polars_bail!(
                                SQLInterface:
                                "join type '{:?}' not currently supported",
                                join_type
                            ),
                        },
                    )?
                },
                // Cross joins have no constraint; clashing right-hand columns
                // get the ":{table}" suffix.
                JoinOperator::CrossJoin => {
                    lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
                },
                join_type => {
                    polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
                },
            };

            // Schema of the joined result, used to detect suffixed columns.
            let joined_schema = self.get_frame_schema(&mut lf)?;

            // Record, for the right-hand table, every column that also exists
            // on the left and appears suffixed in the join output.
            self.joined_aliases.insert(
                r_name.clone(),
                right_schema
                    .iter_names()
                    .filter_map(|name| {
                        let aliased_name = format!("{name}:{r_name}");
                        if left_schema.contains(name)
                            && joined_schema.contains(aliased_name.as_str())
                        {
                            Some((name.to_string(), aliased_name))
                        } else {
                            None
                        }
                    })
                    .collect::<PlHashMap<String, String>>(),
            );
        }
    };
    Ok(lf)
}
739
740 fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
742 let mut lf = if select_stmt.from.is_empty() {
743 DataFrame::empty().lazy()
744 } else {
745 let from = select_stmt.clone().from;
748 if from.len() > 1 {
749 polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
750 }
751 self.execute_from_statement(from.first().unwrap())?
752 };
753
754 let schema = self.get_frame_schema(&mut lf)?;
756 lf = self.process_where(lf, &select_stmt.selection, false, Some(schema.clone()))?;
757
758 let mut select_modifiers = SelectModifiers {
760 ilike: None,
761 exclude: PlHashSet::new(),
762 rename: PlHashMap::new(),
763 replace: vec![],
764 };
765
766 let projections = self.column_projections(select_stmt, &schema, &mut select_modifiers)?;
767
768 let mut group_by_keys: Vec<Expr> = Vec::new();
770 match &select_stmt.group_by {
771 GroupByExpr::Expressions(group_by_exprs, modifiers) => {
773 if !modifiers.is_empty() {
774 polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
775 }
776 group_by_keys = group_by_exprs
778 .iter()
779 .map(|e| {
780 self.expr_or_ordinal(
781 e,
782 &projections,
783 None,
784 Some(schema.deref()),
785 "GROUP BY",
786 )
787 })
788 .collect::<PolarsResult<_>>()?
789 },
790 GroupByExpr::All(modifiers) => {
793 if !modifiers.is_empty() {
794 polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
795 }
796 projections.iter().for_each(|expr| match expr {
797 Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
799 Expr::Column(_) => group_by_keys.push(expr.clone()),
800 Expr::Alias(e, _)
801 if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
802 Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
803 if let Expr::Column(name) = &**e {
804 group_by_keys.push(col(name.clone()));
805 }
806 },
807 _ => {
808 if !has_expr(expr, |e| {
810 matches!(e, Expr::Agg(_))
811 || matches!(e, Expr::Len)
812 || matches!(e, Expr::Window { .. })
813 }) {
814 group_by_keys.push(expr.clone())
815 }
816 },
817 });
818 },
819 };
820
821 lf = if group_by_keys.is_empty() {
822 if select_stmt.having.is_some() {
824 polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
825 };
826
827 let mut retained_cols = Vec::with_capacity(projections.len());
829 let mut retained_names = Vec::with_capacity(projections.len());
830 let have_order_by = query.order_by.is_some();
831 let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;
833
834 for p in projections.iter() {
839 let name = p.to_field(schema.deref())?.name.to_string();
840 if select_modifiers.matches_ilike(&name)
841 && !select_modifiers.exclude.contains(&name)
842 {
843 projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);
844
845 retained_cols.push(if have_order_by {
846 col(name.as_str())
847 } else {
848 p.clone()
849 });
850 retained_names.push(col(name));
851 }
852 }
853
854 if have_order_by {
856 if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
861 || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
862 {
863 lf = lf.with_columns(projections);
864 } else {
865 const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
875 lf = lf
876 .clone()
877 .select(projections)
878 .with_row_index(NAME, None)
879 .join(
880 lf.with_row_index(NAME, None),
881 [col(NAME)],
882 [col(NAME)],
883 JoinArgs {
884 how: JoinType::Left,
885 validation: Default::default(),
886 suffix: None,
887 slice: None,
888 nulls_equal: false,
889 coalesce: Default::default(),
890 maintain_order: polars_ops::frame::MaintainOrderJoin::Left,
891 },
892 );
893 }
894 }
895
896 if !select_modifiers.replace.is_empty() {
897 lf = lf.with_columns(&select_modifiers.replace);
898 }
899 if !select_modifiers.rename.is_empty() {
900 lf = lf.with_columns(select_modifiers.renamed_cols());
901 }
902
903 lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;
904
905 if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
907 && !have_order_by
908 {
909 lf = lf.with_columns(retained_cols).select(retained_names);
911 } else {
912 lf = lf.select(retained_cols);
913 }
914
915 if !select_modifiers.rename.is_empty() {
916 lf = lf.rename(
917 select_modifiers.rename.keys(),
918 select_modifiers.rename.values(),
919 true,
920 );
921 };
922 lf
923 } else {
924 lf = self.process_group_by(lf, &group_by_keys, &projections)?;
925 lf = self.process_order_by(lf, &query.order_by, None)?;
926
927 let schema = Some(self.get_frame_schema(&mut lf)?);
929 match select_stmt.having.as_ref() {
930 Some(expr) => lf.filter(parse_sql_expr(expr, self, schema.as_deref())?),
931 None => lf,
932 }
933 };
934
935 lf = match &select_stmt.distinct {
937 Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
938 Some(Distinct::On(exprs)) => {
939 let schema = Some(self.get_frame_schema(&mut lf)?);
941 let cols = exprs
942 .iter()
943 .map(|e| {
944 let expr = parse_sql_expr(e, self, schema.as_deref())?;
945 if let Expr::Column(name) = expr {
946 Ok(name)
947 } else {
948 Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
949 }
950 })
951 .collect::<PolarsResult<Vec<_>>>()?;
952
953 lf = self.process_order_by(lf, &query.order_by, None)?;
955 return Ok(lf.unique_stable(
956 Some(Selector::ByName {
957 names: cols.into(),
958 strict: true,
959 }),
960 UniqueKeepStrategy::First,
961 ));
962 },
963 None => lf,
964 };
965 Ok(lf)
966 }
967
968 fn column_projections(
969 &mut self,
970 select_stmt: &Select,
971 schema: &SchemaRef,
972 select_modifiers: &mut SelectModifiers,
973 ) -> PolarsResult<Vec<Expr>> {
974 let parsed_items: PolarsResult<Vec<Vec<Expr>>> = select_stmt
975 .projection
976 .iter()
977 .map(|select_item| match select_item {
978 SelectItem::UnnamedExpr(expr) => {
979 Ok(vec![parse_sql_expr(expr, self, Some(schema))?])
980 },
981 SelectItem::ExprWithAlias { expr, alias } => {
982 let expr = parse_sql_expr(expr, self, Some(schema))?;
983 Ok(vec![expr.alias(PlSmallStr::from_str(alias.value.as_str()))])
984 },
985 SelectItem::QualifiedWildcard(obj_name, wildcard_options) => self
986 .process_qualified_wildcard(
987 obj_name,
988 wildcard_options,
989 select_modifiers,
990 Some(schema),
991 ),
992 SelectItem::Wildcard(wildcard_options) => {
993 let cols = schema
994 .iter_names()
995 .map(|name| col(name.clone()))
996 .collect::<Vec<_>>();
997
998 self.process_wildcard_additional_options(
999 cols,
1000 wildcard_options,
1001 select_modifiers,
1002 Some(schema),
1003 )
1004 },
1005 })
1006 .collect();
1007
1008 let flattened_exprs: Vec<Expr> = parsed_items?
1009 .into_iter()
1010 .flatten()
1011 .flat_map(|expr| expand_exprs(expr, schema))
1012 .collect();
1013
1014 Ok(flattened_exprs)
1015 }
1016
1017 fn process_where(
1018 &mut self,
1019 mut lf: LazyFrame,
1020 expr: &Option<SQLExpr>,
1021 invert_filter: bool,
1022 schema: Option<SchemaRef>,
1023 ) -> PolarsResult<LazyFrame> {
1024 if let Some(expr) = expr {
1025 let schema = match schema {
1026 None => self.get_frame_schema(&mut lf)?,
1027 Some(s) => s,
1028 };
1029
1030 let (all_true, all_false) = match expr {
1032 SQLExpr::Value(SQLValue::Boolean(b)) => (*b, !*b),
1033 SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
1034 (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => (a == b, a != b),
1035 (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
1036 (a != b, a == b)
1037 },
1038 _ => (false, false),
1039 },
1040 _ => (false, false),
1041 };
1042 if (all_true && !invert_filter) || (all_false && invert_filter) {
1043 return Ok(lf);
1044 } else if (all_false && !invert_filter) || (all_true && invert_filter) {
1045 return Ok(DataFrame::empty_with_schema(schema.as_ref()).lazy());
1046 }
1047
1048 let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
1050 if filter_expression.clone().meta().has_multiple_outputs() {
1051 filter_expression = all_horizontal([filter_expression])?;
1052 }
1053 lf = self.process_subqueries(lf, vec![&mut filter_expression]);
1054 lf = if invert_filter {
1055 lf.remove(filter_expression)
1056 } else {
1057 lf.filter(filter_expression)
1058 };
1059 }
1060 Ok(lf)
1061 }
1062
1063 pub(super) fn process_join(
1064 &mut self,
1065 tbl_left: &TableInfo,
1066 tbl_right: &TableInfo,
1067 constraint: &JoinConstraint,
1068 join_type: JoinType,
1069 ) -> PolarsResult<LazyFrame> {
1070 let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right)?;
1071
1072 let joined = tbl_left
1073 .frame
1074 .clone()
1075 .join_builder()
1076 .with(tbl_right.frame.clone())
1077 .left_on(left_on)
1078 .right_on(right_on)
1079 .how(join_type)
1080 .suffix(format!(":{}", tbl_right.name))
1081 .coalesce(JoinCoalesce::KeepColumns)
1082 .finish();
1083
1084 Ok(joined)
1085 }
1086
1087 fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
1088 let mut contexts = vec![];
1089 for expr in exprs {
1090 *expr = expr.clone().map_expr(|e| match e {
1091 Expr::SubPlan(lp, names) => {
1092 contexts.push(<LazyFrame>::from((**lp).clone()));
1093 if names.len() == 1 {
1094 Expr::Column(names[0].as_str().into())
1095 } else {
1096 Expr::SubPlan(lp, names)
1097 }
1098 },
1099 e => e,
1100 })
1101 }
1102
1103 if contexts.is_empty() {
1104 lf
1105 } else {
1106 lf.with_context(contexts)
1107 }
1108 }
1109
1110 fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
1111 if let Statement::CreateTable(CreateTable {
1112 if_not_exists,
1113 name,
1114 query,
1115 ..
1116 }) = stmt
1117 {
1118 let tbl_name = name.0.first().unwrap().value.as_str();
1119 if *if_not_exists && self.table_map.contains_key(tbl_name) {
1121 polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
1122 }
1124 if let Some(query) = query {
1125 let lf = self.execute_query(query)?;
1126 self.register(tbl_name, lf);
1127 let out = df! {
1128 "Response" => ["CREATE TABLE"]
1129 }
1130 .unwrap()
1131 .lazy();
1132 Ok(out)
1133 } else {
1134 polars_bail!(SQLInterface: "only `CREATE TABLE AS SELECT ...` is currently supported");
1135 }
1136 } else {
1137 unreachable!()
1138 }
1139 }
1140
/// Resolve a table factor of a FROM/JOIN clause into a `(name, frame)` pair.
///
/// Handles plain tables (optionally aliased, or table functions when
/// arguments are present), derived tables (sub-queries, which must be
/// aliased), `UNNEST` (which must be aliased with column names) and nested
/// joins. Any other table factor is reported as not yet implemented.
///
/// Side effects: a table alias is recorded in `table_aliases`; derived and
/// UNNEST tables are inserted into `table_map` under their alias.
fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
    match relation {
        TableFactor::Table {
            name, alias, args, ..
        } => {
            // Arguments mean this is a table function call, not a table name.
            if let Some(args) = args {
                return self.execute_table_function(name, alias, &args.args);
            }
            // Only the first identifier of the (possibly qualified) name is used.
            let tbl_name = name.0.first().unwrap().value.as_str();
            if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
                match alias {
                    Some(alias) => {
                        self.table_aliases
                            .insert(alias.name.value.clone(), tbl_name.to_string());
                        // NOTE(review): the returned name is the alias's `Display`
                        // form, while the alias map above is keyed on
                        // `alias.name.value` — confirm these always agree.
                        Ok((alias.to_string(), lf))
                    },
                    None => Ok((tbl_name.to_string(), lf)),
                }
            } else {
                polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
            }
        },
        TableFactor::Derived {
            lateral,
            subquery,
            alias,
        } => {
            polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
            if let Some(alias) = alias {
                let mut lf = self.execute_query_no_ctes(subquery)?;
                // Apply any column names declared on the alias.
                lf = self.rename_columns_from_table_alias(lf, alias)?;
                // Make the derived table resolvable by its alias.
                self.table_map.insert(alias.name.value.clone(), lf.clone());
                Ok((alias.name.value.clone(), lf))
            } else {
                polars_bail!(SQLSyntax: "derived tables must have aliases");
            }
        },
        TableFactor::UNNEST {
            alias,
            array_exprs,
            with_offset,
            with_offset_alias: _,
            ..
        } => {
            if let Some(alias) = alias {
                // Column names declared on the alias; an empty name maps to `None`.
                let column_names: Vec<Option<PlSmallStr>> = alias
                    .columns
                    .iter()
                    .map(|c| {
                        if c.name.value.is_empty() {
                            None
                        } else {
                            Some(PlSmallStr::from_str(c.name.value.as_str()))
                        }
                    })
                    .collect();

                // One series per array expression.
                let column_values: Vec<Series> = array_exprs
                    .iter()
                    .map(|arr| parse_sql_array(arr, self))
                    .collect::<Result<_, _>>()?;

                polars_ensure!(!column_names.is_empty(),
                    SQLSyntax:
                    "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
                );
                // Exactly one column name is required per array.
                if column_names.len() != column_values.len() {
                    let plural = if column_values.len() > 1 { "s" } else { "" };
                    polars_bail!(
                        SQLSyntax:
                        "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
                    );
                }
                // Name each series after its declared column (unnamed ones
                // keep the name they already have).
                let column_series: Vec<Column> = column_values
                    .into_iter()
                    .zip(column_names)
                    .map(|(s, name)| {
                        if let Some(name) = name {
                            s.with_name(name)
                        } else {
                            s
                        }
                    })
                    .map(Column::from)
                    .collect();

                let lf = DataFrame::new(column_series)?.lazy();

                if *with_offset {
                    polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
                }
                // Make the unnested table resolvable by its alias.
                let table_name = alias.name.value.clone();
                self.table_map.insert(table_name.clone(), lf.clone());
                Ok((table_name, lf))
            } else {
                polars_bail!(SQLSyntax: "UNNEST table must have an alias");
            }
        },
        TableFactor::NestedJoin {
            table_with_joins,
            alias,
        } => {
            let lf = self.execute_from_statement(table_with_joins)?;
            // An unaliased nested join is returned with an empty name.
            match alias {
                Some(a) => Ok((a.name.value.clone(), lf)),
                None => Ok(("".to_string(), lf)),
            }
        },
        _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
    }
}
1255
1256 fn execute_table_function(
1257 &mut self,
1258 name: &ObjectName,
1259 alias: &Option<TableAlias>,
1260 args: &[FunctionArg],
1261 ) -> PolarsResult<(String, LazyFrame)> {
1262 let tbl_fn = name.0.first().unwrap().value.as_str();
1263 let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1264 let (tbl_name, lf) = read_fn.execute(args)?;
1265 #[allow(clippy::useless_asref)]
1266 let tbl_name = alias
1267 .as_ref()
1268 .map(|a| a.name.value.clone())
1269 .unwrap_or_else(|| tbl_name.to_str().to_string());
1270
1271 self.table_map.insert(tbl_name.clone(), lf.clone());
1272 Ok((tbl_name, lf))
1273 }
1274
1275 fn process_order_by(
1276 &mut self,
1277 mut lf: LazyFrame,
1278 order_by: &Option<OrderBy>,
1279 selected: Option<&[Expr]>,
1280 ) -> PolarsResult<LazyFrame> {
1281 if order_by.as_ref().is_none_or(|ob| ob.exprs.is_empty()) {
1282 return Ok(lf);
1283 }
1284 let schema = self.get_frame_schema(&mut lf)?;
1285 let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1286
1287 let order_by = order_by.as_ref().unwrap().exprs.clone();
1288 let mut descending = Vec::with_capacity(order_by.len());
1289 let mut nulls_last = Vec::with_capacity(order_by.len());
1290 let mut by: Vec<Expr> = Vec::with_capacity(order_by.len());
1291
1292 if order_by.len() == 1 && matches!(&order_by[0].expr, SQLExpr::Identifier(ident) if ident.value.to_uppercase() == "ALL" && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1294 {
1295 if let Some(selected) = selected {
1296 by.extend(selected.iter().cloned());
1297 } else {
1298 by.extend(columns_iter);
1299 };
1300 let desc_order = !order_by[0].asc.unwrap_or(true);
1301 nulls_last.resize(by.len(), !order_by[0].nulls_first.unwrap_or(desc_order));
1302 descending.resize(by.len(), desc_order);
1303 } else {
1304 let columns = &columns_iter.collect::<Vec<_>>();
1305 for ob in order_by {
1306 let desc_order = !ob.asc.unwrap_or(true);
1309 nulls_last.push(!ob.nulls_first.unwrap_or(desc_order));
1310 descending.push(desc_order);
1311
1312 by.push(self.expr_or_ordinal(
1314 &ob.expr,
1315 columns,
1316 selected,
1317 Some(&schema),
1318 "ORDER BY",
1319 )?)
1320 }
1321 }
1322 Ok(lf.sort_by_exprs(
1323 &by,
1324 SortMultipleOptions::default()
1325 .with_order_descending_multi(descending)
1326 .with_nulls_last_multi(nulls_last)
1327 .with_maintain_order(true),
1328 ))
1329 }
1330
    /// Translate a SQL `GROUP BY` into `group_by(keys).agg(...)` followed by a
    /// `select` that restores the projection order of the original query.
    ///
    /// * `group_by_keys` - the grouping key expressions.
    /// * `projections` - the expressions of the `SELECT` list.
    ///
    /// # Errors
    /// Fails when the keys or the projections contain duplicate output names,
    /// or when a plain column / struct field projection is neither a group key
    /// nor part of an aggregate (`SQLSyntax`).
    fn process_group_by(
        &mut self,
        mut lf: LazyFrame,
        group_by_keys: &[Expr],
        projections: &[Expr],
    ) -> PolarsResult<LazyFrame> {
        let schema_before = self.get_frame_schema(&mut lf)?;
        let group_by_keys_schema =
            expressions_to_schema(group_by_keys, &schema_before, |duplicate_name: &str| {
                format!("group_by keys contained duplicate output name '{duplicate_name}'")
            })?;

        // Expressions handed to `agg(...)`.
        let mut aggregation_projection = Vec::with_capacity(projections.len());
        // Output name -> replacement expression used in the final `select`.
        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
        // Output names whose original projection is re-applied after aggregation.
        let mut projection_aliases = PlHashSet::new();
        let mut group_key_aliases = PlHashSet::new();

        for mut e in projections {
            // True when the expression tree contains an aggregate, `len`, a window,
            // or a non-struct function that references a column which is not one
            // of the group keys.
            let is_non_group_key_expr = has_expr(e, |e| {
                match e {
                    Expr::Agg(_) | Expr::Len | Expr::Window { .. } => true,
                    Expr::Function { function: func, .. }
                        if !matches!(func, FunctionExpr::StructExpr(_)) =>
                    {
                        has_expr(e, |e| match e {
                            Expr::Column(name) => !group_by_keys_schema.contains(name),
                            _ => false,
                        })
                    },
                    _ => false,
                }
            });

            // Classify aliased projections.
            if let Expr::Alias(expr, alias) = e {
                if e.clone().meta().is_simple_projection(Some(&schema_before)) {
                    // Simple projection under an alias: remember the alias and
                    // continue with the inner expression, so `field` below is
                    // computed from the un-aliased expression.
                    group_key_aliases.insert(alias.as_ref());
                    e = expr
                } else if let Expr::Function {
                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
                    ..
                } = expr.deref()
                {
                    // Aliased struct field access: in the final projection select
                    // the column named after the field and re-apply the alias.
                    projection_overrides
                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
                } else if !is_non_group_key_expr && !group_by_keys_schema.contains(alias) {
                    projection_aliases.insert(alias.as_ref());
                }
            }
            let field = e.to_field(&schema_before)?;
            if group_by_keys_schema.get(&field.name).is_none() && is_non_group_key_expr {
                // Strip an (optionally aliased) `Implode` wrapper before passing the
                // expression to `agg`.
                // NOTE(review): presumably because the aggregation context already
                // gathers each group's values into a list - confirm.
                let mut e = e.clone();
                if let Expr::Agg(AggExpr::Implode(expr)) = &e {
                    e = (**expr).clone();
                } else if let Expr::Alias(expr, name) = &e {
                    if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
                        e = (**expr).clone().alias(name.clone());
                    }
                }
                aggregation_projection.push(e);
            } else if let Expr::Column(_)
            | Expr::Function {
                function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
                ..
            } = e
            {
                // A bare column / struct field must be one of the group keys.
                if !group_by_keys_schema.contains(&field.name) {
                    polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
                }
            }
        }
        let aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
        let projection_schema =
            expressions_to_schema(projections, &schema_before, |duplicate_name: &str| {
                format!("group_by aggregations contained duplicate output name '{duplicate_name}'")
            })?;

        // Rebuild the projection in the original order: use the recorded override
        // if there is one, re-apply the original expression for group keys and
        // recorded aliases, and otherwise select the aggregated column by name.
        let final_projection = projection_schema
            .iter_names()
            .zip(projections)
            .map(|(name, projection_expr)| {
                if let Some(expr) = projection_overrides.get(name.as_str()) {
                    expr.clone()
                } else if group_by_keys_schema.get(name).is_some()
                    || projection_aliases.contains(name.as_str())
                    || group_key_aliases.contains(name.as_str())
                {
                    projection_expr.clone()
                } else {
                    col(name.clone())
                }
            })
            .collect::<Vec<_>>();

        Ok(aggregated.select(&final_projection))
    }
1433
1434 fn process_limit_offset(
1435 &self,
1436 lf: LazyFrame,
1437 limit: &Option<SQLExpr>,
1438 offset: &Option<Offset>,
1439 ) -> PolarsResult<LazyFrame> {
1440 match (offset, limit) {
1441 (
1442 Some(Offset {
1443 value: SQLExpr::Value(SQLValue::Number(offset, _)),
1444 ..
1445 }),
1446 Some(SQLExpr::Value(SQLValue::Number(limit, _))),
1447 ) => Ok(lf.slice(
1448 offset
1449 .parse()
1450 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1451 limit
1452 .parse()
1453 .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1454 )),
1455 (
1456 Some(Offset {
1457 value: SQLExpr::Value(SQLValue::Number(offset, _)),
1458 ..
1459 }),
1460 None,
1461 ) => Ok(lf.slice(
1462 offset
1463 .parse()
1464 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1465 IdxSize::MAX,
1466 )),
1467 (None, Some(SQLExpr::Value(SQLValue::Number(limit, _)))) => Ok(lf.limit(
1468 limit
1469 .parse()
1470 .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1471 )),
1472 (None, None) => Ok(lf),
1473 _ => polars_bail!(
1474 SQLSyntax: "non-numeric arguments for LIMIT/OFFSET are not supported",
1475 ),
1476 }
1477 }
1478
1479 fn process_qualified_wildcard(
1480 &mut self,
1481 ObjectName(idents): &ObjectName,
1482 options: &WildcardAdditionalOptions,
1483 modifiers: &mut SelectModifiers,
1484 schema: Option<&Schema>,
1485 ) -> PolarsResult<Vec<Expr>> {
1486 let mut new_idents = idents.clone();
1487 new_idents.push(Ident::new("*"));
1488
1489 let expr = resolve_compound_identifier(self, new_idents.deref(), schema);
1490 self.process_wildcard_additional_options(expr?, options, modifiers, schema)
1491 }
1492
1493 fn process_wildcard_additional_options(
1494 &mut self,
1495 exprs: Vec<Expr>,
1496 options: &WildcardAdditionalOptions,
1497 modifiers: &mut SelectModifiers,
1498 schema: Option<&Schema>,
1499 ) -> PolarsResult<Vec<Expr>> {
1500 if options.opt_except.is_some() && options.opt_exclude.is_some() {
1501 polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
1502 } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
1503 polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
1504 }
1505
1506 if let Some(items) = &options.opt_exclude {
1508 match items {
1509 ExcludeSelectItem::Single(ident) => {
1510 modifiers.exclude.insert(ident.value.clone());
1511 },
1512 ExcludeSelectItem::Multiple(idents) => {
1513 modifiers
1514 .exclude
1515 .extend(idents.iter().map(|i| i.value.clone()));
1516 },
1517 };
1518 }
1519
1520 if let Some(items) = &options.opt_except {
1522 modifiers.exclude.insert(items.first_element.value.clone());
1523 modifiers
1524 .exclude
1525 .extend(items.additional_elements.iter().map(|i| i.value.clone()));
1526 }
1527
1528 if let Some(item) = &options.opt_ilike {
1530 let rx = regex::escape(item.pattern.as_str())
1531 .replace('%', ".*")
1532 .replace('_', ".");
1533
1534 modifiers.ilike = Some(
1535 polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
1536 );
1537 }
1538
1539 if let Some(items) = &options.opt_rename {
1541 let renames = match items {
1542 RenameSelectItem::Single(rename) => vec![rename],
1543 RenameSelectItem::Multiple(renames) => renames.iter().collect(),
1544 };
1545 for rn in renames {
1546 let (before, after) = (rn.ident.value.as_str(), rn.alias.value.as_str());
1547 let (before, after) = (PlSmallStr::from_str(before), PlSmallStr::from_str(after));
1548 if before != after {
1549 modifiers.rename.insert(before, after);
1550 }
1551 }
1552 }
1553
1554 if let Some(replacements) = &options.opt_replace {
1556 for rp in &replacements.items {
1557 let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
1558 modifiers
1559 .replace
1560 .push(replacement_expr?.alias(rp.column_name.value.as_str()));
1561 }
1562 }
1563 Ok(exprs)
1564 }
1565
1566 fn rename_columns_from_table_alias(
1567 &mut self,
1568 mut lf: LazyFrame,
1569 alias: &TableAlias,
1570 ) -> PolarsResult<LazyFrame> {
1571 if alias.columns.is_empty() {
1572 Ok(lf)
1573 } else {
1574 let schema = self.get_frame_schema(&mut lf)?;
1575 if alias.columns.len() != schema.len() {
1576 polars_bail!(
1577 SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
1578 alias.columns.len(), alias.name.value, schema.len()
1579 )
1580 } else {
1581 let existing_columns: Vec<_> = schema.iter_names().collect();
1582 let new_columns: Vec<_> =
1583 alias.columns.iter().map(|c| c.name.value.clone()).collect();
1584 Ok(lf.rename(existing_columns, new_columns, true))
1585 }
1586 }
1587 }
1588}
1589
1590impl SQLContext {
1591 pub fn get_table_map(&self) -> PlHashMap<String, LazyFrame> {
1593 self.table_map.clone()
1594 }
1595
1596 pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
1598 Self {
1599 table_map,
1600 ..Default::default()
1601 }
1602 }
1603}
1604
1605fn collect_compound_identifiers(
1606 left: &[Ident],
1607 right: &[Ident],
1608 left_name: &str,
1609 right_name: &str,
1610) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1611 if left.len() == 2 && right.len() == 2 {
1612 let (tbl_a, col_name_a) = (left[0].value.as_str(), left[1].value.as_str());
1613 let (tbl_b, col_name_b) = (right[0].value.as_str(), right[1].value.as_str());
1614
1615 if left_name == tbl_b || right_name == tbl_a {
1617 Ok((vec![col(col_name_b)], vec![col(col_name_a)]))
1618 } else {
1619 Ok((vec![col(col_name_a)], vec![col(col_name_b)]))
1620 }
1621 } else {
1622 polars_bail!(SQLInterface: "collect_compound_identifiers: Expected left.len() == 2 && right.len() == 2, but found left.len() == {:?}, right.len() == {:?}", left.len(), right.len());
1623 }
1624}
1625
1626fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
1627 match expr {
1628 Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
1629 let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
1630 schema
1631 .iter_names()
1632 .filter(|name| re.is_match(name))
1633 .map(|name| col(name.clone()))
1634 .collect::<Vec<_>>()
1635 },
1636 Expr::Selector(s) => s
1637 .into_columns(schema, &Default::default())
1638 .unwrap()
1639 .into_iter()
1640 .map(col)
1641 .collect::<Vec<_>>(),
1642 _ => vec![expr],
1643 }
1644}
1645
/// Return `true` when a column name is written as an anchored regex, i.e. it
/// starts with `^` and ends with `$`.
fn is_regex_colname(nm: &str) -> bool {
    // Both anchors are single ASCII bytes, so matching on the byte slice is
    // equivalent to checking the first and last characters.
    matches!(nm.as_bytes(), [b'^', .., b'$'])
}
1649
1650fn process_join_on(
1651 expression: &sqlparser::ast::Expr,
1652 tbl_left: &TableInfo,
1653 tbl_right: &TableInfo,
1654) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1655 match expression {
1656 SQLExpr::BinaryOp { left, op, right } => match op {
1657 BinaryOperator::And => {
1658 let (mut left_i, mut right_i) = process_join_on(left, tbl_left, tbl_right)?;
1659 let (mut left_j, mut right_j) = process_join_on(right, tbl_left, tbl_right)?;
1660 left_i.append(&mut left_j);
1661 right_i.append(&mut right_j);
1662 Ok((left_i, right_i))
1663 },
1664 BinaryOperator::Eq => match (left.as_ref(), right.as_ref()) {
1665 (SQLExpr::CompoundIdentifier(left), SQLExpr::CompoundIdentifier(right)) => {
1666 collect_compound_identifiers(left, right, &tbl_left.name, &tbl_right.name)
1667 },
1668 _ => {
1669 polars_bail!(SQLInterface: "only equi-join constraints (on identifiers) are currently supported; found lhs={:?}, rhs={:?}", left, right)
1670 },
1671 },
1672 _ => {
1673 polars_bail!(SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op)
1674 },
1675 },
1676 SQLExpr::Nested(expr) => process_join_on(expr, tbl_left, tbl_right),
1677 _ => {
1678 polars_bail!(SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", expression)
1679 },
1680 }
1681}
1682
1683fn process_join_constraint(
1684 constraint: &JoinConstraint,
1685 tbl_left: &TableInfo,
1686 tbl_right: &TableInfo,
1687) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1688 match constraint {
1689 JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
1690 process_join_on(expr, tbl_left, tbl_right)
1691 },
1692 JoinConstraint::Using(idents) if !idents.is_empty() => {
1693 let using: Vec<Expr> = idents.iter().map(|id| col(id.value.as_str())).collect();
1694 Ok((using.clone(), using))
1695 },
1696 JoinConstraint::Natural => {
1697 let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
1698 let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
1699 let on: Vec<Expr> = left_names
1700 .intersection(&right_names)
1701 .map(|&name| col(name.clone()))
1702 .collect();
1703 if on.is_empty() {
1704 polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
1705 }
1706 Ok((on.clone(), on))
1707 },
1708 _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
1709 }
1710}
1711
1712bitflags::bitflags! {
1713 #[derive(PartialEq)]
1718 struct ExprSqlProjectionHeightBehavior: u8 {
1719 const MaintainsColumn = 1 << 0;
1721 const Independent = 1 << 1;
1725 const InheritsContext = 1 << 2;
1728 }
1729}
1730
1731impl ExprSqlProjectionHeightBehavior {
1732 fn identify_from_expr(expr: &Expr) -> Self {
1733 let mut has_column = false;
1734 let mut has_independent = false;
1735
1736 for e in expr.into_iter() {
1737 use Expr::*;
1738 has_column |= matches!(e, Column(_) | Selector(_));
1739 has_independent |= match e {
1740 AnonymousFunction { options, .. } => {
1742 options.returns_scalar() || !options.is_length_preserving()
1743 },
1744 Literal(v) => !v.is_scalar(),
1745 Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
1746 Agg { .. } | Len => true,
1747 _ => false,
1748 }
1749 }
1750 if has_independent {
1751 Self::Independent
1752 } else if has_column {
1753 Self::MaintainsColumn
1754 } else {
1755 Self::InheritsContext
1756 }
1757 }
1758}