1use std::cmp;
2
3use memchr::memchr2_iter;
4use polars_buffer::Buffer;
5use polars_core::prelude::*;
6use polars_core::{POOL, config};
7use polars_error::feature_gated;
8use polars_utils::mmap::MMapSemaphore;
9use polars_utils::pl_path::PlRefPath;
10use polars_utils::select::select_unpredictable;
11use rayon::prelude::*;
12
13use super::CsvParseOptions;
14use super::builder::Builder;
15use super::options::{CommentPrefix, NullValuesCompiled};
16use super::splitfields::SplitFields;
17use crate::csv::read::read_until_start_and_infer_schema;
18use crate::prelude::CsvReadOptions;
19use crate::utils::compression::CompressedReader;
20
21#[allow(clippy::too_many_arguments)]
24pub fn count_rows(
25 path: PlRefPath,
26 quote_char: Option<u8>,
27 comment_prefix: Option<&CommentPrefix>,
28 eol_char: u8,
29 has_header: bool,
30 skip_lines: usize,
31 skip_rows_before_header: usize,
32 skip_rows_after_header: usize,
33) -> PolarsResult<usize> {
34 let file = if path.has_scheme() || config::force_async() {
35 feature_gated!("cloud", {
36 crate::file_cache::FILE_CACHE
37 .get_entry(path)
38 .unwrap()
40 .try_open_assume_latest()?
41 })
42 } else {
43 polars_utils::open_file(path.as_std_path())?
44 };
45
46 let mmap = MMapSemaphore::new_from_file(&file).unwrap();
47
48 count_rows_from_slice_par(
49 Buffer::from_owner(mmap),
50 quote_char,
51 comment_prefix,
52 eol_char,
53 has_header,
54 skip_lines,
55 skip_rows_before_header,
56 skip_rows_after_header,
57 )
58}
59
/// Counts the data rows contained in `buffer`, analysing chunks of it in parallel.
///
/// The reader is first advanced to the start of the data by
/// `read_until_start_and_infer_schema`, using options built from `has_header`, `skip_lines`,
/// `skip_rows_before_header` and `skip_rows_after_header`. The remaining bytes are analysed
/// chunk by chunk with [`CountLines::analyze_chunk`]; the per-chunk results are then combined
/// in order while tracking whether a chunk boundary falls inside a quoted string.
///
/// A final row that is not terminated by `eol_char` is counted as well, unless
/// `ends_in_unterminated_row` classifies it as a comment line.
///
/// # Errors
/// Returns an error when the reader cannot be created, when locating the data start fails, or
/// when reading a slice fails.
#[allow(clippy::too_many_arguments)]
pub fn count_rows_from_slice_par(
    buffer: Buffer<u8>,
    quote_char: Option<u8>,
    comment_prefix: Option<&CommentPrefix>,
    eol_char: u8,
    has_header: bool,
    skip_lines: usize,
    skip_rows_before_header: usize,
    skip_rows_after_header: usize,
) -> PolarsResult<usize> {
    let mut reader = CompressedReader::try_new(buffer)?;

    let reader_options = CsvReadOptions {
        parse_options: Arc::new(CsvParseOptions {
            quote_char,
            comment_prefix: comment_prefix.cloned(),
            eol_char,
            ..Default::default()
        }),
        has_header,
        skip_lines,
        skip_rows: skip_rows_before_header,
        skip_rows_after_header,
        ..Default::default()
    };

    // Only the second element is used here; it is passed to the first `read_next_slice` call.
    // NOTE(review): presumably these are bytes already read past the data start - confirm.
    let (_, mut leftover) =
        read_until_start_and_infer_schema(&reader_options, None, None, &mut reader)?;

    // Debug builds use a tiny chunk size, which makes chunk boundaries occur very often.
    const BYTES_PER_CHUNK: usize = if cfg!(debug_assertions) {
        128
    } else {
        512 * 1024
    };

    let count = CountLines::new(quote_char, eol_char, comment_prefix.cloned());
    POOL.install(|| {
        // One entry per chunk: the two-way analysis result and the chunk's sequence id.
        let mut states = Vec::new();
        let eof_unterminated_row;

        if comment_prefix.is_none() {
            // Without comments, slices are pulled from the reader one after another and
            // analysed in parallel.
            let mut last_slice = Buffer::new();
            let mut err = None;

            let streaming_iter = std::iter::from_fn(|| {
                let (slice, read_n) = match reader.read_next_slice(&leftover, BYTES_PER_CHUNK) {
                    Ok(tup) => tup,
                    Err(e) => {
                        // Remember the error and end the iteration; it is reported below.
                        err = Some(e);
                        return None;
                    },
                };

                // The leftover is only handed to the first call.
                leftover = Buffer::new();
                if slice.is_empty() && read_n == 0 {
                    return None;
                }

                // Kept so the end of the input can be inspected after the iteration.
                last_slice = slice.clone();
                Some(slice)
            });

            states = streaming_iter
                .enumerate()
                .par_bridge()
                .map(|(id, slice)| (count.analyze_chunk(&slice), id))
                .collect::<Vec<_>>();

            if let Some(e) = err {
                return Err(e.into());
            }

            // `par_bridge` does not preserve the order of the items, so restore it by id.
            states.sort_by_key(|(_, id)| *id);

            // Only the last slice is inspected for an unterminated final row.
            eof_unterminated_row = ends_in_unterminated_row(&last_slice, eol_char, comment_prefix);
        } else {
            // With a comment prefix everything is read in a single call and then split into
            // chunks that start right after an end-of-line byte.
            let (bytes, _) = reader.read_next_slice(&leftover, usize::MAX)?;

            let num_chunks = bytes.len().div_ceil(BYTES_PER_CHUNK);
            (0..num_chunks)
                .into_par_iter()
                .map(|chunk_idx| {
                    let mut start_offset = chunk_idx * BYTES_PER_CHUNK;
                    let next_start_offset = (start_offset + BYTES_PER_CHUNK).min(bytes.len());

                    if start_offset != 0 {
                        // Move the start to just after the first eol byte inside this chunk.
                        if let Some(nl_off) = bytes[start_offset..next_start_offset]
                            .iter()
                            .position(|b| *b == eol_char)
                        {
                            start_offset += nl_off + 1;
                        } else {
                            // No eol byte in this chunk: it contributes empty statistics. The
                            // previous chunk extends its range up to the next eol byte.
                            return (count.analyze_chunk(&[]), 0);
                        }
                    }

                    // Extend the chunk up to and including the first eol byte at or after the
                    // nominal chunk end, or to the end of the input if there is none.
                    let stop_offset = if let Some(nl_off) = bytes[next_start_offset..]
                        .iter()
                        .position(|b| *b == eol_char)
                    {
                        next_start_offset + nl_off + 1
                    } else {
                        bytes.len()
                    };

                    // The second tuple element is not read in this branch.
                    (count.analyze_chunk(&bytes[start_offset..stop_offset]), 0)
                })
                .collect_into_vec(&mut states);

            eof_unterminated_row = ends_in_unterminated_row(&bytes, eol_char, comment_prefix);
        }

        // Combine the chunks in order: pick the statistics matching the quoting state at the
        // start of each chunk and carry its end state over to the next chunk.
        let mut n = 0;
        let mut in_string = false;
        for (pair, _) in states {
            n += pair[in_string as usize].newline_count;
            in_string = pair[in_string as usize].end_inside_string;
        }
        n += eof_unterminated_row as usize;

        Ok(n)
    })
}
193
194#[inline]
198pub fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
199 match comment_prefix {
200 Some(CommentPrefix::Single(c)) => line.first() == Some(c),
201 Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
202 None => false,
203 }
204}
205
206pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
209 let pos = memchr::memchr(eol_char, input)? + 1;
210 if input.len() - pos == 0 {
211 return None;
212 }
213 Some(pos)
214}
215
/// Searches `input` for an offset at which a new, well-formed line starts.
///
/// When `expected_fields` is given, a candidate offset (the byte after an `eol_char`) is only
/// accepted if the line starting there and up to two following lines pass `accept_line`.
/// When `expected_fields` is `None`, the first candidate that is followed by a line is
/// returned without inspecting its fields.
///
/// Returns `None` when `input` is empty, when no `eol_char` is left, when the `eol_char` found
/// is the last byte, or after three candidate groups have been rejected.
pub(super) fn next_line_position(
    mut input: &[u8],
    mut expected_fields: Option<usize>,
    separator: u8,
    quote_char: Option<u8>,
    eol_char: u8,
) -> Option<usize> {
    /// Decides whether `line` plausibly has `expected_fields` fields.
    fn accept_line(
        line: &[u8],
        expected_fields: usize,
        separator: u8,
        eol_char: u8,
        quote_char: Option<u8>,
    ) -> bool {
        let mut count = 0usize;
        for (field, _) in SplitFields::new(line, separator, quote_char, eol_char) {
            // A single field containing that many separator/eol bytes rejects the line.
            if memchr2_iter(separator, eol_char, field).count() >= expected_fields {
                return false;
            }
            count += 1;
        }

        // Accept `count == expected_fields` and `count == expected_fields - 1`. When `count`
        // exceeds `expected_fields` the wrapping subtraction yields a huge value and the line
        // is rejected.
        expected_fields.wrapping_sub(count) <= 1
    }

    // Number of candidates whose first line was accepted but whose follow-up lines were not.
    let mut rejected_line_groups = 0u8;

    // Number of bytes of the original input that have been skipped so far.
    let mut total_pos = 0;
    if input.is_empty() {
        return None;
    }
    let mut lines_checked = 0u8;
    loop {
        if rejected_line_groups >= 3 {
            return None;
        }
        lines_checked = lines_checked.wrapping_add(1);
        // Each time the wrapping counter reaches `u8::MAX`, lower the expectation by one field.
        if lines_checked == u8::MAX {
            if let Some(ef) = expected_fields {
                expected_fields = Some(ef.saturating_sub(1))
            }
        };
        // Candidate line start: the byte right after the next eol byte.
        let pos = memchr::memchr(eol_char, input)? + 1;
        if input.len() - pos == 0 {
            return None;
        }
        debug_assert!(pos <= input.len());
        // SAFETY: `pos` is at most `input.len()` (the index of a found byte plus one).
        let new_input = unsafe { input.get_unchecked(pos..) };
        let mut lines = SplitLines::new(new_input, quote_char, eol_char, None);
        let line = lines.next();

        match (line, expected_fields) {
            (Some(line), Some(expected_fields)) => {
                if accept_line(line, expected_fields, separator, eol_char, quote_char) {
                    // The first line looks fine; also check up to two following lines.
                    let mut valid = true;
                    for line in lines.take(2) {
                        if !accept_line(line, expected_fields, separator, eol_char, quote_char) {
                            valid = false;
                            break;
                        }
                    }
                    if valid {
                        return Some(total_pos + pos);
                    } else {
                        // NOTE(review): `input` is not advanced here, so the next iteration
                        // examines the same candidate again.
                        rejected_line_groups += 1;
                    }
                } else {
                    // Continue the search one byte past the rejected candidate start.
                    debug_assert!(pos < input.len());
                    // SAFETY: `pos < input.len()` was established by the early return above.
                    unsafe {
                        input = input.get_unchecked(pos + 1..);
                    }
                    total_pos += pos + 1;
                }
            },
            // No field count to validate against: take the first candidate.
            (Some(_), None) => return Some(total_pos + pos),
            _ => return None,
        }
    }
}
313
314#[inline(always)]
315pub(super) fn is_whitespace(b: u8) -> bool {
316 b == b' ' || b == b'\t'
317}
318
319#[inline(always)]
321pub(super) fn could_be_whitespace_fast(b: u8) -> bool {
322 b <= 32
327}
328
/// Drops the leading bytes of `input` for which `f` returns `true` and returns the rest.
///
/// An empty input is returned unchanged; if every byte satisfies `f` the result is empty.
#[inline]
fn skip_condition<F>(input: &[u8], f: F) -> &[u8]
where
    F: Fn(u8) -> bool,
{
    let mut rest = input;
    while let [first, tail @ ..] = rest {
        if !f(*first) {
            break;
        }
        rest = tail;
    }
    rest
}
341
342#[inline]
348pub(super) fn skip_whitespace(input: &[u8]) -> &[u8] {
349 skip_condition(input, is_whitespace)
350}
351
/// Iterator over the lines of a byte slice.
///
/// Lines are separated by `eol_char`; when quoting is enabled, `eol_char` bytes between a pair
/// of `quote_char` bytes do not end a line. The yielded slices do not include the terminating
/// `eol_char`.
pub struct SplitLines<'a> {
    /// The part of the input that has not been yielded yet.
    v: &'a [u8],
    /// Quote byte; `"` when no quote character was given (in which case `quoting` is `false`).
    quote_char: u8,
    /// Byte that terminates a line.
    eol_char: u8,
    /// `eol_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    /// `quote_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    /// Bitmask of line ends that were already found in the last SIMD block but not yet
    /// yielded, relative to the current start of `v`.
    #[cfg(feature = "simd")]
    previous_valid_eols: u64,
    /// Offset into `v` up to which the SIMD scan has advanced without finding a line end.
    total_index: usize,
    /// Whether `quote_char` is honoured at all.
    quoting: bool,
    /// Prefix that marks comment lines, if any.
    comment_prefix: Option<&'a CommentPrefix>,
}
375
/// Number of bytes processed per SIMD block; equals the lane count of [`SimdVec`].
#[cfg(feature = "simd")]
const SIMD_SIZE: usize = 64;
#[cfg(feature = "simd")]
use std::simd::prelude::*;

