use super::*;
use crate::csv::read_impl::{to_batched_owned, BatchedCsvReader, OwnedBatchedCsvReader};
use crate::csv::utils::infer_file_schema;
/// Text encodings the CSV reader can be configured with
/// (see `CsvReader::with_encoding`).
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum CsvEncoding {
/// UTF-8; this is the encoding `CsvReader::new` starts with.
Utf8,
/// Lossy UTF-8.
/// NOTE(review): presumably invalid byte sequences are replaced rather than
/// rejected — confirm against the parser, which is not visible here.
LossyUtf8,
}
/// User-facing description of which strings should be read as null.
///
/// Converted into `NullValuesCompiled` by `NullValues::compile`.
#[derive(Clone, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum NullValues {
/// A single string that marks a null in every column.
AllColumnsSingle(String),
/// Several strings; any of them marks a null in every column.
AllColumns(Vec<String>),
/// `(column name, null string)` pairs: each named column gets its own null
/// string. Names are resolved against the schema when compiled.
Named(Vec<(String, String)>),
}
/// Schema-resolved null values that are compared against raw field bytes.
pub(super) enum NullValuesCompiled {
/// A field is null when it equals this string, whatever its column.
AllColumnsSingle(String),
/// A field is null when it equals any of these strings, whatever its column.
AllColumns(Vec<String>),
/// One null string per column, looked up by column index.
Columns(Vec<String>),
}
impl NullValuesCompiled {
pub(super) fn apply_projection(&mut self, projections: &[usize]) {
if let Self::Columns(nv) = self {
let nv = projections
.iter()
.map(|i| std::mem::take(&mut nv[*i]))
.collect::<Vec<_>>();
*self = NullValuesCompiled::Columns(nv);
}
}
pub(super) unsafe fn is_null(&self, field: &[u8], index: usize) -> bool {
use NullValuesCompiled::*;
match self {
AllColumnsSingle(v) => v.as_bytes() == field,
AllColumns(v) => v.iter().any(|v| v.as_bytes() == field),
Columns(v) => {
debug_assert!(index < v.len());
v.get_unchecked(index).as_bytes() == field
}
}
}
}
impl NullValues {
    /// Resolves these null values against `schema`.
    ///
    /// The two "all columns" variants are carried over unchanged. For `Named`,
    /// a list with one entry per schema column is built: columns without an
    /// explicit entry keep the empty string, and every named column gets the
    /// string that was supplied for it.
    ///
    /// # Errors
    ///
    /// Propagates the error of `Schema::try_index_of` when a named column
    /// cannot be looked up in `schema`.
    pub(super) fn compile(self, schema: &Schema) -> PolarsResult<NullValuesCompiled> {
        let compiled = match self {
            NullValues::AllColumnsSingle(single) => NullValuesCompiled::AllColumnsSingle(single),
            NullValues::AllColumns(all) => NullValuesCompiled::AllColumns(all),
            NullValues::Named(pairs) => {
                let mut per_column = vec![String::new(); schema.len()];
                for (column, null_string) in pairs {
                    let idx = schema.try_index_of(&column)?;
                    per_column[idx] = null_string;
                }
                NullValuesCompiled::Columns(per_column)
            }
        };
        Ok(compiled)
    }
}
/// Builder-style CSV reader over a byte source `R`.
///
/// Options are set through consuming setter methods; the configured reader is
/// then driven by `finish`, `batched` or `batched_borrowed`.
#[must_use]
pub struct CsvReader<'a, R>
where
R: MmapBytesReader,
{
/// Underlying byte source.
reader: R,
/// When true, `finish` merges the result into a single chunk if it has more
/// than one. Starts as `true`.
rechunk: bool,
/// Optional row limit forwarded to the core reader. Starts as `None`.
n_rows: Option<usize>,
/// Record limit handed to schema inference (set via `infer_schema`).
/// Starts as `Some(128)`.
max_records: Option<usize>,
/// Rows to skip before the header (set via `with_skip_rows`). Starts as `0`.
skip_rows_before_header: usize,
/// Optional column indices to select.
projection: Option<Vec<usize>>,
/// Optional column names to select.
columns: Option<Vec<String>>,
/// Field delimiter; `batched` falls back to `b','` when unset.
delimiter: Option<u8>,
/// Whether the input has a header row. Starts as `true`.
has_header: bool,
/// Forwarded to the core reader. Starts as `false`.
ignore_parser_errors: bool,
/// Caller-supplied schema (set via `with_schema`).
pub(crate) schema: Option<&'a Schema>,
/// Text encoding. Starts as `CsvEncoding::Utf8`.
encoding: CsvEncoding,
/// Optional thread count forwarded to the core reader.
n_threads: Option<usize>,
/// Path of the file being read, when known (set via `with_path` / `from_path`).
path: Option<PathBuf>,
/// Schema whose dtypes overwrite the inferred ones (set via `with_dtypes`).
schema_overwrite: Option<&'a Schema>,
/// Dtypes that overwrite the inferred ones (set via `with_dtypes_slice`).
dtype_overwrite: Option<&'a [DataType]>,
/// Forwarded to the core reader. Starts as `1024`.
sample_size: usize,
/// Forwarded to the core reader. Starts as `1 << 18`.
chunk_size: usize,
/// When true, `finish` rechunks with `as_single_chunk` instead of
/// `as_single_chunk_par`; also forwarded to the core reader. Starts as `false`.
low_memory: bool,
/// Optional comment character.
comment_char: Option<u8>,
/// End-of-line character. Starts as `b'\n'`.
eol_char: u8,
/// Strings to read as null.
null_values: Option<NullValues>,
/// Optional predicate forwarded to the core reader.
predicate: Option<Arc<dyn PhysicalIoExpr>>,
/// Optional quote character. Starts as `Some(b'"')`.
quote_char: Option<u8>,
/// Rows to skip after the header. Starts as `0`.
skip_rows_after_header: usize,
/// When true (and the `temporal` feature is enabled), `finish` runs
/// `parse_dates` on the result. Starts as `false`.
parse_dates: bool,
/// Optional row-count column configuration forwarded to the core reader.
row_count: Option<RowCount>,
/// Storage for the schema that `batched_borrowed` derives from
/// `schema_overwrite`, so the batched reader can borrow it from `self`.
owned_schema: Option<Box<Schema>>,
}
impl<'a, R> CsvReader<'a, R>
where
    R: 'a + MmapBytesReader,
{
    /// Stores `offset` as the number of rows to skip after the header.
    pub fn with_skip_rows_after_header(self, offset: usize) -> Self {
        Self {
            skip_rows_after_header: offset,
            ..self
        }
    }

    /// Stores the optional row-count configuration.
    pub fn with_row_count(self, rc: Option<RowCount>) -> Self {
        Self {
            row_count: rc,
            ..self
        }
    }

    /// Stores the chunk size handed to the core reader.
    pub fn with_chunk_size(self, chunk_size: usize) -> Self {
        Self { chunk_size, ..self }
    }

    /// Stores the text encoding.
    pub fn with_encoding(self, enc: CsvEncoding) -> Self {
        Self {
            encoding: enc,
            ..self
        }
    }

    /// Stores the optional row limit.
    pub fn with_n_rows(self, num_rows: Option<usize>) -> Self {
        Self {
            n_rows: num_rows,
            ..self
        }
    }

    /// Stores the "ignore parser errors" flag.
    pub fn with_ignore_parser_errors(self, ignore: bool) -> Self {
        Self {
            ignore_parser_errors: ignore,
            ..self
        }
    }

    /// Stores a caller-supplied schema.
    pub fn with_schema(self, schema: &'a Schema) -> Self {
        Self {
            schema: Some(schema),
            ..self
        }
    }

    /// Stores the number of rows to skip before the header.
    pub fn with_skip_rows(self, skip_rows: usize) -> Self {
        Self {
            skip_rows_before_header: skip_rows,
            ..self
        }
    }

    /// Stores whether `finish` should merge the result into a single chunk.
    pub fn with_rechunk(self, rechunk: bool) -> Self {
        Self { rechunk, ..self }
    }

    /// Stores whether the input has a header row.
    pub fn has_header(self, has_header: bool) -> Self {
        Self { has_header, ..self }
    }

    /// Stores the field delimiter.
    pub fn with_delimiter(self, delimiter: u8) -> Self {
        Self {
            delimiter: Some(delimiter),
            ..self
        }
    }

    /// Stores the optional comment character.
    pub fn with_comment_char(self, comment_char: Option<u8>) -> Self {
        Self {
            comment_char,
            ..self
        }
    }

    /// Stores the end-of-line character.
    pub fn with_end_of_line_char(self, eol_char: u8) -> Self {
        Self { eol_char, ..self }
    }

    /// Stores the strings that should be read as null.
    pub fn with_null_values(self, null_values: Option<NullValues>) -> Self {
        Self {
            null_values,
            ..self
        }
    }

    /// Stores a schema whose dtypes overwrite the inferred ones.
    pub fn with_dtypes(self, schema: Option<&'a Schema>) -> Self {
        Self {
            schema_overwrite: schema,
            ..self
        }
    }

    /// Stores a slice of dtypes that overwrite the inferred ones.
    pub fn with_dtypes_slice(self, dtypes: Option<&'a [DataType]>) -> Self {
        Self {
            dtype_overwrite: dtypes,
            ..self
        }
    }

    /// Stores the record limit handed to schema inference.
    pub fn infer_schema(self, max_records: Option<usize>) -> Self {
        Self {
            max_records,
            ..self
        }
    }

    /// Stores the optional column indices to select.
    pub fn with_projection(self, projection: Option<Vec<usize>>) -> Self {
        Self { projection, ..self }
    }

    /// Stores the optional column names to select.
    pub fn with_columns(self, columns: Option<Vec<String>>) -> Self {
        Self { columns, ..self }
    }

    /// Stores the optional thread count.
    pub fn with_n_threads(self, n: Option<usize>) -> Self {
        Self {
            n_threads: n,
            ..self
        }
    }

    /// Stores the path of the file being read, converting it to a `PathBuf`.
    pub fn with_path<P: Into<PathBuf>>(self, path: Option<P>) -> Self {
        Self {
            path: path.map(Into::into),
            ..self
        }
    }

    /// Stores the sample size handed to the core reader.
    pub fn sample_size(self, size: usize) -> Self {
        Self {
            sample_size: size,
            ..self
        }
    }

    /// Stores the low-memory flag.
    pub fn low_memory(self, toggle: bool) -> Self {
        Self {
            low_memory: toggle,
            ..self
        }
    }

    /// Stores the optional quote character.
    pub fn with_quote_char(self, quote: Option<u8>) -> Self {
        Self {
            quote_char: quote,
            ..self
        }
    }

    /// Stores whether `finish` should try to parse dates.
    pub fn with_parse_dates(self, toggle: bool) -> Self {
        Self {
            parse_dates: toggle,
            ..self
        }
    }

    /// Stores the optional predicate handed to the core reader.
    #[cfg(feature = "private")]
    pub fn with_predicate(self, predicate: Option<Arc<dyn PhysicalIoExpr>>) -> Self {
        Self { predicate, ..self }
    }
}
impl<'a> CsvReader<'a, File> {
    /// Opens the file at `path` and wraps it in a reader with default options.
    ///
    /// The path is first passed through `resolve_homedir`; the resolved path is
    /// both opened and recorded on the reader.
    ///
    /// # Errors
    ///
    /// Returns an error when the file cannot be opened.
    pub fn from_path<P: Into<PathBuf>>(path: P) -> PolarsResult<Self> {
        let requested: PathBuf = path.into();
        let resolved = resolve_homedir(&requested);
        let file = std::fs::File::open(&resolved)?;
        let reader = Self::new(file).with_path(Some(resolved));
        Ok(reader)
    }
}
impl<'a, R: MmapBytesReader + 'a> CsvReader<'a, R> {
/// Builds a `CoreReader` from this reader's configuration.
///
/// `schema` and `to_cast` are forwarded as given. The owned options
/// (`projection`, `columns`, `null_values`, `predicate`, `row_count`) are
/// moved out with `std::mem::take`, so they are left at their default on
/// `self` afterwards.
///
/// Returns an error when the reader bytes cannot be obtained or when
/// `CoreReader::new` fails.
fn core_reader<'b>(
&'b mut self,
schema: Option<&'b Schema>,
to_cast: Vec<Field>,
) -> PolarsResult<CoreReader<'b>>
where
'a: 'b,
{
let reader_bytes = get_reader_bytes(&mut self.reader)?;
CoreReader::new(
reader_bytes,
self.n_rows,
self.skip_rows_before_header,
std::mem::take(&mut self.projection),
self.max_records,
self.delimiter,
self.has_header,
self.ignore_parser_errors,
self.schema,
std::mem::take(&mut self.columns),
self.encoding,
self.n_threads,
schema,
self.dtype_overwrite,
self.sample_size,
self.chunk_size,
self.low_memory,
self.comment_char,
self.quote_char,
self.eol_char,
std::mem::take(&mut self.null_values),
std::mem::take(&mut self.predicate),
to_cast,
self.skip_rows_after_header,
std::mem::take(&mut self.row_count),
self.parse_dates,
)
}
/// Splits an overwrite schema into the schema handed to the parser and the
/// fields collected in `to_cast`.
///
/// Returns `(schema, to_cast, has_categorical)`. The flag can only become true
/// when the `dtype-categorical` feature is enabled.
fn prepare_schema_overwrite(&self, overwriting_schema: &Schema) -> (Schema, Vec<Field>, bool) {
let mut to_cast = Vec::with_capacity(overwriting_schema.len());
let mut _has_categorical = false;
#[allow(clippy::unnecessary_filter_map)]
let fields = overwriting_schema.iter_fields().filter_map(|mut fld| {
use DataType::*;
match fld.data_type() {
// `Time` fields are dropped from the parse schema and only recorded in `to_cast`.
Time => {
to_cast.push(fld);
None
}
// Small integer types are recorded in `to_cast` with their requested dtype
// and kept in the parse schema as `Int32`.
// NOTE(review): presumably the core reader casts them back after parsing — confirm.
Int8 | Int16 | UInt8 | UInt16 => {
to_cast.push(fld.clone());
fld.coerce(DataType::Int32);
Some(fld)
}
#[cfg(feature = "dtype-categorical")]
Categorical(_) => {
_has_categorical = true;
Some(fld)
}
_ => Some(fld),
}
});
let schema = Schema::from(fields);
(schema, to_cast, _has_categorical)
}
/// Creates a batched reader that borrows from this reader.
///
/// With a schema overwrite set, the prepared schema is stored in
/// `self.owned_schema` and passed to the core reader together with the fields
/// to cast; otherwise `self.schema` is used and nothing is cast.
pub fn batched_borrowed(&'a mut self) -> PolarsResult<BatchedCsvReader<'a>> {
if let Some(schema) = self.schema_overwrite {
let (schema, to_cast, has_cat) = self.prepare_schema_overwrite(schema);
self.owned_schema = Some(Box::new(schema));
// NOTE(review): source and target type of this transmute are spelled
// identically, so it only detaches the lifetime of the reference into
// `self.owned_schema`, allowing it to be passed next to `&mut self` below.
// Soundness appears to rely on `owned_schema` not being replaced or dropped
// while the returned reader is alive — confirm.
let schema = unsafe {
std::mem::transmute::<Option<&Schema>, Option<&Schema>>(
self.owned_schema.as_ref().map(|b| b.as_ref()),
)
};
let csv_reader = self.core_reader(schema, to_cast)?;
csv_reader.batched(has_cat)
} else {
let csv_reader = self.core_reader(self.schema, vec![])?;
csv_reader.batched(false)
}
}
}
impl<'a> CsvReader<'a, Box<dyn MmapBytesReader>> {
    /// Converts this reader into an owned batched reader.
    ///
    /// When `schema` is given it is used as is. Otherwise the schema is first
    /// inferred from the input with `infer_file_schema`, using `b','` as the
    /// delimiter if none was configured. Inference receives
    /// `skip_rows_before_header` by mutable reference and may therefore update it.
    ///
    /// # Errors
    ///
    /// Returns an error when the reader bytes cannot be obtained or when schema
    /// inference fails.
    pub fn batched(mut self, schema: Option<SchemaRef>) -> PolarsResult<OwnedBatchedCsvReader> {
        let schema = if let Some(given) = schema {
            given
        } else {
            let bytes = get_reader_bytes(&mut self.reader)?;
            let delimiter = self.delimiter.unwrap_or(b',');
            let (inferred, _, _) = infer_file_schema(
                &bytes,
                delimiter,
                self.max_records,
                self.has_header,
                None,
                &mut self.skip_rows_before_header,
                self.skip_rows_after_header,
                self.comment_char,
                self.quote_char,
                self.eol_char,
                self.null_values.as_ref(),
                self.parse_dates,
            )?;
            Arc::new(inferred)
        };
        Ok(to_batched_owned(self, schema))
    }
}
impl<'a, R> SerReader<R> for CsvReader<'a, R>
where
    R: MmapBytesReader + 'a,
{
    /// Creates a reader over `reader` with the default configuration.
    fn new(reader: R) -> Self {
        Self {
            reader,
            path: None,
            // Parsing options.
            delimiter: None,
            has_header: true,
            comment_char: None,
            eol_char: b'\n',
            quote_char: Some(b'"'),
            encoding: CsvEncoding::Utf8,
            null_values: None,
            ignore_parser_errors: false,
            parse_dates: false,
            // Row and column selection.
            n_rows: None,
            skip_rows_before_header: 0,
            skip_rows_after_header: 0,
            projection: None,
            columns: None,
            predicate: None,
            row_count: None,
            // Schema handling.
            schema: None,
            schema_overwrite: None,
            dtype_overwrite: None,
            owned_schema: None,
            max_records: Some(128),
            // Execution tuning.
            rechunk: true,
            n_threads: None,
            sample_size: 1024,
            chunk_size: 1 << 18,
            low_memory: false,
        }
    }

    /// Reads the input into a `DataFrame`.
    ///
    /// With a schema overwrite set, the prepared overwrite schema and its cast
    /// list are handed to the core reader; otherwise the reader's own schema is
    /// used. If rechunking is enabled and the frame has more than one chunk it
    /// is merged into a single chunk (sequentially in low-memory mode, in
    /// parallel otherwise). With the `temporal` feature and date parsing
    /// enabled, `parse_dates` is applied to the result.
    fn finish(mut self) -> PolarsResult<DataFrame> {
        // Copy the plain options out first: `core_reader` borrows `self` mutably.
        let rechunk = self.rechunk;
        let schema_overwrite = self.schema_overwrite;
        let dtype_overwrite = self.dtype_overwrite;
        let should_parse_dates = self.parse_dates;
        let low_memory = self.low_memory;

        // Held until the end of the function once a categorical dtype is involved.
        #[cfg(feature = "dtype-categorical")]
        let mut _cat_lock = None;

        let mut df = match schema_overwrite {
            Some(overwrite) => {
                let (schema, to_cast, _has_cat) = self.prepare_schema_overwrite(overwrite);

                #[cfg(feature = "dtype-categorical")]
                if _has_cat {
                    _cat_lock = Some(polars_core::IUseStringCache::new());
                }

                let mut csv_reader = self.core_reader(Some(&schema), to_cast)?;
                csv_reader.as_df()?
            }
            None => {
                #[cfg(feature = "dtype-categorical")]
                {
                    let has_cat = self.schema.map_or(false, |schema| {
                        schema
                            .iter_dtypes()
                            .any(|dtype| matches!(dtype, DataType::Categorical(_)))
                    });
                    if has_cat {
                        _cat_lock = Some(polars_core::IUseStringCache::new());
                    }
                }

                let mut csv_reader = self.core_reader(self.schema, vec![])?;
                csv_reader.as_df()?
            }
        };

        let needs_rechunk = rechunk && df.n_chunks() > 1;
        if needs_rechunk {
            if low_memory {
                df.as_single_chunk();
            } else {
                df.as_single_chunk_par();
            }
        }

        #[cfg(feature = "temporal")]
        if should_parse_dates {
            // Columns named in this schema are left untouched by `parse_dates`.
            let fixed_schema = if let Some(schema) = schema_overwrite {
                Cow::Borrowed(schema)
            } else if let Some(dtypes) = dtype_overwrite {
                let fields = dtypes
                    .iter()
                    .zip(df.get_column_names())
                    .map(|(dtype, name)| Field::new(name, dtype.clone()));
                Cow::Owned(Schema::from(fields))
            } else {
                Cow::Owned(Schema::default())
            };
            df = parse_dates(df, &fixed_schema);
        }

        Ok(df)
    }
}
/// Tries to turn string columns of `df` into temporal columns, in parallel.
///
/// A column is returned unchanged when it is not a utf8 column or when its name
/// occurs in `fixed_schema`. Any other utf8 column is, with the `dtype-time`
/// feature enabled, replaced by its `as_time` conversion if that conversion
/// succeeds; otherwise it is kept as is.
#[cfg(feature = "temporal")]
fn parse_dates(mut df: DataFrame, fixed_schema: &Schema) -> DataFrame {
    let columns = std::mem::take(df.get_columns_mut());
    let parsed = columns
        .into_par_iter()
        .map(|s| match s.utf8() {
            // Only utf8 columns that the caller did not pin to a dtype are candidates.
            Ok(ca) if fixed_schema.index_of(s.name()).is_none() => {
                #[cfg(feature = "dtype-time")]
                if let Ok(time) = ca.as_time(None, false) {
                    return time.into_series();
                }
                s
            }
            _ => s,
        })
        .collect::<Vec<_>>();
    DataFrame::new_no_checks(parsed)
}