use std::sync::Arc;
#[cfg(feature = "avro")]
use crate::datasource::file_format::avro::AvroFormat;
#[cfg(feature = "parquet")]
use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::file_format::arrow::ArrowFormat;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::listing::ListingTableUrl;
use crate::datasource::{file_format::csv::CsvFormat, listing::ListingOptions};
use crate::error::Result;
use crate::execution::context::{SessionConfig, SessionState};
use arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion_common::config::{ConfigFileDecryptionProperties, TableOptions};
use datafusion_common::{
DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION, DEFAULT_CSV_EXTENSION,
DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION,
};
use async_trait::async_trait;
use datafusion_datasource_json::file_format::JsonFormat;
use datafusion_expr::SortExpr;
/// Options that control how CSV files are read.
///
/// Construct via [`CsvReadOptions::new`] or [`Default`], then refine with the
/// builder-style methods on this type.
#[derive(Clone)]
pub struct CsvReadOptions<'a> {
    /// Whether the first line of each file is treated as a header row.
    pub has_header: bool,
    /// Column delimiter byte; defaults to `b','`.
    pub delimiter: u8,
    /// Quote character byte; defaults to `b'"'`.
    pub quote: u8,
    /// Optional custom line terminator byte; `None` uses the format default.
    pub terminator: Option<u8>,
    /// Optional escape character byte; `None` disables escaping.
    pub escape: Option<u8>,
    /// Optional comment character byte, forwarded to `CsvFormat::with_comment`.
    pub comment: Option<u8>,
    /// Whether quoted values are allowed to contain embedded newlines.
    pub newlines_in_values: bool,
    /// Optional explicit schema; when `None` the schema is inferred from data.
    pub schema: Option<&'a Schema>,
    /// Maximum number of records to scan when inferring the schema.
    pub schema_infer_max_records: usize,
    /// File extension used to filter which files are read;
    /// defaults to `DEFAULT_CSV_EXTENSION`.
    pub file_extension: &'a str,
    /// Partition columns (name, type) encoded in the directory structure.
    pub table_partition_cols: Vec<(String, DataType)>,
    /// Compression applied to the files; defaults to uncompressed.
    pub file_compression_type: FileCompressionType,
    /// Declared sort order(s) of the data within the files, if known.
    pub file_sort_order: Vec<Vec<SortExpr>>,
    /// Optional regex; matching values are read as NULL.
    pub null_regex: Option<String>,
    /// Whether rows with fewer fields than the schema are permitted.
    pub truncated_rows: bool,
}
impl Default for CsvReadOptions<'_> {
fn default() -> Self {
Self::new()
}
}
impl<'a> CsvReadOptions<'a> {
    /// Create options with CSV defaults: header row expected, `,` delimiter,
    /// `"` quote, no escape/comment/custom terminator, uncompressed files,
    /// schema inferred from up to `DEFAULT_SCHEMA_INFER_MAX_RECORD` records.
    pub fn new() -> Self {
        Self {
            has_header: true,
            schema: None,
            schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
            delimiter: b',',
            quote: b'"',
            terminator: None,
            escape: None,
            newlines_in_values: false,
            file_extension: DEFAULT_CSV_EXTENSION,
            table_partition_cols: vec![],
            file_compression_type: FileCompressionType::UNCOMPRESSED,
            file_sort_order: vec![],
            comment: None,
            null_regex: None,
            truncated_rows: false,
        }
    }

    /// Configure whether the first line is treated as a header row.
    pub fn has_header(mut self, has_header: bool) -> Self {
        self.has_header = has_header;
        self
    }

    /// Specify a comment character byte.
    pub fn comment(mut self, comment: u8) -> Self {
        self.comment = Some(comment);
        self
    }

    /// Specify the column delimiter byte.
    pub fn delimiter(mut self, delimiter: u8) -> Self {
        self.delimiter = delimiter;
        self
    }

