1#![doc(
19 html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
20 html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
21)]
22#![cfg_attr(docsrs, feature(doc_cfg))]
23#![cfg_attr(not(test), deny(clippy::clone_on_ref_ptr))]
26
27pub mod decoder;
31pub mod display;
32pub mod file;
33pub mod file_compression_type;
34pub mod file_format;
35pub mod file_groups;
36pub mod file_scan_config;
37pub mod file_sink_config;
38pub mod file_stream;
39pub mod memory;
40pub mod schema_adapter;
41pub mod sink;
42pub mod source;
43mod statistics;
44pub mod table_schema;
45
46#[cfg(test)]
47pub mod test_util;
48
49pub mod url;
50pub mod write;
51pub use self::file::as_file_source;
52pub use self::url::ListingTableUrl;
53use crate::file_groups::FileGroup;
54use chrono::TimeZone;
55use datafusion_common::stats::Precision;
56use datafusion_common::{exec_datafusion_err, ColumnStatistics, Result};
57use datafusion_common::{ScalarValue, Statistics};
58use futures::{Stream, StreamExt};
59use object_store::{path::Path, ObjectMeta};
60use object_store::{GetOptions, GetRange, ObjectStore};
61pub use table_schema::TableSchema;
62#[allow(deprecated)]
64pub use statistics::add_row_stats;
65pub use statistics::compute_all_files_statistics;
66use std::ops::Range;
67use std::pin::Pin;
68use std::sync::Arc;
69
/// A pinned, boxed stream whose items are `Result<PartitionedFile>`.
///
/// The underlying trait object is required to be `Send + Sync + 'static`.
pub type PartitionedFileStream =
    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
73
/// A half-open interval `[start, end)` of offsets within a file.
///
/// Used by [`PartitionedFile`] to restrict a scan to part of a file.
#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct FileRange {
    /// First offset that belongs to the range (inclusive).
    pub start: i64,
    /// First offset past the range (exclusive).
    pub end: i64,
}

