1use memchr::memchr2_iter;
2use polars_core::prelude::*;
3use polars_core::{POOL, config};
4use polars_error::feature_gated;
5use polars_utils::mmap::MMapSemaphore;
6use polars_utils::plpath::PlPathRef;
7use polars_utils::select::select_unpredictable;
8use rayon::prelude::*;
9
10use super::CsvParseOptions;
11use super::buffer::Buffer;
12use super::options::{CommentPrefix, NullValuesCompiled};
13use super::splitfields::SplitFields;
14use crate::prelude::_csv_read_internal::find_starting_point;
15use crate::utils::compression::maybe_decompress_bytes;
16
17#[allow(clippy::too_many_arguments)]
20pub fn count_rows(
21 addr: PlPathRef<'_>,
22 quote_char: Option<u8>,
23 comment_prefix: Option<&CommentPrefix>,
24 eol_char: u8,
25 has_header: bool,
26 skip_lines: usize,
27 skip_rows_before_header: usize,
28 skip_rows_after_header: usize,
29) -> PolarsResult<usize> {
30 let file = match addr
31 .as_local_path()
32 .and_then(|v| (!config::force_async()).then_some(v))
33 {
34 None => feature_gated!("cloud", {
35 crate::file_cache::FILE_CACHE
36 .get_entry(addr)
37 .unwrap()
39 .try_open_assume_latest()?
40 }),
41 Some(path) => polars_utils::open_file(path)?,
42 };
43
44 let mmap = MMapSemaphore::new_from_file(&file).unwrap();
45 let owned = &mut vec![];
46 let reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;
47
48 count_rows_from_slice_par(
49 reader_bytes,
50 quote_char,
51 comment_prefix,
52 eol_char,
53 has_header,
54 skip_lines,
55 skip_rows_before_header,
56 skip_rows_after_header,
57 )
58}
59
/// Counts the rows of an in-memory CSV buffer, analyzing fixed-size chunks in parallel.
///
/// The offset at which counting starts is obtained from `find_starting_point`, which receives
/// the header and skip settings; only the bytes after that offset are counted. A final line
/// that is not terminated by `eol_char` is counted as a row unless it is a comment line.
///
/// # Errors
///
/// Propagates any error returned by `find_starting_point`.
#[allow(clippy::too_many_arguments)]
pub fn count_rows_from_slice_par(
    mut bytes: &[u8],
    quote_char: Option<u8>,
    comment_prefix: Option<&CommentPrefix>,
    eol_char: u8,
    has_header: bool,
    skip_lines: usize,
    skip_rows_before_header: usize,
    skip_rows_after_header: usize,
) -> PolarsResult<usize> {
    let start_offset = find_starting_point(
        bytes,
        quote_char,
        eol_char,
        // NOTE(review): presumably the schema-length argument, deliberately set to "unknown /
        // very large" — confirm against the signature of `find_starting_point`.
        usize::MAX,
        skip_lines,
        skip_rows_before_header,
        skip_rows_after_header,
        comment_prefix,
        has_header,
    )?;
    bytes = &bytes[start_offset..];

    // Small chunks in debug builds so that chunk-boundary handling is exercised even by small
    // inputs; larger chunks otherwise.
    #[cfg(debug_assertions)]
    const BYTES_PER_CHUNK: usize = 128;
    #[cfg(not(debug_assertions))]
    const BYTES_PER_CHUNK: usize = 1 << 16;

    let count = CountLines::new(quote_char, eol_char, comment_prefix.cloned());
    POOL.install(|| {
        let mut states = Vec::new();
        if comment_prefix.is_none() {
            // Without comments the chunks can be cut at arbitrary byte positions.
            bytes
                .par_chunks(BYTES_PER_CHUNK)
                .map(|chunk| count.analyze_chunk(chunk))
                .collect_into_vec(&mut states);
        } else {
            // With a comment prefix each analyzed range is realigned to line boundaries.
            let num_chunks = bytes.len().div_ceil(BYTES_PER_CHUNK);
            (0..num_chunks)
                .into_par_iter()
                .map(|chunk_idx| {
                    let mut start_offset = chunk_idx * BYTES_PER_CHUNK;
                    let next_start_offset = (start_offset + BYTES_PER_CHUNK).min(bytes.len());

                    if start_offset != 0 {
                        // Move the start to the byte after the first `eol_char` inside this
                        // chunk's nominal range.
                        if let Some(nl_off) = bytes[start_offset..next_start_offset]
                            .iter()
                            .position(|b| *b == eol_char)
                        {
                            start_offset += nl_off + 1;
                        } else {
                            // No `eol_char` in this chunk: it contributes empty statistics.
                            return count.analyze_chunk(&[]);
                        }
                    }

                    // Extend the end through the first `eol_char` at or after the nominal chunk
                    // end, or to the end of the buffer when there is none.
                    let stop_offset = if let Some(nl_off) = bytes[next_start_offset..]
                        .iter()
                        .position(|b| *b == eol_char)
                    {
                        next_start_offset + nl_off + 1
                    } else {
                        bytes.len()
                    };

                    count.analyze_chunk(&bytes[start_offset..stop_offset])
                })
                .collect_into_vec(&mut states);
        }

        // Each chunk yields a pair of statistics, indexed by whether the chunk is assumed to
        // start inside a quoted string. Walk the chunks in order, picking the entry that matches
        // the state the previous chunk ended in.
        let mut n = 0;
        let mut in_string = false;
        for pair in states {
            n += pair[in_string as usize].newline_count;
            in_string = pair[in_string as usize].end_inside_string;
        }
        // A trailing line without a terminating `eol_char` still counts as a row, unless a
        // comment prefix is configured and that last line is a comment.
        if let Some(last) = bytes.last()
            && *last != eol_char
            && (comment_prefix.is_none()
                || !is_comment_line(
                    bytes.rsplit(|c| *c == eol_char).next().unwrap(),
                    comment_prefix,
                ))
        {
            n += 1
        }

        Ok(n)
    })
}
158
159pub(super) fn skip_bom(input: &[u8]) -> &[u8] {
162 if input.len() >= 3 && &input[0..3] == b"\xef\xbb\xbf" {
163 &input[3..]
164 } else {
165 input
166 }
167}
168
169#[inline]
173pub fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
174 match comment_prefix {
175 Some(CommentPrefix::Single(c)) => line.first() == Some(c),
176 Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
177 None => false,
178 }
179}
180
181pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
184 let pos = memchr::memchr(eol_char, input)? + 1;
185 if input.len() - pos == 0 {
186 return None;
187 }
188 Some(pos)
189}
190
191pub(super) fn skip_lines_naive(mut input: &[u8], eol_char: u8, skip: usize) -> &[u8] {
192 for _ in 0..skip {
193 if let Some(pos) = next_line_position_naive(input, eol_char) {
194 input = &input[pos..];
195 } else {
196 return input;
197 }
198 }
199 input
200}
201
/// Searches `input` for an offset at which a new, plausible CSV line starts.
///
/// Candidate positions are the bytes following an `eol_char`. When `expected_fields` is given,
/// a candidate is only accepted if the line starting there, and up to two following lines, pass
/// the field-count check in `accept_line`; when it is `None`, the first candidate is returned.
///
/// Returns `None` when `input` is empty, when no `eol_char` is left, when the `eol_char` found
/// is the last byte, or when three candidate groups were rejected by the follow-up check.
pub(super) fn next_line_position(
    mut input: &[u8],
    mut expected_fields: Option<usize>,
    separator: u8,
    quote_char: Option<u8>,
    eol_char: u8,
) -> Option<usize> {
    // Decides whether `line` plausibly has `expected_fields` fields.
    fn accept_line(
        line: &[u8],
        expected_fields: usize,
        separator: u8,
        eol_char: u8,
        quote_char: Option<u8>,
    ) -> bool {
        let mut count = 0usize;
        for (field, _) in SplitFields::new(line, separator, quote_char, eol_char) {
            // A single field holding that many separator / eol bytes is rejected outright.
            if memchr2_iter(separator, eol_char, field).count() >= expected_fields {
                return false;
            }
            count += 1;
        }

        // Accept when `count == expected_fields` or `count == expected_fields - 1`; a larger
        // `count` wraps around to a huge value and is rejected.
        expected_fields.wrapping_sub(count) <= 1
    }

    // Number of candidates whose first line was accepted but whose follow-up lines were not.
    let mut rejected_line_groups = 0u8;

    // Bytes of the original input already discarded from the front of `input`.
    let mut total_pos = 0;
    if input.is_empty() {
        return None;
    }
    let mut lines_checked = 0u8;
    loop {
        if rejected_line_groups >= 3 {
            return None;
        }
        lines_checked = lines_checked.wrapping_add(1);
        // Each time the wrapping counter reaches `u8::MAX`, relax the expectation by one field.
        if lines_checked == u8::MAX {
            if let Some(ef) = expected_fields {
                expected_fields = Some(ef.saturating_sub(1))
            }
        };
        // `pos` is the offset just past the next `eol_char`.
        let pos = memchr::memchr(eol_char, input)? + 1;
        if input.len() - pos == 0 {
            return None;
        }
        debug_assert!(pos <= input.len());
        // SAFETY: `pos` is at most `input.len()` because it is one past an index found in `input`.
        let new_input = unsafe { input.get_unchecked(pos..) };
        let mut lines = SplitLines::new(new_input, quote_char, eol_char, None);
        let line = lines.next();

        match (line, expected_fields) {
            (Some(line), Some(expected_fields)) => {
                if accept_line(line, expected_fields, separator, eol_char, quote_char) {
                    // The candidate line looks right; also require the next (up to) two lines
                    // to pass before trusting the position.
                    let mut valid = true;
                    for line in lines.take(2) {
                        if !accept_line(line, expected_fields, separator, eol_char, quote_char) {
                            valid = false;
                            break;
                        }
                    }
                    if valid {
                        return Some(total_pos + pos);
                    } else {
                        rejected_line_groups += 1;
                    }
                } else {
                    // Candidate rejected: continue the search one byte past the candidate start.
                    debug_assert!(pos < input.len());
                    // SAFETY: `pos < input.len()` was established by the length check above,
                    // hence `pos + 1 <= input.len()`.
                    unsafe {
                        input = input.get_unchecked(pos + 1..);
                    }
                    total_pos += pos + 1;
                }
            },
            // No expected field count: the first candidate is taken as is.
            (Some(_), None) => return Some(total_pos + pos),
            _ => return None,
        }
    }
}
299
300#[inline(always)]
301pub(super) fn is_line_ending(b: u8, eol_char: u8) -> bool {
302 b == eol_char || b == b'\r'
303}
304
305#[inline(always)]
306pub(super) fn is_whitespace(b: u8) -> bool {
307 b == b' ' || b == b'\t'
308}
309
310#[inline(always)]
312pub(super) fn could_be_whitespace_fast(b: u8) -> bool {
313 b <= 32
318}
319
/// Returns the tail of `input` that starts at the first byte for which `f` is `false`.
///
/// The result is empty when `f` holds for every byte, and also when `input` itself is empty.
#[inline]
fn skip_condition<F>(input: &[u8], f: F) -> &[u8]
where
    F: Fn(u8) -> bool,
{
    match input.iter().position(|&byte| !f(byte)) {
        Some(first_kept) => &input[first_kept..],
        None => &input[input.len()..],
    }
}
332
/// Returns `input` without its leading run of whitespace bytes, as classified by
/// `is_whitespace` (space and tab).
#[inline]
pub(super) fn skip_whitespace(input: &[u8]) -> &[u8] {
    skip_condition(input, is_whitespace)
}
342
343#[inline]
344pub(super) fn skip_line_ending(input: &[u8], eol_char: u8) -> &[u8] {
345 skip_condition(input, |b| is_line_ending(b, eol_char))
346}
347
/// Iterator state for splitting a byte buffer into lines at `eol_char`, skipping end-of-line
/// bytes that occur inside a quoted section when quoting is enabled.
pub(super) struct SplitLines<'a> {
    /// Bytes that have not been returned yet.
    v: &'a [u8],
    /// Quote byte; only consulted when `quoting` is `true`.
    quote_char: u8,
    /// Byte that terminates a line.
    eol_char: u8,
    /// `eol_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    /// `quote_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    /// Bitmask of line ends already located by the SIMD path but not yet returned; bit
    /// positions are relative to the start of `v`.
    #[cfg(feature = "simd")]
    previous_valid_eols: u64,
    /// Offset into `v` of the block currently being scanned by the SIMD path.
    total_index: usize,
    /// Whether quote handling is enabled.
    quoting: bool,
    /// Optional comment prefix, passed to `is_comment_line` by the scalar path.
    comment_prefix: Option<&'a CommentPrefix>,
}
371
// Number of bytes processed per step by the SIMD code paths in this file.
#[cfg(feature = "simd")]
const SIMD_SIZE: usize = 64;
#[cfg(feature = "simd")]
use std::simd::prelude::*;

