1use std::cell::RefCell;
2use std::ops::Deref;
3
4use polars_core::frame::row::Row;
5use polars_core::prelude::*;
6use polars_lazy::prelude::*;
7use polars_ops::frame::JoinCoalesce;
8use polars_plan::dsl::function_expr::StructFunction;
9use polars_plan::prelude::*;
10use polars_utils::format_pl_smallstr;
11use sqlparser::ast::{
12 BinaryOperator, CreateTable, Distinct, ExcludeSelectItem, Expr as SQLExpr, FunctionArg,
13 GroupByExpr, Ident, JoinConstraint, JoinOperator, ObjectName, ObjectType, Offset, OrderBy,
14 Query, RenameSelectItem, Select, SelectItem, SetExpr, SetOperator, SetQuantifier, Statement,
15 TableAlias, TableFactor, TableWithJoins, UnaryOperator, Value as SQLValue, Values,
16 WildcardAdditionalOptions,
17};
18use sqlparser::dialect::GenericDialect;
19use sqlparser::parser::{Parser, ParserOptions};
20
21use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
22use crate::sql_expr::{
23 parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
24};
25use crate::table_functions::PolarsTableFunctions;
26
/// A relation taking part in a join: its frame, the name the query uses to
/// refer to it (table name or alias), and its schema as resolved by the caller.
#[derive(Clone)]
pub struct TableInfo {
    // The relation's (lazy) data.
    pub(crate) frame: LazyFrame,
    // Name used to reference this relation in the SQL (also used for join suffixes).
    pub(crate) name: String,
    // Schema of `frame`, resolved ahead of time so join constraints can be checked.
    pub(crate) schema: Arc<Schema>,
}
33
/// Modifiers collected while expanding wildcard select items and applied once
/// the projections are built (presumably populated from `EXCLUDE` / `ILIKE` /
/// `RENAME` / `REPLACE` wildcard options — see the wildcard helpers).
struct SelectModifiers {
    // Output column names to drop.
    exclude: PlHashSet<String>,
    // Optional pattern a column name must match to be retained.
    ilike: Option<regex::Regex>,
    // Original column name -> new column name.
    rename: PlHashMap<PlSmallStr, PlSmallStr>,
    // Replacement expressions applied via `with_columns`.
    replace: Vec<Expr>,
}
40impl SelectModifiers {
41 fn matches_ilike(&self, s: &str) -> bool {
42 match &self.ilike {
43 Some(rx) => rx.is_match(s),
44 None => true,
45 }
46 }
47 fn renamed_cols(&self) -> Vec<Expr> {
48 self.rename
49 .iter()
50 .map(|(before, after)| col(before.clone()).alias(after.clone()))
51 .collect()
52 }
53}
54
/// Entry point for planning SQL statements against registered `LazyFrame`s.
#[derive(Clone)]
pub struct SQLContext {
    // Registered tables by name (also receives derived tables / table-function
    // results inserted during planning).
    pub(crate) table_map: PlHashMap<String, LazyFrame>,
    // Registry consulted for user-defined functions.
    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
    // Arenas used for schema resolution; handed to the result frame at the end of `execute`.
    pub(crate) lp_arena: Arena<IR>,
    pub(crate) expr_arena: Arena<AExpr>,

