use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};

use arrow_array::RecordBatch;
use arrow_schema::Schema as ArrowSchema;
use datafusion::{
    catalog::streaming::StreamingTable,
    dataframe::DataFrame,
    execution::{
        context::{SessionConfig, SessionContext},
        disk_manager::DiskManagerConfig,
        memory_pool::FairSpillPool,
        runtime_env::RuntimeEnvBuilder,
        TaskContext,
    },
    physical_plan::{
        analyze::AnalyzeExec,
        display::DisplayableExecutionPlan,
        execution_plan::{Boundedness, EmissionType},
        stream::RecordBatchStreamAdapter,
        streaming::PartitionStream,
        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
    },
};
use datafusion_common::{DataFusionError, Statistics};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use lazy_static::lazy_static;

use futures::{stream, StreamExt};
use lance_arrow::SchemaExt;
use lance_core::{
    utils::{
        futures::FinallyStreamExt,
        tracing::{EXECUTION_PLAN_RUN, TRACE_EXECUTION},
    },
    Error, Result,
};
use log::{debug, info, warn};
use snafu::location;

use crate::utils::{
    MetricsExt, BYTES_READ_METRIC, INDEX_COMPARISONS_METRIC, INDICES_LOADED_METRIC, IOPS_METRIC,
    PARTS_LOADED_METRIC, REQUESTS_METRIC,
};

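/// An execution node that emits a single, pre-computed stream of record
/// batches.
///
/// This is a convenient way to feed data that was produced outside of
/// DataFusion into a DataFusion plan.  The stream can only be consumed once;
/// executing the node a second time returns an error.
///
/// A minimal usage sketch (not from the original docs; assumes an
/// `arrow_array` dependency and error handling via `?`):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
///
/// let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
/// let batch = RecordBatch::try_from_iter(vec![("x", column)])?;
/// // `exec` can now be used as a leaf node of a physical plan.
/// let exec = OneShotExec::from_batch(batch);
/// ```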
pub struct OneShotExec {
    stream: Mutex<Option<SendableRecordBatchStream>>,
    /// A copy of the schema is saved so that `schema()` and `fmt_as` keep
    /// working after the stream has been consumed.
    schema: Arc<ArrowSchema>,
    properties: PlanProperties,
}

impl OneShotExec {
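    /// Wrap an existing record batch stream in a `OneShotExec`.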
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        let schema = stream.schema();
        Self {
            stream: Mutex::new(Some(stream)),
            schema: schema.clone(),
            properties: PlanProperties::new(
                EquivalenceProperties::new(schema),
                Partitioning::RoundRobinBatch(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
        }
    }

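    /// Create a `OneShotExec` that emits a single, already materialized batch.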
    pub fn from_batch(batch: RecordBatch) -> Self {
        let schema = batch.schema();
        let stream = Box::pin(RecordBatchStreamAdapter::new(
            schema,
            stream::iter(vec![Ok(batch)]),
        ));
        Self::new(stream)
    }
}

impl std::fmt::Debug for OneShotExec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        f.debug_struct("OneShotExec")
            .field("exhausted", &stream.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl DisplayAs for OneShotExec {
    fn fmt_as(
        &self,
        t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        // Trailing space keeps the default rendering readable:
        // "OneShotStream: EXHAUSTED columns=[...]"
        let exhausted = if stream.is_some() { "" } else { "EXHAUSTED " };
        let columns = self
            .schema
            .field_names()
            .iter()
            .map(|s| s.to_string())
            .collect::<Vec<_>>();
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(
                    f,
                    "OneShotStream: {}columns=[{}]",
                    exhausted,
                    columns.join(",")
                )
            }
            DisplayFormatType::TreeRender => {
                write!(
                    f,
                    "OneShotStream\nexhausted={}\ncolumns=[{}]",
                    stream.is_none(),
                    columns.join(",")
                )
            }
        }
    }
}

impl ExecutionPlan for OneShotExec {
    fn name(&self) -> &str {
        "OneShotExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow_schema::SchemaRef {
        self.schema.clone()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        // OneShotExec is a leaf node; there are no children to replace.
        Ok(self)
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<datafusion::execution::TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        let stream = self
            .stream
            .lock()
            .map_err(|err| DataFusionError::Execution(err.to_string()))?
            .take();
        if let Some(stream) = stream {
            Ok(stream)
        } else {
            Err(DataFusionError::Execution(
                "OneShotExec has already been executed".to_string(),
            ))
        }
    }

    fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
        Ok(Statistics::new_unknown(&self.schema))
    }

    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
        &self.properties
    }
}

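/// A callback invoked with the final summary counts after a plan has finished
/// executing.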
pub type ExecutionStatsCallback = Arc<dyn Fn(&ExecutionSummaryCounts) + Send + Sync>;

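/// Options that control how Lance executes DataFusion plans.
///
/// A sketch of enabling spilling with a custom memory budget (the values are
/// illustrative, not defaults from the original source):
///
/// ```ignore
/// let options = LanceExecutionOptions {
///     use_spilling: true,
///     mem_pool_size: Some(512 * 1024 * 1024), // 512 MiB before spilling
///     ..Default::default()
/// };
/// let ctx = get_session_context(&options);
/// ```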
#[derive(Default, Clone)]
pub struct LanceExecutionOptions {
    /// If true, operators are allowed to spill to disk when memory is exhausted.
    pub use_spilling: bool,
    /// The size of the memory pool, in bytes, used when spilling is enabled.
    pub mem_pool_size: Option<u64>,
    /// Overrides the session's default batch size, if set.
    pub batch_size: Option<usize>,
    /// Overrides the session's default number of target partitions, if set.
    pub target_partition: Option<usize>,
    /// An optional callback invoked with summary metrics when a plan finishes.
    pub execution_stats_callback: Option<ExecutionStatsCallback>,
}

impl std::fmt::Debug for LanceExecutionOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LanceExecutionOptions")
            .field("use_spilling", &self.use_spilling)
            .field("mem_pool_size", &self.mem_pool_size)
            .field("batch_size", &self.batch_size)
            .field("target_partition", &self.target_partition)
            .field(
                "execution_stats_callback",
                &self.execution_stats_callback.is_some(),
            )
            .finish()
    }
}

/// The default memory pool size: 100 MiB.
const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 * 1024 * 1024;

impl LanceExecutionOptions {
    /// Returns the configured memory pool size, falling back to the
    /// `LANCE_MEM_POOL_SIZE` environment variable and then to the default.
    pub fn mem_pool_size(&self) -> u64 {
        self.mem_pool_size.unwrap_or_else(|| {
            std::env::var("LANCE_MEM_POOL_SIZE")
                .map(|s| match s.parse::<u64>() {
                    Ok(v) => v,
                    Err(e) => {
                        warn!("Failed to parse LANCE_MEM_POOL_SIZE: {}, using default", e);
                        DEFAULT_LANCE_MEM_POOL_SIZE
                    }
                })
                .unwrap_or(DEFAULT_LANCE_MEM_POOL_SIZE)
        })
    }

    /// Returns true if spilling should be used.
    ///
    /// Spilling must be requested via `use_spilling` and can be disabled
    /// globally by setting the `LANCE_BYPASS_SPILLING` environment variable.
    pub fn use_spilling(&self) -> bool {
        if !self.use_spilling {
            return false;
        }
        std::env::var("LANCE_BYPASS_SPILLING")
            .map(|_| {
                info!("Bypassing spilling because LANCE_BYPASS_SPILLING is set");
                false
            })
            .unwrap_or(true)
    }
}

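/// Create a new `SessionContext` configured according to `options`.
///
/// When spilling is enabled, the context is given a disk manager and a
/// `FairSpillPool` sized by `LanceExecutionOptions::mem_pool_size`.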
pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext {
    let mut session_config = SessionConfig::new();
    let mut runtime_env_builder = RuntimeEnvBuilder::new();
    if let Some(target_partition) = options.target_partition {
        session_config = session_config.with_target_partitions(target_partition);
    }
    if options.use_spilling() {
        runtime_env_builder = runtime_env_builder
            .with_disk_manager(DiskManagerConfig::new())
            .with_memory_pool(Arc::new(FairSpillPool::new(
                options.mem_pool_size() as usize
            )));
    }
    let runtime_env = runtime_env_builder.build_arc().unwrap();
    SessionContext::new_with_config_rt(session_config, runtime_env)
}

lazy_static! {
    static ref DEFAULT_SESSION_CONTEXT: SessionContext =
        new_session_context(&LanceExecutionOptions::default());
    static ref DEFAULT_SESSION_CONTEXT_WITH_SPILLING: SessionContext =
        new_session_context(&LanceExecutionOptions {
            use_spilling: true,
            ..Default::default()
        });
}

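/// Return a `SessionContext` for the given options, reusing a cached default
/// context when the options match the defaults.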
pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext {
    if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE && options.target_partition.is_none()
    {
        return if options.use_spilling() {
            DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone()
        } else {
            DEFAULT_SESSION_CONTEXT.clone()
        };
    }
    new_session_context(options)
}

fn get_task_context(
    session_ctx: &SessionContext,
    options: &LanceExecutionOptions,
) -> Arc<TaskContext> {
    let mut state = session_ctx.state();
    if let Some(batch_size) = options.batch_size.as_ref() {
        state.config_mut().options_mut().execution.batch_size = *batch_size;
    }

    state.task_ctx()
}

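/// Summary counts of the I/O and index metrics recorded while executing a
/// plan.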
#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct ExecutionSummaryCounts {
    /// The number of I/O operations performed.
    pub iops: usize,
    /// The number of requests made to storage.
    pub requests: usize,
    /// The number of bytes read.
    pub bytes_read: usize,
    /// The number of indices loaded.
    pub indices_loaded: usize,
    /// The number of index partitions loaded.
    pub parts_loaded: usize,
    /// The number of comparisons performed while searching indices.
    pub index_comparisons: usize,
    /// All other metric counts, keyed by metric name.
    pub all_counts: HashMap<String, usize>,
}

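/// Recursively accumulate the metric counts of `node` and all of its children
/// into `counts`.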
fn visit_node(node: &dyn ExecutionPlan, counts: &mut ExecutionSummaryCounts) {
    if let Some(metrics) = node.metrics() {
        for (metric_name, count) in metrics.iter_counts() {
            match metric_name.as_ref() {
                IOPS_METRIC => counts.iops += count.value(),
                REQUESTS_METRIC => counts.requests += count.value(),
                BYTES_READ_METRIC => counts.bytes_read += count.value(),
                INDICES_LOADED_METRIC => counts.indices_loaded += count.value(),
                PARTS_LOADED_METRIC => counts.parts_loaded += count.value(),
                INDEX_COMPARISONS_METRIC => counts.index_comparisons += count.value(),
                _ => {
                    let existing = counts
                        .all_counts
                        .entry(metric_name.as_ref().to_string())
                        .or_insert(0);
                    *existing += count.value();
                }
            }
        }
    }
    for child in node.children() {
        visit_node(child.as_ref(), counts);
    }
}

fn report_plan_summary_metrics(plan: &dyn ExecutionPlan, options: &LanceExecutionOptions) {
    let output_rows = plan
        .metrics()
        .map(|m| m.output_rows().unwrap_or(0))
        .unwrap_or(0);
    let mut counts = ExecutionSummaryCounts::default();
    visit_node(plan, &mut counts);
    tracing::info!(
        target: TRACE_EXECUTION,
        r#type = EXECUTION_PLAN_RUN,
        output_rows,
        iops = counts.iops,
        requests = counts.requests,
        bytes_read = counts.bytes_read,
        indices_loaded = counts.indices_loaded,
        parts_loaded = counts.parts_loaded,
        index_comparisons = counts.index_comparisons,
    );
    if let Some(callback) = options.execution_stats_callback.as_ref() {
        callback(&counts);
    }
}

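/// Execute a physical plan, returning a stream of record batches.
///
/// The plan must have exactly one output partition.  Once the stream is fully
/// consumed, summary metrics are reported via tracing and passed to the
/// optional `execution_stats_callback`.
///
/// A usage sketch (not from the original docs; assumes an async context and a
/// single-partition `plan`):
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let mut stream = execute_plan(plan, LanceExecutionOptions::default())?;
/// while let Some(batch) = stream.try_next().await? {
///     println!("read {} rows", batch.num_rows());
/// }
/// ```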
pub fn execute_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<SendableRecordBatchStream> {
    debug!(
        "Executing plan:\n{}",
        DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
    );

    let session_ctx = get_session_context(&options);

    // NOTE: only the first partition is executed, so the plan must have
    // exactly one partition; otherwise data would be silently dropped.
    assert_eq!(plan.properties().partitioning.partition_count(), 1);
    let stream = plan.execute(0, get_task_context(&session_ctx, &options))?;

    let schema = stream.schema();
    let stream = stream.finally(move || {
        report_plan_summary_metrics(plan.as_ref(), &options);
    });
    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}

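/// Run `plan` to completion under an `AnalyzeExec` wrapper and return the
/// plan rendered as text, annotated with the runtime metrics that were
/// collected.
///
/// A usage sketch (not from the original docs; assumes an async context):
///
/// ```ignore
/// let report = analyze_plan(plan, LanceExecutionOptions::default()).await?;
/// println!("{}", report);
/// ```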
pub async fn analyze_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<String> {
    let schema = plan.schema();
    let analyze = Arc::new(AnalyzeExec::new(true, true, plan, schema));

    let session_ctx = get_session_context(&options);
    assert_eq!(analyze.properties().partitioning.partition_count(), 1);
    let mut stream = analyze
        .execute(0, get_task_context(&session_ctx, &options))
        .map_err(|err| {
            Error::io(
                format!("Failed to execute analyze plan: {}", err),
                location!(),
            )
        })?;

    // Fully drain the stream so that all runtime metrics are recorded.
    while (stream.next().await).is_some() {}

    let display = DisplayableExecutionPlan::with_metrics(analyze.as_ref());
    Ok(format!("{}", display.indent(true)))
}

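/// An extension trait for `SessionContext` that adds support for reading
/// one-shot streams of data.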
pub trait SessionContextExt {
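    /// Create a `DataFrame` backed by a stream of data.
    ///
    /// The data can only be consumed once; running a second query against the
    /// returned `DataFrame` will panic.
    ///
    /// A usage sketch (not from the original docs; assumes an async context
    /// and the DataFusion expression API):
    ///
    /// ```ignore
    /// use datafusion::prelude::{col, lit};
    ///
    /// let df = ctx.read_one_shot(stream)?;
    /// let batches = df.filter(col("x").gt(lit(1)))?.collect().await?;
    /// ```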
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame>;
}

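/// A `PartitionStream` that wraps a stream which can only be consumed once.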
struct OneShotPartitionStream {
    data: Arc<Mutex<Option<SendableRecordBatchStream>>>,
    schema: Arc<ArrowSchema>,
}

impl std::fmt::Debug for OneShotPartitionStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let data = self.data.lock().unwrap();
        f.debug_struct("OneShotPartitionStream")
            .field("exhausted", &data.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl OneShotPartitionStream {
    fn new(data: SendableRecordBatchStream) -> Self {
        let schema = data.schema();
        Self {
            data: Arc::new(Mutex::new(Some(data))),
            schema,
        }
    }
}

impl PartitionStream for OneShotPartitionStream {
    fn schema(&self) -> &arrow_schema::SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut stream = self.data.lock().unwrap();
        stream
            .take()
            .expect("Attempt to consume a one shot dataframe multiple times")
    }
}

impl SessionContextExt for SessionContext {
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame> {
        let schema = data.schema();
        let part_stream = Arc::new(OneShotPartitionStream::new(data));
        let provider = StreamingTable::try_new(schema, vec![part_stream])?;
        self.read_table(Arc::new(provider))
    }
}