#[cfg(feature = "simd")]
use polars_utils::clmul::prefix_xorsum_inclusive;

/// Vector type used for the SIMD scans: 64 lanes of `u8`.
#[cfg(feature = "simd")]
type SimdVec = u8x64;
386
387impl<'a> SplitLines<'a> {
388 pub fn new(
389 slice: &'a [u8],
390 quote_char: Option<u8>,
391 eol_char: u8,
392 comment_prefix: Option<&'a CommentPrefix>,
393 ) -> Self {
394 let quoting = quote_char.is_some();
395 let quote_char = quote_char.unwrap_or(b'\"');
396 #[cfg(feature = "simd")]
397 let simd_eol_char = SimdVec::splat(eol_char);
398 #[cfg(feature = "simd")]
399 let simd_quote_char = SimdVec::splat(quote_char);
400 Self {
401 v: slice,
402 quote_char,
403 eol_char,
404 #[cfg(feature = "simd")]
405 simd_eol_char,
406 #[cfg(feature = "simd")]
407 simd_quote_char,
408 #[cfg(feature = "simd")]
409 previous_valid_eols: 0,
410 total_index: 0,
411 quoting,
412 comment_prefix,
413 }
414 }
415}
416
417impl<'a> SplitLines<'a> {
418 fn next_scalar(&mut self) -> Option<&'a [u8]> {
420 if self.v.is_empty() {
421 return None;
422 }
423 if is_comment_line(self.v, self.comment_prefix) {
424 return self.next_comment_line();
425 }
426 {
427 let mut pos = 0u32;
428 let mut iter = self.v.iter();
429 let mut in_field = false;
430 loop {
431 match iter.next() {
432 Some(&c) => {
433 pos += 1;
434
435 if self.quoting && c == self.quote_char {
436 in_field = !in_field;
440 }
441 else if c == self.eol_char && !in_field {
443 break;
444 }
445 },
446 None => {
447 let remainder = self.v;
448 self.v = &[];
449 return Some(remainder);
450 },
451 }
452 }
453
454 unsafe {
455 debug_assert!((pos as usize) <= self.v.len());
456
457 let ret = Some(
459 self.v
460 .get_unchecked(..(self.total_index + pos as usize - 1)),
461 );
462 self.v = self.v.get_unchecked(self.total_index + pos as usize..);
464 ret
465 }
466 }
467 }
468 fn next_comment_line(&mut self) -> Option<&'a [u8]> {
469 if let Some(pos) = next_line_position_naive(self.v, self.eol_char) {
470 unsafe {
471 let ret = Some(self.v.get_unchecked(..(pos - 1)));
473 self.v = self.v.get_unchecked(pos..);
475 ret
476 }
477 } else {
478 let remainder = self.v;
479 self.v = &[];
480 Some(remainder)
481 }
482 }
483}
484
impl<'a> Iterator for SplitLines<'a> {
    type Item = &'a [u8];