#[cfg(feature = "simd")]
use polars_utils::clmul::prefix_xorsum_inclusive;

// Byte vector type used by the SIMD code paths; it has 64 lanes, matching `SIMD_SIZE`.
#[cfg(feature = "simd")]
type SimdVec = u8x64;
382
383impl<'a> SplitLines<'a> {
384 pub(super) fn new(
385 slice: &'a [u8],
386 quote_char: Option<u8>,
387 eol_char: u8,
388 comment_prefix: Option<&'a CommentPrefix>,
389 ) -> Self {
390 let quoting = quote_char.is_some();
391 let quote_char = quote_char.unwrap_or(b'\"');
392 #[cfg(feature = "simd")]
393 let simd_eol_char = SimdVec::splat(eol_char);
394 #[cfg(feature = "simd")]
395 let simd_quote_char = SimdVec::splat(quote_char);
396 Self {
397 v: slice,
398 quote_char,
399 eol_char,
400 #[cfg(feature = "simd")]
401 simd_eol_char,
402 #[cfg(feature = "simd")]
403 simd_quote_char,
404 #[cfg(feature = "simd")]
405 previous_valid_eols: 0,
406 total_index: 0,
407 quoting,
408 comment_prefix,
409 }
410 }
411}
412
413impl<'a> SplitLines<'a> {
414 fn next_scalar(&mut self) -> Option<&'a [u8]> {
416 if self.v.is_empty() {
417 return None;
418 }
419 if is_comment_line(self.v, self.comment_prefix) {
420 return self.next_comment_line();
421 }
422 {
423 let mut pos = 0u32;
424 let mut iter = self.v.iter();
425 let mut in_field = false;
426 loop {
427 match iter.next() {
428 Some(&c) => {
429 pos += 1;
430
431 if self.quoting && c == self.quote_char {
432 in_field = !in_field;
436 }
437 else if c == self.eol_char && !in_field {
439 break;
440 }
441 },
442 None => {
443 let remainder = self.v;
444 self.v = &[];
445 return Some(remainder);
446 },
447 }
448 }
449
450 unsafe {
451 debug_assert!((pos as usize) <= self.v.len());
452
453 let ret = Some(
455 self.v
456 .get_unchecked(..(self.total_index + pos as usize - 1)),
457 );
458 self.v = self.v.get_unchecked(self.total_index + pos as usize..);
460 ret
461 }
462 }
463 }
464 fn next_comment_line(&mut self) -> Option<&'a [u8]> {
465 if let Some(pos) = next_line_position_naive(self.v, self.eol_char) {
466 unsafe {
467 let ret = Some(self.v.get_unchecked(..(pos - 1)));
469 self.v = self.v.get_unchecked(pos..);
471 ret
472 }
473 } else {
474 let remainder = self.v;
475 self.v = &[];
476 Some(remainder)
477 }
478 }
479}
480
impl<'a> Iterator for SplitLines<'a> {
    type Item = &'a [u8];

