1use std::path::Path;
2
3use memchr::memchr2_iter;
4use num_traits::Pow;
5use polars_core::prelude::*;
6use polars_core::{POOL, config};
7use polars_error::feature_gated;
8use polars_utils::mmap::MMapSemaphore;
9use polars_utils::select::select_unpredictable;
10use rayon::prelude::*;
11
12use super::CsvParseOptions;
13use super::buffer::Buffer;
14use super::options::{CommentPrefix, NullValuesCompiled};
15use super::splitfields::SplitFields;
16use super::utils::get_file_chunks;
17use crate::path_utils::is_cloud_url;
18use crate::prelude::_csv_read_internal::find_starting_point;
19use crate::utils::compression::maybe_decompress_bytes;
20
21#[allow(clippy::too_many_arguments)]
24pub fn count_rows(
25 path: &Path,
26 separator: u8,
27 quote_char: Option<u8>,
28 comment_prefix: Option<&CommentPrefix>,
29 eol_char: u8,
30 has_header: bool,
31 skip_lines: usize,
32 skip_rows_before_header: usize,
33 skip_rows_after_header: usize,
34) -> PolarsResult<usize> {
35 let file = if is_cloud_url(path) || config::force_async() {
36 feature_gated!("cloud", {
37 crate::file_cache::FILE_CACHE
38 .get_entry(path.to_str().unwrap())
39 .unwrap()
41 .try_open_assume_latest()?
42 })
43 } else {
44 polars_utils::open_file(path)?
45 };
46
47 let mmap = MMapSemaphore::new_from_file(&file).unwrap();
48 let owned = &mut vec![];
49 let reader_bytes = maybe_decompress_bytes(mmap.as_ref(), owned)?;
50
51 count_rows_from_slice_par(
52 reader_bytes,
53 separator,
54 quote_char,
55 comment_prefix,
56 eol_char,
57 has_header,
58 skip_lines,
59 skip_rows_before_header,
60 skip_rows_after_header,
61 )
62}
63
64#[allow(clippy::too_many_arguments)]
67pub fn count_rows_from_slice_par(
68 mut bytes: &[u8],
69 separator: u8,
70 quote_char: Option<u8>,
71 comment_prefix: Option<&CommentPrefix>,
72 eol_char: u8,
73 has_header: bool,
74 skip_lines: usize,
75 skip_rows_before_header: usize,
76 skip_rows_after_header: usize,
77) -> PolarsResult<usize> {
78 for _ in 0..bytes.len() {
79 if bytes[0] != eol_char {
80 break;
81 }
82
83 bytes = &bytes[1..];
84 }
85
86 let start_offset = find_starting_point(
89 bytes,
90 quote_char,
91 eol_char,
92 usize::MAX,
98 skip_lines,
99 skip_rows_before_header,
100 skip_rows_after_header,
101 comment_prefix,
102 has_header,
103 )?;
104 bytes = &bytes[start_offset..];
105
106 const MIN_ROWS_PER_THREAD: usize = 1024;
107 let max_threads = POOL.current_num_threads();
108
109 let n_threads = get_line_stats(
111 bytes,
112 MIN_ROWS_PER_THREAD,
113 eol_char,
114 None,
115 separator,
116 quote_char,
117 )
118 .map(|(mean, std)| {
119 let n_rows = (bytes.len() as f32 / (mean - 0.01 * std)) as usize;
120 (n_rows / MIN_ROWS_PER_THREAD).clamp(1, max_threads)
121 })
122 .unwrap_or(1);
123
124 if n_threads == 1 {
125 return count_rows_from_slice(bytes, quote_char, comment_prefix, eol_char, false);
126 }
127
128 let file_chunks: Vec<(usize, usize)> =
129 get_file_chunks(bytes, n_threads, None, separator, quote_char, eol_char);
130
131 let iter = file_chunks.into_par_iter().map(|(start, stop)| {
132 let bytes = &bytes[start..stop];
133
134 if comment_prefix.is_some() {
135 SplitLines::new(bytes, quote_char, eol_char, comment_prefix)
136 .filter(|line| !is_comment_line(line, comment_prefix))
137 .count()
138 } else {
139 CountLines::new(quote_char, eol_char).count(bytes).0
140 + bytes.last().is_some_and(|x| *x != b'\n') as usize
141 }
142 });
143
144 let n: usize = POOL.install(|| iter.sum());
145
146 Ok(n)
147}
148
149pub fn count_rows_from_slice(
151 mut bytes: &[u8],
152 quote_char: Option<u8>,
153 comment_prefix: Option<&CommentPrefix>,
154 eol_char: u8,
155 has_header: bool,
156) -> PolarsResult<usize> {
157 for _ in 0..bytes.len() {
158 if bytes[0] != eol_char {
159 break;
160 }
161
162 bytes = &bytes[1..];
163 }
164
165 let n = if comment_prefix.is_some() {
166 SplitLines::new(bytes, quote_char, eol_char, comment_prefix)
167 .filter(|line| !is_comment_line(line, comment_prefix))
168 .count()
169 } else {
170 CountLines::new(quote_char, eol_char).count(bytes).0
171 + bytes.last().is_some_and(|x| *x != b'\n') as usize
172 };
173
174 Ok(n - (has_header as usize))
175}
176
177pub(super) fn skip_bom(input: &[u8]) -> &[u8] {
180 if input.len() >= 3 && &input[0..3] == b"\xef\xbb\xbf" {
181 &input[3..]
182 } else {
183 input
184 }
185}
186
187#[inline]
191pub(super) fn is_comment_line(line: &[u8], comment_prefix: Option<&CommentPrefix>) -> bool {
192 match comment_prefix {
193 Some(CommentPrefix::Single(c)) => line.first() == Some(c),
194 Some(CommentPrefix::Multi(s)) => line.starts_with(s.as_bytes()),
195 None => false,
196 }
197}
198
199pub(super) fn next_line_position_naive(input: &[u8], eol_char: u8) -> Option<usize> {
202 let pos = memchr::memchr(eol_char, input)? + 1;
203 if input.len() - pos == 0 {
204 return None;
205 }
206 Some(pos)
207}
208
209pub(super) fn skip_lines_naive(mut input: &[u8], eol_char: u8, skip: usize) -> &[u8] {
210 for _ in 0..skip {
211 if let Some(pos) = next_line_position_naive(input, eol_char) {
212 input = &input[pos..];
213 } else {
214 return input;
215 }
216 }
217 input
218}
219
/// Find the start of a line in `input` that looks like a genuine record boundary.
///
/// Candidate boundaries come from a plain `eol_char` search. Without `expected_fields`, the
/// first candidate that has at least one byte after it is returned. With `expected_fields`, the
/// line starting at the candidate and up to two following lines must all pass `accept_line`.
///
/// Returns the offset, relative to the original `input`, of the accepted line start. Returns
/// `None` in any of these cases:
/// - `input` is empty;
/// - no further `eol_char` exists;
/// - that `eol_char` is the final byte;
/// - three candidate groups were rejected.
pub(super) fn next_line_position(
    mut input: &[u8],
    mut expected_fields: Option<usize>,
    separator: u8,
    quote_char: Option<u8>,
    eol_char: u8,
) -> Option<usize> {
    /// Heuristically check that `line` splits into about `expected_fields` fields.
    ///
    /// The line is rejected if any single field contains `expected_fields` or more
    /// separator/eol bytes. Such bytes can only sit inside a quoted field, which presumably
    /// means the split is misaligned. Otherwise the line is accepted when the field count is
    /// `expected_fields` or `expected_fields - 1`. With `wrapping_sub`, a count above
    /// `expected_fields` wraps to a huge value and fails.
    fn accept_line(
        line: &[u8],
        expected_fields: usize,
        separator: u8,
        eol_char: u8,
        quote_char: Option<u8>,
    ) -> bool {
        let mut count = 0usize;
        for (field, _) in SplitFields::new(line, separator, quote_char, eol_char) {
            if memchr2_iter(separator, eol_char, field).count() >= expected_fields {
                return false;
            }
            count += 1;
        }

        expected_fields.wrapping_sub(count) <= 1
    }

    // Number of times a candidate passed `accept_line` but a following line did not.
    let mut rejected_line_groups = 0u8;

    // Bytes of the original `input` already skipped past.
    let mut total_pos = 0;
    if input.is_empty() {
        return None;
    }
    // Loop-iteration counter (wraps at u8); used to relax `expected_fields` periodically.
    let mut lines_checked = 0u8;
    loop {
        if rejected_line_groups >= 3 {
            return None;
        }
        lines_checked = lines_checked.wrapping_add(1);
        // Each time the counter reaches u8::MAX, accept one field fewer.
        if lines_checked == u8::MAX {
            if let Some(ef) = expected_fields {
                expected_fields = Some(ef.saturating_sub(1))
            }
        };
        // Candidate line start: just past the next `eol_char`.
        let pos = memchr::memchr(eol_char, input)? + 1;
        if input.len() - pos == 0 {
            return None;
        }
        debug_assert!(pos <= input.len());
        // SAFETY: `pos < input.len()` was checked just above.
        let new_input = unsafe { input.get_unchecked(pos..) };
        let mut lines = SplitLines::new(new_input, quote_char, eol_char, None);
        let line = lines.next();

        match (line, expected_fields) {
            (Some(line), Some(expected_fields)) => {
                if accept_line(line, expected_fields, separator, eol_char, quote_char) {
                    // Confirm with up to two further lines before trusting the candidate.
                    let mut valid = true;
                    for line in lines.take(2) {
                        if !accept_line(line, expected_fields, separator, eol_char, quote_char) {
                            valid = false;
                            break;
                        }
                    }
                    if valid {
                        return Some(total_pos + pos);
                    } else {
                        // NOTE(review): `input` is not advanced on this path, so the same
                        // candidate is re-examined; after three rejections the search gives
                        // up — confirm this is intended rather than moving to the next line.
                        rejected_line_groups += 1;
                    }
                } else {
                    debug_assert!(pos < input.len());
                    // Skip the terminator and the first byte of the rejected line. The next
                    // `memchr` then finds the end of that line.
                    // NOTE(review): if the rejected line is empty, this also skips the
                    // following line — confirm intended.
                    unsafe {
                        input = input.get_unchecked(pos + 1..);
                    }
                    total_pos += pos + 1;
                }
            },
            // No field-count check requested: the first candidate wins.
            (Some(_), None) => return Some(total_pos + pos),
            _ => return None,
        }
    }
}
317
318pub(super) fn is_line_ending(b: u8, eol_char: u8) -> bool {
319 b == eol_char || b == b'\r'
320}
321
322pub(super) fn is_whitespace(b: u8) -> bool {
323 b == b' ' || b == b'\t'
324}
325
/// Return the suffix of `input` left after dropping every leading byte for which `f` is `true`.
#[inline]
fn skip_condition<F>(input: &[u8], f: F) -> &[u8]
where
    F: Fn(u8) -> bool,
{
    let skipped = input.iter().take_while(|&&byte| f(byte)).count();
    &input[skipped..]
}
338
339#[inline]
345pub(super) fn skip_whitespace(input: &[u8]) -> &[u8] {
346 skip_condition(input, is_whitespace)
347}
348
349#[inline]
350pub(super) fn skip_line_ending(input: &[u8], eol_char: u8) -> &[u8] {
351 skip_condition(input, |b| is_line_ending(b, eol_char))
352}
353
354pub(super) fn get_line_stats(
356 bytes: &[u8],
357 n_lines: usize,
358 eol_char: u8,
359 expected_fields: Option<usize>,
360 separator: u8,
361 quote_char: Option<u8>,
362) -> Option<(f32, f32)> {
363 let mut lengths = Vec::with_capacity(n_lines);
364
365 let mut bytes_trunc;
366 let n_lines_per_iter = n_lines / 2;
367
368 let mut n_read = 0;
369
370 for offset in [0, (bytes.len() as f32 * 0.75) as usize] {
372 bytes_trunc = &bytes[offset..];
373 let pos = next_line_position(
374 bytes_trunc,
375 expected_fields,
376 separator,
377 quote_char,
378 eol_char,
379 )?;
380 bytes_trunc = &bytes_trunc[pos + 1..];
381
382 for _ in offset..(offset + n_lines_per_iter) {
383 let pos = next_line_position_naive(bytes_trunc, eol_char)? + 1;
384 n_read += pos;
385 lengths.push(pos);
386 bytes_trunc = &bytes_trunc[pos..];
387 }
388 }
389
390 let n_samples = lengths.len();
391
392 let mean = (n_read as f32) / (n_samples as f32);
393 let mut std = 0.0;
394 for &len in lengths.iter() {
395 std += (len as f32 - mean).pow(2.0)
396 }
397 std = (std / n_samples as f32).sqrt();
398 Some((mean, std))
399}
400
/// Iterator over the lines of a CSV buffer.
///
/// When quoting is enabled, an `eol_char` inside a quoted field does not end the line. Yielded
/// lines normally exclude their terminating `eol_char`.
pub(super) struct SplitLines<'a> {
    /// Remaining input that has not been yielded yet.
    v: &'a [u8],
    /// Quote byte; only compared against when `quoting` is true (set to `"` otherwise).
    quote_char: u8,
    /// Line terminator byte.
    eol_char: u8,
    /// `eol_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    /// `quote_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    /// Bitmask of line ends from the last scanned SIMD block that were not yet yielded.
    /// Bit `i` refers to offset `i` of `v`.
    #[cfg(feature = "simd")]
    previous_valid_eols: u64,
    /// Scan offset into `v` while searching for the next line end.
    total_index: usize,
    /// Whether `quote_char` toggles the quoted-field state.
    quoting: bool,
    /// Optional prefix that marks comment lines.
    comment_prefix: Option<&'a CommentPrefix>,
}
424
/// Bytes examined per SIMD step; each byte maps to one bit of the `u64` masks built from a block.
#[cfg(feature = "simd")]
const SIMD_SIZE: usize = 64;
#[cfg(feature = "simd")]
use std::simd::prelude::*;

