datafusion_datasource_csv/
source.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Execution plan for reading CSV files
19
20use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
21use std::any::Any;
22use std::fmt;
23use std::io::{Read, Seek, SeekFrom};
24use std::sync::Arc;
25use std::task::Poll;
26
27use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
28use datafusion_datasource::file_compression_type::FileCompressionType;
29use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
30use datafusion_datasource::{
31    as_file_source, calculate_range, FileRange, ListingTableUrl, PartitionedFile,
32    RangeCalculation, TableSchema,
33};
34
35use arrow::csv;
36use arrow::datatypes::SchemaRef;
37use datafusion_common::{DataFusionError, Result, Statistics};
38use datafusion_common_runtime::JoinSet;
39use datafusion_datasource::file::FileSource;
40use datafusion_datasource::file_scan_config::FileScanConfig;
41use datafusion_execution::TaskContext;
42use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
43use datafusion_physical_plan::{
44    DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
45};
46
47use crate::file_format::CsvDecoder;
48use futures::{StreamExt, TryStreamExt};
49use object_store::buffered::BufWriter;
50use object_store::{GetOptions, GetResultPayload, ObjectStore};
51use tokio::io::AsyncWriteExt;
52
/// A Config for [`CsvOpener`]
///
/// Stores the CSV parsing options (header, delimiter, quote, terminator,
/// escape, comment, row truncation) together with the schema, projection and
/// batch size that are handed to the underlying [`csv::ReaderBuilder`]. It
/// implements [`FileSource`], whose `create_file_opener` wraps a clone of this
/// configuration in a [`CsvOpener`].
///
/// # Example: create a `DataSourceExec` for CSV
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::Schema;
/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
/// # use datafusion_datasource::PartitionedFile;
/// # use datafusion_datasource_csv::source::CsvSource;
/// # use datafusion_execution::object_store::ObjectStoreUrl;
/// # use datafusion_datasource::source::DataSourceExec;
///
/// # let object_store_url = ObjectStoreUrl::local_filesystem();
/// # let file_schema = Arc::new(Schema::empty());
///
/// let source = Arc::new(CsvSource::new(
///         true,
///         b',',
///         b'"',
///     )
///     .with_terminator(Some(b'#')
/// ));
/// // Create a DataSourceExec for reading the first 100MB of `file1.csv`
/// let config = FileScanConfigBuilder::new(object_store_url, file_schema, source)
///     .with_file(PartitionedFile::new("file1.csv", 100*1024*1024))
///     .with_newlines_in_values(true) // The file contains newlines in values;
///     .build();
/// let exec = (DataSourceExec::from_data_source(config));
/// ```
#[derive(Debug, Clone, Default)]
pub struct CsvSource {
    /// Batch size passed to the reader builder. Set through
    /// `FileSource::with_batch_size`; building a reader panics while it is `None`.
    batch_size: Option<usize>,
    /// Schema passed to the reader builder. Set through
    /// `FileSource::with_schema`; building a reader panics while it is `None`.
    file_schema: Option<SchemaRef>,
    /// Optional column projection passed to the reader builder. Set through
    /// `FileSource::with_projection` from the scan config.
    file_projection: Option<Vec<usize>>,
    /// true if the first line of each file is a header
    pub(crate) has_header: bool,
    /// The column delimiter
    delimiter: u8,
    /// The quote character
    quote: u8,
    /// Optional line terminator; only forwarded to the reader builder when `Some`
    terminator: Option<u8>,
    /// Optional escape character; only forwarded to the reader builder when `Some`
    escape: Option<u8>,
    /// Lines beginning with this byte are ignored; only forwarded to the reader
    /// builder when `Some`
    comment: Option<u8>,
    /// Metrics set handed out by `FileSource::metrics`
    metrics: ExecutionPlanMetricsSet,
    /// Statistics stored by `FileSource::with_statistics` and returned by
    /// `FileSource::statistics`
    projected_statistics: Option<Statistics>,
    /// Optional schema adapter factory stored by
    /// `FileSource::with_schema_adapter_factory`
    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
    /// Forwarded to the reader builder's `with_truncated_rows` option
    truncate_rows: bool,
}
98
99impl CsvSource {
100    /// Returns a [`CsvSource`]
101    pub fn new(has_header: bool, delimiter: u8, quote: u8) -> Self {
102        Self {
103            has_header,
104            delimiter,
105            quote,
106            ..Self::default()
107        }
108    }
109
110    /// true if the first line of each file is a header
111    pub fn has_header(&self) -> bool {
112        self.has_header
113    }
114
115    // true if rows length support truncate
116    pub fn truncate_rows(&self) -> bool {
117        self.truncate_rows
118    }
119    /// A column delimiter
120    pub fn delimiter(&self) -> u8 {
121        self.delimiter
122    }
123
124    /// The quote character
125    pub fn quote(&self) -> u8 {
126        self.quote
127    }
128
129    /// The line terminator
130    pub fn terminator(&self) -> Option<u8> {
131        self.terminator
132    }
133
134    /// Lines beginning with this byte are ignored.
135    pub fn comment(&self) -> Option<u8> {
136        self.comment
137    }
138
139    /// The escape character
140    pub fn escape(&self) -> Option<u8> {
141        self.escape
142    }
143
144    /// Initialize a CsvSource with escape
145    pub fn with_escape(&self, escape: Option<u8>) -> Self {
146        let mut conf = self.clone();
147        conf.escape = escape;
148        conf
149    }
150
151    /// Initialize a CsvSource with terminator
152    pub fn with_terminator(&self, terminator: Option<u8>) -> Self {
153        let mut conf = self.clone();
154        conf.terminator = terminator;
155        conf
156    }
157
158    /// Initialize a CsvSource with comment
159    pub fn with_comment(&self, comment: Option<u8>) -> Self {
160        let mut conf = self.clone();
161        conf.comment = comment;
162        conf
163    }
164
165    /// Whether to support truncate rows when read csv file
166    pub fn with_truncate_rows(&self, truncate_rows: bool) -> Self {
167        let mut conf = self.clone();
168        conf.truncate_rows = truncate_rows;
169        conf
170    }
171}
172
173impl CsvSource {
174    fn open<R: Read>(&self, reader: R) -> Result<csv::Reader<R>> {
175        Ok(self.builder().build(reader)?)
176    }
177
178    fn builder(&self) -> csv::ReaderBuilder {
179        let mut builder = csv::ReaderBuilder::new(Arc::clone(
180            self.file_schema
181                .as_ref()
182                .expect("Schema must be set before initializing builder"),
183        ))
184        .with_delimiter(self.delimiter)
185        .with_batch_size(
186            self.batch_size
187                .expect("Batch size must be set before initializing builder"),
188        )
189        .with_header(self.has_header)
190        .with_quote(self.quote)
191        .with_truncated_rows(self.truncate_rows);
192        if let Some(terminator) = self.terminator {
193            builder = builder.with_terminator(terminator);
194        }
195        if let Some(proj) = &self.file_projection {
196            builder = builder.with_projection(proj.clone());
197        }
198        if let Some(escape) = self.escape {
199            builder = builder.with_escape(escape)
200        }
201        if let Some(comment) = self.comment {
202            builder = builder.with_comment(comment);
203        }
204
205        builder
206    }
207}
208
209/// A [`FileOpener`] that opens a CSV file and yields a [`FileOpenFuture`]
210pub struct CsvOpener {
211    config: Arc<CsvSource>,
212    file_compression_type: FileCompressionType,
213    object_store: Arc<dyn ObjectStore>,
214}
215
216impl CsvOpener {
217    /// Returns a [`CsvOpener`]
218    pub fn new(
219        config: Arc<CsvSource>,
220        file_compression_type: FileCompressionType,
221        object_store: Arc<dyn ObjectStore>,
222    ) -> Self {
223        Self {
224            config,
225            file_compression_type,
226            object_store,
227        }
228    }
229}
230
231impl From<CsvSource> for Arc<dyn FileSource> {
232    fn from(source: CsvSource) -> Self {
233        as_file_source(source)
234    }
235}
236
237impl FileSource for CsvSource {
238    fn create_file_opener(
239        &self,
240        object_store: Arc<dyn ObjectStore>,
241        base_config: &FileScanConfig,
242        _partition: usize,
243    ) -> Arc<dyn FileOpener> {
244        Arc::new(CsvOpener {
245            config: Arc::new(self.clone()),
246            file_compression_type: base_config.file_compression_type,
247            object_store,
248        })
249    }
250
251    fn as_any(&self) -> &dyn Any {
252        self
253    }
254
255    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
256        let mut conf = self.clone();
257        conf.batch_size = Some(batch_size);
258        Arc::new(conf)
259    }
260
261    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
262        let mut conf = self.clone();
263        conf.file_schema = Some(Arc::clone(schema.file_schema()));
264        Arc::new(conf)
265    }
266
267    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
268        let mut conf = self.clone();
269        conf.projected_statistics = Some(statistics);
270        Arc::new(conf)
271    }
272
273    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
274        let mut conf = self.clone();
275        conf.file_projection = config.file_column_projection_indices();
276        Arc::new(conf)
277    }
278
279    fn metrics(&self) -> &ExecutionPlanMetricsSet {
280        &self.metrics
281    }
282    fn statistics(&self) -> Result<Statistics> {
283        let statistics = &self.projected_statistics;
284        Ok(statistics
285            .clone()
286            .expect("projected_statistics must be set"))
287    }
288    fn file_type(&self) -> &str {
289        "csv"
290    }
291    fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
292        match t {
293            DisplayFormatType::Default | DisplayFormatType::Verbose => {
294                write!(f, ", has_header={}", self.has_header)
295            }
296            DisplayFormatType::TreeRender => Ok(()),
297        }
298    }
299
300    fn with_schema_adapter_factory(
301        &self,
302        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
303    ) -> Result<Arc<dyn FileSource>> {
304        Ok(Arc::new(Self {
305            schema_adapter_factory: Some(schema_adapter_factory),
306            ..self.clone()
307        }))
308    }
309
310    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
311        self.schema_adapter_factory.clone()
312    }
313}
314
315impl FileOpener for CsvOpener {
316    /// Open a partitioned CSV file.
317    ///
318    /// If `file_meta.range` is `None`, the entire file is opened.
319    /// If `file_meta.range` is `Some(FileRange {start, end})`, this signifies that the partition
320    /// corresponds to the byte range [start, end) within the file.
321    ///
322    /// Note: `start` or `end` might be in the middle of some lines. In such cases, the following rules
323    /// are applied to determine which lines to read:
324    /// 1. The first line of the partition is the line in which the index of the first character >= `start`.
325    /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides.
326    ///
327    /// Examples:
328    /// Consider the following partitions enclosed by braces `{}`:
329    ///
330    /// {A,1,2,3,4,5,6,7,8,9\n
331    ///  A,1,2,3,4,5,6,7,8,9\n}
332    ///  A,1,2,3,4,5,6,7,8,9\n
333    ///  The lines read would be: [0, 1]
334    ///
335    ///  A,{1,2,3,4,5,6,7,8,9\n
336    ///  A,1,2,3,4,5,6,7,8,9\n
337    ///  A},1,2,3,4,5,6,7,8,9\n
338    ///  The lines read would be: [1, 2]
339    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
340        // `self.config.has_header` controls whether to skip reading the 1st line header
341        // If the .csv file is read in parallel and this `CsvOpener` is only reading some middle
342        // partition, then don't skip first line
343        let mut csv_has_header = self.config.has_header;
344        if let Some(FileRange { start, .. }) = partitioned_file.range {
345            if start != 0 {
346                csv_has_header = false;
347            }
348        }
349
350        let config = CsvSource {
351            has_header: csv_has_header,
352            truncate_rows: self.config.truncate_rows,
353            ..(*self.config).clone()
354        };
355
356        let file_compression_type = self.file_compression_type.to_owned();
357
358        if partitioned_file.range.is_some() {
359            assert!(
360                !file_compression_type.is_compressed(),
361                "Reading compressed .csv in parallel is not supported"
362            );
363        }
364
365        let store = Arc::clone(&self.object_store);
366        let terminator = self.config.terminator;
367
368        Ok(Box::pin(async move {
369            // Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries)
370
371            let calculated_range =
372                calculate_range(&partitioned_file, &store, terminator).await?;
373
374            let range = match calculated_range {
375                RangeCalculation::Range(None) => None,
376                RangeCalculation::Range(Some(range)) => Some(range.into()),
377                RangeCalculation::TerminateEarly => {
378                    return Ok(
379                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
380                    )
381                }
382            };
383
384            let options = GetOptions {
385                range,
386                ..Default::default()
387            };
388
389            let result = store
390                .get_opts(&partitioned_file.object_meta.location, options)
391                .await?;
392
393            match result.payload {
394                #[cfg(not(target_arch = "wasm32"))]
395                GetResultPayload::File(mut file, _) => {
396                    let is_whole_file_scanned = partitioned_file.range.is_none();
397                    let decoder = if is_whole_file_scanned {
398                        // Don't seek if no range as breaks FIFO files
399                        file_compression_type.convert_read(file)?
400                    } else {
401                        file.seek(SeekFrom::Start(result.range.start as _))?;
402                        file_compression_type.convert_read(
403                            file.take((result.range.end - result.range.start) as u64),
404                        )?
405                    };
406
407                    Ok(futures::stream::iter(config.open(decoder)?)
408                        .map(|r| r.map_err(Into::into))
409                        .boxed())
410                }
411                GetResultPayload::Stream(s) => {
412                    let decoder = config.builder().build_decoder();
413                    let s = s.map_err(DataFusionError::from);
414                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();
415
416                    let stream = deserialize_stream(
417                        input,
418                        DecoderDeserializer::new(CsvDecoder::new(decoder)),
419                    );
420                    Ok(stream.map_err(Into::into).boxed())
421                }
422            }
423        }))
424    }
425}
426
427pub async fn plan_to_csv(
428    task_ctx: Arc<TaskContext>,
429    plan: Arc<dyn ExecutionPlan>,
430    path: impl AsRef<str>,
431) -> Result<()> {
432    let path = path.as_ref();
433    let parsed = ListingTableUrl::parse(path)?;
434    let object_store_url = parsed.object_store();
435    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
436    let writer_buffer_size = task_ctx
437        .session_config()
438        .options()
439        .execution
440        .objectstore_writer_buffer_size;
441    let mut join_set = JoinSet::new();
442    for i in 0..plan.output_partitioning().partition_count() {
443        let storeref = Arc::clone(&store);
444        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
445        let filename = format!("{}/part-{i}.csv", parsed.prefix());
446        let file = object_store::path::Path::parse(filename)?;
447
448        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
449        join_set.spawn(async move {
450            let mut buf_writer =
451                BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size);
452            let mut buffer = Vec::with_capacity(1024);
453            //only write headers on first iteration
454            let mut write_headers = true;
455            while let Some(batch) = stream.next().await.transpose()? {
456                let mut writer = csv::WriterBuilder::new()
457                    .with_header(write_headers)
458                    .build(buffer);
459                writer.write(&batch)?;
460                buffer = writer.into_inner();
461                buf_writer.write_all(&buffer).await?;
462                buffer.clear();
463                //prevent writing headers more than once
464                write_headers = false;
465            }
466            buf_writer.shutdown().await.map_err(DataFusionError::from)
467        });
468    }
469
470    while let Some(result) = join_set.join_next().await {
471        match result {
472            Ok(res) => res?, // propagate DataFusion error
473            Err(e) => {
474                if e.is_panic() {
475                    std::panic::resume_unwind(e.into_panic());
476                } else {
477                    unreachable!();
478                }
479            }
480        }
481    }
482
483    Ok(())
484}