use std::{
    collections::HashMap,
    sync::{Arc, LazyLock, Mutex},
};

use arrow_array::RecordBatch;
use arrow_schema::Schema as ArrowSchema;
use datafusion::{
    catalog::streaming::StreamingTable,
    dataframe::DataFrame,
    execution::{
        context::{SessionConfig, SessionContext},
        disk_manager::DiskManagerBuilder,
        memory_pool::FairSpillPool,
        runtime_env::RuntimeEnvBuilder,
        TaskContext,
    },
    physical_plan::{
        analyze::AnalyzeExec,
        display::DisplayableExecutionPlan,
        execution_plan::{Boundedness, CardinalityEffect, EmissionType},
        stream::RecordBatchStreamAdapter,
        streaming::PartitionStream,
        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
    },
};
use datafusion_common::{DataFusionError, Statistics};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

use futures::{stream, StreamExt};
use lance_arrow::SchemaExt;
use lance_core::{
    utils::{
        futures::FinallyStreamExt,
        tracing::{StreamTracingExt, EXECUTION_PLAN_RUN, TRACE_EXECUTION},
    },
    Error, Result,
};
use log::{debug, info, warn};
use snafu::location;
use tracing::Span;

use crate::{
    chunker::StrictBatchSizeStream,
    utils::{
        MetricsExt, BYTES_READ_METRIC, INDEX_COMPARISONS_METRIC, INDICES_LOADED_METRIC,
        IOPS_METRIC, PARTS_LOADED_METRIC, REQUESTS_METRIC,
    },
};

/// A source execution node created from an existing stream
///
/// It can only be executed once; after the stream has been taken, the node is
/// exhausted. The stream should be finite, since the node reports itself as
/// bounded.
pub struct OneShotExec {
    stream: Mutex<Option<SendableRecordBatchStream>>,
    // We save off a copy of the schema to speed up formatting and so
    // ExecutionPlan implementations can use it
    schema: Arc<ArrowSchema>,
    properties: PlanProperties,
}

impl OneShotExec {
    /// Create a new instance from a given stream
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        let schema = stream.schema();
        Self {
            stream: Mutex::new(Some(stream)),
            schema: schema.clone(),
            properties: PlanProperties::new(
                EquivalenceProperties::new(schema),
                Partitioning::RoundRobinBatch(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
        }
    }

    /// Create a new instance from a single batch
    pub fn from_batch(batch: RecordBatch) -> Self {
        let schema = batch.schema();
        let stream = Box::pin(RecordBatchStreamAdapter::new(
            schema,
            stream::iter(vec![Ok(batch)]),
        ));
        Self::new(stream)
    }
}

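// Illustrative sketch (not part of this module's API): a `OneShotExec`
// typically wraps data that already lives in memory so it can act as a leaf
// of a physical plan. Assuming `batch` is a pre-built `RecordBatch`:
//
//     let plan: Arc<dyn ExecutionPlan> = Arc::new(OneShotExec::from_batch(batch));
//     let stream = execute_plan(plan, LanceExecutionOptions::default())?;
//
// Executing the same `OneShotExec` twice returns an error, since the stream
// is taken on the first execution.
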
impl std::fmt::Debug for OneShotExec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        f.debug_struct("OneShotExec")
            .field("exhausted", &stream.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl DisplayAs for OneShotExec {
    fn fmt_as(
        &self,
        t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        let exhausted = if stream.is_some() { "" } else { "EXHAUSTED " };
        let columns = self
            .schema
            .field_names()
            .iter()
            .map(|s| s.to_string())
            .collect::<Vec<_>>();
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "OneShotStream: {}columns=[{}]",
                    exhausted,
                    columns.join(",")
                )
            }
            DisplayFormatType::TreeRender => {
                write!(
                    f,
                    "OneShotStream\nexhausted={}\ncolumns=[{}]",
                    stream.is_none(),
                    columns.join(",")
                )
            }
        }
    }
}

impl ExecutionPlan for OneShotExec {
    fn name(&self) -> &str {
        "OneShotExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow_schema::SchemaRef {
        self.schema.clone()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        // This is a leaf node with no children, so plan rewrites can return it
        // unchanged.
        Ok(self)
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<datafusion::execution::TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        let stream = self
            .stream
            .lock()
            .map_err(|err| DataFusionError::Execution(err.to_string()))?
            .take();
        if let Some(stream) = stream {
            Ok(stream)
        } else {
            Err(DataFusionError::Execution(
                "OneShotExec has already been executed".to_string(),
            ))
        }
    }

    fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
        Ok(Statistics::new_unknown(&self.schema))
    }

    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
        &self.properties
    }
}

/// Wraps an execution plan so that its execution (and the stream it returns)
/// runs inside the given tracing span
struct TracedExec {
    input: Arc<dyn ExecutionPlan>,
    properties: PlanProperties,
    span: Span,
}

impl TracedExec {
    pub fn new(input: Arc<dyn ExecutionPlan>, span: Span) -> Self {
        Self {
            properties: input.properties().clone(),
            input,
            span,
        }
    }
}

impl DisplayAs for TracedExec {
    fn fmt_as(
        &self,
        t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => {
                write!(f, "TracedExec")
            }
        }
    }
}

impl std::fmt::Debug for TracedExec {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "TracedExec")
    }
}

impl ExecutionPlan for TracedExec {
    fn name(&self) -> &str {
        "TracedExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(Self {
            input: children[0].clone(),
            properties: self.properties.clone(),
            span: self.span.clone(),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        // Enter the span for the synchronous part of execution and propagate
        // it into the returned stream
        let _guard = self.span.enter();
        let stream = self.input.execute(partition, context)?;
        let schema = stream.schema();
        let stream = stream.stream_in_span(self.span.clone());
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }
}

/// Callback invoked with the final summary counts after a plan finishes executing
pub type ExecutionStatsCallback = Arc<dyn Fn(&ExecutionSummaryCounts) + Send + Sync>;

#[derive(Default, Clone)]
pub struct LanceExecutionOptions {
    /// If true, operators are allowed to spill to disk under memory pressure
    pub use_spilling: bool,
    /// Memory pool size in bytes; if unset, falls back to the
    /// LANCE_MEM_POOL_SIZE environment variable, then to a 100MB default
    pub mem_pool_size: Option<u64>,
    /// Batch size applied to the session's execution config
    pub batch_size: Option<usize>,
    /// Target number of partitions for the session
    pub target_partition: Option<usize>,
    /// Invoked with the summary counts when an executed plan completes
    pub execution_stats_callback: Option<ExecutionStatsCallback>,
}

impl std::fmt::Debug for LanceExecutionOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LanceExecutionOptions")
            .field("use_spilling", &self.use_spilling)
            .field("mem_pool_size", &self.mem_pool_size)
            .field("batch_size", &self.batch_size)
            .field("target_partition", &self.target_partition)
            .field(
                "execution_stats_callback",
                &self.execution_stats_callback.is_some(),
            )
            .finish()
    }
}

const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 * 1024 * 1024;

impl LanceExecutionOptions {
    /// The memory pool size, taken from the option, the LANCE_MEM_POOL_SIZE
    /// environment variable, or the 100MB default (in that order)
    pub fn mem_pool_size(&self) -> u64 {
        self.mem_pool_size.unwrap_or_else(|| {
            std::env::var("LANCE_MEM_POOL_SIZE")
                .map(|s| match s.parse::<u64>() {
                    Ok(v) => v,
                    Err(e) => {
                        warn!("Failed to parse LANCE_MEM_POOL_SIZE: {}, using default", e);
                        DEFAULT_LANCE_MEM_POOL_SIZE
                    }
                })
                .unwrap_or(DEFAULT_LANCE_MEM_POOL_SIZE)
        })
    }

    /// Whether spilling should actually be used
    ///
    /// Setting the LANCE_BYPASS_SPILLING environment variable disables
    /// spilling even when `use_spilling` is true
    pub fn use_spilling(&self) -> bool {
        if !self.use_spilling {
            return false;
        }
        std::env::var("LANCE_BYPASS_SPILLING")
            .map(|_| {
                info!("Bypassing spilling because LANCE_BYPASS_SPILLING is set");
                false
            })
            .unwrap_or(true)
    }
}

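// Illustrative sketch: a spilling-enabled configuration with a 1 GiB memory
// pool (the pool size and the callback body are arbitrary example values):
//
//     let options = LanceExecutionOptions {
//         use_spilling: true,
//         mem_pool_size: Some(1024 * 1024 * 1024),
//         execution_stats_callback: Some(Arc::new(|counts| {
//             log::info!("iops={} bytes_read={}", counts.iops, counts.bytes_read);
//         })),
//         ..Default::default()
//     };
//
// Note that LANCE_BYPASS_SPILLING in the environment still overrides
// `use_spilling` at execution time.
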
/// Create a new `SessionContext` configured according to `options`
pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext {
    let mut session_config = SessionConfig::new();
    let mut runtime_env_builder = RuntimeEnvBuilder::new();
    if let Some(target_partition) = options.target_partition {
        session_config = session_config.with_target_partitions(target_partition);
    }
    if options.use_spilling() {
        runtime_env_builder = runtime_env_builder
            .with_disk_manager_builder(DiskManagerBuilder::default())
            .with_memory_pool(Arc::new(FairSpillPool::new(
                options.mem_pool_size() as usize
            )));
    }
    let runtime_env = runtime_env_builder.build_arc().unwrap();
    SessionContext::new_with_config_rt(session_config, runtime_env)
}

static DEFAULT_SESSION_CONTEXT: LazyLock<SessionContext> =
    LazyLock::new(|| new_session_context(&LanceExecutionOptions::default()));

static DEFAULT_SESSION_CONTEXT_WITH_SPILLING: LazyLock<SessionContext> = LazyLock::new(|| {
    new_session_context(&LanceExecutionOptions {
        use_spilling: true,
        ..Default::default()
    })
});

/// Returns a session context, reusing a cached default context when the
/// options match the defaults and building a fresh one otherwise
pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext {
    if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE && options.target_partition.is_none()
    {
        return if options.use_spilling() {
            DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone()
        } else {
            DEFAULT_SESSION_CONTEXT.clone()
        };
    }
    new_session_context(options)
}

fn get_task_context(
    session_ctx: &SessionContext,
    options: &LanceExecutionOptions,
) -> Arc<TaskContext> {
    let mut state = session_ctx.state();
    if let Some(batch_size) = options.batch_size.as_ref() {
        state.config_mut().options_mut().execution.batch_size = *batch_size;
    }

    state.task_ctx()
}

/// Summary statistics for an executed plan, aggregated across all plan nodes
#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct ExecutionSummaryCounts {
    /// Number of I/O operations performed
    pub iops: usize,
    /// Number of requests made (a request may consist of multiple iops)
    pub requests: usize,
    /// Number of bytes read
    pub bytes_read: usize,
    /// Number of indices loaded
    pub indices_loaded: usize,
    /// Number of index partitions loaded
    pub parts_loaded: usize,
    /// Number of comparisons performed against index entries
    pub index_comparisons: usize,
    /// All other counters, keyed by metric name
    pub all_counts: HashMap<String, usize>,
}

/// Recursively accumulate the metric counters of `node` and all of its children
fn visit_node(node: &dyn ExecutionPlan, counts: &mut ExecutionSummaryCounts) {
    if let Some(metrics) = node.metrics() {
        for (metric_name, count) in metrics.iter_counts() {
            match metric_name.as_ref() {
                IOPS_METRIC => counts.iops += count.value(),
                REQUESTS_METRIC => counts.requests += count.value(),
                BYTES_READ_METRIC => counts.bytes_read += count.value(),
                INDICES_LOADED_METRIC => counts.indices_loaded += count.value(),
                PARTS_LOADED_METRIC => counts.parts_loaded += count.value(),
                INDEX_COMPARISONS_METRIC => counts.index_comparisons += count.value(),
                _ => {
                    let existing = counts
                        .all_counts
                        .entry(metric_name.as_ref().to_string())
                        .or_insert(0);
                    *existing += count.value();
                }
            }
        }
    }
    for child in node.children() {
        visit_node(child.as_ref(), counts);
    }
}

fn report_plan_summary_metrics(plan: &dyn ExecutionPlan, options: &LanceExecutionOptions) {
    let output_rows = plan
        .metrics()
        .map(|m| m.output_rows().unwrap_or(0))
        .unwrap_or(0);
    let mut counts = ExecutionSummaryCounts::default();
    visit_node(plan, &mut counts);
    tracing::info!(
        target: TRACE_EXECUTION,
        r#type = EXECUTION_PLAN_RUN,
        output_rows,
        iops = counts.iops,
        requests = counts.requests,
        bytes_read = counts.bytes_read,
        indices_loaded = counts.indices_loaded,
        parts_loaded = counts.parts_loaded,
        index_comparisons = counts.index_comparisons,
    );
    if let Some(callback) = options.execution_stats_callback.as_ref() {
        callback(&counts);
    }
}

/// Executes a plan using the default session & runtime configuration
///
/// Only a single partition is executed; the plan must have exactly one
/// partition.
pub fn execute_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<SendableRecordBatchStream> {
    debug!(
        "Executing plan:\n{}",
        DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
    );

    let session_ctx = get_session_context(&options);

    // NOTE: we only execute the first partition here. If the plan had more
    // than one partition, we would be missing data.
    assert_eq!(plan.properties().partitioning.partition_count(), 1);
    let stream = plan.execute(0, get_task_context(&session_ctx, &options))?;

    let schema = stream.schema();
    // Report summary metrics (and invoke the stats callback, if any) once the
    // stream has finished
    let stream = stream.finally(move || {
        report_plan_summary_metrics(plan.as_ref(), &options);
    });
    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}

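// Illustrative sketch: draining the stream returned by `execute_plan`.
// Assumes an async runtime (e.g. tokio) and an enclosing function that
// returns `Result`; the summary metrics are reported once the stream is
// fully consumed.
//
//     let mut stream = execute_plan(plan, LanceExecutionOptions::default())?;
//     while let Some(batch) = stream.next().await {
//         println!("{} rows", batch?.num_rows());
//     }
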
/// Executes the plan under an `AnalyzeExec` node and returns the plan
/// rendered as a string with runtime metrics attached to each node
pub async fn analyze_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<String> {
    // Propagate the current tracing span into the executed stream
    let plan = Arc::new(TracedExec::new(plan, Span::current()));

    let schema = plan.schema();
    let analyze = Arc::new(AnalyzeExec::new(true, true, plan, schema));

    let session_ctx = get_session_context(&options);
    assert_eq!(analyze.properties().partitioning.partition_count(), 1);
    let mut stream = analyze
        .execute(0, get_task_context(&session_ctx, &options))
        .map_err(|err| {
            Error::io(
                format!("Failed to execute analyze plan: {}", err),
                location!(),
            )
        })?;

    // Fully drain the stream so that all metrics are populated
    while stream.next().await.is_some() {}

    let display = DisplayableExecutionPlan::with_metrics(analyze.as_ref());
    Ok(format!("{}", display.indent(true)))
}

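// Illustrative sketch: obtaining a metrics-annotated plan description.
//
//     let report = analyze_plan(plan, LanceExecutionOptions::default()).await?;
//     println!("{report}");
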
pub trait SessionContextExt {
    /// Creates a DataFrame for reading a stream of data
    ///
    /// This dataframe may only be queried once; future queries will fail
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame>;
}

/// A [`PartitionStream`] that hands out its underlying stream exactly once
struct OneShotPartitionStream {
    data: Arc<Mutex<Option<SendableRecordBatchStream>>>,
    schema: Arc<ArrowSchema>,
}

impl std::fmt::Debug for OneShotPartitionStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let data = self.data.lock().unwrap();
        f.debug_struct("OneShotPartitionStream")
            .field("exhausted", &data.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl OneShotPartitionStream {
    fn new(data: SendableRecordBatchStream) -> Self {
        let schema = data.schema();
        Self {
            data: Arc::new(Mutex::new(Some(data))),
            schema,
        }
    }
}

impl PartitionStream for OneShotPartitionStream {
    fn schema(&self) -> &arrow_schema::SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut stream = self.data.lock().unwrap();
        stream
            .take()
            .expect("Attempt to consume a one shot dataframe multiple times")
    }
}

impl SessionContextExt for SessionContext {
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame> {
        let schema = data.schema();
        let part_stream = Arc::new(OneShotPartitionStream::new(data));
        let provider = StreamingTable::try_new(schema, vec![part_stream])?;
        self.read_table(Arc::new(provider))
    }
}

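// Illustrative sketch: querying a stream through the DataFrame API. Assumes
// `datafusion::prelude::{col, lit}` are in scope and that the stream has a
// `score` column (an arbitrary example name). The resulting DataFrame can
// only be executed once.
//
//     let ctx = get_session_context(&LanceExecutionOptions::default());
//     let df = ctx.read_one_shot(stream)?;
//     let batches = df.filter(col("score").gt(lit(0.5)))?.collect().await?;
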
/// Wraps an execution plan and re-chunks its output with
/// [`StrictBatchSizeStream`] so that every emitted batch (except possibly the
/// last) has exactly `batch_size` rows
#[derive(Clone, Debug)]
pub struct StrictBatchSizeExec {
    input: Arc<dyn ExecutionPlan>,
    batch_size: usize,
}

impl StrictBatchSizeExec {
    pub fn new(input: Arc<dyn ExecutionPlan>, batch_size: usize) -> Self {
        Self { input, batch_size }
    }
}

impl DisplayAs for StrictBatchSizeExec {
    fn fmt_as(
        &self,
        _t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        write!(f, "StrictBatchSizeExec")
    }
}

impl ExecutionPlan for StrictBatchSizeExec {
    fn name(&self) -> &str {
        "StrictBatchSizeExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        self.input.properties()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(Self {
            input: children[0].clone(),
            batch_size: self.batch_size,
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        let stream = self.input.execute(partition, context)?;
        let schema = stream.schema();
        let stream = StrictBatchSizeStream::new(stream, self.batch_size);
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true]
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn partition_statistics(
        &self,
        partition: Option<usize>,
    ) -> datafusion_common::Result<Statistics> {
        self.input.partition_statistics(partition)
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        // Re-chunking changes batch boundaries but not the total row count
        CardinalityEffect::Equal
    }
}
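
// Illustrative sketch: forcing fixed-size batches on top of an existing plan,
// e.g. for consumers that require uniform input sizes (1024 is an arbitrary
// example value):
//
//     let sized = Arc::new(StrictBatchSizeExec::new(plan, 1024));
//     let stream = execute_plan(sized, LanceExecutionOptions::default())?;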