use num::traits::Pow;
use polars_core::prelude::*;
use super::buffer::*;
use crate::csv::read::NullValuesCompiled;
/// Strip a leading UTF-8 byte-order mark (`EF BB BF`) from `input`, if present.
///
/// Inputs without a BOM (including inputs shorter than three bytes) are
/// returned unchanged.
pub(crate) fn skip_bom(input: &[u8]) -> &[u8] {
    const UTF8_BOM: &[u8] = b"\xef\xbb\xbf";
    input.strip_prefix(UTF8_BOM).unwrap_or(input)
}
/// Return the index of the first byte after the first `eol_char` in `input`.
///
/// Quote characters are not considered. Returns `None` when `input` contains
/// no `eol_char`, or when that `eol_char` is the last byte (so no further
/// line follows).
pub(crate) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
    let next_line_start = memchr::memchr(eol_char, input)? + 1;
    if next_line_start < input.len() {
        Some(next_line_start)
    } else {
        None
    }
}
/// Find the offset in `input` at which a complete CSV record starts.
///
/// Candidate positions are the bytes right after each `eol_char` found by a
/// plain (quote-unaware) `memchr` scan. When `expected_fields` is `Some`, a
/// candidate is accepted only if the line starting there (split quote-aware by
/// [`SplitLines`]) has exactly that many fields according to [`SplitFields`].
/// This rejects candidates that fall inside a quoted field containing an EOL.
/// When `expected_fields` is `None`, the first candidate followed by a
/// terminated line is accepted.
///
/// On the 256th candidate, and again every 65 536 candidates after that as the
/// `u16` counter wraps, the expected field count is lowered by one.
/// NOTE(review): presumably this tolerates a header with one extra field;
/// confirm.
///
/// Returns `Some(p)`, where `p` is the index into `input` of the first byte of
/// the accepted line. Returns `None` in these cases:
/// - `input` is empty.
/// - No further EOL is found.
/// - An EOL is the final byte of the remaining input.
/// - The last (unterminated) line does not match the expected field count.
pub(crate) fn next_line_position(
    mut input: &[u8],
    mut expected_fields: Option<usize>,
    delimiter: u8,
    quote_char: Option<u8>,
    eol_char: u8,
) -> Option<usize> {
    // Offset of the current `input` relative to the slice passed by the caller.
    let mut total_pos = 0;
    if input.is_empty() {
        return None;
    }
    let count_fields =
        |line: &[u8]| SplitFields::new(line, delimiter, quote_char, eol_char).count();

    let mut lines_checked = 0u16;
    loop {
        // `wrapping_add`: a plain `+= 1` overflows (panicking in debug builds)
        // after 65 535 rejected candidates. Wrapping matches release-build
        // behaviour, where the field count is relaxed again on each pass of 256.
        lines_checked = lines_checked.wrapping_add(1);
        if lines_checked == 256 {
            if let Some(ef) = expected_fields {
                expected_fields = Some(ef.saturating_sub(1))
            }
        };
        let pos = memchr::memchr(eol_char, input)? + 1;
        if input.len() - pos == 0 {
            return None;
        }
        debug_assert!(pos <= input.len());
        // SAFETY: memchr returned an index `< input.len()`, so `pos <= input.len()`.
        let new_input = unsafe { input.get_unchecked(pos..) };
        let line = SplitLines::new(new_input, quote_char.unwrap_or(b'"'), eol_char).next();
        match (line, expected_fields) {
            (Some(line), Some(expected_fields)) if count_fields(line) == expected_fields => {
                return Some(total_pos + pos)
            },
            (Some(_), Some(_)) => {
                // Candidate rejected: resume the search at the start of the
                // rejected line. Advancing by exactly `pos` (not `pos + 1`)
                // keeps the EOL of an empty rejected line visible, so the
                // record right after an empty line is still considered.
                debug_assert!(pos < input.len());
                // SAFETY: `pos < input.len()` because `input.len() - pos != 0` above.
                input = unsafe { input.get_unchecked(pos..) };
                total_pos += pos;
            },
            (Some(_), None) => return Some(total_pos + pos),
            // Last line without a terminating EOL: check it as-is.
            (None, Some(expected_fields)) if count_fields(new_input) == expected_fields => {
                return Some(total_pos + pos)
            },
            _ => return None,
        }
    }
}
/// Whether `b` ends a line: either the configured `eol_char` or a carriage
/// return (`\r`), so CRLF endings are recognised too.
pub(crate) fn is_line_ending(b: u8, eol_char: u8) -> bool {
    matches!(b, b'\r') || b == eol_char
}
/// Whether `b` is a blank that may pad a field: a space or a horizontal tab.
pub(crate) fn is_whitespace(b: u8) -> bool {
    matches!(b, b' ' | b'\t')
}
/// Return the suffix of `input` starting at the first byte for which `f`
/// returns `false`. If `f` holds for every byte, the empty tail is returned.
#[inline]
fn skip_condition<F>(input: &[u8], f: F) -> &[u8]
where
    F: Fn(u8) -> bool,
{
    let skipped = input.iter().take_while(|&&byte| f(byte)).count();
    &input[skipped..]
}
/// Skip the first line of `input`.
///
/// Returns the bytes after that line together with the number of bytes
/// skipped. If the line is directly followed by a second `eol_char` (an empty
/// line), that byte is skipped as well. When no further line follows, all of
/// `input` is consumed and an empty slice is returned.
pub(crate) fn skip_header(input: &[u8], eol_char: u8) -> (&[u8], usize) {
    if let Some(line_start) = next_line_position_naive(input, eol_char) {
        // `next_line_position_naive` only yields positions `< input.len()`,
        // so indexing `line_start` is in bounds.
        let skipped = line_start + usize::from(input[line_start] == eol_char);
        return (&input[skipped..], skipped);
    }
    (&[], input.len())
}
/// Strip leading spaces and tabs (as defined by `is_whitespace`) from `input`.
#[inline]
pub(crate) fn skip_whitespace(input: &[u8]) -> &[u8] {
    skip_condition(input, is_whitespace)
}
/// Strip leading spaces and tabs from `input`, but stop at `exclude` even if it
/// is itself a space or tab (e.g. when the delimiter is a tab).
#[inline]
pub(crate) fn skip_whitespace_exclude(input: &[u8], exclude: u8) -> &[u8] {
    skip_condition(input, |b| is_whitespace(b) && b != exclude)
}
/// Strip leading spaces, tabs and line-ending bytes (`eol_char` or `\r`) from
/// `input`, stopping at `exclude` even if it belongs to one of those classes.
#[inline]
pub(crate) fn skip_whitespace_line_ending_exclude(
    input: &[u8],
    exclude: u8,
    eol_char: u8,
) -> &[u8] {
    let is_blank_or_eol = |b: u8| is_whitespace(b) || is_line_ending(b, eol_char);
    skip_condition(input, |b| b != exclude && is_blank_or_eol(b))
}
/// Strip any leading run of line-ending bytes (`eol_char` or `\r`) from `input`.
#[inline]
pub(crate) fn skip_line_ending(input: &[u8], eol_char: u8) -> &[u8] {
    let is_eol = |b: u8| is_line_ending(b, eol_char);
    skip_condition(input, is_eol)
}
/// Estimate the mean and (population) standard deviation, in bytes, of the
/// line lengths in `bytes`.
///
/// Lines are sampled from two regions: the start of `bytes`, and the point 75%
/// of the way in. In each region the cursor is first aligned to a record
/// boundary with [`next_line_position`] (using `expected_fields` to avoid
/// landing inside a quoted field). Then `n_lines / 2` consecutive lines are
/// measured with the quote-unaware [`next_line_position_naive`].
///
/// Returns `None` in these cases:
/// - Alignment fails.
/// - A region runs out of lines before `n_lines / 2` were measured.
/// - No line was sampled at all (`n_lines < 2`). Without this check the
///   result would be `0 / 0 = NaN` for both statistics.
pub(crate) fn get_line_stats(
    bytes: &[u8],
    n_lines: usize,
    eol_char: u8,
    expected_fields: usize,
    delimiter: u8,
    quote_char: Option<u8>,
) -> Option<(f32, f32)> {
    let mut lengths = Vec::with_capacity(n_lines);
    let n_lines_per_iter = n_lines / 2;
    // Total number of bytes covered by the sampled lines.
    let mut n_read = 0;

    for offset in [0, (bytes.len() as f32 * 0.75) as usize] {
        let mut bytes_trunc = &bytes[offset..];
        let pos = next_line_position(
            bytes_trunc,
            Some(expected_fields),
            delimiter,
            quote_char,
            eol_char,
        )?;
        // The cursor is deliberately kept one byte past the start of each line.
        // Together with the extra `+ 1` below, each recorded length is the full
        // line length including its EOL byte. An empty line is folded into the
        // length of the line that follows it.
        bytes_trunc = &bytes_trunc[pos + 1..];
        for _ in 0..n_lines_per_iter {
            let pos = next_line_position_naive(bytes_trunc, eol_char)? + 1;
            n_read += pos;
            lengths.push(pos);
            bytes_trunc = &bytes_trunc[pos..];
        }
    }

    let n_samples = lengths.len();
    if n_samples == 0 {
        return None;
    }
    let mean = (n_read as f32) / (n_samples as f32);
    let mut std = 0.0;
    for &len in lengths.iter() {
        std += (len as f32 - mean).pow(2.0)
    }
    std = (std / n_samples as f32).sqrt();
    Some((mean, std))
}
/// Iterator over the lines of a byte slice that does not split on an
/// end-of-line byte enclosed in quotes.
///
/// Quote state toggles on every `quote_char` byte, so a doubled (escaped)
/// quote toggles twice and cancels out. A trailing fragment that is not
/// terminated by `end_line_char` is *not* yielded.
pub(crate) struct SplitLines<'a> {
    /// Bytes not yet yielded.
    v: &'a [u8],
    quote_char: u8,
    end_line_char: u8,
}

