Skip to main content

polars_io/csv/read/
parser.rs

1use std::cmp;
2
3use memchr::memchr2_iter;
4use polars_buffer::Buffer;
5use polars_core::prelude::*;
6use polars_core::{POOL, config};
7use polars_error::feature_gated;
8use polars_utils::mmap::MMapSemaphore;
9use polars_utils::pl_path::PlRefPath;
10use polars_utils::select::select_unpredictable;
11use rayon::prelude::*;
12
13use super::CsvParseOptions;
14use super::builder::Builder;
15use super::options::{CommentPrefix, NullValuesCompiled};
16use super::splitfields::SplitFields;
17use crate::csv::read::read_until_start_and_infer_schema;
18use crate::prelude::CsvReadOptions;
19use crate::utils::compression::CompressedReader;
20
21/// Read the number of rows without parsing columns
22/// useful for count(*) queries
23#[allow(clippy::too_many_arguments)]
24pub fn count_rows(
25    path: PlRefPath,
26    quote_char: Option<u8>,
27    comment_prefix: Option<&CommentPrefix>,
28    eol_char: u8,
29    has_header: bool,
30    skip_lines: usize,
31    skip_rows_before_header: usize,
32    skip_rows_after_header: usize,
33) -> PolarsResult<usize> {
34    let file = if path.has_scheme() || config::force_async() {
35        feature_gated!("cloud", {
36            crate::file_cache::FILE_CACHE
37                .get_entry(path)
38                // Safety: This was initialized by schema inference.
39                .unwrap()
40                .try_open_assume_latest()?
41        })
42    } else {
43        polars_utils::open_file(path.as_std_path())?
44    };
45
46    let mmap = MMapSemaphore::new_from_file(&file).unwrap();
47
48    count_rows_from_slice_par(
49        Buffer::from_owner(mmap),
50        quote_char,
51        comment_prefix,
52        eol_char,
53        has_header,
54        skip_lines,
55        skip_rows_before_header,
56        skip_rows_after_header,
57    )
58}
59
60/// Read the number of rows without parsing columns
61/// useful for count(*) queries
62#[allow(clippy::too_many_arguments)]
63pub fn count_rows_from_slice_par(
64    buffer: Buffer<u8>,
65    quote_char: Option<u8>,
66    comment_prefix: Option<&CommentPrefix>,
67    eol_char: u8,
68    has_header: bool,
69    skip_lines: usize,
70    skip_rows_before_header: usize,
71    skip_rows_after_header: usize,
72) -> PolarsResult<usize> {
73    let mut reader = CompressedReader::try_new(buffer)?;
74
75    let reader_options = CsvReadOptions {
76        parse_options: Arc::new(CsvParseOptions {
77            quote_char,
78            comment_prefix: comment_prefix.cloned(),
79            eol_char,
80            ..Default::default()
81        }),
82        has_header,
83        skip_lines,
84        skip_rows: skip_rows_before_header,
85        skip_rows_after_header,
86        ..Default::default()
87    };
88
89    let (_, mut leftover) =
90        read_until_start_and_infer_schema(&reader_options, None, None, &mut reader)?;
91
92    const BYTES_PER_CHUNK: usize = if cfg!(debug_assertions) {
93        128
94    } else {
95        512 * 1024
96    };
97
98    let count = CountLines::new(quote_char, eol_char, comment_prefix.cloned());
99    POOL.install(|| {
100        let mut states = Vec::new();
101        let eof_unterminated_row;
102
103        if comment_prefix.is_none() {
104            let mut last_slice = Buffer::new();
105            let mut err = None;
106
107            let streaming_iter = std::iter::from_fn(|| {
108                let (slice, read_n) = match reader.read_next_slice(&leftover, BYTES_PER_CHUNK) {
109                    Ok(tup) => tup,
110                    Err(e) => {
111                        err = Some(e);
112                        return None;
113                    },
114                };
115
116                leftover = Buffer::new();
117                if slice.is_empty() && read_n == 0 {
118                    return None;
119                }
120
121                last_slice = slice.clone();
122                Some(slice)
123            });
124
125            states = streaming_iter
126                .enumerate()
127                .par_bridge()
128                .map(|(id, slice)| (count.analyze_chunk(&slice), id))
129                .collect::<Vec<_>>();
130
131            if let Some(e) = err {
132                return Err(e.into());
133            }
134
135            // par_bridge does not guarantee order, but is mostly sorted so `slice::sort` is a
136            // decent fit.
137            states.sort_by_key(|(_, id)| *id);
138
139            // Technically this is broken if the input has a comment line at the end that is longer
140            // than `BYTES_PER_CHUNK`, but in practice this ought to be fine.
141            eof_unterminated_row = ends_in_unterminated_row(&last_slice, eol_char, comment_prefix);
142        } else {
143            // For the non-compressed case this is a zero-copy op.
144            // TODO: Implement streaming chunk logic.
145            let (bytes, _) = reader.read_next_slice(&leftover, usize::MAX)?;
146
147            let num_chunks = bytes.len().div_ceil(BYTES_PER_CHUNK);
148            (0..num_chunks)
149                .into_par_iter()
150                .map(|chunk_idx| {
151                    let mut start_offset = chunk_idx * BYTES_PER_CHUNK;
152                    let next_start_offset = (start_offset + BYTES_PER_CHUNK).min(bytes.len());
153
154                    if start_offset != 0 {
155                        // Ensure we start at the start of a line.
156                        if let Some(nl_off) = bytes[start_offset..next_start_offset]
157                            .iter()
158                            .position(|b| *b == eol_char)
159                        {
160                            start_offset += nl_off + 1;
161                        } else {
162                            return (count.analyze_chunk(&[]), 0);
163                        }
164                    }
165
166                    let stop_offset = if let Some(nl_off) = bytes[next_start_offset..]
167                        .iter()
168                        .position(|b| *b == eol_char)
169                    {
170                        next_start_offset + nl_off + 1
171                    } else {
172                        bytes.len()
173                    };
174
175                    (count.analyze_chunk(&bytes[start_offset..stop_offset]), 0)
176                })
177                .collect_into_vec(&mut states);
178
179            eof_unterminated_row = ends_in_unterminated_row(&bytes, eol_char, comment_prefix);
180        }
181
182        let mut n = 0;
183        let mut in_string = false;
184        for (pair, _) in states {
185            n += pair[in_string as usize].newline_count;
186            in_string = pair[in_string as usize].end_inside_string;
187        }
188        n += eof_unterminated_row as usize;
189
190        Ok(n)
191    })
192}
193
194/// Checks if a line in a CSV file is a comment based on the given comment prefix configuration.
195///
196/// This function is used during CSV parsing to determine whether a line should be ignored based on its starting characters.
197#[inline]
198pub fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
199    match comment_prefix {
200        Some(CommentPrefix::Single(c)) => line.first() == Some(c),
201        Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
202        None => false,
203    }
204}
205
206/// Find the nearest next line position.
207/// Does not check for new line characters embedded in String fields.
208pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
209    let pos = memchr::memchr(eol_char, input)? + 1;
210    if input.len() - pos == 0 {
211        return None;
212    }
213    Some(pos)
214}
215
216/// Find the nearest next line position that is not embedded in a String field.
217pub(super) fn next_line_position(
218    mut input: &[u8],
219    mut expected_fields: Option<usize>,
220    separator: u8,
221    quote_char: Option<u8>,
222    eol_char: u8,
223) -> Option<usize> {
224    fn accept_line(
225        line: &[u8],
226        expected_fields: usize,
227        separator: u8,
228        eol_char: u8,
229        quote_char: Option<u8>,
230    ) -> bool {
231        let mut count = 0usize;
232        for (field, _) in SplitFields::new(line, separator, quote_char, eol_char) {
233            if memchr2_iter(separator, eol_char, field).count() >= expected_fields {
234                return false;
235            }
236            count += 1;
237        }
238
239        // if the latest field is missing
240        // e.g.:
241        // a,b,c
242        // vala,valb,
243        // SplitFields returns a count that is 1 less
244        // There fore we accept:
245        // expected == count
246        // and
247        // expected == count - 1
248        expected_fields.wrapping_sub(count) <= 1
249    }
250
251    // we check 3 subsequent lines for `accept_line` before we accept
252    // if 3 groups are rejected we reject completely
253    let mut rejected_line_groups = 0u8;
254
255    let mut total_pos = 0;
256    if input.is_empty() {
257        return None;
258    }
259    let mut lines_checked = 0u8;
260    loop {
261        if rejected_line_groups >= 3 {
262            return None;
263        }
264        lines_checked = lines_checked.wrapping_add(1);
265        // headers might have an extra value
266        // So if we have churned through enough lines
267        // we try one field less.
268        if lines_checked == u8::MAX {
269            if let Some(ef) = expected_fields {
270                expected_fields = Some(ef.saturating_sub(1))
271            }
272        };
273        let pos = memchr::memchr(eol_char, input)? + 1;
274        if input.len() - pos == 0 {
275            return None;
276        }
277        debug_assert!(pos <= input.len());
278        let new_input = unsafe { input.get_unchecked(pos..) };
279        let mut lines = SplitLines::new(new_input, quote_char, eol_char, None);
280        let line = lines.next();
281
282        match (line, expected_fields) {
283            // count the fields, and determine if they are equal to what we expect from the schema
284            (Some(line), Some(expected_fields)) => {
285                if accept_line(line, expected_fields, separator, eol_char, quote_char) {
286                    let mut valid = true;
287                    for line in lines.take(2) {
288                        if !accept_line(line, expected_fields, separator, eol_char, quote_char) {
289                            valid = false;
290                            break;
291                        }
292                    }
293                    if valid {
294                        return Some(total_pos + pos);
295                    } else {
296                        rejected_line_groups += 1;
297                    }
298                } else {
299                    debug_assert!(pos < input.len());
300                    unsafe {
301                        input = input.get_unchecked(pos + 1..);
302                    }
303                    total_pos += pos + 1;
304                }
305            },
306            // don't count the fields
307            (Some(_), None) => return Some(total_pos + pos),
308            // // no new line found, check latest line (without eol) for number of fields
309            _ => return None,
310        }
311    }
312}
313
314#[inline(always)]
315pub(super) fn is_whitespace(b: u8) -> bool {
316    b == b' ' || b == b'\t'
317}
318
319/// May have false-positives, but not false negatives.
320#[inline(always)]
321pub(super) fn could_be_whitespace_fast(b: u8) -> bool {
322    // We're interested in \t (ASCII 9) and " " (ASCII 32), both of which are
323    // <= 32. In that range there aren't a lot of other common symbols (besides
324    // newline), so this is a quick test which can be worth doing to avoid the
325    // exact test.
326    b <= 32
327}
328
/// Drops the longest prefix of `input` whose bytes all satisfy `f` and returns
/// the rest. The result is empty when every byte satisfies `f` or `input` is
/// empty.
#[inline]
fn skip_condition<F>(input: &[u8], f: F) -> &[u8]
where
    F: Fn(u8) -> bool,
{
    match input.iter().position(|&byte| !f(byte)) {
        Some(first_kept) => &input[first_kept..],
        None => &input[input.len()..],
    }
}
341
342/// Remove whitespace from the start of buffer.
343/// Makes sure that the bytes stream starts with
344///     'field_1,field_2'
345/// and not with
346///     '\nfield_1,field_1'
347#[inline]
348pub(super) fn skip_whitespace(input: &[u8]) -> &[u8] {
349    skip_condition(input, is_whitespace)
350}
351
352/// An adapted version of std::iter::Split.
353/// This exists solely because we cannot split the file in lines naively as
354///
355/// ```text
356///    for line in bytes.split(b'\n') {
357/// ```
358///
359/// This will fail when strings fields are have embedded end line characters.
360/// For instance: "This is a valid field\nI have multiples lines" is a valid string field, that contains multiple lines.
361pub struct SplitLines<'a> {
362    v: &'a [u8],
363    quote_char: u8,
364    eol_char: u8,
365    #[cfg(feature = "simd")]
366    simd_eol_char: SimdVec,
367    #[cfg(feature = "simd")]
368    simd_quote_char: SimdVec,
369    #[cfg(feature = "simd")]
370    previous_valid_eols: u64,
371    total_index: usize,
372    quoting: bool,
373    comment_prefix: Option<&'a CommentPrefix>,
374}
375
#[cfg(feature = "simd")]
use std::simd::prelude::*;