    /// Specify the quote character byte.
    pub fn quote(mut self, quote: u8) -> Self {
        self.quote = quote;
        self
    }

    /// Specify an optional custom line terminator byte.
    pub fn terminator(mut self, terminator: Option<u8>) -> Self {
        self.terminator = terminator;
        self
    }

    /// Specify an escape character byte.
    pub fn escape(mut self, escape: u8) -> Self {
        self.escape = Some(escape);
        self
    }

    /// Configure whether quoted values may contain embedded newlines.
    pub fn newlines_in_values(mut self, newlines_in_values: bool) -> Self {
        self.newlines_in_values = newlines_in_values;
        self
    }

    /// Specify the file extension used to select files to read.
    pub fn file_extension(mut self, file_extension: &'a str) -> Self {
        self.file_extension = file_extension;
        self
    }

    /// Set the delimiter only when `Some`; `None` keeps the current value.
    pub fn delimiter_option(mut self, delimiter: Option<u8>) -> Self {
        if let Some(d) = delimiter {
            self.delimiter = d;
        }
        self
    }

    /// Provide an explicit schema, skipping inference.
    pub fn schema(mut self, schema: &'a Schema) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Specify the partition columns (name, type) encoded in the paths.
    pub fn table_partition_cols(
        mut self,
        table_partition_cols: Vec<(String, DataType)>,
    ) -> Self {
        self.table_partition_cols = table_partition_cols;
        self
    }

    /// Limit the number of records scanned during schema inference.
    pub fn schema_infer_max_records(mut self, max_records: usize) -> Self {
        self.schema_infer_max_records = max_records;
        self
    }

    /// Specify the compression applied to the files.
    pub fn file_compression_type(
        mut self,
        file_compression_type: FileCompressionType,
    ) -> Self {
        self.file_compression_type = file_compression_type;
        self
    }

    /// Declare the sort order(s) the file data is known to have.
    pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
        self.file_sort_order = file_sort_order;
        self
    }

    /// Specify a regex whose matching values are read as NULL.
    pub fn null_regex(mut self, null_regex: Option<String>) -> Self {
        self.null_regex = null_regex;
        self
    }

    /// Configure whether rows with fewer fields than the schema are allowed.
    pub fn truncated_rows(mut self, truncated_rows: bool) -> Self {
        self.truncated_rows = truncated_rows;
        self
    }
}
/// Options that control how Parquet files are read.
#[derive(Clone)]
pub struct ParquetReadOptions<'a> {
    /// File extension used to filter which files are read;
    /// defaults to `DEFAULT_PARQUET_EXTENSION`.
    pub file_extension: &'a str,
    /// Partition columns (name, type) encoded in the directory structure.
    pub table_partition_cols: Vec<(String, DataType)>,
    /// Pruning flag forwarded to `ParquetFormat::with_enable_pruning`;
    /// `None` defers to the format's default.
    pub parquet_pruning: Option<bool>,
    /// Flag forwarded to `ParquetFormat::with_skip_metadata`;
    /// `None` defers to the format's default.
    pub skip_metadata: Option<bool>,
    /// Optional explicit schema; when `None` the schema is inferred.
    pub schema: Option<&'a Schema>,
    /// Declared sort order(s) of the data within the files, if known.
    pub file_sort_order: Vec<Vec<SortExpr>>,
    /// Optional decryption properties for encrypted Parquet files;
    /// overrides the session-level crypto settings when set.
    pub file_decryption_properties: Option<ConfigFileDecryptionProperties>,
    /// Optional metadata size hint, written into the global Parquet options.
    pub metadata_size_hint: Option<usize>,
}
impl Default for ParquetReadOptions<'_> {
    /// Options with the default Parquet file extension, no explicit schema,
    /// and all per-call overrides unset (session defaults apply).
    fn default() -> Self {
        Self {
            schema: None,
            file_extension: DEFAULT_PARQUET_EXTENSION,
            table_partition_cols: Vec::new(),
            file_sort_order: Vec::new(),
            parquet_pruning: None,
            skip_metadata: None,
            metadata_size_hint: None,
            file_decryption_properties: None,
        }
    }
}
impl<'a> ParquetReadOptions<'a> {
    /// Create options with all defaults; same as [`Default`].
    pub fn new() -> Self {
        Default::default()
    }

