lance_datafusion/
exec.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Utilities for working with datafusion execution plans
5
6use std::{
7    collections::HashMap,
8    fmt::{self, Formatter},
9    sync::{Arc, LazyLock, Mutex},
10    time::Duration,
11};
12
13use arrow_array::RecordBatch;
14use arrow_schema::Schema as ArrowSchema;
15use datafusion::{
16    catalog::streaming::StreamingTable,
17    dataframe::DataFrame,
18    execution::{
19        context::{SessionConfig, SessionContext},
20        disk_manager::DiskManagerBuilder,
21        memory_pool::FairSpillPool,
22        runtime_env::RuntimeEnvBuilder,
23        TaskContext,
24    },
25    physical_plan::{
26        analyze::AnalyzeExec,
27        display::DisplayableExecutionPlan,
28        execution_plan::{Boundedness, CardinalityEffect, EmissionType},
29        stream::RecordBatchStreamAdapter,
30        streaming::PartitionStream,
31        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
32    },
33};
34use datafusion_common::{DataFusionError, Statistics};
35use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
36
37use futures::{stream, StreamExt};
38use lance_arrow::SchemaExt;
39use lance_core::{
40    utils::{
41        futures::FinallyStreamExt,
42        tracing::{StreamTracingExt, EXECUTION_PLAN_RUN, TRACE_EXECUTION},
43    },
44    Error, Result,
45};
46use log::{debug, info, warn};
47use snafu::location;
48use tracing::Span;
49
50use crate::udf::register_functions;
51use crate::{
52    chunker::StrictBatchSizeStream,
53    utils::{
54        MetricsExt, BYTES_READ_METRIC, INDEX_COMPARISONS_METRIC, INDICES_LOADED_METRIC,
55        IOPS_METRIC, PARTS_LOADED_METRIC, REQUESTS_METRIC,
56    },
57};
58
/// A source execution node created from an existing stream
///
/// It can only be used once, and will return the stream.  After that the node
/// is exhausted.
///
/// Note: the stream should be finite, otherwise we will report datafusion properties
/// incorrectly.
pub struct OneShotExec {
    // The wrapped stream.  `execute` takes it out of the option, so `None` means
    // the node has already been executed.
    stream: Mutex<Option<SendableRecordBatchStream>>,
    // We save off a copy of the schema to speed up formatting and so ExecutionPlan::schema & display_as
    // can still function after exhausted
    schema: Arc<ArrowSchema>,
    // Plan properties built once in `new` and handed out by `ExecutionPlan::properties`
    properties: PlanProperties,
}
73
74impl OneShotExec {
75    /// Create a new instance from a given stream
76    pub fn new(stream: SendableRecordBatchStream) -> Self {
77        let schema = stream.schema();
78        Self {
79            stream: Mutex::new(Some(stream)),
80            schema: schema.clone(),
81            properties: PlanProperties::new(
82                EquivalenceProperties::new(schema),
83                Partitioning::RoundRobinBatch(1),
84                EmissionType::Incremental,
85                Boundedness::Bounded,
86            ),
87        }
88    }
89
90    pub fn from_batch(batch: RecordBatch) -> Self {
91        let schema = batch.schema();
92        let stream = Box::pin(RecordBatchStreamAdapter::new(
93            schema,
94            stream::iter(vec![Ok(batch)]),
95        ));
96        Self::new(stream)
97    }
98}
99
100impl std::fmt::Debug for OneShotExec {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        let stream = self.stream.lock().unwrap();
103        f.debug_struct("OneShotExec")
104            .field("exhausted", &stream.is_none())
105            .field("schema", self.schema.as_ref())
106            .finish()
107    }
108}
109
110impl DisplayAs for OneShotExec {
111    fn fmt_as(
112        &self,
113        t: datafusion::physical_plan::DisplayFormatType,
114        f: &mut std::fmt::Formatter,
115    ) -> std::fmt::Result {
116        let stream = self.stream.lock().unwrap();
117        let exhausted = if stream.is_some() { "" } else { "EXHAUSTED" };
118        let columns = self
119            .schema
120            .field_names()
121            .iter()
122            .cloned()
123            .cloned()
124            .collect::<Vec<_>>();
125        match t {
126            DisplayFormatType::Default | DisplayFormatType::Verbose => {
127                write!(
128                    f,
129                    "OneShotStream: {}columns=[{}]",
130                    exhausted,
131                    columns.join(",")
132                )
133            }
134            DisplayFormatType::TreeRender => {
135                write!(
136                    f,
137                    "OneShotStream\nexhausted={}\ncolumns=[{}]",
138                    exhausted,
139                    columns.join(",")
140                )
141            }
142        }
143    }
144}
145
146impl ExecutionPlan for OneShotExec {
147    fn name(&self) -> &str {
148        "OneShotExec"
149    }
150
151    fn as_any(&self) -> &dyn std::any::Any {
152        self
153    }
154
155    fn schema(&self) -> arrow_schema::SchemaRef {
156        self.schema.clone()
157    }
158
159    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
160        vec![]
161    }
162
163    fn with_new_children(
164        self: Arc<Self>,
165        children: Vec<Arc<dyn ExecutionPlan>>,
166    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
167        // OneShotExec has no children, so this should only be called with an empty vector
168        if !children.is_empty() {
169            return Err(datafusion_common::DataFusionError::Internal(
170                "OneShotExec does not support children".to_string(),
171            ));
172        }
173        Ok(self)
174    }
175
176    fn execute(
177        &self,
178        _partition: usize,
179        _context: Arc<datafusion::execution::TaskContext>,
180    ) -> datafusion_common::Result<SendableRecordBatchStream> {
181        let stream = self
182            .stream
183            .lock()
184            .map_err(|err| DataFusionError::Execution(err.to_string()))?
185            .take();
186        if let Some(stream) = stream {
187            Ok(stream)
188        } else {
189            Err(DataFusionError::Execution(
190                "OneShotExec has already been executed".to_string(),
191            ))
192        }
193    }
194
195    fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
196        Ok(Statistics::new_unknown(&self.schema))
197    }
198
199    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
200        &self.properties
201    }
202}
203
/// A pass-through execution node that runs its input inside a tracing span.
struct TracedExec {
    // The wrapped plan; `execute` delegates to it
    input: Arc<dyn ExecutionPlan>,
    // A copy of the input's plan properties, taken when the node is constructed
    properties: PlanProperties,
    // The span entered while starting the input and attached to the resulting stream
    span: Span,
}
209
210impl TracedExec {
211    pub fn new(input: Arc<dyn ExecutionPlan>, span: Span) -> Self {
212        Self {
213            properties: input.properties().clone(),
214            input,
215            span,
216        }
217    }
218}
219
220impl DisplayAs for TracedExec {
221    fn fmt_as(
222        &self,
223        t: datafusion::physical_plan::DisplayFormatType,
224        f: &mut std::fmt::Formatter,
225    ) -> std::fmt::Result {
226        match t {
227            DisplayFormatType::Default
228            | DisplayFormatType::Verbose
229            | DisplayFormatType::TreeRender => {
230                write!(f, "TracedExec")
231            }
232        }
233    }
234}
235
236impl std::fmt::Debug for TracedExec {
237    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
238        write!(f, "TracedExec")
239    }
240}
241impl ExecutionPlan for TracedExec {
242    fn name(&self) -> &str {
243        "TracedExec"
244    }
245
246    fn as_any(&self) -> &dyn std::any::Any {
247        self
248    }
249
250    fn properties(&self) -> &PlanProperties {
251        &self.properties
252    }
253
254    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
255        vec![&self.input]
256    }
257
258    fn with_new_children(
259        self: Arc<Self>,
260        children: Vec<Arc<dyn ExecutionPlan>>,
261    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
262        Ok(Arc::new(Self {
263            input: children[0].clone(),
264            properties: self.properties.clone(),
265            span: self.span.clone(),
266        }))
267    }
268
269    fn execute(
270        &self,
271        partition: usize,
272        context: Arc<TaskContext>,
273    ) -> datafusion_common::Result<SendableRecordBatchStream> {
274        let _guard = self.span.enter();
275        let stream = self.input.execute(partition, context)?;
276        let schema = stream.schema();
277        let stream = stream.stream_in_span(self.span.clone());
278        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
279    }
280}
281
/// Callback for reporting statistics after a scan
///
/// The callback receives a reference to the [`ExecutionSummaryCounts`] gathered from the
/// plan's metrics.  It must be `Send + Sync` and is shared through an `Arc`.
pub type ExecutionStatsCallback = Arc<dyn Fn(&ExecutionSummaryCounts) + Send + Sync>;
284
/// Options controlling how Lance builds DataFusion session / task contexts and
/// reports execution statistics.
#[derive(Default, Clone)]
pub struct LanceExecutionOptions {
    /// Requests spilling.  The effective value comes from
    /// [`LanceExecutionOptions::use_spilling`], which also consults the
    /// `LANCE_BYPASS_SPILLING` environment variable.
    pub use_spilling: bool,
    /// Explicit memory pool size (presumably in bytes — confirm against
    /// `FairSpillPool`).  When `None`, [`LanceExecutionOptions::mem_pool_size`] falls
    /// back to `LANCE_MEM_POOL_SIZE` and then to the built-in default.
    pub mem_pool_size: Option<u64>,
    /// When set, overrides the `execution.batch_size` setting of the task context.
    pub batch_size: Option<usize>,
    /// When set, passed to `SessionConfig::with_target_partitions`.
    pub target_partition: Option<usize>,
    /// When set, invoked with the summary counts once plan metrics are reported.
    pub execution_stats_callback: Option<ExecutionStatsCallback>,
}
293
294impl std::fmt::Debug for LanceExecutionOptions {
295    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296        f.debug_struct("LanceExecutionOptions")
297            .field("use_spilling", &self.use_spilling)
298            .field("mem_pool_size", &self.mem_pool_size)
299            .field("batch_size", &self.batch_size)
300            .field("target_partition", &self.target_partition)
301            .field(
302                "execution_stats_callback",
303                &self.execution_stats_callback.is_some(),
304            )
305            .finish()
306    }
307}
308
/// Memory pool size (100 * 1024 * 1024) used when neither the options nor a parseable
/// `LANCE_MEM_POOL_SIZE` environment variable supplies one.
const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 * 1024 * 1024;
310
311impl LanceExecutionOptions {
312    pub fn mem_pool_size(&self) -> u64 {
313        self.mem_pool_size.unwrap_or_else(|| {
314            std::env::var("LANCE_MEM_POOL_SIZE")
315                .map(|s| match s.parse::<u64>() {
316                    Ok(v) => v,
317                    Err(e) => {
318                        warn!("Failed to parse LANCE_MEM_POOL_SIZE: {}, using default", e);
319                        DEFAULT_LANCE_MEM_POOL_SIZE
320                    }
321                })
322                .unwrap_or(DEFAULT_LANCE_MEM_POOL_SIZE)
323        })
324    }
325
326    pub fn use_spilling(&self) -> bool {
327        if !self.use_spilling {
328            return false;
329        }
330        std::env::var("LANCE_BYPASS_SPILLING")
331            .map(|_| {
332                info!("Bypassing spilling because LANCE_BYPASS_SPILLING is set");
333                false
334            })
335            .unwrap_or(true)
336    }
337}
338
339pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext {
340    let mut session_config = SessionConfig::new();
341    let mut runtime_env_builder = RuntimeEnvBuilder::new();
342    if let Some(target_partition) = options.target_partition {
343        session_config = session_config.with_target_partitions(target_partition);
344    }
345    if options.use_spilling() {
346        runtime_env_builder = runtime_env_builder
347            .with_disk_manager_builder(DiskManagerBuilder::default())
348            .with_memory_pool(Arc::new(FairSpillPool::new(
349                options.mem_pool_size() as usize
350            )));
351    }
352    let runtime_env = runtime_env_builder.build_arc().unwrap();
353
354    let ctx = SessionContext::new_with_config_rt(session_config, runtime_env);
355    register_functions(&ctx);
356
357    ctx
358}
359
/// Lazily-built session context created from `LanceExecutionOptions::default()`
/// (spilling not requested).  `get_session_context` hands out clones of it.
static DEFAULT_SESSION_CONTEXT: LazyLock<SessionContext> =
    LazyLock::new(|| new_session_context(&LanceExecutionOptions::default()));