impl<'a> SplitLines<'a> {
    /// Create a quote-aware line splitter over `slice`.
    pub(crate) fn new(slice: &'a [u8], quote_char: u8, end_line_char: u8) -> Self {
        Self {
            v: slice,
            quote_char,
            end_line_char,
        }
    }
}

impl<'a> Iterator for SplitLines<'a> {
    type Item = &'a [u8];

    /// Yield the next line, without its terminating `end_line_char`.
    ///
    /// Returns `None`, leaving the iterator unchanged, when the remaining bytes
    /// contain no unquoted terminator.
    #[inline]
    fn next(&mut self) -> Option<&'a [u8]> {
        let v = self.v;
        if v.is_empty() {
            return None;
        }
        let (quote_char, end_line_char) = (self.quote_char, self.end_line_char);
        let mut in_field = false;
        // Counting in `usize` (via `position`) cannot overflow, unlike a `u32`
        // counter on lines of 4 GiB or more.
        let eol_idx = v.iter().position(|&c| {
            if c == quote_char {
                in_field = !in_field;
                false
            } else {
                c == end_line_char && !in_field
            }
        })?;
        // `eol_idx < v.len()`, so both slices are in bounds.
        self.v = &v[eol_idx + 1..];
        Some(&v[..eol_idx])
    }
}
/// Iterator over the fields of a single CSV record.
///
/// Yields `(field, needs_escaping)` pairs. Iteration ends after the first
/// unquoted `eol_char`, or at the end of the slice.
pub(crate) struct SplitFields<'a> {
    /// Bytes not yet yielded.
    v: &'a [u8],
    delimiter: u8,
    /// Set once the final field of the record has been produced.
    finished: bool,
    /// Quote byte; falls back to `"` when no quote character was configured.
    quote_char: u8,
    /// Whether quote handling is enabled at all.
    quoting: bool,
    eol_char: u8,
}