    /// Without the `simd` feature every line is produced by `next_scalar`.
    #[inline]
    #[cfg(not(feature = "simd"))]
    fn next(&mut self) -> Option<&'a [u8]> {
        self.next_scalar()
    }

    /// With the `simd` feature the buffer is scanned in blocks of `SIMD_SIZE` bytes; the last
    /// `SIMD_SIZE` or fewer bytes are scanned byte by byte. Inputs with a comment prefix
    /// always use `next_scalar`.
    #[inline]
    #[cfg(feature = "simd")]
    fn next(&mut self) -> Option<&'a [u8]> {
        // Line ends already located in an earlier block are served first; the lowest set bit is
        // the offset of the next line end relative to `self.v`.
        if self.previous_valid_eols != 0 {
            let pos = self.previous_valid_eols.trailing_zeros() as usize;
            self.previous_valid_eols >>= (pos + 1) as u64;

            unsafe {
                debug_assert!((pos) <= self.v.len());

                // The line is everything before the eol byte; then step over the eol byte.
                let ret = Some(self.v.get_unchecked(..pos));
                self.v = self.v.get_unchecked(pos + 1..);
                return ret;
            }
        }
        if self.v.is_empty() {
            return None;
        }
        if self.comment_prefix.is_some() {
            return self.next_scalar();
        }

        self.total_index = 0;
        // Quote state carried from one block to the next; a line starts outside of quotes.
        let mut not_in_field_previous_iter = true;

        loop {
            let bytes = unsafe { self.v.get_unchecked(self.total_index..) };
            if bytes.len() > SIMD_SIZE {
                let lane: [u8; SIMD_SIZE] = unsafe {
                    bytes
                        .get_unchecked(0..SIMD_SIZE)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(lane);
                // One bit per byte of the block: set where the byte equals `eol_char`.
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();

                let valid_eols = if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // NOTE(review): presumably bit i of the result is the XOR of quote bits
                    // 0..=i, i.e. the quote parity at each byte — confirm in `polars_utils::clmul`.
                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);

                    // Flip the mask when the block started outside of quotes, so that set bits
                    // mark the bytes outside of quotes.
                    if not_in_field_previous_iter {
                        not_in_quote_field = !not_in_quote_field;
                    }
                    // The top bit holds the state after the last byte of this block.
                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
                    eol_mask & not_in_quote_field
                } else {
                    eol_mask
                };

                if valid_eols != 0 {
                    let pos = valid_eols.trailing_zeros() as usize;
                    // Keep the remaining line ends of this block for the following calls. A
                    // match in the last lane leaves none, and shifting by 64 would overflow.
                    if pos == SIMD_SIZE - 1 {
                        self.previous_valid_eols = 0;
                    } else {
                        self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
                    }

                    unsafe {
                        // Translate the in-block position into an offset into `self.v`.
                        let pos = self.total_index + pos;
                        debug_assert!((pos) <= self.v.len());

                        let ret = Some(self.v.get_unchecked(..pos));
                        self.v = self.v.get_unchecked(pos + 1..);
                        return ret;
                    }
                } else {
                    // No usable line end in this block; continue with the next one.
                    self.total_index += SIMD_SIZE;
                }
            } else {
                // Scalar tail: at most `SIMD_SIZE` bytes are left after `total_index`.
                let mut in_field = !not_in_field_previous_iter;
                let mut pos = 0u32;
                let mut iter = bytes.iter();
                loop {
                    match iter.next() {
                        Some(&c) => {
                            pos += 1;

                            if self.quoting && c == self.quote_char {
                                in_field = !in_field;
                            }
                            else if c == self.eol_char && !in_field {
                                break;
                            }
                        },
                        None => {
                            // No line end left: everything remaining is the final line.
                            let remainder = self.v;
                            self.v = &[];
                            return Some(remainder);
                        },
                    }
                }

                unsafe {
                    debug_assert!((pos as usize) <= self.v.len());

                    // `pos` is one-based within the tail, so `- 1` excludes the eol byte.
                    let ret = Some(
                        self.v
                            .get_unchecked(..(self.total_index + pos as usize - 1)),
                    );
                    self.v = self.v.get_unchecked(self.total_index + pos as usize..);
                    return ret;
                }
            }
        }
    }
}
609
/// Configuration for counting line ends in CSV bytes, with optional quote and comment handling.
pub struct CountLines {
    /// Quote byte; only consulted when `quoting` is `true`.
    quote_char: u8,
    /// Byte that terminates a line.
    eol_char: u8,
    /// `eol_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    /// `quote_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    /// Whether quote handling is enabled.
    quoting: bool,
    /// Optional comment prefix; when set, chunks are analyzed by the comment-aware scalar path.
    comment_prefix: Option<CommentPrefix>,
}
620
/// Statistics gathered for one chunk under one assumption about its initial quote state.
#[derive(Copy, Clone, Debug, Default)]
pub struct LineStats {
    /// Number of end-of-line bytes counted.
    newline_count: usize,
    /// Offset within the chunk of the last counted end-of-line byte; stays 0 when none was
    /// counted.
    last_newline_offset: usize,
    /// Whether the chunk ends inside a quoted string under this assumption.
    end_inside_string: bool,
}
627
impl CountLines {
    /// Creates a line counter.
    ///
    /// Quote handling is enabled only when `quote_char` is `Some`; when it is `None` the quote
    /// byte falls back to `"` but quoting stays disabled.
    pub fn new(
        quote_char: Option<u8>,
        eol_char: u8,
        comment_prefix: Option<CommentPrefix>,
    ) -> Self {
        let quoting = quote_char.is_some();
        let quote_char = quote_char.unwrap_or(b'\"');
        #[cfg(feature = "simd")]
        let simd_eol_char = SimdVec::splat(eol_char);
        #[cfg(feature = "simd")]
        let simd_quote_char = SimdVec::splat(quote_char);
        Self {
            quote_char,
            eol_char,
            #[cfg(feature = "simd")]
            simd_eol_char,
            #[cfg(feature = "simd")]
            simd_quote_char,
            quoting,
            comment_prefix,
        }
    }

