// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities for working with datafusion execution plans

use std::{
    collections::HashMap,
    sync::{Arc, LazyLock, Mutex},
};

use arrow_array::RecordBatch;
use arrow_schema::Schema as ArrowSchema;
use datafusion::{
    catalog::streaming::StreamingTable,
    dataframe::DataFrame,
    execution::{
        context::{SessionConfig, SessionContext},
        disk_manager::DiskManagerBuilder,
        memory_pool::FairSpillPool,
        runtime_env::RuntimeEnvBuilder,
        TaskContext,
    },
    physical_plan::{
        analyze::AnalyzeExec,
        display::DisplayableExecutionPlan,
        execution_plan::{Boundedness, EmissionType},
        stream::RecordBatchStreamAdapter,
        streaming::PartitionStream,
        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
    },
};
use datafusion_common::{DataFusionError, Statistics};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

use futures::{stream, StreamExt};
use lance_arrow::SchemaExt;
use lance_core::{
    utils::{
        futures::FinallyStreamExt,
        tracing::{EXECUTION_PLAN_RUN, TRACE_EXECUTION},
    },
    Error, Result,
};
use log::{debug, info, warn};
use snafu::location;

use crate::utils::{
    MetricsExt, BYTES_READ_METRIC, INDEX_COMPARISONS_METRIC, INDICES_LOADED_METRIC, IOPS_METRIC,
    PARTS_LOADED_METRIC, REQUESTS_METRIC,
};
/// A source execution node created from an existing stream
///
/// It can only be executed once: the first call to `execute` returns the
/// stream, and after that the node is exhausted.
///
/// Note: the stream should be finite, otherwise the DataFusion plan
/// properties will be reported incorrectly.
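///
/// A minimal usage sketch (hedged: `batch` is an illustrative `RecordBatch`,
/// and `execute_plan` is defined later in this module):
///
/// ```ignore
/// let exec = Arc::new(OneShotExec::from_batch(batch));
/// let mut stream = execute_plan(exec, LanceExecutionOptions::default())?;
/// let first = stream.next().await; // Some(Ok(batch)) on the first execution
/// ```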
pub struct OneShotExec {
    stream: Mutex<Option<SendableRecordBatchStream>>,
    // We save off a copy of the schema to speed up formatting and so that
    // ExecutionPlan::schema & display_as still function after the stream is
    // exhausted
    schema: Arc<ArrowSchema>,
    properties: PlanProperties,
}

impl OneShotExec {
    /// Create a new instance from a given stream
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        let schema = stream.schema();
        Self {
            stream: Mutex::new(Some(stream)),
            schema: schema.clone(),
            properties: PlanProperties::new(
                EquivalenceProperties::new(schema),
                Partitioning::RoundRobinBatch(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
        }
    }

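    /// Create a new instance that emits exactly one batch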
    pub fn from_batch(batch: RecordBatch) -> Self {
        let schema = batch.schema();
        let stream = Box::pin(RecordBatchStreamAdapter::new(
            schema,
            stream::iter(vec![Ok(batch)]),
        ));
        Self::new(stream)
    }
}

impl std::fmt::Debug for OneShotExec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        f.debug_struct("OneShotExec")
            .field("exhausted", &stream.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl DisplayAs for OneShotExec {
    fn fmt_as(
        &self,
        t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        // The trailing space keeps the one-line rendering readable when exhausted
        let exhausted = if stream.is_some() { "" } else { "EXHAUSTED " };
        let columns = self
            .schema
            .field_names()
            .iter()
            .map(|s| s.to_string())
            .collect::<Vec<_>>();
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "OneShotStream: {}columns=[{}]",
                    exhausted,
                    columns.join(",")
                )
            }
            DisplayFormatType::TreeRender => {
                write!(
                    f,
                    "OneShotStream\nexhausted={}\ncolumns=[{}]",
                    stream.is_none(),
                    columns.join(",")
                )
            }
        }
    }
}

impl ExecutionPlan for OneShotExec {
    fn name(&self) -> &str {
        "OneShotExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow_schema::SchemaRef {
        self.schema.clone()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        // OneShotExec is a leaf node and cannot be given children
        if children.is_empty() {
            return Ok(self);
        }
        Err(DataFusionError::Internal(
            "OneShotExec does not accept children".to_string(),
        ))
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<datafusion::execution::TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        let stream = self
            .stream
            .lock()
            .map_err(|err| DataFusionError::Execution(err.to_string()))?
            .take();
        if let Some(stream) = stream {
            Ok(stream)
        } else {
            Err(DataFusionError::Execution(
                "OneShotExec has already been executed".to_string(),
            ))
        }
    }

    fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
        Ok(Statistics::new_unknown(&self.schema))
    }

    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
        &self.properties
    }
}

/// Callback for reporting statistics after a scan
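///
/// A hedged wiring sketch (the closure body is illustrative):
///
/// ```ignore
/// let options = LanceExecutionOptions {
///     execution_stats_callback: Some(Arc::new(|counts: &ExecutionSummaryCounts| {
///         log::info!("scan: {} iops, {} bytes read", counts.iops, counts.bytes_read);
///     })),
///     ..Default::default()
/// };
/// ```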
pub type ExecutionStatsCallback = Arc<dyn Fn(&ExecutionSummaryCounts) + Send + Sync>;

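/// Options controlling how Lance executes a DataFusion plan
///
/// A hedged construction sketch (the values are illustrative, not
/// recommendations):
///
/// ```ignore
/// let options = LanceExecutionOptions {
///     use_spilling: true,
///     mem_pool_size: Some(256 * 1024 * 1024), // 256 MiB spill pool
///     batch_size: Some(1024),
///     ..Default::default()
/// };
/// ```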
#[derive(Default, Clone)]
pub struct LanceExecutionOptions {
    pub use_spilling: bool,
    pub mem_pool_size: Option<u64>,
    pub batch_size: Option<usize>,
    pub target_partition: Option<usize>,
    pub execution_stats_callback: Option<ExecutionStatsCallback>,
}

impl std::fmt::Debug for LanceExecutionOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LanceExecutionOptions")
            .field("use_spilling", &self.use_spilling)
            .field("mem_pool_size", &self.mem_pool_size)
            .field("batch_size", &self.batch_size)
            .field("target_partition", &self.target_partition)
            .field(
                "execution_stats_callback",
                &self.execution_stats_callback.is_some(),
            )
            .finish()
    }
}

const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 * 1024 * 1024;

impl LanceExecutionOptions {
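    /// The memory pool size: the explicit `mem_pool_size` option if set,
    /// otherwise the `LANCE_MEM_POOL_SIZE` environment variable, otherwise a
    /// 100 MiB default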
    pub fn mem_pool_size(&self) -> u64 {
        self.mem_pool_size.unwrap_or_else(|| {
            std::env::var("LANCE_MEM_POOL_SIZE")
                .map(|s| match s.parse::<u64>() {
                    Ok(v) => v,
                    Err(e) => {
                        warn!("Failed to parse LANCE_MEM_POOL_SIZE: {}, using default", e);
                        DEFAULT_LANCE_MEM_POOL_SIZE
                    }
                })
                .unwrap_or(DEFAULT_LANCE_MEM_POOL_SIZE)
        })
    }

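    /// Whether spilling should be used
    ///
    /// Returns false if the `use_spilling` option is false or if the
    /// `LANCE_BYPASS_SPILLING` environment variable is set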
    pub fn use_spilling(&self) -> bool {
        if !self.use_spilling {
            return false;
        }
        std::env::var("LANCE_BYPASS_SPILLING")
            .map(|_| {
                info!("Bypassing spilling because LANCE_BYPASS_SPILLING is set");
                false
            })
            .unwrap_or(true)
    }
}

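/// Creates a new session context configured from the given options
///
/// When spilling is enabled, the context is built with a disk manager and a
/// [`FairSpillPool`] sized by [`LanceExecutionOptions::mem_pool_size`].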
pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext {
    let mut session_config = SessionConfig::new();
    let mut runtime_env_builder = RuntimeEnvBuilder::new();
    if let Some(target_partition) = options.target_partition {
        session_config = session_config.with_target_partitions(target_partition);
    }
    if options.use_spilling() {
        runtime_env_builder = runtime_env_builder
            .with_disk_manager_builder(DiskManagerBuilder::default())
            .with_memory_pool(Arc::new(FairSpillPool::new(
                options.mem_pool_size() as usize
            )));
    }
    let runtime_env = runtime_env_builder.build_arc().unwrap();
    SessionContext::new_with_config_rt(session_config, runtime_env)
}

static DEFAULT_SESSION_CONTEXT: LazyLock<SessionContext> =
    LazyLock::new(|| new_session_context(&LanceExecutionOptions::default()));

static DEFAULT_SESSION_CONTEXT_WITH_SPILLING: LazyLock<SessionContext> = LazyLock::new(|| {
    new_session_context(&LanceExecutionOptions {
        use_spilling: true,
        ..Default::default()
    })
});

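/// Returns a session context for the given options
///
/// When the options match the defaults, one of two cached contexts (with or
/// without spilling) is reused; otherwise a fresh context is created.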
pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext {
    if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE && options.target_partition.is_none()
    {
        return if options.use_spilling() {
            DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone()
        } else {
            DEFAULT_SESSION_CONTEXT.clone()
        };
    }
    new_session_context(options)
}

fn get_task_context(
    session_ctx: &SessionContext,
    options: &LanceExecutionOptions,
) -> Arc<TaskContext> {
    let mut state = session_ctx.state();
    if let Some(batch_size) = options.batch_size.as_ref() {
        state.config_mut().options_mut().execution.batch_size = *batch_size;
    }

    state.task_ctx()
}

#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct ExecutionSummaryCounts {
    /// The number of I/O operations performed
    pub iops: usize,
    /// The number of requests made to the storage layer (may be larger or smaller than iops
    /// depending on coalescing configuration)
    pub requests: usize,
    /// The number of bytes read during the execution of the plan
    pub bytes_read: usize,
    /// The number of top-level indices loaded
    pub indices_loaded: usize,
    /// The number of index partitions loaded
    pub parts_loaded: usize,
    /// The number of index comparisons performed (the exact meaning depends on the index type)
    pub index_comparisons: usize,
    /// Additional metrics for more detailed statistics.  These are subject to change in the future
    /// and should only be used for debugging purposes.
    pub all_counts: HashMap<String, usize>,
}

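/// Recursively accumulates the well-known Lance metric counters from `node`
/// and all of its children into `counts`; unrecognized counters are collected
/// into `all_counts`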
fn visit_node(node: &dyn ExecutionPlan, counts: &mut ExecutionSummaryCounts) {
    if let Some(metrics) = node.metrics() {
        for (metric_name, count) in metrics.iter_counts() {
            match metric_name.as_ref() {
                IOPS_METRIC => counts.iops += count.value(),
                REQUESTS_METRIC => counts.requests += count.value(),
                BYTES_READ_METRIC => counts.bytes_read += count.value(),
                INDICES_LOADED_METRIC => counts.indices_loaded += count.value(),
                PARTS_LOADED_METRIC => counts.parts_loaded += count.value(),
                INDEX_COMPARISONS_METRIC => counts.index_comparisons += count.value(),
                _ => {
                    let existing = counts
                        .all_counts
                        .entry(metric_name.as_ref().to_string())
                        .or_insert(0);
                    *existing += count.value();
                }
            }
        }
    }
    for child in node.children() {
        visit_node(child.as_ref(), counts);
    }
}

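/// Emits a tracing event summarizing the plan's metrics and invokes the
/// user-provided stats callback, if any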
fn report_plan_summary_metrics(plan: &dyn ExecutionPlan, options: &LanceExecutionOptions) {
    let output_rows = plan
        .metrics()
        .map(|m| m.output_rows().unwrap_or(0))
        .unwrap_or(0);
    let mut counts = ExecutionSummaryCounts::default();
    visit_node(plan, &mut counts);
    tracing::info!(
        target: TRACE_EXECUTION,
        r#type = EXECUTION_PLAN_RUN,
        output_rows,
        iops = counts.iops,
        requests = counts.requests,
        bytes_read = counts.bytes_read,
        indices_loaded = counts.indices_loaded,
        parts_loaded = counts.parts_loaded,
        index_comparisons = counts.index_comparisons,
    );
    if let Some(callback) = options.execution_stats_callback.as_ref() {
        callback(&counts);
    }
}

/// Executes a plan using the session & runtime configuration derived from
/// `options`
///
/// Only executes a single partition.  Panics if the plan has more than one partition.
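///
/// A hedged usage sketch (`plan` is an illustrative `Arc<dyn ExecutionPlan>`):
///
/// ```ignore
/// let mut stream = execute_plan(plan, LanceExecutionOptions::default())?;
/// while let Some(batch) = stream.next().await {
///     let batch = batch?; // each item is a datafusion_common::Result<RecordBatch>
/// }
/// ```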
pub fn execute_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<SendableRecordBatchStream> {
    debug!(
        "Executing plan:\n{}",
        DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
    );

    let session_ctx = get_session_context(&options);

    // NOTE: we are only executing the first partition here. Therefore, if
    // the plan has more than one partition, we will be missing data.
    assert_eq!(plan.properties().partitioning.partition_count(), 1);
    let stream = plan.execute(0, get_task_context(&session_ctx, &options))?;

    let schema = stream.schema();
    let stream = stream.finally(move || {
        report_plan_summary_metrics(plan.as_ref(), &options);
    });
    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}

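/// Fully executes a plan under [`AnalyzeExec`] and returns the plan rendered
/// with its runtime metrics
///
/// A hedged usage sketch:
///
/// ```ignore
/// let report = analyze_plan(plan, LanceExecutionOptions::default()).await?;
/// println!("{}", report);
/// ```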
pub async fn analyze_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<String> {
    let schema = plan.schema();
    let analyze = Arc::new(AnalyzeExec::new(true, true, plan, schema));

    let session_ctx = get_session_context(&options);
    assert_eq!(analyze.properties().partitioning.partition_count(), 1);
    let mut stream = analyze
        .execute(0, get_task_context(&session_ctx, &options))
        .map_err(|err| {
            Error::io(
                format!("Failed to execute analyze plan: {}", err),
                location!(),
            )
        })?;

    // Fully drain the stream so the plan executes and its metrics are populated
    while (stream.next().await).is_some() {}

    let display = DisplayableExecutionPlan::with_metrics(analyze.as_ref());
    Ok(format!("{}", display.indent(true)))
}

pub trait SessionContextExt {
    /// Creates a DataFrame for reading a stream of data
    ///
    /// This dataframe may only be queried once; future queries will fail
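    ///
    /// A hedged usage sketch (`stream` is an illustrative
    /// `SendableRecordBatchStream`):
    ///
    /// ```ignore
    /// let ctx = get_session_context(&LanceExecutionOptions::default());
    /// let df = ctx.read_one_shot(stream)?;
    /// let batches = df.collect().await?; // a second query would fail
    /// ```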
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame>;
}

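/// A [`PartitionStream`] wrapping a stream that can be consumed only once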
struct OneShotPartitionStream {
    data: Arc<Mutex<Option<SendableRecordBatchStream>>>,
    schema: Arc<ArrowSchema>,
}

impl std::fmt::Debug for OneShotPartitionStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let data = self.data.lock().unwrap();
        f.debug_struct("OneShotPartitionStream")
            .field("exhausted", &data.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl OneShotPartitionStream {
    fn new(data: SendableRecordBatchStream) -> Self {
        let schema = data.schema();
        Self {
            data: Arc::new(Mutex::new(Some(data))),
            schema,
        }
    }
}

impl PartitionStream for OneShotPartitionStream {
    fn schema(&self) -> &arrow_schema::SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut stream = self.data.lock().unwrap();
        stream
            .take()
            .expect("Attempt to consume a one shot dataframe multiple times")
    }
}

impl SessionContextExt for SessionContext {
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame> {
        let schema = data.schema();
        let part_stream = Arc::new(OneShotPartitionStream::new(data));
        let provider = StreamingTable::try_new(schema, vec![part_stream])?;
        self.read_table(Arc::new(provider))
    }
}