// Prefix XOR over a 64-bit mask, used to track the quote state per byte of a block.
#[cfg(feature = "simd")]
use polars_utils::clmul::prefix_xorsum_inclusive;

/// 64-lane byte vector used for the SIMD line scans.
#[cfg(feature = "simd")]
type SimdVec = u8x64;
435
436impl<'a> SplitLines<'a> {
437 pub(super) fn new(
438 slice: &'a [u8],
439 quote_char: Option<u8>,
440 eol_char: u8,
441 comment_prefix: Option<&'a CommentPrefix>,
442 ) -> Self {
443 let quoting = quote_char.is_some();
444 let quote_char = quote_char.unwrap_or(b'\"');
445 #[cfg(feature = "simd")]
446 let simd_eol_char = SimdVec::splat(eol_char);
447 #[cfg(feature = "simd")]
448 let simd_quote_char = SimdVec::splat(quote_char);
449 Self {
450 v: slice,
451 quote_char,
452 eol_char,
453 #[cfg(feature = "simd")]
454 simd_eol_char,
455 #[cfg(feature = "simd")]
456 simd_quote_char,
457 #[cfg(feature = "simd")]
458 previous_valid_eols: 0,
459 total_index: 0,
460 quoting,
461 comment_prefix,
462 }
463 }
464}
465
impl<'a> SplitLines<'a> {
    /// Yield the next line using a byte-at-a-time scan.
    ///
    /// Comment lines are delegated to [`Self::next_comment_line`]. Otherwise an `eol_char` ends
    /// the line unless it appears inside a quoted field. If no unquoted `eol_char` remains, the
    /// whole remainder is returned as the last line.
    fn next_scalar(&mut self) -> Option<&'a [u8]> {
        if self.v.is_empty() {
            return None;
        }
        if is_comment_line(self.v, self.comment_prefix) {
            return self.next_comment_line();
        }
        {
            // Bytes consumed so far, including the terminating `eol_char` once found.
            let mut pos = 0u32;
            let mut iter = self.v.iter();
            let mut in_field = false;
            loop {
                match iter.next() {
                    Some(&c) => {
                        pos += 1;

                        if self.quoting && c == self.quote_char {
                            // Every quote toggles the state; a doubled quote leaves it unchanged.
                            in_field = !in_field;
                        } else if c == self.eol_char && !in_field {
                            break;
                        }
                    },
                    None => {
                        // No unquoted terminator left: the remainder is the final line.
                        let remainder = self.v;
                        self.v = &[];
                        return Some(remainder);
                    },
                }
            }

            // NOTE(review): `pos` is a u32, so a single line over u32::MAX bytes would overflow.
            // NOTE(review): `pos` is counted from the start of `self.v`, yet `total_index` is
            // added; this is only right while `total_index` is 0 — confirm for all callers.
            unsafe {
                debug_assert!((pos as usize) <= self.v.len());

                // The line without its terminator; resume right after the terminator.
                let ret = Some(
                    self.v
                        .get_unchecked(..(self.total_index + pos as usize - 1)),
                );
                self.v = self.v.get_unchecked(self.total_index + pos as usize..);
                ret
            }
        }
    }

    /// Yield a comment line: everything up to the next `eol_char`, ignoring quotes.
    ///
    /// When no following line exists (no `eol_char`, or it is the final byte), the entire
    /// remainder is returned, including any trailing `eol_char`.
    fn next_comment_line(&mut self) -> Option<&'a [u8]> {
        if let Some(pos) = next_line_position_naive(self.v, self.eol_char) {
            // `pos` is the start of the following line, so `pos - 1` is the terminator.
            unsafe {
                let ret = Some(self.v.get_unchecked(..(pos - 1)));
                self.v = self.v.get_unchecked(pos..);
                ret
            }
        } else {
            let remainder = self.v;
            self.v = &[];
            Some(remainder)
        }
    }
}
533
impl<'a> Iterator for SplitLines<'a> {
    type Item = &'a [u8];