    /// Analyzes one chunk of CSV bytes under both possible initial quote states.
    ///
    /// Element 0 of the result holds the statistics assuming the chunk starts outside a quoted
    /// string, element 1 assuming it starts inside one.
    pub fn analyze_chunk(&self, bytes: &[u8]) -> [LineStats; 2] {
        let mut states = [
            LineStats {
                newline_count: 0,
                last_newline_offset: 0,
                end_inside_string: false,
            },
            LineStats {
                newline_count: 0,
                last_newline_offset: 0,
                end_inside_string: false,
            },
        ];

        // With a comment prefix the chunk is scanned twice by the scalar routine, once per
        // assumed initial state.
        if self.comment_prefix.is_some() {
            states[0] = self.analyze_chunk_with_comment(bytes, false);
            states[1] = self.analyze_chunk_with_comment(bytes, true);
            return states;
        }

        // `true` while an odd number of quote bytes has been seen so far.
        #[allow(unused_assignments)]
        let mut global_quote_parity = false;
        let mut scan_offset = 0;

        #[cfg(feature = "simd")]
        {
            // All-zero or all-one mask mirroring the quote parity carried across blocks.
            let mut global_quote_parity_mask = 0;
            // Process full 64-byte blocks; the remaining bytes go through the scalar loop below.
            while scan_offset + 64 <= bytes.len() {
                let block: [u8; 64] = unsafe {
                    bytes
                        .get_unchecked(scan_offset..scan_offset + 64)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(block);
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
                if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // NOTE(review): presumably per-byte quote parity within the block (confirm
                    // in `polars_utils::clmul`), combined here with the carried parity.
                    let quote_parity =
                        prefix_xorsum_inclusive(quote_mask) ^ global_quote_parity_mask;
                    // The arithmetic shift broadcasts the top bit (parity after the last byte)
                    // to all 64 bits.
                    global_quote_parity_mask = ((quote_parity as i64) >> 63) as u64;

                    // Line ends that count when the chunk started outside a string.
                    let start_outside_string_eol_mask = eol_mask & !quote_parity;
                    states[0].newline_count += start_outside_string_eol_mask.count_ones() as usize;
                    // Offset of the highest set bit. `wrapping_sub` keeps the expression from
                    // overflowing when the mask is empty (`leading_zeros()` is then 64); the
                    // first argument is the mask-non-empty condition.
                    // NOTE(review): presumably the helper returns the second argument when the
                    // condition holds and the third otherwise — confirm.
                    states[0].last_newline_offset = select_unpredictable(
                        start_outside_string_eol_mask != 0,
                        (scan_offset + 63)
                            .wrapping_sub(start_outside_string_eol_mask.leading_zeros() as usize),
                        states[0].last_newline_offset,
                    );

                    // Line ends that count when the chunk started inside a string.
                    let start_inside_string_eol_mask = eol_mask & quote_parity;
                    states[1].newline_count += start_inside_string_eol_mask.count_ones() as usize;
                    states[1].last_newline_offset = select_unpredictable(
                        start_inside_string_eol_mask != 0,
                        (scan_offset + 63)
                            .wrapping_sub(start_inside_string_eol_mask.leading_zeros() as usize),
                        states[1].last_newline_offset,
                    );
                } else {
                    // Without quoting only the "outside" state is updated.
                    states[0].newline_count += eol_mask.count_ones() as usize;
                    states[0].last_newline_offset = select_unpredictable(
                        eol_mask != 0,
                        (scan_offset + 63).wrapping_sub(eol_mask.leading_zeros() as usize),
                        states[0].last_newline_offset,
                    );
                }

                scan_offset += 64;
            }

            global_quote_parity = global_quote_parity_mask > 0;
        }

        // Scalar loop: the whole chunk without the `simd` feature, otherwise the bytes left
        // after the last full block.
        while scan_offset < bytes.len() {
            let c = unsafe { *bytes.get_unchecked(scan_offset) };
            global_quote_parity ^= (c == self.quote_char) & self.quoting;

            // The current parity selects which of the two hypotheses this byte counts for.
            let state = &mut states[global_quote_parity as usize];
            state.newline_count += (c == self.eol_char) as usize;
            state.last_newline_offset =
                select_unpredictable(c == self.eol_char, scan_offset, state.last_newline_offset);

            scan_offset += 1;
        }

        // Started outside: the end state equals the parity. Started inside: it is the opposite.
        states[0].end_inside_string = global_quote_parity;
        states[1].end_inside_string = !global_quote_parity;
        states
    }