    /// Without the `simd` feature every line is produced by the scalar implementation.
    #[inline]
    #[cfg(not(feature = "simd"))]
    fn next(&mut self) -> Option<&'a [u8]> {
        self.next_scalar()
    }

    /// SIMD implementation: scans the input in blocks of `SIMD_SIZE` bytes and falls back to
    /// a byte-by-byte scan for the tail. Inputs with a comment prefix always use
    /// `next_scalar`.
    #[inline]
    #[cfg(feature = "simd")]
    fn next(&mut self) -> Option<&'a [u8]> {
        // Fast path: a previous block already located further line ends.
        if self.previous_valid_eols != 0 {
            let pos = self.previous_valid_eols.trailing_zeros() as usize;
            // Drop the consumed bit so the mask stays relative to the new start of `v`.
            self.previous_valid_eols >>= (pos + 1) as u64;

            unsafe {
                debug_assert!((pos) <= self.v.len());

                let ret = Some(self.v.get_unchecked(..pos));
                self.v = self.v.get_unchecked(pos + 1..);
                return ret;
            }
        }
        if self.v.is_empty() {
            return None;
        }
        if self.comment_prefix.is_some() {
            return self.next_scalar();
        }

        self.total_index = 0;
        // Quoting state carried from one block to the next; scanning starts outside a field.
        let mut not_in_field_previous_iter = true;

        loop {
            let bytes = unsafe { self.v.get_unchecked(self.total_index..) };
            if bytes.len() > SIMD_SIZE {
                let lane: [u8; SIMD_SIZE] = unsafe {
                    bytes
                        .get_unchecked(0..SIMD_SIZE)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(lane);
                // Bit i is set when byte i of the block equals the eol byte.
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();

                let valid_eols = if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // NOTE(review): presumably bit i of the result is the XOR of quote bits
                    // 0..=i, i.e. the quote parity at byte i - confirm against the helper.
                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);

                    // Invert when the block started outside a field, so that set bits mark
                    // positions outside quoted fields.
                    if not_in_field_previous_iter {
                        not_in_quote_field = !not_in_quote_field;
                    }
                    // The state at the last byte of the block carries over to the next block.
                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
                    eol_mask & not_in_quote_field
                } else {
                    eol_mask
                };

                if valid_eols != 0 {
                    let pos = valid_eols.trailing_zeros() as usize;
                    // Remember the remaining line ends of this block for the fast path. A
                    // shift by 64 is avoided when the line end is the last byte of the block.
                    if pos == SIMD_SIZE - 1 {
                        self.previous_valid_eols = 0;
                    } else {
                        self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
                    }

                    unsafe {
                        let pos = self.total_index + pos;
                        debug_assert!((pos) <= self.v.len());

                        let ret = Some(self.v.get_unchecked(..pos));
                        self.v = self.v.get_unchecked(pos + 1..);
                        return ret;
                    }
                } else {
                    // No line end in this block; continue with the next one.
                    self.total_index += SIMD_SIZE;
                }
            } else {
                // Tail of at most `SIMD_SIZE` bytes: scan byte by byte, continuing with the
                // quoting state left by the blocks before it.
                let mut in_field = !not_in_field_previous_iter;
                let mut pos = 0u32;
                let mut iter = bytes.iter();
                loop {
                    match iter.next() {
                        Some(&c) => {
                            pos += 1;

                            if self.quoting && c == self.quote_char {
                                in_field = !in_field;
                            }
                            else if c == self.eol_char && !in_field {
                                break;
                            }
                        },
                        None => {
                            // No line end left: everything remaining is the last line.
                            let remainder = self.v;
                            self.v = &[];
                            return Some(remainder);
                        },
                    }
                }

                unsafe {
                    debug_assert!((pos as usize) <= self.v.len());

                    // `pos` counts from `total_index` and includes the eol byte, which is
                    // excluded from the returned line.
                    let ret = Some(
                        self.v
                            .get_unchecked(..(self.total_index + pos as usize - 1)),
                    );
                    self.v = self.v.get_unchecked(self.total_index + pos as usize..);
                    return ret;
                }
            }
        }
    }
}
613
/// Counts line ends in CSV bytes, honouring quoted fields and, optionally, comment lines.
pub struct CountLines {
    /// Quote byte; `"` when no quote character was given (in which case `quoting` is `false`).
    quote_char: u8,
    /// Byte that terminates a line.
    eol_char: u8,
    /// `eol_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    /// `quote_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    /// Whether `quote_char` is honoured at all.
    quoting: bool,
    /// Prefix that marks comment lines, if any.
    comment_prefix: Option<CommentPrefix>,
}
624
/// Result of analysing one chunk of bytes under a fixed assumption about the quoting state at
/// the start of the chunk.
#[derive(Copy, Clone, Debug, Default)]
pub struct LineStats {
    /// Number of line ends counted in the chunk.
    pub newline_count: usize,
    /// Offset within the chunk of the last counted line end; `0` when none was counted.
    pub last_newline_offset: usize,
    /// Whether the chunk ends inside a quoted string.
    pub end_inside_string: bool,
}
631
632impl CountLines {
633 pub fn new(
634 quote_char: Option<u8>,
635 eol_char: u8,
636 comment_prefix: Option<CommentPrefix>,
637 ) -> Self {
638 let quoting = quote_char.is_some();
639 let quote_char = quote_char.unwrap_or(b'\"');
640 #[cfg(feature = "simd")]
641 let simd_eol_char = SimdVec::splat(eol_char);
642 #[cfg(feature = "simd")]
643 let simd_quote_char = SimdVec::splat(quote_char);
644 Self {
645 quote_char,
646 eol_char,
647 #[cfg(feature = "simd")]
648 simd_eol_char,
649 #[cfg(feature = "simd")]
650 simd_quote_char,
651 quoting,
652 comment_prefix,
653 }
654 }
655
    /// Analyses a chunk of CSV bytes under both possible starting states.
    ///
    /// Returns two [`LineStats`]: index 0 assumes the chunk starts outside a quoted string,
    /// index 1 assumes it starts inside one. A caller that knows the real starting state picks
    /// the matching entry and uses its `end_inside_string` as the starting state of the next
    /// chunk.
    ///
    /// With a comment prefix configured, each entry is computed by a separate call to
    /// `analyze_chunk_with_comment`.
    pub fn analyze_chunk(&self, bytes: &[u8]) -> [LineStats; 2] {
        let mut states = [
            LineStats {
                newline_count: 0,
                last_newline_offset: 0,
                end_inside_string: false,
            },
            LineStats {
                newline_count: 0,
                last_newline_offset: 0,
                end_inside_string: false,
            },
        ];

        if self.comment_prefix.is_some() {
            states[0] = self.analyze_chunk_with_comment(bytes, false);
            states[1] = self.analyze_chunk_with_comment(bytes, true);
            return states;
        }

        // `true` when an odd number of quote bytes has been seen so far. The initial value is
        // overwritten when the `simd` feature is enabled, hence the `allow`.
        #[allow(unused_assignments)]
        let mut global_quote_parity = false;
        let mut scan_offset = 0;

        #[cfg(feature = "simd")]
        {
            // All zeros for an even number of quotes seen so far, all ones for an odd number.
            let mut global_quote_parity_mask = 0;
            // Process every complete block of 64 bytes.
            while scan_offset + 64 <= bytes.len() {
                let block: [u8; 64] = unsafe {
                    bytes
                        .get_unchecked(scan_offset..scan_offset + 64)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(block);
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
                if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // NOTE(review): presumably bit i is the quote parity at byte i, combined
                    // with the parity carried in from earlier blocks - confirm against
                    // `prefix_xorsum_inclusive`.
                    let quote_parity =
                        prefix_xorsum_inclusive(quote_mask) ^ global_quote_parity_mask;
                    // Broadcast the top bit (parity after the last byte) to all 64 bits.
                    global_quote_parity_mask = ((quote_parity as i64) >> 63) as u64;

                    // Line ends at even parity count when the chunk started outside a string.
                    let start_outside_string_eol_mask = eol_mask & !quote_parity;
                    states[0].newline_count += start_outside_string_eol_mask.count_ones() as usize;
                    // NOTE(review): `select_unpredictable(c, a, b)` presumably yields `a` when
                    // `c` holds and `b` otherwise - confirm. The wrapping subtraction keeps
                    // the unused value harmless when the mask is zero.
                    states[0].last_newline_offset = select_unpredictable(
                        start_outside_string_eol_mask != 0,
                        (scan_offset + 63)
                            .wrapping_sub(start_outside_string_eol_mask.leading_zeros() as usize),
                        states[0].last_newline_offset,
                    );

                    // Line ends at odd parity count when the chunk started inside a string.
                    let start_inside_string_eol_mask = eol_mask & quote_parity;
                    states[1].newline_count += start_inside_string_eol_mask.count_ones() as usize;
                    states[1].last_newline_offset = select_unpredictable(
                        start_inside_string_eol_mask != 0,
                        (scan_offset + 63)
                            .wrapping_sub(start_inside_string_eol_mask.leading_zeros() as usize),
                        states[1].last_newline_offset,
                    );
                } else {
                    // Without quoting every eol byte ends a line; only `states[0]` is updated.
                    states[0].newline_count += eol_mask.count_ones() as usize;
                    states[0].last_newline_offset = select_unpredictable(
                        eol_mask != 0,
                        (scan_offset + 63).wrapping_sub(eol_mask.leading_zeros() as usize),
                        states[0].last_newline_offset,
                    );
                }

                scan_offset += 64;
            }

            global_quote_parity = global_quote_parity_mask > 0;
        }

        // Scalar loop for the remaining bytes (all bytes when the `simd` feature is disabled).
        while scan_offset < bytes.len() {
            // SAFETY: `scan_offset < bytes.len()` is the loop condition.
            let c = unsafe { *bytes.get_unchecked(scan_offset) };
            global_quote_parity ^= (c == self.quote_char) & self.quoting;

            // The current parity selects which of the two assumptions sees this byte as being
            // outside a string.
            let state = &mut states[global_quote_parity as usize];
            state.newline_count += (c == self.eol_char) as usize;
            state.last_newline_offset =
                select_unpredictable(c == self.eol_char, scan_offset, state.last_newline_offset);

            scan_offset += 1;
        }

        states[0].end_inside_string = global_quote_parity;
        states[1].end_inside_string = !global_quote_parity;
        states
    }
757
    /// Analyses a chunk byte by byte while skipping comment lines.
    ///
    /// `in_string` is the quoting state assumed at the start of `bytes`. Comment detection is
    /// applied at offset 0 and after every counted line end, so `bytes` is expected to begin
    /// at the start of a line.
    ///
    /// # Panics
    /// Panics when no comment prefix is configured.
    fn analyze_chunk_with_comment(&self, bytes: &[u8], mut in_string: bool) -> LineStats {
        // View the prefix as a byte slice, whether it is a single byte or a string.
        let pre_s = match self.comment_prefix.as_ref().unwrap() {
            CommentPrefix::Single(pc) => core::slice::from_ref(pc),
            CommentPrefix::Multi(ps) => ps.as_bytes(),
        };

        let mut state = LineStats::default();
        let mut scan_offset = 0;
        while scan_offset < bytes.len() {
            // Skip any number of consecutive comment lines; they are not counted.
            while bytes[scan_offset..].starts_with(pre_s) {
                scan_offset += pre_s.len();
                let Some(nl_off) = bytes[scan_offset..]
                    .iter()
                    .position(|c| *c == self.eol_char)
                else {
                    // Comment without a line end: the rest is handled by the loop below.
                    break;
                };
                scan_offset += nl_off + 1;
            }

            // Scan one regular line, up to and including its terminating eol byte.
            while scan_offset < bytes.len() {
                // SAFETY: `scan_offset < bytes.len()` is the loop condition.
                let c = unsafe { *bytes.get_unchecked(scan_offset) };
                in_string ^= (c == self.quote_char) & self.quoting;

                if c == self.eol_char && !in_string {
                    state.newline_count += 1;
                    state.last_newline_offset = scan_offset;
                    scan_offset += 1;
                    break;
                } else {
                    scan_offset += 1;
                }
            }
        }

        state.end_inside_string = in_string;
        state
    }
798
799 pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) {
800 loop {
801 let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };
802
803 let (count, offset) = if self.comment_prefix.is_some() {
804 let stats = self.analyze_chunk_with_comment(b, false);
805 (stats.newline_count, stats.last_newline_offset)
806 } else {
807 self.count(b)
808 };
809
810 if count > 0 || b.len() == bytes.len() {
811 return (count, offset);
812 }
813
814 *chunk_size = chunk_size.saturating_mul(2);
815 }
816 }
817
818 pub fn count_rows(&self, bytes: &[u8], is_eof: bool) -> (usize, usize) {
819 let stats = if self.comment_prefix.is_some() {
820 self.analyze_chunk_with_comment(bytes, false)
821 } else {
822 self.analyze_chunk(bytes)[0]
823 };
824
825 let mut count = stats.newline_count;
826 let mut offset = stats.last_newline_offset;
827
828 if count > 0 {
829 offset = cmp::min(offset + 1, bytes.len());
830 } else {
831 debug_assert!(offset == 0);
832 }
833
834 if is_eof {
835 count += ends_in_unterminated_row(bytes, self.eol_char, self.comment_prefix.as_ref())
836 as usize;
837 offset = bytes.len();
838 }
839
840 (count, offset)
841 }
842
    /// Counts the line ends in `bytes` (SIMD variant), starting outside a quoted field.
    ///
    /// Returns `(count, position)`, where `position` is the offset of the last line end found
    /// (`0` when `count` is `0`). Comment prefixes are not considered here.
    #[cfg(feature = "simd")]
    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
        let mut total_idx = 0;
        let original_bytes = bytes;
        let mut count = 0;
        let mut position = 0;
        // Quoting state carried from one block to the next; scanning starts outside a field.
        let mut not_in_field_previous_iter = true;

        loop {
            let bytes = unsafe { original_bytes.get_unchecked(total_idx..) };

            if bytes.len() > SIMD_SIZE {
                let lane: [u8; SIMD_SIZE] = unsafe {
                    bytes
                        .get_unchecked(0..SIMD_SIZE)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(lane);
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();

                let valid_eols = if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // NOTE(review): presumably bit i of the result is the quote parity at
                    // byte i - confirm against `prefix_xorsum_inclusive`.
                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);

                    // Invert when the block started outside a field, so that set bits mark
                    // positions outside quoted fields.
                    if not_in_field_previous_iter {
                        not_in_quote_field = !not_in_quote_field;
                    }
                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
                    eol_mask & not_in_quote_field
                } else {
                    eol_mask
                };

                if valid_eols != 0 {
                    count += valid_eols.count_ones() as usize;
                    // The highest set bit is the last line end within this block.
                    position = total_idx + 63 - valid_eols.leading_zeros() as usize;
                    debug_assert_eq!(original_bytes[position], self.eol_char)
                }
                total_idx += SIMD_SIZE;
            } else if bytes.is_empty() {
                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
                return (count, position);
            } else {
                // Tail of at most `SIMD_SIZE` bytes: finish with the scalar counter, passing
                // on the quoting state left by the blocks before it.
                let (c, o) = self.count_no_simd(bytes, !not_in_field_previous_iter);

                // The scalar offset is relative to the tail; keep the earlier position when
                // the tail contains no line end.
                let (count, position) = if c > 0 {
                    (count + c, total_idx + o)
                } else {
                    (count, position)
                };
                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);

                return (count, position);
            }
        }
    }

    /// Counts the line ends in `bytes` (scalar variant), starting outside a quoted field.
    ///
    /// Returns `(count, position)` as produced by `count_no_simd`.
    #[cfg(not(feature = "simd"))]
    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
        self.count_no_simd(bytes, false)
    }
