// datafusion_datasource_csv/source.rs

use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
use datafusion_physical_plan::projection::ProjectionExprs;
use std::any::Any;
use std::fmt;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use std::task::Poll;

use datafusion_datasource::decoder::{DecoderDeserializer, deserialize_stream};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::{
    FileRange, ListingTableUrl, PartitionedFile, RangeCalculation, TableSchema,
    as_file_source, calculate_range,
};

use arrow::csv;
use datafusion_common::config::CsvOptions;
use datafusion_common::{DataFusionError, Result};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use datafusion_physical_plan::{
    DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
};

use crate::file_format::CsvDecoder;
use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;
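
/// A [`FileSource`] for reading CSV files.
///
/// Bundles the parsed [`CsvOptions`] with the table schema and the
/// projection (if any) pushed down into the scan.
///
/// A minimal construction sketch, assuming an `Arc<Schema>` converts into
/// [`TableSchema`] (the schema and options here are illustrative):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// // Hypothetical single-column file schema for illustration only.
/// let file_schema = Arc::new(Schema::new(vec![Field::new(
///     "c1",
///     DataType::Utf8,
///     true,
/// )]));
/// let source = CsvSource::new(file_schema).with_csv_options(CsvOptions::default());
/// ```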
#[derive(Debug, Clone)]
pub struct CsvSource {
    options: CsvOptions,
    batch_size: Option<usize>,
    table_schema: TableSchema,
    projection: SplitProjection,
    metrics: ExecutionPlanMetricsSet,
}

impl CsvSource {
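    /// Creates a [`CsvSource`] for the given schema with default
    /// [`CsvOptions`] and no projection applied.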
    pub fn new(table_schema: impl Into<TableSchema>) -> Self {
        let table_schema = table_schema.into();
        Self {
            options: CsvOptions::default(),
            projection: SplitProjection::unprojected(&table_schema),
            table_schema,
            batch_size: None,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }
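
    /// Replaces this source's [`CsvOptions`].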
    pub fn with_csv_options(mut self, options: CsvOptions) -> Self {
        self.options = options;
        self
    }

    /// Whether the first line of each file is treated as a header row.
    pub fn has_header(&self) -> bool {
        self.options.has_header.unwrap_or(true)
    }

    /// Whether rows with fewer fields than the schema are allowed.
    pub fn truncate_rows(&self) -> bool {
        self.options.truncated_rows.unwrap_or(false)
    }

    /// The field delimiter byte.
    pub fn delimiter(&self) -> u8 {
        self.options.delimiter
    }

    /// The quote character byte.
    pub fn quote(&self) -> u8 {
        self.options.quote
    }

    /// The line terminator byte, if one was configured.
    pub fn terminator(&self) -> Option<u8> {
        self.options.terminator
    }

    /// The comment-line prefix byte, if one was configured.
    pub fn comment(&self) -> Option<u8> {
        self.options.comment
    }

    /// The escape character byte, if one was configured.
    pub fn escape(&self) -> Option<u8> {
        self.options.escape
    }

    /// Returns a copy of this source with the given escape character.
    pub fn with_escape(&self, escape: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.options.escape = escape;
        conf
    }

    /// Returns a copy of this source with the given line terminator.
    pub fn with_terminator(&self, terminator: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.options.terminator = terminator;
        conf
    }

    /// Returns a copy of this source with the given comment-line prefix.
    pub fn with_comment(&self, comment: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.options.comment = comment;
        conf
    }

    /// Returns a copy of this source with truncated-row handling enabled or
    /// disabled.
    pub fn with_truncate_rows(&self, truncate_rows: bool) -> Self {
        let mut conf = self.clone();
        conf.options.truncated_rows = Some(truncate_rows);
        conf
    }

    /// Whether quoted values may contain embedded newlines.
    pub fn newlines_in_values(&self) -> bool {
        self.options.newlines_in_values.unwrap_or(false)
    }
}

impl CsvSource {
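    /// Builds a [`csv::Reader`] over `reader` using this source's options
    /// and projection.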
    fn open<R: Read>(&self, reader: R) -> Result<csv::Reader<R>> {
        Ok(self.builder().build(reader)?)
    }
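
    /// Builds the [`csv::ReaderBuilder`] reflecting this source's options.
    ///
    /// Panics if the batch size has not been set via
    /// [`FileSource::with_batch_size`] first.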
    fn builder(&self) -> csv::ReaderBuilder {
        let mut builder =
            csv::ReaderBuilder::new(Arc::clone(self.table_schema.file_schema()))
                .with_delimiter(self.delimiter())
                .with_batch_size(
                    self.batch_size
                        .expect("Batch size must be set before initializing builder"),
                )
                .with_header(self.has_header())
                .with_quote(self.quote())
                .with_truncated_rows(self.truncate_rows());
        if let Some(terminator) = self.terminator() {
            builder = builder.with_terminator(terminator);
        }
        builder = builder.with_projection(self.projection.file_indices.clone());
        if let Some(escape) = self.escape() {
            builder = builder.with_escape(escape)
        }
        if let Some(comment) = self.comment() {
            builder = builder.with_comment(comment);
        }

        builder
    }
}
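
/// A [`FileOpener`] that opens a CSV file (or a byte range of one) and
/// produces a stream of record batches.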
pub struct CsvOpener {
    config: Arc<CsvSource>,
    file_compression_type: FileCompressionType,
    object_store: Arc<dyn ObjectStore>,
    partition_index: usize,
}

impl CsvOpener {
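    /// Creates a [`CsvOpener`] reading from `object_store`, with the
    /// partition index initialized to 0.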
    pub fn new(
        config: Arc<CsvSource>,
        file_compression_type: FileCompressionType,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            config,
            file_compression_type,
            object_store,
            partition_index: 0,
        }
    }
}

impl From<CsvSource> for Arc<dyn FileSource> {
    fn from(source: CsvSource) -> Self {
        as_file_source(source)
    }
}
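
// `FileSource` is the integration point with DataFusion's generic file-scan
// machinery: the scan config calls these methods to create per-partition
// openers and to push batch size and projections down into the source.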
impl FileSource for CsvSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        partition_index: usize,
    ) -> Result<Arc<dyn FileOpener>> {
        let mut opener = Arc::new(CsvOpener {
            config: Arc::new(self.clone()),
            file_compression_type: base_config.file_compression_type,
            object_store,
            partition_index,
        }) as Arc<dyn FileOpener>;
        // Wrap the base opener so any pushed-down projection expressions are
        // applied to the batches it yields.
        opener = ProjectionOpener::try_new(
            self.projection.clone(),
            Arc::clone(&opener),
            self.table_schema.file_schema(),
        )?;
        Ok(opener)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_schema(&self) -> &TableSchema {
        &self.table_schema
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn try_pushdown_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn FileSource>>> {
        // Merge the new projection into any existing one, then re-split it
        // against the file schema.
        let mut source = self.clone();
        let new_projection = self.projection.source.try_merge(projection)?;
        let split_projection =
            SplitProjection::new(self.table_schema.file_schema(), &new_projection);
        source.projection = split_projection;
        Ok(Some(Arc::new(source)))
    }

    fn projection(&self) -> Option<&ProjectionExprs> {
        Some(&self.projection.source)
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn file_type(&self) -> &str {
        "csv"
    }

    fn supports_repartitioning(&self) -> bool {
        // Splitting a file by byte ranges is only safe when quoted values
        // cannot contain newlines.
        !self.options.newlines_in_values.unwrap_or(false)
    }

    fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, ", has_header={}", self.has_header())
            }
            DisplayFormatType::TreeRender => Ok(()),
        }
    }
}
impl FileOpener for CsvOpener {
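    /// Opens a partitioned CSV file and returns a stream of record batches.
    ///
    /// If `partitioned_file.range` is `None` the whole file is read.
    /// Otherwise only the assigned byte range is read; the header flag is
    /// cleared for any range that does not start at byte 0, since only the
    /// first range can contain the header row.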
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        let mut csv_has_header = self.config.has_header();
        if let Some(FileRange { start, .. }) = partitioned_file.range
            && start != 0
        {
            csv_has_header = false;
        }

        let mut config = (*self.config).clone();
        config.options.has_header = Some(csv_has_header);
        config.options.truncated_rows = Some(config.truncate_rows());

        let file_compression_type = self.file_compression_type.to_owned();

        if partitioned_file.range.is_some() {
            assert!(
                !file_compression_type.is_compressed(),
                "Reading compressed .csv in parallel is not supported"
            );
        }

        let store = Arc::clone(&self.object_store);
        let terminator = self.config.terminator();

        let baseline_metrics =
            BaselineMetrics::new(&self.config.metrics, self.partition_index);

        Ok(Box::pin(async move {
            // Adjust the assigned byte range to line boundaries so that no
            // row is split across partitions.
            let calculated_range =
                calculate_range(&partitioned_file, &store, terminator).await?;

            let range = match calculated_range {
                RangeCalculation::Range(None) => None,
                RangeCalculation::Range(Some(range)) => Some(range.into()),
                RangeCalculation::TerminateEarly => {
                    return Ok(
                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                    );
                }
            };

            let options = GetOptions {
                range,
                ..Default::default()
            };

            let result = store
                .get_opts(&partitioned_file.object_meta.location, options)
                .await?;

            match result.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(mut file, _) => {
                    let is_whole_file_scanned = partitioned_file.range.is_none();
                    let decoder = if is_whole_file_scanned {
                        // Don't seek when there is no range: the file may be
                        // a compressed stream.
                        file_compression_type.convert_read(file)?
                    } else {
                        file.seek(SeekFrom::Start(result.range.start as _))?;
                        file_compression_type.convert_read(
                            file.take((result.range.end - result.range.start) as u64),
                        )?
                    };

                    let mut reader = config.open(decoder)?;

                    // Time each batch decode against the partition's metrics.
                    let iterator = std::iter::from_fn(move || {
                        let mut timer = baseline_metrics.elapsed_compute().timer();
                        let result = reader.next();
                        timer.stop();
                        result
                    });

                    Ok(futures::stream::iter(iterator)
                        .map(|r| r.map_err(Into::into))
                        .boxed())
                }
                GetResultPayload::Stream(s) => {
                    let decoder = config.builder().build_decoder();
                    let s = s.map_err(DataFusionError::from);
                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();

                    let stream = deserialize_stream(
                        input,
                        DecoderDeserializer::new(CsvDecoder::new(decoder)),
                    );
                    Ok(stream.map_err(Into::into).boxed())
                }
            }
        }))
    }
}
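
/// Executes `plan` and writes each output partition to its own
/// `part-{i}.csv` file under `path`, in parallel.
///
/// A minimal usage sketch (the session context `ctx` and `physical_plan`
/// are assumed to exist and are not part of this module):
///
/// ```ignore
/// let task_ctx = ctx.task_ctx();
/// plan_to_csv(task_ctx, physical_plan, "file:///tmp/csv_out").await?;
/// ```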
pub async fn plan_to_csv(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    let writer_buffer_size = task_ctx
        .session_config()
        .options()
        .execution
        .objectstore_writer_buffer_size;
    let mut join_set = JoinSet::new();
    // Spawn one writer task per output partition.
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = Arc::clone(&store);
        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
        let filename = format!("{}/part-{i}.csv", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;

        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
        join_set.spawn(async move {
            let mut buf_writer =
                BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size);
            let mut buffer = Vec::with_capacity(1024);
            // Only the first batch of each file gets a header row.
            let mut write_headers = true;
            while let Some(batch) = stream.next().await.transpose()? {
                let mut writer = csv::WriterBuilder::new()
                    .with_header(write_headers)
                    .build(buffer);
                writer.write(&batch)?;
                buffer = writer.into_inner();
                buf_writer.write_all(&buffer).await?;
                buffer.clear();
                write_headers = false;
            }
            buf_writer.shutdown().await.map_err(DataFusionError::from)
        });
    }

    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?, // propagate errors from the writer task
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    // The task was not cancelled, so a join error is a panic.
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}