use datafusion_datasource::schema_adapter::SchemaAdapterFactory;

use std::any::Any;
use std::fmt;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use std::task::Poll;

use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_meta::FileMeta;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::{
    as_file_source, calculate_range, FileRange, ListingTableUrl, RangeCalculation,
};

use arrow::csv;
use arrow::datatypes::SchemaRef;
use datafusion_common::{DataFusionError, Result, Statistics};
use datafusion_common_runtime::JoinSet;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::{
    DisplayFormatType, ExecutionPlan, ExecutionPlanProperties,
};

use crate::file_format::CsvDecoder;
use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
use tokio::io::AsyncWriteExt;
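/// A [`FileSource`] for reading CSV files.
///
/// Holds the parsing options (header handling, delimiter, quote, terminator,
/// escape, and comment bytes) used to build the Arrow CSV reader for each
/// opened file.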
#[derive(Debug, Clone, Default)]
pub struct CsvSource {
    batch_size: Option<usize>,
    file_schema: Option<SchemaRef>,
    file_projection: Option<Vec<usize>>,
    pub(crate) has_header: bool,
    delimiter: u8,
    quote: u8,
    terminator: Option<u8>,
    escape: Option<u8>,
    comment: Option<u8>,
    metrics: ExecutionPlanMetricsSet,
    projected_statistics: Option<Statistics>,
    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}

impl CsvSource {
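    /// Creates a [`CsvSource`] with the given header, delimiter, and quote
    /// settings; all other options take their default values.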
    pub fn new(has_header: bool, delimiter: u8, quote: u8) -> Self {
        Self {
            has_header,
            delimiter,
            quote,
            ..Self::default()
        }
    }

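    /// True if the first line of each file is a header.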
    pub fn has_header(&self) -> bool {
        self.has_header
    }

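    /// The column delimiter byte.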
    pub fn delimiter(&self) -> u8 {
        self.delimiter
    }

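    /// The quote character byte.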
    pub fn quote(&self) -> u8 {
        self.quote
    }

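    /// The line terminator byte, if one was configured.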
    pub fn terminator(&self) -> Option<u8> {
        self.terminator
    }

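    /// The comment character byte, if one was configured.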
    pub fn comment(&self) -> Option<u8> {
        self.comment
    }

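    /// The escape character byte, if one was configured.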
    pub fn escape(&self) -> Option<u8> {
        self.escape
    }

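    /// Returns a copy of this [`CsvSource`] with the escape character set.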
    pub fn with_escape(&self, escape: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.escape = escape;
        conf
    }

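    /// Returns a copy of this [`CsvSource`] with the line terminator set.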
    pub fn with_terminator(&self, terminator: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.terminator = terminator;
        conf
    }

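    /// Returns a copy of this [`CsvSource`] with the comment character set.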
    pub fn with_comment(&self, comment: Option<u8>) -> Self {
        let mut conf = self.clone();
        conf.comment = comment;
        conf
    }
}

impl CsvSource {
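    /// Builds an Arrow CSV reader over `reader` using this source's options.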
    fn open<R: Read>(&self, reader: R) -> Result<csv::Reader<R>> {
        Ok(self.builder().build(reader)?)
    }

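    /// Builds a [`csv::ReaderBuilder`] from this source's options.
    ///
    /// Panics if the file schema or batch size has not been set first.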
    fn builder(&self) -> csv::ReaderBuilder {
        let mut builder = csv::ReaderBuilder::new(Arc::clone(
            self.file_schema
                .as_ref()
                .expect("Schema must be set before initializing builder"),
        ))
        .with_delimiter(self.delimiter)
        .with_batch_size(
            self.batch_size
                .expect("Batch size must be set before initializing builder"),
        )
        .with_header(self.has_header)
        .with_quote(self.quote);
        if let Some(terminator) = self.terminator {
            builder = builder.with_terminator(terminator);
        }
        if let Some(proj) = &self.file_projection {
            builder = builder.with_projection(proj.clone());
        }
        if let Some(escape) = self.escape {
            builder = builder.with_escape(escape);
        }
        if let Some(comment) = self.comment {
            builder = builder.with_comment(comment);
        }

        builder
    }
}

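/// A [`FileOpener`] that opens a CSV file and yields a [`FileOpenFuture`].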
pub struct CsvOpener {
    config: Arc<CsvSource>,
    file_compression_type: FileCompressionType,
    object_store: Arc<dyn ObjectStore>,
}

impl CsvOpener {
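    /// Returns a [`CsvOpener`].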
    pub fn new(
        config: Arc<CsvSource>,
        file_compression_type: FileCompressionType,
        object_store: Arc<dyn ObjectStore>,
    ) -> Self {
        Self {
            config,
            file_compression_type,
            object_store,
        }
    }
}

impl From<CsvSource> for Arc<dyn FileSource> {
    fn from(source: CsvSource) -> Self {
        as_file_source(source)
    }
}

impl FileSource for CsvSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        _partition: usize,
    ) -> Arc<dyn FileOpener> {
        Arc::new(CsvOpener {
            config: Arc::new(self.clone()),
            file_compression_type: base_config.file_compression_type,
            object_store,
        })
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.file_schema = Some(schema);
        Arc::new(conf)
    }

    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.projected_statistics = Some(statistics);
        Arc::new(conf)
    }

    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.file_projection = config.file_column_projection_indices();
        Arc::new(conf)
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn statistics(&self) -> Result<Statistics> {
        let statistics = &self.projected_statistics;
        Ok(statistics
            .clone()
            .expect("projected_statistics must be set"))
    }

    fn file_type(&self) -> &str {
        "csv"
    }

    fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, ", has_header={}", self.has_header)
            }
            DisplayFormatType::TreeRender => Ok(()),
        }
    }

    fn with_schema_adapter_factory(
        &self,
        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Result<Arc<dyn FileSource>> {
        Ok(Arc::new(Self {
            schema_adapter_factory: Some(schema_adapter_factory),
            ..self.clone()
        }))
    }

    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        self.schema_adapter_factory.clone()
    }
}

impl FileOpener for CsvOpener {
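    /// Opens a partitioned CSV file.
    ///
    /// If `file_meta.range` is `None`, the entire file is read. Otherwise,
    /// only the portion of the file covered by that byte range (adjusted to
    /// line boundaries by `calculate_range`) is read, and header parsing is
    /// disabled for any partition that does not begin at the start of the
    /// file.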
    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
        let mut csv_has_header = self.config.has_header;
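        // A range that does not start at byte 0 cannot begin with the header
        // row, so disable header parsing for this partition.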
        if let Some(FileRange { start, .. }) = file_meta.range {
            if start != 0 {
                csv_has_header = false;
            }
        }

        let config = CsvSource {
            has_header: csv_has_header,
            ..(*self.config).clone()
        };

        let file_compression_type = self.file_compression_type.to_owned();

        if file_meta.range.is_some() {
            assert!(
                !file_compression_type.is_compressed(),
                "Reading compressed .csv in parallel is not supported"
            );
        }

        let store = Arc::clone(&self.object_store);
        let terminator = self.config.terminator;

        Ok(Box::pin(async move {
            let calculated_range =
                calculate_range(&file_meta, &store, terminator).await?;

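            // `TerminateEarly` means the assigned byte range contains nothing
            // for this partition to read, in which case an empty stream is
            // returned below.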
            let range = match calculated_range {
                RangeCalculation::Range(None) => None,
                RangeCalculation::Range(Some(range)) => Some(range.into()),
                RangeCalculation::TerminateEarly => {
                    return Ok(
                        futures::stream::poll_fn(move |_| Poll::Ready(None)).boxed()
                    )
                }
            };

            let options = GetOptions {
                range,
                ..Default::default()
            };

            let result = store.get_opts(file_meta.location(), options).await?;

            match result.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(mut file, _) => {
                    let is_whole_file_scanned = file_meta.range.is_none();
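                    // Whole-file scans skip the seek (which would break FIFO
                    // files); a partial scan seeks to the start of the
                    // resolved range and caps reads at its length.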
                    let decoder = if is_whole_file_scanned {
                        file_compression_type.convert_read(file)?
                    } else {
                        file.seek(SeekFrom::Start(result.range.start as _))?;
                        file_compression_type.convert_read(
                            file.take((result.range.end - result.range.start) as u64),
                        )?
                    };

                    Ok(futures::stream::iter(config.open(decoder)?).boxed())
                }
                GetResultPayload::Stream(s) => {
                    let decoder = config.builder().build_decoder();
                    let s = s.map_err(DataFusionError::from);
                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();

                    Ok(deserialize_stream(
                        input,
                        DecoderDeserializer::new(CsvDecoder::new(decoder)),
                    ))
                }
            }
        }))
    }
}

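/// Executes `plan` and writes its results to CSV, one file per output
/// partition, named `part-{i}.csv` under `path`.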
pub async fn plan_to_csv(
    task_ctx: Arc<TaskContext>,
    plan: Arc<dyn ExecutionPlan>,
    path: impl AsRef<str>,
) -> Result<()> {
    let path = path.as_ref();
    let parsed = ListingTableUrl::parse(path)?;
    let object_store_url = parsed.object_store();
    let store = task_ctx.runtime_env().object_store(&object_store_url)?;
    let writer_buffer_size = task_ctx
        .session_config()
        .options()
        .execution
        .objectstore_writer_buffer_size;
    let mut join_set = JoinSet::new();
    for i in 0..plan.output_partitioning().partition_count() {
        let storeref = Arc::clone(&store);
        let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
        let filename = format!("{}/part-{i}.csv", parsed.prefix());
        let file = object_store::path::Path::parse(filename)?;

        let mut stream = plan.execute(i, Arc::clone(&task_ctx))?;
        join_set.spawn(async move {
            let mut buf_writer =
                BufWriter::with_capacity(storeref, file.clone(), writer_buffer_size);
            let mut buffer = Vec::with_capacity(1024);
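            // Only the first batch of each partition gets a header row.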
            let mut write_headers = true;
            while let Some(batch) = stream.next().await.transpose()? {
                let mut writer = csv::WriterBuilder::new()
                    .with_header(write_headers)
                    .build(buffer);
                writer.write(&batch)?;
                buffer = writer.into_inner();
                buf_writer.write_all(&buffer).await?;
                buffer.clear();
                write_headers = false;
            }
            buf_writer.shutdown().await.map_err(DataFusionError::from)
        });
    }

    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(res) => res?,
            Err(e) => {
                if e.is_panic() {
                    std::panic::resume_unwind(e.into_panic());
                } else {
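                    // Tasks are never cancelled, so any non-panic JoinError
                    // is unexpected.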
                    unreachable!();
                }
            }
        }
    }

    Ok(())
}