1use std::sync::{Arc, Mutex};
7
8use arrow_array::RecordBatch;
9use arrow_schema::Schema as ArrowSchema;
10use datafusion::{
11 catalog::streaming::StreamingTable,
12 dataframe::DataFrame,
13 execution::{
14 context::{SessionConfig, SessionContext},
15 disk_manager::DiskManagerConfig,
16 memory_pool::FairSpillPool,
17 runtime_env::RuntimeEnvBuilder,
18 TaskContext,
19 },
20 physical_plan::{
21 analyze::AnalyzeExec,
22 display::DisplayableExecutionPlan,
23 execution_plan::{Boundedness, EmissionType},
24 stream::RecordBatchStreamAdapter,
25 streaming::PartitionStream,
26 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
27 },
28};
29use datafusion_common::{DataFusionError, Statistics};
30use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
31use lazy_static::lazy_static;
32
33use futures::{stream, StreamExt};
34use lance_arrow::SchemaExt;
35use lance_core::{
36 utils::{
37 futures::FinallyStreamExt,
38 tracing::{EXECUTION_PLAN_RUN, TRACE_EXECUTION},
39 },
40 Error, Result,
41};
42use log::{debug, info, warn};
43use snafu::location;
44
45use crate::utils::{
46 MetricsExt, BYTES_READ_METRIC, INDEX_COMPARISONS_METRIC, INDICES_LOADED_METRIC, IOPS_METRIC,
47 PARTS_LOADED_METRIC, REQUESTS_METRIC,
48};
49
/// An execution plan node that yields a pre-built stream of record batches.
///
/// The wrapped stream can be consumed exactly once; a second `execute` call
/// returns an error (see the `ExecutionPlan::execute` impl below).
pub struct OneShotExec {
    // Taken (replaced with None) on execute; None afterwards means "exhausted".
    stream: Mutex<Option<SendableRecordBatchStream>>,
    // Schema captured from the stream at construction time.
    schema: Arc<ArrowSchema>,
    // Single-partition, bounded, incremental-emission plan properties.
    properties: PlanProperties,
}
64
65impl OneShotExec {
66 pub fn new(stream: SendableRecordBatchStream) -> Self {
68 let schema = stream.schema();
69 Self {
70 stream: Mutex::new(Some(stream)),
71 schema: schema.clone(),
72 properties: PlanProperties::new(
73 EquivalenceProperties::new(schema),
74 Partitioning::RoundRobinBatch(1),
75 EmissionType::Incremental,
76 Boundedness::Bounded,
77 ),
78 }
79 }
80
81 pub fn from_batch(batch: RecordBatch) -> Self {
82 let schema = batch.schema();
83 let stream = Box::pin(RecordBatchStreamAdapter::new(
84 schema,
85 stream::iter(vec![Ok(batch)]),
86 ));
87 Self::new(stream)
88 }
89}
90
91impl std::fmt::Debug for OneShotExec {
92 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93 let stream = self.stream.lock().unwrap();
94 f.debug_struct("OneShotExec")
95 .field("exhausted", &stream.is_none())
96 .field("schema", self.schema.as_ref())
97 .finish()
98 }
99}
100
101impl DisplayAs for OneShotExec {
102 fn fmt_as(
103 &self,
104 t: datafusion::physical_plan::DisplayFormatType,
105 f: &mut std::fmt::Formatter,
106 ) -> std::fmt::Result {
107 let stream = self.stream.lock().unwrap();
108 match t {
109 DisplayFormatType::Default | DisplayFormatType::Verbose => {
110 let exhausted = if stream.is_some() { "" } else { "EXHAUSTED" };
111 let columns = self
112 .schema
113 .field_names()
114 .iter()
115 .map(|s| s.to_string())
116 .collect::<Vec<_>>();
117 write!(
118 f,
119 "OneShotStream: {}columns=[{}]",
120 exhausted,
121 columns.join(",")
122 )
123 }
124 }
125 }
126}
127
128impl ExecutionPlan for OneShotExec {
129 fn name(&self) -> &str {
130 "OneShotExec"
131 }
132
133 fn as_any(&self) -> &dyn std::any::Any {
134 self
135 }
136
137 fn schema(&self) -> arrow_schema::SchemaRef {
138 self.schema.clone()
139 }
140
141 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
142 vec![]
143 }
144
145 fn with_new_children(
146 self: Arc<Self>,
147 _children: Vec<Arc<dyn ExecutionPlan>>,
148 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
149 todo!()
150 }
151
152 fn execute(
153 &self,
154 _partition: usize,
155 _context: Arc<datafusion::execution::TaskContext>,
156 ) -> datafusion_common::Result<SendableRecordBatchStream> {
157 let stream = self
158 .stream
159 .lock()
160 .map_err(|err| DataFusionError::Execution(err.to_string()))?
161 .take();
162 if let Some(stream) = stream {
163 Ok(stream)
164 } else {
165 Err(DataFusionError::Execution(
166 "OneShotExec has already been executed".to_string(),
167 ))
168 }
169 }
170
171 fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
172 Ok(Statistics::new_unknown(&self.schema))
173 }
174
175 fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
176 &self.properties
177 }
178}
179
/// Options controlling how Lance executes DataFusion plans.
#[derive(Debug, Default, Clone)]
pub struct LanceExecutionOptions {
    /// If true, operators may spill to disk when the memory pool fills
    /// (can still be bypassed via the LANCE_BYPASS_SPILLING env var).
    pub use_spilling: bool,
    /// Memory pool size in bytes; when None, falls back to the
    /// LANCE_MEM_POOL_SIZE env var, then a built-in default.
    pub mem_pool_size: Option<u64>,
    /// Overrides DataFusion's execution batch size when set.
    pub batch_size: Option<usize>,
}
186
/// Default memory pool size (100 MiB), used when neither the option nor the
/// LANCE_MEM_POOL_SIZE env var supplies a value.
const DEFAULT_LANCE_MEM_POOL_SIZE: u64 = 100 * 1024 * 1024;
188
189impl LanceExecutionOptions {
190 pub fn mem_pool_size(&self) -> u64 {
191 self.mem_pool_size.unwrap_or_else(|| {
192 std::env::var("LANCE_MEM_POOL_SIZE")
193 .map(|s| match s.parse::<u64>() {
194 Ok(v) => v,
195 Err(e) => {
196 warn!("Failed to parse LANCE_MEM_POOL_SIZE: {}, using default", e);
197 DEFAULT_LANCE_MEM_POOL_SIZE
198 }
199 })
200 .unwrap_or(DEFAULT_LANCE_MEM_POOL_SIZE)
201 })
202 }
203
204 pub fn use_spilling(&self) -> bool {
205 if !self.use_spilling {
206 return false;
207 }
208 std::env::var("LANCE_BYPASS_SPILLING")
209 .map(|_| {
210 info!("Bypassing spilling because LANCE_BYPASS_SPILLING is set");
211 false
212 })
213 .unwrap_or(true)
214 }
215}
216
217pub fn new_session_context(options: &LanceExecutionOptions) -> SessionContext {
218 let session_config = SessionConfig::new();
219 let mut runtime_env_builder = RuntimeEnvBuilder::new();
220 if options.use_spilling() {
221 runtime_env_builder = runtime_env_builder
222 .with_disk_manager(DiskManagerConfig::new())
223 .with_memory_pool(Arc::new(FairSpillPool::new(
224 options.mem_pool_size() as usize
225 )));
226 }
227 let runtime_env = runtime_env_builder.build_arc().unwrap();
228 SessionContext::new_with_config_rt(session_config, runtime_env)
229}
230
lazy_static! {
    // Cached context for the common default configuration, so repeated plan
    // executions don't rebuild a runtime environment each time.
    static ref DEFAULT_SESSION_CONTEXT: SessionContext =
        new_session_context(&LanceExecutionOptions::default());
    // Same, but with spilling enabled (still the default memory pool size).
    static ref DEFAULT_SESSION_CONTEXT_WITH_SPILLING: SessionContext = {
        new_session_context(&LanceExecutionOptions {
            use_spilling: true,
            ..Default::default()
        })
    };
}
241
242pub fn get_session_context(options: &LanceExecutionOptions) -> SessionContext {
243 let session_ctx: SessionContext;
244 if options.mem_pool_size() == DEFAULT_LANCE_MEM_POOL_SIZE {
245 if options.use_spilling() {
246 session_ctx = DEFAULT_SESSION_CONTEXT_WITH_SPILLING.clone();
247 } else {
248 session_ctx = DEFAULT_SESSION_CONTEXT.clone();
249 }
250 } else {
251 session_ctx = new_session_context(options)
252 }
253 session_ctx
254}
255
256fn get_task_context(
257 session_ctx: &SessionContext,
258 options: &LanceExecutionOptions,
259) -> Arc<TaskContext> {
260 let mut state = session_ctx.state();
261 if let Some(batch_size) = options.batch_size.as_ref() {
262 state.config_mut().options_mut().execution.batch_size = *batch_size;
263 }
264
265 state.task_ctx()
266}
267
/// Accumulator for Lance I/O and index metrics gathered from a plan tree.
#[derive(Default)]
struct SummaryCounts {
    // Number of I/O operations performed.
    iops: usize,
    // Number of requests issued.
    requests: usize,
    // Total bytes read.
    bytes_read: usize,
    // Number of indices loaded.
    indices_loaded: usize,
    // Number of index partitions loaded.
    parts_loaded: usize,
    // Number of index comparisons performed.
    index_comparisons: usize,
}
277
278fn visit_node(node: &dyn ExecutionPlan, counts: &mut SummaryCounts) {
279 if let Some(metrics) = node.metrics() {
280 counts.iops += metrics
281 .find_count(IOPS_METRIC)
282 .map(|c| c.value())
283 .unwrap_or(0);
284 counts.requests += metrics
285 .find_count(REQUESTS_METRIC)
286 .map(|c| c.value())
287 .unwrap_or(0);
288 counts.bytes_read += metrics
289 .find_count(BYTES_READ_METRIC)
290 .map(|c| c.value())
291 .unwrap_or(0);
292 counts.indices_loaded += metrics
293 .find_count(INDICES_LOADED_METRIC)
294 .map(|c| c.value())
295 .unwrap_or(0);
296 counts.parts_loaded += metrics
297 .find_count(PARTS_LOADED_METRIC)
298 .map(|c| c.value())
299 .unwrap_or(0);
300 counts.index_comparisons += metrics
301 .find_count(INDEX_COMPARISONS_METRIC)
302 .map(|c| c.value())
303 .unwrap_or(0);
304 }
305 for child in node.children() {
306 visit_node(child.as_ref(), counts);
307 }
308}
309
310fn report_plan_summary_metrics(plan: &dyn ExecutionPlan) {
311 let output_rows = plan
312 .metrics()
313 .map(|m| m.output_rows().unwrap_or(0))
314 .unwrap_or(0);
315 let mut counts = SummaryCounts::default();
316 visit_node(plan, &mut counts);
317 tracing::info!(
318 target: TRACE_EXECUTION,
319 type = EXECUTION_PLAN_RUN,
320 output_rows,
321 iops = counts.iops,
322 requests = counts.requests,
323 bytes_read = counts.bytes_read,
324 indices_loaded = counts.indices_loaded,
325 parts_loaded = counts.parts_loaded,
326 index_comparisons = counts.index_comparisons,
327 );
328}
329
330pub fn execute_plan(
334 plan: Arc<dyn ExecutionPlan>,
335 options: LanceExecutionOptions,
336) -> Result<SendableRecordBatchStream> {
337 debug!(
338 "Executing plan:\n{}",
339 DisplayableExecutionPlan::new(plan.as_ref()).indent(true)
340 );
341
342 let session_ctx = get_session_context(&options);
343
344 assert_eq!(plan.properties().partitioning.partition_count(), 1);
347 let stream = plan.execute(0, get_task_context(&session_ctx, &options))?;
348
349 let schema = stream.schema();
350 let stream = stream.finally(move || {
351 report_plan_summary_metrics(plan.as_ref());
352 });
353 Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
354}
355
356pub async fn analyze_plan(
357 plan: Arc<dyn ExecutionPlan>,
358 options: LanceExecutionOptions,
359) -> Result<String> {
360 let schema = plan.schema();
361 let analyze = Arc::new(AnalyzeExec::new(true, true, plan, schema));
362
363 let session_ctx = get_session_context(&options);
364 assert_eq!(analyze.properties().partitioning.partition_count(), 1);
365 let mut stream = analyze
366 .execute(0, get_task_context(&session_ctx, &options))
367 .map_err(|err| {
368 Error::io(
369 format!("Failed to execute analyze plan: {}", err),
370 location!(),
371 )
372 })?;
373
374 while (stream.next().await).is_some() {}
376
377 let display = DisplayableExecutionPlan::with_metrics(analyze.as_ref());
378 Ok(format!("{}", display.indent(true)))
379}
380
/// Lance extension methods for DataFusion's `SessionContext`.
pub trait SessionContextExt {
    /// Creates a DataFrame backed by `data`.
    ///
    /// The stream can only be consumed once; attempting to execute the
    /// resulting DataFrame a second time will fail.
    fn read_one_shot(
        &self,
        data: SendableRecordBatchStream,
    ) -> datafusion::common::Result<DataFrame>;
}
390
/// A `PartitionStream` that hands out a pre-built stream exactly once.
struct OneShotPartitionStream {
    // Taken on execute; None afterwards means the stream was consumed.
    data: Arc<Mutex<Option<SendableRecordBatchStream>>>,
    // Schema captured from the stream at construction time.
    schema: Arc<ArrowSchema>,
}
395
396impl std::fmt::Debug for OneShotPartitionStream {
397 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
398 let data = self.data.lock().unwrap();
399 f.debug_struct("OneShotPartitionStream")
400 .field("exhausted", &data.is_none())
401 .field("schema", self.schema.as_ref())
402 .finish()
403 }
404}
405
406impl OneShotPartitionStream {
407 fn new(data: SendableRecordBatchStream) -> Self {
408 let schema = data.schema();
409 Self {
410 data: Arc::new(Mutex::new(Some(data))),
411 schema,
412 }
413 }
414}
415
416impl PartitionStream for OneShotPartitionStream {
417 fn schema(&self) -> &arrow_schema::SchemaRef {
418 &self.schema
419 }
420
421 fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
422 let mut stream = self.data.lock().unwrap();
423 stream
424 .take()
425 .expect("Attempt to consume a one shot dataframe multiple times")
426 }
427}
428
429impl SessionContextExt for SessionContext {
430 fn read_one_shot(
431 &self,
432 data: SendableRecordBatchStream,
433 ) -> datafusion::common::Result<DataFrame> {
434 let schema = data.schema();
435 let part_stream = Arc::new(OneShotPartitionStream::new(data));
436 let provider = StreamingTable::try_new(schema, vec![part_stream])?;
437 self.read_table(Arc::new(provider))
438 }
439}