    /// Specify the file extension used to select files to read.
    pub fn file_extension(mut self, file_extension: &'a str) -> Self {
        self.file_extension = file_extension;
        self
    }

    /// Enable or disable Parquet pruning (overrides the format default).
    pub fn parquet_pruning(mut self, parquet_pruning: bool) -> Self {
        self.parquet_pruning = Some(parquet_pruning);
        self
    }

    /// Enable or disable skipping of metadata (overrides the format default).
    pub fn skip_metadata(mut self, skip_metadata: bool) -> Self {
        self.skip_metadata = Some(skip_metadata);
        self
    }

    /// Provide an explicit schema, skipping inference.
    pub fn schema(mut self, schema: &'a Schema) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Specify the partition columns (name, type) encoded in the paths.
    pub fn table_partition_cols(
        mut self,
        table_partition_cols: Vec<(String, DataType)>,
    ) -> Self {
        self.table_partition_cols = table_partition_cols;
        self
    }

    /// Declare the sort order(s) the file data is known to have.
    pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
        self.file_sort_order = file_sort_order;
        self
    }

    /// Provide decryption properties for reading encrypted Parquet files.
    pub fn file_decryption_properties(
        mut self,
        file_decryption_properties: ConfigFileDecryptionProperties,
    ) -> Self {
        self.file_decryption_properties = Some(file_decryption_properties);
        self
    }

    /// Set (or clear, with `None`) the metadata size hint.
    pub fn metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
        self.metadata_size_hint = size_hint;
        self
    }
}
/// Options that control how Arrow IPC files are read.
#[derive(Clone)]
pub struct ArrowReadOptions<'a> {
    /// Optional explicit schema; when `None` the schema is inferred.
    pub schema: Option<&'a Schema>,
    /// File extension used to filter which files are read;
    /// defaults to `DEFAULT_ARROW_EXTENSION`.
    pub file_extension: &'a str,
    /// Partition columns (name, type) encoded in the directory structure.
    pub table_partition_cols: Vec<(String, DataType)>,
}
impl Default for ArrowReadOptions<'_> {
    /// Defaults: inferred schema, the default Arrow file extension,
    /// and no partition columns.
    fn default() -> Self {
        Self {
            table_partition_cols: Vec::new(),
            file_extension: DEFAULT_ARROW_EXTENSION,
            schema: None,
        }
    }
}
impl<'a> ArrowReadOptions<'a> {
    /// Specify the partition columns (name, type) encoded in the paths.
    pub fn table_partition_cols(
        mut self,
        table_partition_cols: Vec<(String, DataType)>,
    ) -> Self {
        self.table_partition_cols = table_partition_cols;
        self
    }

    /// Provide an explicit schema, skipping inference.
    pub fn schema(mut self, schema: &'a Schema) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Specify the file extension used to select files to read.
    ///
    /// Added for parity with the CSV/Parquet/JSON option builders; the
    /// `file_extension` field was already public but had no builder method.
    pub fn file_extension(mut self, file_extension: &'a str) -> Self {
        self.file_extension = file_extension;
        self
    }
}
/// Options that control how Avro files are read.
#[derive(Clone)]
pub struct AvroReadOptions<'a> {
    /// Optional explicit schema; when `None` the schema is inferred.
    pub schema: Option<&'a Schema>,
    /// File extension used to filter which files are read;
    /// defaults to `DEFAULT_AVRO_EXTENSION`.
    pub file_extension: &'a str,
    /// Partition columns (name, type) encoded in the directory structure.
    pub table_partition_cols: Vec<(String, DataType)>,
}
impl Default for AvroReadOptions<'_> {
    /// Defaults: inferred schema, the default Avro file extension,
    /// and no partition columns.
    fn default() -> Self {
        Self {
            table_partition_cols: Vec::new(),
            file_extension: DEFAULT_AVRO_EXTENSION,
            schema: None,
        }
    }
}
impl<'a> AvroReadOptions<'a> {
    /// Specify the partition columns (name, type) encoded in the paths.
    pub fn table_partition_cols(
        mut self,
        table_partition_cols: Vec<(String, DataType)>,
    ) -> Self {
        self.table_partition_cols = table_partition_cols;
        self
    }

