lance_datafusion/
exec.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Utilities for working with datafusion execution plans
5
6use std::{
7    collections::HashMap,
8    sync::{Arc, LazyLock, Mutex},
9};
10
11use arrow_array::RecordBatch;
12use arrow_schema::Schema as ArrowSchema;
13use datafusion::{
14    catalog::streaming::StreamingTable,
15    dataframe::DataFrame,
16    execution::{
17        context::{SessionConfig, SessionContext},
18        disk_manager::DiskManagerBuilder,
19        memory_pool::FairSpillPool,
20        runtime_env::RuntimeEnvBuilder,
21        TaskContext,
22    },
23    physical_plan::{
24        analyze::AnalyzeExec,
25        display::DisplayableExecutionPlan,
26        execution_plan::{Boundedness, CardinalityEffect, EmissionType},
27        stream::RecordBatchStreamAdapter,
28        streaming::PartitionStream,
29        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
30    },
31};
32use datafusion_common::{DataFusionError, Statistics};
33use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
34
35use futures::{stream, StreamExt};
36use lance_arrow::SchemaExt;
37use lance_core::{
38    utils::{
39        futures::FinallyStreamExt,
40        tracing::{StreamTracingExt, EXECUTION_PLAN_RUN, TRACE_EXECUTION},
41    },
42    Error, Result,
43};
44use log::{debug, info, warn};
45use snafu::location;
46use tracing::Span;
47
48use crate::{
49    chunker::StrictBatchSizeStream,
50    utils::{
51        MetricsExt, BYTES_READ_METRIC, INDEX_COMPARISONS_METRIC, INDICES_LOADED_METRIC,
52        IOPS_METRIC, PARTS_LOADED_METRIC, REQUESTS_METRIC,
53    },
54};
55
56/// An source execution node created from an existing stream
57///
58/// It can only be used once, and will return the stream.  After that the node
59/// is exhausted.
60///
61/// Note: the stream should be finite, otherwise we will report datafusion properties
62/// incorrectly.
63pub struct OneShotExec {
64    stream: Mutex<Option<SendableRecordBatchStream>>,
65    // We save off a copy of the schema to speed up formatting and so ExecutionPlan::schema & display_as
66    // can still function after exhausted
67    schema: Arc<ArrowSchema>,
68    properties: PlanProperties,
69}
70
71impl OneShotExec {
72    /// Create a new instance from a given stream
73    pub fn new(stream: SendableRecordBatchStream) -> Self {
74        let schema = stream.schema();
75        Self {
76            stream: Mutex::new(Some(stream)),
77            schema: schema.clone(),
78            properties: PlanProperties::new(
79                EquivalenceProperties::new(schema),
80                Partitioning::RoundRobinBatch(1),
81                EmissionType::Incremental,
82                Boundedness::Bounded,
83            ),
84        }
85    }
86
87    pub fn from_batch(batch: RecordBatch) -> Self {
88        let schema = batch.schema();
89        let stream = Box::pin(RecordBatchStreamAdapter::new(
90            schema,
91            stream::iter(vec![Ok(batch)]),
92        ));
93        Self::new(stream)
94    }
95}
96
97impl std::fmt::Debug for OneShotExec {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        let stream = self.stream.lock().unwrap();
100        f.debug_struct("OneShotExec")
101            .field("exhausted", &stream.is_none())
102            .field("schema", self.schema.as_ref())
103            .finish()
104    }
105}
106
107impl DisplayAs for OneShotExec {
108    fn fmt_as(
109        &self,
110        t: datafusion::physical_plan::DisplayFormatType,
111        f: &mut std::fmt::Formatter,
112    ) -> std::fmt::Result {
113        let stream = self.stream.lock().unwrap();
114        let exhausted = if stream.is_some() { "" } else { "EXHAUSTED" };
115        let columns = self
116            .schema
117            .field_names()
118            .iter()
119            .map(|s| s.to_string())
120            .collect::<Vec<_>>();
121        match t {
122            DisplayFormatType::Default | DisplayFormatType::Verbose => {
123                write!(
124                    f,
125                    "OneShotStream: {}columns=[{}]",
126                    exhausted,
127                    columns.join(",")
128                )
129            }
130            DisplayFormatType::TreeRender => {
131                write!(
132                    f,
133                    "OneShotStream\nexhausted={}\ncolumns=[{}]",
134                    exhausted,
135                    columns.join(",")
136                )
137            }
138        }
139    }
140}
141
142impl ExecutionPlan for OneShotExec {
143    fn name(&self) -> &str {
144        "OneShotExec"
145    }
146
147    fn as_any(&self) -> &dyn std::any::Any {
148        self
149    }
150
151    fn schema(&self) -> arrow_schema::SchemaRef {
152        self.schema.clone()
153    }
154
155    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
156        vec![]
157    }
158
159    fn with_new_children(
160        self: Arc<Self>,
161        children: Vec<Arc<dyn ExecutionPlan>>,
162    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
163        // OneShotExec has no children, so this should only be called with an empty vector
164        if !children.is_empty() {
165            return Err(datafusion_common::DataFusionError::Internal(
166                "OneShotExec does not support children".to_string(),
167            ));
168        }
169        Ok(self)
170    }
171
172    fn execute(
173        &self,
174        _partition: usize,
175        _context: Arc<datafusion::execution::TaskContext>,
176    ) -> datafusion_common::Result<SendableRecordBatchStream> {
177        let stream = self
178            .stream
179            .lock()
180            .map_err(|err| DataFusionError::Execution(err.to_string()))?
181            .take();
182        if let Some(stream) = stream {
183            Ok(stream)
184        } else {
185            Err(DataFusionError::Execution(
186                "OneShotExec has already been executed".to_string(),
187            ))
188        }
189    }
190
191    fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
192        Ok(Statistics::new_unknown(&self.schema))
193    }
194
195    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
196        &self.properties
197    }
198}
199
200struct TracedExec {
201    input: Arc<dyn ExecutionPlan>,
202    properties: PlanProperties,
203    span: Span,
204}
205
206impl TracedExec {
207    pub fn new(input: Arc<dyn ExecutionPlan>, span: Span) -> Self {
208        Self {
209            properties: input.properties().clone(),
210            input,
211            span,
212        }
213    }
214}
215
216impl DisplayAs for TracedExec {
217    fn fmt_as(
218        &self,
219        t: datafusion::physical_plan::DisplayFormatType,
220        f: &mut std::fmt::Formatter,
221    ) -> std::fmt::Result {
222        match t {
223            DisplayFormatType::Default
224            | DisplayFormatType::Verbose
225            | DisplayFormatType::TreeRender => {
226                write!(f, "TracedExec")
227            }
228        }
229    }
230}
231
232impl std::fmt::Debug for TracedExec {
233    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
234        write!(f, "TracedExec")
235    }
236}
237impl ExecutionPlan for TracedExec {
238    fn name(&self) -> &str {
239        "TracedExec"
240    }
241
242    fn as_any(&self) -> &dyn std::any::Any {
243        self
244    }
245
246    fn properties(&self) -> &PlanProperties {
247        &self.properties
248    }
249
250    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
251        vec![&self.input]
252    }
253
254    fn with_new_children(
255        self: Arc<Self>,
256        children: Vec<Arc<dyn ExecutionPlan>>,
257    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
258        Ok(Arc::new(Self {
259            input: children[0].clone(),
260            properties: self.properties.clone(),
261            span: self.span.clone(),
262        }))
263    }
264
265    fn execute(
266        &self,
267        partition: usize,
268        context: Arc<TaskContext>,
269    ) -> datafusion_common::Result<SendableRecordBatchStream> {
270        let _guard = self.span.enter();
271        let stream = self.input.execute(partition, context)?;
272        let schema = stream.schema();
273        let stream = stream.stream_in_span(self.span.clone());
274        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
275    }
276}
277
278/// Callback for reporting statistics after a scan
279pub type ExecutionStatsCallback = Arc<dyn Fn(&ExecutionSummaryCounts) + Send + Sync>;
280
281#[derive(Default, Clone)]
282pub struct LanceExecutionOptions {
283    pub use_spilling: bool,
284    pub mem_pool_size: Option<u64>,
285    pub batch_size: Option<usize>,
286    pub target_partition: Option<usize>,
287    pub execution_stats_callback: Option<ExecutionStatsCallback>,
288}
289
290impl std::fmt::Debug for LanceExecutionOptions {
291    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
292        f.debug_struct("LanceExecutionOptions")
293            .field("use_spilling", &self.use_spilling)
294            .field("mem_pool_size", &self.mem_pool_size)
295            .field("batch_size", &self.batch_size)
296            .field("target_partition", &self.target_partition)
297            .field(
298                "execution_stats_callback",
299                &self.execution_stats_callback.is_some(),
300            )
301            .finish()
302    }
303}
304
305const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 * 1024 * 1024;
306
307impl LanceExecutionOptions {
308    pub fn mem_pool_size(&self) -> u64 {
309        self.mem_pool_size.unwrap_or_else(|| {
310            std::env::var("LANCE_MEM_POOL_SIZE")
311                .map(|s| match s.parse::<u64>() {
312                    Ok(v) => v,
313                    Err(e) => {
314                        warn!("Failed to parse LANCE_MEM_POOL_SIZE: {}, using default", e);
315                        DEFAULT_LANCE_MEM_POOL_SIZE
316                    }
317                })
318                .unwrap_or(DEFAULT_LANCE_MEM_POOL_SIZE)
319        })
320    }
321
322    pub fn use_spilling(&self) -> bool {
323        if !self.use_spilling {
324            return false;
325        }
326        std::env::var("LANCE_BYPASS_SPILLING")
327            .map(|_| {
328                info!("Bypassing spilling because LANCE_BYPASS_SPILLING is set");
329                false
330            })
331            .unwrap_or(true)
332    }
333}
334
335pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext {
336    let mut session_config = SessionConfig::new();
337    let mut runtime_env_builder = RuntimeEnvBuilder::new();
338    if let Some(target_partition) = options.target_partition {
339        session_config = session_config.with_target_partitions(target_partition);
340    }
341    if options.use_spilling() {
342        runtime_env_builder = runtime_env_builder
343            .with_disk_manager_builder(DiskManagerBuilder::default())
344            .with_memory_pool(Arc::new(FairSpillPool::new(
345                options.mem_pool_size() as usize
346            )));
347    }
348    let runtime_env = runtime_env_builder.build_arc().unwrap();
349    SessionContext::new_with_config_rt(session_config, runtime_env)
350}
351
352static DEFAULT_SESSION_CONTEXT: LazyLock<SessionContext> =
353    LazyLock::new(|| new_session_context(&LanceExecutionOptions::default()));
354
355static DEFAULT_SESSION_CONTEXT_WITH_SPILLING: LazyLock<SessionContext> = LazyLock::new(|| {
356    new_session_context(&LanceExecutionOptions {
357        use_spilling: true,
358        ..Default::default()
359    })
360});
361
362pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext {
363    if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE && options.target_partition.is_none()
364    {
365        return if options.use_spilling() {
366            DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone()
367        } else {
368            DEFAULT_SESSION_CONTEXT.clone()
369        };
370    }
371    new_session_context(options)
372}
373
374fn get_task_context(
375    session_ctx: &SessionContext,
376    options: &LanceExecutionOptions,
377) -> Arc<TaskContext> {
378    let mut state = session_ctx.state();
379    if let Some(batch_size) = options.batch_size.as_ref() {
380        state.config_mut().options_mut().execution.batch_size = *batch_size;
381    }
382
383    state.task_ctx()
384}
385
386#[derive(Default, Clone, Debug, PartialEq, Eq)]
387pub struct ExecutionSummaryCounts {
388    /// The number of I/O operations performed
389    pub iops: usize,
390    /// The number of requests made to the storage layer (may be larger or smaller than iops
391    /// depending on coalescing configuration)
392    pub requests: usize,
393    /// The number of bytes read during the execution of the plan
394    pub bytes_read: usize,
395    /// The number of top-level indices loaded
396    pub indices_loaded: usize,
397    /// The number of index partitions loaded
398    pub parts_loaded: usize,
399    /// The number of index comparisons performed (the exact meaning depends on the index type)
400    pub index_comparisons: usize,
401    /// Additional metrics for more detailed statistics.  These are subject to change in the future
402    /// and should only be used for debugging purposes.
403    pub all_counts: HashMap<String, usize>,
404}
405
406fn visit_node(node: &dyn ExecutionPlan, counts: &mut ExecutionSummaryCounts) {
407    if let Some(metrics) = node.metrics() {
408        for (metric_name, count) in metrics.iter_counts() {
409            match metric_name.as_ref() {
410                IOPS_METRIC => counts.iops += count.value(),
411                REQUESTS_METRIC => counts.requests += count.value(),
412                BYTES_READ_METRIC => counts.bytes_read += count.value(),
413                INDICES_LOADED_METRIC => counts.indices_loaded += count.value(),
414                PARTS_LOADED_METRIC => counts.parts_loaded += count.value(),
415                INDEX_COMPARISONS_METRIC => counts.index_comparisons += count.value(),
416                _ => {
417                    let existing = counts
418                        .all_counts
419                        .entry(metric_name.as_ref().to_string())
420                        .or_insert(0);
421                    *existing += count.value();
422                }
423            }
424        }
425    }
426    for child in node.children() {
427        visit_node(child.as_ref(), counts);
428    }
429}
430
431fn report_plan_summary_metrics(plan: &dyn ExecutionPlan, options: &LanceExecutionOptions) {
432    let output_rows = plan
433        .metrics()
434        .map(|m| m.output_rows().unwrap_or(0))
435        .unwrap_or(0);
436    let mut counts = ExecutionSummaryCounts::default();
437    visit_node(plan, &mut counts);
438    tracing::info!(
439        target: TRACE_EXECUTION,
440        r#type = EXECUTION_PLAN_RUN,
441        output_rows,
442        iops = counts.iops,
443        requests = counts.requests,
444        bytes_read = counts.bytes_read,
445        indices_loaded = counts.indices_loaded,
446        parts_loaded = counts.parts_loaded,
447        index_comparisons = counts.index_comparisons,
448    );
449    if let Some(callback) = options.execution_stats_callback.as_ref() {
450        callback(&counts);
451    }
452}
453
454/// Executes a plan using default session & runtime configuration
455///
456/// Only executes a single partition.  Panics if the plan has more than one partition.
457pub fn execute_plan(
458    plan: Arc<dyn ExecutionPlan>,
459    options: LanceExecutionOptions,
460) -> Result<SendableRecordBatchStream> {
461    debug!(
462        "Executing plan:\n{}",
463        DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
464    );
465
466    let session_ctx = get_session_context(&options);
467
468    // NOTE: we are only executing the first partition here. Therefore, if
469    // the plan has more than one partition, we will be missing data.
470    assert_eq!(plan.properties().partitioning.partition_count(), 1);
471    let stream = plan.execute(0, get_task_context(&session_ctx, &options))?;
472
473    let schema = stream.schema();
474    let stream = stream.finally(move || {
475        report_plan_summary_metrics(plan.as_ref(), &options);
476    });
477    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
478}
479
480pub async fn analyze_plan(
481    plan: Arc<dyn ExecutionPlan>,
482    options: LanceExecutionOptions,
483) -> Result<String> {
484    // This is needed as AnalyzeExec launches a thread task per
485    // partition, and we want these to be connected to the parent span
486    let plan = Arc::new(TracedExec::new(plan, Span::current()));
487
488    let schema = plan.schema();
489    let analyze = Arc::new(AnalyzeExec::new(true, true, plan, schema));
490
491    let session_ctx = get_session_context(&options);
492    assert_eq!(analyze.properties().partitioning.partition_count(), 1);
493    let mut stream = analyze
494        .execute(0, get_task_context(&session_ctx, &options))
495        .map_err(|err| {
496            Error::io(
497                format!("Failed to execute analyze plan: {}", err),
498                location!(),
499            )
500        })?;
501
502    // fully execute the plan
503    while (stream.next().await).is_some() {}
504
505    let display = DisplayableExecutionPlan::with_metrics(analyze.as_ref());
506    Ok(format!("{}", display.indent(true)))
507}
508
509pub trait SessionContextExt {
510    /// Creates a DataFrame for reading a stream of data
511    ///
512    /// This dataframe may only be queried once, future queries will fail
513    fn read_one_shot(
514        &self,
515        data: SendableRecordBatchStream,
516    ) -> datafusion::common::Result<DataFrame>;
517}
518
519struct OneShotPartitionStream {
520    data: Arc<Mutex<Option<SendableRecordBatchStream>>>,
521    schema: Arc<ArrowSchema>,
522}
523
524impl std::fmt::Debug for OneShotPartitionStream {
525    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
526        let data = self.data.lock().unwrap();
527        f.debug_struct("OneShotPartitionStream")
528            .field("exhausted", &data.is_none())
529            .field("schema", self.schema.as_ref())
530            .finish()
531    }
532}
533
534impl OneShotPartitionStream {
535    fn new(data: SendableRecordBatchStream) -> Self {
536        let schema = data.schema();
537        Self {
538            data: Arc::new(Mutex::new(Some(data))),
539            schema,
540        }
541    }
542}
543
544impl PartitionStream for OneShotPartitionStream {
545    fn schema(&self) -> &arrow_schema::SchemaRef {
546        &self.schema
547    }
548
549    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
550        let mut stream = self.data.lock().unwrap();
551        stream
552            .take()
553            .expect("Attempt to consume a one shot dataframe multiple times")
554    }
555}
556
557impl SessionContextExt for SessionContext {
558    fn read_one_shot(
559        &self,
560        data: SendableRecordBatchStream,
561    ) -> datafusion::common::Result<DataFrame> {
562        let schema = data.schema();
563        let part_stream = Arc::new(OneShotPartitionStream::new(data));
564        let provider = StreamingTable::try_new(schema, vec![part_stream])?;
565        self.read_table(Arc::new(provider))
566    }
567}
568
569#[derive(Clone, Debug)]
570pub struct StrictBatchSizeExec {
571    input: Arc<dyn ExecutionPlan>,
572    batch_size: usize,
573}
574
575impl StrictBatchSizeExec {
576    pub fn new(input: Arc<dyn ExecutionPlan>, batch_size: usize) -> Self {
577        Self { input, batch_size }
578    }
579}
580
581impl DisplayAs for StrictBatchSizeExec {
582    fn fmt_as(
583        &self,
584        _t: datafusion::physical_plan::DisplayFormatType,
585        f: &mut std::fmt::Formatter,
586    ) -> std::fmt::Result {
587        write!(f, "StrictBatchSizeExec")
588    }
589}
590
591impl ExecutionPlan for StrictBatchSizeExec {
592    fn name(&self) -> &str {
593        "StrictBatchSizeExec"
594    }
595
596    fn as_any(&self) -> &dyn std::any::Any {
597        self
598    }
599
600    fn properties(&self) -> &PlanProperties {
601        self.input.properties()
602    }
603
604    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
605        vec![&self.input]
606    }
607
608    fn with_new_children(
609        self: Arc<Self>,
610        children: Vec<Arc<dyn ExecutionPlan>>,
611    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
612        Ok(Arc::new(Self {
613            input: children[0].clone(),
614            batch_size: self.batch_size,
615        }))
616    }
617
618    fn execute(
619        &self,
620        partition: usize,
621        context: Arc<TaskContext>,
622    ) -> datafusion_common::Result<SendableRecordBatchStream> {
623        let stream = self.input.execute(partition, context)?;
624        let schema = stream.schema();
625        let stream = StrictBatchSizeStream::new(stream, self.batch_size);
626        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
627    }
628
629    fn maintains_input_order(&self) -> Vec<bool> {
630        vec![true]
631    }
632
633    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
634        vec![false]
635    }
636
637    fn partition_statistics(
638        &self,
639        partition: Option<usize>,
640    ) -> datafusion_common::Result<Statistics> {
641        self.input.partition_statistics(partition)
642    }
643
644    fn cardinality_effect(&self) -> CardinalityEffect {
645        CardinalityEffect::Equal
646    }
647}