impl<'a> SplitFields<'a> {
    /// Create a field splitter over `slice`.
    ///
    /// Passing `quote_char = None` disables quote handling (`quoting == false`).
    /// The stored quote byte then defaults to `"`.
    pub(crate) fn new(
        slice: &'a [u8],
        delimiter: u8,
        quote_char: Option<u8>,
        eol_char: u8,
    ) -> Self {
        let (quote_char, quoting) = match quote_char {
            Some(q) => (q, true),
            None => (b'"', false),
        };
        Self {
            v: slice,
            delimiter,
            finished: false,
            quote_char,
            quoting,
            eol_char,
        }
    }

    /// Mark the iterator as finished and yield the first `idx` bytes as the
    /// final field.
    ///
    /// # Safety
    /// `idx` must not exceed `self.v.len()`.
    unsafe fn finish_eol(&mut self, need_escaping: bool, idx: usize) -> Option<(&'a [u8], bool)> {
        self.finished = true;
        debug_assert!(idx <= self.v.len());
        let remaining: &'a [u8] = self.v;
        Some((remaining.get_unchecked(..idx), need_escaping))
    }

    /// Mark the iterator as finished and yield all remaining bytes as the
    /// final field.
    fn finish(&mut self, need_escaping: bool) -> Option<(&'a [u8], bool)> {
        self.finished = true;
        Some((self.v, need_escaping))
    }

