// datafusion_datasource_csv/source.rs

use datafusion_datasource::schema_adapter::SchemaAdapterFactory;

use std::any::Any;
use std::fmt;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use std::task::Poll;

use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::{
    as_file_source, calculate_range, FileRange, ListingTableUrl, PartitionedFile,
    RangeCalculation,
};

use arrow::csv;
use arrow::datatypes::SchemaRef;
use datafusion_common::{DataFusionError, Result, Statistics};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::{
    DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
};

use crate::file_format::CsvDecoder;
use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;

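/// A [`FileSource`] implementation for reading CSV files.
///
/// A minimal construction sketch (the delimiter, quote, and comment bytes
/// below are illustrative, not defaults):
///
/// ```ignore
/// let source = CsvSource::new(
///     true, // has_header: treat the first row of each file as column names
///     b',', // delimiter
///     b'"', // quote
/// )
/// .with_comment(Some(b'#')); // skip lines beginning with '#'
/// ```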
#[derive(Debug, Clone, Default)]
pub struct CsvSource {
    batch_size: Option<usize>,
    file_schema: Option<SchemaRef>,
    file_projection: Option<Vec<usize>>,
    pub(crate) has_header: bool,
    delimiter: u8,
    quote: u8,
    terminator: Option<u8>,
    escape: Option<u8>,
    comment: Option<u8>,
    metrics: ExecutionPlanMetricsSet,
    projected_statistics: Option<Statistics>,
    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl CsvSource {
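    /// Create a new `CsvSource` with the given header flag, delimiter, and
    /// quote byte; remaining options take their `Default` values.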
    pub fn new(has_header: bool, delimiter: u8, quote: u8) -> Self {
        Self {
            has_header,
            delimiter,
            quote,
            ..Self::default()
        }
    }

    /// True if the first line of each file is a header.
    pub fn has_header(&self) -> bool {
        self.has_header
    }

    /// The field delimiter byte.
    pub fn delimiter(&self) -> u8 {
        self.delimiter
    }

    /// The quote byte.
    pub fn quote(&self) -> u8 {
        self.quote
    }

    /// The line terminator byte, if overridden from the default.
    pub fn terminator(&self) -> Option<u8> {
        self.terminator
    }

    /// The comment-prefix byte, if set; lines starting with it are skipped.
    pub fn comment(&self) -> Option<u8> {
        self.comment
    }

    /// The escape byte, if any.
    pub fn escape(&self) -> Option<u8> {
        self.escape
    }

    /// Return a copy of this `CsvSource` with the given escape byte.
    pub fn with_escape(&self, escape: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.escape = escape;
        conf
    }

    /// Return a copy of this `CsvSource` with the given line terminator.
    pub fn with_terminator(&self, terminator: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.terminator = terminator;
        conf
    }

    /// Return a copy of this `CsvSource` with the given comment prefix.
    pub fn with_comment(&self, comment: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.comment = comment;
        conf
    }
}

impl CsvSource {
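    /// Build a synchronous [`csv::Reader`] over `reader` using this source's
    /// settings.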
    fn open<R: Read>(&self, reader: R) -> Result<csv::Reader<R>> {
        Ok(self.builder().build(reader)?)
    }

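    /// Construct an arrow [`csv::ReaderBuilder`] from this source's options.
    ///
    /// Panics if the schema or batch size has not been set yet.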
    fn builder(&self) -> csv::ReaderBuilder {
        let mut builder = csv::ReaderBuilder::new(Arc::clone(
            self.file_schema
                .as_ref()
                .expect("Schema must be set before initializing builder"),
        ))
        .with_delimiter(self.delimiter)
        .with_batch_size(
            self.batch_size
                .expect("Batch size must be set before initializing builder"),
        )
        .with_header(self.has_header)
        .with_quote(self.quote);
        if let Some(terminator) = self.terminator {
            builder = builder.with_terminator(terminator);
        }
        if let Some(proj) = &self.file_projection {
            builder = builder.with_projection(proj.clone());
        }
        if let Some(escape) = self.escape {
            builder = builder.with_escape(escape)
        }
        if let Some(comment) = self.comment {
            builder = builder.with_comment(comment);
        }

        builder
    }
}

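/// A [`FileOpener`] that opens a CSV file (optionally a byte range of it) and
/// produces a stream of record batches.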
pub struct CsvOpener {
    config: Arc<CsvSource>,
    file_compression_type: FileCompressionType,
    object_store: Arc<dyn ObjectStore>,
}

impl CsvOpener {
    pub fn new(
        config: Arc<CsvSource>,
        file_compression_type: FileCompressionType,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            config,
            file_compression_type,
            object_store,
        }
    }
}

impl From<CsvSource> for Arc<dyn FileSource> {
    fn from(source: CsvSource) -> Self {
        as_file_source(source)
    }
}

impl FileSource for CsvSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        _partition: usize,
    ) -> Arc<dyn FileOpener> {
        Arc::new(CsvOpener {
            config: Arc::new(self.clone()),
            file_compression_type: base_config.file_compression_type,
            object_store,
        })
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.file_schema = Some(schema);
        Arc::new(conf)
    }

    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.projected_statistics = Some(statistics);
        Arc::new(conf)
    }

    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.file_projection = config.file_column_projection_indices();
        Arc::new(conf)
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn statistics(&self) -> Result<Statistics> {
        let statistics = &self.projected_statistics;
        Ok(statistics
            .clone()
            .expect("projected_statistics must be set"))
    }

    fn file_type(&self) -> &str {
        "csv"
    }

    fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, ", has_header={}", self.has_header)
            }
            DisplayFormatType::TreeRender => Ok(()),
        }
    }

    fn with_schema_adapter_factory(
        &self,
        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Result<Arc<dyn FileSource>> {
        Ok(Arc::new(Self {
            schema_adapter_factory: Some(schema_adapter_factory),
            ..self.clone()
        }))
    }

    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        self.schema_adapter_factory.clone()
    }
}

impl FileOpener for CsvOpener {
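    /// Open a partitioned CSV file as a stream of record batches.
    ///
    /// If `file_meta.range` is `None`, the entire file is read; otherwise only
    /// the assigned byte range is read, with its boundaries aligned to line
    /// terminators via [`calculate_range`].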
    fn open(
        &self,
        file_meta: FileMeta,
        _file: PartitionedFile,
    ) -> Result<FileOpenFuture> {
        let mut csv_has_header = self.config.has_header;
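        // A non-zero range start means this partition begins mid-file, past
        // the header row; only the partition starting at byte 0 sees headers.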
        if let Some(FileRange { start, .. }) = file_meta.range {
            if start != 0 {
                csv_has_header = false;
            }
        }

        let config = CsvSource {
            has_header: csv_has_header,
            ..(*self.config).clone()
        };

        let file_compression_type = self.file_compression_type.to_owned();

        if file_meta.range.is_some() {
            assert!(
                !file_compression_type.is_compressed(),
                "Reading compressed .csv in parallel is not supported"
            );
        }

        let store = Arc::clone(&self.object_store);
        let terminator = self.config.terminator;

        Ok(Box::pin(async move {
            // Compute the byte range this partition should read, snapping the
            // boundaries to line terminators.
            let calculated_range =
                calculate_range(&file_meta, &store, terminator).await?;

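            // `TerminateEarly` means `calculate_range` determined there is
            // nothing for this partition to read, so return an empty stream.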
            let range = match calculated_range {
                RangeCalculation::Range(None) => None,
                RangeCalculation::Range(Some(range)) => Some(range.into()),
                RangeCalculation::TerminateEarly => {
                    return Ok(
                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                    )
                }
            };

            let options = GetOptions {
                range,
                ..Default::default()
            };

            let result = store.get_opts(file_meta.location(), options).await?;

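            // Local files can be read and seeked directly; remote objects
            // arrive as a byte stream and are decoded incrementally.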
            match result.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(mut file, _) => {
                    let is_whole_file_scanned = file_meta.range.is_none();
                    let decoder = if is_whole_file_scanned {
                        // Read the entire file without seeking.
                        file_compression_type.convert_read(file)?
                    } else {
                        // Seek to the start of the assigned range and limit
                        // the reader to its length.
                        file.seek(SeekFrom::Start(result.range.start as _))?;
                        file_compression_type.convert_read(
                            file.take((result.range.end - result.range.start) as u64),
                        )?
                    };

                    Ok(futures::stream::iter(config.open(decoder)?).boxed())
                }
                GetResultPayload::Stream(s) => {
                    let decoder = config.builder().build_decoder();
                    let s = s.map_err(DataFusionError::from);
                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();

                    Ok(deserialize_stream(
                        input,
                        DecoderDeserializer::new(CsvDecoder::new(decoder)),
                    ))
                }
            }
        }))
    }
}

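/// Execute `plan` and stream its output to CSV files under `path`, writing
/// one `part-{i}.csv` object per output partition.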
pub async fn plan_to_csv(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    let writer_buffer_size = task_ctx
        .session_config()
        .options()
        .execution
        .objectstore_writer_buffer_size;
    let mut join_set = JoinSet::new();
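    // Spawn one writer task per output partition so partitions are written
    // concurrently.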
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = Arc::clone(&store);
        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
        let filename = format!("{}/part-{i}.csv", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;

        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
        join_set.spawn(async move {
            let mut buf_writer =
                BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size);
            let mut buffer = Vec::with_capacity(1024);
            // Write the header (if any) only before the first batch.
            let mut write_headers = true;
            while let Some(batch) = stream.next().await.transpose()? {
                let mut writer = csv::WriterBuilder::new()
                    .with_header(write_headers)
                    .build(buffer);
                writer.write(&batch)?;
                // Reclaim the buffer, flush it to object storage, and reuse it.
                buffer = writer.into_inner();
                buf_writer.write_all(&buffer).await?;
                buffer.clear();
                write_headers = false;
            }
            buf_writer.shutdown().await.map_err(DataFusionError::from)
        });
    }

    // Propagate errors (and resume panics) from the writer tasks.
    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?,
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}