use std::{
    collections::HashMap,
    sync::{Arc, LazyLock, Mutex},
};

use arrow_array::RecordBatch;
use arrow_schema::Schema as ArrowSchema;
use datafusion::{
    catalog::streaming::StreamingTable,
    dataframe::DataFrame,
    execution::{
        context::{SessionConfig, SessionContext},
        disk_manager::DiskManagerBuilder,
        memory_pool::FairSpillPool,
        runtime_env::RuntimeEnvBuilder,
        TaskContext,
    },
    physical_plan::{
        analyze::AnalyzeExec,
        display::DisplayableExecutionPlan,
        execution_plan::{Boundedness, EmissionType},
        stream::RecordBatchStreamAdapter,
        streaming::PartitionStream,
        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
    },
};
use datafusion_common::{DataFusionError, Statistics};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};

use futures::{stream, StreamExt};
use lance_arrow::SchemaExt;
use lance_core::{
    utils::{
        futures::FinallyStreamExt,
        tracing::{EXECUTION_PLAN_RUN, TRACE_EXECUTION},
    },
    Error, Result,
};
use log::{debug, info, warn};
use snafu::location;

use crate::utils::{
    MetricsExt, BYTES_READ_METRIC, INDEX_COMPARISONS_METRIC, INDICES_LOADED_METRIC, IOPS_METRIC,
    PARTS_LOADED_METRIC, REQUESTS_METRIC,
};

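/// An execution node that wraps an existing stream of record batches
///
/// The stream is taken the first time `execute` is called; any later call
/// returns an error, so the node can be executed at most once.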
pub struct OneShotExec {
    stream: Mutex<Option<SendableRecordBatchStream>>,
    schema: Arc<ArrowSchema>,
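    // Cached properties: a one-shot node reports itself as a single bounded
    // partition that emits batches incrementally (see `new` below)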
    properties: PlanProperties,
}

impl OneShotExec {
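    /// Create a node from an existing stream of record batches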
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        let schema = stream.schema();
        Self {
            stream: Mutex::new(Some(stream)),
            schema: schema.clone(),
            properties: PlanProperties::new(
                EquivalenceProperties::new(schema),
                Partitioning::RoundRobinBatch(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
        }
    }

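    /// Create a node that emits a single batch and then finishes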
    pub fn from_batch(batch: RecordBatch) -> Self {
        let schema = batch.schema();
        let stream = Box::pin(RecordBatchStreamAdapter::new(
            schema,
            stream::iter(vec![Ok(batch)]),
        ));
        Self::new(stream)
    }
}

93impl std::fmt::Debug for OneShotExec {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 let stream = self.stream.lock().unwrap();
96 f.debug_struct("OneShotExec")
97 .field("exhausted", &stream.is_none())
98 .field("schema", self.schema.as_ref())
99 .finish()
100 }
101}
102
impl DisplayAs for OneShotExec {
    fn fmt_as(
        &self,
        t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        // The trailing space matters: the label is glued directly onto
        // "columns=" in the default rendering below
        let exhausted = if stream.is_some() { "" } else { "EXHAUSTED " };
        let columns = self
            .schema
            .field_names()
            .iter()
            .map(|s| s.to_string())
            .collect::<Vec<_>>();
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "OneShotStream: {}columns=[{}]",
                    exhausted,
                    columns.join(",")
                )
            }
            DisplayFormatType::TreeRender => {
                write!(
                    f,
                    "OneShotStream\nexhausted={}\ncolumns=[{}]",
                    exhausted,
                    columns.join(",")
                )
            }
        }
    }
}

impl ExecutionPlan for OneShotExec {
    fn name(&self) -> &str {
        "OneShotExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow_schema::SchemaRef {
        self.schema.clone()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        // OneShotExec is a leaf node, so plan rewrites have no children to
        // install here; returning self avoids panicking mid-optimization
        Ok(self)
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<datafusion::execution::TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        let stream = self
            .stream
            .lock()
            .map_err(|err| DataFusionError::Execution(err.to_string()))?
            .take();
        if let Some(stream) = stream {
            Ok(stream)
        } else {
            Err(DataFusionError::Execution(
                "OneShotExec has already been executed".to_string(),
            ))
        }
    }

    fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
        Ok(Statistics::new_unknown(&self.schema))
    }

    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
        &self.properties
    }
}

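/// Callback invoked after a plan finishes executing, with the summary counts
/// gathered from the plan's metrics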
pub type ExecutionStatsCallback = Arc<dyn Fn(&ExecutionSummaryCounts) + Send + Sync>;

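/// Options controlling how Lance executes a DataFusion plan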
#[derive(Default, Clone)]
pub struct LanceExecutionOptions {
    /// Enable spilling to disk via a fair memory pool (can be bypassed by
    /// setting LANCE_BYPASS_SPILLING)
    pub use_spilling: bool,
    /// Memory pool size in bytes; falls back to LANCE_MEM_POOL_SIZE or 100MB
    pub mem_pool_size: Option<u64>,
    /// Overrides DataFusion's execution batch size when set
    pub batch_size: Option<usize>,
    /// Overrides DataFusion's target partition count when set
    pub target_partition: Option<usize>,
    /// Called with the summary counts once an executed plan completes
    pub execution_stats_callback: Option<ExecutionStatsCallback>,
}

impl std::fmt::Debug for LanceExecutionOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LanceExecutionOptions")
            .field("use_spilling", &self.use_spilling)
            .field("mem_pool_size", &self.mem_pool_size)
            .field("batch_size", &self.batch_size)
            .field("target_partition", &self.target_partition)
            .field(
                "execution_stats_callback",
                &self.execution_stats_callback.is_some(),
            )
            .finish()
    }
}

const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 * 1024 * 1024;

impl LanceExecutionOptions {
    pub fn mem_pool_size(&self) -> u64 {
        self.mem_pool_size.unwrap_or_else(|| {
            std::env::var("LANCE_MEM_POOL_SIZE")
                .map(|s| match s.parse::<u64>() {
                    Ok(v) => v,
                    Err(e) => {
                        warn!("Failed to parse LANCE_MEM_POOL_SIZE: {}, using default", e);
                        DEFAULT_LANCE_MEM_POOL_SIZE
                    }
                })
                .unwrap_or(DEFAULT_LANCE_MEM_POOL_SIZE)
        })
    }

    pub fn use_spilling(&self) -> bool {
        if !self.use_spilling {
            return false;
        }
        std::env::var("LANCE_BYPASS_SPILLING")
            .map(|_| {
                info!("Bypassing spilling because LANCE_BYPASS_SPILLING is set");
                false
            })
            .unwrap_or(true)
    }
}

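/// Build a fresh SessionContext honoring the given execution options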
pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext {
    let mut session_config = SessionConfig::new();
    let mut runtime_env_builder = RuntimeEnvBuilder::new();
    if let Some(target_partition) = options.target_partition {
        session_config = session_config.with_target_partitions(target_partition);
    }
    if options.use_spilling() {
        runtime_env_builder = runtime_env_builder
            .with_disk_manager_builder(DiskManagerBuilder::default())
            .with_memory_pool(Arc::new(FairSpillPool::new(
                options.mem_pool_size() as usize
            )));
    }
    let runtime_env = runtime_env_builder.build_arc().unwrap();
    SessionContext::new_with_config_rt(session_config, runtime_env)
}

static DEFAULT_SESSION_CONTEXT: LazyLock<SessionContext> =
    LazyLock::new(|| new_session_context(&LanceExecutionOptions::default()));

static DEFAULT_SESSION_CONTEXT_WITH_SPILLING: LazyLock<SessionContext> = LazyLock::new(|| {
    new_session_context(&LanceExecutionOptions {
        use_spilling: true,
        ..Default::default()
    })
});

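/// Return a session context for the given options, reusing a cached default
/// context whenever the options allow it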
pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext {
    if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE && options.target_partition.is_none()
    {
        return if options.use_spilling() {
            DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone()
        } else {
            DEFAULT_SESSION_CONTEXT.clone()
        };
    }
    new_session_context(options)
}

fn get_task_context(
    session_ctx: &SessionContext,
    options: &LanceExecutionOptions,
) -> Arc<TaskContext> {
    let mut state = session_ctx.state();
    if let Some(batch_size) = options.batch_size.as_ref() {
        state.config_mut().options_mut().execution.batch_size = *batch_size;
    }

    state.task_ctx()
}

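/// Summary counts of the metrics collected from an executed plan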
#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct ExecutionSummaryCounts {
    /// The number of I/O operations performed
    pub iops: usize,
    /// The number of I/O requests made
    pub requests: usize,
    /// The number of bytes read
    pub bytes_read: usize,
    /// The number of indices loaded
    pub indices_loaded: usize,
    /// The number of index partitions loaded
    pub parts_loaded: usize,
    /// The number of comparisons performed while searching indices
    pub index_comparisons: usize,
    /// Any other counter metrics found in the plan, keyed by metric name
    pub all_counts: HashMap<String, usize>,
}

fn visit_node(node: &dyn ExecutionPlan, counts: &mut ExecutionSummaryCounts) {
    if let Some(metrics) = node.metrics() {
        for (metric_name, count) in metrics.iter_counts() {
            match metric_name.as_ref() {
                IOPS_METRIC => counts.iops += count.value(),
                REQUESTS_METRIC => counts.requests += count.value(),
                BYTES_READ_METRIC => counts.bytes_read += count.value(),
                INDICES_LOADED_METRIC => counts.indices_loaded += count.value(),
                PARTS_LOADED_METRIC => counts.parts_loaded += count.value(),
                INDEX_COMPARISONS_METRIC => counts.index_comparisons += count.value(),
                _ => {
                    let existing = counts
                        .all_counts
                        .entry(metric_name.as_ref().to_string())
                        .or_insert(0);
                    *existing += count.value();
                }
            }
        }
    }
    for child in node.children() {
        visit_node(child.as_ref(), counts);
    }
}

fn report_plan_summary_metrics(plan: &dyn ExecutionPlan, options: &LanceExecutionOptions) {
    let output_rows = plan
        .metrics()
        .map(|m| m.output_rows().unwrap_or(0))
        .unwrap_or(0);
    let mut counts = ExecutionSummaryCounts::default();
    visit_node(plan, &mut counts);
    tracing::info!(
        target: TRACE_EXECUTION,
        r#type = EXECUTION_PLAN_RUN,
        output_rows,
        iops = counts.iops,
        requests = counts.requests,
        bytes_read = counts.bytes_read,
        indices_loaded = counts.indices_loaded,
        parts_loaded = counts.parts_loaded,
        index_comparisons = counts.index_comparisons,
    );
    if let Some(callback) = options.execution_stats_callback.as_ref() {
        callback(&counts);
    }
}

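/// Execute a DataFusion plan on Lance's session context, returning its output
/// as a stream of record batches
///
/// Only single-partition plans are supported. Summary metrics are traced and
/// reported to the options' callback once the stream is fully consumed.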
pub fn execute_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<SendableRecordBatchStream> {
    debug!(
        "Executing plan:\n{}",
        DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
    );

    let session_ctx = get_session_context(&options);

    assert_eq!(plan.properties().partitioning.partition_count(), 1);
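    // We only execute partition 0; the assert above guarantees it covers the
    // plan's entire output.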
    let stream = plan.execute(0, get_task_context(&session_ctx, &options))?;

    let schema = stream.schema();
    let stream = stream.finally(move || {
        report_plan_summary_metrics(plan.as_ref(), &options);
    });
    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}

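/// Run a plan to completion under an AnalyzeExec wrapper and return the
/// annotated plan, including the collected metrics, as a display string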
pub async fn analyze_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<String> {
    let schema = plan.schema();
    let analyze = Arc::new(AnalyzeExec::new(true, true, plan, schema));

    let session_ctx = get_session_context(&options);
    assert_eq!(analyze.properties().partitioning.partition_count(), 1);
    let mut stream = analyze
        .execute(0, get_task_context(&session_ctx, &options))
        .map_err(|err| {
            Error::io(
                format!("Failed to execute analyze plan: {}", err),
                location!(),
            )
        })?;

    // Drain the stream so the plan runs to completion and metrics are populated
    while (stream.next().await).is_some() {}

    let display = DisplayableExecutionPlan::with_metrics(analyze.as_ref());
    Ok(format!("{}", display.indent(true)))
}

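/// Extension trait adding Lance helpers to DataFusion's SessionContext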
pub trait SessionContextExt {
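    /// Create a DataFrame from a stream of record batches
    ///
    /// The stream is wrapped in a one-shot partition, so the resulting
    /// DataFrame can only be consumed once.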
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame>;
}

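/// A PartitionStream that hands out its wrapped stream exactly once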
struct OneShotPartitionStream {
    data: Arc<Mutex<Option<SendableRecordBatchStream>>>,
    schema: Arc<ArrowSchema>,
}

impl std::fmt::Debug for OneShotPartitionStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let data = self.data.lock().unwrap();
        f.debug_struct("OneShotPartitionStream")
            .field("exhausted", &data.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl OneShotPartitionStream {
    fn new(data: SendableRecordBatchStream) -> Self {
        let schema = data.schema();
        Self {
            data: Arc::new(Mutex::new(Some(data))),
            schema,
        }
    }
}

impl PartitionStream for OneShotPartitionStream {
    fn schema(&self) -> &arrow_schema::SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut stream = self.data.lock().unwrap();
        stream
            .take()
            .expect("Attempt to consume a one shot dataframe multiple times")
    }
}

impl SessionContextExt for SessionContext {
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame> {
        let schema = data.schema();
        let part_stream = Arc::new(OneShotPartitionStream::new(data));
        let provider = StreamingTable::try_new(schema, vec![part_stream])?;
        self.read_table(Arc::new(provider))
    }
}