impl FileRange {
    /// Returns `true` when `offset` lies in `[start, end)`.
    ///
    /// An empty or inverted range (`start >= end`) contains nothing.
    pub fn contains(&self, offset: i64) -> bool {
        (self.start..self.end).contains(&offset)
    }
}
91
/// A file to be scanned, described by its object-store metadata plus optional
/// per-file extras (partition values, a sub-range, statistics, an opaque
/// extension payload and a metadata size hint).
#[derive(Debug, Clone)]
pub struct PartitionedFile {
    /// Object-store metadata of the file; its `location` and `size` are what
    /// `calculate_range` uses to address the object.
    pub object_meta: ObjectMeta,
    /// Partition-column values attached to this file.
    ///
    /// NOTE(review): presumably these must line up (count, order, type) with
    /// the table's partition columns — confirm against the consumers.
    pub partition_values: Vec<ScalarValue>,
    /// Optional range restricting which part of the file is read. `None`
    /// means no restriction (`calculate_range` then returns `Range(None)`).
    pub range: Option<FileRange>,
    /// Optional statistics describing the file; see `has_statistics`.
    pub statistics: Option<Arc<Statistics>>,
    /// Optional type-erased payload attached to the file; nothing in this
    /// file inspects it.
    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    /// Optional size hint for the file's metadata.
    ///
    /// NOTE(review): presumably a byte count for a format footer (e.g.
    /// Parquet) — confirm against the reader that consumes it.
    pub metadata_size_hint: Option<usize>,
}
121
122impl PartitionedFile {
123 pub fn new(path: impl Into<String>, size: u64) -> Self {
125 Self {
126 object_meta: ObjectMeta {
127 location: Path::from(path.into()),
128 last_modified: chrono::Utc.timestamp_nanos(0),
129 size,
130 e_tag: None,
131 version: None,
132 },
133 partition_values: vec![],
134 range: None,
135 statistics: None,
136 extensions: None,
137 metadata_size_hint: None,
138 }
139 }
140
141 pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
143 Self {
144 object_meta: ObjectMeta {
145 location: Path::from(path),
146 last_modified: chrono::Utc.timestamp_nanos(0),
147 size,
148 e_tag: None,
149 version: None,
150 },
151 partition_values: vec![],
152 range: Some(FileRange { start, end }),
153 statistics: None,
154 extensions: None,
155 metadata_size_hint: None,
156 }
157 .with_range(start, end)
158 }
159
160 pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
164 self.metadata_size_hint = Some(metadata_size_hint);
165 self
166 }
167
168 pub fn from_path(path: String) -> Result<Self> {
170 let size = std::fs::metadata(path.clone())?.len();
171 Ok(Self::new(path, size))
172 }
173
174 pub fn path(&self) -> &Path {
176 &self.object_meta.location
177 }
178
179 pub fn with_range(mut self, start: i64, end: i64) -> Self {
181 self.range = Some(FileRange { start, end });
182 self
183 }
184
185 pub fn with_extensions(
189 mut self,
190 extensions: Arc<dyn std::any::Any + Send + Sync>,
191 ) -> Self {
192 self.extensions = Some(extensions);
193 self
194 }
195
196 pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
198 self.statistics = Some(statistics);
199 self
200 }
201
202 pub fn has_statistics(&self) -> bool {
206 if let Some(stats) = &self.statistics {
207 stats.column_statistics.iter().any(|col_stats| {
208 col_stats.null_count != Precision::Absent
209 || col_stats.max_value != Precision::Absent
210 || col_stats.min_value != Precision::Absent
211 || col_stats.sum_value != Precision::Absent
212 || col_stats.distinct_count != Precision::Absent
213 })
214 } else {
215 false
216 }
217 }
218}
219
220impl From<ObjectMeta> for PartitionedFile {
221 fn from(object_meta: ObjectMeta) -> Self {
222 PartitionedFile {
223 object_meta,
224 partition_values: vec![],
225 range: None,
226 statistics: None,
227 extensions: None,
228 metadata_size_hint: None,
229 }
230 }
231}
232
/// Outcome of [`calculate_range`].
pub enum RangeCalculation {
    /// The byte range to read. `calculate_range` produces `Range(None)` when
    /// the file has no `FileRange`, and `Range(Some(..))` carrying the
    /// adjusted bounds otherwise.
    Range(Option<Range<u64>>),
    /// Produced by `calculate_range` when the adjusted range is empty (its
    /// start equals its end), i.e. there is nothing to read.
    TerminateEarly,
}
250
251pub async fn calculate_range(
262 file: &PartitionedFile,
263 store: &Arc<dyn ObjectStore>,
264 terminator: Option<u8>,
265) -> Result<RangeCalculation> {
266 let location = &file.object_meta.location;
267 let file_size = file.object_meta.size;
268 let newline = terminator.unwrap_or(b'\n');
269
270 match file.range {
271 None => Ok(RangeCalculation::Range(None)),
272 Some(FileRange { start, end }) => {
273 let start: u64 = start.try_into().map_err(|_| {
274 exec_datafusion_err!("Expect start range to fit in u64, got {start}")
275 })?;
276 let end: u64 = end.try_into().map_err(|_| {
277 exec_datafusion_err!("Expect end range to fit in u64, got {end}")
278 })?;
279
280 let start_delta = if start != 0 {
281 find_first_newline(store, location, start - 1, file_size, newline).await?
282 } else {
283 0
284 };
285
286 let end_delta = if end != file_size {
287 find_first_newline(store, location, end - 1, file_size, newline).await?
288 } else {
289 0
290 };
291
292 let range = start + start_delta..end + end_delta;
293
294 if range.start == range.end {
295 return Ok(RangeCalculation::TerminateEarly);
296 }
297
298 Ok(RangeCalculation::Range(Some(range)))
299 }
300 }
301}
302
303async fn find_first_newline(
314 object_store: &Arc<dyn ObjectStore>,
315 location: &Path,
316 start: u64,
317 end: u64,
318 newline: u8,
319) -> Result<u64> {
320 let options = GetOptions {
321 range: Some(GetRange::Bounded(start..end)),
322 ..Default::default()
323 };
324
325 let result = object_store.get_opts(location, options).await?;
326 let mut result_stream = result.into_stream();
327
328 let mut index = 0;
329
330 while let Some(chunk) = result_stream.next().await.transpose()? {
331 if let Some(position) = chunk.iter().position(|&byte| byte == newline) {
332 let position = position as u64;
333 return Ok(index + position);
334 }
335
336 index += chunk.len() as u64;
337 }
338
339 Ok(index)
340}
341
342pub fn generate_test_files(num_files: usize, overlap_factor: f64) -> Vec<FileGroup> {
382 let mut files = Vec::with_capacity(num_files);
383 if num_files == 0 {
384 return vec![];
385 }
386 let range_size = if overlap_factor == 0.0 {
387 100 / num_files as i64
388 } else {
389 (100.0 / (overlap_factor * num_files as f64)).max(1.0) as i64
390 };
391
392 for i in 0..num_files {
393 let base = (i as f64 * range_size as f64 * (1.0 - overlap_factor)) as i64;
394 let min = base as f64;
395 let max = (base + range_size) as f64;
396
397 let file = PartitionedFile {
398 object_meta: ObjectMeta {
399 location: Path::from(format!("file_{i}.parquet")),
400 last_modified: chrono::Utc::now(),
401 size: 1000,
402 e_tag: None,
403 version: None,
404 },
405 partition_values: vec![],
406 range: None,
407 statistics: Some(Arc::new(Statistics {
408 num_rows: Precision::Exact(100),
409 total_byte_size: Precision::Exact(1000),
410 column_statistics: vec![ColumnStatistics {
411 null_count: Precision::Exact(0),
412 max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
413 min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
414 sum_value: Precision::Absent,
415 distinct_count: Precision::Absent,
416 }],
417 })),
418 extensions: None,
419 metadata_size_hint: None,
420 };
421 files.push(file);
422 }
423
424 vec![FileGroup::new(files)]
425}
426
427pub fn verify_sort_integrity(file_groups: &[FileGroup]) -> bool {
430 for group in file_groups {
431 let files = group.iter().collect::<Vec<_>>();
432 for i in 1..files.len() {
433 let prev_file = files[i - 1];
434 let curr_file = files[i];
435
436 if let (Some(prev_stats), Some(curr_stats)) =
438 (&prev_file.statistics, &curr_file.statistics)
439 {
440 let prev_max = &prev_stats.column_statistics[0].max_value;
441 let curr_min = &curr_stats.column_statistics[0].min_value;
442 if curr_min.get_value().unwrap() <= prev_max.get_value().unwrap() {
443 return false;
444 }
445 }
446 }
447 }
448 true
449}
450
// Unit tests for URL parsing / object-store lookup, plus two small fixture
// builders (`make_partition`, `aggr_test_schema`).
#[cfg(test)]
mod tests {
    use super::ListingTableUrl;
    use arrow::{
        array::{ArrayRef, Int32Array, RecordBatch},
        datatypes::{DataType, Field, Schema, SchemaRef},
    };
    use datafusion_execution::object_store::{
        DefaultObjectStoreRegistry, ObjectStoreRegistry,
    };
    use object_store::{local::LocalFileSystem, path::Path};
    use std::{collections::HashMap, ops::Not, sync::Arc};
    use url::Url;

