use std::sync::{Arc, Mutex};

use arrow_array::RecordBatch;
use arrow_schema::Schema as ArrowSchema;
use datafusion::{
    catalog::streaming::StreamingTable,
    dataframe::DataFrame,
    execution::{
        context::{SessionConfig, SessionContext},
        disk_manager::DiskManagerConfig,
        memory_pool::FairSpillPool,
        runtime_env::RuntimeEnvBuilder,
        TaskContext,
    },
    physical_plan::{
        analyze::AnalyzeExec,
        display::DisplayableExecutionPlan,
        execution_plan::{Boundedness, EmissionType},
        stream::RecordBatchStreamAdapter,
        streaming::PartitionStream,
        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
    },
};
use datafusion_common::{DataFusionError, Statistics};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use futures::{stream, StreamExt};
use lance_arrow::SchemaExt;
use lance_core::{
    utils::{
        futures::FinallyStreamExt,
        tracing::{EXECUTION_PLAN_RUN, TRACE_EXECUTION},
    },
    Error, Result,
};
use lazy_static::lazy_static;
use log::{debug, info, warn};
use snafu::location;

use crate::utils::{
    MetricsExt, BYTES_READ_METRIC, INDEX_COMPARISONS_METRIC, INDICES_LOADED_METRIC, IOPS_METRIC,
    PARTS_LOADED_METRIC, REQUESTS_METRIC,
};

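/// An execution node that wraps an existing [`SendableRecordBatchStream`].
///
/// The node may only be executed once; afterwards it is exhausted and
/// `execute` returns an error. The wrapped stream is reported as bounded, so
/// it should be finite. A minimal usage sketch (the batch construction here
/// is illustrative, not part of this module):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow_array::{Int32Array, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
///
/// let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
/// let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;
/// let plan: Arc<dyn ExecutionPlan> = Arc::new(OneShotExec::from_batch(batch));
/// // The plan can now be used as a leaf of a larger physical plan, but a
/// // second call to `execute` will fail.
/// ```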
pub struct OneShotExec {
    // The stream is taken on first execution, leaving `None` behind.
    stream: Mutex<Option<SendableRecordBatchStream>>,
    // Cache the schema so it remains available after the stream is consumed.
    schema: Arc<ArrowSchema>,
    properties: PlanProperties,
}

impl OneShotExec {
    /// Creates a new one-shot node from an existing stream.
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        let schema = stream.schema();
        Self {
            stream: Mutex::new(Some(stream)),
            schema: schema.clone(),
            properties: PlanProperties::new(
                EquivalenceProperties::new(schema),
                Partitioning::RoundRobinBatch(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
        }
    }

    /// Creates a one-shot node that yields a single batch.
    pub fn from_batch(batch: RecordBatch) -> Self {
        let schema = batch.schema();
        let stream = Box::pin(RecordBatchStreamAdapter::new(
            schema,
            stream::iter(vec![Ok(batch)]),
        ));
        Self::new(stream)
    }
}

impl std::fmt::Debug for OneShotExec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        f.debug_struct("OneShotExec")
            .field("exhausted", &stream.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl DisplayAs for OneShotExec {
    fn fmt_as(
        &self,
        t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                // The trailing space makes the output read
                // "OneShotStream: EXHAUSTED columns=[...]" once consumed.
                let exhausted = if stream.is_some() { "" } else { "EXHAUSTED " };
                let columns = self
                    .schema
                    .field_names()
                    .iter()
                    .map(|s| s.to_string())
                    .collect::<Vec<_>>();
                write!(
                    f,
                    "OneShotStream: {}columns=[{}]",
                    exhausted,
                    columns.join(",")
                )
            }
        }
    }
}

impl ExecutionPlan for OneShotExec {
    fn name(&self) -> &str {
        "OneShotExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow_schema::SchemaRef {
        self.schema.clone()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        // This is a leaf node with no children to replace.
        if children.is_empty() {
            Ok(self)
        } else {
            Err(DataFusionError::Internal(
                "OneShotExec does not accept children".to_string(),
            ))
        }
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<datafusion::execution::TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        // Take the stream, leaving `None` behind so a second call fails cleanly.
        let stream = self
            .stream
            .lock()
            .map_err(|err| DataFusionError::Execution(err.to_string()))?
            .take();
        if let Some(stream) = stream {
            Ok(stream)
        } else {
            Err(DataFusionError::Execution(
                "OneShotExec has already been executed".to_string(),
            ))
        }
    }

    fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
        Ok(Statistics::new_unknown(&self.schema))
    }

    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
        &self.properties
    }
}

/// Callback invoked with the aggregated [`ExecutionSummaryCounts`] once a plan
/// finishes executing.
pub type ExecutionStatsCallback = Arc<dyn Fn(&ExecutionSummaryCounts) + Send + Sync>;

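/// Options controlling how Lance executes a DataFusion plan.
///
/// A minimal configuration sketch (the values are illustrative):
///
/// ```ignore
/// use std::sync::Arc;
///
/// let options = LanceExecutionOptions {
///     use_spilling: true,
///     mem_pool_size: Some(512 * 1024 * 1024), // 512 MiB spill pool
///     batch_size: Some(1024),
///     execution_stats_callback: Some(Arc::new(|counts: &ExecutionSummaryCounts| {
///         log::debug!("plan finished after {} IOPS", counts.iops);
///     })),
///     ..Default::default()
/// };
/// ```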
#[derive(Default, Clone)]
pub struct LanceExecutionOptions {
    pub use_spilling: bool,
    pub mem_pool_size: Option<u64>,
    pub batch_size: Option<usize>,
    pub target_partition: Option<usize>,
    pub execution_stats_callback: Option<ExecutionStatsCallback>,
}

impl std::fmt::Debug for LanceExecutionOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("LanceExecutionOptions")
            .field("use_spilling", &self.use_spilling)
            .field("mem_pool_size", &self.mem_pool_size)
            .field("batch_size", &self.batch_size)
            .field("target_partition", &self.target_partition)
            .field(
                "execution_stats_callback",
                &self.execution_stats_callback.is_some(),
            )
            .finish()
    }
}

/// Default memory pool size (100 MiB) used when no size is configured.
const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 * 1024 * 1024;

impl LanceExecutionOptions {
    /// Returns the configured memory pool size, falling back to the
    /// `LANCE_MEM_POOL_SIZE` environment variable and then to the default.
    pub fn mem_pool_size(&self) -> u64 {
        self.mem_pool_size.unwrap_or_else(|| {
            std::env::var("LANCE_MEM_POOL_SIZE")
                .map(|s| match s.parse::<u64>() {
                    Ok(v) => v,
                    Err(e) => {
                        warn!("Failed to parse LANCE_MEM_POOL_SIZE: {}, using default", e);
                        DEFAULT_LANCE_MEM_POOL_SIZE
                    }
                })
                .unwrap_or(DEFAULT_LANCE_MEM_POOL_SIZE)
        })
    }

    /// Returns true if spilling was requested and is not bypassed via the
    /// `LANCE_BYPASS_SPILLING` environment variable.
    pub fn use_spilling(&self) -> bool {
        if !self.use_spilling {
            return false;
        }
        std::env::var("LANCE_BYPASS_SPILLING")
            .map(|_| {
                info!("Bypassing spilling because LANCE_BYPASS_SPILLING is set");
                false
            })
            .unwrap_or(true)
    }
}
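
// Both knobs above can also be driven from the environment without touching
// code. Illustrative shell invocations (the binary name is hypothetical):
//
//   LANCE_MEM_POOL_SIZE=209715200 my-lance-app   # 200 MiB memory pool
//   LANCE_BYPASS_SPILLING=1 my-lance-app         # skip spilling even if requested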

/// Builds a new [`SessionContext`] from the given options, attaching a disk
/// manager and a [`FairSpillPool`] sized by `mem_pool_size()` when spilling
/// is enabled.
pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext {
    let mut session_config = SessionConfig::new();
    let mut runtime_env_builder = RuntimeEnvBuilder::new();
    if let Some(target_partition) = options.target_partition {
        session_config = session_config.with_target_partitions(target_partition);
    }
    if options.use_spilling() {
        runtime_env_builder = runtime_env_builder
            .with_disk_manager(DiskManagerConfig::new())
            .with_memory_pool(Arc::new(FairSpillPool::new(
                options.mem_pool_size() as usize
            )));
    }
    let runtime_env = runtime_env_builder.build_arc().unwrap();
    SessionContext::new_with_config_rt(session_config, runtime_env)
}

lazy_static! {
    static ref DEFAULT_SESSION_CONTEXT: SessionContext =
        new_session_context(&LanceExecutionOptions::default());
    static ref DEFAULT_SESSION_CONTEXT_WITH_SPILLING: SessionContext = {
        new_session_context(&LanceExecutionOptions {
            use_spilling: true,
            ..Default::default()
        })
    };
}

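/// Returns a session context for the given options, reusing a process-wide
/// cached context when only the default memory pool size and default
/// partitioning are requested.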
pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext {
    if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE && options.target_partition.is_none()
    {
        return if options.use_spilling() {
            DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone()
        } else {
            DEFAULT_SESSION_CONTEXT.clone()
        };
    }
    new_session_context(options)
}

fn get_task_context(
    session_ctx: &SessionContext,
    options: &LanceExecutionOptions,
) -> Arc<TaskContext> {
    let mut state = session_ctx.state();
    if let Some(batch_size) = options.batch_size.as_ref() {
        state.config_mut().options_mut().execution.batch_size = *batch_size;
    }

    state.task_ctx()
}

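/// Aggregated I/O and index metrics collected from every node of an executed
/// plan, reported via tracing and the optional `execution_stats_callback`.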
#[derive(Default)]
pub struct ExecutionSummaryCounts {
    pub iops: usize,
    pub requests: usize,
    pub bytes_read: usize,
    pub indices_loaded: usize,
    pub parts_loaded: usize,
    pub index_comparisons: usize,
}

fn visit_node(node: &dyn ExecutionPlan, counts: &mut ExecutionSummaryCounts) {
    // Fold this node's metrics into the running totals, then recurse into
    // its children.
    if let Some(metrics) = node.metrics() {
        let count = |name: &str| metrics.find_count(name).map(|c| c.value()).unwrap_or(0);
        counts.iops += count(IOPS_METRIC);
        counts.requests += count(REQUESTS_METRIC);
        counts.bytes_read += count(BYTES_READ_METRIC);
        counts.indices_loaded += count(INDICES_LOADED_METRIC);
        counts.parts_loaded += count(PARTS_LOADED_METRIC);
        counts.index_comparisons += count(INDEX_COMPARISONS_METRIC);
    }
    for child in node.children() {
        visit_node(child.as_ref(), counts);
    }
}

fn report_plan_summary_metrics(plan: &dyn ExecutionPlan, options: &LanceExecutionOptions) {
    let output_rows = plan
        .metrics()
        .map(|m| m.output_rows().unwrap_or(0))
        .unwrap_or(0);
    let mut counts = ExecutionSummaryCounts::default();
    visit_node(plan, &mut counts);
    tracing::info!(
        target: TRACE_EXECUTION,
        r#type = EXECUTION_PLAN_RUN,
        output_rows,
        iops = counts.iops,
        requests = counts.requests,
        bytes_read = counts.bytes_read,
        indices_loaded = counts.indices_loaded,
        parts_loaded = counts.parts_loaded,
        index_comparisons = counts.index_comparisons,
    );
    if let Some(callback) = options.execution_stats_callback.as_ref() {
        callback(&counts);
    }
}

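/// Executes a plan using the session and runtime configuration derived from
/// `options`, returning a stream that reports summary metrics once drained.
///
/// Only a single partition is executed; the plan must have exactly one
/// partition. A minimal sketch of driving the returned stream (assuming a
/// tokio runtime and a `plan` built elsewhere):
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let mut stream = execute_plan(plan, LanceExecutionOptions::default())?;
/// while let Some(batch) = stream.try_next().await? {
///     println!("got {} rows", batch.num_rows());
/// }
/// ```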
pub fn execute_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<SendableRecordBatchStream> {
    debug!(
        "Executing plan:\n{}",
        DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
    );

    let session_ctx = get_session_context(&options);

    // Only the first partition is executed; the plan must have exactly one.
    assert_eq!(plan.properties().partitioning.partition_count(), 1);
    let stream = plan.execute(0, get_task_context(&session_ctx, &options))?;

    // Report summary metrics (and fire the callback) once the stream is drained.
    let schema = stream.schema();
    let stream = stream.finally(move || {
        report_plan_summary_metrics(plan.as_ref(), &options);
    });
    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}

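/// Runs the plan to completion under an [`AnalyzeExec`] wrapper and returns
/// the plan rendered as a string, annotated with runtime metrics.
///
/// A minimal sketch (assuming a `plan` built elsewhere):
///
/// ```ignore
/// let report = analyze_plan(plan, LanceExecutionOptions::default()).await?;
/// println!("{}", report);
/// ```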
pub async fn analyze_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<String> {
    let schema = plan.schema();
    let analyze = Arc::new(AnalyzeExec::new(true, true, plan, schema));

    let session_ctx = get_session_context(&options);
    assert_eq!(analyze.properties().partitioning.partition_count(), 1);
    let mut stream = analyze
        .execute(0, get_task_context(&session_ctx, &options))
        .map_err(|err| {
            Error::io(
                format!("Failed to execute analyze plan: {}", err),
                location!(),
            )
        })?;

    // Fully drain the stream so all runtime metrics are populated.
    while (stream.next().await).is_some() {}

    let display = DisplayableExecutionPlan::with_metrics(analyze.as_ref());
    Ok(format!("{}", display.indent(true)))
}

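/// Lance extensions to the DataFusion [`SessionContext`].
///
/// A minimal sketch of turning a record batch stream into a one-shot
/// [`DataFrame`] (assuming a `stream` obtained elsewhere, e.g. from
/// [`execute_plan`]):
///
/// ```ignore
/// use datafusion::execution::context::SessionContext;
///
/// let ctx = SessionContext::new();
/// let df = ctx.read_one_shot(stream)?;
/// let batches = df.collect().await?; // the table may only be scanned once
/// ```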
pub trait SessionContextExt {
    /// Creates a [`DataFrame`] backed by a one-shot stream of record batches.
    ///
    /// The resulting table may only be scanned once; a second scan will panic.
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame>;
}

struct OneShotPartitionStream {
    data: Arc<Mutex<Option<SendableRecordBatchStream>>>,
    schema: Arc<ArrowSchema>,
}

impl std::fmt::Debug for OneShotPartitionStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let data = self.data.lock().unwrap();
        f.debug_struct("OneShotPartitionStream")
            .field("exhausted", &data.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl OneShotPartitionStream {
    fn new(data: SendableRecordBatchStream) -> Self {
        let schema = data.schema();
        Self {
            data: Arc::new(Mutex::new(Some(data))),
            schema,
        }
    }
}

impl PartitionStream for OneShotPartitionStream {
    fn schema(&self) -> &arrow_schema::SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let mut stream = self.data.lock().unwrap();
        stream
            .take()
            .expect("Attempt to consume a one shot dataframe multiple times")
    }
}

impl SessionContextExt for SessionContext {
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame> {
        let schema = data.schema();
        let part_stream = Arc::new(OneShotPartitionStream::new(data));
        let provider = StreamingTable::try_new(schema, vec![part_stream])?;
        self.read_table(Arc::new(provider))
    }
}