/// Lazily-built session context created from the default options with
/// `use_spilling: true`.  `get_session_context` hands out clones of it.
static DEFAULT_SESSION_CONTEXT_WITH_SPILLING: LazyLock<SessionContext> = LazyLock::new(|| {
    new_session_context(&LanceExecutionOptions {
        use_spilling: true,
        ..Default::default()
    })
});
369
370pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext {
371    if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE && options.target_partition.is_none()
372    {
373        return if options.use_spilling() {
374            DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone()
375        } else {
376            DEFAULT_SESSION_CONTEXT.clone()
377        };
378    }
379    new_session_context(options)
380}
381
382fn get_task_context(
383    session_ctx: &SessionContext,
384    options: &LanceExecutionOptions,
385) -> Arc<TaskContext> {
386    let mut state = session_ctx.state();
387    if let Some(batch_size) = options.batch_size.as_ref() {
388        state.config_mut().options_mut().execution.batch_size = *batch_size;
389    }
390
391    state.task_ctx()
392}
393
/// Counters summed over the metrics of every node in an execution plan.
///
/// This is the value handed to an [`ExecutionStatsCallback`].
#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct ExecutionSummaryCounts {
    /// The number of I/O operations performed
    pub iops: usize,
    /// The number of requests made to the storage layer (may be larger or smaller than iops
    /// depending on coalescing configuration)
    pub requests: usize,
    /// The number of bytes read during the execution of the plan
    pub bytes_read: usize,
    /// The number of top-level indices loaded
    pub indices_loaded: usize,
    /// The number of index partitions loaded
    pub parts_loaded: usize,
    /// The number of index comparisons performed (the exact meaning depends on the index type)
    pub index_comparisons: usize,
    /// Additional metrics for more detailed statistics.  These are subject to change in the future
    /// and should only be used for debugging purposes.
    ///
    /// Keyed by metric name; holds every count metric that is not one of the named fields above.
    pub all_counts: HashMap<String, usize>,
}
413
414fn visit_node(node: &dyn ExecutionPlan, counts: &mut ExecutionSummaryCounts) {
415    if let Some(metrics) = node.metrics() {
416        for (metric_name, count) in metrics.iter_counts() {
417            match metric_name.as_ref() {
418                IOPS_METRIC => counts.iops += count.value(),
419                REQUESTS_METRIC => counts.requests += count.value(),
420                BYTES_READ_METRIC => counts.bytes_read += count.value(),
421                INDICES_LOADED_METRIC => counts.indices_loaded += count.value(),
422                PARTS_LOADED_METRIC => counts.parts_loaded += count.value(),
423                INDEX_COMPARISONS_METRIC => counts.index_comparisons += count.value(),
424                _ => {
425                    let existing = counts
426                        .all_counts
427                        .entry(metric_name.as_ref().to_string())
428                        .or_insert(0);
429                    *existing += count.value();
430                }
431            }
432        }
433    }
434    for child in node.children() {
435        visit_node(child.as_ref(), counts);
436    }
437}
438
439fn report_plan_summary_metrics(plan: &dyn ExecutionPlan, options: &LanceExecutionOptions) {
440    let output_rows = plan
441        .metrics()
442        .map(|m| m.output_rows().unwrap_or(0))
443        .unwrap_or(0);
444    let mut counts = ExecutionSummaryCounts::default();
445    visit_node(plan, &mut counts);
446    tracing::info!(
447        target: TRACE_EXECUTION,
448        r#type = EXECUTION_PLAN_RUN,
449        output_rows,
450        iops = counts.iops,
451        requests = counts.requests,
452        bytes_read = counts.bytes_read,
453        indices_loaded = counts.indices_loaded,
454        parts_loaded = counts.parts_loaded,
455        index_comparisons = counts.index_comparisons,
456    );
457    if let Some(callback) = options.execution_stats_callback.as_ref() {
458        callback(&counts);
459    }
460}
461
462/// Executes a plan using default session & runtime configuration
463///
464/// Only executes a single partition.  Panics if the plan has more than one partition.
465pub fn execute_plan(
466    plan: Arc<dyn ExecutionPlan>,
467    options: LanceExecutionOptions,
468) -> Result<SendableRecordBatchStream> {
469    debug!(
470        "Executing plan:\n{}",
471        DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
472    );
473
474    let session_ctx = get_session_context(&options);
475
476    // NOTE: we are only executing the first partition here. Therefore, if
477    // the plan has more than one partition, we will be missing data.
478    assert_eq!(plan.properties().partitioning.partition_count(), 1);
479    let stream = plan.execute(0, get_task_context(&session_ctx, &options))?;
480
481    let schema = stream.schema();
482    let stream = stream.finally(move || {
483        report_plan_summary_metrics(plan.as_ref(), &options);
484    });
485    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
486}
487
488pub async fn analyze_plan(
489    plan: Arc<dyn ExecutionPlan>,
490    options: LanceExecutionOptions,
491) -> Result<String> {
492    // This is needed as AnalyzeExec launches a thread task per
493    // partition, and we want these to be connected to the parent span
494    let plan = Arc::new(TracedExec::new(plan, Span::current()));
495
496    let schema = plan.schema();
497    let analyze = Arc::new(AnalyzeExec::new(true, true, plan, schema));
498
499    let session_ctx = get_session_context(&options);
500    assert_eq!(analyze.properties().partitioning.partition_count(), 1);
501    let mut stream = analyze
502        .execute(0, get_task_context(&session_ctx, &options))
503        .map_err(|err| {
504            Error::io(
505                format!("Failed to execute analyze plan: {}", err),
506                location!(),
507            )
508        })?;
509
510    // fully execute the plan
511    while (stream.next().await).is_some() {}
512
513    let result = format_plan(analyze);
514    Ok(result)
515}
516
517pub fn format_plan(plan: Arc<dyn ExecutionPlan>) -> String {
518    /// A visitor which calculates additional metrics for all the plans.
519    struct CalculateVisitor {
520        highest_index: usize,
521        index_to_cumulative_cpu: HashMap<usize, usize>,
522    }
523    impl CalculateVisitor {
524        fn calculate_cumulative_cpu(&mut self, plan: &Arc<dyn ExecutionPlan>) -> usize {
525            self.highest_index += 1;
526            let plan_index = self.highest_index;
527            let elapsed_cpu: usize = match plan.metrics() {
528                Some(metrics) => metrics.elapsed_compute().unwrap_or_default(),
529                None => 0,
530            };
531            let mut cumulative_cpu = elapsed_cpu;
532            for child in plan.children() {
533                cumulative_cpu += self.calculate_cumulative_cpu(child);
534            }
535            self.index_to_cumulative_cpu
536                .insert(plan_index, cumulative_cpu);
537            cumulative_cpu
538        }
539    }
540
541    /// A visitor which prints out all the plans.
542    struct PrintVisitor {
543        highest_index: usize,
544        indent: usize,
545    }
546    impl PrintVisitor {
547        fn write_output(
548            &mut self,
549            plan: &Arc<dyn ExecutionPlan>,
550            f: &mut Formatter,
551            calcs: &CalculateVisitor,
552        ) -> std::fmt::Result {
553            self.highest_index += 1;
554            write!(f, "{:indent$}", "", indent = self.indent * 2)?;
555            plan.fmt_as(datafusion::physical_plan::DisplayFormatType::Verbose, f)?;
556            if let Some(metrics) = plan.metrics() {
557                let metrics = metrics
558                    .aggregate_by_name()
559                    .sorted_for_display()
560                    .timestamps_removed();
561
562                write!(f, ", metrics=[{metrics}]")?;
563            } else {
564                write!(f, ", metrics=[]")?;
565            }
566            let cumulative_cpu = calcs
567                .index_to_cumulative_cpu
568                .get(&self.highest_index)
569                .unwrap();
570            let cumulative_cpu_duration = Duration::from_nanos((*cumulative_cpu) as u64);
571            write!(f, ", cumulative_cpu={cumulative_cpu_duration:?}")?;
572            writeln!(f)?;
573            self.indent += 1;
574            for child in plan.children() {
575                self.write_output(child, f, calcs)?;
576            }
577            self.indent -= 1;
578            std::fmt::Result::Ok(())
579        }
580    }
581    // A wrapper which prints out a plan.
582    struct PrintWrapper {
583        plan: Arc<dyn ExecutionPlan>,
584    }
585    impl fmt::Display for PrintWrapper {
586        fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
587            let mut calcs = CalculateVisitor {
588                highest_index: 0,
589                index_to_cumulative_cpu: HashMap::new(),
590            };
591            calcs.calculate_cumulative_cpu(&self.plan);
592            let mut prints = PrintVisitor {
593                highest_index: 0,
594                indent: 0,
595            };
596            prints.write_output(&self.plan, f, &calcs)
597        }
598    }
599    let wrapper = PrintWrapper { plan };
600    format!("{}", wrapper)
601}
602
/// Extension methods for a DataFusion `SessionContext`.
pub trait SessionContextExt {
    /// Creates a DataFrame for reading a stream of data
    ///
    /// This dataframe may only be queried once, future queries will fail
    ///
    /// `data` is the stream to expose; the DataFrame's schema is taken from it.
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame>;
}
612
/// A partition stream backed by a single pre-existing stream that can be handed out once.
struct OneShotPartitionStream {
    // The wrapped stream; `execute` takes it out, leaving `None` afterwards
    data: Arc<Mutex<Option<SendableRecordBatchStream>>>,
    // Schema captured from the stream at construction
    schema: Arc<ArrowSchema>,
}
617
618impl std::fmt::Debug for OneShotPartitionStream {
619    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
620        let data = self.data.lock().unwrap();
621        f.debug_struct("OneShotPartitionStream")
622            .field("exhausted", &data.is_none())
623            .field("schema", self.schema.as_ref())
624            .finish()
625    }
626}
627
628impl OneShotPartitionStream {
629    fn new(data: SendableRecordBatchStream) -> Self {
630        let schema = data.schema();
631        Self {
632            data: Arc::new(Mutex::new(Some(data))),
633            schema,
634        }
635    }
636}
637
638impl PartitionStream for OneShotPartitionStream {
639    fn schema(&self) -> &arrow_schema::SchemaRef {
640        &self.schema
641    }
642
643    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
644        let mut stream = self.data.lock().unwrap();
645        stream
646            .take()
647            .expect("Attempt to consume a one shot dataframe multiple times")
648    }
649}
650
651impl SessionContextExt for SessionContext {
652    fn read_one_shot(
653        &self,
654        data: SendableRecordBatchStream,
655    ) -> datafusion::common::Result<DataFrame> {
656        let schema = data.schema();
657        let part_stream = Arc::new(OneShotPartitionStream::new(data));
658        let provider = StreamingTable::try_new(schema, vec![part_stream])?;
659        self.read_table(Arc::new(provider))
660    }
661}
662
/// An execution node that pipes each partition of its input through a
/// `StrictBatchSizeStream` configured with `batch_size`.
///
/// NOTE(review): presumably this re-chunks the output into batches of exactly
/// `batch_size` rows — confirm against `chunker::StrictBatchSizeStream`.
#[derive(Clone, Debug)]
pub struct StrictBatchSizeExec {
    // The wrapped plan; plan properties and statistics are delegated to it
    input: Arc<dyn ExecutionPlan>,
    // Passed to `StrictBatchSizeStream::new` for every executed partition
    batch_size: usize,
}
668
669impl StrictBatchSizeExec {
670    pub fn new(input: Arc<dyn ExecutionPlan>, batch_size: usize) -> Self {
671        Self { input, batch_size }
672    }
673}
674
675impl DisplayAs for StrictBatchSizeExec {
676    fn fmt_as(
677        &self,
678        _t: datafusion::physical_plan::DisplayFormatType,
679        f: &mut std::fmt::Formatter,
680    ) -> std::fmt::Result {
681        write!(f, "StrictBatchSizeExec")
682    }
683}
684
685impl ExecutionPlan for StrictBatchSizeExec {
686    fn name(&self) -> &str {
687        "StrictBatchSizeExec"
688    }
689
690    fn as_any(&self) -> &dyn std::any::Any {
691        self
692    }
693
694    fn properties(&self) -> &PlanProperties {
695        self.input.properties()
696    }
697
698    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
699        vec![&self.input]
700    }
701
702    fn with_new_children(
703        self: Arc<Self>,
704        children: Vec<Arc<dyn ExecutionPlan>>,
705    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
706        Ok(Arc::new(Self {
707            input: children[0].clone(),
708            batch_size: self.batch_size,
709        }))
710    }
711
712    fn execute(
713        &self,
714        partition: usize,
715        context: Arc<TaskContext>,
716    ) -> datafusion_common::Result<SendableRecordBatchStream> {
717        let stream = self.input.execute(partition, context)?;
718        let schema = stream.schema();
719        let stream = StrictBatchSizeStream::new(stream, self.batch_size);
720        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
721    }
722
723    fn maintains_input_order(&self) -> Vec<bool> {
724        vec![true]
725    }
726
727    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
728        vec![false]
729    }
730
731    fn partition_statistics(
732        &self,
733        partition: Option<usize>,
734    ) -> datafusion_common::Result<Statistics> {
735        self.input.partition_statistics(partition)
736    }
737
738    fn cardinality_effect(&self) -> CardinalityEffect {
739        CardinalityEffect::Equal
740    }
741}