lance_datafusion/
exec.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Utilities for working with datafusion execution plans

use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};

use arrow_array::RecordBatch;
use arrow_schema::Schema as ArrowSchema;
use datafusion::{
    catalog::streaming::StreamingTable,
    dataframe::DataFrame,
    execution::{
        context::{SessionConfig, SessionContext},
        disk_manager::DiskManagerConfig,
        memory_pool::FairSpillPool,
        runtime_env::RuntimeEnvBuilder,
        TaskContext,
    },
    physical_plan::{
        analyze::AnalyzeExec,
        display::DisplayableExecutionPlan,
        execution_plan::{Boundedness, EmissionType},
        stream::RecordBatchStreamAdapter,
        streaming::PartitionStream,
        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
    },
};
use datafusion_common::{DataFusionError, Statistics};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use lazy_static::lazy_static;

use futures::{stream, StreamExt};
use lance_arrow::SchemaExt;
use lance_core::{
    utils::{
        futures::FinallyStreamExt,
        tracing::{EXECUTION_PLAN_RUN, TRACE_EXECUTION},
    },
    Error, Result,
};
use log::{debug, info, warn};
use snafu::location;

use crate::utils::{
    MetricsExt, BYTES_READ_METRIC, INDEX_COMPARISONS_METRIC, INDICES_LOADED_METRIC, IOPS_METRIC,
    PARTS_LOADED_METRIC, REQUESTS_METRIC,
};

/// A source execution node created from an existing stream
///
/// It can only be executed once: the first call to `execute` returns the
/// stream, after which the node is exhausted.
///
/// Note: the stream should be finite; otherwise the node will report its
/// DataFusion properties (e.g. [`Boundedness::Bounded`]) incorrectly.
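///
/// A minimal usage sketch (the source stream is assumed to come from elsewhere):
///
/// ```ignore
/// let stream: SendableRecordBatchStream = /* obtained elsewhere */;
/// let plan = Arc::new(OneShotExec::new(stream));
/// // A second execution of `plan` would return an error
/// let batches = execute_plan(plan, LanceExecutionOptions::default())?;
/// ```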
pub struct OneShotExec {
    stream: Mutex<Option<SendableRecordBatchStream>>,
    // We save off a copy of the schema to speed up formatting and so that
    // ExecutionPlan::schema & display_as can still function after the stream
    // is exhausted
    schema: Arc<ArrowSchema>,
    properties: PlanProperties,
}

impl OneShotExec {
    /// Create a new instance from a given stream
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        let schema = stream.schema();
        Self {
            stream: Mutex::new(Some(stream)),
            schema: schema.clone(),
            properties: PlanProperties::new(
                EquivalenceProperties::new(schema),
                Partitioning::RoundRobinBatch(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
        }
    }

    /// Create a new instance that yields a single batch
    pub fn from_batch(batch: RecordBatch) -> Self {
        let schema = batch.schema();
        let stream = Box::pin(RecordBatchStreamAdapter::new(
            schema,
            stream::iter(vec![Ok(batch)]),
        ));
        Self::new(stream)
    }
}

impl std::fmt::Debug for OneShotExec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        f.debug_struct("OneShotExec")
            .field("exhausted", &stream.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl DisplayAs for OneShotExec {
    fn fmt_as(
        &self,
        t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        // Trailing space so the rendered form is "EXHAUSTED columns=[...]"
        let exhausted = if stream.is_some() { "" } else { "EXHAUSTED " };
        let columns = self
            .schema
            .field_names()
            .iter()
            .map(|s| s.to_string())
            .collect::<Vec<_>>();
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "OneShotStream: {}columns=[{}]",
                    exhausted,
                    columns.join(",")
                )
            }
            DisplayFormatType::TreeRender => {
                write!(
                    f,
                    "OneShotStream\nexhausted={}\ncolumns=[{}]",
                    stream.is_none(),
                    columns.join(",")
                )
            }
        }
    }
}

impl ExecutionPlan for OneShotExec {
    fn name(&self) -> &str {
        "OneShotExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow_schema::SchemaRef {
        self.schema.clone()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        // OneShotExec is a leaf node; there are no children to replace
        if children.is_empty() {
            Ok(self)
        } else {
            Err(DataFusionError::Internal(
                "OneShotExec does not accept children".to_string(),
            ))
        }
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<datafusion::execution::TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        let stream = self
            .stream
            .lock()
            .map_err(|err| DataFusionError::Execution(err.to_string()))?
            .take();
        if let Some(stream) = stream {
            Ok(stream)
        } else {
            Err(DataFusionError::Execution(
                "OneShotExec has already been executed".to_string(),
            ))
        }
    }

    fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
        Ok(Statistics::new_unknown(&self.schema))
    }

    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
        &self.properties
    }
}

/// Callback for reporting statistics after a scan
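///
/// A minimal wiring sketch (the logged fields are illustrative):
///
/// ```ignore
/// let options = LanceExecutionOptions {
///     execution_stats_callback: Some(Arc::new(|counts: &ExecutionSummaryCounts| {
///         log::info!("scan: iops={} bytes_read={}", counts.iops, counts.bytes_read);
///     })),
///     ..Default::default()
/// };
/// ```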
pub type ExecutionStatsCallback = Arc<dyn Fn(&ExecutionSummaryCounts) + Send + Sync>;

/// Options controlling how Lance executes DataFusion plans
#[derive(Default, Clone)]
pub struct LanceExecutionOptions {
    /// If true, use a fair spill pool so operators can spill to disk
    pub use_spilling: bool,
    /// Size of the memory pool in bytes (see [`Self::mem_pool_size`])
    pub mem_pool_size: Option<u64>,
    /// Overrides the default execution batch size
    pub batch_size: Option<usize>,
    /// Overrides the default number of target partitions
    pub target_partition: Option<usize>,
    /// Called with the summary counts once the plan finishes executing
    pub execution_stats_callback: Option<ExecutionStatsCallback>,
}

impl std::fmt::Debug for LanceExecutionOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LanceExecutionOptions")
            .field("use_spilling", &self.use_spilling)
            .field("mem_pool_size", &self.mem_pool_size)
            .field("batch_size", &self.batch_size)
            .field("target_partition", &self.target_partition)
            .field(
                "execution_stats_callback",
                &self.execution_stats_callback.is_some(),
            )
            .finish()
    }
}

/// The default size of the memory pool (100MiB)
const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 * 1024 * 1024;

impl LanceExecutionOptions {
    /// The memory pool size: the explicit option if set, otherwise the
    /// `LANCE_MEM_POOL_SIZE` environment variable, otherwise the default
    pub fn mem_pool_size(&self) -> u64 {
        self.mem_pool_size.unwrap_or_else(|| {
            std::env::var("LANCE_MEM_POOL_SIZE")
                .map(|s| match s.parse::<u64>() {
                    Ok(v) => v,
                    Err(e) => {
                        warn!("Failed to parse LANCE_MEM_POOL_SIZE: {}, using default", e);
                        DEFAULT_LANCE_MEM_POOL_SIZE
                    }
                })
                .unwrap_or(DEFAULT_LANCE_MEM_POOL_SIZE)
        })
    }

    /// Whether spilling should be used, honoring the `LANCE_BYPASS_SPILLING`
    /// environment variable as an override
    pub fn use_spilling(&self) -> bool {
        if !self.use_spilling {
            return false;
        }
        std::env::var("LANCE_BYPASS_SPILLING")
            .map(|_| {
                info!("Bypassing spilling because LANCE_BYPASS_SPILLING is set");
                false
            })
            .unwrap_or(true)
    }
}

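/// Create a new `SessionContext` configured from the given execution options
///
/// A hedged configuration sketch (the pool size is illustrative):
///
/// ```ignore
/// let ctx = new_session_context(&LanceExecutionOptions {
///     use_spilling: true,
///     mem_pool_size: Some(512 * 1024 * 1024), // 512 MiB
///     ..Default::default()
/// });
/// ```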
pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext {
    let mut session_config = SessionConfig::new();
    let mut runtime_env_builder = RuntimeEnvBuilder::new();
    if let Some(target_partition) = options.target_partition {
        session_config = session_config.with_target_partitions(target_partition);
    }
    if options.use_spilling() {
        runtime_env_builder = runtime_env_builder
            .with_disk_manager(DiskManagerConfig::new())
            .with_memory_pool(Arc::new(FairSpillPool::new(
                options.mem_pool_size() as usize
            )));
    }
    let runtime_env = runtime_env_builder.build_arc().unwrap();
    SessionContext::new_with_config_rt(session_config, runtime_env)
}

lazy_static! {
    static ref DEFAULT_SESSION_CONTEXT: SessionContext =
        new_session_context(&LanceExecutionOptions::default());
    static ref DEFAULT_SESSION_CONTEXT_WITH_SPILLING: SessionContext = {
        new_session_context(&LanceExecutionOptions {
            use_spilling: true,
            ..Default::default()
        })
    };
}

/// Get a session context, reusing a cached default context when the options
/// match the defaults
pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext {
    if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE && options.target_partition.is_none()
    {
        return if options.use_spilling() {
            DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone()
        } else {
            DEFAULT_SESSION_CONTEXT.clone()
        };
    }
    new_session_context(options)
}

fn get_task_context(
    session_ctx: &SessionContext,
    options: &LanceExecutionOptions,
) -> Arc<TaskContext> {
    let mut state = session_ctx.state();
    if let Some(batch_size) = options.batch_size.as_ref() {
        state.config_mut().options_mut().execution.batch_size = *batch_size;
    }

    state.task_ctx()
}

#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct ExecutionSummaryCounts {
    /// The number of I/O operations performed
    pub iops: usize,
    /// The number of requests made to the storage layer (may be larger or smaller than iops
    /// depending on coalescing configuration)
    pub requests: usize,
    /// The number of bytes read during the execution of the plan
    pub bytes_read: usize,
    /// The number of top-level indices loaded
    pub indices_loaded: usize,
    /// The number of index partitions loaded
    pub parts_loaded: usize,
    /// The number of index comparisons performed (the exact meaning depends on the index type)
    pub index_comparisons: usize,
    /// Additional metrics for more detailed statistics.  These are subject to change in the future
    /// and should only be used for debugging purposes.
    pub all_counts: HashMap<String, usize>,
}

/// Recursively walk the plan tree, accumulating the well-known metric
/// counters into `counts`
fn visit_node(node: &dyn ExecutionPlan, counts: &mut ExecutionSummaryCounts) {
    if let Some(metrics) = node.metrics() {
        for (metric_name, count) in metrics.iter_counts() {
            match metric_name.as_ref() {
                IOPS_METRIC => counts.iops += count.value(),
                REQUESTS_METRIC => counts.requests += count.value(),
                BYTES_READ_METRIC => counts.bytes_read += count.value(),
                INDICES_LOADED_METRIC => counts.indices_loaded += count.value(),
                PARTS_LOADED_METRIC => counts.parts_loaded += count.value(),
                INDEX_COMPARISONS_METRIC => counts.index_comparisons += count.value(),
                _ => {
                    let existing = counts
                        .all_counts
                        .entry(metric_name.as_ref().to_string())
                        .or_insert(0);
                    *existing += count.value();
                }
            }
        }
    }
    for child in node.children() {
        visit_node(child.as_ref(), counts);
    }
}

/// Emit a trace event summarizing the plan's metrics and invoke the
/// user-provided stats callback, if any
fn report_plan_summary_metrics(plan: &dyn ExecutionPlan, options: &LanceExecutionOptions) {
    let output_rows = plan
        .metrics()
        .map(|m| m.output_rows().unwrap_or(0))
        .unwrap_or(0);
    let mut counts = ExecutionSummaryCounts::default();
    visit_node(plan, &mut counts);
    tracing::info!(
        target: TRACE_EXECUTION,
        r#type = EXECUTION_PLAN_RUN,
        output_rows,
        iops = counts.iops,
        requests = counts.requests,
        bytes_read = counts.bytes_read,
        indices_loaded = counts.indices_loaded,
        parts_loaded = counts.parts_loaded,
        index_comparisons = counts.index_comparisons,
    );
    if let Some(callback) = options.execution_stats_callback.as_ref() {
        callback(&counts);
    }
}

/// Executes a plan using default session & runtime configuration
///
/// Only executes a single partition.  Panics if the plan has more than one partition.
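///
/// A minimal consumption sketch (the plan construction is assumed):
///
/// ```ignore
/// let mut stream = execute_plan(plan, LanceExecutionOptions::default())?;
/// while let Some(batch) = stream.next().await {
///     let batch = batch?;
///     // process the batch
/// }
/// ```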
pub fn execute_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<SendableRecordBatchStream> {
    debug!(
        "Executing plan:\n{}",
        DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
    );

    let session_ctx = get_session_context(&options);

    // NOTE: we are only executing the first partition here. Therefore, if
    // the plan has more than one partition, we will be missing data.
    assert_eq!(plan.properties().partitioning.partition_count(), 1);
    let stream = plan.execute(0, get_task_context(&session_ctx, &options))?;

    let schema = stream.schema();
    let stream = stream.finally(move || {
        report_plan_summary_metrics(plan.as_ref(), &options);
    });
    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}

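/// Fully executes the plan under an `AnalyzeExec` wrapper and returns the
/// plan rendered with its runtime metrics
///
/// A minimal usage sketch (the plan construction is assumed):
///
/// ```ignore
/// let report = analyze_plan(plan, LanceExecutionOptions::default()).await?;
/// println!("{}", report);
/// ```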
pub async fn analyze_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<String> {
    let schema = plan.schema();
    let analyze = Arc::new(AnalyzeExec::new(true, true, plan, schema));

    let session_ctx = get_session_context(&options);
    assert_eq!(analyze.properties().partitioning.partition_count(), 1);
    let mut stream = analyze
        .execute(0, get_task_context(&session_ctx, &options))
        .map_err(|err| {
            Error::io(
                format!("Failed to execute analyze plan: {}", err),
                location!(),
            )
        })?;

    // fully execute the plan
    while (stream.next().await).is_some() {}

    let display = DisplayableExecutionPlan::with_metrics(analyze.as_ref());
    Ok(format!("{}", display.indent(true)))
}

pub trait SessionContextExt {
    /// Creates a DataFrame for reading a stream of data
    ///
    /// This dataframe may only be queried once; subsequent queries will fail
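    ///
    /// A minimal usage sketch (the context and stream are assumed):
    ///
    /// ```ignore
    /// let df = ctx.read_one_shot(stream)?;
    /// let batches = df.collect().await?;
    /// ```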
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame>;
}

/// A [`PartitionStream`] wrapping a stream that can only be consumed once
struct OneShotPartitionStream {
    data: Arc<Mutex<Option<SendableRecordBatchStream>>>,
    schema: Arc<ArrowSchema>,
}

impl std::fmt::Debug for OneShotPartitionStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let data = self.data.lock().unwrap();
        f.debug_struct("OneShotPartitionStream")
            .field("exhausted", &data.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl OneShotPartitionStream {
    fn new(data: SendableRecordBatchStream) -> Self {
        let schema = data.schema();
        Self {
            data: Arc::new(Mutex::new(Some(data))),
            schema,
        }
    }
}

impl PartitionStream for OneShotPartitionStream {
    fn schema(&self) -> &arrow_schema::SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut stream = self.data.lock().unwrap();
        stream
            .take()
            .expect("Attempt to consume a one shot dataframe multiple times")
    }
}

impl SessionContextExt for SessionContext {
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame> {
        let schema = data.schema();
        let part_stream = Arc::new(OneShotPartitionStream::new(data));
        let provider = StreamingTable::try_new(schema, vec![part_stream])?;
        self.read_table(Arc::new(provider))
    }
}