    /// Scalar analysis of `bytes` that skips comment lines, starting in the quote state given
    /// by `in_string`.
    ///
    /// The comment prefix is only tested at the start of `bytes` and directly after a counted
    /// line end (or a skipped comment line).
    ///
    /// # Panics
    ///
    /// Panics if no comment prefix is configured.
    fn analyze_chunk_with_comment(&self, bytes: &[u8], mut in_string: bool) -> LineStats {
        // View the prefix as a byte slice, whichever variant it is.
        let pre_s = match self.comment_prefix.as_ref().unwrap() {
            CommentPrefix::Single(pc) => core::slice::from_ref(pc),
            CommentPrefix::Multi(ps) => ps.as_bytes(),
        };

        let mut state = LineStats::default();
        let mut scan_offset = 0;
        while scan_offset < bytes.len() {
            // Skip consecutive comment lines; they are not counted.
            while bytes[scan_offset..].starts_with(pre_s) {
                scan_offset += pre_s.len();
                let Some(nl_off) = bytes[scan_offset..]
                    .iter()
                    .position(|c| *c == self.eol_char)
                else {
                    break;
                };
                scan_offset += nl_off + 1;
            }

            // Scan one regular line, up to and including its unquoted `eol_char`.
            while scan_offset < bytes.len() {
                let c = unsafe { *bytes.get_unchecked(scan_offset) };
                in_string ^= (c == self.quote_char) & self.quoting;

                if c == self.eol_char && !in_string {
                    state.newline_count += 1;
                    state.last_newline_offset = scan_offset;
                    scan_offset += 1;
                    break;
                } else {
                    scan_offset += 1;
                }
            }
        }

        state.end_inside_string = in_string;
        state
    }

