use std::sync::{Arc, Mutex};

use arrow_array::RecordBatch;
use arrow_schema::Schema as ArrowSchema;
use datafusion::{
    catalog::streaming::StreamingTable,
    dataframe::DataFrame,
    execution::{
        context::{SessionConfig, SessionContext},
        disk_manager::DiskManagerConfig,
        memory_pool::FairSpillPool,
        runtime_env::RuntimeEnvBuilder,
        TaskContext,
    },
    physical_plan::{
        analyze::AnalyzeExec,
        display::DisplayableExecutionPlan,
        execution_plan::{Boundedness, EmissionType},
        stream::RecordBatchStreamAdapter,
        streaming::PartitionStream,
        DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
    },
};
use datafusion_common::{DataFusionError, Statistics};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use lazy_static::lazy_static;

use futures::{stream, StreamExt};
use lance_arrow::SchemaExt;
use lance_core::{
    utils::{
        futures::FinallyStreamExt,
        tracing::{EXECUTION_PLAN_RUN, TRACE_EXECUTION},
    },
    Error, Result,
};
use log::{debug, info, warn};
use snafu::location;

use crate::utils::{
    MetricsExt, BYTES_READ_METRIC, INDEX_COMPARISONS_METRIC, INDICES_LOADED_METRIC, IOPS_METRIC,
    PARTS_LOADED_METRIC, REQUESTS_METRIC,
};
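/// An [`ExecutionPlan`] that wraps an already-created stream of record batches.
///
/// The wrapped stream can only be consumed once; executing this node a second
/// time returns an execution error instead of replaying the data.
///
/// A minimal usage sketch (not compiled as a doctest; assumes `batch` is a
/// `RecordBatch` already in scope):
///
/// ```ignore
/// let plan = Arc::new(OneShotExec::from_batch(batch));
/// let mut stream = execute_plan(plan, LanceExecutionOptions::default())?;
/// while let Some(batch) = stream.next().await {
///     println!("{} rows", batch?.num_rows());
/// }
/// ```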
pub struct OneShotExec {
    stream: Mutex<Option<SendableRecordBatchStream>>,
    // A copy of the schema is kept so that `schema()`, `Debug`, and `DisplayAs`
    // keep working after the stream has been taken by `execute`.
    schema: Arc<ArrowSchema>,
    properties: PlanProperties,
}

impl OneShotExec {
    /// Create a new instance from a stream of record batches.
    pub fn new(stream: SendableRecordBatchStream) -> Self {
        let schema = stream.schema();
        Self {
            stream: Mutex::new(Some(stream)),
            schema: schema.clone(),
            properties: PlanProperties::new(
                EquivalenceProperties::new(schema),
                Partitioning::RoundRobinBatch(1),
                EmissionType::Incremental,
                Boundedness::Bounded,
            ),
        }
    }

    /// Create a new instance that yields a single, already-materialized batch.
    pub fn from_batch(batch: RecordBatch) -> Self {
        let schema = batch.schema();
        let stream = Box::pin(RecordBatchStreamAdapter::new(
            schema,
            stream::iter(vec![Ok(batch)]),
        ));
        Self::new(stream)
    }
}

impl std::fmt::Debug for OneShotExec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        f.debug_struct("OneShotExec")
            .field("exhausted", &stream.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl DisplayAs for OneShotExec {
    fn fmt_as(
        &self,
        t: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        let stream = self.stream.lock().unwrap();
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let exhausted = if stream.is_some() { "" } else { "EXHAUSTED " };
                let columns = self
                    .schema
                    .field_names()
                    .iter()
                    .map(|s| s.to_string())
                    .collect::<Vec<_>>();
                write!(
                    f,
                    "OneShotStream: {}columns=[{}]",
                    exhausted,
                    columns.join(",")
                )
            }
        }
    }
}

impl ExecutionPlan for OneShotExec {
    fn name(&self) -> &str {
        "OneShotExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow_schema::SchemaRef {
        self.schema.clone()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        // This is a leaf node; there are no children to replace.
        Err(DataFusionError::Internal(
            "OneShotExec does not support children".to_string(),
        ))
    }

    fn execute(
        &self,
        _partition: usize,
        _context: Arc<datafusion::execution::TaskContext>,
    ) -> datafusion_common::Result<SendableRecordBatchStream> {
        // Take the stream out of the mutex; this can only succeed once.
        let stream = self
            .stream
            .lock()
            .map_err(|err| DataFusionError::Execution(err.to_string()))?
            .take();
        if let Some(stream) = stream {
            Ok(stream)
        } else {
            Err(DataFusionError::Execution(
                "OneShotExec has already been executed".to_string(),
            ))
        }
    }

    fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
        Ok(Statistics::new_unknown(&self.schema))
    }

    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
        &self.properties
    }
}
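/// Options that control how Lance executes DataFusion plans.
///
/// * `use_spilling`: if true, queries may spill to disk through a
///   [`FairSpillPool`]; setting the `LANCE_BYPASS_SPILLING` environment
///   variable disables spilling even when this is true.
/// * `mem_pool_size`: memory pool cap in bytes. Defaults to 100 MiB and can
///   be overridden with the `LANCE_MEM_POOL_SIZE` environment variable.
/// * `batch_size`: overrides DataFusion's execution batch size.
/// * `target_partition`: overrides DataFusion's target partition count.
///
/// A minimal configuration sketch:
///
/// ```ignore
/// let options = LanceExecutionOptions {
///     use_spilling: true,
///     mem_pool_size: Some(512 * 1024 * 1024), // 512 MiB
///     ..Default::default()
/// };
/// ```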
#[derive(Debug, Default, Clone)]
pub struct LanceExecutionOptions {
    pub use_spilling: bool,
    pub mem_pool_size: Option<u64>,
    pub batch_size: Option<usize>,
    pub target_partition: Option<usize>,
}

const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 * 1024 * 1024;

impl LanceExecutionOptions {
    /// The memory pool size in bytes, falling back to the
    /// `LANCE_MEM_POOL_SIZE` environment variable and then the default.
    pub fn mem_pool_size(&self) -> u64 {
        self.mem_pool_size.unwrap_or_else(|| {
            std::env::var("LANCE_MEM_POOL_SIZE")
                .map(|s| match s.parse::<u64>() {
                    Ok(v) => v,
                    Err(e) => {
                        warn!("Failed to parse LANCE_MEM_POOL_SIZE: {}, using default", e);
                        DEFAULT_LANCE_MEM_POOL_SIZE
                    }
                })
                .unwrap_or(DEFAULT_LANCE_MEM_POOL_SIZE)
        })
    }

    /// Whether spilling should actually be used, honoring the
    /// `LANCE_BYPASS_SPILLING` environment variable.
    pub fn use_spilling(&self) -> bool {
        if !self.use_spilling {
            return false;
        }
        std::env::var("LANCE_BYPASS_SPILLING")
            .map(|_| {
                info!("Bypassing spilling because LANCE_BYPASS_SPILLING is set");
                false
            })
            .unwrap_or(true)
    }
}
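/// Create a new [`SessionContext`] configured from the given execution
/// options. When spilling is enabled, the context gets a disk manager and a
/// [`FairSpillPool`] sized by [`LanceExecutionOptions::mem_pool_size`].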
pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext {
    let mut session_config = SessionConfig::new();
    let mut runtime_env_builder = RuntimeEnvBuilder::new();
    if let Some(target_partition) = options.target_partition {
        session_config = session_config.with_target_partitions(target_partition);
    }
    if options.use_spilling() {
        runtime_env_builder = runtime_env_builder
            .with_disk_manager(DiskManagerConfig::new())
            .with_memory_pool(Arc::new(FairSpillPool::new(
                options.mem_pool_size() as usize
            )));
    }
    let runtime_env = runtime_env_builder.build_arc().unwrap();
    SessionContext::new_with_config_rt(session_config, runtime_env)
}

lazy_static! {
    static ref DEFAULT_SESSION_CONTEXT: SessionContext =
        new_session_context(&LanceExecutionOptions::default());
    static ref DEFAULT_SESSION_CONTEXT_WITH_SPILLING: SessionContext =
        new_session_context(&LanceExecutionOptions {
            use_spilling: true,
            ..Default::default()
        });
}

/// Return a [`SessionContext`] for the given options, reusing a cached
/// default context whenever the options match the defaults.
pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext {
    if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE && options.target_partition.is_none()
    {
        return if options.use_spilling() {
            DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone()
        } else {
            DEFAULT_SESSION_CONTEXT.clone()
        };
    }
    new_session_context(options)
}
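/// Build a [`TaskContext`] from the session, applying the per-query batch
/// size override if one was provided.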
fn get_task_context(
    session_ctx: &SessionContext,
    options: &LanceExecutionOptions,
) -> Arc<TaskContext> {
    let mut state = session_ctx.state();
    if let Some(batch_size) = options.batch_size.as_ref() {
        state.config_mut().options_mut().execution.batch_size = *batch_size;
    }

    state.task_ctx()
}
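/// I/O and index metrics accumulated across every node of a plan tree.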
#[derive(Default)]
struct SummaryCounts {
    iops: usize,
    requests: usize,
    bytes_read: usize,
    indices_loaded: usize,
    parts_loaded: usize,
    index_comparisons: usize,
}

/// Recursively accumulate metrics from `node` and all of its children.
fn visit_node(node: &dyn ExecutionPlan, counts: &mut SummaryCounts) {
    if let Some(metrics) = node.metrics() {
        counts.iops += metrics
            .find_count(IOPS_METRIC)
            .map(|c| c.value())
            .unwrap_or(0);
        counts.requests += metrics
            .find_count(REQUESTS_METRIC)
            .map(|c| c.value())
            .unwrap_or(0);
        counts.bytes_read += metrics
            .find_count(BYTES_READ_METRIC)
            .map(|c| c.value())
            .unwrap_or(0);
        counts.indices_loaded += metrics
            .find_count(INDICES_LOADED_METRIC)
            .map(|c| c.value())
            .unwrap_or(0);
        counts.parts_loaded += metrics
            .find_count(PARTS_LOADED_METRIC)
            .map(|c| c.value())
            .unwrap_or(0);
        counts.index_comparisons += metrics
            .find_count(INDEX_COMPARISONS_METRIC)
            .map(|c| c.value())
            .unwrap_or(0);
    }
    for child in node.children() {
        visit_node(child.as_ref(), counts);
    }
}
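/// Emit a tracing event summarizing the metrics of a completed plan.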
fn report_plan_summary_metrics(plan: &dyn ExecutionPlan) {
    let output_rows = plan
        .metrics()
        .map(|m| m.output_rows().unwrap_or(0))
        .unwrap_or(0);
    let mut counts = SummaryCounts::default();
    visit_node(plan, &mut counts);
    tracing::info!(
        target: TRACE_EXECUTION,
        r#type = EXECUTION_PLAN_RUN,
        output_rows,
        iops = counts.iops,
        requests = counts.requests,
        bytes_read = counts.bytes_read,
        indices_loaded = counts.indices_loaded,
        parts_loaded = counts.parts_loaded,
        index_comparisons = counts.index_comparisons,
    );
}
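/// Execute a plan and return a stream of record batches.
///
/// Only single-partition plans are supported; the plan's one partition is
/// executed on the session's task context. Once the stream finishes, summary
/// metrics are reported via tracing.
///
/// A minimal sketch (not compiled as a doctest; `plan` is assumed to be a
/// single-partition `Arc<dyn ExecutionPlan>` in scope):
///
/// ```ignore
/// let mut stream = execute_plan(plan, LanceExecutionOptions::default())?;
/// while let Some(batch) = stream.next().await {
///     let batch = batch?;
///     // ... process the batch ...
/// }
/// ```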
pub fn execute_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<SendableRecordBatchStream> {
    debug!(
        "Executing plan:\n{}",
        DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
    );

    let session_ctx = get_session_context(&options);

    // Only the first partition is executed; plans with more than one
    // partition are not supported by this helper.
    assert_eq!(plan.properties().partitioning.partition_count(), 1);
    let stream = plan.execute(0, get_task_context(&session_ctx, &options))?;

    let schema = stream.schema();
    // Report summary metrics once the stream finishes.
    let stream = stream.finally(move || {
        report_plan_summary_metrics(plan.as_ref());
    });
    Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
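/// Run `plan` to completion under an [`AnalyzeExec`] wrapper and return the
/// plan description annotated with runtime metrics.
///
/// A minimal sketch (not compiled as a doctest; `plan` is assumed to be in
/// scope):
///
/// ```ignore
/// let report = analyze_plan(plan, LanceExecutionOptions::default()).await?;
/// println!("{report}");
/// ```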
pub async fn analyze_plan(
    plan: Arc<dyn ExecutionPlan>,
    options: LanceExecutionOptions,
) -> Result<String> {
    let schema = plan.schema();
    let analyze = Arc::new(AnalyzeExec::new(true, true, plan, schema));

    let session_ctx = get_session_context(&options);
    assert_eq!(analyze.properties().partitioning.partition_count(), 1);
    let mut stream = analyze
        .execute(0, get_task_context(&session_ctx, &options))
        .map_err(|err| {
            Error::io(
                format!("Failed to execute analyze plan: {}", err),
                location!(),
            )
        })?;

    // Fully drain the stream so that AnalyzeExec can gather its metrics.
    while (stream.next().await).is_some() {}

    let display = DisplayableExecutionPlan::with_metrics(analyze.as_ref());
    Ok(format!("{}", display.indent(true)))
}
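/// Lance extensions to DataFusion's [`SessionContext`].
///
/// A minimal usage sketch (not compiled as a doctest; assumes `ctx` is a
/// [`SessionContext`] and `stream` is a `SendableRecordBatchStream`):
///
/// ```ignore
/// let df = ctx.read_one_shot(stream)?;
/// let batches = df.collect().await?;
/// ```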
pub trait SessionContextExt {
    /// Create a [`DataFrame`] backed by a one-shot stream of data.
    ///
    /// The resulting dataframe can only be queried once; executing it a
    /// second time will panic when the underlying stream is consumed again.
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame>;
}
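/// A [`PartitionStream`] that hands out its wrapped stream exactly once.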
struct OneShotPartitionStream {
    data: Arc<Mutex<Option<SendableRecordBatchStream>>>,
    schema: Arc<ArrowSchema>,
}

impl std::fmt::Debug for OneShotPartitionStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let data = self.data.lock().unwrap();
        f.debug_struct("OneShotPartitionStream")
            .field("exhausted", &data.is_none())
            .field("schema", self.schema.as_ref())
            .finish()
    }
}

impl OneShotPartitionStream {
    fn new(data: SendableRecordBatchStream) -> Self {
        let schema = data.schema();
        Self {
            data: Arc::new(Mutex::new(Some(data))),
            schema,
        }
    }
}

impl PartitionStream for OneShotPartitionStream {
    fn schema(&self) -> &arrow_schema::SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        // Take the stream out of the mutex; panics if it was already consumed.
        let mut stream = self.data.lock().unwrap();
        stream
            .take()
            .expect("Attempt to consume a one shot dataframe multiple times")
    }
}

impl SessionContextExt for SessionContext {
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame> {
        let schema = data.schema();
        let part_stream = Arc::new(OneShotPartitionStream::new(data));
        let provider = StreamingTable::try_new(schema, vec![part_stream])?;
        self.read_table(Arc::new(provider))
    }
}