polars_mem_engine/planner/lp.rs

use polars_core::POOL;
use polars_core::prelude::*;
use polars_expr::state::ExecutionState;
use polars_plan::plans::expr_ir::ExprIR;
use polars_utils::unique_id::UniqueId;
use recursive::recursive;

#[cfg(feature = "python")]
use self::python_dsl::PythonScanSource;
use super::*;
use crate::executors::{
    self, CachePrefiller, Executor, GroupByStreamingExec, PartitionedSinkExecutor, SinkExecutor,
    sink_name,
};
use crate::scan_predicate::functions::create_scan_predicate;

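/// Builds a streaming-engine [`Executor`] for the plan rooted at the given node.
/// Injected by callers when the new streaming engine is available; plans that
/// need it panic with "Missing feature new-streaming" when it is absent.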
pub type StreamingExecutorBuilder =
    fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<Box<dyn Executor>>;

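/// Heuristic for whether a group-by can run as a partitioned (pre-aggregated)
/// group-by; see the checks documented in the body.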
fn partitionable_gb(
    keys: &[ExprIR],
    aggs: &[ExprIR],
    input_schema: &Schema,
    expr_arena: &Arena<AExpr>,
    apply: &Option<PlanCallback<DataFrame, DataFrame>>,
) -> bool {
    // checks:
    //      1. complex expressions in the group_by itself are not partitionable;
    //         in this case anything more than col("foo")
    //      2. a custom function cannot be partitioned
    //      3. we don't bother with more than 2 keys, as the cardinality of the
    //         key combinations likely explodes
    if !keys.is_empty() && keys.len() < 3 && apply.is_none() {
        for key in keys {
            if (expr_arena).iter(key.node()).count() > 1
                || has_aexpr(key.node(), expr_arena, |ae| match ae {
                    AExpr::Literal(lv) => !lv.is_scalar(),
                    _ => false,
                })
            {
                return false;
            }
        }

        can_pre_agg_exprs(aggs, expr_arena, input_schema)
    } else {
        false
    }
}

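/// State threaded through lowering. `has_cache_parent` marks that an ancestor is
/// (or behaves like) a cache, so IR nodes are cloned rather than taken from the
/// arena; `has_cache_child` marks that the current branch contains a cache.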
#[derive(Clone)]
struct ConversionState {
    has_cache_child: bool,
    has_cache_parent: bool,
}

impl ConversionState {
    fn new() -> PolarsResult<Self> {
        Ok(ConversionState {
            has_cache_child: false,
            has_cache_parent: false,
        })
    }

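    /// Run `func` on a copy of the state with `has_cache_child` reset, then copy
    /// the flag back so caches discovered in the branch stay visible here.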
    fn with_new_branch<K, F: FnOnce(&mut Self) -> K>(&mut self, func: F) -> K {
        let mut new_state = self.clone();
        new_state.has_cache_child = false;
        let out = func(&mut new_state);
        self.has_cache_child = new_state.has_cache_child;
        out
    }
}

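/// Lower a full IR plan into a physical [`Executor`]. If any cache nodes were
/// collected (including streaming scans and sinks wrapped as caches), the plan
/// is wrapped in a [`CachePrefiller`] that runs them up front.
///
/// A minimal usage sketch (hypothetical setup; assumes the arenas were already
/// populated by IR conversion):
///
/// ```ignore
/// let mut exec = create_physical_plan(root, &mut lp_arena, &mut expr_arena, None)?;
/// let df = exec.execute(&mut ExecutionState::default())?;
/// ```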
pub fn create_physical_plan(
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    build_streaming_executor: Option<StreamingExecutorBuilder>,
) -> PolarsResult<Box<dyn Executor>> {
    let mut state = ConversionState::new()?;
    let mut cache_nodes = Default::default();
    let plan = create_physical_plan_impl(
        root,
        lp_arena,
        expr_arena,
        &mut state,
        &mut cache_nodes,
        build_streaming_executor,
    )?;

    if cache_nodes.is_empty() {
        Ok(plan)
    } else {
        Ok(Box::new(CachePrefiller {
            caches: cache_nodes,
            phys_plan: plan,
        }))
    }
}

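/// A set of physical plans sharing one optional cache prefiller.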
pub struct MultiplePhysicalPlans {
    pub cache_prefiller: Option<Box<dyn Executor>>,
    pub physical_plans: Vec<Box<dyn Executor>>,
}
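/// Lower several sink roots at once. All roots are converted as branches of one
/// shared state so that caches used by multiple plans are pre-filled only once.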
pub fn create_multiple_physical_plans(
    roots: &[Node],
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    build_streaming_executor: Option<StreamingExecutorBuilder>,
) -> PolarsResult<MultiplePhysicalPlans> {
    let mut state = ConversionState::new()?;
    let mut cache_nodes = Default::default();
    let plans = state.with_new_branch(|new_state| {
        roots
            .iter()
            .map(|&node| {
                create_physical_plan_impl(
                    node,
                    lp_arena,
                    expr_arena,
                    new_state,
                    &mut cache_nodes,
                    build_streaming_executor,
                )
            })
            .collect::<PolarsResult<Vec<_>>>()
    })?;

    let cache_prefiller = (!cache_nodes.is_empty()).then(|| {
        struct Empty;
        impl Executor for Empty {
            fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
                Ok(DataFrame::empty())
            }
        }
        Box::new(CachePrefiller {
            caches: cache_nodes,
            phys_plan: Box::new(Empty),
        }) as _
    });

    Ok(MultiplePhysicalPlans {
        cache_prefiller,
        physical_plans: plans,
    })
}

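/// Resolve the predicate of a Python scan. For pyarrow sources we try to convert
/// it into a pyarrow eval string; otherwise, or if conversion fails, we fall back
/// to a physical expression, serializing the DSL expression where applicable.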
#[cfg(feature = "python")]
#[allow(clippy::type_complexity)]
pub fn python_scan_predicate(
    options: &mut PythonOptions,
    expr_arena: &Arena<AExpr>,
    state: &mut ExpressionConversionState,
) -> PolarsResult<(
    Option<Arc<dyn polars_expr::prelude::PhysicalExpr>>,
    Option<Vec<u8>>,
)> {
    let mut predicate_serialized = None;
    let predicate = if let PythonPredicate::Polars(e) = &options.predicate {
        // Convert to a pyarrow eval string.
        if matches!(options.python_source, PythonScanSource::Pyarrow) {
            use polars_core::config::verbose_print_sensitive;

            let predicate_pa = polars_plan::plans::python::pyarrow::predicate_to_pa(
                e.node(),
                expr_arena,
                Default::default(),
            );

            verbose_print_sensitive(|| {
                format!(
                    "python_scan_predicate: \
                    predicate node: {}, \
                    converted pyarrow predicate: {}",
                    ExprIRDisplay::display_node(e.node(), expr_arena),
                    &predicate_pa.as_deref().unwrap_or("<conversion failed>")
                )
            });

            if let Some(eval_str) = predicate_pa {
                options.predicate = PythonPredicate::PyArrow(eval_str);
                // We don't need a physical expression, as pyarrow handles the filter.
                None
            } else {
                Some(create_physical_expr(
                    e,
                    Context::Default,
                    expr_arena,
                    &options.schema,
                    state,
                )?)
            }
        }
        // Convert to a physical expression in case the reader cannot consume the predicate.
        else {
            let dsl_expr = e.to_expr(expr_arena);
            predicate_serialized = polars_plan::plans::python::predicate::serialize(&dsl_expr)?;

            Some(create_physical_expr(
                e,
                Context::Default,
                expr_arena,
                &options.schema,
                state,
            )?)
        }
    } else {
        None
    };

    Ok((predicate, predicate_serialized))
}

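/// Recursively lower `root` into an executor. Most nodes are `take`n out of the
/// arena; nodes that may be visited again (scans, caches, group-bys and partition
/// sinks re-walked by the streaming engine) are cloned instead.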
#[recursive]
fn create_physical_plan_impl(
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    state: &mut ConversionState,
    // Cache nodes in order of discovery
    cache_nodes: &mut PlIndexMap<UniqueId, executors::CachePrefill>,
    build_streaming_executor: Option<StreamingExecutorBuilder>,
) -> PolarsResult<Box<dyn Executor>> {
    use IR::*;

    macro_rules! recurse {
        ($node:expr, $state: expr) => {
            create_physical_plan_impl(
                $node,
                lp_arena,
                expr_arena,
                $state,
                cache_nodes,
                build_streaming_executor,
            )
        };
    }

    let logical_plan = if state.has_cache_parent
        || matches!(
            lp_arena.get(root),
            IR::Scan { .. } // Needed for the streaming impl
                | IR::Cache { .. } // Needed for plans branching from the same cache node
                | IR::GroupBy { .. } // Needed for the streaming impl
                | IR::Sink { // Needed for the streaming impl
                    payload: SinkTypeIR::Partition(_),
                    ..
                }
        ) {
        lp_arena.get(root).clone()
    } else {
        lp_arena.take(root)
    };

    match logical_plan {
        #[cfg(feature = "python")]
        PythonScan { mut options } => {
            let mut expr_conv_state = ExpressionConversionState::new(true);
            let (predicate, predicate_serialized) =
                python_scan_predicate(&mut options, expr_arena, &mut expr_conv_state)?;
            Ok(Box::new(executors::PythonScanExec {
                options,
                predicate,
                predicate_serialized,
            }))
        },
        Sink { input, payload } => {
            let input = recurse!(input, state)?;
            match payload {
                SinkTypeIR::Memory => Ok(Box::new(SinkExecutor {
                    input,
                    name: "mem".to_string(),
                    f: Box::new(move |df, _state| Ok(Some(df))),
                })),
                SinkTypeIR::Callback(CallbackSinkType {
                    function,
                    maintain_order: _,
                    chunk_size,
                }) => {
                    let chunk_size = chunk_size.map_or(usize::MAX, Into::into);

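                    // Feed the callback `chunk_size` rows at a time until the
                    // buffer is drained or the callback asks to stop.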
                    Ok(Box::new(SinkExecutor {
                        input,
                        name: "batches".to_string(),
                        f: Box::new(move |mut buffer, _state| {
                            while !buffer.is_empty() {
                                let df;
                                (df, buffer) =
                                    buffer.split_at(buffer.height().min(chunk_size) as i64);
                                let should_stop = function.call(df)?;
                                if should_stop {
                                    break;
                                }
                            }
                            Ok(Some(DataFrame::empty()))
                        }),
                    }))
                },
                SinkTypeIR::File(FileSinkType {
                    file_type,
                    target,
                    sink_options,
                    cloud_options,
                }) => {
                    let name = sink_name(&file_type).to_owned();
                    Ok(Box::new(SinkExecutor {
                        input,
                        name,
                        f: Box::new(move |mut df, _state| {
                            let mut file = target
                                .open_into_writeable(&sink_options, cloud_options.as_ref())?;
                            let writer = &mut *file;

                            use std::io::BufWriter;
                            match &file_type {
                                #[cfg(feature = "parquet")]
                                FileType::Parquet(options) => {
                                    use polars_io::parquet::write::ParquetWriter;
                                    ParquetWriter::new(BufWriter::new(writer))
                                        .with_compression(options.compression)
                                        .with_statistics(options.statistics)
                                        .with_row_group_size(options.row_group_size)
                                        .with_data_page_size(options.data_page_size)
                                        .with_key_value_metadata(options.key_value_metadata.clone())
                                        .finish(&mut df)?;
                                },
                                #[cfg(feature = "ipc")]
                                FileType::Ipc(options) => {
                                    use polars_io::SerWriter;
                                    use polars_io::ipc::IpcWriter;
                                    IpcWriter::new(BufWriter::new(writer))
                                        .with_compression(options.compression)
                                        .with_compat_level(options.compat_level)
                                        .finish(&mut df)?;
                                },
                                #[cfg(feature = "csv")]
                                FileType::Csv(options) => {
                                    use polars_io::SerWriter;
                                    use polars_io::csv::write::CsvWriter;
                                    CsvWriter::new(BufWriter::new(writer))
                                        .include_bom(options.include_bom)
                                        .include_header(options.include_header)
                                        .with_separator(options.serialize_options.separator)
                                        .with_line_terminator(
                                            options.serialize_options.line_terminator.clone(),
                                        )
                                        .with_quote_char(options.serialize_options.quote_char)
                                        .with_batch_size(options.batch_size)
                                        .with_datetime_format(
                                            options.serialize_options.datetime_format.clone(),
                                        )
                                        .with_date_format(
                                            options.serialize_options.date_format.clone(),
                                        )
                                        .with_time_format(
                                            options.serialize_options.time_format.clone(),
                                        )
                                        .with_float_scientific(
                                            options.serialize_options.float_scientific,
                                        )
                                        .with_float_precision(
                                            options.serialize_options.float_precision,
                                        )
                                        .with_decimal_comma(options.serialize_options.decimal_comma)
                                        .with_null_value(options.serialize_options.null.clone())
                                        .with_quote_style(options.serialize_options.quote_style)
                                        .finish(&mut df)?;
                                },
                                #[cfg(feature = "json")]
                                FileType::Json(_options) => {
                                    use polars_io::SerWriter;
                                    use polars_io::json::{JsonFormat, JsonWriter};

                                    JsonWriter::new(BufWriter::new(writer))
                                        .with_json_format(JsonFormat::JsonLines)
                                        .finish(&mut df)?;
                                },
                                #[allow(unreachable_patterns)]
                                _ => panic!("enable filetype feature"),
                            }

                            file.sync_on_close(sink_options.sync_on_close)?;
                            file.close()?;

                            Ok(None)
                        }),
                    }))
                },
                SinkTypeIR::Partition(_) => {
                    let builder = build_streaming_executor
                        .expect("invalid build. Missing feature new-streaming");

                    let executor = Box::new(PartitionedSinkExecutor::new(
                        input, builder, root, lp_arena, expr_arena,
                    ));

                    // Use a cache so that this runs during the cache pre-filling stage
                    // rather than on the thread pool; otherwise it could deadlock, since
                    // the streaming engine uses the thread pool internally.
                    let mut prefill = executors::CachePrefill::new_sink(executor);
                    let exec = prefill.make_exec();
                    let existing = cache_nodes.insert(prefill.id(), prefill);

                    assert!(existing.is_none());

                    Ok(Box::new(exec))
                },
            }
        },
        SinkMultiple { .. } => {
            unreachable!("should be handled with create_multiple_physical_plans")
        },
        Union { inputs, options } => {
            let inputs = state.with_new_branch(|new_state| {
                inputs
                    .into_iter()
                    .map(|node| recurse!(node, new_state))
                    .collect::<PolarsResult<Vec<_>>>()
            });
            let inputs = inputs?;
            Ok(Box::new(executors::UnionExec { inputs, options }))
        },
        HConcat {
            inputs, options, ..
        } => {
            let inputs = state.with_new_branch(|new_state| {
                inputs
                    .into_iter()
                    .map(|node| recurse!(node, new_state))
                    .collect::<PolarsResult<Vec<_>>>()
            });

            let inputs = inputs?;

            Ok(Box::new(executors::HConcatExec { inputs, options }))
        },
        Slice { input, offset, len } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::SliceExec { input, offset, len }))
        },
        Filter { input, predicate } => {
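            // An elementwise predicate can be evaluated streaming, chunk by chunk.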
            let streamable = is_elementwise_rec(predicate.node(), expr_arena);
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;
            let mut state = ExpressionConversionState::new(true);
            let predicate = create_physical_expr(
                &predicate,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut state,
            )?;
            Ok(Box::new(executors::FilterExec::new(
                predicate,
                input,
                state.has_windows,
                streamable,
            )))
        },
        #[allow(unused_variables)]
        Scan {
            sources,
            file_info,
            hive_parts,
            output_schema,
            scan_type,
            predicate,
            predicate_file_skip_applied,
            unified_scan_args,
        } => {
            let mut expr_conversion_state = ExpressionConversionState::new(true);

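            // Also build a skip-batch predicate when statistics are available
            // (table statistics, or a Parquet scan reading with statistics), so
            // whole batches can be pruned up front.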
            let mut create_skip_batch_predicate = unified_scan_args.table_statistics.is_some();
            #[cfg(feature = "parquet")]
            {
                create_skip_batch_predicate |= matches!(
                    &*scan_type,
                    FileScanIR::Parquet {
                        options: polars_io::prelude::ParquetOptions {
                            use_statistics: true,
                            ..
                        },
                        ..
                    }
                );
            }

            let predicate = predicate
                .map(|predicate| {
                    create_scan_predicate(
                        &predicate,
                        expr_arena,
                        output_schema.as_ref().unwrap_or(&file_info.schema),
                        None, // hive_schema
                        &mut expr_conversion_state,
                        create_skip_batch_predicate,
                        false,
                    )
                })
                .transpose()?;

            match *scan_type {
                FileScanIR::Anonymous { function, .. } => {
                    Ok(Box::new(executors::AnonymousScanExec {
                        function,
                        predicate,
                        unified_scan_args,
                        file_info,
                        output_schema,
                        predicate_has_windows: expr_conversion_state.has_windows,
                    }))
                },
                #[allow(unreachable_patterns)]
                _ => {
                    // We wrap in a CacheExec so that the new-streaming scan gets called from the
                    // CachePrefiller. This ensures it is called from outside of rayon to avoid
                    // deadlocks.
                    //
                    // Note that we don't actually want it to be kept in memory after being used,
                    // so we set the count to have it be dropped after a single use (or however
                    // many times it is referenced after CSE (subplan)).
                    state.has_cache_parent = true;
                    state.has_cache_child = true;

                    let build_func = build_streaming_executor
                        .expect("invalid build. Missing feature new-streaming");

                    let executor = build_func(root, lp_arena, expr_arena)?;

                    let mut prefill = executors::CachePrefill::new_scan(executor);
                    let exec = prefill.make_exec();

                    let existing = cache_nodes.insert(prefill.id(), prefill);

                    assert!(existing.is_none());

                    Ok(Box::new(exec))
                },
                #[allow(unreachable_patterns)]
                _ => unreachable!(),
            }
        },

        Select {
            expr,
            input,
            schema: _schema,
            options,
            ..
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;
            let mut state = ExpressionConversionState::new(POOL.current_num_threads() > expr.len());
            let phys_expr = create_physical_expressions_from_irs(
                &expr,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut state,
            )?;

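            // Vertical parallelism evaluates the expressions per chunk of rows,
            // which is only sound when every expression is elementwise.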
            let allow_vertical_parallelism = options.should_broadcast && expr.iter().all(|e| is_elementwise_rec(e.node(), expr_arena))
                // If all columns are literal we would get one row per thread.
                && !phys_expr.iter().all(|p| {
                    p.is_literal()
                });

            Ok(Box::new(executors::ProjectionExec {
                input,
                expr: phys_expr,
                has_windows: state.has_windows,
                input_schema,
                #[cfg(test)]
                schema: _schema,
                options,
                allow_vertical_parallelism,
            }))
        },
        DataFrameScan {
            df, output_schema, ..
        } => Ok(Box::new(executors::DataFrameExec {
            df,
            projection: output_schema.map(|s| s.iter_names_cloned().collect()),
        })),
        Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => {
            debug_assert!(!by_column.is_empty());
            let input_schema = lp_arena.get(input).schema(lp_arena);
            let by_column = create_physical_expressions_from_irs(
                &by_column,
                Context::Default,
                expr_arena,
                input_schema.as_ref(),
                &mut ExpressionConversionState::new(true),
            )?;
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::SortExec {
                input,
                by_column,
                slice,
                sort_options,
            }))
        },
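        // A cache may be reached from multiple branches; only the first visit
        // lowers the input and registers the prefill, later visits just attach
        // another reader to it.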
        Cache { input, id } => {
            state.has_cache_parent = true;
            state.has_cache_child = true;

            if let Some(cache) = cache_nodes.get_mut(&id) {
                Ok(Box::new(cache.make_exec()))
            } else {
                let input = recurse!(input, state)?;

                let mut prefill = executors::CachePrefill::new_cache(input, id);
                let exec = prefill.make_exec();

                cache_nodes.insert(id, prefill);

                Ok(Box::new(exec))
            }
        },
        Distinct { input, options } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::UniqueExec { input, options }))
        },
        GroupBy {
            input,
            keys,
            aggs,
            apply,
            schema: _,
            maintain_order,
            options,
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());
            let phys_keys = create_physical_expressions_from_irs(
                &keys,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut ExpressionConversionState::new(true),
            )?;
            let phys_aggs = create_physical_expressions_from_irs(
                &aggs,
                Context::Aggregation,
                expr_arena,
                &input_schema,
                &mut ExpressionConversionState::new(true),
            )?;

            let _slice = options.slice;
            #[cfg(feature = "dynamic_group_by")]
            if let Some(options) = options.dynamic {
                let input = recurse!(input, state)?;
                return Ok(Box::new(executors::GroupByDynamicExec {
                    input,
                    keys: phys_keys,
                    aggs: phys_aggs,
                    options,
                    input_schema,
                    slice: _slice,
                    apply,
                }));
            }

            #[cfg(feature = "dynamic_group_by")]
            if let Some(options) = options.rolling {
                let input = recurse!(input, state)?;
                return Ok(Box::new(executors::GroupByRollingExec {
                    input,
                    keys: phys_keys,
                    aggs: phys_aggs,
                    options,
                    input_schema,
                    slice: _slice,
                    apply,
                }));
            }

            // Check at the last moment whether we can partition the group_by.
            let partitionable = partitionable_gb(&keys, &aggs, &input_schema, expr_arena, &apply);
            if partitionable {
                let from_partitioned_ds = lp_arena.iter(input).any(|(_, lp)| {
                    if let Union { options, .. } = lp {
                        options.from_partitioned_ds
                    } else {
                        false
                    }
                });
                let builder =
                    build_streaming_executor.expect("invalid build. Missing feature new-streaming");

                let input = recurse!(input, state)?;
                let executor = Box::new(GroupByStreamingExec::new(
                    input,
                    builder,
                    root,
                    lp_arena,
                    expr_arena,
                    phys_keys,
                    phys_aggs,
                    maintain_order,
                    _slice,
                    from_partitioned_ds,
                ));

                // Use a cache so that this runs during the cache pre-filling stage
                // rather than on the thread pool; otherwise it could deadlock, since
                // the streaming engine uses the thread pool internally.
                let mut prefill = executors::CachePrefill::new_sink(executor);
                let exec = prefill.make_exec();
                let existing = cache_nodes.insert(prefill.id(), prefill);

                assert!(existing.is_none());

                Ok(Box::new(exec))
            } else {
                let input = recurse!(input, state)?;
                Ok(Box::new(executors::GroupByExec::new(
                    input,
                    phys_keys,
                    phys_aggs,
                    apply,
                    maintain_order,
                    input_schema,
                    options.slice,
                )))
            }
        },
        Join {
            input_left,
            input_right,
            left_on,
            right_on,
            options,
            schema,
            ..
        } => {
            let schema_left = lp_arena.get(input_left).schema(lp_arena).into_owned();
            let schema_right = lp_arena.get(input_right).schema(lp_arena).into_owned();

            let (input_left, input_right) = state.with_new_branch(|new_state| {
                (
                    recurse!(input_left, new_state),
                    recurse!(input_right, new_state),
                )
            });
            let input_left = input_left?;
            let input_right = input_right?;

            // TODO: remove the force option; it can deadlock.
            let parallel = if options.force_parallel {
                true
            } else {
                options.allow_parallel
            };

            let left_on = create_physical_expressions_from_irs(
                &left_on,
                Context::Default,
                expr_arena,
                &schema_left,
                &mut ExpressionConversionState::new(true),
            )?;
            let right_on = create_physical_expressions_from_irs(
                &right_on,
                Context::Default,
                expr_arena,
                &schema_right,
                &mut ExpressionConversionState::new(true),
            )?;
            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());

            // Convert the join options to physical join options. This requires the
            // physical planner, so we do it at the last minute.
            let join_type_options = options
                .options
                .map(|o| {
                    o.compile(|e| {
                        let phys_expr = create_physical_expr(
                            e,
                            Context::Default,
                            expr_arena,
                            &schema,
                            &mut ExpressionConversionState::new(false),
                        )?;

                        let execution_state = ExecutionState::default();

                        Ok(Arc::new(move |df: DataFrame| {
                            let mask = phys_expr.evaluate(&df, &execution_state)?;
                            let mask = mask.as_materialized_series();
                            let mask = mask.bool()?;
                            df._filter_seq(mask)
                        }))
                    })
                })
                .transpose()?;

            Ok(Box::new(executors::JoinExec::new(
                input_left,
                input_right,
                left_on,
                right_on,
                parallel,
                options.args,
                join_type_options,
            )))
        },
        HStack {
            input,
            exprs,
            schema: output_schema,
            options,
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;

            let allow_vertical_parallelism = options.should_broadcast
                && exprs
                    .iter()
                    .all(|e| is_elementwise_rec(e.node(), expr_arena));

            let mut state =
                ExpressionConversionState::new(POOL.current_num_threads() > exprs.len());

            let phys_exprs = create_physical_expressions_from_irs(
                &exprs,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut state,
            )?;
            Ok(Box::new(executors::StackExec {
                input,
                has_windows: state.has_windows,
                exprs: phys_exprs,
                input_schema,
                output_schema,
                options,
                allow_vertical_parallelism,
            }))
        },
        MapFunction {
            input, function, ..
        } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::UdfExec { input, function }))
        },
        ExtContext {
            input, contexts, ..
        } => {
            let input = recurse!(input, state)?;
            let contexts = contexts
                .into_iter()
                .map(|node| recurse!(node, state))
                .collect::<PolarsResult<_>>()?;
            Ok(Box::new(executors::ExternalContext { input, contexts }))
        },
        SimpleProjection { input, columns } => {
            let input = recurse!(input, state)?;
            let exec = executors::ProjectionSimple { input, columns };
            Ok(Box::new(exec))
        },
        #[cfg(feature = "merge_sorted")]
        MergeSorted {
            input_left,
            input_right,
            key,
        } => {
            let (input_left, input_right) = state.with_new_branch(|new_state| {
                (
                    recurse!(input_left, new_state),
                    recurse!(input_right, new_state),
                )
            });
            let input_left = input_left?;
            let input_right = input_right?;

            let exec = executors::MergeSorted {
                input_left,
                input_right,
                key,
            };
            Ok(Box::new(exec))
        },
        Invalid => unreachable!(),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_multiple_physical_plans_reused_cache() {
        // Check that reusing the same cache node doesn't panic.
        // CSE creates duplicate cache nodes with the same ID, but it could also reuse them.

        let mut ir = Arena::new();

        let schema = Schema::from_iter([(PlSmallStr::from_static("x"), DataType::Float32)]);
        let scan = ir.add(IR::DataFrameScan {
            df: Arc::new(DataFrame::empty_with_schema(&schema)),
            schema: Arc::new(schema),
            output_schema: None,
        });

        let cache = ir.add(IR::Cache {
            input: scan,
            id: UniqueId::new(),
        });

        let left_sink = ir.add(IR::Sink {
            input: cache,
            payload: SinkTypeIR::Memory,
        });
        let right_sink = ir.add(IR::Sink {
            input: cache,
            payload: SinkTypeIR::Memory,
        });

        let _multiplan = create_multiple_physical_plans(
            &[left_sink, right_sink],
            &mut ir,
            &mut Arena::new(),
            None,
        )
        .unwrap();
    }
}