use crate::FileType;
use crate::Result;
use crate::pipeline;
use crate::pipeline::Step;
use crate::pipeline::dataframe::DataFramePipeline;
use crate::pipeline::record_batch::RecordBatchPipeline;
/// Drives `fut` to completion synchronously, whether or not the caller is
/// already inside a tokio runtime.
///
/// When an ambient runtime exists, the blocking wait is wrapped in
/// `block_in_place` so this worker thread is taken off the cooperative
/// scheduler while it blocks. Otherwise a throwaway current-thread runtime
/// is built just for this call.
///
/// NOTE(review): `tokio::task::block_in_place` panics if the ambient runtime
/// is the `current_thread` flavor — confirm callers only invoke this from a
/// multi-threaded runtime (or from outside any runtime).
///
/// # Errors
/// Returns an error if the fallback runtime cannot be built, or whatever
/// error `fut` itself resolves to.
pub(crate) fn block_on_pipeline_future<T, F>(fut: F) -> Result<T>
where
    F: std::future::Future<Output = Result<T>>,
{
    match tokio::runtime::Handle::try_current() {
        // Already inside a runtime: reuse its handle.
        Ok(handle) => tokio::task::block_in_place(|| handle.block_on(fut)),
        // No ambient runtime: spin up a minimal one for this call only.
        Err(_) => {
            let runtime = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()?;
            runtime.block_on(fut)
        }
    }
}
/// A runnable pipeline, dispatching to one of the two execution backends.
pub enum Pipeline {
    /// Pipeline built on DataFusion `DataFrame`s.
    DataFrame(DataFramePipeline),
    /// Pipeline built on Arrow `RecordBatch`es.
    RecordBatch(RecordBatchPipeline),
}
impl Pipeline {
    /// Runs the pipeline to completion by delegating to the wrapped
    /// backend's own `execute`.
    ///
    /// # Errors
    /// Propagates any error produced by the underlying pipeline.
    pub fn execute(&mut self) -> Result<()> {
        match self {
            Self::DataFrame(inner) => inner.execute(),
            Self::RecordBatch(inner) => inner.execute(),
        }
    }
}
/// Writes a set of Arrow record batches to `output_path` via the
/// DataFrame pipeline writer.
///
/// The batches are registered in a fresh DataFusion session, wrapped in a
/// [`pipeline::dataframe::DataFrameSource`], and handed to a
/// [`pipeline::dataframe::DataFrameWriter`] configured for the requested
/// output format.
///
/// # Arguments
/// * `batches` - the record batches to write.
/// * `output_path` - destination path for the output file.
/// * `output_file_type` - target serialization format.
/// * `sparse` - writer-specific sparsity flag (forwarded as-is).
/// * `json_pretty` - pretty-print JSON output (forwarded as-is).
///
/// # Errors
/// Returns an error if the batches cannot be registered with DataFusion or
/// if the writer step fails.
pub async fn write_batches(
    batches: Vec<arrow::record_batch::RecordBatch>,
    output_path: &str,
    output_file_type: FileType,
    sparse: bool,
    json_pretty: bool,
) -> eyre::Result<()> {
    let ctx = datafusion::execution::context::SessionContext::new();
    // `DataFusionError: Error + Send + Sync`, so `?` converts it into an
    // `eyre::Report` directly — unlike the former `eyre!("{e}")`, this
    // preserves the error's source chain instead of flattening it to a
    // display string.
    let df = ctx.read_batches(batches)?;
    let source = pipeline::dataframe::DataFrameSource::new(df);
    let writer_step = pipeline::dataframe::DataFrameWriter::new(
        output_path,
        output_file_type,
        sparse,
        json_pretty,
    );
    writer_step.execute(source).await?;
    Ok(())
}