use std::cmp;
use std::iter::Iterator;
use std::num::NonZeroUsize;
use std::sync::Arc;
use polars_buffer::Buffer;
use polars_core::prelude::Schema;
use polars_core::schema::SchemaRef;
use polars_error::{PolarsResult, polars_bail, polars_ensure};
use crate::csv::read::schema_inference::infer_file_schema_impl;
use crate::prelude::_csv_read_internal::{SplitLines, is_comment_line};
use crate::prelude::{CsvParseOptions, CsvReadOptions};
use crate::utils::compression::{ByteSourceReader, CompressedReader};
use crate::utils::stream_buf_reader::ReaderSource;
/// Boxed callback that is handed the raw bytes of a single line. The schema-inference entry
/// points in this file invoke it once, on the first content line they reach.
pub type InspectContentFn<'a> = Box<dyn FnMut(&[u8]) + 'a>;
/// Advances `reader` to the first content row of a CSV stream and infers the schema on the way.
///
/// Processing order:
/// 1. `options.skip_lines` raw lines are skipped by searching for `eol_char` only.
/// 2. A per-line state machine discards leading empty lines (only when `has_header` is set),
///    `skip_rows` non-comment rows, the header line (when `has_header` is set) and
///    `skip_rows_after_header` non-comment rows.
/// 3. The first remaining line is shown to `inspect_first_content_row_fn`, if one was given.
/// 4. Non-comment lines are collected for inference until `infer_schema_length` lines are held.
///    The line that reaches the limit is still collected, so at least one line is sampled
///    whenever content exists.
///
/// When `options.schema` is set the inference length is forced to `0`, which also makes
/// `infer_schema` treat every column as a string before the provided schema is applied.
///
/// # Returns
/// The inferred schema together with the leftover buffer produced by
/// `for_each_line_from_reader_from_compressed_reader` (bytes already read from `reader` that
/// were not discarded).
///
/// # Errors
/// * `InvalidOperation` if both `skip_lines` and `skip_rows` are non-zero.
/// * Any error propagated from the reader, from line skipping, or from `infer_schema`.
#[inline(never)]
pub fn read_until_start_and_infer_schema_from_compressed_reader(
    options: &CsvReadOptions,
    projected_schema: Option<SchemaRef>,
    mut inspect_first_content_row_fn: Option<InspectContentFn<'_>>,
    reader: &mut CompressedReader,
) -> PolarsResult<(Schema, Buffer<u8>)> {
    // Rough per-row size used to translate between byte counts and row counts.
    const ESTIMATED_BYTES_PER_ROW: usize = 200;

    // The variants are listed in the order in which they are visited.
    #[derive(Copy, Clone)]
    enum State {
        SkipEmpty,
        SkipRowsBeforeHeader(usize),
        SkipHeader(bool),
        SkipRowsAfterHeader(usize),
        ContentInspect,
        InferCollect,
        Done,
    }

    polars_ensure!(
        !(options.skip_lines != 0 && options.skip_rows != 0),
        InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set"
    );

    // `skip_lines` is handled separately because it only looks at `eol_char`.
    let prev_leftover = skip_lines_naive_from_compressed_reader(
        options.parse_options.eol_char,
        options.skip_lines,
        options.raise_if_empty,
        reader,
    )?;

    let mut state = if options.has_header {
        State::SkipEmpty
    } else if options.skip_lines != 0 {
        // `skip_rows` is guaranteed to be zero here, so go straight to the header state.
        State::SkipHeader(false)
    } else {
        State::SkipRowsBeforeHeader(options.skip_rows)
    };

    let comment_prefix = options.parse_options.comment_prefix.as_ref();
    // A user-provided schema makes dtype inference unnecessary.
    let infer_schema_length = if options.schema.is_some() {
        Some(0)
    } else {
        options.infer_schema_length
    };

    let mut header_line = None;

    // The capacity is only a hint. `infer_schema_length` is user-controlled, so it is capped at
    // the estimated number of rows in the input: passing it to `Vec::with_capacity` unchecked
    // panics ("capacity overflow") or aborts on allocation failure for very large values.
    let estimated_total_rows = reader
        .total_len_estimate()
        .saturating_div(ESTIMATED_BYTES_PER_ROW);
    let content_lines_capacity = infer_schema_length
        .map_or(estimated_total_rows, |isl| cmp::min(isl, estimated_total_rows));
    let mut content_lines = Vec::with_capacity(content_lines_capacity);

    // Size the first read so that it likely covers all sampled rows; with no inference limit the
    // whole input is requested at once.
    let initial_read_size = infer_schema_length
        .map(|isl| {
            cmp::max(
                CompressedReader::initial_read_size(),
                isl.saturating_mul(ESTIMATED_BYTES_PER_ROW),
            )
        })
        .unwrap_or(usize::MAX);

    let leftover = for_each_line_from_reader_from_compressed_reader(
        &options.parse_options,
        true,
        prev_leftover,
        initial_read_size,
        reader,
        |mem_slice_line| {
            let line = &*mem_slice_line;

            // A single line may pass through several states; `continue` re-dispatches the same
            // line on the new state, `break` yields the verdict for this line.
            let done = loop {
                match &mut state {
                    State::SkipEmpty => {
                        if line.is_empty() || line == b"\r" {
                            break LineUse::ConsumeDiscard;
                        }

                        state = State::SkipRowsBeforeHeader(options.skip_rows);
                    },
                    State::SkipRowsBeforeHeader(remaining) => {
                        let is_comment = is_comment_line(line, comment_prefix);

                        if *remaining == 0 && !is_comment {
                            state = State::SkipHeader(false);
                            continue;
                        }

                        // Comment lines are discarded without counting as skipped rows.
                        *remaining -= !is_comment as usize;
                        break LineUse::ConsumeDiscard;
                    },
                    State::SkipHeader(did_skip) => {
                        if !options.has_header || *did_skip {
                            state = State::SkipRowsAfterHeader(options.skip_rows_after_header);
                            continue;
                        }

                        header_line = Some(mem_slice_line.clone());
                        *did_skip = true;
                        break LineUse::ConsumeDiscard;
                    },
                    State::SkipRowsAfterHeader(remaining) => {
                        let is_comment = is_comment_line(line, comment_prefix);

                        if *remaining == 0 && !is_comment {
                            state = State::ContentInspect;
                            continue;
                        }

                        *remaining -= !is_comment as usize;
                        break LineUse::ConsumeDiscard;
                    },
                    State::ContentInspect => {
                        if let Some(func) = &mut inspect_first_content_row_fn {
                            func(line);
                        }

                        state = State::InferCollect;
                    },
                    State::InferCollect => {
                        if !is_comment_line(line, comment_prefix) {
                            content_lines.push(mem_slice_line.clone());
                            if content_lines.len() >= infer_schema_length.unwrap_or(usize::MAX) {
                                state = State::Done;
                                continue;
                            }
                        }

                        break LineUse::ConsumeKeep;
                    },
                    State::Done => {
                        break LineUse::Done;
                    },
                }
            };

            Ok(done)
        },
    )?;

    let infer_all_as_str = infer_schema_length == Some(0);
    let inferred_schema = infer_schema(
        &header_line,
        &content_lines,
        infer_all_as_str,
        options,
        projected_schema,
    )?;

    Ok((inferred_schema, leftover))
}
/// Advances `reader` to the first content row of a CSV stream and infers the schema on the way.
///
/// Processing order:
/// 1. `options.skip_lines` raw lines are skipped by searching for `eol_char` only.
/// 2. A per-line state machine discards leading empty lines (only when `has_header` is set),
///    `skip_rows` non-comment rows, the header line (when `has_header` is set) and
///    `skip_rows_after_header` non-comment rows.
/// 3. The first remaining line is shown to `inspect_first_content_row_fn`, if one was given.
/// 4. Non-comment lines are collected for inference until `infer_schema_length` lines are held.
///    The line that reaches the limit is still collected, so at least one line is sampled
///    whenever content exists.
///
/// When `options.schema` is set the inference length is forced to `0`, which also makes
/// `infer_schema` treat every column as a string before the provided schema is applied.
///
/// `decompressed_file_size_hint` is forwarded to the reader and, when present, is used to
/// estimate how many rows the input holds.
///
/// # Returns
/// The inferred schema together with the leftover buffer produced by
/// `for_each_line_from_reader` (bytes already read from `reader` that were not discarded).
///
/// # Errors
/// * `InvalidOperation` if both `skip_lines` and `skip_rows` are non-zero.
/// * Any error propagated from the reader, from line skipping, or from `infer_schema`.
#[inline(never)]
pub fn read_until_start_and_infer_schema(
    options: &CsvReadOptions,
    projected_schema: Option<SchemaRef>,
    decompressed_file_size_hint: Option<usize>,
    mut inspect_first_content_row_fn: Option<InspectContentFn<'_>>,
    reader: &mut ByteSourceReader<ReaderSource>,
) -> PolarsResult<(Schema, Buffer<u8>)> {
    // Rough per-row size used to translate between byte counts and row counts.
    const ESTIMATED_BYTES_PER_ROW: usize = 200;

    // The variants are listed in the order in which they are visited.
    #[derive(Copy, Clone)]
    enum State {
        SkipEmpty,
        SkipRowsBeforeHeader(usize),
        SkipHeader(bool),
        SkipRowsAfterHeader(usize),
        ContentInspect,
        InferCollect,
        Done,
    }

    polars_ensure!(
        !(options.skip_lines != 0 && options.skip_rows != 0),
        InvalidOperation: "only one of 'skip_rows'/'skip_lines' may be set"
    );

    // `skip_lines` is handled separately because it only looks at `eol_char`.
    let prev_leftover = skip_lines_naive(
        options.parse_options.eol_char,
        options.skip_lines,
        options.raise_if_empty,
        decompressed_file_size_hint,
        reader,
    )?;

    let mut state = if options.has_header {
        State::SkipEmpty
    } else if options.skip_lines != 0 {
        // `skip_rows` is guaranteed to be zero here, so go straight to the header state.
        State::SkipHeader(false)
    } else {
        State::SkipRowsBeforeHeader(options.skip_rows)
    };

    let comment_prefix = options.parse_options.comment_prefix.as_ref();
    // A user-provided schema makes dtype inference unnecessary.
    let infer_schema_length = if options.schema.is_some() {
        Some(0)
    } else {
        options.infer_schema_length
    };

    let mut header_line = None;

    // The capacity is only a hint. `infer_schema_length` is user-controlled, so it is capped at
    // the estimated number of rows in the input (or the fallback of 100 when no size hint is
    // available): passing it to `Vec::with_capacity` unchecked panics ("capacity overflow") or
    // aborts on allocation failure for very large values.
    let estimated_total_rows = decompressed_file_size_hint
        .map_or(100, |size| size.saturating_div(ESTIMATED_BYTES_PER_ROW));
    let content_lines_capacity = infer_schema_length
        .map_or(estimated_total_rows, |isl| cmp::min(isl, estimated_total_rows));
    let mut content_lines = Vec::with_capacity(content_lines_capacity);

    // Size the first read so that it likely covers all sampled rows; with no inference limit the
    // whole input is requested at once.
    let initial_read_size = infer_schema_length
        .map(|isl| {
            cmp::max(
                CompressedReader::initial_read_size(),
                isl.saturating_mul(ESTIMATED_BYTES_PER_ROW),
            )
        })
        .unwrap_or(usize::MAX);

    let leftover = for_each_line_from_reader(
        &options.parse_options,
        true,
        prev_leftover,
        initial_read_size,
        decompressed_file_size_hint,
        reader,
        |mem_slice_line| {
            let line = &*mem_slice_line;

            // A single line may pass through several states; `continue` re-dispatches the same
            // line on the new state, `break` yields the verdict for this line.
            let done = loop {
                match &mut state {
                    State::SkipEmpty => {
                        if line.is_empty() || line == b"\r" {
                            break LineUse::ConsumeDiscard;
                        }

                        state = State::SkipRowsBeforeHeader(options.skip_rows);
                    },
                    State::SkipRowsBeforeHeader(remaining) => {
                        let is_comment = is_comment_line(line, comment_prefix);

                        if *remaining == 0 && !is_comment {
                            state = State::SkipHeader(false);
                            continue;
                        }

                        // Comment lines are discarded without counting as skipped rows.
                        *remaining -= !is_comment as usize;
                        break LineUse::ConsumeDiscard;
                    },
                    State::SkipHeader(did_skip) => {
                        if !options.has_header || *did_skip {
                            state = State::SkipRowsAfterHeader(options.skip_rows_after_header);
                            continue;
                        }

                        header_line = Some(mem_slice_line.clone());
                        *did_skip = true;
                        break LineUse::ConsumeDiscard;
                    },
                    State::SkipRowsAfterHeader(remaining) => {
                        let is_comment = is_comment_line(line, comment_prefix);

                        if *remaining == 0 && !is_comment {
                            state = State::ContentInspect;
                            continue;
                        }

                        *remaining -= !is_comment as usize;
                        break LineUse::ConsumeDiscard;
                    },
                    State::ContentInspect => {
                        if let Some(func) = &mut inspect_first_content_row_fn {
                            func(line);
                        }

                        state = State::InferCollect;
                    },
                    State::InferCollect => {
                        if !is_comment_line(line, comment_prefix) {
                            content_lines.push(mem_slice_line.clone());
                            if content_lines.len() >= infer_schema_length.unwrap_or(usize::MAX) {
                                state = State::Done;
                                continue;
                            }
                        }

                        break LineUse::ConsumeKeep;
                    },
                    State::Done => {
                        break LineUse::Done;
                    },
                }
            };

            Ok(done)
        },
    )?;

    let infer_all_as_str = infer_schema_length == Some(0);
    let inferred_schema = infer_schema(
        &header_line,
        &content_lines,
        infer_all_as_str,
        options,
        projected_schema,
    )?;

    Ok((inferred_schema, leftover))
}
/// Verdict returned by the per-line callback of the `for_each_line_from_reader*` functions,
/// telling the line walker what to do with the line it was just given.
enum LineUse {
/// The line has been handled and does not need to be part of the returned leftover.
ConsumeDiscard,
/// The line has been handled, but the returned leftover must start no later than this line.
ConsumeKeep,
/// Stop iterating. The line that produced this verdict is not skipped over.
Done,
}
/// Reads `reader` chunk by chunk, splits each chunk into CSV lines with `SplitLines` and hands
/// the lines to `line_fn` one at a time until it answers `LineUse::Done` or the input ends.
///
/// * `is_file_start` - when `true`, a UTF-8 BOM at the start of the first chunk is stripped.
/// * `prev_leftover` - bytes from an earlier read that are handed back to the reader on the
///   first `read_next_slice` call.
/// * `initial_read_size` - size of the first read, unless overridden by the
///   `POLARS_FORCE_CSV_INFER_READ_SIZE` environment variable.
///
/// Returns the leftover buffer: it starts at the first line answered with
/// `LineUse::ConsumeKeep` if there was one, otherwise at the first line that was not consumed.
/// An empty buffer is returned when the reader yields an empty slice.
///
/// # Panics
/// Panics if `POLARS_FORCE_CSV_INFER_READ_SIZE` is set but is not a non-zero `usize`.
fn for_each_line_from_reader_from_compressed_reader(
parse_options: &CsvParseOptions,
is_file_start: bool,
mut prev_leftover: Buffer<u8>,
initial_read_size: usize,
reader: &mut CompressedReader,
mut line_fn: impl FnMut(Buffer<u8>) -> PolarsResult<LineUse>,
) -> PolarsResult<Buffer<u8>> {
let mut is_first_line = is_file_start;
// Optional override that pins the read size for every read.
let fixed_read_size = std::env::var("POLARS_FORCE_CSV_INFER_READ_SIZE")
.map(|x| {
x.parse::<NonZeroUsize>()
.unwrap_or_else(|_| {
panic!("invalid value for POLARS_FORCE_CSV_INFER_READ_SIZE: {x}")
})
.get()
})
.ok();
let mut read_size = fixed_read_size.unwrap_or(initial_read_size);
// `Some(offset)` once a line has been kept: `offset` marks where unprocessed bytes begin
// inside the retained buffer.
let mut retain_offset = None;
loop {
// NOTE(review): `slice` is presumably `prev_leftover` followed by newly read bytes - confirm
// against `CompressedReader::read_next_slice`.
let (mut slice, bytes_read) = reader.read_next_slice(&prev_leftover, read_size)?;
if slice.is_empty() {
return Ok(Buffer::new());
}
// Only the very first chunk of a file is checked for a UTF-8 byte order mark.
if is_first_line {
is_first_line = false;
const UTF8_BOM_MARKER: Option<&[u8]> = Some(b"\xef\xbb\xbf");
if slice.get(0..3) == UTF8_BOM_MARKER {
slice = slice.sliced(3..);
}
}
// Turns a borrowed line (which points into `slice`) into an owned sub-buffer of `slice`
// by computing its start offset from the pointer difference.
let line_to_sub_slice = |line: &[u8]| {
let start = line.as_ptr() as usize - slice.as_ptr() as usize;
slice.clone().sliced(start..(start + line.len()))
};
// When lines are being retained, skip the part of `slice` that was already processed.
let effective_slice = if let Some(offset) = retain_offset {
slice.clone().sliced(offset..)
} else {
slice.clone()
};
let mut lines = SplitLines::new(
&effective_slice,
parse_options.quote_char,
parse_options.eol_char,
parse_options.comment_prefix.as_ref(),
);
// No line could be produced: read again with a larger size, keeping everything read so far.
let Some(mut prev_line) = lines.next() else {
read_size = read_size.saturating_mul(2);
prev_leftover = slice;
continue;
};
let mut should_ret = false;
// `prev_line` trails one line behind the iterator, so the final line yielded by `lines` is
// not processed in this loop (presumably because it may be cut off by the chunk boundary).
for next_line in lines {
match line_fn(line_to_sub_slice(prev_line))? {
LineUse::ConsumeDiscard => debug_assert!(retain_offset.is_none()),
LineUse::ConsumeKeep => {
// First kept line: from now on the leftover starts at this line.
if retain_offset.is_none() {
let retain_start_offset =
prev_line.as_ptr() as usize - slice.as_ptr() as usize;
prev_leftover = slice.clone().sliced(retain_start_offset..);
retain_offset = Some(0);
}
},
LineUse::Done => {
should_ret = true;
break;
},
}
prev_line = next_line;
}
// Offset of the first line that has not been consumed, relative to `effective_slice`.
let mut unconsumed_offset = prev_line.as_ptr() as usize - effective_slice.as_ptr() as usize;
// A short read is treated as end of input, so the trailing line is processed as well.
if bytes_read < read_size {
match line_fn(line_to_sub_slice(prev_line))? {
LineUse::ConsumeDiscard => {
debug_assert!(retain_offset.is_none());
// Step over the discarded line and its terminator, if one is present.
unconsumed_offset += prev_line.len();
if effective_slice.get(unconsumed_offset) == Some(&parse_options.eol_char) {
unconsumed_offset += 1;
}
},
LineUse::ConsumeKeep | LineUse::Done => (),
}
should_ret = true;
}
if let Some(offset) = &mut retain_offset {
if *offset == 0 {
// Retention started in this round: `slice.len() - prev_leftover.len()` is where the
// retained buffer starts inside `slice`, so this rebases the offset onto it.
*offset = unconsumed_offset - (slice.len() - prev_leftover.len());
} else {
// Retention was already active: keep the whole slice and advance the offset.
prev_leftover = slice;
*offset += unconsumed_offset;
}
} else {
// Nothing is retained: only the unconsumed tail is carried into the next read.
prev_leftover = slice.sliced(unconsumed_offset..);
}
if should_ret {
return Ok(prev_leftover);
}
// Grow the read size until the reader's ideal size is reached, unless it is pinned.
if read_size < CompressedReader::ideal_read_size() && fixed_read_size.is_none() {
read_size *= 4;
}
}
}
/// Reads `reader` chunk by chunk, splits each chunk into CSV lines with `SplitLines` and hands
/// the lines to `line_fn` one at a time until it answers `LineUse::Done` or the input ends.
///
/// * `is_file_start` - when `true`, a UTF-8 BOM at the start of the first chunk is stripped.
/// * `prev_leftover` - bytes from an earlier read that are handed back to the reader on the
///   first `read_next_slice` call.
/// * `initial_read_size` - size of the first read, unless overridden by the
///   `POLARS_FORCE_CSV_INFER_READ_SIZE` environment variable.
/// * `decompressed_file_size_hint` - forwarded unchanged to every `read_next_slice` call.
///
/// Returns the leftover buffer: it starts at the first line answered with
/// `LineUse::ConsumeKeep` if there was one, otherwise at the first line that was not consumed.
/// An empty buffer is returned when the reader yields an empty slice.
///
/// # Panics
/// Panics if `POLARS_FORCE_CSV_INFER_READ_SIZE` is set but is not a non-zero `usize`.
fn for_each_line_from_reader(
parse_options: &CsvParseOptions,
is_file_start: bool,
mut prev_leftover: Buffer<u8>,
initial_read_size: usize,
decompressed_file_size_hint: Option<usize>,
reader: &mut ByteSourceReader<ReaderSource>,
mut line_fn: impl FnMut(Buffer<u8>) -> PolarsResult<LineUse>,
) -> PolarsResult<Buffer<u8>> {
let mut is_first_line = is_file_start;
// Optional override that pins the read size for every read.
let fixed_read_size = std::env::var("POLARS_FORCE_CSV_INFER_READ_SIZE")
.map(|x| {
x.parse::<NonZeroUsize>()
.unwrap_or_else(|_| {
panic!("invalid value for POLARS_FORCE_CSV_INFER_READ_SIZE: {x}")
})
.get()
})
.ok();
let mut read_size = fixed_read_size.unwrap_or(initial_read_size);
// `Some(offset)` once a line has been kept: `offset` marks where unprocessed bytes begin
// inside the retained buffer.
let mut retain_offset = None;
loop {
// NOTE(review): `slice` is presumably `prev_leftover` followed by newly read bytes - confirm
// against `ByteSourceReader::read_next_slice`.
let (mut slice, bytes_read) =
reader.read_next_slice(&prev_leftover, read_size, decompressed_file_size_hint)?;
if slice.is_empty() {
return Ok(Buffer::new());
}
// Only the very first chunk of a file is checked for a UTF-8 byte order mark.
if is_first_line {
is_first_line = false;
const UTF8_BOM_MARKER: Option<&[u8]> = Some(b"\xef\xbb\xbf");
if slice.get(0..3) == UTF8_BOM_MARKER {
slice = slice.sliced(3..);
}
}
// Turns a borrowed line (which points into `slice`) into an owned sub-buffer of `slice`
// by computing its start offset from the pointer difference.
let line_to_sub_slice = |line: &[u8]| {
let start = line.as_ptr() as usize - slice.as_ptr() as usize;
slice.clone().sliced(start..(start + line.len()))
};
// When lines are being retained, skip the part of `slice` that was already processed.
let effective_slice = if let Some(offset) = retain_offset {
slice.clone().sliced(offset..)
} else {
slice.clone()
};
let mut lines = SplitLines::new(
&effective_slice,
parse_options.quote_char,
parse_options.eol_char,
parse_options.comment_prefix.as_ref(),
);
// No line could be produced: read again with a larger size, keeping everything read so far.
let Some(mut prev_line) = lines.next() else {
read_size = read_size.saturating_mul(2);
prev_leftover = slice;
continue;
};
let mut should_ret = false;
// `prev_line` trails one line behind the iterator, so the final line yielded by `lines` is
// not processed in this loop (presumably because it may be cut off by the chunk boundary).
for next_line in lines {
match line_fn(line_to_sub_slice(prev_line))? {
LineUse::ConsumeDiscard => debug_assert!(retain_offset.is_none()),
LineUse::ConsumeKeep => {
// First kept line: from now on the leftover starts at this line.
if retain_offset.is_none() {
let retain_start_offset =
prev_line.as_ptr() as usize - slice.as_ptr() as usize;
prev_leftover = slice.clone().sliced(retain_start_offset..);
retain_offset = Some(0);
}
},
LineUse::Done => {
should_ret = true;
break;
},
}
prev_line = next_line;
}
// Offset of the first line that has not been consumed, relative to `effective_slice`.
let mut unconsumed_offset = prev_line.as_ptr() as usize - effective_slice.as_ptr() as usize;
// A short read is treated as end of input, so the trailing line is processed as well.
if bytes_read < read_size {
match line_fn(line_to_sub_slice(prev_line))? {
LineUse::ConsumeDiscard => {
debug_assert!(retain_offset.is_none());
// Step over the discarded line and its terminator, if one is present.
unconsumed_offset += prev_line.len();
if effective_slice.get(unconsumed_offset) == Some(&parse_options.eol_char) {
unconsumed_offset += 1;
}
},
LineUse::ConsumeKeep | LineUse::Done => (),
}
should_ret = true;
}
if let Some(offset) = &mut retain_offset {
if *offset == 0 {
// Retention started in this round: `slice.len() - prev_leftover.len()` is where the
// retained buffer starts inside `slice`, so this rebases the offset onto it.
*offset = unconsumed_offset - (slice.len() - prev_leftover.len());
} else {
// Retention was already active: keep the whole slice and advance the offset.
prev_leftover = slice;
*offset += unconsumed_offset;
}
} else {
// Nothing is retained: only the unconsumed tail is carried into the next read.
prev_leftover = slice.sliced(unconsumed_offset..);
}
if should_ret {
return Ok(prev_leftover);
}
// Grow the read size until the reader's ideal size is reached, unless it is pinned.
if read_size < ByteSourceReader::<ReaderSource>::ideal_read_size()
&& fixed_read_size.is_none()
{
read_size *= 4;
}
}
}
/// Skips `skip_lines` raw lines from `reader` by counting occurrences of `eol_char` only; no
/// quote or comment handling is applied.
///
/// Returns the part of the last chunk read that follows the final skipped terminator, or an
/// empty buffer when `skip_lines` is `0`.
///
/// # Errors
/// Propagates reader errors. If a read yields no new bytes before enough terminators were seen,
/// a `NoData` error is returned when `raise_if_empty` is set; otherwise the result is an empty
/// buffer.
fn skip_lines_naive_from_compressed_reader(
    eol_char: u8,
    skip_lines: usize,
    raise_if_empty: bool,
    reader: &mut CompressedReader,
) -> PolarsResult<Buffer<u8>> {
    if skip_lines == 0 {
        return Ok(Buffer::new());
    }

    // Nothing is carried over between reads: the tail of a chunk holds no terminator, so it
    // never has to be searched a second time.
    let no_leftover: Buffer<u8> = Buffer::new();
    let mut lines_left = skip_lines;
    let mut read_size = CompressedReader::initial_read_size();

    loop {
        let (chunk, bytes_read) = reader.read_next_slice(&no_leftover, read_size)?;
        let mut rest: &[u8] = &chunk;

        while let Some(eol_idx) = memchr::memchr(eol_char, rest) {
            // Continue right after the terminator that was just found.
            rest = &rest[eol_idx + 1..];
            lines_left -= 1;

            if lines_left == 0 {
                // `rest` is a suffix of `chunk`, so the length difference is its start offset.
                let consumed = chunk.len() - rest.len();
                return Ok(chunk.sliced(consumed..));
            }
        }

        // The chunk ran out of terminators; ask for more data next time.
        read_size = read_size.saturating_mul(2);

        if bytes_read == 0 {
            if raise_if_empty {
                polars_bail!(NoData: "specified skip_lines is larger than total number of lines.");
            }
            return Ok(Buffer::new());
        }

        if read_size < CompressedReader::ideal_read_size() {
            read_size *= 4;
        }
    }
}
/// Skips `skip_lines` raw lines from `reader` by counting occurrences of `eol_char` only; no
/// quote or comment handling is applied. `decompressed_file_size_hint` is forwarded unchanged to
/// every read.
///
/// Returns the part of the last chunk read that follows the final skipped terminator, or an
/// empty buffer when `skip_lines` is `0`.
///
/// # Errors
/// Propagates reader errors. If a read yields no new bytes before enough terminators were seen,
/// a `NoData` error is returned when `raise_if_empty` is set; otherwise the result is an empty
/// buffer.
fn skip_lines_naive(
    eol_char: u8,
    skip_lines: usize,
    raise_if_empty: bool,
    decompressed_file_size_hint: Option<usize>,
    reader: &mut ByteSourceReader<ReaderSource>,
) -> PolarsResult<Buffer<u8>> {
    if skip_lines == 0 {
        return Ok(Buffer::new());
    }

    // Nothing is carried over between reads: the tail of a chunk holds no terminator, so it
    // never has to be searched a second time.
    let no_leftover: Buffer<u8> = Buffer::new();
    let mut lines_left = skip_lines;
    let mut read_size = CompressedReader::initial_read_size();

    loop {
        let (chunk, bytes_read) =
            reader.read_next_slice(&no_leftover, read_size, decompressed_file_size_hint)?;
        let mut rest: &[u8] = &chunk;

        while let Some(eol_idx) = memchr::memchr(eol_char, rest) {
            // Continue right after the terminator that was just found.
            rest = &rest[eol_idx + 1..];
            lines_left -= 1;

            if lines_left == 0 {
                // `rest` is a suffix of `chunk`, so the length difference is its start offset.
                let consumed = chunk.len() - rest.len();
                return Ok(chunk.sliced(consumed..));
            }
        }

        // The chunk ran out of terminators; ask for more data next time.
        read_size = read_size.saturating_mul(2);

        if bytes_read == 0 {
            if raise_if_empty {
                polars_bail!(NoData: "specified skip_lines is larger than total number of lines.");
            }
            return Ok(Buffer::new());
        }

        if read_size < CompressedReader::ideal_read_size() {
            read_size *= 4;
        }
    }
}
fn infer_schema(
header_line: &Option<Buffer<u8>>,
content_lines: &[Buffer<u8>],
infer_all_as_str: bool,
options: &CsvReadOptions,
projected_schema: Option<SchemaRef>,
) -> PolarsResult<Schema> {
let has_no_inference_data = if options.has_header {
header_line.is_none()
} else {
content_lines.is_empty()
};
if options.raise_if_empty && has_no_inference_data {
polars_bail!(NoData: "empty CSV");
}
let mut inferred_schema = if has_no_inference_data {
Schema::default()
} else {
infer_file_schema_impl(
header_line,
content_lines,
infer_all_as_str,
&options.parse_options,
options.schema_overwrite.as_deref(),
)
};
if let Some(schema) = &options.schema {
if schema.len() < inferred_schema.len() && !options.parse_options.truncate_ragged_lines {
polars_bail!(
SchemaMismatch:
"provided schema does not match number of columns in file ({} != {} in file)",
schema.len(),
inferred_schema.len(),
);
}
if options.parse_options.truncate_ragged_lines {
inferred_schema = Arc::unwrap_or_clone(schema.clone());
} else {
inferred_schema = schema
.iter_names()
.zip(inferred_schema.into_iter().map(|(_, dtype)| dtype))
.map(|(name, dtype)| (name.clone(), dtype))
.collect();
}
}
if let Some(dtypes) = options.dtype_overwrite.as_deref() {
for (i, dtype) in dtypes.iter().enumerate() {
inferred_schema.set_dtype_at_index(i, dtype.clone());
}
}
if let Some(projected_schema) = projected_schema {
for (name, inferred_dtype) in inferred_schema.iter_mut() {
if let Some(projected_dtype) = projected_schema.get(name) {
*inferred_dtype = projected_dtype.clone();
}
}
}
Ok(inferred_schema)
}