use std::any::Any;
use std::fmt;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use std::task::Poll;

use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::{
    calculate_range, FileRange, ListingTableUrl, RangeCalculation,
};

use arrow::csv;
use arrow::datatypes::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Constraints, DataFusionError, Result, Statistics};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::source::DataSourceExec;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
};

use crate::file_format::CsvDecoder;
use datafusion_datasource::file_groups::FileGroup;
use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;

/// Old CSV execution plan, deprecated in favor of [`DataSourceExec`]
/// configured with a [`CsvSource`].
#[derive(Debug, Clone)]
#[deprecated(since = "46.0.0", note = "use DataSourceExec instead")]
pub struct CsvExec {
    base_config: FileScanConfig,
    inner: DataSourceExec,
}

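/// Builder for [`CsvExec`].
///
/// A minimal usage sketch, assuming a [`FileScanConfig`] named `config` has
/// been constructed elsewhere (hypothetical variable, for illustration):
///
/// ```ignore
/// let exec: CsvExec = CsvExecBuilder::new(config)
///     .with_has_header(true)
///     .with_delimeter(b',')
///     .with_quote(b'"')
///     .build();
/// ```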
#[derive(Debug, Clone)]
#[deprecated(since = "46.0.0", note = "use FileScanConfig instead")]
pub struct CsvExecBuilder {
    file_scan_config: FileScanConfig,
    file_compression_type: FileCompressionType,
    has_header: bool,
    delimiter: u8,
    quote: u8,
    terminator: Option<u8>,
    escape: Option<u8>,
    comment: Option<u8>,
    newlines_in_values: bool,
}

#[allow(unused, deprecated)]
impl CsvExecBuilder {
    /// Create a new builder from the given [`FileScanConfig`].
    pub fn new(file_scan_config: FileScanConfig) -> Self {
        Self {
            file_scan_config,
            has_header: false,
            delimiter: b',',
            quote: b'"',
            terminator: None,
            escape: None,
            comment: None,
            newlines_in_values: false,
            file_compression_type: FileCompressionType::UNCOMPRESSED,
        }
    }

    /// Set whether the first row of each file is a header.
    pub fn with_has_header(mut self, has_header: bool) -> Self {
        self.has_header = has_header;
        self
    }

    /// Set the field delimiter. The method name keeps its original
    /// (misspelled) form, since renaming it would break the public API.
    pub fn with_delimeter(mut self, delimiter: u8) -> Self {
        self.delimiter = delimiter;
        self
    }

    /// Set the quote character.
    pub fn with_quote(mut self, quote: u8) -> Self {
        self.quote = quote;
        self
    }

    /// Set the line terminator. If `None`, the default terminator is used.
    pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
        self.terminator = terminator;
        self
    }

    /// Set the escape character.
    pub fn with_escape(mut self, escape: Option<u8>) -> Self {
        self.escape = escape;
        self
    }

    /// Set the comment character. Lines starting with it are ignored.
    pub fn with_comment(mut self, comment: Option<u8>) -> Self {
        self.comment = comment;
        self
    }

    /// Set whether newlines may appear inside quoted values. Setting this to
    /// `true` ensures such values parse correctly, which can reduce
    /// performance when scanning files in parallel.
    pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
        self.newlines_in_values = newlines_in_values;
        self
    }

    /// Set the file compression type.
    pub fn with_file_compression_type(
        mut self,
        file_compression_type: FileCompressionType,
    ) -> Self {
        self.file_compression_type = file_compression_type;
        self
    }

    /// Build a [`CsvExec`] from this builder.
    #[must_use]
    pub fn build(self) -> CsvExec {
        let Self {
            file_scan_config: base_config,
            file_compression_type,
            has_header,
            delimiter,
            quote,
            terminator,
            escape,
            comment,
            newlines_in_values,
        } = self;

        let (
            projected_schema,
            projected_constraints,
            projected_statistics,
            projected_output_ordering,
        ) = base_config.project();
        let cache = CsvExec::compute_properties(
            projected_schema,
            &projected_output_ordering,
            projected_constraints,
            &base_config,
        );
        let csv = CsvSource::new(has_header, delimiter, quote)
            .with_comment(comment)
            .with_escape(escape)
            .with_terminator(terminator);
        let base_config = base_config
            .with_newlines_in_values(newlines_in_values)
            .with_file_compression_type(file_compression_type)
            .with_source(Arc::new(csv));

        CsvExec {
            inner: DataSourceExec::new(Arc::new(base_config.clone())),
            base_config,
        }
    }
}

#[allow(unused, deprecated)]
impl CsvExec {
    /// Create a new CSV reader execution plan.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        base_config: FileScanConfig,
        has_header: bool,
        delimiter: u8,
        quote: u8,
        terminator: Option<u8>,
        escape: Option<u8>,
        comment: Option<u8>,
        newlines_in_values: bool,
        file_compression_type: FileCompressionType,
    ) -> Self {
        CsvExecBuilder::new(base_config)
            .with_has_header(has_header)
            .with_delimeter(delimiter)
            .with_quote(quote)
            .with_terminator(terminator)
            .with_escape(escape)
            .with_comment(comment)
            .with_newlines_in_values(newlines_in_values)
            .with_file_compression_type(file_compression_type)
            .build()
    }

    /// Return a [`CsvExecBuilder`] for the given [`FileScanConfig`].
    pub fn builder(file_scan_config: FileScanConfig) -> CsvExecBuilder {
        CsvExecBuilder::new(file_scan_config)
    }

    /// Reference to the base config.
    pub fn base_config(&self) -> &FileScanConfig {
        &self.base_config
    }

    fn file_scan_config(&self) -> FileScanConfig {
        self.inner
            .data_source()
            .as_any()
            .downcast_ref::<FileScanConfig>()
            .unwrap()
            .clone()
    }

    fn csv_source(&self) -> CsvSource {
        let source = self.file_scan_config();
        source
            .file_source()
            .as_any()
            .downcast_ref::<CsvSource>()
            .unwrap()
            .clone()
    }

    /// True if the first line of each file is a header.
    pub fn has_header(&self) -> bool {
        self.csv_source().has_header()
    }

    /// True if newlines inside quoted values are supported.
    pub fn newlines_in_values(&self) -> bool {
        let source = self.file_scan_config();
        source.newlines_in_values()
    }

    fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning {
        Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
    }

    /// Compute the plan properties (schema, equivalences, ordering,
    /// partitioning, etc.) cached for this node.
    fn compute_properties(
        schema: SchemaRef,
        orderings: &[LexOrdering],
        constraints: Constraints,
        file_scan_config: &FileScanConfig,
    ) -> PlanProperties {
        let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings)
            .with_constraints(constraints);

        PlanProperties::new(
            eq_properties,
            Self::output_partitioning_helper(file_scan_config),
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }

    fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
        self.base_config.file_groups = file_groups.clone();
        // Keep the inner DataSourceExec in sync with the updated file groups.
        let mut file_source = self.file_scan_config();
        file_source = file_source.with_file_groups(file_groups);
        self.inner = self.inner.with_data_source(Arc::new(file_source));
        self
    }
}

#[allow(unused, deprecated)]
impl DisplayAs for CsvExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        self.inner.fmt_as(t, f)
    }
}

#[allow(unused, deprecated)]
impl ExecutionPlan for CsvExec {
    fn name(&self) -> &'static str {
        "CsvExec"
    }

    /// Return a reference to `Any` that can be used for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        self.inner.properties()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        // This is a leaf node and has no children.
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Redistribute files across partitions, delegating to the inner
    /// [`DataSourceExec`].
    fn repartitioned(
        &self,
        target_partitions: usize,
        config: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        self.inner.repartitioned(target_partitions, config)
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        self.inner.execute(partition, context)
    }

    fn statistics(&self) -> Result<Statistics> {
        self.inner.statistics()
    }

    fn metrics(&self) -> Option<MetricsSet> {
        self.inner.metrics()
    }

    fn fetch(&self) -> Option<usize> {
        self.inner.fetch()
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        self.inner.with_fetch(limit)
    }

    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        self.inner.try_swapping_with_projection(projection)
    }
}

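/// A configuration for the CSV file source, consumed by [`CsvOpener`] when
/// opening files.
///
/// A minimal sketch of wiring it into a [`DataSourceExec`], assuming a
/// [`FileScanConfig`] named `base_config` built elsewhere (hypothetical
/// variable, for illustration):
///
/// ```ignore
/// let csv = CsvSource::new(true, b',', b'"').with_comment(Some(b'#'));
/// let config = base_config.with_source(Arc::new(csv));
/// let exec = DataSourceExec::new(Arc::new(config));
/// ```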
#[derive(Debug, Clone, Default)]
pub struct CsvSource {
    batch_size: Option<usize>,
    file_schema: Option<SchemaRef>,
    file_projection: Option<Vec<usize>>,
    pub(crate) has_header: bool,
    delimiter: u8,
    quote: u8,
    terminator: Option<u8>,
    escape: Option<u8>,
    comment: Option<u8>,
    metrics: ExecutionPlanMetricsSet,
    projected_statistics: Option<Statistics>,
}

impl CsvSource {
    /// Returns a [`CsvSource`] with the given header, delimiter, and quote
    /// settings.
    pub fn new(has_header: bool, delimiter: u8, quote: u8) -> Self {
        Self {
            has_header,
            delimiter,
            quote,
            ..Self::default()
        }
    }

    /// True if the first line of each file is a header.
    pub fn has_header(&self) -> bool {
        self.has_header
    }

    /// The column delimiter.
    pub fn delimiter(&self) -> u8 {
        self.delimiter
    }

    /// The quote character.
    pub fn quote(&self) -> u8 {
        self.quote
    }

    /// The line terminator, if overridden.
    pub fn terminator(&self) -> Option<u8> {
        self.terminator
    }

    /// The comment character, if any.
    pub fn comment(&self) -> Option<u8> {
        self.comment
    }

    /// The escape character, if any.
    pub fn escape(&self) -> Option<u8> {
        self.escape
    }

    /// Initialize a CsvSource with an escape character.
    pub fn with_escape(&self, escape: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.escape = escape;
        conf
    }

    /// Initialize a CsvSource with a line terminator.
    pub fn with_terminator(&self, terminator: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.terminator = terminator;
        conf
    }

    /// Initialize a CsvSource with a comment character.
    pub fn with_comment(&self, comment: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.comment = comment;
        conf
    }
}

impl CsvSource {
    fn open<R: Read>(&self, reader: R) -> Result<csv::Reader<R>> {
        Ok(self.builder().build(reader)?)
    }

    /// Builds an Arrow [`csv::ReaderBuilder`] from this configuration. The
    /// file schema and batch size must be set before calling this.
    fn builder(&self) -> csv::ReaderBuilder {
        let mut builder = csv::ReaderBuilder::new(Arc::clone(
            self.file_schema
                .as_ref()
                .expect("Schema must be set before initializing builder"),
        ))
        .with_delimiter(self.delimiter)
        .with_batch_size(
            self.batch_size
                .expect("Batch size must be set before initializing builder"),
        )
        .with_header(self.has_header)
        .with_quote(self.quote);
        if let Some(terminator) = self.terminator {
            builder = builder.with_terminator(terminator);
        }
        if let Some(proj) = &self.file_projection {
            builder = builder.with_projection(proj.clone());
        }
        if let Some(escape) = self.escape {
            builder = builder.with_escape(escape)
        }
        if let Some(comment) = self.comment {
            builder = builder.with_comment(comment);
        }

        builder
    }
}
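/// A [`FileOpener`] that opens a CSV file and yields a [`FileOpenFuture`].
///
/// A minimal construction sketch, assuming an `object_store` handle exists
/// (hypothetical variable, for illustration):
///
/// ```ignore
/// let opener = CsvOpener::new(
///     Arc::new(CsvSource::new(true, b',', b'"')),
///     FileCompressionType::UNCOMPRESSED,
///     object_store,
/// );
/// ```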
pub struct CsvOpener {
    config: Arc<CsvSource>,
    file_compression_type: FileCompressionType,
    object_store: Arc<dyn ObjectStore>,
}

impl CsvOpener {
    /// Returns a [`CsvOpener`].
    pub fn new(
        config: Arc<CsvSource>,
        file_compression_type: FileCompressionType,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            config,
            file_compression_type,
            object_store,
        }
    }
}

impl FileSource for CsvSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        _partition: usize,
    ) -> Arc<dyn FileOpener> {
        Arc::new(CsvOpener {
            config: Arc::new(self.clone()),
            file_compression_type: base_config.file_compression_type,
            object_store,
        })
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.file_schema = Some(schema);
        Arc::new(conf)
    }

    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.projected_statistics = Some(statistics);
        Arc::new(conf)
    }

    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.file_projection = config.file_column_projection_indices();
        Arc::new(conf)
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn statistics(&self) -> Result<Statistics> {
        let statistics = &self.projected_statistics;
        Ok(statistics
            .clone()
            .expect("projected_statistics must be set"))
    }

    fn file_type(&self) -> &str {
        "csv"
    }

    fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, ", has_header={}", self.has_header)
            }
            DisplayFormatType::TreeRender => Ok(()),
        }
    }
}

impl FileOpener for CsvOpener {
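    /// Open a partitioned CSV file.
    ///
    /// If `file_meta.range` is `None`, the whole file is read as a single
    /// stream. Otherwise only the given byte range is read, and the header
    /// row is honored only for the range starting at byte offset zero (the
    /// logic below disables it for any later range).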
    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
        // The header is only relevant for the first chunk of a partitioned
        // file: a non-zero starting offset means the header line belongs to
        // an earlier range, so disable header handling here.
        let mut csv_has_header = self.config.has_header;
        if let Some(FileRange { start, .. }) = file_meta.range {
            if start != 0 {
                csv_has_header = false;
            }
        }

        let config = CsvSource {
            has_header: csv_has_header,
            ..(*self.config).clone()
        };

        let file_compression_type = self.file_compression_type.to_owned();

        if file_meta.range.is_some() {
            assert!(
                !file_compression_type.is_compressed(),
                "Reading compressed .csv in parallel is not supported"
            );
        }

        let store = Arc::clone(&self.object_store);
        let terminator = self.config.terminator;

        Ok(Box::pin(async move {
            // The assigned byte range may start or end mid-line;
            // `calculate_range` adjusts it so the partition covers only
            // whole lines.
            let calculated_range =
                calculate_range(&file_meta, &store, terminator).await?;

            let range = match calculated_range {
                RangeCalculation::Range(None) => None,
                RangeCalculation::Range(Some(range)) => Some(range.into()),
                RangeCalculation::TerminateEarly => {
                    return Ok(
                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                    )
                }
            };

            let options = GetOptions {
                range,
                ..Default::default()
            };

            let result = store.get_opts(file_meta.location(), options).await?;

            match result.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(mut file, _) => {
                    let is_whole_file_scanned = file_meta.range.is_none();
                    let decoder = if is_whole_file_scanned {
                        // Don't seek if there is no range, as that can break
                        // FIFO files.
                        file_compression_type.convert_read(file)?
                    } else {
                        file.seek(SeekFrom::Start(result.range.start as _))?;
                        file_compression_type.convert_read(
                            file.take((result.range.end - result.range.start) as u64),
                        )?
                    };

                    Ok(futures::stream::iter(config.open(decoder)?).boxed())
                }
                GetResultPayload::Stream(s) => {
                    let decoder = config.builder().build_decoder();
                    let s = s.map_err(DataFusionError::from);
                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();

                    Ok(deserialize_stream(
                        input,
                        DecoderDeserializer::new(CsvDecoder::new(decoder)),
                    ))
                }
            }
        }))
    }
}

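/// Executes `plan` and writes each output partition to a separate file named
/// `part-{i}.csv` under `path`.
///
/// A minimal usage sketch, assuming `task_ctx` and `physical_plan` exist
/// (hypothetical variables, for illustration):
///
/// ```ignore
/// plan_to_csv(task_ctx, physical_plan, "/tmp/csv_out").await?;
/// ```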
pub async fn plan_to_csv(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    let mut join_set = JoinSet::new();
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = Arc::clone(&store);
        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
        let filename = format!("{}/part-{i}.csv", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;

        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
        join_set.spawn(async move {
            let mut buf_writer = BufWriter::new(storeref, file.clone());
            let mut buffer = Vec::with_capacity(1024);
            // Write the header only for the first batch of each file.
            let mut write_headers = true;
            while let Some(batch) = stream.next().await.transpose()? {
                let mut writer = csv::WriterBuilder::new()
                    .with_header(write_headers)
                    .build(buffer);
                writer.write(&batch)?;
                // Reclaim the buffer and flush its contents to object storage.
                buffer = writer.into_inner();
                buf_writer.write_all(&buffer).await?;
                buffer.clear();
                write_headers = false;
            }
            buf_writer.shutdown().await.map_err(DataFusionError::from)
        });
    }

    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?, // propagate DataFusion error
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}