    /// Returns a `RecordBatch` with a single nullable `Int32` column named
    /// `"i"` holding the values `0..sz`.
    pub fn make_partition(sz: i32) -> RecordBatch {
        let seq_start = 0;
        let seq_end = sz;
        let values = (seq_start..seq_end).collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let arr = Arc::new(Int32Array::from(values));

        RecordBatch::try_new(schema, vec![arr as ArrayRef]).unwrap()
    }

    /// Returns a 13-column, all non-nullable schema (`c1`..`c13`); `c1`
    /// additionally carries the field metadata `testing = test`.
    ///
    /// NOTE(review): presumably mirrors the aggregate test CSV fixtures —
    /// confirm against the test data.
    pub fn aggr_test_schema() -> SchemaRef {
        let mut f1 = Field::new("c1", DataType::Utf8, false);
        f1.set_metadata(HashMap::from_iter(vec![("testing".into(), "test".into())]));
        let schema = Schema::new(vec![
            f1,
            Field::new("c2", DataType::UInt32, false),
            Field::new("c3", DataType::Int8, false),
            Field::new("c4", DataType::Int16, false),
            Field::new("c5", DataType::Int32, false),
            Field::new("c6", DataType::Int64, false),
            Field::new("c7", DataType::UInt8, false),
            Field::new("c8", DataType::UInt16, false),
            Field::new("c9", DataType::UInt32, false),
            Field::new("c10", DataType::UInt64, false),
            Field::new("c11", DataType::Float32, false),
            Field::new("c12", DataType::Float64, false),
            Field::new("c13", DataType::Utf8, false),
        ]);

        Arc::new(schema)
    }

    // The object-store URL of a listing URL is its scheme plus authority.
    #[test]
    fn test_object_store_listing_url() {
        let listing = ListingTableUrl::parse("file:///").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "file:///");

        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
        let store = listing.object_store();
        assert_eq!(store.as_str(), "s3://bucket/");
    }

    // A store registered for `hdfs://localhost:8020` is found for a key
    // under that authority.
    #[test]
    fn test_get_store_hdfs() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("hdfs://localhost:8020").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // A store registered with an `s3://bucket/key` URL is found again for
    // the same URL.
    #[test]
    fn test_get_store_s3() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = Url::parse("s3://bucket/key").unwrap();
        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
        let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // `file://` URLs resolve without any explicit registration.
    #[test]
    fn test_get_store_file() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // A relative local path also resolves without explicit registration.
    #[test]
    fn test_get_store_local() {
        let sut = DefaultObjectStoreRegistry::default();
        let url = ListingTableUrl::parse("../").unwrap();
        sut.get_store(url.as_ref()).unwrap();
    }

    // NOTE(review): the boolean passed to `contains` presumably means
    // "ignore sub-directories" — confirm against `ListingTableUrl::contains`.
    #[test]
    fn test_url_contains() {
        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();

        // Direct child, flag = true: contained.
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            true
        ));

        // Direct child, flag = false: contained.
        assert!(url.contains(
            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
            false
        ));

        // File in a plain sub-folder, flag = true: NOT contained.
        assert!(url
            .contains(
                &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
                true
            )
            .not());

        // File in a plain sub-folder, flag = false: contained.
        assert!(url.contains(
            &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
            false
        ));

        // File under a `key=value` segment, flag = false: contained.
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            false
        ));

        // File under a `key=value` segment, flag = true: still contained,
        // unlike the plain sub-folder case above.
        assert!(url.contains(
            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
            true
        ));

        // The table directory itself, flag = true: contained.
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));

        // The table directory itself, flag = false: contained.
        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
    }
}