    // Per-query state, cleared at the end of `execute`:
    // CTE name -> frame.
    cte_map: RefCell<PlHashMap<String, LazyFrame>>,
    // Table alias -> underlying registered table name.
    table_aliases: RefCell<PlHashMap<String, String>>,
    // Right-hand relation name -> (column name -> suffixed column name after a join).
    joined_aliases: RefCell<PlHashMap<String, PlHashMap<String, String>>>,
}
67
68impl Default for SQLContext {
69 fn default() -> Self {
70 Self {
71 function_registry: Arc::new(DefaultFunctionRegistry {}),
72 table_map: Default::default(),
73 cte_map: Default::default(),
74 table_aliases: Default::default(),
75 joined_aliases: Default::default(),
76 lp_arena: Default::default(),
77 expr_arena: Default::default(),
78 }
79 }
80}
81
82impl SQLContext {
83 pub fn new() -> Self {
91 Self::default()
92 }
93
94 pub fn get_tables(&self) -> Vec<String> {
96 let mut tables = Vec::from_iter(self.table_map.keys().cloned());
97 tables.sort_unstable();
98 tables
99 }
100
101 pub fn register(&mut self, name: &str, lf: LazyFrame) {
117 self.table_map.insert(name.to_owned(), lf);
118 }
119
120 pub fn unregister(&mut self, name: &str) {
122 self.table_map.remove(&name.to_owned());
123 }
124
125 pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
144 let mut parser = Parser::new(&GenericDialect);
145 parser = parser.with_options(ParserOptions {
146 trailing_commas: true,
147 ..Default::default()
148 });
149
150 let ast = parser
151 .try_with_sql(query)
152 .map_err(to_sql_interface_err)?
153 .parse_statements()
154 .map_err(to_sql_interface_err)?;
155
156 polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
157 let res = self.execute_statement(ast.first().unwrap())?;
158
159 let lp_arena = std::mem::take(&mut self.lp_arena);
162 let expr_arena = std::mem::take(&mut self.expr_arena);
163 res.set_cached_arena(lp_arena, expr_arena);
164
165 self.cte_map.borrow_mut().clear();
167 self.table_aliases.borrow_mut().clear();
168 self.joined_aliases.borrow_mut().clear();
169
170 Ok(res)
171 }
172
173 pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
176 self.function_registry = function_registry;
177 self
178 }
179
180 pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
182 &self.function_registry
183 }
184
185 pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
187 Arc::get_mut(&mut self.function_registry).unwrap()
188 }
189}
190
191impl SQLContext {
192 pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
193 let ast = stmt;
194 Ok(match ast {
195 Statement::Query(query) => self.execute_query(query)?,
196 stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
197 stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
198 stmt @ Statement::Drop {
199 object_type: ObjectType::Table,
200 ..
201 } => self.execute_drop_table(stmt)?,
202 stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
203 stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
204 _ => polars_bail!(
205 SQLInterface: "statement type {:?} is not supported", ast,
206 ),
207 })
208 }
209
210 pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
211 self.register_ctes(query)?;
212 self.execute_query_no_ctes(query)
213 }
214
215 pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
216 let lf = self.process_query(&query.body, query)?;
217 self.process_limit_offset(lf, &query.limit, &query.offset)
218 }
219
220 pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
221 frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
222 }
223
224 pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
225 let table = self.table_map.get(name).cloned();
226 table
227 .or_else(|| self.cte_map.borrow().get(name).cloned())
228 .or_else(|| {
229 self.table_aliases
230 .borrow()
231 .get(name)
232 .and_then(|alias| self.table_map.get(alias).cloned())
233 })
234 }
235
236 fn expr_or_ordinal(
237 &mut self,
238 e: &SQLExpr,
239 exprs: &[Expr],
240 selected: Option<&[Expr]>,
241 schema: Option<&Schema>,
242 clause: &str,
243 ) -> PolarsResult<Expr> {
244 match e {
245 SQLExpr::UnaryOp {
246 op: UnaryOperator::Minus,
247 expr,
248 } if matches!(**expr, SQLExpr::Value(SQLValue::Number(_, _))) => {
249 if let SQLExpr::Value(SQLValue::Number(ref idx, _)) = **expr {
250 Err(polars_err!(
251 SQLSyntax:
252 "negative ordinal values are invalid for {}; found -{}",
253 clause,
254 idx
255 ))
256 } else {
257 unreachable!()
258 }
259 },
260 SQLExpr::Value(SQLValue::Number(idx, _)) => {
261 let idx = idx.parse::<usize>().map_err(|_| {
263 polars_err!(
264 SQLSyntax:
265 "negative ordinal values are invalid for {}; found {}",
266 clause,
267 idx
268 )
269 })?;
270 let cols = if let Some(cols) = selected {
273 cols
274 } else {
275 exprs
276 };
277 Ok(cols
278 .get(idx - 1)
279 .ok_or_else(|| {
280 polars_err!(
281 SQLInterface:
282 "{} ordinal value must refer to a valid column; found {}",
283 clause,
284 idx
285 )
286 })?
287 .clone())
288 },
289 SQLExpr::Value(v) => Err(polars_err!(
290 SQLSyntax:
291 "{} requires a valid expression or positive ordinal; found {}", clause, v,
292 )),
293 _ => parse_sql_expr(e, self, schema),
294 }
295 }
296
297 pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
298 if self.joined_aliases.borrow().contains_key(tbl_name) {
299 self.joined_aliases
300 .borrow()
301 .get(tbl_name)
302 .and_then(|aliases| aliases.get(column_name))
303 .cloned()
304 .unwrap_or_else(|| column_name.to_string())
305 } else {
306 column_name.to_string()
307 }
308 }
309
310 fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
311 match expr {
312 SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
313 SetExpr::Query(query) => self.execute_query_no_ctes(query),
314 SetExpr::SetOperation {
315 op: SetOperator::Union,
316 set_quantifier,
317 left,
318 right,
319 } => self.process_union(left, right, set_quantifier, query),
320
321 #[cfg(feature = "semi_anti_join")]
322 SetExpr::SetOperation {
323 op: SetOperator::Intersect | SetOperator::Except,
324 set_quantifier,
325 left,
326 right,
327 } => self.process_except_intersect(left, right, set_quantifier, query),
328
329 SetExpr::Values(Values {
330 explicit_row: _,
331 rows,
332 }) => self.process_values(rows),
333
334 SetExpr::Table(tbl) => {
335 if tbl.table_name.is_some() {
336 let table_name = tbl.table_name.as_ref().unwrap();
337 self.get_table_from_current_scope(table_name)
338 .ok_or_else(|| {
339 polars_err!(
340 SQLInterface: "no table or alias named '{}' found",
341 tbl
342 )
343 })
344 } else {
345 polars_bail!(SQLInterface: "'TABLE' requires valid table name")
346 }
347 },
348 op => {
349 let op = match op {
350 SetExpr::SetOperation { op, .. } => op,
351 _ => unreachable!(),
352 };
353 polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
354 },
355 }
356 }
357
358 #[cfg(feature = "semi_anti_join")]
359 fn process_except_intersect(
360 &mut self,
361 left: &SetExpr,
362 right: &SetExpr,
363 quantifier: &SetQuantifier,
364 query: &Query,
365 ) -> PolarsResult<LazyFrame> {
366 let (join_type, op_name) = match *query.body {
367 SetExpr::SetOperation {
368 op: SetOperator::Except,
369 ..
370 } => (JoinType::Anti, "EXCEPT"),
371 _ => (JoinType::Semi, "INTERSECT"),
372 };
373 let mut lf = self.process_query(left, query)?;
374 let mut rf = self.process_query(right, query)?;
375 let join = lf
376 .clone()
377 .join_builder()
378 .with(rf.clone())
379 .how(join_type)
380 .join_nulls(true);
381
382 let lf_schema = self.get_frame_schema(&mut lf)?;
383 let lf_cols: Vec<_> = lf_schema.iter_names().map(|nm| col(nm.clone())).collect();
384 let joined_tbl = match quantifier {
385 SetQuantifier::ByName => join.on(lf_cols).finish(),
386 SetQuantifier::Distinct | SetQuantifier::None => {
387 let rf_schema = self.get_frame_schema(&mut rf)?;
388 let rf_cols: Vec<_> = rf_schema.iter_names().map(|nm| col(nm.clone())).collect();
389 if lf_cols.len() != rf_cols.len() {
390 polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
391 }
392 join.left_on(lf_cols).right_on(rf_cols).finish()
393 },
394 _ => {
395 polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
396 },
397 };
398 Ok(joined_tbl.unique(None, UniqueKeepStrategy::Any))
399 }
400
401 fn process_union(
402 &mut self,
403 left: &SetExpr,
404 right: &SetExpr,
405 quantifier: &SetQuantifier,
406 query: &Query,
407 ) -> PolarsResult<LazyFrame> {
408 let mut lf = self.process_query(left, query)?;
409 let mut rf = self.process_query(right, query)?;
410 let opts = UnionArgs {
411 parallel: true,
412 to_supertypes: true,
413 ..Default::default()
414 };
415 match quantifier {
416 SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
418 let lf_schema = self.get_frame_schema(&mut lf)?;
419 let rf_schema = self.get_frame_schema(&mut rf)?;
420 if lf_schema.len() != rf_schema.len() {
421 polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
422 }
423 let concatenated = polars_lazy::dsl::concat(vec![lf, rf], opts);
424 match quantifier {
425 SetQuantifier::Distinct | SetQuantifier::None => {
426 concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
427 },
428 _ => concatenated,
429 }
430 },
431 #[cfg(feature = "diagonal_concat")]
433 SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
434 #[cfg(feature = "diagonal_concat")]
436 SetQuantifier::ByName | SetQuantifier::DistinctByName => {
437 let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
438 concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
439 },
440 #[allow(unreachable_patterns)]
441 _ => polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier),
442 }
443 }
444
445 fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
446 let frame_rows: Vec<Row> = values.iter().map(|row| {
447 let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
448 let expr = parse_sql_expr(expr, self, None)?;
449 match expr {
450 Expr::Literal(value) => {
451 value.to_any_value()
452 .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
453 .map(|av| av.into_static())
454 },
455 _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
456 }
457 }).collect();
458 row_data.map(Row::new)
459 }).collect::<Result<_, _>>()?;
460
461 Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
462 }
463
464 fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
466 match stmt {
467 Statement::Explain { statement, .. } => {
468 let lf = self.execute_statement(statement)?;
469 let plan = lf.describe_optimized_plan()?;
470 let plan = plan
471 .split('\n')
472 .collect::<Series>()
473 .with_name(PlSmallStr::from_static("Logical Plan"))
474 .into_column();
475 let df = DataFrame::new(vec![plan])?;
476 Ok(df.lazy())
477 },
478 _ => unreachable!(),
479 }
480 }
481
482 fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
484 let tables = Column::new("name".into(), self.get_tables());
485 let df = DataFrame::new(vec![tables])?;
486 Ok(df.lazy())
487 }
488
489 fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
490 match stmt {
491 Statement::Drop { names, .. } => {
492 names.iter().for_each(|name| {
493 self.table_map.remove(&name.to_string());
494 });
495 Ok(DataFrame::empty().lazy())
496 },
497 _ => unreachable!(),
498 }
499 }
500
501 fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
502 if let Statement::Truncate {
503 table_names,
504 partitions,
505 ..
506 } = stmt
507 {
508 match partitions {
509 None => {
510 if table_names.len() != 1 {
511 polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
512 }
513 let tbl = table_names[0].to_string();
514 if let Some(lf) = self.table_map.get_mut(&tbl) {
515 *lf = DataFrame::empty_with_schema(
516 lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
517 .unwrap()
518 .as_ref(),
519 )
520 .lazy();
521 Ok(lf.clone())
522 } else {
523 polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
524 }
525 },
526 _ => {
527 polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
528 },
529 }
530 } else {
531 unreachable!()
532 }
533 }
534
535 fn register_cte(&mut self, name: &str, lf: LazyFrame) {
536 self.cte_map.borrow_mut().insert(name.to_owned(), lf);
537 }
538
539 fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
540 if let Some(with) = &query.with {
541 if with.recursive {
542 polars_bail!(SQLInterface: "recursive CTEs are not supported")
543 }
544 for cte in &with.cte_tables {
545 let cte_name = cte.alias.name.value.clone();
546 let mut lf = self.execute_query(&cte.query)?;
547 lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
548 self.register_cte(&cte_name, lf);
549 }
550 }
551 Ok(())
552 }
553
554 fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
556 let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
557 if !tbl_expr.joins.is_empty() {
558 for join in &tbl_expr.joins {
559 let (r_name, mut rf) = self.get_table(&join.relation)?;
560 let left_schema = self.get_frame_schema(&mut lf)?;
561 let right_schema = self.get_frame_schema(&mut rf)?;
562
563 lf = match &join.join_operator {
564 op @ (JoinOperator::FullOuter(constraint)
565 | JoinOperator::LeftOuter(constraint)
566 | JoinOperator::RightOuter(constraint)
567 | JoinOperator::Inner(constraint)
568 | JoinOperator::Anti(constraint)
569 | JoinOperator::Semi(constraint)
570 | JoinOperator::LeftAnti(constraint)
571 | JoinOperator::LeftSemi(constraint)
572 | JoinOperator::RightAnti(constraint)
573 | JoinOperator::RightSemi(constraint)) => {
574 let (lf, rf) = match op {
575 JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
576 _ => (lf, rf),
577 };
578 self.process_join(
579 &TableInfo {
580 frame: lf,
581 name: l_name.clone(),
582 schema: left_schema.clone(),
583 },
584 &TableInfo {
585 frame: rf,
586 name: r_name.clone(),
587 schema: right_schema.clone(),
588 },
589 constraint,
590 match op {
591 JoinOperator::FullOuter(_) => JoinType::Full,
592 JoinOperator::LeftOuter(_) => JoinType::Left,
593 JoinOperator::RightOuter(_) => JoinType::Right,
594 JoinOperator::Inner(_) => JoinType::Inner,
595 #[cfg(feature = "semi_anti_join")]
596 JoinOperator::Anti(_) | JoinOperator::LeftAnti(_) | JoinOperator::RightAnti(_) => JoinType::Anti,
597 #[cfg(feature = "semi_anti_join")]
598 JoinOperator::Semi(_) | JoinOperator::LeftSemi(_) | JoinOperator::RightSemi(_) => JoinType::Semi,
599 join_type => polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type),
600 },
601 )?
602 },
603 JoinOperator::CrossJoin => {
604 lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
605 },
606 join_type => {
607 polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
608 },
609 };
610
611 let joined_schema = self.get_frame_schema(&mut lf)?;
613
614 self.joined_aliases.borrow_mut().insert(
615 r_name.to_string(),
616 right_schema
617 .iter_names()
618 .filter_map(|name| {
619 let aliased_name = format!("{}:{}", name, r_name);
621 if left_schema.contains(name)
622 && joined_schema.contains(aliased_name.as_str())
623 {
624 Some((name.to_string(), aliased_name))
625 } else {
626 None
627 }
628 })
629 .collect::<PlHashMap<String, String>>(),
630 );
631 }
632 };
633 Ok(lf)
634 }
635
636 fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
638 let mut lf = if select_stmt.from.is_empty() {
639 DataFrame::empty().lazy()
640 } else {
641 let from = select_stmt.clone().from;
644 if from.len() > 1 {
645 polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
646 }
647 self.execute_from_statement(from.first().unwrap())?
648 };
649
650 let schema = self.get_frame_schema(&mut lf)?;
652 lf = self.process_where(lf, &select_stmt.selection)?;
653
654 let mut select_modifiers = SelectModifiers {
656 ilike: None,
657 exclude: PlHashSet::new(),
658 rename: PlHashMap::new(),
659 replace: vec![],
660 };
661
662 let projections = self.column_projections(select_stmt, &schema, &mut select_modifiers)?;
663
664 let mut group_by_keys: Vec<Expr> = Vec::new();
666 match &select_stmt.group_by {
667 GroupByExpr::Expressions(group_by_exprs, modifiers) => {
669 if !modifiers.is_empty() {
670 polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
671 }
672 group_by_keys = group_by_exprs
674 .iter()
675 .map(|e| {
676 self.expr_or_ordinal(
677 e,
678 &projections,
679 None,
680 Some(schema.deref()),
681 "GROUP BY",
682 )
683 })
684 .collect::<PolarsResult<_>>()?
685 },
686 GroupByExpr::All(modifiers) => {
689 if !modifiers.is_empty() {
690 polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
691 }
692 projections.iter().for_each(|expr| match expr {
693 Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
695 Expr::Column(_) => group_by_keys.push(expr.clone()),
696 Expr::Alias(e, _)
697 if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
698 Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
699 if let Expr::Column(name) = &**e {
700 group_by_keys.push(col(name.clone()));
701 }
702 },
703 _ => {
704 if !has_expr(expr, |e| {
706 matches!(e, Expr::Agg(_))
707 || matches!(e, Expr::Len)
708 || matches!(e, Expr::Window { .. })
709 }) {
710 group_by_keys.push(expr.clone())
711 }
712 },
713 });
714 },
715 };
716
717 lf = if group_by_keys.is_empty() {
718 if select_stmt.having.is_some() {
720 polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
721 };
722
723 let mut retained_cols = Vec::with_capacity(projections.len());
725 let mut retained_names = Vec::with_capacity(projections.len());
726 let have_order_by = query.order_by.is_some();
727 let mut all_literal = true;
728
729 for p in projections.iter() {
734 let name = p
735 .to_field(schema.deref(), Context::Default)?
736 .name
737 .to_string();
738 if select_modifiers.matches_ilike(&name)
739 && !select_modifiers.exclude.contains(&name)
740 {
741 all_literal &= expr_to_leaf_column_names_iter(p).next().is_none();
742 retained_cols.push(if have_order_by {
743 col(name.as_str())
744 } else {
745 p.clone()
746 });
747 retained_names.push(col(name));
748 }
749 }
750
751 if have_order_by {
753 lf = lf.with_columns(projections);
754 }
755 if !select_modifiers.replace.is_empty() {
756 lf = lf.with_columns(&select_modifiers.replace);
757 }
758 if !select_modifiers.rename.is_empty() {
759 lf = lf.with_columns(select_modifiers.renamed_cols());
760 }
761
762 lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;
763
764 if all_literal && !have_order_by {
765 lf = lf.with_columns(retained_cols).select(retained_names);
766 } else {
767 lf = lf.select(retained_cols);
768 }
769
770 if !select_modifiers.rename.is_empty() {
771 lf = lf.rename(
772 select_modifiers.rename.keys(),
773 select_modifiers.rename.values(),
774 true,
775 );
776 };
777 lf
778 } else {
779 lf = self.process_group_by(lf, &group_by_keys, &projections)?;
780 lf = self.process_order_by(lf, &query.order_by, None)?;
781
782 let schema = Some(self.get_frame_schema(&mut lf)?);
784 match select_stmt.having.as_ref() {
785 Some(expr) => lf.filter(parse_sql_expr(expr, self, schema.as_deref())?),
786 None => lf,
787 }
788 };
789
790 lf = match &select_stmt.distinct {
792 Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
793 Some(Distinct::On(exprs)) => {
794 let schema = Some(self.get_frame_schema(&mut lf)?);
796 let cols = exprs
797 .iter()
798 .map(|e| {
799 let expr = parse_sql_expr(e, self, schema.as_deref())?;
800 if let Expr::Column(name) = expr {
801 Ok(name.clone())
802 } else {
803 Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
804 }
805 })
806 .collect::<PolarsResult<Vec<_>>>()?;
807
808 lf = self.process_order_by(lf, &query.order_by, None)?;
810 return Ok(lf.unique_stable(Some(cols.clone()), UniqueKeepStrategy::First));
811 },
812 None => lf,
813 };
814 Ok(lf)
815 }
816
817 fn column_projections(
818 &mut self,
819 select_stmt: &Select,
820 schema: &SchemaRef,
821 select_modifiers: &mut SelectModifiers,
822 ) -> PolarsResult<Vec<Expr>> {
823 let parsed_items: PolarsResult<Vec<Vec<Expr>>> = select_stmt
824 .projection
825 .iter()
826 .map(|select_item| match select_item {
827 SelectItem::UnnamedExpr(expr) => {
828 Ok(vec![parse_sql_expr(expr, self, Some(schema))?])
829 },
830 SelectItem::ExprWithAlias { expr, alias } => {
831 let expr = parse_sql_expr(expr, self, Some(schema))?;
832 Ok(vec![expr.alias(PlSmallStr::from_str(alias.value.as_str()))])
833 },
834 SelectItem::QualifiedWildcard(obj_name, wildcard_options) => self
835 .process_qualified_wildcard(
836 obj_name,
837 wildcard_options,
838 select_modifiers,
839 Some(schema),
840 ),
841 SelectItem::Wildcard(wildcard_options) => {
842 let cols = schema
843 .iter_names()
844 .map(|name| col(name.clone()))
845 .collect::<Vec<_>>();
846
847 self.process_wildcard_additional_options(
848 cols,
849 wildcard_options,
850 select_modifiers,
851 Some(schema),
852 )
853 },
854 })
855 .collect();
856
857 let flattened_exprs: Vec<Expr> = parsed_items?
858 .into_iter()
859 .flatten()
860 .flat_map(|expr| expand_exprs(expr, schema))
861 .collect();
862
863 Ok(flattened_exprs)
864 }
865
866 fn process_where(
867 &mut self,
868 mut lf: LazyFrame,
869 expr: &Option<SQLExpr>,
870 ) -> PolarsResult<LazyFrame> {
871 if let Some(expr) = expr {
872 let schema = self.get_frame_schema(&mut lf)?;
873
874 let (all_true, all_false) = match expr {
876 SQLExpr::Value(SQLValue::Boolean(b)) => (*b, !*b),
877 SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
878 (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => (a == b, a != b),
879 (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
880 (a != b, a == b)
881 },
882 _ => (false, false),
883 },
884 _ => (false, false),
885 };
886 if all_true {
887 return Ok(lf);
888 } else if all_false {
889 return Ok(DataFrame::empty_with_schema(schema.as_ref()).lazy());
890 }
891
892 let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
894 if filter_expression.clone().meta().has_multiple_outputs() {
895 filter_expression = all_horizontal([filter_expression])?;
896 }
897 lf = self.process_subqueries(lf, vec![&mut filter_expression]);
898 lf = lf.filter(filter_expression);
899 }
900 Ok(lf)
901 }
902
903 pub(super) fn process_join(
904 &mut self,
905 tbl_left: &TableInfo,
906 tbl_right: &TableInfo,
907 constraint: &JoinConstraint,
908 join_type: JoinType,
909 ) -> PolarsResult<LazyFrame> {
910 let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right)?;
911
912 let joined = tbl_left
913 .frame
914 .clone()
915 .join_builder()
916 .with(tbl_right.frame.clone())
917 .left_on(left_on)
918 .right_on(right_on)
919 .how(join_type)
920 .suffix(format!(":{}", tbl_right.name))
921 .coalesce(JoinCoalesce::KeepColumns)
922 .finish();
923
924 Ok(joined)
925 }
926
927 fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
928 let mut contexts = vec![];
929 for expr in exprs {
930 *expr = expr.clone().map_expr(|e| match e {
931 Expr::SubPlan(lp, names) => {
932 contexts.push(<LazyFrame>::from((**lp).clone()));
933 if names.len() == 1 {
934 Expr::Column(names[0].as_str().into())
935 } else {
936 Expr::SubPlan(lp, names)
937 }
938 },
939 e => e,
940 })
941 }
942
943 if contexts.is_empty() {
944 lf
945 } else {
946 lf.with_context(contexts)
947 }
948 }
949
950 fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
951 if let Statement::CreateTable(CreateTable {
952 if_not_exists,
953 name,
954 query,
955 ..
956 }) = stmt
957 {
958 let tbl_name = name.0.first().unwrap().value.as_str();
959 if *if_not_exists && self.table_map.contains_key(tbl_name) {
961 polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
962 }
964 if let Some(query) = query {
965 let lf = self.execute_query(query)?;
966 self.register(tbl_name, lf);
967 let out = df! {
968 "Response" => ["CREATE TABLE"]
969 }
970 .unwrap()
971 .lazy();
972 Ok(out)
973 } else {
974 polars_bail!(SQLInterface: "only `CREATE TABLE AS SELECT ...` is currently supported");
975 }
976 } else {
977 unreachable!()
978 }
979 }
980
981 fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
982 match relation {
983 TableFactor::Table {
984 name, alias, args, ..
985 } => {
986 if let Some(args) = args {
987 return self.execute_table_function(name, alias, &args.args);
988 }
989 let tbl_name = name.0.first().unwrap().value.as_str();
990 if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
991 match alias {
992 Some(alias) => {
993 self.table_aliases
994 .borrow_mut()
995 .insert(alias.name.value.clone(), tbl_name.to_string());
996 Ok((alias.to_string(), lf))
997 },
998 None => Ok((tbl_name.to_string(), lf)),
999 }
1000 } else {
1001 polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
1002 }
1003 },
1004 TableFactor::Derived {
1005 lateral,
1006 subquery,
1007 alias,
1008 } => {
1009 polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
1010 if let Some(alias) = alias {
1011 let mut lf = self.execute_query_no_ctes(subquery)?;
1012 lf = self.rename_columns_from_table_alias(lf, alias)?;
1013 self.table_map.insert(alias.name.value.clone(), lf.clone());
1014 Ok((alias.name.value.clone(), lf))
1015 } else {
1016 polars_bail!(SQLSyntax: "derived tables must have aliases");
1017 }
1018 },
1019 TableFactor::UNNEST {
1020 alias,
1021 array_exprs,
1022 with_offset,
1023 with_offset_alias: _,
1024 ..
1025 } => {
1026 if let Some(alias) = alias {
1027 let table_name = alias.name.value.clone();
1028 let column_names: Vec<Option<PlSmallStr>> = alias
1029 .columns
1030 .iter()
1031 .map(|c| {
1032 if c.name.value.is_empty() {
1033 None
1034 } else {
1035 Some(PlSmallStr::from_str(c.name.value.as_str()))
1036 }
1037 })
1038 .collect();
1039
1040 let column_values: Vec<Series> = array_exprs
1041 .iter()
1042 .map(|arr| parse_sql_array(arr, self))
1043 .collect::<Result<_, _>>()?;
1044
1045 polars_ensure!(!column_names.is_empty(),
1046 SQLSyntax:
1047 "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
1048 );
1049 if column_names.len() != column_values.len() {
1050 let plural = if column_values.len() > 1 { "s" } else { "" };
1051 polars_bail!(
1052 SQLSyntax:
1053 "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
1054 );
1055 }
1056 let column_series: Vec<Column> = column_values
1057 .into_iter()
1058 .zip(column_names)
1059 .map(|(s, name)| {
1060 if let Some(name) = name {
1061 s.clone().with_name(name)
1062 } else {
1063 s.clone()
1064 }
1065 })
1066 .map(Column::from)
1067 .collect();
1068
1069 let lf = DataFrame::new(column_series)?.lazy();
1070 if *with_offset {
1071 polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH OFFSET/ORDINALITY");
1074 }
1075 self.table_map.insert(table_name.clone(), lf.clone());
1076 Ok((table_name.clone(), lf))
1077 } else {
1078 polars_bail!(SQLSyntax: "UNNEST table must have an alias");
1079 }
1080 },
1081 TableFactor::NestedJoin {
1082 table_with_joins,
1083 alias,
1084 } => {
1085 let lf = self.execute_from_statement(table_with_joins)?;
1086 match alias {
1087 Some(a) => Ok((a.name.value.clone(), lf)),
1088 None => Ok(("".to_string(), lf)),
1089 }
1090 },
1091 _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
1093 }
1094 }
1095
1096 fn execute_table_function(
1097 &mut self,
1098 name: &ObjectName,
1099 alias: &Option<TableAlias>,
1100 args: &[FunctionArg],
1101 ) -> PolarsResult<(String, LazyFrame)> {
1102 let tbl_fn = name.0.first().unwrap().value.as_str();
1103 let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
1104 let (tbl_name, lf) = read_fn.execute(args)?;
1105 #[allow(clippy::useless_asref)]
1106 let tbl_name = alias
1107 .as_ref()
1108 .map(|a| a.name.value.clone())
1109 .unwrap_or_else(|| tbl_name);
1110
1111 self.table_map.insert(tbl_name.clone(), lf.clone());
1112 Ok((tbl_name, lf))
1113 }
1114
1115 fn process_order_by(
1116 &mut self,
1117 mut lf: LazyFrame,
1118 order_by: &Option<OrderBy>,
1119 selected: Option<&[Expr]>,
1120 ) -> PolarsResult<LazyFrame> {
1121 if order_by.as_ref().is_none_or(|ob| ob.exprs.is_empty()) {
1122 return Ok(lf);
1123 }
1124 let schema = self.get_frame_schema(&mut lf)?;
1125 let columns_iter = schema.iter_names().map(|e| col(e.clone()));
1126
1127 let order_by = order_by.as_ref().unwrap().exprs.clone();
1128 let mut descending = Vec::with_capacity(order_by.len());
1129 let mut nulls_last = Vec::with_capacity(order_by.len());
1130 let mut by: Vec<Expr> = Vec::with_capacity(order_by.len());
1131
1132 if order_by.len() == 1 && matches!(&order_by[0].expr, SQLExpr::Identifier(ident) if ident.value.to_uppercase() == "ALL" && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
1134 {
1135 if let Some(selected) = selected {
1136 by.extend(selected.iter().cloned());
1137 } else {
1138 by.extend(columns_iter);
1139 };
1140 let desc_order = !order_by[0].asc.unwrap_or(true);
1141 nulls_last.resize(by.len(), !order_by[0].nulls_first.unwrap_or(desc_order));
1142 descending.resize(by.len(), desc_order);
1143 } else {
1144 let columns = &columns_iter.collect::<Vec<_>>();
1145 for ob in order_by {
1146 let desc_order = !ob.asc.unwrap_or(true);
1149 nulls_last.push(!ob.nulls_first.unwrap_or(desc_order));
1150 descending.push(desc_order);
1151
1152 by.push(self.expr_or_ordinal(
1154 &ob.expr,
1155 columns,
1156 selected,
1157 Some(&schema),
1158 "ORDER BY",
1159 )?)
1160 }
1161 }
1162 Ok(lf.sort_by_exprs(
1163 &by,
1164 SortMultipleOptions::default()
1165 .with_order_descending_multi(descending)
1166 .with_nulls_last_multi(nulls_last)
1167 .with_maintain_order(true),
1168 ))
1169 }
1170
    /// Build a `GROUP BY` aggregation of `lf` over `group_by_keys` for the SELECT `projections`.
    ///
    /// Projections that contain an `Expr::Agg`, `Expr::Len` or `Expr::Window` and whose
    /// output name is not a group key are evaluated inside `group_by(..).agg(..)`. The
    /// result is then re-projected with a final `select`, iterating the projection schema
    /// zipped with `projections`, so output columns follow the SELECT order.
    ///
    /// # Errors
    /// Returns an `SQLSyntax` error if a bare column (or struct-field) projection is neither a
    /// group key nor inside an aggregate. Schema-resolution errors are propagated.
    fn process_group_by(
        &mut self,
        mut lf: LazyFrame,
        group_by_keys: &[Expr],
        projections: &[Expr],
    ) -> PolarsResult<LazyFrame> {
        let schema_before = self.get_frame_schema(&mut lf)?;
        // Output schema of the key expressions; used to tell keys apart from aggregates.
        let group_by_keys_schema =
            expressions_to_schema(group_by_keys, &schema_before, Context::Default)?;

        // Expressions handed to `.agg(..)`.
        let mut aggregation_projection = Vec::with_capacity(projections.len());
        // Alias -> replacement expression used in the final select (aliased struct fields).
        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
        // Aliases of non-aggregate, non-window projections whose alias is not a key name.
        let mut projection_aliases = PlHashSet::new();
        // Aliases of projections for which `meta().is_simple_projection()` holds.
        let mut group_key_aliases = PlHashSet::new();

        for mut e in projections {
            let is_agg_or_window = has_expr(e, |e| {
                matches!(e, Expr::Agg(_) | Expr::Len | Expr::Window { .. })
            });

            if let Expr::Alias(expr, alias) = e {
                if e.clone().meta().is_simple_projection() {
                    // Remember the alias, then continue with the un-aliased inner expression.
                    group_key_aliases.insert(alias.as_ref());
                    e = expr
                } else if let Expr::Function {
                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
                    ..
                } = expr.deref()
                {
                    // Aliased struct field: the final select uses `col(<field name>)` under
                    // the alias. NOTE(review): assumes the field is reachable as a column of
                    // the aggregated frame — confirm.
                    projection_overrides
                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
                } else if !is_agg_or_window && !group_by_keys_schema.contains(alias) {
                    projection_aliases.insert(alias.as_ref());
                }
            }
            let field = e.to_field(&schema_before, Context::Default)?;
            if group_by_keys_schema.get(&field.name).is_none() && is_agg_or_window {
                // Non-key aggregate/window: evaluate inside `.agg(..)`.
                let mut e = e.clone();
                // Strip an outer `Implode`, either top-level or directly under an alias.
                // Presumably because a non-reducing expression inside `.agg(..)` already
                // yields one list per group — confirm.
                if let Expr::Agg(AggExpr::Implode(expr)) = &e {
                    e = (**expr).clone();
                } else if let Expr::Alias(expr, name) = &e {
                    if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
                        e = (**expr).clone().alias(name.clone());
                    }
                }
                aggregation_projection.push(e);
            } else if let Expr::Column(_)
            | Expr::Function {
                function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
                ..
            } = e
            {
                // A bare column / struct field outside any aggregate must be a group key.
                if !group_by_keys_schema.contains(&field.name) {
                    polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
                }
            }
        }
        let aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
        let projection_schema =
            expressions_to_schema(projections, &schema_before, Context::Default)?;

        // Final select in SELECT order:
        // 1. a recorded override;
        // 2. otherwise, for keys and recorded aliases, the original projection expression;
        // 3. otherwise, the aggregated output column referenced by name.
        let final_projection = projection_schema
            .iter_names()
            .zip(projections)
            .map(|(name, projection_expr)| {
                if let Some(expr) = projection_overrides.get(name.as_str()) {
                    expr.clone()
                } else if group_by_keys_schema.get(name).is_some()
                    || projection_aliases.contains(name.as_str())
                    || group_key_aliases.contains(name.as_str())
                {
                    projection_expr.clone()
                } else {
                    col(name.clone())
                }
            })
            .collect::<Vec<_>>();

        Ok(aggregated.select(&final_projection))
    }
