lance_datafusion/
datagen.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::sync::Arc;
5
6use datafusion::{
7    execution::SendableRecordBatchStream,
8    physical_plan::{stream::RecordBatchStreamAdapter, ExecutionPlan},
9};
10use datafusion_common::DataFusionError;
11use futures::TryStreamExt;
12use lance_datagen::{BatchCount, BatchGeneratorBuilder, RowCount};
13
14use crate::exec::OneShotExec;
15
16pub trait DatafusionDatagenExt {
17    fn into_df_stream(
18        self,
19        batch_size: RowCount,
20        num_batches: BatchCount,
21    ) -> SendableRecordBatchStream;
22
23    fn into_df_exec(self, batch_size: RowCount, num_batches: BatchCount) -> Arc<dyn ExecutionPlan>;
24}
25
26impl DatafusionDatagenExt for BatchGeneratorBuilder {
27    fn into_df_stream(
28        self,
29        batch_size: RowCount,
30        num_batches: BatchCount,
31    ) -> SendableRecordBatchStream {
32        let (stream, schema) = self.into_reader_stream(batch_size, num_batches);
33        let stream = stream.map_err(DataFusionError::from);
34        Box::pin(RecordBatchStreamAdapter::new(schema, stream))
35    }
36
37    fn into_df_exec(self, batch_size: RowCount, num_batches: BatchCount) -> Arc<dyn ExecutionPlan> {
38        let stream = self.into_df_stream(batch_size, num_batches);
39        Arc::new(OneShotExec::new(stream))
40    }
41}