use std::any::Any;
use std::fmt;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use std::task::Poll;

use crate::file_format::CsvDecoder;

use arrow::csv;
use datafusion_common::config::CsvOptions;
use datafusion_common::{DataFusionError, Result};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
use datafusion_datasource::{
    as_file_source, calculate_range, FileRange, ListingTableUrl, PartitionedFile,
    RangeCalculation, TableSchema,
};
use datafusion_execution::TaskContext;
use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use datafusion_physical_plan::projection::ProjectionExprs;
use datafusion_physical_plan::{
    DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
};
use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;
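
/// A [`FileSource`] for reading CSV files.
///
/// Holds the CSV parsing options, the table schema, any pushed-down
/// projection, and the metrics shared by all partitions of a scan.
///
/// A minimal sketch of configuring a source (assuming `SchemaRef`
/// converts into [`TableSchema`], as the `impl Into<TableSchema>` bound
/// on [`CsvSource::new`] suggests):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::datatypes::{DataType, Field, Schema};
/// use datafusion_common::config::CsvOptions;
///
/// let schema = Arc::new(Schema::new(vec![
///     Field::new("city", DataType::Utf8, false),
///     Field::new("population", DataType::Int64, false),
/// ]));
/// let options = CsvOptions {
///     has_header: Some(true),
///     ..Default::default()
/// };
/// let source = CsvSource::new(schema).with_csv_options(options);
/// ```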
#[derive(Debug, Clone)]
pub struct CsvSource {
    /// CSV parsing options (delimiter, quote, header handling, ...)
    options: CsvOptions,
    /// Rows per output batch; must be set before files are opened
    batch_size: Option<usize>,
    /// Schema of the file plus any table partition columns
    table_schema: TableSchema,
    /// Pushed-down projection, split into file column indices and
    /// remaining expressions
    projection: SplitProjection,
    /// Execution metrics shared by all partitions of the scan
    metrics: ExecutionPlanMetricsSet,
}
impl CsvSource {
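    /// Creates a `CsvSource` for the given schema with default
    /// [`CsvOptions`] and an identity (unprojected) projection.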
pub fn new(table_schema: impl Into<TableSchema>) -> Self {
let table_schema = table_schema.into();
Self {
options: CsvOptions::default(),
projection: SplitProjection::unprojected(&table_schema),
table_schema,
batch_size: None,
metrics: ExecutionPlanMetricsSet::new(),
}
}
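
    /// Overrides all CSV parsing options at once.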
pub fn with_csv_options(mut self, options: CsvOptions) -> Self {
self.options = options;
self
}
    /// True if the first line of each file is a header row.
    /// Defaults to `true` when unset.
    pub fn has_header(&self) -> bool {
        self.options.has_header.unwrap_or(true)
    }

    /// True if rows with fewer fields than the schema are accepted
    /// (missing fields are read as null). Defaults to `false`.
    pub fn truncate_rows(&self) -> bool {
        self.options.truncated_rows.unwrap_or(false)
    }

    /// The field delimiter byte.
    pub fn delimiter(&self) -> u8 {
        self.options.delimiter
    }

    /// The quote byte.
    pub fn quote(&self) -> u8 {
        self.options.quote
    }

    /// The optional line terminator byte.
    pub fn terminator(&self) -> Option<u8> {
        self.options.terminator
    }

    /// The optional comment-prefix byte; lines starting with it are skipped.
    pub fn comment(&self) -> Option<u8> {
        self.options.comment
    }

    /// The optional escape byte.
    pub fn escape(&self) -> Option<u8> {
        self.options.escape
    }

    /// Returns a copy of this source with the given escape byte.
    pub fn with_escape(&self, escape: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.options.escape = escape;
        conf
    }

    /// Returns a copy of this source with the given line terminator.
    pub fn with_terminator(&self, terminator: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.options.terminator = terminator;
        conf
    }

    /// Returns a copy of this source with the given comment-prefix byte.
    pub fn with_comment(&self, comment: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.options.comment = comment;
        conf
    }

    /// Returns a copy of this source that accepts (or rejects) truncated rows.
    pub fn with_truncate_rows(&self, truncate_rows: bool) -> Self {
        let mut conf = self.clone();
        conf.options.truncated_rows = Some(truncate_rows);
        conf
    }

    /// True if quoted values may contain embedded newlines, which prevents
    /// scanning a file in parallel byte ranges. Defaults to `false`.
    pub fn newlines_in_values(&self) -> bool {
        self.options.newlines_in_values.unwrap_or(false)
    }
}
impl CsvSource {
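    /// Wraps `reader` in a configured synchronous [`csv::Reader`].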
fn open<R: Read>(&self, reader: R) -> Result<csv::Reader<R>> {
Ok(self.builder().build(reader)?)
}
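
    /// Builds an Arrow [`csv::ReaderBuilder`] from the configured options
    /// and the file-level projection.
    ///
    /// Panics if the batch size was never set.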
fn builder(&self) -> csv::ReaderBuilder {
let mut builder =
csv::ReaderBuilder::new(Arc::clone(self.table_schema.file_schema()))
.with_delimiter(self.delimiter())
.with_batch_size(
self.batch_size
.expect("Batch size must be set before initializing builder"),
)
.with_header(self.has_header())
.with_quote(self.quote())
.with_truncated_rows(self.truncate_rows());
if let Some(terminator) = self.terminator() {
builder = builder.with_terminator(terminator);
}
builder = builder.with_projection(self.projection.file_indices.clone());
if let Some(escape) = self.escape() {
builder = builder.with_escape(escape)
}
if let Some(comment) = self.comment() {
builder = builder.with_comment(comment);
}
builder
}
}
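/// A [`FileOpener`] that opens a single CSV file (or one byte range of it)
/// and decodes it into a stream of record batches.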
pub struct CsvOpener {
config: Arc<CsvSource>,
file_compression_type: FileCompressionType,
object_store: Arc<dyn ObjectStore>,
partition_index: usize,
}
impl CsvOpener {
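    /// Returns a new [`CsvOpener`] for partition 0; `create_file_opener`
    /// constructs openers with the correct partition index directly.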
pub fn new(
config: Arc<CsvSource>,
file_compression_type: FileCompressionType,
object_store: Arc<dyn ObjectStore>,
) -> Self {
Self {
config,
file_compression_type,
object_store,
partition_index: 0,
}
}
}
impl From<CsvSource> for Arc<dyn FileSource> {
fn from(source: CsvSource) -> Self {
as_file_source(source)
}
}
impl FileSource for CsvSource {
fn create_file_opener(
&self,
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition_index: usize,
) -> Result<Arc<dyn FileOpener>> {
let mut opener = Arc::new(CsvOpener {
config: Arc::new(self.clone()),
file_compression_type: base_config.file_compression_type,
object_store,
partition_index,
}) as Arc<dyn FileOpener>;
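        // Wrap the base opener so that any pushed-down projection
        // expressions are evaluated on the decoded batches.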
opener = ProjectionOpener::try_new(
self.projection.clone(),
Arc::clone(&opener),
self.table_schema.file_schema(),
)?;
Ok(opener)
}
fn as_any(&self) -> &dyn Any {
self
}
fn table_schema(&self) -> &TableSchema {
&self.table_schema
}
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
let mut conf = self.clone();
conf.batch_size = Some(batch_size);
Arc::new(conf)
}
fn try_pushdown_projection(
&self,
projection: &ProjectionExprs,
) -> Result<Option<Arc<dyn FileSource>>> {
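        // CSV scans can absorb any projection: merge it into the existing
        // one and re-split it into file column indices plus residual
        // expressions.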
let mut source = self.clone();
let new_projection = self.projection.source.try_merge(projection)?;
let split_projection =
SplitProjection::new(self.table_schema.file_schema(), &new_projection);
source.projection = split_projection;
Ok(Some(Arc::new(source)))
}
fn projection(&self) -> Option<&ProjectionExprs> {
Some(&self.projection.source)
}
fn metrics(&self) -> &ExecutionPlanMetricsSet {
&self.metrics
}
fn file_type(&self) -> &str {
"csv"
}
    fn supports_repartitioning(&self) -> bool {
        // Splitting a file into byte ranges is unsound when quoted values
        // may contain newlines, since range boundaries are line-based.
        !self.newlines_in_values()
    }
fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, ", has_header={}", self.has_header())
}
DisplayFormatType::TreeRender => Ok(()),
}
}
}
impl FileOpener for CsvOpener {
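    /// Opens `partitioned_file` and returns a stream of decoded batches.
    ///
    /// When a file is scanned in byte ranges, only the partition that
    /// starts at byte 0 parses a header row; all other partitions have
    /// header parsing disabled. Ranged (parallel) scans are only supported
    /// for uncompressed files.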
fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
let mut csv_has_header = self.config.has_header();
if let Some(FileRange { start, .. }) = partitioned_file.range
&& start != 0
{
csv_has_header = false;
}
let mut config = (*self.config).clone();
config.options.has_header = Some(csv_has_header);
config.options.truncated_rows = Some(config.truncate_rows());
let file_compression_type = self.file_compression_type.to_owned();
if partitioned_file.range.is_some() {
assert!(
!file_compression_type.is_compressed(),
"Reading compressed .csv in parallel is not supported"
);
}
let store = Arc::clone(&self.object_store);
let terminator = self.config.terminator();
let baseline_metrics =
BaselineMetrics::new(&self.config.metrics, self.partition_index);
Ok(Box::pin(async move {
let calculated_range =
calculate_range(&partitioned_file, &store, terminator).await?;
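            // `calculate_range` aligns the requested byte range to row
            // boundaries; it may determine that this partition holds no
            // complete rows at all.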
let range = match calculated_range {
RangeCalculation::Range(None) => None,
RangeCalculation::Range(Some(range)) => Some(range.into()),
RangeCalculation::TerminateEarly => {
return Ok(
futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
);
}
};
let options = GetOptions {
range,
..Default::default()
};
let result = store
.get_opts(&partitioned_file.object_meta.location, options)
.await?;
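            // Local files can be read and seeked directly; remote objects
            // arrive as a byte stream and are decoded incrementally.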
match result.payload {
#[cfg(not(target_arch = "wasm32"))]
GetResultPayload::File(mut file, _) => {
let is_whole_file_scanned = partitioned_file.range.is_none();
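                    // For a ranged scan, seek to the adjusted start and cap
                    // reads at the end of the range (compression is ruled
                    // out above, so byte offsets are meaningful).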
let decoder = if is_whole_file_scanned {
file_compression_type.convert_read(file)?
} else {
file.seek(SeekFrom::Start(result.range.start as _))?;
file_compression_type.convert_read(
file.take((result.range.end - result.range.start) as u64),
)?
};
let mut reader = config.open(decoder)?;
let iterator = std::iter::from_fn(move || {
let mut timer = baseline_metrics.elapsed_compute().timer();
let result = reader.next();
timer.stop();
result
});
Ok(futures::stream::iter(iterator)
.map(|r| r.map_err(Into::into))
.boxed())
}
GetResultPayload::Stream(s) => {
let decoder = config.builder().build_decoder();
let s = s.map_err(DataFusionError::from);
let input = file_compression_type.convert_stream(s.boxed())?.fuse();
let stream = deserialize_stream(
input,
DecoderDeserializer::new(CsvDecoder::new(decoder)),
);
Ok(stream.map_err(Into::into).boxed())
}
}
}))
}
}
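/// Executes `plan` and streams each output partition to a separate object
/// `<path>/part-<i>.csv` in the object store resolved from `path`.
///
/// A minimal sketch of a call site (assuming `task_ctx` and `plan` come
/// from an already-configured session):
///
/// ```ignore
/// plan_to_csv(task_ctx, plan, "file:///tmp/out").await?;
/// ```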
pub async fn plan_to_csv(
task_ctx: Arc<TaskContext>,
plan: Arc<dyn ExecutionPlan>,
path: impl AsRef<str>,
) -> Result<()> {
let path = path.as_ref();
let parsed = ListingTableUrl::parse(path)?;
let object_store_url = parsed.object_store();
let store = task_ctx.runtime_env().object_store(&object_store_url)?;
let writer_buffer_size = task_ctx
.session_config()
.options()
.execution
.objectstore_writer_buffer_size;
let mut join_set = JoinSet::new();
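    // Execute all partitions concurrently, writing each to its own object.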
for i in 0..plan.output_partitioning().partition_count() {
let storeref = Arc::clone(&store);
let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
let filename = format!("{}/part-{i}.csv", parsed.prefix());
let file = object_store::path::Path::parse(filename)?;
let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
join_set.spawn(async move {
let mut buf_writer =
BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size);
let mut buffer = Vec::with_capacity(1024);
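            // Write column headers only before the first batch of each file.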
let mut write_headers = true;
while let Some(batch) = stream.next().await.transpose()? {
let mut writer = csv::WriterBuilder::new()
.with_header(write_headers)
.build(buffer);
writer.write(&batch)?;
buffer = writer.into_inner();
buf_writer.write_all(&buffer).await?;
buffer.clear();
write_headers = false;
}
buf_writer.shutdown().await.map_err(DataFusionError::from)
});
}
    while let Some(result) = join_set.join_next().await {
        match result {
            // Propagate any DataFusion error returned by the task itself
            Ok(res) => res?,
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    // Tasks are never cancelled here, so a non-panic join
                    // error is impossible
                    unreachable!();
                }
            }
        }
    }
Ok(())
}