1256
1257 fn process_limit_offset(
1258 &self,
1259 lf: LazyFrame,
1260 limit: &Option<SQLExpr>,
1261 offset: &Option<Offset>,
1262 ) -> PolarsResult<LazyFrame> {
1263 match (offset, limit) {
1264 (
1265 Some(Offset {
1266 value: SQLExpr::Value(SQLValue::Number(offset, _)),
1267 ..
1268 }),
1269 Some(SQLExpr::Value(SQLValue::Number(limit, _))),
1270 ) => Ok(lf.slice(
1271 offset
1272 .parse()
1273 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1274 limit
1275 .parse()
1276 .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1277 )),
1278 (
1279 Some(Offset {
1280 value: SQLExpr::Value(SQLValue::Number(offset, _)),
1281 ..
1282 }),
1283 None,
1284 ) => Ok(lf.slice(
1285 offset
1286 .parse()
1287 .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
1288 IdxSize::MAX,
1289 )),
1290 (None, Some(SQLExpr::Value(SQLValue::Number(limit, _)))) => Ok(lf.limit(
1291 limit
1292 .parse()
1293 .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
1294 )),
1295 (None, None) => Ok(lf),
1296 _ => polars_bail!(
1297 SQLSyntax: "non-numeric arguments for LIMIT/OFFSET are not supported",
1298 ),
1299 }
1300 }
1301
1302 fn process_qualified_wildcard(
1303 &mut self,
1304 ObjectName(idents): &ObjectName,
1305 options: &WildcardAdditionalOptions,
1306 modifiers: &mut SelectModifiers,
1307 schema: Option<&Schema>,
1308 ) -> PolarsResult<Vec<Expr>> {
1309 let mut new_idents = idents.clone();
1310 new_idents.push(Ident::new("*"));
1311
1312 let expr = resolve_compound_identifier(self, new_idents.deref(), schema);
1313 self.process_wildcard_additional_options(expr?, options, modifiers, schema)
1314 }
1315
1316 fn process_wildcard_additional_options(
1317 &mut self,
1318 exprs: Vec<Expr>,
1319 options: &WildcardAdditionalOptions,
1320 modifiers: &mut SelectModifiers,
1321 schema: Option<&Schema>,
1322 ) -> PolarsResult<Vec<Expr>> {
1323 if options.opt_except.is_some() && options.opt_exclude.is_some() {
1324 polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
1325 } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
1326 polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
1327 }
1328
1329 if let Some(items) = &options.opt_exclude {
1331 match items {
1332 ExcludeSelectItem::Single(ident) => {
1333 modifiers.exclude.insert(ident.value.clone());
1334 },
1335 ExcludeSelectItem::Multiple(idents) => {
1336 modifiers
1337 .exclude
1338 .extend(idents.iter().map(|i| i.value.clone()));
1339 },
1340 };
1341 }
1342
1343 if let Some(items) = &options.opt_except {
1345 modifiers.exclude.insert(items.first_element.value.clone());
1346 modifiers
1347 .exclude
1348 .extend(items.additional_elements.iter().map(|i| i.value.clone()));
1349 }
1350
1351 if let Some(item) = &options.opt_ilike {
1353 let rx = regex::escape(item.pattern.as_str())
1354 .replace('%', ".*")
1355 .replace('_', ".");
1356
1357 modifiers.ilike = Some(regex::Regex::new(format!("^(?is){}$", rx).as_str()).unwrap());
1358 }
1359
1360 if let Some(items) = &options.opt_rename {
1362 let renames = match items {
1363 RenameSelectItem::Single(rename) => vec![rename],
1364 RenameSelectItem::Multiple(renames) => renames.iter().collect(),
1365 };
1366 for rn in renames {
1367 let (before, after) = (rn.ident.value.as_str(), rn.alias.value.as_str());
1368 let (before, after) = (PlSmallStr::from_str(before), PlSmallStr::from_str(after));
1369 if before != after {
1370 modifiers.rename.insert(before, after);
1371 }
1372 }
1373 }
1374
1375 if let Some(replacements) = &options.opt_replace {
1377 for rp in &replacements.items {
1378 let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
1379 modifiers
1380 .replace
1381 .push(replacement_expr?.alias(rp.column_name.value.as_str()));
1382 }
1383 }
1384 Ok(exprs)
1385 }
1386
1387 fn rename_columns_from_table_alias(
1388 &mut self,
1389 mut lf: LazyFrame,
1390 alias: &TableAlias,
1391 ) -> PolarsResult<LazyFrame> {
1392 if alias.columns.is_empty() {
1393 Ok(lf)
1394 } else {
1395 let schema = self.get_frame_schema(&mut lf)?;
1396 if alias.columns.len() != schema.len() {
1397 polars_bail!(
1398 SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
1399 alias.columns.len(), alias.name.value, schema.len()
1400 )
1401 } else {
1402 let existing_columns: Vec<_> = schema.iter_names().collect();
1403 let new_columns: Vec<_> =
1404 alias.columns.iter().map(|c| c.name.value.clone()).collect();
1405 Ok(lf.rename(existing_columns, new_columns, true))
1406 }
1407 }
1408 }
1409}
1410
1411impl SQLContext {
1412 pub fn get_table_map(&self) -> PlHashMap<String, LazyFrame> {
1414 self.table_map.clone()
1415 }
1416
1417 pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
1419 Self {
1420 table_map,
1421 ..Default::default()
1422 }
1423 }
1424}
1425
1426fn collect_compound_identifiers(
1427 left: &[Ident],
1428 right: &[Ident],
1429 left_name: &str,
1430 right_name: &str,
1431) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1432 if left.len() == 2 && right.len() == 2 {
1433 let (tbl_a, col_name_a) = (left[0].value.as_str(), left[1].value.as_str());
1434 let (tbl_b, col_name_b) = (right[0].value.as_str(), right[1].value.as_str());
1435
1436 if left_name == tbl_b || right_name == tbl_a {
1438 Ok((vec![col(col_name_b)], vec![col(col_name_a)]))
1439 } else {
1440 Ok((vec![col(col_name_a)], vec![col(col_name_b)]))
1441 }
1442 } else {
1443 polars_bail!(SQLInterface: "collect_compound_identifiers: Expected left.len() == 2 && right.len() == 2, but found left.len() == {:?}, right.len() == {:?}", left.len(), right.len());
1444 }
1445}
1446
1447fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
1448 match expr {
1449 Expr::Wildcard => schema
1450 .iter_names()
1451 .map(|name| col(name.clone()))
1452 .collect::<Vec<_>>(),
1453 Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
1454 let rx = regex::Regex::new(&nm).unwrap();
1455 schema
1456 .iter_names()
1457 .filter(|name| rx.is_match(name))
1458 .map(|name| col(name.clone()))
1459 .collect::<Vec<_>>()
1460 },
1461 Expr::Columns(names) => names
1462 .iter()
1463 .map(|name| col(name.clone()))
1464 .collect::<Vec<_>>(),
1465 _ => vec![expr],
1466 }
1467}
1468
/// Return `true` when a column name should be read as a regex selector, i.e. it starts
/// with `^` and ends with `$` (both anchors must be present, as distinct characters).
fn is_regex_colname(nm: &str) -> bool {
    nm.strip_prefix('^')
        .is_some_and(|rest| rest.ends_with('$'))
}
1472
1473fn process_join_on(
1474 expression: &sqlparser::ast::Expr,
1475 tbl_left: &TableInfo,
1476 tbl_right: &TableInfo,
1477) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1478 match expression {
1479 SQLExpr::BinaryOp { left, op, right } => match op {
1480 BinaryOperator::And => {
1481 let (mut left_i, mut right_i) = process_join_on(left, tbl_left, tbl_right)?;
1482 let (mut left_j, mut right_j) = process_join_on(right, tbl_left, tbl_right)?;
1483 left_i.append(&mut left_j);
1484 right_i.append(&mut right_j);
1485 Ok((left_i, right_i))
1486 },
1487 BinaryOperator::Eq => match (left.as_ref(), right.as_ref()) {
1488 (SQLExpr::CompoundIdentifier(left), SQLExpr::CompoundIdentifier(right)) => {
1489 collect_compound_identifiers(left, right, &tbl_left.name, &tbl_right.name)
1490 },
1491 _ => {
1492 polars_bail!(SQLInterface: "only equi-join constraints (on identifiers) are currently supported; found lhs={:?}, rhs={:?}", left, right)
1493 },
1494 },
1495 _ => {
1496 polars_bail!(SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op)
1497 },
1498 },
1499 SQLExpr::Nested(expr) => process_join_on(expr, tbl_left, tbl_right),
1500 _ => {
1501 polars_bail!(SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", expression)
1502 },
1503 }
1504}
1505
1506fn process_join_constraint(
1507 constraint: &JoinConstraint,
1508 tbl_left: &TableInfo,
1509 tbl_right: &TableInfo,
1510) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
1511 match constraint {
1512 JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
1513 process_join_on(expr, tbl_left, tbl_right)
1514 },
1515 JoinConstraint::Using(idents) if !idents.is_empty() => {
1516 let using: Vec<Expr> = idents.iter().map(|id| col(id.value.as_str())).collect();
1517 Ok((using.clone(), using))
1518 },
1519 JoinConstraint::Natural => {
1520 let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
1521 let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
1522 let on: Vec<Expr> = left_names
1523 .intersection(&right_names)
1524 .map(|&name| col(name.clone()))
1525 .collect();
1526 if on.is_empty() {
1527 polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
1528 }
1529 Ok((on.clone(), on))
1530 },
1531 _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
1532 }
1533}