    /// Provide an explicit schema, skipping inference.
    pub fn schema(mut self, schema: &'a Schema) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Specify the file extension used to select files to read.
    ///
    /// Added for parity with the CSV/Parquet/JSON option builders; the
    /// `file_extension` field was already public but had no builder method.
    pub fn file_extension(mut self, file_extension: &'a str) -> Self {
        self.file_extension = file_extension;
        self
    }
}
// Backward-compatibility alias for the renamed options type; new code
// should use `JsonReadOptions` directly.
#[deprecated(
    since = "53.0.0",
    note = "Use `JsonReadOptions` instead. This alias will be removed in a future version."
)]
#[doc = "Deprecated: Use [`JsonReadOptions`] instead."]
pub type NdJsonReadOptions<'a> = JsonReadOptions<'a>;
/// Options that control how JSON files are read.
#[derive(Clone)]
pub struct JsonReadOptions<'a> {
    /// Optional explicit schema; when `None` the schema is inferred from data.
    pub schema: Option<&'a Schema>,
    /// Maximum number of records to scan when inferring the schema.
    pub schema_infer_max_records: usize,
    /// File extension used to filter which files are read;
    /// defaults to `DEFAULT_JSON_EXTENSION`.
    pub file_extension: &'a str,
    /// Partition columns (name, type) encoded in the directory structure.
    pub table_partition_cols: Vec<(String, DataType)>,
    /// Compression applied to the files; defaults to uncompressed.
    pub file_compression_type: FileCompressionType,
    /// Marks the source as unbounded. NOTE(review): this flag is not read by
    /// `to_listing_options` in this file — confirm where it is consumed.
    pub infinite: bool,
    /// Declared sort order(s) of the data within the files, if known.
    pub file_sort_order: Vec<Vec<SortExpr>>,
    /// Whether the input is newline-delimited JSON (one object per line).
    pub newline_delimited: bool,
}
impl Default for JsonReadOptions<'_> {
    /// Defaults: newline-delimited, uncompressed, bounded input, schema
    /// inferred from up to `DEFAULT_SCHEMA_INFER_MAX_RECORD` records.
    fn default() -> Self {
        Self {
            newline_delimited: true,
            infinite: false,
            file_compression_type: FileCompressionType::UNCOMPRESSED,
            file_extension: DEFAULT_JSON_EXTENSION,
            schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
            schema: None,
            table_partition_cols: Vec::new(),
            file_sort_order: Vec::new(),
        }
    }
}
impl<'a> JsonReadOptions<'a> {
    /// Specify the partition columns (name, type) encoded in the paths.
    pub fn table_partition_cols(
        mut self,
        table_partition_cols: Vec<(String, DataType)>,
    ) -> Self {
        self.table_partition_cols = table_partition_cols;
        self
    }

    /// Specify the file extension used to select files to read.
    pub fn file_extension(mut self, file_extension: &'a str) -> Self {
        self.file_extension = file_extension;
        self
    }

    /// Mark the source as unbounded (streaming).
    pub fn mark_infinite(mut self, infinite: bool) -> Self {
        self.infinite = infinite;
        self
    }

    /// Specify the compression applied to the files.
    pub fn file_compression_type(
        mut self,
        file_compression_type: FileCompressionType,
    ) -> Self {
        self.file_compression_type = file_compression_type;
        self
    }