#[cfg(feature = "simd")]
use polars_utils::clmul::prefix_xorsum_inclusive;

/// Number of bytes handled per SIMD step.
#[cfg(feature = "simd")]
const SIMD_SIZE: usize = 64;

/// Vector of 64 byte lanes used by the SIMD code paths.
#[cfg(feature = "simd")]
type SimdVec = u8x64;
386
387impl<'a> SplitLines<'a> {
388    pub fn new(
389        slice: &'a [u8],
390        quote_char: Option<u8>,
391        eol_char: u8,
392        comment_prefix: Option<&'a CommentPrefix>,
393    ) -> Self {
394        let quoting = quote_char.is_some();
395        let quote_char = quote_char.unwrap_or(b'\"');
396        #[cfg(feature = "simd")]
397        let simd_eol_char = SimdVec::splat(eol_char);
398        #[cfg(feature = "simd")]
399        let simd_quote_char = SimdVec::splat(quote_char);
400        Self {
401            v: slice,
402            quote_char,
403            eol_char,
404            #[cfg(feature = "simd")]
405            simd_eol_char,
406            #[cfg(feature = "simd")]
407            simd_quote_char,
408            #[cfg(feature = "simd")]
409            previous_valid_eols: 0,
410            total_index: 0,
411            quoting,
412            comment_prefix,
413        }
414    }
415}
416
417impl<'a> SplitLines<'a> {
418    // scalar as in non-simd
419    fn next_scalar(&mut self) -> Option<&'a [u8]> {
420        if self.v.is_empty() {
421            return None;
422        }
423        if is_comment_line(self.v, self.comment_prefix) {
424            return self.next_comment_line();
425        }
426        {
427            let mut pos = 0u32;
428            let mut iter = self.v.iter();
429            let mut in_field = false;
430            loop {
431                match iter.next() {
432                    Some(&c) => {
433                        pos += 1;
434
435                        if self.quoting && c == self.quote_char {
436                            // toggle between string field enclosure
437                            //      if we encounter a starting '"' -> in_field = true;
438                            //      if we encounter a closing '"' -> in_field = false;
439                            in_field = !in_field;
440                        }
441                        // if we are not in a string and we encounter '\n' we can stop at this position.
442                        else if c == self.eol_char && !in_field {
443                            break;
444                        }
445                    },
446                    None => {
447                        let remainder = self.v;
448                        self.v = &[];
449                        return Some(remainder);
450                    },
451                }
452            }
453
454            unsafe {
455                debug_assert!((pos as usize) <= self.v.len());
456
457                // return line up to this position
458                let ret = Some(
459                    self.v
460                        .get_unchecked(..(self.total_index + pos as usize - 1)),
461                );
462                // skip the '\n' token and update slice.
463                self.v = self.v.get_unchecked(self.total_index + pos as usize..);
464                ret
465            }
466        }
467    }
468    fn next_comment_line(&mut self) -> Option<&'a [u8]> {
469        if let Some(pos) = next_line_position_naive(self.v, self.eol_char) {
470            unsafe {
471                // return line up to this position
472                let ret = Some(self.v.get_unchecked(..(pos - 1)));
473                // skip the '\n' token and update slice.
474                self.v = self.v.get_unchecked(pos..);
475                ret
476            }
477        } else {
478            let remainder = self.v;
479            self.v = &[];
480            Some(remainder)
481        }
482    }
483}
484
485impl<'a> Iterator for SplitLines<'a> {
486    type Item = &'a [u8];
487
488    #[inline]
489    #[cfg(not(feature = "simd"))]
490    fn next(&mut self) -> Option<&'a [u8]> {
491        self.next_scalar()
492    }
493
494    #[inline]
495    #[cfg(feature = "simd")]
496    fn next(&mut self) -> Option<&'a [u8]> {
497        // First check cached value
498        if self.previous_valid_eols != 0 {
499            let pos = self.previous_valid_eols.trailing_zeros() as usize;
500            self.previous_valid_eols >>= (pos + 1) as u64;
501
502            unsafe {
503                debug_assert!((pos) <= self.v.len());
504
505                // return line up to this position
506                let ret = Some(self.v.get_unchecked(..pos));
507                // skip the '\n' token and update slice.
508                self.v = self.v.get_unchecked(pos + 1..);
509                return ret;
510            }
511        }
512        if self.v.is_empty() {
513            return None;
514        }
515        if self.comment_prefix.is_some() {
516            return self.next_scalar();
517        }
518
519        self.total_index = 0;
520        let mut not_in_field_previous_iter = true;
521
522        loop {
523            let bytes = unsafe { self.v.get_unchecked(self.total_index..) };
524            if bytes.len() > SIMD_SIZE {
525                let lane: [u8; SIMD_SIZE] = unsafe {
526                    bytes
527                        .get_unchecked(0..SIMD_SIZE)
528                        .try_into()
529                        .unwrap_unchecked()
530                };
531                let simd_bytes = SimdVec::from(lane);
532                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
533
534                let valid_eols = if self.quoting {
535                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
536                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);
537
538                    if not_in_field_previous_iter {
539                        not_in_quote_field = !not_in_quote_field;
540                    }
541                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
542                    eol_mask & not_in_quote_field
543                } else {
544                    eol_mask
545                };
546
547                if valid_eols != 0 {
548                    let pos = valid_eols.trailing_zeros() as usize;
549                    if pos == SIMD_SIZE - 1 {
550                        self.previous_valid_eols = 0;
551                    } else {
552                        self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
553                    }
554
555                    unsafe {
556                        let pos = self.total_index + pos;
557                        debug_assert!((pos) <= self.v.len());
558
559                        // return line up to this position
560                        let ret = Some(self.v.get_unchecked(..pos));
561                        // skip the '\n' token and update slice.
562                        self.v = self.v.get_unchecked(pos + 1..);
563                        return ret;
564                    }
565                } else {
566                    self.total_index += SIMD_SIZE;
567                }
568            } else {
569                // Denotes if we are in a string field, started with a quote
570                let mut in_field = !not_in_field_previous_iter;
571                let mut pos = 0u32;
572                let mut iter = bytes.iter();
573                loop {
574                    match iter.next() {
575                        Some(&c) => {
576                            pos += 1;
577
578                            if self.quoting && c == self.quote_char {
579                                // toggle between string field enclosure
580                                //      if we encounter a starting '"' -> in_field = true;
581                                //      if we encounter a closing '"' -> in_field = false;
582                                in_field = !in_field;
583                            }
584                            // if we are not in a string and we encounter '\n' we can stop at this position.
585                            else if c == self.eol_char && !in_field {
586                                break;
587                            }
588                        },
589                        None => {
590                            let remainder = self.v;
591                            self.v = &[];
592                            return Some(remainder);
593                        },
594                    }
595                }
596
597                unsafe {
598                    debug_assert!((pos as usize) <= self.v.len());
599
600                    // return line up to this position
601                    let ret = Some(
602                        self.v
603                            .get_unchecked(..(self.total_index + pos as usize - 1)),
604                    );
605                    // skip the '\n' token and update slice.
606                    self.v = self.v.get_unchecked(self.total_index + pos as usize..);
607                    return ret;
608                }
609            }
610        }
611    }
612}
613
614pub struct CountLines {
615    quote_char: u8,
616    eol_char: u8,
617    #[cfg(feature = "simd")]
618    simd_eol_char: SimdVec,
619    #[cfg(feature = "simd")]
620    simd_quote_char: SimdVec,
621    quoting: bool,
622    comment_prefix: Option<CommentPrefix>,
623}
624
/// Result of scanning one chunk of CSV bytes for line ends.
#[derive(Copy, Clone, Debug, Default)]
pub struct LineStats {
    /// Number of end-of-line bytes counted (those outside of quoted strings).
    pub newline_count: usize,
    /// Offset within the chunk of the last counted end-of-line byte; stays `0`
    /// when none was counted.
    pub last_newline_offset: usize,
    /// Whether the chunk ends inside a quoted string.
    pub end_inside_string: bool,
}
631
632impl CountLines {
633    pub fn new(
634        quote_char: Option<u8>,
635        eol_char: u8,
636        comment_prefix: Option<CommentPrefix>,
637    ) -> Self {
638        let quoting = quote_char.is_some();
639        let quote_char = quote_char.unwrap_or(b'\"');
640        #[cfg(feature = "simd")]
641        let simd_eol_char = SimdVec::splat(eol_char);
642        #[cfg(feature = "simd")]
643        let simd_quote_char = SimdVec::splat(quote_char);
644        Self {
645            quote_char,
646            eol_char,
647            #[cfg(feature = "simd")]
648            simd_eol_char,
649            #[cfg(feature = "simd")]
650            simd_quote_char,
651            quoting,
652            comment_prefix,
653        }
654    }
655
656    /// Analyzes a chunk of CSV data.
657    ///
658    /// Returns (newline_count, last_newline_offset, end_inside_string) twice,
659    /// the first is assuming the start of the chunk is *not* inside a string,
660    /// the second assuming the start is inside a string.
661    ///
662    /// If comment_prefix is not None the start of bytes must be at the start of
663    /// a line (and thus not in the middle of a comment).
664    pub fn analyze_chunk(&self, bytes: &[u8]) -> [LineStats; 2] {
665        let mut states = [
666            LineStats {
667                newline_count: 0,
668                last_newline_offset: 0,
669                end_inside_string: false,
670            },
671            LineStats {
672                newline_count: 0,
673                last_newline_offset: 0,
674                end_inside_string: false,
675            },
676        ];
677
678        // If we have to deal with comments we can't use SIMD and have to explicitly do two passes.
679        if self.comment_prefix.is_some() {
680            states[0] = self.analyze_chunk_with_comment(bytes, false);
681            states[1] = self.analyze_chunk_with_comment(bytes, true);
682            return states;
683        }
684
685        // False if even number of quotes seen so far, true otherwise.
686        #[allow(unused_assignments)]
687        let mut global_quote_parity = false;
688        let mut scan_offset = 0;
689
690        #[cfg(feature = "simd")]
691        {
692            // 0 if even number of quotes seen so far, u64::MAX otherwise.
693            let mut global_quote_parity_mask = 0;
694            while scan_offset + 64 <= bytes.len() {
695                let block: [u8; 64] = unsafe {
696                    bytes
697                        .get_unchecked(scan_offset..scan_offset + 64)
698                        .try_into()
699                        .unwrap_unchecked()
700                };
701                let simd_bytes = SimdVec::from(block);
702                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
703                if self.quoting {
704                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
705                    let quote_parity =
706                        prefix_xorsum_inclusive(quote_mask) ^ global_quote_parity_mask;
707                    global_quote_parity_mask = ((quote_parity as i64) >> 63) as u64;
708
709                    let start_outside_string_eol_mask = eol_mask & !quote_parity;
710                    states[0].newline_count += start_outside_string_eol_mask.count_ones() as usize;
711                    states[0].last_newline_offset = select_unpredictable(
712                        start_outside_string_eol_mask != 0,
713                        (scan_offset + 63)
714                            .wrapping_sub(start_outside_string_eol_mask.leading_zeros() as usize),
715                        states[0].last_newline_offset,
716                    );
717
718                    let start_inside_string_eol_mask = eol_mask & quote_parity;
719                    states[1].newline_count += start_inside_string_eol_mask.count_ones() as usize;
720                    states[1].last_newline_offset = select_unpredictable(
721                        start_inside_string_eol_mask != 0,
722                        (scan_offset + 63)
723                            .wrapping_sub(start_inside_string_eol_mask.leading_zeros() as usize),
724                        states[1].last_newline_offset,
725                    );
726                } else {
727                    states[0].newline_count += eol_mask.count_ones() as usize;
728                    states[0].last_newline_offset = select_unpredictable(
729                        eol_mask != 0,
730                        (scan_offset + 63).wrapping_sub(eol_mask.leading_zeros() as usize),
731                        states[0].last_newline_offset,
732                    );
733                }
734
735                scan_offset += 64;
736            }
737
738            global_quote_parity = global_quote_parity_mask > 0;
739        }
740
741        while scan_offset < bytes.len() {
742            let c = unsafe { *bytes.get_unchecked(scan_offset) };
743            global_quote_parity ^= (c == self.quote_char) & self.quoting;
744
745            let state = &mut states[global_quote_parity as usize];
746            state.newline_count += (c == self.eol_char) as usize;
747            state.last_newline_offset =
748                select_unpredictable(c == self.eol_char, scan_offset, state.last_newline_offset);
749
750            scan_offset += 1;
751        }
752
753        states[0].end_inside_string = global_quote_parity;
754        states[1].end_inside_string = !global_quote_parity;
755        states
756    }
757
758    // bytes must begin at the start of a line.
759    fn analyze_chunk_with_comment(&self, bytes: &[u8], mut in_string: bool) -> LineStats {
760        let pre_s = match self.comment_prefix.as_ref().unwrap() {
761            CommentPrefix::Single(pc) => core::slice::from_ref(pc),
762            CommentPrefix::Multi(ps) => ps.as_bytes(),
763        };
764
765        let mut state = LineStats::default();
766        let mut scan_offset = 0;
767        while scan_offset < bytes.len() {
768            // Skip comment line if needed.
769            while bytes[scan_offset..].starts_with(pre_s) {
770                scan_offset += pre_s.len();
771                let Some(nl_off) = bytes[scan_offset..]
772                    .iter()
773                    .position(|c| *c == self.eol_char)
774                else {
775                    break;
776                };
777                scan_offset += nl_off + 1;
778            }
779
780            while scan_offset < bytes.len() {
781                let c = unsafe { *bytes.get_unchecked(scan_offset) };
782                in_string ^= (c == self.quote_char) & self.quoting;
783
784                if c == self.eol_char && !in_string {
785                    state.newline_count += 1;
786                    state.last_newline_offset = scan_offset;
787                    scan_offset += 1;
788                    break;
789                } else {
790                    scan_offset += 1;
791                }
792            }
793        }
794
795        state.end_inside_string = in_string;
796        state
797    }
798
799    pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) {
800        loop {
801            let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };
802
803            let (count, offset) = if self.comment_prefix.is_some() {
804                let stats = self.analyze_chunk_with_comment(b, false);
805                (stats.newline_count, stats.last_newline_offset)
806            } else {
807                self.count(b)
808            };
809
810            if count > 0 || b.len() == bytes.len() {
811                return (count, offset);
812            }
813
814            *chunk_size = chunk_size.saturating_mul(2);
815        }
816    }
817
818    pub fn count_rows(&self, bytes: &[u8], is_eof: bool) -> (usize, usize) {
819        let stats = if self.comment_prefix.is_some() {
820            self.analyze_chunk_with_comment(bytes, false)
821        } else {
822            self.analyze_chunk(bytes)[0]
823        };
824
825        let mut count = stats.newline_count;
826        let mut offset = stats.last_newline_offset;
827
828        if count > 0 {
829            offset = cmp::min(offset + 1, bytes.len());
830        } else {
831            debug_assert!(offset == 0);
832        }
833
834        if is_eof {
835            count += ends_in_unterminated_row(bytes, self.eol_char, self.comment_prefix.as_ref())
836                as usize;
837            offset = bytes.len();
838        }
839
840        (count, offset)
841    }
842
843    /// Returns count and offset to split for remainder in slice.
844    #[cfg(feature = "simd")]
845    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
846        let mut total_idx = 0;
847        let original_bytes = bytes;
848        let mut count = 0;
849        let mut position = 0;
850        let mut not_in_field_previous_iter = true;
851
852        loop {
853            let bytes = unsafe { original_bytes.get_unchecked(total_idx..) };
854
855            if bytes.len() > SIMD_SIZE {
856                let lane: [u8; SIMD_SIZE] = unsafe {
857                    bytes
858                        .get_unchecked(0..SIMD_SIZE)
859                        .try_into()
860                        .unwrap_unchecked()
861                };
862                let simd_bytes = SimdVec::from(lane);
863                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
864
865                let valid_eols = if self.quoting {
866                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
867                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);
868
869                    if not_in_field_previous_iter {
870                        not_in_quote_field = !not_in_quote_field;
871                    }
872                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
873                    eol_mask & not_in_quote_field
874                } else {
875                    eol_mask
876                };
877
878                if valid_eols != 0 {
879                    count += valid_eols.count_ones() as usize;
880                    position = total_idx + 63 - valid_eols.leading_zeros() as usize;
881                    debug_assert_eq!(original_bytes[position], self.eol_char)
882                }
883                total_idx += SIMD_SIZE;
884            } else if bytes.is_empty() {
885                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
886                return (count, position);
887            } else {
888                let (c, o) = self.count_no_simd(bytes, !not_in_field_previous_iter);
889
890                let (count, position) = if c > 0 {
891                    (count + c, total_idx + o)
892                } else {
893                    (count, position)
894                };
895                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
896
897                return (count, position);
898            }
899        }
900    }
901
902    #[cfg(not(feature = "simd"))]
903    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
904        self.count_no_simd(bytes, false)
905    }
906
907    fn count_no_simd(&self, bytes: &[u8], in_field: bool) -> (usize, usize) {
908        let iter = bytes.iter();
909        let mut in_field = in_field;
910        let mut count = 0;
911        let mut position = 0;
912
913        for b in iter {
914            let c = *b;
915            if self.quoting && c == self.quote_char {
916                // toggle between string field enclosure
917                //      if we encounter a starting '"' -> in_field = true;
918                //      if we encounter a closing '"' -> in_field = false;
919                in_field = !in_field;
920            }
921            // If we are not in a string and we encounter '\n' we can stop at this position.
922            else if c == self.eol_char && !in_field {
923                position = (b as *const _ as usize) - (bytes.as_ptr() as usize);
924                count += 1;
925            }
926        }
927        debug_assert!(count == 0 || bytes[position] == self.eol_char);
928
929        (count, position)
930    }
931}
932
933fn ends_in_unterminated_row(
934    bytes: &[u8],
935    eol_char: u8,
936    comment_prefix: Option<&CommentPrefix>,
937) -> bool {
938    if !bytes.is_empty() && bytes.last().copied().unwrap() != eol_char {
939        // We can do a simple backwards-scan to find the start of last line if it is a
940        // comment line, since comment lines can't escape new-lines.
941        let last_new_line_post = memchr::memrchr(eol_char, bytes).unwrap_or(0);
942        let last_line_is_comment_line = bytes
943            .get(last_new_line_post + 1..)
944            .map(|line| is_comment_line(line, comment_prefix))
945            .unwrap_or(false);
946
947        return !last_line_is_comment_line;
948    }
949
950    false
951}
952
/// Finds the first `needle` in `bytes` that is not enclosed by `quote_char`.
///
/// Every `quote_char` toggles the "inside a quoted field" state, and the toggle is
/// applied before the byte is compared with `needle`. Returns the index of the match,
/// or `None` if `needle` never occurs outside of a quoted field.
#[inline]
fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> {
    let mut in_field = false;

    // `position` counts in `usize`, so the index cannot overflow on very large inputs.
    bytes.iter().position(|&c| {
        // toggle between string field enclosure
        //      if we encounter a starting '"' -> in_field = true;
        //      if we encounter a closing '"' -> in_field = false;
        in_field ^= c == quote_char;

        !in_field && c == needle
    })
}
975
976#[inline]
977pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
978    let pos = match quote {
979        Some(quote) => find_quoted(bytes, quote, eol_char),
980        None => bytes.iter().position(|x| *x == eol_char),
981    };
982    match pos {
983        None => &[],
984        Some(pos) => &bytes[pos + 1..],
985    }
986}
987
988#[inline]
989pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
990    if let Some(pos) = next_line_position_naive(input, eol_char) {
991        unsafe { input.get_unchecked(pos..) }
992    } else {
993        &[]
994    }
995}
996
997/// Parse CSV.
998///
999/// # Arguments
1000/// * `bytes` - input to parse
1001/// * `offset` - offset in bytes in total input. This is 0 if single threaded. If multi-threaded every
1002///   thread has a different offset.
1003/// * `projection` - Indices of the columns to project.
1004/// * `buffers` - Parsed output will be written to these buffers. Except for UTF8 data. The offsets of the
1005///   fields are written to the buffers. The UTF8 data will be parsed later.
1006///
1007/// Returns the number of bytes parsed successfully.
1008#[allow(clippy::too_many_arguments)]
1009pub(super) fn parse_lines(
1010    mut bytes: &[u8],
1011    parse_options: &CsvParseOptions,
1012    offset: usize,
1013    ignore_errors: bool,
1014    null_values: Option<&NullValuesCompiled>,
1015    projection: &[usize],
1016    buffers: &mut [Builder],
1017    n_lines: usize,
1018    // length of original schema
1019    schema_len: usize,
1020    schema: &Schema,
1021) -> PolarsResult<usize> {
1022    assert!(
1023        !projection.is_empty(),
1024        "at least one column should be projected"
1025    );
1026    let mut truncate_ragged_lines = parse_options.truncate_ragged_lines;
1027    // During projection pushdown we are not checking other csv fields.
1028    // This would be very expensive and we don't care as we only want
1029    // the projected columns.
1030    if projection.len() != schema_len {
1031        truncate_ragged_lines = true
1032    }
1033
1034    // we use the pointers to track the no of bytes read.
1035    let start = bytes.as_ptr() as usize;
1036    let original_bytes_len = bytes.len();
1037    let n_lines = n_lines as u32;
1038
1039    let mut line_count = 0u32;
1040    loop {
1041        if line_count > n_lines {
1042            let end = bytes.as_ptr() as usize;
1043            return Ok(end - start);
1044        }
1045
1046        if bytes.is_empty() {
1047            return Ok(original_bytes_len);
1048        } else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
1049            // deal with comments
1050            let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char);
1051            bytes = bytes_rem;
1052            continue;
1053        }
1054
1055        // Every line we only need to parse the columns that are projected.
1056        // Therefore we check if the idx of the field is in our projected columns.
1057        // If it is not, we skip the field.
1058        let mut projection_iter = projection.iter().copied();
1059        let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
1060        let mut processed_fields = 0;
1061
1062        let mut iter = SplitFields::new(
1063            bytes,
1064            parse_options.separator,
1065            parse_options.quote_char,
1066            parse_options.eol_char,
1067        );
1068        let mut idx = 0u32;
1069        let mut read_sol = 0;
1070        loop {
1071            match iter.next() {
1072                // end of line
1073                None => {
1074                    bytes = unsafe { bytes.get_unchecked(std::cmp::min(read_sol, bytes.len())..) };
1075                    break;
1076                },
1077                Some((mut field, needs_escaping)) => {
1078                    let field_len = field.len();
1079
1080                    // +1 is the split character that is consumed by the iterator.
1081                    read_sol += field_len + 1;
1082
1083                    if idx == next_projected as u32 {
1084                        // the iterator is finished when it encounters a `\n`
1085                        // this could be preceded by a '\r'
1086                        unsafe {
1087                            if field_len > 0 && *field.get_unchecked(field_len - 1) == b'\r' {
1088                                field = field.get_unchecked(..field_len - 1);
1089                            }
1090                        }
1091
1092                        debug_assert!(processed_fields < buffers.len());
1093                        let buf = unsafe {
1094                            // SAFETY: processed fields index can never exceed the projection indices.
1095                            buffers.get_unchecked_mut(processed_fields)
1096                        };
1097                        let mut add_null = false;
1098
1099                        // if we have null values argument, check if this field equal null value
1100                        if let Some(null_values) = null_values {
1101                            let field = if needs_escaping && !field.is_empty() {
1102                                unsafe { field.get_unchecked(1..field.len() - 1) }
1103                            } else {
1104                                field
1105                            };
1106
1107                            // SAFETY:
1108                            // process fields is in bounds
1109                            add_null = unsafe { null_values.is_null(field, idx as usize) }
1110                        }
1111                        if add_null {
1112                            buf.add_null(!parse_options.missing_is_null && field.is_empty())
1113                        } else {
1114                            buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null)
1115                                .map_err(|e| {
1116                                    let bytes_offset = offset + field.as_ptr() as usize - start;
1117                                    let unparsable = String::from_utf8_lossy(field);
1118                                    let column_name = schema.get_at_index(idx as usize).unwrap().0;
1119                                    polars_err!(
1120                                        ComputeError:
1121                                        "could not parse `{}` as dtype `{}` at column '{}' (column number {})\n\n\
1122                                        The current offset in the file is {} bytes.\n\
1123                                        \n\
1124                                        You might want to try:\n\
1125                                        - increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),\n\
1126                                        - specifying correct dtype with the `schema_overrides` argument\n\
1127                                        - setting `ignore_errors` to `True`,\n\
1128                                        - adding `{}` to the `null_values` list.\n\n\
1129                                        Original error: ```{}```",
1130                                        &unparsable,
1131                                        buf.dtype(),
1132                                        column_name,
1133                                        idx + 1,
1134                                        bytes_offset,
1135                                        &unparsable,
1136                                        e
1137                                    )
1138                                })?;
1139                        }
1140                        processed_fields += 1;
1141
1142                        // if we have all projected columns we are done with this line
1143                        match projection_iter.next() {
1144                            Some(p) => next_projected = p,
1145                            None => {
1146                                if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) {
1147                                    bytes = unsafe { bytes.get_unchecked(read_sol..) };
1148                                } else {
1149                                    if !truncate_ragged_lines && read_sol < bytes.len() {
1150                                        polars_bail!(ComputeError: r#"found more fields than defined in 'Schema'
1151
1152Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
1153                                    }
1154                                    let bytes_rem = skip_this_line(
1155                                        unsafe { bytes.get_unchecked(read_sol - 1..) },
1156                                        parse_options.quote_char,
1157                                        parse_options.eol_char,
1158                                    );
1159                                    bytes = bytes_rem;
1160                                }
1161                                break;
1162                            },
1163                        }
1164                    }
1165                    idx += 1;
1166                },
1167            }
1168        }
1169
1170        // there can be lines that miss fields (also the comma values)
1171        // this means the splitter won't process them.
1172        // We traverse them to read them as null values.
1173        while processed_fields < projection.len() {
1174            debug_assert!(processed_fields < buffers.len());
1175            let buf = unsafe {
1176                // SAFETY: processed fields index can never exceed the projection indices.
1177                buffers.get_unchecked_mut(processed_fields)
1178            };
1179            buf.add_null(!parse_options.missing_is_null);
1180            processed_fields += 1;
1181        }
1182        line_count += 1;
1183    }
1184}
1185
#[cfg(test)]
mod test {
    use super::SplitLines;

    /// A line ending inside a quoted field must not split the line, for either quote
    /// character.
    #[test]
    fn test_splitlines() {
        let cases: [(&str, u8, [&str; 2]); 2] = [
            (
                "1,\"foo\n\"\n2,\"foo\n\"\n",
                b'"',
                ["1,\"foo\n\"", "2,\"foo\n\""],
            ),
            ("1,'foo\n'\n2,'foo\n'\n", b'\'', ["1,'foo\n'", "2,'foo\n'"]),
        ];

        for (input, quote, expected_lines) in cases {
            let mut lines = SplitLines::new(input.as_bytes(), Some(quote), b'\n', None);
            for expected in expected_lines {
                assert_eq!(lines.next(), Some(expected.as_bytes()));
            }
            assert_eq!(lines.next(), None);
        }
    }
}