lance_datafusion/exec.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities for working with DataFusion execution plans

use std::{
    collections::HashMap,
    sync::{Arc, LazyLock, Mutex},
};

use arrow_array::RecordBatch;
use arrow_schema::Schema as ArrowSchema;
use datafusion::{
    catalog::streaming::StreamingTable,
    dataframe::DataFrame,
    execution::{
        context::{SessionConfig, SessionContext},
        disk_manager::DiskManagerBuilder,
        memory_pool::FairSpillPool,
        runtime_env::RuntimeEnvBuilder,
        TaskContext,
    },
    physical_plan::{
        analyze::AnalyzeExec,
        display::DisplayableExecutionPlan,
        execution_plan::{Boundedness, CardinalityEffect, EmissionType},
        stream::RecordBatchStreamAdapter,
        streaming::PartitionStream,
        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
    },
};
use datafusion_common::{DataFusionError, Statistics};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

use futures::{stream, StreamExt};
use lance_arrow::SchemaExt;
use lance_core::{
    utils::{
        futures::FinallyStreamExt,
        tracing::{StreamTracingExt, EXECUTION_PLAN_RUN, TRACE_EXECUTION},
    },
    Error, Result,
};
use log::{debug, info, warn};
use snafu::location;
use tracing::Span;

use crate::udf::register_functions;
use crate::{
    chunker::StrictBatchSizeStream,
    utils::{
        MetricsExt, BYTES_READ_METRIC, INDEX_COMPARISONS_METRIC, INDICES_LOADED_METRIC,
        IOPS_METRIC, PARTS_LOADED_METRIC, REQUESTS_METRIC,
    },
};
/// A source execution node created from an existing stream
///
/// It can only be used once: the first call to `execute` returns the stream,
/// after which the node is exhausted.
///
/// Note: the stream should be finite, otherwise we will report DataFusion
/// properties incorrectly.
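///
/// A minimal usage sketch (`batch` and `task_context` are assumed to exist; the
/// names are illustrative):
///
/// ```ignore
/// let exec = Arc::new(OneShotExec::from_batch(batch));
/// // The first call to execute() returns the stream; later calls will error.
/// let stream = exec.execute(0, task_context)?;
/// ```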
pub struct OneShotExec {
    stream: Mutex<Option<SendableRecordBatchStream>>,
    // We save off a copy of the schema to speed up formatting and so that
    // ExecutionPlan::schema & display_as can still function after the stream
    // is exhausted
    schema: Arc<ArrowSchema>,
    properties: PlanProperties,
}

impl OneShotExec {
    /// Create a new instance from a given stream
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        let schema = stream.schema();
        Self {
            stream: Mutex::new(Some(stream)),
            schema: schema.clone(),
            properties: PlanProperties::new(
                EquivalenceProperties::new(schema),
                Partitioning::RoundRobinBatch(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
        }
    }

    pub fn from_batch(batch: RecordBatch) -> Self {
        let schema = batch.schema();
        let stream = Box::pin(RecordBatchStreamAdapter::new(
            schema,
            stream::iter(vec![Ok(batch)]),
        ));
        Self::new(stream)
    }
}

impl std::fmt::Debug for OneShotExec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        f.debug_struct("OneShotExec")
            .field("exhausted", &stream.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl DisplayAs for OneShotExec {
    fn fmt_as(
        &self,
        t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        let exhausted = if stream.is_some() { "" } else { "EXHAUSTED" };
        let columns = self
            .schema
            .field_names()
            .iter()
            .cloned()
            .cloned()
            .collect::<Vec<_>>();
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "OneShotStream: {}columns=[{}]",
                    exhausted,
                    columns.join(",")
                )
            }
            DisplayFormatType::TreeRender => {
                write!(
                    f,
                    "OneShotStream\nexhausted={}\ncolumns=[{}]",
                    exhausted,
                    columns.join(",")
                )
            }
        }
    }
}

impl ExecutionPlan for OneShotExec {
    fn name(&self) -> &str {
        "OneShotExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow_schema::SchemaRef {
        self.schema.clone()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        // OneShotExec has no children, so this should only be called with an empty vector
        if !children.is_empty() {
            return Err(datafusion_common::DataFusionError::Internal(
                "OneShotExec does not support children".to_string(),
            ));
        }
        Ok(self)
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<datafusion::execution::TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        let stream = self
            .stream
            .lock()
            .map_err(|err| DataFusionError::Execution(err.to_string()))?
            .take();
        if let Some(stream) = stream {
            Ok(stream)
        } else {
            Err(DataFusionError::Execution(
                "OneShotExec has already been executed".to_string(),
            ))
        }
    }

    fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
        Ok(Statistics::new_unknown(&self.schema))
    }

    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
        &self.properties
    }
}

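/// Wraps an execution plan so that its execution is recorded under the given
/// tracing span.  Used by [`analyze_plan`], which launches a task per partition
/// that would otherwise be disconnected from the parent span.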
struct TracedExec {
    input: Arc<dyn ExecutionPlan>,
    properties: PlanProperties,
    span: Span,
}

impl TracedExec {
    pub fn new(input: Arc<dyn ExecutionPlan>, span: Span) -> Self {
        Self {
            properties: input.properties().clone(),
            input,
            span,
        }
    }
}

impl DisplayAs for TracedExec {
    fn fmt_as(
        &self,
        t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => {
                write!(f, "TracedExec")
            }
        }
    }
}

impl std::fmt::Debug for TracedExec {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "TracedExec")
    }
}

impl ExecutionPlan for TracedExec {
    fn name(&self) -> &str {
        "TracedExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(Self {
            input: children[0].clone(),
            properties: self.properties.clone(),
            span: self.span.clone(),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        let _guard = self.span.enter();
        let stream = self.input.execute(partition, context)?;
        let schema = stream.schema();
        let stream = stream.stream_in_span(self.span.clone());
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }
}

/// Callback for reporting statistics after a scan
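///
/// A minimal sketch of registering a callback (the closure body is illustrative):
///
/// ```ignore
/// let options = LanceExecutionOptions {
///     execution_stats_callback: Some(Arc::new(|counts| {
///         log::info!("iops={} bytes_read={}", counts.iops, counts.bytes_read);
///     })),
///     ..Default::default()
/// };
/// ```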
pub type ExecutionStatsCallback = Arc<dyn Fn(&ExecutionSummaryCounts) + Send + Sync>;

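/// Options that control how Lance builds the DataFusion session used to run a plan
/// (spilling, memory pool size, batch size, target partitions, and an optional
/// statistics callback).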
#[derive(Default, Clone)]
pub struct LanceExecutionOptions {
    pub use_spilling: bool,
    pub mem_pool_size: Option<u64>,
    pub batch_size: Option<usize>,
    pub target_partition: Option<usize>,
    pub execution_stats_callback: Option<ExecutionStatsCallback>,
}

impl std::fmt::Debug for LanceExecutionOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LanceExecutionOptions")
            .field("use_spilling", &self.use_spilling)
            .field("mem_pool_size", &self.mem_pool_size)
            .field("batch_size", &self.batch_size)
            .field("target_partition", &self.target_partition)
            .field(
                "execution_stats_callback",
                &self.execution_stats_callback.is_some(),
            )
            .finish()
    }
}

const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 * 1024 * 1024;

impl LanceExecutionOptions {
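    /// Returns the configured memory pool size, falling back to the
    /// `LANCE_MEM_POOL_SIZE` environment variable and then to the 100MiB default.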
    pub fn mem_pool_size(&self) -> u64 {
        self.mem_pool_size.unwrap_or_else(|| {
            std::env::var("LANCE_MEM_POOL_SIZE")
                .map(|s| match s.parse::<u64>() {
                    Ok(v) => v,
                    Err(e) => {
                        warn!("Failed to parse LANCE_MEM_POOL_SIZE: {}, using default", e);
                        DEFAULT_LANCE_MEM_POOL_SIZE
                    }
                })
                .unwrap_or(DEFAULT_LANCE_MEM_POOL_SIZE)
        })
    }

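    /// Returns whether spilling should be used.  Spilling is disabled globally
    /// when the `LANCE_BYPASS_SPILLING` environment variable is set.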
    pub fn use_spilling(&self) -> bool {
        if !self.use_spilling {
            return false;
        }
        std::env::var("LANCE_BYPASS_SPILLING")
            .map(|_| {
                info!("Bypassing spilling because LANCE_BYPASS_SPILLING is set");
                false
            })
            .unwrap_or(true)
    }
}

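/// Builds a new `SessionContext` from the given options, registering Lance's UDFs
/// and, when spilling is enabled, a fair spill pool sized by `mem_pool_size`.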
pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext {
    let mut session_config = SessionConfig::new();
    let mut runtime_env_builder = RuntimeEnvBuilder::new();
    if let Some(target_partition) = options.target_partition {
        session_config = session_config.with_target_partitions(target_partition);
    }
    if options.use_spilling() {
        runtime_env_builder = runtime_env_builder
            .with_disk_manager_builder(DiskManagerBuilder::default())
            .with_memory_pool(Arc::new(FairSpillPool::new(
                options.mem_pool_size() as usize
            )));
    }
    let runtime_env = runtime_env_builder.build_arc().unwrap();

    let ctx = SessionContext::new_with_config_rt(session_config, runtime_env);
    register_functions(&ctx);

    ctx
}

static DEFAULT_SESSION_CONTEXT: LazyLock<SessionContext> =
    LazyLock::new(|| new_session_context(&LanceExecutionOptions::default()));

static DEFAULT_SESSION_CONTEXT_WITH_SPILLING: LazyLock<SessionContext> = LazyLock::new(|| {
    new_session_context(&LanceExecutionOptions {
        use_spilling: true,
        ..Default::default()
    })
});

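/// Returns a session context for the given options, reusing one of the cached
/// default contexts when the options match the defaults.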
pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext {
    if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE && options.target_partition.is_none()
    {
        return if options.use_spilling() {
            DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone()
        } else {
            DEFAULT_SESSION_CONTEXT.clone()
        };
    }
    new_session_context(options)
}

fn get_task_context(
    session_ctx: &SessionContext,
    options: &LanceExecutionOptions,
) -> Arc<TaskContext> {
    let mut state = session_ctx.state();
    if let Some(batch_size) = options.batch_size.as_ref() {
        state.config_mut().options_mut().execution.batch_size = *batch_size;
    }

    state.task_ctx()
}

#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct ExecutionSummaryCounts {
    /// The number of I/O operations performed
    pub iops: usize,
    /// The number of requests made to the storage layer (may be larger or smaller than iops
    /// depending on coalescing configuration)
    pub requests: usize,
    /// The number of bytes read during the execution of the plan
    pub bytes_read: usize,
    /// The number of top-level indices loaded
    pub indices_loaded: usize,
    /// The number of index partitions loaded
    pub parts_loaded: usize,
    /// The number of index comparisons performed (the exact meaning depends on the index type)
    pub index_comparisons: usize,
    /// Additional metrics for more detailed statistics.  These are subject to change in the future
    /// and should only be used for debugging purposes.
    pub all_counts: HashMap<String, usize>,
}

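/// Recursively walks the plan tree, accumulating the well-known metric counters
/// into `counts` and any remaining counters into `all_counts`.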
fn visit_node(node: &dyn ExecutionPlan, counts: &mut ExecutionSummaryCounts) {
    if let Some(metrics) = node.metrics() {
        for (metric_name, count) in metrics.iter_counts() {
            match metric_name.as_ref() {
                IOPS_METRIC => counts.iops += count.value(),
                REQUESTS_METRIC => counts.requests += count.value(),
                BYTES_READ_METRIC => counts.bytes_read += count.value(),
                INDICES_LOADED_METRIC => counts.indices_loaded += count.value(),
                PARTS_LOADED_METRIC => counts.parts_loaded += count.value(),
                INDEX_COMPARISONS_METRIC => counts.index_comparisons += count.value(),
                _ => {
                    let existing = counts
                        .all_counts
                        .entry(metric_name.as_ref().to_string())
                        .or_insert(0);
                    *existing += count.value();
                }
            }
        }
    }
    for child in node.children() {
        visit_node(child.as_ref(), counts);
    }
}

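/// Emits a tracing event summarizing the counters collected from an executed plan
/// and invokes the user-provided stats callback, if any.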
fn report_plan_summary_metrics(plan: &dyn ExecutionPlan, options: &LanceExecutionOptions) {
    let output_rows = plan
        .metrics()
        .map(|m| m.output_rows().unwrap_or(0))
        .unwrap_or(0);
    let mut counts = ExecutionSummaryCounts::default();
    visit_node(plan, &mut counts);
    tracing::info!(
        target: TRACE_EXECUTION,
        r#type = EXECUTION_PLAN_RUN,
        output_rows,
        iops = counts.iops,
        requests = counts.requests,
        bytes_read = counts.bytes_read,
        indices_loaded = counts.indices_loaded,
        parts_loaded = counts.parts_loaded,
        index_comparisons = counts.index_comparisons,
    );
    if let Some(callback) = options.execution_stats_callback.as_ref() {
        callback(&counts);
    }
}

/// Executes a plan using default session & runtime configuration
///
/// Only executes a single partition.  Panics if the plan has more than one partition.
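///
/// A minimal usage sketch (assumes `plan` is a single-partition
/// `Arc<dyn ExecutionPlan>`):
///
/// ```ignore
/// let mut stream = execute_plan(plan, LanceExecutionOptions::default())?;
/// while let Some(batch) = stream.next().await {
///     let batch = batch?;
///     // process each RecordBatch as it arrives
/// }
/// ```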
pub fn execute_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<SendableRecordBatchStream> {
    debug!(
        "Executing plan:\n{}",
        DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
    );

    let session_ctx = get_session_context(&options);

    // NOTE: we are only executing the first partition here. Therefore, if
    // the plan has more than one partition, we will be missing data.
    assert_eq!(plan.properties().partitioning.partition_count(), 1);
    let stream = plan.execute(0, get_task_context(&session_ctx, &options))?;

    let schema = stream.schema();
    let stream = stream.finally(move || {
        report_plan_summary_metrics(plan.as_ref(), &options);
    });
    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}

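/// Runs the plan to completion under an `AnalyzeExec` and returns the plan,
/// annotated with runtime metrics, as a display string.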
pub async fn analyze_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<String> {
    // This is needed as AnalyzeExec launches a thread task per
    // partition, and we want these to be connected to the parent span
    let plan = Arc::new(TracedExec::new(plan, Span::current()));

    let schema = plan.schema();
    let analyze = Arc::new(AnalyzeExec::new(true, true, plan, schema));

    let session_ctx = get_session_context(&options);
    assert_eq!(analyze.properties().partitioning.partition_count(), 1);
    let mut stream = analyze
        .execute(0, get_task_context(&session_ctx, &options))
        .map_err(|err| {
            Error::io(
                format!("Failed to execute analyze plan: {}", err),
                location!(),
            )
        })?;

    // fully execute the plan
    while (stream.next().await).is_some() {}

    let display = DisplayableExecutionPlan::with_metrics(analyze.as_ref());
    Ok(format!("{}", display.indent(true)))
}

pub trait SessionContextExt {
    /// Creates a DataFrame for reading a stream of data
    ///
    /// This dataframe may only be queried once; subsequent queries will fail
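    ///
    /// A minimal usage sketch (assumes `stream` is a `SendableRecordBatchStream`):
    ///
    /// ```ignore
    /// let ctx = SessionContext::new();
    /// let df = ctx.read_one_shot(stream)?;
    /// let batches = df.collect().await?; // a second query would fail
    /// ```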
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame>;
}

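/// A single-partition [`PartitionStream`] backed by a one-shot stream.  Panics if
/// executed more than once.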
struct OneShotPartitionStream {
    data: Arc<Mutex<Option<SendableRecordBatchStream>>>,
    schema: Arc<ArrowSchema>,
}

impl std::fmt::Debug for OneShotPartitionStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let data = self.data.lock().unwrap();
        f.debug_struct("OneShotPartitionStream")
            .field("exhausted", &data.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl OneShotPartitionStream {
    fn new(data: SendableRecordBatchStream) -> Self {
        let schema = data.schema();
        Self {
            data: Arc::new(Mutex::new(Some(data))),
            schema,
        }
    }
}

impl PartitionStream for OneShotPartitionStream {
    fn schema(&self) -> &arrow_schema::SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut stream = self.data.lock().unwrap();
        stream
            .take()
            .expect("Attempt to consume a one shot dataframe multiple times")
    }
}

impl SessionContextExt for SessionContext {
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame> {
        let schema = data.schema();
        let part_stream = Arc::new(OneShotPartitionStream::new(data));
        let provider = StreamingTable::try_new(schema, vec![part_stream])?;
        self.read_table(Arc::new(provider))
    }
}

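/// An execution node that re-chunks its input stream so emitted batches contain
/// exactly `batch_size` rows (the final batch may be smaller).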
#[derive(Clone, Debug)]
pub struct StrictBatchSizeExec {
    input: Arc<dyn ExecutionPlan>,
    batch_size: usize,
}

impl StrictBatchSizeExec {
    pub fn new(input: Arc<dyn ExecutionPlan>, batch_size: usize) -> Self {
        Self { input, batch_size }
    }
}

impl DisplayAs for StrictBatchSizeExec {
    fn fmt_as(
        &self,
        _t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        write!(f, "StrictBatchSizeExec")
    }
}

impl ExecutionPlan for StrictBatchSizeExec {
    fn name(&self) -> &str {
        "StrictBatchSizeExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        self.input.properties()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(Self {
            input: children[0].clone(),
            batch_size: self.batch_size,
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        let stream = self.input.execute(partition, context)?;
        let schema = stream.schema();
        let stream = StrictBatchSizeStream::new(stream, self.batch_size);
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn partition_statistics(
        &self,
        partition: Option<usize>,
    ) -> datafusion_common::Result<Statistics> {
        self.input.partition_statistics(partition)
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::Equal
    }
}