use std::{
    collections::HashMap,
    fmt::{self, Formatter},
    sync::{Arc, LazyLock, Mutex},
    time::Duration,
};

use arrow_array::RecordBatch;
use arrow_schema::Schema as ArrowSchema;
use datafusion::{
    catalog::streaming::StreamingTable,
    dataframe::DataFrame,
    execution::{
        context::{SessionConfig, SessionContext},
        disk_manager::DiskManagerBuilder,
        memory_pool::FairSpillPool,
        runtime_env::RuntimeEnvBuilder,
        TaskContext,
    },
    physical_plan::{
        analyze::AnalyzeExec,
        display::DisplayableExecutionPlan,
        execution_plan::{Boundedness, CardinalityEffect, EmissionType},
        stream::RecordBatchStreamAdapter,
        streaming::PartitionStream,
        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
    },
};
use datafusion_common::{DataFusionError, Statistics};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

use futures::{stream, StreamExt};
use lance_arrow::SchemaExt;
use lance_core::{
    utils::{
        futures::FinallyStreamExt,
        tracing::{StreamTracingExt, EXECUTION_PLAN_RUN, TRACE_EXECUTION},
    },
    Error, Result,
};
use log::{debug, info, warn};
use snafu::location;
use tracing::Span;

use crate::udf::register_functions;
use crate::{
    chunker::StrictBatchSizeStream,
    utils::{
        MetricsExt, BYTES_READ_METRIC, INDEX_COMPARISONS_METRIC, INDICES_LOADED_METRIC,
        IOPS_METRIC, PARTS_LOADED_METRIC, REQUESTS_METRIC,
    },
};

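/// A leaf [`ExecutionPlan`] node that wraps an already-opened
/// [`SendableRecordBatchStream`] so the stream can be used as input to a
/// DataFusion plan.
///
/// The stream can only be consumed once; a second call to `execute` returns an
/// error. A minimal usage sketch (`stream` is a hypothetical
/// [`SendableRecordBatchStream`], not something defined in this file):
///
/// ```ignore
/// let plan = Arc::new(OneShotExec::new(stream));
/// let results = execute_plan(plan, LanceExecutionOptions::default())?;
/// ```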
pub struct OneShotExec {
    /// The stream to deliver. Consumed (replaced with `None`) on the first
    /// call to `execute`.
    stream: Mutex<Option<SendableRecordBatchStream>>,
    /// The schema, captured eagerly so it remains available after the stream
    /// has been taken.
    schema: Arc<ArrowSchema>,
    properties: PlanProperties,
}

impl OneShotExec {
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        let schema = stream.schema();
        Self {
            stream: Mutex::new(Some(stream)),
            schema: schema.clone(),
            properties: PlanProperties::new(
                EquivalenceProperties::new(schema),
                Partitioning::RoundRobinBatch(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
        }
    }

    pub fn from_batch(batch: RecordBatch) -> Self {
        let schema = batch.schema();
        let stream = Box::pin(RecordBatchStreamAdapter::new(
            schema,
            stream::iter(vec![Ok(batch)]),
        ));
        Self::new(stream)
    }
}

impl std::fmt::Debug for OneShotExec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        f.debug_struct("OneShotExec")
            .field("exhausted", &stream.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl DisplayAs for OneShotExec {
    fn fmt_as(
        &self,
        t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        // Trailing space so the marker doesn't run into `columns=` below.
        let exhausted = if stream.is_some() { "" } else { "EXHAUSTED " };
        let columns = self
            .schema
            .field_names()
            .into_iter()
            .cloned()
            .collect::<Vec<_>>();
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "OneShotStream: {}columns=[{}]",
                    exhausted,
                    columns.join(",")
                )
            }
            DisplayFormatType::TreeRender => {
                write!(
                    f,
                    "OneShotStream\nexhausted={}\ncolumns=[{}]",
                    stream.is_none(),
                    columns.join(",")
                )
            }
        }
    }
}

impl ExecutionPlan for OneShotExec {
    fn name(&self) -> &str {
        "OneShotExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow_schema::SchemaRef {
        self.schema.clone()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        if !children.is_empty() {
            return Err(datafusion_common::DataFusionError::Internal(
                "OneShotExec does not support children".to_string(),
            ));
        }
        Ok(self)
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<datafusion::execution::TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        let stream = self
            .stream
            .lock()
            .map_err(|err| DataFusionError::Execution(err.to_string()))?
            .take();
        if let Some(stream) = stream {
            Ok(stream)
        } else {
            Err(DataFusionError::Execution(
                "OneShotExec has already been executed".to_string(),
            ))
        }
    }

    fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
        Ok(Statistics::new_unknown(&self.schema))
    }

    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
        &self.properties
    }
}

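/// An [`ExecutionPlan`] wrapper that executes its input inside the given
/// [`tracing::Span`], so work done while the stream is polled is attributed
/// to the caller's span.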
struct TracedExec {
    input: Arc<dyn ExecutionPlan>,
    properties: PlanProperties,
    span: Span,
}

impl TracedExec {
    pub fn new(input: Arc<dyn ExecutionPlan>, span: Span) -> Self {
        Self {
            properties: input.properties().clone(),
            input,
            span,
        }
    }
}

impl DisplayAs for TracedExec {
    fn fmt_as(
        &self,
        t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => {
                write!(f, "TracedExec")
            }
        }
    }
}

impl std::fmt::Debug for TracedExec {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "TracedExec")
    }
}

impl ExecutionPlan for TracedExec {
    fn name(&self) -> &str {
        "TracedExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(Self {
            input: children[0].clone(),
            properties: self.properties.clone(),
            span: self.span.clone(),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        // Enter the span while the child plan sets up its stream, then keep
        // polling the stream inside the same span.
        let _guard = self.span.enter();
        let stream = self.input.execute(partition, context)?;
        let schema = stream.schema();
        let stream = stream.stream_in_span(self.span.clone());
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }
}

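/// Callback invoked with the summarized execution metrics once a plan's
/// output stream has been fully consumed (or dropped). See
/// [`ExecutionSummaryCounts`].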
pub type ExecutionStatsCallback = Arc<dyn Fn(&ExecutionSummaryCounts) + Send + Sync>;

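/// Options controlling how Lance executes a DataFusion plan.
///
/// A sketch of typical use (the field values here are purely illustrative):
///
/// ```ignore
/// let options = LanceExecutionOptions {
///     use_spilling: true,
///     batch_size: Some(1024),
///     ..Default::default()
/// };
/// let stream = execute_plan(plan, options)?;
/// ```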
#[derive(Default, Clone)]
pub struct LanceExecutionOptions {
    /// If true, operators that support it may spill intermediate data to disk.
    pub use_spilling: bool,
    /// Memory pool size in bytes. Falls back to the `LANCE_MEM_POOL_SIZE`
    /// environment variable, then a built-in default.
    pub mem_pool_size: Option<u64>,
    /// Desired output batch size, in rows.
    pub batch_size: Option<usize>,
    /// Desired number of target partitions for the session.
    pub target_partition: Option<usize>,
    /// Invoked with summary metrics once execution finishes.
    pub execution_stats_callback: Option<ExecutionStatsCallback>,
    /// If true, skip logging the plan and its summary metrics.
    pub skip_logging: bool,
}

impl std::fmt::Debug for LanceExecutionOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LanceExecutionOptions")
            .field("use_spilling", &self.use_spilling)
            .field("mem_pool_size", &self.mem_pool_size)
            .field("batch_size", &self.batch_size)
            .field("target_partition", &self.target_partition)
            .field("skip_logging", &self.skip_logging)
            .field(
                "execution_stats_callback",
                &self.execution_stats_callback.is_some(),
            )
            .finish()
    }
}

/// 100 MiB
const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 * 1024 * 1024;

impl LanceExecutionOptions {
    pub fn mem_pool_size(&self) -> u64 {
        self.mem_pool_size.unwrap_or_else(|| {
            std::env::var("LANCE_MEM_POOL_SIZE")
                .map(|s| match s.parse::<u64>() {
                    Ok(v) => v,
                    Err(e) => {
                        warn!("Failed to parse LANCE_MEM_POOL_SIZE: {}, using default", e);
                        DEFAULT_LANCE_MEM_POOL_SIZE
                    }
                })
                .unwrap_or(DEFAULT_LANCE_MEM_POOL_SIZE)
        })
    }

    pub fn use_spilling(&self) -> bool {
        if !self.use_spilling {
            return false;
        }
        std::env::var("LANCE_BYPASS_SPILLING")
            .map(|_| {
                info!("Bypassing spilling because LANCE_BYPASS_SPILLING is set");
                false
            })
            .unwrap_or(true)
    }
}

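/// Creates a new [`SessionContext`] configured from `options`, with Lance's
/// custom functions registered.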
pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext {
    let mut session_config = SessionConfig::new();
    let mut runtime_env_builder = RuntimeEnvBuilder::new();
    if let Some(target_partition) = options.target_partition {
        session_config = session_config.with_target_partitions(target_partition);
    }
    if options.use_spilling() {
        runtime_env_builder = runtime_env_builder
            .with_disk_manager_builder(DiskManagerBuilder::default())
            .with_memory_pool(Arc::new(FairSpillPool::new(
                options.mem_pool_size() as usize
            )));
    }
    let runtime_env = runtime_env_builder.build_arc().unwrap();

    let ctx = SessionContext::new_with_config_rt(session_config, runtime_env);
    register_functions(&ctx);

    ctx
}

static DEFAULT_SESSION_CONTEXT: LazyLock<SessionContext> =
    LazyLock::new(|| new_session_context(&LanceExecutionOptions::default()));

static DEFAULT_SESSION_CONTEXT_WITH_SPILLING: LazyLock<SessionContext> = LazyLock::new(|| {
    new_session_context(&LanceExecutionOptions {
        use_spilling: true,
        ..Default::default()
    })
});

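/// Returns a [`SessionContext`] for the given options, reusing a shared
/// default context whenever the options match the defaults.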
pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext {
    if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE && options.target_partition.is_none()
    {
        return if options.use_spilling() {
            DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone()
        } else {
            DEFAULT_SESSION_CONTEXT.clone()
        };
    }
    new_session_context(options)
}

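/// Builds a [`TaskContext`] from the session state, applying any per-query
/// batch size override from `options`.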
fn get_task_context(
    session_ctx: &SessionContext,
    options: &LanceExecutionOptions,
) -> Arc<TaskContext> {
    let mut state = session_ctx.state();
    if let Some(batch_size) = options.batch_size.as_ref() {
        state.config_mut().options_mut().execution.batch_size = *batch_size;
    }

    state.task_ctx()
}

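/// Summary counters aggregated from the metrics of every node in an executed
/// plan. Populated by walking the plan tree after execution completes.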
#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct ExecutionSummaryCounts {
    /// The number of I/O operations performed.
    pub iops: usize,
    /// The number of read requests made.
    pub requests: usize,
    /// The number of bytes read.
    pub bytes_read: usize,
    /// The number of indices loaded.
    pub indices_loaded: usize,
    /// The number of index partitions (parts) loaded.
    pub parts_loaded: usize,
    /// The number of comparisons performed during index searches.
    pub index_comparisons: usize,
    /// Any other counter metrics, keyed by metric name.
    pub all_counts: HashMap<String, usize>,
}

fn visit_node(node: &dyn ExecutionPlan, counts: &mut ExecutionSummaryCounts) {
    if let Some(metrics) = node.metrics() {
        for (metric_name, count) in metrics.iter_counts() {
            match metric_name.as_ref() {
                IOPS_METRIC => counts.iops += count.value(),
                REQUESTS_METRIC => counts.requests += count.value(),
                BYTES_READ_METRIC => counts.bytes_read += count.value(),
                INDICES_LOADED_METRIC => counts.indices_loaded += count.value(),
                PARTS_LOADED_METRIC => counts.parts_loaded += count.value(),
                INDEX_COMPARISONS_METRIC => counts.index_comparisons += count.value(),
                _ => {
                    let existing = counts
                        .all_counts
                        .entry(metric_name.as_ref().to_string())
                        .or_insert(0);
                    *existing += count.value();
                }
            }
        }
        // Some I/O metrics are reported as gauges rather than counters; fold
        // those into the summary as well.
        for (metric_name, gauge) in metrics.iter_gauges() {
            match metric_name.as_ref() {
                IOPS_METRIC => counts.iops += gauge.value(),
                REQUESTS_METRIC => counts.requests += gauge.value(),
                BYTES_READ_METRIC => counts.bytes_read += gauge.value(),
                _ => {}
            }
        }
    }
    for child in node.children() {
        visit_node(child.as_ref(), counts);
    }
}

fn report_plan_summary_metrics(plan: &dyn ExecutionPlan, options: &LanceExecutionOptions) {
    let output_rows = plan
        .metrics()
        .map(|m| m.output_rows().unwrap_or(0))
        .unwrap_or(0);
    let mut counts = ExecutionSummaryCounts::default();
    visit_node(plan, &mut counts);
    tracing::info!(
        target: TRACE_EXECUTION,
        r#type = EXECUTION_PLAN_RUN,
        plan_summary = display_plan_one_liner(plan),
        output_rows,
        iops = counts.iops,
        requests = counts.requests,
        bytes_read = counts.bytes_read,
        indices_loaded = counts.indices_loaded,
        parts_loaded = counts.parts_loaded,
        index_comparisons = counts.index_comparisons,
    );
    if let Some(callback) = options.execution_stats_callback.as_ref() {
        callback(&counts);
    }
}

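/// Renders a plan as a compact one-line summary for trace events; for
/// example, a filter over a scan becomes `Filter(Scan)`.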
fn display_plan_one_liner(plan: &dyn ExecutionPlan) -> String {
    let mut output = String::new();

    display_plan_one_liner_impl(plan, &mut output);

    output
}

fn display_plan_one_liner_impl(plan: &dyn ExecutionPlan, output: &mut String) {
    // Drop the conventional `Exec` suffix to keep the summary compact.
    let name = plan.name().trim_end_matches("Exec");
    output.push_str(name);

    let children = plan.children();
    if !children.is_empty() {
        output.push('(');
        for (i, child) in children.iter().enumerate() {
            if i > 0 {
                output.push(',');
            }
            display_plan_one_liner_impl(child.as_ref(), output);
        }
        output.push(')');
    }
}

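/// Executes a plan, returning a stream of record batches.
///
/// The plan must produce exactly one output partition. A minimal sketch
/// (`plan` is any single-partition [`ExecutionPlan`]):
///
/// ```ignore
/// let mut stream = execute_plan(plan, LanceExecutionOptions::default())?;
/// while let Some(batch) = stream.next().await {
///     let batch = batch?;
///     // ... use the batch ...
/// }
/// ```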
pub fn execute_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<SendableRecordBatchStream> {
    if !options.skip_logging {
        debug!(
            "Executing plan:\n{}",
            DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
        );
    }

    let session_ctx = get_session_context(&options);

    // This utility only executes plans with a single output partition.
    assert_eq!(plan.properties().partitioning.partition_count(), 1);
    let stream = plan.execute(0, get_task_context(&session_ctx, &options))?;

    let schema = stream.schema();
    // Once the stream is fully consumed (or dropped), report the summary
    // metrics collected while it ran.
    let stream = stream.finally(move || {
        if !options.skip_logging {
            report_plan_summary_metrics(plan.as_ref(), &options);
        }
    });
    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}

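/// Fully executes a plan under an [`AnalyzeExec`] wrapper, discarding the
/// output batches, and returns a formatted report of the plan's metrics.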
pub async fn analyze_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<String> {
    let plan = Arc::new(TracedExec::new(plan, Span::current()));

    let schema = plan.schema();
    let analyze = Arc::new(AnalyzeExec::new(true, true, plan, schema));

    let session_ctx = get_session_context(&options);
    assert_eq!(analyze.properties().partitioning.partition_count(), 1);
    let mut stream = analyze
        .execute(0, get_task_context(&session_ctx, &options))
        .map_err(|err| {
            Error::io(
                format!("Failed to execute analyze plan: {}", err),
                location!(),
            )
        })?;

    // Fully drain the stream so that all metrics are populated.
    while (stream.next().await).is_some() {}

    let result = format_plan(analyze);
    Ok(result)
}

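/// Formats an executed plan tree, one node per line, including each node's
/// aggregated metrics and the CPU time accumulated by the node and its
/// descendants (`cumulative_cpu`).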
pub fn format_plan(plan: Arc<dyn ExecutionPlan>) -> String {
    // First pass: walk the tree in pre-order, recording each node's CPU time
    // plus the CPU time of all of its descendants.
    struct CalculateVisitor {
        highest_index: usize,
        index_to_cumulative_cpu: HashMap<usize, usize>,
    }
    impl CalculateVisitor {
        fn calculate_cumulative_cpu(&mut self, plan: &Arc<dyn ExecutionPlan>) -> usize {
            self.highest_index += 1;
            let plan_index = self.highest_index;
            let elapsed_cpu: usize = match plan.metrics() {
                Some(metrics) => metrics.elapsed_compute().unwrap_or_default(),
                None => 0,
            };
            let mut cumulative_cpu = elapsed_cpu;
            for child in plan.children() {
                cumulative_cpu += self.calculate_cumulative_cpu(child);
            }
            self.index_to_cumulative_cpu
                .insert(plan_index, cumulative_cpu);
            cumulative_cpu
        }
    }

    // Second pass: walk the tree in the same order, printing each node with
    // its metrics and the cumulative CPU computed above.
    struct PrintVisitor {
        highest_index: usize,
        indent: usize,
    }
    impl PrintVisitor {
        fn write_output(
            &mut self,
            plan: &Arc<dyn ExecutionPlan>,
            f: &mut Formatter,
            calcs: &CalculateVisitor,
        ) -> std::fmt::Result {
            self.highest_index += 1;
            write!(f, "{:indent$}", "", indent = self.indent * 2)?;
            plan.fmt_as(datafusion::physical_plan::DisplayFormatType::Verbose, f)?;
            if let Some(metrics) = plan.metrics() {
                let metrics = metrics
                    .aggregate_by_name()
                    .sorted_for_display()
                    .timestamps_removed();

                write!(f, ", metrics=[{metrics}]")?;
            } else {
                write!(f, ", metrics=[]")?;
            }
            let cumulative_cpu = calcs
                .index_to_cumulative_cpu
                .get(&self.highest_index)
                .unwrap();
            let cumulative_cpu_duration = Duration::from_nanos((*cumulative_cpu) as u64);
            write!(f, ", cumulative_cpu={cumulative_cpu_duration:?}")?;
            writeln!(f)?;
            self.indent += 1;
            for child in plan.children() {
                self.write_output(child, f, calcs)?;
            }
            self.indent -= 1;
            std::fmt::Result::Ok(())
        }
    }

    // Adapter so the whole report can be produced through `fmt::Display`.
    struct PrintWrapper {
        plan: Arc<dyn ExecutionPlan>,
    }
    impl fmt::Display for PrintWrapper {
        fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
            let mut calcs = CalculateVisitor {
                highest_index: 0,
                index_to_cumulative_cpu: HashMap::new(),
            };
            calcs.calculate_cumulative_cpu(&self.plan);
            let mut prints = PrintVisitor {
                highest_index: 0,
                indent: 0,
            };
            prints.write_output(&self.plan, f, &calcs)
        }
    }
    let wrapper = PrintWrapper { plan };
    format!("{}", wrapper)
}

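/// Lance extensions to DataFusion's [`SessionContext`].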
pub trait SessionContextExt {
    /// Creates a [`DataFrame`] from a stream of data.
    ///
    /// The stream is consumed on first execution, so the resulting
    /// [`DataFrame`] can only be executed once.
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame>;
}

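/// A [`PartitionStream`] backed by a single pre-opened stream. It panics if
/// executed more than once.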
struct OneShotPartitionStream {
    data: Arc<Mutex<Option<SendableRecordBatchStream>>>,
    schema: Arc<ArrowSchema>,
}

impl std::fmt::Debug for OneShotPartitionStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let data = self.data.lock().unwrap();
        f.debug_struct("OneShotPartitionStream")
            .field("exhausted", &data.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl OneShotPartitionStream {
    fn new(data: SendableRecordBatchStream) -> Self {
        let schema = data.schema();
        Self {
            data: Arc::new(Mutex::new(Some(data))),
            schema,
        }
    }
}

impl PartitionStream for OneShotPartitionStream {
    fn schema(&self) -> &arrow_schema::SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut stream = self.data.lock().unwrap();
        stream
            .take()
            .expect("Attempt to consume a one shot dataframe multiple times")
    }
}

impl SessionContextExt for SessionContext {
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame> {
        let schema = data.schema();
        let part_stream = Arc::new(OneShotPartitionStream::new(data));
        let provider = StreamingTable::try_new(schema, vec![part_stream])?;
        self.read_table(Arc::new(provider))
    }
}

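/// An [`ExecutionPlan`] wrapper that re-chunks its input through
/// [`StrictBatchSizeStream`] so output batches conform to a strict
/// `batch_size`.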
#[derive(Clone, Debug)]
pub struct StrictBatchSizeExec {
    input: Arc<dyn ExecutionPlan>,
    batch_size: usize,
}

impl StrictBatchSizeExec {
    pub fn new(input: Arc<dyn ExecutionPlan>, batch_size: usize) -> Self {
        Self { input, batch_size }
    }
}

impl DisplayAs for StrictBatchSizeExec {
    fn fmt_as(
        &self,
        _t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        write!(f, "StrictBatchSizeExec")
    }
}

impl ExecutionPlan for StrictBatchSizeExec {
    fn name(&self) -> &str {
        "StrictBatchSizeExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        self.input.properties()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(Self {
            input: children[0].clone(),
            batch_size: self.batch_size,
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        let stream = self.input.execute(partition, context)?;
        let schema = stream.schema();
        let stream = StrictBatchSizeStream::new(stream, self.batch_size);
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn partition_statistics(
        &self,
        partition: Option<usize>,
    ) -> datafusion_common::Result<Statistics> {
        self.input.partition_statistics(partition)
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::Equal
    }

    fn supports_limit_pushdown(&self) -> bool {
        true
    }
}