    /// Scalar line splitting, used when the `simd` feature is disabled.
    #[inline]
    #[cfg(not(feature = "simd"))]
    fn next(&mut self) -> Option<&'a [u8]> {
        self.next_scalar()
    }

    /// Yield the next line, scanning the input 64 bytes at a time.
    ///
    /// Further line ends found in the same block are cached in `previous_valid_eols` and served
    /// first on later calls. When a comment prefix is configured, the scalar path is used.
    #[inline]
    #[cfg(feature = "simd")]
    fn next(&mut self) -> Option<&'a [u8]> {
        // Serve a line end already found by the previous block scan.
        if self.previous_valid_eols != 0 {
            let pos = self.previous_valid_eols.trailing_zeros() as usize;
            // The cached mask was shifted by at least one on creation, so `pos + 1 < 64` here.
            self.previous_valid_eols >>= (pos + 1) as u64;

            unsafe {
                debug_assert!((pos) <= self.v.len());

                let ret = Some(self.v.get_unchecked(..pos));
                self.v = self.v.get_unchecked(pos + 1..);
                return ret;
            }
        }
        if self.v.is_empty() {
            return None;
        }
        // Comment handling needs per-line prefix checks, which only the scalar path does.
        if self.comment_prefix.is_some() {
            return self.next_scalar();
        }

        self.total_index = 0;
        // Quote state carried from one 64-byte block to the next.
        let mut not_in_field_previous_iter = true;

        loop {
            let bytes = unsafe { self.v.get_unchecked(self.total_index..) };
            if bytes.len() > SIMD_SIZE {
                let lane: [u8; SIMD_SIZE] = unsafe {
                    bytes
                        .get_unchecked(0..SIMD_SIZE)
                        .try_into()
                        .unwrap_unchecked()
                };
                let simd_bytes = SimdVec::from(lane);
                let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();

                let valid_eols = if self.quoting {
                    let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
                    // Presumably the inclusive prefix XOR of the quote bits, i.e. the quote
                    // parity up to each byte of the block.
                    let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);

                    if not_in_field_previous_iter {
                        not_in_quote_field = !not_in_quote_field;
                    }
                    // Carry the state at the block's last byte into the next block.
                    not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
                    eol_mask & not_in_quote_field
                } else {
                    eol_mask
                };

                if valid_eols != 0 {
                    let pos = valid_eols.trailing_zeros() as usize;
                    // Cache the remaining line ends of this block. A shift by 64 would overflow,
                    // so clear explicitly when the line end is the block's last byte.
                    if pos == SIMD_SIZE - 1 {
                        self.previous_valid_eols = 0;
                    } else {
                        self.previous_valid_eols = valid_eols >> (pos + 1) as u64;
                    }

                    unsafe {
                        let pos = self.total_index + pos;
                        debug_assert!((pos) <= self.v.len());

                        let ret = Some(self.v.get_unchecked(..pos));
                        self.v = self.v.get_unchecked(pos + 1..);
                        return ret;
                    }
                } else {
                    self.total_index += SIMD_SIZE;
                }
            } else {
                // Fewer than 65 bytes left: finish with a scalar scan from the carried state.
                let mut in_field = !not_in_field_previous_iter;
                let mut pos = 0u32;
                let mut iter = bytes.iter();
                loop {
                    match iter.next() {
                        Some(&c) => {
                            pos += 1;

                            if self.quoting && c == self.quote_char {
                                in_field = !in_field;
                            } else if c == self.eol_char && !in_field {
                                break;
                            }
                        },
                        None => {
                            // No unquoted terminator left: the remainder is the final line.
                            let remainder = self.v;
                            self.v = &[];
                            return Some(remainder);
                        },
                    }
                }

                unsafe {
                    debug_assert!((pos as usize) <= self.v.len());

                    // `pos` is relative to `bytes`, which starts at `total_index` within `v`.
                    let ret = Some(
                        self.v
                            .get_unchecked(..(self.total_index + pos as usize - 1)),
                    );
                    self.v = self.v.get_unchecked(self.total_index + pos as usize..);
                    return ret;
                }
            }
        }
    }
}
662
/// Counts line ends (`eol_char` bytes outside quoted fields) in CSV byte chunks.
pub struct CountLines {
    /// Quote byte; only compared against when `quoting` is true (set to `"` otherwise).
    quote_char: u8,
    /// Line terminator byte.
    eol_char: u8,
    /// `eol_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_eol_char: SimdVec,
    /// `quote_char` broadcast to every SIMD lane.
    #[cfg(feature = "simd")]
    simd_quote_char: SimdVec,
    /// Whether `quote_char` toggles the quoted-field state.
    quoting: bool,
}
672
/// Line-count summary of one chunk under one assumption about the quote state at its start.
///
/// Produced in pairs by `CountLines::analyze_chunk`.
#[derive(Copy, Clone, Debug)]
pub struct LineStats {
    /// Number of line ends (`eol_char` outside quoted fields) in the chunk.
    newline_count: usize,
    /// Offset within the chunk of the last such line end; 0 when there is none.
    last_newline_offset: usize,
    /// Whether the chunk ends inside a quoted field under this assumption.
    end_inside_string: bool,
}
679
680impl CountLines {
681 pub fn new(quote_char: Option<u8>, eol_char: u8) -> Self {
682 let quoting = quote_char.is_some();
683 let quote_char = quote_char.unwrap_or(b'\"');
684 #[cfg(feature = "simd")]
685 let simd_eol_char = SimdVec::splat(eol_char);
686 #[cfg(feature = "simd")]
687 let simd_quote_char = SimdVec::splat(quote_char);
688 Self {
689 quote_char,
690 eol_char,
691 #[cfg(feature = "simd")]
692 simd_eol_char,
693 #[cfg(feature = "simd")]
694 simd_quote_char,
695 quoting,
696 }
697 }
698
699 pub fn analyze_chunk(&self, bytes: &[u8]) -> [LineStats; 2] {
705 let mut scan_offset = 0;
706 let mut states = [
707 LineStats {
708 newline_count: 0,
709 last_newline_offset: 0,
710 end_inside_string: false,
711 },
712 LineStats {
713 newline_count: 0,
714 last_newline_offset: 0,
715 end_inside_string: false,
716 },
717 ];
718
719 #[allow(unused_assignments)]
721 let mut global_quote_parity = false;
722
723 #[cfg(feature = "simd")]
724 {
725 let mut global_quote_parity_mask = 0;
727 while scan_offset + 64 <= bytes.len() {
728 let block: [u8; 64] = unsafe {
729 bytes
730 .get_unchecked(scan_offset..scan_offset + 64)
731 .try_into()
732 .unwrap_unchecked()
733 };
734 let simd_bytes = SimdVec::from(block);
735 let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
736 if self.quoting {
737 let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
738 let quote_parity =
739 prefix_xorsum_inclusive(quote_mask) ^ global_quote_parity_mask;
740 global_quote_parity_mask = ((quote_parity as i64) >> 63) as u64;
741
742 let start_outside_string_eol_mask = eol_mask & !quote_parity;
743 states[0].newline_count += start_outside_string_eol_mask.count_ones() as usize;
744 states[0].last_newline_offset = select_unpredictable(
745 start_outside_string_eol_mask != 0,
746 (scan_offset + 63)
747 .wrapping_sub(start_outside_string_eol_mask.leading_zeros() as usize),
748 states[0].last_newline_offset,
749 );
750
751 let start_inside_string_eol_mask = eol_mask & quote_parity;
752 states[1].newline_count += start_inside_string_eol_mask.count_ones() as usize;
753 states[1].last_newline_offset = select_unpredictable(
754 start_inside_string_eol_mask != 0,
755 (scan_offset + 63)
756 .wrapping_sub(start_inside_string_eol_mask.leading_zeros() as usize),
757 states[1].last_newline_offset,
758 );
759 } else {
760 states[0].newline_count += eol_mask.count_ones() as usize;
761 states[0].last_newline_offset = select_unpredictable(
762 eol_mask != 0,
763 (scan_offset + 63).wrapping_sub(eol_mask.leading_zeros() as usize),
764 states[0].last_newline_offset,
765 );
766 }
767
768 scan_offset += 64;
769 }
770
771 global_quote_parity = global_quote_parity_mask > 0;
772 }
773
774 while scan_offset < bytes.len() {
775 let c = unsafe { *bytes.get_unchecked(scan_offset) };
776 global_quote_parity ^= (c == self.quote_char) & self.quoting;
777
778 let state = &mut states[global_quote_parity as usize];
779 state.newline_count += (c == self.eol_char) as usize;
780 state.last_newline_offset =
781 select_unpredictable(c == self.eol_char, scan_offset, state.last_newline_offset);
782
783 scan_offset += 1;
784 }
785
786 states[0].end_inside_string = global_quote_parity;
787 states[1].end_inside_string = !global_quote_parity;
788 states
789 }
790
791 pub fn find_next(&self, bytes: &[u8], chunk_size: &mut usize) -> (usize, usize) {
792 loop {
793 let b = unsafe { bytes.get_unchecked(..(*chunk_size).min(bytes.len())) };
794
795 let (count, offset) = self.count(b);
796
797 if count > 0 || b.len() == bytes.len() {
798 return (count, offset);
799 }
800
801 *chunk_size *= 2;
802 }
803 }
804
805 #[cfg(feature = "simd")]
807 pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
808 let mut total_idx = 0;
809 let original_bytes = bytes;
810 let mut count = 0;
811 let mut position = 0;
812 let mut not_in_field_previous_iter = true;
813
814 loop {
815 let bytes = unsafe { original_bytes.get_unchecked(total_idx..) };
816
817 if bytes.len() > SIMD_SIZE {
818 let lane: [u8; SIMD_SIZE] = unsafe {
819 bytes
820 .get_unchecked(0..SIMD_SIZE)
821 .try_into()
822 .unwrap_unchecked()
823 };
824 let simd_bytes = SimdVec::from(lane);
825 let eol_mask = simd_bytes.simd_eq(self.simd_eol_char).to_bitmask();
826
827 let valid_eols = if self.quoting {
828 let quote_mask = simd_bytes.simd_eq(self.simd_quote_char).to_bitmask();
829 let mut not_in_quote_field = prefix_xorsum_inclusive(quote_mask);
830
831 if not_in_field_previous_iter {
832 not_in_quote_field = !not_in_quote_field;
833 }
834 not_in_field_previous_iter = (not_in_quote_field & (1 << (SIMD_SIZE - 1))) > 0;
835 eol_mask & not_in_quote_field
836 } else {
837 eol_mask
838 };
839
840 if valid_eols != 0 {
841 count += valid_eols.count_ones() as usize;
842 position = total_idx + 63 - valid_eols.leading_zeros() as usize;
843 debug_assert_eq!(original_bytes[position], self.eol_char)
844 }
845 total_idx += SIMD_SIZE;
846 } else if bytes.is_empty() {
847 debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
848 return (count, position);
849 } else {
850 let (c, o) = self.count_no_simd(bytes, !not_in_field_previous_iter);
851
852 let (count, position) = if c > 0 {
853 (count + c, total_idx + o)
854 } else {
855 (count, position)
856 };
857 debug_assert!(count == 0 || original_bytes[position] == self.eol_char);
858
859 return (count, position);
860 }
861 }
862 }
863
864 #[cfg(not(feature = "simd"))]
865 pub fn count(&self, bytes: &[u8]) -> (usize, usize) {
866 self.count_no_simd(bytes, false)
867 }
868
869 fn count_no_simd(&self, bytes: &[u8], in_field: bool) -> (usize, usize) {
870 let iter = bytes.iter();
871 let mut in_field = in_field;
872 let mut count = 0;
873 let mut position = 0;
874
875 for b in iter {
876 let c = *b;
877 if self.quoting && c == self.quote_char {
878 in_field = !in_field;
882 }
883 else if c == self.eol_char && !in_field {
885 position = (b as *const _ as usize) - (bytes.as_ptr() as usize);
886 count += 1;
887 }
888 }
889 debug_assert!(count == 0 || bytes[position] == self.eol_char);
890
891 (count, position)
892 }
893}
894
/// Return the index of the first `needle` byte in `bytes` that is outside a quoted region.
///
/// Every `quote_char` toggles the quoted state. The index is tracked as `usize` (via
/// `Iterator::position`), so inputs longer than `u32::MAX` bytes no longer overflow a counter.
#[inline]
fn find_quoted(bytes: &[u8], quote_char: u8, needle: u8) -> Option<usize> {
    let mut in_field = false;
    bytes.iter().position(|&c| {
        if c == quote_char {
            in_field = !in_field;
        }
        !in_field && c == needle
    })
}
917
918#[inline]
919pub(super) fn skip_this_line(bytes: &[u8], quote: Option<u8>, eol_char: u8) -> &[u8] {
920 let pos = match quote {
921 Some(quote) => find_quoted(bytes, quote, eol_char),
922 None => bytes.iter().position(|x| *x == eol_char),
923 };
924 match pos {
925 None => &[],
926 Some(pos) => &bytes[pos + 1..],
927 }
928}
929
930#[inline]
931pub(super) fn skip_this_line_naive(input: &[u8], eol_char: u8) -> &[u8] {
932 if let Some(pos) = next_line_position_naive(input, eol_char) {
933 unsafe { input.get_unchecked(pos..) }
934 } else {
935 &[]
936 }
937}
938
/// Parse CSV lines from `bytes` into `buffers`, one buffer per projected column.
///
/// Comment lines are skipped. Only the columns listed in `projection` are parsed. Projected
/// columns missing from a short line are filled with nulls. Extra fields after the last
/// projected column are an error unless ragged lines are truncated; truncation is forced when
/// only part of the schema is projected.
///
/// Returns the number of bytes consumed. This is the offset where parsing stopped after the
/// line limit, or the full input length when the input was exhausted.
///
/// # Errors
/// - Propagates a parse error from `Buffer::add` (which receives `ignore_errors`), decorated
///   with the column and byte offset.
/// - Errors when a line has more fields than the schema and truncation is disabled.
///
/// # Panics
/// Panics if `projection` is empty.
///
/// NOTE(review): fields are matched against `projection` in order, so `projection` is assumed
/// to be sorted ascending — confirm callers guarantee this.
#[allow(clippy::too_many_arguments)]
pub(super) fn parse_lines(
    mut bytes: &[u8],
    parse_options: &CsvParseOptions,
    offset: usize,
    ignore_errors: bool,
    null_values: Option<&NullValuesCompiled>,
    projection: &[usize],
    buffers: &mut [Buffer],
    n_lines: usize,
    schema_len: usize,
    schema: &Schema,
) -> PolarsResult<usize> {
    assert!(
        !projection.is_empty(),
        "at least one column should be projected"
    );
    let mut truncate_ragged_lines = parse_options.truncate_ragged_lines;
    // With a partial projection, the rest of a line after the last projected column is
    // skipped (see the `None` arm below) instead of raising the extra-fields error.
    if projection.len() != schema_len {
        truncate_ragged_lines = true
    }

    // Base address used to compute offsets relative to the start of `bytes`.
    let start = bytes.as_ptr() as usize;
    let original_bytes_len = bytes.len();
    // NOTE(review): truncating cast; values above u32::MAX wrap modulo 2^32 (`usize::MAX`
    // becomes `u32::MAX`) — confirm callers stay in range.
    let n_lines = n_lines as u32;

    let mut line_count = 0u32;
    loop {
        // NOTE(review): `>` lets up to `n_lines + 1` lines be parsed before stopping —
        // confirm this is what callers expect.
        if line_count > n_lines {
            let end = bytes.as_ptr() as usize;
            return Ok(end - start);
        }

        if bytes.is_empty() {
            return Ok(original_bytes_len);
        } else if is_comment_line(bytes, parse_options.comment_prefix.as_ref()) {
            // Comment lines are skipped and do not count towards `line_count`.
            let bytes_rem = skip_this_line_naive(bytes, parse_options.eol_char);
            bytes = bytes_rem;
            continue;
        }

        let mut projection_iter = projection.iter().copied();
        // SAFETY: `projection` is non-empty (asserted above).
        let mut next_projected = unsafe { projection_iter.next().unwrap_unchecked() };
        let mut processed_fields = 0;

        let mut iter = SplitFields::new(
            bytes,
            parse_options.separator,
            parse_options.quote_char,
            parse_options.eol_char,
        );
        // `idx`: schema column index of the current field.
        // `read_sol`: bytes of this line consumed so far (each field plus one delimiter byte).
        let mut idx = 0u32;
        let mut read_sol = 0;
        loop {
            match iter.next() {
                // No more fields: advance past what was read (clamped to the input length).
                None => {
                    bytes = unsafe { bytes.get_unchecked(std::cmp::min(read_sol, bytes.len())..) };
                    break;
                },
                Some((mut field, needs_escaping)) => {
                    let field_len = field.len();

                    read_sol += field_len + 1;

                    if idx == next_projected as u32 {
                        // Drop a trailing `\r` so CRLF line endings do not leak into the value.
                        unsafe {
                            if field_len > 0 && *field.get_unchecked(field_len - 1) == b'\r' {
                                field = field.get_unchecked(..field_len - 1);
                            }
                        }

                        debug_assert!(processed_fields < buffers.len());
                        let buf = unsafe {
                            buffers.get_unchecked_mut(processed_fields)
                        };
                        let mut add_null = false;

                        if let Some(null_values) = null_values {
                            // Presumably strips the surrounding quotes of a quoted field before
                            // comparing with the null markers.
                            // NOTE(review): a 1-byte field here would form the range `1..0`.
                            let field = if needs_escaping && !field.is_empty() {
                                unsafe { field.get_unchecked(1..field.len() - 1) }
                            } else {
                                field
                            };

                            add_null = unsafe { null_values.is_null(field, idx as usize) }
                        }
                        if add_null {
                            buf.add_null(!parse_options.missing_is_null && field.is_empty())
                        } else {
                            buf.add(field, ignore_errors, needs_escaping, parse_options.missing_is_null)
                                .map_err(|e| {
                                    let bytes_offset = offset + field.as_ptr() as usize - start;
                                    let unparsable = String::from_utf8_lossy(field);
                                    let column_name = schema.get_at_index(idx as usize).unwrap().0;
                                    polars_err!(
                                        ComputeError:
                                        "could not parse `{}` as dtype `{}` at column '{}' (column number {})\n\n\
                                        The current offset in the file is {} bytes.\n\
                                        \n\
                                        You might want to try:\n\
                                        - increasing `infer_schema_length` (e.g. `infer_schema_length=10000`),\n\
                                        - specifying correct dtype with the `schema_overrides` argument\n\
                                        - setting `ignore_errors` to `True`,\n\
                                        - adding `{}` to the `null_values` list.\n\n\
                                        Original error: ```{}```",
                                        &unparsable,
                                        buf.dtype(),
                                        column_name,
                                        idx + 1,
                                        bytes_offset,
                                        &unparsable,
                                        e
                                    )
                                })?;
                        }
                        processed_fields += 1;

                        match projection_iter.next() {
                            Some(p) => next_projected = p,
                            // Last projected column read: move on to the next line.
                            None => {
                                if bytes.get(read_sol - 1) == Some(&parse_options.eol_char) {
                                    // The field just read was terminated by the line end.
                                    bytes = &bytes[read_sol..];
                                } else {
                                    if !truncate_ragged_lines && read_sol < bytes.len() {
                                        polars_bail!(ComputeError: r#"found more fields than defined in 'Schema'

Consider setting 'truncate_ragged_lines={}'."#, polars_error::constants::TRUE)
                                    }
                                    // Skip the unprojected rest of the line, respecting quotes.
                                    let bytes_rem = skip_this_line(
                                        unsafe { bytes.get_unchecked(read_sol - 1..) },
                                        parse_options.quote_char,
                                        parse_options.eol_char,
                                    );
                                    bytes = bytes_rem;
                                }
                                break;
                            },
                        }
                    }
                    idx += 1;
                },
            }
        }

        // Fill projected columns that this (short) line did not provide with nulls.
        while processed_fields < projection.len() {
            debug_assert!(processed_fields < buffers.len());
            let buf = unsafe {
                buffers.get_unchecked_mut(processed_fields)
            };
            buf.add_null(!parse_options.missing_is_null);
            processed_fields += 1;
        }
        line_count += 1;
    }
}
1127
#[cfg(test)]
mod test {
    use super::SplitLines;

    /// An `eol_char` inside a quoted field must not split the line, for either quote byte.
    #[test]
    fn test_splitlines() {
        let cases: [(&str, u8, [&str; 2]); 2] = [
            (
                "1,\"foo\n\"\n2,\"foo\n\"\n",
                b'"',
                ["1,\"foo\n\"", "2,\"foo\n\""],
            ),
            ("1,'foo\n'\n2,'foo\n'\n", b'\'', ["1,'foo\n'", "2,'foo\n'"]),
        ];

        for (input, quote, expected) in cases {
            let mut lines = SplitLines::new(input.as_bytes(), Some(quote), b'\n', None);
            for line in expected {
                assert_eq!(lines.next(), Some(line.as_bytes()));
            }
            assert_eq!(lines.next(), None);
        }
    }
}