polars_io/csv/read/
parser.rs

1use std::path::Path;
2
3use memchr::memchr2_iter;
4use num_traits::Pow;
5use polars_core::prelude::*;
6use polars_core::{POOL, config};
7use polars_error::feature_gated;
8use polars_utils::mmap::MMapSemaphore;
9use polars_utils::select::select_unpredictable;
10use rayon::prelude::*;
11
12use super::CsvParseOptions;
13use super::buffer::Buffer;
14use super::options::{CommentPrefix, NullValuesCompiled};
15use super::splitfields::SplitFields;
16use super::utils::get_file_chunks;
17use crate::path_utils::is_cloud_url;
18use crate::prelude::_csv_read_internal::find_starting_point;
19use crate::utils::compression::maybe_decompress_bytes;
20
21/// Read the number of rows without parsing columns
22/// useful for count(*) queries
23#[allow(clippy::too_many_arguments)]
24pub fn count_rows(
25    path: &Path,
26    separator: u8,
27    quote_char: Option<u8>,
28    comment_prefix: Option<&CommentPrefix>,
29    eol_char: u8,
30    has_header: bool,
31    skip_lines: usize,
32    skip_rows_before_header: usize,
33    skip_rows_after_header: usize,
34) -> PolarsResult<usize> {
35    let file = if is_cloud_url(path) || config::force_async() {
36        feature_gated!("cloud", {
37            crate::file_cache::FILE_CACHE
38                .get_entry(path.to_str().unwrap())
39                // Safety: This was initialized by schema inference.
40                .unwrap()
41                .try_open_assume_latest()?
42        })
43    } else {
44        polars_utils::open_file(path)?
45    };
46
47    let mmap = MMapSemaphore::new_from_file(&file).unwrap();
48    let owned = &mut vec![];
49    let reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;
50
51    count_rows_from_slice_par(
52        reader_bytes,
53        separator,
54        quote_char,
55        comment_prefix,
56        eol_char,
57        has_header,
58        skip_lines,
59        skip_rows_before_header,
60        skip_rows_after_header,
61    )
62}
63
64/// Read the number of rows without parsing columns
65/// useful for count(*) queries
66#[allow(clippy::too_many_arguments)]
67pub fn count_rows_from_slice_par(
68    mut bytes: &[u8],
69    separator: u8,
70    quote_char: Option<u8>,
71    comment_prefix: Option<&CommentPrefix>,
72    eol_char: u8,
73    has_header: bool,
74    skip_lines: usize,
75    skip_rows_before_header: usize,
76    skip_rows_after_header: usize,
77) -> PolarsResult<usize> {
78    for _ in 0..bytes.len() {
79        if bytes[0] != eol_char {
80            break;
81        }
82
83        bytes = &bytes[1..];
84    }
85
86    // Skip lines and jump past header
87
88    let start_offset = find_starting_point(
89        bytes,
90        quote_char,
91        eol_char,
92        // schema_len
93        // NOTE: schema_len is normally required to differentiate handling a leading blank line
94        // between case (a) when schema_len == 1 (as an empty string) vs case (b) when
95        // schema_len > 1 (as a blank line to be ignored).
96        // We skip blank lines, even when UFT8-BOM is present and schema_len == 1.
97        usize::MAX,
98        skip_lines,
99        skip_rows_before_header,
100        skip_rows_after_header,
101        comment_prefix,
102        has_header,
103    )?;
104    bytes = &bytes[start_offset..];
105
106    const MIN_ROWS_PER_THREAD: usize = 1024;
107    let max_threads = POOL.current_num_threads();
108
109    // Determine if parallelism is beneficial and how many threads
110    let n_threads = get_line_stats(
111        bytes,
112        MIN_ROWS_PER_THREAD,
113        eol_char,
114        None,
115        separator,
116        quote_char,
117    )
118    .map(|(mean, std)| {
119        let n_rows = (bytes.len() as f32 / (mean - 0.01 * std)) as usize;
120        (n_rows / MIN_ROWS_PER_THREAD).clamp(1, max_threads)
121    })
122    .unwrap_or(1);
123
124    if n_threads == 1 {
125        return count_rows_from_slice(bytes, quote_char, comment_prefix, eol_char, false);
126    }
127
128    let file_chunks: Vec<(usize, usize)> =
129        get_file_chunks(bytes, n_threads, None, separator, quote_char, eol_char);
130
131    let iter = file_chunks.into_par_iter().map(|(start, stop)| {
132        let bytes = &bytes[start..stop];
133
134        if comment_prefix.is_some() {
135            SplitLines::new(bytes, quote_char, eol_char, comment_prefix)
136                .filter(|line| !is_comment_line(line, comment_prefix))
137                .count()
138        } else {
139            CountLines::new(quote_char, eol_char).count(bytes).0
140                + bytes.last().is_some_and(|x| *x != b'\n') as usize
141        }
142    });
143
144    let n: usize = POOL.install(|| iter.sum());
145
146    Ok(n)
147}
148
149/// Read the number of rows without parsing columns
150pub fn count_rows_from_slice(
151    mut bytes: &[u8],
152    quote_char: Option<u8>,
153    comment_prefix: Option<&CommentPrefix>,
154    eol_char: u8,
155    has_header: bool,
156) -> PolarsResult<usize> {
157    for _ in 0..bytes.len() {
158        if bytes[0] != eol_char {
159            break;
160        }
161
162        bytes = &bytes[1..];
163    }
164
165    let n = if comment_prefix.is_some() {
166        SplitLines::new(bytes, quote_char, eol_char, comment_prefix)
167            .filter(|line| !is_comment_line(line, comment_prefix))
168            .count()
169    } else {
170        CountLines::new(quote_char, eol_char).count(bytes).0
171            + bytes.last().is_some_and(|x| *x != b'\n') as usize
172    };
173
174    Ok(n - (has_header as usize))
175}
176
177/// Skip the utf-8 Byte Order Mark.
178/// credits to csv-core
179pub(super) fn skip_bom(input: &[u8]) -> &[u8] {
180    if input.len() >= 3 && &input[0..3] == b"\xef\xbb\xbf" {
181        &input[3..]
182    } else {
183        input
184    }
185}
186
187/// Checks if a line in a CSV file is a comment based on the given comment prefix configuration.
188///
189/// This function is used during CSV parsing to determine whether a line should be ignored based on its starting characters.
190#[inline]
191pub(super) fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
192    match comment_prefix {
193        Some(CommentPrefix::Single(c)) => line.first() == Some(c),
194        Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
195        None => false,
196    }
197}
198
199/// Find the nearest next line position.
200/// Does not check for new line characters embedded in String fields.
201pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
202    let pos = memchr::memchr(eol_char, input)? + 1;
203    if input.len() - pos == 0 {
204        return None;
205    }
206    Some(pos)
207}
208
209pub(super) fn skip_lines_naive(mut input: &[u8], eol_char: u8, skip: usize) -> &[u8] {
210    for _ in 0..skip {
211        if let Some(pos) = next_line_position_naive(input, eol_char) {
212            input = &input[pos..];
213        } else {
214            return input;
215        }
216    }
217    input
218}
219
220/// Find the nearest next line position that is not embedded in a String field.
221pub(super) fn next_line_position(
222    mut input: &[u8],
223    mut expected_fields: Option<usize>,
224    separator: u8,
225    quote_char: Option<u8>,
226    eol_char: u8,
227) -> Option<usize> {
228    fn accept_line(
229        line: &[u8],
230        expected_fields: usize,
231        separator: u8,
232        eol_char: u8,
233        quote_char: Option<u8>,
234    ) -> bool {
235        let mut count = 0usize;
236        for (field, _) in SplitFields::new(line, separator, quote_char, eol_char) {
237            if memchr2_iter(separator, eol_char, field).count() >= expected_fields {
238                return false;
239            }
240            count += 1;
241        }
242
243        // if the latest field is missing
244        // e.g.:
245        // a,b,c
246        // vala,valb,
247        // SplitFields returns a count that is 1 less
248        // There fore we accept:
249        // expected == count
250        // and
251        // expected == count - 1
252        expected_fields.wrapping_sub(count) <= 1
253    }
254
255    // we check 3 subsequent lines for `accept_line` before we accept
256    // if 3 groups are rejected we reject completely
257    let mut rejected_line_groups = 0u8;
258
259    let mut total_pos = 0;
260    if input.is_empty() {
261        return None;
262    }
263    let mut lines_checked = 0u8;
264    loop {
265        if rejected_line_groups >= 3 {
266            return None;
267        }
268        lines_checked = lines_checked.wrapping_add(1);
269        // headers might have an extra value
270        // So if we have churned through enough lines
271        // we try one field less.
272        if lines_checked == u8::MAX {
273            if let Some(ef) = expected_fields {
274                expected_fields = Some(ef.saturating_sub(1))
275            }
276        };
277        let pos = memchr::memchr(eol_char, input)? + 1;
278        if input.len() - pos == 0 {
279            return None;
280        }
281        debug_assert!(pos <= input.len());
282        let new_input = unsafe { input.get_unchecked(pos..) };
283        let mut lines = SplitLines::new(new_input, quote_char, eol_char, None);
284        let line = lines.next();
285
286        match (line, expected_fields) {
287            // count the fields, and determine if they are equal to what we expect from the schema
288            (Some(line), Some(expected_fields)) => {
289                if accept_line(line, expected_fields, separator, eol_char, quote_char) {
290                    let mut valid = true;
291                    for line in lines.take(2) {
292                        if !accept_line(line, expected_fields, separator, eol_char, quote_char) {
293                            valid = false;
294                            break;
295                        }
296                    }
297                    if valid {
298                        return Some(total_pos + pos);
299                    } else {
300                        rejected_line_groups += 1;
301                    }
302                } else {
303                    debug_assert!(pos < input.len());
304                    unsafe {
305                        input = input.get_unchecked(pos + 1..);
306                    }
307                    total_pos += pos + 1;
308                }
309            },
310            // don't count the fields
311            (Some(_), None) => return Some(total_pos + pos),
312            // // no new line found, check latest line (without eol) for number of fields
313            _ => return None,
314        }
315    }
316}
317
318pub(super) fn is_line_ending(b: u8, eol_char: u8) -> bool {
319    b == eol_char || b == b'\r'
320}
321
322pub(super) fn is_whitespace(b: u8) -> bool {
323    b == b' ' || b == b'\t'
324}
325
326#[inline]
327fn skip_condition<F>(input: &[u8], f: F) -> &[u8]
328where
329    F: Fn(u8) -> bool,
330{
331    if input.is_empty() {
332        return input;
333    }
334
335    let read = input.iter().position(|b| !f(*b)).unwrap_or(input.len());
336    &input[read..]
337}
338
339/// Remove whitespace from the start of buffer.
340/// Makes sure that the bytes stream starts with
341///     'field_1,field_2'
342/// and not with
343///     '\nfield_1,field_1'
344#[inline]
345pub(super) fn skip_whitespace(input: &[u8]) -> &[u8] {
346    skip_condition(input, is_whitespace)
347}
348
349#[inline]
350pub(super) fn skip_line_ending(input: &[u8], eol_char: u8) -> &[u8] {
351    skip_condition(input, |b| is_line_ending(b, eol_char))
352}
353
354/// Get the mean and standard deviation of length of lines in bytes
355pub(super) fn get_line_stats(
356    bytes: &[u8],
357    n_lines: usize,
358    eol_char: u8,
359    expected_fields: Option<usize>,
360    separator: u8,
361    quote_char: Option<u8>,
362) -> Option<(f32, f32)> {
363    let mut lengths = Vec::with_capacity(n_lines);
364
365    let mut bytes_trunc;
366    let n_lines_per_iter = n_lines / 2;
367
368    let mut n_read = 0;
369
370    // sample from start and 75% in the file
371    for offset in [0, (bytes.len() as f32 * 0.75) as usize] {
372        bytes_trunc = &bytes[offset..];
373        let pos = next_line_position(
374            bytes_trunc,
375            expected_fields,
376            separator,
377            quote_char,
378            eol_char,
379        )?;
380        bytes_trunc = &bytes_trunc[pos + 1..];
381
382        for _ in offset..(offset + n_lines_per_iter) {
383            let pos = next_line_position_naive(bytes_trunc, eol_char)? + 1;
384            n_read += pos;
385            lengths.push(pos);
386            bytes_trunc = &bytes_trunc[pos..];
387        }
388    }
389
390    let n_samples = lengths.len();
391
392    let mean = (n_read as f32) / (n_samples as f32);
393    let mut std = 0.0;
394    for &len in lengths.iter() {
395        std += (len as f32 - mean).pow(2.0)
396    }
397    std = (std / n_samples as f32).sqrt();
398    Some((mean, std))
399}
400
401/// An adapted version of std::iter::Split.
402/// This exists solely because we cannot split the file in lines naively as
403///
404/// ```text
405///    for line in bytes.split(b'\n') {
406/// ```
407///
408/// This will fail when strings fields are have embedded end line characters.
409/// For instance: "This is a valid field\nI have multiples lines" is a valid string field, that contains multiple lines.
410pub(super) struct SplitLines<'a> {
411    v: &'a [u8],
412    quote_char: u8,
413    eol_char: u8,
414    #[cfg(feature = "simd")]
415    simd_eol_char: SimdVec,
416    #[cfg(feature = "simd")]
417    simd_quote_char: SimdVec,
418    #[cfg(feature = "simd")]
419    previous_valid_eols: u64,
420    total_index: usize,
421    quoting: bool,
422    comment_prefix: Option<&'a CommentPrefix>,
423}
424
425#[cfg(feature = "simd")]
426const SIMD_SIZE: usize = 64;
427#[cfg(feature = "simd")]
428use std::simd::prelude::*;
429
430#[cfg(feature = "simd")]
431use polars_utils::clmul::prefix_xorsum_inclusive;
432
433#[cfg(feature = "simd")]
434type SimdVec = u8x64;
435
436impl<'a> SplitLines<'a> {
437    pub(super) fn new(
438        slice: &'a [u8],
439        quote_char: Option<u8>,
440        eol_char: u8,
441        comment_prefix: Option<&'a CommentPrefix>,
442    ) -> Self {
443        let quoting = quote_char.is_some();
444        let quote_char = quote_char.unwrap_or(b'\"');
445        #[cfg(feature = "simd")]
446        let simd_eol_char = SimdVec::splat(eol_char);
447        #[cfg(feature = "simd")]
448        let simd_quote_char = SimdVec::splat(quote_char);
449        Self {
450            v: slice,
451            quote_char,
452            eol_char,
453            #[cfg(feature = "simd")]
454            simd_eol_char,
455            #[cfg(feature = "simd")]
456            simd_quote_char,
457            #[cfg(feature = "simd")]
458            previous_valid_eols: 0,
459            total_index: 0,
460            quoting,
461            comment_prefix,
462        }
463    }
464}
465
466impl<'a> SplitLines<'a> {
467    // scalar as in non-simd
468    fn next_scalar(&mut self) -> Option<&'a [u8]> {
469        if self.v.is_empty() {
470            return None;
471        }
472        if is_comment_line(self.v, self.comment_prefix) {
473            return self.next_comment_line();
474        }
475        {
476            let mut pos = 0u32;
477            let mut iter = self.v.iter();
478            let mut in_field = false;
479            loop {
480                match iter.next() {
481                    Some(&c) => {
482                        pos += 1;
483
484                        if self.quoting && c == self.quote_char {
485                            // toggle between string field enclosure
486                            //      if we encounter a starting '"' -> in_field = true;
487                            //      if we encounter a closing '"' -> in_field = false;
488                            in_field = !in_field;
489                        }
490                        // if we are not in a string and we encounter '\n' we can stop at this position.
491                        else if c == self.eol_char && !in_field {
492                            break;
493                        }
494                    },
495                    None => {
496                        let remainder = self.v;
497                        self.v = &[];
498                        return Some(remainder);
499                    },
500                }
501            }
502
503            unsafe {
504                debug_assert!((pos as usize) <= self.v.len());
505
506                // return line up to this position
507                let ret = Some(
508                    self.v
509                        .get_unchecked(..(self.total_index + pos as usize - 1)),
510                );
511                // skip the '\n' token and update slice.
512                self.v = self.v.get_unchecked(self.total_index + pos as usize..);
513                ret
514            }
515        }
516    }
517    fn next_comment_line(&mut self) -> Option<&'a [u8]> {
518        if let Some(pos) = next_line_position_naive(self.v, self.eol_char) {
519            unsafe {
520                // return line up to this position
521                let ret = Some(self.v.get_unchecked(..(pos - 1)));
522                // skip the '\n' token and update slice.
523                self.v = self.v.get_unchecked(pos..);
524                ret
525            }
526        } else {
527            let remainder = self.v;
528            self.v = &[];
529            Some(remainder)
530        }
531    }
532}
533
534impl<'a> Iterator for SplitLines<'a> {
535    type Item = &'a [u8];
536
537    #[inline]
538    #[cfg(not(feature = "simd"))]
539    fn next(&mut self) -> Option<&'a [u8]> {
540        self.next_scalar()
541    }
542
543    #[inline]
544    #[cfg(feature = "simd")]
545    fn next(&mut self) -> Option<&'a [u8]> {
546        // First check cached value
547        if self.previous_valid_eols != 0 {
548            let pos = self.previous_valid_eols.trailing_zeros() as usize;
549            self.previous_valid_eols >>= (pos + 1) as u64;
550
551            unsafe {
552                debug_assert!((pos) <= self.v.len());
553
554                // return line up to this position
555                let ret = Some(self.v.get_unchecked(..pos));
556                // skip the '\n' token and update slice.
557                self.v = self.v.get_unchecked(pos + 1..);
558                return ret;
559            }
560        }
561        if self.v.is_empty() {
562            return None;
563        }
564        if self.comment_prefix.is_some() {
565            return self.next_scalar();
566        }
567
568        self.total_index = 0;
569        let mut not_in_field_previous_iter = true;
570
571        loop {
572            let bytes = unsafe { self.v.get_unchecked(self.total_index..) };
573            if bytes.len() > SIMD_SIZE {
574                let lane: [u8; SIMD_SIZE] = unsafe {
575                    bytes
576                        .get_unchecked(0..SIMD_SIZE)
577                        .try_into()
578                        .unwrap_unchecked()
579                };
580                let simd_bytes = SimdVec::from(lane);
581                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
582
583                let valid_eols = if self.quoting {
584                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
585                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);
586
587                    if not_in_field_previous_iter {
588                        not_in_quote_field = !not_in_quote_field;
589                    }
590                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
591                    eol_mask & not_in_quote_field
592                } else {
593                    eol_mask
594                };
595
596                if valid_eols != 0 {
597                    let pos = valid_eols.trailing_zeros() as usize;
598                    if pos == SIMD_SIZE - 1 {
599                        self.previous_valid_eols = 0;
600                    } else {
601                        self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
602                    }
603
604                    unsafe {
605                        let pos = self.total_index + pos;
606                        debug_assert!((pos) <= self.v.len());
607
608                        // return line up to this position
609                        let ret = Some(self.v.get_unchecked(..pos));
610                        // skip the '\n' token and update slice.
611                        self.v = self.v.get_unchecked(pos + 1..);
612                        return ret;
613                    }
614                } else {
615                    self.total_index += SIMD_SIZE;
616                }
617            } else {
618                // Denotes if we are in a string field, started with a quote
619                let mut in_field = !not_in_field_previous_iter;
620                let mut pos = 0u32;
621                let mut iter = bytes.iter();
622                loop {
623                    match iter.next() {
624                        Some(&c) => {
625                            pos += 1;
626
627                            if self.quoting && c == self.quote_char {
628                                // toggle between string field enclosure
629                                //      if we encounter a starting '"' -> in_field = true;
630                                //      if we encounter a closing '"' -> in_field = false;
631                                in_field = !in_field;
632                            }
633                            // if we are not in a string and we encounter '\n' we can stop at this position.
634                            else if c == self.eol_char && !in_field {
635                                break;
636                            }
637                        },
638                        None => {
639                            let remainder = self.v;
640                            self.v = &[];
641                            return Some(remainder);
642                        },
643                    }
644                }
645
646                unsafe {
647                    debug_assert!((pos as usize) <= self.v.len());
648
649                    // return line up to this position
650                    let ret = Some(
651                        self.v
652                            .get_unchecked(..(self.total_index + pos as usize - 1)),
653                    );
654                    // skip the '\n' token and update slice.
655                    self.v = self.v.get_unchecked(self.total_index + pos as usize..);
656                    return ret;
657                }
658            }
659        }
660    }
661}
662
663pub struct CountLines {
664    quote_char: u8,
665    eol_char: u8,
666    #[cfg(feature = "simd")]
667    simd_eol_char: SimdVec,
668    #[cfg(feature = "simd")]
669    simd_quote_char: SimdVec,
670    quoting: bool,
671}
672
673#[derive(Copy, Clone, Debug)]
674pub struct LineStats {
675    newline_count: usize,
676    last_newline_offset: usize,
677    end_inside_string: bool,
678}
679
680impl CountLines {
681    pub fn new(quote_char: Option<u8>, eol_char: u8) -> Self {
682        let quoting = quote_char.is_some();
683        let quote_char = quote_char.unwrap_or(b'\"');
684        #[cfg(feature = "simd")]
685        let simd_eol_char = SimdVec::splat(eol_char);
686        #[cfg(feature = "simd")]
687        let simd_quote_char = SimdVec::splat(quote_char);
688        Self {
689            quote_char,
690            eol_char,
691            #[cfg(feature = "simd")]
692            simd_eol_char,
693            #[cfg(feature = "simd")]
694            simd_quote_char,
695            quoting,
696        }
697    }
698
699    /// Analyzes a chunk of CSV data.
700    ///
701    /// Returns (newline_count, last_newline_offset, end_inside_string) twice,
702    /// the first is assuming the start of the chunk is *not* inside a string,
703    /// the second assuming the start is inside a string.
704    pub fn analyze_chunk(&self, bytes: &[u8]) -> [LineStats; 2] {
705        let mut scan_offset = 0;
706        let mut states = [
707            LineStats {
708                newline_count: 0,
709                last_newline_offset: 0,
710                end_inside_string: false,
711            },
712            LineStats {
713                newline_count: 0,
714                last_newline_offset: 0,
715                end_inside_string: false,
716            },
717        ];
718
719        // false if even number of quotes seen so far, true otherwise.
720        #[allow(unused_assignments)]
721        let mut global_quote_parity = false;
722
723        #[cfg(feature = "simd")]
724        {
725            // 0 if even number of quotes seen so far, u64::MAX otherwise.
726            let mut global_quote_parity_mask = 0;
727            while scan_offset + 64 <= bytes.len() {
728                let block: [u8; 64] = unsafe {
729                    bytes
730                        .get_unchecked(scan_offset..scan_offset + 64)
731                        .try_into()
732                        .unwrap_unchecked()
733                };
734                let simd_bytes = SimdVec::from(block);
735                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
736                if self.quoting {
737                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
738                    let quote_parity =
739                        prefix_xorsum_inclusive(quote_mask) ^ global_quote_parity_mask;
740                    global_quote_parity_mask = ((quote_parity as i64) >> 63) as u64;
741
742                    let start_outside_string_eol_mask = eol_mask & !quote_parity;
743                    states[0].newline_count += start_outside_string_eol_mask.count_ones() as usize;
744                    states[0].last_newline_offset = select_unpredictable(
745                        start_outside_string_eol_mask != 0,
746                        (scan_offset + 63)
747                            .wrapping_sub(start_outside_string_eol_mask.leading_zeros() as usize),
748                        states[0].last_newline_offset,
749                    );
750
751                    let start_inside_string_eol_mask = eol_mask & quote_parity;
752                    states[1].newline_count += start_inside_string_eol_mask.count_ones() as usize;
753                    states[1].last_newline_offset = select_unpredictable(
754                        start_inside_string_eol_mask != 0,
755                        (scan_offset + 63)
756                            .wrapping_sub(start_inside_string_eol_mask.leading_zeros() as usize),
757                        states[1].last_newline_offset,
758                    );
759                } else {
760                    states[0].newline_count += eol_mask.count_ones() as usize;
761                    states[0].last_newline_offset = select_unpredictable(
762                        eol_mask != 0,
763                        (scan_offset + 63).wrapping_sub(eol_mask.leading_zeros() as usize),
764                        states[0].last_newline_offset,
765                    );
766                }
767
768                scan_offset += 64;
769            }
770
771            global_quote_parity = global_quote_parity_mask > 0;
772        }
773
774        while scan_offset < bytes.len() {
775            let c = unsafe { *bytes.get_unchecked(scan_offset) };
776            global_quote_parity ^= (c == self.quote_char) & self.quoting;
777
778            let state = &mut states[global_quote_parity as usize];
779            state.newline_count += (c == self.eol_char) as usize;
780            state.last_newline_offset =
781                select_unpredictable(c == self.eol_char, scan_offset, state.last_newline_offset);
782
783            scan_offset += 1;
784        }
785
786        states[0].end_inside_string = global_quote_parity;
787        states[1].end_inside_string = !global_quote_parity;
788        states
789    }
790
791    pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) {
792        loop {
793            let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };
794
795            let (count, offset) = self.count(b);
796
797            if count > 0 || b.len() == bytes.len() {
798                return (count, offset);
799            }
800
801            *chunk_size *= 2;
802        }
803    }
804
805    /// Returns count and offset to split for remainder in slice.
806    #[cfg(feature = "simd")]
807    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
808        let mut total_idx = 0;
809        let original_bytes = bytes;
810        let mut count = 0;
811        let mut position = 0;
812        let mut not_in_field_previous_iter = true;
813
814        loop {
815            let bytes = unsafe { original_bytes.get_unchecked(total_idx..) };
816
817            if bytes.len() > SIMD_SIZE {
818                let lane: [u8; SIMD_SIZE] = unsafe {
819                    bytes
820                        .get_unchecked(0..SIMD_SIZE)
821                        .try_into()
822                        .unwrap_unchecked()
823                };
824                let simd_bytes = SimdVec::from(lane);
825                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
826
827                let valid_eols = if self.quoting {
828                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
829                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);
830
831                    if not_in_field_previous_iter {
832                        not_in_quote_field = !not_in_quote_field;
833                    }
834                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
835                    eol_mask & not_in_quote_field
836                } else {
837                    eol_mask
838                };
839
840                if valid_eols != 0 {
841                    count += valid_eols.count_ones() as usize;
842                    position = total_idx + 63 - valid_eols.leading_zeros() as usize;
843                    debug_assert_eq!(original_bytes[position], self.eol_char)
844                }
845                total_idx += SIMD_SIZE;
846            } else if bytes.is_empty() {
847                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
848                return (count, position);
849            } else {
850                let (c, o) = self.count_no_simd(bytes, !not_in_field_previous_iter);
851
852                let (count, position) = if c > 0 {
853                    (count + c, total_idx + o)
854                } else {
855                    (count, position)
856                };
857                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
858
859                return (count, position);
860            }
861        }
862    }
863
864    #[cfg(not(feature = "simd"))]
865    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
866        self.count_no_simd(bytes, false)
867    }
868
869    fn count_no_simd(&self, bytes: &[u8], in_field: bool) -> (usize, usize) {
870        let iter = bytes.iter();
871        let mut in_field = in_field;
872        let mut count = 0;
873        let mut position = 0;
874
875        for b in iter {
876            let c = *b;
877            if self.quoting && c == self.quote_char {
878                // toggle between string field enclosure
879                //      if we encounter a starting '"' -> in_field = true;
880                //      if we encounter a closing '"' -> in_field = false;
881                in_field = !in_field;
882            }
883            // If we are not in a string and we encounter '\n' we can stop at this position.
884            else if c == self.eol_char && !in_field {
885                position = (b as *const _ as usize) - (bytes.as_ptr() as usize);
886                count += 1;
887            }
888        }
889        debug_assert!(count == 0 || bytes[position] == self.eol_char);
890
891        (count, position)
892    }
893}
894
895#[inline]
896fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> {
897    let mut in_field = false;
898
899    let mut idx = 0u32;
900    // micro optimizations
901    #[allow(clippy::explicit_counter_loop)]
902    for &c in bytes.iter() {
903        if c == quote_char {
904            // toggle between string field enclosure
905            //      if we encounter a starting '"' -> in_field = true;
906            //      if we encounter a closing '"' -> in_field = false;
907            in_field = !in_field;
908        }
909
910        if !in_field && c == needle {
911            return Some(idx as usize);
912        }
913        idx += 1;
914    }
915    None
916}
917
918#[inline]
919pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
920    let pos = match quote {
921        Some(quote) => find_quoted(bytes, quote, eol_char),
922        None => bytes.iter().position(|x| *x == eol_char),
923    };
924    match pos {
925        None => &[],
926        Some(pos) => &bytes[pos + 1..],
927    }
928}
929
930#[inline]
931pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
932    if let Some(pos) = next_line_position_naive(input, eol_char) {
933        unsafe { input.get_unchecked(pos..) }
934    } else {
935        &[]
936    }
937}
938
939/// Parse CSV.
940///
941/// # Arguments
942/// * `bytes` - input to parse
943/// * `offset` - offset in bytes in total input. This is 0 if single threaded. If multi-threaded every
944///   thread has a different offset.
945/// * `projection` - Indices of the columns to project.
946/// * `buffers` - Parsed output will be written to these buffers. Except for UTF8 data. The offsets of the
947///   fields are written to the buffers. The UTF8 data will be parsed later.
948///
949/// Returns the number of bytes parsed successfully.
950#[allow(clippy::too_many_arguments)]
951pub(super) fn parse_lines(
952    mut bytes: &[u8],
953    parse_options: &CsvParseOptions,
954    offset: usize,
955    ignore_errors: bool,
956    null_values: Option<&NullValuesCompiled>,
957    projection: &[usize],
958    buffers: &mut [Buffer],
959    n_lines: usize,
960    // length of original schema
961    schema_len: usize,
962    schema: &Schema,
963) -> PolarsResult<usize> {
964    assert!(
965        !projection.is_empty(),
966        "at least one column should be projected"
967    );
968    let mut truncate_ragged_lines = parse_options.truncate_ragged_lines;
969    // During projection pushdown we are not checking other csv fields.
970    // This would be very expensive and we don't care as we only want
971    // the projected columns.
972    if projection.len() != schema_len {
973        truncate_ragged_lines = true
974    }
975
976    // we use the pointers to track the no of bytes read.
977    let start = bytes.as_ptr() as usize;
978    let original_bytes_len = bytes.len();
979    let n_lines = n_lines as u32;
980
981    let mut line_count = 0u32;
982    loop {
983        if line_count > n_lines {
984            let end = bytes.as_ptr() as usize;
985            return Ok(end - start);
986        }
987
988        if bytes.is_empty() {
989            return Ok(original_bytes_len);
990        } else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
991            // deal with comments
992            let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char);
993            bytes = bytes_rem;
994            continue;
995        }
996
997        // Every line we only need to parse the columns that are projected.
998        // Therefore we check if the idx of the field is in our projected columns.
999        // If it is not, we skip the field.
1000        let mut projection_iter = projection.iter().copied();
1001        let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
1002        let mut processed_fields = 0;
1003
1004        let mut iter = SplitFields::new(
1005            bytes,
1006            parse_options.separator,
1007            parse_options.quote_char,
1008            parse_options.eol_char,
1009        );
1010        let mut idx = 0u32;
1011        let mut read_sol = 0;
1012        loop {
1013            match iter.next() {
1014                // end of line
1015                None => {
1016                    bytes = unsafe { bytes.get_unchecked(std::cmp::min(read_sol, bytes.len())..) };
1017                    break;
1018                },
1019                Some((mut field, needs_escaping)) => {
1020                    let field_len = field.len();
1021
1022                    // +1 is the split character that is consumed by the iterator.
1023                    read_sol += field_len + 1;
1024
1025                    if idx == next_projected as u32 {
1026                        // the iterator is finished when it encounters a `\n`
1027                        // this could be preceded by a '\r'
1028                        unsafe {
1029                            if field_len > 0 && *field.get_unchecked(field_len - 1) == b'\r' {
1030                                field = field.get_unchecked(..field_len - 1);
1031                            }
1032                        }
1033
1034                        debug_assert!(processed_fields < buffers.len());
1035                        let buf = unsafe {
1036                            // SAFETY: processed fields index can never exceed the projection indices.
1037                            buffers.get_unchecked_mut(processed_fields)
1038                        };
1039                        let mut add_null = false;
1040
1041                        // if we have null values argument, check if this field equal null value
1042                        if let Some(null_values) = null_values {
1043                            let field = if needs_escaping && !field.is_empty() {
1044                                unsafe { field.get_unchecked(1..field.len() - 1) }
1045                            } else {
1046                                field
1047                            };
1048
1049                            // SAFETY:
1050                            // process fields is in bounds
1051                            add_null = unsafe { null_values.is_null(field, idx as usize) }
1052                        }
1053                        if add_null {
1054                            buf.add_null(!parse_options.missing_is_null && field.is_empty())
1055                        } else {
1056                            buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null)
1057                                .map_err(|e| {
1058                                    let bytes_offset = offset + field.as_ptr() as usize - start;
1059                                    let unparsable = String::from_utf8_lossy(field);
1060                                    let column_name = schema.get_at_index(idx as usize).unwrap().0;
1061                                    polars_err!(
1062                                        ComputeError:
1063                                        "could not parse `{}` as dtype `{}` at column '{}' (column number {})\n\n\
1064                                        The current offset in the file is {} bytes.\n\
1065                                        \n\
1066                                        You might want to try:\n\
1067                                        - increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),\n\
1068                                        - specifying correct dtype with the `schema_overrides` argument\n\
1069                                        - setting `ignore_errors` to `True`,\n\
1070                                        - adding `{}` to the `null_values` list.\n\n\
1071                                        Original error: ```{}```",
1072                                        &unparsable,
1073                                        buf.dtype(),
1074                                        column_name,
1075                                        idx + 1,
1076                                        bytes_offset,
1077                                        &unparsable,
1078                                        e
1079                                    )
1080                                })?;
1081                        }
1082                        processed_fields += 1;
1083
1084                        // if we have all projected columns we are done with this line
1085                        match projection_iter.next() {
1086                            Some(p) => next_projected = p,
1087                            None => {
1088                                if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) {
1089                                    bytes = &bytes[read_sol..];
1090                                } else {
1091                                    if !truncate_ragged_lines && read_sol < bytes.len() {
1092                                        polars_bail!(ComputeError: r#"found more fields than defined in 'Schema'
1093
1094Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
1095                                    }
1096                                    let bytes_rem = skip_this_line(
1097                                        unsafe { bytes.get_unchecked(read_sol - 1..) },
1098                                        parse_options.quote_char,
1099                                        parse_options.eol_char,
1100                                    );
1101                                    bytes = bytes_rem;
1102                                }
1103                                break;
1104                            },
1105                        }
1106                    }
1107                    idx += 1;
1108                },
1109            }
1110        }
1111
1112        // there can be lines that miss fields (also the comma values)
1113        // this means the splitter won't process them.
1114        // We traverse them to read them as null values.
1115        while processed_fields < projection.len() {
1116            debug_assert!(processed_fields < buffers.len());
1117            let buf = unsafe {
1118                // SAFETY: processed fields index can never exceed the projection indices.
1119                buffers.get_unchecked_mut(processed_fields)
1120            };
1121            buf.add_null(!parse_options.missing_is_null);
1122            processed_fields += 1;
1123        }
1124        line_count += 1;
1125    }
1126}
1127
1128#[cfg(test)]
1129mod test {
1130    use super::SplitLines;
1131
1132    #[test]
1133    fn test_splitlines() {
1134        let input = "1,\"foo\n\"\n2,\"foo\n\"\n";
1135        let mut lines = SplitLines::new(input.as_bytes(), Some(b'"'), b'\n', None);
1136        assert_eq!(lines.next(), Some("1,\"foo\n\"".as_bytes()));
1137        assert_eq!(lines.next(), Some("2,\"foo\n\"".as_bytes()));
1138        assert_eq!(lines.next(), None);
1139
1140        let input2 = "1,'foo\n'\n2,'foo\n'\n";
1141        let mut lines2 = SplitLines::new(input2.as_bytes(), Some(b'\''), b'\n', None);
1142        assert_eq!(lines2.next(), Some("1,'foo\n'".as_bytes()));
1143        assert_eq!(lines2.next(), Some("2,'foo\n'".as_bytes()));
1144        assert_eq!(lines2.next(), None);
1145    }
1146}