    /// Provide an explicit schema, skipping inference.
    pub fn schema(mut self, schema: &'a Schema) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Declare the sort order(s) the file data is known to have.
    pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
        self.file_sort_order = file_sort_order;
        self
    }

    /// Limit the number of records scanned during schema inference.
    pub fn schema_infer_max_records(mut self, schema_infer_max_records: usize) -> Self {
        self.schema_infer_max_records = schema_infer_max_records;
        self
    }

    /// Configure whether the input is newline-delimited JSON.
    pub fn newline_delimited(mut self, newline_delimited: bool) -> Self {
        self.newline_delimited = newline_delimited;
        self
    }
}
/// Common behavior shared by the `*ReadOptions` types: converting the
/// options into [`ListingOptions`] and resolving the table schema.
#[async_trait]
pub trait ReadOptions<'a> {
    /// Build [`ListingOptions`] reflecting these options, layered on top of
    /// the session-level `table_options`.
    fn to_listing_options(
        &self,
        config: &SessionConfig,
        table_options: TableOptions,
    ) -> ListingOptions;

    /// Resolve the schema for the table at `table_path`: the explicit schema
    /// from the options if present, otherwise inferred from the files.
    async fn get_resolved_schema(
        &self,
        config: &SessionConfig,
        state: SessionState,
        table_path: ListingTableUrl,
    ) -> Result<SchemaRef>;

    /// Shared helper for implementors: return `schema` when given,
    /// otherwise infer the schema from the files at `table_path`.
    async fn _get_resolved_schema(
        &'a self,
        config: &SessionConfig,
        state: SessionState,
        table_path: ListingTableUrl,
        schema: Option<&'a Schema>,
    ) -> Result<SchemaRef>
    // `'a: 'async_trait` ties the borrowed schema's lifetime to the future
    // that `#[async_trait]` generates for this default method.
    where
        'a: 'async_trait,
    {
        // An explicit schema short-circuits inference entirely.
        if let Some(s) = schema {
            return Ok(Arc::new(s.to_owned()));
        }
        self.to_listing_options(config, state.default_table_options())
            .infer_schema(&state, &table_path)
            .await
    }
}
#[async_trait]
impl ReadOptions<'_> for CsvReadOptions<'_> {
    /// Translate these CSV options into [`ListingOptions`], starting from
    /// the session's CSV table options and overriding them with the
    /// per-call values held in `self`.
    fn to_listing_options(
        &self,
        config: &SessionConfig,
        table_options: TableOptions,
    ) -> ListingOptions {
        let format = CsvFormat::default().with_options(table_options.csv);
        let format = format
            .with_has_header(self.has_header)
            .with_comment(self.comment)
            .with_delimiter(self.delimiter)
            .with_quote(self.quote)
            .with_escape(self.escape)
            .with_terminator(self.terminator)
            .with_newlines_in_values(self.newlines_in_values)
            .with_schema_infer_max_rec(self.schema_infer_max_records)
            .with_file_compression_type(self.file_compression_type.to_owned())
            .with_null_regex(self.null_regex.clone())
            .with_truncated_rows(self.truncated_rows);
        let listing = ListingOptions::new(Arc::new(format));
        listing
            .with_file_extension(self.file_extension)
            .with_session_config_options(config)
            .with_table_partition_cols(self.table_partition_cols.clone())
            .with_file_sort_order(self.file_sort_order.clone())
    }

