datafusion_datasource_csv/source.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Execution plan for reading CSV files

use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use std::any::Any;
use std::fmt;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use std::task::Poll;

use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::{
    as_file_source, calculate_range, FileRange, ListingTableUrl, PartitionedFile,
    RangeCalculation,
};

use arrow::csv;
use arrow::datatypes::SchemaRef;
use datafusion_common::{DataFusionError, Result, Statistics};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::{
    DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
};

use crate::file_format::CsvDecoder;
use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;

/// A Config for [`CsvOpener`]
///
/// # Example: create a `DataSourceExec` for CSV
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::Schema;
/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
/// # use datafusion_datasource::PartitionedFile;
/// # use datafusion_datasource_csv::source::CsvSource;
/// # use datafusion_execution::object_store::ObjectStoreUrl;
/// # use datafusion_datasource::source::DataSourceExec;
///
/// # let object_store_url = ObjectStoreUrl::local_filesystem();
/// # let file_schema = Arc::new(Schema::empty());
///
/// let source = Arc::new(CsvSource::new(true, b',', b'"')
///     .with_terminator(Some(b'#')));
/// // Create a DataSourceExec for reading the first 100MB of `file1.csv`
/// let config = FileScanConfigBuilder::new(object_store_url, file_schema, source)
///     .with_file(PartitionedFile::new("file1.csv", 100*1024*1024))
///     .with_newlines_in_values(true) // The file contains newlines in values
///     .build();
/// let exec = DataSourceExec::from_data_source(config);
/// ```
#[derive(Debug, Clone, Default)]
pub struct CsvSource {
    batch_size: Option<usize>,
    file_schema: Option<SchemaRef>,
    file_projection: Option<Vec<usize>>,
    pub(crate) has_header: bool,
    delimiter: u8,
    quote: u8,
    terminator: Option<u8>,
    escape: Option<u8>,
    comment: Option<u8>,
    metrics: ExecutionPlanMetricsSet,
    projected_statistics: Option<Statistics>,
    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl CsvSource {
    /// Returns a [`CsvSource`]
    pub fn new(has_header: bool, delimiter: u8, quote: u8) -> Self {
        Self {
            has_header,
            delimiter,
            quote,
            ..Self::default()
        }
    }

    /// true if the first line of each file is a header
    pub fn has_header(&self) -> bool {
        self.has_header
    }

    /// The column delimiter
    pub fn delimiter(&self) -> u8 {
        self.delimiter
    }

    /// The quote character
    pub fn quote(&self) -> u8 {
        self.quote
    }

    /// The line terminator
    pub fn terminator(&self) -> Option<u8> {
        self.terminator
    }

    /// Lines beginning with this byte are ignored.
    pub fn comment(&self) -> Option<u8> {
        self.comment
    }

    /// The escape character
    pub fn escape(&self) -> Option<u8> {
        self.escape
    }

    /// Returns a copy of this [`CsvSource`] with the provided escape character
    pub fn with_escape(&self, escape: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.escape = escape;
        conf
    }

    /// Returns a copy of this [`CsvSource`] with the provided line terminator
    pub fn with_terminator(&self, terminator: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.terminator = terminator;
        conf
    }

    /// Returns a copy of this [`CsvSource`] with the provided comment character
    pub fn with_comment(&self, comment: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.comment = comment;
        conf
    }
}

impl CsvSource {
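    /// Builds an arrow [`csv::Reader`] over `reader` using the options
    /// configured on this [`CsvSource`].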
    fn open<R: Read>(&self, reader: R) -> Result<csv::Reader<R>> {
        Ok(self.builder().build(reader)?)
    }

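    /// Translates the options configured on this [`CsvSource`] (schema, batch
    /// size, delimiter, quote, terminator, projection, escape, comment) into an
    /// arrow [`csv::ReaderBuilder`].
    ///
    /// Panics if the file schema or batch size has not been set.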
    fn builder(&self) -> csv::ReaderBuilder {
        let mut builder = csv::ReaderBuilder::new(Arc::clone(
            self.file_schema
                .as_ref()
                .expect("Schema must be set before initializing builder"),
        ))
        .with_delimiter(self.delimiter)
        .with_batch_size(
            self.batch_size
                .expect("Batch size must be set before initializing builder"),
        )
        .with_header(self.has_header)
        .with_quote(self.quote);
        if let Some(terminator) = self.terminator {
            builder = builder.with_terminator(terminator);
        }
        if let Some(proj) = &self.file_projection {
            builder = builder.with_projection(proj.clone());
        }
        if let Some(escape) = self.escape {
            builder = builder.with_escape(escape);
        }
        if let Some(comment) = self.comment {
            builder = builder.with_comment(comment);
        }

        builder
    }
}

/// A [`FileOpener`] that opens a CSV file and yields a [`FileOpenFuture`]
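///
/// # Example
///
/// A minimal sketch of constructing a `CsvOpener` directly, assuming an
/// in-memory object store purely for illustration (in normal use the opener is
/// created by `CsvSource`'s [`FileSource::create_file_opener`] implementation):
///
/// ```ignore
/// use std::sync::Arc;
/// use datafusion_datasource::file_compression_type::FileCompressionType;
/// use datafusion_datasource_csv::source::{CsvOpener, CsvSource};
/// use object_store::memory::InMemory;
///
/// // Source describing how the CSV bytes should be parsed
/// let source = Arc::new(CsvSource::new(true, b',', b'"'));
/// // Opener that reads uncompressed CSV from the given object store
/// let opener = CsvOpener::new(
///     source,
///     FileCompressionType::UNCOMPRESSED,
///     Arc::new(InMemory::new()),
/// );
/// ```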
pub struct CsvOpener {
    config: Arc<CsvSource>,
    file_compression_type: FileCompressionType,
    object_store: Arc<dyn ObjectStore>,
}

impl CsvOpener {
    /// Returns a [`CsvOpener`]
    pub fn new(
        config: Arc<CsvSource>,
        file_compression_type: FileCompressionType,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            config,
            file_compression_type,
            object_store,
        }
    }
}

impl From<CsvSource> for Arc<dyn FileSource> {
    fn from(source: CsvSource) -> Self {
        as_file_source(source)
    }
}

impl FileSource for CsvSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        _partition: usize,
    ) -> Arc<dyn FileOpener> {
        Arc::new(CsvOpener {
            config: Arc::new(self.clone()),
            file_compression_type: base_config.file_compression_type,
            object_store,
        })
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.file_schema = Some(schema);
        Arc::new(conf)
    }

    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.projected_statistics = Some(statistics);
        Arc::new(conf)
    }

    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.file_projection = config.file_column_projection_indices();
        Arc::new(conf)
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn statistics(&self) -> Result<Statistics> {
        let statistics = &self.projected_statistics;
        Ok(statistics
            .clone()
            .expect("projected_statistics must be set"))
    }

    fn file_type(&self) -> &str {
        "csv"
    }

    fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, ", has_header={}", self.has_header)
            }
            DisplayFormatType::TreeRender => Ok(()),
        }
    }

    fn with_schema_adapter_factory(
        &self,
        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Result<Arc<dyn FileSource>> {
        Ok(Arc::new(Self {
            schema_adapter_factory: Some(schema_adapter_factory),
            ..self.clone()
        }))
    }

    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        self.schema_adapter_factory.clone()
    }
}

impl FileOpener for CsvOpener {
    /// Open a partitioned CSV file.
    ///
    /// If `file_meta.range` is `None`, the entire file is opened.
    /// If `file_meta.range` is `Some(FileRange {start, end})`, the partition
    /// corresponds to the byte range [start, end) within the file.
    ///
    /// Note: `start` or `end` might fall in the middle of a line. In that case, the following
    /// rules determine which lines to read:
    /// 1. The first line of the partition is the first line whose starting byte offset is >= `start`.
    /// 2. The last line of the partition is the line containing the byte at offset `end - 1`.
    ///
    /// Examples:
    /// Consider the following partitions enclosed by braces `{}`:
    ///
    /// {A,1,2,3,4,5,6,7,8,9\n
    ///  A,1,2,3,4,5,6,7,8,9\n}
    ///  A,1,2,3,4,5,6,7,8,9\n
    ///  The lines read would be: [0, 1]
    ///
    ///  A,{1,2,3,4,5,6,7,8,9\n
    ///  A,1,2,3,4,5,6,7,8,9\n
    ///  A},1,2,3,4,5,6,7,8,9\n
    ///  The lines read would be: [1, 2]
    fn open(
        &self,
        file_meta: FileMeta,
        _file: PartitionedFile,
    ) -> Result<FileOpenFuture> {
        // `self.config.has_header` controls whether to skip reading the first (header) line.
        // If the CSV file is read in parallel and this `CsvOpener` only reads a middle
        // partition, the first line of that partition is data, not a header, so don't skip it.
        let mut csv_has_header = self.config.has_header;
        if let Some(FileRange { start, .. }) = file_meta.range {
            if start != 0 {
                csv_has_header = false;
            }
        }

        let config = CsvSource {
            has_header: csv_has_header,
            ..(*self.config).clone()
        };

        let file_compression_type = self.file_compression_type.to_owned();

        if file_meta.range.is_some() {
            assert!(
                !file_compression_type.is_compressed(),
                "Reading compressed .csv in parallel is not supported"
            );
        }

        let store = Arc::clone(&self.object_store);
        let terminator = self.config.terminator;

        Ok(Box::pin(async move {
            // The current partition covers bytes [start_byte, end_byte), which may
            // begin or end in the middle of a line.

            let calculated_range =
                calculate_range(&file_meta, &store, terminator).await?;

            let range = match calculated_range {
                RangeCalculation::Range(None) => None,
                RangeCalculation::Range(Some(range)) => Some(range.into()),
                RangeCalculation::TerminateEarly => {
                    return Ok(
                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                    )
                }
            };

            let options = GetOptions {
                range,
                ..Default::default()
            };

            let result = store.get_opts(file_meta.location(), options).await?;

            match result.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(mut file, _) => {
                    let is_whole_file_scanned = file_meta.range.is_none();
                    let decoder = if is_whole_file_scanned {
                        // Don't seek when there is no range: seeking breaks FIFO files
                        file_compression_type.convert_read(file)?
                    } else {
                        file.seek(SeekFrom::Start(result.range.start as _))?;
                        file_compression_type.convert_read(
                            file.take((result.range.end - result.range.start) as u64),
                        )?
                    };

                    Ok(futures::stream::iter(config.open(decoder)?).boxed())
                }
                GetResultPayload::Stream(s) => {
                    let decoder = config.builder().build_decoder();
                    let s = s.map_err(DataFusionError::from);
                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();

                    Ok(deserialize_stream(
                        input,
                        DecoderDeserializer::new(CsvDecoder::new(decoder)),
                    ))
                }
            }
        }))
    }
}

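/// Executes `plan` and streams each output partition to `path` as a separate
/// CSV file named `part-<partition>.csv`, writing through the object store
/// resolved from `path`.
///
/// A minimal usage sketch; the task context, plan, and output URL below are
/// illustrative assumptions, not part of this API:
///
/// ```ignore
/// # use std::sync::Arc;
/// # use datafusion_execution::TaskContext;
/// # use datafusion_physical_plan::ExecutionPlan;
/// # async fn example(
/// #     task_ctx: Arc<TaskContext>,
/// #     plan: Arc<dyn ExecutionPlan>,
/// # ) -> datafusion_common::Result<()> {
/// // Writes one CSV file per output partition under /tmp/csv_out
/// plan_to_csv(task_ctx, plan, "file:///tmp/csv_out").await?;
/// # Ok(())
/// # }
/// ```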
pub async fn plan_to_csv(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    let writer_buffer_size = task_ctx
        .session_config()
        .options()
        .execution
        .objectstore_writer_buffer_size;
    let mut join_set = JoinSet::new();
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = Arc::clone(&store);
        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
        let filename = format!("{}/part-{i}.csv", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;

        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
        join_set.spawn(async move {
            let mut buf_writer =
                BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size);
            let mut buffer = Vec::with_capacity(1024);
            // Only write headers on the first iteration
            let mut write_headers = true;
            while let Some(batch) = stream.next().await.transpose()? {
                let mut writer = csv::WriterBuilder::new()
                    .with_header(write_headers)
                    .build(buffer);
                writer.write(&batch)?;
                buffer = writer.into_inner();
                buf_writer.write_all(&buffer).await?;
                buffer.clear();
                // Prevent writing headers more than once
                write_headers = false;
            }
            buf_writer.shutdown().await.map_err(DataFusionError::from)
        });
    }

    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?, // propagate DataFusion error
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}