polars_io/csv/read/
parser.rs

1use memchr::memchr2_iter;
2use polars_core::prelude::*;
3use polars_core::{POOL, config};
4use polars_error::feature_gated;
5use polars_utils::mmap::MMapSemaphore;
6use polars_utils::plpath::PlPathRef;
7use polars_utils::select::select_unpredictable;
8use rayon::prelude::*;
9
10use super::CsvParseOptions;
11use super::buffer::Buffer;
12use super::options::{CommentPrefix, NullValuesCompiled};
13use super::splitfields::SplitFields;
14use crate::prelude::_csv_read_internal::find_starting_point;
15use crate::utils::compression::maybe_decompress_bytes;
16
17/// Read the number of rows without parsing columns
18/// useful for count(*) queries
19#[allow(clippy::too_many_arguments)]
20pub fn count_rows(
21    addr: PlPathRef<'_>,
22    quote_char: Option<u8>,
23    comment_prefix: Option<&CommentPrefix>,
24    eol_char: u8,
25    has_header: bool,
26    skip_lines: usize,
27    skip_rows_before_header: usize,
28    skip_rows_after_header: usize,
29) -> PolarsResult<usize> {
30    let file = match addr
31        .as_local_path()
32        .and_then(|v| (!config::force_async()).then_some(v))
33    {
34        None => feature_gated!("cloud", {
35            crate::file_cache::FILE_CACHE
36                .get_entry(addr)
37                // Safety: This was initialized by schema inference.
38                .unwrap()
39                .try_open_assume_latest()?
40        }),
41        Some(path) => polars_utils::open_file(path)?,
42    };
43
44    let mmap = MMapSemaphore::new_from_file(&file).unwrap();
45    let owned = &mut vec![];
46    let reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;
47
48    count_rows_from_slice_par(
49        reader_bytes,
50        quote_char,
51        comment_prefix,
52        eol_char,
53        has_header,
54        skip_lines,
55        skip_rows_before_header,
56        skip_rows_after_header,
57    )
58}
59
60/// Read the number of rows without parsing columns
61/// useful for count(*) queries
62#[allow(clippy::too_many_arguments)]
63pub fn count_rows_from_slice_par(
64    mut bytes: &[u8],
65    quote_char: Option<u8>,
66    comment_prefix: Option<&CommentPrefix>,
67    eol_char: u8,
68    has_header: bool,
69    skip_lines: usize,
70    skip_rows_before_header: usize,
71    skip_rows_after_header: usize,
72) -> PolarsResult<usize> {
73    let start_offset = find_starting_point(
74        bytes,
75        quote_char,
76        eol_char,
77        // schema_len
78        // NOTE: schema_len is normally required to differentiate handling a leading blank line
79        // between case (a) when schema_len == 1 (as an empty string) vs case (b) when
80        // schema_len > 1 (as a blank line to be ignored).
81        // We skip blank lines, even when UFT8-BOM is present and schema_len == 1.
82        usize::MAX,
83        skip_lines,
84        skip_rows_before_header,
85        skip_rows_after_header,
86        comment_prefix,
87        has_header,
88    )?;
89    bytes = &bytes[start_offset..];
90
91    #[cfg(debug_assertions)]
92    const BYTES_PER_CHUNK: usize = 128;
93    #[cfg(not(debug_assertions))]
94    const BYTES_PER_CHUNK: usize = 1 << 16;
95
96    let count = CountLines::new(quote_char, eol_char, comment_prefix.cloned());
97    POOL.install(|| {
98        let mut states = Vec::new();
99        if comment_prefix.is_none() {
100            bytes
101                .par_chunks(BYTES_PER_CHUNK)
102                .map(|chunk| count.analyze_chunk(chunk))
103                .collect_into_vec(&mut states);
104        } else {
105            let num_chunks = bytes.len().div_ceil(BYTES_PER_CHUNK);
106            (0..num_chunks)
107                .into_par_iter()
108                .map(|chunk_idx| {
109                    let mut start_offset = chunk_idx * BYTES_PER_CHUNK;
110                    let next_start_offset = (start_offset + BYTES_PER_CHUNK).min(bytes.len());
111
112                    if start_offset != 0 {
113                        // Ensure we start at the start of a line.
114                        if let Some(nl_off) = bytes[start_offset..next_start_offset]
115                            .iter()
116                            .position(|b| *b == eol_char)
117                        {
118                            start_offset += nl_off + 1;
119                        } else {
120                            return count.analyze_chunk(&[]);
121                        }
122                    }
123
124                    let stop_offset = if let Some(nl_off) = bytes[next_start_offset..]
125                        .iter()
126                        .position(|b| *b == eol_char)
127                    {
128                        next_start_offset + nl_off + 1
129                    } else {
130                        bytes.len()
131                    };
132
133                    count.analyze_chunk(&bytes[start_offset..stop_offset])
134                })
135                .collect_into_vec(&mut states);
136        }
137
138        let mut n = 0;
139        let mut in_string = false;
140        for pair in states {
141            n += pair[in_string as usize].newline_count;
142            in_string = pair[in_string as usize].end_inside_string;
143        }
144        if let Some(last) = bytes.last()
145            && *last != eol_char
146            && (comment_prefix.is_none()
147                || !is_comment_line(
148                    bytes.rsplit(|c| *c == eol_char).next().unwrap(),
149                    comment_prefix,
150                ))
151        {
152            n += 1
153        }
154
155        Ok(n)
156    })
157}
158
159/// Skip the utf-8 Byte Order Mark.
160/// credits to csv-core
161pub(super) fn skip_bom(input: &[u8]) -> &[u8] {
162    if input.len() >= 3 && &input[0..3] == b"\xef\xbb\xbf" {
163        &input[3..]
164    } else {
165        input
166    }
167}
168
169/// Checks if a line in a CSV file is a comment based on the given comment prefix configuration.
170///
171/// This function is used during CSV parsing to determine whether a line should be ignored based on its starting characters.
172#[inline]
173pub fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
174    match comment_prefix {
175        Some(CommentPrefix::Single(c)) => line.first() == Some(c),
176        Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
177        None => false,
178    }
179}
180
181/// Find the nearest next line position.
182/// Does not check for new line characters embedded in String fields.
183pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
184    let pos = memchr::memchr(eol_char, input)? + 1;
185    if input.len() - pos == 0 {
186        return None;
187    }
188    Some(pos)
189}
190
191pub(super) fn skip_lines_naive(mut input: &[u8], eol_char: u8, skip: usize) -> &[u8] {
192    for _ in 0..skip {
193        if let Some(pos) = next_line_position_naive(input, eol_char) {
194            input = &input[pos..];
195        } else {
196            return input;
197        }
198    }
199    input
200}
201
202/// Find the nearest next line position that is not embedded in a String field.
203pub(super) fn next_line_position(
204    mut input: &[u8],
205    mut expected_fields: Option<usize>,
206    separator: u8,
207    quote_char: Option<u8>,
208    eol_char: u8,
209) -> Option<usize> {
210    fn accept_line(
211        line: &[u8],
212        expected_fields: usize,
213        separator: u8,
214        eol_char: u8,
215        quote_char: Option<u8>,
216    ) -> bool {
217        let mut count = 0usize;
218        for (field, _) in SplitFields::new(line, separator, quote_char, eol_char) {
219            if memchr2_iter(separator, eol_char, field).count() >= expected_fields {
220                return false;
221            }
222            count += 1;
223        }
224
225        // if the latest field is missing
226        // e.g.:
227        // a,b,c
228        // vala,valb,
229        // SplitFields returns a count that is 1 less
230        // There fore we accept:
231        // expected == count
232        // and
233        // expected == count - 1
234        expected_fields.wrapping_sub(count) <= 1
235    }
236
237    // we check 3 subsequent lines for `accept_line` before we accept
238    // if 3 groups are rejected we reject completely
239    let mut rejected_line_groups = 0u8;
240
241    let mut total_pos = 0;
242    if input.is_empty() {
243        return None;
244    }
245    let mut lines_checked = 0u8;
246    loop {
247        if rejected_line_groups >= 3 {
248            return None;
249        }
250        lines_checked = lines_checked.wrapping_add(1);
251        // headers might have an extra value
252        // So if we have churned through enough lines
253        // we try one field less.
254        if lines_checked == u8::MAX {
255            if let Some(ef) = expected_fields {
256                expected_fields = Some(ef.saturating_sub(1))
257            }
258        };
259        let pos = memchr::memchr(eol_char, input)? + 1;
260        if input.len() - pos == 0 {
261            return None;
262        }
263        debug_assert!(pos <= input.len());
264        let new_input = unsafe { input.get_unchecked(pos..) };
265        let mut lines = SplitLines::new(new_input, quote_char, eol_char, None);
266        let line = lines.next();
267
268        match (line, expected_fields) {
269            // count the fields, and determine if they are equal to what we expect from the schema
270            (Some(line), Some(expected_fields)) => {
271                if accept_line(line, expected_fields, separator, eol_char, quote_char) {
272                    let mut valid = true;
273                    for line in lines.take(2) {
274                        if !accept_line(line, expected_fields, separator, eol_char, quote_char) {
275                            valid = false;
276                            break;
277                        }
278                    }
279                    if valid {
280                        return Some(total_pos + pos);
281                    } else {
282                        rejected_line_groups += 1;
283                    }
284                } else {
285                    debug_assert!(pos < input.len());
286                    unsafe {
287                        input = input.get_unchecked(pos + 1..);
288                    }
289                    total_pos += pos + 1;
290                }
291            },
292            // don't count the fields
293            (Some(_), None) => return Some(total_pos + pos),
294            // // no new line found, check latest line (without eol) for number of fields
295            _ => return None,
296        }
297    }
298}
299
300#[inline(always)]
301pub(super) fn is_line_ending(b: u8, eol_char: u8) -> bool {
302    b == eol_char || b == b'\r'
303}
304
305#[inline(always)]
306pub(super) fn is_whitespace(b: u8) -> bool {
307    b == b' ' || b == b'\t'
308}
309
310/// May have false-positives, but not false negatives.
311#[inline(always)]
312pub(super) fn could_be_whitespace_fast(b: u8) -> bool {
313    // We're interested in \t (ASCII 9) and " " (ASCII 32), both of which are
314    // <= 32. In that range there aren't a lot of other common symbols (besides
315    // newline), so this is a quick test which can be worth doing to avoid the
316    // exact test.
317    b <= 32
318}
319
320#[inline]
321fn skip_condition<F>(input: &[u8], f: F) -> &[u8]
322where
323    F: Fn(u8) -> bool,
324{
325    if input.is_empty() {
326        return input;
327    }
328
329    let read = input.iter().position(|b| !f(*b)).unwrap_or(input.len());
330    &input[read..]
331}
332
333/// Remove whitespace from the start of buffer.
334/// Makes sure that the bytes stream starts with
335///     'field_1,field_2'
336/// and not with
337///     '\nfield_1,field_1'
338#[inline]
339pub(super) fn skip_whitespace(input: &[u8]) -> &[u8] {
340    skip_condition(input, is_whitespace)
341}
342
343#[inline]
344pub(super) fn skip_line_ending(input: &[u8], eol_char: u8) -> &[u8] {
345    skip_condition(input, |b| is_line_ending(b, eol_char))
346}
347
348/// An adapted version of std::iter::Split.
349/// This exists solely because we cannot split the file in lines naively as
350///
351/// ```text
352///    for line in bytes.split(b'\n') {
353/// ```
354///
355/// This will fail when strings fields are have embedded end line characters.
356/// For instance: "This is a valid field\nI have multiples lines" is a valid string field, that contains multiple lines.
357pub(super) struct SplitLines<'a> {
358    v: &'a [u8],
359    quote_char: u8,
360    eol_char: u8,
361    #[cfg(feature = "simd")]
362    simd_eol_char: SimdVec,
363    #[cfg(feature = "simd")]
364    simd_quote_char: SimdVec,
365    #[cfg(feature = "simd")]
366    previous_valid_eols: u64,
367    total_index: usize,
368    quoting: bool,
369    comment_prefix: Option<&'a CommentPrefix>,
370}
371
372#[cfg(feature = "simd")]
373const SIMD_SIZE: usize = 64;
374#[cfg(feature = "simd")]
375use std::simd::prelude::*;
376
377#[cfg(feature = "simd")]
378use polars_utils::clmul::prefix_xorsum_inclusive;
379
380#[cfg(feature = "simd")]
381type SimdVec = u8x64;
382
383impl<'a> SplitLines<'a> {
384    pub(super) fn new(
385        slice: &'a [u8],
386        quote_char: Option<u8>,
387        eol_char: u8,
388        comment_prefix: Option<&'a CommentPrefix>,
389    ) -> Self {
390        let quoting = quote_char.is_some();
391        let quote_char = quote_char.unwrap_or(b'\"');
392        #[cfg(feature = "simd")]
393        let simd_eol_char = SimdVec::splat(eol_char);
394        #[cfg(feature = "simd")]
395        let simd_quote_char = SimdVec::splat(quote_char);
396        Self {
397            v: slice,
398            quote_char,
399            eol_char,
400            #[cfg(feature = "simd")]
401            simd_eol_char,
402            #[cfg(feature = "simd")]
403            simd_quote_char,
404            #[cfg(feature = "simd")]
405            previous_valid_eols: 0,
406            total_index: 0,
407            quoting,
408            comment_prefix,
409        }
410    }
411}
412
413impl<'a> SplitLines<'a> {
414    // scalar as in non-simd
415    fn next_scalar(&mut self) -> Option<&'a [u8]> {
416        if self.v.is_empty() {
417            return None;
418        }
419        if is_comment_line(self.v, self.comment_prefix) {
420            return self.next_comment_line();
421        }
422        {
423            let mut pos = 0u32;
424            let mut iter = self.v.iter();
425            let mut in_field = false;
426            loop {
427                match iter.next() {
428                    Some(&c) => {
429                        pos += 1;
430
431                        if self.quoting && c == self.quote_char {
432                            // toggle between string field enclosure
433                            //      if we encounter a starting '"' -> in_field = true;
434                            //      if we encounter a closing '"' -> in_field = false;
435                            in_field = !in_field;
436                        }
437                        // if we are not in a string and we encounter '\n' we can stop at this position.
438                        else if c == self.eol_char && !in_field {
439                            break;
440                        }
441                    },
442                    None => {
443                        let remainder = self.v;
444                        self.v = &[];
445                        return Some(remainder);
446                    },
447                }
448            }
449
450            unsafe {
451                debug_assert!((pos as usize) <= self.v.len());
452
453                // return line up to this position
454                let ret = Some(
455                    self.v
456                        .get_unchecked(..(self.total_index + pos as usize - 1)),
457                );
458                // skip the '\n' token and update slice.
459                self.v = self.v.get_unchecked(self.total_index + pos as usize..);
460                ret
461            }
462        }
463    }
464    fn next_comment_line(&mut self) -> Option<&'a [u8]> {
465        if let Some(pos) = next_line_position_naive(self.v, self.eol_char) {
466            unsafe {
467                // return line up to this position
468                let ret = Some(self.v.get_unchecked(..(pos - 1)));
469                // skip the '\n' token and update slice.
470                self.v = self.v.get_unchecked(pos..);
471                ret
472            }
473        } else {
474            let remainder = self.v;
475            self.v = &[];
476            Some(remainder)
477        }
478    }
479}
480
481impl<'a> Iterator for SplitLines<'a> {
482    type Item = &'a [u8];
483
484    #[inline]
485    #[cfg(not(feature = "simd"))]
486    fn next(&mut self) -> Option<&'a [u8]> {
487        self.next_scalar()
488    }
489
490    #[inline]
491    #[cfg(feature = "simd")]
492    fn next(&mut self) -> Option<&'a [u8]> {
493        // First check cached value
494        if self.previous_valid_eols != 0 {
495            let pos = self.previous_valid_eols.trailing_zeros() as usize;
496            self.previous_valid_eols >>= (pos + 1) as u64;
497
498            unsafe {
499                debug_assert!((pos) <= self.v.len());
500
501                // return line up to this position
502                let ret = Some(self.v.get_unchecked(..pos));
503                // skip the '\n' token and update slice.
504                self.v = self.v.get_unchecked(pos + 1..);
505                return ret;
506            }
507        }
508        if self.v.is_empty() {
509            return None;
510        }
511        if self.comment_prefix.is_some() {
512            return self.next_scalar();
513        }
514
515        self.total_index = 0;
516        let mut not_in_field_previous_iter = true;
517
518        loop {
519            let bytes = unsafe { self.v.get_unchecked(self.total_index..) };
520            if bytes.len() > SIMD_SIZE {
521                let lane: [u8; SIMD_SIZE] = unsafe {
522                    bytes
523                        .get_unchecked(0..SIMD_SIZE)
524                        .try_into()
525                        .unwrap_unchecked()
526                };
527                let simd_bytes = SimdVec::from(lane);
528                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
529
530                let valid_eols = if self.quoting {
531                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
532                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);
533
534                    if not_in_field_previous_iter {
535                        not_in_quote_field = !not_in_quote_field;
536                    }
537                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
538                    eol_mask & not_in_quote_field
539                } else {
540                    eol_mask
541                };
542
543                if valid_eols != 0 {
544                    let pos = valid_eols.trailing_zeros() as usize;
545                    if pos == SIMD_SIZE - 1 {
546                        self.previous_valid_eols = 0;
547                    } else {
548                        self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
549                    }
550
551                    unsafe {
552                        let pos = self.total_index + pos;
553                        debug_assert!((pos) <= self.v.len());
554
555                        // return line up to this position
556                        let ret = Some(self.v.get_unchecked(..pos));
557                        // skip the '\n' token and update slice.
558                        self.v = self.v.get_unchecked(pos + 1..);
559                        return ret;
560                    }
561                } else {
562                    self.total_index += SIMD_SIZE;
563                }
564            } else {
565                // Denotes if we are in a string field, started with a quote
566                let mut in_field = !not_in_field_previous_iter;
567                let mut pos = 0u32;
568                let mut iter = bytes.iter();
569                loop {
570                    match iter.next() {
571                        Some(&c) => {
572                            pos += 1;
573
574                            if self.quoting && c == self.quote_char {
575                                // toggle between string field enclosure
576                                //      if we encounter a starting '"' -> in_field = true;
577                                //      if we encounter a closing '"' -> in_field = false;
578                                in_field = !in_field;
579                            }
580                            // if we are not in a string and we encounter '\n' we can stop at this position.
581                            else if c == self.eol_char && !in_field {
582                                break;
583                            }
584                        },
585                        None => {
586                            let remainder = self.v;
587                            self.v = &[];
588                            return Some(remainder);
589                        },
590                    }
591                }
592
593                unsafe {
594                    debug_assert!((pos as usize) <= self.v.len());
595
596                    // return line up to this position
597                    let ret = Some(
598                        self.v
599                            .get_unchecked(..(self.total_index + pos as usize - 1)),
600                    );
601                    // skip the '\n' token and update slice.
602                    self.v = self.v.get_unchecked(self.total_index + pos as usize..);
603                    return ret;
604                }
605            }
606        }
607    }
608}
609
610pub struct CountLines {
611    quote_char: u8,
612    eol_char: u8,
613    #[cfg(feature = "simd")]
614    simd_eol_char: SimdVec,
615    #[cfg(feature = "simd")]
616    simd_quote_char: SimdVec,
617    quoting: bool,
618    comment_prefix: Option<CommentPrefix>,
619}
620
621#[derive(Copy, Clone, Debug, Default)]
622pub struct LineStats {
623    newline_count: usize,
624    last_newline_offset: usize,
625    end_inside_string: bool,
626}
627
628impl CountLines {
629    pub fn new(
630        quote_char: Option<u8>,
631        eol_char: u8,
632        comment_prefix: Option<CommentPrefix>,
633    ) -> Self {
634        let quoting = quote_char.is_some();
635        let quote_char = quote_char.unwrap_or(b'\"');
636        #[cfg(feature = "simd")]
637        let simd_eol_char = SimdVec::splat(eol_char);
638        #[cfg(feature = "simd")]
639        let simd_quote_char = SimdVec::splat(quote_char);
640        Self {
641            quote_char,
642            eol_char,
643            #[cfg(feature = "simd")]
644            simd_eol_char,
645            #[cfg(feature = "simd")]
646            simd_quote_char,
647            quoting,
648            comment_prefix,
649        }
650    }
651
652    /// Analyzes a chunk of CSV data.
653    ///
654    /// Returns (newline_count, last_newline_offset, end_inside_string) twice,
655    /// the first is assuming the start of the chunk is *not* inside a string,
656    /// the second assuming the start is inside a string.
657    ///
658    /// If comment_prefix is not None the start of bytes must be at the start of
659    /// a line (and thus not in the middle of a comment).
660    pub fn analyze_chunk(&self, bytes: &[u8]) -> [LineStats; 2] {
661        let mut states = [
662            LineStats {
663                newline_count: 0,
664                last_newline_offset: 0,
665                end_inside_string: false,
666            },
667            LineStats {
668                newline_count: 0,
669                last_newline_offset: 0,
670                end_inside_string: false,
671            },
672        ];
673
674        // If we have to deal with comments we can't use SIMD and have to explicitly do two passes.
675        if self.comment_prefix.is_some() {
676            states[0] = self.analyze_chunk_with_comment(bytes, false);
677            states[1] = self.analyze_chunk_with_comment(bytes, true);
678            return states;
679        }
680
681        // False if even number of quotes seen so far, true otherwise.
682        #[allow(unused_assignments)]
683        let mut global_quote_parity = false;
684        let mut scan_offset = 0;
685
686        #[cfg(feature = "simd")]
687        {
688            // 0 if even number of quotes seen so far, u64::MAX otherwise.
689            let mut global_quote_parity_mask = 0;
690            while scan_offset + 64 <= bytes.len() {
691                let block: [u8; 64] = unsafe {
692                    bytes
693                        .get_unchecked(scan_offset..scan_offset + 64)
694                        .try_into()
695                        .unwrap_unchecked()
696                };
697                let simd_bytes = SimdVec::from(block);
698                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
699                if self.quoting {
700                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
701                    let quote_parity =
702                        prefix_xorsum_inclusive(quote_mask) ^ global_quote_parity_mask;
703                    global_quote_parity_mask = ((quote_parity as i64) >> 63) as u64;
704
705                    let start_outside_string_eol_mask = eol_mask & !quote_parity;
706                    states[0].newline_count += start_outside_string_eol_mask.count_ones() as usize;
707                    states[0].last_newline_offset = select_unpredictable(
708                        start_outside_string_eol_mask != 0,
709                        (scan_offset + 63)
710                            .wrapping_sub(start_outside_string_eol_mask.leading_zeros() as usize),
711                        states[0].last_newline_offset,
712                    );
713
714                    let start_inside_string_eol_mask = eol_mask & quote_parity;
715                    states[1].newline_count += start_inside_string_eol_mask.count_ones() as usize;
716                    states[1].last_newline_offset = select_unpredictable(
717                        start_inside_string_eol_mask != 0,
718                        (scan_offset + 63)
719                            .wrapping_sub(start_inside_string_eol_mask.leading_zeros() as usize),
720                        states[1].last_newline_offset,
721                    );
722                } else {
723                    states[0].newline_count += eol_mask.count_ones() as usize;
724                    states[0].last_newline_offset = select_unpredictable(
725                        eol_mask != 0,
726                        (scan_offset + 63).wrapping_sub(eol_mask.leading_zeros() as usize),
727                        states[0].last_newline_offset,
728                    );
729                }
730
731                scan_offset += 64;
732            }
733
734            global_quote_parity = global_quote_parity_mask > 0;
735        }
736
737        while scan_offset < bytes.len() {
738            let c = unsafe { *bytes.get_unchecked(scan_offset) };
739            global_quote_parity ^= (c == self.quote_char) & self.quoting;
740
741            let state = &mut states[global_quote_parity as usize];
742            state.newline_count += (c == self.eol_char) as usize;
743            state.last_newline_offset =
744                select_unpredictable(c == self.eol_char, scan_offset, state.last_newline_offset);
745
746            scan_offset += 1;
747        }
748
749        states[0].end_inside_string = global_quote_parity;
750        states[1].end_inside_string = !global_quote_parity;
751        states
752    }
753
754    // bytes must begin at the start of a line.
755    fn analyze_chunk_with_comment(&self, bytes: &[u8], mut in_string: bool) -> LineStats {
756        let pre_s = match self.comment_prefix.as_ref().unwrap() {
757            CommentPrefix::Single(pc) => core::slice::from_ref(pc),
758            CommentPrefix::Multi(ps) => ps.as_bytes(),
759        };
760
761        let mut state = LineStats::default();
762        let mut scan_offset = 0;
763        while scan_offset < bytes.len() {
764            // Skip comment line if needed.
765            while bytes[scan_offset..].starts_with(pre_s) {
766                scan_offset += pre_s.len();
767                let Some(nl_off) = bytes[scan_offset..]
768                    .iter()
769                    .position(|c| *c == self.eol_char)
770                else {
771                    break;
772                };
773                scan_offset += nl_off + 1;
774            }
775
776            while scan_offset < bytes.len() {
777                let c = unsafe { *bytes.get_unchecked(scan_offset) };
778                in_string ^= (c == self.quote_char) & self.quoting;
779
780                if c == self.eol_char && !in_string {
781                    state.newline_count += 1;
782                    state.last_newline_offset = scan_offset;
783                    scan_offset += 1;
784                    break;
785                } else {
786                    scan_offset += 1;
787                }
788            }
789        }
790
791        state.end_inside_string = in_string;
792        state
793    }
794
795    pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) {
796        loop {
797            let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };
798
799            let (count, offset) = if self.comment_prefix.is_some() {
800                let stats = self.analyze_chunk_with_comment(b, false);
801                (stats.newline_count, stats.last_newline_offset)
802            } else {
803                self.count(b)
804            };
805
806            if count > 0 || b.len() == bytes.len() {
807                return (count, offset);
808            }
809
810            *chunk_size = chunk_size.saturating_mul(2);
811        }
812    }
813
814    /// Returns count and offset to split for remainder in slice.
815    #[cfg(feature = "simd")]
816    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
817        let mut total_idx = 0;
818        let original_bytes = bytes;
819        let mut count = 0;
820        let mut position = 0;
821        let mut not_in_field_previous_iter = true;
822
823        loop {
824            let bytes = unsafe { original_bytes.get_unchecked(total_idx..) };
825
826            if bytes.len() > SIMD_SIZE {
827                let lane: [u8; SIMD_SIZE] = unsafe {
828                    bytes
829                        .get_unchecked(0..SIMD_SIZE)
830                        .try_into()
831                        .unwrap_unchecked()
832                };
833                let simd_bytes = SimdVec::from(lane);
834                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
835
836                let valid_eols = if self.quoting {
837                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
838                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);
839
840                    if not_in_field_previous_iter {
841                        not_in_quote_field = !not_in_quote_field;
842                    }
843                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
844                    eol_mask & not_in_quote_field
845                } else {
846                    eol_mask
847                };
848
849                if valid_eols != 0 {
850                    count += valid_eols.count_ones() as usize;
851                    position = total_idx + 63 - valid_eols.leading_zeros() as usize;
852                    debug_assert_eq!(original_bytes[position], self.eol_char)
853                }
854                total_idx += SIMD_SIZE;
855            } else if bytes.is_empty() {
856                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
857                return (count, position);
858            } else {
859                let (c, o) = self.count_no_simd(bytes, !not_in_field_previous_iter);
860
861                let (count, position) = if c > 0 {
862                    (count + c, total_idx + o)
863                } else {
864                    (count, position)
865                };
866                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
867
868                return (count, position);
869            }
870        }
871    }
872
873    #[cfg(not(feature = "simd"))]
874    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
875        self.count_no_simd(bytes, false)
876    }
877
878    fn count_no_simd(&self, bytes: &[u8], in_field: bool) -> (usize, usize) {
879        let iter = bytes.iter();
880        let mut in_field = in_field;
881        let mut count = 0;
882        let mut position = 0;
883
884        for b in iter {
885            let c = *b;
886            if self.quoting && c == self.quote_char {
887                // toggle between string field enclosure
888                //      if we encounter a starting '"' -> in_field = true;
889                //      if we encounter a closing '"' -> in_field = false;
890                in_field = !in_field;
891            }
892            // If we are not in a string and we encounter '\n' we can stop at this position.
893            else if c == self.eol_char && !in_field {
894                position = (b as *const _ as usize) - (bytes.as_ptr() as usize);
895                count += 1;
896            }
897        }
898        debug_assert!(count == 0 || bytes[position] == self.eol_char);
899
900        (count, position)
901    }
902}
903
904#[inline]
905fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> {
906    let mut in_field = false;
907
908    let mut idx = 0u32;
909    // micro optimizations
910    #[allow(clippy::explicit_counter_loop)]
911    for &c in bytes.iter() {
912        if c == quote_char {
913            // toggle between string field enclosure
914            //      if we encounter a starting '"' -> in_field = true;
915            //      if we encounter a closing '"' -> in_field = false;
916            in_field = !in_field;
917        }
918
919        if !in_field && c == needle {
920            return Some(idx as usize);
921        }
922        idx += 1;
923    }
924    None
925}
926
927#[inline]
928pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
929    let pos = match quote {
930        Some(quote) => find_quoted(bytes, quote, eol_char),
931        None => bytes.iter().position(|x| *x == eol_char),
932    };
933    match pos {
934        None => &[],
935        Some(pos) => &bytes[pos + 1..],
936    }
937}
938
939#[inline]
940pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
941    if let Some(pos) = next_line_position_naive(input, eol_char) {
942        unsafe { input.get_unchecked(pos..) }
943    } else {
944        &[]
945    }
946}
947
948/// Parse CSV.
949///
950/// # Arguments
951/// * `bytes` - input to parse
952/// * `offset` - offset in bytes in total input. This is 0 if single threaded. If multi-threaded every
953///   thread has a different offset.
954/// * `projection` - Indices of the columns to project.
955/// * `buffers` - Parsed output will be written to these buffers. Except for UTF8 data. The offsets of the
956///   fields are written to the buffers. The UTF8 data will be parsed later.
957///
958/// Returns the number of bytes parsed successfully.
959#[allow(clippy::too_many_arguments)]
960pub(super) fn parse_lines(
961    mut bytes: &[u8],
962    parse_options: &CsvParseOptions,
963    offset: usize,
964    ignore_errors: bool,
965    null_values: Option<&NullValuesCompiled>,
966    projection: &[usize],
967    buffers: &mut [Buffer],
968    n_lines: usize,
969    // length of original schema
970    schema_len: usize,
971    schema: &Schema,
972) -> PolarsResult<usize> {
973    assert!(
974        !projection.is_empty(),
975        "at least one column should be projected"
976    );
977    let mut truncate_ragged_lines = parse_options.truncate_ragged_lines;
978    // During projection pushdown we are not checking other csv fields.
979    // This would be very expensive and we don't care as we only want
980    // the projected columns.
981    if projection.len() != schema_len {
982        truncate_ragged_lines = true
983    }
984
985    // we use the pointers to track the no of bytes read.
986    let start = bytes.as_ptr() as usize;
987    let original_bytes_len = bytes.len();
988    let n_lines = n_lines as u32;
989
990    let mut line_count = 0u32;
991    loop {
992        if line_count > n_lines {
993            let end = bytes.as_ptr() as usize;
994            return Ok(end - start);
995        }
996
997        if bytes.is_empty() {
998            return Ok(original_bytes_len);
999        } else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
1000            // deal with comments
1001            let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char);
1002            bytes = bytes_rem;
1003            continue;
1004        }
1005
1006        // Every line we only need to parse the columns that are projected.
1007        // Therefore we check if the idx of the field is in our projected columns.
1008        // If it is not, we skip the field.
1009        let mut projection_iter = projection.iter().copied();
1010        let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
1011        let mut processed_fields = 0;
1012
1013        let mut iter = SplitFields::new(
1014            bytes,
1015            parse_options.separator,
1016            parse_options.quote_char,
1017            parse_options.eol_char,
1018        );
1019        let mut idx = 0u32;
1020        let mut read_sol = 0;
1021        loop {
1022            match iter.next() {
1023                // end of line
1024                None => {
1025                    bytes = unsafe { bytes.get_unchecked(std::cmp::min(read_sol, bytes.len())..) };
1026                    break;
1027                },
1028                Some((mut field, needs_escaping)) => {
1029                    let field_len = field.len();
1030
1031                    // +1 is the split character that is consumed by the iterator.
1032                    read_sol += field_len + 1;
1033
1034                    if idx == next_projected as u32 {
1035                        // the iterator is finished when it encounters a `\n`
1036                        // this could be preceded by a '\r'
1037                        unsafe {
1038                            if field_len > 0 && *field.get_unchecked(field_len - 1) == b'\r' {
1039                                field = field.get_unchecked(..field_len - 1);
1040                            }
1041                        }
1042
1043                        debug_assert!(processed_fields < buffers.len());
1044                        let buf = unsafe {
1045                            // SAFETY: processed fields index can never exceed the projection indices.
1046                            buffers.get_unchecked_mut(processed_fields)
1047                        };
1048                        let mut add_null = false;
1049
1050                        // if we have null values argument, check if this field equal null value
1051                        if let Some(null_values) = null_values {
1052                            let field = if needs_escaping && !field.is_empty() {
1053                                unsafe { field.get_unchecked(1..field.len() - 1) }
1054                            } else {
1055                                field
1056                            };
1057
1058                            // SAFETY:
1059                            // process fields is in bounds
1060                            add_null = unsafe { null_values.is_null(field, idx as usize) }
1061                        }
1062                        if add_null {
1063                            buf.add_null(!parse_options.missing_is_null && field.is_empty())
1064                        } else {
1065                            buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null)
1066                                .map_err(|e| {
1067                                    let bytes_offset = offset + field.as_ptr() as usize - start;
1068                                    let unparsable = String::from_utf8_lossy(field);
1069                                    let column_name = schema.get_at_index(idx as usize).unwrap().0;
1070                                    polars_err!(
1071                                        ComputeError:
1072                                        "could not parse `{}` as dtype `{}` at column '{}' (column number {})\n\n\
1073                                        The current offset in the file is {} bytes.\n\
1074                                        \n\
1075                                        You might want to try:\n\
1076                                        - increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),\n\
1077                                        - specifying correct dtype with the `schema_overrides` argument\n\
1078                                        - setting `ignore_errors` to `True`,\n\
1079                                        - adding `{}` to the `null_values` list.\n\n\
1080                                        Original error: ```{}```",
1081                                        &unparsable,
1082                                        buf.dtype(),
1083                                        column_name,
1084                                        idx + 1,
1085                                        bytes_offset,
1086                                        &unparsable,
1087                                        e
1088                                    )
1089                                })?;
1090                        }
1091                        processed_fields += 1;
1092
1093                        // if we have all projected columns we are done with this line
1094                        match projection_iter.next() {
1095                            Some(p) => next_projected = p,
1096                            None => {
1097                                if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) {
1098                                    bytes = unsafe { bytes.get_unchecked(read_sol..) };
1099                                } else {
1100                                    if !truncate_ragged_lines && read_sol < bytes.len() {
1101                                        polars_bail!(ComputeError: r#"found more fields than defined in 'Schema'
1102
1103Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
1104                                    }
1105                                    let bytes_rem = skip_this_line(
1106                                        unsafe { bytes.get_unchecked(read_sol - 1..) },
1107                                        parse_options.quote_char,
1108                                        parse_options.eol_char,
1109                                    );
1110                                    bytes = bytes_rem;
1111                                }
1112                                break;
1113                            },
1114                        }
1115                    }
1116                    idx += 1;
1117                },
1118            }
1119        }
1120
1121        // there can be lines that miss fields (also the comma values)
1122        // this means the splitter won't process them.
1123        // We traverse them to read them as null values.
1124        while processed_fields < projection.len() {
1125            debug_assert!(processed_fields < buffers.len());
1126            let buf = unsafe {
1127                // SAFETY: processed fields index can never exceed the projection indices.
1128                buffers.get_unchecked_mut(processed_fields)
1129            };
1130            buf.add_null(!parse_options.missing_is_null);
1131            processed_fields += 1;
1132        }
1133        line_count += 1;
1134    }
1135}
1136
1137#[cfg(test)]
1138mod test {
1139    use super::SplitLines;
1140
1141    #[test]
1142    fn test_splitlines() {
1143        let input = "1,\"foo\n\"\n2,\"foo\n\"\n";
1144        let mut lines = SplitLines::new(input.as_bytes(), Some(b'"'), b'\n', None);
1145        assert_eq!(lines.next(), Some("1,\"foo\n\"".as_bytes()));
1146        assert_eq!(lines.next(), Some("2,\"foo\n\"".as_bytes()));
1147        assert_eq!(lines.next(), None);
1148
1149        let input2 = "1,'foo\n'\n2,'foo\n'\n";
1150        let mut lines2 = SplitLines::new(input2.as_bytes(), Some(b'\''), b'\n', None);
1151        assert_eq!(lines2.next(), Some("1,'foo\n'".as_bytes()));
1152        assert_eq!(lines2.next(), Some("2,'foo\n'".as_bytes()));
1153        assert_eq!(lines2.next(), None);
1154    }
1155}