lance_datafusion/
exec.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Utilities for working with datafusion execution plans
5
6use std::sync::{Arc, Mutex};
7
8use arrow_array::RecordBatch;
9use arrow_schema::Schema as ArrowSchema;
10use datafusion::{
11    catalog::streaming::StreamingTable,
12    dataframe::DataFrame,
13    execution::{
14        context::{SessionConfig, SessionContext},
15        disk_manager::DiskManagerConfig,
16        memory_pool::FairSpillPool,
17        runtime_env::RuntimeEnvBuilder,
18        TaskContext,
19    },
20    physical_plan::{
21        analyze::AnalyzeExec,
22        display::DisplayableExecutionPlan,
23        execution_plan::{Boundedness, EmissionType},
24        stream::RecordBatchStreamAdapter,
25        streaming::PartitionStream,
26        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
27    },
28};
29use datafusion_common::{DataFusionError, Statistics};
30use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
31use lazy_static::lazy_static;
32
33use futures::{stream, StreamExt};
34use lance_arrow::SchemaExt;
35use lance_core::{
36    utils::{
37        futures::FinallyStreamExt,
38        tracing::{EXECUTION_PLAN_RUN, TRACE_EXECUTION},
39    },
40    Error, Result,
41};
42use log::{debug, info, warn};
43use snafu::location;
44
45use crate::utils::{
46    MetricsExt, BYTES_READ_METRIC, INDEX_COMPARISONS_METRIC, INDICES_LOADED_METRIC, IOPS_METRIC,
47    PARTS_LOADED_METRIC, REQUESTS_METRIC,
48};
49
50/// An source execution node created from an existing stream
51///
52/// It can only be used once, and will return the stream.  After that the node
53/// is exhausted.
54///
55/// Note: the stream should be finite, otherwise we will report datafusion properties
56/// incorrectly.
57pub struct OneShotExec {
58    stream: Mutex<Option<SendableRecordBatchStream>>,
59    // We save off a copy of the schema to speed up formatting and so ExecutionPlan::schema & display_as
60    // can still function after exhausted
61    schema: Arc<ArrowSchema>,
62    properties: PlanProperties,
63}
64
65impl OneShotExec {
66    /// Create a new instance from a given stream
67    pub fn new(stream: SendableRecordBatchStream) -> Self {
68        let schema = stream.schema();
69        Self {
70            stream: Mutex::new(Some(stream)),
71            schema: schema.clone(),
72            properties: PlanProperties::new(
73                EquivalenceProperties::new(schema),
74                Partitioning::RoundRobinBatch(1),
75                EmissionType::Incremental,
76                Boundedness::Bounded,
77            ),
78        }
79    }
80
81    pub fn from_batch(batch: RecordBatch) -> Self {
82        let schema = batch.schema();
83        let stream = Box::pin(RecordBatchStreamAdapter::new(
84            schema,
85            stream::iter(vec![Ok(batch)]),
86        ));
87        Self::new(stream)
88    }
89}
90
91impl std::fmt::Debug for OneShotExec {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        let stream = self.stream.lock().unwrap();
94        f.debug_struct("OneShotExec")
95            .field("exhausted", &stream.is_none())
96            .field("schema", self.schema.as_ref())
97            .finish()
98    }
99}
100
101impl DisplayAs for OneShotExec {
102    fn fmt_as(
103        &self,
104        t: datafusion::physical_plan::DisplayFormatType,
105        f: &mut std::fmt::Formatter,
106    ) -> std::fmt::Result {
107        let stream = self.stream.lock().unwrap();
108        match t {
109            DisplayFormatType::Default | DisplayFormatType::Verbose => {
110                let exhausted = if stream.is_some() { "" } else { "EXHAUSTED" };
111                let columns = self
112                    .schema
113                    .field_names()
114                    .iter()
115                    .map(|s| s.to_string())
116                    .collect::<Vec<_>>();
117                write!(
118                    f,
119                    "OneShotStream: {}columns=[{}]",
120                    exhausted,
121                    columns.join(",")
122                )
123            }
124        }
125    }
126}
127
128impl ExecutionPlan for OneShotExec {
129    fn name(&self) -> &str {
130        "OneShotExec"
131    }
132
133    fn as_any(&self) -> &dyn std::any::Any {
134        self
135    }
136
137    fn schema(&self) -> arrow_schema::SchemaRef {
138        self.schema.clone()
139    }
140
141    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
142        vec![]
143    }
144
145    fn with_new_children(
146        self: Arc<Self>,
147        _children: Vec<Arc<dyn ExecutionPlan>>,
148    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
149        todo!()
150    }
151
152    fn execute(
153        &self,
154        _partition: usize,
155        _context: Arc<datafusion::execution::TaskContext>,
156    ) -> datafusion_common::Result<SendableRecordBatchStream> {
157        let stream = self
158            .stream
159            .lock()
160            .map_err(|err| DataFusionError::Execution(err.to_string()))?
161            .take();
162        if let Some(stream) = stream {
163            Ok(stream)
164        } else {
165            Err(DataFusionError::Execution(
166                "OneShotExec has already been executed".to_string(),
167            ))
168        }
169    }
170
171    fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
172        Ok(Statistics::new_unknown(&self.schema))
173    }
174
175    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
176        &self.properties
177    }
178}
179
180/// Callback for reporting statistics after a scan
181pub type ExecutionStatsCallback = Arc<dyn Fn(&ExecutionSummaryCounts) + Send + Sync>;
182
183#[derive(Default, Clone)]
184pub struct LanceExecutionOptions {
185    pub use_spilling: bool,
186    pub mem_pool_size: Option<u64>,
187    pub batch_size: Option<usize>,
188    pub target_partition: Option<usize>,
189    pub execution_stats_callback: Option<ExecutionStatsCallback>,
190}
191
192impl std::fmt::Debug for LanceExecutionOptions {
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        f.debug_struct("LanceExecutionOptions")
195            .field("use_spilling", &self.use_spilling)
196            .field("mem_pool_size", &self.mem_pool_size)
197            .field("batch_size", &self.batch_size)
198            .field("target_partition", &self.target_partition)
199            .field(
200                "execution_stats_callback",
201                &self.execution_stats_callback.is_some(),
202            )
203            .finish()
204    }
205}
206
/// Fallback spill-pool size, 100 MiB expressed in bytes. It is used when neither
/// `LanceExecutionOptions::mem_pool_size` nor `LANCE_MEM_POOL_SIZE` supplies one.
const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 << 20;
208
209impl LanceExecutionOptions {
210    pub fn mem_pool_size(&self) -> u64 {
211        self.mem_pool_size.unwrap_or_else(|| {
212            std::env::var("LANCE_MEM_POOL_SIZE")
213                .map(|s| match s.parse::<u64>() {
214                    Ok(v) => v,
215                    Err(e) => {
216                        warn!("Failed to parse LANCE_MEM_POOL_SIZE: {}, using default", e);
217                        DEFAULT_LANCE_MEM_POOL_SIZE
218                    }
219                })
220                .unwrap_or(DEFAULT_LANCE_MEM_POOL_SIZE)
221        })
222    }
223
224    pub fn use_spilling(&self) -> bool {
225        if !self.use_spilling {
226            return false;
227        }
228        std::env::var("LANCE_BYPASS_SPILLING")
229            .map(|_| {
230                info!("Bypassing spilling because LANCE_BYPASS_SPILLING is set");
231                false
232            })
233            .unwrap_or(true)
234    }
235}
236
237pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext {
238    let mut session_config = SessionConfig::new();
239    let mut runtime_env_builder = RuntimeEnvBuilder::new();
240    if let Some(target_partition) = options.target_partition {
241        session_config = session_config.with_target_partitions(target_partition);
242    }
243    if options.use_spilling() {
244        runtime_env_builder = runtime_env_builder
245            .with_disk_manager(DiskManagerConfig::new())
246            .with_memory_pool(Arc::new(FairSpillPool::new(
247                options.mem_pool_size() as usize
248            )));
249    }
250    let runtime_env = runtime_env_builder.build_arc().unwrap();
251    SessionContext::new_with_config_rt(session_config, runtime_env)
252}
253
254lazy_static! {
255    static ref DEFAULT_SESSION_CONTEXT: SessionContext =
256        new_session_context(&LanceExecutionOptions::default());
257    static ref DEFAULT_SESSION_CONTEXT_WITH_SPILLING: SessionContext = {
258        new_session_context(&LanceExecutionOptions {
259            use_spilling: true,
260            ..Default::default()
261        })
262    };
263}
264
265pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext {
266    if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE && options.target_partition.is_none()
267    {
268        return if options.use_spilling() {
269            DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone()
270        } else {
271            DEFAULT_SESSION_CONTEXT.clone()
272        };
273    }
274    new_session_context(options)
275}
276
277fn get_task_context(
278    session_ctx: &SessionContext,
279    options: &LanceExecutionOptions,
280) -> Arc<TaskContext> {
281    let mut state = session_ctx.state();
282    if let Some(batch_size) = options.batch_size.as_ref() {
283        state.config_mut().options_mut().execution.batch_size = *batch_size;
284    }
285
286    state.task_ctx()
287}
288
/// I/O and index counters summed across every node of an executed plan.
///
/// Each field is the total of the corresponding Lance metric over all plan nodes that
/// report it. Nodes that do not report a metric contribute zero. An instance is passed
/// to `ExecutionStatsCallback`, so it derives `Debug`/`Clone`/`PartialEq` for callers
/// that want to log, keep, or compare it.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ExecutionSummaryCounts {
    /// Sum of the `IOPS_METRIC` counters.
    pub iops: usize,
    /// Sum of the `REQUESTS_METRIC` counters.
    pub requests: usize,
    /// Sum of the `BYTES_READ_METRIC` counters.
    pub bytes_read: usize,
    /// Sum of the `INDICES_LOADED_METRIC` counters.
    pub indices_loaded: usize,
    /// Sum of the `PARTS_LOADED_METRIC` counters.
    pub parts_loaded: usize,
    /// Sum of the `INDEX_COMPARISONS_METRIC` counters.
    pub index_comparisons: usize,
}
298
299fn visit_node(node: &dyn ExecutionPlan, counts: &mut ExecutionSummaryCounts) {
300    if let Some(metrics) = node.metrics() {
301        counts.iops += metrics
302            .find_count(IOPS_METRIC)
303            .map(|c| c.value())
304            .unwrap_or(0);
305        counts.requests += metrics
306            .find_count(REQUESTS_METRIC)
307            .map(|c| c.value())
308            .unwrap_or(0);
309        counts.bytes_read += metrics
310            .find_count(BYTES_READ_METRIC)
311            .map(|c| c.value())
312            .unwrap_or(0);
313        counts.indices_loaded += metrics
314            .find_count(INDICES_LOADED_METRIC)
315            .map(|c| c.value())
316            .unwrap_or(0);
317        counts.parts_loaded += metrics
318            .find_count(PARTS_LOADED_METRIC)
319            .map(|c| c.value())
320            .unwrap_or(0);
321        counts.index_comparisons += metrics
322            .find_count(INDEX_COMPARISONS_METRIC)
323            .map(|c| c.value())
324            .unwrap_or(0);
325    }
326    for child in node.children() {
327        visit_node(child.as_ref(), counts);
328    }
329}
330
331fn report_plan_summary_metrics(plan: &dyn ExecutionPlan, options: &LanceExecutionOptions) {
332    let output_rows = plan
333        .metrics()
334        .map(|m| m.output_rows().unwrap_or(0))
335        .unwrap_or(0);
336    let mut counts = ExecutionSummaryCounts::default();
337    visit_node(plan, &mut counts);
338    tracing::info!(
339        target: TRACE_EXECUTION,
340        type = EXECUTION_PLAN_RUN,
341        output_rows,
342        iops = counts.iops,
343        requests = counts.requests,
344        bytes_read = counts.bytes_read,
345        indices_loaded = counts.indices_loaded,
346        parts_loaded = counts.parts_loaded,
347        index_comparisons = counts.index_comparisons,
348    );
349    if let Some(callback) = options.execution_stats_callback.as_ref() {
350        callback(&counts);
351    }
352}
353
354/// Executes a plan using default session & runtime configuration
355///
356/// Only executes a single partition.  Panics if the plan has more than one partition.
357pub fn execute_plan(
358    plan: Arc<dyn ExecutionPlan>,
359    options: LanceExecutionOptions,
360) -> Result<SendableRecordBatchStream> {
361    debug!(
362        "Executing plan:\n{}",
363        DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
364    );
365
366    let session_ctx = get_session_context(&options);
367
368    // NOTE: we are only executing the first partition here. Therefore, if
369    // the plan has more than one partition, we will be missing data.
370    assert_eq!(plan.properties().partitioning.partition_count(), 1);
371    let stream = plan.execute(0, get_task_context(&session_ctx, &options))?;
372
373    let schema = stream.schema();
374    let stream = stream.finally(move || {
375        report_plan_summary_metrics(plan.as_ref(), &options);
376    });
377    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
378}
379
380pub async fn analyze_plan(
381    plan: Arc<dyn ExecutionPlan>,
382    options: LanceExecutionOptions,
383) -> Result<String> {
384    let schema = plan.schema();
385    let analyze = Arc::new(AnalyzeExec::new(true, true, plan, schema));
386
387    let session_ctx = get_session_context(&options);
388    assert_eq!(analyze.properties().partitioning.partition_count(), 1);
389    let mut stream = analyze
390        .execute(0, get_task_context(&session_ctx, &options))
391        .map_err(|err| {
392            Error::io(
393                format!("Failed to execute analyze plan: {}", err),
394                location!(),
395            )
396        })?;
397
398    // fully execute the plan
399    while (stream.next().await).is_some() {}
400
401    let display = DisplayableExecutionPlan::with_metrics(analyze.as_ref());
402    Ok(format!("{}", display.indent(true)))
403}
404
405pub trait SessionContextExt {
406    /// Creates a DataFrame for reading a stream of data
407    ///
408    /// This dataframe may only be queried once, future queries will fail
409    fn read_one_shot(
410        &self,
411        data: SendableRecordBatchStream,
412    ) -> datafusion::common::Result<DataFrame>;
413}
414
415struct OneShotPartitionStream {
416    data: Arc<Mutex<Option<SendableRecordBatchStream>>>,
417    schema: Arc<ArrowSchema>,
418}
419
420impl std::fmt::Debug for OneShotPartitionStream {
421    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
422        let data = self.data.lock().unwrap();
423        f.debug_struct("OneShotPartitionStream")
424            .field("exhausted", &data.is_none())
425            .field("schema", self.schema.as_ref())
426            .finish()
427    }
428}
429
430impl OneShotPartitionStream {
431    fn new(data: SendableRecordBatchStream) -> Self {
432        let schema = data.schema();
433        Self {
434            data: Arc::new(Mutex::new(Some(data))),
435            schema,
436        }
437    }
438}
439
440impl PartitionStream for OneShotPartitionStream {
441    fn schema(&self) -> &arrow_schema::SchemaRef {
442        &self.schema
443    }
444
445    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
446        let mut stream = self.data.lock().unwrap();
447        stream
448            .take()
449            .expect("Attempt to consume a one shot dataframe multiple times")
450    }
451}
452
453impl SessionContextExt for SessionContext {
454    fn read_one_shot(
455        &self,
456        data: SendableRecordBatchStream,
457    ) -> datafusion::common::Result<DataFrame> {
458        let schema = data.schema();
459        let part_stream = Arc::new(OneShotPartitionStream::new(data));
460        let provider = StreamingTable::try_new(schema, vec![part_stream])?;
461        self.read_table(Arc::new(provider))
462    }
463}