// datafusion_datasource_csv/source.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Execution plan for reading CSV files

use std::any::Any;
use std::fmt;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use std::task::Poll;

use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::{
    calculate_range, FileRange, ListingTableUrl, RangeCalculation,
};

use arrow::csv;
use arrow::datatypes::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{Constraints, DataFusionError, Result, Statistics};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::source::DataSourceExec;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
};

use crate::file_format::CsvDecoder;
use datafusion_datasource::file_groups::FileGroup;
use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;

/// Old CSV execution plan, deprecated in favor of [`DataSourceExec`] and [`CsvSource`]
///
/// See examples on `CsvSource`
#[derive(Debug, Clone)]
#[deprecated(since = "46.0.0", note = "use DataSourceExec instead")]
pub struct CsvExec {
    base_config: FileScanConfig,
    inner: DataSourceExec,
}

/// Builder for [`CsvExec`].
///
/// See example on [`CsvExec`].
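///
/// A minimal illustrative sketch (assumes a `FileScanConfig` named `config`
/// has already been built elsewhere):
///
/// ```rust,ignore
/// // Hypothetical `config: FileScanConfig` built beforehand
/// let exec = CsvExecBuilder::new(config)
///     .with_has_header(true)
///     .with_delimeter(b'|')
///     .with_quote(b'"')
///     .build();
/// ```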
#[derive(Debug, Clone)]
#[deprecated(since = "46.0.0", note = "use FileScanConfig instead")]
pub struct CsvExecBuilder {
    file_scan_config: FileScanConfig,
    file_compression_type: FileCompressionType,
    // TODO: it seems like these format options could be reused across all the various CSV config
    has_header: bool,
    delimiter: u8,
    quote: u8,
    terminator: Option<u8>,
    escape: Option<u8>,
    comment: Option<u8>,
    newlines_in_values: bool,
}

#[allow(unused, deprecated)]
impl CsvExecBuilder {
    /// Create a new builder to read the provided file scan configuration.
    pub fn new(file_scan_config: FileScanConfig) -> Self {
        Self {
            file_scan_config,
            // TODO: these defaults are duplicated from `CsvOptions` - should they be computed?
            has_header: false,
            delimiter: b',',
            quote: b'"',
            terminator: None,
            escape: None,
            comment: None,
            newlines_in_values: false,
            file_compression_type: FileCompressionType::UNCOMPRESSED,
        }
    }

    /// Set whether the first row defines the column names.
    ///
    /// The default value is `false`.
    pub fn with_has_header(mut self, has_header: bool) -> Self {
        self.has_header = has_header;
        self
    }

    /// Set the column delimiter.
    ///
    /// The default is `,`.
    // Note: `with_delimeter` (sic) is the established public name of this method.
    pub fn with_delimeter(mut self, delimiter: u8) -> Self {
        self.delimiter = delimiter;
        self
    }

    /// Set the quote character.
    ///
    /// The default is `"`.
    pub fn with_quote(mut self, quote: u8) -> Self {
        self.quote = quote;
        self
    }

    /// Set the line terminator. If not set, CRLF is used.
    ///
    /// The default is `None`.
    pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
        self.terminator = terminator;
        self
    }

    /// Set the escape character.
    ///
    /// The default is `None` (i.e. quotes cannot be escaped).
    pub fn with_escape(mut self, escape: Option<u8>) -> Self {
        self.escape = escape;
        self
    }

    /// Set the comment character.
    ///
    /// The default is `None` (i.e. comments are not supported).
    pub fn with_comment(mut self, comment: Option<u8>) -> Self {
        self.comment = comment;
        self
    }

    /// Set whether newlines in (quoted) values are supported.
    ///
    /// Parsing newlines in quoted values may be affected by execution behaviour such as
    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
    /// parsed successfully, which may reduce performance.
    ///
    /// The default value is `false`.
    pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
        self.newlines_in_values = newlines_in_values;
        self
    }

    /// Set the file compression type.
    ///
    /// The default is [`FileCompressionType::UNCOMPRESSED`].
    pub fn with_file_compression_type(
        mut self,
        file_compression_type: FileCompressionType,
    ) -> Self {
        self.file_compression_type = file_compression_type;
        self
    }

    /// Build a [`CsvExec`].
    #[must_use]
    pub fn build(self) -> CsvExec {
        let Self {
            file_scan_config: base_config,
            file_compression_type,
            has_header,
            delimiter,
            quote,
            terminator,
            escape,
            comment,
            newlines_in_values,
        } = self;

        let (
            projected_schema,
            projected_constraints,
            projected_statistics,
            projected_output_ordering,
        ) = base_config.project();
        let cache = CsvExec::compute_properties(
            projected_schema,
            &projected_output_ordering,
            projected_constraints,
            &base_config,
        );
        let csv = CsvSource::new(has_header, delimiter, quote)
            .with_comment(comment)
            .with_escape(escape)
            .with_terminator(terminator);
        let base_config = base_config
            .with_newlines_in_values(newlines_in_values)
            .with_file_compression_type(file_compression_type)
            .with_source(Arc::new(csv));

        CsvExec {
            inner: DataSourceExec::new(Arc::new(base_config.clone())),
            base_config,
        }
    }
}

#[allow(unused, deprecated)]
impl CsvExec {
    /// Create a new CSV reader execution plan from the provided base and
    /// CSV-specific configurations
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        base_config: FileScanConfig,
        has_header: bool,
        delimiter: u8,
        quote: u8,
        terminator: Option<u8>,
        escape: Option<u8>,
        comment: Option<u8>,
        newlines_in_values: bool,
        file_compression_type: FileCompressionType,
    ) -> Self {
        CsvExecBuilder::new(base_config)
            .with_has_header(has_header)
            .with_delimeter(delimiter)
            .with_quote(quote)
            .with_terminator(terminator)
            .with_escape(escape)
            .with_comment(comment)
            .with_newlines_in_values(newlines_in_values)
            .with_file_compression_type(file_compression_type)
            .build()
    }

    /// Return a [`CsvExecBuilder`].
    ///
    /// See examples on [`CsvExec`] and [`CsvExecBuilder`] for specifying CSV table options.
    pub fn builder(file_scan_config: FileScanConfig) -> CsvExecBuilder {
        CsvExecBuilder::new(file_scan_config)
    }

    /// Returns a reference to the base configuration
    pub fn base_config(&self) -> &FileScanConfig {
        &self.base_config
    }

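    /// Returns the [`FileScanConfig`] held by the inner [`DataSourceExec`]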
    fn file_scan_config(&self) -> FileScanConfig {
        self.inner
            .data_source()
            .as_any()
            .downcast_ref::<FileScanConfig>()
            .unwrap()
            .clone()
    }

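    /// Returns the [`CsvSource`] held by the inner [`FileScanConfig`]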
    fn csv_source(&self) -> CsvSource {
        let source = self.file_scan_config();
        source
            .file_source()
            .as_any()
            .downcast_ref::<CsvSource>()
            .unwrap()
            .clone()
    }

    /// true if the first line of each file is a header
    pub fn has_header(&self) -> bool {
        self.csv_source().has_header()
    }

    /// Specifies whether newlines in (quoted) values are supported.
    ///
    /// Parsing newlines in quoted values may be affected by execution behaviour such as
    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
    /// parsed successfully, which may reduce performance.
    ///
    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
    pub fn newlines_in_values(&self) -> bool {
        let source = self.file_scan_config();
        source.newlines_in_values()
    }

    fn output_partitioning_helper(file_scan_config: &FileScanConfig) -> Partitioning {
        Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
    }

    /// This function creates the cache object that stores the plan properties
    /// such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        schema: SchemaRef,
        orderings: &[LexOrdering],
        constraints: Constraints,
        file_scan_config: &FileScanConfig,
    ) -> PlanProperties {
        // Equivalence Properties
        let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings)
            .with_constraints(constraints);

        PlanProperties::new(
            eq_properties,
            Self::output_partitioning_helper(file_scan_config), // Output Partitioning
            EmissionType::Incremental,
            Boundedness::Bounded,
        )
    }

    fn with_file_groups(mut self, file_groups: Vec<FileGroup>) -> Self {
        self.base_config.file_groups = file_groups.clone();
        let mut file_source = self.file_scan_config();
        file_source = file_source.with_file_groups(file_groups);
        self.inner = self.inner.with_data_source(Arc::new(file_source));
        self
    }
}

#[allow(unused, deprecated)]
impl DisplayAs for CsvExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        self.inner.fmt_as(t, f)
    }
}

#[allow(unused, deprecated)]
impl ExecutionPlan for CsvExec {
    fn name(&self) -> &'static str {
        "CsvExec"
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        self.inner.properties()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        // this is a leaf node and has no children
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    /// Redistribute files across partitions according to their size.
    /// See comments on `FileGroupPartitioner` for more detail.
    ///
    /// Returns `None` if the plan cannot be repartitioned (empty, compressed
    /// file, or `newlines_in_values` set).
    fn repartitioned(
        &self,
        target_partitions: usize,
        config: &ConfigOptions,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        self.inner.repartitioned(target_partitions, config)
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        self.inner.execute(partition, context)
    }

    fn statistics(&self) -> Result<Statistics> {
        self.inner.statistics()
    }

    fn metrics(&self) -> Option<MetricsSet> {
        self.inner.metrics()
    }

    fn fetch(&self) -> Option<usize> {
        self.inner.fetch()
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        self.inner.with_fetch(limit)
    }

    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExec,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        self.inner.try_swapping_with_projection(projection)
    }
}

/// A Config for [`CsvOpener`]
///
/// # Example: create a `DataSourceExec` for CSV
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::Schema;
/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
/// # use datafusion_datasource::PartitionedFile;
/// # use datafusion_datasource_csv::source::CsvSource;
/// # use datafusion_execution::object_store::ObjectStoreUrl;
/// # use datafusion_datasource::source::DataSourceExec;
///
/// # let object_store_url = ObjectStoreUrl::local_filesystem();
/// # let file_schema = Arc::new(Schema::empty());
///
/// let source = Arc::new(
///     CsvSource::new(true, b',', b'"').with_terminator(Some(b'#')),
/// );
/// // Create a DataSourceExec for reading the first 100MB of `file1.csv`
/// let config = FileScanConfigBuilder::new(object_store_url, file_schema, source)
///     .with_file(PartitionedFile::new("file1.csv", 100 * 1024 * 1024))
///     .with_newlines_in_values(true) // The file contains newlines in values
///     .build();
/// let exec = DataSourceExec::from_data_source(config);
/// ```
#[derive(Debug, Clone, Default)]
pub struct CsvSource {
    batch_size: Option<usize>,
    file_schema: Option<SchemaRef>,
    file_projection: Option<Vec<usize>>,
    pub(crate) has_header: bool,
    delimiter: u8,
    quote: u8,
    terminator: Option<u8>,
    escape: Option<u8>,
    comment: Option<u8>,
    metrics: ExecutionPlanMetricsSet,
    projected_statistics: Option<Statistics>,
}

impl CsvSource {
    /// Returns a [`CsvSource`]
    pub fn new(has_header: bool, delimiter: u8, quote: u8) -> Self {
        Self {
            has_header,
            delimiter,
            quote,
            ..Self::default()
        }
    }

    /// true if the first line of each file is a header
    pub fn has_header(&self) -> bool {
        self.has_header
    }

    /// The column delimiter
    pub fn delimiter(&self) -> u8 {
        self.delimiter
    }

    /// The quote character
    pub fn quote(&self) -> u8 {
        self.quote
    }

    /// The line terminator
    pub fn terminator(&self) -> Option<u8> {
        self.terminator
    }

    /// Lines beginning with this byte are ignored.
    pub fn comment(&self) -> Option<u8> {
        self.comment
    }

    /// The escape character
    pub fn escape(&self) -> Option<u8> {
        self.escape
    }

    /// Returns a copy of this [`CsvSource`] with the provided escape character
    pub fn with_escape(&self, escape: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.escape = escape;
        conf
    }

    /// Returns a copy of this [`CsvSource`] with the provided line terminator
    pub fn with_terminator(&self, terminator: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.terminator = terminator;
        conf
    }

    /// Returns a copy of this [`CsvSource`] with the provided comment character
    pub fn with_comment(&self, comment: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.comment = comment;
        conf
    }
}

impl CsvSource {
    fn open<R: Read>(&self, reader: R) -> Result<csv::Reader<R>> {
        Ok(self.builder().build(reader)?)
    }

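    /// Builds an arrow [`csv::ReaderBuilder`] configured with this source's
    /// options. `file_schema` and `batch_size` must be set before calling this.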
    fn builder(&self) -> csv::ReaderBuilder {
        let mut builder = csv::ReaderBuilder::new(Arc::clone(
            self.file_schema
                .as_ref()
                .expect("Schema must be set before initializing builder"),
        ))
        .with_delimiter(self.delimiter)
        .with_batch_size(
            self.batch_size
                .expect("Batch size must be set before initializing builder"),
        )
        .with_header(self.has_header)
        .with_quote(self.quote);
        if let Some(terminator) = self.terminator {
            builder = builder.with_terminator(terminator);
        }
        if let Some(proj) = &self.file_projection {
            builder = builder.with_projection(proj.clone());
        }
        if let Some(escape) = self.escape {
            builder = builder.with_escape(escape);
        }
        if let Some(comment) = self.comment {
            builder = builder.with_comment(comment);
        }

        builder
    }
}

/// A [`FileOpener`] that opens a CSV file and yields a [`FileOpenFuture`]
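///
/// A minimal construction sketch (the in-memory [`ObjectStore`] is purely
/// illustrative; any store works):
///
/// ```rust,ignore
/// use std::sync::Arc;
/// use object_store::memory::InMemory;
///
/// let source = Arc::new(CsvSource::new(true, b',', b'"'));
/// let opener = CsvOpener::new(
///     source,
///     FileCompressionType::UNCOMPRESSED,
///     Arc::new(InMemory::new()), // hypothetical store for illustration
/// );
/// ```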
pub struct CsvOpener {
    config: Arc<CsvSource>,
    file_compression_type: FileCompressionType,
    object_store: Arc<dyn ObjectStore>,
}

impl CsvOpener {
    /// Returns a [`CsvOpener`]
    pub fn new(
        config: Arc<CsvSource>,
        file_compression_type: FileCompressionType,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            config,
            file_compression_type,
            object_store,
        }
    }
}

impl FileSource for CsvSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        _partition: usize,
    ) -> Arc<dyn FileOpener> {
        Arc::new(CsvOpener {
            config: Arc::new(self.clone()),
            file_compression_type: base_config.file_compression_type,
            object_store,
        })
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.file_schema = Some(schema);
        Arc::new(conf)
    }

    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.projected_statistics = Some(statistics);
        Arc::new(conf)
    }

    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.file_projection = config.file_column_projection_indices();
        Arc::new(conf)
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn statistics(&self) -> Result<Statistics> {
        let statistics = &self.projected_statistics;
        Ok(statistics
            .clone()
            .expect("projected_statistics must be set"))
    }

    fn file_type(&self) -> &str {
        "csv"
    }

    fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, ", has_header={}", self.has_header)
            }
            DisplayFormatType::TreeRender => Ok(()),
        }
    }
}

impl FileOpener for CsvOpener {
    /// Open a partitioned CSV file.
    ///
    /// If `file_meta.range` is `None`, the entire file is opened.
    /// If `file_meta.range` is `Some(FileRange {start, end})`, this signifies that the partition
    /// corresponds to the byte range [start, end) within the file.
    ///
    /// Note: `start` or `end` might be in the middle of some lines. In such cases, the following rules
    /// are applied to determine which lines to read:
    /// 1. The first line of the partition is the line in which the index of the first character >= `start`.
    /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides.
    ///
    /// Examples:
    /// Consider the following partitions enclosed by braces `{}`:
    ///
    /// {A,1,2,3,4,5,6,7,8,9\n
    ///  A,1,2,3,4,5,6,7,8,9\n}
    ///  A,1,2,3,4,5,6,7,8,9\n
    ///  The lines read would be: [0, 1]
    ///
    ///  A,{1,2,3,4,5,6,7,8,9\n
    ///  A,1,2,3,4,5,6,7,8,9\n
    ///  A},1,2,3,4,5,6,7,8,9\n
    ///  The lines read would be: [1, 2]
    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
        // `self.config.has_header` controls whether to skip reading the 1st line header
        // If the .csv file is read in parallel and this `CsvOpener` is only reading some middle
        // partition, then don't skip first line
        let mut csv_has_header = self.config.has_header;
        if let Some(FileRange { start, .. }) = file_meta.range {
            if start != 0 {
                csv_has_header = false;
            }
        }

        let config = CsvSource {
            has_header: csv_has_header,
            ..(*self.config).clone()
        };

        let file_compression_type = self.file_compression_type.to_owned();

        if file_meta.range.is_some() {
            assert!(
                !file_compression_type.is_compressed(),
                "Reading compressed .csv in parallel is not supported"
            );
        }

        let store = Arc::clone(&self.object_store);
        let terminator = self.config.terminator;

        Ok(Box::pin(async move {
            // Current partition contains bytes [start_byte, end_byte)
            // (might contain incomplete lines at boundaries)

            let calculated_range =
                calculate_range(&file_meta, &store, terminator).await?;

            let range = match calculated_range {
                RangeCalculation::Range(None) => None,
                RangeCalculation::Range(Some(range)) => Some(range.into()),
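                // An empty calculated range means this partition has no rows
                // to read, so return an empty stream immediately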
                RangeCalculation::TerminateEarly => {
                    return Ok(
                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                    )
                }
            };

            let options = GetOptions {
                range,
                ..Default::default()
            };

            let result = store.get_opts(file_meta.location(), options).await?;

            match result.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(mut file, _) => {
                    let is_whole_file_scanned = file_meta.range.is_none();
                    let decoder = if is_whole_file_scanned {
                        // Don't seek if there is no range, as doing so breaks FIFO files
                        file_compression_type.convert_read(file)?
                    } else {
                        file.seek(SeekFrom::Start(result.range.start as _))?;
                        file_compression_type.convert_read(
                            file.take((result.range.end - result.range.start) as u64),
                        )?
                    };

                    Ok(futures::stream::iter(config.open(decoder)?).boxed())
                }
                GetResultPayload::Stream(s) => {
                    let decoder = config.builder().build_decoder();
                    let s = s.map_err(DataFusionError::from);
                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();

                    Ok(deserialize_stream(
                        input,
                        DecoderDeserializer::new(CsvDecoder::new(decoder)),
                    ))
                }
            }
        }))
    }
}

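/// Executes `plan` and streams each output partition to a separate CSV file
/// (`part-{i}.csv`) under `path`.
///
/// A hedged usage sketch (the task context, plan, and output URL are
/// hypothetical placeholders built elsewhere):
///
/// ```rust,ignore
/// // Assumes `task_ctx: Arc<TaskContext>` and `plan: Arc<dyn ExecutionPlan>`
/// plan_to_csv(task_ctx, plan, "file:///tmp/csv_out").await?;
/// ```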
pub async fn plan_to_csv(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    let mut join_set = JoinSet::new();
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = Arc::clone(&store);
        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
        let filename = format!("{}/part-{i}.csv", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;

        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
        join_set.spawn(async move {
            let mut buf_writer = BufWriter::new(storeref, file.clone());
            let mut buffer = Vec::with_capacity(1024);
            // only write headers on first iteration
            let mut write_headers = true;
            while let Some(batch) = stream.next().await.transpose()? {
                let mut writer = csv::WriterBuilder::new()
                    .with_header(write_headers)
                    .build(buffer);
                writer.write(&batch)?;
                buffer = writer.into_inner();
                buf_writer.write_all(&buffer).await?;
                buffer.clear();
                // prevent writing headers more than once
                write_headers = false;
            }
            buf_writer.shutdown().await.map_err(DataFusionError::from)
        });
    }

    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?, // propagate DataFusion error
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}