    /// Whether `current_ch` terminates a field: the delimiter or the EOL byte.
    fn eof_oel(&self, current_ch: u8) -> bool {
        [self.delimiter, self.eol_char].contains(&current_ch)
    }
}
/// Return the index of the first `needle` byte in `bytes` that lies outside a
/// quoted section, or `None` if there is none.
///
/// Quote state toggles on every `quote_char` byte, starting unquoted.
#[inline]
fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> {
    let mut in_field = false;
    // `position` counts in `usize`, so inputs of 4 GiB or more cannot overflow
    // the index (a `u32` counter would).
    bytes.iter().position(|&c| {
        if c == quote_char {
            in_field = !in_field;
        }
        !in_field && c == needle
    })
}
impl<'a> Iterator for SplitFields<'a> {
    type Item = (&'a [u8], bool);

    /// Yield the next field and whether it began with the quote character.
    ///
    /// Quoted fields are returned *with* their surrounding quotes. The `bool`
    /// tells the caller that unescaping may be required. After an unquoted
    /// `eol_char`, or the end of the slice, the iterator is finished and only
    /// returns `None`.
    #[inline]
    fn next(&mut self) -> Option<(&'a [u8], bool)> {
        if self.v.is_empty() || self.finished {
            return None;
        }

        let mut needs_escaping = false;
        // SAFETY: `self.v` is non-empty (checked above), so index 0 is in bounds.
        let pos = if self.quoting && unsafe { *self.v.get_unchecked(0) } == self.quote_char {
            needs_escaping = true;
            // Quote-aware scan. The opening quote at index 0 always toggles
            // `in_field` on, so no terminator can be found at index 0; hence
            // `idx == 0` after the loop means "no terminator found".
            let mut in_field = false;
            // `usize` indices cannot overflow on fields of 4 GiB or more.
            let mut idx = 0usize;
            for (current_idx, &c) in self.v.iter().enumerate() {
                if c == self.quote_char {
                    in_field = !in_field;
                }
                if !in_field && self.eof_oel(c) {
                    if c == self.eol_char {
                        // SAFETY: `current_idx < self.v.len()` (it comes from `enumerate`).
                        return unsafe { self.finish_eol(needs_escaping, current_idx) };
                    }
                    idx = current_idx;
                    break;
                }
            }
            if idx == 0 {
                return self.finish(needs_escaping);
            }
            idx
        } else {
            match memchr::memchr2(self.delimiter, self.eol_char, self.v) {
                None => return self.finish(needs_escaping),
                // SAFETY: memchr2 returns an index inside `self.v`.
                Some(idx) => unsafe {
                    if *self.v.get_unchecked(idx) == self.eol_char {
                        return self.finish_eol(needs_escaping, idx);
                    } else {
                        idx
                    }
                },
            }
        };

        // SAFETY: `pos` is the index of a delimiter inside `self.v`, so
        // `pos < self.v.len()` and `pos + 1 <= self.v.len()`.
        unsafe {
            debug_assert!(pos <= self.v.len());
            let ret = Some((self.v.get_unchecked(..pos), needs_escaping));
            self.v = self.v.get_unchecked(pos + 1..);
            ret
        }
    }
}
/// Skip the remainder of the current line, including its end-of-line byte.
///
/// When `quote` is `Some`, end-of-line bytes inside quoted sections do not end
/// the line. If no line end is found, an empty slice positioned at the *end of
/// `bytes`* is returned, rather than a detached `&[]`. This keeps the result a
/// true suffix of `bytes`, so callers that derive consumed-byte counts from
/// slice addresses stay correct.
#[inline]
fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
    let pos = match quote {
        Some(quote) => find_quoted(bytes, quote, eol_char),
        None => bytes.iter().position(|x| *x == eol_char),
    };
    match pos {
        None => &bytes[bytes.len()..],
        Some(pos) => &bytes[pos + 1..],
    }
}
/// Parse CSV records from `bytes` into the column `buffers`.
///
/// For every record, the fields whose 0-based index appears in `projection`
/// are appended, in order, to `buffers[0]`, `buffers[1]`, …. Other fields are
/// skipped. Fields are matched in a single left-to-right pass, so `projection`
/// presumably must be sorted ascending (NOTE(review): confirm with callers).
/// Projected columns missing from a short record receive a null.
///
/// Leading whitespace is skipped before every record. With more than one
/// column, line endings are skipped too. Records whose first remaining byte is
/// `comment_char` are skipped and not counted.
///
/// Parsing stops when the input is exhausted or once `line_count > n_lines`.
/// NOTE(review): that condition lets up to `n_lines + 1` records be parsed;
/// confirm this is what callers expect.
///
/// Returns the number of bytes of `bytes` that were consumed.
///
/// # Errors
/// Returns `PolarsError::ComputeError` when a field cannot be added to its
/// buffer (`Buffer::add` fails). The message names the offending value, the
/// buffer dtype, the 1-based column index and the file offset (`offset` plus
/// the field's position within `bytes`).
///
/// # Panics
/// Panics if `projection` is empty.
#[allow(clippy::too_many_arguments)]
pub(super) fn parse_lines<'a>(
    mut bytes: &'a [u8],
    offset: usize,
    delimiter: u8,
    comment_char: Option<u8>,
    quote_char: Option<u8>,
    eol_char: u8,
    null_values: Option<&NullValuesCompiled>,
    projection: &[usize],
    buffers: &mut [Buffer<'a>],
    ignore_parser_errors: bool,
    n_lines: usize,
    schema_len: usize,
) -> PolarsResult<usize> {
    assert!(
        !projection.is_empty(),
        "at least one column should be projected"
    );

    // Address of the first byte; used only to report file offsets in errors.
    let start = bytes.as_ptr() as usize;
    let original_bytes_len = bytes.len();

    // `usize` counters: no truncating `as u32` casts of `n_lines` or column indices.
    let mut line_count = 0usize;
    loop {
        if line_count > n_lines {
            // `bytes` is always a suffix of the input, so the length difference
            // is the number of consumed bytes. Unlike pointer subtraction, this
            // stays correct when a helper returns a detached empty slice
            // (e.g. `&[]`).
            return Ok(original_bytes_len - bytes.len());
        }

        bytes = if schema_len > 1 {
            skip_whitespace_line_ending_exclude(bytes, delimiter, eol_char)
        } else {
            skip_whitespace_exclude(bytes, delimiter)
        };
        if bytes.is_empty() {
            return Ok(original_bytes_len);
        }

        // Comment lines are skipped without incrementing `line_count`.
        if let Some(c) = comment_char {
            if bytes[0] == c {
                bytes = skip_this_line(bytes, quote_char, eol_char);
                continue;
            }
        }

        let mut projection_iter = projection.iter().copied();
        // SAFETY: `projection` is non-empty (asserted on entry).
        let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
        let mut processed_fields = 0;

        let mut iter = SplitFields::new(bytes, delimiter, quote_char, eol_char);
        // 1-based index of the current field within the record.
        let mut idx = 0usize;
        // Bytes of the current record consumed so far: each field plus the one
        // separator byte that followed it.
        let mut read_sol = 0;
        loop {
            match iter.next() {
                None => {
                    bytes = &bytes[std::cmp::min(read_sol, bytes.len())..];
                    break;
                },
                Some((mut field, needs_escaping)) => {
                    idx += 1;
                    let field_len = field.len();
                    read_sol += field_len + 1;

                    if (idx - 1) == next_projected {
                        // Drop a trailing `\r` left over from CRLF line endings.
                        if field_len > 0 && field[field_len - 1] == b'\r' {
                            field = &field[..field_len - 1];
                        }

                        debug_assert!(processed_fields < buffers.len());
                        // SAFETY: relies on the caller passing one buffer per
                        // projected column (only debug-checked above).
                        let buf = unsafe { buffers.get_unchecked_mut(processed_fields) };
                        let mut add_null = false;

                        if let Some(null_values) = null_values {
                            // Compare null markers against the content inside the
                            // quotes. A field consisting of a lone quote byte has
                            // nothing to strip; slicing `1..len - 1` would panic.
                            let field = if needs_escaping && field.len() >= 2 {
                                &field[1..field.len() - 1]
                            } else {
                                field
                            };
                            // SAFETY: NOTE(review) `is_null` is an unsafe project
                            // API whose contract is not visible here; presumably
                            // `processed_fields` must be a valid column index.
                            add_null = unsafe { null_values.is_null(field, processed_fields) }
                        }

                        if add_null {
                            buf.add_null()
                        } else {
                            buf.add(field, ignore_parser_errors, needs_escaping)
                                .map_err(|_| {
                                    let bytes_offset = offset + field.as_ptr() as usize - start;
                                    let unparsable = String::from_utf8_lossy(field);
                                    PolarsError::ComputeError(
                                        format!(
                                            "Could not parse `{}` as dtype {:?} at column {}.\n\
                                            The current offset in the file is {} bytes.\n\
                                            \n\
                                            Consider specifying the correct dtype, increasing\n\
                                            the number of records used to infer the schema,\n\
                                            running the parser with `ignore_parser_errors=true`\n\
                                            or adding `{}` to the `null_values` list.",
                                            &unparsable,
                                            buf.dtype(),
                                            idx,
                                            bytes_offset,
                                            &unparsable,
                                        )
                                        .into(),
                                    )
                                })?;
                        }
                        processed_fields += 1;

                        match projection_iter.next() {
                            Some(p) => next_projected = p,
                            None => {
                                // Every projected column is filled: jump to the
                                // start of the next record.
                                if bytes.get(read_sol - 1) == Some(&eol_char) {
                                    bytes = &bytes[read_sol..];
                                } else {
                                    bytes = skip_this_line(
                                        &bytes[read_sol - 1..],
                                        quote_char,
                                        eol_char,
                                    );
                                }
                                break;
                            },
                        }
                    }
                },
            }
        }

        // The record ended before every projected column was seen: pad with nulls.
        while processed_fields < projection.len() {
            debug_assert!(processed_fields < buffers.len());
            // SAFETY: same one-buffer-per-projected-column assumption as above.
            let buf = unsafe { buffers.get_unchecked_mut(processed_fields) };
            buf.add_null();
            processed_fields += 1;
        }
        line_count += 1;
    }
}
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_splitfields() {
        let simple: Vec<(&[u8], bool)> =
            SplitFields::new(b"\"foo\",\"bar\"", b',', Some(b'"'), b'\n').collect();
        assert_eq!(
            simple,
            vec![(&b"\"foo\""[..], true), (&b"\"bar\""[..], true)]
        );

        // An EOL inside quotes belongs to the field, not to the end of the record.
        let multiline: Vec<(&[u8], bool)> =
            SplitFields::new(b"\"foo\n bar\";\"baz\";12345", b';', Some(b'"'), b'\n').collect();
        assert_eq!(
            multiline,
            vec![
                (&b"\"foo\n bar\""[..], true),
                (&b"\"baz\""[..], true),
                (&b"12345"[..], false),
            ]
        );
    }

    #[test]
    fn test_splitlines() {
        let double_quoted: Vec<&[u8]> =
            SplitLines::new(b"1,\"foo\n\"\n2,\"foo\n\"\n", b'"', b'\n').collect();
        assert_eq!(
            double_quoted,
            vec![&b"1,\"foo\n\""[..], &b"2,\"foo\n\""[..]]
        );

        let single_quoted: Vec<&[u8]> =
            SplitLines::new(b"1,'foo\n'\n2,'foo\n'\n", b'\'', b'\n').collect();
        assert_eq!(
            single_quoted,
            vec![&b"1,'foo\n'"[..], &b"2,'foo\n'"[..]]
        );
    }
}