    /// Use the explicit schema if one was provided, otherwise infer it.
    async fn get_resolved_schema(
        &self,
        config: &SessionConfig,
        state: SessionState,
        table_path: ListingTableUrl,
    ) -> Result<SchemaRef> {
        self._get_resolved_schema(config, state, table_path, self.schema)
            .await
    }
}
#[cfg(feature = "parquet")]
#[async_trait]
impl ReadOptions<'_> for ParquetReadOptions<'_> {
fn to_listing_options(
&self,
config: &SessionConfig,
table_options: TableOptions,
) -> ListingOptions {
let mut options = table_options.parquet;
if let Some(file_decryption_properties) = &self.file_decryption_properties {
options.crypto.file_decryption = Some(file_decryption_properties.clone());
}
if let Some(metadata_size_hint) = self.metadata_size_hint {
options.global.metadata_size_hint = Some(metadata_size_hint);
}
let mut file_format = ParquetFormat::new().with_options(options);
if let Some(parquet_pruning) = self.parquet_pruning {
file_format = file_format.with_enable_pruning(parquet_pruning)
}
if let Some(skip_metadata) = self.skip_metadata {
file_format = file_format.with_skip_metadata(skip_metadata)
}
ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)
.with_table_partition_cols(self.table_partition_cols.clone())
.with_file_sort_order(self.file_sort_order.clone())
.with_session_config_options(config)
}
async fn get_resolved_schema(
&self,
config: &SessionConfig,
state: SessionState,
table_path: ListingTableUrl,
) -> Result<SchemaRef> {
self._get_resolved_schema(config, state, table_path, self.schema)
.await
}
}
#[async_trait]
impl ReadOptions<'_> for JsonReadOptions<'_> {
    /// Translate these JSON options into [`ListingOptions`], starting from
    /// the session's JSON table options and overriding them with the
    /// per-call values held in `self`.
    fn to_listing_options(
        &self,
        config: &SessionConfig,
        table_options: TableOptions,
    ) -> ListingOptions {
        let format = JsonFormat::default().with_options(table_options.json);
        let format = format
            .with_schema_infer_max_rec(self.schema_infer_max_records)
            .with_file_compression_type(self.file_compression_type.to_owned())
            .with_newline_delimited(self.newline_delimited);
        let listing = ListingOptions::new(Arc::new(format));
        listing
            .with_file_extension(self.file_extension)
            .with_session_config_options(config)
            .with_table_partition_cols(self.table_partition_cols.clone())
            .with_file_sort_order(self.file_sort_order.clone())
    }

    /// Use the explicit schema if one was provided, otherwise infer it.
    async fn get_resolved_schema(
        &self,
        config: &SessionConfig,
        state: SessionState,
        table_path: ListingTableUrl,
    ) -> Result<SchemaRef> {
        self._get_resolved_schema(config, state, table_path, self.schema)
            .await
    }
}
#[cfg(feature = "avro")]
#[async_trait]
impl ReadOptions<'_> for AvroReadOptions<'_> {
fn to_listing_options(
&self,
config: &SessionConfig,
_table_options: TableOptions,
) -> ListingOptions {
let file_format = AvroFormat;
ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)
.with_session_config_options(config)
.with_table_partition_cols(self.table_partition_cols.clone())
}
async fn get_resolved_schema(
&self,
config: &SessionConfig,
state: SessionState,
table_path: ListingTableUrl,
) -> Result<SchemaRef> {
self._get_resolved_schema(config, state, table_path, self.schema)
.await
}
}
#[async_trait]
impl ReadOptions<'_> for ArrowReadOptions<'_> {
    /// Translate these Arrow options into [`ListingOptions`]. Arrow has no
    /// session-level table options to merge, so `_table_options` is unused.
    fn to_listing_options(
        &self,
        config: &SessionConfig,
        _table_options: TableOptions,
    ) -> ListingOptions {
        let listing = ListingOptions::new(Arc::new(ArrowFormat));
        listing
            .with_file_extension(self.file_extension)
            .with_session_config_options(config)
            .with_table_partition_cols(self.table_partition_cols.clone())
    }

    /// Use the explicit schema if one was provided, otherwise infer it.
    async fn get_resolved_schema(
        &self,
        config: &SessionConfig,
        state: SessionState,
        table_path: ListingTableUrl,
    ) -> Result<SchemaRef> {
        self._get_resolved_schema(config, state, table_path, self.schema)
            .await
    }
}