906
907 fn count_no_simd(&self, bytes: &[u8], in_field: bool) -> (usize, usize) {
908 let iter = bytes.iter();
909 let mut in_field = in_field;
910 let mut count = 0;
911 let mut position = 0;
912
913 for b in iter {
914 let c = *b;
915 if self.quoting && c == self.quote_char {
916 in_field = !in_field;
920 }
921 else if c == self.eol_char && !in_field {
923 position = (b as *const _ as usize) - (bytes.as_ptr() as usize);
924 count += 1;
925 }
926 }
927 debug_assert!(count == 0 || bytes[position] == self.eol_char);
928
929 (count, position)
930 }
931}
932
933fn ends_in_unterminated_row(
934 bytes: &[u8],
935 eol_char: u8,
936 comment_prefix: Option<&CommentPrefix>,
937) -> bool {
938 if !bytes.is_empty() && bytes.last().copied().unwrap() != eol_char {
939 let last_new_line_post = memchr::memrchr(eol_char, bytes).unwrap_or(0);
942 let last_line_is_comment_line = bytes
943 .get(last_new_line_post + 1..)
944 .map(|line| is_comment_line(line, comment_prefix))
945 .unwrap_or(false);
946
947 return !last_line_is_comment_line;
948 }
949
950 false
951}
952
/// Returns the index of the first `needle` byte that lies outside a quoted section.
///
/// Every `quote_char` byte toggles the quoted state before the byte itself is tested, so an
/// opening quote is never reported and a closing quote is tested as being outside the quotes.
/// Returns `None` when no such byte exists.
#[inline]
fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> {
    let mut in_field = false;

    // `position` counts in `usize`, so the index cannot wrap on inputs larger than 4 GiB.
    bytes.iter().position(|&c| {
        if c == quote_char {
            in_field = !in_field;
        }
        !in_field && c == needle
    })
}
975
976#[inline]
977pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
978 let pos = match quote {
979 Some(quote) => find_quoted(bytes, quote, eol_char),
980 None => bytes.iter().position(|x| *x == eol_char),
981 };
982 match pos {
983 None => &[],
984 Some(pos) => &bytes[pos + 1..],
985 }
986}
987
988#[inline]
989pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
990 if let Some(pos) = next_line_position_naive(input, eol_char) {
991 unsafe { input.get_unchecked(pos..) }
992 } else {
993 &[]
994 }
995}
996
/// Parses lines from `bytes` and appends the projected fields to `buffers`.
///
/// * `bytes` - the input to parse.
/// * `parse_options` - separator, quote, eol, comment prefix and null-handling settings.
/// * `offset` - added to the reported byte position in parse error messages.
/// * `ignore_errors` - forwarded to the builders when a field is added.
/// * `null_values` - optional set of values that are treated as null.
/// * `projection` - indices of the columns to parse; must not be empty. The code consumes it
///   in order while the field index increases.
/// * `buffers` - one builder per projected column, in projection order.
/// * `n_lines` - line limit; parsing stops once the line counter exceeds it.
/// * `schema_len` - number of columns in the schema; if it differs from `projection.len()`,
///   ragged lines are always truncated.
/// * `schema` - used to look up the column name for error messages.
///
/// Returns the number of bytes consumed from `bytes`.
///
/// # Errors
/// Returns an error when a field cannot be added to its builder, or when a line has more
/// fields than projected and truncating ragged lines is not enabled.
///
/// # Panics
/// Panics when `projection` is empty.
#[allow(clippy::too_many_arguments)]
pub(super) fn parse_lines(
    mut bytes: &[u8],
    parse_options: &CsvParseOptions,
    offset: usize,
    ignore_errors: bool,
    null_values: Option<&NullValuesCompiled>,
    projection: &[usize],
    buffers: &mut [Builder],
    n_lines: usize,
    schema_len: usize,
    schema: &Schema,
) -> PolarsResult<usize> {
    assert!(
        !projection.is_empty(),
        "at least one column should be projected"
    );
    let mut truncate_ragged_lines = parse_options.truncate_ragged_lines;
    // With a partial projection the trailing fields of a line are skipped on purpose, so
    // extra fields must not be reported as an error.
    if projection.len() != schema_len {
        truncate_ragged_lines = true
    }

    // Address of the first input byte; consumed byte counts are derived from pointer
    // differences against it.
    let start = bytes.as_ptr() as usize;
    let original_bytes_len = bytes.len();
    // NOTE(review): this cast truncates `n_lines` values above `u32::MAX`.
    let n_lines = n_lines as u32;

    let mut line_count = 0u32;
    loop {
        if line_count > n_lines {
            let end = bytes.as_ptr() as usize;
            return Ok(end - start);
        }

        if bytes.is_empty() {
            return Ok(original_bytes_len);
        } else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
            // Comment lines are skipped without increasing the line counter.
            let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char);
            bytes = bytes_rem;
            continue;
        }

        let mut projection_iter = projection.iter().copied();
        // SAFETY: `projection` is non-empty (asserted above).
        let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
        let mut processed_fields = 0;

        let mut iter = SplitFields::new(
            bytes,
            parse_options.separator,
            parse_options.quote_char,
            parse_options.eol_char,
        );
        // Index of the current field within the line.
        let mut idx = 0u32;
        // Number of bytes of the current line read so far, counting one delimiter per field.
        let mut read_sol = 0;
        loop {
            match iter.next() {
                None => {
                    // End of the line: continue after the bytes that were read.
                    bytes = unsafe { bytes.get_unchecked(std::cmp::min(read_sol, bytes.len())..) };
                    break;
                },
                Some((mut field, needs_escaping)) => {
                    let field_len = field.len();

                    // +1 accounts for the separator or eol byte that follows the field.
                    read_sol += field_len + 1;

                    if idx == next_projected as u32 {
                        // Strip a trailing carriage return from the field.
                        unsafe {
                            if field_len > 0 && *field.get_unchecked(field_len - 1) == b'\r' {
                                field = field.get_unchecked(..field_len - 1);
                            }
                        }

                        debug_assert!(processed_fields < buffers.len());
                        let buf = unsafe {
                            buffers.get_unchecked_mut(processed_fields)
                        };
                        let mut add_null = false;

                        if let Some(null_values) = null_values {
                            // Compare against the null values without the first and last
                            // byte when the field needs escaping.
                            let field = if needs_escaping && !field.is_empty() {
                                unsafe { field.get_unchecked(1..field.len() - 1) }
                            } else {
                                field
                            };

                            add_null = unsafe { null_values.is_null(field, idx as usize) }
                        }
                        if add_null {
                            buf.add_null(!parse_options.missing_is_null && field.is_empty())
                        } else {
                            buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null)
                                .map_err(|e| {
                                    let bytes_offset = offset + field.as_ptr() as usize - start;
                                    let unparsable = String::from_utf8_lossy(field);
                                    let column_name = schema.get_at_index(idx as usize).unwrap().0;
                                    polars_err!(
                                        ComputeError:
                                        "could not parse `{}` as dtype `{}` at column '{}' (column number {})\n\n\
                                        The current offset in the file is {} bytes.\n\
                                        \n\
                                        You might want to try:\n\
                                        - increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),\n\
                                        - specifying correct dtype with the `schema_overrides` argument\n\
                                        - setting `ignore_errors` to `True`,\n\
                                        - adding `{}` to the `null_values` list.\n\n\
                                        Original error: ```{}```",
                                        &unparsable,
                                        buf.dtype(),
                                        column_name,
                                        idx + 1,
                                        bytes_offset,
                                        &unparsable,
                                        e
                                    )
                                })?;
                        }
                        processed_fields += 1;

                        match projection_iter.next() {
                            Some(p) => next_projected = p,
                            None => {
                                // All projected fields of this line are done.
                                if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) {
                                    // The last projected field was also the last of the line.
                                    bytes = unsafe { bytes.get_unchecked(read_sol..) };
                                } else {
                                    if !truncate_ragged_lines && read_sol < bytes.len() {
                                        polars_bail!(ComputeError: r#"found more fields than defined in 'Schema'

Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
                                    }
                                    // Skip the remaining, unprojected part of the line.
                                    let bytes_rem = skip_this_line(
                                        unsafe { bytes.get_unchecked(read_sol - 1..) },
                                        parse_options.quote_char,
                                        parse_options.eol_char,
                                    );
                                    bytes = bytes_rem;
                                }
                                break;
                            },
                        }
                    }
                    idx += 1;
                },
            }
        }

        // The line had fewer fields than projected: fill the remaining builders with nulls.
        while processed_fields < projection.len() {
            debug_assert!(processed_fields < buffers.len());
            let buf = unsafe {
                buffers.get_unchecked_mut(processed_fields)
            };
            buf.add_null(!parse_options.missing_is_null);
            processed_fields += 1;
        }
        line_count += 1;
    }
}
1185
#[cfg(test)]
mod test {
    use super::SplitLines;

    /// Line ends inside quoted fields must not split a line, for both `"` and `'` as the
    /// quote character.
    #[test]
    fn test_splitlines() {
        // Two records, each with an eol byte embedded in a double-quoted field.
        let input = "1,\"foo\n\"\n2,\"foo\n\"\n";
        let mut lines = SplitLines::new(input.as_bytes(), Some(b'"'), b'\n', None);
        assert_eq!(lines.next(), Some("1,\"foo\n\"".as_bytes()));
        assert_eq!(lines.next(), Some("2,\"foo\n\"".as_bytes()));
        assert_eq!(lines.next(), None);

        // Same layout with a single quote as the quote character.
        let input2 = "1,'foo\n'\n2,'foo\n'\n";
        let mut lines2 = SplitLines::new(input2.as_bytes(), Some(b'\''), b'\n', None);
        assert_eq!(lines2.next(), Some("1,'foo\n'".as_bytes()));
        assert_eq!(lines2.next(), Some("2,'foo\n'".as_bytes()));
        assert_eq!(lines2.next(), None);
    }
}