    /// Counts line ends in the first `*chunk_size` bytes of `bytes`, doubling `*chunk_size`
    /// until at least one line end is found or the whole buffer has been examined.
    ///
    /// Returns `(count, offset)`, where `offset` is the position of the last counted line end
    /// within the examined prefix. `*chunk_size` keeps its enlarged value for the caller.
    pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) {
        loop {
            // SAFETY: the upper bound is clamped to `bytes.len()`.
            let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };

            let (count, offset) = if self.comment_prefix.is_some() {
                let stats = self.analyze_chunk_with_comment(b, false);
                (stats.newline_count, stats.last_newline_offset)
            } else {
                self.count(b)
            };

            if count > 0 || b.len() == bytes.len() {
                return (count, offset);
            }

            *chunk_size = chunk_size.saturating_mul(2);
        }
    }

    /// Counts the line ends outside of quotes in `bytes`, assuming `bytes` starts outside a
    /// quoted section.
    ///
    /// Returns `(count, position)`, where `position` is the offset of the last counted line end
    /// (0 when nothing was counted).
    #[cfg(feature = "simd")]
    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
        let mut total_idx = 0;
        let original_bytes = bytes;
        let mut count = 0;
        let mut position = 0;
        let mut not_in_field_previous_iter = true;

        loop {
            let bytes = unsafe { original_bytes.get_unchecked(total_idx..) };

            if bytes.len() > SIMD_SIZE {
                let lane: [u8; SIMD_SIZE] = unsafe {
                    bytes
                        .get_unchecked(0..SIMD_SIZE)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(lane);
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();

                let valid_eols = if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // NOTE(review): presumably per-byte quote parity — confirm in
                    // `polars_utils::clmul`.
                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);

                    // Flip the mask when the block started outside of quotes, so that set bits
                    // mark the bytes outside of quotes.
                    if not_in_field_previous_iter {
                        not_in_quote_field = !not_in_quote_field;
                    }
                    // The top bit holds the state after the last byte of this block.
                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
                    eol_mask & not_in_quote_field
                } else {
                    eol_mask
                };

                if valid_eols != 0 {
                    count += valid_eols.count_ones() as usize;
                    // Offset of the highest set bit, i.e. the last line end in this block.
                    position = total_idx + 63 - valid_eols.leading_zeros() as usize;
                    debug_assert_eq!(original_bytes[position], self.eol_char)
                }
                total_idx += SIMD_SIZE;
            } else if bytes.is_empty() {
                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
                return (count, position);
            } else {
                // Scalar tail, continuing in the quote state the last block ended in.
                let (c, o) = self.count_no_simd(bytes, !not_in_field_previous_iter);

                // The tail offset is relative to `total_idx`; keep the earlier position when
                // the tail holds no line end.
                let (count, position) = if c > 0 {
                    (count + c, total_idx + o)
                } else {
                    (count, position)
                };
                debug_assert!(count == 0 || original_bytes[position] == self.eol_char);

                return (count, position);
            }
        }
    }

    /// Counts the line ends outside of quotes in `bytes` with the scalar routine, assuming
    /// `bytes` starts outside a quoted section.
    #[cfg(not(feature = "simd"))]
    pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
        self.count_no_simd(bytes, false)
    }

    /// Byte-by-byte count of the line ends outside of quotes, starting in quote state
    /// `in_field`.
    ///
    /// Returns `(count, position)`, where `position` is the offset of the last counted line end
    /// within `bytes` (0 when nothing was counted).
    fn count_no_simd(&self, bytes: &[u8], in_field: bool) -> (usize, usize) {
        let iter = bytes.iter();
        let mut in_field = in_field;
        let mut count = 0;
        let mut position = 0;

        for b in iter {
            let c = *b;
            if self.quoting && c == self.quote_char {
                in_field = !in_field;
            }
            else if c == self.eol_char && !in_field {
                // Offset of `b` within `bytes`, derived from the two addresses.
                position = (b as *const _ as usize) - (bytes.as_ptr() as usize);
                count += 1;
            }
        }
        debug_assert!(count == 0 || bytes[position] == self.eol_char);

        (count, position)
    }
}
903
/// Returns the index of the first `needle` byte in `bytes` that lies outside a quoted section.
///
/// Every `quote_char` byte toggles the quoted state before the needle test is applied to that
/// same byte, so when `needle == quote_char` only a closing quote can match. Returns `None`
/// when no such byte exists.
#[inline]
fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> {
    let mut in_field = false;

    // `position` counts in `usize`, so the returned index stays correct for slices longer than
    // `u32::MAX` bytes (a hand-rolled `u32` counter would overflow there).
    bytes.iter().position(|&c| {
        if c == quote_char {
            in_field = !in_field;
        }
        !in_field && c == needle
    })
}
926
927#[inline]
928pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
929 let pos = match quote {
930 Some(quote) => find_quoted(bytes, quote, eol_char),
931 None => bytes.iter().position(|x| *x == eol_char),
932 };
933 match pos {
934 None => &[],
935 Some(pos) => &bytes[pos + 1..],
936 }
937}
938
939#[inline]
940pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
941 if let Some(pos) = next_line_position_naive(input, eol_char) {
942 unsafe { input.get_unchecked(pos..) }
943 } else {
944 &[]
945 }
946}
947
/// Parses CSV lines from `bytes` and appends the projected fields to `buffers`.
///
/// Only the columns listed in `projection` are parsed; `buffers` is indexed by position within
/// `projection`. Comment lines are skipped, and lines with fewer fields than projected are
/// padded with nulls. `offset` is only used to report a byte position in error messages.
///
/// Returns the number of bytes consumed: the distance from the start of `bytes` to the current
/// position once more than `n_lines` lines have been parsed, or the full length of `bytes`
/// when the input runs out first.
///
/// # Errors
///
/// Returns a `ComputeError` when a field cannot be added to its buffer, or when a line has more
/// fields than expected and ragged lines are not being truncated.
///
/// # Panics
///
/// Panics if `projection` is empty.
#[allow(clippy::too_many_arguments)]
pub(super) fn parse_lines(
    mut bytes: &[u8],
    parse_options: &CsvParseOptions,
    offset: usize,
    ignore_errors: bool,
    null_values: Option<&NullValuesCompiled>,
    projection: &[usize],
    buffers: &mut [Buffer],
    n_lines: usize,
    schema_len: usize,
    schema: &Schema,
) -> PolarsResult<usize> {
    assert!(
        !projection.is_empty(),
        "at least one column should be projected"
    );
    let mut truncate_ragged_lines = parse_options.truncate_ragged_lines;
    // When only a subset of the columns is projected, trailing fields are never validated, so
    // ragged lines are always truncated.
    if projection.len() != schema_len {
        truncate_ragged_lines = true
    }

    // Addresses are used to measure how many bytes have been consumed.
    let start = bytes.as_ptr() as usize;
    let original_bytes_len = bytes.len();
    let n_lines = n_lines as u32;

    let mut line_count = 0u32;
    loop {
        if line_count > n_lines {
            let end = bytes.as_ptr() as usize;
            return Ok(end - start);
        }

        if bytes.is_empty() {
            return Ok(original_bytes_len);
        } else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
            // Comment lines are dropped without being counted as parsed lines.
            let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char);
            bytes = bytes_rem;
            continue;
        }

        let mut projection_iter = projection.iter().copied();
        // SAFETY: `projection` is non-empty, as asserted at the top of the function.
        let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
        // Number of projected fields written for this line; also the index into `buffers`.
        let mut processed_fields = 0;

        let mut iter = SplitFields::new(
            bytes,
            parse_options.separator,
            parse_options.quote_char,
            parse_options.eol_char,
        );
        // Index of the current field within the line.
        let mut idx = 0u32;
        // Bytes of the current line consumed so far, including one delimiter per field.
        let mut read_sol = 0;
        loop {
            match iter.next() {
                None => {
                    // End of the line: advance past everything read, clamped to the buffer end.
                    bytes = unsafe { bytes.get_unchecked(std::cmp::min(read_sol, bytes.len())..) };
                    break;
                },
                Some((mut field, needs_escaping)) => {
                    let field_len = field.len();

                    // `+ 1` accounts for the delimiter that follows the field.
                    read_sol += field_len + 1;

                    if idx == next_projected as u32 {
                        // Drop a trailing `\r` from the field.
                        unsafe {
                            if field_len > 0 && *field.get_unchecked(field_len - 1) == b'\r' {
                                field = field.get_unchecked(..field_len - 1);
                            }
                        }

                        debug_assert!(processed_fields < buffers.len());
                        // SAFETY: relies on `buffers` holding one buffer per projected column.
                        // NOTE(review): confirm with the callers.
                        let buf = unsafe {
                            buffers.get_unchecked_mut(processed_fields)
                        };
                        let mut add_null = false;

                        if let Some(null_values) = null_values {
                            // Compare without the first and last byte when the field needs
                            // escaping (presumably the surrounding quotes — confirm).
                            let field = if needs_escaping && !field.is_empty() {
                                unsafe { field.get_unchecked(1..field.len() - 1) }
                            } else {
                                field
                            };

                            add_null = unsafe { null_values.is_null(field, idx as usize) }
                        }
                        if add_null {
                            buf.add_null(!parse_options.missing_is_null && field.is_empty())
                        } else {
                            buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null)
                                .map_err(|e| {
                                    // Position of the offending field relative to the start of
                                    // this call's input, shifted by the caller-provided `offset`.
                                    let bytes_offset = offset + field.as_ptr() as usize - start;
                                    let unparsable = String::from_utf8_lossy(field);
                                    let column_name = schema.get_at_index(idx as usize).unwrap().0;
                                    polars_err!(
                                        ComputeError:
                                        "could not parse `{}` as dtype `{}` at column '{}' (column number {})\n\n\
                                        The current offset in the file is {} bytes.\n\
                                        \n\
                                        You might want to try:\n\
                                        - increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),\n\
                                        - specifying correct dtype with the `schema_overrides` argument\n\
                                        - setting `ignore_errors` to `True`,\n\
                                        - adding `{}` to the `null_values` list.\n\n\
                                        Original error: ```{}```",
                                        &unparsable,
                                        buf.dtype(),
                                        column_name,
                                        idx + 1,
                                        bytes_offset,
                                        &unparsable,
                                        e
                                    )
                                })?;
                        }
                        processed_fields += 1;

                        match projection_iter.next() {
                            Some(p) => next_projected = p,
                            None => {
                                // All projected columns are filled; move on to the next line.
                                if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) {
                                    // The last projected field was also the last field of the line.
                                    bytes = unsafe { bytes.get_unchecked(read_sol..) };
                                } else {
                                    // More data follows on this line: an error unless ragged
                                    // lines are truncated.
                                    if !truncate_ragged_lines && read_sol < bytes.len() {
                                        polars_bail!(ComputeError: r#"found more fields than defined in 'Schema'

Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
                                    }
                                    let bytes_rem = skip_this_line(
                                        unsafe { bytes.get_unchecked(read_sol - 1..) },
                                        parse_options.quote_char,
                                        parse_options.eol_char,
                                    );
                                    bytes = bytes_rem;
                                }
                                break;
                            },
                        }
                    }
                    idx += 1;
                },
            }
        }

        // The line ended before all projected columns were seen: fill the rest with nulls.
        while processed_fields < projection.len() {
            debug_assert!(processed_fields < buffers.len());
            let buf = unsafe {
                buffers.get_unchecked_mut(processed_fields)
            };
            buf.add_null(!parse_options.missing_is_null);
            processed_fields += 1;
        }
        line_count += 1;
    }
}
1136
#[cfg(test)]
mod test {
    use super::SplitLines;

    /// An end-of-line byte inside a quoted field must not split the line, whichever quote byte
    /// is configured.
    #[test]
    fn test_splitlines() {
        let cases: [(&str, u8, [&str; 2]); 2] = [
            (
                "1,\"foo\n\"\n2,\"foo\n\"\n",
                b'"',
                ["1,\"foo\n\"", "2,\"foo\n\""],
            ),
            ("1,'foo\n'\n2,'foo\n'\n", b'\'', ["1,'foo\n'", "2,'foo\n'"]),
        ];

        for (input, quote, expected_lines) in cases {
            let mut lines = SplitLines::new(input.as_bytes(), Some(quote), b'\n', None);
            for expected in expected_lines {
                assert_eq!(lines.next(), Some(expected.as_bytes()));
            }
            assert_eq!(lines.next(), None);
        }
    }
}