lance_datafusion/
datagen.rs1use std::sync::Arc;
5
6use datafusion::{
7 execution::SendableRecordBatchStream,
8 physical_plan::{stream::RecordBatchStreamAdapter, ExecutionPlan},
9};
10use datafusion_common::DataFusionError;
11use futures::TryStreamExt;
12use lance_datagen::{BatchCount, BatchGeneratorBuilder, RowCount};
13
14use crate::exec::OneShotExec;
15
16pub trait DatafusionDatagenExt {
17 fn into_df_stream(
18 self,
19 batch_size: RowCount,
20 num_batches: BatchCount,
21 ) -> SendableRecordBatchStream;
22
23 fn into_df_exec(self, batch_size: RowCount, num_batches: BatchCount) -> Arc<dyn ExecutionPlan>;
24}
25
26impl DatafusionDatagenExt for BatchGeneratorBuilder {
27 fn into_df_stream(
28 self,
29 batch_size: RowCount,
30 num_batches: BatchCount,
31 ) -> SendableRecordBatchStream {
32 let (stream, schema) = self.into_reader_stream(batch_size, num_batches);
33 let stream = stream.map_err(DataFusionError::from);
34 Box::pin(RecordBatchStreamAdapter::new(schema, stream))
35 }
36
37 fn into_df_exec(self, batch_size: RowCount, num_batches: BatchCount) -> Arc<dyn ExecutionPlan> {
38 let stream = self.into_df_stream(batch_size, num_batches);
39 Arc::new(OneShotExec::new(stream))
40 }
41}