// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Execution plan for reading CSV files

use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
use datafusion_physical_plan::projection::ProjectionExprs;
use std::any::Any;
use std::fmt;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use std::task::Poll;

use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::{
    FileRange, ListingTableUrl, PartitionedFile, RangeCalculation, TableSchema,
    as_file_source, calculate_range,
};

use arrow::csv;
use datafusion_common::config::CsvOptions;
use datafusion_common::{DataFusionError, Result};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use datafusion_physical_plan::{
    DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
};

use crate::file_format::CsvDecoder;
use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;
/// A Config for [`CsvOpener`]
///
/// # Example: create a `DataSourceExec` for CSV
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::Schema;
/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
/// # use datafusion_datasource::PartitionedFile;
/// # use datafusion_datasource_csv::source::CsvSource;
/// # use datafusion_execution::object_store::ObjectStoreUrl;
/// # use datafusion_datasource::source::DataSourceExec;
/// # use datafusion_common::config::CsvOptions;
///
/// # let object_store_url = ObjectStoreUrl::local_filesystem();
/// # let file_schema = Arc::new(Schema::empty());
///
/// let options = CsvOptions {
///     has_header: Some(true),
///     delimiter: b',',
///     quote: b'"',
///     newlines_in_values: Some(true), // The file contains newlines in values
///     ..Default::default()
/// };
/// let source = Arc::new(CsvSource::new(file_schema.clone())
///     .with_csv_options(options)
///     .with_terminator(Some(b'#'))
/// );
/// // Create a DataSourceExec for reading the first 100MB of `file1.csv`
/// let config = FileScanConfigBuilder::new(object_store_url, source)
///     .with_file(PartitionedFile::new("file1.csv", 100*1024*1024))
///     .build();
/// let exec = DataSourceExec::from_data_source(config);
/// ```
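///
/// # Example: configure individual options with the `with_*` setters
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::Schema;
/// # use datafusion_datasource_csv::source::CsvSource;
/// # let file_schema = Arc::new(Schema::empty());
/// // A minimal sketch of chaining the builder-style setters; the byte
/// // values here are illustrative, not defaults.
/// let source = CsvSource::new(file_schema)
///     .with_comment(Some(b'#'))
///     .with_escape(Some(b'\\'))
///     .with_truncate_rows(true);
/// ```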
#[derive(Debug, Clone)]
pub struct CsvSource {
    options: CsvOptions,
    batch_size: Option<usize>,
    table_schema: TableSchema,
    projection: SplitProjection,
    metrics: ExecutionPlanMetricsSet,
}

impl CsvSource {
    /// Returns a [`CsvSource`]
    pub fn new(table_schema: impl Into<TableSchema>) -> Self {
        let table_schema = table_schema.into();
        Self {
            options: CsvOptions::default(),
            projection: SplitProjection::unprojected(&table_schema),
            table_schema,
            batch_size: None,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Sets the CSV options
    pub fn with_csv_options(mut self, options: CsvOptions) -> Self {
        self.options = options;
        self
    }

    /// true if the first line of each file is a header
    pub fn has_header(&self) -> bool {
        self.options.has_header.unwrap_or(true)
    }
    /// true if rows with fewer fields than the schema (truncated rows) are allowed
    pub fn truncate_rows(&self) -> bool {
        self.options.truncated_rows.unwrap_or(false)
    }

    /// A column delimiter
    pub fn delimiter(&self) -> u8 {
        self.options.delimiter
    }

    /// The quote character
    pub fn quote(&self) -> u8 {
        self.options.quote
    }

    /// The line terminator
    pub fn terminator(&self) -> Option<u8> {
        self.options.terminator
    }

    /// Lines beginning with this byte are ignored.
    pub fn comment(&self) -> Option<u8> {
        self.options.comment
    }

    /// The escape character
    pub fn escape(&self) -> Option<u8> {
        self.options.escape
    }

    /// Initialize a CsvSource with escape
    pub fn with_escape(&self, escape: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.options.escape = escape;
        conf
    }

    /// Initialize a CsvSource with terminator
    pub fn with_terminator(&self, terminator: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.options.terminator = terminator;
        conf
    }

    /// Initialize a CsvSource with comment
    pub fn with_comment(&self, comment: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.options.comment = comment;
        conf
    }

    /// Initialize a CsvSource with truncated rows support
    pub fn with_truncate_rows(&self, truncate_rows: bool) -> Self {
        let mut conf = self.clone();
        conf.options.truncated_rows = Some(truncate_rows);
        conf
    }

    /// Whether values may contain newline characters
    pub fn newlines_in_values(&self) -> bool {
        self.options.newlines_in_values.unwrap_or(false)
    }
}

impl CsvSource {
    fn open<R: Read>(&self, reader: R) -> Result<csv::Reader<R>> {
        Ok(self.builder().build(reader)?)
    }

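    /// Builds an arrow [`csv::ReaderBuilder`] from this source's options,
    /// batch size, and file-column projection.
    ///
    /// Panics if the batch size has not been set via
    /// [`FileSource::with_batch_size`].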
    fn builder(&self) -> csv::ReaderBuilder {
        let mut builder =
            csv::ReaderBuilder::new(Arc::clone(self.table_schema.file_schema()))
                .with_delimiter(self.delimiter())
                .with_batch_size(
                    self.batch_size
                        .expect("Batch size must be set before initializing builder"),
                )
                .with_header(self.has_header())
                .with_quote(self.quote())
                .with_truncated_rows(self.truncate_rows());
        if let Some(terminator) = self.terminator() {
            builder = builder.with_terminator(terminator);
        }
        builder = builder.with_projection(self.projection.file_indices.clone());
        if let Some(escape) = self.escape() {
            builder = builder.with_escape(escape);
        }
        if let Some(comment) = self.comment() {
            builder = builder.with_comment(comment);
        }

        builder
    }
}

/// A [`FileOpener`] that opens a CSV file and yields a [`FileOpenFuture`]
pub struct CsvOpener {
    config: Arc<CsvSource>,
    file_compression_type: FileCompressionType,
    object_store: Arc<dyn ObjectStore>,
    partition_index: usize,
}

impl CsvOpener {
    /// Returns a [`CsvOpener`]
    pub fn new(
        config: Arc<CsvSource>,
        file_compression_type: FileCompressionType,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            config,
            file_compression_type,
            object_store,
            partition_index: 0,
        }
    }
}
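
// Example (sketch): constructing an opener by hand, assuming an in-scope
// `store: Arc<dyn ObjectStore>` and a configured `source: Arc<CsvSource>`
// (both names are illustrative):
//
// let opener = CsvOpener::new(
//     Arc::clone(&source),
//     FileCompressionType::UNCOMPRESSED,
//     Arc::clone(&store),
// );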

impl From<CsvSource> for Arc<dyn FileSource> {
    fn from(source: CsvSource) -> Self {
        as_file_source(source)
    }
}

impl FileSource for CsvSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        partition_index: usize,
    ) -> Result<Arc<dyn FileOpener>> {
        let mut opener = Arc::new(CsvOpener {
            config: Arc::new(self.clone()),
            file_compression_type: base_config.file_compression_type,
            object_store,
            partition_index,
        }) as Arc<dyn FileOpener>;
        opener = ProjectionOpener::try_new(
            self.projection.clone(),
            Arc::clone(&opener),
            self.table_schema.file_schema(),
        )?;
        Ok(opener)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_schema(&self) -> &TableSchema {
        &self.table_schema
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

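    // Merge the pushed-down projection into the existing one, then re-split it
    // into the column indices read from the file and the remaining projection
    // applied above the scan.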
    fn try_pushdown_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn FileSource>>> {
        let mut source = self.clone();
        let new_projection = self.projection.source.try_merge(projection)?;
        let split_projection =
            SplitProjection::new(self.table_schema.file_schema(), &new_projection);
        source.projection = split_projection;
        Ok(Some(Arc::new(source)))
    }

    fn projection(&self) -> Option<&ProjectionExprs> {
        Some(&self.projection.source)
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn file_type(&self) -> &str {
        "csv"
    }

    fn supports_repartitioning(&self) -> bool {
        // Cannot repartition if values may contain newlines, as record
        // boundaries cannot be determined by byte offset alone
        !self.options.newlines_in_values.unwrap_or(false)
    }

    fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, ", has_header={}", self.has_header())
            }
            DisplayFormatType::TreeRender => Ok(()),
        }
    }
}

impl FileOpener for CsvOpener {
    /// Open a partitioned CSV file.
    ///
    /// If `partitioned_file.range` is `None`, the entire file is opened.
    /// If `partitioned_file.range` is `Some(FileRange {start, end})`, the partition
    /// corresponds to the byte range [start, end) within the file.
    ///
    /// Note: `start` or `end` might fall in the middle of a line. In such cases,
    /// the following rules determine which lines to read:
    /// 1. The first line read is the first line whose starting byte offset is >= `start`.
    /// 2. The last line read is the line containing the byte at offset `end - 1`.
    ///
    /// Examples:
    /// Consider the following partitions enclosed by braces `{}`:
    ///
    /// {A,1,2,3,4,5,6,7,8,9\n
    ///  A,1,2,3,4,5,6,7,8,9\n}
    ///  A,1,2,3,4,5,6,7,8,9\n
    ///  The lines read would be: [0, 1]
    ///
    ///  A,{1,2,3,4,5,6,7,8,9\n
    ///  A,1,2,3,4,5,6,7,8,9\n
    ///  A},1,2,3,4,5,6,7,8,9\n
    ///  The lines read would be: [1, 2]
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        // `self.config.has_header` controls whether to skip the header on the first line.
        // If the .csv file is read in parallel and this `CsvOpener` is only reading some
        // middle partition, then don't skip the first line.
        let mut csv_has_header = self.config.has_header();
        if let Some(FileRange { start, .. }) = partitioned_file.range
            && start != 0
        {
            csv_has_header = false;
        }

        let mut config = (*self.config).clone();
        config.options.has_header = Some(csv_has_header);
        config.options.truncated_rows = Some(config.truncate_rows());

        let file_compression_type = self.file_compression_type.to_owned();

        if partitioned_file.range.is_some() {
            assert!(
                !file_compression_type.is_compressed(),
                "Reading compressed .csv in parallel is not supported"
            );
        }

        let store = Arc::clone(&self.object_store);
        let terminator = self.config.terminator();

        let baseline_metrics =
            BaselineMetrics::new(&self.config.metrics, self.partition_index);

        Ok(Box::pin(async move {
            // The current partition contains bytes [start_byte, end_byte) and might
            // contain incomplete lines at its boundaries.

            let calculated_range =
                calculate_range(&partitioned_file, &store, terminator).await?;

            let range = match calculated_range {
                RangeCalculation::Range(None) => None,
                RangeCalculation::Range(Some(range)) => Some(range.into()),
                RangeCalculation::TerminateEarly => {
                    return Ok(
                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                    );
                }
            };

            let options = GetOptions {
                range,
                ..Default::default()
            };

            let result = store
                .get_opts(&partitioned_file.object_meta.location, options)
                .await?;

            match result.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(mut file, _) => {
                    let is_whole_file_scanned = partitioned_file.range.is_none();
                    let decoder = if is_whole_file_scanned {
                        // Don't seek if there is no range, as seeking breaks FIFO files
                        file_compression_type.convert_read(file)?
                    } else {
                        file.seek(SeekFrom::Start(result.range.start as _))?;
                        file_compression_type.convert_read(
                            file.take((result.range.end - result.range.start) as u64),
                        )?
                    };

                    let mut reader = config.open(decoder)?;

                    // Use std::iter::from_fn to time each call to the reader's next().
                    let iterator = std::iter::from_fn(move || {
                        let mut timer = baseline_metrics.elapsed_compute().timer();
                        let result = reader.next();
                        timer.stop();
                        result
                    });

                    Ok(futures::stream::iter(iterator)
                        .map(|r| r.map_err(Into::into))
                        .boxed())
                }
                GetResultPayload::Stream(s) => {
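                    // The payload is a remote byte stream that cannot be seeked, so
                    // decode record batches incrementally as the bytes arrive.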
                    let decoder = config.builder().build_decoder();
                    let s = s.map_err(DataFusionError::from);
                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();

                    let stream = deserialize_stream(
                        input,
                        DecoderDeserializer::new(CsvDecoder::new(decoder)),
                    );
                    Ok(stream.map_err(Into::into).boxed())
                }
            }
        }))
    }
}

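/// Executes `plan` and writes its results as CSV files under `path`.
///
/// One file named `part-{i}.csv` is written for each output partition `i` of
/// the plan; the partitions are written concurrently, and a header row is
/// written once at the start of each file.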
pub async fn plan_to_csv(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    let writer_buffer_size = task_ctx
        .session_config()
        .options()
        .execution
        .objectstore_writer_buffer_size;
    let mut join_set = JoinSet::new();
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = Arc::clone(&store);
        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
        let filename = format!("{}/part-{i}.csv", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;

        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
        join_set.spawn(async move {
            let mut buf_writer =
                BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size);
            let mut buffer = Vec::with_capacity(1024);
            // only write headers on first iteration
            let mut write_headers = true;
            while let Some(batch) = stream.next().await.transpose()? {
                let mut writer = csv::WriterBuilder::new()
                    .with_header(write_headers)
                    .build(buffer);
                writer.write(&batch)?;
                buffer = writer.into_inner();
                buf_writer.write_all(&buffer).await?;
                buffer.clear();
                // prevent writing headers more than once
                write_headers = false;
            }
            buf_writer.shutdown().await.map_err(DataFusionError::from)
        });
    }

    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?, // propagate DataFusion error
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}