use std::{
    collections::HashMap,
    sync::{Arc, LazyLock, Mutex},
};

use arrow_array::RecordBatch;
use arrow_schema::Schema as ArrowSchema;
use datafusion::{
    catalog::streaming::StreamingTable,
    dataframe::DataFrame,
    execution::{
        context::{SessionConfig, SessionContext},
        disk_manager::DiskManagerBuilder,
        memory_pool::FairSpillPool,
        runtime_env::RuntimeEnvBuilder,
        TaskContext,
    },
    physical_plan::{
        analyze::AnalyzeExec,
        display::DisplayableExecutionPlan,
        execution_plan::{Boundedness, CardinalityEffect, EmissionType},
        stream::RecordBatchStreamAdapter,
        streaming::PartitionStream,
        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
    },
};
use datafusion_common::{DataFusionError, Statistics};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

use futures::{stream, StreamExt};
use lance_arrow::SchemaExt;
use lance_core::{
    utils::{
        futures::FinallyStreamExt,
        tracing::{StreamTracingExt, EXECUTION_PLAN_RUN, TRACE_EXECUTION},
    },
    Error, Result,
};
use log::{debug, info, warn};
use snafu::location;
use tracing::Span;

use crate::udf::register_functions;
use crate::{
    chunker::StrictBatchSizeStream,
    utils::{
        MetricsExt, BYTES_READ_METRIC, INDEX_COMPARISONS_METRIC, INDICES_LOADED_METRIC,
        IOPS_METRIC, PARTS_LOADED_METRIC, REQUESTS_METRIC,
    },
};

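/// A leaf execution node that wraps an existing [`SendableRecordBatchStream`].
///
/// The node can only be executed once. After the stream has been taken, the
/// node is exhausted and further calls to `execute` return an error.
///
/// # Example
///
/// A minimal usage sketch (assuming a `RecordBatch` named `batch`; marked
/// `ignore` because execution needs an async runtime):
///
/// ```ignore
/// let exec = OneShotExec::from_batch(batch);
/// let stream = exec.execute(0, Arc::new(TaskContext::default()))?;
/// // A second execution fails because the stream has already been taken.
/// assert!(exec.execute(0, Arc::new(TaskContext::default())).is_err());
/// ```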
pub struct OneShotExec {
    stream: Mutex<Option<SendableRecordBatchStream>>,
    // A copy of the schema is kept so `schema()` and plan display keep working
    // after the stream has been taken.
    schema: Arc<ArrowSchema>,
    properties: PlanProperties,
}

impl OneShotExec {
    /// Create a new instance from a stream of data.
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        let schema = stream.schema();
        Self {
            stream: Mutex::new(Some(stream)),
            schema: schema.clone(),
            properties: PlanProperties::new(
                EquivalenceProperties::new(schema),
                Partitioning::RoundRobinBatch(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
        }
    }

    /// Create a new instance from a single batch of data.
    pub fn from_batch(batch: RecordBatch) -> Self {
        let schema = batch.schema();
        let stream = Box::pin(RecordBatchStreamAdapter::new(
            schema,
            stream::iter(vec![Ok(batch)]),
        ));
        Self::new(stream)
    }
}

impl std::fmt::Debug for OneShotExec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        f.debug_struct("OneShotExec")
            .field("exhausted", &stream.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl DisplayAs for OneShotExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        let exhausted = if stream.is_some() { "" } else { "EXHAUSTED" };
        let columns = self
            .schema
            .field_names()
            .iter()
            .map(|s| s.to_string())
            .collect::<Vec<_>>();
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "OneShotStream: {}columns=[{}]",
                    exhausted,
                    columns.join(",")
                )
            }
            DisplayFormatType::TreeRender => {
                write!(
                    f,
                    "OneShotStream\nexhausted={}\ncolumns=[{}]",
                    exhausted,
                    columns.join(",")
                )
            }
        }
    }
}

impl ExecutionPlan for OneShotExec {
    fn name(&self) -> &str {
        "OneShotExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow_schema::SchemaRef {
        self.schema.clone()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        if !children.is_empty() {
            return Err(DataFusionError::Internal(
                "OneShotExec does not support children".to_string(),
            ));
        }
        Ok(self)
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        let stream = self
            .stream
            .lock()
            .map_err(|err| DataFusionError::Execution(err.to_string()))?
            .take();
        if let Some(stream) = stream {
            Ok(stream)
        } else {
            Err(DataFusionError::Execution(
                "OneShotExec has already been executed".to_string(),
            ))
        }
    }

    fn statistics(&self) -> datafusion_common::Result<Statistics> {
        Ok(Statistics::new_unknown(&self.schema))
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }
}

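/// Wraps an execution plan so that both `execute` and the resulting stream run
/// inside the given tracing span. This keeps work spawned by operators such as
/// `AnalyzeExec` attached to the caller's trace.
///
/// A minimal sketch of its use (see [`analyze_plan`] below):
///
/// ```ignore
/// let plan = Arc::new(TracedExec::new(plan, Span::current()));
/// ```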
struct TracedExec {
    input: Arc<dyn ExecutionPlan>,
    properties: PlanProperties,
    span: Span,
}

impl TracedExec {
    pub fn new(input: Arc<dyn ExecutionPlan>, span: Span) -> Self {
        Self {
            properties: input.properties().clone(),
            input,
            span,
        }
    }
}

impl DisplayAs for TracedExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => {
                write!(f, "TracedExec")
            }
        }
    }
}

impl std::fmt::Debug for TracedExec {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "TracedExec")
    }
}

impl ExecutionPlan for TracedExec {
    fn name(&self) -> &str {
        "TracedExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(Self {
            input: children[0].clone(),
            properties: self.properties.clone(),
            span: self.span.clone(),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        let _guard = self.span.enter();
        let stream = self.input.execute(partition, context)?;
        let schema = stream.schema();
        let stream = stream.stream_in_span(self.span.clone());
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }
}

/// Callback invoked with the summarized execution metrics once a plan finishes.
pub type ExecutionStatsCallback = Arc<dyn Fn(&ExecutionSummaryCounts) + Send + Sync>;

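/// Options controlling how Lance executes DataFusion plans.
///
/// # Example
///
/// A minimal sketch enabling spilling with a custom memory pool size:
///
/// ```ignore
/// let options = LanceExecutionOptions {
///     use_spilling: true,
///     mem_pool_size: Some(512 * 1024 * 1024), // 512 MiB
///     ..Default::default()
/// };
/// ```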
#[derive(Default, Clone)]
pub struct LanceExecutionOptions {
    /// If true, operators may spill to disk (a fair spill pool is installed).
    pub use_spilling: bool,
    /// Memory pool size in bytes; falls back to `LANCE_MEM_POOL_SIZE`, then 100 MiB.
    pub mem_pool_size: Option<u64>,
    /// Overrides DataFusion's execution batch size.
    pub batch_size: Option<usize>,
    /// Overrides DataFusion's number of target partitions.
    pub target_partition: Option<usize>,
    /// Invoked with summary metrics after execution completes.
    pub execution_stats_callback: Option<ExecutionStatsCallback>,
}

impl std::fmt::Debug for LanceExecutionOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LanceExecutionOptions")
            .field("use_spilling", &self.use_spilling)
            .field("mem_pool_size", &self.mem_pool_size)
            .field("batch_size", &self.batch_size)
            .field("target_partition", &self.target_partition)
            .field(
                "execution_stats_callback",
                &self.execution_stats_callback.is_some(),
            )
            .finish()
    }
}

const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 * 1024 * 1024; // 100 MiB

impl LanceExecutionOptions {
    /// The memory pool size to use, falling back to the `LANCE_MEM_POOL_SIZE`
    /// environment variable and then to [`DEFAULT_LANCE_MEM_POOL_SIZE`].
    pub fn mem_pool_size(&self) -> u64 {
        self.mem_pool_size.unwrap_or_else(|| {
            std::env::var("LANCE_MEM_POOL_SIZE")
                .map(|s| match s.parse::<u64>() {
                    Ok(v) => v,
                    Err(e) => {
                        warn!("Failed to parse LANCE_MEM_POOL_SIZE: {}, using default", e);
                        DEFAULT_LANCE_MEM_POOL_SIZE
                    }
                })
                .unwrap_or(DEFAULT_LANCE_MEM_POOL_SIZE)
        })
    }

    /// Whether spilling should actually be used. Returns false if spilling was
    /// not requested or if the `LANCE_BYPASS_SPILLING` environment variable is set.
    pub fn use_spilling(&self) -> bool {
        if !self.use_spilling {
            return false;
        }
        std::env::var("LANCE_BYPASS_SPILLING")
            .map(|_| {
                info!("Bypassing spilling because LANCE_BYPASS_SPILLING is set");
                false
            })
            .unwrap_or(true)
    }
}

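/// Builds a new `SessionContext` from the given options and registers Lance's
/// SQL functions on it.
///
/// A minimal sketch (prefer [`get_session_context`], which reuses cached
/// contexts when default settings are in effect):
///
/// ```ignore
/// let ctx = new_session_context(&LanceExecutionOptions::default());
/// ```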
pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext {
    let mut session_config = SessionConfig::new();
    let mut runtime_env_builder = RuntimeEnvBuilder::new();
    if let Some(target_partition) = options.target_partition {
        session_config = session_config.with_target_partitions(target_partition);
    }
    if options.use_spilling() {
        runtime_env_builder = runtime_env_builder
            .with_disk_manager_builder(DiskManagerBuilder::default())
            .with_memory_pool(Arc::new(FairSpillPool::new(
                options.mem_pool_size() as usize
            )));
    }
    let runtime_env = runtime_env_builder.build_arc().unwrap();

    let ctx = SessionContext::new_with_config_rt(session_config, runtime_env);
    register_functions(&ctx);

    ctx
}

// Cached contexts for the default options so repeated executions share state.
static DEFAULT_SESSION_CONTEXT: LazyLock<SessionContext> =
    LazyLock::new(|| new_session_context(&LanceExecutionOptions::default()));

static DEFAULT_SESSION_CONTEXT_WITH_SPILLING: LazyLock<SessionContext> = LazyLock::new(|| {
    new_session_context(&LanceExecutionOptions {
        use_spilling: true,
        ..Default::default()
    })
});

/// Returns a `SessionContext` for the given options, reusing one of the cached
/// default contexts when possible.
pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext {
    if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE && options.target_partition.is_none()
    {
        return if options.use_spilling() {
            DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone()
        } else {
            DEFAULT_SESSION_CONTEXT.clone()
        };
    }
    new_session_context(options)
}

fn get_task_context(
    session_ctx: &SessionContext,
    options: &LanceExecutionOptions,
) -> Arc<TaskContext> {
    let mut state = session_ctx.state();
    if let Some(batch_size) = options.batch_size.as_ref() {
        state.config_mut().options_mut().execution.batch_size = *batch_size;
    }

    state.task_ctx()
}

/// Summary counts of the metrics collected from an executed plan.
#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct ExecutionSummaryCounts {
    /// Number of I/O operations performed.
    pub iops: usize,
    /// Number of read requests issued.
    pub requests: usize,
    /// Number of bytes read.
    pub bytes_read: usize,
    /// Number of indices loaded.
    pub indices_loaded: usize,
    /// Number of index partitions loaded.
    pub parts_loaded: usize,
    /// Number of comparisons performed while searching indices.
    pub index_comparisons: usize,
    /// All other named counters, keyed by metric name.
    pub all_counts: HashMap<String, usize>,
}

/// Recursively accumulates metric counts from `node` and its children.
fn visit_node(node: &dyn ExecutionPlan, counts: &mut ExecutionSummaryCounts) {
    if let Some(metrics) = node.metrics() {
        for (metric_name, count) in metrics.iter_counts() {
            match metric_name.as_ref() {
                IOPS_METRIC => counts.iops += count.value(),
                REQUESTS_METRIC => counts.requests += count.value(),
                BYTES_READ_METRIC => counts.bytes_read += count.value(),
                INDICES_LOADED_METRIC => counts.indices_loaded += count.value(),
                PARTS_LOADED_METRIC => counts.parts_loaded += count.value(),
                INDEX_COMPARISONS_METRIC => counts.index_comparisons += count.value(),
                _ => {
                    let existing = counts
                        .all_counts
                        .entry(metric_name.as_ref().to_string())
                        .or_insert(0);
                    *existing += count.value();
                }
            }
        }
    }
    for child in node.children() {
        visit_node(child.as_ref(), counts);
    }
}

/// Logs a summary of the plan's metrics and invokes the stats callback, if any.
fn report_plan_summary_metrics(plan: &dyn ExecutionPlan, options: &LanceExecutionOptions) {
    let output_rows = plan
        .metrics()
        .map(|m| m.output_rows().unwrap_or(0))
        .unwrap_or(0);
    let mut counts = ExecutionSummaryCounts::default();
    visit_node(plan, &mut counts);
    tracing::info!(
        target: TRACE_EXECUTION,
        r#type = EXECUTION_PLAN_RUN,
        output_rows,
        iops = counts.iops,
        requests = counts.requests,
        bytes_read = counts.bytes_read,
        indices_loaded = counts.indices_loaded,
        parts_loaded = counts.parts_loaded,
        index_comparisons = counts.index_comparisons,
    );
    if let Some(callback) = options.execution_stats_callback.as_ref() {
        callback(&counts);
    }
}

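/// Executes a plan using a session and runtime configuration derived from
/// `options`, returning the resulting stream of record batches.
///
/// Only a single partition is executed; the function panics if the plan has
/// more than one output partition. Summary metrics are reported (and the
/// optional callback invoked) once the returned stream completes.
///
/// # Example
///
/// A minimal sketch (assuming `plan` is an `Arc<dyn ExecutionPlan>` with a
/// single output partition):
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let stream = execute_plan(plan, LanceExecutionOptions::default())?;
/// let batches: Vec<RecordBatch> = stream.try_collect().await?;
/// ```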
pub fn execute_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<SendableRecordBatchStream> {
    debug!(
        "Executing plan:\n{}",
        DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
    );

    let session_ctx = get_session_context(&options);

    // Only the first partition is executed, so the plan must have exactly one.
    assert_eq!(plan.properties().partitioning.partition_count(), 1);
    let stream = plan.execute(0, get_task_context(&session_ctx, &options))?;

    let schema = stream.schema();
    let stream = stream.finally(move || {
        report_plan_summary_metrics(plan.as_ref(), &options);
    });
    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}

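/// Runs the plan to completion under `AnalyzeExec` and returns the plan as a
/// string annotated with the runtime metrics of each operator.
///
/// # Example
///
/// A minimal sketch (assuming `plan` has a single output partition):
///
/// ```ignore
/// let report = analyze_plan(plan, LanceExecutionOptions::default()).await?;
/// println!("{report}");
/// ```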
pub async fn analyze_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<String> {
    // Wrap the plan so tasks spawned by AnalyzeExec stay in the current span.
    let plan = Arc::new(TracedExec::new(plan, Span::current()));

    let schema = plan.schema();
    let analyze = Arc::new(AnalyzeExec::new(true, true, plan, schema));

    let session_ctx = get_session_context(&options);
    assert_eq!(analyze.properties().partitioning.partition_count(), 1);
    let mut stream = analyze
        .execute(0, get_task_context(&session_ctx, &options))
        .map_err(|err| {
            Error::io(
                format!("Failed to execute analyze plan: {}", err),
                location!(),
            )
        })?;

    // Fully consume the stream so that all metrics are populated.
    while (stream.next().await).is_some() {}

    let display = DisplayableExecutionPlan::with_metrics(analyze.as_ref());
    Ok(format!("{}", display.indent(true)))
}

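/// Lance extensions to DataFusion's `SessionContext`.
///
/// # Example
///
/// A minimal sketch turning a stream into a one-shot `DataFrame` (assuming
/// `stream` is a `SendableRecordBatchStream`):
///
/// ```ignore
/// let ctx = SessionContext::new();
/// let df = ctx.read_one_shot(stream)?;
/// let batches = df.collect().await?;
/// ```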
pub trait SessionContextExt {
    /// Creates a DataFrame from a stream of data.
    ///
    /// The resulting DataFrame can only be executed once; attempting to
    /// consume it a second time will panic.
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame>;
}
524
525struct OneShotPartitionStream {
526 data: Arc<Mutex<Option<SendableRecordBatchStream>>>,
527 schema: Arc<ArrowSchema>,
528}
529
530impl std::fmt::Debug for OneShotPartitionStream {
531 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
532 let data = self.data.lock().unwrap();
533 f.debug_struct("OneShotPartitionStream")
534 .field("exhausted", &data.is_none())
535 .field("schema", self.schema.as_ref())
536 .finish()
537 }
538}

impl OneShotPartitionStream {
    fn new(data: SendableRecordBatchStream) -> Self {
        let schema = data.schema();
        Self {
            data: Arc::new(Mutex::new(Some(data))),
            schema,
        }
    }
}

impl PartitionStream for OneShotPartitionStream {
    fn schema(&self) -> &arrow_schema::SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut stream = self.data.lock().unwrap();
        stream
            .take()
            .expect("Attempt to consume a one shot dataframe multiple times")
    }
}

impl SessionContextExt for SessionContext {
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame> {
        let schema = data.schema();
        let part_stream = Arc::new(OneShotPartitionStream::new(data));
        let provider = StreamingTable::try_new(schema, vec![part_stream])?;
        self.read_table(Arc::new(provider))
    }
}

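/// Wraps an input plan and re-chunks its output stream so that batches have a
/// strict, uniform row count (see [`StrictBatchSizeStream`]).
///
/// # Example
///
/// A minimal sketch (assuming `input` is an `Arc<dyn ExecutionPlan>`):
///
/// ```ignore
/// let plan = Arc::new(StrictBatchSizeExec::new(input, 1024));
/// let stream = execute_plan(plan, LanceExecutionOptions::default())?;
/// ```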
#[derive(Clone, Debug)]
pub struct StrictBatchSizeExec {
    input: Arc<dyn ExecutionPlan>,
    batch_size: usize,
}

impl StrictBatchSizeExec {
    pub fn new(input: Arc<dyn ExecutionPlan>, batch_size: usize) -> Self {
        Self { input, batch_size }
    }
}

impl DisplayAs for StrictBatchSizeExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "StrictBatchSizeExec")
    }
}

impl ExecutionPlan for StrictBatchSizeExec {
    fn name(&self) -> &str {
        "StrictBatchSizeExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        self.input.properties()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(Self {
            input: children[0].clone(),
            batch_size: self.batch_size,
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        let stream = self.input.execute(partition, context)?;
        let schema = stream.schema();
        let stream = StrictBatchSizeStream::new(stream, self.batch_size);
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn partition_statistics(
        &self,
        partition: Option<usize>,
    ) -> datafusion_common::Result<Statistics> {
        self.input.partition_statistics(partition)
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::Equal
    }
}