use std::{
    collections::HashMap,
    sync::{Arc, LazyLock, Mutex},
};

use arrow_array::RecordBatch;
use arrow_schema::Schema as ArrowSchema;
use datafusion::{
    catalog::streaming::StreamingTable,
    dataframe::DataFrame,
    execution::{
        context::{SessionConfig, SessionContext},
        disk_manager::DiskManagerBuilder,
        memory_pool::FairSpillPool,
        runtime_env::RuntimeEnvBuilder,
        TaskContext,
    },
    physical_plan::{
        analyze::AnalyzeExec,
        display::DisplayableExecutionPlan,
        execution_plan::{Boundedness, CardinalityEffect, EmissionType},
        stream::RecordBatchStreamAdapter,
        streaming::PartitionStream,
        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
    },
};
use datafusion_common::{DataFusionError, Statistics};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

use futures::{stream, StreamExt};
use lance_arrow::SchemaExt;
use lance_core::{
    utils::{
        futures::FinallyStreamExt,
        tracing::{StreamTracingExt, EXECUTION_PLAN_RUN, TRACE_EXECUTION},
    },
    Error, Result,
};
use log::{debug, info, warn};
use snafu::location;
use tracing::Span;

use crate::{
    chunker::StrictBatchSizeStream,
    utils::{
        MetricsExt, BYTES_READ_METRIC, INDEX_COMPARISONS_METRIC, INDICES_LOADED_METRIC,
        IOPS_METRIC, PARTS_LOADED_METRIC, REQUESTS_METRIC,
    },
};

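/// An execution node that wraps an existing stream of record batches.
///
/// The node can only be executed once: the first call to `execute` takes the
/// stream, and any later call returns an error. The wrapped stream should be
/// finite, since the node reports itself as bounded.
///
/// A minimal usage sketch (the batch and task context are assumed to exist,
/// they are not part of this module):
///
/// ```ignore
/// let plan = Arc::new(OneShotExec::from_batch(batch));
/// // The first execution consumes the stream...
/// let stream = plan.execute(0, task_ctx.clone())?;
/// // ...and a second execution fails because the node is exhausted.
/// assert!(plan.execute(0, task_ctx).is_err());
/// ```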
pub struct OneShotExec {
    /// The wrapped stream; taken (and replaced with `None`) on first execution
    stream: Mutex<Option<SendableRecordBatchStream>>,
    /// A copy of the stream's schema, kept so it stays available after the
    /// stream has been consumed
    schema: Arc<ArrowSchema>,
    properties: PlanProperties,
}

impl OneShotExec {
    /// Create a OneShotExec from an existing stream of record batches
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        let schema = stream.schema();
        Self {
            stream: Mutex::new(Some(stream)),
            schema: schema.clone(),
            properties: PlanProperties::new(
                EquivalenceProperties::new(schema),
                Partitioning::RoundRobinBatch(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
        }
    }

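    /// Create a OneShotExec that yields a single, already materialized batch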
    pub fn from_batch(batch: RecordBatch) -> Self {
        let schema = batch.schema();
        let stream = Box::pin(RecordBatchStreamAdapter::new(
            schema,
            stream::iter(vec![Ok(batch)]),
        ));
        Self::new(stream)
    }
}

impl std::fmt::Debug for OneShotExec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        f.debug_struct("OneShotExec")
            .field("exhausted", &stream.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl DisplayAs for OneShotExec {
    fn fmt_as(
        &self,
        t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        // Trailing space so the default display reads "EXHAUSTED columns=[...]"
        // rather than "EXHAUSTEDcolumns=[...]"
        let exhausted = if stream.is_some() { "" } else { "EXHAUSTED " };
        let columns = self
            .schema
            .field_names()
            .iter()
            .map(|s| s.to_string())
            .collect::<Vec<_>>();
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "OneShotStream: {}columns=[{}]",
                    exhausted,
                    columns.join(",")
                )
            }
            DisplayFormatType::TreeRender => {
                write!(
                    f,
                    "OneShotStream\nexhausted={}\ncolumns=[{}]",
                    exhausted,
                    columns.join(",")
                )
            }
        }
    }
}

impl ExecutionPlan for OneShotExec {
    fn name(&self) -> &str {
        "OneShotExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow_schema::SchemaRef {
        self.schema.clone()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        if !children.is_empty() {
            return Err(datafusion_common::DataFusionError::Internal(
                "OneShotExec does not support children".to_string(),
            ));
        }
        Ok(self)
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<datafusion::execution::TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        let stream = self
            .stream
            .lock()
            .map_err(|err| DataFusionError::Execution(err.to_string()))?
            .take();
        if let Some(stream) = stream {
            Ok(stream)
        } else {
            Err(DataFusionError::Execution(
                "OneShotExec has already been executed".to_string(),
            ))
        }
    }

    fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
        Ok(Statistics::new_unknown(&self.schema))
    }

    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
        &self.properties
    }
}

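/// An execution node that executes its child inside a given tracing span
///
/// This keeps the tracing context attached to work that DataFusion may run on
/// other threads, both when the child is executed and when the resulting
/// stream is polled.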
struct TracedExec {
    input: Arc<dyn ExecutionPlan>,
    properties: PlanProperties,
    span: Span,
}

impl TracedExec {
    pub fn new(input: Arc<dyn ExecutionPlan>, span: Span) -> Self {
        Self {
            properties: input.properties().clone(),
            input,
            span,
        }
    }
}

impl DisplayAs for TracedExec {
    fn fmt_as(
        &self,
        t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => {
                write!(f, "TracedExec")
            }
        }
    }
}

impl std::fmt::Debug for TracedExec {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "TracedExec")
    }
}

impl ExecutionPlan for TracedExec {
    fn name(&self) -> &str {
        "TracedExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(Self {
            input: children[0].clone(),
            properties: self.properties.clone(),
            span: self.span.clone(),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        let _guard = self.span.enter();
        let stream = self.input.execute(partition, context)?;
        let schema = stream.schema();
        let stream = stream.stream_in_span(self.span.clone());
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }
}

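/// Callback invoked with the final summary counts once an executed plan's
/// stream has been fully consumed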
pub type ExecutionStatsCallback = Arc<dyn Fn(&ExecutionSummaryCounts) + Send + Sync>;

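/// Options that control how Lance executes a DataFusion plan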
#[derive(Default, Clone)]
pub struct LanceExecutionOptions {
    /// If true, operators that support it may spill intermediate data to disk
    pub use_spilling: bool,
    /// Size of the memory pool, in bytes, used when spilling is enabled
    pub mem_pool_size: Option<u64>,
    /// Overrides the session's execution batch size
    pub batch_size: Option<usize>,
    /// Overrides the session's target partition count
    pub target_partition: Option<usize>,
    /// Invoked with the summary counts after the plan's stream is drained
    pub execution_stats_callback: Option<ExecutionStatsCallback>,
}

impl std::fmt::Debug for LanceExecutionOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LanceExecutionOptions")
            .field("use_spilling", &self.use_spilling)
            .field("mem_pool_size", &self.mem_pool_size)
            .field("batch_size", &self.batch_size)
            .field("target_partition", &self.target_partition)
            .field(
                "execution_stats_callback",
                &self.execution_stats_callback.is_some(),
            )
            .finish()
    }
}

/// The default memory pool size (100 MiB) used when none is specified
const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 * 1024 * 1024;

impl LanceExecutionOptions {
    /// The memory pool size: the explicit option if set, otherwise the
    /// LANCE_MEM_POOL_SIZE environment variable, otherwise the default
    pub fn mem_pool_size(&self) -> u64 {
        self.mem_pool_size.unwrap_or_else(|| {
            std::env::var("LANCE_MEM_POOL_SIZE")
                .map(|s| match s.parse::<u64>() {
                    Ok(v) => v,
                    Err(e) => {
                        warn!("Failed to parse LANCE_MEM_POOL_SIZE: {}, using default", e);
                        DEFAULT_LANCE_MEM_POOL_SIZE
                    }
                })
                .unwrap_or(DEFAULT_LANCE_MEM_POOL_SIZE)
        })
    }

    /// Whether spilling should actually be used; setting the
    /// LANCE_BYPASS_SPILLING environment variable disables it even when the
    /// option is enabled
    pub fn use_spilling(&self) -> bool {
        if !self.use_spilling {
            return false;
        }
        std::env::var("LANCE_BYPASS_SPILLING")
            .map(|_| {
                info!("Bypassing spilling because LANCE_BYPASS_SPILLING is set");
                false
            })
            .unwrap_or(true)
    }
}

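/// Build a fresh SessionContext from the given options
///
/// When spilling is enabled, the runtime is configured with a disk manager
/// and a FairSpillPool sized to `mem_pool_size()`.
///
/// A minimal usage sketch (the option values here are illustrative):
///
/// ```ignore
/// let ctx = new_session_context(&LanceExecutionOptions {
///     use_spilling: true,
///     mem_pool_size: Some(512 * 1024 * 1024), // 512 MiB
///     ..Default::default()
/// });
/// ```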
pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext {
    let mut session_config = SessionConfig::new();
    let mut runtime_env_builder = RuntimeEnvBuilder::new();
    if let Some(target_partition) = options.target_partition {
        session_config = session_config.with_target_partitions(target_partition);
    }
    if options.use_spilling() {
        runtime_env_builder = runtime_env_builder
            .with_disk_manager_builder(DiskManagerBuilder::default())
            .with_memory_pool(Arc::new(FairSpillPool::new(
                options.mem_pool_size() as usize
            )));
    }
    let runtime_env = runtime_env_builder.build_arc().unwrap();
    SessionContext::new_with_config_rt(session_config, runtime_env)
}

static DEFAULT_SESSION_CONTEXT: LazyLock<SessionContext> =
    LazyLock::new(|| new_session_context(&LanceExecutionOptions::default()));

static DEFAULT_SESSION_CONTEXT_WITH_SPILLING: LazyLock<SessionContext> = LazyLock::new(|| {
    new_session_context(&LanceExecutionOptions {
        use_spilling: true,
        ..Default::default()
    })
});

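/// Return a SessionContext for the given options, reusing a cached default
/// context when the options match the defaults (default memory pool size and
/// no target partition override)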
pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext {
    if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE && options.target_partition.is_none()
    {
        return if options.use_spilling() {
            DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone()
        } else {
            DEFAULT_SESSION_CONTEXT.clone()
        };
    }
    new_session_context(options)
}

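/// Build a TaskContext from the session, overriding the execution batch size
/// if one was requested in the options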
fn get_task_context(
    session_ctx: &SessionContext,
    options: &LanceExecutionOptions,
) -> Arc<TaskContext> {
    let mut state = session_ctx.state();
    if let Some(batch_size) = options.batch_size.as_ref() {
        state.config_mut().options_mut().execution.batch_size = *batch_size;
    }

    state.task_ctx()
}

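/// Summary counters accumulated across every node in an executed plan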
#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct ExecutionSummaryCounts {
    /// The number of I/O operations performed
    pub iops: usize,
    /// The number of read requests issued
    pub requests: usize,
    /// The number of bytes read
    pub bytes_read: usize,
    /// The number of indices loaded
    pub indices_loaded: usize,
    /// The number of index partitions loaded
    pub parts_loaded: usize,
    /// The number of comparisons performed while searching indices
    pub index_comparisons: usize,
    /// Any other counters reported by the plan, keyed by metric name
    pub all_counts: HashMap<String, usize>,
}

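/// Recursively accumulate the metric counters of `node` and its children into
/// `counts`, mapping Lance's well-known metric names onto dedicated fields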
fn visit_node(node: &dyn ExecutionPlan, counts: &mut ExecutionSummaryCounts) {
    if let Some(metrics) = node.metrics() {
        for (metric_name, count) in metrics.iter_counts() {
            match metric_name.as_ref() {
                IOPS_METRIC => counts.iops += count.value(),
                REQUESTS_METRIC => counts.requests += count.value(),
                BYTES_READ_METRIC => counts.bytes_read += count.value(),
                INDICES_LOADED_METRIC => counts.indices_loaded += count.value(),
                PARTS_LOADED_METRIC => counts.parts_loaded += count.value(),
                INDEX_COMPARISONS_METRIC => counts.index_comparisons += count.value(),
                _ => {
                    let existing = counts
                        .all_counts
                        .entry(metric_name.as_ref().to_string())
                        .or_insert(0);
                    *existing += count.value();
                }
            }
        }
    }
    for child in node.children() {
        visit_node(child.as_ref(), counts);
    }
}

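/// Emit the plan's summary counts as a tracing event and pass them to the
/// optional execution stats callback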
fn report_plan_summary_metrics(plan: &dyn ExecutionPlan, options: &LanceExecutionOptions) {
    let output_rows = plan
        .metrics()
        .map(|m| m.output_rows().unwrap_or(0))
        .unwrap_or(0);
    let mut counts = ExecutionSummaryCounts::default();
    visit_node(plan, &mut counts);
    tracing::info!(
        target: TRACE_EXECUTION,
        r#type = EXECUTION_PLAN_RUN,
        output_rows,
        iops = counts.iops,
        requests = counts.requests,
        bytes_read = counts.bytes_read,
        indices_loaded = counts.indices_loaded,
        parts_loaded = counts.parts_loaded,
        index_comparisons = counts.index_comparisons,
    );
    if let Some(callback) = options.execution_stats_callback.as_ref() {
        callback(&counts);
    }
}

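/// Execute a plan and return a stream of record batches
///
/// The plan must have exactly one partition. Summary metrics are reported
/// (and the stats callback, if any, is invoked) once the returned stream has
/// been fully consumed.
///
/// A usage sketch (assumes `plan` is a single-partition physical plan):
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let mut stream = execute_plan(plan, LanceExecutionOptions::default())?;
/// while let Some(batch) = stream.try_next().await? {
///     // process each RecordBatch as it arrives
/// }
/// ```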
pub fn execute_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<SendableRecordBatchStream> {
    debug!(
        "Executing plan:\n{}",
        DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
    );

    let session_ctx = get_session_context(&options);

    // Only partition 0 is executed; a plan with more than one partition would
    // silently lose data, so insist on a single partition here.
    assert_eq!(plan.properties().partitioning.partition_count(), 1);
    let stream = plan.execute(0, get_task_context(&session_ctx, &options))?;

    let schema = stream.schema();
    let stream = stream.finally(move || {
        report_plan_summary_metrics(plan.as_ref(), &options);
    });
    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}

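/// Execute a plan under AnalyzeExec, draining its output, and return the plan
/// rendered together with its collected metrics
///
/// A usage sketch (assumes `plan` is a single-partition physical plan):
///
/// ```ignore
/// let report = analyze_plan(plan, LanceExecutionOptions::default()).await?;
/// println!("{}", report);
/// ```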
pub async fn analyze_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<String> {
    // Wrap the plan in TracedExec so the current tracing span stays attached
    // while the plan runs and its stream is polled.
    let plan = Arc::new(TracedExec::new(plan, Span::current()));

    let schema = plan.schema();
    let analyze = Arc::new(AnalyzeExec::new(true, true, plan, schema));

    let session_ctx = get_session_context(&options);
    assert_eq!(analyze.properties().partitioning.partition_count(), 1);
    let mut stream = analyze
        .execute(0, get_task_context(&session_ctx, &options))
        .map_err(|err| {
            Error::io(
                format!("Failed to execute analyze plan: {}", err),
                location!(),
            )
        })?;

    // Fully drain the stream so AnalyzeExec can collect metrics from a
    // complete run.
    while (stream.next().await).is_some() {}

    let display = DisplayableExecutionPlan::with_metrics(analyze.as_ref());
    Ok(format!("{}", display.indent(true)))
}

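/// Extension methods for DataFusion's SessionContext
///
/// A usage sketch (assumes `stream` is a SendableRecordBatchStream):
///
/// ```ignore
/// let df = session_ctx.read_one_shot(stream)?;
/// let results = df.collect().await?;
/// ```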
pub trait SessionContextExt {
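    /// Creates a DataFrame from a stream of data
    ///
    /// The DataFrame can only be consumed once; executing it a second time
    /// panics because the underlying stream has already been taken.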
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame>;
}

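/// A PartitionStream that hands out a pre-existing stream the first time it
/// is executed and panics if executed again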
struct OneShotPartitionStream {
    data: Arc<Mutex<Option<SendableRecordBatchStream>>>,
    schema: Arc<ArrowSchema>,
}

impl std::fmt::Debug for OneShotPartitionStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let data = self.data.lock().unwrap();
        f.debug_struct("OneShotPartitionStream")
            .field("exhausted", &data.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl OneShotPartitionStream {
    fn new(data: SendableRecordBatchStream) -> Self {
        let schema = data.schema();
        Self {
            data: Arc::new(Mutex::new(Some(data))),
            schema,
        }
    }
}

impl PartitionStream for OneShotPartitionStream {
    fn schema(&self) -> &arrow_schema::SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut stream = self.data.lock().unwrap();
        stream
            .take()
            .expect("Attempt to consume a one shot dataframe multiple times")
    }
}

impl SessionContextExt for SessionContext {
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame> {
        let schema = data.schema();
        let part_stream = Arc::new(OneShotPartitionStream::new(data));
        let provider = StreamingTable::try_new(schema, vec![part_stream])?;
        self.read_table(Arc::new(provider))
    }
}

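/// An execution node that re-chunks its input so every emitted batch (except
/// possibly the last) contains exactly `batch_size` rows
///
/// A usage sketch (assumes `input` is some upstream plan):
///
/// ```ignore
/// let exact = Arc::new(StrictBatchSizeExec::new(input, 1024));
/// let stream = execute_plan(exact, LanceExecutionOptions::default())?;
/// ```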
#[derive(Clone, Debug)]
pub struct StrictBatchSizeExec {
    input: Arc<dyn ExecutionPlan>,
    batch_size: usize,
}

impl StrictBatchSizeExec {
    pub fn new(input: Arc<dyn ExecutionPlan>, batch_size: usize) -> Self {
        Self { input, batch_size }
    }
}

impl DisplayAs for StrictBatchSizeExec {
    fn fmt_as(
        &self,
        _t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        write!(f, "StrictBatchSizeExec")
    }
}

impl ExecutionPlan for StrictBatchSizeExec {
    fn name(&self) -> &str {
        "StrictBatchSizeExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        self.input.properties()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(Self {
            input: children[0].clone(),
            batch_size: self.batch_size,
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        let stream = self.input.execute(partition, context)?;
        let schema = stream.schema();
        let stream = StrictBatchSizeStream::new(stream, self.batch_size);
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn partition_statistics(
        &self,
        partition: Option<usize>,
    ) -> datafusion_common::Result<Statistics> {
        self.input.partition_statistics(partition)
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        CardinalityEffect::Equal
    }
}